Serve monitoring TG message media
Some checks failed
Build and Deploy / build-and-deploy (push) Failing after 3m16s
Some checks failed
Build and Deploy / build-and-deploy (push) Failing after 3m16s
This commit is contained in:
@@ -13,6 +13,7 @@ import (
|
|||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
@@ -31,6 +32,7 @@ type config struct {
|
|||||||
APIPort int
|
APIPort int
|
||||||
PublicBasePath string
|
PublicBasePath string
|
||||||
PythonBaseURL string
|
PythonBaseURL string
|
||||||
|
MediaDir string
|
||||||
PostgresUser string
|
PostgresUser string
|
||||||
PostgresPassword string
|
PostgresPassword string
|
||||||
PostgresDB string
|
PostgresDB string
|
||||||
@@ -88,24 +90,25 @@ type channelOut struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type messageOut struct {
|
type messageOut struct {
|
||||||
ID int64 `json:"id"`
|
ID int64 `json:"id"`
|
||||||
ChannelID int64 `json:"channel_id"`
|
ChannelID int64 `json:"channel_id"`
|
||||||
ChannelVertical *string `json:"channel_vertical,omitempty"`
|
ChannelVertical *string `json:"channel_vertical,omitempty"`
|
||||||
ChannelSectionSlug *string `json:"channel_section_slug,omitempty"`
|
ChannelSectionSlug *string `json:"channel_section_slug,omitempty"`
|
||||||
TGMessageID int64 `json:"tg_message_id"`
|
TGMessageID int64 `json:"tg_message_id"`
|
||||||
GroupedID *int64 `json:"grouped_id,omitempty"`
|
GroupedID *int64 `json:"grouped_id,omitempty"`
|
||||||
GroupSize int `json:"group_size"`
|
GroupSize int `json:"group_size"`
|
||||||
Date time.Time `json:"date"`
|
Date time.Time `json:"date"`
|
||||||
Text *string `json:"text,omitempty"`
|
Text *string `json:"text,omitempty"`
|
||||||
SenderID *int64 `json:"sender_id,omitempty"`
|
SenderID *int64 `json:"sender_id,omitempty"`
|
||||||
SenderUsername *string `json:"sender_username,omitempty"`
|
SenderUsername *string `json:"sender_username,omitempty"`
|
||||||
SenderName *string `json:"sender_name,omitempty"`
|
SenderName *string `json:"sender_name,omitempty"`
|
||||||
PostURL *string `json:"post_url,omitempty"`
|
PostURL *string `json:"post_url,omitempty"`
|
||||||
HasMedia bool `json:"has_media"`
|
HasMedia bool `json:"has_media"`
|
||||||
Extracted json.RawMessage `json:"extracted,omitempty"`
|
MediaFiles []map[string]any `json:"media_files,omitempty"`
|
||||||
Views *int64 `json:"views,omitempty"`
|
Extracted json.RawMessage `json:"extracted,omitempty"`
|
||||||
Forwards *int64 `json:"forwards,omitempty"`
|
Views *int64 `json:"views,omitempty"`
|
||||||
FetchedAt time.Time `json:"fetched_at"`
|
Forwards *int64 `json:"forwards,omitempty"`
|
||||||
|
FetchedAt time.Time `json:"fetched_at"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@@ -179,6 +182,8 @@ func (a *app) serveHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
a.handleChannels(w, r)
|
a.handleChannels(w, r)
|
||||||
case strings.HasPrefix(path, "/api/v1/channels/"):
|
case strings.HasPrefix(path, "/api/v1/channels/"):
|
||||||
a.handleChannelItem(ctx, w, r, path)
|
a.handleChannelItem(ctx, w, r, path)
|
||||||
|
case strings.HasPrefix(path, "/api/v1/media/"):
|
||||||
|
a.handleMedia(w, r, path)
|
||||||
case path == "/api/v1/messages":
|
case path == "/api/v1/messages":
|
||||||
a.handleMessages(ctx, w, r)
|
a.handleMessages(ctx, w, r)
|
||||||
case strings.HasPrefix(path, "/api/v1/messages/"):
|
case strings.HasPrefix(path, "/api/v1/messages/"):
|
||||||
@@ -835,12 +840,13 @@ func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http
|
|||||||
args = append(args, scope.DeptID)
|
args = append(args, scope.DeptID)
|
||||||
where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
|
where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
|
||||||
}
|
}
|
||||||
args = append(args, limit, offset)
|
fetchLimit := clampInt(limit*5, limit, 1000)
|
||||||
|
args = append(args, fetchLimit, offset)
|
||||||
rows, err := a.db.Query(ctx, `
|
rows, err := a.db.Query(ctx, `
|
||||||
SELECT m.id, m.channel_id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int,
|
SELECT m.id, m.channel_id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int,
|
||||||
m.date, m.text, m.sender_id, m.sender_username, m.sender_name,
|
m.date, m.text, m.sender_id, m.sender_username, m.sender_name,
|
||||||
c.identifier, c.tg_id, m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text,
|
c.identifier, c.tg_id, m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text,
|
||||||
m.views, m.forwards, m.fetched_at
|
COALESCE(m.media_files, '[]'::jsonb)::text, m.views, m.forwards, m.fetched_at
|
||||||
FROM messages m
|
FROM messages m
|
||||||
JOIN channels c ON c.id = m.channel_id
|
JOIN channels c ON c.id = m.channel_id
|
||||||
JOIN sections s ON s.id = c.section_id
|
JOIN sections s ON s.id = c.section_id
|
||||||
@@ -860,12 +866,14 @@ func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http
|
|||||||
writeDBError(w, err)
|
writeDBError(w, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
a.normalizeMessageMedia(&item)
|
||||||
out = append(out, item)
|
out = append(out, item)
|
||||||
}
|
}
|
||||||
if err := rows.Err(); err != nil {
|
if err := rows.Err(); err != nil {
|
||||||
writeDBError(w, err)
|
writeDBError(w, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
out = aggregateMessages(out, limit)
|
||||||
writeJSON(w, http.StatusOK, out)
|
writeJSON(w, http.StatusOK, out)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -894,7 +902,7 @@ func (a *app) handleMessageItem(ctx context.Context, w http.ResponseWriter, r *h
|
|||||||
SELECT m.id, m.channel_id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int,
|
SELECT m.id, m.channel_id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int,
|
||||||
m.date, m.text, m.sender_id, m.sender_username, m.sender_name,
|
m.date, m.text, m.sender_id, m.sender_username, m.sender_name,
|
||||||
c.identifier, c.tg_id, m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text,
|
c.identifier, c.tg_id, m.has_media, COALESCE(m.extracted, 'null'::jsonb)::text,
|
||||||
m.views, m.forwards, m.fetched_at
|
COALESCE(m.media_files, '[]'::jsonb)::text, m.views, m.forwards, m.fetched_at
|
||||||
FROM messages m
|
FROM messages m
|
||||||
JOIN channels c ON c.id = m.channel_id
|
JOIN channels c ON c.id = m.channel_id
|
||||||
JOIN sections s ON s.id = c.section_id
|
JOIN sections s ON s.id = c.section_id
|
||||||
@@ -905,9 +913,77 @@ func (a *app) handleMessageItem(ctx context.Context, w http.ResponseWriter, r *h
|
|||||||
writeDBError(w, err)
|
writeDBError(w, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
a.normalizeMessageMedia(&item)
|
||||||
writeJSON(w, http.StatusOK, item)
|
writeJSON(w, http.StatusOK, item)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *app) handleMedia(w http.ResponseWriter, r *http.Request, path string) {
|
||||||
|
if r.Method != http.MethodGet {
|
||||||
|
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
scope, ok := a.readScope(w, r, false)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rel, err := url.PathUnescape(strings.TrimPrefix(path, "/api/v1/media/"))
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusBadRequest, "bad media path")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
clean := strings.TrimPrefix(filepath.Clean("/"+rel), "/")
|
||||||
|
if clean == "." || clean == "" || strings.HasPrefix(clean, "../") {
|
||||||
|
writeError(w, http.StatusBadRequest, "bad media path")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
parts := strings.SplitN(clean, string(os.PathSeparator), 2)
|
||||||
|
channelID, err := strconv.ParseInt(parts[0], 10, 64)
|
||||||
|
if err != nil || channelID <= 0 {
|
||||||
|
writeError(w, http.StatusBadRequest, "bad media path")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
allowed, err := a.canReadChannelMedia(r.Context(), scope, channelID)
|
||||||
|
if err != nil {
|
||||||
|
writeDBError(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !allowed {
|
||||||
|
writeError(w, http.StatusNotFound, "not found")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
base, err := filepath.Abs(a.cfg.MediaDir)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, "media directory unavailable")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
full, err := filepath.Abs(filepath.Join(base, clean))
|
||||||
|
if err != nil || (full != base && !strings.HasPrefix(full, base+string(os.PathSeparator))) {
|
||||||
|
writeError(w, http.StatusBadRequest, "bad media path")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
http.ServeFile(w, r, full)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) canReadChannelMedia(ctx context.Context, scope accessScope, channelID int64) (bool, error) {
|
||||||
|
var dept sql.NullString
|
||||||
|
err := a.db.QueryRow(ctx, `
|
||||||
|
SELECT s.department_id
|
||||||
|
FROM channels c
|
||||||
|
JOIN sections s ON s.id = c.section_id
|
||||||
|
WHERE c.id = $1
|
||||||
|
`, channelID).Scan(&dept)
|
||||||
|
if errors.Is(err, pgx.ErrNoRows) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if scope.IsAdmin {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return dept.Valid && dept.String == scope.DeptID, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Request) {
|
func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Request) {
|
||||||
if r.Method != http.MethodGet {
|
if r.Method != http.MethodGet {
|
||||||
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
||||||
@@ -1329,7 +1405,7 @@ func scanMessage(rows rowScanner) (messageOut, error) {
|
|||||||
var vertical, slug string
|
var vertical, slug string
|
||||||
var grouped, senderID, tgID, views, forwards sql.NullInt64
|
var grouped, senderID, tgID, views, forwards sql.NullInt64
|
||||||
var text, senderUsername, senderName, identifier sql.NullString
|
var text, senderUsername, senderName, identifier sql.NullString
|
||||||
var extractedText string
|
var extractedText, mediaText string
|
||||||
err := rows.Scan(
|
err := rows.Scan(
|
||||||
&item.ID,
|
&item.ID,
|
||||||
&item.ChannelID,
|
&item.ChannelID,
|
||||||
@@ -1347,6 +1423,7 @@ func scanMessage(rows rowScanner) (messageOut, error) {
|
|||||||
&tgID,
|
&tgID,
|
||||||
&item.HasMedia,
|
&item.HasMedia,
|
||||||
&extractedText,
|
&extractedText,
|
||||||
|
&mediaText,
|
||||||
&views,
|
&views,
|
||||||
&forwards,
|
&forwards,
|
||||||
&item.FetchedAt,
|
&item.FetchedAt,
|
||||||
@@ -1363,6 +1440,9 @@ func scanMessage(rows rowScanner) (messageOut, error) {
|
|||||||
if extractedText != "" && extractedText != "null" {
|
if extractedText != "" && extractedText != "null" {
|
||||||
item.Extracted = json.RawMessage(extractedText)
|
item.Extracted = json.RawMessage(extractedText)
|
||||||
}
|
}
|
||||||
|
if mediaText != "" && mediaText != "null" {
|
||||||
|
_ = json.Unmarshal([]byte(mediaText), &item.MediaFiles)
|
||||||
|
}
|
||||||
item.PostURL = buildPostURL(identifier.String, tgID, item.TGMessageID)
|
item.PostURL = buildPostURL(identifier.String, tgID, item.TGMessageID)
|
||||||
return item, err
|
return item, err
|
||||||
}
|
}
|
||||||
@@ -1371,6 +1451,82 @@ func scanMessageRow(row pgx.Row) (messageOut, error) {
|
|||||||
return scanMessage(row)
|
return scanMessage(row)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *app) normalizeMessageMedia(item *messageOut) {
|
||||||
|
if len(item.MediaFiles) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, file := range item.MediaFiles {
|
||||||
|
rawURL, _ := file["url"].(string)
|
||||||
|
rel := mediaRelativePath(rawURL)
|
||||||
|
if rel == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
file["url"] = a.mediaURL(rel)
|
||||||
|
if _, ok := file["name"].(string); !ok {
|
||||||
|
file["name"] = filepath.Base(rel)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func mediaRelativePath(raw string) string {
|
||||||
|
raw = strings.TrimSpace(raw)
|
||||||
|
if raw == "" {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
if idx := strings.Index(raw, "/media/"); idx >= 0 {
|
||||||
|
raw = raw[idx+len("/media/"):]
|
||||||
|
}
|
||||||
|
raw = strings.TrimPrefix(raw, "/")
|
||||||
|
raw = strings.TrimPrefix(raw, "media/")
|
||||||
|
clean := strings.TrimPrefix(filepath.Clean("/"+raw), "/")
|
||||||
|
if clean == "." || clean == "" || strings.HasPrefix(clean, "../") {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return clean
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) mediaURL(rel string) string {
|
||||||
|
base := strings.TrimRight(a.cfg.PublicBasePath, "/")
|
||||||
|
return base + "/api/v1/media/" + strings.TrimPrefix(rel, "/")
|
||||||
|
}
|
||||||
|
|
||||||
|
func aggregateMessages(items []messageOut, limit int) []messageOut {
|
||||||
|
out := make([]messageOut, 0, len(items))
|
||||||
|
index := map[string]int{}
|
||||||
|
for _, item := range items {
|
||||||
|
key := messageGroupKey(item)
|
||||||
|
if pos, ok := index[key]; ok {
|
||||||
|
group := &out[pos]
|
||||||
|
group.GroupSize++
|
||||||
|
group.HasMedia = group.HasMedia || item.HasMedia
|
||||||
|
group.MediaFiles = append(group.MediaFiles, item.MediaFiles...)
|
||||||
|
if group.Text == nil && item.Text != nil {
|
||||||
|
group.Text = item.Text
|
||||||
|
}
|
||||||
|
if len(group.Extracted) == 0 && len(item.Extracted) > 0 {
|
||||||
|
group.Extracted = item.Extracted
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if item.GroupSize < 1 {
|
||||||
|
item.GroupSize = 1
|
||||||
|
}
|
||||||
|
index[key] = len(out)
|
||||||
|
out = append(out, item)
|
||||||
|
}
|
||||||
|
if len(out) > limit {
|
||||||
|
return out[:limit]
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func messageGroupKey(item messageOut) string {
|
||||||
|
if item.GroupedID != nil {
|
||||||
|
return fmt.Sprintf("channel:%d:group:%d", item.ChannelID, *item.GroupedID)
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("message:%d", item.ID)
|
||||||
|
}
|
||||||
|
|
||||||
func buildPostURL(identifier string, tgID sql.NullInt64, msgID int64) *string {
|
func buildPostURL(identifier string, tgID sql.NullInt64, msgID int64) *string {
|
||||||
identifier = strings.TrimSpace(identifier)
|
identifier = strings.TrimSpace(identifier)
|
||||||
if identifier == "" || msgID == 0 {
|
if identifier == "" || msgID == 0 {
|
||||||
@@ -1541,6 +1697,7 @@ func loadConfig() config {
|
|||||||
APIPort: envInt("API_PORT", 8000),
|
APIPort: envInt("API_PORT", 8000),
|
||||||
PublicBasePath: env("PUBLIC_BASE_PATH", ""),
|
PublicBasePath: env("PUBLIC_BASE_PATH", ""),
|
||||||
PythonBaseURL: env("PYTHON_BASE_URL", "http://127.0.0.1:8001"),
|
PythonBaseURL: env("PYTHON_BASE_URL", "http://127.0.0.1:8001"),
|
||||||
|
MediaDir: env("MEDIA_DIR", "/data/media"),
|
||||||
PostgresUser: env("POSTGRES_USER", "parser"),
|
PostgresUser: env("POSTGRES_USER", "parser"),
|
||||||
PostgresPassword: env("POSTGRES_PASSWORD", "parser"),
|
PostgresPassword: env("POSTGRES_PASSWORD", "parser"),
|
||||||
PostgresDB: env("POSTGRES_DB", "parser"),
|
PostgresDB: env("POSTGRES_DB", "parser"),
|
||||||
|
|||||||
Reference in New Issue
Block a user