diff --git a/cmd/server/main.go b/cmd/server/main.go
index 5318068..f7069d8 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -13,6 +13,7 @@ import (
 	"net/url"
 	"os"
 	"os/signal"
+	"path/filepath"
 	"strconv"
 	"strings"
 	"syscall"
@@ -31,6 +32,7 @@ type config struct {
 	APIPort          int
 	PublicBasePath   string
 	PythonBaseURL    string
+	MediaDir         string
 	PostgresUser     string
 	PostgresPassword string
 	PostgresDB       string
@@ -88,24 +90,25 @@ type channelOut struct {
 }
 
 type messageOut struct {
-	ID                 int64           `json:"id"`
-	ChannelID          int64           `json:"channel_id"`
-	ChannelVertical    *string         `json:"channel_vertical,omitempty"`
-	ChannelSectionSlug *string         `json:"channel_section_slug,omitempty"`
-	TGMessageID        int64           `json:"tg_message_id"`
-	GroupedID          *int64          `json:"grouped_id,omitempty"`
-	GroupSize          int             `json:"group_size"`
-	Date               time.Time       `json:"date"`
-	Text               *string         `json:"text,omitempty"`
-	SenderID           *int64          `json:"sender_id,omitempty"`
-	SenderUsername     *string         `json:"sender_username,omitempty"`
-	SenderName         *string         `json:"sender_name,omitempty"`
-	PostURL            *string         `json:"post_url,omitempty"`
-	HasMedia           bool            `json:"has_media"`
-	Extracted          json.RawMessage `json:"extracted,omitempty"`
-	Views              *int64          `json:"views,omitempty"`
-	Forwards           *int64          `json:"forwards,omitempty"`
-	FetchedAt          time.Time       `json:"fetched_at"`
+	ID                 int64            `json:"id"`
+	ChannelID          int64            `json:"channel_id"`
+	ChannelVertical    *string          `json:"channel_vertical,omitempty"`
+	ChannelSectionSlug *string          `json:"channel_section_slug,omitempty"`
+	TGMessageID        int64            `json:"tg_message_id"`
+	GroupedID          *int64           `json:"grouped_id,omitempty"`
+	GroupSize          int              `json:"group_size"`
+	Date               time.Time        `json:"date"`
+	Text               *string          `json:"text,omitempty"`
+	SenderID           *int64           `json:"sender_id,omitempty"`
+	SenderUsername     *string          `json:"sender_username,omitempty"`
+	SenderName         *string          `json:"sender_name,omitempty"`
+	PostURL            *string          `json:"post_url,omitempty"`
+	HasMedia           bool             `json:"has_media"`
+	MediaFiles         []map[string]any `json:"media_files,omitempty"`
+	Extracted          json.RawMessage  `json:"extracted,omitempty"`
+	Views              *int64           `json:"views,omitempty"`
+	Forwards           *int64           `json:"forwards,omitempty"`
+	FetchedAt          time.Time        `json:"fetched_at"`
 }
 
 func main() {
@@ -179,6 +182,8 @@ func (a *app) serveHTTP(w http.ResponseWriter, r *http.Request) {
 		a.handleChannels(w, r)
 	case strings.HasPrefix(path, "/api/v1/channels/"):
 		a.handleChannelItem(ctx, w, r, path)
+	case strings.HasPrefix(path, "/api/v1/media/"):
+		a.handleMedia(w, r, path)
 	case path == "/api/v1/messages":
 		a.handleMessages(ctx, w, r)
 	case strings.HasPrefix(path, "/api/v1/messages/"):
@@ -835,12 +840,16 @@ func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http
 		args = append(args, scope.DeptID)
 		where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
 	}
-	args = append(args, limit, offset)
+	// Fetch more rows than requested: rows sharing a grouped_id are collapsed into one
+	// item by aggregateMessages below, and the page should still hold up to limit items.
+	// NOTE(review): offset still counts raw rows, not aggregated items - confirm paging.
+	fetchLimit := clampInt(limit*5, limit, 1000)
+	args = append(args, fetchLimit, offset)
 	rows, err := a.db.Query(ctx, `
 		SELECT m.id, m.channel_id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int, m.date, m.text,
 		       m.sender_id, m.sender_username, m.sender_name, c.identifier, c.tg_id, m.has_media,
 		       COALESCE(m.extracted, 'null'::jsonb)::text,
-		       m.views, m.forwards, m.fetched_at
+		       COALESCE(m.media_files, '[]'::jsonb)::text, m.views, m.forwards, m.fetched_at
 		FROM messages m
 		JOIN channels c ON c.id = m.channel_id
 		JOIN sections s ON s.id = c.section_id
@@ -860,12 +869,14 @@ func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http
 			writeDBError(w, err)
 			return
 		}
+		a.normalizeMessageMedia(&item)
 		out = append(out, item)
 	}
 	if err := rows.Err(); err != nil {
 		writeDBError(w, err)
 		return
 	}
+	out = aggregateMessages(out, limit)
 	writeJSON(w, http.StatusOK, out)
 }
 
@@ -894,7 +905,7 @@ func (a *app) handleMessageItem(ctx context.Context, w http.ResponseWriter, r *h
 		SELECT m.id, m.channel_id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int, m.date, m.text,
 		       m.sender_id, m.sender_username, m.sender_name, c.identifier, c.tg_id, m.has_media,
 		       COALESCE(m.extracted, 'null'::jsonb)::text,
-		       m.views, m.forwards, m.fetched_at
+		       COALESCE(m.media_files, '[]'::jsonb)::text, m.views, m.forwards, m.fetched_at
 		FROM messages m
 		JOIN channels c ON c.id = m.channel_id
 		JOIN sections s ON s.id = c.section_id
@@ -905,9 +916,94 @@ func (a *app) handleMessageItem(ctx context.Context, w http.ResponseWriter, r *h
 		writeDBError(w, err)
 		return
 	}
+	a.normalizeMessageMedia(&item)
 	writeJSON(w, http.StatusOK, item)
 }
 
+// handleMedia serves a stored media file for GET /api/v1/media/<channelID>/<file>.
+// The first path segment must be a channel id that the caller's scope may read
+// (see canReadChannelMedia); the file is resolved under cfg.MediaDir. Only regular
+// files are served, anything else is answered with 404.
+func (a *app) handleMedia(w http.ResponseWriter, r *http.Request, path string) {
+	if r.Method != http.MethodGet {
+		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
+		return
+	}
+	scope, ok := a.readScope(w, r, false)
+	if !ok {
+		return
+	}
+	rel, err := url.PathUnescape(strings.TrimPrefix(path, "/api/v1/media/"))
+	if err != nil {
+		writeError(w, http.StatusBadRequest, "bad media path")
+		return
+	}
+	// Cleaning the rooted path resolves every ".." before the leading "/" is dropped.
+	clean := strings.TrimPrefix(filepath.Clean("/"+rel), "/")
+	if clean == "." || clean == "" || strings.HasPrefix(clean, "../") {
+		writeError(w, http.StatusBadRequest, "bad media path")
+		return
+	}
+	parts := strings.SplitN(clean, string(os.PathSeparator), 2)
+	channelID, err := strconv.ParseInt(parts[0], 10, 64)
+	// The id must be in canonical decimal form so that the directory being read is
+	// exactly the channel that gets authorized below ("+5" or "005" also parse as 5).
+	if err != nil || channelID <= 0 || strconv.FormatInt(channelID, 10) != parts[0] {
+		writeError(w, http.StatusBadRequest, "bad media path")
+		return
+	}
+	allowed, err := a.canReadChannelMedia(r.Context(), scope, channelID)
+	if err != nil {
+		writeDBError(w, err)
+		return
+	}
+	if !allowed {
+		writeError(w, http.StatusNotFound, "not found")
+		return
+	}
+	base, err := filepath.Abs(a.cfg.MediaDir)
+	if err != nil {
+		writeError(w, http.StatusInternalServerError, "media directory unavailable")
+		return
+	}
+	full, err := filepath.Abs(filepath.Join(base, clean))
+	if err != nil || (full != base && !strings.HasPrefix(full, base+string(os.PathSeparator))) {
+		writeError(w, http.StatusBadRequest, "bad media path")
+		return
+	}
+	// http.ServeFile renders a directory listing when handed a directory, which would
+	// expose every file name stored for the channel; serve regular files only.
+	info, err := os.Stat(full)
+	if err != nil || !info.Mode().IsRegular() {
+		writeError(w, http.StatusNotFound, "not found")
+		return
+	}
+	http.ServeFile(w, r, full)
+}
+
+// canReadChannelMedia reports whether scope may read media of the given channel.
+// An unknown channel yields (false, nil); admins may read any existing channel;
+// everyone else needs the channel's section department to equal scope.DeptID.
+func (a *app) canReadChannelMedia(ctx context.Context, scope accessScope, channelID int64) (bool, error) {
+	var dept sql.NullString
+	err := a.db.QueryRow(ctx, `
+		SELECT s.department_id
+		FROM channels c
+		JOIN sections s ON s.id = c.section_id
+		WHERE c.id = $1
+	`, channelID).Scan(&dept)
+	if errors.Is(err, pgx.ErrNoRows) {
+		return false, nil
+	}
+	if err != nil {
+		return false, err
+	}
+	if scope.IsAdmin {
+		return true, nil
+	}
+	return dept.Valid && dept.String == scope.DeptID, nil
+}
+
 func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Request) {
 	if r.Method != http.MethodGet {
 		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
@@ -1329,7 +1425,7 @@ func scanMessage(rows rowScanner) (messageOut, error) {
 	var vertical, slug string
 	var grouped, senderID, tgID, views, forwards sql.NullInt64
 	var text, senderUsername, senderName, identifier sql.NullString
-	var extractedText string
+	var extractedText, mediaText string
 	err := rows.Scan(
 		&item.ID,
 		&item.ChannelID,
@@ -1347,6 +1443,7 @@ func scanMessage(rows rowScanner) (messageOut, error) {
 		&tgID,
 		&item.HasMedia,
 		&extractedText,
+		&mediaText,
 		&views,
 		&forwards,
 		&item.FetchedAt,
@@ -1363,6 +1460,13 @@ func scanMessage(rows rowScanner) (messageOut, error) {
 	if extractedText != "" && extractedText != "null" {
 		item.Extracted = json.RawMessage(extractedText)
 	}
+	if mediaText != "" && mediaText != "null" {
+		// Best effort: a malformed media_files value must not fail the whole row,
+		// but a partially decoded slice must not leak into the response either.
+		if decodeErr := json.Unmarshal([]byte(mediaText), &item.MediaFiles); decodeErr != nil {
+			item.MediaFiles = nil
+		}
+	}
 	item.PostURL = buildPostURL(identifier.String, tgID, item.TGMessageID)
 	return item, err
 }
@@ -1371,6 +1475,104 @@ func scanMessageRow(row pgx.Row) (messageOut, error) {
 	return scanMessage(row)
 }
 
+// normalizeMessageMedia rewrites, in place, the "url" of every media entry that
+// mediaRelativePath can resolve into a link to the media endpoint, and fills in a
+// missing "name" with the file's base name. Other entries are left untouched.
+func (a *app) normalizeMessageMedia(item *messageOut) {
+	if len(item.MediaFiles) == 0 {
+		return
+	}
+	for _, file := range item.MediaFiles {
+		rawURL, _ := file["url"].(string)
+		rel := mediaRelativePath(rawURL)
+		if rel == "" {
+			continue
+		}
+		file["url"] = a.mediaURL(rel)
+		if _, ok := file["name"].(string); !ok {
+			file["name"] = filepath.Base(rel)
+		}
+	}
+}
+
+// mediaRelativePath reduces a stored media reference (URL or path) to a clean path
+// relative to the media root: everything up to and including the first "/media/" is
+// dropped, a leading "/" or "media/" is stripped and ".." segments are resolved.
+// It returns "" when nothing usable remains.
+func mediaRelativePath(raw string) string {
+	raw = strings.TrimSpace(raw)
+	if raw == "" {
+		return ""
+	}
+	if idx := strings.Index(raw, "/media/"); idx >= 0 {
+		raw = raw[idx+len("/media/"):]
+	}
+	raw = strings.TrimPrefix(raw, "/")
+	raw = strings.TrimPrefix(raw, "media/")
+	clean := strings.TrimPrefix(filepath.Clean("/"+raw), "/")
+	if clean == "." || clean == "" || strings.HasPrefix(clean, "../") {
+		return ""
+	}
+	return clean
+}
+
+// mediaURL builds the public URL of a media file from its path relative to the
+// media root. Each path segment is percent-escaped so that names containing
+// spaces, "?" or "#" stay part of the URL path.
+func (a *app) mediaURL(rel string) string {
+	base := strings.TrimRight(a.cfg.PublicBasePath, "/")
+	segments := strings.Split(strings.TrimPrefix(rel, "/"), "/")
+	for i, seg := range segments {
+		segments[i] = url.PathEscape(seg)
+	}
+	return base + "/api/v1/media/" + strings.Join(segments, "/")
+}
+
+// aggregateMessages collapses rows that share a channel and grouped_id into the first
+// such row, preserving input order, and returns at most limit items. The surviving
+// row counts its members in GroupSize, ORs HasMedia, concatenates MediaFiles and
+// borrows Text/Extracted from a later member when its own are empty.
+// NOTE(review): a group cut off by the fetch window is reported with a partial
+// GroupSize/MediaFiles - confirm this is acceptable for paging.
+func aggregateMessages(items []messageOut, limit int) []messageOut {
+	out := make([]messageOut, 0, len(items))
+	index := make(map[string]int, len(items))
+	for _, item := range items {
+		key := messageGroupKey(item)
+		if pos, ok := index[key]; ok {
+			group := &out[pos]
+			group.GroupSize++
+			group.HasMedia = group.HasMedia || item.HasMedia
+			group.MediaFiles = append(group.MediaFiles, item.MediaFiles...)
+			if group.Text == nil && item.Text != nil {
+				group.Text = item.Text
+			}
+			if len(group.Extracted) == 0 && len(item.Extracted) > 0 {
+				group.Extracted = item.Extracted
+			}
+			continue
+		}
+		if item.GroupSize < 1 {
+			item.GroupSize = 1
+		}
+		index[key] = len(out)
+		out = append(out, item)
+	}
+	if len(out) > limit {
+		return out[:limit]
+	}
+	return out
+}
+
+// messageGroupKey returns the aggregation key of a message: channel id plus grouped_id
+// when the message has a grouped_id, otherwise a key unique to the message id.
+func messageGroupKey(item messageOut) string {
+	if item.GroupedID != nil {
+		return fmt.Sprintf("channel:%d:group:%d", item.ChannelID, *item.GroupedID)
+	}
+	return fmt.Sprintf("message:%d", item.ID)
+}
+
 func buildPostURL(identifier string, tgID sql.NullInt64, msgID int64) *string {
 	identifier = strings.TrimSpace(identifier)
 	if identifier == "" || msgID == 0 {
@@ -1541,6 +1743,7 @@ func loadConfig() config {
 		APIPort:          envInt("API_PORT", 8000),
 		PublicBasePath:   env("PUBLIC_BASE_PATH", ""),
 		PythonBaseURL:    env("PYTHON_BASE_URL", "http://127.0.0.1:8001"),
+		MediaDir:         env("MEDIA_DIR", "/data/media"),
 		PostgresUser:     env("POSTGRES_USER", "parser"),
 		PostgresPassword: env("POSTGRES_PASSWORD", "parser"),
 		PostgresDB:       env("POSTGRES_DB", "parser"),