package main import ( "bytes" "context" "crypto/tls" "database/sql" "encoding/json" "errors" "fmt" "io" "log/slog" "net/http" "net/url" "os" "os/signal" "path/filepath" "strconv" "strings" "syscall" "time" "monitoring-tg/internal/aiservice" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" ) const ( verticalHR = "hr" ) type config struct { APIHost string APIPort int PublicBasePath string PythonBaseURL string MediaDir string PostgresUser string PostgresPassword string PostgresDB string PostgresHost string PostgresPort int PollIntervalSeconds int LLMEnabled bool LLMModel string LLMTimeout time.Duration AIServiceURL string AIServiceToken string MinioEndpoint string MinioAccessKey string MinioSecretKey string MinioBucket string MinioUseSSL bool MinioRegion string MinioInsecureTLS bool } type app struct { cfg config db *pgxpool.Pool http *http.Client python *http.Client minio *minio.Client ai *aiservice.Client } type accessScope struct { IsAdmin bool CanManage bool CanAuth bool DeptID string } type sectionOut struct { ID int64 `json:"id"` Vertical string `json:"vertical"` DepartmentID *string `json:"department_id,omitempty"` Slug string `json:"slug"` Title string `json:"title"` Emoji *string `json:"emoji,omitempty"` Description *string `json:"description,omitempty"` CreatedAt time.Time `json:"created_at"` ChannelsTotal int64 `json:"channels_total"` ChannelsActive int64 `json:"channels_active"` MessagesTotal int64 `json:"messages_total"` LeadsTotal int64 `json:"leads_total"` } type channelOut struct { ID int64 `json:"id"` TGID *int64 `json:"tg_id,omitempty"` SourceChannelID *int64 `json:"source_channel_id,omitempty"` Identifier string `json:"identifier"` Title *string `json:"title,omitempty"` Vertical string `json:"vertical"` SectionID int64 `json:"section_id"` SectionSlug *string `json:"section_slug,omitempty"` IsActive bool `json:"is_active"` LastMessageID *int64 
`json:"last_message_id,omitempty"` LastPolledAt *time.Time `json:"last_polled_at,omitempty"` CreatedAt time.Time `json:"created_at"` } type messageOut struct { ID int64 `json:"id"` ChannelID int64 `json:"channel_id"` ChannelVertical *string `json:"channel_vertical,omitempty"` ChannelSectionSlug *string `json:"channel_section_slug,omitempty"` TGMessageID int64 `json:"tg_message_id"` GroupedID *int64 `json:"grouped_id,omitempty"` GroupSize int `json:"group_size"` Date time.Time `json:"date"` Text *string `json:"text,omitempty"` SenderID *int64 `json:"sender_id,omitempty"` SenderUsername *string `json:"sender_username,omitempty"` SenderName *string `json:"sender_name,omitempty"` PostURL *string `json:"post_url,omitempty"` HasMedia bool `json:"has_media"` MediaFiles []map[string]any `json:"media_files,omitempty"` Extracted json.RawMessage `json:"extracted,omitempty"` Views *int64 `json:"views,omitempty"` Forwards *int64 `json:"forwards,omitempty"` FetchedAt time.Time `json:"fetched_at"` } func main() { cfg := loadConfig() logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) slog.SetDefault(logger) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() pool, err := pgxpool.New(ctx, cfg.databaseURL()) if err != nil { slog.Error("db_connect_failed", "error", err) os.Exit(1) } defer pool.Close() minioClient, err := newMinioClient(cfg) if err != nil { slog.Error("minio_init_failed", "error", err) os.Exit(1) } srvApp := &app{ cfg: cfg, db: pool, http: &http.Client{Timeout: cfg.LLMTimeout}, python: &http.Client{Timeout: 15 * time.Minute}, minio: minioClient, ai: aiservice.New(cfg.AIServiceURL, cfg.AIServiceToken, cfg.LLMTimeout), } server := &http.Server{ Addr: fmt.Sprintf("%s:%d", cfg.APIHost, cfg.APIPort), Handler: http.HandlerFunc(srvApp.serveHTTP), ReadHeaderTimeout: 10 * time.Second, } go func() { <-ctx.Done() shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() _ = 
server.Shutdown(shutdownCtx) }() slog.Info("go_api_started", "addr", server.Addr, "python_base_url", cfg.PythonBaseURL) if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { slog.Error("server_failed", "error", err) os.Exit(1) } } func (a *app) serveHTTP(w http.ResponseWriter, r *http.Request) { path := a.apiPath(r.URL.Path) if path == "/healthz" { writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) return } if path == "/" { writeJSON(w, http.StatusOK, map[string]string{"service": "monitoring-tg", "ui": "portal", "api": "go"}) return } if !strings.HasPrefix(path, "/api/v1") { writeError(w, http.StatusNotFound, "not found") return } ctx := r.Context() switch { case r.Method == http.MethodGet && path == "/api/v1/access/me": a.handleAccessMe(w, r) case path == "/api/v1/auth/status" || strings.HasPrefix(path, "/api/v1/auth/"): a.proxyPython(w, r, path) case path == "/api/v1/sections": a.handleSections(w, r) case strings.HasPrefix(path, "/api/v1/sections/"): a.handleSectionItem(w, r, path) case path == "/api/v1/channels": a.handleChannels(w, r) case strings.HasPrefix(path, "/api/v1/channels/"): a.handleChannelItem(ctx, w, r, path) case strings.HasPrefix(path, "/api/v1/media/"): a.handleMedia(w, r, path) case path == "/api/v1/messages": a.handleMessages(ctx, w, r) case strings.HasPrefix(path, "/api/v1/messages/"): a.handleMessageItem(ctx, w, r, path) case path == "/api/v1/stats": a.handleStats(ctx, w, r) case path == "/api/v1/llm/status": a.handleLLMStatus(ctx, w) case path == "/api/v1/llm/queue": a.handleLLMQueue(ctx, w, r) case path == "/api/v1/llm/prompt": a.handlePrompt(ctx, w, r) case r.Method == http.MethodPost && path == "/api/v1/poll": a.proxyPython(w, r, path) default: writeError(w, http.StatusNotFound, "not found") } } func (a *app) apiPath(path string) string { base := strings.TrimRight(a.cfg.PublicBasePath, "/") if base != "" && strings.HasPrefix(path, base+"/") { return strings.TrimPrefix(path, base) } if base != 
"" && path == base { return "/" } return path } func (a *app) handleAccessMe(w http.ResponseWriter, r *http.Request) { scope := readAccess(r) writeJSON(w, http.StatusOK, map[string]any{ "is_admin": scope.IsAdmin, "can_manage_department": scope.CanManage, "can_auth_telegram": scope.CanAuth, "department_id": nullableString(scope.DeptID), }) } func (a *app) handleSections(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: a.listSections(r.Context(), w, r) case http.MethodPost: a.createSection(r.Context(), w, r) default: writeError(w, http.StatusMethodNotAllowed, "method not allowed") } } func (a *app) listSections(ctx context.Context, w http.ResponseWriter, r *http.Request) { scope, ok := a.readScope(w, r, false) if !ok { return } vertical := queryRequired(w, r, "vertical") if vertical == "" { return } args := []any{vertical} deptFilter := "" if !scope.IsAdmin { args = append(args, scope.DeptID) deptFilter = fmt.Sprintf(" AND s.department_id = $%d", len(args)) } rows, err := a.db.Query(ctx, ` SELECT s.id, s.vertical, COALESCE(s.department_id, ''), s.slug, s.title, COALESCE(s.emoji, ''), COALESCE(s.description, ''), s.created_at, (SELECT count(*) FROM channels c WHERE c.section_id = s.id), (SELECT count(*) FROM channels c WHERE c.section_id = s.id AND c.is_active = true), (SELECT count(*) FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id WHERE c.section_id = s.id), (SELECT count(*) FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id WHERE c.section_id = s.id AND ( (s.vertical = 'hr' AND m.extracted->'hr_lead'->>'is_lead' = 'true') OR (s.vertical <> 'hr' AND m.extracted->'lead'->>'is_listing' = 'true') )) FROM sections s WHERE s.vertical = $1`+deptFilter+` ORDER BY s.created_at ASC, s.id ASC `, args...) 
if err != nil { writeDBError(w, err) return } defer rows.Close() out := []sectionOut{} for rows.Next() { item, err := scanSection(rows) if err != nil { writeDBError(w, err) return } out = append(out, item) } if err := rows.Err(); err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, out) } func (a *app) createSection(ctx context.Context, w http.ResponseWriter, r *http.Request) { scope, ok := a.readScope(w, r, true) if !ok { return } var payload struct { Vertical string `json:"vertical"` Slug string `json:"slug"` Title string `json:"title"` Emoji *string `json:"emoji"` Description *string `json:"description"` } if !readBody(w, r, &payload) { return } payload.Vertical = strings.TrimSpace(payload.Vertical) payload.Slug = normalizeSlug(payload.Slug) payload.Title = strings.TrimSpace(payload.Title) if payload.Vertical == "" || payload.Slug == "" || payload.Title == "" { writeError(w, http.StatusBadRequest, "vertical, slug and title are required") return } dept := nullableString(scope.DeptID) row := a.db.QueryRow(ctx, ` INSERT INTO sections (vertical, department_id, slug, title, emoji, description) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id, vertical, COALESCE(department_id, ''), slug, title, COALESCE(emoji, ''), COALESCE(description, ''), created_at, 0::bigint, 0::bigint, 0::bigint, 0::bigint `, payload.Vertical, dept, payload.Slug, payload.Title, payload.Emoji, payload.Description) item, err := scanSectionRow(row) if err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusCreated, item) } func (a *app) handleSectionItem(w http.ResponseWriter, r *http.Request, path string) { rest := strings.TrimPrefix(path, "/api/v1/sections/") parts := strings.SplitN(rest, "/", 2) if len(parts) != 2 { writeError(w, http.StatusNotFound, "not found") return } vertical, err := url.PathUnescape(parts[0]) if err != nil { writeError(w, http.StatusBadRequest, "bad vertical") return } slug, err := url.PathUnescape(parts[1]) if err != nil { writeError(w, 
http.StatusBadRequest, "bad slug") return } switch r.Method { case http.MethodGet: a.getSection(r.Context(), w, r, vertical, slug) case http.MethodPatch: a.updateSection(r.Context(), w, r, vertical, slug) case http.MethodDelete: a.deleteSection(r.Context(), w, r, vertical, slug) default: writeError(w, http.StatusMethodNotAllowed, "method not allowed") } } func (a *app) getSection(ctx context.Context, w http.ResponseWriter, r *http.Request, vertical, slug string) { scope, ok := a.readScope(w, r, false) if !ok { return } item, err := a.findSection(ctx, vertical, slug, scope) if err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, item) } func (a *app) updateSection(ctx context.Context, w http.ResponseWriter, r *http.Request, vertical, slug string) { scope, ok := a.readScope(w, r, true) if !ok { return } var payload struct { Title *string `json:"title"` Emoji *string `json:"emoji"` Description *string `json:"description"` } if !readBody(w, r, &payload) { return } set := []string{} args := []any{} if payload.Title != nil { args = append(args, strings.TrimSpace(*payload.Title)) set = append(set, fmt.Sprintf("title = $%d", len(args))) } if payload.Emoji != nil { args = append(args, nullableString(strings.TrimSpace(*payload.Emoji))) set = append(set, fmt.Sprintf("emoji = $%d", len(args))) } if payload.Description != nil { args = append(args, nullableString(strings.TrimSpace(*payload.Description))) set = append(set, fmt.Sprintf("description = $%d", len(args))) } if len(set) == 0 { a.getSection(ctx, w, r, vertical, slug) return } args = append(args, vertical, slug) where := fmt.Sprintf("vertical = $%d AND slug = $%d", len(args)-1, len(args)) if !scope.IsAdmin { args = append(args, scope.DeptID) where += fmt.Sprintf(" AND department_id = $%d", len(args)) } row := a.db.QueryRow(ctx, ` UPDATE sections SET `+strings.Join(set, ", ")+` WHERE `+where+` RETURNING id, vertical, COALESCE(department_id, ''), slug, title, COALESCE(emoji, ''), COALESCE(description, 
''), created_at, (SELECT count(*) FROM channels c WHERE c.section_id = sections.id), (SELECT count(*) FROM channels c WHERE c.section_id = sections.id AND c.is_active = true), (SELECT count(*) FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id WHERE c.section_id = sections.id), 0::bigint `, args...) item, err := scanSectionRow(row) if err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, item) } func (a *app) deleteSection(ctx context.Context, w http.ResponseWriter, r *http.Request, vertical, slug string) { scope, ok := a.readScope(w, r, true) if !ok { return } section, err := a.findSection(ctx, vertical, slug, scope) if err != nil { writeDBError(w, err) return } var count int64 if err := a.db.QueryRow(ctx, `SELECT count(*) FROM channels WHERE section_id = $1`, section.ID).Scan(&count); err != nil { writeDBError(w, err) return } if count > 0 { writeError(w, http.StatusBadRequest, fmt.Sprintf("section has %d channels - move or delete them first", count)) return } if _, err := a.db.Exec(ctx, `DELETE FROM sections WHERE id = $1`, section.ID); err != nil { writeDBError(w, err) return } w.WriteHeader(http.StatusNoContent) } func (a *app) findSection(ctx context.Context, vertical, slug string, scope accessScope) (sectionOut, error) { args := []any{vertical, slug} where := "s.vertical = $1 AND s.slug = $2" if !scope.IsAdmin { args = append(args, scope.DeptID) where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) } row := a.db.QueryRow(ctx, ` SELECT s.id, s.vertical, COALESCE(s.department_id, ''), s.slug, s.title, COALESCE(s.emoji, ''), COALESCE(s.description, ''), s.created_at, (SELECT count(*) FROM channels c WHERE c.section_id = s.id), (SELECT count(*) FROM channels c WHERE c.section_id = s.id AND c.is_active = true), (SELECT count(*) FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id WHERE c.section_id 
= s.id), 0::bigint FROM sections s WHERE `+where+` `, args...) return scanSectionRow(row) } func (a *app) handleChannels(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: a.listChannels(r.Context(), w, r) case http.MethodPost: a.createChannel(r.Context(), w, r) default: writeError(w, http.StatusMethodNotAllowed, "method not allowed") } } func (a *app) listChannels(ctx context.Context, w http.ResponseWriter, r *http.Request) { scope, ok := a.readScope(w, r, false) if !ok { return } vertical := queryRequired(w, r, "vertical") if vertical == "" { return } section := strings.TrimSpace(r.URL.Query().Get("section")) args := []any{vertical} where := "c.vertical = $1" if section != "" { args = append(args, section) where += fmt.Sprintf(" AND s.slug = $%d", len(args)) } if !scope.IsAdmin { args = append(args, scope.DeptID) where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) } rows, err := a.db.Query(ctx, ` SELECT c.id, COALESCE(c.tg_id, src.tg_id), c.source_channel_id, c.identifier, COALESCE(c.title, src.title), c.vertical, c.section_id, s.slug, c.is_active, COALESCE(c.last_message_id, src.last_message_id), COALESCE(c.last_polled_at, src.last_polled_at), c.created_at FROM channels c JOIN sections s ON s.id = c.section_id LEFT JOIN channels src ON src.id = c.source_channel_id WHERE `+where+` ORDER BY c.created_at DESC, c.id DESC `, args...) 
if err != nil { writeDBError(w, err) return } defer rows.Close() out := []channelOut{} for rows.Next() { item, err := scanChannel(rows) if err != nil { writeDBError(w, err) return } out = append(out, item) } if err := rows.Err(); err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, out) } func (a *app) createChannel(ctx context.Context, w http.ResponseWriter, r *http.Request) { scope, ok := a.readScope(w, r, true) if !ok { return } var payload struct { Identifier string `json:"identifier"` Vertical string `json:"vertical"` Section string `json:"section"` } if !readBody(w, r, &payload) { return } payload.Identifier = strings.TrimSpace(payload.Identifier) payload.Vertical = strings.TrimSpace(payload.Vertical) payload.Section = strings.TrimSpace(payload.Section) if payload.Identifier == "" || payload.Vertical == "" || payload.Section == "" { writeError(w, http.StatusBadRequest, "identifier, vertical and section are required") return } section, err := a.findSection(ctx, payload.Vertical, payload.Section, scope) if err != nil { writeDBError(w, err) return } row := a.db.QueryRow(ctx, ` INSERT INTO channels (identifier, vertical, section_id, is_active) VALUES ($1, $2, $3, true) ON CONFLICT ON CONSTRAINT uq_channels_section_identifier DO UPDATE SET is_active = true RETURNING id, tg_id, source_channel_id, identifier, title, vertical, section_id, $4::text, is_active, last_message_id, last_polled_at, created_at `, payload.Identifier, payload.Vertical, section.ID, section.Slug) item, err := scanChannelRow(row) if err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusCreated, item) } func (a *app) handleChannelItem(ctx context.Context, w http.ResponseWriter, r *http.Request, path string) { rest := strings.TrimPrefix(path, "/api/v1/channels/") parts := strings.Split(rest, "/") if len(parts) == 0 { writeError(w, http.StatusNotFound, "not found") return } id, err := strconv.ParseInt(parts[0], 10, 64) if err != nil { writeError(w, 
http.StatusBadRequest, "bad channel id") return } if len(parts) == 2 { switch parts[1] { case "poll", "backfill-media": a.proxyPython(w, r, path) case "reanalyze": a.reanalyzeChannel(ctx, w, r, id) case "stats": a.channelStats(ctx, w, r, id) default: writeError(w, http.StatusNotFound, "not found") } return } switch r.Method { case http.MethodGet: a.getChannel(ctx, w, r, id) case http.MethodPatch: a.updateChannel(ctx, w, r, id) case http.MethodDelete: a.deleteChannel(ctx, w, r, id) default: writeError(w, http.StatusMethodNotAllowed, "method not allowed") } } func (a *app) getChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) { scope, ok := a.readScope(w, r, false) if !ok { return } channel, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")) if err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, channel) } func (a *app) updateChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) { scope, ok := a.readScope(w, r, true) if !ok { return } if _, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil { writeDBError(w, err) return } var payload struct { IsActive *bool `json:"is_active"` Vertical *string `json:"vertical"` Section *string `json:"section"` } if !readBody(w, r, &payload) { return } set := []string{} args := []any{} if payload.IsActive != nil { args = append(args, *payload.IsActive) set = append(set, fmt.Sprintf("is_active = $%d", len(args))) } if payload.Vertical != nil { args = append(args, strings.TrimSpace(*payload.Vertical)) set = append(set, fmt.Sprintf("vertical = $%d", len(args))) } if payload.Section != nil && strings.TrimSpace(*payload.Section) != "" { vertical := r.URL.Query().Get("vertical") if payload.Vertical != nil && strings.TrimSpace(*payload.Vertical) != "" { vertical = strings.TrimSpace(*payload.Vertical) } section, err := a.findSection(ctx, vertical, 
strings.TrimSpace(*payload.Section), scope) if err != nil { writeDBError(w, err) return } args = append(args, section.ID) set = append(set, fmt.Sprintf("section_id = $%d", len(args))) } if len(set) == 0 { a.getChannel(ctx, w, r, id) return } args = append(args, id) row := a.db.QueryRow(ctx, ` UPDATE channels SET `+strings.Join(set, ", ")+` WHERE id = $`+strconv.Itoa(len(args))+` RETURNING id, tg_id, source_channel_id, identifier, title, vertical, section_id, (SELECT slug FROM sections WHERE id = channels.section_id), is_active, last_message_id, last_polled_at, created_at `, args...) item, err := scanChannelRow(row) if err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, item) } func (a *app) deleteChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) { scope, ok := a.readScope(w, r, true) if !ok { return } if _, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil { writeDBError(w, err) return } if _, err := a.db.Exec(ctx, `DELETE FROM channels WHERE id = $1`, id); err != nil { writeDBError(w, err) return } w.WriteHeader(http.StatusNoContent) } func (a *app) findChannel(ctx context.Context, id int64, scope accessScope, vertical, section string) (channelOut, error) { args := []any{id} where := "c.id = $1" if strings.TrimSpace(vertical) != "" { args = append(args, strings.TrimSpace(vertical)) where += fmt.Sprintf(" AND c.vertical = $%d", len(args)) } if strings.TrimSpace(section) != "" { args = append(args, strings.TrimSpace(section)) where += fmt.Sprintf(" AND s.slug = $%d", len(args)) } if !scope.IsAdmin { args = append(args, scope.DeptID) where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) } row := a.db.QueryRow(ctx, ` SELECT c.id, COALESCE(c.tg_id, src.tg_id), c.source_channel_id, c.identifier, COALESCE(c.title, src.title), c.vertical, c.section_id, s.slug, c.is_active, COALESCE(c.last_message_id, src.last_message_id), COALESCE(c.last_polled_at, 
src.last_polled_at), c.created_at FROM channels c JOIN sections s ON s.id = c.section_id LEFT JOIN channels src ON src.id = c.source_channel_id WHERE `+where+` `, args...) return scanChannelRow(row) } func (a *app) channelStats(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) { scope, ok := a.readScope(w, r, false) if !ok { return } ch, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")) if err != nil { writeDBError(w, err) return } var messages, leads int64 key := "lead" field := "is_listing" if ch.Vertical == verticalHR { key = "hr_lead" field = "is_lead" } sourceID := id if ch.SourceChannelID != nil { sourceID = *ch.SourceChannelID } if err := a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE channel_id = $1`, sourceID).Scan(&messages); err != nil { writeDBError(w, err) return } if err := a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE channel_id = $1 AND extracted -> $2 ->> $3 = 'true'`, sourceID, key, field).Scan(&leads); err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, map[string]any{ "channel_id": id, "source_channel_id": sourceID, "messages_total": messages, "leads_total": leads, "last_polled_at": ch.LastPolledAt, "last_message_id": ch.LastMessageID, }) } func (a *app) reanalyzeChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) { scope, ok := a.readScope(w, r, true) if !ok { return } ch, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")) if err != nil { writeDBError(w, err) return } key := verdictKey(ch.Vertical) sourceID := id if ch.SourceChannelID != nil { sourceID = *ch.SourceChannelID } tag, err := a.db.Exec(ctx, ` UPDATE messages SET extracted = ( CASE WHEN jsonb_typeof(extracted) = 'object' THEN extracted ELSE '{}'::jsonb END ) - $1 WHERE channel_id = $2 `, key, sourceID) if err != nil { writeDBError(w, err) return } pending, err := a.pendingLLM(ctx, scope, ch.Vertical, 
valueOrEmpty(ch.SectionSlug)) if err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, map[string]int64{"updated": tag.RowsAffected(), "pending": pending}) } func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } scope, ok := a.readScope(w, r, false) if !ok { return } vertical := queryRequired(w, r, "vertical") if vertical == "" { return } q := r.URL.Query() limit := clampInt(queryInt(q.Get("limit"), 50), 1, 500) offset := maxInt(queryInt(q.Get("offset"), 0), 0) args := []any{vertical} where := "c.vertical = $1" if section := strings.TrimSpace(q.Get("section")); section != "" { args = append(args, section) where += fmt.Sprintf(" AND s.slug = $%d", len(args)) } if channelID := strings.TrimSpace(q.Get("channel_id")); channelID != "" { id, err := strconv.ParseInt(channelID, 10, 64) if err != nil { writeError(w, http.StatusBadRequest, "bad channel_id") return } args = append(args, id) where += fmt.Sprintf(" AND c.id = $%d", len(args)) } if text := strings.TrimSpace(q.Get("q")); text != "" { args = append(args, "%"+text+"%") where += fmt.Sprintf(" AND m.text ILIKE $%d", len(args)) } if q.Get("leads_only") == "true" || q.Get("leads_only") == "1" { key := "lead" field := "is_listing" if vertical == verticalHR { key = "hr_lead" field = "is_lead" } args = append(args, key, field) where += fmt.Sprintf(" AND m.extracted -> $%d ->> $%d = 'true'", len(args)-1, len(args)) } if !scope.IsAdmin { args = append(args, scope.DeptID) where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) } fetchLimit := clampInt(limit*5, limit, 1000) args = append(args, fetchLimit, offset) rows, err := a.db.Query(ctx, ` SELECT m.id, c.id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int, m.date, m.text, m.sender_id, m.sender_username, m.sender_name, COALESCE(src.identifier, c.identifier), COALESCE(src.tg_id, c.tg_id), 
m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text, COALESCE(m.media_files, '[]'::jsonb)::text, m.views, m.forwards, m.fetched_at FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id JOIN sections s ON s.id = c.section_id WHERE `+where+` ORDER BY m.date DESC, m.id DESC LIMIT $`+strconv.Itoa(len(args)-1)+` OFFSET $`+strconv.Itoa(len(args))+` `, args...) if err != nil { writeDBError(w, err) return } defer rows.Close() out := []messageOut{} for rows.Next() { item, err := scanMessage(rows) if err != nil { writeDBError(w, err) return } a.normalizeMessageMedia(&item) out = append(out, item) } if err := rows.Err(); err != nil { writeDBError(w, err) return } out = aggregateMessages(out, limit) writeJSON(w, http.StatusOK, out) } func (a *app) handleMessageItem(ctx context.Context, w http.ResponseWriter, r *http.Request, path string) { if r.Method != http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } scope, ok := a.readScope(w, r, false) if !ok { return } idRaw := strings.TrimPrefix(path, "/api/v1/messages/") id, err := strconv.ParseInt(idRaw, 10, 64) if err != nil { writeError(w, http.StatusBadRequest, "bad message id") return } args := []any{id} where := "m.id = $1" if !scope.IsAdmin { args = append(args, scope.DeptID) where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) } row := a.db.QueryRow(ctx, ` SELECT m.id, c.id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int, m.date, m.text, m.sender_id, m.sender_username, m.sender_name, COALESCE(src.identifier, c.identifier), COALESCE(src.tg_id, c.tg_id), m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text, COALESCE(m.media_files, '[]'::jsonb)::text, m.views, m.forwards, m.fetched_at FROM messages m JOIN channels src ON src.id = m.channel_id JOIN channels c ON c.id = src.id OR c.source_channel_id = src.id JOIN sections s ON s.id = c.section_id WHERE `+where+` ORDER BY CASE WHEN c.id = src.id 
THEN 0 ELSE 1 END LIMIT 1 `, args...) item, err := scanMessageRow(row) if err != nil { writeDBError(w, err) return } a.normalizeMessageMedia(&item) writeJSON(w, http.StatusOK, item) } func (a *app) handleMedia(w http.ResponseWriter, r *http.Request, path string) { if r.Method != http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } scope, ok := a.readScope(w, r, false) if !ok { return } rel, err := url.PathUnescape(strings.TrimPrefix(path, "/api/v1/media/")) if err != nil { writeError(w, http.StatusBadRequest, "bad media path") return } clean := strings.TrimPrefix(filepath.Clean("/"+rel), "/") if clean == "." || clean == "" || strings.HasPrefix(clean, "../") { writeError(w, http.StatusBadRequest, "bad media path") return } parts := strings.SplitN(clean, string(os.PathSeparator), 2) channelID, err := strconv.ParseInt(parts[0], 10, 64) if err != nil || channelID <= 0 { writeError(w, http.StatusBadRequest, "bad media path") return } allowed, err := a.canReadChannelMedia(r.Context(), scope, channelID) if err != nil { writeDBError(w, err) return } if !allowed { writeError(w, http.StatusNotFound, "not found") return } if a.serveMinioMedia(w, r, clean) { return } base, err := filepath.Abs(a.cfg.MediaDir) if err != nil { writeError(w, http.StatusInternalServerError, "media directory unavailable") return } full, err := filepath.Abs(filepath.Join(base, clean)) if err != nil || (full != base && !strings.HasPrefix(full, base+string(os.PathSeparator))) { writeError(w, http.StatusBadRequest, "bad media path") return } http.ServeFile(w, r, full) } func (a *app) serveMinioMedia(w http.ResponseWriter, r *http.Request, key string) bool { if a.minio == nil || a.cfg.MinioBucket == "" { return false } obj, err := a.minio.GetObject(r.Context(), a.cfg.MinioBucket, key, minio.GetObjectOptions{}) if err != nil { slog.Warn("minio_get_media_failed", "key", key, "error", err) return false } defer obj.Close() info, err := obj.Stat() if err != nil { return 
false } if info.ContentType != "" { w.Header().Set("Content-Type", info.ContentType) } if info.Size > 0 { w.Header().Set("Content-Length", strconv.FormatInt(info.Size, 10)) } if _, err := io.Copy(w, obj); err != nil { slog.Warn("minio_media_stream_failed", "key", key, "error", err) } return true }

