Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
273a5ceb91 | ||
|
|
5786958078 |
22
CHANGELOG.md
22
CHANGELOG.md
@@ -1,5 +1,27 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## v0.4.0 - 2026-06-17
|
||||||
|
|
||||||
|
Added:
|
||||||
|
|
||||||
|
- `durableevent` package with the shared wire contract for durable
|
||||||
|
cross-service events.
|
||||||
|
- Event validation, payload normalization and stable idempotency key helper.
|
||||||
|
|
||||||
|
Migration:
|
||||||
|
|
||||||
|
1. Push `portal-common` commit and tag `v0.4.0`.
|
||||||
|
2. In each producer/consumer service, update:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
go get gitea.estateliga.work/admin/portal-common@v0.4.0
|
||||||
|
go mod tidy
|
||||||
|
```
|
||||||
|
|
||||||
|
3. Use `durableevent.New(...)` for critical cross-service events and publish
|
||||||
|
the JSON envelope to the selected durable transport.
|
||||||
|
4. Keep consumers idempotent by storing or checking `idempotency_key`.
|
||||||
|
|
||||||
## v0.3.0 - 2026-06-17
|
## v0.3.0 - 2026-06-17
|
||||||
|
|
||||||
Added:
|
Added:
|
||||||
|
|||||||
150
durableevent/event.go
Normal file
150
durableevent/event.go
Normal file
@@ -0,0 +1,150 @@
|
|||||||
|
// Package durableevent defines the shared contract for critical cross-service
|
||||||
|
// events that must survive process restarts and temporary subscriber outages.
|
||||||
|
package durableevent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"regexp"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
DefaultSchemaVersion = 1
|
||||||
|
MaxTokenLength = 120
|
||||||
|
MaxIDLength = 255
|
||||||
|
MaxPayloadBytes = 64 * 1024
|
||||||
|
)
|
||||||
|
|
||||||
|
var tokenRe = regexp.MustCompile(`^[a-z][a-z0-9_.-]{1,119}$`)
|
||||||
|
|
||||||
|
// Event is the stable wire shape for durable cross-service events.
|
||||||
|
//
|
||||||
|
// It is intentionally transport-neutral: the same JSON can be written to
|
||||||
|
// Redis Streams, a database outbox, or a journal table.
|
||||||
|
type Event struct {
|
||||||
|
EventType string `json:"event_type"`
|
||||||
|
SchemaVersion int `json:"schema_version"`
|
||||||
|
ActorUserID string `json:"actor_user_id,omitempty"`
|
||||||
|
ActorName string `json:"actor_name,omitempty"`
|
||||||
|
EntityType string `json:"entity_type"`
|
||||||
|
EntityID string `json:"entity_id"`
|
||||||
|
RelatedType string `json:"related_entity_type,omitempty"`
|
||||||
|
RelatedID string `json:"related_entity_id,omitempty"`
|
||||||
|
RequestID string `json:"request_id,omitempty"`
|
||||||
|
IdempotencyKey string `json:"idempotency_key"`
|
||||||
|
OccurredAt time.Time `json:"occurred_at"`
|
||||||
|
Payload json.RawMessage `json:"payload,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(eventType, entityType, entityID string, payload any) (Event, error) {
|
||||||
|
raw, err := Payload(payload)
|
||||||
|
if err != nil {
|
||||||
|
return Event{}, err
|
||||||
|
}
|
||||||
|
event := Event{
|
||||||
|
EventType: eventType,
|
||||||
|
SchemaVersion: DefaultSchemaVersion,
|
||||||
|
EntityType: entityType,
|
||||||
|
EntityID: entityID,
|
||||||
|
OccurredAt: time.Now().UTC(),
|
||||||
|
Payload: raw,
|
||||||
|
}
|
||||||
|
event.IdempotencyKey = BuildIdempotencyKey(event.EventType, event.EntityType, event.EntityID)
|
||||||
|
return event, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e Event) Normalize() Event {
|
||||||
|
e.EventType = strings.TrimSpace(e.EventType)
|
||||||
|
e.ActorUserID = strings.TrimSpace(e.ActorUserID)
|
||||||
|
e.ActorName = truncate(strings.TrimSpace(e.ActorName), MaxIDLength)
|
||||||
|
e.EntityType = strings.TrimSpace(e.EntityType)
|
||||||
|
e.EntityID = truncate(strings.TrimSpace(e.EntityID), MaxIDLength)
|
||||||
|
e.RelatedType = strings.TrimSpace(e.RelatedType)
|
||||||
|
e.RelatedID = truncate(strings.TrimSpace(e.RelatedID), MaxIDLength)
|
||||||
|
e.RequestID = truncate(strings.TrimSpace(e.RequestID), MaxIDLength)
|
||||||
|
e.IdempotencyKey = truncate(strings.TrimSpace(e.IdempotencyKey), MaxIDLength)
|
||||||
|
if e.SchemaVersion == 0 {
|
||||||
|
e.SchemaVersion = DefaultSchemaVersion
|
||||||
|
}
|
||||||
|
if !e.OccurredAt.IsZero() {
|
||||||
|
e.OccurredAt = e.OccurredAt.UTC()
|
||||||
|
}
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e Event) Validate() error {
|
||||||
|
e = e.Normalize()
|
||||||
|
if !ValidToken(e.EventType) {
|
||||||
|
return fmt.Errorf("invalid event_type %q", e.EventType)
|
||||||
|
}
|
||||||
|
if !ValidToken(e.EntityType) {
|
||||||
|
return fmt.Errorf("invalid entity_type %q", e.EntityType)
|
||||||
|
}
|
||||||
|
if e.EntityID == "" {
|
||||||
|
return errors.New("entity_id is required")
|
||||||
|
}
|
||||||
|
if e.IdempotencyKey == "" {
|
||||||
|
return errors.New("idempotency_key is required")
|
||||||
|
}
|
||||||
|
if e.SchemaVersion < 1 {
|
||||||
|
return errors.New("schema_version must be positive")
|
||||||
|
}
|
||||||
|
if e.OccurredAt.IsZero() {
|
||||||
|
return errors.New("occurred_at is required")
|
||||||
|
}
|
||||||
|
if len(e.Payload) > MaxPayloadBytes {
|
||||||
|
return fmt.Errorf("payload exceeds %d bytes", MaxPayloadBytes)
|
||||||
|
}
|
||||||
|
if len(e.Payload) > 0 && !json.Valid(e.Payload) {
|
||||||
|
return errors.New("payload must be valid JSON")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func ValidToken(value string) bool {
|
||||||
|
value = strings.TrimSpace(value)
|
||||||
|
return len(value) <= MaxTokenLength && tokenRe.MatchString(value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Payload(value any) (json.RawMessage, error) {
|
||||||
|
if value == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
if raw, ok := value.(json.RawMessage); ok {
|
||||||
|
if len(raw) > 0 && !json.Valid(raw) {
|
||||||
|
return nil, errors.New("payload must be valid JSON")
|
||||||
|
}
|
||||||
|
return raw, nil
|
||||||
|
}
|
||||||
|
raw, err := json.Marshal(value)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("marshal payload: %w", err)
|
||||||
|
}
|
||||||
|
return raw, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func BuildIdempotencyKey(parts ...string) string {
|
||||||
|
h := sha256.New()
|
||||||
|
for _, part := range parts {
|
||||||
|
part = strings.TrimSpace(part)
|
||||||
|
if part == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
_, _ = h.Write([]byte(part))
|
||||||
|
_, _ = h.Write([]byte{0})
|
||||||
|
}
|
||||||
|
return hex.EncodeToString(h.Sum(nil))
|
||||||
|
}
|
||||||
|
|
||||||
|
func truncate(value string, max int) string {
|
||||||
|
runes := []rune(value)
|
||||||
|
if len(runes) <= max {
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
return string(runes[:max])
|
||||||
|
}
|
||||||
98
durableevent/event_test.go
Normal file
98
durableevent/event_test.go
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
package durableevent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewEventBuildsValidEvent(t *testing.T) {
|
||||||
|
event, err := New("hh.response_received", "hh_response", "resume-1", map[string]any{"resume_id": "resume-1"})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("New returned error: %v", err)
|
||||||
|
}
|
||||||
|
if event.SchemaVersion != DefaultSchemaVersion {
|
||||||
|
t.Fatalf("schema version = %d, want %d", event.SchemaVersion, DefaultSchemaVersion)
|
||||||
|
}
|
||||||
|
if event.IdempotencyKey == "" {
|
||||||
|
t.Fatal("idempotency key was not generated")
|
||||||
|
}
|
||||||
|
if err := event.Validate(); err != nil {
|
||||||
|
t.Fatalf("generated event is invalid: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventValidateRejectsInvalidShape(t *testing.T) {
|
||||||
|
validTime := time.Date(2026, 6, 17, 10, 0, 0, 0, time.UTC)
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
event Event
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "bad event type",
|
||||||
|
event: Event{
|
||||||
|
EventType: "HH Response",
|
||||||
|
EntityType: "hh_response",
|
||||||
|
EntityID: "1",
|
||||||
|
IdempotencyKey: "key",
|
||||||
|
SchemaVersion: 1,
|
||||||
|
OccurredAt: validTime,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "missing entity id",
|
||||||
|
event: Event{
|
||||||
|
EventType: "hh.response_received",
|
||||||
|
EntityType: "hh_response",
|
||||||
|
IdempotencyKey: "key",
|
||||||
|
SchemaVersion: 1,
|
||||||
|
OccurredAt: validTime,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "invalid payload",
|
||||||
|
event: Event{
|
||||||
|
EventType: "hh.response_received",
|
||||||
|
EntityType: "hh_response",
|
||||||
|
EntityID: "1",
|
||||||
|
IdempotencyKey: "key",
|
||||||
|
SchemaVersion: 1,
|
||||||
|
OccurredAt: validTime,
|
||||||
|
Payload: json.RawMessage(`{`),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
if err := tt.event.Validate(); err == nil {
|
||||||
|
t.Fatal("invalid event was accepted")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildIdempotencyKeyIsStable(t *testing.T) {
|
||||||
|
a := BuildIdempotencyKey("hh.response_received", "resume-1", "response-2")
|
||||||
|
b := BuildIdempotencyKey("hh.response_received", "resume-1", "response-2")
|
||||||
|
c := BuildIdempotencyKey("hh.response_received", "resume-1", "response-3")
|
||||||
|
if a == "" {
|
||||||
|
t.Fatal("empty idempotency key")
|
||||||
|
}
|
||||||
|
if a != b {
|
||||||
|
t.Fatal("same parts produced different keys")
|
||||||
|
}
|
||||||
|
if a == c {
|
||||||
|
t.Fatal("different parts produced same key")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPayloadAcceptsRawMessage(t *testing.T) {
|
||||||
|
raw := json.RawMessage(`{"ok":true}`)
|
||||||
|
got, err := Payload(raw)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Payload returned error: %v", err)
|
||||||
|
}
|
||||||
|
if string(got) != string(raw) {
|
||||||
|
t.Fatalf("payload = %s, want %s", got, raw)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user