Requeue stale AI jobs
This commit is contained in:
@@ -71,6 +71,7 @@ or compact `system` / `user` fields. The completed job result contains
|
|||||||
- `WORKER_ID`, default hostname
|
- `WORKER_ID`, default hostname
|
||||||
- `WORKER_POLL_INTERVAL`, default `2s`
|
- `WORKER_POLL_INTERVAL`, default `2s`
|
||||||
- `WORKER_CLAIM_LIMIT`, default `4`
|
- `WORKER_CLAIM_LIMIT`, default `4`
|
||||||
|
- `WORKER_LEASE_TIMEOUT`, default `15m`
|
||||||
|
|
||||||
## Next integration step
|
## Next integration step
|
||||||
|
|
||||||
|
|||||||
@@ -41,12 +41,13 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout)
|
llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout)
|
||||||
w := worker.New(db, llmClient, cfg.WorkerID, cfg.LLMModel, cfg.WorkerPollInterval, cfg.WorkerClaimLimit)
|
w := worker.New(db, llmClient, cfg.WorkerID, cfg.LLMModel, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit)
|
||||||
|
|
||||||
slog.Info("ai_worker_started",
|
slog.Info("ai_worker_started",
|
||||||
"worker_id", cfg.WorkerID,
|
"worker_id", cfg.WorkerID,
|
||||||
"model", cfg.LLMModel,
|
"model", cfg.LLMModel,
|
||||||
"poll_interval", cfg.WorkerPollInterval.String(),
|
"poll_interval", cfg.WorkerPollInterval.String(),
|
||||||
|
"lease_timeout", cfg.WorkerLeaseTimeout.String(),
|
||||||
"claim_limit", cfg.WorkerClaimLimit,
|
"claim_limit", cfg.WorkerClaimLimit,
|
||||||
)
|
)
|
||||||
w.Run(ctx)
|
w.Run(ctx)
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ type Config struct {
|
|||||||
WorkerID string
|
WorkerID string
|
||||||
WorkerPollInterval time.Duration
|
WorkerPollInterval time.Duration
|
||||||
WorkerClaimLimit int
|
WorkerClaimLimit int
|
||||||
|
WorkerLeaseTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func Load() Config {
|
func Load() Config {
|
||||||
@@ -39,6 +40,7 @@ func Load() Config {
|
|||||||
WorkerID: envString("WORKER_ID", hostname()),
|
WorkerID: envString("WORKER_ID", hostname()),
|
||||||
WorkerPollInterval: envDuration("WORKER_POLL_INTERVAL", 2*time.Second),
|
WorkerPollInterval: envDuration("WORKER_POLL_INTERVAL", 2*time.Second),
|
||||||
WorkerClaimLimit: envInt("WORKER_CLAIM_LIMIT", 4),
|
WorkerClaimLimit: envInt("WORKER_CLAIM_LIMIT", 4),
|
||||||
|
WorkerLeaseTimeout: envDuration("WORKER_LEASE_TIMEOUT", 15*time.Minute),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -264,6 +264,44 @@ RETURNING ` + jobSelectColumns + `
|
|||||||
return job, err
|
return job, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Store) RequeueStaleRunning(ctx context.Context, olderThan time.Duration, limit int) (int, error) {
|
||||||
|
if olderThan <= 0 {
|
||||||
|
olderThan = 15 * time.Minute
|
||||||
|
}
|
||||||
|
if limit <= 0 {
|
||||||
|
limit = 100
|
||||||
|
}
|
||||||
|
if limit > 1000 {
|
||||||
|
limit = 1000
|
||||||
|
}
|
||||||
|
const q = `
|
||||||
|
WITH picked AS (
|
||||||
|
SELECT id
|
||||||
|
FROM ai_jobs
|
||||||
|
WHERE status = 'running'
|
||||||
|
AND COALESCE(heartbeat_at, started_at, updated_at) < NOW() - make_interval(secs => $1)
|
||||||
|
ORDER BY COALESCE(heartbeat_at, started_at, updated_at) ASC
|
||||||
|
LIMIT $2
|
||||||
|
)
|
||||||
|
UPDATE ai_jobs j
|
||||||
|
SET status = CASE WHEN j.attempts < j.max_attempts THEN 'pending' ELSE 'failed' END,
|
||||||
|
error_code = CASE WHEN j.attempts < j.max_attempts THEN NULL ELSE 'stale_worker' END,
|
||||||
|
error_message = CASE WHEN j.attempts < j.max_attempts THEN NULL ELSE 'worker lease expired' END,
|
||||||
|
worker_id = NULL,
|
||||||
|
heartbeat_at = NULL,
|
||||||
|
completed_at = CASE WHEN j.attempts < j.max_attempts THEN NULL ELSE NOW() END,
|
||||||
|
scheduled_at = CASE WHEN j.attempts < j.max_attempts THEN NOW() ELSE j.scheduled_at END,
|
||||||
|
updated_at = NOW()
|
||||||
|
FROM picked
|
||||||
|
WHERE j.id = picked.id
|
||||||
|
`
|
||||||
|
tag, err := s.pool.Exec(ctx, q, int(olderThan.Seconds()), limit)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return int(tag.RowsAffected()), nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Store) Stats(ctx context.Context) (*model.Stats, error) {
|
func (s *Store) Stats(ctx context.Context) (*model.Stats, error) {
|
||||||
out := &model.Stats{At: time.Now().UTC()}
|
out := &model.Stats{At: time.Now().UTC()}
|
||||||
|
|
||||||
|
|||||||
@@ -24,12 +24,16 @@ type Worker struct {
|
|||||||
modelProfile string
|
modelProfile string
|
||||||
pollInterval time.Duration
|
pollInterval time.Duration
|
||||||
claimLimit int
|
claimLimit int
|
||||||
|
leaseTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(store *store.Store, llmClient *llm.Client, workerID, modelProfile string, pollInterval time.Duration, claimLimit int) *Worker {
|
func New(store *store.Store, llmClient *llm.Client, workerID, modelProfile string, pollInterval, leaseTimeout time.Duration, claimLimit int) *Worker {
|
||||||
if pollInterval <= 0 {
|
if pollInterval <= 0 {
|
||||||
pollInterval = 2 * time.Second
|
pollInterval = 2 * time.Second
|
||||||
}
|
}
|
||||||
|
if leaseTimeout <= 0 {
|
||||||
|
leaseTimeout = 15 * time.Minute
|
||||||
|
}
|
||||||
if claimLimit <= 0 {
|
if claimLimit <= 0 {
|
||||||
claimLimit = 4
|
claimLimit = 4
|
||||||
}
|
}
|
||||||
@@ -43,6 +47,7 @@ func New(store *store.Store, llmClient *llm.Client, workerID, modelProfile strin
|
|||||||
modelProfile: modelProfile,
|
modelProfile: modelProfile,
|
||||||
pollInterval: pollInterval,
|
pollInterval: pollInterval,
|
||||||
claimLimit: claimLimit,
|
claimLimit: claimLimit,
|
||||||
|
leaseTimeout: leaseTimeout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -60,6 +65,11 @@ func (w *Worker) Run(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *Worker) tick(ctx context.Context) {
|
func (w *Worker) tick(ctx context.Context) {
|
||||||
|
if reset, err := w.store.RequeueStaleRunning(ctx, w.leaseTimeout, 100); err != nil {
|
||||||
|
slog.Error("requeue stale jobs failed", "error", err)
|
||||||
|
} else if reset > 0 {
|
||||||
|
slog.Warn("requeued stale jobs", "count", reset)
|
||||||
|
}
|
||||||
jobs, err := w.store.ClaimJobs(ctx, model.ClaimJobs{
|
jobs, err := w.store.ClaimJobs(ctx, model.ClaimJobs{
|
||||||
WorkerID: w.workerID,
|
WorkerID: w.workerID,
|
||||||
TaskTypes: []string{TaskLLMChat, TaskChatCompletion},
|
TaskTypes: []string{TaskLLMChat, TaskChatCompletion},
|
||||||
|
|||||||
@@ -14,3 +14,4 @@ data:
|
|||||||
WHISPERX_URL: "http://10.2.3.5:8001"
|
WHISPERX_URL: "http://10.2.3.5:8001"
|
||||||
WORKER_POLL_INTERVAL: "2s"
|
WORKER_POLL_INTERVAL: "2s"
|
||||||
WORKER_CLAIM_LIMIT: "4"
|
WORKER_CLAIM_LIMIT: "4"
|
||||||
|
WORKER_LEASE_TIMEOUT: "15m"
|
||||||
|
|||||||
Reference in New Issue
Block a user