Make Voxtral the only transcription provider
Some checks failed
CI / test (push) Failing after 8s
Build and Deploy / build-and-deploy (push) Successful in 27s

This commit is contained in:
Grendgi
2026-06-09 16:54:54 +03:00
parent 5c965be8c9
commit 9bd6d726f0
15 changed files with 128 additions and 900 deletions

View File

@@ -15,7 +15,7 @@ The service is intentionally domain-agnostic:
`beeline/{call_id}` or `channel/{message_id}`.
- `task_type` describes the technical task class, for example
`transcribe`, `call_analysis`, `tg_analysis`, `pf_competitor_analysis`.
- `model_profile` selects a runtime profile, for example `whisperx`,
- `model_profile` selects a runtime profile, for example `voxtral-small`,
`qwen2.5-14b`, `vision`, or a future provider profile.
- `input` and `result` are JSON payloads owned by the caller and worker.
@@ -46,32 +46,18 @@ or compact `system` / `user` fields. The completed job result contains
domain metadata fields in `input`, but the worker only reads chat fields such as
`system`, `user`, `messages`, `max_tokens` and `response_format`.
`transcription` jobs can run several transcription providers in order for
temporary A/B comparison. The main `segments` field remains compatible with
telephony and contains the first successful provider result. The full comparison
is stored in `attempts` with `provider`, `model`, `status`, `text`, `segments`,
`duration_ms` and `error`.
`transcription` jobs are processed only by Voxtral Small
(`mistralai/Voxtral-Small-24B-2507`) through an OpenAI-compatible
`/v1/audio/transcriptions` endpoint. The returned `segments` field stays
compatible with telephony. If the provider returns one long segment, AI Service
splits it into smaller transcript segments and adds heuristic speaker labels
when diarization is requested.
Recommended comparison order:
AI-server compose snippet for Voxtral lives in
`deploy/ai-server/docker-compose.audio.yml`:
1. `whisperx`
2. `qwen2-audio` (`Qwen/Qwen2-Audio-7B-Instruct`)
3. `voxtral-small` (`mistralai/Voxtral-Small-24B-2507`)
Qwen2-Audio and Voxtral are called through an OpenAI-compatible
`/v1/chat/completions` endpoint with vLLM-style `audio_url` data URLs; set
their endpoint URLs only after the models are actually exposed on the AI server.
AI-server compose snippets for these temporary comparison endpoints live in
`deploy/ai-server/docker-compose.audio.yml`. They are profile-gated because the
single GPU cannot keep the production text vLLM, two WhisperX instances, Qwen2
Audio and Voxtral loaded at the same time:
- Qwen2-Audio endpoint: `http://10.2.3.5:8003`
- Voxtral endpoint: `http://10.2.3.5:8004`
- Start Qwen only:
`docker compose -f docker-compose.yml -f docker-compose.audio.yml --profile qwen-audio up -d qwen-audio`
- Start Voxtral only:
- Start Voxtral:
`docker compose -f docker-compose.yml -f docker-compose.audio.yml --profile voxtral-small up -d voxtral-small`
## API
@@ -90,7 +76,7 @@ Audio and Voxtral loaded at the same time:
- `GET /api/v1/providers/status` checks configured AI providers without
returning secrets.
- `GET /api/v1/infra/status` returns AI-server sidecar telemetry
(GPU, containers, vLLM and WhisperX live metrics) when configured.
(GPU, containers and vLLM live metrics) when configured.
- `GET /healthz` returns process health.
- `GET /readyz` checks PostgreSQL readiness.
- Built-in workers expose open Kubernetes endpoints on `WORKER_HTTP_PORT`:
@@ -111,19 +97,11 @@ for Kubernetes probes.
- `LLM_API_KEY`, primary LLM API key
- `LLM_MODEL`, default `qwen2.5-14b`
- `LLM_TIMEOUT`, default `5m`
- `TRANSCRIPTION_PROVIDERS`, default `whisperx`, comma-separated ordered list:
`whisperx,qwen2-audio,voxtral-small`
- `WHISPERX_URL`, WhisperX endpoint for transcription jobs
- `QWEN_AUDIO_BASE_URL`, OpenAI-compatible endpoint for Qwen2-Audio
- `QWEN_AUDIO_MODEL`, default `Qwen/Qwen2-Audio-7B-Instruct`
- `QWEN_AUDIO_API_KEY`, optional bearer token for Qwen2-Audio; falls back to
`AUDIO_LLM_API_KEY`, then `LLM_API_KEY`
- `VOXTRAL_BASE_URL`, OpenAI-compatible endpoint for Voxtral
- `VOXTRAL_MODEL`, default `mistralai/Voxtral-Small-24B-2507`
- `VOXTRAL_API_KEY`, optional bearer token for Voxtral; falls back to
`AUDIO_LLM_API_KEY`, then `LLM_API_KEY`
- `AUDIO_LLM_PROMPT`, transcription instruction for audio LLM providers
- `AUDIO_LLM_MAX_TOKENS`, default `4096`
- `AUDIO_LLM_PROMPT`, transcription instruction for Voxtral
- `WORKER_ID`, default hostname
- `WORKER_HTTP_HOST`, default `0.0.0.0`
- `WORKER_HTTP_PORT`, default `8081`

View File

