From 56bfdb081ac9f462e4c25ce35f96dafba4467216 Mon Sep 17 00:00:00 2001 From: Grendgi Date: Wed, 20 May 2026 14:22:01 +0300 Subject: [PATCH] add eventbus + redisx MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit eventbus: pub/sub поверх Redis с типизированным Event-конвертом (id/type/service/occurred_at/actor/payload). At-most-once семантика (Redis pub/sub без offset'ов). Конвенция топиков: "{service}.{event}". Publisher autopopulate'ит id (UUID4), service и occurred_at. PublishTyped[T] — helper для маршалинга typed payload-структуры. redisx: shared go-redis клиент с едиными pool/timeout-опциями (заменяет дубли в booking/deals/hhru/webhooks-apps). --- eventbus/bus.go | 146 +++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 8 ++- go.sum | 12 ++++ redisx/client.go | 86 ++++++++++++++++++++++++++++ 4 files changed, 251 insertions(+), 1 deletion(-) create mode 100644 eventbus/bus.go create mode 100644 redisx/client.go diff --git a/eventbus/bus.go b/eventbus/bus.go new file mode 100644 index 0000000..257f4d6 --- /dev/null +++ b/eventbus/bus.go @@ -0,0 +1,146 @@ +// Package eventbus — лёгкая cross-service шина событий поверх Redis +// pub/sub. Используется для уведомлений между сервисами без жёсткого +// HTTP-coupling'а (publisher не знает кто слушает). +// +// Семантика: at-most-once delivery — Redis pub/sub не сохраняет +// сообщения для disconnect'нутых подписчиков. Если нужна гарантия — +// ставим вторую сторону (consumer-side journal или Streams). Пока MVP. +// +// Конвенция топиков: "{service}.{event}", например "tasks.task.completed". +// Service-prefix защищает от случайных коллизий имён. +package eventbus + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "time" + + "github.com/google/uuid" + "github.com/redis/go-redis/v9" +) + +// Event — общая обёртка для всех событий. Payload — JSON, специфика на +// уровне consumer'а (он типизирует через json.Unmarshal в свою struct). +type Event struct { + ID string `json:"id"` // UUID4 для idempotency на стороне consumer'а + Type string `json:"type"` // напр. "task.completed" + Service string `json:"service"` // источник: "tasks", "candidates", ... + OccurredAt time.Time `json:"occurred_at"` + ActorID string `json:"actor_id,omitempty"` + ActorName string `json:"actor_name,omitempty"` + Payload json.RawMessage `json:"payload,omitempty"` +} + +// Bus — публикация + подписка. Один интерфейс, чтобы в тестах можно было +// заменить на in-memory mock без Redis. +type Bus interface { + // Publish — fire-and-forget; возвращает ошибку только если не удалось + // отправить в Redis. У consumer'ов ничего не гарантируется. + Publish(ctx context.Context, topic string, event Event) error + // Subscribe — блокирующая подписка. Завершается при отмене ctx или + // фатальной ошибке. Handler'ы вызываются последовательно в одной + // горутине на топик; для параллельной обработки — буферизация на + // стороне handler'а. + Subscribe(ctx context.Context, topic string, handler Handler) error + Close() error +} + +// Handler — обработчик одного события. Возврат error логируется как +// warning; повторной доставки нет (at-most-once семантика Redis). +type Handler func(ctx context.Context, event Event) error + +// redisBus — реализация на Redis pub/sub. +type redisBus struct { + client *redis.Client + service string // имя сервиса-источника, проставляется в Event.Service +} + +// New — клиент должен быть pre-pinged (см. portal-common/redisx). +// service используется как default-значение Event.Service для Publish'а. +func New(client *redis.Client, service string) Bus { + return &redisBus{client: client, service: service} +} + +func (b *redisBus) Publish(ctx context.Context, topic string, event Event) error { + if event.ID == "" { + event.ID = uuid.NewString() + } + if event.Service == "" { + event.Service = b.service + } + if event.OccurredAt.IsZero() { + event.OccurredAt = time.Now().UTC() + } + body, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("marshal event: %w", err) + } + if err := b.client.Publish(ctx, topic, body).Err(); err != nil { + return fmt.Errorf("redis publish %s: %w", topic, err) + } + return nil +} + +func (b *redisBus) Subscribe(ctx context.Context, topic string, handler Handler) error { + if handler == nil { + return fmt.Errorf("handler is nil") + } + sub := b.client.Subscribe(ctx, topic) + defer func() { _ = sub.Close() }() + // Receive() блокирует до первого сообщения или отмены ctx. + if _, err := sub.Receive(ctx); err != nil { + return fmt.Errorf("subscribe %s: %w", topic, err) + } + ch := sub.Channel() + slog.Info("eventbus subscribed", "topic", topic) + for { + select { + case <-ctx.Done(): + return nil + case msg, ok := <-ch: + if !ok { + return nil + } + var event Event + if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil { + slog.Warn("eventbus: invalid payload", "topic", topic, "err", err, "raw", msg.Payload) + continue + } + if err := handler(ctx, event); err != nil { + slog.Warn("eventbus: handler error", + "topic", topic, "event_id", event.ID, "event_type", event.Type, "err", err) + } + } + } +} + +func (b *redisBus) Close() error { + // Redis client закрывается caller'ом (он же его создал) — здесь только + // no-op, чтобы Bus можно было унести в struct без отдельного управления + // жизненным циклом client'а. + return nil +} + +// PublishTyped — удобный helper: автоматически marshal'ит payload-структуру. +// Использовать когда payload — это типизированная struct. +func PublishTyped[T any](ctx context.Context, b Bus, topic, eventType string, actor Actor, payload T) error { + raw, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("marshal payload: %w", err) + } + return b.Publish(ctx, topic, Event{ + Type: eventType, + ActorID: actor.ID, + ActorName: actor.Name, + Payload: raw, + }) +} + +// Actor — простой пара id/name. Помогает не писать каждый раз +// Event{ActorID: ..., ActorName: ...}. +type Actor struct { + ID string + Name string +} diff --git a/go.mod b/go.mod index 21c040f..ea68711 100644 --- a/go.mod +++ b/go.mod @@ -10,9 +10,15 @@ module gitea.estateliga.work/admin/portal-common go 1.25.7 -require github.com/jackc/pgx/v5 v5.9.1 +require ( + github.com/google/uuid v1.6.0 + github.com/jackc/pgx/v5 v5.9.1 + github.com/redis/go-redis/v9 v9.16.0 +) require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect diff --git a/go.sum b/go.sum index 8e29ab9..2aec57d 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,16 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= @@ -11,6 +21,8 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.16.0 h1:OotgqgLSRCmzfqChbQyG1PHC3tLNR89DG4jdOERSEP4= +github.com/redis/go-redis/v9 v9.16.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/redisx/client.go b/redisx/client.go new file mode 100644 index 0000000..097f332 --- /dev/null +++ b/redisx/client.go @@ -0,0 +1,86 @@ +// Package redisx — общий go-redis клиент с едиными pool-опциями. +// Booking/deals/hhru/webhooks-apps копировали одни и те же параметры — +// теперь tweak'аем здесь, сервисы пересобираются. +package redisx + +import ( + "context" + "fmt" + "time" + + "github.com/redis/go-redis/v9" +) + +type Config struct { + Addr string + Password string + DB int + PoolSize int + MinIdleConns int + MaxRetries int + DialTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + PingTimeout time.Duration +} + +// Defaults — общие дефолты, повторяющие исходное поведение всех сервисов. +func Defaults() Config { + return Config{ + PoolSize: 10, + MinIdleConns: 2, + MaxRetries: 3, + DialTimeout: 5 * time.Second, + ReadTimeout: 3 * time.Second, + WriteTimeout: 3 * time.Second, + PingTimeout: 5 * time.Second, + } +} + +// New строит клиент и проверяет соединение через Ping. На неудачу +// возвращает ошибку — caller сам решает retry/fail-fast. +func New(cfg Config) (*redis.Client, error) { + if cfg.Addr == "" { + return nil, fmt.Errorf("redis Addr is required") + } + def := Defaults() + if cfg.PoolSize == 0 { + cfg.PoolSize = def.PoolSize + } + if cfg.MinIdleConns == 0 { + cfg.MinIdleConns = def.MinIdleConns + } + if cfg.MaxRetries == 0 { + cfg.MaxRetries = def.MaxRetries + } + if cfg.DialTimeout == 0 { + cfg.DialTimeout = def.DialTimeout + } + if cfg.ReadTimeout == 0 { + cfg.ReadTimeout = def.ReadTimeout + } + if cfg.WriteTimeout == 0 { + cfg.WriteTimeout = def.WriteTimeout + } + if cfg.PingTimeout == 0 { + cfg.PingTimeout = def.PingTimeout + } + c := redis.NewClient(&redis.Options{ + Addr: cfg.Addr, + Password: cfg.Password, + DB: cfg.DB, + PoolSize: cfg.PoolSize, + MinIdleConns: cfg.MinIdleConns, + MaxRetries: cfg.MaxRetries, + DialTimeout: cfg.DialTimeout, + ReadTimeout: cfg.ReadTimeout, + WriteTimeout: cfg.WriteTimeout, + }) + ctx, cancel := context.WithTimeout(context.Background(), cfg.PingTimeout) + defer cancel() + if err := c.Ping(ctx).Err(); err != nil { + _ = c.Close() + return nil, fmt.Errorf("redis ping: %w", err) + } + return c, nil +}