// Package httpapi implements the HTTP handler for ai-service: liveness and
// readiness probes plus the /api/v1 job, stats, provider, infra and dashboard
// endpoints.
package httpapi

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"strconv"
	"strings"
	"sync"
	"time"

	"ai-service/internal/config"
	"ai-service/internal/model"
	"ai-service/internal/store"
	"github.com/google/uuid"
)

// Server is the http.Handler that routes every request of the service.
type Server struct {
	store *store.Store
	cfg   config.Config

	// NOTE(review): providerMu/providerOKs are not touched in this file;
	// presumably the mutex guards the map for the provider status handlers
	// defined elsewhere in the package — confirm there.
	providerMu  sync.Mutex
	providerOKs map[string]providerHealthSnapshot
}

// NewServer builds a Server around the given store and configuration and
// returns it as an http.Handler.
func NewServer(store *store.Store, cfg config.Config) http.Handler {
	return &Server{
		store:       store,
		cfg:         cfg,
		providerOKs: make(map[string]providerHealthSnapshot),
	}
}

// ServeHTTP normalizes the request path (a single trailing slash is dropped,
// an empty path becomes "/"), rejects the request with 401 when
// requireAPIToken returns false, and then dispatches on method and path.
// Anything that matches no route gets a 404.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	path := strings.TrimSuffix(r.URL.Path, "/")
	if path == "" {
		path = "/"
	}
	if !s.requireAPIToken(path, r) {
		writeError(w, http.StatusUnauthorized, "unauthorized")
		return
	}

	// Case order matters: the exact collection-level routes (/jobs/retry,
	// /jobs/cancel, ...) also satisfy the prefix/suffix patterns of the
	// per-job routes below, so they must be tested first.
	switch {
	case r.Method == http.MethodGet && path == "/healthz":
		writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
	case r.Method == http.MethodGet && path == "/readyz":
		s.handleReady(w, r)
	case r.Method == http.MethodGet && path == "/health/detail":
		s.handleHealthDetail(w, r)
	case r.Method == http.MethodGet && path == "/":
		writeJSON(w, http.StatusOK, map[string]string{"service": "ai-service"})
	case r.Method == http.MethodPost && path == "/api/v1/jobs":
		s.handleCreateJob(w, r)
	case r.Method == http.MethodGet && path == "/api/v1/jobs":
		s.handleListJobs(w, r)
	case r.Method == http.MethodPost && path == "/api/v1/jobs/batch":
		s.handleCreateBatch(w, r)
	case r.Method == http.MethodPost && path == "/api/v1/jobs/retry":
		s.handleRetryJobs(w, r)
	case r.Method == http.MethodPost && path == "/api/v1/jobs/cancel":
		s.handleCancelJobs(w, r)
	case r.Method == http.MethodPost && path == "/api/v1/jobs/claim":
		s.handleClaimJobs(w, r)
	case r.Method == http.MethodGet && strings.HasPrefix(path, "/api/v1/jobs/"):
		s.handleGetJob(w, r, path)
	case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/retry"):
		s.handleRetryJob(w, r, path)
	case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/cancel"):
		s.handleCancelJob(w, r, path)
	case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/complete"):
		s.handleCompleteJob(w, r, path)
	case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/fail"):
		s.handleFailJob(w, r, path)
	case r.Method == http.MethodGet && path == "/api/v1/stats":
		s.handleStats(w, r)
	case r.Method == http.MethodGet && path == "/api/v1/providers/status":
		s.handleProviderStatus(w, r)
	case r.Method == http.MethodGet && path == "/api/v1/infra/status":
		s.handleInfraStatus(w, r)
	case r.Method == http.MethodGet && path == "/api/v1/dashboard":
		s.handleDashboard(w, r)
	default:
		writeError(w, http.StatusNotFound, "not found")
	}
}

// handleReady pings the store with a 3 second budget and answers 503 with the
// ping error when it fails, 200 {"status":"ready"} otherwise.
func (s *Server) handleReady(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := contextWithTimeout(r, 3*time.Second)
	defer cancel()

	if err := s.store.Ping(ctx); err != nil {
		writeError(w, http.StatusServiceUnavailable, err.Error())
		return
	}
	writeJSON(w, http.StatusOK, map[string]string{"status": "ready"})
}

