Add AI job management endpoints
This commit is contained in:
@@ -45,7 +45,10 @@ or compact `system` / `user` fields. The completed job result contains
|
|||||||
## API
|
## API
|
||||||
|
|
||||||
- `POST /api/v1/jobs` creates one job.
|
- `POST /api/v1/jobs` creates one job.
|
||||||
|
- `GET /api/v1/jobs` lists jobs with query filters.
|
||||||
- `POST /api/v1/jobs/batch` creates many jobs with shared defaults.
|
- `POST /api/v1/jobs/batch` creates many jobs with shared defaults.
|
||||||
|
- `POST /api/v1/jobs/retry` retries failed/running jobs by filter.
|
||||||
|
- `POST /api/v1/jobs/cancel` cancels pending/running jobs by filter.
|
||||||
- `POST /api/v1/jobs/claim` atomically claims pending jobs for a worker.
|
- `POST /api/v1/jobs/claim` atomically claims pending jobs for a worker.
|
||||||
- `GET /api/v1/jobs/{id}` returns technical job state and result.
|
- `GET /api/v1/jobs/{id}` returns technical job state and result.
|
||||||
- `POST /api/v1/jobs/{id}/complete` stores a successful job result.
|
- `POST /api/v1/jobs/{id}/complete` stores a successful job result.
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package httpapi
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -37,8 +38,14 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeJSON(w, http.StatusOK, map[string]string{"service": "ai-service"})
|
writeJSON(w, http.StatusOK, map[string]string{"service": "ai-service"})
|
||||||
case r.Method == http.MethodPost && path == "/api/v1/jobs":
|
case r.Method == http.MethodPost && path == "/api/v1/jobs":
|
||||||
s.handleCreateJob(w, r)
|
s.handleCreateJob(w, r)
|
||||||
|
case r.Method == http.MethodGet && path == "/api/v1/jobs":
|
||||||
|
s.handleListJobs(w, r)
|
||||||
case r.Method == http.MethodPost && path == "/api/v1/jobs/batch":
|
case r.Method == http.MethodPost && path == "/api/v1/jobs/batch":
|
||||||
s.handleCreateBatch(w, r)
|
s.handleCreateBatch(w, r)
|
||||||
|
case r.Method == http.MethodPost && path == "/api/v1/jobs/retry":
|
||||||
|
s.handleRetryJobs(w, r)
|
||||||
|
case r.Method == http.MethodPost && path == "/api/v1/jobs/cancel":
|
||||||
|
s.handleCancelJobs(w, r)
|
||||||
case r.Method == http.MethodPost && path == "/api/v1/jobs/claim":
|
case r.Method == http.MethodPost && path == "/api/v1/jobs/claim":
|
||||||
s.handleClaimJobs(w, r)
|
s.handleClaimJobs(w, r)
|
||||||
case r.Method == http.MethodGet && strings.HasPrefix(path, "/api/v1/jobs/"):
|
case r.Method == http.MethodGet && strings.HasPrefix(path, "/api/v1/jobs/"):
|
||||||
@@ -88,6 +95,22 @@ func (s *Server) handleCreateJob(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeJSON(w, http.StatusCreated, job)
|
writeJSON(w, http.StatusCreated, job)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type listJobsResponse struct {
|
||||||
|
Jobs []*model.Job `json:"jobs"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleListJobs(w http.ResponseWriter, r *http.Request) {
|
||||||
|
filter := jobFilterFromQuery(r)
|
||||||
|
ctx, cancel := contextWithTimeout(r, 8*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
jobs, err := s.store.ListJobs(ctx, filter)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, listJobsResponse{Jobs: jobs})
|
||||||
|
}
|
||||||
|
|
||||||
type createBatchRequest struct {
|
type createBatchRequest struct {
|
||||||
OwnerService string `json:"owner_service"`
|
OwnerService string `json:"owner_service"`
|
||||||
TaskType string `json:"task_type"`
|
TaskType string `json:"task_type"`
|
||||||
@@ -144,6 +167,46 @@ func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeJSON(w, http.StatusCreated, out)
|
writeJSON(w, http.StatusCreated, out)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleRetryJobs(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var req model.JobFilter
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
|
writeError(w, http.StatusBadRequest, "bad json")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !hasAnyFilter(req) {
|
||||||
|
writeError(w, http.StatusBadRequest, "at least one filter is required")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ctx, cancel := contextWithTimeout(r, 20*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
updated, err := s.store.RetryJobs(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, model.BatchActionResult{Updated: updated})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleCancelJobs(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var req model.JobFilter
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
|
writeError(w, http.StatusBadRequest, "bad json")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !hasAnyFilter(req) {
|
||||||
|
writeError(w, http.StatusBadRequest, "at least one filter is required")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ctx, cancel := contextWithTimeout(r, 20*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
updated, err := s.store.CancelJobs(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, model.BatchActionResult{Updated: updated})
|
||||||
|
}
|
||||||
|
|
||||||
type claimJobsResponse struct {
|
type claimJobsResponse struct {
|
||||||
Jobs []*model.Job `json:"jobs"`
|
Jobs []*model.Job `json:"jobs"`
|
||||||
}
|
}
|
||||||
@@ -291,3 +354,51 @@ func isValidationError(err error) bool {
|
|||||||
msg := err.Error()
|
msg := err.Error()
|
||||||
return strings.Contains(msg, " is required")
|
return strings.Contains(msg, " is required")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func jobFilterFromQuery(r *http.Request) model.JobFilter {
|
||||||
|
q := r.URL.Query()
|
||||||
|
return model.JobFilter{
|
||||||
|
OwnerService: q.Get("owner_service"),
|
||||||
|
OwnerRef: q.Get("owner_ref"),
|
||||||
|
TaskType: q.Get("task_type"),
|
||||||
|
ModelProfile: q.Get("model_profile"),
|
||||||
|
Statuses: splitCSV(q.Get("status")),
|
||||||
|
ErrorCodes: splitCSV(q.Get("error_code")),
|
||||||
|
Limit: parseIntDefault(q.Get("limit"), 100),
|
||||||
|
Offset: parseIntDefault(q.Get("offset"), 0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func splitCSV(raw string) []string {
|
||||||
|
if strings.TrimSpace(raw) == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
parts := strings.Split(raw, ",")
|
||||||
|
out := make([]string, 0, len(parts))
|
||||||
|
for _, part := range parts {
|
||||||
|
if v := strings.TrimSpace(part); v != "" {
|
||||||
|
out = append(out, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseIntDefault(raw string, fallback int) int {
|
||||||
|
if strings.TrimSpace(raw) == "" {
|
||||||
|
return fallback
|
||||||
|
}
|
||||||
|
var out int
|
||||||
|
if _, err := fmt.Sscanf(raw, "%d", &out); err != nil {
|
||||||
|
return fallback
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func hasAnyFilter(filter model.JobFilter) bool {
|
||||||
|
return strings.TrimSpace(filter.OwnerService) != "" ||
|
||||||
|
strings.TrimSpace(filter.OwnerRef) != "" ||
|
||||||
|
strings.TrimSpace(filter.TaskType) != "" ||
|
||||||
|
strings.TrimSpace(filter.ModelProfile) != "" ||
|
||||||
|
len(filter.Statuses) > 0 ||
|
||||||
|
len(filter.ErrorCodes) > 0
|
||||||
|
}
|
||||||
|
|||||||
@@ -67,6 +67,21 @@ type FailJob struct {
|
|||||||
ErrorMessage string `json:"error_message"`
|
ErrorMessage string `json:"error_message"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type JobFilter struct {
|
||||||
|
OwnerService string `json:"owner_service,omitempty"`
|
||||||
|
OwnerRef string `json:"owner_ref,omitempty"`
|
||||||
|
TaskType string `json:"task_type,omitempty"`
|
||||||
|
ModelProfile string `json:"model_profile,omitempty"`
|
||||||
|
Statuses []string `json:"statuses,omitempty"`
|
||||||
|
ErrorCodes []string `json:"error_codes,omitempty"`
|
||||||
|
Limit int `json:"limit,omitempty"`
|
||||||
|
Offset int `json:"offset,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type BatchActionResult struct {
|
||||||
|
Updated int `json:"updated"`
|
||||||
|
}
|
||||||
|
|
||||||
type QueueStat struct {
|
type QueueStat struct {
|
||||||
TaskType string `json:"task_type"`
|
TaskType string `json:"task_type"`
|
||||||
ModelProfile string `json:"model_profile"`
|
ModelProfile string `json:"model_profile"`
|
||||||
@@ -82,8 +97,17 @@ type ErrorStat struct {
|
|||||||
Last24h int64 `json:"last_24h"`
|
Last24h int64 `json:"last_24h"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type OwnerStat struct {
|
||||||
|
OwnerService string `json:"owner_service"`
|
||||||
|
TaskType string `json:"task_type"`
|
||||||
|
ModelProfile string `json:"model_profile"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
Total int64 `json:"total"`
|
||||||
|
}
|
||||||
|
|
||||||
type Stats struct {
|
type Stats struct {
|
||||||
At time.Time `json:"at"`
|
At time.Time `json:"at"`
|
||||||
Queues []QueueStat `json:"queues"`
|
Queues []QueueStat `json:"queues"`
|
||||||
|
Owners []OwnerStat `json:"owners,omitempty"`
|
||||||
Errors []ErrorStat `json:"errors,omitempty"`
|
Errors []ErrorStat `json:"errors,omitempty"`
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -128,6 +128,67 @@ WHERE id = $1
|
|||||||
return job, err
|
return job, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Store) ListJobs(ctx context.Context, filter model.JobFilter) ([]*model.Job, error) {
|
||||||
|
normalizeFilter(&filter)
|
||||||
|
const q = `
|
||||||
|
SELECT ` + jobSelectColumns + `
|
||||||
|
FROM ai_jobs
|
||||||
|
WHERE ($1 = '' OR owner_service = $1)
|
||||||
|
AND ($2 = '' OR owner_ref = $2)
|
||||||
|
AND ($3 = '' OR task_type = $3)
|
||||||
|
AND ($4 = '' OR model_profile = $4)
|
||||||
|
AND (cardinality($5::text[]) = 0 OR status = ANY($5::text[]))
|
||||||
|
AND (cardinality($6::text[]) = 0 OR COALESCE(NULLIF(error_code, ''), 'unknown') = ANY($6::text[]))
|
||||||
|
ORDER BY created_at DESC
|
||||||
|
LIMIT $7 OFFSET $8
|
||||||
|
`
|
||||||
|
rows, err := s.pool.Query(ctx, q,
|
||||||
|
filter.OwnerService,
|
||||||
|
filter.OwnerRef,
|
||||||
|
filter.TaskType,
|
||||||
|
filter.ModelProfile,
|
||||||
|
filter.Statuses,
|
||||||
|
filter.ErrorCodes,
|
||||||
|
filter.Limit,
|
||||||
|
filter.Offset,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
var out []*model.Job
|
||||||
|
for rows.Next() {
|
||||||
|
job, err := scanJob(rows)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
out = append(out, job)
|
||||||
|
}
|
||||||
|
return out, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeFilter(filter *model.JobFilter) {
|
||||||
|
filter.OwnerService = strings.TrimSpace(filter.OwnerService)
|
||||||
|
filter.OwnerRef = strings.TrimSpace(filter.OwnerRef)
|
||||||
|
filter.TaskType = strings.TrimSpace(filter.TaskType)
|
||||||
|
filter.ModelProfile = strings.TrimSpace(filter.ModelProfile)
|
||||||
|
if filter.Statuses == nil {
|
||||||
|
filter.Statuses = []string{}
|
||||||
|
}
|
||||||
|
if filter.ErrorCodes == nil {
|
||||||
|
filter.ErrorCodes = []string{}
|
||||||
|
}
|
||||||
|
if filter.Limit <= 0 {
|
||||||
|
filter.Limit = 100
|
||||||
|
}
|
||||||
|
if filter.Limit > 500 {
|
||||||
|
filter.Limit = 500
|
||||||
|
}
|
||||||
|
if filter.Offset < 0 {
|
||||||
|
filter.Offset = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Store) RetryJob(ctx context.Context, id uuid.UUID) (*model.Job, error) {
|
func (s *Store) RetryJob(ctx context.Context, id uuid.UUID) (*model.Job, error) {
|
||||||
const q = `
|
const q = `
|
||||||
UPDATE ai_jobs
|
UPDATE ai_jobs
|
||||||
@@ -155,6 +216,87 @@ RETURNING id, owner_service, owner_ref, task_type, model_profile, priority, stat
|
|||||||
return job, err
|
return job, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Store) RetryJobs(ctx context.Context, filter model.JobFilter) (int, error) {
|
||||||
|
normalizeFilter(&filter)
|
||||||
|
const q = `
|
||||||
|
WITH picked AS (
|
||||||
|
SELECT id
|
||||||
|
FROM ai_jobs
|
||||||
|
WHERE status IN ('failed', 'running')
|
||||||
|
AND ($1 = '' OR owner_service = $1)
|
||||||
|
AND ($2 = '' OR owner_ref = $2)
|
||||||
|
AND ($3 = '' OR task_type = $3)
|
||||||
|
AND ($4 = '' OR model_profile = $4)
|
||||||
|
AND (cardinality($5::text[]) = 0 OR COALESCE(NULLIF(error_code, ''), 'unknown') = ANY($5::text[]))
|
||||||
|
ORDER BY updated_at ASC
|
||||||
|
LIMIT $6
|
||||||
|
)
|
||||||
|
UPDATE ai_jobs j
|
||||||
|
SET status = 'pending',
|
||||||
|
attempts = 0,
|
||||||
|
started_at = NULL,
|
||||||
|
completed_at = NULL,
|
||||||
|
error_code = NULL,
|
||||||
|
error_message = NULL,
|
||||||
|
worker_id = NULL,
|
||||||
|
heartbeat_at = NULL,
|
||||||
|
scheduled_at = NOW(),
|
||||||
|
updated_at = NOW()
|
||||||
|
FROM picked
|
||||||
|
WHERE j.id = picked.id
|
||||||
|
`
|
||||||
|
tag, err := s.pool.Exec(ctx, q,
|
||||||
|
filter.OwnerService,
|
||||||
|
filter.OwnerRef,
|
||||||
|
filter.TaskType,
|
||||||
|
filter.ModelProfile,
|
||||||
|
filter.ErrorCodes,
|
||||||
|
filter.Limit,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return int(tag.RowsAffected()), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) CancelJobs(ctx context.Context, filter model.JobFilter) (int, error) {
|
||||||
|
normalizeFilter(&filter)
|
||||||
|
const q = `
|
||||||
|
WITH picked AS (
|
||||||
|
SELECT id
|
||||||
|
FROM ai_jobs
|
||||||
|
WHERE status IN ('pending', 'running')
|
||||||
|
AND ($1 = '' OR owner_service = $1)
|
||||||
|
AND ($2 = '' OR owner_ref = $2)
|
||||||
|
AND ($3 = '' OR task_type = $3)
|
||||||
|
AND ($4 = '' OR model_profile = $4)
|
||||||
|
AND (cardinality($5::text[]) = 0 OR status = ANY($5::text[]))
|
||||||
|
ORDER BY updated_at ASC
|
||||||
|
LIMIT $6
|
||||||
|
)
|
||||||
|
UPDATE ai_jobs j
|
||||||
|
SET status = 'cancelled',
|
||||||
|
completed_at = NOW(),
|
||||||
|
worker_id = NULL,
|
||||||
|
heartbeat_at = NULL,
|
||||||
|
updated_at = NOW()
|
||||||
|
FROM picked
|
||||||
|
WHERE j.id = picked.id
|
||||||
|
`
|
||||||
|
tag, err := s.pool.Exec(ctx, q,
|
||||||
|
filter.OwnerService,
|
||||||
|
filter.OwnerRef,
|
||||||
|
filter.TaskType,
|
||||||
|
filter.ModelProfile,
|
||||||
|
filter.Statuses,
|
||||||
|
filter.Limit,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return int(tag.RowsAffected()), nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Store) ClaimJobs(ctx context.Context, in model.ClaimJobs) ([]*model.Job, error) {
|
func (s *Store) ClaimJobs(ctx context.Context, in model.ClaimJobs) ([]*model.Job, error) {
|
||||||
if in.Limit <= 0 {
|
if in.Limit <= 0 {
|
||||||
in.Limit = 1
|
in.Limit = 1
|
||||||
@@ -326,6 +468,27 @@ ORDER BY task_type, model_profile, status
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ownerRows, err := s.pool.Query(ctx, `
|
||||||
|
SELECT owner_service, task_type, model_profile, status, count(*)
|
||||||
|
FROM ai_jobs
|
||||||
|
GROUP BY owner_service, task_type, model_profile, status
|
||||||
|
ORDER BY owner_service, task_type, model_profile, status
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer ownerRows.Close()
|
||||||
|
for ownerRows.Next() {
|
||||||
|
var stat model.OwnerStat
|
||||||
|
if err := ownerRows.Scan(&stat.OwnerService, &stat.TaskType, &stat.ModelProfile, &stat.Status, &stat.Total); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
out.Owners = append(out.Owners, stat)
|
||||||
|
}
|
||||||
|
if err := ownerRows.Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
errorRows, err := s.pool.Query(ctx, `
|
errorRows, err := s.pool.Query(ctx, `
|
||||||
SELECT task_type, model_profile, COALESCE(NULLIF(error_code, ''), 'unknown') AS error_code,
|
SELECT task_type, model_profile, COALESCE(NULLIF(error_code, ''), 'unknown') AS error_code,
|
||||||
count(*) AS total,
|
count(*) AS total,
|
||||||
|
|||||||
Reference in New Issue
Block a user