Batch create AI jobs efficiently
This commit is contained in:
@@ -135,6 +135,8 @@ type createBatchResponse struct {
|
|||||||
Jobs []*model.Job `json:"jobs"`
|
Jobs []*model.Job `json:"jobs"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const maxCreateBatchJobs = 1000
|
||||||
|
|
||||||
func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
|
||||||
var req createBatchRequest
|
var req createBatchRequest
|
||||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
@@ -145,17 +147,21 @@ func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeError(w, http.StatusBadRequest, "jobs is required")
|
writeError(w, http.StatusBadRequest, "jobs is required")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if len(req.Jobs) > maxCreateBatchJobs {
|
||||||
|
writeError(w, http.StatusBadRequest, fmt.Sprintf("jobs limit is %d", maxCreateBatchJobs))
|
||||||
|
return
|
||||||
|
}
|
||||||
ctx, cancel := contextWithTimeout(r, 20*time.Second)
|
ctx, cancel := contextWithTimeout(r, 20*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
out := createBatchResponse{Jobs: make([]*model.Job, 0, len(req.Jobs))}
|
items := make([]model.CreateJob, 0, len(req.Jobs))
|
||||||
for _, item := range req.Jobs {
|
for _, item := range req.Jobs {
|
||||||
if item.OwnerService == "" {
|
if strings.TrimSpace(item.OwnerService) == "" {
|
||||||
item.OwnerService = req.OwnerService
|
item.OwnerService = req.OwnerService
|
||||||
}
|
}
|
||||||
if item.TaskType == "" {
|
if strings.TrimSpace(item.TaskType) == "" {
|
||||||
item.TaskType = req.TaskType
|
item.TaskType = req.TaskType
|
||||||
}
|
}
|
||||||
if item.ModelProfile == "" {
|
if strings.TrimSpace(item.ModelProfile) == "" {
|
||||||
item.ModelProfile = req.ModelProfile
|
item.ModelProfile = req.ModelProfile
|
||||||
}
|
}
|
||||||
if item.Priority == 0 {
|
if item.Priority == 0 {
|
||||||
@@ -164,7 +170,9 @@ func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
|
|||||||
if item.MaxAttempts == 0 {
|
if item.MaxAttempts == 0 {
|
||||||
item.MaxAttempts = req.MaxAttempts
|
item.MaxAttempts = req.MaxAttempts
|
||||||
}
|
}
|
||||||
job, err := s.store.CreateJob(ctx, item)
|
items = append(items, item)
|
||||||
|
}
|
||||||
|
jobs, err := s.store.CreateJobs(ctx, items)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
status := http.StatusInternalServerError
|
status := http.StatusInternalServerError
|
||||||
if isValidationError(err) {
|
if isValidationError(err) {
|
||||||
@@ -173,9 +181,7 @@ func (s *Server) handleCreateBatch(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeError(w, status, err.Error())
|
writeError(w, status, err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
out.Jobs = append(out.Jobs, job)
|
writeJSON(w, http.StatusCreated, createBatchResponse{Jobs: jobs})
|
||||||
}
|
|
||||||
writeJSON(w, http.StatusCreated, out)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handleRetryJobs(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleRetryJobs(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|||||||
@@ -69,16 +69,7 @@ func (s *Store) CreateJob(ctx context.Context, in model.CreateJob) (*model.Job,
|
|||||||
if err := validateCreateJob(in); err != nil {
|
if err := validateCreateJob(in); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if in.MaxAttempts <= 0 {
|
normalizeCreateJob(&in)
|
||||||
in.MaxAttempts = 3
|
|
||||||
}
|
|
||||||
if len(in.Input) == 0 {
|
|
||||||
in.Input = json.RawMessage(`{}`)
|
|
||||||
}
|
|
||||||
scheduledAt := time.Now().UTC()
|
|
||||||
if in.ScheduledAt != nil {
|
|
||||||
scheduledAt = in.ScheduledAt.UTC()
|
|
||||||
}
|
|
||||||
|
|
||||||
const q = `
|
const q = `
|
||||||
INSERT INTO ai_jobs (
|
INSERT INTO ai_jobs (
|
||||||
@@ -98,12 +89,81 @@ RETURNING ` + jobSelectColumns + `
|
|||||||
in.Priority,
|
in.Priority,
|
||||||
in.MaxAttempts,
|
in.MaxAttempts,
|
||||||
in.Input,
|
in.Input,
|
||||||
scheduledAt,
|
*in.ScheduledAt,
|
||||||
in.IdempotencyKey,
|
in.IdempotencyKey,
|
||||||
)
|
)
|
||||||
return scanJob(row)
|
return scanJob(row)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Store) CreateJobs(ctx context.Context, items []model.CreateJob) ([]*model.Job, error) {
|
||||||
|
if len(items) == 0 {
|
||||||
|
return []*model.Job{}, nil
|
||||||
|
}
|
||||||
|
const q = `
|
||||||
|
INSERT INTO ai_jobs (
|
||||||
|
owner_service, owner_ref, task_type, model_profile, priority, max_attempts,
|
||||||
|
input, scheduled_at, idempotency_key
|
||||||
|
)
|
||||||
|
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
|
||||||
|
ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL
|
||||||
|
DO UPDATE SET updated_at = ai_jobs.updated_at
|
||||||
|
RETURNING ` + jobSelectColumns + `
|
||||||
|
`
|
||||||
|
var batch pgx.Batch
|
||||||
|
for i := range items {
|
||||||
|
if err := validateCreateJob(items[i]); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
normalizeCreateJob(&items[i])
|
||||||
|
batch.Queue(q,
|
||||||
|
items[i].OwnerService,
|
||||||
|
items[i].OwnerRef,
|
||||||
|
items[i].TaskType,
|
||||||
|
items[i].ModelProfile,
|
||||||
|
items[i].Priority,
|
||||||
|
items[i].MaxAttempts,
|
||||||
|
items[i].Input,
|
||||||
|
*items[i].ScheduledAt,
|
||||||
|
items[i].IdempotencyKey,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
br := s.pool.SendBatch(ctx, &batch)
|
||||||
|
batchClosed := false
|
||||||
|
defer func() {
|
||||||
|
if !batchClosed {
|
||||||
|
_ = br.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
out := make([]*model.Job, 0, len(items))
|
||||||
|
for range items {
|
||||||
|
job, err := scanJob(br.QueryRow())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
out = append(out, job)
|
||||||
|
}
|
||||||
|
err := br.Close()
|
||||||
|
batchClosed = true
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeCreateJob(in *model.CreateJob) {
|
||||||
|
if in.MaxAttempts <= 0 {
|
||||||
|
in.MaxAttempts = 3
|
||||||
|
}
|
||||||
|
if len(in.Input) == 0 {
|
||||||
|
in.Input = json.RawMessage(`{}`)
|
||||||
|
}
|
||||||
|
scheduledAt := time.Now().UTC()
|
||||||
|
if in.ScheduledAt != nil {
|
||||||
|
scheduledAt = in.ScheduledAt.UTC()
|
||||||
|
}
|
||||||
|
in.ScheduledAt = &scheduledAt
|
||||||
|
}
|
||||||
|
|
||||||
func validateCreateJob(in model.CreateJob) error {
|
func validateCreateJob(in model.CreateJob) error {
|
||||||
switch {
|
switch {
|
||||||
case strings.TrimSpace(in.OwnerService) == "":
|
case strings.TrimSpace(in.OwnerService) == "":
|
||||||
|
|||||||
Reference in New Issue
Block a user