Expose AI queue heartbeat metrics
This commit is contained in:
@@ -658,7 +658,10 @@ WHERE j.id = picked.id
|
||||
return int(tag.RowsAffected()), nil
|
||||
}
|
||||
|
||||
func (s *Store) Stats(ctx context.Context) (*model.Stats, error) {
|
||||
func (s *Store) Stats(ctx context.Context, staleAfter time.Duration) (*model.Stats, error) {
|
||||
if staleAfter <= 0 {
|
||||
staleAfter = 15 * time.Minute
|
||||
}
|
||||
out := &model.Stats{At: time.Now().UTC()}
|
||||
|
||||
queueRows, err := s.pool.Query(ctx, `
|
||||
@@ -772,13 +775,20 @@ SELECT owner_service,
|
||||
model_profile,
|
||||
count(*) FILTER (WHERE status = 'pending') AS pending,
|
||||
count(*) FILTER (WHERE status = 'running') AS running,
|
||||
count(*) FILTER (
|
||||
WHERE status = 'running'
|
||||
AND COALESCE(heartbeat_at, started_at, updated_at) < NOW() - make_interval(secs => $1)
|
||||
) AS stale_running,
|
||||
COALESCE(EXTRACT(EPOCH FROM (NOW() - MIN(scheduled_at) FILTER (WHERE status = 'pending')))::bigint, 0) AS oldest_pending_age_seconds,
|
||||
MIN(scheduled_at) FILTER (WHERE status = 'pending') AS oldest_pending_scheduled_at
|
||||
MIN(scheduled_at) FILTER (WHERE status = 'pending') AS oldest_pending_scheduled_at,
|
||||
COALESCE(EXTRACT(EPOCH FROM (NOW() - MIN(started_at) FILTER (WHERE status = 'running')))::bigint, 0) AS oldest_running_age_seconds,
|
||||
MIN(started_at) FILTER (WHERE status = 'running') AS oldest_running_started_at,
|
||||
MAX(heartbeat_at) FILTER (WHERE status = 'running') AS last_heartbeat_at
|
||||
FROM ai_jobs
|
||||
WHERE status IN ('pending', 'running')
|
||||
GROUP BY owner_service, task_type, model_profile
|
||||
ORDER BY pending DESC, running DESC, owner_service, task_type, model_profile
|
||||
`)
|
||||
ORDER BY stale_running DESC, pending DESC, running DESC, owner_service, task_type, model_profile
|
||||
`, int(staleAfter.Seconds()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -786,20 +796,32 @@ ORDER BY pending DESC, running DESC, owner_service, task_type, model_profile
|
||||
for backlogRows.Next() {
|
||||
var stat model.BacklogStat
|
||||
var oldestPendingScheduledAt *time.Time
|
||||
var oldestRunningStartedAt *time.Time
|
||||
var lastHeartbeatAt *time.Time
|
||||
if err := backlogRows.Scan(
|
||||
&stat.OwnerService,
|
||||
&stat.TaskType,
|
||||
&stat.ModelProfile,
|
||||
&stat.Pending,
|
||||
&stat.Running,
|
||||
&stat.StaleRunning,
|
||||
&stat.OldestPendingAgeSeconds,
|
||||
&oldestPendingScheduledAt,
|
||||
&stat.OldestRunningAgeSeconds,
|
||||
&oldestRunningStartedAt,
|
||||
&lastHeartbeatAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if oldestPendingScheduledAt != nil {
|
||||
stat.OldestPendingScheduledAt = oldestPendingScheduledAt.UTC().Format(time.RFC3339)
|
||||
}
|
||||
if oldestRunningStartedAt != nil {
|
||||
stat.OldestRunningStartedAt = oldestRunningStartedAt.UTC().Format(time.RFC3339)
|
||||
}
|
||||
if lastHeartbeatAt != nil {
|
||||
stat.LastHeartbeatAt = lastHeartbeatAt.UTC().Format(time.RFC3339)
|
||||
}
|
||||
out.Backlog = append(out.Backlog, stat)
|
||||
}
|
||||
return out, backlogRows.Err()
|
||||
|
||||
Reference in New Issue
Block a user