diff --git a/durableevent/event.go b/durableevent/event.go new file mode 100644 index 0000000..4c99b74 --- /dev/null +++ b/durableevent/event.go @@ -0,0 +1,150 @@ +// Package durableevent defines the shared contract for critical cross-service +// events that must survive process restarts and temporary subscriber outages. +package durableevent + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "regexp" + "strings" + "time" +) + +const ( + DefaultSchemaVersion = 1 + MaxTokenLength = 120 + MaxIDLength = 255 + MaxPayloadBytes = 64 * 1024 +) + +var tokenRe = regexp.MustCompile(`^[a-z][a-z0-9_.-]{1,119}$`) + +// Event is the stable wire shape for durable cross-service events. +// +// It is intentionally transport-neutral: the same JSON can be written to +// Redis Streams, a database outbox, or a journal table. +type Event struct { + EventType string `json:"event_type"` + SchemaVersion int `json:"schema_version"` + ActorUserID string `json:"actor_user_id,omitempty"` + ActorName string `json:"actor_name,omitempty"` + EntityType string `json:"entity_type"` + EntityID string `json:"entity_id"` + RelatedType string `json:"related_entity_type,omitempty"` + RelatedID string `json:"related_entity_id,omitempty"` + RequestID string `json:"request_id,omitempty"` + IdempotencyKey string `json:"idempotency_key"` + OccurredAt time.Time `json:"occurred_at"` + Payload json.RawMessage `json:"payload,omitempty"` +} + +func New(eventType, entityType, entityID string, payload any) (Event, error) { + raw, err := Payload(payload) + if err != nil { + return Event{}, err + } + event := Event{ + EventType: eventType, + SchemaVersion: DefaultSchemaVersion, + EntityType: entityType, + EntityID: entityID, + OccurredAt: time.Now().UTC(), + Payload: raw, + } + event.IdempotencyKey = BuildIdempotencyKey(event.EventType, event.EntityType, event.EntityID) + return event, nil +} + +func (e Event) Normalize() Event { + e.EventType = strings.TrimSpace(e.EventType) + e.ActorUserID = strings.TrimSpace(e.ActorUserID) + e.ActorName = truncate(strings.TrimSpace(e.ActorName), MaxIDLength) + e.EntityType = strings.TrimSpace(e.EntityType) + e.EntityID = truncate(strings.TrimSpace(e.EntityID), MaxIDLength) + e.RelatedType = strings.TrimSpace(e.RelatedType) + e.RelatedID = truncate(strings.TrimSpace(e.RelatedID), MaxIDLength) + e.RequestID = truncate(strings.TrimSpace(e.RequestID), MaxIDLength) + e.IdempotencyKey = truncate(strings.TrimSpace(e.IdempotencyKey), MaxIDLength) + if e.SchemaVersion == 0 { + e.SchemaVersion = DefaultSchemaVersion + } + if !e.OccurredAt.IsZero() { + e.OccurredAt = e.OccurredAt.UTC() + } + return e +} + +func (e Event) Validate() error { + e = e.Normalize() + if !ValidToken(e.EventType) { + return fmt.Errorf("invalid event_type %q", e.EventType) + } + if !ValidToken(e.EntityType) { + return fmt.Errorf("invalid entity_type %q", e.EntityType) + } + if e.EntityID == "" { + return errors.New("entity_id is required") + } + if e.IdempotencyKey == "" { + return errors.New("idempotency_key is required") + } + if e.SchemaVersion < 1 { + return errors.New("schema_version must be positive") + } + if e.OccurredAt.IsZero() { + return errors.New("occurred_at is required") + } + if len(e.Payload) > MaxPayloadBytes { + return fmt.Errorf("payload exceeds %d bytes", MaxPayloadBytes) + } + if len(e.Payload) > 0 && !json.Valid(e.Payload) { + return errors.New("payload must be valid JSON") + } + return nil +} + +func ValidToken(value string) bool { + value = strings.TrimSpace(value) + return len(value) <= MaxTokenLength && tokenRe.MatchString(value) +} + +func Payload(value any) (json.RawMessage, error) { + if value == nil { + return nil, nil + } + if raw, ok := value.(json.RawMessage); ok { + if len(raw) > 0 && !json.Valid(raw) { + return nil, errors.New("payload must be valid JSON") + } + return raw, nil + } + raw, err := json.Marshal(value) + if err != nil { + return nil, fmt.Errorf("marshal payload: %w", err) + } + return raw, nil +} + +func BuildIdempotencyKey(parts ...string) string { + h := sha256.New() + for _, part := range parts { + part = strings.TrimSpace(part) + if part == "" { + continue + } + _, _ = h.Write([]byte(part)) + _, _ = h.Write([]byte{0}) + } + return hex.EncodeToString(h.Sum(nil)) +} + +func truncate(value string, max int) string { + runes := []rune(value) + if len(runes) <= max { + return value + } + return string(runes[:max]) +} diff --git a/durableevent/event_test.go b/durableevent/event_test.go new file mode 100644 index 0000000..3880e83 --- /dev/null +++ b/durableevent/event_test.go @@ -0,0 +1,98 @@ +package durableevent + +import ( + "encoding/json" + "testing" + "time" +) + +func TestNewEventBuildsValidEvent(t *testing.T) { + event, err := New("hh.response_received", "hh_response", "resume-1", map[string]any{"resume_id": "resume-1"}) + if err != nil { + t.Fatalf("New returned error: %v", err) + } + if event.SchemaVersion != DefaultSchemaVersion { + t.Fatalf("schema version = %d, want %d", event.SchemaVersion, DefaultSchemaVersion) + } + if event.IdempotencyKey == "" { + t.Fatal("idempotency key was not generated") + } + if err := event.Validate(); err != nil { + t.Fatalf("generated event is invalid: %v", err) + } +} + +func TestEventValidateRejectsInvalidShape(t *testing.T) { + validTime := time.Date(2026, 6, 17, 10, 0, 0, 0, time.UTC) + tests := []struct { + name string + event Event + }{ + { + name: "bad event type", + event: Event{ + EventType: "HH Response", + EntityType: "hh_response", + EntityID: "1", + IdempotencyKey: "key", + SchemaVersion: 1, + OccurredAt: validTime, + }, + }, + { + name: "missing entity id", + event: Event{ + EventType: "hh.response_received", + EntityType: "hh_response", + IdempotencyKey: "key", + SchemaVersion: 1, + OccurredAt: validTime, + }, + }, + { + name: "invalid payload", + event: Event{ + EventType: "hh.response_received", + EntityType: "hh_response", + EntityID: "1", + IdempotencyKey: "key", + SchemaVersion: 1, + OccurredAt: validTime, + Payload: json.RawMessage(`{`), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := tt.event.Validate(); err == nil { + t.Fatal("invalid event was accepted") + } + }) + } +} + +func TestBuildIdempotencyKeyIsStable(t *testing.T) { + a := BuildIdempotencyKey("hh.response_received", "resume-1", "response-2") + b := BuildIdempotencyKey("hh.response_received", "resume-1", "response-2") + c := BuildIdempotencyKey("hh.response_received", "resume-1", "response-3") + if a == "" { + t.Fatal("empty idempotency key") + } + if a != b { + t.Fatal("same parts produced different keys") + } + if a == c { + t.Fatal("different parts produced same key") + } +} + +func TestPayloadAcceptsRawMessage(t *testing.T) { + raw := json.RawMessage(`{"ok":true}`) + got, err := Payload(raw) + if err != nil { + t.Fatalf("Payload returned error: %v", err) + } + if string(got) != string(raw) { + t.Fatalf("payload = %s, want %s", got, raw) + } +}