Compare commits
29 Commits
a025f17544
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d0f8b48869 | ||
|
|
cdbdea250d | ||
|
|
696b7eda6f | ||
|
|
bd3b54dc7d | ||
|
|
6cbfdb92e4 | ||
|
|
7cc5109cba | ||
|
|
90c0a346ed | ||
|
|
7f7f2427cb | ||
|
|
73afcb64d5 | ||
|
|
5eb8e21eda | ||
|
|
778b48cc12 | ||
|
|
1f1354e72b | ||
|
|
fd1ee0611b | ||
|
|
fc696c9e13 | ||
|
|
f66ca4b6d4 | ||
|
|
eb12190729 | ||
|
|
c4ad6c6c84 | ||
|
|
2d0d751115 | ||
|
|
ddea7002f1 | ||
|
|
29490e5f93 | ||
|
|
5165b31910 | ||
|
|
e075b11761 | ||
|
|
bc15c2e116 | ||
|
|
8259a01a88 | ||
|
|
a924cd832b | ||
|
|
4ac976b4eb | ||
|
|
fd6fc6b931 | ||
|
|
76b6230c7a | ||
|
|
4e94ce092e |
35
.gitea/scripts/hygiene-check.sh
Normal file
35
.gitea/scripts/hygiene-check.sh
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
#!/usr/bin/env bash
# Repository hygiene gate.
#
# Walks every path reported by `git ls-files` and fails (exit status 1) when a
# tracked path is one of:
#   * a local-only file (.DS_Store, .env),
#   * anything inside a node_modules directory,
#   * a temporary file or archive artifact,
#   * a regular file larger than 50 MiB.
# Each violation is reported as a `::error file=...::` annotation line.
set -euo pipefail

# 50 MiB expressed in bytes.
max_bytes=52428800

fail=0

