From 01ee090fa52e70aaf9d3aa4126cca3eedd26d981 Mon Sep 17 00:00:00 2001 From: Grendgi Date: Tue, 9 Jun 2026 11:38:03 +0300 Subject: [PATCH] Add worker health endpoints --- README.md | 4 ++ cmd/worker/main.go | 85 ++++++++++++++++++++++++++++++++++++++ internal/config/config.go | 4 ++ k8s/configmap.yaml | 2 + k8s/worker-deployment.yaml | 36 ++++++++++++++++ 5 files changed, 131 insertions(+) diff --git a/README.md b/README.md index a384181..4fb179c 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,8 @@ domain metadata fields in `input`, but the worker only reads chat fields such as (GPU, containers, vLLM and WhisperX live metrics) when configured. - `GET /healthz` returns process health. - `GET /readyz` checks PostgreSQL readiness. +- Built-in workers expose open Kubernetes endpoints on `WORKER_HTTP_PORT`: + `GET /healthz`, `GET /readyz` and `GET /worker/status`. All `/api/v1/*` endpoints require `Authorization: Bearer ` when `AI_SERVICE_TOKEN` is configured. Health and readiness endpoints stay open @@ -83,6 +85,8 @@ for Kubernetes probes. - `LLM_TIMEOUT`, default `5m` - `WHISPERX_URL`, WhisperX endpoint for transcription jobs - `WORKER_ID`, default hostname +- `WORKER_HTTP_HOST`, default `0.0.0.0` +- `WORKER_HTTP_PORT`, default `8081` - `WORKER_POLL_INTERVAL`, default `2s` - `WORKER_CLAIM_LIMIT`, default `4` - `WORKER_LEASE_TIMEOUT`, default `15m` diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 3b4da2b..ae15c08 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -2,10 +2,16 @@ package main import ( "context" + "encoding/json" + "errors" + "fmt" "log/slog" + "net/http" "os" "os/signal" + "strings" "syscall" + "time" "ai-service/internal/config" "ai-service/internal/llm" @@ -44,6 +50,7 @@ func main() { llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout) transcriber := transcription.New(cfg.WhisperXURL, cfg.WhisperXTimeout, cfg.FfmpegPath, cfg.WhisperXLeadSilence) w := worker.New(db, llmClient, transcriber, cfg.WorkerID, cfg.LLMModel, cfg.WorkerTaskTypes, cfg.WorkerModelProfiles, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit) + healthSrv := startHealthServer(ctx, db, cfg) slog.Info("ai_worker_started", "worker_id", cfg.WorkerID, @@ -57,4 +64,82 @@ func main() { "claim_limit", cfg.WorkerClaimLimit, ) w.Run(ctx) + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = healthSrv.Shutdown(shutdownCtx) +} + +type workerHealth struct { + store *store.Store + cfg config.Config +} + +func startHealthServer(ctx context.Context, db *store.Store, cfg config.Config) *http.Server { + srv := &http.Server{ + Addr: fmt.Sprintf("%s:%d", cfg.WorkerHTTPHost, cfg.WorkerHTTPPort), + Handler: workerHealth{store: db, cfg: cfg}, + ReadHeaderTimeout: 5 * time.Second, + } + go func() { + slog.Info("ai_worker_health_started", "addr", srv.Addr) + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + slog.Error("ai_worker_health_failed", "error", err) + } + }() + go func() { + <-ctx.Done() + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = srv.Shutdown(shutdownCtx) + }() + return srv +} + +func (h workerHealth) ServeHTTP(w http.ResponseWriter, r *http.Request) { + path := strings.TrimSuffix(r.URL.Path, "/") + if path == "" { + path = "/" + } + switch { + case r.Method == http.MethodGet && path == "/healthz": + writeWorkerJSON(w, http.StatusOK, map[string]any{ + "status": "ok", + "worker_id": h.cfg.WorkerID, + }) + case r.Method == http.MethodGet && path == "/readyz": + ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second) + defer cancel() + if err := h.store.Ping(ctx); err != nil { + writeWorkerJSON(w, http.StatusServiceUnavailable, map[string]any{ + "status": "not_ready", + "error": err.Error(), + }) + return + } + writeWorkerJSON(w, http.StatusOK, map[string]any{ + "status": "ready", + "worker_id": h.cfg.WorkerID, + }) + case r.Method == http.MethodGet && path == "/worker/status": + writeWorkerJSON(w, http.StatusOK, map[string]any{ + "status": "running", + "worker_id": h.cfg.WorkerID, + "task_types": h.cfg.WorkerTaskTypes, + "model_profiles": h.cfg.WorkerModelProfiles, + "claim_limit": h.cfg.WorkerClaimLimit, + "poll_interval": h.cfg.WorkerPollInterval.String(), + "lease_timeout": h.cfg.WorkerLeaseTimeout.String(), + }) + default: + writeWorkerJSON(w, http.StatusNotFound, map[string]any{"error": "not found"}) + } +} + +func writeWorkerJSON(w http.ResponseWriter, code int, body any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + if err := json.NewEncoder(w).Encode(body); err != nil { + slog.Warn("worker_health_write_failed", "error", err) + } } diff --git a/internal/config/config.go b/internal/config/config.go index e337231..da20a3b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -26,6 +26,8 @@ type Config struct { AIStatsTimeout time.Duration WorkerID string + WorkerHTTPHost string + WorkerHTTPPort int WorkerPollInterval time.Duration WorkerClaimLimit int WorkerLeaseTimeout time.Duration @@ -53,6 +55,8 @@ func Load() Config { AIStatsTimeout: envDuration("AI_STATS_TIMEOUT", 8*time.Second), WorkerID: envString("WORKER_ID", hostname()), + WorkerHTTPHost: envString("WORKER_HTTP_HOST", "0.0.0.0"), + WorkerHTTPPort: envInt("WORKER_HTTP_PORT", 8081), WorkerPollInterval: envDuration("WORKER_POLL_INTERVAL", 2*time.Second), WorkerClaimLimit: envInt("WORKER_CLAIM_LIMIT", 4), WorkerLeaseTimeout: envDuration("WORKER_LEASE_TIMEOUT", 15*time.Minute), diff --git a/k8s/configmap.yaml b/k8s/configmap.yaml index 31494f1..6066f40 100644 --- a/k8s/configmap.yaml +++ b/k8s/configmap.yaml @@ -18,5 +18,7 @@ data: AI_STATS_SIDECAR_URL: "http://10.2.3.5:9090" AI_STATS_TIMEOUT: "8s" WORKER_POLL_INTERVAL: "2s" + WORKER_HTTP_HOST: "0.0.0.0" + WORKER_HTTP_PORT: "8081" WORKER_CLAIM_LIMIT: "4" WORKER_LEASE_TIMEOUT: "15m" diff --git a/k8s/worker-deployment.yaml b/k8s/worker-deployment.yaml index 92a97e6..ccc84f8 100644 --- a/k8s/worker-deployment.yaml +++ b/k8s/worker-deployment.yaml @@ -22,6 +22,8 @@ spec: - name: worker image: localhost:30300/admin/ai-service:latest command: ["/usr/local/bin/ai-service-worker"] + ports: + - containerPort: 8081 env: - name: WORKER_ID valueFrom: @@ -38,6 +40,22 @@ spec: name: ai-service-config - secretRef: name: ai-service-secrets + startupProbe: + httpGet: + path: /readyz + port: 8081 + periodSeconds: 5 + failureThreshold: 30 + readinessProbe: + httpGet: + path: /readyz + port: 8081 + periodSeconds: 10 + livenessProbe: + httpGet: + path: /healthz + port: 8081 + periodSeconds: 10 resources: requests: cpu: 50m @@ -70,6 +88,8 @@ spec: - name: worker image: localhost:30300/admin/ai-service:latest command: ["/usr/local/bin/ai-service-worker"] + ports: + - containerPort: 8081 env: - name: WORKER_ID valueFrom: @@ -86,6 +106,22 @@ spec: name: ai-service-config - secretRef: name: ai-service-secrets + startupProbe: + httpGet: + path: /readyz + port: 8081 + periodSeconds: 5 + failureThreshold: 30 + readinessProbe: + httpGet: + path: /readyz + port: 8081 + periodSeconds: 10 + livenessProbe: + httpGet: + path: /healthz + port: 8081 + periodSeconds: 10 resources: requests: cpu: 50m