"""Internal JSON worker for Go processes. The Go API/bot/scheduler own infrastructure concerns. Python stays here for PropertyFinder/Bayut scraping and the existing SQLAlchemy monitoring logic. """ from __future__ import annotations import json import sys from typing import Any from app.db import SessionLocal, init_db from app.models import Project from app.services.monitor import ( BAYUT_ENABLED, add_competitor_url, add_competitor_urls, resolve_our_permit, run_check_all, run_check_for_project, suggest_similar, ) def _read_payload() -> dict[str, Any]: raw = sys.stdin.read().strip() if not raw: return {} return json.loads(raw) def _write(payload: Any) -> None: json.dump(payload, sys.stdout, ensure_ascii=False) sys.stdout.write("\n") def _fail(message: str, status: int = 1) -> None: _write({"error": message}) raise SystemExit(status) def _suggestion_out(item: Any) -> dict[str, Any]: return { "source": item.source, "external_id": item.external_id, "url": item.url, "title": item.title, "price": item.price, "currency": item.currency, "permit_number": item.permit_number, "agent_name": item.agent_name, "agency_name": item.agency_name, "is_active": item.is_active, } def cmd_add_listing(payload: dict[str, Any]) -> None: project_id = int(payload.get("project_id") or 0) url = str(payload.get("url") or "") db = SessionLocal() try: project = db.get(Project, project_id) if not project: _fail("project not found") listing, err = add_competitor_url(db, project, url) if err: _fail(err) _write({"listing_id": listing.id}) finally: db.close() def cmd_add_listings(payload: dict[str, Any]) -> None: project_id = int(payload.get("project_id") or 0) urls = payload.get("urls") or [] db = SessionLocal() try: project = db.get(Project, project_id) if not project: _fail("project not found") _write(add_competitor_urls(db, project, urls)) finally: db.close() def cmd_check_project(payload: dict[str, Any]) -> None: project_id = int(payload.get("project_id") or 0) _write({"changes": run_check_for_project(project_id)}) def cmd_check_all(_: dict[str, Any]) -> None: summary = run_check_all() _write({str(k): v for k, v in summary.items()}) def cmd_suggest(payload: dict[str, Any]) -> None: project_id = int(payload.get("project_id") or 0) db = SessionLocal() try: project = db.get(Project, project_id) if not project: _fail("project not found") permit = resolve_our_permit(project) suggestions = suggest_similar(project, our_permit=permit) _write({ "our_permit": permit, "bayut_enabled": BAYUT_ENABLED, "suggestions": { "propertyfinder": [_suggestion_out(item) for item in suggestions["propertyfinder"]], "bayut": [_suggestion_out(item) for item in suggestions["bayut"]], }, }) finally: db.close() COMMANDS = { "add-listing": cmd_add_listing, "add-listings": cmd_add_listings, "check-project": cmd_check_project, "check-all": cmd_check_all, "suggest": cmd_suggest, } def main() -> None: if len(sys.argv) < 2 or sys.argv[1] not in COMMANDS: _fail("unknown worker command") init_db() payload = _read_payload() COMMANDS[sys.argv[1]](payload) if __name__ == "__main__": main()