// canReadChannelMedia reports whether the caller described by scope may read
// media of channelID. The id is matched both as a channel's own id and as the
// source_channel_id of other channels; access is granted when any matching
// channel belongs to a section of the caller's department, or when the caller
// is an admin. No matching channel yields (false, nil).
func (a *app) canReadChannelMedia(ctx context.Context, scope accessScope, channelID int64) (bool, error) {
	var allowed bool
	err := a.db.QueryRow(ctx, ` SELECT COALESCE(bool_or(s.department_id = $2 OR $3::boolean), false) FROM channels c JOIN sections s ON s.id = c.section_id WHERE c.id = $1 OR c.source_channel_id = $1 `, channelID, scope.DeptID, scope.IsAdmin).Scan(&allowed)
	if errors.Is(err, pgx.ErrNoRows) {
		return false, nil
	}
	if err != nil {
		return false, err
	}
	return allowed, nil
}

// handleStats serves GET statistics for one vertical: channel, message and
// lead counters (total and last 24 hours), the configured poll interval and
// the most recent poll time. The optional "section" query parameter narrows
// the result to one section slug; non-admin callers only see sections of
// their own department.
func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	section := strings.TrimSpace(r.URL.Query().Get("section"))

	// Build the shared WHERE clause; placeholders are numbered by position.
	args := []any{vertical}
	where := "c.vertical = $1"
	if section != "" {
		args = append(args, section)
		where += fmt.Sprintf(" AND s.slug = $%d", len(args))
	}
	if !scope.IsAdmin {
		args = append(args, scope.DeptID)
		where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
	}

	// scan runs a single-value query into dest and writes the error response
	// itself on failure, so callers only need to stop when it returns false.
	scan := func(dest any, query string, queryArgs []any) bool {
		if err := a.db.QueryRow(ctx, query, queryArgs...).Scan(dest); err != nil {
			writeDBError(w, err)
			return false
		}
		return true
	}

	var channelsTotal, channelsActive, messagesTotal, messages24h, leadsTotal, leads24h int64
	var lastPoll sql.NullTime

	countQuery := `FROM channels c JOIN sections s ON s.id = c.section_id WHERE ` + where
	if !scan(&channelsTotal, `SELECT count(*) `+countQuery, args) {
		return
	}
	if !scan(&channelsActive, `SELECT count(*) `+countQuery+` AND c.is_active = true`, args) {
		return
	}
	if !scan(&lastPoll, `SELECT max(c.last_polled_at) `+countQuery, args) {
		return
	}

	// Messages are stored against the source channel, so channels that point
	// at another channel via source_channel_id are resolved to it first.
	messageFrom := `FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id JOIN sections s ON s.id = c.section_id WHERE ` + where
	const last24h = ` AND m.fetched_at >= now() - interval '24 hours'`
	if !scan(&messagesTotal, `SELECT count(*) `+messageFrom, args) {
		return
	}
	if !scan(&messages24h, `SELECT count(*) `+messageFrom+last24h, args) {
		return
	}

	// A message counts as a lead when extracted[<verdict key>][<field>] is true.
	field := "is_listing"
	if vertical == verticalHR {
		field = "is_lead"
	}
	// Copy args before appending so the shared slice is never aliased.
	leadArgs := append(append([]any{}, args...), verdictKey(vertical), field)
	leadClause := fmt.Sprintf(" AND m.extracted -> $%d ->> $%d = 'true'", len(leadArgs)-1, len(leadArgs))
	if !scan(&leadsTotal, `SELECT count(*) `+messageFrom+leadClause, leadArgs) {
		return
	}
	if !scan(&leads24h, `SELECT count(*) `+messageFrom+leadClause+last24h, leadArgs) {
		return
	}

	writeJSON(w, http.StatusOK, map[string]any{
		"vertical":              vertical,
		"section_slug":          nullableString(section),
		"channels_total":        channelsTotal,
		"channels_active":       channelsActive,
		"messages_total":        messagesTotal,
		"messages_last_24h":     messages24h,
		"leads_total":           leadsTotal,
		"leads_last_24h":        leads24h,
		"poll_interval_seconds": a.cfg.PollIntervalSeconds,
		"last_poll_at":          nullTime(lastPoll),
	})
}

