package main import ( "context" "encoding/json" "errors" "fmt" "log/slog" "net/url" "os" "os/signal" "strconv" "strings" "syscall" "time" "monitoring-tg/internal/aiservice" "github.com/jackc/pgx/v5/pgxpool" ) const ( verticalHR = "hr" defaultREPromptKey = "real_estate" ) type config struct { PostgresUser string PostgresPassword string PostgresDB string PostgresHost string PostgresPort int LLMEnabled bool LLMModel string LLMTimeout time.Duration LLMMaxTokens int LLMMinTextLength int ClassifyInterval time.Duration ClassifyBatchSize int AIServiceURL string AIServiceToken string } type pendingMessage struct { ID int64 SectionID int64 Text string Vertical string SectionSlug string DepartmentID string Extracted map[string]any } type responseFmt struct { Type string `json:"type"` } func main() { cfg := loadConfig() logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) slog.SetDefault(logger) if !cfg.LLMEnabled { slog.Info("classifier_disabled") return } ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() pool, err := pgxpool.New(ctx, cfg.databaseURL()) if err != nil { slog.Error("db_connect_failed", "error", err) os.Exit(1) } defer pool.Close() worker := &classifier{ cfg: cfg, db: pool, ai: aiservice.New(cfg.AIServiceURL, cfg.AIServiceToken, cfg.LLMTimeout), } slog.Info( "classifier_started", "interval", cfg.ClassifyInterval.String(), "batch", cfg.ClassifyBatchSize, "model", cfg.LLMModel, ) ticker := time.NewTicker(cfg.ClassifyInterval) defer ticker.Stop() for { updated, enqueued, err := worker.runOnce(ctx) if err != nil { slog.Error("classify_batch_failed", "error", err) } else if updated > 0 || enqueued > 0 { slog.Info("classify_batch_done", "updated", updated, "enqueued", enqueued) } select { case <-ctx.Done(): slog.Info("classifier_stopped") return case <-ticker.C: } } } type classifier struct { cfg config db *pgxpool.Pool ai *aiservice.Client } func (c *classifier) runOnce(ctx context.Context) (int, int, error) { rows, err := c.loadPending(ctx) if err != nil { return 0, 0, err } if len(rows) == 0 { return 0, 0, nil } byRef := make(map[string]pendingMessage, len(rows)) jobs := make([]aiservice.CreateJobRequest, 0, len(rows)) updated := 0 for _, msg := range rows { key := verdictKey(msg.Vertical) if _, ok := msg.Extracted[key]; ok { continue } if len(strings.TrimSpace(msg.Text)) < c.cfg.LLMMinTextLength { verdict, err := marshalRaw(negativeVerdict(msg.Vertical)) if err != nil { slog.Warn("negative_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "error", err) continue } if err := c.saveVerdict(ctx, msg, key, verdict); err != nil { slog.Warn("save_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "error", err) continue } updated++ continue } req, err := c.buildJobRequest(ctx, msg) if err != nil { slog.Warn("build_classify_job_failed", "message_id", msg.ID, "section_id", msg.SectionID, "vertical", msg.Vertical, "error", err) continue } byRef[req.OwnerRef] = msg jobs = append(jobs, req) } if len(jobs) == 0 { return updated, 0, nil } created, err := c.ai.CreateJobs(ctx, aiservice.CreateJobsRequest{ OwnerService: "monitoring-tg", TaskType: "telegram_classification", ModelProfile: c.cfg.LLMModel, Priority: 5, MaxAttempts: 2, Jobs: jobs, }) if err != nil { return updated, 0, err } for _, job := range created { msg, ok := byRef[job.OwnerRef] if !ok { continue } switch job.Status { case "done": verdict, err := c.verdictFromJob(job, msg.Vertical) if err != nil { slog.Warn("parse_classify_job_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err) continue } if err := c.saveVerdict(ctx, msg, verdictKey(msg.Vertical), verdict); err != nil { slog.Warn("save_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err) continue } updated++ case "failed", "cancelled": verdict, err := marshalRaw(negativeVerdict(msg.Vertical)) if err != nil { slog.Warn("negative_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err) continue } if err := c.saveVerdict(ctx, msg, verdictKey(msg.Vertical), verdict); err != nil { slog.Warn("save_failed_job_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err) continue } updated++ slog.Warn("classify_job_failed_marked_negative", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "status", job.Status, "error", derefString(job.ErrorMessage)) } } return updated, len(created), nil } func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error) { rows, err := c.db.Query(ctx, ` SELECT m.id, s.id, m.text, c.vertical, s.slug, COALESCE(s.department_id, ''), COALESCE(mc.verdict, '{}'::jsonb)::text FROM messages m JOIN channels src ON src.id = m.channel_id JOIN channels c ON c.id = src.id OR c.source_channel_id = src.id JOIN sections s ON s.id = c.section_id LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id WHERE m.text IS NOT NULL AND mc.id IS NULL ORDER BY m.id DESC LIMIT $1 `, c.cfg.ClassifyBatchSize) if err != nil { return nil, err } defer rows.Close() out := make([]pendingMessage, 0, c.cfg.ClassifyBatchSize) for rows.Next() { var msg pendingMessage var extractedText string if err := rows.Scan(&msg.ID, &msg.SectionID, &msg.Text, &msg.Vertical, &msg.SectionSlug, &msg.DepartmentID, &extractedText); err != nil { return nil, err } if err := json.Unmarshal([]byte(extractedText), &msg.Extracted); err != nil { msg.Extracted = map[string]any{} } out = append(out, msg) } return out, rows.Err() } func (c *classifier) buildJobRequest(ctx context.Context, msg pendingMessage) (aiservice.CreateJobRequest, error) { systemPrompt, err := c.resolvePrompt(ctx, msg.Vertical, msg.DepartmentID, msg.SectionSlug) if err != nil { return aiservice.CreateJobRequest{}, err } systemPrompt = promptWithVerticalGuard(msg.Vertical, systemPrompt) responseFormat, _ := json.Marshal(responseFmt{Type: "json_object"}) payload := aiservice.ChatInput{ Messages: []aiservice.Message{ {Role: "system", Content: systemPrompt}, {Role: "user", Content: buildUserPrompt(msg.Text)}, }, Temperature: 0.1, MaxTokens: c.cfg.LLMMaxTokens, ResponseFormat: responseFormat, } body, err := json.Marshal(payload) if err != nil { return aiservice.CreateJobRequest{}, err } ownerRef := classifyOwnerRef(msg) return aiservice.CreateJobRequest{ OwnerService: "monitoring-tg", OwnerRef: ownerRef, TaskType: "telegram_classification", ModelProfile: c.cfg.LLMModel, Priority: 5, MaxAttempts: 2, Input: body, // Classification is section-specific because prompts are section-specific. IdempotencyKey: "monitoring-tg:telegram_classification:" + ownerRef, }, nil } func (c *classifier) verdictFromJob(job *aiservice.Job, vertical string) (json.RawMessage, error) { if job.Status != "done" { msg := "ai-service job " + job.Status if job.ErrorMessage != nil && *job.ErrorMessage != "" { msg += ": " + *job.ErrorMessage } return nil, errors.New(msg) } var parsed aiservice.ChatResult if err := json.Unmarshal(job.Result, &parsed); err != nil { return nil, err } raw := strings.TrimSpace(parsed.Content) if raw == "" { return nil, errors.New("llm returned empty content") } block, err := extractJSONBlock(raw) if err != nil { return nil, err } normalized, err := normalizeVerdict(vertical, block) if err != nil { return nil, err } return normalized, nil } func (c *classifier) resolvePrompt(ctx context.Context, vertical, departmentID, sectionSlug string) (string, error) { dept := departmentID if dept == "" { dept = "global" } keys := []string{} if sectionSlug != "" { keys = append(keys, promptKey(dept, vertical, sectionSlug)) } keys = append(keys, promptKey(dept, vertical, "")) for _, key := range keys { var text string err := c.db.QueryRow(ctx, `SELECT value #>> '{}' FROM app_settings WHERE key = $1`, key).Scan(&text) if err == nil && strings.TrimSpace(text) != "" { return text, nil } } return defaultPrompt(vertical), nil } func (c *classifier) saveVerdict(ctx context.Context, msg pendingMessage, key string, verdict json.RawMessage) error { _, err := c.db.Exec(ctx, ` INSERT INTO message_classifications (message_id, section_id, vertical, verdict, updated_at) VALUES ($1, $2, $3, $4::jsonb, now()) ON CONFLICT ON CONSTRAINT uq_message_classification_section DO UPDATE SET vertical = EXCLUDED.vertical, verdict = EXCLUDED.verdict, updated_at = now() `, msg.ID, msg.SectionID, msg.Vertical, string(verdict)) if err != nil { return err } _, err = c.db.Exec(ctx, ` UPDATE messages SET extracted = jsonb_set( CASE WHEN jsonb_typeof(extracted) = 'object' THEN extracted ELSE '{}'::jsonb END, ARRAY[$2], $3::jsonb, true ) WHERE id = $1 `, msg.ID, key, string(verdict)) return err } func promptKey(departmentID, vertical, sectionSlug string) string { if sectionSlug != "" { return fmt.Sprintf("llm_system_prompt:%s:%s:%s", departmentID, vertical, sectionSlug) } return fmt.Sprintf("llm_system_prompt:%s:%s", departmentID, vertical) } func verdictKey(vertical string) string { if vertical == verticalHR { return "hr_lead" } return "lead" } func classifyOwnerRef(msg pendingMessage) string { return fmt.Sprintf("%d:%d", msg.ID, msg.SectionID) } func buildUserPrompt(text string) string { return "Текст сообщения:\n```\n" + text + "\n```\nВерни JSON." } func extractJSONBlock(raw string) (json.RawMessage, error) { var payload json.RawMessage if err := json.Unmarshal([]byte(raw), &payload); err == nil { return payload, nil } start := strings.Index(raw, "{") end := strings.LastIndex(raw, "}") if start < 0 || end < start { return nil, errors.New("no json object in llm content") } block := raw[start : end+1] if err := json.Unmarshal([]byte(block), &payload); err != nil { return nil, err } return payload, nil } func normalizeVerdict(vertical string, raw json.RawMessage) (json.RawMessage, error) { var m map[string]any if err := json.Unmarshal(raw, &m); err != nil { return nil, err } if vertical == verticalHR { if _, ok := m["is_lead"]; !ok { m["is_lead"] = false } } else if _, ok := m["is_listing"]; !ok { m["is_listing"] = false } if confidence, ok := asFloat(m["confidence"]); ok { if confidence < 0 { confidence = 0 } if confidence > 1 { confidence = 1 } m["confidence"] = confidence } return marshalRaw(m) } func negativeVerdict(vertical string) map[string]any { if vertical == verticalHR { return map[string]any{ "is_lead": false, "kind": nil, "summary": "", "confidence": 0, } } return map[string]any{ "is_listing": false, "kind": nil, "summary": "", "confidence": 0, } } func marshalRaw(v any) (json.RawMessage, error) { b, err := json.Marshal(v) return json.RawMessage(b), err } func asFloat(v any) (float64, bool) { switch x := v.(type) { case float64: return x, true case float32: return float64(x), true case int: return float64(x), true case int64: return float64(x), true case json.Number: f, err := x.Float64() return f, err == nil default: return 0, false } } func derefString(v *string) string { if v == nil { return "" } return *v } func defaultPrompt(vertical string) string { if vertical == verticalHR { return defaultHRPrompt } return defaultREPrompt } func promptWithVerticalGuard(vertical, prompt string) string { if vertical == verticalHR { return prompt } if strings.Contains(prompt, realEstateOnlyGuard) { return prompt } return strings.TrimSpace(prompt) + "\n\n" + realEstateOnlyGuard } func loadConfig() config { return config{ PostgresUser: env("POSTGRES_USER", "parser"), PostgresPassword: env("POSTGRES_PASSWORD", "parser"), PostgresDB: env("POSTGRES_DB", "parser"), PostgresHost: env("POSTGRES_HOST", "db"), PostgresPort: envInt("POSTGRES_PORT", 5432), LLMEnabled: envBool("LLM_ENABLED", true), LLMModel: env("LLM_MODEL", "qwen2.5-14b"), LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second, LLMMaxTokens: envInt("LLM_MAX_TOKENS", 600), LLMMinTextLength: envInt("LLM_MIN_TEXT_LENGTH", 20), ClassifyInterval: time.Duration(envInt("LLM_CLASSIFY_INTERVAL_SECONDS", 5)) * time.Second, ClassifyBatchSize: envInt("LLM_CLASSIFY_BATCH_SIZE", 200), AIServiceURL: env("AI_SERVICE_URL", ""), AIServiceToken: env("AI_SERVICE_TOKEN", ""), } } func (c config) databaseURL() string { return fmt.Sprintf( "postgres://%s:%s@%s:%d/%s", url.QueryEscape(c.PostgresUser), url.QueryEscape(c.PostgresPassword), c.PostgresHost, c.PostgresPort, url.QueryEscape(c.PostgresDB), ) } func env(key, fallback string) string { if v := strings.TrimSpace(os.Getenv(key)); v != "" { return v } return fallback } func envInt(key string, fallback int) int { if raw := strings.TrimSpace(os.Getenv(key)); raw != "" { if n, err := strconv.Atoi(raw); err == nil { return n } } return fallback } func envBool(key string, fallback bool) bool { if raw := strings.TrimSpace(os.Getenv(key)); raw != "" { if b, err := strconv.ParseBool(raw); err == nil { return b } if raw == "1" { return true } if raw == "0" { return false } } return fallback } const defaultREPrompt = `Ты — аналитик объявлений о недвижимости. Тебе дают текст из Telegram-канала. Определи, является ли сообщение реальным объявлением о покупке, продаже или аренде недвижимости. Учитывай только сделки с недвижимостью: квартиры, дома, апартаменты, участки, коммерческие помещения и другие объекты недвижимости. Любые продажи или заявки по стройматериалам, мебели, бытовой/строительной технике, автомобилям, услугам, оборудованию и прочим товарам не являются лидами недвижимости — для них is_listing=false. Отвечай строго валидным JSON без markdown: { "is_listing": boolean, "kind": "sale" | "rent" | "purchase" | null, "property_type": string | null, "rooms": string | null, "area_m2": number | null, "price_text": string | null, "price_value": number | null, "currency": "RUB" | "USD" | "EUR" | "AED" | "GBP" | "CNY" | "TRY" | "KZT" | "BYN" | "UAH" | null, "location": string | null, "contact_phone": string | null, "contact_name": string | null, "summary": string, "confidence": number } summary всегда по-русски, confidence в диапазоне 0..1.` const realEstateOnlyGuard = `Жёсткое правило для недвижимости: учитывай только сделки с недвижимостью. Если сообщение продаёт или покупает стройматериалы, мебель, бытовую/строительную технику, автомобили, услуги, оборудование или любые другие товары/работы не по недвижимости — это НЕ лид недвижимости, верни is_listing=false.` const defaultHRPrompt = `Ты — аналитик HR-объявлений. Тебе дают текст из Telegram-канала. Определи, относится ли сообщение к рынку труда: вакансия, резюме или короткий HR-контакт. Отвечай строго валидным JSON без markdown: { "is_lead": boolean, "kind": "vacancy" | "resume" | "contact" | null, "title": string | null, "company": string | null, "candidate_name": string | null, "experience_years": number | null, "skills": string[], "location": string | null, "remote": boolean | null, "employment_type": "full-time" | "part-time" | "contract" | "internship" | null, "salary_text": string | null, "salary_value": number | null, "currency": "RUB" | "USD" | "EUR" | "AED" | "GBP" | "CNY" | "TRY" | "KZT" | "BYN" | "UAH" | null, "contact_phone": string | null, "contact_name": string | null, "summary": string, "confidence": number } summary всегда по-русски, confidence в диапазоне 0..1.`