commit e2f2adf90020b3f9ebc9e7f5381bb9110f379411 Author: Grendgi Date: Mon Jun 8 13:23:10 2026 +0300 Initial AI service skeleton diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..751a610 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +bin/ +dist/ +tmp/ +.DS_Store diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..0020d8d --- /dev/null +++ b/Dockerfile @@ -0,0 +1,23 @@ +FROM golang:1.25-alpine AS builder + +WORKDIR /src + +COPY go.mod go.sum ./ +RUN go mod download + +COPY cmd ./cmd +COPY internal ./internal + +RUN CGO_ENABLED=0 GOOS=linux go build -o /out/ai-service ./cmd/server + +FROM alpine:3.22 + +RUN apk add --no-cache ca-certificates tini + +WORKDIR /app +COPY --from=builder /out/ai-service /usr/local/bin/ai-service + +EXPOSE 8080 + +ENTRYPOINT ["/sbin/tini", "--"] +CMD ["/usr/local/bin/ai-service"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..a746f21 --- /dev/null +++ b/README.md @@ -0,0 +1,29 @@ +# AI Service + +Technical AI job service for Portal workloads. + +The first version owns only AI job lifecycle and metrics. Business data stays in +domain services such as `telephony`, `monitoring-tg` and `monitoring-pf`. + +## API + +- `POST /api/v1/jobs` creates one job. +- `POST /api/v1/jobs/batch` creates many jobs with shared defaults. +- `GET /api/v1/jobs/{id}` returns technical job state and result. +- `POST /api/v1/jobs/{id}/retry` resets failed/running jobs to `pending`. +- `GET /api/v1/stats` returns queue and error counters. +- `GET /healthz` returns process health. +- `GET /readyz` checks PostgreSQL readiness. + +## Configuration + +- `HTTP_HOST`, default `0.0.0.0` +- `HTTP_PORT`, default `8080` +- `DATABASE_URL`, required +- `MIGRATE_ON_START`, default `true` + +## Next integration step + +`telephony` should first mirror low-risk analysis jobs into this service while +continuing local processing. Remote execution can then be enabled by feature +flag per task type. diff --git a/cmd/server/main.go b/cmd/server/main.go new file mode 100644 index 0000000..04b760d --- /dev/null +++ b/cmd/server/main.go @@ -0,0 +1,59 @@ +package main + +import ( + "context" + "errors" + "fmt" + "log/slog" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "ai-service/internal/config" + "ai-service/internal/httpapi" + "ai-service/internal/migrate" + "ai-service/internal/store" +) + +func main() { + cfg := config.Load() + slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil))) + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + db, err := store.Open(ctx, cfg.DatabaseURL) + if err != nil { + slog.Error("db_open_failed", "error", err) + os.Exit(1) + } + defer db.Close() + + if cfg.MigrateOnStart { + if err := migrate.Up(ctx, db); err != nil { + slog.Error("migrate_failed", "error", err) + os.Exit(1) + } + } + + server := &http.Server{ + Addr: fmt.Sprintf("%s:%d", cfg.HTTPHost, cfg.HTTPPort), + Handler: httpapi.NewServer(db, cfg), + ReadHeaderTimeout: 10 * time.Second, + } + + go func() { + <-ctx.Done() + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + _ = server.Shutdown(shutdownCtx) + }() + + slog.Info("ai_service_started", "addr", server.Addr) + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + slog.Error("server_failed", "error", err) + os.Exit(1) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..bf74abb --- /dev/null +++ b/go.mod @@ -0,0 +1,17 @@ +module ai-service + +go 1.25.0 + +require ( + github.com/google/uuid v1.6.0 + github.com/jackc/pgx/v5 v5.7.6 +) + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + golang.org/x/crypto v0.37.0 // indirect + golang.org/x/sync v0.13.0 // indirect + golang.org/x/text v0.24.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..084d1d3 --- /dev/null +++ b/go.sum @@ -0,0 +1,30 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk= +github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= +golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= +golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610= +golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..c24140b --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,53 @@ +package config + +import ( + "os" + "strconv" +) + +type Config struct { + HTTPHost string + HTTPPort int + DatabaseURL string + MigrateOnStart bool +} + +func Load() Config { + return Config{ + HTTPHost: envString("HTTP_HOST", "0.0.0.0"), + HTTPPort: envInt("HTTP_PORT", 8080), + DatabaseURL: envString("DATABASE_URL", ""), + MigrateOnStart: envBool("MIGRATE_ON_START", true), + } +} + +func envString(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} + +func envInt(key string, fallback int) int { + raw := os.Getenv(key) + if raw == "" { + return fallback + } + v, err := strconv.Atoi(raw) + if err != nil { + return fallback + } + return v +} + +func envBool(key string, fallback bool) bool { + raw := os.Getenv(key) + if raw == "" { + return fallback + } + v, err := strconv.ParseBool(raw) + if err != nil { + return fallback + } + return v +} diff --git a/internal/httpapi/helpers.go b/internal/httpapi/helpers.go new file mode 100644 index 0000000..9da2ed5 --- /dev/null +++ b/internal/httpapi/helpers.go @@ -0,0 +1,22 @@ +package httpapi + +import ( + "context" + "encoding/json" + "net/http" + "time" +) + +func contextWithTimeout(r *http.Request, timeout time.Duration) (context.Context, context.CancelFunc) { + return context.WithTimeout(r.Context(), timeout) +} + +func writeJSON(w http.ResponseWriter, status int, payload any) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(payload) +} + +func writeError(w http.ResponseWriter, status int, message string) { + writeJSON(w, status, map[string]string{"error": message}) +} diff --git a/internal/httpapi/server.go b/internal/httpapi/server.go new file mode 100644 index 0000000..74fc4d9 --- /dev/null +++ b/internal/httpapi/server.go @@ -0,0 +1,205 @@ +package httpapi + +import ( + "encoding/json" + "errors" + "net/http" + "strings" + "time" + + "ai-service/internal/config" + "ai-service/internal/model" + "ai-service/internal/store" + + "github.com/google/uuid" +) + +type Server struct { + store *store.Store + cfg config.Config +} + +func NewServer(store *store.Store, cfg config.Config) http.Handler { + return &Server{store: store, cfg: cfg} +} + +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + path := strings.TrimSuffix(r.URL.Path, "/") + if path == "" { + path = "/" + } + switch { + case r.Method == http.MethodGet && path == "/healthz": + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) + case r.Method == http.MethodGet && path == "/readyz": + s.handleReady(w, r) + case r.Method == http.MethodGet && path == "/": + writeJSON(w, http.StatusOK, map[string]string{"service": "ai-service"}) + case r.Method == http.MethodPost && path == "/api/v1/jobs": + s.handleCreateJob(w, r) + case r.Method == http.MethodPost && path == "/api/v1/jobs/batch": + s.handleCreateBatch(w, r) + case r.Method == http.MethodGet && strings.HasPrefix(path, "/api/v1/jobs/"): + s.handleGetJob(w, r, path) + case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/retry"): + s.handleRetryJob(w, r, path) + case r.Method == http.MethodGet && path == "/api/v1/stats": + s.handleStats(w, r) + default: + writeError(w, http.StatusNotFound, "not found") + } +} + +func (s *Server) handleReady(w http.ResponseWriter, r *http.Request) { + ctx, cancel := contextWithTimeout(r, 3*time.Second) + defer cancel() + if err := s.store.Ping(ctx); err != nil { + writeError(w, http.StatusServiceUnavailable, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]string{"status": "ready"}) +} + +func (s *Server) handleCreateJob(w http.ResponseWriter, r *http.Request) { + var req model.CreateJob + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "bad json") + return + } + ctx, cancel := contextWithTimeout(r, 8*time.Second) + defer cancel() + job, err := s.store.CreateJob(ctx, req) + if err != nil { + status := http.StatusInternalServerError + if isValidationError(err) { + status = http.StatusBadRequest + } + writeError(w, status, err.Error()) + return + } + writeJSON(w, http.StatusCreated, job) +} + +type createBatchRequest struct { + OwnerService string `json:"owner_service"` + TaskType string `json:"task_type"` + ModelProfile string `json:"model_profile"` + Priority int `json:"priority"` + MaxAttempts int `json:"max_attempts"` + Jobs []model.CreateJob `json:"jobs"` +} + +type createBatchResponse struct { + Jobs []*model.Job `json:"jobs"` +} + +func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) { + var req createBatchRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "bad json") + return + } + if len(req.Jobs) == 0 { + writeError(w, http.StatusBadRequest, "jobs is required") + return + } + ctx, cancel := contextWithTimeout(r, 20*time.Second) + defer cancel() + out := createBatchResponse{Jobs: make([]*model.Job, 0, len(req.Jobs))} + for _, item := range req.Jobs { + if item.OwnerService == "" { + item.OwnerService = req.OwnerService + } + if item.TaskType == "" { + item.TaskType = req.TaskType + } + if item.ModelProfile == "" { + item.ModelProfile = req.ModelProfile + } + if item.Priority == 0 { + item.Priority = req.Priority + } + if item.MaxAttempts == 0 { + item.MaxAttempts = req.MaxAttempts + } + job, err := s.store.CreateJob(ctx, item) + if err != nil { + status := http.StatusInternalServerError + if isValidationError(err) { + status = http.StatusBadRequest + } + writeError(w, status, err.Error()) + return + } + out.Jobs = append(out.Jobs, job) + } + writeJSON(w, http.StatusCreated, out) +} + +func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request, path string) { + id, err := jobIDFromPath(path, false) + if err != nil { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + ctx, cancel := contextWithTimeout(r, 8*time.Second) + defer cancel() + job, err := s.store.GetJob(ctx, id) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + if job == nil { + writeError(w, http.StatusNotFound, "job not found") + return + } + writeJSON(w, http.StatusOK, job) +} + +func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path string) { + id, err := jobIDFromPath(path, true) + if err != nil { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + ctx, cancel := contextWithTimeout(r, 8*time.Second) + defer cancel() + job, err := s.store.RetryJob(ctx, id) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + if job == nil { + writeError(w, http.StatusNotFound, "retryable job not found") + return + } + writeJSON(w, http.StatusOK, job) +} + +func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) { + ctx, cancel := contextWithTimeout(r, 8*time.Second) + defer cancel() + stats, err := s.store.Stats(ctx) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, stats) +} + +func jobIDFromPath(path string, retry bool) (uuid.UUID, error) { + raw := strings.TrimPrefix(path, "/api/v1/jobs/") + if retry { + raw = strings.TrimSuffix(raw, "/retry") + } + id, err := uuid.Parse(strings.Trim(raw, "/")) + if err != nil { + return uuid.Nil, errors.New("bad job id") + } + return id, nil +} + +func isValidationError(err error) bool { + msg := err.Error() + return strings.Contains(msg, " is required") +} diff --git a/internal/migrate/migrate.go b/internal/migrate/migrate.go new file mode 100644 index 0000000..42266af --- /dev/null +++ b/internal/migrate/migrate.go @@ -0,0 +1,38 @@ +package migrate + +import ( + "context" + "embed" + "fmt" + "sort" + "strings" + + "ai-service/internal/store" +) + +//go:embed sql/*.up.sql +var migrationFiles embed.FS + +func Up(ctx context.Context, db *store.Store) error { + entries, err := migrationFiles.ReadDir("sql") + if err != nil { + return err + } + var names []string + for _, entry := range entries { + if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".up.sql") { + names = append(names, entry.Name()) + } + } + sort.Strings(names) + for _, name := range names { + body, err := migrationFiles.ReadFile("sql/" + name) + if err != nil { + return err + } + if err := db.Exec(ctx, string(body)); err != nil { + return fmt.Errorf("%s: %w", name, err) + } + } + return nil +} diff --git a/internal/migrate/sql/001_ai_jobs.up.sql b/internal/migrate/sql/001_ai_jobs.up.sql new file mode 100644 index 0000000..5fb930a --- /dev/null +++ b/internal/migrate/sql/001_ai_jobs.up.sql @@ -0,0 +1,39 @@ +CREATE EXTENSION IF NOT EXISTS pgcrypto; + +CREATE TABLE IF NOT EXISTS ai_jobs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + owner_service TEXT NOT NULL, + owner_ref TEXT NOT NULL, + task_type TEXT NOT NULL, + model_profile TEXT NOT NULL, + priority INTEGER NOT NULL DEFAULT 0, + status TEXT NOT NULL DEFAULT 'pending' + CHECK (status IN ('pending', 'running', 'done', 'failed', 'cancelled')), + attempts INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 3, + input JSONB NOT NULL DEFAULT '{}'::jsonb, + result JSONB, + error_code TEXT, + error_message TEXT, + scheduled_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + started_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + idempotency_key TEXT +); + +CREATE UNIQUE INDEX IF NOT EXISTS ai_jobs_idempotency_key_idx + ON ai_jobs (idempotency_key) + WHERE idempotency_key IS NOT NULL; + +CREATE INDEX IF NOT EXISTS ai_jobs_queue_idx + ON ai_jobs (status, priority DESC, scheduled_at ASC, created_at ASC) + WHERE status IN ('pending', 'running'); + +CREATE INDEX IF NOT EXISTS ai_jobs_owner_idx + ON ai_jobs (owner_service, owner_ref); + +CREATE INDEX IF NOT EXISTS ai_jobs_error_idx + ON ai_jobs (task_type, model_profile, error_code, updated_at DESC) + WHERE status = 'failed'; diff --git a/internal/model/job.go b/internal/model/job.go new file mode 100644 index 0000000..4ee821b --- /dev/null +++ b/internal/model/job.go @@ -0,0 +1,71 @@ +package model + +import ( + "encoding/json" + "time" + + "github.com/google/uuid" +) + +const ( + StatusPending = "pending" + StatusRunning = "running" + StatusDone = "done" + StatusFailed = "failed" + StatusCancelled = "cancelled" +) + +type Job struct { + ID uuid.UUID `json:"id"` + OwnerService string `json:"owner_service"` + OwnerRef string `json:"owner_ref"` + TaskType string `json:"task_type"` + ModelProfile string `json:"model_profile"` + Priority int `json:"priority"` + Status string `json:"status"` + Attempts int `json:"attempts"` + MaxAttempts int `json:"max_attempts"` + Input json.RawMessage `json:"input"` + Result json.RawMessage `json:"result,omitempty"` + ErrorCode *string `json:"error_code,omitempty"` + ErrorMessage *string `json:"error_message,omitempty"` + ScheduledAt time.Time `json:"scheduled_at"` + StartedAt *time.Time `json:"started_at,omitempty"` + CompletedAt *time.Time `json:"completed_at,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + IdempotencyKey *string `json:"idempotency_key,omitempty"` +} + +type CreateJob struct { + OwnerService string `json:"owner_service"` + OwnerRef string `json:"owner_ref"` + TaskType string `json:"task_type"` + ModelProfile string `json:"model_profile"` + Priority int `json:"priority"` + MaxAttempts int `json:"max_attempts"` + Input json.RawMessage `json:"input"` + ScheduledAt *time.Time `json:"scheduled_at,omitempty"` + IdempotencyKey *string `json:"idempotency_key,omitempty"` +} + +type QueueStat struct { + TaskType string `json:"task_type"` + ModelProfile string `json:"model_profile"` + Status string `json:"status"` + Total int64 `json:"total"` +} + +type ErrorStat struct { + TaskType string `json:"task_type"` + ModelProfile string `json:"model_profile"` + ErrorCode string `json:"error_code"` + Total int64 `json:"total"` + Last24h int64 `json:"last_24h"` +} + +type Stats struct { + At time.Time `json:"at"` + Queues []QueueStat `json:"queues"` + Errors []ErrorStat `json:"errors,omitempty"` +} diff --git a/internal/store/store.go b/internal/store/store.go new file mode 100644 index 0000000..e8fd66f --- /dev/null +++ b/internal/store/store.go @@ -0,0 +1,229 @@ +package store + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + "ai-service/internal/model" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +type Store struct { + pool *pgxpool.Pool +} + +func Open(ctx context.Context, databaseURL string) (*Store, error) { + if strings.TrimSpace(databaseURL) == "" { + return nil, errors.New("DATABASE_URL is required") + } + cfg, err := pgxpool.ParseConfig(databaseURL) + if err != nil { + return nil, fmt.Errorf("parse database url: %w", err) + } + pool, err := pgxpool.NewWithConfig(ctx, cfg) + if err != nil { + return nil, fmt.Errorf("connect postgres: %w", err) + } + if err := pool.Ping(ctx); err != nil { + pool.Close() + return nil, fmt.Errorf("ping postgres: %w", err) + } + return &Store{pool: pool}, nil +} + +func (s *Store) Close() { + s.pool.Close() +} + +func (s *Store) Ping(ctx context.Context) error { + return s.pool.Ping(ctx) +} + +func (s *Store) Exec(ctx context.Context, sql string, args ...any) error { + _, err := s.pool.Exec(ctx, sql, args...) + return err +} + +func (s *Store) CreateJob(ctx context.Context, in model.CreateJob) (*model.Job, error) { + if err := validateCreateJob(in); err != nil { + return nil, err + } + if in.MaxAttempts <= 0 { + in.MaxAttempts = 3 + } + if len(in.Input) == 0 { + in.Input = json.RawMessage(`{}`) + } + scheduledAt := time.Now().UTC() + if in.ScheduledAt != nil { + scheduledAt = in.ScheduledAt.UTC() + } + + const q = ` +INSERT INTO ai_jobs ( + owner_service, owner_ref, task_type, model_profile, priority, max_attempts, + input, scheduled_at, idempotency_key +) +VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) +ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL +DO UPDATE SET updated_at = ai_jobs.updated_at +RETURNING id, owner_service, owner_ref, task_type, model_profile, priority, status, + attempts, max_attempts, input, result, error_code, error_message, + scheduled_at, started_at, completed_at, created_at, updated_at, idempotency_key +` + row := s.pool.QueryRow(ctx, q, + in.OwnerService, + in.OwnerRef, + in.TaskType, + in.ModelProfile, + in.Priority, + in.MaxAttempts, + in.Input, + scheduledAt, + in.IdempotencyKey, + ) + return scanJob(row) +} + +func validateCreateJob(in model.CreateJob) error { + switch { + case strings.TrimSpace(in.OwnerService) == "": + return errors.New("owner_service is required") + case strings.TrimSpace(in.OwnerRef) == "": + return errors.New("owner_ref is required") + case strings.TrimSpace(in.TaskType) == "": + return errors.New("task_type is required") + case strings.TrimSpace(in.ModelProfile) == "": + return errors.New("model_profile is required") + default: + return nil + } +} + +func (s *Store) GetJob(ctx context.Context, id uuid.UUID) (*model.Job, error) { + const q = ` +SELECT id, owner_service, owner_ref, task_type, model_profile, priority, status, + attempts, max_attempts, input, result, error_code, error_message, + scheduled_at, started_at, completed_at, created_at, updated_at, idempotency_key +FROM ai_jobs +WHERE id = $1 +` + job, err := scanJob(s.pool.QueryRow(ctx, q, id)) + if errors.Is(err, pgx.ErrNoRows) { + return nil, nil + } + return job, err +} + +func (s *Store) RetryJob(ctx context.Context, id uuid.UUID) (*model.Job, error) { + const q = ` +UPDATE ai_jobs +SET status = 'pending', + attempts = 0, + started_at = NULL, + completed_at = NULL, + error_code = NULL, + error_message = NULL, + scheduled_at = NOW(), + updated_at = NOW() +WHERE id = $1 + AND status IN ('failed', 'running') +RETURNING id, owner_service, owner_ref, task_type, model_profile, priority, status, + attempts, max_attempts, input, result, error_code, error_message, + scheduled_at, started_at, completed_at, created_at, updated_at, idempotency_key +` + job, err := scanJob(s.pool.QueryRow(ctx, q, id)) + if errors.Is(err, pgx.ErrNoRows) { + return nil, nil + } + return job, err +} + +func (s *Store) Stats(ctx context.Context) (*model.Stats, error) { + out := &model.Stats{At: time.Now().UTC()} + + queueRows, err := s.pool.Query(ctx, ` +SELECT task_type, model_profile, status, count(*) +FROM ai_jobs +GROUP BY task_type, model_profile, status +ORDER BY task_type, model_profile, status +`) + if err != nil { + return nil, err + } + defer queueRows.Close() + for queueRows.Next() { + var stat model.QueueStat + if err := queueRows.Scan(&stat.TaskType, &stat.ModelProfile, &stat.Status, &stat.Total); err != nil { + return nil, err + } + out.Queues = append(out.Queues, stat) + } + if err := queueRows.Err(); err != nil { + return nil, err + } + + errorRows, err := s.pool.Query(ctx, ` +SELECT task_type, model_profile, COALESCE(NULLIF(error_code, ''), 'unknown') AS error_code, + count(*) AS total, + count(*) FILTER (WHERE updated_at > NOW() - INTERVAL '24 hours') AS last_24h +FROM ai_jobs +WHERE status = 'failed' +GROUP BY task_type, model_profile, COALESCE(NULLIF(error_code, ''), 'unknown') +ORDER BY last_24h DESC, total DESC +`) + if err != nil { + return nil, err + } + defer errorRows.Close() + for errorRows.Next() { + var stat model.ErrorStat + if err := errorRows.Scan(&stat.TaskType, &stat.ModelProfile, &stat.ErrorCode, &stat.Total, &stat.Last24h); err != nil { + return nil, err + } + out.Errors = append(out.Errors, stat) + } + return out, errorRows.Err() +} + +func scanJob(row pgx.Row) (*model.Job, error) { + var job model.Job + var input []byte + var result []byte + err := row.Scan( + &job.ID, + &job.OwnerService, + &job.OwnerRef, + &job.TaskType, + &job.ModelProfile, + &job.Priority, + &job.Status, + &job.Attempts, + &job.MaxAttempts, + &input, + &result, + &job.ErrorCode, + &job.ErrorMessage, + &job.ScheduledAt, + &job.StartedAt, + &job.CompletedAt, + &job.CreatedAt, + &job.UpdatedAt, + &job.IdempotencyKey, + ) + if err != nil { + return nil, err + } + job.Input = json.RawMessage(input) + if len(result) > 0 { + job.Result = json.RawMessage(result) + } + return &job, nil +} diff --git a/k8s/configmap.yaml b/k8s/configmap.yaml new file mode 100644 index 0000000..3f24174 --- /dev/null +++ b/k8s/configmap.yaml @@ -0,0 +1,9 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: ai-service-config + namespace: ai-service +data: + HTTP_HOST: "0.0.0.0" + HTTP_PORT: "8080" + MIGRATE_ON_START: "true" diff --git a/k8s/kustomization.yaml b/k8s/kustomization.yaml new file mode 100644 index 0000000..945ba89 --- /dev/null +++ b/k8s/kustomization.yaml @@ -0,0 +1,11 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +namespace: ai-service + +resources: + - namespace.yaml + - configmap.yaml + - secrets.yaml + - server-deployment.yaml + - server-service.yaml diff --git a/k8s/namespace.yaml b/k8s/namespace.yaml new file mode 100644 index 0000000..1c2e392 --- /dev/null +++ b/k8s/namespace.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: ai-service diff --git a/k8s/secrets.yaml b/k8s/secrets.yaml new file mode 100644 index 0000000..140dd9c --- /dev/null +++ b/k8s/secrets.yaml @@ -0,0 +1,8 @@ +apiVersion: v1 +kind: Secret +metadata: + name: ai-service-secrets + namespace: ai-service +type: Opaque +stringData: + DATABASE_URL: "postgres://ai_service:CHANGE_ME@postgres:5432/ai_service?sslmode=disable" diff --git a/k8s/server-deployment.yaml b/k8s/server-deployment.yaml new file mode 100644 index 0000000..1722583 --- /dev/null +++ b/k8s/server-deployment.yaml @@ -0,0 +1,49 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: ai-service + namespace: ai-service +spec: + replicas: 1 + selector: + matchLabels: + app: ai-service + template: + metadata: + labels: + app: ai-service + spec: + terminationGracePeriodSeconds: 20 + containers: + - name: server + image: localhost:30300/admin/ai-service:latest + ports: + - containerPort: 8080 + envFrom: + - configMapRef: + name: ai-service-config + - secretRef: + name: ai-service-secrets + startupProbe: + httpGet: + path: /readyz + port: 8080 + periodSeconds: 5 + failureThreshold: 30 + readinessProbe: + httpGet: + path: /readyz + port: 8080 + periodSeconds: 10 + livenessProbe: + httpGet: + path: /healthz + port: 8080 + periodSeconds: 10 + resources: + requests: + cpu: 50m + memory: 96Mi + limits: + cpu: 500m + memory: 384Mi diff --git a/k8s/server-service.yaml b/k8s/server-service.yaml new file mode 100644 index 0000000..f2a508b --- /dev/null +++ b/k8s/server-service.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Service +metadata: + name: ai-service + namespace: ai-service +spec: + selector: + app: ai-service + ports: + - name: http + port: 8080 + targetPort: 8080 diff --git a/migrations/001_ai_jobs.down.sql b/migrations/001_ai_jobs.down.sql new file mode 100644 index 0000000..f7aa9de --- /dev/null +++ b/migrations/001_ai_jobs.down.sql @@ -0,0 +1,5 @@ +DROP INDEX IF EXISTS ai_jobs_error_idx; +DROP INDEX IF EXISTS ai_jobs_owner_idx; +DROP INDEX IF EXISTS ai_jobs_queue_idx; +DROP INDEX IF EXISTS ai_jobs_idempotency_key_idx; +DROP TABLE IF EXISTS ai_jobs; diff --git a/migrations/001_ai_jobs.up.sql b/migrations/001_ai_jobs.up.sql new file mode 100644 index 0000000..5fb930a --- /dev/null +++ b/migrations/001_ai_jobs.up.sql @@ -0,0 +1,39 @@ +CREATE EXTENSION IF NOT EXISTS pgcrypto; + +CREATE TABLE IF NOT EXISTS ai_jobs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + owner_service TEXT NOT NULL, + owner_ref TEXT NOT NULL, + task_type TEXT NOT NULL, + model_profile TEXT NOT NULL, + priority INTEGER NOT NULL DEFAULT 0, + status TEXT NOT NULL DEFAULT 'pending' + CHECK (status IN ('pending', 'running', 'done', 'failed', 'cancelled')), + attempts INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 3, + input JSONB NOT NULL DEFAULT '{}'::jsonb, + result JSONB, + error_code TEXT, + error_message TEXT, + scheduled_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + started_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + idempotency_key TEXT +); + +CREATE UNIQUE INDEX IF NOT EXISTS ai_jobs_idempotency_key_idx + ON ai_jobs (idempotency_key) + WHERE idempotency_key IS NOT NULL; + +CREATE INDEX IF NOT EXISTS ai_jobs_queue_idx + ON ai_jobs (status, priority DESC, scheduled_at ASC, created_at ASC) + WHERE status IN ('pending', 'running'); + +CREATE INDEX IF NOT EXISTS ai_jobs_owner_idx + ON ai_jobs (owner_service, owner_ref); + +CREATE INDEX IF NOT EXISTS ai_jobs_error_idx + ON ai_jobs (task_type, model_profile, error_code, updated_at DESC) + WHERE status = 'failed';