package main

import (
	"bytes"
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"path/filepath"
	"strconv"
	"strings"
	"syscall"
	"time"
	"unicode/utf8"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

const (
	// verticalHR is the vertical whose lead verdict lives under
	// extracted.hr_lead.is_lead; every other vertical uses
	// extracted.lead.is_listing (see the SQL in the handlers below).
	verticalHR = "hr"
)

// config holds the runtime settings of the API process.
type config struct {
	APIHost             string
	APIPort             int
	PublicBasePath      string
	PythonBaseURL       string
	MediaDir            string
	PostgresUser        string
	PostgresPassword    string
	PostgresDB          string
	PostgresHost        string
	PostgresPort        int
	PollIntervalSeconds int
	LLMEnabled          bool
	LLMBaseURL          string
	LLMAPIKey           string
	LLMModel            string
	LLMTimeout          time.Duration
}

// app bundles the dependencies shared by all HTTP handlers.
type app struct {
	cfg    config
	db     *pgxpool.Pool
	http   *http.Client // used for LLM status probes (timeout = cfg.LLMTimeout)
	python *http.Client // long-timeout client; presumably used by proxyPython (defined elsewhere)
}

// accessScope describes what the caller of a request is allowed to do.
// Non-admin callers are restricted to rows whose department equals DeptID.
type accessScope struct {
	IsAdmin   bool
	CanManage bool
	CanAuth   bool
	DeptID    string
}

// sectionOut is the JSON representation of a section with aggregate counters.
type sectionOut struct {
	ID             int64     `json:"id"`
	Vertical       string    `json:"vertical"`
	DepartmentID   *string   `json:"department_id,omitempty"`
	Slug           string    `json:"slug"`
	Title          string    `json:"title"`
	Emoji          *string   `json:"emoji,omitempty"`
	Description    *string   `json:"description,omitempty"`
	CreatedAt      time.Time `json:"created_at"`
	ChannelsTotal  int64     `json:"channels_total"`
	ChannelsActive int64     `json:"channels_active"`
	MessagesTotal  int64     `json:"messages_total"`
	LeadsTotal     int64     `json:"leads_total"`
}

// channelOut is the JSON representation of a monitored channel.
type channelOut struct {
	ID            int64      `json:"id"`
	TGID          *int64     `json:"tg_id,omitempty"`
	Identifier    string     `json:"identifier"`
	Title         *string    `json:"title,omitempty"`
	Vertical      string     `json:"vertical"`
	SectionID     int64      `json:"section_id"`
	SectionSlug   *string    `json:"section_slug,omitempty"`
	IsActive      bool       `json:"is_active"`
	LastMessageID *int64     `json:"last_message_id,omitempty"`
	LastPolledAt  *time.Time `json:"last_polled_at,omitempty"`
	CreatedAt     time.Time  `json:"created_at"`
}

// messageOut is the JSON representation of a stored message.
type messageOut struct {
	ID                 int64            `json:"id"`
	ChannelID          int64            `json:"channel_id"`
	ChannelVertical    *string          `json:"channel_vertical,omitempty"`
	ChannelSectionSlug *string          `json:"channel_section_slug,omitempty"`
	TGMessageID        int64            `json:"tg_message_id"`
	GroupedID          *int64           `json:"grouped_id,omitempty"`
	GroupSize          int              `json:"group_size"`
	Date               time.Time        `json:"date"`
	Text               *string          `json:"text,omitempty"`
	SenderID           *int64           `json:"sender_id,omitempty"`
	SenderUsername     *string          `json:"sender_username,omitempty"`
	SenderName         *string          `json:"sender_name,omitempty"`
	PostURL            *string          `json:"post_url,omitempty"`
	HasMedia           bool             `json:"has_media"`
	MediaFiles         []map[string]any `json:"media_files,omitempty"`
	Extracted          json.RawMessage  `json:"extracted,omitempty"`
	Views              *int64           `json:"views,omitempty"`
	Forwards           *int64           `json:"forwards,omitempty"`
	FetchedAt          time.Time        `json:"fetched_at"`
}

// main wires configuration, the database pool and the HTTP server together and
// blocks until the server stops. SIGINT/SIGTERM trigger a graceful shutdown
// bounded to 10 seconds.
func main() {
	cfg := loadConfig()
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
	slog.SetDefault(logger)

	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	pool, err := pgxpool.New(ctx, cfg.databaseURL())
	if err != nil {
		slog.Error("db_connect_failed", "error", err)
		os.Exit(1)
	}
	defer pool.Close()

	srvApp := &app{
		cfg:    cfg,
		db:     pool,
		http:   &http.Client{Timeout: cfg.LLMTimeout},
		python: &http.Client{Timeout: 15 * time.Minute},
	}
	server := &http.Server{
		Addr:              fmt.Sprintf("%s:%d", cfg.APIHost, cfg.APIPort),
		Handler:           http.HandlerFunc(srvApp.serveHTTP),
		ReadHeaderTimeout: 10 * time.Second,
	}

	// ListenAndServe returns ErrServerClosed as soon as Shutdown is *called*,
	// not when it finishes, so main must wait on this channel or in-flight
	// requests are cut off when the process exits.
	shutdownDone := make(chan struct{})
	go func() {
		defer close(shutdownDone)
		<-ctx.Done()
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if err := server.Shutdown(shutdownCtx); err != nil {
			slog.Error("server_shutdown_failed", "error", err)
		}
	}()

	slog.Info("go_api_started", "addr", server.Addr, "python_base_url", cfg.PythonBaseURL)
	if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
		slog.Error("server_failed", "error", err)
		os.Exit(1)
	}
	// Only reached after ErrServerClosed, i.e. after Shutdown was invoked.
	<-shutdownDone
}

// serveHTTP is the single entry point of the API: it strips the public base
// path, answers the health/root probes and dispatches /api/v1 routes.
func (a *app) serveHTTP(w http.ResponseWriter, r *http.Request) {
	path := a.apiPath(r.URL.Path)
	if path == "/healthz" {
		writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
		return
	}
	if path == "/" {
		writeJSON(w, http.StatusOK, map[string]string{"service": "monitoring-tg", "ui": "portal", "api": "go"})
		return
	}
	if !strings.HasPrefix(path, "/api/v1") {
		writeError(w, http.StatusNotFound, "not found")
		return
	}

	ctx := r.Context()
	switch {
	case r.Method == http.MethodGet && path == "/api/v1/access/me":
		a.handleAccessMe(w, r)
	case strings.HasPrefix(path, "/api/v1/auth/"):
		// Covers /api/v1/auth/status as well as every other auth sub-route.
		a.proxyPython(w, r, path)
	case path == "/api/v1/sections":
		a.handleSections(w, r)
	case strings.HasPrefix(path, "/api/v1/sections/"):
		a.handleSectionItem(w, r, path)
	case path == "/api/v1/channels":
		a.handleChannels(w, r)
	case strings.HasPrefix(path, "/api/v1/channels/"):
		a.handleChannelItem(ctx, w, r, path)
	case strings.HasPrefix(path, "/api/v1/media/"):
		a.handleMedia(w, r, path)
	case path == "/api/v1/messages":
		a.handleMessages(ctx, w, r)
	case strings.HasPrefix(path, "/api/v1/messages/"):
		a.handleMessageItem(ctx, w, r, path)
	case path == "/api/v1/stats":
		a.handleStats(ctx, w, r)
	case path == "/api/v1/llm/status":
		a.handleLLMStatus(ctx, w)
	case path == "/api/v1/llm/queue":
		a.handleLLMQueue(ctx, w, r)
	case path == "/api/v1/llm/prompt":
		a.handlePrompt(ctx, w, r)
	case r.Method == http.MethodPost && path == "/api/v1/poll":
		a.proxyPython(w, r, path)
	default:
		writeError(w, http.StatusNotFound, "not found")
	}
}

