326 lines
12 KiB
Python
326 lines
12 KiB
Python
"""PropertyFinder.ae scraper.
|
|
|
|
Two operations:
|
|
- fetch_listing(url): read a listing detail page → ScrapedListing (title/price/agent/permit).
|
|
- search_similar(building, bedrooms, deal_type): search PF for similar candidates
|
|
by building name + bedrooms filter → list[ScrapedListing].
|
|
|
|
PF is a Next.js app — listing data sits in <script id="__NEXT_DATA__">.
|
|
Note: PF intentionally hides the Trakheesi permit as an image on the detail page,
|
|
so permit may come back as None — that's fine, we don't depend on it.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import re
|
|
from urllib.parse import urljoin
|
|
|
|
from app.scrapers.base import (
|
|
ScrapedListing,
|
|
ScraperError,
|
|
extract_next_data,
|
|
fetch_html,
|
|
parse_price,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
BASE_URL = "https://www.propertyfinder.ae"
|
|
SOURCE = "propertyfinder"
|
|
|
|
# PF location hierarchy, most specific first. search_similar scopes by the most
|
|
# specific id available on a reference listing page.
|
|
_LOC_TYPE_PRIORITY = {
|
|
"TOWER": 5,
|
|
"BUILDING": 4,
|
|
"DEVELOPMENT": 3,
|
|
"SUBCOMMUNITY": 2,
|
|
"COMMUNITY": 1,
|
|
"CITY": 0,
|
|
}
|
|
|
|
|
|
def _category_for_deal(deal_type: str) -> int:
|
|
return 1 if deal_type == "sale" else 2
|
|
|
|
|
|
def _get(d, *keys, default=None):
|
|
cur = d
|
|
for k in keys:
|
|
if not isinstance(cur, dict):
|
|
return default
|
|
cur = cur.get(k)
|
|
if cur is None:
|
|
return default
|
|
return cur
|
|
|
|
|
|
def _walk(node):
|
|
"""Iterate over every dict in a nested JSON structure."""
|
|
if isinstance(node, dict):
|
|
yield node
|
|
for v in node.values():
|
|
yield from _walk(v)
|
|
elif isinstance(node, list):
|
|
for it in node:
|
|
yield from _walk(it)
|
|
|
|
|
|
def _extract_price(item: dict) -> tuple[float | None, str | None]:
|
|
price = item.get("price")
|
|
if isinstance(price, dict):
|
|
val = price.get("value") or price.get("amount") or price.get("min") or price.get("from")
|
|
cur = price.get("currency") or "AED"
|
|
return parse_price(val), cur
|
|
if isinstance(price, (int, float, str)):
|
|
return parse_price(price), item.get("currency") or "AED"
|
|
return None, "AED"
|
|
|
|
|
|
def _extract_broker(item: dict) -> tuple[str | None, str | None]:
|
|
broker = item.get("broker") or item.get("agency") or {}
|
|
agent = item.get("agent") or item.get("contact") or {}
|
|
agency_name = broker.get("name") if isinstance(broker, dict) else None
|
|
agent_name = agent.get("name") if isinstance(agent, dict) else None
|
|
return agent_name, agency_name
|
|
|
|
|
|
def _extract_permit(item: dict) -> str | None:
|
|
for key in ("permit_number", "permitNumber", "trakheesi", "rera", "permit"):
|
|
v = item.get(key)
|
|
if v:
|
|
return str(v).strip()
|
|
reg = item.get("regulatory") or item.get("regulation") or {}
|
|
if isinstance(reg, dict):
|
|
for key in ("permit", "permit_number", "trakheesi", "rera"):
|
|
v = reg.get(key)
|
|
if v:
|
|
return str(v).strip()
|
|
return None
|
|
|
|
|
|
def _find_permit_on_page(data: dict) -> str | None:
|
|
"""The DLD permit number lives in a regulatory block rendered as an image,
|
|
but its plain value is still in __NEXT_DATA__: the dict that carries a
|
|
`permit_validation_url` (the Trakheesi link) also has the number in
|
|
`number`. Walk the page and pull it out."""
|
|
for node in _walk(data):
|
|
if isinstance(node, dict) and node.get("permit_validation_url") and node.get("number"):
|
|
return str(node["number"]).strip()
|
|
return None
|
|
|
|
|
|
_ID_FROM_URL = re.compile(r"-(\d+)\.html(?:[?#].*)?$")
|
|
|
|
|
|
def _extract_id_from_url(url: str) -> str | None:
|
|
m = _ID_FROM_URL.search(url)
|
|
return m.group(1) if m else None
|
|
|
|
|
|
def _is_listing_dict(item: dict) -> bool:
|
|
"""Heuristic: a listing dict contains a price plus an id-like field."""
|
|
if not isinstance(item, dict):
|
|
return False
|
|
has_price = "price" in item
|
|
has_id = any(k in item for k in ("id", "reference", "listing_id", "externalID"))
|
|
return has_price and has_id
|
|
|
|
|
|
class PropertyFinderScraper:
|
|
source = SOURCE
|
|
|
|
def fetch_listing(self, url: str) -> ScrapedListing | None:
|
|
"""Refetch a known listing URL. Returns:
|
|
- ScrapedListing(is_active=False) if the URL returns 404 (listing removed)
|
|
- ScrapedListing with current data if alive
|
|
- None on network/parse failure (we won't update the DB in that case)
|
|
"""
|
|
try:
|
|
html = fetch_html(url)
|
|
except ScraperError as e:
|
|
logger.warning("PF refetch failed for %s: %s", url, e)
|
|
return None
|
|
|
|
if not html:
|
|
return ScrapedListing(
|
|
source=SOURCE, external_id=_extract_id_from_url(url) or "", url=url,
|
|
title=None, price=None, currency=None, permit_number=None,
|
|
agent_name=None, agency_name=None, is_active=False,
|
|
)
|
|
|
|
data = extract_next_data(html)
|
|
if not data:
|
|
return None
|
|
|
|
# On a PF detail page the property dict is nested in pageProps. Walk and pick
|
|
# the dict that has both a "price" and an id, ignoring trivial nested ones.
|
|
best = None
|
|
best_score = -1
|
|
for node in _walk(data):
|
|
if not _is_listing_dict(node):
|
|
continue
|
|
score = 0
|
|
if "title" in node or "name" in node:
|
|
score += 2
|
|
if any(k in node for k in ("broker", "agent", "agency")):
|
|
score += 2
|
|
if "bedrooms" in node or "rooms" in node:
|
|
score += 1
|
|
if score > best_score:
|
|
best_score = score
|
|
best = node
|
|
|
|
if best is None:
|
|
logger.warning("PF: no listing dict found in __NEXT_DATA__ for %s", url)
|
|
return None
|
|
|
|
price, currency = _extract_price(best)
|
|
agent_name, agency_name = _extract_broker(best)
|
|
ext_id = (
|
|
str(best.get("id") or best.get("reference") or best.get("listing_id") or "")
|
|
or _extract_id_from_url(url)
|
|
or ""
|
|
)
|
|
return ScrapedListing(
|
|
source=SOURCE,
|
|
external_id=ext_id,
|
|
url=url,
|
|
title=best.get("title") or best.get("name"),
|
|
price=price,
|
|
currency=currency,
|
|
permit_number=_find_permit_on_page(data) or _extract_permit(best),
|
|
agent_name=agent_name,
|
|
agency_name=agency_name,
|
|
is_active=True,
|
|
)
|
|
|
|
def get_permit(self, url: str) -> str | None:
|
|
"""Fetch a listing page and return only its DLD permit number (or None).
|
|
Used to compare candidates against our own permit during suggestions."""
|
|
try:
|
|
html = fetch_html(url)
|
|
except ScraperError as e:
|
|
logger.warning("PF get_permit fetch failed for %s: %s", url, e)
|
|
return None
|
|
data = extract_next_data(html)
|
|
return _find_permit_on_page(data) if data else None
|
|
|
|
def resolve_location_id(self, listing_url: str) -> int | None:
|
|
"""Read a PF listing page and return the most specific location id
|
|
(tower > building > subcommunity > community).
|
|
|
|
PF's search only filters by numeric location id (`l=`); the free-text
|
|
`q=` param does NOT scope results to a building — it returns unrelated
|
|
recommendations. So we derive the location id from a known listing that
|
|
sits in the same building (our own listing, or an already-tracked one).
|
|
"""
|
|
try:
|
|
html = fetch_html(listing_url)
|
|
except ScraperError as e:
|
|
logger.warning("PF resolve_location_id fetch failed for %s: %s", listing_url, e)
|
|
return None
|
|
data = extract_next_data(html)
|
|
if not data:
|
|
return None
|
|
|
|
best_id: object = None
|
|
best_rank = -1
|
|
for node in _walk(data):
|
|
if not isinstance(node, dict):
|
|
continue
|
|
rank = _LOC_TYPE_PRIORITY.get(str(node.get("type", "")).upper(), -1)
|
|
if rank > best_rank and node.get("id") and node.get("name"):
|
|
best_rank, best_id = rank, node.get("id")
|
|
try:
|
|
return int(best_id) if best_id is not None else None
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
def search_similar(
|
|
self,
|
|
building: str | None,
|
|
bedrooms: int | None,
|
|
deal_type: str,
|
|
limit: int = 200,
|
|
location_url: str | None = None,
|
|
max_pages: int = 8,
|
|
) -> list[ScrapedListing]:
|
|
"""Search PF for candidates in the same building, scoped by location id.
|
|
|
|
`location_url` is a reference listing in the target building (our own
|
|
listing or an already-tracked competitor) — we resolve it to a PF
|
|
location id and search by `l=`. Without it we can't reliably scope a
|
|
building search on PF, so we return nothing rather than garbage.
|
|
|
|
Paginates: a same-permit competitor can sit on any results page (PF
|
|
can't be queried by permit), so we collect across pages up to
|
|
`max_pages`/`limit`.
|
|
"""
|
|
location_id = self.resolve_location_id(location_url) if location_url else None
|
|
if location_id is None:
|
|
logger.info(
|
|
"PF search_similar: no location id (url=%r) — skipping (q= text search "
|
|
"does not filter by building on PF)", location_url,
|
|
)
|
|
return []
|
|
|
|
c = _category_for_deal(deal_type)
|
|
base = f"{BASE_URL}/en/search?c={c}&l={location_id}"
|
|
if bedrooms is not None:
|
|
base += f"&bf={bedrooms}&bt={bedrooms}" # PF uses bf=bedrooms-from, bt=bedrooms-to
|
|
|
|
results: list[ScrapedListing] = []
|
|
seen_ids: set[str] = set()
|
|
for page in range(1, max_pages + 1):
|
|
page_url = base if page == 1 else f"{base}&page={page}"
|
|
try:
|
|
html = fetch_html(page_url)
|
|
except ScraperError as e:
|
|
logger.warning("PF search failed (page %d): %s", page, e)
|
|
break
|
|
data = extract_next_data(html)
|
|
if not data:
|
|
break
|
|
|
|
new_on_page = 0
|
|
for node in _walk(data):
|
|
if not _is_listing_dict(node):
|
|
continue
|
|
ext_id = str(node.get("id") or node.get("reference") or "")
|
|
if not ext_id or ext_id in seen_ids:
|
|
continue
|
|
seen_ids.add(ext_id)
|
|
new_on_page += 1
|
|
|
|
# Results are scoped to the location by l=, so no title filter.
|
|
title = node.get("title") or node.get("name") or ""
|
|
price, currency = _extract_price(node)
|
|
agent_name, agency_name = _extract_broker(node)
|
|
share = node.get("share_url") or node.get("path")
|
|
cand_url = share if str(share).startswith("http") else urljoin(BASE_URL, str(share or ""))
|
|
|
|
results.append(
|
|
ScrapedListing(
|
|
source=SOURCE,
|
|
external_id=ext_id,
|
|
url=cand_url or page_url,
|
|
title=title or None,
|
|
price=price,
|
|
currency=currency,
|
|
permit_number=_extract_permit(node),
|
|
agent_name=agent_name,
|
|
agency_name=agency_name,
|
|
is_active=True,
|
|
)
|
|
)
|
|
if len(results) >= limit:
|
|
break
|
|
|
|
# No new listings on this page → we've passed the last page.
|
|
if len(results) >= limit or new_on_page == 0:
|
|
break
|
|
logger.info("PF search_similar: collected %d candidates (l=%s)", len(results), location_id)
|
|
return results
|