diff --git a/cmd/server/main.go b/cmd/server/main.go index b6a69bb..cd44c82 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -558,17 +558,81 @@ func (a *app) probeMediaMetadata(ctx context.Context) componentProbe { return componentProbe{Name: "media_metadata", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()} } if missingFiles > 0 { + recent, recentErr := a.recentMissingMedia(ctx) + details := map[string]any{ + "messages_with_media": withMedia, + "missing_media_files": missingFiles, + } + if recentErr != nil { + details["recent_error"] = recentErr.Error() + } else { + details["recent_missing_media"] = recent + } return componentProbe{ Name: "media_metadata", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "messages_with_media=" + strconv.FormatInt(withMedia, 10) + " missing_media_files=" + strconv.FormatInt(missingFiles, 10), + Details: details, } } return componentProbe{Name: "media_metadata", Status: "ok", LatencyMs: time.Since(start).Milliseconds()} } +func (a *app) recentMissingMedia(ctx context.Context) ([]map[string]any, error) { + rows, err := a.db.Query(ctx, ` + SELECT + m.id, + m.tg_message_id, + m.date, + c.id AS channel_id, + c.identifier, + COALESCE(c.title, '') AS channel_title, + s.slug, + s.title AS section_title + FROM messages m + JOIN channels c ON c.id = m.channel_id + LEFT JOIN sections s ON s.id = c.section_id + WHERE m.has_media = true + AND jsonb_array_length(COALESCE(m.media_files, '[]'::jsonb)) = 0 + ORDER BY m.date DESC, m.id DESC + LIMIT 5`) + if err != nil { + return nil, err + } + defer rows.Close() + + out := make([]map[string]any, 0, 5) + for rows.Next() { + var ( + id int64 + tgMessageID int64 + date time.Time + channelID int64 + identifier string + channelTitle string + sectionSlug sql.NullString + sectionTitle sql.NullString + ) + if err := rows.Scan(&id, &tgMessageID, &date, &channelID, &identifier, &channelTitle, §ionSlug, §ionTitle); err != nil { + return nil, err + } + out = append(out, map[string]any{ + "message_id": id, + "tg_message_id": tgMessageID, + "message_date": date, + "channel_id": channelID, + "identifier": identifier, + "channel_title": nullableString(channelTitle), + "section_slug": nullString(sectionSlug), + "section_title": nullString(sectionTitle), + "repair_action": "/api/v1/channels/" + strconv.FormatInt(channelID, 10) + "/backfill-media", + }) + } + return out, rows.Err() +} + func (a *app) handleAccessMe(w http.ResponseWriter, r *http.Request) { scope := readAccess(r) writeJSON(w, http.StatusOK, map[string]any{ diff --git a/src/parser_bot/scheduler/poller.py b/src/parser_bot/scheduler/poller.py index 428bf0a..a3609a8 100644 --- a/src/parser_bot/scheduler/poller.py +++ b/src/parser_bot/scheduler/poller.py @@ -2,7 +2,7 @@ from datetime import datetime, timezone import structlog from apscheduler.schedulers.asyncio import AsyncIOScheduler -from sqlalchemy import func, select +from sqlalchemy import func, or_, select from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.exc import IntegrityError @@ -262,10 +262,15 @@ async def backfill_media(channel_id: int, batch_size: int = 50) -> dict[str, int if channel is None: raise RuntimeError("channel not found") + missing_media_condition = or_( + Message.media_files.is_(None), + func.jsonb_array_length(Message.media_files) == 0, + ) + pending_q = select(func.count(Message.id)).where( Message.channel_id == channel_id, Message.has_media.is_(True), - Message.media_files.is_(None), + missing_media_condition, ) pending_total = (await session.execute(pending_q)).scalar_one() @@ -275,7 +280,7 @@ async def backfill_media(channel_id: int, batch_size: int = 50) -> dict[str, int .where( Message.channel_id == channel_id, Message.has_media.is_(True), - Message.media_files.is_(None), + missing_media_condition, ) .order_by(Message.tg_message_id.asc()) .limit(batch_size)