diff --git a/cmd/server/main.go b/cmd/server/main.go index 2894950..47b78a2 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -264,6 +264,7 @@ func (a *app) handleHealthDetail(w http.ResponseWriter, r *http.Request) { a.probePostgres(ctx), a.probeAIService(ctx), a.probeClassificationQueue(ctx), + a.probeAIJobs(ctx), a.probePoller(ctx), a.probeMediaStorage(ctx), a.probeMediaMetadata(ctx), @@ -333,6 +334,64 @@ func (a *app) probeClassificationQueue(ctx context.Context) componentProbe { return componentProbe{Name: "classification_queue", Status: "ok", LatencyMs: time.Since(start).Milliseconds()} } +func (a *app) probeAIJobs(ctx context.Context) componentProbe { + start := time.Now() + if !a.cfg.LLMEnabled { + return componentProbe{Name: "ai_jobs", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "llm disabled"} + } + stats, err := a.ai.Stats(ctx) + if err != nil { + return componentProbe{Name: "ai_jobs", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()} + } + var pending, running, staleRunning, failed, failed24h int64 + for _, row := range stats.Backlog { + if row.OwnerService != "monitoring-tg" || row.TaskType != "telegram_classification" { + continue + } + pending += row.Pending + running += row.Running + staleRunning += row.StaleRunning + } + for _, row := range stats.Owners { + if row.OwnerService != "monitoring-tg" || row.TaskType != "telegram_classification" { + continue + } + if row.Status == "failed" { + failed += row.Total + } + } + for _, row := range stats.Errors { + if row.OwnerService != "monitoring-tg" || row.TaskType != "telegram_classification" { + continue + } + failed24h += row.Last24h + } + if staleRunning > 0 || failed24h > 0 { + return componentProbe{ + Name: "ai_jobs", + Status: "down", + LatencyMs: time.Since(start).Milliseconds(), + Error: "pending=" + strconv.FormatInt(pending, 10) + + " running=" + strconv.FormatInt(running, 10) + + " stale_running=" + strconv.FormatInt(staleRunning, 10) + + " failed=" + strconv.FormatInt(failed, 10) + + " failed_24h=" + strconv.FormatInt(failed24h, 10), + } + } + if pending > 0 || running > 0 || failed > 0 { + return componentProbe{ + Name: "ai_jobs", + Status: "degraded", + LatencyMs: time.Since(start).Milliseconds(), + Error: "pending=" + strconv.FormatInt(pending, 10) + + " running=" + strconv.FormatInt(running, 10) + + " stale_running=" + strconv.FormatInt(staleRunning, 10) + + " failed=" + strconv.FormatInt(failed, 10), + } + } + return componentProbe{Name: "ai_jobs", Status: "ok", LatencyMs: time.Since(start).Milliseconds()} +} + func (a *app) probePoller(ctx context.Context) componentProbe { start := time.Now() staleAfter := maxInt(a.cfg.PollIntervalSeconds*3, 900) diff --git a/internal/aiservice/client.go b/internal/aiservice/client.go index c9f6291..06b2736 100644 --- a/internal/aiservice/client.go +++ b/internal/aiservice/client.go @@ -83,6 +83,41 @@ type ProviderStatus struct { Error string `json:"error,omitempty"` } +type Stats struct { + At time.Time `json:"at"` + Owners []OwnerStat `json:"owners,omitempty"` + Errors []ErrorStat `json:"errors,omitempty"` + Backlog []BacklogStat `json:"backlog,omitempty"` +} + +type OwnerStat struct { + OwnerService string `json:"owner_service"` + TaskType string `json:"task_type"` + ModelProfile string `json:"model_profile"` + Status string `json:"status"` + Total int64 `json:"total"` +} + +type ErrorStat struct { + OwnerService string `json:"owner_service,omitempty"` + TaskType string `json:"task_type"` + ModelProfile string `json:"model_profile"` + ErrorCode string `json:"error_code"` + Total int64 `json:"total"` + Last24h int64 `json:"last_24h"` +} + +type BacklogStat struct { + OwnerService string `json:"owner_service"` + TaskType string `json:"task_type"` + ModelProfile string `json:"model_profile"` + Pending int64 `json:"pending"` + Running int64 `json:"running"` + StaleRunning int64 `json:"stale_running"` + OldestPendingAgeSeconds int64 `json:"oldest_pending_age_seconds"` + OldestRunningAgeSeconds int64 `json:"oldest_running_age_seconds"` +} + func New(baseURL, token string, timeout time.Duration) *Client { baseURL = strings.TrimRight(strings.TrimSpace(baseURL), "/") if baseURL == "" { @@ -223,6 +258,29 @@ func (c *Client) ProvidersStatus(ctx context.Context) (*ProvidersStatus, error) return &out, nil } +func (c *Client) Stats(ctx context.Context) (*Stats, error) { + if c == nil { + return nil, fmt.Errorf("ai-service not configured") + } + req, err := c.request(ctx, http.MethodGet, "/api/v1/stats", nil) + if err != nil { + return nil, err + } + resp, err := c.http.Do(req) + if err != nil { + return nil, fmt.Errorf("ai stats: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, fmt.Errorf("ai stats: http %d: %s", resp.StatusCode, readSmall(resp.Body)) + } + var out Stats + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("decode ai stats: %w", err) + } + return &out, nil +} + func (c *Client) request(ctx context.Context, method, path string, body []byte) (*http.Request, error) { var r io.Reader if body != nil {