Compare commits

..

44 Commits

Author SHA1 Message Date
Grendgi
8d06cfabb1 fix: remove unused config helper
All checks were successful
CI / hygiene (push) Successful in 3s
Build and Deploy / build-and-deploy (push) Successful in 28s
CI / test (push) Successful in 22s
2026-06-18 10:28:43 +03:00
Grendgi
b81a8ee6be feat: cancel ai jobs by id
Some checks failed
CI / hygiene (push) Successful in 1s
Build and Deploy / build-and-deploy (push) Successful in 42s
CI / test (push) Failing after 21s
2026-06-17 17:39:29 +03:00
Grendgi
63553fba33 feat: version ai result schemas 2026-06-17 16:46:03 +03:00
Grendgi
f32265400b feat: expand ai retry policy 2026-06-17 16:39:58 +03:00
Grendgi
aad905c2c8 fix: expose ai health component errors 2026-06-17 16:35:47 +03:00
Grendgi
22d85ce646 fix: align ai health status contract 2026-06-17 16:34:44 +03:00
Grendgi
3c124c5f5a feat: expose ai service health detail 2026-06-17 16:31:22 +03:00
Grendgi
773f53f790 Expose AI queue heartbeat metrics
Some checks failed
CI / hygiene (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 28s
CI / test (push) Failing after 7s
2026-06-12 16:50:15 +03:00
Grendgi
abc64214d2 Add AI service CI hygiene guard 2026-06-12 16:42:35 +03:00
Grendgi
e45884c5e5 Retry AI service database connection on startup 2026-06-12 16:28:02 +03:00
Grendgi
c618ffaff9 Update analysis worker deployment
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 15s
2026-06-11 12:20:47 +03:00
Grendgi
92ac01d8b5 Use verbose Whisper transcription output
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 22s
2026-06-11 11:40:52 +03:00
Grendgi
b536877181 Stabilize Whisper transcription requests
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 21s
2026-06-11 10:16:34 +03:00
Grendgi
bc71caa762 Stabilize AI worker leases
Some checks failed
CI / test (push) Failing after 7s
Build and Deploy / build-and-deploy (push) Successful in 15s
2026-06-10 17:50:53 +03:00
Grendgi
d0980007d7 Add dedicated call analysis worker
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 25s
2026-06-10 17:37:21 +03:00
Grendgi
ea632902bb Shorten AI worker lease timeout
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 15s
2026-06-10 17:05:26 +03:00
Grendgi
800d1d7cdd Keep AI jobs alive during processing
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 1m10s
2026-06-10 16:48:58 +03:00
Grendgi
837acf2f00 Add AI queue backlog metrics
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 24s
2026-06-10 16:36:35 +03:00
Grendgi
631a45aff3 Classify LLM context length errors
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 31s
2026-06-10 16:16:57 +03:00
Grendgi
7a1965e402 Increase transcription worker claim limit
Some checks failed
CI / test (push) Failing after 7s
Build and Deploy / build-and-deploy (push) Successful in 16s
2026-06-10 16:11:35 +03:00
Grendgi
3c2f13b967 Add AI job stage metrics
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 34s
2026-06-10 16:07:21 +03:00
Grendgi
2a481fdc54 Add automatic retry policy for AI jobs
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 33s
2026-06-10 15:58:41 +03:00
Grendgi
f54400e8e2 Requeue stale worker jobs on idempotent create
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 21s
2026-06-10 14:42:14 +03:00
Grendgi
e6ae792325 Drop legacy audio config aliases
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 29s
2026-06-10 14:28:52 +03:00
Grendgi
80fa21ff80 Refresh AI service pipeline docs
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 27s
2026-06-10 13:43:59 +03:00
Grendgi
7d0e27f681 Extend LLM worker lease timeout
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 23s
2026-06-10 13:20:06 +03:00
Grendgi
11247f17de Route transcript summary jobs to LLM worker
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 21s
2026-06-10 12:32:22 +03:00
Grendgi
ae1802dab9 Stop sending prompt to Whisper transcription
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 24s
2026-06-10 11:30:08 +03:00
Grendgi
bde56978d6 Fix Whisper large v3 audio compose
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 18s
2026-06-10 10:15:24 +03:00
Grendgi
8d6cd84403 Switch transcription to Whisper large v3
Some checks failed
CI / test (push) Failing after 10s
Build and Deploy / build-and-deploy (push) Successful in 24s
2026-06-10 10:10:13 +03:00
Grendgi
1b63dcdbf5 Increase Voxtral transcription timeouts
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 20s
2026-06-09 22:27:52 +03:00
Grendgi
bf945e05e3 Use Voxtral JSON transcription without fake speakers
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 23s
2026-06-09 22:09:27 +03:00
Grendgi
e074f6b226 Run Voxtral transcription worker with two jobs
Some checks failed
CI / test (push) Failing after 9s
Build and Deploy / build-and-deploy (push) Successful in 19s
2026-06-09 17:16:24 +03:00
Grendgi
9bd6d726f0 Make Voxtral the only transcription provider
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 27s
2026-06-09 16:54:54 +03:00
Grendgi
5c965be8c9 Split single Voxtral transcript segments
All checks were successful
CI / test (push) Successful in 13s
Build and Deploy / build-and-deploy (push) Successful in 23s
2026-06-09 16:20:52 +03:00
Grendgi
64bf40b3ba Segment Voxtral transcripts for telephony
All checks were successful
CI / test (push) Successful in 15s
Build and Deploy / build-and-deploy (push) Successful in 32s
2026-06-09 16:12:57 +03:00
Grendgi
e6c2b46cf6 Use Voxtral audio transcription endpoint
All checks were successful
CI / test (push) Successful in 14s
Build and Deploy / build-and-deploy (push) Successful in 30s
2026-06-09 15:51:50 +03:00
Grendgi
817eb8ff71 Disable inactive WhisperX provider status
All checks were successful
CI / test (push) Successful in 12s
Build and Deploy / build-and-deploy (push) Successful in 18s
2026-06-09 15:38:55 +03:00
Grendgi
94e0d03580 Switch transcription comparison to Voxtral
All checks were successful
CI / test (push) Successful in 13s
Build and Deploy / build-and-deploy (push) Successful in 18s
2026-06-09 15:11:52 +03:00
Grendgi
add15f1385 Cast transcription comparison averages
All checks were successful
CI / test (push) Successful in 14s
Build and Deploy / build-and-deploy (push) Successful in 23s
2026-06-09 15:01:32 +03:00
Grendgi
35c60f0e0e Add transcription comparison stats 2026-06-09 14:59:08 +03:00
Grendgi
88e7c86836 Document vLLM audio URL payloads
All checks were successful
CI / test (push) Successful in 13s
Build and Deploy / build-and-deploy (push) Successful in 16s
2026-06-09 13:49:27 +03:00
Grendgi
1202ebcb7f Use vLLM audio URL payloads
All checks were successful
CI / test (push) Successful in 13s
Build and Deploy / build-and-deploy (push) Successful in 24s
2026-06-09 13:46:28 +03:00
Grendgi
2ef71a822b Add vLLM audio image for transcription models
All checks were successful
CI / test (push) Successful in 13s
Build and Deploy / build-and-deploy (push) Successful in 22s
2026-06-09 13:38:08 +03:00
26 changed files with 1478 additions and 821 deletions

View File

@@ -0,0 +1,35 @@
#!/usr/bin/env bash
set -euo pipefail
fail=0
while IFS= read -r -d '' path; do
base="$(basename "$path")"
case "$base" in
.DS_Store|.env)
echo "::error file=$path::tracked local-only file is forbidden"
fail=1
;;
esac
case "$path" in
*node_modules/*|node_modules/*)
echo "::error file=$path::tracked node_modules content is forbidden"
fail=1
;;
*.tmp|*.temp|*.bak|*.orig|*.rej|*.zip|*.tar|*.tar.gz|*.tgz|*.rar|*.7z)
echo "::error file=$path::tracked temporary/archive artifact is forbidden"
fail=1
;;
esac
if [ -f "$path" ]; then
size="$(wc -c < "$path" | tr -d ' ')"
if [ "${size:-0}" -gt 52428800 ]; then
echo "::error file=$path::tracked file is larger than 50 MiB"
fail=1
fi
fi
done < <(git ls-files -z)
exit "$fail"

View File

@@ -5,8 +5,15 @@ on:
pull_request: pull_request:
jobs: jobs:
hygiene:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: bash .gitea/scripts/hygiene-check.sh
test: test:
runs-on: ubuntu-latest runs-on: ubuntu-latest
needs: hygiene
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- uses: actions/setup-go@v5 - uses: actions/setup-go@v5

View File

@@ -58,8 +58,11 @@ jobs:
server=${{ env.NODE_REGISTRY }}/admin/ai-service:${{ github.sha }} server=${{ env.NODE_REGISTRY }}/admin/ai-service:${{ github.sha }}
kubectl -n ai-service set image deployment/ai-service-worker \ kubectl -n ai-service set image deployment/ai-service-worker \
worker=${{ env.NODE_REGISTRY }}/admin/ai-service:${{ github.sha }} worker=${{ env.NODE_REGISTRY }}/admin/ai-service:${{ github.sha }}
kubectl -n ai-service set image deployment/ai-service-analysis-worker \
worker=${{ env.NODE_REGISTRY }}/admin/ai-service:${{ github.sha }}
kubectl -n ai-service set image deployment/ai-service-transcription-worker \ kubectl -n ai-service set image deployment/ai-service-transcription-worker \
worker=${{ env.NODE_REGISTRY }}/admin/ai-service:${{ github.sha }} worker=${{ env.NODE_REGISTRY }}/admin/ai-service:${{ github.sha }}
kubectl -n ai-service rollout status deployment/ai-service --timeout=180s kubectl -n ai-service rollout status deployment/ai-service --timeout=180s
kubectl -n ai-service rollout status deployment/ai-service-worker --timeout=180s kubectl -n ai-service rollout status deployment/ai-service-worker --timeout=180s
kubectl -n ai-service rollout status deployment/ai-service-analysis-worker --timeout=180s
kubectl -n ai-service rollout status deployment/ai-service-transcription-worker --timeout=180s kubectl -n ai-service rollout status deployment/ai-service-transcription-worker --timeout=180s

124
README.md
View File

@@ -2,8 +2,9 @@
Technical AI job service for Portal workloads. Technical AI job service for Portal workloads.
The first version owns only AI job lifecycle and metrics. Business data stays in AI Service owns technical AI job lifecycle, provider execution and metrics.
domain services such as `telephony`, `monitoring-tg` and `monitoring-pf`. Business data stays in domain services such as `telephony`, `monitoring-tg` and
`monitoring-pf`.
## Generic job contract ## Generic job contract
@@ -14,8 +15,9 @@ The service is intentionally domain-agnostic:
- `owner_ref` is the caller's stable object reference, for example - `owner_ref` is the caller's stable object reference, for example
`beeline/{call_id}` or `channel/{message_id}`. `beeline/{call_id}` or `channel/{message_id}`.
- `task_type` describes the technical task class, for example - `task_type` describes the technical task class, for example
`transcribe`, `call_analysis`, `tg_analysis`, `pf_competitor_analysis`. `transcription`, `transcript_summary`, `call_analysis`,
- `model_profile` selects a runtime profile, for example `whisperx`, `telegram_classification`, `tg_analysis`, `pf_competitor_analysis`.
- `model_profile` selects a runtime profile, for example `whisper-large-v3`,
`qwen2.5-14b`, `vision`, or a future provider profile. `qwen2.5-14b`, `vision`, or a future provider profile.
- `input` and `result` are JSON payloads owned by the caller and worker. - `input` and `result` are JSON payloads owned by the caller and worker.
@@ -24,8 +26,9 @@ service.
## Built-in workers ## Built-in workers
The first built-in worker processes `llm_chat`, `chat_completion` and The LLM worker processes `llm_chat`, `chat_completion`, `call_analysis`,
`call_analysis` jobs whose `model_profile` equals `LLM_MODEL`. `transcript_summary` and `telegram_classification` jobs whose `model_profile`
equals `LLM_MODEL`.
Input can be either explicit messages: Input can be either explicit messages:
@@ -40,39 +43,34 @@ Input can be either explicit messages:
``` ```
or compact `system` / `user` fields. The completed job result contains or compact `system` / `user` fields. The completed job result contains
`content`, `model`, `usage` and `duration_ms`. `schema_version=ai.chat_result.v1`, `content`, `model`, `usage` and
`duration_ms`.
`call_analysis` uses the same input contract as `llm_chat`; callers may include `call_analysis` and `transcript_summary` use the same input contract as
domain metadata fields in `input`, but the worker only reads chat fields such as `llm_chat`; callers may include domain metadata fields in `input`, but the
`system`, `user`, `messages`, `max_tokens` and `response_format`. worker only reads chat fields such as `system`, `user`, `messages`,
`max_tokens` and `response_format`.
`transcription` jobs can run several transcription providers in order for `transcription` jobs are processed only by Whisper Large v3
temporary A/B comparison. The main `segments` field remains compatible with (`openai/whisper-large-v3`) through an OpenAI-compatible
telephony and contains the first successful provider result. The full comparison `/v1/audio/transcriptions` endpoint. The returned `segments` field stays
is stored in `attempts` with `provider`, `model`, `status`, `text`, `segments`, compatible with telephony. If the provider returns one long segment, AI Service
`duration_ms` and `error`. splits it into smaller transcript segments without inventing speaker labels.
The completed job result contains
`schema_version=ai.transcription_result.v1`, `provider`, `model`, `language`,
`segments`, optional provider `attempts` and `duration_ms`.
Recommended comparison order: AI-server compose snippet for Whisper Large v3 lives in
`deploy/ai-server/docker-compose.audio.yml`:
1. `whisperx` - Whisper endpoint: `http://10.2.3.5:8004`
2. `qwen2-audio` (`Qwen/Qwen2-Audio-7B-Instruct`) - Start Whisper:
3. `voxtral-small` (`mistralai/Voxtral-Small-24B-2507`) `docker compose -f docker-compose.yml -f docker-compose.audio.yml --profile whisper-large-v3 up -d whisper-large-v3`
Qwen2-Audio and Voxtral are called through an OpenAI-compatible In Kubernetes the dedicated transcription worker may claim more than one
`/v1/chat/completions` endpoint with `input_audio`; set their endpoint URLs only `whisper-large-v3` job at a time. This keeps download/upload/wait overhead from
after the models are actually exposed on the AI server. serializing the queue while the Whisper provider still controls the actual GPU
scheduling.
AI-server compose snippets for these temporary comparison endpoints live in
`deploy/ai-server/docker-compose.audio.yml`. They are profile-gated because the
single GPU cannot keep the production text vLLM, two WhisperX instances, Qwen2
Audio and Voxtral loaded at the same time:
- Qwen2-Audio endpoint: `http://10.2.3.5:8003`
- Voxtral endpoint: `http://10.2.3.5:8004`
- Start Qwen only:
`docker compose -f docker-compose.yml -f docker-compose.audio.yml --profile qwen-audio up -d qwen-audio`
- Start Voxtral only:
`docker compose -f docker-compose.yml -f docker-compose.audio.yml --profile voxtral-small up -d voxtral-small`
## API ## API
@@ -90,7 +88,9 @@ Audio and Voxtral loaded at the same time:
- `GET /api/v1/providers/status` checks configured AI providers without - `GET /api/v1/providers/status` checks configured AI providers without
returning secrets. returning secrets.
- `GET /api/v1/infra/status` returns AI-server sidecar telemetry - `GET /api/v1/infra/status` returns AI-server sidecar telemetry
(GPU, containers, vLLM and WhisperX live metrics) when configured. (GPU, containers and vLLM live metrics) when configured.
- `GET /health/detail` returns PostgreSQL, provider, queue, error, throughput
and infra components for Portal `admin/health`.
- `GET /healthz` returns process health. - `GET /healthz` returns process health.
- `GET /readyz` checks PostgreSQL readiness. - `GET /readyz` checks PostgreSQL readiness.
- Built-in workers expose open Kubernetes endpoints on `WORKER_HTTP_PORT`: - Built-in workers expose open Kubernetes endpoints on `WORKER_HTTP_PORT`:
@@ -100,6 +100,34 @@ All `/api/v1/*` endpoints require `Authorization: Bearer <AI_SERVICE_TOKEN>`
when `AI_SERVICE_TOKEN` is configured. Health and readiness endpoints stay open when `AI_SERVICE_TOKEN` is configured. Health and readiness endpoints stay open
for Kubernetes probes. for Kubernetes probes.
## Retry policy
Workers store a normalized `error_code` on failed jobs. AI Service requeues only
explicitly retryable categories while attempts remain.
| Category | Retry | Delay |
| --- | --- | --- |
| `provider_unavailable`, `model_unavailable`, `provider_error`, `dependency_error`, `timeout`, `storage_error`, `stale_worker` | yes | 30s |
| `bad_response`, `transcript_hallucination`, `transcript_incomplete`, `internal_error`, `unknown` | yes | 2m |
| `bad_audio`, `bad_input`, `context_length`, `unsupported_task`, `cancelled` | no | - |
Domain services may still expose manual retry for terminal errors after the
underlying data or prompt is corrected.
## Result schemas
AI Service result payloads are versioned with `schema_version`. Consumers should
ignore unknown fields and reject only unsupported major schema names.
Current schemas:
- `ai.chat_result.v1`: `{schema_version, content, model, usage?, duration_ms}`.
- `ai.transcription_result.v1`:
`{schema_version, provider?, model?, attempts?, language, segments, duration_ms}`.
New optional fields may be added to a `v1` schema without a breaking change.
Breaking shape changes require a new schema name.
## Configuration ## Configuration
- `HTTP_HOST`, default `0.0.0.0` - `HTTP_HOST`, default `0.0.0.0`
@@ -111,19 +139,11 @@ for Kubernetes probes.
- `LLM_API_KEY`, primary LLM API key - `LLM_API_KEY`, primary LLM API key
- `LLM_MODEL`, default `qwen2.5-14b` - `LLM_MODEL`, default `qwen2.5-14b`
- `LLM_TIMEOUT`, default `5m` - `LLM_TIMEOUT`, default `5m`
- `TRANSCRIPTION_PROVIDERS`, default `whisperx`, comma-separated ordered list: - `AUDIO_TRANSCRIPTION_BASE_URL`, OpenAI-compatible transcription endpoint
`whisperx,qwen2-audio,voxtral-small` - `AUDIO_TRANSCRIPTION_MODEL`, default `openai/whisper-large-v3`
- `WHISPERX_URL`, WhisperX endpoint for transcription jobs - `AUDIO_TRANSCRIPTION_API_KEY`, optional bearer token; falls back to
- `QWEN_AUDIO_BASE_URL`, OpenAI-compatible endpoint for Qwen2-Audio
- `QWEN_AUDIO_MODEL`, default `Qwen/Qwen2-Audio-7B-Instruct`
- `QWEN_AUDIO_API_KEY`, optional bearer token for Qwen2-Audio; falls back to
`AUDIO_LLM_API_KEY`, then `LLM_API_KEY` `AUDIO_LLM_API_KEY`, then `LLM_API_KEY`
- `VOXTRAL_BASE_URL`, OpenAI-compatible endpoint for Voxtral - `AUDIO_TRANSCRIPTION_PROMPT`, transcription instruction
- `VOXTRAL_MODEL`, default `mistralai/Voxtral-Small-24B-2507`
- `VOXTRAL_API_KEY`, optional bearer token for Voxtral; falls back to
`AUDIO_LLM_API_KEY`, then `LLM_API_KEY`
- `AUDIO_LLM_PROMPT`, transcription instruction for audio LLM providers
- `AUDIO_LLM_MAX_TOKENS`, default `4096`
- `WORKER_ID`, default hostname - `WORKER_ID`, default hostname
- `WORKER_HTTP_HOST`, default `0.0.0.0` - `WORKER_HTTP_HOST`, default `0.0.0.0`
- `WORKER_HTTP_PORT`, default `8081` - `WORKER_HTTP_PORT`, default `8081`
@@ -131,8 +151,10 @@ for Kubernetes probes.
- `WORKER_CLAIM_LIMIT`, default `4` - `WORKER_CLAIM_LIMIT`, default `4`
- `WORKER_LEASE_TIMEOUT`, default `15m` - `WORKER_LEASE_TIMEOUT`, default `15m`
## Next integration step ## Current telephony pipeline
`telephony` should first mirror low-risk analysis jobs into this service while `telephony` now uses AI Service as the only AI execution path:
continuing local processing. Remote execution can then be enabled by feature
flag per task type. 1. `transcription` turns call audio into segments.
2. `transcript_summary` creates a detailed Russian call summary.
3. `call_analysis` runs tags and negotiation rules against the summary.

View File

@@ -49,21 +49,11 @@ func main() {
llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout) llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout)
transcriber := transcription.NewWithOptions(transcription.Options{ transcriber := transcription.NewWithOptions(transcription.Options{
Providers: cfg.TranscriptionProviders, AudioBaseURL: cfg.AudioBaseURL,
WhisperXURL: cfg.WhisperXURL, AudioAPIKey: cfg.AudioAPIKey,
WhisperXTimeout: cfg.WhisperXTimeout, AudioModel: cfg.AudioModel,
FfmpegPath: cfg.FfmpegPath, AudioTimeout: cfg.AudioTimeout,
LeadSilence: cfg.WhisperXLeadSilence, AudioPrompt: cfg.AudioPrompt,
QwenAudioBaseURL: cfg.QwenAudioBaseURL,
QwenAudioAPIKey: cfg.QwenAudioAPIKey,
QwenAudioModel: cfg.QwenAudioModel,
QwenAudioTimeout: cfg.QwenAudioTimeout,
VoxtralBaseURL: cfg.VoxtralBaseURL,
VoxtralAPIKey: cfg.VoxtralAPIKey,
VoxtralModel: cfg.VoxtralModel,
VoxtralTimeout: cfg.VoxtralTimeout,
AudioLLMPrompt: cfg.AudioLLMPrompt,
AudioLLMMaxTokens: cfg.AudioLLMMaxTokens,
}) })
w := worker.New(db, llmClient, transcriber, cfg.WorkerID, cfg.LLMModel, cfg.WorkerTaskTypes, cfg.WorkerModelProfiles, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit) w := worker.New(db, llmClient, transcriber, cfg.WorkerID, cfg.LLMModel, cfg.WorkerTaskTypes, cfg.WorkerModelProfiles, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit)
healthSrv := startHealthServer(ctx, db, cfg) healthSrv := startHealthServer(ctx, db, cfg)
@@ -72,8 +62,8 @@ func main() {
"worker_id", cfg.WorkerID, "worker_id", cfg.WorkerID,
"model", cfg.LLMModel, "model", cfg.LLMModel,
"transcription_enabled", transcriber != nil, "transcription_enabled", transcriber != nil,
"transcription_providers", cfg.TranscriptionProviders, "transcription_provider", transcription.ProviderWhisperLargeV3,
"whisperx_lead_silence", cfg.WhisperXLeadSilence.String(), "transcription_model", cfg.AudioModel,
"task_types", cfg.WorkerTaskTypes, "task_types", cfg.WorkerTaskTypes,
"model_profiles", cfg.WorkerModelProfiles, "model_profiles", cfg.WorkerModelProfiles,
"poll_interval", cfg.WorkerPollInterval.String(), "poll_interval", cfg.WorkerPollInterval.String(),
@@ -140,14 +130,15 @@ func (h workerHealth) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}) })
case r.Method == http.MethodGet && path == "/worker/status": case r.Method == http.MethodGet && path == "/worker/status":
writeWorkerJSON(w, http.StatusOK, map[string]any{ writeWorkerJSON(w, http.StatusOK, map[string]any{
"status": "running", "status": "running",
"worker_id": h.cfg.WorkerID, "worker_id": h.cfg.WorkerID,
"task_types": h.cfg.WorkerTaskTypes, "task_types": h.cfg.WorkerTaskTypes,
"model_profiles": h.cfg.WorkerModelProfiles, "model_profiles": h.cfg.WorkerModelProfiles,
"transcription_providers": h.cfg.TranscriptionProviders, "transcription_provider": transcription.ProviderWhisperLargeV3,
"claim_limit": h.cfg.WorkerClaimLimit, "transcription_model": h.cfg.AudioModel,
"poll_interval": h.cfg.WorkerPollInterval.String(), "claim_limit": h.cfg.WorkerClaimLimit,
"lease_timeout": h.cfg.WorkerLeaseTimeout.String(), "poll_interval": h.cfg.WorkerPollInterval.String(),
"lease_timeout": h.cfg.WorkerLeaseTimeout.String(),
}) })
default: default:
writeWorkerJSON(w, http.StatusNotFound, map[string]any{"error": "not found"}) writeWorkerJSON(w, http.StatusNotFound, map[string]any{"error": "not found"})

View File

@@ -1,63 +1,12 @@
services: services:
qwen-audio: whisper-large-v3:
image: vllm/vllm-openai:latest build:
container_name: qwen-audio context: .
dockerfile: vllm-audio.Dockerfile
image: vllm-audio:local
container_name: whisper-large-v3
profiles: profiles:
- qwen-audio - whisper-large-v3
- audio-compare
restart: unless-stopped
ipc: host
runtime: nvidia
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: all
capabilities: [gpu]
environment:
HUGGING_FACE_HUB_TOKEN: ${HF_TOKEN}
VLLM_API_KEY: ${VLLM_API_KEY}
HF_HOME: /cache
volumes:
- ./data/vllm-cache:/cache
networks:
- audio-models
ports:
- "10.2.3.5:8003:8000"
command:
- "--model"
- "Qwen/Qwen2-Audio-7B-Instruct"
- "--served-model-name"
- "Qwen/Qwen2-Audio-7B-Instruct"
- "--trust-remote-code"
- "--host"
- "0.0.0.0"
- "--port"
- "8000"
- "--max-model-len"
- "8192"
- "--gpu-memory-utilization"
- "0.25"
- "--api-key"
- "${VLLM_API_KEY}"
- "--max-num-seqs"
- "4"
- "--max-num-batched-tokens"
- "4096"
healthcheck:
test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
interval: 30s
timeout: 5s
retries: 5
start_period: 900s
voxtral-small:
image: vllm/vllm-openai:latest
container_name: voxtral-small
profiles:
- voxtral-small
- audio-compare
restart: unless-stopped restart: unless-stopped
ipc: host ipc: host
runtime: nvidia runtime: nvidia
@@ -80,32 +29,17 @@ services:
- "10.2.3.5:8004:8000" - "10.2.3.5:8004:8000"
command: command:
- "--model" - "--model"
- "mistralai/Voxtral-Small-24B-2507" - "openai/whisper-large-v3"
- "--served-model-name" - "--served-model-name"
- "mistralai/Voxtral-Small-24B-2507" - "openai/whisper-large-v3"
- "--tokenizer-mode"
- "mistral"
- "--config-format"
- "mistral"
- "--load-format"
- "mistral"
- "--tool-call-parser"
- "mistral"
- "--enable-auto-tool-choice"
- "--host" - "--host"
- "0.0.0.0" - "0.0.0.0"
- "--port" - "--port"
- "8000" - "8000"
- "--max-model-len"
- "32768"
- "--gpu-memory-utilization" - "--gpu-memory-utilization"
- "0.62" - "0.55"
- "--api-key" - "--api-key"
- "${VLLM_API_KEY}" - "${VLLM_API_KEY}"
- "--max-num-seqs"
- "2"
- "--max-num-batched-tokens"
- "8192"
healthcheck: healthcheck:
test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"] test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
interval: 30s interval: 30s

View File

@@ -0,0 +1,3 @@
FROM vllm/vllm-openai:latest
RUN python3 -m pip install --no-cache-dir av soundfile librosa

View File

@@ -1,20 +0,0 @@
upstream whisperx_upstream {
server whisperx-1:8000 max_fails=3 fail_timeout=30s;
}
server {
listen 80 default_server;
client_max_body_size 200m;
location / {
proxy_pass http://whisperx_upstream;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_connect_timeout 30s;
proxy_send_timeout 10m;
proxy_read_timeout 10m;
proxy_request_buffering off;
proxy_buffering off;
}
}

View File

@@ -14,27 +14,17 @@ type Config struct {
MigrateOnStart bool MigrateOnStart bool
APIAuthToken string APIAuthToken string
LLMBaseURL string LLMBaseURL string
LLMAPIKey string LLMAPIKey string
LLMModel string LLMModel string
LLMTimeout time.Duration LLMTimeout time.Duration
TranscriptionProviders []string AudioBaseURL string
WhisperXURL string AudioAPIKey string
WhisperXTimeout time.Duration AudioModel string
WhisperXLeadSilence time.Duration AudioTimeout time.Duration
QwenAudioBaseURL string AudioPrompt string
QwenAudioAPIKey string AIStatsSidecarURL string
QwenAudioModel string AIStatsTimeout time.Duration
QwenAudioTimeout time.Duration
VoxtralBaseURL string
VoxtralAPIKey string
VoxtralModel string
VoxtralTimeout time.Duration
AudioLLMMaxTokens int
AudioLLMPrompt string
FfmpegPath string
AIStatsSidecarURL string
AIStatsTimeout time.Duration
WorkerID string WorkerID string
WorkerHTTPHost string WorkerHTTPHost string
@@ -54,27 +44,17 @@ func Load() Config {
MigrateOnStart: envBool("MIGRATE_ON_START", true), MigrateOnStart: envBool("MIGRATE_ON_START", true),
APIAuthToken: envString("AI_SERVICE_TOKEN", ""), APIAuthToken: envString("AI_SERVICE_TOKEN", ""),
LLMBaseURL: envString("LLM_BASE_URL", ""), LLMBaseURL: envString("LLM_BASE_URL", ""),
LLMAPIKey: envString("LLM_API_KEY", ""), LLMAPIKey: envString("LLM_API_KEY", ""),
LLMModel: envString("LLM_MODEL", "qwen2.5-14b"), LLMModel: envString("LLM_MODEL", "qwen2.5-14b"),
LLMTimeout: envDuration("LLM_TIMEOUT", 5*time.Minute), LLMTimeout: envDuration("LLM_TIMEOUT", 5*time.Minute),
TranscriptionProviders: envCSVDefault("TRANSCRIPTION_PROVIDERS", []string{"whisperx"}), AudioBaseURL: envString("AUDIO_TRANSCRIPTION_BASE_URL", ""),
WhisperXURL: envString("WHISPERX_URL", ""), AudioAPIKey: envString("AUDIO_TRANSCRIPTION_API_KEY", ""),
WhisperXTimeout: envDuration("WHISPERX_TIMEOUT", 10*time.Minute), AudioModel: envString("AUDIO_TRANSCRIPTION_MODEL", "openai/whisper-large-v3"),
WhisperXLeadSilence: envDuration("WHISPERX_LEAD_SILENCE", 800*time.Millisecond), AudioTimeout: envDuration("AUDIO_TRANSCRIPTION_TIMEOUT", 10*time.Minute),
QwenAudioBaseURL: envString("QWEN_AUDIO_BASE_URL", envString("AUDIO_LLM_BASE_URL", "")), AudioPrompt: envString("AUDIO_TRANSCRIPTION_PROMPT", defaultAudioPrompt()),
QwenAudioAPIKey: envString("QWEN_AUDIO_API_KEY", envString("AUDIO_LLM_API_KEY", envString("LLM_API_KEY", ""))), AIStatsSidecarURL: envString("AI_STATS_SIDECAR_URL", ""),
QwenAudioModel: envString("QWEN_AUDIO_MODEL", "Qwen/Qwen2-Audio-7B-Instruct"), AIStatsTimeout: envDuration("AI_STATS_TIMEOUT", 8*time.Second),
QwenAudioTimeout: envDuration("QWEN_AUDIO_TIMEOUT", envDuration("AUDIO_LLM_TIMEOUT", 10*time.Minute)),
VoxtralBaseURL: envString("VOXTRAL_BASE_URL", envString("AUDIO_LLM_BASE_URL", "")),
VoxtralAPIKey: envString("VOXTRAL_API_KEY", envString("AUDIO_LLM_API_KEY", envString("LLM_API_KEY", ""))),
VoxtralModel: envString("VOXTRAL_MODEL", "mistralai/Voxtral-Small-24B-2507"),
VoxtralTimeout: envDuration("VOXTRAL_TIMEOUT", envDuration("AUDIO_LLM_TIMEOUT", 10*time.Minute)),
AudioLLMMaxTokens: envInt("AUDIO_LLM_MAX_TOKENS", 4096),
AudioLLMPrompt: envString("AUDIO_LLM_PROMPT", defaultAudioLLMPrompt()),
FfmpegPath: envString("FFMPEG_PATH", "/usr/bin/ffmpeg"),
AIStatsSidecarURL: envString("AI_STATS_SIDECAR_URL", ""),
AIStatsTimeout: envDuration("AI_STATS_TIMEOUT", 8*time.Second),
WorkerID: envString("WORKER_ID", hostname()), WorkerID: envString("WORKER_ID", hostname()),
WorkerHTTPHost: envString("WORKER_HTTP_HOST", "0.0.0.0"), WorkerHTTPHost: envString("WORKER_HTTP_HOST", "0.0.0.0"),
@@ -145,15 +125,8 @@ func envCSV(key string) []string {
return out return out
} }
func envCSVDefault(key string, fallback []string) []string { func defaultAudioPrompt() string {
if values := envCSV(key); len(values) > 0 { return ""
return values
}
return fallback
}
func defaultAudioLLMPrompt() string {
return "Расшифруй речь из аудио максимально точно. Сохрани русский язык, имена, телефоны, суммы и смысловые паузы. Не добавляй комментарии, анализ, Markdown или JSON. Верни только чистый текст расшифровки."
} }
func hostname() string { func hostname() string {

View File

@@ -5,6 +5,7 @@ import (
"time" "time"
"ai-service/internal/model" "ai-service/internal/model"
"ai-service/internal/transcription"
) )
type dashboardResponse struct { type dashboardResponse struct {
@@ -30,7 +31,7 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) {
ctx, cancel := contextWithTimeout(r, 12*time.Second) ctx, cancel := contextWithTimeout(r, 12*time.Second)
defer cancel() defer cancel()
stats, err := s.store.Stats(ctx) stats, err := s.store.Stats(ctx, s.cfg.WorkerLeaseTimeout)
if err != nil { if err != nil {
writeError(w, http.StatusInternalServerError, err.Error()) writeError(w, http.StatusInternalServerError, err.Error())
return return
@@ -43,7 +44,6 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusInternalServerError, err.Error()) writeError(w, http.StatusInternalServerError, err.Error())
return return
} }
resp := dashboardResponse{ resp := dashboardResponse{
At: now, At: now,
Summary: summarizeQueues(stats), Summary: summarizeQueues(stats),
@@ -52,9 +52,7 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) {
At: now, At: now,
Providers: []providerStatus{ Providers: []providerStatus{
s.checkLLM(ctx), s.checkLLM(ctx),
s.checkWhisperX(ctx), s.checkAudioLLM(ctx, transcription.ProviderWhisperLargeV3, s.cfg.AudioBaseURL, s.cfg.AudioAPIKey, s.cfg.AudioModel, s.cfg.AudioTimeout),
s.checkAudioLLM(ctx, "qwen2-audio", s.cfg.QwenAudioBaseURL, s.cfg.QwenAudioAPIKey, s.cfg.QwenAudioModel, s.cfg.QwenAudioTimeout),
s.checkAudioLLM(ctx, "voxtral-small", s.cfg.VoxtralBaseURL, s.cfg.VoxtralAPIKey, s.cfg.VoxtralModel, s.cfg.VoxtralTimeout),
}, },
}, },
Infra: loadInfraSnapshot(r, s.cfg), Infra: loadInfraSnapshot(r, s.cfg),

241
internal/httpapi/health.go Normal file
View File

@@ -0,0 +1,241 @@
package httpapi
import (
"context"
"net/http"
"strings"
"time"
"ai-service/internal/model"
"ai-service/internal/transcription"
)
type healthDetailResponse struct {
Status string `json:"status"`
Generated time.Time `json:"generated_at"`
Components []healthComponent `json:"components"`
}
type healthComponent struct {
Name string `json:"name"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
Data map[string]any `json:"data,omitempty"`
}
func (s *Server) handleHealthDetail(w http.ResponseWriter, r *http.Request) {
ctx, cancel := contextWithTimeout(r, 12*time.Second)
defer cancel()
resp := healthDetailResponse{
Status: "ok",
Generated: time.Now().UTC(),
}
if err := s.store.Ping(ctx); err != nil {
resp.Components = append(resp.Components, healthComponent{
Name: "postgres",
Status: "down",
Error: err.Error(),
})
resp.Status = worseHealthStatus(resp.Status, "down")
writeJSON(w, http.StatusServiceUnavailable, resp)
return
}
resp.Components = append(resp.Components, healthComponent{Name: "postgres", Status: "ok"})
stats, err := s.store.Stats(ctx, s.cfg.WorkerLeaseTimeout)
if err != nil {
resp.Components = append(resp.Components, healthComponent{
Name: "queue",
Status: "down",
Error: err.Error(),
})
resp.Status = worseHealthStatus(resp.Status, "down")
writeJSON(w, http.StatusServiceUnavailable, resp)
return
}
for _, component := range []healthComponent{
s.healthProviders(ctx),
healthQueue(stats),
healthErrors(stats),
healthThroughput(stats),
healthInfra(loadInfraSnapshot(r, s.cfg)),
} {
resp.Components = append(resp.Components, component)
resp.Status = worseHealthStatus(resp.Status, component.Status)
}
statusCode := http.StatusOK
if resp.Status == "down" {
statusCode = http.StatusServiceUnavailable
}
writeJSON(w, statusCode, resp)
}
func (s *Server) healthProviders(ctx context.Context) healthComponent {
providers := []providerStatus{
s.checkLLM(ctx),
s.checkAudioLLM(ctx, transcription.ProviderWhisperLargeV3, s.cfg.AudioBaseURL, s.cfg.AudioAPIKey, s.cfg.AudioModel, s.cfg.AudioTimeout),
}
status := "ok"
messages := make([]string, 0)
for _, provider := range providers {
switch {
case !provider.Configured:
status = worseHealthStatus(status, "degraded")
messages = append(messages, provider.Name+" not configured")
case !provider.OK:
status = worseHealthStatus(status, "down")
if provider.Error != "" {
messages = append(messages, provider.Name+": "+provider.Error)
} else {
messages = append(messages, provider.Name+" unavailable")
}
case provider.Stale:
status = worseHealthStatus(status, "degraded")
if provider.Error != "" {
messages = append(messages, provider.Name+": "+provider.Error)
}
}
}
return healthComponent{
Name: "providers",
Status: status,
Error: strings.Join(messages, "; "),
Data: map[string]any{
"providers": providers,
},
}
}
func healthQueue(stats *model.Stats) healthComponent {
var pending, running, staleRunning int64
var oldestPendingAgeSeconds, oldestRunningAgeSeconds int64
for _, row := range stats.Backlog {
pending += row.Pending
running += row.Running
staleRunning += row.StaleRunning
if row.OldestPendingAgeSeconds > oldestPendingAgeSeconds {
oldestPendingAgeSeconds = row.OldestPendingAgeSeconds
}
if row.OldestRunningAgeSeconds > oldestRunningAgeSeconds {
oldestRunningAgeSeconds = row.OldestRunningAgeSeconds
}
}
status := "ok"
message := ""
if staleRunning > 0 {
status = "degraded"
message = "there are stale running jobs"
}
return healthComponent{
Name: "queue",
Status: status,
Error: message,
Data: map[string]any{
"pending": pending,
"running": running,
"stale_running": staleRunning,
"oldest_pending_age_seconds": oldestPendingAgeSeconds,
"oldest_running_age_seconds": oldestRunningAgeSeconds,
"backlog": stats.Backlog,
"queue_status_totals": stats.Queues,
"owner_status_totals": stats.Owners,
},
}
}
func healthErrors(stats *model.Stats) healthComponent {
var failedTotal, failed24h int64
for _, row := range stats.Errors {
failedTotal += row.Total
failed24h += row.Last24h
}
status := "ok"
message := ""
if failed24h > 0 {
status = "degraded"
message = "there are failed jobs in the last 24 hours"
}
return healthComponent{
Name: "errors",
Status: status,
Error: message,
Data: map[string]any{
"failed_total": failedTotal,
"failed_24h": failed24h,
"by_code": stats.Errors,
},
}
}
func healthThroughput(stats *model.Stats) healthComponent {
var done24h, retried24h int64
for _, row := range stats.Stages {
done24h += row.Done24h
retried24h += row.Retried24h
}
pendingByStage := make(map[string]int64)
for _, row := range stats.Backlog {
pendingByStage[row.TaskType+"|"+row.ModelProfile] += row.Pending + row.Running
}
doneByStage := make(map[string]int64)
for _, row := range stats.Stages {
doneByStage[row.TaskType+"|"+row.ModelProfile] += row.Done24h
}
stuckStages := make([]string, 0)
for key, total := range pendingByStage {
if total > 0 && doneByStage[key] == 0 {
stuckStages = append(stuckStages, key)
}
}
status := "ok"
message := ""
if len(stuckStages) > 0 {
status = "degraded"
message = "some active queues have no completed jobs in the last 24 hours"
}
return healthComponent{
Name: "throughput",
Status: status,
Error: message,
Data: map[string]any{
"done_24h": done24h,
"retried_24h": retried24h,
"stuck_stages": stuckStages,
"stages": stats.Stages,
},
}
}
func healthInfra(infra infraStatusResponse) healthComponent {
status := "ok"
message := ""
if infra.SidecarError != "" {
status = "degraded"
message = infra.SidecarError
}
return healthComponent{
Name: "infra",
Status: status,
Error: message,
Data: map[string]any{
"sidecar": infra.Sidecar,
},
}
}
func worseHealthStatus(current, next string) string {
if current == "down" || next == "down" {
return "down"
}
if current == "degraded" || next == "degraded" {
return "degraded"
}
return "ok"
}

View File

@@ -8,6 +8,8 @@ import (
"net/http" "net/http"
"strings" "strings"
"time" "time"
"ai-service/internal/transcription"
) )
type providerStatus struct { type providerStatus struct {
@@ -42,9 +44,7 @@ func (s *Server) handleProviderStatus(w http.ResponseWriter, r *http.Request) {
At: time.Now().UTC(), At: time.Now().UTC(),
Providers: []providerStatus{ Providers: []providerStatus{
s.checkLLM(ctx), s.checkLLM(ctx),
s.checkWhisperX(ctx), s.checkAudioLLM(ctx, transcription.ProviderWhisperLargeV3, s.cfg.AudioBaseURL, s.cfg.AudioAPIKey, s.cfg.AudioModel, s.cfg.AudioTimeout),
s.checkAudioLLM(ctx, "qwen2-audio", s.cfg.QwenAudioBaseURL, s.cfg.QwenAudioAPIKey, s.cfg.QwenAudioModel, s.cfg.QwenAudioTimeout),
s.checkAudioLLM(ctx, "voxtral-small", s.cfg.VoxtralBaseURL, s.cfg.VoxtralAPIKey, s.cfg.VoxtralModel, s.cfg.VoxtralTimeout),
}, },
} }
writeJSON(w, http.StatusOK, resp) writeJSON(w, http.StatusOK, resp)
@@ -132,47 +132,6 @@ func (s *Server) checkLLM(ctx context.Context) providerStatus {
return st return st
} }
func (s *Server) checkWhisperX(ctx context.Context) providerStatus {
baseURL := strings.TrimRight(strings.TrimSpace(s.cfg.WhisperXURL), "/")
st := providerStatus{Name: "whisperx", Configured: baseURL != "", URL: baseURL}
if !st.Configured {
return st
}
paths := []string{"/health", "/healthz", "/readyz", "/"}
var lastErr string
for _, path := range paths {
cctx, cancel := context.WithTimeout(ctx, 2*time.Second)
start := time.Now()
req, err := http.NewRequestWithContext(cctx, http.MethodGet, baseURL+path, nil)
if err != nil {
cancel()
lastErr = err.Error()
continue
}
res, err := (&http.Client{Timeout: 2 * time.Second}).Do(req)
st.LatencyMS = time.Since(start).Milliseconds()
cancel()
if err != nil {
lastErr = err.Error()
continue
}
body := ""
if res.StatusCode >= 300 {
body = readSmallBody(res.Body)
}
_ = res.Body.Close()
if res.StatusCode >= 300 {
lastErr = fmt.Sprintf("%s http %d: %s", path, res.StatusCode, body)
continue
}
st.OK = true
s.rememberProviderOK("whisperx", st.LatencyMS)
return st
}
st.Error = lastErr
return s.withStaleProviderOK("whisperx", st)
}
func (s *Server) rememberProviderOK(name string, latencyMS int64) { func (s *Server) rememberProviderOK(name string, latencyMS int64) {
s.providerMu.Lock() s.providerMu.Lock()
defer s.providerMu.Unlock() defer s.providerMu.Unlock()

View File

@@ -41,6 +41,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
case r.Method == http.MethodGet && path == "/readyz": case r.Method == http.MethodGet && path == "/readyz":
s.handleReady(w, r) s.handleReady(w, r)
case r.Method == http.MethodGet && path == "/health/detail":
s.handleHealthDetail(w, r)
case r.Method == http.MethodGet && path == "/": case r.Method == http.MethodGet && path == "/":
writeJSON(w, http.StatusOK, map[string]string{"service": "ai-service"}) writeJSON(w, http.StatusOK, map[string]string{"service": "ai-service"})
case r.Method == http.MethodPost && path == "/api/v1/jobs": case r.Method == http.MethodPost && path == "/api/v1/jobs":
@@ -59,6 +61,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.handleGetJob(w, r, path) s.handleGetJob(w, r, path)
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/retry"): case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/retry"):
s.handleRetryJob(w, r, path) s.handleRetryJob(w, r, path)
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/cancel"):
s.handleCancelJob(w, r, path)
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/complete"): case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/complete"):
s.handleCompleteJob(w, r, path) s.handleCompleteJob(w, r, path)
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/fail"): case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/fail"):
@@ -265,7 +269,7 @@ func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request, path strin
} }
func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path string) { func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path string) {
id, err := jobIDFromPath(path, true) id, err := jobIDFromActionPath(path, "retry")
if err != nil { if err != nil {
writeError(w, http.StatusBadRequest, err.Error()) writeError(w, http.StatusBadRequest, err.Error())
return return
@@ -284,6 +288,26 @@ func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path str
writeJSON(w, http.StatusOK, job) writeJSON(w, http.StatusOK, job)
} }
func (s *Server) handleCancelJob(w http.ResponseWriter, r *http.Request, path string) {
id, err := jobIDFromActionPath(path, "cancel")
if err != nil {
writeError(w, http.StatusBadRequest, err.Error())
return
}
ctx, cancel := contextWithTimeout(r, 8*time.Second)
defer cancel()
job, err := s.store.CancelJob(ctx, id)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
if job == nil {
writeError(w, http.StatusNotFound, "cancellable job not found")
return
}
writeJSON(w, http.StatusOK, job)
}
func (s *Server) handleCompleteJob(w http.ResponseWriter, r *http.Request, path string) { func (s *Server) handleCompleteJob(w http.ResponseWriter, r *http.Request, path string) {
id, err := jobIDFromActionPath(path, "complete") id, err := jobIDFromActionPath(path, "complete")
if err != nil { if err != nil {
@@ -337,7 +361,7 @@ func (s *Server) handleFailJob(w http.ResponseWriter, r *http.Request, path stri
func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) { func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
ctx, cancel := contextWithTimeout(r, 8*time.Second) ctx, cancel := contextWithTimeout(r, 8*time.Second)
defer cancel() defer cancel()
stats, err := s.store.Stats(ctx) stats, err := s.store.Stats(ctx, s.cfg.WorkerLeaseTimeout)
if err != nil { if err != nil {
writeError(w, http.StatusInternalServerError, err.Error()) writeError(w, http.StatusInternalServerError, err.Error())
return return

View File

@@ -38,11 +38,14 @@ type Usage struct {
TotalTokens int `json:"total_tokens"` TotalTokens int `json:"total_tokens"`
} }
const ChatResultSchemaVersion = "ai.chat_result.v1"
type ChatResult struct { type ChatResult struct {
Content string `json:"content"` SchemaVersion string `json:"schema_version"`
Model string `json:"model"` Content string `json:"content"`
Usage *Usage `json:"usage,omitempty"` Model string `json:"model"`
DurationMS int64 `json:"duration_ms"` Usage *Usage `json:"usage,omitempty"`
DurationMS int64 `json:"duration_ms"`
} }
type chatRequest struct { type chatRequest struct {
@@ -137,10 +140,11 @@ func (c *Client) Chat(ctx context.Context, in ChatInput) (*ChatResult, error) {
modelName = c.model modelName = c.model
} }
return &ChatResult{ return &ChatResult{
Content: out.Choices[0].Message.Content, SchemaVersion: ChatResultSchemaVersion,
Model: modelName, Content: out.Choices[0].Message.Content,
Usage: out.Usage, Model: modelName,
DurationMS: duration.Milliseconds(), Usage: out.Usage,
DurationMS: duration.Milliseconds(),
}, nil }, nil
} }

View File

@@ -0,0 +1,43 @@
package llm
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
)
func TestChatResultIncludesSchemaVersion(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/v1/chat/completions" {
t.Fatalf("path = %q, want /v1/chat/completions", r.URL.Path)
}
_ = json.NewEncoder(w).Encode(map[string]any{
"model": "qwen2.5-14b",
"choices": []map[string]any{
{"message": map[string]string{"role": "assistant", "content": `{"ok":true}`}},
},
"usage": map[string]int{
"prompt_tokens": 10,
"completion_tokens": 2,
"total_tokens": 12,
},
})
}))
defer server.Close()
client := New(server.URL, "", "fallback-model", 0)
got, err := client.Chat(t.Context(), ChatInput{User: "test", MaxTokens: 32})
if err != nil {
t.Fatalf("Chat: %v", err)
}
if got.SchemaVersion != ChatResultSchemaVersion {
t.Fatalf("schema_version = %q, want %q", got.SchemaVersion, ChatResultSchemaVersion)
}
if got.Content != `{"ok":true}` {
t.Fatalf("content = %q", got.Content)
}
if got.Usage == nil || got.Usage.TotalTokens != 12 {
t.Fatalf("usage = %#v", got.Usage)
}
}

View File

@@ -119,6 +119,30 @@ type ErrorStat struct {
Last24h int64 `json:"last_24h"` Last24h int64 `json:"last_24h"`
} }
type StageStat struct {
OwnerService string `json:"owner_service"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
Done24h int64 `json:"done_24h"`
AvgDurationSeconds int64 `json:"avg_duration_seconds"`
AvgAttempts int64 `json:"avg_attempts"`
Retried24h int64 `json:"retried_24h"`
}
type BacklogStat struct {
OwnerService string `json:"owner_service"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
Pending int64 `json:"pending"`
Running int64 `json:"running"`
StaleRunning int64 `json:"stale_running"`
OldestPendingAgeSeconds int64 `json:"oldest_pending_age_seconds"`
OldestPendingScheduledAt string `json:"oldest_pending_scheduled_at,omitempty"`
OldestRunningAgeSeconds int64 `json:"oldest_running_age_seconds"`
OldestRunningStartedAt string `json:"oldest_running_started_at,omitempty"`
LastHeartbeatAt string `json:"last_heartbeat_at,omitempty"`
}
type OwnerStat struct { type OwnerStat struct {
OwnerService string `json:"owner_service"` OwnerService string `json:"owner_service"`
TaskType string `json:"task_type"` TaskType string `json:"task_type"`
@@ -128,8 +152,10 @@ type OwnerStat struct {
} }
type Stats struct { type Stats struct {
At time.Time `json:"at"` At time.Time `json:"at"`
Queues []QueueStat `json:"queues"` Queues []QueueStat `json:"queues"`
Owners []OwnerStat `json:"owners,omitempty"` Owners []OwnerStat `json:"owners,omitempty"`
Errors []ErrorStat `json:"errors,omitempty"` Errors []ErrorStat `json:"errors,omitempty"`
Stages []StageStat `json:"stages,omitempty"`
Backlog []BacklogStat `json:"backlog,omitempty"`
} }

View File

@@ -0,0 +1,22 @@
package store
import (
"strings"
"time"
)
type failRetryPolicy struct {
Retryable bool
Delay time.Duration
}
func retryPolicyForError(errorCode string) failRetryPolicy {
switch strings.TrimSpace(errorCode) {
case "provider_unavailable", "model_unavailable", "provider_error", "dependency_error", "timeout", "storage_error", "stale_worker":
return failRetryPolicy{Retryable: true, Delay: 30 * time.Second}
case "bad_response", "transcript_hallucination", "transcript_incomplete", "internal_error", "unknown":
return failRetryPolicy{Retryable: true, Delay: 2 * time.Minute}
default:
return failRetryPolicy{}
}
}

View File

@@ -0,0 +1,45 @@
package store
import (
"testing"
"time"
)
func TestRetryPolicyForError(t *testing.T) {
tests := []struct {
name string
code string
retryable bool
delay time.Duration
}{
{name: "provider unavailable", code: "provider_unavailable", retryable: true, delay: 30 * time.Second},
{name: "model unavailable", code: "model_unavailable", retryable: true, delay: 30 * time.Second},
{name: "provider error", code: "provider_error", retryable: true, delay: 30 * time.Second},
{name: "dependency error", code: "dependency_error", retryable: true, delay: 30 * time.Second},
{name: "timeout", code: "timeout", retryable: true, delay: 30 * time.Second},
{name: "storage", code: "storage_error", retryable: true, delay: 30 * time.Second},
{name: "stale worker", code: "stale_worker", retryable: true, delay: 30 * time.Second},
{name: "bad response", code: "bad_response", retryable: true, delay: 2 * time.Minute},
{name: "transcript hallucination", code: "transcript_hallucination", retryable: true, delay: 2 * time.Minute},
{name: "transcript incomplete", code: "transcript_incomplete", retryable: true, delay: 2 * time.Minute},
{name: "internal error", code: "internal_error", retryable: true, delay: 2 * time.Minute},
{name: "unknown", code: "unknown", retryable: true, delay: 2 * time.Minute},
{name: "bad audio", code: "bad_audio"},
{name: "bad input", code: "bad_input"},
{name: "context length", code: "context_length"},
{name: "unsupported task", code: "unsupported_task"},
{name: "cancelled", code: "cancelled"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := retryPolicyForError(tt.code)
if got.Retryable != tt.retryable {
t.Fatalf("Retryable = %v, want %v", got.Retryable, tt.retryable)
}
if got.Delay != tt.delay {
t.Fatalf("Delay = %s, want %s", got.Delay, tt.delay)
}
})
}
}

View File

@@ -41,17 +41,48 @@ func Open(ctx context.Context, databaseURL string) (*Store, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("parse database url: %w", err) return nil, fmt.Errorf("parse database url: %w", err)
} }
pool, err := pgxpool.NewWithConfig(ctx, cfg) pool, err := connectWithRetry(ctx, cfg, 2*time.Minute)
if err != nil { if err != nil {
return nil, fmt.Errorf("connect postgres: %w", err) return nil, err
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("ping postgres: %w", err)
} }
return &Store{pool: pool}, nil return &Store{pool: pool}, nil
} }
func connectWithRetry(ctx context.Context, cfg *pgxpool.Config, maxWait time.Duration) (*pgxpool.Pool, error) {
deadline := time.Now().Add(maxWait)
var lastErr error
for attempt := 1; ; attempt++ {
pool, err := pgxpool.NewWithConfig(ctx, cfg)
if err == nil {
if pingErr := pool.Ping(ctx); pingErr == nil {
return pool, nil
} else {
err = fmt.Errorf("ping postgres: %w", pingErr)
pool.Close()
}
} else {
err = fmt.Errorf("connect postgres: %w", err)
}
lastErr = err
if time.Now().After(deadline) {
return nil, fmt.Errorf("connect postgres after retry: %w", lastErr)
}
sleep := time.Duration(attempt) * time.Second
if sleep > 5*time.Second {
sleep = 5 * time.Second
}
timer := time.NewTimer(sleep)
select {
case <-ctx.Done():
timer.Stop()
return nil, fmt.Errorf("connect postgres cancelled: %w", ctx.Err())
case <-timer.C:
}
}
}
func (s *Store) Close() { func (s *Store) Close() {
s.pool.Close() s.pool.Close()
} }
@@ -78,7 +109,33 @@ INSERT INTO ai_jobs (
) )
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL
DO UPDATE SET updated_at = ai_jobs.updated_at DO UPDATE SET
status = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN 'pending' ELSE ai_jobs.status END,
priority = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.priority ELSE ai_jobs.priority END,
max_attempts = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.max_attempts ELSE ai_jobs.max_attempts END,
attempts = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN 0 ELSE ai_jobs.attempts END,
input = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.input ELSE ai_jobs.input END,
scheduled_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.scheduled_at ELSE ai_jobs.scheduled_at END,
started_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.started_at END,
completed_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.completed_at END,
worker_id = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.worker_id END,
heartbeat_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.heartbeat_at END,
error_code = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.error_code END,
error_message = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.error_message END,
updated_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NOW() ELSE ai_jobs.updated_at END
RETURNING ` + jobSelectColumns + ` RETURNING ` + jobSelectColumns + `
` `
row := s.pool.QueryRow(ctx, q, row := s.pool.QueryRow(ctx, q,
@@ -106,7 +163,33 @@ INSERT INTO ai_jobs (
) )
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL
DO UPDATE SET updated_at = ai_jobs.updated_at DO UPDATE SET
status = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN 'pending' ELSE ai_jobs.status END,
priority = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.priority ELSE ai_jobs.priority END,
max_attempts = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.max_attempts ELSE ai_jobs.max_attempts END,
attempts = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN 0 ELSE ai_jobs.attempts END,
input = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.input ELSE ai_jobs.input END,
scheduled_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN EXCLUDED.scheduled_at ELSE ai_jobs.scheduled_at END,
started_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.started_at END,
completed_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.completed_at END,
worker_id = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.worker_id END,
heartbeat_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.heartbeat_at END,
error_code = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.error_code END,
error_message = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NULL ELSE ai_jobs.error_message END,
updated_at = CASE WHEN ai_jobs.status = 'failed' AND ai_jobs.error_code = 'stale_worker'
THEN NOW() ELSE ai_jobs.updated_at END
RETURNING ` + jobSelectColumns + ` RETURNING ` + jobSelectColumns + `
` `
var batch pgx.Batch var batch pgx.Batch
@@ -368,6 +451,28 @@ WHERE j.id = picked.id
return int(tag.RowsAffected()), nil return int(tag.RowsAffected()), nil
} }
func (s *Store) CancelJob(ctx context.Context, id uuid.UUID) (*model.Job, error) {
const q = `
UPDATE ai_jobs
SET status = 'cancelled',
completed_at = NOW(),
worker_id = NULL,
heartbeat_at = NULL,
updated_at = NOW()
WHERE id = $1
AND status IN ('pending', 'running')
RETURNING id, owner_service, owner_ref, task_type, model_profile, priority, status,
attempts, max_attempts, input, result, error_code, error_message,
scheduled_at, started_at, completed_at, worker_id, heartbeat_at,
created_at, updated_at, idempotency_key
`
job, err := scanJob(s.pool.QueryRow(ctx, q, id))
if errors.Is(err, pgx.ErrNoRows) {
return nil, nil
}
return job, err
}
func (s *Store) CancelJobs(ctx context.Context, filter model.JobFilter) (int, error) { func (s *Store) CancelJobs(ctx context.Context, filter model.JobFilter) (int, error) {
normalizeFilter(&filter) normalizeFilter(&filter)
const q = ` const q = `
@@ -490,25 +595,47 @@ RETURNING ` + jobSelectColumns + `
return job, err return job, err
} }
func (s *Store) HeartbeatJob(ctx context.Context, id uuid.UUID) error {
const q = `
UPDATE ai_jobs
SET heartbeat_at = NOW(),
updated_at = NOW()
WHERE id = $1
AND status = 'running'
`
tag, err := s.pool.Exec(ctx, q, id)
if err != nil {
return err
}
if tag.RowsAffected() == 0 {
return nil
}
return nil
}
func (s *Store) FailJob(ctx context.Context, id uuid.UUID, in model.FailJob) (*model.Job, error) { func (s *Store) FailJob(ctx context.Context, id uuid.UUID, in model.FailJob) (*model.Job, error) {
errorCode := strings.TrimSpace(in.ErrorCode) errorCode := strings.TrimSpace(in.ErrorCode)
if errorCode == "" { if errorCode == "" {
errorCode = "unknown" errorCode = "unknown"
} }
errorMessage := strings.TrimSpace(in.ErrorMessage) errorMessage := strings.TrimSpace(in.ErrorMessage)
policy := retryPolicyForError(errorCode)
const q = ` const q = `
UPDATE ai_jobs UPDATE ai_jobs
SET status = 'failed', SET status = CASE WHEN $4 AND attempts < max_attempts THEN 'pending' ELSE 'failed' END,
error_code = $2, error_code = $2,
error_message = $3, error_message = $3,
completed_at = NOW(), scheduled_at = CASE WHEN $4 AND attempts < max_attempts THEN NOW() + make_interval(secs => $5) ELSE scheduled_at END,
heartbeat_at = NOW(), started_at = CASE WHEN $4 AND attempts < max_attempts THEN NULL ELSE started_at END,
completed_at = CASE WHEN $4 AND attempts < max_attempts THEN NULL ELSE NOW() END,
worker_id = NULL,
heartbeat_at = CASE WHEN $4 AND attempts < max_attempts THEN NULL ELSE NOW() END,
updated_at = NOW() updated_at = NOW()
WHERE id = $1 WHERE id = $1
AND status = 'running' AND status = 'running'
RETURNING ` + jobSelectColumns + ` RETURNING ` + jobSelectColumns + `
` `
job, err := scanJob(s.pool.QueryRow(ctx, q, id, errorCode, errorMessage)) job, err := scanJob(s.pool.QueryRow(ctx, q, id, errorCode, errorMessage, policy.Retryable, int(policy.Delay.Seconds())))
if errors.Is(err, pgx.ErrNoRows) { if errors.Is(err, pgx.ErrNoRows) {
return nil, nil return nil, nil
} }
@@ -553,7 +680,10 @@ WHERE j.id = picked.id
return int(tag.RowsAffected()), nil return int(tag.RowsAffected()), nil
} }
func (s *Store) Stats(ctx context.Context) (*model.Stats, error) { func (s *Store) Stats(ctx context.Context, staleAfter time.Duration) (*model.Stats, error) {
if staleAfter <= 0 {
staleAfter = 15 * time.Minute
}
out := &model.Stats{At: time.Now().UTC()} out := &model.Stats{At: time.Now().UTC()}
queueRows, err := s.pool.Query(ctx, ` queueRows, err := s.pool.Query(ctx, `
@@ -618,7 +748,105 @@ ORDER BY owner_service, last_24h DESC, total DESC
} }
out.Errors = append(out.Errors, stat) out.Errors = append(out.Errors, stat)
} }
return out, errorRows.Err() if err := errorRows.Err(); err != nil {
return nil, err
}
stageRows, err := s.pool.Query(ctx, `
SELECT owner_service,
task_type,
model_profile,
count(*) AS done_24h,
COALESCE(ROUND(AVG(EXTRACT(EPOCH FROM (completed_at - started_at))))::bigint, 0) AS avg_duration_seconds,
COALESCE(ROUND(AVG(attempts))::bigint, 0) AS avg_attempts,
count(*) FILTER (WHERE attempts > 1) AS retried_24h
FROM ai_jobs
WHERE status = 'done'
AND started_at IS NOT NULL
AND completed_at IS NOT NULL
AND completed_at > NOW() - INTERVAL '24 hours'
GROUP BY owner_service, task_type, model_profile
ORDER BY owner_service, task_type, model_profile
`)
if err != nil {
return nil, err
}
defer stageRows.Close()
for stageRows.Next() {
var stat model.StageStat
if err := stageRows.Scan(
&stat.OwnerService,
&stat.TaskType,
&stat.ModelProfile,
&stat.Done24h,
&stat.AvgDurationSeconds,
&stat.AvgAttempts,
&stat.Retried24h,
); err != nil {
return nil, err
}
out.Stages = append(out.Stages, stat)
}
if err := stageRows.Err(); err != nil {
return nil, err
}
backlogRows, err := s.pool.Query(ctx, `
SELECT owner_service,
task_type,
model_profile,
count(*) FILTER (WHERE status = 'pending') AS pending,
count(*) FILTER (WHERE status = 'running') AS running,
count(*) FILTER (
WHERE status = 'running'
AND COALESCE(heartbeat_at, started_at, updated_at) < NOW() - make_interval(secs => $1)
) AS stale_running,
COALESCE(EXTRACT(EPOCH FROM (NOW() - MIN(scheduled_at) FILTER (WHERE status = 'pending')))::bigint, 0) AS oldest_pending_age_seconds,
MIN(scheduled_at) FILTER (WHERE status = 'pending') AS oldest_pending_scheduled_at,
COALESCE(EXTRACT(EPOCH FROM (NOW() - MIN(started_at) FILTER (WHERE status = 'running')))::bigint, 0) AS oldest_running_age_seconds,
MIN(started_at) FILTER (WHERE status = 'running') AS oldest_running_started_at,
MAX(heartbeat_at) FILTER (WHERE status = 'running') AS last_heartbeat_at
FROM ai_jobs
WHERE status IN ('pending', 'running')
GROUP BY owner_service, task_type, model_profile
ORDER BY stale_running DESC, pending DESC, running DESC, owner_service, task_type, model_profile
`, int(staleAfter.Seconds()))
if err != nil {
return nil, err
}
defer backlogRows.Close()
for backlogRows.Next() {
var stat model.BacklogStat
var oldestPendingScheduledAt *time.Time
var oldestRunningStartedAt *time.Time
var lastHeartbeatAt *time.Time
if err := backlogRows.Scan(
&stat.OwnerService,
&stat.TaskType,
&stat.ModelProfile,
&stat.Pending,
&stat.Running,
&stat.StaleRunning,
&stat.OldestPendingAgeSeconds,
&oldestPendingScheduledAt,
&stat.OldestRunningAgeSeconds,
&oldestRunningStartedAt,
&lastHeartbeatAt,
); err != nil {
return nil, err
}
if oldestPendingScheduledAt != nil {
stat.OldestPendingScheduledAt = oldestPendingScheduledAt.UTC().Format(time.RFC3339)
}
if oldestRunningStartedAt != nil {
stat.OldestRunningStartedAt = oldestRunningStartedAt.UTC().Format(time.RFC3339)
}
if lastHeartbeatAt != nil {
stat.LastHeartbeatAt = lastHeartbeatAt.UTC().Format(time.RFC3339)
}
out.Backlog = append(out.Backlog, stat)
}
return out, backlogRows.Err()
} }
func scanJobSummary(row pgx.Row) (*model.JobSummary, error) { func scanJobSummary(row pgx.Row) (*model.JobSummary, error) {

View File

@@ -3,59 +3,45 @@ package transcription
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/base64"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"mime/multipart" "mime/multipart"
"net/http" "net/http"
"os"
"os/exec"
"path/filepath" "path/filepath"
"regexp"
"strings" "strings"
"time" "time"
) )
type Client struct { type Client struct {
providers []ProviderConfig provider ProviderConfig
http *http.Client http *http.Client
ffmpegPath string
leadSilence time.Duration
} }
const ( const (
ProviderWhisperX = "whisperx" ProviderWhisperLargeV3 = "whisper-large-v3"
ProviderQwenAudio = "qwen2-audio" defaultWhisperModel = "openai/whisper-large-v3"
ProviderVoxtral = "voxtral-small"
) )
var speakerLabelPattern = regexp.MustCompile(`(?i)(?:^|[\n\r ]+)((?:speaker|спикер|говорящий)\s*\d+)\s*[:-]`)
type Options struct { type Options struct {
Providers []string AudioBaseURL string
WhisperXURL string AudioAPIKey string
WhisperXTimeout time.Duration AudioModel string
FfmpegPath string AudioTimeout time.Duration
LeadSilence time.Duration AudioPrompt string
QwenAudioBaseURL string
QwenAudioAPIKey string
QwenAudioModel string
QwenAudioTimeout time.Duration
VoxtralBaseURL string
VoxtralAPIKey string
VoxtralModel string
VoxtralTimeout time.Duration
AudioLLMPrompt string
AudioLLMMaxTokens int
} }
type ProviderConfig struct { type ProviderConfig struct {
Name string Name string
Kind string BaseURL string
BaseURL string APIKey string
APIKey string Model string
Model string Timeout time.Duration
Timeout time.Duration Prompt string
MaxTokens int
Prompt string
} }
type Input struct { type Input struct {
@@ -74,15 +60,18 @@ type Segment struct {
Speaker string `json:"speaker,omitempty"` Speaker string `json:"speaker,omitempty"`
} }
const ResultSchemaVersion = "ai.transcription_result.v1"
type Result struct { type Result struct {
Provider string `json:"provider,omitempty"` SchemaVersion string `json:"schema_version"`
Model string `json:"model,omitempty"` Provider string `json:"provider,omitempty"`
Attempts []Attempt `json:"attempts,omitempty"` Model string `json:"model,omitempty"`
Language string `json:"language"` Attempts []Attempt `json:"attempts,omitempty"`
Segments []Segment `json:"segments"` Language string `json:"language"`
DiarizeError *string `json:"diarize_error,omitempty"` Segments []Segment `json:"segments"`
AlignError *string `json:"align_error,omitempty"` DiarizeError *string `json:"diarize_error,omitempty"`
DurationMS int64 `json:"duration_ms"` AlignError *string `json:"align_error,omitempty"`
DurationMS int64 `json:"duration_ms"`
} }
type Attempt struct { type Attempt struct {
@@ -95,183 +84,71 @@ type Attempt struct {
DurationMS int64 `json:"duration_ms,omitempty"` DurationMS int64 `json:"duration_ms,omitempty"`
} }
type whisperResponse struct {
Language string `json:"language"`
Segments []Segment `json:"segments"`
DiarizeError *string `json:"diarize_error,omitempty"`
AlignError *string `json:"align_error,omitempty"`
}
type audioLLMResponse struct { type audioLLMResponse struct {
Text string Text string
Model string Model string
Language string
Segments []Segment
} }
type audioLLMChatRequest struct { type audioTranscriptionResponse struct {
Model string `json:"model"` Text string `json:"text"`
Messages []audioLLMChatMessage `json:"messages"` Language string `json:"language,omitempty"`
MaxTokens int `json:"max_tokens,omitempty"` Model string `json:"model,omitempty"`
Temperature float64 `json:"temperature"` Segments []audioTranscriptionSegment `json:"segments,omitempty"`
} Error *struct {
type audioLLMChatMessage struct {
Role string `json:"role"`
Content []audioLLMContentPart `json:"content"`
}
type audioLLMContentPart struct {
Type string `json:"type"`
Text string `json:"text,omitempty"`
InputAudio *audioLLMAudio `json:"input_audio,omitempty"`
}
type audioLLMAudio struct {
Data string `json:"data"`
Format string `json:"format,omitempty"`
}
type audioLLMChatResponse struct {
Model string `json:"model,omitempty"`
Choices []struct {
Message struct {
Content string `json:"content"`
} `json:"message"`
} `json:"choices"`
Error *struct {
Message string `json:"message"` Message string `json:"message"`
} `json:"error,omitempty"` } `json:"error,omitempty"`
} }
type audioTranscriptionSegment struct {
Start float64 `json:"start"`
End float64 `json:"end"`
Text string `json:"text"`
}
type audioTranscriptionStatusError struct {
status int
body string
}
func (e audioTranscriptionStatusError) Error() string {
return fmt.Sprintf("audio transcription HTTP %d: %s", e.status, e.body)
}
func New(baseURL string, timeout time.Duration, ffmpegPath string, leadSilence time.Duration) *Client { func New(baseURL string, timeout time.Duration, ffmpegPath string, leadSilence time.Duration) *Client {
return NewWithOptions(Options{ return NewWithOptions(Options{
Providers: []string{ProviderWhisperX}, AudioBaseURL: baseURL,
WhisperXURL: baseURL, AudioTimeout: timeout,
WhisperXTimeout: timeout,
FfmpegPath: ffmpegPath,
LeadSilence: leadSilence,
}) })
} }
func NewWithOptions(opts Options) *Client { func NewWithOptions(opts Options) *Client {
leadSilence := opts.LeadSilence audioPrompt := strings.TrimSpace(opts.AudioPrompt)
if leadSilence < 0 { provider := buildAudioProvider(opts, audioPrompt)
leadSilence = 0 if provider.BaseURL == "" {
}
if leadSilence > 5*time.Second {
leadSilence = 5 * time.Second
}
ffmpegPath := strings.TrimSpace(opts.FfmpegPath)
if ffmpegPath == "" {
ffmpegPath = "ffmpeg"
}
maxTokens := opts.AudioLLMMaxTokens
if maxTokens <= 0 {
maxTokens = 4096
}
audioLLMPrompt := strings.TrimSpace(opts.AudioLLMPrompt)
if audioLLMPrompt == "" {
audioLLMPrompt = "Transcribe the audio exactly. Return only the transcript text."
}
providers := buildProviders(opts, audioLLMPrompt, maxTokens)
if len(providers) == 0 {
return nil return nil
} }
return &Client{ return &Client{
providers: providers, provider: provider,
http: &http.Client{Timeout: maxProviderTimeout(providers)}, http: &http.Client{Timeout: provider.Timeout},
ffmpegPath: ffmpegPath,
leadSilence: leadSilence,
} }
} }
func buildProviders(opts Options, prompt string, maxTokens int) []ProviderConfig { func buildAudioProvider(opts Options, prompt string) ProviderConfig {
order := normalizeProviderOrder(opts.Providers) baseURL := strings.TrimRight(strings.TrimSpace(opts.AudioBaseURL), "/")
out := make([]ProviderConfig, 0, len(order)) if baseURL == "" {
for _, name := range order { return ProviderConfig{}
switch name {
case ProviderWhisperX:
baseURL := strings.TrimRight(strings.TrimSpace(opts.WhisperXURL), "/")
if baseURL == "" {
continue
}
out = append(out, ProviderConfig{
Name: ProviderWhisperX,
Kind: ProviderWhisperX,
BaseURL: baseURL,
Model: ProviderWhisperX,
Timeout: defaultDuration(opts.WhisperXTimeout, 10*time.Minute),
})
case ProviderQwenAudio:
baseURL := strings.TrimRight(strings.TrimSpace(opts.QwenAudioBaseURL), "/")
if baseURL == "" {
continue
}
model := firstNonEmpty(opts.QwenAudioModel, "Qwen/Qwen2-Audio-7B-Instruct")
out = append(out, ProviderConfig{
Name: ProviderQwenAudio,
Kind: "audio_llm",
BaseURL: baseURL,
APIKey: strings.TrimSpace(opts.QwenAudioAPIKey),
Model: model,
Timeout: defaultDuration(opts.QwenAudioTimeout, 10*time.Minute),
MaxTokens: maxTokens,
Prompt: prompt,
})
case ProviderVoxtral:
baseURL := strings.TrimRight(strings.TrimSpace(opts.VoxtralBaseURL), "/")
if baseURL == "" {
continue
}
model := firstNonEmpty(opts.VoxtralModel, "mistralai/Voxtral-Small-24B-2507")
out = append(out, ProviderConfig{
Name: ProviderVoxtral,
Kind: "audio_llm",
BaseURL: baseURL,
APIKey: strings.TrimSpace(opts.VoxtralAPIKey),
Model: model,
Timeout: defaultDuration(opts.VoxtralTimeout, 10*time.Minute),
MaxTokens: maxTokens,
Prompt: prompt,
})
}
} }
return out model := firstNonEmpty(opts.AudioModel, defaultWhisperModel)
} return ProviderConfig{
Name: ProviderWhisperLargeV3,
func normalizeProviderOrder(in []string) []string { BaseURL: baseURL,
if len(in) == 0 { APIKey: strings.TrimSpace(opts.AudioAPIKey),
return []string{ProviderWhisperX} Model: model,
Timeout: defaultDuration(opts.AudioTimeout, 10*time.Minute),
Prompt: prompt,
} }
out := make([]string, 0, len(in))
seen := map[string]bool{}
for _, raw := range in {
name := strings.ToLower(strings.TrimSpace(raw))
switch name {
case "whisper", "whisperx":
name = ProviderWhisperX
case "qwen", "qwen-audio", "qwen2-audio", "qwen2-audio-7b-instruct":
name = ProviderQwenAudio
case "voxtral", "voxtral-small", "voxtral-small-24b-2507":
name = ProviderVoxtral
default:
continue
}
if !seen[name] {
out = append(out, name)
seen[name] = true
}
}
return out
}
func maxProviderTimeout(providers []ProviderConfig) time.Duration {
maxTimeout := 10 * time.Minute
for _, provider := range providers {
if provider.Timeout > maxTimeout {
maxTimeout = provider.Timeout
}
}
return maxTimeout
} }
func defaultDuration(v, fallback time.Duration) time.Duration { func defaultDuration(v, fallback time.Duration) time.Duration {
@@ -282,8 +159,8 @@ func defaultDuration(v, fallback time.Duration) time.Duration {
} }
func (c *Client) Transcribe(ctx context.Context, in Input) (*Result, error) { func (c *Client) Transcribe(ctx context.Context, in Input) (*Result, error) {
if c == nil || len(c.providers) == 0 { if c == nil || c.provider.BaseURL == "" {
return nil, fmt.Errorf("transcription providers not configured") return nil, fmt.Errorf("audio transcription provider not configured")
} }
if strings.TrimSpace(in.AudioURL) == "" { if strings.TrimSpace(in.AudioURL) == "" {
return nil, fmt.Errorf("audio_url is required") return nil, fmt.Errorf("audio_url is required")
@@ -292,31 +169,12 @@ func (c *Client) Transcribe(ctx context.Context, in Input) (*Result, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if c.leadSilence > 0 { result, attempt, err := c.transcribeWithProvider(ctx, c.provider, audio, filename, in)
audio, filename, err = c.addLeadSilence(ctx, audio, filename) if err != nil {
if err != nil { return nil, err
return nil, err
}
} }
var attempts []Attempt result.Attempts = []Attempt{attempt}
var winner *Result return result, nil
var errors []string
for _, provider := range c.providers {
result, attempt, err := c.transcribeWithProvider(ctx, provider, audio, filename, in)
attempts = append(attempts, attempt)
if err != nil {
errors = append(errors, provider.Name+": "+err.Error())
continue
}
if winner == nil {
winner = result
}
}
if winner == nil {
return nil, fmt.Errorf("all transcription providers failed: %s", strings.Join(errors, "; "))
}
winner.Attempts = attempts
return winner, nil
} }
func (c *Client) transcribeWithProvider(ctx context.Context, provider ProviderConfig, audio []byte, filename string, in Input) (*Result, Attempt, error) { func (c *Client) transcribeWithProvider(ctx context.Context, provider ProviderConfig, audio []byte, filename string, in Input) (*Result, Attempt, error) {
@@ -331,58 +189,26 @@ func (c *Client) transcribeWithProvider(ctx context.Context, provider ProviderCo
Model: provider.Model, Model: provider.Model,
Status: "failed", Status: "failed",
} }
switch provider.Kind { resp, duration, err := c.transcribeOpenAIAudio(providerCtx, provider, audio, filename, in)
case ProviderWhisperX: attempt.DurationMS = duration.Milliseconds()
resp, duration, err := c.transcribeAudio(providerCtx, provider, audio, filename, in) if err != nil {
attempt.DurationMS = duration.Milliseconds() attempt.Error = err.Error()
if err != nil { return nil, attempt, err
attempt.Error = err.Error()
return nil, attempt, err
}
segments := adjustLeadSilence(resp.Segments, c.leadSilence)
attempt.Status = "ok"
attempt.Segments = segments
attempt.Text = segmentsText(segments)
return &Result{
Provider: provider.Name,
Model: provider.Model,
Language: resp.Language,
Segments: segments,
DiarizeError: resp.DiarizeError,
AlignError: resp.AlignError,
DurationMS: duration.Milliseconds(),
}, attempt, nil
default:
resp, duration, err := c.transcribeAudioLLM(providerCtx, provider, audio, filename, in)
attempt.DurationMS = duration.Milliseconds()
if err != nil {
attempt.Error = err.Error()
return nil, attempt, err
}
text := strings.TrimSpace(resp.Text)
segments := []Segment{{Start: 0, End: 0, Text: text}}
attempt.Status = "ok"
attempt.Model = resp.Model
attempt.Text = text
attempt.Segments = segments
return &Result{
Provider: provider.Name,
Model: resp.Model,
Language: firstNonEmpty(in.Language, "unknown"),
Segments: segments,
DurationMS: duration.Milliseconds(),
}, attempt, nil
} }
} text := strings.TrimSpace(resp.Text)
segments := normalizeAudioLLMSegments(resp.Segments, text, in.Diarize)
func segmentsText(segments []Segment) string { attempt.Status = "ok"
parts := make([]string, 0, len(segments)) attempt.Model = resp.Model
for _, segment := range segments { attempt.Text = text
if text := strings.TrimSpace(segment.Text); text != "" { attempt.Segments = segments
parts = append(parts, text) return &Result{
} SchemaVersion: ResultSchemaVersion,
} Provider: provider.Name,
return strings.Join(parts, "\n") Model: resp.Model,
Language: firstNonEmpty(resp.Language, in.Language, "unknown"),
Segments: segments,
DurationMS: duration.Milliseconds(),
}, attempt, nil
} }
func (c *Client) downloadAudio(ctx context.Context, in Input) ([]byte, string, error) { func (c *Client) downloadAudio(ctx context.Context, in Input) ([]byte, string, error) {
@@ -413,83 +239,6 @@ func (c *Client) downloadAudio(ctx context.Context, in Input) ([]byte, string, e
return audio, filename, nil return audio, filename, nil
} }
func (c *Client) addLeadSilence(ctx context.Context, audio []byte, filename string) ([]byte, string, error) {
tmpDir, err := os.MkdirTemp("", "ai-transcribe-*")
if err != nil {
return nil, "", fmt.Errorf("prepare audio temp dir: %w", err)
}
defer os.RemoveAll(tmpDir)
inputPath := filepath.Join(tmpDir, "input"+safeExt(filename))
outputPath := filepath.Join(tmpDir, "padded.mp3")
if err := os.WriteFile(inputPath, audio, 0o600); err != nil {
return nil, "", fmt.Errorf("write audio temp file: %w", err)
}
delayMS := int(c.leadSilence.Milliseconds())
if delayMS <= 0 {
return audio, filename, nil
}
cmd := exec.CommandContext(ctx, c.ffmpegPath,
"-nostdin", "-y",
"-i", inputPath,
"-af", fmt.Sprintf("adelay=%d:all=1", delayMS),
"-codec:a", "libmp3lame",
"-qscale:a", "5",
outputPath,
)
out, err := cmd.CombinedOutput()
if err != nil {
return nil, "", fmt.Errorf("ffmpeg lead silence: %w (%s)", err, trimOutput(out))
}
padded, err := os.ReadFile(outputPath)
if err != nil {
return nil, "", fmt.Errorf("read padded audio: %w", err)
}
if len(padded) == 0 {
return nil, "", fmt.Errorf("padded audio is empty")
}
base := strings.TrimSuffix(filepath.Base(filename), filepath.Ext(filename))
if base == "" || base == "." || base == "/" {
base = "audio"
}
return padded, base + "-padded.mp3", nil
}
func safeExt(filename string) string {
ext := strings.ToLower(filepath.Ext(filename))
switch ext {
case ".mp3", ".wav", ".m4a", ".ogg", ".opus", ".webm":
return ext
default:
return ".audio"
}
}
func trimOutput(out []byte) string {
s := strings.TrimSpace(string(out))
if len(s) > 600 {
return s[:600]
}
return s
}
func adjustLeadSilence(segments []Segment, silence time.Duration) []Segment {
if len(segments) == 0 || silence <= 0 {
return segments
}
shift := silence.Seconds()
out := make([]Segment, 0, len(segments))
for _, segment := range segments {
segment.Start = clampTime(segment.Start - shift)
segment.End = clampTime(segment.End - shift)
if segment.End < segment.Start {
segment.End = segment.Start
}
out = append(out, segment)
}
return out
}
func clampTime(v float64) float64 { func clampTime(v float64) float64 {
if v < 0 { if v < 0 {
return 0 return 0
@@ -497,95 +246,61 @@ func clampTime(v float64) float64 {
return v return v
} }
func (c *Client) transcribeAudio(ctx context.Context, provider ProviderConfig, audio []byte, filename string, in Input) (*whisperResponse, time.Duration, error) { func (c *Client) transcribeOpenAIAudio(ctx context.Context, provider ProviderConfig, audio []byte, filename string, in Input) (*audioLLMResponse, time.Duration, error) {
body := &bytes.Buffer{} resp, duration, err := c.doOpenAIAudioTranscription(ctx, provider, audio, filename, in, "verbose_json")
mw := multipart.NewWriter(body) if err == nil {
fw, err := mw.CreateFormFile("file", filename) return resp, duration, nil
if err != nil {
return nil, 0, fmt.Errorf("create form file: %w", err)
} }
if _, err := fw.Write(audio); err != nil { if !isVerboseJSONUnsupported(err) {
return nil, 0, fmt.Errorf("copy audio: %w", err) return nil, duration, err
} }
if in.Language != "" { fallbackResp, fallbackDuration, fallbackErr := c.doOpenAIAudioTranscription(ctx, provider, audio, filename, in, "json")
_ = mw.WriteField("language", in.Language) return fallbackResp, duration + fallbackDuration, fallbackErr
}
if in.Diarize {
_ = mw.WriteField("diarize", "true")
if in.MinSpeakers > 0 {
_ = mw.WriteField("min_speakers", fmt.Sprintf("%d", in.MinSpeakers))
}
if in.MaxSpeakers > 0 {
_ = mw.WriteField("max_speakers", fmt.Sprintf("%d", in.MaxSpeakers))
}
} else {
_ = mw.WriteField("diarize", "false")
}
if err := mw.Close(); err != nil {
return nil, 0, fmt.Errorf("close form: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, provider.BaseURL+"/transcribe", body)
if err != nil {
return nil, 0, fmt.Errorf("whisperx request: %w", err)
}
req.Header.Set("Content-Type", mw.FormDataContentType())
start := time.Now()
resp, err := c.http.Do(req)
duration := time.Since(start)
if err != nil {
return nil, duration, fmt.Errorf("whisperx do: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
return nil, duration, fmt.Errorf("whisperx HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
}
var out whisperResponse
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return nil, duration, fmt.Errorf("whisperx decode: %w", err)
}
return &out, duration, nil
} }
func (c *Client) transcribeAudioLLM(ctx context.Context, provider ProviderConfig, audio []byte, filename string, in Input) (*audioLLMResponse, time.Duration, error) { func (c *Client) doOpenAIAudioTranscription(ctx context.Context, provider ProviderConfig, audio []byte, filename string, in Input, responseFormat string) (*audioLLMResponse, time.Duration, error) {
prompt := provider.Prompt body := &bytes.Buffer{}
if in.Language != "" { mw := multipart.NewWriter(body)
prompt += "\nЯзык аудио: " + in.Language + "." if err := mw.WriteField("model", provider.Model); err != nil {
return nil, 0, fmt.Errorf("audio transcription model field: %w", err)
} }
if in.Diarize { if err := mw.WriteField("response_format", responseFormat); err != nil {
prompt += "\nЕсли слышны разные говорящие, разделяй реплики с короткими пометками Спикер 1/Спикер 2." return nil, 0, fmt.Errorf("audio transcription response_format field: %w", err)
} }
reqBody := audioLLMChatRequest{ if err := mw.WriteField("temperature", "0"); err != nil {
Model: provider.Model, return nil, 0, fmt.Errorf("audio transcription temperature field: %w", err)
MaxTokens: provider.MaxTokens,
Temperature: 0,
Messages: []audioLLMChatMessage{
{
Role: "user",
Content: []audioLLMContentPart{
{Type: "text", Text: prompt},
{
Type: "input_audio",
InputAudio: &audioLLMAudio{
Data: base64.StdEncoding.EncodeToString(audio),
Format: audioFormat(filename),
},
},
},
},
},
} }
body, err := json.Marshal(reqBody) if responseFormat == "verbose_json" {
if err := mw.WriteField("timestamp_granularities[]", "segment"); err != nil {
return nil, 0, fmt.Errorf("audio transcription timestamp field: %w", err)
}
}
if prompt := strings.TrimSpace(provider.Prompt); prompt != "" {
if err := mw.WriteField("prompt", prompt); err != nil {
return nil, 0, fmt.Errorf("audio transcription prompt field: %w", err)
}
}
if lang := strings.TrimSpace(in.Language); lang != "" {
if err := mw.WriteField("language", lang); err != nil {
return nil, 0, fmt.Errorf("audio transcription language field: %w", err)
}
}
fw, err := mw.CreateFormFile("file", filename)
if err != nil { if err != nil {
return nil, 0, fmt.Errorf("audio llm marshal: %w", err) return nil, 0, fmt.Errorf("audio transcription create file: %w", err)
} }
req, err := http.NewRequestWithContext(ctx, http.MethodPost, provider.BaseURL+"/v1/chat/completions", bytes.NewReader(body)) if _, err := fw.Write(audio); err != nil {
return nil, 0, fmt.Errorf("audio transcription copy audio: %w", err)
}
if err := mw.Close(); err != nil {
return nil, 0, fmt.Errorf("audio transcription close form: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, provider.BaseURL+"/v1/audio/transcriptions", body)
if err != nil { if err != nil {
return nil, 0, fmt.Errorf("audio llm request: %w", err) return nil, 0, fmt.Errorf("audio transcription request: %w", err)
} }
req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Type", mw.FormDataContentType())
if provider.APIKey != "" { if provider.APIKey != "" {
req.Header.Set("Authorization", "Bearer "+provider.APIKey) req.Header.Set("Authorization", "Bearer "+provider.APIKey)
} }
@@ -594,44 +309,197 @@ func (c *Client) transcribeAudioLLM(ctx context.Context, provider ProviderConfig
resp, err := c.http.Do(req) resp, err := c.http.Do(req)
duration := time.Since(start) duration := time.Since(start)
if err != nil { if err != nil {
return nil, duration, fmt.Errorf("audio llm do: %w", err) return nil, duration, fmt.Errorf("audio transcription do: %w", err)
} }
defer resp.Body.Close() defer resp.Body.Close()
raw, err := io.ReadAll(io.LimitReader(resp.Body, 4<<20)) raw, err := io.ReadAll(io.LimitReader(resp.Body, 4<<20))
if err != nil { if err != nil {
return nil, duration, fmt.Errorf("audio llm read: %w", err) return nil, duration, fmt.Errorf("audio transcription read: %w", err)
} }
if resp.StatusCode >= 300 { if resp.StatusCode >= 300 {
return nil, duration, fmt.Errorf("audio llm HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(raw))) return nil, duration, audioTranscriptionStatusError{status: resp.StatusCode, body: strings.TrimSpace(string(raw))}
} }
var out audioLLMChatResponse var out audioTranscriptionResponse
if err := json.Unmarshal(raw, &out); err != nil { if err := json.Unmarshal(raw, &out); err != nil {
return nil, duration, fmt.Errorf("audio llm decode: %w", err) return nil, duration, fmt.Errorf("audio transcription decode: %w", err)
} }
if out.Error != nil { if out.Error != nil {
return nil, duration, fmt.Errorf("audio llm error: %s", out.Error.Message) return nil, duration, fmt.Errorf("audio transcription error: %s", out.Error.Message)
}
if len(out.Choices) == 0 {
return nil, duration, fmt.Errorf("audio llm: empty choices")
}
modelName := out.Model
if modelName == "" {
modelName = provider.Model
} }
modelName := firstNonEmpty(out.Model, provider.Model)
return &audioLLMResponse{ return &audioLLMResponse{
Text: strings.TrimSpace(out.Choices[0].Message.Content), Text: strings.TrimSpace(out.Text),
Model: modelName, Model: modelName,
Language: out.Language,
Segments: convertAudioSegments(out.Segments),
}, duration, nil }, duration, nil
} }
func audioFormat(filename string) string { func isVerboseJSONUnsupported(err error) bool {
ext := strings.TrimPrefix(strings.ToLower(filepath.Ext(filename)), ".") var statusErr audioTranscriptionStatusError
switch ext { if !errors.As(err, &statusErr) {
case "wav", "mp3", "flac", "m4a", "ogg", "opus", "webm": return false
return ext
default:
return "mp3"
} }
if statusErr.status != http.StatusBadRequest && statusErr.status != http.StatusUnprocessableEntity {
return false
}
body := strings.ToLower(statusErr.body)
return strings.Contains(body, "verbose_json") ||
strings.Contains(body, "response_format") ||
strings.Contains(body, "timestamp_granularities")
}
func convertAudioSegments(in []audioTranscriptionSegment) []Segment {
out := make([]Segment, 0, len(in))
for _, s := range in {
text := strings.TrimSpace(s.Text)
if text == "" {
continue
}
end := s.End
if end < s.Start {
end = s.Start
}
out = append(out, Segment{Start: clampTime(s.Start), End: clampTime(end), Text: text})
}
return out
}
func normalizeAudioLLMSegments(segments []Segment, text string, diarize bool) []Segment {
text = strings.TrimSpace(text)
if text != "" {
if labeled := segmentSpeakerLabeledText(text); len(labeled) > 0 {
return labeled
}
}
if len(segments) <= 1 && text != "" {
heuristic := segmentTranscriptText(text)
if len(heuristic) > len(segments) {
segments = heuristic
}
}
return segments
}
func segmentSpeakerLabeledText(text string) []Segment {
matches := speakerLabelPattern.FindAllStringSubmatchIndex(text, -1)
if len(matches) == 0 {
return nil
}
speakerIDs := map[string]string{}
var out []Segment
var t float64
for i, match := range matches {
label := strings.ToLower(strings.TrimSpace(text[match[2]:match[3]]))
speaker, ok := speakerIDs[label]
if !ok {
speaker = fmt.Sprintf("SPEAKER_%02d", len(speakerIDs))
speakerIDs[label] = speaker
}
start := match[1]
end := len(text)
if i+1 < len(matches) {
end = matches[i+1][0]
}
part := strings.TrimSpace(text[start:end])
part = strings.Trim(part, ":-— ")
if part == "" {
continue
}
words := len(strings.Fields(part))
duration := float64(words) * 0.42
if duration < 1.2 {
duration = 1.2
}
out = append(out, Segment{Start: t, End: t + duration, Text: part, Speaker: speaker})
t += duration
}
return out
}
func segmentTranscriptText(text string) []Segment {
parts := splitTranscriptSentences(text)
out := make([]Segment, 0, len(parts))
var t float64
for _, part := range parts {
words := len(strings.Fields(part))
if words == 0 {
continue
}
duration := float64(words) * 0.42
if duration < 1.2 {
duration = 1.2
}
segment := Segment{Start: t, End: t + duration, Text: part}
out = append(out, segment)
t = segment.End
}
if len(out) == 0 && strings.TrimSpace(text) != "" {
out = append(out, Segment{Start: 0, End: 0, Text: strings.TrimSpace(text)})
}
return out
}
func splitTranscriptSentences(text string) []string {
text = strings.Join(strings.Fields(text), " ")
if text == "" {
return nil
}
var out []string
start := 0
runes := []rune(text)
for i, r := range runes {
if r != '.' && r != '!' && r != '?' && r != '…' {
continue
}
next := i + 1
if next < len(runes) && runes[next] != ' ' {
continue
}
part := strings.TrimSpace(string(runes[start : i+1]))
if part != "" {
out = append(out, part)
}
start = i + 1
for start < len(runes) && runes[start] == ' ' {
start++
}
}
tail := strings.TrimSpace(string(runes[start:]))
if tail != "" {
out = append(out, tail)
}
return mergeShortSegments(out, 8, 34)
}
func mergeShortSegments(parts []string, minWords, maxWords int) []string {
if len(parts) <= 1 {
return parts
}
out := make([]string, 0, len(parts))
var current []string
currentWords := 0
flush := func() {
if len(current) == 0 {
return
}
out = append(out, strings.Join(current, " "))
current = nil
currentWords = 0
}
for _, part := range parts {
words := len(strings.Fields(part))
if currentWords > 0 && currentWords+words > maxWords {
flush()
}
current = append(current, part)
currentWords += words
if currentWords >= minWords {
flush()
}
}
flush()
return out
} }
func firstNonEmpty(values ...string) string { func firstNonEmpty(values ...string) string {

View File

@@ -1,63 +1,183 @@
package transcription package transcription
import ( import (
"math" "encoding/json"
"net/http"
"net/http/httptest"
"testing" "testing"
"time"
) )
func TestAdjustLeadSilence(t *testing.T) { func TestNewWithOptionsBuildsWhisperProvider(t *testing.T) {
got := adjustLeadSilence([]Segment{
{Start: 0.2, End: 1.1, Text: "first"},
{Start: 1.4, End: 2.0, Text: "second"},
}, 800*time.Millisecond)
if got[0].Start != 0 {
t.Fatalf("first start = %v, want 0", got[0].Start)
}
if !near(got[0].End, 0.3) {
t.Fatalf("first end = %v, want 0.3", got[0].End)
}
if !near(got[1].Start, 0.6) {
t.Fatalf("second start = %v, want 0.6", got[1].Start)
}
}
func TestNormalizeProviderOrder(t *testing.T) {
got := normalizeProviderOrder([]string{"whisperx", "qwen", "voxtral", "qwen2-audio"})
want := []string{ProviderWhisperX, ProviderQwenAudio, ProviderVoxtral}
if len(got) != len(want) {
t.Fatalf("providers = %#v, want %#v", got, want)
}
for i := range want {
if got[i] != want[i] {
t.Fatalf("providers = %#v, want %#v", got, want)
}
}
}
func TestNewWithOptionsBuildsComparisonProviders(t *testing.T) {
client := NewWithOptions(Options{ client := NewWithOptions(Options{
Providers: []string{"whisperx", "qwen2-audio", "voxtral-small"}, AudioBaseURL: "http://whisper",
WhisperXURL: "http://whisperx",
QwenAudioBaseURL: "http://qwen",
VoxtralBaseURL: "http://voxtral",
}) })
if client == nil { if client == nil {
t.Fatal("client is nil") t.Fatal("client is nil")
} }
got := make([]string, 0, len(client.providers)) if client.provider.Name != ProviderWhisperLargeV3 {
for _, provider := range client.providers { t.Fatalf("provider = %q, want %q", client.provider.Name, ProviderWhisperLargeV3)
got = append(got, provider.Name)
} }
want := []string{ProviderWhisperX, ProviderQwenAudio, ProviderVoxtral} if client.provider.Model != "openai/whisper-large-v3" {
for i := range want { t.Fatalf("model = %q", client.provider.Model)
if got[i] != want[i] {
t.Fatalf("providers = %#v, want %#v", got, want)
}
} }
} }
func near(got, want float64) bool { func TestWhisperUsesAudioTranscriptionsEndpoint(t *testing.T) {
return math.Abs(got-want) < 0.000001 audioSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte("fake audio"))
}))
defer audioSrv.Close()
var gotPath, gotModel, gotResponseFormat, gotPrompt, gotTemperature, gotTimestampGranularity string
providerSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gotPath = r.URL.Path
if err := r.ParseMultipartForm(16 << 20); err != nil {
t.Fatalf("ParseMultipartForm: %v", err)
}
gotModel = r.FormValue("model")
gotResponseFormat = r.FormValue("response_format")
gotPrompt = r.FormValue("prompt")
gotTemperature = r.FormValue("temperature")
gotTimestampGranularity = r.FormValue("timestamp_granularities[]")
if _, _, err := r.FormFile("file"); err != nil {
t.Fatalf("FormFile: %v", err)
}
_ = json.NewEncoder(w).Encode(map[string]any{
"text": "Алло, тест. Да, слышно.",
"segments": []map[string]any{
{"start": 0, "end": 1.2, "text": "Алло, тест."},
{"start": 1.2, "end": 2.4, "text": "Да, слышно."},
},
})
}))
defer providerSrv.Close()
client := NewWithOptions(Options{
AudioBaseURL: providerSrv.URL,
AudioModel: "openai/whisper-large-v3",
})
if client == nil {
t.Fatal("client is nil")
}
got, err := client.Transcribe(t.Context(), Input{AudioURL: audioSrv.URL, Filename: "call.mp3"})
if err != nil {
t.Fatalf("Transcribe: %v", err)
}
if gotPath != "/v1/audio/transcriptions" {
t.Fatalf("path = %q, want /v1/audio/transcriptions", gotPath)
}
if gotModel != "openai/whisper-large-v3" {
t.Fatalf("model = %q", gotModel)
}
if gotResponseFormat != "verbose_json" {
t.Fatalf("response_format = %q, want verbose_json", gotResponseFormat)
}
if gotTemperature != "0" {
t.Fatalf("temperature = %q, want 0", gotTemperature)
}
if gotTimestampGranularity != "segment" {
t.Fatalf("timestamp_granularities[] = %q, want segment", gotTimestampGranularity)
}
if gotPrompt != "" {
t.Fatalf("prompt = %q, want empty", gotPrompt)
}
if len(got.Segments) != 2 || got.Segments[0].Text != "Алло, тест." || got.Segments[1].Start != 1.2 {
t.Fatalf("segments = %#v", got.Segments)
}
if got.SchemaVersion != ResultSchemaVersion {
t.Fatalf("schema_version = %q, want %q", got.SchemaVersion, ResultSchemaVersion)
}
}
func TestWhisperFallsBackToJSONWhenVerboseJSONUnsupported(t *testing.T) {
audioSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte("fake audio"))
}))
defer audioSrv.Close()
var formats []string
providerSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if err := r.ParseMultipartForm(16 << 20); err != nil {
t.Fatalf("ParseMultipartForm: %v", err)
}
format := r.FormValue("response_format")
formats = append(formats, format)
if format == "verbose_json" {
w.WriteHeader(http.StatusBadRequest)
_ = json.NewEncoder(w).Encode(map[string]any{
"error": map[string]any{"message": "unsupported response_format verbose_json"},
})
return
}
_ = json.NewEncoder(w).Encode(map[string]any{
"text": "Алло, fallback работает.",
})
}))
defer providerSrv.Close()
client := NewWithOptions(Options{
AudioBaseURL: providerSrv.URL,
AudioModel: "openai/whisper-large-v3",
})
got, err := client.Transcribe(t.Context(), Input{AudioURL: audioSrv.URL, Filename: "call.mp3"})
if err != nil {
t.Fatalf("Transcribe: %v", err)
}
if len(formats) != 2 || formats[0] != "verbose_json" || formats[1] != "json" {
t.Fatalf("formats = %#v, want verbose_json then json", formats)
}
if got.Segments[0].Text != "Алло, fallback работает." {
t.Fatalf("segments = %#v", got.Segments)
}
}
func TestSegmentTranscriptTextDoesNotInventSpeakers(t *testing.T) {
got := segmentTranscriptText("Алло, добрый день. Да, слушаю. Скажите, квартира продается? Да, продается.")
if len(got) < 2 {
t.Fatalf("segments = %#v, want multiple", got)
}
if got[0].Speaker != "" || got[1].Speaker != "" {
t.Fatalf("speakers = %q/%q, want empty", got[0].Speaker, got[1].Speaker)
}
if got[1].Start <= got[0].Start {
t.Fatalf("segment times did not advance: %#v", got)
}
}
func TestNormalizeAudioLLMSegmentsSplitsSingleLongSegment(t *testing.T) {
text := "Алло, добрый день. Да, слушаю. Скажите, квартира продается? Да, продается."
got := normalizeAudioLLMSegments([]Segment{{Start: 0, End: 12, Text: text}}, text, true)
if len(got) < 2 {
t.Fatalf("segments = %#v, want heuristic split", got)
}
if got[0].Speaker != "" || got[1].Speaker != "" {
t.Fatalf("speakers = %q/%q, want empty", got[0].Speaker, got[1].Speaker)
}
}
func TestNormalizeAudioLLMSegmentsKeepsSegmentsWithoutFakeSpeakers(t *testing.T) {
got := normalizeAudioLLMSegments([]Segment{
{Start: 0, End: 1, Text: "Алло."},
{Start: 1, End: 2, Text: "Да, слушаю."},
}, "Алло. Да, слушаю.", true)
if len(got) != 2 {
t.Fatalf("segments = %#v", got)
}
if got[0].Speaker != "" || got[1].Speaker != "" {
t.Fatalf("speakers = %q/%q, want empty", got[0].Speaker, got[1].Speaker)
}
}
func TestNormalizeAudioLLMSegmentsUsesExplicitSpeakerLabels(t *testing.T) {
text := "Спикер 1: Алло, добрый день. Спикер 2: Да, слушаю. Спикер 1: Скажите, квартира продается?"
got := normalizeAudioLLMSegments(nil, text, true)
if len(got) != 3 {
t.Fatalf("segments = %#v, want 3", got)
}
if got[0].Speaker != "SPEAKER_00" || got[1].Speaker != "SPEAKER_01" || got[2].Speaker != "SPEAKER_00" {
t.Fatalf("speakers = %q/%q/%q", got[0].Speaker, got[1].Speaker, got[2].Speaker)
}
if got[0].Text != "Алло, добрый день." || got[1].Text != "Да, слушаю." {
t.Fatalf("texts = %#v", got)
}
} }

View File

@@ -20,7 +20,7 @@ const (
TaskCallAnalysis = "call_analysis" TaskCallAnalysis = "call_analysis"
TaskTranscription = "transcription" TaskTranscription = "transcription"
TranscriptionProfile = "whisperx" TranscriptionProfile = "whisper-large-v3"
) )
type Worker struct { type Worker struct {
@@ -113,6 +113,9 @@ func (w *Worker) tick(ctx context.Context) {
} }
func (w *Worker) process(ctx context.Context, job *model.Job) { func (w *Worker) process(ctx context.Context, job *model.Job) {
stopHeartbeat := w.startHeartbeat(ctx, job)
defer stopHeartbeat()
if job.TaskType == TaskTranscription { if job.TaskType == TaskTranscription {
w.processTranscription(ctx, job) w.processTranscription(ctx, job)
return return
@@ -168,6 +171,41 @@ func (w *Worker) fail(ctx context.Context, job *model.Job, code, message string)
} }
} }
func (w *Worker) startHeartbeat(ctx context.Context, job *model.Job) func() {
heartbeatCtx, cancel := context.WithCancel(ctx)
done := make(chan struct{})
ticker := time.NewTicker(w.heartbeatInterval())
go func() {
defer close(done)
defer ticker.Stop()
for {
select {
case <-heartbeatCtx.Done():
return
case <-ticker.C:
if err := w.store.HeartbeatJob(heartbeatCtx, job.ID); err != nil {
slog.Warn("heartbeat job failed", "job_id", job.ID, "error", err)
}
}
}
}()
return func() {
cancel()
<-done
}
}
func (w *Worker) heartbeatInterval() time.Duration {
interval := w.leaseTimeout / 3
if interval < 10*time.Second {
return 10 * time.Second
}
if interval > time.Minute {
return time.Minute
}
return interval
}
func classifyTranscriptionError(err error) string { func classifyTranscriptionError(err error) string {
if err == nil { if err == nil {
return "unknown" return "unknown"
@@ -184,11 +222,11 @@ func classifyTranscriptionError(err error) string {
return "bad_audio" return "bad_audio"
case strings.Contains(s, "audio download") || strings.Contains(s, "audio http 5"): case strings.Contains(s, "audio download") || strings.Contains(s, "audio http 5"):
return "storage_error" return "storage_error"
case strings.Contains(s, "whisperx http 4") || strings.Contains(s, "ffmpeg") || strings.Contains(s, "invalid data") || strings.Contains(s, "could not decode"): case strings.Contains(s, "audio transcription http 4") || strings.Contains(s, "invalid data") || strings.Contains(s, "could not decode"):
return "bad_audio" return "bad_audio"
case strings.Contains(s, "whisperx http 5") || strings.Contains(s, "whisperx do") || strings.Contains(s, "audio llm http 5") || strings.Contains(s, "audio llm do") || strings.Contains(s, "connection refused") || strings.Contains(s, "connection reset") || strings.Contains(s, "closed network connection"): case strings.Contains(s, "audio transcription http 5") || strings.Contains(s, "audio transcription do") || strings.Contains(s, "connection refused") || strings.Contains(s, "connection reset") || strings.Contains(s, "closed network connection"):
return "provider_unavailable" return "provider_unavailable"
case strings.Contains(s, "audio llm http 4"): case strings.Contains(s, "audio transcription http 4"):
return "bad_input" return "bad_input"
case strings.Contains(s, "decode"): case strings.Contains(s, "decode"):
return "bad_response" return "bad_response"
@@ -207,6 +245,8 @@ func classifyLLMError(err error) string {
return "timeout" return "timeout"
case strings.Contains(s, "connection refused") || strings.Contains(s, "connection reset") || strings.Contains(s, "no route to host") || strings.Contains(s, "llm http 5"): case strings.Contains(s, "connection refused") || strings.Contains(s, "connection reset") || strings.Contains(s, "no route to host") || strings.Contains(s, "llm http 5"):
return "model_unavailable" return "model_unavailable"
case strings.Contains(s, "maximum context length") || strings.Contains(s, "context length") || strings.Contains(s, "input_tokens"):
return "context_length"
case strings.Contains(s, "llm http 4") || strings.Contains(s, "messages are required"): case strings.Contains(s, "llm http 4") || strings.Contains(s, "messages are required"):
return "bad_input" return "bad_input"
case strings.Contains(s, "llm decode") || strings.Contains(s, "empty choices"): case strings.Contains(s, "llm decode") || strings.Contains(s, "empty choices"):

View File

@@ -0,0 +1,29 @@
package worker
import (
"errors"
"testing"
)
func TestClassifyLLMError(t *testing.T) {
tests := []struct {
name string
err error
want string
}{
{name: "timeout", err: errors.New("context deadline exceeded"), want: "timeout"},
{name: "unavailable", err: errors.New("llm HTTP 500: internal server error"), want: "model_unavailable"},
{name: "context length", err: errors.New("This model's maximum context length is 16384 tokens. input_tokens=16001"), want: "context_length"},
{name: "bad input", err: errors.New("llm HTTP 400: messages are required"), want: "bad_input"},
{name: "bad response", err: errors.New("llm decode: invalid character '<'"), want: "bad_response"},
{name: "unknown", err: errors.New("strange failure"), want: "unknown"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := classifyLLMError(tt.err); got != tt.want {
t.Fatalf("classifyLLMError() = %q, want %q", got, tt.want)
}
})
}
}

View File

@@ -11,20 +11,11 @@ data:
LLM_BASE_URL: "http://10.2.3.5:8002" LLM_BASE_URL: "http://10.2.3.5:8002"
LLM_MODEL: "qwen2.5-14b" LLM_MODEL: "qwen2.5-14b"
LLM_TIMEOUT: "5m" LLM_TIMEOUT: "5m"
TRANSCRIPTION_PROVIDERS: "whisperx,qwen2-audio,voxtral-small" # Whisper Large v3 is exposed on the AI server through an OpenAI-compatible
WHISPERX_URL: "http://10.2.3.5:8001" # /v1/audio/transcriptions endpoint.
WHISPERX_TIMEOUT: "10m" AUDIO_TRANSCRIPTION_BASE_URL: "http://10.2.3.5:8004"
WHISPERX_LEAD_SILENCE: "800ms" AUDIO_TRANSCRIPTION_MODEL: "openai/whisper-large-v3"
# Fill these after Qwen2-Audio and Voxtral are exposed as OpenAI-compatible AUDIO_TRANSCRIPTION_TIMEOUT: "30m"
# chat-completions endpoints on the AI server.
QWEN_AUDIO_BASE_URL: "http://10.2.3.5:8003"
QWEN_AUDIO_MODEL: "Qwen/Qwen2-Audio-7B-Instruct"
QWEN_AUDIO_TIMEOUT: "10m"
VOXTRAL_BASE_URL: ""
VOXTRAL_MODEL: "mistralai/Voxtral-Small-24B-2507"
VOXTRAL_TIMEOUT: "10m"
AUDIO_LLM_MAX_TOKENS: "4096"
FFMPEG_PATH: "/usr/bin/ffmpeg"
AI_STATS_SIDECAR_URL: "http://10.2.3.5:9090" AI_STATS_SIDECAR_URL: "http://10.2.3.5:9090"
AI_STATS_TIMEOUT: "8s" AI_STATS_TIMEOUT: "8s"
WORKER_POLL_INTERVAL: "2s" WORKER_POLL_INTERVAL: "2s"

View File

@@ -18,6 +18,5 @@ type: Opaque
stringData: stringData:
DATABASE_URL: "postgres://ai_service:ai_service@postgres:5432/ai_service?sslmode=disable" DATABASE_URL: "postgres://ai_service:ai_service@postgres:5432/ai_service?sslmode=disable"
LLM_API_KEY: "sk-111f838ccec43406e078cd9094b6797307cb895236179f32" LLM_API_KEY: "sk-111f838ccec43406e078cd9094b6797307cb895236179f32"
QWEN_AUDIO_API_KEY: "sk-111f838ccec43406e078cd9094b6797307cb895236179f32" AUDIO_TRANSCRIPTION_API_KEY: "sk-111f838ccec43406e078cd9094b6797307cb895236179f32"
VOXTRAL_API_KEY: "sk-111f838ccec43406e078cd9094b6797307cb895236179f32"
AI_SERVICE_TOKEN: "d18bcacf9e02bae1806ee6b6eeda62b95be6a915c0a22936d9a700128b275442" AI_SERVICE_TOKEN: "d18bcacf9e02bae1806ee6b6eeda62b95be6a915c0a22936d9a700128b275442"

View File

@@ -30,11 +30,81 @@ spec:
fieldRef: fieldRef:
fieldPath: metadata.name fieldPath: metadata.name
- name: WORKER_TASK_TYPES - name: WORKER_TASK_TYPES
value: "llm_chat,chat_completion,call_analysis,telegram_classification" value: "llm_chat,chat_completion,call_analysis,telegram_classification,transcript_summary"
- name: WORKER_MODEL_PROFILES - name: WORKER_MODEL_PROFILES
value: "qwen2.5-14b" value: "qwen2.5-14b"
- name: WORKER_CLAIM_LIMIT - name: WORKER_CLAIM_LIMIT
value: "8" value: "8"
- name: WORKER_LEASE_TIMEOUT
value: "15m"
envFrom:
- configMapRef:
name: ai-service-config
- secretRef:
name: ai-service-secrets
startupProbe:
httpGet:
path: /readyz
port: 8081
periodSeconds: 5
failureThreshold: 30
readinessProbe:
httpGet:
path: /readyz
port: 8081
periodSeconds: 10
livenessProbe:
httpGet:
path: /healthz
port: 8081
periodSeconds: 10
resources:
requests:
cpu: 50m
memory: 96Mi
limits:
cpu: 500m
memory: 384Mi
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: ai-service-analysis-worker
namespace: ai-service
spec:
replicas: 1
selector:
matchLabels:
app: ai-service-analysis-worker
template:
metadata:
labels:
app: ai-service-analysis-worker
spec:
terminationGracePeriodSeconds: 20
hostAliases:
- ip: "77.105.173.42"
hostnames:
- "s3-minio.estateliga.work"
containers:
- name: worker
image: localhost:30300/admin/ai-service:latest
command: ["/usr/local/bin/ai-service-worker"]
ports:
- containerPort: 8081
env:
- name: WORKER_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: WORKER_TASK_TYPES
value: "call_analysis"
- name: WORKER_MODEL_PROFILES
value: "qwen2.5-14b"
- name: WORKER_CLAIM_LIMIT
value: "8"
- name: WORKER_LEASE_TIMEOUT
value: "15m"
envFrom: envFrom:
- configMapRef: - configMapRef:
name: ai-service-config name: ai-service-config
@@ -98,9 +168,11 @@ spec:
- name: WORKER_TASK_TYPES - name: WORKER_TASK_TYPES
value: "transcription" value: "transcription"
- name: WORKER_MODEL_PROFILES - name: WORKER_MODEL_PROFILES
value: "whisperx" value: "whisper-large-v3"
- name: WORKER_CLAIM_LIMIT - name: WORKER_CLAIM_LIMIT
value: "1" value: "4"
- name: WORKER_LEASE_TIMEOUT
value: "15m"
envFrom: envFrom:
- configMapRef: - configMapRef:
name: ai-service-config name: ai-service-config