fix: serialize files migrations during rollout
This commit is contained in:
@@ -55,5 +55,4 @@ jobs:
|
|||||||
kubectl apply -f k8s/server-service.yaml
|
kubectl apply -f k8s/server-service.yaml
|
||||||
kubectl -n files set image deployment/files-server \
|
kubectl -n files set image deployment/files-server \
|
||||||
files-server=${{ env.NODE_REGISTRY }}/admin/files-server:${{ github.sha }}
|
files-server=${{ env.NODE_REGISTRY }}/admin/files-server:${{ github.sha }}
|
||||||
kubectl -n files rollout status deployment/files-server --timeout=120s
|
kubectl -n files rollout status deployment/files-server --timeout=240s
|
||||||
|
|
||||||
|
|||||||
@@ -13,7 +13,17 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func Run(ctx context.Context, pool *pgxpool.Pool, migrationsDir string) error {
|
func Run(ctx context.Context, pool *pgxpool.Pool, migrationsDir string) error {
|
||||||
_, err := pool.Exec(ctx, `
|
tx, err := pool.Begin(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("begin migration tx: %w", err)
|
||||||
|
}
|
||||||
|
defer tx.Rollback(ctx)
|
||||||
|
|
||||||
|
if _, err := tx.Exec(ctx, `SELECT pg_advisory_xact_lock(8507432101)`); err != nil {
|
||||||
|
return fmt.Errorf("acquire migration lock: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.Exec(ctx, `
|
||||||
CREATE TABLE IF NOT EXISTS schema_migrations (
|
CREATE TABLE IF NOT EXISTS schema_migrations (
|
||||||
version VARCHAR(255) PRIMARY KEY,
|
version VARCHAR(255) PRIMARY KEY,
|
||||||
applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||||
@@ -32,7 +42,7 @@ func Run(ctx context.Context, pool *pgxpool.Pool, migrationsDir string) error {
|
|||||||
for _, f := range files {
|
for _, f := range files {
|
||||||
version := strings.TrimSuffix(filepath.Base(f), ".up.sql")
|
version := strings.TrimSuffix(filepath.Base(f), ".up.sql")
|
||||||
var exists bool
|
var exists bool
|
||||||
if err := pool.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM schema_migrations WHERE version = $1)`, version).Scan(&exists); err != nil {
|
if err := tx.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM schema_migrations WHERE version = $1)`, version).Scan(&exists); err != nil {
|
||||||
return fmt.Errorf("check migration %s: %w", version, err)
|
return fmt.Errorf("check migration %s: %w", version, err)
|
||||||
}
|
}
|
||||||
if exists {
|
if exists {
|
||||||
@@ -42,13 +52,13 @@ func Run(ctx context.Context, pool *pgxpool.Pool, migrationsDir string) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("read migration %s: %w", version, err)
|
return fmt.Errorf("read migration %s: %w", version, err)
|
||||||
}
|
}
|
||||||
if _, err := pool.Exec(ctx, string(sql)); err != nil {
|
if _, err := tx.Exec(ctx, string(sql)); err != nil {
|
||||||
return fmt.Errorf("apply migration %s: %w", version, err)
|
return fmt.Errorf("apply migration %s: %w", version, err)
|
||||||
}
|
}
|
||||||
if _, err := pool.Exec(ctx, `INSERT INTO schema_migrations (version) VALUES ($1)`, version); err != nil {
|
if _, err := tx.Exec(ctx, `INSERT INTO schema_migrations (version) VALUES ($1)`, version); err != nil {
|
||||||
return fmt.Errorf("record migration %s: %w", version, err)
|
return fmt.Errorf("record migration %s: %w", version, err)
|
||||||
}
|
}
|
||||||
slog.Info("applied migration", "version", version)
|
slog.Info("applied migration", "version", version)
|
||||||
}
|
}
|
||||||
return nil
|
return tx.Commit(ctx)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -46,7 +46,7 @@ spec:
|
|||||||
path: /healthz
|
path: /healthz
|
||||||
port: 3001
|
port: 3001
|
||||||
periodSeconds: 5
|
periodSeconds: 5
|
||||||
failureThreshold: 30
|
failureThreshold: 60
|
||||||
livenessProbe:
|
livenessProbe:
|
||||||
httpGet:
|
httpGet:
|
||||||
path: /healthz
|
path: /healthz
|
||||||
@@ -84,4 +84,3 @@ spec:
|
|||||||
target:
|
target:
|
||||||
type: Utilization
|
type: Utilization
|
||||||
averageUtilization: 70
|
averageUtilization: 70
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user