Files
monitoring-pf/app/scrapers/propertyfinder.py
Grendgi 6750722429
All checks were successful
CI / hygiene (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 38s
CI / go (push) Successful in 26s
CI / python (push) Successful in 16s
feat: parse project metadata from PF links
2026-06-24 14:03:25 +03:00

460 lines
16 KiB
Python

"""PropertyFinder.ae scraper.
Two operations:
- fetch_listing(url): read a listing detail page → ScrapedListing (title/price/agent/permit).
- search_similar(building, bedrooms, deal_type, location_url=...): search PF for
  candidates in the same location as the reference listing `location_url`
  (PF location id + bedrooms filter) → list[ScrapedListing].
PF is a Next.js app — listing data sits in <script id="__NEXT_DATA__">.
Note: PF intentionally shows the Trakheesi permit only as an image on the detail
page. We still try to recover the number from __NEXT_DATA__, but permit may come
back as None — that's fine, we don't depend on it.
"""
from __future__ import annotations
import logging
import re
from urllib.parse import urljoin
from app.scrapers.base import (
ScrapedListing,
ScraperError,
extract_next_data,
fetch_html,
parse_price,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Site root: used to build search URLs and to absolutize relative listing paths.
BASE_URL = "https://www.propertyfinder.ae"
# Value written to ScrapedListing.source for everything this scraper produces.
SOURCE = "propertyfinder"
# PF location hierarchy ranked by specificity: a HIGHER number means a MORE
# specific location type. Lookups keep the highest-ranked location node found
# on a page (search_similar scopes by the most specific id available on a
# reference listing page); location types not listed here are ignored.
_LOC_TYPE_PRIORITY = {
    "TOWER": 5,
    "BUILDING": 4,
    "DEVELOPMENT": 3,
    "SUBCOMMUNITY": 2,
    "COMMUNITY": 1,
    "CITY": 0,
}
def _category_for_deal(deal_type: str) -> int:
return 1 if deal_type == "sale" else 2
def _get(d, *keys, default=None):
cur = d
for k in keys:
if not isinstance(cur, dict):
return default
cur = cur.get(k)
if cur is None:
return default
return cur
def _walk(node):
"""Iterate over every dict in a nested JSON structure."""
if isinstance(node, dict):
yield node
for v in node.values():
yield from _walk(v)
elif isinstance(node, list):
for it in node:
yield from _walk(it)
def _extract_price(item: dict) -> tuple[float | None, str | None]:
price = item.get("price")
if isinstance(price, dict):
val = price.get("value") or price.get("amount") or price.get("min") or price.get("from")
cur = price.get("currency") or "AED"
return parse_price(val), cur
if isinstance(price, (int, float, str)):
return parse_price(price), item.get("currency") or "AED"
return None, "AED"
def _extract_broker(item: dict) -> tuple[str | None, str | None]:
broker = item.get("broker") or item.get("agency") or {}
agent = item.get("agent") or item.get("contact") or {}
agency_name = broker.get("name") if isinstance(broker, dict) else None
agent_name = agent.get("name") if isinstance(agent, dict) else None
return agent_name, agency_name
def _extract_permit(item: dict) -> str | None:
for key in ("permit_number", "permitNumber", "trakheesi", "rera", "permit"):
v = item.get(key)
if v:
return str(v).strip()
reg = item.get("regulatory") or item.get("regulation") or {}
if isinstance(reg, dict):
for key in ("permit", "permit_number", "trakheesi", "rera"):
v = reg.get(key)
if v:
return str(v).strip()
return None
def _parse_int(value) -> int | None:
if value is None:
return None
if isinstance(value, bool):
return None
if isinstance(value, (int, float)):
return int(value)
text = str(value).strip().lower()
if text in {"studio", "студия"}:
return 0
m = re.search(r"\d+", text)
return int(m.group(0)) if m else None
def _first_parsed_int(source: dict, keys: tuple[str, ...]) -> int | None:
    """Return the first value under `keys` that `_parse_int` can interpret."""
    for key in keys:
        parsed = _parse_int(source.get(key))
        if parsed is not None:
            return parsed
    return None


def _extract_bedrooms(item: dict) -> int | None:
    """Return the bedroom count of a listing dict (0 for a studio), or None.

    Looks at the listing's own bedroom-like keys first. Each may be a scalar
    or a dict carrying the number under "value" / "count" / "name". If none
    parses, falls back to any nested dict whose name/label/key mentions "bed"
    (or the Russian "спал") and reads its "value" / "count" / "text".

    Candidate fields are tried one by one until one parses, rather than being
    chained with `or`. An `or` chain discards a legitimate 0 (studio), so
    `{"value": 0}` used to be reported as unknown.
    """
    for key in ("bedrooms", "bedroom", "beds", "rooms", "bedroom_count", "bedrooms_count"):
        value = item.get(key)
        if isinstance(value, dict):
            parsed = _first_parsed_int(value, ("value", "count", "name"))
        else:
            parsed = _parse_int(value)
        if parsed is not None:
            return parsed

    for node in _walk(item):
        if not isinstance(node, dict):
            continue
        label = str(node.get("name") or node.get("label") or node.get("key") or "").lower()
        if "bed" not in label and "спал" not in label:
            continue
        parsed = _first_parsed_int(node, ("value", "count", "text"))
        if parsed is not None:
            return parsed
    return None
# Substrings that mark a unit label as square metres (English, symbols, Russian).
_SQM_UNIT_TOKENS = (
    "sqm", "sq m", "sq.m", "sq. m", "square m", "m2", "m²", "㎡", "м2", "м²", "метр",
)


def _area_to_sqft(value, unit: str | None = None) -> float | None:
    """Parse an area value and return it in square feet.

    `value` is normalised with `parse_price`; None is returned when it does
    not parse. The number is converted from square metres (× 10.7639, rounded
    to 2 decimals) only when `unit` names a metric unit. With no unit or any
    other unit, the value is assumed to already be in square feet.

    The metric token list must never contain an empty string: `"" in text` is
    always True, which would convert every area, including sqft values.
    """
    parsed = parse_price(value)
    if parsed is None:
        return None
    # str() guards against a non-string unit coming out of loosely-typed JSON.
    unit_text = str(unit or "").lower()
    if any(token in unit_text for token in _SQM_UNIT_TOKENS):
        return round(parsed * 10.7639, 2)
    return parsed
def _extract_size_sqft(item: dict) -> float | None:
    """Return the listing's area as converted by `_area_to_sqft`, or None.

    Candidates are tried in order: the listing's own size-like keys (scalar,
    or a dict with value + unit), then any nested dict whose name/label/key
    looks like a size attribute. The first candidate that `_area_to_sqft`
    accepts wins.
    """

    def _candidates():
        # 1) Well-known size fields on the listing itself, in priority order.
        for field in ("size", "area", "property_size", "built_up_area", "builtup_area", "plot_area"):
            raw = item.get(field)
            if isinstance(raw, dict):
                yield (
                    raw.get("value") or raw.get("amount") or raw.get("text"),
                    raw.get("unit") or raw.get("unit_label") or raw.get("unitLabel"),
                )
            else:
                yield raw, None

        # 2) Fallback: nested dicts labelled like a size/area attribute.
        markers = ("size", "area", "sqft", "sq ft", "площад")
        for nested in _walk(item):
            if not isinstance(nested, dict):
                continue
            label = str(
                nested.get("name") or nested.get("label") or nested.get("key") or ""
            ).lower()
            if any(marker in label for marker in markers):
                yield (
                    nested.get("value") or nested.get("amount") or nested.get("text"),
                    str(nested.get("unit") or nested.get("unit_label") or ""),
                )

    for raw_value, unit in _candidates():
        area = _area_to_sqft(raw_value, unit)
        if area is not None:
            return area
    return None
def _location_candidate(node: dict) -> tuple[int, str] | None:
    """Return `(rank, name)` if `node` looks like a PF location entry.

    The rank comes from `_LOC_TYPE_PRIORITY` via the node's upper-cased "type".
    Nodes with an unknown type or an empty name give None.
    """
    type_key = str(node.get("type", "")).upper()
    label = str(node.get("name") or "").strip()
    priority = _LOC_TYPE_PRIORITY.get(type_key, -1)
    if priority >= 0 and label:
        return priority, label
    return None
def _extract_building_from(node) -> str | None:
    """Return the name of the highest-ranked location found under `node`.

    Every nested dict is offered to `_location_candidate`. Among those that
    qualify, the highest rank wins, with ties going to the one met first.
    Returns None when nothing qualifies.
    """
    ranked = (
        candidate
        for candidate in (
            _location_candidate(entry)
            for entry in _walk(node)
            if isinstance(entry, dict)
        )
        if candidate
    )
    # max() keeps the first of several equally-ranked candidates.
    winner = max(ranked, key=lambda pair: pair[0], default=None)
    return winner[1] if winner is not None else None
def _extract_building(data: dict, item: dict) -> str | None:
    """Return the most specific location name for a listing.

    Prefers location data attached to the listing dict itself, trying its
    location-like keys in order. If none yields a name, searches the whole
    page payload `data`.
    """
    for field in ("location", "location_tree", "locations", "locationTree", "community"):
        subtree = item.get(field)
        building = _extract_building_from(subtree) if subtree else None
        if building:
            return building
    return _extract_building_from(data)
def _find_permit_on_page(data: dict) -> str | None:
    """Return the permit number found anywhere in the page payload, or None.

    The regulatory block is rendered as an image, but `__NEXT_DATA__` still
    holds the plain value: the dict that has a `permit_validation_url` (the
    Trakheesi link) keeps the number under `number`. The first such dict
    wins, and its number is returned stringified and stripped.
    """
    hit = next(
        (
            entry
            for entry in _walk(data)
            if isinstance(entry, dict)
            and entry.get("permit_validation_url")
            and entry.get("number")
        ),
        None,
    )
    return None if hit is None else str(hit["number"]).strip()
_ID_FROM_URL = re.compile(r"-(\d+)\.html(?:[?#].*)?$")
def _extract_id_from_url(url: str) -> str | None:
m = _ID_FROM_URL.search(url)
return m.group(1) if m else None
def is_listing_url(url: str) -> bool:
    """Tell whether `url` points at one concrete PF listing.

    Search pages also carry listing-like JSON. Treating one as a detail page
    could bind monitoring to an arbitrary result, so callers must reject any
    URL for which this returns False. None or an empty URL gives False.
    """
    listing_id = _extract_id_from_url(url or "")
    return bool(listing_id)
def _is_listing_dict(item: dict) -> bool:
"""Heuristic: a listing dict contains a price plus an id-like field."""
if not isinstance(item, dict):
return False
has_price = "price" in item
has_id = any(k in item for k in ("id", "reference", "listing_id", "externalID"))
return has_price and has_id
class PropertyFinderScraper:
    """PropertyFinder scraper: refetch listings, read permits, search by location.

    All page data is read from the `__NEXT_DATA__` JSON embedded in PF pages.
    Methods return None / an empty list on fetch or parse failure instead of
    raising, so callers can simply skip the update.
    """

    source = SOURCE

    def is_listing_url(self, url: str) -> bool:
        """Return True only when `url` is a concrete PF listing URL."""
        return is_listing_url(url)

    def listing_id_from_url(self, url: str) -> str | None:
        """Return the numeric listing id embedded in `url`, or None."""
        return _extract_id_from_url(url)

    def fetch_listing(self, url: str) -> ScrapedListing | None:
        """Refetch a known listing URL. Returns:
        - ScrapedListing(is_active=False) if the URL returns 404 (listing removed)
        - ScrapedListing with current data if alive
        - None on network/parse failure (we won't update the DB in that case)
        """
        if not is_listing_url(url):
            logger.warning("PF fetch_listing rejected non-listing URL: %s", url)
            return None
        url_id = _extract_id_from_url(url)
        try:
            html = fetch_html(url)
        except ScraperError as e:
            logger.warning("PF refetch failed for %s: %s", url, e)
            return None
        if not html:
            # No body without an error is treated as "listing removed" (404).
            return ScrapedListing(
                source=SOURCE, external_id=url_id or "", url=url,
                title=None, price=None, currency=None, permit_number=None,
                agent_name=None, agency_name=None, is_active=False,
            )
        data = extract_next_data(html)
        if not data:
            return None

        # On a PF detail page the property dict is nested in pageProps. Walk and pick
        # the dict that has both a "price" and an id, ignoring trivial nested ones.
        best = None
        best_score = -1
        for node in _walk(data):
            if not _is_listing_dict(node):
                continue
            score = 0
            if "title" in node or "name" in node:
                score += 2
            if any(k in node for k in ("broker", "agent", "agency")):
                score += 2
            if "bedrooms" in node or "rooms" in node:
                score += 1
            # Detail pages also embed other listings (similar/recommended) that
            # score just as well. The dict whose id matches the id in the URL is
            # the one this page is about, so it outranks any other candidate.
            if url_id and any(
                str(node.get(k)) == url_id
                for k in ("id", "reference", "listing_id", "externalID")
            ):
                score += 10
            if score > best_score:
                best_score = score
                best = node
        if best is None:
            logger.warning("PF: no listing dict found in __NEXT_DATA__ for %s", url)
            return None

        price, currency = _extract_price(best)
        agent_name, agency_name = _extract_broker(best)
        ext_id = (
            str(best.get("id") or best.get("reference") or best.get("listing_id") or "")
            or url_id
            or ""
        )
        return ScrapedListing(
            source=SOURCE,
            external_id=ext_id,
            url=url,
            title=best.get("title") or best.get("name"),
            price=price,
            currency=currency,
            # Page-wide lookup first; fall back to fields on the listing dict.
            permit_number=_find_permit_on_page(data) or _extract_permit(best),
            agent_name=agent_name,
            agency_name=agency_name,
            building=_extract_building(data, best),
            bedrooms=_extract_bedrooms(best),
            size_sqft=_extract_size_sqft(best),
            is_active=True,
        )

    def get_permit(self, url: str) -> str | None:
        """Fetch a listing page and return only its DLD permit number (or None).

        Used to compare candidates against our own permit during suggestions.
        Returns None for a non-listing URL, a failed fetch, an empty page, or a
        page without a recoverable permit.
        """
        if not is_listing_url(url):
            logger.warning("PF get_permit rejected non-listing URL: %s", url)
            return None
        try:
            html = fetch_html(url)
        except ScraperError as e:
            logger.warning("PF get_permit fetch failed for %s: %s", url, e)
            return None
        if not html:
            # Same "no body" case fetch_listing treats as a removed listing.
            return None
        data = extract_next_data(html)
        return _find_permit_on_page(data) if data else None

    def resolve_location_id(self, listing_url: str) -> int | None:
        """Read a PF listing page and return the most specific location id
        (tower > building > development > subcommunity > community > city).

        PF's search only filters by numeric location id (`l=`); the free-text
        `q=` param does NOT scope results to a building — it returns unrelated
        recommendations. So we derive the location id from a known listing that
        sits in the same building (our own listing, or an already-tracked one).

        Returns None when the page cannot be fetched or parsed, has no usable
        location node, or the id found is not convertible to int.
        """
        try:
            html = fetch_html(listing_url)
        except ScraperError as e:
            logger.warning("PF resolve_location_id fetch failed for %s: %s", listing_url, e)
            return None
        if not html:
            return None
        data = extract_next_data(html)
        if not data:
            return None

        best_id: object = None
        best_rank = -1
        for node in _walk(data):
            if not isinstance(node, dict):
                continue
            rank = _LOC_TYPE_PRIORITY.get(str(node.get("type", "")).upper(), -1)
            # Only nodes with both an id and a name count as real locations.
            if rank > best_rank and node.get("id") and node.get("name"):
                best_rank, best_id = rank, node.get("id")
        try:
            return int(best_id) if best_id is not None else None
        except (TypeError, ValueError):
            return None

    def search_similar(
        self,
        building: str | None,
        bedrooms: int | None,
        deal_type: str,
        limit: int = 200,
        location_url: str | None = None,
        max_pages: int = 8,
    ) -> list[ScrapedListing]:
        """Search PF for candidates in the same building, scoped by location id.

        `location_url` is a reference listing in the target building (our own
        listing or an already-tracked competitor) — we resolve it to a PF
        location id and search by `l=`. Without it we can't reliably scope a
        building search on PF, so we return nothing rather than garbage.
        `building` is accepted for interface compatibility but is not used to
        scope the search.

        Paginates: a same-permit competitor can sit on any results page (PF
        can't be queried by permit), so we collect across pages up to
        `max_pages`/`limit`. A non-positive `limit` or `max_pages` returns an
        empty list without touching the network.
        """
        if limit <= 0 or max_pages <= 0:
            return []
        location_id = self.resolve_location_id(location_url) if location_url else None
        if location_id is None:
            logger.info(
                "PF search_similar: no location id (url=%r) — skipping (q= text search "
                "does not filter by building on PF)", location_url,
            )
            return []

        c = _category_for_deal(deal_type)
        base = f"{BASE_URL}/en/search?c={c}&l={location_id}"
        if bedrooms is not None:
            base += f"&bf={bedrooms}&bt={bedrooms}"  # PF uses bf=bedrooms-from, bt=bedrooms-to

        results: list[ScrapedListing] = []
        seen_ids: set[str] = set()
        for page in range(1, max_pages + 1):
            page_url = base if page == 1 else f"{base}&page={page}"
            try:
                html = fetch_html(page_url)
            except ScraperError as e:
                logger.warning("PF search failed (page %d): %s", page, e)
                break
            if not html:
                # Empty page: nothing further to parse.
                break
            data = extract_next_data(html)
            if not data:
                break

            new_on_page = 0
            for node in _walk(data):
                if not _is_listing_dict(node):
                    continue
                ext_id = str(node.get("id") or node.get("reference") or "")
                if not ext_id or ext_id in seen_ids:
                    continue
                seen_ids.add(ext_id)
                new_on_page += 1
                # Results are scoped to the location by l=, so no title filter.
                title = node.get("title") or node.get("name") or ""
                price, currency = _extract_price(node)
                agent_name, agency_name = _extract_broker(node)
                share = str(node.get("share_url") or node.get("path") or "")
                cand_url = share if share.startswith("http") else urljoin(BASE_URL, share)
                if not is_listing_url(cand_url):
                    # Listing-like dict without a concrete listing URL: skip it.
                    continue
                results.append(
                    ScrapedListing(
                        source=SOURCE,
                        external_id=ext_id,
                        url=cand_url,
                        title=title or None,
                        price=price,
                        currency=currency,
                        permit_number=_extract_permit(node),
                        agent_name=agent_name,
                        agency_name=agency_name,
                        building=_extract_building(data, node),
                        bedrooms=_extract_bedrooms(node),
                        size_sqft=_extract_size_sqft(node),
                        is_active=True,
                    )
                )
                if len(results) >= limit:
                    break
            # No new listings on this page → we've passed the last page.
            if len(results) >= limit or new_on_page == 0:
                break

        logger.info("PF search_similar: collected %d candidates (l=%s)", len(results), location_id)
        return results