add eventbus + redisx
eventbus: pub/sub поверх Redis с типизированным Event-конвертом
(id/type/service/occurred_at/actor/payload). At-most-once семантика
(Redis pub/sub без offset'ов). Конвенция топиков: "{service}.{event}".
Publisher autopopulate'ит id (UUID4), service и occurred_at.
PublishTyped[T] — helper для маршалинга typed payload-структуры.
redisx: shared go-redis клиент с едиными pool/timeout-опциями
(заменяет дубли в booking/deals/hhru/webhooks-apps).
This commit is contained in:
146
eventbus/bus.go
Normal file
146
eventbus/bus.go
Normal file
@@ -0,0 +1,146 @@
|
|||||||
|
// Package eventbus — лёгкая cross-service шина событий поверх Redis
|
||||||
|
// pub/sub. Используется для уведомлений между сервисами без жёсткого
|
||||||
|
// HTTP-coupling'а (publisher не знает кто слушает).
|
||||||
|
//
|
||||||
|
// Семантика: at-most-once delivery — Redis pub/sub не сохраняет
|
||||||
|
// сообщения для disconnect'нутых подписчиков. Если нужна гарантия —
|
||||||
|
// ставим вторую сторону (consumer-side journal или Streams). Пока MVP.
|
||||||
|
//
|
||||||
|
// Конвенция топиков: "{service}.{event}", например "tasks.task.completed".
|
||||||
|
// Service-prefix защищает от случайных коллизий имён.
|
||||||
|
package eventbus
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Event — общая обёртка для всех событий. Payload — JSON, специфика на
|
||||||
|
// уровне consumer'а (он типизирует через json.Unmarshal в свою struct).
|
||||||
|
type Event struct {
|
||||||
|
ID string `json:"id"` // UUID4 для idempotency на стороне consumer'а
|
||||||
|
Type string `json:"type"` // напр. "task.completed"
|
||||||
|
Service string `json:"service"` // источник: "tasks", "candidates", ...
|
||||||
|
OccurredAt time.Time `json:"occurred_at"`
|
||||||
|
ActorID string `json:"actor_id,omitempty"`
|
||||||
|
ActorName string `json:"actor_name,omitempty"`
|
||||||
|
Payload json.RawMessage `json:"payload,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bus — публикация + подписка. Один интерфейс, чтобы в тестах можно было
|
||||||
|
// заменить на in-memory mock без Redis.
|
||||||
|
type Bus interface {
|
||||||
|
// Publish — fire-and-forget; возвращает ошибку только если не удалось
|
||||||
|
// отправить в Redis. У consumer'ов ничего не гарантируется.
|
||||||
|
Publish(ctx context.Context, topic string, event Event) error
|
||||||
|
// Subscribe — блокирующая подписка. Завершается при отмене ctx или
|
||||||
|
// фатальной ошибке. Handler'ы вызываются последовательно в одной
|
||||||
|
// горутине на топик; для параллельной обработки — буферизация на
|
||||||
|
// стороне handler'а.
|
||||||
|
Subscribe(ctx context.Context, topic string, handler Handler) error
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handler — обработчик одного события. Возврат error логируется как
|
||||||
|
// warning; повторной доставки нет (at-most-once семантика Redis).
|
||||||
|
type Handler func(ctx context.Context, event Event) error
|
||||||
|
|
||||||
|
// redisBus — реализация на Redis pub/sub.
|
||||||
|
type redisBus struct {
|
||||||
|
client *redis.Client
|
||||||
|
service string // имя сервиса-источника, проставляется в Event.Service
|
||||||
|
}
|
||||||
|
|
||||||
|
// New — клиент должен быть pre-pinged (см. portal-common/redisx).
|
||||||
|
// service используется как default-значение Event.Service для Publish'а.
|
||||||
|
func New(client *redis.Client, service string) Bus {
|
||||||
|
return &redisBus{client: client, service: service}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *redisBus) Publish(ctx context.Context, topic string, event Event) error {
|
||||||
|
if event.ID == "" {
|
||||||
|
event.ID = uuid.NewString()
|
||||||
|
}
|
||||||
|
if event.Service == "" {
|
||||||
|
event.Service = b.service
|
||||||
|
}
|
||||||
|
if event.OccurredAt.IsZero() {
|
||||||
|
event.OccurredAt = time.Now().UTC()
|
||||||
|
}
|
||||||
|
body, err := json.Marshal(event)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("marshal event: %w", err)
|
||||||
|
}
|
||||||
|
if err := b.client.Publish(ctx, topic, body).Err(); err != nil {
|
||||||
|
return fmt.Errorf("redis publish %s: %w", topic, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *redisBus) Subscribe(ctx context.Context, topic string, handler Handler) error {
|
||||||
|
if handler == nil {
|
||||||
|
return fmt.Errorf("handler is nil")
|
||||||
|
}
|
||||||
|
sub := b.client.Subscribe(ctx, topic)
|
||||||
|
defer func() { _ = sub.Close() }()
|
||||||
|
// Receive() блокирует до первого сообщения или отмены ctx.
|
||||||
|
if _, err := sub.Receive(ctx); err != nil {
|
||||||
|
return fmt.Errorf("subscribe %s: %w", topic, err)
|
||||||
|
}
|
||||||
|
ch := sub.Channel()
|
||||||
|
slog.Info("eventbus subscribed", "topic", topic)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
case msg, ok := <-ch:
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var event Event
|
||||||
|
if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
|
||||||
|
slog.Warn("eventbus: invalid payload", "topic", topic, "err", err, "raw", msg.Payload)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := handler(ctx, event); err != nil {
|
||||||
|
slog.Warn("eventbus: handler error",
|
||||||
|
"topic", topic, "event_id", event.ID, "event_type", event.Type, "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *redisBus) Close() error {
|
||||||
|
// Redis client закрывается caller'ом (он же его создал) — здесь только
|
||||||
|
// no-op, чтобы Bus можно было унести в struct без отдельного управления
|
||||||
|
// жизненным циклом client'а.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// PublishTyped — удобный helper: автоматически marshal'ит payload-структуру.
|
||||||
|
// Использовать когда payload — это типизированная struct.
|
||||||
|
func PublishTyped[T any](ctx context.Context, b Bus, topic, eventType string, actor Actor, payload T) error {
|
||||||
|
raw, err := json.Marshal(payload)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("marshal payload: %w", err)
|
||||||
|
}
|
||||||
|
return b.Publish(ctx, topic, Event{
|
||||||
|
Type: eventType,
|
||||||
|
ActorID: actor.ID,
|
||||||
|
ActorName: actor.Name,
|
||||||
|
Payload: raw,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Actor — простой пара id/name. Помогает не писать каждый раз
|
||||||
|
// Event{ActorID: ..., ActorName: ...}.
|
||||||
|
type Actor struct {
|
||||||
|
ID string
|
||||||
|
Name string
|
||||||
|
}
|
||||||
8
go.mod
8
go.mod
@@ -10,9 +10,15 @@ module gitea.estateliga.work/admin/portal-common
|
|||||||
|
|
||||||
go 1.25.7
|
go 1.25.7
|
||||||
|
|
||||||
require github.com/jackc/pgx/v5 v5.9.1
|
require (
|
||||||
|
github.com/google/uuid v1.6.0
|
||||||
|
github.com/jackc/pgx/v5 v5.9.1
|
||||||
|
github.com/redis/go-redis/v9 v9.16.0
|
||||||
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||||
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||||
|
|||||||
12
go.sum
12
go.sum
@@ -1,6 +1,16 @@
|
|||||||
|
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
||||||
|
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
||||||
|
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
|
||||||
|
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||||
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||||
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
||||||
@@ -11,6 +21,8 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo
|
|||||||
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/redis/go-redis/v9 v9.16.0 h1:OotgqgLSRCmzfqChbQyG1PHC3tLNR89DG4jdOERSEP4=
|
||||||
|
github.com/redis/go-redis/v9 v9.16.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
|
||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
|
|||||||
86
redisx/client.go
Normal file
86
redisx/client.go
Normal file
@@ -0,0 +1,86 @@
|
|||||||
|
// Package redisx — общий go-redis клиент с едиными pool-опциями.
|
||||||
|
// Booking/deals/hhru/webhooks-apps копировали одни и те же параметры —
|
||||||
|
// теперь tweak'аем здесь, сервисы пересобираются.
|
||||||
|
package redisx
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
Addr string
|
||||||
|
Password string
|
||||||
|
DB int
|
||||||
|
PoolSize int
|
||||||
|
MinIdleConns int
|
||||||
|
MaxRetries int
|
||||||
|
DialTimeout time.Duration
|
||||||
|
ReadTimeout time.Duration
|
||||||
|
WriteTimeout time.Duration
|
||||||
|
PingTimeout time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// Defaults — общие дефолты, повторяющие исходное поведение всех сервисов.
|
||||||
|
func Defaults() Config {
|
||||||
|
return Config{
|
||||||
|
PoolSize: 10,
|
||||||
|
MinIdleConns: 2,
|
||||||
|
MaxRetries: 3,
|
||||||
|
DialTimeout: 5 * time.Second,
|
||||||
|
ReadTimeout: 3 * time.Second,
|
||||||
|
WriteTimeout: 3 * time.Second,
|
||||||
|
PingTimeout: 5 * time.Second,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// New строит клиент и проверяет соединение через Ping. На неудачу
|
||||||
|
// возвращает ошибку — caller сам решает retry/fail-fast.
|
||||||
|
func New(cfg Config) (*redis.Client, error) {
|
||||||
|
if cfg.Addr == "" {
|
||||||
|
return nil, fmt.Errorf("redis Addr is required")
|
||||||
|
}
|
||||||
|
def := Defaults()
|
||||||
|
if cfg.PoolSize == 0 {
|
||||||
|
cfg.PoolSize = def.PoolSize
|
||||||
|
}
|
||||||
|
if cfg.MinIdleConns == 0 {
|
||||||
|
cfg.MinIdleConns = def.MinIdleConns
|
||||||
|
}
|
||||||
|
if cfg.MaxRetries == 0 {
|
||||||
|
cfg.MaxRetries = def.MaxRetries
|
||||||
|
}
|
||||||
|
if cfg.DialTimeout == 0 {
|
||||||
|
cfg.DialTimeout = def.DialTimeout
|
||||||
|
}
|
||||||
|
if cfg.ReadTimeout == 0 {
|
||||||
|
cfg.ReadTimeout = def.ReadTimeout
|
||||||
|
}
|
||||||
|
if cfg.WriteTimeout == 0 {
|
||||||
|
cfg.WriteTimeout = def.WriteTimeout
|
||||||
|
}
|
||||||
|
if cfg.PingTimeout == 0 {
|
||||||
|
cfg.PingTimeout = def.PingTimeout
|
||||||
|
}
|
||||||
|
c := redis.NewClient(&redis.Options{
|
||||||
|
Addr: cfg.Addr,
|
||||||
|
Password: cfg.Password,
|
||||||
|
DB: cfg.DB,
|
||||||
|
PoolSize: cfg.PoolSize,
|
||||||
|
MinIdleConns: cfg.MinIdleConns,
|
||||||
|
MaxRetries: cfg.MaxRetries,
|
||||||
|
DialTimeout: cfg.DialTimeout,
|
||||||
|
ReadTimeout: cfg.ReadTimeout,
|
||||||
|
WriteTimeout: cfg.WriteTimeout,
|
||||||
|
})
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), cfg.PingTimeout)
|
||||||
|
defer cancel()
|
||||||
|
if err := c.Ping(ctx).Err(); err != nil {
|
||||||
|
_ = c.Close()
|
||||||
|
return nil, fmt.Errorf("redis ping: %w", err)
|
||||||
|
}
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user