feat: track monitoring tg poll errors
This commit is contained in:
34
alembic/versions/0013_channel_poll_status.py
Normal file
34
alembic/versions/0013_channel_poll_status.py
Normal file
@@ -0,0 +1,34 @@
|
||||
"""channel poll status
|
||||
|
||||
Revision ID: 0013
|
||||
Revises: 0012
|
||||
Create Date: 2026-06-17
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
# Alembic revision identifiers: this migration is "0013" and applies on top of "0012".
revision: str = "0013"
|
||||
down_revision: Union[str, None] = "0012"
|
||||
# No branch labels and no cross-revision dependencies are declared for this migration.
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
    """Add the nullable poll-outcome columns to ``channels`` and index two of them.

    Columns are added first, in declaration order; afterwards an index is
    created on ``last_poll_status`` and on ``last_poll_error_code``.
    """
    table = "channels"

    poll_columns = (
        sa.Column("last_poll_status", sa.String(length=32), nullable=True),
        sa.Column("last_poll_error_code", sa.String(length=64), nullable=True),
        sa.Column("last_poll_error", sa.Text(), nullable=True),
        sa.Column("last_poll_error_at", sa.DateTime(timezone=True), nullable=True),
    )
    for column in poll_columns:
        op.add_column(table, column)

    # Index names follow the ix_channels_<column> pattern.
    for indexed_column in ("last_poll_status", "last_poll_error_code"):
        op.create_index(f"ix_channels_{indexed_column}", table, [indexed_column])
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Undo ``upgrade``: drop both poll indexes, then the four poll columns.

    Everything is removed in the reverse of the order in which it was created.
    """
    table = "channels"

    index_names = (
        "ix_channels_last_poll_error_code",
        "ix_channels_last_poll_status",
    )
    for index_name in index_names:
        op.drop_index(index_name, table_name=table)

    column_names = (
        "last_poll_error_at",
        "last_poll_error",
        "last_poll_error_code",
        "last_poll_status",
    )
    for column_name in column_names:
        op.drop_column(table, column_name)
