"""Core monitoring logic. Per project, for every CompetitorListing we already track: 1. Re-fetch the listing's URL. 2. Detect: - Price change → 📈📉 notify - URL returns 404 / removed → ❌ notify, mark removed - Reappeared after removal → ♻️ notify 3. Snapshot every price into PriceHistory. Adding new competitors is done via the web UI (user pastes URLs) — not here. """ from __future__ import annotations import logging import re from concurrent.futures import ThreadPoolExecutor from datetime import datetime from sqlalchemy.orm import Session from app.db import SessionLocal from app.models import ( CompetitorListing, Employee, ListingStatus, PriceHistory, Project, Source, ) from app.scrapers import BayutScraper, PropertyFinderScraper, ScrapedListing from app.services.notifier import notify_admin, send_message logger = logging.getLogger(__name__) PF = PropertyFinderScraper() BAYUT = BayutScraper() # Same-building suggestions beyond exact permit matches are a browse heuristic — # cap how many we show so the page stays usable. _SUGGEST_OTHERS_LIMIT = 30 # Bayut moved to fully client-side rendering (no __NEXT_DATA__, Algolia keys # hidden), so it can't be scraped over plain HTTP — disabled until we add a # headless-browser fetcher. Flip to True once that exists. 
BAYUT_ENABLED = False

# Error text returned by add_competitor_url for an already-tracked listing.
# add_competitor_urls compares against it to count duplicates as "skipped",
# so both functions must share this single definition.
_ERR_DUPLICATE = "Это объявление уже добавлено в проект"


def _scraper_for(source: Source):
    """Return the shared scraper for *source*: PF for PropertyFinder, else BAYUT."""
    return PF if source == Source.PROPERTYFINDER else BAYUT


def detect_source_from_url(url: str) -> Source | None:
    """Guess the portal a URL belongs to by case-insensitive substring match.

    Returns Source.PROPERTYFINDER if the URL contains "propertyfinder",
    Source.BAYUT if it contains "bayut.com", and None otherwise (including for
    an empty or None URL).
    """
    lowered = (url or "").lower()
    if "propertyfinder" in lowered:
        return Source.PROPERTYFINDER
    if "bayut.com" in lowered:
        return Source.BAYUT
    return None


def _fmt_price(value: float | None, currency: str | None = "AED") -> str:
    """Format a price as e.g. "1 234 567 AED"; a missing price renders as "—".

    A falsy *currency* falls back to "AED".
    """
    if value is None:
        return "—"
    return f"{value:,.0f} {currency or 'AED'}".replace(",", " ")


def _source_label(source: str) -> str:
    """Map a source value string to its display name; unknown values pass through."""
    return {"propertyfinder": "PropertyFinder", "bayut": "Bayut"}.get(source, source)


def add_competitor_url(db: Session, project: Project, url: str) -> tuple[CompetitorListing | None, str]:
    """User-facing entrypoint: paste a URL → create CompetitorListing for the project.

    The page is fetched first (the external id used for de-duplication comes
    from the scrape), then the listing and — if a price was found — its first
    PriceHistory row are committed.

    Returns (listing, error_message). error_message is empty on success;
    on failure listing is None and nothing is written.
    """
    url = (url or "").strip()
    if not url:
        return None, "URL пустой"

    source = detect_source_from_url(url)
    if source is None:
        return None, "URL должен быть с propertyfinder.ae или bayut.com"
    if source == Source.BAYUT and not BAYUT_ENABLED:
        return None, (
            "Bayut временно не поддерживается — площадка перешла на защищённый "
            "рендеринг. Добавляйте ссылки PropertyFinder."
        )

    scraped = _scraper_for(source).fetch_listing(url)
    if scraped is None:
        return None, "Не удалось загрузить страницу — сайт мог заблокировать запрос, попробуйте позже"
    if not scraped.is_active:
        return None, "Страница объявления вернула 404 — ссылка битая или объявление снято"

    ext_id = scraped.external_id or url  # fallback if id wasn't found
    existing = (
        db.query(CompetitorListing)
        .filter(
            CompetitorListing.project_id == project.id,
            CompetitorListing.source == source,
            CompetitorListing.external_id == ext_id,
        )
        .first()
    )
    if existing:
        return None, _ERR_DUPLICATE

    # NOTE(review): naive UTC timestamp; datetime.utcnow() is deprecated since
    # Python 3.12 — switching to aware datetimes needs a check of the column types.
    now = datetime.utcnow()
    listing = CompetitorListing(
        project_id=project.id,
        source=source,
        external_id=ext_id,
        url=url,
        title=scraped.title,
        agent_name=scraped.agent_name,
        agency_name=scraped.agency_name,
        current_price=scraped.price,
        currency=scraped.currency or "AED",
        status=ListingStatus.ACTIVE,
        first_seen_at=now,
        last_seen_at=now,
    )
    db.add(listing)
    db.flush()  # assigns listing.id so the price snapshot can reference it
    if scraped.price is not None:
        db.add(PriceHistory(listing_id=listing.id, price=scraped.price, recorded_at=now))
    db.commit()
    return listing, ""


