Compare commits

...

8 Commits

Author SHA1 Message Date
Grendgi
d0f8b48869 feat: add monitoring tg view-all scope
All checks were successful
CI / hygiene (push) Successful in 1s
Build and Deploy / build-and-deploy (push) Successful in 28s
CI / go (push) Successful in 21s
CI / python (push) Successful in 1s
2026-06-19 14:26:51 +03:00
Grendgi
cdbdea250d feat: expose monitoring tg media repair details
All checks were successful
CI / hygiene (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 45s
CI / go (push) Successful in 31s
CI / python (push) Successful in 2s
2026-06-17 17:22:50 +03:00
Grendgi
696b7eda6f feat: expose monitoring tg poll diagnostics 2026-06-17 17:19:13 +03:00
Grendgi
bd3b54dc7d feat: track monitoring tg poll errors 2026-06-17 17:01:34 +03:00
Grendgi
6cbfdb92e4 feat: expose monitoring tg ai job health 2026-06-17 16:58:47 +03:00
Grendgi
7cc5109cba feat: expose monitoring tg health detail 2026-06-17 15:55:28 +03:00
Grendgi
90c0a346ed chore: use common internal auth 2026-06-17 14:26:04 +03:00
Grendgi
7f7f2427cb chore: use common header parsing 2026-06-17 14:19:13 +03:00
7 changed files with 596 additions and 69 deletions

View File

@@ -0,0 +1,34 @@
"""channel poll status
Revision ID: 0013
Revises: 0012
Create Date: 2026-06-17
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
revision: str = "0013"
down_revision: Union[str, None] = "0012"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Add the nullable poll-status columns to ``channels`` and index two of them."""
    # Columns are added first, in this order, then the indexes are created.
    poll_columns = (
        sa.Column("last_poll_status", sa.String(length=32), nullable=True),
        sa.Column("last_poll_error_code", sa.String(length=64), nullable=True),
        sa.Column("last_poll_error", sa.Text(), nullable=True),
        sa.Column("last_poll_error_at", sa.DateTime(timezone=True), nullable=True),
    )
    for poll_column in poll_columns:
        op.add_column("channels", poll_column)

    poll_indexes = (
        ("ix_channels_last_poll_status", "last_poll_status"),
        ("ix_channels_last_poll_error_code", "last_poll_error_code"),
    )
    for index_name, indexed_column in poll_indexes:
        op.create_index(index_name, "channels", [indexed_column])
def downgrade() -> None:
    """Drop the poll-status indexes and then the columns, reversing ``upgrade``."""
    # Indexes go first so the columns they cover can be dropped afterwards.
    for index_name in (
        "ix_channels_last_poll_error_code",
        "ix_channels_last_poll_status",
    ):
        op.drop_index(index_name, table_name="channels")

    for column_name in (
        "last_poll_error_at",
        "last_poll_error",
        "last_poll_error_code",
        "last_poll_status",
    ):
        op.drop_column("channels", column_name)

View File

@@ -23,6 +23,7 @@ import (
"monitoring-tg/internal/aiservice" "monitoring-tg/internal/aiservice"
"monitoring-tg/internal/dbretry" "monitoring-tg/internal/dbretry"
commonmw "gitea.estateliga.work/admin/portal-common/middleware"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
"github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7"
@@ -72,7 +73,9 @@ type app struct {
type accessScope struct { type accessScope struct {
IsAdmin bool IsAdmin bool
CanManage bool CanManage bool
CanManageAll bool
CanAuth bool CanAuth bool
CanViewAll bool
DeptID string DeptID string
DeptIDs []string DeptIDs []string
} }
@@ -129,6 +132,14 @@ type messageOut struct {
FetchedAt time.Time `json:"fetched_at"` FetchedAt time.Time `json:"fetched_at"`
} }
// componentProbe is the outcome of a single dependency check. The health
// detail handler returns a list of these under the "components" key.
type componentProbe struct {
	// Name identifies the checked component, e.g. "postgres" or "poller".
	Name string `json:"name"`
	// Status is the verdict string set by the probe; the probes in this file
	// emit "ok", "degraded" or "down".
	Status string `json:"status"`
	// LatencyMs is the time the probe took, in milliseconds.
	LatencyMs int64 `json:"latency_ms"`
	// Error holds the failure text or a counter summary; omitted when empty.
	Error string `json:"error,omitempty"`
	// Details carries optional structured diagnostics; omitted when nil.
	Details any `json:"details,omitempty"`
}
func main() { func main() {
cfg := loadConfig() cfg := loadConfig()
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
@@ -185,6 +196,10 @@ func (a *app) serveHTTP(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
return return
} }
if path == "/health/detail" {
a.handleHealthDetail(w, r)
return
}
if path == "/" { if path == "/" {
writeJSON(w, http.StatusOK, map[string]string{"service": "monitoring-tg", "ui": "portal", "api": "go"}) writeJSON(w, http.StatusOK, map[string]string{"service": "monitoring-tg", "ui": "portal", "api": "go"})
return return
@@ -193,10 +208,11 @@ func (a *app) serveHTTP(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusNotFound, "not found") writeError(w, http.StatusNotFound, "not found")
return return
} }
if !a.checkInternalAuth(w, r) { commonmw.InternalAuth(a.cfg.InternalAPIKey)(http.HandlerFunc(a.serveAPI)).ServeHTTP(w, r)
return
} }
func (a *app) serveAPI(w http.ResponseWriter, r *http.Request) {
path := a.apiPath(r.URL.Path)
ctx := r.Context() ctx := r.Context()
switch { switch {
case r.Method == http.MethodGet && path == "/api/v1/access/me": case r.Method == http.MethodGet && path == "/api/v1/access/me":
@@ -232,18 +248,6 @@ func (a *app) serveHTTP(w http.ResponseWriter, r *http.Request) {
} }
} }
// checkInternalAuth reports whether the request may proceed. When no internal
// API key is configured every request is allowed; otherwise the
// X-Internal-Key header must equal the configured (trimmed) key, and a 401 is
// written on mismatch.
func (a *app) checkInternalAuth(w http.ResponseWriter, r *http.Request) bool {
	expected := strings.TrimSpace(a.cfg.InternalAPIKey)
	// An empty key disables the check; the header is only read when a key is set.
	if expected == "" || r.Header.Get("X-Internal-Key") == expected {
		return true
	}
	writeError(w, http.StatusUnauthorized, "unauthorized")
	return false
}
func (a *app) apiPath(path string) string { func (a *app) apiPath(path string) string {
base := strings.TrimRight(a.cfg.PublicBasePath, "/") base := strings.TrimRight(a.cfg.PublicBasePath, "/")
if base != "" && strings.HasPrefix(path, base+"/") { if base != "" && strings.HasPrefix(path, base+"/") {
@@ -255,12 +259,390 @@ func (a *app) apiPath(path string) string {
return path return path
} }
// handleHealthDetail runs every component probe in a fixed order under one
// shared 3-second deadline and writes the results as JSON with status 200.
func (a *app) handleHealthDetail(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
	defer cancel()

	// The probes run sequentially; the order here is the order in the response.
	probes := []func(context.Context) componentProbe{
		a.probePostgres,
		a.probeAIService,
		a.probeClassificationQueue,
		a.probeAIJobs,
		a.probePoller,
		a.probePollErrors,
		a.probeMediaStorage,
		a.probeMediaMetadata,
	}
	components := make([]componentProbe, 0, len(probes))
	for _, probe := range probes {
		components = append(components, probe(ctx))
	}
	writeJSON(w, http.StatusOK, map[string]any{"components": components})
}
// probePostgres pings the database handle and reports "ok" on success or
// "down" with the ping error text on failure.
func (a *app) probePostgres(ctx context.Context) componentProbe {
	began := time.Now()
	probe := componentProbe{Name: "postgres", Status: "ok"}
	if pingErr := a.db.Ping(ctx); pingErr != nil {
		probe.Status = "down"
		probe.Error = pingErr.Error()
	}
	probe.LatencyMs = time.Since(began).Milliseconds()
	return probe
}
// probeAIService reports whether the provider named "llm" is configured and
// healthy according to the AI client's provider status. It is "down" when the
// LLM is disabled in config, the status call fails, the provider is missing,
// or the provider is not both configured and OK.
func (a *app) probeAIService(ctx context.Context) componentProbe {
	began := time.Now()
	// report stamps the elapsed time at the moment the verdict is reached.
	report := func(status, errText string) componentProbe {
		return componentProbe{
			Name:      "ai_service",
			Status:    status,
			LatencyMs: time.Since(began).Milliseconds(),
			Error:     errText,
		}
	}

	if !a.cfg.LLMEnabled {
		return report("down", "llm disabled")
	}
	providers, err := a.ai.ProvidersStatus(ctx)
	if err != nil {
		return report("down", err.Error())
	}
	for _, p := range providers.Providers {
		if p.Name != "llm" {
			continue
		}
		// The first provider named "llm" decides the outcome.
		if p.Configured && p.OK {
			return report("ok", "")
		}
		reason := strings.TrimSpace(p.Error)
		if reason == "" {
			reason = "llm provider is not ready"
		}
		return report("down", reason)
	}
	return report("down", "llm provider not found")
}
// probeClassificationQueue counts messages with text that have no
// classification row for their channel's section, overall and for messages
// fetched in the last 24 hours. Any pending message, or a query failure,
// yields "down".
func (a *app) probeClassificationQueue(ctx context.Context) componentProbe {
	began := time.Now()
	probe := componentProbe{Name: "classification_queue", Status: "ok"}

	var backlog, backlog24h int64
	row := a.db.QueryRow(ctx, `
SELECT
COUNT(*)::bigint,
COUNT(*) FILTER (WHERE m.fetched_at >= now() - interval '24 hours')::bigint
FROM channels c
JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
JOIN messages m ON m.channel_id = src.id
JOIN sections s ON s.id = c.section_id
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
WHERE m.text IS NOT NULL
AND mc.id IS NULL`,
	)
	scanErr := row.Scan(&backlog, &backlog24h)

	switch {
	case scanErr != nil:
		probe.Status = "down"
		probe.Error = scanErr.Error()
	case backlog > 0:
		probe.Status = "down"
		probe.Error = "pending=" + strconv.FormatInt(backlog, 10) + " pending_24h=" + strconv.FormatInt(backlog24h, 10)
	}
	probe.LatencyMs = time.Since(began).Milliseconds()
	return probe
}
// probeAIJobs summarizes AI job statistics for owner "monitoring-tg" and task
// type "telegram_classification". It is "down" when the LLM is disabled, the
// stats call fails, or there are stale-running jobs or failures in the last
// 24 hours; "degraded" when anything is pending, running or failed; "ok"
// otherwise.
func (a *app) probeAIJobs(ctx context.Context) componentProbe {
	began := time.Now()
	report := func(status, errText string) componentProbe {
		return componentProbe{
			Name:      "ai_jobs",
			Status:    status,
			LatencyMs: time.Since(began).Milliseconds(),
			Error:     errText,
		}
	}

	if !a.cfg.LLMEnabled {
		return report("down", "llm disabled")
	}
	stats, err := a.ai.Stats(ctx)
	if err != nil {
		return report("down", err.Error())
	}

	// ours selects the rows that belong to this service's classification jobs.
	ours := func(ownerService, taskType string) bool {
		return ownerService == "monitoring-tg" && taskType == "telegram_classification"
	}

	var pending, running, staleRunning, failed, failed24h int64
	for _, b := range stats.Backlog {
		if ours(b.OwnerService, b.TaskType) {
			pending += b.Pending
			running += b.Running
			staleRunning += b.StaleRunning
		}
	}
	for _, o := range stats.Owners {
		if ours(o.OwnerService, o.TaskType) && o.Status == "failed" {
			failed += o.Total
		}
	}
	for _, e := range stats.Errors {
		if ours(e.OwnerService, e.TaskType) {
			failed24h += e.Last24h
		}
	}

	summary := "pending=" + strconv.FormatInt(pending, 10) +
		" running=" + strconv.FormatInt(running, 10) +
		" stale_running=" + strconv.FormatInt(staleRunning, 10) +
		" failed=" + strconv.FormatInt(failed, 10)

	switch {
	case staleRunning > 0 || failed24h > 0:
		// Only the "down" summary carries the 24-hour failure counter.
		return report("down", summary+" failed_24h="+strconv.FormatInt(failed24h, 10))
	case pending > 0 || running > 0 || failed > 0:
		return report("degraded", summary)
	default:
		return report("ok", "")
	}
}
// probePoller checks active channels that have no source channel: it is
// "down" when any of them has never been polled or was last polled longer ago
// than the staleness threshold (three poll intervals, at least 900 seconds),
// or when the query fails.
func (a *app) probePoller(ctx context.Context) componentProbe {
	began := time.Now()
	probe := componentProbe{Name: "poller", Status: "ok"}
	staleAfter := maxInt(a.cfg.PollIntervalSeconds*3, 900)

	var active, neverPolled, stale int64
	row := a.db.QueryRow(ctx, `
SELECT
COUNT(*)::bigint,
COUNT(*) FILTER (WHERE last_polled_at IS NULL)::bigint,
COUNT(*) FILTER (WHERE last_polled_at IS NOT NULL AND last_polled_at < now() - ($1::int * interval '1 second'))::bigint
FROM channels
WHERE is_active = true
AND source_channel_id IS NULL`,
		staleAfter,
	)
	scanErr := row.Scan(&active, &neverPolled, &stale)

	switch {
	case scanErr != nil:
		probe.Status = "down"
		probe.Error = scanErr.Error()
	case active > 0 && (neverPolled > 0 || stale > 0):
		probe.Status = "down"
		probe.Error = "active=" + strconv.FormatInt(active, 10) +
			" never_polled=" + strconv.FormatInt(neverPolled, 10) +
			" stale=" + strconv.FormatInt(stale, 10) +
			" stale_after_sec=" + strconv.Itoa(staleAfter)
	}
	probe.LatencyMs = time.Since(began).Milliseconds()
	return probe
}
// probePollErrors counts active channels without a source channel whose last
// poll status is "error", split into flood-wait, channel-unavailable and other
// error codes. Any error makes the probe "degraded"; an error with another
// code, or a query failure, makes it "down". When errors exist, the five most
// recent ones are attached to Details.
func (a *app) probePollErrors(ctx context.Context) componentProbe {
	began := time.Now()

	var total, floodWait, unavailable, other int64
	if err := a.db.QueryRow(ctx, `
SELECT
COUNT(*) FILTER (WHERE last_poll_status = 'error')::bigint,
COUNT(*) FILTER (WHERE last_poll_status = 'error' AND last_poll_error_code = 'telegram_flood_wait')::bigint,
COUNT(*) FILTER (WHERE last_poll_status = 'error' AND last_poll_error_code = 'telegram_channel_unavailable')::bigint,
COUNT(*) FILTER (
WHERE last_poll_status = 'error'
AND COALESCE(last_poll_error_code, '') NOT IN ('telegram_flood_wait', 'telegram_channel_unavailable')
)::bigint
FROM channels
WHERE is_active = true
AND source_channel_id IS NULL`,
	).Scan(&total, &floodWait, &unavailable, &other); err != nil {
		return componentProbe{Name: "poll_errors", Status: "down", LatencyMs: time.Since(began).Milliseconds(), Error: err.Error()}
	}
	if total <= 0 {
		return componentProbe{Name: "poll_errors", Status: "ok", LatencyMs: time.Since(began).Milliseconds()}
	}

	// A failure to load the samples is reported inside Details rather than
	// failing the whole probe.
	samples, samplesErr := a.recentPollErrors(ctx)
	details := map[string]any{
		"total":       total,
		"flood_wait":  floodWait,
		"unavailable": unavailable,
		"other":       other,
	}
	if samplesErr == nil {
		details["recent_errors"] = samples
	} else {
		details["recent_error"] = samplesErr.Error()
	}

	verdict := "degraded"
	if other > 0 {
		verdict = "down"
	}
	return componentProbe{
		Name:      "poll_errors",
		Status:    verdict,
		LatencyMs: time.Since(began).Milliseconds(),
		Error: "total=" + strconv.FormatInt(total, 10) +
			" flood_wait=" + strconv.FormatInt(floodWait, 10) +
			" unavailable=" + strconv.FormatInt(unavailable, 10) +
			" other=" + strconv.FormatInt(other, 10),
		Details: details,
	}
}
// recentPollErrors returns up to five active channels without a source
// channel whose last poll status is "error", most recent error first, each as
// a JSON-ready map. On a scan failure it returns nil and the error.
func (a *app) recentPollErrors(ctx context.Context) ([]map[string]any, error) {
	rows, err := a.db.Query(ctx, `
SELECT
c.id,
c.identifier,
COALESCE(c.title, '') AS title,
s.slug,
s.title AS section_title,
COALESCE(c.last_poll_error_code, '') AS error_code,
COALESCE(c.last_poll_error, '') AS error_text,
c.last_poll_error_at
FROM channels c
LEFT JOIN sections s ON s.id = c.section_id
WHERE c.is_active = true
AND c.source_channel_id IS NULL
AND c.last_poll_status = 'error'
ORDER BY c.last_poll_error_at DESC NULLS LAST, c.id DESC
LIMIT 5`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	// record mirrors the SELECT list; section columns come from a LEFT JOIN
	// and may therefore be NULL.
	type record struct {
		channelID    int64
		identifier   string
		title        string
		sectionSlug  sql.NullString
		sectionTitle sql.NullString
		errorCode    string
		errorText    string
		errorAt      sql.NullTime
	}

	recent := make([]map[string]any, 0, 5)
	for rows.Next() {
		var rec record
		scanErr := rows.Scan(
			&rec.channelID,
			&rec.identifier,
			&rec.title,
			&rec.sectionSlug,
			&rec.sectionTitle,
			&rec.errorCode,
			&rec.errorText,
			&rec.errorAt,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		entry := map[string]any{
			"channel_id":    rec.channelID,
			"identifier":    rec.identifier,
			"title":         nullableString(rec.title),
			"section_slug":  nullString(rec.sectionSlug),
			"section_title": nullString(rec.sectionTitle),
			"error_code":    nullableString(rec.errorCode),
			"error":         nullableString(rec.errorText),
			"error_at":      nullTime(rec.errorAt),
		}
		recent = append(recent, entry)
	}
	return recent, rows.Err()
}
// probeMediaStorage verifies that the MinIO client and bucket name are
// configured and that the bucket exists. Any missing piece or lookup error
// yields "down".
func (a *app) probeMediaStorage(ctx context.Context) componentProbe {
	began := time.Now()
	down := func(errText string) componentProbe {
		return componentProbe{
			Name:      "media_storage",
			Status:    "down",
			LatencyMs: time.Since(began).Milliseconds(),
			Error:     errText,
		}
	}

	if a.minio == nil || a.cfg.MinioBucket == "" {
		return down("minio not configured")
	}
	found, err := a.minio.BucketExists(ctx, a.cfg.MinioBucket)
	switch {
	case err != nil:
		return down(err.Error())
	case !found:
		return down("bucket not found: " + a.cfg.MinioBucket)
	}
	return componentProbe{Name: "media_storage", Status: "ok", LatencyMs: time.Since(began).Milliseconds()}
}
// probeMediaMetadata counts messages flagged as having media and, among them,
// those whose media_files array is empty or NULL. Any such message, or a
// query failure, yields "down"; up to five recent offenders are attached to
// Details.
func (a *app) probeMediaMetadata(ctx context.Context) componentProbe {
	began := time.Now()

	var withMedia, missingFiles int64
	if err := a.db.QueryRow(ctx, `
SELECT
COUNT(*) FILTER (WHERE has_media = true)::bigint,
COUNT(*) FILTER (
WHERE has_media = true
AND jsonb_array_length(COALESCE(media_files, '[]'::jsonb)) = 0
)::bigint
FROM messages`,
	).Scan(&withMedia, &missingFiles); err != nil {
		return componentProbe{Name: "media_metadata", Status: "down", LatencyMs: time.Since(began).Milliseconds(), Error: err.Error()}
	}
	if missingFiles <= 0 {
		return componentProbe{Name: "media_metadata", Status: "ok", LatencyMs: time.Since(began).Milliseconds()}
	}

	// A failure to load the samples is reported inside Details rather than
	// replacing the counters.
	samples, samplesErr := a.recentMissingMedia(ctx)
	details := map[string]any{
		"messages_with_media": withMedia,
		"missing_media_files": missingFiles,
	}
	if samplesErr == nil {
		details["recent_missing_media"] = samples
	} else {
		details["recent_error"] = samplesErr.Error()
	}

	return componentProbe{
		Name:      "media_metadata",
		Status:    "down",
		LatencyMs: time.Since(began).Milliseconds(),
		Error: "messages_with_media=" + strconv.FormatInt(withMedia, 10) +
			" missing_media_files=" + strconv.FormatInt(missingFiles, 10),
		Details: details,
	}
}
// recentMissingMedia returns up to five messages flagged as having media but
// with an empty or NULL media_files array, newest first, each as a JSON-ready
// map that includes the channel's backfill-media path as "repair_action".
// On a scan failure it returns nil and the error.
func (a *app) recentMissingMedia(ctx context.Context) ([]map[string]any, error) {
	rows, err := a.db.Query(ctx, `
SELECT
m.id,
m.tg_message_id,
m.date,
c.id AS channel_id,
c.identifier,
COALESCE(c.title, '') AS channel_title,
s.slug,
s.title AS section_title
FROM messages m
JOIN channels c ON c.id = m.channel_id
LEFT JOIN sections s ON s.id = c.section_id
WHERE m.has_media = true
AND jsonb_array_length(COALESCE(m.media_files, '[]'::jsonb)) = 0
ORDER BY m.date DESC, m.id DESC
LIMIT 5`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	// record mirrors the SELECT list; section columns come from a LEFT JOIN
	// and may therefore be NULL.
	type record struct {
		messageID    int64
		tgMessageID  int64
		messageDate  time.Time
		channelID    int64
		identifier   string
		channelTitle string
		sectionSlug  sql.NullString
		sectionTitle sql.NullString
	}

	missing := make([]map[string]any, 0, 5)
	for rows.Next() {
		var rec record
		scanErr := rows.Scan(
			&rec.messageID,
			&rec.tgMessageID,
			&rec.messageDate,
			&rec.channelID,
			&rec.identifier,
			&rec.channelTitle,
			&rec.sectionSlug,
			&rec.sectionTitle,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		repairPath := "/api/v1/channels/" + strconv.FormatInt(rec.channelID, 10) + "/backfill-media"
		entry := map[string]any{
			"message_id":    rec.messageID,
			"tg_message_id": rec.tgMessageID,
			"message_date":  rec.messageDate,
			"channel_id":    rec.channelID,
			"identifier":    rec.identifier,
			"channel_title": nullableString(rec.channelTitle),
			"section_slug":  nullString(rec.sectionSlug),
			"section_title": nullString(rec.sectionTitle),
			"repair_action": repairPath,
		}
		missing = append(missing, entry)
	}
	return missing, rows.Err()
}
func (a *app) handleAccessMe(w http.ResponseWriter, r *http.Request) { func (a *app) handleAccessMe(w http.ResponseWriter, r *http.Request) {
scope := readAccess(r) scope := readAccess(r)
writeJSON(w, http.StatusOK, map[string]any{ writeJSON(w, http.StatusOK, map[string]any{
"is_admin": scope.IsAdmin, "is_admin": scope.IsAdmin,
"can_manage_department": scope.CanManage, "can_manage_department": scope.CanManage,
"can_manage_all": scope.CanManageAll,
"can_auth_telegram": scope.CanAuth, "can_auth_telegram": scope.CanAuth,
"can_view_all": scope.CanViewAll,
"department_id": nullableString(scope.DeptID), "department_id": nullableString(scope.DeptID),
"department_ids": scope.departmentIDs(), "department_ids": scope.departmentIDs(),
}) })
@@ -289,7 +671,7 @@ func (a *app) listSections(ctx context.Context, w http.ResponseWriter, r *http.R
args := []any{vertical} args := []any{vertical}
deptFilter := "" deptFilter := ""
if !scope.IsAdmin { if !scope.canReadAll() {
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id") args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
} }
@@ -353,6 +735,7 @@ func (a *app) createSection(ctx context.Context, w http.ResponseWriter, r *http.
} }
var payload struct { var payload struct {
Vertical string `json:"vertical"` Vertical string `json:"vertical"`
DepartmentID *string `json:"department_id"`
Slug string `json:"slug"` Slug string `json:"slug"`
Title string `json:"title"` Title string `json:"title"`
Emoji *string `json:"emoji"` Emoji *string `json:"emoji"`
@@ -368,7 +751,11 @@ func (a *app) createSection(ctx context.Context, w http.ResponseWriter, r *http.
writeError(w, http.StatusBadRequest, "vertical, slug and title are required") writeError(w, http.StatusBadRequest, "vertical, slug and title are required")
return return
} }
dept := nullableString(scope.primaryDepartmentID()) deptID := scope.primaryDepartmentID()
if scope.CanManageAll && payload.DepartmentID != nil {
deptID = strings.TrimSpace(*payload.DepartmentID)
}
dept := nullableString(deptID)
row := a.db.QueryRow(ctx, ` row := a.db.QueryRow(ctx, `
INSERT INTO sections (vertical, department_id, slug, title, emoji, description) INSERT INTO sections (vertical, department_id, slug, title, emoji, description)
VALUES ($1, $2, $3, $4, $5, $6) VALUES ($1, $2, $3, $4, $5, $6)
@@ -459,7 +846,7 @@ func (a *app) updateSection(ctx context.Context, w http.ResponseWriter, r *http.
} }
args = append(args, vertical, slug) args = append(args, vertical, slug)
where := fmt.Sprintf("vertical = $%d AND slug = $%d", len(args)-1, len(args)) where := fmt.Sprintf("vertical = $%d AND slug = $%d", len(args)-1, len(args))
if !scope.IsAdmin { if !scope.CanManageAll {
var deptFilter string var deptFilter string
args, deptFilter = appendDepartmentFilter(args, scope, "department_id") args, deptFilter = appendDepartmentFilter(args, scope, "department_id")
where += deptFilter where += deptFilter
@@ -487,7 +874,7 @@ func (a *app) deleteSection(ctx context.Context, w http.ResponseWriter, r *http.
if !ok { if !ok {
return return
} }
section, err := a.findSection(ctx, vertical, slug, scope) section, err := a.findSection(ctx, vertical, slug, scope.forManageLookup())
if err != nil { if err != nil {
writeDBError(w, err) writeDBError(w, err)
return return
@@ -511,7 +898,7 @@ func (a *app) deleteSection(ctx context.Context, w http.ResponseWriter, r *http.
func (a *app) findSection(ctx context.Context, vertical, slug string, scope accessScope) (sectionOut, error) { func (a *app) findSection(ctx context.Context, vertical, slug string, scope accessScope) (sectionOut, error) {
args := []any{vertical, slug} args := []any{vertical, slug}
where := "s.vertical = $1 AND s.slug = $2" where := "s.vertical = $1 AND s.slug = $2"
if !scope.IsAdmin { if !scope.canReadAll() {
var deptFilter string var deptFilter string
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id") args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
where += deptFilter where += deptFilter
@@ -555,7 +942,7 @@ func (a *app) listChannels(ctx context.Context, w http.ResponseWriter, r *http.R
args = append(args, section) args = append(args, section)
where += fmt.Sprintf(" AND s.slug = $%d", len(args)) where += fmt.Sprintf(" AND s.slug = $%d", len(args))
} }
if !scope.IsAdmin { if !scope.canReadAll() {
var deptFilter string var deptFilter string
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id") args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
where += deptFilter where += deptFilter
@@ -612,7 +999,7 @@ func (a *app) createChannel(ctx context.Context, w http.ResponseWriter, r *http.
writeError(w, http.StatusBadRequest, "identifier, vertical and section are required") writeError(w, http.StatusBadRequest, "identifier, vertical and section are required")
return return
} }
section, err := a.findSection(ctx, payload.Vertical, payload.Section, scope) section, err := a.findSection(ctx, payload.Vertical, payload.Section, scope.forManageLookup())
if err != nil { if err != nil {
writeDBError(w, err) writeDBError(w, err)
return return
@@ -705,7 +1092,7 @@ func (a *app) updateChannel(ctx context.Context, w http.ResponseWriter, r *http.
if !ok { if !ok {
return return
} }
if _, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil { if _, err := a.findChannel(ctx, id, scope.forManageLookup(), r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil {
writeDBError(w, err) writeDBError(w, err)
return return
} }
@@ -732,7 +1119,7 @@ func (a *app) updateChannel(ctx context.Context, w http.ResponseWriter, r *http.
if payload.Vertical != nil && strings.TrimSpace(*payload.Vertical) != "" { if payload.Vertical != nil && strings.TrimSpace(*payload.Vertical) != "" {
vertical = strings.TrimSpace(*payload.Vertical) vertical = strings.TrimSpace(*payload.Vertical)
} }
section, err := a.findSection(ctx, vertical, strings.TrimSpace(*payload.Section), scope) section, err := a.findSection(ctx, vertical, strings.TrimSpace(*payload.Section), scope.forManageLookup())
if err != nil { if err != nil {
writeDBError(w, err) writeDBError(w, err)
return return
@@ -766,7 +1153,7 @@ func (a *app) deleteChannel(ctx context.Context, w http.ResponseWriter, r *http.
if !ok { if !ok {
return return
} }
if _, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil { if _, err := a.findChannel(ctx, id, scope.forManageLookup(), r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil {
writeDBError(w, err) writeDBError(w, err)
return return
} }
@@ -788,7 +1175,7 @@ func (a *app) findChannel(ctx context.Context, id int64, scope accessScope, vert
args = append(args, strings.TrimSpace(section)) args = append(args, strings.TrimSpace(section))
where += fmt.Sprintf(" AND s.slug = $%d", len(args)) where += fmt.Sprintf(" AND s.slug = $%d", len(args))
} }
if !scope.IsAdmin { if !scope.canReadAll() {
var deptFilter string var deptFilter string
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id") args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
where += deptFilter where += deptFilter
@@ -856,7 +1243,7 @@ func (a *app) reanalyzeChannel(ctx context.Context, w http.ResponseWriter, r *ht
if !ok { if !ok {
return return
} }
ch, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")) ch, err := a.findChannel(ctx, id, scope.forManageLookup(), r.URL.Query().Get("vertical"), r.URL.Query().Get("section"))
if err != nil { if err != nil {
writeDBError(w, err) writeDBError(w, err)
return return
@@ -938,7 +1325,7 @@ func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http
args = append(args, key, field) args = append(args, key, field)
where += fmt.Sprintf(" AND COALESCE(mc.verdict ->> $%d, m.extracted -> $%d ->> $%d) = 'true'", len(args), len(args)-1, len(args)) where += fmt.Sprintf(" AND COALESCE(mc.verdict ->> $%d, m.extracted -> $%d ->> $%d) = 'true'", len(args), len(args)-1, len(args))
} }
if !scope.IsAdmin { if !scope.canReadAll() {
var deptFilter string var deptFilter string
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id") args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
where += deptFilter where += deptFilter
@@ -1007,7 +1394,7 @@ func (a *app) handleMessageItem(ctx context.Context, w http.ResponseWriter, r *h
} }
args := []any{id} args := []any{id}
where := "m.id = $1" where := "m.id = $1"
if !scope.IsAdmin { if !scope.canReadAll() {
var deptFilter string var deptFilter string
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id") args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
where += deptFilter where += deptFilter
@@ -1130,7 +1517,7 @@ func (a *app) canReadChannelMedia(ctx context.Context, scope accessScope, channe
FROM channels c FROM channels c
JOIN sections s ON s.id = c.section_id JOIN sections s ON s.id = c.section_id
WHERE c.id = $1 OR c.source_channel_id = $1 WHERE c.id = $1 OR c.source_channel_id = $1
`, channelID, scope.departmentIDs(), scope.IsAdmin).Scan(&allowed) `, channelID, scope.departmentIDs(), scope.canReadAll()).Scan(&allowed)
if errors.Is(err, pgx.ErrNoRows) { if errors.Is(err, pgx.ErrNoRows) {
return false, nil return false, nil
} }
@@ -1160,7 +1547,7 @@ func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Re
args = append(args, section) args = append(args, section)
where += fmt.Sprintf(" AND s.slug = $%d", len(args)) where += fmt.Sprintf(" AND s.slug = $%d", len(args))
} }
if !scope.IsAdmin { if !scope.canReadAll() {
var deptFilter string var deptFilter string
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id") args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
where += deptFilter where += deptFilter
@@ -1279,7 +1666,7 @@ func (a *app) pendingLLM(ctx context.Context, scope accessScope, vertical, secti
args = append(args, section) args = append(args, section)
where += fmt.Sprintf(" AND s.slug = $%d", len(args)) where += fmt.Sprintf(" AND s.slug = $%d", len(args))
} }
if !scope.IsAdmin { if !scope.canReadAll() {
var deptFilter string var deptFilter string
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id") args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
where += deptFilter where += deptFilter
@@ -1370,7 +1757,7 @@ func (a *app) savePrompt(ctx context.Context, w http.ResponseWriter, r *http.Req
writeError(w, http.StatusBadRequest, "prompt is too long (max 30000 chars)") writeError(w, http.StatusBadRequest, "prompt is too long (max 30000 chars)")
return return
} }
deptID, err := a.promptDepartmentID(ctx, scope, vertical, section) deptID, err := a.promptDepartmentID(ctx, scope.forManageLookup(), vertical, section)
if err != nil { if err != nil {
writeDBError(w, err) writeDBError(w, err)
return return
@@ -1398,7 +1785,7 @@ func (a *app) resetPrompt(ctx context.Context, w http.ResponseWriter, r *http.Re
return return
} }
section := strings.TrimSpace(r.URL.Query().Get("section")) section := strings.TrimSpace(r.URL.Query().Get("section"))
deptID, err := a.promptDepartmentID(ctx, scope, vertical, section) deptID, err := a.promptDepartmentID(ctx, scope.forManageLookup(), vertical, section)
if err != nil { if err != nil {
writeDBError(w, err) writeDBError(w, err)
return return
@@ -1503,11 +1890,11 @@ func (a *app) readScope(w http.ResponseWriter, r *http.Request, manage bool) (ac
writeError(w, http.StatusNotFound, "not found") writeError(w, http.StatusNotFound, "not found")
return scope, false return scope, false
} }
} else if !scope.IsAdmin && len(scope.departmentIDs()) == 0 { } else if !scope.canReadAll() && len(scope.departmentIDs()) == 0 {
writeError(w, http.StatusForbidden, "department is required") writeError(w, http.StatusForbidden, "department is required")
return scope, false return scope, false
} }
if manage && !scope.IsAdmin && len(scope.departmentIDs()) == 0 { if manage && !scope.CanManageAll && len(scope.departmentIDs()) == 0 {
writeError(w, http.StatusForbidden, "department is required") writeError(w, http.StatusForbidden, "department is required")
return scope, false return scope, false
} }
@@ -1515,35 +1902,37 @@ func (a *app) readScope(w http.ResponseWriter, r *http.Request, manage bool) (ac
} }
func readAccess(r *http.Request) accessScope { func readAccess(r *http.Request) accessScope {
admin := r.Header.Get("X-User-Is-Admin") == "1" admin := commonmw.HeaderBool(r, "X-User-Is-Admin")
deptHead := r.Header.Get("X-User-Is-Department-Head") == "1" deptHead := commonmw.HeaderBool(r, "X-User-Is-Department-Head")
canManage := r.Header.Get("X-Monitoring-TG-Can-Manage") == "1" canManagePermission := commonmw.HeaderBool(r, "X-Monitoring-TG-Can-Manage")
canAuth := r.Header.Get("X-Monitoring-TG-Can-Auth") == "1" canAuth := commonmw.HeaderBool(r, "X-Monitoring-TG-Can-Auth")
canViewAll := commonmw.HeaderBool(r, "X-Monitoring-TG-Can-View-All")
deptID := strings.TrimSpace(r.Header.Get("X-User-Department-Id")) deptID := strings.TrimSpace(r.Header.Get("X-User-Department-Id"))
deptIDs := parseCSVHeader(r.Header.Get("X-User-Department-Ids")) deptIDs := commonmw.HeaderCSV(r, "X-User-Department-Ids")
if deptID != "" { if deptID != "" {
deptIDs = appendUniqueString(deptIDs, deptID) deptIDs = appendUniqueString(deptIDs, deptID)
} }
return accessScope{ return accessScope{
IsAdmin: admin, IsAdmin: admin,
CanManage: admin || deptHead || canManage, CanManage: admin || deptHead || canManagePermission,
CanManageAll: admin || (canManagePermission && canViewAll),
CanAuth: admin || canAuth, CanAuth: admin || canAuth,
CanViewAll: admin || canViewAll,
DeptID: deptID, DeptID: deptID,
DeptIDs: deptIDs, DeptIDs: deptIDs,
} }
} }
func parseCSVHeader(raw string) []string { func (s accessScope) canReadAll() bool {
raw = strings.TrimSpace(raw) return s.IsAdmin || s.CanViewAll
if raw == "" {
return nil
} }
parts := strings.Split(raw, ",")
out := make([]string, 0, len(parts)) func (s accessScope) forManageLookup() accessScope {
for _, part := range parts { if s.CanManageAll {
out = appendUniqueString(out, strings.TrimSpace(part)) return s
} }
return out s.CanViewAll = false
return s
} }
func appendUniqueString(items []string, value string) []string { func appendUniqueString(items []string, value string) []string {

1
go.mod
View File

@@ -3,6 +3,7 @@ module monitoring-tg
go 1.25.7 go 1.25.7
require ( require (
gitea.estateliga.work/admin/portal-common v0.3.0
github.com/jackc/pgx/v5 v5.9.1 github.com/jackc/pgx/v5 v5.9.1
github.com/minio/minio-go/v7 v7.2.0 github.com/minio/minio-go/v7 v7.2.0
) )

2
go.sum
View File

@@ -1,3 +1,5 @@
gitea.estateliga.work/admin/portal-common v0.3.0 h1:xpr9UeLXk5pCcNXcTVGZzJZr0Ni7An7DV0OkuYv9qVM=
gitea.estateliga.work/admin/portal-common v0.3.0/go.mod h1:C860q6g38KVMsv+mKv6k1Vm7smVRCycl+N6r63TElnk=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=

View File

@@ -83,6 +83,41 @@ type ProviderStatus struct {
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
} }
// Payload types for the ai-service statistics document that Client.Stats
// decodes from GET /api/v1/stats. Field meanings below are inferred from the
// field and JSON key names only — confirm against the ai-service API.
type (
	// Stats is the top-level statistics document.
	Stats struct {
		// At is the timestamp carried in the "at" key (presumably the moment
		// the snapshot was taken — TODO confirm).
		At      time.Time     `json:"at"`
		Owners  []OwnerStat   `json:"owners,omitempty"`
		Errors  []ErrorStat   `json:"errors,omitempty"`
		Backlog []BacklogStat `json:"backlog,omitempty"`
	}

	// OwnerStat is one entry of Stats.Owners: a Total for a combination of
	// owner service, task type, model profile and status.
	OwnerStat struct {
		OwnerService string `json:"owner_service"`
		TaskType     string `json:"task_type"`
		ModelProfile string `json:"model_profile"`
		Status       string `json:"status"`
		Total        int64  `json:"total"`
	}

	// ErrorStat is one entry of Stats.Errors. OwnerService is optional on the
	// wire (omitempty); Last24h presumably counts occurrences within the last
	// 24 hours — TODO confirm.
	ErrorStat struct {
		OwnerService string `json:"owner_service,omitempty"`
		TaskType     string `json:"task_type"`
		ModelProfile string `json:"model_profile"`
		ErrorCode    string `json:"error_code"`
		Total        int64  `json:"total"`
		Last24h      int64  `json:"last_24h"`
	}

	// BacklogStat is one entry of Stats.Backlog: queue counters and the ages
	// (in seconds, per the key names) of the oldest pending/running items.
	BacklogStat struct {
		OwnerService            string `json:"owner_service"`
		TaskType                string `json:"task_type"`
		ModelProfile            string `json:"model_profile"`
		Pending                 int64  `json:"pending"`
		Running                 int64  `json:"running"`
		StaleRunning            int64  `json:"stale_running"`
		OldestPendingAgeSeconds int64  `json:"oldest_pending_age_seconds"`
		OldestRunningAgeSeconds int64  `json:"oldest_running_age_seconds"`
	}
)
func New(baseURL, token string, timeout time.Duration) *Client { func New(baseURL, token string, timeout time.Duration) *Client {
baseURL = strings.TrimRight(strings.TrimSpace(baseURL), "/") baseURL = strings.TrimRight(strings.TrimSpace(baseURL), "/")
if baseURL == "" { if baseURL == "" {
@@ -223,6 +258,29 @@ func (c *Client) ProvidersStatus(ctx context.Context) (*ProvidersStatus, error)
return &out, nil return &out, nil
} }
// Stats issues GET /api/v1/stats against the configured service and decodes
// the JSON response body into a Stats value.
//
// It returns an error when the receiver is nil, when the request cannot be
// built or sent, when the response status is outside the 2xx range (the
// message carries the status code and the output of readSmall on the body),
// or when the body cannot be decoded.
func (c *Client) Stats(ctx context.Context) (*Stats, error) {
	if c == nil {
		return nil, fmt.Errorf("ai-service not configured")
	}

	httpReq, err := c.request(ctx, http.MethodGet, "/api/v1/stats", nil)
	if err != nil {
		return nil, err
	}

	httpResp, err := c.http.Do(httpReq)
	if err != nil {
		return nil, fmt.Errorf("ai stats: %w", err)
	}
	defer httpResp.Body.Close()

	// Anything outside [200, 300) is reported as a failure.
	if code := httpResp.StatusCode; code < http.StatusOK || code >= http.StatusMultipleChoices {
		return nil, fmt.Errorf("ai stats: http %d: %s", code, readSmall(httpResp.Body))
	}

	stats := new(Stats)
	if err := json.NewDecoder(httpResp.Body).Decode(stats); err != nil {
		return nil, fmt.Errorf("decode ai stats: %w", err)
	}
	return stats, nil
}
func (c *Client) request(ctx context.Context, method, path string, body []byte) (*http.Request, error) { func (c *Client) request(ctx context.Context, method, path string, body []byte) (*http.Request, error) {
var r io.Reader var r io.Reader
if body != nil { if body != nil {

View File

@@ -78,6 +78,10 @@ class Channel(Base):
is_active: Mapped[bool] = mapped_column(default=True, server_default="true") is_active: Mapped[bool] = mapped_column(default=True, server_default="true")
last_message_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True) last_message_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
last_polled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) last_polled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
last_poll_status: Mapped[str | None] = mapped_column(String(32), nullable=True)
last_poll_error_code: Mapped[str | None] = mapped_column(String(64), nullable=True)
last_poll_error: Mapped[str | None] = mapped_column(Text, nullable=True)
last_poll_error_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
created_at: Mapped[datetime] = mapped_column( created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now() DateTime(timezone=True), server_default=func.now()
) )

View File

@@ -2,7 +2,7 @@ from datetime import datetime, timezone
import structlog import structlog
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sqlalchemy import func, select from sqlalchemy import func, or_, select
from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
@@ -83,6 +83,17 @@ def _translate_telegram_error(exc: Exception, identifier: str) -> PollError:
async def poll_channel(channel_id: int) -> int:
    """Poll one channel for new messages. Returns count of inserted rows.

    Delegates the actual work to ``_poll_channel``. When polling fails, the
    failure is recorded on the channel row via ``_record_poll_error`` and the
    original exception is re-raised unchanged.

    A ``PollError`` is recorded with its own ``code`` and ``message``; any
    other exception is recorded with the code ``"poll_failed"`` and
    ``str(exc)`` as the message.

    Recording is best-effort: if writing the error status itself fails, that
    secondary failure is logged and swallowed so callers still receive the
    original polling exception rather than an unrelated one.
    """
    try:
        return await _poll_channel(channel_id)
    except Exception as exc:
        if isinstance(exc, PollError):
            code, message = exc.code, exc.message
        else:
            code, message = "poll_failed", str(exc)
        try:
            await _record_poll_error(channel_id, code, message)
        except Exception as record_exc:
            # Never let a bookkeeping failure mask the real polling error.
            log.warning(
                "poll_error_record_failed",
                channel_id=channel_id,
                code=code,
                error=str(record_exc),
            )
        # Bare raise re-raises the original polling exception with its traceback.
        raise
async def _poll_channel(channel_id: int) -> int:
if not await is_authorized(): if not await is_authorized():
raise PollUnauthorizedError( raise PollUnauthorizedError(
"Telegram is not authorized: open Monitoring TG in Portal and authorize it" "Telegram is not authorized: open Monitoring TG in Portal and authorize it"
@@ -99,6 +110,10 @@ async def poll_channel(channel_id: int) -> int:
channel.title = channel.title or source.title channel.title = channel.title or source.title
channel.last_message_id = source.last_message_id channel.last_message_id = source.last_message_id
channel.last_polled_at = source.last_polled_at channel.last_polled_at = source.last_polled_at
channel.last_poll_status = "alias"
channel.last_poll_error_code = None
channel.last_poll_error = None
channel.last_poll_error_at = None
return 0 return 0
if channel.tg_id is None or channel.title is None: if channel.tg_id is None or channel.title is None:
@@ -122,6 +137,10 @@ async def poll_channel(channel_id: int) -> int:
channel.title = channel.title or resolved.title or (source.title if source else None) channel.title = channel.title or resolved.title or (source.title if source else None)
channel.last_message_id = source.last_message_id if source else channel.last_message_id channel.last_message_id = source.last_message_id if source else channel.last_message_id
channel.last_polled_at = source.last_polled_at if source else channel.last_polled_at channel.last_polled_at = source.last_polled_at if source else channel.last_polled_at
channel.last_poll_status = "alias"
channel.last_poll_error_code = None
channel.last_poll_error = None
channel.last_poll_error_at = None
log.info( log.info(
"linked_channel_alias", "linked_channel_alias",
channel_id=channel.id, channel_id=channel.id,
@@ -180,6 +199,10 @@ async def poll_channel(channel_id: int) -> int:
channel.last_message_id or 0, msgs[-1].tg_message_id channel.last_message_id or 0, msgs[-1].tg_message_id
) )
channel.last_polled_at = datetime.now(timezone.utc) channel.last_polled_at = datetime.now(timezone.utc)
channel.last_poll_status = "ok"
channel.last_poll_error_code = None
channel.last_poll_error = None
channel.last_poll_error_at = None
log.info( log.info(
"polled_channel", "polled_channel",
@@ -191,6 +214,17 @@ async def poll_channel(channel_id: int) -> int:
return inserted return inserted
async def _record_poll_error(channel_id: int, code: str, message: str) -> None:
    """Store a failed-poll status on the channel row.

    Opens a ``session_scope`` session, loads the ``Channel`` with the given
    id and, if it exists, sets ``last_poll_status`` to ``"error"`` together
    with the error code, the message (cut to its first 1000 characters) and
    the current UTC time. A missing channel is a silent no-op.
    """
    async with session_scope() as db:
        row = await db.get(Channel, channel_id)
        if row is not None:
            row.last_poll_status = "error"
            row.last_poll_error_code = code
            row.last_poll_error = message[:1000]
            row.last_poll_error_at = datetime.now(timezone.utc)
async def poll_all() -> None: async def poll_all() -> None:
if not await is_authorized(): if not await is_authorized():
log.debug("poll_skipped_not_authorized") log.debug("poll_skipped_not_authorized")
@@ -228,10 +262,15 @@ async def backfill_media(channel_id: int, batch_size: int = 50) -> dict[str, int
if channel is None: if channel is None:
raise RuntimeError("channel not found") raise RuntimeError("channel not found")
missing_media_condition = or_(
Message.media_files.is_(None),
func.jsonb_array_length(Message.media_files) == 0,
)
pending_q = select(func.count(Message.id)).where( pending_q = select(func.count(Message.id)).where(
Message.channel_id == channel_id, Message.channel_id == channel_id,
Message.has_media.is_(True), Message.has_media.is_(True),
Message.media_files.is_(None), missing_media_condition,
) )
pending_total = (await session.execute(pending_q)).scalar_one() pending_total = (await session.execute(pending_q)).scalar_one()
@@ -241,7 +280,7 @@ async def backfill_media(channel_id: int, batch_size: int = 50) -> dict[str, int
.where( .where(
Message.channel_id == channel_id, Message.channel_id == channel_id,
Message.has_media.is_(True), Message.has_media.is_(True),
Message.media_files.is_(None), missing_media_condition,
) )
.order_by(Message.tg_message_id.asc()) .order_by(Message.tg_message_id.asc())
.limit(batch_size) .limit(batch_size)