Share Telegram channels across sections
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 43s

This commit is contained in:
Grendgi
2026-06-08 23:54:49 +03:00
parent ddea7002f1
commit 2d0d751115
4 changed files with 146 additions and 54 deletions

View File

@@ -0,0 +1,43 @@
"""channel aliases across department sections
Revision ID: 0011
Revises: 0010
Create Date: 2026-06-08
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
revision: str = "0011"
down_revision: Union[str, None] = "0010"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.add_column("channels", sa.Column("source_channel_id", sa.Integer(), nullable=True))
op.create_foreign_key(
"fk_channels_source_channel_id",
"channels",
"channels",
["source_channel_id"],
["id"],
ondelete="SET NULL",
)
op.drop_constraint("channels_identifier_key", "channels", type_="unique")
op.create_unique_constraint(
"uq_channels_section_identifier",
"channels",
["section_id", "identifier"],
)
op.create_index("ix_channels_source_channel_id", "channels", ["source_channel_id"])
def downgrade() -> None:
op.drop_index("ix_channels_source_channel_id", table_name="channels")
op.drop_constraint("uq_channels_section_identifier", "channels", type_="unique")
op.create_unique_constraint("channels_identifier_key", "channels", ["identifier"])
op.drop_constraint("fk_channels_source_channel_id", "channels", type_="foreignkey")
op.drop_column("channels", "source_channel_id")

View File

@@ -92,6 +92,7 @@ type sectionOut struct {
type channelOut struct {
ID int64 `json:"id"`
TGID *int64 `json:"tg_id,omitempty"`
SourceChannelID *int64 `json:"source_channel_id,omitempty"`
Identifier string `json:"identifier"`
Title *string `json:"title,omitempty"`
Vertical string `json:"vertical"`
@@ -287,12 +288,14 @@ func (a *app) listSections(ctx context.Context, w http.ResponseWriter, r *http.R
(SELECT count(*) FROM channels c WHERE c.section_id = s.id),
(SELECT count(*) FROM channels c WHERE c.section_id = s.id AND c.is_active = true),
(SELECT count(*)
FROM messages m
JOIN channels c ON c.id = m.channel_id
FROM channels c
JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
JOIN messages m ON m.channel_id = src.id
WHERE c.section_id = s.id),
(SELECT count(*)
FROM messages m
JOIN channels c ON c.id = m.channel_id
FROM channels c
JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
JOIN messages m ON m.channel_id = src.id
WHERE c.section_id = s.id
AND (
(s.vertical = 'hr' AND m.extracted->'hr_lead'->>'is_lead' = 'true')
@@ -448,7 +451,7 @@ func (a *app) updateSection(ctx context.Context, w http.ResponseWriter, r *http.
RETURNING id, vertical, COALESCE(department_id, ''), slug, title, COALESCE(emoji, ''), COALESCE(description, ''), created_at,
(SELECT count(*) FROM channels c WHERE c.section_id = sections.id),
(SELECT count(*) FROM channels c WHERE c.section_id = sections.id AND c.is_active = true),
(SELECT count(*) FROM messages m JOIN channels c ON c.id = m.channel_id WHERE c.section_id = sections.id),
(SELECT count(*) FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id WHERE c.section_id = sections.id),
0::bigint
`, args...)
item, err := scanSectionRow(row)
@@ -496,7 +499,7 @@ func (a *app) findSection(ctx context.Context, vertical, slug string, scope acce
SELECT s.id, s.vertical, COALESCE(s.department_id, ''), s.slug, s.title, COALESCE(s.emoji, ''), COALESCE(s.description, ''), s.created_at,
(SELECT count(*) FROM channels c WHERE c.section_id = s.id),
(SELECT count(*) FROM channels c WHERE c.section_id = s.id AND c.is_active = true),
(SELECT count(*) FROM messages m JOIN channels c ON c.id = m.channel_id WHERE c.section_id = s.id),
(SELECT count(*) FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id WHERE c.section_id = s.id),
0::bigint
FROM sections s
WHERE `+where+`
@@ -536,10 +539,13 @@ func (a *app) listChannels(ctx context.Context, w http.ResponseWriter, r *http.R
where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
}
rows, err := a.db.Query(ctx, `
SELECT c.id, c.tg_id, c.identifier, c.title, c.vertical, c.section_id, s.slug,
c.is_active, c.last_message_id, c.last_polled_at, c.created_at
SELECT c.id, COALESCE(c.tg_id, src.tg_id), c.source_channel_id,
c.identifier, COALESCE(c.title, src.title), c.vertical, c.section_id, s.slug,
c.is_active, COALESCE(c.last_message_id, src.last_message_id),
COALESCE(c.last_polled_at, src.last_polled_at), c.created_at
FROM channels c
JOIN sections s ON s.id = c.section_id
LEFT JOIN channels src ON src.id = c.source_channel_id
WHERE `+where+`
ORDER BY c.created_at DESC, c.id DESC
`, args...)
@@ -592,7 +598,9 @@ func (a *app) createChannel(ctx context.Context, w http.ResponseWriter, r *http.
row := a.db.QueryRow(ctx, `
INSERT INTO channels (identifier, vertical, section_id, is_active)
VALUES ($1, $2, $3, true)
RETURNING id, tg_id, identifier, title, vertical, section_id, $4::text, is_active, last_message_id, last_polled_at, created_at
ON CONFLICT ON CONSTRAINT uq_channels_section_identifier DO UPDATE
SET is_active = true
RETURNING id, tg_id, source_channel_id, identifier, title, vertical, section_id, $4::text, is_active, last_message_id, last_polled_at, created_at
`, payload.Identifier, payload.Vertical, section.ID, section.Slug)
item, err := scanChannelRow(row)
if err != nil {
@@ -702,7 +710,7 @@ func (a *app) updateChannel(ctx context.Context, w http.ResponseWriter, r *http.
UPDATE channels
SET `+strings.Join(set, ", ")+`
WHERE id = $`+strconv.Itoa(len(args))+`
RETURNING id, tg_id, identifier, title, vertical, section_id,
RETURNING id, tg_id, source_channel_id, identifier, title, vertical, section_id,
(SELECT slug FROM sections WHERE id = channels.section_id),
is_active, last_message_id, last_polled_at, created_at
`, args...)
@@ -746,10 +754,13 @@ func (a *app) findChannel(ctx context.Context, id int64, scope accessScope, vert
where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
}
row := a.db.QueryRow(ctx, `
SELECT c.id, c.tg_id, c.identifier, c.title, c.vertical, c.section_id, s.slug,
c.is_active, c.last_message_id, c.last_polled_at, c.created_at
SELECT c.id, COALESCE(c.tg_id, src.tg_id), c.source_channel_id,
c.identifier, COALESCE(c.title, src.title), c.vertical, c.section_id, s.slug,
c.is_active, COALESCE(c.last_message_id, src.last_message_id),
COALESCE(c.last_polled_at, src.last_polled_at), c.created_at
FROM channels c
JOIN sections s ON s.id = c.section_id
LEFT JOIN channels src ON src.id = c.source_channel_id
WHERE `+where+`
`, args...)
return scanChannelRow(row)
@@ -772,16 +783,21 @@ func (a *app) channelStats(ctx context.Context, w http.ResponseWriter, r *http.R
key = "hr_lead"
field = "is_lead"
}
if err := a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE channel_id = $1`, id).Scan(&messages); err != nil {
sourceID := id
if ch.SourceChannelID != nil {
sourceID = *ch.SourceChannelID
}
if err := a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE channel_id = $1`, sourceID).Scan(&messages); err != nil {
writeDBError(w, err)
return
}
if err := a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE channel_id = $1 AND extracted -> $2 ->> $3 = 'true'`, id, key, field).Scan(&leads); err != nil {
if err := a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE channel_id = $1 AND extracted -> $2 ->> $3 = 'true'`, sourceID, key, field).Scan(&leads); err != nil {
writeDBError(w, err)
return
}
writeJSON(w, http.StatusOK, map[string]any{
"channel_id": id,
"source_channel_id": sourceID,
"messages_total": messages,
"leads_total": leads,
"last_polled_at": ch.LastPolledAt,
@@ -800,13 +816,17 @@ func (a *app) reanalyzeChannel(ctx context.Context, w http.ResponseWriter, r *ht
return
}
key := verdictKey(ch.Vertical)
sourceID := id
if ch.SourceChannelID != nil {
sourceID = *ch.SourceChannelID
}
tag, err := a.db.Exec(ctx, `
UPDATE messages
SET extracted = (
CASE WHEN jsonb_typeof(extracted) = 'object' THEN extracted ELSE '{}'::jsonb END
) - $1
WHERE channel_id = $2
`, key, id)
`, key, sourceID)
if err != nil {
writeDBError(w, err)
return
@@ -848,7 +868,7 @@ func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http
return
}
args = append(args, id)
where += fmt.Sprintf(" AND m.channel_id = $%d", len(args))
where += fmt.Sprintf(" AND c.id = $%d", len(args))
}
if text := strings.TrimSpace(q.Get("q")); text != "" {
args = append(args, "%"+text+"%")
@@ -871,12 +891,13 @@ func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http
fetchLimit := clampInt(limit*5, limit, 1000)
args = append(args, fetchLimit, offset)
rows, err := a.db.Query(ctx, `
SELECT m.id, m.channel_id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int,
SELECT m.id, c.id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int,
m.date, m.text, m.sender_id, m.sender_username, m.sender_name,
c.identifier, c.tg_id, m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text,
COALESCE(src.identifier, c.identifier), COALESCE(src.tg_id, c.tg_id), m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text,
COALESCE(m.media_files, '[]'::jsonb)::text, m.views, m.forwards, m.fetched_at
FROM messages m
JOIN channels c ON c.id = m.channel_id
FROM channels c
JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
JOIN messages m ON m.channel_id = src.id
JOIN sections s ON s.id = c.section_id
WHERE `+where+`
ORDER BY m.date DESC, m.id DESC
@@ -927,14 +948,17 @@ func (a *app) handleMessageItem(ctx context.Context, w http.ResponseWriter, r *h
where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
}
row := a.db.QueryRow(ctx, `
SELECT m.id, m.channel_id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int,
SELECT m.id, c.id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int,
m.date, m.text, m.sender_id, m.sender_username, m.sender_name,
c.identifier, c.tg_id, m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text,
COALESCE(src.identifier, c.identifier), COALESCE(src.tg_id, c.tg_id), m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text,
COALESCE(m.media_files, '[]'::jsonb)::text, m.views, m.forwards, m.fetched_at
FROM messages m
JOIN channels c ON c.id = m.channel_id
JOIN channels src ON src.id = m.channel_id
JOIN channels c ON c.id = src.id OR c.source_channel_id = src.id
JOIN sections s ON s.id = c.section_id
WHERE `+where+`
ORDER BY CASE WHEN c.id = src.id THEN 0 ELSE 1 END
LIMIT 1
`, args...)
item, err := scanMessageRow(row)
if err != nil {
@@ -1022,23 +1046,20 @@ func (a *app) serveMinioMedia(w http.ResponseWriter, r *http.Request, key string
}
func (a *app) canReadChannelMedia(ctx context.Context, scope accessScope, channelID int64) (bool, error) {
var dept sql.NullString
var allowed bool
err := a.db.QueryRow(ctx, `
SELECT s.department_id
SELECT COALESCE(bool_or(s.department_id = $2 OR $3::boolean), false)
FROM channels c
JOIN sections s ON s.id = c.section_id
WHERE c.id = $1
`, channelID).Scan(&dept)
WHERE c.id = $1 OR c.source_channel_id = $1
`, channelID, scope.DeptID, scope.IsAdmin).Scan(&allowed)
if errors.Is(err, pgx.ErrNoRows) {
return false, nil
}
if err != nil {
return false, err
}
if scope.IsAdmin {
return true, nil
}
return dept.Valid && dept.String == scope.DeptID, nil
return allowed, nil
}
func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Request) {
@@ -1081,7 +1102,7 @@ func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Re
writeDBError(w, err)
return
}
messageFrom := `FROM messages m JOIN channels c ON c.id = m.channel_id JOIN sections s ON s.id = c.section_id WHERE ` + where
messageFrom := `FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id JOIN sections s ON s.id = c.section_id WHERE ` + where
if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom, args...).Scan(&messagesTotal); err != nil {
writeDBError(w, err)
return
@@ -1186,8 +1207,9 @@ func (a *app) pendingLLM(ctx context.Context, scope accessScope, vertical, secti
var pending int64
err := a.db.QueryRow(ctx, `
SELECT count(*)
FROM messages m
JOIN channels c ON c.id = m.channel_id
FROM channels c
JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
JOIN messages m ON m.channel_id = src.id
JOIN sections s ON s.id = c.section_id
WHERE `+where, args...).Scan(&pending)
return pending, err
@@ -1437,12 +1459,13 @@ func scanSectionRow(row pgx.Row) (sectionOut, error) {
func scanChannel(rows rowScanner) (channelOut, error) {
var item channelOut
var tgID, lastMsg sql.NullInt64
var tgID, sourceID, lastMsg sql.NullInt64
var title, slug sql.NullString
var lastPoll sql.NullTime
err := rows.Scan(
&item.ID,
&tgID,
&sourceID,
&item.Identifier,
&title,
&item.Vertical,
@@ -1454,6 +1477,7 @@ func scanChannel(rows rowScanner) (channelOut, error) {
&item.CreatedAt,
)
item.TGID = nullInt(tgID)
item.SourceChannelID = nullInt(sourceID)
item.Title = nullString(title)
item.SectionSlug = nullString(slug)
item.LastMessageID = nullInt(lastMsg)

View File

@@ -54,12 +54,16 @@ class Section(Base):
class Channel(Base):
__tablename__ = "channels"
__table_args__ = (
UniqueConstraint("section_id", "identifier", name="uq_channels_section_identifier"),
Index("ix_channels_source_channel_id", "source_channel_id"),
)
id: Mapped[int] = mapped_column(primary_key=True)
# Telegram numeric channel id (peer id), nullable until first resolve
tg_id: Mapped[int | None] = mapped_column(BigInteger, unique=True, nullable=True)
# Username or t.me/joinchat link supplied by user
identifier: Mapped[str] = mapped_column(String(255), unique=True)
identifier: Mapped[str] = mapped_column(String(255))
title: Mapped[str | None] = mapped_column(String(512), nullable=True)
# 'real_estate' or 'hr' — picks which LLM prompt and lead schema is used
vertical: Mapped[str] = mapped_column(
@@ -68,6 +72,9 @@ class Channel(Base):
section_id: Mapped[int] = mapped_column(
ForeignKey("sections.id", ondelete="RESTRICT"), index=True
)
source_channel_id: Mapped[int | None] = mapped_column(
ForeignKey("channels.id", ondelete="SET NULL"), nullable=True
)
is_active: Mapped[bool] = mapped_column(default=True, server_default="true")
last_message_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
last_polled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)

View File

@@ -92,6 +92,14 @@ async def poll_channel(channel_id: int) -> int:
channel = await session.get(Channel, channel_id)
if channel is None or not channel.is_active:
return 0
if channel.source_channel_id is not None:
source = await session.get(Channel, channel.source_channel_id)
if source is not None:
channel.tg_id = None
channel.title = channel.title or source.title
channel.last_message_id = source.last_message_id
channel.last_polled_at = source.last_polled_at
return 0
if channel.tg_id is None or channel.title is None:
try:
@@ -108,9 +116,19 @@ async def poll_channel(channel_id: int) -> int:
)
).scalar_one_or_none()
if duplicate_id is not None:
raise PollDuplicateChannelError(
f"Telegram channel is already connected to channel #{duplicate_id}"
source = await session.get(Channel, duplicate_id)
channel.source_channel_id = duplicate_id
channel.tg_id = None
channel.title = channel.title or resolved.title or (source.title if source else None)
channel.last_message_id = source.last_message_id if source else channel.last_message_id
channel.last_polled_at = source.last_polled_at if source else channel.last_polled_at
log.info(
"linked_channel_alias",
channel_id=channel.id,
source_channel_id=duplicate_id,
tg_id=resolved.tg_id,
)
return 0
channel.tg_id = resolved.tg_id
channel.title = resolved.title
try: