From 15a8804307116e143a7603f3f81084a7bf8d3a77 Mon Sep 17 00:00:00 2001 From: Grendgi Date: Wed, 20 May 2026 14:18:03 +0300 Subject: [PATCH] =?UTF-8?q?db:=20ConnectWithRetry=20=D0=B4=D0=BB=D1=8F=20k?= =?UTF-8?q?8s=20rollout=20race?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Telephony раньше имел свой ConnectWithRetry (1s→2s→4s→8s, cap 15s, totalTimeout 2 мин), чтобы пережить старт pod'а до готовности postgres StatefulSet'а. При миграции на portal-common.db.ConnectURL retry потерян, pod CrashLoopBackoff с noisy логами. Добавил ConnectWithRetry(Cfg) с тем же backoff'ом — telephony и любой сервис, которому нужна in-process устойчивость, может использовать вместо ConnectURL. --- db/pool.go | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/db/pool.go b/db/pool.go index 2c9ce49..ca399e4 100644 --- a/db/pool.go +++ b/db/pool.go @@ -104,6 +104,55 @@ func ConnectURL(url string) (*pgxpool.Pool, error) { return Connect(cfg) } +// ConnectWithRetry — Connect с exponential-backoff'ом для k8s-rollout race: +// pod может стартануть до того, как postgres StatefulSet готов. Без +// ретраев это даёт CrashLoopBackoff с noisy логами; с ретраями — pod +// мирно ждёт N попыток. +// +// Backoff: 1s, 2s, 4s, 8s, ..., cap 15s. Возвращает первый успешный pool +// или последнюю ошибку после totalTimeout. Используется telephony — там +// rollout-race реально срабатывал; остальные сервисы могут перейти при +// необходимости. +func ConnectWithRetry(ctx context.Context, url string, totalTimeout time.Duration) (*pgxpool.Pool, error) { + cfg := Defaults() + cfg.DatabaseURL = url + return ConnectWithRetryCfg(ctx, cfg, totalTimeout) +} + +// ConnectWithRetryCfg — то же, что ConnectWithRetry, но с кастомным +// PoolConfig (для сервисов, которым нужны нестандартные лимиты + retry). +func ConnectWithRetryCfg(ctx context.Context, cfg PoolConfig, totalTimeout time.Duration) (*pgxpool.Pool, error) { + if totalTimeout <= 0 { + totalTimeout = 2 * time.Minute + } + deadline := time.Now().Add(totalTimeout) + delay := time.Second + var lastErr error + for attempt := 1; ; attempt++ { + pool, err := Connect(cfg) + if err == nil { + if attempt > 1 { + slog.Info("db connected after retries", "attempt", attempt) + } + return pool, nil + } + lastErr = err + if time.Now().Add(delay).After(deadline) { + return nil, fmt.Errorf("connect db (after %d attempts): %w", attempt, err) + } + slog.Warn("db connect failed, retrying", "attempt", attempt, "delay", delay, "error", err) + select { + case <-ctx.Done(): + return nil, fmt.Errorf("ctx cancelled while waiting to retry db: %w (last attempt: %v)", ctx.Err(), lastErr) + case <-time.After(delay): + } + delay *= 2 + if delay > 15*time.Second { + delay = 15 * time.Second + } + } +} + // slowQueryTracer пишет WARN в slog'е для запросов, длиннее threshold. // Логируется duration_ms, обрезанный SQL и error (если был). Не префиксируем // сервисом — это делает caller через slog.With() на старте процесса.