def _poll_http_error(exc: PollError) -> HTTPException:
    """Build the HTTP response exception that mirrors a ``PollError``.

    The HTTP status is taken from ``exc.status_code`` and the response
    detail carries ``exc.code`` and ``exc.message``.  A ``Retry-After``
    header is attached only when ``exc.retry_after`` is not ``None``.

    Args:
        exc: The polling failure to expose to the API client.

    Returns:
        An ``HTTPException`` ready to be raised by a route handler.
    """
    # ``is not None`` (rather than truthiness) keeps a zero-second wait
    # visible to the client as ``Retry-After: 0``.
    retry_headers = (
        {"Retry-After": str(exc.retry_after)}
        if exc.retry_after is not None
        else None
    )
    return HTTPException(
        status_code=exc.status_code,
        detail={"code": exc.code, "message": exc.message},
        headers=retry_headers,
    )
a/src/parser_bot/scheduler/poller.py +++ b/src/parser_bot/scheduler/poller.py @@ -4,6 +4,7 @@ import structlog from apscheduler.schedulers.asyncio import AsyncIOScheduler from sqlalchemy import func, select from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.exc import IntegrityError from parser_bot.config import settings from parser_bot.db.models import Channel, Message @@ -19,24 +20,115 @@ from parser_bot.telegram.client import ( log = structlog.get_logger() +class PollError(RuntimeError): + status_code = 400 + code = "poll_failed" + retry_after: int | None = None + + def __init__(self, message: str) -> None: + super().__init__(message) + self.message = message + + +class PollUnauthorizedError(PollError): + status_code = 401 + code = "telegram_not_authorized" + + +class PollChannelUnavailableError(PollError): + status_code = 422 + code = "telegram_channel_unavailable" + + +class PollFloodWaitError(PollError): + status_code = 429 + code = "telegram_flood_wait" + + def __init__(self, seconds: int | None) -> None: + self.retry_after = seconds + wait = f"{seconds} sec" if seconds else "a while" + super().__init__(f"Telegram asked to wait {wait} before retrying") + + +class PollDuplicateChannelError(PollError): + status_code = 409 + code = "telegram_channel_duplicate" + + +def _translate_telegram_error(exc: Exception, identifier: str) -> PollError: + name = type(exc).__name__ + message = str(exc) + lower = message.lower() + if name == "FloodWaitError": + return PollFloodWaitError(getattr(exc, "seconds", None)) + if name in { + "ChannelPrivateError", + "UsernameInvalidError", + "UsernameNotOccupiedError", + "InviteHashExpiredError", + "InviteHashInvalidError", + "ChatAdminRequiredError", + }: + return PollChannelUnavailableError(message) + if ( + "cannot get entity" in lower + or "cannot find any entity" in lower + or "not part of" in lower + or "join the group" in lower + or "invalid channel" in lower + ): + return 
PollChannelUnavailableError(message) + return PollError(f"Cannot poll {identifier}: {message}") + + async def poll_channel(channel_id: int) -> int: """Poll one channel for new messages. Returns count of inserted rows.""" + if not await is_authorized(): + raise PollUnauthorizedError( + "Telegram is not authorized: open Monitoring TG in Portal and authorize it" + ) + async with session_scope() as session: channel = await session.get(Channel, channel_id) if channel is None or not channel.is_active: return 0 if channel.tg_id is None or channel.title is None: - resolved = await resolve_channel(channel.identifier) + try: + resolved = await resolve_channel(channel.identifier) + except Exception as exc: + raise _translate_telegram_error(exc, channel.identifier) from exc + + duplicate_id = ( + await session.execute( + select(Channel.id).where( + Channel.tg_id == resolved.tg_id, + Channel.id != channel.id, + ) + ) + ).scalar_one_or_none() + if duplicate_id is not None: + raise PollDuplicateChannelError( + f"Telegram channel is already connected to channel #{duplicate_id}" + ) channel.tg_id = resolved.tg_id channel.title = resolved.title + try: + await session.flush() + except IntegrityError as exc: + raise PollDuplicateChannelError( + "Telegram channel is already connected to another channel" + ) from exc - msgs = await fetch_new_messages( - channel.identifier, - min_id=channel.last_message_id, - limit=settings.poll_history_limit, - download_media_for_channel_id=channel.id, - ) + try: + msgs = await fetch_new_messages( + channel.identifier, + min_id=channel.last_message_id, + limit=settings.poll_history_limit, + download_media_for_channel_id=channel.id, + ) + except Exception as exc: + raise _translate_telegram_error(exc, channel.identifier) from exc inserted = 0 for m in msgs: @@ -93,6 +185,13 @@ async def poll_all() -> None: for channel_id in ids: try: await poll_channel(channel_id) + except PollError as exc: + log.warning( + "poll_skipped", + channel_id=channel_id, + 
code=exc.code, + error=exc.message, + ) except Exception as exc: log.error("poll_failed", channel_id=channel_id, error=str(exc))