@@ -49,21 +49,11 @@ func main() {
llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout)
transcriber := transcription.NewWithOptions(transcription.Options{
Providers: cfg.TranscriptionProviders,
WhisperXURL: cfg.WhisperXURL,
WhisperXTimeout: cfg.WhisperXTimeout,
FfmpegPath: cfg.FfmpegPath,
LeadSilence: cfg.WhisperXLeadSilence,
QwenAudioBaseURL: cfg.QwenAudioBaseURL,
QwenAudioAPIKey: cfg.QwenAudioAPIKey,
QwenAudioModel: cfg.QwenAudioModel,
QwenAudioTimeout: cfg.QwenAudioTimeout,
VoxtralBaseURL: cfg.VoxtralBaseURL,
VoxtralAPIKey: cfg.VoxtralAPIKey,
VoxtralModel: cfg.VoxtralModel,
VoxtralTimeout: cfg.VoxtralTimeout,
AudioLLMPrompt: cfg.AudioLLMPrompt,
AudioLLMMaxTokens: cfg.AudioLLMMaxTokens,
})
w := worker.New(db, llmClient, transcriber, cfg.WorkerID, cfg.LLMModel, cfg.WorkerTaskTypes, cfg.WorkerModelProfiles, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit)
healthSrv := startHealthServer(ctx, db, cfg)
@@ -72,8 +62,8 @@ func main() {
"worker_id", cfg.WorkerID,
"model", cfg.LLMModel,
"transcription_enabled", transcriber != nil,
"transcription_providers", cfg.TranscriptionProviders,
"whisperx_lead_silence", cfg.WhisperXLeadSilence.String(),
"transcription_provider", "voxtral-small",
"transcription_model", cfg.VoxtralModel,
"task_types", cfg.WorkerTaskTypes,
"model_profiles", cfg.WorkerModelProfiles,
"poll_interval", cfg.WorkerPollInterval.String(),
@@ -144,7 +134,8 @@ func (h workerHealth) ServeHTTP(w http.ResponseWriter, r *http.Request) {
"worker_id": h.cfg.WorkerID,
"task_types": h.cfg.WorkerTaskTypes,
"model_profiles": h.cfg.WorkerModelProfiles,
"transcription_providers": h.cfg.TranscriptionProviders,
"transcription_provider": "voxtral-small",
"transcription_model": h.cfg.VoxtralModel,
"claim_limit": h.cfg.WorkerClaimLimit,
"poll_interval": h.cfg.WorkerPollInterval.String(),
"lease_timeout": h.cfg.WorkerLeaseTimeout.String(),

View File

@@ -1,60 +1,4 @@
services:
qwen-audio:
build:
context: .
dockerfile: vllm-audio.Dockerfile
image: vllm-audio:local
container_name: qwen-audio
profiles:
- qwen-audio
- audio-compare
restart: unless-stopped
ipc: host
runtime: nvidia
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: all
capabilities: [gpu]
environment:
HUGGING_FACE_HUB_TOKEN: ${HF_TOKEN}
VLLM_API_KEY: ${VLLM_API_KEY}
HF_HOME: /cache
volumes:
- ./data/vllm-cache:/cache
networks:
- audio-models
ports:
- "10.2.3.5:8003:8000"
command:
- "--model"
- "Qwen/Qwen2-Audio-7B-Instruct"
- "--served-model-name"
- "Qwen/Qwen2-Audio-7B-Instruct"
- "--trust-remote-code"
- "--host"
- "0.0.0.0"
- "--port"
- "8000"
- "--max-model-len"
- "8192"
- "--gpu-memory-utilization"
- "0.25"
- "--api-key"
- "${VLLM_API_KEY}"
- "--max-num-seqs"
- "4"
- "--max-num-batched-tokens"
- "4096"
healthcheck:
test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
interval: 30s
timeout: 5s
retries: 5
start_period: 900s
voxtral-small:
build:
context: .
@@ -63,7 +7,6 @@ services:
container_name: voxtral-small
profiles:
- voxtral-small
- audio-compare
restart: unless-stopped
ipc: host
runtime: nvidia

View File

@@ -1,20 +0,0 @@
upstream whisperx_upstream {
server whisperx-1:8000 max_fails=3 fail_timeout=30s;
}
server {
listen 80 default_server;
client_max_body_size 200m;
location / {
proxy_pass http://whisperx_upstream;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_connect_timeout 30s;
proxy_send_timeout 10m;
proxy_read_timeout 10m;
proxy_request_buffering off;
proxy_buffering off;
}
}

View File

@@ -18,21 +18,11 @@ type Config struct {
LLMAPIKey string
LLMModel string
LLMTimeout time.Duration
TranscriptionProviders []string
WhisperXURL string
WhisperXTimeout time.Duration
WhisperXLeadSilence time.Duration
QwenAudioBaseURL string
QwenAudioAPIKey string
QwenAudioModel string
QwenAudioTimeout time.Duration
VoxtralBaseURL string
VoxtralAPIKey string
VoxtralModel string
VoxtralTimeout time.Duration
AudioLLMMaxTokens int
AudioLLMPrompt string
FfmpegPath string
AIStatsSidecarURL string
AIStatsTimeout time.Duration
@@ -58,21 +48,11 @@ func Load() Config {
LLMAPIKey: envString("LLM_API_KEY", ""),
LLMModel: envString("LLM_MODEL", "qwen2.5-14b"),
LLMTimeout: envDuration("LLM_TIMEOUT", 5*time.Minute),
TranscriptionProviders: envCSVDefault("TRANSCRIPTION_PROVIDERS", []string{"whisperx"}),
WhisperXURL: envString("WHISPERX_URL", ""),
WhisperXTimeout: envDuration("WHISPERX_TIMEOUT", 10*time.Minute),
WhisperXLeadSilence: envDuration("WHISPERX_LEAD_SILENCE", 800*time.Millisecond),
QwenAudioBaseURL: envString("QWEN_AUDIO_BASE_URL", envString("AUDIO_LLM_BASE_URL", "")),
QwenAudioAPIKey: envString("QWEN_AUDIO_API_KEY", envString("AUDIO_LLM_API_KEY", envString("LLM_API_KEY", ""))),
QwenAudioModel: envString("QWEN_AUDIO_MODEL", "Qwen/Qwen2-Audio-7B-Instruct"),
QwenAudioTimeout: envDuration("QWEN_AUDIO_TIMEOUT", envDuration("AUDIO_LLM_TIMEOUT", 10*time.Minute)),
VoxtralBaseURL: envString("VOXTRAL_BASE_URL", envString("AUDIO_LLM_BASE_URL", "")),
VoxtralAPIKey: envString("VOXTRAL_API_KEY", envString("AUDIO_LLM_API_KEY", envString("LLM_API_KEY", ""))),
VoxtralModel: envString("VOXTRAL_MODEL", "mistralai/Voxtral-Small-24B-2507"),
VoxtralTimeout: envDuration("VOXTRAL_TIMEOUT", envDuration("AUDIO_LLM_TIMEOUT", 10*time.Minute)),
AudioLLMMaxTokens: envInt("AUDIO_LLM_MAX_TOKENS", 4096),
AudioLLMPrompt: envString("AUDIO_LLM_PROMPT", defaultAudioLLMPrompt()),
FfmpegPath: envString("FFMPEG_PATH", "/usr/bin/ffmpeg"),
AIStatsSidecarURL: envString("AI_STATS_SIDECAR_URL", ""),
AIStatsTimeout: envDuration("AI_STATS_TIMEOUT", 8*time.Second),

View File

@@ -13,7 +13,6 @@ type dashboardResponse struct {
Stats *model.Stats `json:"stats"`
Providers providersStatusResponse `json:"providers"`
Infra infraStatusResponse `json:"infra"`
TranscriptionComparison []model.TranscriptionComparisonStat `json:"transcription_comparison"`
Jobs []*model.JobSummary `json:"jobs"`
}
@@ -44,12 +43,6 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
comparison, err := s.store.TranscriptionComparison(ctx)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
resp := dashboardResponse{
At: now,
Summary: summarizeQueues(stats),
@@ -58,13 +51,10 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) {
At: now,
Providers: []providerStatus{
s.checkLLM(ctx),
s.checkWhisperX(ctx),
s.checkAudioLLM(ctx, "qwen2-audio", s.cfg.QwenAudioBaseURL, s.cfg.QwenAudioAPIKey, s.cfg.QwenAudioModel, s.cfg.QwenAudioTimeout),
s.checkAudioLLM(ctx, "voxtral-small", s.cfg.VoxtralBaseURL, s.cfg.VoxtralAPIKey, s.cfg.VoxtralModel, s.cfg.VoxtralTimeout),
},
},
Infra: loadInfraSnapshot(r, s.cfg),
TranscriptionComparison: comparison,
Jobs: jobs,
}
writeJSON(w, http.StatusOK, resp)

View File

@@ -42,8 +42,6 @@ func (s *Server) handleProviderStatus(w http.ResponseWriter, r *http.Request) {
At: time.Now().UTC(),
Providers: []providerStatus{
s.checkLLM(ctx),
s.checkWhisperX(ctx),
s.checkAudioLLM(ctx, "qwen2-audio", s.cfg.QwenAudioBaseURL, s.cfg.QwenAudioAPIKey, s.cfg.QwenAudioModel, s.cfg.QwenAudioTimeout),
s.checkAudioLLM(ctx, "voxtral-small", s.cfg.VoxtralBaseURL, s.cfg.VoxtralAPIKey, s.cfg.VoxtralModel, s.cfg.VoxtralTimeout),
},
}
@@ -132,47 +130,6 @@ func (s *Server) checkLLM(ctx context.Context) providerStatus {
return st
}
func (s *Server) checkWhisperX(ctx context.Context) providerStatus {
baseURL := strings.TrimRight(strings.TrimSpace(s.cfg.WhisperXURL), "/")
st := providerStatus{Name: "whisperx", Configured: baseURL != "", URL: baseURL}
if !st.Configured {
return st
}
paths := []string{"/health", "/healthz", "/readyz", "/"}
var lastErr string
for _, path := range paths {
cctx, cancel := context.WithTimeout(ctx, 2*time.Second)
start := time.Now()
req, err := http.NewRequestWithContext(cctx, http.MethodGet, baseURL+path, nil)
if err != nil {
cancel()
lastErr = err.Error()
continue
}
res, err := (&http.Client{Timeout: 2 * time.Second}).Do(req)
st.LatencyMS = time.Since(start).Milliseconds()
cancel()
if err != nil {
lastErr = err.Error()
continue
}
body := ""
if res.StatusCode >= 300 {
body = readSmallBody(res.Body)
}
_ = res.Body.Close()
if res.StatusCode >= 300 {
lastErr = fmt.Sprintf("%s http %d: %s", path, res.StatusCode, body)
continue
}
st.OK = true
s.rememberProviderOK("whisperx", st.LatencyMS)
return st
}
st.Error = lastErr
return s.withStaleProviderOK("whisperx", st)
}
func (s *Server) rememberProviderOK(name string, latencyMS int64) {
s.providerMu.Lock()
defer s.providerMu.Unlock()

View File

@@ -119,22 +119,6 @@ type ErrorStat struct {
Last24h int64 `json:"last_24h"`
}
type TranscriptionComparisonStat struct {
Provider string `json:"provider"`
Model string `json:"model,omitempty"`
Attempts int64 `json:"attempts"`
Success int64 `json:"success"`
Failed int64 `json:"failed"`
SuccessRate float64 `json:"success_rate"`
Wins int64 `json:"wins"`
Last24hAttempts int64 `json:"last_24h_attempts"`
Last24hSuccess int64 `json:"last_24h_success"`
AvgDurationMS int64 `json:"avg_duration_ms"`
P50DurationMS int64 `json:"p50_duration_ms"`
AvgTextChars int64 `json:"avg_text_chars"`
LastAt *time.Time `json:"last_at,omitempty"`
}
type OwnerStat struct {
OwnerService string `json:"owner_service"`
TaskType string `json:"task_type"`

View File

@@ -2,7 +2,6 @@ package store
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
@@ -622,92 +621,6 @@ ORDER BY owner_service, last_24h DESC, total DESC
return out, errorRows.Err()
}
func (s *Store) TranscriptionComparison(ctx context.Context) ([]model.TranscriptionComparisonStat, error) {
const q = `
WITH done_jobs AS (
SELECT result, completed_at, result->>'provider' AS winner_provider
FROM ai_jobs
WHERE task_type = 'transcription'
AND status = 'done'
AND result ? 'attempts'
),
attempts AS (
SELECT
completed_at,
winner_provider,
item
FROM done_jobs
CROSS JOIN LATERAL jsonb_array_elements(result->'attempts') AS item
)
SELECT
item->>'provider' AS provider,
COALESCE(NULLIF(MAX(item->>'model'), ''), '') AS model,
count(*) AS attempts,
count(*) FILTER (WHERE item->>'status' = 'ok') AS success,
count(*) FILTER (WHERE item->>'status' <> 'ok') AS failed,
COALESCE(
count(*) FILTER (WHERE item->>'status' = 'ok')::double precision / NULLIF(count(*), 0),
0
) AS success_rate,
count(*) FILTER (WHERE item->>'provider' = winner_provider) AS wins,
count(*) FILTER (WHERE completed_at > NOW() - INTERVAL '24 hours') AS last_24h_attempts,
count(*) FILTER (WHERE completed_at > NOW() - INTERVAL '24 hours' AND item->>'status' = 'ok') AS last_24h_success,
COALESCE(avg(NULLIF(item->>'duration_ms', '')::bigint)::double precision, 0) AS avg_duration_ms,
COALESCE(percentile_cont(0.5) WITHIN GROUP (ORDER BY NULLIF(item->>'duration_ms', '')::bigint), 0) AS p50_duration_ms,
COALESCE(avg(length(COALESCE(item->>'text', '')))::double precision, 0) AS avg_text_chars,
max(completed_at) AS last_at
FROM attempts
WHERE COALESCE(item->>'provider', '') <> ''
GROUP BY item->>'provider'
ORDER BY wins DESC, success DESC, item->>'provider'
`
rows, err := s.pool.Query(ctx, q)
if err != nil {
return nil, err
}
defer rows.Close()
var out []model.TranscriptionComparisonStat
for rows.Next() {
var stat model.TranscriptionComparisonStat
var avgDuration sql.NullFloat64
var p50Duration sql.NullFloat64
var avgText sql.NullFloat64
if err := rows.Scan(
&stat.Provider,
&stat.Model,
&stat.Attempts,
&stat.Success,
&stat.Failed,
&stat.SuccessRate,
&stat.Wins,
&stat.Last24hAttempts,
&stat.Last24hSuccess,
&avgDuration,
&p50Duration,
&avgText,
&stat.LastAt,
); err != nil {
return nil, err
}
stat.AvgDurationMS = roundedInt64(avgDuration)
stat.P50DurationMS = roundedInt64(p50Duration)
stat.AvgTextChars = roundedInt64(avgText)
out = append(out, stat)
}
return out, rows.Err()
}
func roundedInt64(v sql.NullFloat64) int64 {
if !v.Valid {
return 0
}
if v.Float64 < 0 {
return 0
}
return int64(v.Float64 + 0.5)
}
func scanJobSummary(row pgx.Row) (*model.JobSummary, error) {
var job model.JobSummary
err := row.Scan(

View File

@@ -3,58 +3,37 @@ package transcription
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
)
type Client struct {
providers []ProviderConfig
provider ProviderConfig
http *http.Client
ffmpegPath string
leadSilence time.Duration
}
const (
ProviderWhisperX = "whisperx"
ProviderQwenAudio = "qwen2-audio"
ProviderVoxtral = "voxtral-small"
)
const ProviderVoxtral = "voxtral-small"
type Options struct {
Providers []string
WhisperXURL string
WhisperXTimeout time.Duration
FfmpegPath string
LeadSilence time.Duration
QwenAudioBaseURL string
QwenAudioAPIKey string
QwenAudioModel string
QwenAudioTimeout time.Duration
VoxtralBaseURL string
VoxtralAPIKey string
VoxtralModel string
VoxtralTimeout time.Duration
AudioLLMPrompt string
AudioLLMMaxTokens int
}
type ProviderConfig struct {
Name string
Kind string
BaseURL string
APIKey string
Model string
Timeout time.Duration
MaxTokens int
Prompt string
}
@@ -95,13 +74,6 @@ type Attempt struct {
DurationMS int64 `json:"duration_ms,omitempty"`
}
type whisperResponse struct {
Language string `json:"language"`
Segments []Segment `json:"segments"`
DiarizeError *string `json:"diarize_error,omitempty"`
AlignError *string `json:"align_error,omitempty"`
}
type audioLLMResponse struct {
Text string
Model string
@@ -109,40 +81,6 @@ type audioLLMResponse struct {
Segments []Segment
}
type audioLLMChatRequest struct {
Model string `json:"model"`
Messages []audioLLMChatMessage `json:"messages"`
MaxTokens int `json:"max_tokens,omitempty"`
Temperature float64 `json:"temperature"`
}
type audioLLMChatMessage struct {
Role string `json:"role"`
Content []audioLLMContentPart `json:"content"`
}
type audioLLMContentPart struct {
Type string `json:"type"`
Text string `json:"text,omitempty"`
AudioURL *audioLLMURLRef `json:"audio_url,omitempty"`
}
type audioLLMURLRef struct {
URL string `json:"url"`
}
type audioLLMChatResponse struct {
Model string `json:"model,omitempty"`
Choices []struct {
Message struct {
Content string `json:"content"`
} `json:"message"`
} `json:"choices"`
Error *struct {
Message string `json:"message"`
} `json:"error,omitempty"`
}
type audioTranscriptionResponse struct {
Text string `json:"text"`
Language string `json:"language,omitempty"`
@@ -161,134 +99,40 @@ type audioTranscriptionSegment struct {
func New(baseURL string, timeout time.Duration, ffmpegPath string, leadSilence time.Duration) *Client {
return NewWithOptions(Options{
Providers: []string{ProviderWhisperX},
WhisperXURL: baseURL,
WhisperXTimeout: timeout,
FfmpegPath: ffmpegPath,
LeadSilence: leadSilence,
VoxtralBaseURL: baseURL,
VoxtralTimeout: timeout,
})
}
func NewWithOptions(opts Options) *Client {
leadSilence := opts.LeadSilence
if leadSilence < 0 {
leadSilence = 0
}
if leadSilence > 5*time.Second {
leadSilence = 5 * time.Second
}
ffmpegPath := strings.TrimSpace(opts.FfmpegPath)
if ffmpegPath == "" {
ffmpegPath = "ffmpeg"
}
maxTokens := opts.AudioLLMMaxTokens
if maxTokens <= 0 {
maxTokens = 4096
}
audioLLMPrompt := strings.TrimSpace(opts.AudioLLMPrompt)
if audioLLMPrompt == "" {
audioLLMPrompt = "Transcribe the audio exactly. Return only the transcript text."
}
providers := buildProviders(opts, audioLLMPrompt, maxTokens)
if len(providers) == 0 {
provider := buildVoxtralProvider(opts, audioLLMPrompt)
if provider.BaseURL == "" {
return nil
}
return &Client{
providers: providers,
http: &http.Client{Timeout: maxProviderTimeout(providers)},
ffmpegPath: ffmpegPath,
leadSilence: leadSilence,
provider: provider,
http: &http.Client{Timeout: provider.Timeout},
}
}
func buildProviders(opts Options, prompt string, maxTokens int) []ProviderConfig {
order := normalizeProviderOrder(opts.Providers)
out := make([]ProviderConfig, 0, len(order))
for _, name := range order {
switch name {
case ProviderWhisperX:
baseURL := strings.TrimRight(strings.TrimSpace(opts.WhisperXURL), "/")
if baseURL == "" {
continue
}
out = append(out, ProviderConfig{
Name: ProviderWhisperX,
Kind: ProviderWhisperX,
BaseURL: baseURL,
Model: ProviderWhisperX,
Timeout: defaultDuration(opts.WhisperXTimeout, 10*time.Minute),
})
case ProviderQwenAudio:
baseURL := strings.TrimRight(strings.TrimSpace(opts.QwenAudioBaseURL), "/")
if baseURL == "" {
continue
}
model := firstNonEmpty(opts.QwenAudioModel, "Qwen/Qwen2-Audio-7B-Instruct")
out = append(out, ProviderConfig{
Name: ProviderQwenAudio,
Kind: "audio_llm",
BaseURL: baseURL,
APIKey: strings.TrimSpace(opts.QwenAudioAPIKey),
Model: model,
Timeout: defaultDuration(opts.QwenAudioTimeout, 10*time.Minute),
MaxTokens: maxTokens,
Prompt: prompt,
})
case ProviderVoxtral:
func buildVoxtralProvider(opts Options, prompt string) ProviderConfig {
baseURL := strings.TrimRight(strings.TrimSpace(opts.VoxtralBaseURL), "/")
if baseURL == "" {
continue
return ProviderConfig{}
}
model := firstNonEmpty(opts.VoxtralModel, "mistralai/Voxtral-Small-24B-2507")
out = append(out, ProviderConfig{
return ProviderConfig{
Name: ProviderVoxtral,
Kind: "audio_transcription",
BaseURL: baseURL,
APIKey: strings.TrimSpace(opts.VoxtralAPIKey),
Model: model,
Timeout: defaultDuration(opts.VoxtralTimeout, 10*time.Minute),
MaxTokens: maxTokens,
Prompt: prompt,
})
}
}
return out
}
func normalizeProviderOrder(in []string) []string {
if len(in) == 0 {
return []string{ProviderWhisperX}
}
out := make([]string, 0, len(in))
seen := map[string]bool{}
for _, raw := range in {
name := strings.ToLower(strings.TrimSpace(raw))
switch name {
case "whisper", "whisperx":
name = ProviderWhisperX
case "qwen", "qwen-audio", "qwen2-audio", "qwen2-audio-7b-instruct":
name = ProviderQwenAudio
case "voxtral", "voxtral-small", "voxtral-small-24b-2507":
name = ProviderVoxtral
default:
continue
}
if !seen[name] {
out = append(out, name)
seen[name] = true
}
}
return out
}
func maxProviderTimeout(providers []ProviderConfig) time.Duration {
maxTimeout := 10 * time.Minute
for _, provider := range providers {
if provider.Timeout > maxTimeout {
maxTimeout = provider.Timeout
}
}
return maxTimeout
}
func defaultDuration(v, fallback time.Duration) time.Duration {
@@ -299,8 +143,8 @@ func defaultDuration(v, fallback time.Duration) time.Duration {
}
func (c *Client) Transcribe(ctx context.Context, in Input) (*Result, error) {
if c == nil || len(c.providers) == 0 {
return nil, fmt.Errorf("transcription providers not configured")
if c == nil || c.provider.BaseURL == "" {
return nil, fmt.Errorf("voxtral transcription provider not configured")
}
if strings.TrimSpace(in.AudioURL) == "" {
return nil, fmt.Errorf("audio_url is required")
@@ -309,31 +153,12 @@ func (c *Client) Transcribe(ctx context.Context, in Input) (*Result, error) {
if err != nil {
return nil, err
}
if c.leadSilence > 0 {
audio, filename, err = c.addLeadSilence(ctx, audio, filename)
result, attempt, err := c.transcribeWithProvider(ctx, c.provider, audio, filename, in)
if err != nil {
return nil, err
}
}
var attempts []Attempt
var winner *Result
var errors []string
for _, provider := range c.providers {
result, attempt, err := c.transcribeWithProvider(ctx, provider, audio, filename, in)
attempts = append(attempts, attempt)
if err != nil {
errors = append(errors, provider.Name+": "+err.Error())
continue
}
if winner == nil {
winner = result
}
}
if winner == nil {
return nil, fmt.Errorf("all transcription providers failed: %s", strings.Join(errors, "; "))
}
winner.Attempts = attempts
return winner, nil
result.Attempts = []Attempt{attempt}
return result, nil
}
func (c *Client) transcribeWithProvider(ctx context.Context, provider ProviderConfig, audio []byte, filename string, in Input) (*Result, Attempt, error) {
@@ -348,29 +173,7 @@ func (c *Client) transcribeWithProvider(ctx context.Context, provider ProviderCo
Model: provider.Model,
Status: "failed",
}
switch provider.Kind {
case ProviderWhisperX:
resp, duration, err := c.transcribeAudio(providerCtx, provider, audio, filename, in)
attempt.DurationMS = duration.Milliseconds()
if err != nil {
attempt.Error = err.Error()
return nil, attempt, err
}
segments := adjustLeadSilence(resp.Segments, c.leadSilence)
attempt.Status = "ok"
attempt.Segments = segments
attempt.Text = segmentsText(segments)
return &Result{
Provider: provider.Name,
Model: provider.Model,
Language: resp.Language,
Segments: segments,
DiarizeError: resp.DiarizeError,
AlignError: resp.AlignError,
DurationMS: duration.Milliseconds(),
}, attempt, nil
default:
resp, duration, err := c.transcribeAudioLLM(providerCtx, provider, audio, filename, in)
resp, duration, err := c.transcribeOpenAIAudio(providerCtx, provider, audio, filename, in)
attempt.DurationMS = duration.Milliseconds()
if err != nil {
attempt.Error = err.Error()
@@ -389,17 +192,6 @@ func (c *Client) transcribeWithProvider(ctx context.Context, provider ProviderCo
Segments: segments,
DurationMS: duration.Milliseconds(),
}, attempt, nil
}
}
func segmentsText(segments []Segment) string {
parts := make([]string, 0, len(segments))
for _, segment := range segments {
if text := strings.TrimSpace(segment.Text); text != "" {
parts = append(parts, text)
}
}
return strings.Join(parts, "\n")
}
func (c *Client) downloadAudio(ctx context.Context, in Input) ([]byte, string, error) {
@@ -430,83 +222,6 @@ func (c *Client) downloadAudio(ctx context.Context, in Input) ([]byte, string, e
return audio, filename, nil
}
func (c *Client) addLeadSilence(ctx context.Context, audio []byte, filename string) ([]byte, string, error) {
tmpDir, err := os.MkdirTemp("", "ai-transcribe-*")
if err != nil {
return nil, "", fmt.Errorf("prepare audio temp dir: %w", err)
}
defer os.RemoveAll(tmpDir)
inputPath := filepath.Join(tmpDir, "input"+safeExt(filename))
outputPath := filepath.Join(tmpDir, "padded.mp3")
if err := os.WriteFile(inputPath, audio, 0o600); err != nil {
return nil, "", fmt.Errorf("write audio temp file: %w", err)
}
delayMS := int(c.leadSilence.Milliseconds())
if delayMS <= 0 {
return audio, filename, nil
}
cmd := exec.CommandContext(ctx, c.ffmpegPath,
"-nostdin", "-y",
"-i", inputPath,
"-af", fmt.Sprintf("adelay=%d:all=1", delayMS),
"-codec:a", "libmp3lame",
"-qscale:a", "5",
outputPath,
)
out, err := cmd.CombinedOutput()
if err != nil {
return nil, "", fmt.Errorf("ffmpeg lead silence: %w (%s)", err, trimOutput(out))
}
padded, err := os.ReadFile(outputPath)
if err != nil {
return nil, "", fmt.Errorf("read padded audio: %w", err)
}
if len(padded) == 0 {
return nil, "", fmt.Errorf("padded audio is empty")
}
base := strings.TrimSuffix(filepath.Base(filename), filepath.Ext(filename))
if base == "" || base == "." || base == "/" {
base = "audio"
}
return padded, base + "-padded.mp3", nil
}
func safeExt(filename string) string {
ext := strings.ToLower(filepath.Ext(filename))
switch ext {
case ".mp3", ".wav", ".m4a", ".ogg", ".opus", ".webm":
return ext
default:
return ".audio"
}
}
func trimOutput(out []byte) string {
s := strings.TrimSpace(string(out))
if len(s) > 600 {
return s[:600]
}
return s
}
func adjustLeadSilence(segments []Segment, silence time.Duration) []Segment {
if len(segments) == 0 || silence <= 0 {
return segments
}
shift := silence.Seconds()
out := make([]Segment, 0, len(segments))
for _, segment := range segments {
segment.Start = clampTime(segment.Start - shift)
segment.End = clampTime(segment.End - shift)
if segment.End < segment.Start {
segment.End = segment.Start
}
out = append(out, segment)
}
return out
}
func clampTime(v float64) float64 {
if v < 0 {
return 0
@@ -514,133 +229,6 @@ func clampTime(v float64) float64 {
return v
}
func (c *Client) transcribeAudio(ctx context.Context, provider ProviderConfig, audio []byte, filename string, in Input) (*whisperResponse, time.Duration, error) {
body := &bytes.Buffer{}
mw := multipart.NewWriter(body)
fw, err := mw.CreateFormFile("file", filename)
if err != nil {
return nil, 0, fmt.Errorf("create form file: %w", err)
}
if _, err := fw.Write(audio); err != nil {
return nil, 0, fmt.Errorf("copy audio: %w", err)
}
if in.Language != "" {
_ = mw.WriteField("language", in.Language)
}
if in.Diarize {
_ = mw.WriteField("diarize", "true")
if in.MinSpeakers > 0 {
_ = mw.WriteField("min_speakers", fmt.Sprintf("%d", in.MinSpeakers))
}
if in.MaxSpeakers > 0 {
_ = mw.WriteField("max_speakers", fmt.Sprintf("%d", in.MaxSpeakers))
}
} else {
_ = mw.WriteField("diarize", "false")
}
if err := mw.Close(); err != nil {
return nil, 0, fmt.Errorf("close form: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, provider.BaseURL+"/transcribe", body)
if err != nil {
return nil, 0, fmt.Errorf("whisperx request: %w", err)
}
req.Header.Set("Content-Type", mw.FormDataContentType())
start := time.Now()
resp, err := c.http.Do(req)
duration := time.Since(start)
if err != nil {
return nil, duration, fmt.Errorf("whisperx do: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
return nil, duration, fmt.Errorf("whisperx HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
}
var out whisperResponse
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return nil, duration, fmt.Errorf("whisperx decode: %w", err)
}
return &out, duration, nil
}
func (c *Client) transcribeAudioLLM(ctx context.Context, provider ProviderConfig, audio []byte, filename string, in Input) (*audioLLMResponse, time.Duration, error) {
if provider.Kind == "audio_transcription" {
return c.transcribeOpenAIAudio(ctx, provider, audio, filename, in)
}
prompt := provider.Prompt
if in.Language != "" {
prompt += "\nЯзык аудио: " + in.Language + "."
}
if in.Diarize {
prompt += "\nЕсли слышны разные говорящие, разделяй реплики с короткими пометками Спикер 1/Спикер 2."
}
reqBody := audioLLMChatRequest{
Model: provider.Model,
MaxTokens: provider.MaxTokens,
Temperature: 0,
Messages: []audioLLMChatMessage{
{
Role: "user",
Content: []audioLLMContentPart{
{Type: "text", Text: prompt},
{
Type: "audio_url",
AudioURL: &audioLLMURLRef{URL: audioDataURL(audio, filename)},
},
},
},
},
}
body, err := json.Marshal(reqBody)
if err != nil {
return nil, 0, fmt.Errorf("audio llm marshal: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, provider.BaseURL+"/v1/chat/completions", bytes.NewReader(body))
if err != nil {
return nil, 0, fmt.Errorf("audio llm request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
if provider.APIKey != "" {
req.Header.Set("Authorization", "Bearer "+provider.APIKey)
}
start := time.Now()
resp, err := c.http.Do(req)
duration := time.Since(start)
if err != nil {
return nil, duration, fmt.Errorf("audio llm do: %w", err)
}
defer resp.Body.Close()
raw, err := io.ReadAll(io.LimitReader(resp.Body, 4<<20))
if err != nil {
return nil, duration, fmt.Errorf("audio llm read: %w", err)
}
if resp.StatusCode >= 300 {
return nil, duration, fmt.Errorf("audio llm HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(raw)))
}
var out audioLLMChatResponse
if err := json.Unmarshal(raw, &out); err != nil {
return nil, duration, fmt.Errorf("audio llm decode: %w", err)
}
if out.Error != nil {
return nil, duration, fmt.Errorf("audio llm error: %s", out.Error.Message)
}
if len(out.Choices) == 0 {
return nil, duration, fmt.Errorf("audio llm: empty choices")
}
modelName := out.Model
if modelName == "" {
modelName = provider.Model
}
return &audioLLMResponse{
Text: strings.TrimSpace(out.Choices[0].Message.Content),
Model: modelName,
}, duration, nil
}
func (c *Client) transcribeOpenAIAudio(ctx context.Context, provider ProviderConfig, audio []byte, filename string, in Input) (*audioLLMResponse, time.Duration, error) {
resp, duration, err := c.doOpenAIAudioTranscription(ctx, provider, audio, filename, in, "verbose_json")
if err == nil {
@@ -874,20 +462,6 @@ func mergeShortSegments(parts []string, minWords, maxWords int) []string {
return out
}
func audioFormat(filename string) string {
ext := strings.TrimPrefix(strings.ToLower(filepath.Ext(filename)), ".")
switch ext {
case "wav", "mp3", "flac", "m4a", "ogg", "opus", "webm":
return ext
default:
return "mp3"
}
}
func audioDataURL(audio []byte, filename string) string {
return "data:audio/" + audioFormat(filename) + ";base64," + base64.StdEncoding.EncodeToString(audio)
}
func firstNonEmpty(values ...string) string {
for _, value := range values {
if strings.TrimSpace(value) != "" {

View File

@@ -2,70 +2,23 @@ package transcription
import (
"encoding/json"
"math"
"net/http"
"net/http/httptest"
"testing"
"time"
)
func TestAdjustLeadSilence(t *testing.T) {
got := adjustLeadSilence([]Segment{
{Start: 0.2, End: 1.1, Text: "first"},
{Start: 1.4, End: 2.0, Text: "second"},
}, 800*time.Millisecond)
if got[0].Start != 0 {
t.Fatalf("first start = %v, want 0", got[0].Start)
}
if !near(got[0].End, 0.3) {
t.Fatalf("first end = %v, want 0.3", got[0].End)
}
if !near(got[1].Start, 0.6) {
t.Fatalf("second start = %v, want 0.6", got[1].Start)
}
}
func TestNormalizeProviderOrder(t *testing.T) {
got := normalizeProviderOrder([]string{"whisperx", "qwen", "voxtral", "qwen2-audio"})
want := []string{ProviderWhisperX, ProviderQwenAudio, ProviderVoxtral}
if len(got) != len(want) {
t.Fatalf("providers = %#v, want %#v", got, want)
}
for i := range want {
if got[i] != want[i] {
t.Fatalf("providers = %#v, want %#v", got, want)
}
}
}
func TestNewWithOptionsBuildsComparisonProviders(t *testing.T) {
func TestNewWithOptionsBuildsVoxtralProvider(t *testing.T) {
client := NewWithOptions(Options{
Providers: []string{"whisperx", "qwen2-audio", "voxtral-small"},
WhisperXURL: "http://whisperx",
QwenAudioBaseURL: "http://qwen",
VoxtralBaseURL: "http://voxtral",
})
if client == nil {
t.Fatal("client is nil")
}
got := make([]string, 0, len(client.providers))
for _, provider := range client.providers {
got = append(got, provider.Name)
if client.provider.Name != ProviderVoxtral {
t.Fatalf("provider = %q, want %q", client.provider.Name, ProviderVoxtral)
}
want := []string{ProviderWhisperX, ProviderQwenAudio, ProviderVoxtral}
for i := range want {
if got[i] != want[i] {
t.Fatalf("providers = %#v, want %#v", got, want)
}
}
}
func TestAudioDataURLUsesVLLMAudioURLFormat(t *testing.T) {
got := audioDataURL([]byte("abc"), "call.wav")
want := "data:audio/wav;base64,YWJj"
if got != want {
t.Fatalf("audio data url = %q, want %q", got, want)
if client.provider.Model != "mistralai/Voxtral-Small-24B-2507" {
t.Fatalf("model = %q", client.provider.Model)
}
}
@@ -97,7 +50,6 @@ func TestVoxtralUsesAudioTranscriptionsEndpoint(t *testing.T) {
defer providerSrv.Close()
client := NewWithOptions(Options{
Providers: []string{"voxtral-small"},
VoxtralBaseURL: providerSrv.URL,
VoxtralModel: "mistralai/Voxtral-Small-24B-2507",
})
@@ -158,7 +110,3 @@ func TestNormalizeAudioLLMSegmentsKeepsSegmentsAndAddsSpeakers(t *testing.T) {
t.Fatalf("speakers = %q/%q", got[0].Speaker, got[1].Speaker)
}
}
func near(got, want float64) bool {
return math.Abs(got-want) < 0.000001
}

View File

@@ -20,7 +20,7 @@ const (
TaskCallAnalysis = "call_analysis"
TaskTranscription = "transcription"
TranscriptionProfile = "whisperx"
TranscriptionProfile = "voxtral-small"
)
type Worker struct {
@@ -184,11 +184,11 @@ func classifyTranscriptionError(err error) string {
return "bad_audio"
case strings.Contains(s, "audio download") || strings.Contains(s, "audio http 5"):
return "storage_error"
case strings.Contains(s, "whisperx http 4") || strings.Contains(s, "ffmpeg") || strings.Contains(s, "invalid data") || strings.Contains(s, "could not decode"):
case strings.Contains(s, "audio transcription http 4") || strings.Contains(s, "invalid data") || strings.Contains(s, "could not decode"):
return "bad_audio"
case strings.Contains(s, "whisperx http 5") || strings.Contains(s, "whisperx do") || strings.Contains(s, "audio llm http 5") || strings.Contains(s, "audio llm do") || strings.Contains(s, "connection refused") || strings.Contains(s, "connection reset") || strings.Contains(s, "closed network connection"):
case strings.Contains(s, "audio transcription http 5") || strings.Contains(s, "audio transcription do") || strings.Contains(s, "connection refused") || strings.Contains(s, "connection reset") || strings.Contains(s, "closed network connection"):
return "provider_unavailable"
case strings.Contains(s, "audio llm http 4"):
case strings.Contains(s, "audio transcription http 4"):
return "bad_input"
case strings.Contains(s, "decode"):
return "bad_response"

View File

@@ -11,20 +11,11 @@ data:
LLM_BASE_URL: "http://10.2.3.5:8002"
LLM_MODEL: "qwen2.5-14b"
LLM_TIMEOUT: "5m"
TRANSCRIPTION_PROVIDERS: "voxtral-small"
WHISPERX_URL: ""
WHISPERX_TIMEOUT: "10m"
WHISPERX_LEAD_SILENCE: "800ms"
# Fill these after Qwen2-Audio and Voxtral are exposed as OpenAI-compatible
# chat-completions endpoints on the AI server.
QWEN_AUDIO_BASE_URL: ""
QWEN_AUDIO_MODEL: "Qwen/Qwen2-Audio-7B-Instruct"
QWEN_AUDIO_TIMEOUT: "10m"
# Voxtral Small is the only transcription provider. It is exposed on the AI
# server through an OpenAI-compatible /v1/audio/transcriptions endpoint.
VOXTRAL_BASE_URL: "http://10.2.3.5:8004"
VOXTRAL_MODEL: "mistralai/Voxtral-Small-24B-2507"
VOXTRAL_TIMEOUT: "10m"
AUDIO_LLM_MAX_TOKENS: "4096"
FFMPEG_PATH: "/usr/bin/ffmpeg"
AI_STATS_SIDECAR_URL: "http://10.2.3.5:9090"
AI_STATS_TIMEOUT: "8s"
WORKER_POLL_INTERVAL: "2s"

View File

@@ -18,6 +18,5 @@ type: Opaque
stringData:
DATABASE_URL: "postgres://ai_service:ai_service@postgres:5432/ai_service?sslmode=disable"
LLM_API_KEY: "sk-111f838ccec43406e078cd9094b6797307cb895236179f32"
QWEN_AUDIO_API_KEY: "sk-111f838ccec43406e078cd9094b6797307cb895236179f32"
VOXTRAL_API_KEY: "sk-111f838ccec43406e078cd9094b6797307cb895236179f32"
AI_SERVICE_TOKEN: "d18bcacf9e02bae1806ee6b6eeda62b95be6a915c0a22936d9a700128b275442"

View File

@@ -98,7 +98,7 @@ spec:
- name: WORKER_TASK_TYPES
value: "transcription"
- name: WORKER_MODEL_PROFILES
value: "whisperx"
value: "voxtral-small"
- name: WORKER_CLAIM_LIMIT
value: "1"
envFrom: