diff --git a/alembic/versions/0011_channel_aliases.py b/alembic/versions/0011_channel_aliases.py new file mode 100644 index 0000000..716bcf5 --- /dev/null +++ b/alembic/versions/0011_channel_aliases.py @@ -0,0 +1,43 @@ +"""channel aliases across department sections + +Revision ID: 0011 +Revises: 0010 +Create Date: 2026-06-08 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +revision: str = "0011" +down_revision: Union[str, None] = "0010" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column("channels", sa.Column("source_channel_id", sa.Integer(), nullable=True)) + op.create_foreign_key( + "fk_channels_source_channel_id", + "channels", + "channels", + ["source_channel_id"], + ["id"], + ondelete="SET NULL", + ) + op.drop_constraint("channels_identifier_key", "channels", type_="unique") + op.create_unique_constraint( + "uq_channels_section_identifier", + "channels", + ["section_id", "identifier"], + ) + op.create_index("ix_channels_source_channel_id", "channels", ["source_channel_id"]) + + +def downgrade() -> None: + op.drop_index("ix_channels_source_channel_id", table_name="channels") + op.drop_constraint("uq_channels_section_identifier", "channels", type_="unique") + op.create_unique_constraint("channels_identifier_key", "channels", ["identifier"]) + op.drop_constraint("fk_channels_source_channel_id", "channels", type_="foreignkey") + op.drop_column("channels", "source_channel_id") diff --git a/cmd/server/main.go b/cmd/server/main.go index 1f30bff..10e63cc 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -90,17 +90,18 @@ type sectionOut struct { } type channelOut struct { - ID int64 `json:"id"` - TGID *int64 `json:"tg_id,omitempty"` - Identifier string `json:"identifier"` - Title *string `json:"title,omitempty"` - Vertical string `json:"vertical"` - SectionID int64 `json:"section_id"` - SectionSlug *string `json:"section_slug,omitempty"` - IsActive bool `json:"is_active"` - LastMessageID *int64 `json:"last_message_id,omitempty"` - LastPolledAt *time.Time `json:"last_polled_at,omitempty"` - CreatedAt time.Time `json:"created_at"` + ID int64 `json:"id"` + TGID *int64 `json:"tg_id,omitempty"` + SourceChannelID *int64 `json:"source_channel_id,omitempty"` + Identifier string `json:"identifier"` + Title *string `json:"title,omitempty"` + Vertical string `json:"vertical"` + SectionID int64 `json:"section_id"` + SectionSlug *string `json:"section_slug,omitempty"` + IsActive bool `json:"is_active"` + LastMessageID *int64 `json:"last_message_id,omitempty"` + LastPolledAt *time.Time `json:"last_polled_at,omitempty"` + CreatedAt time.Time `json:"created_at"` } type messageOut struct { @@ -287,12 +288,14 @@ func (a *app) listSections(ctx context.Context, w http.ResponseWriter, r *http.R (SELECT count(*) FROM channels c WHERE c.section_id = s.id), (SELECT count(*) FROM channels c WHERE c.section_id = s.id AND c.is_active = true), (SELECT count(*) - FROM messages m - JOIN channels c ON c.id = m.channel_id + FROM channels c + JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) + JOIN messages m ON m.channel_id = src.id WHERE c.section_id = s.id), (SELECT count(*) - FROM messages m - JOIN channels c ON c.id = m.channel_id + FROM channels c + JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) + JOIN messages m ON m.channel_id = src.id WHERE c.section_id = s.id AND ( (s.vertical = 'hr' AND m.extracted->'hr_lead'->>'is_lead' = 'true') @@ -448,7 +451,7 @@ func (a *app) updateSection(ctx context.Context, w http.ResponseWriter, r *http. RETURNING id, vertical, COALESCE(department_id, ''), slug, title, COALESCE(emoji, ''), COALESCE(description, ''), created_at, (SELECT count(*) FROM channels c WHERE c.section_id = sections.id), (SELECT count(*) FROM channels c WHERE c.section_id = sections.id AND c.is_active = true), - (SELECT count(*) FROM messages m JOIN channels c ON c.id = m.channel_id WHERE c.section_id = sections.id), + (SELECT count(*) FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id WHERE c.section_id = sections.id), 0::bigint `, args...) item, err := scanSectionRow(row) @@ -496,7 +499,7 @@ func (a *app) findSection(ctx context.Context, vertical, slug string, scope acce SELECT s.id, s.vertical, COALESCE(s.department_id, ''), s.slug, s.title, COALESCE(s.emoji, ''), COALESCE(s.description, ''), s.created_at, (SELECT count(*) FROM channels c WHERE c.section_id = s.id), (SELECT count(*) FROM channels c WHERE c.section_id = s.id AND c.is_active = true), - (SELECT count(*) FROM messages m JOIN channels c ON c.id = m.channel_id WHERE c.section_id = s.id), + (SELECT count(*) FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id WHERE c.section_id = s.id), 0::bigint FROM sections s WHERE `+where+` @@ -536,10 +539,13 @@ func (a *app) listChannels(ctx context.Context, w http.ResponseWriter, r *http.R where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) } rows, err := a.db.Query(ctx, ` - SELECT c.id, c.tg_id, c.identifier, c.title, c.vertical, c.section_id, s.slug, - c.is_active, c.last_message_id, c.last_polled_at, c.created_at + SELECT c.id, COALESCE(c.tg_id, src.tg_id), c.source_channel_id, + c.identifier, COALESCE(c.title, src.title), c.vertical, c.section_id, s.slug, + c.is_active, COALESCE(c.last_message_id, src.last_message_id), + COALESCE(c.last_polled_at, src.last_polled_at), c.created_at FROM channels c JOIN sections s ON s.id = c.section_id + LEFT JOIN channels src ON src.id = c.source_channel_id WHERE `+where+` ORDER BY c.created_at DESC, c.id DESC `, args...) @@ -592,7 +598,9 @@ func (a *app) createChannel(ctx context.Context, w http.ResponseWriter, r *http. row := a.db.QueryRow(ctx, ` INSERT INTO channels (identifier, vertical, section_id, is_active) VALUES ($1, $2, $3, true) - RETURNING id, tg_id, identifier, title, vertical, section_id, $4::text, is_active, last_message_id, last_polled_at, created_at + ON CONFLICT ON CONSTRAINT uq_channels_section_identifier DO UPDATE + SET is_active = true + RETURNING id, tg_id, source_channel_id, identifier, title, vertical, section_id, $4::text, is_active, last_message_id, last_polled_at, created_at `, payload.Identifier, payload.Vertical, section.ID, section.Slug) item, err := scanChannelRow(row) if err != nil { @@ -702,7 +710,7 @@ func (a *app) updateChannel(ctx context.Context, w http.ResponseWriter, r *http. UPDATE channels SET `+strings.Join(set, ", ")+` WHERE id = $`+strconv.Itoa(len(args))+` - RETURNING id, tg_id, identifier, title, vertical, section_id, + RETURNING id, tg_id, source_channel_id, identifier, title, vertical, section_id, (SELECT slug FROM sections WHERE id = channels.section_id), is_active, last_message_id, last_polled_at, created_at `, args...) @@ -746,10 +754,13 @@ func (a *app) findChannel(ctx context.Context, id int64, scope accessScope, vert where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) } row := a.db.QueryRow(ctx, ` - SELECT c.id, c.tg_id, c.identifier, c.title, c.vertical, c.section_id, s.slug, - c.is_active, c.last_message_id, c.last_polled_at, c.created_at + SELECT c.id, COALESCE(c.tg_id, src.tg_id), c.source_channel_id, + c.identifier, COALESCE(c.title, src.title), c.vertical, c.section_id, s.slug, + c.is_active, COALESCE(c.last_message_id, src.last_message_id), + COALESCE(c.last_polled_at, src.last_polled_at), c.created_at FROM channels c JOIN sections s ON s.id = c.section_id + LEFT JOIN channels src ON src.id = c.source_channel_id WHERE `+where+` `, args...) return scanChannelRow(row) @@ -772,20 +783,25 @@ func (a *app) channelStats(ctx context.Context, w http.ResponseWriter, r *http.R key = "hr_lead" field = "is_lead" } - if err := a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE channel_id = $1`, id).Scan(&messages); err != nil { + sourceID := id + if ch.SourceChannelID != nil { + sourceID = *ch.SourceChannelID + } + if err := a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE channel_id = $1`, sourceID).Scan(&messages); err != nil { writeDBError(w, err) return } - if err := a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE channel_id = $1 AND extracted -> $2 ->> $3 = 'true'`, id, key, field).Scan(&leads); err != nil { + if err := a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE channel_id = $1 AND extracted -> $2 ->> $3 = 'true'`, sourceID, key, field).Scan(&leads); err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, map[string]any{ - "channel_id": id, - "messages_total": messages, - "leads_total": leads, - "last_polled_at": ch.LastPolledAt, - "last_message_id": ch.LastMessageID, + "channel_id": id, + "source_channel_id": sourceID, + "messages_total": messages, + "leads_total": leads, + "last_polled_at": ch.LastPolledAt, + "last_message_id": ch.LastMessageID, }) } @@ -800,13 +816,17 @@ func (a *app) reanalyzeChannel(ctx context.Context, w http.ResponseWriter, r *ht return } key := verdictKey(ch.Vertical) + sourceID := id + if ch.SourceChannelID != nil { + sourceID = *ch.SourceChannelID + } tag, err := a.db.Exec(ctx, ` UPDATE messages SET extracted = ( CASE WHEN jsonb_typeof(extracted) = 'object' THEN extracted ELSE '{}'::jsonb END ) - $1 WHERE channel_id = $2 - `, key, id) + `, key, sourceID) if err != nil { writeDBError(w, err) return @@ -848,7 +868,7 @@ func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http return } args = append(args, id) - where += fmt.Sprintf(" AND m.channel_id = $%d", len(args)) + where += fmt.Sprintf(" AND c.id = $%d", len(args)) } if text := strings.TrimSpace(q.Get("q")); text != "" { args = append(args, "%"+text+"%") @@ -871,12 +891,13 @@ func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http fetchLimit := clampInt(limit*5, limit, 1000) args = append(args, fetchLimit, offset) rows, err := a.db.Query(ctx, ` - SELECT m.id, m.channel_id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int, + SELECT m.id, c.id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int, m.date, m.text, m.sender_id, m.sender_username, m.sender_name, - c.identifier, c.tg_id, m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text, + COALESCE(src.identifier, c.identifier), COALESCE(src.tg_id, c.tg_id), m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text, COALESCE(m.media_files, '[]'::jsonb)::text, m.views, m.forwards, m.fetched_at - FROM messages m - JOIN channels c ON c.id = m.channel_id + FROM channels c + JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) + JOIN messages m ON m.channel_id = src.id JOIN sections s ON s.id = c.section_id WHERE `+where+` ORDER BY m.date DESC, m.id DESC @@ -927,14 +948,17 @@ func (a *app) handleMessageItem(ctx context.Context, w http.ResponseWriter, r *h where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) } row := a.db.QueryRow(ctx, ` - SELECT m.id, m.channel_id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int, + SELECT m.id, c.id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int, m.date, m.text, m.sender_id, m.sender_username, m.sender_name, - c.identifier, c.tg_id, m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text, + COALESCE(src.identifier, c.identifier), COALESCE(src.tg_id, c.tg_id), m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text, COALESCE(m.media_files, '[]'::jsonb)::text, m.views, m.forwards, m.fetched_at FROM messages m - JOIN channels c ON c.id = m.channel_id + JOIN channels src ON src.id = m.channel_id + JOIN channels c ON c.id = src.id OR c.source_channel_id = src.id JOIN sections s ON s.id = c.section_id WHERE `+where+` + ORDER BY CASE WHEN c.id = src.id THEN 0 ELSE 1 END + LIMIT 1 `, args...) item, err := scanMessageRow(row) if err != nil { @@ -1022,23 +1046,20 @@ func (a *app) serveMinioMedia(w http.ResponseWriter, r *http.Request, key string } func (a *app) canReadChannelMedia(ctx context.Context, scope accessScope, channelID int64) (bool, error) { - var dept sql.NullString + var allowed bool err := a.db.QueryRow(ctx, ` - SELECT s.department_id + SELECT COALESCE(bool_or(s.department_id = $2 OR $3::boolean), false) FROM channels c JOIN sections s ON s.id = c.section_id - WHERE c.id = $1 - `, channelID).Scan(&dept) + WHERE c.id = $1 OR c.source_channel_id = $1 + `, channelID, scope.DeptID, scope.IsAdmin).Scan(&allowed) if errors.Is(err, pgx.ErrNoRows) { return false, nil } if err != nil { return false, err } - if scope.IsAdmin { - return true, nil - } - return dept.Valid && dept.String == scope.DeptID, nil + return allowed, nil } func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Request) { @@ -1081,7 +1102,7 @@ func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Re writeDBError(w, err) return } - messageFrom := `FROM messages m JOIN channels c ON c.id = m.channel_id JOIN sections s ON s.id = c.section_id WHERE ` + where + messageFrom := `FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id JOIN sections s ON s.id = c.section_id WHERE ` + where if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom, args...).Scan(&messagesTotal); err != nil { writeDBError(w, err) return @@ -1186,8 +1207,9 @@ func (a *app) pendingLLM(ctx context.Context, scope accessScope, vertical, secti var pending int64 err := a.db.QueryRow(ctx, ` SELECT count(*) - FROM messages m - JOIN channels c ON c.id = m.channel_id + FROM channels c + JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) + JOIN messages m ON m.channel_id = src.id JOIN sections s ON s.id = c.section_id WHERE `+where, args...).Scan(&pending) return pending, err @@ -1437,12 +1459,13 @@ func scanSectionRow(row pgx.Row) (sectionOut, error) { func scanChannel(rows rowScanner) (channelOut, error) { var item channelOut - var tgID, lastMsg sql.NullInt64 + var tgID, sourceID, lastMsg sql.NullInt64 var title, slug sql.NullString var lastPoll sql.NullTime err := rows.Scan( &item.ID, &tgID, + &sourceID, &item.Identifier, &title, &item.Vertical, @@ -1454,6 +1477,7 @@ func scanChannel(rows rowScanner) (channelOut, error) { &item.CreatedAt, ) item.TGID = nullInt(tgID) + item.SourceChannelID = nullInt(sourceID) item.Title = nullString(title) item.SectionSlug = nullString(slug) item.LastMessageID = nullInt(lastMsg) diff --git a/src/parser_bot/db/models.py b/src/parser_bot/db/models.py index 9f8e20c..021075f 100644 --- a/src/parser_bot/db/models.py +++ b/src/parser_bot/db/models.py @@ -54,12 +54,16 @@ class Section(Base): class Channel(Base): __tablename__ = "channels" + __table_args__ = ( + UniqueConstraint("section_id", "identifier", name="uq_channels_section_identifier"), + Index("ix_channels_source_channel_id", "source_channel_id"), + ) id: Mapped[int] = mapped_column(primary_key=True) # Telegram numeric channel id (peer id), nullable until first resolve tg_id: Mapped[int | None] = mapped_column(BigInteger, unique=True, nullable=True) # Username or t.me/joinchat link supplied by user - identifier: Mapped[str] = mapped_column(String(255), unique=True) + identifier: Mapped[str] = mapped_column(String(255)) title: Mapped[str | None] = mapped_column(String(512), nullable=True) # 'real_estate' or 'hr' — picks which LLM prompt and lead schema is used vertical: Mapped[str] = mapped_column( @@ -68,6 +72,9 @@ class Channel(Base): section_id: Mapped[int] = mapped_column( ForeignKey("sections.id", ondelete="RESTRICT"), index=True ) + source_channel_id: Mapped[int | None] = mapped_column( + ForeignKey("channels.id", ondelete="SET NULL"), nullable=True + ) is_active: Mapped[bool] = mapped_column(default=True, server_default="true") last_message_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True) last_polled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) diff --git a/src/parser_bot/scheduler/poller.py b/src/parser_bot/scheduler/poller.py index 66d29e5..a2531ec 100644 --- a/src/parser_bot/scheduler/poller.py +++ b/src/parser_bot/scheduler/poller.py @@ -92,6 +92,14 @@ async def poll_channel(channel_id: int) -> int: channel = await session.get(Channel, channel_id) if channel is None or not channel.is_active: return 0 + if channel.source_channel_id is not None: + source = await session.get(Channel, channel.source_channel_id) + if source is not None: + channel.tg_id = None + channel.title = channel.title or source.title + channel.last_message_id = source.last_message_id + channel.last_polled_at = source.last_polled_at + return 0 if channel.tg_id is None or channel.title is None: try: @@ -108,9 +116,19 @@ async def poll_channel(channel_id: int) -> int: ) ).scalar_one_or_none() if duplicate_id is not None: - raise PollDuplicateChannelError( - f"Telegram channel is already connected to channel #{duplicate_id}" + source = await session.get(Channel, duplicate_id) + channel.source_channel_id = duplicate_id + channel.tg_id = None + channel.title = channel.title or resolved.title or (source.title if source else None) + channel.last_message_id = source.last_message_id if source else channel.last_message_id + channel.last_polled_at = source.last_polled_at if source else channel.last_polled_at + log.info( + "linked_channel_alias", + channel_id=channel.id, + source_channel_id=duplicate_id, + tg_id=resolved.tg_id, ) + return 0 channel.tg_id = resolved.tg_id channel.title = resolved.title try: