Compare commits

...

6 Commits

Author SHA1 Message Date
Grendgi
b81a8ee6be feat: cancel ai jobs by id
Some checks failed
CI / hygiene (push) Successful in 1s
Build and Deploy / build-and-deploy (push) Successful in 42s
CI / test (push) Failing after 21s
2026-06-17 17:39:29 +03:00
Grendgi
63553fba33 feat: version ai result schemas 2026-06-17 16:46:03 +03:00
Grendgi
f32265400b feat: expand ai retry policy 2026-06-17 16:39:58 +03:00
Grendgi
aad905c2c8 fix: expose ai health component errors 2026-06-17 16:35:47 +03:00
Grendgi
22d85ce646 fix: align ai health status contract 2026-06-17 16:34:44 +03:00
Grendgi
3c124c5f5a feat: expose ai service health detail 2026-06-17 16:31:22 +03:00
10 changed files with 408 additions and 25 deletions

View File

@@ -43,7 +43,8 @@ Input can be either explicit messages:
``` ```
or compact `system` / `user` fields. The completed job result contains or compact `system` / `user` fields. The completed job result contains
`content`, `model`, `usage` and `duration_ms`. `schema_version=ai.chat_result.v1`, `content`, `model`, `usage` and
`duration_ms`.
`call_analysis` and `transcript_summary` use the same input contract as `call_analysis` and `transcript_summary` use the same input contract as
`llm_chat`; callers may include domain metadata fields in `input`, but the `llm_chat`; callers may include domain metadata fields in `input`, but the
@@ -55,6 +56,9 @@ worker only reads chat fields such as `system`, `user`, `messages`,
`/v1/audio/transcriptions` endpoint. The returned `segments` field stays `/v1/audio/transcriptions` endpoint. The returned `segments` field stays
compatible with telephony. If the provider returns one long segment, AI Service compatible with telephony. If the provider returns one long segment, AI Service
splits it into smaller transcript segments without inventing speaker labels. splits it into smaller transcript segments without inventing speaker labels.
The completed job result contains
`schema_version=ai.transcription_result.v1`, `provider`, `model`, `language`,
`segments`, optional provider `attempts` and `duration_ms`.
AI-server compose snippet for Whisper Large v3 lives in AI-server compose snippet for Whisper Large v3 lives in
`deploy/ai-server/docker-compose.audio.yml`: `deploy/ai-server/docker-compose.audio.yml`:
@@ -85,6 +89,8 @@ scheduling.
returning secrets. returning secrets.
- `GET /api/v1/infra/status` returns AI-server sidecar telemetry - `GET /api/v1/infra/status` returns AI-server sidecar telemetry
(GPU, containers and vLLM live metrics) when configured. (GPU, containers and vLLM live metrics) when configured.
- `GET /health/detail` returns PostgreSQL, provider, queue, error, throughput
and infra components for Portal `admin/health`.
- `GET /healthz` returns process health. - `GET /healthz` returns process health.
- `GET /readyz` checks PostgreSQL readiness. - `GET /readyz` checks PostgreSQL readiness.
- Built-in workers expose open Kubernetes endpoints on `WORKER_HTTP_PORT`: - Built-in workers expose open Kubernetes endpoints on `WORKER_HTTP_PORT`:
@@ -94,6 +100,34 @@ All `/api/v1/*` endpoints require `Authorization: Bearer <AI_SERVICE_TOKEN>`
when `AI_SERVICE_TOKEN` is configured. Health and readiness endpoints stay open when `AI_SERVICE_TOKEN` is configured. Health and readiness endpoints stay open
for Kubernetes probes. for Kubernetes probes.
## Retry policy
Workers store a normalized `error_code` on failed jobs. AI Service requeues only
explicitly retryable categories while attempts remain.
| Category | Retry | Delay |
| --- | --- | --- |
| `provider_unavailable`, `model_unavailable`, `provider_error`, `dependency_error`, `timeout`, `storage_error`, `stale_worker` | yes | 30s |
| `bad_response`, `transcript_hallucination`, `transcript_incomplete`, `internal_error`, `unknown` | yes | 2m |
| `bad_audio`, `bad_input`, `context_length`, `unsupported_task`, `cancelled` | no | - |
Domain services may still expose manual retry for terminal errors after the
underlying data or prompt is corrected.
## Result schemas
AI Service result payloads are versioned with `schema_version`. Consumers should
ignore unknown fields and reject only unsupported major schema names.
Current schemas:
- `ai.chat_result.v1`: `{schema_version, content, model, usage?, duration_ms}`.
- `ai.transcription_result.v1`:
`{schema_version, provider?, model?, attempts?, language, segments, duration_ms}`.
New optional fields may be added to a `v1` schema without a breaking change.
Breaking shape changes require a new schema name.
## Configuration ## Configuration
- `HTTP_HOST`, default `0.0.0.0` - `HTTP_HOST`, default `0.0.0.0`