def add_competitor_urls(db: Session, project: Project, urls: list[str]) -> dict:
    """Add several pasted/selected URLs in one go (used by the suggest page's multi-select).

    Processes them sequentially — each one re-fetches the page — and reports a
    summary. Blank entries and exact repeats within *urls* are ignored and not
    counted anywhere. Listings already tracked by the project count as skipped.

    Returns {'added': int, 'skipped': int, 'errors': [..]}.
    """
    added = 0
    skipped = 0
    errors: list[str] = []
    seen: set[str] = set()
    for raw in urls:
        url = (raw or "").strip()
        if not url or url in seen:
            continue
        seen.add(url)
        _listing, err = add_competitor_url(db, project, url)
        if err == _ERR_DUPLICATE:
            skipped += 1
        elif err:
            errors.append(err)
        else:
            added += 1
    return {"added": added, "skipped": skipped, "errors": errors}


def _price_change_message(
    listing: CompetitorListing,
    old_price: float,
    new_price: float,
    new_currency: str | None,
) -> str:
    """Build the 📈/📉 notification text for a price change on *listing*.

    Must be called before listing.currency is updated: the old price is shown
    in the listing's current currency, the new price in *new_currency*.
    """
    delta = new_price - old_price
    pct = (delta / old_price * 100.0) if old_price else 0.0
    arrow = "📈" if delta > 0 else "📉"
    # Space-separate thousands in the delta ONLY. Applying the replacement to
    # the whole message would also strip commas from the title and the URL.
    delta_text = f"{delta:+,.0f}".replace(",", " ")
    return (
        f"{arrow} Цена изменилась — {_source_label(listing.source.value)}\n"
        f"{listing.title or 'без названия'}\n"
        f"Было: {_fmt_price(old_price, listing.currency)}\n"
        f"Стало: {_fmt_price(new_price, new_currency)} "
        f"({delta_text} / {pct:+.1f}%)\n"
        f"{listing.url}"
    )


def check_project(db: Session, project: Project) -> list[str]:
    """Re-scan all tracked competitor listings for one project.

    For each listing: a failed fetch (None) is skipped untouched; an inactive
    page marks an ACTIVE listing as REMOVED; an active page refreshes
    last_seen_at and metadata, re-activates a non-ACTIVE listing, and records
    a PriceHistory row when the price is first seen or has changed.
    project.last_checked_at is updated and everything is committed once at
    the end.

    Returns notification texts (removed / re-appeared / price changed).
    """
    changes: list[str] = []
    now = datetime.utcnow()

    # Iterate over a copy of the relationship collection.
    for listing in list(project.listings):
        scraped = _scraper_for(listing.source).fetch_listing(listing.url)
        if scraped is None:
            # Network/parse failure — skip without changing state, try again next cycle.
            continue

        if not scraped.is_active:
            if listing.status == ListingStatus.ACTIVE:
                listing.status = ListingStatus.REMOVED
                changes.append(
                    f"❌ Объявление удалено — {_source_label(listing.source.value)}\n"
                    f"{listing.title or 'без названия'}\n"
                    f"Последняя цена: {_fmt_price(listing.current_price, listing.currency)}\n"
                    f"{listing.url}"
                )
            continue

        listing.last_seen_at = now
        if listing.status != ListingStatus.ACTIVE:
            listing.status = ListingStatus.ACTIVE
            changes.append(
                f"♻️ Объявление снова активно — {_source_label(listing.source.value)}\n"
                f"{listing.title or 'без названия'}\n{listing.url}"
            )

        # Update metadata that may have changed (never overwrite with empty values).
        if scraped.title:
            listing.title = scraped.title
        if scraped.agent_name:
            listing.agent_name = scraped.agent_name
        if scraped.agency_name:
            listing.agency_name = scraped.agency_name

        old_price = listing.current_price
        new_price = scraped.price
        if new_price is None or new_price == old_price:
            continue

        new_currency = scraped.currency or listing.currency
        if old_price is not None:
            # A first-ever price (old_price is None) is recorded silently.
            changes.append(_price_change_message(listing, old_price, new_price, new_currency))
        listing.current_price = new_price
        listing.currency = new_currency
        db.add(PriceHistory(listing_id=listing.id, price=new_price, recorded_at=now))

    project.last_checked_at = now
    db.commit()
    return changes


def _notify_owner(project: Project, changes: list[str]) -> None:
    """Send *changes* to the project owner: one header message, then one per change.

    Does nothing when *changes* is empty. If the project has no owner or the
    owner has no tg_chat_id, logs a warning and alerts the admin instead.
    """
    if not changes:
        return
    owner: Employee | None = project.owner
    if not owner or not owner.tg_chat_id:
        logger.warning("Project %s has no owner chat_id; skipping notification", project.id)
        notify_admin(
            f"⚠️ Проект {project.title} (#{project.id}) — {len(changes)} изменений, "
            f"но у владельца не задан tg_chat_id."
        )
        return
    header = (
        f"🏠 {project.title}\n"
        f"Тип: {project.deal_type.value} · Изменений: {len(changes)}\n"
        f"——————————"
    )
    send_message(owner.tg_chat_id, header)
    for change in changes:
        send_message(owner.tg_chat_id, change)


