Telephony раньше имел свой ConnectWithRetry (1s→2s→4s→8s, cap 15s, totalTimeout 2 мин), чтобы пережить старт pod'а до готовности postgres StatefulSet'а. При миграции на portal-common.db.ConnectURL retry потерян, pod CrashLoopBackoff с noisy логами. Добавил ConnectWithRetry(Cfg) с тем же backoff'ом — telephony и любой сервис, которому нужна in-process устойчивость, может использовать вместо ConnectURL.
190 lines
6.7 KiB
Go
190 lines
6.7 KiB
Go
// Package db — общий pgxpool init для микросервисов: единая конфигурация
|
||
// connection-pool'а, slow-query tracer, retry на startup.
|
||
//
|
||
// История: каждый сервис (booking, candidates, tasks, meet, leaders-reports,
|
||
// telephony, hhru, webhooks-apps, deals) копировал этот код один в один.
|
||
// Разъехавшиеся mode сложно tune'ить централизованно — теперь tweak'аем
|
||
// здесь, а сервисы пересобираются.
|
||
package db
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"log/slog"
|
||
"time"
|
||
|
||
"github.com/jackc/pgx/v5"
|
||
"github.com/jackc/pgx/v5/pgxpool"
|
||
)
|
||
|
||
// PoolConfig — параметры пула. Дефолты идентичны тому, что было в каждом
|
||
// сервисе (MaxConns=10, MinConns=2). Slow-query threshold — 500ms.
|
||
//
|
||
// Если сервис хочет другие лимиты (например, candidates с большим
|
||
// объёмом read'ов), можно передать своё значение; nil/zero — берём
|
||
// default.
|
||
type PoolConfig struct {
|
||
DatabaseURL string
|
||
MaxConns int32
|
||
MinConns int32
|
||
MaxConnIdleTime time.Duration
|
||
MaxConnLifetime time.Duration
|
||
SlowQueryThreshold time.Duration
|
||
ConnectTimeout time.Duration
|
||
}
|
||
|
||
// Defaults возвращает PoolConfig с дефолтными значениями, выставлены
|
||
// только числовые лимиты — DatabaseURL caller подставляет сам.
|
||
func Defaults() PoolConfig {
|
||
return PoolConfig{
|
||
MaxConns: 10,
|
||
MinConns: 2,
|
||
MaxConnIdleTime: 5 * time.Minute,
|
||
MaxConnLifetime: 30 * time.Minute,
|
||
SlowQueryThreshold: 500 * time.Millisecond,
|
||
ConnectTimeout: 10 * time.Second,
|
||
}
|
||
}
|
||
|
||
// Connect открывает pgxpool с заданной конфигурацией и tracer'ом slow-queries.
|
||
// Возвращает live-pool после успешного Ping; на ошибке pool закрывается.
|
||
func Connect(cfg PoolConfig) (*pgxpool.Pool, error) {
|
||
if cfg.DatabaseURL == "" {
|
||
return nil, fmt.Errorf("DatabaseURL is required")
|
||
}
|
||
def := Defaults()
|
||
if cfg.MaxConns == 0 {
|
||
cfg.MaxConns = def.MaxConns
|
||
}
|
||
if cfg.MinConns == 0 {
|
||
cfg.MinConns = def.MinConns
|
||
}
|
||
if cfg.MaxConnIdleTime == 0 {
|
||
cfg.MaxConnIdleTime = def.MaxConnIdleTime
|
||
}
|
||
if cfg.MaxConnLifetime == 0 {
|
||
cfg.MaxConnLifetime = def.MaxConnLifetime
|
||
}
|
||
if cfg.SlowQueryThreshold == 0 {
|
||
cfg.SlowQueryThreshold = def.SlowQueryThreshold
|
||
}
|
||
if cfg.ConnectTimeout == 0 {
|
||
cfg.ConnectTimeout = def.ConnectTimeout
|
||
}
|
||
|
||
poolCfg, err := pgxpool.ParseConfig(cfg.DatabaseURL)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("parse db config: %w", err)
|
||
}
|
||
poolCfg.MaxConns = cfg.MaxConns
|
||
poolCfg.MinConns = cfg.MinConns
|
||
poolCfg.MaxConnIdleTime = cfg.MaxConnIdleTime
|
||
poolCfg.MaxConnLifetime = cfg.MaxConnLifetime
|
||
poolCfg.ConnConfig.Tracer = &slowQueryTracer{threshold: cfg.SlowQueryThreshold}
|
||
|
||
ctx, cancel := context.WithTimeout(context.Background(), cfg.ConnectTimeout)
|
||
defer cancel()
|
||
|
||
pool, err := pgxpool.NewWithConfig(ctx, poolCfg)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("create pool: %w", err)
|
||
}
|
||
if err := pool.Ping(ctx); err != nil {
|
||
pool.Close()
|
||
return nil, fmt.Errorf("ping db: %w", err)
|
||
}
|
||
return pool, nil
|
||
}
|
||
|
||
// ConnectURL — короткая форма Connect(PoolConfig{DatabaseURL: url}).
|
||
// Удобно для сервисов, которым не нужны кастомные лимиты.
|
||
func ConnectURL(url string) (*pgxpool.Pool, error) {
|
||
cfg := Defaults()
|
||
cfg.DatabaseURL = url
|
||
return Connect(cfg)
|
||
}
|
||
|
||
// ConnectWithRetry — Connect с exponential-backoff'ом для k8s-rollout race:
|
||
// pod может стартануть до того, как postgres StatefulSet готов. Без
|
||
// ретраев это даёт CrashLoopBackoff с noisy логами; с ретраями — pod
|
||
// мирно ждёт N попыток.
|
||
//
|
||
// Backoff: 1s, 2s, 4s, 8s, ..., cap 15s. Возвращает первый успешный pool
|
||
// или последнюю ошибку после totalTimeout. Используется telephony — там
|
||
// rollout-race реально срабатывал; остальные сервисы могут перейти при
|
||
// необходимости.
|
||
func ConnectWithRetry(ctx context.Context, url string, totalTimeout time.Duration) (*pgxpool.Pool, error) {
|
||
cfg := Defaults()
|
||
cfg.DatabaseURL = url
|
||
return ConnectWithRetryCfg(ctx, cfg, totalTimeout)
|
||
}
|
||
|
||
// ConnectWithRetryCfg — то же, что ConnectWithRetry, но с кастомным
|
||
// PoolConfig (для сервисов, которым нужны нестандартные лимиты + retry).
|
||
func ConnectWithRetryCfg(ctx context.Context, cfg PoolConfig, totalTimeout time.Duration) (*pgxpool.Pool, error) {
|
||
if totalTimeout <= 0 {
|
||
totalTimeout = 2 * time.Minute
|
||
}
|
||
deadline := time.Now().Add(totalTimeout)
|
||
delay := time.Second
|
||
var lastErr error
|
||
for attempt := 1; ; attempt++ {
|
||
pool, err := Connect(cfg)
|
||
if err == nil {
|
||
if attempt > 1 {
|
||
slog.Info("db connected after retries", "attempt", attempt)
|
||
}
|
||
return pool, nil
|
||
}
|
||
lastErr = err
|
||
if time.Now().Add(delay).After(deadline) {
|
||
return nil, fmt.Errorf("connect db (after %d attempts): %w", attempt, err)
|
||
}
|
||
slog.Warn("db connect failed, retrying", "attempt", attempt, "delay", delay, "error", err)
|
||
select {
|
||
case <-ctx.Done():
|
||
return nil, fmt.Errorf("ctx cancelled while waiting to retry db: %w (last attempt: %v)", ctx.Err(), lastErr)
|
||
case <-time.After(delay):
|
||
}
|
||
delay *= 2
|
||
if delay > 15*time.Second {
|
||
delay = 15 * time.Second
|
||
}
|
||
}
|
||
}
|
||
|
||
// slowQueryTracer пишет WARN в slog'е для запросов, длиннее threshold.
|
||
// Логируется duration_ms, обрезанный SQL и error (если был). Не префиксируем
|
||
// сервисом — это делает caller через slog.With() на старте процесса.
|
||
type slowQueryTracer struct{ threshold time.Duration }
|
||
|
||
type traceCtxKey struct{}
|
||
type traceState struct {
|
||
start time.Time
|
||
sql string
|
||
}
|
||
|
||
func (t *slowQueryTracer) TraceQueryStart(ctx context.Context, _ *pgx.Conn, data pgx.TraceQueryStartData) context.Context {
|
||
return context.WithValue(ctx, traceCtxKey{}, traceState{start: time.Now(), sql: data.SQL})
|
||
}
|
||
|
||
func (t *slowQueryTracer) TraceQueryEnd(ctx context.Context, _ *pgx.Conn, data pgx.TraceQueryEndData) {
|
||
st, ok := ctx.Value(traceCtxKey{}).(traceState)
|
||
if !ok {
|
||
return
|
||
}
|
||
dur := time.Since(st.start)
|
||
if dur < t.threshold {
|
||
return
|
||
}
|
||
sql := st.sql
|
||
if len(sql) > 500 {
|
||
sql = sql[:500] + "…"
|
||
}
|
||
attrs := []any{"duration_ms", dur.Milliseconds(), "sql", sql}
|
||
if data.Err != nil {
|
||
attrs = append(attrs, "error", data.Err)
|
||
}
|
||
slog.Warn("slow query", attrs...)
|
||
}
|