Files
Grendgi 56bfdb081a add eventbus + redisx
eventbus: pub/sub поверх Redis с типизированным Event-конвертом
(id/type/service/occurred_at/actor/payload). At-most-once семантика
(Redis pub/sub без offset'ов). Конвенция топиков: "{service}.{event}".
Publisher autopopulate'ит id (UUID4), service и occurred_at.
PublishTyped[T] — helper для маршалинга typed payload-структуры.

redisx: shared go-redis клиент с едиными pool/timeout-опциями
(заменяет дубли в booking/deals/hhru/webhooks-apps).
2026-05-20 14:22:01 +03:00

147 lines
5.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package eventbus — лёгкая cross-service шина событий поверх Redis
// pub/sub. Используется для уведомлений между сервисами без жёсткого
// HTTP-coupling'а (publisher не знает кто слушает).
//
// Семантика: at-most-once delivery — Redis pub/sub не сохраняет
// сообщения для disconnect'нутых подписчиков. Если нужна гарантия —
// ставим вторую сторону (consumer-side journal или Streams). Пока MVP.
//
// Конвенция топиков: "{service}.{event}", например "tasks.task.completed".
// Service-prefix защищает от случайных коллизий имён.
package eventbus
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
)
// Event — общая обёртка для всех событий. Payload — JSON, специфика на
// уровне consumer'а (он типизирует через json.Unmarshal в свою struct).
type Event struct {
ID string `json:"id"` // UUID4 для idempotency на стороне consumer'а
Type string `json:"type"` // напр. "task.completed"
Service string `json:"service"` // источник: "tasks", "candidates", ...
OccurredAt time.Time `json:"occurred_at"`
ActorID string `json:"actor_id,omitempty"`
ActorName string `json:"actor_name,omitempty"`
Payload json.RawMessage `json:"payload,omitempty"`
}
// Bus — публикация + подписка. Один интерфейс, чтобы в тестах можно было
// заменить на in-memory mock без Redis.
type Bus interface {
// Publish — fire-and-forget; возвращает ошибку только если не удалось
// отправить в Redis. У consumer'ов ничего не гарантируется.
Publish(ctx context.Context, topic string, event Event) error
// Subscribe — блокирующая подписка. Завершается при отмене ctx или
// фатальной ошибке. Handler'ы вызываются последовательно в одной
// горутине на топик; для параллельной обработки — буферизация на
// стороне handler'а.
Subscribe(ctx context.Context, topic string, handler Handler) error
Close() error
}
// Handler — обработчик одного события. Возврат error логируется как
// warning; повторной доставки нет (at-most-once семантика Redis).
type Handler func(ctx context.Context, event Event) error
// redisBus — реализация на Redis pub/sub.
type redisBus struct {
client *redis.Client
service string // имя сервиса-источника, проставляется в Event.Service
}
// New — клиент должен быть pre-pinged (см. portal-common/redisx).
// service используется как default-значение Event.Service для Publish'а.
func New(client *redis.Client, service string) Bus {
return &redisBus{client: client, service: service}
}
func (b *redisBus) Publish(ctx context.Context, topic string, event Event) error {
if event.ID == "" {
event.ID = uuid.NewString()
}
if event.Service == "" {
event.Service = b.service
}
if event.OccurredAt.IsZero() {
event.OccurredAt = time.Now().UTC()
}
body, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("marshal event: %w", err)
}
if err := b.client.Publish(ctx, topic, body).Err(); err != nil {
return fmt.Errorf("redis publish %s: %w", topic, err)
}
return nil
}
func (b *redisBus) Subscribe(ctx context.Context, topic string, handler Handler) error {
if handler == nil {
return fmt.Errorf("handler is nil")
}
sub := b.client.Subscribe(ctx, topic)
defer func() { _ = sub.Close() }()
// Receive() блокирует до первого сообщения или отмены ctx.
if _, err := sub.Receive(ctx); err != nil {
return fmt.Errorf("subscribe %s: %w", topic, err)
}
ch := sub.Channel()
slog.Info("eventbus subscribed", "topic", topic)
for {
select {
case <-ctx.Done():
return nil
case msg, ok := <-ch:
if !ok {
return nil
}
var event Event
if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
slog.Warn("eventbus: invalid payload", "topic", topic, "err", err, "raw", msg.Payload)
continue
}
if err := handler(ctx, event); err != nil {
slog.Warn("eventbus: handler error",
"topic", topic, "event_id", event.ID, "event_type", event.Type, "err", err)
}
}
}
}
func (b *redisBus) Close() error {
// Redis client закрывается caller'ом (он же его создал) — здесь только
// no-op, чтобы Bus можно было унести в struct без отдельного управления
// жизненным циклом client'а.
return nil
}
// PublishTyped — удобный helper: автоматически marshal'ит payload-структуру.
// Использовать когда payload — это типизированная struct.
func PublishTyped[T any](ctx context.Context, b Bus, topic, eventType string, actor Actor, payload T) error {
raw, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("marshal payload: %w", err)
}
return b.Publish(ctx, topic, Event{
Type: eventType,
ActorID: actor.ID,
ActorName: actor.Name,
Payload: raw,
})
}
// Actor — простой пара id/name. Помогает не писать каждый раз
// Event{ActorID: ..., ActorName: ...}.
type Actor struct {
ID string
Name string
}