package main

import (
	"bytes"
	"context"
	"crypto/tls"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"path/filepath"
	"strconv"
	"strings"
	"syscall"
	"time"

	"monitoring-tg/internal/aiservice"
	"monitoring-tg/internal/dbretry"

	commonmw "gitea.estateliga.work/admin/portal-common/middleware"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

const (
	// verticalHR is the vertical whose lead verdicts live under the
	// "hr_lead" key / "is_lead" field instead of "lead" / "is_listing".
	verticalHR = "hr"
)

// config holds the process configuration produced by loadConfig.
type config struct {
	APIHost             string
	APIPort             int
	PublicBasePath      string
	PythonBaseURL       string
	MediaDir            string
	PostgresUser        string
	PostgresPassword    string
	PostgresDB          string
	PostgresHost        string
	PostgresPort        int
	PollIntervalSeconds int
	LLMEnabled          bool
	LLMModel            string
	LLMTimeout          time.Duration
	AIServiceURL        string
	AIServiceToken      string
	InternalAPIKey      string
	MinioEndpoint       string
	MinioAccessKey      string
	MinioSecretKey      string
	MinioBucket         string
	MinioUseSSL         bool
	MinioRegion         string
	MinioInsecureTLS    bool
}

// app bundles the configuration and the shared clients used by the handlers.
type app struct {
	cfg    config
	db     *pgxpool.Pool
	http   *http.Client
	python *http.Client
	minio  *minio.Client
	ai     *aiservice.Client
}

// accessScope describes what the caller of a request is allowed to do.
type accessScope struct {
	IsAdmin   bool
	CanManage bool
	CanAuth   bool
	DeptID    string
	DeptIDs   []string
}

// sectionOut is the JSON representation of a section with its counters.
type sectionOut struct {
	ID             int64     `json:"id"`
	Vertical       string    `json:"vertical"`
	DepartmentID   *string   `json:"department_id,omitempty"`
	Slug           string    `json:"slug"`
	Title          string    `json:"title"`
	Emoji          *string   `json:"emoji,omitempty"`
	Description    *string   `json:"description,omitempty"`
	CreatedAt      time.Time `json:"created_at"`
	ChannelsTotal  int64     `json:"channels_total"`
	ChannelsActive int64     `json:"channels_active"`
	MessagesTotal  int64     `json:"messages_total"`
	LeadsTotal     int64     `json:"leads_total"`
}

// channelOut is the JSON representation of a channel.
type channelOut struct {
	ID              int64      `json:"id"`
	TGID            *int64     `json:"tg_id,omitempty"`
	SourceChannelID *int64     `json:"source_channel_id,omitempty"`
	Identifier      string     `json:"identifier"`
	Title           *string    `json:"title,omitempty"`
	Vertical        string     `json:"vertical"`
	SectionID       int64      `json:"section_id"`
	SectionSlug     *string    `json:"section_slug,omitempty"`
	IsActive        bool       `json:"is_active"`
	LastMessageID   *int64     `json:"last_message_id,omitempty"`
	LastPolledAt    *time.Time `json:"last_polled_at,omitempty"`
	CreatedAt       time.Time  `json:"created_at"`
}

// messageOut is the JSON representation of a message.
type messageOut struct {
	ID                 int64            `json:"id"`
	ChannelID          int64            `json:"channel_id"`
	ChannelVertical    *string          `json:"channel_vertical,omitempty"`
	ChannelSectionSlug *string          `json:"channel_section_slug,omitempty"`
	TGMessageID        int64            `json:"tg_message_id"`
	GroupedID          *int64           `json:"grouped_id,omitempty"`
	GroupSize          int              `json:"group_size"`
	Date               time.Time        `json:"date"`
	Text               *string          `json:"text,omitempty"`
	SenderID           *int64           `json:"sender_id,omitempty"`
	SenderUsername     *string          `json:"sender_username,omitempty"`
	SenderName         *string          `json:"sender_name,omitempty"`
	PostURL            *string          `json:"post_url,omitempty"`
	HasMedia           bool             `json:"has_media"`
	MediaFiles         []map[string]any `json:"media_files,omitempty"`
	Extracted          json.RawMessage  `json:"extracted,omitempty"`
	Views              *int64           `json:"views,omitempty"`
	Forwards           *int64           `json:"forwards,omitempty"`
	FetchedAt          time.Time        `json:"fetched_at"`
}

// componentProbe is one entry of the /health/detail response.
type componentProbe struct {
	Name      string `json:"name"`
	Status    string `json:"status"`
	LatencyMs int64  `json:"latency_ms"`
	Error     string `json:"error,omitempty"`
}

// main loads the configuration, connects to Postgres and MinIO, and serves
// the HTTP API until SIGINT or SIGTERM is received.
func main() {
	cfg := loadConfig()
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
	slog.SetDefault(logger)

	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	pool, err := dbretry.Connect(ctx, cfg.databaseURL(), 2*time.Minute)
	if err != nil {
		slog.Error("db_connect_failed", "error", err)
		os.Exit(1)
	}
	defer pool.Close()

	minioClient, err := newMinioClient(cfg)
	if err != nil {
		slog.Error("minio_init_failed", "error", err)
		os.Exit(1)
	}

	srvApp := &app{
		cfg:    cfg,
		db:     pool,
		http:   &http.Client{Timeout: cfg.LLMTimeout},
		python: &http.Client{Timeout: 15 * time.Minute},
		minio:  minioClient,
		ai:     aiservice.New(cfg.AIServiceURL, cfg.AIServiceToken, cfg.LLMTimeout),
	}
	server := &http.Server{
		Addr:              fmt.Sprintf("%s:%d", cfg.APIHost, cfg.APIPort),
		Handler:           http.HandlerFunc(srvApp.serveHTTP),
		ReadHeaderTimeout: 10 * time.Second,
	}

	// ListenAndServe returns ErrServerClosed as soon as Shutdown is called,
	// not when it completes. shutdownDone lets main wait for in-flight
	// requests to drain before the deferred pool.Close runs.
	shutdownDone := make(chan struct{})
	go func() {
		defer close(shutdownDone)
		<-ctx.Done()
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if err := server.Shutdown(shutdownCtx); err != nil {
			slog.Warn("server_shutdown_failed", "error", err)
		}
	}()

	slog.Info("go_api_started", "addr", server.Addr, "python_base_url", cfg.PythonBaseURL)
	if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
		slog.Error("server_failed", "error", err)
		os.Exit(1)
	}
	// ErrServerClosed is only produced by the Shutdown call above, so this
	// receive always completes.
	<-shutdownDone
}

// serveHTTP is the root handler. Health and root endpoints are answered
// directly; everything under /api/v1 goes through the internal-auth
// middleware to serveAPI; any other path is a 404.
func (a *app) serveHTTP(w http.ResponseWriter, r *http.Request) {
	path := a.apiPath(r.URL.Path)
	if path == "/healthz" {
		writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
		return
	}
	if path == "/health/detail" {
		a.handleHealthDetail(w, r)
		return
	}
	if path == "/" {
		writeJSON(w, http.StatusOK, map[string]string{"service": "monitoring-tg", "ui": "portal", "api": "go"})
		return
	}
	if !strings.HasPrefix(path, "/api/v1") {
		writeError(w, http.StatusNotFound, "not found")
		return
	}
	commonmw.InternalAuth(a.cfg.InternalAPIKey)(http.HandlerFunc(a.serveAPI)).ServeHTTP(w, r)
}

// serveAPI routes /api/v1 requests to their handlers. Auth and poll routes
// are forwarded through proxyPython.
func (a *app) serveAPI(w http.ResponseWriter, r *http.Request) {
	path := a.apiPath(r.URL.Path)
	ctx := r.Context()
	switch {
	case r.Method == http.MethodGet && path == "/api/v1/access/me":
		a.handleAccessMe(w, r)
	case path == "/api/v1/auth/status" || strings.HasPrefix(path, "/api/v1/auth/"):
		a.proxyPython(w, r, path)
	case path == "/api/v1/sections":
		a.handleSections(w, r)
	case strings.HasPrefix(path, "/api/v1/sections/"):
		a.handleSectionItem(w, r, path)
	case path == "/api/v1/channels":
		a.handleChannels(w, r)
	case strings.HasPrefix(path, "/api/v1/channels/"):
		a.handleChannelItem(ctx, w, r, path)
	case strings.HasPrefix(path, "/api/v1/media/"):
		a.handleMedia(w, r, path)
	case path == "/api/v1/messages":
		a.handleMessages(ctx, w, r)
	case strings.HasPrefix(path, "/api/v1/messages/"):
		a.handleMessageItem(ctx, w, r, path)
	case path == "/api/v1/stats":
		a.handleStats(ctx, w, r)
	case path == "/api/v1/llm/status":
		a.handleLLMStatus(ctx, w)
	case path == "/api/v1/llm/queue":
		a.handleLLMQueue(ctx, w, r)
	case path == "/api/v1/llm/prompt":
		a.handlePrompt(ctx, w, r)
	case r.Method == http.MethodPost && path == "/api/v1/poll":
		a.proxyPython(w, r, path)
	default:
		writeError(w, http.StatusNotFound, "not found")
	}
}

