From 837acf2f0098861392d0b9fe4527b8c93125a679 Mon Sep 17 00:00:00 2001
From: Grendgi
Date: Wed, 10 Jun 2026 16:36:35 +0300
Subject: [PATCH] Add AI queue backlog metrics

Add a BacklogStat model type and a Backlog field on Stats. The store
code that fills the stats result now also groups pending and running
ai_jobs rows by owner_service, task_type and model_profile, and reports
both counts together with the scheduled_at and age of the oldest pending
job in each group.

---
Review notes:
- oldest_pending_age_seconds is wrapped in GREATEST(..., 0) so that a
  pending job whose scheduled_at lies in the future cannot yield a
  negative age.
- Context-line whitespace was reconstructed; if it does not match the
  target tree, apply with `git apply --ignore-space-change`.
- The index lines still carry the blob ids of the original revision.

 internal/model/job.go   | 23 ++++++++++++++++-----
 internal/store/store.go | 44 ++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 61 insertions(+), 6 deletions(-)

diff --git a/internal/model/job.go b/internal/model/job.go
index 3ea80ba..3c1a7ee 100644
--- a/internal/model/job.go
+++ b/internal/model/job.go
@@ -129,6 +129,18 @@ type StageStat struct {
 	Retried24h int64 `json:"retried_24h"`
 }
 
+// BacklogStat reports queue depth for one owner_service/task_type/model_profile
+// group: pending and running job counts plus the age of the oldest pending job.
+type BacklogStat struct {
+	OwnerService             string `json:"owner_service"`
+	TaskType                 string `json:"task_type"`
+	ModelProfile             string `json:"model_profile"`
+	Pending                  int64  `json:"pending"`
+	Running                  int64  `json:"running"`
+	OldestPendingAgeSeconds  int64  `json:"oldest_pending_age_seconds"`
+	OldestPendingScheduledAt string `json:"oldest_pending_scheduled_at,omitempty"`
+}
+
 type OwnerStat struct {
 	OwnerService string `json:"owner_service"`
 	TaskType string `json:"task_type"`
@@ -138,9 +150,10 @@ type OwnerStat struct {
 }
 
 type Stats struct {
-	At     time.Time   `json:"at"`
-	Queues []QueueStat `json:"queues"`
-	Owners []OwnerStat `json:"owners,omitempty"`
-	Errors []ErrorStat `json:"errors,omitempty"`
-	Stages []StageStat `json:"stages,omitempty"`
+	At      time.Time     `json:"at"`
+	Queues  []QueueStat   `json:"queues"`
+	Owners  []OwnerStat   `json:"owners,omitempty"`
+	Errors  []ErrorStat   `json:"errors,omitempty"`
+	Stages  []StageStat   `json:"stages,omitempty"`
+	Backlog []BacklogStat `json:"backlog,omitempty"`
 }
diff --git a/internal/store/store.go b/internal/store/store.go
index a752041..200fc47 100644
--- a/internal/store/store.go
+++ b/internal/store/store.go
@@ -713,7 +713,49 @@ ORDER BY owner_service, task_type, model_profile
 		}
 		out.Stages = append(out.Stages, stat)
 	}
-	return out, stageRows.Err()
+	if err := stageRows.Err(); err != nil {
+		return nil, err
+	}
+
+	// Backlog per owner_service/task_type/model_profile. GREATEST clamps the age
+	// at zero in case the oldest pending job is scheduled in the future.
+	backlogRows, err := s.pool.Query(ctx, `
+SELECT owner_service,
+       task_type,
+       model_profile,
+       count(*) FILTER (WHERE status = 'pending') AS pending,
+       count(*) FILTER (WHERE status = 'running') AS running,
+       GREATEST(COALESCE(EXTRACT(EPOCH FROM (NOW() - MIN(scheduled_at) FILTER (WHERE status = 'pending')))::bigint, 0), 0) AS oldest_pending_age_seconds,
+       MIN(scheduled_at) FILTER (WHERE status = 'pending') AS oldest_pending_scheduled_at
+FROM ai_jobs
+WHERE status IN ('pending', 'running')
+GROUP BY owner_service, task_type, model_profile
+ORDER BY pending DESC, running DESC, owner_service, task_type, model_profile
+`)
+	if err != nil {
+		return nil, err
+	}
+	defer backlogRows.Close()
+	for backlogRows.Next() {
+		var stat model.BacklogStat
+		var oldestPendingScheduledAt *time.Time
+		if err := backlogRows.Scan(
+			&stat.OwnerService,
+			&stat.TaskType,
+			&stat.ModelProfile,
+			&stat.Pending,
+			&stat.Running,
+			&stat.OldestPendingAgeSeconds,
+			&oldestPendingScheduledAt,
+		); err != nil {
+			return nil, err
+		}
+		if oldestPendingScheduledAt != nil {
+			stat.OldestPendingScheduledAt = oldestPendingScheduledAt.UTC().Format(time.RFC3339)
+		}
+		out.Backlog = append(out.Backlog, stat)
+	}
+	return out, backlogRows.Err()
 }
 
 func scanJobSummary(row pgx.Row) (*model.JobSummary, error) {