// handleLLMStatus reports whether LLM processing is enabled and, when it is,
// whether the "llm" provider of the AI service is configured and healthy.
// A provider lookup failure is reported in "provider_error", not as an HTTP
// error; the response status is always 200.
func (a *app) handleLLMStatus(ctx context.Context, w http.ResponseWriter) {
	ready := false
	var providerError string
	model := a.cfg.LLMModel
	if a.cfg.LLMEnabled {
		status, err := a.ai.ProvidersStatus(ctx)
		if err != nil {
			providerError = err.Error()
		} else {
			for _, provider := range status.Providers {
				if provider.Name != "llm" {
					continue
				}
				ready = provider.Configured && provider.OK
				providerError = provider.Error
				// Prefer the model the provider reports over the configured one.
				if provider.Model != "" {
					model = provider.Model
				}
				break
			}
		}
	}
	writeJSON(w, http.StatusOK, map[string]any{
		"enabled":        a.cfg.LLMEnabled,
		"ready":          ready,
		"base_url":       a.cfg.AIServiceURL,
		"model":          model,
		"provider":       "ai-service",
		"provider_error": providerError,
	})
}

// handleLLMQueue serves GET {"pending": n}: the number of messages of the
// requested vertical (optionally one section) that pendingLLM counts as not
// yet analysed.
func (a *app) handleLLMQueue(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	section := strings.TrimSpace(r.URL.Query().Get("section"))
	pending, err := a.pendingLLM(ctx, scope, vertical, section)
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, map[string]int64{"pending": pending})
}

