Compare commits
12 Commits
fd1ee0611b
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d0f8b48869 | ||
|
|
cdbdea250d | ||
|
|
696b7eda6f | ||
|
|
bd3b54dc7d | ||
|
|
6cbfdb92e4 | ||
|
|
7cc5109cba | ||
|
|
90c0a346ed | ||
|
|
7f7f2427cb | ||
|
|
73afcb64d5 | ||
|
|
5eb8e21eda | ||
|
|
778b48cc12 | ||
|
|
1f1354e72b |
35
.gitea/scripts/hygiene-check.sh
Normal file
35
.gitea/scripts/hygiene-check.sh
Normal file
@@ -0,0 +1,35 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
fail=0
|
||||
|
||||
while IFS= read -r -d '' path; do
|
||||
base="$(basename "$path")"
|
||||
case "$base" in
|
||||
.DS_Store|.env)
|
||||
echo "::error file=$path::tracked local-only file is forbidden"
|
||||
fail=1
|
||||
;;
|
||||
esac
|
||||
|
||||
case "$path" in
|
||||
*node_modules/*|node_modules/*)
|
||||
echo "::error file=$path::tracked node_modules content is forbidden"
|
||||
fail=1
|
||||
;;
|
||||
*.tmp|*.temp|*.bak|*.orig|*.rej|*.zip|*.tar|*.tar.gz|*.tgz|*.rar|*.7z)
|
||||
echo "::error file=$path::tracked temporary/archive artifact is forbidden"
|
||||
fail=1
|
||||
;;
|
||||
esac
|
||||
|
||||
if [ -f "$path" ]; then
|
||||
size="$(wc -c < "$path" | tr -d ' ')"
|
||||
if [ "${size:-0}" -gt 52428800 ]; then
|
||||
echo "::error file=$path::tracked file is larger than 50 MiB"
|
||||
fail=1
|
||||
fi
|
||||
fi
|
||||
done < <(git ls-files -z)
|
||||
|
||||
exit "$fail"
|
||||
@@ -5,8 +5,15 @@ on:
|
||||
pull_request:
|
||||
|
||||
jobs:
|
||||
hygiene:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- run: bash .gitea/scripts/hygiene-check.sh
|
||||
|
||||
go:
|
||||
runs-on: ubuntu-latest
|
||||
needs: hygiene
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-go@v5
|
||||
@@ -22,6 +29,7 @@ jobs:
|
||||
|
||||
python:
|
||||
runs-on: ubuntu-latest
|
||||
needs: hygiene
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- run: python3 -m compileall src alembic
|
||||
|
||||
34
alembic/versions/0013_channel_poll_status.py
Normal file
34
alembic/versions/0013_channel_poll_status.py
Normal file
@@ -0,0 +1,34 @@
|
||||
"""channel poll status
|
||||
|
||||
Revision ID: 0013
|
||||
Revises: 0012
|
||||
Create Date: 2026-06-17
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
revision: str = "0013"
|
||||
down_revision: Union[str, None] = "0012"
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
op.add_column("channels", sa.Column("last_poll_status", sa.String(length=32), nullable=True))
|
||||
op.add_column("channels", sa.Column("last_poll_error_code", sa.String(length=64), nullable=True))
|
||||
op.add_column("channels", sa.Column("last_poll_error", sa.Text(), nullable=True))
|
||||
op.add_column("channels", sa.Column("last_poll_error_at", sa.DateTime(timezone=True), nullable=True))
|
||||
op.create_index("ix_channels_last_poll_status", "channels", ["last_poll_status"])
|
||||
op.create_index("ix_channels_last_poll_error_code", "channels", ["last_poll_error_code"])
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
op.drop_index("ix_channels_last_poll_error_code", table_name="channels")
|
||||
op.drop_index("ix_channels_last_poll_status", table_name="channels")
|
||||
op.drop_column("channels", "last_poll_error_at")
|
||||
op.drop_column("channels", "last_poll_error")
|
||||
op.drop_column("channels", "last_poll_error_code")
|
||||
op.drop_column("channels", "last_poll_status")
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
"time"
|
||||
|
||||
"monitoring-tg/internal/aiservice"
|
||||
"monitoring-tg/internal/dbretry"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
@@ -69,7 +70,7 @@ func main() {
|
||||
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||
defer stop()
|
||||
|
||||
pool, err := pgxpool.New(ctx, cfg.databaseURL())
|
||||
pool, err := dbretry.Connect(ctx, cfg.databaseURL(), 2*time.Minute)
|
||||
if err != nil {
|
||||
slog.Error("db_connect_failed", "error", err)
|
||||
os.Exit(1)
|
||||
|
||||
@@ -21,7 +21,9 @@ import (
|
||||
"time"
|
||||
|
||||
"monitoring-tg/internal/aiservice"
|
||||
"monitoring-tg/internal/dbretry"
|
||||
|
||||
commonmw "gitea.estateliga.work/admin/portal-common/middleware"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/minio/minio-go/v7"
|
||||
@@ -49,6 +51,7 @@ type config struct {
|
||||
LLMTimeout time.Duration
|
||||
AIServiceURL string
|
||||
AIServiceToken string
|
||||
InternalAPIKey string
|
||||
MinioEndpoint string
|
||||
MinioAccessKey string
|
||||
MinioSecretKey string
|
||||
@@ -68,10 +71,13 @@ type app struct {
|
||||
}
|
||||
|
||||
type accessScope struct {
|
||||
IsAdmin bool
|
||||
CanManage bool
|
||||
CanAuth bool
|
||||
DeptID string
|
||||
IsAdmin bool
|
||||
CanManage bool
|
||||
CanManageAll bool
|
||||
CanAuth bool
|
||||
CanViewAll bool
|
||||
DeptID string
|
||||
DeptIDs []string
|
||||
}
|
||||
|
||||
type sectionOut struct {
|
||||
@@ -126,6 +132,14 @@ type messageOut struct {
|
||||
FetchedAt time.Time `json:"fetched_at"`
|
||||
}
|
||||
|
||||
type componentProbe struct {
|
||||
Name string `json:"name"`
|
||||
Status string `json:"status"`
|
||||
LatencyMs int64 `json:"latency_ms"`
|
||||
Error string `json:"error,omitempty"`
|
||||
Details any `json:"details,omitempty"`
|
||||
}
|
||||
|
||||
func main() {
|
||||
cfg := loadConfig()
|
||||
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
|
||||
@@ -134,7 +148,7 @@ func main() {
|
||||
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||
defer stop()
|
||||
|
||||
pool, err := pgxpool.New(ctx, cfg.databaseURL())
|
||||
pool, err := dbretry.Connect(ctx, cfg.databaseURL(), 2*time.Minute)
|
||||
if err != nil {
|
||||
slog.Error("db_connect_failed", "error", err)
|
||||
os.Exit(1)
|
||||
@@ -182,6 +196,10 @@ func (a *app) serveHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
return
|
||||
}
|
||||
if path == "/health/detail" {
|
||||
a.handleHealthDetail(w, r)
|
||||
return
|
||||
}
|
||||
if path == "/" {
|
||||
writeJSON(w, http.StatusOK, map[string]string{"service": "monitoring-tg", "ui": "portal", "api": "go"})
|
||||
return
|
||||
@@ -190,7 +208,11 @@ func (a *app) serveHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
writeError(w, http.StatusNotFound, "not found")
|
||||
return
|
||||
}
|
||||
commonmw.InternalAuth(a.cfg.InternalAPIKey)(http.HandlerFunc(a.serveAPI)).ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
func (a *app) serveAPI(w http.ResponseWriter, r *http.Request) {
|
||||
path := a.apiPath(r.URL.Path)
|
||||
ctx := r.Context()
|
||||
switch {
|
||||
case r.Method == http.MethodGet && path == "/api/v1/access/me":
|
||||
@@ -237,13 +259,392 @@ func (a *app) apiPath(path string) string {
|
||||
return path
|
||||
}
|
||||
|
||||
func (a *app) handleHealthDetail(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
components := []componentProbe{
|
||||
a.probePostgres(ctx),
|
||||
a.probeAIService(ctx),
|
||||
a.probeClassificationQueue(ctx),
|
||||
a.probeAIJobs(ctx),
|
||||
a.probePoller(ctx),
|
||||
a.probePollErrors(ctx),
|
||||
a.probeMediaStorage(ctx),
|
||||
a.probeMediaMetadata(ctx),
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"components": components})
|
||||
}
|
||||
|
||||
func (a *app) probePostgres(ctx context.Context) componentProbe {
|
||||
start := time.Now()
|
||||
if err := a.db.Ping(ctx); err != nil {
|
||||
return componentProbe{Name: "postgres", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||
}
|
||||
return componentProbe{Name: "postgres", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||
}
|
||||
|
||||
func (a *app) probeAIService(ctx context.Context) componentProbe {
|
||||
start := time.Now()
|
||||
if !a.cfg.LLMEnabled {
|
||||
return componentProbe{Name: "ai_service", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "llm disabled"}
|
||||
}
|
||||
status, err := a.ai.ProvidersStatus(ctx)
|
||||
if err != nil {
|
||||
return componentProbe{Name: "ai_service", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||
}
|
||||
for _, provider := range status.Providers {
|
||||
if provider.Name != "llm" {
|
||||
continue
|
||||
}
|
||||
if provider.Configured && provider.OK {
|
||||
return componentProbe{Name: "ai_service", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||
}
|
||||
errMsg := strings.TrimSpace(provider.Error)
|
||||
if errMsg == "" {
|
||||
errMsg = "llm provider is not ready"
|
||||
}
|
||||
return componentProbe{Name: "ai_service", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: errMsg}
|
||||
}
|
||||
return componentProbe{Name: "ai_service", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "llm provider not found"}
|
||||
}
|
||||
|
||||
func (a *app) probeClassificationQueue(ctx context.Context) componentProbe {
|
||||
start := time.Now()
|
||||
var pending, pending24h int64
|
||||
err := a.db.QueryRow(ctx, `
|
||||
SELECT
|
||||
COUNT(*)::bigint,
|
||||
COUNT(*) FILTER (WHERE m.fetched_at >= now() - interval '24 hours')::bigint
|
||||
FROM channels c
|
||||
JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
|
||||
JOIN messages m ON m.channel_id = src.id
|
||||
JOIN sections s ON s.id = c.section_id
|
||||
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
|
||||
WHERE m.text IS NOT NULL
|
||||
AND mc.id IS NULL`,
|
||||
).Scan(&pending, &pending24h)
|
||||
if err != nil {
|
||||
return componentProbe{Name: "classification_queue", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||
}
|
||||
if pending > 0 {
|
||||
return componentProbe{
|
||||
Name: "classification_queue",
|
||||
Status: "down",
|
||||
LatencyMs: time.Since(start).Milliseconds(),
|
||||
Error: "pending=" + strconv.FormatInt(pending, 10) + " pending_24h=" + strconv.FormatInt(pending24h, 10),
|
||||
}
|
||||
}
|
||||
return componentProbe{Name: "classification_queue", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||
}
|
||||
|
||||
func (a *app) probeAIJobs(ctx context.Context) componentProbe {
|
||||
start := time.Now()
|
||||
if !a.cfg.LLMEnabled {
|
||||
return componentProbe{Name: "ai_jobs", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "llm disabled"}
|
||||
}
|
||||
stats, err := a.ai.Stats(ctx)
|
||||
if err != nil {
|
||||
return componentProbe{Name: "ai_jobs", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||
}
|
||||
var pending, running, staleRunning, failed, failed24h int64
|
||||
for _, row := range stats.Backlog {
|
||||
if row.OwnerService != "monitoring-tg" || row.TaskType != "telegram_classification" {
|
||||
continue
|
||||
}
|
||||
pending += row.Pending
|
||||
running += row.Running
|
||||
staleRunning += row.StaleRunning
|
||||
}
|
||||
for _, row := range stats.Owners {
|
||||
if row.OwnerService != "monitoring-tg" || row.TaskType != "telegram_classification" {
|
||||
continue
|
||||
}
|
||||
if row.Status == "failed" {
|
||||
failed += row.Total
|
||||
}
|
||||
}
|
||||
for _, row := range stats.Errors {
|
||||
if row.OwnerService != "monitoring-tg" || row.TaskType != "telegram_classification" {
|
||||
continue
|
||||
}
|
||||
failed24h += row.Last24h
|
||||
}
|
||||
if staleRunning > 0 || failed24h > 0 {
|
||||
return componentProbe{
|
||||
Name: "ai_jobs",
|
||||
Status: "down",
|
||||
LatencyMs: time.Since(start).Milliseconds(),
|
||||
Error: "pending=" + strconv.FormatInt(pending, 10) +
|
||||
" running=" + strconv.FormatInt(running, 10) +
|
||||
" stale_running=" + strconv.FormatInt(staleRunning, 10) +
|
||||
" failed=" + strconv.FormatInt(failed, 10) +
|
||||
" failed_24h=" + strconv.FormatInt(failed24h, 10),
|
||||
}
|
||||
}
|
||||
if pending > 0 || running > 0 || failed > 0 {
|
||||
return componentProbe{
|
||||
Name: "ai_jobs",
|
||||
Status: "degraded",
|
||||
LatencyMs: time.Since(start).Milliseconds(),
|
||||
Error: "pending=" + strconv.FormatInt(pending, 10) +
|
||||
" running=" + strconv.FormatInt(running, 10) +
|
||||
" stale_running=" + strconv.FormatInt(staleRunning, 10) +
|
||||
" failed=" + strconv.FormatInt(failed, 10),
|
||||
}
|
||||
}
|
||||
return componentProbe{Name: "ai_jobs", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||
}
|
||||
|
||||
func (a *app) probePoller(ctx context.Context) componentProbe {
|
||||
start := time.Now()
|
||||
staleAfter := maxInt(a.cfg.PollIntervalSeconds*3, 900)
|
||||
var active, neverPolled, stale int64
|
||||
err := a.db.QueryRow(ctx, `
|
||||
SELECT
|
||||
COUNT(*)::bigint,
|
||||
COUNT(*) FILTER (WHERE last_polled_at IS NULL)::bigint,
|
||||
COUNT(*) FILTER (WHERE last_polled_at IS NOT NULL AND last_polled_at < now() - ($1::int * interval '1 second'))::bigint
|
||||
FROM channels
|
||||
WHERE is_active = true
|
||||
AND source_channel_id IS NULL`,
|
||||
staleAfter,
|
||||
).Scan(&active, &neverPolled, &stale)
|
||||
if err != nil {
|
||||
return componentProbe{Name: "poller", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||
}
|
||||
if active > 0 && (neverPolled > 0 || stale > 0) {
|
||||
return componentProbe{
|
||||
Name: "poller",
|
||||
Status: "down",
|
||||
LatencyMs: time.Since(start).Milliseconds(),
|
||||
Error: "active=" + strconv.FormatInt(active, 10) +
|
||||
" never_polled=" + strconv.FormatInt(neverPolled, 10) +
|
||||
" stale=" + strconv.FormatInt(stale, 10) +
|
||||
" stale_after_sec=" + strconv.Itoa(staleAfter),
|
||||
}
|
||||
}
|
||||
return componentProbe{Name: "poller", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||
}
|
||||
|
||||
func (a *app) probePollErrors(ctx context.Context) componentProbe {
|
||||
start := time.Now()
|
||||
var total, floodWait, unavailable, other int64
|
||||
err := a.db.QueryRow(ctx, `
|
||||
SELECT
|
||||
COUNT(*) FILTER (WHERE last_poll_status = 'error')::bigint,
|
||||
COUNT(*) FILTER (WHERE last_poll_status = 'error' AND last_poll_error_code = 'telegram_flood_wait')::bigint,
|
||||
COUNT(*) FILTER (WHERE last_poll_status = 'error' AND last_poll_error_code = 'telegram_channel_unavailable')::bigint,
|
||||
COUNT(*) FILTER (
|
||||
WHERE last_poll_status = 'error'
|
||||
AND COALESCE(last_poll_error_code, '') NOT IN ('telegram_flood_wait', 'telegram_channel_unavailable')
|
||||
)::bigint
|
||||
FROM channels
|
||||
WHERE is_active = true
|
||||
AND source_channel_id IS NULL`,
|
||||
).Scan(&total, &floodWait, &unavailable, &other)
|
||||
if err != nil {
|
||||
return componentProbe{Name: "poll_errors", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||
}
|
||||
if total > 0 {
|
||||
recent, recentErr := a.recentPollErrors(ctx)
|
||||
status := "degraded"
|
||||
if other > 0 {
|
||||
status = "down"
|
||||
}
|
||||
details := map[string]any{
|
||||
"total": total,
|
||||
"flood_wait": floodWait,
|
||||
"unavailable": unavailable,
|
||||
"other": other,
|
||||
}
|
||||
if recentErr != nil {
|
||||
details["recent_error"] = recentErr.Error()
|
||||
} else {
|
||||
details["recent_errors"] = recent
|
||||
}
|
||||
return componentProbe{
|
||||
Name: "poll_errors",
|
||||
Status: status,
|
||||
LatencyMs: time.Since(start).Milliseconds(),
|
||||
Error: "total=" + strconv.FormatInt(total, 10) +
|
||||
" flood_wait=" + strconv.FormatInt(floodWait, 10) +
|
||||
" unavailable=" + strconv.FormatInt(unavailable, 10) +
|
||||
" other=" + strconv.FormatInt(other, 10),
|
||||
Details: details,
|
||||
}
|
||||
}
|
||||
return componentProbe{Name: "poll_errors", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||
}
|
||||
|
||||
func (a *app) recentPollErrors(ctx context.Context) ([]map[string]any, error) {
|
||||
rows, err := a.db.Query(ctx, `
|
||||
SELECT
|
||||
c.id,
|
||||
c.identifier,
|
||||
COALESCE(c.title, '') AS title,
|
||||
s.slug,
|
||||
s.title AS section_title,
|
||||
COALESCE(c.last_poll_error_code, '') AS error_code,
|
||||
COALESCE(c.last_poll_error, '') AS error_text,
|
||||
c.last_poll_error_at
|
||||
FROM channels c
|
||||
LEFT JOIN sections s ON s.id = c.section_id
|
||||
WHERE c.is_active = true
|
||||
AND c.source_channel_id IS NULL
|
||||
AND c.last_poll_status = 'error'
|
||||
ORDER BY c.last_poll_error_at DESC NULLS LAST, c.id DESC
|
||||
LIMIT 5`)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
out := make([]map[string]any, 0, 5)
|
||||
for rows.Next() {
|
||||
var (
|
||||
id int64
|
||||
identifier string
|
||||
title string
|
||||
sectionSlug sql.NullString
|
||||
sectionTitle sql.NullString
|
||||
code string
|
||||
text string
|
||||
at sql.NullTime
|
||||
)
|
||||
if err := rows.Scan(&id, &identifier, &title, §ionSlug, §ionTitle, &code, &text, &at); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, map[string]any{
|
||||
"channel_id": id,
|
||||
"identifier": identifier,
|
||||
"title": nullableString(title),
|
||||
"section_slug": nullString(sectionSlug),
|
||||
"section_title": nullString(sectionTitle),
|
||||
"error_code": nullableString(code),
|
||||
"error": nullableString(text),
|
||||
"error_at": nullTime(at),
|
||||
})
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
func (a *app) probeMediaStorage(ctx context.Context) componentProbe {
|
||||
start := time.Now()
|
||||
if a.minio == nil || a.cfg.MinioBucket == "" {
|
||||
return componentProbe{Name: "media_storage", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "minio not configured"}
|
||||
}
|
||||
exists, err := a.minio.BucketExists(ctx, a.cfg.MinioBucket)
|
||||
if err != nil {
|
||||
return componentProbe{Name: "media_storage", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||
}
|
||||
if !exists {
|
||||
return componentProbe{Name: "media_storage", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "bucket not found: " + a.cfg.MinioBucket}
|
||||
}
|
||||
return componentProbe{Name: "media_storage", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||
}
|
||||
|
||||
func (a *app) probeMediaMetadata(ctx context.Context) componentProbe {
|
||||
start := time.Now()
|
||||
var withMedia, missingFiles int64
|
||||
err := a.db.QueryRow(ctx, `
|
||||
SELECT
|
||||
COUNT(*) FILTER (WHERE has_media = true)::bigint,
|
||||
COUNT(*) FILTER (
|
||||
WHERE has_media = true
|
||||
AND jsonb_array_length(COALESCE(media_files, '[]'::jsonb)) = 0
|
||||
)::bigint
|
||||
FROM messages`,
|
||||
).Scan(&withMedia, &missingFiles)
|
||||
if err != nil {
|
||||
return componentProbe{Name: "media_metadata", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||
}
|
||||
if missingFiles > 0 {
|
||||
recent, recentErr := a.recentMissingMedia(ctx)
|
||||
details := map[string]any{
|
||||
"messages_with_media": withMedia,
|
||||
"missing_media_files": missingFiles,
|
||||
}
|
||||
if recentErr != nil {
|
||||
details["recent_error"] = recentErr.Error()
|
||||
} else {
|
||||
details["recent_missing_media"] = recent
|
||||
}
|
||||
return componentProbe{
|
||||
Name: "media_metadata",
|
||||
Status: "down",
|
||||
LatencyMs: time.Since(start).Milliseconds(),
|
||||
Error: "messages_with_media=" + strconv.FormatInt(withMedia, 10) +
|
||||
" missing_media_files=" + strconv.FormatInt(missingFiles, 10),
|
||||
Details: details,
|
||||
}
|
||||
}
|
||||
return componentProbe{Name: "media_metadata", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||
}
|
||||
|
||||
func (a *app) recentMissingMedia(ctx context.Context) ([]map[string]any, error) {
|
||||
rows, err := a.db.Query(ctx, `
|
||||
SELECT
|
||||
m.id,
|
||||
m.tg_message_id,
|
||||
m.date,
|
||||
c.id AS channel_id,
|
||||
c.identifier,
|
||||
COALESCE(c.title, '') AS channel_title,
|
||||
s.slug,
|
||||
s.title AS section_title
|
||||
FROM messages m
|
||||
JOIN channels c ON c.id = m.channel_id
|
||||
LEFT JOIN sections s ON s.id = c.section_id
|
||||
WHERE m.has_media = true
|
||||
AND jsonb_array_length(COALESCE(m.media_files, '[]'::jsonb)) = 0
|
||||
ORDER BY m.date DESC, m.id DESC
|
||||
LIMIT 5`)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
out := make([]map[string]any, 0, 5)
|
||||
for rows.Next() {
|
||||
var (
|
||||
id int64
|
||||
tgMessageID int64
|
||||
date time.Time
|
||||
channelID int64
|
||||
identifier string
|
||||
channelTitle string
|
||||
sectionSlug sql.NullString
|
||||
sectionTitle sql.NullString
|
||||
)
|
||||
if err := rows.Scan(&id, &tgMessageID, &date, &channelID, &identifier, &channelTitle, §ionSlug, §ionTitle); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, map[string]any{
|
||||
"message_id": id,
|
||||
"tg_message_id": tgMessageID,
|
||||
"message_date": date,
|
||||
"channel_id": channelID,
|
||||
"identifier": identifier,
|
||||
"channel_title": nullableString(channelTitle),
|
||||
"section_slug": nullString(sectionSlug),
|
||||
"section_title": nullString(sectionTitle),
|
||||
"repair_action": "/api/v1/channels/" + strconv.FormatInt(channelID, 10) + "/backfill-media",
|
||||
})
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
func (a *app) handleAccessMe(w http.ResponseWriter, r *http.Request) {
|
||||
scope := readAccess(r)
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"is_admin": scope.IsAdmin,
|
||||
"can_manage_department": scope.CanManage,
|
||||
"can_manage_all": scope.CanManageAll,
|
||||
"can_auth_telegram": scope.CanAuth,
|
||||
"can_view_all": scope.CanViewAll,
|
||||
"department_id": nullableString(scope.DeptID),
|
||||
"department_ids": scope.departmentIDs(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -270,9 +671,8 @@ func (a *app) listSections(ctx context.Context, w http.ResponseWriter, r *http.R
|
||||
|
||||
args := []any{vertical}
|
||||
deptFilter := ""
|
||||
if !scope.IsAdmin {
|
||||
args = append(args, scope.DeptID)
|
||||
deptFilter = fmt.Sprintf(" AND s.department_id = $%d", len(args))
|
||||
if !scope.canReadAll() {
|
||||
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
|
||||
}
|
||||
|
||||
rows, err := a.db.Query(ctx, `
|
||||
@@ -334,11 +734,12 @@ func (a *app) createSection(ctx context.Context, w http.ResponseWriter, r *http.
|
||||
return
|
||||
}
|
||||
var payload struct {
|
||||
Vertical string `json:"vertical"`
|
||||
Slug string `json:"slug"`
|
||||
Title string `json:"title"`
|
||||
Emoji *string `json:"emoji"`
|
||||
Description *string `json:"description"`
|
||||
Vertical string `json:"vertical"`
|
||||
DepartmentID *string `json:"department_id"`
|
||||
Slug string `json:"slug"`
|
||||
Title string `json:"title"`
|
||||
Emoji *string `json:"emoji"`
|
||||
Description *string `json:"description"`
|
||||
}
|
||||
if !readBody(w, r, &payload) {
|
||||
return
|
||||
@@ -350,7 +751,11 @@ func (a *app) createSection(ctx context.Context, w http.ResponseWriter, r *http.
|
||||
writeError(w, http.StatusBadRequest, "vertical, slug and title are required")
|
||||
return
|
||||
}
|
||||
dept := nullableString(scope.DeptID)
|
||||
deptID := scope.primaryDepartmentID()
|
||||
if scope.CanManageAll && payload.DepartmentID != nil {
|
||||
deptID = strings.TrimSpace(*payload.DepartmentID)
|
||||
}
|
||||
dept := nullableString(deptID)
|
||||
row := a.db.QueryRow(ctx, `
|
||||
INSERT INTO sections (vertical, department_id, slug, title, emoji, description)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)
|
||||
@@ -441,9 +846,10 @@ func (a *app) updateSection(ctx context.Context, w http.ResponseWriter, r *http.
|
||||
}
|
||||
args = append(args, vertical, slug)
|
||||
where := fmt.Sprintf("vertical = $%d AND slug = $%d", len(args)-1, len(args))
|
||||
if !scope.IsAdmin {
|
||||
args = append(args, scope.DeptID)
|
||||
where += fmt.Sprintf(" AND department_id = $%d", len(args))
|
||||
if !scope.CanManageAll {
|
||||
var deptFilter string
|
||||
args, deptFilter = appendDepartmentFilter(args, scope, "department_id")
|
||||
where += deptFilter
|
||||
}
|
||||
row := a.db.QueryRow(ctx, `
|
||||
UPDATE sections
|
||||
@@ -468,7 +874,7 @@ func (a *app) deleteSection(ctx context.Context, w http.ResponseWriter, r *http.
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
section, err := a.findSection(ctx, vertical, slug, scope)
|
||||
section, err := a.findSection(ctx, vertical, slug, scope.forManageLookup())
|
||||
if err != nil {
|
||||
writeDBError(w, err)
|
||||
return
|
||||
@@ -492,9 +898,10 @@ func (a *app) deleteSection(ctx context.Context, w http.ResponseWriter, r *http.
|
||||
func (a *app) findSection(ctx context.Context, vertical, slug string, scope accessScope) (sectionOut, error) {
|
||||
args := []any{vertical, slug}
|
||||
where := "s.vertical = $1 AND s.slug = $2"
|
||||
if !scope.IsAdmin {
|
||||
args = append(args, scope.DeptID)
|
||||
where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
|
||||
if !scope.canReadAll() {
|
||||
var deptFilter string
|
||||
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
|
||||
where += deptFilter
|
||||
}
|
||||
row := a.db.QueryRow(ctx, `
|
||||
SELECT s.id, s.vertical, COALESCE(s.department_id, ''), s.slug, s.title, COALESCE(s.emoji, ''), COALESCE(s.description, ''), s.created_at,
|
||||
@@ -535,9 +942,10 @@ func (a *app) listChannels(ctx context.Context, w http.ResponseWriter, r *http.R
|
||||
args = append(args, section)
|
||||
where += fmt.Sprintf(" AND s.slug = $%d", len(args))
|
||||
}
|
||||
if !scope.IsAdmin {
|
||||
args = append(args, scope.DeptID)
|
||||
where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
|
||||
if !scope.canReadAll() {
|
||||
var deptFilter string
|
||||
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
|
||||
where += deptFilter
|
||||
}
|
||||
rows, err := a.db.Query(ctx, `
|
||||
SELECT c.id, COALESCE(c.tg_id, src.tg_id), c.source_channel_id,
|
||||
@@ -591,7 +999,7 @@ func (a *app) createChannel(ctx context.Context, w http.ResponseWriter, r *http.
|
||||
writeError(w, http.StatusBadRequest, "identifier, vertical and section are required")
|
||||
return
|
||||
}
|
||||
section, err := a.findSection(ctx, payload.Vertical, payload.Section, scope)
|
||||
section, err := a.findSection(ctx, payload.Vertical, payload.Section, scope.forManageLookup())
|
||||
if err != nil {
|
||||
writeDBError(w, err)
|
||||
return
|
||||
@@ -684,7 +1092,7 @@ func (a *app) updateChannel(ctx context.Context, w http.ResponseWriter, r *http.
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if _, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil {
|
||||
if _, err := a.findChannel(ctx, id, scope.forManageLookup(), r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil {
|
||||
writeDBError(w, err)
|
||||
return
|
||||
}
|
||||
@@ -711,7 +1119,7 @@ func (a *app) updateChannel(ctx context.Context, w http.ResponseWriter, r *http.
|
||||
if payload.Vertical != nil && strings.TrimSpace(*payload.Vertical) != "" {
|
||||
vertical = strings.TrimSpace(*payload.Vertical)
|
||||
}
|
||||
section, err := a.findSection(ctx, vertical, strings.TrimSpace(*payload.Section), scope)
|
||||
section, err := a.findSection(ctx, vertical, strings.TrimSpace(*payload.Section), scope.forManageLookup())
|
||||
if err != nil {
|
||||
writeDBError(w, err)
|
||||
return
|
||||
@@ -745,7 +1153,7 @@ func (a *app) deleteChannel(ctx context.Context, w http.ResponseWriter, r *http.
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if _, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil {
|
||||
if _, err := a.findChannel(ctx, id, scope.forManageLookup(), r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil {
|
||||
writeDBError(w, err)
|
||||
return
|
||||
}
|
||||
@@ -767,9 +1175,10 @@ func (a *app) findChannel(ctx context.Context, id int64, scope accessScope, vert
|
||||
args = append(args, strings.TrimSpace(section))
|
||||
where += fmt.Sprintf(" AND s.slug = $%d", len(args))
|
||||
}
|
||||
if !scope.IsAdmin {
|
||||
args = append(args, scope.DeptID)
|
||||
where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
|
||||
if !scope.canReadAll() {
|
||||
var deptFilter string
|
||||
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
|
||||
where += deptFilter
|
||||
}
|
||||
row := a.db.QueryRow(ctx, `
|
||||
SELECT c.id, COALESCE(c.tg_id, src.tg_id), c.source_channel_id,
|
||||
@@ -834,7 +1243,7 @@ func (a *app) reanalyzeChannel(ctx context.Context, w http.ResponseWriter, r *ht
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
ch, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section"))
|
||||
ch, err := a.findChannel(ctx, id, scope.forManageLookup(), r.URL.Query().Get("vertical"), r.URL.Query().Get("section"))
|
||||
if err != nil {
|
||||
writeDBError(w, err)
|
||||
return
|
||||
@@ -916,9 +1325,10 @@ func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http
|
||||
args = append(args, key, field)
|
||||
where += fmt.Sprintf(" AND COALESCE(mc.verdict ->> $%d, m.extracted -> $%d ->> $%d) = 'true'", len(args), len(args)-1, len(args))
|
||||
}
|
||||
if !scope.IsAdmin {
|
||||
args = append(args, scope.DeptID)
|
||||
where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
|
||||
if !scope.canReadAll() {
|
||||
var deptFilter string
|
||||
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
|
||||
where += deptFilter
|
||||
}
|
||||
fetchLimit := clampInt(limit*5, limit, 1000)
|
||||
args = append(args, fetchLimit, offset)
|
||||
@@ -984,9 +1394,10 @@ func (a *app) handleMessageItem(ctx context.Context, w http.ResponseWriter, r *h
|
||||
}
|
||||
args := []any{id}
|
||||
where := "m.id = $1"
|
||||
if !scope.IsAdmin {
|
||||
args = append(args, scope.DeptID)
|
||||
where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
|
||||
if !scope.canReadAll() {
|
||||
var deptFilter string
|
||||
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
|
||||
where += deptFilter
|
||||
}
|
||||
row := a.db.QueryRow(ctx, `
|
||||
SELECT m.id, c.id, c.vertical, s.slug, m.tg_message_id, m.grouped_id, 1::int,
|
||||
@@ -1102,11 +1513,11 @@ func (a *app) serveMinioMedia(w http.ResponseWriter, r *http.Request, key string
|
||||
func (a *app) canReadChannelMedia(ctx context.Context, scope accessScope, channelID int64) (bool, error) {
|
||||
var allowed bool
|
||||
err := a.db.QueryRow(ctx, `
|
||||
SELECT COALESCE(bool_or(s.department_id = $2 OR $3::boolean), false)
|
||||
SELECT COALESCE(bool_or($3::boolean OR s.department_id::text = ANY($2::text[])), false)
|
||||
FROM channels c
|
||||
JOIN sections s ON s.id = c.section_id
|
||||
WHERE c.id = $1 OR c.source_channel_id = $1
|
||||
`, channelID, scope.DeptID, scope.IsAdmin).Scan(&allowed)
|
||||
`, channelID, scope.departmentIDs(), scope.canReadAll()).Scan(&allowed)
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
return false, nil
|
||||
}
|
||||
@@ -1136,9 +1547,10 @@ func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Re
|
||||
args = append(args, section)
|
||||
where += fmt.Sprintf(" AND s.slug = $%d", len(args))
|
||||
}
|
||||
if !scope.IsAdmin {
|
||||
args = append(args, scope.DeptID)
|
||||
where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
|
||||
if !scope.canReadAll() {
|
||||
var deptFilter string
|
||||
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
|
||||
where += deptFilter
|
||||
}
|
||||
|
||||
var channelsTotal, channelsActive, messagesTotal, messages24h, leadsTotal, leads24h int64
|
||||
@@ -1254,9 +1666,10 @@ func (a *app) pendingLLM(ctx context.Context, scope accessScope, vertical, secti
|
||||
args = append(args, section)
|
||||
where += fmt.Sprintf(" AND s.slug = $%d", len(args))
|
||||
}
|
||||
if !scope.IsAdmin {
|
||||
args = append(args, scope.DeptID)
|
||||
where += fmt.Sprintf(" AND s.department_id = $%d", len(args))
|
||||
if !scope.canReadAll() {
|
||||
var deptFilter string
|
||||
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
|
||||
where += deptFilter
|
||||
}
|
||||
var pending int64
|
||||
err := a.db.QueryRow(ctx, `
|
||||
@@ -1293,25 +1706,24 @@ func (a *app) getPrompt(ctx context.Context, w http.ResponseWriter, r *http.Requ
|
||||
return
|
||||
}
|
||||
section := strings.TrimSpace(r.URL.Query().Get("section"))
|
||||
if section != "" {
|
||||
if _, err := a.findSection(ctx, vertical, section, scope); err != nil {
|
||||
writeDBError(w, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
prompt, source, err := a.resolvePrompt(ctx, scope.DeptID, vertical, section)
|
||||
deptID, err := a.promptDepartmentID(ctx, scope, vertical, section)
|
||||
if err != nil {
|
||||
writeDBError(w, err)
|
||||
return
|
||||
}
|
||||
overridden, err := a.promptExists(ctx, scope.DeptID, vertical, section)
|
||||
prompt, source, err := a.resolvePrompt(ctx, deptID, vertical, section)
|
||||
if err != nil {
|
||||
writeDBError(w, err)
|
||||
return
|
||||
}
|
||||
overridden, err := a.promptExists(ctx, deptID, vertical, section)
|
||||
if err != nil {
|
||||
writeDBError(w, err)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"vertical": vertical,
|
||||
"department_id": nullableString(scope.DeptID),
|
||||
"department_id": nullableString(deptID),
|
||||
"section": nullableString(section),
|
||||
"prompt": prompt,
|
||||
"default": defaultPrompt(vertical),
|
||||
@@ -1345,7 +1757,12 @@ func (a *app) savePrompt(ctx context.Context, w http.ResponseWriter, r *http.Req
|
||||
writeError(w, http.StatusBadRequest, "prompt is too long (max 30000 chars)")
|
||||
return
|
||||
}
|
||||
key := promptKey(scope.DeptID, vertical, section)
|
||||
deptID, err := a.promptDepartmentID(ctx, scope.forManageLookup(), vertical, section)
|
||||
if err != nil {
|
||||
writeDBError(w, err)
|
||||
return
|
||||
}
|
||||
key := promptKey(deptID, vertical, section)
|
||||
value, _ := json.Marshal(text)
|
||||
if _, err := a.db.Exec(ctx, `
|
||||
INSERT INTO app_settings (key, value, updated_at)
|
||||
@@ -1355,7 +1772,7 @@ func (a *app) savePrompt(ctx context.Context, w http.ResponseWriter, r *http.Req
|
||||
writeDBError(w, err)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"saved": true, "vertical": vertical, "department_id": nullableString(scope.DeptID), "section": nullableString(section), "length": len(text)})
|
||||
writeJSON(w, http.StatusOK, map[string]any{"saved": true, "vertical": vertical, "department_id": nullableString(deptID), "section": nullableString(section), "length": len(text)})
|
||||
}
|
||||
|
||||
func (a *app) resetPrompt(ctx context.Context, w http.ResponseWriter, r *http.Request) {
|
||||
@@ -1368,13 +1785,29 @@ func (a *app) resetPrompt(ctx context.Context, w http.ResponseWriter, r *http.Re
|
||||
return
|
||||
}
|
||||
section := strings.TrimSpace(r.URL.Query().Get("section"))
|
||||
if _, err := a.db.Exec(ctx, `DELETE FROM app_settings WHERE key = $1`, promptKey(scope.DeptID, vertical, section)); err != nil {
|
||||
deptID, err := a.promptDepartmentID(ctx, scope.forManageLookup(), vertical, section)
|
||||
if err != nil {
|
||||
writeDBError(w, err)
|
||||
return
|
||||
}
|
||||
if _, err := a.db.Exec(ctx, `DELETE FROM app_settings WHERE key = $1`, promptKey(deptID, vertical, section)); err != nil {
|
||||
writeDBError(w, err)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func (a *app) promptDepartmentID(ctx context.Context, scope accessScope, vertical, section string) (string, error) {
|
||||
if strings.TrimSpace(section) == "" {
|
||||
return scope.primaryDepartmentID(), nil
|
||||
}
|
||||
item, err := a.findSection(ctx, vertical, section, scope)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return valueOrEmpty(item.DepartmentID), nil
|
||||
}
|
||||
|
||||
func (a *app) resolvePrompt(ctx context.Context, deptID, vertical, section string) (string, string, error) {
|
||||
keys := []struct {
|
||||
key string
|
||||
@@ -1457,11 +1890,11 @@ func (a *app) readScope(w http.ResponseWriter, r *http.Request, manage bool) (ac
|
||||
writeError(w, http.StatusNotFound, "not found")
|
||||
return scope, false
|
||||
}
|
||||
} else if !scope.IsAdmin && scope.DeptID == "" {
|
||||
} else if !scope.canReadAll() && len(scope.departmentIDs()) == 0 {
|
||||
writeError(w, http.StatusForbidden, "department is required")
|
||||
return scope, false
|
||||
}
|
||||
if manage && !scope.IsAdmin && scope.DeptID == "" {
|
||||
if manage && !scope.CanManageAll && len(scope.departmentIDs()) == 0 {
|
||||
writeError(w, http.StatusForbidden, "department is required")
|
||||
return scope, false
|
||||
}
|
||||
@@ -1469,16 +1902,79 @@ func (a *app) readScope(w http.ResponseWriter, r *http.Request, manage bool) (ac
|
||||
}
|
||||
|
||||
func readAccess(r *http.Request) accessScope {
|
||||
admin := r.Header.Get("X-User-Is-Admin") == "1"
|
||||
deptHead := r.Header.Get("X-User-Is-Department-Head") == "1"
|
||||
canManage := r.Header.Get("X-Monitoring-TG-Can-Manage") == "1"
|
||||
canAuth := r.Header.Get("X-Monitoring-TG-Can-Auth") == "1"
|
||||
return accessScope{
|
||||
IsAdmin: admin,
|
||||
CanManage: admin || deptHead || canManage,
|
||||
CanAuth: admin || canAuth,
|
||||
DeptID: strings.TrimSpace(r.Header.Get("X-User-Department-Id")),
|
||||
admin := commonmw.HeaderBool(r, "X-User-Is-Admin")
|
||||
deptHead := commonmw.HeaderBool(r, "X-User-Is-Department-Head")
|
||||
canManagePermission := commonmw.HeaderBool(r, "X-Monitoring-TG-Can-Manage")
|
||||
canAuth := commonmw.HeaderBool(r, "X-Monitoring-TG-Can-Auth")
|
||||
canViewAll := commonmw.HeaderBool(r, "X-Monitoring-TG-Can-View-All")
|
||||
deptID := strings.TrimSpace(r.Header.Get("X-User-Department-Id"))
|
||||
deptIDs := commonmw.HeaderCSV(r, "X-User-Department-Ids")
|
||||
if deptID != "" {
|
||||
deptIDs = appendUniqueString(deptIDs, deptID)
|
||||
}
|
||||
return accessScope{
|
||||
IsAdmin: admin,
|
||||
CanManage: admin || deptHead || canManagePermission,
|
||||
CanManageAll: admin || (canManagePermission && canViewAll),
|
||||
CanAuth: admin || canAuth,
|
||||
CanViewAll: admin || canViewAll,
|
||||
DeptID: deptID,
|
||||
DeptIDs: deptIDs,
|
||||
}
|
||||
}
|
||||
|
||||
func (s accessScope) canReadAll() bool {
|
||||
return s.IsAdmin || s.CanViewAll
|
||||
}
|
||||
|
||||
func (s accessScope) forManageLookup() accessScope {
|
||||
if s.CanManageAll {
|
||||
return s
|
||||
}
|
||||
s.CanViewAll = false
|
||||
return s
|
||||
}
|
||||
|
||||
func appendUniqueString(items []string, value string) []string {
|
||||
value = strings.TrimSpace(value)
|
||||
if value == "" {
|
||||
return items
|
||||
}
|
||||
for _, item := range items {
|
||||
if item == value {
|
||||
return items
|
||||
}
|
||||
}
|
||||
return append(items, value)
|
||||
}
|
||||
|
||||
func (s accessScope) departmentIDs() []string {
|
||||
out := make([]string, 0, len(s.DeptIDs)+1)
|
||||
for _, id := range s.DeptIDs {
|
||||
out = appendUniqueString(out, id)
|
||||
}
|
||||
out = appendUniqueString(out, s.DeptID)
|
||||
return out
|
||||
}
|
||||
|
||||
func (s accessScope) primaryDepartmentID() string {
|
||||
if strings.TrimSpace(s.DeptID) != "" {
|
||||
return strings.TrimSpace(s.DeptID)
|
||||
}
|
||||
ids := s.departmentIDs()
|
||||
if len(ids) == 0 {
|
||||
return ""
|
||||
}
|
||||
return ids[0]
|
||||
}
|
||||
|
||||
func appendDepartmentFilter(args []any, scope accessScope, column string) ([]any, string) {
|
||||
ids := scope.departmentIDs()
|
||||
if len(ids) == 0 {
|
||||
ids = []string{"__no_department_scope__"}
|
||||
}
|
||||
args = append(args, ids)
|
||||
return args, fmt.Sprintf(" AND %s::text = ANY($%d::text[])", column, len(args))
|
||||
}
|
||||
|
||||
type rowScanner interface {
|
||||
@@ -1872,6 +2368,7 @@ func loadConfig() config {
|
||||
LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second,
|
||||
AIServiceURL: env("AI_SERVICE_URL", ""),
|
||||
AIServiceToken: env("AI_SERVICE_TOKEN", ""),
|
||||
InternalAPIKey: env("INTERNAL_API_KEY", env("PORTAL_INTERNAL_API_KEY", "")),
|
||||
MinioEndpoint: env("MINIO_ENDPOINT", ""),
|
||||
MinioAccessKey: env("MINIO_ACCESS_KEY", ""),
|
||||
MinioSecretKey: env("MINIO_SECRET_KEY", ""),
|
||||
|
||||
1
go.mod
1
go.mod
@@ -3,6 +3,7 @@ module monitoring-tg
|
||||
go 1.25.7
|
||||
|
||||
require (
|
||||
gitea.estateliga.work/admin/portal-common v0.3.0
|
||||
github.com/jackc/pgx/v5 v5.9.1
|
||||
github.com/minio/minio-go/v7 v7.2.0
|
||||
)
|
||||
|
||||
2
go.sum
2
go.sum
@@ -1,3 +1,5 @@
|
||||
gitea.estateliga.work/admin/portal-common v0.3.0 h1:xpr9UeLXk5pCcNXcTVGZzJZr0Ni7An7DV0OkuYv9qVM=
|
||||
gitea.estateliga.work/admin/portal-common v0.3.0/go.mod h1:C860q6g38KVMsv+mKv6k1Vm7smVRCycl+N6r63TElnk=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
|
||||
@@ -83,6 +83,41 @@ type ProviderStatus struct {
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
type Stats struct {
|
||||
At time.Time `json:"at"`
|
||||
Owners []OwnerStat `json:"owners,omitempty"`
|
||||
Errors []ErrorStat `json:"errors,omitempty"`
|
||||
Backlog []BacklogStat `json:"backlog,omitempty"`
|
||||
}
|
||||
|
||||
type OwnerStat struct {
|
||||
OwnerService string `json:"owner_service"`
|
||||
TaskType string `json:"task_type"`
|
||||
ModelProfile string `json:"model_profile"`
|
||||
Status string `json:"status"`
|
||||
Total int64 `json:"total"`
|
||||
}
|
||||
|
||||
type ErrorStat struct {
|
||||
OwnerService string `json:"owner_service,omitempty"`
|
||||
TaskType string `json:"task_type"`
|
||||
ModelProfile string `json:"model_profile"`
|
||||
ErrorCode string `json:"error_code"`
|
||||
Total int64 `json:"total"`
|
||||
Last24h int64 `json:"last_24h"`
|
||||
}
|
||||
|
||||
type BacklogStat struct {
|
||||
OwnerService string `json:"owner_service"`
|
||||
TaskType string `json:"task_type"`
|
||||
ModelProfile string `json:"model_profile"`
|
||||
Pending int64 `json:"pending"`
|
||||
Running int64 `json:"running"`
|
||||
StaleRunning int64 `json:"stale_running"`
|
||||
OldestPendingAgeSeconds int64 `json:"oldest_pending_age_seconds"`
|
||||
OldestRunningAgeSeconds int64 `json:"oldest_running_age_seconds"`
|
||||
}
|
||||
|
||||
func New(baseURL, token string, timeout time.Duration) *Client {
|
||||
baseURL = strings.TrimRight(strings.TrimSpace(baseURL), "/")
|
||||
if baseURL == "" {
|
||||
@@ -223,6 +258,29 @@ func (c *Client) ProvidersStatus(ctx context.Context) (*ProvidersStatus, error)
|
||||
return &out, nil
|
||||
}
|
||||
|
||||
func (c *Client) Stats(ctx context.Context) (*Stats, error) {
|
||||
if c == nil {
|
||||
return nil, fmt.Errorf("ai-service not configured")
|
||||
}
|
||||
req, err := c.request(ctx, http.MethodGet, "/api/v1/stats", nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("ai stats: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
return nil, fmt.Errorf("ai stats: http %d: %s", resp.StatusCode, readSmall(resp.Body))
|
||||
}
|
||||
var out Stats
|
||||
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||||
return nil, fmt.Errorf("decode ai stats: %w", err)
|
||||
}
|
||||
return &out, nil
|
||||
}
|
||||
|
||||
func (c *Client) request(ctx context.Context, method, path string, body []byte) (*http.Request, error) {
|
||||
var r io.Reader
|
||||
if body != nil {
|
||||
|
||||
44
internal/dbretry/dbretry.go
Normal file
44
internal/dbretry/dbretry.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package dbretry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
func Connect(ctx context.Context, databaseURL string, maxWait time.Duration) (*pgxpool.Pool, error) {
|
||||
deadline := time.Now().Add(maxWait)
|
||||
var lastErr error
|
||||
|
||||
for attempt := 1; ; attempt++ {
|
||||
pool, err := pgxpool.New(ctx, databaseURL)
|
||||
if err == nil {
|
||||
if pingErr := pool.Ping(ctx); pingErr == nil {
|
||||
return pool, nil
|
||||
} else {
|
||||
err = fmt.Errorf("ping postgres: %w", pingErr)
|
||||
pool.Close()
|
||||
}
|
||||
} else {
|
||||
err = fmt.Errorf("connect postgres: %w", err)
|
||||
}
|
||||
lastErr = err
|
||||
|
||||
if time.Now().After(deadline) {
|
||||
return nil, fmt.Errorf("connect postgres after retry: %w", lastErr)
|
||||
}
|
||||
sleep := time.Duration(attempt) * time.Second
|
||||
if sleep > 5*time.Second {
|
||||
sleep = 5 * time.Second
|
||||
}
|
||||
timer := time.NewTimer(sleep)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return nil, fmt.Errorf("connect postgres cancelled: %w", ctx.Err())
|
||||
case <-timer.C:
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -10,6 +10,7 @@ stringData:
|
||||
TG_PHONE: "+971524994695"
|
||||
TG_SESSION_STRING: ""
|
||||
POSTGRES_PASSWORD: "parser"
|
||||
INTERNAL_API_KEY: "36fe89ed40c01fdc54d3cf4e3fcacc8751dc456a4a1acd394e9fed48257c5734"
|
||||
AI_SERVICE_TOKEN: "d18bcacf9e02bae1806ee6b6eeda62b95be6a915c0a22936d9a700128b275442"
|
||||
MINIO_ACCESS_KEY: "admjn"
|
||||
MINIO_SECRET_KEY: "TropicalMacaw9Fantasize"
|
||||
|
||||
@@ -13,6 +13,19 @@ def portal_department_id(request: Request) -> str | None:
|
||||
return value or None
|
||||
|
||||
|
||||
def portal_department_ids(request: Request) -> list[str]:
|
||||
raw = (request.headers.get("x-user-department-ids") or "").strip()
|
||||
out: list[str] = []
|
||||
for part in raw.split(","):
|
||||
value = part.strip()
|
||||
if value and value not in out:
|
||||
out.append(value)
|
||||
current = portal_department_id(request)
|
||||
if current and current not in out:
|
||||
out.append(current)
|
||||
return out
|
||||
|
||||
|
||||
def is_department_head_request(request: Request) -> bool:
|
||||
return request.headers.get("x-user-is-department-head") == "1"
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from parser_bot.access import (
|
||||
is_admin_request,
|
||||
portal_department_id,
|
||||
portal_department_ids,
|
||||
require_department_manager,
|
||||
require_telegram_auth_manager,
|
||||
)
|
||||
@@ -39,13 +39,13 @@ class AuthCodeResult(BaseModel):
|
||||
needs_password: bool
|
||||
|
||||
|
||||
def _department_scope(request: Request) -> str | None:
|
||||
def _department_scopes(request: Request) -> list[str] | None:
|
||||
if is_admin_request(request):
|
||||
return None
|
||||
dept_id = portal_department_id(request)
|
||||
if not dept_id:
|
||||
dept_ids = portal_department_ids(request)
|
||||
if not dept_ids:
|
||||
raise HTTPException(status_code=403, detail="department is required")
|
||||
return dept_id
|
||||
return dept_ids
|
||||
|
||||
|
||||
async def _require_channel_scope(
|
||||
@@ -55,7 +55,7 @@ async def _require_channel_scope(
|
||||
vertical: str | None,
|
||||
section: str | None,
|
||||
) -> None:
|
||||
department_id = _department_scope(request)
|
||||
department_ids = _department_scopes(request)
|
||||
stmt = (
|
||||
select(Channel.id)
|
||||
.join(Section, Section.id == Channel.section_id)
|
||||
@@ -65,8 +65,8 @@ async def _require_channel_scope(
|
||||
stmt = stmt.where(Channel.vertical == vertical)
|
||||
if section:
|
||||
stmt = stmt.where(Section.slug == section)
|
||||
if department_id is not None:
|
||||
stmt = stmt.where(Section.department_id == department_id)
|
||||
if department_ids is not None:
|
||||
stmt = stmt.where(Section.department_id.in_(department_ids))
|
||||
exists = (await session.execute(stmt)).scalar_one_or_none()
|
||||
if exists is None:
|
||||
raise HTTPException(status_code=404)
|
||||
@@ -182,7 +182,7 @@ async def trigger_poll_all(
|
||||
section: str | None = Query(None),
|
||||
session: AsyncSession = Depends(get_session),
|
||||
) -> dict[str, Any]:
|
||||
department_id = _department_scope(request)
|
||||
department_ids = _department_scopes(request)
|
||||
stmt = (
|
||||
select(Channel.id)
|
||||
.join(Section, Section.id == Channel.section_id)
|
||||
@@ -190,8 +190,8 @@ async def trigger_poll_all(
|
||||
)
|
||||
if section:
|
||||
stmt = stmt.where(Section.slug == section)
|
||||
if department_id is not None:
|
||||
stmt = stmt.where(Section.department_id == department_id)
|
||||
if department_ids is not None:
|
||||
stmt = stmt.where(Section.department_id.in_(department_ids))
|
||||
result = await session.execute(stmt)
|
||||
ids = [row[0] for row in result.all()]
|
||||
background.add_task(_poll_all_in_background, ids)
|
||||
|
||||
@@ -78,6 +78,10 @@ class Channel(Base):
|
||||
is_active: Mapped[bool] = mapped_column(default=True, server_default="true")
|
||||
last_message_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
|
||||
last_polled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
|
||||
last_poll_status: Mapped[str | None] = mapped_column(String(32), nullable=True)
|
||||
last_poll_error_code: Mapped[str | None] = mapped_column(String(64), nullable=True)
|
||||
last_poll_error: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
last_poll_error_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True), server_default=func.now()
|
||||
)
|
||||
|
||||
@@ -2,7 +2,7 @@ from datetime import datetime, timezone
|
||||
|
||||
import structlog
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from sqlalchemy import func, select
|
||||
from sqlalchemy import func, or_, select
|
||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
@@ -83,6 +83,17 @@ def _translate_telegram_error(exc: Exception, identifier: str) -> PollError:
|
||||
|
||||
async def poll_channel(channel_id: int) -> int:
|
||||
"""Poll one channel for new messages. Returns count of inserted rows."""
|
||||
try:
|
||||
return await _poll_channel(channel_id)
|
||||
except PollError as exc:
|
||||
await _record_poll_error(channel_id, exc.code, exc.message)
|
||||
raise
|
||||
except Exception as exc:
|
||||
await _record_poll_error(channel_id, "poll_failed", str(exc))
|
||||
raise
|
||||
|
||||
|
||||
async def _poll_channel(channel_id: int) -> int:
|
||||
if not await is_authorized():
|
||||
raise PollUnauthorizedError(
|
||||
"Telegram is not authorized: open Monitoring TG in Portal and authorize it"
|
||||
@@ -99,6 +110,10 @@ async def poll_channel(channel_id: int) -> int:
|
||||
channel.title = channel.title or source.title
|
||||
channel.last_message_id = source.last_message_id
|
||||
channel.last_polled_at = source.last_polled_at
|
||||
channel.last_poll_status = "alias"
|
||||
channel.last_poll_error_code = None
|
||||
channel.last_poll_error = None
|
||||
channel.last_poll_error_at = None
|
||||
return 0
|
||||
|
||||
if channel.tg_id is None or channel.title is None:
|
||||
@@ -122,6 +137,10 @@ async def poll_channel(channel_id: int) -> int:
|
||||
channel.title = channel.title or resolved.title or (source.title if source else None)
|
||||
channel.last_message_id = source.last_message_id if source else channel.last_message_id
|
||||
channel.last_polled_at = source.last_polled_at if source else channel.last_polled_at
|
||||
channel.last_poll_status = "alias"
|
||||
channel.last_poll_error_code = None
|
||||
channel.last_poll_error = None
|
||||
channel.last_poll_error_at = None
|
||||
log.info(
|
||||
"linked_channel_alias",
|
||||
channel_id=channel.id,
|
||||
@@ -180,6 +199,10 @@ async def poll_channel(channel_id: int) -> int:
|
||||
channel.last_message_id or 0, msgs[-1].tg_message_id
|
||||
)
|
||||
channel.last_polled_at = datetime.now(timezone.utc)
|
||||
channel.last_poll_status = "ok"
|
||||
channel.last_poll_error_code = None
|
||||
channel.last_poll_error = None
|
||||
channel.last_poll_error_at = None
|
||||
|
||||
log.info(
|
||||
"polled_channel",
|
||||
@@ -191,6 +214,17 @@ async def poll_channel(channel_id: int) -> int:
|
||||
return inserted
|
||||
|
||||
|
||||
async def _record_poll_error(channel_id: int, code: str, message: str) -> None:
|
||||
async with session_scope() as session:
|
||||
channel = await session.get(Channel, channel_id)
|
||||
if channel is None:
|
||||
return
|
||||
channel.last_poll_status = "error"
|
||||
channel.last_poll_error_code = code
|
||||
channel.last_poll_error = message[:1000]
|
||||
channel.last_poll_error_at = datetime.now(timezone.utc)
|
||||
|
||||
|
||||
async def poll_all() -> None:
|
||||
if not await is_authorized():
|
||||
log.debug("poll_skipped_not_authorized")
|
||||
@@ -228,10 +262,15 @@ async def backfill_media(channel_id: int, batch_size: int = 50) -> dict[str, int
|
||||
if channel is None:
|
||||
raise RuntimeError("channel not found")
|
||||
|
||||
missing_media_condition = or_(
|
||||
Message.media_files.is_(None),
|
||||
func.jsonb_array_length(Message.media_files) == 0,
|
||||
)
|
||||
|
||||
pending_q = select(func.count(Message.id)).where(
|
||||
Message.channel_id == channel_id,
|
||||
Message.has_media.is_(True),
|
||||
Message.media_files.is_(None),
|
||||
missing_media_condition,
|
||||
)
|
||||
pending_total = (await session.execute(pending_q)).scalar_one()
|
||||
|
||||
@@ -241,7 +280,7 @@ async def backfill_media(channel_id: int, batch_size: int = 50) -> dict[str, int
|
||||
.where(
|
||||
Message.channel_id == channel_id,
|
||||
Message.has_media.is_(True),
|
||||
Message.media_files.is_(None),
|
||||
missing_media_condition,
|
||||
)
|
||||
.order_by(Message.tg_message_id.asc())
|
||||
.limit(batch_size)
|
||||
|
||||
Reference in New Issue
Block a user