diff --git a/internal/httpapi/server.go b/internal/httpapi/server.go index 5d4df11..ca5cc26 100644 --- a/internal/httpapi/server.go +++ b/internal/httpapi/server.go @@ -61,6 +61,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.handleGetJob(w, r, path) case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/retry"): s.handleRetryJob(w, r, path) + case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/cancel"): + s.handleCancelJob(w, r, path) case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/complete"): s.handleCompleteJob(w, r, path) case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/fail"): @@ -267,7 +269,7 @@ func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request, path strin } func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path string) { - id, err := jobIDFromPath(path, true) + id, err := jobIDFromActionPath(path, "retry") if err != nil { writeError(w, http.StatusBadRequest, err.Error()) return @@ -286,6 +288,26 @@ func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path str writeJSON(w, http.StatusOK, job) } +func (s *Server) handleCancelJob(w http.ResponseWriter, r *http.Request, path string) { + id, err := jobIDFromActionPath(path, "cancel") + if err != nil { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + ctx, cancel := contextWithTimeout(r, 8*time.Second) + defer cancel() + job, err := s.store.CancelJob(ctx, id) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + if job == nil { + writeError(w, http.StatusNotFound, "cancellable job not found") + return + } + writeJSON(w, http.StatusOK, job) +} + func (s *Server) handleCompleteJob(w http.ResponseWriter, r *http.Request, path string) { id, err := jobIDFromActionPath(path, "complete") if err != nil { diff --git a/internal/store/store.go b/internal/store/store.go index 71563a8..965c9ac 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -451,6 +451,28 @@ WHERE j.id = picked.id return int(tag.RowsAffected()), nil } +func (s *Store) CancelJob(ctx context.Context, id uuid.UUID) (*model.Job, error) { + const q = ` +UPDATE ai_jobs +SET status = 'cancelled', + completed_at = NOW(), + worker_id = NULL, + heartbeat_at = NULL, + updated_at = NOW() +WHERE id = $1 + AND status IN ('pending', 'running') +RETURNING id, owner_service, owner_ref, task_type, model_profile, priority, status, + attempts, max_attempts, input, result, error_code, error_message, + scheduled_at, started_at, completed_at, worker_id, heartbeat_at, + created_at, updated_at, idempotency_key +` + job, err := scanJob(s.pool.QueryRow(ctx, q, id)) + if errors.Is(err, pgx.ErrNoRows) { + return nil, nil + } + return job, err +} + func (s *Store) CancelJobs(ctx context.Context, filter model.JobFilter) (int, error) { normalizeFilter(&filter) const q = `