Files
ai-service/internal/worker/worker.go
Grendgi e9792274a4
All checks were successful
CI / test (push) Successful in 14s
Build and Deploy / build-and-deploy (push) Successful in 23s
Process call analysis jobs in AI worker
2026-06-08 14:33:01 +03:00

134 lines
3.4 KiB
Go

package worker
import (
"context"
"encoding/json"
"log/slog"
"strings"
"time"
"ai-service/internal/llm"
"ai-service/internal/model"
"ai-service/internal/store"
)
// Task types claimed from the job queue by Worker.
const (
	// TaskCallAnalysis identifies call analysis jobs.
	TaskCallAnalysis = "call_analysis"
	// TaskChatCompletion identifies chat completion jobs.
	TaskChatCompletion = "chat_completion"
	// TaskLLMChat identifies LLM chat jobs.
	TaskLLMChat = "llm_chat"
)
// Worker polls the job store, claims LLM jobs for its task types and model
// profile, runs them through the LLM client, and records each outcome back in
// the store.
type Worker struct {
	store *store.Store // job persistence: claim, complete, fail, requeue
	llm   *llm.Client  // client used to execute chat requests

	workerID, modelProfile string // claim identity and model profile filter

	pollInterval time.Duration // delay between polling ticks
	claimLimit   int           // maximum number of jobs claimed per tick
	leaseTimeout time.Duration // staleness threshold passed to RequeueStaleRunning
}
// New builds a Worker.
//
// Non-positive pollInterval, leaseTimeout, and claimLimit fall back to 2s,
// 15m, and 4 respectively. A workerID that is empty or only whitespace falls
// back to "ai-service-worker"; otherwise it is stored unchanged. modelProfile
// is stored as given.
func New(store *store.Store, llmClient *llm.Client, workerID, modelProfile string, pollInterval, leaseTimeout time.Duration, claimLimit int) *Worker {
	w := &Worker{
		store:        store,
		llm:          llmClient,
		workerID:     workerID,
		modelProfile: modelProfile,
		pollInterval: 2 * time.Second,
		claimLimit:   4,
		leaseTimeout: 15 * time.Minute,
	}
	// Caller-supplied values replace the defaults only when they are usable.
	if pollInterval > 0 {
		w.pollInterval = pollInterval
	}
	if leaseTimeout > 0 {
		w.leaseTimeout = leaseTimeout
	}
	if claimLimit > 0 {
		w.claimLimit = claimLimit
	}
	if strings.TrimSpace(workerID) == "" {
		w.workerID = "ai-service-worker"
	}
	return w
}
// Run polls for work until ctx is done. It runs one tick immediately and then
// one more tick each time the poll ticker fires. It returns as soon as ctx's
// Done channel is observed between ticks.
func (w *Worker) Run(ctx context.Context) {
	poll := time.NewTicker(w.pollInterval)
	defer poll.Stop()

	done := ctx.Done()
	for {
		w.tick(ctx)
		select {
		case <-poll.C:
			continue
		case <-done:
			return
		}
	}
}
// tick performs one polling cycle. It first requeues stale running jobs (up to
// 100, using leaseTimeout as the staleness threshold). It then claims up to
// claimLimit jobs matching this worker's task types and model profile, and
// processes them one at a time.
//
// Once ctx is cancelled, no further claimed jobs are started. Otherwise each
// remaining job would be sent to the LLM with a dead context and then marked
// failed. Unstarted jobs are left as claimed; presumably they are picked up
// again by RequeueStaleRunning once their lease expires — TODO confirm against
// the store's claim semantics.
func (w *Worker) tick(ctx context.Context) {
	if ctx.Err() != nil {
		// Shutting down: don't touch the store with a dead context.
		return
	}
	if reset, err := w.store.RequeueStaleRunning(ctx, w.leaseTimeout, 100); err != nil {
		slog.Error("requeue stale jobs failed", "error", err)
	} else if reset > 0 {
		slog.Warn("requeued stale jobs", "count", reset)
	}
	jobs, err := w.store.ClaimJobs(ctx, model.ClaimJobs{
		WorkerID:      w.workerID,
		TaskTypes:     []string{TaskLLMChat, TaskChatCompletion, TaskCallAnalysis},
		ModelProfiles: []string{w.modelProfile},
		Limit:         w.claimLimit,
	})
	if err != nil {
		slog.Error("claim jobs failed", "error", err)
		return
	}
	for i, job := range jobs {
		if ctx.Err() != nil {
			slog.Warn("worker stopping; leaving claimed jobs for requeue", "pending", len(jobs)-i)
			return
		}
		w.process(ctx, job)
	}
}
// process runs a single claimed job. It decodes job.Input as an llm.ChatInput,
// sends it to the LLM client, and stores the JSON-encoded result via
// CompleteJob. A decode failure, LLM error, or encode failure marks the job
// failed with an error code. A CompleteJob error is only logged.
//
// If the LLM call fails while ctx is cancelled (worker shutdown), the job is
// not marked failed. The error describes the worker stopping, not a problem
// with the job. The job is left for RequeueStaleRunning to hand out again.
func (w *Worker) process(ctx context.Context, job *model.Job) {
	var input llm.ChatInput
	if err := json.Unmarshal(job.Input, &input); err != nil {
		w.fail(ctx, job, "bad_input", err.Error())
		return
	}
	result, err := w.llm.Chat(ctx, input)
	if err != nil {
		if ctx.Err() != nil {
			slog.Warn("llm call interrupted by shutdown; leaving job for requeue", "job_id", job.ID, "error", err)
			return
		}
		w.fail(ctx, job, classifyLLMError(err), err.Error())
		return
	}
	body, err := json.Marshal(result)
	if err != nil {
		w.fail(ctx, job, "bad_response", err.Error())
		return
	}
	if _, err := w.store.CompleteJob(ctx, job.ID, model.CompleteJob{Result: body}); err != nil {
		slog.Error("complete job failed", "job_id", job.ID, "error", err)
	}
}
// fail records job as failed with the given error code and message. A store
// error is logged rather than returned, because callers have no further
// recovery step.
func (w *Worker) fail(ctx context.Context, job *model.Job, code, message string) {
	req := model.FailJob{ErrorCode: code, ErrorMessage: message}
	_, err := w.store.FailJob(ctx, job.ID, req)
	if err == nil {
		return
	}
	slog.Error("fail job failed", "job_id", job.ID, "error", err)
}
// classifyLLMError maps an error from the LLM client to a coarse job error
// code: "timeout", "model_unavailable", "bad_input", "bad_response", or
// "unknown". A nil error yields "unknown".
//
// Classification matches case-insensitive substrings of the error text, and
// the first matching case wins. This assumes HTTP failures are reported as
// "llm http <status>..." — NOTE(review): confirm against the llm package.
//
// HTTP 429 (rate limited) and 408 (request timeout) are transient server-side
// conditions, not defects in the request. They are checked before the generic
// "llm http 4" match so they are not misreported as bad_input.
func classifyLLMError(err error) string {
	if err == nil {
		return "unknown"
	}
	s := strings.ToLower(err.Error())
	has := func(subs ...string) bool {
		for _, sub := range subs {
			if strings.Contains(s, sub) {
				return true
			}
		}
		return false
	}
	switch {
	case has("context deadline exceeded", "timeout", "llm http 408"):
		return "timeout"
	case has("connection refused", "connection reset", "no route to host", "llm http 5", "llm http 429"):
		return "model_unavailable"
	case has("llm http 4", "messages are required"):
		return "bad_input"
	case has("llm decode", "empty choices"):
		return "bad_response"
	default:
		return "unknown"
	}
}