Compare commits

...

29 Commits

Author SHA1 Message Date
Grendgi
d0f8b48869 feat: add monitoring tg view-all scope
All checks were successful
CI / hygiene (push) Successful in 1s
Build and Deploy / build-and-deploy (push) Successful in 28s
CI / go (push) Successful in 21s
CI / python (push) Successful in 1s
2026-06-19 14:26:51 +03:00
Grendgi
cdbdea250d feat: expose monitoring tg media repair details
All checks were successful
CI / hygiene (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 45s
CI / go (push) Successful in 31s
CI / python (push) Successful in 2s
2026-06-17 17:22:50 +03:00
Grendgi
696b7eda6f feat: expose monitoring tg poll diagnostics 2026-06-17 17:19:13 +03:00
Grendgi
bd3b54dc7d feat: track monitoring tg poll errors 2026-06-17 17:01:34 +03:00
Grendgi
6cbfdb92e4 feat: expose monitoring tg ai job health 2026-06-17 16:58:47 +03:00
Grendgi
7cc5109cba feat: expose monitoring tg health detail 2026-06-17 15:55:28 +03:00
Grendgi
90c0a346ed chore: use common internal auth 2026-06-17 14:26:04 +03:00
Grendgi
7f7f2427cb chore: use common header parsing 2026-06-17 14:19:13 +03:00
Grendgi
73afcb64d5 Allow monitoring tg access to managed departments
All checks were successful
CI / hygiene (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 44s
CI / go (push) Successful in 16s
CI / python (push) Successful in 2s
2026-06-15 18:02:09 +03:00
Grendgi
5eb8e21eda Add monitoring TG CI hygiene guard
All checks were successful
CI / hygiene (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 27s
CI / go (push) Successful in 26s
CI / python (push) Successful in 2s
2026-06-12 16:42:35 +03:00
Grendgi
778b48cc12 Protect monitoring TG API with internal key 2026-06-12 16:32:10 +03:00
Grendgi
1f1354e72b Retry monitoring TG database connection on startup 2026-06-12 16:28:02 +03:00
Grendgi
fd1ee0611b Batch enqueue TG AI classifications
All checks were successful
CI / go (push) Successful in 18s
CI / python (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 29s
2026-06-12 09:01:41 +03:00
Grendgi
fc696c9e13 Add per-section TG classifications
All checks were successful
CI / go (push) Successful in 16s
CI / python (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 46s
2026-06-11 17:06:00 +03:00
Grendgi
f66ca4b6d4 Ignore non-real-estate sales in TG leads
All checks were successful
CI / go (push) Successful in 26s
CI / python (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 27s
2026-06-11 14:51:29 +03:00
Grendgi
eb12190729 Fix monitoring TG CI lint issues
All checks were successful
CI / go (push) Successful in 28s
CI / python (push) Successful in 1s
Build and Deploy / build-and-deploy (push) Successful in 26s
2026-06-09 11:33:20 +03:00
Grendgi
c4ad6c6c84 Add CI workflow
Some checks failed
Build and Deploy / build-and-deploy (push) Successful in 18s
CI / go (push) Failing after 49s
CI / python (push) Failing after 26s
2026-06-09 10:32:22 +03:00
Grendgi
2d0d751115 Share Telegram channels across sections
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 43s
2026-06-08 23:54:49 +03:00
Grendgi
ddea7002f1 Deploy all monitoring TG containers
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 15s
2026-06-08 23:33:08 +03:00
Grendgi
29490e5f93 Return skipped status for Telegram poll limits
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 30s
2026-06-08 23:29:38 +03:00
Grendgi
5165b31910 Handle Telegram poll failures gracefully
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 31s
2026-06-08 23:14:29 +03:00
Grendgi
e075b11761 Fix Docker build context for Go classifier
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 30s
2026-06-08 16:55:55 +03:00
Grendgi
bc15c2e116 Allow monitoring TG AI retries
Some checks failed
Build and Deploy / build-and-deploy (push) Failing after 5s
2026-06-08 15:48:47 +03:00
Grendgi
8259a01a88 Route monitoring TG classification through AI service
Some checks failed
Build and Deploy / build-and-deploy (push) Failing after 5s
2026-06-08 15:47:42 +03:00
Grendgi
a924cd832b Store monitoring TG media in MinIO
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 45s
2026-06-05 16:58:08 +03:00
Grendgi
4ac976b4eb Harden monitoring TG reanalyze reset 2026-06-05 16:35:58 +03:00
Grendgi
fd6fc6b931 Handle scalar TG extracted payloads 2026-06-05 16:31:20 +03:00
Grendgi
76b6230c7a Configure monitoring TG LLM key
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 15s
2026-06-05 16:25:54 +03:00
Grendgi
4e94ce092e Avoid parallel monitoring TG telegram pods 2026-06-05 16:18:00 +03:00
26 changed files with 1993 additions and 289 deletions

View File

@@ -0,0 +1,35 @@
#!/usr/bin/env bash
set -euo pipefail
fail=0
while IFS= read -r -d '' path; do
base="$(basename "$path")"
case "$base" in
.DS_Store|.env)
echo "::error file=$path::tracked local-only file is forbidden"
fail=1
;;
esac
case "$path" in
*node_modules/*|node_modules/*)
echo "::error file=$path::tracked node_modules content is forbidden"
fail=1
;;
*.tmp|*.temp|*.bak|*.orig|*.rej|*.zip|*.tar|*.tar.gz|*.tgz|*.rar|*.7z)
echo "::error file=$path::tracked temporary/archive artifact is forbidden"
fail=1
;;
esac
if [ -f "$path" ]; then
size="$(wc -c < "$path" | tr -d ' ')"
if [ "${size:-0}" -gt 52428800 ]; then
echo "::error file=$path::tracked file is larger than 50 MiB"
fail=1
fi
fi
done < <(git ls-files -z)
exit "$fail"

35
.gitea/workflows/ci.yml Normal file
View File

@@ -0,0 +1,35 @@
name: CI
on:
push:
pull_request:
jobs:
hygiene:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: bash .gitea/scripts/hygiene-check.sh
go:
runs-on: ubuntu-latest
needs: hygiene
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: true
- run: go build ./...
- run: go test ./...
- uses: golangci/golangci-lint-action@v7
with:
version: v2.4
args: --config .golangci.yml ./...
python:
runs-on: ubuntu-latest
needs: hygiene
steps:
- uses: actions/checkout@v4
- run: python3 -m compileall src alembic

View File

@@ -54,5 +54,8 @@ jobs:
kubectl apply -f k8s/server-deployment.yaml
kubectl apply -f k8s/server-service.yaml
kubectl -n monitoring-tg set image deployment/monitoring-tg-server \
monitoring-tg-server=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }}
monitoring-tg-migrate=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }} \
monitoring-tg-server=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }} \
monitoring-tg-telegram=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }} \
monitoring-tg-classifier=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }}
kubectl -n monitoring-tg rollout status deployment/monitoring-tg-server --timeout=180s

36
.golangci.yml Normal file
View File

@@ -0,0 +1,36 @@
version: "2"
run:
timeout: 3m
linters:
default: none
enable:
- errcheck
- govet
- ineffassign
- staticcheck
- unused
settings:
errcheck:
check-type-assertions: true
check-blank: false
exclude-functions:
- (io.Closer).Close
- (net/http.ResponseWriter).Write
- (*encoding/json.Encoder).Encode
- io.Copy
- fmt.Fprintf
- (github.com/jackc/pgx/v5.Tx).Rollback
- os.RemoveAll
staticcheck:
checks: ["all", "-SA1019", "-ST1000", "-ST1005", "-ST1020", "-ST1021", "-ST1022"]
exclusions:
rules:
- path: _test\.go
linters:
- errcheck
issues:
max-issues-per-linter: 0
max-same-issues: 0

View File

@@ -4,6 +4,7 @@ WORKDIR /src
COPY go.mod go.sum ./
COPY cmd ./cmd
COPY internal ./internal
RUN CGO_ENABLED=0 GOOS=linux go build -o /out/monitoring-tg-classifier ./cmd/classifier \
&& CGO_ENABLED=0 GOOS=linux go build -o /out/monitoring-tg-server ./cmd/server

View File

@@ -4,8 +4,8 @@ Backend-сервис мониторинга Telegram-каналов для Porta
AI-классификация работают на Go, Python оставлен только как внутренний
MTProto/Telethon-адаптер для авторизации, опроса каналов и дозагрузки медиа.
Сервис сохраняет сообщения в Postgres, раскладывает каналы по
вертикалям/подразделам и выполняет AI-анализ через OpenAI-compatible endpoint,
общий с другими сервисами портала.
вертикалям/подразделам и выполняет AI-анализ через общий `ai-service`,
который уже сам обращается к OpenAI-compatible backend.
Пользовательский UI живёт в `portal/frontend/src/app/features/monitoring-tg`.
Этот сервис не отдаёт отдельные HTML-страницы и работает как API/worker за
@@ -33,7 +33,7 @@ MTProto/Telethon-адаптер для авторизации, опроса ка
## Запуск в k8s
Манифесты лежат в `k8s/`. Перед применением нужно заполнить `k8s/secrets.yaml`
реальными Telegram-кредами и, при необходимости, `LLM_API_KEY`.
реальными Telegram-кредами и `AI_SERVICE_TOKEN`.
```bash
kubectl apply -k k8s

View File

@@ -0,0 +1,43 @@
"""channel aliases across department sections
Revision ID: 0011
Revises: 0010
Create Date: 2026-06-08
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
revision: str = "0011"
down_revision: Union[str, None] = "0010"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.add_column("channels", sa.Column("source_channel_id", sa.Integer(), nullable=True))
op.create_foreign_key(
"fk_channels_source_channel_id",
"channels",
"channels",
["source_channel_id"],
["id"],
ondelete="SET NULL",
)
op.drop_constraint("channels_identifier_key", "channels", type_="unique")
op.create_unique_constraint(
"uq_channels_section_identifier",
"channels",
["section_id", "identifier"],
)
op.create_index("ix_channels_source_channel_id", "channels", ["source_channel_id"])
def downgrade() -> None:
op.drop_index("ix_channels_source_channel_id", table_name="channels")
op.drop_constraint("uq_channels_section_identifier", "channels", type_="unique")
op.create_unique_constraint("channels_identifier_key", "channels", ["identifier"])
op.drop_constraint("fk_channels_source_channel_id", "channels", type_="foreignkey")
op.drop_column("channels", "source_channel_id")

View File

@@ -0,0 +1,38 @@
"""per-section message classifications
Revision ID: 0012
Revises: 0011
Create Date: 2026-06-11
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
revision: str = "0012"
down_revision: Union[str, None] = "0011"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"message_classifications",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column("message_id", sa.Integer(), sa.ForeignKey("messages.id", ondelete="CASCADE"), nullable=False),
sa.Column("section_id", sa.Integer(), sa.ForeignKey("sections.id", ondelete="CASCADE"), nullable=False),
sa.Column("vertical", sa.String(length=32), nullable=False),
sa.Column("verdict", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
sa.UniqueConstraint("message_id", "section_id", name="uq_message_classification_section"),
)
op.create_index("ix_message_classifications_message", "message_classifications", ["message_id"])
op.create_index("ix_message_classifications_section", "message_classifications", ["section_id"])
def downgrade() -> None:
op.drop_index("ix_message_classifications_section", table_name="message_classifications")
op.drop_index("ix_message_classifications_message", table_name="message_classifications")
op.drop_table("message_classifications")

View File

@@ -0,0 +1,34 @@
"""channel poll status
Revision ID: 0013
Revises: 0012
Create Date: 2026-06-17
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
revision: str = "0013"
down_revision: Union[str, None] = "0012"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.add_column("channels", sa.Column("last_poll_status", sa.String(length=32), nullable=True))
op.add_column("channels", sa.Column("last_poll_error_code", sa.String(length=64), nullable=True))
op.add_column("channels", sa.Column("last_poll_error", sa.Text(), nullable=True))
op.add_column("channels", sa.Column("last_poll_error_at", sa.DateTime(timezone=True), nullable=True))
op.create_index("ix_channels_last_poll_status", "channels", ["last_poll_status"])
op.create_index("ix_channels_last_poll_error_code", "channels", ["last_poll_error_code"])
def downgrade() -> None:
op.drop_index("ix_channels_last_poll_error_code", table_name="channels")
op.drop_index("ix_channels_last_poll_status", table_name="channels")
op.drop_column("channels", "last_poll_error_at")
op.drop_column("channels", "last_poll_error")
op.drop_column("channels", "last_poll_error_code")
op.drop_column("channels", "last_poll_status")

View File

@@ -1,14 +1,11 @@
package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"os"
"os/signal"
@@ -17,6 +14,9 @@ import (
"syscall"
"time"
"monitoring-tg/internal/aiservice"
"monitoring-tg/internal/dbretry"
"github.com/jackc/pgx/v5/pgxpool"
)
@@ -33,18 +33,19 @@ type config struct {
PostgresPort int
LLMEnabled bool
LLMBaseURL string
LLMAPIKey string
LLMModel string
LLMTimeout time.Duration
LLMMaxTokens int
LLMMinTextLength int
ClassifyInterval time.Duration
ClassifyBatchSize int
AIServiceURL string
AIServiceToken string
}
type pendingMessage struct {
ID int64
SectionID int64
Text string
Vertical string
SectionSlug string
@@ -52,29 +53,10 @@ type pendingMessage struct {
Extracted map[string]any
}
type chatRequest struct {
Model string `json:"model"`
Messages []chatMessage `json:"messages"`
Temperature float64 `json:"temperature"`
MaxTokens int `json:"max_tokens"`
ResponseFormat responseFmt `json:"response_format"`
}
type chatMessage struct {
Role string `json:"role"`
Content string `json:"content"`
}
type responseFmt struct {
Type string `json:"type"`
}
type chatResponse struct {
Choices []struct {
Message chatMessage `json:"message"`
} `json:"choices"`
}
func main() {
cfg := loadConfig()
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
@@ -88,14 +70,18 @@ func main() {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
pool, err := pgxpool.New(ctx, cfg.databaseURL())
pool, err := dbretry.Connect(ctx, cfg.databaseURL(), 2*time.Minute)
if err != nil {
slog.Error("db_connect_failed", "error", err)
os.Exit(1)
}
defer pool.Close()
worker := &classifier{cfg: cfg, db: pool, http: &http.Client{Timeout: cfg.LLMTimeout}}
worker := &classifier{
cfg: cfg,
db: pool,
ai: aiservice.New(cfg.AIServiceURL, cfg.AIServiceToken, cfg.LLMTimeout),
}
slog.Info(
"classifier_started",
"interval", cfg.ClassifyInterval.String(),
@@ -107,11 +93,11 @@ func main() {
defer ticker.Stop()
for {
updated, err := worker.runOnce(ctx)
updated, enqueued, err := worker.runOnce(ctx)
if err != nil {
slog.Error("classify_batch_failed", "error", err)
} else if updated > 0 {
slog.Info("classify_batch_done", "updated", updated)
} else if updated > 0 || enqueued > 0 {
slog.Info("classify_batch_done", "updated", updated, "enqueued", enqueued)
}
select {
@@ -126,65 +112,114 @@ func main() {
type classifier struct {
cfg config
db *pgxpool.Pool
http *http.Client
ai *aiservice.Client
}
func (c *classifier) runOnce(ctx context.Context) (int, error) {
func (c *classifier) runOnce(ctx context.Context) (int, int, error) {
rows, err := c.loadPending(ctx)
if err != nil {
return 0, err
return 0, 0, err
}
if len(rows) == 0 {
return 0, nil
return 0, 0, nil
}
byRef := make(map[string]pendingMessage, len(rows))
jobs := make([]aiservice.CreateJobRequest, 0, len(rows))
updated := 0
for _, msg := range rows {
key := verdictKey(msg.Vertical)
if _, ok := msg.Extracted[key]; ok {
continue
}
verdict, err := c.classify(ctx, msg)
if len(strings.TrimSpace(msg.Text)) < c.cfg.LLMMinTextLength {
verdict, err := marshalRaw(negativeVerdict(msg.Vertical))
if err != nil {
slog.Warn("llm_classify_failed", "message_id", msg.ID, "vertical", msg.Vertical, "error", err)
slog.Warn("negative_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "error", err)
continue
}
if len(verdict) == 0 {
verdict, err = marshalRaw(negativeVerdict(msg.Vertical))
if err != nil {
slog.Warn("negative_verdict_failed", "message_id", msg.ID, "error", err)
continue
}
}
if err := c.saveVerdict(ctx, msg.ID, key, verdict); err != nil {
slog.Warn("save_verdict_failed", "message_id", msg.ID, "error", err)
if err := c.saveVerdict(ctx, msg, key, verdict); err != nil {
slog.Warn("save_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "error", err)
continue
}
updated++
continue
}
return updated, nil
req, err := c.buildJobRequest(ctx, msg)
if err != nil {
slog.Warn("build_classify_job_failed", "message_id", msg.ID, "section_id", msg.SectionID, "vertical", msg.Vertical, "error", err)
continue
}
byRef[req.OwnerRef] = msg
jobs = append(jobs, req)
}
if len(jobs) == 0 {
return updated, 0, nil
}
created, err := c.ai.CreateJobs(ctx, aiservice.CreateJobsRequest{
OwnerService: "monitoring-tg",
TaskType: "telegram_classification",
ModelProfile: c.cfg.LLMModel,
Priority: 5,
MaxAttempts: 2,
Jobs: jobs,
})
if err != nil {
return updated, 0, err
}
for _, job := range created {
msg, ok := byRef[job.OwnerRef]
if !ok {
continue
}
switch job.Status {
case "done":
verdict, err := c.verdictFromJob(job, msg.Vertical)
if err != nil {
slog.Warn("parse_classify_job_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
continue
}
if err := c.saveVerdict(ctx, msg, verdictKey(msg.Vertical), verdict); err != nil {
slog.Warn("save_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
continue
}
updated++
case "failed", "cancelled":
verdict, err := marshalRaw(negativeVerdict(msg.Vertical))
if err != nil {
slog.Warn("negative_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
continue
}
if err := c.saveVerdict(ctx, msg, verdictKey(msg.Vertical), verdict); err != nil {
slog.Warn("save_failed_job_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
continue
}
updated++
slog.Warn("classify_job_failed_marked_negative", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "status", job.Status, "error", derefString(job.ErrorMessage))
}
}
return updated, len(created), nil
}
func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error) {
rows, err := c.db.Query(ctx, `
SELECT
m.id,
s.id,
m.text,
c.vertical,
s.slug,
COALESCE(s.department_id, ''),
COALESCE(m.extracted, '{}'::jsonb)::text
COALESCE(mc.verdict, '{}'::jsonb)::text
FROM messages m
JOIN channels c ON c.id = m.channel_id
JOIN channels src ON src.id = m.channel_id
JOIN channels c ON c.id = src.id OR c.source_channel_id = src.id
JOIN sections s ON s.id = c.section_id
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
WHERE m.text IS NOT NULL
AND (
(c.vertical = 'hr' AND (m.extracted IS NULL OR m.extracted->'hr_lead' IS NULL))
OR
(c.vertical <> 'hr' AND (m.extracted IS NULL OR m.extracted->'lead' IS NULL))
)
AND mc.id IS NULL
ORDER BY m.id DESC
LIMIT $1
`, c.cfg.ClassifyBatchSize)
@@ -197,7 +232,7 @@ func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error)
for rows.Next() {
var msg pendingMessage
var extractedText string
if err := rows.Scan(&msg.ID, &msg.Text, &msg.Vertical, &msg.SectionSlug, &msg.DepartmentID, &extractedText); err != nil {
if err := rows.Scan(&msg.ID, &msg.SectionID, &msg.Text, &msg.Vertical, &msg.SectionSlug, &msg.DepartmentID, &extractedText); err != nil {
return nil, err
}
if err := json.Unmarshal([]byte(extractedText), &msg.Extracted); err != nil {
@@ -208,60 +243,56 @@ func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error)
return out, rows.Err()
}
func (c *classifier) classify(ctx context.Context, msg pendingMessage) (json.RawMessage, error) {
if len(strings.TrimSpace(msg.Text)) < c.cfg.LLMMinTextLength {
return marshalRaw(negativeVerdict(msg.Vertical))
}
func (c *classifier) buildJobRequest(ctx context.Context, msg pendingMessage) (aiservice.CreateJobRequest, error) {
systemPrompt, err := c.resolvePrompt(ctx, msg.Vertical, msg.DepartmentID, msg.SectionSlug)
if err != nil {
return nil, err
return aiservice.CreateJobRequest{}, err
}
systemPrompt = promptWithVerticalGuard(msg.Vertical, systemPrompt)
payload := chatRequest{
Model: c.cfg.LLMModel,
Messages: []chatMessage{
responseFormat, _ := json.Marshal(responseFmt{Type: "json_object"})
payload := aiservice.ChatInput{
Messages: []aiservice.Message{
{Role: "system", Content: systemPrompt},
{Role: "user", Content: buildUserPrompt(msg.Text)},
},
Temperature: 0.1,
MaxTokens: c.cfg.LLMMaxTokens,
ResponseFormat: responseFmt{Type: "json_object"},
ResponseFormat: responseFormat,
}
body, err := json.Marshal(payload)
if err != nil {
return aiservice.CreateJobRequest{}, err
}
ownerRef := classifyOwnerRef(msg)
return aiservice.CreateJobRequest{
OwnerService: "monitoring-tg",
OwnerRef: ownerRef,
TaskType: "telegram_classification",
ModelProfile: c.cfg.LLMModel,
Priority: 5,
MaxAttempts: 2,
Input: body,
// Classification is section-specific because prompts are section-specific.
IdempotencyKey: "monitoring-tg:telegram_classification:" + ownerRef,
}, nil
}
func (c *classifier) verdictFromJob(job *aiservice.Job, vertical string) (json.RawMessage, error) {
if job.Status != "done" {
msg := "ai-service job " + job.Status
if job.ErrorMessage != nil && *job.ErrorMessage != "" {
msg += ": " + *job.ErrorMessage
}
return nil, errors.New(msg)
}
var parsed aiservice.ChatResult
if err := json.Unmarshal(job.Result, &parsed); err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, strings.TrimRight(c.cfg.LLMBaseURL, "/")+"/v1/chat/completions", bytes.NewReader(body))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
if c.cfg.LLMAPIKey != "" {
req.Header.Set("Authorization", "Bearer "+c.cfg.LLMAPIKey)
}
resp, err := c.http.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
b, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
return nil, fmt.Errorf("llm http %d: %s", resp.StatusCode, strings.TrimSpace(string(b)))
}
var parsed chatResponse
if err := json.NewDecoder(resp.Body).Decode(&parsed); err != nil {
return nil, err
}
if len(parsed.Choices) == 0 {
return nil, errors.New("llm returned no choices")
}
raw := strings.TrimSpace(parsed.Choices[0].Message.Content)
raw := strings.TrimSpace(parsed.Content)
if raw == "" {
return nil, errors.New("llm returned empty content")
}
@@ -269,7 +300,7 @@ func (c *classifier) classify(ctx context.Context, msg pendingMessage) (json.Raw
if err != nil {
return nil, err
}
normalized, err := normalizeVerdict(msg.Vertical, block)
normalized, err := normalizeVerdict(vertical, block)
if err != nil {
return nil, err
}
@@ -297,12 +328,28 @@ func (c *classifier) resolvePrompt(ctx context.Context, vertical, departmentID,
return defaultPrompt(vertical), nil
}
func (c *classifier) saveVerdict(ctx context.Context, id int64, key string, verdict json.RawMessage) error {
func (c *classifier) saveVerdict(ctx context.Context, msg pendingMessage, key string, verdict json.RawMessage) error {
_, err := c.db.Exec(ctx, `
INSERT INTO message_classifications (message_id, section_id, vertical, verdict, updated_at)
VALUES ($1, $2, $3, $4::jsonb, now())
ON CONFLICT ON CONSTRAINT uq_message_classification_section DO UPDATE
SET vertical = EXCLUDED.vertical,
verdict = EXCLUDED.verdict,
updated_at = now()
`, msg.ID, msg.SectionID, msg.Vertical, string(verdict))
if err != nil {
return err
}
_, err = c.db.Exec(ctx, `
UPDATE messages
SET extracted = jsonb_set(COALESCE(extracted, '{}'::jsonb), ARRAY[$2], $3::jsonb, true)
SET extracted = jsonb_set(
CASE WHEN jsonb_typeof(extracted) = 'object' THEN extracted ELSE '{}'::jsonb END,
ARRAY[$2],
$3::jsonb,
true
)
WHERE id = $1
`, id, key, string(verdict))
`, msg.ID, key, string(verdict))
return err
}
@@ -320,6 +367,10 @@ func verdictKey(vertical string) string {
return "lead"
}
func classifyOwnerRef(msg pendingMessage) string {
return fmt.Sprintf("%d:%d", msg.ID, msg.SectionID)
}
func buildUserPrompt(text string) string {
return "Текст сообщения:\n```\n" + text + "\n```\nВерни JSON."
}
@@ -405,6 +456,13 @@ func asFloat(v any) (float64, bool) {
}
}
func derefString(v *string) string {
if v == nil {
return ""
}
return *v
}
func defaultPrompt(vertical string) string {
if vertical == verticalHR {
return defaultHRPrompt
@@ -412,6 +470,16 @@ func defaultPrompt(vertical string) string {
return defaultREPrompt
}
func promptWithVerticalGuard(vertical, prompt string) string {
if vertical == verticalHR {
return prompt
}
if strings.Contains(prompt, realEstateOnlyGuard) {
return prompt
}
return strings.TrimSpace(prompt) + "\n\n" + realEstateOnlyGuard
}
func loadConfig() config {
return config{
PostgresUser: env("POSTGRES_USER", "parser"),
@@ -420,14 +488,14 @@ func loadConfig() config {
PostgresHost: env("POSTGRES_HOST", "db"),
PostgresPort: envInt("POSTGRES_PORT", 5432),
LLMEnabled: envBool("LLM_ENABLED", true),
LLMBaseURL: env("LLM_BASE_URL", "http://10.2.3.5:8002"),
LLMAPIKey: env("LLM_API_KEY", ""),
LLMModel: env("LLM_MODEL", "qwen2.5-14b"),
LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second,
LLMMaxTokens: envInt("LLM_MAX_TOKENS", 600),
LLMMinTextLength: envInt("LLM_MIN_TEXT_LENGTH", 20),
ClassifyInterval: time.Duration(envInt("LLM_CLASSIFY_INTERVAL_SECONDS", 20)) * time.Second,
ClassifyBatchSize: envInt("LLM_CLASSIFY_BATCH_SIZE", 5),
ClassifyInterval: time.Duration(envInt("LLM_CLASSIFY_INTERVAL_SECONDS", 5)) * time.Second,
ClassifyBatchSize: envInt("LLM_CLASSIFY_BATCH_SIZE", 200),
AIServiceURL: env("AI_SERVICE_URL", ""),
AIServiceToken: env("AI_SERVICE_TOKEN", ""),
}
}
@@ -475,6 +543,8 @@ func envBool(key string, fallback bool) bool {
const defaultREPrompt = `Ты — аналитик объявлений о недвижимости. Тебе дают текст из Telegram-канала.
Определи, является ли сообщение реальным объявлением о покупке, продаже или аренде недвижимости.
Учитывай только сделки с недвижимостью: квартиры, дома, апартаменты, участки, коммерческие помещения и другие объекты недвижимости.
Любые продажи или заявки по стройматериалам, мебели, бытовой/строительной технике, автомобилям, услугам, оборудованию и прочим товарам не являются лидами недвижимости — для них is_listing=false.
Отвечай строго валидным JSON без markdown:
{
"is_listing": boolean,
@@ -493,6 +563,9 @@ const defaultREPrompt = `Ты — аналитик объявлений о не
}
summary всегда по-русски, confidence в диапазоне 0..1.`
const realEstateOnlyGuard = `Жёсткое правило для недвижимости: учитывай только сделки с недвижимостью.
Если сообщение продаёт или покупает стройматериалы, мебель, бытовую/строительную технику, автомобили, услуги, оборудование или любые другие товары/работы не по недвижимости — это НЕ лид недвижимости, верни is_listing=false.`
const defaultHRPrompt = `Ты — аналитик HR-объявлений. Тебе дают текст из Telegram-канала.
Определи, относится ли сообщение к рынку труда: вакансия, резюме или короткий HR-контакт.
Отвечай строго валидным JSON без markdown:

File diff suppressed because it is too large Load Diff

28
go.mod
View File

@@ -2,12 +2,34 @@ module monitoring-tg
go 1.25.7
require github.com/jackc/pgx/v5 v5.9.1
require (
gitea.estateliga.work/admin/portal-common v0.3.0
github.com/jackc/pgx/v5 v5.9.1
github.com/minio/minio-go/v7 v7.2.0
)
require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
golang.org/x/sync v0.17.0 // indirect
golang.org/x/text v0.29.0 // indirect
github.com/klauspost/compress v1.18.6 // indirect
github.com/klauspost/cpuid/v2 v2.2.11 // indirect
github.com/klauspost/crc32 v1.3.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/minio/crc64nvme v1.1.1 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/philhofer/fwd v1.2.0 // indirect
github.com/rs/xid v1.6.0 // indirect
github.com/tinylib/msgp v1.6.1 // indirect
github.com/zeebo/xxh3 v1.1.0 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.51.0 // indirect
golang.org/x/net v0.53.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.44.0 // indirect
golang.org/x/text v0.37.0 // indirect
gopkg.in/ini.v1 v1.67.2 // indirect
)

64
go.sum
View File

@@ -1,6 +1,15 @@
gitea.estateliga.work/admin/portal-common v0.3.0 h1:xpr9UeLXk5pCcNXcTVGZzJZr0Ni7An7DV0OkuYv9qVM=
gitea.estateliga.work/admin/portal-common v0.3.0/go.mod h1:C860q6g38KVMsv+mKv6k1Vm7smVRCycl+N6r63TElnk=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
@@ -9,18 +18,65 @@ github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc=
github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/klauspost/compress v1.18.6 h1:2jupLlAwFm95+YDR+NwD2MEfFO9d4z4Prjl1XXDjuao=
github.com/klauspost/compress v1.18.6/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU=
github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/klauspost/crc32 v1.3.0 h1:sSmTt3gUt81RP655XGZPElI0PelVTZ6YwCRnPSupoFM=
github.com/klauspost/crc32 v1.3.0/go.mod h1:D7kQaZhnkX/Y0tstFGf8VUzv2UofNGqCjnC3zdHB0Hw=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/minio/crc64nvme v1.1.1 h1:8dwx/Pz49suywbO+auHCBpCtlW1OfpcLN7wYgVR6wAI=
github.com/minio/crc64nvme v1.1.1/go.mod h1:eVfm2fAzLlxMdUGc0EEBGSMmPwmXD5XiNRpnu9J3bvg=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
github.com/minio/minio-go/v7 v7.2.0 h1:RCJM0R1XOsRs+A3x3UCaf3ZYbByDaLjFeAi+YCQEPhs=
github.com/minio/minio-go/v7 v7.2.0/go.mod h1:EU9hENAStx/xXduNdrGO5e4X5vk19NtgB+RIPjZO8o0=
github.com/philhofer/fwd v1.2.0 h1:e6DnBTl7vGY+Gz322/ASL4Gyp1FspeMvx1RNDoToZuM=
github.com/philhofer/fwd v1.2.0/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU=
github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk=
golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4=
github.com/tinylib/msgp v1.6.1 h1:ESRv8eL3u+DNHUoSAAQRE50Hm162zqAnBoGv9PzScPY=
github.com/tinylib/msgp v1.6.1/go.mod h1:RSp0LW9oSxFut3KzESt5Voq4GVWyS+PSulT77roAqEA=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs=
github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI=
golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8=
golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA=
golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc=
golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/ini.v1 v1.67.2 h1:JtOSMb9OuaCZKr7h5D/h6iii14sK0hLbplTc6frx4Ss=
gopkg.in/ini.v1 v1.67.2/go.mod h1:x/cyOwCgZqOkJoDIJ3c1KNHMo10+nLGAhh+kn3Zizss=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -0,0 +1,308 @@
package aiservice
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
)
type Client struct {
baseURL string
token string
http *http.Client
}
type Message struct {
Role string `json:"role"`
Content string `json:"content"`
}
type ChatInput struct {
Messages []Message `json:"messages"`
Temperature float64 `json:"temperature"`
MaxTokens int `json:"max_tokens,omitempty"`
ResponseFormat json.RawMessage `json:"response_format,omitempty"`
}
type CreateJobRequest struct {
OwnerService string `json:"owner_service"`
OwnerRef string `json:"owner_ref"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
Priority int `json:"priority"`
MaxAttempts int `json:"max_attempts"`
Input json.RawMessage `json:"input"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
}
type CreateJobsRequest struct {
OwnerService string `json:"owner_service,omitempty"`
TaskType string `json:"task_type,omitempty"`
ModelProfile string `json:"model_profile,omitempty"`
Priority int `json:"priority,omitempty"`
MaxAttempts int `json:"max_attempts,omitempty"`
Jobs []CreateJobRequest `json:"jobs"`
}
type Job struct {
ID string `json:"id"`
OwnerService string `json:"owner_service,omitempty"`
OwnerRef string `json:"owner_ref,omitempty"`
TaskType string `json:"task_type,omitempty"`
ModelProfile string `json:"model_profile,omitempty"`
Status string `json:"status"`
Result json.RawMessage `json:"result,omitempty"`
ErrorCode *string `json:"error_code,omitempty"`
ErrorMessage *string `json:"error_message,omitempty"`
IdempotencyKey *string `json:"idempotency_key,omitempty"`
}
type ChatResult struct {
Content string `json:"content"`
Model string `json:"model"`
DurationMS int64 `json:"duration_ms"`
}
type ProvidersStatus struct {
At time.Time `json:"at"`
Providers []ProviderStatus `json:"providers"`
}
type ProviderStatus struct {
Name string `json:"name"`
Configured bool `json:"configured"`
OK bool `json:"ok"`
URL string `json:"url,omitempty"`
Model string `json:"model,omitempty"`
LatencyMS int64 `json:"latency_ms,omitempty"`
Error string `json:"error,omitempty"`
}
type Stats struct {
At time.Time `json:"at"`
Owners []OwnerStat `json:"owners,omitempty"`
Errors []ErrorStat `json:"errors,omitempty"`
Backlog []BacklogStat `json:"backlog,omitempty"`
}
type OwnerStat struct {
OwnerService string `json:"owner_service"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
Status string `json:"status"`
Total int64 `json:"total"`
}
type ErrorStat struct {
OwnerService string `json:"owner_service,omitempty"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
ErrorCode string `json:"error_code"`
Total int64 `json:"total"`
Last24h int64 `json:"last_24h"`
}
type BacklogStat struct {
OwnerService string `json:"owner_service"`
TaskType string `json:"task_type"`
ModelProfile string `json:"model_profile"`
Pending int64 `json:"pending"`
Running int64 `json:"running"`
StaleRunning int64 `json:"stale_running"`
OldestPendingAgeSeconds int64 `json:"oldest_pending_age_seconds"`
OldestRunningAgeSeconds int64 `json:"oldest_running_age_seconds"`
}
func New(baseURL, token string, timeout time.Duration) *Client {
baseURL = strings.TrimRight(strings.TrimSpace(baseURL), "/")
if baseURL == "" {
return nil
}
if timeout <= 0 {
timeout = 2 * time.Minute
}
return &Client{
baseURL: baseURL,
token: strings.TrimSpace(token),
http: &http.Client{Timeout: timeout},
}
}
func (c *Client) CreateJob(ctx context.Context, req CreateJobRequest) (*Job, error) {
if c == nil {
return nil, fmt.Errorf("ai-service not configured")
}
body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("marshal ai job: %w", err)
}
httpReq, err := c.request(ctx, http.MethodPost, "/api/v1/jobs", body)
if err != nil {
return nil, err
}
resp, err := c.http.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("create ai job: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("create ai job: http %d: %s", resp.StatusCode, readSmall(resp.Body))
}
var out Job
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return nil, fmt.Errorf("decode ai job: %w", err)
}
return &out, nil
}
func (c *Client) CreateJobs(ctx context.Context, req CreateJobsRequest) ([]*Job, error) {
if c == nil {
return nil, fmt.Errorf("ai-service not configured")
}
body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("marshal ai jobs: %w", err)
}
httpReq, err := c.request(ctx, http.MethodPost, "/api/v1/jobs/batch", body)
if err != nil {
return nil, err
}
resp, err := c.http.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("create ai jobs: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("create ai jobs: http %d: %s", resp.StatusCode, readSmall(resp.Body))
}
var out struct {
Jobs []*Job `json:"jobs"`
}
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return nil, fmt.Errorf("decode ai jobs: %w", err)
}
return out.Jobs, nil
}
func (c *Client) GetJob(ctx context.Context, id string) (*Job, error) {
if c == nil || strings.TrimSpace(id) == "" {
return nil, fmt.Errorf("ai job id is required")
}
req, err := c.request(ctx, http.MethodGet, "/api/v1/jobs/"+id, nil)
if err != nil {
return nil, err
}
resp, err := c.http.Do(req)
if err != nil {
return nil, fmt.Errorf("get ai job: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("get ai job: http %d: %s", resp.StatusCode, readSmall(resp.Body))
}
var out Job
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return nil, fmt.Errorf("decode ai job: %w", err)
}
return &out, nil
}
func (c *Client) WaitJob(ctx context.Context, id string, pollInterval time.Duration) (*Job, error) {
if pollInterval <= 0 {
pollInterval = 2 * time.Second
}
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()
for {
job, err := c.GetJob(ctx, id)
if err != nil {
return nil, err
}
switch job.Status {
case "done", "failed", "cancelled":
return job, nil
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-ticker.C:
}
}
}
func (c *Client) ProvidersStatus(ctx context.Context) (*ProvidersStatus, error) {
if c == nil {
return nil, fmt.Errorf("ai-service not configured")
}
req, err := c.request(ctx, http.MethodGet, "/api/v1/providers/status", nil)
if err != nil {
return nil, err
}
resp, err := c.http.Do(req)
if err != nil {
return nil, fmt.Errorf("ai providers status: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("ai providers status: http %d: %s", resp.StatusCode, readSmall(resp.Body))
}
var out ProvidersStatus
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return nil, fmt.Errorf("decode ai providers status: %w", err)
}
return &out, nil
}
func (c *Client) Stats(ctx context.Context) (*Stats, error) {
if c == nil {
return nil, fmt.Errorf("ai-service not configured")
}
req, err := c.request(ctx, http.MethodGet, "/api/v1/stats", nil)
if err != nil {
return nil, err
}
resp, err := c.http.Do(req)
if err != nil {
return nil, fmt.Errorf("ai stats: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("ai stats: http %d: %s", resp.StatusCode, readSmall(resp.Body))
}
var out Stats
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return nil, fmt.Errorf("decode ai stats: %w", err)
}
return &out, nil
}
func (c *Client) request(ctx context.Context, method, path string, body []byte) (*http.Request, error) {
var r io.Reader
if body != nil {
r = bytes.NewReader(body)
}
req, err := http.NewRequestWithContext(ctx, method, c.baseURL+path, r)
if err != nil {
return nil, err
}
if body != nil {
req.Header.Set("Content-Type", "application/json")
}
if c.token != "" {
req.Header.Set("Authorization", "Bearer "+c.token)
}
return req, nil
}
func readSmall(r io.Reader) string {
body, err := io.ReadAll(io.LimitReader(r, 1024))
if err != nil {
return err.Error()
}
return strings.TrimSpace(string(body))
}

View File

@@ -0,0 +1,44 @@
package dbretry
import (
"context"
"fmt"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
func Connect(ctx context.Context, databaseURL string, maxWait time.Duration) (*pgxpool.Pool, error) {
deadline := time.Now().Add(maxWait)
var lastErr error
for attempt := 1; ; attempt++ {
pool, err := pgxpool.New(ctx, databaseURL)
if err == nil {
if pingErr := pool.Ping(ctx); pingErr == nil {
return pool, nil
} else {
err = fmt.Errorf("ping postgres: %w", pingErr)
pool.Close()
}
} else {
err = fmt.Errorf("connect postgres: %w", err)
}
lastErr = err
if time.Now().After(deadline) {
return nil, fmt.Errorf("connect postgres after retry: %w", lastErr)
}
sleep := time.Duration(attempt) * time.Second
if sleep > 5*time.Second {
sleep = 5 * time.Second
}
timer := time.NewTimer(sleep)
select {
case <-ctx.Done():
timer.Stop()
return nil, fmt.Errorf("connect postgres cancelled: %w", ctx.Err())
case <-timer.C:
}
}
}

View File

@@ -14,10 +14,15 @@ data:
POSTGRES_DB: "parser"
TG_SESSION_PATH: "/data/session/parser.session"
MEDIA_DIR: "/data/media"
MINIO_ENDPOINT: "s3-minio.estateliga.work"
MINIO_BUCKET: "monitoring-tg-media"
MINIO_USE_SSL: "1"
MINIO_INSECURE_SKIP_VERIFY: "0"
MINIO_REGION: "us-east-1"
POLL_INTERVAL_SECONDS: "60"
POLL_HISTORY_LIMIT: "50"
LLM_ENABLED: "1"
LLM_BASE_URL: "http://10.2.3.5:8002"
LLM_MODEL: "qwen2.5-14b"
LLM_MAX_TOKENS: "600"
LLM_CLASSIFIER_OWNER: "go"
AI_SERVICE_URL: "http://ai-service.ai-service.svc.cluster.local:8080"

View File

@@ -10,7 +10,10 @@ stringData:
TG_PHONE: "+971524994695"
TG_SESSION_STRING: ""
POSTGRES_PASSWORD: "parser"
LLM_API_KEY: ""
INTERNAL_API_KEY: "36fe89ed40c01fdc54d3cf4e3fcacc8751dc456a4a1acd394e9fed48257c5734"
AI_SERVICE_TOKEN: "d18bcacf9e02bae1806ee6b6eeda62b95be6a915c0a22936d9a700128b275442"
MINIO_ACCESS_KEY: "admjn"
MINIO_SECRET_KEY: "TropicalMacaw9Fantasize"
---
apiVersion: v1
kind: Secret

View File

@@ -17,6 +17,8 @@ metadata:
namespace: monitoring-tg
spec:
replicas: 1
strategy:
type: Recreate
selector:
matchLabels:
app: monitoring-tg-server
@@ -25,6 +27,10 @@ spec:
labels:
app: monitoring-tg-server
spec:
hostAliases:
- ip: "77.105.173.42"
hostnames:
- "s3-minio.estateliga.work"
terminationGracePeriodSeconds: 20
securityContext:
fsGroup: 1000

View File

@@ -14,6 +14,7 @@ dependencies = [
"pydantic>=2.9",
"pydantic-settings>=2.6",
"structlog>=24.4",
"minio>=7.2",
]
[project.optional-dependencies]

View File

@@ -13,6 +13,19 @@ def portal_department_id(request: Request) -> str | None:
return value or None
def portal_department_ids(request: Request) -> list[str]:
raw = (request.headers.get("x-user-department-ids") or "").strip()
out: list[str] = []
for part in raw.split(","):
value = part.strip()
if value and value not in out:
out.append(value)
current = portal_department_id(request)
if current and current not in out:
out.append(current)
return out
def is_department_head_request(request: Request) -> bool:
return request.headers.get("x-user-is-department-head") == "1"

View File

@@ -7,14 +7,14 @@ from sqlalchemy.ext.asyncio import AsyncSession
from parser_bot.access import (
is_admin_request,
portal_department_id,
portal_department_ids,
require_department_manager,
require_telegram_auth_manager,
)
from parser_bot.config import settings
from parser_bot.db.models import Channel, Section
from parser_bot.db.session import get_session
from parser_bot.scheduler.poller import backfill_media, poll_channel
from parser_bot.scheduler.poller import PollError, backfill_media, poll_channel
from parser_bot.telegram import client as tg
router = APIRouter()
@@ -39,13 +39,13 @@ class AuthCodeResult(BaseModel):
needs_password: bool
def _department_scope(request: Request) -> str | None:
def _department_scopes(request: Request) -> list[str] | None:
if is_admin_request(request):
return None
dept_id = portal_department_id(request)
if not dept_id:
dept_ids = portal_department_ids(request)
if not dept_ids:
raise HTTPException(status_code=403, detail="department is required")
return dept_id
return dept_ids
async def _require_channel_scope(
@@ -55,7 +55,7 @@ async def _require_channel_scope(
vertical: str | None,
section: str | None,
) -> None:
department_id = _department_scope(request)
department_ids = _department_scopes(request)
stmt = (
select(Channel.id)
.join(Section, Section.id == Channel.section_id)
@@ -65,13 +65,25 @@ async def _require_channel_scope(
stmt = stmt.where(Channel.vertical == vertical)
if section:
stmt = stmt.where(Section.slug == section)
if department_id is not None:
stmt = stmt.where(Section.department_id == department_id)
if department_ids is not None:
stmt = stmt.where(Section.department_id.in_(department_ids))
exists = (await session.execute(stmt)).scalar_one_or_none()
if exists is None:
raise HTTPException(status_code=404)
def _poll_skipped_result(exc: PollError) -> dict[str, Any]:
result: dict[str, Any] = {
"inserted": 0,
"status": "skipped",
"code": exc.code,
"message": exc.message,
}
if exc.retry_after is not None:
result["retry_after"] = exc.retry_after
return result
@router.get("/auth/status", response_model=AuthStatus, dependencies=[Depends(require_telegram_auth_manager)])
async def auth_status() -> AuthStatus:
authorized = await tg.is_authorized()
@@ -126,10 +138,13 @@ async def trigger_poll(
vertical: str | None = Query(None),
section: str | None = Query(None),
session: AsyncSession = Depends(get_session),
) -> dict[str, int]:
) -> dict[str, Any]:
await _require_channel_scope(session, request, channel_id, vertical, section)
try:
inserted = await poll_channel(channel_id)
return {"inserted": inserted}
except PollError as exc:
return _poll_skipped_result(exc)
return {"inserted": inserted, "status": "ok"}
@router.post(
@@ -167,7 +182,7 @@ async def trigger_poll_all(
section: str | None = Query(None),
session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
department_id = _department_scope(request)
department_ids = _department_scopes(request)
stmt = (
select(Channel.id)
.join(Section, Section.id == Channel.section_id)
@@ -175,8 +190,8 @@ async def trigger_poll_all(
)
if section:
stmt = stmt.where(Section.slug == section)
if department_id is not None:
stmt = stmt.where(Section.department_id == department_id)
if department_ids is not None:
stmt = stmt.where(Section.department_id.in_(department_ids))
result = await session.execute(stmt)
ids = [row[0] for row in result.all()]
background.add_task(_poll_all_in_background, ids)

View File

@@ -28,6 +28,12 @@ class Settings(BaseSettings):
media_dir: str = Field("/data/media", alias="MEDIA_DIR")
media_max_bytes: int = Field(20 * 1024 * 1024, alias="MEDIA_MAX_BYTES")
minio_endpoint: str = Field("", alias="MINIO_ENDPOINT")
minio_access_key: str = Field("", alias="MINIO_ACCESS_KEY")
minio_secret_key: str = Field("", alias="MINIO_SECRET_KEY")
minio_bucket: str = Field("monitoring-tg-media", alias="MINIO_BUCKET")
minio_use_ssl: bool = Field(True, alias="MINIO_USE_SSL")
minio_region: str = Field("us-east-1", alias="MINIO_REGION")
@property
def database_url(self) -> str:

View File

@@ -54,12 +54,16 @@ class Section(Base):
class Channel(Base):
__tablename__ = "channels"
__table_args__ = (
UniqueConstraint("section_id", "identifier", name="uq_channels_section_identifier"),
Index("ix_channels_source_channel_id", "source_channel_id"),
)
id: Mapped[int] = mapped_column(primary_key=True)
# Telegram numeric channel id (peer id), nullable until first resolve
tg_id: Mapped[int | None] = mapped_column(BigInteger, unique=True, nullable=True)
# Username or t.me/joinchat link supplied by user
identifier: Mapped[str] = mapped_column(String(255), unique=True)
identifier: Mapped[str] = mapped_column(String(255))
title: Mapped[str | None] = mapped_column(String(512), nullable=True)
# 'real_estate' or 'hr' — picks which LLM prompt and lead schema is used
vertical: Mapped[str] = mapped_column(
@@ -68,9 +72,16 @@ class Channel(Base):
section_id: Mapped[int] = mapped_column(
ForeignKey("sections.id", ondelete="RESTRICT"), index=True
)
source_channel_id: Mapped[int | None] = mapped_column(
ForeignKey("channels.id", ondelete="SET NULL"), nullable=True
)
is_active: Mapped[bool] = mapped_column(default=True, server_default="true")
last_message_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
last_polled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
last_poll_status: Mapped[str | None] = mapped_column(String(32), nullable=True)
last_poll_error_code: Mapped[str | None] = mapped_column(String(64), nullable=True)
last_poll_error: Mapped[str | None] = mapped_column(Text, nullable=True)
last_poll_error_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
@@ -112,6 +123,24 @@ class Message(Base):
channel: Mapped[Channel] = relationship(back_populates="messages")
class MessageClassification(Base):
__tablename__ = "message_classifications"
__table_args__ = (
UniqueConstraint("message_id", "section_id", name="uq_message_classification_section"),
Index("ix_message_classifications_message", "message_id"),
Index("ix_message_classifications_section", "section_id"),
)
id: Mapped[int] = mapped_column(primary_key=True)
message_id: Mapped[int] = mapped_column(ForeignKey("messages.id", ondelete="CASCADE"))
section_id: Mapped[int] = mapped_column(ForeignKey("sections.id", ondelete="CASCADE"))
vertical: Mapped[str] = mapped_column(String(32))
verdict: Mapped[dict] = mapped_column(JSONB)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
class AppSetting(Base):
"""Runtime-editable settings, edited from the UI without a restart."""

View File

@@ -2,8 +2,9 @@ from datetime import datetime, timezone
import structlog
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sqlalchemy import func, select
from sqlalchemy import func, or_, select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.exc import IntegrityError
from parser_bot.config import settings
from parser_bot.db.models import Channel, Message
@@ -19,24 +20,152 @@ from parser_bot.telegram.client import (
log = structlog.get_logger()
class PollError(RuntimeError):
status_code = 400
code = "poll_failed"
retry_after: int | None = None
def __init__(self, message: str) -> None:
super().__init__(message)
self.message = message
class PollUnauthorizedError(PollError):
status_code = 401
code = "telegram_not_authorized"
class PollChannelUnavailableError(PollError):
status_code = 422
code = "telegram_channel_unavailable"
class PollFloodWaitError(PollError):
status_code = 429
code = "telegram_flood_wait"
def __init__(self, seconds: int | None) -> None:
self.retry_after = seconds
wait = f"{seconds} sec" if seconds else "a while"
super().__init__(f"Telegram asked to wait {wait} before retrying")
class PollDuplicateChannelError(PollError):
status_code = 409
code = "telegram_channel_duplicate"
def _translate_telegram_error(exc: Exception, identifier: str) -> PollError:
name = type(exc).__name__
message = str(exc)
lower = message.lower()
if name == "FloodWaitError":
return PollFloodWaitError(getattr(exc, "seconds", None))
if name in {
"ChannelPrivateError",
"UsernameInvalidError",
"UsernameNotOccupiedError",
"InviteHashExpiredError",
"InviteHashInvalidError",
"ChatAdminRequiredError",
}:
return PollChannelUnavailableError(message)
if (
"cannot get entity" in lower
or "cannot find any entity" in lower
or "not part of" in lower
or "join the group" in lower
or "invalid channel" in lower
):
return PollChannelUnavailableError(message)
return PollError(f"Cannot poll {identifier}: {message}")
async def poll_channel(channel_id: int) -> int:
"""Poll one channel for new messages. Returns count of inserted rows."""
try:
return await _poll_channel(channel_id)
except PollError as exc:
await _record_poll_error(channel_id, exc.code, exc.message)
raise
except Exception as exc:
await _record_poll_error(channel_id, "poll_failed", str(exc))
raise
async def _poll_channel(channel_id: int) -> int:
if not await is_authorized():
raise PollUnauthorizedError(
"Telegram is not authorized: open Monitoring TG in Portal and authorize it"
)
async with session_scope() as session:
channel = await session.get(Channel, channel_id)
if channel is None or not channel.is_active:
return 0
if channel.source_channel_id is not None:
source = await session.get(Channel, channel.source_channel_id)
if source is not None:
channel.tg_id = None
channel.title = channel.title or source.title
channel.last_message_id = source.last_message_id
channel.last_polled_at = source.last_polled_at
channel.last_poll_status = "alias"
channel.last_poll_error_code = None
channel.last_poll_error = None
channel.last_poll_error_at = None
return 0
if channel.tg_id is None or channel.title is None:
try:
resolved = await resolve_channel(channel.identifier)
except Exception as exc:
raise _translate_telegram_error(exc, channel.identifier) from exc
duplicate_id = (
await session.execute(
select(Channel.id).where(
Channel.tg_id == resolved.tg_id,
Channel.id != channel.id,
)
)
).scalar_one_or_none()
if duplicate_id is not None:
source = await session.get(Channel, duplicate_id)
channel.source_channel_id = duplicate_id
channel.tg_id = None
channel.title = channel.title or resolved.title or (source.title if source else None)
channel.last_message_id = source.last_message_id if source else channel.last_message_id
channel.last_polled_at = source.last_polled_at if source else channel.last_polled_at
channel.last_poll_status = "alias"
channel.last_poll_error_code = None
channel.last_poll_error = None
channel.last_poll_error_at = None
log.info(
"linked_channel_alias",
channel_id=channel.id,
source_channel_id=duplicate_id,
tg_id=resolved.tg_id,
)
return 0
channel.tg_id = resolved.tg_id
channel.title = resolved.title
try:
await session.flush()
except IntegrityError as exc:
raise PollDuplicateChannelError(
"Telegram channel is already connected to another channel"
) from exc
try:
msgs = await fetch_new_messages(
channel.identifier,
min_id=channel.last_message_id,
limit=settings.poll_history_limit,
download_media_for_channel_id=channel.id,
)
except Exception as exc:
raise _translate_telegram_error(exc, channel.identifier) from exc
inserted = 0
for m in msgs:
@@ -70,6 +199,10 @@ async def poll_channel(channel_id: int) -> int:
channel.last_message_id or 0, msgs[-1].tg_message_id
)
channel.last_polled_at = datetime.now(timezone.utc)
channel.last_poll_status = "ok"
channel.last_poll_error_code = None
channel.last_poll_error = None
channel.last_poll_error_at = None
log.info(
"polled_channel",
@@ -81,6 +214,17 @@ async def poll_channel(channel_id: int) -> int:
return inserted
async def _record_poll_error(channel_id: int, code: str, message: str) -> None:
async with session_scope() as session:
channel = await session.get(Channel, channel_id)
if channel is None:
return
channel.last_poll_status = "error"
channel.last_poll_error_code = code
channel.last_poll_error = message[:1000]
channel.last_poll_error_at = datetime.now(timezone.utc)
async def poll_all() -> None:
if not await is_authorized():
log.debug("poll_skipped_not_authorized")
@@ -93,6 +237,13 @@ async def poll_all() -> None:
for channel_id in ids:
try:
await poll_channel(channel_id)
except PollError as exc:
log.warning(
"poll_skipped",
channel_id=channel_id,
code=exc.code,
error=exc.message,
)
except Exception as exc:
log.error("poll_failed", channel_id=channel_id, error=str(exc))
@@ -111,10 +262,15 @@ async def backfill_media(channel_id: int, batch_size: int = 50) -> dict[str, int
if channel is None:
raise RuntimeError("channel not found")
missing_media_condition = or_(
Message.media_files.is_(None),
func.jsonb_array_length(Message.media_files) == 0,
)
pending_q = select(func.count(Message.id)).where(
Message.channel_id == channel_id,
Message.has_media.is_(True),
Message.media_files.is_(None),
missing_media_condition,
)
pending_total = (await session.execute(pending_q)).scalar_one()
@@ -124,7 +280,7 @@ async def backfill_media(channel_id: int, batch_size: int = 50) -> dict[str, int
.where(
Message.channel_id == channel_id,
Message.has_media.is_(True),
Message.media_files.is_(None),
missing_media_condition,
)
.order_by(Message.tg_message_id.asc())
.limit(batch_size)

55
src/parser_bot/storage.py Normal file
View File

@@ -0,0 +1,55 @@
import asyncio
from pathlib import Path
from minio import Minio
from parser_bot.config import settings
_client: Minio | None = None
_bucket_ready = False
def configured() -> bool:
return bool(
settings.minio_endpoint
and settings.minio_access_key
and settings.minio_secret_key
and settings.minio_bucket
)
def client() -> Minio | None:
global _client
if not configured():
return None
if _client is None:
endpoint = settings.minio_endpoint.removeprefix("https://").removeprefix("http://")
_client = Minio(
endpoint,
access_key=settings.minio_access_key,
secret_key=settings.minio_secret_key,
secure=settings.minio_use_ssl,
region=settings.minio_region,
)
return _client
async def upload_file(path: Path, key: str, content_type: str | None) -> None:
cli = client()
if cli is None:
raise RuntimeError("minio is not configured")
def _upload() -> None:
global _bucket_ready
if not _bucket_ready:
if not cli.bucket_exists(settings.minio_bucket):
cli.make_bucket(settings.minio_bucket, location=settings.minio_region)
_bucket_ready = True
cli.fput_object(
settings.minio_bucket,
key,
str(path),
content_type=content_type or "application/octet-stream",
)
await asyncio.to_thread(_upload)

View File

@@ -15,6 +15,7 @@ from telethon.tl.types import (
)
from parser_bot.config import settings
from parser_bot import storage
log = structlog.get_logger()
@@ -136,9 +137,26 @@ async def _download_message_media(
if path is None:
info["skipped"] = "no_file"
return [info]
filename = Path(path).name
file_path = Path(path)
filename = file_path.name
media_key = f"{channel_id}/{filename}"
public_base = settings.public_base_path.rstrip("/")
info["url"] = f"{public_base}/media/{channel_id}/{filename}"
info["url"] = f"{public_base}/media/{media_key}"
info["key"] = media_key
info["name"] = filename
if storage.configured():
try:
await storage.upload_file(file_path, media_key, mime)
info["storage"] = "minio"
try:
file_path.unlink()
except OSError:
pass
except Exception as exc:
log.warning("media_minio_upload_failed", msg_id=msg.id, key=media_key, error=str(exc))
info["storage"] = "local"
else:
info["storage"] = "local"
return [info]