Files
monitoring-tg/cmd/server/main.go
Grendgi cdbdea250d
All checks were successful
CI / hygiene (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 45s
CI / go (push) Successful in 31s
CI / python (push) Successful in 2s
feat: expose monitoring tg media repair details
2026-06-17 17:22:50 +03:00

2445 lines
74 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"bytes"
"context"
"crypto/tls"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"
"monitoring-tg/internal/aiservice"
"monitoring-tg/internal/dbretry"
commonmw "gitea.estateliga.work/admin/portal-common/middleware"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
// Vertical identifiers referenced by this service.
const (
// verticalHR is the identifier of the "hr" vertical.
verticalHR = "hr"
)
// config holds the runtime settings returned by loadConfig.
type config struct {
// APIHost and APIPort form the address the HTTP server listens on.
APIHost string
APIPort int
// PublicBasePath is an optional URL prefix that apiPath strips from request paths.
PublicBasePath string
PythonBaseURL string
MediaDir string
// Postgres* fields: presumably the pieces databaseURL assembles into a DSN — TODO confirm.
PostgresUser string
PostgresPassword string
PostgresDB string
PostgresHost string
PostgresPort int
// PollIntervalSeconds feeds the staleness threshold computed in probePoller.
PollIntervalSeconds int
// LLMEnabled gates the AI probes; when false they report "llm disabled".
LLMEnabled bool
LLMModel string
// LLMTimeout is the timeout of both the generic HTTP client and the AI-service client.
LLMTimeout time.Duration
// AIServiceURL and AIServiceToken are passed to aiservice.New.
AIServiceURL string
AIServiceToken string
// InternalAPIKey is handed to commonmw.InternalAuth, which wraps the /api/v1 routes.
InternalAPIKey string
// Minio* fields: presumably consumed by newMinioClient — TODO confirm.
// MinioBucket is also the bucket checked by probeMediaStorage.
MinioEndpoint string
MinioAccessKey string
MinioSecretKey string
MinioBucket string
MinioUseSSL bool
MinioRegion string
MinioInsecureTLS bool
}
// app bundles the dependencies shared by the HTTP handlers.
type app struct {
cfg config
// db is the Postgres connection pool.
db *pgxpool.Pool
// http is a general-purpose client; main sets its timeout to cfg.LLMTimeout.
http *http.Client
// python is a client with a 15-minute timeout (set in main);
// presumably used by proxyPython — TODO confirm.
python *http.Client
// minio may be nil; probeMediaStorage then reports "minio not configured".
minio *minio.Client
// ai is the AI-service client used by the ai_service and ai_jobs probes.
ai *aiservice.Client
}
// accessScope describes the caller's permissions; handleAccessMe exposes it
// as JSON.
type accessScope struct {
// IsAdmin callers bypass the department filter in section and channel queries.
IsAdmin bool
// CanManage is reported as "can_manage_department".
CanManage bool
// CanAuth is reported as "can_auth_telegram".
CanAuth bool
// DeptID is reported as "department_id".
DeptID string
DeptIDs []string
}
// sectionOut is the JSON representation of a section together with its
// aggregate counters.
type sectionOut struct {
ID int64 `json:"id"`
Vertical string `json:"vertical"`
DepartmentID *string `json:"department_id,omitempty"`
Slug string `json:"slug"`
Title string `json:"title"`
Emoji *string `json:"emoji,omitempty"`
Description *string `json:"description,omitempty"`
CreatedAt time.Time `json:"created_at"`
// The counters below are filled by sub-selects in the section queries.
ChannelsTotal int64 `json:"channels_total"`
ChannelsActive int64 `json:"channels_active"`
MessagesTotal int64 `json:"messages_total"`
// LeadsTotal is only computed by listSections; findSection, createSection
// and updateSection select a constant 0 for it.
LeadsTotal int64 `json:"leads_total"`
}
// channelOut is the JSON representation of a channel.
type channelOut struct {
ID int64 `json:"id"`
// TGID, Title, LastMessageID and LastPolledAt fall back to the source
// channel's values in listChannels (COALESCE(c.x, src.x)).
TGID *int64 `json:"tg_id,omitempty"`
// SourceChannelID references another channels row; queries resolve messages
// through COALESCE(source_channel_id, id).
SourceChannelID *int64 `json:"source_channel_id,omitempty"`
Identifier string `json:"identifier"`
Title *string `json:"title,omitempty"`
Vertical string `json:"vertical"`
SectionID int64 `json:"section_id"`
SectionSlug *string `json:"section_slug,omitempty"`
IsActive bool `json:"is_active"`
LastMessageID *int64 `json:"last_message_id,omitempty"`
LastPolledAt *time.Time `json:"last_polled_at,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
// messageOut is the JSON representation of a message. Pointer fields and
// fields tagged omitempty are left out of the output when unset.
type messageOut struct {
ID int64 `json:"id"`
ChannelID int64 `json:"channel_id"`
ChannelVertical *string `json:"channel_vertical,omitempty"`
ChannelSectionSlug *string `json:"channel_section_slug,omitempty"`
TGMessageID int64 `json:"tg_message_id"`
GroupedID *int64 `json:"grouped_id,omitempty"`
GroupSize int `json:"group_size"`
Date time.Time `json:"date"`
Text *string `json:"text,omitempty"`
SenderID *int64 `json:"sender_id,omitempty"`
SenderUsername *string `json:"sender_username,omitempty"`
SenderName *string `json:"sender_name,omitempty"`
PostURL *string `json:"post_url,omitempty"`
HasMedia bool `json:"has_media"`
MediaFiles []map[string]any `json:"media_files,omitempty"`
// Extracted is emitted verbatim as raw JSON.
Extracted json.RawMessage `json:"extracted,omitempty"`
Views *int64 `json:"views,omitempty"`
Forwards *int64 `json:"forwards,omitempty"`
FetchedAt time.Time `json:"fetched_at"`
}
// componentProbe is one entry of the "components" list returned by
// /health/detail.
type componentProbe struct {
// Name identifies the probed component, e.g. "postgres" or "poller".
Name string `json:"name"`
// Status is "ok", "degraded" or "down" in the probes of this file.
Status string `json:"status"`
// LatencyMs is the wall-clock duration of the probe in milliseconds.
LatencyMs int64 `json:"latency_ms"`
// Error carries the failure text or a counter summary; omitted when empty.
Error string `json:"error,omitempty"`
// Details holds optional structured diagnostics; omitted when nil.
Details any `json:"details,omitempty"`
}
// main loads the configuration, connects to Postgres and MinIO, builds the
// app and serves HTTP until SIGINT or SIGTERM is received.
//
// On a signal the server is shut down gracefully with a 10-second budget.
// main waits for that shutdown to finish before returning, so in-flight
// requests are drained instead of being cut off by process exit.
func main() {
	cfg := loadConfig()
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
	slog.SetDefault(logger)

	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	pool, err := dbretry.Connect(ctx, cfg.databaseURL(), 2*time.Minute)
	if err != nil {
		slog.Error("db_connect_failed", "error", err)
		os.Exit(1)
	}
	defer pool.Close()

	minioClient, err := newMinioClient(cfg)
	if err != nil {
		slog.Error("minio_init_failed", "error", err)
		os.Exit(1)
	}

	srvApp := &app{
		cfg:    cfg,
		db:     pool,
		http:   &http.Client{Timeout: cfg.LLMTimeout},
		python: &http.Client{Timeout: 15 * time.Minute},
		minio:  minioClient,
		ai:     aiservice.New(cfg.AIServiceURL, cfg.AIServiceToken, cfg.LLMTimeout),
	}
	server := &http.Server{
		Addr:              fmt.Sprintf("%s:%d", cfg.APIHost, cfg.APIPort),
		Handler:           http.HandlerFunc(srvApp.serveHTTP),
		ReadHeaderTimeout: 10 * time.Second,
	}

	// shutdownDone is closed once Shutdown has returned. ListenAndServe
	// returns ErrServerClosed as soon as Shutdown is *called*, so main must
	// wait on this channel or the process would exit mid-drain.
	shutdownDone := make(chan struct{})
	go func() {
		defer close(shutdownDone)
		<-ctx.Done()
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if err := server.Shutdown(shutdownCtx); err != nil {
			slog.Error("server_shutdown_failed", "error", err)
		}
	}()

	slog.Info("go_api_started", "addr", server.Addr, "python_base_url", cfg.PythonBaseURL)
	if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
		slog.Error("server_failed", "error", err)
		os.Exit(1)
	}
	// Only reached after ErrServerClosed, i.e. Shutdown is already running.
	<-shutdownDone
}
// serveHTTP is the root handler. It answers the unauthenticated endpoints
// (/healthz, /health/detail and /) directly, rejects anything outside
// /api/v1 with 404, and passes the rest to serveAPI behind the
// commonmw.InternalAuth middleware.
func (a *app) serveHTTP(w http.ResponseWriter, r *http.Request) {
	switch route := a.apiPath(r.URL.Path); {
	case route == "/healthz":
		writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
	case route == "/health/detail":
		a.handleHealthDetail(w, r)
	case route == "/":
		writeJSON(w, http.StatusOK, map[string]string{"service": "monitoring-tg", "ui": "portal", "api": "go"})
	case !strings.HasPrefix(route, "/api/v1"):
		writeError(w, http.StatusNotFound, "not found")
	default:
		protected := commonmw.InternalAuth(a.cfg.InternalAPIKey)(http.HandlerFunc(a.serveAPI))
		protected.ServeHTTP(w, r)
	}
}
// serveAPI routes an /api/v1 request to its handler. Matching is done on the
// path with the public base path already stripped; cases are evaluated top
// to bottom and anything unmatched gets a 404.
func (a *app) serveAPI(w http.ResponseWriter, r *http.Request) {
	route := a.apiPath(r.URL.Path)
	ctx := r.Context()

	exact := func(p string) bool { return route == p }
	under := func(prefix string) bool { return strings.HasPrefix(route, prefix) }

	switch {
	case r.Method == http.MethodGet && exact("/api/v1/access/me"):
		a.handleAccessMe(w, r)
	case exact("/api/v1/auth/status") || under("/api/v1/auth/"):
		a.proxyPython(w, r, route)
	case exact("/api/v1/sections"):
		a.handleSections(w, r)
	case under("/api/v1/sections/"):
		a.handleSectionItem(w, r, route)
	case exact("/api/v1/channels"):
		a.handleChannels(w, r)
	case under("/api/v1/channels/"):
		a.handleChannelItem(ctx, w, r, route)
	case under("/api/v1/media/"):
		a.handleMedia(w, r, route)
	case exact("/api/v1/messages"):
		a.handleMessages(ctx, w, r)
	case under("/api/v1/messages/"):
		a.handleMessageItem(ctx, w, r, route)
	case exact("/api/v1/stats"):
		a.handleStats(ctx, w, r)
	case exact("/api/v1/llm/status"):
		a.handleLLMStatus(ctx, w)
	case exact("/api/v1/llm/queue"):
		a.handleLLMQueue(ctx, w, r)
	case exact("/api/v1/llm/prompt"):
		a.handlePrompt(ctx, w, r)
	case r.Method == http.MethodPost && exact("/api/v1/poll"):
		a.proxyPython(w, r, route)
	default:
		writeError(w, http.StatusNotFound, "not found")
	}
}
// apiPath removes the configured public base path from path.
//
// Trailing slashes of the base are ignored. A path equal to the base maps to
// "/"; a path below the base loses the base prefix; any other path, or any
// path when no base is configured, is returned unchanged.
func (a *app) apiPath(path string) string {
	base := strings.TrimRight(a.cfg.PublicBasePath, "/")
	if base == "" {
		return path
	}
	if path == base {
		return "/"
	}
	if strings.HasPrefix(path, base+"/") {
		return path[len(base):]
	}
	return path
}
// handleHealthDetail runs every component probe in a fixed order, one after
// another, under a shared 3-second deadline and returns the results as
// {"components": [...]} with status 200.
func (a *app) handleHealthDetail(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
	defer cancel()

	probes := []func(context.Context) componentProbe{
		a.probePostgres,
		a.probeAIService,
		a.probeClassificationQueue,
		a.probeAIJobs,
		a.probePoller,
		a.probePollErrors,
		a.probeMediaStorage,
		a.probeMediaMetadata,
	}
	components := make([]componentProbe, 0, len(probes))
	for _, run := range probes {
		components = append(components, run(ctx))
	}
	writeJSON(w, http.StatusOK, map[string]any{"components": components})
}
// probePostgres pings the database pool and reports "ok" or, with the ping
// error text, "down".
func (a *app) probePostgres(ctx context.Context) componentProbe {
	began := time.Now()
	result := componentProbe{Name: "postgres", Status: "ok"}
	if pingErr := a.db.Ping(ctx); pingErr != nil {
		result.Status = "down"
		result.Error = pingErr.Error()
	}
	result.LatencyMs = time.Since(began).Milliseconds()
	return result
}
// probeAIService checks the "llm" provider reported by the AI service.
//
// The probe is "ok" only when that provider is both configured and OK. It is
// "down" when the LLM is disabled in config, the status call fails, the
// provider is not ready, or no "llm" provider is listed.
func (a *app) probeAIService(ctx context.Context) componentProbe {
	began := time.Now()
	down := func(reason string) componentProbe {
		return componentProbe{Name: "ai_service", Status: "down", LatencyMs: time.Since(began).Milliseconds(), Error: reason}
	}

	if !a.cfg.LLMEnabled {
		return down("llm disabled")
	}
	status, err := a.ai.ProvidersStatus(ctx)
	if err != nil {
		return down(err.Error())
	}
	for _, p := range status.Providers {
		if p.Name != "llm" {
			continue
		}
		// Only the first "llm" entry decides the outcome.
		if p.Configured && p.OK {
			return componentProbe{Name: "ai_service", Status: "ok", LatencyMs: time.Since(began).Milliseconds()}
		}
		if reason := strings.TrimSpace(p.Error); reason != "" {
			return down(reason)
		}
		return down("llm provider is not ready")
	}
	return down("llm provider not found")
}
// probeClassificationQueue counts messages with text that have no
// message_classifications row for their section, in total and fetched within
// the last 24 hours. Any backlog, or a query failure, is reported as "down".
func (a *app) probeClassificationQueue(ctx context.Context) componentProbe {
	const backlogQuery = `
SELECT
COUNT(*)::bigint,
COUNT(*) FILTER (WHERE m.fetched_at >= now() - interval '24 hours')::bigint
FROM channels c
JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
JOIN messages m ON m.channel_id = src.id
JOIN sections s ON s.id = c.section_id
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
WHERE m.text IS NOT NULL
AND mc.id IS NULL`

	began := time.Now()
	var (
		backlog    int64
		backlog24h int64
	)
	result := componentProbe{Name: "classification_queue", Status: "ok"}
	if err := a.db.QueryRow(ctx, backlogQuery).Scan(&backlog, &backlog24h); err != nil {
		result.Status = "down"
		result.Error = err.Error()
	} else if backlog > 0 {
		result.Status = "down"
		result.Error = "pending=" + strconv.FormatInt(backlog, 10) + " pending_24h=" + strconv.FormatInt(backlog24h, 10)
	}
	result.LatencyMs = time.Since(began).Milliseconds()
	return result
}
// probeAIJobs summarises the AI-service job statistics for this service's
// telegram_classification tasks.
//
// Status is "down" when the LLM is disabled, the stats call fails, or there
// are stale-running jobs or failures in the last 24 hours. It is "degraded"
// when jobs are pending, running or failed, and "ok" otherwise.
func (a *app) probeAIJobs(ctx context.Context) componentProbe {
	const (
		wantOwner = "monitoring-tg"
		wantTask  = "telegram_classification"
	)
	began := time.Now()
	report := func(status, message string) componentProbe {
		return componentProbe{Name: "ai_jobs", Status: status, LatencyMs: time.Since(began).Milliseconds(), Error: message}
	}

	if !a.cfg.LLMEnabled {
		return report("down", "llm disabled")
	}
	stats, err := a.ai.Stats(ctx)
	if err != nil {
		return report("down", err.Error())
	}

	var pending, running, staleRunning, failed, failed24h int64
	for _, entry := range stats.Backlog {
		if entry.OwnerService == wantOwner && entry.TaskType == wantTask {
			pending += entry.Pending
			running += entry.Running
			staleRunning += entry.StaleRunning
		}
	}
	for _, entry := range stats.Owners {
		if entry.OwnerService == wantOwner && entry.TaskType == wantTask && entry.Status == "failed" {
			failed += entry.Total
		}
	}
	for _, entry := range stats.Errors {
		if entry.OwnerService == wantOwner && entry.TaskType == wantTask {
			failed24h += entry.Last24h
		}
	}

	// Counter summary shared by the "down" and "degraded" messages.
	summary := "pending=" + strconv.FormatInt(pending, 10) +
		" running=" + strconv.FormatInt(running, 10) +
		" stale_running=" + strconv.FormatInt(staleRunning, 10) +
		" failed=" + strconv.FormatInt(failed, 10)

	switch {
	case staleRunning > 0 || failed24h > 0:
		return report("down", summary+" failed_24h="+strconv.FormatInt(failed24h, 10))
	case pending > 0 || running > 0 || failed > 0:
		return report("degraded", summary)
	default:
		return report("ok", "")
	}
}
// probePoller checks that every active channel without a source channel has
// been polled recently. A channel counts as stale when its last poll is older
// than max(3 * PollIntervalSeconds, 900) seconds. The probe is "down" when
// any such channel was never polled or is stale, or when the query fails.
func (a *app) probePoller(ctx context.Context) componentProbe {
	const pollStateQuery = `
SELECT
COUNT(*)::bigint,
COUNT(*) FILTER (WHERE last_polled_at IS NULL)::bigint,
COUNT(*) FILTER (WHERE last_polled_at IS NOT NULL AND last_polled_at < now() - ($1::int * interval '1 second'))::bigint
FROM channels
WHERE is_active = true
AND source_channel_id IS NULL`

	began := time.Now()
	thresholdSec := maxInt(a.cfg.PollIntervalSeconds*3, 900)

	var active, neverPolled, stale int64
	if err := a.db.QueryRow(ctx, pollStateQuery, thresholdSec).Scan(&active, &neverPolled, &stale); err != nil {
		return componentProbe{Name: "poller", Status: "down", LatencyMs: time.Since(began).Milliseconds(), Error: err.Error()}
	}

	healthy := active <= 0 || (neverPolled <= 0 && stale <= 0)
	if healthy {
		return componentProbe{Name: "poller", Status: "ok", LatencyMs: time.Since(began).Milliseconds()}
	}
	summary := "active=" + strconv.FormatInt(active, 10) +
		" never_polled=" + strconv.FormatInt(neverPolled, 10) +
		" stale=" + strconv.FormatInt(stale, 10) +
		" stale_after_sec=" + strconv.Itoa(thresholdSec)
	return componentProbe{
		Name:      "poller",
		Status:    "down",
		LatencyMs: time.Since(began).Milliseconds(),
		Error:     summary,
	}
}
// probePollErrors counts active channels without a source channel whose last
// poll ended in an error, split into flood-wait, channel-unavailable and
// other errors.
//
// With no errors the probe is "ok". Flood-wait and unavailable errors alone
// make it "degraded"; any other error code, or a query failure, makes it
// "down". When errors exist, Details carries the counters plus either the
// most recent failing channels or the error from loading them.
func (a *app) probePollErrors(ctx context.Context) componentProbe {
	const errorCountsQuery = `
SELECT
COUNT(*) FILTER (WHERE last_poll_status = 'error')::bigint,
COUNT(*) FILTER (WHERE last_poll_status = 'error' AND last_poll_error_code = 'telegram_flood_wait')::bigint,
COUNT(*) FILTER (WHERE last_poll_status = 'error' AND last_poll_error_code = 'telegram_channel_unavailable')::bigint,
COUNT(*) FILTER (
WHERE last_poll_status = 'error'
AND COALESCE(last_poll_error_code, '') NOT IN ('telegram_flood_wait', 'telegram_channel_unavailable')
)::bigint
FROM channels
WHERE is_active = true
AND source_channel_id IS NULL`

	began := time.Now()
	var total, floodWait, unavailable, other int64
	if err := a.db.QueryRow(ctx, errorCountsQuery).Scan(&total, &floodWait, &unavailable, &other); err != nil {
		return componentProbe{Name: "poll_errors", Status: "down", LatencyMs: time.Since(began).Milliseconds(), Error: err.Error()}
	}
	if total <= 0 {
		return componentProbe{Name: "poll_errors", Status: "ok", LatencyMs: time.Since(began).Milliseconds()}
	}

	details := map[string]any{
		"total":       total,
		"flood_wait":  floodWait,
		"unavailable": unavailable,
		"other":       other,
	}
	if recent, recentErr := a.recentPollErrors(ctx); recentErr != nil {
		details["recent_error"] = recentErr.Error()
	} else {
		details["recent_errors"] = recent
	}

	severity := "degraded"
	if other > 0 {
		severity = "down"
	}
	summary := "total=" + strconv.FormatInt(total, 10) +
		" flood_wait=" + strconv.FormatInt(floodWait, 10) +
		" unavailable=" + strconv.FormatInt(unavailable, 10) +
		" other=" + strconv.FormatInt(other, 10)
	return componentProbe{
		Name:      "poll_errors",
		Status:    severity,
		LatencyMs: time.Since(began).Milliseconds(),
		Error:     summary,
		Details:   details,
	}
}
// recentPollErrors returns up to five active channels without a source
// channel whose last poll failed, most recent error first, as JSON-ready
// maps.
func (a *app) recentPollErrors(ctx context.Context) ([]map[string]any, error) {
	const recentQuery = `
SELECT
c.id,
c.identifier,
COALESCE(c.title, '') AS title,
s.slug,
s.title AS section_title,
COALESCE(c.last_poll_error_code, '') AS error_code,
COALESCE(c.last_poll_error, '') AS error_text,
c.last_poll_error_at
FROM channels c
LEFT JOIN sections s ON s.id = c.section_id
WHERE c.is_active = true
AND c.source_channel_id IS NULL
AND c.last_poll_status = 'error'
ORDER BY c.last_poll_error_at DESC NULLS LAST, c.id DESC
LIMIT 5`

	rows, err := a.db.Query(ctx, recentQuery)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	result := make([]map[string]any, 0, 5)
	for rows.Next() {
		var rec struct {
			channelID    int64
			identifier   string
			title        string
			slug         sql.NullString
			sectionTitle sql.NullString
			code         string
			message      string
			failedAt     sql.NullTime
		}
		scanErr := rows.Scan(
			&rec.channelID, &rec.identifier, &rec.title,
			&rec.slug, &rec.sectionTitle,
			&rec.code, &rec.message, &rec.failedAt,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		entry := map[string]any{
			"channel_id":    rec.channelID,
			"identifier":    rec.identifier,
			"title":         nullableString(rec.title),
			"section_slug":  nullString(rec.slug),
			"section_title": nullString(rec.sectionTitle),
			"error_code":    nullableString(rec.code),
			"error":         nullableString(rec.message),
			"error_at":      nullTime(rec.failedAt),
		}
		result = append(result, entry)
	}
	return result, rows.Err()
}
// probeMediaStorage verifies that the configured MinIO bucket exists. It
// reports "down" when MinIO is not configured, the existence check fails, or
// the bucket is missing.
func (a *app) probeMediaStorage(ctx context.Context) componentProbe {
	began := time.Now()
	down := func(reason string) componentProbe {
		return componentProbe{Name: "media_storage", Status: "down", LatencyMs: time.Since(began).Milliseconds(), Error: reason}
	}

	bucket := a.cfg.MinioBucket
	if a.minio == nil || bucket == "" {
		return down("minio not configured")
	}
	found, err := a.minio.BucketExists(ctx, bucket)
	switch {
	case err != nil:
		return down(err.Error())
	case !found:
		return down("bucket not found: " + bucket)
	}
	return componentProbe{Name: "media_storage", Status: "ok", LatencyMs: time.Since(began).Milliseconds()}
}
// probeMediaMetadata looks for messages flagged has_media whose media_files
// array is empty or NULL. Any such message, or a query failure, makes the
// probe "down"; Details then carries the counters plus either a sample of
// the affected messages or the error from loading that sample.
func (a *app) probeMediaMetadata(ctx context.Context) componentProbe {
	const mediaCountsQuery = `
SELECT
COUNT(*) FILTER (WHERE has_media = true)::bigint,
COUNT(*) FILTER (
WHERE has_media = true
AND jsonb_array_length(COALESCE(media_files, '[]'::jsonb)) = 0
)::bigint
FROM messages`

	began := time.Now()
	var withMedia, missing int64
	if err := a.db.QueryRow(ctx, mediaCountsQuery).Scan(&withMedia, &missing); err != nil {
		return componentProbe{Name: "media_metadata", Status: "down", LatencyMs: time.Since(began).Milliseconds(), Error: err.Error()}
	}
	if missing <= 0 {
		return componentProbe{Name: "media_metadata", Status: "ok", LatencyMs: time.Since(began).Milliseconds()}
	}

	details := map[string]any{
		"messages_with_media": withMedia,
		"missing_media_files": missing,
	}
	if sample, sampleErr := a.recentMissingMedia(ctx); sampleErr != nil {
		details["recent_error"] = sampleErr.Error()
	} else {
		details["recent_missing_media"] = sample
	}
	summary := "messages_with_media=" + strconv.FormatInt(withMedia, 10) +
		" missing_media_files=" + strconv.FormatInt(missing, 10)
	return componentProbe{
		Name:      "media_metadata",
		Status:    "down",
		LatencyMs: time.Since(began).Milliseconds(),
		Error:     summary,
		Details:   details,
	}
}
// recentMissingMedia returns up to five of the newest messages that are
// flagged has_media but have no media_files entries. Each entry includes the
// backfill endpoint of the owning channel as "repair_action".
func (a *app) recentMissingMedia(ctx context.Context) ([]map[string]any, error) {
	const missingQuery = `
SELECT
m.id,
m.tg_message_id,
m.date,
c.id AS channel_id,
c.identifier,
COALESCE(c.title, '') AS channel_title,
s.slug,
s.title AS section_title
FROM messages m
JOIN channels c ON c.id = m.channel_id
LEFT JOIN sections s ON s.id = c.section_id
WHERE m.has_media = true
AND jsonb_array_length(COALESCE(m.media_files, '[]'::jsonb)) = 0
ORDER BY m.date DESC, m.id DESC
LIMIT 5`

	rows, err := a.db.Query(ctx, missingQuery)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	result := make([]map[string]any, 0, 5)
	for rows.Next() {
		var rec struct {
			messageID    int64
			tgMessageID  int64
			postedAt     time.Time
			channelID    int64
			identifier   string
			channelTitle string
			slug         sql.NullString
			sectionTitle sql.NullString
		}
		scanErr := rows.Scan(
			&rec.messageID, &rec.tgMessageID, &rec.postedAt,
			&rec.channelID, &rec.identifier, &rec.channelTitle,
			&rec.slug, &rec.sectionTitle,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		entry := map[string]any{
			"message_id":    rec.messageID,
			"tg_message_id": rec.tgMessageID,
			"message_date":  rec.postedAt,
			"channel_id":    rec.channelID,
			"identifier":    rec.identifier,
			"channel_title": nullableString(rec.channelTitle),
			"section_slug":  nullString(rec.slug),
			"section_title": nullString(rec.sectionTitle),
			"repair_action": "/api/v1/channels/" + strconv.FormatInt(rec.channelID, 10) + "/backfill-media",
		}
		result = append(result, entry)
	}
	return result, rows.Err()
}
// handleAccessMe returns the caller's access scope, as read from the request
// by readAccess, as JSON.
func (a *app) handleAccessMe(w http.ResponseWriter, r *http.Request) {
	caller := readAccess(r)
	body := make(map[string]any, 5)
	body["is_admin"] = caller.IsAdmin
	body["can_manage_department"] = caller.CanManage
	body["can_auth_telegram"] = caller.CanAuth
	body["department_id"] = nullableString(caller.DeptID)
	body["department_ids"] = caller.departmentIDs()
	writeJSON(w, http.StatusOK, body)
}
// handleSections serves the section collection: GET lists sections, POST
// creates one, and every other method is answered with 405.
func (a *app) handleSections(w http.ResponseWriter, r *http.Request) {
	if r.Method == http.MethodGet {
		a.listSections(r.Context(), w, r)
		return
	}
	if r.Method == http.MethodPost {
		a.createSection(r.Context(), w, r)
		return
	}
	writeError(w, http.StatusMethodNotAllowed, "method not allowed")
}
// listSections returns all sections of the vertical named by the required
// "vertical" query parameter, oldest first, each with its channel, message
// and lead counters. Non-admin callers only see sections that pass the
// department filter built by appendDepartmentFilter.
func (a *app) listSections(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	scope, allowed := a.readScope(w, r, false)
	if !allowed {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}

	params := []any{vertical}
	var restrict string
	if !scope.IsAdmin {
		params, restrict = appendDepartmentFilter(params, scope, "s.department_id")
	}

	query := `
SELECT
s.id,
s.vertical,
COALESCE(s.department_id, ''),
s.slug,
s.title,
COALESCE(s.emoji, ''),
COALESCE(s.description, ''),
s.created_at,
(SELECT count(*) FROM channels c WHERE c.section_id = s.id),
(SELECT count(*) FROM channels c WHERE c.section_id = s.id AND c.is_active = true),
(SELECT count(*)
FROM channels c
JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
JOIN messages m ON m.channel_id = src.id
WHERE c.section_id = s.id),
(SELECT count(*)
FROM channels c
JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
JOIN messages m ON m.channel_id = src.id
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
WHERE c.section_id = s.id
AND (
(s.vertical = 'hr' AND COALESCE(mc.verdict->>'is_lead', m.extracted->'hr_lead'->>'is_lead') = 'true')
OR (s.vertical <> 'hr' AND COALESCE(mc.verdict->>'is_listing', m.extracted->'lead'->>'is_listing') = 'true')
))
FROM sections s
WHERE s.vertical = $1` + restrict + `
ORDER BY s.created_at ASC, s.id ASC
`
	rows, err := a.db.Query(ctx, query, params...)
	if err != nil {
		writeDBError(w, err)
		return
	}
	defer rows.Close()

	// Start from an empty, non-nil slice so an empty result encodes as [].
	sections := make([]sectionOut, 0)
	for rows.Next() {
		section, scanErr := scanSection(rows)
		if scanErr != nil {
			writeDBError(w, scanErr)
			return
		}
		sections = append(sections, section)
	}
	if iterErr := rows.Err(); iterErr != nil {
		writeDBError(w, iterErr)
		return
	}
	writeJSON(w, http.StatusOK, sections)
}
// createSection inserts a section from the JSON body. vertical, slug and
// title are required after trimming and slug normalisation. The section is
// assigned the caller's primary department and returned with zeroed
// counters and status 201.
func (a *app) createSection(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	scope, allowed := a.readScope(w, r, true)
	if !allowed {
		return
	}

	var req struct {
		Vertical    string  `json:"vertical"`
		Slug        string  `json:"slug"`
		Title       string  `json:"title"`
		Emoji       *string `json:"emoji"`
		Description *string `json:"description"`
	}
	if !readBody(w, r, &req) {
		return
	}

	vertical := strings.TrimSpace(req.Vertical)
	slug := normalizeSlug(req.Slug)
	title := strings.TrimSpace(req.Title)
	if vertical == "" || slug == "" || title == "" {
		writeError(w, http.StatusBadRequest, "vertical, slug and title are required")
		return
	}

	department := nullableString(scope.primaryDepartmentID())
	inserted := a.db.QueryRow(ctx, `
INSERT INTO sections (vertical, department_id, slug, title, emoji, description)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING id, vertical, COALESCE(department_id, ''), slug, title, COALESCE(emoji, ''), COALESCE(description, ''), created_at,
0::bigint, 0::bigint, 0::bigint, 0::bigint
`, vertical, department, slug, title, req.Emoji, req.Description)

	section, err := scanSectionRow(inserted)
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusCreated, section)
}
// handleSectionItem serves /api/v1/sections/{vertical}/{slug}. Everything
// after the first "/" following the vertical belongs to the slug. Both parts
// are percent-decoded before dispatching GET, PATCH or DELETE; other methods
// get 405 and a path without a slug part gets 404.
func (a *app) handleSectionItem(w http.ResponseWriter, r *http.Request, path string) {
	rawVertical, rawSlug, hasSlug := strings.Cut(strings.TrimPrefix(path, "/api/v1/sections/"), "/")
	if !hasSlug {
		writeError(w, http.StatusNotFound, "not found")
		return
	}
	vertical, err := url.PathUnescape(rawVertical)
	if err != nil {
		writeError(w, http.StatusBadRequest, "bad vertical")
		return
	}
	slug, err := url.PathUnescape(rawSlug)
	if err != nil {
		writeError(w, http.StatusBadRequest, "bad slug")
		return
	}

	if r.Method == http.MethodGet {
		a.getSection(r.Context(), w, r, vertical, slug)
		return
	}
	if r.Method == http.MethodPatch {
		a.updateSection(r.Context(), w, r, vertical, slug)
		return
	}
	if r.Method == http.MethodDelete {
		a.deleteSection(r.Context(), w, r, vertical, slug)
		return
	}
	writeError(w, http.StatusMethodNotAllowed, "method not allowed")
}
// getSection writes the section identified by vertical and slug, provided
// the caller's scope allows it; lookup failures go through writeDBError.
func (a *app) getSection(ctx context.Context, w http.ResponseWriter, r *http.Request, vertical, slug string) {
	scope, allowed := a.readScope(w, r, false)
	if !allowed {
		return
	}
	section, lookupErr := a.findSection(ctx, vertical, slug, scope)
	if lookupErr == nil {
		writeJSON(w, http.StatusOK, section)
		return
	}
	writeDBError(w, lookupErr)
}
// updateSection applies a partial update (title, emoji, description) to the
// section identified by vertical and slug and returns the updated row.
//
// Only fields present in the JSON body are changed. Values are trimmed;
// an emoji or description that trims to empty is stored via nullableString.
// A title that trims to empty is rejected with 400, matching createSection,
// which also requires a non-empty title. With no fields to change the
// current section is returned unchanged. Non-admin callers are restricted
// by the department filter. The leads counter in the response is always 0.
func (a *app) updateSection(ctx context.Context, w http.ResponseWriter, r *http.Request, vertical, slug string) {
	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	var payload struct {
		Title       *string `json:"title"`
		Emoji       *string `json:"emoji"`
		Description *string `json:"description"`
	}
	if !readBody(w, r, &payload) {
		return
	}

	// set and args grow together: each SET fragment refers to the placeholder
	// index of the argument appended just before it.
	set := []string{}
	args := []any{}
	if payload.Title != nil {
		title := strings.TrimSpace(*payload.Title)
		if title == "" {
			writeError(w, http.StatusBadRequest, "title must not be empty")
			return
		}
		args = append(args, title)
		set = append(set, fmt.Sprintf("title = $%d", len(args)))
	}
	if payload.Emoji != nil {
		args = append(args, nullableString(strings.TrimSpace(*payload.Emoji)))
		set = append(set, fmt.Sprintf("emoji = $%d", len(args)))
	}
	if payload.Description != nil {
		args = append(args, nullableString(strings.TrimSpace(*payload.Description)))
		set = append(set, fmt.Sprintf("description = $%d", len(args)))
	}
	if len(set) == 0 {
		a.getSection(ctx, w, r, vertical, slug)
		return
	}

	args = append(args, vertical, slug)
	where := fmt.Sprintf("vertical = $%d AND slug = $%d", len(args)-1, len(args))
	if !scope.IsAdmin {
		var deptFilter string
		args, deptFilter = appendDepartmentFilter(args, scope, "department_id")
		where += deptFilter
	}
	row := a.db.QueryRow(ctx, `
UPDATE sections
SET `+strings.Join(set, ", ")+`
WHERE `+where+`
RETURNING id, vertical, COALESCE(department_id, ''), slug, title, COALESCE(emoji, ''), COALESCE(description, ''), created_at,
(SELECT count(*) FROM channels c WHERE c.section_id = sections.id),
(SELECT count(*) FROM channels c WHERE c.section_id = sections.id AND c.is_active = true),
(SELECT count(*) FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id WHERE c.section_id = sections.id),
0::bigint
`, args...)
	item, err := scanSectionRow(row)
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, item)
}
// deleteSection removes the section identified by vertical and slug. The
// section must be visible to the caller and must not have any channels;
// otherwise a 400 naming the channel count is returned. Success is 204.
func (a *app) deleteSection(ctx context.Context, w http.ResponseWriter, r *http.Request, vertical, slug string) {
	const (
		countChannels = `SELECT count(*) FROM channels WHERE section_id = $1`
		removeSection = `DELETE FROM sections WHERE id = $1`
	)

	scope, allowed := a.readScope(w, r, true)
	if !allowed {
		return
	}
	target, err := a.findSection(ctx, vertical, slug, scope)
	if err != nil {
		writeDBError(w, err)
		return
	}

	var channels int64
	err = a.db.QueryRow(ctx, countChannels, target.ID).Scan(&channels)
	if err != nil {
		writeDBError(w, err)
		return
	}
	if channels > 0 {
		writeError(w, http.StatusBadRequest, fmt.Sprintf("section has %d channels - move or delete them first", channels))
		return
	}

	_, err = a.db.Exec(ctx, removeSection, target.ID)
	if err != nil {
		writeDBError(w, err)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}
// findSection loads one section by vertical and slug together with its
// channel and message counters; the leads counter is always returned as 0.
// For non-admin scopes the department filter is appended to the lookup.
func (a *app) findSection(ctx context.Context, vertical, slug string, scope accessScope) (sectionOut, error) {
	params := []any{vertical, slug}
	conditions := "s.vertical = $1 AND s.slug = $2"
	if !scope.IsAdmin {
		var restrict string
		params, restrict = appendDepartmentFilter(params, scope, "s.department_id")
		conditions += restrict
	}
	query := `
SELECT s.id, s.vertical, COALESCE(s.department_id, ''), s.slug, s.title, COALESCE(s.emoji, ''), COALESCE(s.description, ''), s.created_at,
(SELECT count(*) FROM channels c WHERE c.section_id = s.id),
(SELECT count(*) FROM channels c WHERE c.section_id = s.id AND c.is_active = true),
(SELECT count(*) FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id WHERE c.section_id = s.id),
0::bigint
FROM sections s
WHERE ` + conditions + `
`
	return scanSectionRow(a.db.QueryRow(ctx, query, params...))
}
// handleChannels serves the channel collection: GET lists channels, POST
// creates one, and every other method is answered with 405.
func (a *app) handleChannels(w http.ResponseWriter, r *http.Request) {
	if r.Method == http.MethodGet {
		a.listChannels(r.Context(), w, r)
		return
	}
	if r.Method == http.MethodPost {
		a.createChannel(r.Context(), w, r)
		return
	}
	writeError(w, http.StatusMethodNotAllowed, "method not allowed")
}
// listChannels returns the channels of the vertical named by the required
// "vertical" query parameter, newest first, optionally narrowed to one
// section slug via "section". Telegram id, title, last message id and last
// poll time fall back to the source channel's values when the channel has
// none of its own. Non-admin callers are limited by the department filter.
func (a *app) listChannels(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	scope, allowed := a.readScope(w, r, false)
	if !allowed {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}

	params := []any{vertical}
	conditions := "c.vertical = $1"
	if slug := strings.TrimSpace(r.URL.Query().Get("section")); slug != "" {
		params = append(params, slug)
		conditions += fmt.Sprintf(" AND s.slug = $%d", len(params))
	}
	if !scope.IsAdmin {
		var restrict string
		params, restrict = appendDepartmentFilter(params, scope, "s.department_id")
		conditions += restrict
	}

	query := `
SELECT c.id, COALESCE(c.tg_id, src.tg_id), c.source_channel_id,
c.identifier, COALESCE(c.title, src.title), c.vertical, c.section_id, s.slug,
c.is_active, COALESCE(c.last_message_id, src.last_message_id),
COALESCE(c.last_polled_at, src.last_polled_at), c.created_at
FROM channels c
JOIN sections s ON s.id = c.section_id
LEFT JOIN channels src ON src.id = c.source_channel_id
WHERE ` + conditions + `
ORDER BY c.created_at DESC, c.id DESC
`
	rows, err := a.db.Query(ctx, query, params...)
	if err != nil {
		writeDBError(w, err)
		return
	}
	defer rows.Close()

	// Start from an empty, non-nil slice so an empty result encodes as [].
	channels := make([]channelOut, 0)
	for rows.Next() {
		channel, scanErr := scanChannel(rows)
		if scanErr != nil {
			writeDBError(w, scanErr)
			return
		}
		channels = append(channels, channel)
	}
	if iterErr := rows.Err(); iterErr != nil {
		writeDBError(w, iterErr)
		return
	}
	writeJSON(w, http.StatusOK, channels)
}
// createChannel registers a channel identifier in a section.
//
// identifier, vertical and section are required and the section must be
// visible to the caller. If the same identifier already exists in another
// section of the vertical, the new row points at that channel (or at its
// own source) through source_channel_id. Re-adding an identifier that
// already exists in the section reactivates the row and keeps an existing
// source link. The channel is returned with status 201.
func (a *app) createChannel(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	scope, allowed := a.readScope(w, r, true)
	if !allowed {
		return
	}

	var req struct {
		Identifier string `json:"identifier"`
		Vertical   string `json:"vertical"`
		Section    string `json:"section"`
	}
	if !readBody(w, r, &req) {
		return
	}
	identifier := strings.TrimSpace(req.Identifier)
	vertical := strings.TrimSpace(req.Vertical)
	sectionSlug := strings.TrimSpace(req.Section)
	if identifier == "" || vertical == "" || sectionSlug == "" {
		writeError(w, http.StatusBadRequest, "identifier, vertical and section are required")
		return
	}

	section, err := a.findSection(ctx, vertical, sectionSlug, scope)
	if err != nil {
		writeDBError(w, err)
		return
	}

	// Prefer a root channel (no source of its own) as the shared source.
	var shared sql.NullInt64
	lookupErr := a.db.QueryRow(ctx, `
SELECT COALESCE(c.source_channel_id, c.id)
FROM channels c
WHERE c.vertical = $1 AND c.identifier = $2 AND c.section_id <> $3
ORDER BY CASE WHEN c.source_channel_id IS NULL THEN 0 ELSE 1 END, c.id
LIMIT 1
`, vertical, identifier, section.ID).Scan(&shared)
	if lookupErr != nil && !errors.Is(lookupErr, pgx.ErrNoRows) {
		writeDBError(w, lookupErr)
		return
	}
	var sourceID *int64
	if shared.Valid {
		sourceID = &shared.Int64
	}

	// $4 is the section slug echoed back in RETURNING; $5 is the source id.
	upserted := a.db.QueryRow(ctx, `
INSERT INTO channels (identifier, vertical, section_id, source_channel_id, is_active)
VALUES ($1, $2, $3, $5, true)
ON CONFLICT ON CONSTRAINT uq_channels_section_identifier DO UPDATE
SET is_active = true,
source_channel_id = COALESCE(channels.source_channel_id, EXCLUDED.source_channel_id)
RETURNING id, tg_id, source_channel_id, identifier, title, vertical, section_id, $4::text, is_active, last_message_id, last_polled_at, created_at
`, identifier, vertical, section.ID, section.Slug, sourceID)

	channel, err := scanChannelRow(upserted)
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusCreated, channel)
}
// handleChannelItem serves /api/v1/channels/{id} and its sub-actions.
//
//   - /{id}                  GET, PATCH, DELETE on the channel; else 405
//   - /{id}/poll             proxied to the Python service
//   - /{id}/backfill-media   proxied to the Python service
//   - /{id}/reanalyze        reanalyzeChannel
//   - /{id}/stats            channelStats
//
// A non-numeric id yields 400. An unknown sub-action, or a path with more
// than two segments after the prefix (for example /{id}/stats/extra), yields
// 404 instead of being treated as a request for the channel itself.
func (a *app) handleChannelItem(ctx context.Context, w http.ResponseWriter, r *http.Request, path string) {
	rest := strings.TrimPrefix(path, "/api/v1/channels/")
	// strings.Split with a non-empty separator always returns at least one
	// element, so parts[0] is safe to read.
	parts := strings.Split(rest, "/")
	id, err := strconv.ParseInt(parts[0], 10, 64)
	if err != nil {
		writeError(w, http.StatusBadRequest, "bad channel id")
		return
	}

	switch len(parts) {
	case 1:
		switch r.Method {
		case http.MethodGet:
			a.getChannel(ctx, w, r, id)
		case http.MethodPatch:
			a.updateChannel(ctx, w, r, id)
		case http.MethodDelete:
			a.deleteChannel(ctx, w, r, id)
		default:
			writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		}
	case 2:
		switch parts[1] {
		case "poll", "backfill-media":
			a.proxyPython(w, r, path)
		case "reanalyze":
			a.reanalyzeChannel(ctx, w, r, id)
		case "stats":
			a.channelStats(ctx, w, r, id)
		default:
			writeError(w, http.StatusNotFound, "not found")
		}
	default:
		// Extra path segments do not name any resource.
		writeError(w, http.StatusNotFound, "not found")
	}
}
// getChannel responds with the JSON representation of channel id.
//
// The caller needs a readable scope; the optional "vertical" and "section"
// query parameters are passed to findChannel as additional filters. Lookup
// failures are reported through writeDBError.
func (a *app) getChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) {
	scope, allowed := a.readScope(w, r, false)
	if !allowed {
		return
	}
	query := r.URL.Query()
	item, err := a.findChannel(ctx, id, scope, query.Get("vertical"), query.Get("section"))
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, item)
}
// updateChannel applies a partial update (PATCH) to channel id.
//
// The caller needs a manage-capable scope and the channel must be visible to
// it through findChannel. The JSON body may carry "is_active", "vertical" and
// "section"; only the fields that are present and non-blank are written.
// A blank "vertical" is ignored the same way a blank "section" is, instead of
// overwriting the column with an empty string. When nothing is left to update
// the current channel is returned via getChannel.
//
// The section slug is resolved with findSection using the vertical from the
// payload when given, otherwise the "vertical" query parameter.
func (a *app) updateChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) {
	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	if _, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil {
		writeDBError(w, err)
		return
	}
	var payload struct {
		IsActive *bool   `json:"is_active"`
		Vertical *string `json:"vertical"`
		Section  *string `json:"section"`
	}
	if !readBody(w, r, &payload) {
		return
	}

	// Normalise the optional string fields once; blank means "not provided".
	newVertical := ""
	if payload.Vertical != nil {
		newVertical = strings.TrimSpace(*payload.Vertical)
	}
	newSection := ""
	if payload.Section != nil {
		newSection = strings.TrimSpace(*payload.Section)
	}

	// set and args grow in lockstep so that len(args) is always the
	// placeholder number of the value appended last.
	set := []string{}
	args := []any{}
	if payload.IsActive != nil {
		args = append(args, *payload.IsActive)
		set = append(set, fmt.Sprintf("is_active = $%d", len(args)))
	}
	if newVertical != "" {
		args = append(args, newVertical)
		set = append(set, fmt.Sprintf("vertical = $%d", len(args)))
	}
	if newSection != "" {
		vertical := r.URL.Query().Get("vertical")
		if newVertical != "" {
			vertical = newVertical
		}
		section, err := a.findSection(ctx, vertical, newSection, scope)
		if err != nil {
			writeDBError(w, err)
			return
		}
		args = append(args, section.ID)
		set = append(set, fmt.Sprintf("section_id = $%d", len(args)))
	}
	if len(set) == 0 {
		a.getChannel(ctx, w, r, id)
		return
	}

	// The channel id is always the last placeholder.
	args = append(args, id)
	row := a.db.QueryRow(ctx, `
UPDATE channels
SET `+strings.Join(set, ", ")+`
WHERE id = $`+strconv.Itoa(len(args))+`
RETURNING id, tg_id, source_channel_id, identifier, title, vertical, section_id,
(SELECT slug FROM sections WHERE id = channels.section_id),
is_active, last_message_id, last_polled_at, created_at
`, args...)
	item, err := scanChannelRow(row)
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, item)
}
// deleteChannel removes channel id and answers 204 No Content.
//
// A manage-capable scope is required. The channel is first looked up with
// findChannel (honouring the optional "vertical" and "section" query
// parameters) so only rows visible to the caller can be deleted.
func (a *app) deleteChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) {
	scope, allowed := a.readScope(w, r, true)
	if !allowed {
		return
	}
	query := r.URL.Query()
	_, err := a.findChannel(ctx, id, scope, query.Get("vertical"), query.Get("section"))
	if err != nil {
		writeDBError(w, err)
		return
	}
	_, err = a.db.Exec(ctx, `DELETE FROM channels WHERE id = $1`, id)
	if err != nil {
		writeDBError(w, err)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}
// findChannel loads a single channel by id, joined with its section and, when
// the row references one, its source channel.
//
// Non-blank vertical and section values narrow the lookup. For non-admin
// scopes the department filter from appendDepartmentFilter is appended.
// tg_id, title, last_message_id and last_polled_at fall back to the source
// channel's values when the channel's own columns are NULL. The error from
// scanning the row is returned unchanged.
func (a *app) findChannel(ctx context.Context, id int64, scope accessScope, vertical, section string) (channelOut, error) {
	params := []any{id}
	conds := []string{"c.id = $1"}
	if v := strings.TrimSpace(vertical); v != "" {
		params = append(params, v)
		conds = append(conds, fmt.Sprintf("c.vertical = $%d", len(params)))
	}
	if slug := strings.TrimSpace(section); slug != "" {
		params = append(params, slug)
		conds = append(conds, fmt.Sprintf("s.slug = $%d", len(params)))
	}
	filter := strings.Join(conds, " AND ")
	if !scope.IsAdmin {
		// The helper returns a clause that already starts with " AND ".
		var deptClause string
		params, deptClause = appendDepartmentFilter(params, scope, "s.department_id")
		filter += deptClause
	}
	return scanChannelRow(a.db.QueryRow(ctx, `
SELECT c.id, COALESCE(c.tg_id, src.tg_id), c.source_channel_id,
c.identifier, COALESCE(c.title, src.title), c.vertical, c.section_id, s.slug,
c.is_active, COALESCE(c.last_message_id, src.last_message_id),
COALESCE(c.last_polled_at, src.last_polled_at), c.created_at
FROM channels c
JOIN sections s ON s.id = c.section_id
LEFT JOIN channels src ON src.id = c.source_channel_id
WHERE `+filter+`
`, params...))
}
// channelStats reports message and lead counters for channel id.
//
// Messages are counted on the source channel when the channel references
// one, otherwise on the channel itself. A message counts as a lead when the
// section's classification verdict, or failing that the extracted payload
// stored on the message, carries the vertical-specific flag set to 'true'.
func (a *app) channelStats(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) {
	scope, allowed := a.readScope(w, r, false)
	if !allowed {
		return
	}
	query := r.URL.Query()
	channel, err := a.findChannel(ctx, id, scope, query.Get("vertical"), query.Get("section"))
	if err != nil {
		writeDBError(w, err)
		return
	}

	// JSON key of the verdict object and the boolean flag inside it.
	verdict, flag := "lead", "is_listing"
	if channel.Vertical == verticalHR {
		verdict, flag = "hr_lead", "is_lead"
	}

	source := id
	if channel.SourceChannelID != nil {
		source = *channel.SourceChannelID
	}

	var total, leadCount int64
	err = a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE channel_id = $1`, source).Scan(&total)
	if err != nil {
		writeDBError(w, err)
		return
	}
	err = a.db.QueryRow(ctx, `
SELECT count(*)
FROM messages m
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = $2
WHERE m.channel_id = $1
AND COALESCE(mc.verdict ->> $4, m.extracted -> $3 ->> $4) = 'true'
`, source, channel.SectionID, verdict, flag).Scan(&leadCount)
	if err != nil {
		writeDBError(w, err)
		return
	}

	stats := map[string]any{
		"channel_id":        id,
		"source_channel_id": source,
		"messages_total":    total,
		"leads_total":       leadCount,
		"last_polled_at":    channel.LastPolledAt,
		"last_message_id":   channel.LastMessageID,
	}
	writeJSON(w, http.StatusOK, stats)
}
// reanalyzeChannel discards stored classification results for a channel.
//
// It deletes the section's message_classifications rows for every message of
// the (source) channel, strips the vertical's verdict key from each message's
// extracted JSON, and then reports how many classifications were deleted
// ("updated") together with the current pendingLLM count ("pending").
// The two statements are issued separately, not inside a transaction.
func (a *app) reanalyzeChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) {
	scope, allowed := a.readScope(w, r, true)
	if !allowed {
		return
	}
	query := r.URL.Query()
	channel, err := a.findChannel(ctx, id, scope, query.Get("vertical"), query.Get("section"))
	if err != nil {
		writeDBError(w, err)
		return
	}

	source := id
	if channel.SourceChannelID != nil {
		source = *channel.SourceChannelID
	}

	deleted, err := a.db.Exec(ctx, `
DELETE FROM message_classifications
WHERE section_id = $1
AND message_id IN (SELECT id FROM messages WHERE channel_id = $2)
`, channel.SectionID, source)
	if err != nil {
		writeDBError(w, err)
		return
	}

	// Non-object extracted values are replaced with an empty object before
	// the key is removed.
	_, err = a.db.Exec(ctx, `
UPDATE messages
SET extracted = (
CASE WHEN jsonb_typeof(extracted) = 'object' THEN extracted ELSE '{}'::jsonb END
) - $1
WHERE channel_id = $2
`, verdictKey(channel.Vertical), source)
	if err != nil {
		writeDBError(w, err)
		return
	}

	pending, err := a.pendingLLM(ctx, scope, channel.Vertical, valueOrEmpty(channel.SectionSlug))
	if err != nil {
		writeDBError(w, err)
		return
	}
	result := map[string]int64{"updated": deleted.RowsAffected(), "pending": pending}
	writeJSON(w, http.StatusOK, result)
}
// handleMessages lists messages for a vertical (GET only).
//
// Query parameters:
//   - vertical (required)
//   - section: restrict to a section slug
//   - channel_id: restrict to one channel (must be a base-10 integer)
//   - q: case-insensitive substring match on the message text
//   - leads_only ("true" or "1"): keep only messages flagged as leads
//   - limit (default 50, clamped to 1..500) and offset (default 0)
//
// Non-admin scopes are additionally restricted by appendDepartmentFilter.
// Up to limit*5 rows (bounded by limit..1000) are fetched and then passed to
// aggregateMessages, which collapses grouped messages and truncates the
// result to limit entries.
// NOTE(review): OFFSET is applied to raw rows before aggregation, so paging
// appears to step over rows rather than aggregated groups — confirm intended.
func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
scope, ok := a.readScope(w, r, false)
if !ok {
return
}
// queryRequired has already written the 400 response when it returns "".
vertical := queryRequired(w, r, "vertical")
if vertical == "" {
return
}
q := r.URL.Query()
limit := clampInt(queryInt(q.Get("limit"), 50), 1, 500)
// maxInt is defined outside this view; presumably it returns the larger value.
offset := maxInt(queryInt(q.Get("offset"), 0), 0)
// args and where grow together: each appended value is referenced by the
// placeholder $len(args) at the moment it is appended.
args := []any{vertical}
where := "c.vertical = $1"
if section := strings.TrimSpace(q.Get("section")); section != "" {
args = append(args, section)
where += fmt.Sprintf(" AND s.slug = $%d", len(args))
}
if channelID := strings.TrimSpace(q.Get("channel_id")); channelID != "" {
id, err := strconv.ParseInt(channelID, 10, 64)
if err != nil {
writeError(w, http.StatusBadRequest, "bad channel_id")
return
}
args = append(args, id)
where += fmt.Sprintf(" AND c.id = $%d", len(args))
}
if text := strings.TrimSpace(q.Get("q")); text != "" {
// The search text is wrapped in % wildcards; wildcard characters inside
// the text itself are not escaped.
args = append(args, "%"+text+"%")
where += fmt.Sprintf(" AND m.text ILIKE $%d", len(args))
}
if q.Get("leads_only") == "true" || q.Get("leads_only") == "1" {
key := "lead"
field := "is_listing"
if vertical == verticalHR {
key = "hr_lead"
field = "is_lead"
}
// Two values are appended: key is $len(args)-1 and field is $len(args).
args = append(args, key, field)
where += fmt.Sprintf(" AND COALESCE(mc.verdict ->> $%d, m.extracted -> $%d ->> $%d) = 'true'", len(args), len(args)-1, len(args))
}
if !scope.IsAdmin {
var deptFilter string
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
where += deptFilter
}
// Over-fetch so that grouped rows can be merged and limit entries remain.
fetchLimit := clampInt(limit*5, limit, 1000)
// LIMIT and OFFSET are the last two placeholders.
args = append(args, fetchLimit, offset)
rows, err := a.db.Query(ctx, `
SELECT m.id, c.id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int,
m.date, m.text, m.sender_id, m.sender_username, m.sender_name,
COALESCE(src.identifier, c.identifier), COALESCE(src.tg_id, c.tg_id), m.has_media,
COALESCE(
CASE
WHEN mc.verdict IS NULL THEN NULL
ELSE jsonb_build_object(CASE WHEN c.vertical = 'hr' THEN 'hr_lead' ELSE 'lead' END, mc.verdict)
END,
m.extracted,
'null'::jsonb
)::text,
COALESCE(m.media_files, '[]'::jsonb)::text, m.views, m.forwards, m.fetched_at
FROM channels c
JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
JOIN messages m ON m.channel_id = src.id
JOIN sections s ON s.id = c.section_id
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
WHERE `+where+`
ORDER BY m.date DESC, m.id DESC
LIMIT $`+strconv.Itoa(len(args)-1)+` OFFSET $`+strconv.Itoa(len(args))+`
`, args...)
if err != nil {
writeDBError(w, err)
return
}
defer rows.Close()
// Start from an empty (non-nil) slice so the row buffer is never nil.
out := []messageOut{}
for rows.Next() {
item, err := scanMessage(rows)
if err != nil {
writeDBError(w, err)
return
}
// Rewrite stored media URLs to this service's media endpoint.
a.normalizeMessageMedia(&item)
out = append(out, item)
}
if err := rows.Err(); err != nil {
writeDBError(w, err)
return
}
out = aggregateMessages(out, limit)
writeJSON(w, http.StatusOK, out)
}
// handleMessageItem returns a single message by id (GET only).
//
// The id is the path remainder after /api/v1/messages/ and must be a base-10
// integer. The message is joined to every channel that either owns it or
// references its channel as source; non-admin scopes are restricted by
// appendDepartmentFilter on the section's department. When several channels
// match, the owning channel is preferred and exactly one row is returned.
func (a *app) handleMessageItem(ctx context.Context, w http.ResponseWriter, r *http.Request, path string) {
if r.Method != http.MethodGet {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
scope, ok := a.readScope(w, r, false)
if !ok {
return
}
idRaw := strings.TrimPrefix(path, "/api/v1/messages/")
id, err := strconv.ParseInt(idRaw, 10, 64)
if err != nil {
writeError(w, http.StatusBadRequest, "bad message id")
return
}
args := []any{id}
where := "m.id = $1"
if !scope.IsAdmin {
var deptFilter string
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
where += deptFilter
}
// The column list mirrors the destination order used by scanMessage.
row := a.db.QueryRow(ctx, `
SELECT m.id, c.id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int,
m.date, m.text, m.sender_id, m.sender_username, m.sender_name,
COALESCE(src.identifier, c.identifier), COALESCE(src.tg_id, c.tg_id), m.has_media,
COALESCE(
CASE
WHEN mc.verdict IS NULL THEN NULL
ELSE jsonb_build_object(CASE WHEN c.vertical = 'hr' THEN 'hr_lead' ELSE 'lead' END, mc.verdict)
END,
m.extracted,
'null'::jsonb
)::text,
COALESCE(m.media_files, '[]'::jsonb)::text, m.views, m.forwards, m.fetched_at
FROM messages m
JOIN channels src ON src.id = m.channel_id
JOIN channels c ON c.id = src.id OR c.source_channel_id = src.id
JOIN sections s ON s.id = c.section_id
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
WHERE `+where+`
ORDER BY CASE WHEN c.id = src.id THEN 0 ELSE 1 END
LIMIT 1
`, args...)
item, err := scanMessageRow(row)
if err != nil {
// writeDBError maps pgx.ErrNoRows to 404.
writeDBError(w, err)
return
}
// Rewrite stored media URLs to this service's media endpoint.
a.normalizeMessageMedia(&item)
writeJSON(w, http.StatusOK, item)
}
// handleMedia serves a media file addressed as
// /api/v1/media/{channelID}/{file...} (GET only).
//
// The path is URL-unescaped and cleaned as a rooted path, so ".." segments
// cannot escape upwards. The first segment must be a positive channel id the
// caller may read (see canReadChannelMedia); unauthorised and unknown
// channels both yield 404. The object is looked up in MinIO first and on the
// local media directory second.
//
// Only regular files are served: a bare channel id, or a path that resolves
// to a directory on disk, yields 404. Without that check http.ServeFile
// would answer with a directory listing.
func (a *app) handleMedia(w http.ResponseWriter, r *http.Request, path string) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	rel, err := url.PathUnescape(strings.TrimPrefix(path, "/api/v1/media/"))
	if err != nil {
		writeError(w, http.StatusBadRequest, "bad media path")
		return
	}
	// Cleaning with a leading "/" resolves ".." against the root.
	clean := strings.TrimPrefix(filepath.Clean("/"+rel), "/")
	if clean == "." || clean == "" || strings.HasPrefix(clean, "../") {
		writeError(w, http.StatusBadRequest, "bad media path")
		return
	}
	parts := strings.SplitN(clean, string(os.PathSeparator), 2)
	channelID, err := strconv.ParseInt(parts[0], 10, 64)
	if err != nil || channelID <= 0 {
		writeError(w, http.StatusBadRequest, "bad media path")
		return
	}
	// A path consisting of the channel id alone names a directory, not a file.
	if len(parts) < 2 {
		writeError(w, http.StatusNotFound, "not found")
		return
	}
	allowed, err := a.canReadChannelMedia(r.Context(), scope, channelID)
	if err != nil {
		writeDBError(w, err)
		return
	}
	if !allowed {
		writeError(w, http.StatusNotFound, "not found")
		return
	}
	if a.serveMinioMedia(w, r, clean) {
		return
	}
	base, err := filepath.Abs(a.cfg.MediaDir)
	if err != nil {
		writeError(w, http.StatusInternalServerError, "media directory unavailable")
		return
	}
	full, err := filepath.Abs(filepath.Join(base, clean))
	if err != nil || (full != base && !strings.HasPrefix(full, base+string(os.PathSeparator))) {
		writeError(w, http.StatusBadRequest, "bad media path")
		return
	}
	// Refuse directories explicitly; http.ServeFile would list their contents.
	info, err := os.Stat(full)
	if err != nil || info.IsDir() {
		writeError(w, http.StatusNotFound, "not found")
		return
	}
	http.ServeFile(w, r, full)
}
// serveMinioMedia tries to stream object key from the configured MinIO
// bucket to w.
//
// It returns false, without writing a response, when MinIO is not
// configured, the object cannot be opened, or its metadata cannot be read,
// so the caller can fall back to another source. Once streaming has started
// it returns true even if the copy fails part-way; such failures are logged.
func (a *app) serveMinioMedia(w http.ResponseWriter, r *http.Request, key string) bool {
	if a.minio == nil || a.cfg.MinioBucket == "" {
		return false
	}
	object, err := a.minio.GetObject(r.Context(), a.cfg.MinioBucket, key, minio.GetObjectOptions{})
	if err != nil {
		slog.Warn("minio_get_media_failed", "key", key, "error", err)
		return false
	}
	defer func() {
		if closeErr := object.Close(); closeErr != nil {
			slog.Warn("minio_media_close_failed", "key", key, "error", closeErr)
		}
	}()

	stat, err := object.Stat()
	if err != nil {
		return false
	}

	header := w.Header()
	if contentType := stat.ContentType; contentType != "" {
		header.Set("Content-Type", contentType)
	}
	if size := stat.Size; size > 0 {
		header.Set("Content-Length", strconv.FormatInt(size, 10))
	}
	if _, copyErr := io.Copy(w, object); copyErr != nil {
		slog.Warn("minio_media_stream_failed", "key", key, "error", copyErr)
	}
	return true
}
// canReadChannelMedia reports whether scope may read media stored under
// channelID.
//
// Access is granted when the scope is admin, or when any channel that is
// channelID itself or uses it as its source belongs to a section whose
// department is among the scope's department ids. If no such channel exists
// the result is false.
func (a *app) canReadChannelMedia(ctx context.Context, scope accessScope, channelID int64) (bool, error) {
	row := a.db.QueryRow(ctx, `
SELECT COALESCE(bool_or($3::boolean OR s.department_id::text = ANY($2::text[])), false)
FROM channels c
JOIN sections s ON s.id = c.section_id
WHERE c.id = $1 OR c.source_channel_id = $1
`, channelID, scope.departmentIDs(), scope.IsAdmin)

	var granted bool
	switch err := row.Scan(&granted); {
	case err == nil:
		return granted, nil
	case errors.Is(err, pgx.ErrNoRows):
		return false, nil
	default:
		return false, err
	}
}
// handleStats returns aggregate counters for a vertical (GET only).
//
// Query parameters: vertical (required) and section (optional slug).
// Non-admin scopes are restricted by appendDepartmentFilter. The response
// contains channel totals, message totals, lead totals, the respective
// counts for the last 24 hours (by fetched_at), the configured poll interval
// and the most recent last_polled_at among the matching channels.
func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
scope, ok := a.readScope(w, r, false)
if !ok {
return
}
// queryRequired has already written the 400 response when it returns "".
vertical := queryRequired(w, r, "vertical")
if vertical == "" {
return
}
section := strings.TrimSpace(r.URL.Query().Get("section"))
// args and where grow together: each appended value is referenced by the
// placeholder $len(args) at the moment it is appended.
args := []any{vertical}
where := "c.vertical = $1"
if section != "" {
args = append(args, section)
where += fmt.Sprintf(" AND s.slug = $%d", len(args))
}
if !scope.IsAdmin {
var deptFilter string
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
where += deptFilter
}
var channelsTotal, channelsActive, messagesTotal, messages24h, leadsTotal, leads24h int64
// max() over zero rows yields NULL, hence the nullable destination.
var lastPoll sql.NullTime
// Channel-level queries share this FROM/WHERE tail.
countQuery := `FROM channels c JOIN sections s ON s.id = c.section_id WHERE ` + where
if err := a.db.QueryRow(ctx, `SELECT count(*) `+countQuery, args...).Scan(&channelsTotal); err != nil {
writeDBError(w, err)
return
}
if err := a.db.QueryRow(ctx, `SELECT count(*) `+countQuery+` AND c.is_active = true`, args...).Scan(&channelsActive); err != nil {
writeDBError(w, err)
return
}
if err := a.db.QueryRow(ctx, `SELECT max(c.last_polled_at) `+countQuery, args...).Scan(&lastPoll); err != nil {
writeDBError(w, err)
return
}
// Message-level queries resolve each channel to its source channel and
// left-join the section's classification of each message.
messageFrom := `FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id JOIN sections s ON s.id = c.section_id LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id WHERE ` + where
if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom, args...).Scan(&messagesTotal); err != nil {
writeDBError(w, err)
return
}
if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom+` AND m.fetched_at >= now() - interval '24 hours'`, args...).Scan(&messages24h); err != nil {
writeDBError(w, err)
return
}
key := "lead"
field := "is_listing"
if vertical == verticalHR {
key = "hr_lead"
field = "is_lead"
}
// Copy args before extending it so the base slice stays untouched; key is
// $len(leadArgs)-1 and field is $len(leadArgs).
leadArgs := append(append([]any{}, args...), key, field)
leadClause := fmt.Sprintf(" AND COALESCE(mc.verdict ->> $%d, m.extracted -> $%d ->> $%d) = 'true'", len(leadArgs), len(leadArgs)-1, len(leadArgs))
if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom+leadClause, leadArgs...).Scan(&leadsTotal); err != nil {
writeDBError(w, err)
return
}
if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom+leadClause+` AND m.fetched_at >= now() - interval '24 hours'`, leadArgs...).Scan(&leads24h); err != nil {
writeDBError(w, err)
return
}
writeJSON(w, http.StatusOK, map[string]any{
"vertical": vertical,
"section_slug": nullableString(section),
"channels_total": channelsTotal,
"channels_active": channelsActive,
"messages_total": messagesTotal,
"messages_last_24h": messages24h,
"leads_total": leadsTotal,
"leads_last_24h": leads24h,
"poll_interval_seconds": a.cfg.PollIntervalSeconds,
"last_poll_at": nullTime(lastPoll),
})
}
// handleLLMStatus reports whether the LLM backend is enabled and ready.
//
// When the LLM is enabled, the AI service is asked for its provider status;
// the entry named "llm" decides readiness (configured and OK), supplies the
// provider error text and, if it reports one, the model name. A failure to
// reach the AI service is surfaced as provider_error with ready=false.
func (a *app) handleLLMStatus(ctx context.Context, w http.ResponseWriter) {
	var (
		ready       bool
		providerErr string
		model       = a.cfg.LLMModel
	)
	if a.cfg.LLMEnabled {
		if status, err := a.ai.ProvidersStatus(ctx); err != nil {
			providerErr = err.Error()
		} else {
			for _, p := range status.Providers {
				if p.Name != "llm" {
					continue
				}
				ready = p.Configured && p.OK
				providerErr = p.Error
				if p.Model != "" {
					model = p.Model
				}
				break
			}
		}
	}
	response := map[string]any{
		"enabled":        a.cfg.LLMEnabled,
		"ready":          ready,
		"base_url":       a.cfg.AIServiceURL,
		"model":          model,
		"provider":       "ai-service",
		"provider_error": providerErr,
	}
	writeJSON(w, http.StatusOK, response)
}
// handleLLMQueue reports how many messages are still waiting for
// classification (GET only).
//
// Query parameters: vertical (required) and section (optional slug). The
// count comes from pendingLLM and is returned as {"pending": n}.
func (a *app) handleLLMQueue(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	scope, allowed := a.readScope(w, r, false)
	if !allowed {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	section := strings.TrimSpace(r.URL.Query().Get("section"))
	count, err := a.pendingLLM(ctx, scope, vertical, section)
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, map[string]int64{"pending": count})
}
// pendingLLM counts messages with text that have no classification row for
// their section yet.
//
// The count covers channels of the given vertical, optionally narrowed to a
// section slug, and is restricted by appendDepartmentFilter for non-admin
// scopes. Messages are resolved through each channel's source channel.
func (a *app) pendingLLM(ctx context.Context, scope accessScope, vertical, section string) (int64, error) {
	params := []any{vertical}
	filter := `c.vertical = $1 AND m.text IS NOT NULL AND mc.id IS NULL`
	if section != "" {
		params = append(params, section)
		filter += fmt.Sprintf(" AND s.slug = $%d", len(params))
	}
	if !scope.IsAdmin {
		var deptClause string
		params, deptClause = appendDepartmentFilter(params, scope, "s.department_id")
		filter += deptClause
	}
	row := a.db.QueryRow(ctx, `
SELECT count(*)
FROM channels c
JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
JOIN messages m ON m.channel_id = src.id
JOIN sections s ON s.id = c.section_id
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
WHERE `+filter, params...)

	var count int64
	err := row.Scan(&count)
	return count, err
}
// handlePrompt dispatches prompt requests by HTTP method: GET reads the
// effective prompt, PUT stores an override, DELETE removes it. Any other
// method is answered with 405.
func (a *app) handlePrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	handlers := map[string]func(context.Context, http.ResponseWriter, *http.Request){
		http.MethodGet:    a.getPrompt,
		http.MethodPut:    a.savePrompt,
		http.MethodDelete: a.resetPrompt,
	}
	handle, known := handlers[r.Method]
	if !known {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	handle(ctx, w, r)
}
// getPrompt returns the effective LLM system prompt for a vertical and
// optional section.
//
// The response carries the resolved prompt, where it came from ("source"),
// the built-in default for the vertical, and whether an override is stored
// at exactly the requested department/vertical/section key.
func (a *app) getPrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	scope, allowed := a.readScope(w, r, false)
	if !allowed {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	section := strings.TrimSpace(r.URL.Query().Get("section"))

	deptID, err := a.promptDepartmentID(ctx, scope, vertical, section)
	if err != nil {
		writeDBError(w, err)
		return
	}
	text, origin, err := a.resolvePrompt(ctx, deptID, vertical, section)
	if err != nil {
		writeDBError(w, err)
		return
	}
	hasOverride, err := a.promptExists(ctx, deptID, vertical, section)
	if err != nil {
		writeDBError(w, err)
		return
	}

	response := map[string]any{
		"vertical":           vertical,
		"department_id":      nullableString(deptID),
		"section":            nullableString(section),
		"prompt":             text,
		"default":            defaultPrompt(vertical),
		"source":             origin,
		"is_overridden_here": hasOverride,
	}
	writeJSON(w, http.StatusOK, response)
}
// savePrompt stores a prompt override for a vertical and optional section.
//
// A manage-capable scope and the "vertical" query parameter are required.
// The JSON body must contain a non-blank "prompt" of at most 30000
// characters. The limit is counted in Unicode code points, matching the
// "chars" wording of the error message; counting bytes would reject
// multi-byte text (for example Cyrillic) well below the documented limit.
// The prompt is upserted into app_settings under the key built by promptKey.
//
// The "length" field of the response stays the byte length of the stored
// text, as before.
func (a *app) savePrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	const maxPromptChars = 30000

	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	section := strings.TrimSpace(r.URL.Query().Get("section"))
	var payload struct {
		Prompt string `json:"prompt"`
	}
	if !readBody(w, r, &payload) {
		return
	}
	text := strings.TrimSpace(payload.Prompt)
	if text == "" {
		writeError(w, http.StatusBadRequest, "prompt must be a non-empty string")
		return
	}
	// Ranging over a string iterates code points, so this counts characters
	// without allocating.
	chars := 0
	for range text {
		chars++
	}
	if chars > maxPromptChars {
		writeError(w, http.StatusBadRequest, "prompt is too long (max 30000 chars)")
		return
	}
	deptID, err := a.promptDepartmentID(ctx, scope, vertical, section)
	if err != nil {
		writeDBError(w, err)
		return
	}
	key := promptKey(deptID, vertical, section)
	// Marshalling a plain string cannot fail, so the error is ignored.
	value, _ := json.Marshal(text)
	if _, err := a.db.Exec(ctx, `
INSERT INTO app_settings (key, value, updated_at)
VALUES ($1, $2::jsonb, now())
ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = now()
`, key, string(value)); err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, map[string]any{"saved": true, "vertical": vertical, "department_id": nullableString(deptID), "section": nullableString(section), "length": len(text)})
}
// resetPrompt deletes the stored prompt override for a vertical and optional
// section and answers 204 No Content.
//
// A manage-capable scope and the "vertical" query parameter are required.
// Deleting a key that does not exist is not treated as an error.
func (a *app) resetPrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	scope, allowed := a.readScope(w, r, true)
	if !allowed {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	section := strings.TrimSpace(r.URL.Query().Get("section"))
	deptID, err := a.promptDepartmentID(ctx, scope, vertical, section)
	if err != nil {
		writeDBError(w, err)
		return
	}
	settingKey := promptKey(deptID, vertical, section)
	_, err = a.db.Exec(ctx, `DELETE FROM app_settings WHERE key = $1`, settingKey)
	if err != nil {
		writeDBError(w, err)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}
// promptDepartmentID picks the department id used to key prompt settings.
//
// With a non-blank section the department of that section (as resolved by
// findSection within scope) is used; otherwise the scope's primary
// department id. An empty string is returned when no department applies.
func (a *app) promptDepartmentID(ctx context.Context, scope accessScope, vertical, section string) (string, error) {
	if strings.TrimSpace(section) != "" {
		found, err := a.findSection(ctx, vertical, section, scope)
		if err != nil {
			return "", err
		}
		return valueOrEmpty(found.DepartmentID), nil
	}
	return scope.primaryDepartmentID(), nil
}
// resolvePrompt returns the effective prompt text and its source.
//
// Candidates are checked in order: the section-level key (only when section
// is non-empty, source "section"), then the vertical-level key (source
// "vertical"). The first candidate holding non-blank text wins. If none
// does, the built-in default for the vertical is returned with source
// "default".
//
// The stored value is read with `value #>> '{}'`, which yields SQL NULL for a
// JSON null. It is therefore scanned into a nullable string so that such a
// row is treated as "not set" instead of surfacing as a scan error.
func (a *app) resolvePrompt(ctx context.Context, deptID, vertical, section string) (string, string, error) {
	type candidate struct {
		key    string
		source string
	}
	candidates := make([]candidate, 0, 2)
	if section != "" {
		candidates = append(candidates, candidate{promptKey(deptID, vertical, section), "section"})
	}
	candidates = append(candidates, candidate{promptKey(deptID, vertical, ""), "vertical"})

	for _, c := range candidates {
		var text sql.NullString
		err := a.db.QueryRow(ctx, `SELECT value #>> '{}' FROM app_settings WHERE key = $1`, c.key).Scan(&text)
		switch {
		case errors.Is(err, pgx.ErrNoRows):
			// No override stored at this level; try the next one.
			continue
		case err != nil:
			return "", "", err
		case text.Valid && strings.TrimSpace(text.String) != "":
			return text.String, c.source, nil
		}
	}
	return defaultPrompt(vertical), "default", nil
}
// promptExists reports whether app_settings holds a row for the prompt key
// built from deptID, vertical and section.
func (a *app) promptExists(ctx context.Context, deptID, vertical, section string) (bool, error) {
	settingKey := promptKey(deptID, vertical, section)
	row := a.db.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM app_settings WHERE key = $1)`, settingKey)

	var found bool
	err := row.Scan(&found)
	return found, err
}
// proxyPython forwards the request to the Python service and relays its
// response.
//
// Paths containing "/auth/" require the CanAuth permission; paths containing
// "/poll" or "/backfill-media" require CanManage. A missing permission is
// answered with 404. The request body is read fully, then re-sent with the
// original method, query string and headers. Upstream failures are reported
// as 502 with the error text.
func (a *app) proxyPython(w http.ResponseWriter, r *http.Request, path string) {
	scope := readAccess(r)
	needsAuth := strings.Contains(path, "/auth/")
	needsManage := strings.Contains(path, "/poll") || strings.Contains(path, "/backfill-media")
	if (needsAuth && !scope.CanAuth) || (needsManage && !scope.CanManage) {
		writeError(w, http.StatusNotFound, "not found")
		return
	}

	upstream := strings.TrimRight(a.cfg.PythonBaseURL, "/") + path
	if rawQuery := r.URL.RawQuery; rawQuery != "" {
		upstream += "?" + rawQuery
	}

	payload, err := io.ReadAll(r.Body)
	if err != nil {
		writeError(w, http.StatusBadRequest, "bad body")
		return
	}
	outbound, err := http.NewRequestWithContext(r.Context(), r.Method, upstream, bytes.NewReader(payload))
	if err != nil {
		writeError(w, http.StatusBadGateway, err.Error())
		return
	}
	outbound.Header = r.Header.Clone()

	resp, err := a.python.Do(outbound)
	if err != nil {
		writeError(w, http.StatusBadGateway, err.Error())
		return
	}
	defer resp.Body.Close()

	// Relay every upstream header value before committing the status line.
	dst := w.Header()
	for name, values := range resp.Header {
		for i := range values {
			dst.Add(name, values[i])
		}
	}
	w.WriteHeader(resp.StatusCode)
	// The status is already sent, so a copy failure cannot be reported.
	_, _ = io.Copy(w, resp.Body)
}
// readScope derives the caller's access scope from the request headers and
// enforces the minimum requirements for the endpoint.
//
// When manage is true the scope must have CanManage, otherwise 404 is
// written. In every case a non-admin scope must carry at least one
// department id, otherwise 403 is written. The boolean result is false
// whenever a response has already been written.
func (a *app) readScope(w http.ResponseWriter, r *http.Request, manage bool) (accessScope, bool) {
	scope := readAccess(r)
	if manage && !scope.CanManage {
		writeError(w, http.StatusNotFound, "not found")
		return scope, false
	}
	if !scope.IsAdmin && len(scope.departmentIDs()) == 0 {
		writeError(w, http.StatusForbidden, "department is required")
		return scope, false
	}
	return scope, true
}
// readAccess builds the access scope from the identity headers on r.
//
// Admins implicitly get CanManage and CanAuth; department heads implicitly
// get CanManage. The single department id header, when present, is merged
// into the list of department ids.
func readAccess(r *http.Request) accessScope {
	var scope accessScope
	scope.IsAdmin = commonmw.HeaderBool(r, "X-User-Is-Admin")
	isDeptHead := commonmw.HeaderBool(r, "X-User-Is-Department-Head")
	mayManage := commonmw.HeaderBool(r, "X-Monitoring-TG-Can-Manage")
	mayAuth := commonmw.HeaderBool(r, "X-Monitoring-TG-Can-Auth")

	scope.CanManage = scope.IsAdmin || isDeptHead || mayManage
	scope.CanAuth = scope.IsAdmin || mayAuth

	scope.DeptID = strings.TrimSpace(r.Header.Get("X-User-Department-Id"))
	scope.DeptIDs = commonmw.HeaderCSV(r, "X-User-Department-Ids")
	if scope.DeptID != "" {
		scope.DeptIDs = appendUniqueString(scope.DeptIDs, scope.DeptID)
	}
	return scope
}
// appendUniqueString appends the trimmed value to items unless it is blank
// or already present, and returns the resulting slice.
func appendUniqueString(items []string, value string) []string {
	candidate := strings.TrimSpace(value)
	if candidate == "" {
		return items
	}
	for i := range items {
		if items[i] == candidate {
			return items
		}
	}
	return append(items, candidate)
}
// departmentIDs returns the scope's department ids, trimmed and without
// duplicates or blanks: the DeptIDs entries in order, followed by DeptID if
// it is not already included.
func (s accessScope) departmentIDs() []string {
	ids := make([]string, 0, len(s.DeptIDs)+1)
	for i := range s.DeptIDs {
		ids = appendUniqueString(ids, s.DeptIDs[i])
	}
	return appendUniqueString(ids, s.DeptID)
}
// primaryDepartmentID returns the trimmed DeptID when it is set, otherwise
// the first entry of departmentIDs, or "" when the scope has no department.
func (s accessScope) primaryDepartmentID() string {
	if primary := strings.TrimSpace(s.DeptID); primary != "" {
		return primary
	}
	if all := s.departmentIDs(); len(all) > 0 {
		return all[0]
	}
	return ""
}
// appendDepartmentFilter appends the scope's department ids to args as one
// array parameter and returns the extended args together with an SQL clause
// (starting with " AND ") that restricts column to those ids.
//
// A scope without departments gets a sentinel id in place of an empty list.
func appendDepartmentFilter(args []any, scope accessScope, column string) ([]any, string) {
	depts := scope.departmentIDs()
	if len(depts) == 0 {
		depts = []string{"__no_department_scope__"}
	}
	extended := append(args, depts)
	clause := fmt.Sprintf(" AND %s::text = ANY($%d::text[])", column, len(extended))
	return extended, clause
}
// rowScanner is the Scan method shared by the scan helpers in this file, so
// the same helper can read from a multi-row cursor or from a single pgx.Row.
type rowScanner interface {
Scan(dest ...any) error
}
// scanSection reads one section row, including its aggregate counters, into
// a sectionOut. Blank department id, emoji and description values are
// exposed as nil pointers. The Scan error is returned alongside whatever was
// populated.
func scanSection(rows rowScanner) (sectionOut, error) {
	var (
		section                    sectionOut
		deptID, emoji, description string
	)
	dest := []any{
		&section.ID,
		&section.Vertical,
		&deptID,
		&section.Slug,
		&section.Title,
		&emoji,
		&description,
		&section.CreatedAt,
		&section.ChannelsTotal,
		&section.ChannelsActive,
		&section.MessagesTotal,
		&section.LeadsTotal,
	}
	err := rows.Scan(dest...)
	section.DepartmentID = nullableString(deptID)
	section.Emoji = nullableString(emoji)
	section.Description = nullableString(description)
	return section, err
}
// scanSectionRow adapts scanSection to a single-row query result.
func scanSectionRow(row pgx.Row) (sectionOut, error) {
	var scanner rowScanner = row
	return scanSection(scanner)
}
// scanChannel reads one channel row into a channelOut. Nullable columns
// (Telegram id, source channel id, title, section slug, last message id and
// last poll time) become nil pointers when NULL; title and slug also become
// nil when blank. The Scan error is returned alongside whatever was
// populated.
func scanChannel(rows rowScanner) (channelOut, error) {
	var (
		channel                      channelOut
		telegramID, source, lastSeen sql.NullInt64
		title, sectionSlug           sql.NullString
		polledAt                     sql.NullTime
	)
	dest := []any{
		&channel.ID,
		&telegramID,
		&source,
		&channel.Identifier,
		&title,
		&channel.Vertical,
		&channel.SectionID,
		&sectionSlug,
		&channel.IsActive,
		&lastSeen,
		&polledAt,
		&channel.CreatedAt,
	}
	err := rows.Scan(dest...)
	channel.TGID = nullInt(telegramID)
	channel.SourceChannelID = nullInt(source)
	channel.Title = nullString(title)
	channel.SectionSlug = nullString(sectionSlug)
	channel.LastMessageID = nullInt(lastSeen)
	channel.LastPolledAt = nullTimePtr(polledAt)
	return channel, err
}
// scanChannelRow adapts scanChannel to a single-row query result.
func scanChannelRow(row pgx.Row) (channelOut, error) {
	var scanner rowScanner = row
	return scanChannel(scanner)
}
// scanMessage reads one message row into a messageOut.
//
// The extracted payload and the media list arrive as JSON text: extracted is
// kept as raw JSON unless it is empty or "null", and the media list is
// decoded best-effort (a malformed list leaves MediaFiles unset). The post
// URL is derived from the channel identifier and the Telegram message id.
// The Scan error is returned alongside whatever was populated.
func scanMessage(rows rowScanner) (messageOut, error) {
	var (
		msg                                     messageOut
		vertical, sectionSlug                   string
		groupID, sender, channelTGID            sql.NullInt64
		viewCount, forwardCount                 sql.NullInt64
		body, senderUsername, senderName, ident sql.NullString
		extractedJSON, mediaJSON                string
	)
	dest := []any{
		&msg.ID,
		&msg.ChannelID,
		&vertical,
		&sectionSlug,
		&msg.TGMessageID,
		&groupID,
		&msg.GroupSize,
		&msg.Date,
		&body,
		&sender,
		&senderUsername,
		&senderName,
		&ident,
		&channelTGID,
		&msg.HasMedia,
		&extractedJSON,
		&mediaJSON,
		&viewCount,
		&forwardCount,
		&msg.FetchedAt,
	}
	err := rows.Scan(dest...)

	msg.ChannelVertical = nullableString(vertical)
	msg.ChannelSectionSlug = nullableString(sectionSlug)
	msg.GroupedID = nullInt(groupID)
	msg.Text = nullString(body)
	msg.SenderID = nullInt(sender)
	msg.SenderUsername = nullString(senderUsername)
	msg.SenderName = nullString(senderName)
	msg.Views = nullInt(viewCount)
	msg.Forwards = nullInt(forwardCount)

	switch extractedJSON {
	case "", "null":
		// Nothing extracted for this message.
	default:
		msg.Extracted = json.RawMessage(extractedJSON)
	}
	switch mediaJSON {
	case "", "null":
		// No media attached.
	default:
		// Best effort: a malformed media list must not fail the whole row.
		_ = json.Unmarshal([]byte(mediaJSON), &msg.MediaFiles)
	}

	msg.PostURL = buildPostURL(ident.String, channelTGID, msg.TGMessageID)
	return msg, err
}
// scanMessageRow adapts scanMessage to a single-row query result.
func scanMessageRow(row pgx.Row) (messageOut, error) {
	var scanner rowScanner = row
	return scanMessage(scanner)
}
// normalizeMessageMedia rewrites the "url" of every media entry of item to
// this service's media endpoint and fills in a missing "name" with the
// file's base name. Entries whose URL yields no usable relative path are
// left untouched.
func (a *app) normalizeMessageMedia(item *messageOut) {
	for i := range item.MediaFiles {
		entry := item.MediaFiles[i]
		stored, _ := entry["url"].(string)
		relative := mediaRelativePath(stored)
		if relative == "" {
			continue
		}
		entry["url"] = a.mediaURL(relative)
		if _, named := entry["name"].(string); named {
			continue
		}
		entry["name"] = filepath.Base(relative)
	}
}
// mediaRelativePath extracts the media-relative path from a stored media URL
// or path.
//
// Everything up to and including the first "/media/" is dropped, then a
// leading "/" and a leading "media/" are removed, and the remainder is
// cleaned as a rooted path so ".." segments cannot climb out. An empty
// string is returned when nothing usable remains.
func mediaRelativePath(raw string) string {
	candidate := strings.TrimSpace(raw)
	if candidate == "" {
		return ""
	}
	if _, tail, found := strings.Cut(candidate, "/media/"); found {
		candidate = tail
	}
	candidate = strings.TrimPrefix(candidate, "/")
	candidate = strings.TrimPrefix(candidate, "media/")

	rooted := filepath.Clean("/" + candidate)
	switch relative := strings.TrimPrefix(rooted, "/"); {
	case relative == "", relative == ".", strings.HasPrefix(relative, "../"):
		return ""
	default:
		return relative
	}
}
// mediaURL builds the public URL of a media file from its relative path,
// prefixed with the configured public base path.
func (a *app) mediaURL(rel string) string {
	prefix := strings.TrimRight(a.cfg.PublicBasePath, "/")
	suffix := strings.TrimPrefix(rel, "/")
	return prefix + "/api/v1/media/" + suffix
}
// aggregateMessages merges messages that share a group key (see
// messageGroupKey) into the first message of that group, preserving input
// order, and returns at most limit entries.
//
// The surviving entry counts the merged messages in GroupSize, collects all
// media files, has HasMedia set if any member has media, and takes Text and
// Extracted from the first member that provides them.
func aggregateMessages(items []messageOut, limit int) []messageOut {
	merged := make([]messageOut, 0, len(items))
	positions := map[string]int{}
	for _, msg := range items {
		key := messageGroupKey(msg)
		pos, seen := positions[key]
		if !seen {
			if msg.GroupSize < 1 {
				msg.GroupSize = 1
			}
			positions[key] = len(merged)
			merged = append(merged, msg)
			continue
		}
		head := &merged[pos]
		head.GroupSize++
		head.HasMedia = head.HasMedia || msg.HasMedia
		head.MediaFiles = append(head.MediaFiles, msg.MediaFiles...)
		if head.Text == nil && msg.Text != nil {
			head.Text = msg.Text
		}
		if len(head.Extracted) == 0 && len(msg.Extracted) > 0 {
			head.Extracted = msg.Extracted
		}
	}
	if len(merged) <= limit {
		return merged
	}
	return merged[:limit]
}
// messageGroupKey returns the key under which a message is aggregated:
// messages with a grouped id share a key per channel and group, every other
// message gets a key of its own.
func messageGroupKey(item messageOut) string {
	if item.GroupedID == nil {
		return fmt.Sprintf("message:%d", item.ID)
	}
	return fmt.Sprintf("channel:%d:group:%d", item.ChannelID, *item.GroupedID)
}
// buildPostURL derives the public t.me link of a post from the channel
// identifier and the message id.
//
// Supported identifiers are full "https://t.me/..." links, "@name" handles
// and bare names without "/" or "+". Anything else, a blank identifier or a
// zero message id yields nil. tgID is currently not used.
func buildPostURL(identifier string, tgID sql.NullInt64, msgID int64) *string {
	name := strings.TrimSpace(identifier)
	if name == "" || msgID == 0 {
		return nil
	}
	suffix := "/" + strconv.FormatInt(msgID, 10)

	var link string
	switch {
	case strings.HasPrefix(name, "https://t.me/"):
		link = strings.TrimRight(name, "/") + suffix
	case strings.HasPrefix(name, "@"):
		link = "https://t.me/" + strings.TrimPrefix(name, "@") + suffix
	case !strings.ContainsAny(name, "/+"):
		link = "https://t.me/" + name + suffix
	default:
		return nil
	}
	return &link
}
// readBody decodes the JSON request body into out and closes the body.
// On a decode failure it writes a 400 response and returns false.
func readBody(w http.ResponseWriter, r *http.Request, out any) bool {
	defer r.Body.Close()
	decoder := json.NewDecoder(r.Body)
	if decodeErr := decoder.Decode(out); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid json")
		return false
	}
	return true
}
// writeJSON sends payload as a JSON response with the given status code.
func writeJSON(w http.ResponseWriter, status int, payload any) {
	header := w.Header()
	header.Set("Content-Type", "application/json; charset=utf-8")
	w.WriteHeader(status)
	encoder := json.NewEncoder(w)
	// The status line is already sent, so an encode failure cannot be
	// reported to the client.
	_ = encoder.Encode(payload)
}
// writeError sends a JSON error response of the form {"detail": detail}.
func writeError(w http.ResponseWriter, status int, detail string) {
	body := map[string]string{"detail": detail}
	writeJSON(w, status, body)
}
// writeDBError translates a database error into an HTTP response: a missing
// row becomes 404, anything else is logged and reported as a generic 500.
func writeDBError(w http.ResponseWriter, err error) {
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		writeError(w, http.StatusNotFound, "not found")
	default:
		slog.Error("api_db_error", "error", err)
		writeError(w, http.StatusInternalServerError, "database error")
	}
}
// queryRequired returns the trimmed value of query parameter name. When the
// value is blank it writes a 400 response and returns "", so callers only
// need to stop on an empty result.
func queryRequired(w http.ResponseWriter, r *http.Request, name string) string {
	trimmed := strings.TrimSpace(r.URL.Query().Get(name))
	if trimmed != "" {
		return trimmed
	}
	writeError(w, http.StatusBadRequest, name+" is required")
	return trimmed
}
// promptKey builds the app_settings key of a prompt override:
// "llm_system_prompt:<dept>:<vertical>" with ":<section>" appended when a
// section is given. An empty department id is stored as "global".
func promptKey(deptID, vertical, section string) string {
	owner := deptID
	if owner == "" {
		owner = "global"
	}
	segments := []string{"llm_system_prompt", owner, vertical}
	if section != "" {
		segments = append(segments, section)
	}
	return strings.Join(segments, ":")
}
// verdictKey returns the verdict key for a vertical: "hr_lead" for the HR
// vertical and "lead" for every other value.
func verdictKey(vertical string) string {
	switch vertical {
	case verticalHR:
		return "hr_lead"
	default:
		return "lead"
	}
}
// defaultPrompt returns the built-in system prompt for a vertical:
// defaultHRPrompt for the HR vertical and defaultREPrompt for anything else.
func defaultPrompt(vertical string) string {
	switch vertical {
	case verticalHR:
		return defaultHRPrompt
	default:
		return defaultREPrompt
	}
}
// normalizeSlug converts raw into a slug: surrounding whitespace is trimmed,
// the text is lower-cased, each space becomes '-', and every rune other than
// a-z, 0-9, '_' and '-' is dropped. The result may be empty.
func normalizeSlug(raw string) string {
	lowered := strings.ToLower(strings.TrimSpace(raw))
	return strings.Map(func(r rune) rune {
		switch {
		case r == ' ':
			return '-'
		case r >= 'a' && r <= 'z', r >= '0' && r <= '9', r == '_', r == '-':
			return r
		default:
			// A negative result tells strings.Map to drop the rune.
			return -1
		}
	}, lowered)
}
// nullableString trims v and returns a pointer to the trimmed text, or nil
// when nothing but whitespace was given.
func nullableString(v string) *string {
	if trimmed := strings.TrimSpace(v); trimmed != "" {
		return &trimmed
	}
	return nil
}
// nullString converts a nullable SQL string to *string. It returns nil when
// the value is NULL or contains only whitespace; otherwise it returns the
// stored string as is (not trimmed).
func nullString(v sql.NullString) *string {
	if v.Valid && strings.TrimSpace(v.String) != "" {
		return &v.String
	}
	return nil
}
// nullInt converts a nullable SQL integer to *int64: nil for NULL, otherwise a
// pointer to the stored value (including zero).
func nullInt(v sql.NullInt64) *int64 {
	if v.Valid {
		return &v.Int64
	}
	return nil
}
// nullTime converts a nullable SQL timestamp to an untyped value: a nil
// interface for NULL, otherwise the stored time.Time.
func nullTime(v sql.NullTime) any {
	if v.Valid {
		return v.Time
	}
	return nil
}
// nullTimePtr converts a nullable SQL timestamp to *time.Time: nil for NULL,
// otherwise a pointer to the stored time.
func nullTimePtr(v sql.NullTime) *time.Time {
	if v.Valid {
		return &v.Time
	}
	return nil
}
// valueOrEmpty dereferences v, returning "" when v is nil.
func valueOrEmpty(v *string) string {
	if v != nil {
		return *v
	}
	return ""
}
// newMinioClient builds a MinIO client from cfg.
//
// It returns (nil, nil) when the endpoint, access key, secret key or bucket is
// empty, so callers must handle a nil client. A leading "https://" or
// "http://" is stripped from the endpoint; whether TLS is used is decided by
// cfg.MinioUseSSL alone. When cfg.MinioInsecureTLS is set, a transport that
// skips certificate verification is installed.
func newMinioClient(cfg config) (*minio.Client, error) {
	required := []string{cfg.MinioEndpoint, cfg.MinioAccessKey, cfg.MinioSecretKey, cfg.MinioBucket}
	for _, value := range required {
		if value == "" {
			return nil, nil
		}
	}

	host := strings.TrimPrefix(cfg.MinioEndpoint, "https://")
	host = strings.TrimPrefix(host, "http://")

	options := minio.Options{
		Creds:        credentials.NewStaticV4(cfg.MinioAccessKey, cfg.MinioSecretKey, ""),
		Secure:       cfg.MinioUseSSL,
		Region:       cfg.MinioRegion,
		BucketLookup: minio.BucketLookupPath,
	}
	if cfg.MinioInsecureTLS {
		insecure := &tls.Config{InsecureSkipVerify: true} //nolint:gosec // optional intra-cluster MinIO mode
		options.Transport = &http.Transport{TLSClientConfig: insecure}
	}
	return minio.New(host, &options)
}
// queryInt parses raw as a base-10 int and returns fallback when raw is empty
// or is not a valid integer.
func queryInt(raw string, fallback int) int {
	if raw != "" {
		if parsed, err := strconv.Atoi(raw); err == nil {
			return parsed
		}
	}
	return fallback
}
// clampInt limits n to the range [min, max]. The lower bound is checked first,
// so min wins if the bounds are passed in the wrong order and n is below min.
func clampInt(n, min, max int) int {
	switch {
	case n < min:
		return min
	case n > max:
		return max
	default:
		return n
	}
}
// maxInt returns n raised to at least min, i.e. the larger of the two values.
func maxInt(n, min int) int {
	result := n
	if result < min {
		result = min
	}
	return result
}
// loadConfig assembles the service configuration from environment variables,
// substituting the defaults listed below for variables that are unset, blank
// or unparsable.
func loadConfig() config {
	var cfg config

	// HTTP listener, public path and the Python service base URL.
	cfg.APIHost = env("API_HOST", "0.0.0.0")
	cfg.APIPort = envInt("API_PORT", 8000)
	cfg.PublicBasePath = env("PUBLIC_BASE_PATH", "")
	cfg.PythonBaseURL = env("PYTHON_BASE_URL", "http://127.0.0.1:8001")
	cfg.MediaDir = env("MEDIA_DIR", "/data/media")

	// PostgreSQL connection.
	cfg.PostgresUser = env("POSTGRES_USER", "parser")
	cfg.PostgresPassword = env("POSTGRES_PASSWORD", "parser")
	cfg.PostgresDB = env("POSTGRES_DB", "parser")
	cfg.PostgresHost = env("POSTGRES_HOST", "db")
	cfg.PostgresPort = envInt("POSTGRES_PORT", 5432)

	cfg.PollIntervalSeconds = envInt("POLL_INTERVAL_SECONDS", 60)

	// LLM and AI service settings; the timeout is configured in seconds.
	cfg.LLMEnabled = envBool("LLM_ENABLED", true)
	cfg.LLMModel = env("LLM_MODEL", "qwen2.5-14b")
	llmTimeoutSeconds := envInt("LLM_TIMEOUT_SECONDS", 120)
	cfg.LLMTimeout = time.Duration(llmTimeoutSeconds) * time.Second
	cfg.AIServiceURL = env("AI_SERVICE_URL", "")
	cfg.AIServiceToken = env("AI_SERVICE_TOKEN", "")

	// INTERNAL_API_KEY takes precedence; PORTAL_INTERNAL_API_KEY is the fallback.
	portalKey := env("PORTAL_INTERNAL_API_KEY", "")
	cfg.InternalAPIKey = env("INTERNAL_API_KEY", portalKey)

	// MinIO object storage.
	cfg.MinioEndpoint = env("MINIO_ENDPOINT", "")
	cfg.MinioAccessKey = env("MINIO_ACCESS_KEY", "")
	cfg.MinioSecretKey = env("MINIO_SECRET_KEY", "")
	cfg.MinioBucket = env("MINIO_BUCKET", "monitoring-tg-media")
	cfg.MinioUseSSL = envBool("MINIO_USE_SSL", true)
	cfg.MinioRegion = env("MINIO_REGION", "us-east-1")
	cfg.MinioInsecureTLS = envBool("MINIO_INSECURE_SKIP_VERIFY", false)

	return cfg
}
// databaseURL returns the PostgreSQL connection URL for this configuration in
// the form "postgres://user:password@host:port/dbname".
//
// The URL is assembled with net/url so that each component is escaped by the
// rules of its own position. Query-style escaping must not be used here: it
// turns a space into "+", which is a literal plus sign inside userinfo and
// path, so a password or database name containing a space would be altered.
func (c config) databaseURL() string {
	u := url.URL{
		Scheme: "postgres",
		User:   url.UserPassword(c.PostgresUser, c.PostgresPassword),
		Host:   fmt.Sprintf("%s:%d", c.PostgresHost, c.PostgresPort),
		Path:   "/" + c.PostgresDB,
	}
	return u.String()
}
// env returns the whitespace-trimmed value of environment variable key, or
// fallback when the variable is unset or blank.
func env(key, fallback string) string {
	value := strings.TrimSpace(os.Getenv(key))
	if value == "" {
		return fallback
	}
	return value
}
// envInt reads environment variable key as a base-10 int. It returns fallback
// when the variable is unset, blank or not a valid integer.
func envInt(key string, fallback int) int {
	raw := strings.TrimSpace(os.Getenv(key))
	if raw == "" {
		return fallback
	}
	parsed, err := strconv.Atoi(raw)
	if err != nil {
		return fallback
	}
	return parsed
}
// envBool reads environment variable key as a boolean. It returns fallback
// when the variable is unset, blank or not a value strconv.ParseBool accepts.
//
// ParseBool already understands "1", "t", "T", "TRUE", "true", "True" and the
// matching false forms including "0", so no separate handling of "1"/"0" is
// needed.
func envBool(key string, fallback bool) bool {
	raw := strings.TrimSpace(os.Getenv(key))
	if raw == "" {
		return fallback
	}
	b, err := strconv.ParseBool(raw)
	if err != nil {
		return fallback
	}
	return b
}
// defaultREPrompt is the built-in system prompt that defaultPrompt returns for
// every vertical other than HR. The text is in Russian: it tells the model to
// decide whether a Telegram message is a real-estate listing (sale, rent or
// purchase) and to answer with a single JSON object whose fields are listed in
// the prompt, with the summary in Russian and confidence between 0 and 1.
const defaultREPrompt = `Ты — аналитик объявлений о недвижимости. Тебе дают текст из Telegram-канала.
Определи, является ли сообщение реальным объявлением о покупке, продаже или аренде недвижимости.
Учитывай только сделки с недвижимостью: квартиры, дома, апартаменты, участки, коммерческие помещения и другие объекты недвижимости.
Любые продажи или заявки по стройматериалам, мебели, бытовой/строительной технике, автомобилям, услугам, оборудованию и прочим товарам не являются лидами недвижимости — для них is_listing=false.
Отвечай строго валидным JSON без markdown:
{
"is_listing": boolean,
"kind": "sale" | "rent" | "purchase" | null,
"property_type": string | null,
"rooms": string | null,
"area_m2": number | null,
"price_text": string | null,
"price_value": number | null,
"currency": "RUB" | "USD" | "EUR" | "AED" | "GBP" | "CNY" | "TRY" | "KZT" | "BYN" | "UAH" | null,
"location": string | null,
"contact_phone": string | null,
"contact_name": string | null,
"summary": string,
"confidence": number
}
summary всегда по-русски, confidence в диапазоне 0..1.`
// defaultHRPrompt is the built-in system prompt that defaultPrompt returns for
// the HR vertical. The text is in Russian: it tells the model to decide whether
// a Telegram message belongs to the job market (vacancy, resume or short HR
// contact) and to answer with a single JSON object whose fields are listed in
// the prompt, with the summary in Russian and confidence between 0 and 1.
const defaultHRPrompt = `Ты — аналитик HR-объявлений. Тебе дают текст из Telegram-канала.
Определи, относится ли сообщение к рынку труда: вакансия, резюме или короткий HR-контакт.
Отвечай строго валидным JSON без markdown:
{
"is_lead": boolean,
"kind": "vacancy" | "resume" | "contact" | null,
"title": string | null,
"company": string | null,
"candidate_name": string | null,
"experience_years": number | null,
"skills": string[],
"location": string | null,
"remote": boolean | null,
"employment_type": "full-time" | "part-time" | "contract" | "internship" | null,
"salary_text": string | null,
"salary_value": number | null,
"currency": "RUB" | "USD" | "EUR" | "AED" | "GBP" | "CNY" | "TRY" | "KZT" | "BYN" | "UAH" | null,
"contact_phone": string | null,
"contact_name": string | null,
"summary": string,
"confidence": number
}
summary всегда по-русски, confidence в диапазоне 0..1.`