From fc696c9e13f85e5e01c5a22c039d7f0e6359503c Mon Sep 17 00:00:00 2001 From: Grendgi Date: Thu, 11 Jun 2026 17:06:00 +0300 Subject: [PATCH] Add per-section TG classifications --- .../versions/0012_message_classifications.py | 38 +++++++++ cmd/classifier/main.go | 33 +++++--- cmd/server/main.go | 81 +++++++++++++++---- src/parser_bot/db/models.py | 18 +++++ 4 files changed, 144 insertions(+), 26 deletions(-) create mode 100644 alembic/versions/0012_message_classifications.py diff --git a/alembic/versions/0012_message_classifications.py b/alembic/versions/0012_message_classifications.py new file mode 100644 index 0000000..7624649 --- /dev/null +++ b/alembic/versions/0012_message_classifications.py @@ -0,0 +1,38 @@ +"""per-section message classifications + +Revision ID: 0012 +Revises: 0011 +Create Date: 2026-06-11 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +revision: str = "0012" +down_revision: Union[str, None] = "0011" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "message_classifications", + sa.Column("id", sa.Integer(), primary_key=True), + sa.Column("message_id", sa.Integer(), sa.ForeignKey("messages.id", ondelete="CASCADE"), nullable=False), + sa.Column("section_id", sa.Integer(), sa.ForeignKey("sections.id", ondelete="CASCADE"), nullable=False), + sa.Column("vertical", sa.String(length=32), nullable=False), + sa.Column("verdict", postgresql.JSONB(astext_type=sa.Text()), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.UniqueConstraint("message_id", "section_id", name="uq_message_classification_section"), + ) + op.create_index("ix_message_classifications_message", "message_classifications", ["message_id"]) + op.create_index("ix_message_classifications_section", "message_classifications", 
["section_id"]) + + +def downgrade() -> None: + op.drop_index("ix_message_classifications_section", table_name="message_classifications") + op.drop_index("ix_message_classifications_message", table_name="message_classifications") + op.drop_table("message_classifications") diff --git a/cmd/classifier/main.go b/cmd/classifier/main.go index cf9adb8..0848292 100644 --- a/cmd/classifier/main.go +++ b/cmd/classifier/main.go @@ -44,6 +44,7 @@ type config struct { type pendingMessage struct { ID int64 + SectionID int64 Text string Vertical string SectionSlug string @@ -142,7 +143,7 @@ func (c *classifier) runOnce(ctx context.Context) (int, error) { } } - if err := c.saveVerdict(ctx, msg.ID, key, verdict); err != nil { + if err := c.saveVerdict(ctx, msg, key, verdict); err != nil { slog.Warn("save_verdict_failed", "message_id", msg.ID, "error", err) continue } @@ -155,20 +156,19 @@ func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error) rows, err := c.db.Query(ctx, ` SELECT m.id, + s.id, m.text, c.vertical, s.slug, COALESCE(s.department_id, ''), - COALESCE(m.extracted, '{}'::jsonb)::text + COALESCE(mc.verdict, '{}'::jsonb)::text FROM messages m - JOIN channels c ON c.id = m.channel_id + JOIN channels src ON src.id = m.channel_id + JOIN channels c ON c.id = src.id OR c.source_channel_id = src.id JOIN sections s ON s.id = c.section_id + LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id WHERE m.text IS NOT NULL - AND ( - (c.vertical = 'hr' AND (m.extracted IS NULL OR m.extracted->'hr_lead' IS NULL)) - OR - (c.vertical <> 'hr' AND (m.extracted IS NULL OR m.extracted->'lead' IS NULL)) - ) + AND mc.id IS NULL ORDER BY m.id DESC LIMIT $1 `, c.cfg.ClassifyBatchSize) @@ -181,7 +181,7 @@ func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error) for rows.Next() { var msg pendingMessage var extractedText string - if err := rows.Scan(&msg.ID, &msg.Text, &msg.Vertical, &msg.SectionSlug, &msg.DepartmentID, 
&extractedText); err != nil { + if err := rows.Scan(&msg.ID, &msg.SectionID, &msg.Text, &msg.Vertical, &msg.SectionSlug, &msg.DepartmentID, &extractedText); err != nil { return nil, err } if err := json.Unmarshal([]byte(extractedText), &msg.Extracted); err != nil { @@ -284,8 +284,19 @@ func (c *classifier) resolvePrompt(ctx context.Context, vertical, departmentID, return defaultPrompt(vertical), nil } -func (c *classifier) saveVerdict(ctx context.Context, id int64, key string, verdict json.RawMessage) error { +func (c *classifier) saveVerdict(ctx context.Context, msg pendingMessage, key string, verdict json.RawMessage) error { _, err := c.db.Exec(ctx, ` + INSERT INTO message_classifications (message_id, section_id, vertical, verdict, updated_at) + VALUES ($1, $2, $3, $4::jsonb, now()) + ON CONFLICT ON CONSTRAINT uq_message_classification_section DO UPDATE + SET vertical = EXCLUDED.vertical, + verdict = EXCLUDED.verdict, + updated_at = now() + `, msg.ID, msg.SectionID, msg.Vertical, string(verdict)) + if err != nil { + return err + } + _, err = c.db.Exec(ctx, ` UPDATE messages SET extracted = jsonb_set( CASE WHEN jsonb_typeof(extracted) = 'object' THEN extracted ELSE '{}'::jsonb END, @@ -294,7 +305,7 @@ func (c *classifier) saveVerdict(ctx context.Context, id int64, key string, verd true ) WHERE id = $1 - `, id, key, string(verdict)) + `, msg.ID, key, string(verdict)) return err } diff --git a/cmd/server/main.go b/cmd/server/main.go index 3550d66..205b6de 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -296,10 +296,11 @@ func (a *app) listSections(ctx context.Context, w http.ResponseWriter, r *http.R FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id + LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id WHERE c.section_id = s.id AND ( - (s.vertical = 'hr' AND m.extracted->'hr_lead'->>'is_lead' = 'true') - OR (s.vertical <> 'hr' AND 
m.extracted->'lead'->>'is_listing' = 'true') + (s.vertical = 'hr' AND COALESCE(mc.verdict->>'is_lead', m.extracted->'hr_lead'->>'is_lead') = 'true') + OR (s.vertical <> 'hr' AND COALESCE(mc.verdict->>'is_listing', m.extracted->'lead'->>'is_listing') = 'true') )) FROM sections s WHERE s.vertical = $1`+deptFilter+` @@ -595,13 +596,30 @@ func (a *app) createChannel(ctx context.Context, w http.ResponseWriter, r *http. writeDBError(w, err) return } + var sourceID *int64 + var existingSource sql.NullInt64 + err = a.db.QueryRow(ctx, ` + SELECT COALESCE(c.source_channel_id, c.id) + FROM channels c + WHERE c.vertical = $1 AND c.identifier = $2 AND c.section_id <> $3 + ORDER BY CASE WHEN c.source_channel_id IS NULL THEN 0 ELSE 1 END, c.id + LIMIT 1 + `, payload.Vertical, payload.Identifier, section.ID).Scan(&existingSource) + if err != nil && !errors.Is(err, pgx.ErrNoRows) { + writeDBError(w, err) + return + } + if existingSource.Valid { + sourceID = &existingSource.Int64 + } row := a.db.QueryRow(ctx, ` - INSERT INTO channels (identifier, vertical, section_id, is_active) - VALUES ($1, $2, $3, true) + INSERT INTO channels (identifier, vertical, section_id, source_channel_id, is_active) + VALUES ($1, $2, $3, $5, true) ON CONFLICT ON CONSTRAINT uq_channels_section_identifier DO UPDATE - SET is_active = true + SET is_active = true, + source_channel_id = COALESCE(channels.source_channel_id, EXCLUDED.source_channel_id) RETURNING id, tg_id, source_channel_id, identifier, title, vertical, section_id, $4::text, is_active, last_message_id, last_polled_at, created_at - `, payload.Identifier, payload.Vertical, section.ID, section.Slug) + `, payload.Identifier, payload.Vertical, section.ID, section.Slug, sourceID) item, err := scanChannelRow(row) if err != nil { writeDBError(w, err) @@ -791,7 +809,13 @@ func (a *app) channelStats(ctx context.Context, w http.ResponseWriter, r *http.R writeDBError(w, err) return } - if err := a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE 
channel_id = $1 AND extracted -> $2 ->> $3 = 'true'`, sourceID, key, field).Scan(&leads); err != nil { + if err := a.db.QueryRow(ctx, ` + SELECT count(*) + FROM messages m + LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = $2 + WHERE m.channel_id = $1 + AND COALESCE(mc.verdict ->> $4, m.extracted -> $3 ->> $4) = 'true' + `, sourceID, ch.SectionID, key, field).Scan(&leads); err != nil { writeDBError(w, err) return } @@ -821,13 +845,21 @@ func (a *app) reanalyzeChannel(ctx context.Context, w http.ResponseWriter, r *ht sourceID = *ch.SourceChannelID } tag, err := a.db.Exec(ctx, ` + DELETE FROM message_classifications + WHERE section_id = $1 + AND message_id IN (SELECT id FROM messages WHERE channel_id = $2) + `, ch.SectionID, sourceID) + if err != nil { + writeDBError(w, err) + return + } + if _, err := a.db.Exec(ctx, ` UPDATE messages SET extracted = ( CASE WHEN jsonb_typeof(extracted) = 'object' THEN extracted ELSE '{}'::jsonb END ) - $1 WHERE channel_id = $2 - `, key, sourceID) - if err != nil { + `, key, sourceID); err != nil { writeDBError(w, err) return } @@ -882,7 +914,7 @@ func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http field = "is_lead" } args = append(args, key, field) - where += fmt.Sprintf(" AND m.extracted -> $%d ->> $%d = 'true'", len(args)-1, len(args)) + where += fmt.Sprintf(" AND COALESCE(mc.verdict ->> $%d, m.extracted -> $%d ->> $%d) = 'true'", len(args), len(args)-1, len(args)) } if !scope.IsAdmin { args = append(args, scope.DeptID) @@ -893,12 +925,21 @@ func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http rows, err := a.db.Query(ctx, ` SELECT m.id, c.id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int, m.date, m.text, m.sender_id, m.sender_username, m.sender_name, - COALESCE(src.identifier, c.identifier), COALESCE(src.tg_id, c.tg_id), m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text, + COALESCE(src.identifier, c.identifier), 
COALESCE(src.tg_id, c.tg_id), m.has_media, + COALESCE( + CASE + WHEN mc.verdict IS NULL THEN NULL + ELSE jsonb_build_object(CASE WHEN c.vertical = 'hr' THEN 'hr_lead' ELSE 'lead' END, mc.verdict) + END, + m.extracted, + 'null'::jsonb + )::text, COALESCE(m.media_files, '[]'::jsonb)::text, m.views, m.forwards, m.fetched_at FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id JOIN sections s ON s.id = c.section_id + LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id WHERE `+where+` ORDER BY m.date DESC, m.id DESC LIMIT $`+strconv.Itoa(len(args)-1)+` OFFSET $`+strconv.Itoa(len(args))+` @@ -950,12 +991,21 @@ func (a *app) handleMessageItem(ctx context.Context, w http.ResponseWriter, r *h row := a.db.QueryRow(ctx, ` SELECT m.id, c.id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int, m.date, m.text, m.sender_id, m.sender_username, m.sender_name, - COALESCE(src.identifier, c.identifier), COALESCE(src.tg_id, c.tg_id), m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text, + COALESCE(src.identifier, c.identifier), COALESCE(src.tg_id, c.tg_id), m.has_media, + COALESCE( + CASE + WHEN mc.verdict IS NULL THEN NULL + ELSE jsonb_build_object(CASE WHEN c.vertical = 'hr' THEN 'hr_lead' ELSE 'lead' END, mc.verdict) + END, + m.extracted, + 'null'::jsonb + )::text, COALESCE(m.media_files, '[]'::jsonb)::text, m.views, m.forwards, m.fetched_at FROM messages m JOIN channels src ON src.id = m.channel_id JOIN channels c ON c.id = src.id OR c.source_channel_id = src.id JOIN sections s ON s.id = c.section_id + LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id WHERE `+where+` ORDER BY CASE WHEN c.id = src.id THEN 0 ELSE 1 END LIMIT 1 @@ -1106,7 +1156,7 @@ func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Re writeDBError(w, err) return } - messageFrom := `FROM channels c JOIN channels src ON src.id = 
COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id JOIN sections s ON s.id = c.section_id WHERE ` + where + messageFrom := `FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id JOIN sections s ON s.id = c.section_id LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id WHERE ` + where if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom, args...).Scan(&messagesTotal); err != nil { writeDBError(w, err) return @@ -1122,7 +1172,7 @@ func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Re field = "is_lead" } leadArgs := append(append([]any{}, args...), key, field) - leadClause := fmt.Sprintf(" AND m.extracted -> $%d ->> $%d = 'true'", len(leadArgs)-1, len(leadArgs)) + leadClause := fmt.Sprintf(" AND COALESCE(mc.verdict ->> $%d, m.extracted -> $%d ->> $%d) = 'true'", len(leadArgs), len(leadArgs)-1, len(leadArgs)) if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom+leadClause, leadArgs...).Scan(&leadsTotal); err != nil { writeDBError(w, err) return @@ -1199,7 +1249,7 @@ func (a *app) handleLLMQueue(ctx context.Context, w http.ResponseWriter, r *http func (a *app) pendingLLM(ctx context.Context, scope accessScope, vertical, section string) (int64, error) { args := []any{vertical} - where := `c.vertical = $1 AND m.text IS NOT NULL AND ((c.vertical = 'hr' AND (m.extracted IS NULL OR m.extracted->'hr_lead' IS NULL)) OR (c.vertical <> 'hr' AND (m.extracted IS NULL OR m.extracted->'lead' IS NULL)))` + where := `c.vertical = $1 AND m.text IS NOT NULL AND mc.id IS NULL` if section != "" { args = append(args, section) where += fmt.Sprintf(" AND s.slug = $%d", len(args)) @@ -1215,6 +1265,7 @@ func (a *app) pendingLLM(ctx context.Context, scope accessScope, vertical, secti JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id JOIN sections s ON s.id = c.section_id 
+ LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id WHERE `+where, args...).Scan(&pending) return pending, err } diff --git a/src/parser_bot/db/models.py b/src/parser_bot/db/models.py index 021075f..add0244 100644 --- a/src/parser_bot/db/models.py +++ b/src/parser_bot/db/models.py @@ -119,6 +119,24 @@ class Message(Base): channel: Mapped[Channel] = relationship(back_populates="messages") +class MessageClassification(Base): + __tablename__ = "message_classifications" + __table_args__ = ( + UniqueConstraint("message_id", "section_id", name="uq_message_classification_section"), + Index("ix_message_classifications_message", "message_id"), + Index("ix_message_classifications_section", "section_id"), + ) + + id: Mapped[int] = mapped_column(primary_key=True) + message_id: Mapped[int] = mapped_column(ForeignKey("messages.id", ondelete="CASCADE")) + section_id: Mapped[int] = mapped_column(ForeignKey("sections.id", ondelete="CASCADE")) + vertical: Mapped[str] = mapped_column(String(32)) + verdict: Mapped[dict] = mapped_column(JSONB) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now() + ) + + class AppSetting(Base): """Runtime-editable settings, edited from the UI without a restart."""