package main import ( "context" "encoding/json" "errors" "fmt" "log/slog" "net/http" "os" "os/signal" "strings" "syscall" "time" "ai-service/internal/config" "ai-service/internal/llm" "ai-service/internal/migrate" "ai-service/internal/store" "ai-service/internal/transcription" "ai-service/internal/worker" ) func main() { cfg := config.Load() slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil))) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() db, err := store.Open(ctx, cfg.DatabaseURL) if err != nil { slog.Error("db_open_failed", "error", err) os.Exit(1) } defer db.Close() if cfg.MigrateOnStart { if err := migrate.Up(ctx, db); err != nil { slog.Error("migrate_failed", "error", err) os.Exit(1) } } if cfg.LLMBaseURL == "" { slog.Error("llm_not_configured") os.Exit(1) } llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout) transcriber := transcription.NewWithOptions(transcription.Options{ Providers: cfg.TranscriptionProviders, WhisperXURL: cfg.WhisperXURL, WhisperXTimeout: cfg.WhisperXTimeout, FfmpegPath: cfg.FfmpegPath, LeadSilence: cfg.WhisperXLeadSilence, QwenAudioBaseURL: cfg.QwenAudioBaseURL, QwenAudioAPIKey: cfg.QwenAudioAPIKey, QwenAudioModel: cfg.QwenAudioModel, QwenAudioTimeout: cfg.QwenAudioTimeout, VoxtralBaseURL: cfg.VoxtralBaseURL, VoxtralAPIKey: cfg.VoxtralAPIKey, VoxtralModel: cfg.VoxtralModel, VoxtralTimeout: cfg.VoxtralTimeout, AudioLLMPrompt: cfg.AudioLLMPrompt, AudioLLMMaxTokens: cfg.AudioLLMMaxTokens, }) w := worker.New(db, llmClient, transcriber, cfg.WorkerID, cfg.LLMModel, cfg.WorkerTaskTypes, cfg.WorkerModelProfiles, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit) healthSrv := startHealthServer(ctx, db, cfg) slog.Info("ai_worker_started", "worker_id", cfg.WorkerID, "model", cfg.LLMModel, "transcription_enabled", transcriber != nil, "transcription_providers", cfg.TranscriptionProviders, "whisperx_lead_silence", cfg.WhisperXLeadSilence.String(), "task_types", cfg.WorkerTaskTypes, "model_profiles", cfg.WorkerModelProfiles, "poll_interval", cfg.WorkerPollInterval.String(), "lease_timeout", cfg.WorkerLeaseTimeout.String(), "claim_limit", cfg.WorkerClaimLimit, ) w.Run(ctx) shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _ = healthSrv.Shutdown(shutdownCtx) } type workerHealth struct { store *store.Store cfg config.Config } func startHealthServer(ctx context.Context, db *store.Store, cfg config.Config) *http.Server { srv := &http.Server{ Addr: fmt.Sprintf("%s:%d", cfg.WorkerHTTPHost, cfg.WorkerHTTPPort), Handler: workerHealth{store: db, cfg: cfg}, ReadHeaderTimeout: 5 * time.Second, } go func() { slog.Info("ai_worker_health_started", "addr", srv.Addr) if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { slog.Error("ai_worker_health_failed", "error", err) } }() go func() { <-ctx.Done() shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _ = srv.Shutdown(shutdownCtx) }() return srv } func (h workerHealth) ServeHTTP(w http.ResponseWriter, r *http.Request) { path := strings.TrimSuffix(r.URL.Path, "/") if path == "" { path = "/" } switch { case r.Method == http.MethodGet && path == "/healthz": writeWorkerJSON(w, http.StatusOK, map[string]any{ "status": "ok", "worker_id": h.cfg.WorkerID, }) case r.Method == http.MethodGet && path == "/readyz": ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second) defer cancel() if err := h.store.Ping(ctx); err != nil { writeWorkerJSON(w, http.StatusServiceUnavailable, map[string]any{ "status": "not_ready", "error": err.Error(), }) return } writeWorkerJSON(w, http.StatusOK, map[string]any{ "status": "ready", "worker_id": h.cfg.WorkerID, }) case r.Method == http.MethodGet && path == "/worker/status": writeWorkerJSON(w, http.StatusOK, map[string]any{ "status": "running", "worker_id": h.cfg.WorkerID, "task_types": h.cfg.WorkerTaskTypes, "model_profiles": h.cfg.WorkerModelProfiles, "transcription_providers": h.cfg.TranscriptionProviders, "claim_limit": h.cfg.WorkerClaimLimit, "poll_interval": h.cfg.WorkerPollInterval.String(), "lease_timeout": h.cfg.WorkerLeaseTimeout.String(), }) default: writeWorkerJSON(w, http.StatusNotFound, map[string]any{"error": "not found"}) } } func writeWorkerJSON(w http.ResponseWriter, code int, body any) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(code) if err := json.NewEncoder(w).Encode(body); err != nil { slog.Warn("worker_health_write_failed", "error", err) } }