eventbus: pub/sub поверх Redis с типизированным Event-конвертом
(id/type/service/occurred_at/actor/payload). At-most-once семантика
(Redis pub/sub без offset'ов). Конвенция топиков: "{service}.{event}".
Publisher autopopulate'ит id (UUID4), service и occurred_at.
PublishTyped[T] — helper для маршалинга typed payload-структуры.
redisx: shared go-redis клиент с едиными pool/timeout-опциями
(заменяет дубли в booking/deals/hhru/webhooks-apps).
147 lines
5.8 KiB
Go
147 lines
5.8 KiB
Go
// Package eventbus — лёгкая cross-service шина событий поверх Redis
|
||
// pub/sub. Используется для уведомлений между сервисами без жёсткого
|
||
// HTTP-coupling'а (publisher не знает кто слушает).
|
||
//
|
||
// Семантика: at-most-once delivery — Redis pub/sub не сохраняет
|
||
// сообщения для disconnect'нутых подписчиков. Если нужна гарантия —
|
||
// ставим вторую сторону (consumer-side journal или Streams). Пока MVP.
|
||
//
|
||
// Конвенция топиков: "{service}.{event}", например "tasks.task.completed".
|
||
// Service-prefix защищает от случайных коллизий имён.
|
||
package eventbus
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"log/slog"
|
||
"time"
|
||
|
||
"github.com/google/uuid"
|
||
"github.com/redis/go-redis/v9"
|
||
)
|
||
|
||
// Event — общая обёртка для всех событий. Payload — JSON, специфика на
|
||
// уровне consumer'а (он типизирует через json.Unmarshal в свою struct).
|
||
type Event struct {
|
||
ID string `json:"id"` // UUID4 для idempotency на стороне consumer'а
|
||
Type string `json:"type"` // напр. "task.completed"
|
||
Service string `json:"service"` // источник: "tasks", "candidates", ...
|
||
OccurredAt time.Time `json:"occurred_at"`
|
||
ActorID string `json:"actor_id,omitempty"`
|
||
ActorName string `json:"actor_name,omitempty"`
|
||
Payload json.RawMessage `json:"payload,omitempty"`
|
||
}
|
||
|
||
// Bus — публикация + подписка. Один интерфейс, чтобы в тестах можно было
|
||
// заменить на in-memory mock без Redis.
|
||
type Bus interface {
|
||
// Publish — fire-and-forget; возвращает ошибку только если не удалось
|
||
// отправить в Redis. У consumer'ов ничего не гарантируется.
|
||
Publish(ctx context.Context, topic string, event Event) error
|
||
// Subscribe — блокирующая подписка. Завершается при отмене ctx или
|
||
// фатальной ошибке. Handler'ы вызываются последовательно в одной
|
||
// горутине на топик; для параллельной обработки — буферизация на
|
||
// стороне handler'а.
|
||
Subscribe(ctx context.Context, topic string, handler Handler) error
|
||
Close() error
|
||
}
|
||
|
||
// Handler — обработчик одного события. Возврат error логируется как
|
||
// warning; повторной доставки нет (at-most-once семантика Redis).
|
||
type Handler func(ctx context.Context, event Event) error
|
||
|
||
// redisBus — реализация на Redis pub/sub.
|
||
type redisBus struct {
|
||
client *redis.Client
|
||
service string // имя сервиса-источника, проставляется в Event.Service
|
||
}
|
||
|
||
// New — клиент должен быть pre-pinged (см. portal-common/redisx).
|
||
// service используется как default-значение Event.Service для Publish'а.
|
||
func New(client *redis.Client, service string) Bus {
|
||
return &redisBus{client: client, service: service}
|
||
}
|
||
|
||
func (b *redisBus) Publish(ctx context.Context, topic string, event Event) error {
|
||
if event.ID == "" {
|
||
event.ID = uuid.NewString()
|
||
}
|
||
if event.Service == "" {
|
||
event.Service = b.service
|
||
}
|
||
if event.OccurredAt.IsZero() {
|
||
event.OccurredAt = time.Now().UTC()
|
||
}
|
||
body, err := json.Marshal(event)
|
||
if err != nil {
|
||
return fmt.Errorf("marshal event: %w", err)
|
||
}
|
||
if err := b.client.Publish(ctx, topic, body).Err(); err != nil {
|
||
return fmt.Errorf("redis publish %s: %w", topic, err)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (b *redisBus) Subscribe(ctx context.Context, topic string, handler Handler) error {
|
||
if handler == nil {
|
||
return fmt.Errorf("handler is nil")
|
||
}
|
||
sub := b.client.Subscribe(ctx, topic)
|
||
defer func() { _ = sub.Close() }()
|
||
// Receive() блокирует до первого сообщения или отмены ctx.
|
||
if _, err := sub.Receive(ctx); err != nil {
|
||
return fmt.Errorf("subscribe %s: %w", topic, err)
|
||
}
|
||
ch := sub.Channel()
|
||
slog.Info("eventbus subscribed", "topic", topic)
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return nil
|
||
case msg, ok := <-ch:
|
||
if !ok {
|
||
return nil
|
||
}
|
||
var event Event
|
||
if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
|
||
slog.Warn("eventbus: invalid payload", "topic", topic, "err", err, "raw", msg.Payload)
|
||
continue
|
||
}
|
||
if err := handler(ctx, event); err != nil {
|
||
slog.Warn("eventbus: handler error",
|
||
"topic", topic, "event_id", event.ID, "event_type", event.Type, "err", err)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
func (b *redisBus) Close() error {
|
||
// Redis client закрывается caller'ом (он же его создал) — здесь только
|
||
// no-op, чтобы Bus можно было унести в struct без отдельного управления
|
||
// жизненным циклом client'а.
|
||
return nil
|
||
}
|
||
|
||
// PublishTyped — удобный helper: автоматически marshal'ит payload-структуру.
|
||
// Использовать когда payload — это типизированная struct.
|
||
func PublishTyped[T any](ctx context.Context, b Bus, topic, eventType string, actor Actor, payload T) error {
|
||
raw, err := json.Marshal(payload)
|
||
if err != nil {
|
||
return fmt.Errorf("marshal payload: %w", err)
|
||
}
|
||
return b.Publish(ctx, topic, Event{
|
||
Type: eventType,
|
||
ActorID: actor.ID,
|
||
ActorName: actor.Name,
|
||
Payload: raw,
|
||
})
|
||
}
|
||
|
||
// Actor — простой пара id/name. Помогает не писать каждый раз
|
||
// Event{ActorID: ..., ActorName: ...}.
|
||
type Actor struct {
|
||
ID string
|
||
Name string
|
||
}
|