"""Internal JSON worker for Go processes. The Go API/bot/scheduler own infrastructure concerns. Python stays here for PropertyFinder/Bayut scraping and the existing SQLAlchemy monitoring logic. """ from __future__ import annotations import json import sys from typing import Any from app.db import SessionLocal, init_db from app.models import Project from app.services.monitor import ( BAYUT_ENABLED, add_competitor_url, _format_listing_added, notify_project_changes, run_check_all, run_check_for_project, sync_permit_competitors, ) def _read_payload() -> dict[str, Any]: raw = sys.stdin.read().strip() if not raw: return {} return json.loads(raw) def _write(payload: Any) -> None: json.dump(payload, sys.stdout, ensure_ascii=False) sys.stdout.write("\n") def _fail(message: str, status: int = 1) -> None: _write({"error": message}) raise SystemExit(status) def _suggestion_out(item: Any) -> dict[str, Any]: return { "source": item.source, "external_id": item.external_id, "url": item.url, "title": item.title, "price": item.price, "currency": item.currency, "permit_number": item.permit_number, "agent_name": item.agent_name, "agency_name": item.agency_name, "is_active": item.is_active, } def cmd_add_listing(payload: dict[str, Any]) -> None: project_id = int(payload.get("project_id") or 0) url = str(payload.get("url") or "") db = SessionLocal() try: project = db.get(Project, project_id) if not project: _fail("project not found") listing, err = add_competitor_url(db, project, url) if err: _fail(err) notify_project_changes(project, [_format_listing_added(project, listing, auto=False)]) _write({"listing_id": listing.id}) finally: db.close() def cmd_add_listings(payload: dict[str, Any]) -> None: project_id = int(payload.get("project_id") or 0) urls = payload.get("urls") or [] db = SessionLocal() try: project = db.get(Project, project_id) if not project: _fail("project not found") added = 0 skipped = 0 errors: list[str] = [] notifications: list[str] = [] seen: set[str] = set() for raw in urls: url = str(raw or "").strip() if not url or url in seen: continue seen.add(url) listing, err = add_competitor_url(db, project, url) if err == "Это объявление уже добавлено в проект": skipped += 1 elif err: errors.append(err) else: added += 1 notifications.append(_format_listing_added(project, listing, auto=False)) if notifications: notify_project_changes(project, notifications) _write({"added": added, "skipped": skipped, "errors": errors}) finally: db.close() def cmd_check_project(payload: dict[str, Any]) -> None: project_id = int(payload.get("project_id") or 0) _write({"changes": run_check_for_project(project_id)}) def cmd_check_all(_: dict[str, Any]) -> None: summary = run_check_all() _write({str(k): v for k, v in summary.items()}) def cmd_suggest(payload: dict[str, Any]) -> None: project_id = int(payload.get("project_id") or 0) db = SessionLocal() try: project = db.get(Project, project_id) if not project: _fail("project not found") changes, suggestions, permit = sync_permit_competitors(db, project) db.commit() if changes: notify_project_changes(project, changes) _write({ "our_permit": permit, "bayut_enabled": BAYUT_ENABLED, "suggestions": { "propertyfinder": [_suggestion_out(item) for item in suggestions["propertyfinder"]], "bayut": [_suggestion_out(item) for item in suggestions["bayut"]], }, }) finally: db.close() COMMANDS = { "add-listing": cmd_add_listing, "add-listings": cmd_add_listings, "check-project": cmd_check_project, "check-all": cmd_check_all, "suggest": cmd_suggest, } def main() -> None: if len(sys.argv) < 2 or sys.argv[1] not in COMMANDS: _fail("unknown worker command") init_db() payload = _read_payload() COMMANDS[sys.argv[1]](payload) if __name__ == "__main__": main()