diff --git a/README.md b/README.md
index 5a8e90d..57d8407 100644
--- a/README.md
+++ b/README.md
@@ -71,6 +71,7 @@ or compact `system` / `user` fields. The completed job result contains
 - `WORKER_ID`, default hostname
 - `WORKER_POLL_INTERVAL`, default `2s`
 - `WORKER_CLAIM_LIMIT`, default `4`
+- `WORKER_LEASE_TIMEOUT`, default `15m`
 
 ## Next integration step
 
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index a1365a6..2aa8dd1 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -41,12 +41,13 @@ func main() {
 	}
 
 	llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout)
-	w := worker.New(db, llmClient, cfg.WorkerID, cfg.LLMModel, cfg.WorkerPollInterval, cfg.WorkerClaimLimit)
+	w := worker.New(db, llmClient, cfg.WorkerID, cfg.LLMModel, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit)
 
 	slog.Info("ai_worker_started",
 		"worker_id", cfg.WorkerID,
 		"model", cfg.LLMModel,
 		"poll_interval", cfg.WorkerPollInterval.String(),
+		"lease_timeout", cfg.WorkerLeaseTimeout.String(),
 		"claim_limit", cfg.WorkerClaimLimit,
 	)
 	w.Run(ctx)
diff --git a/internal/config/config.go b/internal/config/config.go
index 5df8abb..284e7b8 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -21,6 +21,7 @@ type Config struct {
 	WorkerID           string
 	WorkerPollInterval time.Duration
 	WorkerClaimLimit   int
+	WorkerLeaseTimeout time.Duration
 }
 
 func Load() Config {
@@ -39,6 +40,7 @@ func Load() Config {
 		WorkerID:           envString("WORKER_ID", hostname()),
 		WorkerPollInterval: envDuration("WORKER_POLL_INTERVAL", 2*time.Second),
 		WorkerClaimLimit:   envInt("WORKER_CLAIM_LIMIT", 4),
+		WorkerLeaseTimeout: envDuration("WORKER_LEASE_TIMEOUT", 15*time.Minute),
 	}
 }
 
diff --git a/internal/store/store.go b/internal/store/store.go
index 07dbb79..1fe7359 100644
--- a/internal/store/store.go
+++ b/internal/store/store.go
@@ -264,6 +264,59 @@ RETURNING ` + jobSelectColumns + `
 	return job, err
 }
 
+// RequeueStaleRunning releases jobs stuck in the 'running' state whose most
+// recent liveness timestamp (heartbeat_at, else started_at, else updated_at)
+// is older than olderThan. Jobs with attempts remaining go back to 'pending'
+// and are scheduled immediately; the rest are marked 'failed' with error code
+// 'stale_worker'. At most limit jobs are touched per call, oldest first.
+//
+// A non-positive olderThan defaults to 15 minutes; limit defaults to 100 and
+// is capped at 1000. It returns the number of rows updated.
+func (s *Store) RequeueStaleRunning(ctx context.Context, olderThan time.Duration, limit int) (int, error) {
+	if olderThan <= 0 {
+		olderThan = 15 * time.Minute
+	}
+	if limit <= 0 {
+		limit = 100
+	}
+	if limit > 1000 {
+		limit = 1000
+	}
+	// SKIP LOCKED lets concurrent workers sweep disjoint rows instead of
+	// queueing on each other's locks, and the status re-check in the UPDATE
+	// keeps a job that finished in the meantime from being overwritten.
+	const q = `
+WITH picked AS (
+    SELECT id
+    FROM ai_jobs
+    WHERE status = 'running'
+      AND COALESCE(heartbeat_at, started_at, updated_at) < NOW() - make_interval(secs => $1)
+    ORDER BY COALESCE(heartbeat_at, started_at, updated_at) ASC
+    LIMIT $2
+    FOR UPDATE SKIP LOCKED
+)
+UPDATE ai_jobs j
+SET status = CASE WHEN j.attempts < j.max_attempts THEN 'pending' ELSE 'failed' END,
+    error_code = CASE WHEN j.attempts < j.max_attempts THEN NULL ELSE 'stale_worker' END,
+    error_message = CASE WHEN j.attempts < j.max_attempts THEN NULL ELSE 'worker lease expired' END,
+    worker_id = NULL,
+    heartbeat_at = NULL,
+    completed_at = CASE WHEN j.attempts < j.max_attempts THEN NULL ELSE NOW() END,
+    scheduled_at = CASE WHEN j.attempts < j.max_attempts THEN NOW() ELSE j.scheduled_at END,
+    updated_at = NOW()
+FROM picked
+WHERE j.id = picked.id
+  AND j.status = 'running'
+`
+	// Pass fractional seconds: truncating to an int would turn a sub-second
+	// timeout into 0 and requeue every running job on each sweep.
+	tag, err := s.pool.Exec(ctx, q, olderThan.Seconds(), limit)
+	if err != nil {
+		return 0, err
+	}
+	return int(tag.RowsAffected()), nil
+}
+
 func (s *Store) Stats(ctx context.Context) (*model.Stats, error) {
 	out := &model.Stats{At: time.Now().UTC()}
 
diff --git a/internal/worker/worker.go b/internal/worker/worker.go
index ff46055..c58d353 100644
--- a/internal/worker/worker.go
+++ b/internal/worker/worker.go
@@ -24,12 +24,16 @@ type Worker struct {
 	modelProfile string
 	pollInterval time.Duration
 	claimLimit   int
+	leaseTimeout time.Duration
 }
 
-func New(store *store.Store, llmClient *llm.Client, workerID, modelProfile string, pollInterval time.Duration, claimLimit int) *Worker {
+func New(store *store.Store, llmClient *llm.Client, workerID, modelProfile string, pollInterval, leaseTimeout time.Duration, claimLimit int) *Worker {
 	if pollInterval <= 0 {
 		pollInterval = 2 * time.Second
 	}
+	if leaseTimeout <= 0 {
+		leaseTimeout = 15 * time.Minute
+	}
 	if claimLimit <= 0 {
 		claimLimit = 4
 	}
@@ -43,6 +47,7 @@ func New(store *store.Store, llmClient *llm.Client, workerID, modelProfile strin
 		modelProfile: modelProfile,
 		pollInterval: pollInterval,
 		claimLimit:   claimLimit,
+		leaseTimeout: leaseTimeout,
 	}
 }
 
@@ -60,6 +65,11 @@ func (w *Worker) Run(ctx context.Context) {
 }
 
 func (w *Worker) tick(ctx context.Context) {
+	if reset, err := w.store.RequeueStaleRunning(ctx, w.leaseTimeout, 100); err != nil {
+		slog.Error("requeue stale jobs failed", "error", err)
+	} else if reset > 0 {
+		slog.Warn("requeued stale jobs", "count", reset)
+	}
 	jobs, err := w.store.ClaimJobs(ctx, model.ClaimJobs{
 		WorkerID:  w.workerID,
 		TaskTypes: []string{TaskLLMChat, TaskChatCompletion},
diff --git a/k8s/configmap.yaml b/k8s/configmap.yaml
index 53d21f4..bcada4c 100644
--- a/k8s/configmap.yaml
+++ b/k8s/configmap.yaml
@@ -14,3 +14,4 @@ data:
   WHISPERX_URL: "http://10.2.3.5:8001"
   WORKER_POLL_INTERVAL: "2s"
   WORKER_CLAIM_LIMIT: "4"
+  WORKER_LEASE_TIMEOUT: "15m"