From fd1ee0611bfd74868e20438f35a77c7bda99820f Mon Sep 17 00:00:00 2001 From: Grendgi Date: Fri, 12 Jun 2026 09:01:41 +0300 Subject: [PATCH] Batch enqueue TG AI classifications --- cmd/classifier/main.go | 140 ++++++++++++++++++++++++----------- internal/aiservice/client.go | 53 +++++++++++-- 2 files changed, 145 insertions(+), 48 deletions(-) diff --git a/cmd/classifier/main.go b/cmd/classifier/main.go index 0848292..d68b556 100644 --- a/cmd/classifier/main.go +++ b/cmd/classifier/main.go @@ -92,11 +92,11 @@ func main() { defer ticker.Stop() for { - updated, err := worker.runOnce(ctx) + updated, enqueued, err := worker.runOnce(ctx) if err != nil { slog.Error("classify_batch_failed", "error", err) - } else if updated > 0 { - slog.Info("classify_batch_done", "updated", updated) + } else if updated > 0 || enqueued > 0 { + slog.Info("classify_batch_done", "updated", updated, "enqueued", enqueued) } select { @@ -114,42 +114,92 @@ type classifier struct { ai *aiservice.Client } -func (c *classifier) runOnce(ctx context.Context) (int, error) { +func (c *classifier) runOnce(ctx context.Context) (int, int, error) { rows, err := c.loadPending(ctx) if err != nil { - return 0, err + return 0, 0, err } if len(rows) == 0 { - return 0, nil + return 0, 0, nil } + byRef := make(map[string]pendingMessage, len(rows)) + jobs := make([]aiservice.CreateJobRequest, 0, len(rows)) updated := 0 for _, msg := range rows { key := verdictKey(msg.Vertical) if _, ok := msg.Extracted[key]; ok { continue } - - verdict, err := c.classify(ctx, msg) - if err != nil { - slog.Warn("llm_classify_failed", "message_id", msg.ID, "vertical", msg.Vertical, "error", err) - continue - } - if len(verdict) == 0 { - verdict, err = marshalRaw(negativeVerdict(msg.Vertical)) + if len(strings.TrimSpace(msg.Text)) < c.cfg.LLMMinTextLength { + verdict, err := marshalRaw(negativeVerdict(msg.Vertical)) if err != nil { - slog.Warn("negative_verdict_failed", "message_id", msg.ID, "error", err) + slog.Warn("negative_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "error", err) continue } - } - - if err := c.saveVerdict(ctx, msg, key, verdict); err != nil { - slog.Warn("save_verdict_failed", "message_id", msg.ID, "error", err) + if err := c.saveVerdict(ctx, msg, key, verdict); err != nil { + slog.Warn("save_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "error", err) + continue + } + updated++ continue } - updated++ + + req, err := c.buildJobRequest(ctx, msg) + if err != nil { + slog.Warn("build_classify_job_failed", "message_id", msg.ID, "section_id", msg.SectionID, "vertical", msg.Vertical, "error", err) + continue + } + byRef[req.OwnerRef] = msg + jobs = append(jobs, req) } - return updated, nil + if len(jobs) == 0 { + return updated, 0, nil + } + + created, err := c.ai.CreateJobs(ctx, aiservice.CreateJobsRequest{ + OwnerService: "monitoring-tg", + TaskType: "telegram_classification", + ModelProfile: c.cfg.LLMModel, + Priority: 5, + MaxAttempts: 2, + Jobs: jobs, + }) + if err != nil { + return updated, 0, err + } + for _, job := range created { + msg, ok := byRef[job.OwnerRef] + if !ok { + continue + } + switch job.Status { + case "done": + verdict, err := c.verdictFromJob(job, msg.Vertical) + if err != nil { + slog.Warn("parse_classify_job_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err) + continue + } + if err := c.saveVerdict(ctx, msg, verdictKey(msg.Vertical), verdict); err != nil { + slog.Warn("save_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err) + continue + } + updated++ + case "failed", "cancelled": + verdict, err := marshalRaw(negativeVerdict(msg.Vertical)) + if err != nil { + slog.Warn("negative_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err) + continue + } + if err := c.saveVerdict(ctx, msg, verdictKey(msg.Vertical), verdict); err != nil { + slog.Warn("save_failed_job_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err) + continue + } + updated++ + slog.Warn("classify_job_failed_marked_negative", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "status", job.Status, "error", derefString(job.ErrorMessage)) + } + } + return updated, len(created), nil } func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error) { @@ -192,14 +242,10 @@ func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error) return out, rows.Err() } -func (c *classifier) classify(ctx context.Context, msg pendingMessage) (json.RawMessage, error) { - if len(strings.TrimSpace(msg.Text)) < c.cfg.LLMMinTextLength { - return marshalRaw(negativeVerdict(msg.Vertical)) - } - +func (c *classifier) buildJobRequest(ctx context.Context, msg pendingMessage) (aiservice.CreateJobRequest, error) { systemPrompt, err := c.resolvePrompt(ctx, msg.Vertical, msg.DepartmentID, msg.SectionSlug) if err != nil { - return nil, err + return aiservice.CreateJobRequest{}, err } systemPrompt = promptWithVerticalGuard(msg.Vertical, systemPrompt) @@ -215,27 +261,24 @@ func (c *classifier) classify(ctx context.Context, msg pendingMessage) (json.Raw } body, err := json.Marshal(payload) if err != nil { - return nil, err + return aiservice.CreateJobRequest{}, err } - job, err := c.ai.CreateJob(ctx, aiservice.CreateJobRequest{ + ownerRef := classifyOwnerRef(msg) + return aiservice.CreateJobRequest{ OwnerService: "monitoring-tg", - OwnerRef: fmt.Sprintf("%d", msg.ID), + OwnerRef: ownerRef, TaskType: "telegram_classification", ModelProfile: c.cfg.LLMModel, Priority: 5, MaxAttempts: 2, Input: body, - }) - if err != nil { - return nil, err - } - waitCtx, cancel := context.WithTimeout(ctx, c.cfg.LLMTimeout) - defer cancel() - job, err = c.ai.WaitJob(waitCtx, job.ID, 2*time.Second) - if err != nil { - return nil, err - } + // Classification is section-specific because prompts are section-specific. + IdempotencyKey: "monitoring-tg:telegram_classification:" + ownerRef, + }, nil +} + +func (c *classifier) verdictFromJob(job *aiservice.Job, vertical string) (json.RawMessage, error) { if job.Status != "done" { msg := "ai-service job " + job.Status if job.ErrorMessage != nil && *job.ErrorMessage != "" { @@ -256,7 +299,7 @@ func (c *classifier) classify(ctx context.Context, msg pendingMessage) (json.Raw if err != nil { return nil, err } - normalized, err := normalizeVerdict(msg.Vertical, block) + normalized, err := normalizeVerdict(vertical, block) if err != nil { return nil, err } @@ -323,6 +366,10 @@ func verdictKey(vertical string) string { return "lead" } +func classifyOwnerRef(msg pendingMessage) string { + return fmt.Sprintf("%d:%d", msg.ID, msg.SectionID) +} + func buildUserPrompt(text string) string { return "Текст сообщения:\n```\n" + text + "\n```\nВерни JSON." } @@ -408,6 +455,13 @@ func asFloat(v any) (float64, bool) { } } +func derefString(v *string) string { + if v == nil { + return "" + } + return *v +} + func defaultPrompt(vertical string) string { if vertical == verticalHR { return defaultHRPrompt @@ -437,8 +491,8 @@ func loadConfig() config { LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second, LLMMaxTokens: envInt("LLM_MAX_TOKENS", 600), LLMMinTextLength: envInt("LLM_MIN_TEXT_LENGTH", 20), - ClassifyInterval: time.Duration(envInt("LLM_CLASSIFY_INTERVAL_SECONDS", 20)) * time.Second, - ClassifyBatchSize: envInt("LLM_CLASSIFY_BATCH_SIZE", 5), + ClassifyInterval: time.Duration(envInt("LLM_CLASSIFY_INTERVAL_SECONDS", 5)) * time.Second, + ClassifyBatchSize: envInt("LLM_CLASSIFY_BATCH_SIZE", 200), AIServiceURL: env("AI_SERVICE_URL", ""), AIServiceToken: env("AI_SERVICE_TOKEN", ""), } diff --git a/internal/aiservice/client.go b/internal/aiservice/client.go index 5be7a3b..c9f6291 100644 --- a/internal/aiservice/client.go +++ b/internal/aiservice/client.go @@ -40,12 +40,26 @@ type CreateJobRequest struct { IdempotencyKey string `json:"idempotency_key,omitempty"` } +type CreateJobsRequest struct { + OwnerService string `json:"owner_service,omitempty"` + TaskType string `json:"task_type,omitempty"` + ModelProfile string `json:"model_profile,omitempty"` + Priority int `json:"priority,omitempty"` + MaxAttempts int `json:"max_attempts,omitempty"` + Jobs []CreateJobRequest `json:"jobs"` +} + type Job struct { - ID string `json:"id"` - Status string `json:"status"` - Result json.RawMessage `json:"result,omitempty"` - ErrorCode *string `json:"error_code,omitempty"` - ErrorMessage *string `json:"error_message,omitempty"` + ID string `json:"id"` + OwnerService string `json:"owner_service,omitempty"` + OwnerRef string `json:"owner_ref,omitempty"` + TaskType string `json:"task_type,omitempty"` + ModelProfile string `json:"model_profile,omitempty"` + Status string `json:"status"` + Result json.RawMessage `json:"result,omitempty"` + ErrorCode *string `json:"error_code,omitempty"` + ErrorMessage *string `json:"error_message,omitempty"` + IdempotencyKey *string `json:"idempotency_key,omitempty"` } type ChatResult struct { @@ -111,6 +125,35 @@ func (c *Client) CreateJob(ctx context.Context, req CreateJobRequest) (*Job, err return &out, nil } +func (c *Client) CreateJobs(ctx context.Context, req CreateJobsRequest) ([]*Job, error) { + if c == nil { + return nil, fmt.Errorf("ai-service not configured") + } + body, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("marshal ai jobs: %w", err) + } + httpReq, err := c.request(ctx, http.MethodPost, "/api/v1/jobs/batch", body) + if err != nil { + return nil, err + } + resp, err := c.http.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("create ai jobs: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, fmt.Errorf("create ai jobs: http %d: %s", resp.StatusCode, readSmall(resp.Body)) + } + var out struct { + Jobs []*Job `json:"jobs"` + } + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("decode ai jobs: %w", err) + } + return out.Jobs, nil +} + func (c *Client) GetJob(ctx context.Context, id string) (*Job, error) { if c == nil || strings.TrimSpace(id) == "" { return nil, fmt.Errorf("ai job id is required")