diff --git a/Dockerfile b/Dockerfile
index 0077163..207ca08 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,3 +1,15 @@
+FROM golang:1.25-alpine AS go-builder
+
+WORKDIR /src
+
+COPY go.mod go.sum ./
+COPY cmd ./cmd
+COPY internal ./internal
+
+RUN CGO_ENABLED=0 GOOS=linux go build -o /out/monitoring-pf-server ./cmd/server \
+ && CGO_ENABLED=0 GOOS=linux go build -o /out/monitoring-pf-bot ./cmd/bot \
+ && CGO_ENABLED=0 GOOS=linux go build -o /out/monitoring-pf-scheduler ./cmd/scheduler
+
FROM python:3.12-slim
ENV PYTHONUNBUFFERED=1 \
@@ -15,11 +27,14 @@ COPY requirements.txt ./
RUN pip install -r requirements.txt
COPY app ./app
-COPY run_web.py ./
+
+COPY --from=go-builder /out/monitoring-pf-server /usr/local/bin/monitoring-pf-server
+COPY --from=go-builder /out/monitoring-pf-bot /usr/local/bin/monitoring-pf-bot
+COPY --from=go-builder /out/monitoring-pf-scheduler /usr/local/bin/monitoring-pf-scheduler
RUN mkdir -p /app/data
EXPOSE 8000
ENTRYPOINT ["/usr/bin/tini", "--"]
-CMD ["python", "run_web.py"]
+CMD ["/usr/local/bin/monitoring-pf-server"]
diff --git a/README.md b/README.md
index ed225c0..44ed3a6 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,8 @@
Сервис мониторинга объявлений PropertyFinder по DLD Permit Number для портала.
Он хранит проекты, конкурирующие объявления и историю цен. Пользовательский UI
живёт в Portal: `portal/frontend/src/app/features/monitoring-pf`; этот сервис
-отдаёт только JSON API, Telegram bot и scheduler.
+отдаёт JSON API, Telegram bot и scheduler. Инфраструктурные процессы написаны
+на Go; Python оставлен для скраперов PropertyFinder/Bayut и внутреннего worker.
## Назначение
@@ -29,13 +30,16 @@ PIN-login больше нет.
## Структура
```text
+cmd/
+├── server/ Go JSON API для Portal
+├── bot/ Go Telegram-бот
+├── scheduler/ Go фоновый сканер
+internal/pf/ общий Go-код БД/API/Telegram
app/
-├── config.py настройки окружения
-├── db.py SQLAlchemy engine/session
+├── worker.py внутренний Python JSON worker для Go
+├── config.py настройки окружения для worker
+├── db.py SQLAlchemy engine/session для worker
├── models.py Employee, Project, CompetitorListing, PriceHistory
-├── web.py FastAPI JSON API для Portal
-├── bot.py Telegram-бот
-├── scheduler.py фоновый сканер
├── scrapers/ PropertyFinder/Bayut парсеры
├── services/ бизнес-логика и уведомления
k8s/ манифесты для портала
diff --git a/app/bot.py b/app/bot.py
deleted file mode 100644
index 0708e43..0000000
--- a/app/bot.py
+++ /dev/null
@@ -1,228 +0,0 @@
-"""Telegram bot — registers employees by chat_id and lets them trigger checks.
-
-Run as a separate process: `python -m app.bot`.
-
-Bot commands (set via @BotFather → /setcommands):
- start - Подключить себя как сотрудника
- list - Список своих проектов
- check - Проверить все мои проекты сейчас
- whoami - Показать свой chat_id
-"""
-
-from __future__ import annotations
-
-import asyncio
-import logging
-
-from sqlalchemy.orm import joinedload
-from telegram import Update
-from telegram.ext import (
- Application,
- CommandHandler,
- ContextTypes,
-)
-
-from app.config import settings
-from app.db import SessionLocal, init_db
-from app.models import Employee, Project
-from app.services.monitor import run_check_for_project
-
-logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
-logger = logging.getLogger(__name__)
-
-
-def _portal_user_code(context: ContextTypes.DEFAULT_TYPE) -> str | None:
- if not context.args:
- return None
- code = context.args[0].strip()
- return code or None
-
-
-async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- if not update.effective_user or not update.effective_chat:
- return
- user = update.effective_user
- chat_id = str(update.effective_chat.id)
- username = user.username
- portal_user_id = _portal_user_code(context)
-
- db = SessionLocal()
- try:
- existing = (
- db.query(Employee).filter(Employee.tg_chat_id == chat_id).first()
- )
- if existing:
- if portal_user_id and existing.portal_user_id and existing.portal_user_id != portal_user_id:
- await update.message.reply_text(
- "Этот Telegram уже подключен к другому пользователю Portal.",
- )
- return
- if portal_user_id and not existing.portal_user_id:
- clash = db.query(Employee).filter(Employee.portal_user_id == portal_user_id).first()
- if clash and clash.id != existing.id:
- await update.message.reply_text(
- "Этот пользователь Portal уже подключен к другому Telegram.",
- )
- return
- existing.portal_user_id = portal_user_id
- existing.tg_username = username
- db.commit()
- await update.message.reply_text(
- f"✅ Вы уже подключены как {existing.name}.\n"
- f"chat_id: {chat_id}",
- parse_mode="HTML",
- )
- return
-
- if portal_user_id:
- employee = db.query(Employee).filter(Employee.portal_user_id == portal_user_id).first()
- name = (user.full_name or username or f"user_{chat_id}").strip()
- if employee:
- if employee.tg_chat_id and employee.tg_chat_id != chat_id:
- await update.message.reply_text(
- "Этот пользователь Portal уже подключен к другому Telegram.",
- )
- return
- employee.name = employee.name or name
- employee.tg_chat_id = chat_id
- employee.tg_username = username
- else:
- employee = Employee(
- name=name,
- portal_user_id=portal_user_id,
- tg_chat_id=chat_id,
- tg_username=username,
- )
- db.add(employee)
- db.commit()
- await update.message.reply_text(
- f"✅ Привет, {name}! Telegram подключен к вашему аккаунту Portal.\n"
- f"Теперь можно добавлять объекты мониторинга в Portal.",
- parse_mode="HTML",
- )
- return
-
- # Try to find by username (admin pre-created employee w/o chat_id)
- if username:
- placeholder = (
- db.query(Employee)
- .filter(Employee.tg_username == username, Employee.tg_chat_id.is_(None))
- .first()
- )
- if placeholder:
- placeholder.tg_chat_id = chat_id
- db.commit()
- await update.message.reply_text(
- f"✅ Привет, {placeholder.name}! Вы успешно подключены.\n"
- f"Уведомления будут приходить сюда.",
- parse_mode="HTML",
- )
- return
-
- await update.message.reply_text(
- "Откройте Portal → Мониторинг PF и нажмите подключение Telegram.\n"
- "Бот должен получить команду вида:\n"
- "/start ваш_код_из_Portal",
- parse_mode="HTML",
- )
- finally:
- db.close()
-
-
-async def cmd_whoami(update: Update, _: ContextTypes.DEFAULT_TYPE) -> None:
- if not update.effective_chat:
- return
- chat_id = str(update.effective_chat.id)
- db = SessionLocal()
- try:
- e = db.query(Employee).filter(Employee.tg_chat_id == chat_id).first()
- if e:
- await update.message.reply_text(
- f"Вы: {e.name}\nchat_id: {chat_id}",
- parse_mode="HTML",
- )
- else:
- await update.message.reply_text(
- f"Вы пока не подключены. Отправьте /start.\nchat_id: {chat_id}",
- parse_mode="HTML",
- )
- finally:
- db.close()
-
-
-async def cmd_list(update: Update, _: ContextTypes.DEFAULT_TYPE) -> None:
- if not update.effective_chat:
- return
- chat_id = str(update.effective_chat.id)
- db = SessionLocal()
- try:
- e = (
- db.query(Employee)
- .options(joinedload(Employee.projects))
- .filter(Employee.tg_chat_id == chat_id)
- .first()
- )
- if not e:
- await update.message.reply_text("Сначала /start.")
- return
- if not e.projects:
- await update.message.reply_text("У вас пока нет проектов.")
- return
- lines = [f"Ваши проекты ({len(e.projects)}):"]
- for p in e.projects:
- lines.append(
- f"• #{p.id} {p.title} — {p.dld_permit} "
- f"({p.deal_type.value})"
- )
- await update.message.reply_text("\n".join(lines), parse_mode="HTML")
- finally:
- db.close()
-
-
-async def cmd_check(update: Update, _: ContextTypes.DEFAULT_TYPE) -> None:
- if not update.effective_chat:
- return
- chat_id = str(update.effective_chat.id)
- db = SessionLocal()
- try:
- e = (
- db.query(Employee)
- .options(joinedload(Employee.projects))
- .filter(Employee.tg_chat_id == chat_id)
- .first()
- )
- if not e:
- await update.message.reply_text("Сначала /start.")
- return
- if not e.projects:
- await update.message.reply_text("У вас нет проектов.")
- return
- ids = [p.id for p in e.projects]
- finally:
- db.close()
-
- await update.message.reply_text(f"⏳ Запускаю проверку {len(ids)} проектов…")
- total_changes = 0
- for pid in ids:
- try:
- total_changes += await asyncio.to_thread(run_check_for_project, pid)
- except Exception as ex:
- logger.exception("check failed for %s: %s", pid, ex)
- await update.message.reply_text(f"✅ Готово. Изменений: {total_changes}")
-
-
-def main() -> None:
- if not settings.tg_bot_token:
- raise SystemExit("TG_BOT_TOKEN не задан в k8s/secrets.yaml")
- init_db()
- app = Application.builder().token(settings.tg_bot_token).build()
- app.add_handler(CommandHandler("start", cmd_start))
- app.add_handler(CommandHandler("whoami", cmd_whoami))
- app.add_handler(CommandHandler("list", cmd_list))
- app.add_handler(CommandHandler("check", cmd_check))
- logger.info("Bot polling…")
- app.run_polling(allowed_updates=Update.ALL_TYPES)
-
-
-if __name__ == "__main__":
- main()
diff --git a/app/config.py b/app/config.py
index ef8c48f..6cb900e 100644
--- a/app/config.py
+++ b/app/config.py
@@ -28,9 +28,6 @@ class Settings(BaseSettings):
model_config = SettingsConfigDict(extra="ignore")
tg_bot_token: str = ""
- web_host: str = "127.0.0.1"
- web_port: int = 8000
- public_base_path: str = ""
scrape_interval_hours: int = 4
database_url: str = f"sqlite:///{DATA_DIR / 'monitor.db'}"
admin_chat_id: str = ""
diff --git a/app/scheduler.py b/app/scheduler.py
deleted file mode 100644
index 79a20cb..0000000
--- a/app/scheduler.py
+++ /dev/null
@@ -1,57 +0,0 @@
-"""Background scheduler — runs run_check_all() every N hours.
-
-Run as a separate process: `python -m app.scheduler`.
-"""
-
-from __future__ import annotations
-
-import logging
-import time
-
-from apscheduler.schedulers.blocking import BlockingScheduler
-from apscheduler.triggers.interval import IntervalTrigger
-
-from app.config import settings
-from app.db import init_db
-from app.services.monitor import run_check_all
-
-logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
-logger = logging.getLogger(__name__)
-
-
-def job() -> None:
- logger.info("Scheduled scan starting…")
- start = time.time()
- summary = run_check_all()
- elapsed = time.time() - start
- total_changes = sum(c for c in summary.values() if c > 0)
- logger.info(
- "Scan done in %.1fs. Projects: %d, total changes: %d",
- elapsed, len(summary), total_changes,
- )
-
-
-def main() -> None:
- init_db()
- hours = max(1, settings.scrape_interval_hours)
- scheduler = BlockingScheduler(timezone="UTC")
- scheduler.add_job(
- job,
- trigger=IntervalTrigger(hours=hours),
- # Omit next_run_time so APScheduler defaults the first run to now+interval
- # (i.e. don't fire immediately at startup, fire after one interval, then
- # every interval). Passing next_run_time=None instead creates the job in a
- # PAUSED state and it never fires — that was the bug.
- id="periodic-scan",
- max_instances=1,
- coalesce=True,
- )
- logger.info("Scheduler started — interval %d hour(s).", hours)
- try:
- scheduler.start()
- except (KeyboardInterrupt, SystemExit):
- logger.info("Scheduler stopped.")
-
-
-if __name__ == "__main__":
- main()
diff --git a/app/web.py b/app/web.py
deleted file mode 100644
index 4c031f3..0000000
--- a/app/web.py
+++ /dev/null
@@ -1,498 +0,0 @@
-"""FastAPI API for Monitoring PF.
-
-The user interface lives in Portal. This service exposes only JSON endpoints
-and trusts Portal-provided headers for admin state.
-"""
-
-from __future__ import annotations
-
-from datetime import datetime
-from typing import Any
-
-from fastapi import Depends, FastAPI, HTTPException, Request
-from pydantic import BaseModel, Field
-from sqlalchemy.orm import Session, joinedload
-
-from app.config import settings
-from app.db import get_db, init_db
-from app.models import CompetitorListing, DealType, Employee, ListingStatus, Project
-from app.services.monitor import (
- BAYUT_ENABLED,
- add_competitor_url,
- add_competitor_urls,
- resolve_our_permit,
- run_check_for_project,
- suggest_similar,
-)
-
-app = FastAPI(title="Monitoring PF")
-
-
-class EmployeeCreate(BaseModel):
- name: str = Field(..., min_length=1, max_length=200)
- portal_user_id: str | None = Field(None, max_length=100)
- tg_username: str | None = Field(None, max_length=200)
- tg_chat_id: str | None = Field(None, max_length=64)
-
-
-class EmployeeUpdate(BaseModel):
- name: str | None = Field(None, min_length=1, max_length=200)
- tg_username: str | None = Field(None, max_length=200)
- tg_chat_id: str | None = Field(None, max_length=64)
-
-
-class ProjectCreate(BaseModel):
- title: str = Field(..., min_length=1, max_length=300)
- deal_type: DealType
- owner_id: int | None = None
- our_price: float | None = None
- notes: str | None = None
- dld_permit: str | None = Field(None, max_length=100)
- building: str | None = Field(None, max_length=300)
- bedrooms: int | None = None
- size_sqft: float | None = None
- our_url: str | None = None
-
-
-class ProjectUpdate(BaseModel):
- title: str | None = Field(None, min_length=1, max_length=300)
- deal_type: DealType | None = None
- owner_id: int | None = None
- our_price: float | None = None
- notes: str | None = None
- dld_permit: str | None = Field(None, max_length=100)
- building: str | None = Field(None, max_length=300)
- bedrooms: int | None = None
- size_sqft: float | None = None
- our_url: str | None = None
-
-
-class ListingCreate(BaseModel):
- url: str = Field(..., min_length=1)
-
-
-class ListingsBulkCreate(BaseModel):
- urls: list[str] = Field(default_factory=list)
-
-
-@app.on_event("startup")
-def _startup() -> None:
- init_db()
-
-
-@app.get("/healthz")
-def healthz() -> dict[str, str]:
- return {"status": "ok"}
-
-
-@app.get("/")
-def index() -> dict[str, str]:
- return {"service": "monitoring-pf", "ui": "portal"}
-
-
-def _is_admin(request: Request) -> bool:
- return request.headers.get("x-user-is-admin") == "1"
-
-
-def _require_admin(request: Request) -> None:
- if not _is_admin(request):
- raise HTTPException(status_code=404, detail="not found")
-
-
-def _clean(value: str | None) -> str | None:
- value = (value or "").strip()
- return value or None
-
-
-def _portal_user_id(request: Request) -> str | None:
- return _clean(request.headers.get("x-user-id"))
-
-
-def _telegram_start_command(portal_user_id: str | None) -> str | None:
- return f"/start {portal_user_id}" if portal_user_id else None
-
-
-def _telegram_start_link(portal_user_id: str | None) -> str | None:
- username = settings.tg_bot_username.strip().lstrip("@")
- if not username or not portal_user_id:
- return None
- return f"https://t.me/{username}?start={portal_user_id}"
-
-
-def _current_employee(request: Request, db: Session, *, required: bool = True) -> Employee | None:
- portal_user_id = _portal_user_id(request)
- if not portal_user_id:
- if required:
- raise HTTPException(status_code=401, detail="portal user is not available")
- return None
- employee = db.query(Employee).filter(Employee.portal_user_id == portal_user_id).first()
- if (not employee or not employee.tg_chat_id) and required:
- raise HTTPException(status_code=403, detail="Сначала авторизуйтесь в Telegram-боте Monitoring PF")
- return employee
-
-
-def _owned_project(request: Request, db: Session, project_id: int, *, with_detail: bool = False) -> Project:
- employee = _current_employee(request, db)
- query = db.query(Project).filter(Project.id == project_id, Project.owner_id == employee.id)
- if with_detail:
- query = query.options(
- joinedload(Project.owner),
- joinedload(Project.listings).joinedload(CompetitorListing.price_history),
- )
- else:
- query = query.options(joinedload(Project.owner), joinedload(Project.listings))
- project = query.first()
- if not project:
- raise HTTPException(404, "project not found")
- return project
-
-
-def _dt(value: datetime | None) -> str | None:
- return value.isoformat() if value else None
-
-
-def _employee_out(employee: Employee) -> dict[str, Any]:
- return {
- "id": employee.id,
- "name": employee.name,
- "portal_user_id": employee.portal_user_id,
- "tg_chat_id": employee.tg_chat_id,
- "tg_username": employee.tg_username,
- "projects_total": len(employee.projects or []),
- "created_at": _dt(employee.created_at),
- }
-
-
-def _history_out(listing: CompetitorListing) -> list[dict[str, Any]]:
- return [
- {"id": h.id, "price": h.price, "recorded_at": _dt(h.recorded_at)}
- for h in listing.price_history
- ]
-
-
-def _listing_out(listing: CompetitorListing, *, with_history: bool = False) -> dict[str, Any]:
- out = {
- "id": listing.id,
- "project_id": listing.project_id,
- "source": listing.source.value,
- "external_id": listing.external_id,
- "url": listing.url,
- "title": listing.title,
- "agent_name": listing.agent_name,
- "agency_name": listing.agency_name,
- "current_price": listing.current_price,
- "currency": listing.currency,
- "status": listing.status.value,
- "first_seen_at": _dt(listing.first_seen_at),
- "last_seen_at": _dt(listing.last_seen_at),
- }
- if with_history:
- out["price_history"] = _history_out(listing)
- return out
-
-
-def _project_stats(project: Project) -> dict[str, Any]:
- listings = project.listings or []
- active = [l for l in listings if l.status == ListingStatus.ACTIVE]
- prices = [l.current_price for l in active if l.current_price is not None]
- return {
- "listings_total": len(listings),
- "listings_active": len(active),
- "listings_removed": len(listings) - len(active),
- "min_competitor_price": min(prices) if prices else None,
- }
-
-
-def _project_out(project: Project, *, detail: bool = False) -> dict[str, Any]:
- out = {
- "id": project.id,
- "title": project.title,
- "deal_type": project.deal_type.value,
- "our_price": project.our_price,
- "notes": project.notes,
- "dld_permit": project.dld_permit,
- "building": project.building,
- "bedrooms": project.bedrooms,
- "size_sqft": project.size_sqft,
- "our_url": project.our_url,
- "owner_id": project.owner_id,
- "owner": _employee_out(project.owner) if project.owner else None,
- "created_at": _dt(project.created_at),
- "last_checked_at": _dt(project.last_checked_at),
- **_project_stats(project),
- }
- if detail:
- out["listings"] = [_listing_out(l, with_history=True) for l in project.listings]
- return out
-
-
-def _suggestion_out(item: Any) -> dict[str, Any]:
- return {
- "source": item.source,
- "external_id": item.external_id,
- "url": item.url,
- "title": item.title,
- "price": item.price,
- "currency": item.currency,
- "permit_number": item.permit_number,
- "agent_name": item.agent_name,
- "agency_name": item.agency_name,
- "is_active": item.is_active,
- }
-
-
-@app.get("/api/v1/access/me")
-def access_me(request: Request, db: Session = Depends(get_db)) -> dict[str, Any]:
- portal_user_id = _portal_user_id(request)
- employee = _current_employee(request, db, required=False)
- return {
- "is_admin": _is_admin(request),
- "portal_user_id": portal_user_id,
- "telegram_linked": bool(employee and employee.tg_chat_id),
- "employee": _employee_out(employee) if employee else None,
- "telegram_bot_username": settings.tg_bot_username.strip().lstrip("@") or None,
- "telegram_start_command": _telegram_start_command(portal_user_id),
- "telegram_start_link": _telegram_start_link(portal_user_id),
- }
-
-
-@app.get("/api/v1/summary")
-def summary(request: Request, db: Session = Depends(get_db)) -> dict[str, Any]:
- employee = _current_employee(request, db, required=False)
- if not employee or not employee.tg_chat_id:
- projects = []
- employees = []
- listings = []
- else:
- projects = (
- db.query(Project)
- .options(joinedload(Project.listings))
- .filter(Project.owner_id == employee.id)
- .all()
- )
- employees = [employee]
- listings = (
- db.query(CompetitorListing)
- .join(Project)
- .filter(Project.owner_id == employee.id)
- .all()
- )
- active = [l for l in listings if l.status == ListingStatus.ACTIVE]
- return {
- "projects_total": len(projects),
- "employees_total": len(employees),
- "listings_total": len(listings),
- "listings_active": len(active),
- "listings_removed": len(listings) - len(active),
- "scrape_interval_hours": settings.scrape_interval_hours,
- "bayut_enabled": BAYUT_ENABLED,
- }
-
-
-@app.get("/api/v1/employees")
-def employees_list(request: Request, db: Session = Depends(get_db)) -> list[dict[str, Any]]:
- if _is_admin(request):
- employees = (
- db.query(Employee)
- .options(joinedload(Employee.projects))
- .order_by(Employee.name)
- .all()
- )
- else:
- employee = _current_employee(request, db, required=False)
- employees = [employee] if employee and employee.tg_chat_id else []
- return [_employee_out(employee) for employee in employees]
-
-
-@app.post("/api/v1/employees", status_code=201)
-def employee_create(
- payload: EmployeeCreate,
- request: Request,
- db: Session = Depends(get_db),
-) -> dict[str, Any]:
- _require_admin(request)
- employee = Employee(
- name=payload.name.strip(),
- portal_user_id=_clean(payload.portal_user_id),
- tg_username=_clean(payload.tg_username).lstrip("@") if _clean(payload.tg_username) else None,
- tg_chat_id=_clean(payload.tg_chat_id),
- )
- db.add(employee)
- db.commit()
- db.refresh(employee)
- return _employee_out(employee)
-
-
-@app.patch("/api/v1/employees/{employee_id}")
-def employee_update(
- employee_id: int,
- payload: EmployeeUpdate,
- request: Request,
- db: Session = Depends(get_db),
-) -> dict[str, Any]:
- _require_admin(request)
- employee = db.get(Employee, employee_id)
- if not employee:
- raise HTTPException(404, "employee not found")
- if payload.name is not None:
- employee.name = payload.name.strip()
- if payload.tg_username is not None:
- employee.tg_username = payload.tg_username.strip().lstrip("@") or None
- if payload.tg_chat_id is not None:
- chat_id = _clean(payload.tg_chat_id)
- if chat_id and chat_id != employee.tg_chat_id:
- clash = (
- db.query(Employee)
- .filter(Employee.tg_chat_id == chat_id, Employee.id != employee.id)
- .first()
- )
- if clash:
- raise HTTPException(400, f"chat_id already belongs to {clash.name}")
- employee.tg_chat_id = chat_id
- db.commit()
- db.refresh(employee)
- return _employee_out(employee)
-
-
-@app.delete("/api/v1/employees/{employee_id}", status_code=204)
-def employee_delete(employee_id: int, request: Request, db: Session = Depends(get_db)) -> None:
- _require_admin(request)
- employee = db.get(Employee, employee_id)
- if not employee:
- raise HTTPException(404, "employee not found")
- if employee.projects:
- raise HTTPException(400, "employee has projects")
- db.delete(employee)
- db.commit()
-
-
-@app.get("/api/v1/projects")
-def projects_list(request: Request, db: Session = Depends(get_db)) -> list[dict[str, Any]]:
- employee = _current_employee(request, db)
- projects = (
- db.query(Project)
- .options(joinedload(Project.owner), joinedload(Project.listings))
- .filter(Project.owner_id == employee.id)
- .order_by(Project.created_at.desc())
- .all()
- )
- return [_project_out(project) for project in projects]
-
-
-@app.post("/api/v1/projects", status_code=201)
-def project_create(payload: ProjectCreate, request: Request, db: Session = Depends(get_db)) -> dict[str, Any]:
- owner = _current_employee(request, db)
- project = Project(
- title=payload.title.strip(),
- deal_type=payload.deal_type,
- owner_id=owner.id,
- our_price=payload.our_price,
- notes=_clean(payload.notes),
- dld_permit=_clean(payload.dld_permit),
- building=_clean(payload.building),
- bedrooms=payload.bedrooms,
- size_sqft=payload.size_sqft,
- our_url=_clean(payload.our_url),
- )
- db.add(project)
- db.commit()
- db.refresh(project)
- return _project_out(project, detail=True)
-
-
-@app.get("/api/v1/projects/{project_id}")
-def project_detail(project_id: int, request: Request, db: Session = Depends(get_db)) -> dict[str, Any]:
- project = _owned_project(request, db, project_id, with_detail=True)
- return _project_out(project, detail=True)
-
-
-@app.patch("/api/v1/projects/{project_id}")
-def project_update(
- project_id: int,
- payload: ProjectUpdate,
- request: Request,
- db: Session = Depends(get_db),
-) -> dict[str, Any]:
- project = _owned_project(request, db, project_id)
- data = payload.model_dump(exclude_unset=True)
- data.pop("owner_id", None)
- for field in ("title", "deal_type", "our_price", "notes", "dld_permit", "building", "bedrooms", "size_sqft", "our_url"):
- if field not in data:
- continue
- value = data[field]
- if isinstance(value, str):
- value = _clean(value)
- setattr(project, field, value)
- db.commit()
- db.refresh(project)
- return _project_out(_owned_project(request, db, project_id, with_detail=True), detail=True)
-
-
-@app.delete("/api/v1/projects/{project_id}", status_code=204)
-def project_delete(project_id: int, request: Request, db: Session = Depends(get_db)) -> None:
- project = _owned_project(request, db, project_id)
- db.delete(project)
- db.commit()
-
-
-@app.post("/api/v1/projects/{project_id}/check")
-def project_check_now(project_id: int, request: Request, db: Session = Depends(get_db)) -> dict[str, int]:
- _owned_project(request, db, project_id)
- db.close()
- changes = run_check_for_project(project_id)
- return {"changes": changes}
-
-
-@app.post("/api/v1/projects/{project_id}/listings", status_code=201)
-def listing_create(
- project_id: int,
- payload: ListingCreate,
- request: Request,
- db: Session = Depends(get_db),
-) -> dict[str, Any]:
- project = _owned_project(request, db, project_id)
- listing, err = add_competitor_url(db, project, payload.url)
- if err:
- raise HTTPException(400, err)
- return _listing_out(listing, with_history=True)
-
-
-@app.post("/api/v1/projects/{project_id}/listings/bulk")
-def listings_bulk(
- project_id: int,
- payload: ListingsBulkCreate,
- request: Request,
- db: Session = Depends(get_db),
-) -> dict[str, Any]:
- project = _owned_project(request, db, project_id)
- return add_competitor_urls(db, project, payload.urls)
-
-
-@app.delete("/api/v1/listings/{listing_id}", status_code=204)
-def listing_delete(listing_id: int, request: Request, db: Session = Depends(get_db)) -> None:
- employee = _current_employee(request, db)
- listing = (
- db.query(CompetitorListing)
- .join(Project)
- .filter(CompetitorListing.id == listing_id, Project.owner_id == employee.id)
- .first()
- )
- if not listing:
- raise HTTPException(404, "listing not found")
- db.delete(listing)
- db.commit()
-
-
-@app.get("/api/v1/projects/{project_id}/suggest")
-def project_suggest(project_id: int, request: Request, db: Session = Depends(get_db)) -> dict[str, Any]:
- project = _owned_project(request, db, project_id)
- permit = resolve_our_permit(project)
- suggestions = suggest_similar(project, our_permit=permit)
- return {
- "our_permit": permit,
- "bayut_enabled": BAYUT_ENABLED,
- "suggestions": {
- "propertyfinder": [_suggestion_out(item) for item in suggestions["propertyfinder"]],
- "bayut": [_suggestion_out(item) for item in suggestions["bayut"]],
- },
- }
diff --git a/app/worker.py b/app/worker.py
new file mode 100644
index 0000000..ebbeaec
--- /dev/null
+++ b/app/worker.py
@@ -0,0 +1,136 @@
+"""Internal JSON worker for Go processes.
+
+The Go API/bot/scheduler own infrastructure concerns. Python stays here for
+PropertyFinder/Bayut scraping and the existing SQLAlchemy monitoring logic.
+"""
+
+from __future__ import annotations
+
+import json
+import sys
+from typing import Any
+
+from app.db import SessionLocal, init_db
+from app.models import Project
+from app.services.monitor import (
+ BAYUT_ENABLED,
+ add_competitor_url,
+ add_competitor_urls,
+ resolve_our_permit,
+ run_check_all,
+ run_check_for_project,
+ suggest_similar,
+)
+
+
+def _read_payload() -> dict[str, Any]:
+ raw = sys.stdin.read().strip()
+ if not raw:
+ return {}
+ return json.loads(raw)
+
+
+def _write(payload: Any) -> None:
+ json.dump(payload, sys.stdout, ensure_ascii=False)
+ sys.stdout.write("\n")
+
+
+def _fail(message: str, status: int = 1) -> None:
+ _write({"error": message})
+ raise SystemExit(status)
+
+
+def _suggestion_out(item: Any) -> dict[str, Any]:
+ return {
+ "source": item.source,
+ "external_id": item.external_id,
+ "url": item.url,
+ "title": item.title,
+ "price": item.price,
+ "currency": item.currency,
+ "permit_number": item.permit_number,
+ "agent_name": item.agent_name,
+ "agency_name": item.agency_name,
+ "is_active": item.is_active,
+ }
+
+
+def cmd_add_listing(payload: dict[str, Any]) -> None:
+ project_id = int(payload.get("project_id") or 0)
+ url = str(payload.get("url") or "")
+ db = SessionLocal()
+ try:
+ project = db.get(Project, project_id)
+ if not project:
+ _fail("project not found")
+ listing, err = add_competitor_url(db, project, url)
+ if err:
+ _fail(err)
+ _write({"listing_id": listing.id})
+ finally:
+ db.close()
+
+
+def cmd_add_listings(payload: dict[str, Any]) -> None:
+ project_id = int(payload.get("project_id") or 0)
+ urls = payload.get("urls") or []
+ db = SessionLocal()
+ try:
+ project = db.get(Project, project_id)
+ if not project:
+ _fail("project not found")
+ _write(add_competitor_urls(db, project, urls))
+ finally:
+ db.close()
+
+
+def cmd_check_project(payload: dict[str, Any]) -> None:
+ project_id = int(payload.get("project_id") or 0)
+ _write({"changes": run_check_for_project(project_id)})
+
+
+def cmd_check_all(_: dict[str, Any]) -> None:
+ summary = run_check_all()
+ _write({str(k): v for k, v in summary.items()})
+
+
+def cmd_suggest(payload: dict[str, Any]) -> None:
+ project_id = int(payload.get("project_id") or 0)
+ db = SessionLocal()
+ try:
+ project = db.get(Project, project_id)
+ if not project:
+ _fail("project not found")
+ permit = resolve_our_permit(project)
+ suggestions = suggest_similar(project, our_permit=permit)
+ _write({
+ "our_permit": permit,
+ "bayut_enabled": BAYUT_ENABLED,
+ "suggestions": {
+ "propertyfinder": [_suggestion_out(item) for item in suggestions["propertyfinder"]],
+ "bayut": [_suggestion_out(item) for item in suggestions["bayut"]],
+ },
+ })
+ finally:
+ db.close()
+
+
+COMMANDS = {
+ "add-listing": cmd_add_listing,
+ "add-listings": cmd_add_listings,
+ "check-project": cmd_check_project,
+ "check-all": cmd_check_all,
+ "suggest": cmd_suggest,
+}
+
+
+def main() -> None:
+ if len(sys.argv) < 2 or sys.argv[1] not in COMMANDS:
+ _fail("unknown worker command")
+ init_db()
+ payload = _read_payload()
+ COMMANDS[sys.argv[1]](payload)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/cmd/bot/main.go b/cmd/bot/main.go
new file mode 100644
index 0000000..f889631
--- /dev/null
+++ b/cmd/bot/main.go
@@ -0,0 +1,187 @@
+package main
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "fmt"
+ "log/slog"
+ "os"
+ "os/signal"
+ "strconv"
+ "strings"
+ "syscall"
+ "time"
+
+ "monitoring-pf/internal/pf"
+)
+
+func main() {
+ cfg := pf.LoadConfig()
+ logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
+ slog.SetDefault(logger)
+
+ ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+ defer stop()
+
+ app, err := pf.OpenApp(ctx, cfg)
+ if err != nil {
+ slog.Error("db_open_failed", "error", err)
+ os.Exit(1)
+ }
+ defer app.Close()
+ if !app.TG.Enabled() {
+ slog.Error("telegram_token_missing")
+ os.Exit(1)
+ }
+
+ slog.Info("monitoring_pf_go_bot_started")
+ var offset int64
+ for ctx.Err() == nil {
+ updates, err := app.TG.GetUpdates(ctx, offset)
+ if err != nil {
+ slog.Warn("telegram_get_updates_failed", "error", err)
+ time.Sleep(3 * time.Second)
+ continue
+ }
+ for _, update := range updates {
+ offset = update.UpdateID + 1
+ if update.Message != nil {
+ handleMessage(ctx, app, update.Message)
+ }
+ }
+ }
+}
+
+func handleMessage(ctx context.Context, app *pf.App, msg *pf.TGMessage) {
+ text := strings.TrimSpace(msg.Text)
+ if text == "" || !strings.HasPrefix(text, "/") {
+ return
+ }
+ command, arg := splitCommand(text)
+ switch command {
+ case "/start":
+ handleStart(ctx, app, msg, arg)
+ case "/whoami":
+ handleWhoami(ctx, app, msg)
+ case "/list":
+ handleList(ctx, app, msg)
+ case "/check":
+ handleCheck(ctx, app, msg)
+ }
+}
+
+func splitCommand(text string) (string, string) {
+ parts := strings.Fields(text)
+ if len(parts) == 0 {
+ return "", ""
+ }
+ command := strings.Split(parts[0], "@")[0]
+ arg := ""
+ if len(parts) > 1 {
+ arg = parts[1]
+ }
+ return command, arg
+}
+
+func handleStart(ctx context.Context, app *pf.App, msg *pf.TGMessage, portalUserID string) {
+ chatID := strconv.FormatInt(msg.Chat.ID, 10)
+ user := msg.From
+ username := ""
+ name := "user_" + chatID
+ if user != nil {
+ username = user.Username
+ name = user.FullName()
+ }
+ if portalUserID == "" {
+ _ = app.TG.SendMessage(ctx, chatID,
+ "Откройте Portal → Мониторинг PF и нажмите подключение Telegram.\n"+
+ "Бот должен получить команду вида:\n/start ваш_код_из_Portal")
+ return
+ }
+ emp, err := app.LinkTelegram(ctx, portalUserID, chatID, username, name)
+ if err != nil {
+ _ = app.TG.SendMessage(ctx, chatID, "Не удалось подключить Telegram: "+err.Error())
+ return
+ }
+ _ = app.TG.SendMessage(ctx, chatID,
+ fmt.Sprintf("✅ Привет, %s! Telegram подключен к вашему аккаунту Portal.\nТеперь можно добавлять объекты мониторинга в Portal.", emp.Name))
+}
+
+func handleWhoami(ctx context.Context, app *pf.App, msg *pf.TGMessage) {
+ chatID := strconv.FormatInt(msg.Chat.ID, 10)
+ emp, err := app.EmployeeByChatID(ctx, chatID)
+ if errors.Is(err, sql.ErrNoRows) {
+ _ = app.TG.SendMessage(ctx, chatID, "Вы пока не подключены. Откройте Portal → Мониторинг PF и запустите подключение.\nchat_id: "+chatID+"")
+ return
+ }
+ if err != nil {
+ _ = app.TG.SendMessage(ctx, chatID, "Ошибка: "+err.Error())
+ return
+ }
+ _ = app.TG.SendMessage(ctx, chatID, fmt.Sprintf("Вы: %s\nchat_id: %s", emp.Name, chatID))
+}
+
+func handleList(ctx context.Context, app *pf.App, msg *pf.TGMessage) {
+ chatID := strconv.FormatInt(msg.Chat.ID, 10)
+ emp, err := app.EmployeeByChatID(ctx, chatID)
+ if errors.Is(err, sql.ErrNoRows) {
+ _ = app.TG.SendMessage(ctx, chatID, "Сначала подключитесь через Portal → Мониторинг PF.")
+ return
+ }
+ if err != nil {
+ _ = app.TG.SendMessage(ctx, chatID, "Ошибка: "+err.Error())
+ return
+ }
+ projects, err := app.ListProjects(ctx, emp.ID)
+ if err != nil {
+ _ = app.TG.SendMessage(ctx, chatID, "Ошибка: "+err.Error())
+ return
+ }
+ if len(projects) == 0 {
+ _ = app.TG.SendMessage(ctx, chatID, "У вас пока нет проектов.")
+ return
+ }
+ lines := []string{fmt.Sprintf("Ваши проекты (%d):", len(projects))}
+ for _, p := range projects {
+ permit := "—"
+ if p.DLDPermit != nil {
+ permit = *p.DLDPermit
+ }
+ lines = append(lines, fmt.Sprintf("• #%d %s — %s (%s)", p.ID, p.Title, permit, p.DealType))
+ }
+ _ = app.TG.SendMessage(ctx, chatID, strings.Join(lines, "\n"))
+}
+
+func handleCheck(ctx context.Context, app *pf.App, msg *pf.TGMessage) {
+ chatID := strconv.FormatInt(msg.Chat.ID, 10)
+ emp, err := app.EmployeeByChatID(ctx, chatID)
+ if errors.Is(err, sql.ErrNoRows) {
+ _ = app.TG.SendMessage(ctx, chatID, "Сначала подключитесь через Portal → Мониторинг PF.")
+ return
+ }
+ if err != nil {
+ _ = app.TG.SendMessage(ctx, chatID, "Ошибка: "+err.Error())
+ return
+ }
+ projects, err := app.ListProjects(ctx, emp.ID)
+ if err != nil {
+ _ = app.TG.SendMessage(ctx, chatID, "Ошибка: "+err.Error())
+ return
+ }
+ if len(projects) == 0 {
+ _ = app.TG.SendMessage(ctx, chatID, "У вас нет проектов.")
+ return
+ }
+ _ = app.TG.SendMessage(ctx, chatID, fmt.Sprintf("⏳ Запускаю проверку %d проектов…", len(projects)))
+ total := 0
+ for _, p := range projects {
+ changes, err := app.Worker.CheckProject(ctx, p.ID)
+ if err != nil {
+ slog.Warn("check_project_failed", "project_id", p.ID, "error", err)
+ continue
+ }
+ total += changes
+ }
+ _ = app.TG.SendMessage(ctx, chatID, fmt.Sprintf("✅ Готово. Изменений: %d", total))
+}
diff --git a/cmd/scheduler/main.go b/cmd/scheduler/main.go
new file mode 100644
index 0000000..50b1585
--- /dev/null
+++ b/cmd/scheduler/main.go
@@ -0,0 +1,61 @@
+package main
+
+import (
+ "context"
+ "log/slog"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ "monitoring-pf/internal/pf"
+)
+
+func main() {
+ cfg := pf.LoadConfig()
+ logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
+ slog.SetDefault(logger)
+
+ ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+ defer stop()
+
+ app, err := pf.OpenApp(ctx, cfg)
+ if err != nil {
+ slog.Error("db_open_failed", "error", err)
+ os.Exit(1)
+ }
+ defer app.Close()
+
+ interval := cfg.SchedulerInterval()
+ slog.Info("monitoring_pf_scheduler_started", "interval", interval.String())
+ timer := time.NewTimer(interval)
+ defer timer.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ slog.Info("monitoring_pf_scheduler_stopped")
+ return
+ case <-timer.C:
+ run(ctx, app)
+ timer.Reset(interval)
+ }
+ }
+}
+
+func run(ctx context.Context, app *pf.App) {
+ start := time.Now()
+ slog.Info("scheduled_scan_starting")
+ summary, err := app.Worker.CheckAll(ctx)
+ if err != nil {
+ slog.Error("scheduled_scan_failed", "error", err)
+ return
+ }
+ total := 0
+ for _, changes := range summary {
+ if changes > 0 {
+ total += changes
+ }
+ }
+ slog.Info("scheduled_scan_done", "projects", len(summary), "total_changes", total, "elapsed", time.Since(start).String())
+}
diff --git a/cmd/server/main.go b/cmd/server/main.go
new file mode 100644
index 0000000..cc1db39
--- /dev/null
+++ b/cmd/server/main.go
@@ -0,0 +1,50 @@
+package main
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "log/slog"
+ "net/http"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ "monitoring-pf/internal/pf"
+)
+
+func main() {
+ cfg := pf.LoadConfig()
+ logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
+ slog.SetDefault(logger)
+
+ ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+ defer stop()
+
+ app, err := pf.OpenApp(ctx, cfg)
+ if err != nil {
+ slog.Error("db_open_failed", "error", err)
+ os.Exit(1)
+ }
+ defer app.Close()
+
+ server := &http.Server{
+ Addr: fmt.Sprintf("%s:%d", cfg.WebHost, cfg.WebPort),
+ Handler: pf.Server{App: app},
+ ReadHeaderTimeout: 10 * time.Second,
+ }
+
+ go func() {
+ <-ctx.Done()
+ shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ _ = server.Shutdown(shutdownCtx)
+ }()
+
+ slog.Info("monitoring_pf_go_server_started", "addr", server.Addr)
+ if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
+ slog.Error("server_failed", "error", err)
+ os.Exit(1)
+ }
+}
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..ce6b8ab
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,17 @@
+module monitoring-pf
+
+go 1.25.0
+
+require modernc.org/sqlite v1.50.1
+
+require (
+ github.com/dustin/go-humanize v1.0.1 // indirect
+ github.com/google/uuid v1.6.0 // indirect
+ github.com/mattn/go-isatty v0.0.20 // indirect
+ github.com/ncruces/go-strftime v1.0.0 // indirect
+ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
+ golang.org/x/sys v0.42.0 // indirect
+ modernc.org/libc v1.72.3 // indirect
+ modernc.org/mathutil v1.7.1 // indirect
+ modernc.org/memory v1.11.0 // indirect
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..38131a8
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,51 @@
+github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
+github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
+github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
+github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
+github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
+github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
+github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
+github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
+github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
+github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
+github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
+golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8=
+golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w=
+golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
+golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
+golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
+golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
+golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k=
+golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0=
+modernc.org/cc/v4 v4.28.2 h1:3tQ0lf2ADtoby2EtSP+J7IE2SHwEJdP8ioR59wx7XpY=
+modernc.org/cc/v4 v4.28.2/go.mod h1:OnovgIhbbMXMu1aISnJ0wvVD1KnW+cAUJkIrAWh+kVI=
+modernc.org/ccgo/v4 v4.34.0 h1:yRLPFZieg532OT4rp4JFNIVcquwalMX26G95WQDqwCQ=
+modernc.org/ccgo/v4 v4.34.0/go.mod h1:AS5WYMyBakQ+fhsHhtP8mWB82KTGPkNNJDGfGQCe0/A=
+modernc.org/fileutil v1.4.0 h1:j6ZzNTftVS054gi281TyLjHPp6CPHr2KCxEXjEbD6SM=
+modernc.org/fileutil v1.4.0/go.mod h1:EqdKFDxiByqxLk8ozOxObDSfcVOv/54xDs/DUHdvCUU=
+modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI=
+modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito=
+modernc.org/gc/v3 v3.1.2 h1:ZtDCnhonXSZexk/AYsegNRV1lJGgaNZJuKjJSWKyEqo=
+modernc.org/gc/v3 v3.1.2/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY=
+modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks=
+modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI=
+modernc.org/libc v1.72.3 h1:ZnDF4tXn4NBXFutMMQC4vtbTFSXhhKzR73fv0beZEAU=
+modernc.org/libc v1.72.3/go.mod h1:dn0dZNnnn1clLyvRxLxYExxiKRZIRENOfqQ8XEeg4Qs=
+modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
+modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
+modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI=
+modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
+modernc.org/opt v0.2.0 h1:tGyef5ApycA7FSEOMraay9SaTk5zmbx7Tu+cJs4QKZg=
+modernc.org/opt v0.2.0/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
+modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
+modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
+modernc.org/sqlite v1.50.1 h1:l+cQvn0sd0zJJtfygGHuQJ5AjlrwXmWPw4KP3ZMwr9w=
+modernc.org/sqlite v1.50.1/go.mod h1:tcNzv5p84E0skkmJn038y+hWJbLQXQqEnQfeh5r2JLM=
+modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
+modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
+modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
+modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
diff --git a/internal/pf/config.go b/internal/pf/config.go
new file mode 100644
index 0000000..a9bfa26
--- /dev/null
+++ b/internal/pf/config.go
@@ -0,0 +1,58 @@
+package pf
+
+import (
+ "os"
+ "strconv"
+ "strings"
+ "time"
+)
+
+type Config struct {
+ WebHost string
+ WebPort int
+ PublicBasePath string
+ DatabaseURL string
+ ScrapeIntervalHours int
+ TGBotToken string
+ TGBotUsername string
+ WorkerPython string
+ WorkerModule string
+}
+
+func LoadConfig() Config {
+ return Config{
+ WebHost: env("WEB_HOST", "127.0.0.1"),
+ WebPort: envInt("WEB_PORT", 8000),
+ PublicBasePath: strings.TrimRight(env("PUBLIC_BASE_PATH", ""), "/"),
+ DatabaseURL: env("DATABASE_URL", "sqlite:///data/monitor.db"),
+ ScrapeIntervalHours: max(1, envInt("SCRAPE_INTERVAL_HOURS", 4)),
+ TGBotToken: env("TG_BOT_TOKEN", ""),
+ TGBotUsername: strings.TrimPrefix(env("TG_BOT_USERNAME", ""), "@"),
+ WorkerPython: env("WORKER_PYTHON", "python"),
+ WorkerModule: env("WORKER_MODULE", "app.worker"),
+ }
+}
+
+func (c Config) SchedulerInterval() time.Duration {
+ return time.Duration(c.ScrapeIntervalHours) * time.Hour
+}
+
+func env(key, fallback string) string {
+ value := strings.TrimSpace(os.Getenv(key))
+ if value == "" {
+ return fallback
+ }
+ return value
+}
+
+func envInt(key string, fallback int) int {
+ raw := strings.TrimSpace(os.Getenv(key))
+ if raw == "" {
+ return fallback
+ }
+ value, err := strconv.Atoi(raw)
+ if err != nil {
+ return fallback
+ }
+ return value
+}
diff --git a/internal/pf/db.go b/internal/pf/db.go
new file mode 100644
index 0000000..42276aa
--- /dev/null
+++ b/internal/pf/db.go
@@ -0,0 +1,316 @@
+package pf
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "net/url"
+ "path/filepath"
+ "strings"
+ "time"
+
+ _ "modernc.org/sqlite"
+)
+
+type App struct {
+ Cfg Config
+ DB *sql.DB
+ Worker *Worker
+ TG *Telegram
+}
+
+type Employee struct {
+ ID int64 `json:"id"`
+ Name string `json:"name"`
+ PortalUserID *string `json:"portal_user_id"`
+ TGChatID *string `json:"tg_chat_id"`
+ TGUsername *string `json:"tg_username"`
+ ProjectsTotal int64 `json:"projects_total"`
+ CreatedAt *string `json:"created_at"`
+}
+
+type Project struct {
+ ID int64 `json:"id"`
+ Title string `json:"title"`
+ DealType string `json:"deal_type"`
+ OurPrice *float64 `json:"our_price"`
+ Notes *string `json:"notes"`
+ DLDPermit *string `json:"dld_permit"`
+ Building *string `json:"building"`
+ Bedrooms *int64 `json:"bedrooms"`
+ SizeSqft *float64 `json:"size_sqft"`
+ OurURL *string `json:"our_url"`
+ OwnerID int64 `json:"owner_id"`
+ Owner *Employee `json:"owner,omitempty"`
+ CreatedAt *string `json:"created_at"`
+ LastCheckedAt *string `json:"last_checked_at"`
+ ListingsTotal int64 `json:"listings_total"`
+ ListingsActive int64 `json:"listings_active"`
+ ListingsRemoved int64 `json:"listings_removed"`
+ MinCompetitorPrice *float64 `json:"min_competitor_price"`
+ Listings []Listing `json:"listings,omitempty"`
+}
+
+type Listing struct {
+ ID int64 `json:"id"`
+ ProjectID int64 `json:"project_id"`
+ Source string `json:"source"`
+ ExternalID string `json:"external_id"`
+ URL string `json:"url"`
+ Title *string `json:"title"`
+ AgentName *string `json:"agent_name"`
+ AgencyName *string `json:"agency_name"`
+ CurrentPrice *float64 `json:"current_price"`
+ Currency *string `json:"currency"`
+ Status string `json:"status"`
+ FirstSeenAt *string `json:"first_seen_at"`
+ LastSeenAt *string `json:"last_seen_at"`
+ PriceHistory []PricePoint `json:"price_history,omitempty"`
+}
+
+type PricePoint struct {
+ ID int64 `json:"id"`
+ Price *float64 `json:"price"`
+ RecordedAt *string `json:"recorded_at"`
+}
+
+func OpenApp(ctx context.Context, cfg Config) (*App, error) {
+ db, err := sql.Open("sqlite", sqliteDSN(cfg.DatabaseURL))
+ if err != nil {
+ return nil, err
+ }
+ db.SetMaxOpenConns(1)
+ if err := db.PingContext(ctx); err != nil {
+ _ = db.Close()
+ return nil, err
+ }
+ app := &App{
+ Cfg: cfg,
+ DB: db,
+ Worker: NewWorker(cfg),
+ TG: NewTelegram(cfg.TGBotToken),
+ }
+ if err := app.InitDB(ctx); err != nil {
+ _ = db.Close()
+ return nil, err
+ }
+ return app, nil
+}
+
+func sqliteDSN(databaseURL string) string {
+ const prefix = "sqlite:///"
+ if strings.HasPrefix(databaseURL, prefix) {
+ path := strings.TrimPrefix(databaseURL, prefix)
+ if !strings.HasPrefix(path, "/") {
+ path = filepath.Clean(path)
+ }
+ return path
+ }
+ if strings.HasPrefix(databaseURL, "sqlite://") {
+ u, err := url.Parse(databaseURL)
+ if err == nil && u.Path != "" {
+ return u.Path
+ }
+ }
+ return databaseURL
+}
+
+func (a *App) Close() error {
+ return a.DB.Close()
+}
+
+func (a *App) InitDB(ctx context.Context) error {
+ stmts := []string{
+ `CREATE TABLE IF NOT EXISTS employees (
+ id INTEGER PRIMARY KEY,
+ name VARCHAR(200) NOT NULL,
+ portal_user_id VARCHAR(100),
+ tg_chat_id VARCHAR(64),
+ tg_username VARCHAR(200),
+ created_at DATETIME
+ )`,
+ `CREATE UNIQUE INDEX IF NOT EXISTS ix_employees_portal_user_id ON employees (portal_user_id)`,
+ `CREATE UNIQUE INDEX IF NOT EXISTS ix_employees_tg_chat_id ON employees (tg_chat_id)`,
+ `CREATE TABLE IF NOT EXISTS projects (
+ id INTEGER PRIMARY KEY,
+ title VARCHAR(300) NOT NULL,
+ deal_type VARCHAR(4) NOT NULL,
+ our_price FLOAT,
+ notes TEXT,
+ dld_permit VARCHAR(100),
+ building VARCHAR(300),
+ bedrooms INTEGER,
+ size_sqft FLOAT,
+ our_url TEXT,
+ owner_id INTEGER NOT NULL,
+ created_at DATETIME,
+ last_checked_at DATETIME,
+ FOREIGN KEY(owner_id) REFERENCES employees(id)
+ )`,
+ `CREATE INDEX IF NOT EXISTS ix_projects_dld_permit ON projects (dld_permit)`,
+ `CREATE TABLE IF NOT EXISTS competitor_listings (
+ id INTEGER PRIMARY KEY,
+ project_id INTEGER NOT NULL,
+ source VARCHAR(14) NOT NULL,
+ external_id VARCHAR(100) NOT NULL,
+ url TEXT NOT NULL,
+ title VARCHAR(500),
+ agent_name VARCHAR(300),
+ agency_name VARCHAR(300),
+ current_price FLOAT,
+ currency VARCHAR(10),
+ status VARCHAR(7) NOT NULL,
+ first_seen_at DATETIME,
+ last_seen_at DATETIME,
+ FOREIGN KEY(project_id) REFERENCES projects(id)
+ )`,
+ `CREATE UNIQUE INDEX IF NOT EXISTS uq_listing ON competitor_listings (project_id, source, external_id)`,
+ `CREATE TABLE IF NOT EXISTS price_history (
+ id INTEGER PRIMARY KEY,
+ listing_id INTEGER NOT NULL,
+ price FLOAT,
+ recorded_at DATETIME,
+ FOREIGN KEY(listing_id) REFERENCES competitor_listings(id)
+ )`,
+ }
+ for _, stmt := range stmts {
+ if _, err := a.DB.ExecContext(ctx, stmt); err != nil {
+ return err
+ }
+ }
+ return a.migrateEmployees(ctx)
+}
+
+func (a *App) migrateEmployees(ctx context.Context) error {
+ rows, err := a.DB.QueryContext(ctx, `PRAGMA table_info(employees)`)
+ if err != nil {
+ return err
+ }
+ defer rows.Close()
+ columns := map[string]bool{}
+ for rows.Next() {
+ var cid int
+ var name, typ string
+ var notNull int
+ var defaultValue any
+ var pk int
+ if err := rows.Scan(&cid, &name, &typ, ¬Null, &defaultValue, &pk); err != nil {
+ return err
+ }
+ columns[name] = true
+ }
+ if !columns["portal_user_id"] {
+ if _, err := a.DB.ExecContext(ctx, `ALTER TABLE employees ADD COLUMN portal_user_id VARCHAR(100)`); err != nil {
+ return err
+ }
+ }
+ _, err = a.DB.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS ix_employees_portal_user_id ON employees (portal_user_id)`)
+ return err
+}
+
+func cleanPtr(value *string) *string {
+ if value == nil {
+ return nil
+ }
+ v := strings.TrimSpace(*value)
+ if v == "" {
+ return nil
+ }
+ return &v
+}
+
+func cleanString(value string) string {
+ return strings.TrimSpace(value)
+}
+
+func dbNow() string {
+ return time.Now().UTC().Format("2006-01-02 15:04:05.000000")
+}
+
+func enumDealIn(value string) (string, error) {
+ switch strings.ToLower(strings.TrimSpace(value)) {
+ case "sale", "SALE":
+ return "SALE", nil
+ case "rent", "RENT":
+ return "RENT", nil
+ default:
+ return "", fmt.Errorf("invalid deal_type")
+ }
+}
+
+func enumDealOut(value string) string {
+ switch strings.ToUpper(value) {
+ case "RENT":
+ return "rent"
+ default:
+ return "sale"
+ }
+}
+
+func enumSourceOut(value string) string {
+ switch strings.ToUpper(value) {
+ case "BAYUT":
+ return "bayut"
+ default:
+ return "propertyfinder"
+ }
+}
+
+func enumStatusOut(value string) string {
+ switch strings.ToUpper(value) {
+ case "REMOVED":
+ return "removed"
+ default:
+ return "active"
+ }
+}
+
+func enumStatusIn(value string) string {
+ switch strings.ToLower(value) {
+ case "removed":
+ return "REMOVED"
+ default:
+ return "ACTIVE"
+ }
+}
+
+func timeOut(raw sql.NullString) *string {
+ if !raw.Valid || strings.TrimSpace(raw.String) == "" {
+ return nil
+ }
+ value := strings.TrimSpace(raw.String)
+ layouts := []string{
+ time.RFC3339Nano,
+ "2006-01-02 15:04:05.999999",
+ "2006-01-02 15:04:05",
+ "2006-01-02T15:04:05.999999",
+ }
+ for _, layout := range layouts {
+ if t, err := time.Parse(layout, value); err == nil {
+ out := t.UTC().Format(time.RFC3339)
+ return &out
+ }
+ }
+ return &value
+}
+
+func nullableString(ns sql.NullString) *string {
+ if !ns.Valid {
+ return nil
+ }
+ return &ns.String
+}
+
+func nullableFloat(nf sql.NullFloat64) *float64 {
+ if !nf.Valid {
+ return nil
+ }
+ return &nf.Float64
+}
+
+func nullableInt(ni sql.NullInt64) *int64 {
+ if !ni.Valid {
+ return nil
+ }
+ return &ni.Int64
+}
diff --git a/internal/pf/http.go b/internal/pf/http.go
new file mode 100644
index 0000000..052d117
--- /dev/null
+++ b/internal/pf/http.go
@@ -0,0 +1,414 @@
+package pf
+
+import (
+ "encoding/json"
+ "errors"
+ "net/http"
+ "strconv"
+ "strings"
+)
+
+type Server struct {
+ App *App
+}
+
+type listingPayload struct {
+ URL string `json:"url"`
+}
+
+type bulkPayload struct {
+ URLs []string `json:"urls"`
+}
+
+func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ path := s.apiPath(r.URL.Path)
+ switch {
+ case path == "/healthz":
+ writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
+ case path == "/":
+ writeJSON(w, http.StatusOK, map[string]string{"service": "monitoring-pf", "ui": "portal", "api": "go"})
+ case !strings.HasPrefix(path, "/api/v1"):
+ writeError(w, http.StatusNotFound, "not found")
+ case path == "/api/v1/access/me" && r.Method == http.MethodGet:
+ s.accessMe(w, r)
+ case path == "/api/v1/summary" && r.Method == http.MethodGet:
+ s.summary(w, r)
+ case path == "/api/v1/employees":
+ s.employees(w, r)
+ case strings.HasPrefix(path, "/api/v1/employees/"):
+ s.employeeItem(w, r, path)
+ case path == "/api/v1/projects":
+ s.projects(w, r)
+ case strings.HasPrefix(path, "/api/v1/projects/"):
+ s.projectItem(w, r, path)
+ case strings.HasPrefix(path, "/api/v1/listings/"):
+ s.listingItem(w, r, path)
+ default:
+ writeError(w, http.StatusNotFound, "not found")
+ }
+}
+
+func (s Server) apiPath(path string) string {
+ base := s.App.Cfg.PublicBasePath
+ if base != "" && path == base {
+ return "/"
+ }
+ if base != "" && strings.HasPrefix(path, base+"/") {
+ return strings.TrimPrefix(path, base)
+ }
+ return path
+}
+
+func (s Server) accessMe(w http.ResponseWriter, r *http.Request) {
+ portalID := portalUserID(r)
+ emp, err := s.App.CurrentEmployee(r.Context(), portalID, false)
+ if err != nil {
+ writeError(w, http.StatusInternalServerError, err.Error())
+ return
+ }
+ var link *string
+ if s.App.Cfg.TGBotUsername != "" && portalID != "" {
+ v := "https://t.me/" + s.App.Cfg.TGBotUsername + "?start=" + portalID
+ link = &v
+ }
+ var command *string
+ if portalID != "" {
+ v := "/start " + portalID
+ command = &v
+ }
+ writeJSON(w, http.StatusOK, map[string]any{
+ "is_admin": isAdmin(r),
+ "portal_user_id": nullablePlain(portalID),
+ "telegram_linked": emp != nil && emp.TGChatID != nil && *emp.TGChatID != "",
+ "employee": emp,
+ "telegram_bot_username": nullablePlain(s.App.Cfg.TGBotUsername),
+ "telegram_start_command": command,
+ "telegram_start_link": link,
+ })
+}
+
+func (s Server) summary(w http.ResponseWriter, r *http.Request) {
+ emp, err := s.App.CurrentEmployee(r.Context(), portalUserID(r), false)
+ if err != nil {
+ writeError(w, http.StatusInternalServerError, err.Error())
+ return
+ }
+ if emp != nil && emp.TGChatID == nil {
+ emp = nil
+ }
+ out, err := s.App.Summary(r.Context(), emp)
+ if err != nil {
+ writeError(w, http.StatusInternalServerError, err.Error())
+ return
+ }
+ writeJSON(w, http.StatusOK, out)
+}
+
+func (s Server) employees(w http.ResponseWriter, r *http.Request) {
+ switch r.Method {
+ case http.MethodGet:
+ emp, err := s.App.CurrentEmployee(r.Context(), portalUserID(r), false)
+ if err != nil {
+ writeError(w, http.StatusInternalServerError, err.Error())
+ return
+ }
+ items, err := s.App.ListEmployees(r.Context(), isAdmin(r), emp)
+ if err != nil {
+ writeError(w, http.StatusInternalServerError, err.Error())
+ return
+ }
+ writeJSON(w, http.StatusOK, items)
+ case http.MethodPost:
+ if !isAdmin(r) {
+ writeError(w, http.StatusNotFound, "not found")
+ return
+ }
+ var payload EmployeePayload
+ if !decodeJSON(w, r, &payload) {
+ return
+ }
+ emp, err := s.App.CreateEmployee(r.Context(), payload)
+ if err != nil {
+ writeError(w, http.StatusBadRequest, err.Error())
+ return
+ }
+ writeJSON(w, http.StatusCreated, emp)
+ default:
+ writeError(w, http.StatusMethodNotAllowed, "method not allowed")
+ }
+}
+
+func (s Server) employeeItem(w http.ResponseWriter, r *http.Request, path string) {
+ id, ok := pathID(w, strings.TrimPrefix(path, "/api/v1/employees/"))
+ if !ok {
+ return
+ }
+ if !isAdmin(r) {
+ writeError(w, http.StatusNotFound, "not found")
+ return
+ }
+ switch r.Method {
+ case http.MethodPatch:
+ var payload EmployeePayload
+ if !decodeJSON(w, r, &payload) {
+ return
+ }
+ emp, err := s.App.UpdateEmployee(r.Context(), id, payload)
+ if err != nil {
+ status := http.StatusBadRequest
+ if errors.Is(err, ErrNotFound) {
+ status = http.StatusNotFound
+ }
+ writeError(w, status, err.Error())
+ return
+ }
+ writeJSON(w, http.StatusOK, emp)
+ case http.MethodDelete:
+ err := s.App.DeleteEmployee(r.Context(), id)
+ if err != nil {
+ status := http.StatusBadRequest
+ if errors.Is(err, ErrNotFound) {
+ status = http.StatusNotFound
+ }
+ writeError(w, status, err.Error())
+ return
+ }
+ w.WriteHeader(http.StatusNoContent)
+ default:
+ writeError(w, http.StatusMethodNotAllowed, "method not allowed")
+ }
+}
+
+func (s Server) projects(w http.ResponseWriter, r *http.Request) {
+ emp, ok := s.requireEmployee(w, r)
+ if !ok {
+ return
+ }
+ switch r.Method {
+ case http.MethodGet:
+ items, err := s.App.ListProjects(r.Context(), emp.ID)
+ if err != nil {
+ writeError(w, http.StatusInternalServerError, err.Error())
+ return
+ }
+ writeJSON(w, http.StatusOK, items)
+ case http.MethodPost:
+ var payload ProjectPayload
+ if !decodeJSON(w, r, &payload) {
+ return
+ }
+ project, err := s.App.CreateProject(r.Context(), emp.ID, payload)
+ if err != nil {
+ writeError(w, http.StatusBadRequest, err.Error())
+ return
+ }
+ writeJSON(w, http.StatusCreated, project)
+ default:
+ writeError(w, http.StatusMethodNotAllowed, "method not allowed")
+ }
+}
+
+func (s Server) projectItem(w http.ResponseWriter, r *http.Request, path string) {
+ emp, ok := s.requireEmployee(w, r)
+ if !ok {
+ return
+ }
+ rest := strings.TrimPrefix(path, "/api/v1/projects/")
+ parts := strings.Split(strings.Trim(rest, "/"), "/")
+ if len(parts) == 0 {
+ writeError(w, http.StatusNotFound, "not found")
+ return
+ }
+ projectID, ok := pathID(w, parts[0])
+ if !ok {
+ return
+ }
+ if len(parts) == 1 {
+ s.projectCRUD(w, r, emp.ID, projectID)
+ return
+ }
+ switch {
+ case len(parts) == 2 && parts[1] == "check" && r.Method == http.MethodPost:
+ if _, err := s.App.ProjectByID(r.Context(), emp.ID, projectID, false); err != nil {
+ writeError(w, http.StatusNotFound, "project not found")
+ return
+ }
+ changes, err := s.App.Worker.CheckProject(r.Context(), projectID)
+ if err != nil {
+ writeError(w, http.StatusBadRequest, err.Error())
+ return
+ }
+ writeJSON(w, http.StatusOK, map[string]int{"changes": changes})
+ case len(parts) == 2 && parts[1] == "suggest" && r.Method == http.MethodGet:
+ if _, err := s.App.ProjectByID(r.Context(), emp.ID, projectID, false); err != nil {
+ writeError(w, http.StatusNotFound, "project not found")
+ return
+ }
+ out, err := s.App.Worker.Suggest(r.Context(), projectID)
+ if err != nil {
+ writeError(w, http.StatusBadRequest, err.Error())
+ return
+ }
+ writeJSON(w, http.StatusOK, out)
+ case len(parts) == 2 && parts[1] == "listings" && r.Method == http.MethodPost:
+ s.addListing(w, r, emp.ID, projectID)
+ case len(parts) == 3 && parts[1] == "listings" && parts[2] == "bulk" && r.Method == http.MethodPost:
+ s.addListings(w, r, emp.ID, projectID)
+ default:
+ writeError(w, http.StatusNotFound, "not found")
+ }
+}
+
+func (s Server) projectCRUD(w http.ResponseWriter, r *http.Request, ownerID, projectID int64) {
+ switch r.Method {
+ case http.MethodGet:
+ project, err := s.App.ProjectByID(r.Context(), ownerID, projectID, true)
+ if err != nil {
+ writeError(w, http.StatusNotFound, "project not found")
+ return
+ }
+ writeJSON(w, http.StatusOK, project)
+ case http.MethodPatch:
+ var payload ProjectPayload
+ if !decodeJSON(w, r, &payload) {
+ return
+ }
+ project, err := s.App.UpdateProject(r.Context(), ownerID, projectID, payload)
+ if err != nil {
+ status := http.StatusBadRequest
+ if errors.Is(err, ErrNotFound) {
+ status = http.StatusNotFound
+ }
+ writeError(w, status, err.Error())
+ return
+ }
+ writeJSON(w, http.StatusOK, project)
+ case http.MethodDelete:
+ if err := s.App.DeleteProject(r.Context(), ownerID, projectID); err != nil {
+ writeError(w, http.StatusNotFound, "project not found")
+ return
+ }
+ w.WriteHeader(http.StatusNoContent)
+ default:
+ writeError(w, http.StatusMethodNotAllowed, "method not allowed")
+ }
+}
+
+func (s Server) addListing(w http.ResponseWriter, r *http.Request, ownerID, projectID int64) {
+ if _, err := s.App.ProjectByID(r.Context(), ownerID, projectID, false); err != nil {
+ writeError(w, http.StatusNotFound, "project not found")
+ return
+ }
+ var payload listingPayload
+ if !decodeJSON(w, r, &payload) {
+ return
+ }
+ id, err := s.App.Worker.AddListing(r.Context(), projectID, payload.URL)
+ if err != nil {
+ writeError(w, http.StatusBadRequest, err.Error())
+ return
+ }
+ listing, err := s.App.ListingByID(r.Context(), id, true)
+ if err != nil {
+ writeError(w, http.StatusInternalServerError, err.Error())
+ return
+ }
+ writeJSON(w, http.StatusCreated, listing)
+}
+
+func (s Server) addListings(w http.ResponseWriter, r *http.Request, ownerID, projectID int64) {
+ if _, err := s.App.ProjectByID(r.Context(), ownerID, projectID, false); err != nil {
+ writeError(w, http.StatusNotFound, "project not found")
+ return
+ }
+ var payload bulkPayload
+ if !decodeJSON(w, r, &payload) {
+ return
+ }
+ out, err := s.App.Worker.AddListings(r.Context(), projectID, payload.URLs)
+ if err != nil {
+ writeError(w, http.StatusBadRequest, err.Error())
+ return
+ }
+ writeJSON(w, http.StatusOK, out)
+}
+
+func (s Server) listingItem(w http.ResponseWriter, r *http.Request, path string) {
+ emp, ok := s.requireEmployee(w, r)
+ if !ok {
+ return
+ }
+ id, ok := pathID(w, strings.TrimPrefix(path, "/api/v1/listings/"))
+ if !ok {
+ return
+ }
+ if r.Method != http.MethodDelete {
+ writeError(w, http.StatusMethodNotAllowed, "method not allowed")
+ return
+ }
+ if err := s.App.DeleteListing(r.Context(), emp.ID, id); err != nil {
+ writeError(w, http.StatusNotFound, "listing not found")
+ return
+ }
+ w.WriteHeader(http.StatusNoContent)
+}
+
+func (s Server) requireEmployee(w http.ResponseWriter, r *http.Request) (*Employee, bool) {
+ emp, err := s.App.CurrentEmployee(r.Context(), portalUserID(r), true)
+ if errors.Is(err, ErrTelegramRequired) {
+ writeError(w, http.StatusForbidden, "Сначала авторизуйтесь в Telegram-боте Monitoring PF")
+ return nil, false
+ }
+ if err != nil {
+ writeError(w, http.StatusInternalServerError, err.Error())
+ return nil, false
+ }
+ return emp, true
+}
+
+func portalUserID(r *http.Request) string {
+ return strings.TrimSpace(r.Header.Get("X-User-Id"))
+}
+
+func isAdmin(r *http.Request) bool {
+ return r.Header.Get("X-User-Is-Admin") == "1"
+}
+
+func nullablePlain(value string) *string {
+ if strings.TrimSpace(value) == "" {
+ return nil
+ }
+ return &value
+}
+
+func pathID(w http.ResponseWriter, value string) (int64, bool) {
+ if strings.Contains(value, "/") {
+ writeError(w, http.StatusNotFound, "not found")
+ return 0, false
+ }
+ id, err := strconv.ParseInt(value, 10, 64)
+ if err != nil || id <= 0 {
+ writeError(w, http.StatusNotFound, "not found")
+ return 0, false
+ }
+ return id, true
+}
+
+func decodeJSON(w http.ResponseWriter, r *http.Request, out any) bool {
+ defer r.Body.Close()
+ if err := json.NewDecoder(r.Body).Decode(out); err != nil {
+ writeError(w, http.StatusBadRequest, "invalid json")
+ return false
+ }
+ return true
+}
+
+func writeJSON(w http.ResponseWriter, status int, value any) {
+ w.Header().Set("Content-Type", "application/json; charset=utf-8")
+ w.WriteHeader(status)
+ _ = json.NewEncoder(w).Encode(value)
+}
+
+func writeError(w http.ResponseWriter, status int, detail string) {
+ writeJSON(w, status, map[string]string{"detail": detail})
+}
diff --git a/internal/pf/store.go b/internal/pf/store.go
new file mode 100644
index 0000000..511ab74
--- /dev/null
+++ b/internal/pf/store.go
@@ -0,0 +1,569 @@
+package pf
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "fmt"
+)
+
+var ErrNotFound = errors.New("not found")
+var ErrTelegramRequired = errors.New("telegram required")
+
+func (a *App) CurrentEmployee(ctx context.Context, portalUserID string, required bool) (*Employee, error) {
+ if portalUserID == "" {
+ if required {
+ return nil, ErrTelegramRequired
+ }
+ return nil, nil
+ }
+ emp, err := a.EmployeeByPortalUserID(ctx, portalUserID)
+ if errors.Is(err, sql.ErrNoRows) {
+ if required {
+ return nil, ErrTelegramRequired
+ }
+ return nil, nil
+ }
+ if err != nil {
+ return nil, err
+ }
+ if emp.TGChatID == nil || *emp.TGChatID == "" {
+ if required {
+ return nil, ErrTelegramRequired
+ }
+ }
+ return emp, nil
+}
+
+func (a *App) EmployeeByPortalUserID(ctx context.Context, portalUserID string) (*Employee, error) {
+ row := a.DB.QueryRowContext(ctx, employeeSelect()+` WHERE e.portal_user_id = ?`, portalUserID)
+ return scanEmployee(row)
+}
+
+func (a *App) EmployeeByChatID(ctx context.Context, chatID string) (*Employee, error) {
+ row := a.DB.QueryRowContext(ctx, employeeSelect()+` WHERE e.tg_chat_id = ?`, chatID)
+ return scanEmployee(row)
+}
+
+func (a *App) ListEmployees(ctx context.Context, isAdmin bool, current *Employee) ([]Employee, error) {
+ if !isAdmin {
+ if current == nil {
+ return []Employee{}, nil
+ }
+ return []Employee{*current}, nil
+ }
+ rows, err := a.DB.QueryContext(ctx, employeeSelect()+` ORDER BY e.name`)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ return scanEmployees(rows)
+}
+
+type EmployeePayload struct {
+ Name string `json:"name"`
+ PortalUserID *string `json:"portal_user_id"`
+ TGUsername *string `json:"tg_username"`
+ TGChatID *string `json:"tg_chat_id"`
+}
+
+func (a *App) CreateEmployee(ctx context.Context, p EmployeePayload) (*Employee, error) {
+ name := cleanString(p.Name)
+ if name == "" {
+ return nil, fmt.Errorf("name is required")
+ }
+ username := cleanPtr(p.TGUsername)
+ if username != nil && len(*username) > 0 && (*username)[0] == '@' {
+ u := (*username)[1:]
+ username = &u
+ }
+ res, err := a.DB.ExecContext(ctx, `
+ INSERT INTO employees (name, portal_user_id, tg_chat_id, tg_username, created_at)
+ VALUES (?, ?, ?, ?, ?)`,
+ name, cleanPtr(p.PortalUserID), cleanPtr(p.TGChatID), username, dbNow(),
+ )
+ if err != nil {
+ return nil, err
+ }
+ id, _ := res.LastInsertId()
+ return a.EmployeeByID(ctx, id)
+}
+
+func (a *App) EmployeeByID(ctx context.Context, id int64) (*Employee, error) {
+ row := a.DB.QueryRowContext(ctx, employeeSelect()+` WHERE e.id = ?`, id)
+ return scanEmployee(row)
+}
+
+func (a *App) UpdateEmployee(ctx context.Context, id int64, p EmployeePayload) (*Employee, error) {
+ emp, err := a.EmployeeByID(ctx, id)
+ if err != nil {
+ return nil, ErrNotFound
+ }
+ name := cleanString(p.Name)
+ if name == "" {
+ name = emp.Name
+ }
+ username := cleanPtr(p.TGUsername)
+ if username != nil && len(*username) > 0 && (*username)[0] == '@' {
+ u := (*username)[1:]
+ username = &u
+ }
+ if _, err := a.DB.ExecContext(ctx, `
+ UPDATE employees
+ SET name = ?, portal_user_id = COALESCE(?, portal_user_id), tg_username = ?, tg_chat_id = ?
+ WHERE id = ?`,
+ name, cleanPtr(p.PortalUserID), username, cleanPtr(p.TGChatID), id,
+ ); err != nil {
+ return nil, err
+ }
+ return a.EmployeeByID(ctx, id)
+}
+
+func (a *App) DeleteEmployee(ctx context.Context, id int64) error {
+ var count int64
+ if err := a.DB.QueryRowContext(ctx, `SELECT count(*) FROM projects WHERE owner_id = ?`, id).Scan(&count); err != nil {
+ return err
+ }
+ if count > 0 {
+ return fmt.Errorf("employee has projects")
+ }
+ res, err := a.DB.ExecContext(ctx, `DELETE FROM employees WHERE id = ?`, id)
+ if err != nil {
+ return err
+ }
+ affected, _ := res.RowsAffected()
+ if affected == 0 {
+ return ErrNotFound
+ }
+ return nil
+}
+
+func (a *App) LinkTelegram(ctx context.Context, portalUserID, chatID, username, name string) (*Employee, error) {
+ if existing, err := a.EmployeeByChatID(ctx, chatID); err == nil {
+ if existing.PortalUserID != nil && *existing.PortalUserID != "" && *existing.PortalUserID != portalUserID {
+ return nil, fmt.Errorf("telegram belongs to another portal user")
+ }
+ if existing.PortalUserID == nil || *existing.PortalUserID == "" {
+ if _, err := a.DB.ExecContext(ctx, `
+ UPDATE employees SET portal_user_id = ?, tg_username = ? WHERE id = ?`,
+ portalUserID, nullIfEmpty(username), existing.ID,
+ ); err != nil {
+ return nil, err
+ }
+ }
+ return a.EmployeeByID(ctx, existing.ID)
+ }
+ if emp, err := a.EmployeeByPortalUserID(ctx, portalUserID); err == nil {
+ if emp.TGChatID != nil && *emp.TGChatID != "" && *emp.TGChatID != chatID {
+ return nil, fmt.Errorf("portal user belongs to another telegram")
+ }
+ _, err := a.DB.ExecContext(ctx, `
+ UPDATE employees SET tg_chat_id = ?, tg_username = ?, name = COALESCE(NULLIF(name, ''), ?) WHERE id = ?`,
+ chatID, nullIfEmpty(username), name, emp.ID,
+ )
+ if err != nil {
+ return nil, err
+ }
+ return a.EmployeeByID(ctx, emp.ID)
+ }
+ res, err := a.DB.ExecContext(ctx, `
+ INSERT INTO employees (name, portal_user_id, tg_chat_id, tg_username, created_at)
+ VALUES (?, ?, ?, ?, ?)`,
+ name, portalUserID, chatID, nullIfEmpty(username), dbNow(),
+ )
+ if err != nil {
+ return nil, err
+ }
+ id, _ := res.LastInsertId()
+ return a.EmployeeByID(ctx, id)
+}
+
+func employeeSelect() string {
+ return `
+ SELECT e.id, e.name, e.portal_user_id, e.tg_chat_id, e.tg_username, e.created_at,
+ (SELECT count(*) FROM projects p WHERE p.owner_id = e.id) AS projects_total
+ FROM employees e`
+}
+
+type rowScanner interface {
+ Scan(dest ...any) error
+}
+
+func scanEmployee(row rowScanner) (*Employee, error) {
+ var emp Employee
+ var portal, chat, username, created sql.NullString
+ if err := row.Scan(&emp.ID, &emp.Name, &portal, &chat, &username, &created, &emp.ProjectsTotal); err != nil {
+ return nil, err
+ }
+ emp.PortalUserID = nullableString(portal)
+ emp.TGChatID = nullableString(chat)
+ emp.TGUsername = nullableString(username)
+ emp.CreatedAt = timeOut(created)
+ return &emp, nil
+}
+
+func scanEmployees(rows *sql.Rows) ([]Employee, error) {
+ items := []Employee{}
+ for rows.Next() {
+ item, err := scanEmployee(rows)
+ if err != nil {
+ return nil, err
+ }
+ items = append(items, *item)
+ }
+ return items, rows.Err()
+}
+
+type ProjectPayload struct {
+ Title string `json:"title"`
+ DealType string `json:"deal_type"`
+ OurPrice *float64 `json:"our_price"`
+ Notes *string `json:"notes"`
+ DLDPermit *string `json:"dld_permit"`
+ Building *string `json:"building"`
+ Bedrooms *int64 `json:"bedrooms"`
+ SizeSqft *float64 `json:"size_sqft"`
+ OurURL *string `json:"our_url"`
+}
+
+func (a *App) Summary(ctx context.Context, emp *Employee) (map[string]any, error) {
+ out := map[string]any{
+ "projects_total": 0,
+ "employees_total": 0,
+ "listings_total": 0,
+ "listings_active": 0,
+ "listings_removed": 0,
+ "scrape_interval_hours": a.Cfg.ScrapeIntervalHours,
+ "bayut_enabled": false,
+ }
+ if emp == nil {
+ return out, nil
+ }
+ var projects int64
+ var listings, active, removed sql.NullInt64
+ err := a.DB.QueryRowContext(ctx, `SELECT count(*) FROM projects WHERE owner_id = ?`, emp.ID).Scan(&projects)
+ if err != nil {
+ return nil, err
+ }
+ err = a.DB.QueryRowContext(ctx, `
+ SELECT count(*),
+ sum(CASE WHEN status IN ('ACTIVE','active') THEN 1 ELSE 0 END),
+ sum(CASE WHEN status IN ('REMOVED','removed') THEN 1 ELSE 0 END)
+ FROM competitor_listings l JOIN projects p ON p.id = l.project_id
+ WHERE p.owner_id = ?`, emp.ID).Scan(&listings, &active, &removed)
+ if err != nil {
+ return nil, err
+ }
+ out["projects_total"] = projects
+ out["employees_total"] = 1
+ out["listings_total"] = nullIntValue(listings)
+ out["listings_active"] = nullIntValue(active)
+ out["listings_removed"] = nullIntValue(removed)
+ return out, nil
+}
+
+func (a *App) ListProjects(ctx context.Context, ownerID int64) ([]Project, error) {
+ rows, err := a.DB.QueryContext(ctx, projectSelect()+` WHERE p.owner_id = ? ORDER BY p.created_at DESC`, ownerID)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ items := []Project{}
+ for rows.Next() {
+ p, err := a.scanProject(rows, false)
+ if err != nil {
+ return nil, err
+ }
+ items = append(items, *p)
+ }
+ return items, rows.Err()
+}
+
+func (a *App) ProjectByID(ctx context.Context, ownerID, projectID int64, detail bool) (*Project, error) {
+ row := a.DB.QueryRowContext(ctx, projectSelect()+` WHERE p.id = ? AND p.owner_id = ?`, projectID, ownerID)
+ p, err := a.scanProject(row, detail)
+ if errors.Is(err, sql.ErrNoRows) {
+ return nil, ErrNotFound
+ }
+ if err != nil {
+ return nil, err
+ }
+ return p, nil
+}
+
+func (a *App) CreateProject(ctx context.Context, ownerID int64, p ProjectPayload) (*Project, error) {
+ title := cleanString(p.Title)
+ if title == "" {
+ return nil, fmt.Errorf("title is required")
+ }
+ deal, err := enumDealIn(p.DealType)
+ if err != nil {
+ return nil, err
+ }
+ res, err := a.DB.ExecContext(ctx, `
+ INSERT INTO projects
+ (title, deal_type, owner_id, our_price, notes, dld_permit, building, bedrooms, size_sqft, our_url, created_at)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+ title, deal, ownerID, p.OurPrice, cleanPtr(p.Notes), cleanPtr(p.DLDPermit), cleanPtr(p.Building),
+ p.Bedrooms, p.SizeSqft, cleanPtr(p.OurURL), dbNow(),
+ )
+ if err != nil {
+ return nil, err
+ }
+ id, _ := res.LastInsertId()
+ return a.ProjectByID(ctx, ownerID, id, true)
+}
+
+func (a *App) UpdateProject(ctx context.Context, ownerID, projectID int64, p ProjectPayload) (*Project, error) {
+ current, err := a.ProjectByID(ctx, ownerID, projectID, false)
+ if err != nil {
+ return nil, err
+ }
+ title := cleanString(p.Title)
+ if title == "" {
+ title = current.Title
+ }
+ deal := "SALE"
+ if current.DealType == "rent" {
+ deal = "RENT"
+ }
+ if p.DealType != "" {
+ deal, err = enumDealIn(p.DealType)
+ if err != nil {
+ return nil, err
+ }
+ }
+ _, err = a.DB.ExecContext(ctx, `
+ UPDATE projects
+ SET title = ?, deal_type = ?, our_price = ?, notes = ?, dld_permit = ?,
+ building = ?, bedrooms = ?, size_sqft = ?, our_url = ?
+ WHERE id = ? AND owner_id = ?`,
+ title, deal, p.OurPrice, cleanPtr(p.Notes), cleanPtr(p.DLDPermit), cleanPtr(p.Building),
+ p.Bedrooms, p.SizeSqft, cleanPtr(p.OurURL), projectID, ownerID,
+ )
+ if err != nil {
+ return nil, err
+ }
+ return a.ProjectByID(ctx, ownerID, projectID, true)
+}
+
+func (a *App) DeleteProject(ctx context.Context, ownerID, projectID int64) error {
+ tx, err := a.DB.BeginTx(ctx, nil)
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+ listingRows, err := tx.QueryContext(ctx, `SELECT id FROM competitor_listings WHERE project_id = ?`, projectID)
+ if err != nil {
+ return err
+ }
+ listingIDs := []int64{}
+ for listingRows.Next() {
+ var id int64
+ if err := listingRows.Scan(&id); err != nil {
+ listingRows.Close()
+ return err
+ }
+ listingIDs = append(listingIDs, id)
+ }
+ listingRows.Close()
+ for _, id := range listingIDs {
+ if _, err := tx.ExecContext(ctx, `DELETE FROM price_history WHERE listing_id = ?`, id); err != nil {
+ return err
+ }
+ }
+ if _, err := tx.ExecContext(ctx, `DELETE FROM competitor_listings WHERE project_id = ?`, projectID); err != nil {
+ return err
+ }
+ res, err := tx.ExecContext(ctx, `DELETE FROM projects WHERE id = ? AND owner_id = ?`, projectID, ownerID)
+ if err != nil {
+ return err
+ }
+ affected, _ := res.RowsAffected()
+ if affected == 0 {
+ return ErrNotFound
+ }
+ return tx.Commit()
+}
+
+func (a *App) DeleteListing(ctx context.Context, ownerID, listingID int64) error {
+ var id int64
+ err := a.DB.QueryRowContext(ctx, `
+ SELECT l.id
+ FROM competitor_listings l JOIN projects p ON p.id = l.project_id
+ WHERE l.id = ? AND p.owner_id = ?`, listingID, ownerID).Scan(&id)
+ if errors.Is(err, sql.ErrNoRows) {
+ return ErrNotFound
+ }
+ if err != nil {
+ return err
+ }
+ tx, err := a.DB.BeginTx(ctx, nil)
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+ if _, err := tx.ExecContext(ctx, `DELETE FROM price_history WHERE listing_id = ?`, id); err != nil {
+ return err
+ }
+ if _, err := tx.ExecContext(ctx, `DELETE FROM competitor_listings WHERE id = ?`, id); err != nil {
+ return err
+ }
+ return tx.Commit()
+}
+
+func (a *App) ListingByID(ctx context.Context, id int64, withHistory bool) (*Listing, error) {
+ row := a.DB.QueryRowContext(ctx, listingSelect()+` WHERE l.id = ?`, id)
+ item, err := scanListing(row, withHistory)
+ if err != nil {
+ return nil, err
+ }
+ if withHistory {
+ history, err := a.PriceHistory(ctx, id)
+ if err != nil {
+ return nil, err
+ }
+ item.PriceHistory = history
+ }
+ return item, nil
+}
+
+func projectSelect() string {
+ return `
+ SELECT p.id, p.title, p.deal_type, p.our_price, p.notes, p.dld_permit, p.building,
+ p.bedrooms, p.size_sqft, p.our_url, p.owner_id, p.created_at, p.last_checked_at,
+ (SELECT count(*) FROM competitor_listings l WHERE l.project_id = p.id),
+ (SELECT count(*) FROM competitor_listings l WHERE l.project_id = p.id AND l.status IN ('ACTIVE','active')),
+ (SELECT count(*) FROM competitor_listings l WHERE l.project_id = p.id AND l.status IN ('REMOVED','removed')),
+ (SELECT min(l.current_price) FROM competitor_listings l WHERE l.project_id = p.id AND l.status IN ('ACTIVE','active') AND l.current_price IS NOT NULL)
+ FROM projects p`
+}
+
+func (a *App) scanProject(row rowScanner, detail bool) (*Project, error) {
+ var p Project
+ var deal string
+ var price, size, minPrice sql.NullFloat64
+ var notes, permit, building, ourURL, created, checked sql.NullString
+ var bedrooms sql.NullInt64
+ if err := row.Scan(
+ &p.ID, &p.Title, &deal, &price, ¬es, &permit, &building, &bedrooms, &size, &ourURL,
+ &p.OwnerID, &created, &checked, &p.ListingsTotal, &p.ListingsActive, &p.ListingsRemoved, &minPrice,
+ ); err != nil {
+ return nil, err
+ }
+ p.DealType = enumDealOut(deal)
+ p.OurPrice = nullableFloat(price)
+ p.Notes = nullableString(notes)
+ p.DLDPermit = nullableString(permit)
+ p.Building = nullableString(building)
+ p.Bedrooms = nullableInt(bedrooms)
+ p.SizeSqft = nullableFloat(size)
+ p.OurURL = nullableString(ourURL)
+ p.CreatedAt = timeOut(created)
+ p.LastCheckedAt = timeOut(checked)
+ p.MinCompetitorPrice = nullableFloat(minPrice)
+ if owner, err := a.EmployeeByID(context.Background(), p.OwnerID); err == nil {
+ p.Owner = owner
+ }
+ if detail {
+ listings, err := a.ListingsForProject(context.Background(), p.ID, true)
+ if err != nil {
+ return nil, err
+ }
+ p.Listings = listings
+ }
+ return &p, nil
+}
+
+func (a *App) ListingsForProject(ctx context.Context, projectID int64, withHistory bool) ([]Listing, error) {
+ rows, err := a.DB.QueryContext(ctx, listingSelect()+` WHERE l.project_id = ? ORDER BY l.first_seen_at DESC`, projectID)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ items := []Listing{}
+ for rows.Next() {
+ item, err := scanListing(rows, false)
+ if err != nil {
+ return nil, err
+ }
+ if withHistory {
+ item.PriceHistory, err = a.PriceHistory(ctx, item.ID)
+ if err != nil {
+ return nil, err
+ }
+ }
+ items = append(items, *item)
+ }
+ return items, rows.Err()
+}
+
+func (a *App) PriceHistory(ctx context.Context, listingID int64) ([]PricePoint, error) {
+ rows, err := a.DB.QueryContext(ctx, `
+ SELECT id, price, recorded_at
+ FROM price_history
+ WHERE listing_id = ?
+ ORDER BY recorded_at DESC`, listingID)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ out := []PricePoint{}
+ for rows.Next() {
+ var p PricePoint
+ var price sql.NullFloat64
+ var recorded sql.NullString
+ if err := rows.Scan(&p.ID, &price, &recorded); err != nil {
+ return nil, err
+ }
+ p.Price = nullableFloat(price)
+ p.RecordedAt = timeOut(recorded)
+ out = append(out, p)
+ }
+ return out, rows.Err()
+}
+
+func listingSelect() string {
+ return `
+ SELECT l.id, l.project_id, l.source, l.external_id, l.url, l.title, l.agent_name,
+ l.agency_name, l.current_price, l.currency, l.status, l.first_seen_at, l.last_seen_at
+ FROM competitor_listings l`
+}
+
+func scanListing(row rowScanner, _ bool) (*Listing, error) {
+ var l Listing
+ var source, status string
+ var title, agent, agency, currency, firstSeen, lastSeen sql.NullString
+ var price sql.NullFloat64
+ if err := row.Scan(
+ &l.ID, &l.ProjectID, &source, &l.ExternalID, &l.URL, &title, &agent, &agency,
+ &price, ¤cy, &status, &firstSeen, &lastSeen,
+ ); err != nil {
+ return nil, err
+ }
+ l.Source = enumSourceOut(source)
+ l.Title = nullableString(title)
+ l.AgentName = nullableString(agent)
+ l.AgencyName = nullableString(agency)
+ l.CurrentPrice = nullableFloat(price)
+ l.Currency = nullableString(currency)
+ l.Status = enumStatusOut(status)
+ l.FirstSeenAt = timeOut(firstSeen)
+ l.LastSeenAt = timeOut(lastSeen)
+ return &l, nil
+}
+
+func nullIfEmpty(value string) *string {
+ value = cleanString(value)
+ if value == "" {
+ return nil
+ }
+ return &value
+}
+
+func nullIntValue(value sql.NullInt64) int64 {
+ if !value.Valid {
+ return 0
+ }
+ return value.Int64
+}
diff --git a/internal/pf/telegram.go b/internal/pf/telegram.go
new file mode 100644
index 0000000..4e3f2b6
--- /dev/null
+++ b/internal/pf/telegram.go
@@ -0,0 +1,129 @@
+package pf
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net/http"
+ "net/url"
+ "strings"
+ "time"
+)
+
+type Telegram struct {
+ token string
+ client *http.Client
+}
+
+type tgResponse[T any] struct {
+ OK bool `json:"ok"`
+ Description string `json:"description"`
+ Result T `json:"result"`
+}
+
+type TGUpdate struct {
+ UpdateID int64 `json:"update_id"`
+ Message *TGMessage `json:"message"`
+}
+
+type TGMessage struct {
+ MessageID int64 `json:"message_id"`
+ Text string `json:"text"`
+ Chat TGChat `json:"chat"`
+ From *TGUser `json:"from"`
+}
+
+type TGChat struct {
+ ID int64 `json:"id"`
+}
+
+type TGUser struct {
+ ID int64 `json:"id"`
+ Username string `json:"username"`
+ FirstName string `json:"first_name"`
+ LastName string `json:"last_name"`
+}
+
+func NewTelegram(token string) *Telegram {
+ return &Telegram{token: token, client: &http.Client{Timeout: 35 * time.Second}}
+}
+
+func (t *Telegram) Enabled() bool {
+ return strings.TrimSpace(t.token) != ""
+}
+
+func (t *Telegram) SendMessage(ctx context.Context, chatID string, text string) error {
+ if !t.Enabled() {
+ return nil
+ }
+ payload := map[string]any{
+ "chat_id": chatID,
+ "text": text,
+ "parse_mode": "HTML",
+ "disable_web_page_preview": false,
+ }
+ body, _ := json.Marshal(payload)
+ req, err := http.NewRequestWithContext(ctx, http.MethodPost, t.apiURL("sendMessage"), bytes.NewReader(body))
+ if err != nil {
+ return err
+ }
+ req.Header.Set("Content-Type", "application/json")
+ resp, err := t.client.Do(req)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ var out tgResponse[json.RawMessage]
+ if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
+ return err
+ }
+ if !out.OK {
+ return errors.New(out.Description)
+ }
+ return nil
+}
+
+func (t *Telegram) GetUpdates(ctx context.Context, offset int64) ([]TGUpdate, error) {
+ if !t.Enabled() {
+ return nil, fmt.Errorf("TG_BOT_TOKEN не задан в k8s/secrets.yaml")
+ }
+ values := url.Values{}
+ values.Set("timeout", "25")
+ if offset > 0 {
+ values.Set("offset", fmt.Sprintf("%d", offset))
+ }
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, t.apiURL("getUpdates")+"?"+values.Encode(), nil)
+ if err != nil {
+ return nil, err
+ }
+ resp, err := t.client.Do(req)
+ if err != nil {
+ return nil, err
+ }
+ defer resp.Body.Close()
+ var out tgResponse[[]TGUpdate]
+ if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
+ return nil, err
+ }
+ if !out.OK {
+ return nil, errors.New(out.Description)
+ }
+ return out.Result, nil
+}
+
+func (t *Telegram) apiURL(method string) string {
+ return "https://api.telegram.org/bot" + t.token + "/" + method
+}
+
+func (u TGUser) FullName() string {
+ name := strings.TrimSpace(strings.TrimSpace(u.FirstName) + " " + strings.TrimSpace(u.LastName))
+ if name != "" {
+ return name
+ }
+ if u.Username != "" {
+ return u.Username
+ }
+ return fmt.Sprintf("user_%d", u.ID)
+}
diff --git a/internal/pf/worker.go b/internal/pf/worker.go
new file mode 100644
index 0000000..b186faf
--- /dev/null
+++ b/internal/pf/worker.go
@@ -0,0 +1,134 @@
+package pf
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "os/exec"
+ "time"
+)
+
+type Worker struct {
+ python string
+ module string
+}
+
+type workerError struct {
+ Error string `json:"error"`
+}
+
+type workerListing struct {
+ ListingID int64 `json:"listing_id"`
+ Error string `json:"error"`
+}
+
+type BulkResult struct {
+ Added int `json:"added"`
+ Skipped int `json:"skipped"`
+ Errors []string `json:"errors"`
+}
+
+type CheckResult struct {
+ Changes int `json:"changes"`
+}
+
+type Suggestion struct {
+ Source string `json:"source"`
+ ExternalID string `json:"external_id"`
+ URL string `json:"url"`
+ Title *string `json:"title"`
+ Price *float64 `json:"price"`
+ Currency *string `json:"currency"`
+ PermitNumber *string `json:"permit_number"`
+ AgentName *string `json:"agent_name"`
+ AgencyName *string `json:"agency_name"`
+ IsActive bool `json:"is_active"`
+}
+
+type SuggestionsResponse struct {
+ OurPermit *string `json:"our_permit"`
+ BayutEnabled bool `json:"bayut_enabled"`
+ Suggestions struct {
+ PropertyFinder []Suggestion `json:"propertyfinder"`
+ Bayut []Suggestion `json:"bayut"`
+ } `json:"suggestions"`
+}
+
+func NewWorker(cfg Config) *Worker {
+ return &Worker{python: cfg.WorkerPython, module: cfg.WorkerModule}
+}
+
+func (w *Worker) AddListing(ctx context.Context, projectID int64, url string) (int64, error) {
+ var out workerListing
+ err := w.call(ctx, "add-listing", map[string]any{"project_id": projectID, "url": url}, &out)
+ if err != nil {
+ return 0, err
+ }
+ if out.Error != "" {
+ return 0, errors.New(out.Error)
+ }
+ return out.ListingID, nil
+}
+
+func (w *Worker) AddListings(ctx context.Context, projectID int64, urls []string) (*BulkResult, error) {
+ var out BulkResult
+ if err := w.call(ctx, "add-listings", map[string]any{"project_id": projectID, "urls": urls}, &out); err != nil {
+ return nil, err
+ }
+ return &out, nil
+}
+
+func (w *Worker) CheckProject(ctx context.Context, projectID int64) (int, error) {
+ var out CheckResult
+ if err := w.call(ctx, "check-project", map[string]any{"project_id": projectID}, &out); err != nil {
+ return 0, err
+ }
+ return out.Changes, nil
+}
+
+func (w *Worker) CheckAll(ctx context.Context) (map[string]int, error) {
+ var out map[string]int
+ if err := w.call(ctx, "check-all", map[string]any{}, &out); err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
+func (w *Worker) Suggest(ctx context.Context, projectID int64) (*SuggestionsResponse, error) {
+ var out SuggestionsResponse
+ if err := w.call(ctx, "suggest", map[string]any{"project_id": projectID}, &out); err != nil {
+ return nil, err
+ }
+ return &out, nil
+}
+
+func (w *Worker) call(ctx context.Context, command string, payload any, out any) error {
+ ctx, cancel := context.WithTimeout(ctx, 15*time.Minute)
+ defer cancel()
+
+ body, err := json.Marshal(payload)
+ if err != nil {
+ return err
+ }
+ cmd := exec.CommandContext(ctx, w.python, "-m", w.module, command)
+ cmd.Stdin = bytes.NewReader(body)
+ var stdout, stderr bytes.Buffer
+ cmd.Stdout = &stdout
+ cmd.Stderr = &stderr
+ if err := cmd.Run(); err != nil {
+ var apiErr workerError
+ if json.Unmarshal(stdout.Bytes(), &apiErr) == nil && apiErr.Error != "" {
+ return errors.New(apiErr.Error)
+ }
+ if stderr.Len() > 0 {
+ return errors.New(stderr.String())
+ }
+ return err
+ }
+ if err := json.Unmarshal(stdout.Bytes(), out); err != nil {
+ return fmt.Errorf("worker json decode failed: %w", err)
+ }
+ return nil
+}
diff --git a/k8s/server-deployment.yaml b/k8s/server-deployment.yaml
index d9ffe50..fe366a5 100644
--- a/k8s/server-deployment.yaml
+++ b/k8s/server-deployment.yaml
@@ -31,7 +31,7 @@ spec:
containers:
- name: web
image: localhost:30300/admin/monitoring-pf-server:latest
- command: ["python", "run_web.py"]
+ command: ["/usr/local/bin/monitoring-pf-server"]
ports:
- containerPort: 8000
envFrom:
@@ -67,7 +67,7 @@ spec:
memory: 768Mi
- name: bot
image: localhost:30300/admin/monitoring-pf-server:latest
- command: ["python", "-m", "app.bot"]
+ command: ["/usr/local/bin/monitoring-pf-bot"]
envFrom:
- configMapRef:
name: monitoring-pf-config
@@ -85,7 +85,7 @@ spec:
memory: 384Mi
- name: scheduler
image: localhost:30300/admin/monitoring-pf-server:latest
- command: ["python", "-m", "app.scheduler"]
+ command: ["/usr/local/bin/monitoring-pf-scheduler"]
envFrom:
- configMapRef:
name: monitoring-pf-config
diff --git a/requirements.txt b/requirements.txt
index 7cb0320..9689c4b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,11 +1,7 @@
-fastapi==0.115.6
-uvicorn[standard]==0.32.1
sqlalchemy==2.0.36
httpx==0.28.1
brotli==1.2.0 # lets httpx decode Content-Encoding: br (Bayut serves Brotli)
beautifulsoup4==4.12.3
lxml==5.3.0
-apscheduler==3.11.0
-python-telegram-bot==21.9
pydantic==2.10.4
pydantic-settings==2.7.0
diff --git a/run_web.py b/run_web.py
deleted file mode 100644
index 716d105..0000000
--- a/run_web.py
+++ /dev/null
@@ -1,13 +0,0 @@
-"""Convenience launcher for the web UI: `python run_web.py`."""
-
-import uvicorn
-
-from app.config import settings
-
-if __name__ == "__main__":
- uvicorn.run(
- "app.web:app",
- host=settings.web_host,
- port=settings.web_port,
- reload=False,
- )