From 5d721186cd3035d6d38c2f0bf243a51b4b1efbc9 Mon Sep 17 00:00:00 2001 From: Grendgi Date: Tue, 16 Jun 2026 13:11:24 +0300 Subject: [PATCH] fix: serialize files migrations during rollout --- .gitea/workflows/deploy.yaml | 3 +-- internal/migrate/migrate.go | 20 +++++++++++++++----- k8s/server-deployment.yaml | 3 +-- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/.gitea/workflows/deploy.yaml b/.gitea/workflows/deploy.yaml index 3d5b873..d13fa72 100644 --- a/.gitea/workflows/deploy.yaml +++ b/.gitea/workflows/deploy.yaml @@ -55,5 +55,4 @@ jobs: kubectl apply -f k8s/server-service.yaml kubectl -n files set image deployment/files-server \ files-server=${{ env.NODE_REGISTRY }}/admin/files-server:${{ github.sha }} - kubectl -n files rollout status deployment/files-server --timeout=120s - + kubectl -n files rollout status deployment/files-server --timeout=240s diff --git a/internal/migrate/migrate.go b/internal/migrate/migrate.go index 5678d24..ebf889a 100644 --- a/internal/migrate/migrate.go +++ b/internal/migrate/migrate.go @@ -13,7 +13,17 @@ import ( ) func Run(ctx context.Context, pool *pgxpool.Pool, migrationsDir string) error { - _, err := pool.Exec(ctx, ` + tx, err := pool.Begin(ctx) + if err != nil { + return fmt.Errorf("begin migration tx: %w", err) + } + defer tx.Rollback(ctx) + + if _, err := tx.Exec(ctx, `SELECT pg_advisory_xact_lock(8507432101)`); err != nil { + return fmt.Errorf("acquire migration lock: %w", err) + } + + _, err = tx.Exec(ctx, ` CREATE TABLE IF NOT EXISTS schema_migrations ( version VARCHAR(255) PRIMARY KEY, applied_at TIMESTAMPTZ NOT NULL DEFAULT now() @@ -32,7 +42,7 @@ func Run(ctx context.Context, pool *pgxpool.Pool, migrationsDir string) error { for _, f := range files { version := strings.TrimSuffix(filepath.Base(f), ".up.sql") var exists bool - if err := pool.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM schema_migrations WHERE version = $1)`, version).Scan(&exists); err != nil { + if err := tx.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM schema_migrations WHERE version = $1)`, version).Scan(&exists); err != nil { return fmt.Errorf("check migration %s: %w", version, err) } if exists { @@ -42,13 +52,13 @@ func Run(ctx context.Context, pool *pgxpool.Pool, migrationsDir string) error { if err != nil { return fmt.Errorf("read migration %s: %w", version, err) } - if _, err := pool.Exec(ctx, string(sql)); err != nil { + if _, err := tx.Exec(ctx, string(sql)); err != nil { return fmt.Errorf("apply migration %s: %w", version, err) } - if _, err := pool.Exec(ctx, `INSERT INTO schema_migrations (version) VALUES ($1)`, version); err != nil { + if _, err := tx.Exec(ctx, `INSERT INTO schema_migrations (version) VALUES ($1)`, version); err != nil { return fmt.Errorf("record migration %s: %w", version, err) } slog.Info("applied migration", "version", version) } - return nil + return tx.Commit(ctx) } diff --git a/k8s/server-deployment.yaml b/k8s/server-deployment.yaml index f26f93c..cba9d6e 100644 --- a/k8s/server-deployment.yaml +++ b/k8s/server-deployment.yaml @@ -46,7 +46,7 @@ spec: path: /healthz port: 3001 periodSeconds: 5 - failureThreshold: 30 + failureThreshold: 60 livenessProbe: httpGet: path: /healthz @@ -84,4 +84,3 @@ spec: target: type: Utilization averageUtilization: 70 -