Add per-section TG classifications
All checks were successful
CI / go (push) Successful in 16s
CI / python (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 46s

This commit is contained in:
Grendgi
2026-06-11 17:06:00 +03:00
parent f66ca4b6d4
commit fc696c9e13
4 changed files with 144 additions and 26 deletions

View File

@@ -296,10 +296,11 @@ func (a *app) listSections(ctx context.Context, w http.ResponseWriter, r *http.R
FROM channels c
JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
JOIN messages m ON m.channel_id = src.id
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
WHERE c.section_id = s.id
AND (
(s.vertical = 'hr' AND m.extracted->'hr_lead'->>'is_lead' = 'true')
OR (s.vertical <> 'hr' AND m.extracted->'lead'->>'is_listing' = 'true')
(s.vertical = 'hr' AND COALESCE(mc.verdict->>'is_lead', m.extracted->'hr_lead'->>'is_lead') = 'true')
OR (s.vertical <> 'hr' AND COALESCE(mc.verdict->>'is_listing', m.extracted->'lead'->>'is_listing') = 'true')
))
FROM sections s
WHERE s.vertical = $1`+deptFilter+`
@@ -595,13 +596,30 @@ func (a *app) createChannel(ctx context.Context, w http.ResponseWriter, r *http.
writeDBError(w, err)
return
}
var sourceID *int64
var existingSource sql.NullInt64
err = a.db.QueryRow(ctx, `
SELECT COALESCE(c.source_channel_id, c.id)
FROM channels c
WHERE c.vertical = $1 AND c.identifier = $2 AND c.section_id <> $3
ORDER BY CASE WHEN c.source_channel_id IS NULL THEN 0 ELSE 1 END, c.id
LIMIT 1
`, payload.Vertical, payload.Identifier, section.ID).Scan(&existingSource)
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
writeDBError(w, err)
return
}
if existingSource.Valid {
sourceID = &existingSource.Int64
}
row := a.db.QueryRow(ctx, `
INSERT INTO channels (identifier, vertical, section_id, is_active)
VALUES ($1, $2, $3, true)
INSERT INTO channels (identifier, vertical, section_id, source_channel_id, is_active)
VALUES ($1, $2, $3, $5, true)
ON CONFLICT ON CONSTRAINT uq_channels_section_identifier DO UPDATE
SET is_active = true
SET is_active = true,
source_channel_id = COALESCE(channels.source_channel_id, EXCLUDED.source_channel_id)
RETURNING id, tg_id, source_channel_id, identifier, title, vertical, section_id, $4::text, is_active, last_message_id, last_polled_at, created_at
`, payload.Identifier, payload.Vertical, section.ID, section.Slug)
`, payload.Identifier, payload.Vertical, section.ID, section.Slug, sourceID)
item, err := scanChannelRow(row)
if err != nil {
writeDBError(w, err)
@@ -791,7 +809,13 @@ func (a *app) channelStats(ctx context.Context, w http.ResponseWriter, r *http.R
writeDBError(w, err)
return
}
if err := a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE channel_id = $1 AND extracted -> $2 ->> $3 = 'true'`, sourceID, key, field).Scan(&leads); err != nil {
if err := a.db.QueryRow(ctx, `
SELECT count(*)
FROM messages m
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = $2
WHERE m.channel_id = $1
AND COALESCE(mc.verdict ->> $4, m.extracted -> $3 ->> $4) = 'true'
`, sourceID, ch.SectionID, key, field).Scan(&leads); err != nil {
writeDBError(w, err)
return
}
@@ -821,13 +845,21 @@ func (a *app) reanalyzeChannel(ctx context.Context, w http.ResponseWriter, r *ht
sourceID = *ch.SourceChannelID
}
tag, err := a.db.Exec(ctx, `
DELETE FROM message_classifications
WHERE section_id = $1
AND message_id IN (SELECT id FROM messages WHERE channel_id = $2)
`, ch.SectionID, sourceID)
if err != nil {
writeDBError(w, err)
return
}
if _, err := a.db.Exec(ctx, `
UPDATE messages
SET extracted = (
CASE WHEN jsonb_typeof(extracted) = 'object' THEN extracted ELSE '{}'::jsonb END
) - $1
WHERE channel_id = $2
`, key, sourceID)
if err != nil {
`, key, sourceID); err != nil {
writeDBError(w, err)
return
}
@@ -882,7 +914,7 @@ func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http
field = "is_lead"
}
args = append(args, key, field)
where += fmt.Sprintf(" AND m.extracted -> $%d ->> $%d = 'true'", len(args)-1, len(args))
where += fmt.Sprintf(" AND COALESCE(mc.verdict ->> $%d, m.extracted -> $%d ->> $%d) = 'true'", len(args), len(args)-1, len(args))
}
if !scope.IsAdmin {
args = append(args, scope.DeptID)
@@ -893,12 +925,21 @@ func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http
rows, err := a.db.Query(ctx, `
SELECT m.id, c.id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int,
m.date, m.text, m.sender_id, m.sender_username, m.sender_name,
COALESCE(src.identifier, c.identifier), COALESCE(src.tg_id, c.tg_id), m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text,
COALESCE(src.identifier, c.identifier), COALESCE(src.tg_id, c.tg_id), m.has_media,
COALESCE(
CASE
WHEN mc.verdict IS NULL THEN NULL
ELSE jsonb_build_object(CASE WHEN c.vertical = 'hr' THEN 'hr_lead' ELSE 'lead' END, mc.verdict)
END,
m.extracted,
'null'::jsonb
)::text,
COALESCE(m.media_files, '[]'::jsonb)::text, m.views, m.forwards, m.fetched_at
FROM channels c
JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
JOIN messages m ON m.channel_id = src.id
JOIN sections s ON s.id = c.section_id
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
WHERE `+where+`
ORDER BY m.date DESC, m.id DESC
LIMIT $`+strconv.Itoa(len(args)-1)+` OFFSET $`+strconv.Itoa(len(args))+`
@@ -950,12 +991,21 @@ func (a *app) handleMessageItem(ctx context.Context, w http.ResponseWriter, r *h
row := a.db.QueryRow(ctx, `
SELECT m.id, c.id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int,
m.date, m.text, m.sender_id, m.sender_username, m.sender_name,
COALESCE(src.identifier, c.identifier), COALESCE(src.tg_id, c.tg_id), m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text,
COALESCE(src.identifier, c.identifier), COALESCE(src.tg_id, c.tg_id), m.has_media,
COALESCE(
CASE
WHEN mc.verdict IS NULL THEN NULL
ELSE jsonb_build_object(CASE WHEN c.vertical = 'hr' THEN 'hr_lead' ELSE 'lead' END, mc.verdict)
END,
m.extracted,
'null'::jsonb
)::text,
COALESCE(m.media_files, '[]'::jsonb)::text, m.views, m.forwards, m.fetched_at
FROM messages m
JOIN channels src ON src.id = m.channel_id
JOIN channels c ON c.id = src.id OR c.source_channel_id = src.id
JOIN sections s ON s.id = c.section_id
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
WHERE `+where+`
ORDER BY CASE WHEN c.id = src.id THEN 0 ELSE 1 END
LIMIT 1
@@ -1106,7 +1156,7 @@ func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Re
writeDBError(w, err)
return
}
messageFrom := `FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id JOIN sections s ON s.id = c.section_id WHERE ` + where
messageFrom := `FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id JOIN sections s ON s.id = c.section_id LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id WHERE ` + where
if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom, args...).Scan(&messagesTotal); err != nil {
writeDBError(w, err)
return
@@ -1122,7 +1172,7 @@ func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Re
field = "is_lead"
}
leadArgs := append(append([]any{}, args...), key, field)
leadClause := fmt.Sprintf(" AND m.extracted -> $%d ->> $%d = 'true'", len(leadArgs)-1, len(leadArgs))
leadClause := fmt.Sprintf(" AND COALESCE(mc.verdict ->> $%d, m.extracted -> $%d ->> $%d) = 'true'", len(leadArgs), len(leadArgs)-1, len(leadArgs))
if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom+leadClause, leadArgs...).Scan(&leadsTotal); err != nil {
writeDBError(w, err)
return
@@ -1199,7 +1249,7 @@ func (a *app) handleLLMQueue(ctx context.Context, w http.ResponseWriter, r *http
func (a *app) pendingLLM(ctx context.Context, scope accessScope, vertical, section string) (int64, error) {
args := []any{vertical}
where := `c.vertical = $1 AND m.text IS NOT NULL AND ((c.vertical = 'hr' AND (m.extracted IS NULL OR m.extracted->'hr_lead' IS NULL)) OR (c.vertical <> 'hr' AND (m.extracted IS NULL OR m.extracted->'lead' IS NULL)))`
where := `c.vertical = $1 AND m.text IS NOT NULL AND mc.id IS NULL`
if section != "" {
args = append(args, section)
where += fmt.Sprintf(" AND s.slug = $%d", len(args))
@@ -1215,6 +1265,7 @@ func (a *app) pendingLLM(ctx context.Context, scope accessScope, vertical, secti
JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
JOIN messages m ON m.channel_id = src.id
JOIN sections s ON s.id = c.section_id
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
WHERE `+where, args...).Scan(&pending)
return pending, err
}