"""Core monitoring logic. Per project, for every CompetitorListing we already track: 1. Re-fetch the listing's URL. 2. Detect: - Price change → 📈📉 notify - URL returns 404 / removed → ❌ notify, mark removed - Reappeared after removal → ♻️ notify 3. Snapshot every price into PriceHistory. Adding new competitors is done via the web UI (user pastes URLs) — not here. """ from __future__ import annotations import logging from concurrent.futures import ThreadPoolExecutor from datetime import datetime from sqlalchemy.orm import Session from app.db import SessionLocal from app.models import ( CompetitorListing, Employee, ListingStatus, PriceHistory, Project, Source, ) from app.scrapers import BayutScraper, PropertyFinderScraper, ScrapedListing from app.services.notifier import notify_admin, send_message logger = logging.getLogger(__name__) PF = PropertyFinderScraper() BAYUT = BayutScraper() # Same-building suggestions beyond exact permit matches are a browse heuristic — # cap how many we show so the page stays usable. _SUGGEST_OTHERS_LIMIT = 30 _PERMIT_MISSING_DELETE_THRESHOLD = 3 # Bayut moved to fully client-side rendering (no __NEXT_DATA__, Algolia keys # hidden), so it can't be scraped over plain HTTP — disabled until we add a # headless-browser fetcher. Flip to True once that exists. 
BAYUT_ENABLED = False

# Error text `add_competitor_url` returns for a listing the project already
# tracks. `add_competitor_urls` matches on it to count the URL as "skipped"
# instead of reporting an error, so both must use this single constant.
_ERR_ALREADY_TRACKED = "Это объявление уже добавлено в проект"


def _scraper_for(source: Source):
    """Return the shared scraper for *source* (Bayut for anything that is not PF)."""
    return PF if source == Source.PROPERTYFINDER else BAYUT


def detect_source_from_url(url: str) -> Source | None:
    """Identify the portal a URL belongs to by substring match.

    Returns None for empty/None input and for URLs of any other site.
    """
    u = (url or "").lower()
    if "propertyfinder" in u:
        return Source.PROPERTYFINDER
    if "bayut.com" in u:
        return Source.BAYUT
    return None


def _is_supported_listing_url(source: Source, url: str) -> bool:
    """Tell whether *url* points at a concrete listing we can fetch.

    PF URLs are validated by the PF scraper; any Bayut URL is accepted as-is.
    """
    if source == Source.PROPERTYFINDER:
        return PF.is_listing_url(url)
    return source == Source.BAYUT


def _fmt_price(value: float | None, currency: str | None = "AED") -> str:
    """Format a price as ``1 234 567 AED``; an unknown price renders as an em dash."""
    if value is None:
        return "—"
    # Only the number gets its thousands separators swapped for spaces.
    amount = f"{value:,.0f}".replace(",", " ")
    return f"{amount} {currency or 'AED'}"


def _fmt_delta(old_price: float, new_price: float) -> str:
    """Format a price move as ``(+50 000 / +5.0%)``.

    The comma → space substitution is applied to the amount alone. Doing it on
    the finished notification text (as an adjacent-f-string ``.replace`` does)
    would also strip commas from titles and URLs.
    """
    delta = new_price - old_price
    pct = (delta / old_price * 100.0) if old_price else 0.0
    amount = f"{delta:,.0f}".replace(",", " ")
    sign = "+" if delta > 0 else ""
    return f"({sign}{amount} / {pct:+.1f}%)"


def _source_label(source: str) -> str:
    """Human-readable portal name for a source value; unknown values pass through."""
    return {"propertyfinder": "PropertyFinder", "bayut": "Bayut"}.get(source, source)


def _normalize_permit(value: str | None) -> str:
    """Canonical form of a permit number for comparisons (trimmed, lower-cased)."""
    return (value or "").strip().lower()


def _listing_key(source: Source | str, external_id: str) -> tuple[str, str]:
    """Build the ``(source value, external id)`` key used to match listings."""
    source_value = source.value if isinstance(source, Source) else str(source)
    return source_value, str(external_id or "")


def _project_own_listing_key(project: Project) -> tuple[str, str] | None:
    """Key of the project's own listing, or None when it cannot be derived.

    Only PropertyFinder `our_url` values yield a key.
    """
    if not project.our_url:
        return None
    source = detect_source_from_url(project.our_url)
    if source == Source.PROPERTYFINDER:
        listing_id = PF.listing_id_from_url(project.our_url)
        return _listing_key(source, listing_id) if listing_id else None
    return None


