Files
monitoring-pf/app/services/monitor.py
Grendgi 6966e6810c
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 35s
Auto-sync permit competitors in monitoring PF
2026-06-05 12:09:51 +03:00

510 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Core monitoring logic.
Per project, for every CompetitorListing we already track:
1. Re-fetch the listing's URL.
2. Detect:
- Price change → 📈📉 notify
- URL returns 404 / removed → ❌ notify, mark removed
- Reappeared after removal → ♻️ notify
3. Snapshot every price into PriceHistory.
Adding new competitors is done via the web UI (user pastes URLs) — not here.
"""
from __future__ import annotations
import logging
import re
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from sqlalchemy.orm import Session
from app.db import SessionLocal
from app.models import (
CompetitorListing,
Employee,
ListingStatus,
PriceHistory,
Project,
Source,
)
from app.scrapers import BayutScraper, PropertyFinderScraper, ScrapedListing
from app.services.notifier import notify_admin, send_message
logger = logging.getLogger(__name__)

# Upper bound on same-building suggestions shown in addition to exact permit
# matches. Those extras are only a browsing heuristic, so the list is capped
# to keep the page readable.
_SUGGEST_OTHERS_LIMIT = 30

# Bayut now renders entirely client-side (no __NEXT_DATA__, Algolia keys are
# hidden), so scraping it over plain HTTP cannot work. It stays off until a
# headless-browser fetcher exists; set this to True once it does.
BAYUT_ENABLED = False

# Scraper singletons shared by every function in this module.
PF = PropertyFinderScraper()
BAYUT = BayutScraper()
def _scraper_for(source: Source):
    """Return the scraper singleton for *source*: PF for PropertyFinder, Bayut otherwise."""
    if source == Source.PROPERTYFINDER:
        return PF
    return BAYUT
def detect_source_from_url(url: str) -> Source | None:
    """Identify which portal a listing URL belongs to, judged by its hostname.

    Only the host is inspected. A Bayut link whose query string mentions
    "propertyfinder" (e.g. a ``utm_source``) is therefore not misclassified.
    An arbitrary host that merely mentions a portal in its path is rejected;
    this matters because accepted URLs are fetched server-side.
    Scheme-less input such as ``www.propertyfinder.ae/...`` is still accepted.

    Args:
        url: A listing URL as pasted by a user or stored on a project/listing.

    Returns:
        The matching Source, or None for empty, malformed or unknown URLs.
    """
    from urllib.parse import urlsplit

    raw = (url or "").strip().lower()
    if not raw:
        return None
    if "://" not in raw:
        # urlsplit only fills in the host when a scheme is present.
        raw = "https://" + raw.lstrip("/")
    try:
        host = urlsplit(raw).hostname or ""
    except ValueError:
        # Malformed authority (e.g. an unbalanced IPv6 bracket).
        return None

    # PropertyFinder runs several country sites (propertyfinder.ae, ...),
    # so match the domain label rather than one fixed host.
    if "propertyfinder" in host.split("."):
        return Source.PROPERTYFINDER
    if host == "bayut.com" or host.endswith(".bayut.com"):
        return Source.BAYUT
    return None
def _fmt_price(value: float | None, currency: str | None = "AED") -> str:
if value is None:
return ""
return f"{value:,.0f} {currency or 'AED'}".replace(",", " ")
def _source_label(source: str) -> str:
return {"propertyfinder": "PropertyFinder", "bayut": "Bayut"}.get(source, source)
def _normalize_permit(value: str | None) -> str:
return (value or "").strip().lower()
def _listing_key(source: Source | str, external_id: str) -> tuple[str, str]:
    """Identity of a listing as ``(source value, external id)``.

    Accepts a Source enum member or its raw string value, so DB rows and
    scrape results can be compared directly. A missing id becomes ``""``.
    """
    if isinstance(source, Source):
        source = source.value
    else:
        source = str(source)
    return (source, str(external_id or ""))
def _format_listing_added(project: Project, listing: CompetitorListing, *, auto: bool) -> str:
    """Build the HTML notification for a newly tracked competitor.

    Args:
        project: Owning project; its ``dld_permit`` is shown when the listing
            has no permit of its own.
        listing: The competitor listing that was added.
        auto: True when added by the permit auto-sync, False for a manual add.

    The source label is separated from the heading by " — ", matching the
    other notifications in this module. Portal-provided text (title, permit,
    URL) is HTML-escaped, since the message uses HTML markup (<b>, <code>) and
    a raw "&" or "<" would break it.
    """
    from html import escape

    title = listing.title or "без названия"
    prefix = "✅ <b>Автоматически добавлен конкурент</b>" if auto else "✅ <b>Добавлен конкурент</b>"
    permit = listing.permit_number or project.dld_permit or ""
    return (
        f"{prefix} — {_source_label(listing.source.value)}\n"
        f"{escape(title, quote=False)}\n"
        f"Цена: {_fmt_price(listing.current_price, listing.currency)}\n"
        f"Permit: <code>{escape(str(permit), quote=False)}</code>\n"
        f"{escape(listing.url or '', quote=False)}"
    )
def _format_listing_removed(project: Project, listing: CompetitorListing, *, auto: bool) -> str:
    """Build the HTML notification for a competitor that is no longer tracked.

    Args:
        project: Owning project; its ``dld_permit`` is shown when the listing
            has no permit of its own.
        listing: The competitor listing being removed; its last known price is shown.
        auto: True when removed by the permit auto-sync, False for a manual removal.

    The source label is separated from the heading by " — ", matching the
    other notifications in this module. Portal-provided text (title, permit,
    URL) is HTML-escaped so characters like "&" or "<" don't break the markup.
    """
    from html import escape

    title = listing.title or "без названия"
    prefix = "🗑️ <b>Автоматически удален конкурент</b>" if auto else "🗑️ <b>Удален конкурент</b>"
    permit = listing.permit_number or project.dld_permit or ""
    return (
        f"{prefix} — {_source_label(listing.source.value)}\n"
        f"{escape(title, quote=False)}\n"
        f"Последняя цена: {_fmt_price(listing.current_price, listing.currency)}\n"
        f"Permit: <code>{escape(str(permit), quote=False)}</code>\n"
        f"{escape(listing.url or '', quote=False)}"
    )
def add_competitor_url(db: Session, project: Project, url: str) -> tuple[CompetitorListing | None, str]:
    """Track a competitor listing that a user pasted into the web UI.

    The URL is first validated: non-empty, from a known portal, and that
    portal enabled. It is then fetched once to confirm the listing is live and
    de-duplicated against the project's listings by (source, external id).
    Finally it is stored, with an initial PriceHistory row when a price was
    scraped. The session is committed on success.

    Returns:
        ``(listing, "")`` on success, otherwise ``(None, error_message)``
        with a user-facing (Russian) message.
    """
    cleaned = (url or "").strip()
    if not cleaned:
        return None, "URL пустой"

    src = detect_source_from_url(cleaned)
    if src is None:
        return None, "URL должен быть с propertyfinder.ae или bayut.com"
    if src == Source.BAYUT and not BAYUT_ENABLED:
        return None, (
            "Bayut временно не поддерживается — площадка перешла на защищённый "
            "рендеринг. Добавляйте ссылки PropertyFinder."
        )

    page = _scraper_for(src).fetch_listing(cleaned)
    if page is None:
        return None, "Не удалось загрузить страницу — сайт мог заблокировать запрос, попробуйте позже"
    if not page.is_active:
        return None, "Страница объявления вернула 404 — ссылка битая или объявление снято"

    # When the page exposes no id, the URL itself serves as the identifier.
    external_id = page.external_id or cleaned
    duplicate = (
        db.query(CompetitorListing)
        .filter(
            CompetitorListing.project_id == project.id,
            CompetitorListing.source == src,
            CompetitorListing.external_id == external_id,
        )
        .first()
    )
    if duplicate:
        return None, "Это объявление уже добавлено в проект"

    timestamp = datetime.utcnow()
    listing = CompetitorListing(
        project_id=project.id,
        source=src,
        external_id=external_id,
        url=cleaned,
        title=page.title,
        agent_name=page.agent_name,
        agency_name=page.agency_name,
        permit_number=page.permit_number,
        auto_discovered=False,
        current_price=page.price,
        currency=page.currency or "AED",
        status=ListingStatus.ACTIVE,
        first_seen_at=timestamp,
        last_seen_at=timestamp,
    )
    db.add(listing)
    db.flush()  # so listing.id is populated for the history row below
    if page.price is not None:
        db.add(PriceHistory(listing_id=listing.id, price=page.price, recorded_at=timestamp))
    db.commit()
    return listing, ""
def add_competitor_urls(db: Session, project: Project, urls: list[str]) -> dict:
    """Add several pasted/selected URLs in one go (the suggest page's multi-select).

    URLs are stripped and de-duplicated; blank entries and repeats are
    ignored. Each URL is then added via add_competitor_url(), which re-fetches
    the page. An unexpected exception for one URL is logged, the session is
    rolled back, and the URL is reported in ``errors``. One bad page therefore
    no longer aborts the batch or leaves the session unusable. URLs added
    earlier in the batch were already committed by add_competitor_url().

    Args:
        urls: Raw URL strings; ``None`` entries (or ``urls=None``) are tolerated.

    Returns:
        ``{'added': int, 'skipped': int, 'errors': [str, ...]}``. ``skipped``
        counts listings the project already tracks.
    """
    added = 0
    skipped = 0
    errors: list[str] = []
    seen: set[str] = set()
    for raw in urls or ():
        url = (raw or "").strip()
        if not url or url in seen:
            continue
        seen.add(url)
        try:
            _, err = add_competitor_url(db, project, url)
        except Exception:
            logger.exception("Adding competitor %s to project %s failed", url, project.id)
            # The failed flush/commit leaves the transaction unusable otherwise.
            db.rollback()
            errors.append(f"Ошибка при добавлении {url} — попробуйте позже")
            continue
        if err == "Это объявление уже добавлено в проект":
            skipped += 1
        elif err:
            errors.append(err)
        else:
            added += 1
    return {"added": added, "skipped": skipped, "errors": errors}
def _create_listing_from_candidate(
    db: Session,
    project: Project,
    candidate: ScrapedListing,
    *,
    permit_number: str,
) -> CompetitorListing:
    """Store a search-result candidate as an auto-discovered, active competitor.

    The new row is added and flushed so ``listing.id`` is available. When the
    candidate carries a price, an initial PriceHistory row is added as well.
    Nothing is committed here; the caller owns the transaction.

    Args:
        permit_number: Permit number to record on the listing.

    Returns:
        The newly created CompetitorListing.
    """
    seen_at = datetime.utcnow()
    price = candidate.price
    fields = dict(
        project_id=project.id,
        source=Source(candidate.source),
        # The URL doubles as the identifier when the scraper found no id.
        external_id=candidate.external_id or candidate.url,
        url=candidate.url,
        title=candidate.title,
        agent_name=candidate.agent_name,
        agency_name=candidate.agency_name,
        permit_number=permit_number,
        auto_discovered=True,
        current_price=price,
        currency=candidate.currency or "AED",
        status=ListingStatus.ACTIVE,
        first_seen_at=seen_at,
        last_seen_at=seen_at,
    )
    listing = CompetitorListing(**fields)
    db.add(listing)
    db.flush()
    if price is not None:
        db.add(PriceHistory(listing_id=listing.id, price=price, recorded_at=seen_at))
    return listing
def _hide_tracked_suggestions(
    db: Session,
    project: Project,
    suggestions: dict[str, list[ScrapedListing]],
) -> dict[str, list[ScrapedListing]]:
    """Return *suggestions* without the listings this project already tracks.

    Candidates are keyed as ``(source, external_id or url)``. That is the
    value _create_listing_from_candidate stores when a candidate has no
    external id. Comparing on the bare external id would keep such
    already-tracked listings visible.

    Returns:
        A new dict with the same portal keys; the input is not mutated.
    """
    tracked = {
        _listing_key(row.source, row.external_id)
        for row in db.query(CompetitorListing).filter(CompetitorListing.project_id == project.id).all()
    }
    return {
        source: [
            item
            for item in items
            if _listing_key(item.source, item.external_id or item.url) not in tracked
        ]
        for source, items in suggestions.items()
    }
def sync_permit_competitors(
    db: Session,
    project: Project,
) -> tuple[list[str], dict[str, list[ScrapedListing]], str | None]:
    """Auto-maintain competitor listings that share our DLD permit number.

    * PropertyFinder candidates whose permit equals ours (compared via
      _normalize_permit) are added as auto-discovered listings. Ones already
      tracked get their permit/title/agent/agency refreshed instead.
    * Auto-discovered exact-permit listings missing from the current search
      are deleted. Manual competitors are never auto-deleted.
    * Nothing is deleted when the search returned no PF candidates at all,
      because a failed search and an empty one look the same here. Listings
      that were found but whose permit could not be read are also kept. This
      stops a transient scrape failure from wiping tracked competitors.

    Candidates are keyed as ``(source, external_id or url)``, the same value
    _create_listing_from_candidate stores. Without that, id-less candidates
    would be re-added, and their stored rows deleted, on every run.

    New rows are flushed, not committed; the caller owns the transaction.

    Returns:
        ``(notification texts, suggestions with tracked listings hidden,
        our permit or None when it couldn't be determined)``.
    """
    changes: list[str] = []
    our_permit = resolve_our_permit(project)
    if not our_permit:
        return changes, {"propertyfinder": [], "bayut": []}, None
    normalized_permit = _normalize_permit(our_permit)

    def candidate_key(item: ScrapedListing) -> tuple[str, str]:
        return _listing_key(item.source, item.external_id or item.url)

    suggestions = suggest_similar(project, our_permit=our_permit, include_tracked=True)
    candidates = suggestions["propertyfinder"]
    matches = [
        item
        for item in candidates
        if _normalize_permit(item.permit_number) == normalized_permit
    ]
    matched_keys = {candidate_key(item) for item in matches}
    # Seen in the search, but their permit is unknown (lookup failed or absent).
    unreadable_keys = {
        candidate_key(item)
        for item in candidates
        if not _normalize_permit(item.permit_number)
    }

    existing = {
        _listing_key(row.source, row.external_id): row
        for row in db.query(CompetitorListing).filter(CompetitorListing.project_id == project.id).all()
    }

    for item in matches:
        key = candidate_key(item)
        permit = item.permit_number or our_permit
        listing = existing.get(key)
        if listing is None:
            listing = _create_listing_from_candidate(db, project, item, permit_number=permit)
            existing[key] = listing
            changes.append(_format_listing_added(project, listing, auto=True))
            continue
        # Already tracked (manually or by an earlier sync): refresh metadata,
        # keeping stored values the search result lacks.
        listing.permit_number = permit
        if item.title:
            listing.title = item.title
        if item.agent_name:
            listing.agent_name = item.agent_name
        if item.agency_name:
            listing.agency_name = item.agency_name

    if candidates:
        for key, listing in list(existing.items()):
            if not listing.auto_discovered:
                continue
            if _normalize_permit(listing.permit_number) != normalized_permit:
                continue
            if key in matched_keys or key in unreadable_keys:
                continue
            changes.append(_format_listing_removed(project, listing, auto=True))
            db.delete(listing)

    return changes, _hide_tracked_suggestions(db, project, suggestions), our_permit
def check_project(db: Session, project: Project) -> list[str]:
    """Re-scan all tracked competitor listings for one project.

    Runs sync_permit_competitors() first (auto add/remove by DLD permit). It
    then re-fetches every listing of the project and applies status,
    metadata and price updates via _apply_scrape(). A listing whose fetch
    fails (scraper returns None) is left untouched and retried next cycle.
    Finally it stamps ``project.last_checked_at`` and commits the session.

    Returns:
        Notification texts (HTML), in the order the changes were detected.
    """
    changes: list[str] = []
    now = datetime.utcnow()

    sync_changes, _, _ = sync_permit_competitors(db, project)
    changes.extend(sync_changes)

    listings = db.query(CompetitorListing).filter(CompetitorListing.project_id == project.id).all()
    for listing in listings:
        scraped = _scraper_for(listing.source).fetch_listing(listing.url)
        if scraped is None:
            # Network/parse failure — skip without changing state, try again next cycle.
            continue
        changes.extend(_apply_scrape(db, listing, scraped, now))

    project.last_checked_at = now
    db.commit()
    return changes


def _apply_scrape(
    db: Session,
    listing: CompetitorListing,
    scraped: ScrapedListing,
    now: datetime,
) -> list[str]:
    """Apply one successful scrape of *listing*; return the notifications it produced.

    * Inactive page: an ACTIVE listing becomes REMOVED (❌); an already
      non-active one changes nothing.
    * Active page: ``last_seen_at`` is bumped, and a non-ACTIVE listing is
      reactivated (♻️). Title/agent/agency are refreshed when the scrape
      provides them.
    * Price: a change from a known price is announced (📈/📉), and a
      first-ever price is stored silently. In both cases
      ``current_price``/``currency`` are updated and a PriceHistory row is
      added.

    Portal-provided text (title, URL) is HTML-escaped because the messages
    use HTML markup. Thousands separators become spaces only inside the delta
    figure, so commas in titles and URLs are preserved.
    """
    from html import escape

    def esc(text: str | None) -> str:
        return escape(text or "", quote=False)

    label = _source_label(listing.source.value)
    messages: list[str] = []

    if not scraped.is_active:
        if listing.status == ListingStatus.ACTIVE:
            listing.status = ListingStatus.REMOVED
            messages.append(
                f"❌ <b>Объявление удалено</b> — {label}\n"
                f"{esc(listing.title or 'без названия')}\n"
                f"Последняя цена: {_fmt_price(listing.current_price, listing.currency)}\n"
                f"{esc(listing.url)}"
            )
        return messages

    listing.last_seen_at = now
    if listing.status != ListingStatus.ACTIVE:
        listing.status = ListingStatus.ACTIVE
        messages.append(
            f"♻️ <b>Объявление снова активно</b> — {label}\n"
            f"{esc(listing.title or 'без названия')}\n{esc(listing.url)}"
        )

    # Refresh metadata that may have changed; keep stored values the scrape lacks.
    if scraped.title:
        listing.title = scraped.title
    if scraped.agent_name:
        listing.agent_name = scraped.agent_name
    if scraped.agency_name:
        listing.agency_name = scraped.agency_name

    old_price = listing.current_price
    new_price = scraped.price
    if new_price is None or new_price == old_price:
        return messages

    if old_price is not None:
        delta = new_price - old_price
        pct = (delta / old_price * 100.0) if old_price else 0.0
        arrow = "📈" if delta > 0 else "📉"
        sign = "+" if delta > 0 else ""
        delta_text = f"{sign}{delta:,.0f}".replace(",", " ")
        messages.append(
            f"{arrow} <b>Цена изменилась</b> — {label}\n"
            f"{esc(listing.title or 'без названия')}\n"
            f"Было: {_fmt_price(old_price, listing.currency)}\n"
            f"Стало: {_fmt_price(new_price, scraped.currency or listing.currency)} "
            f"({delta_text} / {pct:+.1f}%)\n"
            f"{esc(listing.url)}"
        )

    listing.current_price = new_price
    listing.currency = scraped.currency or listing.currency
    db.add(PriceHistory(listing_id=listing.id, price=new_price, recorded_at=now))
    return messages
def _notify_owner(project: Project, changes: list[str]) -> None:
    """Deliver change notifications for *project* to the owner's chat (``tg_chat_id``).

    Sends a header message (title, deal type, change count), followed by one
    message per entry in *changes*. Does nothing when *changes* is empty. If
    the project has no owner, or the owner has no ``tg_chat_id``, it logs a
    warning and alerts the admin via notify_admin() instead.

    The project title is user-entered and is HTML-escaped: the messages use
    HTML markup, and a title like "Marina & JBR" would otherwise break it.
    """
    from html import escape

    if not changes:
        return
    title = escape(str(project.title), quote=False)
    owner: Employee | None = project.owner
    if not owner or not owner.tg_chat_id:
        logger.warning("Project %s has no owner chat_id; skipping notification", project.id)
        notify_admin(
            f"⚠️ Проект <b>{title}</b> (#{project.id}) — {len(changes)} изменений, "
            f"но у владельца не задан tg_chat_id."
        )
        return
    header = (
        f"🏠 <b>{title}</b>\n"
        f"Тип: {project.deal_type.value} · Изменений: {len(changes)}\n"
        f"——————————"
    )
    send_message(owner.tg_chat_id, header)
    for text in changes:
        send_message(owner.tg_chat_id, text)
def notify_project_changes(project: Project, changes: list[str]) -> None:
    """Public entry point for sending *project*'s change notifications to its owner.

    Delegates to _notify_owner(); an empty *changes* list sends nothing.
    """
    if changes:
        _notify_owner(project, changes)
def _reference_url(project: Project, source: Source) -> str | None:
    """Pick a known listing URL in the project's building for *source*.

    Portals (PF) scope a building search by an internal numeric location id,
    not by free text. The scraper is therefore handed a real listing in the
    same building so it can resolve that id. Our own ``our_url`` is
    preferred, then the project's tracked listings of that source. A URL
    qualifies only if detect_source_from_url() maps it to *source*.

    Returns:
        The first qualifying URL, or None.
    """
    own = [project.our_url] if project.our_url else []
    tracked = [row.url for row in project.listings if row.source == source]
    return next(
        (candidate for candidate in own + tracked if detect_source_from_url(candidate) == source),
        None,
    )
def resolve_our_permit(project: Project) -> str | None:
    """Return our project's DLD permit number.

    The value the user typed (``project.dld_permit``, stripped) wins.
    Otherwise the permit is read from our own PropertyFinder listing via
    ``PF.get_permit``; PF exposes it in __NEXT_DATA__. Returns None when
    neither is available.
    """
    typed = (project.dld_permit or "").strip()
    if typed:
        return typed
    ref = _reference_url(project, Source.PROPERTYFINDER)
    if not ref:
        return None
    return PF.get_permit(ref)
def suggest_similar(
    project: Project,
    our_permit: str | None = None,
    *,
    include_tracked: bool = False,
) -> dict[str, list[ScrapedListing]]:
    """Search PF (and Bayut, when enabled) for listings in this project's building.

    Candidates that share our DLD permit are the same physical listing under a
    different broker (rare, but it happens); those are surfaced first. The
    rest are same-building heuristics, capped at _SUGGEST_OTHERS_LIMIT.

    Args:
        project: Project whose building, bedrooms and deal type drive the search.
        our_permit: Our DLD permit. When given, each PF candidate's permit is
            fetched concurrently. Exact matches, compared via _normalize_permit
            so case and surrounding whitespace don't matter, are put first.
        include_tracked: Keep listings the project already tracks (used by the
            permit auto-sync); by default they are hidden. Our own listing
            (recognised by the ``<digits>.html`` id in ``our_url``) is
            always hidden.

    A failing portal search is logged and yields an empty list for that
    portal. A failing permit lookup for one candidate leaves only that
    candidate's permit as None, instead of discarding every permit.

    Returns:
        ``{'propertyfinder': [...], 'bayut': [...]}``.
    """
    out: dict[str, list[ScrapedListing]] = {"propertyfinder": [], "bayut": []}
    bedrooms = project.bedrooms
    deal_type = project.deal_type.value
    try:
        out["propertyfinder"] = PF.search_similar(
            project.building, bedrooms, deal_type,
            location_url=_reference_url(project, Source.PROPERTYFINDER),
        )
    except Exception as e:
        logger.exception("PF suggest failed: %s", e)
    if BAYUT_ENABLED:
        try:
            out["bayut"] = BAYUT.search_similar(
                project.building, bedrooms, deal_type,
                location_url=_reference_url(project, Source.BAYUT),
            )
        except Exception as e:
            logger.exception("Bayut suggest failed: %s", e)

    # Hide our own listing. Tracked competitors are hidden for UI suggestions,
    # but included for automatic permit synchronization.
    excluded: set[tuple[str, str]] = set()
    if not include_tracked:
        excluded.update(_listing_key(row.source, row.external_id) for row in project.listings)
    if project.our_url:
        own_src = detect_source_from_url(project.our_url)
        m = re.search(r"(\d+)\.html", project.our_url)
        if own_src and m:
            excluded.add((own_src.value, m.group(1)))
    for src in out:
        # Same fallback as stored rows: the URL stands in for a missing id.
        out[src] = [
            c for c in out[src]
            if _listing_key(src, c.external_id or c.url) not in excluded
        ]

    # Permit-first: PF can't be queried by permit and search results don't
    # carry it, so read each PF candidate's permit concurrently and put the
    # ones matching ours first. Keep all matches; cap the remainder.
    wanted = _normalize_permit(our_permit)
    pf = out["propertyfinder"]
    if wanted and pf:
        def read_permit(url: str) -> str | None:
            # Isolate failures: a raising worker would make ex.map abort the batch.
            try:
                return PF.get_permit(url)
            except Exception as e:
                logger.warning("PF permit lookup failed for %s: %s", url, e)
                return None

        try:
            with ThreadPoolExecutor(max_workers=12) as ex:
                permits = list(ex.map(read_permit, [c.url for c in pf]))
        except Exception as e:
            logger.exception("PF permit enrichment failed: %s", e)
        else:
            for cand, permit in zip(pf, permits):
                cand.permit_number = permit
            matches = [c for c in pf if _normalize_permit(c.permit_number) == wanted]
            others = [c for c in pf if _normalize_permit(c.permit_number) != wanted]
            out["propertyfinder"] = matches + others[:_SUGGEST_OTHERS_LIMIT]
    return out
def run_check_for_project(project_id: int) -> int:
    """Check one project in a dedicated DB session and notify its owner.

    Returns:
        The number of detected changes, or 0 if the project doesn't exist.
        The session is closed even if the check raises.
    """
    session = SessionLocal()
    try:
        project = session.get(Project, project_id)
        if not project:
            return 0
        found = check_project(session, project)
        _notify_owner(project, found)
        return len(found)
    finally:
        session.close()
def run_check_all() -> dict[int, int]:
    """Run run_check_for_project() for every project.

    Project ids are read up front in a short-lived session; each project is
    then checked in its own session. A failing project is logged and
    reported as -1, so one bad project doesn't stop the rest.

    Returns:
        ``{project_id: change_count}``, where -1 marks a failed check.
    """
    session = SessionLocal()
    try:
        project_ids = [row.id for row in session.query(Project).all()]
    finally:
        session.close()

    results: dict[int, int] = {}
    for project_id in project_ids:
        try:
            count = run_check_for_project(project_id)
        except Exception as exc:
            logger.exception("Check for project %s failed: %s", project_id, exc)
            count = -1
        results[project_id] = count
    return results