Compare commits
4 Commits
0e2c267053
...
da144ecefe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
da144ecefe | ||
|
|
6254bf0810 | ||
|
|
092994fe74 | ||
|
|
01ee090fa5 |
@@ -65,6 +65,8 @@ domain metadata fields in `input`, but the worker only reads chat fields such as
|
|||||||
(GPU, containers, vLLM and WhisperX live metrics) when configured.
|
(GPU, containers, vLLM and WhisperX live metrics) when configured.
|
||||||
- `GET /healthz` returns process health.
|
- `GET /healthz` returns process health.
|
||||||
- `GET /readyz` checks PostgreSQL readiness.
|
- `GET /readyz` checks PostgreSQL readiness.
|
||||||
|
- Built-in workers expose open Kubernetes endpoints on `WORKER_HTTP_PORT`:
|
||||||
|
`GET /healthz`, `GET /readyz` and `GET /worker/status`.
|
||||||
|
|
||||||
All `/api/v1/*` endpoints require `Authorization: Bearer <AI_SERVICE_TOKEN>`
|
All `/api/v1/*` endpoints require `Authorization: Bearer <AI_SERVICE_TOKEN>`
|
||||||
when `AI_SERVICE_TOKEN` is configured. Health and readiness endpoints stay open
|
when `AI_SERVICE_TOKEN` is configured. Health and readiness endpoints stay open
|
||||||
@@ -83,6 +85,8 @@ for Kubernetes probes.
|
|||||||
- `LLM_TIMEOUT`, default `5m`
|
- `LLM_TIMEOUT`, default `5m`
|
||||||
- `WHISPERX_URL`, WhisperX endpoint for transcription jobs
|
- `WHISPERX_URL`, WhisperX endpoint for transcription jobs
|
||||||
- `WORKER_ID`, default hostname
|
- `WORKER_ID`, default hostname
|
||||||
|
- `WORKER_HTTP_HOST`, default `0.0.0.0`
|
||||||
|
- `WORKER_HTTP_PORT`, default `8081`
|
||||||
- `WORKER_POLL_INTERVAL`, default `2s`
|
- `WORKER_POLL_INTERVAL`, default `2s`
|
||||||
- `WORKER_CLAIM_LIMIT`, default `4`
|
- `WORKER_CLAIM_LIMIT`, default `4`
|
||||||
- `WORKER_LEASE_TIMEOUT`, default `15m`
|
- `WORKER_LEASE_TIMEOUT`, default `15m`
|
||||||
|
|||||||
@@ -2,10 +2,16 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
"ai-service/internal/config"
|
"ai-service/internal/config"
|
||||||
"ai-service/internal/llm"
|
"ai-service/internal/llm"
|
||||||
@@ -44,6 +50,7 @@ func main() {
|
|||||||
llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout)
|
llmClient := llm.New(cfg.LLMBaseURL, cfg.LLMAPIKey, cfg.LLMModel, cfg.LLMTimeout)
|
||||||
transcriber := transcription.New(cfg.WhisperXURL, cfg.WhisperXTimeout, cfg.FfmpegPath, cfg.WhisperXLeadSilence)
|
transcriber := transcription.New(cfg.WhisperXURL, cfg.WhisperXTimeout, cfg.FfmpegPath, cfg.WhisperXLeadSilence)
|
||||||
w := worker.New(db, llmClient, transcriber, cfg.WorkerID, cfg.LLMModel, cfg.WorkerTaskTypes, cfg.WorkerModelProfiles, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit)
|
w := worker.New(db, llmClient, transcriber, cfg.WorkerID, cfg.LLMModel, cfg.WorkerTaskTypes, cfg.WorkerModelProfiles, cfg.WorkerPollInterval, cfg.WorkerLeaseTimeout, cfg.WorkerClaimLimit)
|
||||||
|
healthSrv := startHealthServer(ctx, db, cfg)
|
||||||
|
|
||||||
slog.Info("ai_worker_started",
|
slog.Info("ai_worker_started",
|
||||||
"worker_id", cfg.WorkerID,
|
"worker_id", cfg.WorkerID,
|
||||||
@@ -57,4 +64,82 @@ func main() {
|
|||||||
"claim_limit", cfg.WorkerClaimLimit,
|
"claim_limit", cfg.WorkerClaimLimit,
|
||||||
)
|
)
|
||||||
w.Run(ctx)
|
w.Run(ctx)
|
||||||
|
|
||||||
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
_ = healthSrv.Shutdown(shutdownCtx)
|
||||||
|
}
|
||||||
|
|
||||||
|
type workerHealth struct {
|
||||||
|
store *store.Store
|
||||||
|
cfg config.Config
|
||||||
|
}
|
||||||
|
|
||||||
|
func startHealthServer(ctx context.Context, db *store.Store, cfg config.Config) *http.Server {
|
||||||
|
srv := &http.Server{
|
||||||
|
Addr: fmt.Sprintf("%s:%d", cfg.WorkerHTTPHost, cfg.WorkerHTTPPort),
|
||||||
|
Handler: workerHealth{store: db, cfg: cfg},
|
||||||
|
ReadHeaderTimeout: 5 * time.Second,
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
slog.Info("ai_worker_health_started", "addr", srv.Addr)
|
||||||
|
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||||
|
slog.Error("ai_worker_health_failed", "error", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
<-ctx.Done()
|
||||||
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
_ = srv.Shutdown(shutdownCtx)
|
||||||
|
}()
|
||||||
|
return srv
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h workerHealth) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
path := strings.TrimSuffix(r.URL.Path, "/")
|
||||||
|
if path == "" {
|
||||||
|
path = "/"
|
||||||
|
}
|
||||||
|
switch {
|
||||||
|
case r.Method == http.MethodGet && path == "/healthz":
|
||||||
|
writeWorkerJSON(w, http.StatusOK, map[string]any{
|
||||||
|
"status": "ok",
|
||||||
|
"worker_id": h.cfg.WorkerID,
|
||||||
|
})
|
||||||
|
case r.Method == http.MethodGet && path == "/readyz":
|
||||||
|
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := h.store.Ping(ctx); err != nil {
|
||||||
|
writeWorkerJSON(w, http.StatusServiceUnavailable, map[string]any{
|
||||||
|
"status": "not_ready",
|
||||||
|
"error": err.Error(),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeWorkerJSON(w, http.StatusOK, map[string]any{
|
||||||
|
"status": "ready",
|
||||||
|
"worker_id": h.cfg.WorkerID,
|
||||||
|
})
|
||||||
|
case r.Method == http.MethodGet && path == "/worker/status":
|
||||||
|
writeWorkerJSON(w, http.StatusOK, map[string]any{
|
||||||
|
"status": "running",
|
||||||
|
"worker_id": h.cfg.WorkerID,
|
||||||
|
"task_types": h.cfg.WorkerTaskTypes,
|
||||||
|
"model_profiles": h.cfg.WorkerModelProfiles,
|
||||||
|
"claim_limit": h.cfg.WorkerClaimLimit,
|
||||||
|
"poll_interval": h.cfg.WorkerPollInterval.String(),
|
||||||
|
"lease_timeout": h.cfg.WorkerLeaseTimeout.String(),
|
||||||
|
})
|
||||||
|
default:
|
||||||
|
writeWorkerJSON(w, http.StatusNotFound, map[string]any{"error": "not found"})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeWorkerJSON(w http.ResponseWriter, code int, body any) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
w.WriteHeader(code)
|
||||||
|
if err := json.NewEncoder(w).Encode(body); err != nil {
|
||||||
|
slog.Warn("worker_health_write_failed", "error", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,6 +26,8 @@ type Config struct {
|
|||||||
AIStatsTimeout time.Duration
|
AIStatsTimeout time.Duration
|
||||||
|
|
||||||
WorkerID string
|
WorkerID string
|
||||||
|
WorkerHTTPHost string
|
||||||
|
WorkerHTTPPort int
|
||||||
WorkerPollInterval time.Duration
|
WorkerPollInterval time.Duration
|
||||||
WorkerClaimLimit int
|
WorkerClaimLimit int
|
||||||
WorkerLeaseTimeout time.Duration
|
WorkerLeaseTimeout time.Duration
|
||||||
@@ -53,6 +55,8 @@ func Load() Config {
|
|||||||
AIStatsTimeout: envDuration("AI_STATS_TIMEOUT", 8*time.Second),
|
AIStatsTimeout: envDuration("AI_STATS_TIMEOUT", 8*time.Second),
|
||||||
|
|
||||||
WorkerID: envString("WORKER_ID", hostname()),
|
WorkerID: envString("WORKER_ID", hostname()),
|
||||||
|
WorkerHTTPHost: envString("WORKER_HTTP_HOST", "0.0.0.0"),
|
||||||
|
WorkerHTTPPort: envInt("WORKER_HTTP_PORT", 8081),
|
||||||
WorkerPollInterval: envDuration("WORKER_POLL_INTERVAL", 2*time.Second),
|
WorkerPollInterval: envDuration("WORKER_POLL_INTERVAL", 2*time.Second),
|
||||||
WorkerClaimLimit: envInt("WORKER_CLAIM_LIMIT", 4),
|
WorkerClaimLimit: envInt("WORKER_CLAIM_LIMIT", 4),
|
||||||
WorkerLeaseTimeout: envDuration("WORKER_LEASE_TIMEOUT", 15*time.Minute),
|
WorkerLeaseTimeout: envDuration("WORKER_LEASE_TIMEOUT", 15*time.Minute),
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ type dashboardResponse struct {
|
|||||||
Stats *model.Stats `json:"stats"`
|
Stats *model.Stats `json:"stats"`
|
||||||
Providers providersStatusResponse `json:"providers"`
|
Providers providersStatusResponse `json:"providers"`
|
||||||
Infra infraStatusResponse `json:"infra"`
|
Infra infraStatusResponse `json:"infra"`
|
||||||
Jobs []*model.Job `json:"jobs"`
|
Jobs []*model.JobSummary `json:"jobs"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type dashboardSummary struct {
|
type dashboardSummary struct {
|
||||||
@@ -35,7 +35,7 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeError(w, http.StatusInternalServerError, err.Error())
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
jobs, err := s.store.ListJobs(ctx, model.JobFilter{
|
jobs, err := s.store.ListJobSummaries(ctx, model.JobFilter{
|
||||||
Statuses: []string{model.StatusFailed, model.StatusRunning},
|
Statuses: []string{model.StatusFailed, model.StatusRunning},
|
||||||
Limit: 40,
|
Limit: 40,
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -135,6 +135,8 @@ type createBatchResponse struct {
|
|||||||
Jobs []*model.Job `json:"jobs"`
|
Jobs []*model.Job `json:"jobs"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const maxCreateBatchJobs = 1000
|
||||||
|
|
||||||
func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
|
||||||
var req createBatchRequest
|
var req createBatchRequest
|
||||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
@@ -145,17 +147,21 @@ func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeError(w, http.StatusBadRequest, "jobs is required")
|
writeError(w, http.StatusBadRequest, "jobs is required")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if len(req.Jobs) > maxCreateBatchJobs {
|
||||||
|
writeError(w, http.StatusBadRequest, fmt.Sprintf("jobs limit is %d", maxCreateBatchJobs))
|
||||||
|
return
|
||||||
|
}
|
||||||
ctx, cancel := contextWithTimeout(r, 20*time.Second)
|
ctx, cancel := contextWithTimeout(r, 20*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
out := createBatchResponse{Jobs: make([]*model.Job, 0, len(req.Jobs))}
|
items := make([]model.CreateJob, 0, len(req.Jobs))
|
||||||
for _, item := range req.Jobs {
|
for _, item := range req.Jobs {
|
||||||
if item.OwnerService == "" {
|
if strings.TrimSpace(item.OwnerService) == "" {
|
||||||
item.OwnerService = req.OwnerService
|
item.OwnerService = req.OwnerService
|
||||||
}
|
}
|
||||||
if item.TaskType == "" {
|
if strings.TrimSpace(item.TaskType) == "" {
|
||||||
item.TaskType = req.TaskType
|
item.TaskType = req.TaskType
|
||||||
}
|
}
|
||||||
if item.ModelProfile == "" {
|
if strings.TrimSpace(item.ModelProfile) == "" {
|
||||||
item.ModelProfile = req.ModelProfile
|
item.ModelProfile = req.ModelProfile
|
||||||
}
|
}
|
||||||
if item.Priority == 0 {
|
if item.Priority == 0 {
|
||||||
@@ -164,7 +170,9 @@ func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
|
|||||||
if item.MaxAttempts == 0 {
|
if item.MaxAttempts == 0 {
|
||||||
item.MaxAttempts = req.MaxAttempts
|
item.MaxAttempts = req.MaxAttempts
|
||||||
}
|
}
|
||||||
job, err := s.store.CreateJob(ctx, item)
|
items = append(items, item)
|
||||||
|
}
|
||||||
|
jobs, err := s.store.CreateJobs(ctx, items)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
status := http.StatusInternalServerError
|
status := http.StatusInternalServerError
|
||||||
if isValidationError(err) {
|
if isValidationError(err) {
|
||||||
@@ -173,9 +181,7 @@ func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeError(w, status, err.Error())
|
writeError(w, status, err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
out.Jobs = append(out.Jobs, job)
|
writeJSON(w, http.StatusCreated, createBatchResponse{Jobs: jobs})
|
||||||
}
|
|
||||||
writeJSON(w, http.StatusCreated, out)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handleRetryJobs(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleRetryJobs(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|||||||
13
internal/migrate/sql/004_ai_jobs_queue_indexes.up.sql
Normal file
13
internal/migrate/sql/004_ai_jobs_queue_indexes.up.sql
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
CREATE INDEX IF NOT EXISTS ai_jobs_pending_claim_scope_idx
|
||||||
|
ON ai_jobs (task_type, model_profile, priority DESC, scheduled_at DESC, created_at DESC)
|
||||||
|
WHERE status = 'pending';
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS ai_jobs_running_lease_idx
|
||||||
|
ON ai_jobs ((COALESCE(heartbeat_at, started_at, updated_at)))
|
||||||
|
WHERE status = 'running';
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS ai_jobs_queue_stats_idx
|
||||||
|
ON ai_jobs (task_type, model_profile, status);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS ai_jobs_owner_stats_idx
|
||||||
|
ON ai_jobs (owner_service, task_type, model_profile, status);
|
||||||
2
internal/migrate/sql/005_ai_jobs_recent_status.up.sql
Normal file
2
internal/migrate/sql/005_ai_jobs_recent_status.up.sql
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
CREATE INDEX IF NOT EXISTS ai_jobs_recent_status_idx
|
||||||
|
ON ai_jobs (status, updated_at DESC, created_at DESC);
|
||||||
@@ -39,6 +39,27 @@ type Job struct {
|
|||||||
IdempotencyKey *string `json:"idempotency_key,omitempty"`
|
IdempotencyKey *string `json:"idempotency_key,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type JobSummary struct {
|
||||||
|
ID uuid.UUID `json:"id"`
|
||||||
|
OwnerService string `json:"owner_service"`
|
||||||
|
OwnerRef string `json:"owner_ref"`
|
||||||
|
TaskType string `json:"task_type"`
|
||||||
|
ModelProfile string `json:"model_profile"`
|
||||||
|
Priority int `json:"priority"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
Attempts int `json:"attempts"`
|
||||||
|
MaxAttempts int `json:"max_attempts"`
|
||||||
|
ErrorCode *string `json:"error_code,omitempty"`
|
||||||
|
ErrorMessage *string `json:"error_message,omitempty"`
|
||||||
|
ScheduledAt time.Time `json:"scheduled_at"`
|
||||||
|
StartedAt *time.Time `json:"started_at,omitempty"`
|
||||||
|
CompletedAt *time.Time `json:"completed_at,omitempty"`
|
||||||
|
WorkerID *string `json:"worker_id,omitempty"`
|
||||||
|
HeartbeatAt *time.Time `json:"heartbeat_at,omitempty"`
|
||||||
|
CreatedAt time.Time `json:"created_at"`
|
||||||
|
UpdatedAt time.Time `json:"updated_at"`
|
||||||
|
}
|
||||||
|
|
||||||
type CreateJob struct {
|
type CreateJob struct {
|
||||||
OwnerService string `json:"owner_service"`
|
OwnerService string `json:"owner_service"`
|
||||||
OwnerRef string `json:"owner_ref"`
|
OwnerRef string `json:"owner_ref"`
|
||||||
|
|||||||
@@ -69,16 +69,7 @@ func (s *Store) CreateJob(ctx context.Context, in model.CreateJob) (*model.Job,
|
|||||||
if err := validateCreateJob(in); err != nil {
|
if err := validateCreateJob(in); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if in.MaxAttempts <= 0 {
|
normalizeCreateJob(&in)
|
||||||
in.MaxAttempts = 3
|
|
||||||
}
|
|
||||||
if len(in.Input) == 0 {
|
|
||||||
in.Input = json.RawMessage(`{}`)
|
|
||||||
}
|
|
||||||
scheduledAt := time.Now().UTC()
|
|
||||||
if in.ScheduledAt != nil {
|
|
||||||
scheduledAt = in.ScheduledAt.UTC()
|
|
||||||
}
|
|
||||||
|
|
||||||
const q = `
|
const q = `
|
||||||
INSERT INTO ai_jobs (
|
INSERT INTO ai_jobs (
|
||||||
@@ -98,12 +89,81 @@ RETURNING ` + jobSelectColumns + `
|
|||||||
in.Priority,
|
in.Priority,
|
||||||
in.MaxAttempts,
|
in.MaxAttempts,
|
||||||
in.Input,
|
in.Input,
|
||||||
scheduledAt,
|
*in.ScheduledAt,
|
||||||
in.IdempotencyKey,
|
in.IdempotencyKey,
|
||||||
)
|
)
|
||||||
return scanJob(row)
|
return scanJob(row)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Store) CreateJobs(ctx context.Context, items []model.CreateJob) ([]*model.Job, error) {
|
||||||
|
if len(items) == 0 {
|
||||||
|
return []*model.Job{}, nil
|
||||||
|
}
|
||||||
|
const q = `
|
||||||
|
INSERT INTO ai_jobs (
|
||||||
|
owner_service, owner_ref, task_type, model_profile, priority, max_attempts,
|
||||||
|
input, scheduled_at, idempotency_key
|
||||||
|
)
|
||||||
|
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
|
||||||
|
ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL
|
||||||
|
DO UPDATE SET updated_at = ai_jobs.updated_at
|
||||||
|
RETURNING ` + jobSelectColumns + `
|
||||||
|
`
|
||||||
|
var batch pgx.Batch
|
||||||
|
for i := range items {
|
||||||
|
if err := validateCreateJob(items[i]); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
normalizeCreateJob(&items[i])
|
||||||
|
batch.Queue(q,
|
||||||
|
items[i].OwnerService,
|
||||||
|
items[i].OwnerRef,
|
||||||
|
items[i].TaskType,
|
||||||
|
items[i].ModelProfile,
|
||||||
|
items[i].Priority,
|
||||||
|
items[i].MaxAttempts,
|
||||||
|
items[i].Input,
|
||||||
|
*items[i].ScheduledAt,
|
||||||
|
items[i].IdempotencyKey,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
br := s.pool.SendBatch(ctx, &batch)
|
||||||
|
batchClosed := false
|
||||||
|
defer func() {
|
||||||
|
if !batchClosed {
|
||||||
|
_ = br.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
out := make([]*model.Job, 0, len(items))
|
||||||
|
for range items {
|
||||||
|
job, err := scanJob(br.QueryRow())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
out = append(out, job)
|
||||||
|
}
|
||||||
|
err := br.Close()
|
||||||
|
batchClosed = true
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeCreateJob(in *model.CreateJob) {
|
||||||
|
if in.MaxAttempts <= 0 {
|
||||||
|
in.MaxAttempts = 3
|
||||||
|
}
|
||||||
|
if len(in.Input) == 0 {
|
||||||
|
in.Input = json.RawMessage(`{}`)
|
||||||
|
}
|
||||||
|
scheduledAt := time.Now().UTC()
|
||||||
|
if in.ScheduledAt != nil {
|
||||||
|
scheduledAt = in.ScheduledAt.UTC()
|
||||||
|
}
|
||||||
|
in.ScheduledAt = &scheduledAt
|
||||||
|
}
|
||||||
|
|
||||||
func validateCreateJob(in model.CreateJob) error {
|
func validateCreateJob(in model.CreateJob) error {
|
||||||
switch {
|
switch {
|
||||||
case strings.TrimSpace(in.OwnerService) == "":
|
case strings.TrimSpace(in.OwnerService) == "":
|
||||||
@@ -174,6 +234,48 @@ LIMIT $7 OFFSET $8
|
|||||||
return out, rows.Err()
|
return out, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Store) ListJobSummaries(ctx context.Context, filter model.JobFilter) ([]*model.JobSummary, error) {
|
||||||
|
normalizeFilter(&filter)
|
||||||
|
const q = `
|
||||||
|
SELECT id, owner_service, owner_ref, task_type, model_profile, priority, status,
|
||||||
|
attempts, max_attempts, error_code, error_message,
|
||||||
|
scheduled_at, started_at, completed_at, worker_id, heartbeat_at,
|
||||||
|
created_at, updated_at
|
||||||
|
FROM ai_jobs
|
||||||
|
WHERE ($1 = '' OR owner_service = $1)
|
||||||
|
AND ($2 = '' OR owner_ref = $2)
|
||||||
|
AND ($3 = '' OR task_type = $3)
|
||||||
|
AND ($4 = '' OR model_profile = $4)
|
||||||
|
AND (cardinality($5::text[]) = 0 OR status = ANY($5::text[]))
|
||||||
|
AND (cardinality($6::text[]) = 0 OR COALESCE(NULLIF(error_code, ''), 'unknown') = ANY($6::text[]))
|
||||||
|
ORDER BY updated_at DESC, created_at DESC
|
||||||
|
LIMIT $7 OFFSET $8
|
||||||
|
`
|
||||||
|
rows, err := s.pool.Query(ctx, q,
|
||||||
|
filter.OwnerService,
|
||||||
|
filter.OwnerRef,
|
||||||
|
filter.TaskType,
|
||||||
|
filter.ModelProfile,
|
||||||
|
filter.Statuses,
|
||||||
|
filter.ErrorCodes,
|
||||||
|
filter.Limit,
|
||||||
|
filter.Offset,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
var out []*model.JobSummary
|
||||||
|
for rows.Next() {
|
||||||
|
job, err := scanJobSummary(rows)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
out = append(out, job)
|
||||||
|
}
|
||||||
|
return out, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
func normalizeFilter(filter *model.JobFilter) {
|
func normalizeFilter(filter *model.JobFilter) {
|
||||||
filter.OwnerService = strings.TrimSpace(filter.OwnerService)
|
filter.OwnerService = strings.TrimSpace(filter.OwnerService)
|
||||||
filter.OwnerRef = strings.TrimSpace(filter.OwnerRef)
|
filter.OwnerRef = strings.TrimSpace(filter.OwnerRef)
|
||||||
@@ -519,6 +621,34 @@ ORDER BY last_24h DESC, total DESC
|
|||||||
return out, errorRows.Err()
|
return out, errorRows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func scanJobSummary(row pgx.Row) (*model.JobSummary, error) {
|
||||||
|
var job model.JobSummary
|
||||||
|
err := row.Scan(
|
||||||
|
&job.ID,
|
||||||
|
&job.OwnerService,
|
||||||
|
&job.OwnerRef,
|
||||||
|
&job.TaskType,
|
||||||
|
&job.ModelProfile,
|
||||||
|
&job.Priority,
|
||||||
|
&job.Status,
|
||||||
|
&job.Attempts,
|
||||||
|
&job.MaxAttempts,
|
||||||
|
&job.ErrorCode,
|
||||||
|
&job.ErrorMessage,
|
||||||
|
&job.ScheduledAt,
|
||||||
|
&job.StartedAt,
|
||||||
|
&job.CompletedAt,
|
||||||
|
&job.WorkerID,
|
||||||
|
&job.HeartbeatAt,
|
||||||
|
&job.CreatedAt,
|
||||||
|
&job.UpdatedAt,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &job, nil
|
||||||
|
}
|
||||||
|
|
||||||
func scanJob(row pgx.Row) (*model.Job, error) {
|
func scanJob(row pgx.Row) (*model.Job, error) {
|
||||||
var job model.Job
|
var job model.Job
|
||||||
var input []byte
|
var input []byte
|
||||||
|
|||||||
@@ -18,5 +18,7 @@ data:
|
|||||||
AI_STATS_SIDECAR_URL: "http://10.2.3.5:9090"
|
AI_STATS_SIDECAR_URL: "http://10.2.3.5:9090"
|
||||||
AI_STATS_TIMEOUT: "8s"
|
AI_STATS_TIMEOUT: "8s"
|
||||||
WORKER_POLL_INTERVAL: "2s"
|
WORKER_POLL_INTERVAL: "2s"
|
||||||
|
WORKER_HTTP_HOST: "0.0.0.0"
|
||||||
|
WORKER_HTTP_PORT: "8081"
|
||||||
WORKER_CLAIM_LIMIT: "4"
|
WORKER_CLAIM_LIMIT: "4"
|
||||||
WORKER_LEASE_TIMEOUT: "15m"
|
WORKER_LEASE_TIMEOUT: "15m"
|
||||||
|
|||||||
@@ -22,6 +22,8 @@ spec:
|
|||||||
- name: worker
|
- name: worker
|
||||||
image: localhost:30300/admin/ai-service:latest
|
image: localhost:30300/admin/ai-service:latest
|
||||||
command: ["/usr/local/bin/ai-service-worker"]
|
command: ["/usr/local/bin/ai-service-worker"]
|
||||||
|
ports:
|
||||||
|
- containerPort: 8081
|
||||||
env:
|
env:
|
||||||
- name: WORKER_ID
|
- name: WORKER_ID
|
||||||
valueFrom:
|
valueFrom:
|
||||||
@@ -38,6 +40,22 @@ spec:
|
|||||||
name: ai-service-config
|
name: ai-service-config
|
||||||
- secretRef:
|
- secretRef:
|
||||||
name: ai-service-secrets
|
name: ai-service-secrets
|
||||||
|
startupProbe:
|
||||||
|
httpGet:
|
||||||
|
path: /readyz
|
||||||
|
port: 8081
|
||||||
|
periodSeconds: 5
|
||||||
|
failureThreshold: 30
|
||||||
|
readinessProbe:
|
||||||
|
httpGet:
|
||||||
|
path: /readyz
|
||||||
|
port: 8081
|
||||||
|
periodSeconds: 10
|
||||||
|
livenessProbe:
|
||||||
|
httpGet:
|
||||||
|
path: /healthz
|
||||||
|
port: 8081
|
||||||
|
periodSeconds: 10
|
||||||
resources:
|
resources:
|
||||||
requests:
|
requests:
|
||||||
cpu: 50m
|
cpu: 50m
|
||||||
@@ -70,6 +88,8 @@ spec:
|
|||||||
- name: worker
|
- name: worker
|
||||||
image: localhost:30300/admin/ai-service:latest
|
image: localhost:30300/admin/ai-service:latest
|
||||||
command: ["/usr/local/bin/ai-service-worker"]
|
command: ["/usr/local/bin/ai-service-worker"]
|
||||||
|
ports:
|
||||||
|
- containerPort: 8081
|
||||||
env:
|
env:
|
||||||
- name: WORKER_ID
|
- name: WORKER_ID
|
||||||
valueFrom:
|
valueFrom:
|
||||||
@@ -86,6 +106,22 @@ spec:
|
|||||||
name: ai-service-config
|
name: ai-service-config
|
||||||
- secretRef:
|
- secretRef:
|
||||||
name: ai-service-secrets
|
name: ai-service-secrets
|
||||||
|
startupProbe:
|
||||||
|
httpGet:
|
||||||
|
path: /readyz
|
||||||
|
port: 8081
|
||||||
|
periodSeconds: 5
|
||||||
|
failureThreshold: 30
|
||||||
|
readinessProbe:
|
||||||
|
httpGet:
|
||||||
|
path: /readyz
|
||||||
|
port: 8081
|
||||||
|
periodSeconds: 10
|
||||||
|
livenessProbe:
|
||||||
|
httpGet:
|
||||||
|
path: /healthz
|
||||||
|
port: 8081
|
||||||
|
periodSeconds: 10
|
||||||
resources:
|
resources:
|
||||||
requests:
|
requests:
|
||||||
cpu: 50m
|
cpu: 50m
|
||||||
|
|||||||
4
migrations/004_ai_jobs_queue_indexes.down.sql
Normal file
4
migrations/004_ai_jobs_queue_indexes.down.sql
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
DROP INDEX IF EXISTS ai_jobs_owner_stats_idx;
|
||||||
|
DROP INDEX IF EXISTS ai_jobs_queue_stats_idx;
|
||||||
|
DROP INDEX IF EXISTS ai_jobs_running_lease_idx;
|
||||||
|
DROP INDEX IF EXISTS ai_jobs_pending_claim_scope_idx;
|
||||||
13
migrations/004_ai_jobs_queue_indexes.up.sql
Normal file
13
migrations/004_ai_jobs_queue_indexes.up.sql
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
CREATE INDEX IF NOT EXISTS ai_jobs_pending_claim_scope_idx
|
||||||
|
ON ai_jobs (task_type, model_profile, priority DESC, scheduled_at DESC, created_at DESC)
|
||||||
|
WHERE status = 'pending';
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS ai_jobs_running_lease_idx
|
||||||
|
ON ai_jobs ((COALESCE(heartbeat_at, started_at, updated_at)))
|
||||||
|
WHERE status = 'running';
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS ai_jobs_queue_stats_idx
|
||||||
|
ON ai_jobs (task_type, model_profile, status);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS ai_jobs_owner_stats_idx
|
||||||
|
ON ai_jobs (owner_service, task_type, model_profile, status);
|
||||||
1
migrations/005_ai_jobs_recent_status.down.sql
Normal file
1
migrations/005_ai_jobs_recent_status.down.sql
Normal file
@@ -0,0 +1 @@
|
|||||||
|
DROP INDEX IF EXISTS ai_jobs_recent_status_idx;
|
||||||
2
migrations/005_ai_jobs_recent_status.up.sql
Normal file
2
migrations/005_ai_jobs_recent_status.up.sql
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
CREATE INDEX IF NOT EXISTS ai_jobs_recent_status_idx
|
||||||
|
ON ai_jobs (status, updated_at DESC, created_at DESC);
|
||||||
Reference in New Issue
Block a user