From 6966e6810c1f2299c0f9ae8218b458542fc35efe Mon Sep 17 00:00:00 2001 From: Grendgi Date: Fri, 5 Jun 2026 12:09:51 +0300 Subject: [PATCH] Auto-sync permit competitors in monitoring PF --- app/db.py | 13 ++++ app/models.py | 4 +- app/services/monitor.py | 163 +++++++++++++++++++++++++++++++++++++++- app/worker.py | 35 +++++++-- internal/pf/db.go | 68 +++++++++++++---- internal/pf/http.go | 34 ++++++++- internal/pf/store.go | 60 ++++++++++----- 7 files changed, 333 insertions(+), 44 deletions(-) diff --git a/app/db.py b/app/db.py index 9fa756e..4211645 100644 --- a/app/db.py +++ b/app/db.py @@ -30,6 +30,7 @@ def init_db(): Base.metadata.create_all(bind=engine) _migrate_employees_portal_user_id() + _migrate_competitor_listings_auto_fields() def _migrate_employees_portal_user_id() -> None: @@ -43,3 +44,15 @@ def _migrate_employees_portal_user_id() -> None: conn.execute( text("CREATE UNIQUE INDEX IF NOT EXISTS ix_employees_portal_user_id ON employees (portal_user_id)") ) + + +def _migrate_competitor_listings_auto_fields() -> None: + inspector = inspect(engine) + if "competitor_listings" not in inspector.get_table_names(): + return + columns = {col["name"] for col in inspector.get_columns("competitor_listings")} + with engine.begin() as conn: + if "permit_number" not in columns: + conn.execute(text("ALTER TABLE competitor_listings ADD COLUMN permit_number VARCHAR(100)")) + if "auto_discovered" not in columns: + conn.execute(text("ALTER TABLE competitor_listings ADD COLUMN auto_discovered BOOLEAN NOT NULL DEFAULT 0")) diff --git a/app/models.py b/app/models.py index 1371a61..0a55d45 100644 --- a/app/models.py +++ b/app/models.py @@ -1,7 +1,7 @@ from datetime import datetime from enum import Enum -from sqlalchemy import DateTime, Enum as SAEnum, Float, ForeignKey, Integer, String, Text, UniqueConstraint +from sqlalchemy import Boolean, DateTime, Enum as SAEnum, Float, ForeignKey, Integer, String, Text, UniqueConstraint from sqlalchemy.orm import Mapped, mapped_column, relationship from app.db import Base @@ -80,6 +80,8 @@ class CompetitorListing(Base): title: Mapped[str | None] = mapped_column(String(500), nullable=True) agent_name: Mapped[str | None] = mapped_column(String(300), nullable=True) agency_name: Mapped[str | None] = mapped_column(String(300), nullable=True) + permit_number: Mapped[str | None] = mapped_column(String(100), nullable=True) + auto_discovered: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) current_price: Mapped[float | None] = mapped_column(Float, nullable=True) currency: Mapped[str | None] = mapped_column(String(10), nullable=True, default="AED") diff --git a/app/services/monitor.py b/app/services/monitor.py index e197951..013561a 100644 --- a/app/services/monitor.py +++ b/app/services/monitor.py @@ -70,6 +70,39 @@ def _source_label(source: str) -> str: return {"propertyfinder": "PropertyFinder", "bayut": "Bayut"}.get(source, source) +def _normalize_permit(value: str | None) -> str: + return (value or "").strip().lower() + + +def _listing_key(source: Source | str, external_id: str) -> tuple[str, str]: + source_value = source.value if isinstance(source, Source) else str(source) + return source_value, str(external_id or "") + + +def _format_listing_added(project: Project, listing: CompetitorListing, *, auto: bool) -> str: + title = listing.title or "без названия" + prefix = "✅ Автоматически добавлен конкурент" if auto else "✅ Добавлен конкурент" + return ( + f"{prefix} — {_source_label(listing.source.value)}\n" + f"{title}\n" + f"Цена: {_fmt_price(listing.current_price, listing.currency)}\n" + f"Permit: {listing.permit_number or project.dld_permit or '—'}\n" + f"{listing.url}" + ) + + +def _format_listing_removed(project: Project, listing: CompetitorListing, *, auto: bool) -> str: + title = listing.title or "без названия" + prefix = "🗑️ Автоматически удален конкурент" if auto else "🗑️ Удален конкурент" + return ( + f"{prefix} — {_source_label(listing.source.value)}\n" + f"{title}\n" + f"Последняя цена: {_fmt_price(listing.current_price, listing.currency)}\n" + f"Permit: {listing.permit_number or project.dld_permit or '—'}\n" + f"{listing.url}" + ) + + def add_competitor_url(db: Session, project: Project, url: str) -> tuple[CompetitorListing | None, str]: """User-facing entrypoint: paste a URL → create CompetitorListing for the project. Returns (listing, error_message). error_message is empty on success. @@ -115,6 +148,8 @@ def add_competitor_url(db: Session, project: Project, url: str) -> tuple[Competi title=scraped.title, agent_name=scraped.agent_name, agency_name=scraped.agency_name, + permit_number=scraped.permit_number, + auto_discovered=False, current_price=scraped.price, currency=scraped.currency or "AED", status=ListingStatus.ACTIVE, @@ -153,12 +188,120 @@ def add_competitor_urls(db: Session, project: Project, urls: list[str]) -> dict: return {"added": added, "skipped": skipped, "errors": errors} +def _create_listing_from_candidate( + db: Session, + project: Project, + candidate: ScrapedListing, + *, + permit_number: str, +) -> CompetitorListing: + now = datetime.utcnow() + listing = CompetitorListing( + project_id=project.id, + source=Source(candidate.source), + external_id=candidate.external_id or candidate.url, + url=candidate.url, + title=candidate.title, + agent_name=candidate.agent_name, + agency_name=candidate.agency_name, + permit_number=permit_number, + auto_discovered=True, + current_price=candidate.price, + currency=candidate.currency or "AED", + status=ListingStatus.ACTIVE, + first_seen_at=now, + last_seen_at=now, + ) + db.add(listing) + db.flush() + if candidate.price is not None: + db.add(PriceHistory(listing_id=listing.id, price=candidate.price, recorded_at=now)) + return listing + + +def _hide_tracked_suggestions( + db: Session, + project: Project, + suggestions: dict[str, list[ScrapedListing]], +) -> dict[str, list[ScrapedListing]]: + tracked = { + _listing_key(l.source, l.external_id) + for l in db.query(CompetitorListing).filter(CompetitorListing.project_id == project.id).all() + } + return { + source: [item for item in items if _listing_key(item.source, item.external_id) not in tracked] + for source, items in suggestions.items() + } + + +def sync_permit_competitors( + db: Session, + project: Project, +) -> tuple[list[str], dict[str, list[ScrapedListing]], str | None]: + """Auto-maintain competitor listings with the same DLD permit. + + Exact-permit matches are added automatically. Previously auto-discovered + exact-permit listings that disappear from the next permit search are + deleted. Manual competitors are never auto-deleted. + """ + changes: list[str] = [] + our_permit = resolve_our_permit(project) + if not our_permit: + return changes, {"propertyfinder": [], "bayut": []}, None + + normalized_permit = _normalize_permit(our_permit) + suggestions = suggest_similar(project, our_permit=our_permit, include_tracked=True) + matches = [ + item + for item in suggestions["propertyfinder"] + if _normalize_permit(item.permit_number) == normalized_permit + ] + + matched_keys = {_listing_key(item.source, item.external_id) for item in matches} + existing = { + _listing_key(item.source, item.external_id): item + for item in db.query(CompetitorListing).filter(CompetitorListing.project_id == project.id).all() + } + + for item in matches: + key = _listing_key(item.source, item.external_id) + listing = existing.get(key) + if listing: + listing.permit_number = item.permit_number or our_permit + if item.title: + listing.title = item.title + if item.agent_name: + listing.agent_name = item.agent_name + if item.agency_name: + listing.agency_name = item.agency_name + continue + + listing = _create_listing_from_candidate(db, project, item, permit_number=item.permit_number or our_permit) + existing[key] = listing + changes.append(_format_listing_added(project, listing, auto=True)) + + for listing in list(existing.values()): + if not listing.auto_discovered: + continue + if _normalize_permit(listing.permit_number) != normalized_permit: + continue + if _listing_key(listing.source, listing.external_id) in matched_keys: + continue + changes.append(_format_listing_removed(project, listing, auto=True)) + db.delete(listing) + + return changes, _hide_tracked_suggestions(db, project, suggestions), our_permit + + def check_project(db: Session, project: Project) -> list[str]: """Re-scan all tracked competitor listings for one project. Returns notification texts.""" changes: list[str] = [] now = datetime.utcnow() + sync_changes, _, _ = sync_permit_competitors(db, project) + changes.extend(sync_changes) - for listing in list(project.listings): + listings = db.query(CompetitorListing).filter(CompetitorListing.project_id == project.id).all() + for listing in listings: scraper = _scraper_for(listing.source) scraped = scraper.fetch_listing(listing.url) if scraped is None: @@ -242,6 +385,10 @@ def _notify_owner(project: Project, changes: list[str]) -> None: send_message(owner.tg_chat_id, c) +def notify_project_changes(project: Project, changes: list[str]) -> None: + _notify_owner(project, changes) + + def _reference_url(project: Project, source: Source) -> str | None: """A known listing URL in the project's building for the given source. @@ -268,7 +415,12 @@ def resolve_our_permit(project: Project) -> str | None: return PF.get_permit(ref) if ref else None -def suggest_similar(project: Project, our_permit: str | None = None) -> dict[str, list[ScrapedListing]]: +def suggest_similar( + project: Project, + our_permit: str | None = None, + *, + include_tracked: bool = False, +) -> dict[str, list[ScrapedListing]]: """Search PF + Bayut for listings in this project's building. Candidates that share our DLD permit are the same physical listing under a @@ -295,8 +447,11 @@ def suggest_similar(project: Project, our_permit: str | None = None) -> dict[str except Exception as e: logger.exception("Bayut suggest failed: %s", e) - # Hide candidates already tracked for this project — and our own listing. - excluded = {(l.source.value, l.external_id) for l in project.listings} + # Hide our own listing. Tracked competitors are hidden for UI suggestions, + # but included for automatic permit synchronization. + excluded: set[tuple[str, str]] = set() + if not include_tracked: + excluded.update((l.source.value, l.external_id) for l in project.listings) if project.our_url: own_src = detect_source_from_url(project.our_url) m = re.search(r"(\d+)\.html", project.our_url) diff --git a/app/worker.py b/app/worker.py index ebbeaec..6146cb7 100644 --- a/app/worker.py +++ b/app/worker.py @@ -15,11 +15,11 @@ from app.models import Project from app.services.monitor import ( BAYUT_ENABLED, add_competitor_url, - add_competitor_urls, - resolve_our_permit, + _format_listing_added, + notify_project_changes, run_check_all, run_check_for_project, - suggest_similar, + sync_permit_competitors, ) @@ -66,6 +66,7 @@ def cmd_add_listing(payload: dict[str, Any]) -> None: listing, err = add_competitor_url(db, project, url) if err: _fail(err) + notify_project_changes(project, [_format_listing_added(project, listing, auto=False)]) _write({"listing_id": listing.id}) finally: db.close() @@ -79,7 +80,27 @@ def cmd_add_listings(payload: dict[str, Any]) -> None: project = db.get(Project, project_id) if not project: _fail("project not found") - _write(add_competitor_urls(db, project, urls)) + added = 0 + skipped = 0 + errors: list[str] = [] + notifications: list[str] = [] + seen: set[str] = set() + for raw in urls: + url = str(raw or "").strip() + if not url or url in seen: + continue + seen.add(url) + listing, err = add_competitor_url(db, project, url) + if err == "Это объявление уже добавлено в проект": + skipped += 1 + elif err: + errors.append(err) + else: + added += 1 + notifications.append(_format_listing_added(project, listing, auto=False)) + if notifications: + notify_project_changes(project, notifications) + _write({"added": added, "skipped": skipped, "errors": errors}) finally: db.close() @@ -101,8 +122,10 @@ def cmd_suggest(payload: dict[str, Any]) -> None: project = db.get(Project, project_id) if not project: _fail("project not found") - permit = resolve_our_permit(project) - suggestions = suggest_similar(project, our_permit=permit) + changes, suggestions, permit = sync_permit_competitors(db, project) + db.commit() + if changes: + notify_project_changes(project, changes) _write({ "our_permit": permit, "bayut_enabled": BAYUT_ENABLED, diff --git a/internal/pf/db.go b/internal/pf/db.go index 42276aa..4dc1b9e 100644 --- a/internal/pf/db.go +++ b/internal/pf/db.go @@ -52,20 +52,22 @@ type Project struct { } type Listing struct { - ID int64 `json:"id"` - ProjectID int64 `json:"project_id"` - Source string `json:"source"` - ExternalID string `json:"external_id"` - URL string `json:"url"` - Title *string `json:"title"` - AgentName *string `json:"agent_name"` - AgencyName *string `json:"agency_name"` - CurrentPrice *float64 `json:"current_price"` - Currency *string `json:"currency"` - Status string `json:"status"` - FirstSeenAt *string `json:"first_seen_at"` - LastSeenAt *string `json:"last_seen_at"` - PriceHistory []PricePoint `json:"price_history,omitempty"` + ID int64 `json:"id"` + ProjectID int64 `json:"project_id"` + Source string `json:"source"` + ExternalID string `json:"external_id"` + URL string `json:"url"` + Title *string `json:"title"` + AgentName *string `json:"agent_name"` + AgencyName *string `json:"agency_name"` + PermitNumber *string `json:"permit_number"` + AutoDiscovered bool `json:"auto_discovered"` + CurrentPrice *float64 `json:"current_price"` + Currency *string `json:"currency"` + Status string `json:"status"` + FirstSeenAt *string `json:"first_seen_at"` + LastSeenAt *string `json:"last_seen_at"` + PriceHistory []PricePoint `json:"price_history,omitempty"` } type PricePoint struct { @@ -157,6 +159,8 @@ func (a *App) InitDB(ctx context.Context) error { title VARCHAR(500), agent_name VARCHAR(300), agency_name VARCHAR(300), + permit_number VARCHAR(100), + auto_discovered BOOLEAN NOT NULL DEFAULT 0, current_price FLOAT, currency VARCHAR(10), status VARCHAR(7) NOT NULL, @@ -178,7 +182,10 @@ func (a *App) InitDB(ctx context.Context) error { return err } } - return a.migrateEmployees(ctx) + if err := a.migrateEmployees(ctx); err != nil { + return err + } + return a.migrateCompetitorListings(ctx) } func (a *App) migrateEmployees(ctx context.Context) error { @@ -208,6 +215,37 @@ func (a *App) migrateEmployees(ctx context.Context) error { return err } +func (a *App) migrateCompetitorListings(ctx context.Context) error { + rows, err := a.DB.QueryContext(ctx, `PRAGMA table_info(competitor_listings)`) + if err != nil { + return err + } + defer rows.Close() + columns := map[string]bool{} + for rows.Next() { + var cid int + var name, typ string + var notNull int + var defaultValue any + var pk int + if err := rows.Scan(&cid, &name, &typ, ¬Null, &defaultValue, &pk); err != nil { + return err + } + columns[name] = true + } + if !columns["permit_number"] { + if _, err := a.DB.ExecContext(ctx, `ALTER TABLE competitor_listings ADD COLUMN permit_number VARCHAR(100)`); err != nil { + return err + } + } + if !columns["auto_discovered"] { + if _, err := a.DB.ExecContext(ctx, `ALTER TABLE competitor_listings ADD COLUMN auto_discovered BOOLEAN NOT NULL DEFAULT 0`); err != nil { + return err + } + } + return nil +} + func cleanPtr(value *string) *string { if value == nil { return nil diff --git a/internal/pf/http.go b/internal/pf/http.go index 9a16131..c02c370 100644 --- a/internal/pf/http.go +++ b/internal/pf/http.go @@ -352,13 +352,45 @@ func (s Server) listingItem(w http.ResponseWriter, r *http.Request, path string) writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } - if err := s.App.DeleteListing(r.Context(), emp.ID, id); err != nil { + deleted, err := s.App.DeleteListing(r.Context(), emp.ID, id) + if err != nil { writeError(w, http.StatusNotFound, "listing not found") return } + if deleted.OwnerChatID != nil && *deleted.OwnerChatID != "" { + _ = s.App.TG.SendMessage(r.Context(), *deleted.OwnerChatID, formatDeletedListingMessage(deleted)) + } w.WriteHeader(http.StatusNoContent) } +func formatDeletedListingMessage(deleted *DeletedListing) string { + listing := deleted.Listing + title := "без названия" + if listing.Title != nil && *listing.Title != "" { + title = *listing.Title + } + price := "—" + if listing.CurrentPrice != nil { + currency := "AED" + if listing.Currency != nil && *listing.Currency != "" { + currency = *listing.Currency + } + price = strconv.FormatFloat(*listing.CurrentPrice, 'f', 0, 64) + " " + currency + } + permit := "—" + if listing.PermitNumber != nil && *listing.PermitNumber != "" { + permit = *listing.PermitNumber + } + return "🏠 " + deleted.ProjectTitle + "\n" + + "Тип: " + deleted.ProjectDeal + " · Изменений: 1\n" + + "——————————\n" + + "🗑️ Удален конкурент — " + listing.Source + "\n" + + title + "\n" + + "Последняя цена: " + price + "\n" + + "Permit: " + permit + "\n" + + listing.URL +} + func (s Server) requireEmployee(w http.ResponseWriter, r *http.Request) (*Employee, bool) { emp, err := s.App.CurrentEmployee(r.Context(), portalUserID(r), true) if errors.Is(err, ErrTelegramRequired) { diff --git a/internal/pf/store.go b/internal/pf/store.go index 2abc3bf..7117952 100644 --- a/internal/pf/store.go +++ b/internal/pf/store.go @@ -386,30 +386,53 @@ func (a *App) DeleteProject(ctx context.Context, ownerID, projectID int64) error return tx.Commit() } -func (a *App) DeleteListing(ctx context.Context, ownerID, listingID int64) error { - var id int64 - err := a.DB.QueryRowContext(ctx, ` - SELECT l.id - FROM competitor_listings l JOIN projects p ON p.id = l.project_id - WHERE l.id = ? AND p.owner_id = ?`, listingID, ownerID).Scan(&id) +type DeletedListing struct { + Listing *Listing + ProjectTitle string + ProjectDeal string + OwnerChatID *string +} + +func (a *App) DeleteListing(ctx context.Context, ownerID, listingID int64) (*DeletedListing, error) { + row := a.DB.QueryRowContext(ctx, listingSelect()+` + JOIN projects p ON p.id = l.project_id + WHERE l.id = ? AND p.owner_id = ?`, listingID, ownerID) + listing, err := scanListing(row, false) if errors.Is(err, sql.ErrNoRows) { - return ErrNotFound + return nil, ErrNotFound } if err != nil { - return err + return nil, err } + deleted := &DeletedListing{Listing: listing} + var deal string + var chat sql.NullString + if err := a.DB.QueryRowContext(ctx, ` + SELECT p.title, p.deal_type, e.tg_chat_id + FROM projects p + JOIN employees e ON e.id = p.owner_id + WHERE p.id = ? AND p.owner_id = ?`, listing.ProjectID, ownerID). + Scan(&deleted.ProjectTitle, &deal, &chat); err != nil { + return nil, err + } + deleted.ProjectDeal = enumDealOut(deal) + deleted.OwnerChatID = nullableString(chat) + tx, err := a.DB.BeginTx(ctx, nil) if err != nil { - return err + return nil, err } defer tx.Rollback() - if _, err := tx.ExecContext(ctx, `DELETE FROM price_history WHERE listing_id = ?`, id); err != nil { - return err + if _, err := tx.ExecContext(ctx, `DELETE FROM price_history WHERE listing_id = ?`, listing.ID); err != nil { + return nil, err } - if _, err := tx.ExecContext(ctx, `DELETE FROM competitor_listings WHERE id = ?`, id); err != nil { - return err + if _, err := tx.ExecContext(ctx, `DELETE FROM competitor_listings WHERE id = ?`, listing.ID); err != nil { + return nil, err } - return tx.Commit() + if err := tx.Commit(); err != nil { + return nil, err + } + return deleted, nil } func (a *App) ListingByID(ctx context.Context, id int64, withHistory bool) (*Listing, error) { @@ -547,18 +570,19 @@ func (a *App) PriceHistory(ctx context.Context, listingID int64) ([]PricePoint, func listingSelect() string { return ` SELECT l.id, l.project_id, l.source, l.external_id, l.url, l.title, l.agent_name, - l.agency_name, l.current_price, l.currency, l.status, l.first_seen_at, l.last_seen_at + l.agency_name, l.permit_number, l.auto_discovered, l.current_price, l.currency, l.status, l.first_seen_at, l.last_seen_at FROM competitor_listings l` } func scanListing(row rowScanner, _ bool) (*Listing, error) { var l Listing var source, status string - var title, agent, agency, currency, firstSeen, lastSeen sql.NullString + var title, agent, agency, permit, currency, firstSeen, lastSeen sql.NullString var price sql.NullFloat64 + var autoDiscovered bool if err := row.Scan( &l.ID, &l.ProjectID, &source, &l.ExternalID, &l.URL, &title, &agent, &agency, - &price, ¤cy, &status, &firstSeen, &lastSeen, + &permit, &autoDiscovered, &price, ¤cy, &status, &firstSeen, &lastSeen, ); err != nil { return nil, err } @@ -566,6 +590,8 @@ func scanListing(row rowScanner, _ bool) (*Listing, error) { l.Title = nullableString(title) l.AgentName = nullableString(agent) l.AgencyName = nullableString(agency) + l.PermitNumber = nullableString(permit) + l.AutoDiscovered = autoDiscovered l.CurrentPrice = nullableFloat(price) l.Currency = nullableString(currency) l.Status = enumStatusOut(status)