"""FastAPI API for Monitoring PF.

The user interface lives in Portal. This service exposes only JSON endpoints and
trusts Portal-provided headers for admin state.
"""

from __future__ import annotations

from datetime import datetime
from typing import Any

from fastapi import Depends, FastAPI, HTTPException, Request
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session, joinedload

from app.config import settings
from app.db import get_db, init_db
from app.models import CompetitorListing, DealType, Employee, ListingStatus, Project
from app.services.monitor import (
    BAYUT_ENABLED,
    add_competitor_url,
    add_competitor_urls,
    resolve_our_permit,
    run_check_for_project,
    suggest_similar,
)

app = FastAPI(title="Monitoring PF")

# Project columns a PATCH may write directly (owner_id is handled separately
# because it has to be resolved to an existing employee first).
_PROJECT_PATCH_FIELDS = (
    "title",
    "deal_type",
    "our_price",
    "notes",
    "dld_permit",
    "building",
    "bedrooms",
    "size_sqft",
    "our_url",
)
# Free-text project columns: trimmed, with blank input collapsed to None.
_PROJECT_TEXT_FIELDS = frozenset({"title", "notes", "dld_permit", "building", "our_url"})
# Fields that ProjectCreate makes mandatory; a PATCH must not null them out.
_PROJECT_REQUIRED_FIELDS = frozenset({"title", "deal_type"})


# ---------------------------------------------------------------------------
# Request models
# ---------------------------------------------------------------------------


class EmployeeCreate(BaseModel):
    """Request body for creating an employee."""

    name: str = Field(..., min_length=1, max_length=200)
    tg_username: str | None = Field(None, max_length=200)
    tg_chat_id: str | None = Field(None, max_length=64)


class EmployeeUpdate(BaseModel):
    """Request body for a partial employee update; ``None`` fields are left untouched."""

    name: str | None = Field(None, min_length=1, max_length=200)
    tg_username: str | None = Field(None, max_length=200)
    tg_chat_id: str | None = Field(None, max_length=64)


class ProjectCreate(BaseModel):
    """Request body for creating a project."""

    title: str = Field(..., min_length=1, max_length=300)
    deal_type: DealType
    owner_id: int
    our_price: float | None = None
    notes: str | None = None
    dld_permit: str | None = Field(None, max_length=100)
    building: str | None = Field(None, max_length=300)
    bedrooms: int | None = None
    size_sqft: float | None = None
    our_url: str | None = None


class ProjectUpdate(BaseModel):
    """Request body for a partial project update; only fields present in the body apply."""

    title: str | None = Field(None, min_length=1, max_length=300)
    deal_type: DealType | None = None
    owner_id: int | None = None
    our_price: float | None = None
    notes: str | None = None
    dld_permit: str | None = Field(None, max_length=100)
    building: str | None = Field(None, max_length=300)
    bedrooms: int | None = None
    size_sqft: float | None = None
    our_url: str | None = None


class ListingCreate(BaseModel):
    """Request body for attaching one competitor listing URL to a project."""

    url: str = Field(..., min_length=1)


class ListingsBulkCreate(BaseModel):
    """Request body for attaching several competitor listing URLs at once."""

    urls: list[str] = Field(default_factory=list)


# ---------------------------------------------------------------------------
# Lifecycle and trivial endpoints
# ---------------------------------------------------------------------------


@app.on_event("startup")
def _startup() -> None:
    """Initialise the database when the application starts."""
    init_db()


@app.get("/healthz")
def healthz() -> dict[str, str]:
    """Liveness probe."""
    return {"status": "ok"}


@app.get("/")
def index() -> dict[str, str]:
    """Identify the service; the UI itself lives in Portal."""
    return {"service": "monitoring-pf", "ui": "portal"}


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _is_admin(request: Request) -> bool:
    """Return True when the ``x-user-is-admin`` header is exactly ``"1"``."""
    return request.headers.get("x-user-is-admin") == "1"


