From 7452b1d5f2158216e431ca0e3e5d181fac8bddf5 Mon Sep 17 00:00:00 2001 From: Grendgi Date: Mon, 8 Jun 2026 13:57:28 +0300 Subject: [PATCH] Add AI job management endpoints --- README.md | 3 + internal/httpapi/server.go | 111 +++++++++++++++++++++++++ internal/model/job.go | 24 ++++++ internal/store/store.go | 163 +++++++++++++++++++++++++++++++++++++ 4 files changed, 301 insertions(+) diff --git a/README.md b/README.md index 57d8407..de6fc06 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,10 @@ or compact `system` / `user` fields. The completed job result contains ## API - `POST /api/v1/jobs` creates one job. +- `GET /api/v1/jobs` lists jobs with query filters. - `POST /api/v1/jobs/batch` creates many jobs with shared defaults. +- `POST /api/v1/jobs/retry` retries failed/running jobs by filter. +- `POST /api/v1/jobs/cancel` cancels pending/running jobs by filter. - `POST /api/v1/jobs/claim` atomically claims pending jobs for a worker. - `GET /api/v1/jobs/{id}` returns technical job state and result. - `POST /api/v1/jobs/{id}/complete` stores a successful job result. 
diff --git a/internal/httpapi/server.go b/internal/httpapi/server.go index 2ad6ade..570bd1a 100644 --- a/internal/httpapi/server.go +++ b/internal/httpapi/server.go @@ -3,6 +3,7 @@ package httpapi import ( "encoding/json" "errors" + "fmt" "net/http" "strings" "time" @@ -37,8 +38,14 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, map[string]string{"service": "ai-service"}) case r.Method == http.MethodPost && path == "/api/v1/jobs": s.handleCreateJob(w, r) + case r.Method == http.MethodGet && path == "/api/v1/jobs": + s.handleListJobs(w, r) case r.Method == http.MethodPost && path == "/api/v1/jobs/batch": s.handleCreateBatch(w, r) + case r.Method == http.MethodPost && path == "/api/v1/jobs/retry": + s.handleRetryJobs(w, r) + case r.Method == http.MethodPost && path == "/api/v1/jobs/cancel": + s.handleCancelJobs(w, r) case r.Method == http.MethodPost && path == "/api/v1/jobs/claim": s.handleClaimJobs(w, r) case r.Method == http.MethodGet && strings.HasPrefix(path, "/api/v1/jobs/"): @@ -88,6 +95,22 @@ func (s *Server) handleCreateJob(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusCreated, job) } +type listJobsResponse struct { + Jobs []*model.Job `json:"jobs"` +} + +func (s *Server) handleListJobs(w http.ResponseWriter, r *http.Request) { + filter := jobFilterFromQuery(r) + ctx, cancel := contextWithTimeout(r, 8*time.Second) + defer cancel() + jobs, err := s.store.ListJobs(ctx, filter) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, listJobsResponse{Jobs: jobs}) +} + type createBatchRequest struct { OwnerService string `json:"owner_service"` TaskType string `json:"task_type"` @@ -144,6 +167,46 @@ func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusCreated, out) } +func (s *Server) handleRetryJobs(w http.ResponseWriter, r *http.Request) { + var req model.JobFilter + if err := 
json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "bad json") + return + } + if !hasAnyFilter(req) { + writeError(w, http.StatusBadRequest, "at least one filter is required") + return + } + ctx, cancel := contextWithTimeout(r, 20*time.Second) + defer cancel() + updated, err := s.store.RetryJobs(ctx, req) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, model.BatchActionResult{Updated: updated}) +} + +func (s *Server) handleCancelJobs(w http.ResponseWriter, r *http.Request) { + var req model.JobFilter + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "bad json") + return + } + if !hasAnyFilter(req) { + writeError(w, http.StatusBadRequest, "at least one filter is required") + return + } + ctx, cancel := contextWithTimeout(r, 20*time.Second) + defer cancel() + updated, err := s.store.CancelJobs(ctx, req) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, model.BatchActionResult{Updated: updated}) +} + type claimJobsResponse struct { Jobs []*model.Job `json:"jobs"` } @@ -291,3 +354,51 @@ func isValidationError(err error) bool { msg := err.Error() return strings.Contains(msg, " is required") } + +func jobFilterFromQuery(r *http.Request) model.JobFilter { + q := r.URL.Query() + return model.JobFilter{ + OwnerService: q.Get("owner_service"), + OwnerRef: q.Get("owner_ref"), + TaskType: q.Get("task_type"), + ModelProfile: q.Get("model_profile"), + Statuses: splitCSV(q.Get("status")), + ErrorCodes: splitCSV(q.Get("error_code")), + Limit: parseIntDefault(q.Get("limit"), 100), + Offset: parseIntDefault(q.Get("offset"), 0), + } +} + +func splitCSV(raw string) []string { + if strings.TrimSpace(raw) == "" { + return nil + } + parts := strings.Split(raw, ",") + out := make([]string, 0, len(parts)) + for _, part := range parts { + if v := 
// parseIntDefault parses raw as a base-10 integer and returns fallback when
// raw is blank or is not, in its entirety, a valid integer. Surrounding
// whitespace is ignored.
//
// A plain "%d" scan stops at the first non-digit, which would accept inputs
// such as "10abc" or "12.5" as 10 and 12. The trailing "%c" only converts
// when something follows the number, so exactly one converted item means the
// whole value was consumed by "%d".
func parseIntDefault(raw string, fallback int) int {
	trimmed := strings.TrimSpace(raw)
	if trimmed == "" {
		return fallback
	}

	var (
		out      int
		trailing rune
	)
	// The error is deliberately ignored: a fully valid number makes the scan
	// end with "unexpected EOF" while looking for %c, so the converted-item
	// count is the reliable signal here.
	if n, _ := fmt.Sscanf(trimmed, "%d%c", &out, &trailing); n != 1 {
		return fallback
	}
	return out
}
a/internal/store/store.go +++ b/internal/store/store.go @@ -128,6 +128,67 @@ WHERE id = $1 return job, err } +func (s *Store) ListJobs(ctx context.Context, filter model.JobFilter) ([]*model.Job, error) { + normalizeFilter(&filter) + const q = ` +SELECT ` + jobSelectColumns + ` +FROM ai_jobs +WHERE ($1 = '' OR owner_service = $1) + AND ($2 = '' OR owner_ref = $2) + AND ($3 = '' OR task_type = $3) + AND ($4 = '' OR model_profile = $4) + AND (cardinality($5::text[]) = 0 OR status = ANY($5::text[])) + AND (cardinality($6::text[]) = 0 OR COALESCE(NULLIF(error_code, ''), 'unknown') = ANY($6::text[])) +ORDER BY created_at DESC +LIMIT $7 OFFSET $8 +` + rows, err := s.pool.Query(ctx, q, + filter.OwnerService, + filter.OwnerRef, + filter.TaskType, + filter.ModelProfile, + filter.Statuses, + filter.ErrorCodes, + filter.Limit, + filter.Offset, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var out []*model.Job + for rows.Next() { + job, err := scanJob(rows) + if err != nil { + return nil, err + } + out = append(out, job) + } + return out, rows.Err() +} + +func normalizeFilter(filter *model.JobFilter) { + filter.OwnerService = strings.TrimSpace(filter.OwnerService) + filter.OwnerRef = strings.TrimSpace(filter.OwnerRef) + filter.TaskType = strings.TrimSpace(filter.TaskType) + filter.ModelProfile = strings.TrimSpace(filter.ModelProfile) + if filter.Statuses == nil { + filter.Statuses = []string{} + } + if filter.ErrorCodes == nil { + filter.ErrorCodes = []string{} + } + if filter.Limit <= 0 { + filter.Limit = 100 + } + if filter.Limit > 500 { + filter.Limit = 500 + } + if filter.Offset < 0 { + filter.Offset = 0 + } +} + func (s *Store) RetryJob(ctx context.Context, id uuid.UUID) (*model.Job, error) { const q = ` UPDATE ai_jobs @@ -155,6 +216,87 @@ RETURNING id, owner_service, owner_ref, task_type, model_profile, priority, stat return job, err } +func (s *Store) RetryJobs(ctx context.Context, filter model.JobFilter) (int, error) { + normalizeFilter(&filter) + 
const q = ` +WITH picked AS ( + SELECT id + FROM ai_jobs + WHERE status IN ('failed', 'running') + AND ($1 = '' OR owner_service = $1) + AND ($2 = '' OR owner_ref = $2) + AND ($3 = '' OR task_type = $3) + AND ($4 = '' OR model_profile = $4) + AND (cardinality($5::text[]) = 0 OR COALESCE(NULLIF(error_code, ''), 'unknown') = ANY($5::text[])) + ORDER BY updated_at ASC + LIMIT $6 +) +UPDATE ai_jobs j +SET status = 'pending', + attempts = 0, + started_at = NULL, + completed_at = NULL, + error_code = NULL, + error_message = NULL, + worker_id = NULL, + heartbeat_at = NULL, + scheduled_at = NOW(), + updated_at = NOW() +FROM picked +WHERE j.id = picked.id +` + tag, err := s.pool.Exec(ctx, q, + filter.OwnerService, + filter.OwnerRef, + filter.TaskType, + filter.ModelProfile, + filter.ErrorCodes, + filter.Limit, + ) + if err != nil { + return 0, err + } + return int(tag.RowsAffected()), nil +} + +func (s *Store) CancelJobs(ctx context.Context, filter model.JobFilter) (int, error) { + normalizeFilter(&filter) + const q = ` +WITH picked AS ( + SELECT id + FROM ai_jobs + WHERE status IN ('pending', 'running') + AND ($1 = '' OR owner_service = $1) + AND ($2 = '' OR owner_ref = $2) + AND ($3 = '' OR task_type = $3) + AND ($4 = '' OR model_profile = $4) + AND (cardinality($5::text[]) = 0 OR status = ANY($5::text[])) + ORDER BY updated_at ASC + LIMIT $6 +) +UPDATE ai_jobs j +SET status = 'cancelled', + completed_at = NOW(), + worker_id = NULL, + heartbeat_at = NULL, + updated_at = NOW() +FROM picked +WHERE j.id = picked.id +` + tag, err := s.pool.Exec(ctx, q, + filter.OwnerService, + filter.OwnerRef, + filter.TaskType, + filter.ModelProfile, + filter.Statuses, + filter.Limit, + ) + if err != nil { + return 0, err + } + return int(tag.RowsAffected()), nil +} + func (s *Store) ClaimJobs(ctx context.Context, in model.ClaimJobs) ([]*model.Job, error) { if in.Limit <= 0 { in.Limit = 1 @@ -326,6 +468,27 @@ ORDER BY task_type, model_profile, status return nil, err } + ownerRows, err := 
s.pool.Query(ctx, ` +SELECT owner_service, task_type, model_profile, status, count(*) +FROM ai_jobs +GROUP BY owner_service, task_type, model_profile, status +ORDER BY owner_service, task_type, model_profile, status +`) + if err != nil { + return nil, err + } + defer ownerRows.Close() + for ownerRows.Next() { + var stat model.OwnerStat + if err := ownerRows.Scan(&stat.OwnerService, &stat.TaskType, &stat.ModelProfile, &stat.Status, &stat.Total); err != nil { + return nil, err + } + out.Owners = append(out.Owners, stat) + } + if err := ownerRows.Err(); err != nil { + return nil, err + } + errorRows, err := s.pool.Query(ctx, ` SELECT task_type, model_profile, COALESCE(NULLIF(error_code, ''), 'unknown') AS error_code, count(*) AS total,