// apiPath strips the configured public base path from path. A path equal to
// the base becomes "/"; a path outside the base is returned unchanged.
func (a *app) apiPath(path string) string {
	base := strings.TrimRight(a.cfg.PublicBasePath, "/")
	if base != "" && strings.HasPrefix(path, base+"/") {
		return strings.TrimPrefix(path, base)
	}
	if base != "" && path == base {
		return "/"
	}
	return path
}

// handleHealthDetail runs every component probe sequentially under one
// shared 3-second deadline and always responds 200 with the results.
func (a *app) handleHealthDetail(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
	defer cancel()
	components := []componentProbe{
		a.probePostgres(ctx),
		a.probeAIService(ctx),
		a.probeClassificationQueue(ctx),
		a.probePoller(ctx),
		a.probeMediaStorage(ctx),
		a.probeMediaMetadata(ctx),
	}
	writeJSON(w, http.StatusOK, map[string]any{"components": components})
}

// probePostgres reports whether the database pool answers a ping.
func (a *app) probePostgres(ctx context.Context) componentProbe {
	start := time.Now()
	if err := a.db.Ping(ctx); err != nil {
		return componentProbe{Name: "postgres", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
	}
	return componentProbe{Name: "postgres", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
}

// probeAIService reports "ok" only when the AI service lists a provider
// named "llm" that is both configured and OK. A disabled LLM is "down".
func (a *app) probeAIService(ctx context.Context) componentProbe {
	start := time.Now()
	if !a.cfg.LLMEnabled {
		return componentProbe{Name: "ai_service", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "llm disabled"}
	}
	status, err := a.ai.ProvidersStatus(ctx)
	if err != nil {
		return componentProbe{Name: "ai_service", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
	}
	for _, provider := range status.Providers {
		if provider.Name != "llm" {
			continue
		}
		if provider.Configured && provider.OK {
			return componentProbe{Name: "ai_service", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
		}
		errMsg := strings.TrimSpace(provider.Error)
		if errMsg == "" {
			errMsg = "llm provider is not ready"
		}
		return componentProbe{Name: "ai_service", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: errMsg}
	}
	return componentProbe{Name: "ai_service", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "llm provider not found"}
}

// probeClassificationQueue counts messages with text that have no
// classification row for their section. Any pending message is reported as
// "down", with the total and last-24h counts in Error.
func (a *app) probeClassificationQueue(ctx context.Context) componentProbe {
	start := time.Now()
	var pending, pending24h int64
	err := a.db.QueryRow(ctx, `
		SELECT
			COUNT(*)::bigint,
			COUNT(*) FILTER (WHERE m.fetched_at >= now() - interval '24 hours')::bigint
		FROM channels c
		JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
		JOIN messages m ON m.channel_id = src.id
		JOIN sections s ON s.id = c.section_id
		LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
		WHERE m.text IS NOT NULL AND mc.id IS NULL`,
	).Scan(&pending, &pending24h)
	if err != nil {
		return componentProbe{Name: "classification_queue", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
	}
	if pending > 0 {
		return componentProbe{
			Name:      "classification_queue",
			Status:    "down",
			LatencyMs: time.Since(start).Milliseconds(),
			Error:     "pending=" + strconv.FormatInt(pending, 10) + " pending_24h=" + strconv.FormatInt(pending24h, 10),
		}
	}
	return componentProbe{Name: "classification_queue", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
}

// probePoller inspects active channels without a source channel and reports
// "down" when any of them was never polled or was last polled more than
// staleAfter seconds ago.
func (a *app) probePoller(ctx context.Context) componentProbe {
	start := time.Now()
	// Presumably the larger of three poll intervals and 900 seconds; maxInt
	// is defined outside this view.
	staleAfter := maxInt(a.cfg.PollIntervalSeconds*3, 900)
	var active, neverPolled, stale int64
	err := a.db.QueryRow(ctx, `
		SELECT
			COUNT(*)::bigint,
			COUNT(*) FILTER (WHERE last_polled_at IS NULL)::bigint,
			COUNT(*) FILTER (WHERE last_polled_at IS NOT NULL AND last_polled_at < now() - ($1::int * interval '1 second'))::bigint
		FROM channels
		WHERE is_active = true AND source_channel_id IS NULL`,
		staleAfter,
	).Scan(&active, &neverPolled, &stale)
	if err != nil {
		return componentProbe{Name: "poller", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
	}
	if active > 0 && (neverPolled > 0 || stale > 0) {
		return componentProbe{
			Name:      "poller",
			Status:    "down",
			LatencyMs: time.Since(start).Milliseconds(),
			Error: "active=" + strconv.FormatInt(active, 10) +
				" never_polled=" + strconv.FormatInt(neverPolled, 10) +
				" stale=" + strconv.FormatInt(stale, 10) +
				" stale_after_sec=" + strconv.Itoa(staleAfter),
		}
	}
	return componentProbe{Name: "poller", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
}

// probeMediaStorage reports whether the configured MinIO bucket exists.
func (a *app) probeMediaStorage(ctx context.Context) componentProbe {
	start := time.Now()
	if a.minio == nil || a.cfg.MinioBucket == "" {
		return componentProbe{Name: "media_storage", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "minio not configured"}
	}
	exists, err := a.minio.BucketExists(ctx, a.cfg.MinioBucket)
	if err != nil {
		return componentProbe{Name: "media_storage", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
	}
	if !exists {
		return componentProbe{Name: "media_storage", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "bucket not found: " + a.cfg.MinioBucket}
	}
	return componentProbe{Name: "media_storage", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
}

// probeMediaMetadata reports "down" when any message flagged has_media has
// an empty or NULL media_files array.
func (a *app) probeMediaMetadata(ctx context.Context) componentProbe {
	start := time.Now()
	var withMedia, missingFiles int64
	err := a.db.QueryRow(ctx, `
		SELECT
			COUNT(*) FILTER (WHERE has_media = true)::bigint,
			COUNT(*) FILTER (
				WHERE has_media = true
				AND jsonb_array_length(COALESCE(media_files, '[]'::jsonb)) = 0
			)::bigint
		FROM messages`,
	).Scan(&withMedia, &missingFiles)
	if err != nil {
		return componentProbe{Name: "media_metadata", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
	}
	if missingFiles > 0 {
		return componentProbe{
			Name:      "media_metadata",
			Status:    "down",
			LatencyMs: time.Since(start).Milliseconds(),
			Error:     "messages_with_media=" + strconv.FormatInt(withMedia, 10) + " missing_media_files=" + strconv.FormatInt(missingFiles, 10),
		}
	}
	return componentProbe{Name: "media_metadata", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
}

// handleAccessMe returns the caller's access scope as JSON.
func (a *app) handleAccessMe(w http.ResponseWriter, r *http.Request) {
	scope := readAccess(r)
	writeJSON(w, http.StatusOK, map[string]any{
		"is_admin":              scope.IsAdmin,
		"can_manage_department": scope.CanManage,
		"can_auth_telegram":     scope.CanAuth,
		"department_id":         nullableString(scope.DeptID),
		"department_ids":        scope.departmentIDs(),
	})
}

// handleSections dispatches /api/v1/sections: GET lists, POST creates.
func (a *app) handleSections(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodGet:
		a.listSections(r.Context(), w, r)
	case http.MethodPost:
		a.createSection(r.Context(), w, r)
	default:
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
	}
}

// listSections returns the sections of the required "vertical" query
// parameter with channel, message and lead counters. Non-admin callers get
// the department filter appended to the query.
func (a *app) listSections(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	args := []any{vertical}
	deptFilter := ""
	if !scope.IsAdmin {
		args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
	}
	rows, err := a.db.Query(ctx, `
		SELECT
			s.id, s.vertical, COALESCE(s.department_id, ''), s.slug, s.title,
			COALESCE(s.emoji, ''), COALESCE(s.description, ''), s.created_at,
			(SELECT count(*) FROM channels c WHERE c.section_id = s.id),
			(SELECT count(*) FROM channels c WHERE c.section_id = s.id AND c.is_active = true),
			(SELECT count(*)
				FROM channels c
				JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
				JOIN messages m ON m.channel_id = src.id
				WHERE c.section_id = s.id),
			(SELECT count(*)
				FROM channels c
				JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
				JOIN messages m ON m.channel_id = src.id
				LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
				WHERE c.section_id = s.id AND (
					(s.vertical = 'hr' AND COALESCE(mc.verdict->>'is_lead', m.extracted->'hr_lead'->>'is_lead') = 'true')
					OR (s.vertical <> 'hr' AND COALESCE(mc.verdict->>'is_listing', m.extracted->'lead'->>'is_listing') = 'true')
				))
		FROM sections s
		WHERE s.vertical = $1`+deptFilter+`
		ORDER BY s.created_at ASC, s.id ASC
	`, args...)
	if err != nil {
		writeDBError(w, err)
		return
	}
	defer rows.Close()

	// Non-nil so an empty result encodes as [] rather than null.
	out := []sectionOut{}
	for rows.Next() {
		item, err := scanSection(rows)
		if err != nil {
			writeDBError(w, err)
			return
		}
		out = append(out, item)
	}
	if err := rows.Err(); err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, out)
}

// createSection inserts a section owned by the caller's primary department
// and returns it with zeroed counters. vertical, slug and title are
// required after trimming/normalization.
func (a *app) createSection(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	var payload struct {
		Vertical    string  `json:"vertical"`
		Slug        string  `json:"slug"`
		Title       string  `json:"title"`
		Emoji       *string `json:"emoji"`
		Description *string `json:"description"`
	}
	if !readBody(w, r, &payload) {
		return
	}
	payload.Vertical = strings.TrimSpace(payload.Vertical)
	payload.Slug = normalizeSlug(payload.Slug)
	payload.Title = strings.TrimSpace(payload.Title)
	if payload.Vertical == "" || payload.Slug == "" || payload.Title == "" {
		writeError(w, http.StatusBadRequest, "vertical, slug and title are required")
		return
	}
	dept := nullableString(scope.primaryDepartmentID())
	row := a.db.QueryRow(ctx, `
		INSERT INTO sections (vertical, department_id, slug, title, emoji, description)
		VALUES ($1, $2, $3, $4, $5, $6)
		RETURNING id, vertical, COALESCE(department_id, ''), slug, title,
			COALESCE(emoji, ''), COALESCE(description, ''), created_at,
			0::bigint, 0::bigint, 0::bigint, 0::bigint
	`, payload.Vertical, dept, payload.Slug, payload.Title, payload.Emoji, payload.Description)
	item, err := scanSectionRow(row)
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusCreated, item)
}

// handleSectionItem dispatches /api/v1/sections/{vertical}/{slug} by method.
// Everything after the first "/" of the remainder is treated as the slug.
func (a *app) handleSectionItem(w http.ResponseWriter, r *http.Request, path string) {
	rest := strings.TrimPrefix(path, "/api/v1/sections/")
	parts := strings.SplitN(rest, "/", 2)
	if len(parts) != 2 {
		writeError(w, http.StatusNotFound, "not found")
		return
	}
	// NOTE(review): path is derived from r.URL.Path, which net/http has
	// already percent-decoded, so this unescapes a second time - confirm
	// that is intended.
	vertical, err := url.PathUnescape(parts[0])
	if err != nil {
		writeError(w, http.StatusBadRequest, "bad vertical")
		return
	}
	slug, err := url.PathUnescape(parts[1])
	if err != nil {
		writeError(w, http.StatusBadRequest, "bad slug")
		return
	}
	switch r.Method {
	case http.MethodGet:
		a.getSection(r.Context(), w, r, vertical, slug)
	case http.MethodPatch:
		a.updateSection(r.Context(), w, r, vertical, slug)
	case http.MethodDelete:
		a.deleteSection(r.Context(), w, r, vertical, slug)
	default:
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
	}
}

// getSection returns a single section visible to the caller.
func (a *app) getSection(ctx context.Context, w http.ResponseWriter, r *http.Request, vertical, slug string) {
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	item, err := a.findSection(ctx, vertical, slug, scope)
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, item)
}

// updateSection applies a partial update (title, emoji, description) to a
// section. An empty payload returns the section unchanged. A title that is
// empty after trimming is rejected with 400, matching createSection.
func (a *app) updateSection(ctx context.Context, w http.ResponseWriter, r *http.Request, vertical, slug string) {
	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	var payload struct {
		Title       *string `json:"title"`
		Emoji       *string `json:"emoji"`
		Description *string `json:"description"`
	}
	if !readBody(w, r, &payload) {
		return
	}
	set := []string{}
	args := []any{}
	if payload.Title != nil {
		title := strings.TrimSpace(*payload.Title)
		if title == "" {
			writeError(w, http.StatusBadRequest, "title must not be empty")
			return
		}
		args = append(args, title)
		set = append(set, fmt.Sprintf("title = $%d", len(args)))
	}
	if payload.Emoji != nil {
		args = append(args, nullableString(strings.TrimSpace(*payload.Emoji)))
		set = append(set, fmt.Sprintf("emoji = $%d", len(args)))
	}
	if payload.Description != nil {
		args = append(args, nullableString(strings.TrimSpace(*payload.Description)))
		set = append(set, fmt.Sprintf("description = $%d", len(args)))
	}
	if len(set) == 0 {
		a.getSection(ctx, w, r, vertical, slug)
		return
	}
	args = append(args, vertical, slug)
	where := fmt.Sprintf("vertical = $%d AND slug = $%d", len(args)-1, len(args))
	if !scope.IsAdmin {
		var deptFilter string
		args, deptFilter = appendDepartmentFilter(args, scope, "department_id")
		where += deptFilter
	}
	// Only column names and placeholders are concatenated; values travel in args.
	row := a.db.QueryRow(ctx, `
		UPDATE sections
		SET `+strings.Join(set, ", ")+`
		WHERE `+where+`
		RETURNING id, vertical, COALESCE(department_id, ''), slug, title,
			COALESCE(emoji, ''), COALESCE(description, ''), created_at,
			(SELECT count(*) FROM channels c WHERE c.section_id = sections.id),
			(SELECT count(*) FROM channels c WHERE c.section_id = sections.id AND c.is_active = true),
			(SELECT count(*)
				FROM channels c
				JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
				JOIN messages m ON m.channel_id = src.id
				WHERE c.section_id = sections.id),
			0::bigint
	`, args...)
	item, err := scanSectionRow(row)
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, item)
}

// deleteSection removes a section visible to the caller. It responds 400
// while the section still has channels and 204 on success.
func (a *app) deleteSection(ctx context.Context, w http.ResponseWriter, r *http.Request, vertical, slug string) {
	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	section, err := a.findSection(ctx, vertical, slug, scope)
	if err != nil {
		writeDBError(w, err)
		return
	}
	var count int64
	if err := a.db.QueryRow(ctx, `SELECT count(*) FROM channels WHERE section_id = $1`, section.ID).Scan(&count); err != nil {
		writeDBError(w, err)
		return
	}
	if count > 0 {
		writeError(w, http.StatusBadRequest, fmt.Sprintf("section has %d channels - move or delete them first", count))
		return
	}
	if _, err := a.db.Exec(ctx, `DELETE FROM sections WHERE id = $1`, section.ID); err != nil {
		writeDBError(w, err)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}

// findSection loads one section by vertical and slug, applying the
// department filter for non-admin scopes. The leads counter is always 0 here.
func (a *app) findSection(ctx context.Context, vertical, slug string, scope accessScope) (sectionOut, error) {
	args := []any{vertical, slug}
	where := "s.vertical = $1 AND s.slug = $2"
	if !scope.IsAdmin {
		var deptFilter string
		args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
		where += deptFilter
	}
	row := a.db.QueryRow(ctx, `
		SELECT
			s.id, s.vertical, COALESCE(s.department_id, ''), s.slug, s.title,
			COALESCE(s.emoji, ''), COALESCE(s.description, ''), s.created_at,
			(SELECT count(*) FROM channels c WHERE c.section_id = s.id),
			(SELECT count(*) FROM channels c WHERE c.section_id = s.id AND c.is_active = true),
			(SELECT count(*)
				FROM channels c
				JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
				JOIN messages m ON m.channel_id = src.id
				WHERE c.section_id = s.id),
			0::bigint
		FROM sections s
		WHERE `+where+`
	`, args...)
	return scanSectionRow(row)
}

// handleChannels dispatches /api/v1/channels: GET lists, POST creates.
func (a *app) handleChannels(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodGet:
		a.listChannels(r.Context(), w, r)
	case http.MethodPost:
		a.createChannel(r.Context(), w, r)
	default:
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
	}
}

// listChannels returns the channels of the required "vertical" query
// parameter, optionally narrowed by "section" (slug). Telegram id, title
// and polling fields fall back to the source channel's values when NULL.
func (a *app) listChannels(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	section := strings.TrimSpace(r.URL.Query().Get("section"))
	args := []any{vertical}
	where := "c.vertical = $1"
	if section != "" {
		args = append(args, section)
		where += fmt.Sprintf(" AND s.slug = $%d", len(args))
	}
	if !scope.IsAdmin {
		var deptFilter string
		args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
		where += deptFilter
	}
	rows, err := a.db.Query(ctx, `
		SELECT
			c.id, COALESCE(c.tg_id, src.tg_id), c.source_channel_id, c.identifier,
			COALESCE(c.title, src.title), c.vertical, c.section_id, s.slug, c.is_active,
			COALESCE(c.last_message_id, src.last_message_id),
			COALESCE(c.last_polled_at, src.last_polled_at),
			c.created_at
		FROM channels c
		JOIN sections s ON s.id = c.section_id
		LEFT JOIN channels src ON src.id = c.source_channel_id
		WHERE `+where+`
		ORDER BY c.created_at DESC, c.id DESC
	`, args...)
	if err != nil {
		writeDBError(w, err)
		return
	}
	defer rows.Close()

	// Non-nil so an empty result encodes as [] rather than null.
	out := []channelOut{}
	for rows.Next() {
		item, err := scanChannel(rows)
		if err != nil {
			writeDBError(w, err)
			return
		}
		out = append(out, item)
	}
	if err := rows.Err(); err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, out)
}

// createChannel adds (or re-activates) a channel in a section. If the same
// identifier already exists in another section of the vertical, the new row
// points at that existing channel through source_channel_id.
func (a *app) createChannel(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	var payload struct {
		Identifier string `json:"identifier"`
		Vertical   string `json:"vertical"`
		Section    string `json:"section"`
	}
	if !readBody(w, r, &payload) {
		return
	}
	payload.Identifier = strings.TrimSpace(payload.Identifier)
	payload.Vertical = strings.TrimSpace(payload.Vertical)
	payload.Section = strings.TrimSpace(payload.Section)
	if payload.Identifier == "" || payload.Vertical == "" || payload.Section == "" {
		writeError(w, http.StatusBadRequest, "identifier, vertical and section are required")
		return
	}
	section, err := a.findSection(ctx, payload.Vertical, payload.Section, scope)
	if err != nil {
		writeDBError(w, err)
		return
	}

	// Prefer a row that is itself a source (source_channel_id IS NULL).
	var sourceID *int64
	var existingSource sql.NullInt64
	err = a.db.QueryRow(ctx, `
		SELECT COALESCE(c.source_channel_id, c.id)
		FROM channels c
		WHERE c.vertical = $1 AND c.identifier = $2 AND c.section_id <> $3
		ORDER BY CASE WHEN c.source_channel_id IS NULL THEN 0 ELSE 1 END, c.id
		LIMIT 1
	`, payload.Vertical, payload.Identifier, section.ID).Scan(&existingSource)
	if err != nil && !errors.Is(err, pgx.ErrNoRows) {
		writeDBError(w, err)
		return
	}
	if existingSource.Valid {
		sourceID = &existingSource.Int64
	}

	// $4 (section slug) is only echoed back in RETURNING; $5 is the source id.
	row := a.db.QueryRow(ctx, `
		INSERT INTO channels (identifier, vertical, section_id, source_channel_id, is_active)
		VALUES ($1, $2, $3, $5, true)
		ON CONFLICT ON CONSTRAINT uq_channels_section_identifier
		DO UPDATE SET
			is_active = true,
			source_channel_id = COALESCE(channels.source_channel_id, EXCLUDED.source_channel_id)
		RETURNING id, tg_id, source_channel_id, identifier, title, vertical, section_id,
			$4::text, is_active, last_message_id, last_polled_at, created_at
	`, payload.Identifier, payload.Vertical, section.ID, section.Slug, sourceID)
	item, err := scanChannelRow(row)
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusCreated, item)
}

// handleChannelItem routes /api/v1/channels/{id} and
// /api/v1/channels/{id}/{action}. Paths with more than two segments are
// rejected with 404 so they can never reach the item handlers (previously
// e.g. DELETE /channels/5/a/b deleted channel 5).
func (a *app) handleChannelItem(ctx context.Context, w http.ResponseWriter, r *http.Request, path string) {
	rest := strings.TrimPrefix(path, "/api/v1/channels/")
	// strings.Split always yields at least one element.
	parts := strings.Split(rest, "/")
	id, err := strconv.ParseInt(parts[0], 10, 64)
	if err != nil {
		writeError(w, http.StatusBadRequest, "bad channel id")
		return
	}
	switch len(parts) {
	case 1:
		switch r.Method {
		case http.MethodGet:
			a.getChannel(ctx, w, r, id)
		case http.MethodPatch:
			a.updateChannel(ctx, w, r, id)
		case http.MethodDelete:
			a.deleteChannel(ctx, w, r, id)
		default:
			writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		}
	case 2:
		switch parts[1] {
		case "poll", "backfill-media":
			a.proxyPython(w, r, path)
		case "reanalyze":
			a.reanalyzeChannel(ctx, w, r, id)
		case "stats":
			a.channelStats(ctx, w, r, id)
		default:
			writeError(w, http.StatusNotFound, "not found")
		}
	default:
		writeError(w, http.StatusNotFound, "not found")
	}
}

// getChannel returns one channel visible to the caller, optionally narrowed
// by the "vertical" and "section" query parameters.
func (a *app) getChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) {
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	channel, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section"))
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, channel)
}

// updateChannel applies a partial update (is_active, vertical, section) to a
// channel the caller may manage. An empty payload returns the channel
// unchanged. An empty vertical is rejected with 400. The target section is
// looked up in the payload's vertical when given, otherwise in the
// channel's current vertical.
func (a *app) updateChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) {
	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	current, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section"))
	if err != nil {
		writeDBError(w, err)
		return
	}
	var payload struct {
		IsActive *bool   `json:"is_active"`
		Vertical *string `json:"vertical"`
		Section  *string `json:"section"`
	}
	if !readBody(w, r, &payload) {
		return
	}
	set := []string{}
	args := []any{}
	if payload.IsActive != nil {
		args = append(args, *payload.IsActive)
		set = append(set, fmt.Sprintf("is_active = $%d", len(args)))
	}
	// Falling back to the stored vertical lets a section move work without
	// a ?vertical= query parameter.
	vertical := current.Vertical
	if payload.Vertical != nil {
		vertical = strings.TrimSpace(*payload.Vertical)
		if vertical == "" {
			writeError(w, http.StatusBadRequest, "vertical must not be empty")
			return
		}
		args = append(args, vertical)
		set = append(set, fmt.Sprintf("vertical = $%d", len(args)))
	}
	if payload.Section != nil && strings.TrimSpace(*payload.Section) != "" {
		section, err := a.findSection(ctx, vertical, strings.TrimSpace(*payload.Section), scope)
		if err != nil {
			writeDBError(w, err)
			return
		}
		args = append(args, section.ID)
		set = append(set, fmt.Sprintf("section_id = $%d", len(args)))
	}
	if len(set) == 0 {
		a.getChannel(ctx, w, r, id)
		return
	}
	args = append(args, id)
	row := a.db.QueryRow(ctx, `
		UPDATE channels
		SET `+strings.Join(set, ", ")+`
		WHERE id = $`+strconv.Itoa(len(args))+`
		RETURNING id, tg_id, source_channel_id, identifier, title, vertical, section_id,
			(SELECT slug FROM sections WHERE id = channels.section_id),
			is_active, last_message_id, last_polled_at, created_at
	`, args...)
	item, err := scanChannelRow(row)
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, item)
}

// deleteChannel deletes a channel the caller may manage and responds 204.
func (a *app) deleteChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) {
	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	if _, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil {
		writeDBError(w, err)
		return
	}
	if _, err := a.db.Exec(ctx, `DELETE FROM channels WHERE id = $1`, id); err != nil {
		writeDBError(w, err)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}

// findChannel loads one channel by id. Non-blank vertical and section (slug)
// arguments add filters, and non-admin scopes get the department filter, so
// the call doubles as the access check for the channel handlers.
func (a *app) findChannel(ctx context.Context, id int64, scope accessScope, vertical, section string) (channelOut, error) {
	args := []any{id}
	where := "c.id = $1"
	if strings.TrimSpace(vertical) != "" {
		args = append(args, strings.TrimSpace(vertical))
		where += fmt.Sprintf(" AND c.vertical = $%d", len(args))
	}
	if strings.TrimSpace(section) != "" {
		args = append(args, strings.TrimSpace(section))
		where += fmt.Sprintf(" AND s.slug = $%d", len(args))
	}
	if !scope.IsAdmin {
		var deptFilter string
		args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
		where += deptFilter
	}
	row := a.db.QueryRow(ctx, `
		SELECT
			c.id, COALESCE(c.tg_id, src.tg_id), c.source_channel_id, c.identifier,
			COALESCE(c.title, src.title), c.vertical, c.section_id, s.slug, c.is_active,
			COALESCE(c.last_message_id, src.last_message_id),
			COALESCE(c.last_polled_at, src.last_polled_at),
			c.created_at
		FROM channels c
		JOIN sections s ON s.id = c.section_id
		LEFT JOIN channels src ON src.id = c.source_channel_id
		WHERE `+where+`
	`, args...)
	return scanChannelRow(row)
}

// channelStats returns message and lead totals for a channel. Messages are
// counted on the source channel when the channel has one.
func (a *app) channelStats(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) {
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	ch, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section"))
	if err != nil {
		writeDBError(w, err)
		return
	}
	var messages, leads int64
	key := "lead"
	field := "is_listing"
	if ch.Vertical == verticalHR {
		key = "hr_lead"
		field = "is_lead"
	}
	sourceID := id
	if ch.SourceChannelID != nil {
		sourceID = *ch.SourceChannelID
	}
	if err := a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE channel_id = $1`, sourceID).Scan(&messages); err != nil {
		writeDBError(w, err)
		return
	}
	// The per-section classification verdict wins over the legacy verdict
	// stored in messages.extracted.
	if err := a.db.QueryRow(ctx, `
		SELECT count(*)
		FROM messages m
		LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = $2
		WHERE m.channel_id = $1
			AND COALESCE(mc.verdict ->> $4, m.extracted -> $3 ->> $4) = 'true'
	`, sourceID, ch.SectionID, key, field).Scan(&leads); err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, map[string]any{
		"channel_id":        id,
		"source_channel_id": sourceID,
		"messages_total":    messages,
		"leads_total":       leads,
		"last_polled_at":    ch.LastPolledAt,
		"last_message_id":   ch.LastMessageID,
	})
}

// reanalyzeChannel clears the stored verdicts for a channel: it deletes the
// section's classification rows for the source channel's messages and
// removes the vertical's verdict key from messages.extracted. Both writes
// run in one transaction so a failure cannot leave them half applied. The
// response carries the number of deleted classification rows ("updated")
// and the value returned by pendingLLM ("pending").
func (a *app) reanalyzeChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) {
	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	ch, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section"))
	if err != nil {
		writeDBError(w, err)
		return
	}
	key := verdictKey(ch.Vertical)
	sourceID := id
	if ch.SourceChannelID != nil {
		sourceID = *ch.SourceChannelID
	}

	tx, err := a.db.Begin(ctx)
	if err != nil {
		writeDBError(w, err)
		return
	}
	// After a successful Commit this Rollback only returns an error, which
	// is deliberately ignored.
	defer func() { _ = tx.Rollback(ctx) }()

	tag, err := tx.Exec(ctx, `
		DELETE FROM message_classifications
		WHERE section_id = $1
			AND message_id IN (SELECT id FROM messages WHERE channel_id = $2)
	`, ch.SectionID, sourceID)
	if err != nil {
		writeDBError(w, err)
		return
	}
	// A non-object extracted value (including NULL) becomes '{}' before the
	// key is removed.
	if _, err := tx.Exec(ctx, `
		UPDATE messages
		SET extracted = (
			CASE WHEN jsonb_typeof(extracted) = 'object' THEN extracted ELSE '{}'::jsonb END
		) - $1
		WHERE channel_id = $2
	`, key, sourceID); err != nil {
		writeDBError(w, err)
		return
	}
	if err := tx.Commit(ctx); err != nil {
		writeDBError(w, err)
		return
	}

	pending, err := a.pendingLLM(ctx, scope, ch.Vertical, valueOrEmpty(ch.SectionSlug))
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, map[string]int64{"updated": tag.RowsAffected(), "pending": pending})
}

// handleMessages lists messages for the required "vertical" query parameter.
// Optional filters: section (slug), channel_id, q (ILIKE substring on the
// text) and leads_only ("true" or "1"). limit is clamped to 1..500 and
// offset to >= 0. Up to five times limit rows (at most 1000) are fetched and
// then passed to aggregateMessages together with limit.
func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	q := r.URL.Query()
	limit := clampInt(queryInt(q.Get("limit"), 50), 1, 500)
	offset := maxInt(queryInt(q.Get("offset"), 0), 0)

	args := []any{vertical}
	where := "c.vertical = $1"
	if section := strings.TrimSpace(q.Get("section")); section != "" {
		args = append(args, section)
		where += fmt.Sprintf(" AND s.slug = $%d", len(args))
	}
	if channelID := strings.TrimSpace(q.Get("channel_id")); channelID != "" {
		id, err := strconv.ParseInt(channelID, 10, 64)
		if err != nil {
			writeError(w, http.StatusBadRequest, "bad channel_id")
			return
		}
		args = append(args, id)
		where += fmt.Sprintf(" AND c.id = $%d", len(args))
	}
	if text := strings.TrimSpace(q.Get("q")); text != "" {
		args = append(args, "%"+text+"%")
		where += fmt.Sprintf(" AND m.text ILIKE $%d", len(args))
	}
	if q.Get("leads_only") == "true" || q.Get("leads_only") == "1" {
		key := "lead"
		field := "is_listing"
		if vertical == verticalHR {
			key = "hr_lead"
			field = "is_lead"
		}
		// key is placeholder len(args)-1 and field is len(args); field is
		// referenced twice.
		args = append(args, key, field)
		where += fmt.Sprintf(" AND COALESCE(mc.verdict ->> $%d, m.extracted -> $%d ->> $%d) = 'true'", len(args), len(args)-1, len(args))
	}
	if !scope.IsAdmin {
		var deptFilter string
		args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
		where += deptFilter
	}
	fetchLimit := clampInt(limit*5, limit, 1000)
	args = append(args, fetchLimit, offset)

	rows, err := a.db.Query(ctx, `
		SELECT
			m.id, c.id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int,
			m.date, m.text, m.sender_id, m.sender_username, m.sender_name,
			COALESCE(src.identifier, c.identifier), COALESCE(src.tg_id, c.tg_id),
			m.has_media,
			COALESCE(
				CASE WHEN mc.verdict IS NULL THEN NULL
					ELSE jsonb_build_object(CASE WHEN c.vertical = 'hr' THEN 'hr_lead' ELSE 'lead' END, mc.verdict)
				END,
				m.extracted,
				'null'::jsonb
			)::text,
			COALESCE(m.media_files, '[]'::jsonb)::text,
			m.views, m.forwards, m.fetched_at
		FROM channels c
		JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
		JOIN messages m ON m.channel_id = src.id
		JOIN sections s ON s.id = c.section_id
		LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
		WHERE `+where+`
		ORDER BY m.date DESC, m.id DESC
		LIMIT $`+strconv.Itoa(len(args)-1)+` OFFSET $`+strconv.Itoa(len(args))+`
	`, args...)
	if err != nil {
		writeDBError(w, err)
		return
	}
	defer rows.Close()

	// Non-nil so an empty result encodes as [] rather than null.
	out := []messageOut{}
	for rows.Next() {
		item, err := scanMessage(rows)
		if err != nil {
			writeDBError(w, err)
			return
		}
		a.normalizeMessageMedia(&item)
		out = append(out, item)
	}
	if err := rows.Err(); err != nil {
		writeDBError(w, err)
		return
	}
	out = aggregateMessages(out, limit)
	writeJSON(w, http.StatusOK, out)
}

// handleMessageItem returns a single message by id (GET only). When several
// channels expose the message, the row whose channel is the source channel
// itself is preferred.
func (a *app) handleMessageItem(ctx context.Context, w http.ResponseWriter, r *http.Request, path string) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	idRaw := strings.TrimPrefix(path, "/api/v1/messages/")
	id, err := strconv.ParseInt(idRaw, 10, 64)
	if err != nil {
		writeError(w, http.StatusBadRequest, "bad message id")
		return
	}
	args := []any{id}
	where := "m.id = $1"
	if !scope.IsAdmin {
		var deptFilter string
		args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
		where += deptFilter
	}
	row := a.db.QueryRow(ctx, `
		SELECT
			m.id, c.id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int,
			m.date, m.text, m.sender_id, m.sender_username, m.sender_name,
			COALESCE(src.identifier, c.identifier), COALESCE(src.tg_id, c.tg_id),
			m.has_media,
			COALESCE(
				CASE WHEN mc.verdict IS NULL THEN NULL
					ELSE jsonb_build_object(CASE WHEN c.vertical = 'hr' THEN 'hr_lead' ELSE 'lead' END, mc.verdict)
				END,
				m.extracted,
				'null'::jsonb
			)::text,
			COALESCE(m.media_files, '[]'::jsonb)::text,
			m.views, m.forwards, m.fetched_at
		FROM messages m
		JOIN channels src ON src.id = m.channel_id
		JOIN channels c ON c.id = src.id OR c.source_channel_id = src.id
		JOIN sections s ON s.id = c.section_id
		LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
		WHERE `+where+`
		ORDER BY CASE WHEN c.id = src.id THEN 0 ELSE 1 END
		LIMIT 1
	`, args...)
	item, err := scanMessageRow(row)
	if err != nil {
		writeDBError(w, err)
		return
	}
	a.normalizeMessageMedia(&item)
	writeJSON(w, http.StatusOK, item)
}

func (a *app) handleMedia(w http.ResponseWriter, r *http.Request, path string) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	rel, err := url.PathUnescape(strings.TrimPrefix(path, "/api/v1/media/"))
	if err != nil {
		writeError(w, http.StatusBadRequest, "bad media path")
		return
	}
	clean := strings.TrimPrefix(filepath.Clean("/"+rel), "/")
	if clean == "."
|| clean == "" || strings.HasPrefix(clean, "../") { writeError(w, http.StatusBadRequest, "bad media path") return } parts := strings.SplitN(clean, string(os.PathSeparator), 2) channelID, err := strconv.ParseInt(parts[0], 10, 64) if err != nil || channelID <= 0 { writeError(w, http.StatusBadRequest, "bad media path") return } allowed, err := a.canReadChannelMedia(r.Context(), scope, channelID) if err != nil { writeDBError(w, err) return } if !allowed { writeError(w, http.StatusNotFound, "not found") return } if a.serveMinioMedia(w, r, clean) { return } base, err := filepath.Abs(a.cfg.MediaDir) if err != nil { writeError(w, http.StatusInternalServerError, "media directory unavailable") return } full, err := filepath.Abs(filepath.Join(base, clean)) if err != nil || (full != base && !strings.HasPrefix(full, base+string(os.PathSeparator))) { writeError(w, http.StatusBadRequest, "bad media path") return } http.ServeFile(w, r, full) } func (a *app) serveMinioMedia(w http.ResponseWriter, r *http.Request, key string) bool { if a.minio == nil || a.cfg.MinioBucket == "" { return false } obj, err := a.minio.GetObject(r.Context(), a.cfg.MinioBucket, key, minio.GetObjectOptions{}) if err != nil { slog.Warn("minio_get_media_failed", "key", key, "error", err) return false } defer func() { if err := obj.Close(); err != nil { slog.Warn("minio_media_close_failed", "key", key, "error", err) } }() info, err := obj.Stat() if err != nil { return false } if info.ContentType != "" { w.Header().Set("Content-Type", info.ContentType) } if info.Size > 0 { w.Header().Set("Content-Length", strconv.FormatInt(info.Size, 10)) } if _, err := io.Copy(w, obj); err != nil { slog.Warn("minio_media_stream_failed", "key", key, "error", err) } return true } func (a *app) canReadChannelMedia(ctx context.Context, scope accessScope, channelID int64) (bool, error) { var allowed bool err := a.db.QueryRow(ctx, ` SELECT COALESCE(bool_or($3::boolean OR s.department_id::text = ANY($2::text[])), false) FROM channels 
c JOIN sections s ON s.id = c.section_id WHERE c.id = $1 OR c.source_channel_id = $1 `, channelID, scope.departmentIDs(), scope.IsAdmin).Scan(&allowed) if errors.Is(err, pgx.ErrNoRows) { return false, nil } if err != nil { return false, err } return allowed, nil } func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } scope, ok := a.readScope(w, r, false) if !ok { return } vertical := queryRequired(w, r, "vertical") if vertical == "" { return } section := strings.TrimSpace(r.URL.Query().Get("section")) args := []any{vertical} where := "c.vertical = $1" if section != "" { args = append(args, section) where += fmt.Sprintf(" AND s.slug = $%d", len(args)) } if !scope.IsAdmin { var deptFilter string args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id") where += deptFilter } var channelsTotal, channelsActive, messagesTotal, messages24h, leadsTotal, leads24h int64 var lastPoll sql.NullTime countQuery := `FROM channels c JOIN sections s ON s.id = c.section_id WHERE ` + where if err := a.db.QueryRow(ctx, `SELECT count(*) `+countQuery, args...).Scan(&channelsTotal); err != nil { writeDBError(w, err) return } if err := a.db.QueryRow(ctx, `SELECT count(*) `+countQuery+` AND c.is_active = true`, args...).Scan(&channelsActive); err != nil { writeDBError(w, err) return } if err := a.db.QueryRow(ctx, `SELECT max(c.last_polled_at) `+countQuery, args...).Scan(&lastPoll); err != nil { writeDBError(w, err) return } messageFrom := `FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id JOIN sections s ON s.id = c.section_id LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id WHERE ` + where if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom, args...).Scan(&messagesTotal); err != nil { writeDBError(w, err) return } if 
err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom+` AND m.fetched_at >= now() - interval '24 hours'`, args...).Scan(&messages24h); err != nil { writeDBError(w, err) return } key := "lead" field := "is_listing" if vertical == verticalHR { key = "hr_lead" field = "is_lead" } leadArgs := append(append([]any{}, args...), key, field) leadClause := fmt.Sprintf(" AND COALESCE(mc.verdict ->> $%d, m.extracted -> $%d ->> $%d) = 'true'", len(leadArgs), len(leadArgs)-1, len(leadArgs)) if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom+leadClause, leadArgs...).Scan(&leadsTotal); err != nil { writeDBError(w, err) return } if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom+leadClause+` AND m.fetched_at >= now() - interval '24 hours'`, leadArgs...).Scan(&leads24h); err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, map[string]any{ "vertical": vertical, "section_slug": nullableString(section), "channels_total": channelsTotal, "channels_active": channelsActive, "messages_total": messagesTotal, "messages_last_24h": messages24h, "leads_total": leadsTotal, "leads_last_24h": leads24h, "poll_interval_seconds": a.cfg.PollIntervalSeconds, "last_poll_at": nullTime(lastPoll), }) } func (a *app) handleLLMStatus(ctx context.Context, w http.ResponseWriter) { ready := false var providerError string model := a.cfg.LLMModel if a.cfg.LLMEnabled { status, err := a.ai.ProvidersStatus(ctx) if err != nil { providerError = err.Error() } else { for _, provider := range status.Providers { if provider.Name == "llm" { ready = provider.Configured && provider.OK providerError = provider.Error if provider.Model != "" { model = provider.Model } break } } } } writeJSON(w, http.StatusOK, map[string]any{ "enabled": a.cfg.LLMEnabled, "ready": ready, "base_url": a.cfg.AIServiceURL, "model": model, "provider": "ai-service", "provider_error": providerError, }) } func (a *app) handleLLMQueue(ctx context.Context, w http.ResponseWriter, r *http.Request) { if r.Method != 
http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } scope, ok := a.readScope(w, r, false) if !ok { return } vertical := queryRequired(w, r, "vertical") if vertical == "" { return } pending, err := a.pendingLLM(ctx, scope, vertical, strings.TrimSpace(r.URL.Query().Get("section"))) if err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, map[string]int64{"pending": pending}) } func (a *app) pendingLLM(ctx context.Context, scope accessScope, vertical, section string) (int64, error) { args := []any{vertical} where := `c.vertical = $1 AND m.text IS NOT NULL AND mc.id IS NULL` if section != "" { args = append(args, section) where += fmt.Sprintf(" AND s.slug = $%d", len(args)) } if !scope.IsAdmin { var deptFilter string args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id") where += deptFilter } var pending int64 err := a.db.QueryRow(ctx, ` SELECT count(*) FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id JOIN sections s ON s.id = c.section_id LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id WHERE `+where, args...).Scan(&pending) return pending, err } func (a *app) handlePrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: a.getPrompt(ctx, w, r) case http.MethodPut: a.savePrompt(ctx, w, r) case http.MethodDelete: a.resetPrompt(ctx, w, r) default: writeError(w, http.StatusMethodNotAllowed, "method not allowed") } } func (a *app) getPrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) { scope, ok := a.readScope(w, r, false) if !ok { return } vertical := queryRequired(w, r, "vertical") if vertical == "" { return } section := strings.TrimSpace(r.URL.Query().Get("section")) deptID, err := a.promptDepartmentID(ctx, scope, vertical, section) if err != nil { writeDBError(w, err) return } prompt, source, err := 
a.resolvePrompt(ctx, deptID, vertical, section) if err != nil { writeDBError(w, err) return } overridden, err := a.promptExists(ctx, deptID, vertical, section) if err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, map[string]any{ "vertical": vertical, "department_id": nullableString(deptID), "section": nullableString(section), "prompt": prompt, "default": defaultPrompt(vertical), "source": source, "is_overridden_here": overridden, }) } func (a *app) savePrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) { scope, ok := a.readScope(w, r, true) if !ok { return } vertical := queryRequired(w, r, "vertical") if vertical == "" { return } section := strings.TrimSpace(r.URL.Query().Get("section")) var payload struct { Prompt string `json:"prompt"` } if !readBody(w, r, &payload) { return } text := strings.TrimSpace(payload.Prompt) if text == "" { writeError(w, http.StatusBadRequest, "prompt must be a non-empty string") return } if len(text) > 30000 { writeError(w, http.StatusBadRequest, "prompt is too long (max 30000 chars)") return } deptID, err := a.promptDepartmentID(ctx, scope, vertical, section) if err != nil { writeDBError(w, err) return } key := promptKey(deptID, vertical, section) value, _ := json.Marshal(text) if _, err := a.db.Exec(ctx, ` INSERT INTO app_settings (key, value, updated_at) VALUES ($1, $2::jsonb, now()) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = now() `, key, string(value)); err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, map[string]any{"saved": true, "vertical": vertical, "department_id": nullableString(deptID), "section": nullableString(section), "length": len(text)}) } func (a *app) resetPrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) { scope, ok := a.readScope(w, r, true) if !ok { return } vertical := queryRequired(w, r, "vertical") if vertical == "" { return } section := strings.TrimSpace(r.URL.Query().Get("section")) deptID, err := 
a.promptDepartmentID(ctx, scope, vertical, section) if err != nil { writeDBError(w, err) return } if _, err := a.db.Exec(ctx, `DELETE FROM app_settings WHERE key = $1`, promptKey(deptID, vertical, section)); err != nil { writeDBError(w, err) return } w.WriteHeader(http.StatusNoContent) } func (a *app) promptDepartmentID(ctx context.Context, scope accessScope, vertical, section string) (string, error) { if strings.TrimSpace(section) == "" { return scope.primaryDepartmentID(), nil } item, err := a.findSection(ctx, vertical, section, scope) if err != nil { return "", err } return valueOrEmpty(item.DepartmentID), nil } func (a *app) resolvePrompt(ctx context.Context, deptID, vertical, section string) (string, string, error) { keys := []struct { key string source string }{} if section != "" { keys = append(keys, struct { key string source string }{promptKey(deptID, vertical, section), "section"}) } keys = append(keys, struct { key string source string }{promptKey(deptID, vertical, ""), "vertical"}) for _, candidate := range keys { var text string err := a.db.QueryRow(ctx, `SELECT value #>> '{}' FROM app_settings WHERE key = $1`, candidate.key).Scan(&text) if err == nil && strings.TrimSpace(text) != "" { return text, candidate.source, nil } if err != nil && !errors.Is(err, pgx.ErrNoRows) { return "", "", err } } return defaultPrompt(vertical), "default", nil } func (a *app) promptExists(ctx context.Context, deptID, vertical, section string) (bool, error) { var exists bool err := a.db.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM app_settings WHERE key = $1)`, promptKey(deptID, vertical, section)).Scan(&exists) return exists, err } func (a *app) proxyPython(w http.ResponseWriter, r *http.Request, path string) { scope := readAccess(r) if strings.Contains(path, "/auth/") && !scope.CanAuth { writeError(w, http.StatusNotFound, "not found") return } if (strings.Contains(path, "/poll") || strings.Contains(path, "/backfill-media")) && !scope.CanManage { writeError(w, 
http.StatusNotFound, "not found") return } target := strings.TrimRight(a.cfg.PythonBaseURL, "/") + path if r.URL.RawQuery != "" { target += "?" + r.URL.RawQuery } body, err := io.ReadAll(r.Body) if err != nil { writeError(w, http.StatusBadRequest, "bad body") return } req, err := http.NewRequestWithContext(r.Context(), r.Method, target, bytes.NewReader(body)) if err != nil { writeError(w, http.StatusBadGateway, err.Error()) return } req.Header = r.Header.Clone() resp, err := a.python.Do(req) if err != nil { writeError(w, http.StatusBadGateway, err.Error()) return } defer resp.Body.Close() for k, values := range resp.Header { for _, v := range values { w.Header().Add(k, v) } } w.WriteHeader(resp.StatusCode) _, _ = io.Copy(w, resp.Body) } func (a *app) readScope(w http.ResponseWriter, r *http.Request, manage bool) (accessScope, bool) { scope := readAccess(r) if manage { if !scope.CanManage { writeError(w, http.StatusNotFound, "not found") return scope, false } } else if !scope.IsAdmin && len(scope.departmentIDs()) == 0 { writeError(w, http.StatusForbidden, "department is required") return scope, false } if manage && !scope.IsAdmin && len(scope.departmentIDs()) == 0 { writeError(w, http.StatusForbidden, "department is required") return scope, false } return scope, true } func readAccess(r *http.Request) accessScope { admin := commonmw.HeaderBool(r, "X-User-Is-Admin") deptHead := commonmw.HeaderBool(r, "X-User-Is-Department-Head") canManage := commonmw.HeaderBool(r, "X-Monitoring-TG-Can-Manage") canAuth := commonmw.HeaderBool(r, "X-Monitoring-TG-Can-Auth") deptID := strings.TrimSpace(r.Header.Get("X-User-Department-Id")) deptIDs := commonmw.HeaderCSV(r, "X-User-Department-Ids") if deptID != "" { deptIDs = appendUniqueString(deptIDs, deptID) } return accessScope{ IsAdmin: admin, CanManage: admin || deptHead || canManage, CanAuth: admin || canAuth, DeptID: deptID, DeptIDs: deptIDs, } } func appendUniqueString(items []string, value string) []string { value = 
strings.TrimSpace(value) if value == "" { return items } for _, item := range items { if item == value { return items } } return append(items, value) } func (s accessScope) departmentIDs() []string { out := make([]string, 0, len(s.DeptIDs)+1) for _, id := range s.DeptIDs { out = appendUniqueString(out, id) } out = appendUniqueString(out, s.DeptID) return out } func (s accessScope) primaryDepartmentID() string { if strings.TrimSpace(s.DeptID) != "" { return strings.TrimSpace(s.DeptID) } ids := s.departmentIDs() if len(ids) == 0 { return "" } return ids[0] } func appendDepartmentFilter(args []any, scope accessScope, column string) ([]any, string) { ids := scope.departmentIDs() if len(ids) == 0 { ids = []string{"__no_department_scope__"} } args = append(args, ids) return args, fmt.Sprintf(" AND %s::text = ANY($%d::text[])", column, len(args)) } type rowScanner interface { Scan(dest ...any) error } func scanSection(rows rowScanner) (sectionOut, error) { var item sectionOut var dept, emoji, description string err := rows.Scan( &item.ID, &item.Vertical, &dept, &item.Slug, &item.Title, &emoji, &description, &item.CreatedAt, &item.ChannelsTotal, &item.ChannelsActive, &item.MessagesTotal, &item.LeadsTotal, ) item.DepartmentID = nullableString(dept) item.Emoji = nullableString(emoji) item.Description = nullableString(description) return item, err } func scanSectionRow(row pgx.Row) (sectionOut, error) { return scanSection(row) } func scanChannel(rows rowScanner) (channelOut, error) { var item channelOut var tgID, sourceID, lastMsg sql.NullInt64 var title, slug sql.NullString var lastPoll sql.NullTime err := rows.Scan( &item.ID, &tgID, &sourceID, &item.Identifier, &title, &item.Vertical, &item.SectionID, &slug, &item.IsActive, &lastMsg, &lastPoll, &item.CreatedAt, ) item.TGID = nullInt(tgID) item.SourceChannelID = nullInt(sourceID) item.Title = nullString(title) item.SectionSlug = nullString(slug) item.LastMessageID = nullInt(lastMsg) item.LastPolledAt = nullTimePtr(lastPoll) 
return item, err } func scanChannelRow(row pgx.Row) (channelOut, error) { return scanChannel(row) } func scanMessage(rows rowScanner) (messageOut, error) { var item messageOut var vertical, slug string var grouped, senderID, tgID, views, forwards sql.NullInt64 var text, senderUsername, senderName, identifier sql.NullString var extractedText, mediaText string err := rows.Scan( &item.ID, &item.ChannelID, &vertical, &slug, &item.TGMessageID, &grouped, &item.GroupSize, &item.Date, &text, &senderID, &senderUsername, &senderName, &identifier, &tgID, &item.HasMedia, &extractedText, &mediaText, &views, &forwards, &item.FetchedAt, ) item.ChannelVertical = nullableString(vertical) item.ChannelSectionSlug = nullableString(slug) item.GroupedID = nullInt(grouped) item.Text = nullString(text) item.SenderID = nullInt(senderID) item.SenderUsername = nullString(senderUsername) item.SenderName = nullString(senderName) item.Views = nullInt(views) item.Forwards = nullInt(forwards) if extractedText != "" && extractedText != "null" { item.Extracted = json.RawMessage(extractedText) } if mediaText != "" && mediaText != "null" { _ = json.Unmarshal([]byte(mediaText), &item.MediaFiles) } item.PostURL = buildPostURL(identifier.String, tgID, item.TGMessageID) return item, err } func scanMessageRow(row pgx.Row) (messageOut, error) { return scanMessage(row) } func (a *app) normalizeMessageMedia(item *messageOut) { if len(item.MediaFiles) == 0 { return } for _, file := range item.MediaFiles { rawURL, _ := file["url"].(string) rel := mediaRelativePath(rawURL) if rel == "" { continue } file["url"] = a.mediaURL(rel) if _, ok := file["name"].(string); !ok { file["name"] = filepath.Base(rel) } } } func mediaRelativePath(raw string) string { raw = strings.TrimSpace(raw) if raw == "" { return "" } if idx := strings.Index(raw, "/media/"); idx >= 0 { raw = raw[idx+len("/media/"):] } raw = strings.TrimPrefix(raw, "/") raw = strings.TrimPrefix(raw, "media/") clean := 
strings.TrimPrefix(filepath.Clean("/"+raw), "/") if clean == "." || clean == "" || strings.HasPrefix(clean, "../") { return "" } return clean } func (a *app) mediaURL(rel string) string { base := strings.TrimRight(a.cfg.PublicBasePath, "/") return base + "/api/v1/media/" + strings.TrimPrefix(rel, "/") } func aggregateMessages(items []messageOut, limit int) []messageOut { out := make([]messageOut, 0, len(items)) index := map[string]int{} for _, item := range items { key := messageGroupKey(item) if pos, ok := index[key]; ok { group := &out[pos] group.GroupSize++ group.HasMedia = group.HasMedia || item.HasMedia group.MediaFiles = append(group.MediaFiles, item.MediaFiles...) if group.Text == nil && item.Text != nil { group.Text = item.Text } if len(group.Extracted) == 0 && len(item.Extracted) > 0 { group.Extracted = item.Extracted } continue } if item.GroupSize < 1 { item.GroupSize = 1 } index[key] = len(out) out = append(out, item) } if len(out) > limit { return out[:limit] } return out } func messageGroupKey(item messageOut) string { if item.GroupedID != nil { return fmt.Sprintf("channel:%d:group:%d", item.ChannelID, *item.GroupedID) } return fmt.Sprintf("message:%d", item.ID) } func buildPostURL(identifier string, tgID sql.NullInt64, msgID int64) *string { identifier = strings.TrimSpace(identifier) if identifier == "" || msgID == 0 { return nil } if strings.HasPrefix(identifier, "https://t.me/") { base := strings.TrimRight(identifier, "/") v := base + "/" + strconv.FormatInt(msgID, 10) return &v } if strings.HasPrefix(identifier, "@") { v := "https://t.me/" + strings.TrimPrefix(identifier, "@") + "/" + strconv.FormatInt(msgID, 10) return &v } if !strings.Contains(identifier, "/") && !strings.Contains(identifier, "+") { v := "https://t.me/" + identifier + "/" + strconv.FormatInt(msgID, 10) return &v } return nil } func readBody(w http.ResponseWriter, r *http.Request, out any) bool { defer r.Body.Close() if err := json.NewDecoder(r.Body).Decode(out); err != nil { 
writeError(w, http.StatusBadRequest, "invalid json") return false } return true } func writeJSON(w http.ResponseWriter, status int, payload any) { w.Header().Set("Content-Type", "application/json; charset=utf-8") w.WriteHeader(status) _ = json.NewEncoder(w).Encode(payload) } func writeError(w http.ResponseWriter, status int, detail string) { writeJSON(w, status, map[string]string{"detail": detail}) } func writeDBError(w http.ResponseWriter, err error) { if errors.Is(err, pgx.ErrNoRows) { writeError(w, http.StatusNotFound, "not found") return } slog.Error("api_db_error", "error", err) writeError(w, http.StatusInternalServerError, "database error") } func queryRequired(w http.ResponseWriter, r *http.Request, name string) string { value := strings.TrimSpace(r.URL.Query().Get(name)) if value == "" { writeError(w, http.StatusBadRequest, name+" is required") } return value } func promptKey(deptID, vertical, section string) string { if deptID == "" { deptID = "global" } if section != "" { return fmt.Sprintf("llm_system_prompt:%s:%s:%s", deptID, vertical, section) } return fmt.Sprintf("llm_system_prompt:%s:%s", deptID, vertical) } func verdictKey(vertical string) string { if vertical == verticalHR { return "hr_lead" } return "lead" } func defaultPrompt(vertical string) string { if vertical == verticalHR { return defaultHRPrompt } return defaultREPrompt } func normalizeSlug(raw string) string { raw = strings.ToLower(strings.TrimSpace(raw)) raw = strings.ReplaceAll(raw, " ", "-") var b strings.Builder for _, r := range raw { if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '_' || r == '-' { b.WriteRune(r) } } return b.String() } func nullableString(v string) *string { v = strings.TrimSpace(v) if v == "" { return nil } return &v } func nullString(v sql.NullString) *string { if !v.Valid || strings.TrimSpace(v.String) == "" { return nil } return &v.String } func nullInt(v sql.NullInt64) *int64 { if !v.Valid { return nil } return &v.Int64 } func nullTime(v 
sql.NullTime) any { if !v.Valid { return nil } return v.Time } func nullTimePtr(v sql.NullTime) *time.Time { if !v.Valid { return nil } return &v.Time } func valueOrEmpty(v *string) string { if v == nil { return "" } return *v } func newMinioClient(cfg config) (*minio.Client, error) { if cfg.MinioEndpoint == "" || cfg.MinioAccessKey == "" || cfg.MinioSecretKey == "" || cfg.MinioBucket == "" { return nil, nil } endpoint := strings.TrimPrefix(strings.TrimPrefix(cfg.MinioEndpoint, "https://"), "http://") opts := &minio.Options{ Creds: credentials.NewStaticV4(cfg.MinioAccessKey, cfg.MinioSecretKey, ""), Secure: cfg.MinioUseSSL, Region: cfg.MinioRegion, BucketLookup: minio.BucketLookupPath, } if cfg.MinioInsecureTLS { opts.Transport = &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, //nolint:gosec // optional intra-cluster MinIO mode } } return minio.New(endpoint, opts) } func queryInt(raw string, fallback int) int { if raw == "" { return fallback } n, err := strconv.Atoi(raw) if err != nil { return fallback } return n } func clampInt(n, min, max int) int { if n < min { return min } if n > max { return max } return n } func maxInt(n, min int) int { if n < min { return min } return n } func loadConfig() config { return config{ APIHost: env("API_HOST", "0.0.0.0"), APIPort: envInt("API_PORT", 8000), PublicBasePath: env("PUBLIC_BASE_PATH", ""), PythonBaseURL: env("PYTHON_BASE_URL", "http://127.0.0.1:8001"), MediaDir: env("MEDIA_DIR", "/data/media"), PostgresUser: env("POSTGRES_USER", "parser"), PostgresPassword: env("POSTGRES_PASSWORD", "parser"), PostgresDB: env("POSTGRES_DB", "parser"), PostgresHost: env("POSTGRES_HOST", "db"), PostgresPort: envInt("POSTGRES_PORT", 5432), PollIntervalSeconds: envInt("POLL_INTERVAL_SECONDS", 60), LLMEnabled: envBool("LLM_ENABLED", true), LLMModel: env("LLM_MODEL", "qwen2.5-14b"), LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second, AIServiceURL: env("AI_SERVICE_URL", ""), AIServiceToken: 
env("AI_SERVICE_TOKEN", ""), InternalAPIKey: env("INTERNAL_API_KEY", env("PORTAL_INTERNAL_API_KEY", "")), MinioEndpoint: env("MINIO_ENDPOINT", ""), MinioAccessKey: env("MINIO_ACCESS_KEY", ""), MinioSecretKey: env("MINIO_SECRET_KEY", ""), MinioBucket: env("MINIO_BUCKET", "monitoring-tg-media"), MinioUseSSL: envBool("MINIO_USE_SSL", true), MinioRegion: env("MINIO_REGION", "us-east-1"), MinioInsecureTLS: envBool("MINIO_INSECURE_SKIP_VERIFY", false), } } func (c config) databaseURL() string { return fmt.Sprintf( "postgres://%s:%s@%s:%d/%s", url.QueryEscape(c.PostgresUser), url.QueryEscape(c.PostgresPassword), c.PostgresHost, c.PostgresPort, url.QueryEscape(c.PostgresDB), ) } func env(key, fallback string) string { if v := strings.TrimSpace(os.Getenv(key)); v != "" { return v } return fallback } func envInt(key string, fallback int) int { if raw := strings.TrimSpace(os.Getenv(key)); raw != "" { if n, err := strconv.Atoi(raw); err == nil { return n } } return fallback } func envBool(key string, fallback bool) bool { if raw := strings.TrimSpace(os.Getenv(key)); raw != "" { if b, err := strconv.ParseBool(raw); err == nil { return b } if raw == "1" { return true } if raw == "0" { return false } } return fallback } const defaultREPrompt = `Ты — аналитик объявлений о недвижимости. Тебе дают текст из Telegram-канала. Определи, является ли сообщение реальным объявлением о покупке, продаже или аренде недвижимости. Учитывай только сделки с недвижимостью: квартиры, дома, апартаменты, участки, коммерческие помещения и другие объекты недвижимости. Любые продажи или заявки по стройматериалам, мебели, бытовой/строительной технике, автомобилям, услугам, оборудованию и прочим товарам не являются лидами недвижимости — для них is_listing=false. 
Отвечай строго валидным JSON без markdown: { "is_listing": boolean, "kind": "sale" | "rent" | "purchase" | null, "property_type": string | null, "rooms": string | null, "area_m2": number | null, "price_text": string | null, "price_value": number | null, "currency": "RUB" | "USD" | "EUR" | "AED" | "GBP" | "CNY" | "TRY" | "KZT" | "BYN" | "UAH" | null, "location": string | null, "contact_phone": string | null, "contact_name": string | null, "summary": string, "confidence": number } summary всегда по-русски, confidence в диапазоне 0..1.` const defaultHRPrompt = `Ты — аналитик HR-объявлений. Тебе дают текст из Telegram-канала. Определи, относится ли сообщение к рынку труда: вакансия, резюме или короткий HR-контакт. Отвечай строго валидным JSON без markdown: { "is_lead": boolean, "kind": "vacancy" | "resume" | "contact" | null, "title": string | null, "company": string | null, "candidate_name": string | null, "experience_years": number | null, "skills": string[], "location": string | null, "remote": boolean | null, "employment_type": "full-time" | "part-time" | "contract" | "internship" | null, "salary_text": string | null, "salary_value": number | null, "currency": "RUB" | "USD" | "EUR" | "AED" | "GBP" | "CNY" | "TRY" | "KZT" | "BYN" | "UAH" | null, "contact_phone": string | null, "contact_name": string | null, "summary": string, "confidence": number } summary всегда по-русски, confidence в диапазоне 0..1.`