Add AI job stage metrics
This commit is contained in:
@@ -119,6 +119,16 @@ type ErrorStat struct {
|
|||||||
Last24h int64 `json:"last_24h"`
|
Last24h int64 `json:"last_24h"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StageStat is one row of per-stage AI job metrics: an aggregate over
// ai_jobs rows with status 'done', non-null started_at/completed_at and a
// completed_at within the last 24 hours, grouped by owner service, task type
// and model profile (see the stage query that scans into this type).
type StageStat struct {
|
||||||
|
// OwnerService is the owner_service grouping key.
OwnerService string `json:"owner_service"`
|
||||||
|
// TaskType is the task_type grouping key.
TaskType string `json:"task_type"`
|
||||||
|
// ModelProfile is the model_profile grouping key.
ModelProfile string `json:"model_profile"`
|
||||||
|
// Done24h is the number of matching jobs in the group (count(*)).
Done24h int64 `json:"done_24h"`
|
||||||
|
// AvgDurationSeconds is the mean of completed_at - started_at in seconds,
// rounded to a whole number by the query; 0 when the average is NULL.
AvgDurationSeconds int64 `json:"avg_duration_seconds"`
|
||||||
|
// AvgAttempts is the mean of the attempts column rounded to the nearest
// integer by the query, so fractional averages (e.g. 1.4) are reported as
// whole numbers; 0 when the average is NULL.
AvgAttempts int64 `json:"avg_attempts"`
|
||||||
|
// Retried24h is the number of matching jobs in the group with attempts > 1.
Retried24h int64 `json:"retried_24h"`
|
||||||
|
}
|
||||||
|
|
||||||
type OwnerStat struct {
|
type OwnerStat struct {
|
||||||
OwnerService string `json:"owner_service"`
|
OwnerService string `json:"owner_service"`
|
||||||
TaskType string `json:"task_type"`
|
TaskType string `json:"task_type"`
|
||||||
@@ -132,4 +142,5 @@ type Stats struct {
|
|||||||
Queues []QueueStat `json:"queues"`
|
Queues []QueueStat `json:"queues"`
|
||||||
Owners []OwnerStat `json:"owners,omitempty"`
|
Owners []OwnerStat `json:"owners,omitempty"`
|
||||||
Errors []ErrorStat `json:"errors,omitempty"`
|
Errors []ErrorStat `json:"errors,omitempty"`
|
||||||
|
Stages []StageStat `json:"stages,omitempty"`
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -674,7 +674,46 @@ ORDER BY owner_service, last_24h DESC, total DESC
|
|||||||
}
|
}
|
||||||
out.Errors = append(out.Errors, stat)
|
out.Errors = append(out.Errors, stat)
|
||||||
}
|
}
|
||||||
return out, errorRows.Err()
|
if err := errorRows.Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
stageRows, err := s.pool.Query(ctx, `
|
||||||
|
SELECT owner_service,
|
||||||
|
task_type,
|
||||||
|
model_profile,
|
||||||
|
count(*) AS done_24h,
|
||||||
|
COALESCE(ROUND(AVG(EXTRACT(EPOCH FROM (completed_at - started_at))))::bigint, 0) AS avg_duration_seconds,
|
||||||
|
COALESCE(ROUND(AVG(attempts))::bigint, 0) AS avg_attempts,
|
||||||
|
count(*) FILTER (WHERE attempts > 1) AS retried_24h
|
||||||
|
FROM ai_jobs
|
||||||
|
WHERE status = 'done'
|
||||||
|
AND started_at IS NOT NULL
|
||||||
|
AND completed_at IS NOT NULL
|
||||||
|
AND completed_at > NOW() - INTERVAL '24 hours'
|
||||||
|
GROUP BY owner_service, task_type, model_profile
|
||||||
|
ORDER BY owner_service, task_type, model_profile
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer stageRows.Close()
|
||||||
|
for stageRows.Next() {
|
||||||
|
var stat model.StageStat
|
||||||
|
if err := stageRows.Scan(
|
||||||
|
&stat.OwnerService,
|
||||||
|
&stat.TaskType,
|
||||||
|
&stat.ModelProfile,
|
||||||
|
&stat.Done24h,
|
||||||
|
&stat.AvgDurationSeconds,
|
||||||
|
&stat.AvgAttempts,
|
||||||
|
&stat.Retried24h,
|
||||||
|
); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
out.Stages = append(out.Stages, stat)
|
||||||
|
}
|
||||||
|
return out, stageRows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func scanJobSummary(row pgx.Row) (*model.JobSummary, error) {
|
func scanJobSummary(row pgx.Row) (*model.JobSummary, error) {
|
||||||
|
|||||||
Reference in New Issue
Block a user