def _require_admin(request: Request) -> None:
    """Abort with 404 unless the caller is flagged as admin.

    NOTE(review): 404 rather than 403 presumably hides admin-only routes from
    regular users - confirm with Portal before changing.
    """
    if not _is_admin(request):
        raise HTTPException(status_code=404, detail="not found")


def _clean(value: str | None) -> str | None:
    """Strip surrounding whitespace; return ``None`` for ``None`` or blank input."""
    value = (value or "").strip()
    return value or None


def _required_text(value: str | None, field: str) -> str:
    """Return the trimmed ``value`` or raise 400 when nothing is left.

    ``min_length=1`` on the request models still lets whitespace-only strings
    through, so the emptiness check has to happen after trimming.
    """
    cleaned = _clean(value)
    if cleaned is None:
        raise HTTPException(400, f"{field} must not be empty")
    return cleaned


def _clean_username(value: str | None) -> str | None:
    """Normalise a Telegram username: trim, drop leading ``@``, blank becomes ``None``."""
    return (_clean(value) or "").lstrip("@") or None


def _ensure_chat_id_free(db: Session, chat_id: str, *, exclude_id: int | None = None) -> None:
    """Raise 400 when another employee already uses ``chat_id``.

    ``exclude_id`` is the employee being edited, so it never clashes with itself.
    """
    query = db.query(Employee).filter(Employee.tg_chat_id == chat_id)
    if exclude_id is not None:
        query = query.filter(Employee.id != exclude_id)
    clash = query.first()
    if clash:
        raise HTTPException(400, f"chat_id already belongs to {clash.name}")


def _dt(value: datetime | None) -> str | None:
    """Serialise a datetime as ISO-8601, passing ``None`` (or any falsy value) as ``None``."""
    return value.isoformat() if value else None


# ---------------------------------------------------------------------------
# Serializers
# ---------------------------------------------------------------------------


def _employee_out(employee: Employee) -> dict[str, Any]:
    """Serialise an employee, including the number of projects it owns."""
    return {
        "id": employee.id,
        "name": employee.name,
        "tg_chat_id": employee.tg_chat_id,
        "tg_username": employee.tg_username,
        "projects_total": len(employee.projects or []),
        "created_at": _dt(employee.created_at),
    }


def _history_out(listing: CompetitorListing) -> list[dict[str, Any]]:
    """Serialise the price-history entries of a listing."""
    return [
        {"id": h.id, "price": h.price, "recorded_at": _dt(h.recorded_at)}
        for h in listing.price_history
    ]


def _listing_out(listing: CompetitorListing, *, with_history: bool = False) -> dict[str, Any]:
    """Serialise a competitor listing, optionally with its price history."""
    out = {
        "id": listing.id,
        "project_id": listing.project_id,
        "source": listing.source.value,
        "external_id": listing.external_id,
        "url": listing.url,
        "title": listing.title,
        "agent_name": listing.agent_name,
        "agency_name": listing.agency_name,
        "current_price": listing.current_price,
        "currency": listing.currency,
        "status": listing.status.value,
        "first_seen_at": _dt(listing.first_seen_at),
        "last_seen_at": _dt(listing.last_seen_at),
    }
    if with_history:
        out["price_history"] = _history_out(listing)
    return out


def _project_stats(project: Project) -> dict[str, Any]:
    """Compute listing counters and the cheapest active competitor price.

    Every listing whose status is not ``ACTIVE`` is counted as removed.
    """
    listings = project.listings or []
    active = [item for item in listings if item.status == ListingStatus.ACTIVE]
    prices = [item.current_price for item in active if item.current_price is not None]
    return {
        "listings_total": len(listings),
        "listings_active": len(active),
        "listings_removed": len(listings) - len(active),
        "min_competitor_price": min(prices) if prices else None,
    }


