Validate PF project listings and sync own price
All checks were successful
CI / go (push) Successful in 41s
CI / python (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 33s

This commit is contained in:
Grendgi
2026-06-11 14:46:35 +03:00
parent d53ecb2add
commit c763ff423d
3 changed files with 171 additions and 6 deletions

View File

@@ -119,6 +119,15 @@ def _extract_id_from_url(url: str) -> str | None:
return m.group(1) if m else None
def is_listing_url(url: str) -> bool:
"""True only for a concrete PF listing URL.
PF search pages also contain listing-like JSON. Treating them as a detail
page can bind monitoring to a random result, so callers must reject them.
"""
return bool(_extract_id_from_url(url or ""))
def _is_listing_dict(item: dict) -> bool:
"""Heuristic: a listing dict contains a price plus an id-like field."""
if not isinstance(item, dict):
@@ -131,12 +140,22 @@ def _is_listing_dict(item: dict) -> bool:
class PropertyFinderScraper:
source = SOURCE
def is_listing_url(self, url: str) -> bool:
return is_listing_url(url)
def listing_id_from_url(self, url: str) -> str | None:
return _extract_id_from_url(url)
def fetch_listing(self, url: str) -> ScrapedListing | None:
"""Refetch a known listing URL. Returns:
- ScrapedListing(is_active=False) if the URL returns 404 (listing removed)
- ScrapedListing with current data if alive
- None on network/parse failure (we won't update the DB in that case)
"""
if not is_listing_url(url):
logger.warning("PF fetch_listing rejected non-listing URL: %s", url)
return None
try:
html = fetch_html(url)
except ScraperError as e:
@@ -199,6 +218,9 @@ class PropertyFinderScraper:
def get_permit(self, url: str) -> str | None:
"""Fetch a listing page and return only its DLD permit number (or None).
Used to compare candidates against our own permit during suggestions."""
if not is_listing_url(url):
logger.warning("PF get_permit rejected non-listing URL: %s", url)
return None
try:
html = fetch_html(url)
except ScraperError as e:
@@ -300,6 +322,8 @@ class PropertyFinderScraper:
agent_name, agency_name = _extract_broker(node)
share = node.get("share_url") or node.get("path")
cand_url = share if str(share).startswith("http") else urljoin(BASE_URL, str(share or ""))
if not is_listing_url(cand_url):
continue
results.append(
ScrapedListing(

View File

@@ -14,7 +14,6 @@ Adding new competitors is done via the web UI (user pastes URLs) — not here.
from __future__ import annotations
import logging
import re
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
@@ -61,6 +60,12 @@ def detect_source_from_url(url: str) -> Source | None:
return None
def _is_supported_listing_url(source: Source, url: str) -> bool:
if source == Source.PROPERTYFINDER:
return PF.is_listing_url(url)
return source == Source.BAYUT
def _fmt_price(value: float | None, currency: str | None = "AED") -> str:
if value is None:
return ""
@@ -80,6 +85,21 @@ def _listing_key(source: Source | str, external_id: str) -> tuple[str, str]:
return source_value, str(external_id or "")
def _project_own_listing_key(project: Project) -> tuple[str, str] | None:
if not project.our_url:
return None
source = detect_source_from_url(project.our_url)
if source == Source.PROPERTYFINDER:
listing_id = PF.listing_id_from_url(project.our_url)
return _listing_key(source, listing_id) if listing_id else None
return None
def _is_own_listing(project: Project, item: ScrapedListing) -> bool:
own_key = _project_own_listing_key(project)
return bool(own_key and own_key == _listing_key(item.source, item.external_id))
def _format_listing_added(project: Project, listing: CompetitorListing, *, auto: bool) -> str:
title = listing.title or "без названия"
prefix = "✅ <b>Автоматически добавлен конкурент</b>" if auto else "✅ <b>Добавлен конкурент</b>"
@@ -119,6 +139,8 @@ def add_competitor_url(db: Session, project: Project, url: str) -> tuple[Competi
"Bayut временно не поддерживается — площадка перешла на защищённый "
"рендеринг. Добавляйте ссылки PropertyFinder."
)
if not _is_supported_listing_url(source, url):
return None, "Укажите ссылку на конкретное объявление, а не на страницу поиска"
scraper = _scraper_for(source)
scraped = scraper.fetch_listing(url)
@@ -139,6 +161,8 @@ def add_competitor_url(db: Session, project: Project, url: str) -> tuple[Competi
)
if existing:
return None, "Это объявление уже добавлено в проект"
if _is_own_listing(project, scraped):
return None, "Это ссылка на наш объект, а не на конкурента"
now = datetime.utcnow()
listing = CompetitorListing(
@@ -258,6 +282,7 @@ def sync_permit_competitors(
item
for item in suggestions["propertyfinder"]
if _normalize_permit(item.permit_number) == normalized_permit
and not _is_own_listing(project, item)
]
matched_keys = {_listing_key(item.source, item.external_id) for item in matches}
@@ -306,6 +331,7 @@ def check_project(db: Session, project: Project) -> list[str]:
"""Re-scan all tracked competitor listings for one project. Returns notification texts."""
changes: list[str] = []
now = datetime.utcnow()
changes.extend(refresh_our_listing(db, project, now=now))
sync_changes, _, _ = sync_permit_competitors(db, project)
changes.extend(sync_changes)
@@ -372,6 +398,52 @@ def check_project(db: Session, project: Project) -> list[str]:
return changes
def refresh_our_listing(db: Session, project: Project, *, now: datetime | None = None) -> list[str]:
"""Parse our own listing and keep project.our_price in sync.
This never creates a competitor listing. It only updates project metadata
from the concrete `our_url`, so PF search pages are ignored.
"""
url = (project.our_url or "").strip()
if not url:
return []
source = detect_source_from_url(url)
if source is None or source == Source.BAYUT and not BAYUT_ENABLED:
return []
if not _is_supported_listing_url(source, url):
logger.warning("Project %s has non-listing our_url: %s", project.id, url)
return []
scraped = _scraper_for(source).fetch_listing(url)
if scraped is None or not scraped.is_active:
return []
changed: list[str] = []
if scraped.permit_number and not project.dld_permit:
project.dld_permit = scraped.permit_number
old_price = project.our_price
new_price = scraped.price
if new_price is not None and old_price != new_price:
project.our_price = new_price
if old_price is not None:
delta = new_price - old_price
pct = (delta / old_price * 100.0) if old_price else 0.0
arrow = "📈" if delta > 0 else "📉"
changed.append(
f"{arrow} <b>Наша цена скорректирована</b> — {_source_label(source.value)}\n"
f"{project.title}\n"
f"Было: {_fmt_price(old_price)}\n"
f"Стало: {_fmt_price(new_price, scraped.currency or 'AED')} "
f"({'+' if delta > 0 else ''}{delta:,.0f} / {pct:+.1f}%)\n"
f"{url}".replace(",", " ")
)
if now is not None:
project.last_checked_at = now
db.flush()
return changed
def _notify_owner(project: Project, changes: list[str]) -> None:
if not changes:
return
@@ -463,9 +535,10 @@ def suggest_similar(
excluded.update((l.source.value, l.external_id) for l in project.listings)
if project.our_url:
own_src = detect_source_from_url(project.our_url)
m = re.search(r"(\d+)\.html", project.our_url)
if own_src and m:
excluded.add((own_src.value, m.group(1)))
if own_src == Source.PROPERTYFINDER:
listing_id = PF.listing_id_from_url(project.our_url)
if listing_id:
excluded.add((own_src.value, listing_id))
for src in out:
out[src] = [c for c in out[src] if (src, c.external_id) not in excluded]
@@ -479,8 +552,9 @@ def suggest_similar(
permits = list(ex.map(PF.get_permit, [c.url for c in pf]))
for cand, permit in zip(pf, permits):
cand.permit_number = permit
matches = [c for c in pf if c.permit_number == our_permit]
others = [c for c in pf if c.permit_number != our_permit]
normalized = _normalize_permit(our_permit)
matches = [c for c in pf if _normalize_permit(c.permit_number) == normalized]
others = [c for c in pf if _normalize_permit(c.permit_number) != normalized]
out["propertyfinder"] = matches + others[:_SUGGEST_OTHERS_LIMIT]
except Exception as e:
logger.exception("PF permit enrichment failed: %s", e)