Compare commits

..

27 Commits

Author SHA1 Message Date
Grendgi
d0f8b48869 feat: add monitoring tg view-all scope
All checks were successful
CI / hygiene (push) Successful in 1s
Build and Deploy / build-and-deploy (push) Successful in 28s
CI / go (push) Successful in 21s
CI / python (push) Successful in 1s
2026-06-19 14:26:51 +03:00
Grendgi
cdbdea250d feat: expose monitoring tg media repair details
All checks were successful
CI / hygiene (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 45s
CI / go (push) Successful in 31s
CI / python (push) Successful in 2s
2026-06-17 17:22:50 +03:00
Grendgi
696b7eda6f feat: expose monitoring tg poll diagnostics 2026-06-17 17:19:13 +03:00
Grendgi
bd3b54dc7d feat: track monitoring tg poll errors 2026-06-17 17:01:34 +03:00
Grendgi
6cbfdb92e4 feat: expose monitoring tg ai job health 2026-06-17 16:58:47 +03:00
Grendgi
7cc5109cba feat: expose monitoring tg health detail 2026-06-17 15:55:28 +03:00
Grendgi
90c0a346ed chore: use common internal auth 2026-06-17 14:26:04 +03:00
Grendgi
7f7f2427cb chore: use common header parsing 2026-06-17 14:19:13 +03:00
Grendgi
73afcb64d5 Allow monitoring tg access to managed departments
All checks were successful
CI / hygiene (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 44s
CI / go (push) Successful in 16s
CI / python (push) Successful in 2s
2026-06-15 18:02:09 +03:00
Grendgi
5eb8e21eda Add monitoring TG CI hygiene guard
All checks were successful
CI / hygiene (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 27s
CI / go (push) Successful in 26s
CI / python (push) Successful in 2s
2026-06-12 16:42:35 +03:00
Grendgi
778b48cc12 Protect monitoring TG API with internal key 2026-06-12 16:32:10 +03:00
Grendgi
1f1354e72b Retry monitoring TG database connection on startup 2026-06-12 16:28:02 +03:00
Grendgi
fd1ee0611b Batch enqueue TG AI classifications
All checks were successful
CI / go (push) Successful in 18s
CI / python (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 29s
2026-06-12 09:01:41 +03:00
Grendgi
fc696c9e13 Add per-section TG classifications
All checks were successful
CI / go (push) Successful in 16s
CI / python (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 46s
2026-06-11 17:06:00 +03:00
Grendgi
f66ca4b6d4 Ignore non-real-estate sales in TG leads
All checks were successful
CI / go (push) Successful in 26s
CI / python (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 27s
2026-06-11 14:51:29 +03:00
Grendgi
eb12190729 Fix monitoring TG CI lint issues
All checks were successful
CI / go (push) Successful in 28s
CI / python (push) Successful in 1s
Build and Deploy / build-and-deploy (push) Successful in 26s
2026-06-09 11:33:20 +03:00
Grendgi
c4ad6c6c84 Add CI workflow
Some checks failed
Build and Deploy / build-and-deploy (push) Successful in 18s
CI / go (push) Failing after 49s
CI / python (push) Failing after 26s
2026-06-09 10:32:22 +03:00
Grendgi
2d0d751115 Share Telegram channels across sections
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 43s
2026-06-08 23:54:49 +03:00
Grendgi
ddea7002f1 Deploy all monitoring TG containers
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 15s
2026-06-08 23:33:08 +03:00
Grendgi
29490e5f93 Return skipped status for Telegram poll limits
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 30s
2026-06-08 23:29:38 +03:00
Grendgi
5165b31910 Handle Telegram poll failures gracefully
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 31s
2026-06-08 23:14:29 +03:00
Grendgi
e075b11761 Fix Docker build context for Go classifier
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 30s
2026-06-08 16:55:55 +03:00
Grendgi
bc15c2e116 Allow monitoring TG AI retries
Some checks failed
Build and Deploy / build-and-deploy (push) Failing after 5s
2026-06-08 15:48:47 +03:00
Grendgi
8259a01a88 Route monitoring TG classification through AI service
Some checks failed
Build and Deploy / build-and-deploy (push) Failing after 5s
2026-06-08 15:47:42 +03:00
Grendgi
a924cd832b Store monitoring TG media in MinIO
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 45s
2026-06-05 16:58:08 +03:00
Grendgi
4ac976b4eb Harden monitoring TG reanalyze reset 2026-06-05 16:35:58 +03:00
Grendgi
fd6fc6b931 Handle scalar TG extracted payloads 2026-06-05 16:31:20 +03:00
26 changed files with 1991 additions and 289 deletions

View File

@@ -0,0 +1,35 @@
#!/usr/bin/env bash
# Repository hygiene guard.
#
# Scans every path tracked by git and prints a workflow-command style
# `::error file=...::message` line for each tracked file that is:
#   * a local-only file (.DS_Store, .env),
#   * located inside a node_modules directory,
#   * a temporary or archive artifact (matched by extension),
#   * larger than 50 MiB.
# Exits with status 1 if at least one violation was reported, 0 otherwise.
set -euo pipefail

# Size limit for a single tracked file: 50 MiB.
readonly max_size_bytes=$((50 * 1024 * 1024))
fail=0

# NUL-delimited listing keeps paths containing spaces or newlines intact.
while IFS= read -r -d '' path; do
  # Strip the directory part in-shell: avoids one `basename` process per
  # tracked file and cannot be confused by a path that starts with '-'.
  base="${path##*/}"

  case "$base" in
    .DS_Store|.env)
      echo "::error file=$path::tracked local-only file is forbidden"
      fail=1
      ;;
  esac

  case "$path" in
    # Match node_modules only as a whole path component (top-level or nested),
    # so directories such as "my_node_modules/" are not reported by mistake.
    node_modules/*|*/node_modules/*)
      echo "::error file=$path::tracked node_modules content is forbidden"
      fail=1
      ;;
    *.tmp|*.temp|*.bak|*.orig|*.rej|*.zip|*.tar|*.tar.gz|*.tgz|*.rar|*.7z)
      echo "::error file=$path::tracked temporary/archive artifact is forbidden"
      fail=1
      ;;
  esac

  # Tracked paths may be missing from the working tree; only size-check
  # regular files that actually exist.
  if [ -f "$path" ]; then
    # Some `wc` implementations pad the count with spaces; strip them.
    size="$(wc -c < "$path" | tr -d ' ')"
    if [ "${size:-0}" -gt "$max_size_bytes" ]; then
      echo "::error file=$path::tracked file is larger than 50 MiB"
      fail=1
    fi
  fi
done < <(git ls-files -z)

exit "$fail"

35
.gitea/workflows/ci.yml Normal file
View File

@@ -0,0 +1,35 @@
name: CI
on:
push:
pull_request:
jobs:
hygiene:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: bash .gitea/scripts/hygiene-check.sh
go:
runs-on: ubuntu-latest
needs: hygiene
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: true
- run: go build ./...
- run: go test ./...
- uses: golangci/golangci-lint-action@v7
with:
version: v2.4
args: --config .golangci.yml ./...
python:
runs-on: ubuntu-latest
needs: hygiene
steps:
- uses: actions/checkout@v4
- run: python3 -m compileall src alembic

View File

@@ -54,5 +54,8 @@ jobs:
kubectl apply -f k8s/server-deployment.yaml kubectl apply -f k8s/server-deployment.yaml
kubectl apply -f k8s/server-service.yaml kubectl apply -f k8s/server-service.yaml
kubectl -n monitoring-tg set image deployment/monitoring-tg-server \ kubectl -n monitoring-tg set image deployment/monitoring-tg-server \
monitoring-tg-server=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }} monitoring-tg-migrate=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }} \
monitoring-tg-server=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }} \
monitoring-tg-telegram=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }} \
monitoring-tg-classifier=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }}
kubectl -n monitoring-tg rollout status deployment/monitoring-tg-server --timeout=180s kubectl -n monitoring-tg rollout status deployment/monitoring-tg-server --timeout=180s

36
.golangci.yml Normal file
View File

@@ -0,0 +1,36 @@
version: "2"
run:
timeout: 3m
linters:
default: none
enable:
- errcheck
- govet
- ineffassign
- staticcheck
- unused
settings:
errcheck:
check-type-assertions: true
check-blank: false
exclude-functions:
- (io.Closer).Close
- (net/http.ResponseWriter).Write
- (*encoding/json.Encoder).Encode
- io.Copy
- fmt.Fprintf
- (github.com/jackc/pgx/v5.Tx).Rollback
- os.RemoveAll
staticcheck:
checks: ["all", "-SA1019", "-ST1000", "-ST1005", "-ST1020", "-ST1021", "-ST1022"]
exclusions:
rules:
- path: _test\.go
linters:
- errcheck
issues:
max-issues-per-linter: 0
max-same-issues: 0

View File

@@ -4,6 +4,7 @@ WORKDIR /src
COPY go.mod go.sum ./ COPY go.mod go.sum ./
COPY cmd ./cmd COPY cmd ./cmd
COPY internal ./internal
RUN CGO_ENABLED=0 GOOS=linux go build -o /out/monitoring-tg-classifier ./cmd/classifier \ RUN CGO_ENABLED=0 GOOS=linux go build -o /out/monitoring-tg-classifier ./cmd/classifier \
&& CGO_ENABLED=0 GOOS=linux go build -o /out/monitoring-tg-server ./cmd/server && CGO_ENABLED=0 GOOS=linux go build -o /out/monitoring-tg-server ./cmd/server

View File

@@ -4,8 +4,8 @@ Backend-сервис мониторинга Telegram-каналов для Porta
AI-классификация работают на Go, Python оставлен только как внутренний AI-классификация работают на Go, Python оставлен только как внутренний
MTProto/Telethon-адаптер для авторизации, опроса каналов и дозагрузки медиа. MTProto/Telethon-адаптер для авторизации, опроса каналов и дозагрузки медиа.
Сервис сохраняет сообщения в Postgres, раскладывает каналы по Сервис сохраняет сообщения в Postgres, раскладывает каналы по
вертикалям/подразделам и выполняет AI-анализ через OpenAI-compatible endpoint, вертикалям/подразделам и выполняет AI-анализ через общий `ai-service`,
общий с другими сервисами портала. который уже сам обращается к OpenAI-compatible backend.
Пользовательский UI живёт в `portal/frontend/src/app/features/monitoring-tg`. Пользовательский UI живёт в `portal/frontend/src/app/features/monitoring-tg`.
Этот сервис не отдаёт отдельные HTML-страницы и работает как API/worker за Этот сервис не отдаёт отдельные HTML-страницы и работает как API/worker за
@@ -33,7 +33,7 @@ MTProto/Telethon-адаптер для авторизации, опроса ка
## Запуск в k8s ## Запуск в k8s
Манифесты лежат в `k8s/`. Перед применением нужно заполнить `k8s/secrets.yaml` Манифесты лежат в `k8s/`. Перед применением нужно заполнить `k8s/secrets.yaml`
реальными Telegram-кредами и, при необходимости, `LLM_API_KEY`. реальными Telegram-кредами и `AI_SERVICE_TOKEN`.
```bash ```bash
kubectl apply -k k8s kubectl apply -k k8s

View File

@@ -0,0 +1,43 @@
"""channel aliases across department sections
Revision ID: 0011
Revises: 0010
Create Date: 2026-06-08
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
revision: str = "0011"
down_revision: Union[str, None] = "0010"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Allow one channel identifier to appear in several sections.

    Adds a nullable, indexed, self-referencing ``source_channel_id`` column
    (``ON DELETE SET NULL``) to ``channels`` and replaces the unique
    constraint on ``identifier`` with one on ``(section_id, identifier)``.
    """
    table = "channels"
    alias_column = "source_channel_id"

    # Self-referencing link from an alias row to the channel it mirrors.
    op.add_column(table, sa.Column(alias_column, sa.Integer(), nullable=True))
    op.create_foreign_key(
        "fk_channels_source_channel_id",
        table,
        table,
        [alias_column],
        ["id"],
        ondelete="SET NULL",
    )

    # Uniqueness moves from "identifier" alone to "identifier within a section".
    op.drop_constraint("channels_identifier_key", table, type_="unique")
    op.create_unique_constraint(
        "uq_channels_section_identifier",
        table,
        ["section_id", "identifier"],
    )

    op.create_index("ix_channels_source_channel_id", table, [alias_column])
def downgrade() -> None:
    """Revert revision 0011: restore global identifier uniqueness.

    Drops the ``source_channel_id`` index, swaps the per-section unique
    constraint back to a unique constraint on ``identifier``, then removes
    the foreign key and the column itself.

    NOTE(review): re-creating the unique constraint on ``identifier`` is
    expected to fail if the same identifier already exists in more than one
    section - confirm the data is de-duplicated before downgrading.
    """
    table = "channels"

    op.drop_index("ix_channels_source_channel_id", table_name=table)

    op.drop_constraint("uq_channels_section_identifier", table, type_="unique")
    op.create_unique_constraint("channels_identifier_key", table, ["identifier"])

    # The foreign key has to go before the column it is defined on.
    op.drop_constraint("fk_channels_source_channel_id", table, type_="foreignkey")
    op.drop_column(table, "source_channel_id")

View File

@@ -0,0 +1,38 @@
"""per-section message classifications
Revision ID: 0012
Revises: 0011
Create Date: 2026-06-11
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
revision: str = "0012"
down_revision: Union[str, None] = "0011"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Create ``message_classifications``: one verdict per message and section.

    Rows reference ``messages`` and ``sections`` with ``ON DELETE CASCADE``
    and are unique per ``(message_id, section_id)``. Both foreign-key columns
    also get their own index.
    """
    table = "message_classifications"

    schema_items = [
        sa.Column("id", sa.Integer(), primary_key=True),
        sa.Column(
            "message_id",
            sa.Integer(),
            sa.ForeignKey("messages.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column(
            "section_id",
            sa.Integer(),
            sa.ForeignKey("sections.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column("vertical", sa.String(length=32), nullable=False),
        sa.Column("verdict", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
        sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.UniqueConstraint("message_id", "section_id", name="uq_message_classification_section"),
    ]
    op.create_table(table, *schema_items)

    lookup_indexes = (
        ("ix_message_classifications_message", "message_id"),
        ("ix_message_classifications_section", "section_id"),
    )
    for index_name, column_name in lookup_indexes:
        op.create_index(index_name, table, [column_name])
def downgrade() -> None:
    """Revert revision 0012: drop both indexes, then the table itself."""
    table = "message_classifications"

    # Indexes are removed in the reverse order of their creation.
    for index_name in (
        "ix_message_classifications_section",
        "ix_message_classifications_message",
    ):
        op.drop_index(index_name, table_name=table)

    op.drop_table(table)

View File

@@ -0,0 +1,34 @@
"""channel poll status
Revision ID: 0013
Revises: 0012
Create Date: 2026-06-17
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
revision: str = "0013"
down_revision: Union[str, None] = "0012"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Add nullable poll-status columns to ``channels`` and index two of them.

    New columns: ``last_poll_status``, ``last_poll_error_code``,
    ``last_poll_error`` and ``last_poll_error_at``. Indexes are created on the
    status and error-code columns.
    """
    table = "channels"

    poll_columns = (
        ("last_poll_status", sa.String(length=32)),
        ("last_poll_error_code", sa.String(length=64)),
        ("last_poll_error", sa.Text()),
        ("last_poll_error_at", sa.DateTime(timezone=True)),
    )
    for column_name, column_type in poll_columns:
        op.add_column(table, sa.Column(column_name, column_type, nullable=True))

    poll_indexes = (
        ("ix_channels_last_poll_status", "last_poll_status"),
        ("ix_channels_last_poll_error_code", "last_poll_error_code"),
    )
    for index_name, column_name in poll_indexes:
        op.create_index(index_name, table, [column_name])
def downgrade() -> None:
    """Revert revision 0013: drop the poll-status indexes and columns.

    Everything is removed in the reverse order of its creation in
    ``upgrade``.
    """
    table = "channels"

    for index_name in (
        "ix_channels_last_poll_error_code",
        "ix_channels_last_poll_status",
    ):
        op.drop_index(index_name, table_name=table)

    for column_name in (
        "last_poll_error_at",
        "last_poll_error",
        "last_poll_error_code",
        "last_poll_status",
    ):
        op.drop_column(table, column_name)

View File

@@ -1,14 +1,11 @@
package main package main
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io"
"log/slog" "log/slog"
"net/http"
"net/url" "net/url"
"os" "os"
"os/signal" "os/signal"
@@ -17,6 +14,9 @@ import (
"syscall" "syscall"
"time" "time"
"monitoring-tg/internal/aiservice"
"monitoring-tg/internal/dbretry"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
) )
@@ -33,18 +33,19 @@ type config struct {
PostgresPort int PostgresPort int
LLMEnabled bool LLMEnabled bool
LLMBaseURL string
LLMAPIKey string
LLMModel string LLMModel string
LLMTimeout time.Duration LLMTimeout time.Duration
LLMMaxTokens int LLMMaxTokens int
LLMMinTextLength int LLMMinTextLength int
ClassifyInterval time.Duration ClassifyInterval time.Duration
ClassifyBatchSize int ClassifyBatchSize int
AIServiceURL string
AIServiceToken string
} }
type pendingMessage struct { type pendingMessage struct {
ID int64 ID int64
SectionID int64
Text string Text string
Vertical string Vertical string
SectionSlug string SectionSlug string
@@ -52,29 +53,10 @@ type pendingMessage struct {
Extracted map[string]any Extracted map[string]any
} }
type chatRequest struct {
Model string `json:"model"`
Messages []chatMessage `json:"messages"`
Temperature float64 `json:"temperature"`
MaxTokens int `json:"max_tokens"`
ResponseFormat responseFmt `json:"response_format"`
}
type chatMessage struct {
Role string `json:"role"`
Content string `json:"content"`
}
type responseFmt struct { type responseFmt struct {
Type string `json:"type"` Type string `json:"type"`
} }
type chatResponse struct {
Choices []struct {
Message chatMessage `json:"message"`
} `json:"choices"`
}
func main() { func main() {
cfg := loadConfig() cfg := loadConfig()
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
@@ -88,14 +70,18 @@ func main() {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop() defer stop()
pool, err := pgxpool.New(ctx, cfg.databaseURL()) pool, err := dbretry.Connect(ctx, cfg.databaseURL(), 2*time.Minute)
if err != nil { if err != nil {
slog.Error("db_connect_failed", "error", err) slog.Error("db_connect_failed", "error", err)
os.Exit(1) os.Exit(1)
} }
defer pool.Close() defer pool.Close()
worker := &classifier{cfg: cfg, db: pool, http: &http.Client{Timeout: cfg.LLMTimeout}} worker := &classifier{
cfg: cfg,
db: pool,
ai: aiservice.New(cfg.AIServiceURL, cfg.AIServiceToken, cfg.LLMTimeout),
}
slog.Info( slog.Info(
"classifier_started", "classifier_started",
"interval", cfg.ClassifyInterval.String(), "interval", cfg.ClassifyInterval.String(),
@@ -107,11 +93,11 @@ func main() {
defer ticker.Stop() defer ticker.Stop()
for { for {
updated, err := worker.runOnce(ctx) updated, enqueued, err := worker.runOnce(ctx)
if err != nil { if err != nil {
slog.Error("classify_batch_failed", "error", err) slog.Error("classify_batch_failed", "error", err)
} else if updated > 0 { } else if updated > 0 || enqueued > 0 {
slog.Info("classify_batch_done", "updated", updated) slog.Info("classify_batch_done", "updated", updated, "enqueued", enqueued)
} }
select { select {
@@ -124,67 +110,116 @@ func main() {
} }
type classifier struct { type classifier struct {
cfg config cfg config
db *pgxpool.Pool db *pgxpool.Pool
http *http.Client ai *aiservice.Client
} }
func (c *classifier) runOnce(ctx context.Context) (int, error) { func (c *classifier) runOnce(ctx context.Context) (int, int, error) {
rows, err := c.loadPending(ctx) rows, err := c.loadPending(ctx)
if err != nil { if err != nil {
return 0, err return 0, 0, err
} }
if len(rows) == 0 { if len(rows) == 0 {
return 0, nil return 0, 0, nil
} }
byRef := make(map[string]pendingMessage, len(rows))
jobs := make([]aiservice.CreateJobRequest, 0, len(rows))
updated := 0 updated := 0
for _, msg := range rows { for _, msg := range rows {
key := verdictKey(msg.Vertical) key := verdictKey(msg.Vertical)
if _, ok := msg.Extracted[key]; ok { if _, ok := msg.Extracted[key]; ok {
continue continue
} }
if len(strings.TrimSpace(msg.Text)) < c.cfg.LLMMinTextLength {
verdict, err := c.classify(ctx, msg) verdict, err := marshalRaw(negativeVerdict(msg.Vertical))
if err != nil {
slog.Warn("llm_classify_failed", "message_id", msg.ID, "vertical", msg.Vertical, "error", err)
continue
}
if len(verdict) == 0 {
verdict, err = marshalRaw(negativeVerdict(msg.Vertical))
if err != nil { if err != nil {
slog.Warn("negative_verdict_failed", "message_id", msg.ID, "error", err) slog.Warn("negative_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "error", err)
continue continue
} }
} if err := c.saveVerdict(ctx, msg, key, verdict); err != nil {
slog.Warn("save_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "error", err)
if err := c.saveVerdict(ctx, msg.ID, key, verdict); err != nil { continue
slog.Warn("save_verdict_failed", "message_id", msg.ID, "error", err) }
updated++
continue continue
} }
updated++
req, err := c.buildJobRequest(ctx, msg)
if err != nil {
slog.Warn("build_classify_job_failed", "message_id", msg.ID, "section_id", msg.SectionID, "vertical", msg.Vertical, "error", err)
continue
}
byRef[req.OwnerRef] = msg
jobs = append(jobs, req)
} }
return updated, nil if len(jobs) == 0 {
return updated, 0, nil
}
created, err := c.ai.CreateJobs(ctx, aiservice.CreateJobsRequest{
OwnerService: "monitoring-tg",
TaskType: "telegram_classification",
ModelProfile: c.cfg.LLMModel,
Priority: 5,
MaxAttempts: 2,
Jobs: jobs,
})
if err != nil {
return updated, 0, err
}
for _, job := range created {
msg, ok := byRef[job.OwnerRef]
if !ok {
continue
}
switch job.Status {
case "done":
verdict, err := c.verdictFromJob(job, msg.Vertical)
if err != nil {
slog.Warn("parse_classify_job_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
continue
}
if err := c.saveVerdict(ctx, msg, verdictKey(msg.Vertical), verdict); err != nil {
slog.Warn("save_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
continue
}
updated++
case "failed", "cancelled":
verdict, err := marshalRaw(negativeVerdict(msg.Vertical))
if err != nil {
slog.Warn("negative_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
continue
}
if err := c.saveVerdict(ctx, msg, verdictKey(msg.Vertical), verdict); err != nil {
slog.Warn("save_failed_job_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
continue
}
updated++
slog.Warn("classify_job_failed_marked_negative", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "status", job.Status, "error", derefString(job.ErrorMessage))
}
}
return updated, len(created), nil
} }
func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error) { func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error) {
rows, err := c.db.Query(ctx, ` rows, err := c.db.Query(ctx, `
SELECT SELECT
m.id, m.id,
s.id,
m.text, m.text,
c.vertical, c.vertical,
s.slug, s.slug,
COALESCE(s.department_id, ''), COALESCE(s.department_id, ''),
COALESCE(m.extracted, '{}'::jsonb)::text COALESCE(mc.verdict, '{}'::jsonb)::text
FROM messages m FROM messages m
JOIN channels c ON c.id = m.channel_id JOIN channels src ON src.id = m.channel_id
JOIN channels c ON c.id = src.id OR c.source_channel_id = src.id
JOIN sections s ON s.id = c.section_id JOIN sections s ON s.id = c.section_id
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
WHERE m.text IS NOT NULL WHERE m.text IS NOT NULL
AND ( AND mc.id IS NULL
(c.vertical = 'hr' AND (m.extracted IS NULL OR m.extracted->'hr_lead' IS NULL))
OR
(c.vertical <> 'hr' AND (m.extracted IS NULL OR m.extracted->'lead' IS NULL))
)
ORDER BY m.id DESC ORDER BY m.id DESC
LIMIT $1 LIMIT $1
`, c.cfg.ClassifyBatchSize) `, c.cfg.ClassifyBatchSize)
@@ -197,7 +232,7 @@ func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error)
for rows.Next() { for rows.Next() {
var msg pendingMessage var msg pendingMessage
var extractedText string var extractedText string
if err := rows.Scan(&msg.ID, &msg.Text, &msg.Vertical, &msg.SectionSlug, &msg.DepartmentID, &extractedText); err != nil { if err := rows.Scan(&msg.ID, &msg.SectionID, &msg.Text, &msg.Vertical, &msg.SectionSlug, &msg.DepartmentID, &extractedText); err != nil {
return nil, err return nil, err
} }
if err := json.Unmarshal([]byte(extractedText), &msg.Extracted); err != nil { if err := json.Unmarshal([]byte(extractedText), &msg.Extracted); err != nil {
@@ -208,60 +243,56 @@ func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error)
return out, rows.Err() return out, rows.Err()
} }
func (c *classifier) classify(ctx context.Context, msg pendingMessage) (json.RawMessage, error) { func (c *classifier) buildJobRequest(ctx context.Context, msg pendingMessage) (aiservice.CreateJobRequest, error) {
if len(strings.TrimSpace(msg.Text)) < c.cfg.LLMMinTextLength {
return marshalRaw(negativeVerdict(msg.Vertical))
}
systemPrompt, err := c.resolvePrompt(ctx, msg.Vertical, msg.DepartmentID, msg.SectionSlug) systemPrompt, err := c.resolvePrompt(ctx, msg.Vertical, msg.DepartmentID, msg.SectionSlug)
if err != nil { if err != nil {
return nil, err return aiservice.CreateJobRequest{}, err
} }
systemPrompt = promptWithVerticalGuard(msg.Vertical, systemPrompt)
payload := chatRequest{ responseFormat, _ := json.Marshal(responseFmt{Type: "json_object"})
Model: c.cfg.LLMModel, payload := aiservice.ChatInput{
Messages: []chatMessage{ Messages: []aiservice.Message{
{Role: "system", Content: systemPrompt}, {Role: "system", Content: systemPrompt},
{Role: "user", Content: buildUserPrompt(msg.Text)}, {Role: "user", Content: buildUserPrompt(msg.Text)},
}, },
Temperature: 0.1, Temperature: 0.1,
MaxTokens: c.cfg.LLMMaxTokens, MaxTokens: c.cfg.LLMMaxTokens,
ResponseFormat: responseFmt{Type: "json_object"}, ResponseFormat: responseFormat,
} }
body, err := json.Marshal(payload) body, err := json.Marshal(payload)
if err != nil { if err != nil {
return aiservice.CreateJobRequest{}, err
}
ownerRef := classifyOwnerRef(msg)
return aiservice.CreateJobRequest{
OwnerService: "monitoring-tg",
OwnerRef: ownerRef,
TaskType: "telegram_classification",
ModelProfile: c.cfg.LLMModel,
Priority: 5,
MaxAttempts: 2,
Input: body,
// Classification is section-specific because prompts are section-specific.
IdempotencyKey: "monitoring-tg:telegram_classification:" + ownerRef,
}, nil
}
func (c *classifier) verdictFromJob(job *aiservice.Job, vertical string) (json.RawMessage, error) {
if job.Status != "done" {
msg := "ai-service job " + job.Status
if job.ErrorMessage != nil && *job.ErrorMessage != "" {
msg += ": " + *job.ErrorMessage
}
return nil, errors.New(msg)
}
var parsed aiservice.ChatResult
if err := json.Unmarshal(job.Result, &parsed); err != nil {
return nil, err return nil, err
} }
req, err := http.NewRequestWithContext(ctx, http.MethodPost, strings.TrimRight(c.cfg.LLMBaseURL, "/")+"/v1/chat/completions", bytes.NewReader(body)) raw := strings.TrimSpace(parsed.Content)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
if c.cfg.LLMAPIKey != "" {
req.Header.Set("Authorization", "Bearer "+c.cfg.LLMAPIKey)
}
resp, err := c.http.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
b, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
return nil, fmt.Errorf("llm http %d: %s", resp.StatusCode, strings.TrimSpace(string(b)))
}
var parsed chatResponse
if err := json.NewDecoder(resp.Body).Decode(&parsed); err != nil {
return nil, err
}
if len(parsed.Choices) == 0 {
return nil, errors.New("llm returned no choices")
}
raw := strings.TrimSpace(parsed.Choices[0].Message.Content)
if raw == "" { if raw == "" {
return nil, errors.New("llm returned empty content") return nil, errors.New("llm returned empty content")
} }
@@ -269,7 +300,7 @@ func (c *classifier) classify(ctx context.Context, msg pendingMessage) (json.Raw
if err != nil { if err != nil {
return nil, err return nil, err
} }
normalized, err := normalizeVerdict(msg.Vertical, block) normalized, err := normalizeVerdict(vertical, block)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -297,12 +328,28 @@ func (c *classifier) resolvePrompt(ctx context.Context, vertical, departmentID,
return defaultPrompt(vertical), nil return defaultPrompt(vertical), nil
} }
func (c *classifier) saveVerdict(ctx context.Context, id int64, key string, verdict json.RawMessage) error { func (c *classifier) saveVerdict(ctx context.Context, msg pendingMessage, key string, verdict json.RawMessage) error {
_, err := c.db.Exec(ctx, ` _, err := c.db.Exec(ctx, `
INSERT INTO message_classifications (message_id, section_id, vertical, verdict, updated_at)
VALUES ($1, $2, $3, $4::jsonb, now())
ON CONFLICT ON CONSTRAINT uq_message_classification_section DO UPDATE
SET vertical = EXCLUDED.vertical,
verdict = EXCLUDED.verdict,
updated_at = now()
`, msg.ID, msg.SectionID, msg.Vertical, string(verdict))
if err != nil {
return err
}
_, err = c.db.Exec(ctx, `
UPDATE messages UPDATE messages
SET extracted = jsonb_set(COALESCE(extracted, '{}'::jsonb), ARRAY[$2], $3::jsonb, true) SET extracted = jsonb_set(
CASE WHEN jsonb_typeof(extracted) = 'object' THEN extracted ELSE '{}'::jsonb END,
ARRAY[$2],
$3::jsonb,
true
)
WHERE id = $1 WHERE id = $1
`, id, key, string(verdict)) `, msg.ID, key, string(verdict))
return err return err
} }
@@ -320,6 +367,10 @@ func verdictKey(vertical string) string {
return "lead" return "lead"
} }
// classifyOwnerRef builds the "<message id>:<section id>" reference string
// for a pending message/section pair.
func classifyOwnerRef(msg pendingMessage) string {
	messageID, sectionID := msg.ID, msg.SectionID
	return fmt.Sprintf("%d:%d", messageID, sectionID)
}
func buildUserPrompt(text string) string { func buildUserPrompt(text string) string {
return "Текст сообщения:\n```\n" + text + "\n```\nВерни JSON." return "Текст сообщения:\n```\n" + text + "\n```\nВерни JSON."
} }
@@ -405,6 +456,13 @@ func asFloat(v any) (float64, bool) {
} }
} }
// derefString returns the string v points to, or "" when v is nil.
func derefString(v *string) string {
	if v != nil {
		return *v
	}
	return ""
}
func defaultPrompt(vertical string) string { func defaultPrompt(vertical string) string {
if vertical == verticalHR { if vertical == verticalHR {
return defaultHRPrompt return defaultHRPrompt
@@ -412,6 +470,16 @@ func defaultPrompt(vertical string) string {
return defaultREPrompt return defaultREPrompt
} }
// promptWithVerticalGuard appends realEstateOnlyGuard to prompt for every
// vertical except verticalHR. The prompt is returned untouched when the
// vertical is HR or when it already contains the guard text; otherwise the
// prompt is whitespace-trimmed and the guard is added after a blank line.
func promptWithVerticalGuard(vertical, prompt string) string {
	guardNeeded := vertical != verticalHR && !strings.Contains(prompt, realEstateOnlyGuard)
	if !guardNeeded {
		return prompt
	}
	trimmed := strings.TrimSpace(prompt)
	return trimmed + "\n\n" + realEstateOnlyGuard
}
func loadConfig() config { func loadConfig() config {
return config{ return config{
PostgresUser: env("POSTGRES_USER", "parser"), PostgresUser: env("POSTGRES_USER", "parser"),
@@ -420,14 +488,14 @@ func loadConfig() config {
PostgresHost: env("POSTGRES_HOST", "db"), PostgresHost: env("POSTGRES_HOST", "db"),
PostgresPort: envInt("POSTGRES_PORT", 5432), PostgresPort: envInt("POSTGRES_PORT", 5432),
LLMEnabled: envBool("LLM_ENABLED", true), LLMEnabled: envBool("LLM_ENABLED", true),
LLMBaseURL: env("LLM_BASE_URL", "http://10.2.3.5:8002"),
LLMAPIKey: env("LLM_API_KEY", ""),
LLMModel: env("LLM_MODEL", "qwen2.5-14b"), LLMModel: env("LLM_MODEL", "qwen2.5-14b"),
LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second, LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second,
LLMMaxTokens: envInt("LLM_MAX_TOKENS", 600), LLMMaxTokens: envInt("LLM_MAX_TOKENS", 600),
LLMMinTextLength: envInt("LLM_MIN_TEXT_LENGTH", 20), LLMMinTextLength: envInt("LLM_MIN_TEXT_LENGTH", 20),
ClassifyInterval: time.Duration(envInt("LLM_CLASSIFY_INTERVAL_SECONDS", 20)) * time.Second, ClassifyInterval: time.Duration(envInt("LLM_CLASSIFY_INTERVAL_SECONDS", 5)) * time.Second,
ClassifyBatchSize: envInt("LLM_CLASSIFY_BATCH_SIZE", 5), ClassifyBatchSize: envInt("LLM_CLASSIFY_BATCH_SIZE", 200),
AIServiceURL: env("AI_SERVICE_URL", ""),
AIServiceToken: env("AI_SERVICE_TOKEN", ""),
} }
} }
@@ -475,6 +543,8 @@ func envBool(key string, fallback bool) bool {
const defaultREPrompt = `Ты — аналитик объявлений о недвижимости. Тебе дают текст из Telegram-канала. const defaultREPrompt = `Ты — аналитик объявлений о недвижимости. Тебе дают текст из Telegram-канала.
Определи, является ли сообщение реальным объявлением о покупке, продаже или аренде недвижимости. Определи, является ли сообщение реальным объявлением о покупке, продаже или аренде недвижимости.
Учитывай только сделки с недвижимостью: квартиры, дома, апартаменты, участки, коммерческие помещения и другие объекты недвижимости.
Любые продажи или заявки по стройматериалам, мебели, бытовой/строительной технике, автомобилям, услугам, оборудованию и прочим товарам не являются лидами недвижимости — для них is_listing=false.
Отвечай строго валидным JSON без markdown: Отвечай строго валидным JSON без markdown:
{ {
"is_listing": boolean, "is_listing": boolean,
@@ -493,6 +563,9 @@ const defaultREPrompt = `Ты — аналитик объявлений о не
} }
summary всегда по-русски, confidence в диапазоне 0..1.` summary всегда по-русски, confidence в диапазоне 0..1.`
const realEstateOnlyGuard = `Жёсткое правило для недвижимости: учитывай только сделки с недвижимостью.
Если сообщение продаёт или покупает стройматериалы, мебель, бытовую/строительную технику, автомобили, услуги, оборудование или любые другие товары/работы не по недвижимости — это НЕ лид недвижимости, верни is_listing=false.`
const defaultHRPrompt = `Ты — аналитик HR-объявлений. Тебе дают текст из Telegram-канала. const defaultHRPrompt = `Ты — аналитик HR-объявлений. Тебе дают текст из Telegram-канала.
Определи, относится ли сообщение к рынку труда: вакансия, резюме или короткий HR-контакт. Определи, относится ли сообщение к рынку труда: вакансия, резюме или короткий HR-контакт.
Отвечай строго валидным JSON без markdown: Отвечай строго валидным JSON без markdown:

File diff suppressed because it is too large Load Diff

28
go.mod
View File

@@ -2,12 +2,34 @@ module monitoring-tg
go 1.25.7 go 1.25.7
require github.com/jackc/pgx/v5 v5.9.1 require (
gitea.estateliga.work/admin/portal-common v0.3.0
github.com/jackc/pgx/v5 v5.9.1
github.com/minio/minio-go/v7 v7.2.0
)
require ( require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect
golang.org/x/sync v0.17.0 // indirect github.com/klauspost/compress v1.18.6 // indirect
golang.org/x/text v0.29.0 // indirect github.com/klauspost/cpuid/v2 v2.2.11 // indirect
github.com/klauspost/crc32 v1.3.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/minio/crc64nvme v1.1.1 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/philhofer/fwd v1.2.0 // indirect
github.com/rs/xid v1.6.0 // indirect
github.com/tinylib/msgp v1.6.1 // indirect
github.com/zeebo/xxh3 v1.1.0 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.51.0 // indirect
golang.org/x/net v0.53.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.44.0 // indirect
golang.org/x/text v0.37.0 // indirect
gopkg.in/ini.v1 v1.67.2 // indirect
) )

64
go.sum
View File

@@ -1,6 +1,15 @@
gitea.estateliga.work/admin/portal-common v0.3.0 h1:xpr9UeLXk5pCcNXcTVGZzJZr0Ni7An7DV0OkuYv9qVM=
gitea.estateliga.work/admin/portal-common v0.3.0/go.mod h1:C860q6g38KVMsv+mKv6k1Vm7smVRCycl+N6r63TElnk=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
@@ -9,18 +18,65 @@ github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc=
github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/klauspost/compress v1.18.6 h1:2jupLlAwFm95+YDR+NwD2MEfFO9d4z4Prjl1XXDjuao=
github.com/klauspost/compress v1.18.6/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU=
github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/klauspost/crc32 v1.3.0 h1:sSmTt3gUt81RP655XGZPElI0PelVTZ6YwCRnPSupoFM=
github.com/klauspost/crc32 v1.3.0/go.mod h1:D7kQaZhnkX/Y0tstFGf8VUzv2UofNGqCjnC3zdHB0Hw=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/minio/crc64nvme v1.1.1 h1:8dwx/Pz49suywbO+auHCBpCtlW1OfpcLN7wYgVR6wAI=
github.com/minio/crc64nvme v1.1.1/go.mod h1:eVfm2fAzLlxMdUGc0EEBGSMmPwmXD5XiNRpnu9J3bvg=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
github.com/minio/minio-go/v7 v7.2.0 h1:RCJM0R1XOsRs+A3x3UCaf3ZYbByDaLjFeAi+YCQEPhs=
github.com/minio/minio-go/v7 v7.2.0/go.mod h1:EU9hENAStx/xXduNdrGO5e4X5vk19NtgB+RIPjZO8o0=
github.com/philhofer/fwd v1.2.0 h1:e6DnBTl7vGY+Gz322/ASL4Gyp1FspeMvx1RNDoToZuM=
github.com/philhofer/fwd v1.2.0/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU=
github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= github.com/tinylib/msgp v1.6.1 h1:ESRv8eL3u+DNHUoSAAQRE50Hm162zqAnBoGv9PzScPY=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= github.com/tinylib/msgp v1.6.1/go.mod h1:RSp0LW9oSxFut3KzESt5Voq4GVWyS+PSulT77roAqEA=
golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs=
github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI=
golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8=
golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA=
golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc=
golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/ini.v1 v1.67.2 h1:JtOSMb9OuaCZKr7h5D/h6iii14sK0hLbplTc6frx4Ss=
gopkg.in/ini.v1 v1.67.2/go.mod h1:x/cyOwCgZqOkJoDIJ3c1KNHMo10+nLGAhh+kn3Zizss=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -0,0 +1,308 @@
package aiservice
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
)
// Client is an HTTP client for the ai-service REST API.
// New returns a nil *Client when no base URL is configured, and the methods
// check for a nil receiver before issuing requests.
type Client struct {
// baseURL is the API root without trailing slashes; request() appends the endpoint path.
baseURL string
// token is optional; when non-empty it is sent as "Authorization: Bearer <token>".
token string
// http performs the requests; New sets its Timeout.
http *http.Client
}
// Message is one entry of ChatInput.Messages: a role plus its text content.
// NOTE(review): the set of accepted Role values is not visible in this file —
// confirm against the ai-service API.
type Message struct {
Role string `json:"role"`
Content string `json:"content"`
}
// ChatInput is the JSON shape of a chat request: the messages plus sampling
// options. MaxTokens and ResponseFormat are omitted from the JSON when empty.
// NOTE(review): presumably marshalled into CreateJobRequest.Input by callers —
// not shown in this file; confirm.
type ChatInput struct {
Messages []Message `json:"messages"`
Temperature float64 `json:"temperature"`
MaxTokens int `json:"max_tokens,omitempty"`
// ResponseFormat is passed through as raw JSON without interpretation here.
ResponseFormat json.RawMessage `json:"response_format,omitempty"`
}
// CreateJobRequest describes a single job to enqueue. CreateJob sends it as
// the JSON body of POST /api/v1/jobs; it is also the element type of
// CreateJobsRequest.Jobs.
type CreateJobRequest struct {
OwnerService string `json:"owner_service"`
OwnerRef string `json:"owner_ref"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
Priority int `json:"priority"`
MaxAttempts int `json:"max_attempts"`
// Input is the task payload, forwarded as raw JSON.
Input json.RawMessage `json:"input"`
// IdempotencyKey is omitted from the JSON when empty.
IdempotencyKey string `json:"idempotency_key,omitempty"`
}
// CreateJobsRequest is the JSON body CreateJobs sends to
// POST /api/v1/jobs/batch. Every field except Jobs is omitted when zero.
// NOTE(review): the top-level fields presumably act as defaults for entries in
// Jobs — that is decided by ai-service, not by this file; confirm.
type CreateJobsRequest struct {
OwnerService string `json:"owner_service,omitempty"`
TaskType string `json:"task_type,omitempty"`
ModelProfile string `json:"model_profile,omitempty"`
Priority int `json:"priority,omitempty"`
MaxAttempts int `json:"max_attempts,omitempty"`
Jobs []CreateJobRequest `json:"jobs"`
}
// Job is a job as decoded from ai-service responses (CreateJob, CreateJobs,
// GetJob).
type Job struct {
ID string `json:"id"`
OwnerService string `json:"owner_service,omitempty"`
OwnerRef string `json:"owner_ref,omitempty"`
TaskType string `json:"task_type,omitempty"`
ModelProfile string `json:"model_profile,omitempty"`
// Status: WaitJob treats "done", "failed" and "cancelled" as terminal.
Status string `json:"status"`
// Result is kept as raw JSON; decoding is left to the caller.
Result json.RawMessage `json:"result,omitempty"`
// The pointer fields below are nil when the key is absent or null in the response.
ErrorCode *string `json:"error_code,omitempty"`
ErrorMessage *string `json:"error_message,omitempty"`
IdempotencyKey *string `json:"idempotency_key,omitempty"`
}
// ChatResult is a JSON shape with the generated content, the model name and a
// duration in milliseconds.
// NOTE(review): presumably what Job.Result decodes into for chat jobs — no
// decoder is visible in this file; confirm.
type ChatResult struct {
Content string `json:"content"`
Model string `json:"model"`
DurationMS int64 `json:"duration_ms"`
}
// ProvidersStatus is the response body of GET /api/v1/providers/status as
// decoded by Client.ProvidersStatus: a timestamp and one entry per provider.
type ProvidersStatus struct {
At time.Time `json:"at"`
Providers []ProviderStatus `json:"providers"`
}
// ProviderStatus is one entry of ProvidersStatus.Providers.
// URL, Model, LatencyMS and Error are optional in the JSON.
// NOTE(review): the exact meaning of Configured vs OK is defined by
// ai-service — confirm there.
type ProviderStatus struct {
Name string `json:"name"`
Configured bool `json:"configured"`
OK bool `json:"ok"`
URL string `json:"url,omitempty"`
Model string `json:"model,omitempty"`
LatencyMS int64 `json:"latency_ms,omitempty"`
Error string `json:"error,omitempty"`
}
// Stats is the response body of GET /api/v1/stats as decoded by Client.Stats.
// Each slice is optional in the JSON and stays nil when absent.
type Stats struct {
At time.Time `json:"at"`
Owners []OwnerStat `json:"owners,omitempty"`
Errors []ErrorStat `json:"errors,omitempty"`
Backlog []BacklogStat `json:"backlog,omitempty"`
}
// OwnerStat is one entry of Stats.Owners: a total for a combination of owner
// service, task type, model profile and status.
// NOTE(review): presumably a job count per group — confirm with ai-service.
type OwnerStat struct {
OwnerService string `json:"owner_service"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
Status string `json:"status"`
Total int64 `json:"total"`
}
// ErrorStat is one entry of Stats.Errors: totals for an error code within a
// task type / model profile, optionally attributed to an owner service.
// NOTE(review): Last24h is presumably the subset of Total from the last
// 24 hours — confirm with ai-service.
type ErrorStat struct {
OwnerService string `json:"owner_service,omitempty"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
ErrorCode string `json:"error_code"`
Total int64 `json:"total"`
Last24h int64 `json:"last_24h"`
}
// BacklogStat is one entry of Stats.Backlog: queue counters and ages (in
// seconds, per the JSON field names) for an owner service / task type /
// model profile combination.
// NOTE(review): what qualifies a running job as "stale" is decided by
// ai-service — confirm there.
type BacklogStat struct {
OwnerService string `json:"owner_service"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
Pending int64 `json:"pending"`
Running int64 `json:"running"`
StaleRunning int64 `json:"stale_running"`
OldestPendingAgeSeconds int64 `json:"oldest_pending_age_seconds"`
OldestRunningAgeSeconds int64 `json:"oldest_running_age_seconds"`
}
// New builds a Client for the ai-service rooted at baseURL.
//
// baseURL is stripped of surrounding whitespace and trailing slashes; if
// nothing remains, New returns nil (the Client methods check for a nil
// receiver). token is trimmed of surrounding whitespace. A non-positive
// timeout falls back to two minutes.
func New(baseURL, token string, timeout time.Duration) *Client {
	root := strings.TrimRight(strings.TrimSpace(baseURL), "/")
	if root == "" {
		return nil
	}

	httpClient := &http.Client{Timeout: 2 * time.Minute}
	if timeout > 0 {
		httpClient.Timeout = timeout
	}

	return &Client{
		baseURL: root,
		token:   strings.TrimSpace(token),
		http:    httpClient,
	}
}
// CreateJob enqueues a single job via POST /api/v1/jobs and returns the job
// decoded from the response body.
//
// It returns an error when the client is nil, when the request cannot be
// marshalled or sent, when the response status is outside 2xx (the error
// includes up to 1 KiB of the response body), or when the body is not a
// valid Job JSON document.
func (c *Client) CreateJob(ctx context.Context, req CreateJobRequest) (*Job, error) {
	if c == nil {
		return nil, fmt.Errorf("ai-service not configured")
	}

	payload, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("marshal ai job: %w", err)
	}

	outbound, err := c.request(ctx, http.MethodPost, "/api/v1/jobs", payload)
	if err != nil {
		return nil, err
	}

	resp, err := c.http.Do(outbound)
	if err != nil {
		return nil, fmt.Errorf("create ai job: %w", err)
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code < http.StatusOK || code >= http.StatusMultipleChoices {
		return nil, fmt.Errorf("create ai job: http %d: %s", code, readSmall(resp.Body))
	}

	job := new(Job)
	if err := json.NewDecoder(resp.Body).Decode(job); err != nil {
		return nil, fmt.Errorf("decode ai job: %w", err)
	}
	return job, nil
}
// CreateJobs enqueues a batch of jobs via POST /api/v1/jobs/batch and returns
// the "jobs" array of the response.
//
// It returns an error when the client is nil, when the request cannot be
// marshalled or sent, when the response status is outside 2xx (the error
// includes up to 1 KiB of the response body), or when the body cannot be
// decoded.
func (c *Client) CreateJobs(ctx context.Context, req CreateJobsRequest) ([]*Job, error) {
	if c == nil {
		return nil, fmt.Errorf("ai-service not configured")
	}

	payload, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("marshal ai jobs: %w", err)
	}

	outbound, err := c.request(ctx, http.MethodPost, "/api/v1/jobs/batch", payload)
	if err != nil {
		return nil, err
	}

	resp, err := c.http.Do(outbound)
	if err != nil {
		return nil, fmt.Errorf("create ai jobs: %w", err)
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code < http.StatusOK || code >= http.StatusMultipleChoices {
		return nil, fmt.Errorf("create ai jobs: http %d: %s", code, readSmall(resp.Body))
	}

	// The batch endpoint wraps the created jobs in an envelope object.
	type batchEnvelope struct {
		Jobs []*Job `json:"jobs"`
	}
	var envelope batchEnvelope
	if err := json.NewDecoder(resp.Body).Decode(&envelope); err != nil {
		return nil, fmt.Errorf("decode ai jobs: %w", err)
	}
	return envelope.Jobs, nil
}
// GetJob fetches a job by id via GET /api/v1/jobs/{id}.
//
// A nil client is reported as "ai-service not configured", matching the other
// Client methods, so that a missing configuration is not mistaken for a
// missing id. The id is trimmed of surrounding whitespace and the trimmed
// value is the one placed in the URL, so validation and the request agree.
//
// It returns an error when the client is nil, when the id is blank, when the
// request cannot be built or sent, when the response status is outside 2xx
// (the error includes up to 1 KiB of the response body), or when the body is
// not a valid Job JSON document.
func (c *Client) GetJob(ctx context.Context, id string) (*Job, error) {
	if c == nil {
		return nil, fmt.Errorf("ai-service not configured")
	}
	id = strings.TrimSpace(id)
	if id == "" {
		return nil, fmt.Errorf("ai job id is required")
	}
	req, err := c.request(ctx, http.MethodGet, "/api/v1/jobs/"+id, nil)
	if err != nil {
		return nil, err
	}
	resp, err := c.http.Do(req)
	if err != nil {
		return nil, fmt.Errorf("get ai job: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return nil, fmt.Errorf("get ai job: http %d: %s", resp.StatusCode, readSmall(resp.Body))
	}
	var out Job
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, fmt.Errorf("decode ai job: %w", err)
	}
	return &out, nil
}
// WaitJob polls GetJob until the job reaches a terminal status ("done",
// "failed" or "cancelled") and returns that job.
//
// The job is fetched immediately and then once per pollInterval tick; a
// non-positive pollInterval falls back to two seconds. Any GetJob error ends
// the wait and is returned as-is. If ctx is done while waiting for the next
// tick, ctx.Err() is returned.
func (c *Client) WaitJob(ctx context.Context, id string, pollInterval time.Duration) (*Job, error) {
	interval := pollInterval
	if interval <= 0 {
		interval = 2 * time.Second
	}
	tick := time.NewTicker(interval)
	defer tick.Stop()

	for {
		current, err := c.GetJob(ctx, id)
		if err != nil {
			return nil, err
		}
		if s := current.Status; s == "done" || s == "failed" || s == "cancelled" {
			return current, nil
		}

		select {
		case <-tick.C:
			// Next poll.
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}
// ProvidersStatus fetches GET /api/v1/providers/status and decodes the
// response.
//
// It returns an error when the client is nil, when the request cannot be
// built or sent, when the response status is outside 2xx (the error includes
// up to 1 KiB of the response body), or when the body cannot be decoded.
func (c *Client) ProvidersStatus(ctx context.Context) (*ProvidersStatus, error) {
	if c == nil {
		return nil, fmt.Errorf("ai-service not configured")
	}

	outbound, err := c.request(ctx, http.MethodGet, "/api/v1/providers/status", nil)
	if err != nil {
		return nil, err
	}

	resp, err := c.http.Do(outbound)
	if err != nil {
		return nil, fmt.Errorf("ai providers status: %w", err)
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code < http.StatusOK || code >= http.StatusMultipleChoices {
		return nil, fmt.Errorf("ai providers status: http %d: %s", code, readSmall(resp.Body))
	}

	status := new(ProvidersStatus)
	if err := json.NewDecoder(resp.Body).Decode(status); err != nil {
		return nil, fmt.Errorf("decode ai providers status: %w", err)
	}
	return status, nil
}
// Stats fetches GET /api/v1/stats and decodes the response.
//
// It returns an error when the client is nil, when the request cannot be
// built or sent, when the response status is outside 2xx (the error includes
// up to 1 KiB of the response body), or when the body cannot be decoded.
func (c *Client) Stats(ctx context.Context) (*Stats, error) {
	if c == nil {
		return nil, fmt.Errorf("ai-service not configured")
	}

	outbound, err := c.request(ctx, http.MethodGet, "/api/v1/stats", nil)
	if err != nil {
		return nil, err
	}

	resp, err := c.http.Do(outbound)
	if err != nil {
		return nil, fmt.Errorf("ai stats: %w", err)
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code < http.StatusOK || code >= http.StatusMultipleChoices {
		return nil, fmt.Errorf("ai stats: http %d: %s", code, readSmall(resp.Body))
	}

	stats := new(Stats)
	if err := json.NewDecoder(resp.Body).Decode(stats); err != nil {
		return nil, fmt.Errorf("decode ai stats: %w", err)
	}
	return stats, nil
}
// request builds an *http.Request for path relative to the client's base URL.
//
// A non-nil body is sent as-is with Content-Type application/json; a nil body
// produces a request without a body. The Authorization bearer header is added
// only when a token is configured.
func (c *Client) request(ctx context.Context, method, path string, body []byte) (*http.Request, error) {
	hasBody := body != nil

	// Keep payload an untyped nil interface when there is no body, so that
	// NewRequestWithContext sees "no body" rather than an empty reader.
	var payload io.Reader
	if hasBody {
		payload = bytes.NewReader(body)
	}

	out, err := http.NewRequestWithContext(ctx, method, c.baseURL+path, payload)
	if err != nil {
		return nil, err
	}

	if hasBody {
		out.Header.Set("Content-Type", "application/json")
	}
	if c.token != "" {
		out.Header.Set("Authorization", "Bearer "+c.token)
	}
	return out, nil
}
func readSmall(r io.Reader) string {
body, err := io.ReadAll(io.LimitReader(r, 1024))
if err != nil {
return err.Error()
}
return strings.TrimSpace(string(body))
}

View File

@@ -0,0 +1,44 @@
package dbretry
import (
"context"
"fmt"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
// Connect creates a pgx connection pool for databaseURL and verifies it with
// a ping, retrying until it succeeds, maxWait has elapsed, or ctx is done.
//
// Between attempts it waits attempt seconds (1s, 2s, ...) capped at 5s. The
// wait is also capped at the time remaining until the deadline, so the total
// time spent sleeping never pushes the call past maxWait; one final attempt
// is made when the deadline is reached. With a non-positive maxWait exactly
// one attempt is made.
//
// On success the caller owns the returned pool and must Close it. On failure
// the error wraps the last connect or ping error; on cancellation it wraps
// ctx.Err(). A pool whose ping failed is closed before the next attempt.
func Connect(ctx context.Context, databaseURL string, maxWait time.Duration) (*pgxpool.Pool, error) {
	const maxBackoff = 5 * time.Second

	deadline := time.Now().Add(maxWait)
	for attempt := 1; ; attempt++ {
		pool, err := pgxpool.New(ctx, databaseURL)
		if err != nil {
			err = fmt.Errorf("connect postgres: %w", err)
		} else if pingErr := pool.Ping(ctx); pingErr != nil {
			// Do not leak the half-working pool across retries.
			pool.Close()
			err = fmt.Errorf("ping postgres: %w", pingErr)
		} else {
			return pool, nil
		}

		remaining := time.Until(deadline)
		if remaining <= 0 {
			return nil, fmt.Errorf("connect postgres after retry: %w", err)
		}

		backoff := time.Duration(attempt) * time.Second
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
		// Never sleep past the deadline: the caller asked for at most maxWait.
		if backoff > remaining {
			backoff = remaining
		}

		timer := time.NewTimer(backoff)
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil, fmt.Errorf("connect postgres cancelled: %w", ctx.Err())
		case <-timer.C:
		}
	}
}

View File

@@ -14,10 +14,15 @@ data:
POSTGRES_DB: "parser" POSTGRES_DB: "parser"
TG_SESSION_PATH: "/data/session/parser.session" TG_SESSION_PATH: "/data/session/parser.session"
MEDIA_DIR: "/data/media" MEDIA_DIR: "/data/media"
MINIO_ENDPOINT: "s3-minio.estateliga.work"
MINIO_BUCKET: "monitoring-tg-media"
MINIO_USE_SSL: "1"
MINIO_INSECURE_SKIP_VERIFY: "0"
MINIO_REGION: "us-east-1"
POLL_INTERVAL_SECONDS: "60" POLL_INTERVAL_SECONDS: "60"
POLL_HISTORY_LIMIT: "50" POLL_HISTORY_LIMIT: "50"
LLM_ENABLED: "1" LLM_ENABLED: "1"
LLM_BASE_URL: "http://10.2.3.5:8002"
LLM_MODEL: "qwen2.5-14b" LLM_MODEL: "qwen2.5-14b"
LLM_MAX_TOKENS: "600" LLM_MAX_TOKENS: "600"
LLM_CLASSIFIER_OWNER: "go" LLM_CLASSIFIER_OWNER: "go"
AI_SERVICE_URL: "http://ai-service.ai-service.svc.cluster.local:8080"

View File

@@ -10,7 +10,10 @@ stringData:
TG_PHONE: "+971524994695" TG_PHONE: "+971524994695"
TG_SESSION_STRING: "" TG_SESSION_STRING: ""
POSTGRES_PASSWORD: "parser" POSTGRES_PASSWORD: "parser"
LLM_API_KEY: "sk-111f838ccec43406e078cd9094b6797307cb895236179f32" INTERNAL_API_KEY: "36fe89ed40c01fdc54d3cf4e3fcacc8751dc456a4a1acd394e9fed48257c5734"
AI_SERVICE_TOKEN: "d18bcacf9e02bae1806ee6b6eeda62b95be6a915c0a22936d9a700128b275442"
MINIO_ACCESS_KEY: "admjn"
MINIO_SECRET_KEY: "TropicalMacaw9Fantasize"
--- ---
apiVersion: v1 apiVersion: v1
kind: Secret kind: Secret

View File

@@ -27,6 +27,10 @@ spec:
labels: labels:
app: monitoring-tg-server app: monitoring-tg-server
spec: spec:
hostAliases:
- ip: "77.105.173.42"
hostnames:
- "s3-minio.estateliga.work"
terminationGracePeriodSeconds: 20 terminationGracePeriodSeconds: 20
securityContext: securityContext:
fsGroup: 1000 fsGroup: 1000

View File

@@ -14,6 +14,7 @@ dependencies = [
"pydantic>=2.9", "pydantic>=2.9",
"pydantic-settings>=2.6", "pydantic-settings>=2.6",
"structlog>=24.4", "structlog>=24.4",
"minio>=7.2",
] ]
[project.optional-dependencies] [project.optional-dependencies]

View File

@@ -13,6 +13,19 @@ def portal_department_id(request: Request) -> str | None:
return value or None return value or None
def portal_department_ids(request: Request) -> list[str]:
    """Return the department ids carried by the request, without duplicates.

    Ids are read from the comma-separated ``x-user-department-ids`` header
    (blank entries are ignored, surrounding whitespace is stripped), followed
    by the id returned by ``portal_department_id`` when it is set and not
    already listed. First-seen order is preserved.
    """
    header = (request.headers.get("x-user-department-ids") or "").strip()
    candidates = [piece.strip() for piece in header.split(",")]
    candidates.append(portal_department_id(request) or "")
    # dict.fromkeys drops repeats while keeping the first occurrence's position.
    return list(dict.fromkeys(item for item in candidates if item))
def is_department_head_request(request: Request) -> bool: def is_department_head_request(request: Request) -> bool:
return request.headers.get("x-user-is-department-head") == "1" return request.headers.get("x-user-is-department-head") == "1"

View File

@@ -7,14 +7,14 @@ from sqlalchemy.ext.asyncio import AsyncSession
from parser_bot.access import ( from parser_bot.access import (
is_admin_request, is_admin_request,
portal_department_id, portal_department_ids,
require_department_manager, require_department_manager,
require_telegram_auth_manager, require_telegram_auth_manager,
) )
from parser_bot.config import settings from parser_bot.config import settings
from parser_bot.db.models import Channel, Section from parser_bot.db.models import Channel, Section
from parser_bot.db.session import get_session from parser_bot.db.session import get_session
from parser_bot.scheduler.poller import backfill_media, poll_channel from parser_bot.scheduler.poller import PollError, backfill_media, poll_channel
from parser_bot.telegram import client as tg from parser_bot.telegram import client as tg
router = APIRouter() router = APIRouter()
@@ -39,13 +39,13 @@ class AuthCodeResult(BaseModel):
needs_password: bool needs_password: bool
def _department_scope(request: Request) -> str | None: def _department_scopes(request: Request) -> list[str] | None:
if is_admin_request(request): if is_admin_request(request):
return None return None
dept_id = portal_department_id(request) dept_ids = portal_department_ids(request)
if not dept_id: if not dept_ids:
raise HTTPException(status_code=403, detail="department is required") raise HTTPException(status_code=403, detail="department is required")
return dept_id return dept_ids
async def _require_channel_scope( async def _require_channel_scope(
@@ -55,7 +55,7 @@ async def _require_channel_scope(
vertical: str | None, vertical: str | None,
section: str | None, section: str | None,
) -> None: ) -> None:
department_id = _department_scope(request) department_ids = _department_scopes(request)
stmt = ( stmt = (
select(Channel.id) select(Channel.id)
.join(Section, Section.id == Channel.section_id) .join(Section, Section.id == Channel.section_id)
@@ -65,13 +65,25 @@ async def _require_channel_scope(
stmt = stmt.where(Channel.vertical == vertical) stmt = stmt.where(Channel.vertical == vertical)
if section: if section:
stmt = stmt.where(Section.slug == section) stmt = stmt.where(Section.slug == section)
if department_id is not None: if department_ids is not None:
stmt = stmt.where(Section.department_id == department_id) stmt = stmt.where(Section.department_id.in_(department_ids))
exists = (await session.execute(stmt)).scalar_one_or_none() exists = (await session.execute(stmt)).scalar_one_or_none()
if exists is None: if exists is None:
raise HTTPException(status_code=404) raise HTTPException(status_code=404)
def _poll_skipped_result(exc: PollError) -> dict[str, Any]:
result: dict[str, Any] = {
"inserted": 0,
"status": "skipped",
"code": exc.code,
"message": exc.message,
}
if exc.retry_after is not None:
result["retry_after"] = exc.retry_after
return result
@router.get("/auth/status", response_model=AuthStatus, dependencies=[Depends(require_telegram_auth_manager)]) @router.get("/auth/status", response_model=AuthStatus, dependencies=[Depends(require_telegram_auth_manager)])
async def auth_status() -> AuthStatus: async def auth_status() -> AuthStatus:
authorized = await tg.is_authorized() authorized = await tg.is_authorized()
@@ -126,10 +138,13 @@ async def trigger_poll(
vertical: str | None = Query(None), vertical: str | None = Query(None),
section: str | None = Query(None), section: str | None = Query(None),
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
) -> dict[str, int]: ) -> dict[str, Any]:
await _require_channel_scope(session, request, channel_id, vertical, section) await _require_channel_scope(session, request, channel_id, vertical, section)
inserted = await poll_channel(channel_id) try:
return {"inserted": inserted} inserted = await poll_channel(channel_id)
except PollError as exc:
return _poll_skipped_result(exc)
return {"inserted": inserted, "status": "ok"}
@router.post( @router.post(
@@ -167,7 +182,7 @@ async def trigger_poll_all(
section: str | None = Query(None), section: str | None = Query(None),
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
) -> dict[str, Any]: ) -> dict[str, Any]:
department_id = _department_scope(request) department_ids = _department_scopes(request)
stmt = ( stmt = (
select(Channel.id) select(Channel.id)
.join(Section, Section.id == Channel.section_id) .join(Section, Section.id == Channel.section_id)
@@ -175,8 +190,8 @@ async def trigger_poll_all(
) )
if section: if section:
stmt = stmt.where(Section.slug == section) stmt = stmt.where(Section.slug == section)
if department_id is not None: if department_ids is not None:
stmt = stmt.where(Section.department_id == department_id) stmt = stmt.where(Section.department_id.in_(department_ids))
result = await session.execute(stmt) result = await session.execute(stmt)
ids = [row[0] for row in result.all()] ids = [row[0] for row in result.all()]
background.add_task(_poll_all_in_background, ids) background.add_task(_poll_all_in_background, ids)

View File

@@ -28,6 +28,12 @@ class Settings(BaseSettings):
media_dir: str = Field("/data/media", alias="MEDIA_DIR") media_dir: str = Field("/data/media", alias="MEDIA_DIR")
media_max_bytes: int = Field(20 * 1024 * 1024, alias="MEDIA_MAX_BYTES") media_max_bytes: int = Field(20 * 1024 * 1024, alias="MEDIA_MAX_BYTES")
minio_endpoint: str = Field("", alias="MINIO_ENDPOINT")
minio_access_key: str = Field("", alias="MINIO_ACCESS_KEY")
minio_secret_key: str = Field("", alias="MINIO_SECRET_KEY")
minio_bucket: str = Field("monitoring-tg-media", alias="MINIO_BUCKET")
minio_use_ssl: bool = Field(True, alias="MINIO_USE_SSL")
minio_region: str = Field("us-east-1", alias="MINIO_REGION")
@property @property
def database_url(self) -> str: def database_url(self) -> str:

View File

@@ -54,12 +54,16 @@ class Section(Base):
class Channel(Base): class Channel(Base):
__tablename__ = "channels" __tablename__ = "channels"
__table_args__ = (
UniqueConstraint("section_id", "identifier", name="uq_channels_section_identifier"),
Index("ix_channels_source_channel_id", "source_channel_id"),
)
id: Mapped[int] = mapped_column(primary_key=True) id: Mapped[int] = mapped_column(primary_key=True)
# Telegram numeric channel id (peer id), nullable until first resolve # Telegram numeric channel id (peer id), nullable until first resolve
tg_id: Mapped[int | None] = mapped_column(BigInteger, unique=True, nullable=True) tg_id: Mapped[int | None] = mapped_column(BigInteger, unique=True, nullable=True)
# Username or t.me/joinchat link supplied by user # Username or t.me/joinchat link supplied by user
identifier: Mapped[str] = mapped_column(String(255), unique=True) identifier: Mapped[str] = mapped_column(String(255))
title: Mapped[str | None] = mapped_column(String(512), nullable=True) title: Mapped[str | None] = mapped_column(String(512), nullable=True)
# 'real_estate' or 'hr' — picks which LLM prompt and lead schema is used # 'real_estate' or 'hr' — picks which LLM prompt and lead schema is used
vertical: Mapped[str] = mapped_column( vertical: Mapped[str] = mapped_column(
@@ -68,9 +72,16 @@ class Channel(Base):
section_id: Mapped[int] = mapped_column( section_id: Mapped[int] = mapped_column(
ForeignKey("sections.id", ondelete="RESTRICT"), index=True ForeignKey("sections.id", ondelete="RESTRICT"), index=True
) )
source_channel_id: Mapped[int | None] = mapped_column(
ForeignKey("channels.id", ondelete="SET NULL"), nullable=True
)
is_active: Mapped[bool] = mapped_column(default=True, server_default="true") is_active: Mapped[bool] = mapped_column(default=True, server_default="true")
last_message_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True) last_message_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
last_polled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) last_polled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
last_poll_status: Mapped[str | None] = mapped_column(String(32), nullable=True)
last_poll_error_code: Mapped[str | None] = mapped_column(String(64), nullable=True)
last_poll_error: Mapped[str | None] = mapped_column(Text, nullable=True)
last_poll_error_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
created_at: Mapped[datetime] = mapped_column( created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now() DateTime(timezone=True), server_default=func.now()
) )
@@ -112,6 +123,24 @@ class Message(Base):
channel: Mapped[Channel] = relationship(back_populates="messages") channel: Mapped[Channel] = relationship(back_populates="messages")
class MessageClassification(Base):
    """Classification verdict stored for a message within one section.

    ``uq_message_classification_section`` allows at most one row per
    (message_id, section_id) pair. Rows are deleted together with their
    message or section (``ondelete="CASCADE"`` on both foreign keys).
    """

    __tablename__ = "message_classifications"
    __table_args__ = (
        UniqueConstraint("message_id", "section_id", name="uq_message_classification_section"),
        Index("ix_message_classifications_message", "message_id"),
        Index("ix_message_classifications_section", "section_id"),
    )
    id: Mapped[int] = mapped_column(primary_key=True)
    message_id: Mapped[int] = mapped_column(ForeignKey("messages.id", ondelete="CASCADE"))
    section_id: Mapped[int] = mapped_column(ForeignKey("sections.id", ondelete="CASCADE"))
    # NOTE(review): presumably mirrors Channel.vertical ('real_estate' / 'hr') — confirm.
    vertical: Mapped[str] = mapped_column(String(32))
    # Stored as JSONB; its schema is not defined in this model.
    verdict: Mapped[dict] = mapped_column(JSONB)
    # NOTE(review): only an insert-time server default is declared; no onupdate
    # is visible, so writers presumably set updated_at themselves — confirm.
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
class AppSetting(Base): class AppSetting(Base):
"""Runtime-editable settings, edited from the UI without a restart.""" """Runtime-editable settings, edited from the UI without a restart."""

View File

@@ -2,8 +2,9 @@ from datetime import datetime, timezone
import structlog import structlog
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sqlalchemy import func, select from sqlalchemy import func, or_, select
from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.exc import IntegrityError
from parser_bot.config import settings from parser_bot.config import settings
from parser_bot.db.models import Channel, Message from parser_bot.db.models import Channel, Message
@@ -19,24 +20,152 @@ from parser_bot.telegram.client import (
log = structlog.get_logger() log = structlog.get_logger()
class PollError(RuntimeError):
status_code = 400
code = "poll_failed"
retry_after: int | None = None
def __init__(self, message: str) -> None:
super().__init__(message)
self.message = message
class PollUnauthorizedError(PollError):
    """Raised when the Telegram session is not authorized to poll."""

    code = "telegram_not_authorized"
    status_code = 401
class PollChannelUnavailableError(PollError):
    """Raised when the target channel cannot be resolved or read."""

    code = "telegram_channel_unavailable"
    status_code = 422
class PollFloodWaitError(PollError):
    """Raised when Telegram asks the client to wait before retrying.

    ``retry_after`` carries the number of seconds to wait, or ``None`` when
    the duration is unknown.
    """

    code = "telegram_flood_wait"
    status_code = 429

    def __init__(self, seconds: int | None) -> None:
        self.retry_after = seconds
        # A missing or zero duration is reported as an unspecified wait.
        if seconds:
            wait = f"{seconds} sec"
        else:
            wait = "a while"
        super().__init__(f"Telegram asked to wait {wait} before retrying")
class PollDuplicateChannelError(PollError):
    """Raised when the resolved Telegram channel is already linked elsewhere."""

    code = "telegram_channel_duplicate"
    status_code = 409
def _translate_telegram_error(exc: Exception, identifier: str) -> PollError:
    """Map an exception raised by the Telegram client onto a ``PollError``.

    Classification is done by exception class name and by well-known
    fragments of the error text, so no Telegram error classes need to be
    imported here.

    Args:
        exc: The exception raised while resolving or fetching the channel.
        identifier: The channel identifier being polled; used only in the
            message of the generic fallback error.

    Returns:
        ``PollFloodWaitError`` for flood-wait errors,
        ``PollChannelUnavailableError`` when the channel cannot be resolved
        or accessed, otherwise a plain ``PollError``.
    """
    name = type(exc).__name__
    # Exceptions raised without arguments (e.g. a bare ``TimeoutError()``)
    # stringify to ""; fall back to the class name so the stored and displayed
    # error text is never blank.
    message = str(exc) or name
    lower = message.lower()

    if name == "FloodWaitError":
        # The wait duration is optional; PollFloodWaitError accepts None.
        return PollFloodWaitError(getattr(exc, "seconds", None))

    if name in {
        "ChannelPrivateError",
        "UsernameInvalidError",
        "UsernameNotOccupiedError",
        "InviteHashExpiredError",
        "InviteHashInvalidError",
        "ChatAdminRequiredError",
    }:
        return PollChannelUnavailableError(message)

    # Errors that carry no dedicated class are recognised by their text.
    unavailable_hints = (
        "cannot get entity",
        "cannot find any entity",
        "not part of",
        "join the group",
        "invalid channel",
    )
    if any(hint in lower for hint in unavailable_hints):
        return PollChannelUnavailableError(message)

    return PollError(f"Cannot poll {identifier}: {message}")
async def poll_channel(channel_id: int) -> int:
    """Poll one channel for new messages. Returns count of inserted rows.

    Any failure is recorded on the channel row and then re-raised unchanged,
    so callers can still distinguish ``PollError`` subclasses from unexpected
    exceptions.

    Raises:
        PollError: For classified polling failures.
        Exception: Any other error raised while polling.
    """
    try:
        return await _poll_channel(channel_id)
    except PollError as exc:
        await _try_record_poll_error(channel_id, exc.code, exc.message)
        raise
    except Exception as exc:
        # str() of an exception raised without arguments is empty; use the
        # class name so the recorded error is still meaningful.
        await _try_record_poll_error(
            channel_id, "poll_failed", str(exc) or type(exc).__name__
        )
        raise


async def _try_record_poll_error(channel_id: int, code: str, message: str) -> None:
    """Record a poll failure without letting a bookkeeping failure escape.

    If persisting the error fails, the problem is logged and swallowed so the
    original polling exception, not the storage error, reaches the caller.
    """
    try:
        await _record_poll_error(channel_id, code, message)
    except Exception as record_exc:
        log.warning(
            "poll_error_record_failed",
            channel_id=channel_id,
            code=code,
            error=str(record_exc),
        )
async def _poll_channel(channel_id: int) -> int:
if not await is_authorized():
raise PollUnauthorizedError(
"Telegram is not authorized: open Monitoring TG in Portal and authorize it"
)
async with session_scope() as session: async with session_scope() as session:
channel = await session.get(Channel, channel_id) channel = await session.get(Channel, channel_id)
if channel is None or not channel.is_active: if channel is None or not channel.is_active:
return 0 return 0
if channel.source_channel_id is not None:
source = await session.get(Channel, channel.source_channel_id)
if source is not None:
channel.tg_id = None
channel.title = channel.title or source.title
channel.last_message_id = source.last_message_id
channel.last_polled_at = source.last_polled_at
channel.last_poll_status = "alias"
channel.last_poll_error_code = None
channel.last_poll_error = None
channel.last_poll_error_at = None
return 0
if channel.tg_id is None or channel.title is None: if channel.tg_id is None or channel.title is None:
resolved = await resolve_channel(channel.identifier) try:
resolved = await resolve_channel(channel.identifier)
except Exception as exc:
raise _translate_telegram_error(exc, channel.identifier) from exc
duplicate_id = (
await session.execute(
select(Channel.id).where(
Channel.tg_id == resolved.tg_id,
Channel.id != channel.id,
)
)
).scalar_one_or_none()
if duplicate_id is not None:
source = await session.get(Channel, duplicate_id)
channel.source_channel_id = duplicate_id
channel.tg_id = None
channel.title = channel.title or resolved.title or (source.title if source else None)
channel.last_message_id = source.last_message_id if source else channel.last_message_id
channel.last_polled_at = source.last_polled_at if source else channel.last_polled_at
channel.last_poll_status = "alias"
channel.last_poll_error_code = None
channel.last_poll_error = None
channel.last_poll_error_at = None
log.info(
"linked_channel_alias",
channel_id=channel.id,
source_channel_id=duplicate_id,
tg_id=resolved.tg_id,
)
return 0
channel.tg_id = resolved.tg_id channel.tg_id = resolved.tg_id
channel.title = resolved.title channel.title = resolved.title
try:
await session.flush()
except IntegrityError as exc:
raise PollDuplicateChannelError(
"Telegram channel is already connected to another channel"
) from exc
msgs = await fetch_new_messages( try:
channel.identifier, msgs = await fetch_new_messages(
min_id=channel.last_message_id, channel.identifier,
limit=settings.poll_history_limit, min_id=channel.last_message_id,
download_media_for_channel_id=channel.id, limit=settings.poll_history_limit,
) download_media_for_channel_id=channel.id,
)
except Exception as exc:
raise _translate_telegram_error(exc, channel.identifier) from exc
inserted = 0 inserted = 0
for m in msgs: for m in msgs:
@@ -70,6 +199,10 @@ async def poll_channel(channel_id: int) -> int:
channel.last_message_id or 0, msgs[-1].tg_message_id channel.last_message_id or 0, msgs[-1].tg_message_id
) )
channel.last_polled_at = datetime.now(timezone.utc) channel.last_polled_at = datetime.now(timezone.utc)
channel.last_poll_status = "ok"
channel.last_poll_error_code = None
channel.last_poll_error = None
channel.last_poll_error_at = None
log.info( log.info(
"polled_channel", "polled_channel",
@@ -81,6 +214,17 @@ async def poll_channel(channel_id: int) -> int:
return inserted return inserted
async def _record_poll_error(channel_id: int, code: str, message: str) -> None:
    """Store the details of a failed poll on the channel row.

    Sets the poll status to ``"error"`` together with the error code, the
    message (truncated to 1000 characters) and the current UTC timestamp.
    Does nothing when the channel no longer exists.

    NOTE(review): relies on ``session_scope`` to persist the changes when the
    context exits -- confirm.
    """
    async with session_scope() as db:
        row = await db.get(Channel, channel_id)
        if row is not None:
            row.last_poll_status = "error"
            row.last_poll_error_code = code
            row.last_poll_error = message[:1000]
            row.last_poll_error_at = datetime.now(timezone.utc)
async def poll_all() -> None: async def poll_all() -> None:
if not await is_authorized(): if not await is_authorized():
log.debug("poll_skipped_not_authorized") log.debug("poll_skipped_not_authorized")
@@ -93,6 +237,13 @@ async def poll_all() -> None:
for channel_id in ids: for channel_id in ids:
try: try:
await poll_channel(channel_id) await poll_channel(channel_id)
except PollError as exc:
log.warning(
"poll_skipped",
channel_id=channel_id,
code=exc.code,
error=exc.message,
)
except Exception as exc: except Exception as exc:
log.error("poll_failed", channel_id=channel_id, error=str(exc)) log.error("poll_failed", channel_id=channel_id, error=str(exc))
@@ -111,10 +262,15 @@ async def backfill_media(channel_id: int, batch_size: int = 50) -> dict[str, int
if channel is None: if channel is None:
raise RuntimeError("channel not found") raise RuntimeError("channel not found")
missing_media_condition = or_(
Message.media_files.is_(None),
func.jsonb_array_length(Message.media_files) == 0,
)
pending_q = select(func.count(Message.id)).where( pending_q = select(func.count(Message.id)).where(
Message.channel_id == channel_id, Message.channel_id == channel_id,
Message.has_media.is_(True), Message.has_media.is_(True),
Message.media_files.is_(None), missing_media_condition,
) )
pending_total = (await session.execute(pending_q)).scalar_one() pending_total = (await session.execute(pending_q)).scalar_one()
@@ -124,7 +280,7 @@ async def backfill_media(channel_id: int, batch_size: int = 50) -> dict[str, int
.where( .where(
Message.channel_id == channel_id, Message.channel_id == channel_id,
Message.has_media.is_(True), Message.has_media.is_(True),
Message.media_files.is_(None), missing_media_condition,
) )
.order_by(Message.tg_message_id.asc()) .order_by(Message.tg_message_id.asc())
.limit(batch_size) .limit(batch_size)

55
src/parser_bot/storage.py Normal file
View File

@@ -0,0 +1,55 @@
import asyncio
from pathlib import Path
from minio import Minio
from parser_bot.config import settings
# Module-wide MinIO client, created lazily on the first successful client() call.
_client: Minio | None = None
# Set to True once upload_file() has checked (and if needed created) the target
# bucket, so later uploads skip the existence check.
_bucket_ready = False
def configured() -> bool:
    """Return True when every required MinIO setting has a truthy value.

    The endpoint, access key, secret key and bucket name must all be set.
    """
    required = (
        "minio_endpoint",
        "minio_access_key",
        "minio_secret_key",
        "minio_bucket",
    )
    # The generator keeps the original left-to-right short-circuit evaluation.
    return all(getattr(settings, field) for field in required)
def client() -> Minio | None:
    """Return the shared MinIO client, creating it on first use.

    Returns:
        The module-wide ``Minio`` instance, or ``None`` when MinIO is not
        configured.
    """
    global _client
    if not configured():
        return None
    if _client is not None:
        return _client

    # The endpoint is passed to Minio as host[:port]; drop any URL scheme from
    # the configured value. TLS is chosen by the separate minio_use_ssl setting.
    host = settings.minio_endpoint
    for scheme in ("https://", "http://"):
        host = host.removeprefix(scheme)

    _client = Minio(
        host,
        access_key=settings.minio_access_key,
        secret_key=settings.minio_secret_key,
        secure=settings.minio_use_ssl,
        region=settings.minio_region,
    )
    return _client
async def upload_file(path: Path, key: str, content_type: str | None) -> None:
    """Upload a local file to the configured MinIO bucket under ``key``.

    The blocking MinIO calls run in a worker thread. On the first upload the
    bucket is checked and created if missing; later uploads skip that check.

    Args:
        path: Local file to upload.
        key: Object name inside the bucket.
        content_type: MIME type of the object; ``application/octet-stream``
            is used when it is missing or empty.

    Raises:
        RuntimeError: If MinIO is not configured.
    """
    minio_client = client()
    if minio_client is None:
        raise RuntimeError("minio is not configured")

    def _ensure_bucket_and_put() -> None:
        global _bucket_ready
        bucket = settings.minio_bucket
        # Only contact the server about the bucket until it is known to exist.
        if not (_bucket_ready or minio_client.bucket_exists(bucket)):
            minio_client.make_bucket(bucket, location=settings.minio_region)
        _bucket_ready = True
        mime = content_type or "application/octet-stream"
        minio_client.fput_object(bucket, key, str(path), content_type=mime)

    await asyncio.to_thread(_ensure_bucket_and_put)

View File

@@ -15,6 +15,7 @@ from telethon.tl.types import (
) )
from parser_bot.config import settings from parser_bot.config import settings
from parser_bot import storage
log = structlog.get_logger() log = structlog.get_logger()
@@ -136,9 +137,26 @@ async def _download_message_media(
if path is None: if path is None:
info["skipped"] = "no_file" info["skipped"] = "no_file"
return [info] return [info]
filename = Path(path).name file_path = Path(path)
filename = file_path.name
media_key = f"{channel_id}/{filename}"
public_base = settings.public_base_path.rstrip("/") public_base = settings.public_base_path.rstrip("/")
info["url"] = f"{public_base}/media/{channel_id}/{filename}" info["url"] = f"{public_base}/media/{media_key}"
info["key"] = media_key
info["name"] = filename
if storage.configured():
try:
await storage.upload_file(file_path, media_key, mime)
info["storage"] = "minio"
try:
file_path.unlink()
except OSError:
pass
except Exception as exc:
log.warning("media_minio_upload_failed", msg_id=msg.id, key=media_key, error=str(exc))
info["storage"] = "local"
else:
info["storage"] = "local"
return [info] return [info]