// apiPath removes the configured public base path from a request path.
// A path equal to the base itself maps to "/"; paths outside the base are
// returned unchanged.
func (a *app) apiPath(path string) string {
	base := strings.TrimRight(a.cfg.PublicBasePath, "/")
	if base != "" && strings.HasPrefix(path, base+"/") {
		return strings.TrimPrefix(path, base)
	}
	if base != "" && path == base {
		return "/"
	}
	return path
}

// handleAccessMe reports the access scope derived from the request.
func (a *app) handleAccessMe(w http.ResponseWriter, r *http.Request) {
	scope := readAccess(r)
	writeJSON(w, http.StatusOK, map[string]any{
		"is_admin":              scope.IsAdmin,
		"can_manage_department": scope.CanManage,
		"can_auth_telegram":     scope.CanAuth,
		"department_id":         nullableString(scope.DeptID),
	})
}

// handleSections dispatches the /sections collection by HTTP method.
func (a *app) handleSections(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodGet:
		a.listSections(r.Context(), w, r)
	case http.MethodPost:
		a.createSection(r.Context(), w, r)
	default:
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
	}
}

// listSections returns all sections of the required "vertical" query
// parameter, with channel/message/lead counters. Non-admin callers only see
// sections of their own department.
func (a *app) listSections(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	args := []any{vertical}
	deptFilter := ""
	if !scope.IsAdmin {
		args = append(args, scope.DeptID)
		deptFilter = fmt.Sprintf(" AND s.department_id = $%d", len(args))
	}
	rows, err := a.db.Query(ctx, `
		SELECT s.id, s.vertical, COALESCE(s.department_id, ''), s.slug, s.title,
		       COALESCE(s.emoji, ''), COALESCE(s.description, ''), s.created_at,
		       (SELECT count(*) FROM channels c WHERE c.section_id = s.id),
		       (SELECT count(*) FROM channels c WHERE c.section_id = s.id AND c.is_active = true),
		       (SELECT count(*) FROM messages m JOIN channels c ON c.id = m.channel_id WHERE c.section_id = s.id),
		       (SELECT count(*) FROM messages m JOIN channels c ON c.id = m.channel_id
		         WHERE c.section_id = s.id AND (
		           (s.vertical = 'hr' AND m.extracted->'hr_lead'->>'is_lead' = 'true') OR
		           (s.vertical <> 'hr' AND m.extracted->'lead'->>'is_listing' = 'true')
		         ))
		FROM sections s
		WHERE s.vertical = $1`+deptFilter+`
		ORDER BY s.created_at ASC, s.id ASC
	`, args...)
	if err != nil {
		writeDBError(w, err)
		return
	}
	defer rows.Close()

	// Non-nil so that an empty result encodes as [] rather than null.
	out := []sectionOut{}
	for rows.Next() {
		item, err := scanSection(rows)
		if err != nil {
			writeDBError(w, err)
			return
		}
		out = append(out, item)
	}
	if err := rows.Err(); err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, out)
}

// createSection inserts a new section owned by the caller's department and
// responds with 201 and the created row (all counters zero).
func (a *app) createSection(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	var payload struct {
		Vertical    string  `json:"vertical"`
		Slug        string  `json:"slug"`
		Title       string  `json:"title"`
		Emoji       *string `json:"emoji"`
		Description *string `json:"description"`
	}
	if !readBody(w, r, &payload) {
		return
	}
	payload.Vertical = strings.TrimSpace(payload.Vertical)
	payload.Slug = normalizeSlug(payload.Slug)
	payload.Title = strings.TrimSpace(payload.Title)
	if payload.Vertical == "" || payload.Slug == "" || payload.Title == "" {
		writeError(w, http.StatusBadRequest, "vertical, slug and title are required")
		return
	}
	dept := nullableString(scope.DeptID)
	row := a.db.QueryRow(ctx, `
		INSERT INTO sections (vertical, department_id, slug, title, emoji, description)
		VALUES ($1, $2, $3, $4, $5, $6)
		RETURNING id, vertical, COALESCE(department_id, ''), slug, title,
		          COALESCE(emoji, ''), COALESCE(description, ''), created_at,
		          0::bigint, 0::bigint, 0::bigint, 0::bigint
	`, payload.Vertical, dept, payload.Slug, payload.Title, payload.Emoji, payload.Description)
	item, err := scanSectionRow(row)
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusCreated, item)
}

// handleSectionItem routes /sections/{vertical}/{slug} by HTTP method.
// Both path segments are URL-unescaped; the slug is everything after the
// first slash.
func (a *app) handleSectionItem(w http.ResponseWriter, r *http.Request, path string) {
	rest := strings.TrimPrefix(path, "/api/v1/sections/")
	parts := strings.SplitN(rest, "/", 2)
	if len(parts) != 2 {
		writeError(w, http.StatusNotFound, "not found")
		return
	}
	vertical, err := url.PathUnescape(parts[0])
	if err != nil {
		writeError(w, http.StatusBadRequest, "bad vertical")
		return
	}
	slug, err := url.PathUnescape(parts[1])
	if err != nil {
		writeError(w, http.StatusBadRequest, "bad slug")
		return
	}
	switch r.Method {
	case http.MethodGet:
		a.getSection(r.Context(), w, r, vertical, slug)
	case http.MethodPatch:
		a.updateSection(r.Context(), w, r, vertical, slug)
	case http.MethodDelete:
		a.deleteSection(r.Context(), w, r, vertical, slug)
	default:
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
	}
}

// getSection responds with a single section visible to the caller.
func (a *app) getSection(ctx context.Context, w http.ResponseWriter, r *http.Request, vertical, slug string) {
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	item, err := a.findSection(ctx, vertical, slug, scope)
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, item)
}

// updateSection applies a partial update (title, emoji, description). Only
// fields present in the JSON body are touched; an empty body behaves like a
// GET. A blank title is rejected, matching the rule enforced on creation.
func (a *app) updateSection(ctx context.Context, w http.ResponseWriter, r *http.Request, vertical, slug string) {
	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	var payload struct {
		Title       *string `json:"title"`
		Emoji       *string `json:"emoji"`
		Description *string `json:"description"`
	}
	if !readBody(w, r, &payload) {
		return
	}
	set := []string{}
	args := []any{}
	if payload.Title != nil {
		title := strings.TrimSpace(*payload.Title)
		if title == "" {
			writeError(w, http.StatusBadRequest, "title must not be empty")
			return
		}
		args = append(args, title)
		set = append(set, fmt.Sprintf("title = $%d", len(args)))
	}
	if payload.Emoji != nil {
		args = append(args, nullableString(strings.TrimSpace(*payload.Emoji)))
		set = append(set, fmt.Sprintf("emoji = $%d", len(args)))
	}
	if payload.Description != nil {
		args = append(args, nullableString(strings.TrimSpace(*payload.Description)))
		set = append(set, fmt.Sprintf("description = $%d", len(args)))
	}
	if len(set) == 0 {
		a.getSection(ctx, w, r, vertical, slug)
		return
	}
	args = append(args, vertical, slug)
	where := fmt.Sprintf("vertical = $%d AND slug = $%d", len(args)-1, len(args))
	if !scope.IsAdmin {
		args = append(args, scope.DeptID)
		where += fmt.Sprintf(" AND department_id = $%d", len(args))
	}
	// Only column names and placeholder numbers are concatenated into the
	// statement; all values travel as bind parameters.
	row := a.db.QueryRow(ctx, `
		UPDATE sections SET `+strings.Join(set, ", ")+`
		WHERE `+where+`
		RETURNING id, vertical, COALESCE(department_id, ''), slug, title,
		          COALESCE(emoji, ''), COALESCE(description, ''), created_at,
		          (SELECT count(*) FROM channels c WHERE c.section_id = sections.id),
		          (SELECT count(*) FROM channels c WHERE c.section_id = sections.id AND c.is_active = true),
		          (SELECT count(*) FROM messages m JOIN channels c ON c.id = m.channel_id WHERE c.section_id = sections.id),
		          0::bigint
	`, args...)
	item, err := scanSectionRow(row)
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, item)
}

