diff --git a/internal/httpapi/dashboard.go b/internal/httpapi/dashboard.go index 88313db..79ee903 100644 --- a/internal/httpapi/dashboard.go +++ b/internal/httpapi/dashboard.go @@ -13,7 +13,7 @@ type dashboardResponse struct { Stats *model.Stats `json:"stats"` Providers providersStatusResponse `json:"providers"` Infra infraStatusResponse `json:"infra"` - Jobs []*model.Job `json:"jobs"` + Jobs []*model.JobSummary `json:"jobs"` } type dashboardSummary struct { @@ -35,7 +35,7 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, err.Error()) return } - jobs, err := s.store.ListJobs(ctx, model.JobFilter{ + jobs, err := s.store.ListJobSummaries(ctx, model.JobFilter{ Statuses: []string{model.StatusFailed, model.StatusRunning}, Limit: 40, }) diff --git a/internal/migrate/sql/005_ai_jobs_recent_status.up.sql b/internal/migrate/sql/005_ai_jobs_recent_status.up.sql new file mode 100644 index 0000000..53008b0 --- /dev/null +++ b/internal/migrate/sql/005_ai_jobs_recent_status.up.sql @@ -0,0 +1,2 @@ +CREATE INDEX IF NOT EXISTS ai_jobs_recent_status_idx + ON ai_jobs (status, updated_at DESC, created_at DESC); diff --git a/internal/model/job.go b/internal/model/job.go index eb6b96f..3cc9b32 100644 --- a/internal/model/job.go +++ b/internal/model/job.go @@ -39,6 +39,27 @@ type Job struct { IdempotencyKey *string `json:"idempotency_key,omitempty"` } +type JobSummary struct { + ID uuid.UUID `json:"id"` + OwnerService string `json:"owner_service"` + OwnerRef string `json:"owner_ref"` + TaskType string `json:"task_type"` + ModelProfile string `json:"model_profile"` + Priority int `json:"priority"` + Status string `json:"status"` + Attempts int `json:"attempts"` + MaxAttempts int `json:"max_attempts"` + ErrorCode *string `json:"error_code,omitempty"` + ErrorMessage *string `json:"error_message,omitempty"` + ScheduledAt time.Time `json:"scheduled_at"` + StartedAt *time.Time `json:"started_at,omitempty"` + CompletedAt *time.Time `json:"completed_at,omitempty"` + WorkerID *string `json:"worker_id,omitempty"` + HeartbeatAt *time.Time `json:"heartbeat_at,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + type CreateJob struct { OwnerService string `json:"owner_service"` OwnerRef string `json:"owner_ref"` diff --git a/internal/store/store.go b/internal/store/store.go index ec31e3b..879d538 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -174,6 +174,48 @@ LIMIT $7 OFFSET $8 return out, rows.Err() } +func (s *Store) ListJobSummaries(ctx context.Context, filter model.JobFilter) ([]*model.JobSummary, error) { + normalizeFilter(&filter) + const q = ` +SELECT id, owner_service, owner_ref, task_type, model_profile, priority, status, + attempts, max_attempts, error_code, error_message, + scheduled_at, started_at, completed_at, worker_id, heartbeat_at, + created_at, updated_at +FROM ai_jobs +WHERE ($1 = '' OR owner_service = $1) + AND ($2 = '' OR owner_ref = $2) + AND ($3 = '' OR task_type = $3) + AND ($4 = '' OR model_profile = $4) + AND (cardinality($5::text[]) = 0 OR status = ANY($5::text[])) + AND (cardinality($6::text[]) = 0 OR COALESCE(NULLIF(error_code, ''), 'unknown') = ANY($6::text[])) +ORDER BY updated_at DESC, created_at DESC +LIMIT $7 OFFSET $8 +` + rows, err := s.pool.Query(ctx, q, + filter.OwnerService, + filter.OwnerRef, + filter.TaskType, + filter.ModelProfile, + filter.Statuses, + filter.ErrorCodes, + filter.Limit, + filter.Offset, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var out []*model.JobSummary + for rows.Next() { + job, err := scanJobSummary(rows) + if err != nil { + return nil, err + } + out = append(out, job) + } + return out, rows.Err() +} + func normalizeFilter(filter *model.JobFilter) { filter.OwnerService = strings.TrimSpace(filter.OwnerService) filter.OwnerRef = strings.TrimSpace(filter.OwnerRef) @@ -519,6 +561,34 @@ ORDER BY last_24h DESC, total DESC return out, errorRows.Err() } +func scanJobSummary(row pgx.Row) (*model.JobSummary, error) { + var job model.JobSummary + err := row.Scan( + &job.ID, + &job.OwnerService, + &job.OwnerRef, + &job.TaskType, + &job.ModelProfile, + &job.Priority, + &job.Status, + &job.Attempts, + &job.MaxAttempts, + &job.ErrorCode, + &job.ErrorMessage, + &job.ScheduledAt, + &job.StartedAt, + &job.CompletedAt, + &job.WorkerID, + &job.HeartbeatAt, + &job.CreatedAt, + &job.UpdatedAt, + ) + if err != nil { + return nil, err + } + return &job, nil +} + func scanJob(row pgx.Row) (*model.Job, error) { var job model.Job var input []byte diff --git a/migrations/005_ai_jobs_recent_status.down.sql b/migrations/005_ai_jobs_recent_status.down.sql new file mode 100644 index 0000000..a08e6f7 --- /dev/null +++ b/migrations/005_ai_jobs_recent_status.down.sql @@ -0,0 +1 @@ +DROP INDEX IF EXISTS ai_jobs_recent_status_idx; diff --git a/migrations/005_ai_jobs_recent_status.up.sql b/migrations/005_ai_jobs_recent_status.up.sql new file mode 100644 index 0000000..53008b0 --- /dev/null +++ b/migrations/005_ai_jobs_recent_status.up.sql @@ -0,0 +1,2 @@ +CREATE INDEX IF NOT EXISTS ai_jobs_recent_status_idx + ON ai_jobs (status, updated_at DESC, created_at DESC);