141 lines
4.7 KiB
Go
141 lines
4.7 KiB
Go
// Package db — общий pgxpool init для микросервисов: единая конфигурация
|
||
// connection-pool'а, slow-query tracer, retry на startup.
|
||
//
|
||
// История: каждый сервис (booking, candidates, tasks, meet, leaders-reports,
|
||
// telephony, hhru, webhooks-apps, deals) копировал этот код один в один.
|
||
// Разъехавшиеся mode сложно tune'ить централизованно — теперь tweak'аем
|
||
// здесь, а сервисы пересобираются.
|
||
package db
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"log/slog"
|
||
"time"
|
||
|
||
"github.com/jackc/pgx/v5"
|
||
"github.com/jackc/pgx/v5/pgxpool"
|
||
)
|
||
|
||
// PoolConfig — параметры пула. Дефолты идентичны тому, что было в каждом
|
||
// сервисе (MaxConns=10, MinConns=2). Slow-query threshold — 500ms.
|
||
//
|
||
// Если сервис хочет другие лимиты (например, candidates с большим
|
||
// объёмом read'ов), можно передать своё значение; nil/zero — берём
|
||
// default.
|
||
type PoolConfig struct {
|
||
DatabaseURL string
|
||
MaxConns int32
|
||
MinConns int32
|
||
MaxConnIdleTime time.Duration
|
||
MaxConnLifetime time.Duration
|
||
SlowQueryThreshold time.Duration
|
||
ConnectTimeout time.Duration
|
||
}
|
||
|
||
// Defaults возвращает PoolConfig с дефолтными значениями, выставлены
|
||
// только числовые лимиты — DatabaseURL caller подставляет сам.
|
||
func Defaults() PoolConfig {
|
||
return PoolConfig{
|
||
MaxConns: 10,
|
||
MinConns: 2,
|
||
MaxConnIdleTime: 5 * time.Minute,
|
||
MaxConnLifetime: 30 * time.Minute,
|
||
SlowQueryThreshold: 500 * time.Millisecond,
|
||
ConnectTimeout: 10 * time.Second,
|
||
}
|
||
}
|
||
|
||
// Connect открывает pgxpool с заданной конфигурацией и tracer'ом slow-queries.
|
||
// Возвращает live-pool после успешного Ping; на ошибке pool закрывается.
|
||
func Connect(cfg PoolConfig) (*pgxpool.Pool, error) {
|
||
if cfg.DatabaseURL == "" {
|
||
return nil, fmt.Errorf("DatabaseURL is required")
|
||
}
|
||
def := Defaults()
|
||
if cfg.MaxConns == 0 {
|
||
cfg.MaxConns = def.MaxConns
|
||
}
|
||
if cfg.MinConns == 0 {
|
||
cfg.MinConns = def.MinConns
|
||
}
|
||
if cfg.MaxConnIdleTime == 0 {
|
||
cfg.MaxConnIdleTime = def.MaxConnIdleTime
|
||
}
|
||
if cfg.MaxConnLifetime == 0 {
|
||
cfg.MaxConnLifetime = def.MaxConnLifetime
|
||
}
|
||
if cfg.SlowQueryThreshold == 0 {
|
||
cfg.SlowQueryThreshold = def.SlowQueryThreshold
|
||
}
|
||
if cfg.ConnectTimeout == 0 {
|
||
cfg.ConnectTimeout = def.ConnectTimeout
|
||
}
|
||
|
||
poolCfg, err := pgxpool.ParseConfig(cfg.DatabaseURL)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("parse db config: %w", err)
|
||
}
|
||
poolCfg.MaxConns = cfg.MaxConns
|
||
poolCfg.MinConns = cfg.MinConns
|
||
poolCfg.MaxConnIdleTime = cfg.MaxConnIdleTime
|
||
poolCfg.MaxConnLifetime = cfg.MaxConnLifetime
|
||
poolCfg.ConnConfig.Tracer = &slowQueryTracer{threshold: cfg.SlowQueryThreshold}
|
||
|
||
ctx, cancel := context.WithTimeout(context.Background(), cfg.ConnectTimeout)
|
||
defer cancel()
|
||
|
||
pool, err := pgxpool.NewWithConfig(ctx, poolCfg)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("create pool: %w", err)
|
||
}
|
||
if err := pool.Ping(ctx); err != nil {
|
||
pool.Close()
|
||
return nil, fmt.Errorf("ping db: %w", err)
|
||
}
|
||
return pool, nil
|
||
}
|
||
|
||
// ConnectURL — короткая форма Connect(PoolConfig{DatabaseURL: url}).
|
||
// Удобно для сервисов, которым не нужны кастомные лимиты.
|
||
func ConnectURL(url string) (*pgxpool.Pool, error) {
|
||
cfg := Defaults()
|
||
cfg.DatabaseURL = url
|
||
return Connect(cfg)
|
||
}
|
||
|
||
// slowQueryTracer пишет WARN в slog'е для запросов, длиннее threshold.
|
||
// Логируется duration_ms, обрезанный SQL и error (если был). Не префиксируем
|
||
// сервисом — это делает caller через slog.With() на старте процесса.
|
||
type slowQueryTracer struct{ threshold time.Duration }
|
||
|
||
type traceCtxKey struct{}
|
||
type traceState struct {
|
||
start time.Time
|
||
sql string
|
||
}
|
||
|
||
func (t *slowQueryTracer) TraceQueryStart(ctx context.Context, _ *pgx.Conn, data pgx.TraceQueryStartData) context.Context {
|
||
return context.WithValue(ctx, traceCtxKey{}, traceState{start: time.Now(), sql: data.SQL})
|
||
}
|
||
|
||
func (t *slowQueryTracer) TraceQueryEnd(ctx context.Context, _ *pgx.Conn, data pgx.TraceQueryEndData) {
|
||
st, ok := ctx.Value(traceCtxKey{}).(traceState)
|
||
if !ok {
|
||
return
|
||
}
|
||
dur := time.Since(st.start)
|
||
if dur < t.threshold {
|
||
return
|
||
}
|
||
sql := st.sql
|
||
if len(sql) > 500 {
|
||
sql = sql[:500] + "…"
|
||
}
|
||
attrs := []any{"duration_ms", dur.Milliseconds(), "sql", sql}
|
||
if data.Err != nil {
|
||
attrs = append(attrs, "error", data.Err)
|
||
}
|
||
slog.Warn("slow query", attrs...)
|
||
}
|