def _is_own_listing(project: Project, item: ScrapedListing) -> bool:
    """True when *item* is the project's own listing rather than a competitor."""
    own_key = _project_own_listing_key(project)
    return bool(own_key and own_key == _listing_key(item.source, item.external_id))


def _format_listing_notice(
    project: Project,
    listing: CompetitorListing,
    *,
    prefix: str,
    price_label: str,
) -> str:
    """Shared layout of the "competitor added" / "competitor removed" messages."""
    title = listing.title or "без названия"
    return (
        f"{prefix} — {_source_label(listing.source.value)}\n"
        f"{title}\n"
        f"{price_label}: {_fmt_price(listing.current_price, listing.currency)}\n"
        f"Permit: {listing.permit_number or project.dld_permit or '—'}\n"
        f"{listing.url}"
    )


def _format_listing_added(project: Project, listing: CompetitorListing, *, auto: bool) -> str:
    """Notification text for a newly tracked competitor (*auto* = added by permit sync)."""
    prefix = "✅ Автоматически добавлен конкурент" if auto else "✅ Добавлен конкурент"
    return _format_listing_notice(project, listing, prefix=prefix, price_label="Цена")


def _format_listing_removed(project: Project, listing: CompetitorListing, *, auto: bool) -> str:
    """Notification text for a competitor dropped from tracking (*auto* = by permit sync)."""
    prefix = "🗑️ Автоматически удален конкурент" if auto else "🗑️ Удален конкурент"
    return _format_listing_notice(project, listing, prefix=prefix, price_label="Последняя цена")


def add_competitor_url(db: Session, project: Project, url: str) -> tuple[CompetitorListing | None, str]:
    """User-facing entrypoint: paste a URL → create CompetitorListing for the project.

    The page is fetched once to validate it and to seed title, agent, permit
    and the first PriceHistory row. Commits on success.

    Returns (listing, error_message). error_message is empty on success;
    on failure listing is None.
    """
    url = (url or "").strip()
    if not url:
        return None, "URL пустой"

    source = detect_source_from_url(url)
    if source is None:
        return None, "URL должен быть с propertyfinder.ae или bayut.com"
    if source == Source.BAYUT and not BAYUT_ENABLED:
        return None, (
            "Bayut временно не поддерживается — площадка перешла на защищённый "
            "рендеринг. Добавляйте ссылки PropertyFinder."
        )
    if not _is_supported_listing_url(source, url):
        return None, "Укажите ссылку на конкретное объявление, а не на страницу поиска"

    scraper = _scraper_for(source)
    scraped = scraper.fetch_listing(url)
    if scraped is None:
        return None, "Не удалось загрузить страницу — сайт мог заблокировать запрос, попробуйте позже"
    if not scraped.is_active:
        return None, "Страница объявления вернула 404 — ссылка битая или объявление снято"

    ext_id = scraped.external_id or url  # fallback if id wasn't found
    existing = (
        db.query(CompetitorListing)
        .filter(
            CompetitorListing.project_id == project.id,
            CompetitorListing.source == source,
            CompetitorListing.external_id == ext_id,
        )
        .first()
    )
    if existing:
        return None, _ERR_ALREADY_TRACKED
    if _is_own_listing(project, scraped):
        return None, "Это ссылка на наш объект, а не на конкурента"

    now = datetime.utcnow()
    listing = CompetitorListing(
        project_id=project.id,
        source=source,
        external_id=ext_id,
        url=url,
        title=scraped.title,
        agent_name=scraped.agent_name,
        agency_name=scraped.agency_name,
        permit_number=scraped.permit_number,
        auto_discovered=False,
        current_price=scraped.price,
        currency=scraped.currency or "AED",
        status=ListingStatus.ACTIVE,
        first_seen_at=now,
        last_seen_at=now,
    )
    db.add(listing)
    db.flush()  # assigns listing.id for the PriceHistory row below
    if scraped.price is not None:
        db.add(PriceHistory(listing_id=listing.id, price=scraped.price, recorded_at=now))
    db.commit()
    return listing, ""