241
internal/httpapi/health.go Normal file
View File

@@ -0,0 +1,241 @@
package httpapi
import (
"context"
"net/http"
"strings"
"time"
"ai-service/internal/model"
"ai-service/internal/transcription"
)
type healthDetailResponse struct {
Status string `json:"status"`
Generated time.Time `json:"generated_at"`
Components []healthComponent `json:"components"`
}
type healthComponent struct {
Name string `json:"name"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
Data map[string]any `json:"data,omitempty"`
}
func (s *Server) handleHealthDetail(w http.ResponseWriter, r *http.Request) {
ctx, cancel := contextWithTimeout(r, 12*time.Second)
defer cancel()
resp := healthDetailResponse{
Status: "ok",
Generated: time.Now().UTC(),
}
if err := s.store.Ping(ctx); err != nil {
resp.Components = append(resp.Components, healthComponent{
Name: "postgres",
Status: "down",
Error: err.Error(),
})
resp.Status = worseHealthStatus(resp.Status, "down")
writeJSON(w, http.StatusServiceUnavailable, resp)
return
}
resp.Components = append(resp.Components, healthComponent{Name: "postgres", Status: "ok"})
stats, err := s.store.Stats(ctx, s.cfg.WorkerLeaseTimeout)
if err != nil {
resp.Components = append(resp.Components, healthComponent{
Name: "queue",
Status: "down",
Error: err.Error(),
})
resp.Status = worseHealthStatus(resp.Status, "down")
writeJSON(w, http.StatusServiceUnavailable, resp)
return
}
for _, component := range []healthComponent{
s.healthProviders(ctx),
healthQueue(stats),
healthErrors(stats),
healthThroughput(stats),
healthInfra(loadInfraSnapshot(r, s.cfg)),
} {
resp.Components = append(resp.Components, component)
resp.Status = worseHealthStatus(resp.Status, component.Status)
}
statusCode := http.StatusOK
if resp.Status == "down" {
statusCode = http.StatusServiceUnavailable
}
writeJSON(w, statusCode, resp)
}
func (s *Server) healthProviders(ctx context.Context) healthComponent {
providers := []providerStatus{
s.checkLLM(ctx),
s.checkAudioLLM(ctx, transcription.ProviderWhisperLargeV3, s.cfg.AudioBaseURL, s.cfg.AudioAPIKey, s.cfg.AudioModel, s.cfg.AudioTimeout),
}
status := "ok"
messages := make([]string, 0)
for _, provider := range providers {
switch {
case !provider.Configured:
status = worseHealthStatus(status, "degraded")
messages = append(messages, provider.Name+" not configured")
case !provider.OK:
status = worseHealthStatus(status, "down")
if provider.Error != "" {
messages = append(messages, provider.Name+": "+provider.Error)
} else {
messages = append(messages, provider.Name+" unavailable")
}
case provider.Stale:
status = worseHealthStatus(status, "degraded")
if provider.Error != "" {
messages = append(messages, provider.Name+": "+provider.Error)
}
}
}
return healthComponent{
Name: "providers",
Status: status,
Error: strings.Join(messages, "; "),
Data: map[string]any{
"providers": providers,
},
}
}
func healthQueue(stats *model.Stats) healthComponent {
var pending, running, staleRunning int64
var oldestPendingAgeSeconds, oldestRunningAgeSeconds int64
for _, row := range stats.Backlog {
pending += row.Pending
running += row.Running
staleRunning += row.StaleRunning
if row.OldestPendingAgeSeconds > oldestPendingAgeSeconds {
oldestPendingAgeSeconds = row.OldestPendingAgeSeconds
}
if row.OldestRunningAgeSeconds > oldestRunningAgeSeconds {
oldestRunningAgeSeconds = row.OldestRunningAgeSeconds
}
}
status := "ok"
message := ""
if staleRunning > 0 {
status = "degraded"
message = "there are stale running jobs"
}
return healthComponent{
Name: "queue",
Status: status,
Error: message,
Data: map[string]any{
"pending": pending,
"running": running,
"stale_running": staleRunning,
"oldest_pending_age_seconds": oldestPendingAgeSeconds,
"oldest_running_age_seconds": oldestRunningAgeSeconds,
"backlog": stats.Backlog,
"queue_status_totals": stats.Queues,
"owner_status_totals": stats.Owners,
},
}
}
func healthErrors(stats *model.Stats) healthComponent {
var failedTotal, failed24h int64
for _, row := range stats.Errors {
failedTotal += row.Total
failed24h += row.Last24h
}
status := "ok"
message := ""
if failed24h > 0 {
status = "degraded"
message = "there are failed jobs in the last 24 hours"
}
return healthComponent{
Name: "errors",
Status: status,
Error: message,
Data: map[string]any{
"failed_total": failedTotal,
"failed_24h": failed24h,
"by_code": stats.Errors,
},
}
}
func healthThroughput(stats *model.Stats) healthComponent {
var done24h, retried24h int64
for _, row := range stats.Stages {
done24h += row.Done24h
retried24h += row.Retried24h
}
pendingByStage := make(map[string]int64)
for _, row := range stats.Backlog {
pendingByStage[row.TaskType+"|"+row.ModelProfile] += row.Pending + row.Running
}
doneByStage := make(map[string]int64)
for _, row := range stats.Stages {
doneByStage[row.TaskType+"|"+row.ModelProfile] += row.Done24h
}
stuckStages := make([]string, 0)
for key, total := range pendingByStage {
if total > 0 && doneByStage[key] == 0 {
stuckStages = append(stuckStages, key)
}
}
status := "ok"
message := ""
if len(stuckStages) > 0 {
status = "degraded"
message = "some active queues have no completed jobs in the last 24 hours"
}
return healthComponent{
Name: "throughput",
Status: status,
Error: message,
Data: map[string]any{
"done_24h": done24h,
"retried_24h": retried24h,
"stuck_stages": stuckStages,
"stages": stats.Stages,
},
}
}
func healthInfra(infra infraStatusResponse) healthComponent {
status := "ok"
message := ""
if infra.SidecarError != "" {
status = "degraded"
message = infra.SidecarError
}
return healthComponent{
Name: "infra",
Status: status,
Error: message,
Data: map[string]any{
"sidecar": infra.Sidecar,
},
}
}
func worseHealthStatus(current, next string) string {
if current == "down" || next == "down" {
return "down"
}
if current == "degraded" || next == "degraded" {
return "degraded"
}
return "ok"
}

View File

@@ -41,6 +41,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
case r.Method == http.MethodGet && path == "/readyz": case r.Method == http.MethodGet && path == "/readyz":
s.handleReady(w, r) s.handleReady(w, r)
case r.Method == http.MethodGet && path == "/health/detail":
s.handleHealthDetail(w, r)
case r.Method == http.MethodGet && path == "/": case r.Method == http.MethodGet && path == "/":
writeJSON(w, http.StatusOK, map[string]string{"service": "ai-service"}) writeJSON(w, http.StatusOK, map[string]string{"service": "ai-service"})
case r.Method == http.MethodPost && path == "/api/v1/jobs": case r.Method == http.MethodPost && path == "/api/v1/jobs":
@@ -59,6 +61,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.handleGetJob(w, r, path) s.handleGetJob(w, r, path)
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/retry"): case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/retry"):
s.handleRetryJob(w, r, path) s.handleRetryJob(w, r, path)
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/cancel"):
s.handleCancelJob(w, r, path)
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/complete"): case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/complete"):
s.handleCompleteJob(w, r, path) s.handleCompleteJob(w, r, path)
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/fail"): case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/fail"):
@@ -265,7 +269,7 @@ func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request, path strin
} }
func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path string) { func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path string) {
id, err := jobIDFromPath(path, true) id, err := jobIDFromActionPath(path, "retry")
if err != nil { if err != nil {
writeError(w, http.StatusBadRequest, err.Error()) writeError(w, http.StatusBadRequest, err.Error())
return return
@@ -284,6 +288,26 @@ func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path str
writeJSON(w, http.StatusOK, job) writeJSON(w, http.StatusOK, job)
} }
func (s *Server) handleCancelJob(w http.ResponseWriter, r *http.Request, path string) {
id, err := jobIDFromActionPath(path, "cancel")
if err != nil {
writeError(w, http.StatusBadRequest, err.Error())
return
}
ctx, cancel := contextWithTimeout(r, 8*time.Second)
defer cancel()
job, err := s.store.CancelJob(ctx, id)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
if job == nil {
writeError(w, http.StatusNotFound, "cancellable job not found")
return
}
writeJSON(w, http.StatusOK, job)
}
func (s *Server) handleCompleteJob(w http.ResponseWriter, r *http.Request, path string) { func (s *Server) handleCompleteJob(w http.ResponseWriter, r *http.Request, path string) {
id, err := jobIDFromActionPath(path, "complete") id, err := jobIDFromActionPath(path, "complete")
if err != nil { if err != nil {

View File

@@ -38,11 +38,14 @@ type Usage struct {
TotalTokens int `json:"total_tokens"` TotalTokens int `json:"total_tokens"`
} }
const ChatResultSchemaVersion = "ai.chat_result.v1"
type ChatResult struct { type ChatResult struct {
Content string `json:"content"` SchemaVersion string `json:"schema_version"`
Model string `json:"model"` Content string `json:"content"`
Usage *Usage `json:"usage,omitempty"` Model string `json:"model"`
DurationMS int64 `json:"duration_ms"` Usage *Usage `json:"usage,omitempty"`
DurationMS int64 `json:"duration_ms"`
} }
type chatRequest struct { type chatRequest struct {
@@ -137,10 +140,11 @@ func (c *Client) Chat(ctx context.Context, in ChatInput) (*ChatResult, error) {
modelName = c.model modelName = c.model
} }
return &ChatResult{ return &ChatResult{
Content: out.Choices[0].Message.Content, SchemaVersion: ChatResultSchemaVersion,
Model: modelName, Content: out.Choices[0].Message.Content,
Usage: out.Usage, Model: modelName,
DurationMS: duration.Milliseconds(), Usage: out.Usage,
DurationMS: duration.Milliseconds(),
}, nil }, nil
} }

View File

@@ -0,0 +1,43 @@
package llm
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
)
func TestChatResultIncludesSchemaVersion(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/v1/chat/completions" {
t.Fatalf("path = %q, want /v1/chat/completions", r.URL.Path)
}
_ = json.NewEncoder(w).Encode(map[string]any{
"model": "qwen2.5-14b",
"choices": []map[string]any{
{"message": map[string]string{"role": "assistant", "content": `{"ok":true}`}},
},
"usage": map[string]int{
"prompt_tokens": 10,
"completion_tokens": 2,
"total_tokens": 12,
},
})
}))
defer server.Close()
client := New(server.URL, "", "fallback-model", 0)
got, err := client.Chat(t.Context(), ChatInput{User: "test", MaxTokens: 32})
if err != nil {
t.Fatalf("Chat: %v", err)
}
if got.SchemaVersion != ChatResultSchemaVersion {
t.Fatalf("schema_version = %q, want %q", got.SchemaVersion, ChatResultSchemaVersion)
}
if got.Content != `{"ok":true}` {
t.Fatalf("content = %q", got.Content)
}
if got.Usage == nil || got.Usage.TotalTokens != 12 {
t.Fatalf("usage = %#v", got.Usage)
}
}

View File

@@ -12,9 +12,9 @@ type failRetryPolicy struct {
func retryPolicyForError(errorCode string) failRetryPolicy { func retryPolicyForError(errorCode string) failRetryPolicy {
switch strings.TrimSpace(errorCode) { switch strings.TrimSpace(errorCode) {
case "provider_unavailable", "model_unavailable", "timeout", "storage_error", "stale_worker": case "provider_unavailable", "model_unavailable", "provider_error", "dependency_error", "timeout", "storage_error", "stale_worker":
return failRetryPolicy{Retryable: true, Delay: 30 * time.Second} return failRetryPolicy{Retryable: true, Delay: 30 * time.Second}
case "bad_response", "unknown": case "bad_response", "transcript_hallucination", "transcript_incomplete", "internal_error", "unknown":
return failRetryPolicy{Retryable: true, Delay: 2 * time.Minute} return failRetryPolicy{Retryable: true, Delay: 2 * time.Minute}
default: default:
return failRetryPolicy{} return failRetryPolicy{}

View File

@@ -14,13 +14,21 @@ func TestRetryPolicyForError(t *testing.T) {
}{ }{
{name: "provider unavailable", code: "provider_unavailable", retryable: true, delay: 30 * time.Second}, {name: "provider unavailable", code: "provider_unavailable", retryable: true, delay: 30 * time.Second},
{name: "model unavailable", code: "model_unavailable", retryable: true, delay: 30 * time.Second}, {name: "model unavailable", code: "model_unavailable", retryable: true, delay: 30 * time.Second},
{name: "provider error", code: "provider_error", retryable: true, delay: 30 * time.Second},
{name: "dependency error", code: "dependency_error", retryable: true, delay: 30 * time.Second},
{name: "timeout", code: "timeout", retryable: true, delay: 30 * time.Second}, {name: "timeout", code: "timeout", retryable: true, delay: 30 * time.Second},
{name: "storage", code: "storage_error", retryable: true, delay: 30 * time.Second}, {name: "storage", code: "storage_error", retryable: true, delay: 30 * time.Second},
{name: "stale worker", code: "stale_worker", retryable: true, delay: 30 * time.Second},
{name: "bad response", code: "bad_response", retryable: true, delay: 2 * time.Minute}, {name: "bad response", code: "bad_response", retryable: true, delay: 2 * time.Minute},
{name: "transcript hallucination", code: "transcript_hallucination", retryable: true, delay: 2 * time.Minute},
{name: "transcript incomplete", code: "transcript_incomplete", retryable: true, delay: 2 * time.Minute},
{name: "internal error", code: "internal_error", retryable: true, delay: 2 * time.Minute},
{name: "unknown", code: "unknown", retryable: true, delay: 2 * time.Minute}, {name: "unknown", code: "unknown", retryable: true, delay: 2 * time.Minute},
{name: "bad audio", code: "bad_audio"}, {name: "bad audio", code: "bad_audio"},
{name: "bad input", code: "bad_input"}, {name: "bad input", code: "bad_input"},
{name: "context length", code: "context_length"}, {name: "context length", code: "context_length"},
{name: "unsupported task", code: "unsupported_task"},
{name: "cancelled", code: "cancelled"},
} }
for _, tt := range tests { for _, tt := range tests {

View File

@@ -451,6 +451,28 @@ WHERE j.id = picked.id
return int(tag.RowsAffected()), nil return int(tag.RowsAffected()), nil
} }
func (s *Store) CancelJob(ctx context.Context, id uuid.UUID) (*model.Job, error) {
const q = `
UPDATE ai_jobs
SET status = 'cancelled',
completed_at = NOW(),
worker_id = NULL,
heartbeat_at = NULL,
updated_at = NOW()
WHERE id = $1
AND status IN ('pending', 'running')
RETURNING id, owner_service, owner_ref, task_type, model_profile, priority, status,
attempts, max_attempts, input, result, error_code, error_message,
scheduled_at, started_at, completed_at, worker_id, heartbeat_at,
created_at, updated_at, idempotency_key
`
job, err := scanJob(s.pool.QueryRow(ctx, q, id))
if errors.Is(err, pgx.ErrNoRows) {
return nil, nil
}
return job, err
}
func (s *Store) CancelJobs(ctx context.Context, filter model.JobFilter) (int, error) { func (s *Store) CancelJobs(ctx context.Context, filter model.JobFilter) (int, error) {
normalizeFilter(&filter) normalizeFilter(&filter)
const q = ` const q = `

View File

@@ -60,15 +60,18 @@ type Segment struct {
Speaker string `json:"speaker,omitempty"` Speaker string `json:"speaker,omitempty"`
} }
const ResultSchemaVersion = "ai.transcription_result.v1"
type Result struct { type Result struct {
Provider string `json:"provider,omitempty"` SchemaVersion string `json:"schema_version"`
Model string `json:"model,omitempty"` Provider string `json:"provider,omitempty"`
Attempts []Attempt `json:"attempts,omitempty"` Model string `json:"model,omitempty"`
Language string `json:"language"` Attempts []Attempt `json:"attempts,omitempty"`
Segments []Segment `json:"segments"` Language string `json:"language"`
DiarizeError *string `json:"diarize_error,omitempty"` Segments []Segment `json:"segments"`
AlignError *string `json:"align_error,omitempty"` DiarizeError *string `json:"diarize_error,omitempty"`
DurationMS int64 `json:"duration_ms"` AlignError *string `json:"align_error,omitempty"`
DurationMS int64 `json:"duration_ms"`
} }
type Attempt struct { type Attempt struct {
@@ -199,11 +202,12 @@ func (c *Client) transcribeWithProvider(ctx context.Context, provider ProviderCo
attempt.Text = text attempt.Text = text
attempt.Segments = segments attempt.Segments = segments
return &Result{ return &Result{
Provider: provider.Name, SchemaVersion: ResultSchemaVersion,
Model: resp.Model, Provider: provider.Name,
Language: firstNonEmpty(resp.Language, in.Language, "unknown"), Model: resp.Model,
Segments: segments, Language: firstNonEmpty(resp.Language, in.Language, "unknown"),
DurationMS: duration.Milliseconds(), Segments: segments,
DurationMS: duration.Milliseconds(),
}, attempt, nil }, attempt, nil
} }

View File

@@ -84,6 +84,9 @@ func TestWhisperUsesAudioTranscriptionsEndpoint(t *testing.T) {
if len(got.Segments) != 2 || got.Segments[0].Text != "Алло, тест." || got.Segments[1].Start != 1.2 { if len(got.Segments) != 2 || got.Segments[0].Text != "Алло, тест." || got.Segments[1].Start != 1.2 {
t.Fatalf("segments = %#v", got.Segments) t.Fatalf("segments = %#v", got.Segments)
} }
if got.SchemaVersion != ResultSchemaVersion {
t.Fatalf("schema_version = %q, want %q", got.SchemaVersion, ResultSchemaVersion)
}
} }
func TestWhisperFallsBackToJSONWhenVerboseJSONUnsupported(t *testing.T) { func TestWhisperFallsBackToJSONWhenVerboseJSONUnsupported(t *testing.T) {