diff --git a/README.md b/README.md index 4fb179c..e0ea826 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,22 @@ or compact `system` / `user` fields. The completed job result contains domain metadata fields in `input`, but the worker only reads chat fields such as `system`, `user`, `messages`, `max_tokens` and `response_format`. +`transcription` jobs can run several transcription providers in order for +temporary A/B comparison. The main `segments` field remains compatible with +telephony and contains the first successful provider result. The full comparison +is stored in `attempts` with `provider`, `model`, `status`, `text`, `segments`, +`duration_ms` and `error`. + +Recommended comparison order: + +1. `whisperx` +2. `qwen2-audio` (`Qwen/Qwen2-Audio-7B-Instruct`) +3. `voxtral-small` (`mistralai/Voxtral-Small-24B-2507`) + +Qwen2-Audio and Voxtral are called through an OpenAI-compatible +`/v1/chat/completions` endpoint with `input_audio`; set their endpoint URLs only +after the models are actually exposed on the AI server. + ## API - `POST /api/v1/jobs` creates one job. @@ -83,7 +99,17 @@ for Kubernetes probes. - `LLM_API_KEY`, primary LLM API key - `LLM_MODEL`, default `qwen2.5-14b` - `LLM_TIMEOUT`, default `5m` +- `TRANSCRIPTION_PROVIDERS`, default `whisperx`, comma-separated ordered list: + `whisperx,qwen2-audio,voxtral-small` - `WHISPERX_URL`, WhisperX endpoint for transcription jobs +- `QWEN_AUDIO_BASE_URL`, OpenAI-compatible endpoint for Qwen2-Audio +- `QWEN_AUDIO_MODEL`, default `Qwen/Qwen2-Audio-7B-Instruct` +- `QWEN_AUDIO_API_KEY`, optional bearer token for Qwen2-Audio +- `VOXTRAL_BASE_URL`, OpenAI-compatible endpoint for Voxtral +- `VOXTRAL_MODEL`, default `mistralai/Voxtral-Small-24B-2507` +- `VOXTRAL_API_KEY`, optional bearer token for Voxtral +- `AUDIO_LLM_PROMPT`, transcription instruction for audio LLM providers +- `AUDIO_LLM_MAX_TOKENS`, default `4096` - `WORKER_ID`, default hostname - `WORKER_HTTP_HOST`, default `0.0.0.0` - `WORKER_HTTP_PORT`, default `8081` diff --git a/cmd/worker/main.go b/cmd/worker/main.go index ae15c08..4680ad2 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -48,14 +48,31 @@ func main() { } llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout) - transcriber := transcription.New(cfg.WhisperXURL, cfg.WhisperXTimeout, cfg.FfmpegPath, cfg.WhisperXLeadSilence) + transcriber := transcription.NewWithOptions(transcription.Options{ + Providers: cfg.TranscriptionProviders, + WhisperXURL: cfg.WhisperXURL, + WhisperXTimeout: cfg.WhisperXTimeout, + FfmpegPath: cfg.FfmpegPath, + LeadSilence: cfg.WhisperXLeadSilence, + QwenAudioBaseURL: cfg.QwenAudioBaseURL, + QwenAudioAPIKey: cfg.QwenAudioAPIKey, + QwenAudioModel: cfg.QwenAudioModel, + QwenAudioTimeout: cfg.QwenAudioTimeout, + VoxtralBaseURL: cfg.VoxtralBaseURL, + VoxtralAPIKey: cfg.VoxtralAPIKey, + VoxtralModel: cfg.VoxtralModel, + VoxtralTimeout: cfg.VoxtralTimeout, + AudioLLMPrompt: cfg.AudioLLMPrompt, + AudioLLMMaxTokens: cfg.AudioLLMMaxTokens, + }) w := worker.New(db, llmClient, transcriber, cfg.WorkerID, cfg.LLMModel, cfg.WorkerTaskTypes, cfg.WorkerModelProfiles, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit) healthSrv := startHealthServer(ctx, db, cfg) slog.Info("ai_worker_started", "worker_id", cfg.WorkerID, "model", cfg.LLMModel, - "whisperx_enabled", transcriber != nil, + "transcription_enabled", transcriber != nil, + "transcription_providers", cfg.TranscriptionProviders, "whisperx_lead_silence", cfg.WhisperXLeadSilence.String(), "task_types", cfg.WorkerTaskTypes, "model_profiles", cfg.WorkerModelProfiles, @@ -123,13 +140,14 @@ func (h workerHealth) ServeHTTP(w http.ResponseWriter, r *http.Request) { }) case r.Method == http.MethodGet && path == "/worker/status": writeWorkerJSON(w, http.StatusOK, map[string]any{ - "status": "running", - "worker_id": h.cfg.WorkerID, - "task_types": h.cfg.WorkerTaskTypes, - "model_profiles": h.cfg.WorkerModelProfiles, - "claim_limit": h.cfg.WorkerClaimLimit, - "poll_interval": h.cfg.WorkerPollInterval.String(), - "lease_timeout": h.cfg.WorkerLeaseTimeout.String(), + "status": "running", + "worker_id": h.cfg.WorkerID, + "task_types": h.cfg.WorkerTaskTypes, + "model_profiles": h.cfg.WorkerModelProfiles, + "transcription_providers": h.cfg.TranscriptionProviders, + "claim_limit": h.cfg.WorkerClaimLimit, + "poll_interval": h.cfg.WorkerPollInterval.String(), + "lease_timeout": h.cfg.WorkerLeaseTimeout.String(), }) default: writeWorkerJSON(w, http.StatusNotFound, map[string]any{"error": "not found"}) diff --git a/internal/config/config.go b/internal/config/config.go index da20a3b..082c076 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -14,16 +14,27 @@ type Config struct { MigrateOnStart bool APIAuthToken string - LLMBaseURL string - LLMAPIKey string - LLMModel string - LLMTimeout time.Duration - WhisperXURL string - WhisperXTimeout time.Duration - WhisperXLeadSilence time.Duration - FfmpegPath string - AIStatsSidecarURL string - AIStatsTimeout time.Duration + LLMBaseURL string + LLMAPIKey string + LLMModel string + LLMTimeout time.Duration + TranscriptionProviders []string + WhisperXURL string + WhisperXTimeout time.Duration + WhisperXLeadSilence time.Duration + QwenAudioBaseURL string + QwenAudioAPIKey string + QwenAudioModel string + QwenAudioTimeout time.Duration + VoxtralBaseURL string + VoxtralAPIKey string + VoxtralModel string + VoxtralTimeout time.Duration + AudioLLMMaxTokens int + AudioLLMPrompt string + FfmpegPath string + AIStatsSidecarURL string + AIStatsTimeout time.Duration WorkerID string WorkerHTTPHost string @@ -43,16 +54,27 @@ func Load() Config { MigrateOnStart: envBool("MIGRATE_ON_START", true), APIAuthToken: envString("AI_SERVICE_TOKEN", ""), - LLMBaseURL: envString("LLM_BASE_URL", ""), - LLMAPIKey: envString("LLM_API_KEY", ""), - LLMModel: envString("LLM_MODEL", "qwen2.5-14b"), - LLMTimeout: envDuration("LLM_TIMEOUT", 5*time.Minute), - WhisperXURL: envString("WHISPERX_URL", ""), - WhisperXTimeout: envDuration("WHISPERX_TIMEOUT", 10*time.Minute), - WhisperXLeadSilence: envDuration("WHISPERX_LEAD_SILENCE", 800*time.Millisecond), - FfmpegPath: envString("FFMPEG_PATH", "/usr/bin/ffmpeg"), - AIStatsSidecarURL: envString("AI_STATS_SIDECAR_URL", ""), - AIStatsTimeout: envDuration("AI_STATS_TIMEOUT", 8*time.Second), + LLMBaseURL: envString("LLM_BASE_URL", ""), + LLMAPIKey: envString("LLM_API_KEY", ""), + LLMModel: envString("LLM_MODEL", "qwen2.5-14b"), + LLMTimeout: envDuration("LLM_TIMEOUT", 5*time.Minute), + TranscriptionProviders: envCSVDefault("TRANSCRIPTION_PROVIDERS", []string{"whisperx"}), + WhisperXURL: envString("WHISPERX_URL", ""), + WhisperXTimeout: envDuration("WHISPERX_TIMEOUT", 10*time.Minute), + WhisperXLeadSilence: envDuration("WHISPERX_LEAD_SILENCE", 800*time.Millisecond), + QwenAudioBaseURL: envString("QWEN_AUDIO_BASE_URL", envString("AUDIO_LLM_BASE_URL", "")), + QwenAudioAPIKey: envString("QWEN_AUDIO_API_KEY", envString("AUDIO_LLM_API_KEY", "")), + QwenAudioModel: envString("QWEN_AUDIO_MODEL", "Qwen/Qwen2-Audio-7B-Instruct"), + QwenAudioTimeout: envDuration("QWEN_AUDIO_TIMEOUT", envDuration("AUDIO_LLM_TIMEOUT", 10*time.Minute)), + VoxtralBaseURL: envString("VOXTRAL_BASE_URL", envString("AUDIO_LLM_BASE_URL", "")), + VoxtralAPIKey: envString("VOXTRAL_API_KEY", envString("AUDIO_LLM_API_KEY", "")), + VoxtralModel: envString("VOXTRAL_MODEL", "mistralai/Voxtral-Small-24B-2507"), + VoxtralTimeout: envDuration("VOXTRAL_TIMEOUT", envDuration("AUDIO_LLM_TIMEOUT", 10*time.Minute)), + AudioLLMMaxTokens: envInt("AUDIO_LLM_MAX_TOKENS", 4096), + AudioLLMPrompt: envString("AUDIO_LLM_PROMPT", defaultAudioLLMPrompt()), + FfmpegPath: envString("FFMPEG_PATH", "/usr/bin/ffmpeg"), + AIStatsSidecarURL: envString("AI_STATS_SIDECAR_URL", ""), + AIStatsTimeout: envDuration("AI_STATS_TIMEOUT", 8*time.Second), WorkerID: envString("WORKER_ID", hostname()), WorkerHTTPHost: envString("WORKER_HTTP_HOST", "0.0.0.0"), @@ -123,6 +145,17 @@ func envCSV(key string) []string { return out } +func envCSVDefault(key string, fallback []string) []string { + if values := envCSV(key); len(values) > 0 { + return values + } + return fallback +} + +func defaultAudioLLMPrompt() string { + return "Расшифруй речь из аудио максимально точно. Сохрани русский язык, имена, телефоны, суммы и смысловые паузы. Не добавляй комментарии, анализ, Markdown или JSON. Верни только чистый текст расшифровки." +} + func hostname() string { h, err := os.Hostname() if err != nil || h == "" { diff --git a/internal/httpapi/dashboard.go b/internal/httpapi/dashboard.go index 79ee903..30a7914 100644 --- a/internal/httpapi/dashboard.go +++ b/internal/httpapi/dashboard.go @@ -53,6 +53,8 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) { Providers: []providerStatus{ s.checkLLM(ctx), s.checkWhisperX(ctx), + s.checkAudioLLM(ctx, "qwen2-audio", s.cfg.QwenAudioBaseURL, s.cfg.QwenAudioAPIKey, s.cfg.QwenAudioModel, s.cfg.QwenAudioTimeout), + s.checkAudioLLM(ctx, "voxtral-small", s.cfg.VoxtralBaseURL, s.cfg.VoxtralAPIKey, s.cfg.VoxtralModel, s.cfg.VoxtralTimeout), }, }, Infra: loadInfraSnapshot(r, s.cfg), diff --git a/internal/httpapi/providers.go b/internal/httpapi/providers.go index 8237658..1de940c 100644 --- a/internal/httpapi/providers.go +++ b/internal/httpapi/providers.go @@ -43,11 +43,59 @@ func (s *Server) handleProviderStatus(w http.ResponseWriter, r *http.Request) { Providers: []providerStatus{ s.checkLLM(ctx), s.checkWhisperX(ctx), + s.checkAudioLLM(ctx, "qwen2-audio", s.cfg.QwenAudioBaseURL, s.cfg.QwenAudioAPIKey, s.cfg.QwenAudioModel, s.cfg.QwenAudioTimeout), + s.checkAudioLLM(ctx, "voxtral-small", s.cfg.VoxtralBaseURL, s.cfg.VoxtralAPIKey, s.cfg.VoxtralModel, s.cfg.VoxtralTimeout), }, } writeJSON(w, http.StatusOK, resp) } +func (s *Server) checkAudioLLM(ctx context.Context, name, baseURL, apiKey, model string, timeout time.Duration) providerStatus { + baseURL = strings.TrimRight(strings.TrimSpace(baseURL), "/") + st := providerStatus{ + Name: name, + Configured: baseURL != "", + URL: baseURL, + Model: model, + } + if !st.Configured { + return st + } + if timeout <= 0 { + timeout = 10 * time.Minute + } + start := time.Now() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, st.URL+"/v1/models", nil) + if err != nil { + st.Error = err.Error() + return st + } + if apiKey != "" { + req.Header.Set("Authorization", "Bearer "+apiKey) + } + res, err := (&http.Client{Timeout: minDuration(timeout, 3*time.Second)}).Do(req) + st.LatencyMS = time.Since(start).Milliseconds() + if err != nil { + st.Error = err.Error() + return s.withStaleProviderOK(name, st) + } + defer res.Body.Close() + if res.StatusCode >= 300 { + st.Error = fmt.Sprintf("http %d: %s", res.StatusCode, readSmallBody(res.Body)) + return s.withStaleProviderOK(name, st) + } + st.OK = true + s.rememberProviderOK(name, st.LatencyMS) + return st +} + +func minDuration(a, b time.Duration) time.Duration { + if a < b { + return a + } + return b +} + func (s *Server) checkLLM(ctx context.Context) providerStatus { st := providerStatus{ Name: "llm", diff --git a/internal/transcription/client.go b/internal/transcription/client.go index de18059..3e3563e 100644 --- a/internal/transcription/client.go +++ b/internal/transcription/client.go @@ -3,6 +3,7 @@ package transcription import ( "bytes" "context" + "encoding/base64" "encoding/json" "fmt" "io" @@ -16,12 +17,47 @@ import ( ) type Client struct { - baseURL string + providers []ProviderConfig http *http.Client ffmpegPath string leadSilence time.Duration } +const ( + ProviderWhisperX = "whisperx" + ProviderQwenAudio = "qwen2-audio" + ProviderVoxtral = "voxtral-small" +) + +type Options struct { + Providers []string + WhisperXURL string + WhisperXTimeout time.Duration + FfmpegPath string + LeadSilence time.Duration + QwenAudioBaseURL string + QwenAudioAPIKey string + QwenAudioModel string + QwenAudioTimeout time.Duration + VoxtralBaseURL string + VoxtralAPIKey string + VoxtralModel string + VoxtralTimeout time.Duration + AudioLLMPrompt string + AudioLLMMaxTokens int +} + +type ProviderConfig struct { + Name string + Kind string + BaseURL string + APIKey string + Model string + Timeout time.Duration + MaxTokens int + Prompt string +} + type Input struct { AudioURL string `json:"audio_url"` Filename string `json:"filename,omitempty"` @@ -39,6 +75,9 @@ type Segment struct { } type Result struct { + Provider string `json:"provider,omitempty"` + Model string `json:"model,omitempty"` + Attempts []Attempt `json:"attempts,omitempty"` Language string `json:"language"` Segments []Segment `json:"segments"` DiarizeError *string `json:"diarize_error,omitempty"` @@ -46,6 +85,16 @@ type Result struct { DurationMS int64 `json:"duration_ms"` } +type Attempt struct { + Provider string `json:"provider"` + Model string `json:"model,omitempty"` + Status string `json:"status"` + Error string `json:"error,omitempty"` + Text string `json:"text,omitempty"` + Segments []Segment `json:"segments,omitempty"` + DurationMS int64 `json:"duration_ms,omitempty"` +} + type whisperResponse struct { Language string `json:"language"` Segments []Segment `json:"segments"` @@ -53,35 +102,188 @@ type whisperResponse struct { AlignError *string `json:"align_error,omitempty"` } +type audioLLMResponse struct { + Text string + Model string +} + +type audioLLMChatRequest struct { + Model string `json:"model"` + Messages []audioLLMChatMessage `json:"messages"` + MaxTokens int `json:"max_tokens,omitempty"` + Temperature float64 `json:"temperature"` +} + +type audioLLMChatMessage struct { + Role string `json:"role"` + Content []audioLLMContentPart `json:"content"` +} + +type audioLLMContentPart struct { + Type string `json:"type"` + Text string `json:"text,omitempty"` + InputAudio *audioLLMAudio `json:"input_audio,omitempty"` +} + +type audioLLMAudio struct { + Data string `json:"data"` + Format string `json:"format,omitempty"` +} + +type audioLLMChatResponse struct { + Model string `json:"model,omitempty"` + Choices []struct { + Message struct { + Content string `json:"content"` + } `json:"message"` + } `json:"choices"` + Error *struct { + Message string `json:"message"` + } `json:"error,omitempty"` +} + func New(baseURL string, timeout time.Duration, ffmpegPath string, leadSilence time.Duration) *Client { - baseURL = strings.TrimRight(strings.TrimSpace(baseURL), "/") - if baseURL == "" { - return nil - } - if timeout <= 0 { - timeout = 10 * time.Minute - } + return NewWithOptions(Options{ + Providers: []string{ProviderWhisperX}, + WhisperXURL: baseURL, + WhisperXTimeout: timeout, + FfmpegPath: ffmpegPath, + LeadSilence: leadSilence, + }) +} + +func NewWithOptions(opts Options) *Client { + leadSilence := opts.LeadSilence if leadSilence < 0 { leadSilence = 0 } if leadSilence > 5*time.Second { leadSilence = 5 * time.Second } - ffmpegPath = strings.TrimSpace(ffmpegPath) + ffmpegPath := strings.TrimSpace(opts.FfmpegPath) if ffmpegPath == "" { ffmpegPath = "ffmpeg" } + maxTokens := opts.AudioLLMMaxTokens + if maxTokens <= 0 { + maxTokens = 4096 + } + audioLLMPrompt := strings.TrimSpace(opts.AudioLLMPrompt) + if audioLLMPrompt == "" { + audioLLMPrompt = "Transcribe the audio exactly. Return only the transcript text." + } + providers := buildProviders(opts, audioLLMPrompt, maxTokens) + if len(providers) == 0 { + return nil + } return &Client{ - baseURL: baseURL, - http: &http.Client{Timeout: timeout}, + providers: providers, + http: &http.Client{Timeout: maxProviderTimeout(providers)}, ffmpegPath: ffmpegPath, leadSilence: leadSilence, } } +func buildProviders(opts Options, prompt string, maxTokens int) []ProviderConfig { + order := normalizeProviderOrder(opts.Providers) + out := make([]ProviderConfig, 0, len(order)) + for _, name := range order { + switch name { + case ProviderWhisperX: + baseURL := strings.TrimRight(strings.TrimSpace(opts.WhisperXURL), "/") + if baseURL == "" { + continue + } + out = append(out, ProviderConfig{ + Name: ProviderWhisperX, + Kind: ProviderWhisperX, + BaseURL: baseURL, + Model: ProviderWhisperX, + Timeout: defaultDuration(opts.WhisperXTimeout, 10*time.Minute), + }) + case ProviderQwenAudio: + baseURL := strings.TrimRight(strings.TrimSpace(opts.QwenAudioBaseURL), "/") + if baseURL == "" { + continue + } + model := firstNonEmpty(opts.QwenAudioModel, "Qwen/Qwen2-Audio-7B-Instruct") + out = append(out, ProviderConfig{ + Name: ProviderQwenAudio, + Kind: "audio_llm", + BaseURL: baseURL, + APIKey: strings.TrimSpace(opts.QwenAudioAPIKey), + Model: model, + Timeout: defaultDuration(opts.QwenAudioTimeout, 10*time.Minute), + MaxTokens: maxTokens, + Prompt: prompt, + }) + case ProviderVoxtral: + baseURL := strings.TrimRight(strings.TrimSpace(opts.VoxtralBaseURL), "/") + if baseURL == "" { + continue + } + model := firstNonEmpty(opts.VoxtralModel, "mistralai/Voxtral-Small-24B-2507") + out = append(out, ProviderConfig{ + Name: ProviderVoxtral, + Kind: "audio_llm", + BaseURL: baseURL, + APIKey: strings.TrimSpace(opts.VoxtralAPIKey), + Model: model, + Timeout: defaultDuration(opts.VoxtralTimeout, 10*time.Minute), + MaxTokens: maxTokens, + Prompt: prompt, + }) + } + } + return out +} + +func normalizeProviderOrder(in []string) []string { + if len(in) == 0 { + return []string{ProviderWhisperX} + } + out := make([]string, 0, len(in)) + seen := map[string]bool{} + for _, raw := range in { + name := strings.ToLower(strings.TrimSpace(raw)) + switch name { + case "whisper", "whisperx": + name = ProviderWhisperX + case "qwen", "qwen-audio", "qwen2-audio", "qwen2-audio-7b-instruct": + name = ProviderQwenAudio + case "voxtral", "voxtral-small", "voxtral-small-24b-2507": + name = ProviderVoxtral + default: + continue + } + if !seen[name] { + out = append(out, name) + seen[name] = true + } + } + return out +} + +func maxProviderTimeout(providers []ProviderConfig) time.Duration { + maxTimeout := 10 * time.Minute + for _, provider := range providers { + if provider.Timeout > maxTimeout { + maxTimeout = provider.Timeout + } + } + return maxTimeout +} + +func defaultDuration(v, fallback time.Duration) time.Duration { + if v <= 0 { + return fallback + } + return v +} + func (c *Client) Transcribe(ctx context.Context, in Input) (*Result, error) { - if c == nil || c.baseURL == "" { - return nil, fmt.Errorf("whisperx not configured") + if c == nil || len(c.providers) == 0 { + return nil, fmt.Errorf("transcription providers not configured") } if strings.TrimSpace(in.AudioURL) == "" { return nil, fmt.Errorf("audio_url is required") @@ -96,18 +298,91 @@ func (c *Client) Transcribe(ctx context.Context, in Input) (*Result, error) { return nil, err } } - resp, duration, err := c.transcribeAudio(ctx, audio, filename, in) - if err != nil { - return nil, err + var attempts []Attempt + var winner *Result + var errors []string + for _, provider := range c.providers { + result, attempt, err := c.transcribeWithProvider(ctx, provider, audio, filename, in) + attempts = append(attempts, attempt) + if err != nil { + errors = append(errors, provider.Name+": "+err.Error()) + continue + } + if winner == nil { + winner = result + } } - segments := adjustLeadSilence(resp.Segments, c.leadSilence) - return &Result{ - Language: resp.Language, - Segments: segments, - DiarizeError: resp.DiarizeError, - AlignError: resp.AlignError, - DurationMS: duration.Milliseconds(), - }, nil + if winner == nil { + return nil, fmt.Errorf("all transcription providers failed: %s", strings.Join(errors, "; ")) + } + winner.Attempts = attempts + return winner, nil +} + +func (c *Client) transcribeWithProvider(ctx context.Context, provider ProviderConfig, audio []byte, filename string, in Input) (*Result, Attempt, error) { + providerCtx := ctx + cancel := func() {} + if provider.Timeout > 0 { + providerCtx, cancel = context.WithTimeout(ctx, provider.Timeout) + } + defer cancel() + attempt := Attempt{ + Provider: provider.Name, + Model: provider.Model, + Status: "failed", + } + switch provider.Kind { + case ProviderWhisperX: + resp, duration, err := c.transcribeAudio(providerCtx, provider, audio, filename, in) + attempt.DurationMS = duration.Milliseconds() + if err != nil { + attempt.Error = err.Error() + return nil, attempt, err + } + segments := adjustLeadSilence(resp.Segments, c.leadSilence) + attempt.Status = "ok" + attempt.Segments = segments + attempt.Text = segmentsText(segments) + return &Result{ + Provider: provider.Name, + Model: provider.Model, + Language: resp.Language, + Segments: segments, + DiarizeError: resp.DiarizeError, + AlignError: resp.AlignError, + DurationMS: duration.Milliseconds(), + }, attempt, nil + default: + resp, duration, err := c.transcribeAudioLLM(providerCtx, provider, audio, filename, in) + attempt.DurationMS = duration.Milliseconds() + if err != nil { + attempt.Error = err.Error() + return nil, attempt, err + } + text := strings.TrimSpace(resp.Text) + segments := []Segment{{Start: 0, End: 0, Text: text}} + attempt.Status = "ok" + attempt.Model = resp.Model + attempt.Text = text + attempt.Segments = segments + return &Result{ + Provider: provider.Name, + Model: resp.Model, + Language: firstNonEmpty(in.Language, "unknown"), + Segments: segments, + DurationMS: duration.Milliseconds(), + }, attempt, nil + } +} + +func segmentsText(segments []Segment) string { + parts := make([]string, 0, len(segments)) + for _, segment := range segments { + if text := strings.TrimSpace(segment.Text); text != "" { + parts = append(parts, text) + } + } + return strings.Join(parts, "\n") } func (c *Client) downloadAudio(ctx context.Context, in Input) ([]byte, string, error) { @@ -222,7 +497,7 @@ func clampTime(v float64) float64 { return v } -func (c *Client) transcribeAudio(ctx context.Context, audio []byte, filename string, in Input) (*whisperResponse, time.Duration, error) { +func (c *Client) transcribeAudio(ctx context.Context, provider ProviderConfig, audio []byte, filename string, in Input) (*whisperResponse, time.Duration, error) { body := &bytes.Buffer{} mw := multipart.NewWriter(body) fw, err := mw.CreateFormFile("file", filename) @@ -250,7 +525,7 @@ func (c *Client) transcribeAudio(ctx context.Context, audio []byte, filename str return nil, 0, fmt.Errorf("close form: %w", err) } - req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/transcribe", body) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, provider.BaseURL+"/transcribe", body) if err != nil { return nil, 0, fmt.Errorf("whisperx request: %w", err) } @@ -273,3 +548,97 @@ func (c *Client) transcribeAudio(ctx context.Context, audio []byte, filename str } return &out, duration, nil } + +func (c *Client) transcribeAudioLLM(ctx context.Context, provider ProviderConfig, audio []byte, filename string, in Input) (*audioLLMResponse, time.Duration, error) { + prompt := provider.Prompt + if in.Language != "" { + prompt += "\nЯзык аудио: " + in.Language + "." + } + if in.Diarize { + prompt += "\nЕсли слышны разные говорящие, разделяй реплики с короткими пометками Спикер 1/Спикер 2." + } + reqBody := audioLLMChatRequest{ + Model: provider.Model, + MaxTokens: provider.MaxTokens, + Temperature: 0, + Messages: []audioLLMChatMessage{ + { + Role: "user", + Content: []audioLLMContentPart{ + {Type: "text", Text: prompt}, + { + Type: "input_audio", + InputAudio: &audioLLMAudio{ + Data: base64.StdEncoding.EncodeToString(audio), + Format: audioFormat(filename), + }, + }, + }, + }, + }, + } + body, err := json.Marshal(reqBody) + if err != nil { + return nil, 0, fmt.Errorf("audio llm marshal: %w", err) + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, provider.BaseURL+"/v1/chat/completions", bytes.NewReader(body)) + if err != nil { + return nil, 0, fmt.Errorf("audio llm request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + if provider.APIKey != "" { + req.Header.Set("Authorization", "Bearer "+provider.APIKey) + } + + start := time.Now() + resp, err := c.http.Do(req) + duration := time.Since(start) + if err != nil { + return nil, duration, fmt.Errorf("audio llm do: %w", err) + } + defer resp.Body.Close() + raw, err := io.ReadAll(io.LimitReader(resp.Body, 4<<20)) + if err != nil { + return nil, duration, fmt.Errorf("audio llm read: %w", err) + } + if resp.StatusCode >= 300 { + return nil, duration, fmt.Errorf("audio llm HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(raw))) + } + var out audioLLMChatResponse + if err := json.Unmarshal(raw, &out); err != nil { + return nil, duration, fmt.Errorf("audio llm decode: %w", err) + } + if out.Error != nil { + return nil, duration, fmt.Errorf("audio llm error: %s", out.Error.Message) + } + if len(out.Choices) == 0 { + return nil, duration, fmt.Errorf("audio llm: empty choices") + } + modelName := out.Model + if modelName == "" { + modelName = provider.Model + } + return &audioLLMResponse{ + Text: strings.TrimSpace(out.Choices[0].Message.Content), + Model: modelName, + }, duration, nil +} + +func audioFormat(filename string) string { + ext := strings.TrimPrefix(strings.ToLower(filepath.Ext(filename)), ".") + switch ext { + case "wav", "mp3", "flac", "m4a", "ogg", "opus", "webm": + return ext + default: + return "mp3" + } +} + +func firstNonEmpty(values ...string) string { + for _, value := range values { + if strings.TrimSpace(value) != "" { + return value + } + } + return "" +} diff --git a/internal/transcription/client_test.go b/internal/transcription/client_test.go index fb31b72..6f44e94 100644 --- a/internal/transcription/client_test.go +++ b/internal/transcription/client_test.go @@ -23,6 +23,41 @@ func TestAdjustLeadSilence(t *testing.T) { } } +func TestNormalizeProviderOrder(t *testing.T) { + got := normalizeProviderOrder([]string{"whisperx", "qwen", "voxtral", "qwen2-audio"}) + want := []string{ProviderWhisperX, ProviderQwenAudio, ProviderVoxtral} + if len(got) != len(want) { + t.Fatalf("providers = %#v, want %#v", got, want) + } + for i := range want { + if got[i] != want[i] { + t.Fatalf("providers = %#v, want %#v", got, want) + } + } +} + +func TestNewWithOptionsBuildsComparisonProviders(t *testing.T) { + client := NewWithOptions(Options{ + Providers: []string{"whisperx", "qwen2-audio", "voxtral-small"}, + WhisperXURL: "http://whisperx", + QwenAudioBaseURL: "http://qwen", + VoxtralBaseURL: "http://voxtral", + }) + if client == nil { + t.Fatal("client is nil") + } + got := make([]string, 0, len(client.providers)) + for _, provider := range client.providers { + got = append(got, provider.Name) + } + want := []string{ProviderWhisperX, ProviderQwenAudio, ProviderVoxtral} + for i := range want { + if got[i] != want[i] { + t.Fatalf("providers = %#v, want %#v", got, want) + } + } +} + func near(got, want float64) bool { return math.Abs(got-want) < 0.000001 } diff --git a/internal/worker/worker.go b/internal/worker/worker.go index ec5396e..228e1a0 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -139,7 +139,7 @@ func (w *Worker) process(ctx context.Context, job *model.Job) { func (w *Worker) processTranscription(ctx context.Context, job *model.Job) { if w.transcriber == nil { - w.fail(ctx, job, "provider_unavailable", "whisperx not configured") + w.fail(ctx, job, "provider_unavailable", "transcription providers not configured") return } var input transcription.Input @@ -186,8 +186,10 @@ func classifyTranscriptionError(err error) string { return "storage_error" case strings.Contains(s, "whisperx http 4") || strings.Contains(s, "ffmpeg") || strings.Contains(s, "invalid data") || strings.Contains(s, "could not decode"): return "bad_audio" - case strings.Contains(s, "whisperx http 5") || strings.Contains(s, "whisperx do") || strings.Contains(s, "connection refused") || strings.Contains(s, "connection reset") || strings.Contains(s, "closed network connection"): + case strings.Contains(s, "whisperx http 5") || strings.Contains(s, "whisperx do") || strings.Contains(s, "audio llm http 5") || strings.Contains(s, "audio llm do") || strings.Contains(s, "connection refused") || strings.Contains(s, "connection reset") || strings.Contains(s, "closed network connection"): return "provider_unavailable" + case strings.Contains(s, "audio llm http 4"): + return "bad_input" case strings.Contains(s, "decode"): return "bad_response" default: diff --git a/k8s/configmap.yaml b/k8s/configmap.yaml index 6066f40..fb0e753 100644 --- a/k8s/configmap.yaml +++ b/k8s/configmap.yaml @@ -11,9 +11,19 @@ data: LLM_BASE_URL: "http://10.2.3.5:8002" LLM_MODEL: "qwen2.5-14b" LLM_TIMEOUT: "5m" + TRANSCRIPTION_PROVIDERS: "whisperx,qwen2-audio,voxtral-small" WHISPERX_URL: "http://10.2.3.5:8001" WHISPERX_TIMEOUT: "10m" WHISPERX_LEAD_SILENCE: "800ms" + # Fill these after Qwen2-Audio and Voxtral are exposed as OpenAI-compatible + # chat-completions endpoints on the AI server. + QWEN_AUDIO_BASE_URL: "" + QWEN_AUDIO_MODEL: "Qwen/Qwen2-Audio-7B-Instruct" + QWEN_AUDIO_TIMEOUT: "10m" + VOXTRAL_BASE_URL: "" + VOXTRAL_MODEL: "mistralai/Voxtral-Small-24B-2507" + VOXTRAL_TIMEOUT: "10m" + AUDIO_LLM_MAX_TOKENS: "4096" FFMPEG_PATH: "/usr/bin/ffmpeg" AI_STATS_SIDECAR_URL: "http://10.2.3.5:9090" AI_STATS_TIMEOUT: "8s"