def parse_our_listing_url(url: str) -> dict:
    """Parse our own PF listing for project metadata.

    Used by the Go API before project validation, so users can paste only the
    concrete object URL and let the service fill price/permit/building/area.

    Raises:
        ValueError: with a user-facing message when the URL is empty, from an
            unsupported site, not a listing page, or the page could not be
            loaded / is no longer active.
    """
    url = (url or "").strip()
    if not url:
        raise ValueError("URL пустой")
    source = detect_source_from_url(url)
    if source is None:
        raise ValueError("URL должен быть с propertyfinder.ae или bayut.com")
    if source == Source.BAYUT and not BAYUT_ENABLED:
        raise ValueError(
            "Bayut временно не поддерживается — площадка перешла на защищённый "
            "рендеринг. Используйте ссылку PropertyFinder."
        )
    if not _is_supported_listing_url(source, url):
        raise ValueError("Укажите ссылку на конкретное объявление, а не на страницу поиска")

    scraped = _scraper_for(source).fetch_listing(url)
    if scraped is None:
        raise ValueError("Не удалось загрузить страницу — сайт мог заблокировать запрос, попробуйте позже")
    if not scraped.is_active:
        raise ValueError("Страница объявления вернула 404 — ссылка битая или объявление снято")

    return {
        "title": scraped.title,
        "our_price": scraped.price,
        "dld_permit": scraped.permit_number,
        "building": scraped.building,
        "bedrooms": scraped.bedrooms,
        "size_sqft": scraped.size_sqft,
        "currency": scraped.currency or "AED",
    }


def add_competitor_urls(db: Session, project: Project, urls: list[str]) -> dict:
    """Add several pasted/selected URLs in one go (used by the suggest page's multi-select).

    Processes them sequentially — each one re-fetches the page — and reports a
    summary. Blank entries and exact duplicates within *urls* are ignored.

    Returns {'added': int, 'skipped': int, 'errors': [..]} where 'skipped'
    counts URLs that were already tracked by the project.
    """
    added = 0
    skipped = 0
    errors: list[str] = []
    seen: set[str] = set()
    for raw in urls:
        url = (raw or "").strip()
        if not url or url in seen:
            continue
        seen.add(url)
        _, err = add_competitor_url(db, project, url)
        if err == _ERR_ALREADY_TRACKED:
            skipped += 1
        elif err:
            errors.append(err)
        else:
            added += 1
    return {"added": added, "skipped": skipped, "errors": errors}


def _create_listing_from_candidate(
    db: Session,
    project: Project,
    candidate: ScrapedListing,
    *,
    permit_number: str,
) -> CompetitorListing:
    """Persist an auto-discovered candidate as a CompetitorListing.

    Flushes (to obtain the id) and records the first PriceHistory row when a
    price is known. Does not commit — the caller owns the transaction.
    """
    now = datetime.utcnow()
    listing = CompetitorListing(
        project_id=project.id,
        source=Source(candidate.source),
        external_id=candidate.external_id or candidate.url,
        url=candidate.url,
        title=candidate.title,
        agent_name=candidate.agent_name,
        agency_name=candidate.agency_name,
        permit_number=permit_number,
        auto_discovered=True,
        current_price=candidate.price,
        currency=candidate.currency or "AED",
        status=ListingStatus.ACTIVE,
        first_seen_at=now,
        last_seen_at=now,
    )
    db.add(listing)
    db.flush()
    if candidate.price is not None:
        db.add(PriceHistory(listing_id=listing.id, price=candidate.price, recorded_at=now))
    return listing


def _hide_tracked_suggestions(
    db: Session,
    project: Project,
    suggestions: dict[str, list[ScrapedListing]],
) -> dict[str, list[ScrapedListing]]:
    """Return *suggestions* without candidates the project already tracks."""
    tracked = {
        _listing_key(row.source, row.external_id)
        for row in db.query(CompetitorListing).filter(CompetitorListing.project_id == project.id).all()
    }
    return {
        source: [item for item in items if _listing_key(item.source, item.external_id) not in tracked]
        for source, items in suggestions.items()
    }


