Files

592 lines
17 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/url"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"monitoring-tg/internal/aiservice"
"monitoring-tg/internal/dbretry"
"github.com/jackc/pgx/v5/pgxpool"
)
// Identifiers shared by the classifier.
const (
	// verticalHR marks HR channels: they get the HR prompt and the "hr_lead"
	// verdict key, while every other vertical is handled as real estate.
	verticalHR = "hr"

	// defaultREPromptKey names the real-estate prompt. NOTE(review): it is not
	// referenced anywhere in the visible source — confirm it is still needed.
	defaultREPromptKey = "real_estate"
)
// config holds the classifier settings. loadConfig fills every field from the
// environment variable named in the field's comment.
type config struct {
	// Postgres connection; combined into a URL by databaseURL.
	PostgresUser     string // POSTGRES_USER
	PostgresPassword string // POSTGRES_PASSWORD
	PostgresDB       string // POSTGRES_DB
	PostgresHost     string // POSTGRES_HOST
	PostgresPort     int    // POSTGRES_PORT

	// LLM classification.
	LLMEnabled       bool          // LLM_ENABLED; when false, main exits immediately
	LLMModel         string        // LLM_MODEL; sent to ai-service as the model profile
	LLMTimeout       time.Duration // LLM_TIMEOUT_SECONDS; passed to aiservice.New
	LLMMaxTokens     int           // LLM_MAX_TOKENS; MaxTokens of every chat job
	LLMMinTextLength int           // LLM_MIN_TEXT_LENGTH; shorter trimmed texts are marked negative without an LLM call

	// Polling loop.
	ClassifyInterval  time.Duration // LLM_CLASSIFY_INTERVAL_SECONDS; delay between batches
	ClassifyBatchSize int           // LLM_CLASSIFY_BATCH_SIZE; LIMIT of the pending-messages query

	// ai-service connection.
	AIServiceURL   string // AI_SERVICE_URL
	AIServiceToken string // AI_SERVICE_TOKEN
}
// pendingMessage is one (message, section) pair awaiting classification, as
// loaded by loadPending.
type pendingMessage struct {
	ID, SectionID int64 // messages.id and sections.id of the pair

	Text         string // messages.text (rows with NULL text are not loaded)
	Vertical     string // vertical of the channel that links the message to the section
	SectionSlug  string // sections.slug, used to look up a section-specific prompt
	DepartmentID string // sections.department_id, "" when NULL

	// Extracted is the decoded message_classifications.verdict for the pair;
	// an empty object when no verdict row exists.
	Extracted map[string]any
}
// responseFmt is the response-format object buildJobRequest marshals into
// ChatInput.ResponseFormat; it is only ever sent as {"type":"json_object"}.
type responseFmt struct{ Type string `json:"type"` }
// main loads configuration, installs a JSON slog logger, connects to Postgres
// and runs the classifier loop: one batch immediately, then one batch per
// ClassifyInterval until SIGINT or SIGTERM. It returns right away when
// LLM_ENABLED is false and exits with status 1 when dbretry.Connect fails
// (presumably after retrying for up to the 2-minute budget — confirm in dbretry).
func main() {
	cfg := loadConfig()
	slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil)))
	if !cfg.LLMEnabled {
		slog.Info("classifier_disabled")
		return
	}
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()
	pool, err := dbretry.Connect(ctx, cfg.databaseURL(), 2*time.Minute)
	if err != nil {
		slog.Error("db_connect_failed", "error", err)
		stop() // os.Exit skips deferred calls, so release the signal handler explicitly.
		os.Exit(1)
	}
	defer pool.Close()
	worker := &classifier{
		cfg: cfg,
		db:  pool,
		ai:  aiservice.New(cfg.AIServiceURL, cfg.AIServiceToken, cfg.LLMTimeout),
	}
	slog.Info(
		"classifier_started",
		"interval", cfg.ClassifyInterval.String(),
		"batch", cfg.ClassifyBatchSize,
		"model", cfg.LLMModel,
	)
	ticker := time.NewTicker(cfg.ClassifyInterval)
	defer ticker.Stop()
	for {
		updated, enqueued, err := worker.runOnce(ctx)
		switch {
		case err != nil:
			// A batch interrupted by shutdown is expected, not a failure: its
			// unclassified pairs have no message_classifications row yet, so
			// loadPending returns them again after a restart.
			if ctx.Err() == nil {
				slog.Error("classify_batch_failed", "error", err)
			}
		case updated > 0 || enqueued > 0:
			slog.Info("classify_batch_done", "updated", updated, "enqueued", enqueued)
		}
		select {
		case <-ctx.Done():
			slog.Info("classifier_stopped")
			return
		case <-ticker.C:
		}
	}
}
// classifier does the per-batch work of the polling loop. It reads pending
// messages and prompts and saves verdicts through db, and submits chat jobs
// through ai.
type classifier struct {
	cfg config            // settings from loadConfig
	db  *pgxpool.Pool     // Postgres pool obtained from dbretry.Connect
	ai  *aiservice.Client // ai-service client built from cfg
}
// runOnce classifies one batch of pending (message, section) pairs.
//
// For every pair returned by loadPending:
//   - if its decoded verdict already holds the vertical's key, it is skipped;
//   - if its trimmed text has fewer than LLMMinTextLength characters, a
//     negative verdict is saved without calling the LLM;
//   - otherwise a classification job request is built.
//
// All job requests go to ai-service in a single CreateJobs call. Among the
// returned jobs, "done" results are parsed and saved, "failed"/"cancelled"
// jobs are saved as negative verdicts, and jobs in any other status are left
// alone. Those pairs stay pending and are resubmitted, with the same
// idempotency key, on a later tick.
//
// It returns the number of verdicts saved and the number of jobs ai-service
// returned. The error is non-nil only when loading the batch or submitting
// the jobs fails. Per-message failures are logged and skipped.
func (c *classifier) runOnce(ctx context.Context) (int, int, error) {
	rows, err := c.loadPending(ctx)
	if err != nil {
		return 0, 0, err
	}
	if len(rows) == 0 {
		return 0, 0, nil
	}
	byRef := make(map[string]pendingMessage, len(rows))
	jobs := make([]aiservice.CreateJobRequest, 0, len(rows))
	updated := 0
	for _, msg := range rows {
		key := verdictKey(msg.Vertical)
		if _, ok := msg.Extracted[key]; ok {
			continue
		}
		// The threshold is a character count. len() would count UTF-8 bytes and
		// let Cyrillic texts (2 bytes per letter) through at half the intended
		// length. The compiler turns len([]rune(s)) into a rune count without
		// allocating.
		if len([]rune(strings.TrimSpace(msg.Text))) < c.cfg.LLMMinTextLength {
			verdict, err := marshalRaw(negativeVerdict(msg.Vertical))
			if err != nil {
				slog.Warn("negative_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "error", err)
				continue
			}
			if err := c.saveVerdict(ctx, msg, key, verdict); err != nil {
				slog.Warn("save_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "error", err)
				continue
			}
			updated++
			continue
		}
		// loadPending can return the same (message, section) pair more than once
		// when several matching channels belong to one section. Submit each pair
		// only once so a single CreateJobs call never carries two jobs with the
		// same idempotency key.
		if _, seen := byRef[classifyOwnerRef(msg)]; seen {
			continue
		}
		req, err := c.buildJobRequest(ctx, msg)
		if err != nil {
			slog.Warn("build_classify_job_failed", "message_id", msg.ID, "section_id", msg.SectionID, "vertical", msg.Vertical, "error", err)
			continue
		}
		byRef[req.OwnerRef] = msg
		jobs = append(jobs, req)
	}
	if len(jobs) == 0 {
		return updated, 0, nil
	}
	created, err := c.ai.CreateJobs(ctx, aiservice.CreateJobsRequest{
		OwnerService: "monitoring-tg",
		TaskType:     "telegram_classification",
		ModelProfile: c.cfg.LLMModel,
		Priority:     5,
		MaxAttempts:  2,
		Jobs:         jobs,
	})
	if err != nil {
		return updated, 0, err
	}
	for _, job := range created {
		msg, ok := byRef[job.OwnerRef]
		if !ok {
			continue
		}
		switch job.Status {
		case "done":
			// NOTE(review): a "done" job whose result cannot be parsed stays
			// pending and is resubmitted every tick; confirm that is intended.
			verdict, err := c.verdictFromJob(job, msg.Vertical)
			if err != nil {
				slog.Warn("parse_classify_job_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
				continue
			}
			if err := c.saveVerdict(ctx, msg, verdictKey(msg.Vertical), verdict); err != nil {
				slog.Warn("save_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
				continue
			}
			updated++
		case "failed", "cancelled":
			verdict, err := marshalRaw(negativeVerdict(msg.Vertical))
			if err != nil {
				slog.Warn("negative_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
				continue
			}
			if err := c.saveVerdict(ctx, msg, verdictKey(msg.Vertical), verdict); err != nil {
				slog.Warn("save_failed_job_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
				continue
			}
			updated++
			slog.Warn("classify_job_failed_marked_negative", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "status", job.Status, "error", derefString(job.ErrorMessage))
		}
	}
	return updated, len(created), nil
}
// loadPending returns up to ClassifyBatchSize (message, section) pairs that
// have no message_classifications row yet, newest message first.
//
// A message is paired with a section through its own channel or through any
// channel whose source_channel_id points at it, so one message can yield
// several pairs. Messages with NULL text are skipped. Extracted holds the
// decoded message_classifications.verdict. The query only selects pairs
// without a classification row, so it is currently always an empty map,
// never nil.
func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error) {
	rows, err := c.db.Query(ctx, `
SELECT
m.id,
s.id,
m.text,
c.vertical,
s.slug,
COALESCE(s.department_id, ''),
COALESCE(mc.verdict, '{}'::jsonb)::text
FROM messages m
JOIN channels src ON src.id = m.channel_id
JOIN channels c ON c.id = src.id OR c.source_channel_id = src.id
JOIN sections s ON s.id = c.section_id
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
WHERE m.text IS NOT NULL
AND mc.id IS NULL
ORDER BY m.id DESC
LIMIT $1
`, c.cfg.ClassifyBatchSize)
	if err != nil {
		return nil, fmt.Errorf("query pending messages: %w", err)
	}
	defer rows.Close()
	// Per the pgx Query docs, server-side errors (such as a negative LIMIT) can
	// surface only through rows.Err(). The code therefore still gets here with
	// an invalid batch size, and make panics on a negative capacity.
	capacity := c.cfg.ClassifyBatchSize
	if capacity < 0 {
		capacity = 0
	}
	out := make([]pendingMessage, 0, capacity)
	for rows.Next() {
		var msg pendingMessage
		var extractedText string
		if err := rows.Scan(&msg.ID, &msg.SectionID, &msg.Text, &msg.Vertical, &msg.SectionSlug, &msg.DepartmentID, &extractedText); err != nil {
			return nil, fmt.Errorf("scan pending message: %w", err)
		}
		// Invalid JSON and a JSON null (which leaves the map nil) both become an
		// empty object.
		if err := json.Unmarshal([]byte(extractedText), &msg.Extracted); err != nil || msg.Extracted == nil {
			msg.Extracted = map[string]any{}
		}
		out = append(out, msg)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("read pending messages: %w", err)
	}
	return out, nil
}
// buildJobRequest turns one pending message into an ai-service chat job.
//
// The system prompt is resolved for the message's section (falling back to
// the vertical's built-in default). For non-HR verticals the real-estate guard
// is appended. The job is keyed by classifyOwnerRef, "<message id>:<section id>",
// so resubmitting the same pair on a later tick reuses the same idempotency key.
// Errors from prompt resolution or input encoding are returned unchanged.
func (c *classifier) buildJobRequest(ctx context.Context, msg pendingMessage) (aiservice.CreateJobRequest, error) {
	prompt, err := c.resolvePrompt(ctx, msg.Vertical, msg.DepartmentID, msg.SectionSlug)
	if err != nil {
		return aiservice.CreateJobRequest{}, err
	}
	// Marshalling this fixed one-field struct cannot fail, so the error is dropped.
	format, _ := json.Marshal(responseFmt{Type: "json_object"})
	input, err := json.Marshal(aiservice.ChatInput{
		Messages: []aiservice.Message{
			{Role: "system", Content: promptWithVerticalGuard(msg.Vertical, prompt)},
			{Role: "user", Content: buildUserPrompt(msg.Text)},
		},
		Temperature:    0.1,
		MaxTokens:      c.cfg.LLMMaxTokens,
		ResponseFormat: format,
	})
	if err != nil {
		return aiservice.CreateJobRequest{}, err
	}
	ref := classifyOwnerRef(msg)
	req := aiservice.CreateJobRequest{
		OwnerService: "monitoring-tg",
		OwnerRef:     ref,
		TaskType:     "telegram_classification",
		ModelProfile: c.cfg.LLMModel,
		Priority:     5,
		MaxAttempts:  2,
		Input:        input,
		// The key includes the section because prompts, and therefore verdicts,
		// are section-specific.
		IdempotencyKey: "monitoring-tg:telegram_classification:" + ref,
	}
	return req, nil
}
// verdictFromJob extracts and normalizes the classification verdict from a
// finished ai-service job.
//
// A job whose status is not "done" yields an error of the form
// "ai-service job <status>[: <error message>]". Otherwise the job result is
// decoded as a ChatResult. Its content must be non-blank and contain a JSON
// object, which is then passed through normalizeVerdict for the given vertical.
func (c *classifier) verdictFromJob(job *aiservice.Job, vertical string) (json.RawMessage, error) {
	if job.Status != "done" {
		reason := "ai-service job " + job.Status
		if detail := derefString(job.ErrorMessage); detail != "" {
			reason += ": " + detail
		}
		return nil, errors.New(reason)
	}
	var result aiservice.ChatResult
	if err := json.Unmarshal(job.Result, &result); err != nil {
		return nil, err
	}
	content := strings.TrimSpace(result.Content)
	if content == "" {
		return nil, errors.New("llm returned empty content")
	}
	block, err := extractJSONBlock(content)
	if err != nil {
		return nil, err
	}
	return normalizeVerdict(vertical, block)
}
// resolvePrompt picks the system prompt for a section. It looks up, in order,
// the app_settings keys "llm_system_prompt:<dept>:<vertical>:<slug>" (only when
// sectionSlug is set) and "llm_system_prompt:<dept>:<vertical>", where dept is
// departmentID, or "global" when that is empty. The first non-blank value wins.
// When none is set, the built-in default prompt for the vertical is returned.
//
// Database errors (including context cancellation) are returned instead of
// falling back to the default. A verdict produced with the wrong prompt is
// saved and the pair is never reclassified, so it is better to skip the
// message now and retry it on a later tick.
func (c *classifier) resolvePrompt(ctx context.Context, vertical, departmentID, sectionSlug string) (string, error) {
	dept := departmentID
	if dept == "" {
		dept = "global"
	}
	keys := make([]string, 0, 2)
	if sectionSlug != "" {
		keys = append(keys, promptKey(dept, vertical, sectionSlug))
	}
	keys = append(keys, promptKey(dept, vertical, ""))
	for _, key := range keys {
		// The scalar subquery always produces exactly one row: '' when the key is
		// missing or its value is JSON/SQL null. Any error is therefore a real
		// failure, not "no rows".
		var text string
		err := c.db.QueryRow(ctx, `SELECT COALESCE((SELECT value #>> '{}' FROM app_settings WHERE key = $1 LIMIT 1), '')`, key).Scan(&text)
		if err != nil {
			return "", fmt.Errorf("load prompt %q: %w", key, err)
		}
		if strings.TrimSpace(text) != "" {
			return text, nil
		}
	}
	return defaultPrompt(vertical), nil
}
// saveVerdict records verdict for the (message, section) pair in msg. It does
// two writes:
//   - upsert the message_classifications row (conflict target:
//     uq_message_classification_section);
//   - store the verdict under key in messages.extracted, starting from an
//     empty object when extracted is not a JSON object.
//
// Both writes run in one transaction. If they ran separately, a failure after
// the upsert would leave a classification row, which removes the pair from
// loadPending, without the matching messages.extracted entry. Nothing would
// ever retry it.
//
// NOTE(review): messages.extracted holds one key per vertical, so when a
// message belongs to several sections of the same vertical, the verdict of the
// section saved last wins there.
func (c *classifier) saveVerdict(ctx context.Context, msg pendingMessage, key string, verdict json.RawMessage) error {
	tx, err := c.db.Begin(ctx)
	if err != nil {
		return fmt.Errorf("begin verdict transaction: %w", err)
	}
	// Rollback after a successful Commit only reports that the tx is closed.
	defer func() { _ = tx.Rollback(ctx) }()
	if _, err := tx.Exec(ctx, `
INSERT INTO message_classifications (message_id, section_id, vertical, verdict, updated_at)
VALUES ($1, $2, $3, $4::jsonb, now())
ON CONFLICT ON CONSTRAINT uq_message_classification_section DO UPDATE
SET vertical = EXCLUDED.vertical,
verdict = EXCLUDED.verdict,
updated_at = now()
`, msg.ID, msg.SectionID, msg.Vertical, string(verdict)); err != nil {
		return fmt.Errorf("upsert message classification: %w", err)
	}
	if _, err := tx.Exec(ctx, `
UPDATE messages
SET extracted = jsonb_set(
CASE WHEN jsonb_typeof(extracted) = 'object' THEN extracted ELSE '{}'::jsonb END,
ARRAY[$2],
$3::jsonb,
true
)
WHERE id = $1
`, msg.ID, key, string(verdict)); err != nil {
		return fmt.Errorf("update message extracted: %w", err)
	}
	if err := tx.Commit(ctx); err != nil {
		return fmt.Errorf("commit verdict transaction: %w", err)
	}
	return nil
}
// promptKey builds the app_settings key of a system prompt:
// "llm_system_prompt:<department>:<vertical>", with ":<section slug>"
// appended when sectionSlug is non-empty.
func promptKey(departmentID, vertical, sectionSlug string) string {
	base := fmt.Sprintf("llm_system_prompt:%s:%s", departmentID, vertical)
	if sectionSlug == "" {
		return base
	}
	return base + ":" + sectionSlug
}
// verdictKey returns the messages.extracted key a vertical's verdict is
// stored under: "hr_lead" for HR, "lead" for everything else.
func verdictKey(vertical string) string {
	switch vertical {
	case verticalHR:
		return "hr_lead"
	default:
		return "lead"
	}
}
// classifyOwnerRef identifies a (message, section) pair as
// "<message id>:<section id>"; it is used both as the job's OwnerRef and as
// the suffix of its idempotency key.
func classifyOwnerRef(msg pendingMessage) string {
	return strconv.FormatInt(msg.ID, 10) + ":" + strconv.FormatInt(msg.SectionID, 10)
}
// buildUserPrompt wraps the message text in a fenced block between a short
// Russian lead-in ("Message text:") and the instruction to return JSON.
func buildUserPrompt(text string) string {
	const (
		head = "Текст сообщения:\n```\n"
		tail = "\n```\nВерни JSON."
	)
	return head + text + tail
}
// extractJSONBlock returns the JSON object contained in an LLM reply.
//
// When the whole trimmed reply is a JSON object, it is returned as is.
// Otherwise the text between the first '{' and the last '}' is taken, which
// covers replies wrapped in markdown fences or surrounded by prose. Valid JSON
// that is not an object (null, arrays, strings, numbers) is not accepted on the
// first path. A bare `null` therefore yields an error instead of being passed
// on as a verdict.
func extractJSONBlock(raw string) (json.RawMessage, error) {
	content := strings.TrimSpace(raw)
	// JSON text starting with '{' can only be an object.
	if strings.HasPrefix(content, "{") && json.Valid([]byte(content)) {
		return json.RawMessage(content), nil
	}
	start := strings.Index(content, "{")
	end := strings.LastIndex(content, "}")
	if start < 0 || end < start {
		return nil, errors.New("no json object in llm content")
	}
	block := content[start : end+1]
	var payload json.RawMessage
	if err := json.Unmarshal([]byte(block), &payload); err != nil {
		return nil, err
	}
	return payload, nil
}
// normalizeVerdict decodes an LLM verdict and re-encodes it. It makes sure the
// vertical's boolean flag is present ("is_lead" for HR, "is_listing"
// otherwise; false when missing) and clamps a numeric "confidence" into [0, 1].
// Other fields pass through unchanged.
//
// It returns an error when raw is not a JSON object. This includes the literal
// null, which decodes into a nil map without error. Assigning the default flag
// into that nil map would panic and take down the worker loop.
func normalizeVerdict(vertical string, raw json.RawMessage) (json.RawMessage, error) {
	var m map[string]any
	if err := json.Unmarshal(raw, &m); err != nil {
		return nil, err
	}
	if m == nil {
		return nil, errors.New("llm verdict is not a json object")
	}
	flag := "is_listing"
	if vertical == verticalHR {
		flag = "is_lead"
	}
	if _, ok := m[flag]; !ok {
		m[flag] = false
	}
	if confidence, ok := asFloat(m["confidence"]); ok {
		if confidence < 0 {
			confidence = 0
		}
		if confidence > 1 {
			confidence = 1
		}
		m["confidence"] = confidence
	}
	return marshalRaw(m)
}
// negativeVerdict builds the "not a lead" verdict saved for texts that are too
// short or whose classification job failed. The flag key is "is_lead" for HR
// and "is_listing" otherwise. The flag is false, kind is null, summary is
// empty and confidence is 0.
func negativeVerdict(vertical string) map[string]any {
	flag := "is_listing"
	if vertical == verticalHR {
		flag = "is_lead"
	}
	return map[string]any{
		flag:         false,
		"kind":       nil,
		"summary":    "",
		"confidence": 0,
	}
}
// marshalRaw JSON-encodes v and returns the bytes as a json.RawMessage; on
// failure it returns a nil message and the encoding error.
func marshalRaw(v any) (json.RawMessage, error) {
	encoded, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	return encoded, nil
}
// asFloat converts a decoded numeric value to float64. It accepts float64,
// float32, int, int64 and json.Number. For a json.Number that does not parse,
// it returns whatever Float64 produced with ok=false. Any other type yields
// (0, false).
func asFloat(v any) (float64, bool) {
	if num, isNumber := v.(json.Number); isNumber {
		f, err := num.Float64()
		return f, err == nil
	}
	switch n := v.(type) {
	case float64:
		return n, true
	case float32:
		return float64(n), true
	case int:
		return float64(n), true
	case int64:
		return float64(n), true
	}
	return 0, false
}
// derefString returns *v, or "" when v is nil.
func derefString(v *string) string {
	var out string
	if v != nil {
		out = *v
	}
	return out
}
func defaultPrompt(vertical string) string {
if vertical == verticalHR {
return defaultHRPrompt
}
return defaultREPrompt
}
// promptWithVerticalGuard appends realEstateOnlyGuard to non-HR prompts that
// do not already contain it, separated by a blank line after the trimmed
// prompt. HR prompts and prompts that already include the guard are returned
// unchanged, untrimmed.
func promptWithVerticalGuard(vertical, prompt string) string {
	needsGuard := vertical != verticalHR && !strings.Contains(prompt, realEstateOnlyGuard)
	if !needsGuard {
		return prompt
	}
	return strings.TrimSpace(prompt) + "\n\n" + realEstateOnlyGuard
}
func loadConfig() config {
return config{
PostgresUser: env("POSTGRES_USER", "parser"),
PostgresPassword: env("POSTGRES_PASSWORD", "parser"),
PostgresDB: env("POSTGRES_DB", "parser"),
PostgresHost: env("POSTGRES_HOST", "db"),
PostgresPort: envInt("POSTGRES_PORT", 5432),
LLMEnabled: envBool("LLM_ENABLED", true),
LLMModel: env("LLM_MODEL", "qwen2.5-14b"),
LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second,
LLMMaxTokens: envInt("LLM_MAX_TOKENS", 600),
LLMMinTextLength: envInt("LLM_MIN_TEXT_LENGTH", 20),
ClassifyInterval: time.Duration(envInt("LLM_CLASSIFY_INTERVAL_SECONDS", 5)) * time.Second,
ClassifyBatchSize: envInt("LLM_CLASSIFY_BATCH_SIZE", 200),
AIServiceURL: env("AI_SERVICE_URL", ""),
AIServiceToken: env("AI_SERVICE_TOKEN", ""),
}
}
// databaseURL builds the postgres:// connection URL for the configured server.
//
// User, password and database name are escaped with net/url's userinfo and
// path rules. Query escaping would turn a space into '+', which URL parsers
// read back as a literal '+' in those positions. IPv6 host literals are
// bracketed (as net.JoinHostPort does) so the port separator stays
// unambiguous. For plain values the output matches
// "postgres://user:pass@host:port/db".
func (c config) databaseURL() string {
	host := c.PostgresHost
	if strings.Contains(host, ":") && !strings.HasPrefix(host, "[") {
		host = "[" + host + "]"
	}
	u := url.URL{
		Scheme: "postgres",
		User:   url.UserPassword(c.PostgresUser, c.PostgresPassword),
		Host:   host + ":" + strconv.Itoa(c.PostgresPort),
		Path:   "/" + c.PostgresDB,
	}
	return u.String()
}
// env returns the whitespace-trimmed value of the environment variable key,
// or fallback when it is unset or blank.
func env(key, fallback string) string {
	v := strings.TrimSpace(os.Getenv(key))
	if v == "" {
		return fallback
	}
	return v
}
// envInt returns the environment variable key parsed as a decimal int after
// trimming whitespace. It returns fallback when the variable is unset, blank,
// or not a valid integer. Negative values are returned as is.
func envInt(key string, fallback int) int {
	raw := strings.TrimSpace(os.Getenv(key))
	if raw == "" {
		return fallback
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return fallback
	}
	return n
}
// envBool returns the environment variable key interpreted as a boolean.
//
// It accepts everything strconv.ParseBool does ("1", "t", "true", "0", "f",
// "false", in their supported casings) plus the common words yes/on and
// no/off in any case. Previously, values like "off" silently fell back to the
// default, so LLM_ENABLED=off left the classifier enabled. Unset, blank or
// unrecognized values return fallback.
func envBool(key string, fallback bool) bool {
	raw := strings.TrimSpace(os.Getenv(key))
	if raw == "" {
		return fallback
	}
	if b, err := strconv.ParseBool(raw); err == nil {
		return b
	}
	switch strings.ToLower(raw) {
	case "yes", "on":
		return true
	case "no", "off":
		return false
	}
	return fallback
}
// defaultREPrompt is the built-in system prompt for every non-HR vertical. It
// is returned by defaultPrompt when app_settings has no non-blank prompt for
// the section or vertical.
// The prompt (in Russian) asks the model to decide whether the Telegram
// message is a genuine real-estate sale, rent or purchase listing, to treat
// goods and services unrelated to real estate as is_listing=false, and to
// answer with strict JSON following the schema below. The summary must be in
// Russian and confidence must lie in 0..1.
const defaultREPrompt = `Ты — аналитик объявлений о недвижимости. Тебе дают текст из Telegram-канала.
Определи, является ли сообщение реальным объявлением о покупке, продаже или аренде недвижимости.
Учитывай только сделки с недвижимостью: квартиры, дома, апартаменты, участки, коммерческие помещения и другие объекты недвижимости.
Любые продажи или заявки по стройматериалам, мебели, бытовой/строительной технике, автомобилям, услугам, оборудованию и прочим товарам не являются лидами недвижимости — для них is_listing=false.
Отвечай строго валидным JSON без markdown:
{
"is_listing": boolean,
"kind": "sale" | "rent" | "purchase" | null,
"property_type": string | null,
"rooms": string | null,
"area_m2": number | null,
"price_text": string | null,
"price_value": number | null,
"currency": "RUB" | "USD" | "EUR" | "AED" | "GBP" | "CNY" | "TRY" | "KZT" | "BYN" | "UAH" | null,
"location": string | null,
"contact_phone": string | null,
"contact_name": string | null,
"summary": string,
"confidence": number
}
summary всегда по-русски, confidence в диапазоне 0..1.`
// realEstateOnlyGuard is appended by promptWithVerticalGuard to every non-HR
// system prompt that does not already contain it, including prompts loaded
// from app_settings. In Russian, it restates that only real-estate deals
// count: materials, furniture, appliances, cars, services, equipment and other
// goods or work are not real-estate leads and must get is_listing=false.
const realEstateOnlyGuard = `Жёсткое правило для недвижимости: учитывай только сделки с недвижимостью.
Если сообщение продаёт или покупает стройматериалы, мебель, бытовую/строительную технику, автомобили, услуги, оборудование или любые другие товары/работы не по недвижимости — это НЕ лид недвижимости, верни is_listing=false.`
// defaultHRPrompt is the built-in system prompt for the HR vertical. It is
// returned by defaultPrompt when app_settings has no non-blank prompt for the
// section or vertical.
// The prompt (in Russian) asks the model whether the message belongs to the
// job market (vacancy, resume or short HR contact) and to answer with strict
// JSON following the schema below, keyed by is_lead. The summary must be in
// Russian and confidence must lie in 0..1.
const defaultHRPrompt = `Ты — аналитик HR-объявлений. Тебе дают текст из Telegram-канала.
Определи, относится ли сообщение к рынку труда: вакансия, резюме или короткий HR-контакт.
Отвечай строго валидным JSON без markdown:
{
"is_lead": boolean,
"kind": "vacancy" | "resume" | "contact" | null,
"title": string | null,
"company": string | null,
"candidate_name": string | null,
"experience_years": number | null,
"skills": string[],
"location": string | null,
"remote": boolean | null,
"employment_type": "full-time" | "part-time" | "contract" | "internship" | null,
"salary_text": string | null,
"salary_value": number | null,
"currency": "RUB" | "USD" | "EUR" | "AED" | "GBP" | "CNY" | "TRY" | "KZT" | "BYN" | "UAH" | null,
"contact_phone": string | null,
"contact_name": string | null,
"summary": string,
"confidence": number
}
summary всегда по-русски, confidence в диапазоне 0..1.`