Add monitoring TG service

This commit is contained in:
Grendgi
2026-06-04 14:55:41 +03:00
commit f9e072774c
74 changed files with 7232 additions and 0 deletions

View File

@@ -0,0 +1,334 @@
"""Heuristic extractors for Telegram message text.
Russian-first, regex/keyword based, no ML deps. Goal is to surface signals for
the UI: phone numbers, person names (FIO), and real-estate intent (sale/rent/
purchase). False positives are tolerable — operator triages in the UI.
Output shape (used as JSONB in messages.extracted):
    {
      "phones": ["+79123456789", ...],
      "names": ["Иван Петров", ...],
      "tg_handles": ["@username", ...],
      "real_estate": {
        "kind": "sale" | "rent" | "purchase" | null,
        "property_type": str | null,   # квартира, дом, ...
        "rooms": str | null,           # matched digit, e.g. "2", or "студия"
        "area_m2": float | null,
        "price": str | null,           # raw matched string
      } | null
    }
"""
from __future__ import annotations
import re
from typing import Any
# --- Telegram @handles ---------------------------------------------------
# Plain @username — Telegram allows 5–32 chars, letters/digits/_, no leading digit.
_TG_HANDLE_RE = re.compile(r"(?<![\w/])@([A-Za-z][A-Za-z0-9_]{4,31})\b")
# t.me / telegram.me links to a user/channel handle (not joinchat / +invite).
# The negative lookbehind keeps the host from being the tail of a longer
# domain label, so "internet.me/foobar" is not reported as a Telegram link.
_TG_LINK_RE = re.compile(
    r"(?:https?://)?(?<![\w\-])(?:t|telegram)\.me/(?!joinchat/|\+)([A-Za-z][A-Za-z0-9_]{4,31})\b"
)


def extract_tg_handles(text: str | None) -> list[str]:
    """Collect Telegram handles mentioned in *text*.

    Both plain ``@username`` mentions and ``t.me/<username>`` /
    ``telegram.me/<username>`` links are recognised. Plain mentions are
    reported first, then links; within each group the order of appearance is
    kept.

    Args:
        text: Message text; ``None`` or an empty string yields ``[]``.

    Returns:
        Handles prefixed with ``@``, de-duplicated case-insensitively. The
        spelling of the first occurrence is preserved.
    """
    if not text:
        return []

    handles: list[str] = []
    seen: set[str] = set()
    # Same dedup logic for both sources; the tuple order fixes the output order.
    for pattern in (_TG_HANDLE_RE, _TG_LINK_RE):
        for name in pattern.findall(text):
            key = name.lower()  # usernames are compared case-insensitively
            if key in seen:
                continue
            seen.add(key)
            handles.append("@" + name)
    return handles
# --- Phones --------------------------------------------------------------
# Russian-format: starts with +7, 7, or 8 (no plus), 11 digits total.
# The lookbehind rejects a preceding digit AND a preceding "+": otherwise the
# "8" of a foreign country code (e.g. "+85212345678") would be read as the
# Russian trunk prefix and produce a bogus "+7..." number.
_PHONE_RU_RE = re.compile(
    r"(?<![\d+])(?:\+?7|8)[\s\-().]*\d{3}[\s\-().]*\d{3}[\s\-().]*\d{2}[\s\-().]*\d{2}(?!\d)"
)
# International format: starts with `+<country code>` (1–3 digits) followed by
# 7–15 more digits with optional separators. Catches +971 (UAE), +1 (US),
# +44 (UK), etc. The overall length is capped in extract_phones.
_PHONE_INTL_RE = re.compile(
    r"(?<![\w\d])\+\d{1,3}[\s\-().]*(?:\d[\s\-().]*){6,14}\d(?!\d)"
)