// deleteSection removes a section that has no channels; otherwise it answers
// 400 with the number of channels still attached.
func (a *app) deleteSection(ctx context.Context, w http.ResponseWriter, r *http.Request, vertical, slug string) {
	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	section, err := a.findSection(ctx, vertical, slug, scope)
	if err != nil {
		writeDBError(w, err)
		return
	}
	var count int64
	if err := a.db.QueryRow(ctx, `SELECT count(*) FROM channels WHERE section_id = $1`, section.ID).Scan(&count); err != nil {
		writeDBError(w, err)
		return
	}
	if count > 0 {
		writeError(w, http.StatusBadRequest, fmt.Sprintf("section has %d channels - move or delete them first", count))
		return
	}
	if _, err := a.db.Exec(ctx, `DELETE FROM sections WHERE id = $1`, section.ID); err != nil {
		writeDBError(w, err)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}

// findSection loads one section by vertical and slug, restricted to the
// caller's department unless the caller is an admin. The leads counter of the
// returned value is always 0 (it is not computed by this query).
func (a *app) findSection(ctx context.Context, vertical, slug string, scope accessScope) (sectionOut, error) {
	args := []any{vertical, slug}
	where := "s.vertical = $1 AND s.slug = $2"
	if !scope.IsAdmin {
		args = append(args, scope.DeptID)
		where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
	}
	row := a.db.QueryRow(ctx, `
		SELECT s.id, s.vertical, COALESCE(s.department_id, ''), s.slug, s.title,
		       COALESCE(s.emoji, ''), COALESCE(s.description, ''), s.created_at,
		       (SELECT count(*) FROM channels c WHERE c.section_id = s.id),
		       (SELECT count(*) FROM channels c WHERE c.section_id = s.id AND c.is_active = true),
		       (SELECT count(*) FROM messages m JOIN channels c ON c.id = m.channel_id WHERE c.section_id = s.id),
		       0::bigint
		FROM sections s
		WHERE `+where+`
	`, args...)
	return scanSectionRow(row)
}

// handleChannels dispatches the /channels collection by HTTP method.
func (a *app) handleChannels(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodGet:
		a.listChannels(r.Context(), w, r)
	case http.MethodPost:
		a.createChannel(r.Context(), w, r)
	default:
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
	}
}

// listChannels returns the channels of the required "vertical", optionally
// narrowed by the "section" slug, newest first.
func (a *app) listChannels(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	section := strings.TrimSpace(r.URL.Query().Get("section"))
	args := []any{vertical}
	where := "c.vertical = $1"
	if section != "" {
		args = append(args, section)
		where += fmt.Sprintf(" AND s.slug = $%d", len(args))
	}
	if !scope.IsAdmin {
		args = append(args, scope.DeptID)
		where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
	}
	rows, err := a.db.Query(ctx, `
		SELECT c.id, c.tg_id, c.identifier, c.title, c.vertical, c.section_id, s.slug,
		       c.is_active, c.last_message_id, c.last_polled_at, c.created_at
		FROM channels c
		JOIN sections s ON s.id = c.section_id
		WHERE `+where+`
		ORDER BY c.created_at DESC, c.id DESC
	`, args...)
	if err != nil {
		writeDBError(w, err)
		return
	}
	defer rows.Close()

	// Non-nil so that an empty result encodes as [] rather than null.
	out := []channelOut{}
	for rows.Next() {
		item, err := scanChannel(rows)
		if err != nil {
			writeDBError(w, err)
			return
		}
		out = append(out, item)
	}
	if err := rows.Err(); err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, out)
}

// createChannel registers a new active channel inside an existing section the
// caller may access and responds with 201 and the created row.
func (a *app) createChannel(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	var payload struct {
		Identifier string `json:"identifier"`
		Vertical   string `json:"vertical"`
		Section    string `json:"section"`
	}
	if !readBody(w, r, &payload) {
		return
	}
	payload.Identifier = strings.TrimSpace(payload.Identifier)
	payload.Vertical = strings.TrimSpace(payload.Vertical)
	payload.Section = strings.TrimSpace(payload.Section)
	if payload.Identifier == "" || payload.Vertical == "" || payload.Section == "" {
		writeError(w, http.StatusBadRequest, "identifier, vertical and section are required")
		return
	}
	section, err := a.findSection(ctx, payload.Vertical, payload.Section, scope)
	if err != nil {
		writeDBError(w, err)
		return
	}
	// $4 only echoes the section slug back in the RETURNING list.
	row := a.db.QueryRow(ctx, `
		INSERT INTO channels (identifier, vertical, section_id, is_active)
		VALUES ($1, $2, $3, true)
		RETURNING id, tg_id, identifier, title, vertical, section_id, $4::text,
		          is_active, last_message_id, last_polled_at, created_at
	`, payload.Identifier, payload.Vertical, section.ID, section.Slug)
	item, err := scanChannelRow(row)
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusCreated, item)
}

// handleChannelItem routes /channels/{id} and /channels/{id}/{action}.
//
// Supported actions: poll and backfill-media (proxied to the Python service),
// reanalyze (POST only, because it rewrites stored data) and stats (GET only).
// Paths with more than two segments are rejected with 404.
func (a *app) handleChannelItem(ctx context.Context, w http.ResponseWriter, r *http.Request, path string) {
	rest := strings.TrimPrefix(path, "/api/v1/channels/")
	parts := strings.Split(rest, "/")
	if len(parts) > 2 {
		writeError(w, http.StatusNotFound, "not found")
		return
	}
	id, err := strconv.ParseInt(parts[0], 10, 64)
	if err != nil {
		writeError(w, http.StatusBadRequest, "bad channel id")
		return
	}
	if len(parts) == 2 {
		switch parts[1] {
		case "poll", "backfill-media":
			a.proxyPython(w, r, path)
		case "reanalyze":
			if r.Method != http.MethodPost {
				writeError(w, http.StatusMethodNotAllowed, "method not allowed")
				return
			}
			a.reanalyzeChannel(ctx, w, r, id)
		case "stats":
			if r.Method != http.MethodGet {
				writeError(w, http.StatusMethodNotAllowed, "method not allowed")
				return
			}
			a.channelStats(ctx, w, r, id)
		default:
			writeError(w, http.StatusNotFound, "not found")
		}
		return
	}
	switch r.Method {
	case http.MethodGet:
		a.getChannel(ctx, w, r, id)
	case http.MethodPatch:
		a.updateChannel(ctx, w, r, id)
	case http.MethodDelete:
		a.deleteChannel(ctx, w, r, id)
	default:
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
	}
}

// getChannel responds with one channel visible to the caller. The optional
// "vertical" and "section" query parameters act as additional filters.
func (a *app) getChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) {
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	channel, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section"))
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, channel)
}

