"""Core monitoring logic.
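Per project, on every check cycle:
1. Refresh our own listing (``project.our_url``) to keep our price/permit in sync.
2. Auto-sync competitors that carry our DLD permit: new exact-permit matches
   are added; auto-discovered ones missing from several consecutive permit
   searches are removed.
3. Re-fetch the URL of every CompetitorListing we track and detect:
   - Price change → 📈📉 notify
   - URL returns 404 / removed → ❌ notify, mark removed
   - Reappeared after removal → ♻️ notify
4. Record every price change (and the first known price) in PriceHistory.
Manually tracked competitors come from URLs the user pastes in the web UI,
handled here by ``add_competitor_url`` / ``add_competitor_urls``; this module
also builds the same-building suggestions (``suggest_similar``) shown there.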
"""
from __future__ import annotations
import logging
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from sqlalchemy.orm import Session
from app.db import SessionLocal
from app.models import (
CompetitorListing,
Employee,
ListingStatus,
PriceHistory,
Project,
Source,
)
from app.scrapers import BayutScraper, PropertyFinderScraper, ScrapedListing
from app.services.notifier import notify_admin, send_message
logger = logging.getLogger(__name__)
# Module-level scraper instances shared by every function in this module.
PF = PropertyFinderScraper()
BAYUT = BayutScraper()
# Same-building suggestions beyond exact permit matches are a browse heuristic —
# cap how many we show so the page stays usable.
_SUGGEST_OTHERS_LIMIT = 30
# How many consecutive permit searches must miss an auto-discovered
# exact-permit competitor before sync_permit_competitors deletes it
# (the counter is reset to 0 whenever the listing is matched again).
_PERMIT_MISSING_DELETE_THRESHOLD = 3
# Bayut moved to fully client-side rendering (no __NEXT_DATA__, Algolia keys
# hidden), so it can't be scraped over plain HTTP — disabled until we add a
# headless-browser fetcher. Flip to True once that exists.
BAYUT_ENABLED = False
def _scraper_for(source: Source):
    """Return the scraper instance responsible for *source*.

    PropertyFinder gets the PF scraper; every other source falls back to Bayut.
    """
    if source == Source.PROPERTYFINDER:
        return PF
    return BAYUT
def detect_source_from_url(url: str) -> Source | None:
    """Guess the listing portal from a URL by case-insensitive substring match.

    Returns Source.PROPERTYFINDER when the URL mentions "propertyfinder",
    Source.BAYUT when it mentions "bayut.com" (PropertyFinder is checked
    first), and None otherwise — including for empty/None input.
    """
    lowered = url.lower() if url else ""
    markers = (
        ("propertyfinder", Source.PROPERTYFINDER),
        ("bayut.com", Source.BAYUT),
    )
    for marker, portal in markers:
        if marker in lowered:
            return portal
    return None
def _is_supported_listing_url(source: Source, url: str) -> bool:
    """Whether *url* points at a single listing we know how to scrape.

    PropertyFinder URLs are validated by the PF scraper (search pages are
    rejected); any Bayut URL is accepted; any other source is unsupported.
    """
    if source != Source.PROPERTYFINDER:
        return source == Source.BAYUT
    return PF.is_listing_url(url)
def _fmt_price(value: float | None, currency: str | None = "AED") -> str:
if value is None:
return "—"
return f"{value:,.0f} {currency or 'AED'}".replace(",", " ")
def _source_label(source: str) -> str:
return {"propertyfinder": "PropertyFinder", "bayut": "Bayut"}.get(source, source)
def _normalize_permit(value: str | None) -> str:
return (value or "").strip().lower()
def _listing_key(source: Source | str, external_id: str) -> tuple[str, str]:
    """Identity key ``(source_value, external_id)`` used to compare stored and scraped listings.

    Accepts either a Source enum or its raw string value; a missing id becomes "".
    """
    if isinstance(source, Source):
        src_value = source.value
    else:
        src_value = str(source)
    return (src_value, str(external_id or ""))
def _project_own_listing_key(project: Project) -> tuple[str, str] | None:
    """Listing key of the project's own listing, derived from ``project.our_url``.

    Only PropertyFinder URLs can be resolved to an id here; no URL, a non-PF
    URL, or a PF URL without a recognisable id all yield None.
    """
    our_url = project.our_url
    if not our_url:
        return None
    if detect_source_from_url(our_url) != Source.PROPERTYFINDER:
        return None
    listing_id = PF.listing_id_from_url(our_url)
    if not listing_id:
        return None
    return _listing_key(Source.PROPERTYFINDER, listing_id)
def _is_own_listing(project: Project, item: ScrapedListing) -> bool:
    """True when the scraped *item* is the project's own listing rather than a competitor."""
    own_key = _project_own_listing_key(project)
    if own_key is None:
        return False
    return own_key == _listing_key(item.source, item.external_id)
def _format_listing_added(project: Project, listing: CompetitorListing, *, auto: bool) -> str:
    """Notification text for a newly tracked competitor listing.

    *auto* selects the "automatically added" wording instead of the manual
    one. The permit line falls back to the project's permit, then to a dash.
    """
    if auto:
        prefix = "✅ Автоматически добавлен конкурент"
    else:
        prefix = "✅ Добавлен конкурент"
    parts = [
        f"{prefix} — {_source_label(listing.source.value)}",
        f"{listing.title or 'без названия'}",
        f"Цена: {_fmt_price(listing.current_price, listing.currency)}",
        f"Permit: {listing.permit_number or project.dld_permit or '—'}",
        f"{listing.url}",
    ]
    return "\n".join(parts)
def _format_listing_removed(project: Project, listing: CompetitorListing, *, auto: bool) -> str:
    """Notification text for a competitor listing that stops being tracked.

    *auto* selects the "automatically removed" wording instead of the manual
    one. Shows the last known price; the permit line falls back to the
    project's permit, then to a dash.
    """
    if auto:
        prefix = "🗑️ Автоматически удален конкурент"
    else:
        prefix = "🗑️ Удален конкурент"
    parts = [
        f"{prefix} — {_source_label(listing.source.value)}",
        f"{listing.title or 'без названия'}",
        f"Последняя цена: {_fmt_price(listing.current_price, listing.currency)}",
        f"Permit: {listing.permit_number or project.dld_permit or '—'}",
        f"{listing.url}",
    ]
    return "\n".join(parts)
def add_competitor_url(db: Session, project: Project, url: str) -> tuple[CompetitorListing | None, str]:
    """User-facing entrypoint: create a CompetitorListing for *project* from a pasted URL.

    The page is fetched once to validate it and to seed the listing's metadata
    and price. If a price is known, a first PriceHistory row is written too.
    The session is committed on success.

    Returns:
        ``(listing, "")`` on success, or ``(None, message)`` with a
        user-facing error message when the URL is rejected.
    """
    cleaned = (url or "").strip()
    if not cleaned:
        return None, "URL пустой"

    source = detect_source_from_url(cleaned)
    if source is None:
        return None, "URL должен быть с propertyfinder.ae или bayut.com"
    if source == Source.BAYUT and not BAYUT_ENABLED:
        return None, (
            "Bayut временно не поддерживается — площадка перешла на защищённый "
            "рендеринг. Добавляйте ссылки PropertyFinder."
        )
    if not _is_supported_listing_url(source, cleaned):
        return None, "Укажите ссылку на конкретное объявление, а не на страницу поиска"

    scraped = _scraper_for(source).fetch_listing(cleaned)
    if scraped is None:
        return None, "Не удалось загрузить страницу — сайт мог заблокировать запрос, попробуйте позже"
    if not scraped.is_active:
        return None, "Страница объявления вернула 404 — ссылка битая или объявление снято"

    # When the page exposes no id, the URL itself serves as the identity.
    external_id = scraped.external_id or cleaned
    duplicate = (
        db.query(CompetitorListing)
        .filter(
            CompetitorListing.project_id == project.id,
            CompetitorListing.source == source,
            CompetitorListing.external_id == external_id,
        )
        .first()
    )
    if duplicate:
        return None, "Это объявление уже добавлено в проект"
    if _is_own_listing(project, scraped):
        return None, "Это ссылка на наш объект, а не на конкурента"

    timestamp = datetime.utcnow()
    competitor = CompetitorListing(
        project_id=project.id,
        source=source,
        external_id=external_id,
        url=cleaned,
        title=scraped.title,
        agent_name=scraped.agent_name,
        agency_name=scraped.agency_name,
        permit_number=scraped.permit_number,
        auto_discovered=False,
        current_price=scraped.price,
        currency=scraped.currency or "AED",
        status=ListingStatus.ACTIVE,
        first_seen_at=timestamp,
        last_seen_at=timestamp,
    )
    db.add(competitor)
    db.flush()  # populate competitor.id for the PriceHistory row below
    if scraped.price is not None:
        db.add(PriceHistory(listing_id=competitor.id, price=scraped.price, recorded_at=timestamp))
    db.commit()
    return competitor, ""
def add_competitor_urls(db: Session, project: Project, urls: list[str]) -> dict:
    """Add several pasted/selected URLs in one go (the suggest page's multi-select).

    URLs are stripped and de-duplicated, then handed one at a time to
    ``add_competitor_url``. Each call re-fetches its page and commits.
    Listings that are already tracked count as skipped; every other
    rejection message is collected.

    Returns ``{'added': int, 'skipped': int, 'errors': [..]}``.
    """
    summary = {"added": 0, "skipped": 0, "errors": []}
    processed: set[str] = set()
    for candidate in urls:
        cleaned = (candidate or "").strip()
        if not cleaned or cleaned in processed:
            continue
        processed.add(cleaned)
        _, error = add_competitor_url(db, project, cleaned)
        if not error:
            summary["added"] += 1
        elif error == "Это объявление уже добавлено в проект":
            summary["skipped"] += 1
        else:
            summary["errors"].append(error)
    return summary
def _create_listing_from_candidate(
    db: Session,
    project: Project,
    candidate: ScrapedListing,
    *,
    permit_number: str,
) -> CompetitorListing:
    """Insert an auto-discovered competitor built from a scraped search candidate.

    The row is marked ``auto_discovered`` and ACTIVE. It is keyed by the
    candidate's external id, or by its URL when the id is missing, and is
    flushed so its id is available. If the candidate has a price, an initial
    PriceHistory row is added. Nothing is committed; the caller owns the
    transaction.
    """
    discovered_at = datetime.utcnow()
    identity = candidate.external_id or candidate.url
    competitor = CompetitorListing(
        project_id=project.id,
        source=Source(candidate.source),
        external_id=identity,
        url=candidate.url,
        title=candidate.title,
        agent_name=candidate.agent_name,
        agency_name=candidate.agency_name,
        permit_number=permit_number,
        auto_discovered=True,
        current_price=candidate.price,
        currency=candidate.currency or "AED",
        status=ListingStatus.ACTIVE,
        first_seen_at=discovered_at,
        last_seen_at=discovered_at,
    )
    db.add(competitor)
    db.flush()
    price = candidate.price
    if price is not None:
        db.add(PriceHistory(listing_id=competitor.id, price=price, recorded_at=discovered_at))
    return competitor
def _hide_tracked_suggestions(
    db: Session,
    project: Project,
    suggestions: dict[str, list[ScrapedListing]],
) -> dict[str, list[ScrapedListing]]:
    """Return *suggestions* without the candidates this project already tracks.

    Candidates are keyed the same way tracked rows are stored: by
    ``external_id``, falling back to the URL when a candidate has no id. This
    mirrors ``_create_listing_from_candidate``, so id-less candidates that are
    already tracked are hidden too. A new dict is returned; the input lists
    are not modified.
    """
    tracked = {
        _listing_key(row.source, row.external_id)
        for row in db.query(CompetitorListing).filter(CompetitorListing.project_id == project.id).all()
    }
    return {
        source: [
            item
            for item in items
            if _listing_key(item.source, item.external_id or item.url) not in tracked
        ]
        for source, items in suggestions.items()
    }
def _scraped_key(item: ScrapedListing) -> tuple[str, str]:
    """Key a scraped candidate the way ``_create_listing_from_candidate`` stores it.

    Candidates without an external id are persisted under their URL, so the
    lookup key must fall back to the URL as well. Otherwise an id-less match
    is never recognised as already tracked (it is re-added every run) and
    never counts as "seen" (it is eventually auto-deleted).
    """
    return _listing_key(item.source, item.external_id or item.url)


def sync_permit_competitors(
    db: Session,
    project: Project,
    *,
    count_missing: bool = True,
) -> tuple[list[str], dict[str, list[ScrapedListing]], str | None]:
    """Auto-maintain competitor listings that carry our DLD permit.

    Exact-permit PropertyFinder matches (other than our own listing) are added
    automatically, or have their metadata refreshed if they are already
    tracked. A previously auto-discovered exact-permit listing is deleted only
    after ``_PERMIT_MISSING_DELETE_THRESHOLD`` consecutive permit searches miss
    it. A search that evidently failed (no candidate came back with a permit
    number) is not counted as a miss. Manual competitors are never
    auto-deleted. Pending changes are flushed, not committed.

    Args:
        count_missing: when False, misses are not counted and nothing is deleted.

    Returns:
        ``(changes, suggestions, our_permit)``:
        - ``changes``: notification texts.
        - ``suggestions``: the suggestions with already-tracked listings hidden.
        - ``our_permit``: the permit used, or ``None`` (with empty results)
          when the project has no resolvable permit.
    """
    changes: list[str] = []
    our_permit = resolve_our_permit(project)
    if not our_permit:
        return changes, {"propertyfinder": [], "bayut": []}, None
    normalized_permit = _normalize_permit(our_permit)

    suggestions = suggest_similar(project, our_permit=our_permit, include_tracked=True)
    pf_candidates = suggestions["propertyfinder"]
    matches = [
        item
        for item in pf_candidates
        if _normalize_permit(item.permit_number) == normalized_permit
        and not _is_own_listing(project, item)
    ]
    matched_keys = {_scraped_key(item) for item in matches}
    existing = {
        _listing_key(row.source, row.external_id): row
        for row in db.query(CompetitorListing).filter(CompetitorListing.project_id == project.id).all()
    }

    for item in matches:
        key = _scraped_key(item)
        listing = existing.get(key)
        permit = item.permit_number or our_permit
        if listing is None:
            listing = _create_listing_from_candidate(db, project, item, permit_number=permit)
            existing[key] = listing
            changes.append(_format_listing_added(project, listing, auto=True))
            continue
        # Still listed under our permit: refresh metadata and reset the miss counter.
        listing.permit_number = permit
        listing.permit_missing_checks = 0
        if item.title:
            listing.title = item.title
        if item.agent_name:
            listing.agent_name = item.agent_name
        if item.agency_name:
            listing.agency_name = item.agency_name

    # PF search results carry no permit; suggest_similar fills it in per
    # candidate. If no candidate came back with a permit, the search or the
    # enrichment almost certainly failed. That is not evidence the tracked
    # listings disappeared, so it must not count toward deletion.
    search_succeeded = any(item.permit_number for item in pf_candidates)
    if count_missing and search_succeeded:
        for listing in existing.values():
            if not listing.auto_discovered:
                continue  # manual competitors are never auto-deleted
            if _normalize_permit(listing.permit_number) != normalized_permit:
                continue
            if _listing_key(listing.source, listing.external_id) in matched_keys:
                continue
            listing.permit_missing_checks = (listing.permit_missing_checks or 0) + 1
            if listing.permit_missing_checks < _PERMIT_MISSING_DELETE_THRESHOLD:
                continue
            changes.append(_format_listing_removed(project, listing, auto=True))
            db.delete(listing)

    # Push pending inserts/deletes so that later queries on this session still
    # see them regardless of the session's autoflush setting. Those queries
    # are the tracked-suggestion filter below and check_project's listing scan.
    db.flush()
    return changes, _hide_tracked_suggestions(db, project, suggestions), our_permit
def check_project(db: Session, project: Project) -> list[str]:
    """Re-scan one project: our own listing, permit sync, then every tracked competitor.

    Each competitor's status, metadata, price and PriceHistory are updated in
    place. ``project.last_checked_at`` is then stamped and the session is
    committed. A listing whose page could not be fetched (the scraper returned
    None) is left untouched until the next cycle.

    Returns:
        Notification texts describing everything that changed.
    """
    now = datetime.utcnow()
    changes: list[str] = []
    changes.extend(refresh_our_listing(db, project, now=now))
    sync_changes, _, _ = sync_permit_competitors(db, project)
    changes.extend(sync_changes)

    listings = db.query(CompetitorListing).filter(CompetitorListing.project_id == project.id).all()
    for listing in listings:
        scraped = _scraper_for(listing.source).fetch_listing(listing.url)
        if scraped is None:
            # Network/parse failure — skip without changing state, try again next cycle.
            continue
        changes.extend(_apply_competitor_scrape(db, listing, scraped, now))

    project.last_checked_at = now
    db.commit()
    return changes


def _apply_competitor_scrape(
    db: Session,
    listing: CompetitorListing,
    scraped: ScrapedListing,
    now: datetime,
) -> list[str]:
    """Apply one successful scrape to a tracked competitor; return its notification texts.

    Handles three transitions:
    - removal: an ACTIVE listing now reports inactive;
    - reappearance: a non-ACTIVE listing is active again;
    - price change: recorded in PriceHistory. The first known price is
      recorded silently.
    """
    label = _source_label(listing.source.value)
    messages: list[str] = []

    if not scraped.is_active:
        if listing.status == ListingStatus.ACTIVE:
            listing.status = ListingStatus.REMOVED
            messages.append(
                f"❌ Объявление удалено — {label}\n"
                f"{listing.title or 'без названия'}\n"
                f"Последняя цена: {_fmt_price(listing.current_price, listing.currency)}\n"
                f"{listing.url}"
            )
        return messages

    listing.last_seen_at = now
    if listing.status != ListingStatus.ACTIVE:
        listing.status = ListingStatus.ACTIVE
        messages.append(
            f"♻️ Объявление снова активно — {label}\n"
            f"{listing.title or 'без названия'}\n{listing.url}"
        )

    # Update metadata that may have changed
    if scraped.title:
        listing.title = scraped.title
    if scraped.agent_name:
        listing.agent_name = scraped.agent_name
    if scraped.agency_name:
        listing.agency_name = scraped.agency_name

    old_price = listing.current_price
    new_price = scraped.price
    if new_price is None:
        return messages
    if old_price is not None and new_price == old_price:
        return messages

    if old_price is not None:
        delta = new_price - old_price
        pct = (delta / old_price * 100.0) if old_price else 0.0
        arrow = "📈" if delta > 0 else "📉"
        # Only the numeric delta gets the space thousands separator; the title
        # and URL must be left verbatim (a message-wide .replace mangled them).
        delta_text = f"{delta:+,.0f}".replace(",", " ")
        messages.append(
            f"{arrow} Цена изменилась — {label}\n"
            f"{listing.title or 'без названия'}\n"
            f"Было: {_fmt_price(old_price, listing.currency)}\n"
            f"Стало: {_fmt_price(new_price, scraped.currency or listing.currency)} "
            f"({delta_text} / {pct:+.1f}%)\n"
            f"{listing.url}"
        )
    listing.current_price = new_price
    listing.currency = scraped.currency or listing.currency
    db.add(PriceHistory(listing_id=listing.id, price=new_price, recorded_at=now))
    return messages
def refresh_our_listing(db: Session, project: Project, *, now: datetime | None = None) -> list[str]:
    """Parse our own listing (``project.our_url``) and keep project metadata in sync.

    This never creates a competitor listing. Only a concrete listing URL is
    used; PF search pages are logged and ignored. On a successful scrape it:
    - fills ``project.dld_permit`` if it is empty;
    - updates ``project.our_price``;
    - stamps ``project.last_checked_at`` when *now* is given.
    The session is then flushed, not committed.

    Returns:
        A one-element list with a notification when a previously known price
        changed; otherwise an empty list.
    """
    url = (project.our_url or "").strip()
    if not url:
        return []
    source = detect_source_from_url(url)
    if source is None or (source == Source.BAYUT and not BAYUT_ENABLED):
        return []
    if not _is_supported_listing_url(source, url):
        logger.warning("Project %s has non-listing our_url: %s", project.id, url)
        return []
    scraped = _scraper_for(source).fetch_listing(url)
    if scraped is None or not scraped.is_active:
        return []

    changed: list[str] = []
    if scraped.permit_number and not project.dld_permit:
        project.dld_permit = scraped.permit_number
    old_price = project.our_price
    new_price = scraped.price
    if new_price is not None and old_price != new_price:
        project.our_price = new_price
        if old_price is not None:
            delta = new_price - old_price
            pct = (delta / old_price * 100.0) if old_price else 0.0
            arrow = "📈" if delta > 0 else "📉"
            # Space thousands separator for the delta only — the project title
            # and URL must not have their commas rewritten.
            delta_text = f"{delta:+,.0f}".replace(",", " ")
            changed.append(
                f"{arrow} Наша цена скорректирована — {_source_label(source.value)}\n"
                f"{project.title}\n"
                f"Было: {_fmt_price(old_price)}\n"
                f"Стало: {_fmt_price(new_price, scraped.currency or 'AED')} "
                f"({delta_text} / {pct:+.1f}%)\n"
                f"{url}"
            )
    if now is not None:
        project.last_checked_at = now
    db.flush()
    return changed
def _notify_owner(project: Project, changes: list[str]) -> None:
    """Deliver *changes* to the project owner's chat (``owner.tg_chat_id``).

    Sends one header message (title, deal type, change count), then one
    message per change. If the owner or their chat id is missing, the admin
    is alerted instead. Does nothing for an empty change list.
    """
    if not changes:
        return
    owner: Employee | None = project.owner
    chat_id = owner.tg_chat_id if owner else None
    if not chat_id:
        logger.warning("Project %s has no owner chat_id; skipping notification", project.id)
        notify_admin(
            f"⚠️ Проект {project.title} (#{project.id}) — {len(changes)} изменений, "
            f"но у владельца не задан tg_chat_id."
        )
        return
    header = "\n".join(
        (
            f"🏠 {project.title}",
            f"Тип: {project.deal_type.value} · Изменений: {len(changes)}",
            "——————————",
        )
    )
    for text in (header, *changes):
        send_message(chat_id, text)
def notify_project_changes(project: Project, changes: list[str]) -> None:
    """Public entry point: send *changes* for *project* to its owner via ``_notify_owner``."""
    _notify_owner(project, changes)
    return None
def _reference_url(project: Project, source: Source) -> str | None:
    """A known listing URL in the project's building for the given source.

    Portals (PF) scope a building search by an internal numeric location id,
    not by free text. So we hand the scraper a real listing in the same
    building to resolve that id: our own ``our_url`` first, then any
    already-tracked competitor of that source. Returns None if none matches.
    """
    pool = [project.our_url] if project.our_url else []
    pool += [tracked.url for tracked in project.listings if tracked.source == source]
    return next(
        (candidate for candidate in pool if detect_source_from_url(candidate) == source),
        None,
    )
def resolve_our_permit(project: Project) -> str | None:
    """Our project's DLD permit number.

    The value the user typed (whitespace-trimmed) wins. Otherwise it is read
    off a PropertyFinder reference listing, since PF exposes the number in
    __NEXT_DATA__. Returns None when neither is available.
    """
    typed = (project.dld_permit or "").strip()
    if typed:
        return typed
    reference = _reference_url(project, Source.PROPERTYFINDER)
    if not reference:
        return None
    return PF.get_permit(reference)
def suggest_similar(
    project: Project,
    our_permit: str | None = None,
    *,
    include_tracked: bool = False,
) -> dict[str, list[ScrapedListing]]:
    """Search PF (and Bayut, when enabled) for listings in this project's building.

    Candidates that share our DLD permit are the same physical listing under a
    different broker (rare, but it happens); they are surfaced first. The rest
    are same-building heuristics, capped at ``_SUGGEST_OTHERS_LIMIT``. Our own
    listing is always excluded. Already-tracked competitors are excluded
    unless *include_tracked* is set (used by automatic permit sync). Search
    failures are logged and leave that portal's list empty.

    Returns:
        ``{'propertyfinder': [...], 'bayut': [...]}``.
    """
    out: dict[str, list[ScrapedListing]] = {"propertyfinder": [], "bayut": []}
    bedrooms = project.bedrooms
    deal_type = project.deal_type.value
    try:
        out["propertyfinder"] = PF.search_similar(
            project.building, bedrooms, deal_type,
            location_url=_reference_url(project, Source.PROPERTYFINDER),
        )
    except Exception as e:
        logger.exception("PF suggest failed: %s", e)
    if BAYUT_ENABLED:
        try:
            out["bayut"] = BAYUT.search_similar(
                project.building, bedrooms, deal_type,
                location_url=_reference_url(project, Source.BAYUT),
            )
        except Exception as e:
            logger.exception("Bayut suggest failed: %s", e)

    # Hide our own listing. Tracked competitors are hidden for UI suggestions,
    # but included for automatic permit synchronization. Keys go through
    # _listing_key so ids compare as strings on both sides.
    excluded: set[tuple[str, str]] = set()
    if not include_tracked:
        excluded.update(_listing_key(l.source, l.external_id) for l in project.listings)
    own_key = _project_own_listing_key(project)
    if own_key:
        excluded.add(own_key)
    for src in out:
        out[src] = [c for c in out[src] if _listing_key(src, c.external_id) not in excluded]

    # Permit-first: PF can't be queried by permit and search results don't carry
    # it — so read each PF candidate's permit (concurrently) and put the ones
    # matching ours first. Keep all matches; cap the same-building remainder.
    if our_permit and out["propertyfinder"]:
        try:
            out["propertyfinder"] = _rank_by_permit(out["propertyfinder"], our_permit)
        except Exception as e:
            logger.exception("PF permit enrichment failed: %s", e)
    return out


def _fetch_permit_safely(url: str) -> str | None:
    """``PF.get_permit`` that logs and returns None instead of raising.

    Without this, one bad page would abort the whole batch and drop every
    other candidate's permit.
    """
    try:
        return PF.get_permit(url)
    except Exception as e:
        logger.warning("PF permit lookup failed for %s: %s", url, e)
        return None


def _rank_by_permit(candidates: list[ScrapedListing], our_permit: str) -> list[ScrapedListing]:
    """Fill each candidate's permit, list exact matches first, and cap the remainder.

    Mutates ``permit_number`` on the given candidates.
    """
    with ThreadPoolExecutor(max_workers=12) as ex:
        permits = list(ex.map(_fetch_permit_safely, [c.url for c in candidates]))
    for cand, permit in zip(candidates, permits):
        cand.permit_number = permit
    normalized = _normalize_permit(our_permit)
    matches = [c for c in candidates if _normalize_permit(c.permit_number) == normalized]
    others = [c for c in candidates if _normalize_permit(c.permit_number) != normalized]
    return matches + others[:_SUGGEST_OTHERS_LIMIT]
def run_check_for_project(project_id: int) -> int:
    """Check one project in its own session and notify its owner of the changes.

    Returns the number of changes found, or 0 when the project does not
    exist. The session is always closed; exceptions from the check propagate
    to the caller.
    """
    session = SessionLocal()
    try:
        project = session.get(Project, project_id)
        if not project:
            return 0
        found = check_project(session, project)
        _notify_owner(project, found)
        return len(found)
    finally:
        session.close()
def run_check_all() -> dict[int, int]:
    """Check every project, each through ``run_check_for_project``.

    Returns ``{project_id: change_count}``. A project whose check raises is
    logged and reported as ``-1``, so one failure doesn't stop the sweep.
    """
    session = SessionLocal()
    try:
        project_ids = [row.id for row in session.query(Project).all()]
    finally:
        session.close()

    results: dict[int, int] = {}
    for project_id in project_ids:
        try:
            count = run_check_for_project(project_id)
        except Exception as exc:
            logger.exception("Check for project %s failed: %s", project_id, exc)
            count = -1
        results[project_id] = count
    return results