Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
273a5ceb91 | ||
|
|
5786958078 | ||
|
|
5b6a1c56ea | ||
|
|
2ca85077a3 | ||
|
|
a1f6966200 | ||
|
|
fa4c67b686 | ||
|
|
56bfdb081a | ||
|
|
15a8804307 |
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
# macOS Finder metadata
|
||||||
|
.DS_Store
|
||||||
59
CHANGELOG.md
Normal file
59
CHANGELOG.md
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
# Changelog
|
||||||
|
|
||||||
|
## v0.4.0 - 2026-06-17
|
||||||
|
|
||||||
|
Added:
|
||||||
|
|
||||||
|
- `durableevent` package with the shared wire contract for durable
|
||||||
|
cross-service events.
|
||||||
|
- Event validation, payload normalization and stable idempotency key helper.
|
||||||
|
|
||||||
|
Migration:
|
||||||
|
|
||||||
|
1. Push `portal-common` commit and tag `v0.4.0`.
|
||||||
|
2. In each producer/consumer service, update:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
go get gitea.estateliga.work/admin/portal-common@v0.4.0
|
||||||
|
go mod tidy
|
||||||
|
```
|
||||||
|
|
||||||
|
3. Use `durableevent.New(...)` for critical cross-service events and publish
|
||||||
|
the JSON envelope to the selected durable transport.
|
||||||
|
4. Keep consumers idempotent by storing or checking `idempotency_key`.
|
||||||
|
|
||||||
|
## v0.3.0 - 2026-06-17
|
||||||
|
|
||||||
|
Added:
|
||||||
|
|
||||||
|
- `audit` package with the shared business-audit event contract.
|
||||||
|
- Safe audit details redaction for sensitive keys such as token, secret,
|
||||||
|
password, api_key, authorization and webhook_url.
|
||||||
|
- `audit.Client` for `POST /api/internal/audit/events` in Portal.
|
||||||
|
|
||||||
|
Migration:
|
||||||
|
|
||||||
|
1. Push `portal-common` commit and tag `v0.3.0`.
|
||||||
|
2. In each service, update:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
go get gitea.estateliga.work/admin/portal-common@v0.3.0
|
||||||
|
go mod tidy
|
||||||
|
```
|
||||||
|
|
||||||
|
3. Use `audit.NewClient(cfg.PortalBaseURL, cfg.PortalAPIKey)` or equivalent
|
||||||
|
Portal URL / internal key config already present in the service.
|
||||||
|
4. Send business events from handlers/workers after the durable operation
|
||||||
|
succeeds.
|
||||||
|
5. Keep Portal proxy audit mappings only as fallback until duplicate events are
|
||||||
|
checked in production.
|
||||||
|
|
||||||
|
## v0.2.0
|
||||||
|
|
||||||
|
Existing shared packages:
|
||||||
|
|
||||||
|
- `db`
|
||||||
|
- `middleware`
|
||||||
|
- `portal`
|
||||||
|
- `redisx`
|
||||||
|
- `eventbus`
|
||||||
40
README.md
40
README.md
@@ -7,6 +7,7 @@ Shared Go-библиотека для микросервисов портала.
|
|||||||
- `db/` — pgxpool init + slow-query tracer. Заменяет идентичный код в 9 сервисах.
|
- `db/` — pgxpool init + slow-query tracer. Заменяет идентичный код в 9 сервисах.
|
||||||
- `middleware/` — `InternalAuth` (X-Internal-Key + X-User-*) + хелперы для кастомных заголовков.
|
- `middleware/` — `InternalAuth` (X-Internal-Key + X-User-*) + хелперы для кастомных заголовков.
|
||||||
- `portal/` — HTTP-клиент portal-сервиса: directory с кэшем+stale fallback, notifications, deactivate user.
|
- `portal/` — HTTP-клиент portal-сервиса: directory с кэшем+stale fallback, notifications, deactivate user.
|
||||||
|
- `audit/` — общий контракт business-audit событий и клиент отправки в Portal.
|
||||||
|
|
||||||
## Использование
|
## Использование
|
||||||
|
|
||||||
@@ -14,6 +15,7 @@ Shared Go-библиотека для микросервисов портала.
|
|||||||
|
|
||||||
```go
|
```go
|
||||||
import (
|
import (
|
||||||
|
"gitea.estateliga.work/admin/portal-common/audit"
|
||||||
"gitea.estateliga.work/admin/portal-common/db"
|
"gitea.estateliga.work/admin/portal-common/db"
|
||||||
"gitea.estateliga.work/admin/portal-common/middleware"
|
"gitea.estateliga.work/admin/portal-common/middleware"
|
||||||
"gitea.estateliga.work/admin/portal-common/portal"
|
"gitea.estateliga.work/admin/portal-common/portal"
|
||||||
@@ -25,8 +27,31 @@ r := chi.NewRouter()
|
|||||||
r.Use(middleware.InternalAuth(cfg.InternalAPIKey))
|
r.Use(middleware.InternalAuth(cfg.InternalAPIKey))
|
||||||
|
|
||||||
portalCli := portal.New(cfg.PortalBaseURL, cfg.PortalAPIKey)
|
portalCli := portal.New(cfg.PortalBaseURL, cfg.PortalAPIKey)
|
||||||
|
auditCli := audit.NewClient(cfg.PortalBaseURL, cfg.PortalAPIKey)
|
||||||
|
_ = auditCli.Send(ctx, audit.Event{
|
||||||
|
Action: "tasks.task_create",
|
||||||
|
EntityType: "task",
|
||||||
|
EntityID: taskID,
|
||||||
|
UserID: actorID,
|
||||||
|
UserName: actorName,
|
||||||
|
Details: map[string]any{
|
||||||
|
"request_id": requestID,
|
||||||
|
},
|
||||||
|
})
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Business audit vocabulary
|
||||||
|
|
||||||
|
События именуются в формате `<service>.<entity>_<verb>`:
|
||||||
|
|
||||||
|
- `service`: короткий код сервиса (`tasks`, `files`, `tg`, `pf`, `telephony`, `portal`).
|
||||||
|
- `entity`: бизнес-сущность (`task`, `node`, `channel`, `project`, `call`).
|
||||||
|
- `verb`: действие в прошедшем бизнес-смысле или команда (`create`, `update`, `delete`, `retry`, `share`, `move`).
|
||||||
|
|
||||||
|
В `Details` можно класть диагностический контекст (`request_id`, `scope`,
|
||||||
|
`status`, `error_code`), но нельзя класть токены, пароли и API-ключи. Пакет
|
||||||
|
`audit` дополнительно вырезает чувствительные ключи перед отправкой.
|
||||||
|
|
||||||
## Dev-режим (без push в Gitea)
|
## Dev-режим (без push в Gitea)
|
||||||
|
|
||||||
В `go.mod` сервиса добавить `replace`:
|
В `go.mod` сервиса добавить `replace`:
|
||||||
@@ -38,9 +63,22 @@ replace gitea.estateliga.work/admin/portal-common => ../portal-common
|
|||||||
Когда библиотека стабилизируется, заменить на pinned тег:
|
Когда библиотека стабилизируется, заменить на pinned тег:
|
||||||
|
|
||||||
```
|
```
|
||||||
require gitea.estateliga.work/admin/portal-common v0.1.0
|
require gitea.estateliga.work/admin/portal-common v0.3.0
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Release
|
||||||
|
|
||||||
|
После изменения публичных пакетов:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
go test ./...
|
||||||
|
git tag v0.3.0
|
||||||
|
git push origin main --tags
|
||||||
|
```
|
||||||
|
|
||||||
|
Сервисы обновляются только после публикации тега, без коммита локального
|
||||||
|
`replace`.
|
||||||
|
|
||||||
## Зачем
|
## Зачем
|
||||||
|
|
||||||
До этого 9 сервисов копировали один в один: pgxpool init, slow-query tracer (500ms threshold), InternalAuth middleware. Tweak'ать tuning централизованно было невозможно. Сейчас изменения идут в одном репо, сервисы пересобираются.
|
До этого 9 сервисов копировали один в один: pgxpool init, slow-query tracer (500ms threshold), InternalAuth middleware. Tweak'ать tuning централизованно было невозможно. Сейчас изменения идут в одном репо, сервисы пересобираются.
|
||||||
|
|||||||
158
audit/audit.go
Normal file
158
audit/audit.go
Normal file
@@ -0,0 +1,158 @@
|
|||||||
|
// Package audit defines the shared business-audit contract used by Portal
|
||||||
|
// microservices.
|
||||||
|
package audit
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"regexp"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
MaxTokenLength = 80
|
||||||
|
MaxDetailsBytes = 16 * 1024
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
tokenRe = regexp.MustCompile(`^[a-z][a-z0-9_.-]{1,79}$`)
|
||||||
|
secretKeyRe = regexp.MustCompile(`(?i)token|secret|password|api_?key|bearer|authorization|webhook_url|bitrix`)
|
||||||
|
)
|
||||||
|
|
||||||
|
// Event is the stable wire contract for business-audit events.
|
||||||
|
type Event struct {
|
||||||
|
Action string `json:"action"`
|
||||||
|
EntityType string `json:"entity_type"`
|
||||||
|
EntityID string `json:"entity_id,omitempty"`
|
||||||
|
UserID string `json:"user_id,omitempty"`
|
||||||
|
UserName string `json:"user_name,omitempty"`
|
||||||
|
Details map[string]any `json:"details,omitempty"`
|
||||||
|
IPAddress string `json:"ip_address,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Normalize trims fields and redacts sensitive values in Details.
|
||||||
|
func (e Event) Normalize() Event {
|
||||||
|
e.Action = strings.TrimSpace(e.Action)
|
||||||
|
e.EntityType = strings.TrimSpace(e.EntityType)
|
||||||
|
e.EntityID = truncateRunes(strings.TrimSpace(e.EntityID), 255)
|
||||||
|
e.UserID = strings.TrimSpace(e.UserID)
|
||||||
|
e.UserName = strings.TrimSpace(e.UserName)
|
||||||
|
e.IPAddress = strings.TrimSpace(e.IPAddress)
|
||||||
|
if e.Details != nil {
|
||||||
|
if redacted, ok := RedactSecrets(e.Details).(map[string]any); ok {
|
||||||
|
e.Details = redacted
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate checks the event shape before it is sent to Portal.
|
||||||
|
func (e Event) Validate() error {
|
||||||
|
e = e.Normalize()
|
||||||
|
if !ValidToken(e.Action) {
|
||||||
|
return fmt.Errorf("invalid audit action %q", e.Action)
|
||||||
|
}
|
||||||
|
if !ValidToken(e.EntityType) {
|
||||||
|
return fmt.Errorf("invalid audit entity_type %q", e.EntityType)
|
||||||
|
}
|
||||||
|
if e.Details != nil && detailsTooLarge(e.Details) {
|
||||||
|
return fmt.Errorf("audit details exceed %d bytes", MaxDetailsBytes)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func ValidToken(value string) bool {
|
||||||
|
value = strings.TrimSpace(value)
|
||||||
|
return len(value) <= MaxTokenLength && tokenRe.MatchString(value)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RedactSecrets recursively replaces values for sensitive JSON keys.
|
||||||
|
func RedactSecrets(v any) any {
|
||||||
|
switch x := v.(type) {
|
||||||
|
case map[string]any:
|
||||||
|
out := make(map[string]any, len(x))
|
||||||
|
for k, val := range x {
|
||||||
|
if secretKeyRe.MatchString(k) {
|
||||||
|
out[k] = "***"
|
||||||
|
} else {
|
||||||
|
out[k] = RedactSecrets(val)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
case []any:
|
||||||
|
out := make([]any, len(x))
|
||||||
|
for i, val := range x {
|
||||||
|
out[i] = RedactSecrets(val)
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
default:
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Client struct {
|
||||||
|
baseURL string
|
||||||
|
apiKey string
|
||||||
|
hc *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewClient(baseURL, apiKey string) *Client {
|
||||||
|
return &Client{
|
||||||
|
baseURL: strings.TrimRight(baseURL, "/"),
|
||||||
|
apiKey: apiKey,
|
||||||
|
hc: &http.Client{Timeout: 5 * time.Second},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Enabled() bool {
|
||||||
|
return c != nil && c.baseURL != "" && c.apiKey != ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Send(ctx context.Context, event Event) error {
|
||||||
|
if !c.Enabled() {
|
||||||
|
return errors.New("audit client disabled")
|
||||||
|
}
|
||||||
|
event = event.Normalize()
|
||||||
|
if err := event.Validate(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
body, err := json.Marshal(event)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("marshal audit event: %w", err)
|
||||||
|
}
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/internal/audit/events", bytes.NewReader(body))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("new audit request: %w", err)
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
req.Header.Set("X-Internal-Key", c.apiKey)
|
||||||
|
resp, err := c.hc.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("send audit event: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = resp.Body.Close() }()
|
||||||
|
if resp.StatusCode >= 300 {
|
||||||
|
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
||||||
|
return fmt.Errorf("portal audit returned %d: %s", resp.StatusCode, string(respBody))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func detailsTooLarge(value any) bool {
|
||||||
|
raw, err := json.Marshal(value)
|
||||||
|
return err != nil || len(raw) > MaxDetailsBytes
|
||||||
|
}
|
||||||
|
|
||||||
|
func truncateRunes(value string, max int) string {
|
||||||
|
runes := []rune(value)
|
||||||
|
if len(runes) <= max {
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
return string(runes[:max])
|
||||||
|
}
|
||||||
85
audit/audit_test.go
Normal file
85
audit/audit_test.go
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
package audit
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestEventNormalizeRedactsSecretsAndTruncatesEntityID(t *testing.T) {
|
||||||
|
event := Event{
|
||||||
|
Action: " files.share_create ",
|
||||||
|
EntityType: " file_node ",
|
||||||
|
EntityID: longString("x", 300),
|
||||||
|
Details: map[string]any{
|
||||||
|
"request_id": "rid-1",
|
||||||
|
"password": "plain",
|
||||||
|
"nested": map[string]any{
|
||||||
|
"api_key": "secret",
|
||||||
|
},
|
||||||
|
"items": []any{
|
||||||
|
map[string]any{"token": "secret"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
got := event.Normalize()
|
||||||
|
if got.Action != "files.share_create" {
|
||||||
|
t.Fatalf("action was not trimmed: %q", got.Action)
|
||||||
|
}
|
||||||
|
if got.EntityType != "file_node" {
|
||||||
|
t.Fatalf("entity type was not trimmed: %q", got.EntityType)
|
||||||
|
}
|
||||||
|
if len([]rune(got.EntityID)) != 255 {
|
||||||
|
t.Fatalf("entity id was not truncated, got %d", len([]rune(got.EntityID)))
|
||||||
|
}
|
||||||
|
if got.Details["password"] != "***" {
|
||||||
|
t.Fatalf("password was not redacted: %#v", got.Details["password"])
|
||||||
|
}
|
||||||
|
nested := got.Details["nested"].(map[string]any)
|
||||||
|
if nested["api_key"] != "***" {
|
||||||
|
t.Fatalf("nested api_key was not redacted: %#v", nested["api_key"])
|
||||||
|
}
|
||||||
|
items := got.Details["items"].([]any)
|
||||||
|
if items[0].(map[string]any)["token"] != "***" {
|
||||||
|
t.Fatalf("array token was not redacted: %#v", items[0])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventValidate(t *testing.T) {
|
||||||
|
valid := Event{Action: "tasks.task_create", EntityType: "task"}
|
||||||
|
if err := valid.Validate(); err != nil {
|
||||||
|
t.Fatalf("valid event rejected: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
invalidAction := Event{Action: "Task Created", EntityType: "task"}
|
||||||
|
if err := invalidAction.Validate(); err == nil {
|
||||||
|
t.Fatal("invalid action was accepted")
|
||||||
|
}
|
||||||
|
|
||||||
|
invalidEntity := Event{Action: "tasks.task_create", EntityType: "1task"}
|
||||||
|
if err := invalidEntity.Validate(); err == nil {
|
||||||
|
t.Fatal("invalid entity type was accepted")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestValidToken(t *testing.T) {
|
||||||
|
tests := map[string]bool{
|
||||||
|
"files.node_move": true,
|
||||||
|
"ai.job-retry": true,
|
||||||
|
"a1": true,
|
||||||
|
"A1": false,
|
||||||
|
"a": false,
|
||||||
|
"files node": false,
|
||||||
|
longString("a", MaxTokenLength+1): false,
|
||||||
|
}
|
||||||
|
for value, want := range tests {
|
||||||
|
if got := ValidToken(value); got != want {
|
||||||
|
t.Fatalf("ValidToken(%q) = %v, want %v", value, got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func longString(ch string, n int) string {
|
||||||
|
out := ""
|
||||||
|
for range n {
|
||||||
|
out += ch
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
49
db/pool.go
49
db/pool.go
@@ -104,6 +104,55 @@ func ConnectURL(url string) (*pgxpool.Pool, error) {
|
|||||||
return Connect(cfg)
|
return Connect(cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ConnectWithRetry — Connect с exponential-backoff'ом для k8s-rollout race:
|
||||||
|
// pod может стартануть до того, как postgres StatefulSet готов. Без
|
||||||
|
// ретраев это даёт CrashLoopBackoff с noisy логами; с ретраями — pod
|
||||||
|
// мирно ждёт N попыток.
|
||||||
|
//
|
||||||
|
// Backoff: 1s, 2s, 4s, 8s, ..., cap 15s. Возвращает первый успешный pool
|
||||||
|
// или последнюю ошибку после totalTimeout. Используется telephony — там
|
||||||
|
// rollout-race реально срабатывал; остальные сервисы могут перейти при
|
||||||
|
// необходимости.
|
||||||
|
func ConnectWithRetry(ctx context.Context, url string, totalTimeout time.Duration) (*pgxpool.Pool, error) {
|
||||||
|
cfg := Defaults()
|
||||||
|
cfg.DatabaseURL = url
|
||||||
|
return ConnectWithRetryCfg(ctx, cfg, totalTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConnectWithRetryCfg — то же, что ConnectWithRetry, но с кастомным
|
||||||
|
// PoolConfig (для сервисов, которым нужны нестандартные лимиты + retry).
|
||||||
|
func ConnectWithRetryCfg(ctx context.Context, cfg PoolConfig, totalTimeout time.Duration) (*pgxpool.Pool, error) {
|
||||||
|
if totalTimeout <= 0 {
|
||||||
|
totalTimeout = 2 * time.Minute
|
||||||
|
}
|
||||||
|
deadline := time.Now().Add(totalTimeout)
|
||||||
|
delay := time.Second
|
||||||
|
var lastErr error
|
||||||
|
for attempt := 1; ; attempt++ {
|
||||||
|
pool, err := Connect(cfg)
|
||||||
|
if err == nil {
|
||||||
|
if attempt > 1 {
|
||||||
|
slog.Info("db connected after retries", "attempt", attempt)
|
||||||
|
}
|
||||||
|
return pool, nil
|
||||||
|
}
|
||||||
|
lastErr = err
|
||||||
|
if time.Now().Add(delay).After(deadline) {
|
||||||
|
return nil, fmt.Errorf("connect db (after %d attempts): %w", attempt, err)
|
||||||
|
}
|
||||||
|
slog.Warn("db connect failed, retrying", "attempt", attempt, "delay", delay, "error", err)
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, fmt.Errorf("ctx cancelled while waiting to retry db: %w (last attempt: %v)", ctx.Err(), lastErr)
|
||||||
|
case <-time.After(delay):
|
||||||
|
}
|
||||||
|
delay *= 2
|
||||||
|
if delay > 15*time.Second {
|
||||||
|
delay = 15 * time.Second
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// slowQueryTracer пишет WARN в slog'е для запросов, длиннее threshold.
|
// slowQueryTracer пишет WARN в slog'е для запросов, длиннее threshold.
|
||||||
// Логируется duration_ms, обрезанный SQL и error (если был). Не префиксируем
|
// Логируется duration_ms, обрезанный SQL и error (если был). Не префиксируем
|
||||||
// сервисом — это делает caller через slog.With() на старте процесса.
|
// сервисом — это делает caller через slog.With() на старте процесса.
|
||||||
|
|||||||
150
durableevent/event.go
Normal file
150
durableevent/event.go
Normal file
@@ -0,0 +1,150 @@
|
|||||||
|
// Package durableevent defines the shared contract for critical cross-service
|
||||||
|
// events that must survive process restarts and temporary subscriber outages.
|
||||||
|
package durableevent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"regexp"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
DefaultSchemaVersion = 1
|
||||||
|
MaxTokenLength = 120
|
||||||
|
MaxIDLength = 255
|
||||||
|
MaxPayloadBytes = 64 * 1024
|
||||||
|
)
|
||||||
|
|
||||||
|
var tokenRe = regexp.MustCompile(`^[a-z][a-z0-9_.-]{1,119}$`)
|
||||||
|
|
||||||
|
// Event is the stable wire shape for durable cross-service events.
|
||||||
|
//
|
||||||
|
// It is intentionally transport-neutral: the same JSON can be written to
|
||||||
|
// Redis Streams, a database outbox, or a journal table.
|
||||||
|
type Event struct {
|
||||||
|
EventType string `json:"event_type"`
|
||||||
|
SchemaVersion int `json:"schema_version"`
|
||||||
|
ActorUserID string `json:"actor_user_id,omitempty"`
|
||||||
|
ActorName string `json:"actor_name,omitempty"`
|
||||||
|
EntityType string `json:"entity_type"`
|
||||||
|
EntityID string `json:"entity_id"`
|
||||||
|
RelatedType string `json:"related_entity_type,omitempty"`
|
||||||
|
RelatedID string `json:"related_entity_id,omitempty"`
|
||||||
|
RequestID string `json:"request_id,omitempty"`
|
||||||
|
IdempotencyKey string `json:"idempotency_key"`
|
||||||
|
OccurredAt time.Time `json:"occurred_at"`
|
||||||
|
Payload json.RawMessage `json:"payload,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(eventType, entityType, entityID string, payload any) (Event, error) {
|
||||||
|
raw, err := Payload(payload)
|
||||||
|
if err != nil {
|
||||||
|
return Event{}, err
|
||||||
|
}
|
||||||
|
event := Event{
|
||||||
|
EventType: eventType,
|
||||||
|
SchemaVersion: DefaultSchemaVersion,
|
||||||
|
EntityType: entityType,
|
||||||
|
EntityID: entityID,
|
||||||
|
OccurredAt: time.Now().UTC(),
|
||||||
|
Payload: raw,
|
||||||
|
}
|
||||||
|
event.IdempotencyKey = BuildIdempotencyKey(event.EventType, event.EntityType, event.EntityID)
|
||||||
|
return event, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e Event) Normalize() Event {
|
||||||
|
e.EventType = strings.TrimSpace(e.EventType)
|
||||||
|
e.ActorUserID = strings.TrimSpace(e.ActorUserID)
|
||||||
|
e.ActorName = truncate(strings.TrimSpace(e.ActorName), MaxIDLength)
|
||||||
|
e.EntityType = strings.TrimSpace(e.EntityType)
|
||||||
|
e.EntityID = truncate(strings.TrimSpace(e.EntityID), MaxIDLength)
|
||||||
|
e.RelatedType = strings.TrimSpace(e.RelatedType)
|
||||||
|
e.RelatedID = truncate(strings.TrimSpace(e.RelatedID), MaxIDLength)
|
||||||
|
e.RequestID = truncate(strings.TrimSpace(e.RequestID), MaxIDLength)
|
||||||
|
e.IdempotencyKey = truncate(strings.TrimSpace(e.IdempotencyKey), MaxIDLength)
|
||||||
|
if e.SchemaVersion == 0 {
|
||||||
|
e.SchemaVersion = DefaultSchemaVersion
|
||||||
|
}
|
||||||
|
if !e.OccurredAt.IsZero() {
|
||||||
|
e.OccurredAt = e.OccurredAt.UTC()
|
||||||
|
}
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e Event) Validate() error {
|
||||||
|
e = e.Normalize()
|
||||||
|
if !ValidToken(e.EventType) {
|
||||||
|
return fmt.Errorf("invalid event_type %q", e.EventType)
|
||||||
|
}
|
||||||
|
if !ValidToken(e.EntityType) {
|
||||||
|
return fmt.Errorf("invalid entity_type %q", e.EntityType)
|
||||||
|
}
|
||||||
|
if e.EntityID == "" {
|
||||||
|
return errors.New("entity_id is required")
|
||||||
|
}
|
||||||
|
if e.IdempotencyKey == "" {
|
||||||
|
return errors.New("idempotency_key is required")
|
||||||
|
}
|
||||||
|
if e.SchemaVersion < 1 {
|
||||||
|
return errors.New("schema_version must be positive")
|
||||||
|
}
|
||||||
|
if e.OccurredAt.IsZero() {
|
||||||
|
return errors.New("occurred_at is required")
|
||||||
|
}
|
||||||
|
if len(e.Payload) > MaxPayloadBytes {
|
||||||
|
return fmt.Errorf("payload exceeds %d bytes", MaxPayloadBytes)
|
||||||
|
}
|
||||||
|
if len(e.Payload) > 0 && !json.Valid(e.Payload) {
|
||||||
|
return errors.New("payload must be valid JSON")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func ValidToken(value string) bool {
|
||||||
|
value = strings.TrimSpace(value)
|
||||||
|
return len(value) <= MaxTokenLength && tokenRe.MatchString(value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Payload(value any) (json.RawMessage, error) {
|
||||||
|
if value == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
if raw, ok := value.(json.RawMessage); ok {
|
||||||
|
if len(raw) > 0 && !json.Valid(raw) {
|
||||||
|
return nil, errors.New("payload must be valid JSON")
|
||||||
|
}
|
||||||
|
return raw, nil
|
||||||
|
}
|
||||||
|
raw, err := json.Marshal(value)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("marshal payload: %w", err)
|
||||||
|
}
|
||||||
|
return raw, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func BuildIdempotencyKey(parts ...string) string {
|
||||||
|
h := sha256.New()
|
||||||
|
for _, part := range parts {
|
||||||
|
part = strings.TrimSpace(part)
|
||||||
|
if part == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
_, _ = h.Write([]byte(part))
|
||||||
|
_, _ = h.Write([]byte{0})
|
||||||
|
}
|
||||||
|
return hex.EncodeToString(h.Sum(nil))
|
||||||
|
}
|
||||||
|
|
||||||
|
func truncate(value string, max int) string {
|
||||||
|
runes := []rune(value)
|
||||||
|
if len(runes) <= max {
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
return string(runes[:max])
|
||||||
|
}
|
||||||
98
durableevent/event_test.go
Normal file
98
durableevent/event_test.go
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
package durableevent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewEventBuildsValidEvent(t *testing.T) {
|
||||||
|
event, err := New("hh.response_received", "hh_response", "resume-1", map[string]any{"resume_id": "resume-1"})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("New returned error: %v", err)
|
||||||
|
}
|
||||||
|
if event.SchemaVersion != DefaultSchemaVersion {
|
||||||
|
t.Fatalf("schema version = %d, want %d", event.SchemaVersion, DefaultSchemaVersion)
|
||||||
|
}
|
||||||
|
if event.IdempotencyKey == "" {
|
||||||
|
t.Fatal("idempotency key was not generated")
|
||||||
|
}
|
||||||
|
if err := event.Validate(); err != nil {
|
||||||
|
t.Fatalf("generated event is invalid: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventValidateRejectsInvalidShape(t *testing.T) {
|
||||||
|
validTime := time.Date(2026, 6, 17, 10, 0, 0, 0, time.UTC)
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
event Event
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "bad event type",
|
||||||
|
event: Event{
|
||||||
|
EventType: "HH Response",
|
||||||
|
EntityType: "hh_response",
|
||||||
|
EntityID: "1",
|
||||||
|
IdempotencyKey: "key",
|
||||||
|
SchemaVersion: 1,
|
||||||
|
OccurredAt: validTime,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "missing entity id",
|
||||||
|
event: Event{
|
||||||
|
EventType: "hh.response_received",
|
||||||
|
EntityType: "hh_response",
|
||||||
|
IdempotencyKey: "key",
|
||||||
|
SchemaVersion: 1,
|
||||||
|
OccurredAt: validTime,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "invalid payload",
|
||||||
|
event: Event{
|
||||||
|
EventType: "hh.response_received",
|
||||||
|
EntityType: "hh_response",
|
||||||
|
EntityID: "1",
|
||||||
|
IdempotencyKey: "key",
|
||||||
|
SchemaVersion: 1,
|
||||||
|
OccurredAt: validTime,
|
||||||
|
Payload: json.RawMessage(`{`),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
if err := tt.event.Validate(); err == nil {
|
||||||
|
t.Fatal("invalid event was accepted")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildIdempotencyKeyIsStable(t *testing.T) {
|
||||||
|
a := BuildIdempotencyKey("hh.response_received", "resume-1", "response-2")
|
||||||
|
b := BuildIdempotencyKey("hh.response_received", "resume-1", "response-2")
|
||||||
|
c := BuildIdempotencyKey("hh.response_received", "resume-1", "response-3")
|
||||||
|
if a == "" {
|
||||||
|
t.Fatal("empty idempotency key")
|
||||||
|
}
|
||||||
|
if a != b {
|
||||||
|
t.Fatal("same parts produced different keys")
|
||||||
|
}
|
||||||
|
if a == c {
|
||||||
|
t.Fatal("different parts produced same key")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPayloadAcceptsRawMessage(t *testing.T) {
|
||||||
|
raw := json.RawMessage(`{"ok":true}`)
|
||||||
|
got, err := Payload(raw)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Payload returned error: %v", err)
|
||||||
|
}
|
||||||
|
if string(got) != string(raw) {
|
||||||
|
t.Fatalf("payload = %s, want %s", got, raw)
|
||||||
|
}
|
||||||
|
}
|
||||||
146
eventbus/bus.go
Normal file
146
eventbus/bus.go
Normal file
@@ -0,0 +1,146 @@
|
|||||||
|
// Package eventbus — лёгкая cross-service шина событий поверх Redis
|
||||||
|
// pub/sub. Используется для уведомлений между сервисами без жёсткого
|
||||||
|
// HTTP-coupling'а (publisher не знает кто слушает).
|
||||||
|
//
|
||||||
|
// Семантика: at-most-once delivery — Redis pub/sub не сохраняет
|
||||||
|
// сообщения для disconnect'нутых подписчиков. Если нужна гарантия —
|
||||||
|
// ставим вторую сторону (consumer-side journal или Streams). Пока MVP.
|
||||||
|
//
|
||||||
|
// Конвенция топиков: "{service}.{event}", например "tasks.task.completed".
|
||||||
|
// Service-prefix защищает от случайных коллизий имён.
|
||||||
|
package eventbus
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Event — общая обёртка для всех событий. Payload — JSON, специфика на
|
||||||
|
// уровне consumer'а (он типизирует через json.Unmarshal в свою struct).
|
||||||
|
type Event struct {
|
||||||
|
ID string `json:"id"` // UUID4 для idempotency на стороне consumer'а
|
||||||
|
Type string `json:"type"` // напр. "task.completed"
|
||||||
|
Service string `json:"service"` // источник: "tasks", "candidates", ...
|
||||||
|
OccurredAt time.Time `json:"occurred_at"`
|
||||||
|
ActorID string `json:"actor_id,omitempty"`
|
||||||
|
ActorName string `json:"actor_name,omitempty"`
|
||||||
|
Payload json.RawMessage `json:"payload,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bus — публикация + подписка. Один интерфейс, чтобы в тестах можно было
|
||||||
|
// заменить на in-memory mock без Redis.
|
||||||
|
type Bus interface {
|
||||||
|
// Publish — fire-and-forget; возвращает ошибку только если не удалось
|
||||||
|
// отправить в Redis. У consumer'ов ничего не гарантируется.
|
||||||
|
Publish(ctx context.Context, topic string, event Event) error
|
||||||
|
// Subscribe — блокирующая подписка. Завершается при отмене ctx или
|
||||||
|
// фатальной ошибке. Handler'ы вызываются последовательно в одной
|
||||||
|
// горутине на топик; для параллельной обработки — буферизация на
|
||||||
|
// стороне handler'а.
|
||||||
|
Subscribe(ctx context.Context, topic string, handler Handler) error
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handler — обработчик одного события. Возврат error логируется как
|
||||||
|
// warning; повторной доставки нет (at-most-once семантика Redis).
|
||||||
|
type Handler func(ctx context.Context, event Event) error
|
||||||
|
|
||||||
|
// redisBus — реализация на Redis pub/sub.
|
||||||
|
type redisBus struct {
|
||||||
|
client *redis.Client
|
||||||
|
service string // имя сервиса-источника, проставляется в Event.Service
|
||||||
|
}
|
||||||
|
|
||||||
|
// New — клиент должен быть pre-pinged (см. portal-common/redisx).
|
||||||
|
// service используется как default-значение Event.Service для Publish'а.
|
||||||
|
func New(client *redis.Client, service string) Bus {
|
||||||
|
return &redisBus{client: client, service: service}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *redisBus) Publish(ctx context.Context, topic string, event Event) error {
|
||||||
|
if event.ID == "" {
|
||||||
|
event.ID = uuid.NewString()
|
||||||
|
}
|
||||||
|
if event.Service == "" {
|
||||||
|
event.Service = b.service
|
||||||
|
}
|
||||||
|
if event.OccurredAt.IsZero() {
|
||||||
|
event.OccurredAt = time.Now().UTC()
|
||||||
|
}
|
||||||
|
body, err := json.Marshal(event)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("marshal event: %w", err)
|
||||||
|
}
|
||||||
|
if err := b.client.Publish(ctx, topic, body).Err(); err != nil {
|
||||||
|
return fmt.Errorf("redis publish %s: %w", topic, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *redisBus) Subscribe(ctx context.Context, topic string, handler Handler) error {
|
||||||
|
if handler == nil {
|
||||||
|
return fmt.Errorf("handler is nil")
|
||||||
|
}
|
||||||
|
sub := b.client.Subscribe(ctx, topic)
|
||||||
|
defer func() { _ = sub.Close() }()
|
||||||
|
// Receive() блокирует до первого сообщения или отмены ctx.
|
||||||
|
if _, err := sub.Receive(ctx); err != nil {
|
||||||
|
return fmt.Errorf("subscribe %s: %w", topic, err)
|
||||||
|
}
|
||||||
|
ch := sub.Channel()
|
||||||
|
slog.Info("eventbus subscribed", "topic", topic)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
case msg, ok := <-ch:
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var event Event
|
||||||
|
if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
|
||||||
|
slog.Warn("eventbus: invalid payload", "topic", topic, "err", err, "raw", msg.Payload)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := handler(ctx, event); err != nil {
|
||||||
|
slog.Warn("eventbus: handler error",
|
||||||
|
"topic", topic, "event_id", event.ID, "event_type", event.Type, "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *redisBus) Close() error {
|
||||||
|
// Redis client закрывается caller'ом (он же его создал) — здесь только
|
||||||
|
// no-op, чтобы Bus можно было унести в struct без отдельного управления
|
||||||
|
// жизненным циклом client'а.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// PublishTyped — удобный helper: автоматически marshal'ит payload-структуру.
|
||||||
|
// Использовать когда payload — это типизированная struct.
|
||||||
|
func PublishTyped[T any](ctx context.Context, b Bus, topic, eventType string, actor Actor, payload T) error {
|
||||||
|
raw, err := json.Marshal(payload)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("marshal payload: %w", err)
|
||||||
|
}
|
||||||
|
return b.Publish(ctx, topic, Event{
|
||||||
|
Type: eventType,
|
||||||
|
ActorID: actor.ID,
|
||||||
|
ActorName: actor.Name,
|
||||||
|
Payload: raw,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Actor — простой пара id/name. Помогает не писать каждый раз
|
||||||
|
// Event{ActorID: ..., ActorName: ...}.
|
||||||
|
type Actor struct {
|
||||||
|
ID string
|
||||||
|
Name string
|
||||||
|
}
|
||||||
8
go.mod
8
go.mod
@@ -10,9 +10,15 @@ module gitea.estateliga.work/admin/portal-common
|
|||||||
|
|
||||||
go 1.25.7
|
go 1.25.7
|
||||||
|
|
||||||
require github.com/jackc/pgx/v5 v5.9.1
|
require (
|
||||||
|
github.com/google/uuid v1.6.0
|
||||||
|
github.com/jackc/pgx/v5 v5.9.1
|
||||||
|
github.com/redis/go-redis/v9 v9.16.0
|
||||||
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||||
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||||
|
|||||||
12
go.sum
12
go.sum
@@ -1,6 +1,16 @@
|
|||||||
|
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
||||||
|
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
||||||
|
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
|
||||||
|
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||||
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||||
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
||||||
@@ -11,6 +21,8 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo
|
|||||||
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/redis/go-redis/v9 v9.16.0 h1:OotgqgLSRCmzfqChbQyG1PHC3tLNR89DG4jdOERSEP4=
|
||||||
|
github.com/redis/go-redis/v9 v9.16.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
|
||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
|
|||||||
45
middleware/headers.go
Normal file
45
middleware/headers.go
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
package middleware
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// HeaderBool reads portal-proxy boolean headers.
|
||||||
|
// Contract: "1", "true", "yes" and "on" are true; everything else is false.
|
||||||
|
func HeaderBool(r *http.Request, name string) bool {
|
||||||
|
switch strings.ToLower(strings.TrimSpace(r.Header.Get(name))) {
|
||||||
|
case "1", "true", "yes", "on":
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// HeaderCSV returns a trimmed non-empty CSV list from a portal-proxy header.
|
||||||
|
func HeaderCSV(r *http.Request, name string) []string {
|
||||||
|
raw := strings.TrimSpace(r.Header.Get(name))
|
||||||
|
if raw == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
parts := strings.Split(raw, ",")
|
||||||
|
out := make([]string, 0, len(parts))
|
||||||
|
for _, part := range parts {
|
||||||
|
if v := strings.TrimSpace(part); v != "" {
|
||||||
|
out = append(out, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// HeaderScope returns a lower-cased scope constrained to allowed values.
|
||||||
|
// The fallback is returned when the header is empty or unknown.
|
||||||
|
func HeaderScope(r *http.Request, name, fallback string, allowed ...string) string {
|
||||||
|
value := strings.ToLower(strings.TrimSpace(r.Header.Get(name)))
|
||||||
|
for _, option := range allowed {
|
||||||
|
if value == strings.ToLower(strings.TrimSpace(option)) {
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fallback
|
||||||
|
}
|
||||||
86
redisx/client.go
Normal file
86
redisx/client.go
Normal file
@@ -0,0 +1,86 @@
|
|||||||
|
// Package redisx — общий go-redis клиент с едиными pool-опциями.
|
||||||
|
// Booking/deals/hhru/webhooks-apps копировали одни и те же параметры —
|
||||||
|
// теперь tweak'аем здесь, сервисы пересобираются.
|
||||||
|
package redisx
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
Addr string
|
||||||
|
Password string
|
||||||
|
DB int
|
||||||
|
PoolSize int
|
||||||
|
MinIdleConns int
|
||||||
|
MaxRetries int
|
||||||
|
DialTimeout time.Duration
|
||||||
|
ReadTimeout time.Duration
|
||||||
|
WriteTimeout time.Duration
|
||||||
|
PingTimeout time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// Defaults — общие дефолты, повторяющие исходное поведение всех сервисов.
|
||||||
|
func Defaults() Config {
|
||||||
|
return Config{
|
||||||
|
PoolSize: 10,
|
||||||
|
MinIdleConns: 2,
|
||||||
|
MaxRetries: 3,
|
||||||
|
DialTimeout: 5 * time.Second,
|
||||||
|
ReadTimeout: 3 * time.Second,
|
||||||
|
WriteTimeout: 3 * time.Second,
|
||||||
|
PingTimeout: 5 * time.Second,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// New строит клиент и проверяет соединение через Ping. На неудачу
|
||||||
|
// возвращает ошибку — caller сам решает retry/fail-fast.
|
||||||
|
func New(cfg Config) (*redis.Client, error) {
|
||||||
|
if cfg.Addr == "" {
|
||||||
|
return nil, fmt.Errorf("redis Addr is required")
|
||||||
|
}
|
||||||
|
def := Defaults()
|
||||||
|
if cfg.PoolSize == 0 {
|
||||||
|
cfg.PoolSize = def.PoolSize
|
||||||
|
}
|
||||||
|
if cfg.MinIdleConns == 0 {
|
||||||
|
cfg.MinIdleConns = def.MinIdleConns
|
||||||
|
}
|
||||||
|
if cfg.MaxRetries == 0 {
|
||||||
|
cfg.MaxRetries = def.MaxRetries
|
||||||
|
}
|
||||||
|
if cfg.DialTimeout == 0 {
|
||||||
|
cfg.DialTimeout = def.DialTimeout
|
||||||
|
}
|
||||||
|
if cfg.ReadTimeout == 0 {
|
||||||
|
cfg.ReadTimeout = def.ReadTimeout
|
||||||
|
}
|
||||||
|
if cfg.WriteTimeout == 0 {
|
||||||
|
cfg.WriteTimeout = def.WriteTimeout
|
||||||
|
}
|
||||||
|
if cfg.PingTimeout == 0 {
|
||||||
|
cfg.PingTimeout = def.PingTimeout
|
||||||
|
}
|
||||||
|
c := redis.NewClient(&redis.Options{
|
||||||
|
Addr: cfg.Addr,
|
||||||
|
Password: cfg.Password,
|
||||||
|
DB: cfg.DB,
|
||||||
|
PoolSize: cfg.PoolSize,
|
||||||
|
MinIdleConns: cfg.MinIdleConns,
|
||||||
|
MaxRetries: cfg.MaxRetries,
|
||||||
|
DialTimeout: cfg.DialTimeout,
|
||||||
|
ReadTimeout: cfg.ReadTimeout,
|
||||||
|
WriteTimeout: cfg.WriteTimeout,
|
||||||
|
})
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), cfg.PingTimeout)
|
||||||
|
defer cancel()
|
||||||
|
if err := c.Ping(ctx).Err(); err != nil {
|
||||||
|
_ = c.Close()
|
||||||
|
return nil, fmt.Errorf("redis ping: %w", err)
|
||||||
|
}
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user