// config holds the runtime settings of the Go API server. main obtains it
// from loadConfig and stores it on the app value shared by all handlers.
type config struct {
	// APIHost and APIPort are joined as "host:port" to form the HTTP listen address.
	APIHost string
	APIPort int
	// PublicBasePath is an optional URL prefix that apiPath strips from
	// incoming request paths before routing.
	PublicBasePath string
	// PythonBaseURL is the base URL of the Python service that proxied
	// requests are forwarded to.
	PythonBaseURL string
	// Postgres* describe the database connection.
	// NOTE(review): presumably combined by config.databaseURL() into the
	// pgx connection string - that method is outside this view, confirm.
	PostgresUser     string
	PostgresPassword string
	PostgresDB       string
	PostgresHost     string
	PostgresPort     int
	// PollIntervalSeconds is echoed to clients by the stats endpoint as
	// "poll_interval_seconds".
	PollIntervalSeconds int
	// LLMEnabled gates the readiness probe performed by the LLM status endpoint.
	LLMEnabled bool
	// LLMBaseURL is the LLM server base URL; the status endpoint probes
	// "<base>/v1/models" and also reports the URL back to the client.
	LLMBaseURL string
	// LLMAPIKey, when non-empty, is sent as a Bearer token on LLM requests.
	LLMAPIKey string
	// LLMModel is the model name reported by the LLM status endpoint.
	LLMModel string
	// LLMTimeout is the overall timeout of the HTTP client used for LLM calls.
	LLMTimeout time.Duration
}
CanManage bool + DeptID string +} + +type sectionOut struct { + ID int64 `json:"id"` + Vertical string `json:"vertical"` + DepartmentID *string `json:"department_id,omitempty"` + Slug string `json:"slug"` + Title string `json:"title"` + Emoji *string `json:"emoji,omitempty"` + Description *string `json:"description,omitempty"` + CreatedAt time.Time `json:"created_at"` + ChannelsTotal int64 `json:"channels_total"` + ChannelsActive int64 `json:"channels_active"` + MessagesTotal int64 `json:"messages_total"` + LeadsTotal int64 `json:"leads_total"` +} + +type channelOut struct { + ID int64 `json:"id"` + TGID *int64 `json:"tg_id,omitempty"` + Identifier string `json:"identifier"` + Title *string `json:"title,omitempty"` + Vertical string `json:"vertical"` + SectionID int64 `json:"section_id"` + SectionSlug *string `json:"section_slug,omitempty"` + IsActive bool `json:"is_active"` + LastMessageID *int64 `json:"last_message_id,omitempty"` + LastPolledAt *time.Time `json:"last_polled_at,omitempty"` + CreatedAt time.Time `json:"created_at"` +} + +type messageOut struct { + ID int64 `json:"id"` + ChannelID int64 `json:"channel_id"` + ChannelVertical *string `json:"channel_vertical,omitempty"` + ChannelSectionSlug *string `json:"channel_section_slug,omitempty"` + TGMessageID int64 `json:"tg_message_id"` + GroupedID *int64 `json:"grouped_id,omitempty"` + GroupSize int `json:"group_size"` + Date time.Time `json:"date"` + Text *string `json:"text,omitempty"` + SenderID *int64 `json:"sender_id,omitempty"` + SenderUsername *string `json:"sender_username,omitempty"` + SenderName *string `json:"sender_name,omitempty"` + PostURL *string `json:"post_url,omitempty"` + HasMedia bool `json:"has_media"` + Extracted json.RawMessage `json:"extracted,omitempty"` + Views *int64 `json:"views,omitempty"` + Forwards *int64 `json:"forwards,omitempty"` + FetchedAt time.Time `json:"fetched_at"` +} + +func main() { + cfg := loadConfig() + logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) + 
slog.SetDefault(logger) + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + pool, err := pgxpool.New(ctx, cfg.databaseURL()) + if err != nil { + slog.Error("db_connect_failed", "error", err) + os.Exit(1) + } + defer pool.Close() + + srvApp := &app{ + cfg: cfg, + db: pool, + http: &http.Client{Timeout: cfg.LLMTimeout}, + python: &http.Client{Timeout: 15 * time.Minute}, + } + + server := &http.Server{ + Addr: fmt.Sprintf("%s:%d", cfg.APIHost, cfg.APIPort), + Handler: http.HandlerFunc(srvApp.serveHTTP), + ReadHeaderTimeout: 10 * time.Second, + } + + go func() { + <-ctx.Done() + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + _ = server.Shutdown(shutdownCtx) + }() + + slog.Info("go_api_started", "addr", server.Addr, "python_base_url", cfg.PythonBaseURL) + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + slog.Error("server_failed", "error", err) + os.Exit(1) + } +} + +func (a *app) serveHTTP(w http.ResponseWriter, r *http.Request) { + path := a.apiPath(r.URL.Path) + if path == "/healthz" { + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) + return + } + if path == "/" { + writeJSON(w, http.StatusOK, map[string]string{"service": "monitoring-tg", "ui": "portal", "api": "go"}) + return + } + if !strings.HasPrefix(path, "/api/v1") { + writeError(w, http.StatusNotFound, "not found") + return + } + + ctx := r.Context() + switch { + case r.Method == http.MethodGet && path == "/api/v1/access/me": + a.handleAccessMe(w, r) + case path == "/api/v1/auth/status" || strings.HasPrefix(path, "/api/v1/auth/"): + a.proxyPython(w, r, path) + case path == "/api/v1/sections": + a.handleSections(w, r) + case strings.HasPrefix(path, "/api/v1/sections/"): + a.handleSectionItem(w, r, path) + case path == "/api/v1/channels": + a.handleChannels(w, r) + case strings.HasPrefix(path, "/api/v1/channels/"): + a.handleChannelItem(ctx, 
w, r, path) + case path == "/api/v1/messages": + a.handleMessages(ctx, w, r) + case strings.HasPrefix(path, "/api/v1/messages/"): + a.handleMessageItem(ctx, w, r, path) + case path == "/api/v1/stats": + a.handleStats(ctx, w, r) + case path == "/api/v1/llm/status": + a.handleLLMStatus(ctx, w) + case path == "/api/v1/llm/queue": + a.handleLLMQueue(ctx, w, r) + case path == "/api/v1/llm/prompt": + a.handlePrompt(ctx, w, r) + case r.Method == http.MethodPost && path == "/api/v1/poll": + a.proxyPython(w, r, path) + default: + writeError(w, http.StatusNotFound, "not found") + } +} + +func (a *app) apiPath(path string) string { + base := strings.TrimRight(a.cfg.PublicBasePath, "/") + if base != "" && strings.HasPrefix(path, base+"/") { + return strings.TrimPrefix(path, base) + } + if base != "" && path == base { + return "/" + } + return path +} + +func (a *app) handleAccessMe(w http.ResponseWriter, r *http.Request) { + scope := readAccess(r) + writeJSON(w, http.StatusOK, map[string]any{ + "is_admin": scope.IsAdmin, + "can_manage_department": scope.CanManage, + "department_id": nullableString(scope.DeptID), + }) +} + +func (a *app) handleSections(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + a.listSections(r.Context(), w, r) + case http.MethodPost: + a.createSection(r.Context(), w, r) + default: + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + } +} + +func (a *app) listSections(ctx context.Context, w http.ResponseWriter, r *http.Request) { + scope, ok := a.readScope(w, r, false) + if !ok { + return + } + vertical := queryRequired(w, r, "vertical") + if vertical == "" { + return + } + + args := []any{vertical} + deptFilter := "" + if !scope.IsAdmin { + args = append(args, scope.DeptID) + deptFilter = fmt.Sprintf(" AND s.department_id = $%d", len(args)) + } + + rows, err := a.db.Query(ctx, ` + SELECT + s.id, + s.vertical, + COALESCE(s.department_id, ''), + s.slug, + s.title, + COALESCE(s.emoji, ''), + 
COALESCE(s.description, ''), + s.created_at, + (SELECT count(*) FROM channels c WHERE c.section_id = s.id), + (SELECT count(*) FROM channels c WHERE c.section_id = s.id AND c.is_active = true), + (SELECT count(*) + FROM messages m + JOIN channels c ON c.id = m.channel_id + WHERE c.section_id = s.id), + (SELECT count(*) + FROM messages m + JOIN channels c ON c.id = m.channel_id + WHERE c.section_id = s.id + AND ( + (s.vertical = 'hr' AND m.extracted->'hr_lead'->>'is_lead' = 'true') + OR (s.vertical <> 'hr' AND m.extracted->'lead'->>'is_listing' = 'true') + )) + FROM sections s + WHERE s.vertical = $1`+deptFilter+` + ORDER BY s.created_at ASC, s.id ASC + `, args...) + if err != nil { + writeDBError(w, err) + return + } + defer rows.Close() + + out := []sectionOut{} + for rows.Next() { + item, err := scanSection(rows) + if err != nil { + writeDBError(w, err) + return + } + out = append(out, item) + } + if err := rows.Err(); err != nil { + writeDBError(w, err) + return + } + writeJSON(w, http.StatusOK, out) +} + +func (a *app) createSection(ctx context.Context, w http.ResponseWriter, r *http.Request) { + scope, ok := a.readScope(w, r, true) + if !ok { + return + } + var payload struct { + Vertical string `json:"vertical"` + Slug string `json:"slug"` + Title string `json:"title"` + Emoji *string `json:"emoji"` + Description *string `json:"description"` + } + if !readBody(w, r, &payload) { + return + } + payload.Vertical = strings.TrimSpace(payload.Vertical) + payload.Slug = normalizeSlug(payload.Slug) + payload.Title = strings.TrimSpace(payload.Title) + if payload.Vertical == "" || payload.Slug == "" || payload.Title == "" { + writeError(w, http.StatusBadRequest, "vertical, slug and title are required") + return + } + dept := nullableString(scope.DeptID) + row := a.db.QueryRow(ctx, ` + INSERT INTO sections (vertical, department_id, slug, title, emoji, description) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING id, vertical, COALESCE(department_id, ''), slug, title, 
COALESCE(emoji, ''), COALESCE(description, ''), created_at, + 0::bigint, 0::bigint, 0::bigint, 0::bigint + `, payload.Vertical, dept, payload.Slug, payload.Title, payload.Emoji, payload.Description) + item, err := scanSectionRow(row) + if err != nil { + writeDBError(w, err) + return + } + writeJSON(w, http.StatusCreated, item) +} + +func (a *app) handleSectionItem(w http.ResponseWriter, r *http.Request, path string) { + rest := strings.TrimPrefix(path, "/api/v1/sections/") + parts := strings.SplitN(rest, "/", 2) + if len(parts) != 2 { + writeError(w, http.StatusNotFound, "not found") + return + } + vertical, err := url.PathUnescape(parts[0]) + if err != nil { + writeError(w, http.StatusBadRequest, "bad vertical") + return + } + slug, err := url.PathUnescape(parts[1]) + if err != nil { + writeError(w, http.StatusBadRequest, "bad slug") + return + } + + switch r.Method { + case http.MethodGet: + a.getSection(r.Context(), w, r, vertical, slug) + case http.MethodPatch: + a.updateSection(r.Context(), w, r, vertical, slug) + case http.MethodDelete: + a.deleteSection(r.Context(), w, r, vertical, slug) + default: + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + } +} + +func (a *app) getSection(ctx context.Context, w http.ResponseWriter, r *http.Request, vertical, slug string) { + scope, ok := a.readScope(w, r, false) + if !ok { + return + } + item, err := a.findSection(ctx, vertical, slug, scope) + if err != nil { + writeDBError(w, err) + return + } + writeJSON(w, http.StatusOK, item) +} + +func (a *app) updateSection(ctx context.Context, w http.ResponseWriter, r *http.Request, vertical, slug string) { + scope, ok := a.readScope(w, r, true) + if !ok { + return + } + var payload struct { + Title *string `json:"title"` + Emoji *string `json:"emoji"` + Description *string `json:"description"` + } + if !readBody(w, r, &payload) { + return + } + set := []string{} + args := []any{} + if payload.Title != nil { + args = append(args, 
strings.TrimSpace(*payload.Title)) + set = append(set, fmt.Sprintf("title = $%d", len(args))) + } + if payload.Emoji != nil { + args = append(args, nullableString(strings.TrimSpace(*payload.Emoji))) + set = append(set, fmt.Sprintf("emoji = $%d", len(args))) + } + if payload.Description != nil { + args = append(args, nullableString(strings.TrimSpace(*payload.Description))) + set = append(set, fmt.Sprintf("description = $%d", len(args))) + } + if len(set) == 0 { + a.getSection(ctx, w, r, vertical, slug) + return + } + args = append(args, vertical, slug) + where := fmt.Sprintf("vertical = $%d AND slug = $%d", len(args)-1, len(args)) + if !scope.IsAdmin { + args = append(args, scope.DeptID) + where += fmt.Sprintf(" AND department_id = $%d", len(args)) + } + row := a.db.QueryRow(ctx, ` + UPDATE sections + SET `+strings.Join(set, ", ")+` + WHERE `+where+` + RETURNING id, vertical, COALESCE(department_id, ''), slug, title, COALESCE(emoji, ''), COALESCE(description, ''), created_at, + (SELECT count(*) FROM channels c WHERE c.section_id = sections.id), + (SELECT count(*) FROM channels c WHERE c.section_id = sections.id AND c.is_active = true), + (SELECT count(*) FROM messages m JOIN channels c ON c.id = m.channel_id WHERE c.section_id = sections.id), + 0::bigint + `, args...) 
+ item, err := scanSectionRow(row) + if err != nil { + writeDBError(w, err) + return + } + writeJSON(w, http.StatusOK, item) +} + +func (a *app) deleteSection(ctx context.Context, w http.ResponseWriter, r *http.Request, vertical, slug string) { + scope, ok := a.readScope(w, r, true) + if !ok { + return + } + section, err := a.findSection(ctx, vertical, slug, scope) + if err != nil { + writeDBError(w, err) + return + } + var count int64 + if err := a.db.QueryRow(ctx, `SELECT count(*) FROM channels WHERE section_id = $1`, section.ID).Scan(&count); err != nil { + writeDBError(w, err) + return + } + if count > 0 { + writeError(w, http.StatusBadRequest, fmt.Sprintf("section has %d channels - move or delete them first", count)) + return + } + if _, err := a.db.Exec(ctx, `DELETE FROM sections WHERE id = $1`, section.ID); err != nil { + writeDBError(w, err) + return + } + w.WriteHeader(http.StatusNoContent) +} + +func (a *app) findSection(ctx context.Context, vertical, slug string, scope accessScope) (sectionOut, error) { + args := []any{vertical, slug} + where := "s.vertical = $1 AND s.slug = $2" + if !scope.IsAdmin { + args = append(args, scope.DeptID) + where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) + } + row := a.db.QueryRow(ctx, ` + SELECT s.id, s.vertical, COALESCE(s.department_id, ''), s.slug, s.title, COALESCE(s.emoji, ''), COALESCE(s.description, ''), s.created_at, + (SELECT count(*) FROM channels c WHERE c.section_id = s.id), + (SELECT count(*) FROM channels c WHERE c.section_id = s.id AND c.is_active = true), + (SELECT count(*) FROM messages m JOIN channels c ON c.id = m.channel_id WHERE c.section_id = s.id), + 0::bigint + FROM sections s + WHERE `+where+` + `, args...) 
+ return scanSectionRow(row) +} + +func (a *app) handleChannels(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + a.listChannels(r.Context(), w, r) + case http.MethodPost: + a.createChannel(r.Context(), w, r) + default: + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + } +} + +func (a *app) listChannels(ctx context.Context, w http.ResponseWriter, r *http.Request) { + scope, ok := a.readScope(w, r, false) + if !ok { + return + } + vertical := queryRequired(w, r, "vertical") + if vertical == "" { + return + } + section := strings.TrimSpace(r.URL.Query().Get("section")) + args := []any{vertical} + where := "c.vertical = $1" + if section != "" { + args = append(args, section) + where += fmt.Sprintf(" AND s.slug = $%d", len(args)) + } + if !scope.IsAdmin { + args = append(args, scope.DeptID) + where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) + } + rows, err := a.db.Query(ctx, ` + SELECT c.id, c.tg_id, c.identifier, c.title, c.vertical, c.section_id, s.slug, + c.is_active, c.last_message_id, c.last_polled_at, c.created_at + FROM channels c + JOIN sections s ON s.id = c.section_id + WHERE `+where+` + ORDER BY c.created_at DESC, c.id DESC + `, args...) 
+ if err != nil { + writeDBError(w, err) + return + } + defer rows.Close() + out := []channelOut{} + for rows.Next() { + item, err := scanChannel(rows) + if err != nil { + writeDBError(w, err) + return + } + out = append(out, item) + } + if err := rows.Err(); err != nil { + writeDBError(w, err) + return + } + writeJSON(w, http.StatusOK, out) +} + +func (a *app) createChannel(ctx context.Context, w http.ResponseWriter, r *http.Request) { + scope, ok := a.readScope(w, r, true) + if !ok { + return + } + var payload struct { + Identifier string `json:"identifier"` + Vertical string `json:"vertical"` + Section string `json:"section"` + } + if !readBody(w, r, &payload) { + return + } + payload.Identifier = strings.TrimSpace(payload.Identifier) + payload.Vertical = strings.TrimSpace(payload.Vertical) + payload.Section = strings.TrimSpace(payload.Section) + if payload.Identifier == "" || payload.Vertical == "" || payload.Section == "" { + writeError(w, http.StatusBadRequest, "identifier, vertical and section are required") + return + } + section, err := a.findSection(ctx, payload.Vertical, payload.Section, scope) + if err != nil { + writeDBError(w, err) + return + } + row := a.db.QueryRow(ctx, ` + INSERT INTO channels (identifier, vertical, section_id, is_active) + VALUES ($1, $2, $3, true) + RETURNING id, tg_id, identifier, title, vertical, section_id, $4::text, is_active, last_message_id, last_polled_at, created_at + `, payload.Identifier, payload.Vertical, section.ID, section.Slug) + item, err := scanChannelRow(row) + if err != nil { + writeDBError(w, err) + return + } + writeJSON(w, http.StatusCreated, item) +} + +func (a *app) handleChannelItem(ctx context.Context, w http.ResponseWriter, r *http.Request, path string) { + rest := strings.TrimPrefix(path, "/api/v1/channels/") + parts := strings.Split(rest, "/") + if len(parts) == 0 { + writeError(w, http.StatusNotFound, "not found") + return + } + id, err := strconv.ParseInt(parts[0], 10, 64) + if err != nil { + 
writeError(w, http.StatusBadRequest, "bad channel id") + return + } + if len(parts) == 2 { + switch parts[1] { + case "poll", "backfill-media": + a.proxyPython(w, r, path) + case "reanalyze": + a.reanalyzeChannel(ctx, w, r, id) + case "stats": + a.channelStats(ctx, w, r, id) + default: + writeError(w, http.StatusNotFound, "not found") + } + return + } + + switch r.Method { + case http.MethodGet: + a.getChannel(ctx, w, r, id) + case http.MethodPatch: + a.updateChannel(ctx, w, r, id) + case http.MethodDelete: + a.deleteChannel(ctx, w, r, id) + default: + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + } +} + +func (a *app) getChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) { + scope, ok := a.readScope(w, r, false) + if !ok { + return + } + channel, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")) + if err != nil { + writeDBError(w, err) + return + } + writeJSON(w, http.StatusOK, channel) +} + +func (a *app) updateChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) { + scope, ok := a.readScope(w, r, true) + if !ok { + return + } + if _, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil { + writeDBError(w, err) + return + } + var payload struct { + IsActive *bool `json:"is_active"` + Vertical *string `json:"vertical"` + Section *string `json:"section"` + } + if !readBody(w, r, &payload) { + return + } + set := []string{} + args := []any{} + if payload.IsActive != nil { + args = append(args, *payload.IsActive) + set = append(set, fmt.Sprintf("is_active = $%d", len(args))) + } + if payload.Vertical != nil { + args = append(args, strings.TrimSpace(*payload.Vertical)) + set = append(set, fmt.Sprintf("vertical = $%d", len(args))) + } + if payload.Section != nil && strings.TrimSpace(*payload.Section) != "" { + vertical := r.URL.Query().Get("vertical") + if payload.Vertical != nil && 
strings.TrimSpace(*payload.Vertical) != "" { + vertical = strings.TrimSpace(*payload.Vertical) + } + section, err := a.findSection(ctx, vertical, strings.TrimSpace(*payload.Section), scope) + if err != nil { + writeDBError(w, err) + return + } + args = append(args, section.ID) + set = append(set, fmt.Sprintf("section_id = $%d", len(args))) + } + if len(set) == 0 { + a.getChannel(ctx, w, r, id) + return + } + args = append(args, id) + row := a.db.QueryRow(ctx, ` + UPDATE channels + SET `+strings.Join(set, ", ")+` + WHERE id = $`+strconv.Itoa(len(args))+` + RETURNING id, tg_id, identifier, title, vertical, section_id, + (SELECT slug FROM sections WHERE id = channels.section_id), + is_active, last_message_id, last_polled_at, created_at + `, args...) + item, err := scanChannelRow(row) + if err != nil { + writeDBError(w, err) + return + } + writeJSON(w, http.StatusOK, item) +} + +func (a *app) deleteChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) { + scope, ok := a.readScope(w, r, true) + if !ok { + return + } + if _, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil { + writeDBError(w, err) + return + } + if _, err := a.db.Exec(ctx, `DELETE FROM channels WHERE id = $1`, id); err != nil { + writeDBError(w, err) + return + } + w.WriteHeader(http.StatusNoContent) +} + +func (a *app) findChannel(ctx context.Context, id int64, scope accessScope, vertical, section string) (channelOut, error) { + args := []any{id} + where := "c.id = $1" + if strings.TrimSpace(vertical) != "" { + args = append(args, strings.TrimSpace(vertical)) + where += fmt.Sprintf(" AND c.vertical = $%d", len(args)) + } + if strings.TrimSpace(section) != "" { + args = append(args, strings.TrimSpace(section)) + where += fmt.Sprintf(" AND s.slug = $%d", len(args)) + } + if !scope.IsAdmin { + args = append(args, scope.DeptID) + where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) + } + row := a.db.QueryRow(ctx, 
` + SELECT c.id, c.tg_id, c.identifier, c.title, c.vertical, c.section_id, s.slug, + c.is_active, c.last_message_id, c.last_polled_at, c.created_at + FROM channels c + JOIN sections s ON s.id = c.section_id + WHERE `+where+` + `, args...) + return scanChannelRow(row) +} + +func (a *app) channelStats(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) { + scope, ok := a.readScope(w, r, false) + if !ok { + return + } + ch, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")) + if err != nil { + writeDBError(w, err) + return + } + var messages, leads int64 + key := "lead" + field := "is_listing" + if ch.Vertical == verticalHR { + key = "hr_lead" + field = "is_lead" + } + if err := a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE channel_id = $1`, id).Scan(&messages); err != nil { + writeDBError(w, err) + return + } + if err := a.db.QueryRow(ctx, `SELECT count(*) FROM messages WHERE channel_id = $1 AND extracted -> $2 ->> $3 = 'true'`, id, key, field).Scan(&leads); err != nil { + writeDBError(w, err) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "channel_id": id, + "messages_total": messages, + "leads_total": leads, + "last_polled_at": ch.LastPolledAt, + "last_message_id": ch.LastMessageID, + }) +} + +func (a *app) reanalyzeChannel(ctx context.Context, w http.ResponseWriter, r *http.Request, id int64) { + scope, ok := a.readScope(w, r, true) + if !ok { + return + } + ch, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")) + if err != nil { + writeDBError(w, err) + return + } + key := verdictKey(ch.Vertical) + tag, err := a.db.Exec(ctx, `UPDATE messages SET extracted = COALESCE(extracted, '{}'::jsonb) - $1 WHERE channel_id = $2`, key, id) + if err != nil { + writeDBError(w, err) + return + } + pending, err := a.pendingLLM(ctx, scope, ch.Vertical, valueOrEmpty(ch.SectionSlug)) + if err != nil { + writeDBError(w, err) + return + } + 
writeJSON(w, http.StatusOK, map[string]int64{"updated": tag.RowsAffected(), "pending": pending}) +} + +func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + scope, ok := a.readScope(w, r, false) + if !ok { + return + } + vertical := queryRequired(w, r, "vertical") + if vertical == "" { + return + } + q := r.URL.Query() + limit := clampInt(queryInt(q.Get("limit"), 50), 1, 500) + offset := maxInt(queryInt(q.Get("offset"), 0), 0) + args := []any{vertical} + where := "c.vertical = $1" + if section := strings.TrimSpace(q.Get("section")); section != "" { + args = append(args, section) + where += fmt.Sprintf(" AND s.slug = $%d", len(args)) + } + if channelID := strings.TrimSpace(q.Get("channel_id")); channelID != "" { + id, err := strconv.ParseInt(channelID, 10, 64) + if err != nil { + writeError(w, http.StatusBadRequest, "bad channel_id") + return + } + args = append(args, id) + where += fmt.Sprintf(" AND m.channel_id = $%d", len(args)) + } + if text := strings.TrimSpace(q.Get("q")); text != "" { + args = append(args, "%"+text+"%") + where += fmt.Sprintf(" AND m.text ILIKE $%d", len(args)) + } + if q.Get("leads_only") == "true" || q.Get("leads_only") == "1" { + key := "lead" + field := "is_listing" + if vertical == verticalHR { + key = "hr_lead" + field = "is_lead" + } + args = append(args, key, field) + where += fmt.Sprintf(" AND m.extracted -> $%d ->> $%d = 'true'", len(args)-1, len(args)) + } + if !scope.IsAdmin { + args = append(args, scope.DeptID) + where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) + } + args = append(args, limit, offset) + rows, err := a.db.Query(ctx, ` + SELECT m.id, m.channel_id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int, + m.date, m.text, m.sender_id, m.sender_username, m.sender_name, + c.identifier, c.tg_id, m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text, + 
m.views, m.forwards, m.fetched_at + FROM messages m + JOIN channels c ON c.id = m.channel_id + JOIN sections s ON s.id = c.section_id + WHERE `+where+` + ORDER BY m.date DESC, m.id DESC + LIMIT $`+strconv.Itoa(len(args)-1)+` OFFSET $`+strconv.Itoa(len(args))+` + `, args...) + if err != nil { + writeDBError(w, err) + return + } + defer rows.Close() + out := []messageOut{} + for rows.Next() { + item, err := scanMessage(rows) + if err != nil { + writeDBError(w, err) + return + } + out = append(out, item) + } + if err := rows.Err(); err != nil { + writeDBError(w, err) + return + } + writeJSON(w, http.StatusOK, out) +} + +func (a *app) handleMessageItem(ctx context.Context, w http.ResponseWriter, r *http.Request, path string) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + scope, ok := a.readScope(w, r, false) + if !ok { + return + } + idRaw := strings.TrimPrefix(path, "/api/v1/messages/") + id, err := strconv.ParseInt(idRaw, 10, 64) + if err != nil { + writeError(w, http.StatusBadRequest, "bad message id") + return + } + args := []any{id} + where := "m.id = $1" + if !scope.IsAdmin { + args = append(args, scope.DeptID) + where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) + } + row := a.db.QueryRow(ctx, ` + SELECT m.id, m.channel_id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int, + m.date, m.text, m.sender_id, m.sender_username, m.sender_name, + c.identifier, c.tg_id, m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text, + m.views, m.forwards, m.fetched_at + FROM messages m + JOIN channels c ON c.id = m.channel_id + JOIN sections s ON s.id = c.section_id + WHERE `+where+` + `, args...) 
+ item, err := scanMessageRow(row) + if err != nil { + writeDBError(w, err) + return + } + writeJSON(w, http.StatusOK, item) +} + +func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + scope, ok := a.readScope(w, r, false) + if !ok { + return + } + vertical := queryRequired(w, r, "vertical") + if vertical == "" { + return + } + section := strings.TrimSpace(r.URL.Query().Get("section")) + args := []any{vertical} + where := "c.vertical = $1" + if section != "" { + args = append(args, section) + where += fmt.Sprintf(" AND s.slug = $%d", len(args)) + } + if !scope.IsAdmin { + args = append(args, scope.DeptID) + where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) + } + + var channelsTotal, channelsActive, messagesTotal, messages24h, leadsTotal, leads24h int64 + var lastPoll sql.NullTime + countQuery := `FROM channels c JOIN sections s ON s.id = c.section_id WHERE ` + where + if err := a.db.QueryRow(ctx, `SELECT count(*) `+countQuery, args...).Scan(&channelsTotal); err != nil { + writeDBError(w, err) + return + } + if err := a.db.QueryRow(ctx, `SELECT count(*) `+countQuery+` AND c.is_active = true`, args...).Scan(&channelsActive); err != nil { + writeDBError(w, err) + return + } + if err := a.db.QueryRow(ctx, `SELECT max(c.last_polled_at) `+countQuery, args...).Scan(&lastPoll); err != nil { + writeDBError(w, err) + return + } + messageFrom := `FROM messages m JOIN channels c ON c.id = m.channel_id JOIN sections s ON s.id = c.section_id WHERE ` + where + if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom, args...).Scan(&messagesTotal); err != nil { + writeDBError(w, err) + return + } + if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom+` AND m.fetched_at >= now() - interval '24 hours'`, args...).Scan(&messages24h); err != nil { + writeDBError(w, err) + return + } + key := "lead" + field := 
"is_listing" + if vertical == verticalHR { + key = "hr_lead" + field = "is_lead" + } + leadArgs := append(append([]any{}, args...), key, field) + leadClause := fmt.Sprintf(" AND m.extracted -> $%d ->> $%d = 'true'", len(leadArgs)-1, len(leadArgs)) + if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom+leadClause, leadArgs...).Scan(&leadsTotal); err != nil { + writeDBError(w, err) + return + } + if err := a.db.QueryRow(ctx, `SELECT count(*) `+messageFrom+leadClause+` AND m.fetched_at >= now() - interval '24 hours'`, leadArgs...).Scan(&leads24h); err != nil { + writeDBError(w, err) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "vertical": vertical, + "section_slug": nullableString(section), + "channels_total": channelsTotal, + "channels_active": channelsActive, + "messages_total": messagesTotal, + "messages_last_24h": messages24h, + "leads_total": leadsTotal, + "leads_last_24h": leads24h, + "poll_interval_seconds": a.cfg.PollIntervalSeconds, + "last_poll_at": nullTime(lastPoll), + }) +} + +func (a *app) handleLLMStatus(ctx context.Context, w http.ResponseWriter) { + ready := false + if a.cfg.LLMEnabled { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, strings.TrimRight(a.cfg.LLMBaseURL, "/")+"/v1/models", nil) + if err == nil { + if a.cfg.LLMAPIKey != "" { + req.Header.Set("Authorization", "Bearer "+a.cfg.LLMAPIKey) + } + resp, err := a.http.Do(req) + if err == nil { + ready = resp.StatusCode >= 200 && resp.StatusCode < 300 + _ = resp.Body.Close() + } + } + } + writeJSON(w, http.StatusOK, map[string]any{ + "enabled": a.cfg.LLMEnabled, + "ready": ready, + "base_url": a.cfg.LLMBaseURL, + "model": a.cfg.LLMModel, + }) +} + +func (a *app) handleLLMQueue(ctx context.Context, w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + scope, ok := a.readScope(w, r, false) + if !ok { + return + } + vertical := queryRequired(w, r, "vertical") 
+ if vertical == "" { + return + } + pending, err := a.pendingLLM(ctx, scope, vertical, strings.TrimSpace(r.URL.Query().Get("section"))) + if err != nil { + writeDBError(w, err) + return + } + writeJSON(w, http.StatusOK, map[string]int64{"pending": pending}) +} + +func (a *app) pendingLLM(ctx context.Context, scope accessScope, vertical, section string) (int64, error) { + args := []any{vertical} + where := `c.vertical = $1 AND m.text IS NOT NULL AND ((c.vertical = 'hr' AND (m.extracted IS NULL OR m.extracted->'hr_lead' IS NULL)) OR (c.vertical <> 'hr' AND (m.extracted IS NULL OR m.extracted->'lead' IS NULL)))` + if section != "" { + args = append(args, section) + where += fmt.Sprintf(" AND s.slug = $%d", len(args)) + } + if !scope.IsAdmin { + args = append(args, scope.DeptID) + where += fmt.Sprintf(" AND s.department_id = $%d", len(args)) + } + var pending int64 + err := a.db.QueryRow(ctx, ` + SELECT count(*) + FROM messages m + JOIN channels c ON c.id = m.channel_id + JOIN sections s ON s.id = c.section_id + WHERE `+where, args...).Scan(&pending) + return pending, err +} + +func (a *app) handlePrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + a.getPrompt(ctx, w, r) + case http.MethodPut: + a.savePrompt(ctx, w, r) + case http.MethodDelete: + a.resetPrompt(ctx, w, r) + default: + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + } +} + +func (a *app) getPrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) { + scope, ok := a.readScope(w, r, false) + if !ok { + return + } + vertical := queryRequired(w, r, "vertical") + if vertical == "" { + return + } + section := strings.TrimSpace(r.URL.Query().Get("section")) + if section != "" { + if _, err := a.findSection(ctx, vertical, section, scope); err != nil { + writeDBError(w, err) + return + } + } + prompt, source, err := a.resolvePrompt(ctx, scope.DeptID, vertical, section) + if err != nil { + writeDBError(w, err) + return + 
} + overridden, err := a.promptExists(ctx, scope.DeptID, vertical, section) + if err != nil { + writeDBError(w, err) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "vertical": vertical, + "department_id": nullableString(scope.DeptID), + "section": nullableString(section), + "prompt": prompt, + "default": defaultPrompt(vertical), + "source": source, + "is_overridden_here": overridden, + }) +} + +func (a *app) savePrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) { + scope, ok := a.readScope(w, r, true) + if !ok { + return + } + vertical := queryRequired(w, r, "vertical") + if vertical == "" { + return + } + section := strings.TrimSpace(r.URL.Query().Get("section")) + var payload struct { + Prompt string `json:"prompt"` + } + if !readBody(w, r, &payload) { + return + } + text := strings.TrimSpace(payload.Prompt) + if text == "" { + writeError(w, http.StatusBadRequest, "prompt must be a non-empty string") + return + } + if len(text) > 30000 { + writeError(w, http.StatusBadRequest, "prompt is too long (max 30000 chars)") + return + } + key := promptKey(scope.DeptID, vertical, section) + value, _ := json.Marshal(text) + if _, err := a.db.Exec(ctx, ` + INSERT INTO app_settings (key, value, updated_at) + VALUES ($1, $2::jsonb, now()) + ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = now() + `, key, string(value)); err != nil { + writeDBError(w, err) + return + } + writeJSON(w, http.StatusOK, map[string]any{"saved": true, "vertical": vertical, "department_id": nullableString(scope.DeptID), "section": nullableString(section), "length": len(text)}) +} + +func (a *app) resetPrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) { + scope, ok := a.readScope(w, r, true) + if !ok { + return + } + vertical := queryRequired(w, r, "vertical") + if vertical == "" { + return + } + section := strings.TrimSpace(r.URL.Query().Get("section")) + if _, err := a.db.Exec(ctx, `DELETE FROM app_settings WHERE key = $1`, 
promptKey(scope.DeptID, vertical, section)); err != nil { + writeDBError(w, err) + return + } + w.WriteHeader(http.StatusNoContent) +} + +func (a *app) resolvePrompt(ctx context.Context, deptID, vertical, section string) (string, string, error) { + keys := []struct { + key string + source string + }{} + if section != "" { + keys = append(keys, struct { + key string + source string + }{promptKey(deptID, vertical, section), "section"}) + } + keys = append(keys, struct { + key string + source string + }{promptKey(deptID, vertical, ""), "vertical"}) + + for _, candidate := range keys { + var text string + err := a.db.QueryRow(ctx, `SELECT value #>> '{}' FROM app_settings WHERE key = $1`, candidate.key).Scan(&text) + if err == nil && strings.TrimSpace(text) != "" { + return text, candidate.source, nil + } + if err != nil && !errors.Is(err, pgx.ErrNoRows) { + return "", "", err + } + } + return defaultPrompt(vertical), "default", nil +} + +func (a *app) promptExists(ctx context.Context, deptID, vertical, section string) (bool, error) { + var exists bool + err := a.db.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM app_settings WHERE key = $1)`, promptKey(deptID, vertical, section)).Scan(&exists) + return exists, err +} + +func (a *app) proxyPython(w http.ResponseWriter, r *http.Request, path string) { + scope := readAccess(r) + if strings.Contains(path, "/auth/") && !scope.IsAdmin { + writeError(w, http.StatusNotFound, "not found") + return + } + if (strings.Contains(path, "/poll") || strings.Contains(path, "/backfill-media")) && !scope.CanManage { + writeError(w, http.StatusNotFound, "not found") + return + } + target := strings.TrimRight(a.cfg.PythonBaseURL, "/") + path + if r.URL.RawQuery != "" { + target += "?" 
+ r.URL.RawQuery + } + body, err := io.ReadAll(r.Body) + if err != nil { + writeError(w, http.StatusBadRequest, "bad body") + return + } + req, err := http.NewRequestWithContext(r.Context(), r.Method, target, bytes.NewReader(body)) + if err != nil { + writeError(w, http.StatusBadGateway, err.Error()) + return + } + req.Header = r.Header.Clone() + resp, err := a.python.Do(req) + if err != nil { + writeError(w, http.StatusBadGateway, err.Error()) + return + } + defer resp.Body.Close() + for k, values := range resp.Header { + for _, v := range values { + w.Header().Add(k, v) + } + } + w.WriteHeader(resp.StatusCode) + _, _ = io.Copy(w, resp.Body) +} + +func (a *app) readScope(w http.ResponseWriter, r *http.Request, manage bool) (accessScope, bool) { + scope := readAccess(r) + if manage { + if !scope.CanManage { + writeError(w, http.StatusNotFound, "not found") + return scope, false + } + } else if !scope.IsAdmin && scope.DeptID == "" { + writeError(w, http.StatusForbidden, "department is required") + return scope, false + } + if manage && !scope.IsAdmin && scope.DeptID == "" { + writeError(w, http.StatusForbidden, "department is required") + return scope, false + } + return scope, true +} + +func readAccess(r *http.Request) accessScope { + admin := r.Header.Get("X-User-Is-Admin") == "1" + deptHead := r.Header.Get("X-User-Is-Department-Head") == "1" + return accessScope{ + IsAdmin: admin, + CanManage: admin || deptHead, + DeptID: strings.TrimSpace(r.Header.Get("X-User-Department-Id")), + } +} + +type rowScanner interface { + Scan(dest ...any) error +} + +func scanSection(rows rowScanner) (sectionOut, error) { + var item sectionOut + var dept, emoji, description string + err := rows.Scan( + &item.ID, + &item.Vertical, + &dept, + &item.Slug, + &item.Title, + &emoji, + &description, + &item.CreatedAt, + &item.ChannelsTotal, + &item.ChannelsActive, + &item.MessagesTotal, + &item.LeadsTotal, + ) + item.DepartmentID = nullableString(dept) + item.Emoji = nullableString(emoji) 
+ item.Description = nullableString(description) + return item, err +} + +func scanSectionRow(row pgx.Row) (sectionOut, error) { + return scanSection(row) +} + +func scanChannel(rows rowScanner) (channelOut, error) { + var item channelOut + var tgID, lastMsg sql.NullInt64 + var title, slug sql.NullString + var lastPoll sql.NullTime + err := rows.Scan( + &item.ID, + &tgID, + &item.Identifier, + &title, + &item.Vertical, + &item.SectionID, + &slug, + &item.IsActive, + &lastMsg, + &lastPoll, + &item.CreatedAt, + ) + item.TGID = nullInt(tgID) + item.Title = nullString(title) + item.SectionSlug = nullString(slug) + item.LastMessageID = nullInt(lastMsg) + item.LastPolledAt = nullTimePtr(lastPoll) + return item, err +} + +func scanChannelRow(row pgx.Row) (channelOut, error) { + return scanChannel(row) +} + +func scanMessage(rows rowScanner) (messageOut, error) { + var item messageOut + var vertical, slug string + var grouped, senderID, tgID, views, forwards sql.NullInt64 + var text, senderUsername, senderName, identifier sql.NullString + var extractedText string + err := rows.Scan( + &item.ID, + &item.ChannelID, + &vertical, + &slug, + &item.TGMessageID, + &grouped, + &item.GroupSize, + &item.Date, + &text, + &senderID, + &senderUsername, + &senderName, + &identifier, + &tgID, + &item.HasMedia, + &extractedText, + &views, + &forwards, + &item.FetchedAt, + ) + item.ChannelVertical = nullableString(vertical) + item.ChannelSectionSlug = nullableString(slug) + item.GroupedID = nullInt(grouped) + item.Text = nullString(text) + item.SenderID = nullInt(senderID) + item.SenderUsername = nullString(senderUsername) + item.SenderName = nullString(senderName) + item.Views = nullInt(views) + item.Forwards = nullInt(forwards) + if extractedText != "" && extractedText != "null" { + item.Extracted = json.RawMessage(extractedText) + } + item.PostURL = buildPostURL(identifier.String, tgID, item.TGMessageID) + return item, err +} + +func scanMessageRow(row pgx.Row) (messageOut, error) { + 
return scanMessage(row) +} + +func buildPostURL(identifier string, tgID sql.NullInt64, msgID int64) *string { + identifier = strings.TrimSpace(identifier) + if identifier == "" || msgID == 0 { + return nil + } + if strings.HasPrefix(identifier, "https://t.me/") { + base := strings.TrimRight(identifier, "/") + v := base + "/" + strconv.FormatInt(msgID, 10) + return &v + } + if strings.HasPrefix(identifier, "@") { + v := "https://t.me/" + strings.TrimPrefix(identifier, "@") + "/" + strconv.FormatInt(msgID, 10) + return &v + } + if !strings.Contains(identifier, "/") && !strings.Contains(identifier, "+") { + v := "https://t.me/" + identifier + "/" + strconv.FormatInt(msgID, 10) + return &v + } + return nil +} + +func readBody(w http.ResponseWriter, r *http.Request, out any) bool { + defer r.Body.Close() + if err := json.NewDecoder(r.Body).Decode(out); err != nil { + writeError(w, http.StatusBadRequest, "invalid json") + return false + } + return true +} + +func writeJSON(w http.ResponseWriter, status int, payload any) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(payload) +} + +func writeError(w http.ResponseWriter, status int, detail string) { + writeJSON(w, status, map[string]string{"detail": detail}) +} + +func writeDBError(w http.ResponseWriter, err error) { + if errors.Is(err, pgx.ErrNoRows) { + writeError(w, http.StatusNotFound, "not found") + return + } + slog.Error("api_db_error", "error", err) + writeError(w, http.StatusInternalServerError, "database error") +} + +func queryRequired(w http.ResponseWriter, r *http.Request, name string) string { + value := strings.TrimSpace(r.URL.Query().Get(name)) + if value == "" { + writeError(w, http.StatusBadRequest, name+" is required") + } + return value +} + +func promptKey(deptID, vertical, section string) string { + if deptID == "" { + deptID = "global" + } + if section != "" { + return fmt.Sprintf("llm_system_prompt:%s:%s:%s", deptID, 
vertical, section) + } + return fmt.Sprintf("llm_system_prompt:%s:%s", deptID, vertical) +} + +func verdictKey(vertical string) string { + if vertical == verticalHR { + return "hr_lead" + } + return "lead" +} + +func defaultPrompt(vertical string) string { + if vertical == verticalHR { + return defaultHRPrompt + } + return defaultREPrompt +} + +func normalizeSlug(raw string) string { + raw = strings.ToLower(strings.TrimSpace(raw)) + raw = strings.ReplaceAll(raw, " ", "-") + var b strings.Builder + for _, r := range raw { + if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '_' || r == '-' { + b.WriteRune(r) + } + } + return b.String() +} + +func nullableString(v string) *string { + v = strings.TrimSpace(v) + if v == "" { + return nil + } + return &v +} + +func nullString(v sql.NullString) *string { + if !v.Valid || strings.TrimSpace(v.String) == "" { + return nil + } + return &v.String +} + +func nullInt(v sql.NullInt64) *int64 { + if !v.Valid { + return nil + } + return &v.Int64 +} + +func nullTime(v sql.NullTime) any { + if !v.Valid { + return nil + } + return v.Time +} + +func nullTimePtr(v sql.NullTime) *time.Time { + if !v.Valid { + return nil + } + return &v.Time +} + +func valueOrEmpty(v *string) string { + if v == nil { + return "" + } + return *v +} + +func queryInt(raw string, fallback int) int { + if raw == "" { + return fallback + } + n, err := strconv.Atoi(raw) + if err != nil { + return fallback + } + return n +} + +func clampInt(n, min, max int) int { + if n < min { + return min + } + if n > max { + return max + } + return n +} + +func maxInt(n, min int) int { + if n < min { + return min + } + return n +} + +func loadConfig() config { + return config{ + APIHost: env("API_HOST", "0.0.0.0"), + APIPort: envInt("API_PORT", 8000), + PublicBasePath: env("PUBLIC_BASE_PATH", ""), + PythonBaseURL: env("PYTHON_BASE_URL", "http://127.0.0.1:8001"), + PostgresUser: env("POSTGRES_USER", "parser"), + PostgresPassword: env("POSTGRES_PASSWORD", "parser"), + 
PostgresDB: env("POSTGRES_DB", "parser"), + PostgresHost: env("POSTGRES_HOST", "db"), + PostgresPort: envInt("POSTGRES_PORT", 5432), + PollIntervalSeconds: envInt("POLL_INTERVAL_SECONDS", 60), + LLMEnabled: envBool("LLM_ENABLED", true), + LLMBaseURL: env("LLM_BASE_URL", "http://10.2.3.5:8002"), + LLMAPIKey: env("LLM_API_KEY", ""), + LLMModel: env("LLM_MODEL", "qwen2.5-14b"), + LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second, + } +} + +func (c config) databaseURL() string { + return fmt.Sprintf( + "postgres://%s:%s@%s:%d/%s", + url.QueryEscape(c.PostgresUser), + url.QueryEscape(c.PostgresPassword), + c.PostgresHost, + c.PostgresPort, + url.QueryEscape(c.PostgresDB), + ) +} + +func env(key, fallback string) string { + if v := strings.TrimSpace(os.Getenv(key)); v != "" { + return v + } + return fallback +} + +func envInt(key string, fallback int) int { + if raw := strings.TrimSpace(os.Getenv(key)); raw != "" { + if n, err := strconv.Atoi(raw); err == nil { + return n + } + } + return fallback +} + +func envBool(key string, fallback bool) bool { + if raw := strings.TrimSpace(os.Getenv(key)); raw != "" { + if b, err := strconv.ParseBool(raw); err == nil { + return b + } + if raw == "1" { + return true + } + if raw == "0" { + return false + } + } + return fallback +} + +const defaultREPrompt = `Ты — аналитик объявлений о недвижимости. Тебе дают текст из Telegram-канала. +Определи, является ли сообщение реальным объявлением о покупке, продаже или аренде недвижимости. 
+Отвечай строго валидным JSON без markdown: +{ + "is_listing": boolean, + "kind": "sale" | "rent" | "purchase" | null, + "property_type": string | null, + "rooms": string | null, + "area_m2": number | null, + "price_text": string | null, + "price_value": number | null, + "currency": "RUB" | "USD" | "EUR" | "AED" | "GBP" | "CNY" | "TRY" | "KZT" | "BYN" | "UAH" | null, + "location": string | null, + "contact_phone": string | null, + "contact_name": string | null, + "summary": string, + "confidence": number +} +summary всегда по-русски, confidence в диапазоне 0..1.` + +const defaultHRPrompt = `Ты — аналитик HR-объявлений. Тебе дают текст из Telegram-канала. +Определи, относится ли сообщение к рынку труда: вакансия, резюме или короткий HR-контакт. +Отвечай строго валидным JSON без markdown: +{ + "is_lead": boolean, + "kind": "vacancy" | "resume" | "contact" | null, + "title": string | null, + "company": string | null, + "candidate_name": string | null, + "experience_years": number | null, + "skills": string[], + "location": string | null, + "remote": boolean | null, + "employment_type": "full-time" | "part-time" | "contract" | "internship" | null, + "salary_text": string | null, + "salary_value": number | null, + "currency": "RUB" | "USD" | "EUR" | "AED" | "GBP" | "CNY" | "TRY" | "KZT" | "BYN" | "UAH" | null, + "contact_phone": string | null, + "contact_name": string | null, + "summary": string, + "confidence": number +} +summary всегда по-русски, confidence в диапазоне 0..1.` diff --git a/k8s/configmap.yaml b/k8s/configmap.yaml index 3a386ce..2764db0 100644 --- a/k8s/configmap.yaml +++ b/k8s/configmap.yaml @@ -7,6 +7,7 @@ data: API_HOST: "0.0.0.0" API_PORT: "8000" PUBLIC_BASE_PATH: "/api/monitoring-tg" + PYTHON_BASE_URL: "http://127.0.0.1:8001" POSTGRES_HOST: "postgres.monitoring-tg.svc.cluster.local" POSTGRES_PORT: "5432" POSTGRES_USER: "parser" diff --git a/k8s/server-deployment.yaml b/k8s/server-deployment.yaml index 5c0e08d..68806bc 100644 --- 
a/k8s/server-deployment.yaml +++ b/k8s/server-deployment.yaml @@ -31,6 +31,7 @@ spec: containers: - name: monitoring-tg-server image: localhost:30300/admin/monitoring-tg-server:latest + command: ["/usr/local/bin/monitoring-tg-server"] ports: - containerPort: 8000 envFrom: @@ -64,6 +65,43 @@ spec: limits: cpu: 800m memory: 1Gi + - name: monitoring-tg-telegram + image: localhost:30300/admin/monitoring-tg-server:latest + command: ["python", "-m", "parser_bot.main"] + envFrom: + - configMapRef: + name: monitoring-tg-config + - secretRef: + name: monitoring-tg-secrets + env: + - name: API_PORT + value: "8001" + volumeMounts: + - name: app-data + mountPath: /data + startupProbe: + httpGet: + path: /healthz + port: 8001 + periodSeconds: 5 + failureThreshold: 30 + livenessProbe: + httpGet: + path: /healthz + port: 8001 + periodSeconds: 10 + readinessProbe: + httpGet: + path: /healthz + port: 8001 + periodSeconds: 5 + resources: + requests: + cpu: 80m + memory: 256Mi + limits: + cpu: 800m + memory: 1Gi - name: monitoring-tg-classifier image: localhost:30300/admin/monitoring-tg-server:latest command: ["/usr/local/bin/monitoring-tg-classifier"]