// Package db — общий pgxpool init для микросервисов: единая конфигурация // connection-pool'а, slow-query tracer, retry на startup. // // История: каждый сервис (booking, candidates, tasks, meet, leaders-reports, // telephony, hhru, webhooks-apps, deals) копировал этот код один в один. // Разъехавшиеся mode сложно tune'ить централизованно — теперь tweak'аем // здесь, а сервисы пересобираются. package db import ( "context" "fmt" "log/slog" "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" ) // PoolConfig — параметры пула. Дефолты идентичны тому, что было в каждом // сервисе (MaxConns=10, MinConns=2). Slow-query threshold — 500ms. // // Если сервис хочет другие лимиты (например, candidates с большим // объёмом read'ов), можно передать своё значение; nil/zero — берём // default. type PoolConfig struct { DatabaseURL string MaxConns int32 MinConns int32 MaxConnIdleTime time.Duration MaxConnLifetime time.Duration SlowQueryThreshold time.Duration ConnectTimeout time.Duration } // Defaults возвращает PoolConfig с дефолтными значениями, выставлены // только числовые лимиты — DatabaseURL caller подставляет сам. func Defaults() PoolConfig { return PoolConfig{ MaxConns: 10, MinConns: 2, MaxConnIdleTime: 5 * time.Minute, MaxConnLifetime: 30 * time.Minute, SlowQueryThreshold: 500 * time.Millisecond, ConnectTimeout: 10 * time.Second, } } // Connect открывает pgxpool с заданной конфигурацией и tracer'ом slow-queries. // Возвращает live-pool после успешного Ping; на ошибке pool закрывается. func Connect(cfg PoolConfig) (*pgxpool.Pool, error) { if cfg.DatabaseURL == "" { return nil, fmt.Errorf("DatabaseURL is required") } def := Defaults() if cfg.MaxConns == 0 { cfg.MaxConns = def.MaxConns } if cfg.MinConns == 0 { cfg.MinConns = def.MinConns } if cfg.MaxConnIdleTime == 0 { cfg.MaxConnIdleTime = def.MaxConnIdleTime } if cfg.MaxConnLifetime == 0 { cfg.MaxConnLifetime = def.MaxConnLifetime } if cfg.SlowQueryThreshold == 0 { cfg.SlowQueryThreshold = def.SlowQueryThreshold } if cfg.ConnectTimeout == 0 { cfg.ConnectTimeout = def.ConnectTimeout } poolCfg, err := pgxpool.ParseConfig(cfg.DatabaseURL) if err != nil { return nil, fmt.Errorf("parse db config: %w", err) } poolCfg.MaxConns = cfg.MaxConns poolCfg.MinConns = cfg.MinConns poolCfg.MaxConnIdleTime = cfg.MaxConnIdleTime poolCfg.MaxConnLifetime = cfg.MaxConnLifetime poolCfg.ConnConfig.Tracer = &slowQueryTracer{threshold: cfg.SlowQueryThreshold} ctx, cancel := context.WithTimeout(context.Background(), cfg.ConnectTimeout) defer cancel() pool, err := pgxpool.NewWithConfig(ctx, poolCfg) if err != nil { return nil, fmt.Errorf("create pool: %w", err) } if err := pool.Ping(ctx); err != nil { pool.Close() return nil, fmt.Errorf("ping db: %w", err) } return pool, nil } // ConnectURL — короткая форма Connect(PoolConfig{DatabaseURL: url}). // Удобно для сервисов, которым не нужны кастомные лимиты. func ConnectURL(url string) (*pgxpool.Pool, error) { cfg := Defaults() cfg.DatabaseURL = url return Connect(cfg) } // ConnectWithRetry — Connect с exponential-backoff'ом для k8s-rollout race: // pod может стартануть до того, как postgres StatefulSet готов. Без // ретраев это даёт CrashLoopBackoff с noisy логами; с ретраями — pod // мирно ждёт N попыток. // // Backoff: 1s, 2s, 4s, 8s, ..., cap 15s. Возвращает первый успешный pool // или последнюю ошибку после totalTimeout. Используется telephony — там // rollout-race реально срабатывал; остальные сервисы могут перейти при // необходимости. func ConnectWithRetry(ctx context.Context, url string, totalTimeout time.Duration) (*pgxpool.Pool, error) { cfg := Defaults() cfg.DatabaseURL = url return ConnectWithRetryCfg(ctx, cfg, totalTimeout) } // ConnectWithRetryCfg — то же, что ConnectWithRetry, но с кастомным // PoolConfig (для сервисов, которым нужны нестандартные лимиты + retry). func ConnectWithRetryCfg(ctx context.Context, cfg PoolConfig, totalTimeout time.Duration) (*pgxpool.Pool, error) { if totalTimeout <= 0 { totalTimeout = 2 * time.Minute } deadline := time.Now().Add(totalTimeout) delay := time.Second var lastErr error for attempt := 1; ; attempt++ { pool, err := Connect(cfg) if err == nil { if attempt > 1 { slog.Info("db connected after retries", "attempt", attempt) } return pool, nil } lastErr = err if time.Now().Add(delay).After(deadline) { return nil, fmt.Errorf("connect db (after %d attempts): %w", attempt, err) } slog.Warn("db connect failed, retrying", "attempt", attempt, "delay", delay, "error", err) select { case <-ctx.Done(): return nil, fmt.Errorf("ctx cancelled while waiting to retry db: %w (last attempt: %v)", ctx.Err(), lastErr) case <-time.After(delay): } delay *= 2 if delay > 15*time.Second { delay = 15 * time.Second } } } // slowQueryTracer пишет WARN в slog'е для запросов, длиннее threshold. // Логируется duration_ms, обрезанный SQL и error (если был). Не префиксируем // сервисом — это делает caller через slog.With() на старте процесса. type slowQueryTracer struct{ threshold time.Duration } type traceCtxKey struct{} type traceState struct { start time.Time sql string } func (t *slowQueryTracer) TraceQueryStart(ctx context.Context, _ *pgx.Conn, data pgx.TraceQueryStartData) context.Context { return context.WithValue(ctx, traceCtxKey{}, traceState{start: time.Now(), sql: data.SQL}) } func (t *slowQueryTracer) TraceQueryEnd(ctx context.Context, _ *pgx.Conn, data pgx.TraceQueryEndData) { st, ok := ctx.Value(traceCtxKey{}).(traceState) if !ok { return } dur := time.Since(st.start) if dur < t.threshold { return } sql := st.sql if len(sql) > 500 { sql = sql[:500] + "…" } attrs := []any{"duration_ms", dur.Milliseconds(), "sql", sql} if data.Err != nil { attrs = append(attrs, "error", data.Err) } slog.Warn("slow query", attrs...) }