def extract_phones(text: str | None) -> list[str]:
    """Extract phone numbers from *text* in a normalized ``+<digits>`` form.

    Two passes are made: Russian-style numbers (``+7``/``7``/``8`` prefix) are
    normalized to ``+7XXXXXXXXXX``; then any ``+<country code>...`` number is
    reduced to ``+`` followed by its digits.

    Args:
        text: Message text; ``None`` or an empty string yields ``[]``.

    Returns:
        Unique normalized numbers: Russian-style matches first, then the
        remaining international ones, each group in order of appearance.
    """
    if not text:
        return []

    phones: list[str] = []
    seen: set[str] = set()

    # Pass 1: Russian-style. The pattern always yields exactly 11 digits with
    # a leading 7 or 8, so the prefix digit is simply replaced with "+7".
    for raw in _PHONE_RU_RE.findall(text):
        digits = re.sub(r"\D", "", raw)
        normalized = "+7" + digits[1:]
        if normalized not in seen:
            seen.add(normalized)
            phones.append(normalized)

    # Pass 2: international "+<country>...". Keep the plus prefix; just
    # collapse separators so the result is +<digits>.
    for raw in _PHONE_INTL_RE.findall(text):
        digits = re.sub(r"\D", "", raw)
        # The pattern can match up to 18 digits; keep only 8–15.
        if not (8 <= len(digits) <= 15):
            continue
        normalized = "+" + digits
        # A +7 number is picked up by both passes; report it once.
        if normalized in seen:
            continue
        seen.add(normalized)
        phones.append(normalized)
    return phones
# --- Names (ФИО) ---------------------------------------------------------
# Two or three capitalized Cyrillic tokens in a row. Allows hyphens (Иванов-Петров).
_NAME_RE = re.compile(
    r"\b([А-ЯЁ][а-яё]+(?:\-[А-ЯЁ][а-яё]+)?(?:\s+[А-ЯЁ][а-яё]+(?:\-[А-ЯЁ][а-яё]+)?){1,2})\b"
)
# Common false positives — geo/places/orgs/etc. Skip exact matches.
_NAME_BLOCKLIST = {
    "Российская Федерация",
    "Санкт Петербург",
    "Санкт-Петербург",
    "Нижний Новгород",
    "Великий Новгород",
    "Ростов На Дону",
    "Ростов-На-Дону",
    "Москва Сити",
    "Красная Площадь",
    "Чёрное Море",
    "Чёрного Моря",
    "Без Депозита",
    "Без Залога",
    "Без Комиссии",
    "Сдам Квартиру",
    "Продам Квартиру",
    "Куплю Квартиру",
    "Сдам Студию",
    "Продам Студию",
}
# Words that look like names but rarely are (months, weekdays, common nouns).
_NAME_TOKEN_BLOCK = {
    "Январь", "Февраль", "Март", "Апрель", "Май", "Июнь",
    "Июль", "Август", "Сентябрь", "Октябрь", "Ноябрь", "Декабрь",
    "Понедельник", "Вторник", "Среда", "Четверг", "Пятница", "Суббота", "Воскресенье",
    "Москва", "Питер", "Россия", "Кремль", "Метро",
}


def extract_names(text: str | None) -> list[str]:
    """Extract likely person names (two or three capitalized Cyrillic words).

    Each match has its inner whitespace collapsed to single spaces before it
    is filtered, so that a line break or a double space between the words can
    neither bypass the blocklist nor create duplicates of the same name.

    Args:
        text: Message text; ``None`` or an empty string yields ``[]``.

    Returns:
        Unique candidate names in order of first appearance. A candidate is
        dropped when it is an exact blocklist entry, when any of its words is
        in the token blocklist, or when none of its words has at least four
        characters.
    """
    if not text:
        return []

    names: list[str] = []
    seen: set[str] = set()
    for match in _NAME_RE.findall(text):
        # The pattern joins words with \s+, which also spans newlines/tabs.
        candidate = " ".join(match.split())
        if candidate in seen or candidate in _NAME_BLOCKLIST:
            continue
        # Hyphenated parts count as separate words for the checks below.
        tokens = candidate.replace("-", " ").split()
        if any(token in _NAME_TOKEN_BLOCK for token in tokens):
            continue
        # Heuristic: at least one token must have len >= 4.
        if not any(len(token) >= 4 for token in tokens):
            continue
        seen.add(candidate)
        names.append(candidate)
    return names
