"""Heuristic extractors for Telegram message text. Russian-first, regex/keyword based, no ML deps. Goal is to surface signals for the UI: phone numbers, person names (FIO), and real-estate intent (sale/rent/ purchase). False positives are tolerable — operator triages in the UI. Output shape (used as JSONB in messages.extracted): { "phones": ["+79123456789", ...], "names": ["Иван Петров", ...], "real_estate": { "kind": "sale" | "rent" | "purchase" | null, "property_type": str | null, # квартира, дом, ... "rooms": str | null, # "2-к" "area_m2": float | null, "price": str | null, # raw matched string } | null } """ from __future__ import annotations import re from typing import Any # --- Telegram @handles --------------------------------------------------- # Plain @username — Telegram allows 5–32 chars, letters/digits/_, no leading digit. _TG_HANDLE_RE = re.compile(r"(? list[str]: if not text: return [] out: list[str] = [] seen: set[str] = set() for h in _TG_HANDLE_RE.findall(text): key = h.lower() if key in seen: continue seen.add(key) out.append("@" + h) for h in _TG_LINK_RE.findall(text): key = h.lower() if key in seen: continue seen.add(key) out.append("@" + h) return out # --- Phones -------------------------------------------------------------- # Russian-format: starts with +7, 7, or 8 (no plus), 11 digits total. _PHONE_RU_RE = re.compile( r"(?` then 7–14 more digits # with optional separators. Catches +971 (UAE), +1 (US), +44 (UK), etc. _PHONE_INTL_RE = re.compile( r"(? list[str]: if not text: return [] out: list[str] = [] seen: set[str] = set() # Pass 1: Russian-style. Normalize to +7XXXXXXXXXX. for raw in _PHONE_RU_RE.findall(text): digits = re.sub(r"\D", "", raw) if len(digits) == 11 and digits[0] in "78": normalized = "+7" + digits[1:] elif len(digits) == 10: normalized = "+7" + digits else: continue if normalized not in seen: seen.add(normalized) out.append(normalized) # Pass 2: international "+...". 
Keep raw plus-prefix; just # collapse separators so the result is +. for raw in _PHONE_INTL_RE.findall(text): digits = re.sub(r"\D", "", raw) if not (8 <= len(digits) <= 15): continue normalized = "+" + digits # If it normalized to something we already captured (e.g. +7 number # picked up by both passes), skip. if normalized in seen: continue seen.add(normalized) out.append(normalized) return out # --- Names (ФИО) --------------------------------------------------------- # Two or three capitalized Cyrillic tokens in a row. Allows hyphens (Иванов-Петров). _NAME_RE = re.compile( r"\b([А-ЯЁ][а-яё]+(?:\-[А-ЯЁ][а-яё]+)?(?:\s+[А-ЯЁ][а-яё]+(?:\-[А-ЯЁ][а-яё]+)?){1,2})\b" ) # Common false positives — geo/places/orgs/etc. Skip exact matches. _NAME_BLOCKLIST = { "Российская Федерация", "Санкт Петербург", "Санкт-Петербург", "Нижний Новгород", "Великий Новгород", "Ростов На Дону", "Ростов-На-Дону", "Москва Сити", "Красная Площадь", "Чёрное Море", "Чёрного Моря", "Без Депозита", "Без Залога", "Без Комиссии", "Сдам Квартиру", "Продам Квартиру", "Куплю Квартиру", "Сдам Студию", "Продам Студию", } # Words that look like names but rarely are (months, weekdays, common nouns). 
_NAME_TOKEN_BLOCK = {
    "Январь", "Февраль", "Март", "Апрель", "Май", "Июнь",
    "Июль", "Август", "Сентябрь", "Октябрь", "Ноябрь", "Декабрь",
    "Понедельник", "Вторник", "Среда", "Четверг", "Пятница", "Суббота", "Воскресенье",
    "Москва", "Питер", "Россия", "Кремль", "Метро",
}


