// Package migrate applies "*.up.sql" schema migrations to a PostgreSQL
// database through a pgx connection pool.
package migrate

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"sort"
	"strings"

	"github.com/jackc/pgx/v5/pgxpool"
)

// Run applies every pending migration found in migrationsDir.
//
// A migration is any file matching "*.up.sql" directly inside migrationsDir;
// its version is the file's base name with the ".up.sql" suffix removed.
// Files are processed in lexical order, and a version already present in the
// schema_migrations table (created on first use) is skipped.
//
// All work happens in a single transaction that first takes a
// transaction-scoped advisory lock, so the bookkeeping table, every migration
// script and every version record are committed together or not at all.
// "applied migration" is logged for each version only after that commit
// succeeds.
//
// Run returns a wrapped error describing the step that failed; in that case
// the transaction is rolled back.
//
// NOTE(review): filepath.Glob ignores I/O errors, so a missing or unreadable
// migrationsDir produces an empty file list and Run returns nil without
// applying anything. Confirm whether callers rely on that.
func Run(ctx context.Context, pool *pgxpool.Pool, migrationsDir string) error {
	tx, err := pool.Begin(ctx)
	if err != nil {
		return fmt.Errorf("begin migration tx: %w", err)
	}
	// Safety net for every early return. After a successful Commit the
	// transaction is already closed and Rollback only reports that, so the
	// error is deliberately discarded.
	defer func() { _ = tx.Rollback(ctx) }()

	// The advisory lock is tied to this transaction; the key is an arbitrary
	// constant identifying the migration lock.
	if _, err := tx.Exec(ctx, `SELECT pg_advisory_xact_lock(8507432101)`); err != nil {
		return fmt.Errorf("acquire migration lock: %w", err)
	}

	_, err = tx.Exec(ctx, `
		CREATE TABLE IF NOT EXISTS schema_migrations (
			version VARCHAR(255) PRIMARY KEY,
			applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
		)
	`)
	if err != nil {
		return fmt.Errorf("create migrations table: %w", err)
	}

	files, err := filepath.Glob(filepath.Join(migrationsDir, "*.up.sql"))
	if err != nil {
		return fmt.Errorf("glob migrations: %w", err)
	}
	// Every path shares the same directory prefix, so sorting the full paths
	// orders the migrations by file name.
	sort.Strings(files)

	// Versions executed in this run; reported only once the commit succeeds.
	var applied []string

	for _, path := range files {
		version := strings.TrimSuffix(filepath.Base(path), ".up.sql")

		var exists bool
		if err := tx.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM schema_migrations WHERE version = $1)`, version).Scan(&exists); err != nil {
			return fmt.Errorf("check migration %s: %w", version, err)
		}
		if exists {
			continue
		}

		script, err := os.ReadFile(path)
		if err != nil {
			return fmt.Errorf("read migration %s: %w", version, err)
		}
		if _, err := tx.Exec(ctx, string(script)); err != nil {
			return fmt.Errorf("apply migration %s: %w", version, err)
		}
		if _, err := tx.Exec(ctx, `INSERT INTO schema_migrations (version) VALUES ($1)`, version); err != nil {
			return fmt.Errorf("record migration %s: %w", version, err)
		}
		applied = append(applied, version)
	}

	if err := tx.Commit(ctx); err != nil {
		return fmt.Errorf("commit migrations: %w", err)
	}

	// Log after the commit: before it, a later failure would roll back
	// migrations that had already been announced as applied.
	for _, version := range applied {
		slog.Info("applied migration", "version", version)
	}
	return nil
}