def _project_out(project: Project, *, detail: bool = False) -> dict[str, Any]:
    """Serialise a project with its stats; ``detail`` adds listings with price history."""
    out = {
        "id": project.id,
        "title": project.title,
        "deal_type": project.deal_type.value,
        "our_price": project.our_price,
        "notes": project.notes,
        "dld_permit": project.dld_permit,
        "building": project.building,
        "bedrooms": project.bedrooms,
        "size_sqft": project.size_sqft,
        "our_url": project.our_url,
        "owner_id": project.owner_id,
        "owner": _employee_out(project.owner) if project.owner else None,
        "created_at": _dt(project.created_at),
        "last_checked_at": _dt(project.last_checked_at),
        **_project_stats(project),
    }
    if detail:
        # Same ``or []`` guard as _project_stats, so both treat a missing
        # collection identically.
        out["listings"] = [
            _listing_out(item, with_history=True) for item in (project.listings or [])
        ]
    return out


def _suggestion_out(item: Any) -> dict[str, Any]:
    """Serialise one suggestion object returned by ``suggest_similar``."""
    return {
        "source": item.source,
        "external_id": item.external_id,
        "url": item.url,
        "title": item.title,
        "price": item.price,
        "currency": item.currency,
        "permit_number": item.permit_number,
        "agent_name": item.agent_name,
        "agency_name": item.agency_name,
        "is_active": item.is_active,
    }


# ---------------------------------------------------------------------------
# Access and summary
# ---------------------------------------------------------------------------


@app.get("/api/v1/access/me")
def access_me(request: Request) -> dict[str, Any]:
    """Report whether the caller is flagged as admin."""
    return {"is_admin": _is_admin(request)}


@app.get("/api/v1/summary")
def summary(db: Session = Depends(get_db)) -> dict[str, Any]:
    """Return global counters plus scraper configuration."""
    # Count in the database instead of loading every row just to call len().
    listings_total = db.query(CompetitorListing).count()
    listings_active = (
        db.query(CompetitorListing)
        .filter(CompetitorListing.status == ListingStatus.ACTIVE)
        .count()
    )
    return {
        "projects_total": db.query(Project).count(),
        "employees_total": db.query(Employee).count(),
        "listings_total": listings_total,
        "listings_active": listings_active,
        "listings_removed": listings_total - listings_active,
        "scrape_interval_hours": settings.scrape_interval_hours,
        "bayut_enabled": BAYUT_ENABLED,
    }


# ---------------------------------------------------------------------------
# Employees
# ---------------------------------------------------------------------------


@app.get("/api/v1/employees")
def employees_list(db: Session = Depends(get_db)) -> list[dict[str, Any]]:
    """List employees ordered by name."""
    employees = (
        db.query(Employee)
        .options(joinedload(Employee.projects))
        .order_by(Employee.name)
        .all()
    )
    return [_employee_out(employee) for employee in employees]


@app.post("/api/v1/employees", status_code=201)
def employee_create(payload: EmployeeCreate, db: Session = Depends(get_db)) -> dict[str, Any]:
    """Create an employee.

    Responds 400 when the name is blank after trimming or when the chat id
    already belongs to another employee (the same rule ``employee_update`` applies).
    """
    name = _required_text(payload.name, "name")
    chat_id = _clean(payload.tg_chat_id)
    if chat_id:
        _ensure_chat_id_free(db, chat_id)
    employee = Employee(
        name=name,
        tg_username=_clean_username(payload.tg_username),
        tg_chat_id=chat_id,
    )
    db.add(employee)
    db.commit()
    db.refresh(employee)
    return _employee_out(employee)


@app.patch("/api/v1/employees/{employee_id}")
def employee_update(
    employee_id: int,
    payload: EmployeeUpdate,
    request: Request,
    db: Session = Depends(get_db),
) -> dict[str, Any]:
    """Partially update an employee (admin only).

    Fields that are ``None`` are left untouched; a blank ``tg_username`` or
    ``tg_chat_id`` clears the stored value. Responds 404 for an unknown employee
    and 400 for a blank name or a chat id owned by someone else.
    """
    _require_admin(request)
    employee = db.get(Employee, employee_id)
    if not employee:
        raise HTTPException(404, "employee not found")
    if payload.name is not None:
        employee.name = _required_text(payload.name, "name")
    if payload.tg_username is not None:
        employee.tg_username = _clean_username(payload.tg_username)
    if payload.tg_chat_id is not None:
        chat_id = _clean(payload.tg_chat_id)
        # Only look for a clash when the id actually changes to a non-empty value.
        if chat_id and chat_id != employee.tg_chat_id:
            _ensure_chat_id_free(db, chat_id, exclude_id=employee.id)
        employee.tg_chat_id = chat_id
    db.commit()
    db.refresh(employee)
    return _employee_out(employee)


