package main import ( "bytes" "context" "database/sql" "encoding/json" "errors" "fmt" "io" "log/slog" "net/http" "net/url" "os" "os/signal" "strconv" "strings" "syscall" "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" ) const ( verticalHR = "hr" ) type config struct { APIHost string APIPort int PublicBasePath string PythonBaseURL string PostgresUser string PostgresPassword string PostgresDB string PostgresHost string PostgresPort int PollIntervalSeconds int LLMEnabled bool LLMBaseURL string LLMAPIKey string LLMModel string LLMTimeout time.Duration } type app struct { cfg config db *pgxpool.Pool http *http.Client python *http.Client } type accessScope struct { IsAdmin bool CanManage bool DeptID string } type sectionOut struct { ID int64 `json:"id"` Vertical string `json:"vertical"` DepartmentID *string `json:"department_id,omitempty"` Slug string `json:"slug"` Title string `json:"title"` Emoji *string `json:"emoji,omitempty"` Description *string `json:"description,omitempty"` CreatedAt time.Time `json:"created_at"` ChannelsTotal int64 `json:"channels_total"` ChannelsActive int64 `json:"channels_active"` MessagesTotal int64 `json:"messages_total"` LeadsTotal int64 `json:"leads_total"` } type channelOut struct { ID int64 `json:"id"` TGID *int64 `json:"tg_id,omitempty"` Identifier string `json:"identifier"` Title *string `json:"title,omitempty"` Vertical string `json:"vertical"` SectionID int64 `json:"section_id"` SectionSlug *string `json:"section_slug,omitempty"` IsActive bool `json:"is_active"` LastMessageID *int64 `json:"last_message_id,omitempty"` LastPolledAt *time.Time `json:"last_polled_at,omitempty"` CreatedAt time.Time `json:"created_at"` } type messageOut struct { ID int64 `json:"id"` ChannelID int64 `json:"channel_id"` ChannelVertical *string `json:"channel_vertical,omitempty"` ChannelSectionSlug *string `json:"channel_section_slug,omitempty"` TGMessageID int64 `json:"tg_message_id"` GroupedID *int64 `json:"grouped_id,omitempty"` GroupSize int `json:"group_size"` Date time.Time `json:"date"` Text *string `json:"text,omitempty"` SenderID *int64 `json:"sender_id,omitempty"` SenderUsername *string `json:"sender_username,omitempty"` SenderName *string `json:"sender_name,omitempty"` PostURL *string `json:"post_url,omitempty"` HasMedia bool `json:"has_media"` Extracted json.RawMessage `json:"extracted,omitempty"` Views *int64 `json:"views,omitempty"` Forwards *int64 `json:"forwards,omitempty"` FetchedAt time.Time `json:"fetched_at"` } func main() { cfg := loadConfig() logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) slog.SetDefault(logger) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() pool, err := pgxpool.New(ctx, cfg.databaseURL()) if err != nil { slog.Error("db_connect_failed", "error", err) os.Exit(1) } defer pool.Close() srvApp := &app{ cfg: cfg, db: pool, http: &http.Client{Timeout: cfg.LLMTimeout}, python: &http.Client{Timeout: 15 * time.Minute}, } server := &http.Server{ Addr: fmt.Sprintf("%s:%d", cfg.APIHost, cfg.APIPort), Handler: http.HandlerFunc(srvApp.serveHTTP), ReadHeaderTimeout: 10 * time.Second, } go func() { <-ctx.Done() shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() _ = server.Shutdown(shutdownCtx) }() slog.Info("go_api_started", "addr", server.Addr, "python_base_url", cfg.PythonBaseURL) if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { slog.Error("server_failed", "error", err) os.Exit(1) } } func (a *app) serveHTTP(w http.ResponseWriter, r *http.Request) { path := a.apiPath(r.URL.Path) if path == "/healthz" { writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) return } if path == "/" { writeJSON(w, http.StatusOK, map[string]string{"service": "monitoring-tg", "ui": "portal", "api": "go"}) return } if !strings.HasPrefix(path, "/api/v1") { writeError(w, http.StatusNotFound, "not found") return } ctx := r.Context() switch { case r.Method == http.MethodGet && path == "/api/v1/access/me": a.handleAccessMe(w, r) case path == "/api/v1/auth/status" || strings.HasPrefix(path, "/api/v1/auth/"): a.proxyPython(w, r, path) case path == "/api/v1/sections": a.handleSections(w, r) case strings.HasPrefix(path, "/api/v1/sections/"): a.handleSectionItem(w, r, path) case path == "/api/v1/channels": a.handleChannels(w, r) case strings.HasPrefix(path, "/api/v1/channels/"): a.handleChannelItem(ctx, w, r, path) case path == "/api/v1/messages": a.handleMessages(ctx, w, r) case strings.HasPrefix(path, "/api/v1/messages/"): a.handleMessageItem(ctx, w, r, path) case path == "/api/v1/stats": a.handleStats(ctx, w, r) case path == "/api/v1/llm/status": a.handleLLMStatus(ctx, w) case path == "/api/v1/llm/queue": a.handleLLMQueue(ctx, w, r) case path == "/api/v1/llm/prompt": a.handlePrompt(ctx, w, r) case r.Method == http.MethodPost && path == "/api/v1/poll": a.proxyPython(w, r, path) default: writeError(w, http.StatusNotFound, "not found") } } func (a *app) apiPath(path string) string { base := strings.TrimRight(a.cfg.PublicBasePath, "/") if base != "" && strings.HasPrefix(path, base+"/") { return strings.TrimPrefix(path, base) } if base != "" && path == base { return "/" } return path } func (a *app) handleAccessMe(w http.ResponseWriter, r *http.Request) { scope := readAccess(r) writeJSON(w, http.StatusOK, map[string]any{ "is_admin": scope.IsAdmin, "can_manage_department": scope.CanManage, "department_id": nullableString(scope.DeptID), }) } func (a *app) handleSections(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: a.listSections(r.Context(), w, r) case http.MethodPost: a.createSection(r.Context(), w, r) default: writeError(w, http.StatusMethodNotAllowed, "method not allowed") } } func (a *app) listSections(ctx context.Context, w http.ResponseWriter, r *http.Request) { scope, ok := a.readScope(w, r, false) if !ok { return } vertical := queryRequired(w, r, "vertical") if vertical == "" { return } args := []any{vertical} deptFilter := "" if !scope.IsAdmin { args = append(args, scope.DeptID) deptFilter = fmt.Sprintf(" AND s.department_id = $%d", len(args)) } rows, err := a.db.Query(ctx, ` SELECT s.id, s.vertical, COALESCE(s.department_id, ''), s.slug, s.title, COALESCE(s.emoji, ''), COALESCE(s.description, ''), s.created_at, (SELECT count(*) FROM channels c WHERE c.section_id = s.id), (SELECT count(*) FROM channels c WHERE c.section_id = s.id AND c.is_active = true), (SELECT count(*) FROM messages m JOIN channels c ON c.id = m.channel_id WHERE c.section_id = s.id), (SELECT count(*) FROM messages m JOIN channels c ON c.id = m.channel_id WHERE c.section_id = s.id AND ( (s.vertical = 'hr' AND m.extracted->'hr_lead'->>'is_lead' = 'true') OR (s.vertical <> 'hr' AND m.extracted->'lead'->>'is_listing' = 'true') )) FROM sections s WHERE s.vertical = $1`+deptFilter+` ORDER BY s.created_at ASC, s.id ASC `, args...) if err != nil { writeDBError(w, err) return } defer rows.Close() out := []sectionOut{} for rows.Next() { item, err := scanSection(rows) if err != nil { writeDBError(w, err) return } out = append(out, item) } if err := rows.Err(); err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, out) } func (a *app) createSection(ctx context.Context, w http.ResponseWriter, r *http.Request) { scope, ok := a.readScope(w, r, true) if !ok { return } var payload struct { Vertical string `json:"vertical"` Slug string `json:"slug"` Title string `json:"title"` Emoji *string `json:"emoji"` Description *string `json:"description"` } if !readBody(w, r, &payload) { return } payload.Vertical = strings.TrimSpace(payload.Vertical) payload.Slug = normalizeSlug(payload.Slug) payload.Title = strings.TrimSpace(payload.Title) if payload.Vertical == "" || payload.Slug == "" || payload.Title == "" { writeError(w, http.StatusBadRequest, "vertical, slug and title are required") return } dept := nullableString(scope.DeptID) row := a.db.QueryRow(ctx, ` INSERT INTO sections (vertical, department_id, slug, title, emoji, description) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id, vertical, COALESCE(department_id, ''), slug, title, COALESCE(emoji, ''), COALESCE(description, ''), created_at, 0::bigint, 0::bigint, 0::bigint, 0::bigint `, payload.Vertical, dept, payload.Slug, payload.Title, payload.Emoji, payload.Description) item, err := scanSectionRow(row) if err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusCreated, item) } func (a *app) handleSectionItem(w http.ResponseWriter, r *http.Request, path string) { rest := strings.TrimPrefix(path, "/api/v1/sections/") parts := strings.SplitN(rest, "/", 2) if len(parts) != 2 { writeError(w, http.StatusNotFound, "not found") return } vertical, err := url.PathUnescape(parts[0]) if err != nil { writeError(w, http.StatusBadRequest, "bad vertical") return } slug, err := url.PathUnescape(parts[1]) if err != nil { writeError(w, http.StatusBadRequest, "bad slug") return } switch r.Method { case http.MethodGet: a.getSection(r.Context(), w, r, vertical, slug) case http.MethodPatch: a.updateSection(r.Context(), w, r, vertical, slug) case http.MethodDelete: a.deleteSection(r.Context(), w, r, vertical, slug) default: writeError(w, http.StatusMethodNotAllowed, "method not allowed") } } func (a *app) getSection(ctx context.Context, w http.ResponseWriter, r *http.Request, vertical, slug string) { scope, ok := a.readScope(w, r, false) if !ok { return } item, err := a.findSection(ctx, vertical, slug, scope) if err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, item) } func (a *app) updateSection(ctx context.Context, w http.ResponseWriter, r *http.Request, vertical, slug string) { scope, ok := a.readScope(w, r, true) if !ok { return } var payload struct { Title *string `json:"title"` Emoji *string `json:"emoji"` Description *string `json:"description"` } if !readBody(w, r, &payload) { return } set := []string{} args := []any{} if payload.Title != nil { args = append(args, strings.TrimSpace(*payload.Title)) set = append(set, fmt.Sprintf("title = $%d", len(args))) } if payload.Emoji != nil { args = append(args, nullableString(strings.TrimSpace(*payload.Emoji))) set = append(set, fmt.Sprintf("emoji = $%d", len(args))) } if payload.Description != nil { args = append(args, nullableString(strings.TrimSpace(*payload.Description))) set = append(set, fmt.Sprintf("description = $%d", len(args))) } if len(set) == 0 { a.getSection(ctx, w, r, vertical, slug) return } args = append(args, vertical, slug) where := fmt.Sprintf("vertical = $%d AND slug = $%d", len(args)-1, len(args)) if !scope.IsAdmin { args = append(args, scope.DeptID) where += fmt.Sprintf(" AND department_id = $%d", len(args)) } row := a.db.QueryRow(ctx, ` UPDATE sections SET `+strings.Join(set, ", ")+` WHERE `+where+` RETURNING id, vertical, COALESCE(department_id, ''), slug, title, COALESCE(emoji, ''), COALESCE(description, ''), created_at, (SELECT count(*) FROM channels c WHERE c.section_id = sections.id), (SELECT count(*) FROM channels c WHERE c.section_id = sections.id AND c.is_active = true), (SELECT count(*) FROM messages m JOIN channels c ON c.id = m.channel_id WHERE c.section_id = sections.id), 0::bigint `, args...) item, err := scanSectionRow(row) if err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, item) } func (a *app) deleteSection(ctx context.Context, w http.ResponseWriter, r *http.Request, vertical, slug string) { scope, ok := a.readScope(w, r, true) if !ok { return } section, err := a.findSection(ctx, vertical, slug, scope) if err != nil { writeDBError(w, err) return } var count int64 if err := a.db.QueryRow(ctx, `SELECT count(*) FROM channels WHERE section_id = $1`, section.ID).Scan(&count); err != nil { writeDBError(w, err) return } if count > 0 { writeError(w, http.StatusBadRequest, fmt.Sprintf("section has %d channels - move or delete them first", count)) return } if _, err := a.db.Exec(ctx, `DELETE FROM sections WHERE id = $1`, section.ID); err != nil { writeDBError(w, err) return } w.WriteHeader(http.StatusNoContent) } func (a *app) findSection(ctx context.Context, vertical, slug string, scope accessScope) (sectionOut, error) { args := []any{vertical, slug} where := "s.vertical = $1 AND s.slug = $2" if !scope.IsAdmin { args = append(args, scope.DeptID) where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) } row := a.db.QueryRow(ctx, ` SELECT s.id, s.vertical, COALESCE(s.department_id, ''), s.slug, s.title, COALESCE(s.emoji, ''), COALESCE(s.description, ''), s.created_at, (SELECT count(*) FROM channels c WHERE c.section_id = s.id), (SELECT count(*) FROM channels c WHERE c.section_id = s.id AND c.is_active = true), (SELECT count(*) FROM messages m JOIN channels c ON c.id = m.channel_id WHERE c.section_id = s.id), 0::bigint FROM sections s WHERE `+where+` `, args...) return scanSectionRow(row) } func (a *app) handleChannels(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: a.listChannels(r.Context(), w, r) case http.MethodPost: a.createChannel(r.Context(), w, r) default: writeError(w, http.StatusMethodNotAllowed, "method not allowed") } } func (a *app) listChannels(ctx context.Context, w http.ResponseWriter, r *http.Request) { scope, ok := a.readScope(w, r, false) if !ok { return } vertical := queryRequired(w, r, "vertical") if vertical == "" { return } section := strings.TrimSpace(r.URL.Query().Get("section")) args := []any{vertical} where := "c.vertical = $1" if section != "" { args = append(args, section) where += fmt.Sprintf(" AND s.slug = $%d", len(args)) } if !scope.IsAdmin { args = append(args, scope.DeptID) where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) } rows, err := a.db.Query(ctx, ` SELECT c.id, c.tg_id, c.identifier, c.title, c.vertical, c.section_id, s.slug, c.is_active, c.last_message_id, c.last_polled_at, c.created_at FROM channels c JOIN sections s ON s.id = c.section_id WHERE `+where+` ORDER BY c.created_at DESC, c.id DESC `, args...) if err != nil { writeDBError(w, err) return } defer rows.Close() out := []channelOut{} for rows.Next() { item, err := scanChannel(rows) if err != nil { writeDBError(w, err) return } out = append(out, item) } if err := rows.Err(); err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, out) } func (a *app) createChannel(ctx context.Context, w http.ResponseWriter, r *http.Request) { scope, ok := a.readScope(w, r, true) if !ok { return } var payload struct { Identifier string `json:"identifier"` Vertical string `json:"vertical"` Section string `json:"section"` } if !readBody(w, r, &payload) { return } payload.Identifier = strings.TrimSpace(payload.Identifier) payload.Vertical = strings.TrimSpace(payload.Vertical) payload.Section = strings.TrimSpace(payload.Section) if payload.Identifier == "" || payload.Vertical == "" || payload.Section == "" { writeError(w, http.StatusBadRequest, "identifier, vertical and section are required") return } section, err := a.findSection(ctx, payload.Vertical, payload.Section, scope) if err != nil { writeDBError(w, err) return } row := a.db.QueryRow(ctx, ` INSERT INTO channels (identifier, vertical, section_id, is_active) VALUES ($1, $2, $3, true) RETURNING id, tg_id, identifier, title, vertical, section_id, $4::text, is_active, last_message_id, last_polled_at, created_at `, payload.Identifier, payload.Vertical, section.ID, section.Slug) item, err := scanChannelRow(row) if err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusCreated, item) } func (a *app) handleChannelItem(ctx context.Context, w http.ResponseWriter, r *http.Request, path string) { rest := strings.TrimPrefix(path, "/api/v1/channels/") parts := strings.Split(rest, "/") if len(parts) == 0 { writeError(w, http.StatusNotFound, "not found") return } id, err := strconv.ParseInt(parts[0], 10, 64) if err != nil { writeError(w, http.StatusBadRequest, "bad channel id") return } if len(parts) == 2 { switch parts[1] { case "poll", "backfill-media": a.proxyPython(w, r, path) case "reanalyze": a.reanalyzeChannel(ctx, w, r, id) case "stats": a.channelStats(ctx, w, r, id) default: writeError(w, http.StatusNotFound, "not found") } return } switch r.Method { case http.MethodGet: a.getChannel(ctx, w, r, id) case http.MethodPatch: a.updateChannel(ctx, w, r, id) case http.MethodDelete: a.deleteChannel(ctx, w, r, id) default: writeError(w, http.StatusMethodNotAllowed, "method not allowed") } } func (a *app) getChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) { scope, ok := a.readScope(w, r, false) if !ok { return } channel, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")) if err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, channel) } func (a *app) updateChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) { scope, ok := a.readScope(w, r, true) if !ok { return } if _, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil { writeDBError(w, err) return } var payload struct { IsActive *bool `json:"is_active"` Vertical *string `json:"vertical"` Section *string `json:"section"` } if !readBody(w, r, &payload) { return } set := []string{} args := []any{} if payload.IsActive != nil { args = append(args, *payload.IsActive) set = append(set, fmt.Sprintf("is_active = $%d", len(args))) } if payload.Vertical != nil { args = append(args, strings.TrimSpace(*payload.Vertical)) set = append(set, fmt.Sprintf("vertical = $%d", len(args))) } if payload.Section != nil && strings.TrimSpace(*payload.Section) != "" { vertical := r.URL.Query().Get("vertical") if payload.Vertical != nil && strings.TrimSpace(*payload.Vertical) != "" { vertical = strings.TrimSpace(*payload.Vertical) } section, err := a.findSection(ctx, vertical, strings.TrimSpace(*payload.Section), scope) if err != nil { writeDBError(w, err) return } args = append(args, section.ID) set = append(set, fmt.Sprintf("section_id = $%d", len(args))) } if len(set) == 0 { a.getChannel(ctx, w, r, id) return } args = append(args, id) row := a.db.QueryRow(ctx, ` UPDATE channels SET `+strings.Join(set, ", ")+` WHERE id = $`+strconv.Itoa(len(args))+` RETURNING id, tg_id, identifier, title, vertical, section_id, (SELECT slug FROM sections WHERE id = channels.section_id), is_active, last_message_id, last_polled_at, created_at `, args...) item, err := scanChannelRow(row) if err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, item) } func (a *app) deleteChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) { scope, ok := a.readScope(w, r, true) if !ok { return } if _, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil { writeDBError(w, err) return } if _, err := a.db.Exec(ctx, `DELETE FROM channels WHERE id = $1`, id); err != nil { writeDBError(w, err) return } w.WriteHeader(http.StatusNoContent) } func (a *app) findChannel(ctx context.Context, id int64, scope accessScope, vertical, section string) (channelOut, error) { args := []any{id} where := "c.id = $1" if strings.TrimSpace(vertical) != "" { args = append(args, strings.TrimSpace(vertical)) where += fmt.Sprintf(" AND c.vertical = $%d", len(args)) } if strings.TrimSpace(section) != "" { args = append(args, strings.TrimSpace(section)) where += fmt.Sprintf(" AND s.slug = $%d", len(args)) } if !scope.IsAdmin { args = append(args, scope.DeptID) where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) } row := a.db.QueryRow(ctx, ` SELECT c.id, c.tg_id, c.identifier, c.title, c.vertical, c.section_id, s.slug, c.is_active, c.last_message_id, c.last_polled_at, c.created_at FROM channels c JOIN sections s ON s.id = c.section_id WHERE `+where+` `, args...) return scanChannelRow(row) } func (a *app) channelStats(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) { scope, ok := a.readScope(w, r, false) if !ok { return } ch, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")) if err != nil { writeDBError(w, err) return } var messages, leads int64 key := "lead" field := "is_listing" if ch.Vertical == verticalHR { key = "hr_lead" field = "is_lead" } if err := a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE channel_id = $1`, id).Scan(&messages); err != nil { writeDBError(w, err) return } if err := a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE channel_id = $1 AND extracted -> $2 ->> $3 = 'true'`, id, key, field).Scan(&leads); err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, map[string]any{ "channel_id": id, "messages_total": messages, "leads_total": leads, "last_polled_at": ch.LastPolledAt, "last_message_id": ch.LastMessageID, }) } func (a *app) reanalyzeChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) { scope, ok := a.readScope(w, r, true) if !ok { return } ch, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")) if err != nil { writeDBError(w, err) return } key := verdictKey(ch.Vertical) tag, err := a.db.Exec(ctx, `UPDATE messages SET extracted = COALESCE(extracted, '{}'::jsonb) - $1 WHERE channel_id = $2`, key, id) if err != nil { writeDBError(w, err) return } pending, err := a.pendingLLM(ctx, scope, ch.Vertical, valueOrEmpty(ch.SectionSlug)) if err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, map[string]int64{"updated": tag.RowsAffected(), "pending": pending}) } func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } scope, ok := a.readScope(w, r, false) if !ok { return } vertical := queryRequired(w, r, "vertical") if vertical == "" { return } q := r.URL.Query() limit := clampInt(queryInt(q.Get("limit"), 50), 1, 500) offset := maxInt(queryInt(q.Get("offset"), 0), 0) args := []any{vertical} where := "c.vertical = $1" if section := strings.TrimSpace(q.Get("section")); section != "" { args = append(args, section) where += fmt.Sprintf(" AND s.slug = $%d", len(args)) } if channelID := strings.TrimSpace(q.Get("channel_id")); channelID != "" { id, err := strconv.ParseInt(channelID, 10, 64) if err != nil { writeError(w, http.StatusBadRequest, "bad channel_id") return } args = append(args, id) where += fmt.Sprintf(" AND m.channel_id = $%d", len(args)) } if text := strings.TrimSpace(q.Get("q")); text != "" { args = append(args, "%"+text+"%") where += fmt.Sprintf(" AND m.text ILIKE $%d", len(args)) } if q.Get("leads_only") == "true" || q.Get("leads_only") == "1" { key := "lead" field := "is_listing" if vertical == verticalHR { key = "hr_lead" field = "is_lead" } args = append(args, key, field) where += fmt.Sprintf(" AND m.extracted -> $%d ->> $%d = 'true'", len(args)-1, len(args)) } if !scope.IsAdmin { args = append(args, scope.DeptID) where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) } args = append(args, limit, offset) rows, err := a.db.Query(ctx, ` SELECT m.id, m.channel_id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int, m.date, m.text, m.sender_id, m.sender_username, m.sender_name, c.identifier, c.tg_id, m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text, m.views, m.forwards, m.fetched_at FROM messages m JOIN channels c ON c.id = m.channel_id JOIN sections s ON s.id = c.section_id WHERE `+where+` ORDER BY m.date DESC, m.id DESC LIMIT $`+strconv.Itoa(len(args)-1)+` OFFSET $`+strconv.Itoa(len(args))+` `, args...) if err != nil { writeDBError(w, err) return } defer rows.Close() out := []messageOut{} for rows.Next() { item, err := scanMessage(rows) if err != nil { writeDBError(w, err) return } out = append(out, item) } if err := rows.Err(); err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, out) } func (a *app) handleMessageItem(ctx context.Context, w http.ResponseWriter, r *http.Request, path string) { if r.Method != http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } scope, ok := a.readScope(w, r, false) if !ok { return } idRaw := strings.TrimPrefix(path, "/api/v1/messages/") id, err := strconv.ParseInt(idRaw, 10, 64) if err != nil { writeError(w, http.StatusBadRequest, "bad message id") return } args := []any{id} where := "m.id = $1" if !scope.IsAdmin { args = append(args, scope.DeptID) where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) } row := a.db.QueryRow(ctx, ` SELECT m.id, m.channel_id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int, m.date, m.text, m.sender_id, m.sender_username, m.sender_name, c.identifier, c.tg_id, m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text, m.views, m.forwards, m.fetched_at FROM messages m JOIN channels c ON c.id = m.channel_id JOIN sections s ON s.id = c.section_id WHERE `+where+` `, args...) item, err := scanMessageRow(row) if err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, item) } func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } scope, ok := a.readScope(w, r, false) if !ok { return } vertical := queryRequired(w, r, "vertical") if vertical == "" { return } section := strings.TrimSpace(r.URL.Query().Get("section")) args := []any{vertical} where := "c.vertical = $1" if section != "" { args = append(args, section) where += fmt.Sprintf(" AND s.slug = $%d", len(args)) } if !scope.IsAdmin { args = append(args, scope.DeptID) where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) } var channelsTotal, channelsActive, messagesTotal, messages24h, leadsTotal, leads24h int64 var lastPoll sql.NullTime countQuery := `FROM channels c JOIN sections s ON s.id = c.section_id WHERE ` + where if err := a.db.QueryRow(ctx, `SELECT count(*) `+countQuery, args...).Scan(&channelsTotal); err != nil { writeDBError(w, err) return } if err := a.db.QueryRow(ctx, `SELECT count(*) `+countQuery+` AND c.is_active = true`, args...).Scan(&channelsActive); err != nil { writeDBError(w, err) return } if err := a.db.QueryRow(ctx, `SELECT max(c.last_polled_at) `+countQuery, args...).Scan(&lastPoll); err != nil { writeDBError(w, err) return } messageFrom := `FROM messages m JOIN channels c ON c.id = m.channel_id JOIN sections s ON s.id = c.section_id WHERE ` + where if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom, args...).Scan(&messagesTotal); err != nil { writeDBError(w, err) return } if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom+` AND m.fetched_at >= now() - interval '24 hours'`, args...).Scan(&messages24h); err != nil { writeDBError(w, err) return } key := "lead" field := "is_listing" if vertical == verticalHR { key = "hr_lead" field = "is_lead" } leadArgs := append(append([]any{}, args...), key, field) leadClause := fmt.Sprintf(" AND m.extracted -> $%d ->> $%d = 'true'", len(leadArgs)-1, len(leadArgs)) if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom+leadClause, leadArgs...).Scan(&leadsTotal); err != nil { writeDBError(w, err) return } if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom+leadClause+` AND m.fetched_at >= now() - interval '24 hours'`, leadArgs...).Scan(&leads24h); err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, map[string]any{ "vertical": vertical, "section_slug": nullableString(section), "channels_total": channelsTotal, "channels_active": channelsActive, "messages_total": messagesTotal, "messages_last_24h": messages24h, "leads_total": leadsTotal, "leads_last_24h": leads24h, "poll_interval_seconds": a.cfg.PollIntervalSeconds, "last_poll_at": nullTime(lastPoll), }) } func (a *app) handleLLMStatus(ctx context.Context, w http.ResponseWriter) { ready := false if a.cfg.LLMEnabled { req, err := http.NewRequestWithContext(ctx, http.MethodGet, strings.TrimRight(a.cfg.LLMBaseURL, "/")+"/v1/models", nil) if err == nil { if a.cfg.LLMAPIKey != "" { req.Header.Set("Authorization", "Bearer "+a.cfg.LLMAPIKey) } resp, err := a.http.Do(req) if err == nil { ready = resp.StatusCode >= 200 && resp.StatusCode < 300 _ = resp.Body.Close() } } } writeJSON(w, http.StatusOK, map[string]any{ "enabled": a.cfg.LLMEnabled, "ready": ready, "base_url": a.cfg.LLMBaseURL, "model": a.cfg.LLMModel, }) } func (a *app) handleLLMQueue(ctx context.Context, w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } scope, ok := a.readScope(w, r, false) if !ok { return } vertical := queryRequired(w, r, "vertical") if vertical == "" { return } pending, err := a.pendingLLM(ctx, scope, vertical, strings.TrimSpace(r.URL.Query().Get("section"))) if err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, map[string]int64{"pending": pending}) } func (a *app) pendingLLM(ctx context.Context, scope accessScope, vertical, section string) (int64, error) { args := []any{vertical} where := `c.vertical = $1 AND m.text IS NOT NULL AND ((c.vertical = 'hr' AND (m.extracted IS NULL OR m.extracted->'hr_lead' IS NULL)) OR (c.vertical <> 'hr' AND (m.extracted IS NULL OR m.extracted->'lead' IS NULL)))` if section != "" { args = append(args, section) where += fmt.Sprintf(" AND s.slug = $%d", len(args)) } if !scope.IsAdmin { args = append(args, scope.DeptID) where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) } var pending int64 err := a.db.QueryRow(ctx, ` SELECT count(*) FROM messages m JOIN channels c ON c.id = m.channel_id JOIN sections s ON s.id = c.section_id WHERE `+where, args...).Scan(&pending) return pending, err } func (a *app) handlePrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: a.getPrompt(ctx, w, r) case http.MethodPut: a.savePrompt(ctx, w, r) case http.MethodDelete: a.resetPrompt(ctx, w, r) default: writeError(w, http.StatusMethodNotAllowed, "method not allowed") } } func (a *app) getPrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) { scope, ok := a.readScope(w, r, false) if !ok { return } vertical := queryRequired(w, r, "vertical") if vertical == "" { return } section := strings.TrimSpace(r.URL.Query().Get("section")) if section != "" { if _, err := a.findSection(ctx, vertical, section, scope); err != nil { writeDBError(w, err) return } } prompt, source, err := a.resolvePrompt(ctx, scope.DeptID, vertical, section) if err != nil { writeDBError(w, err) return } overridden, err := a.promptExists(ctx, scope.DeptID, vertical, section) if err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, map[string]any{ "vertical": vertical, "department_id": nullableString(scope.DeptID), "section": nullableString(section), "prompt": prompt, "default": defaultPrompt(vertical), "source": source, "is_overridden_here": overridden, }) } func (a *app) savePrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) { scope, ok := a.readScope(w, r, true) if !ok { return } vertical := queryRequired(w, r, "vertical") if vertical == "" { return } section := strings.TrimSpace(r.URL.Query().Get("section")) var payload struct { Prompt string `json:"prompt"` } if !readBody(w, r, &payload) { return } text := strings.TrimSpace(payload.Prompt) if text == "" { writeError(w, http.StatusBadRequest, "prompt must be a non-empty string") return } if len(text) > 30000 { writeError(w, http.StatusBadRequest, "prompt is too long (max 30000 chars)") return } key := promptKey(scope.DeptID, vertical, section) value, _ := json.Marshal(text) if _, err := a.db.Exec(ctx, ` INSERT INTO app_settings (key, value, updated_at) VALUES ($1, $2::jsonb, now()) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = now() `, key, string(value)); err != nil { writeDBError(w, err) return } writeJSON(w, http.StatusOK, map[string]any{"saved": true, "vertical": vertical, "department_id": nullableString(scope.DeptID), "section": nullableString(section), "length": len(text)}) } func (a *app) resetPrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) { scope, ok := a.readScope(w, r, true) if !ok { return } vertical := queryRequired(w, r, "vertical") if vertical == "" { return } section := strings.TrimSpace(r.URL.Query().Get("section")) if _, err := a.db.Exec(ctx, `DELETE FROM app_settings WHERE key = $1`, promptKey(scope.DeptID, vertical, section)); err != nil { writeDBError(w, err) return } w.WriteHeader(http.StatusNoContent) } func (a *app) resolvePrompt(ctx context.Context, deptID, vertical, section string) (string, string, error) { keys := []struct { key string source string }{} if section != "" { keys = append(keys, struct { key string source string }{promptKey(deptID, vertical, section), "section"}) } keys = append(keys, struct { key string source string }{promptKey(deptID, vertical, ""), "vertical"}) for _, candidate := range keys { var text string err := a.db.QueryRow(ctx, `SELECT value #>> '{}' FROM app_settings WHERE key = $1`, candidate.key).Scan(&text) if err == nil && strings.TrimSpace(text) != "" { return text, candidate.source, nil } if err != nil && !errors.Is(err, pgx.ErrNoRows) { return "", "", err } } return defaultPrompt(vertical), "default", nil } func (a *app) promptExists(ctx context.Context, deptID, vertical, section string) (bool, error) { var exists bool err := a.db.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM app_settings WHERE key = $1)`, promptKey(deptID, vertical, section)).Scan(&exists) return exists, err } func (a *app) proxyPython(w http.ResponseWriter, r *http.Request, path string) { scope := readAccess(r) if strings.Contains(path, "/auth/") && !scope.IsAdmin { writeError(w, http.StatusNotFound, "not found") return } if (strings.Contains(path, "/poll") || strings.Contains(path, "/backfill-media")) && !scope.CanManage { writeError(w, http.StatusNotFound, "not found") return } target := strings.TrimRight(a.cfg.PythonBaseURL, "/") + path if r.URL.RawQuery != "" { target += "?" + r.URL.RawQuery } body, err := io.ReadAll(r.Body) if err != nil { writeError(w, http.StatusBadRequest, "bad body") return } req, err := http.NewRequestWithContext(r.Context(), r.Method, target, bytes.NewReader(body)) if err != nil { writeError(w, http.StatusBadGateway, err.Error()) return } req.Header = r.Header.Clone() resp, err := a.python.Do(req) if err != nil { writeError(w, http.StatusBadGateway, err.Error()) return } defer resp.Body.Close() for k, values := range resp.Header { for _, v := range values { w.Header().Add(k, v) } } w.WriteHeader(resp.StatusCode) _, _ = io.Copy(w, resp.Body) } func (a *app) readScope(w http.ResponseWriter, r *http.Request, manage bool) (accessScope, bool) { scope := readAccess(r) if manage { if !scope.CanManage { writeError(w, http.StatusNotFound, "not found") return scope, false } } else if !scope.IsAdmin && scope.DeptID == "" { writeError(w, http.StatusForbidden, "department is required") return scope, false } if manage && !scope.IsAdmin && scope.DeptID == "" { writeError(w, http.StatusForbidden, "department is required") return scope, false } return scope, true } func readAccess(r *http.Request) accessScope { admin := r.Header.Get("X-User-Is-Admin") == "1" deptHead := r.Header.Get("X-User-Is-Department-Head") == "1" return accessScope{ IsAdmin: admin, CanManage: admin || deptHead, DeptID: strings.TrimSpace(r.Header.Get("X-User-Department-Id")), } } type rowScanner interface { Scan(dest ...any) error } func scanSection(rows rowScanner) (sectionOut, error) { var item sectionOut var dept, emoji, description string err := rows.Scan( &item.ID, &item.Vertical, &dept, &item.Slug, &item.Title, &emoji, &description, &item.CreatedAt, &item.ChannelsTotal, &item.ChannelsActive, &item.MessagesTotal, &item.LeadsTotal, ) item.DepartmentID = nullableString(dept) item.Emoji = nullableString(emoji) item.Description = nullableString(description) return item, err } func scanSectionRow(row pgx.Row) (sectionOut, error) { return scanSection(row) } func scanChannel(rows rowScanner) (channelOut, error) { var item channelOut var tgID, lastMsg sql.NullInt64 var title, slug sql.NullString var lastPoll sql.NullTime err := rows.Scan( &item.ID, &tgID, &item.Identifier, &title, &item.Vertical, &item.SectionID, &slug, &item.IsActive, &lastMsg, &lastPoll, &item.CreatedAt, ) item.TGID = nullInt(tgID) item.Title = nullString(title) item.SectionSlug = nullString(slug) item.LastMessageID = nullInt(lastMsg) item.LastPolledAt = nullTimePtr(lastPoll) return item, err } func scanChannelRow(row pgx.Row) (channelOut, error) { return scanChannel(row) } func scanMessage(rows rowScanner) (messageOut, error) { var item messageOut var vertical, slug string var grouped, senderID, tgID, views, forwards sql.NullInt64 var text, senderUsername, senderName, identifier sql.NullString var extractedText string err := rows.Scan( &item.ID, &item.ChannelID, &vertical, &slug, &item.TGMessageID, &grouped, &item.GroupSize, &item.Date, &text, &senderID, &senderUsername, &senderName, &identifier, &tgID, &item.HasMedia, &extractedText, &views, &forwards, &item.FetchedAt, ) item.ChannelVertical = nullableString(vertical) item.ChannelSectionSlug = nullableString(slug) item.GroupedID = nullInt(grouped) item.Text = nullString(text) item.SenderID = nullInt(senderID) item.SenderUsername = nullString(senderUsername) item.SenderName = nullString(senderName) item.Views = nullInt(views) item.Forwards = nullInt(forwards) if extractedText != "" && extractedText != "null" { item.Extracted = json.RawMessage(extractedText) } item.PostURL = buildPostURL(identifier.String, tgID, item.TGMessageID) return item, err } func scanMessageRow(row pgx.Row) (messageOut, error) { return scanMessage(row) } func buildPostURL(identifier string, tgID sql.NullInt64, msgID int64) *string { identifier = strings.TrimSpace(identifier) if identifier == "" || msgID == 0 { return nil } if strings.HasPrefix(identifier, "https://t.me/") { base := strings.TrimRight(identifier, "/") v := base + "/" + strconv.FormatInt(msgID, 10) return &v } if strings.HasPrefix(identifier, "@") { v := "https://t.me/" + strings.TrimPrefix(identifier, "@") + "/" + strconv.FormatInt(msgID, 10) return &v } if !strings.Contains(identifier, "/") && !strings.Contains(identifier, "+") { v := "https://t.me/" + identifier + "/" + strconv.FormatInt(msgID, 10) return &v } return nil } func readBody(w http.ResponseWriter, r *http.Request, out any) bool { defer r.Body.Close() if err := json.NewDecoder(r.Body).Decode(out); err != nil { writeError(w, http.StatusBadRequest, "invalid json") return false } return true } func writeJSON(w http.ResponseWriter, status int, payload any) { w.Header().Set("Content-Type", "application/json; charset=utf-8") w.WriteHeader(status) _ = json.NewEncoder(w).Encode(payload) } func writeError(w http.ResponseWriter, status int, detail string) { writeJSON(w, status, map[string]string{"detail": detail}) } func writeDBError(w http.ResponseWriter, err error) { if errors.Is(err, pgx.ErrNoRows) { writeError(w, http.StatusNotFound, "not found") return } slog.Error("api_db_error", "error", err) writeError(w, http.StatusInternalServerError, "database error") } func queryRequired(w http.ResponseWriter, r *http.Request, name string) string { value := strings.TrimSpace(r.URL.Query().Get(name)) if value == "" { writeError(w, http.StatusBadRequest, name+" is required") } return value } func promptKey(deptID, vertical, section string) string { if deptID == "" { deptID = "global" } if section != "" { return fmt.Sprintf("llm_system_prompt:%s:%s:%s", deptID, vertical, section) } return fmt.Sprintf("llm_system_prompt:%s:%s", deptID, vertical) } func verdictKey(vertical string) string { if vertical == verticalHR { return "hr_lead" } return "lead" } func defaultPrompt(vertical string) string { if vertical == verticalHR { return defaultHRPrompt } return defaultREPrompt } func normalizeSlug(raw string) string { raw = strings.ToLower(strings.TrimSpace(raw)) raw = strings.ReplaceAll(raw, " ", "-") var b strings.Builder for _, r := range raw { if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '_' || r == '-' { b.WriteRune(r) } } return b.String() } func nullableString(v string) *string { v = strings.TrimSpace(v) if v == "" { return nil } return &v } func nullString(v sql.NullString) *string { if !v.Valid || strings.TrimSpace(v.String) == "" { return nil } return &v.String } func nullInt(v sql.NullInt64) *int64 { if !v.Valid { return nil } return &v.Int64 } func nullTime(v sql.NullTime) any { if !v.Valid { return nil } return v.Time } func nullTimePtr(v sql.NullTime) *time.Time { if !v.Valid { return nil } return &v.Time } func valueOrEmpty(v *string) string { if v == nil { return "" } return *v } func queryInt(raw string, fallback int) int { if raw == "" { return fallback } n, err := strconv.Atoi(raw) if err != nil { return fallback } return n } func clampInt(n, min, max int) int { if n < min { return min } if n > max { return max } return n } func maxInt(n, min int) int { if n < min { return min } return n } func loadConfig() config { return config{ APIHost: env("API_HOST", "0.0.0.0"), APIPort: envInt("API_PORT", 8000), PublicBasePath: env("PUBLIC_BASE_PATH", ""), PythonBaseURL: env("PYTHON_BASE_URL", "http://127.0.0.1:8001"), PostgresUser: env("POSTGRES_USER", "parser"), PostgresPassword: env("POSTGRES_PASSWORD", "parser"), PostgresDB: env("POSTGRES_DB", "parser"), PostgresHost: env("POSTGRES_HOST", "db"), PostgresPort: envInt("POSTGRES_PORT", 5432), PollIntervalSeconds: envInt("POLL_INTERVAL_SECONDS", 60), LLMEnabled: envBool("LLM_ENABLED", true), LLMBaseURL: env("LLM_BASE_URL", "http://10.2.3.5:8002"), LLMAPIKey: env("LLM_API_KEY", ""), LLMModel: env("LLM_MODEL", "qwen2.5-14b"), LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second, } } func (c config) databaseURL() string { return fmt.Sprintf( "postgres://%s:%s@%s:%d/%s", url.QueryEscape(c.PostgresUser), url.QueryEscape(c.PostgresPassword), c.PostgresHost, c.PostgresPort, url.QueryEscape(c.PostgresDB), ) } func env(key, fallback string) string { if v := strings.TrimSpace(os.Getenv(key)); v != "" { return v } return fallback } func envInt(key string, fallback int) int { if raw := strings.TrimSpace(os.Getenv(key)); raw != "" { if n, err := strconv.Atoi(raw); err == nil { return n } } return fallback } func envBool(key string, fallback bool) bool { if raw := strings.TrimSpace(os.Getenv(key)); raw != "" { if b, err := strconv.ParseBool(raw); err == nil { return b } if raw == "1" { return true } if raw == "0" { return false } } return fallback } const defaultREPrompt = `Ты — аналитик объявлений о недвижимости. Тебе дают текст из Telegram-канала. Определи, является ли сообщение реальным объявлением о покупке, продаже или аренде недвижимости. Отвечай строго валидным JSON без markdown: { "is_listing": boolean, "kind": "sale" | "rent" | "purchase" | null, "property_type": string | null, "rooms": string | null, "area_m2": number | null, "price_text": string | null, "price_value": number | null, "currency": "RUB" | "USD" | "EUR" | "AED" | "GBP" | "CNY" | "TRY" | "KZT" | "BYN" | "UAH" | null, "location": string | null, "contact_phone": string | null, "contact_name": string | null, "summary": string, "confidence": number } summary всегда по-русски, confidence в диапазоне 0..1.` const defaultHRPrompt = `Ты — аналитик HR-объявлений. Тебе дают текст из Telegram-канала. Определи, относится ли сообщение к рынку труда: вакансия, резюме или короткий HR-контакт. Отвечай строго валидным JSON без markdown: { "is_lead": boolean, "kind": "vacancy" | "resume" | "contact" | null, "title": string | null, "company": string | null, "candidate_name": string | null, "experience_years": number | null, "skills": string[], "location": string | null, "remote": boolean | null, "employment_type": "full-time" | "part-time" | "contract" | "internship" | null, "salary_text": string | null, "salary_value": number | null, "currency": "RUB" | "USD" | "EUR" | "AED" | "GBP" | "CNY" | "TRY" | "KZT" | "BYN" | "UAH" | null, "contact_phone": string | null, "contact_name": string | null, "summary": string, "confidence": number } summary всегда по-русски, confidence в диапазоне 0..1.`