// pendingLLM counts messages with text whose extracted JSON lacks the verdict
// for their channel's vertical ('hr_lead' for hr, 'lead' otherwise). An empty
// section means "all sections"; non-admin scopes are limited to their own
// department.
func (a *app) pendingLLM(ctx context.Context, scope accessScope, vertical, section string) (int64, error) {
	args := []any{vertical}
	where := `c.vertical = $1 AND m.text IS NOT NULL AND ((c.vertical = 'hr' AND (m.extracted IS NULL OR m.extracted->'hr_lead' IS NULL)) OR (c.vertical <> 'hr' AND (m.extracted IS NULL OR m.extracted->'lead' IS NULL)))`
	if section != "" {
		args = append(args, section)
		where += fmt.Sprintf(" AND s.slug = $%d", len(args))
	}
	if !scope.IsAdmin {
		args = append(args, scope.DeptID)
		where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
	}
	var pending int64
	err := a.db.QueryRow(ctx, ` SELECT count(*) FROM channels c JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id) JOIN messages m ON m.channel_id = src.id JOIN sections s ON s.id = c.section_id WHERE `+where, args...).Scan(&pending)
	return pending, err
}

// handlePrompt dispatches the prompt endpoint by HTTP method: GET reads,
// PUT saves and DELETE resets the prompt override.
func (a *app) handlePrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodGet:
		a.getPrompt(ctx, w, r)
	case http.MethodPut:
		a.savePrompt(ctx, w, r)
	case http.MethodDelete:
		a.resetPrompt(ctx, w, r)
	default:
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
	}
}

