diff --git a/cmd/worker/main.go b/cmd/worker/main.go index a8751ab..abff1de 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -43,12 +43,14 @@ func main() { llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout) transcriber := transcription.New(cfg.WhisperXURL, cfg.WhisperXTimeout) - w := worker.New(db, llmClient, transcriber, cfg.WorkerID, cfg.LLMModel, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit) + w := worker.New(db, llmClient, transcriber, cfg.WorkerID, cfg.LLMModel, cfg.WorkerTaskTypes, cfg.WorkerModelProfiles, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit) slog.Info("ai_worker_started", "worker_id", cfg.WorkerID, "model", cfg.LLMModel, "whisperx_enabled", transcriber != nil, + "task_types", cfg.WorkerTaskTypes, + "model_profiles", cfg.WorkerModelProfiles, "poll_interval", cfg.WorkerPollInterval.String(), "lease_timeout", cfg.WorkerLeaseTimeout.String(), "claim_limit", cfg.WorkerClaimLimit, diff --git a/internal/config/config.go b/internal/config/config.go index 5e34d30..bae8f7c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,6 +3,7 @@ package config import ( "os" "strconv" + "strings" "time" ) @@ -20,10 +21,12 @@ type Config struct { WhisperXURL string WhisperXTimeout time.Duration - WorkerID string - WorkerPollInterval time.Duration - WorkerClaimLimit int - WorkerLeaseTimeout time.Duration + WorkerID string + WorkerPollInterval time.Duration + WorkerClaimLimit int + WorkerLeaseTimeout time.Duration + WorkerTaskTypes []string + WorkerModelProfiles []string } func Load() Config { @@ -41,10 +44,12 @@ func Load() Config { WhisperXURL: envString("WHISPERX_URL", ""), WhisperXTimeout: envDuration("WHISPERX_TIMEOUT", 10*time.Minute), - WorkerID: envString("WORKER_ID", hostname()), - WorkerPollInterval: envDuration("WORKER_POLL_INTERVAL", 2*time.Second), - WorkerClaimLimit: envInt("WORKER_CLAIM_LIMIT", 4), - WorkerLeaseTimeout: envDuration("WORKER_LEASE_TIMEOUT", 15*time.Minute), + WorkerID: envString("WORKER_ID", hostname()), + WorkerPollInterval: envDuration("WORKER_POLL_INTERVAL", 2*time.Second), + WorkerClaimLimit: envInt("WORKER_CLAIM_LIMIT", 4), + WorkerLeaseTimeout: envDuration("WORKER_LEASE_TIMEOUT", 15*time.Minute), + WorkerTaskTypes: envCSV("WORKER_TASK_TYPES"), + WorkerModelProfiles: envCSV("WORKER_MODEL_PROFILES"), } } @@ -91,6 +96,21 @@ func envDuration(key string, fallback time.Duration) time.Duration { return v } +func envCSV(key string) []string { + raw := strings.TrimSpace(os.Getenv(key)) + if raw == "" { + return nil + } + parts := strings.Split(raw, ",") + out := make([]string, 0, len(parts)) + for _, part := range parts { + if v := strings.TrimSpace(part); v != "" { + out = append(out, v) + } + } + return out +} + func hostname() string { h, err := os.Hostname() if err != nil || h == "" { diff --git a/internal/worker/worker.go b/internal/worker/worker.go index f430df5..0d5d3aa 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -23,17 +23,19 @@ const ( ) type Worker struct { - store *store.Store - llm *llm.Client - transcriber *transcription.Client - workerID string - modelProfile string - pollInterval time.Duration - claimLimit int - leaseTimeout time.Duration + store *store.Store + llm *llm.Client + transcriber *transcription.Client + workerID string + modelProfile string + taskTypes []string + modelProfiles []string + pollInterval time.Duration + claimLimit int + leaseTimeout time.Duration } -func New(store *store.Store, llmClient *llm.Client, transcriber *transcription.Client, workerID, modelProfile string, pollInterval, leaseTimeout time.Duration, claimLimit int) *Worker { +func New(store *store.Store, llmClient *llm.Client, transcriber *transcription.Client, workerID, modelProfile string, taskTypes, modelProfiles []string, pollInterval, leaseTimeout time.Duration, claimLimit int) *Worker { if pollInterval <= 0 { pollInterval = 2 * time.Second } @@ -46,15 +48,20 @@ func New(store *store.Store, llmClient *llm.Client, transcriber *transcription.C if strings.TrimSpace(workerID) == "" { workerID = "ai-service-worker" } + if len(modelProfiles) == 0 { + modelProfiles = []string{modelProfile, TranscriptionProfile} + } return &Worker{ - store: store, - llm: llmClient, - transcriber: transcriber, - workerID: workerID, - modelProfile: modelProfile, - pollInterval: pollInterval, - claimLimit: claimLimit, - leaseTimeout: leaseTimeout, + store: store, + llm: llmClient, + transcriber: transcriber, + workerID: workerID, + modelProfile: modelProfile, + taskTypes: taskTypes, + modelProfiles: modelProfiles, + pollInterval: pollInterval, + claimLimit: claimLimit, + leaseTimeout: leaseTimeout, } } @@ -79,8 +86,8 @@ func (w *Worker) tick(ctx context.Context) { } jobs, err := w.store.ClaimJobs(ctx, model.ClaimJobs{ WorkerID: w.workerID, - TaskTypes: nil, - ModelProfiles: []string{w.modelProfile, TranscriptionProfile}, + TaskTypes: w.taskTypes, + ModelProfiles: w.modelProfiles, Limit: w.claimLimit, }) if err != nil { diff --git a/k8s/worker-deployment.yaml b/k8s/worker-deployment.yaml index 3bdad01..2eacee6 100644 --- a/k8s/worker-deployment.yaml +++ b/k8s/worker-deployment.yaml @@ -27,6 +27,58 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: WORKER_TASK_TYPES + value: "llm_chat,chat_completion,call_analysis,telegram_classification" + - name: WORKER_MODEL_PROFILES + value: "qwen2.5-14b" + envFrom: + - configMapRef: + name: ai-service-config + - secretRef: + name: ai-service-secrets + resources: + requests: + cpu: 50m + memory: 96Mi + limits: + cpu: 500m + memory: 384Mi +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: ai-service-transcription-worker + namespace: ai-service +spec: + replicas: 1 + selector: + matchLabels: + app: ai-service-transcription-worker + template: + metadata: + labels: + app: ai-service-transcription-worker + spec: + terminationGracePeriodSeconds: 20 + hostAliases: + - ip: "77.105.173.42" + hostnames: + - "s3-minio.estateliga.work" + containers: + - name: worker + image: localhost:30300/admin/ai-service:latest + command: ["/usr/local/bin/ai-service-worker"] + env: + - name: WORKER_ID + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: WORKER_TASK_TYPES + value: "transcription" + - name: WORKER_MODEL_PROFILES + value: "whisperx" + - name: WORKER_CLAIM_LIMIT + value: "1" envFrom: - configMapRef: name: ai-service-config