@app.delete("/api/v1/employees/{employee_id}", status_code=204)
def employee_delete(employee_id: int, request: Request, db: Session = Depends(get_db)) -> None:
    """Delete an employee (admin only); refused with 400 while it still owns projects."""
    _require_admin(request)
    employee = db.get(Employee, employee_id)
    if not employee:
        raise HTTPException(404, "employee not found")
    if employee.projects:
        raise HTTPException(400, "employee has projects")
    db.delete(employee)
    db.commit()


# ---------------------------------------------------------------------------
# Projects
# ---------------------------------------------------------------------------


@app.get("/api/v1/projects")
def projects_list(db: Session = Depends(get_db)) -> list[dict[str, Any]]:
    """List projects, newest first, with owner and listing stats."""
    projects = (
        db.query(Project)
        .options(joinedload(Project.owner), joinedload(Project.listings))
        .order_by(Project.created_at.desc())
        .all()
    )
    return [_project_out(project) for project in projects]


@app.post("/api/v1/projects", status_code=201)
def project_create(payload: ProjectCreate, db: Session = Depends(get_db)) -> dict[str, Any]:
    """Create a project owned by an existing employee.

    Responds 404 when the owner does not exist and 400 when the title is blank
    after trimming.
    """
    owner = db.get(Employee, payload.owner_id)
    if not owner:
        raise HTTPException(404, "employee not found")
    project = Project(
        title=_required_text(payload.title, "title"),
        deal_type=payload.deal_type,
        owner_id=owner.id,
        our_price=payload.our_price,
        notes=_clean(payload.notes),
        dld_permit=_clean(payload.dld_permit),
        building=_clean(payload.building),
        bedrooms=payload.bedrooms,
        size_sqft=payload.size_sqft,
        our_url=_clean(payload.our_url),
    )
    db.add(project)
    db.commit()
    db.refresh(project)
    return _project_out(project, detail=True)


@app.get("/api/v1/projects/{project_id}")
def project_detail(project_id: int, db: Session = Depends(get_db)) -> dict[str, Any]:
    """Return one project with its owner, listings and their price history."""
    project = (
        db.query(Project)
        .options(
            joinedload(Project.owner),
            joinedload(Project.listings).joinedload(CompetitorListing.price_history),
        )
        .filter(Project.id == project_id)
        .first()
    )
    if not project:
        raise HTTPException(404, "project not found")
    return _project_out(project, detail=True)


@app.patch("/api/v1/projects/{project_id}")
def project_update(
    project_id: int,
    payload: ProjectUpdate,
    db: Session = Depends(get_db),
) -> dict[str, Any]:
    """Partially update a project; only fields present in the request body apply.

    An explicit ``null`` clears an optional field. ``title`` and ``deal_type``
    cannot be cleared: a null or blank value is rejected with 400, because the
    serializer dereferences ``deal_type`` on every read. An ``owner_id`` of
    ``null`` is ignored. Responds 404 for an unknown project or owner.
    """
    project = db.get(Project, project_id)
    if not project:
        raise HTTPException(404, "project not found")

    data = payload.model_dump(exclude_unset=True)

    owner_id = data.get("owner_id")
    if owner_id is not None and not db.get(Employee, owner_id):
        raise HTTPException(404, "employee not found")

    # Validate everything before touching the row, so a rejected request
    # leaves the loaded object unmodified.
    updates: dict[str, Any] = {}
    for field in _PROJECT_PATCH_FIELDS:
        if field not in data:
            continue
        value = data[field]
        # Clean only real text columns. deal_type is passed through untouched so
        # an enum member is never replaced by a stripped plain string (matters
        # if DealType mixes in str - its definition is not visible here).
        if field in _PROJECT_TEXT_FIELDS and isinstance(value, str):
            value = _clean(value)
        if value is None and field in _PROJECT_REQUIRED_FIELDS:
            raise HTTPException(400, f"{field} must not be empty")
        updates[field] = value

    if owner_id is not None:
        project.owner_id = owner_id
    for field, value in updates.items():
        setattr(project, field, value)
    db.commit()
    # Re-read through project_detail so the response carries eagerly loaded relations.
    return project_detail(project_id, db)