// handleCreateJob decodes a model.CreateJob from the body and stores it.
// It answers 400 for undecodable JSON or a validation error from the store,
// 500 for any other store error, and 201 with the created job on success.
func (s *Server) handleCreateJob(w http.ResponseWriter, r *http.Request) {
	var req model.CreateJob
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeError(w, http.StatusBadRequest, "bad json")
		return
	}

	ctx, cancel := contextWithTimeout(r, 8*time.Second)
	defer cancel()

	job, err := s.store.CreateJob(ctx, req)
	if err != nil {
		status := http.StatusInternalServerError
		if isValidationError(err) {
			status = http.StatusBadRequest
		}
		writeError(w, status, err.Error())
		return
	}
	writeJSON(w, http.StatusCreated, job)
}

// listJobsResponse is the JSON envelope returned by handleListJobs.
type listJobsResponse struct {
	Jobs []*model.Job `json:"jobs"`
}

// handleListJobs lists jobs matching the filter built from the query string.
func (s *Server) handleListJobs(w http.ResponseWriter, r *http.Request) {
	filter := jobFilterFromQuery(r)

	ctx, cancel := contextWithTimeout(r, 8*time.Second)
	defer cancel()

	jobs, err := s.store.ListJobs(ctx, filter)
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	writeJSON(w, http.StatusOK, listJobsResponse{Jobs: jobs})
}

// createBatchRequest is the body of POST /api/v1/jobs/batch. The top-level
// fields act as defaults for entries in Jobs that leave the corresponding
// field blank or zero.
type createBatchRequest struct {
	OwnerService string            `json:"owner_service"`
	TaskType     string            `json:"task_type"`
	ModelProfile string            `json:"model_profile"`
	Priority     int               `json:"priority"`
	MaxAttempts  int               `json:"max_attempts"`
	Jobs         []model.CreateJob `json:"jobs"`
}

// createBatchResponse is the JSON envelope returned by handleCreateBatch.
type createBatchResponse struct {
	Jobs []*model.Job `json:"jobs"`
}

// maxCreateBatchJobs caps the number of jobs accepted in one batch request.
const maxCreateBatchJobs = 1000

// handleCreateBatch creates up to maxCreateBatchJobs jobs in one call.
// Each entry inherits owner_service, task_type, model_profile, priority and
// max_attempts from the batch-level fields when its own value is blank
// (strings, after trimming whitespace) or zero (ints).
func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
	var req createBatchRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeError(w, http.StatusBadRequest, "bad json")
		return
	}
	if len(req.Jobs) == 0 {
		writeError(w, http.StatusBadRequest, "jobs is required")
		return
	}
	if len(req.Jobs) > maxCreateBatchJobs {
		writeError(w, http.StatusBadRequest, fmt.Sprintf("jobs limit is %d", maxCreateBatchJobs))
		return
	}

	ctx, cancel := contextWithTimeout(r, 20*time.Second)
	defer cancel()

	items := make([]model.CreateJob, 0, len(req.Jobs))
	for _, item := range req.Jobs {
		if strings.TrimSpace(item.OwnerService) == "" {
			item.OwnerService = req.OwnerService
		}
		if strings.TrimSpace(item.TaskType) == "" {
			item.TaskType = req.TaskType
		}
		if strings.TrimSpace(item.ModelProfile) == "" {
			item.ModelProfile = req.ModelProfile
		}
		if item.Priority == 0 {
			item.Priority = req.Priority
		}
		if item.MaxAttempts == 0 {
			item.MaxAttempts = req.MaxAttempts
		}
		items = append(items, item)
	}

	jobs, err := s.store.CreateJobs(ctx, items)
	if err != nil {
		status := http.StatusInternalServerError
		if isValidationError(err) {
			status = http.StatusBadRequest
		}
		writeError(w, status, err.Error())
		return
	}
	writeJSON(w, http.StatusCreated, createBatchResponse{Jobs: jobs})
}

// handleRetryJobs retries every job matching the model.JobFilter in the body
// and reports how many were updated.
func (s *Server) handleRetryJobs(w http.ResponseWriter, r *http.Request) {
	var req model.JobFilter
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeError(w, http.StatusBadRequest, "bad json")
		return
	}
	// Refuse an empty filter so a bare request cannot hit every job.
	if !hasAnyFilter(req) {
		writeError(w, http.StatusBadRequest, "at least one filter is required")
		return
	}

	ctx, cancel := contextWithTimeout(r, 20*time.Second)
	defer cancel()

	updated, err := s.store.RetryJobs(ctx, req)
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	writeJSON(w, http.StatusOK, model.BatchActionResult{Updated: updated})
}