def extract_names(text: str | None) -> list[str]:
    """Return candidate person names found in *text*, de-duplicated, in order.

    A candidate is whatever ``_NAME_RE`` matches. It is dropped when the whole
    string is in ``_NAME_BLOCKLIST``, when any of its space/hyphen-separated
    tokens is in ``_NAME_TOKEN_BLOCK``, or when every token is shorter than
    four characters. Empty or ``None`` input yields an empty list.
    """
    if not text:
        return []
    out: list[str] = []
    seen: set[str] = set()
    for match in _NAME_RE.findall(text):
        candidate = match.strip()
        if candidate in _NAME_BLOCKLIST:
            continue
        tokens = re.split(r"[\s\-]+", candidate)
        if any(t in _NAME_TOKEN_BLOCK for t in tokens):
            continue
        # Heuristic: at least one token must have len >= 4 (rules out "Ул.")
        if not any(len(t) >= 4 for t in tokens):
            continue
        if candidate not in seen:
            seen.add(candidate)
            out.append(candidate)
    return out


# --- Real estate ---------------------------------------------------------

# Deal kind -> lowercase keywords. Kinds are probed in insertion order and the
# first keyword hit wins, so "rent" takes precedence over "sale" and "purchase".
_DEAL_KEYWORDS: dict[str, tuple[str, ...]] = {
    "rent": (
        # ru
        "сдаётся", "сдается", "сдаю", "сдадим", "сдам", "сдаём",
        "аренда", "арендую", "арендуем", "снять", "посуточно", "помесячно",
        # en
        "for rent", "to let", "rental", "renting", "lease", "leasing",
        "per year", "per month", "/year", "/month", "/mo",
    ),
    "sale": (
        # ru
        "продаётся", "продается", "продаю", "продадим", "продам", "продаём",
        "продажа", "к продаже",
        # en
        "for sale", "#forsale", "selling", "selling price", "sale price",
    ),
    "purchase": (
        # ru
        "куплю", "купим", "покупаю", "покупка", "ищу квартиру", "ищу дом",
        "ищем квартиру", "рассматриваю покупку",
        # en
        "looking for", "want to buy", "wanted", "requirement", "wtb",
    ),
}

# (lowercase stem, UI label) pairs, probed in order; the first stem found wins.
_PROPERTY_TYPES: tuple[tuple[str, str], ...] = (
    # ru
    ("квартир", "квартира"),
    ("студи", "студия"),
    ("апартамент", "апартаменты"),
    ("комнат", "комната"),
    ("таунхаус", "таунхаус"),
    ("коттедж", "коттедж"),
    ("дача", "дача"),
    ("дом", "дом"),
    ("офис", "офис"),
    ("склад", "склад"),
    ("помещен", "помещение"),
    ("земельн", "земельный участок"),
    ("участок", "участок"),
    ("гараж", "гараж"),
    ("машиномест", "машиноместо"),
    # en — kept as Russian labels for UI consistency
    ("villa", "дом"),
    ("townhouse", "таунхаус"),
    ("penthouse", "апартаменты"),
    ("apartment", "квартира"),
    ("studio", "студия"),
    ("plot", "участок"),
    (" land ", "участок"),
    ("office", "офис"),
    ("warehouse", "склад"),
    ("retail", "помещение"),
    ("garage", "гараж"),
)

# Numeric literal shared by the area patterns: digits with optional whitespace
# or comma separators, plus a decimal point only when a digit follows it, so a
# sentence-ending "5. 60 м²" is not glued together into "5.60".
_AREA_NUMBER = r"(\d(?:[\d\s,]|\.(?=\d))*\d|\d)"

_AREA_M2_RE = re.compile(
    _AREA_NUMBER + r"\s*(?:м[²2]|кв\.?\s*м|кв\.\s*метр)",
    re.IGNORECASE,
)
_AREA_SQFT_RE = re.compile(
    _AREA_NUMBER + r"\s*(?:sqft|sq\.?\s*ft|sq\s+ft|square\s+feet)",
    re.IGNORECASE,
)

# "1,200" / "1,200,000": commas used purely as thousands separators.
_THOUSANDS_GROUPED_RE = re.compile(r"\d{1,3}(?:,\d{3})+")


