416 lines
12 KiB
Go
416 lines
12 KiB
Go
package httpapi
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"ai-service/internal/config"
|
|
"ai-service/internal/model"
|
|
"ai-service/internal/store"
|
|
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
type Server struct {
|
|
store *store.Store
|
|
cfg config.Config
|
|
providerMu sync.Mutex
|
|
providerOKs map[string]providerHealthSnapshot
|
|
}
|
|
|
|
func NewServer(store *store.Store, cfg config.Config) http.Handler {
|
|
return &Server{store: store, cfg: cfg, providerOKs: make(map[string]providerHealthSnapshot)}
|
|
}
|
|
|
|
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
path := strings.TrimSuffix(r.URL.Path, "/")
|
|
if path == "" {
|
|
path = "/"
|
|
}
|
|
if !s.requireAPIToken(path, r) {
|
|
writeError(w, http.StatusUnauthorized, "unauthorized")
|
|
return
|
|
}
|
|
switch {
|
|
case r.Method == http.MethodGet && path == "/healthz":
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
|
case r.Method == http.MethodGet && path == "/readyz":
|
|
s.handleReady(w, r)
|
|
case r.Method == http.MethodGet && path == "/":
|
|
writeJSON(w, http.StatusOK, map[string]string{"service": "ai-service"})
|
|
case r.Method == http.MethodPost && path == "/api/v1/jobs":
|
|
s.handleCreateJob(w, r)
|
|
case r.Method == http.MethodGet && path == "/api/v1/jobs":
|
|
s.handleListJobs(w, r)
|
|
case r.Method == http.MethodPost && path == "/api/v1/jobs/batch":
|
|
s.handleCreateBatch(w, r)
|
|
case r.Method == http.MethodPost && path == "/api/v1/jobs/retry":
|
|
s.handleRetryJobs(w, r)
|
|
case r.Method == http.MethodPost && path == "/api/v1/jobs/cancel":
|
|
s.handleCancelJobs(w, r)
|
|
case r.Method == http.MethodPost && path == "/api/v1/jobs/claim":
|
|
s.handleClaimJobs(w, r)
|
|
case r.Method == http.MethodGet && strings.HasPrefix(path, "/api/v1/jobs/"):
|
|
s.handleGetJob(w, r, path)
|
|
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/retry"):
|
|
s.handleRetryJob(w, r, path)
|
|
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/complete"):
|
|
s.handleCompleteJob(w, r, path)
|
|
case r.Method == http.MethodPost && strings.HasPrefix(path, "/api/v1/jobs/") && strings.HasSuffix(path, "/fail"):
|
|
s.handleFailJob(w, r, path)
|
|
case r.Method == http.MethodGet && path == "/api/v1/stats":
|
|
s.handleStats(w, r)
|
|
case r.Method == http.MethodGet && path == "/api/v1/providers/status":
|
|
s.handleProviderStatus(w, r)
|
|
case r.Method == http.MethodGet && path == "/api/v1/infra/status":
|
|
s.handleInfraStatus(w, r)
|
|
case r.Method == http.MethodGet && path == "/api/v1/dashboard":
|
|
s.handleDashboard(w, r)
|
|
default:
|
|
writeError(w, http.StatusNotFound, "not found")
|
|
}
|
|
}
|
|
|
|
func (s *Server) handleReady(w http.ResponseWriter, r *http.Request) {
|
|
ctx, cancel := contextWithTimeout(r, 3*time.Second)
|
|
defer cancel()
|
|
if err := s.store.Ping(ctx); err != nil {
|
|
writeError(w, http.StatusServiceUnavailable, err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "ready"})
|
|
}
|
|
|
|
func (s *Server) handleCreateJob(w http.ResponseWriter, r *http.Request) {
|
|
var req model.CreateJob
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "bad json")
|
|
return
|
|
}
|
|
ctx, cancel := contextWithTimeout(r, 8*time.Second)
|
|
defer cancel()
|
|
job, err := s.store.CreateJob(ctx, req)
|
|
if err != nil {
|
|
status := http.StatusInternalServerError
|
|
if isValidationError(err) {
|
|
status = http.StatusBadRequest
|
|
}
|
|
writeError(w, status, err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusCreated, job)
|
|
}
|
|
|
|
type listJobsResponse struct {
|
|
Jobs []*model.Job `json:"jobs"`
|
|
}
|
|
|
|
func (s *Server) handleListJobs(w http.ResponseWriter, r *http.Request) {
|
|
filter := jobFilterFromQuery(r)
|
|
ctx, cancel := contextWithTimeout(r, 8*time.Second)
|
|
defer cancel()
|
|
jobs, err := s.store.ListJobs(ctx, filter)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, listJobsResponse{Jobs: jobs})
|
|
}
|
|
|
|
type createBatchRequest struct {
|
|
OwnerService string `json:"owner_service"`
|
|
TaskType string `json:"task_type"`
|
|
ModelProfile string `json:"model_profile"`
|
|
Priority int `json:"priority"`
|
|
MaxAttempts int `json:"max_attempts"`
|
|
Jobs []model.CreateJob `json:"jobs"`
|
|
}
|
|
|
|
type createBatchResponse struct {
|
|
Jobs []*model.Job `json:"jobs"`
|
|
}
|
|
|
|
func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
|
|
var req createBatchRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "bad json")
|
|
return
|
|
}
|
|
if len(req.Jobs) == 0 {
|
|
writeError(w, http.StatusBadRequest, "jobs is required")
|
|
return
|
|
}
|
|
ctx, cancel := contextWithTimeout(r, 20*time.Second)
|
|
defer cancel()
|
|
out := createBatchResponse{Jobs: make([]*model.Job, 0, len(req.Jobs))}
|
|
for _, item := range req.Jobs {
|
|
if item.OwnerService == "" {
|
|
item.OwnerService = req.OwnerService
|
|
}
|
|
if item.TaskType == "" {
|
|
item.TaskType = req.TaskType
|
|
}
|
|
if item.ModelProfile == "" {
|
|
item.ModelProfile = req.ModelProfile
|
|
}
|
|
if item.Priority == 0 {
|
|
item.Priority = req.Priority
|
|
}
|
|
if item.MaxAttempts == 0 {
|
|
item.MaxAttempts = req.MaxAttempts
|
|
}
|
|
job, err := s.store.CreateJob(ctx, item)
|
|
if err != nil {
|
|
status := http.StatusInternalServerError
|
|
if isValidationError(err) {
|
|
status = http.StatusBadRequest
|
|
}
|
|
writeError(w, status, err.Error())
|
|
return
|
|
}
|
|
out.Jobs = append(out.Jobs, job)
|
|
}
|
|
writeJSON(w, http.StatusCreated, out)
|
|
}
|
|
|
|
func (s *Server) handleRetryJobs(w http.ResponseWriter, r *http.Request) {
|
|
var req model.JobFilter
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "bad json")
|
|
return
|
|
}
|
|
if !hasAnyFilter(req) {
|
|
writeError(w, http.StatusBadRequest, "at least one filter is required")
|
|
return
|
|
}
|
|
ctx, cancel := contextWithTimeout(r, 20*time.Second)
|
|
defer cancel()
|
|
updated, err := s.store.RetryJobs(ctx, req)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, model.BatchActionResult{Updated: updated})
|
|
}
|
|
|
|
func (s *Server) handleCancelJobs(w http.ResponseWriter, r *http.Request) {
|
|
var req model.JobFilter
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "bad json")
|
|
return
|
|
}
|
|
if !hasAnyFilter(req) {
|
|
writeError(w, http.StatusBadRequest, "at least one filter is required")
|
|
return
|
|
}
|
|
ctx, cancel := contextWithTimeout(r, 20*time.Second)
|
|
defer cancel()
|
|
updated, err := s.store.CancelJobs(ctx, req)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, model.BatchActionResult{Updated: updated})
|
|
}
|
|
|
|
type claimJobsResponse struct {
|
|
Jobs []*model.Job `json:"jobs"`
|
|
}
|
|
|
|
func (s *Server) handleClaimJobs(w http.ResponseWriter, r *http.Request) {
|
|
var req model.ClaimJobs
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "bad json")
|
|
return
|
|
}
|
|
ctx, cancel := contextWithTimeout(r, 8*time.Second)
|
|
defer cancel()
|
|
jobs, err := s.store.ClaimJobs(ctx, req)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, claimJobsResponse{Jobs: jobs})
|
|
}
|
|
|
|
func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request, path string) {
|
|
id, err := jobIDFromPath(path, false)
|
|
if err != nil {
|
|
writeError(w, http.StatusBadRequest, err.Error())
|
|
return
|
|
}
|
|
ctx, cancel := contextWithTimeout(r, 8*time.Second)
|
|
defer cancel()
|
|
job, err := s.store.GetJob(ctx, id)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
if job == nil {
|
|
writeError(w, http.StatusNotFound, "job not found")
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, job)
|
|
}
|
|
|
|
func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request, path string) {
|
|
id, err := jobIDFromPath(path, true)
|
|
if err != nil {
|
|
writeError(w, http.StatusBadRequest, err.Error())
|
|
return
|
|
}
|
|
ctx, cancel := contextWithTimeout(r, 8*time.Second)
|
|
defer cancel()
|
|
job, err := s.store.RetryJob(ctx, id)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
if job == nil {
|
|
writeError(w, http.StatusNotFound, "retryable job not found")
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, job)
|
|
}
|
|
|
|
func (s *Server) handleCompleteJob(w http.ResponseWriter, r *http.Request, path string) {
|
|
id, err := jobIDFromActionPath(path, "complete")
|
|
if err != nil {
|
|
writeError(w, http.StatusBadRequest, err.Error())
|
|
return
|
|
}
|
|
var req model.CompleteJob
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "bad json")
|
|
return
|
|
}
|
|
ctx, cancel := contextWithTimeout(r, 8*time.Second)
|
|
defer cancel()
|
|
job, err := s.store.CompleteJob(ctx, id, req)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
if job == nil {
|
|
writeError(w, http.StatusNotFound, "running job not found")
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, job)
|
|
}
|
|
|
|
func (s *Server) handleFailJob(w http.ResponseWriter, r *http.Request, path string) {
|
|
id, err := jobIDFromActionPath(path, "fail")
|
|
if err != nil {
|
|
writeError(w, http.StatusBadRequest, err.Error())
|
|
return
|
|
}
|
|
var req model.FailJob
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "bad json")
|
|
return
|
|
}
|
|
ctx, cancel := contextWithTimeout(r, 8*time.Second)
|
|
defer cancel()
|
|
job, err := s.store.FailJob(ctx, id, req)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
if job == nil {
|
|
writeError(w, http.StatusNotFound, "running job not found")
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, job)
|
|
}
|
|
|
|
func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
|
|
ctx, cancel := contextWithTimeout(r, 8*time.Second)
|
|
defer cancel()
|
|
stats, err := s.store.Stats(ctx)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, stats)
|
|
}
|
|
|
|
func jobIDFromPath(path string, retry bool) (uuid.UUID, error) {
|
|
raw := strings.TrimPrefix(path, "/api/v1/jobs/")
|
|
if retry {
|
|
raw = strings.TrimSuffix(raw, "/retry")
|
|
}
|
|
id, err := uuid.Parse(strings.Trim(raw, "/"))
|
|
if err != nil {
|
|
return uuid.Nil, errors.New("bad job id")
|
|
}
|
|
return id, nil
|
|
}
|
|
|
|
func jobIDFromActionPath(path string, action string) (uuid.UUID, error) {
|
|
raw := strings.TrimPrefix(path, "/api/v1/jobs/")
|
|
raw = strings.TrimSuffix(raw, "/"+action)
|
|
id, err := uuid.Parse(strings.Trim(raw, "/"))
|
|
if err != nil {
|
|
return uuid.Nil, errors.New("bad job id")
|
|
}
|
|
return id, nil
|
|
}
|
|
|
|
// isValidationError reports whether err should be treated as a client
// validation failure, meaning HTTP 400 rather than 500.
//
// Detection is purely textual: the message must contain " is required", with
// the leading space. NOTE(review): substring matching is fragile. A sentinel
// or typed error from the store, checked with errors.Is/errors.As, would be
// more robust.
func isValidationError(err error) bool {
	const marker = " is required"
	return strings.Index(err.Error(), marker) >= 0
}
|
|
|
|
func jobFilterFromQuery(r *http.Request) model.JobFilter {
|
|
q := r.URL.Query()
|
|
return model.JobFilter{
|
|
OwnerService: q.Get("owner_service"),
|
|
OwnerRef: q.Get("owner_ref"),
|
|
TaskType: q.Get("task_type"),
|
|
ModelProfile: q.Get("model_profile"),
|
|
Statuses: splitCSV(q.Get("status")),
|
|
ErrorCodes: splitCSV(q.Get("error_code")),
|
|
Limit: parseIntDefault(q.Get("limit"), 100),
|
|
Offset: parseIntDefault(q.Get("offset"), 0),
|
|
}
|
|
}
|
|
|
|
// splitCSV splits a comma-separated value into its whitespace-trimmed,
// non-empty parts.
//
// It returns nil when raw is blank. Otherwise it returns a non-nil slice,
// which may be empty, e.g. for ",,".
func splitCSV(raw string) []string {
	if len(strings.TrimSpace(raw)) == 0 {
		return nil
	}
	fields := strings.Split(raw, ",")
	values := make([]string, 0, len(fields))
	for i := range fields {
		trimmed := strings.TrimSpace(fields[i])
		if trimmed == "" {
			continue
		}
		values = append(values, trimmed)
	}
	return values
}
|
|
|
|
// parseIntDefault parses raw as a non-negative base-10 integer and returns
// fallback when raw is blank, malformed, or out of range for int.
//
// Only surrounding whitespace is tolerated. A sign, embedded spaces or
// trailing characters make the input malformed. Previously fmt.Sscanf("%d")
// silently accepted prefixes, so "10abc" became 10, and it accepted negative
// numbers. The callers here use this for limit/offset, where a negative
// value is never meaningful.
func parseIntDefault(raw string, fallback int) int {
	s := strings.TrimSpace(raw)
	// Require digits only: this rejects signs, "10abc", "1 2" and so on
	// before Sscanf gets a chance to accept a prefix.
	if s == "" || strings.TrimLeft(s, "0123456789") != "" {
		return fallback
	}
	var out int
	if _, err := fmt.Sscanf(s, "%d", &out); err != nil {
		// Only reachable on integer overflow, since s is all digits.
		return fallback
	}
	return out
}
|
|
|
|
func hasAnyFilter(filter model.JobFilter) bool {
|
|
return strings.TrimSpace(filter.OwnerService) != "" ||
|
|
strings.TrimSpace(filter.OwnerRef) != "" ||
|
|
strings.TrimSpace(filter.TaskType) != "" ||
|
|
strings.TrimSpace(filter.ModelProfile) != "" ||
|
|
len(filter.Statuses) > 0 ||
|
|
len(filter.ErrorCodes) > 0
|
|
}
|