From b78d1eac02be6cba661c5be2795b4448d5f71ad4 Mon Sep 17 00:00:00 2001
From: Grendgi
Date: Thu, 4 Jun 2026 15:31:10 +0300
Subject: [PATCH] Scope monitoring TG by department
---
.env.example | 21 +-
README.md | 173 ++++-------
alembic/versions/0010_department_sections.py | 52 ++++
docker-compose.yml | 64 ----
k8s/configmap.yaml | 5 +-
k8s/secrets.yaml | 2 +-
src/parser_bot/access.py | 65 ++--
src/parser_bot/api/routes.py | 283 ++++++++++--------
src/parser_bot/api/schemas.py | 10 +-
src/parser_bot/auth.py | 4 +-
src/parser_bot/config.py | 23 +-
src/parser_bot/db/models.py | 13 +-
src/parser_bot/extractors.py | 11 +-
src/parser_bot/llm.py | 62 ++--
src/parser_bot/prompt_store.py | 41 +--
src/parser_bot/scheduler/poller.py | 34 ++-
src/parser_bot/web/static/auth.html | 2 +-
src/parser_bot/web/static/hr/index.html | 10 -
.../web/static/hr/section/settings.html | 5 +-
src/parser_bot/web/static/js/access.js | 57 ++--
src/parser_bot/web/static/js/admin.js | 2 +-
src/parser_bot/web/static/js/api.js | 44 +--
src/parser_bot/web/static/js/nav-status.js | 4 +-
src/parser_bot/web/static/js/sections-list.js | 15 +-
src/parser_bot/web/static/js/settings.js | 17 +-
.../web/static/real-estate/index.html | 10 -
.../static/real-estate/section/settings.html | 5 +-
27 files changed, 481 insertions(+), 553 deletions(-)
create mode 100644 alembic/versions/0010_department_sections.py
delete mode 100644 docker-compose.yml
diff --git a/.env.example b/.env.example
index 12fd497..622ab18 100644
--- a/.env.example
+++ b/.env.example
@@ -32,23 +32,20 @@ API_PORT=8000
MEDIA_DIR=/data/media
MEDIA_MAX_BYTES=20971520
-# Local LLM (Ollama) — runs Qwen 2.5 7B Q4 on CPU. Set LLM_ENABLED=false to disable.
+# OpenAI-compatible LLM endpoint. In production this can point to the same
+# vLLM server/model used by telephony.
LLM_ENABLED=true
-LLM_BASE_URL=http://ollama:11434
-LLM_MODEL=qwen2.5:7b-instruct-q4_K_M
+LLM_BASE_URL=http://10.2.3.5:8002
+LLM_API_KEY=
+LLM_MODEL=qwen2.5-14b
LLM_TIMEOUT_SECONDS=120
+LLM_MAX_TOKENS=600
LLM_MIN_TEXT_LENGTH=20
# How often the background classifier wakes up and how many messages it
# processes per tick. With 5/20s ≈ 900 messages/hour at ~3-6s per call.
LLM_CLASSIFY_INTERVAL_SECONDS=20
LLM_CLASSIFY_BATCH_SIZE=5
-# Admin allowlist for /auth.html, /docs, /openapi.json, /redoc and the
-# /api/v1/auth/* endpoints. Comma-separated list of client IPs.
-# Empty = no restriction (everyone is admin) — convenient for local dev.
-# Example: ADMIN_ALLOWED_IPS=89.110.109.221,127.0.0.1
-ADMIN_ALLOWED_IPS=
-# Honor X-Forwarded-For / X-Real-IP from a reverse proxy (Docker port-
-# forward, nginx, traefik) when resolving the client IP for the allowlist.
-TRUST_PROXY_HEADERS=true
-
+# Optional local fallback for admin-only UI/API operations. In production the
+# portal forwards X-User-Is-Admin=1, so no local password is required.
+ADMIN_PASSWORD=
diff --git a/README.md b/README.md
index 632a9db..d9c7ef5 100644
--- a/README.md
+++ b/README.md
@@ -1,123 +1,68 @@
-# parser-tg-bot
+# monitoring-tg
-Парсер публичных Telegram-каналов на Telethon (MTProto). Сохраняет сообщения в Postgres,
-управляется через REST API. Период опроса настраивается через `.env`. На следующем шаге
-легко перевести на realtime через `events.NewMessage`.
+Сервис мониторинга Telegram-каналов для портала. Он сохраняет сообщения в
+Postgres, раскладывает каналы по вертикалям/подразделам и выполняет AI-анализ
+через OpenAI-compatible endpoint, общий с другими сервисами портала.
-## Стек
+## Доступ
-- Python 3.11, Telethon, FastAPI, SQLAlchemy 2 (async) + Alembic, APScheduler, Postgres 16
+- Админские операции остаются за админом портала: портал прокидывает
+  `X-User-Is-Admin=1`.
+- Отдел видит только свои подразделы, каналы, сообщения и промпты через
+  `X-User-Department-Id`.
+- Руководитель отдела может создавать, редактировать и удалять подразделы и
+  каналы своего отдела, а также запускать опрос каналов: портал прокидывает
+  `X-User-Is-Department-Head=1`.
+- Пароли подразделов и IP allowlist удалены.
+- Сервис доверяет этим заголовкам без проверки, поэтому снаружи он должен быть
+  доступен только через портал.
+
+## Конфигурация
+
+Основные переменные:
+
+```env
+TG_API_ID=
+TG_API_HASH=
+TG_PHONE=
+TG_SESSION_STRING=
+
+POSTGRES_HOST=postgres.monitoring-tg.svc.cluster.local
+POSTGRES_PORT=5432
+POSTGRES_USER=parser
+POSTGRES_PASSWORD=parser
+POSTGRES_DB=parser
+
+PUBLIC_BASE_PATH=/api/monitoring-tg
+
+LLM_ENABLED=true
+LLM_BASE_URL=http://10.2.3.5:8002
+LLM_API_KEY=
+LLM_MODEL=qwen2.5-14b
+```
+
+Для локальной админской отладки можно задать `ADMIN_PASSWORD`, но в проде доступ
+должен идти через портал.
+
+## Запуск в k8s
+
+Манифесты лежат в `k8s/`. Перед применением нужно заполнить `k8s/secrets.yaml`
+реальными Telegram-кредами и, при необходимости, `LLM_API_KEY`.
+
+```bash
+kubectl apply -k k8s
+```
+
+Миграции выполняются entrypoint-ом контейнера перед запуском API.
## Структура
```text
src/parser_bot/
-├── api/ # FastAPI роуты + Pydantic-схемы
-├── db/ # SQLAlchemy модели + сессии
-├── scheduler/ # APScheduler-воркер периодического опроса
-├── telegram/ # Telethon-клиент (resolve, fetch)
-├── web/static/ # SPA-странички (HTML/CSS/JS, без бандлера)
-├── config.py # pydantic-settings
-└── main.py # FastAPI lifespan + uvicorn
-alembic/ # миграции
+├── api/ FastAPI роуты + Pydantic-схемы
+├── db/ SQLAlchemy модели + сессии
+├── scheduler/ APScheduler-воркер периодического опроса
+├── telegram/ Telethon-клиент
+├── web/static/ страницы UI без бандлера
+├── config.py pydantic-settings
+└── main.py FastAPI lifespan + uvicorn
+alembic/ миграции
+k8s/ манифесты для портала
```
-
-## Первый запуск (локально, через Docker)
-
-1. Получить `api_id` и `api_hash` на [my.telegram.org](https://my.telegram.org) → API development tools.
-2. Скопировать `.env.example` в `.env` и заполнить `TG_API_ID`, `TG_API_HASH`, `TG_PHONE`.
-3. Поднять Postgres + накатить миграции:
-
- ```bash
- docker compose up -d db
- docker compose run --rm app alembic upgrade head
- ```
-
-4. Запуск:
-
- ```bash
- docker compose up -d
- docker compose logs app --tail=50
- ```
-
-5. **Авторизация Telegram** — открыть [http://localhost:8000/auth.html](http://localhost:8000/auth.html)
- и нажать «Отправить код». Telegram пришлёт код на номер из `TG_PHONE` →
- ввести код (и 2FA-пароль, если включён). Готово, парсер начнёт опрос.
-
- Сессия сохраняется в `./data/session/parser.session` — рестарты её переиспользуют,
- повторно входить не нужно.
-
-### Админ-доступ и коды подразделов
-
-- `ADMIN_PASSWORD` — дополнительный пароль для админских функций. Если не задан,
- остаётся прежний режим: доступ определяется только `ADMIN_ALLOWED_IPS`.
-- [http://localhost:8000/admin.html](http://localhost:8000/admin.html) — вход по
- админ-паролю. После входа доступны удаление и редактирование подразделов,
- просмотр их кодов, управление каналами, ручной опрос, промпты, авторизация
- Telegram и Swagger.
-- При создании подраздела обязательно задаётся `Код доступа`. Пользователь вводит
- этот код при первом открытии данных подраздела; после входа он может добавлять
- каналы в этот подраздел. Админ видит код в списке подразделов.
-
-### Прод-вариант: без UI и без volume (k8s-friendly)
-
-Сделай интерактивный логин **один раз** на dev-машине и получи опаковую строку:
-
-```bash
-docker compose run --rm -it app python -m parser_bot.auth
-```
-
-Скрипт напечатает строку вида `TG_SESSION_STRING=1AbcD...`. Положи её в
-`.env` или k8s Secret — после этого приложение поднимается без UI и без
-монтирования сессионного файла:
-
-```ini
-TG_SESSION_STRING=1AbcDef... # вместо TG_SESSION_PATH/volume
-```
-
-> ⚠️ **`ApiIdPublishedFloodError`** — Telegram заблокировал твою пару
-> `api_id`/`api_hash` (попала в публичный доступ). Создай **новое** приложение
-> на [my.telegram.org](https://my.telegram.org) и не публикуй креды нигде.
-> Старый `api_id` восстановить нельзя.
-
-## UI
-
-После запуска доступны страницы:
-
-- [Дашборд](http://localhost:8000/) — общая статистика, топ каналов, кнопка опросить всех
-- [Каналы](http://localhost:8000/channels.html) — добавить / удалить / включить-выключить / опросить вручную
-- [Сообщения](http://localhost:8000/messages.html) — фильтр по каналу, поиск по тексту, пагинация, raw JSON
-- [Настройки](http://localhost:8000/settings.html) — текущая конфигурация и подсказки
-- [Авторизация](http://localhost:8000/auth.html) — веб-логин в Telegram (код + 2FA)
-- [Swagger UI](http://localhost:8000/docs) — интерактивный API
-
-Глубокая ссылка `messages.html?channel_id=42` открывает ленту конкретного канала.
-
-## API
-
-- `GET /healthz` — health check
-- `GET /api/v1/auth/status` — авторизован ли клиент
-- `POST /api/v1/auth/send-code` — отправить код на `TG_PHONE`
-- `POST /api/v1/auth/submit-code` `{"code": "12345"}` — подтвердить код
-- `POST /api/v1/auth/submit-password` `{"password": "..."}` — 2FA-пароль
-- `POST /api/v1/auth/logout` — завершить сессию
-- `GET /api/v1/stats` — глобальные счётчики
-- `GET /api/v1/settings` — read-only вид конфигурации
-- `GET /api/v1/channels` — список каналов
-- `POST /api/v1/channels` `{"identifier": "@durov"}` — добавить
-- `GET /api/v1/channels/{id}` — карточка
-- `PATCH /api/v1/channels/{id}` `{"is_active": false}` — включить/выключить
-- `DELETE /api/v1/channels/{id}` — удалить
-- `GET /api/v1/channels/{id}/stats` — счётчики по каналу
-- `POST /api/v1/channels/{id}/poll` — форсировать опрос одного канала
-- `POST /api/v1/poll` — форсировать опрос всех активных каналов
-- `GET /api/v1/messages?channel_id=...&q=...&limit=50&offset=0` — лента
-- `GET /api/v1/messages/{id}` — одно сообщение (с `raw` JSONB)
-
-## Дальше
-
-- **Realtime**: заменить APScheduler на `client.add_event_handler(handler, events.NewMessage)`,
- оставив periodic poll как фоновый «доводчик» для пропущенных сообщений.
-- **Go-микросервис**: контракт = таблицы `channels` / `messages` в Postgres.
- Go-сервис может либо читать ту же БД, либо ходить в `/api/v1/messages`.
-- **k8s**: добавить Helm-чарт; `data/session/` маппится на PVC, `.env` — в Secret.
diff --git a/alembic/versions/0010_department_sections.py b/alembic/versions/0010_department_sections.py
new file mode 100644
index 0000000..9c02c9b
--- /dev/null
+++ b/alembic/versions/0010_department_sections.py
@@ -0,0 +1,52 @@
+"""department-scoped sections
+
+Revision ID: 0010
+Revises: 0009
+Create Date: 2026-06-04
+
+"""
+from typing import Sequence, Union
+
+import sqlalchemy as sa
+from alembic import op
+
+revision: str = "0010"
+down_revision: Union[str, None] = "0009"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+ op.alter_column(
+ "app_settings",
+ "key",
+ existing_type=sa.String(length=128),
+ type_=sa.String(length=255),
+ existing_nullable=False,
+ )
+ op.add_column("sections", sa.Column("department_id", sa.String(length=64), nullable=True))
+ op.drop_column("sections", "access_code")
+ op.drop_constraint("uq_section_vertical_slug", "sections", type_="unique")
+ op.create_unique_constraint(
+ "uq_section_vertical_department_slug",
+ "sections",
+ ["vertical", "department_id", "slug"],
+ )
+ op.drop_index("ix_sections_vertical", table_name="sections")
+ op.create_index("ix_sections_vertical_department", "sections", ["vertical", "department_id"])
+
+
+def downgrade() -> None:
+ op.drop_index("ix_sections_vertical_department", table_name="sections")
+ op.create_index("ix_sections_vertical", "sections", ["vertical"])
+ op.drop_constraint("uq_section_vertical_department_slug", "sections", type_="unique")
+ op.create_unique_constraint("uq_section_vertical_slug", "sections", ["vertical", "slug"])
+ op.add_column("sections", sa.Column("access_code", sa.String(length=255), nullable=True))
+ op.drop_column("sections", "department_id")
+ op.alter_column(
+ "app_settings",
+ "key",
+ existing_type=sa.String(length=255),
+ type_=sa.String(length=128),
+ existing_nullable=False,
+ )
diff --git a/docker-compose.yml b/docker-compose.yml
deleted file mode 100644
index 24a8e10..0000000
--- a/docker-compose.yml
+++ /dev/null
@@ -1,64 +0,0 @@
-services:
- ollama:
- image: ollama/ollama:latest
- environment:
- OLLAMA_HOST: 0.0.0.0:11434
- OLLAMA_KEEP_ALIVE: 24h
- OLLAMA_NUM_PARALLEL: "1"
- OLLAMA_NUM_THREAD: "8"
- volumes:
- - ./data/ollama:/root/.ollama
- ports:
- - "11434:11434"
- healthcheck:
- test: ["CMD", "ollama", "list"]
- interval: 10s
- timeout: 5s
- retries: 30
- restart: unless-stopped
-
- ollama-pull:
- image: ollama/ollama:latest
- depends_on:
- ollama:
- condition: service_healthy
- environment:
- OLLAMA_HOST: ollama:11434
- entrypoint: ["/bin/sh", "-c"]
- command: ["ollama list | grep -q qwen2.5:7b-instruct-q4_K_M || ollama pull qwen2.5:7b-instruct-q4_K_M"]
- restart: "no"
-
- db:
- image: postgres:16-alpine
- environment:
- POSTGRES_USER: ${POSTGRES_USER:-parser}
- POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-parser}
- POSTGRES_DB: ${POSTGRES_DB:-parser}
- ports:
- - "5432:5432"
- volumes:
- - pgdata:/var/lib/postgresql/data
- healthcheck:
- test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-parser}"]
- interval: 5s
- timeout: 5s
- retries: 10
-
- app:
- build: .
- env_file: .env
- depends_on:
- db:
- condition: service_healthy
- ollama:
- condition: service_healthy
- ports:
- - "80:8000"
- volumes:
- - ./data/session:/data/session
- - ./data/media:/data/media
- - ./src:/app/src
- - ./alembic:/app/alembic
-
-volumes:
- pgdata:
diff --git a/k8s/configmap.yaml b/k8s/configmap.yaml
index 013f7d6..5fb88ad 100644
--- a/k8s/configmap.yaml
+++ b/k8s/configmap.yaml
@@ -16,5 +16,6 @@ data:
POLL_INTERVAL_SECONDS: "60"
POLL_HISTORY_LIMIT: "50"
LLM_ENABLED: "1"
- LLM_BASE_URL: "http://ollama.ollama.svc.cluster.local:11434"
- LLM_MODEL: "qwen2.5:7b-instruct-q4_K_M"
+ LLM_BASE_URL: "http://10.2.3.5:8002"
+ LLM_MODEL: "qwen2.5-14b"
+ LLM_MAX_TOKENS: "600"
diff --git a/k8s/secrets.yaml b/k8s/secrets.yaml
index d8e9308..c55b746 100644
--- a/k8s/secrets.yaml
+++ b/k8s/secrets.yaml
@@ -10,7 +10,7 @@ stringData:
TG_PHONE: "CHANGE_ME"
TG_SESSION_STRING: ""
POSTGRES_PASSWORD: "parser"
- ADMIN_ALLOWED_IPS: ""
+ LLM_API_KEY: ""
ADMIN_PASSWORD: "CHANGE_ME"
---
apiVersion: v1
diff --git a/src/parser_bot/access.py b/src/parser_bot/access.py
index dbbb460..9038c6f 100644
--- a/src/parser_bot/access.py
+++ b/src/parser_bot/access.py
@@ -1,15 +1,4 @@
-"""Admin access helpers for admin-only surfaces (auth, OpenAPI docs).
-
-Resolution:
- 1. If `ADMIN_ALLOWED_IPS` is empty → no network restriction.
- 2. Otherwise the request's client IP must be in the allowlist.
- 3. When `TRUST_PROXY_HEADERS=true` (default) and one of the proxy headers
- is present, the first IP in `X-Forwarded-For` (or `X-Real-IP`) is used.
- Without this, behind a Docker port-forward the source IP is always the
- gateway, which is useless for ACLs.
- 4. If `ADMIN_PASSWORD` is set, the request must also present a valid signed
- admin cookie or the password in `X-Admin-Password`.
-"""
+"""Portal-aware access helpers for monitoring-tg."""
from __future__ import annotations
import hashlib
@@ -24,26 +13,6 @@ ADMIN_COOKIE = "parser_admin"
_ADMIN_TOKEN_MESSAGE = b"parser-tg-bot-admin-v1"
-def client_ip(request: Request) -> str:
- """Best-effort source IP of the request."""
- if settings.trust_proxy_headers:
- xff = request.headers.get("x-forwarded-for")
- if xff:
- # Standard form: "client, proxy1, proxy2" — first is closest to user.
- return xff.split(",")[0].strip()
- real = request.headers.get("x-real-ip")
- if real:
- return real.strip()
- return request.client.host if request.client else "0.0.0.0"
-
-
-def is_admin_network_allowed(request: Request) -> bool:
- allowed = settings.admin_ip_set
- if not allowed:
- return True
- return client_ip(request) in allowed
-
-
def admin_password_enabled() -> bool:
return bool(settings.admin_password)
@@ -88,21 +57,41 @@ def clear_admin_cookie(response: Response) -> None:
def is_admin_request(request: Request) -> bool:
- if not is_admin_network_allowed(request):
- return False
- if not settings.admin_password:
+ if request.headers.get("x-user-is-admin") == "1":
return True
+ if not settings.admin_password:
+ return False
return verify_admin_token(request.cookies.get(ADMIN_COOKIE)) or verify_admin_password(
request.headers.get("x-admin-password")
)
def require_admin_network(request: Request) -> None:
- """FastAPI dependency for the admin login page/API.
+ """Compatibility dependency for the local admin login page.
- This keeps the IP allowlist useful even before the password cookie exists.
+ IP allowlists were removed: portal's X-User-Is-Admin header is the
+ production boundary, and ADMIN_PASSWORD is only a local fallback.
"""
- if not is_admin_network_allowed(request):
+ if is_admin_request(request) or admin_password_enabled():
+ return
+ raise HTTPException(status_code=404)
+
+
+def portal_department_id(request: Request) -> str | None:
+ value = (request.headers.get("x-user-department-id") or "").strip()
+ return value or None
+
+
+def is_department_head_request(request: Request) -> bool:
+ return request.headers.get("x-user-is-department-head") == "1"
+
+
+def can_manage_department(request: Request) -> bool:
+ return is_admin_request(request) or is_department_head_request(request)
+
+
+def require_department_manager(request: Request) -> None:
+ if not can_manage_department(request):
raise HTTPException(status_code=404)
diff --git a/src/parser_bot/api/routes.py b/src/parser_bot/api/routes.py
index c4be44f..158c349 100644
--- a/src/parser_bot/api/routes.py
+++ b/src/parser_bot/api/routes.py
@@ -1,6 +1,3 @@
-import hashlib
-import hmac
-import secrets
from datetime import datetime, timedelta, timezone
from typing import Any, Literal
@@ -21,12 +18,13 @@ from parser_bot import llm as llm_client
from parser_bot import prompt_store
from parser_bot.access import (
admin_password_enabled,
+ can_manage_department,
clear_admin_cookie,
- client_ip,
- is_admin_network_allowed,
is_admin_request,
+ portal_department_id,
require_admin,
require_admin_network,
+ require_department_manager,
set_admin_cookie,
verify_admin_password,
)
@@ -44,7 +42,6 @@ from parser_bot.api.schemas import (
GlobalStats,
MessageOut,
SectionCreate,
- SectionLogin,
SectionOut,
SectionUpdate,
SectionWithStats,
@@ -72,13 +69,17 @@ def _verdict_key(vertical: str) -> str:
async def _get_section(
- session: AsyncSession, vertical: Vertical, slug: str
+ session: AsyncSession,
+ vertical: Vertical,
+ slug: str,
+ department_id: str | None = None,
) -> Section:
"""Find a section by (vertical, slug) or 404."""
- result = await session.execute(
- select(Section).where(Section.vertical == vertical, Section.slug == slug)
- )
- section = result.scalar_one_or_none()
+ stmt = select(Section).where(Section.vertical == vertical, Section.slug == slug)
+ if department_id is not None:
+ stmt = stmt.where(Section.department_id == department_id)
+ result = await session.execute(stmt)
+ section = result.scalars().first()
if section is None:
raise HTTPException(
status_code=404, detail=f"section {vertical}:{slug} not found"
@@ -86,41 +87,24 @@ async def _get_section(
return section
-def _section_cookie_name(vertical: str, slug: str) -> str:
- return f"parser_section_{vertical}_{slug}"
+def _read_department_id(request: Request) -> str | None:
+ if is_admin_request(request):
+ return None
+ dept_id = portal_department_id(request)
+ if not dept_id:
+ raise HTTPException(status_code=403, detail="department is required")
+ return dept_id
-def _section_token(section: Section) -> str:
- if not section.access_code:
- return ""
- return hmac.new(
- section.access_code.encode("utf-8"),
- f"{section.vertical}:{section.slug}".encode("utf-8"),
- hashlib.sha256,
- ).hexdigest()
-
-
-def _set_section_cookie(response: Response, section: Section) -> None:
- response.set_cookie(
- _section_cookie_name(section.vertical, section.slug),
- _section_token(section),
- httponly=True,
- samesite="lax",
- secure=False,
- max_age=60 * 60 * 24 * 30,
- )
-
-
-def _section_is_unlocked(request: Request, section: Section) -> bool:
- if not section.access_code:
- return True
- direct_code = request.headers.get("x-section-code")
- if direct_code and secrets.compare_digest(direct_code, section.access_code):
- return True
- cookie_token = request.cookies.get(
- _section_cookie_name(section.vertical, section.slug)
- )
- return secrets.compare_digest(cookie_token or "", _section_token(section))
+def _manage_department_id(request: Request) -> str | None:
+ if not can_manage_department(request):
+ raise HTTPException(status_code=403, detail="department manager required")
+ if is_admin_request(request):
+ return None
+ dept_id = portal_department_id(request)
+ if not dept_id:
+ raise HTTPException(status_code=403, detail="department is required")
+ return dept_id
async def _require_scope_access(
@@ -129,14 +113,10 @@ async def _require_scope_access(
vertical: Vertical | None,
section_slug: str | None,
) -> Section | None:
- if is_admin_request(request):
- return None
- if vertical is None or section_slug is None:
- raise HTTPException(status_code=401, detail="section code required")
- section = await _get_section(session, vertical, section_slug)
- if not _section_is_unlocked(request, section):
- raise HTTPException(status_code=401, detail="section code required")
- return section
+ department_id = _read_department_id(request)
+ if vertical is not None and section_slug is not None:
+ return await _get_section(session, vertical, section_slug, department_id)
+ return None
async def _get_channel_in_scope(
@@ -144,6 +124,7 @@ async def _get_channel_in_scope(
channel_id: int,
vertical: Vertical | None,
section_slug: str | None,
+ department_id: str | None = None,
) -> tuple[Channel, str]:
"""Load a channel and its section slug; 404 if any scope constraint fails.
@@ -151,18 +132,20 @@ async def _get_channel_in_scope(
cannot accidentally read or mutate something from another scope.
"""
result = await session.execute(
- select(Channel, Section.slug)
+ select(Channel, Section.slug, Section.department_id)
.join(Section, Section.id == Channel.section_id)
.where(Channel.id == channel_id)
)
row = result.one_or_none()
if row is None:
raise HTTPException(status_code=404)
- channel, ch_section_slug = row
+ channel, ch_section_slug, ch_department_id = row
if vertical is not None and channel.vertical != vertical:
raise HTTPException(status_code=404)
if section_slug is not None and ch_section_slug != section_slug:
raise HTTPException(status_code=404)
+ if department_id is not None and ch_department_id != department_id:
+ raise HTTPException(status_code=404)
return channel, ch_section_slug
@@ -182,25 +165,19 @@ def _channel_out(channel: Channel, section_slug: str | None) -> dict[str, Any]:
}
-# --- Access (admin allowlist) -------------------------------------------
+# --- Access --------------------------------------------------------------
@router.get("/access/me")
async def access_me(request: Request) -> dict[str, Any]:
- """Tell the frontend whether the current client is on the admin allowlist.
-
- Used by JS to show/hide the «Авторизация» and «API» nav links. Always
- returns 200 (so it's safe to call from every page); the boolean is what
- the UI keys off.
- """
admin = is_admin_request(request)
- ip_allowed = is_admin_network_allowed(request)
+ department_id = portal_department_id(request)
+ can_manage = can_manage_department(request)
return {
"is_admin": admin,
+ "can_manage_department": can_manage,
+ "department_id": department_id,
"admin_password_enabled": admin_password_enabled(),
- "admin_ip_allowed": ip_allowed,
- "ip": client_ip(request) if admin else None,
- "restricted": bool(settings.admin_ip_set),
}
@@ -221,24 +198,8 @@ async def admin_logout(response: Response) -> None:
clear_admin_cookie(response)
-@router.post("/access/section-login", status_code=204)
-async def section_login(
- payload: SectionLogin,
- response: Response,
- session: AsyncSession = Depends(get_session),
-) -> None:
- section = await _get_section(session, payload.vertical, payload.section)
- if not section.access_code:
- _set_section_cookie(response, section)
- return
- if not secrets.compare_digest(payload.code, section.access_code):
- raise HTTPException(status_code=401, detail="invalid section code")
- _set_section_cookie(response, section)
-
-
# --- Auth (admin-only) --------------------------------------------------
-# Telegram session controls are an admin surface — gate with the same
-# IP allowlist so an unauth visitor can't even probe the login state.
+# Telegram session controls are an admin surface.
@router.get("/auth/status", response_model=AuthStatus, dependencies=[Depends(require_admin)])
@@ -305,6 +266,7 @@ async def list_sections(
Used by the section-chooser page. Counts are computed in a single query
via LEFT JOINs so empty sections still appear.
"""
+ department_id = _read_department_id(request)
# Per-section channel counts.
ch_total_sub = (
select(Channel.section_id, func.count(Channel.id).label("ct"))
@@ -347,6 +309,11 @@ async def list_sections(
func.coalesce(leads_sub.c.lt, 0),
)
.where(Section.vertical == vertical)
+ .where(
+ Section.department_id == department_id
+ if department_id is not None
+ else sa.true()
+ )
.outerjoin(ch_total_sub, ch_total_sub.c.section_id == Section.id)
.outerjoin(ch_active_sub, ch_active_sub.c.section_id == Section.id)
.outerjoin(msg_total_sub, msg_total_sub.c.section_id == Section.id)
@@ -355,16 +322,15 @@ async def list_sections(
)
).all()
- can_view_codes = is_admin_request(request)
return [
SectionWithStats(
id=s.id,
vertical=s.vertical,
+ department_id=s.department_id,
slug=s.slug,
title=s.title,
emoji=s.emoji,
description=s.description,
- access_code=s.access_code if can_view_codes else None,
created_at=s.created_at,
channels_total=ct,
channels_active=ca,
@@ -377,12 +343,17 @@ async def list_sections(
@router.post("/sections", response_model=SectionOut, status_code=201)
async def create_section(
- payload: SectionCreate, session: AsyncSession = Depends(get_session)
+ payload: SectionCreate,
+ request: Request,
+ session: AsyncSession = Depends(get_session),
) -> Section:
+ department_id = _manage_department_id(request)
# Reject duplicates with a friendly message instead of a constraint error.
existing = await session.execute(
select(Section).where(
- Section.vertical == payload.vertical, Section.slug == payload.slug
+ Section.vertical == payload.vertical,
+ Section.department_id == department_id,
+ Section.slug == payload.slug,
)
)
if existing.scalar_one_or_none() is not None:
@@ -390,7 +361,7 @@ async def create_section(
status_code=409,
detail=f"section {payload.vertical}:{payload.slug} already exists",
)
- section = Section(**payload.model_dump())
+ section = Section(**payload.model_dump(), department_id=department_id)
session.add(section)
await session.commit()
await session.refresh(section)
@@ -404,15 +375,16 @@ async def get_section(
request: Request,
session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
- section = await _get_section(session, vertical, slug)
+ department_id = _read_department_id(request)
+ section = await _get_section(session, vertical, slug, department_id)
return {
"id": section.id,
"vertical": section.vertical,
+ "department_id": section.department_id,
"slug": section.slug,
"title": section.title,
"emoji": section.emoji,
"description": section.description,
- "access_code": section.access_code if is_admin_request(request) else None,
"created_at": section.created_at,
}
@@ -420,15 +392,17 @@ async def get_section(
@router.patch(
"/sections/{vertical}/{slug}",
response_model=SectionOut,
- dependencies=[Depends(require_admin)],
+ dependencies=[Depends(require_department_manager)],
)
async def update_section(
vertical: Vertical,
slug: str,
payload: SectionUpdate,
+ request: Request,
session: AsyncSession = Depends(get_session),
) -> Section:
- section = await _get_section(session, vertical, slug)
+ department_id = _manage_department_id(request)
+ section = await _get_section(session, vertical, slug, department_id)
data = payload.model_dump(exclude_unset=True)
for k, v in data.items():
setattr(section, k, v)
@@ -440,12 +414,16 @@ async def update_section(
@router.delete(
"/sections/{vertical}/{slug}",
status_code=204,
- dependencies=[Depends(require_admin)],
+ dependencies=[Depends(require_department_manager)],
)
async def delete_section(
- vertical: Vertical, slug: str, session: AsyncSession = Depends(get_session)
+ vertical: Vertical,
+ slug: str,
+ request: Request,
+ session: AsyncSession = Depends(get_session),
) -> None:
- section = await _get_section(session, vertical, slug)
+ department_id = _manage_department_id(request)
+ section = await _get_section(session, vertical, slug, department_id)
# Block deletion when channels are still attached — keeps data referenceable.
count = (
await session.execute(
@@ -458,7 +436,7 @@ async def delete_section(
detail=f"section has {count} channels — move or delete them first",
)
# Drop the per-section LLM prompt too if any.
- await prompt_store.reset(vertical, slug)
+ await prompt_store.reset(vertical, section.department_id, slug)
await session.delete(section)
await session.commit()
@@ -473,6 +451,7 @@ async def list_channels(
section: str | None = Query(None, description="optional section slug filter"),
session: AsyncSession = Depends(get_session),
) -> list[dict[str, Any]]:
+ department_id = _read_department_id(request)
await _require_scope_access(request, session, vertical, section)
stmt = (
select(Channel, Section.slug)
@@ -480,6 +459,8 @@ async def list_channels(
.where(Channel.vertical == vertical)
.order_by(Channel.id)
)
+ if department_id is not None:
+ stmt = stmt.where(Section.department_id == department_id)
if section is not None:
stmt = stmt.where(Section.slug == section)
rows = (await session.execute(stmt)).all()
@@ -496,14 +477,14 @@ async def add_channel(
request: Request,
session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
- await _require_scope_access(request, session, payload.vertical, payload.section)
+ department_id = _manage_department_id(request)
existing = await session.execute(
select(Channel).where(Channel.identifier == payload.identifier)
)
if existing.scalar_one_or_none() is not None:
raise HTTPException(status_code=409, detail="channel already exists")
- section = await _get_section(session, payload.vertical, payload.section)
+ section = await _get_section(session, payload.vertical, payload.section, department_id)
if not await tg.is_authorized():
raise HTTPException(status_code=401, detail="not authorized: log in at /auth.html")
@@ -533,9 +514,10 @@ async def get_channel(
section: str | None = Query(None, description="scope: 404 if mismatched"),
session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
+ department_id = _read_department_id(request)
await _require_scope_access(request, session, vertical, section)
channel, section_slug = await _get_channel_in_scope(
- session, channel_id, vertical, section
+ session, channel_id, vertical, section, department_id
)
return _channel_out(channel, section_slug)
@@ -543,22 +525,24 @@ async def get_channel(
@router.patch(
"/channels/{channel_id}",
response_model=ChannelOut,
- dependencies=[Depends(require_admin)],
+ dependencies=[Depends(require_department_manager)],
)
async def update_channel(
channel_id: int,
payload: ChannelUpdate,
+ request: Request,
vertical: Vertical | None = Query(None, description="scope: 404 if mismatched"),
section: str | None = Query(None, description="scope: 404 if mismatched"),
session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
- channel, _ = await _get_channel_in_scope(session, channel_id, vertical, section)
+ department_id = _manage_department_id(request)
+ channel, _ = await _get_channel_in_scope(session, channel_id, vertical, section, department_id)
if payload.is_active is not None:
channel.is_active = payload.is_active
if payload.vertical is not None:
channel.vertical = payload.vertical
if payload.section is not None:
- new_section = await _get_section(session, channel.vertical, payload.section)
+ new_section = await _get_section(session, channel.vertical, payload.section, department_id)
channel.section_id = new_section.id
await session.commit()
await session.refresh(channel)
@@ -570,43 +554,49 @@ async def update_channel(
@router.delete(
"/channels/{channel_id}",
status_code=204,
- dependencies=[Depends(require_admin)],
+ dependencies=[Depends(require_department_manager)],
)
async def delete_channel(
channel_id: int,
+ request: Request,
vertical: Vertical | None = Query(None, description="scope: 404 if mismatched"),
section: str | None = Query(None, description="scope: 404 if mismatched"),
session: AsyncSession = Depends(get_session),
) -> None:
- channel, _ = await _get_channel_in_scope(session, channel_id, vertical, section)
+ department_id = _manage_department_id(request)
+ channel, _ = await _get_channel_in_scope(session, channel_id, vertical, section, department_id)
await session.delete(channel)
await session.commit()
-@router.post("/channels/{channel_id}/poll", dependencies=[Depends(require_admin)])
+@router.post("/channels/{channel_id}/poll", dependencies=[Depends(require_department_manager)])
async def trigger_poll(
channel_id: int,
+ request: Request,
vertical: Vertical | None = Query(None, description="scope: 404 if mismatched"),
section: str | None = Query(None, description="scope: 404 if mismatched"),
session: AsyncSession = Depends(get_session),
) -> dict[str, int]:
- await _get_channel_in_scope(session, channel_id, vertical, section)
+ department_id = _manage_department_id(request)
+ await _get_channel_in_scope(session, channel_id, vertical, section, department_id)
inserted = await poll_channel(channel_id)
return {"inserted": inserted}
@router.post(
"/channels/{channel_id}/backfill-media",
- dependencies=[Depends(require_admin)],
+ dependencies=[Depends(require_department_manager)],
)
async def trigger_backfill_media(
channel_id: int,
+ request: Request,
batch: int = Query(50, ge=1, le=200),
vertical: Vertical | None = Query(None, description="scope: 404 if mismatched"),
section: str | None = Query(None, description="scope: 404 if mismatched"),
session: AsyncSession = Depends(get_session),
) -> dict[str, int]:
- await _get_channel_in_scope(session, channel_id, vertical, section)
+ department_id = _manage_department_id(request)
+ await _get_channel_in_scope(session, channel_id, vertical, section, department_id)
try:
return await backfill_media(channel_id, batch_size=batch)
except RuntimeError as exc:
@@ -615,16 +605,18 @@ async def trigger_backfill_media(
@router.post(
"/channels/{channel_id}/reanalyze",
- dependencies=[Depends(require_admin)],
+ dependencies=[Depends(require_department_manager)],
)
async def trigger_reanalyze(
channel_id: int,
+ request: Request,
batch: int = Query(500, ge=1, le=2000),
vertical: Vertical | None = Query(None, description="scope: 404 if mismatched"),
section: str | None = Query(None, description="scope: 404 if mismatched"),
session: AsyncSession = Depends(get_session),
) -> dict[str, int]:
- await _get_channel_in_scope(session, channel_id, vertical, section)
+ department_id = _manage_department_id(request)
+ await _get_channel_in_scope(session, channel_id, vertical, section, department_id)
return await reanalyze_channel(channel_id, batch_size=batch)
@@ -636,9 +628,10 @@ async def channel_stats(
section: str | None = Query(None, description="scope: 404 if mismatched"),
session: AsyncSession = Depends(get_session),
) -> ChannelStats:
+ department_id = _read_department_id(request)
await _require_scope_access(request, session, vertical, section)
channel, section_slug = await _get_channel_in_scope(
- session, channel_id, vertical, section
+ session, channel_id, vertical, section, department_id
)
counts = await session.execute(
select(func.count(Message.id), func.max(Message.date)).where(
@@ -689,6 +682,7 @@ async def list_messages(
Every query joins through channels (and optionally sections) so a
cross-vertical / cross-section read is impossible at the API boundary.
"""
+ department_id = _read_department_id(request)
await _require_scope_access(request, session, vertical, section)
# Step 1: pick the group keys for this page, ordered by latest activity.
@@ -698,16 +692,17 @@ async def list_messages(
g_stmt = (
select(group_key, func.max(Message.date).label("group_date"))
.join(Channel, Channel.id == Message.channel_id)
+ .join(Section, Section.id == Channel.section_id)
.where(Channel.vertical == vertical)
.group_by(group_key)
.order_by(func.max(Message.date).desc())
.limit(limit)
.offset(offset)
)
+ if department_id is not None:
+ g_stmt = g_stmt.where(Section.department_id == department_id)
if section is not None:
- g_stmt = g_stmt.join(Section, Section.id == Channel.section_id).where(
- Section.slug == section
- )
+ g_stmt = g_stmt.where(Section.slug == section)
if channel_id is not None:
g_stmt = g_stmt.where(Message.channel_id == channel_id)
if q:
@@ -748,13 +743,14 @@ async def list_messages(
rows_stmt = (
select(Message)
.join(Channel, Channel.id == Message.channel_id)
+ .join(Section, Section.id == Channel.section_id)
.where(group_key.in_(page_keys), Channel.vertical == vertical)
.order_by(Message.tg_message_id.asc())
)
+ if department_id is not None:
+ rows_stmt = rows_stmt.where(Section.department_id == department_id)
if section is not None:
- rows_stmt = rows_stmt.join(Section, Section.id == Channel.section_id).where(
- Section.slug == section
- )
+ rows_stmt = rows_stmt.where(Section.slug == section)
if channel_id is not None:
rows_stmt = rows_stmt.where(Message.channel_id == channel_id)
rows = list((await session.execute(rows_stmt)).scalars().all())
@@ -832,12 +828,15 @@ async def get_message(
section: str | None = Query(None, description="scope: 404 if mismatched"),
session: AsyncSession = Depends(get_session),
) -> Message:
+ department_id = _read_department_id(request)
await _require_scope_access(request, session, vertical, section)
msg = await session.get(Message, message_id)
if msg is None:
raise HTTPException(status_code=404)
if vertical is not None or section is not None:
- await _get_channel_in_scope(session, msg.channel_id, vertical, section)
+ await _get_channel_in_scope(session, msg.channel_id, vertical, section, department_id)
+ elif department_id is not None:
+ await _get_channel_in_scope(session, msg.channel_id, None, None, department_id)
return msg
@@ -851,11 +850,15 @@ async def global_stats(
section: str | None = Query(None, description="optional section slug"),
session: AsyncSession = Depends(get_session),
) -> GlobalStats:
+ department_id = _read_department_id(request)
await _require_scope_access(request, session, vertical, section)
base_channel_where = [Channel.vertical == vertical]
- section_join_needed = section is not None
+ section_join_needed = section is not None or department_id is not None
if section_join_needed:
- base_channel_where.append(Section.slug == section)
+ if section is not None:
+ base_channel_where.append(Section.slug == section)
+ if department_id is not None:
+ base_channel_where.append(Section.department_id == department_id)
def _channel_query(*extra):
stmt = select(func.count(Channel.id)).where(*base_channel_where, *extra)
@@ -918,7 +921,7 @@ async def global_stats(
@router.get("/llm/status")
async def llm_status() -> dict[str, Any]:
- """Whether the local LLM (Ollama) is reachable and the configured model is loaded."""
+ """Whether the OpenAI-compatible LLM endpoint is reachable."""
ready = await llm_client.is_ready()
return {
"enabled": settings.llm_enabled,
@@ -936,8 +939,9 @@ async def llm_queue(
session: AsyncSession = Depends(get_session),
) -> dict[str, int]:
"""Pending classifications restricted to the vertical (+ section if given)."""
+ department_id = _read_department_id(request)
await _require_scope_access(request, session, vertical, section)
- return {"pending": await pending_llm_count(vertical, section)}
+ return {"pending": await pending_llm_count(vertical, section, department_id)}
@router.get("/llm/prompt")
@@ -954,13 +958,15 @@ async def llm_prompt_get(
`source` is one of `section` / `vertical` / `default` and tells the UI
whether the override is at the requested level or just inherited.
"""
+ department_id = _read_department_id(request)
await _require_scope_access(request, session, vertical, section)
default = llm_client.default_prompt(vertical)
- text, source = await prompt_store.get(vertical, section, default)
+ text, source = await prompt_store.get(vertical, department_id, section, default)
# Also return whether THIS exact level has its own override (for UI).
- overridden_here = await prompt_store.is_overridden(vertical, section)
+ overridden_here = await prompt_store.is_overridden(vertical, department_id, section)
return {
"vertical": vertical,
+ "department_id": department_id,
"section": section,
"prompt": text,
"default": default,
@@ -969,14 +975,16 @@ async def llm_prompt_get(
}
-@router.put("/llm/prompt", dependencies=[Depends(require_admin)])
+@router.put("/llm/prompt", dependencies=[Depends(require_department_manager)])
async def llm_prompt_put(
payload: dict,
+ request: Request,
vertical: Vertical = Query(..., description="required: real_estate | hr"),
section: str | None = Query(
None, description="optional section slug — save at section level"
),
) -> dict[str, Any]:
+ department_id = _manage_department_id(request)
text = payload.get("prompt")
if not isinstance(text, str) or not text.strip():
raise HTTPException(status_code=400, detail="prompt must be a non-empty string")
@@ -985,20 +993,28 @@ async def llm_prompt_put(
status_code=400, detail="prompt is too long (max 30000 chars)"
)
try:
- await prompt_store.set_prompt(vertical, section, text)
+ await prompt_store.set_prompt(vertical, department_id, section, text)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
- return {"saved": True, "vertical": vertical, "section": section, "length": len(text)}
+ return {
+ "saved": True,
+ "vertical": vertical,
+ "department_id": department_id,
+ "section": section,
+ "length": len(text),
+ }
-@router.delete("/llm/prompt", status_code=204, dependencies=[Depends(require_admin)])
+@router.delete("/llm/prompt", status_code=204, dependencies=[Depends(require_department_manager)])
async def llm_prompt_reset(
+ request: Request,
vertical: Vertical = Query(..., description="required: real_estate | hr"),
section: str | None = Query(
None, description="optional section slug — reset that level"
),
) -> None:
- await prompt_store.reset(vertical, section)
+ department_id = _manage_department_id(request)
+ await prompt_store.reset(vertical, department_id, section)
# --- Settings & batch poll ---------------------------------------------
@@ -1027,21 +1043,24 @@ async def _poll_all_in_background(channel_ids: list[int]) -> None:
continue
-@router.post("/poll", dependencies=[Depends(require_admin)])
+@router.post("/poll", dependencies=[Depends(require_department_manager)])
async def trigger_poll_all(
background: BackgroundTasks,
+ request: Request,
vertical: Vertical = Query(..., description="required: real_estate | hr"),
section: str | None = Query(None, description="optional section slug"),
session: AsyncSession = Depends(get_session),
) -> dict[str, int]:
"""Queue a poll of every active channel in this vertical (+ section if given)."""
+ department_id = _manage_department_id(request)
stmt = select(Channel.id).where(
Channel.is_active.is_(True), Channel.vertical == vertical
)
+ stmt = stmt.join(Section, Section.id == Channel.section_id)
+ if department_id is not None:
+ stmt = stmt.where(Section.department_id == department_id)
if section is not None:
- stmt = stmt.join(Section, Section.id == Channel.section_id).where(
- Section.slug == section
- )
+ stmt = stmt.where(Section.slug == section)
result = await session.execute(stmt)
ids = [row[0] for row in result.all()]
background.add_task(_poll_all_in_background, ids)
diff --git a/src/parser_bot/api/schemas.py b/src/parser_bot/api/schemas.py
index efe471e..00f21b6 100644
--- a/src/parser_bot/api/schemas.py
+++ b/src/parser_bot/api/schemas.py
@@ -16,7 +16,6 @@ class SectionCreate(BaseModel):
title: str = Field(..., min_length=1, max_length=255)
emoji: str | None = Field(None, max_length=8)
description: str | None = None
- access_code: str = Field(..., min_length=3, max_length=255)
@field_validator("slug")
@classmethod
@@ -32,7 +31,6 @@ class SectionUpdate(BaseModel):
title: str | None = Field(None, min_length=1, max_length=255)
emoji: str | None = Field(None, max_length=8)
description: str | None = None
- access_code: str | None = Field(None, min_length=3, max_length=255)
class SectionOut(BaseModel):
@@ -40,11 +38,11 @@ class SectionOut(BaseModel):
id: int
vertical: Vertical
+ department_id: str | None = None
slug: str
title: str
emoji: str | None
description: str | None
- access_code: str | None = None
created_at: datetime
@@ -223,9 +221,3 @@ class AuthCodeResult(BaseModel):
class AdminLogin(BaseModel):
password: str = Field(..., min_length=1)
-
-
-class SectionLogin(BaseModel):
- vertical: Vertical
- section: str = Field(..., min_length=1, max_length=64)
- code: str = Field(..., min_length=1, max_length=255)
diff --git a/src/parser_bot/auth.py b/src/parser_bot/auth.py
index 8783aca..dce24dd 100644
--- a/src/parser_bot/auth.py
+++ b/src/parser_bot/auth.py
@@ -3,7 +3,7 @@ TG_SESSION_STRING into your .env / k8s Secret, then deploy without ever
touching auth again.
Usage:
- docker compose run --rm -it app python -m parser_bot.auth
+ python -m parser_bot.auth
Telegram requires interactive code entry only for the very first login;
the resulting StringSession can be reused on any host until you log out
@@ -22,7 +22,7 @@ async def main() -> int:
if not sys.stdin.isatty():
print(
"ERROR: not a TTY. Re-run with: "
- "docker compose run --rm -it app python -m parser_bot.auth",
+ "python -m parser_bot.auth",
file=sys.stderr,
)
return 2
diff --git a/src/parser_bot/config.py b/src/parser_bot/config.py
index 3a54710..d02b048 100644
--- a/src/parser_bot/config.py
+++ b/src/parser_bot/config.py
@@ -29,29 +29,20 @@ class Settings(BaseSettings):
media_dir: str = Field("/data/media", alias="MEDIA_DIR")
media_max_bytes: int = Field(20 * 1024 * 1024, alias="MEDIA_MAX_BYTES")
- # Local LLM via Ollama for lead classification & extraction
+ # OpenAI-compatible LLM endpoint, shared with telephony/vLLM in production.
llm_enabled: bool = Field(True, alias="LLM_ENABLED")
- llm_base_url: str = Field("http://ollama:11434", alias="LLM_BASE_URL")
- llm_model: str = Field("qwen2.5:7b-instruct-q4_K_M", alias="LLM_MODEL")
+ llm_base_url: str = Field("http://10.2.3.5:8002", alias="LLM_BASE_URL")
+ llm_api_key: str = Field("", alias="LLM_API_KEY")
+ llm_model: str = Field("qwen2.5-14b", alias="LLM_MODEL")
llm_timeout_seconds: int = Field(120, alias="LLM_TIMEOUT_SECONDS")
+ llm_max_tokens: int = Field(600, alias="LLM_MAX_TOKENS")
llm_min_text_length: int = Field(20, alias="LLM_MIN_TEXT_LENGTH")
llm_classify_interval_seconds: int = Field(20, alias="LLM_CLASSIFY_INTERVAL_SECONDS")
llm_classify_batch_size: int = Field(5, alias="LLM_CLASSIFY_BATCH_SIZE")
- # Admin allowlist for /auth.html, /docs, /openapi.json, /redoc and the
- # /auth/* API endpoints. Comma-separated IPv4/IPv6. Empty (default) means
- # no restriction — convenient for local dev. Set explicitly in prod.
- admin_allowed_ips: str = Field("", alias="ADMIN_ALLOWED_IPS")
- # Optional second factor for admin-only UI/API operations. Empty keeps the
- # previous IP-only behavior for local/dev deployments.
+ # Optional local fallback for admin-only UI/API operations. In production
+ # the portal sets X-User-Is-Admin=1, so no local password is required.
admin_password: str = Field("", alias="ADMIN_PASSWORD")
- # When true, honor X-Forwarded-For / X-Real-IP set by a reverse proxy
- # in front of uvicorn (Docker port-forward, nginx, traefik, etc).
- trust_proxy_headers: bool = Field(True, alias="TRUST_PROXY_HEADERS")
-
- @property
- def admin_ip_set(self) -> set[str]:
- return {s.strip() for s in self.admin_allowed_ips.split(",") if s.strip()}
@property
def database_url(self) -> str:
diff --git a/src/parser_bot/db/models.py b/src/parser_bot/db/models.py
index fc75505..9f8e20c 100644
--- a/src/parser_bot/db/models.py
+++ b/src/parser_bot/db/models.py
@@ -29,17 +29,22 @@ class Section(Base):
__tablename__ = "sections"
__table_args__ = (
- UniqueConstraint("vertical", "slug", name="uq_section_vertical_slug"),
- Index("ix_sections_vertical", "vertical"),
+ UniqueConstraint(
+ "vertical",
+ "department_id",
+ "slug",
+ name="uq_section_vertical_department_slug",
+ ),
+ Index("ix_sections_vertical_department", "vertical", "department_id"),
)
id: Mapped[int] = mapped_column(primary_key=True)
vertical: Mapped[str] = mapped_column(String(32))
+ department_id: Mapped[str | None] = mapped_column(String(64), nullable=True)
slug: Mapped[str] = mapped_column(String(64))
title: Mapped[str] = mapped_column(String(255))
emoji: Mapped[str | None] = mapped_column(String(8), nullable=True)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
- access_code: Mapped[str | None] = mapped_column(String(255), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
@@ -112,7 +117,7 @@ class AppSetting(Base):
__tablename__ = "app_settings"
- key: Mapped[str] = mapped_column(String(128), primary_key=True)
+ key: Mapped[str] = mapped_column(String(255), primary_key=True)
value: Mapped[dict | str | int | bool | None] = mapped_column(JSONB, nullable=False)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
diff --git a/src/parser_bot/extractors.py b/src/parser_bot/extractors.py
index 9d12716..769ad69 100644
--- a/src/parser_bot/extractors.py
+++ b/src/parser_bot/extractors.py
@@ -315,20 +315,21 @@ def analyze(text: str | None) -> dict[str, Any]:
async def analyze_with_llm(
text: str | None,
vertical: str = "real_estate",
+ department_id: str | None = None,
section_slug: str | None = None,
) -> dict[str, Any]:
"""Regex extraction + local LLM lead classification, routed by vertical.
- `section_slug` lets the classifier pick a section-specific system prompt
- (e.g. Dubai-focused for `real_estate:dubai`) with fallback to the
- vertical-default prompt. The LLM verdict goes under `lead` for RE and
- under `hr_lead` for HR. Falls back to regex-only if Ollama is unavailable.
+ `department_id` + `section_slug` let the classifier pick a department- and
+ section-specific system prompt, falling back to the department's vertical
+ prompt and then the built-in default. The LLM verdict goes under `lead` for
+ RE and under `hr_lead` for HR. Falls back to regex-only if the LLM is unavailable.
"""
base = analyze(text)
# Lazy import to avoid hard dep on httpx in environments where LLM is off.
from parser_bot.llm import classify
- verdict = await classify(text, vertical, section_slug) # type: ignore[arg-type]
+ verdict = await classify(text, vertical, department_id, section_slug) # type: ignore[arg-type]
if verdict is not None:
base["hr_lead" if vertical == "hr" else "lead"] = verdict
return base
diff --git a/src/parser_bot/llm.py b/src/parser_bot/llm.py
index fd12d15..ed28c24 100644
--- a/src/parser_bot/llm.py
+++ b/src/parser_bot/llm.py
@@ -1,4 +1,4 @@
-"""Local LLM (Ollama) client for lead classification & extraction.
+"""OpenAI-compatible LLM client for lead classification & extraction.
Two verticals share one model and one process:
- real_estate: high recall on listings (sale/rent/purchase),
@@ -7,9 +7,6 @@ Two verticals share one model and one process:
The system prompt and JSON schema differ per vertical; the rest of the
plumbing (timeouts, single-lock concurrency, JSON-mode parsing) is shared.
On any error returns `None` and the caller falls back to regex-only extraction.
-
-The model runs on CPU via Ollama (Qwen2.5 7B Q4_K_M). Each call ~3–6s on
-i5-12400. Concurrency is 1 (Ollama already saturates CPU per call).
"""
from __future__ import annotations
@@ -271,17 +268,38 @@ def _as_float(v: Any) -> float | None:
async def is_ready() -> bool:
- """Check that Ollama is up and the configured model is pulled."""
+ """Check that the OpenAI-compatible model endpoint is reachable."""
try:
async with httpx.AsyncClient(timeout=5) as client:
- r = await client.get(f"{settings.llm_base_url}/api/tags")
+ headers = _headers()
+ r = await client.get(f"{_base_url()}/v1/models", headers=headers)
r.raise_for_status()
- tags = {m.get("name") for m in r.json().get("models", [])}
- return any(t.startswith(settings.llm_model.split(":")[0]) for t in tags)
+ models = r.json().get("data", [])
+ if not models:
+ return True
+ model_ids = {
+ str(m.get("id") or m.get("name") or "")
+ for m in models
+ if isinstance(m, dict)
+ }
+ return settings.llm_model in model_ids or any(
+ mid.startswith(settings.llm_model) for mid in model_ids
+ )
except Exception:
return False
+def _base_url() -> str:
+ return settings.llm_base_url.rstrip("/")
+
+
+def _headers() -> dict[str, str]:
+ headers = {"Content-Type": "application/json"}
+ if settings.llm_api_key:
+ headers["Authorization"] = f"Bearer {settings.llm_api_key}"
+ return headers
+
+
def default_prompt(vertical: Vertical) -> str:
return DEFAULT_HR_SYSTEM_PROMPT if vertical == "hr" else DEFAULT_RE_SYSTEM_PROMPT
@@ -289,6 +307,7 @@ def default_prompt(vertical: Vertical) -> str:
async def classify(
text: str | None,
vertical: Vertical = "real_estate",
+ department_id: str | None = None,
section_slug: str | None = None,
) -> dict | None:
"""Classify a message text under the given vertical/section.
@@ -306,25 +325,28 @@ async def classify(
# Lazy import to avoid a circular: prompt_store -> db.session -> config.
from parser_bot import prompt_store
- system = await prompt_store.resolve(vertical, section_slug, default_prompt(vertical))
+ system = await prompt_store.resolve(
+ vertical, department_id, section_slug, default_prompt(vertical)
+ )
payload = {
"model": settings.llm_model,
- "prompt": _build_user_prompt(text),
- "system": system,
- "format": "json",
- "stream": False,
- "options": {"temperature": 0.1, "num_ctx": 4096, "num_predict": 600},
+ "messages": [
+ {"role": "system", "content": system},
+ {"role": "user", "content": _build_user_prompt(text)},
+ ],
+ "temperature": 0.1,
+ "max_tokens": settings.llm_max_tokens,
+ "response_format": {"type": "json_object"},
}
async with _lock:
try:
async with httpx.AsyncClient(timeout=settings.llm_timeout_seconds) as client:
r = await client.post(
- f"{settings.llm_base_url}/api/generate", json=payload
+ f"{_base_url()}/v1/chat/completions",
+ headers=_headers(),
+ json=payload,
)
if r.status_code != 200:
- # Surface the actual server message — most useful one is
- # `model '...' not found`, which otherwise would just look
- # like a generic HTTP error and leave the worker to spin.
log.warning(
"llm_request_failed",
status=r.status_code,
@@ -341,7 +363,9 @@ async def classify(
)
return None
- raw = (data.get("response") or "").strip()
+ choices = data.get("choices") or []
+ message = choices[0].get("message") if choices and isinstance(choices[0], dict) else None
+ raw = ((message or {}).get("content") or "").strip()
if not raw:
return None
try:
diff --git a/src/parser_bot/prompt_store.py b/src/parser_bot/prompt_store.py
index 2da3b3f..919e6bc 100644
--- a/src/parser_bot/prompt_store.py
+++ b/src/parser_bot/prompt_store.py
@@ -1,8 +1,8 @@
"""Runtime-editable LLM system prompts, persisted in app_settings.
Three resolution levels with fallback (more specific → less specific):
- 1. `llm_system_prompt:<vertical>:<section>` — section override
- 2. `llm_system_prompt:<vertical>` — vertical override
+ 1. `llm_system_prompt:<department>:<vertical>:<section>` — section override
+ 2. `llm_system_prompt:<department>:<vertical>` — department vertical override
3. built-in DEFAULT_RE_SYSTEM_PROMPT / DEFAULT_HR_SYSTEM_PROMPT
The prompt is read on every classification call but cached for a short
@@ -27,10 +27,15 @@ _CACHE_TTL_S = 5.0
_cache: dict[str, tuple[float, str | None]] = {}
-def _key(vertical: Vertical, section_slug: str | None = None) -> str:
+def _key(
+ vertical: Vertical,
+ department_id: str | None,
+ section_slug: str | None = None,
+) -> str:
+ dept = department_id or "global"
if section_slug:
- return f"{_KEY_PREFIX}{vertical}:{section_slug}"
- return f"{_KEY_PREFIX}{vertical}"
+ return f"{_KEY_PREFIX}{dept}:{vertical}:{section_slug}"
+ return f"{_KEY_PREFIX}{dept}:{vertical}"
async def _load(key: str) -> str | None:
@@ -52,7 +57,7 @@ async def _load(key: str) -> str | None:
async def resolve(
- vertical: Vertical, section_slug: str | None, default: str
+ vertical: Vertical, department_id: str | None, section_slug: str | None, default: str
) -> str:
"""Pick the most specific prompt available, falling back to `default`.
@@ -60,39 +65,39 @@ async def resolve(
the classifier uses for every message.
"""
if section_slug:
- text = await _load(_key(vertical, section_slug))
+ text = await _load(_key(vertical, department_id, section_slug))
if text is not None:
return text
- text = await _load(_key(vertical))
+ text = await _load(_key(vertical, department_id))
if text is not None:
return text
return default
async def get(
- vertical: Vertical, section_slug: str | None, default: str
+ vertical: Vertical, department_id: str | None, section_slug: str | None, default: str
) -> tuple[str, str]:
"""For the settings UI: return (text, source) where source is one of
'section' | 'vertical' | 'default'. Lets the editor show which override
is currently active without a second round-trip.
"""
if section_slug:
- text = await _load(_key(vertical, section_slug))
+ text = await _load(_key(vertical, department_id, section_slug))
if text is not None:
return text, "section"
- text = await _load(_key(vertical))
+ text = await _load(_key(vertical, department_id))
if text is not None:
return text, "vertical"
return default, "default"
async def set_prompt(
- vertical: Vertical, section_slug: str | None, text: str
+ vertical: Vertical, department_id: str | None, section_slug: str | None, text: str
) -> None:
"""Save a new prompt at the given level (section or vertical)."""
if not isinstance(text, str) or not text.strip():
raise ValueError("prompt must be a non-empty string")
- key = _key(vertical, section_slug)
+ key = _key(vertical, department_id, section_slug)
async with session_scope() as session:
stmt = (
pg_insert(AppSetting)
@@ -105,9 +110,11 @@ async def set_prompt(
invalidate(key)
-async def reset(vertical: Vertical, section_slug: str | None) -> None:
+async def reset(
+ vertical: Vertical, department_id: str | None, section_slug: str | None
+) -> None:
"""Drop the override at the given level."""
- key = _key(vertical, section_slug)
+ key = _key(vertical, department_id, section_slug)
async with session_scope() as session:
await session.execute(
AppSetting.__table__.delete().where(AppSetting.key == key)
@@ -123,8 +130,8 @@ def invalidate(key: str | None = None) -> None:
async def is_overridden(
- vertical: Vertical, section_slug: str | None = None
+ vertical: Vertical, department_id: str | None, section_slug: str | None = None
) -> bool:
"""True iff a custom prompt is stored at this exact level."""
- text = await _load(_key(vertical, section_slug))
+ text = await _load(_key(vertical, department_id, section_slug))
return text is not None
diff --git a/src/parser_bot/scheduler/poller.py b/src/parser_bot/scheduler/poller.py
index 13bdc06..8b7395d 100644
--- a/src/parser_bot/scheduler/poller.py
+++ b/src/parser_bot/scheduler/poller.py
@@ -191,14 +191,14 @@ async def reanalyze_channel(channel_id: int, batch_size: int = 5) -> dict[str, i
"""
async with session_scope() as session:
result = await session.execute(
- select(Channel, Section.slug)
+ select(Channel, Section.slug, Section.department_id)
.join(Section, Section.id == Channel.section_id)
.where(Channel.id == channel_id)
)
row = result.one_or_none()
if row is None:
return {"updated": 0, "pending": 0}
- channel, section_slug = row
+ channel, section_slug, department_id = row
vertical = channel.vertical
needs_work = _needs_work_clause(vertical)
@@ -230,7 +230,7 @@ async def reanalyze_channel(channel_id: int, batch_size: int = 5) -> dict[str, i
updated = 0
for db_id, text in rows:
extracted = (
- await analyze_with_llm(text, vertical, section_slug)
+ await analyze_with_llm(text, vertical, department_id, section_slug)
if settings.llm_enabled
else analyze(text)
)
@@ -252,7 +252,9 @@ async def reanalyze_channel(channel_id: int, batch_size: int = 5) -> dict[str, i
async def pending_llm_count(
- vertical: str | None = None, section_slug: str | None = None
+ vertical: str | None = None,
+ section_slug: str | None = None,
+ department_id: str | None = None,
) -> int:
"""How many text messages still need LLM classification.
@@ -271,10 +273,14 @@ async def pending_llm_count(
stmt = stmt.join(Channel, Channel.id == Message.channel_id).where(
Channel.vertical == vertical
)
- if section_slug is not None:
- stmt = stmt.join(Section, Section.id == Channel.section_id).where(
- Section.slug == section_slug
- )
+ if section_slug is not None or department_id is not None:
+ if vertical is None:
+ stmt = stmt.join(Channel, Channel.id == Message.channel_id)
+ stmt = stmt.join(Section, Section.id == Channel.section_id)
+ if section_slug is not None:
+ stmt = stmt.where(Section.slug == section_slug)
+ if department_id is not None:
+ stmt = stmt.where(Section.department_id == department_id)
return (await session.execute(stmt)).scalar_one()
@@ -292,7 +298,13 @@ async def classify_pending(batch_size: int = 5) -> int:
async with session_scope() as session:
rows = (
await session.execute(
- select(Message.id, Message.text, Channel.vertical, Section.slug)
+ select(
+ Message.id,
+ Message.text,
+ Channel.vertical,
+ Section.slug,
+ Section.department_id,
+ )
.join(Channel, Channel.id == Message.channel_id)
.join(Section, Section.id == Channel.section_id)
.where(Message.text.is_not(None), needs_work)
@@ -304,7 +316,7 @@ async def classify_pending(batch_size: int = 5) -> int:
return 0
updated = 0
- for db_id, text, vertical, section_slug in rows:
+ for db_id, text, vertical, section_slug, department_id in rows:
# If extracted already has THIS vertical's verdict, skip — needs_work
# uses an OR over both keys and would otherwise re-run RE channels
# that already have a lead just because hr_lead is null.
@@ -314,7 +326,7 @@ async def classify_pending(batch_size: int = 5) -> int:
key = _verdict_key(vertical)
if existing and existing.get(key) is not None:
continue
- extracted = await analyze_with_llm(text, vertical, section_slug)
+ extracted = await analyze_with_llm(text, vertical, department_id, section_slug)
msg = await session.get(Message, db_id)
if msg is None:
continue
diff --git a/src/parser_bot/web/static/auth.html b/src/parser_bot/web/static/auth.html
index 1fb0a33..acbedb2 100644
--- a/src/parser_bot/web/static/auth.html
+++ b/src/parser_bot/web/static/auth.html
@@ -73,7 +73,7 @@
Для деплоя в k8s удобнее заранее получить опаковую строку сессии и положить её
в Secret — тогда поды поднимаются без интерактива:
- docker compose run --rm -it app python -m parser_bot.auth
+ python -m parser_bot.auth
Скрипт напечатает TG_SESSION_STRING=... — вставить
в .env или Secret и забыть про авторизацию.
diff --git a/src/parser_bot/web/static/hr/index.html b/src/parser_bot/web/static/hr/index.html
index 794c0ca..9c8ea87 100644
--- a/src/parser_bot/web/static/hr/index.html
+++ b/src/parser_bot/web/static/hr/index.html
@@ -48,11 +48,6 @@
Иконка
-
-
-
-