def sync_permit_competitors(
    db: Session,
    project: Project,
    *,
    count_missing: bool = True,
) -> tuple[list[str], dict[str, list[ScrapedListing]], str | None]:
    """Auto-maintain competitor listings with the same DLD permit.

    Exact-permit matches are added automatically. Previously auto-discovered
    exact-permit listings are deleted only after several consecutive permit
    searches miss them. Manual competitors are never auto-deleted.

    A search that failed (PF search or permit enrichment raised) is treated as
    inconclusive: it never counts as a "miss", so a portal outage cannot
    delete tracked competitors.

    Does not commit — the caller owns the transaction.

    Returns (notification texts, suggestions with tracked listings hidden,
    our permit or None when it could not be resolved).
    """
    changes: list[str] = []
    our_permit = resolve_our_permit(project)
    if not our_permit:
        return changes, {"propertyfinder": [], "bayut": []}, None

    normalized_permit = _normalize_permit(our_permit)
    suggestions, pf_complete = _collect_suggestions(project, our_permit, include_tracked=True)
    matches = [
        item
        for item in suggestions["propertyfinder"]
        if _normalize_permit(item.permit_number) == normalized_permit and not _is_own_listing(project, item)
    ]
    matched_keys = {_listing_key(item.source, item.external_id) for item in matches}

    existing = {
        _listing_key(item.source, item.external_id): item
        for item in db.query(CompetitorListing).filter(CompetitorListing.project_id == project.id).all()
    }

    for item in matches:
        key = _listing_key(item.source, item.external_id)
        listing = existing.get(key)
        if listing:
            # Seen again: refresh metadata and reset the consecutive-miss counter.
            listing.permit_number = item.permit_number or our_permit
            listing.permit_missing_checks = 0
            if item.title:
                listing.title = item.title
            if item.agent_name:
                listing.agent_name = item.agent_name
            if item.agency_name:
                listing.agency_name = item.agency_name
            continue

        listing = _create_listing_from_candidate(db, project, item, permit_number=item.permit_number or our_permit)
        existing[key] = listing
        changes.append(_format_listing_added(project, listing, auto=True))

    if count_missing and not pf_complete:
        logger.warning(
            "Project %s: PF permit search was incomplete; not counting missing permit matches",
            project.id,
        )
    if count_missing and pf_complete:
        for listing in list(existing.values()):
            if not listing.auto_discovered:
                continue
            if _normalize_permit(listing.permit_number) != normalized_permit:
                continue
            if _listing_key(listing.source, listing.external_id) in matched_keys:
                continue
            listing.permit_missing_checks = (listing.permit_missing_checks or 0) + 1
            if listing.permit_missing_checks < _PERMIT_MISSING_DELETE_THRESHOLD:
                continue
            changes.append(_format_listing_removed(project, listing, auto=True))
            db.delete(listing)

    return changes, _hide_tracked_suggestions(db, project, suggestions), our_permit


def check_project(db: Session, project: Project) -> list[str]:
    """Re-scan all tracked competitor listings for one project.

    Refreshes our own listing, syncs same-permit competitors, then re-fetches
    every tracked listing to detect removal, reappearance and price changes.
    Commits once at the end.

    Returns notification texts.
    """
    changes: list[str] = []
    now = datetime.utcnow()

    changes.extend(refresh_our_listing(db, project, now=now))
    sync_changes, _, _ = sync_permit_competitors(db, project)
    changes.extend(sync_changes)

    listings = db.query(CompetitorListing).filter(CompetitorListing.project_id == project.id).all()
    for listing in listings:
        scraper = _scraper_for(listing.source)
        scraped = scraper.fetch_listing(listing.url)
        if scraped is None:
            # Network/parse failure — skip without changing state, try again next cycle.
            continue

        if not scraped.is_active:
            # Notify only on the ACTIVE → REMOVED transition, not on every cycle.
            if listing.status == ListingStatus.ACTIVE:
                listing.status = ListingStatus.REMOVED
                changes.append(
                    f"❌ Объявление удалено — {_source_label(listing.source.value)}\n"
                    f"{listing.title or 'без названия'}\n"
                    f"Последняя цена: {_fmt_price(listing.current_price, listing.currency)}\n"
                    f"{listing.url}"
                )
            continue

        listing.last_seen_at = now
        if listing.status != ListingStatus.ACTIVE:
            listing.status = ListingStatus.ACTIVE
            changes.append(
                f"♻️ Объявление снова активно — {_source_label(listing.source.value)}\n"
                f"{listing.title or 'без названия'}\n{listing.url}"
            )

        # Update metadata that may have changed
        if scraped.title:
            listing.title = scraped.title
        if scraped.agent_name:
            listing.agent_name = scraped.agent_name
        if scraped.agency_name:
            listing.agency_name = scraped.agency_name

        old_price = listing.current_price
        new_price = scraped.price
        if new_price is None or new_price == old_price:
            continue

        if old_price is not None:
            # A first-ever price is recorded silently; only real changes notify.
            arrow = "📈" if new_price > old_price else "📉"
            changes.append(
                f"{arrow} Цена изменилась — {_source_label(listing.source.value)}\n"
                f"{listing.title or 'без названия'}\n"
                f"Было: {_fmt_price(old_price, listing.currency)}\n"
                f"Стало: {_fmt_price(new_price, scraped.currency or listing.currency)} "
                f"{_fmt_delta(old_price, new_price)}\n"
                f"{listing.url}"
            )
        listing.current_price = new_price
        listing.currency = scraped.currency or listing.currency
        db.add(PriceHistory(listing_id=listing.id, price=new_price, recorded_at=now))

    project.last_checked_at = now
    db.commit()
    return changes


