feat: cancel ai jobs by id
This commit is contained in:
@@ -61,6 +61,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
s.handleGetJob(w, r, path)
|
s.handleGetJob(w, r, path)
|
||||||
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/retry"):
|
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/retry"):
|
||||||
s.handleRetryJob(w, r, path)
|
s.handleRetryJob(w, r, path)
|
||||||
|
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/cancel"):
|
||||||
|
s.handleCancelJob(w, r, path)
|
||||||
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/complete"):
|
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/complete"):
|
||||||
s.handleCompleteJob(w, r, path)
|
s.handleCompleteJob(w, r, path)
|
||||||
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/fail"):
|
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/fail"):
|
||||||
@@ -267,7 +269,7 @@ func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request, path strin
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path string) {
|
func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path string) {
|
||||||
id, err := jobIDFromPath(path, true)
|
id, err := jobIDFromActionPath(path, "retry")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeError(w, http.StatusBadRequest, err.Error())
|
writeError(w, http.StatusBadRequest, err.Error())
|
||||||
return
|
return
|
||||||
@@ -286,6 +288,26 @@ func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path str
|
|||||||
writeJSON(w, http.StatusOK, job)
|
writeJSON(w, http.StatusOK, job)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleCancelJob(w http.ResponseWriter, r *http.Request, path string) {
|
||||||
|
id, err := jobIDFromActionPath(path, "cancel")
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusBadRequest, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ctx, cancel := contextWithTimeout(r, 8*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
job, err := s.store.CancelJob(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if job == nil {
|
||||||
|
writeError(w, http.StatusNotFound, "cancellable job not found")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, job)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) handleCompleteJob(w http.ResponseWriter, r *http.Request, path string) {
|
func (s *Server) handleCompleteJob(w http.ResponseWriter, r *http.Request, path string) {
|
||||||
id, err := jobIDFromActionPath(path, "complete")
|
id, err := jobIDFromActionPath(path, "complete")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -451,6 +451,28 @@ WHERE j.id = picked.id
|
|||||||
return int(tag.RowsAffected()), nil
|
return int(tag.RowsAffected()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Store) CancelJob(ctx context.Context, id uuid.UUID) (*model.Job, error) {
|
||||||
|
const q = `
|
||||||
|
UPDATE ai_jobs
|
||||||
|
SET status = 'cancelled',
|
||||||
|
completed_at = NOW(),
|
||||||
|
worker_id = NULL,
|
||||||
|
heartbeat_at = NULL,
|
||||||
|
updated_at = NOW()
|
||||||
|
WHERE id = $1
|
||||||
|
AND status IN ('pending', 'running')
|
||||||
|
RETURNING id, owner_service, owner_ref, task_type, model_profile, priority, status,
|
||||||
|
attempts, max_attempts, input, result, error_code, error_message,
|
||||||
|
scheduled_at, started_at, completed_at, worker_id, heartbeat_at,
|
||||||
|
created_at, updated_at, idempotency_key
|
||||||
|
`
|
||||||
|
job, err := scanJob(s.pool.QueryRow(ctx, q, id))
|
||||||
|
if errors.Is(err, pgx.ErrNoRows) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return job, err
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Store) CancelJobs(ctx context.Context, filter model.JobFilter) (int, error) {
|
func (s *Store) CancelJobs(ctx context.Context, filter model.JobFilter) (int, error) {
|
||||||
normalizeFilter(&filter)
|
normalizeFilter(&filter)
|
||||||
const q = `
|
const q = `
|
||||||
|
|||||||
Reference in New Issue
Block a user