def _parse_number(s: str) -> float | None:
    """Convert a matched numeric string to ``float``; ``None`` if unparsable.

    All whitespace is dropped (the patterns accept any ``\\s``, including the
    non-breaking space often used as a thousands separator). A lone comma that
    is not part of 3-digit grouping is read as a decimal comma ("45,5" ->
    45.5); every other comma is treated as a thousands separator and removed.
    """
    cleaned = "".join(s.split())
    is_decimal_comma = (
        "." not in cleaned
        and cleaned.count(",") == 1
        and _THOUSANDS_GROUPED_RE.fullmatch(cleaned) is None
    )
    if is_decimal_comma:
        cleaned = cleaned.replace(",", ".")
    else:
        cleaned = cleaned.replace(",", "")
    try:
        return float(cleaned)
    except ValueError:
        return None


_ROOMS_RE = re.compile(
    r"\b(\d)[\-\s]*(?:к\b|комн|комнатн|-комнат|br\b|bed\b|bedroom|-bed)",
    re.IGNORECASE,
)
# Studio is a special-case "0 rooms" indicator; not extracted as rooms count.

_PRICE_RE = re.compile(
    r"(\d[\d\s.,]*\d|\d)\s*(млн|млрд|тыс|тысяч|миллионов?|миллиардов?|руб(?:лей)?|₽|р/мес|/мес|р\b)",
    re.IGNORECASE,
)


def _contains_keyword(low: str, keyword: str) -> bool:
    """Tell whether *keyword* occurs in the lowercased text *low*.

    Keywords starting with an ASCII letter must not be immediately preceded by
    a letter, so "lease" does not fire inside "please". Everything else
    (Cyrillic keywords, or ones starting with "/", "#" or a space) is a plain
    substring test, which keeps prefixed Russian verbs such as "пересдаю"
    matching "сдаю".
    """
    head = keyword[:1]
    if not (head.isascii() and head.isalpha()):
        return keyword in low
    pos = low.find(keyword)
    while pos != -1:
        if pos == 0 or not low[pos - 1].isalpha():
            return True
        pos = low.find(keyword, pos + 1)
    return False


def _detect_kind(low: str) -> str | None:
    """Return the first deal kind with a keyword present in *low*, else ``None``."""
    for kind, words in _DEAL_KEYWORDS.items():
        if any(_contains_keyword(low, w) for w in words):
            return kind
    return None


def _detect_property_type(low: str) -> str | None:
    """Return the label of the first property stem present in *low*, else ``None``."""
    for stem, label in _PROPERTY_TYPES:
        if _contains_keyword(low, stem):
            return label
    return None


def extract_real_estate(text: str | None) -> dict[str, Any] | None:
    """Extract real-estate signals from *text*.

    Returns ``None`` for empty input or when neither a deal kind nor a
    property type is detected. Otherwise returns a dict with the keys
    ``kind``, ``property_type``, ``rooms``, ``area_m2`` and ``price``; each
    value is ``None`` when the corresponding signal was not found.
    """
    if not text:
        return None
    low = text.lower()

    kind = _detect_kind(low)
    prop = _detect_property_type(low)
    if kind is None and prop is None:
        return None

    rooms_m = _ROOMS_RE.search(low)
    rooms = f"{rooms_m.group(1)}-к" if rooms_m else None
    if rooms is None and ("студи" in low or "studio" in low):
        rooms = "студия"

    # Square metres take priority; square feet are only a fallback and are
    # converted to m² with one decimal place.
    area: float | None = None
    area_m = _AREA_M2_RE.search(text)
    if area_m:
        area = _parse_number(area_m.group(1))
    if area is None:
        sqft_m = _AREA_SQFT_RE.search(text)
        if sqft_m:
            sqft = _parse_number(sqft_m.group(1))
            if sqft is not None:
                area = round(sqft * 0.0929, 1)

    # Price is kept as the raw matched substring (number plus unit).
    price_m = _PRICE_RE.search(text)
    price = price_m.group(0).strip() if price_m else None

    return {
        "kind": kind,
        "property_type": prop,
        "rooms": rooms,
        "area_m2": area,
        "price": price,
    }


# --- Top-level analyzer --------------------------------------------------

def analyze(text: str | None) -> dict[str, Any]:
    """Synchronous regex-only analysis. Cheap and runs at insert time."""
    return {
        "phones": extract_phones(text),
        "names": extract_names(text),
        "tg_handles": extract_tg_handles(text),
        "real_estate": extract_real_estate(text),
    }