Compare commits
6 Commits
773f53f790
...
b81a8ee6be
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b81a8ee6be | ||
|
|
63553fba33 | ||
|
|
f32265400b | ||
|
|
aad905c2c8 | ||
|
|
22d85ce646 | ||
|
|
3c124c5f5a |
36
README.md
36
README.md
@@ -43,7 +43,8 @@ Input can be either explicit messages:
|
|||||||
```
|
```
|
||||||
|
|
||||||
or compact `system` / `user` fields. The completed job result contains
|
or compact `system` / `user` fields. The completed job result contains
|
||||||
`content`, `model`, `usage` and `duration_ms`.
|
`schema_version=ai.chat_result.v1`, `content`, `model`, `usage` and
|
||||||
|
`duration_ms`.
|
||||||
|
|
||||||
`call_analysis` and `transcript_summary` use the same input contract as
|
`call_analysis` and `transcript_summary` use the same input contract as
|
||||||
`llm_chat`; callers may include domain metadata fields in `input`, but the
|
`llm_chat`; callers may include domain metadata fields in `input`, but the
|
||||||
@@ -55,6 +56,9 @@ worker only reads chat fields such as `system`, `user`, `messages`,
|
|||||||
`/v1/audio/transcriptions` endpoint. The returned `segments` field stays
|
`/v1/audio/transcriptions` endpoint. The returned `segments` field stays
|
||||||
compatible with telephony. If the provider returns one long segment, AI Service
|
compatible with telephony. If the provider returns one long segment, AI Service
|
||||||
splits it into smaller transcript segments without inventing speaker labels.
|
splits it into smaller transcript segments without inventing speaker labels.
|
||||||
|
The completed job result contains
|
||||||
|
`schema_version=ai.transcription_result.v1`, `provider`, `model`, `language`,
|
||||||
|
`segments`, optional provider `attempts` and `duration_ms`.
|
||||||
|
|
||||||
AI-server compose snippet for Whisper Large v3 lives in
|
AI-server compose snippet for Whisper Large v3 lives in
|
||||||
`deploy/ai-server/docker-compose.audio.yml`:
|
`deploy/ai-server/docker-compose.audio.yml`:
|
||||||
@@ -85,6 +89,8 @@ scheduling.
|
|||||||
returning secrets.
|
returning secrets.
|
||||||
- `GET /api/v1/infra/status` returns AI-server sidecar telemetry
|
- `GET /api/v1/infra/status` returns AI-server sidecar telemetry
|
||||||
(GPU, containers and vLLM live metrics) when configured.
|
(GPU, containers and vLLM live metrics) when configured.
|
||||||
|
- `GET /health/detail` returns PostgreSQL, provider, queue, error, throughput
|
||||||
|
and infra components for Portal `admin/health`.
|
||||||
- `GET /healthz` returns process health.
|
- `GET /healthz` returns process health.
|
||||||
- `GET /readyz` checks PostgreSQL readiness.
|
- `GET /readyz` checks PostgreSQL readiness.
|
||||||
- Built-in workers expose open Kubernetes endpoints on `WORKER_HTTP_PORT`:
|
- Built-in workers expose open Kubernetes endpoints on `WORKER_HTTP_PORT`:
|
||||||
@@ -94,6 +100,34 @@ All `/api/v1/*` endpoints require `Authorization: Bearer <AI_SERVICE_TOKEN>`
|
|||||||
when `AI_SERVICE_TOKEN` is configured. Health and readiness endpoints stay open
|
when `AI_SERVICE_TOKEN` is configured. Health and readiness endpoints stay open
|
||||||
for Kubernetes probes.
|
for Kubernetes probes.
|
||||||
|
|
||||||
|
## Retry policy
|
||||||
|
|
||||||
|
Workers store a normalized `error_code` on failed jobs. AI Service requeues only
|
||||||
|
explicitly retryable categories while attempts remain.
|
||||||
|
|
||||||
|
| Category | Retry | Delay |
|
||||||
|
| --- | --- | --- |
|
||||||
|
| `provider_unavailable`, `model_unavailable`, `provider_error`, `dependency_error`, `timeout`, `storage_error`, `stale_worker` | yes | 30s |
|
||||||
|
| `bad_response`, `transcript_hallucination`, `transcript_incomplete`, `internal_error`, `unknown` | yes | 2m |
|
||||||
|
| `bad_audio`, `bad_input`, `context_length`, `unsupported_task`, `cancelled` | no | - |
|
||||||
|
|
||||||
|
Domain services may still expose manual retry for terminal errors after the
|
||||||
|
underlying data or prompt is corrected.
|
||||||
|
|
||||||
|
## Result schemas
|
||||||
|
|
||||||
|
AI Service result payloads are versioned with `schema_version`. Consumers should
|
||||||
|
ignore unknown fields and reject only unsupported major schema names.
|
||||||
|
|
||||||
|
Current schemas:
|
||||||
|
|
||||||
|
- `ai.chat_result.v1`: `{schema_version, content, model, usage?, duration_ms}`.
|
||||||
|
- `ai.transcription_result.v1`:
|
||||||
|
`{schema_version, provider?, model?, attempts?, language, segments, duration_ms}`.
|
||||||
|
|
||||||
|
New optional fields may be added to a `v1` schema without a breaking change.
|
||||||
|
Breaking shape changes require a new schema name.
|
||||||
|
|
||||||
## Configuration
|
## Configuration
|
||||||
|
|
||||||
- `HTTP_HOST`, default `0.0.0.0`
|
- `HTTP_HOST`, default `0.0.0.0`
|
||||||
|
|||||||
241
internal/httpapi/health.go
Normal file
241
internal/httpapi/health.go
Normal file
@@ -0,0 +1,241 @@
|
|||||||
|
package httpapi
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"ai-service/internal/model"
|
||||||
|
"ai-service/internal/transcription"
|
||||||
|
)
|
||||||
|
|
||||||
|
type healthDetailResponse struct {
|
||||||
|
Status string `json:"status"`
|
||||||
|
Generated time.Time `json:"generated_at"`
|
||||||
|
Components []healthComponent `json:"components"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type healthComponent struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
Data map[string]any `json:"data,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleHealthDetail(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx, cancel := contextWithTimeout(r, 12*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
resp := healthDetailResponse{
|
||||||
|
Status: "ok",
|
||||||
|
Generated: time.Now().UTC(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.store.Ping(ctx); err != nil {
|
||||||
|
resp.Components = append(resp.Components, healthComponent{
|
||||||
|
Name: "postgres",
|
||||||
|
Status: "down",
|
||||||
|
Error: err.Error(),
|
||||||
|
})
|
||||||
|
resp.Status = worseHealthStatus(resp.Status, "down")
|
||||||
|
writeJSON(w, http.StatusServiceUnavailable, resp)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resp.Components = append(resp.Components, healthComponent{Name: "postgres", Status: "ok"})
|
||||||
|
|
||||||
|
stats, err := s.store.Stats(ctx, s.cfg.WorkerLeaseTimeout)
|
||||||
|
if err != nil {
|
||||||
|
resp.Components = append(resp.Components, healthComponent{
|
||||||
|
Name: "queue",
|
||||||
|
Status: "down",
|
||||||
|
Error: err.Error(),
|
||||||
|
})
|
||||||
|
resp.Status = worseHealthStatus(resp.Status, "down")
|
||||||
|
writeJSON(w, http.StatusServiceUnavailable, resp)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, component := range []healthComponent{
|
||||||
|
s.healthProviders(ctx),
|
||||||
|
healthQueue(stats),
|
||||||
|
healthErrors(stats),
|
||||||
|
healthThroughput(stats),
|
||||||
|
healthInfra(loadInfraSnapshot(r, s.cfg)),
|
||||||
|
} {
|
||||||
|
resp.Components = append(resp.Components, component)
|
||||||
|
resp.Status = worseHealthStatus(resp.Status, component.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
statusCode := http.StatusOK
|
||||||
|
if resp.Status == "down" {
|
||||||
|
statusCode = http.StatusServiceUnavailable
|
||||||
|
}
|
||||||
|
writeJSON(w, statusCode, resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) healthProviders(ctx context.Context) healthComponent {
|
||||||
|
providers := []providerStatus{
|
||||||
|
s.checkLLM(ctx),
|
||||||
|
s.checkAudioLLM(ctx, transcription.ProviderWhisperLargeV3, s.cfg.AudioBaseURL, s.cfg.AudioAPIKey, s.cfg.AudioModel, s.cfg.AudioTimeout),
|
||||||
|
}
|
||||||
|
status := "ok"
|
||||||
|
messages := make([]string, 0)
|
||||||
|
for _, provider := range providers {
|
||||||
|
switch {
|
||||||
|
case !provider.Configured:
|
||||||
|
status = worseHealthStatus(status, "degraded")
|
||||||
|
messages = append(messages, provider.Name+" not configured")
|
||||||
|
case !provider.OK:
|
||||||
|
status = worseHealthStatus(status, "down")
|
||||||
|
if provider.Error != "" {
|
||||||
|
messages = append(messages, provider.Name+": "+provider.Error)
|
||||||
|
} else {
|
||||||
|
messages = append(messages, provider.Name+" unavailable")
|
||||||
|
}
|
||||||
|
case provider.Stale:
|
||||||
|
status = worseHealthStatus(status, "degraded")
|
||||||
|
if provider.Error != "" {
|
||||||
|
messages = append(messages, provider.Name+": "+provider.Error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return healthComponent{
|
||||||
|
Name: "providers",
|
||||||
|
Status: status,
|
||||||
|
Error: strings.Join(messages, "; "),
|
||||||
|
Data: map[string]any{
|
||||||
|
"providers": providers,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func healthQueue(stats *model.Stats) healthComponent {
|
||||||
|
var pending, running, staleRunning int64
|
||||||
|
var oldestPendingAgeSeconds, oldestRunningAgeSeconds int64
|
||||||
|
for _, row := range stats.Backlog {
|
||||||
|
pending += row.Pending
|
||||||
|
running += row.Running
|
||||||
|
staleRunning += row.StaleRunning
|
||||||
|
if row.OldestPendingAgeSeconds > oldestPendingAgeSeconds {
|
||||||
|
oldestPendingAgeSeconds = row.OldestPendingAgeSeconds
|
||||||
|
}
|
||||||
|
if row.OldestRunningAgeSeconds > oldestRunningAgeSeconds {
|
||||||
|
oldestRunningAgeSeconds = row.OldestRunningAgeSeconds
|
||||||
|
}
|
||||||
|
}
|
||||||
|
status := "ok"
|
||||||
|
message := ""
|
||||||
|
if staleRunning > 0 {
|
||||||
|
status = "degraded"
|
||||||
|
message = "there are stale running jobs"
|
||||||
|
}
|
||||||
|
return healthComponent{
|
||||||
|
Name: "queue",
|
||||||
|
Status: status,
|
||||||
|
Error: message,
|
||||||
|
Data: map[string]any{
|
||||||
|
"pending": pending,
|
||||||
|
"running": running,
|
||||||
|
"stale_running": staleRunning,
|
||||||
|
"oldest_pending_age_seconds": oldestPendingAgeSeconds,
|
||||||
|
"oldest_running_age_seconds": oldestRunningAgeSeconds,
|
||||||
|
"backlog": stats.Backlog,
|
||||||
|
"queue_status_totals": stats.Queues,
|
||||||
|
"owner_status_totals": stats.Owners,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func healthErrors(stats *model.Stats) healthComponent {
|
||||||
|
var failedTotal, failed24h int64
|
||||||
|
for _, row := range stats.Errors {
|
||||||
|
failedTotal += row.Total
|
||||||
|
failed24h += row.Last24h
|
||||||
|
}
|
||||||
|
status := "ok"
|
||||||
|
message := ""
|
||||||
|
if failed24h > 0 {
|
||||||
|
status = "degraded"
|
||||||
|
message = "there are failed jobs in the last 24 hours"
|
||||||
|
}
|
||||||
|
return healthComponent{
|
||||||
|
Name: "errors",
|
||||||
|
Status: status,
|
||||||
|
Error: message,
|
||||||
|
Data: map[string]any{
|
||||||
|
"failed_total": failedTotal,
|
||||||
|
"failed_24h": failed24h,
|
||||||
|
"by_code": stats.Errors,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func healthThroughput(stats *model.Stats) healthComponent {
|
||||||
|
var done24h, retried24h int64
|
||||||
|
for _, row := range stats.Stages {
|
||||||
|
done24h += row.Done24h
|
||||||
|
retried24h += row.Retried24h
|
||||||
|
}
|
||||||
|
|
||||||
|
pendingByStage := make(map[string]int64)
|
||||||
|
for _, row := range stats.Backlog {
|
||||||
|
pendingByStage[row.TaskType+"|"+row.ModelProfile] += row.Pending + row.Running
|
||||||
|
}
|
||||||
|
doneByStage := make(map[string]int64)
|
||||||
|
for _, row := range stats.Stages {
|
||||||
|
doneByStage[row.TaskType+"|"+row.ModelProfile] += row.Done24h
|
||||||
|
}
|
||||||
|
|
||||||
|
stuckStages := make([]string, 0)
|
||||||
|
for key, total := range pendingByStage {
|
||||||
|
if total > 0 && doneByStage[key] == 0 {
|
||||||
|
stuckStages = append(stuckStages, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
status := "ok"
|
||||||
|
message := ""
|
||||||
|
if len(stuckStages) > 0 {
|
||||||
|
status = "degraded"
|
||||||
|
message = "some active queues have no completed jobs in the last 24 hours"
|
||||||
|
}
|
||||||
|
return healthComponent{
|
||||||
|
Name: "throughput",
|
||||||
|
Status: status,
|
||||||
|
Error: message,
|
||||||
|
Data: map[string]any{
|
||||||
|
"done_24h": done24h,
|
||||||
|
"retried_24h": retried24h,
|
||||||
|
"stuck_stages": stuckStages,
|
||||||
|
"stages": stats.Stages,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func healthInfra(infra infraStatusResponse) healthComponent {
|
||||||
|
status := "ok"
|
||||||
|
message := ""
|
||||||
|
if infra.SidecarError != "" {
|
||||||
|
status = "degraded"
|
||||||
|
message = infra.SidecarError
|
||||||
|
}
|
||||||
|
return healthComponent{
|
||||||
|
Name: "infra",
|
||||||
|
Status: status,
|
||||||
|
Error: message,
|
||||||
|
Data: map[string]any{
|
||||||
|
"sidecar": infra.Sidecar,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func worseHealthStatus(current, next string) string {
|
||||||
|
if current == "down" || next == "down" {
|
||||||
|
return "down"
|
||||||
|
}
|
||||||
|
if current == "degraded" || next == "degraded" {
|
||||||
|
return "degraded"
|
||||||
|
}
|
||||||
|
return "ok"
|
||||||
|
}
|
||||||
@@ -41,6 +41,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||||
case r.Method == http.MethodGet && path == "/readyz":
|
case r.Method == http.MethodGet && path == "/readyz":
|
||||||
s.handleReady(w, r)
|
s.handleReady(w, r)
|
||||||
|
case r.Method == http.MethodGet && path == "/health/detail":
|
||||||
|
s.handleHealthDetail(w, r)
|
||||||
case r.Method == http.MethodGet && path == "/":
|
case r.Method == http.MethodGet && path == "/":
|
||||||
writeJSON(w, http.StatusOK, map[string]string{"service": "ai-service"})
|
writeJSON(w, http.StatusOK, map[string]string{"service": "ai-service"})
|
||||||
case r.Method == http.MethodPost && path == "/api/v1/jobs":
|
case r.Method == http.MethodPost && path == "/api/v1/jobs":
|
||||||
@@ -59,6 +61,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
s.handleGetJob(w, r, path)
|
s.handleGetJob(w, r, path)
|
||||||
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/retry"):
|
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/retry"):
|
||||||
s.handleRetryJob(w, r, path)
|
s.handleRetryJob(w, r, path)
|
||||||
|
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/cancel"):
|
||||||
|
s.handleCancelJob(w, r, path)
|
||||||
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/complete"):
|
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/complete"):
|
||||||
s.handleCompleteJob(w, r, path)
|
s.handleCompleteJob(w, r, path)
|
||||||
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/fail"):
|
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/fail"):
|
||||||
@@ -265,7 +269,7 @@ func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request, path strin
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path string) {
|
func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path string) {
|
||||||
id, err := jobIDFromPath(path, true)
|
id, err := jobIDFromActionPath(path, "retry")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeError(w, http.StatusBadRequest, err.Error())
|
writeError(w, http.StatusBadRequest, err.Error())
|
||||||
return
|
return
|
||||||
@@ -284,6 +288,26 @@ func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path str
|
|||||||
writeJSON(w, http.StatusOK, job)
|
writeJSON(w, http.StatusOK, job)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleCancelJob(w http.ResponseWriter, r *http.Request, path string) {
|
||||||
|
id, err := jobIDFromActionPath(path, "cancel")
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusBadRequest, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ctx, cancel := contextWithTimeout(r, 8*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
job, err := s.store.CancelJob(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if job == nil {
|
||||||
|
writeError(w, http.StatusNotFound, "cancellable job not found")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, job)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) handleCompleteJob(w http.ResponseWriter, r *http.Request, path string) {
|
func (s *Server) handleCompleteJob(w http.ResponseWriter, r *http.Request, path string) {
|
||||||
id, err := jobIDFromActionPath(path, "complete")
|
id, err := jobIDFromActionPath(path, "complete")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -38,7 +38,10 @@ type Usage struct {
|
|||||||
TotalTokens int `json:"total_tokens"`
|
TotalTokens int `json:"total_tokens"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const ChatResultSchemaVersion = "ai.chat_result.v1"
|
||||||
|
|
||||||
type ChatResult struct {
|
type ChatResult struct {
|
||||||
|
SchemaVersion string `json:"schema_version"`
|
||||||
Content string `json:"content"`
|
Content string `json:"content"`
|
||||||
Model string `json:"model"`
|
Model string `json:"model"`
|
||||||
Usage *Usage `json:"usage,omitempty"`
|
Usage *Usage `json:"usage,omitempty"`
|
||||||
@@ -137,6 +140,7 @@ func (c *Client) Chat(ctx context.Context, in ChatInput) (*ChatResult, error) {
|
|||||||
modelName = c.model
|
modelName = c.model
|
||||||
}
|
}
|
||||||
return &ChatResult{
|
return &ChatResult{
|
||||||
|
SchemaVersion: ChatResultSchemaVersion,
|
||||||
Content: out.Choices[0].Message.Content,
|
Content: out.Choices[0].Message.Content,
|
||||||
Model: modelName,
|
Model: modelName,
|
||||||
Usage: out.Usage,
|
Usage: out.Usage,
|
||||||
|
|||||||
43
internal/llm/client_test.go
Normal file
43
internal/llm/client_test.go
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
package llm
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestChatResultIncludesSchemaVersion(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.URL.Path != "/v1/chat/completions" {
|
||||||
|
t.Fatalf("path = %q, want /v1/chat/completions", r.URL.Path)
|
||||||
|
}
|
||||||
|
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||||
|
"model": "qwen2.5-14b",
|
||||||
|
"choices": []map[string]any{
|
||||||
|
{"message": map[string]string{"role": "assistant", "content": `{"ok":true}`}},
|
||||||
|
},
|
||||||
|
"usage": map[string]int{
|
||||||
|
"prompt_tokens": 10,
|
||||||
|
"completion_tokens": 2,
|
||||||
|
"total_tokens": 12,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := New(server.URL, "", "fallback-model", 0)
|
||||||
|
got, err := client.Chat(t.Context(), ChatInput{User: "test", MaxTokens: 32})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Chat: %v", err)
|
||||||
|
}
|
||||||
|
if got.SchemaVersion != ChatResultSchemaVersion {
|
||||||
|
t.Fatalf("schema_version = %q, want %q", got.SchemaVersion, ChatResultSchemaVersion)
|
||||||
|
}
|
||||||
|
if got.Content != `{"ok":true}` {
|
||||||
|
t.Fatalf("content = %q", got.Content)
|
||||||
|
}
|
||||||
|
if got.Usage == nil || got.Usage.TotalTokens != 12 {
|
||||||
|
t.Fatalf("usage = %#v", got.Usage)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -12,9 +12,9 @@ type failRetryPolicy struct {
|
|||||||
|
|
||||||
func retryPolicyForError(errorCode string) failRetryPolicy {
|
func retryPolicyForError(errorCode string) failRetryPolicy {
|
||||||
switch strings.TrimSpace(errorCode) {
|
switch strings.TrimSpace(errorCode) {
|
||||||
case "provider_unavailable", "model_unavailable", "timeout", "storage_error", "stale_worker":
|
case "provider_unavailable", "model_unavailable", "provider_error", "dependency_error", "timeout", "storage_error", "stale_worker":
|
||||||
return failRetryPolicy{Retryable: true, Delay: 30 * time.Second}
|
return failRetryPolicy{Retryable: true, Delay: 30 * time.Second}
|
||||||
case "bad_response", "unknown":
|
case "bad_response", "transcript_hallucination", "transcript_incomplete", "internal_error", "unknown":
|
||||||
return failRetryPolicy{Retryable: true, Delay: 2 * time.Minute}
|
return failRetryPolicy{Retryable: true, Delay: 2 * time.Minute}
|
||||||
default:
|
default:
|
||||||
return failRetryPolicy{}
|
return failRetryPolicy{}
|
||||||
|
|||||||
@@ -14,13 +14,21 @@ func TestRetryPolicyForError(t *testing.T) {
|
|||||||
}{
|
}{
|
||||||
{name: "provider unavailable", code: "provider_unavailable", retryable: true, delay: 30 * time.Second},
|
{name: "provider unavailable", code: "provider_unavailable", retryable: true, delay: 30 * time.Second},
|
||||||
{name: "model unavailable", code: "model_unavailable", retryable: true, delay: 30 * time.Second},
|
{name: "model unavailable", code: "model_unavailable", retryable: true, delay: 30 * time.Second},
|
||||||
|
{name: "provider error", code: "provider_error", retryable: true, delay: 30 * time.Second},
|
||||||
|
{name: "dependency error", code: "dependency_error", retryable: true, delay: 30 * time.Second},
|
||||||
{name: "timeout", code: "timeout", retryable: true, delay: 30 * time.Second},
|
{name: "timeout", code: "timeout", retryable: true, delay: 30 * time.Second},
|
||||||
{name: "storage", code: "storage_error", retryable: true, delay: 30 * time.Second},
|
{name: "storage", code: "storage_error", retryable: true, delay: 30 * time.Second},
|
||||||
|
{name: "stale worker", code: "stale_worker", retryable: true, delay: 30 * time.Second},
|
||||||
{name: "bad response", code: "bad_response", retryable: true, delay: 2 * time.Minute},
|
{name: "bad response", code: "bad_response", retryable: true, delay: 2 * time.Minute},
|
||||||
|
{name: "transcript hallucination", code: "transcript_hallucination", retryable: true, delay: 2 * time.Minute},
|
||||||
|
{name: "transcript incomplete", code: "transcript_incomplete", retryable: true, delay: 2 * time.Minute},
|
||||||
|
{name: "internal error", code: "internal_error", retryable: true, delay: 2 * time.Minute},
|
||||||
{name: "unknown", code: "unknown", retryable: true, delay: 2 * time.Minute},
|
{name: "unknown", code: "unknown", retryable: true, delay: 2 * time.Minute},
|
||||||
{name: "bad audio", code: "bad_audio"},
|
{name: "bad audio", code: "bad_audio"},
|
||||||
{name: "bad input", code: "bad_input"},
|
{name: "bad input", code: "bad_input"},
|
||||||
{name: "context length", code: "context_length"},
|
{name: "context length", code: "context_length"},
|
||||||
|
{name: "unsupported task", code: "unsupported_task"},
|
||||||
|
{name: "cancelled", code: "cancelled"},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
|
|||||||
@@ -451,6 +451,28 @@ WHERE j.id = picked.id
|
|||||||
return int(tag.RowsAffected()), nil
|
return int(tag.RowsAffected()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Store) CancelJob(ctx context.Context, id uuid.UUID) (*model.Job, error) {
|
||||||
|
const q = `
|
||||||
|
UPDATE ai_jobs
|
||||||
|
SET status = 'cancelled',
|
||||||
|
completed_at = NOW(),
|
||||||
|
worker_id = NULL,
|
||||||
|
heartbeat_at = NULL,
|
||||||
|
updated_at = NOW()
|
||||||
|
WHERE id = $1
|
||||||
|
AND status IN ('pending', 'running')
|
||||||
|
RETURNING id, owner_service, owner_ref, task_type, model_profile, priority, status,
|
||||||
|
attempts, max_attempts, input, result, error_code, error_message,
|
||||||
|
scheduled_at, started_at, completed_at, worker_id, heartbeat_at,
|
||||||
|
created_at, updated_at, idempotency_key
|
||||||
|
`
|
||||||
|
job, err := scanJob(s.pool.QueryRow(ctx, q, id))
|
||||||
|
if errors.Is(err, pgx.ErrNoRows) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return job, err
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Store) CancelJobs(ctx context.Context, filter model.JobFilter) (int, error) {
|
func (s *Store) CancelJobs(ctx context.Context, filter model.JobFilter) (int, error) {
|
||||||
normalizeFilter(&filter)
|
normalizeFilter(&filter)
|
||||||
const q = `
|
const q = `
|
||||||
|
|||||||
@@ -60,7 +60,10 @@ type Segment struct {
|
|||||||
Speaker string `json:"speaker,omitempty"`
|
Speaker string `json:"speaker,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const ResultSchemaVersion = "ai.transcription_result.v1"
|
||||||
|
|
||||||
type Result struct {
|
type Result struct {
|
||||||
|
SchemaVersion string `json:"schema_version"`
|
||||||
Provider string `json:"provider,omitempty"`
|
Provider string `json:"provider,omitempty"`
|
||||||
Model string `json:"model,omitempty"`
|
Model string `json:"model,omitempty"`
|
||||||
Attempts []Attempt `json:"attempts,omitempty"`
|
Attempts []Attempt `json:"attempts,omitempty"`
|
||||||
@@ -199,6 +202,7 @@ func (c *Client) transcribeWithProvider(ctx context.Context, provider ProviderCo
|
|||||||
attempt.Text = text
|
attempt.Text = text
|
||||||
attempt.Segments = segments
|
attempt.Segments = segments
|
||||||
return &Result{
|
return &Result{
|
||||||
|
SchemaVersion: ResultSchemaVersion,
|
||||||
Provider: provider.Name,
|
Provider: provider.Name,
|
||||||
Model: resp.Model,
|
Model: resp.Model,
|
||||||
Language: firstNonEmpty(resp.Language, in.Language, "unknown"),
|
Language: firstNonEmpty(resp.Language, in.Language, "unknown"),
|
||||||
|
|||||||
@@ -84,6 +84,9 @@ func TestWhisperUsesAudioTranscriptionsEndpoint(t *testing.T) {
|
|||||||
if len(got.Segments) != 2 || got.Segments[0].Text != "Алло, тест." || got.Segments[1].Start != 1.2 {
|
if len(got.Segments) != 2 || got.Segments[0].Text != "Алло, тест." || got.Segments[1].Start != 1.2 {
|
||||||
t.Fatalf("segments = %#v", got.Segments)
|
t.Fatalf("segments = %#v", got.Segments)
|
||||||
}
|
}
|
||||||
|
if got.SchemaVersion != ResultSchemaVersion {
|
||||||
|
t.Fatalf("schema_version = %q, want %q", got.SchemaVersion, ResultSchemaVersion)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWhisperFallsBackToJSONWhenVerboseJSONUnsupported(t *testing.T) {
|
func TestWhisperFallsBackToJSONWhenVerboseJSONUnsupported(t *testing.T) {
|
||||||
|
|||||||
Reference in New Issue
Block a user