eventbus: pub/sub поверх Redis с типизированным Event-конвертом
(id/type/service/occurred_at/actor/payload). At-most-once семантика
(Redis pub/sub без offset'ов). Конвенция топиков: "{service}.{event}".
Publisher autopopulate'ит id (UUID4), service и occurred_at.
PublishTyped[T] — helper для маршалинга typed payload-структуры.
redisx: shared go-redis клиент с едиными pool/timeout-опциями
(заменяет дубли в booking/deals/hhru/webhooks-apps).
87 lines
2.3 KiB
Go
87 lines
2.3 KiB
Go
// Package redisx — общий go-redis клиент с едиными pool-опциями.
|
||
// Booking/deals/hhru/webhooks-apps копировали одни и те же параметры —
|
||
// теперь tweak'аем здесь, сервисы пересобираются.
|
||
package redisx
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"time"
|
||
|
||
"github.com/redis/go-redis/v9"
|
||
)
|
||
|
||
type Config struct {
|
||
Addr string
|
||
Password string
|
||
DB int
|
||
PoolSize int
|
||
MinIdleConns int
|
||
MaxRetries int
|
||
DialTimeout time.Duration
|
||
ReadTimeout time.Duration
|
||
WriteTimeout time.Duration
|
||
PingTimeout time.Duration
|
||
}
|
||
|
||
// Defaults — общие дефолты, повторяющие исходное поведение всех сервисов.
|
||
func Defaults() Config {
|
||
return Config{
|
||
PoolSize: 10,
|
||
MinIdleConns: 2,
|
||
MaxRetries: 3,
|
||
DialTimeout: 5 * time.Second,
|
||
ReadTimeout: 3 * time.Second,
|
||
WriteTimeout: 3 * time.Second,
|
||
PingTimeout: 5 * time.Second,
|
||
}
|
||
}
|
||
|
||
// New строит клиент и проверяет соединение через Ping. На неудачу
|
||
// возвращает ошибку — caller сам решает retry/fail-fast.
|
||
func New(cfg Config) (*redis.Client, error) {
|
||
if cfg.Addr == "" {
|
||
return nil, fmt.Errorf("redis Addr is required")
|
||
}
|
||
def := Defaults()
|
||
if cfg.PoolSize == 0 {
|
||
cfg.PoolSize = def.PoolSize
|
||
}
|
||
if cfg.MinIdleConns == 0 {
|
||
cfg.MinIdleConns = def.MinIdleConns
|
||
}
|
||
if cfg.MaxRetries == 0 {
|
||
cfg.MaxRetries = def.MaxRetries
|
||
}
|
||
if cfg.DialTimeout == 0 {
|
||
cfg.DialTimeout = def.DialTimeout
|
||
}
|
||
if cfg.ReadTimeout == 0 {
|
||
cfg.ReadTimeout = def.ReadTimeout
|
||
}
|
||
if cfg.WriteTimeout == 0 {
|
||
cfg.WriteTimeout = def.WriteTimeout
|
||
}
|
||
if cfg.PingTimeout == 0 {
|
||
cfg.PingTimeout = def.PingTimeout
|
||
}
|
||
c := redis.NewClient(&redis.Options{
|
||
Addr: cfg.Addr,
|
||
Password: cfg.Password,
|
||
DB: cfg.DB,
|
||
PoolSize: cfg.PoolSize,
|
||
MinIdleConns: cfg.MinIdleConns,
|
||
MaxRetries: cfg.MaxRetries,
|
||
DialTimeout: cfg.DialTimeout,
|
||
ReadTimeout: cfg.ReadTimeout,
|
||
WriteTimeout: cfg.WriteTimeout,
|
||
})
|
||
ctx, cancel := context.WithTimeout(context.Background(), cfg.PingTimeout)
|
||
defer cancel()
|
||
if err := c.Ping(ctx).Err(); err != nil {
|
||
_ = c.Close()
|
||
return nil, fmt.Errorf("redis ping: %w", err)
|
||
}
|
||
return c, nil
|
||
}
|