Compare commits

..

4 Commits

Author SHA1 Message Date
Grendgi
da144ecefe Batch create AI jobs efficiently
All checks were successful
CI / test (push) Successful in 22s
Build and Deploy / build-and-deploy (push) Successful in 25s
2026-06-09 11:49:12 +03:00
Grendgi
6254bf0810 Optimize AI dashboard job payload 2026-06-09 11:44:53 +03:00
Grendgi
092994fe74 Add AI job queue indexes 2026-06-09 11:41:15 +03:00
Grendgi
01ee090fa5 Add worker health endpoints 2026-06-09 11:38:03 +03:00
15 changed files with 351 additions and 28 deletions

View File

@@ -65,6 +65,8 @@ domain metadata fields in `input`, but the worker only reads chat fields such as
(GPU, containers, vLLM and WhisperX live metrics) when configured. (GPU, containers, vLLM and WhisperX live metrics) when configured.
- `GET /healthz` returns process health. - `GET /healthz` returns process health.
- `GET /readyz` checks PostgreSQL readiness. - `GET /readyz` checks PostgreSQL readiness.
- Built-in workers expose open Kubernetes endpoints on `WORKER_HTTP_PORT`:
`GET /healthz`, `GET /readyz` and `GET /worker/status`.
All `/api/v1/*` endpoints require `Authorization: Bearer <AI_SERVICE_TOKEN>` All `/api/v1/*` endpoints require `Authorization: Bearer <AI_SERVICE_TOKEN>`
when `AI_SERVICE_TOKEN` is configured. Health and readiness endpoints stay open when `AI_SERVICE_TOKEN` is configured. Health and readiness endpoints stay open
@@ -83,6 +85,8 @@ for Kubernetes probes.
- `LLM_TIMEOUT`, default `5m` - `LLM_TIMEOUT`, default `5m`
- `WHISPERX_URL`, WhisperX endpoint for transcription jobs - `WHISPERX_URL`, WhisperX endpoint for transcription jobs
- `WORKER_ID`, default hostname - `WORKER_ID`, default hostname
- `WORKER_HTTP_HOST`, default `0.0.0.0`
- `WORKER_HTTP_PORT`, default `8081`
- `WORKER_POLL_INTERVAL`, default `2s` - `WORKER_POLL_INTERVAL`, default `2s`
- `WORKER_CLAIM_LIMIT`, default `4` - `WORKER_CLAIM_LIMIT`, default `4`
- `WORKER_LEASE_TIMEOUT`, default `15m` - `WORKER_LEASE_TIMEOUT`, default `15m`

View File

