From 8259a01a88b9f2b750d0568ffca10bd92335cea5 Mon Sep 17 00:00:00 2001 From: Grendgi Date: Mon, 8 Jun 2026 15:47:42 +0300 Subject: [PATCH] Route monitoring TG classification through AI service --- README.md | 6 +- cmd/classifier/main.go | 93 +++++++--------- cmd/server/main.go | 46 +++++--- internal/aiservice/client.go | 207 +++++++++++++++++++++++++++++++++++ k8s/configmap.yaml | 2 +- k8s/secrets.yaml | 2 +- 6 files changed, 281 insertions(+), 75 deletions(-) create mode 100644 internal/aiservice/client.go diff --git a/README.md b/README.md index 252cbf6..d7a9d0b 100644 --- a/README.md +++ b/README.md @@ -4,8 +4,8 @@ Backend-сервис мониторинга Telegram-каналов для Porta AI-классификация работают на Go, Python оставлен только как внутренний MTProto/Telethon-адаптер для авторизации, опроса каналов и дозагрузки медиа. Сервис сохраняет сообщения в Postgres, раскладывает каналы по -вертикалям/подразделам и выполняет AI-анализ через OpenAI-compatible endpoint, -общий с другими сервисами портала. +вертикалям/подразделам и выполняет AI-анализ через общий `ai-service`, +который уже сам обращается к OpenAI-compatible backend. Пользовательский UI живёт в `portal/frontend/src/app/features/monitoring-tg`. Этот сервис не отдаёт отдельные HTML-страницы и работает как API/worker за @@ -33,7 +33,7 @@ MTProto/Telethon-адаптер для авторизации, опроса ка ## Запуск в k8s Манифесты лежат в `k8s/`. Перед применением нужно заполнить `k8s/secrets.yaml` -реальными Telegram-кредами и, при необходимости, `LLM_API_KEY`. +реальными Telegram-кредами и `AI_SERVICE_TOKEN`. ```bash kubectl apply -k k8s diff --git a/cmd/classifier/main.go b/cmd/classifier/main.go index d531bfc..cf35f5b 100644 --- a/cmd/classifier/main.go +++ b/cmd/classifier/main.go @@ -1,14 +1,11 @@ package main import ( - "bytes" "context" "encoding/json" "errors" "fmt" - "io" "log/slog" - "net/http" "net/url" "os" "os/signal" @@ -17,6 +14,8 @@ import ( "syscall" "time" + "monitoring-tg/internal/aiservice" + "github.com/jackc/pgx/v5/pgxpool" ) @@ -33,14 +32,14 @@ type config struct { PostgresPort int LLMEnabled bool - LLMBaseURL string - LLMAPIKey string LLMModel string LLMTimeout time.Duration LLMMaxTokens int LLMMinTextLength int ClassifyInterval time.Duration ClassifyBatchSize int + AIServiceURL string + AIServiceToken string } type pendingMessage struct { @@ -52,29 +51,10 @@ type pendingMessage struct { Extracted map[string]any } -type chatRequest struct { - Model string `json:"model"` - Messages []chatMessage `json:"messages"` - Temperature float64 `json:"temperature"` - MaxTokens int `json:"max_tokens"` - ResponseFormat responseFmt `json:"response_format"` -} - -type chatMessage struct { - Role string `json:"role"` - Content string `json:"content"` -} - type responseFmt struct { Type string `json:"type"` } -type chatResponse struct { - Choices []struct { - Message chatMessage `json:"message"` - } `json:"choices"` -} - func main() { cfg := loadConfig() logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) @@ -95,7 +75,11 @@ func main() { } defer pool.Close() - worker := &classifier{cfg: cfg, db: pool, http: &http.Client{Timeout: cfg.LLMTimeout}} + worker := &classifier{ + cfg: cfg, + db: pool, + ai: aiservice.New(cfg.AIServiceURL, cfg.AIServiceToken, cfg.LLMTimeout), + } slog.Info( "classifier_started", "interval", cfg.ClassifyInterval.String(), @@ -124,9 +108,9 @@ func main() { } type classifier struct { - cfg config - db *pgxpool.Pool - http *http.Client + cfg config + db *pgxpool.Pool + ai *aiservice.Client } func (c *classifier) runOnce(ctx context.Context) (int, error) { @@ -218,50 +202,53 @@ func (c *classifier) classify(ctx context.Context, msg pendingMessage) (json.Raw return nil, err } - payload := chatRequest{ - Model: c.cfg.LLMModel, - Messages: []chatMessage{ + responseFormat, _ := json.Marshal(responseFmt{Type: "json_object"}) + payload := aiservice.ChatInput{ + Messages: []aiservice.Message{ {Role: "system", Content: systemPrompt}, {Role: "user", Content: buildUserPrompt(msg.Text)}, }, Temperature: 0.1, MaxTokens: c.cfg.LLMMaxTokens, - ResponseFormat: responseFmt{Type: "json_object"}, + ResponseFormat: responseFormat, } body, err := json.Marshal(payload) if err != nil { return nil, err } - req, err := http.NewRequestWithContext(ctx, http.MethodPost, strings.TrimRight(c.cfg.LLMBaseURL, "/")+"/v1/chat/completions", bytes.NewReader(body)) + job, err := c.ai.CreateJob(ctx, aiservice.CreateJobRequest{ + OwnerService: "monitoring-tg", + OwnerRef: fmt.Sprintf("%d", msg.ID), + TaskType: "telegram_classification", + ModelProfile: c.cfg.LLMModel, + Priority: 5, + MaxAttempts: 2, + Input: body, + IdempotencyKey: fmt.Sprintf("monitoring-tg:telegram_classification:%d", msg.ID), + }) if err != nil { return nil, err } - req.Header.Set("Content-Type", "application/json") - if c.cfg.LLMAPIKey != "" { - req.Header.Set("Authorization", "Bearer "+c.cfg.LLMAPIKey) - } - - resp, err := c.http.Do(req) + waitCtx, cancel := context.WithTimeout(ctx, c.cfg.LLMTimeout) + defer cancel() + job, err = c.ai.WaitJob(waitCtx, job.ID, 2*time.Second) if err != nil { return nil, err } - defer resp.Body.Close() - - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - b, _ := io.ReadAll(io.LimitReader(resp.Body, 512)) - return nil, fmt.Errorf("llm http %d: %s", resp.StatusCode, strings.TrimSpace(string(b))) + if job.Status != "done" { + msg := "ai-service job " + job.Status + if job.ErrorMessage != nil && *job.ErrorMessage != "" { + msg += ": " + *job.ErrorMessage + } + return nil, errors.New(msg) } - - var parsed chatResponse - if err := json.NewDecoder(resp.Body).Decode(&parsed); err != nil { + var parsed aiservice.ChatResult + if err := json.Unmarshal(job.Result, &parsed); err != nil { return nil, err } - if len(parsed.Choices) == 0 { - return nil, errors.New("llm returned no choices") - } - raw := strings.TrimSpace(parsed.Choices[0].Message.Content) + raw := strings.TrimSpace(parsed.Content) if raw == "" { return nil, errors.New("llm returned empty content") } @@ -425,14 +412,14 @@ func loadConfig() config { PostgresHost: env("POSTGRES_HOST", "db"), PostgresPort: envInt("POSTGRES_PORT", 5432), LLMEnabled: envBool("LLM_ENABLED", true), - LLMBaseURL: env("LLM_BASE_URL", "http://10.2.3.5:8002"), - LLMAPIKey: env("LLM_API_KEY", ""), LLMModel: env("LLM_MODEL", "qwen2.5-14b"), LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second, LLMMaxTokens: envInt("LLM_MAX_TOKENS", 600), LLMMinTextLength: envInt("LLM_MIN_TEXT_LENGTH", 20), ClassifyInterval: time.Duration(envInt("LLM_CLASSIFY_INTERVAL_SECONDS", 20)) * time.Second, ClassifyBatchSize: envInt("LLM_CLASSIFY_BATCH_SIZE", 5), + AIServiceURL: env("AI_SERVICE_URL", ""), + AIServiceToken: env("AI_SERVICE_TOKEN", ""), } } diff --git a/cmd/server/main.go b/cmd/server/main.go index d500f38..1f30bff 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -20,6 +20,8 @@ import ( "syscall" "time" + "monitoring-tg/internal/aiservice" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/minio/minio-go/v7" @@ -43,10 +45,10 @@ type config struct { PostgresPort int PollIntervalSeconds int LLMEnabled bool - LLMBaseURL string - LLMAPIKey string LLMModel string LLMTimeout time.Duration + AIServiceURL string + AIServiceToken string MinioEndpoint string MinioAccessKey string MinioSecretKey string @@ -62,6 +64,7 @@ type app struct { http *http.Client python *http.Client minio *minio.Client + ai *aiservice.Client } type accessScope struct { @@ -149,6 +152,7 @@ func main() { http: &http.Client{Timeout: cfg.LLMTimeout}, python: &http.Client{Timeout: 15 * time.Minute}, minio: minioClient, + ai: aiservice.New(cfg.AIServiceURL, cfg.AIServiceToken, cfg.LLMTimeout), } server := &http.Server{ @@ -1118,24 +1122,32 @@ func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Re func (a *app) handleLLMStatus(ctx context.Context, w http.ResponseWriter) { ready := false + var providerError string + model := a.cfg.LLMModel if a.cfg.LLMEnabled { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, strings.TrimRight(a.cfg.LLMBaseURL, "/")+"/v1/models", nil) - if err == nil { - if a.cfg.LLMAPIKey != "" { - req.Header.Set("Authorization", "Bearer "+a.cfg.LLMAPIKey) - } - resp, err := a.http.Do(req) - if err == nil { - ready = resp.StatusCode >= 200 && resp.StatusCode < 300 - _ = resp.Body.Close() + status, err := a.ai.ProvidersStatus(ctx) + if err != nil { + providerError = err.Error() + } else { + for _, provider := range status.Providers { + if provider.Name == "llm" { + ready = provider.Configured && provider.OK + providerError = provider.Error + if provider.Model != "" { + model = provider.Model + } + break + } } } } writeJSON(w, http.StatusOK, map[string]any{ - "enabled": a.cfg.LLMEnabled, - "ready": ready, - "base_url": a.cfg.LLMBaseURL, - "model": a.cfg.LLMModel, + "enabled": a.cfg.LLMEnabled, + "ready": ready, + "base_url": a.cfg.AIServiceURL, + "model": model, + "provider": "ai-service", + "provider_error": providerError, }) } @@ -1777,10 +1789,10 @@ func loadConfig() config { PostgresPort: envInt("POSTGRES_PORT", 5432), PollIntervalSeconds: envInt("POLL_INTERVAL_SECONDS", 60), LLMEnabled: envBool("LLM_ENABLED", true), - LLMBaseURL: env("LLM_BASE_URL", "http://10.2.3.5:8002"), - LLMAPIKey: env("LLM_API_KEY", ""), LLMModel: env("LLM_MODEL", "qwen2.5-14b"), LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second, + AIServiceURL: env("AI_SERVICE_URL", ""), + AIServiceToken: env("AI_SERVICE_TOKEN", ""), MinioEndpoint: env("MINIO_ENDPOINT", ""), MinioAccessKey: env("MINIO_ACCESS_KEY", ""), MinioSecretKey: env("MINIO_SECRET_KEY", ""), diff --git a/internal/aiservice/client.go b/internal/aiservice/client.go new file mode 100644 index 0000000..5be7a3b --- /dev/null +++ b/internal/aiservice/client.go @@ -0,0 +1,207 @@ +package aiservice + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" +) + +type Client struct { + baseURL string + token string + http *http.Client +} + +type Message struct { + Role string `json:"role"` + Content string `json:"content"` +} + +type ChatInput struct { + Messages []Message `json:"messages"` + Temperature float64 `json:"temperature"` + MaxTokens int `json:"max_tokens,omitempty"` + ResponseFormat json.RawMessage `json:"response_format,omitempty"` +} + +type CreateJobRequest struct { + OwnerService string `json:"owner_service"` + OwnerRef string `json:"owner_ref"` + TaskType string `json:"task_type"` + ModelProfile string `json:"model_profile"` + Priority int `json:"priority"` + MaxAttempts int `json:"max_attempts"` + Input json.RawMessage `json:"input"` + IdempotencyKey string `json:"idempotency_key,omitempty"` +} + +type Job struct { + ID string `json:"id"` + Status string `json:"status"` + Result json.RawMessage `json:"result,omitempty"` + ErrorCode *string `json:"error_code,omitempty"` + ErrorMessage *string `json:"error_message,omitempty"` +} + +type ChatResult struct { + Content string `json:"content"` + Model string `json:"model"` + DurationMS int64 `json:"duration_ms"` +} + +type ProvidersStatus struct { + At time.Time `json:"at"` + Providers []ProviderStatus `json:"providers"` +} + +type ProviderStatus struct { + Name string `json:"name"` + Configured bool `json:"configured"` + OK bool `json:"ok"` + URL string `json:"url,omitempty"` + Model string `json:"model,omitempty"` + LatencyMS int64 `json:"latency_ms,omitempty"` + Error string `json:"error,omitempty"` +} + +func New(baseURL, token string, timeout time.Duration) *Client { + baseURL = strings.TrimRight(strings.TrimSpace(baseURL), "/") + if baseURL == "" { + return nil + } + if timeout <= 0 { + timeout = 2 * time.Minute + } + return &Client{ + baseURL: baseURL, + token: strings.TrimSpace(token), + http: &http.Client{Timeout: timeout}, + } +} + +func (c *Client) CreateJob(ctx context.Context, req CreateJobRequest) (*Job, error) { + if c == nil { + return nil, fmt.Errorf("ai-service not configured") + } + body, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("marshal ai job: %w", err) + } + httpReq, err := c.request(ctx, http.MethodPost, "/api/v1/jobs", body) + if err != nil { + return nil, err + } + resp, err := c.http.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("create ai job: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, fmt.Errorf("create ai job: http %d: %s", resp.StatusCode, readSmall(resp.Body)) + } + var out Job + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("decode ai job: %w", err) + } + return &out, nil +} + +func (c *Client) GetJob(ctx context.Context, id string) (*Job, error) { + if c == nil || strings.TrimSpace(id) == "" { + return nil, fmt.Errorf("ai job id is required") + } + req, err := c.request(ctx, http.MethodGet, "/api/v1/jobs/"+id, nil) + if err != nil { + return nil, err + } + resp, err := c.http.Do(req) + if err != nil { + return nil, fmt.Errorf("get ai job: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, fmt.Errorf("get ai job: http %d: %s", resp.StatusCode, readSmall(resp.Body)) + } + var out Job + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("decode ai job: %w", err) + } + return &out, nil +} + +func (c *Client) WaitJob(ctx context.Context, id string, pollInterval time.Duration) (*Job, error) { + if pollInterval <= 0 { + pollInterval = 2 * time.Second + } + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + for { + job, err := c.GetJob(ctx, id) + if err != nil { + return nil, err + } + switch job.Status { + case "done", "failed", "cancelled": + return job, nil + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ticker.C: + } + } +} + +func (c *Client) ProvidersStatus(ctx context.Context) (*ProvidersStatus, error) { + if c == nil { + return nil, fmt.Errorf("ai-service not configured") + } + req, err := c.request(ctx, http.MethodGet, "/api/v1/providers/status", nil) + if err != nil { + return nil, err + } + resp, err := c.http.Do(req) + if err != nil { + return nil, fmt.Errorf("ai providers status: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, fmt.Errorf("ai providers status: http %d: %s", resp.StatusCode, readSmall(resp.Body)) + } + var out ProvidersStatus + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("decode ai providers status: %w", err) + } + return &out, nil +} + +func (c *Client) request(ctx context.Context, method, path string, body []byte) (*http.Request, error) { + var r io.Reader + if body != nil { + r = bytes.NewReader(body) + } + req, err := http.NewRequestWithContext(ctx, method, c.baseURL+path, r) + if err != nil { + return nil, err + } + if body != nil { + req.Header.Set("Content-Type", "application/json") + } + if c.token != "" { + req.Header.Set("Authorization", "Bearer "+c.token) + } + return req, nil +} + +func readSmall(r io.Reader) string { + body, err := io.ReadAll(io.LimitReader(r, 1024)) + if err != nil { + return err.Error() + } + return strings.TrimSpace(string(body)) +} diff --git a/k8s/configmap.yaml b/k8s/configmap.yaml index db0e551..66cefff 100644 --- a/k8s/configmap.yaml +++ b/k8s/configmap.yaml @@ -22,7 +22,7 @@ data: POLL_INTERVAL_SECONDS: "60" POLL_HISTORY_LIMIT: "50" LLM_ENABLED: "1" - LLM_BASE_URL: "http://10.2.3.5:8002" LLM_MODEL: "qwen2.5-14b" LLM_MAX_TOKENS: "600" LLM_CLASSIFIER_OWNER: "go" + AI_SERVICE_URL: "http://ai-service.ai-service.svc.cluster.local:8080" diff --git a/k8s/secrets.yaml b/k8s/secrets.yaml index cafcfaf..55f6ac4 100644 --- a/k8s/secrets.yaml +++ b/k8s/secrets.yaml @@ -10,7 +10,7 @@ stringData: TG_PHONE: "+971524994695" TG_SESSION_STRING: "" POSTGRES_PASSWORD: "parser" - LLM_API_KEY: "sk-111f838ccec43406e078cd9094b6797307cb895236179f32" + AI_SERVICE_TOKEN: "d18bcacf9e02bae1806ee6b6eeda62b95be6a915c0a22936d9a700128b275442" MINIO_ACCESS_KEY: "admjn" MINIO_SECRET_KEY: "TropicalMacaw9Fantasize" ---