Files
monitoring-pf/app/web.py
2026-06-04 14:55:41 +03:00

387 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""FastAPI web app — UI for managing projects, competitor listings, employees."""
from __future__ import annotations
from datetime import timedelta
from pathlib import Path
from fastapi import Depends, FastAPI, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from urllib.parse import quote
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session, joinedload
from app.auth import (
ADMIN_COOKIE,
COOKIE_MAX_AGE,
admin_configured,
admin_token,
pin_ok,
request_is_admin,
)
from app.config import BASE_DIR, settings
from app.db import get_db, init_db
from app.models import CompetitorListing, DealType, Employee, Project
from app.services.monitor import (
BAYUT_ENABLED,
add_competitor_url,
add_competitor_urls,
resolve_our_permit,
run_check_for_project,
suggest_similar,
)
app = FastAPI(title="DLD Monitor")
templates = Jinja2Templates(directory=str(Path(BASE_DIR) / "app" / "templates"))
# Timestamps are stored as naive UTC (datetime.utcnow). Show them in Moscow time
# (UTC+3) in the UI via the `msk` Jinja filter.
_MSK_OFFSET = timedelta(hours=3)
def _to_msk(dt, fmt: str = "%Y-%m-%d %H:%M"):
if dt is None:
return ""
return (dt + _MSK_OFFSET).strftime(fmt)
templates.env.filters["msk"] = _to_msk
def web_path(path: str = "/") -> str:
    """Build an application URL prefixed with the configured public base path.

    ``path`` is given a leading slash when it lacks one. Trailing slashes are
    stripped from ``settings.public_base_path``; when nothing is left, the
    normalized ``path`` is returned on its own.
    """
    prefix = settings.public_base_path.rstrip("/")
    normalized = path if path.startswith("/") else "/" + path
    if not prefix:
        return normalized
    return f"{prefix}{normalized}"
templates.env.globals["url_path"] = web_path
@app.on_event("startup")
def _startup() -> None:
    """Startup hook: call ``init_db()`` when the application starts."""
    init_db()
@app.middleware("http")
async def _admin_state(request: Request, call_next):
    """Attach admin flags to ``request.state`` before the request is handled.

    Sets ``is_admin`` and ``admin_configured`` so that routes and templates
    can read them from the request.
    """
    state = request.state
    state.is_admin = request_is_admin(request)
    state.admin_configured = admin_configured()
    response = await call_next(request)
    return response
@app.get("/healthz")
def healthz() -> dict[str, str]:
    """Health-check endpoint: always answers with a fixed ``status: ok`` payload."""
    payload = {"status": "ok"}
    return payload
def admin_required(request: Request) -> None:
    """Dependency that rejects the request with 403 unless it is in admin mode."""
    if request.state.is_admin:
        return
    raise HTTPException(403, "Действие доступно только администратору. Войдите в режим админа.")
def _safe_next(next_url: str) -> str:
"""Only allow same-site relative redirects after login."""
return next_url if next_url.startswith("/") and not next_url.startswith("//") else "/"
# --- Admin session --------------------------------------------------------
@app.get("/admin/login", response_class=HTMLResponse)
def admin_login_page(request: Request, next: str = "/", error: str | None = None):
    """Render the admin login form.

    ``next`` is sanitized with ``_safe_next`` before it is handed to the template.
    """
    context = {
        "request": request,
        "next": _safe_next(next),
        "error": error,
        "flash": None,
    }
    return templates.TemplateResponse("admin_login.html", context)
@app.post("/admin/login")
def admin_login(request: Request, pin: str = Form(...), next: str = Form("/")):
    """Check the submitted PIN and, on success, start an admin session.

    A wrong PIN re-renders the login form with status 401. A correct PIN sets
    the admin cookie and redirects (303) to the sanitized ``next`` target.
    """
    dest = _safe_next(next)
    if pin_ok(pin):
        resp = RedirectResponse(web_path(dest), status_code=303)
        resp.set_cookie(
            ADMIN_COOKIE,
            admin_token(),
            max_age=COOKIE_MAX_AGE,
            httponly=True,
            samesite="lax",
        )
        return resp
    failure_context = {"request": request, "next": dest, "error": "Неверный PIN", "flash": None}
    return templates.TemplateResponse("admin_login.html", failure_context, status_code=401)
@app.post("/admin/logout")
def admin_logout(next: str = Form("/")):
    """Drop the admin cookie and redirect (303) to the sanitized ``next`` target."""
    target = web_path(_safe_next(next))
    response = RedirectResponse(target, status_code=303)
    response.delete_cookie(ADMIN_COOKIE)
    return response
def _opt_float(s: str) -> float | None:
s = (s or "").strip()
if not s:
return None
try:
return float(s)
except ValueError:
return None
def _opt_int(s: str) -> int | None:
s = (s or "").strip()
if not s:
return None
try:
return int(s)
except ValueError:
return None
# --- Projects -------------------------------------------------------------
@app.get("/", response_class=HTMLResponse)
def projects_list(request: Request, db: Session = Depends(get_db)):
    """Render the project list, newest first, with owners and listings eager-loaded."""
    query = db.query(Project).options(
        joinedload(Project.owner),
        joinedload(Project.listings),
    )
    projects = query.order_by(Project.created_at.desc()).all()
    context = {"request": request, "projects": projects, "flash": None}
    return templates.TemplateResponse("projects_list.html", context)
@app.get("/projects/new", response_class=HTMLResponse)
def new_project_form(request: Request, db: Session = Depends(get_db)):
    """Render the new-project form with the employee list sorted by name.

    ``no_employees`` is passed to the template as a flag that is true when the
    employee list is empty.
    """
    staff = db.query(Employee).order_by(Employee.name).all()
    context = {
        "request": request,
        "employees": staff,
        "no_employees": not staff,
        "flash": None,
    }
    return templates.TemplateResponse("project_form.html", context)
@app.post("/projects/new")
def create_project(
    title: str = Form(...),
    deal_type: str = Form(...),
    owner_id: int = Form(...),
    our_price: str = Form(""),
    building: str = Form(""),
    bedrooms: str = Form(""),
    size_sqft: str = Form(""),
    our_url: str = Form(""),
    dld_permit: str = Form(""),
    notes: str = Form(""),
    db: Session = Depends(get_db),
):
    """Create a project from the submitted form and redirect (303) to its page.

    Optional text fields are stripped and stored as ``None`` when blank;
    numeric fields go through ``_opt_float`` / ``_opt_int``.

    Raises:
        HTTPException: 400 when the title is blank after stripping or when
            ``deal_type`` is rejected by ``DealType``; 404 when ``owner_id``
            matches no employee.
    """
    clean_title = title.strip()
    if not clean_title:
        # A whitespace-only title would otherwise be stored as an empty string.
        raise HTTPException(400, "Название проекта не может быть пустым")
    try:
        deal = DealType(deal_type)
    except ValueError:
        # Report bad form input as a client error instead of an unhandled 500.
        raise HTTPException(400, "Неизвестный тип сделки") from None

    owner = db.get(Employee, owner_id)
    if not owner:
        raise HTTPException(404, "Employee not found")

    project = Project(
        title=clean_title,
        deal_type=deal,
        our_price=_opt_float(our_price),
        building=building.strip() or None,
        bedrooms=_opt_int(bedrooms),
        size_sqft=_opt_float(size_sqft),
        our_url=our_url.strip() or None,
        dld_permit=dld_permit.strip() or None,
        notes=notes.strip() or None,
        owner_id=owner.id,
    )
    db.add(project)
    db.commit()
    return RedirectResponse(web_path(f"/projects/{project.id}"), status_code=303)
@app.get("/projects/{project_id}", response_class=HTMLResponse)
def project_detail(
    project_id: int,
    request: Request,
    error: str | None = None,
    message: str | None = None,
    db: Session = Depends(get_db),
):
    """Render one project's page, with its owner and listings eager-loaded.

    ``error`` and ``message`` are passed through to the template unchanged.
    Raises 404 when no project has the given id.
    """
    eager = (joinedload(Project.owner), joinedload(Project.listings))
    project = db.query(Project).options(*eager).filter(Project.id == project_id).first()
    if not project:
        raise HTTPException(404, "Project not found")
    context = {
        "request": request,
        "project": project,
        "error": error,
        "message": message,
        "flash": None,
    }
    return templates.TemplateResponse("project_detail.html", context)
@app.post("/projects/{project_id}/check")
def project_check_now(project_id: int, db: Session = Depends(get_db)):
    """Run an immediate check for one project, then redirect (303) to its page.

    Raises 404 when no project has the given id.
    """
    found = db.get(Project, project_id)
    if not found:
        raise HTTPException(404, "Project not found")
    # The request session is closed before the check starts and the check is
    # handed only the id (presumably it manages its own session; confirm).
    db.close()
    run_check_for_project(project_id)
    destination = web_path(f"/projects/{project_id}")
    return RedirectResponse(destination, status_code=303)
@app.post("/projects/{project_id}/delete")
def project_delete(project_id: int, db: Session = Depends(get_db), _: None = Depends(admin_required)):
    """Admin-only: delete a project and redirect (303) to the project list.

    Raises 404 when no project has the given id.
    """
    doomed = db.get(Project, project_id)
    if not doomed:
        raise HTTPException(404, "Project not found")
    db.delete(doomed)
    db.commit()
    home = web_path("/")
    return RedirectResponse(home, status_code=303)
# --- Competitor listings (per project) ------------------------------------
@app.post("/projects/{project_id}/listings")
def add_listing(project_id: int, url: str = Form(...), db: Session = Depends(get_db)):
    """Attach one competitor URL to a project and redirect (303) to its page.

    The outcome is reported through the ``error`` or ``message`` query
    parameter of the redirect target; both values are percent-encoded.
    Raises 404 when no project has the given id.
    """
    project = db.get(Project, project_id)
    if not project:
        raise HTTPException(404, "Project not found")
    # Only the error half of the returned pair is needed here.
    _, err = add_competitor_url(db, project, url)
    if err:
        query = f"error={quote(err)}"
    else:
        query = f"message={quote('Добавлено')}"
    return RedirectResponse(web_path(f"/projects/{project_id}?{query}"), status_code=303)
@app.post("/projects/{project_id}/listings/bulk")
def add_listings_bulk(
    project_id: int,
    urls: list[str] = Form(default=[]),
    db: Session = Depends(get_db),
):
    """Attach several competitor URLs to a project and redirect (303) to its page.

    An empty selection redirects with an ``error`` parameter. Otherwise the
    redirect carries a ``message`` summarising the added, skipped and error
    counts taken from the ``add_competitor_urls`` result. Raises 404 when no
    project has the given id.
    """
    project = db.get(Project, project_id)
    if not project:
        raise HTTPException(404, "Project not found")

    page = f"/projects/{project_id}"
    if not urls:
        nothing_selected = quote('Ничего не выбрано')
        return RedirectResponse(web_path(f"{page}?error={nothing_selected}"), status_code=303)

    result = add_competitor_urls(db, project, urls)
    summary = [f"Добавлено: {result['added']}"]
    skipped = result["skipped"]
    if skipped:
        summary.append(f"уже были: {skipped}")
    failures = result["errors"]
    if failures:
        summary.append(f"ошибок: {len(failures)}")
    msg = " · ".join(summary)
    return RedirectResponse(web_path(f"{page}?message={quote(msg)}"), status_code=303)
@app.post("/listings/{listing_id}/delete")
def listing_delete(listing_id: int, db: Session = Depends(get_db), _: None = Depends(admin_required)):
    """Admin-only: delete a competitor listing and redirect (303) to its project.

    Raises 404 when no listing has the given id.
    """
    target = db.get(CompetitorListing, listing_id)
    if not target:
        raise HTTPException(404, "Listing not found")
    # Remember the parent id before the row is deleted.
    parent_id = target.project_id
    db.delete(target)
    db.commit()
    back_to_project = web_path(f"/projects/{parent_id}")
    return RedirectResponse(back_to_project, status_code=303)
@app.get("/projects/{project_id}/suggest", response_class=HTMLResponse)
def project_suggest(project_id: int, request: Request, db: Session = Depends(get_db)):
    """Render the suggestions page for one project.

    The project is loaded with its listings; the result of
    ``resolve_our_permit`` is passed both to ``suggest_similar`` and to the
    template. Raises 404 when no project has the given id.
    """
    query = db.query(Project).options(joinedload(Project.listings))
    project = query.filter(Project.id == project_id).first()
    if not project:
        raise HTTPException(404, "Project not found")

    our_permit = resolve_our_permit(project)
    context = {
        "request": request,
        "project": project,
        "suggestions": suggest_similar(project, our_permit=our_permit),
        "our_permit": our_permit,
        "bayut_enabled": BAYUT_ENABLED,
        "flash": None,
    }
    return templates.TemplateResponse("suggest.html", context)
# --- Employees ------------------------------------------------------------
@app.get("/employees", response_class=HTMLResponse)
def employees_page(request: Request, db: Session = Depends(get_db)):
    """Render the employees page, sorted by name, with projects eager-loaded."""
    query = db.query(Employee).options(joinedload(Employee.projects))
    staff = query.order_by(Employee.name).all()
    context = {"request": request, "employees": staff, "flash": None}
    return templates.TemplateResponse("employees.html", context)
@app.post("/employees")
def employee_create(
    name: str = Form(...),
    tg_username: str = Form(""),
    db: Session = Depends(get_db),
):
    """Create an employee and redirect (303) to the employees page.

    The Telegram username is stored without surrounding whitespace or leading
    ``@`` characters; a blank value is stored as ``None``.

    Raises:
        HTTPException: 400 when the name is blank after stripping.
    """
    clean_name = name.strip()
    if not clean_name:
        # A whitespace-only name would otherwise be stored as an empty string.
        raise HTTPException(400, "Имя сотрудника не может быть пустым")
    handle = tg_username.strip().lstrip("@") or None
    db.add(Employee(name=clean_name, tg_username=handle))
    db.commit()
    return RedirectResponse(web_path("/employees"), status_code=303)
@app.post("/employees/{employee_id}/update")
def employee_update(
    employee_id: int,
    name: str = Form(...),
    tg_username: str = Form(""),
    tg_chat_id: str = Form(""),
    db: Session = Depends(get_db),
    _: None = Depends(admin_required),
):
    """Admin-only: update an employee and redirect (303) to the employees page.

    The Telegram username is stored without surrounding whitespace or leading
    ``@`` characters. A blank username or chat id is stored as ``None``.
    All validation runs before the entity is modified, so a rejected request
    leaves no pending changes in the session.

    Raises:
        HTTPException: 404 when the employee does not exist; 400 when the
            name is blank after stripping or when the new chat id is already
            assigned to another employee.
    """
    e = db.get(Employee, employee_id)
    if not e:
        raise HTTPException(404, "Employee not found")

    clean_name = name.strip()
    if not clean_name:
        raise HTTPException(400, "Имя сотрудника не может быть пустым")

    new_chat_id = tg_chat_id.strip() or None
    if new_chat_id and new_chat_id != e.tg_chat_id:
        clash = (
            db.query(Employee)
            .filter(Employee.tg_chat_id == new_chat_id, Employee.id != e.id)
            .first()
        )
        if clash:
            raise HTTPException(400, f"chat_id {new_chat_id} уже привязан к сотруднику «{clash.name}»")

    # Apply the changes only after every check has passed.
    e.name = clean_name
    e.tg_username = tg_username.strip().lstrip("@") or None
    e.tg_chat_id = new_chat_id
    db.commit()
    return RedirectResponse(web_path("/employees"), status_code=303)
@app.post("/employees/{employee_id}/delete")
def employee_delete(employee_id: int, db: Session = Depends(get_db), _: None = Depends(admin_required)):
    """Admin-only: delete an employee who has no projects, then redirect (303).

    Raises 404 when the employee does not exist and 400 while projects are
    still linked to them.
    """
    employee = db.get(Employee, employee_id)
    if not employee:
        raise HTTPException(404, "Employee not found")
    if employee.projects:
        raise HTTPException(400, "Сотрудник связан с проектами — сначала переназначьте или удалите проекты")
    db.delete(employee)
    db.commit()
    employees_url = web_path("/employees")
    return RedirectResponse(employees_url, status_code=303)