@@ -2,10 +2,16 @@ package main
import ( import (
"context" "context"
"encoding/json"
"errors"
"fmt"
"log/slog" "log/slog"
"net/http"
"os" "os"
"os/signal" "os/signal"
"strings"
"syscall" "syscall"
"time"
"ai-service/internal/config" "ai-service/internal/config"
"ai-service/internal/llm" "ai-service/internal/llm"
@@ -44,6 +50,7 @@ func main() {
llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout) llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout)
transcriber := transcription.New(cfg.WhisperXURL, cfg.WhisperXTimeout, cfg.FfmpegPath, cfg.WhisperXLeadSilence) transcriber := transcription.New(cfg.WhisperXURL, cfg.WhisperXTimeout, cfg.FfmpegPath, cfg.WhisperXLeadSilence)
w := worker.New(db, llmClient, transcriber, cfg.WorkerID, cfg.LLMModel, cfg.WorkerTaskTypes, cfg.WorkerModelProfiles, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit) w := worker.New(db, llmClient, transcriber, cfg.WorkerID, cfg.LLMModel, cfg.WorkerTaskTypes, cfg.WorkerModelProfiles, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit)
healthSrv := startHealthServer(ctx, db, cfg)
slog.Info("ai_worker_started", slog.Info("ai_worker_started",
"worker_id", cfg.WorkerID, "worker_id", cfg.WorkerID,
@@ -57,4 +64,82 @@ func main() {
"claim_limit", cfg.WorkerClaimLimit, "claim_limit", cfg.WorkerClaimLimit,
) )
w.Run(ctx) w.Run(ctx)
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = healthSrv.Shutdown(shutdownCtx)
}
type workerHealth struct {
store *store.Store
cfg config.Config
}
func startHealthServer(ctx context.Context, db *store.Store, cfg config.Config) *http.Server {
srv := &http.Server{
Addr: fmt.Sprintf("%s:%d", cfg.WorkerHTTPHost, cfg.WorkerHTTPPort),
Handler: workerHealth{store: db, cfg: cfg},
ReadHeaderTimeout: 5 * time.Second,
}
go func() {
slog.Info("ai_worker_health_started", "addr", srv.Addr)
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
slog.Error("ai_worker_health_failed", "error", err)
}
}()
go func() {
<-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = srv.Shutdown(shutdownCtx)
}()
return srv
}
func (h workerHealth) ServeHTTP(w http.ResponseWriter, r *http.Request) {
path := strings.TrimSuffix(r.URL.Path, "/")
if path == "" {
path = "/"
}
switch {
case r.Method == http.MethodGet && path == "/healthz":
writeWorkerJSON(w, http.StatusOK, map[string]any{
"status": "ok",
"worker_id": h.cfg.WorkerID,
})
case r.Method == http.MethodGet && path == "/readyz":
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
defer cancel()
if err := h.store.Ping(ctx); err != nil {
writeWorkerJSON(w, http.StatusServiceUnavailable, map[string]any{
"status": "not_ready",
"error": err.Error(),
})
return
}
writeWorkerJSON(w, http.StatusOK, map[string]any{
"status": "ready",
"worker_id": h.cfg.WorkerID,
})
case r.Method == http.MethodGet && path == "/worker/status":
writeWorkerJSON(w, http.StatusOK, map[string]any{
"status": "running",
"worker_id": h.cfg.WorkerID,
"task_types": h.cfg.WorkerTaskTypes,
"model_profiles": h.cfg.WorkerModelProfiles,
"claim_limit": h.cfg.WorkerClaimLimit,
"poll_interval": h.cfg.WorkerPollInterval.String(),
"lease_timeout": h.cfg.WorkerLeaseTimeout.String(),
})
default:
writeWorkerJSON(w, http.StatusNotFound, map[string]any{"error": "not found"})
}
}
func writeWorkerJSON(w http.ResponseWriter, code int, body any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
if err := json.NewEncoder(w).Encode(body); err != nil {
slog.Warn("worker_health_write_failed", "error", err)
}
} }

View File

@@ -26,6 +26,8 @@ type Config struct {
AIStatsTimeout time.Duration AIStatsTimeout time.Duration
WorkerID string WorkerID string
WorkerHTTPHost string
WorkerHTTPPort int
WorkerPollInterval time.Duration WorkerPollInterval time.Duration
WorkerClaimLimit int WorkerClaimLimit int
WorkerLeaseTimeout time.Duration WorkerLeaseTimeout time.Duration
@@ -53,6 +55,8 @@ func Load() Config {
AIStatsTimeout: envDuration("AI_STATS_TIMEOUT", 8*time.Second), AIStatsTimeout: envDuration("AI_STATS_TIMEOUT", 8*time.Second),
WorkerID: envString("WORKER_ID", hostname()), WorkerID: envString("WORKER_ID", hostname()),
WorkerHTTPHost: envString("WORKER_HTTP_HOST", "0.0.0.0"),
WorkerHTTPPort: envInt("WORKER_HTTP_PORT", 8081),
WorkerPollInterval: envDuration("WORKER_POLL_INTERVAL", 2*time.Second), WorkerPollInterval: envDuration("WORKER_POLL_INTERVAL", 2*time.Second),
WorkerClaimLimit: envInt("WORKER_CLAIM_LIMIT", 4), WorkerClaimLimit: envInt("WORKER_CLAIM_LIMIT", 4),
WorkerLeaseTimeout: envDuration("WORKER_LEASE_TIMEOUT", 15*time.Minute), WorkerLeaseTimeout: envDuration("WORKER_LEASE_TIMEOUT", 15*time.Minute),

View File

@@ -13,7 +13,7 @@ type dashboardResponse struct {
Stats *model.Stats `json:"stats"` Stats *model.Stats `json:"stats"`
Providers providersStatusResponse `json:"providers"` Providers providersStatusResponse `json:"providers"`
Infra infraStatusResponse `json:"infra"` Infra infraStatusResponse `json:"infra"`
Jobs []*model.Job `json:"jobs"` Jobs []*model.JobSummary `json:"jobs"`
} }
type dashboardSummary struct { type dashboardSummary struct {
@@ -35,7 +35,7 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusInternalServerError, err.Error()) writeError(w, http.StatusInternalServerError, err.Error())
return return
} }
jobs, err := s.store.ListJobs(ctx, model.JobFilter{ jobs, err := s.store.ListJobSummaries(ctx, model.JobFilter{
Statuses: []string{model.StatusFailed, model.StatusRunning}, Statuses: []string{model.StatusFailed, model.StatusRunning},
Limit: 40, Limit: 40,
}) })

