Scope monitoring TG by department

Grendgi
2026-06-04 15:31:10 +03:00
parent f9e072774c
commit b78d1eac02
27 changed files with 481 additions and 553 deletions


@@ -32,23 +32,20 @@ API_PORT=8000
MEDIA_DIR=/data/media
MEDIA_MAX_BYTES=20971520
# Local LLM (Ollama) — runs Qwen 2.5 7B Q4 on CPU. Set LLM_ENABLED=false to disable.
# OpenAI-compatible LLM endpoint. In production this can point to the same
# vLLM server/model used by telephony.
LLM_ENABLED=true
LLM_BASE_URL=http://ollama:11434
LLM_MODEL=qwen2.5:7b-instruct-q4_K_M
LLM_BASE_URL=http://10.2.3.5:8002
LLM_API_KEY=
LLM_MODEL=qwen2.5-14b
LLM_TIMEOUT_SECONDS=120
LLM_MAX_TOKENS=600
LLM_MIN_TEXT_LENGTH=20
# How often the background classifier wakes up and how many messages it
# processes per tick. With 5/20s ≈ 900 messages/hour at ~3-6s per call.
LLM_CLASSIFY_INTERVAL_SECONDS=20
LLM_CLASSIFY_BATCH_SIZE=5
# Admin allowlist for /auth.html, /docs, /openapi.json, /redoc and the
# /api/v1/auth/* endpoints. Comma-separated list of client IPs.
# Empty = no restriction (everyone is admin) — convenient for local dev.
# Example: ADMIN_ALLOWED_IPS=89.110.109.221,127.0.0.1
ADMIN_ALLOWED_IPS=
# Honor X-Forwarded-For / X-Real-IP from a reverse proxy (Docker port-
# forward, nginx, traefik) when resolving the client IP for the allowlist.
TRUST_PROXY_HEADERS=true
# Optional local fallback for admin-only UI/API operations. In production the
# portal forwards X-User-Is-Admin=1, so no local password is required.
ADMIN_PASSWORD=

README.md

@@ -1,123 +1,68 @@
# parser-tg-bot
# monitoring-tg
A parser for public Telegram channels built on Telethon (MTProto). It stores messages in Postgres
and is controlled through a REST API. The polling interval is configured in `.env`. As a next step,
it is easy to switch to realtime via `events.NewMessage`.
A Telegram channel monitoring service for the portal. It stores messages in
Postgres, sorts channels into verticals/sections, and runs AI analysis
through an OpenAI-compatible endpoint shared with the portal's other services.
## Stack
## Access
- Python 3.11, Telethon, FastAPI, SQLAlchemy 2 (async) + Alembic, APScheduler, Postgres 16
- Admin operations remain with the portal admin: the portal forwards
`X-User-Is-Admin=1`.
- A department sees only its own sections, channels, messages, and prompts, scoped by
`X-User-Department-Id`.
- A department head can create and edit the sections of their own department:
the portal forwards `X-User-Is-Department-Head=1`.
- Section passwords and the IP allowlist have been removed.
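The header rules above can be sketched in plain Python. This is an illustrative sketch only, not the service's actual code: `PortalAccess`, `resolve_access`, and `visible_department` are hypothetical names, and the only facts taken from this README are the three header names and the value `1`.

```python
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class PortalAccess:
    """Hypothetical container for what the portal headers say about a request."""

    is_admin: bool
    is_department_head: bool
    department_id: Optional[str]

    @property
    def can_manage(self) -> bool:
        # Admins and department heads may create and edit sections.
        return self.is_admin or self.is_department_head


def resolve_access(headers: Mapping[str, str]) -> PortalAccess:
    # HTTP header names are case-insensitive, so normalise them first.
    lowered = {name.lower(): value for name, value in headers.items()}
    department = (lowered.get("x-user-department-id") or "").strip() or None
    return PortalAccess(
        is_admin=lowered.get("x-user-is-admin") == "1",
        is_department_head=lowered.get("x-user-is-department-head") == "1",
        department_id=department,
    )


def visible_department(access: PortalAccess) -> Optional[str]:
    """Return the department filter for reads; None means "no filter" (admin)."""
    if access.is_admin:
        return None
    if access.department_id is None:
        raise PermissionError("department is required")
    return access.department_id
```

For example, `visible_department(resolve_access({"X-User-Department-Id": "sales"}))` yields `"sales"`, while an admin request yields `None` and is not filtered by department.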
## Configuration
Main variables:
```env
TG_API_ID=
TG_API_HASH=
TG_PHONE=
TG_SESSION_STRING=
POSTGRES_HOST=postgres.monitoring-tg.svc.cluster.local
POSTGRES_PORT=5432
POSTGRES_USER=parser
POSTGRES_PASSWORD=parser
POSTGRES_DB=parser
PUBLIC_BASE_PATH=/api/monitoring-tg
LLM_ENABLED=true
LLM_BASE_URL=http://10.2.3.5:8002
LLM_API_KEY=
LLM_MODEL=qwen2.5-14b
```
For local admin debugging you can set `ADMIN_PASSWORD`, but in production access
should go through the portal.
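As a rough illustration of how the `LLM_*` variables might be consumed, the sketch below builds (but does not send) a chat-completion request using only the standard library. It rests on assumptions: `build_chat_request` is a hypothetical helper, and the `/v1/chat/completions` path and `Bearer` authorization follow the usual OpenAI-compatible convention rather than anything confirmed by this repository.

```python
import json
import os
import urllib.request
from typing import Mapping


def build_chat_request(text: str, env: Mapping[str, str] = os.environ) -> urllib.request.Request:
    """Build a POST request for an OpenAI-compatible chat endpoint (assumed path)."""
    base_url = env.get("LLM_BASE_URL", "http://10.2.3.5:8002").rstrip("/")
    headers = {"Content-Type": "application/json"}
    api_key = env.get("LLM_API_KEY", "")
    if api_key:
        # An empty LLM_API_KEY means the endpoint is assumed to need no auth.
        headers["Authorization"] = f"Bearer {api_key}"
    body = {
        "model": env.get("LLM_MODEL", "qwen2.5-14b"),
        "messages": [{"role": "user", "content": text}],
        "max_tokens": int(env.get("LLM_MAX_TOKENS", "600")),
    }
    return urllib.request.Request(
        f"{base_url}/v1/chat/completions",  # assumption: standard OpenAI-style route
        data=json.dumps(body).encode("utf-8"),
        headers=headers,
        method="POST",
    )
```

Passing the result to `urllib.request.urlopen(request, timeout=120)` would send it; the timeout here mirrors `LLM_TIMEOUT_SECONDS` from `.env.example`.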
## Running in k8s
The manifests live in `k8s/`. Before applying them, fill in `k8s/secrets.yaml`
with real Telegram credentials and, if needed, `LLM_API_KEY`.
```bash
kubectl apply -k k8s
```
Migrations are run by the container entrypoint before the API starts.
## Structure
```text
src/parser_bot/
├── api/ # FastAPI routes + Pydantic schemas
├── db/ # SQLAlchemy models + sessions
├── scheduler/ # APScheduler worker for periodic polling
├── telegram/ # Telethon client (resolve, fetch)
├── web/static/ # SPA pages (HTML/CSS/JS, no bundler)
├── config.py # pydantic-settings
└── main.py # FastAPI lifespan + uvicorn
alembic/ # migrations
├── api/ FastAPI routes + Pydantic schemas
├── db/ SQLAlchemy models + sessions
├── scheduler/ APScheduler worker for periodic polling
├── telegram/ Telethon client
├── web/static/ UI pages, no bundler
├── config.py pydantic-settings
└── main.py FastAPI lifespan + uvicorn
alembic/ migrations
k8s/ manifests for the portal
```
## First run (locally, via Docker)
1. Get `api_id` and `api_hash` at [my.telegram.org](https://my.telegram.org) → API development tools.
2. Copy `.env.example` to `.env` and fill in `TG_API_ID`, `TG_API_HASH`, `TG_PHONE`.
3. Start Postgres and apply the migrations:
```bash
docker compose up -d db
docker compose run --rm app alembic upgrade head
```
4. Start the app:
```bash
docker compose up -d
docker compose logs app --tail=50
```
5. **Telegram authorization**: open [http://localhost:8000/auth.html](http://localhost:8000/auth.html)
and click "Send code". Telegram will send a code to the number from `TG_PHONE`;
enter the code (and the 2FA password, if enabled). After that the parser starts polling.
The session is saved to `./data/session/parser.session`; restarts reuse it,
so you do not need to log in again.
### Admin access and section codes
- `ADMIN_PASSWORD`: an additional password for admin functions. If it is not set,
the previous mode stays in effect: access is determined by `ADMIN_ALLOWED_IPS` alone.
- [http://localhost:8000/admin.html](http://localhost:8000/admin.html): login with the
admin password. After login you can delete and edit sections,
view their codes, manage channels, poll manually, manage prompts, authorize
Telegram, and use Swagger.
- When a section is created, an `Access code` must be set. A user enters
this code the first time they open the section's data; after logging in they can add
channels to that section. The admin sees the code in the section list.
### Production variant: no UI and no volume (k8s-friendly)
Do the interactive login **once** on a dev machine and get an opaque string:
```bash
docker compose run --rm -it app python -m parser_bot.auth
```
The script prints a string like `TG_SESSION_STRING=1AbcD...`. Put it in
`.env` or a k8s Secret; after that the application starts without the UI and without
mounting the session file:
```ini
TG_SESSION_STRING=1AbcDef... # instead of TG_SESSION_PATH/volume
```
> ⚠️ **`ApiIdPublishedFloodError`**: Telegram has blocked your
> `api_id`/`api_hash` pair (it was leaked publicly). Create a **new** application
> at [my.telegram.org](https://my.telegram.org) and do not publish the credentials anywhere.
> The old `api_id` cannot be restored.
## UI
After startup the following pages are available:
- [Dashboard](http://localhost:8000/): overall statistics, top channels, a button to poll all channels
- [Channels](http://localhost:8000/channels.html): add / delete / enable-disable / poll manually
- [Messages](http://localhost:8000/messages.html): filter by channel, text search, pagination, raw JSON
- [Settings](http://localhost:8000/settings.html): current configuration and hints
- [Authorization](http://localhost:8000/auth.html): web login to Telegram (code + 2FA)
- [Swagger UI](http://localhost:8000/docs): interactive API
The deep link `messages.html?channel_id=42` opens the feed of a specific channel.
## API
- `GET /healthz`: health check
- `GET /api/v1/auth/status`: whether the client is authorized
- `POST /api/v1/auth/send-code`: send a code to `TG_PHONE`
- `POST /api/v1/auth/submit-code` `{"code": "12345"}`: confirm the code
- `POST /api/v1/auth/submit-password` `{"password": "..."}`: 2FA password
- `POST /api/v1/auth/logout`: end the session
- `GET /api/v1/stats`: global counters
- `GET /api/v1/settings`: read-only view of the configuration
- `GET /api/v1/channels`: list channels
- `POST /api/v1/channels` `{"identifier": "@durov"}`: add a channel
- `GET /api/v1/channels/{id}`: channel details
- `PATCH /api/v1/channels/{id}` `{"is_active": false}`: enable/disable
- `DELETE /api/v1/channels/{id}`: delete
- `GET /api/v1/channels/{id}/stats`: per-channel counters
- `POST /api/v1/channels/{id}/poll`: force a poll of one channel
- `POST /api/v1/poll`: force a poll of all active channels
- `GET /api/v1/messages?channel_id=...&q=...&limit=50&offset=0`: message feed
- `GET /api/v1/messages/{id}`: a single message (with `raw` JSONB)
## Next steps
- **Realtime**: replace APScheduler with `client.add_event_handler(handler, events.NewMessage)`,
keeping the periodic poll as a background catch-up for missed messages.
- **Go microservice**: the contract is the `channels` / `messages` tables in Postgres.
The Go service can either read the same database or call `/api/v1/messages`.
- **k8s**: add a Helm chart; `data/session/` maps to a PVC, `.env` goes into a Secret.


@@ -0,0 +1,52 @@
"""department-scoped sections
Revision ID: 0010
Revises: 0009
Create Date: 2026-06-04
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
revision: str = "0010"
down_revision: Union[str, None] = "0009"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.alter_column(
"app_settings",
"key",
existing_type=sa.String(length=128),
type_=sa.String(length=255),
existing_nullable=False,
)
op.add_column("sections", sa.Column("department_id", sa.String(length=64), nullable=True))
op.drop_column("sections", "access_code")
op.drop_constraint("uq_section_vertical_slug", "sections", type_="unique")
op.create_unique_constraint(
"uq_section_vertical_department_slug",
"sections",
["vertical", "department_id", "slug"],
)
op.drop_index("ix_sections_vertical", table_name="sections")
op.create_index("ix_sections_vertical_department", "sections", ["vertical", "department_id"])
def downgrade() -> None:
op.drop_index("ix_sections_vertical_department", table_name="sections")
op.create_index("ix_sections_vertical", "sections", ["vertical"])
op.drop_constraint("uq_section_vertical_department_slug", "sections", type_="unique")
op.create_unique_constraint("uq_section_vertical_slug", "sections", ["vertical", "slug"])
op.add_column("sections", sa.Column("access_code", sa.String(length=255), nullable=True))
op.drop_column("sections", "department_id")
op.alter_column(
"app_settings",
"key",
existing_type=sa.String(length=255),
type_=sa.String(length=128),
existing_nullable=False,
)


@@ -1,64 +0,0 @@
services:
ollama:
image: ollama/ollama:latest
environment:
OLLAMA_HOST: 0.0.0.0:11434
OLLAMA_KEEP_ALIVE: 24h
OLLAMA_NUM_PARALLEL: "1"
OLLAMA_NUM_THREAD: "8"
volumes:
- ./data/ollama:/root/.ollama
ports:
- "11434:11434"
healthcheck:
test: ["CMD", "ollama", "list"]
interval: 10s
timeout: 5s
retries: 30
restart: unless-stopped
ollama-pull:
image: ollama/ollama:latest
depends_on:
ollama:
condition: service_healthy
environment:
OLLAMA_HOST: ollama:11434
entrypoint: ["/bin/sh", "-c"]
command: ["ollama list | grep -q qwen2.5:7b-instruct-q4_K_M || ollama pull qwen2.5:7b-instruct-q4_K_M"]
restart: "no"
db:
image: postgres:16-alpine
environment:
POSTGRES_USER: ${POSTGRES_USER:-parser}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-parser}
POSTGRES_DB: ${POSTGRES_DB:-parser}
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-parser}"]
interval: 5s
timeout: 5s
retries: 10
app:
build: .
env_file: .env
depends_on:
db:
condition: service_healthy
ollama:
condition: service_healthy
ports:
- "80:8000"
volumes:
- ./data/session:/data/session
- ./data/media:/data/media
- ./src:/app/src
- ./alembic:/app/alembic
volumes:
pgdata:


@@ -16,5 +16,6 @@ data:
POLL_INTERVAL_SECONDS: "60"
POLL_HISTORY_LIMIT: "50"
LLM_ENABLED: "1"
LLM_BASE_URL: "http://ollama.ollama.svc.cluster.local:11434"
LLM_MODEL: "qwen2.5:7b-instruct-q4_K_M"
LLM_BASE_URL: "http://10.2.3.5:8002"
LLM_MODEL: "qwen2.5-14b"
LLM_MAX_TOKENS: "600"


@@ -10,7 +10,7 @@ stringData:
TG_PHONE: "CHANGE_ME"
TG_SESSION_STRING: ""
POSTGRES_PASSWORD: "parser"
ADMIN_ALLOWED_IPS: ""
LLM_API_KEY: ""
ADMIN_PASSWORD: "CHANGE_ME"
---
apiVersion: v1


@@ -1,15 +1,4 @@
"""Admin access helpers for admin-only surfaces (auth, OpenAPI docs).
Resolution:
1. If `ADMIN_ALLOWED_IPS` is empty → no network restriction.
2. Otherwise the request's client IP must be in the allowlist.
3. When `TRUST_PROXY_HEADERS=true` (default) and one of the proxy headers
is present, the first IP in `X-Forwarded-For` (or `X-Real-IP`) is used.
Without this, behind a Docker port-forward the source IP is always the
gateway, which is useless for ACLs.
4. If `ADMIN_PASSWORD` is set, the request must also present a valid signed
admin cookie or the password in `X-Admin-Password`.
"""
"""Portal-aware access helpers for monitoring-tg."""
from __future__ import annotations
import hashlib
@@ -24,26 +13,6 @@ ADMIN_COOKIE = "parser_admin"
_ADMIN_TOKEN_MESSAGE = b"parser-tg-bot-admin-v1"
def client_ip(request: Request) -> str:
"""Best-effort source IP of the request."""
if settings.trust_proxy_headers:
xff = request.headers.get("x-forwarded-for")
if xff:
# Standard form: "client, proxy1, proxy2" — first is closest to user.
return xff.split(",")[0].strip()
real = request.headers.get("x-real-ip")
if real:
return real.strip()
return request.client.host if request.client else "0.0.0.0"
def is_admin_network_allowed(request: Request) -> bool:
allowed = settings.admin_ip_set
if not allowed:
return True
return client_ip(request) in allowed
def admin_password_enabled() -> bool:
return bool(settings.admin_password)
@@ -88,21 +57,41 @@ def clear_admin_cookie(response: Response) -> None:
def is_admin_request(request: Request) -> bool:
if not is_admin_network_allowed(request):
return False
if not settings.admin_password:
if request.headers.get("x-user-is-admin") == "1":
return True
if not settings.admin_password:
return False
return verify_admin_token(request.cookies.get(ADMIN_COOKIE)) or verify_admin_password(
request.headers.get("x-admin-password")
)
def require_admin_network(request: Request) -> None:
"""FastAPI dependency for the admin login page/API.
"""Compatibility dependency for the local admin login page.
This keeps the IP allowlist useful even before the password cookie exists.
IP allowlists were removed: portal's X-User-Is-Admin header is the
production boundary, and ADMIN_PASSWORD is only a local fallback.
"""
if not is_admin_network_allowed(request):
if is_admin_request(request) or admin_password_enabled():
return
raise HTTPException(status_code=404)
def portal_department_id(request: Request) -> str | None:
value = (request.headers.get("x-user-department-id") or "").strip()
return value or None
def is_department_head_request(request: Request) -> bool:
return request.headers.get("x-user-is-department-head") == "1"
def can_manage_department(request: Request) -> bool:
return is_admin_request(request) or is_department_head_request(request)
def require_department_manager(request: Request) -> None:
if not can_manage_department(request):
raise HTTPException(status_code=404)


@@ -1,6 +1,3 @@
import hashlib
import hmac
import secrets
from datetime import datetime, timedelta, timezone
from typing import Any, Literal
@@ -21,12 +18,13 @@ from parser_bot import llm as llm_client
from parser_bot import prompt_store
from parser_bot.access import (
admin_password_enabled,
can_manage_department,
clear_admin_cookie,
client_ip,
is_admin_network_allowed,
is_admin_request,
portal_department_id,
require_admin,
require_admin_network,
require_department_manager,
set_admin_cookie,
verify_admin_password,
)
@@ -44,7 +42,6 @@ from parser_bot.api.schemas import (
GlobalStats,
MessageOut,
SectionCreate,
SectionLogin,
SectionOut,
SectionUpdate,
SectionWithStats,
@@ -72,13 +69,17 @@ def _verdict_key(vertical: str) -> str:
async def _get_section(
session: AsyncSession, vertical: Vertical, slug: str
session: AsyncSession,
vertical: Vertical,
slug: str,
department_id: str | None = None,
) -> Section:
"""Find a section by (vertical, slug) or 404."""
result = await session.execute(
select(Section).where(Section.vertical == vertical, Section.slug == slug)
)
section = result.scalar_one_or_none()
stmt = select(Section).where(Section.vertical == vertical, Section.slug == slug)
if department_id is not None:
stmt = stmt.where(Section.department_id == department_id)
result = await session.execute(stmt)
section = result.scalars().first()
if section is None:
raise HTTPException(
status_code=404, detail=f"section {vertical}:{slug} not found"
@@ -86,41 +87,24 @@ async def _get_section(
return section
def _section_cookie_name(vertical: str, slug: str) -> str:
return f"parser_section_{vertical}_{slug}"
def _read_department_id(request: Request) -> str | None:
if is_admin_request(request):
return None
dept_id = portal_department_id(request)
if not dept_id:
raise HTTPException(status_code=403, detail="department is required")
return dept_id
def _section_token(section: Section) -> str:
if not section.access_code:
return ""
return hmac.new(
section.access_code.encode("utf-8"),
f"{section.vertical}:{section.slug}".encode("utf-8"),
hashlib.sha256,
).hexdigest()
def _set_section_cookie(response: Response, section: Section) -> None:
response.set_cookie(
_section_cookie_name(section.vertical, section.slug),
_section_token(section),
httponly=True,
samesite="lax",
secure=False,
max_age=60 * 60 * 24 * 30,
)
def _section_is_unlocked(request: Request, section: Section) -> bool:
if not section.access_code:
return True
direct_code = request.headers.get("x-section-code")
if direct_code and secrets.compare_digest(direct_code, section.access_code):
return True
cookie_token = request.cookies.get(
_section_cookie_name(section.vertical, section.slug)
)
return secrets.compare_digest(cookie_token or "", _section_token(section))
def _manage_department_id(request: Request) -> str | None:
if not can_manage_department(request):
raise HTTPException(status_code=403, detail="department manager required")
if is_admin_request(request):
return None
dept_id = portal_department_id(request)
if not dept_id:
raise HTTPException(status_code=403, detail="department is required")
return dept_id
async def _require_scope_access(
@@ -129,14 +113,10 @@ async def _require_scope_access(
vertical: Vertical | None,
section_slug: str | None,
) -> Section | None:
if is_admin_request(request):
return None
if vertical is None or section_slug is None:
raise HTTPException(status_code=401, detail="section code required")
section = await _get_section(session, vertical, section_slug)
if not _section_is_unlocked(request, section):
raise HTTPException(status_code=401, detail="section code required")
return section
department_id = _read_department_id(request)
if vertical is not None and section_slug is not None:
return await _get_section(session, vertical, section_slug, department_id)
return None
async def _get_channel_in_scope(
@@ -144,6 +124,7 @@ async def _get_channel_in_scope(
channel_id: int,
vertical: Vertical | None,
section_slug: str | None,
department_id: str | None = None,
) -> tuple[Channel, str]:
"""Load a channel and its section slug; 404 if any scope constraint fails.
@@ -151,18 +132,20 @@ async def _get_channel_in_scope(
cannot accidentally read or mutate something from another scope.
"""
result = await session.execute(
select(Channel, Section.slug)
select(Channel, Section.slug, Section.department_id)
.join(Section, Section.id == Channel.section_id)
.where(Channel.id == channel_id)
)
row = result.one_or_none()
if row is None:
raise HTTPException(status_code=404)
channel, ch_section_slug = row
channel, ch_section_slug, ch_department_id = row
if vertical is not None and channel.vertical != vertical:
raise HTTPException(status_code=404)
if section_slug is not None and ch_section_slug != section_slug:
raise HTTPException(status_code=404)
if department_id is not None and ch_department_id != department_id:
raise HTTPException(status_code=404)
return channel, ch_section_slug
@@ -182,25 +165,19 @@ def _channel_out(channel: Channel, section_slug: str | None) -> dict[str, Any]:
}
# --- Access (admin allowlist) -------------------------------------------
# --- Access --------------------------------------------------------------
@router.get("/access/me")
async def access_me(request: Request) -> dict[str, Any]:
"""Tell the frontend whether the current client is on the admin allowlist.
Used by JS to show/hide the "Authorization" and "API" nav links. Always
returns 200 (so it's safe to call from every page); the boolean is what
the UI keys off.
"""
admin = is_admin_request(request)
ip_allowed = is_admin_network_allowed(request)
department_id = portal_department_id(request)
can_manage = can_manage_department(request)
return {
"is_admin": admin,
"can_manage_department": can_manage,
"department_id": department_id,
"admin_password_enabled": admin_password_enabled(),
"admin_ip_allowed": ip_allowed,
"ip": client_ip(request) if admin else None,
"restricted": bool(settings.admin_ip_set),
}
@@ -221,24 +198,8 @@ async def admin_logout(response: Response) -> None:
clear_admin_cookie(response)
@router.post("/access/section-login", status_code=204)
async def section_login(
payload: SectionLogin,
response: Response,
session: AsyncSession = Depends(get_session),
) -> None:
section = await _get_section(session, payload.vertical, payload.section)
if not section.access_code:
_set_section_cookie(response, section)
return
if not secrets.compare_digest(payload.code, section.access_code):
raise HTTPException(status_code=401, detail="invalid section code")
_set_section_cookie(response, section)
# --- Auth (admin-only) --------------------------------------------------
# Telegram session controls are an admin surface — gate with the same
# IP allowlist so an unauth visitor can't even probe the login state.
# Telegram session controls are an admin surface.
@router.get("/auth/status", response_model=AuthStatus, dependencies=[Depends(require_admin)])
@@ -305,6 +266,7 @@ async def list_sections(
Used by the section-chooser page. Counts are computed in a single query
via LEFT JOINs so empty sections still appear.
"""
department_id = _read_department_id(request)
# Per-section channel counts.
ch_total_sub = (
select(Channel.section_id, func.count(Channel.id).label("ct"))
@@ -347,6 +309,11 @@ async def list_sections(
func.coalesce(leads_sub.c.lt, 0),
)
.where(Section.vertical == vertical)
.where(
Section.department_id == department_id
if department_id is not None
else sa.true()
)
.outerjoin(ch_total_sub, ch_total_sub.c.section_id == Section.id)
.outerjoin(ch_active_sub, ch_active_sub.c.section_id == Section.id)
.outerjoin(msg_total_sub, msg_total_sub.c.section_id == Section.id)
@@ -355,16 +322,15 @@ async def list_sections(
)
).all()
can_view_codes = is_admin_request(request)
return [
SectionWithStats(
id=s.id,
vertical=s.vertical,
department_id=s.department_id,
slug=s.slug,
title=s.title,
emoji=s.emoji,
description=s.description,
access_code=s.access_code if can_view_codes else None,
created_at=s.created_at,
channels_total=ct,
channels_active=ca,
@@ -377,12 +343,17 @@ async def list_sections(
@router.post("/sections", response_model=SectionOut, status_code=201)
async def create_section(
payload: SectionCreate, session: AsyncSession = Depends(get_session)
payload: SectionCreate,
request: Request,
session: AsyncSession = Depends(get_session),
) -> Section:
department_id = _manage_department_id(request)
# Reject duplicates with a friendly message instead of a constraint error.
existing = await session.execute(
select(Section).where(
Section.vertical == payload.vertical, Section.slug == payload.slug
Section.vertical == payload.vertical,
Section.department_id == department_id,
Section.slug == payload.slug,
)
)
if existing.scalar_one_or_none() is not None:
@@ -390,7 +361,7 @@ async def create_section(
status_code=409,
detail=f"section {payload.vertical}:{payload.slug} already exists",
)
section = Section(**payload.model_dump())
section = Section(**payload.model_dump(), department_id=department_id)
session.add(section)
await session.commit()
await session.refresh(section)
@@ -404,15 +375,16 @@ async def get_section(
request: Request,
session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
section = await _get_section(session, vertical, slug)
department_id = _read_department_id(request)
section = await _get_section(session, vertical, slug, department_id)
return {
"id": section.id,
"vertical": section.vertical,
"department_id": section.department_id,
"slug": section.slug,
"title": section.title,
"emoji": section.emoji,
"description": section.description,
"access_code": section.access_code if is_admin_request(request) else None,
"created_at": section.created_at,
}
@@ -420,15 +392,17 @@ async def get_section(
@router.patch(
"/sections/{vertical}/{slug}",
response_model=SectionOut,
dependencies=[Depends(require_admin)],
dependencies=[Depends(require_department_manager)],
)
async def update_section(
vertical: Vertical,
slug: str,
payload: SectionUpdate,
request: Request,
session: AsyncSession = Depends(get_session),
) -> Section:
section = await _get_section(session, vertical, slug)
department_id = _manage_department_id(request)
section = await _get_section(session, vertical, slug, department_id)
data = payload.model_dump(exclude_unset=True)
for k, v in data.items():
setattr(section, k, v)
@@ -440,12 +414,16 @@ async def update_section(
@router.delete(
"/sections/{vertical}/{slug}",
status_code=204,
dependencies=[Depends(require_admin)],
dependencies=[Depends(require_department_manager)],
)
async def delete_section(
vertical: Vertical, slug: str, session: AsyncSession = Depends(get_session)
vertical: Vertical,
slug: str,
request: Request,
session: AsyncSession = Depends(get_session),
) -> None:
section = await _get_section(session, vertical, slug)
department_id = _manage_department_id(request)
section = await _get_section(session, vertical, slug, department_id)
# Block deletion when channels are still attached — keeps data referenceable.
count = (
await session.execute(
@@ -458,7 +436,7 @@ async def delete_section(
detail=f"section has {count} channels — move or delete them first",
)
# Drop the per-section LLM prompt too if any.
await prompt_store.reset(vertical, slug)
await prompt_store.reset(vertical, section.department_id, slug)
await session.delete(section)
await session.commit()
@@ -473,6 +451,7 @@ async def list_channels(
section: str | None = Query(None, description="optional section slug filter"),
session: AsyncSession = Depends(get_session),
) -> list[dict[str, Any]]:
department_id = _read_department_id(request)
await _require_scope_access(request, session, vertical, section)
stmt = (
select(Channel, Section.slug)
@@ -480,6 +459,8 @@ async def list_channels(
.where(Channel.vertical == vertical)
.order_by(Channel.id)
)
if department_id is not None:
stmt = stmt.where(Section.department_id == department_id)
if section is not None:
stmt = stmt.where(Section.slug == section)
rows = (await session.execute(stmt)).all()
@@ -496,14 +477,14 @@ async def add_channel(
request: Request,
session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
await _require_scope_access(request, session, payload.vertical, payload.section)
department_id = _manage_department_id(request)
existing = await session.execute(
select(Channel).where(Channel.identifier == payload.identifier)
)
if existing.scalar_one_or_none() is not None:
raise HTTPException(status_code=409, detail="channel already exists")
section = await _get_section(session, payload.vertical, payload.section)
section = await _get_section(session, payload.vertical, payload.section, department_id)
if not await tg.is_authorized():
raise HTTPException(status_code=401, detail="not authorized: log in at /auth.html")
@@ -533,9 +514,10 @@ async def get_channel(
section: str | None = Query(None, description="scope: 404 if mismatched"),
session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
department_id = _read_department_id(request)
await _require_scope_access(request, session, vertical, section)
channel, section_slug = await _get_channel_in_scope(
session, channel_id, vertical, section
session, channel_id, vertical, section, department_id
)
return _channel_out(channel, section_slug)
@@ -543,22 +525,24 @@ async def get_channel(
@router.patch(
"/channels/{channel_id}",
response_model=ChannelOut,
dependencies=[Depends(require_admin)],
dependencies=[Depends(require_department_manager)],
)
async def update_channel(
channel_id: int,
payload: ChannelUpdate,
request: Request,
vertical: Vertical | None = Query(None, description="scope: 404 if mismatched"),
section: str | None = Query(None, description="scope: 404 if mismatched"),
session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
channel, _ = await _get_channel_in_scope(session, channel_id, vertical, section)
department_id = _manage_department_id(request)
channel, _ = await _get_channel_in_scope(session, channel_id, vertical, section, department_id)
if payload.is_active is not None:
channel.is_active = payload.is_active
if payload.vertical is not None:
channel.vertical = payload.vertical
if payload.section is not None:
new_section = await _get_section(session, channel.vertical, payload.section)
new_section = await _get_section(session, channel.vertical, payload.section, department_id)
channel.section_id = new_section.id
await session.commit()
await session.refresh(channel)
@@ -570,43 +554,49 @@ async def update_channel(
@router.delete(
"/channels/{channel_id}",
status_code=204,
dependencies=[Depends(require_admin)],
dependencies=[Depends(require_department_manager)],
)
async def delete_channel(
channel_id: int,
request: Request,
vertical: Vertical | None = Query(None, description="scope: 404 if mismatched"),
section: str | None = Query(None, description="scope: 404 if mismatched"),
session: AsyncSession = Depends(get_session),
) -> None:
channel, _ = await _get_channel_in_scope(session, channel_id, vertical, section)
department_id = _manage_department_id(request)
channel, _ = await _get_channel_in_scope(session, channel_id, vertical, section, department_id)
await session.delete(channel)
await session.commit()
@router.post("/channels/{channel_id}/poll", dependencies=[Depends(require_admin)])
@router.post("/channels/{channel_id}/poll", dependencies=[Depends(require_department_manager)])
async def trigger_poll(
channel_id: int,
request: Request,
vertical: Vertical | None = Query(None, description="scope: 404 if mismatched"),
section: str | None = Query(None, description="scope: 404 if mismatched"),
session: AsyncSession = Depends(get_session),
) -> dict[str, int]:
await _get_channel_in_scope(session, channel_id, vertical, section)
department_id = _manage_department_id(request)
await _get_channel_in_scope(session, channel_id, vertical, section, department_id)
inserted = await poll_channel(channel_id)
return {"inserted": inserted}
@router.post(
"/channels/{channel_id}/backfill-media",
dependencies=[Depends(require_admin)],
dependencies=[Depends(require_department_manager)],
)
async def trigger_backfill_media(
channel_id: int,
request: Request,
batch: int = Query(50, ge=1, le=200),
vertical: Vertical | None = Query(None, description="scope: 404 if mismatched"),
section: str | None = Query(None, description="scope: 404 if mismatched"),
session: AsyncSession = Depends(get_session),
) -> dict[str, int]:
await _get_channel_in_scope(session, channel_id, vertical, section)
department_id = _manage_department_id(request)
await _get_channel_in_scope(session, channel_id, vertical, section, department_id)
try:
return await backfill_media(channel_id, batch_size=batch)
except RuntimeError as exc:
@@ -615,16 +605,18 @@ async def trigger_backfill_media(
@router.post(
"/channels/{channel_id}/reanalyze",
dependencies=[Depends(require_admin)],
dependencies=[Depends(require_department_manager)],
)
async def trigger_reanalyze(
channel_id: int,
request: Request,
batch: int = Query(500, ge=1, le=2000),
vertical: Vertical | None = Query(None, description="scope: 404 if mismatched"),
section: str | None = Query(None, description="scope: 404 if mismatched"),
session: AsyncSession = Depends(get_session),
) -> dict[str, int]:
await _get_channel_in_scope(session, channel_id, vertical, section)
department_id = _manage_department_id(request)
await _get_channel_in_scope(session, channel_id, vertical, section, department_id)
return await reanalyze_channel(channel_id, batch_size=batch)
@@ -636,9 +628,10 @@ async def channel_stats(
section: str | None = Query(None, description="scope: 404 if mismatched"),
session: AsyncSession = Depends(get_session),
) -> ChannelStats:
department_id = _read_department_id(request)
await _require_scope_access(request, session, vertical, section)
channel, section_slug = await _get_channel_in_scope(
session, channel_id, vertical, section
session, channel_id, vertical, section, department_id
)
counts = await session.execute(
select(func.count(Message.id), func.max(Message.date)).where(
@@ -689,6 +682,7 @@ async def list_messages(
Every query joins through channels (and optionally sections) so a
cross-vertical / cross-section read is impossible at the API boundary.
"""
department_id = _read_department_id(request)
await _require_scope_access(request, session, vertical, section)
# Step 1: pick the group keys for this page, ordered by latest activity.
@@ -698,16 +692,17 @@ async def list_messages(
g_stmt = (
select(group_key, func.max(Message.date).label("group_date"))
.join(Channel, Channel.id == Message.channel_id)
.join(Section, Section.id == Channel.section_id)
.where(Channel.vertical == vertical)
.group_by(group_key)
.order_by(func.max(Message.date).desc())
.limit(limit)
.offset(offset)
)
if department_id is not None:
g_stmt = g_stmt.where(Section.department_id == department_id)
if section is not None:
g_stmt = g_stmt.join(Section, Section.id == Channel.section_id).where(
Section.slug == section
)
g_stmt = g_stmt.where(Section.slug == section)
if channel_id is not None:
g_stmt = g_stmt.where(Message.channel_id == channel_id)
if q:
@@ -748,13 +743,14 @@ async def list_messages(
rows_stmt = (
select(Message)
.join(Channel, Channel.id == Message.channel_id)
.join(Section, Section.id == Channel.section_id)
.where(group_key.in_(page_keys), Channel.vertical == vertical)
.order_by(Message.tg_message_id.asc())
)
if department_id is not None:
rows_stmt = rows_stmt.where(Section.department_id == department_id)
if section is not None:
rows_stmt = rows_stmt.join(Section, Section.id == Channel.section_id).where(
Section.slug == section
)
rows_stmt = rows_stmt.where(Section.slug == section)
if channel_id is not None:
rows_stmt = rows_stmt.where(Message.channel_id == channel_id)
rows = list((await session.execute(rows_stmt)).scalars().all())
@@ -832,12 +828,15 @@ async def get_message(
section: str | None = Query(None, description="scope: 404 if mismatched"),
session: AsyncSession = Depends(get_session),
) -> Message:
department_id = _read_department_id(request)
await _require_scope_access(request, session, vertical, section)
msg = await session.get(Message, message_id)
if msg is None:
raise HTTPException(status_code=404)
if vertical is not None or section is not None:
await _get_channel_in_scope(session, msg.channel_id, vertical, section)
await _get_channel_in_scope(session, msg.channel_id, vertical, section, department_id)
elif department_id is not None:
await _get_channel_in_scope(session, msg.channel_id, None, None, department_id)
return msg
@@ -851,11 +850,15 @@ async def global_stats(
section: str | None = Query(None, description="optional section slug"),
session: AsyncSession = Depends(get_session),
) -> GlobalStats:
department_id = _read_department_id(request)
await _require_scope_access(request, session, vertical, section)
base_channel_where = [Channel.vertical == vertical]
section_join_needed = section is not None
section_join_needed = section is not None or department_id is not None
if section_join_needed:
base_channel_where.append(Section.slug == section)
if section is not None:
base_channel_where.append(Section.slug == section)
if department_id is not None:
base_channel_where.append(Section.department_id == department_id)
def _channel_query(*extra):
stmt = select(func.count(Channel.id)).where(*base_channel_where, *extra)
@@ -918,7 +921,7 @@ async def global_stats(
@router.get("/llm/status")
async def llm_status() -> dict[str, Any]:
"""Whether the local LLM (Ollama) is reachable and the configured model is loaded."""
"""Whether the OpenAI-compatible LLM endpoint is reachable."""
ready = await llm_client.is_ready()
return {
"enabled": settings.llm_enabled,
@@ -936,8 +939,9 @@ async def llm_queue(
session: AsyncSession = Depends(get_session),
) -> dict[str, int]:
"""Pending classifications restricted to the vertical (+ section if given)."""
department_id = _read_department_id(request)
await _require_scope_access(request, session, vertical, section)
return {"pending": await pending_llm_count(vertical, section)}
return {"pending": await pending_llm_count(vertical, section, department_id)}
@router.get("/llm/prompt")
@@ -954,13 +958,15 @@ async def llm_prompt_get(
`source` is one of `section` / `vertical` / `default` and tells the UI
whether the override is at the requested level or just inherited.
"""
department_id = _read_department_id(request)
await _require_scope_access(request, session, vertical, section)
default = llm_client.default_prompt(vertical)
text, source = await prompt_store.get(vertical, section, default)
text, source = await prompt_store.get(vertical, department_id, section, default)
# Also return whether THIS exact level has its own override (for UI).
overridden_here = await prompt_store.is_overridden(vertical, section)
overridden_here = await prompt_store.is_overridden(vertical, department_id, section)
return {
"vertical": vertical,
"department_id": department_id,
"section": section,
"prompt": text,
"default": default,
@@ -969,14 +975,16 @@ async def llm_prompt_get(
}
@router.put("/llm/prompt", dependencies=[Depends(require_admin)])
@router.put("/llm/prompt", dependencies=[Depends(require_department_manager)])
async def llm_prompt_put(
payload: dict,
request: Request,
vertical: Vertical = Query(..., description="required: real_estate | hr"),
section: str | None = Query(
None, description="optional section slug — save at section level"
),
) -> dict[str, Any]:
department_id = _manage_department_id(request)
text = payload.get("prompt")
if not isinstance(text, str) or not text.strip():
raise HTTPException(status_code=400, detail="prompt must be a non-empty string")
@@ -985,20 +993,28 @@ async def llm_prompt_put(
status_code=400, detail="prompt is too long (max 30000 chars)"
)
try:
await prompt_store.set_prompt(vertical, section, text)
await prompt_store.set_prompt(vertical, department_id, section, text)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
return {"saved": True, "vertical": vertical, "section": section, "length": len(text)}
return {
"saved": True,
"vertical": vertical,
"department_id": department_id,
"section": section,
"length": len(text),
}
@router.delete("/llm/prompt", status_code=204, dependencies=[Depends(require_admin)])
@router.delete("/llm/prompt", status_code=204, dependencies=[Depends(require_department_manager)])
async def llm_prompt_reset(
request: Request,
vertical: Vertical = Query(..., description="required: real_estate | hr"),
section: str | None = Query(
None, description="optional section slug — reset that level"
),
) -> None:
await prompt_store.reset(vertical, section)
department_id = _manage_department_id(request)
await prompt_store.reset(vertical, department_id, section)
# --- Settings & batch poll ---------------------------------------------
@@ -1027,21 +1043,24 @@ async def _poll_all_in_background(channel_ids: list[int]) -> None:
continue
@router.post("/poll", dependencies=[Depends(require_admin)])
@router.post("/poll", dependencies=[Depends(require_department_manager)])
async def trigger_poll_all(
background: BackgroundTasks,
request: Request,
vertical: Vertical = Query(..., description="required: real_estate | hr"),
section: str | None = Query(None, description="optional section slug"),
session: AsyncSession = Depends(get_session),
) -> dict[str, int]:
"""Queue a poll of every active channel in this vertical (+ section if given)."""
department_id = _manage_department_id(request)
stmt = select(Channel.id).where(
Channel.is_active.is_(True), Channel.vertical == vertical
)
stmt = stmt.join(Section, Section.id == Channel.section_id)
if department_id is not None:
stmt = stmt.where(Section.department_id == department_id)
if section is not None:
stmt = stmt.join(Section, Section.id == Channel.section_id).where(
Section.slug == section
)
stmt = stmt.where(Section.slug == section)
result = await session.execute(stmt)
ids = [row[0] for row in result.all()]
background.add_task(_poll_all_in_background, ids)
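
The scoping pattern these handlers share (always join `sections`, then add the `department_id` and `slug` filters only when a value is present) can be sketched with the standard-library `sqlite3` module. This is a minimal illustration rather than the service's code: the table layout, the `active_channel_ids` and `demo_connection` helpers, and the sample rows are assumptions, and the real handlers build the query with SQLAlchemy against Postgres.

```python
import sqlite3


def active_channel_ids(conn, vertical, department_id=None, section=None):
    """Return ids of active channels in scope (hypothetical helper)."""
    sql = (
        "SELECT c.id FROM channels c "
        "JOIN sections s ON s.id = c.section_id "
        "WHERE c.is_active = 1 AND c.vertical = ?"
    )
    params = [vertical]
    # Optional filters are appended only when the caller supplied them,
    # mirroring the `if department_id is not None` branches in the diff.
    if department_id is not None:
        sql += " AND s.department_id = ?"
        params.append(department_id)
    if section is not None:
        sql += " AND s.slug = ?"
        params.append(section)
    sql += " ORDER BY c.id"
    return [row[0] for row in conn.execute(sql, params)]


def demo_connection():
    """Build an in-memory database with made-up sample rows."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE sections (id INTEGER PRIMARY KEY, vertical TEXT,
                               department_id TEXT, slug TEXT);
        CREATE TABLE channels (id INTEGER PRIMARY KEY, vertical TEXT,
                               section_id INTEGER, is_active INTEGER);
        INSERT INTO sections VALUES (1, 'hr', 'sales', 'it'),
                                    (2, 'hr', 'ops', 'it');
        INSERT INTO channels VALUES (10, 'hr', 1, 1), (11, 'hr', 2, 1),
                                    (12, 'hr', 1, 0);
        """
    )
    return conn
```

Because the join is unconditional, the two optional filters can be applied in any order without risking a duplicate join, which appears to be the reason the diff moves the `Section` join out of the `if section is not None` branch.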


@@ -16,7 +16,6 @@ class SectionCreate(BaseModel):
title: str = Field(..., min_length=1, max_length=255)
emoji: str | None = Field(None, max_length=8)
description: str | None = None
access_code: str = Field(..., min_length=3, max_length=255)
@field_validator("slug")
@classmethod
@@ -32,7 +31,6 @@ class SectionUpdate(BaseModel):
title: str | None = Field(None, min_length=1, max_length=255)
emoji: str | None = Field(None, max_length=8)
description: str | None = None
access_code: str | None = Field(None, min_length=3, max_length=255)
class SectionOut(BaseModel):
@@ -40,11 +38,11 @@ class SectionOut(BaseModel):
id: int
vertical: Vertical
department_id: str | None = None
slug: str
title: str
emoji: str | None
description: str | None
access_code: str | None = None
created_at: datetime
@@ -223,9 +221,3 @@ class AuthCodeResult(BaseModel):
class AdminLogin(BaseModel):
password: str = Field(..., min_length=1)
class SectionLogin(BaseModel):
vertical: Vertical
section: str = Field(..., min_length=1, max_length=64)
code: str = Field(..., min_length=1, max_length=255)


@@ -3,7 +3,7 @@ TG_SESSION_STRING into your .env / k8s Secret, then deploy without ever
touching auth again.
Usage:
docker compose run --rm -it app python -m parser_bot.auth
python -m parser_bot.auth
Telegram requires interactive code entry only for the very first login;
the resulting StringSession can be reused on any host until you log out
@@ -22,7 +22,7 @@ async def main() -> int:
if not sys.stdin.isatty():
print(
"ERROR: not a TTY. Re-run with: "
"docker compose run --rm -it app python -m parser_bot.auth",
"python -m parser_bot.auth",
file=sys.stderr,
)
return 2


@@ -29,29 +29,20 @@ class Settings(BaseSettings):
media_dir: str = Field("/data/media", alias="MEDIA_DIR")
media_max_bytes: int = Field(20 * 1024 * 1024, alias="MEDIA_MAX_BYTES")
# Local LLM via Ollama for lead classification & extraction
# OpenAI-compatible LLM endpoint, shared with telephony/vLLM in production.
llm_enabled: bool = Field(True, alias="LLM_ENABLED")
llm_base_url: str = Field("http://ollama:11434", alias="LLM_BASE_URL")
llm_model: str = Field("qwen2.5:7b-instruct-q4_K_M", alias="LLM_MODEL")
llm_base_url: str = Field("http://10.2.3.5:8002", alias="LLM_BASE_URL")
llm_api_key: str = Field("", alias="LLM_API_KEY")
llm_model: str = Field("qwen2.5-14b", alias="LLM_MODEL")
llm_timeout_seconds: int = Field(120, alias="LLM_TIMEOUT_SECONDS")
llm_max_tokens: int = Field(600, alias="LLM_MAX_TOKENS")
llm_min_text_length: int = Field(20, alias="LLM_MIN_TEXT_LENGTH")
llm_classify_interval_seconds: int = Field(20, alias="LLM_CLASSIFY_INTERVAL_SECONDS")
llm_classify_batch_size: int = Field(5, alias="LLM_CLASSIFY_BATCH_SIZE")
# Admin allowlist for /auth.html, /docs, /openapi.json, /redoc and the
# /auth/* API endpoints. Comma-separated IPv4/IPv6. Empty (default) means
# no restriction — convenient for local dev. Set explicitly in prod.
admin_allowed_ips: str = Field("", alias="ADMIN_ALLOWED_IPS")
# Optional second factor for admin-only UI/API operations. Empty keeps the
# previous IP-only behavior for local/dev deployments.
# Optional local fallback for admin-only UI/API operations. In production
# the portal sets X-User-Is-Admin=1, so no local password is required.
admin_password: str = Field("", alias="ADMIN_PASSWORD")
# When true, honor X-Forwarded-For / X-Real-IP set by a reverse proxy
# in front of uvicorn (Docker port-forward, nginx, traefik, etc).
trust_proxy_headers: bool = Field(True, alias="TRUST_PROXY_HEADERS")
@property
def admin_ip_set(self) -> set[str]:
return {s.strip() for s in self.admin_allowed_ips.split(",") if s.strip()}
@property
def database_url(self) -> str:


@@ -29,17 +29,22 @@ class Section(Base):
__tablename__ = "sections"
__table_args__ = (
UniqueConstraint("vertical", "slug", name="uq_section_vertical_slug"),
Index("ix_sections_vertical", "vertical"),
UniqueConstraint(
"vertical",
"department_id",
"slug",
name="uq_section_vertical_department_slug",
),
Index("ix_sections_vertical_department", "vertical", "department_id"),
)
id: Mapped[int] = mapped_column(primary_key=True)
vertical: Mapped[str] = mapped_column(String(32))
department_id: Mapped[str | None] = mapped_column(String(64), nullable=True)
slug: Mapped[str] = mapped_column(String(64))
title: Mapped[str] = mapped_column(String(255))
emoji: Mapped[str | None] = mapped_column(String(8), nullable=True)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
access_code: Mapped[str | None] = mapped_column(String(255), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
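
One consequence of making `department_id` nullable inside the new unique constraint is worth illustrating: SQL treats NULLs as distinct in unique constraints, so two sections with the same `vertical` and `slug` but no department do not conflict. The sketch below shows this with the standard-library `sqlite3` module as a stand-in; Postgres behaves the same way by default. The helper names are hypothetical, and whether the application guards against such duplicates at another layer is not visible in this diff.

```python
import sqlite3


def make_sections_table():
    """Create a reduced `sections` table with the three-column unique constraint."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE sections ("
        " id INTEGER PRIMARY KEY,"
        " vertical TEXT NOT NULL,"
        " department_id TEXT,"  # nullable, as in the model above
        " slug TEXT NOT NULL,"
        " UNIQUE (vertical, department_id, slug))"
    )
    return conn


def try_insert(conn, vertical, department_id, slug):
    """Insert one row; return False if the unique constraint rejects it."""
    try:
        conn.execute(
            "INSERT INTO sections (vertical, department_id, slug) VALUES (?, ?, ?)",
            (vertical, department_id, slug),
        )
        return True
    except sqlite3.IntegrityError:
        return False
```

With a concrete department the second identical insert is rejected, while repeated inserts with `department_id=None` all succeed.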
@@ -112,7 +117,7 @@ class AppSetting(Base):
__tablename__ = "app_settings"
key: Mapped[str] = mapped_column(String(128), primary_key=True)
key: Mapped[str] = mapped_column(String(255), primary_key=True)
value: Mapped[dict | str | int | bool | None] = mapped_column(JSONB, nullable=False)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()


@@ -315,20 +315,21 @@ def analyze(text: str | None) -> dict[str, Any]:
async def analyze_with_llm(
text: str | None,
vertical: str = "real_estate",
department_id: str | None = None,
section_slug: str | None = None,
) -> dict[str, Any]:
"""Regex extraction + LLM lead classification, routed by vertical.
`section_slug` lets the classifier pick a section-specific system prompt
(e.g. Dubai-focused for `real_estate:dubai`) with fallback to the
vertical-default prompt. The LLM verdict goes under `lead` for RE and
under `hr_lead` for HR. Falls back to regex-only if Ollama is unavailable.
`department_id` + `section_slug` let the classifier pick a department- and
section-specific system prompt, falling back to the department's
vertical-level prompt and then to the built-in default. The LLM verdict goes
under `lead` for RE and under `hr_lead` for HR. Falls back to regex-only if
the LLM is unavailable.
"""
base = analyze(text)
# Lazy import to avoid hard dep on httpx in environments where LLM is off.
from parser_bot.llm import classify
verdict = await classify(text, vertical, section_slug) # type: ignore[arg-type]
verdict = await classify(text, vertical, department_id, section_slug) # type: ignore[arg-type]
if verdict is not None:
base["hr_lead" if vertical == "hr" else "lead"] = verdict
return base


@@ -1,4 +1,4 @@
"""Local LLM (Ollama) client for lead classification & extraction.
"""OpenAI-compatible LLM client for lead classification & extraction.
Two verticals share one model and one process:
- real_estate: high recall on listings (sale/rent/purchase),
@@ -7,9 +7,6 @@ Two verticals share one model and one process:
The system prompt and JSON schema differ per vertical; the rest of the
plumbing (timeouts, single-lock concurrency, JSON-mode parsing) is shared.
On any error returns `None` and the caller falls back to regex-only extraction.
The model runs on CPU via Ollama (Qwen2.5 7B Q4_K_M). Each call ~36s on
i5-12400. Concurrency is 1 (Ollama already saturates CPU per call).
"""
from __future__ import annotations
@@ -271,17 +268,38 @@ def _as_float(v: Any) -> float | None:
async def is_ready() -> bool:
"""Check that Ollama is up and the configured model is pulled."""
"""Check that the OpenAI-compatible model endpoint is reachable."""
try:
async with httpx.AsyncClient(timeout=5) as client:
r = await client.get(f"{settings.llm_base_url}/api/tags")
headers = _headers()
r = await client.get(f"{_base_url()}/v1/models", headers=headers)
r.raise_for_status()
tags = {m.get("name") for m in r.json().get("models", [])}
return any(t.startswith(settings.llm_model.split(":")[0]) for t in tags)
models = r.json().get("data", [])
if not models:
return True
model_ids = {
str(m.get("id") or m.get("name") or "")
for m in models
if isinstance(m, dict)
}
return settings.llm_model in model_ids or any(
mid.startswith(settings.llm_model) for mid in model_ids
)
except Exception:
return False
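
The readiness rule in the new `is_ready` can be isolated as a pure function, which makes its edge cases easier to see. This is a sketch under the assumption that the endpoint returns the usual `{"data": [{"id": ...}]}` shape for `/v1/models`; `model_is_listed` is a hypothetical name, not a function from the repository.

```python
def model_is_listed(models_response, configured_model):
    """Decide readiness from a parsed /v1/models body (hypothetical helper)."""
    models = models_response.get("data", [])
    if not models:
        # An empty listing is treated as "reachable", as in the diff.
        return True
    model_ids = {
        str(m.get("id") or m.get("name") or "")
        for m in models
        if isinstance(m, dict)
    }
    # Exact match first, then a prefix match for suffixed model ids.
    return configured_model in model_ids or any(
        mid.startswith(configured_model) for mid in model_ids
    )
```

Note that the prefix match is permissive: a configured name such as `qwen2.5-14b` also accepts a served id like `qwen2.5-14b-instruct`.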
def _base_url() -> str:
return settings.llm_base_url.rstrip("/")
def _headers() -> dict[str, str]:
headers = {"Content-Type": "application/json"}
if settings.llm_api_key:
headers["Authorization"] = f"Bearer {settings.llm_api_key}"
return headers
def default_prompt(vertical: Vertical) -> str:
return DEFAULT_HR_SYSTEM_PROMPT if vertical == "hr" else DEFAULT_RE_SYSTEM_PROMPT
@@ -289,6 +307,7 @@ def default_prompt(vertical: Vertical) -> str:
async def classify(
text: str | None,
vertical: Vertical = "real_estate",
department_id: str | None = None,
section_slug: str | None = None,
) -> dict | None:
"""Classify a message text under the given vertical/section.
@@ -306,25 +325,28 @@ async def classify(
# Lazy import to avoid a circular import: prompt_store -> db.session -> config.
from parser_bot import prompt_store
system = await prompt_store.resolve(vertical, section_slug, default_prompt(vertical))
system = await prompt_store.resolve(
vertical, department_id, section_slug, default_prompt(vertical)
)
payload = {
"model": settings.llm_model,
"prompt": _build_user_prompt(text),
"system": system,
"format": "json",
"stream": False,
"options": {"temperature": 0.1, "num_ctx": 4096, "num_predict": 600},
"messages": [
{"role": "system", "content": system},
{"role": "user", "content": _build_user_prompt(text)},
],
"temperature": 0.1,
"max_tokens": settings.llm_max_tokens,
"response_format": {"type": "json_object"},
}
async with _lock:
try:
async with httpx.AsyncClient(timeout=settings.llm_timeout_seconds) as client:
r = await client.post(
f"{settings.llm_base_url}/api/generate", json=payload
f"{_base_url()}/v1/chat/completions",
headers=_headers(),
json=payload,
)
if r.status_code != 200:
# Surface the actual server message — most useful one is
# `model '...' not found`, which otherwise would just look
# like a generic HTTP error and leave the worker to spin.
log.warning(
"llm_request_failed",
status=r.status_code,
@@ -341,7 +363,9 @@ async def classify(
)
return None
raw = (data.get("response") or "").strip()
choices = data.get("choices") or []
message = choices[0].get("message") if choices and isinstance(choices[0], dict) else None
raw = ((message or {}).get("content") or "").strip()
if not raw:
return None
try:
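
The switch from Ollama's `/api/generate` to `/v1/chat/completions` changes both the request body and where the answer lives in the response. The following standalone sketch shows the two halves without any network call. The function names are hypothetical, the `is_lead` field in the usage is made up, and the JSON parsing step is an assumption about what follows the truncated `try:` in the hunk above.

```python
import json


def build_chat_payload(model, system_prompt, user_prompt, max_tokens=600):
    """Build an OpenAI-style chat completion request body."""
    return {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "temperature": 0.1,
        "max_tokens": max_tokens,
        "response_format": {"type": "json_object"},
    }


def extract_verdict(response_body):
    """Pull the JSON object out of choices[0].message.content, or return None."""
    choices = response_body.get("choices") or []
    first = choices[0] if choices and isinstance(choices[0], dict) else {}
    raw = ((first.get("message") or {}).get("content") or "").strip()
    if not raw:
        return None
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        # Assumed behaviour: unparseable output is treated like no verdict.
        return None
    return parsed if isinstance(parsed, dict) else None
```

For example, `extract_verdict({"choices": [{"message": {"content": "{\"is_lead\": true}"}}]})` yields a dict, while an empty `choices` list yields `None`, which lets the caller fall back to regex-only extraction.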


@@ -1,8 +1,8 @@
"""Runtime-editable LLM system prompts, persisted in app_settings.
Three resolution levels with fallback (more specific → less specific):
1. `llm_system_prompt:<vertical>:<section_slug>` — section override
2. `llm_system_prompt:<vertical>` — vertical override
1. `llm_system_prompt:<department_id>:<vertical>:<section_slug>` — section override
2. `llm_system_prompt:<department_id>:<vertical>` — department vertical override
3. built-in DEFAULT_RE_SYSTEM_PROMPT / DEFAULT_HR_SYSTEM_PROMPT
The prompt is read on every classification call but cached for a short
@@ -27,10 +27,15 @@ _CACHE_TTL_S = 5.0
_cache: dict[str, tuple[float, str | None]] = {}
def _key(vertical: Vertical, section_slug: str | None = None) -> str:
def _key(
vertical: Vertical,
department_id: str | None,
section_slug: str | None = None,
) -> str:
dept = department_id or "global"
if section_slug:
return f"{_KEY_PREFIX}{vertical}:{section_slug}"
return f"{_KEY_PREFIX}{vertical}"
return f"{_KEY_PREFIX}{dept}:{vertical}:{section_slug}"
return f"{_KEY_PREFIX}{dept}:{vertical}"
async def _load(key: str) -> str | None:
@@ -52,7 +57,7 @@ async def _load(key: str) -> str | None:
async def resolve(
vertical: Vertical, section_slug: str | None, default: str
vertical: Vertical, department_id: str | None, section_slug: str | None, default: str
) -> str:
"""Pick the most specific prompt available, falling back to `default`.
@@ -60,39 +65,39 @@ async def resolve(
the classifier uses for every message.
"""
if section_slug:
text = await _load(_key(vertical, section_slug))
text = await _load(_key(vertical, department_id, section_slug))
if text is not None:
return text
text = await _load(_key(vertical))
text = await _load(_key(vertical, department_id))
if text is not None:
return text
return default
async def get(
vertical: Vertical, section_slug: str | None, default: str
vertical: Vertical, department_id: str | None, section_slug: str | None, default: str
) -> tuple[str, str]:
"""For the settings UI: return (text, source) where source is one of
'section' | 'vertical' | 'default'. Lets the editor show which override
is currently active without a second round-trip.
"""
if section_slug:
text = await _load(_key(vertical, section_slug))
text = await _load(_key(vertical, department_id, section_slug))
if text is not None:
return text, "section"
text = await _load(_key(vertical))
text = await _load(_key(vertical, department_id))
if text is not None:
return text, "vertical"
return default, "default"
async def set_prompt(
vertical: Vertical, section_slug: str | None, text: str
vertical: Vertical, department_id: str | None, section_slug: str | None, text: str
) -> None:
"""Save a new prompt at the given level (section or vertical)."""
if not isinstance(text, str) or not text.strip():
raise ValueError("prompt must be a non-empty string")
key = _key(vertical, section_slug)
key = _key(vertical, department_id, section_slug)
async with session_scope() as session:
stmt = (
pg_insert(AppSetting)
@@ -105,9 +110,11 @@ async def set_prompt(
invalidate(key)
async def reset(vertical: Vertical, section_slug: str | None) -> None:
async def reset(
vertical: Vertical, department_id: str | None, section_slug: str | None
) -> None:
"""Drop the override at the given level."""
key = _key(vertical, section_slug)
key = _key(vertical, department_id, section_slug)
async with session_scope() as session:
await session.execute(
AppSetting.__table__.delete().where(AppSetting.key == key)
@@ -123,8 +130,8 @@ def invalidate(key: str | None = None) -> None:
async def is_overridden(
vertical: Vertical, section_slug: str | None = None
vertical: Vertical, department_id: str | None, section_slug: str | None = None
) -> bool:
"""True iff a custom prompt is stored at this exact level."""
text = await _load(_key(vertical, section_slug))
text = await _load(_key(vertical, department_id, section_slug))
return text is not None
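
The department-aware key scheme and its fallback order can be sketched with a plain dictionary standing in for the `app_settings` table. This is an illustration only: `KEY_PREFIX` is inferred from the key format in the module docstring, and `prompt_key` and `resolve_prompt` are hypothetical names that combine the behaviour of `_key`, `resolve` and `get` without the cache or the database.

```python
KEY_PREFIX = "llm_system_prompt:"  # assumed value of _KEY_PREFIX


def prompt_key(vertical, department_id, section_slug=None):
    """Build the storage key; a missing department maps to 'global'."""
    dept = department_id or "global"
    if section_slug:
        return f"{KEY_PREFIX}{dept}:{vertical}:{section_slug}"
    return f"{KEY_PREFIX}{dept}:{vertical}"


def resolve_prompt(store, vertical, department_id, section_slug, default):
    """Return (text, source), trying the most specific level first."""
    if section_slug:
        text = store.get(prompt_key(vertical, department_id, section_slug))
        if text is not None:
            return text, "section"
    text = store.get(prompt_key(vertical, department_id))
    if text is not None:
        return text, "vertical"
    return default, "default"
```

Because the department is part of every key, an override saved by one department is never picked up by another. One caveat that follows from `department_id or "global"`: a department whose id is literally `global` would share keys with requests that carry no department.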


@@ -191,14 +191,14 @@ async def reanalyze_channel(channel_id: int, batch_size: int = 5) -> dict[str, i
"""
async with session_scope() as session:
result = await session.execute(
select(Channel, Section.slug)
select(Channel, Section.slug, Section.department_id)
.join(Section, Section.id == Channel.section_id)
.where(Channel.id == channel_id)
)
row = result.one_or_none()
if row is None:
return {"updated": 0, "pending": 0}
channel, section_slug = row
channel, section_slug, department_id = row
vertical = channel.vertical
needs_work = _needs_work_clause(vertical)
@@ -230,7 +230,7 @@ async def reanalyze_channel(channel_id: int, batch_size: int = 5) -> dict[str, i
updated = 0
for db_id, text in rows:
extracted = (
await analyze_with_llm(text, vertical, section_slug)
await analyze_with_llm(text, vertical, department_id, section_slug)
if settings.llm_enabled
else analyze(text)
)
@@ -252,7 +252,9 @@ async def reanalyze_channel(channel_id: int, batch_size: int = 5) -> dict[str, i
async def pending_llm_count(
vertical: str | None = None, section_slug: str | None = None
vertical: str | None = None,
section_slug: str | None = None,
department_id: str | None = None,
) -> int:
"""How many text messages still need LLM classification.
@@ -271,10 +273,14 @@ async def pending_llm_count(
stmt = stmt.join(Channel, Channel.id == Message.channel_id).where(
Channel.vertical == vertical
)
if section_slug is not None:
stmt = stmt.join(Section, Section.id == Channel.section_id).where(
Section.slug == section_slug
)
if section_slug is not None or department_id is not None:
if vertical is None:
stmt = stmt.join(Channel, Channel.id == Message.channel_id)
stmt = stmt.join(Section, Section.id == Channel.section_id)
if section_slug is not None:
stmt = stmt.where(Section.slug == section_slug)
if department_id is not None:
stmt = stmt.where(Section.department_id == department_id)
return (await session.execute(stmt)).scalar_one()
@@ -292,7 +298,13 @@ async def classify_pending(batch_size: int = 5) -> int:
async with session_scope() as session:
rows = (
await session.execute(
select(Message.id, Message.text, Channel.vertical, Section.slug)
select(
Message.id,
Message.text,
Channel.vertical,
Section.slug,
Section.department_id,
)
.join(Channel, Channel.id == Message.channel_id)
.join(Section, Section.id == Channel.section_id)
.where(Message.text.is_not(None), needs_work)
@@ -304,7 +316,7 @@ async def classify_pending(batch_size: int = 5) -> int:
return 0
updated = 0
for db_id, text, vertical, section_slug in rows:
for db_id, text, vertical, section_slug, department_id in rows:
# If extracted already has THIS vertical's verdict, skip — needs_work
# uses an OR over both keys and would otherwise re-run RE channels
# that already have a lead just because hr_lead is null.
@@ -314,7 +326,7 @@ async def classify_pending(batch_size: int = 5) -> int:
key = _verdict_key(vertical)
if existing and existing.get(key) is not None:
continue
extracted = await analyze_with_llm(text, vertical, section_slug)
extracted = await analyze_with_llm(text, vertical, department_id, section_slug)
msg = await session.get(Message, db_id)
if msg is None:
continue
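
The skip rule in `classify_pending` (only re-run the LLM when the verdict for this channel's own vertical is missing) can be written as a small predicate. The sketch assumes `_verdict_key` maps `hr` to `hr_lead` and everything else to `lead`, as `analyze_with_llm` does; both function names here are hypothetical.

```python
def verdict_key(vertical):
    """Assumed behaviour of `_verdict_key`, inferred from `analyze_with_llm`."""
    return "hr_lead" if vertical == "hr" else "lead"


def needs_classification(existing, vertical):
    """True when `extracted` has no verdict for this vertical yet."""
    if not existing:
        return True
    return existing.get(verdict_key(vertical)) is None
```

A real-estate message that already carries a `lead` verdict is therefore skipped even though its `hr_lead` key is absent.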


@@ -73,7 +73,7 @@
For k8s deployments it is easier to obtain the opaque session string in advance and store it
in a Secret, so that the pods start without any interactive step:
</p>
<pre>docker compose run --rm -it app python -m parser_bot.auth</pre>
<pre>python -m parser_bot.auth</pre>
<p class="muted">
The script prints <span class="mono">TG_SESSION_STRING=...</span>; paste it
into <span class="mono">.env</span> or a Secret and forget about authorization.


@@ -48,11 +48,6 @@
<span style="min-width:120px" class="muted">Icon</span>
<input type="text" id="new-emoji" maxlength="4" placeholder="💻" style="width:80px" />
</label>
<label class="row" style="gap:8px; margin-bottom:8px">
<span style="min-width:120px" class="muted">Access code</span>
<input type="text" id="new-access-code" required minlength="3"
autocomplete="new-password" style="flex:1" />
</label>
<label class="row" style="gap:8px; margin-bottom:8px; align-items:flex-start">
<span style="min-width:120px" class="muted">Description</span>
<textarea id="new-description" rows="3" style="flex:1"></textarea>
@@ -76,11 +71,6 @@
<span style="min-width:120px" class="muted">Icon</span>
<input type="text" id="edit-emoji" maxlength="4" style="width:80px" />
</label>
<label class="row" style="gap:8px; margin-bottom:8px">
<span style="min-width:120px" class="muted">Access code</span>
<input type="text" id="edit-access-code" required minlength="3"
autocomplete="new-password" style="flex:1" />
</label>
<label class="row" style="gap:8px; margin-bottom:8px; align-items:flex-start">
<span style="min-width:120px" class="muted">Description</span>
<textarea id="edit-description" rows="3" style="flex:1"></textarea>


@@ -22,9 +22,8 @@
</tbody>
</table>
<div class="muted" style="font-size:12px; margin-top:12px">
Parameters are set through environment variables (<span class="mono">.env</span>).
To change them, edit <span class="mono">.env</span> and restart the container:
<span class="mono">docker compose restart app</span>.
Parameters are set through environment variables and k8s manifests.
To change them, update the ConfigMap/Secret and restart the deployment.
</div>
</div>


@@ -1,37 +1,50 @@
// Ask the backend whether this client is on the admin allowlist and hide
// admin-only nav links if not. The backend independently enforces the
// allowlist on every admin endpoint, so this is purely cosmetic — it just
// removes dead controls from the UI for non-admin visitors.
// Ask the backend which portal rights are available. `isAdmin()` is kept as
// a legacy name for "can manage this department" because the UI already uses
// it to show edit buttons.
let _adminPromise = null;
export function isAdmin() {
if (!_adminPromise) {
_adminPromise = fetch("/api/monitoring-tg/api/v1/access/me")
.then(r => r.ok ? r.json() : { is_admin: false })
.then(d => !!d.is_admin)
.catch(() => false);
}
return _adminPromise;
}
let _statusPromise = null;
export function adminStatus() {
return fetch("/api/monitoring-tg/api/v1/access/me")
.then(r => r.ok ? r.json() : { is_admin: false, admin_ip_allowed: false })
.catch(() => ({ is_admin: false, admin_ip_allowed: false }));
if (!_statusPromise) {
_statusPromise = fetch("/api/monitoring-tg/api/v1/access/me")
.then(r => r.ok ? r.json() : {
is_admin: false,
can_manage_department: false,
admin_password_enabled: false,
})
.catch(() => ({
is_admin: false,
can_manage_department: false,
admin_password_enabled: false,
}));
}
return _statusPromise;
}
export function isAdmin() {
return adminStatus().then(d => !!d.can_manage_department);
}
export function isPortalAdmin() {
return adminStatus().then(d => !!d.is_admin);
}
adminStatus().then(status => {
const admin = !!status.is_admin;
const canOpenAdmin = !!status.admin_ip_allowed;
if (admin) return;
const manager = !!status.can_manage_department;
const canOpenAdminLogin = !!status.admin_password_enabled;
// Remove any `.admin-link` from the DOM. Works for both server-rendered
// navs (auth.html, chooser pages) and JS-built navs (nav.js fires before
// its own write, but DOMContentLoaded ordering means the elements appear
// after — handle via a MutationObserver for late insertions).
const hide = () => {
document.querySelectorAll(".admin-link").forEach(el => el.remove());
document.querySelectorAll(".admin-only").forEach(el => el.remove());
if (!canOpenAdmin) {
if (!admin) {
document.querySelectorAll(".admin-link").forEach(el => el.remove());
}
if (!manager) {
document.querySelectorAll(".admin-only").forEach(el => el.remove());
}
if (admin || !canOpenAdminLogin) {
document.querySelectorAll(".admin-login-link").forEach(el => el.remove());
}
};
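
The visibility rules in the new `hide` function reduce to a small decision table. The sketch below restates them in Python purely for illustration (the actual code is the JavaScript above). It assumes the early `if (admin) return;` is among the removed lines, and `classes_to_remove` is a hypothetical name.

```python
def classes_to_remove(status):
    """Return the CSS classes whose elements would be removed from the DOM."""
    admin = bool(status.get("is_admin"))
    manager = bool(status.get("can_manage_department"))
    login_enabled = bool(status.get("admin_password_enabled"))

    removed = set()
    if not admin:
        removed.add("admin-link")  # portal-admin-only navigation
    if not manager:
        removed.add("admin-only")  # department management controls
    if admin or not login_enabled:
        removed.add("admin-login-link")  # local password login is pointless
    return removed
```

Under these rules a department manager who is not a portal admin keeps the `.admin-only` edit controls but loses the `.admin-link` navigation.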


@@ -18,7 +18,7 @@ async function refresh() {
form.hidden = true;
logoutBtn.hidden = false;
} else if (!status.admin_password_enabled) {
statusEl.textContent = "Admin password is not set. Access is controlled by the IP allowlist.";
statusEl.textContent = "Admin password is not set. Access is controlled by the portal.";
form.hidden = true;
logoutBtn.hidden = true;
} else {


@@ -1,32 +1,9 @@
import { getVertical, getSection } from "/api/monitoring-tg/static/js/vertical.js";
const BASE = "/api/monitoring-tg/api/v1";
let sectionLoginPromise = null;
async function unlockCurrentSection() {
if (sectionLoginPromise) return sectionLoginPromise;
sectionLoginPromise = (async () => {
const vertical = getVertical();
const section = getSection();
if (!section) return false;
const code = prompt(`Enter the code for subsection "${section}"`);
if (!code) return false;
await request("/access/section-login", {
method: "POST",
body: JSON.stringify({ vertical, section, code }),
sectionRetry: false,
});
return true;
})();
try {
return await sectionLoginPromise;
} finally {
sectionLoginPromise = null;
}
}
async function request(path, options = {}) {
const { sectionRetry = true, ...fetchOptions } = options;
const fetchOptions = options;
const res = await fetch(BASE + path, {
headers: { "Content-Type": "application/json" },
...fetchOptions,
@@ -34,11 +11,6 @@ async function request(path, options = {}) {
if (!res.ok) {
let detail = res.statusText;
try { detail = (await res.json()).detail || detail; } catch {}
if (res.status === 401 && detail === "section code required" && sectionRetry) {
if (await unlockCurrentSection()) {
return request(path, { ...options, sectionRetry: false });
}
}
throw new Error(`${res.status}: ${detail}`);
}
if (res.status === 204) return null;
@@ -67,16 +39,8 @@ export const api = {
request("/access/admin-login", {
method: "POST",
body: JSON.stringify({ password }),
sectionRetry: false,
}),
adminLogout: () =>
request("/access/admin-logout", { method: "POST", sectionRetry: false }),
sectionLogin: ({ vertical, section, code }) =>
request("/access/section-login", {
method: "POST",
body: JSON.stringify({ vertical, section, code }),
sectionRetry: false,
}),
adminLogout: () => request("/access/admin-logout", { method: "POST" }),
// Auth — section-agnostic.
authStatus: () => request("/auth/status"),
@@ -89,12 +53,12 @@ export const api = {
// Sections (sub-sections within a vertical).
listSections: (vertical) => request(`/sections?${qs({}, { vertical, section: null })}`),
createSection: ({ vertical, slug, title, emoji, description, accessCode }) =>
createSection: ({ vertical, slug, title, emoji, description }) =>
request("/sections", {
method: "POST",
body: JSON.stringify({
vertical: vertical ?? getVertical(),
slug, title, emoji, description, access_code: accessCode,
slug, title, emoji, description,
}),
}),
updateSection: (vertical, slug, patch) =>
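With the section-code retry removed, `request` reduces to a single fetch plus error mapping. The sketch below shows that shape under stated assumptions: the `fetchImpl` parameter is hypothetical and added only so the function can be exercised without a network, and the final `return res.json()` is assumed, since the hunk does not show the end of the function.

```javascript
const BASE = "/api/monitoring-tg/api/v1";

// Sketch of the simplified helper. `fetchImpl` is a hypothetical injection
// point for tests; the real module calls the global fetch directly.
async function request(path, options = {}, fetchImpl = fetch) {
  const res = await fetchImpl(BASE + path, {
    headers: { "Content-Type": "application/json" },
    ...options,
  });
  if (!res.ok) {
    // Prefer the server-provided `detail`, fall back to the status text.
    let detail = res.statusText;
    try { detail = (await res.json()).detail || detail; } catch {}
    // A 401 is no longer retried: it surfaces to the caller like any error.
    throw new Error(`${res.status}: ${detail}`);
  }
  if (res.status === 204) return null;
  return res.json(); // assumed: not visible in the diff
}
```

A caller such as `adminLogout` then needs no extra flags: `request("/access/admin-logout", { method: "POST" })`.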

@@ -1,12 +1,12 @@
import { api } from "/api/monitoring-tg/static/js/api.js";
import { isAdmin } from "/api/monitoring-tg/static/js/access.js";
import { isPortalAdmin } from "/api/monitoring-tg/static/js/access.js";
import { appBase } from "/api/monitoring-tg/static/js/vertical.js";
// "Telegram not authorized" banner. Only useful for admins — non-admin
// visitors can't open /auth.html anyway, so showing the banner would be
// noise (and the /auth/status call itself 404s for non-admins).
(async () => {
if (!(await isAdmin())) return;
if (!(await isPortalAdmin())) return;
try {
const status = await api.authStatus();
if (status.authorized) return;

@@ -37,7 +37,6 @@ async function render() {
<span title="🎯 Leads">${s.leads_total.toLocaleString()} leads</span>
</div>
${s.description ? `<div class="section-desc muted">${escape(s.description)}</div>` : ""}
${admin ? `<div class="section-code mono">Code: ${escape(s.access_code || "not set")}</div>` : ""}
<div class="section-slug muted mono">${escape(V)} / ${escape(s.slug)}</div>
</a>
${admin ? `
@@ -124,14 +123,9 @@ document.getElementById("create-form").addEventListener("submit", async (e) => {
return;
}
const emoji = document.getElementById("new-emoji").value.trim() || null;
const accessCode = document.getElementById("new-access-code").value.trim();
if (accessCode.length < 3) {
toast("Access code must be at least 3 characters long", "error");
return;
}
const description = document.getElementById("new-description").value.trim() || null;
try {
await api.createSection({ vertical: V, slug, title, emoji, description, accessCode });
await api.createSection({ vertical: V, slug, title, emoji, description });
toast(`Subsection "${title}" created`, "success");
document.getElementById("create-dialog").close();
resetForm();
@@ -153,7 +147,6 @@ document.getElementById("sections-grid").addEventListener("click", async (e) =>
document.getElementById("edit-slug").value = slug;
document.getElementById("edit-title").value = section.title || "";
document.getElementById("edit-emoji").value = section.emoji || "";
document.getElementById("edit-access-code").value = section.access_code || "";
document.getElementById("edit-description").value = section.description || "";
document.getElementById("edit-dialog").showModal();
setTimeout(() => document.getElementById("edit-title").focus(), 50);
@@ -177,19 +170,13 @@ document.getElementById("edit-form").addEventListener("submit", async (e) => {
const slug = document.getElementById("edit-slug").value;
const title = document.getElementById("edit-title").value.trim();
const emoji = document.getElementById("edit-emoji").value.trim() || null;
const accessCode = document.getElementById("edit-access-code").value.trim();
const description = document.getElementById("edit-description").value.trim() || null;
if (!title) return;
if (accessCode.length < 3) {
toast("Access code must be at least 3 characters long", "error");
return;
}
try {
await api.updateSection(V, slug, {
title,
emoji,
description,
access_code: accessCode,
});
toast(`Subsection "${title}" saved`, "success");
document.getElementById("edit-dialog").close();

@@ -4,6 +4,7 @@ import { getVertical, getSection, VERTICAL_META } from "/api/monitoring-tg/stati
const V = getVertical();
const section = getSection();
const meta = VERTICAL_META[V];
const PROMPT_LIMIT = 30000;
// `level` decides which override layer the editor edits/saves/resets.
// "section" → store key llm_system_prompt:<vertical>:<section_slug>
@@ -33,6 +34,11 @@ function levelScope() {
async function loadConfig() {
const res = await fetch("/api/monitoring-tg/api/v1/settings");
if (res.status === 404 || res.status === 403) {
document.getElementById("config-tbody").innerHTML =
`<tr><td colspan="2" class="empty">Service configuration is available only to the portal admin</td></tr>`;
return;
}
if (!res.ok) throw new Error(res.statusText);
const cfg = await res.json();
const stats = await api.globalStats();
@@ -72,6 +78,7 @@ async function loadPrompt() {
editor.value = data.prompt || "";
const status = document.getElementById("prompt-status");
const lengthEl = document.getElementById("prompt-length");
editor.maxLength = PROMPT_LIMIT;
const map = {
section: ["override · subsection", "ok"],
@@ -81,9 +88,17 @@ async function loadPrompt() {
const [label, cls] = map[data.source] || ["—", "off"];
status.textContent = label;
status.className = `badge ${cls}`;
lengthEl.textContent = `${(data.prompt || "").length.toLocaleString()} characters`;
const used = (data.prompt || "").length;
lengthEl.textContent =
`${used.toLocaleString()} / ${PROMPT_LIMIT.toLocaleString()} characters`;
}
document.getElementById("prompt-editor").addEventListener("input", (e) => {
const lengthEl = document.getElementById("prompt-length");
lengthEl.textContent =
`${e.target.value.length.toLocaleString()} / ${PROMPT_LIMIT.toLocaleString()} characters`;
});
document.getElementById("prompt-save").addEventListener("click", async (e) => {
const text = document.getElementById("prompt-editor").value;
e.target.disabled = true;
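The "used / limit" counter is built twice in this hunk, once in `loadPrompt` and once in the `input` listener. One way to keep the two in sync is a small formatter, sketched below. `formatPromptLength` is a hypothetical name that does not appear in the commit, and its `locale` and `unit` parameters are assumptions added so the output is predictable.

```javascript
const PROMPT_LIMIT = 30000;

// Hypothetical helper (not part of the commit): formats the counter text.
// With `locale` left undefined, toLocaleString uses the runtime default,
// which matches the behaviour of the code in the diff.
function formatPromptLength(used, limit = PROMPT_LIMIT, locale = undefined, unit = "characters") {
  return `${used.toLocaleString(locale)} / ${limit.toLocaleString(locale)} ${unit}`;
}
```

Both call sites could then assign `lengthEl.textContent = formatPromptLength(editor.value.length)`. Note that `maxLength` only constrains user input in the browser, so a server-side length check would still be needed if the limit is meant to be enforced.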

@@ -48,11 +48,6 @@
<span style="min-width:120px" class="muted">Icon</span>
<input type="text" id="new-emoji" maxlength="4" placeholder="🌴" style="width:80px" />
</label>
<label class="row" style="gap:8px; margin-bottom:8px">
<span style="min-width:120px" class="muted">Access code</span>
<input type="text" id="new-access-code" required minlength="3"
autocomplete="new-password" style="flex:1" />
</label>
<label class="row" style="gap:8px; margin-bottom:8px; align-items:flex-start">
<span style="min-width:120px" class="muted">Description</span>
<textarea id="new-description" rows="3" style="flex:1"></textarea>
@@ -76,11 +71,6 @@
<span style="min-width:120px" class="muted">Icon</span>
<input type="text" id="edit-emoji" maxlength="4" style="width:80px" />
</label>
<label class="row" style="gap:8px; margin-bottom:8px">
<span style="min-width:120px" class="muted">Access code</span>
<input type="text" id="edit-access-code" required minlength="3"
autocomplete="new-password" style="flex:1" />
</label>
<label class="row" style="gap:8px; margin-bottom:8px; align-items:flex-start">
<span style="min-width:120px" class="muted">Description</span>
<textarea id="edit-description" rows="3" style="flex:1"></textarea>

@@ -22,9 +22,8 @@
</tbody>
</table>
<div class="muted" style="font-size:12px; margin-top:12px">
Parameters are set via environment variables (<span class="mono">.env</span>).
To change them, edit <span class="mono">.env</span> and restart the container:
<span class="mono">docker compose restart app</span>.
Parameters are set via environment variables and k8s manifests.
To change them, update the ConfigMap/Secret and restart the deployment.
</div>
</div>