def refresh_our_listing(db: Session, project: Project, *, now: datetime | None = None) -> list[str]:
    """Parse our own listing and keep project.our_price in sync.

    This never creates a competitor listing. It only updates project metadata
    from the concrete `our_url`, so PF search pages are ignored. Permit,
    building, bedrooms and size are filled in only when the project has none.

    Flushes but does not commit. Returns notification texts (a price-change
    message when a previously known price moved, otherwise empty).
    """
    url = (project.our_url or "").strip()
    if not url:
        return []

    source = detect_source_from_url(url)
    if source is None or (source == Source.BAYUT and not BAYUT_ENABLED):
        return []
    if not _is_supported_listing_url(source, url):
        logger.warning("Project %s has non-listing our_url: %s", project.id, url)
        return []

    scraped = _scraper_for(source).fetch_listing(url)
    if scraped is None or not scraped.is_active:
        return []

    changed: list[str] = []
    if scraped.permit_number and not project.dld_permit:
        project.dld_permit = scraped.permit_number
    if scraped.building and not project.building:
        project.building = scraped.building
    if scraped.bedrooms is not None and project.bedrooms is None:
        project.bedrooms = scraped.bedrooms
    if scraped.size_sqft is not None and project.size_sqft is None:
        project.size_sqft = scraped.size_sqft

    old_price = project.our_price
    new_price = scraped.price
    if new_price is not None and old_price != new_price:
        project.our_price = new_price
        if old_price is not None:
            arrow = "📈" if new_price > old_price else "📉"
            changed.append(
                f"{arrow} Наша цена скорректирована — {_source_label(source.value)}\n"
                f"{project.title}\n"
                f"Было: {_fmt_price(old_price)}\n"
                f"Стало: {_fmt_price(new_price, scraped.currency or 'AED')} "
                f"{_fmt_delta(old_price, new_price)}\n"
                f"{url}"
            )

    if now is not None:
        project.last_checked_at = now
    db.flush()
    return changed


def _notify_owner(project: Project, changes: list[str]) -> None:
    """Send *changes* to the project owner's Telegram chat, one message each.

    When the owner has no chat id, the admin is alerted instead. No-op for an
    empty change list.
    """
    if not changes:
        return
    owner: Employee | None = project.owner
    if not owner or not owner.tg_chat_id:
        logger.warning("Project %s has no owner chat_id; skipping notification", project.id)
        notify_admin(
            f"⚠️ Проект {project.title} (#{project.id}) — {len(changes)} изменений, "
            f"но у владельца не задан tg_chat_id."
        )
        return

    header = (
        f"🏠 {project.title}\n"
        f"Тип: {project.deal_type.value} · Изменений: {len(changes)}\n"
        f"——————————"
    )
    send_message(owner.tg_chat_id, header)
    for change in changes:
        send_message(owner.tg_chat_id, change)


def notify_project_changes(project: Project, changes: list[str]) -> None:
    """Public wrapper around `_notify_owner`."""
    _notify_owner(project, changes)


def _reference_url(project: Project, source: Source) -> str | None:
    """A known listing URL in the project's building for the given source.

    Portals (PF) scope a building search by an internal numeric location id,
    not by free text — so we hand the scraper a real listing in the same
    building (our own `our_url`, else an already-tracked competitor) to
    resolve that id.
    """
    candidates: list[str] = []
    if project.our_url:
        candidates.append(project.our_url)
    candidates.extend(row.url for row in project.listings if row.source == source)
    for url in candidates:
        if detect_source_from_url(url) == source:
            return url
    return None


def resolve_our_permit(project: Project) -> str | None:
    """Our project's DLD permit number.

    Prefer the value the user typed; else read it off our own listing (PF
    exposes the number in __NEXT_DATA__). Returns None when neither is
    available.
    """
    if project.dld_permit and project.dld_permit.strip():
        return project.dld_permit.strip()
    ref = _reference_url(project, Source.PROPERTYFINDER)
    return PF.get_permit(ref) if ref else None