# --- Real estate ---------------------------------------------------------
# Deal kind -> keywords searched for in the lower-cased message text by
# _detect_kind. That function walks this dict in insertion order and returns
# the first kind with a hit, so the order below (rent, sale, purchase) is
# also the precedence when a message contains keywords of several kinds.
# All keywords are written in lower case because they are compared against
# lower-cased text.
_DEAL_KEYWORDS: dict[str, tuple[str, ...]] = {
    "rent": (
        # ru
        "сдаётся", "сдается", "сдаю", "сдадим", "сдам", "сдаём",
        "аренда", "арендую", "арендуем", "снять",
        "посуточно", "помесячно",
        # en
        "for rent", "to let", "rental", "renting", "lease", "leasing",
        "per year", "per month", "/year", "/month", "/mo",
    ),
    "sale": (
        # ru
        "продаётся", "продается", "продаю", "продадим", "продам", "продаём",
        "продажа", "к продаже",
        # en
        "for sale", "#forsale", "selling", "selling price", "sale price",
    ),
    "purchase": (
        # ru
        "куплю", "купим", "покупаю", "покупка", "ищу квартиру",
        "ищу дом", "ищем квартиру", "рассматриваю покупку",
        # en
        "looking for", "want to buy", "wanted", "requirement", "wtb",
    ),
}
# (stem, label) pairs used by _detect_property_type: the first pair whose stem
# occurs as a substring of the lower-cased text wins and its label is
# returned, so the order of this tuple is the precedence (e.g. "квартир" is
# tried before "комнат" and "дом"). Stems are lower case; " land " is padded
# with spaces, so it only matches with a space on both sides.
_PROPERTY_TYPES: tuple[tuple[str, str], ...] = (
    # ru
    ("квартир", "квартира"),
    ("студи", "студия"),
    ("апартамент", "апартаменты"),
    ("комнат", "комната"),
    ("таунхаус", "таунхаус"),
    ("коттедж", "коттедж"),
    ("дача", "дача"),
    ("дом", "дом"),
    ("офис", "офис"),
    ("склад", "склад"),
    ("помещен", "помещение"),
    ("земельн", "земельный участок"),
    ("участок", "участок"),
    ("гараж", "гараж"),
    ("машиномест", "машиноместо"),
    # en — kept as Russian labels for UI consistency
    ("villa", "дом"),
    ("townhouse", "таунхаус"),
    ("penthouse", "апартаменты"),
    ("apartment", "квартира"),
    ("studio", "студия"),
    ("plot", "участок"),
    (" land ", "участок"),
    ("office", "офис"),
    ("warehouse", "склад"),
    ("retail", "помещение"),
    ("garage", "гараж"),
)
# Number in front of an area unit (capture group 1). Two shapes are accepted:
#   * digit groups of three separated by a space / NBSP / narrow NBSP / comma
#     ("1 200", "1,200");
#   * a plain integer with an optional decimal part written with either a
#     point or a comma ("45", "45.5", "45,5").
# Arbitrary runs of whitespace or punctuation between digits are deliberately
# NOT accepted, so unrelated numbers ("этаж 5, 45 м²") are not glued together.
_AREA_NUMBER = r"(\d{1,3}(?:[ \u00a0\u202f,]\d{3})+|\d+(?:[.,]\d+)?)"
# Area in square metres: "45 м²", "45 м2", "45 кв.м", "45 кв. метров".
_AREA_M2_RE = re.compile(
    _AREA_NUMBER + r"\s*(?:м[²2]|кв\.?\s*м|кв\.\s*метр)",
    re.IGNORECASE,
)
# Area in square feet: "1,200 sqft", "1200 sq. ft", "1200 square feet".
_AREA_SQFT_RE = re.compile(
    _AREA_NUMBER + r"\s*(?:sqft|sq\.?\s*ft|sq\s+ft|square\s+feet)",
    re.IGNORECASE,
)
def _parse_number(s: str) -> float | None:
    """Parse a human-written number such as "1 200", "1,200" or "45,5".

    All whitespace (including non-breaking spaces) is dropped. Commas are
    interpreted by shape:

    * ``1,200`` / ``1,200,000`` — every comma is followed by exactly three
      digits and the leading group has 1–3 digits: thousands separators.
    * ``45,5`` / ``45,55`` — a single comma otherwise: decimal comma.
    * anything else — commas are removed.

    Args:
        s: Raw matched number text.

    Returns:
        The parsed value, or ``None`` when the text is not a valid number.
    """
    compact = "".join(s.split())  # str.split() also splits on NBSP
    if "," in compact:
        head, *groups = compact.split(",")
        is_thousands = (
            head.isdigit()
            and 1 <= len(head) <= 3
            and all(len(group) == 3 and group.isdigit() for group in groups)
        )
        if is_thousands:
            compact = head + "".join(groups)
        elif len(groups) == 1:
            compact = f"{head}.{groups[0]}"
        else:
            compact = compact.replace(",", "")
    try:
        return float(compact)
    except ValueError:
        return None
