Handle Telegram poll failures gracefully
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 31s
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 31s
This commit is contained in:
@@ -14,7 +14,7 @@ from parser_bot.access import (
|
|||||||
from parser_bot.config import settings
|
from parser_bot.config import settings
|
||||||
from parser_bot.db.models import Channel, Section
|
from parser_bot.db.models import Channel, Section
|
||||||
from parser_bot.db.session import get_session
|
from parser_bot.db.session import get_session
|
||||||
from parser_bot.scheduler.poller import backfill_media, poll_channel
|
from parser_bot.scheduler.poller import PollError, backfill_media, poll_channel
|
||||||
from parser_bot.telegram import client as tg
|
from parser_bot.telegram import client as tg
|
||||||
|
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
@@ -72,6 +72,17 @@ async def _require_channel_scope(
|
|||||||
raise HTTPException(status_code=404)
|
raise HTTPException(status_code=404)
|
||||||
|
|
||||||
|
|
||||||
|
def _poll_http_error(exc: PollError) -> HTTPException:
    """Build the HTTP error response that mirrors a failed poll.

    The status code, the machine-readable ``code`` and the human-readable
    ``message`` are copied from ``exc``. A ``Retry-After`` header is attached
    only when ``exc.retry_after`` is set.

    Args:
        exc: The poll failure to expose to the API client.

    Returns:
        An ``HTTPException`` ready to be raised by the route handler.
    """
    retry_after = exc.retry_after
    return HTTPException(
        status_code=exc.status_code,
        detail={"code": exc.code, "message": exc.message},
        headers=None if retry_after is None else {"Retry-After": str(retry_after)},
    )
|
||||||
|
|
||||||
|
|
||||||
@router.get("/auth/status", response_model=AuthStatus, dependencies=[Depends(require_telegram_auth_manager)])
|
@router.get("/auth/status", response_model=AuthStatus, dependencies=[Depends(require_telegram_auth_manager)])
|
||||||
async def auth_status() -> AuthStatus:
|
async def auth_status() -> AuthStatus:
|
||||||
authorized = await tg.is_authorized()
|
authorized = await tg.is_authorized()
|
||||||
@@ -128,7 +139,10 @@ async def trigger_poll(
|
|||||||
session: AsyncSession = Depends(get_session),
|
session: AsyncSession = Depends(get_session),
|
||||||
) -> dict[str, int]:
|
) -> dict[str, int]:
|
||||||
await _require_channel_scope(session, request, channel_id, vertical, section)
|
await _require_channel_scope(session, request, channel_id, vertical, section)
|
||||||
|
try:
|
||||||
inserted = await poll_channel(channel_id)
|
inserted = await poll_channel(channel_id)
|
||||||
|
except PollError as exc:
|
||||||
|
raise _poll_http_error(exc)
|
||||||
return {"inserted": inserted}
|
return {"inserted": inserted}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import structlog
|
|||||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||||
from sqlalchemy import func, select
|
from sqlalchemy import func, select
|
||||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||||
|
from sqlalchemy.exc import IntegrityError
|
||||||
|
|
||||||
from parser_bot.config import settings
|
from parser_bot.config import settings
|
||||||
from parser_bot.db.models import Channel, Message
|
from parser_bot.db.models import Channel, Message
|
||||||
@@ -19,24 +20,115 @@ from parser_bot.telegram.client import (
|
|||||||
log = structlog.get_logger()
|
log = structlog.get_logger()
|
||||||
|
|
||||||
|
|
||||||
|
class PollError(RuntimeError):
|
||||||
|
status_code = 400
|
||||||
|
code = "poll_failed"
|
||||||
|
retry_after: int | None = None
|
||||||
|
|
||||||
|
def __init__(self, message: str) -> None:
|
||||||
|
super().__init__(message)
|
||||||
|
self.message = message
|
||||||
|
|
||||||
|
|
||||||
|
class PollUnauthorizedError(PollError):
    """Polling was attempted while the Telegram session is not authorized."""

    code = "telegram_not_authorized"
    status_code = 401
|
||||||
|
|
||||||
|
|
||||||
|
class PollChannelUnavailableError(PollError):
    """The Telegram channel could not be found or accessed."""

    code = "telegram_channel_unavailable"
    status_code = 422
|
||||||
|
|
||||||
|
|
||||||
|
class PollFloodWaitError(PollError):
    """Telegram asked the caller to wait before sending further requests."""

    code = "telegram_flood_wait"
    status_code = 429

    def __init__(self, seconds: int | None) -> None:
        """Record the requested delay and build the user-facing message.

        Args:
            seconds: Delay requested by Telegram, or ``None`` when unknown.
        """
        # A missing or zero delay is described generically in the message.
        delay = "a while" if not seconds else f"{seconds} sec"
        self.retry_after = seconds
        super().__init__(f"Telegram asked to wait {delay} before retrying")
|
||||||
|
|
||||||
|
|
||||||
|
class PollDuplicateChannelError(PollError):
    """The resolved Telegram channel is already linked to another channel row."""

    code = "telegram_channel_duplicate"
    status_code = 409
|
||||||
|
|
||||||
|
|
||||||
|
def _translate_telegram_error(exc: Exception, identifier: str) -> PollError:
    """Map an exception raised during a Telegram call onto a ``PollError``.

    Classification uses the exception's class name and message text rather
    than ``isinstance`` checks (presumably so this module does not have to
    import the Telegram client's exception classes — confirm).

    Args:
        exc: The exception raised by the Telegram client call.
        identifier: Channel identifier, used in the generic fallback message.

    Returns:
        ``PollFloodWaitError`` for flood-wait responses,
        ``PollChannelUnavailableError`` when the channel cannot be resolved
        or accessed, otherwise a plain ``PollError``.
    """
    name = type(exc).__name__
    # Some exceptions (e.g. a bare ``TimeoutError()``) stringify to "", which
    # would produce a blank user-facing message; fall back to the class name.
    message = str(exc) or name
    lower = message.lower()

    if name == "FloodWaitError":
        return PollFloodWaitError(getattr(exc, "seconds", None))

    unavailable_names = {
        "ChannelPrivateError",
        "UsernameInvalidError",
        "UsernameNotOccupiedError",
        "InviteHashExpiredError",
        "InviteHashInvalidError",
        "ChatAdminRequiredError",
    }
    # Message fragments that indicate the entity cannot be resolved or read.
    unavailable_hints = (
        "cannot get entity",
        "cannot find any entity",
        "not part of",
        "join the group",
        "invalid channel",
    )
    if name in unavailable_names or any(hint in lower for hint in unavailable_hints):
        return PollChannelUnavailableError(message)

    return PollError(f"Cannot poll {identifier}: {message}")
|
||||||
|
|
||||||
|
|
||||||
async def poll_channel(channel_id: int) -> int:
|
async def poll_channel(channel_id: int) -> int:
|
||||||
"""Poll one channel for new messages. Returns count of inserted rows."""
|
"""Poll one channel for new messages. Returns count of inserted rows."""
|
||||||
|
if not await is_authorized():
|
||||||
|
raise PollUnauthorizedError(
|
||||||
|
"Telegram is not authorized: open Monitoring TG in Portal and authorize it"
|
||||||
|
)
|
||||||
|
|
||||||
async with session_scope() as session:
|
async with session_scope() as session:
|
||||||
channel = await session.get(Channel, channel_id)
|
channel = await session.get(Channel, channel_id)
|
||||||
if channel is None or not channel.is_active:
|
if channel is None or not channel.is_active:
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
if channel.tg_id is None or channel.title is None:
|
if channel.tg_id is None or channel.title is None:
|
||||||
|
try:
|
||||||
resolved = await resolve_channel(channel.identifier)
|
resolved = await resolve_channel(channel.identifier)
|
||||||
|
except Exception as exc:
|
||||||
|
raise _translate_telegram_error(exc, channel.identifier) from exc
|
||||||
|
|
||||||
|
duplicate_id = (
|
||||||
|
await session.execute(
|
||||||
|
select(Channel.id).where(
|
||||||
|
Channel.tg_id == resolved.tg_id,
|
||||||
|
Channel.id != channel.id,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
).scalar_one_or_none()
|
||||||
|
if duplicate_id is not None:
|
||||||
|
raise PollDuplicateChannelError(
|
||||||
|
f"Telegram channel is already connected to channel #{duplicate_id}"
|
||||||
|
)
|
||||||
channel.tg_id = resolved.tg_id
|
channel.tg_id = resolved.tg_id
|
||||||
channel.title = resolved.title
|
channel.title = resolved.title
|
||||||
|
try:
|
||||||
|
await session.flush()
|
||||||
|
except IntegrityError as exc:
|
||||||
|
raise PollDuplicateChannelError(
|
||||||
|
"Telegram channel is already connected to another channel"
|
||||||
|
) from exc
|
||||||
|
|
||||||
|
try:
|
||||||
msgs = await fetch_new_messages(
|
msgs = await fetch_new_messages(
|
||||||
channel.identifier,
|
channel.identifier,
|
||||||
min_id=channel.last_message_id,
|
min_id=channel.last_message_id,
|
||||||
limit=settings.poll_history_limit,
|
limit=settings.poll_history_limit,
|
||||||
download_media_for_channel_id=channel.id,
|
download_media_for_channel_id=channel.id,
|
||||||
)
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
raise _translate_telegram_error(exc, channel.identifier) from exc
|
||||||
|
|
||||||
inserted = 0
|
inserted = 0
|
||||||
for m in msgs:
|
for m in msgs:
|
||||||
@@ -93,6 +185,13 @@ async def poll_all() -> None:
|
|||||||
for channel_id in ids:
|
for channel_id in ids:
|
||||||
try:
|
try:
|
||||||
await poll_channel(channel_id)
|
await poll_channel(channel_id)
|
||||||
|
except PollError as exc:
|
||||||
|
log.warning(
|
||||||
|
"poll_skipped",
|
||||||
|
channel_id=channel_id,
|
||||||
|
code=exc.code,
|
||||||
|
error=exc.message,
|
||||||
|
)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
log.error("poll_failed", channel_id=channel_id, error=str(exc))
|
log.error("poll_failed", channel_id=channel_id, error=str(exc))
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user