diff --git a/internal/store/store.go b/internal/store/store.go index 8311a7e..9899e6a 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -78,7 +78,33 @@ INSERT INTO ai_jobs ( ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL -DO UPDATE SET updated_at = ai_jobs.updated_at +DO UPDATE SET + status = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN 'pending' ELSE ai_jobs.status END, + priority = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN EXCLUDED.priority ELSE ai_jobs.priority END, + max_attempts = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN EXCLUDED.max_attempts ELSE ai_jobs.max_attempts END, + attempts = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN 0 ELSE ai_jobs.attempts END, + input = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN EXCLUDED.input ELSE ai_jobs.input END, + scheduled_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN EXCLUDED.scheduled_at ELSE ai_jobs.scheduled_at END, + started_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN NULL ELSE ai_jobs.started_at END, + completed_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN NULL ELSE ai_jobs.completed_at END, + worker_id = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN NULL ELSE ai_jobs.worker_id END, + heartbeat_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN NULL ELSE ai_jobs.heartbeat_at END, + error_code = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN NULL ELSE ai_jobs.error_code END, + error_message = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN NULL ELSE ai_jobs.error_message END, + updated_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN NOW() ELSE ai_jobs.updated_at END RETURNING ` + jobSelectColumns + ` ` row := s.pool.QueryRow(ctx, q, @@ -106,7 +132,33 @@ INSERT INTO ai_jobs ( ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL -DO UPDATE SET updated_at = ai_jobs.updated_at +DO UPDATE SET + status = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN 'pending' ELSE ai_jobs.status END, + priority = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN EXCLUDED.priority ELSE ai_jobs.priority END, + max_attempts = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN EXCLUDED.max_attempts ELSE ai_jobs.max_attempts END, + attempts = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN 0 ELSE ai_jobs.attempts END, + input = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN EXCLUDED.input ELSE ai_jobs.input END, + scheduled_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN EXCLUDED.scheduled_at ELSE ai_jobs.scheduled_at END, + started_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN NULL ELSE ai_jobs.started_at END, + completed_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN NULL ELSE ai_jobs.completed_at END, + worker_id = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN NULL ELSE ai_jobs.worker_id END, + heartbeat_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN NULL ELSE ai_jobs.heartbeat_at END, + error_code = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN NULL ELSE ai_jobs.error_code END, + error_message = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN NULL ELSE ai_jobs.error_message END, + updated_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker' + THEN NOW() ELSE ai_jobs.updated_at END RETURNING ` + jobSelectColumns + ` ` var batch pgx.Batch