Compare commits
10 Commits
c618ffaff9
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8d06cfabb1 | ||
|
|
b81a8ee6be | ||
|
|
63553fba33 | ||
|
|
f32265400b | ||
|
|
aad905c2c8 | ||
|
|
22d85ce646 | ||
|
|
3c124c5f5a | ||
|
|
773f53f790 | ||
|
|
abc64214d2 | ||
|
|
e45884c5e5 |
35
.gitea/scripts/hygiene-check.sh
Normal file
35
.gitea/scripts/hygiene-check.sh
Normal file
@@ -0,0 +1,35 @@
|
||||
#!/usr/bin/env bash
# Repository hygiene gate.
#
# Walks every path reported by `git ls-files` and emits an `::error` annotation
# for each tracked file that is:
#   * a local-only file (.DS_Store, .env),
#   * located inside a node_modules directory,
#   * a temporary or archive artifact (by extension),
#   * larger than 50 MiB.
# Exits with status 1 when at least one violation was reported, 0 otherwise.
set -euo pipefail

fail=0

# NUL-delimited read so paths containing spaces or newlines are handled intact.
while IFS= read -r -d '' path; do
  # `--` keeps a path that starts with "-" from being parsed as an option.
  base="$(basename -- "$path")"
  case "$base" in
    .DS_Store|.env)
      echo "::error file=$path::tracked local-only file is forbidden"
      fail=1
      ;;
  esac

  case "$path" in
    # Match node_modules only as a whole path component (top level or nested),
    # so directories such as "my_node_modules/" are not flagged by accident.
    node_modules/*|*/node_modules/*)
      echo "::error file=$path::tracked node_modules content is forbidden"
      fail=1
      ;;
    *.tmp|*.temp|*.bak|*.orig|*.rej|*.zip|*.tar|*.tar.gz|*.tgz|*.rar|*.7z)
      echo "::error file=$path::tracked temporary/archive artifact is forbidden"
      fail=1
      ;;
  esac

  # Size check only applies to paths that exist as regular files in the checkout.
  if [ -f "$path" ]; then
    # Some wc implementations pad the count with spaces; strip them before comparing.
    size="$(wc -c < "$path" | tr -d ' ')"
    # 52428800 bytes = 50 MiB.
    if [ "${size:-0}" -gt 52428800 ]; then
      echo "::error file=$path::tracked file is larger than 50 MiB"
      fail=1
    fi
  fi
done < <(git ls-files -z)

exit "$fail"
|
||||
@@ -5,8 +5,15 @@ on:
|
||||
pull_request:
|
||||
|
||||
jobs:
|
||||
hygiene:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- run: bash .gitea/scripts/hygiene-check.sh
|
||||
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
needs: hygiene
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-go@v5
|
||||
|
||||
36
README.md
36
README.md
@@ -43,7 +43,8 @@ Input can be either explicit messages:
|
||||
```
|
||||
|
||||
or compact `system` / `user` fields. The completed job result contains
|
||||
`content`, `model`, `usage` and `duration_ms`.
|
||||
`schema_version=ai.chat_result.v1`, `content`, `model`, `usage` and
|
||||
`duration_ms`.
|
||||
|
||||
`call_analysis` and `transcript_summary` use the same input contract as
|
||||
`llm_chat`; callers may include domain metadata fields in `input`, but the
|
||||
@@ -55,6 +56,9 @@ worker only reads chat fields such as `system`, `user`, `messages`,
|
||||
`/v1/audio/transcriptions` endpoint. The returned `segments` field stays
|
||||
compatible with telephony. If the provider returns one long segment, AI Service
|
||||
splits it into smaller transcript segments without inventing speaker labels.
|
||||
The completed job result contains
|
||||
`schema_version=ai.transcription_result.v1`, `provider`, `model`, `language`,
|
||||
`segments`, optional provider `attempts` and `duration_ms`.
|
||||
|
||||
AI-server compose snippet for Whisper Large v3 lives in
|
||||
`deploy/ai-server/docker-compose.audio.yml`:
|
||||
@@ -85,6 +89,8 @@ scheduling.
|
||||
returning secrets.
|
||||
- `GET /api/v1/infra/status` returns AI-server sidecar telemetry
|
||||
(GPU, containers and vLLM live metrics) when configured.
|
||||
- `GET /health/detail` returns PostgreSQL, provider, queue, error, throughput
|
||||
and infra components for Portal `admin/health`.
|
||||
- `GET /healthz` returns process health.
|
||||
- `GET /readyz` checks PostgreSQL readiness.
|
||||
- Built-in workers expose open Kubernetes endpoints on `WORKER_HTTP_PORT`:
|
||||
@@ -94,6 +100,34 @@ All `/api/v1/*` endpoints require `Authorization: Bearer <AI_SERVICE_TOKEN>`
|
||||
when `AI_SERVICE_TOKEN` is configured. Health and readiness endpoints stay open
|
||||
for Kubernetes probes.
|
||||
|
||||
## Retry policy
|
||||
|
||||
Workers store a normalized `error_code` on failed jobs. AI Service requeues only
|
||||
explicitly retryable categories while attempts remain.
|
||||
|
||||
| Category | Retry | Delay |
|
||||
| --- | --- | --- |
|
||||
| `provider_unavailable`, `model_unavailable`, `provider_error`, `dependency_error`, `timeout`, `storage_error`, `stale_worker` | yes | 30s |
|
||||
| `bad_response`, `transcript_hallucination`, `transcript_incomplete`, `internal_error`, `unknown` | yes | 2m |
|
||||
| `bad_audio`, `bad_input`, `context_length`, `unsupported_task`, `cancelled` | no | - |
|
||||
|
||||
Domain services may still expose manual retry for terminal errors after the
|
||||
underlying data or prompt is corrected.
|
||||
|
||||
## Result schemas
|
||||
|
||||
AI Service result payloads are versioned with `schema_version`. Consumers should
|
||||
ignore unknown fields and reject only unsupported major schema names.
|
||||
|
||||
Current schemas:
|
||||
|
||||
- `ai.chat_result.v1`: `{schema_version, content, model, usage?, duration_ms}`.
|
||||
- `ai.transcription_result.v1`:
|
||||
`{schema_version, provider?, model?, attempts?, language, segments, duration_ms}`.
|
||||
|
||||
New optional fields may be added to a `v1` schema without a breaking change.
|
||||
Breaking shape changes require a new schema name.
|
||||
|
||||
## Configuration
|
||||
|
||||
- `HTTP_HOST`, default `0.0.0.0`
|
||||
|
||||
@@ -125,13 +125,6 @@ func envCSV(key string) []string {
|
||||
return out
|
||||
}
|
||||
|
||||
func envCSVDefault(key string, fallback []string) []string {
|
||||
if values := envCSV(key); len(values) > 0 {
|
||||
return values
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
|
||||
// defaultAudioPrompt returns the built-in audio prompt, which is empty.
func defaultAudioPrompt() string {
	var prompt string
	return prompt
}
|
||||
|
||||
@@ -31,7 +31,7 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, cancel := contextWithTimeout(r, 12*time.Second)
|
||||
defer cancel()
|
||||
|
||||
stats, err := s.store.Stats(ctx)
|
||||
stats, err := s.store.Stats(ctx, s.cfg.WorkerLeaseTimeout)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
|
||||
241
internal/httpapi/health.go
Normal file
241
internal/httpapi/health.go
Normal file
@@ -0,0 +1,241 @@
|
||||
package httpapi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"ai-service/internal/model"
|
||||
"ai-service/internal/transcription"
|
||||
)
|
||||
|
||||
type (
	// healthDetailResponse is the JSON body written by handleHealthDetail.
	healthDetailResponse struct {
		// Status is the aggregated status across all components.
		Status string `json:"status"`
		// Generated is the time the report was built.
		Generated time.Time `json:"generated_at"`
		// Components lists the individual checks in the order they were added.
		Components []healthComponent `json:"components"`
	}

	// healthComponent is the outcome of one health check.
	healthComponent struct {
		// Name identifies the check (for example "postgres" or "queue").
		Name string `json:"name"`
		// Status is "ok", "degraded" or "down".
		Status string `json:"status"`
		// Error carries a human-readable explanation; omitted from JSON when empty.
		Error string `json:"error,omitempty"`
		// Data holds check-specific details; omitted from JSON when empty.
		Data map[string]any `json:"data,omitempty"`
	}
)
|
||||
|
||||
func (s *Server) handleHealthDetail(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, cancel := contextWithTimeout(r, 12*time.Second)
|
||||
defer cancel()
|
||||
|
||||
resp := healthDetailResponse{
|
||||
Status: "ok",
|
||||
Generated: time.Now().UTC(),
|
||||
}
|
||||
|
||||
if err := s.store.Ping(ctx); err != nil {
|
||||
resp.Components = append(resp.Components, healthComponent{
|
||||
Name: "postgres",
|
||||
Status: "down",
|
||||
Error: err.Error(),
|
||||
})
|
||||
resp.Status = worseHealthStatus(resp.Status, "down")
|
||||
writeJSON(w, http.StatusServiceUnavailable, resp)
|
||||
return
|
||||
}
|
||||
resp.Components = append(resp.Components, healthComponent{Name: "postgres", Status: "ok"})
|
||||
|
||||
stats, err := s.store.Stats(ctx, s.cfg.WorkerLeaseTimeout)
|
||||
if err != nil {
|
||||
resp.Components = append(resp.Components, healthComponent{
|
||||
Name: "queue",
|
||||
Status: "down",
|
||||
Error: err.Error(),
|
||||
})
|
||||
resp.Status = worseHealthStatus(resp.Status, "down")
|
||||
writeJSON(w, http.StatusServiceUnavailable, resp)
|
||||
return
|
||||
}
|
||||
|
||||
for _, component := range []healthComponent{
|
||||
s.healthProviders(ctx),
|
||||
healthQueue(stats),
|
||||
healthErrors(stats),
|
||||
healthThroughput(stats),
|
||||
healthInfra(loadInfraSnapshot(r, s.cfg)),
|
||||
} {
|
||||
resp.Components = append(resp.Components, component)
|
||||
resp.Status = worseHealthStatus(resp.Status, component.Status)
|
||||
}
|
||||
|
||||
statusCode := http.StatusOK
|
||||
if resp.Status == "down" {
|
||||
statusCode = http.StatusServiceUnavailable
|
||||
}
|
||||
writeJSON(w, statusCode, resp)
|
||||
}
|
||||
|
||||
func (s *Server) healthProviders(ctx context.Context) healthComponent {
|
||||
providers := []providerStatus{
|
||||
s.checkLLM(ctx),
|
||||
s.checkAudioLLM(ctx, transcription.ProviderWhisperLargeV3, s.cfg.AudioBaseURL, s.cfg.AudioAPIKey, s.cfg.AudioModel, s.cfg.AudioTimeout),
|
||||
}
|
||||
status := "ok"
|
||||
messages := make([]string, 0)
|
||||
for _, provider := range providers {
|
||||
switch {
|
||||
case !provider.Configured:
|
||||
status = worseHealthStatus(status, "degraded")
|
||||
messages = append(messages, provider.Name+" not configured")
|
||||
case !provider.OK:
|
||||
status = worseHealthStatus(status, "down")
|
||||
if provider.Error != "" {
|
||||
messages = append(messages, provider.Name+": "+provider.Error)
|
||||
} else {
|
||||
messages = append(messages, provider.Name+" unavailable")
|
||||
}
|
||||
case provider.Stale:
|
||||
status = worseHealthStatus(status, "degraded")
|
||||
if provider.Error != "" {
|
||||
messages = append(messages, provider.Name+": "+provider.Error)
|
||||
}
|
||||
}
|
||||
}
|
||||
return healthComponent{
|
||||
Name: "providers",
|
||||
Status: status,
|
||||
Error: strings.Join(messages, "; "),
|
||||
Data: map[string]any{
|
||||
"providers": providers,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func healthQueue(stats *model.Stats) healthComponent {
|
||||
var pending, running, staleRunning int64
|
||||
var oldestPendingAgeSeconds, oldestRunningAgeSeconds int64
|
||||
for _, row := range stats.Backlog {
|
||||
pending += row.Pending
|
||||
running += row.Running
|
||||
staleRunning += row.StaleRunning
|
||||
if row.OldestPendingAgeSeconds > oldestPendingAgeSeconds {
|
||||
oldestPendingAgeSeconds = row.OldestPendingAgeSeconds
|
||||
}
|
||||
if row.OldestRunningAgeSeconds > oldestRunningAgeSeconds {
|
||||
oldestRunningAgeSeconds = row.OldestRunningAgeSeconds
|
||||
}
|
||||
}
|
||||
status := "ok"
|
||||
message := ""
|
||||
if staleRunning > 0 {
|
||||
status = "degraded"
|
||||
message = "there are stale running jobs"
|
||||
}
|
||||
return healthComponent{
|
||||
Name: "queue",
|
||||
Status: status,
|
||||
Error: message,
|
||||
Data: map[string]any{
|
||||
"pending": pending,
|
||||
"running": running,
|
||||
"stale_running": staleRunning,
|
||||
"oldest_pending_age_seconds": oldestPendingAgeSeconds,
|
||||
"oldest_running_age_seconds": oldestRunningAgeSeconds,
|
||||
"backlog": stats.Backlog,
|
||||
"queue_status_totals": stats.Queues,
|
||||
"owner_status_totals": stats.Owners,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func healthErrors(stats *model.Stats) healthComponent {
|
||||
var failedTotal, failed24h int64
|
||||
for _, row := range stats.Errors {
|
||||
failedTotal += row.Total
|
||||
failed24h += row.Last24h
|
||||
}
|
||||
status := "ok"
|
||||
message := ""
|
||||
if failed24h > 0 {
|
||||
status = "degraded"
|
||||
message = "there are failed jobs in the last 24 hours"
|
||||
}
|
||||
return healthComponent{
|
||||
Name: "errors",
|
||||
Status: status,
|
||||
Error: message,
|
||||
Data: map[string]any{
|
||||
"failed_total": failedTotal,
|
||||
"failed_24h": failed24h,
|
||||
"by_code": stats.Errors,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func healthThroughput(stats *model.Stats) healthComponent {
|
||||
var done24h, retried24h int64
|
||||
for _, row := range stats.Stages {
|
||||
done24h += row.Done24h
|
||||
retried24h += row.Retried24h
|
||||
}
|
||||
|
||||
pendingByStage := make(map[string]int64)
|
||||
for _, row := range stats.Backlog {
|
||||
pendingByStage[row.TaskType+"|"+row.ModelProfile] += row.Pending + row.Running
|
||||
}
|
||||
doneByStage := make(map[string]int64)
|
||||
for _, row := range stats.Stages {
|
||||
doneByStage[row.TaskType+"|"+row.ModelProfile] += row.Done24h
|
||||
}
|
||||
|
||||
stuckStages := make([]string, 0)
|
||||
for key, total := range pendingByStage {
|
||||
if total > 0 && doneByStage[key] == 0 {
|
||||
stuckStages = append(stuckStages, key)
|
||||
}
|
||||
}
|
||||
|
||||
status := "ok"
|
||||
message := ""
|
||||
if len(stuckStages) > 0 {
|
||||
status = "degraded"
|
||||
message = "some active queues have no completed jobs in the last 24 hours"
|
||||
}
|
||||
return healthComponent{
|
||||
Name: "throughput",
|
||||
Status: status,
|
||||
Error: message,
|
||||
Data: map[string]any{
|
||||
"done_24h": done24h,
|
||||
"retried_24h": retried24h,
|
||||
"stuck_stages": stuckStages,
|
||||
"stages": stats.Stages,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func healthInfra(infra infraStatusResponse) healthComponent {
|
||||
status := "ok"
|
||||
message := ""
|
||||
if infra.SidecarError != "" {
|
||||
status = "degraded"
|
||||
message = infra.SidecarError
|
||||
}
|
||||
return healthComponent{
|
||||
Name: "infra",
|
||||
Status: status,
|
||||
Error: message,
|
||||
Data: map[string]any{
|
||||
"sidecar": infra.Sidecar,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// worseHealthStatus returns the more severe of two health statuses, ranking
// "down" above "degraded" above everything else. Any value other than "down"
// or "degraded" is treated as "ok".
func worseHealthStatus(current, next string) string {
	// Checked from most to least severe; the first level either side has wins.
	for _, level := range []string{"down", "degraded"} {
		if current == level || next == level {
			return level
		}
	}
	return "ok"
}
|
||||
@@ -41,6 +41,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
case r.Method == http.MethodGet && path == "/readyz":
|
||||
s.handleReady(w, r)
|
||||
case r.Method == http.MethodGet && path == "/health/detail":
|
||||
s.handleHealthDetail(w, r)
|
||||
case r.Method == http.MethodGet && path == "/":
|
||||
writeJSON(w, http.StatusOK, map[string]string{"service": "ai-service"})
|
||||
case r.Method == http.MethodPost && path == "/api/v1/jobs":
|
||||
@@ -59,6 +61,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
s.handleGetJob(w, r, path)
|
||||
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/retry"):
|
||||
s.handleRetryJob(w, r, path)
|
||||
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/cancel"):
|
||||
s.handleCancelJob(w, r, path)
|
||||
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/complete"):
|
||||
s.handleCompleteJob(w, r, path)
|
||||
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/fail"):
|
||||
@@ -265,7 +269,7 @@ func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request, path strin
|
||||
}
|
||||
|
||||
func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path string) {
|
||||
id, err := jobIDFromPath(path, true)
|
||||
id, err := jobIDFromActionPath(path, "retry")
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
@@ -284,6 +288,26 @@ func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path str
|
||||
writeJSON(w, http.StatusOK, job)
|
||||
}
|
||||
|
||||
func (s *Server) handleCancelJob(w http.ResponseWriter, r *http.Request, path string) {
|
||||
id, err := jobIDFromActionPath(path, "cancel")
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
ctx, cancel := contextWithTimeout(r, 8*time.Second)
|
||||
defer cancel()
|
||||
job, err := s.store.CancelJob(ctx, id)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
if job == nil {
|
||||
writeError(w, http.StatusNotFound, "cancellable job not found")
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, job)
|
||||
}
|
||||
|
||||
func (s *Server) handleCompleteJob(w http.ResponseWriter, r *http.Request, path string) {
|
||||
id, err := jobIDFromActionPath(path, "complete")
|
||||
if err != nil {
|
||||
@@ -337,7 +361,7 @@ func (s *Server) handleFailJob(w http.ResponseWriter, r *http.Request, path stri
|
||||
func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, cancel := contextWithTimeout(r, 8*time.Second)
|
||||
defer cancel()
|
||||
stats, err := s.store.Stats(ctx)
|
||||
stats, err := s.store.Stats(ctx, s.cfg.WorkerLeaseTimeout)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
|
||||
@@ -38,7 +38,10 @@ type Usage struct {
|
||||
TotalTokens int `json:"total_tokens"`
|
||||
}
|
||||
|
||||
const ChatResultSchemaVersion = "ai.chat_result.v1"
|
||||
|
||||
type ChatResult struct {
|
||||
SchemaVersion string `json:"schema_version"`
|
||||
Content string `json:"content"`
|
||||
Model string `json:"model"`
|
||||
Usage *Usage `json:"usage,omitempty"`
|
||||
@@ -137,6 +140,7 @@ func (c *Client) Chat(ctx context.Context, in ChatInput) (*ChatResult, error) {
|
||||
modelName = c.model
|
||||
}
|
||||
return &ChatResult{
|
||||
SchemaVersion: ChatResultSchemaVersion,
|
||||
Content: out.Choices[0].Message.Content,
|
||||
Model: modelName,
|
||||
Usage: out.Usage,
|
||||
|
||||
43
internal/llm/client_test.go
Normal file
43
internal/llm/client_test.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package llm
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestChatResultIncludesSchemaVersion(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/v1/chat/completions" {
|
||||
t.Fatalf("path = %q, want /v1/chat/completions", r.URL.Path)
|
||||
}
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"model": "qwen2.5-14b",
|
||||
"choices": []map[string]any{
|
||||
{"message": map[string]string{"role": "assistant", "content": `{"ok":true}`}},
|
||||
},
|
||||
"usage": map[string]int{
|
||||
"prompt_tokens": 10,
|
||||
"completion_tokens": 2,
|
||||
"total_tokens": 12,
|
||||
},
|
||||
})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := New(server.URL, "", "fallback-model", 0)
|
||||
got, err := client.Chat(t.Context(), ChatInput{User: "test", MaxTokens: 32})
|
||||
if err != nil {
|
||||
t.Fatalf("Chat: %v", err)
|
||||
}
|
||||
if got.SchemaVersion != ChatResultSchemaVersion {
|
||||
t.Fatalf("schema_version = %q, want %q", got.SchemaVersion, ChatResultSchemaVersion)
|
||||
}
|
||||
if got.Content != `{"ok":true}` {
|
||||
t.Fatalf("content = %q", got.Content)
|
||||
}
|
||||
if got.Usage == nil || got.Usage.TotalTokens != 12 {
|
||||
t.Fatalf("usage = %#v", got.Usage)
|
||||
}
|
||||
}
|
||||
@@ -135,8 +135,12 @@ type BacklogStat struct {
|
||||
ModelProfile string `json:"model_profile"`
|
||||
Pending int64 `json:"pending"`
|
||||
Running int64 `json:"running"`
|
||||
StaleRunning int64 `json:"stale_running"`
|
||||
OldestPendingAgeSeconds int64 `json:"oldest_pending_age_seconds"`
|
||||
OldestPendingScheduledAt string `json:"oldest_pending_scheduled_at,omitempty"`
|
||||
OldestRunningAgeSeconds int64 `json:"oldest_running_age_seconds"`
|
||||
OldestRunningStartedAt string `json:"oldest_running_started_at,omitempty"`
|
||||
LastHeartbeatAt string `json:"last_heartbeat_at,omitempty"`
|
||||
}
|
||||
|
||||
type OwnerStat struct {
|
||||
|
||||
@@ -12,9 +12,9 @@ type failRetryPolicy struct {
|
||||
|
||||
func retryPolicyForError(errorCode string) failRetryPolicy {
|
||||
switch strings.TrimSpace(errorCode) {
|
||||
case "provider_unavailable", "model_unavailable", "timeout", "storage_error", "stale_worker":
|
||||
case "provider_unavailable", "model_unavailable", "provider_error", "dependency_error", "timeout", "storage_error", "stale_worker":
|
||||
return failRetryPolicy{Retryable: true, Delay: 30 * time.Second}
|
||||
case "bad_response", "unknown":
|
||||
case "bad_response", "transcript_hallucination", "transcript_incomplete", "internal_error", "unknown":
|
||||
return failRetryPolicy{Retryable: true, Delay: 2 * time.Minute}
|
||||
default:
|
||||
return failRetryPolicy{}
|
||||
|
||||
@@ -14,13 +14,21 @@ func TestRetryPolicyForError(t *testing.T) {
|
||||
}{
|
||||
{name: "provider unavailable", code: "provider_unavailable", retryable: true, delay: 30 * time.Second},
|
||||
{name: "model unavailable", code: "model_unavailable", retryable: true, delay: 30 * time.Second},
|
||||
{name: "provider error", code: "provider_error", retryable: true, delay: 30 * time.Second},
|
||||
{name: "dependency error", code: "dependency_error", retryable: true, delay: 30 * time.Second},
|
||||
{name: "timeout", code: "timeout", retryable: true, delay: 30 * time.Second},
|
||||
{name: "storage", code: "storage_error", retryable: true, delay: 30 * time.Second},
|
||||
{name: "stale worker", code: "stale_worker", retryable: true, delay: 30 * time.Second},
|
||||
{name: "bad response", code: "bad_response", retryable: true, delay: 2 * time.Minute},
|
||||
{name: "transcript hallucination", code: "transcript_hallucination", retryable: true, delay: 2 * time.Minute},
|
||||
{name: "transcript incomplete", code: "transcript_incomplete", retryable: true, delay: 2 * time.Minute},
|
||||
{name: "internal error", code: "internal_error", retryable: true, delay: 2 * time.Minute},
|
||||
{name: "unknown", code: "unknown", retryable: true, delay: 2 * time.Minute},
|
||||
{name: "bad audio", code: "bad_audio"},
|
||||
{name: "bad input", code: "bad_input"},
|
||||
{name: "context length", code: "context_length"},
|
||||
{name: "unsupported task", code: "unsupported_task"},
|
||||
{name: "cancelled", code: "cancelled"},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
|
||||
@@ -41,17 +41,48 @@ func Open(ctx context.Context, databaseURL string) (*Store, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse database url: %w", err)
|
||||
}
|
||||
pool, err := pgxpool.NewWithConfig(ctx, cfg)
|
||||
pool, err := connectWithRetry(ctx, cfg, 2*time.Minute)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("connect postgres: %w", err)
|
||||
}
|
||||
if err := pool.Ping(ctx); err != nil {
|
||||
pool.Close()
|
||||
return nil, fmt.Errorf("ping postgres: %w", err)
|
||||
return nil, err
|
||||
}
|
||||
return &Store{pool: pool}, nil
|
||||
}
|
||||
|
||||
func connectWithRetry(ctx context.Context, cfg *pgxpool.Config, maxWait time.Duration) (*pgxpool.Pool, error) {
|
||||
deadline := time.Now().Add(maxWait)
|
||||
var lastErr error
|
||||
|
||||
for attempt := 1; ; attempt++ {
|
||||
pool, err := pgxpool.NewWithConfig(ctx, cfg)
|
||||
if err == nil {
|
||||
if pingErr := pool.Ping(ctx); pingErr == nil {
|
||||
return pool, nil
|
||||
} else {
|
||||
err = fmt.Errorf("ping postgres: %w", pingErr)
|
||||
pool.Close()
|
||||
}
|
||||
} else {
|
||||
err = fmt.Errorf("connect postgres: %w", err)
|
||||
}
|
||||
lastErr = err
|
||||
|
||||
if time.Now().After(deadline) {
|
||||
return nil, fmt.Errorf("connect postgres after retry: %w", lastErr)
|
||||
}
|
||||
sleep := time.Duration(attempt) * time.Second
|
||||
if sleep > 5*time.Second {
|
||||
sleep = 5 * time.Second
|
||||
}
|
||||
timer := time.NewTimer(sleep)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return nil, fmt.Errorf("connect postgres cancelled: %w", ctx.Err())
|
||||
case <-timer.C:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Store) Close() {
|
||||
s.pool.Close()
|
||||
}
|
||||
@@ -420,6 +451,28 @@ WHERE j.id = picked.id
|
||||
return int(tag.RowsAffected()), nil
|
||||
}
|
||||
|
||||
func (s *Store) CancelJob(ctx context.Context, id uuid.UUID) (*model.Job, error) {
|
||||
const q = `
|
||||
UPDATE ai_jobs
|
||||
SET status = 'cancelled',
|
||||
completed_at = NOW(),
|
||||
worker_id = NULL,
|
||||
heartbeat_at = NULL,
|
||||
updated_at = NOW()
|
||||
WHERE id = $1
|
||||
AND status IN ('pending', 'running')
|
||||
RETURNING id, owner_service, owner_ref, task_type, model_profile, priority, status,
|
||||
attempts, max_attempts, input, result, error_code, error_message,
|
||||
scheduled_at, started_at, completed_at, worker_id, heartbeat_at,
|
||||
created_at, updated_at, idempotency_key
|
||||
`
|
||||
job, err := scanJob(s.pool.QueryRow(ctx, q, id))
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
return nil, nil
|
||||
}
|
||||
return job, err
|
||||
}
|
||||
|
||||
func (s *Store) CancelJobs(ctx context.Context, filter model.JobFilter) (int, error) {
|
||||
normalizeFilter(&filter)
|
||||
const q = `
|
||||
@@ -627,7 +680,10 @@ WHERE j.id = picked.id
|
||||
return int(tag.RowsAffected()), nil
|
||||
}
|
||||
|
||||
func (s *Store) Stats(ctx context.Context) (*model.Stats, error) {
|
||||
func (s *Store) Stats(ctx context.Context, staleAfter time.Duration) (*model.Stats, error) {
|
||||
if staleAfter <= 0 {
|
||||
staleAfter = 15 * time.Minute
|
||||
}
|
||||
out := &model.Stats{At: time.Now().UTC()}
|
||||
|
||||
queueRows, err := s.pool.Query(ctx, `
|
||||
@@ -741,13 +797,20 @@ SELECT owner_service,
|
||||
model_profile,
|
||||
count(*) FILTER (WHERE status = 'pending') AS pending,
|
||||
count(*) FILTER (WHERE status = 'running') AS running,
|
||||
count(*) FILTER (
|
||||
WHERE status = 'running'
|
||||
AND COALESCE(heartbeat_at, started_at, updated_at) < NOW() - make_interval(secs => $1)
|
||||
) AS stale_running,
|
||||
COALESCE(EXTRACT(EPOCH FROM (NOW() - MIN(scheduled_at) FILTER (WHERE status = 'pending')))::bigint, 0) AS oldest_pending_age_seconds,
|
||||
MIN(scheduled_at) FILTER (WHERE status = 'pending') AS oldest_pending_scheduled_at
|
||||
MIN(scheduled_at) FILTER (WHERE status = 'pending') AS oldest_pending_scheduled_at,
|
||||
COALESCE(EXTRACT(EPOCH FROM (NOW() - MIN(started_at) FILTER (WHERE status = 'running')))::bigint, 0) AS oldest_running_age_seconds,
|
||||
MIN(started_at) FILTER (WHERE status = 'running') AS oldest_running_started_at,
|
||||
MAX(heartbeat_at) FILTER (WHERE status = 'running') AS last_heartbeat_at
|
||||
FROM ai_jobs
|
||||
WHERE status IN ('pending', 'running')
|
||||
GROUP BY owner_service, task_type, model_profile
|
||||
ORDER BY pending DESC, running DESC, owner_service, task_type, model_profile
|
||||
`)
|
||||
ORDER BY stale_running DESC, pending DESC, running DESC, owner_service, task_type, model_profile
|
||||
`, int(staleAfter.Seconds()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -755,20 +818,32 @@ ORDER BY pending DESC, running DESC, owner_service, task_type, model_profile
|
||||
for backlogRows.Next() {
|
||||
var stat model.BacklogStat
|
||||
var oldestPendingScheduledAt *time.Time
|
||||
var oldestRunningStartedAt *time.Time
|
||||
var lastHeartbeatAt *time.Time
|
||||
if err := backlogRows.Scan(
|
||||
&stat.OwnerService,
|
||||
&stat.TaskType,
|
||||
&stat.ModelProfile,
|
||||
&stat.Pending,
|
||||
&stat.Running,
|
||||
&stat.StaleRunning,
|
||||
&stat.OldestPendingAgeSeconds,
|
||||
&oldestPendingScheduledAt,
|
||||
&stat.OldestRunningAgeSeconds,
|
||||
&oldestRunningStartedAt,
|
||||
&lastHeartbeatAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if oldestPendingScheduledAt != nil {
|
||||
stat.OldestPendingScheduledAt = oldestPendingScheduledAt.UTC().Format(time.RFC3339)
|
||||
}
|
||||
if oldestRunningStartedAt != nil {
|
||||
stat.OldestRunningStartedAt = oldestRunningStartedAt.UTC().Format(time.RFC3339)
|
||||
}
|
||||
if lastHeartbeatAt != nil {
|
||||
stat.LastHeartbeatAt = lastHeartbeatAt.UTC().Format(time.RFC3339)
|
||||
}
|
||||
out.Backlog = append(out.Backlog, stat)
|
||||
}
|
||||
return out, backlogRows.Err()
|
||||
|
||||
@@ -60,7 +60,10 @@ type Segment struct {
|
||||
Speaker string `json:"speaker,omitempty"`
|
||||
}
|
||||
|
||||
const ResultSchemaVersion = "ai.transcription_result.v1"
|
||||
|
||||
type Result struct {
|
||||
SchemaVersion string `json:"schema_version"`
|
||||
Provider string `json:"provider,omitempty"`
|
||||
Model string `json:"model,omitempty"`
|
||||
Attempts []Attempt `json:"attempts,omitempty"`
|
||||
@@ -199,6 +202,7 @@ func (c *Client) transcribeWithProvider(ctx context.Context, provider ProviderCo
|
||||
attempt.Text = text
|
||||
attempt.Segments = segments
|
||||
return &Result{
|
||||
SchemaVersion: ResultSchemaVersion,
|
||||
Provider: provider.Name,
|
||||
Model: resp.Model,
|
||||
Language: firstNonEmpty(resp.Language, in.Language, "unknown"),
|
||||
|
||||
@@ -84,6 +84,9 @@ func TestWhisperUsesAudioTranscriptionsEndpoint(t *testing.T) {
|
||||
if len(got.Segments) != 2 || got.Segments[0].Text != "Алло, тест." || got.Segments[1].Start != 1.2 {
|
||||
t.Fatalf("segments = %#v", got.Segments)
|
||||
}
|
||||
if got.SchemaVersion != ResultSchemaVersion {
|
||||
t.Fatalf("schema_version = %q, want %q", got.SchemaVersion, ResultSchemaVersion)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWhisperFallsBackToJSONWhenVerboseJSONUnsupported(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user