from __future__ import annotations import sqlite3 import time from contextlib import contextmanager from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any, Iterator from zoneinfo import ZoneInfo MOSCOW = ZoneInfo("Europe/Moscow") # Заявка считается завершённой в этих статусах. TERMINAL_STATUSES = ("closed", "rejected") # Статусы, по которым заявка ещё «в работе» и попадает в SLA/эскалацию. ACTIVE_STATUSES = ("new", "in_progress", "waiting", "deferred") VALID_PRIORITIES = ("low", "medium", "high", "critical") # Рабочее окно SLA-таймера: пн-пт 9:00–18:00 МСК. Вне этих часов таймер «стоит». WORK_START_HOUR = 9 WORK_END_HOUR = 18 WORK_DAYS = {0, 1, 2, 3, 4} # 0 — понедельник, 4 — пятница def _work_window(d: datetime) -> tuple[datetime, datetime]: start = d.replace(hour=WORK_START_HOUR, minute=0, second=0, microsecond=0) end = d.replace(hour=WORK_END_HOUR, minute=0, second=0, microsecond=0) return start, end def work_minutes_between(start: datetime, end: datetime) -> float: """Сколько минут «рабочего времени» прошло между двумя моментами (МСК, пн-пт 9-18).""" if end <= start: return 0.0 start = start.astimezone(MOSCOW) end = end.astimezone(MOSCOW) total = 0.0 cur = start while cur.date() < end.date(): if cur.weekday() in WORK_DAYS: day_start, day_end = _work_window(cur) lo = max(cur, day_start) if lo < day_end: total += (day_end - lo).total_seconds() / 60 cur = (cur + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0) if end.weekday() in WORK_DAYS: day_start, day_end = _work_window(end) lo = max(cur, day_start) hi = min(end, day_end) if lo < hi: total += (hi - lo).total_seconds() / 60 return total def moscow_now() -> str: return datetime.now(MOSCOW).strftime("%d.%m.%Y %H:%M:%S МСК") def parse_moscow(value: str | None) -> datetime | None: """Разбирает дату заявки обратно в datetime. Поддерживает текущий формат '25.05.2026 14:45:00 МСК', а также старый дефолт SQLite CURRENT_TIMESTAMP ('2026-05-22 13:58:09', UTC). """ if not value: return None cleaned = value.replace("МСК", "").strip() try: return datetime.strptime(cleaned, "%d.%m.%Y %H:%M:%S").replace(tzinfo=MOSCOW) except ValueError: pass try: return datetime.strptime(cleaned, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc) except ValueError: return None def _epoch(ticket: sqlite3.Row | dict) -> float | None: try: value = ticket["created_epoch"] except (KeyError, IndexError): value = None if value: return float(value) parsed = parse_moscow(ticket["created_at"] if ticket["created_at"] else None) return parsed.timestamp() if parsed else None def is_overdue(ticket: sqlite3.Row | dict, sla_minutes: int) -> bool: # SLA-таймер реакции: тикает только пока заявка «new» и только в рабочие часы. if ticket["status"] != "new": return False base = _epoch(ticket) if base is None: return False created = datetime.fromtimestamp(base, MOSCOW) elapsed = work_minutes_between(created, datetime.now(MOSCOW)) return elapsed > sla_minutes def sla_remaining_minutes(ticket: sqlite3.Row | dict, sla_minutes: int) -> int | None: """Минут до дедлайна реакции (по рабочему времени); отрицательное — насколько просрочено. Возвращает None для любого статуса кроме «new» — SLA реакции к ним не применяется. """ if ticket["status"] != "new": return None base = _epoch(ticket) if base is None: return None created = datetime.fromtimestamp(base, MOSCOW) elapsed = work_minutes_between(created, datetime.now(MOSCOW)) return int(sla_minutes - elapsed) class Database: def __init__(self, path: str) -> None: self.path = path Path(path).parent.mkdir(parents=True, exist_ok=True) self.init() @contextmanager def connect(self) -> Iterator[sqlite3.Connection]: conn = sqlite3.connect(self.path) conn.row_factory = sqlite3.Row try: yield conn conn.commit() finally: conn.close() def init(self) -> None: with self.connect() as conn: conn.execute( """ CREATE TABLE IF NOT EXISTS tickets ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, username TEXT, full_name TEXT, department TEXT NOT NULL, topic TEXT NOT NULL, subtopic TEXT NOT NULL, location TEXT NOT NULL, description TEXT NOT NULL, contact_time TEXT NOT NULL, attachment_file_id TEXT, attachments TEXT, status TEXT NOT NULL DEFAULT 'new', priority TEXT NOT NULL DEFAULT 'medium', assignee_id INTEGER, assignee_name TEXT, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, created_epoch REAL, updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, closed_at TEXT, resolution TEXT, escalated INTEGER NOT NULL DEFAULT 0 ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS ticket_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, ticket_id INTEGER NOT NULL, actor_id INTEGER, actor_name TEXT, kind TEXT NOT NULL, text TEXT, created_at TEXT NOT NULL ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS users ( user_id INTEGER PRIMARY KEY, name TEXT, username TEXT, updated_at TEXT ) """ ) # Аддитивные миграции для баз, созданных прежней версией бота. columns = {row["name"] for row in conn.execute("PRAGMA table_info(tickets)")} migrations = { "assignee_name": "ALTER TABLE tickets ADD COLUMN assignee_name TEXT", "priority": "ALTER TABLE tickets ADD COLUMN priority TEXT NOT NULL DEFAULT 'medium'", "created_epoch": "ALTER TABLE tickets ADD COLUMN created_epoch REAL", "escalated": "ALTER TABLE tickets ADD COLUMN escalated INTEGER NOT NULL DEFAULT 0", "attachments": "ALTER TABLE tickets ADD COLUMN attachments TEXT", } for column, statement in migrations.items(): if column not in columns: conn.execute(statement) # Заполняем created_epoch для старых заявок, разбирая текстовую дату. for row in conn.execute( "SELECT id, created_at FROM tickets WHERE created_epoch IS NULL" ).fetchall(): parsed = parse_moscow(row["created_at"]) conn.execute( "UPDATE tickets SET created_epoch = ? WHERE id = ?", (parsed.timestamp() if parsed else None, row["id"]), ) # ----------------------------------------------------------------- users def touch_user(self, user_id: int, name: str | None, username: str | None) -> None: if not user_id: return with self.connect() as conn: conn.execute( """ INSERT INTO users (user_id, name, username, updated_at) VALUES (?, ?, ?, ?) ON CONFLICT(user_id) DO UPDATE SET name = excluded.name, username = excluded.username, updated_at = excluded.updated_at """, (user_id, name, username, moscow_now()), ) def user_name(self, user_id: int) -> str | None: with self.connect() as conn: row = conn.execute( "SELECT name, username FROM users WHERE user_id = ?", (user_id,) ).fetchone() if not row: return None return row["name"] or (f"@{row['username']}" if row["username"] else None) # --------------------------------------------------------------- tickets def create_ticket(self, payload: dict[str, Any]) -> int: now = moscow_now() with self.connect() as conn: cursor = conn.execute( """ INSERT INTO tickets ( user_id, username, full_name, department, topic, subtopic, location, description, contact_time, attachment_file_id, attachments, priority, created_at, created_epoch, updated_at ) VALUES ( :user_id, :username, :full_name, :department, :topic, :subtopic, :location, :description, :contact_time, :attachment_file_id, :attachments, :priority, :created_at, :created_epoch, :updated_at ) """, { **payload, "priority": payload.get("priority", "medium"), "created_at": now, "created_epoch": time.time(), "updated_at": now, }, ) ticket_id = int(cursor.lastrowid) conn.execute( """ INSERT INTO ticket_events (ticket_id, actor_id, actor_name, kind, text, created_at) VALUES (?, ?, ?, 'created', ?, ?) """, (ticket_id, payload.get("user_id"), payload.get("full_name"), "Заявка создана", now), ) return ticket_id def find_duplicate_ticket(self, user_id: int, topic: str) -> sqlite3.Row | None: """Открытая заявка того же сотрудника по той же теме — кандидат в дубликаты.""" if not user_id: return None terminal = ", ".join(f"'{s}'" for s in TERMINAL_STATUSES) with self.connect() as conn: return conn.execute( f""" SELECT * FROM tickets WHERE user_id = ? AND topic = ? AND status NOT IN ({terminal}) ORDER BY id DESC LIMIT 1 """, (user_id, topic), ).fetchone() def list_user_tickets(self, user_id: int, status_filter: str = "active") -> list[sqlite3.Row]: terminal = ", ".join(f"'{s}'" for s in TERMINAL_STATUSES) where_status = f"status NOT IN ({terminal})" if status_filter == "closed": where_status = f"status IN ({terminal})" elif status_filter == "all": where_status = "1 = 1" with self.connect() as conn: return conn.execute( f""" SELECT * FROM tickets WHERE user_id = ? AND {where_status} ORDER BY id DESC LIMIT 10 """, (user_id,), ).fetchall() def list_admin_tickets( self, departments: set[str] | None, list_filter: str, admin_id: int, sla_minutes: int, ) -> list[sqlite3.Row]: terminal = ", ".join(f"'{s}'" for s in TERMINAL_STATUSES) clauses: list[str] = [] params: list[Any] = [] if departments: placeholders = ", ".join("?" for _ in departments) clauses.append(f"department IN ({placeholders})") params.extend(sorted(departments)) status_map = { "open": f"status NOT IN ({terminal})", "new": "status = 'new'", "in_progress": "status = 'in_progress'", "waiting": "status = 'waiting'", "deferred": "status = 'deferred'", "closed": f"status IN ({terminal})", } if list_filter == "mine": clauses.append(f"status NOT IN ({terminal})") clauses.append("assignee_id = ?") params.append(admin_id) elif list_filter == "overdue": clauses.append(f"status NOT IN ({terminal})") else: clauses.append(status_map.get(list_filter, status_map["open"])) where = " AND ".join(clauses) if clauses else "1 = 1" # Архив сортируем по убыванию номера и показываем только последние 10, # иначе список разрастётся. Активные списки — по приоритету и давности. if list_filter == "closed": order_by = "id DESC" sql_limit = 10 else: order_by = ( "CASE priority " "WHEN 'critical' THEN 0 WHEN 'high' THEN 1 WHEN 'medium' THEN 2 ELSE 3 END, " "COALESCE(created_epoch, 0) ASC, id ASC" ) sql_limit = 30 with self.connect() as conn: rows = conn.execute( f"SELECT * FROM tickets WHERE {where} ORDER BY {order_by} LIMIT {sql_limit}", params, ).fetchall() if list_filter == "overdue": rows = [row for row in rows if is_overdue(row, sla_minutes)] if list_filter == "closed": return rows return rows[:20] def dashboard(self, departments: set[str] | None, sla_minutes: int) -> dict[str, int]: department_filter = "" params: list[str] = [] if departments: placeholders = ", ".join("?" for _ in departments) department_filter = f" WHERE department IN ({placeholders})" params.extend(sorted(departments)) with self.connect() as conn: rows = conn.execute( f"SELECT status, priority, created_epoch, created_at FROM tickets{department_filter}", params, ).fetchall() stats = {key: 0 for key in ("new", "in_progress", "waiting", "deferred", "closed", "open", "overdue", "mine_na")} for row in rows: status = row["status"] if status in TERMINAL_STATUSES: stats["closed"] += 1 continue stats["open"] += 1 if status in stats: stats[status] += 1 if is_overdue(row, sla_minutes): stats["overdue"] += 1 return stats def get_ticket(self, ticket_id: int) -> sqlite3.Row | None: with self.connect() as conn: return conn.execute( "SELECT * FROM tickets WHERE id = ?", (ticket_id,), ).fetchone() def list_overdue_unescalated(self, sla_minutes: int) -> list[sqlite3.Row]: terminal = ", ".join(f"'{s}'" for s in TERMINAL_STATUSES) with self.connect() as conn: rows = conn.execute( f"SELECT * FROM tickets WHERE status NOT IN ({terminal}) AND escalated = 0" ).fetchall() return [row for row in rows if is_overdue(row, sla_minutes)] def mark_escalated(self, ticket_id: int) -> None: with self.connect() as conn: conn.execute("UPDATE tickets SET escalated = 1 WHERE id = ?", (ticket_id,)) # --------------------------------------------------------------- changes def _touch(self, conn: sqlite3.Connection, ticket_id: int, fields: dict[str, Any]) -> None: fields["updated_at"] = moscow_now() assignments = ", ".join(f"{key} = :{key}" for key in fields) conn.execute( f"UPDATE tickets SET {assignments} WHERE id = :ticket_id", {**fields, "ticket_id": ticket_id}, ) def add_event( self, ticket_id: int, actor_id: int | None, actor_name: str | None, kind: str, text: str | None = None, ) -> None: with self.connect() as conn: conn.execute( """ INSERT INTO ticket_events (ticket_id, actor_id, actor_name, kind, text, created_at) VALUES (?, ?, ?, ?, ?, ?) """, (ticket_id, actor_id, actor_name, kind, text, moscow_now()), ) def list_events(self, ticket_id: int, limit: int = 30) -> list[sqlite3.Row]: with self.connect() as conn: return conn.execute( "SELECT * FROM ticket_events WHERE ticket_id = ? ORDER BY id ASC LIMIT ?", (ticket_id, limit), ).fetchall() def assign_ticket(self, ticket_id: int, assignee_id: int, assignee_name: str) -> None: with self.connect() as conn: self._touch( conn, ticket_id, {"status": "in_progress", "assignee_id": assignee_id, "assignee_name": assignee_name}, ) def set_status(self, ticket_id: int, status: str) -> None: with self.connect() as conn: self._touch(conn, ticket_id, {"status": status}) def set_priority(self, ticket_id: int, priority: str) -> None: with self.connect() as conn: self._touch(conn, ticket_id, {"priority": priority}) def close_ticket( self, ticket_id: int, assignee_id: int, assignee_name: str, resolution: str, status: str = "closed", ) -> None: now = moscow_now() with self.connect() as conn: conn.execute( """ UPDATE tickets SET status = ?, assignee_id = COALESCE(assignee_id, ?), assignee_name = COALESCE(assignee_name, ?), resolution = ?, closed_at = ?, updated_at = ? WHERE id = ? """, (status, assignee_id, assignee_name, resolution, now, now, ticket_id), ) def close_by_user(self, ticket_id: int, resolution: str) -> None: """Заявитель сам закрывает заявку (вопрос решился). Исполнителя не меняем.""" now = moscow_now() with self.connect() as conn: conn.execute( """ UPDATE tickets SET status = 'closed', resolution = ?, closed_at = ?, updated_at = ? WHERE id = ? """, (resolution, now, now, ticket_id), ) def reopen_ticket(self, ticket_id: int, assignee_id: int, assignee_name: str) -> None: now = moscow_now() with self.connect() as conn: conn.execute( """ UPDATE tickets SET status = 'in_progress', assignee_id = ?, assignee_name = ?, resolution = NULL, closed_at = NULL, escalated = 0, updated_at = ? WHERE id = ? """, (assignee_id, assignee_name, now, ticket_id), )