diff --git a/internal/httpapi/dashboard.go b/internal/httpapi/dashboard.go index 7feea99..2378017 100644 --- a/internal/httpapi/dashboard.go +++ b/internal/httpapi/dashboard.go @@ -31,7 +31,7 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) { ctx, cancel := contextWithTimeout(r, 12*time.Second) defer cancel() - stats, err := s.store.Stats(ctx) + stats, err := s.store.Stats(ctx, s.cfg.WorkerLeaseTimeout) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return diff --git a/internal/httpapi/server.go b/internal/httpapi/server.go index 254c3a7..0c27491 100644 --- a/internal/httpapi/server.go +++ b/internal/httpapi/server.go @@ -337,7 +337,7 @@ func (s *Server) handleFailJob(w http.ResponseWriter, r *http.Request, path stri func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) { ctx, cancel := contextWithTimeout(r, 8*time.Second) defer cancel() - stats, err := s.store.Stats(ctx) + stats, err := s.store.Stats(ctx, s.cfg.WorkerLeaseTimeout) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return diff --git a/internal/model/job.go b/internal/model/job.go index 3c1a7ee..c48eb65 100644 --- a/internal/model/job.go +++ b/internal/model/job.go @@ -135,8 +135,12 @@ type BacklogStat struct { ModelProfile string `json:"model_profile"` Pending int64 `json:"pending"` Running int64 `json:"running"` + StaleRunning int64 `json:"stale_running"` OldestPendingAgeSeconds int64 `json:"oldest_pending_age_seconds"` OldestPendingScheduledAt string `json:"oldest_pending_scheduled_at,omitempty"` + OldestRunningAgeSeconds int64 `json:"oldest_running_age_seconds"` + OldestRunningStartedAt string `json:"oldest_running_started_at,omitempty"` + LastHeartbeatAt string `json:"last_heartbeat_at,omitempty"` } type OwnerStat struct { diff --git a/internal/store/store.go b/internal/store/store.go index 9013fc1..71563a8 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -658,7 +658,10 @@ WHERE j.id = picked.id return int(tag.RowsAffected()), nil } -func (s *Store) Stats(ctx context.Context) (*model.Stats, error) { +func (s *Store) Stats(ctx context.Context, staleAfter time.Duration) (*model.Stats, error) { + if staleAfter <= 0 { + staleAfter = 15 * time.Minute + } out := &model.Stats{At: time.Now().UTC()} queueRows, err := s.pool.Query(ctx, ` @@ -772,13 +775,20 @@ SELECT owner_service, model_profile, count(*) FILTER (WHERE status = 'pending') AS pending, count(*) FILTER (WHERE status = 'running') AS running, + count(*) FILTER ( + WHERE status = 'running' + AND COALESCE(heartbeat_at, started_at, updated_at) < NOW() - make_interval(secs => $1) + ) AS stale_running, COALESCE(EXTRACT(EPOCH FROM (NOW() - MIN(scheduled_at) FILTER (WHERE status = 'pending')))::bigint, 0) AS oldest_pending_age_seconds, - MIN(scheduled_at) FILTER (WHERE status = 'pending') AS oldest_pending_scheduled_at + MIN(scheduled_at) FILTER (WHERE status = 'pending') AS oldest_pending_scheduled_at, + COALESCE(EXTRACT(EPOCH FROM (NOW() - MIN(started_at) FILTER (WHERE status = 'running')))::bigint, 0) AS oldest_running_age_seconds, + MIN(started_at) FILTER (WHERE status = 'running') AS oldest_running_started_at, + MAX(heartbeat_at) FILTER (WHERE status = 'running') AS last_heartbeat_at FROM ai_jobs WHERE status IN ('pending', 'running') GROUP BY owner_service, task_type, model_profile -ORDER BY pending DESC, running DESC, owner_service, task_type, model_profile -`) +ORDER BY stale_running DESC, pending DESC, running DESC, owner_service, task_type, model_profile +`, int(staleAfter.Seconds())) if err != nil { return nil, err } @@ -786,20 +796,32 @@ ORDER BY pending DESC, running DESC, owner_service, task_type, model_profile for backlogRows.Next() { var stat model.BacklogStat var oldestPendingScheduledAt *time.Time + var oldestRunningStartedAt *time.Time + var lastHeartbeatAt *time.Time if err := backlogRows.Scan( &stat.OwnerService, &stat.TaskType, &stat.ModelProfile, &stat.Pending, &stat.Running, + &stat.StaleRunning, &stat.OldestPendingAgeSeconds, &oldestPendingScheduledAt, + &stat.OldestRunningAgeSeconds, + &oldestRunningStartedAt, + &lastHeartbeatAt, ); err != nil { return nil, err } if oldestPendingScheduledAt != nil { stat.OldestPendingScheduledAt = oldestPendingScheduledAt.UTC().Format(time.RFC3339) } + if oldestRunningStartedAt != nil { + stat.OldestRunningStartedAt = oldestRunningStartedAt.UTC().Format(time.RFC3339) + } + if lastHeartbeatAt != nil { + stat.LastHeartbeatAt = lastHeartbeatAt.UTC().Format(time.RFC3339) + } out.Backlog = append(out.Backlog, stat) } return out, backlogRows.Err()