Compare commits

..

56 Commits

Author SHA1 Message Date
Grendgi
8d06cfabb1 fix: remove unused config helper
All checks were successful
CI / hygiene (push) Successful in 3s
Build and Deploy / build-and-deploy (push) Successful in 28s
CI / test (push) Successful in 22s
2026-06-18 10:28:43 +03:00
Grendgi
b81a8ee6be feat: cancel ai jobs by id
Some checks failed
CI / hygiene (push) Successful in 1s
Build and Deploy / build-and-deploy (push) Successful in 42s
CI / test (push) Failing after 21s
2026-06-17 17:39:29 +03:00
Grendgi
63553fba33 feat: version ai result schemas 2026-06-17 16:46:03 +03:00
Grendgi
f32265400b feat: expand ai retry policy 2026-06-17 16:39:58 +03:00
Grendgi
aad905c2c8 fix: expose ai health component errors 2026-06-17 16:35:47 +03:00
Grendgi
22d85ce646 fix: align ai health status contract 2026-06-17 16:34:44 +03:00
Grendgi
3c124c5f5a feat: expose ai service health detail 2026-06-17 16:31:22 +03:00
Grendgi
773f53f790 Expose AI queue heartbeat metrics
Some checks failed
CI / hygiene (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 28s
CI / test (push) Failing after 7s
2026-06-12 16:50:15 +03:00
Grendgi
abc64214d2 Add AI service CI hygiene guard 2026-06-12 16:42:35 +03:00
Grendgi
e45884c5e5 Retry AI service database connection on startup 2026-06-12 16:28:02 +03:00
Grendgi
c618ffaff9 Update analysis worker deployment
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 15s
2026-06-11 12:20:47 +03:00
Grendgi
92ac01d8b5 Use verbose Whisper transcription output
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 22s
2026-06-11 11:40:52 +03:00
Grendgi
b536877181 Stabilize Whisper transcription requests
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 21s
2026-06-11 10:16:34 +03:00
Grendgi
bc71caa762 Stabilize AI worker leases
Some checks failed
CI / test (push) Failing after 7s
Build and Deploy / build-and-deploy (push) Successful in 15s
2026-06-10 17:50:53 +03:00
Grendgi
d0980007d7 Add dedicated call analysis worker
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 25s
2026-06-10 17:37:21 +03:00
Grendgi
ea632902bb Shorten AI worker lease timeout
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 15s
2026-06-10 17:05:26 +03:00
Grendgi
800d1d7cdd Keep AI jobs alive during processing
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 1m10s
2026-06-10 16:48:58 +03:00
Grendgi
837acf2f00 Add AI queue backlog metrics
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 24s
2026-06-10 16:36:35 +03:00
Grendgi
631a45aff3 Classify LLM context length errors
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 31s
2026-06-10 16:16:57 +03:00
Grendgi
7a1965e402 Increase transcription worker claim limit
Some checks failed
CI / test (push) Failing after 7s
Build and Deploy / build-and-deploy (push) Successful in 16s
2026-06-10 16:11:35 +03:00
Grendgi
3c2f13b967 Add AI job stage metrics
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 34s
2026-06-10 16:07:21 +03:00
Grendgi
2a481fdc54 Add automatic retry policy for AI jobs
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 33s
2026-06-10 15:58:41 +03:00
Grendgi
f54400e8e2 Requeue stale worker jobs on idempotent create
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 21s
2026-06-10 14:42:14 +03:00
Grendgi
e6ae792325 Drop legacy audio config aliases
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 29s
2026-06-10 14:28:52 +03:00
Grendgi
80fa21ff80 Refresh AI service pipeline docs
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 27s
2026-06-10 13:43:59 +03:00
Grendgi
7d0e27f681 Extend LLM worker lease timeout
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 23s
2026-06-10 13:20:06 +03:00
Grendgi
11247f17de Route transcript summary jobs to LLM worker
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 21s
2026-06-10 12:32:22 +03:00
Grendgi
ae1802dab9 Stop sending prompt to Whisper transcription
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 24s
2026-06-10 11:30:08 +03:00
Grendgi
bde56978d6 Fix Whisper large v3 audio compose
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 18s
2026-06-10 10:15:24 +03:00
Grendgi
8d6cd84403 Switch transcription to Whisper large v3
Some checks failed
CI / test (push) Failing after 10s
Build and Deploy / build-and-deploy (push) Successful in 24s
2026-06-10 10:10:13 +03:00
Grendgi
1b63dcdbf5 Increase Voxtral transcription timeouts
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 20s
2026-06-09 22:27:52 +03:00
Grendgi
bf945e05e3 Use Voxtral JSON transcription without fake speakers
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 23s
2026-06-09 22:09:27 +03:00
Grendgi
e074f6b226 Run Voxtral transcription worker with two jobs
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 19s
2026-06-09 17:16:24 +03:00
Grendgi
9bd6d726f0 Make Voxtral the only transcription provider
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 27s
2026-06-09 16:54:54 +03:00
Grendgi
5c965be8c9 Split single Voxtral transcript segments
All checks were successful
CI / test (push) Successful in 13s
Build and Deploy / build-and-deploy (push) Successful in 23s
2026-06-09 16:20:52 +03:00
Grendgi
64bf40b3ba Segment Voxtral transcripts for telephony
All checks were successful
CI / test (push) Successful in 15s
Build and Deploy / build-and-deploy (push) Successful in 32s
2026-06-09 16:12:57 +03:00
Grendgi
e6c2b46cf6 Use Voxtral audio transcription endpoint
All checks were successful
CI / test (push) Successful in 14s
Build and Deploy / build-and-deploy (push) Successful in 30s
2026-06-09 15:51:50 +03:00
Grendgi
817eb8ff71 Disable inactive WhisperX provider status
All checks were successful
CI / test (push) Successful in 12s
Build and Deploy / build-and-deploy (push) Successful in 18s
2026-06-09 15:38:55 +03:00
Grendgi
94e0d03580 Switch transcription comparison to Voxtral
All checks were successful
CI / test (push) Successful in 13s
Build and Deploy / build-and-deploy (push) Successful in 18s
2026-06-09 15:11:52 +03:00
Grendgi
add15f1385 Cast transcription comparison averages
All checks were successful
CI / test (push) Successful in 14s
Build and Deploy / build-and-deploy (push) Successful in 23s
2026-06-09 15:01:32 +03:00
Grendgi
35c60f0e0e Add transcription comparison stats 2026-06-09 14:59:08 +03:00
Grendgi
88e7c86836 Document vLLM audio URL payloads
All checks were successful
CI / test (push) Successful in 13s
Build and Deploy / build-and-deploy (push) Successful in 16s
2026-06-09 13:49:27 +03:00
Grendgi
1202ebcb7f Use vLLM audio URL payloads
All checks were successful
CI / test (push) Successful in 13s
Build and Deploy / build-and-deploy (push) Successful in 24s
2026-06-09 13:46:28 +03:00
Grendgi
2ef71a822b Add vLLM audio image for transcription models
All checks were successful
CI / test (push) Successful in 13s
Build and Deploy / build-and-deploy (push) Successful in 22s
2026-06-09 13:38:08 +03:00
Grendgi
76ac9b8896 Add audio model API keys
All checks were successful
CI / test (push) Successful in 14s
Build and Deploy / build-and-deploy (push) Successful in 25s
2026-06-09 13:28:53 +03:00
Grendgi
c31dcb891c Enable Qwen audio endpoint 2026-06-09 13:23:21 +03:00
Grendgi
ee6e948d2e Add single WhisperX load balancer config 2026-06-09 13:20:17 +03:00
Grendgi
e132634c65 Isolate audio model compose network 2026-06-09 13:17:05 +03:00
Grendgi
cac8d89e64 Tune audio model GPU profiles 2026-06-09 12:52:13 +03:00
Grendgi
f49ba7abd5 Add AI server audio model profiles 2026-06-09 12:50:56 +03:00
Grendgi
aaecbb1bed Add transcription provider comparison chain 2026-06-09 12:34:08 +03:00
Grendgi
562fad6f87 Group AI errors by owner service
All checks were successful
CI / test (push) Successful in 23s
Build and Deploy / build-and-deploy (push) Successful in 24s
2026-06-09 12:06:01 +03:00
Grendgi
da144ecefe Batch create AI jobs efficiently
All checks were successful
CI / test (push) Successful in 22s
Build and Deploy / build-and-deploy (push) Successful in 25s
2026-06-09 11:49:12 +03:00
Grendgi
6254bf0810 Optimize AI dashboard job payload 2026-06-09 11:44:53 +03:00
Grendgi
092994fe74 Add AI job queue indexes 2026-06-09 11:41:15 +03:00
Grendgi
01ee090fa5 Add worker health endpoints 2026-06-09 11:38:03 +03:00
34 changed files with 2010 additions and 325 deletions

View File

@@ -0,0 +1,35 @@
#!/usr/bin/env bash
set -euo pipefail
fail=0
while IFS= read -r -d '' path; do
base="$(basename "$path")"
case "$base" in
.DS_Store|.env)
echo "::error file=$path::tracked local-only file is forbidden"
fail=1
;;
esac
case "$path" in
*node_modules/*|node_modules/*)
echo "::error file=$path::tracked node_modules content is forbidden"
fail=1
;;
*.tmp|*.temp|*.bak|*.orig|*.rej|*.zip|*.tar|*.tar.gz|*.tgz|*.rar|*.7z)
echo "::error file=$path::tracked temporary/archive artifact is forbidden"
fail=1
;;
esac
if [ -f "$path" ]; then
size="$(wc -c < "$path" | tr -d ' ')"
if [ "${size:-0}" -gt 52428800 ]; then
echo "::error file=$path::tracked file is larger than 50 MiB"
fail=1
fi
fi
done < <(git ls-files -z)
exit "$fail"

View File

@@ -5,8 +5,15 @@ on:
pull_request:
jobs:
hygiene:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: bash .gitea/scripts/hygiene-check.sh
test:
runs-on: ubuntu-latest
needs: hygiene
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5

View File

@@ -58,8 +58,11 @@ jobs:
server=${{ env.NODE_REGISTRY }}/admin/ai-service:${{ github.sha }}
kubectl -n ai-service set image deployment/ai-service-worker \
worker=${{ env.NODE_REGISTRY }}/admin/ai-service:${{ github.sha }}
kubectl -n ai-service set image deployment/ai-service-analysis-worker \
worker=${{ env.NODE_REGISTRY }}/admin/ai-service:${{ github.sha }}
kubectl -n ai-service set image deployment/ai-service-transcription-worker \
worker=${{ env.NODE_REGISTRY }}/admin/ai-service:${{ github.sha }}
kubectl -n ai-service rollout status deployment/ai-service --timeout=180s
kubectl -n ai-service rollout status deployment/ai-service-worker --timeout=180s
kubectl -n ai-service rollout status deployment/ai-service-analysis-worker --timeout=180s
kubectl -n ai-service rollout status deployment/ai-service-transcription-worker --timeout=180s

View File

@@ -2,8 +2,9 @@
Technical AI job service for Portal workloads.
The first version owns only AI job lifecycle and metrics. Business data stays in
domain services such as `telephony`, `monitoring-tg` and `monitoring-pf`.
AI Service owns technical AI job lifecycle, provider execution and metrics.
Business data stays in domain services such as `telephony`, `monitoring-tg` and
`monitoring-pf`.
## Generic job contract
@@ -14,8 +15,9 @@ The service is intentionally domain-agnostic:
- `owner_ref` is the caller's stable object reference, for example
`beeline/{call_id}` or `channel/{message_id}`.
- `task_type` describes the technical task class, for example
`transcribe`, `call_analysis`, `tg_analysis`, `pf_competitor_analysis`.
- `model_profile` selects a runtime profile, for example `whisperx`,
`transcription`, `transcript_summary`, `call_analysis`,
`telegram_classification`, `tg_analysis`, `pf_competitor_analysis`.
- `model_profile` selects a runtime profile, for example `whisper-large-v3`,
`qwen2.5-14b`, `vision`, or a future provider profile.
- `input` and `result` are JSON payloads owned by the caller and worker.
@@ -24,8 +26,9 @@ service.
## Built-in workers
The first built-in worker processes `llm_chat`, `chat_completion` and
`call_analysis` jobs whose `model_profile` equals `LLM_MODEL`.
The LLM worker processes `llm_chat`, `chat_completion`, `call_analysis`,
`transcript_summary` and `telegram_classification` jobs whose `model_profile`
equals `LLM_MODEL`.
Input can be either explicit messages:
@@ -40,11 +43,34 @@ Input can be either explicit messages:
```
or compact `system` / `user` fields. The completed job result contains
`content`, `model`, `usage` and `duration_ms`.
`schema_version=ai.chat_result.v1`, `content`, `model`, `usage` and
`duration_ms`.
`call_analysis` uses the same input contract as `llm_chat`; callers may include
domain metadata fields in `input`, but the worker only reads chat fields such as
`system`, `user`, `messages`, `max_tokens` and `response_format`.
`call_analysis` and `transcript_summary` use the same input contract as
`llm_chat`; callers may include domain metadata fields in `input`, but the
worker only reads chat fields such as `system`, `user`, `messages`,
`max_tokens` and `response_format`.
`transcription` jobs are processed only by Whisper Large v3
(`openai/whisper-large-v3`) through an OpenAI-compatible
`/v1/audio/transcriptions` endpoint. The returned `segments` field stays
compatible with telephony. If the provider returns one long segment, AI Service
splits it into smaller transcript segments without inventing speaker labels.
The completed job result contains
`schema_version=ai.transcription_result.v1`, `provider`, `model`, `language`,
`segments`, optional provider `attempts` and `duration_ms`.
AI-server compose snippet for Whisper Large v3 lives in
`deploy/ai-server/docker-compose.audio.yml`:
- Whisper endpoint: `http://10.2.3.5:8004`
- Start Whisper:
`docker compose -f docker-compose.yml -f docker-compose.audio.yml --profile whisper-large-v3 up -d whisper-large-v3`
In Kubernetes the dedicated transcription worker may claim more than one
`whisper-large-v3` job at a time. This keeps download/upload/wait overhead from
serializing the queue while the Whisper provider still controls the actual GPU
scheduling.
## API
@@ -62,14 +88,46 @@ domain metadata fields in `input`, but the worker only reads chat fields such as
- `GET /api/v1/providers/status` checks configured AI providers without
returning secrets.
- `GET /api/v1/infra/status` returns AI-server sidecar telemetry
(GPU, containers, vLLM and WhisperX live metrics) when configured.
(GPU, containers and vLLM live metrics) when configured.
- `GET /health/detail` returns PostgreSQL, provider, queue, error, throughput
and infra components for Portal `admin/health`.
- `GET /healthz` returns process health.
- `GET /readyz` checks PostgreSQL readiness.
- Built-in workers expose open Kubernetes endpoints on `WORKER_HTTP_PORT`:
`GET /healthz`, `GET /readyz` and `GET /worker/status`.
All `/api/v1/*` endpoints require `Authorization: Bearer <AI_SERVICE_TOKEN>`
when `AI_SERVICE_TOKEN` is configured. Health and readiness endpoints stay open
for Kubernetes probes.
## Retry policy
Workers store a normalized `error_code` on failed jobs. AI Service requeues only
explicitly retryable categories while attempts remain.
| Category | Retry | Delay |
| --- | --- | --- |
| `provider_unavailable`, `model_unavailable`, `provider_error`, `dependency_error`, `timeout`, `storage_error`, `stale_worker` | yes | 30s |
| `bad_response`, `transcript_hallucination`, `transcript_incomplete`, `internal_error`, `unknown` | yes | 2m |
| `bad_audio`, `bad_input`, `context_length`, `unsupported_task`, `cancelled` | no | - |
Domain services may still expose manual retry for terminal errors after the
underlying data or prompt is corrected.
## Result schemas
AI Service result payloads are versioned with `schema_version`. Consumers should
ignore unknown fields and reject only unsupported major schema names.
Current schemas:
- `ai.chat_result.v1`: `{schema_version, content, model, usage?, duration_ms}`.
- `ai.transcription_result.v1`:
`{schema_version, provider?, model?, attempts?, language, segments, duration_ms}`.
New optional fields may be added to a `v1` schema without a breaking change.
Breaking shape changes require a new schema name.
## Configuration
- `HTTP_HOST`, default `0.0.0.0`
@@ -81,14 +139,22 @@ for Kubernetes probes.
- `LLM_API_KEY`, primary LLM API key
- `LLM_MODEL`, default `qwen2.5-14b`
- `LLM_TIMEOUT`, default `5m`
- `WHISPERX_URL`, WhisperX endpoint for transcription jobs
- `AUDIO_TRANSCRIPTION_BASE_URL`, OpenAI-compatible transcription endpoint
- `AUDIO_TRANSCRIPTION_MODEL`, default `openai/whisper-large-v3`
- `AUDIO_TRANSCRIPTION_API_KEY`, optional bearer token; falls back to
`AUDIO_LLM_API_KEY`, then `LLM_API_KEY`
- `AUDIO_TRANSCRIPTION_PROMPT`, transcription instruction
- `WORKER_ID`, default hostname
- `WORKER_HTTP_HOST`, default `0.0.0.0`
- `WORKER_HTTP_PORT`, default `8081`
- `WORKER_POLL_INTERVAL`, default `2s`
- `WORKER_CLAIM_LIMIT`, default `4`
- `WORKER_LEASE_TIMEOUT`, default `15m`
## Next integration step
## Current telephony pipeline
`telephony` should first mirror low-risk analysis jobs into this service while
continuing local processing. Remote execution can then be enabled by feature
flag per task type.
`telephony` now uses AI Service as the only AI execution path:
1. `transcription` turns call audio into segments.
2. `transcript_summary` creates a detailed Russian call summary.
3. `call_analysis` runs tags and negotiation rules against the summary.

View File

@@ -2,10 +2,16 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
"ai-service/internal/config"
"ai-service/internal/llm"
@@ -42,14 +48,22 @@ func main() {
}
llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout)
transcriber := transcription.New(cfg.WhisperXURL, cfg.WhisperXTimeout, cfg.FfmpegPath, cfg.WhisperXLeadSilence)
transcriber := transcription.NewWithOptions(transcription.Options{
AudioBaseURL: cfg.AudioBaseURL,
AudioAPIKey: cfg.AudioAPIKey,
AudioModel: cfg.AudioModel,
AudioTimeout: cfg.AudioTimeout,
AudioPrompt: cfg.AudioPrompt,
})
w := worker.New(db, llmClient, transcriber, cfg.WorkerID, cfg.LLMModel, cfg.WorkerTaskTypes, cfg.WorkerModelProfiles, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit)
healthSrv := startHealthServer(ctx, db, cfg)
slog.Info("ai_worker_started",
"worker_id", cfg.WorkerID,
"model", cfg.LLMModel,
"whisperx_enabled", transcriber != nil,
"whisperx_lead_silence", cfg.WhisperXLeadSilence.String(),
"transcription_enabled", transcriber != nil,
"transcription_provider", transcription.ProviderWhisperLargeV3,
"transcription_model", cfg.AudioModel,
"task_types", cfg.WorkerTaskTypes,
"model_profiles", cfg.WorkerModelProfiles,
"poll_interval", cfg.WorkerPollInterval.String(),
@@ -57,4 +71,84 @@ func main() {
"claim_limit", cfg.WorkerClaimLimit,
)
w.Run(ctx)
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = healthSrv.Shutdown(shutdownCtx)
}
type workerHealth struct {
store *store.Store
cfg config.Config
}
func startHealthServer(ctx context.Context, db *store.Store, cfg config.Config) *http.Server {
srv := &http.Server{
Addr: fmt.Sprintf("%s:%d", cfg.WorkerHTTPHost, cfg.WorkerHTTPPort),
Handler: workerHealth{store: db, cfg: cfg},
ReadHeaderTimeout: 5 * time.Second,
}
go func() {
slog.Info("ai_worker_health_started", "addr", srv.Addr)
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
slog.Error("ai_worker_health_failed", "error", err)
}
}()
go func() {
<-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = srv.Shutdown(shutdownCtx)
}()
return srv
}
func (h workerHealth) ServeHTTP(w http.ResponseWriter, r *http.Request) {
path := strings.TrimSuffix(r.URL.Path, "/")
if path == "" {
path = "/"
}
switch {
case r.Method == http.MethodGet && path == "/healthz":
writeWorkerJSON(w, http.StatusOK, map[string]any{
"status": "ok",
"worker_id": h.cfg.WorkerID,
})
case r.Method == http.MethodGet && path == "/readyz":
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
defer cancel()
if err := h.store.Ping(ctx); err != nil {
writeWorkerJSON(w, http.StatusServiceUnavailable, map[string]any{
"status": "not_ready",
"error": err.Error(),
})
return
}
writeWorkerJSON(w, http.StatusOK, map[string]any{
"status": "ready",
"worker_id": h.cfg.WorkerID,
})
case r.Method == http.MethodGet && path == "/worker/status":
writeWorkerJSON(w, http.StatusOK, map[string]any{
"status": "running",
"worker_id": h.cfg.WorkerID,
"task_types": h.cfg.WorkerTaskTypes,
"model_profiles": h.cfg.WorkerModelProfiles,
"transcription_provider": transcription.ProviderWhisperLargeV3,
"transcription_model": h.cfg.AudioModel,
"claim_limit": h.cfg.WorkerClaimLimit,
"poll_interval": h.cfg.WorkerPollInterval.String(),
"lease_timeout": h.cfg.WorkerLeaseTimeout.String(),
})
default:
writeWorkerJSON(w, http.StatusNotFound, map[string]any{"error": "not found"})
}
}
func writeWorkerJSON(w http.ResponseWriter, code int, body any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
if err := json.NewEncoder(w).Encode(body); err != nil {
slog.Warn("worker_health_write_failed", "error", err)
}
}

View File

@@ -0,0 +1,52 @@
services:
whisper-large-v3:
build:
context: .
dockerfile: vllm-audio.Dockerfile
image: vllm-audio:local
container_name: whisper-large-v3
profiles:
- whisper-large-v3
restart: unless-stopped
ipc: host
runtime: nvidia
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: all
capabilities: [gpu]
environment:
HUGGING_FACE_HUB_TOKEN: ${HF_TOKEN}
VLLM_API_KEY: ${VLLM_API_KEY}
HF_HOME: /cache
volumes:
- ./data/vllm-cache:/cache
networks:
- audio-models
ports:
- "10.2.3.5:8004:8000"
command:
- "--model"
- "openai/whisper-large-v3"
- "--served-model-name"
- "openai/whisper-large-v3"
- "--host"
- "0.0.0.0"
- "--port"
- "8000"
- "--gpu-memory-utilization"
- "0.55"
- "--api-key"
- "${VLLM_API_KEY}"
healthcheck:
test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
interval: 30s
timeout: 5s
retries: 5
start_period: 1200s
networks:
audio-models:
driver: bridge

View File

@@ -0,0 +1,3 @@
FROM vllm/vllm-openai:latest
RUN python3 -m pip install --no-cache-dir av soundfile librosa

View File

@@ -18,14 +18,17 @@ type Config struct {
LLMAPIKey string
LLMModel string
LLMTimeout time.Duration
WhisperXURL string
WhisperXTimeout time.Duration
WhisperXLeadSilence time.Duration
FfmpegPath string
AudioBaseURL string
AudioAPIKey string
AudioModel string
AudioTimeout time.Duration
AudioPrompt string
AIStatsSidecarURL string
AIStatsTimeout time.Duration
WorkerID string
WorkerHTTPHost string
WorkerHTTPPort int
WorkerPollInterval time.Duration
WorkerClaimLimit int
WorkerLeaseTimeout time.Duration
@@ -45,14 +48,17 @@ func Load() Config {
LLMAPIKey: envString("LLM_API_KEY", ""),
LLMModel: envString("LLM_MODEL", "qwen2.5-14b"),
LLMTimeout: envDuration("LLM_TIMEOUT", 5*time.Minute),
WhisperXURL: envString("WHISPERX_URL", ""),
WhisperXTimeout: envDuration("WHISPERX_TIMEOUT", 10*time.Minute),
WhisperXLeadSilence: envDuration("WHISPERX_LEAD_SILENCE", 800*time.Millisecond),
FfmpegPath: envString("FFMPEG_PATH", "/usr/bin/ffmpeg"),
AudioBaseURL: envString("AUDIO_TRANSCRIPTION_BASE_URL", ""),
AudioAPIKey: envString("AUDIO_TRANSCRIPTION_API_KEY", ""),
AudioModel: envString("AUDIO_TRANSCRIPTION_MODEL", "openai/whisper-large-v3"),
AudioTimeout: envDuration("AUDIO_TRANSCRIPTION_TIMEOUT", 10*time.Minute),
AudioPrompt: envString("AUDIO_TRANSCRIPTION_PROMPT", defaultAudioPrompt()),
AIStatsSidecarURL: envString("AI_STATS_SIDECAR_URL", ""),
AIStatsTimeout: envDuration("AI_STATS_TIMEOUT", 8*time.Second),
WorkerID: envString("WORKER_ID", hostname()),
WorkerHTTPHost: envString("WORKER_HTTP_HOST", "0.0.0.0"),
WorkerHTTPPort: envInt("WORKER_HTTP_PORT", 8081),
WorkerPollInterval: envDuration("WORKER_POLL_INTERVAL", 2*time.Second),
WorkerClaimLimit: envInt("WORKER_CLAIM_LIMIT", 4),
WorkerLeaseTimeout: envDuration("WORKER_LEASE_TIMEOUT", 15*time.Minute),
@@ -119,6 +125,10 @@ func envCSV(key string) []string {
return out
}
func defaultAudioPrompt() string {
return ""
}
func hostname() string {
h, err := os.Hostname()
if err != nil || h == "" {

View File

@@ -5,6 +5,7 @@ import (
"time"
"ai-service/internal/model"
"ai-service/internal/transcription"
)
type dashboardResponse struct {
@@ -13,7 +14,7 @@ type dashboardResponse struct {
Stats *model.Stats `json:"stats"`
Providers providersStatusResponse `json:"providers"`
Infra infraStatusResponse `json:"infra"`
Jobs []*model.Job `json:"jobs"`
Jobs []*model.JobSummary `json:"jobs"`
}
type dashboardSummary struct {
@@ -30,12 +31,12 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) {
ctx, cancel := contextWithTimeout(r, 12*time.Second)
defer cancel()
stats, err := s.store.Stats(ctx)
stats, err := s.store.Stats(ctx, s.cfg.WorkerLeaseTimeout)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
jobs, err := s.store.ListJobs(ctx, model.JobFilter{
jobs, err := s.store.ListJobSummaries(ctx, model.JobFilter{
Statuses: []string{model.StatusFailed, model.StatusRunning},
Limit: 40,
})
@@ -43,7 +44,6 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
resp := dashboardResponse{
At: now,
Summary: summarizeQueues(stats),
@@ -52,7 +52,7 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) {
At: now,
Providers: []providerStatus{
s.checkLLM(ctx),
s.checkWhisperX(ctx),
s.checkAudioLLM(ctx, transcription.ProviderWhisperLargeV3, s.cfg.AudioBaseURL, s.cfg.AudioAPIKey, s.cfg.AudioModel, s.cfg.AudioTimeout),
},
},
Infra: loadInfraSnapshot(r, s.cfg),

241
internal/httpapi/health.go Normal file
View File

@@ -0,0 +1,241 @@
package httpapi
import (
"context"
"net/http"
"strings"
"time"
"ai-service/internal/model"
"ai-service/internal/transcription"
)
type healthDetailResponse struct {
Status string `json:"status"`
Generated time.Time `json:"generated_at"`
Components []healthComponent `json:"components"`
}
type healthComponent struct {
Name string `json:"name"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
Data map[string]any `json:"data,omitempty"`
}
func (s *Server) handleHealthDetail(w http.ResponseWriter, r *http.Request) {
ctx, cancel := contextWithTimeout(r, 12*time.Second)
defer cancel()
resp := healthDetailResponse{
Status: "ok",
Generated: time.Now().UTC(),
}
if err := s.store.Ping(ctx); err != nil {
resp.Components = append(resp.Components, healthComponent{
Name: "postgres",
Status: "down",
Error: err.Error(),
})
resp.Status = worseHealthStatus(resp.Status, "down")
writeJSON(w, http.StatusServiceUnavailable, resp)
return
}
resp.Components = append(resp.Components, healthComponent{Name: "postgres", Status: "ok"})
stats, err := s.store.Stats(ctx, s.cfg.WorkerLeaseTimeout)
if err != nil {
resp.Components = append(resp.Components, healthComponent{
Name: "queue",
Status: "down",
Error: err.Error(),
})
resp.Status = worseHealthStatus(resp.Status, "down")
writeJSON(w, http.StatusServiceUnavailable, resp)
return
}
for _, component := range []healthComponent{
s.healthProviders(ctx),
healthQueue(stats),
healthErrors(stats),
healthThroughput(stats),
healthInfra(loadInfraSnapshot(r, s.cfg)),
} {
resp.Components = append(resp.Components, component)
resp.Status = worseHealthStatus(resp.Status, component.Status)
}
statusCode := http.StatusOK
if resp.Status == "down" {
statusCode = http.StatusServiceUnavailable
}
writeJSON(w, statusCode, resp)
}
func (s *Server) healthProviders(ctx context.Context) healthComponent {
providers := []providerStatus{
s.checkLLM(ctx),
s.checkAudioLLM(ctx, transcription.ProviderWhisperLargeV3, s.cfg.AudioBaseURL, s.cfg.AudioAPIKey, s.cfg.AudioModel, s.cfg.AudioTimeout),
}
status := "ok"
messages := make([]string, 0)
for _, provider := range providers {
switch {
case !provider.Configured:
status = worseHealthStatus(status, "degraded")
messages = append(messages, provider.Name+" not configured")
case !provider.OK:
status = worseHealthStatus(status, "down")
if provider.Error != "" {
messages = append(messages, provider.Name+": "+provider.Error)
} else {
messages = append(messages, provider.Name+" unavailable")
}
case provider.Stale:
status = worseHealthStatus(status, "degraded")
if provider.Error != "" {
messages = append(messages, provider.Name+": "+provider.Error)
}
}
}
return healthComponent{
Name: "providers",
Status: status,
Error: strings.Join(messages, "; "),
Data: map[string]any{
"providers": providers,
},
}
}
func healthQueue(stats *model.Stats) healthComponent {
var pending, running, staleRunning int64
var oldestPendingAgeSeconds, oldestRunningAgeSeconds int64
for _, row := range stats.Backlog {
pending += row.Pending
running += row.Running
staleRunning += row.StaleRunning
if row.OldestPendingAgeSeconds > oldestPendingAgeSeconds {
oldestPendingAgeSeconds = row.OldestPendingAgeSeconds
}
if row.OldestRunningAgeSeconds > oldestRunningAgeSeconds {
oldestRunningAgeSeconds = row.OldestRunningAgeSeconds
}
}
status := "ok"
message := ""
if staleRunning > 0 {
status = "degraded"
message = "there are stale running jobs"
}
return healthComponent{
Name: "queue",
Status: status,
Error: message,
Data: map[string]any{
"pending": pending,
"running": running,
"stale_running": staleRunning,
"oldest_pending_age_seconds": oldestPendingAgeSeconds,
"oldest_running_age_seconds": oldestRunningAgeSeconds,
"backlog": stats.Backlog,
"queue_status_totals": stats.Queues,
"owner_status_totals": stats.Owners,
},
}
}
func healthErrors(stats *model.Stats) healthComponent {
var failedTotal, failed24h int64
for _, row := range stats.Errors {
failedTotal += row.Total
failed24h += row.Last24h
}
status := "ok"
message := ""
if failed24h > 0 {
status = "degraded"
message = "there are failed jobs in the last 24 hours"
}
return healthComponent{
Name: "errors",
Status: status,
Error: message,
Data: map[string]any{
"failed_total": failedTotal,
"failed_24h": failed24h,
"by_code": stats.Errors,
},
}
}
func healthThroughput(stats *model.Stats) healthComponent {
var done24h, retried24h int64
for _, row := range stats.Stages {
done24h += row.Done24h
retried24h += row.Retried24h
}
pendingByStage := make(map[string]int64)
for _, row := range stats.Backlog {
pendingByStage[row.TaskType+"|"+row.ModelProfile] += row.Pending + row.Running
}
doneByStage := make(map[string]int64)
for _, row := range stats.Stages {
doneByStage[row.TaskType+"|"+row.ModelProfile] += row.Done24h
}
stuckStages := make([]string, 0)
for key, total := range pendingByStage {
if total > 0 && doneByStage[key] == 0 {
stuckStages = append(stuckStages, key)
}
}
status := "ok"
message := ""
if len(stuckStages) > 0 {
status = "degraded"
message = "some active queues have no completed jobs in the last 24 hours"
}
return healthComponent{
Name: "throughput",
Status: status,
Error: message,
Data: map[string]any{
"done_24h": done24h,
"retried_24h": retried24h,
"stuck_stages": stuckStages,
"stages": stats.Stages,
},
}
}
func healthInfra(infra infraStatusResponse) healthComponent {
status := "ok"
message := ""
if infra.SidecarError != "" {
status = "degraded"
message = infra.SidecarError
}
return healthComponent{
Name: "infra",
Status: status,
Error: message,
Data: map[string]any{
"sidecar": infra.Sidecar,
},
}
}
func worseHealthStatus(current, next string) string {
if current == "down" || next == "down" {
return "down"
}
if current == "degraded" || next == "degraded" {
return "degraded"
}
return "ok"
}

View File

@@ -8,6 +8,8 @@ import (
"net/http"
"strings"
"time"
"ai-service/internal/transcription"
)
type providerStatus struct {
@@ -42,12 +44,58 @@ func (s *Server) handleProviderStatus(w http.ResponseWriter, r *http.Request) {
At: time.Now().UTC(),
Providers: []providerStatus{
s.checkLLM(ctx),
s.checkWhisperX(ctx),
s.checkAudioLLM(ctx, transcription.ProviderWhisperLargeV3, s.cfg.AudioBaseURL, s.cfg.AudioAPIKey, s.cfg.AudioModel, s.cfg.AudioTimeout),
},
}
writeJSON(w, http.StatusOK, resp)
}
func (s *Server) checkAudioLLM(ctx context.Context, name, baseURL, apiKey, model string, timeout time.Duration) providerStatus {
baseURL = strings.TrimRight(strings.TrimSpace(baseURL), "/")
st := providerStatus{
Name: name,
Configured: baseURL != "",
URL: baseURL,
Model: model,
}
if !st.Configured {
return st
}
if timeout <= 0 {
timeout = 10 * time.Minute
}
start := time.Now()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, st.URL+"/v1/models", nil)
if err != nil {
st.Error = err.Error()
return st
}
if apiKey != "" {
req.Header.Set("Authorization", "Bearer "+apiKey)
}
res, err := (&http.Client{Timeout: minDuration(timeout, 3*time.Second)}).Do(req)
st.LatencyMS = time.Since(start).Milliseconds()
if err != nil {
st.Error = err.Error()
return s.withStaleProviderOK(name, st)
}
defer res.Body.Close()
if res.StatusCode >= 300 {
st.Error = fmt.Sprintf("http %d: %s", res.StatusCode, readSmallBody(res.Body))
return s.withStaleProviderOK(name, st)
}
st.OK = true
s.rememberProviderOK(name, st.LatencyMS)
return st
}
func minDuration(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}
func (s *Server) checkLLM(ctx context.Context) providerStatus {
st := providerStatus{
Name: "llm",
@@ -84,47 +132,6 @@ func (s *Server) checkLLM(ctx context.Context) providerStatus {
return st
}
func (s *Server) checkWhisperX(ctx context.Context) providerStatus {
baseURL := strings.TrimRight(strings.TrimSpace(s.cfg.WhisperXURL), "/")
st := providerStatus{Name: "whisperx", Configured: baseURL != "", URL: baseURL}
if !st.Configured {
return st
}
paths := []string{"/health", "/healthz", "/readyz", "/"}
var lastErr string
for _, path := range paths {
cctx, cancel := context.WithTimeout(ctx, 2*time.Second)
start := time.Now()
req, err := http.NewRequestWithContext(cctx, http.MethodGet, baseURL+path, nil)
if err != nil {
cancel()
lastErr = err.Error()
continue
}
res, err := (&http.Client{Timeout: 2 * time.Second}).Do(req)
st.LatencyMS = time.Since(start).Milliseconds()
cancel()
if err != nil {
lastErr = err.Error()
continue
}
body := ""
if res.StatusCode >= 300 {
body = readSmallBody(res.Body)
}
_ = res.Body.Close()
if res.StatusCode >= 300 {
lastErr = fmt.Sprintf("%s http %d: %s", path, res.StatusCode, body)
continue
}
st.OK = true
s.rememberProviderOK("whisperx", st.LatencyMS)
return st
}
st.Error = lastErr
return s.withStaleProviderOK("whisperx", st)
}
func (s *Server) rememberProviderOK(name string, latencyMS int64) {
s.providerMu.Lock()
defer s.providerMu.Unlock()

View File

@@ -41,6 +41,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
case r.Method == http.MethodGet && path == "/readyz":
s.handleReady(w, r)
case r.Method == http.MethodGet && path == "/health/detail":
s.handleHealthDetail(w, r)
case r.Method == http.MethodGet && path == "/":
writeJSON(w, http.StatusOK, map[string]string{"service": "ai-service"})
case r.Method == http.MethodPost && path == "/api/v1/jobs":
@@ -59,6 +61,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.handleGetJob(w, r, path)
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/retry"):
s.handleRetryJob(w, r, path)
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/cancel"):
s.handleCancelJob(w, r, path)
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/complete"):
s.handleCompleteJob(w, r, path)
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/fail"):
@@ -135,6 +139,8 @@ type createBatchResponse struct {
Jobs []*model.Job `json:"jobs"`
}
const maxCreateBatchJobs = 1000
func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
var req createBatchRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
@@ -145,17 +151,21 @@ func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusBadRequest, "jobs is required")
return
}
if len(req.Jobs) > maxCreateBatchJobs {
writeError(w, http.StatusBadRequest, fmt.Sprintf("jobs limit is %d", maxCreateBatchJobs))
return
}
ctx, cancel := contextWithTimeout(r, 20*time.Second)
defer cancel()
out := createBatchResponse{Jobs: make([]*model.Job, 0, len(req.Jobs))}
items := make([]model.CreateJob, 0, len(req.Jobs))
for _, item := range req.Jobs {
if item.OwnerService == "" {
if strings.TrimSpace(item.OwnerService) == "" {
item.OwnerService = req.OwnerService
}
if item.TaskType == "" {
if strings.TrimSpace(item.TaskType) == "" {
item.TaskType = req.TaskType
}
if item.ModelProfile == "" {
if strings.TrimSpace(item.ModelProfile) == "" {
item.ModelProfile = req.ModelProfile
}
if item.Priority == 0 {
@@ -164,7 +174,9 @@ func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
if item.MaxAttempts == 0 {
item.MaxAttempts = req.MaxAttempts
}
job, err := s.store.CreateJob(ctx, item)
items = append(items, item)
}
jobs, err := s.store.CreateJobs(ctx, items)
if err != nil {
status := http.StatusInternalServerError
if isValidationError(err) {
@@ -173,9 +185,7 @@ func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
writeError(w, status, err.Error())
return
}
out.Jobs = append(out.Jobs, job)
}
writeJSON(w, http.StatusCreated, out)
writeJSON(w, http.StatusCreated, createBatchResponse{Jobs: jobs})
}
func (s *Server) handleRetryJobs(w http.ResponseWriter, r *http.Request) {
@@ -259,7 +269,7 @@ func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request, path strin
}
func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path string) {
id, err := jobIDFromPath(path, true)
id, err := jobIDFromActionPath(path, "retry")
if err != nil {
writeError(w, http.StatusBadRequest, err.Error())
return
@@ -278,6 +288,26 @@ func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path str
writeJSON(w, http.StatusOK, job)
}
func (s *Server) handleCancelJob(w http.ResponseWriter, r *http.Request, path string) {
id, err := jobIDFromActionPath(path, "cancel")
if err != nil {
writeError(w, http.StatusBadRequest, err.Error())
return
}
ctx, cancel := contextWithTimeout(r, 8*time.Second)
defer cancel()
job, err := s.store.CancelJob(ctx, id)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
if job == nil {
writeError(w, http.StatusNotFound, "cancellable job not found")
return
}
writeJSON(w, http.StatusOK, job)
}
func (s *Server) handleCompleteJob(w http.ResponseWriter, r *http.Request, path string) {
id, err := jobIDFromActionPath(path, "complete")
if err != nil {
@@ -331,7 +361,7 @@ func (s *Server) handleFailJob(w http.ResponseWriter, r *http.Request, path stri
func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
ctx, cancel := contextWithTimeout(r, 8*time.Second)
defer cancel()
stats, err := s.store.Stats(ctx)
stats, err := s.store.Stats(ctx, s.cfg.WorkerLeaseTimeout)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return

View File

@@ -38,7 +38,10 @@ type Usage struct {
TotalTokens int `json:"total_tokens"`
}
const ChatResultSchemaVersion = "ai.chat_result.v1"
type ChatResult struct {
SchemaVersion string `json:"schema_version"`
Content string `json:"content"`
Model string `json:"model"`
Usage *Usage `json:"usage,omitempty"`
@@ -137,6 +140,7 @@ func (c *Client) Chat(ctx context.Context, in ChatInput) (*ChatResult, error) {
modelName = c.model
}
return &ChatResult{
SchemaVersion: ChatResultSchemaVersion,
Content: out.Choices[0].Message.Content,
Model: modelName,
Usage: out.Usage,

View File

@@ -0,0 +1,43 @@
package llm
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
)
func TestChatResultIncludesSchemaVersion(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/v1/chat/completions" {
t.Fatalf("path = %q, want /v1/chat/completions", r.URL.Path)
}
_ = json.NewEncoder(w).Encode(map[string]any{
"model": "qwen2.5-14b",
"choices": []map[string]any{
{"message": map[string]string{"role": "assistant", "content": `{"ok":true}`}},
},
"usage": map[string]int{
"prompt_tokens": 10,
"completion_tokens": 2,
"total_tokens": 12,
},
})
}))
defer server.Close()
client := New(server.URL, "", "fallback-model", 0)
got, err := client.Chat(t.Context(), ChatInput{User: "test", MaxTokens: 32})
if err != nil {
t.Fatalf("Chat: %v", err)
}
if got.SchemaVersion != ChatResultSchemaVersion {
t.Fatalf("schema_version = %q, want %q", got.SchemaVersion, ChatResultSchemaVersion)
}
if got.Content != `{"ok":true}` {
t.Fatalf("content = %q", got.Content)
}
if got.Usage == nil || got.Usage.TotalTokens != 12 {
t.Fatalf("usage = %#v", got.Usage)
}
}

View File

@@ -0,0 +1,13 @@
CREATE INDEX IF NOT EXISTS ai_jobs_pending_claim_scope_idx
ON ai_jobs (task_type, model_profile, priority DESC, scheduled_at DESC, created_at DESC)
WHERE status = 'pending';
CREATE INDEX IF NOT EXISTS ai_jobs_running_lease_idx
ON ai_jobs ((COALESCE(heartbeat_at, started_at, updated_at)))
WHERE status = 'running';
CREATE INDEX IF NOT EXISTS ai_jobs_queue_stats_idx
ON ai_jobs (task_type, model_profile, status);
CREATE INDEX IF NOT EXISTS ai_jobs_owner_stats_idx
ON ai_jobs (owner_service, task_type, model_profile, status);

View File

@@ -0,0 +1,2 @@
CREATE INDEX IF NOT EXISTS ai_jobs_recent_status_idx
ON ai_jobs (status, updated_at DESC, created_at DESC);

View File

@@ -0,0 +1,3 @@
CREATE INDEX IF NOT EXISTS ai_jobs_owner_error_stats_idx
ON ai_jobs (owner_service, task_type, model_profile, error_code, updated_at DESC)
WHERE status = 'failed';

View File

@@ -39,6 +39,27 @@ type Job struct {
IdempotencyKey *string `json:"idempotency_key,omitempty"`
}
type JobSummary struct {
ID uuid.UUID `json:"id"`
OwnerService string `json:"owner_service"`
OwnerRef string `json:"owner_ref"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
Priority int `json:"priority"`
Status string `json:"status"`
Attempts int `json:"attempts"`
MaxAttempts int `json:"max_attempts"`
ErrorCode *string `json:"error_code,omitempty"`
ErrorMessage *string `json:"error_message,omitempty"`
ScheduledAt time.Time `json:"scheduled_at"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
WorkerID *string `json:"worker_id,omitempty"`
HeartbeatAt *time.Time `json:"heartbeat_at,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
type CreateJob struct {
OwnerService string `json:"owner_service"`
OwnerRef string `json:"owner_ref"`
@@ -90,6 +111,7 @@ type QueueStat struct {
}
type ErrorStat struct {
OwnerService string `json:"owner_service,omitempty"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
ErrorCode string `json:"error_code"`
@@ -97,6 +119,30 @@ type ErrorStat struct {
Last24h int64 `json:"last_24h"`
}
type StageStat struct {
OwnerService string `json:"owner_service"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
Done24h int64 `json:"done_24h"`
AvgDurationSeconds int64 `json:"avg_duration_seconds"`
AvgAttempts int64 `json:"avg_attempts"`
Retried24h int64 `json:"retried_24h"`
}
type BacklogStat struct {
OwnerService string `json:"owner_service"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
Pending int64 `json:"pending"`
Running int64 `json:"running"`
StaleRunning int64 `json:"stale_running"`
OldestPendingAgeSeconds int64 `json:"oldest_pending_age_seconds"`
OldestPendingScheduledAt string `json:"oldest_pending_scheduled_at,omitempty"`
OldestRunningAgeSeconds int64 `json:"oldest_running_age_seconds"`
OldestRunningStartedAt string `json:"oldest_running_started_at,omitempty"`
LastHeartbeatAt string `json:"last_heartbeat_at,omitempty"`
}
type OwnerStat struct {
OwnerService string `json:"owner_service"`
TaskType string `json:"task_type"`
@@ -110,4 +156,6 @@ type Stats struct {
Queues []QueueStat `json:"queues"`
Owners []OwnerStat `json:"owners,omitempty"`
Errors []ErrorStat `json:"errors,omitempty"`
Stages []StageStat `json:"stages,omitempty"`
Backlog []BacklogStat `json:"backlog,omitempty"`
}

View File

@@ -0,0 +1,22 @@
package store
import (
"strings"
"time"
)
type failRetryPolicy struct {
Retryable bool
Delay time.Duration
}
func retryPolicyForError(errorCode string) failRetryPolicy {
switch strings.TrimSpace(errorCode) {
case "provider_unavailable", "model_unavailable", "provider_error", "dependency_error", "timeout", "storage_error", "stale_worker":
return failRetryPolicy{Retryable: true, Delay: 30 * time.Second}
case "bad_response", "transcript_hallucination", "transcript_incomplete", "internal_error", "unknown":
return failRetryPolicy{Retryable: true, Delay: 2 * time.Minute}
default:
return failRetryPolicy{}
}
}

View File

@@ -0,0 +1,45 @@
package store
import (
"testing"
"time"
)
func TestRetryPolicyForError(t *testing.T) {
tests := []struct {
name string
code string
retryable bool
delay time.Duration
}{
{name: "provider unavailable", code: "provider_unavailable", retryable: true, delay: 30 * time.Second},
{name: "model unavailable", code: "model_unavailable", retryable: true, delay: 30 * time.Second},
{name: "provider error", code: "provider_error", retryable: true, delay: 30 * time.Second},
{name: "dependency error", code: "dependency_error", retryable: true, delay: 30 * time.Second},
{name: "timeout", code: "timeout", retryable: true, delay: 30 * time.Second},
{name: "storage", code: "storage_error", retryable: true, delay: 30 * time.Second},
{name: "stale worker", code: "stale_worker", retryable: true, delay: 30 * time.Second},
{name: "bad response", code: "bad_response", retryable: true, delay: 2 * time.Minute},
{name: "transcript hallucination", code: "transcript_hallucination", retryable: true, delay: 2 * time.Minute},
{name: "transcript incomplete", code: "transcript_incomplete", retryable: true, delay: 2 * time.Minute},
{name: "internal error", code: "internal_error", retryable: true, delay: 2 * time.Minute},
{name: "unknown", code: "unknown", retryable: true, delay: 2 * time.Minute},
{name: "bad audio", code: "bad_audio"},
{name: "bad input", code: "bad_input"},
{name: "context length", code: "context_length"},
{name: "unsupported task", code: "unsupported_task"},
{name: "cancelled", code: "cancelled"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := retryPolicyForError(tt.code)
if got.Retryable != tt.retryable {
t.Fatalf("Retryable = %v, want %v", got.Retryable, tt.retryable)
}
if got.Delay != tt.delay {
t.Fatalf("Delay = %s, want %s", got.Delay, tt.delay)
}
})
}
}

View File

@@ -41,17 +41,48 @@ func Open(ctx context.Context, databaseURL string) (*Store, error) {
if err != nil {
return nil, fmt.Errorf("parse database url: %w", err)
}
pool, err := pgxpool.NewWithConfig(ctx, cfg)
pool, err := connectWithRetry(ctx, cfg, 2*time.Minute)
if err != nil {
return nil, fmt.Errorf("connect postgres: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("ping postgres: %w", err)
return nil, err
}
return &Store{pool: pool}, nil
}
func connectWithRetry(ctx context.Context, cfg *pgxpool.Config, maxWait time.Duration) (*pgxpool.Pool, error) {
deadline := time.Now().Add(maxWait)
var lastErr error
for attempt := 1; ; attempt++ {
pool, err := pgxpool.NewWithConfig(ctx, cfg)
if err == nil {
if pingErr := pool.Ping(ctx); pingErr == nil {
return pool, nil
} else {
err = fmt.Errorf("ping postgres: %w", pingErr)
pool.Close()
}
} else {
err = fmt.Errorf("connect postgres: %w", err)
}
lastErr = err
if time.Now().After(deadline) {
return nil, fmt.Errorf("connect postgres after retry: %w", lastErr)
}
sleep := time.Duration(attempt) * time.Second
if sleep > 5*time.Second {
sleep = 5 * time.Second
}
timer := time.NewTimer(sleep)
select {
case <-ctx.Done():
timer.Stop()
return nil, fmt.Errorf("connect postgres cancelled: %w", ctx.Err())
case <-timer.C:
}
}
}
func (s *Store) Close() {
s.pool.Close()
}
@@ -69,16 +100,7 @@ func (s *Store) CreateJob(ctx context.Context, in model.CreateJob) (*model.Job,
if err := validateCreateJob(in); err != nil {
return nil, err
}
if in.MaxAttempts <= 0 {
in.MaxAttempts = 3
}
if len(in.Input) == 0 {
in.Input = json.RawMessage(`{}`)
}
scheduledAt := time.Now().UTC()
if in.ScheduledAt != nil {
scheduledAt = in.ScheduledAt.UTC()
}
normalizeCreateJob(&in)
const q = `
INSERT INTO ai_jobs (
@@ -87,7 +109,33 @@ INSERT INTO ai_jobs (
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL
DO UPDATE SET updated_at = ai_jobs.updated_at
DO UPDATE SET
status = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN 'pending' ELSE ai_jobs.status END,
priority = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.priority ELSE ai_jobs.priority END,
max_attempts = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.max_attempts ELSE ai_jobs.max_attempts END,
attempts = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN 0 ELSE ai_jobs.attempts END,
input = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.input ELSE ai_jobs.input END,
scheduled_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.scheduled_at ELSE ai_jobs.scheduled_at END,
started_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.started_at END,
completed_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.completed_at END,
worker_id = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.worker_id END,
heartbeat_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.heartbeat_at END,
error_code = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.error_code END,
error_message = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.error_message END,
updated_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NOW() ELSE ai_jobs.updated_at END
RETURNING ` + jobSelectColumns + `
`
row := s.pool.QueryRow(ctx, q,
@@ -98,12 +146,107 @@ RETURNING ` + jobSelectColumns + `
in.Priority,
in.MaxAttempts,
in.Input,
scheduledAt,
*in.ScheduledAt,
in.IdempotencyKey,
)
return scanJob(row)
}
func (s *Store) CreateJobs(ctx context.Context, items []model.CreateJob) ([]*model.Job, error) {
if len(items) == 0 {
return []*model.Job{}, nil
}
const q = `
INSERT INTO ai_jobs (
owner_service, owner_ref, task_type, model_profile, priority, max_attempts,
input, scheduled_at, idempotency_key
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL
DO UPDATE SET
status = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN 'pending' ELSE ai_jobs.status END,
priority = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.priority ELSE ai_jobs.priority END,
max_attempts = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.max_attempts ELSE ai_jobs.max_attempts END,
attempts = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN 0 ELSE ai_jobs.attempts END,
input = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.input ELSE ai_jobs.input END,
scheduled_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.scheduled_at ELSE ai_jobs.scheduled_at END,
started_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.started_at END,
completed_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.completed_at END,
worker_id = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.worker_id END,
heartbeat_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.heartbeat_at END,
error_code = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.error_code END,
error_message = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.error_message END,
updated_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NOW() ELSE ai_jobs.updated_at END
RETURNING ` + jobSelectColumns + `
`
var batch pgx.Batch
for i := range items {
if err := validateCreateJob(items[i]); err != nil {
return nil, err
}
normalizeCreateJob(&items[i])
batch.Queue(q,
items[i].OwnerService,
items[i].OwnerRef,
items[i].TaskType,
items[i].ModelProfile,
items[i].Priority,
items[i].MaxAttempts,
items[i].Input,
*items[i].ScheduledAt,
items[i].IdempotencyKey,
)
}
br := s.pool.SendBatch(ctx, &batch)
batchClosed := false
defer func() {
if !batchClosed {
_ = br.Close()
}
}()
out := make([]*model.Job, 0, len(items))
for range items {
job, err := scanJob(br.QueryRow())
if err != nil {
return nil, err
}
out = append(out, job)
}
err := br.Close()
batchClosed = true
if err != nil {
return nil, err
}
return out, nil
}
func normalizeCreateJob(in *model.CreateJob) {
if in.MaxAttempts <= 0 {
in.MaxAttempts = 3
}
if len(in.Input) == 0 {
in.Input = json.RawMessage(`{}`)
}
scheduledAt := time.Now().UTC()
if in.ScheduledAt != nil {
scheduledAt = in.ScheduledAt.UTC()
}
in.ScheduledAt = &scheduledAt
}
func validateCreateJob(in model.CreateJob) error {
switch {
case strings.TrimSpace(in.OwnerService) == "":
@@ -174,6 +317,48 @@ LIMIT $7 OFFSET $8
return out, rows.Err()
}
func (s *Store) ListJobSummaries(ctx context.Context, filter model.JobFilter) ([]*model.JobSummary, error) {
normalizeFilter(&filter)
const q = `
SELECT id, owner_service, owner_ref, task_type, model_profile, priority, status,
attempts, max_attempts, error_code, error_message,
scheduled_at, started_at, completed_at, worker_id, heartbeat_at,
created_at, updated_at
FROM ai_jobs
WHERE ($1 = '' OR owner_service = $1)
AND ($2 = '' OR owner_ref = $2)
AND ($3 = '' OR task_type = $3)
AND ($4 = '' OR model_profile = $4)
AND (cardinality($5::text[]) = 0 OR status = ANY($5::text[]))
AND (cardinality($6::text[]) = 0 OR COALESCE(NULLIF(error_code, ''), 'unknown') = ANY($6::text[]))
ORDER BY updated_at DESC, created_at DESC
LIMIT $7 OFFSET $8
`
rows, err := s.pool.Query(ctx, q,
filter.OwnerService,
filter.OwnerRef,
filter.TaskType,
filter.ModelProfile,
filter.Statuses,
filter.ErrorCodes,
filter.Limit,
filter.Offset,
)
if err != nil {
return nil, err
}
defer rows.Close()
var out []*model.JobSummary
for rows.Next() {
job, err := scanJobSummary(rows)
if err != nil {
return nil, err
}
out = append(out, job)
}
return out, rows.Err()
}
func normalizeFilter(filter *model.JobFilter) {
filter.OwnerService = strings.TrimSpace(filter.OwnerService)
filter.OwnerRef = strings.TrimSpace(filter.OwnerRef)
@@ -266,6 +451,28 @@ WHERE j.id = picked.id
return int(tag.RowsAffected()), nil
}
func (s *Store) CancelJob(ctx context.Context, id uuid.UUID) (*model.Job, error) {
const q = `
UPDATE ai_jobs
SET status = 'cancelled',
completed_at = NOW(),
worker_id = NULL,
heartbeat_at = NULL,
updated_at = NOW()
WHERE id = $1
AND status IN ('pending', 'running')
RETURNING id, owner_service, owner_ref, task_type, model_profile, priority, status,
attempts, max_attempts, input, result, error_code, error_message,
scheduled_at, started_at, completed_at, worker_id, heartbeat_at,
created_at, updated_at, idempotency_key
`
job, err := scanJob(s.pool.QueryRow(ctx, q, id))
if errors.Is(err, pgx.ErrNoRows) {
return nil, nil
}
return job, err
}
func (s *Store) CancelJobs(ctx context.Context, filter model.JobFilter) (int, error) {
normalizeFilter(&filter)
const q = `
@@ -388,25 +595,47 @@ RETURNING ` + jobSelectColumns + `
return job, err
}
func (s *Store) HeartbeatJob(ctx context.Context, id uuid.UUID) error {
const q = `
UPDATE ai_jobs
SET heartbeat_at = NOW(),
updated_at = NOW()
WHERE id = $1
AND status = 'running'
`
tag, err := s.pool.Exec(ctx, q, id)
if err != nil {
return err
}
if tag.RowsAffected() == 0 {
return nil
}
return nil
}
func (s *Store) FailJob(ctx context.Context, id uuid.UUID, in model.FailJob) (*model.Job, error) {
errorCode := strings.TrimSpace(in.ErrorCode)
if errorCode == "" {
errorCode = "unknown"
}
errorMessage := strings.TrimSpace(in.ErrorMessage)
policy := retryPolicyForError(errorCode)
const q = `
UPDATE ai_jobs
SET status = 'failed',
SET status = CASE WHEN $4 AND attempts < max_attempts THEN 'pending' ELSE 'failed' END,
error_code = $2,
error_message = $3,
completed_at = NOW(),
heartbeat_at = NOW(),
scheduled_at = CASE WHEN $4 AND attempts < max_attempts THEN NOW() + make_interval(secs => $5) ELSE scheduled_at END,
started_at = CASE WHEN $4 AND attempts < max_attempts THEN NULL ELSE started_at END,
completed_at = CASE WHEN $4 AND attempts < max_attempts THEN NULL ELSE NOW() END,
worker_id = NULL,
heartbeat_at = CASE WHEN $4 AND attempts < max_attempts THEN NULL ELSE NOW() END,
updated_at = NOW()
WHERE id = $1
AND status = 'running'
RETURNING ` + jobSelectColumns + `
`
job, err := scanJob(s.pool.QueryRow(ctx, q, id, errorCode, errorMessage))
job, err := scanJob(s.pool.QueryRow(ctx, q, id, errorCode, errorMessage, policy.Retryable, int(policy.Delay.Seconds())))
if errors.Is(err, pgx.ErrNoRows) {
return nil, nil
}
@@ -451,7 +680,10 @@ WHERE j.id = picked.id
return int(tag.RowsAffected()), nil
}
func (s *Store) Stats(ctx context.Context) (*model.Stats, error) {
func (s *Store) Stats(ctx context.Context, staleAfter time.Duration) (*model.Stats, error) {
if staleAfter <= 0 {
staleAfter = 15 * time.Minute
}
out := &model.Stats{At: time.Now().UTC()}
queueRows, err := s.pool.Query(ctx, `
@@ -497,13 +729,13 @@ ORDER BY owner_service, task_type, model_profile, status
}
errorRows, err := s.pool.Query(ctx, `
SELECT task_type, model_profile, COALESCE(NULLIF(error_code, ''), 'unknown') AS error_code,
SELECT owner_service, task_type, model_profile, COALESCE(NULLIF(error_code, ''), 'unknown') AS error_code,
count(*) AS total,
count(*) FILTER (WHERE updated_at > NOW() - INTERVAL '24 hours') AS last_24h
FROM ai_jobs
WHERE status = 'failed'
GROUP BY task_type, model_profile, COALESCE(NULLIF(error_code, ''), 'unknown')
ORDER BY last_24h DESC, total DESC
GROUP BY owner_service, task_type, model_profile, COALESCE(NULLIF(error_code, ''), 'unknown')
ORDER BY owner_service, last_24h DESC, total DESC
`)
if err != nil {
return nil, err
@@ -511,12 +743,138 @@ ORDER BY last_24h DESC, total DESC
defer errorRows.Close()
for errorRows.Next() {
var stat model.ErrorStat
if err := errorRows.Scan(&stat.TaskType, &stat.ModelProfile, &stat.ErrorCode, &stat.Total, &stat.Last24h); err != nil {
if err := errorRows.Scan(&stat.OwnerService, &stat.TaskType, &stat.ModelProfile, &stat.ErrorCode, &stat.Total, &stat.Last24h); err != nil {
return nil, err
}
out.Errors = append(out.Errors, stat)
}
return out, errorRows.Err()
if err := errorRows.Err(); err != nil {
return nil, err
}
stageRows, err := s.pool.Query(ctx, `
SELECT owner_service,
task_type,
model_profile,
count(*) AS done_24h,
COALESCE(ROUND(AVG(EXTRACT(EPOCH FROM (completed_at - started_at))))::bigint, 0) AS avg_duration_seconds,
COALESCE(ROUND(AVG(attempts))::bigint, 0) AS avg_attempts,
count(*) FILTER (WHERE attempts > 1) AS retried_24h
FROM ai_jobs
WHERE status = 'done'
AND started_at IS NOT NULL
AND completed_at IS NOT NULL
AND completed_at > NOW() - INTERVAL '24 hours'
GROUP BY owner_service, task_type, model_profile
ORDER BY owner_service, task_type, model_profile
`)
if err != nil {
return nil, err
}
defer stageRows.Close()
for stageRows.Next() {
var stat model.StageStat
if err := stageRows.Scan(
&stat.OwnerService,
&stat.TaskType,
&stat.ModelProfile,
&stat.Done24h,
&stat.AvgDurationSeconds,
&stat.AvgAttempts,
&stat.Retried24h,
); err != nil {
return nil, err
}
out.Stages = append(out.Stages, stat)
}
if err := stageRows.Err(); err != nil {
return nil, err
}
backlogRows, err := s.pool.Query(ctx, `
SELECT owner_service,
task_type,
model_profile,
count(*) FILTER (WHERE status = 'pending') AS pending,
count(*) FILTER (WHERE status = 'running') AS running,
count(*) FILTER (
WHERE status = 'running'
AND COALESCE(heartbeat_at, started_at, updated_at) < NOW() - make_interval(secs => $1)
) AS stale_running,
COALESCE(EXTRACT(EPOCH FROM (NOW() - MIN(scheduled_at) FILTER (WHERE status = 'pending')))::bigint, 0) AS oldest_pending_age_seconds,
MIN(scheduled_at) FILTER (WHERE status = 'pending') AS oldest_pending_scheduled_at,
COALESCE(EXTRACT(EPOCH FROM (NOW() - MIN(started_at) FILTER (WHERE status = 'running')))::bigint, 0) AS oldest_running_age_seconds,
MIN(started_at) FILTER (WHERE status = 'running') AS oldest_running_started_at,
MAX(heartbeat_at) FILTER (WHERE status = 'running') AS last_heartbeat_at
FROM ai_jobs
WHERE status IN ('pending', 'running')
GROUP BY owner_service, task_type, model_profile
ORDER BY stale_running DESC, pending DESC, running DESC, owner_service, task_type, model_profile
`, int(staleAfter.Seconds()))
if err != nil {
return nil, err
}
defer backlogRows.Close()
for backlogRows.Next() {
var stat model.BacklogStat
var oldestPendingScheduledAt *time.Time
var oldestRunningStartedAt *time.Time
var lastHeartbeatAt *time.Time
if err := backlogRows.Scan(
&stat.OwnerService,
&stat.TaskType,
&stat.ModelProfile,
&stat.Pending,
&stat.Running,
&stat.StaleRunning,
&stat.OldestPendingAgeSeconds,
&oldestPendingScheduledAt,
&stat.OldestRunningAgeSeconds,
&oldestRunningStartedAt,
&lastHeartbeatAt,
); err != nil {
return nil, err
}
if oldestPendingScheduledAt != nil {
stat.OldestPendingScheduledAt = oldestPendingScheduledAt.UTC().Format(time.RFC3339)
}
if oldestRunningStartedAt != nil {
stat.OldestRunningStartedAt = oldestRunningStartedAt.UTC().Format(time.RFC3339)
}
if lastHeartbeatAt != nil {
stat.LastHeartbeatAt = lastHeartbeatAt.UTC().Format(time.RFC3339)
}
out.Backlog = append(out.Backlog, stat)
}
return out, backlogRows.Err()
}
func scanJobSummary(row pgx.Row) (*model.JobSummary, error) {
var job model.JobSummary
err := row.Scan(
&job.ID,
&job.OwnerService,
&job.OwnerRef,
&job.TaskType,
&job.ModelProfile,
&job.Priority,
&job.Status,
&job.Attempts,
&job.MaxAttempts,
&job.ErrorCode,
&job.ErrorMessage,
&job.ScheduledAt,
&job.StartedAt,
&job.CompletedAt,
&job.WorkerID,
&job.HeartbeatAt,
&job.CreatedAt,
&job.UpdatedAt,
)
if err != nil {
return nil, err
}
return &job, nil
}
func scanJob(row pgx.Row) (*model.Job, error) {

View File

@@ -4,22 +4,44 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"time"
)
type Client struct {
baseURL string
provider ProviderConfig
http *http.Client
ffmpegPath string
leadSilence time.Duration
}
const (
ProviderWhisperLargeV3 = "whisper-large-v3"
defaultWhisperModel = "openai/whisper-large-v3"
)
var speakerLabelPattern = regexp.MustCompile(`(?i)(?:^|[\n\r ]+)((?:speaker|спикер|говорящий)\s*\d+)\s*[:-]`)
type Options struct {
AudioBaseURL string
AudioAPIKey string
AudioModel string
AudioTimeout time.Duration
AudioPrompt string
}
type ProviderConfig struct {
Name string
BaseURL string
APIKey string
Model string
Timeout time.Duration
Prompt string
}
type Input struct {
@@ -38,7 +60,13 @@ type Segment struct {
Speaker string `json:"speaker,omitempty"`
}
const ResultSchemaVersion = "ai.transcription_result.v1"
type Result struct {
SchemaVersion string `json:"schema_version"`
Provider string `json:"provider,omitempty"`
Model string `json:"model,omitempty"`
Attempts []Attempt `json:"attempts,omitempty"`
Language string `json:"language"`
Segments []Segment `json:"segments"`
DiarizeError *string `json:"diarize_error,omitempty"`
@@ -46,42 +74,93 @@ type Result struct {
DurationMS int64 `json:"duration_ms"`
}
type whisperResponse struct {
Language string `json:"language"`
Segments []Segment `json:"segments"`
DiarizeError *string `json:"diarize_error,omitempty"`
AlignError *string `json:"align_error,omitempty"`
type Attempt struct {
Provider string `json:"provider"`
Model string `json:"model,omitempty"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
Text string `json:"text,omitempty"`
Segments []Segment `json:"segments,omitempty"`
DurationMS int64 `json:"duration_ms,omitempty"`
}
type audioLLMResponse struct {
Text string
Model string
Language string
Segments []Segment
}
type audioTranscriptionResponse struct {
Text string `json:"text"`
Language string `json:"language,omitempty"`
Model string `json:"model,omitempty"`
Segments []audioTranscriptionSegment `json:"segments,omitempty"`
Error *struct {
Message string `json:"message"`
} `json:"error,omitempty"`
}
type audioTranscriptionSegment struct {
Start float64 `json:"start"`
End float64 `json:"end"`
Text string `json:"text"`
}
type audioTranscriptionStatusError struct {
status int
body string
}
func (e audioTranscriptionStatusError) Error() string {
return fmt.Sprintf("audio transcription HTTP %d: %s", e.status, e.body)
}
func New(baseURL string, timeout time.Duration, ffmpegPath string, leadSilence time.Duration) *Client {
baseURL = strings.TrimRight(strings.TrimSpace(baseURL), "/")
if baseURL == "" {
return NewWithOptions(Options{
AudioBaseURL: baseURL,
AudioTimeout: timeout,
})
}
func NewWithOptions(opts Options) *Client {
audioPrompt := strings.TrimSpace(opts.AudioPrompt)
provider := buildAudioProvider(opts, audioPrompt)
if provider.BaseURL == "" {
return nil
}
if timeout <= 0 {
timeout = 10 * time.Minute
}
if leadSilence < 0 {
leadSilence = 0
}
if leadSilence > 5*time.Second {
leadSilence = 5 * time.Second
}
ffmpegPath = strings.TrimSpace(ffmpegPath)
if ffmpegPath == "" {
ffmpegPath = "ffmpeg"
}
return &Client{
baseURL: baseURL,
http: &http.Client{Timeout: timeout},
ffmpegPath: ffmpegPath,
leadSilence: leadSilence,
provider: provider,
http: &http.Client{Timeout: provider.Timeout},
}
}
func buildAudioProvider(opts Options, prompt string) ProviderConfig {
baseURL := strings.TrimRight(strings.TrimSpace(opts.AudioBaseURL), "/")
if baseURL == "" {
return ProviderConfig{}
}
model := firstNonEmpty(opts.AudioModel, defaultWhisperModel)
return ProviderConfig{
Name: ProviderWhisperLargeV3,
BaseURL: baseURL,
APIKey: strings.TrimSpace(opts.AudioAPIKey),
Model: model,
Timeout: defaultDuration(opts.AudioTimeout, 10*time.Minute),
Prompt: prompt,
}
}
func defaultDuration(v, fallback time.Duration) time.Duration {
if v <= 0 {
return fallback
}
return v
}
func (c *Client) Transcribe(ctx context.Context, in Input) (*Result, error) {
if c == nil || c.baseURL == "" {
return nil, fmt.Errorf("whisperx not configured")
if c == nil || c.provider.BaseURL == "" {
return nil, fmt.Errorf("audio transcription provider not configured")
}
if strings.TrimSpace(in.AudioURL) == "" {
return nil, fmt.Errorf("audio_url is required")
@@ -90,24 +169,46 @@ func (c *Client) Transcribe(ctx context.Context, in Input) (*Result, error) {
if err != nil {
return nil, err
}
if c.leadSilence > 0 {
audio, filename, err = c.addLeadSilence(ctx, audio, filename)
result, attempt, err := c.transcribeWithProvider(ctx, c.provider, audio, filename, in)
if err != nil {
return nil, err
}
result.Attempts = []Attempt{attempt}
return result, nil
}
resp, duration, err := c.transcribeAudio(ctx, audio, filename, in)
func (c *Client) transcribeWithProvider(ctx context.Context, provider ProviderConfig, audio []byte, filename string, in Input) (*Result, Attempt, error) {
providerCtx := ctx
cancel := func() {}
if provider.Timeout > 0 {
providerCtx, cancel = context.WithTimeout(ctx, provider.Timeout)
}
defer cancel()
attempt := Attempt{
Provider: provider.Name,
Model: provider.Model,
Status: "failed",
}
resp, duration, err := c.transcribeOpenAIAudio(providerCtx, provider, audio, filename, in)
attempt.DurationMS = duration.Milliseconds()
if err != nil {
return nil, err
attempt.Error = err.Error()
return nil, attempt, err
}
segments := adjustLeadSilence(resp.Segments, c.leadSilence)
text := strings.TrimSpace(resp.Text)
segments := normalizeAudioLLMSegments(resp.Segments, text, in.Diarize)
attempt.Status = "ok"
attempt.Model = resp.Model
attempt.Text = text
attempt.Segments = segments
return &Result{
Language: resp.Language,
SchemaVersion: ResultSchemaVersion,
Provider: provider.Name,
Model: resp.Model,
Language: firstNonEmpty(resp.Language, in.Language, "unknown"),
Segments: segments,
DiarizeError: resp.DiarizeError,
AlignError: resp.AlignError,
DurationMS: duration.Milliseconds(),
}, nil
}, attempt, nil
}
func (c *Client) downloadAudio(ctx context.Context, in Input) ([]byte, string, error) {
@@ -138,83 +239,6 @@ func (c *Client) downloadAudio(ctx context.Context, in Input) ([]byte, string, e
return audio, filename, nil
}
func (c *Client) addLeadSilence(ctx context.Context, audio []byte, filename string) ([]byte, string, error) {
tmpDir, err := os.MkdirTemp("", "ai-transcribe-*")
if err != nil {
return nil, "", fmt.Errorf("prepare audio temp dir: %w", err)
}
defer os.RemoveAll(tmpDir)
inputPath := filepath.Join(tmpDir, "input"+safeExt(filename))
outputPath := filepath.Join(tmpDir, "padded.mp3")
if err := os.WriteFile(inputPath, audio, 0o600); err != nil {
return nil, "", fmt.Errorf("write audio temp file: %w", err)
}
delayMS := int(c.leadSilence.Milliseconds())
if delayMS <= 0 {
return audio, filename, nil
}
cmd := exec.CommandContext(ctx, c.ffmpegPath,
"-nostdin", "-y",
"-i", inputPath,
"-af", fmt.Sprintf("adelay=%d:all=1", delayMS),
"-codec:a", "libmp3lame",
"-qscale:a", "5",
outputPath,
)
out, err := cmd.CombinedOutput()
if err != nil {
return nil, "", fmt.Errorf("ffmpeg lead silence: %w (%s)", err, trimOutput(out))
}
padded, err := os.ReadFile(outputPath)
if err != nil {
return nil, "", fmt.Errorf("read padded audio: %w", err)
}
if len(padded) == 0 {
return nil, "", fmt.Errorf("padded audio is empty")
}
base := strings.TrimSuffix(filepath.Base(filename), filepath.Ext(filename))
if base == "" || base == "." || base == "/" {
base = "audio"
}
return padded, base + "-padded.mp3", nil
}
func safeExt(filename string) string {
ext := strings.ToLower(filepath.Ext(filename))
switch ext {
case ".mp3", ".wav", ".m4a", ".ogg", ".opus", ".webm":
return ext
default:
return ".audio"
}
}
func trimOutput(out []byte) string {
s := strings.TrimSpace(string(out))
if len(s) > 600 {
return s[:600]
}
return s
}
func adjustLeadSilence(segments []Segment, silence time.Duration) []Segment {
if len(segments) == 0 || silence <= 0 {
return segments
}
shift := silence.Seconds()
out := make([]Segment, 0, len(segments))
for _, segment := range segments {
segment.Start = clampTime(segment.Start - shift)
segment.End = clampTime(segment.End - shift)
if segment.End < segment.Start {
segment.End = segment.Start
}
out = append(out, segment)
}
return out
}
func clampTime(v float64) float64 {
if v < 0 {
return 0
@@ -222,54 +246,267 @@ func clampTime(v float64) float64 {
return v
}
func (c *Client) transcribeAudio(ctx context.Context, audio []byte, filename string, in Input) (*whisperResponse, time.Duration, error) {
body := &bytes.Buffer{}
mw := multipart.NewWriter(body)
fw, err := mw.CreateFormFile("file", filename)
if err != nil {
return nil, 0, fmt.Errorf("create form file: %w", err)
func (c *Client) transcribeOpenAIAudio(ctx context.Context, provider ProviderConfig, audio []byte, filename string, in Input) (*audioLLMResponse, time.Duration, error) {
resp, duration, err := c.doOpenAIAudioTranscription(ctx, provider, audio, filename, in, "verbose_json")
if err == nil {
return resp, duration, nil
}
if _, err := fw.Write(audio); err != nil {
return nil, 0, fmt.Errorf("copy audio: %w", err)
if !isVerboseJSONUnsupported(err) {
return nil, duration, err
}
if in.Language != "" {
_ = mw.WriteField("language", in.Language)
}
if in.Diarize {
_ = mw.WriteField("diarize", "true")
if in.MinSpeakers > 0 {
_ = mw.WriteField("min_speakers", fmt.Sprintf("%d", in.MinSpeakers))
}
if in.MaxSpeakers > 0 {
_ = mw.WriteField("max_speakers", fmt.Sprintf("%d", in.MaxSpeakers))
}
} else {
_ = mw.WriteField("diarize", "false")
}
if err := mw.Close(); err != nil {
return nil, 0, fmt.Errorf("close form: %w", err)
fallbackResp, fallbackDuration, fallbackErr := c.doOpenAIAudioTranscription(ctx, provider, audio, filename, in, "json")
return fallbackResp, duration + fallbackDuration, fallbackErr
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/transcribe", body)
func (c *Client) doOpenAIAudioTranscription(ctx context.Context, provider ProviderConfig, audio []byte, filename string, in Input, responseFormat string) (*audioLLMResponse, time.Duration, error) {
body := &bytes.Buffer{}
mw := multipart.NewWriter(body)
if err := mw.WriteField("model", provider.Model); err != nil {
return nil, 0, fmt.Errorf("audio transcription model field: %w", err)
}
if err := mw.WriteField("response_format", responseFormat); err != nil {
return nil, 0, fmt.Errorf("audio transcription response_format field: %w", err)
}
if err := mw.WriteField("temperature", "0"); err != nil {
return nil, 0, fmt.Errorf("audio transcription temperature field: %w", err)
}
if responseFormat == "verbose_json" {
if err := mw.WriteField("timestamp_granularities[]", "segment"); err != nil {
return nil, 0, fmt.Errorf("audio transcription timestamp field: %w", err)
}
}
if prompt := strings.TrimSpace(provider.Prompt); prompt != "" {
if err := mw.WriteField("prompt", prompt); err != nil {
return nil, 0, fmt.Errorf("audio transcription prompt field: %w", err)
}
}
if lang := strings.TrimSpace(in.Language); lang != "" {
if err := mw.WriteField("language", lang); err != nil {
return nil, 0, fmt.Errorf("audio transcription language field: %w", err)
}
}
fw, err := mw.CreateFormFile("file", filename)
if err != nil {
return nil, 0, fmt.Errorf("whisperx request: %w", err)
return nil, 0, fmt.Errorf("audio transcription create file: %w", err)
}
if _, err := fw.Write(audio); err != nil {
return nil, 0, fmt.Errorf("audio transcription copy audio: %w", err)
}
if err := mw.Close(); err != nil {
return nil, 0, fmt.Errorf("audio transcription close form: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, provider.BaseURL+"/v1/audio/transcriptions", body)
if err != nil {
return nil, 0, fmt.Errorf("audio transcription request: %w", err)
}
req.Header.Set("Content-Type", mw.FormDataContentType())
if provider.APIKey != "" {
req.Header.Set("Authorization", "Bearer "+provider.APIKey)
}
start := time.Now()
resp, err := c.http.Do(req)
duration := time.Since(start)
if err != nil {
return nil, duration, fmt.Errorf("whisperx do: %w", err)
return nil, duration, fmt.Errorf("audio transcription do: %w", err)
}
defer resp.Body.Close()
raw, err := io.ReadAll(io.LimitReader(resp.Body, 4<<20))
if err != nil {
return nil, duration, fmt.Errorf("audio transcription read: %w", err)
}
if resp.StatusCode >= 300 {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
return nil, duration, fmt.Errorf("whisperx HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
return nil, duration, audioTranscriptionStatusError{status: resp.StatusCode, body: strings.TrimSpace(string(raw))}
}
var out whisperResponse
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return nil, duration, fmt.Errorf("whisperx decode: %w", err)
var out audioTranscriptionResponse
if err := json.Unmarshal(raw, &out); err != nil {
return nil, duration, fmt.Errorf("audio transcription decode: %w", err)
}
return &out, duration, nil
if out.Error != nil {
return nil, duration, fmt.Errorf("audio transcription error: %s", out.Error.Message)
}
modelName := firstNonEmpty(out.Model, provider.Model)
return &audioLLMResponse{
Text: strings.TrimSpace(out.Text),
Model: modelName,
Language: out.Language,
Segments: convertAudioSegments(out.Segments),
}, duration, nil
}
func isVerboseJSONUnsupported(err error) bool {
var statusErr audioTranscriptionStatusError
if !errors.As(err, &statusErr) {
return false
}
if statusErr.status != http.StatusBadRequest && statusErr.status != http.StatusUnprocessableEntity {
return false
}
body := strings.ToLower(statusErr.body)
return strings.Contains(body, "verbose_json") ||
strings.Contains(body, "response_format") ||
strings.Contains(body, "timestamp_granularities")
}
func convertAudioSegments(in []audioTranscriptionSegment) []Segment {
out := make([]Segment, 0, len(in))
for _, s := range in {
text := strings.TrimSpace(s.Text)
if text == "" {
continue
}
end := s.End
if end < s.Start {
end = s.Start
}
out = append(out, Segment{Start: clampTime(s.Start), End: clampTime(end), Text: text})
}
return out
}
func normalizeAudioLLMSegments(segments []Segment, text string, diarize bool) []Segment {
text = strings.TrimSpace(text)
if text != "" {
if labeled := segmentSpeakerLabeledText(text); len(labeled) > 0 {
return labeled
}
}
if len(segments) <= 1 && text != "" {
heuristic := segmentTranscriptText(text)
if len(heuristic) > len(segments) {
segments = heuristic
}
}
return segments
}
func segmentSpeakerLabeledText(text string) []Segment {
matches := speakerLabelPattern.FindAllStringSubmatchIndex(text, -1)
if len(matches) == 0 {
return nil
}
speakerIDs := map[string]string{}
var out []Segment
var t float64
for i, match := range matches {
label := strings.ToLower(strings.TrimSpace(text[match[2]:match[3]]))
speaker, ok := speakerIDs[label]
if !ok {
speaker = fmt.Sprintf("SPEAKER_%02d", len(speakerIDs))
speakerIDs[label] = speaker
}
start := match[1]
end := len(text)
if i+1 < len(matches) {
end = matches[i+1][0]
}
part := strings.TrimSpace(text[start:end])
part = strings.Trim(part, ":-— ")
if part == "" {
continue
}
words := len(strings.Fields(part))
duration := float64(words) * 0.42
if duration < 1.2 {
duration = 1.2
}
out = append(out, Segment{Start: t, End: t + duration, Text: part, Speaker: speaker})
t += duration
}
return out
}
func segmentTranscriptText(text string) []Segment {
parts := splitTranscriptSentences(text)
out := make([]Segment, 0, len(parts))
var t float64
for _, part := range parts {
words := len(strings.Fields(part))
if words == 0 {
continue
}
duration := float64(words) * 0.42
if duration < 1.2 {
duration = 1.2
}
segment := Segment{Start: t, End: t + duration, Text: part}
out = append(out, segment)
t = segment.End
}
if len(out) == 0 && strings.TrimSpace(text) != "" {
out = append(out, Segment{Start: 0, End: 0, Text: strings.TrimSpace(text)})
}
return out
}
func splitTranscriptSentences(text string) []string {
text = strings.Join(strings.Fields(text), " ")
if text == "" {
return nil
}
var out []string
start := 0
runes := []rune(text)
for i, r := range runes {
if r != '.' && r != '!' && r != '?' && r != '…' {
continue
}
next := i + 1
if next < len(runes) && runes[next] != ' ' {
continue
}
part := strings.TrimSpace(string(runes[start : i+1]))
if part != "" {
out = append(out, part)
}
start = i + 1
for start < len(runes) && runes[start] == ' ' {
start++
}
}
tail := strings.TrimSpace(string(runes[start:]))
if tail != "" {
out = append(out, tail)
}
return mergeShortSegments(out, 8, 34)
}
func mergeShortSegments(parts []string, minWords, maxWords int) []string {
if len(parts) <= 1 {
return parts
}
out := make([]string, 0, len(parts))
var current []string
currentWords := 0
flush := func() {
if len(current) == 0 {
return
}
out = append(out, strings.Join(current, " "))
current = nil
currentWords = 0
}
for _, part := range parts {
words := len(strings.Fields(part))
if currentWords > 0 && currentWords+words > maxWords {
flush()
}
current = append(current, part)
currentWords += words
if currentWords >= minWords {
flush()
}
}
flush()
return out
}
func firstNonEmpty(values ...string) string {
for _, value := range values {
if strings.TrimSpace(value) != "" {
return value
}
}
return ""
}

View File

@@ -1,28 +1,183 @@
package transcription
import (
"math"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
)
func TestAdjustLeadSilence(t *testing.T) {
got := adjustLeadSilence([]Segment{
{Start: 0.2, End: 1.1, Text: "first"},
{Start: 1.4, End: 2.0, Text: "second"},
}, 800*time.Millisecond)
if got[0].Start != 0 {
t.Fatalf("first start = %v, want 0", got[0].Start)
func TestNewWithOptionsBuildsWhisperProvider(t *testing.T) {
client := NewWithOptions(Options{
AudioBaseURL: "http://whisper",
})
if client == nil {
t.Fatal("client is nil")
}
if !near(got[0].End, 0.3) {
t.Fatalf("first end = %v, want 0.3", got[0].End)
if client.provider.Name != ProviderWhisperLargeV3 {
t.Fatalf("provider = %q, want %q", client.provider.Name, ProviderWhisperLargeV3)
}
if !near(got[1].Start, 0.6) {
t.Fatalf("second start = %v, want 0.6", got[1].Start)
if client.provider.Model != "openai/whisper-large-v3" {
t.Fatalf("model = %q", client.provider.Model)
}
}
func near(got, want float64) bool {
return math.Abs(got-want) < 0.000001
func TestWhisperUsesAudioTranscriptionsEndpoint(t *testing.T) {
audioSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte("fake audio"))
}))
defer audioSrv.Close()
var gotPath, gotModel, gotResponseFormat, gotPrompt, gotTemperature, gotTimestampGranularity string
providerSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gotPath = r.URL.Path
if err := r.ParseMultipartForm(16 << 20); err != nil {
t.Fatalf("ParseMultipartForm: %v", err)
}
gotModel = r.FormValue("model")
gotResponseFormat = r.FormValue("response_format")
gotPrompt = r.FormValue("prompt")
gotTemperature = r.FormValue("temperature")
gotTimestampGranularity = r.FormValue("timestamp_granularities[]")
if _, _, err := r.FormFile("file"); err != nil {
t.Fatalf("FormFile: %v", err)
}
_ = json.NewEncoder(w).Encode(map[string]any{
"text": "Алло, тест. Да, слышно.",
"segments": []map[string]any{
{"start": 0, "end": 1.2, "text": "Алло, тест."},
{"start": 1.2, "end": 2.4, "text": "Да, слышно."},
},
})
}))
defer providerSrv.Close()
client := NewWithOptions(Options{
AudioBaseURL: providerSrv.URL,
AudioModel: "openai/whisper-large-v3",
})
if client == nil {
t.Fatal("client is nil")
}
got, err := client.Transcribe(t.Context(), Input{AudioURL: audioSrv.URL, Filename: "call.mp3"})
if err != nil {
t.Fatalf("Transcribe: %v", err)
}
if gotPath != "/v1/audio/transcriptions" {
t.Fatalf("path = %q, want /v1/audio/transcriptions", gotPath)
}
if gotModel != "openai/whisper-large-v3" {
t.Fatalf("model = %q", gotModel)
}
if gotResponseFormat != "verbose_json" {
t.Fatalf("response_format = %q, want verbose_json", gotResponseFormat)
}
if gotTemperature != "0" {
t.Fatalf("temperature = %q, want 0", gotTemperature)
}
if gotTimestampGranularity != "segment" {
t.Fatalf("timestamp_granularities[] = %q, want segment", gotTimestampGranularity)
}
if gotPrompt != "" {
t.Fatalf("prompt = %q, want empty", gotPrompt)
}
if len(got.Segments) != 2 || got.Segments[0].Text != "Алло, тест." || got.Segments[1].Start != 1.2 {
t.Fatalf("segments = %#v", got.Segments)
}
if got.SchemaVersion != ResultSchemaVersion {
t.Fatalf("schema_version = %q, want %q", got.SchemaVersion, ResultSchemaVersion)
}
}
func TestWhisperFallsBackToJSONWhenVerboseJSONUnsupported(t *testing.T) {
audioSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte("fake audio"))
}))
defer audioSrv.Close()
var formats []string
providerSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if err := r.ParseMultipartForm(16 << 20); err != nil {
t.Fatalf("ParseMultipartForm: %v", err)
}
format := r.FormValue("response_format")
formats = append(formats, format)
if format == "verbose_json" {
w.WriteHeader(http.StatusBadRequest)
_ = json.NewEncoder(w).Encode(map[string]any{
"error": map[string]any{"message": "unsupported response_format verbose_json"},
})
return
}
_ = json.NewEncoder(w).Encode(map[string]any{
"text": "Алло, fallback работает.",
})
}))
defer providerSrv.Close()
client := NewWithOptions(Options{
AudioBaseURL: providerSrv.URL,
AudioModel: "openai/whisper-large-v3",
})
got, err := client.Transcribe(t.Context(), Input{AudioURL: audioSrv.URL, Filename: "call.mp3"})
if err != nil {
t.Fatalf("Transcribe: %v", err)
}
if len(formats) != 2 || formats[0] != "verbose_json" || formats[1] != "json" {
t.Fatalf("formats = %#v, want verbose_json then json", formats)
}
if got.Segments[0].Text != "Алло, fallback работает." {
t.Fatalf("segments = %#v", got.Segments)
}
}
func TestSegmentTranscriptTextDoesNotInventSpeakers(t *testing.T) {
got := segmentTranscriptText("Алло, добрый день. Да, слушаю. Скажите, квартира продается? Да, продается.")
if len(got) < 2 {
t.Fatalf("segments = %#v, want multiple", got)
}
if got[0].Speaker != "" || got[1].Speaker != "" {
t.Fatalf("speakers = %q/%q, want empty", got[0].Speaker, got[1].Speaker)
}
if got[1].Start <= got[0].Start {
t.Fatalf("segment times did not advance: %#v", got)
}
}
func TestNormalizeAudioLLMSegmentsSplitsSingleLongSegment(t *testing.T) {
text := "Алло, добрый день. Да, слушаю. Скажите, квартира продается? Да, продается."
got := normalizeAudioLLMSegments([]Segment{{Start: 0, End: 12, Text: text}}, text, true)
if len(got) < 2 {
t.Fatalf("segments = %#v, want heuristic split", got)
}
if got[0].Speaker != "" || got[1].Speaker != "" {
t.Fatalf("speakers = %q/%q, want empty", got[0].Speaker, got[1].Speaker)
}
}
func TestNormalizeAudioLLMSegmentsKeepsSegmentsWithoutFakeSpeakers(t *testing.T) {
got := normalizeAudioLLMSegments([]Segment{
{Start: 0, End: 1, Text: "Алло."},
{Start: 1, End: 2, Text: "Да, слушаю."},
}, "Алло. Да, слушаю.", true)
if len(got) != 2 {
t.Fatalf("segments = %#v", got)
}
if got[0].Speaker != "" || got[1].Speaker != "" {
t.Fatalf("speakers = %q/%q, want empty", got[0].Speaker, got[1].Speaker)
}
}
func TestNormalizeAudioLLMSegmentsUsesExplicitSpeakerLabels(t *testing.T) {
text := "Спикер 1: Алло, добрый день. Спикер 2: Да, слушаю. Спикер 1: Скажите, квартира продается?"
got := normalizeAudioLLMSegments(nil, text, true)
if len(got) != 3 {
t.Fatalf("segments = %#v, want 3", got)
}
if got[0].Speaker != "SPEAKER_00" || got[1].Speaker != "SPEAKER_01" || got[2].Speaker != "SPEAKER_00" {
t.Fatalf("speakers = %q/%q/%q", got[0].Speaker, got[1].Speaker, got[2].Speaker)
}
if got[0].Text != "Алло, добрый день." || got[1].Text != "Да, слушаю." {
t.Fatalf("texts = %#v", got)
}
}

View File

@@ -20,7 +20,7 @@ const (
TaskCallAnalysis = "call_analysis"
TaskTranscription = "transcription"
TranscriptionProfile = "whisperx"
TranscriptionProfile = "whisper-large-v3"
)
type Worker struct {
@@ -113,6 +113,9 @@ func (w *Worker) tick(ctx context.Context) {
}
func (w *Worker) process(ctx context.Context, job *model.Job) {
stopHeartbeat := w.startHeartbeat(ctx, job)
defer stopHeartbeat()
if job.TaskType == TaskTranscription {
w.processTranscription(ctx, job)
return
@@ -139,7 +142,7 @@ func (w *Worker) process(ctx context.Context, job *model.Job) {
func (w *Worker) processTranscription(ctx context.Context, job *model.Job) {
if w.transcriber == nil {
w.fail(ctx, job, "provider_unavailable", "whisperx not configured")
w.fail(ctx, job, "provider_unavailable", "transcription providers not configured")
return
}
var input transcription.Input
@@ -168,6 +171,41 @@ func (w *Worker) fail(ctx context.Context, job *model.Job, code, message string)
}
}
func (w *Worker) startHeartbeat(ctx context.Context, job *model.Job) func() {
heartbeatCtx, cancel := context.WithCancel(ctx)
done := make(chan struct{})
ticker := time.NewTicker(w.heartbeatInterval())
go func() {
defer close(done)
defer ticker.Stop()
for {
select {
case <-heartbeatCtx.Done():
return
case <-ticker.C:
if err := w.store.HeartbeatJob(heartbeatCtx, job.ID); err != nil {
slog.Warn("heartbeat job failed", "job_id", job.ID, "error", err)
}
}
}
}()
return func() {
cancel()
<-done
}
}
func (w *Worker) heartbeatInterval() time.Duration {
interval := w.leaseTimeout / 3
if interval < 10*time.Second {
return 10 * time.Second
}
if interval > time.Minute {
return time.Minute
}
return interval
}
func classifyTranscriptionError(err error) string {
if err == nil {
return "unknown"
@@ -184,10 +222,12 @@ func classifyTranscriptionError(err error) string {
return "bad_audio"
case strings.Contains(s, "audio download") || strings.Contains(s, "audio http 5"):
return "storage_error"
case strings.Contains(s, "whisperx http 4") || strings.Contains(s, "ffmpeg") || strings.Contains(s, "invalid data") || strings.Contains(s, "could not decode"):
case strings.Contains(s, "audio transcription http 4") || strings.Contains(s, "invalid data") || strings.Contains(s, "could not decode"):
return "bad_audio"
case strings.Contains(s, "whisperx http 5") || strings.Contains(s, "whisperx do") || strings.Contains(s, "connection refused") || strings.Contains(s, "connection reset") || strings.Contains(s, "closed network connection"):
case strings.Contains(s, "audio transcription http 5") || strings.Contains(s, "audio transcription do") || strings.Contains(s, "connection refused") || strings.Contains(s, "connection reset") || strings.Contains(s, "closed network connection"):
return "provider_unavailable"
case strings.Contains(s, "audio transcription http 4"):
return "bad_input"
case strings.Contains(s, "decode"):
return "bad_response"
default:
@@ -205,6 +245,8 @@ func classifyLLMError(err error) string {
return "timeout"
case strings.Contains(s, "connection refused") || strings.Contains(s, "connection reset") || strings.Contains(s, "no route to host") || strings.Contains(s, "llm http 5"):
return "model_unavailable"
case strings.Contains(s, "maximum context length") || strings.Contains(s, "context length") || strings.Contains(s, "input_tokens"):
return "context_length"
case strings.Contains(s, "llm http 4") || strings.Contains(s, "messages are required"):
return "bad_input"
case strings.Contains(s, "llm decode") || strings.Contains(s, "empty choices"):

View File

@@ -0,0 +1,29 @@
package worker
import (
"errors"
"testing"
)
func TestClassifyLLMError(t *testing.T) {
tests := []struct {
name string
err error
want string
}{
{name: "timeout", err: errors.New("context deadline exceeded"), want: "timeout"},
{name: "unavailable", err: errors.New("llm HTTP 500: internal server error"), want: "model_unavailable"},
{name: "context length", err: errors.New("This model's maximum context length is 16384 tokens. input_tokens=16001"), want: "context_length"},
{name: "bad input", err: errors.New("llm HTTP 400: messages are required"), want: "bad_input"},
{name: "bad response", err: errors.New("llm decode: invalid character '<'"), want: "bad_response"},
{name: "unknown", err: errors.New("strange failure"), want: "unknown"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := classifyLLMError(tt.err); got != tt.want {
t.Fatalf("classifyLLMError() = %q, want %q", got, tt.want)
}
})
}
}

View File

@@ -11,12 +11,15 @@ data:
LLM_BASE_URL: "http://10.2.3.5:8002"
LLM_MODEL: "qwen2.5-14b"
LLM_TIMEOUT: "5m"
WHISPERX_URL: "http://10.2.3.5:8001"
WHISPERX_TIMEOUT: "10m"
WHISPERX_LEAD_SILENCE: "800ms"
FFMPEG_PATH: "/usr/bin/ffmpeg"
# Whisper Large v3 is exposed on the AI server through an OpenAI-compatible
# /v1/audio/transcriptions endpoint.
AUDIO_TRANSCRIPTION_BASE_URL: "http://10.2.3.5:8004"
AUDIO_TRANSCRIPTION_MODEL: "openai/whisper-large-v3"
AUDIO_TRANSCRIPTION_TIMEOUT: "30m"
AI_STATS_SIDECAR_URL: "http://10.2.3.5:9090"
AI_STATS_TIMEOUT: "8s"
WORKER_POLL_INTERVAL: "2s"
WORKER_HTTP_HOST: "0.0.0.0"
WORKER_HTTP_PORT: "8081"
WORKER_CLAIM_LIMIT: "4"
WORKER_LEASE_TIMEOUT: "15m"

View File

@@ -18,4 +18,5 @@ type: Opaque
stringData:
DATABASE_URL: "postgres://ai_service:ai_service@postgres:5432/ai_service?sslmode=disable"
LLM_API_KEY: "sk-111f838ccec43406e078cd9094b6797307cb895236179f32"
AUDIO_TRANSCRIPTION_API_KEY: "sk-111f838ccec43406e078cd9094b6797307cb895236179f32"
AI_SERVICE_TOKEN: "d18bcacf9e02bae1806ee6b6eeda62b95be6a915c0a22936d9a700128b275442"

View File

@@ -22,22 +22,110 @@ spec:
- name: worker
image: localhost:30300/admin/ai-service:latest
command: ["/usr/local/bin/ai-service-worker"]
ports:
- containerPort: 8081
env:
- name: WORKER_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: WORKER_TASK_TYPES
value: "llm_chat,chat_completion,call_analysis,telegram_classification"
value: "llm_chat,chat_completion,call_analysis,telegram_classification,transcript_summary"
- name: WORKER_MODEL_PROFILES
value: "qwen2.5-14b"
- name: WORKER_CLAIM_LIMIT
value: "8"
- name: WORKER_LEASE_TIMEOUT
value: "15m"
envFrom:
- configMapRef:
name: ai-service-config
- secretRef:
name: ai-service-secrets
startupProbe:
httpGet:
path: /readyz
port: 8081
periodSeconds: 5
failureThreshold: 30
readinessProbe:
httpGet:
path: /readyz
port: 8081
periodSeconds: 10
livenessProbe:
httpGet:
path: /healthz
port: 8081
periodSeconds: 10
resources:
requests:
cpu: 50m
memory: 96Mi
limits:
cpu: 500m
memory: 384Mi
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: ai-service-analysis-worker
namespace: ai-service
spec:
replicas: 1
selector:
matchLabels:
app: ai-service-analysis-worker
template:
metadata:
labels:
app: ai-service-analysis-worker
spec:
terminationGracePeriodSeconds: 20
hostAliases:
- ip: "77.105.173.42"
hostnames:
- "s3-minio.estateliga.work"
containers:
- name: worker
image: localhost:30300/admin/ai-service:latest
command: ["/usr/local/bin/ai-service-worker"]
ports:
- containerPort: 8081
env:
- name: WORKER_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: WORKER_TASK_TYPES
value: "call_analysis"
- name: WORKER_MODEL_PROFILES
value: "qwen2.5-14b"
- name: WORKER_CLAIM_LIMIT
value: "8"
- name: WORKER_LEASE_TIMEOUT
value: "15m"
envFrom:
- configMapRef:
name: ai-service-config
- secretRef:
name: ai-service-secrets
startupProbe:
httpGet:
path: /readyz
port: 8081
periodSeconds: 5
failureThreshold: 30
readinessProbe:
httpGet:
path: /readyz
port: 8081
periodSeconds: 10
livenessProbe:
httpGet:
path: /healthz
port: 8081
periodSeconds: 10
resources:
requests:
cpu: 50m
@@ -70,6 +158,8 @@ spec:
- name: worker
image: localhost:30300/admin/ai-service:latest
command: ["/usr/local/bin/ai-service-worker"]
ports:
- containerPort: 8081
env:
- name: WORKER_ID
valueFrom:
@@ -78,14 +168,32 @@ spec:
- name: WORKER_TASK_TYPES
value: "transcription"
- name: WORKER_MODEL_PROFILES
value: "whisperx"
value: "whisper-large-v3"
- name: WORKER_CLAIM_LIMIT
value: "1"
value: "4"
- name: WORKER_LEASE_TIMEOUT
value: "15m"
envFrom:
- configMapRef:
name: ai-service-config
- secretRef:
name: ai-service-secrets
startupProbe:
httpGet:
path: /readyz
port: 8081
periodSeconds: 5
failureThreshold: 30
readinessProbe:
httpGet:
path: /readyz
port: 8081
periodSeconds: 10
livenessProbe:
httpGet:
path: /healthz
port: 8081
periodSeconds: 10
resources:
requests:
cpu: 50m

View File

@@ -0,0 +1,4 @@
DROP INDEX IF EXISTS ai_jobs_owner_stats_idx;
DROP INDEX IF EXISTS ai_jobs_queue_stats_idx;
DROP INDEX IF EXISTS ai_jobs_running_lease_idx;
DROP INDEX IF EXISTS ai_jobs_pending_claim_scope_idx;

View File

@@ -0,0 +1,13 @@
CREATE INDEX IF NOT EXISTS ai_jobs_pending_claim_scope_idx
ON ai_jobs (task_type, model_profile, priority DESC, scheduled_at DESC, created_at DESC)
WHERE status = 'pending';
CREATE INDEX IF NOT EXISTS ai_jobs_running_lease_idx
ON ai_jobs ((COALESCE(heartbeat_at, started_at, updated_at)))
WHERE status = 'running';
CREATE INDEX IF NOT EXISTS ai_jobs_queue_stats_idx
ON ai_jobs (task_type, model_profile, status);
CREATE INDEX IF NOT EXISTS ai_jobs_owner_stats_idx
ON ai_jobs (owner_service, task_type, model_profile, status);

View File

@@ -0,0 +1 @@
DROP INDEX IF EXISTS ai_jobs_recent_status_idx;

View File

@@ -0,0 +1,2 @@
CREATE INDEX IF NOT EXISTS ai_jobs_recent_status_idx
ON ai_jobs (status, updated_at DESC, created_at DESC);

View File

@@ -0,0 +1 @@
DROP INDEX IF EXISTS ai_jobs_owner_error_stats_idx;

View File

@@ -0,0 +1,3 @@
CREATE INDEX IF NOT EXISTS ai_jobs_owner_error_stats_idx
ON ai_jobs (owner_service, task_type, model_profile, error_code, updated_at DESC)
WHERE status = 'failed';