|
||||
@@ -266,6 +266,7 @@ func (a *app) handleHealthDetail(w http.ResponseWriter, r *http.Request) {
|
||||
a.probeClassificationQueue(ctx),
|
||||
a.probeAIJobs(ctx),
|
||||
a.probePoller(ctx),
|
||||
a.probePollErrors(ctx),
|
||||
a.probeMediaStorage(ctx),
|
||||
a.probeMediaMetadata(ctx),
|
||||
}
|
||||
@@ -423,6 +424,43 @@ func (a *app) probePoller(ctx context.Context) componentProbe {
|
||||
return componentProbe{Name: "poller", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||
}
|
||||
|
||||
func (a *app) probePollErrors(ctx context.Context) componentProbe {
|
||||
start := time.Now()
|
||||
var total, floodWait, unavailable, other int64
|
||||
err := a.db.QueryRow(ctx, `
|
||||
SELECT
|
||||
COUNT(*) FILTER (WHERE last_poll_status = 'error')::bigint,
|
||||
COUNT(*) FILTER (WHERE last_poll_status = 'error' AND last_poll_error_code = 'telegram_flood_wait')::bigint,
|
||||
COUNT(*) FILTER (WHERE last_poll_status = 'error' AND last_poll_error_code = 'telegram_channel_unavailable')::bigint,
|
||||
COUNT(*) FILTER (
|
||||
WHERE last_poll_status = 'error'
|
||||
AND COALESCE(last_poll_error_code, '') NOT IN ('telegram_flood_wait', 'telegram_channel_unavailable')
|
||||
)::bigint
|
||||
FROM channels
|
||||
WHERE is_active = true
|
||||
AND source_channel_id IS NULL`,
|
||||
).Scan(&total, &floodWait, &unavailable, &other)
|
||||
if err != nil {
|
||||
return componentProbe{Name: "poll_errors", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||
}
|
||||
if total > 0 {
|
||||
status := "degraded"
|
||||
if other > 0 {
|
||||
status = "down"
|
||||
}
|
||||
return componentProbe{
|
||||
Name: "poll_errors",
|
||||
Status: status,
|
||||
LatencyMs: time.Since(start).Milliseconds(),
|
||||
Error: "total=" + strconv.FormatInt(total, 10) +
|
||||
" flood_wait=" + strconv.FormatInt(floodWait, 10) +
|
||||
" unavailable=" + strconv.FormatInt(unavailable, 10) +
|
||||
" other=" + strconv.FormatInt(other, 10),
|
||||
}
|
||||
}
|
||||
return componentProbe{Name: "poll_errors", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||
}
|
||||
|
||||
func (a *app) probeMediaStorage(ctx context.Context) componentProbe {
|
||||
start := time.Now()
|
||||
if a.minio == nil || a.cfg.MinioBucket == "" {
|
||||
|
||||
@@ -78,6 +78,10 @@ class Channel(Base):
|
||||
is_active: Mapped[bool] = mapped_column(default=True, server_default="true")
|
||||
last_message_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
|
||||
last_polled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
|
||||
last_poll_status: Mapped[str | None] = mapped_column(String(32), nullable=True)
|
||||
last_poll_error_code: Mapped[str | None] = mapped_column(String(64), nullable=True)
|
||||
last_poll_error: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
last_poll_error_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True), server_default=func.now()
|
||||
)
|
||||
|
||||
@@ -83,6 +83,17 @@ def _translate_telegram_error(exc: Exception, identifier: str) -> PollError:
|
||||
|
||||
async def poll_channel(channel_id: int) -> int:
    """Poll one channel for new messages. Returns count of inserted rows.

    Delegates the actual work to ``_poll_channel``. When that raises, the
    failure is written to the channel row via ``_record_poll_error`` and the
    original exception is re-raised unchanged.

    Recording the failure is best-effort: if the bookkeeping write itself
    fails, that secondary error is logged and dropped, so callers always see
    the exception that ``_poll_channel`` raised.

    Args:
        channel_id: Primary key of the channel to poll.

    Returns:
        Number of rows inserted by this poll.

    Raises:
        PollError: Re-raised as-is; recorded with its own ``code``/``message``.
        Exception: Any other failure, re-raised as-is; recorded under the
            ``"poll_failed"`` code with ``str(exc)`` as the message.
    """
    try:
        return await _poll_channel(channel_id)
    except Exception as exc:
        if isinstance(exc, PollError):
            code, message = exc.code, exc.message
        else:
            code, message = "poll_failed", str(exc)

        try:
            await _record_poll_error(channel_id, code, message)
        except Exception as record_exc:
            # Do not let a failed status write replace the real poll error.
            log.warning(
                "poll_error_record_failed",
                channel_id=channel_id,
                error_code=code,
                error=str(record_exc),
            )

        # Bare raise re-raises the exception from _poll_channel.
        raise
|
||||
|
||||
|
||||
async def _poll_channel(channel_id: int) -> int:
|
||||
if not await is_authorized():
|
||||
raise PollUnauthorizedError(
|
||||
"Telegram is not authorized: open Monitoring TG in Portal and authorize it"
|
||||
@@ -99,6 +110,10 @@ async def poll_channel(channel_id: int) -> int:
|
||||
channel.title = channel.title or source.title
|
||||
channel.last_message_id = source.last_message_id
|
||||
channel.last_polled_at = source.last_polled_at
|
||||
channel.last_poll_status = "alias"
|
||||
channel.last_poll_error_code = None
|
||||
channel.last_poll_error = None
|
||||
channel.last_poll_error_at = None
|
||||
return 0
|
||||
|
||||
if channel.tg_id is None or channel.title is None:
|
||||
@@ -122,6 +137,10 @@ async def poll_channel(channel_id: int) -> int:
|
||||
channel.title = channel.title or resolved.title or (source.title if source else None)
|
||||
channel.last_message_id = source.last_message_id if source else channel.last_message_id
|
||||
channel.last_polled_at = source.last_polled_at if source else channel.last_polled_at
|
||||
channel.last_poll_status = "alias"
|
||||
channel.last_poll_error_code = None
|
||||
channel.last_poll_error = None
|
||||
channel.last_poll_error_at = None
|
||||
log.info(
|
||||
"linked_channel_alias",
|
||||
channel_id=channel.id,
|
||||
@@ -180,6 +199,10 @@ async def poll_channel(channel_id: int) -> int:
|
||||
channel.last_message_id or 0, msgs[-1].tg_message_id
|
||||
)
|
||||
channel.last_polled_at = datetime.now(timezone.utc)
|
||||
channel.last_poll_status = "ok"
|
||||
channel.last_poll_error_code = None
|
||||
channel.last_poll_error = None
|
||||
channel.last_poll_error_at = None
|
||||
|
||||
log.info(
|
||||
"polled_channel",
|
||||
@@ -191,6 +214,17 @@ async def poll_channel(channel_id: int) -> int:
|
||||
return inserted
|
||||
|
||||
|
||||
async def _record_poll_error(channel_id: int, code: str, message: str) -> None:
    """Mark a channel's most recent poll as failed.

    Sets ``last_poll_status`` to ``"error"`` and stores the error code, the
    error message and the current UTC timestamp on the channel row. Does
    nothing when no channel with ``channel_id`` exists.

    Args:
        channel_id: Primary key of the channel whose poll failed.
        code: Short machine-readable error code; clamped to 64 characters.
        message: Human-readable error text; clamped to 1000 characters.
            ``None`` is tolerated and stored as an empty string.
    """
    # last_poll_error_code is declared as String(64); clamp so an unexpectedly
    # long code cannot make the write fail.
    max_code_length = 64
    max_message_length = 1000

    # NOTE(review): presumably session_scope() commits on clean exit — confirm.
    async with session_scope() as session:
        channel = await session.get(Channel, channel_id)
        if channel is None:
            return
        channel.last_poll_status = "error"
        channel.last_poll_error_code = code[:max_code_length] if code else code
        channel.last_poll_error = (message or "")[:max_message_length]
        channel.last_poll_error_at = datetime.now(timezone.utc)
|
||||
|
||||
|
||||
async def poll_all() -> None:
|
||||
if not await is_authorized():
|
||||
log.debug("poll_skipped_not_authorized")
|
||||
|
||||
Reference in New Issue
Block a user