Compare commits

..

24 Commits

Author SHA1 Message Date
Grendgi
d0f8b48869 feat: add monitoring tg view-all scope
All checks were successful
CI / hygiene (push) Successful in 1s
Build and Deploy / build-and-deploy (push) Successful in 28s
CI / go (push) Successful in 21s
CI / python (push) Successful in 1s
2026-06-19 14:26:51 +03:00
Grendgi
cdbdea250d feat: expose monitoring tg media repair details
All checks were successful
CI / hygiene (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 45s
CI / go (push) Successful in 31s
CI / python (push) Successful in 2s
2026-06-17 17:22:50 +03:00
Grendgi
696b7eda6f feat: expose monitoring tg poll diagnostics 2026-06-17 17:19:13 +03:00
Grendgi
bd3b54dc7d feat: track monitoring tg poll errors 2026-06-17 17:01:34 +03:00
Grendgi
6cbfdb92e4 feat: expose monitoring tg ai job health 2026-06-17 16:58:47 +03:00
Grendgi
7cc5109cba feat: expose monitoring tg health detail 2026-06-17 15:55:28 +03:00
Grendgi
90c0a346ed chore: use common internal auth 2026-06-17 14:26:04 +03:00
Grendgi
7f7f2427cb chore: use common header parsing 2026-06-17 14:19:13 +03:00
Grendgi
73afcb64d5 Allow monitoring tg access to managed departments
All checks were successful
CI / hygiene (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 44s
CI / go (push) Successful in 16s
CI / python (push) Successful in 2s
2026-06-15 18:02:09 +03:00
Grendgi
5eb8e21eda Add monitoring TG CI hygiene guard
All checks were successful
CI / hygiene (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 27s
CI / go (push) Successful in 26s
CI / python (push) Successful in 2s
2026-06-12 16:42:35 +03:00
Grendgi
778b48cc12 Protect monitoring TG API with internal key 2026-06-12 16:32:10 +03:00
Grendgi
1f1354e72b Retry monitoring TG database connection on startup 2026-06-12 16:28:02 +03:00
Grendgi
fd1ee0611b Batch enqueue TG AI classifications
All checks were successful
CI / go (push) Successful in 18s
CI / python (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 29s
2026-06-12 09:01:41 +03:00
Grendgi
fc696c9e13 Add per-section TG classifications
All checks were successful
CI / go (push) Successful in 16s
CI / python (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 46s
2026-06-11 17:06:00 +03:00
Grendgi
f66ca4b6d4 Ignore non-real-estate sales in TG leads
All checks were successful
CI / go (push) Successful in 26s
CI / python (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 27s
2026-06-11 14:51:29 +03:00
Grendgi
eb12190729 Fix monitoring TG CI lint issues
All checks were successful
CI / go (push) Successful in 28s
CI / python (push) Successful in 1s
Build and Deploy / build-and-deploy (push) Successful in 26s
2026-06-09 11:33:20 +03:00
Grendgi
c4ad6c6c84 Add CI workflow
Some checks failed
Build and Deploy / build-and-deploy (push) Successful in 18s
CI / go (push) Failing after 49s
CI / python (push) Failing after 26s
2026-06-09 10:32:22 +03:00
Grendgi
2d0d751115 Share Telegram channels across sections
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 43s
2026-06-08 23:54:49 +03:00
Grendgi
ddea7002f1 Deploy all monitoring TG containers
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 15s
2026-06-08 23:33:08 +03:00
Grendgi
29490e5f93 Return skipped status for Telegram poll limits
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 30s
2026-06-08 23:29:38 +03:00
Grendgi
5165b31910 Handle Telegram poll failures gracefully
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 31s
2026-06-08 23:14:29 +03:00
Grendgi
e075b11761 Fix Docker build context for Go classifier
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 30s
2026-06-08 16:55:55 +03:00
Grendgi
bc15c2e116 Allow monitoring TG AI retries
Some checks failed
Build and Deploy / build-and-deploy (push) Failing after 5s
2026-06-08 15:48:47 +03:00
Grendgi
8259a01a88 Route monitoring TG classification through AI service
Some checks failed
Build and Deploy / build-and-deploy (push) Failing after 5s
2026-06-08 15:47:42 +03:00
21 changed files with 1733 additions and 281 deletions

View File

@@ -0,0 +1,35 @@
#!/usr/bin/env bash
set -euo pipefail
fail=0
while IFS= read -r -d '' path; do
base="$(basename "$path")"
case "$base" in
.DS_Store|.env)
echo "::error file=$path::tracked local-only file is forbidden"
fail=1
;;
esac
case "$path" in
*node_modules/*|node_modules/*)
echo "::error file=$path::tracked node_modules content is forbidden"
fail=1
;;
*.tmp|*.temp|*.bak|*.orig|*.rej|*.zip|*.tar|*.tar.gz|*.tgz|*.rar|*.7z)
echo "::error file=$path::tracked temporary/archive artifact is forbidden"
fail=1
;;
esac
if [ -f "$path" ]; then
size="$(wc -c < "$path" | tr -d ' ')"
if [ "${size:-0}" -gt 52428800 ]; then
echo "::error file=$path::tracked file is larger than 50 MiB"
fail=1
fi
fi
done < <(git ls-files -z)
exit "$fail"

35
.gitea/workflows/ci.yml Normal file
View File

@@ -0,0 +1,35 @@
name: CI
on:
push:
pull_request:
jobs:
hygiene:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: bash .gitea/scripts/hygiene-check.sh
go:
runs-on: ubuntu-latest
needs: hygiene
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: true
- run: go build ./...
- run: go test ./...
- uses: golangci/golangci-lint-action@v7
with:
version: v2.4
args: --config .golangci.yml ./...
python:
runs-on: ubuntu-latest
needs: hygiene
steps:
- uses: actions/checkout@v4
- run: python3 -m compileall src alembic

View File

@@ -54,5 +54,8 @@ jobs:
kubectl apply -f k8s/server-deployment.yaml
kubectl apply -f k8s/server-service.yaml
kubectl -n monitoring-tg set image deployment/monitoring-tg-server \
monitoring-tg-server=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }}
monitoring-tg-migrate=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }} \
monitoring-tg-server=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }} \
monitoring-tg-telegram=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }} \
monitoring-tg-classifier=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }}
kubectl -n monitoring-tg rollout status deployment/monitoring-tg-server --timeout=180s

36
.golangci.yml Normal file
View File

@@ -0,0 +1,36 @@
version: "2"
run:
timeout: 3m
linters:
default: none
enable:
- errcheck
- govet
- ineffassign
- staticcheck
- unused
settings:
errcheck:
check-type-assertions: true
check-blank: false
exclude-functions:
- (io.Closer).Close
- (net/http.ResponseWriter).Write
- (*encoding/json.Encoder).Encode
- io.Copy
- fmt.Fprintf
- (github.com/jackc/pgx/v5.Tx).Rollback
- os.RemoveAll
staticcheck:
checks: ["all", "-SA1019", "-ST1000", "-ST1005", "-ST1020", "-ST1021", "-ST1022"]
exclusions:
rules:
- path: _test\.go
linters:
- errcheck
issues:
max-issues-per-linter: 0
max-same-issues: 0

View File

@@ -4,6 +4,7 @@ WORKDIR /src
COPY go.mod go.sum ./
COPY cmd ./cmd
COPY internal ./internal
RUN CGO_ENABLED=0 GOOS=linux go build -o /out/monitoring-tg-classifier ./cmd/classifier \
&& CGO_ENABLED=0 GOOS=linux go build -o /out/monitoring-tg-server ./cmd/server

View File

@@ -4,8 +4,8 @@ Backend-сервис мониторинга Telegram-каналов для Porta
AI-классификация работают на Go, Python оставлен только как внутренний
MTProto/Telethon-адаптер для авторизации, опроса каналов и дозагрузки медиа.
Сервис сохраняет сообщения в Postgres, раскладывает каналы по
вертикалям/подразделам и выполняет AI-анализ через OpenAI-compatible endpoint,
общий с другими сервисами портала.
вертикалям/подразделам и выполняет AI-анализ через общий `ai-service`,
который уже сам обращается к OpenAI-compatible backend.
Пользовательский UI живёт в `portal/frontend/src/app/features/monitoring-tg`.
Этот сервис не отдаёт отдельные HTML-страницы и работает как API/worker за
@@ -33,7 +33,7 @@ MTProto/Telethon-адаптер для авторизации, опроса ка
## Запуск в k8s
Манифесты лежат в `k8s/`. Перед применением нужно заполнить `k8s/secrets.yaml`
реальными Telegram-кредами и, при необходимости, `LLM_API_KEY`.
реальными Telegram-кредами и `AI_SERVICE_TOKEN`.
```bash
kubectl apply -k k8s

View File

@@ -0,0 +1,43 @@
"""channel aliases across department sections
Revision ID: 0011
Revises: 0010
Create Date: 2026-06-08
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
revision: str = "0011"
down_revision: Union[str, None] = "0010"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.add_column("channels", sa.Column("source_channel_id", sa.Integer(), nullable=True))
op.create_foreign_key(
"fk_channels_source_channel_id",
"channels",
"channels",
["source_channel_id"],
["id"],
ondelete="SET NULL",
)
op.drop_constraint("channels_identifier_key", "channels", type_="unique")
op.create_unique_constraint(
"uq_channels_section_identifier",
"channels",
["section_id", "identifier"],
)
op.create_index("ix_channels_source_channel_id", "channels", ["source_channel_id"])
def downgrade() -> None:
op.drop_index("ix_channels_source_channel_id", table_name="channels")
op.drop_constraint("uq_channels_section_identifier", "channels", type_="unique")
op.create_unique_constraint("channels_identifier_key", "channels", ["identifier"])
op.drop_constraint("fk_channels_source_channel_id", "channels", type_="foreignkey")
op.drop_column("channels", "source_channel_id")

View File

@@ -0,0 +1,38 @@
"""per-section message classifications
Revision ID: 0012
Revises: 0011
Create Date: 2026-06-11
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
revision: str = "0012"
down_revision: Union[str, None] = "0011"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"message_classifications",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column("message_id", sa.Integer(), sa.ForeignKey("messages.id", ondelete="CASCADE"), nullable=False),
sa.Column("section_id", sa.Integer(), sa.ForeignKey("sections.id", ondelete="CASCADE"), nullable=False),
sa.Column("vertical", sa.String(length=32), nullable=False),
sa.Column("verdict", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
sa.UniqueConstraint("message_id", "section_id", name="uq_message_classification_section"),
)
op.create_index("ix_message_classifications_message", "message_classifications", ["message_id"])
op.create_index("ix_message_classifications_section", "message_classifications", ["section_id"])
def downgrade() -> None:
op.drop_index("ix_message_classifications_section", table_name="message_classifications")
op.drop_index("ix_message_classifications_message", table_name="message_classifications")
op.drop_table("message_classifications")

View File

@@ -0,0 +1,34 @@
"""channel poll status
Revision ID: 0013
Revises: 0012
Create Date: 2026-06-17
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
revision: str = "0013"
down_revision: Union[str, None] = "0012"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.add_column("channels", sa.Column("last_poll_status", sa.String(length=32), nullable=True))
op.add_column("channels", sa.Column("last_poll_error_code", sa.String(length=64), nullable=True))
op.add_column("channels", sa.Column("last_poll_error", sa.Text(), nullable=True))
op.add_column("channels", sa.Column("last_poll_error_at", sa.DateTime(timezone=True), nullable=True))
op.create_index("ix_channels_last_poll_status", "channels", ["last_poll_status"])
op.create_index("ix_channels_last_poll_error_code", "channels", ["last_poll_error_code"])
def downgrade() -> None:
op.drop_index("ix_channels_last_poll_error_code", table_name="channels")
op.drop_index("ix_channels_last_poll_status", table_name="channels")
op.drop_column("channels", "last_poll_error_at")
op.drop_column("channels", "last_poll_error")
op.drop_column("channels", "last_poll_error_code")
op.drop_column("channels", "last_poll_status")

View File

@@ -1,14 +1,11 @@
package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"os"
"os/signal"
@@ -17,6 +14,9 @@ import (
"syscall"
"time"
"monitoring-tg/internal/aiservice"
"monitoring-tg/internal/dbretry"
"github.com/jackc/pgx/v5/pgxpool"
)
@@ -33,18 +33,19 @@ type config struct {
PostgresPort int
LLMEnabled bool
LLMBaseURL string
LLMAPIKey string
LLMModel string
LLMTimeout time.Duration
LLMMaxTokens int
LLMMinTextLength int
ClassifyInterval time.Duration
ClassifyBatchSize int
AIServiceURL string
AIServiceToken string
}
type pendingMessage struct {
ID int64
SectionID int64
Text string
Vertical string
SectionSlug string
@@ -52,29 +53,10 @@ type pendingMessage struct {
Extracted map[string]any
}
type chatRequest struct {
Model string `json:"model"`
Messages []chatMessage `json:"messages"`
Temperature float64 `json:"temperature"`
MaxTokens int `json:"max_tokens"`
ResponseFormat responseFmt `json:"response_format"`
}
type chatMessage struct {
Role string `json:"role"`
Content string `json:"content"`
}
type responseFmt struct {
Type string `json:"type"`
}
type chatResponse struct {
Choices []struct {
Message chatMessage `json:"message"`
} `json:"choices"`
}
func main() {
cfg := loadConfig()
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
@@ -88,14 +70,18 @@ func main() {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
pool, err := pgxpool.New(ctx, cfg.databaseURL())
pool, err := dbretry.Connect(ctx, cfg.databaseURL(), 2*time.Minute)
if err != nil {
slog.Error("db_connect_failed", "error", err)
os.Exit(1)
}
defer pool.Close()
worker := &classifier{cfg: cfg, db: pool, http: &http.Client{Timeout: cfg.LLMTimeout}}
worker := &classifier{
cfg: cfg,
db: pool,
ai: aiservice.New(cfg.AIServiceURL, cfg.AIServiceToken, cfg.LLMTimeout),
}
slog.Info(
"classifier_started",
"interval", cfg.ClassifyInterval.String(),
@@ -107,11 +93,11 @@ func main() {
defer ticker.Stop()
for {
updated, err := worker.runOnce(ctx)
updated, enqueued, err := worker.runOnce(ctx)
if err != nil {
slog.Error("classify_batch_failed", "error", err)
} else if updated > 0 {
slog.Info("classify_batch_done", "updated", updated)
} else if updated > 0 || enqueued > 0 {
slog.Info("classify_batch_done", "updated", updated, "enqueued", enqueued)
}
select {
@@ -124,67 +110,116 @@ func main() {
}
type classifier struct {
cfg config
db *pgxpool.Pool
http *http.Client
cfg config
db *pgxpool.Pool
ai *aiservice.Client
}
func (c *classifier) runOnce(ctx context.Context) (int, error) {
func (c *classifier) runOnce(ctx context.Context) (int, int, error) {
rows, err := c.loadPending(ctx)
if err != nil {
return 0, err
return 0, 0, err
}
if len(rows) == 0 {
return 0, nil
return 0, 0, nil
}
byRef := make(map[string]pendingMessage, len(rows))
jobs := make([]aiservice.CreateJobRequest, 0, len(rows))
updated := 0
for _, msg := range rows {
key := verdictKey(msg.Vertical)
if _, ok := msg.Extracted[key]; ok {
continue
}
verdict, err := c.classify(ctx, msg)
if err != nil {
slog.Warn("llm_classify_failed", "message_id", msg.ID, "vertical", msg.Vertical, "error", err)
continue
}
if len(verdict) == 0 {
verdict, err = marshalRaw(negativeVerdict(msg.Vertical))
if len(strings.TrimSpace(msg.Text)) < c.cfg.LLMMinTextLength {
verdict, err := marshalRaw(negativeVerdict(msg.Vertical))
if err != nil {
slog.Warn("negative_verdict_failed", "message_id", msg.ID, "error", err)
slog.Warn("negative_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "error", err)
continue
}
}
if err := c.saveVerdict(ctx, msg.ID, key, verdict); err != nil {
slog.Warn("save_verdict_failed", "message_id", msg.ID, "error", err)
if err := c.saveVerdict(ctx, msg, key, verdict); err != nil {
slog.Warn("save_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "error", err)
continue
}
updated++
continue
}
updated++
req, err := c.buildJobRequest(ctx, msg)
if err != nil {
slog.Warn("build_classify_job_failed", "message_id", msg.ID, "section_id", msg.SectionID, "vertical", msg.Vertical, "error", err)
continue
}
byRef[req.OwnerRef] = msg
jobs = append(jobs, req)
}
return updated, nil
if len(jobs) == 0 {
return updated, 0, nil
}
created, err := c.ai.CreateJobs(ctx, aiservice.CreateJobsRequest{
OwnerService: "monitoring-tg",
TaskType: "telegram_classification",
ModelProfile: c.cfg.LLMModel,
Priority: 5,
MaxAttempts: 2,
Jobs: jobs,
})
if err != nil {
return updated, 0, err
}
for _, job := range created {
msg, ok := byRef[job.OwnerRef]
if !ok {
continue
}
switch job.Status {
case "done":
verdict, err := c.verdictFromJob(job, msg.Vertical)
if err != nil {
slog.Warn("parse_classify_job_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
continue
}
if err := c.saveVerdict(ctx, msg, verdictKey(msg.Vertical), verdict); err != nil {
slog.Warn("save_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
continue
}
updated++
case "failed", "cancelled":
verdict, err := marshalRaw(negativeVerdict(msg.Vertical))
if err != nil {
slog.Warn("negative_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
continue
}
if err := c.saveVerdict(ctx, msg, verdictKey(msg.Vertical), verdict); err != nil {
slog.Warn("save_failed_job_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
continue
}
updated++
slog.Warn("classify_job_failed_marked_negative", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "status", job.Status, "error", derefString(job.ErrorMessage))
}
}
return updated, len(created), nil
}
func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error) {
rows, err := c.db.Query(ctx, `
SELECT
m.id,
s.id,
m.text,
c.vertical,
s.slug,
COALESCE(s.department_id, ''),
COALESCE(m.extracted, '{}'::jsonb)::text
COALESCE(mc.verdict, '{}'::jsonb)::text
FROM messages m
JOIN channels c ON c.id = m.channel_id
JOIN channels src ON src.id = m.channel_id
JOIN channels c ON c.id = src.id OR c.source_channel_id = src.id
JOIN sections s ON s.id = c.section_id
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
WHERE m.text IS NOT NULL
AND (
(c.vertical = 'hr' AND (m.extracted IS NULL OR m.extracted->'hr_lead' IS NULL))
OR
(c.vertical <> 'hr' AND (m.extracted IS NULL OR m.extracted->'lead' IS NULL))
)
AND mc.id IS NULL
ORDER BY m.id DESC
LIMIT $1
`, c.cfg.ClassifyBatchSize)
@@ -197,7 +232,7 @@ func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error)
for rows.Next() {
var msg pendingMessage
var extractedText string
if err := rows.Scan(&msg.ID, &msg.Text, &msg.Vertical, &msg.SectionSlug, &msg.DepartmentID, &extractedText); err != nil {
if err := rows.Scan(&msg.ID, &msg.SectionID, &msg.Text, &msg.Vertical, &msg.SectionSlug, &msg.DepartmentID, &extractedText); err != nil {
return nil, err
}
if err := json.Unmarshal([]byte(extractedText), &msg.Extracted); err != nil {
@@ -208,60 +243,56 @@ func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error)
return out, rows.Err()
}
func (c *classifier) classify(ctx context.Context, msg pendingMessage) (json.RawMessage, error) {
if len(strings.TrimSpace(msg.Text)) < c.cfg.LLMMinTextLength {
return marshalRaw(negativeVerdict(msg.Vertical))
}
func (c *classifier) buildJobRequest(ctx context.Context, msg pendingMessage) (aiservice.CreateJobRequest, error) {
systemPrompt, err := c.resolvePrompt(ctx, msg.Vertical, msg.DepartmentID, msg.SectionSlug)
if err != nil {
return nil, err
return aiservice.CreateJobRequest{}, err
}
systemPrompt = promptWithVerticalGuard(msg.Vertical, systemPrompt)
payload := chatRequest{
Model: c.cfg.LLMModel,
Messages: []chatMessage{
responseFormat, _ := json.Marshal(responseFmt{Type: "json_object"})
payload := aiservice.ChatInput{
Messages: []aiservice.Message{
{Role: "system", Content: systemPrompt},
{Role: "user", Content: buildUserPrompt(msg.Text)},
},
Temperature: 0.1,
MaxTokens: c.cfg.LLMMaxTokens,
ResponseFormat: responseFmt{Type: "json_object"},
ResponseFormat: responseFormat,
}
body, err := json.Marshal(payload)
if err != nil {
return aiservice.CreateJobRequest{}, err
}
ownerRef := classifyOwnerRef(msg)
return aiservice.CreateJobRequest{
OwnerService: "monitoring-tg",
OwnerRef: ownerRef,
TaskType: "telegram_classification",
ModelProfile: c.cfg.LLMModel,
Priority: 5,
MaxAttempts: 2,
Input: body,
// Classification is section-specific because prompts are section-specific.
IdempotencyKey: "monitoring-tg:telegram_classification:" + ownerRef,
}, nil
}
func (c *classifier) verdictFromJob(job *aiservice.Job, vertical string) (json.RawMessage, error) {
if job.Status != "done" {
msg := "ai-service job " + job.Status
if job.ErrorMessage != nil && *job.ErrorMessage != "" {
msg += ": " + *job.ErrorMessage
}
return nil, errors.New(msg)
}
var parsed aiservice.ChatResult
if err := json.Unmarshal(job.Result, &parsed); err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, strings.TrimRight(c.cfg.LLMBaseURL, "/")+"/v1/chat/completions", bytes.NewReader(body))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
if c.cfg.LLMAPIKey != "" {
req.Header.Set("Authorization", "Bearer "+c.cfg.LLMAPIKey)
}
resp, err := c.http.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
b, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
return nil, fmt.Errorf("llm http %d: %s", resp.StatusCode, strings.TrimSpace(string(b)))
}
var parsed chatResponse
if err := json.NewDecoder(resp.Body).Decode(&parsed); err != nil {
return nil, err
}
if len(parsed.Choices) == 0 {
return nil, errors.New("llm returned no choices")
}
raw := strings.TrimSpace(parsed.Choices[0].Message.Content)
raw := strings.TrimSpace(parsed.Content)
if raw == "" {
return nil, errors.New("llm returned empty content")
}
@@ -269,7 +300,7 @@ func (c *classifier) classify(ctx context.Context, msg pendingMessage) (json.Raw
if err != nil {
return nil, err
}
normalized, err := normalizeVerdict(msg.Vertical, block)
normalized, err := normalizeVerdict(vertical, block)
if err != nil {
return nil, err
}
@@ -297,8 +328,19 @@ func (c *classifier) resolvePrompt(ctx context.Context, vertical, departmentID,
return defaultPrompt(vertical), nil
}
func (c *classifier) saveVerdict(ctx context.Context, id int64, key string, verdict json.RawMessage) error {
func (c *classifier) saveVerdict(ctx context.Context, msg pendingMessage, key string, verdict json.RawMessage) error {
_, err := c.db.Exec(ctx, `
INSERT INTO message_classifications (message_id, section_id, vertical, verdict, updated_at)
VALUES ($1, $2, $3, $4::jsonb, now())
ON CONFLICT ON CONSTRAINT uq_message_classification_section DO UPDATE
SET vertical = EXCLUDED.vertical,
verdict = EXCLUDED.verdict,
updated_at = now()
`, msg.ID, msg.SectionID, msg.Vertical, string(verdict))
if err != nil {
return err
}
_, err = c.db.Exec(ctx, `
UPDATE messages
SET extracted = jsonb_set(
CASE WHEN jsonb_typeof(extracted) = 'object' THEN extracted ELSE '{}'::jsonb END,
@@ -307,7 +349,7 @@ func (c *classifier) saveVerdict(ctx context.Context, id int64, key string, verd
true
)
WHERE id = $1
`, id, key, string(verdict))
`, msg.ID, key, string(verdict))
return err
}
@@ -325,6 +367,10 @@ func verdictKey(vertical string) string {
return "lead"
}
func classifyOwnerRef(msg pendingMessage) string {
return fmt.Sprintf("%d:%d", msg.ID, msg.SectionID)
}
func buildUserPrompt(text string) string {
return "Текст сообщения:\n```\n" + text + "\n```\nВерни JSON."
}
@@ -410,6 +456,13 @@ func asFloat(v any) (float64, bool) {
}
}
func derefString(v *string) string {
if v == nil {
return ""
}
return *v
}
func defaultPrompt(vertical string) string {
if vertical == verticalHR {
return defaultHRPrompt
@@ -417,6 +470,16 @@ func defaultPrompt(vertical string) string {
return defaultREPrompt
}
func promptWithVerticalGuard(vertical, prompt string) string {
if vertical == verticalHR {
return prompt
}
if strings.Contains(prompt, realEstateOnlyGuard) {
return prompt
}
return strings.TrimSpace(prompt) + "\n\n" + realEstateOnlyGuard
}
func loadConfig() config {
return config{
PostgresUser: env("POSTGRES_USER", "parser"),
@@ -425,14 +488,14 @@ func loadConfig() config {
PostgresHost: env("POSTGRES_HOST", "db"),
PostgresPort: envInt("POSTGRES_PORT", 5432),
LLMEnabled: envBool("LLM_ENABLED", true),
LLMBaseURL: env("LLM_BASE_URL", "http://10.2.3.5:8002"),
LLMAPIKey: env("LLM_API_KEY", ""),
LLMModel: env("LLM_MODEL", "qwen2.5-14b"),
LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second,
LLMMaxTokens: envInt("LLM_MAX_TOKENS", 600),
LLMMinTextLength: envInt("LLM_MIN_TEXT_LENGTH", 20),
ClassifyInterval: time.Duration(envInt("LLM_CLASSIFY_INTERVAL_SECONDS", 20)) * time.Second,
ClassifyBatchSize: envInt("LLM_CLASSIFY_BATCH_SIZE", 5),
ClassifyInterval: time.Duration(envInt("LLM_CLASSIFY_INTERVAL_SECONDS", 5)) * time.Second,
ClassifyBatchSize: envInt("LLM_CLASSIFY_BATCH_SIZE", 200),
AIServiceURL: env("AI_SERVICE_URL", ""),
AIServiceToken: env("AI_SERVICE_TOKEN", ""),
}
}
@@ -480,6 +543,8 @@ func envBool(key string, fallback bool) bool {
const defaultREPrompt = `Ты — аналитик объявлений о недвижимости. Тебе дают текст из Telegram-канала.
Определи, является ли сообщение реальным объявлением о покупке, продаже или аренде недвижимости.
Учитывай только сделки с недвижимостью: квартиры, дома, апартаменты, участки, коммерческие помещения и другие объекты недвижимости.
Любые продажи или заявки по стройматериалам, мебели, бытовой/строительной технике, автомобилям, услугам, оборудованию и прочим товарам не являются лидами недвижимости — для них is_listing=false.
Отвечай строго валидным JSON без markdown:
{
"is_listing": boolean,
@@ -498,6 +563,9 @@ const defaultREPrompt = `Ты — аналитик объявлений о не
}
summary всегда по-русски, confidence в диапазоне 0..1.`
const realEstateOnlyGuard = `Жёсткое правило для недвижимости: учитывай только сделки с недвижимостью.
Если сообщение продаёт или покупает стройматериалы, мебель, бытовую/строительную технику, автомобили, услуги, оборудование или любые другие товары/работы не по недвижимости — это НЕ лид недвижимости, верни is_listing=false.`
const defaultHRPrompt = `Ты — аналитик HR-объявлений. Тебе дают текст из Telegram-канала.
Определи, относится ли сообщение к рынку труда: вакансия, резюме или короткий HR-контакт.
Отвечай строго валидным JSON без markdown:

File diff suppressed because it is too large Load Diff

1
go.mod
View File

@@ -3,6 +3,7 @@ module monitoring-tg
go 1.25.7
require (
gitea.estateliga.work/admin/portal-common v0.3.0
github.com/jackc/pgx/v5 v5.9.1
github.com/minio/minio-go/v7 v7.2.0
)

2
go.sum
View File

@@ -1,3 +1,5 @@
gitea.estateliga.work/admin/portal-common v0.3.0 h1:xpr9UeLXk5pCcNXcTVGZzJZr0Ni7An7DV0OkuYv9qVM=
gitea.estateliga.work/admin/portal-common v0.3.0/go.mod h1:C860q6g38KVMsv+mKv6k1Vm7smVRCycl+N6r63TElnk=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=

View File

@@ -0,0 +1,308 @@
package aiservice
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
)
type Client struct {
baseURL string
token string
http *http.Client
}
type Message struct {
Role string `json:"role"`
Content string `json:"content"`
}
type ChatInput struct {
Messages []Message `json:"messages"`
Temperature float64 `json:"temperature"`
MaxTokens int `json:"max_tokens,omitempty"`
ResponseFormat json.RawMessage `json:"response_format,omitempty"`
}
type CreateJobRequest struct {
OwnerService string `json:"owner_service"`
OwnerRef string `json:"owner_ref"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
Priority int `json:"priority"`
MaxAttempts int `json:"max_attempts"`
Input json.RawMessage `json:"input"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
}
type CreateJobsRequest struct {
OwnerService string `json:"owner_service,omitempty"`
TaskType string `json:"task_type,omitempty"`
ModelProfile string `json:"model_profile,omitempty"`
Priority int `json:"priority,omitempty"`
MaxAttempts int `json:"max_attempts,omitempty"`
Jobs []CreateJobRequest `json:"jobs"`
}
type Job struct {
ID string `json:"id"`
OwnerService string `json:"owner_service,omitempty"`
OwnerRef string `json:"owner_ref,omitempty"`
TaskType string `json:"task_type,omitempty"`
ModelProfile string `json:"model_profile,omitempty"`
Status string `json:"status"`
Result json.RawMessage `json:"result,omitempty"`
ErrorCode *string `json:"error_code,omitempty"`
ErrorMessage *string `json:"error_message,omitempty"`
IdempotencyKey *string `json:"idempotency_key,omitempty"`
}
type ChatResult struct {
Content string `json:"content"`
Model string `json:"model"`
DurationMS int64 `json:"duration_ms"`
}
type ProvidersStatus struct {
At time.Time `json:"at"`
Providers []ProviderStatus `json:"providers"`
}
type ProviderStatus struct {
Name string `json:"name"`
Configured bool `json:"configured"`
OK bool `json:"ok"`
URL string `json:"url,omitempty"`
Model string `json:"model,omitempty"`
LatencyMS int64 `json:"latency_ms,omitempty"`
Error string `json:"error,omitempty"`
}
type Stats struct {
At time.Time `json:"at"`
Owners []OwnerStat `json:"owners,omitempty"`
Errors []ErrorStat `json:"errors,omitempty"`
Backlog []BacklogStat `json:"backlog,omitempty"`
}
type OwnerStat struct {
OwnerService string `json:"owner_service"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
Status string `json:"status"`
Total int64 `json:"total"`
}
type ErrorStat struct {
OwnerService string `json:"owner_service,omitempty"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
ErrorCode string `json:"error_code"`
Total int64 `json:"total"`
Last24h int64 `json:"last_24h"`
}
type BacklogStat struct {
OwnerService string `json:"owner_service"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
Pending int64 `json:"pending"`
Running int64 `json:"running"`
StaleRunning int64 `json:"stale_running"`
OldestPendingAgeSeconds int64 `json:"oldest_pending_age_seconds"`
OldestRunningAgeSeconds int64 `json:"oldest_running_age_seconds"`
}
func New(baseURL, token string, timeout time.Duration) *Client {
baseURL = strings.TrimRight(strings.TrimSpace(baseURL), "/")
if baseURL == "" {
return nil
}
if timeout <= 0 {
timeout = 2 * time.Minute
}
return &Client{
baseURL: baseURL,
token: strings.TrimSpace(token),
http: &http.Client{Timeout: timeout},
}
}
func (c *Client) CreateJob(ctx context.Context, req CreateJobRequest) (*Job, error) {
if c == nil {
return nil, fmt.Errorf("ai-service not configured")
}
body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("marshal ai job: %w", err)
}
httpReq, err := c.request(ctx, http.MethodPost, "/api/v1/jobs", body)
if err != nil {
return nil, err
}
resp, err := c.http.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("create ai job: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("create ai job: http %d: %s", resp.StatusCode, readSmall(resp.Body))
}
var out Job
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return nil, fmt.Errorf("decode ai job: %w", err)
}
return &out, nil
}
func (c *Client) CreateJobs(ctx context.Context, req CreateJobsRequest) ([]*Job, error) {
if c == nil {
return nil, fmt.Errorf("ai-service not configured")
}
body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("marshal ai jobs: %w", err)
}
httpReq, err := c.request(ctx, http.MethodPost, "/api/v1/jobs/batch", body)
if err != nil {
return nil, err
}
resp, err := c.http.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("create ai jobs: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("create ai jobs: http %d: %s", resp.StatusCode, readSmall(resp.Body))
}
var out struct {
Jobs []*Job `json:"jobs"`
}
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return nil, fmt.Errorf("decode ai jobs: %w", err)
}
return out.Jobs, nil
}
func (c *Client) GetJob(ctx context.Context, id string) (*Job, error) {
if c == nil || strings.TrimSpace(id) == "" {
return nil, fmt.Errorf("ai job id is required")
}
req, err := c.request(ctx, http.MethodGet, "/api/v1/jobs/"+id, nil)
if err != nil {
return nil, err
}
resp, err := c.http.Do(req)
if err != nil {
return nil, fmt.Errorf("get ai job: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("get ai job: http %d: %s", resp.StatusCode, readSmall(resp.Body))
}
var out Job
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return nil, fmt.Errorf("decode ai job: %w", err)
}
return &out, nil
}
func (c *Client) WaitJob(ctx context.Context, id string, pollInterval time.Duration) (*Job, error) {
if pollInterval <= 0 {
pollInterval = 2 * time.Second
}
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()
for {
job, err := c.GetJob(ctx, id)
if err != nil {
return nil, err
}
switch job.Status {
case "done", "failed", "cancelled":
return job, nil
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-ticker.C:
}
}
}
func (c *Client) ProvidersStatus(ctx context.Context) (*ProvidersStatus, error) {
if c == nil {
return nil, fmt.Errorf("ai-service not configured")
}
req, err := c.request(ctx, http.MethodGet, "/api/v1/providers/status", nil)
if err != nil {
return nil, err
}
resp, err := c.http.Do(req)
if err != nil {
return nil, fmt.Errorf("ai providers status: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("ai providers status: http %d: %s", resp.StatusCode, readSmall(resp.Body))
}
var out ProvidersStatus
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return nil, fmt.Errorf("decode ai providers status: %w", err)
}
return &out, nil
}
func (c *Client) Stats(ctx context.Context) (*Stats, error) {
if c == nil {
return nil, fmt.Errorf("ai-service not configured")
}
req, err := c.request(ctx, http.MethodGet, "/api/v1/stats", nil)
if err != nil {
return nil, err
}
resp, err := c.http.Do(req)
if err != nil {
return nil, fmt.Errorf("ai stats: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("ai stats: http %d: %s", resp.StatusCode, readSmall(resp.Body))
}
var out Stats
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return nil, fmt.Errorf("decode ai stats: %w", err)
}
return &out, nil
}
func (c *Client) request(ctx context.Context, method, path string, body []byte) (*http.Request, error) {
var r io.Reader
if body != nil {
r = bytes.NewReader(body)
}
req, err := http.NewRequestWithContext(ctx, method, c.baseURL+path, r)
if err != nil {
return nil, err
}
if body != nil {
req.Header.Set("Content-Type", "application/json")
}
if c.token != "" {
req.Header.Set("Authorization", "Bearer "+c.token)
}
return req, nil
}
func readSmall(r io.Reader) string {
body, err := io.ReadAll(io.LimitReader(r, 1024))
if err != nil {
return err.Error()
}
return strings.TrimSpace(string(body))
}

View File

@@ -0,0 +1,44 @@
package dbretry
import (
"context"
"fmt"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
func Connect(ctx context.Context, databaseURL string, maxWait time.Duration) (*pgxpool.Pool, error) {
deadline := time.Now().Add(maxWait)
var lastErr error
for attempt := 1; ; attempt++ {
pool, err := pgxpool.New(ctx, databaseURL)
if err == nil {
if pingErr := pool.Ping(ctx); pingErr == nil {
return pool, nil
} else {
err = fmt.Errorf("ping postgres: %w", pingErr)
pool.Close()
}
} else {
err = fmt.Errorf("connect postgres: %w", err)
}
lastErr = err
if time.Now().After(deadline) {
return nil, fmt.Errorf("connect postgres after retry: %w", lastErr)
}
sleep := time.Duration(attempt) * time.Second
if sleep > 5*time.Second {
sleep = 5 * time.Second
}
timer := time.NewTimer(sleep)
select {
case <-ctx.Done():
timer.Stop()
return nil, fmt.Errorf("connect postgres cancelled: %w", ctx.Err())
case <-timer.C:
}
}
}

View File

@@ -22,7 +22,7 @@ data:
POLL_INTERVAL_SECONDS: "60"
POLL_HISTORY_LIMIT: "50"
LLM_ENABLED: "1"
LLM_BASE_URL: "http://10.2.3.5:8002"
LLM_MODEL: "qwen2.5-14b"
LLM_MAX_TOKENS: "600"
LLM_CLASSIFIER_OWNER: "go"
AI_SERVICE_URL: "http://ai-service.ai-service.svc.cluster.local:8080"

View File

@@ -10,7 +10,8 @@ stringData:
TG_PHONE: "+971524994695"
TG_SESSION_STRING: ""
POSTGRES_PASSWORD: "parser"
LLM_API_KEY: "sk-111f838ccec43406e078cd9094b6797307cb895236179f32"
INTERNAL_API_KEY: "36fe89ed40c01fdc54d3cf4e3fcacc8751dc456a4a1acd394e9fed48257c5734"
AI_SERVICE_TOKEN: "d18bcacf9e02bae1806ee6b6eeda62b95be6a915c0a22936d9a700128b275442"
MINIO_ACCESS_KEY: "admjn"
MINIO_SECRET_KEY: "TropicalMacaw9Fantasize"
---

View File

@@ -13,6 +13,19 @@ def portal_department_id(request: Request) -> str | None:
return value or None
def portal_department_ids(request: Request) -> list[str]:
raw = (request.headers.get("x-user-department-ids") or "").strip()
out: list[str] = []
for part in raw.split(","):
value = part.strip()
if value and value not in out:
out.append(value)
current = portal_department_id(request)
if current and current not in out:
out.append(current)
return out
def is_department_head_request(request: Request) -> bool:
return request.headers.get("x-user-is-department-head") == "1"

View File

@@ -7,14 +7,14 @@ from sqlalchemy.ext.asyncio import AsyncSession
from parser_bot.access import (
is_admin_request,
portal_department_id,
portal_department_ids,
require_department_manager,
require_telegram_auth_manager,
)
from parser_bot.config import settings
from parser_bot.db.models import Channel, Section
from parser_bot.db.session import get_session
from parser_bot.scheduler.poller import backfill_media, poll_channel
from parser_bot.scheduler.poller import PollError, backfill_media, poll_channel
from parser_bot.telegram import client as tg
router = APIRouter()
@@ -39,13 +39,13 @@ class AuthCodeResult(BaseModel):
needs_password: bool
def _department_scope(request: Request) -> str | None:
def _department_scopes(request: Request) -> list[str] | None:
if is_admin_request(request):
return None
dept_id = portal_department_id(request)
if not dept_id:
dept_ids = portal_department_ids(request)
if not dept_ids:
raise HTTPException(status_code=403, detail="department is required")
return dept_id
return dept_ids
async def _require_channel_scope(
@@ -55,7 +55,7 @@ async def _require_channel_scope(
vertical: str | None,
section: str | None,
) -> None:
department_id = _department_scope(request)
department_ids = _department_scopes(request)
stmt = (
select(Channel.id)
.join(Section, Section.id == Channel.section_id)
@@ -65,13 +65,25 @@ async def _require_channel_scope(
stmt = stmt.where(Channel.vertical == vertical)
if section:
stmt = stmt.where(Section.slug == section)
if department_id is not None:
stmt = stmt.where(Section.department_id == department_id)
if department_ids is not None:
stmt = stmt.where(Section.department_id.in_(department_ids))
exists = (await session.execute(stmt)).scalar_one_or_none()
if exists is None:
raise HTTPException(status_code=404)
def _poll_skipped_result(exc: PollError) -> dict[str, Any]:
result: dict[str, Any] = {
"inserted": 0,
"status": "skipped",
"code": exc.code,
"message": exc.message,
}
if exc.retry_after is not None:
result["retry_after"] = exc.retry_after
return result
@router.get("/auth/status", response_model=AuthStatus, dependencies=[Depends(require_telegram_auth_manager)])
async def auth_status() -> AuthStatus:
authorized = await tg.is_authorized()
@@ -126,10 +138,13 @@ async def trigger_poll(
vertical: str | None = Query(None),
section: str | None = Query(None),
session: AsyncSession = Depends(get_session),
) -> dict[str, int]:
) -> dict[str, Any]:
await _require_channel_scope(session, request, channel_id, vertical, section)
inserted = await poll_channel(channel_id)
return {"inserted": inserted}
try:
inserted = await poll_channel(channel_id)
except PollError as exc:
return _poll_skipped_result(exc)
return {"inserted": inserted, "status": "ok"}
@router.post(
@@ -167,7 +182,7 @@ async def trigger_poll_all(
section: str | None = Query(None),
session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
department_id = _department_scope(request)
department_ids = _department_scopes(request)
stmt = (
select(Channel.id)
.join(Section, Section.id == Channel.section_id)
@@ -175,8 +190,8 @@ async def trigger_poll_all(
)
if section:
stmt = stmt.where(Section.slug == section)
if department_id is not None:
stmt = stmt.where(Section.department_id == department_id)
if department_ids is not None:
stmt = stmt.where(Section.department_id.in_(department_ids))
result = await session.execute(stmt)
ids = [row[0] for row in result.all()]
background.add_task(_poll_all_in_background, ids)

View File

@@ -54,12 +54,16 @@ class Section(Base):
class Channel(Base):
__tablename__ = "channels"
__table_args__ = (
UniqueConstraint("section_id", "identifier", name="uq_channels_section_identifier"),
Index("ix_channels_source_channel_id", "source_channel_id"),
)
id: Mapped[int] = mapped_column(primary_key=True)
# Telegram numeric channel id (peer id), nullable until first resolve
tg_id: Mapped[int | None] = mapped_column(BigInteger, unique=True, nullable=True)
# Username or t.me/joinchat link supplied by user
identifier: Mapped[str] = mapped_column(String(255), unique=True)
identifier: Mapped[str] = mapped_column(String(255))
title: Mapped[str | None] = mapped_column(String(512), nullable=True)
# 'real_estate' or 'hr' — picks which LLM prompt and lead schema is used
vertical: Mapped[str] = mapped_column(
@@ -68,9 +72,16 @@ class Channel(Base):
section_id: Mapped[int] = mapped_column(
ForeignKey("sections.id", ondelete="RESTRICT"), index=True
)
source_channel_id: Mapped[int | None] = mapped_column(
ForeignKey("channels.id", ondelete="SET NULL"), nullable=True
)
is_active: Mapped[bool] = mapped_column(default=True, server_default="true")
last_message_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
last_polled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
last_poll_status: Mapped[str | None] = mapped_column(String(32), nullable=True)
last_poll_error_code: Mapped[str | None] = mapped_column(String(64), nullable=True)
last_poll_error: Mapped[str | None] = mapped_column(Text, nullable=True)
last_poll_error_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
@@ -112,6 +123,24 @@ class Message(Base):
channel: Mapped[Channel] = relationship(back_populates="messages")
class MessageClassification(Base):
__tablename__ = "message_classifications"
__table_args__ = (
UniqueConstraint("message_id", "section_id", name="uq_message_classification_section"),
Index("ix_message_classifications_message", "message_id"),
Index("ix_message_classifications_section", "section_id"),
)
id: Mapped[int] = mapped_column(primary_key=True)
message_id: Mapped[int] = mapped_column(ForeignKey("messages.id", ondelete="CASCADE"))
section_id: Mapped[int] = mapped_column(ForeignKey("sections.id", ondelete="CASCADE"))
vertical: Mapped[str] = mapped_column(String(32))
verdict: Mapped[dict] = mapped_column(JSONB)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
class AppSetting(Base):
"""Runtime-editable settings, edited from the UI without a restart."""

View File

@@ -2,8 +2,9 @@ from datetime import datetime, timezone
import structlog
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sqlalchemy import func, select
from sqlalchemy import func, or_, select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.exc import IntegrityError
from parser_bot.config import settings
from parser_bot.db.models import Channel, Message
@@ -19,24 +20,152 @@ from parser_bot.telegram.client import (
log = structlog.get_logger()
class PollError(RuntimeError):
status_code = 400
code = "poll_failed"
retry_after: int | None = None
def __init__(self, message: str) -> None:
super().__init__(message)
self.message = message
class PollUnauthorizedError(PollError):
status_code = 401
code = "telegram_not_authorized"
class PollChannelUnavailableError(PollError):
status_code = 422
code = "telegram_channel_unavailable"
class PollFloodWaitError(PollError):
status_code = 429
code = "telegram_flood_wait"
def __init__(self, seconds: int | None) -> None:
self.retry_after = seconds
wait = f"{seconds} sec" if seconds else "a while"
super().__init__(f"Telegram asked to wait {wait} before retrying")
class PollDuplicateChannelError(PollError):
status_code = 409
code = "telegram_channel_duplicate"
def _translate_telegram_error(exc: Exception, identifier: str) -> PollError:
name = type(exc).__name__
message = str(exc)
lower = message.lower()
if name == "FloodWaitError":
return PollFloodWaitError(getattr(exc, "seconds", None))
if name in {
"ChannelPrivateError",
"UsernameInvalidError",
"UsernameNotOccupiedError",
"InviteHashExpiredError",
"InviteHashInvalidError",
"ChatAdminRequiredError",
}:
return PollChannelUnavailableError(message)
if (
"cannot get entity" in lower
or "cannot find any entity" in lower
or "not part of" in lower
or "join the group" in lower
or "invalid channel" in lower
):
return PollChannelUnavailableError(message)
return PollError(f"Cannot poll {identifier}: {message}")
async def poll_channel(channel_id: int) -> int:
"""Poll one channel for new messages. Returns count of inserted rows."""
try:
return await _poll_channel(channel_id)
except PollError as exc:
await _record_poll_error(channel_id, exc.code, exc.message)
raise
except Exception as exc:
await _record_poll_error(channel_id, "poll_failed", str(exc))
raise
async def _poll_channel(channel_id: int) -> int:
if not await is_authorized():
raise PollUnauthorizedError(
"Telegram is not authorized: open Monitoring TG in Portal and authorize it"
)
async with session_scope() as session:
channel = await session.get(Channel, channel_id)
if channel is None or not channel.is_active:
return 0
if channel.source_channel_id is not None:
source = await session.get(Channel, channel.source_channel_id)
if source is not None:
channel.tg_id = None
channel.title = channel.title or source.title
channel.last_message_id = source.last_message_id
channel.last_polled_at = source.last_polled_at
channel.last_poll_status = "alias"
channel.last_poll_error_code = None
channel.last_poll_error = None
channel.last_poll_error_at = None
return 0
if channel.tg_id is None or channel.title is None:
resolved = await resolve_channel(channel.identifier)
try:
resolved = await resolve_channel(channel.identifier)
except Exception as exc:
raise _translate_telegram_error(exc, channel.identifier) from exc
duplicate_id = (
await session.execute(
select(Channel.id).where(
Channel.tg_id == resolved.tg_id,
Channel.id != channel.id,
)
)
).scalar_one_or_none()
if duplicate_id is not None:
source = await session.get(Channel, duplicate_id)
channel.source_channel_id = duplicate_id
channel.tg_id = None
channel.title = channel.title or resolved.title or (source.title if source else None)
channel.last_message_id = source.last_message_id if source else channel.last_message_id
channel.last_polled_at = source.last_polled_at if source else channel.last_polled_at
channel.last_poll_status = "alias"
channel.last_poll_error_code = None
channel.last_poll_error = None
channel.last_poll_error_at = None
log.info(
"linked_channel_alias",
channel_id=channel.id,
source_channel_id=duplicate_id,
tg_id=resolved.tg_id,
)
return 0
channel.tg_id = resolved.tg_id
channel.title = resolved.title
try:
await session.flush()
except IntegrityError as exc:
raise PollDuplicateChannelError(
"Telegram channel is already connected to another channel"
) from exc
msgs = await fetch_new_messages(
channel.identifier,
min_id=channel.last_message_id,
limit=settings.poll_history_limit,
download_media_for_channel_id=channel.id,
)
try:
msgs = await fetch_new_messages(
channel.identifier,
min_id=channel.last_message_id,
limit=settings.poll_history_limit,
download_media_for_channel_id=channel.id,
)
except Exception as exc:
raise _translate_telegram_error(exc, channel.identifier) from exc
inserted = 0
for m in msgs:
@@ -70,6 +199,10 @@ async def poll_channel(channel_id: int) -> int:
channel.last_message_id or 0, msgs[-1].tg_message_id
)
channel.last_polled_at = datetime.now(timezone.utc)
channel.last_poll_status = "ok"
channel.last_poll_error_code = None
channel.last_poll_error = None
channel.last_poll_error_at = None
log.info(
"polled_channel",
@@ -81,6 +214,17 @@ async def poll_channel(channel_id: int) -> int:
return inserted
async def _record_poll_error(channel_id: int, code: str, message: str) -> None:
async with session_scope() as session:
channel = await session.get(Channel, channel_id)
if channel is None:
return
channel.last_poll_status = "error"
channel.last_poll_error_code = code
channel.last_poll_error = message[:1000]
channel.last_poll_error_at = datetime.now(timezone.utc)
async def poll_all() -> None:
if not await is_authorized():
log.debug("poll_skipped_not_authorized")
@@ -93,6 +237,13 @@ async def poll_all() -> None:
for channel_id in ids:
try:
await poll_channel(channel_id)
except PollError as exc:
log.warning(
"poll_skipped",
channel_id=channel_id,
code=exc.code,
error=exc.message,
)
except Exception as exc:
log.error("poll_failed", channel_id=channel_id, error=str(exc))
@@ -111,10 +262,15 @@ async def backfill_media(channel_id: int, batch_size: int = 50) -> dict[str, int
if channel is None:
raise RuntimeError("channel not found")
missing_media_condition = or_(
Message.media_files.is_(None),
func.jsonb_array_length(Message.media_files) == 0,
)
pending_q = select(func.count(Message.id)).where(
Message.channel_id == channel_id,
Message.has_media.is_(True),
Message.media_files.is_(None),
missing_media_condition,
)
pending_total = (await session.execute(pending_q)).scalar_one()
@@ -124,7 +280,7 @@ async def backfill_media(channel_id: int, batch_size: int = 50) -> dict[str, int
.where(
Message.channel_id == channel_id,
Message.has_media.is_(True),
Message.media_files.is_(None),
missing_media_condition,
)
.order_by(Message.tg_message_id.asc())
.limit(batch_size)