// handleCancelJobs cancels every job matching the model.JobFilter in the body
// and reports how many were updated.
func (s *Server) handleCancelJobs(w http.ResponseWriter, r *http.Request) {
	var req model.JobFilter
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeError(w, http.StatusBadRequest, "bad json")
		return
	}
	// Refuse an empty filter so a bare request cannot hit every job.
	if !hasAnyFilter(req) {
		writeError(w, http.StatusBadRequest, "at least one filter is required")
		return
	}

	ctx, cancel := contextWithTimeout(r, 20*time.Second)
	defer cancel()

	updated, err := s.store.CancelJobs(ctx, req)
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	writeJSON(w, http.StatusOK, model.BatchActionResult{Updated: updated})
}

// claimJobsResponse is the JSON envelope returned by handleClaimJobs.
type claimJobsResponse struct {
	Jobs []*model.Job `json:"jobs"`
}

// handleClaimJobs decodes a model.ClaimJobs request and returns the jobs the
// store hands out for it.
func (s *Server) handleClaimJobs(w http.ResponseWriter, r *http.Request) {
	var req model.ClaimJobs
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeError(w, http.StatusBadRequest, "bad json")
		return
	}

	ctx, cancel := contextWithTimeout(r, 8*time.Second)
	defer cancel()

	jobs, err := s.store.ClaimJobs(ctx, req)
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	writeJSON(w, http.StatusOK, claimJobsResponse{Jobs: jobs})
}

// handleGetJob returns the job whose UUID is the tail of path. It answers 400
// for an unparsable id and 404 when the store returns a nil job.
func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request, path string) {
	id, err := jobIDFromPath(path, false)
	if err != nil {
		writeError(w, http.StatusBadRequest, err.Error())
		return
	}

	ctx, cancel := contextWithTimeout(r, 8*time.Second)
	defer cancel()

	job, err := s.store.GetJob(ctx, id)
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	if job == nil {
		writeError(w, http.StatusNotFound, "job not found")
		return
	}
	writeJSON(w, http.StatusOK, job)
}

// handleRetryJob retries the single job addressed by /api/v1/jobs/{id}/retry.
// A nil job from the store is reported as 404.
func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path string) {
	id, err := jobIDFromActionPath(path, "retry")
	if err != nil {
		writeError(w, http.StatusBadRequest, err.Error())
		return
	}

	ctx, cancel := contextWithTimeout(r, 8*time.Second)
	defer cancel()

	job, err := s.store.RetryJob(ctx, id)
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	if job == nil {
		writeError(w, http.StatusNotFound, "retryable job not found")
		return
	}
	writeJSON(w, http.StatusOK, job)
}

// handleCancelJob cancels the single job addressed by
// /api/v1/jobs/{id}/cancel. A nil job from the store is reported as 404.
func (s *Server) handleCancelJob(w http.ResponseWriter, r *http.Request, path string) {
	id, err := jobIDFromActionPath(path, "cancel")
	if err != nil {
		writeError(w, http.StatusBadRequest, err.Error())
		return
	}

	ctx, cancel := contextWithTimeout(r, 8*time.Second)
	defer cancel()

	job, err := s.store.CancelJob(ctx, id)
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	if job == nil {
		writeError(w, http.StatusNotFound, "cancellable job not found")
		return
	}
	writeJSON(w, http.StatusOK, job)
}

// handleCompleteJob records completion of the job addressed by
// /api/v1/jobs/{id}/complete using the model.CompleteJob body. The job id is
// validated before the body is decoded. A nil job from the store is reported
// as 404.
func (s *Server) handleCompleteJob(w http.ResponseWriter, r *http.Request, path string) {
	id, err := jobIDFromActionPath(path, "complete")
	if err != nil {
		writeError(w, http.StatusBadRequest, err.Error())
		return
	}

	var req model.CompleteJob
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeError(w, http.StatusBadRequest, "bad json")
		return
	}

	ctx, cancel := contextWithTimeout(r, 8*time.Second)
	defer cancel()

	job, err := s.store.CompleteJob(ctx, id, req)
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	if job == nil {
		writeError(w, http.StatusNotFound, "running job not found")
		return
	}
	writeJSON(w, http.StatusOK, job)
}

// handleFailJob records failure of the job addressed by
// /api/v1/jobs/{id}/fail using the model.FailJob body. The job id is
// validated before the body is decoded. A nil job from the store is reported
// as 404.
func (s *Server) handleFailJob(w http.ResponseWriter, r *http.Request, path string) {
	id, err := jobIDFromActionPath(path, "fail")
	if err != nil {
		writeError(w, http.StatusBadRequest, err.Error())
		return
	}

	var req model.FailJob
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeError(w, http.StatusBadRequest, "bad json")
		return
	}

	ctx, cancel := contextWithTimeout(r, 8*time.Second)
	defer cancel()

	job, err := s.store.FailJob(ctx, id, req)
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	if job == nil {
		writeError(w, http.StatusNotFound, "running job not found")
		return
	}
	writeJSON(w, http.StatusOK, job)
}

