feat: expose monitoring tg ai job health
This commit is contained in:
@@ -264,6 +264,7 @@ func (a *app) handleHealthDetail(w http.ResponseWriter, r *http.Request) {
|
|||||||
a.probePostgres(ctx),
|
a.probePostgres(ctx),
|
||||||
a.probeAIService(ctx),
|
a.probeAIService(ctx),
|
||||||
a.probeClassificationQueue(ctx),
|
a.probeClassificationQueue(ctx),
|
||||||
|
a.probeAIJobs(ctx),
|
||||||
a.probePoller(ctx),
|
a.probePoller(ctx),
|
||||||
a.probeMediaStorage(ctx),
|
a.probeMediaStorage(ctx),
|
||||||
a.probeMediaMetadata(ctx),
|
a.probeMediaMetadata(ctx),
|
||||||
@@ -333,6 +334,64 @@ func (a *app) probeClassificationQueue(ctx context.Context) componentProbe {
|
|||||||
return componentProbe{Name: "classification_queue", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
return componentProbe{Name: "classification_queue", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *app) probeAIJobs(ctx context.Context) componentProbe {
|
||||||
|
start := time.Now()
|
||||||
|
if !a.cfg.LLMEnabled {
|
||||||
|
return componentProbe{Name: "ai_jobs", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "llm disabled"}
|
||||||
|
}
|
||||||
|
stats, err := a.ai.Stats(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return componentProbe{Name: "ai_jobs", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||||
|
}
|
||||||
|
var pending, running, staleRunning, failed, failed24h int64
|
||||||
|
for _, row := range stats.Backlog {
|
||||||
|
if row.OwnerService != "monitoring-tg" || row.TaskType != "telegram_classification" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
pending += row.Pending
|
||||||
|
running += row.Running
|
||||||
|
staleRunning += row.StaleRunning
|
||||||
|
}
|
||||||
|
for _, row := range stats.Owners {
|
||||||
|
if row.OwnerService != "monitoring-tg" || row.TaskType != "telegram_classification" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if row.Status == "failed" {
|
||||||
|
failed += row.Total
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, row := range stats.Errors {
|
||||||
|
if row.OwnerService != "monitoring-tg" || row.TaskType != "telegram_classification" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
failed24h += row.Last24h
|
||||||
|
}
|
||||||
|
if staleRunning > 0 || failed24h > 0 {
|
||||||
|
return componentProbe{
|
||||||
|
Name: "ai_jobs",
|
||||||
|
Status: "down",
|
||||||
|
LatencyMs: time.Since(start).Milliseconds(),
|
||||||
|
Error: "pending=" + strconv.FormatInt(pending, 10) +
|
||||||
|
" running=" + strconv.FormatInt(running, 10) +
|
||||||
|
" stale_running=" + strconv.FormatInt(staleRunning, 10) +
|
||||||
|
" failed=" + strconv.FormatInt(failed, 10) +
|
||||||
|
" failed_24h=" + strconv.FormatInt(failed24h, 10),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if pending > 0 || running > 0 || failed > 0 {
|
||||||
|
return componentProbe{
|
||||||
|
Name: "ai_jobs",
|
||||||
|
Status: "degraded",
|
||||||
|
LatencyMs: time.Since(start).Milliseconds(),
|
||||||
|
Error: "pending=" + strconv.FormatInt(pending, 10) +
|
||||||
|
" running=" + strconv.FormatInt(running, 10) +
|
||||||
|
" stale_running=" + strconv.FormatInt(staleRunning, 10) +
|
||||||
|
" failed=" + strconv.FormatInt(failed, 10),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return componentProbe{Name: "ai_jobs", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||||
|
}
|
||||||
|
|
||||||
func (a *app) probePoller(ctx context.Context) componentProbe {
|
func (a *app) probePoller(ctx context.Context) componentProbe {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
staleAfter := maxInt(a.cfg.PollIntervalSeconds*3, 900)
|
staleAfter := maxInt(a.cfg.PollIntervalSeconds*3, 900)
|
||||||
|
|||||||
@@ -83,6 +83,41 @@ type ProviderStatus struct {
|
|||||||
Error string `json:"error,omitempty"`
|
Error string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Stats struct {
|
||||||
|
At time.Time `json:"at"`
|
||||||
|
Owners []OwnerStat `json:"owners,omitempty"`
|
||||||
|
Errors []ErrorStat `json:"errors,omitempty"`
|
||||||
|
Backlog []BacklogStat `json:"backlog,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type OwnerStat struct {
|
||||||
|
OwnerService string `json:"owner_service"`
|
||||||
|
TaskType string `json:"task_type"`
|
||||||
|
ModelProfile string `json:"model_profile"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
Total int64 `json:"total"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ErrorStat struct {
|
||||||
|
OwnerService string `json:"owner_service,omitempty"`
|
||||||
|
TaskType string `json:"task_type"`
|
||||||
|
ModelProfile string `json:"model_profile"`
|
||||||
|
ErrorCode string `json:"error_code"`
|
||||||
|
Total int64 `json:"total"`
|
||||||
|
Last24h int64 `json:"last_24h"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type BacklogStat struct {
|
||||||
|
OwnerService string `json:"owner_service"`
|
||||||
|
TaskType string `json:"task_type"`
|
||||||
|
ModelProfile string `json:"model_profile"`
|
||||||
|
Pending int64 `json:"pending"`
|
||||||
|
Running int64 `json:"running"`
|
||||||
|
StaleRunning int64 `json:"stale_running"`
|
||||||
|
OldestPendingAgeSeconds int64 `json:"oldest_pending_age_seconds"`
|
||||||
|
OldestRunningAgeSeconds int64 `json:"oldest_running_age_seconds"`
|
||||||
|
}
|
||||||
|
|
||||||
func New(baseURL, token string, timeout time.Duration) *Client {
|
func New(baseURL, token string, timeout time.Duration) *Client {
|
||||||
baseURL = strings.TrimRight(strings.TrimSpace(baseURL), "/")
|
baseURL = strings.TrimRight(strings.TrimSpace(baseURL), "/")
|
||||||
if baseURL == "" {
|
if baseURL == "" {
|
||||||
@@ -223,6 +258,29 @@ func (c *Client) ProvidersStatus(ctx context.Context) (*ProvidersStatus, error)
|
|||||||
return &out, nil
|
return &out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) Stats(ctx context.Context) (*Stats, error) {
|
||||||
|
if c == nil {
|
||||||
|
return nil, fmt.Errorf("ai-service not configured")
|
||||||
|
}
|
||||||
|
req, err := c.request(ctx, http.MethodGet, "/api/v1/stats", nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
resp, err := c.http.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("ai stats: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||||
|
return nil, fmt.Errorf("ai stats: http %d: %s", resp.StatusCode, readSmall(resp.Body))
|
||||||
|
}
|
||||||
|
var out Stats
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||||||
|
return nil, fmt.Errorf("decode ai stats: %w", err)
|
||||||
|
}
|
||||||
|
return &out, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) request(ctx context.Context, method, path string, body []byte) (*http.Request, error) {
|
func (c *Client) request(ctx context.Context, method, path string, body []byte) (*http.Request, error) {
|
||||||
var r io.Reader
|
var r io.Reader
|
||||||
if body != nil {
|
if body != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user