Compare commits

...

51 Commits

Author SHA1 Message Date
Grendgi
8d06cfabb1 fix: remove unused config helper
All checks were successful
CI / hygiene (push) Successful in 3s
Build and Deploy / build-and-deploy (push) Successful in 28s
CI / test (push) Successful in 22s
2026-06-18 10:28:43 +03:00
Grendgi
b81a8ee6be feat: cancel ai jobs by id
Some checks failed
CI / hygiene (push) Successful in 1s
Build and Deploy / build-and-deploy (push) Successful in 42s
CI / test (push) Failing after 21s
2026-06-17 17:39:29 +03:00
Grendgi
63553fba33 feat: version ai result schemas 2026-06-17 16:46:03 +03:00
Grendgi
f32265400b feat: expand ai retry policy 2026-06-17 16:39:58 +03:00
Grendgi
aad905c2c8 fix: expose ai health component errors 2026-06-17 16:35:47 +03:00
Grendgi
22d85ce646 fix: align ai health status contract 2026-06-17 16:34:44 +03:00
Grendgi
3c124c5f5a feat: expose ai service health detail 2026-06-17 16:31:22 +03:00
Grendgi
773f53f790 Expose AI queue heartbeat metrics
Some checks failed
CI / hygiene (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 28s
CI / test (push) Failing after 7s
2026-06-12 16:50:15 +03:00
Grendgi
abc64214d2 Add AI service CI hygiene guard 2026-06-12 16:42:35 +03:00
Grendgi
e45884c5e5 Retry AI service database connection on startup 2026-06-12 16:28:02 +03:00
Grendgi
c618ffaff9 Update analysis worker deployment
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 15s
2026-06-11 12:20:47 +03:00
Grendgi
92ac01d8b5 Use verbose Whisper transcription output
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 22s
2026-06-11 11:40:52 +03:00
Grendgi
b536877181 Stabilize Whisper transcription requests
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 21s
2026-06-11 10:16:34 +03:00
Grendgi
bc71caa762 Stabilize AI worker leases
Some checks failed
CI / test (push) Failing after 7s
Build and Deploy / build-and-deploy (push) Successful in 15s
2026-06-10 17:50:53 +03:00
Grendgi
d0980007d7 Add dedicated call analysis worker
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 25s
2026-06-10 17:37:21 +03:00
Grendgi
ea632902bb Shorten AI worker lease timeout
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 15s
2026-06-10 17:05:26 +03:00
Grendgi
800d1d7cdd Keep AI jobs alive during processing
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 1m10s
2026-06-10 16:48:58 +03:00
Grendgi
837acf2f00 Add AI queue backlog metrics
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 24s
2026-06-10 16:36:35 +03:00
Grendgi
631a45aff3 Classify LLM context length errors
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 31s
2026-06-10 16:16:57 +03:00
Grendgi
7a1965e402 Increase transcription worker claim limit
Some checks failed
CI / test (push) Failing after 7s
Build and Deploy / build-and-deploy (push) Successful in 16s
2026-06-10 16:11:35 +03:00
Grendgi
3c2f13b967 Add AI job stage metrics
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 34s
2026-06-10 16:07:21 +03:00
Grendgi
2a481fdc54 Add automatic retry policy for AI jobs
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 33s
2026-06-10 15:58:41 +03:00
Grendgi
f54400e8e2 Requeue stale worker jobs on idempotent create
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 21s
2026-06-10 14:42:14 +03:00
Grendgi
e6ae792325 Drop legacy audio config aliases
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 29s
2026-06-10 14:28:52 +03:00
Grendgi
80fa21ff80 Refresh AI service pipeline docs
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 27s
2026-06-10 13:43:59 +03:00
Grendgi
7d0e27f681 Extend LLM worker lease timeout
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 23s
2026-06-10 13:20:06 +03:00
Grendgi
11247f17de Route transcript summary jobs to LLM worker
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 21s
2026-06-10 12:32:22 +03:00
Grendgi
ae1802dab9 Stop sending prompt to Whisper transcription
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 24s
2026-06-10 11:30:08 +03:00
Grendgi
bde56978d6 Fix Whisper large v3 audio compose
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 18s
2026-06-10 10:15:24 +03:00
Grendgi
8d6cd84403 Switch transcription to Whisper large v3
Some checks failed
CI / test (push) Failing after 10s
Build and Deploy / build-and-deploy (push) Successful in 24s
2026-06-10 10:10:13 +03:00
Grendgi
1b63dcdbf5 Increase Voxtral transcription timeouts
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 20s
2026-06-09 22:27:52 +03:00
Grendgi
bf945e05e3 Use Voxtral JSON transcription without fake speakers
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 23s
2026-06-09 22:09:27 +03:00
Grendgi
e074f6b226 Run Voxtral transcription worker with two jobs
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 19s
2026-06-09 17:16:24 +03:00
Grendgi
9bd6d726f0 Make Voxtral the only transcription provider
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 27s
2026-06-09 16:54:54 +03:00
Grendgi
5c965be8c9 Split single Voxtral transcript segments
All checks were successful
CI / test (push) Successful in 13s
Build and Deploy / build-and-deploy (push) Successful in 23s
2026-06-09 16:20:52 +03:00
Grendgi
64bf40b3ba Segment Voxtral transcripts for telephony
All checks were successful
CI / test (push) Successful in 15s
Build and Deploy / build-and-deploy (push) Successful in 32s
2026-06-09 16:12:57 +03:00
Grendgi
e6c2b46cf6 Use Voxtral audio transcription endpoint
All checks were successful
CI / test (push) Successful in 14s
Build and Deploy / build-and-deploy (push) Successful in 30s
2026-06-09 15:51:50 +03:00
Grendgi
817eb8ff71 Disable inactive WhisperX provider status
All checks were successful
CI / test (push) Successful in 12s
Build and Deploy / build-and-deploy (push) Successful in 18s
2026-06-09 15:38:55 +03:00
Grendgi
94e0d03580 Switch transcription comparison to Voxtral
All checks were successful
CI / test (push) Successful in 13s
Build and Deploy / build-and-deploy (push) Successful in 18s
2026-06-09 15:11:52 +03:00
Grendgi
add15f1385 Cast transcription comparison averages
All checks were successful
CI / test (push) Successful in 14s
Build and Deploy / build-and-deploy (push) Successful in 23s
2026-06-09 15:01:32 +03:00
Grendgi
35c60f0e0e Add transcription comparison stats 2026-06-09 14:59:08 +03:00
Grendgi
88e7c86836 Document vLLM audio URL payloads
All checks were successful
CI / test (push) Successful in 13s
Build and Deploy / build-and-deploy (push) Successful in 16s
2026-06-09 13:49:27 +03:00
Grendgi
1202ebcb7f Use vLLM audio URL payloads
All checks were successful
CI / test (push) Successful in 13s
Build and Deploy / build-and-deploy (push) Successful in 24s
2026-06-09 13:46:28 +03:00
Grendgi
2ef71a822b Add vLLM audio image for transcription models
All checks were successful
CI / test (push) Successful in 13s
Build and Deploy / build-and-deploy (push) Successful in 22s
2026-06-09 13:38:08 +03:00
Grendgi
76ac9b8896 Add audio model API keys
All checks were successful
CI / test (push) Successful in 14s
Build and Deploy / build-and-deploy (push) Successful in 25s
2026-06-09 13:28:53 +03:00
Grendgi
c31dcb891c Enable Qwen audio endpoint 2026-06-09 13:23:21 +03:00
Grendgi
ee6e948d2e Add single WhisperX load balancer config 2026-06-09 13:20:17 +03:00
Grendgi
e132634c65 Isolate audio model compose network 2026-06-09 13:17:05 +03:00
Grendgi
cac8d89e64 Tune audio model GPU profiles 2026-06-09 12:52:13 +03:00
Grendgi
f49ba7abd5 Add AI server audio model profiles 2026-06-09 12:50:56 +03:00
Grendgi
aaecbb1bed Add transcription provider comparison chain 2026-06-09 12:34:08 +03:00
25 changed files with 1655 additions and 301 deletions

View File

@@ -0,0 +1,35 @@
#!/usr/bin/env bash
set -euo pipefail
fail=0
while IFS= read -r -d '' path; do
base="$(basename "$path")"
case "$base" in
.DS_Store|.env)
echo "::error file=$path::tracked local-only file is forbidden"
fail=1
;;
esac
case "$path" in
*node_modules/*|node_modules/*)
echo "::error file=$path::tracked node_modules content is forbidden"
fail=1
;;
*.tmp|*.temp|*.bak|*.orig|*.rej|*.zip|*.tar|*.tar.gz|*.tgz|*.rar|*.7z)
echo "::error file=$path::tracked temporary/archive artifact is forbidden"
fail=1
;;
esac
if [ -f "$path" ]; then
size="$(wc -c < "$path" | tr -d ' ')"
if [ "${size:-0}" -gt 52428800 ]; then
echo "::error file=$path::tracked file is larger than 50 MiB"
fail=1
fi
fi
done < <(git ls-files -z)
exit "$fail"

View File

@@ -5,8 +5,15 @@ on:
pull_request: pull_request:
jobs: jobs:
hygiene:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: bash .gitea/scripts/hygiene-check.sh
test: test:
runs-on: ubuntu-latest runs-on: ubuntu-latest
needs: hygiene
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- uses: actions/setup-go@v5 - uses: actions/setup-go@v5

View File

@@ -58,8 +58,11 @@ jobs:
server=${{ env.NODE_REGISTRY }}/admin/ai-service:${{ github.sha }} server=${{ env.NODE_REGISTRY }}/admin/ai-service:${{ github.sha }}
kubectl -n ai-service set image deployment/ai-service-worker \ kubectl -n ai-service set image deployment/ai-service-worker \
worker=${{ env.NODE_REGISTRY }}/admin/ai-service:${{ github.sha }} worker=${{ env.NODE_REGISTRY }}/admin/ai-service:${{ github.sha }}
kubectl -n ai-service set image deployment/ai-service-analysis-worker \
worker=${{ env.NODE_REGISTRY }}/admin/ai-service:${{ github.sha }}
kubectl -n ai-service set image deployment/ai-service-transcription-worker \ kubectl -n ai-service set image deployment/ai-service-transcription-worker \
worker=${{ env.NODE_REGISTRY }}/admin/ai-service:${{ github.sha }} worker=${{ env.NODE_REGISTRY }}/admin/ai-service:${{ github.sha }}
kubectl -n ai-service rollout status deployment/ai-service --timeout=180s kubectl -n ai-service rollout status deployment/ai-service --timeout=180s
kubectl -n ai-service rollout status deployment/ai-service-worker --timeout=180s kubectl -n ai-service rollout status deployment/ai-service-worker --timeout=180s
kubectl -n ai-service rollout status deployment/ai-service-analysis-worker --timeout=180s
kubectl -n ai-service rollout status deployment/ai-service-transcription-worker --timeout=180s kubectl -n ai-service rollout status deployment/ai-service-transcription-worker --timeout=180s

View File

@@ -2,8 +2,9 @@
Technical AI job service for Portal workloads. Technical AI job service for Portal workloads.
The first version owns only AI job lifecycle and metrics. Business data stays in AI Service owns technical AI job lifecycle, provider execution and metrics.
domain services such as `telephony`, `monitoring-tg` and `monitoring-pf`. Business data stays in domain services such as `telephony`, `monitoring-tg` and
`monitoring-pf`.
## Generic job contract ## Generic job contract
@@ -14,8 +15,9 @@ The service is intentionally domain-agnostic:
- `owner_ref` is the caller's stable object reference, for example - `owner_ref` is the caller's stable object reference, for example
`beeline/{call_id}` or `channel/{message_id}`. `beeline/{call_id}` or `channel/{message_id}`.
- `task_type` describes the technical task class, for example - `task_type` describes the technical task class, for example
`transcribe`, `call_analysis`, `tg_analysis`, `pf_competitor_analysis`. `transcription`, `transcript_summary`, `call_analysis`,
- `model_profile` selects a runtime profile, for example `whisperx`, `telegram_classification`, `tg_analysis`, `pf_competitor_analysis`.
- `model_profile` selects a runtime profile, for example `whisper-large-v3`,
`qwen2.5-14b`, `vision`, or a future provider profile. `qwen2.5-14b`, `vision`, or a future provider profile.
- `input` and `result` are JSON payloads owned by the caller and worker. - `input` and `result` are JSON payloads owned by the caller and worker.
@@ -24,8 +26,9 @@ service.
## Built-in workers ## Built-in workers
The first built-in worker processes `llm_chat`, `chat_completion` and The LLM worker processes `llm_chat`, `chat_completion`, `call_analysis`,
`call_analysis` jobs whose `model_profile` equals `LLM_MODEL`. `transcript_summary` and `telegram_classification` jobs whose `model_profile`
equals `LLM_MODEL`.
Input can be either explicit messages: Input can be either explicit messages:
@@ -40,11 +43,34 @@ Input can be either explicit messages:
``` ```
or compact `system` / `user` fields. The completed job result contains or compact `system` / `user` fields. The completed job result contains
`content`, `model`, `usage` and `duration_ms`. `schema_version=ai.chat_result.v1`, `content`, `model`, `usage` and
`duration_ms`.
`call_analysis` uses the same input contract as `llm_chat`; callers may include `call_analysis` and `transcript_summary` use the same input contract as
domain metadata fields in `input`, but the worker only reads chat fields such as `llm_chat`; callers may include domain metadata fields in `input`, but the
`system`, `user`, `messages`, `max_tokens` and `response_format`. worker only reads chat fields such as `system`, `user`, `messages`,
`max_tokens` and `response_format`.
`transcription` jobs are processed only by Whisper Large v3
(`openai/whisper-large-v3`) through an OpenAI-compatible
`/v1/audio/transcriptions` endpoint. The returned `segments` field stays
compatible with telephony. If the provider returns one long segment, AI Service
splits it into smaller transcript segments without inventing speaker labels.
The completed job result contains
`schema_version=ai.transcription_result.v1`, `provider`, `model`, `language`,
`segments`, optional provider `attempts` and `duration_ms`.
AI-server compose snippet for Whisper Large v3 lives in
`deploy/ai-server/docker-compose.audio.yml`:
- Whisper endpoint: `http://10.2.3.5:8004`
- Start Whisper:
`docker compose -f docker-compose.yml -f docker-compose.audio.yml --profile whisper-large-v3 up -d whisper-large-v3`
In Kubernetes the dedicated transcription worker may claim more than one
`whisper-large-v3` job at a time. This keeps download/upload/wait overhead from
serializing the queue while the Whisper provider still controls the actual GPU
scheduling.
## API ## API
@@ -62,7 +88,9 @@ domain metadata fields in `input`, but the worker only reads chat fields such as
- `GET /api/v1/providers/status` checks configured AI providers without - `GET /api/v1/providers/status` checks configured AI providers without
returning secrets. returning secrets.
- `GET /api/v1/infra/status` returns AI-server sidecar telemetry - `GET /api/v1/infra/status` returns AI-server sidecar telemetry
(GPU, containers, vLLM and WhisperX live metrics) when configured. (GPU, containers and vLLM live metrics) when configured.
- `GET /health/detail` returns PostgreSQL, provider, queue, error, throughput
and infra components for Portal `admin/health`.
- `GET /healthz` returns process health. - `GET /healthz` returns process health.
- `GET /readyz` checks PostgreSQL readiness. - `GET /readyz` checks PostgreSQL readiness.
- Built-in workers expose open Kubernetes endpoints on `WORKER_HTTP_PORT`: - Built-in workers expose open Kubernetes endpoints on `WORKER_HTTP_PORT`:
@@ -72,6 +100,34 @@ All `/api/v1/*` endpoints require `Authorization: Bearer <AI_SERVICE_TOKEN>`
when `AI_SERVICE_TOKEN` is configured. Health and readiness endpoints stay open when `AI_SERVICE_TOKEN` is configured. Health and readiness endpoints stay open
for Kubernetes probes. for Kubernetes probes.
## Retry policy
Workers store a normalized `error_code` on failed jobs. AI Service requeues only
explicitly retryable categories while attempts remain.
| Category | Retry | Delay |
| --- | --- | --- |
| `provider_unavailable`, `model_unavailable`, `provider_error`, `dependency_error`, `timeout`, `storage_error`, `stale_worker` | yes | 30s |
| `bad_response`, `transcript_hallucination`, `transcript_incomplete`, `internal_error`, `unknown` | yes | 2m |
| `bad_audio`, `bad_input`, `context_length`, `unsupported_task`, `cancelled` | no | - |
Domain services may still expose manual retry for terminal errors after the
underlying data or prompt is corrected.
## Result schemas
AI Service result payloads are versioned with `schema_version`. Consumers should
ignore unknown fields and reject only unsupported major schema names.
Current schemas:
- `ai.chat_result.v1`: `{schema_version, content, model, usage?, duration_ms}`.
- `ai.transcription_result.v1`:
`{schema_version, provider?, model?, attempts?, language, segments, duration_ms}`.
New optional fields may be added to a `v1` schema without a breaking change.
Breaking shape changes require a new schema name.
## Configuration ## Configuration
- `HTTP_HOST`, default `0.0.0.0` - `HTTP_HOST`, default `0.0.0.0`
@@ -83,7 +139,11 @@ for Kubernetes probes.
- `LLM_API_KEY`, primary LLM API key - `LLM_API_KEY`, primary LLM API key
- `LLM_MODEL`, default `qwen2.5-14b` - `LLM_MODEL`, default `qwen2.5-14b`
- `LLM_TIMEOUT`, default `5m` - `LLM_TIMEOUT`, default `5m`
- `WHISPERX_URL`, WhisperX endpoint for transcription jobs - `AUDIO_TRANSCRIPTION_BASE_URL`, OpenAI-compatible transcription endpoint
- `AUDIO_TRANSCRIPTION_MODEL`, default `openai/whisper-large-v3`
- `AUDIO_TRANSCRIPTION_API_KEY`, optional bearer token; falls back to
`AUDIO_LLM_API_KEY`, then `LLM_API_KEY`
- `AUDIO_TRANSCRIPTION_PROMPT`, transcription instruction
- `WORKER_ID`, default hostname - `WORKER_ID`, default hostname
- `WORKER_HTTP_HOST`, default `0.0.0.0` - `WORKER_HTTP_HOST`, default `0.0.0.0`
- `WORKER_HTTP_PORT`, default `8081` - `WORKER_HTTP_PORT`, default `8081`
@@ -91,8 +151,10 @@ for Kubernetes probes.
- `WORKER_CLAIM_LIMIT`, default `4` - `WORKER_CLAIM_LIMIT`, default `4`
- `WORKER_LEASE_TIMEOUT`, default `15m` - `WORKER_LEASE_TIMEOUT`, default `15m`
## Next integration step ## Current telephony pipeline
`telephony` should first mirror low-risk analysis jobs into this service while `telephony` now uses AI Service as the only AI execution path:
continuing local processing. Remote execution can then be enabled by feature
flag per task type. 1. `transcription` turns call audio into segments.
2. `transcript_summary` creates a detailed Russian call summary.
3. `call_analysis` runs tags and negotiation rules against the summary.

View File

@@ -48,15 +48,22 @@ func main() {
} }
llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout) llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout)
transcriber := transcription.New(cfg.WhisperXURL, cfg.WhisperXTimeout, cfg.FfmpegPath, cfg.WhisperXLeadSilence) transcriber := transcription.NewWithOptions(transcription.Options{
AudioBaseURL: cfg.AudioBaseURL,
AudioAPIKey: cfg.AudioAPIKey,
AudioModel: cfg.AudioModel,
AudioTimeout: cfg.AudioTimeout,
AudioPrompt: cfg.AudioPrompt,
})
w := worker.New(db, llmClient, transcriber, cfg.WorkerID, cfg.LLMModel, cfg.WorkerTaskTypes, cfg.WorkerModelProfiles, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit) w := worker.New(db, llmClient, transcriber, cfg.WorkerID, cfg.LLMModel, cfg.WorkerTaskTypes, cfg.WorkerModelProfiles, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit)
healthSrv := startHealthServer(ctx, db, cfg) healthSrv := startHealthServer(ctx, db, cfg)
slog.Info("ai_worker_started", slog.Info("ai_worker_started",
"worker_id", cfg.WorkerID, "worker_id", cfg.WorkerID,
"model", cfg.LLMModel, "model", cfg.LLMModel,
"whisperx_enabled", transcriber != nil, "transcription_enabled", transcriber != nil,
"whisperx_lead_silence", cfg.WhisperXLeadSilence.String(), "transcription_provider", transcription.ProviderWhisperLargeV3,
"transcription_model", cfg.AudioModel,
"task_types", cfg.WorkerTaskTypes, "task_types", cfg.WorkerTaskTypes,
"model_profiles", cfg.WorkerModelProfiles, "model_profiles", cfg.WorkerModelProfiles,
"poll_interval", cfg.WorkerPollInterval.String(), "poll_interval", cfg.WorkerPollInterval.String(),
@@ -123,13 +130,15 @@ func (h workerHealth) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}) })
case r.Method == http.MethodGet && path == "/worker/status": case r.Method == http.MethodGet && path == "/worker/status":
writeWorkerJSON(w, http.StatusOK, map[string]any{ writeWorkerJSON(w, http.StatusOK, map[string]any{
"status": "running", "status": "running",
"worker_id": h.cfg.WorkerID, "worker_id": h.cfg.WorkerID,
"task_types": h.cfg.WorkerTaskTypes, "task_types": h.cfg.WorkerTaskTypes,
"model_profiles": h.cfg.WorkerModelProfiles, "model_profiles": h.cfg.WorkerModelProfiles,
"claim_limit": h.cfg.WorkerClaimLimit, "transcription_provider": transcription.ProviderWhisperLargeV3,
"poll_interval": h.cfg.WorkerPollInterval.String(), "transcription_model": h.cfg.AudioModel,
"lease_timeout": h.cfg.WorkerLeaseTimeout.String(), "claim_limit": h.cfg.WorkerClaimLimit,
"poll_interval": h.cfg.WorkerPollInterval.String(),
"lease_timeout": h.cfg.WorkerLeaseTimeout.String(),
}) })
default: default:
writeWorkerJSON(w, http.StatusNotFound, map[string]any{"error": "not found"}) writeWorkerJSON(w, http.StatusNotFound, map[string]any{"error": "not found"})

View File

@@ -0,0 +1,52 @@
services:
whisper-large-v3:
build:
context: .
dockerfile: vllm-audio.Dockerfile
image: vllm-audio:local
container_name: whisper-large-v3
profiles:
- whisper-large-v3
restart: unless-stopped
ipc: host
runtime: nvidia
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: all
capabilities: [gpu]
environment:
HUGGING_FACE_HUB_TOKEN: ${HF_TOKEN}
VLLM_API_KEY: ${VLLM_API_KEY}
HF_HOME: /cache
volumes:
- ./data/vllm-cache:/cache
networks:
- audio-models
ports:
- "10.2.3.5:8004:8000"
command:
- "--model"
- "openai/whisper-large-v3"
- "--served-model-name"
- "openai/whisper-large-v3"
- "--host"
- "0.0.0.0"
- "--port"
- "8000"
- "--gpu-memory-utilization"
- "0.55"
- "--api-key"
- "${VLLM_API_KEY}"
healthcheck:
test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
interval: 30s
timeout: 5s
retries: 5
start_period: 1200s
networks:
audio-models:
driver: bridge

View File

@@ -0,0 +1,3 @@
FROM vllm/vllm-openai:latest
RUN python3 -m pip install --no-cache-dir av soundfile librosa

View File

@@ -14,16 +14,17 @@ type Config struct {
MigrateOnStart bool MigrateOnStart bool
APIAuthToken string APIAuthToken string
LLMBaseURL string LLMBaseURL string
LLMAPIKey string LLMAPIKey string
LLMModel string LLMModel string
LLMTimeout time.Duration LLMTimeout time.Duration
WhisperXURL string AudioBaseURL string
WhisperXTimeout time.Duration AudioAPIKey string
WhisperXLeadSilence time.Duration AudioModel string
FfmpegPath string AudioTimeout time.Duration
AIStatsSidecarURL string AudioPrompt string
AIStatsTimeout time.Duration AIStatsSidecarURL string
AIStatsTimeout time.Duration
WorkerID string WorkerID string
WorkerHTTPHost string WorkerHTTPHost string
@@ -43,16 +44,17 @@ func Load() Config {
MigrateOnStart: envBool("MIGRATE_ON_START", true), MigrateOnStart: envBool("MIGRATE_ON_START", true),
APIAuthToken: envString("AI_SERVICE_TOKEN", ""), APIAuthToken: envString("AI_SERVICE_TOKEN", ""),
LLMBaseURL: envString("LLM_BASE_URL", ""), LLMBaseURL: envString("LLM_BASE_URL", ""),
LLMAPIKey: envString("LLM_API_KEY", ""), LLMAPIKey: envString("LLM_API_KEY", ""),
LLMModel: envString("LLM_MODEL", "qwen2.5-14b"), LLMModel: envString("LLM_MODEL", "qwen2.5-14b"),
LLMTimeout: envDuration("LLM_TIMEOUT", 5*time.Minute), LLMTimeout: envDuration("LLM_TIMEOUT", 5*time.Minute),
WhisperXURL: envString("WHISPERX_URL", ""), AudioBaseURL: envString("AUDIO_TRANSCRIPTION_BASE_URL", ""),
WhisperXTimeout: envDuration("WHISPERX_TIMEOUT", 10*time.Minute), AudioAPIKey: envString("AUDIO_TRANSCRIPTION_API_KEY", ""),
WhisperXLeadSilence: envDuration("WHISPERX_LEAD_SILENCE", 800*time.Millisecond), AudioModel: envString("AUDIO_TRANSCRIPTION_MODEL", "openai/whisper-large-v3"),
FfmpegPath: envString("FFMPEG_PATH", "/usr/bin/ffmpeg"), AudioTimeout: envDuration("AUDIO_TRANSCRIPTION_TIMEOUT", 10*time.Minute),
AIStatsSidecarURL: envString("AI_STATS_SIDECAR_URL", ""), AudioPrompt: envString("AUDIO_TRANSCRIPTION_PROMPT", defaultAudioPrompt()),
AIStatsTimeout: envDuration("AI_STATS_TIMEOUT", 8*time.Second), AIStatsSidecarURL: envString("AI_STATS_SIDECAR_URL", ""),
AIStatsTimeout: envDuration("AI_STATS_TIMEOUT", 8*time.Second),
WorkerID: envString("WORKER_ID", hostname()), WorkerID: envString("WORKER_ID", hostname()),
WorkerHTTPHost: envString("WORKER_HTTP_HOST", "0.0.0.0"), WorkerHTTPHost: envString("WORKER_HTTP_HOST", "0.0.0.0"),
@@ -123,6 +125,10 @@ func envCSV(key string) []string {
return out return out
} }
func defaultAudioPrompt() string {
return ""
}
func hostname() string { func hostname() string {
h, err := os.Hostname() h, err := os.Hostname()
if err != nil || h == "" { if err != nil || h == "" {

View File

@@ -5,6 +5,7 @@ import (
"time" "time"
"ai-service/internal/model" "ai-service/internal/model"
"ai-service/internal/transcription"
) )
type dashboardResponse struct { type dashboardResponse struct {
@@ -30,7 +31,7 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) {
ctx, cancel := contextWithTimeout(r, 12*time.Second) ctx, cancel := contextWithTimeout(r, 12*time.Second)
defer cancel() defer cancel()
stats, err := s.store.Stats(ctx) stats, err := s.store.Stats(ctx, s.cfg.WorkerLeaseTimeout)
if err != nil { if err != nil {
writeError(w, http.StatusInternalServerError, err.Error()) writeError(w, http.StatusInternalServerError, err.Error())
return return
@@ -43,7 +44,6 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusInternalServerError, err.Error()) writeError(w, http.StatusInternalServerError, err.Error())
return return
} }
resp := dashboardResponse{ resp := dashboardResponse{
At: now, At: now,
Summary: summarizeQueues(stats), Summary: summarizeQueues(stats),
@@ -52,7 +52,7 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) {
At: now, At: now,
Providers: []providerStatus{ Providers: []providerStatus{
s.checkLLM(ctx), s.checkLLM(ctx),
s.checkWhisperX(ctx), s.checkAudioLLM(ctx, transcription.ProviderWhisperLargeV3, s.cfg.AudioBaseURL, s.cfg.AudioAPIKey, s.cfg.AudioModel, s.cfg.AudioTimeout),
}, },
}, },
Infra: loadInfraSnapshot(r, s.cfg), Infra: loadInfraSnapshot(r, s.cfg),

241
internal/httpapi/health.go Normal file
View File

@@ -0,0 +1,241 @@
package httpapi
import (
"context"
"net/http"
"strings"
"time"
"ai-service/internal/model"
"ai-service/internal/transcription"
)
type healthDetailResponse struct {
Status string `json:"status"`
Generated time.Time `json:"generated_at"`
Components []healthComponent `json:"components"`
}
type healthComponent struct {
Name string `json:"name"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
Data map[string]any `json:"data,omitempty"`
}
func (s *Server) handleHealthDetail(w http.ResponseWriter, r *http.Request) {
ctx, cancel := contextWithTimeout(r, 12*time.Second)
defer cancel()
resp := healthDetailResponse{
Status: "ok",
Generated: time.Now().UTC(),
}
if err := s.store.Ping(ctx); err != nil {
resp.Components = append(resp.Components, healthComponent{
Name: "postgres",
Status: "down",
Error: err.Error(),
})
resp.Status = worseHealthStatus(resp.Status, "down")
writeJSON(w, http.StatusServiceUnavailable, resp)
return
}
resp.Components = append(resp.Components, healthComponent{Name: "postgres", Status: "ok"})
stats, err := s.store.Stats(ctx, s.cfg.WorkerLeaseTimeout)
if err != nil {
resp.Components = append(resp.Components, healthComponent{
Name: "queue",
Status: "down",
Error: err.Error(),
})
resp.Status = worseHealthStatus(resp.Status, "down")
writeJSON(w, http.StatusServiceUnavailable, resp)
return
}
for _, component := range []healthComponent{
s.healthProviders(ctx),
healthQueue(stats),
healthErrors(stats),
healthThroughput(stats),
healthInfra(loadInfraSnapshot(r, s.cfg)),
} {
resp.Components = append(resp.Components, component)
resp.Status = worseHealthStatus(resp.Status, component.Status)
}
statusCode := http.StatusOK
if resp.Status == "down" {
statusCode = http.StatusServiceUnavailable
}
writeJSON(w, statusCode, resp)
}
func (s *Server) healthProviders(ctx context.Context) healthComponent {
providers := []providerStatus{
s.checkLLM(ctx),
s.checkAudioLLM(ctx, transcription.ProviderWhisperLargeV3, s.cfg.AudioBaseURL, s.cfg.AudioAPIKey, s.cfg.AudioModel, s.cfg.AudioTimeout),
}
status := "ok"
messages := make([]string, 0)
for _, provider := range providers {
switch {
case !provider.Configured:
status = worseHealthStatus(status, "degraded")
messages = append(messages, provider.Name+" not configured")
case !provider.OK:
status = worseHealthStatus(status, "down")
if provider.Error != "" {
messages = append(messages, provider.Name+": "+provider.Error)
} else {
messages = append(messages, provider.Name+" unavailable")
}
case provider.Stale:
status = worseHealthStatus(status, "degraded")
if provider.Error != "" {
messages = append(messages, provider.Name+": "+provider.Error)
}
}
}
return healthComponent{
Name: "providers",
Status: status,
Error: strings.Join(messages, "; "),
Data: map[string]any{
"providers": providers,
},
}
}
func healthQueue(stats *model.Stats) healthComponent {
var pending, running, staleRunning int64
var oldestPendingAgeSeconds, oldestRunningAgeSeconds int64
for _, row := range stats.Backlog {
pending += row.Pending
running += row.Running
staleRunning += row.StaleRunning
if row.OldestPendingAgeSeconds > oldestPendingAgeSeconds {
oldestPendingAgeSeconds = row.OldestPendingAgeSeconds
}
if row.OldestRunningAgeSeconds > oldestRunningAgeSeconds {
oldestRunningAgeSeconds = row.OldestRunningAgeSeconds
}
}
status := "ok"
message := ""
if staleRunning > 0 {
status = "degraded"
message = "there are stale running jobs"
}
return healthComponent{
Name: "queue",
Status: status,
Error: message,
Data: map[string]any{
"pending": pending,
"running": running,
"stale_running": staleRunning,
"oldest_pending_age_seconds": oldestPendingAgeSeconds,
"oldest_running_age_seconds": oldestRunningAgeSeconds,
"backlog": stats.Backlog,
"queue_status_totals": stats.Queues,
"owner_status_totals": stats.Owners,
},
}
}
func healthErrors(stats *model.Stats) healthComponent {
var failedTotal, failed24h int64
for _, row := range stats.Errors {
failedTotal += row.Total
failed24h += row.Last24h
}
status := "ok"
message := ""
if failed24h > 0 {
status = "degraded"
message = "there are failed jobs in the last 24 hours"
}
return healthComponent{
Name: "errors",
Status: status,
Error: message,
Data: map[string]any{
"failed_total": failedTotal,
"failed_24h": failed24h,
"by_code": stats.Errors,
},
}
}
func healthThroughput(stats *model.Stats) healthComponent {
var done24h, retried24h int64
for _, row := range stats.Stages {
done24h += row.Done24h
retried24h += row.Retried24h
}
pendingByStage := make(map[string]int64)
for _, row := range stats.Backlog {
pendingByStage[row.TaskType+"|"+row.ModelProfile] += row.Pending + row.Running
}
doneByStage := make(map[string]int64)
for _, row := range stats.Stages {
doneByStage[row.TaskType+"|"+row.ModelProfile] += row.Done24h
}
stuckStages := make([]string, 0)
for key, total := range pendingByStage {
if total > 0 && doneByStage[key] == 0 {
stuckStages = append(stuckStages, key)
}
}
status := "ok"
message := ""
if len(stuckStages) > 0 {
status = "degraded"
message = "some active queues have no completed jobs in the last 24 hours"
}
return healthComponent{
Name: "throughput",
Status: status,
Error: message,
Data: map[string]any{
"done_24h": done24h,
"retried_24h": retried24h,
"stuck_stages": stuckStages,
"stages": stats.Stages,
},
}
}
func healthInfra(infra infraStatusResponse) healthComponent {
status := "ok"
message := ""
if infra.SidecarError != "" {
status = "degraded"
message = infra.SidecarError
}
return healthComponent{
Name: "infra",
Status: status,
Error: message,
Data: map[string]any{
"sidecar": infra.Sidecar,
},
}
}
func worseHealthStatus(current, next string) string {
if current == "down" || next == "down" {
return "down"
}
if current == "degraded" || next == "degraded" {
return "degraded"
}
return "ok"
}

View File

@@ -8,6 +8,8 @@ import (
"net/http" "net/http"
"strings" "strings"
"time" "time"
"ai-service/internal/transcription"
) )
type providerStatus struct { type providerStatus struct {
@@ -42,12 +44,58 @@ func (s *Server) handleProviderStatus(w http.ResponseWriter, r *http.Request) {
At: time.Now().UTC(), At: time.Now().UTC(),
Providers: []providerStatus{ Providers: []providerStatus{
s.checkLLM(ctx), s.checkLLM(ctx),
s.checkWhisperX(ctx), s.checkAudioLLM(ctx, transcription.ProviderWhisperLargeV3, s.cfg.AudioBaseURL, s.cfg.AudioAPIKey, s.cfg.AudioModel, s.cfg.AudioTimeout),
}, },
} }
writeJSON(w, http.StatusOK, resp) writeJSON(w, http.StatusOK, resp)
} }
func (s *Server) checkAudioLLM(ctx context.Context, name, baseURL, apiKey, model string, timeout time.Duration) providerStatus {
baseURL = strings.TrimRight(strings.TrimSpace(baseURL), "/")
st := providerStatus{
Name: name,
Configured: baseURL != "",
URL: baseURL,
Model: model,
}
if !st.Configured {
return st
}
if timeout <= 0 {
timeout = 10 * time.Minute
}
start := time.Now()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, st.URL+"/v1/models", nil)
if err != nil {
st.Error = err.Error()
return st
}
if apiKey != "" {
req.Header.Set("Authorization", "Bearer "+apiKey)
}
res, err := (&http.Client{Timeout: minDuration(timeout, 3*time.Second)}).Do(req)
st.LatencyMS = time.Since(start).Milliseconds()
if err != nil {
st.Error = err.Error()
return s.withStaleProviderOK(name, st)
}
defer res.Body.Close()
if res.StatusCode >= 300 {
st.Error = fmt.Sprintf("http %d: %s", res.StatusCode, readSmallBody(res.Body))
return s.withStaleProviderOK(name, st)
}
st.OK = true
s.rememberProviderOK(name, st.LatencyMS)
return st
}
func minDuration(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}
func (s *Server) checkLLM(ctx context.Context) providerStatus { func (s *Server) checkLLM(ctx context.Context) providerStatus {
st := providerStatus{ st := providerStatus{
Name: "llm", Name: "llm",
@@ -84,47 +132,6 @@ func (s *Server) checkLLM(ctx context.Context) providerStatus {
return st return st
} }
func (s *Server) checkWhisperX(ctx context.Context) providerStatus {
baseURL := strings.TrimRight(strings.TrimSpace(s.cfg.WhisperXURL), "/")
st := providerStatus{Name: "whisperx", Configured: baseURL != "", URL: baseURL}
if !st.Configured {
return st
}
paths := []string{"/health", "/healthz", "/readyz", "/"}
var lastErr string
for _, path := range paths {
cctx, cancel := context.WithTimeout(ctx, 2*time.Second)
start := time.Now()
req, err := http.NewRequestWithContext(cctx, http.MethodGet, baseURL+path, nil)
if err != nil {
cancel()
lastErr = err.Error()
continue
}
res, err := (&http.Client{Timeout: 2 * time.Second}).Do(req)
st.LatencyMS = time.Since(start).Milliseconds()
cancel()
if err != nil {
lastErr = err.Error()
continue
}
body := ""
if res.StatusCode >= 300 {
body = readSmallBody(res.Body)
}
_ = res.Body.Close()
if res.StatusCode >= 300 {
lastErr = fmt.Sprintf("%s http %d: %s", path, res.StatusCode, body)
continue
}
st.OK = true
s.rememberProviderOK("whisperx", st.LatencyMS)
return st
}
st.Error = lastErr
return s.withStaleProviderOK("whisperx", st)
}
func (s *Server) rememberProviderOK(name string, latencyMS int64) { func (s *Server) rememberProviderOK(name string, latencyMS int64) {
s.providerMu.Lock() s.providerMu.Lock()
defer s.providerMu.Unlock() defer s.providerMu.Unlock()

View File

@@ -41,6 +41,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
case r.Method == http.MethodGet && path == "/readyz": case r.Method == http.MethodGet && path == "/readyz":
s.handleReady(w, r) s.handleReady(w, r)
case r.Method == http.MethodGet && path == "/health/detail":
s.handleHealthDetail(w, r)
case r.Method == http.MethodGet && path == "/": case r.Method == http.MethodGet && path == "/":
writeJSON(w, http.StatusOK, map[string]string{"service": "ai-service"}) writeJSON(w, http.StatusOK, map[string]string{"service": "ai-service"})
case r.Method == http.MethodPost && path == "/api/v1/jobs": case r.Method == http.MethodPost && path == "/api/v1/jobs":
@@ -59,6 +61,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.handleGetJob(w, r, path) s.handleGetJob(w, r, path)
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/retry"): case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/retry"):
s.handleRetryJob(w, r, path) s.handleRetryJob(w, r, path)
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/cancel"):
s.handleCancelJob(w, r, path)
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/complete"): case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/complete"):
s.handleCompleteJob(w, r, path) s.handleCompleteJob(w, r, path)
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/fail"): case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/fail"):
@@ -265,7 +269,7 @@ func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request, path strin
} }
func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path string) { func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path string) {
id, err := jobIDFromPath(path, true) id, err := jobIDFromActionPath(path, "retry")
if err != nil { if err != nil {
writeError(w, http.StatusBadRequest, err.Error()) writeError(w, http.StatusBadRequest, err.Error())
return return
@@ -284,6 +288,26 @@ func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path str
writeJSON(w, http.StatusOK, job) writeJSON(w, http.StatusOK, job)
} }
func (s *Server) handleCancelJob(w http.ResponseWriter, r *http.Request, path string) {
id, err := jobIDFromActionPath(path, "cancel")
if err != nil {
writeError(w, http.StatusBadRequest, err.Error())
return
}
ctx, cancel := contextWithTimeout(r, 8*time.Second)
defer cancel()
job, err := s.store.CancelJob(ctx, id)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
if job == nil {
writeError(w, http.StatusNotFound, "cancellable job not found")
return
}
writeJSON(w, http.StatusOK, job)
}
func (s *Server) handleCompleteJob(w http.ResponseWriter, r *http.Request, path string) { func (s *Server) handleCompleteJob(w http.ResponseWriter, r *http.Request, path string) {
id, err := jobIDFromActionPath(path, "complete") id, err := jobIDFromActionPath(path, "complete")
if err != nil { if err != nil {
@@ -337,7 +361,7 @@ func (s *Server) handleFailJob(w http.ResponseWriter, r *http.Request, path stri
func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) { func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
ctx, cancel := contextWithTimeout(r, 8*time.Second) ctx, cancel := contextWithTimeout(r, 8*time.Second)
defer cancel() defer cancel()
stats, err := s.store.Stats(ctx) stats, err := s.store.Stats(ctx, s.cfg.WorkerLeaseTimeout)
if err != nil { if err != nil {
writeError(w, http.StatusInternalServerError, err.Error()) writeError(w, http.StatusInternalServerError, err.Error())
return return

View File

@@ -38,11 +38,14 @@ type Usage struct {
TotalTokens int `json:"total_tokens"` TotalTokens int `json:"total_tokens"`
} }
const ChatResultSchemaVersion = "ai.chat_result.v1"
type ChatResult struct { type ChatResult struct {
Content string `json:"content"` SchemaVersion string `json:"schema_version"`
Model string `json:"model"` Content string `json:"content"`
Usage *Usage `json:"usage,omitempty"` Model string `json:"model"`
DurationMS int64 `json:"duration_ms"` Usage *Usage `json:"usage,omitempty"`
DurationMS int64 `json:"duration_ms"`
} }
type chatRequest struct { type chatRequest struct {
@@ -137,10 +140,11 @@ func (c *Client) Chat(ctx context.Context, in ChatInput) (*ChatResult, error) {
modelName = c.model modelName = c.model
} }
return &ChatResult{ return &ChatResult{
Content: out.Choices[0].Message.Content, SchemaVersion: ChatResultSchemaVersion,
Model: modelName, Content: out.Choices[0].Message.Content,
Usage: out.Usage, Model: modelName,
DurationMS: duration.Milliseconds(), Usage: out.Usage,
DurationMS: duration.Milliseconds(),
}, nil }, nil
} }

View File

@@ -0,0 +1,43 @@
package llm
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
)
func TestChatResultIncludesSchemaVersion(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/v1/chat/completions" {
t.Fatalf("path = %q, want /v1/chat/completions", r.URL.Path)
}
_ = json.NewEncoder(w).Encode(map[string]any{
"model": "qwen2.5-14b",
"choices": []map[string]any{
{"message": map[string]string{"role": "assistant", "content": `{"ok":true}`}},
},
"usage": map[string]int{
"prompt_tokens": 10,
"completion_tokens": 2,
"total_tokens": 12,
},
})
}))
defer server.Close()
client := New(server.URL, "", "fallback-model", 0)
got, err := client.Chat(t.Context(), ChatInput{User: "test", MaxTokens: 32})
if err != nil {
t.Fatalf("Chat: %v", err)
}
if got.SchemaVersion != ChatResultSchemaVersion {
t.Fatalf("schema_version = %q, want %q", got.SchemaVersion, ChatResultSchemaVersion)
}
if got.Content != `{"ok":true}` {
t.Fatalf("content = %q", got.Content)
}
if got.Usage == nil || got.Usage.TotalTokens != 12 {
t.Fatalf("usage = %#v", got.Usage)
}
}

View File

@@ -119,6 +119,30 @@ type ErrorStat struct {
Last24h int64 `json:"last_24h"` Last24h int64 `json:"last_24h"`
} }
type StageStat struct {
OwnerService string `json:"owner_service"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
Done24h int64 `json:"done_24h"`
AvgDurationSeconds int64 `json:"avg_duration_seconds"`
AvgAttempts int64 `json:"avg_attempts"`
Retried24h int64 `json:"retried_24h"`
}
type BacklogStat struct {
OwnerService string `json:"owner_service"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
Pending int64 `json:"pending"`
Running int64 `json:"running"`
StaleRunning int64 `json:"stale_running"`
OldestPendingAgeSeconds int64 `json:"oldest_pending_age_seconds"`
OldestPendingScheduledAt string `json:"oldest_pending_scheduled_at,omitempty"`
OldestRunningAgeSeconds int64 `json:"oldest_running_age_seconds"`
OldestRunningStartedAt string `json:"oldest_running_started_at,omitempty"`
LastHeartbeatAt string `json:"last_heartbeat_at,omitempty"`
}
type OwnerStat struct { type OwnerStat struct {
OwnerService string `json:"owner_service"` OwnerService string `json:"owner_service"`
TaskType string `json:"task_type"` TaskType string `json:"task_type"`
@@ -128,8 +152,10 @@ type OwnerStat struct {
} }
type Stats struct { type Stats struct {
At time.Time `json:"at"` At time.Time `json:"at"`
Queues []QueueStat `json:"queues"` Queues []QueueStat `json:"queues"`
Owners []OwnerStat `json:"owners,omitempty"` Owners []OwnerStat `json:"owners,omitempty"`
Errors []ErrorStat `json:"errors,omitempty"` Errors []ErrorStat `json:"errors,omitempty"`
Stages []StageStat `json:"stages,omitempty"`
Backlog []BacklogStat `json:"backlog,omitempty"`
} }

View File

@@ -0,0 +1,22 @@
package store
import (
"strings"
"time"
)
type failRetryPolicy struct {
Retryable bool
Delay time.Duration
}
func retryPolicyForError(errorCode string) failRetryPolicy {
switch strings.TrimSpace(errorCode) {
case "provider_unavailable", "model_unavailable", "provider_error", "dependency_error", "timeout", "storage_error", "stale_worker":
return failRetryPolicy{Retryable: true, Delay: 30 * time.Second}
case "bad_response", "transcript_hallucination", "transcript_incomplete", "internal_error", "unknown":
return failRetryPolicy{Retryable: true, Delay: 2 * time.Minute}
default:
return failRetryPolicy{}
}
}

View File

@@ -0,0 +1,45 @@
package store
import (
"testing"
"time"
)
func TestRetryPolicyForError(t *testing.T) {
tests := []struct {
name string
code string
retryable bool
delay time.Duration
}{
{name: "provider unavailable", code: "provider_unavailable", retryable: true, delay: 30 * time.Second},
{name: "model unavailable", code: "model_unavailable", retryable: true, delay: 30 * time.Second},
{name: "provider error", code: "provider_error", retryable: true, delay: 30 * time.Second},
{name: "dependency error", code: "dependency_error", retryable: true, delay: 30 * time.Second},
{name: "timeout", code: "timeout", retryable: true, delay: 30 * time.Second},
{name: "storage", code: "storage_error", retryable: true, delay: 30 * time.Second},
{name: "stale worker", code: "stale_worker", retryable: true, delay: 30 * time.Second},
{name: "bad response", code: "bad_response", retryable: true, delay: 2 * time.Minute},
{name: "transcript hallucination", code: "transcript_hallucination", retryable: true, delay: 2 * time.Minute},
{name: "transcript incomplete", code: "transcript_incomplete", retryable: true, delay: 2 * time.Minute},
{name: "internal error", code: "internal_error", retryable: true, delay: 2 * time.Minute},
{name: "unknown", code: "unknown", retryable: true, delay: 2 * time.Minute},
{name: "bad audio", code: "bad_audio"},
{name: "bad input", code: "bad_input"},
{name: "context length", code: "context_length"},
{name: "unsupported task", code: "unsupported_task"},
{name: "cancelled", code: "cancelled"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := retryPolicyForError(tt.code)
if got.Retryable != tt.retryable {
t.Fatalf("Retryable = %v, want %v", got.Retryable, tt.retryable)
}
if got.Delay != tt.delay {
t.Fatalf("Delay = %s, want %s", got.Delay, tt.delay)
}
})
}
}

View File

@@ -41,17 +41,48 @@ func Open(ctx context.Context, databaseURL string) (*Store, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("parse database url: %w", err) return nil, fmt.Errorf("parse database url: %w", err)
} }
pool, err := pgxpool.NewWithConfig(ctx, cfg) pool, err := connectWithRetry(ctx, cfg, 2*time.Minute)
if err != nil { if err != nil {
return nil, fmt.Errorf("connect postgres: %w", err) return nil, err
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("ping postgres: %w", err)
} }
return &Store{pool: pool}, nil return &Store{pool: pool}, nil
} }
func connectWithRetry(ctx context.Context, cfg *pgxpool.Config, maxWait time.Duration) (*pgxpool.Pool, error) {
deadline := time.Now().Add(maxWait)
var lastErr error
for attempt := 1; ; attempt++ {
pool, err := pgxpool.NewWithConfig(ctx, cfg)
if err == nil {
if pingErr := pool.Ping(ctx); pingErr == nil {
return pool, nil
} else {
err = fmt.Errorf("ping postgres: %w", pingErr)
pool.Close()
}
} else {
err = fmt.Errorf("connect postgres: %w", err)
}
lastErr = err
if time.Now().After(deadline) {
return nil, fmt.Errorf("connect postgres after retry: %w", lastErr)
}
sleep := time.Duration(attempt) * time.Second
if sleep > 5*time.Second {
sleep = 5 * time.Second
}
timer := time.NewTimer(sleep)
select {
case <-ctx.Done():
timer.Stop()
return nil, fmt.Errorf("connect postgres cancelled: %w", ctx.Err())
case <-timer.C:
}
}
}
func (s *Store) Close() { func (s *Store) Close() {
s.pool.Close() s.pool.Close()
} }
@@ -78,7 +109,33 @@ INSERT INTO ai_jobs (
) )
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL
DO UPDATE SET updated_at = ai_jobs.updated_at DO UPDATE SET
status = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN 'pending' ELSE ai_jobs.status END,
priority = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.priority ELSE ai_jobs.priority END,
max_attempts = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.max_attempts ELSE ai_jobs.max_attempts END,
attempts = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN 0 ELSE ai_jobs.attempts END,
input = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.input ELSE ai_jobs.input END,
scheduled_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.scheduled_at ELSE ai_jobs.scheduled_at END,
started_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.started_at END,
completed_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.completed_at END,
worker_id = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.worker_id END,
heartbeat_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.heartbeat_at END,
error_code = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.error_code END,
error_message = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.error_message END,
updated_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NOW() ELSE ai_jobs.updated_at END
RETURNING ` + jobSelectColumns + ` RETURNING ` + jobSelectColumns + `
` `
row := s.pool.QueryRow(ctx, q, row := s.pool.QueryRow(ctx, q,
@@ -106,7 +163,33 @@ INSERT INTO ai_jobs (
) )
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL
DO UPDATE SET updated_at = ai_jobs.updated_at DO UPDATE SET
status = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN 'pending' ELSE ai_jobs.status END,
priority = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.priority ELSE ai_jobs.priority END,
max_attempts = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.max_attempts ELSE ai_jobs.max_attempts END,
attempts = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN 0 ELSE ai_jobs.attempts END,
input = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.input ELSE ai_jobs.input END,
scheduled_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.scheduled_at ELSE ai_jobs.scheduled_at END,
started_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.started_at END,
completed_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.completed_at END,
worker_id = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.worker_id END,
heartbeat_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.heartbeat_at END,
error_code = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.error_code END,
error_message = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.error_message END,
updated_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NOW() ELSE ai_jobs.updated_at END
RETURNING ` + jobSelectColumns + ` RETURNING ` + jobSelectColumns + `
` `
var batch pgx.Batch var batch pgx.Batch
@@ -368,6 +451,28 @@ WHERE j.id = picked.id
return int(tag.RowsAffected()), nil return int(tag.RowsAffected()), nil
} }
func (s *Store) CancelJob(ctx context.Context, id uuid.UUID) (*model.Job, error) {
const q = `
UPDATE ai_jobs
SET status = 'cancelled',
completed_at = NOW(),
worker_id = NULL,
heartbeat_at = NULL,
updated_at = NOW()
WHERE id = $1
AND status IN ('pending', 'running')
RETURNING id, owner_service, owner_ref, task_type, model_profile, priority, status,
attempts, max_attempts, input, result, error_code, error_message,
scheduled_at, started_at, completed_at, worker_id, heartbeat_at,
created_at, updated_at, idempotency_key
`
job, err := scanJob(s.pool.QueryRow(ctx, q, id))
if errors.Is(err, pgx.ErrNoRows) {
return nil, nil
}
return job, err
}
func (s *Store) CancelJobs(ctx context.Context, filter model.JobFilter) (int, error) { func (s *Store) CancelJobs(ctx context.Context, filter model.JobFilter) (int, error) {
normalizeFilter(&filter) normalizeFilter(&filter)
const q = ` const q = `
@@ -490,25 +595,47 @@ RETURNING ` + jobSelectColumns + `
return job, err return job, err
} }
func (s *Store) HeartbeatJob(ctx context.Context, id uuid.UUID) error {
const q = `
UPDATE ai_jobs
SET heartbeat_at = NOW(),
updated_at = NOW()
WHERE id = $1
AND status = 'running'
`
tag, err := s.pool.Exec(ctx, q, id)
if err != nil {
return err
}
if tag.RowsAffected() == 0 {
return nil
}
return nil
}
func (s *Store) FailJob(ctx context.Context, id uuid.UUID, in model.FailJob) (*model.Job, error) { func (s *Store) FailJob(ctx context.Context, id uuid.UUID, in model.FailJob) (*model.Job, error) {
errorCode := strings.TrimSpace(in.ErrorCode) errorCode := strings.TrimSpace(in.ErrorCode)
if errorCode == "" { if errorCode == "" {
errorCode = "unknown" errorCode = "unknown"
} }
errorMessage := strings.TrimSpace(in.ErrorMessage) errorMessage := strings.TrimSpace(in.ErrorMessage)
policy := retryPolicyForError(errorCode)
const q = ` const q = `
UPDATE ai_jobs UPDATE ai_jobs
SET status = 'failed', SET status = CASE WHEN $4 AND attempts < max_attempts THEN 'pending' ELSE 'failed' END,
error_code = $2, error_code = $2,
error_message = $3, error_message = $3,
completed_at = NOW(), scheduled_at = CASE WHEN $4 AND attempts < max_attempts THEN NOW() + make_interval(secs => $5) ELSE scheduled_at END,
heartbeat_at = NOW(), started_at = CASE WHEN $4 AND attempts < max_attempts THEN NULL ELSE started_at END,
completed_at = CASE WHEN $4 AND attempts < max_attempts THEN NULL ELSE NOW() END,
worker_id = NULL,
heartbeat_at = CASE WHEN $4 AND attempts < max_attempts THEN NULL ELSE NOW() END,
updated_at = NOW() updated_at = NOW()
WHERE id = $1 WHERE id = $1
AND status = 'running' AND status = 'running'
RETURNING ` + jobSelectColumns + ` RETURNING ` + jobSelectColumns + `
` `
job, err := scanJob(s.pool.QueryRow(ctx, q, id, errorCode, errorMessage)) job, err := scanJob(s.pool.QueryRow(ctx, q, id, errorCode, errorMessage, policy.Retryable, int(policy.Delay.Seconds())))
if errors.Is(err, pgx.ErrNoRows) { if errors.Is(err, pgx.ErrNoRows) {
return nil, nil return nil, nil
} }
@@ -553,7 +680,10 @@ WHERE j.id = picked.id
return int(tag.RowsAffected()), nil return int(tag.RowsAffected()), nil
} }
func (s *Store) Stats(ctx context.Context) (*model.Stats, error) { func (s *Store) Stats(ctx context.Context, staleAfter time.Duration) (*model.Stats, error) {
if staleAfter <= 0 {
staleAfter = 15 * time.Minute
}
out := &model.Stats{At: time.Now().UTC()} out := &model.Stats{At: time.Now().UTC()}
queueRows, err := s.pool.Query(ctx, ` queueRows, err := s.pool.Query(ctx, `
@@ -618,7 +748,105 @@ ORDER BY owner_service, last_24h DESC, total DESC
} }
out.Errors = append(out.Errors, stat) out.Errors = append(out.Errors, stat)
} }
return out, errorRows.Err() if err := errorRows.Err(); err != nil {
return nil, err
}
stageRows, err := s.pool.Query(ctx, `
SELECT owner_service,
task_type,
model_profile,
count(*) AS done_24h,
COALESCE(ROUND(AVG(EXTRACT(EPOCH FROM (completed_at - started_at))))::bigint, 0) AS avg_duration_seconds,
COALESCE(ROUND(AVG(attempts))::bigint, 0) AS avg_attempts,
count(*) FILTER (WHERE attempts > 1) AS retried_24h
FROM ai_jobs
WHERE status = 'done'
AND started_at IS NOT NULL
AND completed_at IS NOT NULL
AND completed_at > NOW() - INTERVAL '24 hours'
GROUP BY owner_service, task_type, model_profile
ORDER BY owner_service, task_type, model_profile
`)
if err != nil {
return nil, err
}
defer stageRows.Close()
for stageRows.Next() {
var stat model.StageStat
if err := stageRows.Scan(
&stat.OwnerService,
&stat.TaskType,
&stat.ModelProfile,
&stat.Done24h,
&stat.AvgDurationSeconds,
&stat.AvgAttempts,
&stat.Retried24h,
); err != nil {
return nil, err
}
out.Stages = append(out.Stages, stat)
}
if err := stageRows.Err(); err != nil {
return nil, err
}
backlogRows, err := s.pool.Query(ctx, `
SELECT owner_service,
task_type,
model_profile,
count(*) FILTER (WHERE status = 'pending') AS pending,
count(*) FILTER (WHERE status = 'running') AS running,
count(*) FILTER (
WHERE status = 'running'
AND COALESCE(heartbeat_at, started_at, updated_at) < NOW() - make_interval(secs => $1)
) AS stale_running,
COALESCE(EXTRACT(EPOCH FROM (NOW() - MIN(scheduled_at) FILTER (WHERE status = 'pending')))::bigint, 0) AS oldest_pending_age_seconds,
MIN(scheduled_at) FILTER (WHERE status = 'pending') AS oldest_pending_scheduled_at,
COALESCE(EXTRACT(EPOCH FROM (NOW() - MIN(started_at) FILTER (WHERE status = 'running')))::bigint, 0) AS oldest_running_age_seconds,
MIN(started_at) FILTER (WHERE status = 'running') AS oldest_running_started_at,
MAX(heartbeat_at) FILTER (WHERE status = 'running') AS last_heartbeat_at
FROM ai_jobs
WHERE status IN ('pending', 'running')
GROUP BY owner_service, task_type, model_profile
ORDER BY stale_running DESC, pending DESC, running DESC, owner_service, task_type, model_profile
`, int(staleAfter.Seconds()))
if err != nil {
return nil, err
}
defer backlogRows.Close()
for backlogRows.Next() {
var stat model.BacklogStat
var oldestPendingScheduledAt *time.Time
var oldestRunningStartedAt *time.Time
var lastHeartbeatAt *time.Time
if err := backlogRows.Scan(
&stat.OwnerService,
&stat.TaskType,
&stat.ModelProfile,
&stat.Pending,
&stat.Running,
&stat.StaleRunning,
&stat.OldestPendingAgeSeconds,
&oldestPendingScheduledAt,
&stat.OldestRunningAgeSeconds,
&oldestRunningStartedAt,
&lastHeartbeatAt,
); err != nil {
return nil, err
}
if oldestPendingScheduledAt != nil {
stat.OldestPendingScheduledAt = oldestPendingScheduledAt.UTC().Format(time.RFC3339)
}
if oldestRunningStartedAt != nil {
stat.OldestRunningStartedAt = oldestRunningStartedAt.UTC().Format(time.RFC3339)
}
if lastHeartbeatAt != nil {
stat.LastHeartbeatAt = lastHeartbeatAt.UTC().Format(time.RFC3339)
}
out.Backlog = append(out.Backlog, stat)
}
return out, backlogRows.Err()
} }
func scanJobSummary(row pgx.Row) (*model.JobSummary, error) { func scanJobSummary(row pgx.Row) (*model.JobSummary, error) {

View File

@@ -4,22 +4,44 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"mime/multipart" "mime/multipart"
"net/http" "net/http"
"os"
"os/exec"
"path/filepath" "path/filepath"
"regexp"
"strings" "strings"
"time" "time"
) )
type Client struct { type Client struct {
baseURL string provider ProviderConfig
http *http.Client http *http.Client
ffmpegPath string }
leadSilence time.Duration
const (
ProviderWhisperLargeV3 = "whisper-large-v3"
defaultWhisperModel = "openai/whisper-large-v3"
)
var speakerLabelPattern = regexp.MustCompile(`(?i)(?:^|[\n\r ]+)((?:speaker|спикер|говорящий)\s*\d+)\s*[:-]`)
type Options struct {
AudioBaseURL string
AudioAPIKey string
AudioModel string
AudioTimeout time.Duration
AudioPrompt string
}
type ProviderConfig struct {
Name string
BaseURL string
APIKey string
Model string
Timeout time.Duration
Prompt string
} }
type Input struct { type Input struct {
@@ -38,50 +60,107 @@ type Segment struct {
Speaker string `json:"speaker,omitempty"` Speaker string `json:"speaker,omitempty"`
} }
const ResultSchemaVersion = "ai.transcription_result.v1"
type Result struct { type Result struct {
Language string `json:"language"` SchemaVersion string `json:"schema_version"`
Segments []Segment `json:"segments"` Provider string `json:"provider,omitempty"`
DiarizeError *string `json:"diarize_error,omitempty"` Model string `json:"model,omitempty"`
AlignError *string `json:"align_error,omitempty"` Attempts []Attempt `json:"attempts,omitempty"`
DurationMS int64 `json:"duration_ms"` Language string `json:"language"`
Segments []Segment `json:"segments"`
DiarizeError *string `json:"diarize_error,omitempty"`
AlignError *string `json:"align_error,omitempty"`
DurationMS int64 `json:"duration_ms"`
} }
type whisperResponse struct { type Attempt struct {
Language string `json:"language"` Provider string `json:"provider"`
Segments []Segment `json:"segments"` Model string `json:"model,omitempty"`
DiarizeError *string `json:"diarize_error,omitempty"` Status string `json:"status"`
AlignError *string `json:"align_error,omitempty"` Error string `json:"error,omitempty"`
Text string `json:"text,omitempty"`
Segments []Segment `json:"segments,omitempty"`
DurationMS int64 `json:"duration_ms,omitempty"`
}
type audioLLMResponse struct {
Text string
Model string
Language string
Segments []Segment
}
type audioTranscriptionResponse struct {
Text string `json:"text"`
Language string `json:"language,omitempty"`
Model string `json:"model,omitempty"`
Segments []audioTranscriptionSegment `json:"segments,omitempty"`
Error *struct {
Message string `json:"message"`
} `json:"error,omitempty"`
}
type audioTranscriptionSegment struct {
Start float64 `json:"start"`
End float64 `json:"end"`
Text string `json:"text"`
}
type audioTranscriptionStatusError struct {
status int
body string
}
func (e audioTranscriptionStatusError) Error() string {
return fmt.Sprintf("audio transcription HTTP %d: %s", e.status, e.body)
} }
func New(baseURL string, timeout time.Duration, ffmpegPath string, leadSilence time.Duration) *Client { func New(baseURL string, timeout time.Duration, ffmpegPath string, leadSilence time.Duration) *Client {
baseURL = strings.TrimRight(strings.TrimSpace(baseURL), "/") return NewWithOptions(Options{
if baseURL == "" { AudioBaseURL: baseURL,
AudioTimeout: timeout,
})
}
func NewWithOptions(opts Options) *Client {
audioPrompt := strings.TrimSpace(opts.AudioPrompt)
provider := buildAudioProvider(opts, audioPrompt)
if provider.BaseURL == "" {
return nil return nil
} }
if timeout <= 0 {
timeout = 10 * time.Minute
}
if leadSilence < 0 {
leadSilence = 0
}
if leadSilence > 5*time.Second {
leadSilence = 5 * time.Second
}
ffmpegPath = strings.TrimSpace(ffmpegPath)
if ffmpegPath == "" {
ffmpegPath = "ffmpeg"
}
return &Client{ return &Client{
baseURL: baseURL, provider: provider,
http: &http.Client{Timeout: timeout}, http: &http.Client{Timeout: provider.Timeout},
ffmpegPath: ffmpegPath,
leadSilence: leadSilence,
} }
} }
func buildAudioProvider(opts Options, prompt string) ProviderConfig {
baseURL := strings.TrimRight(strings.TrimSpace(opts.AudioBaseURL), "/")
if baseURL == "" {
return ProviderConfig{}
}
model := firstNonEmpty(opts.AudioModel, defaultWhisperModel)
return ProviderConfig{
Name: ProviderWhisperLargeV3,
BaseURL: baseURL,
APIKey: strings.TrimSpace(opts.AudioAPIKey),
Model: model,
Timeout: defaultDuration(opts.AudioTimeout, 10*time.Minute),
Prompt: prompt,
}
}
func defaultDuration(v, fallback time.Duration) time.Duration {
if v <= 0 {
return fallback
}
return v
}
func (c *Client) Transcribe(ctx context.Context, in Input) (*Result, error) { func (c *Client) Transcribe(ctx context.Context, in Input) (*Result, error) {
if c == nil || c.baseURL == "" { if c == nil || c.provider.BaseURL == "" {
return nil, fmt.Errorf("whisperx not configured") return nil, fmt.Errorf("audio transcription provider not configured")
} }
if strings.TrimSpace(in.AudioURL) == "" { if strings.TrimSpace(in.AudioURL) == "" {
return nil, fmt.Errorf("audio_url is required") return nil, fmt.Errorf("audio_url is required")
@@ -90,24 +169,46 @@ func (c *Client) Transcribe(ctx context.Context, in Input) (*Result, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if c.leadSilence > 0 { result, attempt, err := c.transcribeWithProvider(ctx, c.provider, audio, filename, in)
audio, filename, err = c.addLeadSilence(ctx, audio, filename)
if err != nil {
return nil, err
}
}
resp, duration, err := c.transcribeAudio(ctx, audio, filename, in)
if err != nil { if err != nil {
return nil, err return nil, err
} }
segments := adjustLeadSilence(resp.Segments, c.leadSilence) result.Attempts = []Attempt{attempt}
return result, nil
}
func (c *Client) transcribeWithProvider(ctx context.Context, provider ProviderConfig, audio []byte, filename string, in Input) (*Result, Attempt, error) {
providerCtx := ctx
cancel := func() {}
if provider.Timeout > 0 {
providerCtx, cancel = context.WithTimeout(ctx, provider.Timeout)
}
defer cancel()
attempt := Attempt{
Provider: provider.Name,
Model: provider.Model,
Status: "failed",
}
resp, duration, err := c.transcribeOpenAIAudio(providerCtx, provider, audio, filename, in)
attempt.DurationMS = duration.Milliseconds()
if err != nil {
attempt.Error = err.Error()
return nil, attempt, err
}
text := strings.TrimSpace(resp.Text)
segments := normalizeAudioLLMSegments(resp.Segments, text, in.Diarize)
attempt.Status = "ok"
attempt.Model = resp.Model
attempt.Text = text
attempt.Segments = segments
return &Result{ return &Result{
Language: resp.Language, SchemaVersion: ResultSchemaVersion,
Segments: segments, Provider: provider.Name,
DiarizeError: resp.DiarizeError, Model: resp.Model,
AlignError: resp.AlignError, Language: firstNonEmpty(resp.Language, in.Language, "unknown"),
DurationMS: duration.Milliseconds(), Segments: segments,
}, nil DurationMS: duration.Milliseconds(),
}, attempt, nil
} }
func (c *Client) downloadAudio(ctx context.Context, in Input) ([]byte, string, error) { func (c *Client) downloadAudio(ctx context.Context, in Input) ([]byte, string, error) {
@@ -138,83 +239,6 @@ func (c *Client) downloadAudio(ctx context.Context, in Input) ([]byte, string, e
return audio, filename, nil return audio, filename, nil
} }
func (c *Client) addLeadSilence(ctx context.Context, audio []byte, filename string) ([]byte, string, error) {
tmpDir, err := os.MkdirTemp("", "ai-transcribe-*")
if err != nil {
return nil, "", fmt.Errorf("prepare audio temp dir: %w", err)
}
defer os.RemoveAll(tmpDir)
inputPath := filepath.Join(tmpDir, "input"+safeExt(filename))
outputPath := filepath.Join(tmpDir, "padded.mp3")
if err := os.WriteFile(inputPath, audio, 0o600); err != nil {
return nil, "", fmt.Errorf("write audio temp file: %w", err)
}
delayMS := int(c.leadSilence.Milliseconds())
if delayMS <= 0 {
return audio, filename, nil
}
cmd := exec.CommandContext(ctx, c.ffmpegPath,
"-nostdin", "-y",
"-i", inputPath,
"-af", fmt.Sprintf("adelay=%d:all=1", delayMS),
"-codec:a", "libmp3lame",
"-qscale:a", "5",
outputPath,
)
out, err := cmd.CombinedOutput()
if err != nil {
return nil, "", fmt.Errorf("ffmpeg lead silence: %w (%s)", err, trimOutput(out))
}
padded, err := os.ReadFile(outputPath)
if err != nil {
return nil, "", fmt.Errorf("read padded audio: %w", err)
}
if len(padded) == 0 {
return nil, "", fmt.Errorf("padded audio is empty")
}
base := strings.TrimSuffix(filepath.Base(filename), filepath.Ext(filename))
if base == "" || base == "." || base == "/" {
base = "audio"
}
return padded, base + "-padded.mp3", nil
}
func safeExt(filename string) string {
ext := strings.ToLower(filepath.Ext(filename))
switch ext {
case ".mp3", ".wav", ".m4a", ".ogg", ".opus", ".webm":
return ext
default:
return ".audio"
}
}
func trimOutput(out []byte) string {
s := strings.TrimSpace(string(out))
if len(s) > 600 {
return s[:600]
}
return s
}
func adjustLeadSilence(segments []Segment, silence time.Duration) []Segment {
if len(segments) == 0 || silence <= 0 {
return segments
}
shift := silence.Seconds()
out := make([]Segment, 0, len(segments))
for _, segment := range segments {
segment.Start = clampTime(segment.Start - shift)
segment.End = clampTime(segment.End - shift)
if segment.End < segment.Start {
segment.End = segment.Start
}
out = append(out, segment)
}
return out
}
func clampTime(v float64) float64 { func clampTime(v float64) float64 {
if v < 0 { if v < 0 {
return 0 return 0
@@ -222,54 +246,267 @@ func clampTime(v float64) float64 {
return v return v
} }
func (c *Client) transcribeAudio(ctx context.Context, audio []byte, filename string, in Input) (*whisperResponse, time.Duration, error) { func (c *Client) transcribeOpenAIAudio(ctx context.Context, provider ProviderConfig, audio []byte, filename string, in Input) (*audioLLMResponse, time.Duration, error) {
resp, duration, err := c.doOpenAIAudioTranscription(ctx, provider, audio, filename, in, "verbose_json")
if err == nil {
return resp, duration, nil
}
if !isVerboseJSONUnsupported(err) {
return nil, duration, err
}
fallbackResp, fallbackDuration, fallbackErr := c.doOpenAIAudioTranscription(ctx, provider, audio, filename, in, "json")
return fallbackResp, duration + fallbackDuration, fallbackErr
}
func (c *Client) doOpenAIAudioTranscription(ctx context.Context, provider ProviderConfig, audio []byte, filename string, in Input, responseFormat string) (*audioLLMResponse, time.Duration, error) {
body := &bytes.Buffer{} body := &bytes.Buffer{}
mw := multipart.NewWriter(body) mw := multipart.NewWriter(body)
if err := mw.WriteField("model", provider.Model); err != nil {
return nil, 0, fmt.Errorf("audio transcription model field: %w", err)
}
if err := mw.WriteField("response_format", responseFormat); err != nil {
return nil, 0, fmt.Errorf("audio transcription response_format field: %w", err)
}
if err := mw.WriteField("temperature", "0"); err != nil {
return nil, 0, fmt.Errorf("audio transcription temperature field: %w", err)
}
if responseFormat == "verbose_json" {
if err := mw.WriteField("timestamp_granularities[]", "segment"); err != nil {
return nil, 0, fmt.Errorf("audio transcription timestamp field: %w", err)
}
}
if prompt := strings.TrimSpace(provider.Prompt); prompt != "" {
if err := mw.WriteField("prompt", prompt); err != nil {
return nil, 0, fmt.Errorf("audio transcription prompt field: %w", err)
}
}
if lang := strings.TrimSpace(in.Language); lang != "" {
if err := mw.WriteField("language", lang); err != nil {
return nil, 0, fmt.Errorf("audio transcription language field: %w", err)
}
}
fw, err := mw.CreateFormFile("file", filename) fw, err := mw.CreateFormFile("file", filename)
if err != nil { if err != nil {
return nil, 0, fmt.Errorf("create form file: %w", err) return nil, 0, fmt.Errorf("audio transcription create file: %w", err)
} }
if _, err := fw.Write(audio); err != nil { if _, err := fw.Write(audio); err != nil {
return nil, 0, fmt.Errorf("copy audio: %w", err) return nil, 0, fmt.Errorf("audio transcription copy audio: %w", err)
}
if in.Language != "" {
_ = mw.WriteField("language", in.Language)
}
if in.Diarize {
_ = mw.WriteField("diarize", "true")
if in.MinSpeakers > 0 {
_ = mw.WriteField("min_speakers", fmt.Sprintf("%d", in.MinSpeakers))
}
if in.MaxSpeakers > 0 {
_ = mw.WriteField("max_speakers", fmt.Sprintf("%d", in.MaxSpeakers))
}
} else {
_ = mw.WriteField("diarize", "false")
} }
if err := mw.Close(); err != nil { if err := mw.Close(); err != nil {
return nil, 0, fmt.Errorf("close form: %w", err) return nil, 0, fmt.Errorf("audio transcription close form: %w", err)
} }
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/transcribe", body) req, err := http.NewRequestWithContext(ctx, http.MethodPost, provider.BaseURL+"/v1/audio/transcriptions", body)
if err != nil { if err != nil {
return nil, 0, fmt.Errorf("whisperx request: %w", err) return nil, 0, fmt.Errorf("audio transcription request: %w", err)
} }
req.Header.Set("Content-Type", mw.FormDataContentType()) req.Header.Set("Content-Type", mw.FormDataContentType())
if provider.APIKey != "" {
req.Header.Set("Authorization", "Bearer "+provider.APIKey)
}
start := time.Now() start := time.Now()
resp, err := c.http.Do(req) resp, err := c.http.Do(req)
duration := time.Since(start) duration := time.Since(start)
if err != nil { if err != nil {
return nil, duration, fmt.Errorf("whisperx do: %w", err) return nil, duration, fmt.Errorf("audio transcription do: %w", err)
} }
defer resp.Body.Close() defer resp.Body.Close()
raw, err := io.ReadAll(io.LimitReader(resp.Body, 4<<20))
if err != nil {
return nil, duration, fmt.Errorf("audio transcription read: %w", err)
}
if resp.StatusCode >= 300 { if resp.StatusCode >= 300 {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) return nil, duration, audioTranscriptionStatusError{status: resp.StatusCode, body: strings.TrimSpace(string(raw))}
return nil, duration, fmt.Errorf("whisperx HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
} }
var out whisperResponse var out audioTranscriptionResponse
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { if err := json.Unmarshal(raw, &out); err != nil {
return nil, duration, fmt.Errorf("whisperx decode: %w", err) return nil, duration, fmt.Errorf("audio transcription decode: %w", err)
} }
return &out, duration, nil if out.Error != nil {
return nil, duration, fmt.Errorf("audio transcription error: %s", out.Error.Message)
}
modelName := firstNonEmpty(out.Model, provider.Model)
return &audioLLMResponse{
Text: strings.TrimSpace(out.Text),
Model: modelName,
Language: out.Language,
Segments: convertAudioSegments(out.Segments),
}, duration, nil
}
func isVerboseJSONUnsupported(err error) bool {
var statusErr audioTranscriptionStatusError
if !errors.As(err, &statusErr) {
return false
}
if statusErr.status != http.StatusBadRequest && statusErr.status != http.StatusUnprocessableEntity {
return false
}
body := strings.ToLower(statusErr.body)
return strings.Contains(body, "verbose_json") ||
strings.Contains(body, "response_format") ||
strings.Contains(body, "timestamp_granularities")
}
func convertAudioSegments(in []audioTranscriptionSegment) []Segment {
out := make([]Segment, 0, len(in))
for _, s := range in {
text := strings.TrimSpace(s.Text)
if text == "" {
continue
}
end := s.End
if end < s.Start {
end = s.Start
}
out = append(out, Segment{Start: clampTime(s.Start), End: clampTime(end), Text: text})
}
return out
}
func normalizeAudioLLMSegments(segments []Segment, text string, diarize bool) []Segment {
text = strings.TrimSpace(text)
if text != "" {
if labeled := segmentSpeakerLabeledText(text); len(labeled) > 0 {
return labeled
}
}
if len(segments) <= 1 && text != "" {
heuristic := segmentTranscriptText(text)
if len(heuristic) > len(segments) {
segments = heuristic
}
}
return segments
}
func segmentSpeakerLabeledText(text string) []Segment {
matches := speakerLabelPattern.FindAllStringSubmatchIndex(text, -1)
if len(matches) == 0 {
return nil
}
speakerIDs := map[string]string{}
var out []Segment
var t float64
for i, match := range matches {
label := strings.ToLower(strings.TrimSpace(text[match[2]:match[3]]))
speaker, ok := speakerIDs[label]
if !ok {
speaker = fmt.Sprintf("SPEAKER_%02d", len(speakerIDs))
speakerIDs[label] = speaker
}
start := match[1]
end := len(text)
if i+1 < len(matches) {
end = matches[i+1][0]
}
part := strings.TrimSpace(text[start:end])
part = strings.Trim(part, ":-— ")
if part == "" {
continue
}
words := len(strings.Fields(part))
duration := float64(words) * 0.42
if duration < 1.2 {
duration = 1.2
}
out = append(out, Segment{Start: t, End: t + duration, Text: part, Speaker: speaker})
t += duration
}
return out
}
func segmentTranscriptText(text string) []Segment {
parts := splitTranscriptSentences(text)
out := make([]Segment, 0, len(parts))
var t float64
for _, part := range parts {
words := len(strings.Fields(part))
if words == 0 {
continue
}
duration := float64(words) * 0.42
if duration < 1.2 {
duration = 1.2
}
segment := Segment{Start: t, End: t + duration, Text: part}
out = append(out, segment)
t = segment.End
}
if len(out) == 0 && strings.TrimSpace(text) != "" {
out = append(out, Segment{Start: 0, End: 0, Text: strings.TrimSpace(text)})
}
return out
}
func splitTranscriptSentences(text string) []string {
text = strings.Join(strings.Fields(text), " ")
if text == "" {
return nil
}
var out []string
start := 0
runes := []rune(text)
for i, r := range runes {
if r != '.' && r != '!' && r != '?' && r != '…' {
continue
}
next := i + 1
if next < len(runes) && runes[next] != ' ' {
continue
}
part := strings.TrimSpace(string(runes[start : i+1]))
if part != "" {
out = append(out, part)
}
start = i + 1
for start < len(runes) && runes[start] == ' ' {
start++
}
}
tail := strings.TrimSpace(string(runes[start:]))
if tail != "" {
out = append(out, tail)
}
return mergeShortSegments(out, 8, 34)
}
func mergeShortSegments(parts []string, minWords, maxWords int) []string {
if len(parts) <= 1 {
return parts
}
out := make([]string, 0, len(parts))
var current []string
currentWords := 0
flush := func() {
if len(current) == 0 {
return
}
out = append(out, strings.Join(current, " "))
current = nil
currentWords = 0
}
for _, part := range parts {
words := len(strings.Fields(part))
if currentWords > 0 && currentWords+words > maxWords {
flush()
}
current = append(current, part)
currentWords += words
if currentWords >= minWords {
flush()
}
}
flush()
return out
}
func firstNonEmpty(values ...string) string {
for _, value := range values {
if strings.TrimSpace(value) != "" {
return value
}
}
return ""
} }

View File

@@ -1,28 +1,183 @@
package transcription package transcription
import ( import (
"math" "encoding/json"
"net/http"
"net/http/httptest"
"testing" "testing"
"time"
) )
func TestAdjustLeadSilence(t *testing.T) { func TestNewWithOptionsBuildsWhisperProvider(t *testing.T) {
got := adjustLeadSilence([]Segment{ client := NewWithOptions(Options{
{Start: 0.2, End: 1.1, Text: "first"}, AudioBaseURL: "http://whisper",
{Start: 1.4, End: 2.0, Text: "second"}, })
}, 800*time.Millisecond) if client == nil {
t.Fatal("client is nil")
if got[0].Start != 0 {
t.Fatalf("first start = %v, want 0", got[0].Start)
} }
if !near(got[0].End, 0.3) { if client.provider.Name != ProviderWhisperLargeV3 {
t.Fatalf("first end = %v, want 0.3", got[0].End) t.Fatalf("provider = %q, want %q", client.provider.Name, ProviderWhisperLargeV3)
} }
if !near(got[1].Start, 0.6) { if client.provider.Model != "openai/whisper-large-v3" {
t.Fatalf("second start = %v, want 0.6", got[1].Start) t.Fatalf("model = %q", client.provider.Model)
} }
} }
func near(got, want float64) bool { func TestWhisperUsesAudioTranscriptionsEndpoint(t *testing.T) {
return math.Abs(got-want) < 0.000001 audioSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte("fake audio"))
}))
defer audioSrv.Close()
var gotPath, gotModel, gotResponseFormat, gotPrompt, gotTemperature, gotTimestampGranularity string
providerSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gotPath = r.URL.Path
if err := r.ParseMultipartForm(16 << 20); err != nil {
t.Fatalf("ParseMultipartForm: %v", err)
}
gotModel = r.FormValue("model")
gotResponseFormat = r.FormValue("response_format")
gotPrompt = r.FormValue("prompt")
gotTemperature = r.FormValue("temperature")
gotTimestampGranularity = r.FormValue("timestamp_granularities[]")
if _, _, err := r.FormFile("file"); err != nil {
t.Fatalf("FormFile: %v", err)
}
_ = json.NewEncoder(w).Encode(map[string]any{
"text": "Алло, тест. Да, слышно.",
"segments": []map[string]any{
{"start": 0, "end": 1.2, "text": "Алло, тест."},
{"start": 1.2, "end": 2.4, "text": "Да, слышно."},
},
})
}))
defer providerSrv.Close()
client := NewWithOptions(Options{
AudioBaseURL: providerSrv.URL,
AudioModel: "openai/whisper-large-v3",
})
if client == nil {
t.Fatal("client is nil")
}
got, err := client.Transcribe(t.Context(), Input{AudioURL: audioSrv.URL, Filename: "call.mp3"})
if err != nil {
t.Fatalf("Transcribe: %v", err)
}
if gotPath != "/v1/audio/transcriptions" {
t.Fatalf("path = %q, want /v1/audio/transcriptions", gotPath)
}
if gotModel != "openai/whisper-large-v3" {
t.Fatalf("model = %q", gotModel)
}
if gotResponseFormat != "verbose_json" {
t.Fatalf("response_format = %q, want verbose_json", gotResponseFormat)
}
if gotTemperature != "0" {
t.Fatalf("temperature = %q, want 0", gotTemperature)
}
if gotTimestampGranularity != "segment" {
t.Fatalf("timestamp_granularities[] = %q, want segment", gotTimestampGranularity)
}
if gotPrompt != "" {
t.Fatalf("prompt = %q, want empty", gotPrompt)
}
if len(got.Segments) != 2 || got.Segments[0].Text != "Алло, тест." || got.Segments[1].Start != 1.2 {
t.Fatalf("segments = %#v", got.Segments)
}
if got.SchemaVersion != ResultSchemaVersion {
t.Fatalf("schema_version = %q, want %q", got.SchemaVersion, ResultSchemaVersion)
}
}
func TestWhisperFallsBackToJSONWhenVerboseJSONUnsupported(t *testing.T) {
audioSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte("fake audio"))
}))
defer audioSrv.Close()
var formats []string
providerSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if err := r.ParseMultipartForm(16 << 20); err != nil {
t.Fatalf("ParseMultipartForm: %v", err)
}
format := r.FormValue("response_format")
formats = append(formats, format)
if format == "verbose_json" {
w.WriteHeader(http.StatusBadRequest)
_ = json.NewEncoder(w).Encode(map[string]any{
"error": map[string]any{"message": "unsupported response_format verbose_json"},
})
return
}
_ = json.NewEncoder(w).Encode(map[string]any{
"text": "Алло, fallback работает.",
})
}))
defer providerSrv.Close()
client := NewWithOptions(Options{
AudioBaseURL: providerSrv.URL,
AudioModel: "openai/whisper-large-v3",
})
got, err := client.Transcribe(t.Context(), Input{AudioURL: audioSrv.URL, Filename: "call.mp3"})
if err != nil {
t.Fatalf("Transcribe: %v", err)
}
if len(formats) != 2 || formats[0] != "verbose_json" || formats[1] != "json" {
t.Fatalf("formats = %#v, want verbose_json then json", formats)
}
if got.Segments[0].Text != "Алло, fallback работает." {
t.Fatalf("segments = %#v", got.Segments)
}
}
func TestSegmentTranscriptTextDoesNotInventSpeakers(t *testing.T) {
got := segmentTranscriptText("Алло, добрый день. Да, слушаю. Скажите, квартира продается? Да, продается.")
if len(got) < 2 {
t.Fatalf("segments = %#v, want multiple", got)
}
if got[0].Speaker != "" || got[1].Speaker != "" {
t.Fatalf("speakers = %q/%q, want empty", got[0].Speaker, got[1].Speaker)
}
if got[1].Start <= got[0].Start {
t.Fatalf("segment times did not advance: %#v", got)
}
}
func TestNormalizeAudioLLMSegmentsSplitsSingleLongSegment(t *testing.T) {
text := "Алло, добрый день. Да, слушаю. Скажите, квартира продается? Да, продается."
got := normalizeAudioLLMSegments([]Segment{{Start: 0, End: 12, Text: text}}, text, true)
if len(got) < 2 {
t.Fatalf("segments = %#v, want heuristic split", got)
}
if got[0].Speaker != "" || got[1].Speaker != "" {
t.Fatalf("speakers = %q/%q, want empty", got[0].Speaker, got[1].Speaker)
}
}
func TestNormalizeAudioLLMSegmentsKeepsSegmentsWithoutFakeSpeakers(t *testing.T) {
got := normalizeAudioLLMSegments([]Segment{
{Start: 0, End: 1, Text: "Алло."},
{Start: 1, End: 2, Text: "Да, слушаю."},
}, "Алло. Да, слушаю.", true)
if len(got) != 2 {
t.Fatalf("segments = %#v", got)
}
if got[0].Speaker != "" || got[1].Speaker != "" {
t.Fatalf("speakers = %q/%q, want empty", got[0].Speaker, got[1].Speaker)
}
}
func TestNormalizeAudioLLMSegmentsUsesExplicitSpeakerLabels(t *testing.T) {
text := "Спикер 1: Алло, добрый день. Спикер 2: Да, слушаю. Спикер 1: Скажите, квартира продается?"
got := normalizeAudioLLMSegments(nil, text, true)
if len(got) != 3 {
t.Fatalf("segments = %#v, want 3", got)
}
if got[0].Speaker != "SPEAKER_00" || got[1].Speaker != "SPEAKER_01" || got[2].Speaker != "SPEAKER_00" {
t.Fatalf("speakers = %q/%q/%q", got[0].Speaker, got[1].Speaker, got[2].Speaker)
}
if got[0].Text != "Алло, добрый день." || got[1].Text != "Да, слушаю." {
t.Fatalf("texts = %#v", got)
}
} }

View File

@@ -20,7 +20,7 @@ const (
TaskCallAnalysis = "call_analysis" TaskCallAnalysis = "call_analysis"
TaskTranscription = "transcription" TaskTranscription = "transcription"
TranscriptionProfile = "whisperx" TranscriptionProfile = "whisper-large-v3"
) )
type Worker struct { type Worker struct {
@@ -113,6 +113,9 @@ func (w *Worker) tick(ctx context.Context) {
} }
func (w *Worker) process(ctx context.Context, job *model.Job) { func (w *Worker) process(ctx context.Context, job *model.Job) {
stopHeartbeat := w.startHeartbeat(ctx, job)
defer stopHeartbeat()
if job.TaskType == TaskTranscription { if job.TaskType == TaskTranscription {
w.processTranscription(ctx, job) w.processTranscription(ctx, job)
return return
@@ -139,7 +142,7 @@ func (w *Worker) process(ctx context.Context, job *model.Job) {
func (w *Worker) processTranscription(ctx context.Context, job *model.Job) { func (w *Worker) processTranscription(ctx context.Context, job *model.Job) {
if w.transcriber == nil { if w.transcriber == nil {
w.fail(ctx, job, "provider_unavailable", "whisperx not configured") w.fail(ctx, job, "provider_unavailable", "transcription providers not configured")
return return
} }
var input transcription.Input var input transcription.Input
@@ -168,6 +171,41 @@ func (w *Worker) fail(ctx context.Context, job *model.Job, code, message string)
} }
} }
func (w *Worker) startHeartbeat(ctx context.Context, job *model.Job) func() {
heartbeatCtx, cancel := context.WithCancel(ctx)
done := make(chan struct{})
ticker := time.NewTicker(w.heartbeatInterval())
go func() {
defer close(done)
defer ticker.Stop()
for {
select {
case <-heartbeatCtx.Done():
return
case <-ticker.C:
if err := w.store.HeartbeatJob(heartbeatCtx, job.ID); err != nil {
slog.Warn("heartbeat job failed", "job_id", job.ID, "error", err)
}
}
}
}()
return func() {
cancel()
<-done
}
}
func (w *Worker) heartbeatInterval() time.Duration {
interval := w.leaseTimeout / 3
if interval < 10*time.Second {
return 10 * time.Second
}
if interval > time.Minute {
return time.Minute
}
return interval
}
func classifyTranscriptionError(err error) string { func classifyTranscriptionError(err error) string {
if err == nil { if err == nil {
return "unknown" return "unknown"
@@ -184,10 +222,12 @@ func classifyTranscriptionError(err error) string {
return "bad_audio" return "bad_audio"
case strings.Contains(s, "audio download") || strings.Contains(s, "audio http 5"): case strings.Contains(s, "audio download") || strings.Contains(s, "audio http 5"):
return "storage_error" return "storage_error"
case strings.Contains(s, "whisperx http 4") || strings.Contains(s, "ffmpeg") || strings.Contains(s, "invalid data") || strings.Contains(s, "could not decode"): case strings.Contains(s, "audio transcription http 4") || strings.Contains(s, "invalid data") || strings.Contains(s, "could not decode"):
return "bad_audio" return "bad_audio"
case strings.Contains(s, "whisperx http 5") || strings.Contains(s, "whisperx do") || strings.Contains(s, "connection refused") || strings.Contains(s, "connection reset") || strings.Contains(s, "closed network connection"): case strings.Contains(s, "audio transcription http 5") || strings.Contains(s, "audio transcription do") || strings.Contains(s, "connection refused") || strings.Contains(s, "connection reset") || strings.Contains(s, "closed network connection"):
return "provider_unavailable" return "provider_unavailable"
case strings.Contains(s, "audio transcription http 4"):
return "bad_input"
case strings.Contains(s, "decode"): case strings.Contains(s, "decode"):
return "bad_response" return "bad_response"
default: default:
@@ -205,6 +245,8 @@ func classifyLLMError(err error) string {
return "timeout" return "timeout"
case strings.Contains(s, "connection refused") || strings.Contains(s, "connection reset") || strings.Contains(s, "no route to host") || strings.Contains(s, "llm http 5"): case strings.Contains(s, "connection refused") || strings.Contains(s, "connection reset") || strings.Contains(s, "no route to host") || strings.Contains(s, "llm http 5"):
return "model_unavailable" return "model_unavailable"
case strings.Contains(s, "maximum context length") || strings.Contains(s, "context length") || strings.Contains(s, "input_tokens"):
return "context_length"
case strings.Contains(s, "llm http 4") || strings.Contains(s, "messages are required"): case strings.Contains(s, "llm http 4") || strings.Contains(s, "messages are required"):
return "bad_input" return "bad_input"
case strings.Contains(s, "llm decode") || strings.Contains(s, "empty choices"): case strings.Contains(s, "llm decode") || strings.Contains(s, "empty choices"):

View File

@@ -0,0 +1,29 @@
package worker
import (
"errors"
"testing"
)
func TestClassifyLLMError(t *testing.T) {
tests := []struct {
name string
err error
want string
}{
{name: "timeout", err: errors.New("context deadline exceeded"), want: "timeout"},
{name: "unavailable", err: errors.New("llm HTTP 500: internal server error"), want: "model_unavailable"},
{name: "context length", err: errors.New("This model's maximum context length is 16384 tokens. input_tokens=16001"), want: "context_length"},
{name: "bad input", err: errors.New("llm HTTP 400: messages are required"), want: "bad_input"},
{name: "bad response", err: errors.New("llm decode: invalid character '<'"), want: "bad_response"},
{name: "unknown", err: errors.New("strange failure"), want: "unknown"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := classifyLLMError(tt.err); got != tt.want {
t.Fatalf("classifyLLMError() = %q, want %q", got, tt.want)
}
})
}
}

View File

@@ -11,10 +11,11 @@ data:
LLM_BASE_URL: "http://10.2.3.5:8002" LLM_BASE_URL: "http://10.2.3.5:8002"
LLM_MODEL: "qwen2.5-14b" LLM_MODEL: "qwen2.5-14b"
LLM_TIMEOUT: "5m" LLM_TIMEOUT: "5m"
WHISPERX_URL: "http://10.2.3.5:8001" # Whisper Large v3 is exposed on the AI server through an OpenAI-compatible
WHISPERX_TIMEOUT: "10m" # /v1/audio/transcriptions endpoint.
WHISPERX_LEAD_SILENCE: "800ms" AUDIO_TRANSCRIPTION_BASE_URL: "http://10.2.3.5:8004"
FFMPEG_PATH: "/usr/bin/ffmpeg" AUDIO_TRANSCRIPTION_MODEL: "openai/whisper-large-v3"
AUDIO_TRANSCRIPTION_TIMEOUT: "30m"
AI_STATS_SIDECAR_URL: "http://10.2.3.5:9090" AI_STATS_SIDECAR_URL: "http://10.2.3.5:9090"
AI_STATS_TIMEOUT: "8s" AI_STATS_TIMEOUT: "8s"
WORKER_POLL_INTERVAL: "2s" WORKER_POLL_INTERVAL: "2s"

View File

@@ -18,4 +18,5 @@ type: Opaque
stringData: stringData:
DATABASE_URL: "postgres://ai_service:ai_service@postgres:5432/ai_service?sslmode=disable" DATABASE_URL: "postgres://ai_service:ai_service@postgres:5432/ai_service?sslmode=disable"
LLM_API_KEY: "sk-111f838ccec43406e078cd9094b6797307cb895236179f32" LLM_API_KEY: "sk-111f838ccec43406e078cd9094b6797307cb895236179f32"
AUDIO_TRANSCRIPTION_API_KEY: "sk-111f838ccec43406e078cd9094b6797307cb895236179f32"
AI_SERVICE_TOKEN: "d18bcacf9e02bae1806ee6b6eeda62b95be6a915c0a22936d9a700128b275442" AI_SERVICE_TOKEN: "d18bcacf9e02bae1806ee6b6eeda62b95be6a915c0a22936d9a700128b275442"

View File

@@ -30,11 +30,81 @@ spec:
fieldRef: fieldRef:
fieldPath: metadata.name fieldPath: metadata.name
- name: WORKER_TASK_TYPES - name: WORKER_TASK_TYPES
value: "llm_chat,chat_completion,call_analysis,telegram_classification" value: "llm_chat,chat_completion,call_analysis,telegram_classification,transcript_summary"
- name: WORKER_MODEL_PROFILES - name: WORKER_MODEL_PROFILES
value: "qwen2.5-14b" value: "qwen2.5-14b"
- name: WORKER_CLAIM_LIMIT - name: WORKER_CLAIM_LIMIT
value: "8" value: "8"
- name: WORKER_LEASE_TIMEOUT
value: "15m"
envFrom:
- configMapRef:
name: ai-service-config
- secretRef:
name: ai-service-secrets
startupProbe:
httpGet:
path: /readyz
port: 8081
periodSeconds: 5
failureThreshold: 30
readinessProbe:
httpGet:
path: /readyz
port: 8081
periodSeconds: 10
livenessProbe:
httpGet:
path: /healthz
port: 8081
periodSeconds: 10
resources:
requests:
cpu: 50m
memory: 96Mi
limits:
cpu: 500m
memory: 384Mi
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: ai-service-analysis-worker
namespace: ai-service
spec:
replicas: 1
selector:
matchLabels:
app: ai-service-analysis-worker
template:
metadata:
labels:
app: ai-service-analysis-worker
spec:
terminationGracePeriodSeconds: 20
hostAliases:
- ip: "77.105.173.42"
hostnames:
- "s3-minio.estateliga.work"
containers:
- name: worker
image: localhost:30300/admin/ai-service:latest
command: ["/usr/local/bin/ai-service-worker"]
ports:
- containerPort: 8081
env:
- name: WORKER_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: WORKER_TASK_TYPES
value: "call_analysis"
- name: WORKER_MODEL_PROFILES
value: "qwen2.5-14b"
- name: WORKER_CLAIM_LIMIT
value: "8"
- name: WORKER_LEASE_TIMEOUT
value: "15m"
envFrom: envFrom:
- configMapRef: - configMapRef:
name: ai-service-config name: ai-service-config
@@ -98,9 +168,11 @@ spec:
- name: WORKER_TASK_TYPES - name: WORKER_TASK_TYPES
value: "transcription" value: "transcription"
- name: WORKER_MODEL_PROFILES - name: WORKER_MODEL_PROFILES
value: "whisperx" value: "whisper-large-v3"
- name: WORKER_CLAIM_LIMIT - name: WORKER_CLAIM_LIMIT
value: "1" value: "4"
- name: WORKER_LEASE_TIMEOUT
value: "15m"
envFrom: envFrom:
- configMapRef: - configMapRef:
name: ai-service-config name: ai-service-config