def _collect_suggestions(
    project: Project,
    our_permit: str | None,
    *,
    include_tracked: bool,
) -> tuple[dict[str, list[ScrapedListing]], bool]:
    """Search the portals for same-building candidates.

    Returns (suggestions, pf_complete). pf_complete is False when the PF
    search or the PF permit enrichment raised, i.e. when the absence of a
    listing from the PF result proves nothing.
    """
    out: dict[str, list[ScrapedListing]] = {"propertyfinder": [], "bayut": []}
    pf_complete = True
    bedrooms = project.bedrooms
    deal_type = project.deal_type.value
    try:
        out["propertyfinder"] = PF.search_similar(
            project.building,
            bedrooms,
            deal_type,
            location_url=_reference_url(project, Source.PROPERTYFINDER),
        )
    except Exception as e:
        pf_complete = False
        logger.exception("PF suggest failed: %s", e)
    if BAYUT_ENABLED:
        try:
            out["bayut"] = BAYUT.search_similar(
                project.building,
                bedrooms,
                deal_type,
                location_url=_reference_url(project, Source.BAYUT),
            )
        except Exception as e:
            logger.exception("Bayut suggest failed: %s", e)

    # Hide our own listing. Tracked competitors are hidden for UI suggestions,
    # but included for automatic permit synchronization.
    excluded: set[tuple[str, str]] = set()
    if not include_tracked:
        excluded.update(_listing_key(row.source, row.external_id) for row in project.listings)
    own_key = _project_own_listing_key(project)
    if own_key:
        excluded.add(own_key)
    for src in out:
        out[src] = [c for c in out[src] if _listing_key(src, c.external_id) not in excluded]

    # Permit-first: PF can't be queried by permit and search results don't carry
    # it — so read each PF candidate's permit (concurrently) and put the ones
    # matching ours first. Keep all matches; cap the same-building remainder.
    if our_permit and out["propertyfinder"]:
        pf = out["propertyfinder"]
        try:
            with ThreadPoolExecutor(max_workers=12) as ex:
                permits = list(ex.map(PF.get_permit, [c.url for c in pf]))
            for cand, permit in zip(pf, permits):
                cand.permit_number = permit
            normalized = _normalize_permit(our_permit)
            matches = [c for c in pf if _normalize_permit(c.permit_number) == normalized]
            others = [c for c in pf if _normalize_permit(c.permit_number) != normalized]
            out["propertyfinder"] = matches + others[:_SUGGEST_OTHERS_LIMIT]
        except Exception as e:
            pf_complete = False
            logger.exception("PF permit enrichment failed: %s", e)
    return out, pf_complete


def suggest_similar(
    project: Project,
    our_permit: str | None = None,
    *,
    include_tracked: bool = False,
) -> dict[str, list[ScrapedListing]]:
    """Search PF + Bayut for listings in this project's building.

    Candidates that share our DLD permit are the same physical listing under a
    different broker (rare, but it happens) — those are surfaced first. The
    rest are same-building heuristics. Portal failures are logged and yield an
    empty (or un-enriched) list for that portal rather than raising.

    Returns {'propertyfinder': [...], 'bayut': [...]}.
    """
    suggestions, _ = _collect_suggestions(project, our_permit, include_tracked=include_tracked)
    return suggestions


def run_check_for_project(project_id: int) -> int:
    """Check one project in its own DB session and notify its owner.

    Returns the number of changes found (0 when the project does not exist).
    """
    db = SessionLocal()
    try:
        project = db.get(Project, project_id)
        if not project:
            return 0
        changes = check_project(db, project)
        _notify_owner(project, changes)
        return len(changes)
    finally:
        db.close()


def run_check_all() -> dict[int, int]:
    """Check every project, isolating failures per project.

    Returns {project_id: number of changes}, with -1 for a project whose check
    raised (the exception is logged).
    """
    db = SessionLocal()
    try:
        ids = [p.id for p in db.query(Project).all()]
    finally:
        db.close()
    summary: dict[int, int] = {}
    for pid in ids:
        try:
            summary[pid] = run_check_for_project(pid)
        except Exception as e:
            logger.exception("Check for project %s failed: %s", pid, e)
            summary[pid] = -1
    return summary