// updateChannel applies a partial update (is_active, vertical, section) to a
// channel the caller may access. An empty body behaves like a GET. A blank
// vertical is rejected; a blank section is ignored.
func (a *app) updateChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) {
	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	// Access check: the channel must be visible to the caller before it is modified.
	if _, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil {
		writeDBError(w, err)
		return
	}
	var payload struct {
		IsActive *bool   `json:"is_active"`
		Vertical *string `json:"vertical"`
		Section  *string `json:"section"`
	}
	if !readBody(w, r, &payload) {
		return
	}
	set := []string{}
	args := []any{}
	if payload.IsActive != nil {
		args = append(args, *payload.IsActive)
		set = append(set, fmt.Sprintf("is_active = $%d", len(args)))
	}
	newVertical := ""
	if payload.Vertical != nil {
		newVertical = strings.TrimSpace(*payload.Vertical)
		if newVertical == "" {
			writeError(w, http.StatusBadRequest, "vertical must not be empty")
			return
		}
		args = append(args, newVertical)
		set = append(set, fmt.Sprintf("vertical = $%d", len(args)))
	}
	if payload.Section != nil && strings.TrimSpace(*payload.Section) != "" {
		// The target section is looked up in the new vertical when one is
		// supplied, otherwise in the vertical given by the query string.
		vertical := r.URL.Query().Get("vertical")
		if newVertical != "" {
			vertical = newVertical
		}
		section, err := a.findSection(ctx, vertical, strings.TrimSpace(*payload.Section), scope)
		if err != nil {
			writeDBError(w, err)
			return
		}
		args = append(args, section.ID)
		set = append(set, fmt.Sprintf("section_id = $%d", len(args)))
	}
	if len(set) == 0 {
		a.getChannel(ctx, w, r, id)
		return
	}
	args = append(args, id)
	row := a.db.QueryRow(ctx, `
		UPDATE channels SET `+strings.Join(set, ", ")+`
		WHERE id = $`+strconv.Itoa(len(args))+`
		RETURNING id, tg_id, identifier, title, vertical, section_id,
		          (SELECT slug FROM sections WHERE id = channels.section_id),
		          is_active, last_message_id, last_polled_at, created_at
	`, args...)
	item, err := scanChannelRow(row)
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, item)
}

// deleteChannel removes a channel the caller may access and answers 204.
func (a *app) deleteChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) {
	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	if _, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil {
		writeDBError(w, err)
		return
	}
	if _, err := a.db.Exec(ctx, `DELETE FROM channels WHERE id = $1`, id); err != nil {
		writeDBError(w, err)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}

// findChannel loads one channel by id. Non-blank vertical and section values
// are applied as extra filters, and non-admin callers are restricted to their
// own department.
func (a *app) findChannel(ctx context.Context, id int64, scope accessScope, vertical, section string) (channelOut, error) {
	args := []any{id}
	where := "c.id = $1"
	if strings.TrimSpace(vertical) != "" {
		args = append(args, strings.TrimSpace(vertical))
		where += fmt.Sprintf(" AND c.vertical = $%d", len(args))
	}
	if strings.TrimSpace(section) != "" {
		args = append(args, strings.TrimSpace(section))
		where += fmt.Sprintf(" AND s.slug = $%d", len(args))
	}
	if !scope.IsAdmin {
		args = append(args, scope.DeptID)
		where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
	}
	row := a.db.QueryRow(ctx, `
		SELECT c.id, c.tg_id, c.identifier, c.title, c.vertical, c.section_id, s.slug,
		       c.is_active, c.last_message_id, c.last_polled_at, c.created_at
		FROM channels c
		JOIN sections s ON s.id = c.section_id
		WHERE `+where+`
	`, args...)
	return scanChannelRow(row)
}

// channelStats responds with message and lead totals for one channel.
func (a *app) channelStats(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) {
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	ch, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section"))
	if err != nil {
		writeDBError(w, err)
		return
	}
	var messages, leads int64
	// JSON path of the lead flag inside messages.extracted depends on the vertical.
	key := "lead"
	field := "is_listing"
	if ch.Vertical == verticalHR {
		key = "hr_lead"
		field = "is_lead"
	}
	if err := a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE channel_id = $1`, id).Scan(&messages); err != nil {
		writeDBError(w, err)
		return
	}
	if err := a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE channel_id = $1 AND extracted -> $2 ->> $3 = 'true'`, id, key, field).Scan(&leads); err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, map[string]any{
		"channel_id":      id,
		"messages_total":  messages,
		"leads_total":     leads,
		"last_polled_at":  ch.LastPolledAt,
		"last_message_id": ch.LastMessageID,
	})
}

// reanalyzeChannel removes the verdict key of the channel's vertical from
// every message of the channel and reports how many rows were updated plus
// the resulting pending count for the channel's vertical and section.
func (a *app) reanalyzeChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) {
	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	ch, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section"))
	if err != nil {
		writeDBError(w, err)
		return
	}
	key := verdictKey(ch.Vertical)
	// Non-object (or NULL) extracted values are replaced by {} before the key
	// is removed.
	tag, err := a.db.Exec(ctx, `
		UPDATE messages
		SET extracted = (
			CASE WHEN jsonb_typeof(extracted) = 'object' THEN extracted ELSE '{}'::jsonb END
		) - $1
		WHERE channel_id = $2
	`, key, id)
	if err != nil {
		writeDBError(w, err)
		return
	}
	pending, err := a.pendingLLM(ctx, scope, ch.Vertical, valueOrEmpty(ch.SectionSlug))
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, map[string]int64{"updated": tag.RowsAffected(), "pending": pending})
}

