Add worker health endpoints
This commit is contained in:
@@ -65,6 +65,8 @@ domain metadata fields in `input`, but the worker only reads chat fields such as
|
||||
(GPU, containers, vLLM and WhisperX live metrics) when configured.
|
||||
- `GET /healthz` returns process health.
|
||||
- `GET /readyz` checks PostgreSQL readiness.
|
||||
- Built-in workers expose open Kubernetes endpoints on `WORKER_HTTP_PORT`:
|
||||
`GET /healthz`, `GET /readyz` and `GET /worker/status`.
|
||||
|
||||
All `/api/v1/*` endpoints require `Authorization: Bearer <AI_SERVICE_TOKEN>`
|
||||
when `AI_SERVICE_TOKEN` is configured. Health and readiness endpoints stay open
|
||||
@@ -83,6 +85,8 @@ for Kubernetes probes.
|
||||
- `LLM_TIMEOUT`, default `5m`
|
||||
- `WHISPERX_URL`, WhisperX endpoint for transcription jobs
|
||||
- `WORKER_ID`, default hostname
|
||||
- `WORKER_HTTP_HOST`, default `0.0.0.0`
|
||||
- `WORKER_HTTP_PORT`, default `8081`
|
||||
- `WORKER_POLL_INTERVAL`, default `2s`
|
||||
- `WORKER_CLAIM_LIMIT`, default `4`
|
||||
- `WORKER_LEASE_TIMEOUT`, default `15m`
|
||||
|
||||
@@ -2,10 +2,16 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"ai-service/internal/config"
|
||||
"ai-service/internal/llm"
|
||||
@@ -44,6 +50,7 @@ func main() {
|
||||
llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout)
|
||||
transcriber := transcription.New(cfg.WhisperXURL, cfg.WhisperXTimeout, cfg.FfmpegPath, cfg.WhisperXLeadSilence)
|
||||
w := worker.New(db, llmClient, transcriber, cfg.WorkerID, cfg.LLMModel, cfg.WorkerTaskTypes, cfg.WorkerModelProfiles, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit)
|
||||
healthSrv := startHealthServer(ctx, db, cfg)
|
||||
|
||||
slog.Info("ai_worker_started",
|
||||
"worker_id", cfg.WorkerID,
|
||||
@@ -57,4 +64,82 @@ func main() {
|
||||
"claim_limit", cfg.WorkerClaimLimit,
|
||||
)
|
||||
w.Run(ctx)
|
||||
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
_ = healthSrv.Shutdown(shutdownCtx)
|
||||
}
|
||||
|
||||
type workerHealth struct {
|
||||
store *store.Store
|
||||
cfg config.Config
|
||||
}
|
||||
|
||||
func startHealthServer(ctx context.Context, db *store.Store, cfg config.Config) *http.Server {
|
||||
srv := &http.Server{
|
||||
Addr: fmt.Sprintf("%s:%d", cfg.WorkerHTTPHost, cfg.WorkerHTTPPort),
|
||||
Handler: workerHealth{store: db, cfg: cfg},
|
||||
ReadHeaderTimeout: 5 * time.Second,
|
||||
}
|
||||
go func() {
|
||||
slog.Info("ai_worker_health_started", "addr", srv.Addr)
|
||||
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
slog.Error("ai_worker_health_failed", "error", err)
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
_ = srv.Shutdown(shutdownCtx)
|
||||
}()
|
||||
return srv
|
||||
}
|
||||
|
||||
func (h workerHealth) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
path := strings.TrimSuffix(r.URL.Path, "/")
|
||||
if path == "" {
|
||||
path = "/"
|
||||
}
|
||||
switch {
|
||||
case r.Method == http.MethodGet && path == "/healthz":
|
||||
writeWorkerJSON(w, http.StatusOK, map[string]any{
|
||||
"status": "ok",
|
||||
"worker_id": h.cfg.WorkerID,
|
||||
})
|
||||
case r.Method == http.MethodGet && path == "/readyz":
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
|
||||
defer cancel()
|
||||
if err := h.store.Ping(ctx); err != nil {
|
||||
writeWorkerJSON(w, http.StatusServiceUnavailable, map[string]any{
|
||||
"status": "not_ready",
|
||||
"error": err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
writeWorkerJSON(w, http.StatusOK, map[string]any{
|
||||
"status": "ready",
|
||||
"worker_id": h.cfg.WorkerID,
|
||||
})
|
||||
case r.Method == http.MethodGet && path == "/worker/status":
|
||||
writeWorkerJSON(w, http.StatusOK, map[string]any{
|
||||
"status": "running",
|
||||
"worker_id": h.cfg.WorkerID,
|
||||
"task_types": h.cfg.WorkerTaskTypes,
|
||||
"model_profiles": h.cfg.WorkerModelProfiles,
|
||||
"claim_limit": h.cfg.WorkerClaimLimit,
|
||||
"poll_interval": h.cfg.WorkerPollInterval.String(),
|
||||
"lease_timeout": h.cfg.WorkerLeaseTimeout.String(),
|
||||
})
|
||||
default:
|
||||
writeWorkerJSON(w, http.StatusNotFound, map[string]any{"error": "not found"})
|
||||
}
|
||||
}
|
||||
|
||||
func writeWorkerJSON(w http.ResponseWriter, code int, body any) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(code)
|
||||
if err := json.NewEncoder(w).Encode(body); err != nil {
|
||||
slog.Warn("worker_health_write_failed", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,6 +26,8 @@ type Config struct {
|
||||
AIStatsTimeout time.Duration
|
||||
|
||||
WorkerID string
|
||||
WorkerHTTPHost string
|
||||
WorkerHTTPPort int
|
||||
WorkerPollInterval time.Duration
|
||||
WorkerClaimLimit int
|
||||
WorkerLeaseTimeout time.Duration
|
||||
@@ -53,6 +55,8 @@ func Load() Config {
|
||||
AIStatsTimeout: envDuration("AI_STATS_TIMEOUT", 8*time.Second),
|
||||
|
||||
WorkerID: envString("WORKER_ID", hostname()),
|
||||
WorkerHTTPHost: envString("WORKER_HTTP_HOST", "0.0.0.0"),
|
||||
WorkerHTTPPort: envInt("WORKER_HTTP_PORT", 8081),
|
||||
WorkerPollInterval: envDuration("WORKER_POLL_INTERVAL", 2*time.Second),
|
||||
WorkerClaimLimit: envInt("WORKER_CLAIM_LIMIT", 4),
|
||||
WorkerLeaseTimeout: envDuration("WORKER_LEASE_TIMEOUT", 15*time.Minute),
|
||||
|
||||
@@ -18,5 +18,7 @@ data:
|
||||
AI_STATS_SIDECAR_URL: "http://10.2.3.5:9090"
|
||||
AI_STATS_TIMEOUT: "8s"
|
||||
WORKER_POLL_INTERVAL: "2s"
|
||||
WORKER_HTTP_HOST: "0.0.0.0"
|
||||
WORKER_HTTP_PORT: "8081"
|
||||
WORKER_CLAIM_LIMIT: "4"
|
||||
WORKER_LEASE_TIMEOUT: "15m"
|
||||
|
||||
@@ -22,6 +22,8 @@ spec:
|
||||
- name: worker
|
||||
image: localhost:30300/admin/ai-service:latest
|
||||
command: ["/usr/local/bin/ai-service-worker"]
|
||||
ports:
|
||||
- containerPort: 8081
|
||||
env:
|
||||
- name: WORKER_ID
|
||||
valueFrom:
|
||||
@@ -38,6 +40,22 @@ spec:
|
||||
name: ai-service-config
|
||||
- secretRef:
|
||||
name: ai-service-secrets
|
||||
startupProbe:
|
||||
httpGet:
|
||||
path: /readyz
|
||||
port: 8081
|
||||
periodSeconds: 5
|
||||
failureThreshold: 30
|
||||
readinessProbe:
|
||||
httpGet:
|
||||
path: /readyz
|
||||
port: 8081
|
||||
periodSeconds: 10
|
||||
livenessProbe:
|
||||
httpGet:
|
||||
path: /healthz
|
||||
port: 8081
|
||||
periodSeconds: 10
|
||||
resources:
|
||||
requests:
|
||||
cpu: 50m
|
||||
@@ -70,6 +88,8 @@ spec:
|
||||
- name: worker
|
||||
image: localhost:30300/admin/ai-service:latest
|
||||
command: ["/usr/local/bin/ai-service-worker"]
|
||||
ports:
|
||||
- containerPort: 8081
|
||||
env:
|
||||
- name: WORKER_ID
|
||||
valueFrom:
|
||||
@@ -86,6 +106,22 @@ spec:
|
||||
name: ai-service-config
|
||||
- secretRef:
|
||||
name: ai-service-secrets
|
||||
startupProbe:
|
||||
httpGet:
|
||||
path: /readyz
|
||||
port: 8081
|
||||
periodSeconds: 5
|
||||
failureThreshold: 30
|
||||
readinessProbe:
|
||||
httpGet:
|
||||
path: /readyz
|
||||
port: 8081
|
||||
periodSeconds: 10
|
||||
livenessProbe:
|
||||
httpGet:
|
||||
path: /healthz
|
||||
port: 8081
|
||||
periodSeconds: 10
|
||||
resources:
|
||||
requests:
|
||||
cpu: 50m
|
||||
|
||||
Reference in New Issue
Block a user