// Package eventbus — лёгкая cross-service шина событий поверх Redis // pub/sub. Используется для уведомлений между сервисами без жёсткого // HTTP-coupling'а (publisher не знает кто слушает). // // Семантика: at-most-once delivery — Redis pub/sub не сохраняет // сообщения для disconnect'нутых подписчиков. Если нужна гарантия — // ставим вторую сторону (consumer-side journal или Streams). Пока MVP. // // Конвенция топиков: "{service}.{event}", например "tasks.task.completed". // Service-prefix защищает от случайных коллизий имён. package eventbus import ( "context" "encoding/json" "fmt" "log/slog" "time" "github.com/google/uuid" "github.com/redis/go-redis/v9" ) // Event — общая обёртка для всех событий. Payload — JSON, специфика на // уровне consumer'а (он типизирует через json.Unmarshal в свою struct). type Event struct { ID string `json:"id"` // UUID4 для idempotency на стороне consumer'а Type string `json:"type"` // напр. "task.completed" Service string `json:"service"` // источник: "tasks", "candidates", ... OccurredAt time.Time `json:"occurred_at"` ActorID string `json:"actor_id,omitempty"` ActorName string `json:"actor_name,omitempty"` Payload json.RawMessage `json:"payload,omitempty"` } // Bus — публикация + подписка. Один интерфейс, чтобы в тестах можно было // заменить на in-memory mock без Redis. type Bus interface { // Publish — fire-and-forget; возвращает ошибку только если не удалось // отправить в Redis. У consumer'ов ничего не гарантируется. Publish(ctx context.Context, topic string, event Event) error // Subscribe — блокирующая подписка. Завершается при отмене ctx или // фатальной ошибке. Handler'ы вызываются последовательно в одной // горутине на топик; для параллельной обработки — буферизация на // стороне handler'а. Subscribe(ctx context.Context, topic string, handler Handler) error Close() error } // Handler — обработчик одного события. Возврат error логируется как // warning; повторной доставки нет (at-most-once семантика Redis). type Handler func(ctx context.Context, event Event) error // redisBus — реализация на Redis pub/sub. type redisBus struct { client *redis.Client service string // имя сервиса-источника, проставляется в Event.Service } // New — клиент должен быть pre-pinged (см. portal-common/redisx). // service используется как default-значение Event.Service для Publish'а. func New(client *redis.Client, service string) Bus { return &redisBus{client: client, service: service} } func (b *redisBus) Publish(ctx context.Context, topic string, event Event) error { if event.ID == "" { event.ID = uuid.NewString() } if event.Service == "" { event.Service = b.service } if event.OccurredAt.IsZero() { event.OccurredAt = time.Now().UTC() } body, err := json.Marshal(event) if err != nil { return fmt.Errorf("marshal event: %w", err) } if err := b.client.Publish(ctx, topic, body).Err(); err != nil { return fmt.Errorf("redis publish %s: %w", topic, err) } return nil } func (b *redisBus) Subscribe(ctx context.Context, topic string, handler Handler) error { if handler == nil { return fmt.Errorf("handler is nil") } sub := b.client.Subscribe(ctx, topic) defer func() { _ = sub.Close() }() // Receive() блокирует до первого сообщения или отмены ctx. if _, err := sub.Receive(ctx); err != nil { return fmt.Errorf("subscribe %s: %w", topic, err) } ch := sub.Channel() slog.Info("eventbus subscribed", "topic", topic) for { select { case <-ctx.Done(): return nil case msg, ok := <-ch: if !ok { return nil } var event Event if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil { slog.Warn("eventbus: invalid payload", "topic", topic, "err", err, "raw", msg.Payload) continue } if err := handler(ctx, event); err != nil { slog.Warn("eventbus: handler error", "topic", topic, "event_id", event.ID, "event_type", event.Type, "err", err) } } } } func (b *redisBus) Close() error { // Redis client закрывается caller'ом (он же его создал) — здесь только // no-op, чтобы Bus можно было унести в struct без отдельного управления // жизненным циклом client'а. return nil } // PublishTyped — удобный helper: автоматически marshal'ит payload-структуру. // Использовать когда payload — это типизированная struct. func PublishTyped[T any](ctx context.Context, b Bus, topic, eventType string, actor Actor, payload T) error { raw, err := json.Marshal(payload) if err != nil { return fmt.Errorf("marshal payload: %w", err) } return b.Publish(ctx, topic, Event{ Type: eventType, ActorID: actor.ID, ActorName: actor.Name, Payload: raw, }) } // Actor — простой пара id/name. Помогает не писать каждый раз // Event{ActorID: ..., ActorName: ...}. type Actor struct { ID string Name string }