Files
portal-common/db/pool.go
Grendgi 15a8804307 db: ConnectWithRetry для k8s rollout race
Telephony раньше имел свой ConnectWithRetry (1s→2s→4s→8s, cap 15s,
totalTimeout 2 мин), чтобы пережить старт pod'а до готовности postgres
StatefulSet'а. При миграции на portal-common.db.ConnectURL retry потерян,
pod CrashLoopBackoff с noisy логами. Добавил ConnectWithRetry(Cfg)
с тем же backoff'ом — telephony и любой сервис, которому нужна
in-process устойчивость, может использовать вместо ConnectURL.
2026-05-20 14:18:03 +03:00

190 lines
6.7 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package db — общий pgxpool init для микросервисов: единая конфигурация
// connection-pool'а, slow-query tracer, retry на startup.
//
// История: каждый сервис (booking, candidates, tasks, meet, leaders-reports,
// telephony, hhru, webhooks-apps, deals) копировал этот код один в один.
// Разъехавшиеся mode сложно tune'ить централизованно — теперь tweak'аем
// здесь, а сервисы пересобираются.
package db
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
// PoolConfig — параметры пула. Дефолты идентичны тому, что было в каждом
// сервисе (MaxConns=10, MinConns=2). Slow-query threshold — 500ms.
//
// Если сервис хочет другие лимиты (например, candidates с большим
// объёмом read'ов), можно передать своё значение; nil/zero — берём
// default.
type PoolConfig struct {
DatabaseURL string
MaxConns int32
MinConns int32
MaxConnIdleTime time.Duration
MaxConnLifetime time.Duration
SlowQueryThreshold time.Duration
ConnectTimeout time.Duration
}
// Defaults возвращает PoolConfig с дефолтными значениями, выставлены
// только числовые лимиты — DatabaseURL caller подставляет сам.
func Defaults() PoolConfig {
return PoolConfig{
MaxConns: 10,
MinConns: 2,
MaxConnIdleTime: 5 * time.Minute,
MaxConnLifetime: 30 * time.Minute,
SlowQueryThreshold: 500 * time.Millisecond,
ConnectTimeout: 10 * time.Second,
}
}
// Connect открывает pgxpool с заданной конфигурацией и tracer'ом slow-queries.
// Возвращает live-pool после успешного Ping; на ошибке pool закрывается.
func Connect(cfg PoolConfig) (*pgxpool.Pool, error) {
if cfg.DatabaseURL == "" {
return nil, fmt.Errorf("DatabaseURL is required")
}
def := Defaults()
if cfg.MaxConns == 0 {
cfg.MaxConns = def.MaxConns
}
if cfg.MinConns == 0 {
cfg.MinConns = def.MinConns
}
if cfg.MaxConnIdleTime == 0 {
cfg.MaxConnIdleTime = def.MaxConnIdleTime
}
if cfg.MaxConnLifetime == 0 {
cfg.MaxConnLifetime = def.MaxConnLifetime
}
if cfg.SlowQueryThreshold == 0 {
cfg.SlowQueryThreshold = def.SlowQueryThreshold
}
if cfg.ConnectTimeout == 0 {
cfg.ConnectTimeout = def.ConnectTimeout
}
poolCfg, err := pgxpool.ParseConfig(cfg.DatabaseURL)
if err != nil {
return nil, fmt.Errorf("parse db config: %w", err)
}
poolCfg.MaxConns = cfg.MaxConns
poolCfg.MinConns = cfg.MinConns
poolCfg.MaxConnIdleTime = cfg.MaxConnIdleTime
poolCfg.MaxConnLifetime = cfg.MaxConnLifetime
poolCfg.ConnConfig.Tracer = &slowQueryTracer{threshold: cfg.SlowQueryThreshold}
ctx, cancel := context.WithTimeout(context.Background(), cfg.ConnectTimeout)
defer cancel()
pool, err := pgxpool.NewWithConfig(ctx, poolCfg)
if err != nil {
return nil, fmt.Errorf("create pool: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("ping db: %w", err)
}
return pool, nil
}
// ConnectURL — короткая форма Connect(PoolConfig{DatabaseURL: url}).
// Удобно для сервисов, которым не нужны кастомные лимиты.
func ConnectURL(url string) (*pgxpool.Pool, error) {
cfg := Defaults()
cfg.DatabaseURL = url
return Connect(cfg)
}
// ConnectWithRetry — Connect с exponential-backoff'ом для k8s-rollout race:
// pod может стартануть до того, как postgres StatefulSet готов. Без
// ретраев это даёт CrashLoopBackoff с noisy логами; с ретраями — pod
// мирно ждёт N попыток.
//
// Backoff: 1s, 2s, 4s, 8s, ..., cap 15s. Возвращает первый успешный pool
// или последнюю ошибку после totalTimeout. Используется telephony — там
// rollout-race реально срабатывал; остальные сервисы могут перейти при
// необходимости.
func ConnectWithRetry(ctx context.Context, url string, totalTimeout time.Duration) (*pgxpool.Pool, error) {
cfg := Defaults()
cfg.DatabaseURL = url
return ConnectWithRetryCfg(ctx, cfg, totalTimeout)
}
// ConnectWithRetryCfg — то же, что ConnectWithRetry, но с кастомным
// PoolConfig (для сервисов, которым нужны нестандартные лимиты + retry).
func ConnectWithRetryCfg(ctx context.Context, cfg PoolConfig, totalTimeout time.Duration) (*pgxpool.Pool, error) {
if totalTimeout <= 0 {
totalTimeout = 2 * time.Minute
}
deadline := time.Now().Add(totalTimeout)
delay := time.Second
var lastErr error
for attempt := 1; ; attempt++ {
pool, err := Connect(cfg)
if err == nil {
if attempt > 1 {
slog.Info("db connected after retries", "attempt", attempt)
}
return pool, nil
}
lastErr = err
if time.Now().Add(delay).After(deadline) {
return nil, fmt.Errorf("connect db (after %d attempts): %w", attempt, err)
}
slog.Warn("db connect failed, retrying", "attempt", attempt, "delay", delay, "error", err)
select {
case <-ctx.Done():
return nil, fmt.Errorf("ctx cancelled while waiting to retry db: %w (last attempt: %v)", ctx.Err(), lastErr)
case <-time.After(delay):
}
delay *= 2
if delay > 15*time.Second {
delay = 15 * time.Second
}
}
}
// slowQueryTracer пишет WARN в slog'е для запросов, длиннее threshold.
// Логируется duration_ms, обрезанный SQL и error (если был). Не префиксируем
// сервисом — это делает caller через slog.With() на старте процесса.
type slowQueryTracer struct{ threshold time.Duration }
type traceCtxKey struct{}
type traceState struct {
start time.Time
sql string
}
func (t *slowQueryTracer) TraceQueryStart(ctx context.Context, _ *pgx.Conn, data pgx.TraceQueryStartData) context.Context {
return context.WithValue(ctx, traceCtxKey{}, traceState{start: time.Now(), sql: data.SQL})
}
func (t *slowQueryTracer) TraceQueryEnd(ctx context.Context, _ *pgx.Conn, data pgx.TraceQueryEndData) {
st, ok := ctx.Value(traceCtxKey{}).(traceState)
if !ok {
return
}
dur := time.Since(st.start)
if dur < t.threshold {
return
}
sql := st.sql
if len(sql) > 500 {
sql = sql[:500] + "…"
}
attrs := []any{"duration_ms", dur.Milliseconds(), "sql", sql}
if data.Err != nil {
attrs = append(attrs, "error", data.Err)
}
slog.Warn("slow query", attrs...)
}