// handleMessages lists messages of the required "vertical".
//
// Optional query parameters: section, channel_id, q (case-insensitive
// substring search), leads_only ("true" or "1"), limit (1..500, default 50)
// and offset (>= 0). Up to five times the limit (capped at 1000) rows are
// fetched and then reduced to limit entries by aggregateMessages.
func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	q := r.URL.Query()
	limit := clampInt(queryInt(q.Get("limit"), 50), 1, 500)
	offset := maxInt(queryInt(q.Get("offset"), 0), 0)

	args := []any{vertical}
	where := "c.vertical = $1"
	if section := strings.TrimSpace(q.Get("section")); section != "" {
		args = append(args, section)
		where += fmt.Sprintf(" AND s.slug = $%d", len(args))
	}
	if channelID := strings.TrimSpace(q.Get("channel_id")); channelID != "" {
		id, err := strconv.ParseInt(channelID, 10, 64)
		if err != nil {
			writeError(w, http.StatusBadRequest, "bad channel_id")
			return
		}
		args = append(args, id)
		where += fmt.Sprintf(" AND m.channel_id = $%d", len(args))
	}
	if text := strings.TrimSpace(q.Get("q")); text != "" {
		// Escape LIKE metacharacters so the user's text is matched literally
		// (backslash is PostgreSQL's default LIKE escape character).
		escaped := strings.NewReplacer(`\`, `\\`, `%`, `\%`, `_`, `\_`).Replace(text)
		args = append(args, "%"+escaped+"%")
		where += fmt.Sprintf(" AND m.text ILIKE $%d", len(args))
	}
	if q.Get("leads_only") == "true" || q.Get("leads_only") == "1" {
		key := "lead"
		field := "is_listing"
		if vertical == verticalHR {
			key = "hr_lead"
			field = "is_lead"
		}
		args = append(args, key, field)
		where += fmt.Sprintf(" AND m.extracted -> $%d ->> $%d = 'true'", len(args)-1, len(args))
	}
	if !scope.IsAdmin {
		args = append(args, scope.DeptID)
		where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
	}
	fetchLimit := clampInt(limit*5, limit, 1000)
	args = append(args, fetchLimit, offset)
	rows, err := a.db.Query(ctx, `
		SELECT m.id, m.channel_id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int,
		       m.date, m.text, m.sender_id, m.sender_username, m.sender_name,
		       c.identifier, c.tg_id, m.has_media,
		       COALESCE(m.extracted, 'null'::jsonb)::text,
		       COALESCE(m.media_files, '[]'::jsonb)::text,
		       m.views, m.forwards, m.fetched_at
		FROM messages m
		JOIN channels c ON c.id = m.channel_id
		JOIN sections s ON s.id = c.section_id
		WHERE `+where+`
		ORDER BY m.date DESC, m.id DESC
		LIMIT $`+strconv.Itoa(len(args)-1)+` OFFSET $`+strconv.Itoa(len(args))+`
	`, args...)
	if err != nil {
		writeDBError(w, err)
		return
	}
	defer rows.Close()

	// Non-nil so that an empty result encodes as [] rather than null.
	out := []messageOut{}
	for rows.Next() {
		item, err := scanMessage(rows)
		if err != nil {
			writeDBError(w, err)
			return
		}
		a.normalizeMessageMedia(&item)
		out = append(out, item)
	}
	if err := rows.Err(); err != nil {
		writeDBError(w, err)
		return
	}
	out = aggregateMessages(out, limit)
	writeJSON(w, http.StatusOK, out)
}

// handleMessageItem responds with a single message by id (GET only),
// restricted to the caller's department for non-admins.
func (a *app) handleMessageItem(ctx context.Context, w http.ResponseWriter, r *http.Request, path string) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	idRaw := strings.TrimPrefix(path, "/api/v1/messages/")
	id, err := strconv.ParseInt(idRaw, 10, 64)
	if err != nil {
		writeError(w, http.StatusBadRequest, "bad message id")
		return
	}
	args := []any{id}
	where := "m.id = $1"
	if !scope.IsAdmin {
		args = append(args, scope.DeptID)
		where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
	}
	row := a.db.QueryRow(ctx, `
		SELECT m.id, m.channel_id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int,
		       m.date, m.text, m.sender_id, m.sender_username, m.sender_name,
		       c.identifier, c.tg_id, m.has_media,
		       COALESCE(m.extracted, 'null'::jsonb)::text,
		       COALESCE(m.media_files, '[]'::jsonb)::text,
		       m.views, m.forwards, m.fetched_at
		FROM messages m
		JOIN channels c ON c.id = m.channel_id
		JOIN sections s ON s.id = c.section_id
		WHERE `+where+`
	`, args...)
	item, err := scanMessageRow(row)
	if err != nil {
		writeDBError(w, err)
		return
	}
	a.normalizeMessageMedia(&item)
	writeJSON(w, http.StatusOK, item)
}

// handleMedia serves a file from cfg.MediaDir (GET only). The first path
// segment must be a positive channel id the caller may read; the resolved
// path must stay inside the media directory and must be an existing regular
// entry (directories are never listed).
func (a *app) handleMedia(w http.ResponseWriter, r *http.Request, path string) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	rel, err := url.PathUnescape(strings.TrimPrefix(path, "/api/v1/media/"))
	if err != nil {
		writeError(w, http.StatusBadRequest, "bad media path")
		return
	}
	// Cleaning a rooted path collapses any ".." before the root is stripped.
	clean := strings.TrimPrefix(filepath.Clean("/"+rel), "/")
	if clean == "." || clean == "" || strings.HasPrefix(clean, "../") {
		writeError(w, http.StatusBadRequest, "bad media path")
		return
	}
	parts := strings.SplitN(clean, string(os.PathSeparator), 2)
	channelID, err := strconv.ParseInt(parts[0], 10, 64)
	if err != nil || channelID <= 0 {
		writeError(w, http.StatusBadRequest, "bad media path")
		return
	}
	allowed, err := a.canReadChannelMedia(r.Context(), scope, channelID)
	if err != nil {
		writeDBError(w, err)
		return
	}
	if !allowed {
		writeError(w, http.StatusNotFound, "not found")
		return
	}
	base, err := filepath.Abs(a.cfg.MediaDir)
	if err != nil {
		writeError(w, http.StatusInternalServerError, "media directory unavailable")
		return
	}
	full, err := filepath.Abs(filepath.Join(base, clean))
	if err != nil || (full != base && !strings.HasPrefix(full, base+string(os.PathSeparator))) {
		writeError(w, http.StatusBadRequest, "bad media path")
		return
	}
	// http.ServeFile would render a directory listing for directories, so
	// only existing non-directory entries are served.
	info, err := os.Stat(full)
	if err != nil || info.IsDir() {
		writeError(w, http.StatusNotFound, "not found")
		return
	}
	http.ServeFile(w, r, full)
}

// canReadChannelMedia reports whether the caller may read media of the given
// channel: the channel must exist, and for non-admins its section must belong
// to the caller's department.
func (a *app) canReadChannelMedia(ctx context.Context, scope accessScope, channelID int64) (bool, error) {
	var dept sql.NullString
	err := a.db.QueryRow(ctx, `
		SELECT s.department_id
		FROM channels c
		JOIN sections s ON s.id = c.section_id
		WHERE c.id = $1
	`, channelID).Scan(&dept)
	if errors.Is(err, pgx.ErrNoRows) {
		return false, nil
	}
	if err != nil {
		return false, err
	}
	if scope.IsAdmin {
		return true, nil
	}
	return dept.Valid && dept.String == scope.DeptID, nil
}

// handleStats responds with channel, message and lead totals (overall and for
// the last 24 hours by fetched_at) for the required "vertical", optionally
// narrowed by "section" (GET only).
func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	section := strings.TrimSpace(r.URL.Query().Get("section"))
	args := []any{vertical}
	where := "c.vertical = $1"
	if section != "" {
		args = append(args, section)
		where += fmt.Sprintf(" AND s.slug = $%d", len(args))
	}
	if !scope.IsAdmin {
		args = append(args, scope.DeptID)
		where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
	}

	var channelsTotal, channelsActive, messagesTotal, messages24h, leadsTotal, leads24h int64
	var lastPoll sql.NullTime

	countQuery := `FROM channels c JOIN sections s ON s.id = c.section_id WHERE ` + where
	if err := a.db.QueryRow(ctx, `SELECT count(*) `+countQuery, args...).Scan(&channelsTotal); err != nil {
		writeDBError(w, err)
		return
	}
	if err := a.db.QueryRow(ctx, `SELECT count(*) `+countQuery+` AND c.is_active = true`, args...).Scan(&channelsActive); err != nil {
		writeDBError(w, err)
		return
	}
	if err := a.db.QueryRow(ctx, `SELECT max(c.last_polled_at) `+countQuery, args...).Scan(&lastPoll); err != nil {
		writeDBError(w, err)
		return
	}

	messageFrom := `FROM messages m JOIN channels c ON c.id = m.channel_id JOIN sections s ON s.id = c.section_id WHERE ` + where
	if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom, args...).Scan(&messagesTotal); err != nil {
		writeDBError(w, err)
		return
	}
	if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom+` AND m.fetched_at >= now() - interval '24 hours'`, args...).Scan(&messages24h); err != nil {
		writeDBError(w, err)
		return
	}

	key := "lead"
	field := "is_listing"
	if vertical == verticalHR {
		key = "hr_lead"
		field = "is_lead"
	}
	// Copy args before appending so the shared slice is never aliased.
	leadArgs := append(append([]any{}, args...), key, field)
	leadClause := fmt.Sprintf(" AND m.extracted -> $%d ->> $%d = 'true'", len(leadArgs)-1, len(leadArgs))
	if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom+leadClause, leadArgs...).Scan(&leadsTotal); err != nil {
		writeDBError(w, err)
		return
	}
	if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom+leadClause+` AND m.fetched_at >= now() - interval '24 hours'`, leadArgs...).Scan(&leads24h); err != nil {
		writeDBError(w, err)
		return
	}

	writeJSON(w, http.StatusOK, map[string]any{
		"vertical":              vertical,
		"section_slug":          nullableString(section),
		"channels_total":        channelsTotal,
		"channels_active":       channelsActive,
		"messages_total":        messagesTotal,
		"messages_last_24h":     messages24h,
		"leads_total":           leadsTotal,
		"leads_last_24h":        leads24h,
		"poll_interval_seconds": a.cfg.PollIntervalSeconds,
		"last_poll_at":          nullTime(lastPoll),
	})
}

// handleLLMStatus reports the LLM configuration and whether the backend
// answers GET {LLMBaseURL}/v1/models with a 2xx status. Probe failures are
// reported as ready=false rather than as an error.
func (a *app) handleLLMStatus(ctx context.Context, w http.ResponseWriter) {
	ready := false
	if a.cfg.LLMEnabled {
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, strings.TrimRight(a.cfg.LLMBaseURL, "/")+"/v1/models", nil)
		if err == nil {
			if a.cfg.LLMAPIKey != "" {
				req.Header.Set("Authorization", "Bearer "+a.cfg.LLMAPIKey)
			}
			resp, err := a.http.Do(req)
			if err == nil {
				ready = resp.StatusCode >= 200 && resp.StatusCode < 300
				// Only the status code matters; a close error is irrelevant here.
				_ = resp.Body.Close()
			}
		}
	}
	writeJSON(w, http.StatusOK, map[string]any{
		"enabled":  a.cfg.LLMEnabled,
		"ready":    ready,
		"base_url": a.cfg.LLMBaseURL,
		"model":    a.cfg.LLMModel,
	})
}

// handleLLMQueue responds with the number of messages still awaiting an LLM
// verdict for the required "vertical" and optional "section" (GET only).
func (a *app) handleLLMQueue(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	pending, err := a.pendingLLM(ctx, scope, vertical, strings.TrimSpace(r.URL.Query().Get("section")))
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, map[string]int64{"pending": pending})
}

// pendingLLM counts messages with text whose extracted JSON lacks the verdict
// key of their vertical (hr_lead for "hr", lead otherwise), filtered by
// vertical, optional section and, for non-admins, department.
func (a *app) pendingLLM(ctx context.Context, scope accessScope, vertical, section string) (int64, error) {
	args := []any{vertical}
	where := `c.vertical = $1 AND m.text IS NOT NULL AND ((c.vertical = 'hr' AND (m.extracted IS NULL OR m.extracted->'hr_lead' IS NULL)) OR (c.vertical <> 'hr' AND (m.extracted IS NULL OR m.extracted->'lead' IS NULL)))`
	if section != "" {
		args = append(args, section)
		where += fmt.Sprintf(" AND s.slug = $%d", len(args))
	}
	if !scope.IsAdmin {
		args = append(args, scope.DeptID)
		where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
	}
	var pending int64
	err := a.db.QueryRow(ctx, `
		SELECT count(*)
		FROM messages m
		JOIN channels c ON c.id = m.channel_id
		JOIN sections s ON s.id = c.section_id
		WHERE `+where, args...).Scan(&pending)
	return pending, err
}

// handlePrompt dispatches /llm/prompt by HTTP method (GET, PUT, DELETE).
func (a *app) handlePrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodGet:
		a.getPrompt(ctx, w, r)
	case http.MethodPut:
		a.savePrompt(ctx, w, r)
	case http.MethodDelete:
		a.resetPrompt(ctx, w, r)
	default:
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
	}
}

// getPrompt responds with the effective prompt for the caller's department,
// the required "vertical" and optional "section", together with the default
// prompt, the source of the effective one and whether an override exists at
// exactly this level.
func (a *app) getPrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	scope, ok := a.readScope(w, r, false)
	if !ok {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	section := strings.TrimSpace(r.URL.Query().Get("section"))
	if section != "" {
		// The section must exist and be visible to the caller.
		if _, err := a.findSection(ctx, vertical, section, scope); err != nil {
			writeDBError(w, err)
			return
		}
	}
	prompt, source, err := a.resolvePrompt(ctx, scope.DeptID, vertical, section)
	if err != nil {
		writeDBError(w, err)
		return
	}
	overridden, err := a.promptExists(ctx, scope.DeptID, vertical, section)
	if err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, map[string]any{
		"vertical":           vertical,
		"department_id":      nullableString(scope.DeptID),
		"section":            nullableString(section),
		"prompt":             prompt,
		"default":            defaultPrompt(vertical),
		"source":             source,
		"is_overridden_here": overridden,
	})
}

// savePrompt stores (upserts) a prompt override for the caller's department,
// the required "vertical" and optional "section". The prompt must be
// non-blank and at most 30000 characters (Unicode code points); a given
// section must exist and be visible to the caller.
func (a *app) savePrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	section := strings.TrimSpace(r.URL.Query().Get("section"))
	if section != "" {
		// Same visibility rule as getPrompt: no overrides for unknown or
		// foreign sections.
		if _, err := a.findSection(ctx, vertical, section, scope); err != nil {
			writeDBError(w, err)
			return
		}
	}
	var payload struct {
		Prompt string `json:"prompt"`
	}
	if !readBody(w, r, &payload) {
		return
	}
	text := strings.TrimSpace(payload.Prompt)
	if text == "" {
		writeError(w, http.StatusBadRequest, "prompt must be a non-empty string")
		return
	}
	// Count characters, not bytes: the limit is documented in chars and
	// multi-byte text would otherwise be rejected far below it.
	length := utf8.RuneCountInString(text)
	if length > 30000 {
		writeError(w, http.StatusBadRequest, "prompt is too long (max 30000 chars)")
		return
	}
	key := promptKey(scope.DeptID, vertical, section)
	value, err := json.Marshal(text)
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to encode prompt")
		return
	}
	if _, err := a.db.Exec(ctx, `
		INSERT INTO app_settings (key, value, updated_at)
		VALUES ($1, $2::jsonb, now())
		ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = now()
	`, key, string(value)); err != nil {
		writeDBError(w, err)
		return
	}
	writeJSON(w, http.StatusOK, map[string]any{
		"saved":         true,
		"vertical":      vertical,
		"department_id": nullableString(scope.DeptID),
		"section":       nullableString(section),
		"length":        length,
	})
}

// resetPrompt deletes the prompt override stored for the caller's department,
// the required "vertical" and optional "section", and answers 204 whether or
// not an override existed.
func (a *app) resetPrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	scope, ok := a.readScope(w, r, true)
	if !ok {
		return
	}
	vertical := queryRequired(w, r, "vertical")
	if vertical == "" {
		return
	}
	section := strings.TrimSpace(r.URL.Query().Get("section"))
	if _, err := a.db.Exec(ctx, `DELETE FROM app_settings WHERE key = $1`, promptKey(scope.DeptID, vertical, section)); err != nil {
		writeDBError(w, err)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}

// NOTE(review): resolvePrompt continues beyond the reviewed chunk; the visible
// part below is reproduced unchanged (only re-indented).
func (a *app) resolvePrompt(ctx context.Context, deptID, vertical, section string) (string, string, error) {
	keys := []struct {
		key    string
		source string
	}{}
	if section != "" {
		keys = append(keys, struct {
			key    string
			source string
		}{promptKey(deptID, vertical, section), "section"})
	}
	keys = append(keys, struct {
		key    string
		source string
	}{promptKey(deptID, vertical, ""), "vertical"})
	for _, candidate := range keys {
		var text string
		err := a.db.QueryRow(ctx, `SELECT value #>> '{}' FROM 
app_settings WHERE key = $1`, candidate.key).Scan(&text) if err == nil && strings.TrimSpace(text) != "" { return text, candidate.source, nil } if err != nil && !errors.Is(err, pgx.ErrNoRows) { return "", "", err } } return defaultPrompt(vertical), "default", nil } func (a *app) promptExists(ctx context.Context, deptID, vertical, section string) (bool, error) { var exists bool err := a.db.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM app_settings WHERE key = $1)`, promptKey(deptID, vertical, section)).Scan(&exists) return exists, err } func (a *app) proxyPython(w http.ResponseWriter, r *http.Request, path string) { scope := readAccess(r) if strings.Contains(path, "/auth/") && !scope.CanAuth { writeError(w, http.StatusNotFound, "not found") return } if (strings.Contains(path, "/poll") || strings.Contains(path, "/backfill-media")) && !scope.CanManage { writeError(w, http.StatusNotFound, "not found") return } target := strings.TrimRight(a.cfg.PythonBaseURL, "/") + path if r.URL.RawQuery != "" { target += "?" 
+ r.URL.RawQuery } body, err := io.ReadAll(r.Body) if err != nil { writeError(w, http.StatusBadRequest, "bad body") return } req, err := http.NewRequestWithContext(r.Context(), r.Method, target, bytes.NewReader(body)) if err != nil { writeError(w, http.StatusBadGateway, err.Error()) return } req.Header = r.Header.Clone() resp, err := a.python.Do(req) if err != nil { writeError(w, http.StatusBadGateway, err.Error()) return } defer resp.Body.Close() for k, values := range resp.Header { for _, v := range values { w.Header().Add(k, v) } } w.WriteHeader(resp.StatusCode) _, _ = io.Copy(w, resp.Body) } func (a *app) readScope(w http.ResponseWriter, r *http.Request, manage bool) (accessScope, bool) { scope := readAccess(r) if manage { if !scope.CanManage { writeError(w, http.StatusNotFound, "not found") return scope, false } } else if !scope.IsAdmin && scope.DeptID == "" { writeError(w, http.StatusForbidden, "department is required") return scope, false } if manage && !scope.IsAdmin && scope.DeptID == "" { writeError(w, http.StatusForbidden, "department is required") return scope, false } return scope, true } func readAccess(r *http.Request) accessScope { admin := r.Header.Get("X-User-Is-Admin") == "1" deptHead := r.Header.Get("X-User-Is-Department-Head") == "1" canManage := r.Header.Get("X-Monitoring-TG-Can-Manage") == "1" canAuth := r.Header.Get("X-Monitoring-TG-Can-Auth") == "1" return accessScope{ IsAdmin: admin, CanManage: admin || deptHead || canManage, CanAuth: admin || canAuth, DeptID: strings.TrimSpace(r.Header.Get("X-User-Department-Id")), } } type rowScanner interface { Scan(dest ...any) error } func scanSection(rows rowScanner) (sectionOut, error) { var item sectionOut var dept, emoji, description string err := rows.Scan( &item.ID, &item.Vertical, &dept, &item.Slug, &item.Title, &emoji, &description, &item.CreatedAt, &item.ChannelsTotal, &item.ChannelsActive, &item.MessagesTotal, &item.LeadsTotal, ) item.DepartmentID = nullableString(dept) item.Emoji = 
nullableString(emoji) item.Description = nullableString(description) return item, err } func scanSectionRow(row pgx.Row) (sectionOut, error) { return scanSection(row) } func scanChannel(rows rowScanner) (channelOut, error) { var item channelOut var tgID, lastMsg sql.NullInt64 var title, slug sql.NullString var lastPoll sql.NullTime err := rows.Scan( &item.ID, &tgID, &item.Identifier, &title, &item.Vertical, &item.SectionID, &slug, &item.IsActive, &lastMsg, &lastPoll, &item.CreatedAt, ) item.TGID = nullInt(tgID) item.Title = nullString(title) item.SectionSlug = nullString(slug) item.LastMessageID = nullInt(lastMsg) item.LastPolledAt = nullTimePtr(lastPoll) return item, err } func scanChannelRow(row pgx.Row) (channelOut, error) { return scanChannel(row) } func scanMessage(rows rowScanner) (messageOut, error) { var item messageOut var vertical, slug string var grouped, senderID, tgID, views, forwards sql.NullInt64 var text, senderUsername, senderName, identifier sql.NullString var extractedText, mediaText string err := rows.Scan( &item.ID, &item.ChannelID, &vertical, &slug, &item.TGMessageID, &grouped, &item.GroupSize, &item.Date, &text, &senderID, &senderUsername, &senderName, &identifier, &tgID, &item.HasMedia, &extractedText, &mediaText, &views, &forwards, &item.FetchedAt, ) item.ChannelVertical = nullableString(vertical) item.ChannelSectionSlug = nullableString(slug) item.GroupedID = nullInt(grouped) item.Text = nullString(text) item.SenderID = nullInt(senderID) item.SenderUsername = nullString(senderUsername) item.SenderName = nullString(senderName) item.Views = nullInt(views) item.Forwards = nullInt(forwards) if extractedText != "" && extractedText != "null" { item.Extracted = json.RawMessage(extractedText) } if mediaText != "" && mediaText != "null" { _ = json.Unmarshal([]byte(mediaText), &item.MediaFiles) } item.PostURL = buildPostURL(identifier.String, tgID, item.TGMessageID) return item, err } func scanMessageRow(row pgx.Row) (messageOut, error) { return 
scanMessage(row) } func (a *app) normalizeMessageMedia(item *messageOut) { if len(item.MediaFiles) == 0 { return } for _, file := range item.MediaFiles { rawURL, _ := file["url"].(string) rel := mediaRelativePath(rawURL) if rel == "" { continue } file["url"] = a.mediaURL(rel) if _, ok := file["name"].(string); !ok { file["name"] = filepath.Base(rel) } } } func mediaRelativePath(raw string) string { raw = strings.TrimSpace(raw) if raw == "" { return "" } if idx := strings.Index(raw, "/media/"); idx >= 0 { raw = raw[idx+len("/media/"):] } raw = strings.TrimPrefix(raw, "/") raw = strings.TrimPrefix(raw, "media/") clean := strings.TrimPrefix(filepath.Clean("/"+raw), "/") if clean == "." || clean == "" || strings.HasPrefix(clean, "../") { return "" } return clean } func (a *app) mediaURL(rel string) string { base := strings.TrimRight(a.cfg.PublicBasePath, "/") return base + "/api/v1/media/" + strings.TrimPrefix(rel, "/") } func aggregateMessages(items []messageOut, limit int) []messageOut { out := make([]messageOut, 0, len(items)) index := map[string]int{} for _, item := range items { key := messageGroupKey(item) if pos, ok := index[key]; ok { group := &out[pos] group.GroupSize++ group.HasMedia = group.HasMedia || item.HasMedia group.MediaFiles = append(group.MediaFiles, item.MediaFiles...) 
if group.Text == nil && item.Text != nil { group.Text = item.Text } if len(group.Extracted) == 0 && len(item.Extracted) > 0 { group.Extracted = item.Extracted } continue } if item.GroupSize < 1 { item.GroupSize = 1 } index[key] = len(out) out = append(out, item) } if len(out) > limit { return out[:limit] } return out } func messageGroupKey(item messageOut) string { if item.GroupedID != nil { return fmt.Sprintf("channel:%d:group:%d", item.ChannelID, *item.GroupedID) } return fmt.Sprintf("message:%d", item.ID) } func buildPostURL(identifier string, tgID sql.NullInt64, msgID int64) *string { identifier = strings.TrimSpace(identifier) if identifier == "" || msgID == 0 { return nil } if strings.HasPrefix(identifier, "https://t.me/") { base := strings.TrimRight(identifier, "/") v := base + "/" + strconv.FormatInt(msgID, 10) return &v } if strings.HasPrefix(identifier, "@") { v := "https://t.me/" + strings.TrimPrefix(identifier, "@") + "/" + strconv.FormatInt(msgID, 10) return &v } if !strings.Contains(identifier, "/") && !strings.Contains(identifier, "+") { v := "https://t.me/" + identifier + "/" + strconv.FormatInt(msgID, 10) return &v } return nil } func readBody(w http.ResponseWriter, r *http.Request, out any) bool { defer r.Body.Close() if err := json.NewDecoder(r.Body).Decode(out); err != nil { writeError(w, http.StatusBadRequest, "invalid json") return false } return true } func writeJSON(w http.ResponseWriter, status int, payload any) { w.Header().Set("Content-Type", "application/json; charset=utf-8") w.WriteHeader(status) _ = json.NewEncoder(w).Encode(payload) } func writeError(w http.ResponseWriter, status int, detail string) { writeJSON(w, status, map[string]string{"detail": detail}) } func writeDBError(w http.ResponseWriter, err error) { if errors.Is(err, pgx.ErrNoRows) { writeError(w, http.StatusNotFound, "not found") return } slog.Error("api_db_error", "error", err) writeError(w, http.StatusInternalServerError, "database error") } func queryRequired(w 
http.ResponseWriter, r *http.Request, name string) string { value := strings.TrimSpace(r.URL.Query().Get(name)) if value == "" { writeError(w, http.StatusBadRequest, name+" is required") } return value } func promptKey(deptID, vertical, section string) string { if deptID == "" { deptID = "global" } if section != "" { return fmt.Sprintf("llm_system_prompt:%s:%s:%s", deptID, vertical, section) } return fmt.Sprintf("llm_system_prompt:%s:%s", deptID, vertical) } func verdictKey(vertical string) string { if vertical == verticalHR { return "hr_lead" } return "lead" } func defaultPrompt(vertical string) string { if vertical == verticalHR { return defaultHRPrompt } return defaultREPrompt } func normalizeSlug(raw string) string { raw = strings.ToLower(strings.TrimSpace(raw)) raw = strings.ReplaceAll(raw, " ", "-") var b strings.Builder for _, r := range raw { if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '_' || r == '-' { b.WriteRune(r) } } return b.String() } func nullableString(v string) *string { v = strings.TrimSpace(v) if v == "" { return nil } return &v } func nullString(v sql.NullString) *string { if !v.Valid || strings.TrimSpace(v.String) == "" { return nil } return &v.String } func nullInt(v sql.NullInt64) *int64 { if !v.Valid { return nil } return &v.Int64 } func nullTime(v sql.NullTime) any { if !v.Valid { return nil } return v.Time } func nullTimePtr(v sql.NullTime) *time.Time { if !v.Valid { return nil } return &v.Time } func valueOrEmpty(v *string) string { if v == nil { return "" } return *v } func queryInt(raw string, fallback int) int { if raw == "" { return fallback } n, err := strconv.Atoi(raw) if err != nil { return fallback } return n } func clampInt(n, min, max int) int { if n < min { return min } if n > max { return max } return n } func maxInt(n, min int) int { if n < min { return min } return n } func loadConfig() config { return config{ APIHost: env("API_HOST", "0.0.0.0"), APIPort: envInt("API_PORT", 8000), PublicBasePath: 
env("PUBLIC_BASE_PATH", ""), PythonBaseURL: env("PYTHON_BASE_URL", "http://127.0.0.1:8001"), MediaDir: env("MEDIA_DIR", "/data/media"), PostgresUser: env("POSTGRES_USER", "parser"), PostgresPassword: env("POSTGRES_PASSWORD", "parser"), PostgresDB: env("POSTGRES_DB", "parser"), PostgresHost: env("POSTGRES_HOST", "db"), PostgresPort: envInt("POSTGRES_PORT", 5432), PollIntervalSeconds: envInt("POLL_INTERVAL_SECONDS", 60), LLMEnabled: envBool("LLM_ENABLED", true), LLMBaseURL: env("LLM_BASE_URL", "http://10.2.3.5:8002"), LLMAPIKey: env("LLM_API_KEY", ""), LLMModel: env("LLM_MODEL", "qwen2.5-14b"), LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second, } } func (c config) databaseURL() string { return fmt.Sprintf( "postgres://%s:%s@%s:%d/%s", url.QueryEscape(c.PostgresUser), url.QueryEscape(c.PostgresPassword), c.PostgresHost, c.PostgresPort, url.QueryEscape(c.PostgresDB), ) } func env(key, fallback string) string { if v := strings.TrimSpace(os.Getenv(key)); v != "" { return v } return fallback } func envInt(key string, fallback int) int { if raw := strings.TrimSpace(os.Getenv(key)); raw != "" { if n, err := strconv.Atoi(raw); err == nil { return n } } return fallback } func envBool(key string, fallback bool) bool { if raw := strings.TrimSpace(os.Getenv(key)); raw != "" { if b, err := strconv.ParseBool(raw); err == nil { return b } if raw == "1" { return true } if raw == "0" { return false } } return fallback } const defaultREPrompt = `Ты — аналитик объявлений о недвижимости. Тебе дают текст из Telegram-канала. Определи, является ли сообщение реальным объявлением о покупке, продаже или аренде недвижимости. 
Отвечай строго валидным JSON без markdown: { "is_listing": boolean, "kind": "sale" | "rent" | "purchase" | null, "property_type": string | null, "rooms": string | null, "area_m2": number | null, "price_text": string | null, "price_value": number | null, "currency": "RUB" | "USD" | "EUR" | "AED" | "GBP" | "CNY" | "TRY" | "KZT" | "BYN" | "UAH" | null, "location": string | null, "contact_phone": string | null, "contact_name": string | null, "summary": string, "confidence": number } summary всегда по-русски, confidence в диапазоне 0..1.` const defaultHRPrompt = `Ты — аналитик HR-объявлений. Тебе дают текст из Telegram-канала. Определи, относится ли сообщение к рынку труда: вакансия, резюме или короткий HR-контакт. Отвечай строго валидным JSON без markdown: { "is_lead": boolean, "kind": "vacancy" | "resume" | "contact" | null, "title": string | null, "company": string | null, "candidate_name": string | null, "experience_years": number | null, "skills": string[], "location": string | null, "remote": boolean | null, "employment_type": "full-time" | "part-time" | "contract" | "internship" | null, "salary_text": string | null, "salary_value": number | null, "currency": "RUB" | "USD" | "EUR" | "AED" | "GBP" | "CNY" | "TRY" | "KZT" | "BYN" | "UAH" | null, "contact_phone": string | null, "contact_name": string | null, "summary": string, "confidence": number } summary всегда по-русски, confidence в диапазоне 0..1.`