// handleStats returns the store's statistics, computed with the configured
// worker lease timeout.
func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := contextWithTimeout(r, 8*time.Second)
	defer cancel()

	stats, err := s.store.Stats(ctx, s.cfg.WorkerLeaseTimeout)
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	writeJSON(w, http.StatusOK, stats)
}

// jobIDFromPath parses the job UUID out of an /api/v1/jobs/{id} path. When
// retry is true a trailing "/retry" segment is stripped first. Any parse
// failure is reported as the fixed error "bad job id".
func jobIDFromPath(path string, retry bool) (uuid.UUID, error) {
	if retry {
		return jobIDFromActionPath(path, "retry")
	}
	raw := strings.TrimPrefix(path, "/api/v1/jobs/")
	id, err := uuid.Parse(strings.Trim(raw, "/"))
	if err != nil {
		return uuid.Nil, errors.New("bad job id")
	}
	return id, nil
}

// jobIDFromActionPath parses the job UUID out of an
// /api/v1/jobs/{id}/{action} path. Any parse failure is reported as the fixed
// error "bad job id".
func jobIDFromActionPath(path string, action string) (uuid.UUID, error) {
	raw := strings.TrimPrefix(path, "/api/v1/jobs/")
	raw = strings.TrimSuffix(raw, "/"+action)
	id, err := uuid.Parse(strings.Trim(raw, "/"))
	if err != nil {
		return uuid.Nil, errors.New("bad job id")
	}
	return id, nil
}

// isValidationError reports whether err should be surfaced as a 400.
// It matches on the message text (" is required").
// NOTE(review): this couples the handler to the store's error wording; a
// sentinel or typed error checked with errors.Is/errors.As would be sturdier.
func isValidationError(err error) bool {
	return strings.Contains(err.Error(), " is required")
}

// jobFilterFromQuery builds a model.JobFilter from the request's query
// parameters. "status" and "error_code" are comma-separated lists; "limit"
// defaults to 100 and "offset" to 0 when absent or not a valid integer.
func jobFilterFromQuery(r *http.Request) model.JobFilter {
	q := r.URL.Query()
	return model.JobFilter{
		OwnerService: q.Get("owner_service"),
		OwnerRef:     q.Get("owner_ref"),
		TaskType:     q.Get("task_type"),
		ModelProfile: q.Get("model_profile"),
		Statuses:     splitCSV(q.Get("status")),
		ErrorCodes:   splitCSV(q.Get("error_code")),
		Limit:        parseIntDefault(q.Get("limit"), 100),
		Offset:       parseIntDefault(q.Get("offset"), 0),
	}
}

// splitCSV splits raw on commas, trims each piece and drops empty ones.
// A blank input yields nil.
func splitCSV(raw string) []string {
	if strings.TrimSpace(raw) == "" {
		return nil
	}
	parts := strings.Split(raw, ",")
	out := make([]string, 0, len(parts))
	for _, part := range parts {
		if v := strings.TrimSpace(part); v != "" {
			out = append(out, v)
		}
	}
	return out
}

// parseIntDefault parses raw as a base-10 integer, ignoring surrounding
// whitespace, and returns fallback when raw is blank or not a valid integer.
//
// The whole value must be numeric: input such as "10abc" yields fallback
// rather than being silently truncated to 10.
func parseIntDefault(raw string, fallback int) int {
	trimmed := strings.TrimSpace(raw)
	if trimmed == "" {
		return fallback
	}
	out, err := strconv.Atoi(trimmed)
	if err != nil {
		return fallback
	}
	return out
}

// hasAnyFilter reports whether filter constrains at least one of owner
// service, owner ref, task type, model profile, statuses or error codes.
// Limit and Offset are not considered.
func hasAnyFilter(filter model.JobFilter) bool {
	return strings.TrimSpace(filter.OwnerService) != "" ||
		strings.TrimSpace(filter.OwnerRef) != "" ||
		strings.TrimSpace(filter.TaskType) != "" ||
		strings.TrimSpace(filter.ModelProfile) != "" ||
		len(filter.Statuses) > 0 ||
		len(filter.ErrorCodes) > 0
}