From 800d1d7cdde11e8341311f1d1dd0fc3bf9d233ba Mon Sep 17 00:00:00 2001
From: Grendgi
Date: Wed, 10 Jun 2026 16:48:58 +0300
Subject: [PATCH] Keep AI jobs alive during processing

---
 internal/store/store.go   | 18 +++++++++++++++
 internal/worker/worker.go | 47 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 65 insertions(+)

diff --git a/internal/store/store.go b/internal/store/store.go
index 200fc47..b8a8074 100644
--- a/internal/store/store.go
+++ b/internal/store/store.go
@@ -542,6 +542,24 @@ RETURNING ` + jobSelectColumns + `
 	return job, err
 }
 
+// HeartbeatJob refreshes heartbeat_at and updated_at for the job with the
+// given id, but only while its status is 'running'.
+//
+// A heartbeat that matches no row (unknown id, or a job that is no longer
+// 'running') is not reported as an error; only a failure to execute the
+// statement is returned.
+func (s *Store) HeartbeatJob(ctx context.Context, id uuid.UUID) error {
+	const q = `
+UPDATE ai_jobs
+SET heartbeat_at = NOW(),
+    updated_at = NOW()
+WHERE id = $1
+  AND status = 'running'
+`
+	_, err := s.pool.Exec(ctx, q, id)
+	return err
+}
+
 func (s *Store) FailJob(ctx context.Context, id uuid.UUID, in model.FailJob) (*model.Job, error) {
 	errorCode := strings.TrimSpace(in.ErrorCode)
 	if errorCode == "" {
diff --git a/internal/worker/worker.go b/internal/worker/worker.go
index 332f378..6f6db11 100644
--- a/internal/worker/worker.go
+++ b/internal/worker/worker.go
@@ -113,6 +113,9 @@ func (w *Worker) tick(ctx context.Context) {
 }
 
 func (w *Worker) process(ctx context.Context, job *model.Job) {
+	stopHeartbeat := w.startHeartbeat(ctx, job)
+	defer stopHeartbeat()
+
 	if job.TaskType == TaskTranscription {
 		w.processTranscription(ctx, job)
 		return
@@ -168,6 +171,50 @@ func (w *Worker) fail(ctx context.Context, job *model.Job, code, message string)
 	}
 }
 
+// startHeartbeat launches a goroutine that calls store.HeartbeatJob for job
+// on every tick of heartbeatInterval, until the returned stop function is
+// called or ctx is cancelled. The stop function cancels the goroutine and
+// blocks until it has exited.
+func (w *Worker) startHeartbeat(ctx context.Context, job *model.Job) func() {
+	heartbeatCtx, cancel := context.WithCancel(ctx)
+	done := make(chan struct{})
+	ticker := time.NewTicker(w.heartbeatInterval())
+	go func() {
+		defer close(done)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-heartbeatCtx.Done():
+				return
+			case <-ticker.C:
+				err := w.store.HeartbeatJob(heartbeatCtx, job.ID)
+				// An error seen after heartbeatCtx was cancelled is the
+				// expected result of stopping the heartbeat; do not warn.
+				if err != nil && heartbeatCtx.Err() == nil {
+					slog.Warn("heartbeat job failed", "job_id", job.ID, "error", err)
+				}
+			}
+		}
+	}()
+	return func() {
+		cancel()
+		<-done
+	}
+}
+
+// heartbeatInterval returns one third of w.leaseTimeout, clamped to the
+// range from 10 seconds to one minute.
+func (w *Worker) heartbeatInterval() time.Duration {
+	interval := w.leaseTimeout / 3
+	if interval < 10*time.Second {
+		return 10 * time.Second
+	}
+	if interval > time.Minute {
+		return time.Minute
+	}
+	return interval
+}
+
 func classifyTranscriptionError(err error) string {
 	if err == nil {
 		return "unknown"