// getPrompt returns the effective system prompt for the caller's department,
// the requested vertical and the optional section, together with the built-in
// default, the level the prompt came from ("section", "vertical" or
// "default") and whether an override is stored at exactly the requested level.
func (a *app) getPrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	section := strings.TrimSpace(r.URL.Query().Get("section"))
	if section != "" {
		// Only the lookup error matters here; the section itself is unused.
		if _, err := a.findSection(ctx, vertical, section, scope); err != nil {
			writeDBError(w, err)
			return
		}
	}
	prompt, source, err := a.resolvePrompt(ctx, scope.DeptID, vertical, section)
	if err != nil {
		writeDBError(w, err)
		return
	}
	overridden, err := a.promptExists(ctx, scope.DeptID, vertical, section)
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, map[string]any{
		"vertical":           vertical,
		"department_id":      nullableString(scope.DeptID),
		"section":            nullableString(section),
		"prompt":             prompt,
		"default":            defaultPrompt(vertical),
		"source":             source,
		"is_overridden_here": overridden,
	})
}

// savePrompt stores (upserts) a prompt override for the caller's department,
// the requested vertical and the optional section. The prompt must be
// non-empty after trimming and at most 30000 characters long.
func (a *app) savePrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	const maxPromptChars = 30000

	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	section := strings.TrimSpace(r.URL.Query().Get("section"))

	var payload struct {
		Prompt string `json:"prompt"`
	}
	if !readBody(w, r, &payload) {
		return
	}
	text := strings.TrimSpace(payload.Prompt)
	if text == "" {
		writeError(w, http.StatusBadRequest, "prompt must be a non-empty string")
		return
	}
	// The limit is documented in characters, so count runes rather than
	// bytes: Cyrillic text (like the built-in prompts) takes two bytes per
	// letter in UTF-8 and would otherwise be rejected at half the limit.
	if len([]rune(text)) > maxPromptChars {
		writeError(w, http.StatusBadRequest, "prompt is too long (max 30000 chars)")
		return
	}

	key := promptKey(scope.DeptID, vertical, section)
	// Marshalling a plain string cannot fail, so the error is not checked.
	value, _ := json.Marshal(text)
	if _, err := a.db.Exec(ctx, ` INSERT INTO app_settings (key, value, updated_at) VALUES ($1, $2::jsonb, now()) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = now() `, key, string(value)); err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, map[string]any{
		"saved":         true,
		"vertical":      vertical,
		"department_id": nullableString(scope.DeptID),
		"section":       nullableString(section),
		// Byte length, unchanged from the original response.
		"length": len(text),
	})
}

// resetPrompt deletes the prompt override stored for the caller's department,
// the requested vertical and the optional section, then answers 204. Deleting
// a key that does not exist is not an error.
func (a *app) resetPrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	section := strings.TrimSpace(r.URL.Query().Get("section"))
	key := promptKey(scope.DeptID, vertical, section)
	if _, err := a.db.Exec(ctx, `DELETE FROM app_settings WHERE key = $1`, key); err != nil {
		writeDBError(w, err)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}

// resolvePrompt returns the effective prompt and the level it came from.
// Lookup order: section-level override (only when section is non-empty),
// then vertical-level override, then the built-in default. Missing rows and
// blank stored values fall through to the next level.
func (a *app) resolvePrompt(ctx context.Context, deptID, vertical, section string) (string, string, error) {
	type candidate struct {
		key    string
		source string
	}
	candidates := make([]candidate, 0, 2)
	if section != "" {
		candidates = append(candidates, candidate{promptKey(deptID, vertical, section), "section"})
	}
	candidates = append(candidates, candidate{promptKey(deptID, vertical, ""), "vertical"})

	for _, c := range candidates {
		var text string
		err := a.db.QueryRow(ctx, `SELECT value #>> '{}' FROM app_settings WHERE key = $1`, c.key).Scan(&text)
		switch {
		case err == nil:
			if strings.TrimSpace(text) != "" {
				return text, c.source, nil
			}
		case !errors.Is(err, pgx.ErrNoRows):
			return "", "", err
		}
	}
	return defaultPrompt(vertical), "default", nil
}