# NUL-delimited read so paths containing spaces or newlines stay intact.
while IFS= read -r -d '' path; do
  # Parameter expansion instead of `basename`: a path that starts with '-'
  # cannot be mistaken for an option and abort the script under `set -e`.
  base="${path##*/}"
  case "$base" in
    .DS_Store|.env)
      echo "::error file=$path::tracked local-only file is forbidden"
      fail=1
      ;;
  esac

  case "$path" in
    # Match node_modules only as a whole path component (top-level or nested);
    # a bare `*node_modules/*` would also flag e.g. `old_node_modules/x`.
    node_modules/*|*/node_modules/*)
      echo "::error file=$path::tracked node_modules content is forbidden"
      fail=1
      ;;
    *.tmp|*.temp|*.bak|*.orig|*.rej|*.zip|*.tar|*.tar.gz|*.tgz|*.rar|*.7z)
      echo "::error file=$path::tracked temporary/archive artifact is forbidden"
      fail=1
      ;;
  esac

  # Only regular files that exist in the working tree have a size to check.
  if [ -f "$path" ]; then
    # Some `wc` implementations pad the count with spaces; strip them.
    size="$(wc -c < "$path" | tr -d ' ')"
    if [ "${size:-0}" -gt "$max_bytes" ]; then
      echo "::error file=$path::tracked file is larger than 50 MiB"
      fail=1
    fi
  fi
done < <(git ls-files -z)

exit "$fail"
|
||||||
35
.gitea/workflows/ci.yml
Normal file
35
.gitea/workflows/ci.yml
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
name: CI
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
pull_request:
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
hygiene:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
- run: bash .gitea/scripts/hygiene-check.sh
|
||||||
|
|
||||||
|
go:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
needs: hygiene
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
- uses: actions/setup-go@v5
|
||||||
|
with:
|
||||||
|
go-version-file: go.mod
|
||||||
|
cache: true
|
||||||
|
- run: go build ./...
|
||||||
|
- run: go test ./...
|
||||||
|
- uses: golangci/golangci-lint-action@v7
|
||||||
|
with:
|
||||||
|
version: v2.4
|
||||||
|
args: --config .golangci.yml ./...
|
||||||
|
|
||||||
|
python:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
needs: hygiene
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
- run: python3 -m compileall src alembic
|
||||||
@@ -54,5 +54,8 @@ jobs:
|
|||||||
kubectl apply -f k8s/server-deployment.yaml
|
kubectl apply -f k8s/server-deployment.yaml
|
||||||
kubectl apply -f k8s/server-service.yaml
|
kubectl apply -f k8s/server-service.yaml
|
||||||
kubectl -n monitoring-tg set image deployment/monitoring-tg-server \
|
kubectl -n monitoring-tg set image deployment/monitoring-tg-server \
|
||||||
monitoring-tg-server=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }}
|
monitoring-tg-migrate=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }} \
|
||||||
|
monitoring-tg-server=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }} \
|
||||||
|
monitoring-tg-telegram=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }} \
|
||||||
|
monitoring-tg-classifier=${{ env.NODE_REGISTRY }}/admin/monitoring-tg-server:${{ github.sha }}
|
||||||
kubectl -n monitoring-tg rollout status deployment/monitoring-tg-server --timeout=180s
|
kubectl -n monitoring-tg rollout status deployment/monitoring-tg-server --timeout=180s
|
||||||
|
|||||||
36
.golangci.yml
Normal file
36
.golangci.yml
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
version: "2"
|
||||||
|
|
||||||
|
run:
|
||||||
|
timeout: 3m
|
||||||
|
|
||||||
|
linters:
|
||||||
|
default: none
|
||||||
|
enable:
|
||||||
|
- errcheck
|
||||||
|
- govet
|
||||||
|
- ineffassign
|
||||||
|
- staticcheck
|
||||||
|
- unused
|
||||||
|
settings:
|
||||||
|
errcheck:
|
||||||
|
check-type-assertions: true
|
||||||
|
check-blank: false
|
||||||
|
exclude-functions:
|
||||||
|
- (io.Closer).Close
|
||||||
|
- (net/http.ResponseWriter).Write
|
||||||
|
- (*encoding/json.Encoder).Encode
|
||||||
|
- io.Copy
|
||||||
|
- fmt.Fprintf
|
||||||
|
- (github.com/jackc/pgx/v5.Tx).Rollback
|
||||||
|
- os.RemoveAll
|
||||||
|
staticcheck:
|
||||||
|
checks: ["all", "-SA1019", "-ST1000", "-ST1005", "-ST1020", "-ST1021", "-ST1022"]
|
||||||
|
exclusions:
|
||||||
|
rules:
|
||||||
|
- path: _test\.go
|
||||||
|
linters:
|
||||||
|
- errcheck
|
||||||
|
|
||||||
|
issues:
|
||||||
|
max-issues-per-linter: 0
|
||||||
|
max-same-issues: 0
|
||||||
@@ -4,6 +4,7 @@ WORKDIR /src
|
|||||||
|
|
||||||
COPY go.mod go.sum ./
|
COPY go.mod go.sum ./
|
||||||
COPY cmd ./cmd
|
COPY cmd ./cmd
|
||||||
|
COPY internal ./internal
|
||||||
|
|
||||||
RUN CGO_ENABLED=0 GOOS=linux go build -o /out/monitoring-tg-classifier ./cmd/classifier \
|
RUN CGO_ENABLED=0 GOOS=linux go build -o /out/monitoring-tg-classifier ./cmd/classifier \
|
||||||
&& CGO_ENABLED=0 GOOS=linux go build -o /out/monitoring-tg-server ./cmd/server
|
&& CGO_ENABLED=0 GOOS=linux go build -o /out/monitoring-tg-server ./cmd/server
|
||||||
|
|||||||
@@ -4,8 +4,8 @@ Backend-сервис мониторинга Telegram-каналов для Porta
|
|||||||
AI-классификация работают на Go, Python оставлен только как внутренний
|
AI-классификация работают на Go, Python оставлен только как внутренний
|
||||||
MTProto/Telethon-адаптер для авторизации, опроса каналов и дозагрузки медиа.
|
MTProto/Telethon-адаптер для авторизации, опроса каналов и дозагрузки медиа.
|
||||||
Сервис сохраняет сообщения в Postgres, раскладывает каналы по
|
Сервис сохраняет сообщения в Postgres, раскладывает каналы по
|
||||||
вертикалям/подразделам и выполняет AI-анализ через OpenAI-compatible endpoint,
|
вертикалям/подразделам и выполняет AI-анализ через общий `ai-service`,
|
||||||
общий с другими сервисами портала.
|
который уже сам обращается к OpenAI-compatible backend.
|
||||||
|
|
||||||
Пользовательский UI живёт в `portal/frontend/src/app/features/monitoring-tg`.
|
Пользовательский UI живёт в `portal/frontend/src/app/features/monitoring-tg`.
|
||||||
Этот сервис не отдаёт отдельные HTML-страницы и работает как API/worker за
|
Этот сервис не отдаёт отдельные HTML-страницы и работает как API/worker за
|
||||||
@@ -33,7 +33,7 @@ MTProto/Telethon-адаптер для авторизации, опроса ка
|
|||||||
## Запуск в k8s
|
## Запуск в k8s
|
||||||
|
|
||||||
Манифесты лежат в `k8s/`. Перед применением нужно заполнить `k8s/secrets.yaml`
|
Манифесты лежат в `k8s/`. Перед применением нужно заполнить `k8s/secrets.yaml`
|
||||||
реальными Telegram-кредами и, при необходимости, `LLM_API_KEY`.
|
реальными Telegram-кредами и `AI_SERVICE_TOKEN`.
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
kubectl apply -k k8s
|
kubectl apply -k k8s
|
||||||
|
|||||||
43
alembic/versions/0011_channel_aliases.py
Normal file
43
alembic/versions/0011_channel_aliases.py
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
"""channel aliases across department sections
|
||||||
|
|
||||||
|
Revision ID: 0011
|
||||||
|
Revises: 0010
|
||||||
|
Create Date: 2026-06-08
|
||||||
|
|
||||||
|
"""
|
||||||
|
from typing import Sequence, Union
|
||||||
|
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from alembic import op
|
||||||
|
|
||||||
|
revision: str = "0011"
|
||||||
|
down_revision: Union[str, None] = "0010"
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
    """Add ``channels.source_channel_id`` and make identifiers unique per section.

    Steps, in order:
      1. add the nullable integer column ``source_channel_id``;
      2. add a self-referencing foreign key to ``channels.id`` that is set to
         NULL when the referenced row is deleted;
      3. replace the unique constraint on ``identifier`` with one on
         ``(section_id, identifier)``;
      4. index the new column.
    """
    table = "channels"
    alias_column = "source_channel_id"

    op.add_column(table, sa.Column(alias_column, sa.Integer(), nullable=True))
    op.create_foreign_key(
        "fk_channels_source_channel_id",
        table,
        table,
        [alias_column],
        ["id"],
        ondelete="SET NULL",
    )

    # Uniqueness moves from the identifier alone to the (section, identifier) pair.
    op.drop_constraint("channels_identifier_key", table, type_="unique")
    op.create_unique_constraint(
        "uq_channels_section_identifier",
        table,
        ["section_id", "identifier"],
    )

    op.create_index("ix_channels_source_channel_id", table, [alias_column])
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
    """Undo revision 0011: restore the identifier-only unique constraint and
    remove ``channels.source_channel_id`` together with its index and FK.
    """
    table = "channels"
    alias_column = "source_channel_id"

    op.drop_index("ix_channels_source_channel_id", table_name=table)

    op.drop_constraint("uq_channels_section_identifier", table, type_="unique")
    # NOTE(review): this presumably fails if rows sharing an identifier across
    # sections were created while 0011 was applied -- confirm before running.
    op.create_unique_constraint("channels_identifier_key", table, ["identifier"])

    op.drop_constraint("fk_channels_source_channel_id", table, type_="foreignkey")
    op.drop_column(table, alias_column)
|
||||||
38
alembic/versions/0012_message_classifications.py
Normal file
38
alembic/versions/0012_message_classifications.py
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
"""per-section message classifications
|
||||||
|
|
||||||
|
Revision ID: 0012
|
||||||
|
Revises: 0011
|
||||||
|
Create Date: 2026-06-11
|
||||||
|
|
||||||
|
"""
|
||||||
|
from typing import Sequence, Union
|
||||||
|
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from alembic import op
|
||||||
|
from sqlalchemy.dialects import postgresql
|
||||||
|
|
||||||
|
revision: str = "0012"
|
||||||
|
down_revision: Union[str, None] = "0011"
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
    """Create the ``message_classifications`` table and its lookup indexes.

    Each row holds one JSONB ``verdict`` for a (message, section) pair; the
    pair is unique, and rows are removed when the referenced message or
    section is deleted (``ON DELETE CASCADE``).
    """
    table = "message_classifications"

    def cascade_fk(target: str) -> sa.ForeignKey:
        # Both parent references use the same delete behaviour.
        return sa.ForeignKey(target, ondelete="CASCADE")

    columns = (
        sa.Column("id", sa.Integer(), primary_key=True),
        sa.Column("message_id", sa.Integer(), cascade_fk("messages.id"), nullable=False),
        sa.Column("section_id", sa.Integer(), cascade_fk("sections.id"), nullable=False),
        sa.Column("vertical", sa.String(length=32), nullable=False),
        sa.Column("verdict", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
        sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    one_verdict_per_section = sa.UniqueConstraint(
        "message_id",
        "section_id",
        name="uq_message_classification_section",
    )
    op.create_table(table, *columns, one_verdict_per_section)

    for suffix, column in (("message", "message_id"), ("section", "section_id")):
        op.create_index(f"ix_message_classifications_{suffix}", table, [column])
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
    """Undo revision 0012: drop both indexes, then the table itself."""
    table = "message_classifications"

    # Indexes are dropped in the reverse of their creation order.
    for suffix in ("section", "message"):
        op.drop_index(f"ix_message_classifications_{suffix}", table_name=table)

    op.drop_table(table)
|
||||||
34
alembic/versions/0013_channel_poll_status.py
Normal file
34
alembic/versions/0013_channel_poll_status.py
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
"""channel poll status
|
||||||
|
|
||||||
|
Revision ID: 0013
|
||||||
|
Revises: 0012
|
||||||
|
Create Date: 2026-06-17
|
||||||
|
|
||||||
|
"""
|
||||||
|
from typing import Sequence, Union
|
||||||
|
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from alembic import op
|
||||||
|
|
||||||
|
revision: str = "0013"
|
||||||
|
down_revision: Union[str, None] = "0012"
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
    """Add four nullable ``last_poll_*`` columns to ``channels`` and index the
    status and error-code columns.
    """
    table = "channels"
    new_columns = (
        ("last_poll_status", sa.String(length=32)),
        ("last_poll_error_code", sa.String(length=64)),
        ("last_poll_error", sa.Text()),
        ("last_poll_error_at", sa.DateTime(timezone=True)),
    )

    for name, column_type in new_columns:
        op.add_column(table, sa.Column(name, column_type, nullable=True))

    # Only the two short string columns get an index.
    for name in ("last_poll_status", "last_poll_error_code"):
        op.create_index(f"ix_channels_{name}", table, [name])
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
    """Undo revision 0013: drop the two indexes, then the four ``last_poll_*``
    columns, each in the reverse of the order the upgrade created them.
    """
    table = "channels"

    for name in ("last_poll_error_code", "last_poll_status"):
        op.drop_index(f"ix_channels_{name}", table_name=table)

    for name in (
        "last_poll_error_at",
        "last_poll_error",
        "last_poll_error_code",
        "last_poll_status",
    ):
        op.drop_column(table, name)
|
||||||
@@ -1,14 +1,11 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
@@ -17,6 +14,9 @@ import (
|
|||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"monitoring-tg/internal/aiservice"
|
||||||
|
"monitoring-tg/internal/dbretry"
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -33,18 +33,19 @@ type config struct {
|
|||||||
PostgresPort int
|
PostgresPort int
|
||||||
|
|
||||||
LLMEnabled bool
|
LLMEnabled bool
|
||||||
LLMBaseURL string
|
|
||||||
LLMAPIKey string
|
|
||||||
LLMModel string
|
LLMModel string
|
||||||
LLMTimeout time.Duration
|
LLMTimeout time.Duration
|
||||||
LLMMaxTokens int
|
LLMMaxTokens int
|
||||||
LLMMinTextLength int
|
LLMMinTextLength int
|
||||||
ClassifyInterval time.Duration
|
ClassifyInterval time.Duration
|
||||||
ClassifyBatchSize int
|
ClassifyBatchSize int
|
||||||
|
AIServiceURL string
|
||||||
|
AIServiceToken string
|
||||||
}
|
}
|
||||||
|
|
||||||
type pendingMessage struct {
|
type pendingMessage struct {
|
||||||
ID int64
|
ID int64
|
||||||
|
SectionID int64
|
||||||
Text string
|
Text string
|
||||||
Vertical string
|
Vertical string
|
||||||
SectionSlug string
|
SectionSlug string
|
||||||
@@ -52,29 +53,10 @@ type pendingMessage struct {
|
|||||||
Extracted map[string]any
|
Extracted map[string]any
|
||||||
}
|
}
|
||||||
|
|
||||||
type chatRequest struct {
|
|
||||||
Model string `json:"model"`
|
|
||||||
Messages []chatMessage `json:"messages"`
|
|
||||||
Temperature float64 `json:"temperature"`
|
|
||||||
MaxTokens int `json:"max_tokens"`
|
|
||||||
ResponseFormat responseFmt `json:"response_format"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type chatMessage struct {
|
|
||||||
Role string `json:"role"`
|
|
||||||
Content string `json:"content"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type responseFmt struct {
|
type responseFmt struct {
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type chatResponse struct {
|
|
||||||
Choices []struct {
|
|
||||||
Message chatMessage `json:"message"`
|
|
||||||
} `json:"choices"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
cfg := loadConfig()
|
cfg := loadConfig()
|
||||||
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
|
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
|
||||||
@@ -88,14 +70,18 @@ func main() {
|
|||||||
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||||
defer stop()
|
defer stop()
|
||||||
|
|
||||||
pool, err := pgxpool.New(ctx, cfg.databaseURL())
|
pool, err := dbretry.Connect(ctx, cfg.databaseURL(), 2*time.Minute)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("db_connect_failed", "error", err)
|
slog.Error("db_connect_failed", "error", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
worker := &classifier{cfg: cfg, db: pool, http: &http.Client{Timeout: cfg.LLMTimeout}}
|
worker := &classifier{
|
||||||
|
cfg: cfg,
|
||||||
|
db: pool,
|
||||||
|
ai: aiservice.New(cfg.AIServiceURL, cfg.AIServiceToken, cfg.LLMTimeout),
|
||||||
|
}
|
||||||
slog.Info(
|
slog.Info(
|
||||||
"classifier_started",
|
"classifier_started",
|
||||||
"interval", cfg.ClassifyInterval.String(),
|
"interval", cfg.ClassifyInterval.String(),
|
||||||
@@ -107,11 +93,11 @@ func main() {
|
|||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
updated, err := worker.runOnce(ctx)
|
updated, enqueued, err := worker.runOnce(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("classify_batch_failed", "error", err)
|
slog.Error("classify_batch_failed", "error", err)
|
||||||
} else if updated > 0 {
|
} else if updated > 0 || enqueued > 0 {
|
||||||
slog.Info("classify_batch_done", "updated", updated)
|
slog.Info("classify_batch_done", "updated", updated, "enqueued", enqueued)
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@@ -124,67 +110,116 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type classifier struct {
|
type classifier struct {
|
||||||
cfg config
|
cfg config
|
||||||
db *pgxpool.Pool
|
db *pgxpool.Pool
|
||||||
http *http.Client
|
ai *aiservice.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *classifier) runOnce(ctx context.Context) (int, error) {
|
func (c *classifier) runOnce(ctx context.Context) (int, int, error) {
|
||||||
rows, err := c.loadPending(ctx)
|
rows, err := c.loadPending(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
if len(rows) == 0 {
|
if len(rows) == 0 {
|
||||||
return 0, nil
|
return 0, 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
byRef := make(map[string]pendingMessage, len(rows))
|
||||||
|
jobs := make([]aiservice.CreateJobRequest, 0, len(rows))
|
||||||
updated := 0
|
updated := 0
|
||||||
for _, msg := range rows {
|
for _, msg := range rows {
|
||||||
key := verdictKey(msg.Vertical)
|
key := verdictKey(msg.Vertical)
|
||||||
if _, ok := msg.Extracted[key]; ok {
|
if _, ok := msg.Extracted[key]; ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if len(strings.TrimSpace(msg.Text)) < c.cfg.LLMMinTextLength {
|
||||||
verdict, err := c.classify(ctx, msg)
|
verdict, err := marshalRaw(negativeVerdict(msg.Vertical))
|
||||||
if err != nil {
|
|
||||||
slog.Warn("llm_classify_failed", "message_id", msg.ID, "vertical", msg.Vertical, "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if len(verdict) == 0 {
|
|
||||||
verdict, err = marshalRaw(negativeVerdict(msg.Vertical))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Warn("negative_verdict_failed", "message_id", msg.ID, "error", err)
|
slog.Warn("negative_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "error", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
if err := c.saveVerdict(ctx, msg, key, verdict); err != nil {
|
||||||
|
slog.Warn("save_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "error", err)
|
||||||
if err := c.saveVerdict(ctx, msg.ID, key, verdict); err != nil {
|
continue
|
||||||
slog.Warn("save_verdict_failed", "message_id", msg.ID, "error", err)
|
}
|
||||||
|
updated++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
updated++
|
|
||||||
|
req, err := c.buildJobRequest(ctx, msg)
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("build_classify_job_failed", "message_id", msg.ID, "section_id", msg.SectionID, "vertical", msg.Vertical, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
byRef[req.OwnerRef] = msg
|
||||||
|
jobs = append(jobs, req)
|
||||||
}
|
}
|
||||||
return updated, nil
|
if len(jobs) == 0 {
|
||||||
|
return updated, 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
created, err := c.ai.CreateJobs(ctx, aiservice.CreateJobsRequest{
|
||||||
|
OwnerService: "monitoring-tg",
|
||||||
|
TaskType: "telegram_classification",
|
||||||
|
ModelProfile: c.cfg.LLMModel,
|
||||||
|
Priority: 5,
|
||||||
|
MaxAttempts: 2,
|
||||||
|
Jobs: jobs,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return updated, 0, err
|
||||||
|
}
|
||||||
|
for _, job := range created {
|
||||||
|
msg, ok := byRef[job.OwnerRef]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
switch job.Status {
|
||||||
|
case "done":
|
||||||
|
verdict, err := c.verdictFromJob(job, msg.Vertical)
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("parse_classify_job_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := c.saveVerdict(ctx, msg, verdictKey(msg.Vertical), verdict); err != nil {
|
||||||
|
slog.Warn("save_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
updated++
|
||||||
|
case "failed", "cancelled":
|
||||||
|
verdict, err := marshalRaw(negativeVerdict(msg.Vertical))
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("negative_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := c.saveVerdict(ctx, msg, verdictKey(msg.Vertical), verdict); err != nil {
|
||||||
|
slog.Warn("save_failed_job_verdict_failed", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
updated++
|
||||||
|
slog.Warn("classify_job_failed_marked_negative", "message_id", msg.ID, "section_id", msg.SectionID, "job_id", job.ID, "status", job.Status, "error", derefString(job.ErrorMessage))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return updated, len(created), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error) {
|
func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error) {
|
||||||
rows, err := c.db.Query(ctx, `
|
rows, err := c.db.Query(ctx, `
|
||||||
SELECT
|
SELECT
|
||||||
m.id,
|
m.id,
|
||||||
|
s.id,
|
||||||
m.text,
|
m.text,
|
||||||
c.vertical,
|
c.vertical,
|
||||||
s.slug,
|
s.slug,
|
||||||
COALESCE(s.department_id, ''),
|
COALESCE(s.department_id, ''),
|
||||||
COALESCE(m.extracted, '{}'::jsonb)::text
|
COALESCE(mc.verdict, '{}'::jsonb)::text
|
||||||
FROM messages m
|
FROM messages m
|
||||||
JOIN channels c ON c.id = m.channel_id
|
JOIN channels src ON src.id = m.channel_id
|
||||||
|
JOIN channels c ON c.id = src.id OR c.source_channel_id = src.id
|
||||||
JOIN sections s ON s.id = c.section_id
|
JOIN sections s ON s.id = c.section_id
|
||||||
|
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
|
||||||
WHERE m.text IS NOT NULL
|
WHERE m.text IS NOT NULL
|
||||||
AND (
|
AND mc.id IS NULL
|
||||||
(c.vertical = 'hr' AND (m.extracted IS NULL OR m.extracted->'hr_lead' IS NULL))
|
|
||||||
OR
|
|
||||||
(c.vertical <> 'hr' AND (m.extracted IS NULL OR m.extracted->'lead' IS NULL))
|
|
||||||
)
|
|
||||||
ORDER BY m.id DESC
|
ORDER BY m.id DESC
|
||||||
LIMIT $1
|
LIMIT $1
|
||||||
`, c.cfg.ClassifyBatchSize)
|
`, c.cfg.ClassifyBatchSize)
|
||||||
@@ -197,7 +232,7 @@ func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error)
|
|||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var msg pendingMessage
|
var msg pendingMessage
|
||||||
var extractedText string
|
var extractedText string
|
||||||
if err := rows.Scan(&msg.ID, &msg.Text, &msg.Vertical, &msg.SectionSlug, &msg.DepartmentID, &extractedText); err != nil {
|
if err := rows.Scan(&msg.ID, &msg.SectionID, &msg.Text, &msg.Vertical, &msg.SectionSlug, &msg.DepartmentID, &extractedText); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := json.Unmarshal([]byte(extractedText), &msg.Extracted); err != nil {
|
if err := json.Unmarshal([]byte(extractedText), &msg.Extracted); err != nil {
|
||||||
@@ -208,60 +243,56 @@ func (c *classifier) loadPending(ctx context.Context) ([]pendingMessage, error)
|
|||||||
return out, rows.Err()
|
return out, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *classifier) classify(ctx context.Context, msg pendingMessage) (json.RawMessage, error) {
|
func (c *classifier) buildJobRequest(ctx context.Context, msg pendingMessage) (aiservice.CreateJobRequest, error) {
|
||||||
if len(strings.TrimSpace(msg.Text)) < c.cfg.LLMMinTextLength {
|
|
||||||
return marshalRaw(negativeVerdict(msg.Vertical))
|
|
||||||
}
|
|
||||||
|
|
||||||
systemPrompt, err := c.resolvePrompt(ctx, msg.Vertical, msg.DepartmentID, msg.SectionSlug)
|
systemPrompt, err := c.resolvePrompt(ctx, msg.Vertical, msg.DepartmentID, msg.SectionSlug)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return aiservice.CreateJobRequest{}, err
|
||||||
}
|
}
|
||||||
|
systemPrompt = promptWithVerticalGuard(msg.Vertical, systemPrompt)
|
||||||
|
|
||||||
payload := chatRequest{
|
responseFormat, _ := json.Marshal(responseFmt{Type: "json_object"})
|
||||||
Model: c.cfg.LLMModel,
|
payload := aiservice.ChatInput{
|
||||||
Messages: []chatMessage{
|
Messages: []aiservice.Message{
|
||||||
{Role: "system", Content: systemPrompt},
|
{Role: "system", Content: systemPrompt},
|
||||||
{Role: "user", Content: buildUserPrompt(msg.Text)},
|
{Role: "user", Content: buildUserPrompt(msg.Text)},
|
||||||
},
|
},
|
||||||
Temperature: 0.1,
|
Temperature: 0.1,
|
||||||
MaxTokens: c.cfg.LLMMaxTokens,
|
MaxTokens: c.cfg.LLMMaxTokens,
|
||||||
ResponseFormat: responseFmt{Type: "json_object"},
|
ResponseFormat: responseFormat,
|
||||||
}
|
}
|
||||||
body, err := json.Marshal(payload)
|
body, err := json.Marshal(payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
return aiservice.CreateJobRequest{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ownerRef := classifyOwnerRef(msg)
|
||||||
|
return aiservice.CreateJobRequest{
|
||||||
|
OwnerService: "monitoring-tg",
|
||||||
|
OwnerRef: ownerRef,
|
||||||
|
TaskType: "telegram_classification",
|
||||||
|
ModelProfile: c.cfg.LLMModel,
|
||||||
|
Priority: 5,
|
||||||
|
MaxAttempts: 2,
|
||||||
|
Input: body,
|
||||||
|
// Classification is section-specific because prompts are section-specific.
|
||||||
|
IdempotencyKey: "monitoring-tg:telegram_classification:" + ownerRef,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *classifier) verdictFromJob(job *aiservice.Job, vertical string) (json.RawMessage, error) {
|
||||||
|
if job.Status != "done" {
|
||||||
|
msg := "ai-service job " + job.Status
|
||||||
|
if job.ErrorMessage != nil && *job.ErrorMessage != "" {
|
||||||
|
msg += ": " + *job.ErrorMessage
|
||||||
|
}
|
||||||
|
return nil, errors.New(msg)
|
||||||
|
}
|
||||||
|
var parsed aiservice.ChatResult
|
||||||
|
if err := json.Unmarshal(job.Result, &parsed); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, strings.TrimRight(c.cfg.LLMBaseURL, "/")+"/v1/chat/completions", bytes.NewReader(body))
|
raw := strings.TrimSpace(parsed.Content)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
req.Header.Set("Content-Type", "application/json")
|
|
||||||
if c.cfg.LLMAPIKey != "" {
|
|
||||||
req.Header.Set("Authorization", "Bearer "+c.cfg.LLMAPIKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := c.http.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
||||||
b, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
|
|
||||||
return nil, fmt.Errorf("llm http %d: %s", resp.StatusCode, strings.TrimSpace(string(b)))
|
|
||||||
}
|
|
||||||
|
|
||||||
var parsed chatResponse
|
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&parsed); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if len(parsed.Choices) == 0 {
|
|
||||||
return nil, errors.New("llm returned no choices")
|
|
||||||
}
|
|
||||||
|
|
||||||
raw := strings.TrimSpace(parsed.Choices[0].Message.Content)
|
|
||||||
if raw == "" {
|
if raw == "" {
|
||||||
return nil, errors.New("llm returned empty content")
|
return nil, errors.New("llm returned empty content")
|
||||||
}
|
}
|
||||||
@@ -269,7 +300,7 @@ func (c *classifier) classify(ctx context.Context, msg pendingMessage) (json.Raw
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
normalized, err := normalizeVerdict(msg.Vertical, block)
|
normalized, err := normalizeVerdict(vertical, block)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -297,12 +328,28 @@ func (c *classifier) resolvePrompt(ctx context.Context, vertical, departmentID,
|
|||||||
return defaultPrompt(vertical), nil
|
return defaultPrompt(vertical), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *classifier) saveVerdict(ctx context.Context, id int64, key string, verdict json.RawMessage) error {
|
func (c *classifier) saveVerdict(ctx context.Context, msg pendingMessage, key string, verdict json.RawMessage) error {
|
||||||
_, err := c.db.Exec(ctx, `
|
_, err := c.db.Exec(ctx, `
|
||||||
|
INSERT INTO message_classifications (message_id, section_id, vertical, verdict, updated_at)
|
||||||
|
VALUES ($1, $2, $3, $4::jsonb, now())
|
||||||
|
ON CONFLICT ON CONSTRAINT uq_message_classification_section DO UPDATE
|
||||||
|
SET vertical = EXCLUDED.vertical,
|
||||||
|
verdict = EXCLUDED.verdict,
|
||||||
|
updated_at = now()
|
||||||
|
`, msg.ID, msg.SectionID, msg.Vertical, string(verdict))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = c.db.Exec(ctx, `
|
||||||
UPDATE messages
|
UPDATE messages
|
||||||
SET extracted = jsonb_set(COALESCE(extracted, '{}'::jsonb), ARRAY[$2], $3::jsonb, true)
|
SET extracted = jsonb_set(
|
||||||
|
CASE WHEN jsonb_typeof(extracted) = 'object' THEN extracted ELSE '{}'::jsonb END,
|
||||||
|
ARRAY[$2],
|
||||||
|
$3::jsonb,
|
||||||
|
true
|
||||||
|
)
|
||||||
WHERE id = $1
|
WHERE id = $1
|
||||||
`, id, key, string(verdict))
|
`, msg.ID, key, string(verdict))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -320,6 +367,10 @@ func verdictKey(vertical string) string {
|
|||||||
return "lead"
|
return "lead"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func classifyOwnerRef(msg pendingMessage) string {
|
||||||
|
return fmt.Sprintf("%d:%d", msg.ID, msg.SectionID)
|
||||||
|
}
|
||||||
|
|
||||||
func buildUserPrompt(text string) string {
|
func buildUserPrompt(text string) string {
|
||||||
return "Текст сообщения:\n```\n" + text + "\n```\nВерни JSON."
|
return "Текст сообщения:\n```\n" + text + "\n```\nВерни JSON."
|
||||||
}
|
}
|
||||||
@@ -405,6 +456,13 @@ func asFloat(v any) (float64, bool) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// derefString returns the string v points to, or "" when v is nil.
func derefString(v *string) string {
	if v != nil {
		return *v
	}
	return ""
}
|
||||||
|
|
||||||
func defaultPrompt(vertical string) string {
|
func defaultPrompt(vertical string) string {
|
||||||
if vertical == verticalHR {
|
if vertical == verticalHR {
|
||||||
return defaultHRPrompt
|
return defaultHRPrompt
|
||||||
@@ -412,6 +470,16 @@ func defaultPrompt(vertical string) string {
|
|||||||
return defaultREPrompt
|
return defaultREPrompt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func promptWithVerticalGuard(vertical, prompt string) string {
|
||||||
|
if vertical == verticalHR {
|
||||||
|
return prompt
|
||||||
|
}
|
||||||
|
if strings.Contains(prompt, realEstateOnlyGuard) {
|
||||||
|
return prompt
|
||||||
|
}
|
||||||
|
return strings.TrimSpace(prompt) + "\n\n" + realEstateOnlyGuard
|
||||||
|
}
|
||||||
|
|
||||||
func loadConfig() config {
|
func loadConfig() config {
|
||||||
return config{
|
return config{
|
||||||
PostgresUser: env("POSTGRES_USER", "parser"),
|
PostgresUser: env("POSTGRES_USER", "parser"),
|
||||||
@@ -420,14 +488,14 @@ func loadConfig() config {
|
|||||||
PostgresHost: env("POSTGRES_HOST", "db"),
|
PostgresHost: env("POSTGRES_HOST", "db"),
|
||||||
PostgresPort: envInt("POSTGRES_PORT", 5432),
|
PostgresPort: envInt("POSTGRES_PORT", 5432),
|
||||||
LLMEnabled: envBool("LLM_ENABLED", true),
|
LLMEnabled: envBool("LLM_ENABLED", true),
|
||||||
LLMBaseURL: env("LLM_BASE_URL", "http://10.2.3.5:8002"),
|
|
||||||
LLMAPIKey: env("LLM_API_KEY", ""),
|
|
||||||
LLMModel: env("LLM_MODEL", "qwen2.5-14b"),
|
LLMModel: env("LLM_MODEL", "qwen2.5-14b"),
|
||||||
LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second,
|
LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second,
|
||||||
LLMMaxTokens: envInt("LLM_MAX_TOKENS", 600),
|
LLMMaxTokens: envInt("LLM_MAX_TOKENS", 600),
|
||||||
LLMMinTextLength: envInt("LLM_MIN_TEXT_LENGTH", 20),
|
LLMMinTextLength: envInt("LLM_MIN_TEXT_LENGTH", 20),
|
||||||
ClassifyInterval: time.Duration(envInt("LLM_CLASSIFY_INTERVAL_SECONDS", 20)) * time.Second,
|
ClassifyInterval: time.Duration(envInt("LLM_CLASSIFY_INTERVAL_SECONDS", 5)) * time.Second,
|
||||||
ClassifyBatchSize: envInt("LLM_CLASSIFY_BATCH_SIZE", 5),
|
ClassifyBatchSize: envInt("LLM_CLASSIFY_BATCH_SIZE", 200),
|
||||||
|
AIServiceURL: env("AI_SERVICE_URL", ""),
|
||||||
|
AIServiceToken: env("AI_SERVICE_TOKEN", ""),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -475,6 +543,8 @@ func envBool(key string, fallback bool) bool {
|
|||||||
|
|
||||||
const defaultREPrompt = `Ты — аналитик объявлений о недвижимости. Тебе дают текст из Telegram-канала.
|
const defaultREPrompt = `Ты — аналитик объявлений о недвижимости. Тебе дают текст из Telegram-канала.
|
||||||
Определи, является ли сообщение реальным объявлением о покупке, продаже или аренде недвижимости.
|
Определи, является ли сообщение реальным объявлением о покупке, продаже или аренде недвижимости.
|
||||||
|
Учитывай только сделки с недвижимостью: квартиры, дома, апартаменты, участки, коммерческие помещения и другие объекты недвижимости.
|
||||||
|
Любые продажи или заявки по стройматериалам, мебели, бытовой/строительной технике, автомобилям, услугам, оборудованию и прочим товарам не являются лидами недвижимости — для них is_listing=false.
|
||||||
Отвечай строго валидным JSON без markdown:
|
Отвечай строго валидным JSON без markdown:
|
||||||
{
|
{
|
||||||
"is_listing": boolean,
|
"is_listing": boolean,
|
||||||
@@ -493,6 +563,9 @@ const defaultREPrompt = `Ты — аналитик объявлений о не
|
|||||||
}
|
}
|
||||||
summary всегда по-русски, confidence в диапазоне 0..1.`
|
summary всегда по-русски, confidence в диапазоне 0..1.`
|
||||||
|
|
||||||
|
const realEstateOnlyGuard = `Жёсткое правило для недвижимости: учитывай только сделки с недвижимостью.
|
||||||
|
Если сообщение продаёт или покупает стройматериалы, мебель, бытовую/строительную технику, автомобили, услуги, оборудование или любые другие товары/работы не по недвижимости — это НЕ лид недвижимости, верни is_listing=false.`
|
||||||
|
|
||||||
const defaultHRPrompt = `Ты — аналитик HR-объявлений. Тебе дают текст из Telegram-канала.
|
const defaultHRPrompt = `Ты — аналитик HR-объявлений. Тебе дают текст из Telegram-канала.
|
||||||
Определи, относится ли сообщение к рынку труда: вакансия, резюме или короткий HR-контакт.
|
Определи, относится ли сообщение к рынку труда: вакансия, резюме или короткий HR-контакт.
|
||||||
Отвечай строго валидным JSON без markdown:
|
Отвечай строго валидным JSON без markdown:
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
28
go.mod
28
go.mod
@@ -2,12 +2,34 @@ module monitoring-tg
|
|||||||
|
|
||||||
go 1.25.7
|
go 1.25.7
|
||||||
|
|
||||||
require github.com/jackc/pgx/v5 v5.9.1
|
require (
|
||||||
|
gitea.estateliga.work/admin/portal-common v0.3.0
|
||||||
|
github.com/jackc/pgx/v5 v5.9.1
|
||||||
|
github.com/minio/minio-go/v7 v7.2.0
|
||||||
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||||
|
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||||
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||||
golang.org/x/sync v0.17.0 // indirect
|
github.com/klauspost/compress v1.18.6 // indirect
|
||||||
golang.org/x/text v0.29.0 // indirect
|
github.com/klauspost/cpuid/v2 v2.2.11 // indirect
|
||||||
|
github.com/klauspost/crc32 v1.3.0 // indirect
|
||||||
|
github.com/kr/text v0.2.0 // indirect
|
||||||
|
github.com/minio/crc64nvme v1.1.1 // indirect
|
||||||
|
github.com/minio/md5-simd v1.1.2 // indirect
|
||||||
|
github.com/philhofer/fwd v1.2.0 // indirect
|
||||||
|
github.com/rs/xid v1.6.0 // indirect
|
||||||
|
github.com/tinylib/msgp v1.6.1 // indirect
|
||||||
|
github.com/zeebo/xxh3 v1.1.0 // indirect
|
||||||
|
go.yaml.in/yaml/v3 v3.0.4 // indirect
|
||||||
|
golang.org/x/crypto v0.51.0 // indirect
|
||||||
|
golang.org/x/net v0.53.0 // indirect
|
||||||
|
golang.org/x/sync v0.20.0 // indirect
|
||||||
|
golang.org/x/sys v0.44.0 // indirect
|
||||||
|
golang.org/x/text v0.37.0 // indirect
|
||||||
|
gopkg.in/ini.v1 v1.67.2 // indirect
|
||||||
)
|
)
|
||||||
|
|||||||
64
go.sum
64
go.sum
@@ -1,6 +1,15 @@
|
|||||||
|
gitea.estateliga.work/admin/portal-common v0.3.0 h1:xpr9UeLXk5pCcNXcTVGZzJZr0Ni7An7DV0OkuYv9qVM=
|
||||||
|
gitea.estateliga.work/admin/portal-common v0.3.0/go.mod h1:C860q6g38KVMsv+mKv6k1Vm7smVRCycl+N6r63TElnk=
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||||
|
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||||
|
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||||
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
||||||
@@ -9,18 +18,65 @@ github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc=
|
|||||||
github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4=
|
github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4=
|
||||||
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
|
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
|
||||||
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||||
|
github.com/klauspost/compress v1.18.6 h1:2jupLlAwFm95+YDR+NwD2MEfFO9d4z4Prjl1XXDjuao=
|
||||||
|
github.com/klauspost/compress v1.18.6/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ=
|
||||||
|
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
|
||||||
|
github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU=
|
||||||
|
github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
|
||||||
|
github.com/klauspost/crc32 v1.3.0 h1:sSmTt3gUt81RP655XGZPElI0PelVTZ6YwCRnPSupoFM=
|
||||||
|
github.com/klauspost/crc32 v1.3.0/go.mod h1:D7kQaZhnkX/Y0tstFGf8VUzv2UofNGqCjnC3zdHB0Hw=
|
||||||
|
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
|
||||||
|
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
|
||||||
|
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||||
|
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||||
|
github.com/minio/crc64nvme v1.1.1 h1:8dwx/Pz49suywbO+auHCBpCtlW1OfpcLN7wYgVR6wAI=
|
||||||
|
github.com/minio/crc64nvme v1.1.1/go.mod h1:eVfm2fAzLlxMdUGc0EEBGSMmPwmXD5XiNRpnu9J3bvg=
|
||||||
|
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
|
||||||
|
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
|
||||||
|
github.com/minio/minio-go/v7 v7.2.0 h1:RCJM0R1XOsRs+A3x3UCaf3ZYbByDaLjFeAi+YCQEPhs=
|
||||||
|
github.com/minio/minio-go/v7 v7.2.0/go.mod h1:EU9hENAStx/xXduNdrGO5e4X5vk19NtgB+RIPjZO8o0=
|
||||||
|
github.com/philhofer/fwd v1.2.0 h1:e6DnBTl7vGY+Gz322/ASL4Gyp1FspeMvx1RNDoToZuM=
|
||||||
|
github.com/philhofer/fwd v1.2.0/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
|
||||||
|
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
|
||||||
|
github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU=
|
||||||
|
github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
|
||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
|
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||||
|
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||||
|
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
|
||||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
|
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
|
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||||
|
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
||||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||||
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
|
github.com/tinylib/msgp v1.6.1 h1:ESRv8eL3u+DNHUoSAAQRE50Hm162zqAnBoGv9PzScPY=
|
||||||
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
|
github.com/tinylib/msgp v1.6.1/go.mod h1:RSp0LW9oSxFut3KzESt5Voq4GVWyS+PSulT77roAqEA=
|
||||||
golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk=
|
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
|
||||||
golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4=
|
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
|
||||||
|
github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs=
|
||||||
|
github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s=
|
||||||
|
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
|
||||||
|
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
|
||||||
|
golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI=
|
||||||
|
golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8=
|
||||||
|
golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA=
|
||||||
|
golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs=
|
||||||
|
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
|
||||||
|
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
|
||||||
|
golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
|
||||||
|
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||||
|
golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc=
|
||||||
|
golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||||
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||||
|
gopkg.in/ini.v1 v1.67.2 h1:JtOSMb9OuaCZKr7h5D/h6iii14sK0hLbplTc6frx4Ss=
|
||||||
|
gopkg.in/ini.v1 v1.67.2/go.mod h1:x/cyOwCgZqOkJoDIJ3c1KNHMo10+nLGAhh+kn3Zizss=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
|||||||
308
internal/aiservice/client.go
Normal file
308
internal/aiservice/client.go
Normal file
@@ -0,0 +1,308 @@
|
|||||||
|
package aiservice
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Client struct {
|
||||||
|
baseURL string
|
||||||
|
token string
|
||||||
|
http *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// Message is a single chat message, serialized as
// {"role": ..., "content": ...}.
type Message struct {
	Role    string `json:"role"`
	Content string `json:"content"`
}
|
||||||
|
|
||||||
|
type ChatInput struct {
|
||||||
|
Messages []Message `json:"messages"`
|
||||||
|
Temperature float64 `json:"temperature"`
|
||||||
|
MaxTokens int `json:"max_tokens,omitempty"`
|
||||||
|
ResponseFormat json.RawMessage `json:"response_format,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CreateJobRequest struct {
|
||||||
|
OwnerService string `json:"owner_service"`
|
||||||
|
OwnerRef string `json:"owner_ref"`
|
||||||
|
TaskType string `json:"task_type"`
|
||||||
|
ModelProfile string `json:"model_profile"`
|
||||||
|
Priority int `json:"priority"`
|
||||||
|
MaxAttempts int `json:"max_attempts"`
|
||||||
|
Input json.RawMessage `json:"input"`
|
||||||
|
IdempotencyKey string `json:"idempotency_key,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CreateJobsRequest struct {
|
||||||
|
OwnerService string `json:"owner_service,omitempty"`
|
||||||
|
TaskType string `json:"task_type,omitempty"`
|
||||||
|
ModelProfile string `json:"model_profile,omitempty"`
|
||||||
|
Priority int `json:"priority,omitempty"`
|
||||||
|
MaxAttempts int `json:"max_attempts,omitempty"`
|
||||||
|
Jobs []CreateJobRequest `json:"jobs"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Job struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
OwnerService string `json:"owner_service,omitempty"`
|
||||||
|
OwnerRef string `json:"owner_ref,omitempty"`
|
||||||
|
TaskType string `json:"task_type,omitempty"`
|
||||||
|
ModelProfile string `json:"model_profile,omitempty"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
Result json.RawMessage `json:"result,omitempty"`
|
||||||
|
ErrorCode *string `json:"error_code,omitempty"`
|
||||||
|
ErrorMessage *string `json:"error_message,omitempty"`
|
||||||
|
IdempotencyKey *string `json:"idempotency_key,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ChatResult is the JSON shape of a chat completion result.
// NOTE(review): presumably decoded from Job.Result for chat tasks — confirm
// against the caller.
type ChatResult struct {
	Content    string `json:"content"`
	Model      string `json:"model"`
	DurationMS int64  `json:"duration_ms"`
}
|
||||||
|
|
||||||
|
type ProvidersStatus struct {
|
||||||
|
At time.Time `json:"at"`
|
||||||
|
Providers []ProviderStatus `json:"providers"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProviderStatus is one provider entry inside ProvidersStatus.
type ProviderStatus struct {
	Name       string `json:"name"`
	Configured bool   `json:"configured"`
	OK         bool   `json:"ok"`
	// The remaining fields are optional in the JSON.
	URL       string `json:"url,omitempty"`
	Model     string `json:"model,omitempty"`
	LatencyMS int64  `json:"latency_ms,omitempty"`
	Error     string `json:"error,omitempty"`
}
|
||||||
|
|
||||||
|
type Stats struct {
|
||||||
|
At time.Time `json:"at"`
|
||||||
|
Owners []OwnerStat `json:"owners,omitempty"`
|
||||||
|
Errors []ErrorStat `json:"errors,omitempty"`
|
||||||
|
Backlog []BacklogStat `json:"backlog,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// OwnerStat is one row of Stats.Owners: a job total for a combination of
// owner service, task type, model profile and status.
type OwnerStat struct {
	OwnerService string `json:"owner_service"`
	TaskType     string `json:"task_type"`
	ModelProfile string `json:"model_profile"`
	Status       string `json:"status"`
	Total        int64  `json:"total"`
}
|
||||||
|
|
||||||
|
// ErrorStat is one row of Stats.Errors: error totals for a combination of
// task type, model profile and error code, optionally scoped to an owner
// service.
type ErrorStat struct {
	OwnerService string `json:"owner_service,omitempty"`
	TaskType     string `json:"task_type"`
	ModelProfile string `json:"model_profile"`
	ErrorCode    string `json:"error_code"`
	Total        int64  `json:"total"`
	Last24h      int64  `json:"last_24h"`
}
|
||||||
|
|
||||||
|
// BacklogStat is one row of Stats.Backlog: queue counters and ages for a
// combination of owner service, task type and model profile.
type BacklogStat struct {
	OwnerService            string `json:"owner_service"`
	TaskType                string `json:"task_type"`
	ModelProfile            string `json:"model_profile"`
	Pending                 int64  `json:"pending"`
	Running                 int64  `json:"running"`
	StaleRunning            int64  `json:"stale_running"`
	OldestPendingAgeSeconds int64  `json:"oldest_pending_age_seconds"`
	OldestRunningAgeSeconds int64  `json:"oldest_running_age_seconds"`
}
|
||||||
|
|
||||||
|
func New(baseURL, token string, timeout time.Duration) *Client {
|
||||||
|
baseURL = strings.TrimRight(strings.TrimSpace(baseURL), "/")
|
||||||
|
if baseURL == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if timeout <= 0 {
|
||||||
|
timeout = 2 * time.Minute
|
||||||
|
}
|
||||||
|
return &Client{
|
||||||
|
baseURL: baseURL,
|
||||||
|
token: strings.TrimSpace(token),
|
||||||
|
http: &http.Client{Timeout: timeout},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) CreateJob(ctx context.Context, req CreateJobRequest) (*Job, error) {
|
||||||
|
if c == nil {
|
||||||
|
return nil, fmt.Errorf("ai-service not configured")
|
||||||
|
}
|
||||||
|
body, err := json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("marshal ai job: %w", err)
|
||||||
|
}
|
||||||
|
httpReq, err := c.request(ctx, http.MethodPost, "/api/v1/jobs", body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
resp, err := c.http.Do(httpReq)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("create ai job: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||||
|
return nil, fmt.Errorf("create ai job: http %d: %s", resp.StatusCode, readSmall(resp.Body))
|
||||||
|
}
|
||||||
|
var out Job
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||||||
|
return nil, fmt.Errorf("decode ai job: %w", err)
|
||||||
|
}
|
||||||
|
return &out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) CreateJobs(ctx context.Context, req CreateJobsRequest) ([]*Job, error) {
|
||||||
|
if c == nil {
|
||||||
|
return nil, fmt.Errorf("ai-service not configured")
|
||||||
|
}
|
||||||
|
body, err := json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("marshal ai jobs: %w", err)
|
||||||
|
}
|
||||||
|
httpReq, err := c.request(ctx, http.MethodPost, "/api/v1/jobs/batch", body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
resp, err := c.http.Do(httpReq)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("create ai jobs: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||||
|
return nil, fmt.Errorf("create ai jobs: http %d: %s", resp.StatusCode, readSmall(resp.Body))
|
||||||
|
}
|
||||||
|
var out struct {
|
||||||
|
Jobs []*Job `json:"jobs"`
|
||||||
|
}
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||||||
|
return nil, fmt.Errorf("decode ai jobs: %w", err)
|
||||||
|
}
|
||||||
|
return out.Jobs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) GetJob(ctx context.Context, id string) (*Job, error) {
|
||||||
|
if c == nil || strings.TrimSpace(id) == "" {
|
||||||
|
return nil, fmt.Errorf("ai job id is required")
|
||||||
|
}
|
||||||
|
req, err := c.request(ctx, http.MethodGet, "/api/v1/jobs/"+id, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
resp, err := c.http.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("get ai job: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||||
|
return nil, fmt.Errorf("get ai job: http %d: %s", resp.StatusCode, readSmall(resp.Body))
|
||||||
|
}
|
||||||
|
var out Job
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||||||
|
return nil, fmt.Errorf("decode ai job: %w", err)
|
||||||
|
}
|
||||||
|
return &out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) WaitJob(ctx context.Context, id string, pollInterval time.Duration) (*Job, error) {
|
||||||
|
if pollInterval <= 0 {
|
||||||
|
pollInterval = 2 * time.Second
|
||||||
|
}
|
||||||
|
ticker := time.NewTicker(pollInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
job, err := c.GetJob(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
switch job.Status {
|
||||||
|
case "done", "failed", "cancelled":
|
||||||
|
return job, nil
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
case <-ticker.C:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) ProvidersStatus(ctx context.Context) (*ProvidersStatus, error) {
|
||||||
|
if c == nil {
|
||||||
|
return nil, fmt.Errorf("ai-service not configured")
|
||||||
|
}
|
||||||
|
req, err := c.request(ctx, http.MethodGet, "/api/v1/providers/status", nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
resp, err := c.http.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("ai providers status: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||||
|
return nil, fmt.Errorf("ai providers status: http %d: %s", resp.StatusCode, readSmall(resp.Body))
|
||||||
|
}
|
||||||
|
var out ProvidersStatus
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||||||
|
return nil, fmt.Errorf("decode ai providers status: %w", err)
|
||||||
|
}
|
||||||
|
return &out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Stats(ctx context.Context) (*Stats, error) {
|
||||||
|
if c == nil {
|
||||||
|
return nil, fmt.Errorf("ai-service not configured")
|
||||||
|
}
|
||||||
|
req, err := c.request(ctx, http.MethodGet, "/api/v1/stats", nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
resp, err := c.http.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("ai stats: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||||
|
return nil, fmt.Errorf("ai stats: http %d: %s", resp.StatusCode, readSmall(resp.Body))
|
||||||
|
}
|
||||||
|
var out Stats
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||||||
|
return nil, fmt.Errorf("decode ai stats: %w", err)
|
||||||
|
}
|
||||||
|
return &out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) request(ctx context.Context, method, path string, body []byte) (*http.Request, error) {
|
||||||
|
var r io.Reader
|
||||||
|
if body != nil {
|
||||||
|
r = bytes.NewReader(body)
|
||||||
|
}
|
||||||
|
req, err := http.NewRequestWithContext(ctx, method, c.baseURL+path, r)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if body != nil {
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
}
|
||||||
|
if c.token != "" {
|
||||||
|
req.Header.Set("Authorization", "Bearer "+c.token)
|
||||||
|
}
|
||||||
|
return req, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readSmall(r io.Reader) string {
|
||||||
|
body, err := io.ReadAll(io.LimitReader(r, 1024))
|
||||||
|
if err != nil {
|
||||||
|
return err.Error()
|
||||||
|
}
|
||||||
|
return strings.TrimSpace(string(body))
|
||||||
|
}
|
||||||
44
internal/dbretry/dbretry.go
Normal file
44
internal/dbretry/dbretry.go
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
package dbretry
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Connect(ctx context.Context, databaseURL string, maxWait time.Duration) (*pgxpool.Pool, error) {
|
||||||
|
deadline := time.Now().Add(maxWait)
|
||||||
|
var lastErr error
|
||||||
|
|
||||||
|
for attempt := 1; ; attempt++ {
|
||||||
|
pool, err := pgxpool.New(ctx, databaseURL)
|
||||||
|
if err == nil {
|
||||||
|
if pingErr := pool.Ping(ctx); pingErr == nil {
|
||||||
|
return pool, nil
|
||||||
|
} else {
|
||||||
|
err = fmt.Errorf("ping postgres: %w", pingErr)
|
||||||
|
pool.Close()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = fmt.Errorf("connect postgres: %w", err)
|
||||||
|
}
|
||||||
|
lastErr = err
|
||||||
|
|
||||||
|
if time.Now().After(deadline) {
|
||||||
|
return nil, fmt.Errorf("connect postgres after retry: %w", lastErr)
|
||||||
|
}
|
||||||
|
sleep := time.Duration(attempt) * time.Second
|
||||||
|
if sleep > 5*time.Second {
|
||||||
|
sleep = 5 * time.Second
|
||||||
|
}
|
||||||
|
timer := time.NewTimer(sleep)
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
timer.Stop()
|
||||||
|
return nil, fmt.Errorf("connect postgres cancelled: %w", ctx.Err())
|
||||||
|
case <-timer.C:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -14,10 +14,15 @@ data:
|
|||||||
POSTGRES_DB: "parser"
|
POSTGRES_DB: "parser"
|
||||||
TG_SESSION_PATH: "/data/session/parser.session"
|
TG_SESSION_PATH: "/data/session/parser.session"
|
||||||
MEDIA_DIR: "/data/media"
|
MEDIA_DIR: "/data/media"
|
||||||
|
MINIO_ENDPOINT: "s3-minio.estateliga.work"
|
||||||
|
MINIO_BUCKET: "monitoring-tg-media"
|
||||||
|
MINIO_USE_SSL: "1"
|
||||||
|
MINIO_INSECURE_SKIP_VERIFY: "0"
|
||||||
|
MINIO_REGION: "us-east-1"
|
||||||
POLL_INTERVAL_SECONDS: "60"
|
POLL_INTERVAL_SECONDS: "60"
|
||||||
POLL_HISTORY_LIMIT: "50"
|
POLL_HISTORY_LIMIT: "50"
|
||||||
LLM_ENABLED: "1"
|
LLM_ENABLED: "1"
|
||||||
LLM_BASE_URL: "http://10.2.3.5:8002"
|
|
||||||
LLM_MODEL: "qwen2.5-14b"
|
LLM_MODEL: "qwen2.5-14b"
|
||||||
LLM_MAX_TOKENS: "600"
|
LLM_MAX_TOKENS: "600"
|
||||||
LLM_CLASSIFIER_OWNER: "go"
|
LLM_CLASSIFIER_OWNER: "go"
|
||||||
|
AI_SERVICE_URL: "http://ai-service.ai-service.svc.cluster.local:8080"
|
||||||
|
|||||||
@@ -10,7 +10,10 @@ stringData:
|
|||||||
TG_PHONE: "+971524994695"
|
TG_PHONE: "+971524994695"
|
||||||
TG_SESSION_STRING: ""
|
TG_SESSION_STRING: ""
|
||||||
POSTGRES_PASSWORD: "parser"
|
POSTGRES_PASSWORD: "parser"
|
||||||
LLM_API_KEY: ""
|
INTERNAL_API_KEY: "36fe89ed40c01fdc54d3cf4e3fcacc8751dc456a4a1acd394e9fed48257c5734"
|
||||||
|
AI_SERVICE_TOKEN: "d18bcacf9e02bae1806ee6b6eeda62b95be6a915c0a22936d9a700128b275442"
|
||||||
|
MINIO_ACCESS_KEY: "admjn"
|
||||||
|
MINIO_SECRET_KEY: "TropicalMacaw9Fantasize"
|
||||||
---
|
---
|
||||||
apiVersion: v1
|
apiVersion: v1
|
||||||
kind: Secret
|
kind: Secret
|
||||||
|
|||||||
@@ -17,6 +17,8 @@ metadata:
|
|||||||
namespace: monitoring-tg
|
namespace: monitoring-tg
|
||||||
spec:
|
spec:
|
||||||
replicas: 1
|
replicas: 1
|
||||||
|
strategy:
|
||||||
|
type: Recreate
|
||||||
selector:
|
selector:
|
||||||
matchLabels:
|
matchLabels:
|
||||||
app: monitoring-tg-server
|
app: monitoring-tg-server
|
||||||
@@ -25,6 +27,10 @@ spec:
|
|||||||
labels:
|
labels:
|
||||||
app: monitoring-tg-server
|
app: monitoring-tg-server
|
||||||
spec:
|
spec:
|
||||||
|
hostAliases:
|
||||||
|
- ip: "77.105.173.42"
|
||||||
|
hostnames:
|
||||||
|
- "s3-minio.estateliga.work"
|
||||||
terminationGracePeriodSeconds: 20
|
terminationGracePeriodSeconds: 20
|
||||||
securityContext:
|
securityContext:
|
||||||
fsGroup: 1000
|
fsGroup: 1000
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ dependencies = [
|
|||||||
"pydantic>=2.9",
|
"pydantic>=2.9",
|
||||||
"pydantic-settings>=2.6",
|
"pydantic-settings>=2.6",
|
||||||
"structlog>=24.4",
|
"structlog>=24.4",
|
||||||
|
"minio>=7.2",
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.optional-dependencies]
|
[project.optional-dependencies]
|
||||||
|
|||||||
@@ -13,6 +13,19 @@ def portal_department_id(request: Request) -> str | None:
|
|||||||
return value or None
|
return value or None
|
||||||
|
|
||||||
|
|
||||||
|
def portal_department_ids(request: Request) -> list[str]:
    """Return the department ids attached to the request, without repeats.

    Ids are taken from the comma-separated ``x-user-department-ids`` header
    in the order they appear; surrounding whitespace is trimmed and blank
    entries are dropped. The id returned by ``portal_department_id`` is
    appended last when it is set and not already listed.

    Args:
        request: incoming request whose headers are inspected.

    Returns:
        Ordered list of unique department ids (possibly empty).
    """
    header = request.headers.get("x-user-department-ids") or ""
    # dict.fromkeys keeps first-seen order while dropping repeated ids.
    unique = dict.fromkeys(part.strip() for part in header.split(","))
    unique.pop("", None)

    current = portal_department_id(request)
    if current:
        unique.setdefault(current, None)
    return list(unique)
|
||||||
|
|
||||||
|
|
||||||
def is_department_head_request(request: Request) -> bool:
|
def is_department_head_request(request: Request) -> bool:
|
||||||
return request.headers.get("x-user-is-department-head") == "1"
|
return request.headers.get("x-user-is-department-head") == "1"
|
||||||
|
|
||||||
|
|||||||
@@ -7,14 +7,14 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
|||||||
|
|
||||||
from parser_bot.access import (
|
from parser_bot.access import (
|
||||||
is_admin_request,
|
is_admin_request,
|
||||||
portal_department_id,
|
portal_department_ids,
|
||||||
require_department_manager,
|
require_department_manager,
|
||||||
require_telegram_auth_manager,
|
require_telegram_auth_manager,
|
||||||
)
|
)
|
||||||
from parser_bot.config import settings
|
from parser_bot.config import settings
|
||||||
from parser_bot.db.models import Channel, Section
|
from parser_bot.db.models import Channel, Section
|
||||||
from parser_bot.db.session import get_session
|
from parser_bot.db.session import get_session
|
||||||
from parser_bot.scheduler.poller import backfill_media, poll_channel
|
from parser_bot.scheduler.poller import PollError, backfill_media, poll_channel
|
||||||
from parser_bot.telegram import client as tg
|
from parser_bot.telegram import client as tg
|
||||||
|
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
@@ -39,13 +39,13 @@ class AuthCodeResult(BaseModel):
|
|||||||
needs_password: bool
|
needs_password: bool
|
||||||
|
|
||||||
|
|
||||||
def _department_scope(request: Request) -> str | None:
|
def _department_scopes(request: Request) -> list[str] | None:
|
||||||
if is_admin_request(request):
|
if is_admin_request(request):
|
||||||
return None
|
return None
|
||||||
dept_id = portal_department_id(request)
|
dept_ids = portal_department_ids(request)
|
||||||
if not dept_id:
|
if not dept_ids:
|
||||||
raise HTTPException(status_code=403, detail="department is required")
|
raise HTTPException(status_code=403, detail="department is required")
|
||||||
return dept_id
|
return dept_ids
|
||||||
|
|
||||||
|
|
||||||
async def _require_channel_scope(
|
async def _require_channel_scope(
|
||||||
@@ -55,7 +55,7 @@ async def _require_channel_scope(
|
|||||||
vertical: str | None,
|
vertical: str | None,
|
||||||
section: str | None,
|
section: str | None,
|
||||||
) -> None:
|
) -> None:
|
||||||
department_id = _department_scope(request)
|
department_ids = _department_scopes(request)
|
||||||
stmt = (
|
stmt = (
|
||||||
select(Channel.id)
|
select(Channel.id)
|
||||||
.join(Section, Section.id == Channel.section_id)
|
.join(Section, Section.id == Channel.section_id)
|
||||||
@@ -65,13 +65,25 @@ async def _require_channel_scope(
|
|||||||
stmt = stmt.where(Channel.vertical == vertical)
|
stmt = stmt.where(Channel.vertical == vertical)
|
||||||
if section:
|
if section:
|
||||||
stmt = stmt.where(Section.slug == section)
|
stmt = stmt.where(Section.slug == section)
|
||||||
if department_id is not None:
|
if department_ids is not None:
|
||||||
stmt = stmt.where(Section.department_id == department_id)
|
stmt = stmt.where(Section.department_id.in_(department_ids))
|
||||||
exists = (await session.execute(stmt)).scalar_one_or_none()
|
exists = (await session.execute(stmt)).scalar_one_or_none()
|
||||||
if exists is None:
|
if exists is None:
|
||||||
raise HTTPException(status_code=404)
|
raise HTTPException(status_code=404)
|
||||||
|
|
||||||
|
|
||||||
|
def _poll_skipped_result(exc: PollError) -> dict[str, Any]:
|
||||||
|
result: dict[str, Any] = {
|
||||||
|
"inserted": 0,
|
||||||
|
"status": "skipped",
|
||||||
|
"code": exc.code,
|
||||||
|
"message": exc.message,
|
||||||
|
}
|
||||||
|
if exc.retry_after is not None:
|
||||||
|
result["retry_after"] = exc.retry_after
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
@router.get("/auth/status", response_model=AuthStatus, dependencies=[Depends(require_telegram_auth_manager)])
|
@router.get("/auth/status", response_model=AuthStatus, dependencies=[Depends(require_telegram_auth_manager)])
|
||||||
async def auth_status() -> AuthStatus:
|
async def auth_status() -> AuthStatus:
|
||||||
authorized = await tg.is_authorized()
|
authorized = await tg.is_authorized()
|
||||||
@@ -126,10 +138,13 @@ async def trigger_poll(
|
|||||||
vertical: str | None = Query(None),
|
vertical: str | None = Query(None),
|
||||||
section: str | None = Query(None),
|
section: str | None = Query(None),
|
||||||
session: AsyncSession = Depends(get_session),
|
session: AsyncSession = Depends(get_session),
|
||||||
) -> dict[str, int]:
|
) -> dict[str, Any]:
|
||||||
await _require_channel_scope(session, request, channel_id, vertical, section)
|
await _require_channel_scope(session, request, channel_id, vertical, section)
|
||||||
inserted = await poll_channel(channel_id)
|
try:
|
||||||
return {"inserted": inserted}
|
inserted = await poll_channel(channel_id)
|
||||||
|
except PollError as exc:
|
||||||
|
return _poll_skipped_result(exc)
|
||||||
|
return {"inserted": inserted, "status": "ok"}
|
||||||
|
|
||||||
|
|
||||||
@router.post(
|
@router.post(
|
||||||
@@ -167,7 +182,7 @@ async def trigger_poll_all(
|
|||||||
section: str | None = Query(None),
|
section: str | None = Query(None),
|
||||||
session: AsyncSession = Depends(get_session),
|
session: AsyncSession = Depends(get_session),
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
department_id = _department_scope(request)
|
department_ids = _department_scopes(request)
|
||||||
stmt = (
|
stmt = (
|
||||||
select(Channel.id)
|
select(Channel.id)
|
||||||
.join(Section, Section.id == Channel.section_id)
|
.join(Section, Section.id == Channel.section_id)
|
||||||
@@ -175,8 +190,8 @@ async def trigger_poll_all(
|
|||||||
)
|
)
|
||||||
if section:
|
if section:
|
||||||
stmt = stmt.where(Section.slug == section)
|
stmt = stmt.where(Section.slug == section)
|
||||||
if department_id is not None:
|
if department_ids is not None:
|
||||||
stmt = stmt.where(Section.department_id == department_id)
|
stmt = stmt.where(Section.department_id.in_(department_ids))
|
||||||
result = await session.execute(stmt)
|
result = await session.execute(stmt)
|
||||||
ids = [row[0] for row in result.all()]
|
ids = [row[0] for row in result.all()]
|
||||||
background.add_task(_poll_all_in_background, ids)
|
background.add_task(_poll_all_in_background, ids)
|
||||||
|
|||||||
@@ -28,6 +28,12 @@ class Settings(BaseSettings):
|
|||||||
|
|
||||||
media_dir: str = Field("/data/media", alias="MEDIA_DIR")
|
media_dir: str = Field("/data/media", alias="MEDIA_DIR")
|
||||||
media_max_bytes: int = Field(20 * 1024 * 1024, alias="MEDIA_MAX_BYTES")
|
media_max_bytes: int = Field(20 * 1024 * 1024, alias="MEDIA_MAX_BYTES")
|
||||||
|
minio_endpoint: str = Field("", alias="MINIO_ENDPOINT")
|
||||||
|
minio_access_key: str = Field("", alias="MINIO_ACCESS_KEY")
|
||||||
|
minio_secret_key: str = Field("", alias="MINIO_SECRET_KEY")
|
||||||
|
minio_bucket: str = Field("monitoring-tg-media", alias="MINIO_BUCKET")
|
||||||
|
minio_use_ssl: bool = Field(True, alias="MINIO_USE_SSL")
|
||||||
|
minio_region: str = Field("us-east-1", alias="MINIO_REGION")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def database_url(self) -> str:
|
def database_url(self) -> str:
|
||||||
|
|||||||
@@ -54,12 +54,16 @@ class Section(Base):
|
|||||||
|
|
||||||
class Channel(Base):
|
class Channel(Base):
|
||||||
__tablename__ = "channels"
|
__tablename__ = "channels"
|
||||||
|
__table_args__ = (
|
||||||
|
UniqueConstraint("section_id", "identifier", name="uq_channels_section_identifier"),
|
||||||
|
Index("ix_channels_source_channel_id", "source_channel_id"),
|
||||||
|
)
|
||||||
|
|
||||||
id: Mapped[int] = mapped_column(primary_key=True)
|
id: Mapped[int] = mapped_column(primary_key=True)
|
||||||
# Telegram numeric channel id (peer id), nullable until first resolve
|
# Telegram numeric channel id (peer id), nullable until first resolve
|
||||||
tg_id: Mapped[int | None] = mapped_column(BigInteger, unique=True, nullable=True)
|
tg_id: Mapped[int | None] = mapped_column(BigInteger, unique=True, nullable=True)
|
||||||
# Username or t.me/joinchat link supplied by user
|
# Username or t.me/joinchat link supplied by user
|
||||||
identifier: Mapped[str] = mapped_column(String(255), unique=True)
|
identifier: Mapped[str] = mapped_column(String(255))
|
||||||
title: Mapped[str | None] = mapped_column(String(512), nullable=True)
|
title: Mapped[str | None] = mapped_column(String(512), nullable=True)
|
||||||
# 'real_estate' or 'hr' — picks which LLM prompt and lead schema is used
|
# 'real_estate' or 'hr' — picks which LLM prompt and lead schema is used
|
||||||
vertical: Mapped[str] = mapped_column(
|
vertical: Mapped[str] = mapped_column(
|
||||||
@@ -68,9 +72,16 @@ class Channel(Base):
|
|||||||
section_id: Mapped[int] = mapped_column(
|
section_id: Mapped[int] = mapped_column(
|
||||||
ForeignKey("sections.id", ondelete="RESTRICT"), index=True
|
ForeignKey("sections.id", ondelete="RESTRICT"), index=True
|
||||||
)
|
)
|
||||||
|
source_channel_id: Mapped[int | None] = mapped_column(
|
||||||
|
ForeignKey("channels.id", ondelete="SET NULL"), nullable=True
|
||||||
|
)
|
||||||
is_active: Mapped[bool] = mapped_column(default=True, server_default="true")
|
is_active: Mapped[bool] = mapped_column(default=True, server_default="true")
|
||||||
last_message_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
|
last_message_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
|
||||||
last_polled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
|
last_polled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
|
||||||
|
last_poll_status: Mapped[str | None] = mapped_column(String(32), nullable=True)
|
||||||
|
last_poll_error_code: Mapped[str | None] = mapped_column(String(64), nullable=True)
|
||||||
|
last_poll_error: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||||
|
last_poll_error_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
|
||||||
created_at: Mapped[datetime] = mapped_column(
|
created_at: Mapped[datetime] = mapped_column(
|
||||||
DateTime(timezone=True), server_default=func.now()
|
DateTime(timezone=True), server_default=func.now()
|
||||||
)
|
)
|
||||||
@@ -112,6 +123,24 @@ class Message(Base):
|
|||||||
channel: Mapped[Channel] = relationship(back_populates="messages")
|
channel: Mapped[Channel] = relationship(back_populates="messages")
|
||||||
|
|
||||||
|
|
||||||
|
class MessageClassification(Base):
    """Classification verdict stored for one message within one section.

    A (message_id, section_id) pair is unique. Rows are removed together
    with their message or section (``ondelete="CASCADE"`` on both keys).
    """

    __tablename__ = "message_classifications"
    __table_args__ = (
        UniqueConstraint("message_id", "section_id", name="uq_message_classification_section"),
        # NOTE(review): the unique constraint above already leads with
        # message_id, so this index may be redundant - confirm before
        # dropping it (that would need a migration).
        Index("ix_message_classifications_message", "message_id"),
        Index("ix_message_classifications_section", "section_id"),
    )

    id: Mapped[int] = mapped_column(primary_key=True)
    message_id: Mapped[int] = mapped_column(ForeignKey("messages.id", ondelete="CASCADE"))
    section_id: Mapped[int] = mapped_column(ForeignKey("sections.id", ondelete="CASCADE"))
    vertical: Mapped[str] = mapped_column(String(32))
    # JSON verdict payload; its schema is not defined in this model.
    verdict: Mapped[dict] = mapped_column(JSONB)
    # onupdate keeps the timestamp current on ORM-issued UPDATEs; with only
    # a server default it would stay at the insertion time.
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
    )
|
||||||
|
|
||||||
|
|
||||||
class AppSetting(Base):
|
class AppSetting(Base):
|
||||||
"""Runtime-editable settings, edited from the UI without a restart."""
|
"""Runtime-editable settings, edited from the UI without a restart."""
|
||||||
|
|
||||||
|
|||||||
@@ -2,8 +2,9 @@ from datetime import datetime, timezone
|
|||||||
|
|
||||||
import structlog
|
import structlog
|
||||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||||
from sqlalchemy import func, select
|
from sqlalchemy import func, or_, select
|
||||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||||
|
from sqlalchemy.exc import IntegrityError
|
||||||
|
|
||||||
from parser_bot.config import settings
|
from parser_bot.config import settings
|
||||||
from parser_bot.db.models import Channel, Message
|
from parser_bot.db.models import Channel, Message
|
||||||
@@ -19,24 +20,152 @@ from parser_bot.telegram.client import (
|
|||||||
log = structlog.get_logger()
|
log = structlog.get_logger()
|
||||||
|
|
||||||
|
|
||||||
|
class PollError(RuntimeError):
|
||||||
|
status_code = 400
|
||||||
|
code = "poll_failed"
|
||||||
|
retry_after: int | None = None
|
||||||
|
|
||||||
|
def __init__(self, message: str) -> None:
|
||||||
|
super().__init__(message)
|
||||||
|
self.message = message
|
||||||
|
|
||||||
|
|
||||||
|
class PollUnauthorizedError(PollError):
    """Poll refused because the Telegram client is not authorized."""

    status_code = 401
    code = "telegram_not_authorized"
|
||||||
|
|
||||||
|
|
||||||
|
class PollChannelUnavailableError(PollError):
    """The channel could not be reached or resolved through Telegram."""

    status_code = 422
    code = "telegram_channel_unavailable"
|
||||||
|
|
||||||
|
|
||||||
|
class PollFloodWaitError(PollError):
    """Telegram asked the client to wait before retrying.

    ``retry_after`` holds the wait passed to the constructor (seconds, or
    None when it is unknown).
    """

    status_code = 429
    code = "telegram_flood_wait"

    def __init__(self, seconds: int | None) -> None:
        self.retry_after = seconds
        # A missing or zero wait is worded as "a while" in the message.
        wait = f"{seconds} sec" if seconds else "a while"
        super().__init__(f"Telegram asked to wait {wait} before retrying")
|
||||||
|
|
||||||
|
|
||||||
|
class PollDuplicateChannelError(PollError):
    """The resolved Telegram channel is already attached to another row."""

    status_code = 409
    code = "telegram_channel_duplicate"
|
||||||
|
|
||||||
|
|
||||||
|
def _translate_telegram_error(exc: Exception, identifier: str) -> PollError:
    """Convert an exception raised during a Telegram call into a ``PollError``.

    Classification is by the exception's class name first, then by
    substrings of its lower-cased message.

    Args:
        exc: the exception that was raised.
        identifier: channel identifier, used only in the fallback message.

    Returns:
        ``PollFloodWaitError`` for a ``FloodWaitError``,
        ``PollChannelUnavailableError`` for the known "channel unavailable"
        error names or message markers, otherwise a plain ``PollError``.
    """
    error_name = type(exc).__name__
    message = str(exc)

    if error_name == "FloodWaitError":
        # ``seconds`` is read defensively and passed on as None when absent.
        return PollFloodWaitError(getattr(exc, "seconds", None))

    unavailable_names = {
        "ChannelPrivateError",
        "UsernameInvalidError",
        "UsernameNotOccupiedError",
        "InviteHashExpiredError",
        "InviteHashInvalidError",
        "ChatAdminRequiredError",
    }
    if error_name in unavailable_names:
        return PollChannelUnavailableError(message)

    unavailable_markers = (
        "cannot get entity",
        "cannot find any entity",
        "not part of",
        "join the group",
        "invalid channel",
    )
    lowered = message.lower()
    if any(marker in lowered for marker in unavailable_markers):
        return PollChannelUnavailableError(message)

    return PollError(f"Cannot poll {identifier}: {message}")
|
||||||
|
|
||||||
|
|
||||||
async def poll_channel(channel_id: int) -> int:
|
async def poll_channel(channel_id: int) -> int:
|
||||||
"""Poll one channel for new messages. Returns count of inserted rows."""
|
"""Poll one channel for new messages. Returns count of inserted rows."""
|
||||||
|
try:
|
||||||
|
return await _poll_channel(channel_id)
|
||||||
|
except PollError as exc:
|
||||||
|
await _record_poll_error(channel_id, exc.code, exc.message)
|
||||||
|
raise
|
||||||
|
except Exception as exc:
|
||||||
|
await _record_poll_error(channel_id, "poll_failed", str(exc))
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
async def _poll_channel(channel_id: int) -> int:
|
||||||
|
if not await is_authorized():
|
||||||
|
raise PollUnauthorizedError(
|
||||||
|
"Telegram is not authorized: open Monitoring TG in Portal and authorize it"
|
||||||
|
)
|
||||||
|
|
||||||
async with session_scope() as session:
|
async with session_scope() as session:
|
||||||
channel = await session.get(Channel, channel_id)
|
channel = await session.get(Channel, channel_id)
|
||||||
if channel is None or not channel.is_active:
|
if channel is None or not channel.is_active:
|
||||||
return 0
|
return 0
|
||||||
|
if channel.source_channel_id is not None:
|
||||||
|
source = await session.get(Channel, channel.source_channel_id)
|
||||||
|
if source is not None:
|
||||||
|
channel.tg_id = None
|
||||||
|
channel.title = channel.title or source.title
|
||||||
|
channel.last_message_id = source.last_message_id
|
||||||
|
channel.last_polled_at = source.last_polled_at
|
||||||
|
channel.last_poll_status = "alias"
|
||||||
|
channel.last_poll_error_code = None
|
||||||
|
channel.last_poll_error = None
|
||||||
|
channel.last_poll_error_at = None
|
||||||
|
return 0
|
||||||
|
|
||||||
if channel.tg_id is None or channel.title is None:
|
if channel.tg_id is None or channel.title is None:
|
||||||
resolved = await resolve_channel(channel.identifier)
|
try:
|
||||||
|
resolved = await resolve_channel(channel.identifier)
|
||||||
|
except Exception as exc:
|
||||||
|
raise _translate_telegram_error(exc, channel.identifier) from exc
|
||||||
|
|
||||||
|
duplicate_id = (
|
||||||
|
await session.execute(
|
||||||
|
select(Channel.id).where(
|
||||||
|
Channel.tg_id == resolved.tg_id,
|
||||||
|
Channel.id != channel.id,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
).scalar_one_or_none()
|
||||||
|
if duplicate_id is not None:
|
||||||
|
source = await session.get(Channel, duplicate_id)
|
||||||
|
channel.source_channel_id = duplicate_id
|
||||||
|
channel.tg_id = None
|
||||||
|
channel.title = channel.title or resolved.title or (source.title if source else None)
|
||||||
|
channel.last_message_id = source.last_message_id if source else channel.last_message_id
|
||||||
|
channel.last_polled_at = source.last_polled_at if source else channel.last_polled_at
|
||||||
|
channel.last_poll_status = "alias"
|
||||||
|
channel.last_poll_error_code = None
|
||||||
|
channel.last_poll_error = None
|
||||||
|
channel.last_poll_error_at = None
|
||||||
|
log.info(
|
||||||
|
"linked_channel_alias",
|
||||||
|
channel_id=channel.id,
|
||||||
|
source_channel_id=duplicate_id,
|
||||||
|
tg_id=resolved.tg_id,
|
||||||
|
)
|
||||||
|
return 0
|
||||||
channel.tg_id = resolved.tg_id
|
channel.tg_id = resolved.tg_id
|
||||||
channel.title = resolved.title
|
channel.title = resolved.title
|
||||||
|
try:
|
||||||
|
await session.flush()
|
||||||
|
except IntegrityError as exc:
|
||||||
|
raise PollDuplicateChannelError(
|
||||||
|
"Telegram channel is already connected to another channel"
|
||||||
|
) from exc
|
||||||
|
|
||||||
msgs = await fetch_new_messages(
|
try:
|
||||||
channel.identifier,
|
msgs = await fetch_new_messages(
|
||||||
min_id=channel.last_message_id,
|
channel.identifier,
|
||||||
limit=settings.poll_history_limit,
|
min_id=channel.last_message_id,
|
||||||
download_media_for_channel_id=channel.id,
|
limit=settings.poll_history_limit,
|
||||||
)
|
download_media_for_channel_id=channel.id,
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
raise _translate_telegram_error(exc, channel.identifier) from exc
|
||||||
|
|
||||||
inserted = 0
|
inserted = 0
|
||||||
for m in msgs:
|
for m in msgs:
|
||||||
@@ -70,6 +199,10 @@ async def poll_channel(channel_id: int) -> int:
|
|||||||
channel.last_message_id or 0, msgs[-1].tg_message_id
|
channel.last_message_id or 0, msgs[-1].tg_message_id
|
||||||
)
|
)
|
||||||
channel.last_polled_at = datetime.now(timezone.utc)
|
channel.last_polled_at = datetime.now(timezone.utc)
|
||||||
|
channel.last_poll_status = "ok"
|
||||||
|
channel.last_poll_error_code = None
|
||||||
|
channel.last_poll_error = None
|
||||||
|
channel.last_poll_error_at = None
|
||||||
|
|
||||||
log.info(
|
log.info(
|
||||||
"polled_channel",
|
"polled_channel",
|
||||||
@@ -81,6 +214,17 @@ async def poll_channel(channel_id: int) -> int:
|
|||||||
return inserted
|
return inserted
|
||||||
|
|
||||||
|
|
||||||
|
async def _record_poll_error(channel_id: int, code: str, message: str) -> None:
    """Store the outcome of a failed poll on the channel row (best effort).

    Sets ``last_poll_status`` to "error" together with the error code, the
    message truncated to 1000 characters and the current UTC time. Does
    nothing when the channel does not exist.

    This runs while another exception is being handled, so a failure to
    record is logged and swallowed rather than allowed to replace the
    original poll error.

    Args:
        channel_id: primary key of the channel to update.
        code: short machine-readable error code.
        message: human-readable error text.
    """
    try:
        async with session_scope() as session:
            channel = await session.get(Channel, channel_id)
            if channel is None:
                return
            channel.last_poll_status = "error"
            channel.last_poll_error_code = code
            channel.last_poll_error = message[:1000]
            channel.last_poll_error_at = datetime.now(timezone.utc)
    except Exception as exc:
        log.warning(
            "poll_error_record_failed",
            channel_id=channel_id,
            code=code,
            error=str(exc),
        )
|
||||||
|
|
||||||
|
|
||||||
async def poll_all() -> None:
|
async def poll_all() -> None:
|
||||||
if not await is_authorized():
|
if not await is_authorized():
|
||||||
log.debug("poll_skipped_not_authorized")
|
log.debug("poll_skipped_not_authorized")
|
||||||
@@ -93,6 +237,13 @@ async def poll_all() -> None:
|
|||||||
for channel_id in ids:
|
for channel_id in ids:
|
||||||
try:
|
try:
|
||||||
await poll_channel(channel_id)
|
await poll_channel(channel_id)
|
||||||
|
except PollError as exc:
|
||||||
|
log.warning(
|
||||||
|
"poll_skipped",
|
||||||
|
channel_id=channel_id,
|
||||||
|
code=exc.code,
|
||||||
|
error=exc.message,
|
||||||
|
)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
log.error("poll_failed", channel_id=channel_id, error=str(exc))
|
log.error("poll_failed", channel_id=channel_id, error=str(exc))
|
||||||
|
|
||||||
@@ -111,10 +262,15 @@ async def backfill_media(channel_id: int, batch_size: int = 50) -> dict[str, int
|
|||||||
if channel is None:
|
if channel is None:
|
||||||
raise RuntimeError("channel not found")
|
raise RuntimeError("channel not found")
|
||||||
|
|
||||||
|
missing_media_condition = or_(
|
||||||
|
Message.media_files.is_(None),
|
||||||
|
func.jsonb_array_length(Message.media_files) == 0,
|
||||||
|
)
|
||||||
|
|
||||||
pending_q = select(func.count(Message.id)).where(
|
pending_q = select(func.count(Message.id)).where(
|
||||||
Message.channel_id == channel_id,
|
Message.channel_id == channel_id,
|
||||||
Message.has_media.is_(True),
|
Message.has_media.is_(True),
|
||||||
Message.media_files.is_(None),
|
missing_media_condition,
|
||||||
)
|
)
|
||||||
pending_total = (await session.execute(pending_q)).scalar_one()
|
pending_total = (await session.execute(pending_q)).scalar_one()
|
||||||
|
|
||||||
@@ -124,7 +280,7 @@ async def backfill_media(channel_id: int, batch_size: int = 50) -> dict[str, int
|
|||||||
.where(
|
.where(
|
||||||
Message.channel_id == channel_id,
|
Message.channel_id == channel_id,
|
||||||
Message.has_media.is_(True),
|
Message.has_media.is_(True),
|
||||||
Message.media_files.is_(None),
|
missing_media_condition,
|
||||||
)
|
)
|
||||||
.order_by(Message.tg_message_id.asc())
|
.order_by(Message.tg_message_id.asc())
|
||||||
.limit(batch_size)
|
.limit(batch_size)
|
||||||
|
|||||||
55
src/parser_bot/storage.py
Normal file
55
src/parser_bot/storage.py
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
import asyncio
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from minio import Minio
|
||||||
|
|
||||||
|
from parser_bot.config import settings
|
||||||
|
|
||||||
|
_client: Minio | None = None
|
||||||
|
_bucket_ready = False
|
||||||
|
|
||||||
|
|
||||||
|
def configured() -> bool:
    """Return True when endpoint, access key, secret key and bucket are all set."""
    return all(
        (
            settings.minio_endpoint,
            settings.minio_access_key,
            settings.minio_secret_key,
            settings.minio_bucket,
        )
    )
|
||||||
|
|
||||||
|
|
||||||
|
def client() -> Minio | None:
    """Return the shared MinIO client, creating it on first use.

    Returns:
        The cached ``Minio`` instance, or None when storage is not
        configured (see ``configured``).
    """
    global _client
    if not configured():
        return None
    if _client is not None:
        return _client

    # Any http(s):// scheme is stripped from the endpoint; TLS is chosen
    # separately through the ``secure`` flag.
    endpoint = settings.minio_endpoint.removeprefix("https://").removeprefix("http://")
    _client = Minio(
        endpoint,
        access_key=settings.minio_access_key,
        secret_key=settings.minio_secret_key,
        secure=settings.minio_use_ssl,
        region=settings.minio_region,
    )
    return _client
|
||||||
|
|
||||||
|
|
||||||
|
async def upload_file(path: Path, key: str, content_type: str | None) -> None:
    """Upload a local file to the configured MinIO bucket under ``key``.

    The blocking MinIO calls run in a worker thread via
    ``asyncio.to_thread``. Until one upload has got past the bucket check,
    the bucket is looked up and created if it does not exist.

    Args:
        path: local file to upload.
        key: object name inside the bucket.
        content_type: MIME type; "application/octet-stream" is used when
            it is None or empty.

    Raises:
        RuntimeError: if MinIO is not configured.
    """
    minio_client = client()
    if minio_client is None:
        raise RuntimeError("minio is not configured")

    def _upload() -> None:
        global _bucket_ready
        if not _bucket_ready:
            if not minio_client.bucket_exists(settings.minio_bucket):
                minio_client.make_bucket(settings.minio_bucket, location=settings.minio_region)
            _bucket_ready = True
        minio_client.fput_object(
            settings.minio_bucket,
            key,
            str(path),
            content_type=content_type or "application/octet-stream",
        )

    await asyncio.to_thread(_upload)
|
||||||
@@ -15,6 +15,7 @@ from telethon.tl.types import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
from parser_bot.config import settings
|
from parser_bot.config import settings
|
||||||
|
from parser_bot import storage
|
||||||
|
|
||||||
log = structlog.get_logger()
|
log = structlog.get_logger()
|
||||||
|
|
||||||
@@ -136,9 +137,26 @@ async def _download_message_media(
|
|||||||
if path is None:
|
if path is None:
|
||||||
info["skipped"] = "no_file"
|
info["skipped"] = "no_file"
|
||||||
return [info]
|
return [info]
|
||||||
filename = Path(path).name
|
file_path = Path(path)
|
||||||
|
filename = file_path.name
|
||||||
|
media_key = f"{channel_id}/{filename}"
|
||||||
public_base = settings.public_base_path.rstrip("/")
|
public_base = settings.public_base_path.rstrip("/")
|
||||||
info["url"] = f"{public_base}/media/{channel_id}/{filename}"
|
info["url"] = f"{public_base}/media/{media_key}"
|
||||||
|
info["key"] = media_key
|
||||||
|
info["name"] = filename
|
||||||
|
if storage.configured():
|
||||||
|
try:
|
||||||
|
await storage.upload_file(file_path, media_key, mime)
|
||||||
|
info["storage"] = "minio"
|
||||||
|
try:
|
||||||
|
file_path.unlink()
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
except Exception as exc:
|
||||||
|
log.warning("media_minio_upload_failed", msg_id=msg.id, key=media_key, error=str(exc))
|
||||||
|
info["storage"] = "local"
|
||||||
|
else:
|
||||||
|
info["storage"] = "local"
|
||||||
return [info]
|
return [info]
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user