// Package durableevent defines the shared contract for critical cross-service // events that must survive process restarts and temporary subscriber outages. package durableevent import ( "crypto/sha256" "encoding/hex" "encoding/json" "errors" "fmt" "regexp" "strings" "time" ) const ( DefaultSchemaVersion = 1 MaxTokenLength = 120 MaxIDLength = 255 MaxPayloadBytes = 64 * 1024 ) var tokenRe = regexp.MustCompile(`^[a-z][a-z0-9_.-]{1,119}$`) // Event is the stable wire shape for durable cross-service events. // // It is intentionally transport-neutral: the same JSON can be written to // Redis Streams, a database outbox, or a journal table. type Event struct { EventType string `json:"event_type"` SchemaVersion int `json:"schema_version"` ActorUserID string `json:"actor_user_id,omitempty"` ActorName string `json:"actor_name,omitempty"` EntityType string `json:"entity_type"` EntityID string `json:"entity_id"` RelatedType string `json:"related_entity_type,omitempty"` RelatedID string `json:"related_entity_id,omitempty"` RequestID string `json:"request_id,omitempty"` IdempotencyKey string `json:"idempotency_key"` OccurredAt time.Time `json:"occurred_at"` Payload json.RawMessage `json:"payload,omitempty"` } func New(eventType, entityType, entityID string, payload any) (Event, error) { raw, err := Payload(payload) if err != nil { return Event{}, err } event := Event{ EventType: eventType, SchemaVersion: DefaultSchemaVersion, EntityType: entityType, EntityID: entityID, OccurredAt: time.Now().UTC(), Payload: raw, } event.IdempotencyKey = BuildIdempotencyKey(event.EventType, event.EntityType, event.EntityID) return event, nil } func (e Event) Normalize() Event { e.EventType = strings.TrimSpace(e.EventType) e.ActorUserID = strings.TrimSpace(e.ActorUserID) e.ActorName = truncate(strings.TrimSpace(e.ActorName), MaxIDLength) e.EntityType = strings.TrimSpace(e.EntityType) e.EntityID = truncate(strings.TrimSpace(e.EntityID), MaxIDLength) e.RelatedType = strings.TrimSpace(e.RelatedType) e.RelatedID = truncate(strings.TrimSpace(e.RelatedID), MaxIDLength) e.RequestID = truncate(strings.TrimSpace(e.RequestID), MaxIDLength) e.IdempotencyKey = truncate(strings.TrimSpace(e.IdempotencyKey), MaxIDLength) if e.SchemaVersion == 0 { e.SchemaVersion = DefaultSchemaVersion } if !e.OccurredAt.IsZero() { e.OccurredAt = e.OccurredAt.UTC() } return e } func (e Event) Validate() error { e = e.Normalize() if !ValidToken(e.EventType) { return fmt.Errorf("invalid event_type %q", e.EventType) } if !ValidToken(e.EntityType) { return fmt.Errorf("invalid entity_type %q", e.EntityType) } if e.EntityID == "" { return errors.New("entity_id is required") } if e.IdempotencyKey == "" { return errors.New("idempotency_key is required") } if e.SchemaVersion < 1 { return errors.New("schema_version must be positive") } if e.OccurredAt.IsZero() { return errors.New("occurred_at is required") } if len(e.Payload) > MaxPayloadBytes { return fmt.Errorf("payload exceeds %d bytes", MaxPayloadBytes) } if len(e.Payload) > 0 && !json.Valid(e.Payload) { return errors.New("payload must be valid JSON") } return nil } func ValidToken(value string) bool { value = strings.TrimSpace(value) return len(value) <= MaxTokenLength && tokenRe.MatchString(value) } func Payload(value any) (json.RawMessage, error) { if value == nil { return nil, nil } if raw, ok := value.(json.RawMessage); ok { if len(raw) > 0 && !json.Valid(raw) { return nil, errors.New("payload must be valid JSON") } return raw, nil } raw, err := json.Marshal(value) if err != nil { return nil, fmt.Errorf("marshal payload: %w", err) } return raw, nil } func BuildIdempotencyKey(parts ...string) string { h := sha256.New() for _, part := range parts { part = strings.TrimSpace(part) if part == "" { continue } _, _ = h.Write([]byte(part)) _, _ = h.Write([]byte{0}) } return hex.EncodeToString(h.Sum(nil)) } func truncate(value string, max int) string { runes := []rune(value) if len(runes) <= max { return value } return string(runes[:max]) }