// promptExists reports whether a prompt override row is stored under the key
// for exactly this department, vertical and section.
func (a *app) promptExists(ctx context.Context, deptID, vertical, section string) (bool, error) {
	var exists bool
	err := a.db.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM app_settings WHERE key = $1)`, promptKey(deptID, vertical, section)).Scan(&exists)
	return exists, err
}

// proxyPython forwards the request (method, query string, headers and body)
// to the Python service at path and relays its status, headers and body.
// Paths containing "/auth/" require CanAuth; paths containing "/poll" or
// "/backfill-media" require CanManage. A caller lacking the permission gets
// 404 rather than 403.
func (a *app) proxyPython(w http.ResponseWriter, r *http.Request, path string) {
	scope := readAccess(r)
	if strings.Contains(path, "/auth/") && !scope.CanAuth {
		writeError(w, http.StatusNotFound, "not found")
		return
	}
	needsManage := strings.Contains(path, "/poll") || strings.Contains(path, "/backfill-media")
	if needsManage && !scope.CanManage {
		writeError(w, http.StatusNotFound, "not found")
		return
	}

	target := strings.TrimRight(a.cfg.PythonBaseURL, "/") + path
	if r.URL.RawQuery != "" {
		target += "?" + r.URL.RawQuery
	}
	body, err := io.ReadAll(r.Body)
	if err != nil {
		writeError(w, http.StatusBadRequest, "bad body")
		return
	}
	req, err := http.NewRequestWithContext(r.Context(), r.Method, target, bytes.NewReader(body))
	if err != nil {
		writeError(w, http.StatusBadGateway, err.Error())
		return
	}
	req.Header = r.Header.Clone()

	resp, err := a.python.Do(req)
	if err != nil {
		writeError(w, http.StatusBadGateway, err.Error())
		return
	}
	defer resp.Body.Close()

	for name, values := range resp.Header {
		for _, v := range values {
			w.Header().Add(name, v)
		}
	}
	w.WriteHeader(resp.StatusCode)
	// The status line is already sent, so a copy failure cannot be reported
	// to the client.
	_, _ = io.Copy(w, resp.Body)
}

// readScope extracts the caller's access scope from the request headers and
// enforces the baseline rules, writing the error response itself when it
// returns false: with manage set the caller must have CanManage (otherwise
// 404), and in every case a non-admin caller must carry a department id
// (otherwise 403).
func (a *app) readScope(w http.ResponseWriter, r *http.Request, manage bool) (accessScope, bool) {
	scope := readAccess(r)
	if manage && !scope.CanManage {
		writeError(w, http.StatusNotFound, "not found")
		return scope, false
	}
	if !scope.IsAdmin && scope.DeptID == "" {
		writeError(w, http.StatusForbidden, "department is required")
		return scope, false
	}
	return scope, true
}

// readAccess builds the access scope from the X-User-* and X-Monitoring-TG-*
// request headers; a flag is set when its header equals "1". Admins can
// always manage and authenticate; department heads can always manage.
func readAccess(r *http.Request) accessScope {
	admin := r.Header.Get("X-User-Is-Admin") == "1"
	deptHead := r.Header.Get("X-User-Is-Department-Head") == "1"
	canManage := r.Header.Get("X-Monitoring-TG-Can-Manage") == "1"
	canAuth := r.Header.Get("X-Monitoring-TG-Can-Auth") == "1"
	return accessScope{
		IsAdmin:   admin,
		CanManage: admin || deptHead || canManage,
		CanAuth:   admin || canAuth,
		DeptID:    strings.TrimSpace(r.Header.Get("X-User-Department-Id")),
	}
}

// rowScanner is the Scan method shared by single rows and row iterators, so
// the scan helpers below work with both.
type rowScanner interface {
	Scan(dest ...any) error
}

// scanSection reads one section row (twelve columns, in the order of the
// Scan call below) into a sectionOut. Blank department, emoji and description
// values become nil pointers. On a scan error the zero value is returned.
func scanSection(rows rowScanner) (sectionOut, error) {
	var item sectionOut
	var dept, emoji, description string
	if err := rows.Scan(
		&item.ID,
		&item.Vertical,
		&dept,
		&item.Slug,
		&item.Title,
		&emoji,
		&description,
		&item.CreatedAt,
		&item.ChannelsTotal,
		&item.ChannelsActive,
		&item.MessagesTotal,
		&item.LeadsTotal,
	); err != nil {
		return sectionOut{}, err
	}
	item.DepartmentID = nullableString(dept)
	item.Emoji = nullableString(emoji)
	item.Description = nullableString(description)
	return item, nil
}

// scanSectionRow is scanSection for a single pgx.Row.
func scanSectionRow(row pgx.Row) (sectionOut, error) {
	return scanSection(row)
}

// scanChannel reads one channel row (twelve columns, in the order of the Scan
// call below) into a channelOut, mapping SQL NULLs to nil pointers. On a scan
// error the zero value is returned.
func scanChannel(rows rowScanner) (channelOut, error) {
	var item channelOut
	var tgID, sourceID, lastMsg sql.NullInt64
	var title, slug sql.NullString
	var lastPoll sql.NullTime
	if err := rows.Scan(
		&item.ID,
		&tgID,
		&sourceID,
		&item.Identifier,
		&title,
		&item.Vertical,
		&item.SectionID,
		&slug,
		&item.IsActive,
		&lastMsg,
		&lastPoll,
		&item.CreatedAt,
	); err != nil {
		return channelOut{}, err
	}
	item.TGID = nullInt(tgID)
	item.SourceChannelID = nullInt(sourceID)
	item.Title = nullString(title)
	item.SectionSlug = nullString(slug)
	item.LastMessageID = nullInt(lastMsg)
	item.LastPolledAt = nullTimePtr(lastPoll)
	return item, nil
}

// scanChannelRow is scanChannel for a single pgx.Row.
func scanChannelRow(row pgx.Row) (channelOut, error) {
	return scanChannel(row)
}

// scanMessage reads one message row (twenty columns, in the order of the Scan
// call below) into a messageOut. The extracted and media columns arrive as
// JSON text; empty or "null" values are treated as absent. The post URL is
// derived from the channel identifier and the Telegram message id. On a scan
// error the zero value is returned.
func scanMessage(rows rowScanner) (messageOut, error) {
	var item messageOut
	var vertical, slug string
	var grouped, senderID, tgID, views, forwards sql.NullInt64
	var text, senderUsername, senderName, identifier sql.NullString
	var extractedText, mediaText string
	if err := rows.Scan(
		&item.ID,
		&item.ChannelID,
		&vertical,
		&slug,
		&item.TGMessageID,
		&grouped,
		&item.GroupSize,
		&item.Date,
		&text,
		&senderID,
		&senderUsername,
		&senderName,
		&identifier,
		&tgID,
		&item.HasMedia,
		&extractedText,
		&mediaText,
		&views,
		&forwards,
		&item.FetchedAt,
	); err != nil {
		return messageOut{}, err
	}
	item.ChannelVertical = nullableString(vertical)
	item.ChannelSectionSlug = nullableString(slug)
	item.GroupedID = nullInt(grouped)
	item.Text = nullString(text)
	item.SenderID = nullInt(senderID)
	item.SenderUsername = nullString(senderUsername)
	item.SenderName = nullString(senderName)
	item.Views = nullInt(views)
	item.Forwards = nullInt(forwards)
	if extractedText != "" && extractedText != "null" {
		item.Extracted = json.RawMessage(extractedText)
	}
	if mediaText != "" && mediaText != "null" {
		// Best effort: malformed media JSON leaves MediaFiles empty instead
		// of failing the whole row.
		_ = json.Unmarshal([]byte(mediaText), &item.MediaFiles)
	}
	item.PostURL = buildPostURL(identifier.String, tgID, item.TGMessageID)
	return item, nil
}

// scanMessageRow is scanMessage for a single pgx.Row.
func scanMessageRow(row pgx.Row) (messageOut, error) {
	return scanMessage(row)
}

// normalizeMessageMedia rewrites the "url" of every media file of item to the
// public media endpoint and fills in a missing "name" with the file's base
// name. Entries whose url yields no relative path are left untouched.
func (a *app) normalizeMessageMedia(item *messageOut) {
	for _, file := range item.MediaFiles {
		rawURL, _ := file["url"].(string)
		rel := mediaRelativePath(rawURL)
		if rel == "" {
			continue
		}
		file["url"] = a.mediaURL(rel)
		if _, hasName := file["name"].(string); !hasName {
			file["name"] = filepath.Base(rel)
		}
	}
}

// mediaRelativePath turns a stored media URL or path into a cleaned path
// relative to the media root: everything up to and including the first
// "/media/" is dropped, a leading "/" and "media/" are stripped, and the rest
// is cleaned as a rooted path so ".." segments cannot climb above the root.
// It returns "" when nothing usable remains.
func mediaRelativePath(raw string) string {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return ""
	}
	if idx := strings.Index(raw, "/media/"); idx >= 0 {
		raw = raw[idx+len("/media/"):]
	}
	raw = strings.TrimPrefix(raw, "/")
	raw = strings.TrimPrefix(raw, "media/")
	clean := strings.TrimPrefix(filepath.Clean("/"+raw), "/")
	if clean == "." || clean == "" || strings.HasPrefix(clean, "../") {
		return ""
	}
	return clean
}

// mediaURL returns the public URL of the media file at rel, prefixed with the
// configured public base path.
func (a *app) mediaURL(rel string) string {
	base := strings.TrimRight(a.cfg.PublicBasePath, "/")
	return base + "/api/v1/media/" + strings.TrimPrefix(rel, "/")
}

// aggregateMessages collapses messages that share a group key (same channel
// and grouped id) into the first message of the group, preserving input
// order. The merged entry counts its members in GroupSize, concatenates their
// media files and takes the first non-empty text and extracted payload. At
// most limit entries are returned; a negative limit applies no cap.
func aggregateMessages(items []messageOut, limit int) []messageOut {
	out := make([]messageOut, 0, len(items))
	index := make(map[string]int, len(items))
	for _, item := range items {
		key := messageGroupKey(item)
		if pos, seen := index[key]; seen {
			group := &out[pos]
			group.GroupSize++
			group.HasMedia = group.HasMedia || item.HasMedia
			group.MediaFiles = append(group.MediaFiles, item.MediaFiles...)
			if group.Text == nil && item.Text != nil {
				group.Text = item.Text
			}
			if len(group.Extracted) == 0 && len(item.Extracted) > 0 {
				group.Extracted = item.Extracted
			}
			continue
		}
		if item.GroupSize < 1 {
			item.GroupSize = 1
		}
		index[key] = len(out)
		out = append(out, item)
	}
	// Guard the slice expression: out[:limit] panics for a negative limit.
	if limit >= 0 && len(out) > limit {
		return out[:limit]
	}
	return out
}

// messageGroupKey returns the aggregation key of item: channel plus grouped
// id for grouped messages, otherwise a key unique to the message itself.
func messageGroupKey(item messageOut) string {
	if item.GroupedID != nil {
		return fmt.Sprintf("channel:%d:group:%d", item.ChannelID, *item.GroupedID)
	}
	return fmt.Sprintf("message:%d", item.ID)
}

// buildPostURL returns the public t.me link to message msgID of the channel
// named by identifier, or nil when no link can be built: a blank identifier,
// a zero message id, or an identifier that is neither a https://t.me/ URL, an
// @username nor a bare name without "/" or "+". tgID is currently unused.
func buildPostURL(identifier string, tgID sql.NullInt64, msgID int64) *string {
	identifier = strings.TrimSpace(identifier)
	if identifier == "" || msgID == 0 {
		return nil
	}
	suffix := "/" + strconv.FormatInt(msgID, 10)
	var link string
	switch {
	case strings.HasPrefix(identifier, "https://t.me/"):
		link = strings.TrimRight(identifier, "/") + suffix
	case strings.HasPrefix(identifier, "@"):
		link = "https://t.me/" + strings.TrimPrefix(identifier, "@") + suffix
	case !strings.Contains(identifier, "/") && !strings.Contains(identifier, "+"):
		link = "https://t.me/" + identifier + suffix
	default:
		return nil
	}
	return &link
}

// readBody decodes the JSON request body into out and closes the body. On a
// decode failure it writes a 400 response and returns false.
func readBody(w http.ResponseWriter, r *http.Request, out any) bool {
	defer r.Body.Close()
	if err := json.NewDecoder(r.Body).Decode(out); err != nil {
		writeError(w, http.StatusBadRequest, "invalid json")
		return false
	}
	return true
}

// writeJSON sends payload as a JSON response with the given status code.
func writeJSON(w http.ResponseWriter, status int, payload any) {
	w.Header().Set("Content-Type", "application/json; charset=utf-8")
	w.WriteHeader(status)
	// The status line is already sent, so an encoding or write failure can
	// no longer be reported to the client.
	_ = json.NewEncoder(w).Encode(payload)
}

// writeError sends {"detail": detail} with the given status code.
func writeError(w http.ResponseWriter, status int, detail string) {
	writeJSON(w, status, map[string]string{"detail": detail})
}

// writeDBError maps a database error to a response: pgx.ErrNoRows becomes
// 404, anything else is logged and answered with a generic 500.
func writeDBError(w http.ResponseWriter, err error) {
	if errors.Is(err, pgx.ErrNoRows) {
		writeError(w, http.StatusNotFound, "not found")
		return
	}
	slog.Error("api_db_error", "error", err)
	writeError(w, http.StatusInternalServerError, "database error")
}

// queryRequired returns the trimmed query parameter name. When it is missing
// or blank it writes a 400 response and returns "", so callers must stop on
// an empty result.
func queryRequired(w http.ResponseWriter, r *http.Request, name string) string {
	value := strings.TrimSpace(r.URL.Query().Get(name))
	if value == "" {
		writeError(w, http.StatusBadRequest, name+" is required")
	}
	return value
}

// promptKey builds the app_settings key of a prompt override. An empty
// department id is stored as "global"; the section part is only present for
// section-level overrides.
func promptKey(deptID, vertical, section string) string {
	if deptID == "" {
		deptID = "global"
	}
	if section != "" {
		return fmt.Sprintf("llm_system_prompt:%s:%s:%s", deptID, vertical, section)
	}
	return fmt.Sprintf("llm_system_prompt:%s:%s", deptID, vertical)
}

// verdictKey returns the key under which the LLM verdict of a vertical is
// stored inside a message's extracted JSON.
func verdictKey(vertical string) string {
	if vertical == verticalHR {
		return "hr_lead"
	}
	return "lead"
}

// defaultPrompt returns the built-in system prompt for vertical: the HR
// prompt for the hr vertical, the real-estate prompt otherwise.
func defaultPrompt(vertical string) string {
	if vertical == verticalHR {
		return defaultHRPrompt
	}
	return defaultREPrompt
}

// normalizeSlug lower-cases and trims raw, turns spaces into "-" and drops
// every character other than a-z, 0-9, "_" and "-".
func normalizeSlug(raw string) string {
	raw = strings.ToLower(strings.TrimSpace(raw))
	raw = strings.ReplaceAll(raw, " ", "-")
	var b strings.Builder
	for _, r := range raw {
		if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '_' || r == '-' {
			b.WriteRune(r)
		}
	}
	return b.String()
}

// nullableString returns a pointer to the trimmed v, or nil when it is blank.
func nullableString(v string) *string {
	v = strings.TrimSpace(v)
	if v == "" {
		return nil
	}
	return &v
}

// nullString returns a pointer to the (untrimmed) string of v, or nil when v
// is NULL or blank.
func nullString(v sql.NullString) *string {
	if !v.Valid || strings.TrimSpace(v.String) == "" {
		return nil
	}
	return &v.String
}

// nullInt returns a pointer to the value of v, or nil when v is NULL.
func nullInt(v sql.NullInt64) *int64 {
	if !v.Valid {
		return nil
	}
	return &v.Int64
}

// nullTime returns the time of v, or an untyped nil when v is NULL, so that
// it encodes as JSON null.
func nullTime(v sql.NullTime) any {
	if !v.Valid {
		return nil
	}
	return v.Time
}

// nullTimePtr returns a pointer to the time of v, or nil when v is NULL.
func nullTimePtr(v sql.NullTime) *time.Time {
	if !v.Valid {
		return nil
	}
	return &v.Time
}

// valueOrEmpty dereferences v, treating nil as the empty string.
func valueOrEmpty(v *string) string {
	if v == nil {
		return ""
	}
	return *v
}

// newMinioClient builds the MinIO client from cfg. It returns (nil, nil) when
// the endpoint, either credential or the bucket is not configured, so callers
// must check the client for nil before using it.
func newMinioClient(cfg config) (*minio.Client, error) {
	if cfg.MinioEndpoint == "" || cfg.MinioAccessKey == "" || cfg.MinioSecretKey == "" || cfg.MinioBucket == "" {
		return nil, nil
	}
	// minio.New is given host[:port] only; the scheme is chosen via Secure.
	endpoint := strings.TrimPrefix(strings.TrimPrefix(cfg.MinioEndpoint, "https://"), "http://")
	opts := &minio.Options{
		Creds:        credentials.NewStaticV4(cfg.MinioAccessKey, cfg.MinioSecretKey, ""),
		Secure:       cfg.MinioUseSSL,
		Region:       cfg.MinioRegion,
		BucketLookup: minio.BucketLookupPath,
	}
	if cfg.MinioInsecureTLS {
		opts.Transport = &http.Transport{
			TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, //nolint:gosec // optional intra-cluster MinIO mode
		}
	}
	return minio.New(endpoint, opts)
}

// queryInt parses raw as a decimal integer, returning fallback when raw is
// empty or not a valid number.
func queryInt(raw string, fallback int) int {
	if raw == "" {
		return fallback
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return fallback
	}
	return n
}

// clampInt limits n to the range [lo, hi]. The lower bound is checked first.
func clampInt(n, lo, hi int) int {
	if n < lo {
		return lo
	}
	if n > hi {
		return hi
	}
	return n
}

// maxInt returns n, or floor when n is smaller than floor.
func maxInt(n, floor int) int {
	if n < floor {
		return floor
	}
	return n
}

// loadConfig reads the service configuration from environment variables,
// falling back to the defaults listed here for unset, blank or unparsable
// values.
func loadConfig() config {
	return config{
		APIHost:             env("API_HOST", "0.0.0.0"),
		APIPort:             envInt("API_PORT", 8000),
		PublicBasePath:      env("PUBLIC_BASE_PATH", ""),
		PythonBaseURL:       env("PYTHON_BASE_URL", "http://127.0.0.1:8001"),
		MediaDir:            env("MEDIA_DIR", "/data/media"),
		PostgresUser:        env("POSTGRES_USER", "parser"),
		PostgresPassword:    env("POSTGRES_PASSWORD", "parser"),
		PostgresDB:          env("POSTGRES_DB", "parser"),
		PostgresHost:        env("POSTGRES_HOST", "db"),
		PostgresPort:        envInt("POSTGRES_PORT", 5432),
		PollIntervalSeconds: envInt("POLL_INTERVAL_SECONDS", 60),
		LLMEnabled:          envBool("LLM_ENABLED", true),
		LLMModel:            env("LLM_MODEL", "qwen2.5-14b"),
		LLMTimeout:          time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second,
		AIServiceURL:        env("AI_SERVICE_URL", ""),
		AIServiceToken:      env("AI_SERVICE_TOKEN", ""),
		MinioEndpoint:       env("MINIO_ENDPOINT", ""),
		MinioAccessKey:      env("MINIO_ACCESS_KEY", ""),
		MinioSecretKey:      env("MINIO_SECRET_KEY", ""),
		MinioBucket:         env("MINIO_BUCKET", "monitoring-tg-media"),
		MinioUseSSL:         envBool("MINIO_USE_SSL", true),
		MinioRegion:         env("MINIO_REGION", "us-east-1"),
		MinioInsecureTLS:    envBool("MINIO_INSECURE_SKIP_VERIFY", false),
	}
}

// databaseURL returns the PostgreSQL connection URL for the configured
// credentials, host, port and database.
func (c config) databaseURL() string {
	// Let net/url escape each component for its own position. QueryEscape is
	// wrong here: it encodes a space as "+", which is a literal plus sign in
	// the userinfo and path parts of a URL.
	dsn := url.URL{
		Scheme: "postgres",
		User:   url.UserPassword(c.PostgresUser, c.PostgresPassword),
		Host:   fmt.Sprintf("%s:%d", c.PostgresHost, c.PostgresPort),
		Path:   "/" + c.PostgresDB,
	}
	return dsn.String()
}

// env returns the trimmed value of environment variable key, or fallback when
// it is unset or blank.
func env(key, fallback string) string {
	if v := strings.TrimSpace(os.Getenv(key)); v != "" {
		return v
	}
	return fallback
}

// envInt returns environment variable key parsed as a decimal integer, or
// fallback when it is unset, blank or not a number.
func envInt(key string, fallback int) int {
	raw := strings.TrimSpace(os.Getenv(key))
	if raw == "" {
		return fallback
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return fallback
	}
	return n
}

// envBool returns environment variable key parsed with strconv.ParseBool
// (which accepts "1", "0", "t", "f", "true", "false" and their case
// variants), or fallback when it is unset, blank or not a boolean.
func envBool(key string, fallback bool) bool {
	raw := strings.TrimSpace(os.Getenv(key))
	if raw == "" {
		return fallback
	}
	b, err := strconv.ParseBool(raw)
	if err != nil {
		return fallback
	}
	return b
}

// defaultREPrompt is the built-in system prompt for every vertical other than
// hr (real-estate listings). The prompt text itself is in Russian.
const defaultREPrompt = `Ты — аналитик объявлений о недвижимости. Тебе дают текст из Telegram-канала. Определи, является ли сообщение реальным объявлением о покупке, продаже или аренде недвижимости. Отвечай строго валидным JSON без markdown: { "is_listing": boolean, "kind": "sale" | "rent" | "purchase" | null, "property_type": string | null, "rooms": string | null, "area_m2": number | null, "price_text": string | null, "price_value": number | null, "currency": "RUB" | "USD" | "EUR" | "AED" | "GBP" | "CNY" | "TRY" | "KZT" | "BYN" | "UAH" | null, "location": string | null, "contact_phone": string | null, "contact_name": string | null, "summary": string, "confidence": number } summary всегда по-русски, confidence в диапазоне 0..1.`

// defaultHRPrompt is the built-in system prompt for the hr vertical
// (vacancies, resumes and HR contacts). The prompt text itself is in Russian.
const defaultHRPrompt = `Ты — аналитик HR-объявлений. Тебе дают текст из Telegram-канала. Определи, относится ли сообщение к рынку труда: вакансия, резюме или короткий HR-контакт. Отвечай строго валидным JSON без markdown: { "is_lead": boolean, "kind": "vacancy" | "resume" | "contact" | null, "title": string | null, "company": string | null, "candidate_name": string | null, "experience_years": number | null, "skills": string[], "location": string | null, "remote": boolean | null, "employment_type": "full-time" | "part-time" | "contract" | "internship" | null, "salary_text": string | null, "salary_value": number | null, "currency": "RUB" | "USD" | "EUR" | "AED" | "GBP" | "CNY" | "TRY" | "KZT" | "BYN" | "UAH" | null, "contact_phone": string | null, "contact_name": string | null, "summary": string, "confidence": number } summary всегда по-русски, confidence в диапазоне 0..1.`