def _reference_url(project: Project, source: Source) -> str | None:
    """A known listing URL in the project's building for the given source.

    Portals (PF) scope a building search by an internal numeric location id, not
    by free text — so we hand the scraper a real listing in the same building
    (our own `our_url`, else an already-tracked competitor) to resolve that id.

    Returns the first candidate whose URL is detected as *source*, or None.
    """
    candidates: list[str] = []
    if project.our_url:
        candidates.append(project.our_url)
    candidates.extend(tracked.url for tracked in project.listings if tracked.source == source)
    for url in candidates:
        if detect_source_from_url(url) == source:
            return url
    return None


def resolve_our_permit(project: Project) -> str | None:
    """Our project's DLD permit number.

    Prefer the value the user typed; else read it off our own listing (PF
    exposes the number in __NEXT_DATA__). Returns None when there is no
    PropertyFinder reference URL to read it from.
    """
    if project.dld_permit and project.dld_permit.strip():
        return project.dld_permit.strip()
    ref = _reference_url(project, Source.PROPERTYFINDER)
    return PF.get_permit(ref) if ref else None


def suggest_similar(project: Project, our_permit: str | None = None) -> dict[str, list[ScrapedListing]]:
    """Search PF + Bayut for listings in this project's building.

    Candidates that share our DLD permit are the same physical listing under a
    different broker (rare, but it happens) — those are surfaced first. The rest
    are same-building heuristics.

    A failing portal search is logged and yields an empty list for that portal.
    Bayut is only queried when BAYUT_ENABLED is set. The permit-first ordering
    and the _SUGGEST_OTHERS_LIMIT cap apply only when *our_permit* is given and
    permit enrichment succeeds; otherwise the PF list is returned as searched.

    Returns {'propertyfinder': [...], 'bayut': [...]}.
    """
    out: dict[str, list[ScrapedListing]] = {"propertyfinder": [], "bayut": []}
    bedrooms = project.bedrooms
    deal_type = project.deal_type.value

    try:
        out["propertyfinder"] = PF.search_similar(
            project.building,
            bedrooms,
            deal_type,
            location_url=_reference_url(project, Source.PROPERTYFINDER),
        )
    except Exception as e:
        logger.exception("PF suggest failed: %s", e)

    if BAYUT_ENABLED:
        try:
            out["bayut"] = BAYUT.search_similar(
                project.building,
                bedrooms,
                deal_type,
                location_url=_reference_url(project, Source.BAYUT),
            )
        except Exception as e:
            logger.exception("Bayut suggest failed: %s", e)

    # Hide candidates already tracked for this project — and our own listing.
    excluded = {(tracked.source.value, tracked.external_id) for tracked in project.listings}
    if project.our_url:
        own_src = detect_source_from_url(project.our_url)
        # Our own external id is taken from the "<digits>.html" part of the URL.
        m = re.search(r"(\d+)\.html", project.our_url)
        if own_src and m:
            excluded.add((own_src.value, m.group(1)))
    for src in out:
        out[src] = [c for c in out[src] if (src, c.external_id) not in excluded]

    # Permit-first: PF can't be queried by permit and search results don't carry
    # it — so read each PF candidate's permit (concurrently) and put the ones
    # matching ours first. Keep all matches; cap the same-building remainder.
    if our_permit and out["propertyfinder"]:
        pf = out["propertyfinder"]
        try:
            with ThreadPoolExecutor(max_workers=12) as ex:
                permits = list(ex.map(PF.get_permit, [c.url for c in pf]))
            for cand, permit in zip(pf, permits):
                cand.permit_number = permit
            matches = [c for c in pf if c.permit_number == our_permit]
            others = [c for c in pf if c.permit_number != our_permit]
            out["propertyfinder"] = matches + others[:_SUGGEST_OTHERS_LIMIT]
        except Exception as e:
            # Best effort: on failure the unranked, uncapped PF list is kept.
            logger.exception("PF permit enrichment failed: %s", e)

    return out


def run_check_for_project(project_id: int) -> int:
    """Check one project in its own DB session and notify its owner.

    Returns the number of detected changes, or 0 if the project does not exist.
    The session is always closed.
    """
    db = SessionLocal()
    try:
        project = db.get(Project, project_id)
        if not project:
            return 0
        changes = check_project(db, project)
        _notify_owner(project, changes)
        return len(changes)
    finally:
        db.close()


def run_check_all() -> dict[int, int]:
    """Run the check for every project, isolating failures per project.

    Returns {project_id: number_of_changes}; a project whose check raised is
    logged and reported as -1.
    """
    db = SessionLocal()
    try:
        ids = [p.id for p in db.query(Project).all()]
    finally:
        db.close()

    summary: dict[int, int] = {}
    for pid in ids:
        try:
            summary[pid] = run_check_for_project(pid)
        except Exception as e:
            logger.exception("Check for project %s failed: %s", pid, e)
            summary[pid] = -1
    return summary