# Room count: a single digit (group 1) followed by a Russian or English room
# marker, e.g. "2-к", "3 комн", "2br", "1 bedroom".
_ROOMS_RE = re.compile(
    r"\b(\d)[\-\s]*(?:к\b|комн|комнатн|-комнат|br\b|bed\b|bedroom|-bed)",
    re.IGNORECASE,
)
# Studio is a special-case "0 rooms" indicator; not extracted as rooms count.
# Price: a number followed by a magnitude or currency marker. Alternatives are
# tried left to right, so a longer spelling must come before its own prefix
# ("тысяч" before "тыс"); otherwise the match stops at the prefix and the raw
# price string is cut short.
_PRICE_RE = re.compile(
    r"(\d[\d\s.,]*\d|\d)\s*(млн|млрд|тысяч|тыс|миллионов?|миллиардов?|руб(?:лей)?|₽|р/мес|/мес|р\b)",
    re.IGNORECASE,
)
def _detect_kind(low: str) -> str | None:
    """Return the deal kind whose keyword occurs in *low*, or ``None``.

    Kinds are tried in the order they are declared in ``_DEAL_KEYWORDS``; the
    first kind with a hit wins.

    Keywords that start with an ASCII letter (the English ones) must not be
    glued to a preceding letter, so "please" or "release" do not count as the
    keyword "lease". Other keywords (Russian words, "/month", "#forsale") are
    matched as plain substrings, as prefixed or suffixed forms carry the same
    intent there.

    Args:
        low: Message text, already lower-cased by the caller.
    """
    for kind, words in _DEAL_KEYWORDS.items():
        for word in words:
            needs_left_boundary = word[0].isascii() and word[0].isalpha()
            pos = low.find(word)
            while pos != -1:
                glued = needs_left_boundary and pos > 0 and low[pos - 1].isalpha()
                if not glued:
                    return kind
                # This occurrence was inside a longer word; try the next one.
                pos = low.find(word, pos + 1)
    return None
def _detect_property_type(low: str) -> str | None:
    """Return the label of the first property stem found in *low*.

    Stems from ``_PROPERTY_TYPES`` are tried in declaration order and matched
    as plain substrings of the (already lower-cased) text. ``None`` is
    returned when no stem occurs.
    """
    matching_labels = (label for stem, label in _PROPERTY_TYPES if stem in low)
    return next(matching_labels, None)
