Files
monitoring-pf/app/services/monitor.py
2026-06-04 14:55:41 +03:00

355 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Core monitoring logic.
Per project, for every CompetitorListing we already track:
1. Re-fetch the listing's URL.
2. Detect:
- Price change → 📈📉 notify
- URL returns 404 / removed → ❌ notify, mark removed
- Reappeared after removal → ♻️ notify
3. Snapshot every price into PriceHistory.
Adding new competitors is done via the web UI (user pastes URLs) — not here.
"""
from __future__ import annotations
import logging
import re
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from sqlalchemy.orm import Session
from app.db import SessionLocal
from app.models import (
CompetitorListing,
Employee,
ListingStatus,
PriceHistory,
Project,
Source,
)
from app.scrapers import BayutScraper, PropertyFinderScraper, ScrapedListing
from app.services.notifier import notify_admin, send_message
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Shared scraper instances: created once at import time and reused by every
# function in this module.
PF = PropertyFinderScraper()
BAYUT = BayutScraper()
# Same-building suggestions beyond exact permit matches are a browse heuristic —
# cap how many we show so the page stays usable.
# Applied in suggest_similar only on the permit-enrichment path.
_SUGGEST_OTHERS_LIMIT = 30
# Bayut moved to fully client-side rendering (no __NEXT_DATA__, Algolia keys
# hidden), so it can't be scraped over plain HTTP — disabled until we add a
# headless-browser fetcher. Flip to True once that exists.
# While False, add_competitor_url rejects Bayut URLs and suggest_similar skips
# the Bayut search.
BAYUT_ENABLED = False
def _scraper_for(source: Source):
    """Pick the shared scraper instance for ``source``.

    PropertyFinder maps to ``PF``; every other value falls through to ``BAYUT``.
    """
    if source == Source.PROPERTYFINDER:
        return PF
    return BAYUT
def detect_source_from_url(url: str) -> Source | None:
    """Guess which portal a URL belongs to from substrings of the URL.

    The check is case-insensitive and looks for ``propertyfinder`` first, then
    ``bayut.com``. A missing/empty URL, or one that mentions neither portal,
    yields ``None``.
    """
    lowered = url.lower() if url else ""
    if "propertyfinder" in lowered:
        return Source.PROPERTYFINDER
    return Source.BAYUT if "bayut.com" in lowered else None
def _fmt_price(value: float | None, currency: str | None = "AED") -> str:
if value is None:
return ""
return f"{value:,.0f} {currency or 'AED'}".replace(",", " ")
def _source_label(source: str) -> str:
return {"propertyfinder": "PropertyFinder", "bayut": "Bayut"}.get(source, source)
def add_competitor_url(db: Session, project: Project, url: str) -> tuple[CompetitorListing | None, str]:
    """Start tracking a competitor listing from a URL the user pasted.

    The URL is validated and the page fetched once. Unless the project already
    tracks the same listing, a ``CompetitorListing`` row is created and
    committed, plus an initial ``PriceHistory`` row when the page exposed a
    price.

    Returns ``(listing, "")`` on success and ``(None, message)`` on failure;
    the message is user-facing text.
    """
    cleaned = url.strip() if url else ""
    if cleaned == "":
        return None, "URL пустой"

    portal = detect_source_from_url(cleaned)
    if portal is None:
        return None, "URL должен быть с propertyfinder.ae или bayut.com"
    if not BAYUT_ENABLED and portal == Source.BAYUT:
        return None, (
            "Bayut временно не поддерживается — площадка перешла на защищённый "
            "рендеринг. Добавляйте ссылки PropertyFinder."
        )

    page = _scraper_for(portal).fetch_listing(cleaned)
    if page is None:
        return None, "Не удалось загрузить страницу — сайт мог заблокировать запрос, попробуйте позже"
    if not page.is_active:
        return None, "Страница объявления вернула 404 — ссылка битая или объявление снято"

    # Fall back to the URL itself when the scraper found no listing id.
    listing_key = page.external_id or cleaned
    duplicate_query = db.query(CompetitorListing).filter(
        CompetitorListing.project_id == project.id,
        CompetitorListing.source == portal,
        CompetitorListing.external_id == listing_key,
    )
    if duplicate_query.first():
        return None, "Это объявление уже добавлено в проект"

    stamp = datetime.utcnow()
    listing = CompetitorListing(
        project_id=project.id,
        source=portal,
        external_id=listing_key,
        url=cleaned,
        title=page.title,
        agent_name=page.agent_name,
        agency_name=page.agency_name,
        current_price=page.price,
        currency=page.currency or "AED",
        status=ListingStatus.ACTIVE,
        first_seen_at=stamp,
        last_seen_at=stamp,
    )
    db.add(listing)
    # Flush before the history row below reads listing.id.
    db.flush()
    if page.price is not None:
        db.add(PriceHistory(listing_id=listing.id, price=page.price, recorded_at=stamp))
    db.commit()
    return listing, ""
def add_competitor_urls(db: Session, project: Project, urls: list[str]) -> dict:
    """Add a batch of URLs to a project and summarise the outcome.

    Used by the suggest page's multi-select. Blank entries and exact repeats
    (after stripping whitespace) are ignored. Each remaining URL is handed to
    ``add_competitor_url`` one at a time, and each call re-fetches the page.

    Returns {'added': int, 'skipped': int, 'errors': [..]}: 'skipped' counts
    listings the project already tracks, 'errors' collects every other
    failure message.
    """
    summary = {"added": 0, "skipped": 0, "errors": []}
    processed: set[str] = set()
    for candidate in urls:
        cleaned = (candidate or "").strip()
        if cleaned and cleaned not in processed:
            processed.add(cleaned)
            _listing, problem = add_competitor_url(db, project, cleaned)
            if not problem:
                summary["added"] += 1
            elif problem == "Это объявление уже добавлено в проект":
                # The duplicate message from add_competitor_url is not an error.
                summary["skipped"] += 1
            else:
                summary["errors"].append(problem)
    return summary
def check_project(db: Session, project: Project) -> list[str]:
    """Re-scan all tracked competitor listings for one project.

    Every listing's page is re-fetched and compared with the stored state:

    - fetch failed (scraper returned ``None``): listing left untouched;
    - page gone while the listing was ACTIVE: marked REMOVED, ❌ message;
    - page back while the listing was not ACTIVE: marked ACTIVE, ♻️ message;
    - price differs from the stored one: 📈/📉 message, price updated and
      recorded in ``PriceHistory``;
    - price seen for the first time: stored and recorded, no message.

    Title, agent and agency are refreshed whenever the page provides them.
    All changes are committed once at the end, together with
    ``project.last_checked_at``.

    Returns the notification texts (they contain ``<b>`` markup), one per
    detected change.
    """
    changes: list[str] = []
    now = datetime.utcnow()
    # Iterate over a copy of the listings collection.
    for listing in list(project.listings):
        scraper = _scraper_for(listing.source)
        scraped = scraper.fetch_listing(listing.url)
        if scraped is None:
            # Network/parse failure — skip without changing state, try again next cycle.
            continue

        if not scraped.is_active:
            # Notify only on the ACTIVE -> REMOVED transition, not on every
            # later check of an already-removed listing.
            if listing.status == ListingStatus.ACTIVE:
                listing.status = ListingStatus.REMOVED
                changes.append(
                    f"❌ <b>Объявление удалено</b> — {_source_label(listing.source.value)}\n"
                    f"{listing.title or 'без названия'}\n"
                    f"Последняя цена: {_fmt_price(listing.current_price, listing.currency)}\n"
                    f"{listing.url}"
                )
            continue

        listing.last_seen_at = now
        if listing.status != ListingStatus.ACTIVE:
            listing.status = ListingStatus.ACTIVE
            changes.append(
                f"♻️ <b>Объявление снова активно</b> — {_source_label(listing.source.value)}\n"
                f"{listing.title or 'без названия'}\n{listing.url}"
            )

        # Update metadata that may have changed
        if scraped.title:
            listing.title = scraped.title
        if scraped.agent_name:
            listing.agent_name = scraped.agent_name
        if scraped.agency_name:
            listing.agency_name = scraped.agency_name

        old_price = listing.current_price
        new_price = scraped.price
        if new_price is not None and new_price != old_price:
            if old_price is not None:
                delta = new_price - old_price
                pct = (delta / old_price * 100.0) if old_price else 0.0
                arrow = "📈" if delta > 0 else "📉"
                sign = "+" if delta > 0 else ""
                # Turn only the delta's thousands separators into spaces.
                # Calling .replace() on the whole message would also strip
                # commas from the listing title and corrupt URLs containing one.
                delta_text = f"{sign}{delta:,.0f}".replace(",", " ")
                changes.append(
                    f"{arrow} <b>Цена изменилась</b> — {_source_label(listing.source.value)}\n"
                    f"{listing.title or 'без названия'}\n"
                    f"Было: {_fmt_price(old_price, listing.currency)}\n"
                    f"Стало: {_fmt_price(new_price, scraped.currency or listing.currency)} "
                    f"({delta_text} / {pct:+.1f}%)\n"
                    f"{listing.url}"
                )
            # A changed price and a first-ever price are stored the same way;
            # only the former produces a notification above.
            listing.current_price = new_price
            listing.currency = scraped.currency or listing.currency
            db.add(PriceHistory(listing_id=listing.id, price=new_price, recorded_at=now))

    project.last_checked_at = now
    db.commit()
    return changes
def _notify_owner(project: Project, changes: list[str]) -> None:
if not changes:
return
owner: Employee | None = project.owner
if not owner or not owner.tg_chat_id:
logger.warning("Project %s has no owner chat_id; skipping notification", project.id)
notify_admin(
f"⚠️ Проект <b>{project.title}</b> (#{project.id}) — {len(changes)} изменений, "
f"но у владельца не задан tg_chat_id."
)
return
header = (
f"🏠 <b>{project.title}</b>\n"
f"Тип: {project.deal_type.value} · Изменений: {len(changes)}\n"
f"——————————"
)
send_message(owner.tg_chat_id, header)
for c in changes:
send_message(owner.tg_chat_id, c)
def _reference_url(project: Project, source: Source) -> str | None:
"""A known listing URL in the project's building for the given source.
Portals (PF) scope a building search by an internal numeric location id, not
by free text — so we hand the scraper a real listing in the same building
(our own `our_url`, else an already-tracked competitor) to resolve that id.
"""
candidates: list[str] = []
if project.our_url:
candidates.append(project.our_url)
candidates.extend(l.url for l in project.listings if l.source == source)
for url in candidates:
if detect_source_from_url(url) == source:
return url
return None
def resolve_our_permit(project: Project) -> str | None:
    """Return our project's DLD permit number, or ``None`` if unknown.

    The value the user typed (whitespace-stripped) wins. Otherwise the permit
    is read through ``PF.get_permit`` from a PropertyFinder reference URL of
    the project, when one exists.
    """
    typed = (project.dld_permit or "").strip()
    if typed:
        return typed
    reference = _reference_url(project, Source.PROPERTYFINDER)
    if not reference:
        return None
    return PF.get_permit(reference)
def suggest_similar(project: Project, our_permit: str | None = None) -> dict[str, list[ScrapedListing]]:
    """Collect candidate competitor listings in this project's building.

    PropertyFinder is always searched; Bayut only while ``BAYUT_ENABLED`` is
    set. A failing search is logged and leaves that portal's list empty.
    Listings the project already tracks, and our own listing, are dropped.

    When ``our_permit`` is given, every PF candidate's permit is fetched
    (concurrently). Candidates sharing our permit — the same physical listing
    under a different broker — are put first and all kept; the same-building
    remainder is capped at ``_SUGGEST_OTHERS_LIMIT``.

    Returns {'propertyfinder': [...], 'bayut': [...]}.
    """
    results: dict[str, list[ScrapedListing]] = {"propertyfinder": [], "bayut": []}
    rooms = project.bedrooms
    deal = project.deal_type.value

    searches = [("propertyfinder", PF, Source.PROPERTYFINDER, "PF suggest failed: %s")]
    if BAYUT_ENABLED:
        searches.append(("bayut", BAYUT, Source.BAYUT, "Bayut suggest failed: %s"))
    for key, scraper, portal, failure_msg in searches:
        try:
            results[key] = scraper.search_similar(
                project.building, rooms, deal,
                location_url=_reference_url(project, portal),
            )
        except Exception as exc:
            logger.exception(failure_msg, exc)

    # Hide candidates already tracked for this project — and our own listing.
    tracked = {(item.source.value, item.external_id) for item in project.listings}
    own_url = project.our_url
    if own_url:
        own_source = detect_source_from_url(own_url)
        id_match = re.search(r"(\d+)\.html", own_url)
        if own_source and id_match:
            tracked.add((own_source.value, id_match.group(1)))
    results = {
        key: [c for c in found if (key, c.external_id) not in tracked]
        for key, found in results.items()
    }

    pf_candidates = results["propertyfinder"]
    if not (our_permit and pf_candidates):
        return results

    # Permit-first: PF can't be queried by permit and search results don't
    # carry it, so each candidate's permit is read individually.
    try:
        with ThreadPoolExecutor(max_workers=12) as pool:
            permits = list(pool.map(PF.get_permit, [c.url for c in pf_candidates]))
        for candidate, permit in zip(pf_candidates, permits):
            candidate.permit_number = permit
        same_permit: list[ScrapedListing] = []
        same_building: list[ScrapedListing] = []
        for candidate in pf_candidates:
            bucket = same_permit if candidate.permit_number == our_permit else same_building
            bucket.append(candidate)
        results["propertyfinder"] = same_permit + same_building[:_SUGGEST_OTHERS_LIMIT]
    except Exception as exc:
        logger.exception("PF permit enrichment failed: %s", exc)
    return results
def run_check_for_project(project_id: int) -> int:
    """Check one project in its own DB session and notify its owner.

    Returns the number of detected changes, or 0 when no project has this id.
    The session is closed whether or not the check succeeds.
    """
    session = SessionLocal()
    try:
        project = session.get(Project, project_id)
        if project:
            found = check_project(session, project)
            _notify_owner(project, found)
            return len(found)
        return 0
    finally:
        session.close()
def run_check_all() -> dict[int, int]:
    """Run the check for every project, one after another.

    Project ids are read in a short-lived session; each project is then
    checked through ``run_check_for_project``. A failure in one project is
    logged and does not stop the rest.

    Returns a mapping of project id to number of changes, with -1 marking a
    project whose check raised.
    """
    session = SessionLocal()
    try:
        project_ids = [row.id for row in session.query(Project).all()]
    finally:
        session.close()

    results = {}
    for project_id in project_ids:
        try:
            outcome = run_check_for_project(project_id)
        except Exception as exc:
            logger.exception("Check for project %s failed: %s", project_id, exc)
            outcome = -1
        results[project_id] = outcome
    return results