From bd3b54dc7d7d5ddb1b56c74b32ad05fd6821bb6b Mon Sep 17 00:00:00 2001 From: Grendgi Date: Wed, 17 Jun 2026 17:01:34 +0300 Subject: [PATCH] feat: track monitoring tg poll errors --- alembic/versions/0013_channel_poll_status.py | 34 ++++++++++++++++++ cmd/server/main.go | 38 ++++++++++++++++++++ src/parser_bot/db/models.py | 4 +++ src/parser_bot/scheduler/poller.py | 34 ++++++++++++++++++ 4 files changed, 110 insertions(+) create mode 100644 alembic/versions/0013_channel_poll_status.py diff --git a/alembic/versions/0013_channel_poll_status.py b/alembic/versions/0013_channel_poll_status.py new file mode 100644 index 0000000..052cdfd --- /dev/null +++ b/alembic/versions/0013_channel_poll_status.py @@ -0,0 +1,34 @@ +"""channel poll status + +Revision ID: 0013 +Revises: 0012 +Create Date: 2026-06-17 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +revision: str = "0013" +down_revision: Union[str, None] = "0012" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column("channels", sa.Column("last_poll_status", sa.String(length=32), nullable=True)) + op.add_column("channels", sa.Column("last_poll_error_code", sa.String(length=64), nullable=True)) + op.add_column("channels", sa.Column("last_poll_error", sa.Text(), nullable=True)) + op.add_column("channels", sa.Column("last_poll_error_at", sa.DateTime(timezone=True), nullable=True)) + op.create_index("ix_channels_last_poll_status", "channels", ["last_poll_status"]) + op.create_index("ix_channels_last_poll_error_code", "channels", ["last_poll_error_code"]) + + +def downgrade() -> None: + op.drop_index("ix_channels_last_poll_error_code", table_name="channels") + op.drop_index("ix_channels_last_poll_status", table_name="channels") + op.drop_column("channels", "last_poll_error_at") + op.drop_column("channels", "last_poll_error") + op.drop_column("channels", "last_poll_error_code") + op.drop_column("channels", "last_poll_status") diff --git a/cmd/server/main.go b/cmd/server/main.go index 47b78a2..9a39433 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -266,6 +266,7 @@ func (a *app) handleHealthDetail(w http.ResponseWriter, r *http.Request) { a.probeClassificationQueue(ctx), a.probeAIJobs(ctx), a.probePoller(ctx), + a.probePollErrors(ctx), a.probeMediaStorage(ctx), a.probeMediaMetadata(ctx), } @@ -423,6 +424,43 @@ func (a *app) probePoller(ctx context.Context) componentProbe { return componentProbe{Name: "poller", Status: "ok", LatencyMs: time.Since(start).Milliseconds()} } +func (a *app) probePollErrors(ctx context.Context) componentProbe { + start := time.Now() + var total, floodWait, unavailable, other int64 + err := a.db.QueryRow(ctx, ` + SELECT + COUNT(*) FILTER (WHERE last_poll_status = 'error')::bigint, + COUNT(*) FILTER (WHERE last_poll_status = 'error' AND last_poll_error_code = 'telegram_flood_wait')::bigint, + COUNT(*) FILTER (WHERE last_poll_status = 'error' AND last_poll_error_code = 'telegram_channel_unavailable')::bigint, + COUNT(*) FILTER ( + WHERE last_poll_status = 'error' + AND COALESCE(last_poll_error_code, '') NOT IN ('telegram_flood_wait', 'telegram_channel_unavailable') + )::bigint + FROM channels + WHERE is_active = true + AND source_channel_id IS NULL`, + ).Scan(&total, &floodWait, &unavailable, &other) + if err != nil { + return componentProbe{Name: "poll_errors", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()} + } + if total > 0 { + status := "degraded" + if other > 0 { + status = "down" + } + return componentProbe{ + Name: "poll_errors", + Status: status, + LatencyMs: time.Since(start).Milliseconds(), + Error: "total=" + strconv.FormatInt(total, 10) + + " flood_wait=" + strconv.FormatInt(floodWait, 10) + + " unavailable=" + strconv.FormatInt(unavailable, 10) + + " other=" + strconv.FormatInt(other, 10), + } + } + return componentProbe{Name: "poll_errors", Status: "ok", LatencyMs: time.Since(start).Milliseconds()} +} + func (a *app) probeMediaStorage(ctx context.Context) componentProbe { start := time.Now() if a.minio == nil || a.cfg.MinioBucket == "" { diff --git a/src/parser_bot/db/models.py b/src/parser_bot/db/models.py index add0244..0e65b81 100644 --- a/src/parser_bot/db/models.py +++ b/src/parser_bot/db/models.py @@ -78,6 +78,10 @@ class Channel(Base): is_active: Mapped[bool] = mapped_column(default=True, server_default="true") last_message_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True) last_polled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + last_poll_status: Mapped[str | None] = mapped_column(String(32), nullable=True) + last_poll_error_code: Mapped[str | None] = mapped_column(String(64), nullable=True) + last_poll_error: Mapped[str | None] = mapped_column(Text, nullable=True) + last_poll_error_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), server_default=func.now() ) diff --git a/src/parser_bot/scheduler/poller.py b/src/parser_bot/scheduler/poller.py index a2531ec..428bf0a 100644 --- a/src/parser_bot/scheduler/poller.py +++ b/src/parser_bot/scheduler/poller.py @@ -83,6 +83,17 @@ def _translate_telegram_error(exc: Exception, identifier: str) -> PollError: async def poll_channel(channel_id: int) -> int: """Poll one channel for new messages. Returns count of inserted rows.""" + try: + return await _poll_channel(channel_id) + except PollError as exc: + await _record_poll_error(channel_id, exc.code, exc.message) + raise + except Exception as exc: + await _record_poll_error(channel_id, "poll_failed", str(exc)) + raise + + +async def _poll_channel(channel_id: int) -> int: if not await is_authorized(): raise PollUnauthorizedError( "Telegram is not authorized: open Monitoring TG in Portal and authorize it" @@ -99,6 +110,10 @@ async def poll_channel(channel_id: int) -> int: channel.title = channel.title or source.title channel.last_message_id = source.last_message_id channel.last_polled_at = source.last_polled_at + channel.last_poll_status = "alias" + channel.last_poll_error_code = None + channel.last_poll_error = None + channel.last_poll_error_at = None return 0 if channel.tg_id is None or channel.title is None: @@ -122,6 +137,10 @@ async def poll_channel(channel_id: int) -> int: channel.title = channel.title or resolved.title or (source.title if source else None) channel.last_message_id = source.last_message_id if source else channel.last_message_id channel.last_polled_at = source.last_polled_at if source else channel.last_polled_at + channel.last_poll_status = "alias" + channel.last_poll_error_code = None + channel.last_poll_error = None + channel.last_poll_error_at = None log.info( "linked_channel_alias", channel_id=channel.id, @@ -180,6 +199,10 @@ async def poll_channel(channel_id: int) -> int: channel.last_message_id or 0, msgs[-1].tg_message_id ) channel.last_polled_at = datetime.now(timezone.utc) + channel.last_poll_status = "ok" + channel.last_poll_error_code = None + channel.last_poll_error = None + channel.last_poll_error_at = None log.info( "polled_channel", @@ -191,6 +214,17 @@ async def poll_channel(channel_id: int) -> int: return inserted +async def _record_poll_error(channel_id: int, code: str, message: str) -> None: + async with session_scope() as session: + channel = await session.get(Channel, channel_id) + if channel is None: + return + channel.last_poll_status = "error" + channel.last_poll_error_code = code + channel.last_poll_error = message[:1000] + channel.last_poll_error_at = datetime.now(timezone.utc) + + async def poll_all() -> None: if not await is_authorized(): log.debug("poll_skipped_not_authorized")