def extract_real_estate(text: str | None) -> dict[str, Any] | None:
    """Extract real-estate signals from *text*.

    Args:
        text: Message text; ``None`` or an empty string yields ``None``.

    Returns:
        ``None`` when neither a deal kind nor a property type is detected.
        Otherwise a dict with the keys ``kind``, ``property_type``, ``rooms``
        (the matched digit as a string, or ``"студия"`` when no room count is
        found but a studio is mentioned), ``area_m2`` (square metres; square
        feet are converted and rounded to one decimal) and ``price`` (the raw
        matched price text). Any of the values may be ``None``.
    """
    if not text:
        return None

    lowered = text.lower()
    deal_kind = _detect_kind(lowered)
    property_type = _detect_property_type(lowered)
    if deal_kind is None and property_type is None:
        return None

    # Rooms: an explicit count wins; a studio mention is only the fallback.
    room_hit = _ROOMS_RE.search(lowered)
    if room_hit:
        rooms = room_hit.group(1)
    elif "студи" in lowered or "studio" in lowered:
        rooms = "студия"
    else:
        rooms = None

    # Area: prefer a metric value; fall back to square feet when there is no
    # metric match or its number could not be parsed.
    area_m2: float | None = None
    metric_hit = _AREA_M2_RE.search(text)
    if metric_hit:
        area_m2 = _parse_number(metric_hit.group(1))
    if area_m2 is None:
        imperial_hit = _AREA_SQFT_RE.search(text)
        square_feet = _parse_number(imperial_hit.group(1)) if imperial_hit else None
        if square_feet is not None:
            area_m2 = round(square_feet * 0.0929, 1)

    price_hit = _PRICE_RE.search(text)
    if price_hit:
        price = price_hit.group(0).strip()
    else:
        price = None

    return {
        "kind": deal_kind,
        "property_type": property_type,
        "rooms": rooms,
        "area_m2": area_m2,
        "price": price,
    }
# --- Top-level analyzer --------------------------------------------------
def analyze(text: str | None) -> dict[str, Any]:
    """Run every regex extractor on *text* and bundle the results.

    Synchronous and regex-only, so it is cheap enough to run at insert time.
    The returned dict has the keys ``phones``, ``names``, ``tg_handles`` and
    ``real_estate``.
    """
    result: dict[str, Any] = {}
    result["phones"] = extract_phones(text)
    result["names"] = extract_names(text)
    result["tg_handles"] = extract_tg_handles(text)
    result["real_estate"] = extract_real_estate(text)
    return result
async def analyze_with_llm(
    text: str | None,
    vertical: str = "real_estate",
    section_slug: str | None = None,
) -> dict[str, Any]:
    """Regex extraction plus an LLM lead verdict, routed by vertical.

    The regex result of ``analyze`` is always computed first. ``classify``
    from ``parser_bot.llm`` is then awaited with ``text``, ``vertical`` and
    ``section_slug``; a non-``None`` verdict is stored under ``"hr_lead"``
    when ``vertical == "hr"`` and under ``"lead"`` otherwise.

    NOTE(review): per the original description, ``section_slug`` lets the
    classifier pick a section-specific system prompt (e.g. Dubai-focused for
    ``real_estate:dubai``) with fallback to the vertical default, and the
    classifier returns ``None`` when the LLM backend (Ollama) is unavailable,
    which leaves the regex-only result. That behavior lives in
    ``parser_bot.llm``; this function does not catch exceptions itself.
    """
    result = analyze(text)

    # Lazy import to avoid hard dep on httpx in environments where LLM is off.
    from parser_bot.llm import classify

    verdict = await classify(text, vertical, section_slug)  # type: ignore[arg-type]
    if verdict is None:
        return result

    result_key = "hr_lead" if vertical == "hr" else "lead"
    result[result_key] = verdict
    return result