View File

@@ -135,6 +135,8 @@ type createBatchResponse struct {
Jobs []*model.Job `json:"jobs"` Jobs []*model.Job `json:"jobs"`
} }
const maxCreateBatchJobs = 1000
func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) { func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
var req createBatchRequest var req createBatchRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil { if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
@@ -145,17 +147,21 @@ func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusBadRequest, "jobs is required") writeError(w, http.StatusBadRequest, "jobs is required")
return return
} }
if len(req.Jobs) > maxCreateBatchJobs {
writeError(w, http.StatusBadRequest, fmt.Sprintf("jobs limit is %d", maxCreateBatchJobs))
return
}
ctx, cancel := contextWithTimeout(r, 20*time.Second) ctx, cancel := contextWithTimeout(r, 20*time.Second)
defer cancel() defer cancel()
out := createBatchResponse{Jobs: make([]*model.Job, 0, len(req.Jobs))} items := make([]model.CreateJob, 0, len(req.Jobs))
for _, item := range req.Jobs { for _, item := range req.Jobs {
if item.OwnerService == "" { if strings.TrimSpace(item.OwnerService) == "" {
item.OwnerService = req.OwnerService item.OwnerService = req.OwnerService
} }
if item.TaskType == "" { if strings.TrimSpace(item.TaskType) == "" {
item.TaskType = req.TaskType item.TaskType = req.TaskType
} }
if item.ModelProfile == "" { if strings.TrimSpace(item.ModelProfile) == "" {
item.ModelProfile = req.ModelProfile item.ModelProfile = req.ModelProfile
} }
if item.Priority == 0 { if item.Priority == 0 {
@@ -164,7 +170,9 @@ func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
if item.MaxAttempts == 0 { if item.MaxAttempts == 0 {
item.MaxAttempts = req.MaxAttempts item.MaxAttempts = req.MaxAttempts
} }
job, err := s.store.CreateJob(ctx, item) items = append(items, item)
}
jobs, err := s.store.CreateJobs(ctx, items)
if err != nil { if err != nil {
status := http.StatusInternalServerError status := http.StatusInternalServerError
if isValidationError(err) { if isValidationError(err) {
@@ -173,9 +181,7 @@ func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
writeError(w, status, err.Error()) writeError(w, status, err.Error())
return return
} }
out.Jobs = append(out.Jobs, job) writeJSON(w, http.StatusCreated, createBatchResponse{Jobs: jobs})
}
writeJSON(w, http.StatusCreated, out)
} }
func (s *Server) handleRetryJobs(w http.ResponseWriter, r *http.Request) { func (s *Server) handleRetryJobs(w http.ResponseWriter, r *http.Request) {

View File

@@ -0,0 +1,13 @@
CREATE INDEX IF NOT EXISTS ai_jobs_pending_claim_scope_idx
ON ai_jobs (task_type, model_profile, priority DESC, scheduled_at DESC, created_at DESC)
WHERE status = 'pending';
CREATE INDEX IF NOT EXISTS ai_jobs_running_lease_idx
ON ai_jobs ((COALESCE(heartbeat_at, started_at, updated_at)))
WHERE status = 'running';
CREATE INDEX IF NOT EXISTS ai_jobs_queue_stats_idx
ON ai_jobs (task_type, model_profile, status);
CREATE INDEX IF NOT EXISTS ai_jobs_owner_stats_idx
ON ai_jobs (owner_service, task_type, model_profile, status);

View File

@@ -0,0 +1,2 @@
CREATE INDEX IF NOT EXISTS ai_jobs_recent_status_idx
ON ai_jobs (status, updated_at DESC, created_at DESC);

View File

@@ -39,6 +39,27 @@ type Job struct {
IdempotencyKey *string `json:"idempotency_key,omitempty"` IdempotencyKey *string `json:"idempotency_key,omitempty"`
} }
type JobSummary struct {
ID uuid.UUID `json:"id"`
OwnerService string `json:"owner_service"`
OwnerRef string `json:"owner_ref"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
Priority int `json:"priority"`
Status string `json:"status"`
Attempts int `json:"attempts"`
MaxAttempts int `json:"max_attempts"`
ErrorCode *string `json:"error_code,omitempty"`
ErrorMessage *string `json:"error_message,omitempty"`
ScheduledAt time.Time `json:"scheduled_at"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
WorkerID *string `json:"worker_id,omitempty"`
HeartbeatAt *time.Time `json:"heartbeat_at,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
type CreateJob struct { type CreateJob struct {
OwnerService string `json:"owner_service"` OwnerService string `json:"owner_service"`
OwnerRef string `json:"owner_ref"` OwnerRef string `json:"owner_ref"`

View File

@@ -69,16 +69,7 @@ func (s *Store) CreateJob(ctx context.Context, in model.CreateJob) (*model.Job,
if err := validateCreateJob(in); err != nil { if err := validateCreateJob(in); err != nil {
return nil, err return nil, err
} }
if in.MaxAttempts <= 0 { normalizeCreateJob(&in)
in.MaxAttempts = 3
}
if len(in.Input) == 0 {
in.Input = json.RawMessage(`{}`)
}
scheduledAt := time.Now().UTC()
if in.ScheduledAt != nil {
scheduledAt = in.ScheduledAt.UTC()
}
const q = ` const q = `
INSERT INTO ai_jobs ( INSERT INTO ai_jobs (
@@ -98,12 +89,81 @@ RETURNING ` + jobSelectColumns + `
in.Priority, in.Priority,
in.MaxAttempts, in.MaxAttempts,
in.Input, in.Input,
scheduledAt, *in.ScheduledAt,
in.IdempotencyKey, in.IdempotencyKey,
) )
return scanJob(row) return scanJob(row)
} }
func (s *Store) CreateJobs(ctx context.Context, items []model.CreateJob) ([]*model.Job, error) {
if len(items) == 0 {
return []*model.Job{}, nil
}
const q = `
INSERT INTO ai_jobs (
owner_service, owner_ref, task_type, model_profile, priority, max_attempts,
input, scheduled_at, idempotency_key
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL
DO UPDATE SET updated_at = ai_jobs.updated_at
RETURNING ` + jobSelectColumns + `
`
var batch pgx.Batch
for i := range items {
if err := validateCreateJob(items[i]); err != nil {
return nil, err
}
normalizeCreateJob(&items[i])
batch.Queue(q,
items[i].OwnerService,
items[i].OwnerRef,
items[i].TaskType,
items[i].ModelProfile,
items[i].Priority,
items[i].MaxAttempts,
items[i].Input,
*items[i].ScheduledAt,
items[i].IdempotencyKey,
)
}
br := s.pool.SendBatch(ctx, &batch)
batchClosed := false
defer func() {
if !batchClosed {
_ = br.Close()
}
}()
out := make([]*model.Job, 0, len(items))
for range items {
job, err := scanJob(br.QueryRow())
if err != nil {
return nil, err
}
out = append(out, job)
}
err := br.Close()
batchClosed = true
if err != nil {
return nil, err
}
return out, nil
}
func normalizeCreateJob(in *model.CreateJob) {
if in.MaxAttempts <= 0 {
in.MaxAttempts = 3
}
if len(in.Input) == 0 {
in.Input = json.RawMessage(`{}`)
}
scheduledAt := time.Now().UTC()
if in.ScheduledAt != nil {
scheduledAt = in.ScheduledAt.UTC()
}
in.ScheduledAt = &scheduledAt
}
func validateCreateJob(in model.CreateJob) error { func validateCreateJob(in model.CreateJob) error {
switch { switch {
case strings.TrimSpace(in.OwnerService) == "": case strings.TrimSpace(in.OwnerService) == "":
@@ -174,6 +234,48 @@ LIMIT $7 OFFSET $8
return out, rows.Err() return out, rows.Err()
} }
func (s *Store) ListJobSummaries(ctx context.Context, filter model.JobFilter) ([]*model.JobSummary, error) {
normalizeFilter(&filter)
const q = `
SELECT id, owner_service, owner_ref, task_type, model_profile, priority, status,
attempts, max_attempts, error_code, error_message,
scheduled_at, started_at, completed_at, worker_id, heartbeat_at,
created_at, updated_at
FROM ai_jobs
WHERE ($1 = '' OR owner_service = $1)
AND ($2 = '' OR owner_ref = $2)
AND ($3 = '' OR task_type = $3)
AND ($4 = '' OR model_profile = $4)
AND (cardinality($5::text[]) = 0 OR status = ANY($5::text[]))
AND (cardinality($6::text[]) = 0 OR COALESCE(NULLIF(error_code, ''), 'unknown') = ANY($6::text[]))
ORDER BY updated_at DESC, created_at DESC
LIMIT $7 OFFSET $8
`
rows, err := s.pool.Query(ctx, q,
filter.OwnerService,
filter.OwnerRef,
filter.TaskType,
filter.ModelProfile,
filter.Statuses,
filter.ErrorCodes,
filter.Limit,
filter.Offset,
)
if err != nil {
return nil, err
}
defer rows.Close()
var out []*model.JobSummary
for rows.Next() {
job, err := scanJobSummary(rows)
if err != nil {
return nil, err
}
out = append(out, job)
}
return out, rows.Err()
}
func normalizeFilter(filter *model.JobFilter) { func normalizeFilter(filter *model.JobFilter) {
filter.OwnerService = strings.TrimSpace(filter.OwnerService) filter.OwnerService = strings.TrimSpace(filter.OwnerService)
filter.OwnerRef = strings.TrimSpace(filter.OwnerRef) filter.OwnerRef = strings.TrimSpace(filter.OwnerRef)
@@ -519,6 +621,34 @@ ORDER BY last_24h DESC, total DESC
return out, errorRows.Err() return out, errorRows.Err()
} }
func scanJobSummary(row pgx.Row) (*model.JobSummary, error) {
var job model.JobSummary
err := row.Scan(
&job.ID,
&job.OwnerService,
&job.OwnerRef,
&job.TaskType,
&job.ModelProfile,
&job.Priority,
&job.Status,
&job.Attempts,
&job.MaxAttempts,
&job.ErrorCode,
&job.ErrorMessage,
&job.ScheduledAt,
&job.StartedAt,
&job.CompletedAt,
&job.WorkerID,
&job.HeartbeatAt,
&job.CreatedAt,
&job.UpdatedAt,
)
if err != nil {
return nil, err
}
return &job, nil
}
func scanJob(row pgx.Row) (*model.Job, error) { func scanJob(row pgx.Row) (*model.Job, error) {
var job model.Job var job model.Job
var input []byte var input []byte

View File

@@ -18,5 +18,7 @@ data:
AI_STATS_SIDECAR_URL: "http://10.2.3.5:9090" AI_STATS_SIDECAR_URL: "http://10.2.3.5:9090"
AI_STATS_TIMEOUT: "8s" AI_STATS_TIMEOUT: "8s"
WORKER_POLL_INTERVAL: "2s" WORKER_POLL_INTERVAL: "2s"
WORKER_HTTP_HOST: "0.0.0.0"
WORKER_HTTP_PORT: "8081"
WORKER_CLAIM_LIMIT: "4" WORKER_CLAIM_LIMIT: "4"
WORKER_LEASE_TIMEOUT: "15m" WORKER_LEASE_TIMEOUT: "15m"

View File

@@ -22,6 +22,8 @@ spec:
- name: worker - name: worker
image: localhost:30300/admin/ai-service:latest image: localhost:30300/admin/ai-service:latest
command: ["/usr/local/bin/ai-service-worker"] command: ["/usr/local/bin/ai-service-worker"]
ports:
- containerPort: 8081
env: env:
- name: WORKER_ID - name: WORKER_ID
valueFrom: valueFrom:
@@ -38,6 +40,22 @@ spec:
name: ai-service-config name: ai-service-config
- secretRef: - secretRef:
name: ai-service-secrets name: ai-service-secrets
startupProbe:
httpGet:
path: /readyz
port: 8081
periodSeconds: 5
failureThreshold: 30
readinessProbe:
httpGet:
path: /readyz
port: 8081
periodSeconds: 10
livenessProbe:
httpGet:
path: /healthz
port: 8081
periodSeconds: 10
resources: resources:
requests: requests:
cpu: 50m cpu: 50m
@@ -70,6 +88,8 @@ spec:
- name: worker - name: worker
image: localhost:30300/admin/ai-service:latest image: localhost:30300/admin/ai-service:latest
command: ["/usr/local/bin/ai-service-worker"] command: ["/usr/local/bin/ai-service-worker"]
ports:
- containerPort: 8081
env: env:
- name: WORKER_ID - name: WORKER_ID
valueFrom: valueFrom:
@@ -86,6 +106,22 @@ spec:
name: ai-service-config name: ai-service-config
- secretRef: - secretRef:
name: ai-service-secrets name: ai-service-secrets
startupProbe:
httpGet:
path: /readyz
port: 8081
periodSeconds: 5
failureThreshold: 30
readinessProbe:
httpGet:
path: /readyz
port: 8081
periodSeconds: 10
livenessProbe:
httpGet:
path: /healthz
port: 8081
periodSeconds: 10
resources: resources:
requests: requests:
cpu: 50m cpu: 50m

View File

@@ -0,0 +1,4 @@
DROP INDEX IF EXISTS ai_jobs_owner_stats_idx;
DROP INDEX IF EXISTS ai_jobs_queue_stats_idx;
DROP INDEX IF EXISTS ai_jobs_running_lease_idx;
DROP INDEX IF EXISTS ai_jobs_pending_claim_scope_idx;

View File

@@ -0,0 +1,13 @@
CREATE INDEX IF NOT EXISTS ai_jobs_pending_claim_scope_idx
ON ai_jobs (task_type, model_profile, priority DESC, scheduled_at DESC, created_at DESC)
WHERE status = 'pending';
CREATE INDEX IF NOT EXISTS ai_jobs_running_lease_idx
ON ai_jobs ((COALESCE(heartbeat_at, started_at, updated_at)))
WHERE status = 'running';
CREATE INDEX IF NOT EXISTS ai_jobs_queue_stats_idx
ON ai_jobs (task_type, model_profile, status);
CREATE INDEX IF NOT EXISTS ai_jobs_owner_stats_idx
ON ai_jobs (owner_service, task_type, model_profile, status);

View File

@@ -0,0 +1 @@
DROP INDEX IF EXISTS ai_jobs_recent_status_idx;

View File

@@ -0,0 +1,2 @@
CREATE INDEX IF NOT EXISTS ai_jobs_recent_status_idx
ON ai_jobs (status, updated_at DESC, created_at DESC);