diff --git a/.env.example b/.env.example index 0f388a3..34b1e05 100644 --- a/.env.example +++ b/.env.example @@ -24,16 +24,17 @@ POSTGRES_PORT=5432 POLL_INTERVAL_SECONDS=60 POLL_HISTORY_LIMIT=50 -# API +# Go public API API_HOST=0.0.0.0 API_PORT=8000 +PUBLIC_BASE_PATH=/api/monitoring-tg +PYTHON_BASE_URL=http://127.0.0.1:8001 # Media (downloaded photos / small videos / docs from parsed messages) MEDIA_DIR=/data/media MEDIA_MAX_BYTES=20971520 -# OpenAI-compatible LLM endpoint. In production this can point to the same -# vLLM server/model used by telephony. +# OpenAI-compatible LLM endpoint used by the Go classifier. LLM_ENABLED=true LLM_BASE_URL=http://10.2.3.5:8002 LLM_API_KEY= @@ -41,8 +42,6 @@ LLM_MODEL=qwen2.5-14b LLM_TIMEOUT_SECONDS=120 LLM_MAX_TOKENS=600 LLM_MIN_TEXT_LENGTH=20 -LLM_CLASSIFIER_OWNER=python -# How often the background classifier wakes up and how many messages it -# processes per tick. With 5/20s ≈ 900 messages/hour at ~3-6s per call. +LLM_CLASSIFIER_OWNER=go LLM_CLASSIFY_INTERVAL_SECONDS=20 LLM_CLASSIFY_BATCH_SIZE=5 diff --git a/README.md b/README.md index 48d9488..114038b 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,9 @@ # monitoring-tg -Backend-сервис мониторинга Telegram-каналов для Portal. Python-часть отвечает -за MTProto/Telethon, API и опрос каналов, а фоновая AI-классификация вынесена -в Go-воркер. Сервис сохраняет сообщения в Postgres, раскладывает каналы по +Backend-сервис мониторинга Telegram-каналов для Portal. Публичный API и +AI-классификация работают на Go, Python оставлен только как внутренний +MTProto/Telethon-адаптер для авторизации, опроса каналов и дозагрузки медиа. +Сервис сохраняет сообщения в Postgres, раскладывает каналы по вертикалям/подразделам и выполняет AI-анализ через OpenAI-compatible endpoint, общий с другими сервисами портала. @@ -37,6 +38,7 @@ POSTGRES_PASSWORD=parser POSTGRES_DB=parser PUBLIC_BASE_PATH=/api/monitoring-tg +PYTHON_BASE_URL=http://127.0.0.1:8001 LLM_ENABLED=true LLM_BASE_URL=http://10.2.3.5:8002 @@ -57,18 +59,19 @@ LLM_CLASSIFIER_OWNER=go kubectl apply -k k8s ``` -Миграции выполняются entrypoint-ом контейнера перед запуском API. +Миграции выполняются entrypoint-ом контейнера перед запуском процессов. ## Структура ```text src/parser_bot/ -├── api/ FastAPI роуты + Pydantic-схемы +├── api/ внутренние FastAPI роуты Telegram-адаптера ├── db/ SQLAlchemy модели + сессии ├── scheduler/ APScheduler-воркер периодического опроса ├── telegram/ Telethon-клиент ├── config.py pydantic-settings -└── main.py FastAPI lifespan + uvicorn +└── main.py FastAPI lifespan + uvicorn для внутреннего адаптера +cmd/server/ Go API для Portal cmd/classifier/ Go-воркер фоновой LLM-классификации сообщений alembic/ миграции k8s/ манифесты для портала diff --git a/pyproject.toml b/pyproject.toml index eb5b590..fcdaeb2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "parser-tg-bot" version = "0.1.0" -description = "Telegram channel parser — periodic polling + storage, future Go microservice" +description = "Telegram channel monitoring service with Go API/classifier and Python Telethon adapter" requires-python = ">=3.11" dependencies = [ "telethon>=1.36", @@ -15,7 +15,6 @@ dependencies = [ "pydantic-settings>=2.6", "python-dotenv>=1.0", "structlog>=24.4", - "httpx>=0.27", ] [project.optional-dependencies] diff --git a/src/parser_bot/api/routes.py b/src/parser_bot/api/routes.py index 6ea76d6..474a98a 100644 --- a/src/parser_bot/api/routes.py +++ b/src/parser_bot/api/routes.py @@ -1,180 +1,74 @@ -from datetime import datetime, timedelta, timezone -from typing import Any, Literal +from typing import Any -import sqlalchemy as sa -from fastapi import ( - APIRouter, - BackgroundTasks, - Depends, - HTTPException, - Query, - Request, -) -from sqlalchemy import func, select +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, Request +from pydantic import BaseModel, Field +from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession -from parser_bot import llm as llm_client -from parser_bot import prompt_store from parser_bot.access import ( - can_manage_department, is_admin_request, portal_department_id, require_admin, require_department_manager, ) - -from parser_bot.api.schemas import ( - AuthCode, - AuthCodeResult, - AuthPassword, - AuthStatus, - ChannelCreate, - ChannelOut, - ChannelStats, - ChannelUpdate, - GlobalStats, - MessageOut, - SectionCreate, - SectionOut, - SectionUpdate, - SectionWithStats, -) from parser_bot.config import settings -from parser_bot.db.models import Channel, Message, Section +from parser_bot.db.models import Channel, Section from parser_bot.db.session import get_session -from parser_bot.links import post_url as build_post_url -from parser_bot.scheduler.poller import ( - backfill_media, - pending_llm_count, - poll_channel, - reanalyze_channel, -) +from parser_bot.scheduler.poller import backfill_media, poll_channel from parser_bot.telegram import client as tg router = APIRouter() -Vertical = Literal["real_estate", "hr"] +class AuthStatus(BaseModel): + authorized: bool + username: str | None = None + phone: str | None = None -def _verdict_key(vertical: str) -> str: - return "hr_lead" if vertical == "hr" else "lead" +class AuthCode(BaseModel): + code: str = Field(..., min_length=3, max_length=12) -async def _get_section( +class AuthPassword(BaseModel): + password: str = Field(..., min_length=1) + + +class AuthCodeResult(BaseModel): + needs_password: bool + + +def _department_scope(request: Request) -> str | None: + if is_admin_request(request): + return None + dept_id = portal_department_id(request) + if not dept_id: + raise HTTPException(status_code=403, detail="department is required") + return dept_id + + +async def _require_channel_scope( session: AsyncSession, - vertical: Vertical, - slug: str, - department_id: str | None = None, -) -> Section: - """Find a section by (vertical, slug) or 404.""" - stmt = select(Section).where(Section.vertical == vertical, Section.slug == slug) - if department_id is not None: - stmt = stmt.where(Section.department_id == department_id) - result = await session.execute(stmt) - section = result.scalars().first() - if section is None: - raise HTTPException( - status_code=404, detail=f"section {vertical}:{slug} not found" - ) - return section - - -def _read_department_id(request: Request) -> str | None: - if is_admin_request(request): - return None - dept_id = portal_department_id(request) - if not dept_id: - raise HTTPException(status_code=403, detail="department is required") - return dept_id - - -def _manage_department_id(request: Request) -> str | None: - if not can_manage_department(request): - raise HTTPException(status_code=403, detail="department manager required") - if is_admin_request(request): - return None - dept_id = portal_department_id(request) - if not dept_id: - raise HTTPException(status_code=403, detail="department is required") - return dept_id - - -async def _require_scope_access( request: Request, - session: AsyncSession, - vertical: Vertical | None, - section_slug: str | None, -) -> Section | None: - department_id = _read_department_id(request) - if vertical is not None and section_slug is not None: - return await _get_section(session, vertical, section_slug, department_id) - return None - - -async def _get_channel_in_scope( - session: AsyncSession, channel_id: int, - vertical: Vertical | None, - section_slug: str | None, - department_id: str | None = None, -) -> tuple[Channel, str]: - """Load a channel and its section slug; 404 if any scope constraint fails. - - All per-id endpoints route through this so a vertical/section-scoped UI - cannot accidentally read or mutate something from another scope. - """ - result = await session.execute( - select(Channel, Section.slug, Section.department_id) + vertical: str | None, + section: str | None, +) -> None: + department_id = _department_scope(request) + stmt = ( + select(Channel.id) .join(Section, Section.id == Channel.section_id) .where(Channel.id == channel_id) ) - row = result.one_or_none() - if row is None: + if vertical: + stmt = stmt.where(Channel.vertical == vertical) + if section: + stmt = stmt.where(Section.slug == section) + if department_id is not None: + stmt = stmt.where(Section.department_id == department_id) + exists = (await session.execute(stmt)).scalar_one_or_none() + if exists is None: raise HTTPException(status_code=404) - channel, ch_section_slug, ch_department_id = row - if vertical is not None and channel.vertical != vertical: - raise HTTPException(status_code=404) - if section_slug is not None and ch_section_slug != section_slug: - raise HTTPException(status_code=404) - if department_id is not None and ch_department_id != department_id: - raise HTTPException(status_code=404) - return channel, ch_section_slug - - -def _channel_out(channel: Channel, section_slug: str | None) -> dict[str, Any]: - return { - "id": channel.id, - "tg_id": channel.tg_id, - "identifier": channel.identifier, - "title": channel.title, - "vertical": channel.vertical, - "section_id": channel.section_id, - "section_slug": section_slug, - "is_active": channel.is_active, - "last_message_id": channel.last_message_id, - "last_polled_at": channel.last_polled_at, - "created_at": channel.created_at, - } - - -# --- Access -------------------------------------------------------------- - - -@router.get("/access/me") -async def access_me(request: Request) -> dict[str, Any]: - admin = is_admin_request(request) - department_id = portal_department_id(request) - can_manage = can_manage_department(request) - return { - "is_admin": admin, - "can_manage_department": can_manage, - "department_id": department_id, - } - - -# --- Auth (admin-only) -------------------------------------------------- -# Telegram session controls are an admin surface. @router.get("/auth/status", response_model=AuthStatus, dependencies=[Depends(require_admin)]) @@ -187,9 +81,7 @@ async def auth_status() -> AuthStatus: ) -@router.post( - "/auth/send-code", status_code=204, dependencies=[Depends(require_admin)] -) +@router.post("/auth/send-code", status_code=204, dependencies=[Depends(require_admin)]) async def auth_send_code() -> None: try: await tg.send_login_code() @@ -210,9 +102,7 @@ async def auth_submit_code(payload: AuthCode) -> AuthCodeResult: return AuthCodeResult(needs_password=needs_password) -@router.post( - "/auth/submit-password", status_code=204, dependencies=[Depends(require_admin)] -) +@router.post("/auth/submit-password", status_code=204, dependencies=[Depends(require_admin)]) async def auth_submit_password(payload: AuthPassword) -> None: try: await tg.submit_login_password(payload.password) @@ -220,343 +110,20 @@ async def auth_submit_password(payload: AuthPassword) -> None: raise HTTPException(status_code=400, detail=str(exc)) -@router.post( - "/auth/logout", status_code=204, dependencies=[Depends(require_admin)] -) +@router.post("/auth/logout", status_code=204, dependencies=[Depends(require_admin)]) async def auth_logout() -> None: await tg.logout() -# --- Sections ----------------------------------------------------------- - - -@router.get("/sections", response_model=list[SectionWithStats]) -async def list_sections( - request: Request, - vertical: Vertical = Query(..., description="required: real_estate | hr"), - session: AsyncSession = Depends(get_session), -) -> list[SectionWithStats]: - """List sub-sections inside a vertical, each with rollup counts. - - Used by the section-chooser page. Counts are computed in a single query - via LEFT JOINs so empty sections still appear. - """ - department_id = _read_department_id(request) - # Per-section channel counts. - ch_total_sub = ( - select(Channel.section_id, func.count(Channel.id).label("ct")) - .group_by(Channel.section_id) - .subquery() - ) - ch_active_sub = ( - select(Channel.section_id, func.count(Channel.id).label("ca")) - .where(Channel.is_active.is_(True)) - .group_by(Channel.section_id) - .subquery() - ) - # Per-section message counts. - msg_total_sub = ( - select(Channel.section_id, func.count(Message.id).label("mt")) - .join(Message, Message.channel_id == Channel.id) - .group_by(Channel.section_id) - .subquery() - ) - # Per-section leads (uses vertical-appropriate verdict key). - if vertical == "hr": - lead_clause = Message.extracted["hr_lead"]["is_lead"].astext == "true" - else: - lead_clause = Message.extracted["lead"]["is_listing"].astext == "true" - leads_sub = ( - select(Channel.section_id, func.count(Message.id).label("lt")) - .join(Message, Message.channel_id == Channel.id) - .where(lead_clause) - .group_by(Channel.section_id) - .subquery() - ) - - rows = ( - await session.execute( - select( - Section, - func.coalesce(ch_total_sub.c.ct, 0), - func.coalesce(ch_active_sub.c.ca, 0), - func.coalesce(msg_total_sub.c.mt, 0), - func.coalesce(leads_sub.c.lt, 0), - ) - .where(Section.vertical == vertical) - .where( - Section.department_id == department_id - if department_id is not None - else sa.true() - ) - .outerjoin(ch_total_sub, ch_total_sub.c.section_id == Section.id) - .outerjoin(ch_active_sub, ch_active_sub.c.section_id == Section.id) - .outerjoin(msg_total_sub, msg_total_sub.c.section_id == Section.id) - .outerjoin(leads_sub, leads_sub.c.section_id == Section.id) - .order_by(Section.slug) - ) - ).all() - - return [ - SectionWithStats( - id=s.id, - vertical=s.vertical, - department_id=s.department_id, - slug=s.slug, - title=s.title, - emoji=s.emoji, - description=s.description, - created_at=s.created_at, - channels_total=ct, - channels_active=ca, - messages_total=mt, - leads_total=lt, - ) - for (s, ct, ca, mt, lt) in rows - ] - - -@router.post("/sections", response_model=SectionOut, status_code=201) -async def create_section( - payload: SectionCreate, - request: Request, - session: AsyncSession = Depends(get_session), -) -> Section: - department_id = _manage_department_id(request) - # Reject duplicates with a friendly message instead of a constraint error. - existing = await session.execute( - select(Section).where( - Section.vertical == payload.vertical, - Section.department_id == department_id, - Section.slug == payload.slug, - ) - ) - if existing.scalar_one_or_none() is not None: - raise HTTPException( - status_code=409, - detail=f"section {payload.vertical}:{payload.slug} already exists", - ) - section = Section(**payload.model_dump(), department_id=department_id) - session.add(section) - await session.commit() - await session.refresh(section) - return section - - -@router.get("/sections/{vertical}/{slug}", response_model=SectionOut) -async def get_section( - vertical: Vertical, - slug: str, - request: Request, - session: AsyncSession = Depends(get_session), -) -> dict[str, Any]: - department_id = _read_department_id(request) - section = await _get_section(session, vertical, slug, department_id) - return { - "id": section.id, - "vertical": section.vertical, - "department_id": section.department_id, - "slug": section.slug, - "title": section.title, - "emoji": section.emoji, - "description": section.description, - "created_at": section.created_at, - } - - -@router.patch( - "/sections/{vertical}/{slug}", - response_model=SectionOut, - dependencies=[Depends(require_department_manager)], -) -async def update_section( - vertical: Vertical, - slug: str, - payload: SectionUpdate, - request: Request, - session: AsyncSession = Depends(get_session), -) -> Section: - department_id = _manage_department_id(request) - section = await _get_section(session, vertical, slug, department_id) - data = payload.model_dump(exclude_unset=True) - for k, v in data.items(): - setattr(section, k, v) - await session.commit() - await session.refresh(section) - return section - - -@router.delete( - "/sections/{vertical}/{slug}", - status_code=204, - dependencies=[Depends(require_department_manager)], -) -async def delete_section( - vertical: Vertical, - slug: str, - request: Request, - session: AsyncSession = Depends(get_session), -) -> None: - department_id = _manage_department_id(request) - section = await _get_section(session, vertical, slug, department_id) - # Block deletion when channels are still attached — keeps data referenceable. - count = ( - await session.execute( - select(func.count(Channel.id)).where(Channel.section_id == section.id) - ) - ).scalar_one() - if count: - raise HTTPException( - status_code=409, - detail=f"section has {count} channels — move or delete them first", - ) - # Drop the per-section LLM prompt too if any. - await prompt_store.reset(vertical, section.department_id, slug) - await session.delete(section) - await session.commit() - - -# --- Channels ----------------------------------------------------------- - - -@router.get("/channels", response_model=list[ChannelOut]) -async def list_channels( - request: Request, - vertical: Vertical = Query(..., description="required: real_estate | hr"), - section: str | None = Query(None, description="optional section slug filter"), - session: AsyncSession = Depends(get_session), -) -> list[dict[str, Any]]: - department_id = _read_department_id(request) - await _require_scope_access(request, session, vertical, section) - stmt = ( - select(Channel, Section.slug) - .join(Section, Section.id == Channel.section_id) - .where(Channel.vertical == vertical) - .order_by(Channel.id) - ) - if department_id is not None: - stmt = stmt.where(Section.department_id == department_id) - if section is not None: - stmt = stmt.where(Section.slug == section) - rows = (await session.execute(stmt)).all() - return [_channel_out(ch, slug) for (ch, slug) in rows] - - -@router.post( - "/channels", - response_model=ChannelOut, - status_code=201, -) -async def add_channel( - payload: ChannelCreate, - request: Request, - session: AsyncSession = Depends(get_session), -) -> dict[str, Any]: - department_id = _manage_department_id(request) - existing = await session.execute( - select(Channel).where(Channel.identifier == payload.identifier) - ) - if existing.scalar_one_or_none() is not None: - raise HTTPException(status_code=409, detail="channel already exists") - - section = await _get_section(session, payload.vertical, payload.section, department_id) - - if not await tg.is_authorized(): - raise HTTPException( - status_code=401, - detail="not authorized: open Monitoring TG in Portal and authorize Telegram", - ) - try: - resolved = await tg.resolve_channel(payload.identifier) - except Exception as exc: - raise HTTPException(status_code=400, detail=f"cannot resolve channel: {exc}") - - channel = Channel( - identifier=payload.identifier, - tg_id=resolved.tg_id, - title=resolved.title, - vertical=payload.vertical, - section_id=section.id, - ) - session.add(channel) - await session.commit() - await session.refresh(channel) - return _channel_out(channel, section.slug) - - -@router.get("/channels/{channel_id}", response_model=ChannelOut) -async def get_channel( - channel_id: int, - request: Request, - vertical: Vertical | None = Query(None, description="scope: 404 if mismatched"), - section: str | None = Query(None, description="scope: 404 if mismatched"), - session: AsyncSession = Depends(get_session), -) -> dict[str, Any]: - department_id = _read_department_id(request) - await _require_scope_access(request, session, vertical, section) - channel, section_slug = await _get_channel_in_scope( - session, channel_id, vertical, section, department_id - ) - return _channel_out(channel, section_slug) - - -@router.patch( - "/channels/{channel_id}", - response_model=ChannelOut, - dependencies=[Depends(require_department_manager)], -) -async def update_channel( - channel_id: int, - payload: ChannelUpdate, - request: Request, - vertical: Vertical | None = Query(None, description="scope: 404 if mismatched"), - section: str | None = Query(None, description="scope: 404 if mismatched"), - session: AsyncSession = Depends(get_session), -) -> dict[str, Any]: - department_id = _manage_department_id(request) - channel, _ = await _get_channel_in_scope(session, channel_id, vertical, section, department_id) - if payload.is_active is not None: - channel.is_active = payload.is_active - if payload.vertical is not None: - channel.vertical = payload.vertical - if payload.section is not None: - new_section = await _get_section(session, channel.vertical, payload.section, department_id) - channel.section_id = new_section.id - await session.commit() - await session.refresh(channel) - # Reload the section slug since it may have changed. - section_row = await session.get(Section, channel.section_id) - return _channel_out(channel, section_row.slug if section_row else None) - - -@router.delete( - "/channels/{channel_id}", - status_code=204, - dependencies=[Depends(require_department_manager)], -) -async def delete_channel( - channel_id: int, - request: Request, - vertical: Vertical | None = Query(None, description="scope: 404 if mismatched"), - section: str | None = Query(None, description="scope: 404 if mismatched"), - session: AsyncSession = Depends(get_session), -) -> None: - department_id = _manage_department_id(request) - channel, _ = await _get_channel_in_scope(session, channel_id, vertical, section, department_id) - await session.delete(channel) - await session.commit() - - @router.post("/channels/{channel_id}/poll", dependencies=[Depends(require_department_manager)]) async def trigger_poll( channel_id: int, request: Request, - vertical: Vertical | None = Query(None, description="scope: 404 if mismatched"), - section: str | None = Query(None, description="scope: 404 if mismatched"), + vertical: str | None = Query(None), + section: str | None = Query(None), session: AsyncSession = Depends(get_session), ) -> dict[str, int]: - department_id = _manage_department_id(request) - await _get_channel_in_scope(session, channel_id, vertical, section, department_id) + await _require_channel_scope(session, request, channel_id, vertical, section) inserted = await poll_channel(channel_id) return {"inserted": inserted} @@ -568,455 +135,22 @@ async def trigger_poll( async def trigger_backfill_media( channel_id: int, request: Request, - batch: int = Query(50, ge=1, le=200), - vertical: Vertical | None = Query(None, description="scope: 404 if mismatched"), - section: str | None = Query(None, description="scope: 404 if mismatched"), + batch: int = Query(50, ge=1, le=500), + vertical: str | None = Query(None), + section: str | None = Query(None), session: AsyncSession = Depends(get_session), ) -> dict[str, int]: - department_id = _manage_department_id(request) - await _get_channel_in_scope(session, channel_id, vertical, section, department_id) + await _require_channel_scope(session, request, channel_id, vertical, section) try: return await backfill_media(channel_id, batch_size=batch) except RuntimeError as exc: raise HTTPException(status_code=400, detail=str(exc)) -@router.post( - "/channels/{channel_id}/reanalyze", - dependencies=[Depends(require_department_manager)], -) -async def trigger_reanalyze( - channel_id: int, - request: Request, - batch: int = Query(500, ge=1, le=2000), - vertical: Vertical | None = Query(None, description="scope: 404 if mismatched"), - section: str | None = Query(None, description="scope: 404 if mismatched"), - session: AsyncSession = Depends(get_session), -) -> dict[str, int]: - department_id = _manage_department_id(request) - await _get_channel_in_scope(session, channel_id, vertical, section, department_id) - return await reanalyze_channel(channel_id, batch_size=batch) - - -@router.get("/channels/{channel_id}/stats", response_model=ChannelStats) -async def channel_stats( - channel_id: int, - request: Request, - vertical: Vertical | None = Query(None, description="scope: 404 if mismatched"), - section: str | None = Query(None, description="scope: 404 if mismatched"), - session: AsyncSession = Depends(get_session), -) -> ChannelStats: - department_id = _read_department_id(request) - await _require_scope_access(request, session, vertical, section) - channel, section_slug = await _get_channel_in_scope( - session, channel_id, vertical, section, department_id - ) - counts = await session.execute( - select(func.count(Message.id), func.max(Message.date)).where( - Message.channel_id == channel_id - ) - ) - msg_count, last_date = counts.one() - return ChannelStats( - channel_id=channel.id, - identifier=channel.identifier, - title=channel.title, - vertical=channel.vertical, - section_slug=section_slug, - is_active=channel.is_active, - last_polled_at=channel.last_polled_at, - message_count=msg_count or 0, - last_message_at=last_date, - ) - - -# --- Messages ----------------------------------------------------------- - - -@router.get("/messages", response_model=list[MessageOut]) -async def list_messages( - request: Request, - vertical: Vertical = Query(..., description="required: real_estate | hr"), - section: str | None = Query(None, description="optional section slug"), - channel_id: int | None = None, - q: str | None = Query(None, description="full-text search in message body"), - real_estate: str | None = Query( - None, description="filter by deal kind: any|sale|rent|purchase" - ), - hr_kind: str | None = Query( - None, description="filter by HR lead kind: any|vacancy|resume|contact" - ), - leads_only: bool = Query(False, description="only LLM-confirmed leads"), - min_confidence: float = Query( - 0.5, ge=0.0, le=1.0, description="minimum LLM confidence when leads_only=true" - ), - has_phone: bool = Query(False, description="only messages with extracted phone numbers"), - limit: int = Query(50, ge=1, le=500), - offset: int = Query(0, ge=0), - session: AsyncSession = Depends(get_session), -) -> list[MessageOut]: - """Return messages grouped by Telegram album (`grouped_id`). - - Every query joins through channels (and optionally sections) so a - cross-vertical / cross-section read is impossible at the API boundary. - """ - department_id = _read_department_id(request) - await _require_scope_access(request, session, vertical, section) - - # Step 1: pick the group keys for this page, ordered by latest activity. - group_key = func.coalesce( - Message.grouped_id, -Message.id # solo messages get a unique negative key - ).label("group_key") - g_stmt = ( - select(group_key, func.max(Message.date).label("group_date")) - .join(Channel, Channel.id == Message.channel_id) - .join(Section, Section.id == Channel.section_id) - .where(Channel.vertical == vertical) - .group_by(group_key) - .order_by(func.max(Message.date).desc()) - .limit(limit) - .offset(offset) - ) - if department_id is not None: - g_stmt = g_stmt.where(Section.department_id == department_id) - if section is not None: - g_stmt = g_stmt.where(Section.slug == section) - if channel_id is not None: - g_stmt = g_stmt.where(Message.channel_id == channel_id) - if q: - g_stmt = g_stmt.where(Message.text.ilike(f"%{q}%")) - if real_estate == "any": - g_stmt = g_stmt.where(Message.extracted["real_estate"].is_not(None)) - elif real_estate in ("sale", "rent", "purchase"): - g_stmt = g_stmt.where( - Message.extracted["real_estate"]["kind"].astext == real_estate - ) - if hr_kind == "any": - g_stmt = g_stmt.where(Message.extracted["hr_lead"].is_not(None)) - elif hr_kind in ("vacancy", "resume", "contact"): - g_stmt = g_stmt.where( - Message.extracted["hr_lead"]["kind"].astext == hr_kind - ) - if has_phone: - g_stmt = g_stmt.where( - func.jsonb_array_length(Message.extracted["phones"]) > 0 - ) - if leads_only: - if vertical == "hr": - g_stmt = g_stmt.where( - Message.extracted["hr_lead"]["is_lead"].astext == "true", - Message.extracted["hr_lead"]["confidence"].astext.cast(sa.Float) >= min_confidence, - ) - else: - g_stmt = g_stmt.where( - Message.extracted["lead"]["is_listing"].astext == "true", - Message.extracted["lead"]["confidence"].astext.cast(sa.Float) >= min_confidence, - ) - - page = (await session.execute(g_stmt)).all() - if not page: - return [] - page_keys = [row.group_key for row in page] - - rows_stmt = ( - select(Message) - .join(Channel, Channel.id == Message.channel_id) - .join(Section, Section.id == Channel.section_id) - .where(group_key.in_(page_keys), Channel.vertical == vertical) - .order_by(Message.tg_message_id.asc()) - ) - if department_id is not None: - rows_stmt = rows_stmt.where(Section.department_id == department_id) - if section is not None: - rows_stmt = rows_stmt.where(Section.slug == section) - if channel_id is not None: - rows_stmt = rows_stmt.where(Message.channel_id == channel_id) - rows = list((await session.execute(rows_stmt)).scalars().all()) - - by_key: dict[int, list[Message]] = {} - for r in rows: - key = r.grouped_id if r.grouped_id is not None else -r.id - by_key.setdefault(key, []).append(r) - - channel_ids = {r.channel_id for r in rows} - channels_map: dict[int, tuple[Channel, str]] = {} - if channel_ids: - ch_rows = ( - await session.execute( - select(Channel, Section.slug) - .join(Section, Section.id == Channel.section_id) - .where(Channel.id.in_(channel_ids)) - ) - ).all() - channels_map = {c.id: (c, slug) for (c, slug) in ch_rows} - - out: list[MessageOut] = [] - for key in page_keys: - group_rows = by_key.get(key) - if not group_rows: - continue - canonical = min(group_rows, key=lambda m: m.tg_message_id) - text = next((m.text for m in group_rows if m.text), None) - media: list = [] - for m in group_rows: - if m.media_files: - media.extend(m.media_files) - ch_pair = channels_map.get(canonical.channel_id) - ch, ch_slug = (ch_pair if ch_pair else (None, None)) - url = build_post_url( - ch.identifier if ch else None, - ch.tg_id if ch else None, - canonical.tg_message_id, - ) - out.append( - MessageOut( - id=canonical.id, - channel_id=canonical.channel_id, - channel_vertical=ch.vertical if ch else None, - channel_section_slug=ch_slug, - tg_message_id=canonical.tg_message_id, - grouped_id=canonical.grouped_id, - group_size=len(group_rows), - date=min(m.date for m in group_rows), - text=text, - sender_id=canonical.sender_id, - sender_username=canonical.sender_username, - sender_name=canonical.sender_name, - post_url=url, - has_media=any(m.has_media for m in group_rows), - media_files=media or None, - extracted=next( - (m.extracted for m in group_rows if m.extracted), None - ), - views=max((m.views for m in group_rows if m.views is not None), default=None), - forwards=max( - (m.forwards for m in group_rows if m.forwards is not None), default=None - ), - fetched_at=canonical.fetched_at, - ) - ) - return out - - -@router.get("/messages/{message_id}", response_model=MessageOut) -async def get_message( - message_id: int, - request: Request, - vertical: Vertical | None = Query(None, description="scope: 404 if mismatched"), - section: str | None = Query(None, description="scope: 404 if mismatched"), - session: AsyncSession = Depends(get_session), -) -> Message: - department_id = _read_department_id(request) - await _require_scope_access(request, session, vertical, section) - msg = await session.get(Message, message_id) - if msg is None: - raise HTTPException(status_code=404) - if vertical is not None or section is not None: - await _get_channel_in_scope(session, msg.channel_id, vertical, section, department_id) - elif department_id is not None: - await _get_channel_in_scope(session, msg.channel_id, None, None, department_id) - return msg - - -# --- Stats -------------------------------------------------------------- - - -@router.get("/stats", response_model=GlobalStats) -async def global_stats( - request: Request, - vertical: Vertical = Query(..., description="required: real_estate | hr"), - section: str | None = Query(None, description="optional section slug"), - session: AsyncSession = Depends(get_session), -) -> GlobalStats: - department_id = _read_department_id(request) - await _require_scope_access(request, session, vertical, section) - base_channel_where = [Channel.vertical == vertical] - section_join_needed = section is not None or department_id is not None - if section_join_needed: - if section is not None: - base_channel_where.append(Section.slug == section) - if department_id is not None: - base_channel_where.append(Section.department_id == department_id) - - def _channel_query(*extra): - stmt = select(func.count(Channel.id)).where(*base_channel_where, *extra) - if section_join_needed: - stmt = stmt.join(Section, Section.id == Channel.section_id) - return stmt - - def _message_query(*extra): - stmt = ( - select(func.count(Message.id)) - .join(Channel, Channel.id == Message.channel_id) - .where(*base_channel_where, *extra) - ) - if section_join_needed: - stmt = stmt.join(Section, Section.id == Channel.section_id) - return stmt - - channels_total = (await session.execute(_channel_query())).scalar_one() - channels_active = ( - await session.execute(_channel_query(Channel.is_active.is_(True))) - ).scalar_one() - messages_total = (await session.execute(_message_query())).scalar_one() - - since = datetime.now(timezone.utc) - timedelta(hours=24) - messages_24h = ( - await session.execute(_message_query(Message.fetched_at >= since)) - ).scalar_one() - - last_poll_stmt = select(func.max(Channel.last_polled_at)).where(*base_channel_where) - if section_join_needed: - last_poll_stmt = last_poll_stmt.join(Section, Section.id == Channel.section_id) - last_poll = (await session.execute(last_poll_stmt)).scalar_one() - - if vertical == "hr": - lead_clause = Message.extracted["hr_lead"]["is_lead"].astext == "true" - else: - lead_clause = Message.extracted["lead"]["is_listing"].astext == "true" - - leads_total = (await session.execute(_message_query(lead_clause))).scalar_one() - leads_24h = ( - await session.execute(_message_query(lead_clause, Message.fetched_at >= since)) - ).scalar_one() - - return GlobalStats( - vertical=vertical, - section_slug=section, - channels_total=channels_total, - channels_active=channels_active, - messages_total=messages_total, - messages_last_24h=messages_24h, - leads_total=leads_total or 0, - leads_last_24h=leads_24h or 0, - poll_interval_seconds=settings.poll_interval_seconds, - last_poll_at=last_poll, - ) - - -# --- LLM ---------------------------------------------------------------- - - -@router.get("/llm/status") -async def llm_status() -> dict[str, Any]: - """Whether the OpenAI-compatible LLM endpoint is reachable.""" - ready = await llm_client.is_ready() - return { - "enabled": settings.llm_enabled, - "ready": ready, - "base_url": settings.llm_base_url, - "model": settings.llm_model, - } - - -@router.get("/llm/queue") -async def llm_queue( - request: Request, - vertical: Vertical = Query(..., description="required: real_estate | hr"), - section: str | None = Query(None, description="optional section slug"), - session: AsyncSession = Depends(get_session), -) -> dict[str, int]: - """Pending classifications restricted to the vertical (+ section if given).""" - department_id = _read_department_id(request) - await _require_scope_access(request, session, vertical, section) - return {"pending": await pending_llm_count(vertical, section, department_id)} - - -@router.get("/llm/prompt") -async def llm_prompt_get( - request: Request, - vertical: Vertical = Query(..., description="required: real_estate | hr"), - section: str | None = Query( - None, description="optional section slug — return that level's prompt" - ), - session: AsyncSession = Depends(get_session), -) -> dict[str, Any]: - """Active LLM system prompt for the (vertical, section) level + the source. - - `source` is one of `section` / `vertical` / `default` and tells the UI - whether the override is at the requested level or just inherited. - """ - department_id = _read_department_id(request) - await _require_scope_access(request, session, vertical, section) - default = llm_client.default_prompt(vertical) - text, source = await prompt_store.get(vertical, department_id, section, default) - # Also return whether THIS exact level has its own override (for UI). - overridden_here = await prompt_store.is_overridden(vertical, department_id, section) - return { - "vertical": vertical, - "department_id": department_id, - "section": section, - "prompt": text, - "default": default, - "source": source, - "is_overridden_here": overridden_here, - } - - -@router.put("/llm/prompt", dependencies=[Depends(require_department_manager)]) -async def llm_prompt_put( - payload: dict, - request: Request, - vertical: Vertical = Query(..., description="required: real_estate | hr"), - section: str | None = Query( - None, description="optional section slug — save at section level" - ), -) -> dict[str, Any]: - department_id = _manage_department_id(request) - text = payload.get("prompt") - if not isinstance(text, str) or not text.strip(): - raise HTTPException(status_code=400, detail="prompt must be a non-empty string") - if len(text) > 30000: - raise HTTPException( - status_code=400, detail="prompt is too long (max 30000 chars)" - ) - try: - await prompt_store.set_prompt(vertical, department_id, section, text) - except ValueError as exc: - raise HTTPException(status_code=400, detail=str(exc)) - return { - "saved": True, - "vertical": vertical, - "department_id": department_id, - "section": section, - "length": len(text), - } - - -@router.delete("/llm/prompt", status_code=204, dependencies=[Depends(require_department_manager)]) -async def llm_prompt_reset( - request: Request, - vertical: Vertical = Query(..., description="required: real_estate | hr"), - section: str | None = Query( - None, description="optional section slug — reset that level" - ), -) -> None: - department_id = _manage_department_id(request) - await prompt_store.reset(vertical, department_id, section) - - -# --- Settings & batch poll --------------------------------------------- - - -@router.get("/settings", dependencies=[Depends(require_admin)]) -async def get_settings_view() -> dict[str, str | int]: - """Read-only view of runtime config — managed via .env, not editable from UI.""" - return { - "poll_interval_seconds": settings.poll_interval_seconds, - "poll_history_limit": settings.poll_history_limit, - "tg_session_path": settings.tg_session_path, - "postgres_host": settings.postgres_host, - "postgres_port": settings.postgres_port, - "postgres_db": settings.postgres_db, - "api_host": settings.api_host, - "api_port": settings.api_port, - } - - async def _poll_all_in_background(channel_ids: list[int]) -> None: - for cid in channel_ids: + for channel_id in channel_ids: try: - await poll_channel(cid) + await poll_channel(channel_id) except Exception: continue @@ -1025,20 +159,20 @@ async def _poll_all_in_background(channel_ids: list[int]) -> None: async def trigger_poll_all( background: BackgroundTasks, request: Request, - vertical: Vertical = Query(..., description="required: real_estate | hr"), - section: str | None = Query(None, description="optional section slug"), + vertical: str = Query(...), + section: str | None = Query(None), session: AsyncSession = Depends(get_session), -) -> dict[str, int]: - """Queue a poll of every active channel in this vertical (+ section if given).""" - department_id = _manage_department_id(request) - stmt = select(Channel.id).where( - Channel.is_active.is_(True), Channel.vertical == vertical +) -> dict[str, Any]: + department_id = _department_scope(request) + stmt = ( + select(Channel.id) + .join(Section, Section.id == Channel.section_id) + .where(Channel.is_active.is_(True), Channel.vertical == vertical) ) - stmt = stmt.join(Section, Section.id == Channel.section_id) + if section: + stmt = stmt.where(Section.slug == section) if department_id is not None: stmt = stmt.where(Section.department_id == department_id) - if section is not None: - stmt = stmt.where(Section.slug == section) result = await session.execute(stmt) ids = [row[0] for row in result.all()] background.add_task(_poll_all_in_background, ids) diff --git a/src/parser_bot/api/schemas.py b/src/parser_bot/api/schemas.py deleted file mode 100644 index 3438910..0000000 --- a/src/parser_bot/api/schemas.py +++ /dev/null @@ -1,219 +0,0 @@ -import re -from datetime import datetime -from typing import Literal - -from pydantic import BaseModel, ConfigDict, Field, field_validator - -Vertical = Literal["real_estate", "hr"] - -# Section slugs are used as URL segments — keep them URL-safe. -_SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,62}[a-z0-9]$|^[a-z0-9]$") - - -class SectionCreate(BaseModel): - vertical: Vertical - slug: str = Field(..., min_length=1, max_length=64) - title: str = Field(..., min_length=1, max_length=255) - emoji: str | None = Field(None, max_length=8) - description: str | None = None - - @field_validator("slug") - @classmethod - def _check_slug(cls, v: str) -> str: - if not _SLUG_RE.match(v): - raise ValueError( - "slug must be lowercase letters/digits with '-' or '_' separators" - ) - return v - - -class SectionUpdate(BaseModel): - title: str | None = Field(None, min_length=1, max_length=255) - emoji: str | None = Field(None, max_length=8) - description: str | None = None - - -class SectionOut(BaseModel): - model_config = ConfigDict(from_attributes=True) - - id: int - vertical: Vertical - department_id: str | None = None - slug: str - title: str - emoji: str | None - description: str | None - created_at: datetime - - -class SectionWithStats(SectionOut): - """Section payload enriched with rollup counts for the section chooser page.""" - - channels_total: int = 0 - channels_active: int = 0 - messages_total: int = 0 - leads_total: int = 0 - - -class ChannelCreate(BaseModel): - identifier: str = Field(..., min_length=1, max_length=255, description="@username or t.me link") - vertical: Vertical = "real_estate" - section: str = Field( - ..., min_length=1, max_length=64, - description="Slug of the section inside the vertical (e.g. 'dubai')", - ) - - -class ChannelUpdate(BaseModel): - is_active: bool | None = None - vertical: Vertical | None = None - section: str | None = Field( - None, min_length=1, max_length=64, - description="Move the channel to another section in the same vertical", - ) - - -class ChannelOut(BaseModel): - model_config = ConfigDict(from_attributes=True) - - id: int - tg_id: int | None - identifier: str - title: str | None - vertical: Vertical - section_id: int - section_slug: str | None = None - is_active: bool - last_message_id: int | None - last_polled_at: datetime | None - created_at: datetime - - -class ChannelStats(BaseModel): - channel_id: int - identifier: str - title: str | None - vertical: Vertical - section_slug: str | None = None - is_active: bool - last_polled_at: datetime | None - message_count: int - last_message_at: datetime | None - - -class MediaFile(BaseModel): - kind: str # photo | video | document | audio | sticker | unknown - url: str | None = None - mime: str | None = None - size: int | None = None - skipped: str | None = None # set when not downloaded (e.g. "too_large") - - -class RealEstate(BaseModel): - kind: str | None = None - property_type: str | None = None - rooms: str | None = None - area_m2: float | None = None - price: str | None = None - - -class Lead(BaseModel): - is_listing: bool - kind: str | None = None # sale | rent | purchase - property_type: str | None = None - rooms: str | None = None - area_m2: float | None = None - price_text: str | None = None - price_value: float | None = None - currency: str | None = None # RUB | USD | EUR | AED | GBP | CNY | TRY | KZT | BYN | UAH - location: str | None = None - contact_phone: str | None = None - contact_name: str | None = None - summary: str | None = None - confidence: float = 0.0 - - -class HrLead(BaseModel): - """LLM verdict for HR-vertical messages (jobs / resumes / bare contacts).""" - - is_lead: bool - kind: str | None = None # vacancy | resume | contact - title: str | None = None - company: str | None = None - candidate_name: str | None = None - experience_years: float | None = None - skills: list[str] = [] - location: str | None = None - remote: bool | None = None - employment_type: str | None = None - salary_text: str | None = None - salary_value: float | None = None - currency: str | None = None - contact_phone: str | None = None - contact_name: str | None = None - summary: str | None = None - confidence: float = 0.0 - - -class Extracted(BaseModel): - phones: list[str] = [] - names: list[str] = [] - tg_handles: list[str] = [] - real_estate: RealEstate | None = None - lead: Lead | None = None - hr_lead: HrLead | None = None - - -class MessageOut(BaseModel): - model_config = ConfigDict(from_attributes=True) - - id: int - channel_id: int - channel_vertical: Vertical | None = None - channel_section_slug: str | None = None - tg_message_id: int - grouped_id: int | None = None - group_size: int = 1 - date: datetime - text: str | None - sender_id: int | None - has_media: bool - media_files: list[MediaFile] | None = None - extracted: Extracted | None = None - sender_username: str | None = None - sender_name: str | None = None - post_url: str | None = None - views: int | None - forwards: int | None - fetched_at: datetime - - -class GlobalStats(BaseModel): - vertical: Vertical - section_slug: str | None = None - channels_total: int - channels_active: int - messages_total: int - messages_last_24h: int - leads_total: int = 0 - leads_last_24h: int = 0 - poll_interval_seconds: int - last_poll_at: datetime | None - - -class AuthStatus(BaseModel): - authorized: bool - username: str | None = None - phone: str | None = None - - -class AuthCode(BaseModel): - code: str = Field(..., min_length=3, max_length=12) - - -class AuthPassword(BaseModel): - password: str = Field(..., min_length=1) - - -class AuthCodeResult(BaseModel): - needs_password: bool diff --git a/src/parser_bot/config.py b/src/parser_bot/config.py index 976fc88..3e26059 100644 --- a/src/parser_bot/config.py +++ b/src/parser_bot/config.py @@ -29,18 +29,6 @@ class Settings(BaseSettings): media_dir: str = Field("/data/media", alias="MEDIA_DIR") media_max_bytes: int = Field(20 * 1024 * 1024, alias="MEDIA_MAX_BYTES") - # OpenAI-compatible LLM endpoint, shared with telephony/vLLM in production. - llm_enabled: bool = Field(True, alias="LLM_ENABLED") - llm_base_url: str = Field("http://10.2.3.5:8002", alias="LLM_BASE_URL") - llm_api_key: str = Field("", alias="LLM_API_KEY") - llm_model: str = Field("qwen2.5-14b", alias="LLM_MODEL") - llm_timeout_seconds: int = Field(120, alias="LLM_TIMEOUT_SECONDS") - llm_max_tokens: int = Field(600, alias="LLM_MAX_TOKENS") - llm_min_text_length: int = Field(20, alias="LLM_MIN_TEXT_LENGTH") - llm_classify_interval_seconds: int = Field(20, alias="LLM_CLASSIFY_INTERVAL_SECONDS") - llm_classify_batch_size: int = Field(5, alias="LLM_CLASSIFY_BATCH_SIZE") - llm_classifier_owner: str = Field("python", alias="LLM_CLASSIFIER_OWNER") - @property def database_url(self) -> str: return ( diff --git a/src/parser_bot/extractors.py b/src/parser_bot/extractors.py index 769ad69..d770d5b 100644 --- a/src/parser_bot/extractors.py +++ b/src/parser_bot/extractors.py @@ -310,26 +310,3 @@ def analyze(text: str | None) -> dict[str, Any]: "tg_handles": extract_tg_handles(text), "real_estate": extract_real_estate(text), } - - -async def analyze_with_llm( - text: str | None, - vertical: str = "real_estate", - department_id: str | None = None, - section_slug: str | None = None, -) -> dict[str, Any]: - """Regex extraction + local LLM lead classification, routed by vertical. - - `department_id` + `section_slug` let the classifier pick a department and - section-specific system prompt with fallback to the department vertical - prompt. The LLM verdict goes under `lead` for RE and - under `hr_lead` for HR. Falls back to regex-only if the LLM is unavailable. - """ - base = analyze(text) - # Lazy import to avoid hard dep on httpx in environments where LLM is off. - from parser_bot.llm import classify - - verdict = await classify(text, vertical, department_id, section_slug) # type: ignore[arg-type] - if verdict is not None: - base["hr_lead" if vertical == "hr" else "lead"] = verdict - return base diff --git a/src/parser_bot/links.py b/src/parser_bot/links.py deleted file mode 100644 index 687351e..0000000 --- a/src/parser_bot/links.py +++ /dev/null @@ -1,44 +0,0 @@ -"""Build Telegram URLs from stored channel metadata.""" -from __future__ import annotations - -import re - -_USERNAME_RE = re.compile(r"^@?([A-Za-z][A-Za-z0-9_]{4,31})$") -_TME_URL_RE = re.compile( - r"^(?:https?://)?(?:t|telegram)\.me/(?:s/)?([A-Za-z][A-Za-z0-9_]{4,31})(?:/.*)?$" -) - - -def channel_username(identifier: str | None) -> str | None: - """Extract the public username from a channel identifier if any. - - Returns None for private channels (joinchat, +invite, raw IDs). - """ - if not identifier: - return None - s = identifier.strip() - m = _USERNAME_RE.match(s) - if m: - return m.group(1) - m = _TME_URL_RE.match(s) - if m: - return m.group(1) - return None - - -def post_url(identifier: str | None, tg_id: int | None, tg_message_id: int) -> str | None: - """Build a deep link to a Telegram post. - - Public channel: https://t.me// - Private channel (no public username, only tg_id): https://t.me/c// - where is the absolute id with the leading -100 stripped. - """ - username = channel_username(identifier) - if username: - return f"https://t.me/{username}/{tg_message_id}" - if tg_id is None: - return None - raw = abs(tg_id) - s = str(raw) - short = s[3:] if s.startswith("100") and len(s) > 3 else s - return f"https://t.me/c/{short}/{tg_message_id}" diff --git a/src/parser_bot/llm.py b/src/parser_bot/llm.py deleted file mode 100644 index ed28c24..0000000 --- a/src/parser_bot/llm.py +++ /dev/null @@ -1,387 +0,0 @@ -"""OpenAI-compatible LLM client for lead classification & extraction. - -Two verticals share one model and one process: - - real_estate: high recall on listings (sale/rent/purchase), - - hr: vacancies, resumes, bare contact leads. - -The system prompt and JSON schema differ per vertical; the rest of the -plumbing (timeouts, single-lock concurrency, JSON-mode parsing) is shared. -On any error returns `None` and the caller falls back to regex-only extraction. -""" -from __future__ import annotations - -import asyncio -import json -from typing import Any, Literal - -import httpx -import structlog - -from parser_bot.config import settings - -log = structlog.get_logger() - - -# Single shared lock so we never run two LLM requests at once on the GPU — -# they would just thrash VRAM and finish slower than sequential. -_lock = asyncio.Lock() - - -Vertical = Literal["real_estate", "hr"] - - -DEFAULT_RE_SYSTEM_PROMPT = """\ -Ты — аналитик объявлений о недвижимости. Тебе дают текст из Telegram-канала. -Сообщение МОЖЕТ БЫТЬ НА ЛЮБОМ ЯЗЫКЕ — русский, английский, арабский, любой -другой. Обрабатывай его одинаково независимо от языка. - -Задача: определить, является ли это РЕАЛЬНЫМ объявлением о покупке, продаже -или аренде НЕДВИЖИМОСТИ (квартира, дом/villa, студия/studio, апартаменты, -комната, таунхаус/townhouse, дача, коттедж, пентхаус/penthouse, офис, -склад, помещение, земельный участок/plot/land, гараж, машиноместо). -Учитывай намёки и нечёткие формулировки — лучше отметить сомнительный лид -как `is_listing=true` с низкой confidence, чем пропустить. - -Сигналы что это ОБЪЯВЛЕНИЕ (kind): -— продажа/sale: «продаётся», «продаю», «продажа», «for sale», «#forsale», - «selling price», «selling», «price», «AED 33M», ценник в любой валюте. -— аренда/rent: «сдаётся», «сдаю», «аренда», «for rent», «to let», «rental», - «per year», «per month», «AED ... /year». -— покупка/purchase: «куплю», «куплю в», «looking for», «want to buy», - «wanted», «requirement». - -ОДНО сообщение может быть и про продажу, И про аренду одновременно -(«FOR SALE | RENT» / «продажа или аренда»). В таком случае выбирай -основное намерение по самому тексту; если равноценно — `kind="sale"` -и упомяни аренду в summary. - -НЕ объявления (is_listing=false): -— общие новости / статьи / аналитика рынка; -— воспоминания и истории («когда-то продавал квартиру»); -— шутки, мемы, цитаты; -— реклама услуг агентств без конкретного объекта; -— чужие пересланные объявления без контактов и явного предложения от автора. - -Отвечай СТРОГО валидным JSON по схеме (никаких комментариев, никакого markdown): -{ - "is_listing": boolean, - "kind": "sale" | "rent" | "purchase" | null, - "property_type": "квартира" | "дом" | "студия" | "апартаменты" | "комната" | "таунхаус" | "дача" | "коттедж" | "офис" | "склад" | "помещение" | "участок" | "гараж" | "машиноместо" | null, - "rooms": "студия" | "1-к" | "2-к" | "3-к" | "4-к" | "5+к" | null, - "area_m2": number | null, - "price_text": string | null, - "price_value": number | null, - "currency": "RUB" | "USD" | "EUR" | "AED" | "GBP" | "CNY" | "TRY" | "KZT" | "BYN" | "UAH" | null, - "location": string | null, - "contact_phone": string | null, - "contact_name": string | null, - "summary": string, - "confidence": number -} - -Поля: -- summary — ОДНО короткое предложение НА РУССКОМ языке (даже если исходный - текст на английском или другом). Это нужно для единообразного UI. -- property_type — пиши значение по-русски (villa→дом, apartment→квартира, - townhouse→таунхаус, plot/land→участок, studio→студия, penthouse→апартаменты, - house→дом, office→офис, warehouse→склад, retail→помещение). -- rooms — для англоязычного «3BR», «3 BR», «3 bed», «3-bedroom» возвращай - «3-к»; для «studio» → «студия». -- area_m2 — площадь В КВАДРАТНЫХ МЕТРАХ. Если в тексте sqft / sq.ft / sq ft / - square feet — переведи: m² = sqft × 0.0929. Округляй до целого. -- confidence ∈ [0, 1]: 0.9+ если явное объявление с ценой/контактом, - 0.5–0.8 если правдоподобно, 0.2–0.4 если намёк. -- price_text — точная цитата из текста («2.5 млн ₽», «AED 850 000», «$320k», - «300 тыс. дирхам», «د.إ 1.2M», «70,000,000 AED», «AED 4.3M», «AED 1.75M»). -- price_value — числовая величина цены В УКАЗАННОЙ ВАЛЮТЕ (не конвертируй). - Раскрывай сокращения: «AED 4.3M» → 4300000, «$320k» → 320000. -- currency — определяй гибко: ₽/руб/р/RUB/рублей → RUB; $/USD/долл/бакс → USD; - €/EUR/евро → EUR; AED/дирхам/дирхамов/дирхама/dh/dhs/د.إ/Dirhams → AED; - ₺/TRY/лир/лира → TRY; ¥/CNY/юань → CNY; ₸/KZT/тенге → KZT; - Br/BYN/бел.руб → BYN; ₴/UAH/грн → UAH. Если не уверен — null. -- contact_phone — любой номер телефона в тексте (с + или без, российский, - ОАЭ, любой международный). -""" - - -DEFAULT_HR_SYSTEM_PROMPT = """\ -Ты — аналитик HR-объявлений. Тебе дают текст из Telegram-канала. Сообщение -МОЖЕТ БЫТЬ НА ЛЮБОМ ЯЗЫКЕ — обрабатывай одинаково. - -Задача: определить, относится ли сообщение к рынку труда, и какого типа лид -это. Допускаются три типа (`kind`): -— vacancy — компания/наниматель ищет сотрудника («ищем разработчика», - «hiring backend engineer», «требуется бухгалтер», «we are looking for»); -— resume — соискатель ищет работу («ищу работу», «open to work», «available - for hire», «рассматриваю предложения», «my CV», «резюме»); -— contact — короткое сообщение с именем/контактом и намёком на профессию, - без явной вакансии/резюме («Иван Петров, Python, +7…», «@nick — UI/UX, - Дубай»). Используй, когда vacancy и resume не подходят, но из текста ясно, - что это HR-контакт. - -Лучше отметить сомнительный случай `is_lead=true` с низкой confidence, -чем пропустить. НО полностью исключай: -— общие новости и аналитика рынка труда без конкретной вакансии/резюме; -— реклама курсов, школ, маркетплейсов услуг (Profi.ru и т.п.); -— чужие пересланные посты без контактов и без явного предложения от автора; -— объявления о продаже/аренде недвижимости, услуг и товаров; -— мемы, шутки, цитаты. - -Отвечай СТРОГО валидным JSON по схеме (никаких комментариев, никакого markdown): -{ - "is_lead": boolean, - "kind": "vacancy" | "resume" | "contact" | null, - "title": string | null, - "company": string | null, - "candidate_name": string | null, - "experience_years": number | null, - "skills": string[], - "location": string | null, - "remote": boolean | null, - "employment_type": "full-time" | "part-time" | "contract" | "internship" | null, - "salary_text": string | null, - "salary_value": number | null, - "currency": "RUB" | "USD" | "EUR" | "AED" | "GBP" | "CNY" | "TRY" | "KZT" | "BYN" | "UAH" | null, - "contact_phone": string | null, - "contact_name": string | null, - "summary": string, - "confidence": number -} - -Поля: -- title — должность/роль ОДНОЙ строкой («Senior Python Developer», «Бухгалтер», - «UI/UX-дизайнер»). Для resume — желаемая роль. Для contact — то, что заявлено. -- company — название компании-нанимателя, если оно явно указано (vacancy). -- candidate_name — ФИО или ник кандидата (resume / contact). -- experience_years — стаж в годах числом. «5+ years» → 5. Если не указан — null. -- skills — короткий массив ключевых навыков/технологий (до ~10 элементов). -- remote — true для «удалёнка / remote / WFH / hybrid: remote», false для - «офис / on-site», null если не указано. -- employment_type — full-time для «полная занятость / full-time», part-time - для «частичная / part-time», contract для «договор/контракт/freelance», - internship для «стажировка/internship». Иначе null. -- salary_text — точная цитата с зарплатой («200–300k ₽», «$5k/mo», «AED 18,000 per month»). -- salary_value — число В УКАЗАННОЙ ВАЛЮТЕ. Если диапазон — нижняя граница. - Раскрывай сокращения: «200k» → 200000, «1.5M» → 1500000. -- currency — определяй гибко: ₽/руб/RUB → RUB; $/USD/долл → USD; €/EUR/евро → EUR; - AED/дирхам/dh/dhs → AED; ₺/TRY/лир → TRY; ¥/CNY/юань → CNY; ₸/KZT/тенге → KZT; - Br/BYN/бел.руб → BYN; ₴/UAH/грн → UAH. Если не уверен — null. -- contact_phone — любой номер телефона (RU / международный, с + или без). -- contact_name — имя контактного лица (рекрутер / соискатель / автор). -- summary — ОДНО короткое предложение НА РУССКОМ языке. -- confidence ∈ [0, 1]: 0.9+ если явная вакансия/резюме с деталями, 0.5–0.8 - если правдоподобно, 0.2–0.4 если намёк. -""" - - -# Back-compat alias — older imports referenced DEFAULT_SYSTEM_PROMPT. -DEFAULT_SYSTEM_PROMPT = DEFAULT_RE_SYSTEM_PROMPT - - -def _build_user_prompt(text: str) -> str: - return f"Текст сообщения:\n```\n{text}\n```\nВерни JSON." - - -_VALID_CURRENCIES = { - "RUB", "USD", "EUR", "AED", "GBP", "CNY", "TRY", "KZT", "BYN", "UAH" -} - - -def _coerce_real_estate(payload: Any) -> dict | None: - if not isinstance(payload, dict): - return None - is_listing = bool(payload.get("is_listing")) - currency = payload.get("currency") - if currency is not None: - currency = str(currency).upper() - if currency not in _VALID_CURRENCIES: - currency = None - return { - "is_listing": is_listing, - "kind": payload.get("kind") if payload.get("kind") in ("sale", "rent", "purchase") else None, - "property_type": payload.get("property_type") or None, - "rooms": payload.get("rooms") or None, - "area_m2": _as_float(payload.get("area_m2")), - "price_text": payload.get("price_text") or None, - "price_value": _as_float(payload.get("price_value")), - "currency": currency, - "location": payload.get("location") or None, - "contact_phone": payload.get("contact_phone") or None, - "contact_name": payload.get("contact_name") or None, - "summary": (payload.get("summary") or "")[:300], - "confidence": max(0.0, min(1.0, _as_float(payload.get("confidence")) or 0.0)), - } - - -def _coerce_hr(payload: Any) -> dict | None: - if not isinstance(payload, dict): - return None - is_lead = bool(payload.get("is_lead")) - currency = payload.get("currency") - if currency is not None: - currency = str(currency).upper() - if currency not in _VALID_CURRENCIES: - currency = None - skills_raw = payload.get("skills") or [] - if isinstance(skills_raw, str): - skills = [s.strip() for s in skills_raw.split(",") if s.strip()] - elif isinstance(skills_raw, list): - skills = [str(s).strip() for s in skills_raw if str(s).strip()] - else: - skills = [] - skills = skills[:15] - employment = payload.get("employment_type") - if employment is not None and employment not in ( - "full-time", "part-time", "contract", "internship" - ): - employment = None - remote_raw = payload.get("remote") - remote = bool(remote_raw) if isinstance(remote_raw, bool) else None - return { - "is_lead": is_lead, - "kind": payload.get("kind") if payload.get("kind") in ("vacancy", "resume", "contact") else None, - "title": payload.get("title") or None, - "company": payload.get("company") or None, - "candidate_name": payload.get("candidate_name") or None, - "experience_years": _as_float(payload.get("experience_years")), - "skills": skills, - "location": payload.get("location") or None, - "remote": remote, - "employment_type": employment, - "salary_text": payload.get("salary_text") or None, - "salary_value": _as_float(payload.get("salary_value")), - "currency": currency, - "contact_phone": payload.get("contact_phone") or None, - "contact_name": payload.get("contact_name") or None, - "summary": (payload.get("summary") or "")[:300], - "confidence": max(0.0, min(1.0, _as_float(payload.get("confidence")) or 0.0)), - } - - -def _as_float(v: Any) -> float | None: - if v is None or isinstance(v, bool): - return None - try: - return float(v) - except (TypeError, ValueError): - return None - - -async def is_ready() -> bool: - """Check that the OpenAI-compatible model endpoint is reachable.""" - try: - async with httpx.AsyncClient(timeout=5) as client: - headers = _headers() - r = await client.get(f"{_base_url()}/v1/models", headers=headers) - r.raise_for_status() - models = r.json().get("data", []) - if not models: - return True - model_ids = { - str(m.get("id") or m.get("name") or "") - for m in models - if isinstance(m, dict) - } - return settings.llm_model in model_ids or any( - mid.startswith(settings.llm_model) for mid in model_ids - ) - except Exception: - return False - - -def _base_url() -> str: - return settings.llm_base_url.rstrip("/") - - -def _headers() -> dict[str, str]: - headers = {"Content-Type": "application/json"} - if settings.llm_api_key: - headers["Authorization"] = f"Bearer {settings.llm_api_key}" - return headers - - -def default_prompt(vertical: Vertical) -> str: - return DEFAULT_HR_SYSTEM_PROMPT if vertical == "hr" else DEFAULT_RE_SYSTEM_PROMPT - - -async def classify( - text: str | None, - vertical: Vertical = "real_estate", - department_id: str | None = None, - section_slug: str | None = None, -) -> dict | None: - """Classify a message text under the given vertical/section. - - The system prompt is resolved with `section → vertical → built-in` fallback, - so a per-section prompt can fine-tune extraction (e.g. AED/sqft for Dubai) - while unconfigured sections keep using the vertical-wide prompt. - Returns a vertical-specific structured dict or None on error / short text. - """ - if not settings.llm_enabled: - return None - if not text or len(text.strip()) < settings.llm_min_text_length: - return None - - # Lazy import to avoid a circular: prompt_store -> db.session -> config. - from parser_bot import prompt_store - - system = await prompt_store.resolve( - vertical, department_id, section_slug, default_prompt(vertical) - ) - payload = { - "model": settings.llm_model, - "messages": [ - {"role": "system", "content": system}, - {"role": "user", "content": _build_user_prompt(text)}, - ], - "temperature": 0.1, - "max_tokens": settings.llm_max_tokens, - "response_format": {"type": "json_object"}, - } - async with _lock: - try: - async with httpx.AsyncClient(timeout=settings.llm_timeout_seconds) as client: - r = await client.post( - f"{_base_url()}/v1/chat/completions", - headers=_headers(), - json=payload, - ) - if r.status_code != 200: - log.warning( - "llm_request_failed", - status=r.status_code, - model=settings.llm_model, - vertical=vertical, - section=section_slug, - body=r.text[:300], - ) - return None - data = r.json() - except Exception as exc: - log.warning( - "llm_request_failed", error=str(exc), model=settings.llm_model, vertical=vertical - ) - return None - - choices = data.get("choices") or [] - message = choices[0].get("message") if choices and isinstance(choices[0], dict) else None - raw = ((message or {}).get("content") or "").strip() - if not raw: - return None - try: - parsed = json.loads(raw) - except json.JSONDecodeError: - # Best effort: extract first {...} block. - start, end = raw.find("{"), raw.rfind("}") - if start == -1 or end == -1: - log.warning("llm_invalid_json", raw=raw[:200], vertical=vertical) - return None - try: - parsed = json.loads(raw[start : end + 1]) - except json.JSONDecodeError: - log.warning("llm_invalid_json", raw=raw[:200], vertical=vertical) - return None - - if vertical == "hr": - return _coerce_hr(parsed) - return _coerce_real_estate(parsed) diff --git a/src/parser_bot/main.py b/src/parser_bot/main.py index 68208eb..28ed799 100644 --- a/src/parser_bot/main.py +++ b/src/parser_bot/main.py @@ -1,5 +1,4 @@ from contextlib import asynccontextmanager -from pathlib import Path import structlog import uvicorn @@ -7,7 +6,6 @@ from fastapi import Depends, FastAPI from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html from fastapi.openapi.utils import get_openapi from fastapi.responses import JSONResponse -from fastapi.staticfiles import StaticFiles from parser_bot.access import require_admin from parser_bot.api.routes import router @@ -105,10 +103,6 @@ def create_app() -> FastAPI: title=app.title + " — redoc", ) - media_dir = Path(settings.media_dir) - media_dir.mkdir(parents=True, exist_ok=True) - # /media is fine to cache — file names are content-stable. - app.mount("/media", StaticFiles(directory=media_dir), name="media") return app diff --git a/src/parser_bot/prompt_store.py b/src/parser_bot/prompt_store.py deleted file mode 100644 index 919e6bc..0000000 --- a/src/parser_bot/prompt_store.py +++ /dev/null @@ -1,137 +0,0 @@ -"""Runtime-editable LLM system prompts, persisted in app_settings. - -Three resolution levels with fallback (more specific → less specific): - 1. `llm_system_prompt:::` — section override - 2. `llm_system_prompt::` — department vertical override - 3. built-in DEFAULT_RE_SYSTEM_PROMPT / DEFAULT_HR_SYSTEM_PROMPT - -The prompt is read on every classification call but cached for a short -window so the DB isn't hit per-message. Edits via the API invalidate the -cache for that level, so a save in the UI takes effect within seconds. -""" -from __future__ import annotations - -import time -from typing import Literal - -from sqlalchemy import select -from sqlalchemy.dialects.postgresql import insert as pg_insert - -from parser_bot.db.models import AppSetting -from parser_bot.db.session import session_scope - -Vertical = Literal["real_estate", "hr"] - -_KEY_PREFIX = "llm_system_prompt:" -_CACHE_TTL_S = 5.0 -_cache: dict[str, tuple[float, str | None]] = {} - - -def _key( - vertical: Vertical, - department_id: str | None, - section_slug: str | None = None, -) -> str: - dept = department_id or "global" - if section_slug: - return f"{_KEY_PREFIX}{dept}:{vertical}:{section_slug}" - return f"{_KEY_PREFIX}{dept}:{vertical}" - - -async def _load(key: str) -> str | None: - """Read a stored prompt by exact key. None if missing or empty.""" - now = time.monotonic() - cached_at, cached_value = _cache.get(key, (0.0, None)) - if now - cached_at < _CACHE_TTL_S: - return cached_value - - async with session_scope() as session: - row = await session.execute( - select(AppSetting.value).where(AppSetting.key == key) - ) - value = row.scalar_one_or_none() - - text = value if isinstance(value, str) and value.strip() else None - _cache[key] = (now, text) - return text - - -async def resolve( - vertical: Vertical, department_id: str | None, section_slug: str | None, default: str -) -> str: - """Pick the most specific prompt available, falling back to `default`. - - Always consults section-level → vertical-level → default. This is what - the classifier uses for every message. - """ - if section_slug: - text = await _load(_key(vertical, department_id, section_slug)) - if text is not None: - return text - text = await _load(_key(vertical, department_id)) - if text is not None: - return text - return default - - -async def get( - vertical: Vertical, department_id: str | None, section_slug: str | None, default: str -) -> tuple[str, str]: - """For the settings UI: return (text, source) where source is one of - 'section' | 'vertical' | 'default'. Lets the editor show which override - is currently active without a second round-trip. - """ - if section_slug: - text = await _load(_key(vertical, department_id, section_slug)) - if text is not None: - return text, "section" - text = await _load(_key(vertical, department_id)) - if text is not None: - return text, "vertical" - return default, "default" - - -async def set_prompt( - vertical: Vertical, department_id: str | None, section_slug: str | None, text: str -) -> None: - """Save a new prompt at the given level (section or vertical).""" - if not isinstance(text, str) or not text.strip(): - raise ValueError("prompt must be a non-empty string") - key = _key(vertical, department_id, section_slug) - async with session_scope() as session: - stmt = ( - pg_insert(AppSetting) - .values(key=key, value=text) - .on_conflict_do_update( - index_elements=["key"], set_={"value": text} - ) - ) - await session.execute(stmt) - invalidate(key) - - -async def reset( - vertical: Vertical, department_id: str | None, section_slug: str | None -) -> None: - """Drop the override at the given level.""" - key = _key(vertical, department_id, section_slug) - async with session_scope() as session: - await session.execute( - AppSetting.__table__.delete().where(AppSetting.key == key) - ) - invalidate(key) - - -def invalidate(key: str | None = None) -> None: - if key is None: - _cache.clear() - else: - _cache.pop(key, None) - - -async def is_overridden( - vertical: Vertical, department_id: str | None, section_slug: str | None = None -) -> bool: - """True iff a custom prompt is stored at this exact level.""" - text = await _load(_key(vertical, department_id, section_slug)) - return text is not None diff --git a/src/parser_bot/scheduler/poller.py b/src/parser_bot/scheduler/poller.py index 2ecc8c1..22aa0b3 100644 --- a/src/parser_bot/scheduler/poller.py +++ b/src/parser_bot/scheduler/poller.py @@ -6,9 +6,9 @@ from sqlalchemy import func, select from sqlalchemy.dialects.postgresql import insert as pg_insert from parser_bot.config import settings -from parser_bot.db.models import Channel, Message, Section +from parser_bot.db.models import Channel, Message from parser_bot.db.session import session_scope -from parser_bot.extractors import analyze, analyze_with_llm +from parser_bot.extractors import analyze from parser_bot.telegram.client import ( fetch_new_messages, fetch_specific_messages_with_media, @@ -19,29 +19,6 @@ from parser_bot.telegram.client import ( log = structlog.get_logger() -def _verdict_key(vertical: str) -> str: - """JSONB key under `extracted` where the LLM verdict lives for this vertical.""" - return "hr_lead" if vertical == "hr" else "lead" - - -def _needs_work_clause(vertical: str | None): - """Rows that still need LLM classification. - - A row needs work when: - - extracted IS NULL (never analyzed), or - - the verdict for this vertical is missing. - - Without `vertical`, falls back to "missing any verdict" — used by - aggregate /llm/queue display when no vertical is selected. - """ - if vertical is None: - return (Message.extracted.is_(None)) | ( - Message.extracted["lead"].is_(None) & Message.extracted["hr_lead"].is_(None) - ) - key = _verdict_key(vertical) - return (Message.extracted.is_(None)) | (Message.extracted[key].is_(None)) - - async def poll_channel(channel_id: int) -> int: """Poll one channel for new messages. Returns count of inserted rows.""" async with session_scope() as session: @@ -63,9 +40,8 @@ async def poll_channel(channel_id: int) -> int: inserted = 0 for m in msgs: - # Only the cheap regex pass runs in the poll path. LLM classification - # is handled by `classify_pending` in a background scheduler job so - # that a poll request never blocks on a 5s/message LLM call. + # Only the cheap regex pass runs in Python. LLM classification is + # handled by the Go classifier so Telegram polling stays lightweight. stmt = ( pg_insert(Message) .values( @@ -182,162 +158,6 @@ async def backfill_media(channel_id: int, batch_size: int = 50) -> dict[str, int return {"updated": updated, "pending": max(0, pending_total - updated)} -async def reanalyze_channel(channel_id: int, batch_size: int = 5) -> dict[str, int]: - """Re-run extractors (regex + LLM) over messages missing this channel's verdict. - - Picks the vertical AND section from the channel row so the right LLM - prompt is used. Only reanalyzes rows where the corresponding verdict key - is missing. Newest first so fresh leads surface during long backfills. - """ - async with session_scope() as session: - result = await session.execute( - select(Channel, Section.slug, Section.department_id) - .join(Section, Section.id == Channel.section_id) - .where(Channel.id == channel_id) - ) - row = result.one_or_none() - if row is None: - return {"updated": 0, "pending": 0} - channel, section_slug, department_id = row - vertical = channel.vertical - needs_work = _needs_work_clause(vertical) - - pending_total = ( - await session.execute( - select(func.count(Message.id)).where( - Message.channel_id == channel_id, - Message.text.is_not(None), - needs_work, - ) - ) - ).scalar_one() - - rows = ( - await session.execute( - select(Message.id, Message.text) - .where( - Message.channel_id == channel_id, - Message.text.is_not(None), - needs_work, - ) - .order_by(Message.id.desc()) - .limit(batch_size) - ) - ).all() - if not rows: - return {"updated": 0, "pending": 0} - - updated = 0 - for db_id, text in rows: - extracted = ( - await analyze_with_llm(text, vertical, department_id, section_slug) - if settings.llm_enabled - else analyze(text) - ) - msg = await session.get(Message, db_id) - if msg is None: - continue - msg.extracted = extracted - updated += 1 - - log.info( - "reanalyzed_channel", - channel_id=channel_id, - vertical=vertical, - section=section_slug, - updated=updated, - remaining=max(0, pending_total - updated), - ) - return {"updated": updated, "pending": max(0, pending_total - updated)} - - -async def pending_llm_count( - vertical: str | None = None, - section_slug: str | None = None, - department_id: str | None = None, -) -> int: - """How many text messages still need LLM classification. - - When `vertical` is set, only counts messages from channels of that vertical - (and optionally that section) whose vertical-specific verdict is missing. - """ - if not settings.llm_enabled: - return 0 - needs_work = _needs_work_clause(vertical) - async with session_scope() as session: - stmt = select(func.count(Message.id)).where( - Message.text.is_not(None), - needs_work, - ) - if vertical is not None: - stmt = stmt.join(Channel, Channel.id == Message.channel_id).where( - Channel.vertical == vertical - ) - if section_slug is not None or department_id is not None: - if vertical is None: - stmt = stmt.join(Channel, Channel.id == Message.channel_id) - stmt = stmt.join(Section, Section.id == Channel.section_id) - if section_slug is not None: - stmt = stmt.where(Section.slug == section_slug) - if department_id is not None: - stmt = stmt.where(Section.department_id == department_id) - return (await session.execute(stmt)).scalar_one() - - -async def classify_pending(batch_size: int = 5) -> int: - """Run LLM over a batch of unclassified messages across all channels. - - Walks newest-first and picks the prompt/vertical/section from each - message's channel, so RE and HR channels (and per-section overrides) - share the same classifier worker without crosstalk. - """ - if not settings.llm_enabled: - return 0 - needs_work = _needs_work_clause(None) - - async with session_scope() as session: - rows = ( - await session.execute( - select( - Message.id, - Message.text, - Channel.vertical, - Section.slug, - Section.department_id, - ) - .join(Channel, Channel.id == Message.channel_id) - .join(Section, Section.id == Channel.section_id) - .where(Message.text.is_not(None), needs_work) - .order_by(Message.id.desc()) - .limit(batch_size) - ) - ).all() - if not rows: - return 0 - - updated = 0 - for db_id, text, vertical, section_slug, department_id in rows: - # If extracted already has THIS vertical's verdict, skip — needs_work - # uses an OR over both keys and would otherwise re-run RE channels - # that already have a lead just because hr_lead is null. - existing = ( - await session.execute(select(Message.extracted).where(Message.id == db_id)) - ).scalar_one_or_none() - key = _verdict_key(vertical) - if existing and existing.get(key) is not None: - continue - extracted = await analyze_with_llm(text, vertical, department_id, section_slug) - msg = await session.get(Message, db_id) - if msg is None: - continue - msg.extracted = extracted - updated += 1 - - if updated: - log.info("classify_pending_batch", updated=updated) - return updated - - def build_scheduler() -> AsyncIOScheduler: scheduler = AsyncIOScheduler() scheduler.add_job( @@ -348,14 +168,4 @@ def build_scheduler() -> AsyncIOScheduler: max_instances=1, coalesce=True, ) - if settings.llm_enabled and settings.llm_classifier_owner != "go": - scheduler.add_job( - classify_pending, - "interval", - seconds=settings.llm_classify_interval_seconds, - id="classify_pending", - max_instances=1, - coalesce=True, - kwargs={"batch_size": settings.llm_classify_batch_size}, - ) return scheduler