@app.delete("/api/v1/projects/{project_id}", status_code=204)
def project_delete(project_id: int, request: Request, db: Session = Depends(get_db)) -> None:
    """Delete a project (admin only)."""
    _require_admin(request)
    project = db.get(Project, project_id)
    if not project:
        raise HTTPException(404, "project not found")
    db.delete(project)
    db.commit()


@app.post("/api/v1/projects/{project_id}/check")
def project_check_now(project_id: int, db: Session = Depends(get_db)) -> dict[str, int]:
    """Run a competitor check for one project right now and report the change count."""
    if not db.get(Project, project_id):
        raise HTTPException(404, "project not found")
    # Close the request session before the check runs.
    # NOTE(review): run_check_for_project only receives the id, so it presumably
    # opens its own session - confirm.
    db.close()
    changes = run_check_for_project(project_id)
    return {"changes": changes}


# ---------------------------------------------------------------------------
# Listings
# ---------------------------------------------------------------------------


@app.post("/api/v1/projects/{project_id}/listings", status_code=201)
def listing_create(
    project_id: int,
    payload: ListingCreate,
    db: Session = Depends(get_db),
) -> dict[str, Any]:
    """Attach one competitor URL to a project; a reported error becomes a 400."""
    project = db.get(Project, project_id)
    if not project:
        raise HTTPException(404, "project not found")
    listing, err = add_competitor_url(db, project, payload.url)
    if err:
        raise HTTPException(400, err)
    return _listing_out(listing, with_history=True)


@app.post("/api/v1/projects/{project_id}/listings/bulk")
def listings_bulk(
    project_id: int,
    payload: ListingsBulkCreate,
    db: Session = Depends(get_db),
) -> dict[str, Any]:
    """Attach several competitor URLs; returns ``add_competitor_urls``' result as is."""
    project = db.get(Project, project_id)
    if not project:
        raise HTTPException(404, "project not found")
    return add_competitor_urls(db, project, payload.urls)


@app.delete("/api/v1/listings/{listing_id}", status_code=204)
def listing_delete(listing_id: int, request: Request, db: Session = Depends(get_db)) -> None:
    """Delete a competitor listing (admin only)."""
    _require_admin(request)
    listing = db.get(CompetitorListing, listing_id)
    if not listing:
        raise HTTPException(404, "listing not found")
    db.delete(listing)
    db.commit()


# ---------------------------------------------------------------------------
# Suggestions
# ---------------------------------------------------------------------------


@app.get("/api/v1/projects/{project_id}/suggest")
def project_suggest(project_id: int, db: Session = Depends(get_db)) -> dict[str, Any]:
    """Suggest similar competitor listings for a project, grouped by source."""
    project = (
        db.query(Project)
        .options(joinedload(Project.listings))
        .filter(Project.id == project_id)
        .first()
    )
    if not project:
        raise HTTPException(404, "project not found")
    permit = resolve_our_permit(project)
    suggestions = suggest_similar(project, our_permit=permit)
    return {
        "our_permit": permit,
        "bayut_enabled": BAYUT_ENABLED,
        "suggestions": {
            "propertyfinder": [_suggestion_out(item) for item in suggestions["propertyfinder"]],
            "bayut": [_suggestion_out(item) for item in suggestions["bayut"]],
        },
    }