Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
15a8804307 |
49
db/pool.go
49
db/pool.go
@@ -104,6 +104,55 @@ func ConnectURL(url string) (*pgxpool.Pool, error) {
|
|||||||
return Connect(cfg)
|
return Connect(cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ConnectWithRetry — Connect с exponential-backoff'ом для k8s-rollout race:
|
||||||
|
// pod может стартануть до того, как postgres StatefulSet готов. Без
|
||||||
|
// ретраев это даёт CrashLoopBackoff с noisy логами; с ретраями — pod
|
||||||
|
// мирно ждёт N попыток.
|
||||||
|
//
|
||||||
|
// Backoff: 1s, 2s, 4s, 8s, ..., cap 15s. Возвращает первый успешный pool
|
||||||
|
// или последнюю ошибку после totalTimeout. Используется telephony — там
|
||||||
|
// rollout-race реально срабатывал; остальные сервисы могут перейти при
|
||||||
|
// необходимости.
|
||||||
|
func ConnectWithRetry(ctx context.Context, url string, totalTimeout time.Duration) (*pgxpool.Pool, error) {
|
||||||
|
cfg := Defaults()
|
||||||
|
cfg.DatabaseURL = url
|
||||||
|
return ConnectWithRetryCfg(ctx, cfg, totalTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConnectWithRetryCfg — то же, что ConnectWithRetry, но с кастомным
|
||||||
|
// PoolConfig (для сервисов, которым нужны нестандартные лимиты + retry).
|
||||||
|
func ConnectWithRetryCfg(ctx context.Context, cfg PoolConfig, totalTimeout time.Duration) (*pgxpool.Pool, error) {
|
||||||
|
if totalTimeout <= 0 {
|
||||||
|
totalTimeout = 2 * time.Minute
|
||||||
|
}
|
||||||
|
deadline := time.Now().Add(totalTimeout)
|
||||||
|
delay := time.Second
|
||||||
|
var lastErr error
|
||||||
|
for attempt := 1; ; attempt++ {
|
||||||
|
pool, err := Connect(cfg)
|
||||||
|
if err == nil {
|
||||||
|
if attempt > 1 {
|
||||||
|
slog.Info("db connected after retries", "attempt", attempt)
|
||||||
|
}
|
||||||
|
return pool, nil
|
||||||
|
}
|
||||||
|
lastErr = err
|
||||||
|
if time.Now().Add(delay).After(deadline) {
|
||||||
|
return nil, fmt.Errorf("connect db (after %d attempts): %w", attempt, err)
|
||||||
|
}
|
||||||
|
slog.Warn("db connect failed, retrying", "attempt", attempt, "delay", delay, "error", err)
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, fmt.Errorf("ctx cancelled while waiting to retry db: %w (last attempt: %v)", ctx.Err(), lastErr)
|
||||||
|
case <-time.After(delay):
|
||||||
|
}
|
||||||
|
delay *= 2
|
||||||
|
if delay > 15*time.Second {
|
||||||
|
delay = 15 * time.Second
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// slowQueryTracer пишет WARN в slog'е для запросов, длиннее threshold.
|
// slowQueryTracer пишет WARN в slog'е для запросов, длиннее threshold.
|
||||||
// Логируется duration_ms, обрезанный SQL и error (если был). Не префиксируем
|
// Логируется duration_ms, обрезанный SQL и error (если был). Не префиксируем
|
||||||
// сервисом — это делает caller через slog.With() на старте процесса.
|
// сервисом — это делает caller через slog.With() на старте процесса.
|
||||||
|
|||||||
Reference in New Issue
Block a user