Split AI service worker task queues
All checks were successful
CI / test (push) Successful in 15s
Build and Deploy / build-and-deploy (push) Successful in 27s

This commit is contained in:
Grendgi
2026-06-08 15:57:38 +03:00
parent 33317cf20d
commit 039bcdb2b2
4 changed files with 109 additions and 28 deletions

View File

@@ -43,12 +43,14 @@ func main() {
llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout) llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout)
transcriber := transcription.New(cfg.WhisperXURL, cfg.WhisperXTimeout) transcriber := transcription.New(cfg.WhisperXURL, cfg.WhisperXTimeout)
w := worker.New(db, llmClient, transcriber, cfg.WorkerID, cfg.LLMModel, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit) w := worker.New(db, llmClient, transcriber, cfg.WorkerID, cfg.LLMModel, cfg.WorkerTaskTypes, cfg.WorkerModelProfiles, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit)
slog.Info("ai_worker_started", slog.Info("ai_worker_started",
"worker_id", cfg.WorkerID, "worker_id", cfg.WorkerID,
"model", cfg.LLMModel, "model", cfg.LLMModel,
"whisperx_enabled", transcriber != nil, "whisperx_enabled", transcriber != nil,
"task_types", cfg.WorkerTaskTypes,
"model_profiles", cfg.WorkerModelProfiles,
"poll_interval", cfg.WorkerPollInterval.String(), "poll_interval", cfg.WorkerPollInterval.String(),
"lease_timeout", cfg.WorkerLeaseTimeout.String(), "lease_timeout", cfg.WorkerLeaseTimeout.String(),
"claim_limit", cfg.WorkerClaimLimit, "claim_limit", cfg.WorkerClaimLimit,

View File

@@ -3,6 +3,7 @@ package config
import ( import (
"os" "os"
"strconv" "strconv"
"strings"
"time" "time"
) )
@@ -20,10 +21,12 @@ type Config struct {
WhisperXURL string WhisperXURL string
WhisperXTimeout time.Duration WhisperXTimeout time.Duration
WorkerID string WorkerID string
WorkerPollInterval time.Duration WorkerPollInterval time.Duration
WorkerClaimLimit int WorkerClaimLimit int
WorkerLeaseTimeout time.Duration WorkerLeaseTimeout time.Duration
WorkerTaskTypes []string
WorkerModelProfiles []string
} }
func Load() Config { func Load() Config {
@@ -41,10 +44,12 @@ func Load() Config {
WhisperXURL: envString("WHISPERX_URL", ""), WhisperXURL: envString("WHISPERX_URL", ""),
WhisperXTimeout: envDuration("WHISPERX_TIMEOUT", 10*time.Minute), WhisperXTimeout: envDuration("WHISPERX_TIMEOUT", 10*time.Minute),
WorkerID: envString("WORKER_ID", hostname()), WorkerID: envString("WORKER_ID", hostname()),
WorkerPollInterval: envDuration("WORKER_POLL_INTERVAL", 2*time.Second), WorkerPollInterval: envDuration("WORKER_POLL_INTERVAL", 2*time.Second),
WorkerClaimLimit: envInt("WORKER_CLAIM_LIMIT", 4), WorkerClaimLimit: envInt("WORKER_CLAIM_LIMIT", 4),
WorkerLeaseTimeout: envDuration("WORKER_LEASE_TIMEOUT", 15*time.Minute), WorkerLeaseTimeout: envDuration("WORKER_LEASE_TIMEOUT", 15*time.Minute),
WorkerTaskTypes: envCSV("WORKER_TASK_TYPES"),
WorkerModelProfiles: envCSV("WORKER_MODEL_PROFILES"),
} }
} }
@@ -91,6 +96,21 @@ func envDuration(key string, fallback time.Duration) time.Duration {
return v return v
} }
func envCSV(key string) []string {
raw := strings.TrimSpace(os.Getenv(key))
if raw == "" {
return nil
}
parts := strings.Split(raw, ",")
out := make([]string, 0, len(parts))
for _, part := range parts {
if v := strings.TrimSpace(part); v != "" {
out = append(out, v)
}
}
return out
}
func hostname() string { func hostname() string {
h, err := os.Hostname() h, err := os.Hostname()
if err != nil || h == "" { if err != nil || h == "" {

View File

@@ -23,17 +23,19 @@ const (
) )
type Worker struct { type Worker struct {
store *store.Store store *store.Store
llm *llm.Client llm *llm.Client
transcriber *transcription.Client transcriber *transcription.Client
workerID string workerID string
modelProfile string modelProfile string
pollInterval time.Duration taskTypes []string
claimLimit int modelProfiles []string
leaseTimeout time.Duration pollInterval time.Duration
claimLimit int
leaseTimeout time.Duration
} }
func New(store *store.Store, llmClient *llm.Client, transcriber *transcription.Client, workerID, modelProfile string, pollInterval, leaseTimeout time.Duration, claimLimit int) *Worker { func New(store *store.Store, llmClient *llm.Client, transcriber *transcription.Client, workerID, modelProfile string, taskTypes, modelProfiles []string, pollInterval, leaseTimeout time.Duration, claimLimit int) *Worker {
if pollInterval <= 0 { if pollInterval <= 0 {
pollInterval = 2 * time.Second pollInterval = 2 * time.Second
} }
@@ -46,15 +48,20 @@ func New(store *store.Store, llmClient *llm.Client, transcriber *transcription.C
if strings.TrimSpace(workerID) == "" { if strings.TrimSpace(workerID) == "" {
workerID = "ai-service-worker" workerID = "ai-service-worker"
} }
if len(modelProfiles) == 0 {
modelProfiles = []string{modelProfile, TranscriptionProfile}
}
return &Worker{ return &Worker{
store: store, store: store,
llm: llmClient, llm: llmClient,
transcriber: transcriber, transcriber: transcriber,
workerID: workerID, workerID: workerID,
modelProfile: modelProfile, modelProfile: modelProfile,
pollInterval: pollInterval, taskTypes: taskTypes,
claimLimit: claimLimit, modelProfiles: modelProfiles,
leaseTimeout: leaseTimeout, pollInterval: pollInterval,
claimLimit: claimLimit,
leaseTimeout: leaseTimeout,
} }
} }
@@ -79,8 +86,8 @@ func (w *Worker) tick(ctx context.Context) {
} }
jobs, err := w.store.ClaimJobs(ctx, model.ClaimJobs{ jobs, err := w.store.ClaimJobs(ctx, model.ClaimJobs{
WorkerID: w.workerID, WorkerID: w.workerID,
TaskTypes: nil, TaskTypes: w.taskTypes,
ModelProfiles: []string{w.modelProfile, TranscriptionProfile}, ModelProfiles: w.modelProfiles,
Limit: w.claimLimit, Limit: w.claimLimit,
}) })
if err != nil { if err != nil {

View File

@@ -27,6 +27,58 @@ spec:
valueFrom: valueFrom:
fieldRef: fieldRef:
fieldPath: metadata.name fieldPath: metadata.name
- name: WORKER_TASK_TYPES
value: "llm_chat,chat_completion,call_analysis,telegram_classification"
- name: WORKER_MODEL_PROFILES
value: "qwen2.5-14b"
envFrom:
- configMapRef:
name: ai-service-config
- secretRef:
name: ai-service-secrets
resources:
requests:
cpu: 50m
memory: 96Mi
limits:
cpu: 500m
memory: 384Mi
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: ai-service-transcription-worker
namespace: ai-service
spec:
replicas: 1
selector:
matchLabels:
app: ai-service-transcription-worker
template:
metadata:
labels:
app: ai-service-transcription-worker
spec:
terminationGracePeriodSeconds: 20
hostAliases:
- ip: "77.105.173.42"
hostnames:
- "s3-minio.estateliga.work"
containers:
- name: worker
image: localhost:30300/admin/ai-service:latest
command: ["/usr/local/bin/ai-service-worker"]
env:
- name: WORKER_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: WORKER_TASK_TYPES
value: "transcription"
- name: WORKER_MODEL_PROFILES
value: "whisperx"
- name: WORKER_CLAIM_LIMIT
value: "1"
envFrom: envFrom:
- configMapRef: - configMapRef:
name: ai-service-config name: ai-service-config