Compare commits
7 Commits
73afcb64d5
...
cdbdea250d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cdbdea250d | ||
|
|
696b7eda6f | ||
|
|
bd3b54dc7d | ||
|
|
6cbfdb92e4 | ||
|
|
7cc5109cba | ||
|
|
90c0a346ed | ||
|
|
7f7f2427cb |
34
alembic/versions/0013_channel_poll_status.py
Normal file
34
alembic/versions/0013_channel_poll_status.py
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
"""channel poll status
|
||||||
|
|
||||||
|
Revision ID: 0013
|
||||||
|
Revises: 0012
|
||||||
|
Create Date: 2026-06-17
|
||||||
|
|
||||||
|
"""
|
||||||
|
from typing import Sequence, Union
|
||||||
|
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from alembic import op
|
||||||
|
|
||||||
|
revision: str = "0013"
|
||||||
|
down_revision: Union[str, None] = "0012"
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
    """Add the per-channel poll-status columns and index the two lookup columns.

    All four columns are nullable, so existing ``channels`` rows need no backfill.
    """
    poll_columns = (
        sa.Column("last_poll_status", sa.String(length=32), nullable=True),
        sa.Column("last_poll_error_code", sa.String(length=64), nullable=True),
        sa.Column("last_poll_error", sa.Text(), nullable=True),
        sa.Column("last_poll_error_at", sa.DateTime(timezone=True), nullable=True),
    )
    for column in poll_columns:
        op.add_column("channels", column)

    # Only the status and the error code get an index.
    for indexed_name in ("last_poll_status", "last_poll_error_code"):
        op.create_index(f"ix_channels_{indexed_name}", "channels", [indexed_name])
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
    """Undo ``upgrade``: drop both indexes first, then the four poll-status columns."""
    # Reverse creation order: indexes go before the columns they cover.
    for indexed_name in ("last_poll_error_code", "last_poll_status"):
        op.drop_index(f"ix_channels_{indexed_name}", table_name="channels")

    for column_name in (
        "last_poll_error_at",
        "last_poll_error",
        "last_poll_error_code",
        "last_poll_status",
    ):
        op.drop_column("channels", column_name)
|
||||||
@@ -23,6 +23,7 @@ import (
|
|||||||
"monitoring-tg/internal/aiservice"
|
"monitoring-tg/internal/aiservice"
|
||||||
"monitoring-tg/internal/dbretry"
|
"monitoring-tg/internal/dbretry"
|
||||||
|
|
||||||
|
commonmw "gitea.estateliga.work/admin/portal-common/middleware"
|
||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5"
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
"github.com/minio/minio-go/v7"
|
"github.com/minio/minio-go/v7"
|
||||||
@@ -129,6 +130,14 @@ type messageOut struct {
|
|||||||
FetchedAt time.Time `json:"fetched_at"`
|
FetchedAt time.Time `json:"fetched_at"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// componentProbe is the result of one health probe, serialized as an element
// of the "components" array returned by /health/detail.
type componentProbe struct {
	// Name identifies the probed component (e.g. "postgres", "poller").
	Name string `json:"name"`
	// Status is one of the strings the probes emit: "ok", "degraded" or "down".
	Status string `json:"status"`
	// LatencyMs is the wall-clock duration of the probe in milliseconds.
	LatencyMs int64 `json:"latency_ms"`
	// Error carries a failure message or a counter summary; omitted when empty.
	Error string `json:"error,omitempty"`
	// Details holds optional structured diagnostics; omitted when nil.
	Details any `json:"details,omitempty"`
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
cfg := loadConfig()
|
cfg := loadConfig()
|
||||||
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
|
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
|
||||||
@@ -185,6 +194,10 @@ func (a *app) serveHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if path == "/health/detail" {
|
||||||
|
a.handleHealthDetail(w, r)
|
||||||
|
return
|
||||||
|
}
|
||||||
if path == "/" {
|
if path == "/" {
|
||||||
writeJSON(w, http.StatusOK, map[string]string{"service": "monitoring-tg", "ui": "portal", "api": "go"})
|
writeJSON(w, http.StatusOK, map[string]string{"service": "monitoring-tg", "ui": "portal", "api": "go"})
|
||||||
return
|
return
|
||||||
@@ -193,10 +206,11 @@ func (a *app) serveHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeError(w, http.StatusNotFound, "not found")
|
writeError(w, http.StatusNotFound, "not found")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if !a.checkInternalAuth(w, r) {
|
commonmw.InternalAuth(a.cfg.InternalAPIKey)(http.HandlerFunc(a.serveAPI)).ServeHTTP(w, r)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *app) serveAPI(w http.ResponseWriter, r *http.Request) {
|
||||||
|
path := a.apiPath(r.URL.Path)
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
switch {
|
switch {
|
||||||
case r.Method == http.MethodGet && path == "/api/v1/access/me":
|
case r.Method == http.MethodGet && path == "/api/v1/access/me":
|
||||||
@@ -232,18 +246,6 @@ func (a *app) serveHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) checkInternalAuth(w http.ResponseWriter, r *http.Request) bool {
|
|
||||||
want := strings.TrimSpace(a.cfg.InternalAPIKey)
|
|
||||||
if want == "" {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if r.Header.Get("X-Internal-Key") != want {
|
|
||||||
writeError(w, http.StatusUnauthorized, "unauthorized")
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *app) apiPath(path string) string {
|
func (a *app) apiPath(path string) string {
|
||||||
base := strings.TrimRight(a.cfg.PublicBasePath, "/")
|
base := strings.TrimRight(a.cfg.PublicBasePath, "/")
|
||||||
if base != "" && strings.HasPrefix(path, base+"/") {
|
if base != "" && strings.HasPrefix(path, base+"/") {
|
||||||
@@ -255,6 +257,382 @@ func (a *app) apiPath(path string) string {
|
|||||||
return path
|
return path
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *app) handleHealthDetail(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
components := []componentProbe{
|
||||||
|
a.probePostgres(ctx),
|
||||||
|
a.probeAIService(ctx),
|
||||||
|
a.probeClassificationQueue(ctx),
|
||||||
|
a.probeAIJobs(ctx),
|
||||||
|
a.probePoller(ctx),
|
||||||
|
a.probePollErrors(ctx),
|
||||||
|
a.probeMediaStorage(ctx),
|
||||||
|
a.probeMediaMetadata(ctx),
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{"components": components})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) probePostgres(ctx context.Context) componentProbe {
|
||||||
|
start := time.Now()
|
||||||
|
if err := a.db.Ping(ctx); err != nil {
|
||||||
|
return componentProbe{Name: "postgres", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||||
|
}
|
||||||
|
return componentProbe{Name: "postgres", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) probeAIService(ctx context.Context) componentProbe {
|
||||||
|
start := time.Now()
|
||||||
|
if !a.cfg.LLMEnabled {
|
||||||
|
return componentProbe{Name: "ai_service", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "llm disabled"}
|
||||||
|
}
|
||||||
|
status, err := a.ai.ProvidersStatus(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return componentProbe{Name: "ai_service", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||||
|
}
|
||||||
|
for _, provider := range status.Providers {
|
||||||
|
if provider.Name != "llm" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if provider.Configured && provider.OK {
|
||||||
|
return componentProbe{Name: "ai_service", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||||
|
}
|
||||||
|
errMsg := strings.TrimSpace(provider.Error)
|
||||||
|
if errMsg == "" {
|
||||||
|
errMsg = "llm provider is not ready"
|
||||||
|
}
|
||||||
|
return componentProbe{Name: "ai_service", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: errMsg}
|
||||||
|
}
|
||||||
|
return componentProbe{Name: "ai_service", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "llm provider not found"}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) probeClassificationQueue(ctx context.Context) componentProbe {
|
||||||
|
start := time.Now()
|
||||||
|
var pending, pending24h int64
|
||||||
|
err := a.db.QueryRow(ctx, `
|
||||||
|
SELECT
|
||||||
|
COUNT(*)::bigint,
|
||||||
|
COUNT(*) FILTER (WHERE m.fetched_at >= now() - interval '24 hours')::bigint
|
||||||
|
FROM channels c
|
||||||
|
JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
|
||||||
|
JOIN messages m ON m.channel_id = src.id
|
||||||
|
JOIN sections s ON s.id = c.section_id
|
||||||
|
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
|
||||||
|
WHERE m.text IS NOT NULL
|
||||||
|
AND mc.id IS NULL`,
|
||||||
|
).Scan(&pending, &pending24h)
|
||||||
|
if err != nil {
|
||||||
|
return componentProbe{Name: "classification_queue", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||||
|
}
|
||||||
|
if pending > 0 {
|
||||||
|
return componentProbe{
|
||||||
|
Name: "classification_queue",
|
||||||
|
Status: "down",
|
||||||
|
LatencyMs: time.Since(start).Milliseconds(),
|
||||||
|
Error: "pending=" + strconv.FormatInt(pending, 10) + " pending_24h=" + strconv.FormatInt(pending24h, 10),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return componentProbe{Name: "classification_queue", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) probeAIJobs(ctx context.Context) componentProbe {
|
||||||
|
start := time.Now()
|
||||||
|
if !a.cfg.LLMEnabled {
|
||||||
|
return componentProbe{Name: "ai_jobs", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "llm disabled"}
|
||||||
|
}
|
||||||
|
stats, err := a.ai.Stats(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return componentProbe{Name: "ai_jobs", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||||
|
}
|
||||||
|
var pending, running, staleRunning, failed, failed24h int64
|
||||||
|
for _, row := range stats.Backlog {
|
||||||
|
if row.OwnerService != "monitoring-tg" || row.TaskType != "telegram_classification" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
pending += row.Pending
|
||||||
|
running += row.Running
|
||||||
|
staleRunning += row.StaleRunning
|
||||||
|
}
|
||||||
|
for _, row := range stats.Owners {
|
||||||
|
if row.OwnerService != "monitoring-tg" || row.TaskType != "telegram_classification" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if row.Status == "failed" {
|
||||||
|
failed += row.Total
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, row := range stats.Errors {
|
||||||
|
if row.OwnerService != "monitoring-tg" || row.TaskType != "telegram_classification" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
failed24h += row.Last24h
|
||||||
|
}
|
||||||
|
if staleRunning > 0 || failed24h > 0 {
|
||||||
|
return componentProbe{
|
||||||
|
Name: "ai_jobs",
|
||||||
|
Status: "down",
|
||||||
|
LatencyMs: time.Since(start).Milliseconds(),
|
||||||
|
Error: "pending=" + strconv.FormatInt(pending, 10) +
|
||||||
|
" running=" + strconv.FormatInt(running, 10) +
|
||||||
|
" stale_running=" + strconv.FormatInt(staleRunning, 10) +
|
||||||
|
" failed=" + strconv.FormatInt(failed, 10) +
|
||||||
|
" failed_24h=" + strconv.FormatInt(failed24h, 10),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if pending > 0 || running > 0 || failed > 0 {
|
||||||
|
return componentProbe{
|
||||||
|
Name: "ai_jobs",
|
||||||
|
Status: "degraded",
|
||||||
|
LatencyMs: time.Since(start).Milliseconds(),
|
||||||
|
Error: "pending=" + strconv.FormatInt(pending, 10) +
|
||||||
|
" running=" + strconv.FormatInt(running, 10) +
|
||||||
|
" stale_running=" + strconv.FormatInt(staleRunning, 10) +
|
||||||
|
" failed=" + strconv.FormatInt(failed, 10),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return componentProbe{Name: "ai_jobs", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) probePoller(ctx context.Context) componentProbe {
|
||||||
|
start := time.Now()
|
||||||
|
staleAfter := maxInt(a.cfg.PollIntervalSeconds*3, 900)
|
||||||
|
var active, neverPolled, stale int64
|
||||||
|
err := a.db.QueryRow(ctx, `
|
||||||
|
SELECT
|
||||||
|
COUNT(*)::bigint,
|
||||||
|
COUNT(*) FILTER (WHERE last_polled_at IS NULL)::bigint,
|
||||||
|
COUNT(*) FILTER (WHERE last_polled_at IS NOT NULL AND last_polled_at < now() - ($1::int * interval '1 second'))::bigint
|
||||||
|
FROM channels
|
||||||
|
WHERE is_active = true
|
||||||
|
AND source_channel_id IS NULL`,
|
||||||
|
staleAfter,
|
||||||
|
).Scan(&active, &neverPolled, &stale)
|
||||||
|
if err != nil {
|
||||||
|
return componentProbe{Name: "poller", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||||
|
}
|
||||||
|
if active > 0 && (neverPolled > 0 || stale > 0) {
|
||||||
|
return componentProbe{
|
||||||
|
Name: "poller",
|
||||||
|
Status: "down",
|
||||||
|
LatencyMs: time.Since(start).Milliseconds(),
|
||||||
|
Error: "active=" + strconv.FormatInt(active, 10) +
|
||||||
|
" never_polled=" + strconv.FormatInt(neverPolled, 10) +
|
||||||
|
" stale=" + strconv.FormatInt(stale, 10) +
|
||||||
|
" stale_after_sec=" + strconv.Itoa(staleAfter),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return componentProbe{Name: "poller", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) probePollErrors(ctx context.Context) componentProbe {
|
||||||
|
start := time.Now()
|
||||||
|
var total, floodWait, unavailable, other int64
|
||||||
|
err := a.db.QueryRow(ctx, `
|
||||||
|
SELECT
|
||||||
|
COUNT(*) FILTER (WHERE last_poll_status = 'error')::bigint,
|
||||||
|
COUNT(*) FILTER (WHERE last_poll_status = 'error' AND last_poll_error_code = 'telegram_flood_wait')::bigint,
|
||||||
|
COUNT(*) FILTER (WHERE last_poll_status = 'error' AND last_poll_error_code = 'telegram_channel_unavailable')::bigint,
|
||||||
|
COUNT(*) FILTER (
|
||||||
|
WHERE last_poll_status = 'error'
|
||||||
|
AND COALESCE(last_poll_error_code, '') NOT IN ('telegram_flood_wait', 'telegram_channel_unavailable')
|
||||||
|
)::bigint
|
||||||
|
FROM channels
|
||||||
|
WHERE is_active = true
|
||||||
|
AND source_channel_id IS NULL`,
|
||||||
|
).Scan(&total, &floodWait, &unavailable, &other)
|
||||||
|
if err != nil {
|
||||||
|
return componentProbe{Name: "poll_errors", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||||
|
}
|
||||||
|
if total > 0 {
|
||||||
|
recent, recentErr := a.recentPollErrors(ctx)
|
||||||
|
status := "degraded"
|
||||||
|
if other > 0 {
|
||||||
|
status = "down"
|
||||||
|
}
|
||||||
|
details := map[string]any{
|
||||||
|
"total": total,
|
||||||
|
"flood_wait": floodWait,
|
||||||
|
"unavailable": unavailable,
|
||||||
|
"other": other,
|
||||||
|
}
|
||||||
|
if recentErr != nil {
|
||||||
|
details["recent_error"] = recentErr.Error()
|
||||||
|
} else {
|
||||||
|
details["recent_errors"] = recent
|
||||||
|
}
|
||||||
|
return componentProbe{
|
||||||
|
Name: "poll_errors",
|
||||||
|
Status: status,
|
||||||
|
LatencyMs: time.Since(start).Milliseconds(),
|
||||||
|
Error: "total=" + strconv.FormatInt(total, 10) +
|
||||||
|
" flood_wait=" + strconv.FormatInt(floodWait, 10) +
|
||||||
|
" unavailable=" + strconv.FormatInt(unavailable, 10) +
|
||||||
|
" other=" + strconv.FormatInt(other, 10),
|
||||||
|
Details: details,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return componentProbe{Name: "poll_errors", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) recentPollErrors(ctx context.Context) ([]map[string]any, error) {
|
||||||
|
rows, err := a.db.Query(ctx, `
|
||||||
|
SELECT
|
||||||
|
c.id,
|
||||||
|
c.identifier,
|
||||||
|
COALESCE(c.title, '') AS title,
|
||||||
|
s.slug,
|
||||||
|
s.title AS section_title,
|
||||||
|
COALESCE(c.last_poll_error_code, '') AS error_code,
|
||||||
|
COALESCE(c.last_poll_error, '') AS error_text,
|
||||||
|
c.last_poll_error_at
|
||||||
|
FROM channels c
|
||||||
|
LEFT JOIN sections s ON s.id = c.section_id
|
||||||
|
WHERE c.is_active = true
|
||||||
|
AND c.source_channel_id IS NULL
|
||||||
|
AND c.last_poll_status = 'error'
|
||||||
|
ORDER BY c.last_poll_error_at DESC NULLS LAST, c.id DESC
|
||||||
|
LIMIT 5`)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
out := make([]map[string]any, 0, 5)
|
||||||
|
for rows.Next() {
|
||||||
|
var (
|
||||||
|
id int64
|
||||||
|
identifier string
|
||||||
|
title string
|
||||||
|
sectionSlug sql.NullString
|
||||||
|
sectionTitle sql.NullString
|
||||||
|
code string
|
||||||
|
text string
|
||||||
|
at sql.NullTime
|
||||||
|
)
|
||||||
|
if err := rows.Scan(&id, &identifier, &title, §ionSlug, §ionTitle, &code, &text, &at); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
out = append(out, map[string]any{
|
||||||
|
"channel_id": id,
|
||||||
|
"identifier": identifier,
|
||||||
|
"title": nullableString(title),
|
||||||
|
"section_slug": nullString(sectionSlug),
|
||||||
|
"section_title": nullString(sectionTitle),
|
||||||
|
"error_code": nullableString(code),
|
||||||
|
"error": nullableString(text),
|
||||||
|
"error_at": nullTime(at),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return out, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) probeMediaStorage(ctx context.Context) componentProbe {
|
||||||
|
start := time.Now()
|
||||||
|
if a.minio == nil || a.cfg.MinioBucket == "" {
|
||||||
|
return componentProbe{Name: "media_storage", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "minio not configured"}
|
||||||
|
}
|
||||||
|
exists, err := a.minio.BucketExists(ctx, a.cfg.MinioBucket)
|
||||||
|
if err != nil {
|
||||||
|
return componentProbe{Name: "media_storage", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||||
|
}
|
||||||
|
if !exists {
|
||||||
|
return componentProbe{Name: "media_storage", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "bucket not found: " + a.cfg.MinioBucket}
|
||||||
|
}
|
||||||
|
return componentProbe{Name: "media_storage", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) probeMediaMetadata(ctx context.Context) componentProbe {
|
||||||
|
start := time.Now()
|
||||||
|
var withMedia, missingFiles int64
|
||||||
|
err := a.db.QueryRow(ctx, `
|
||||||
|
SELECT
|
||||||
|
COUNT(*) FILTER (WHERE has_media = true)::bigint,
|
||||||
|
COUNT(*) FILTER (
|
||||||
|
WHERE has_media = true
|
||||||
|
AND jsonb_array_length(COALESCE(media_files, '[]'::jsonb)) = 0
|
||||||
|
)::bigint
|
||||||
|
FROM messages`,
|
||||||
|
).Scan(&withMedia, &missingFiles)
|
||||||
|
if err != nil {
|
||||||
|
return componentProbe{Name: "media_metadata", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||||
|
}
|
||||||
|
if missingFiles > 0 {
|
||||||
|
recent, recentErr := a.recentMissingMedia(ctx)
|
||||||
|
details := map[string]any{
|
||||||
|
"messages_with_media": withMedia,
|
||||||
|
"missing_media_files": missingFiles,
|
||||||
|
}
|
||||||
|
if recentErr != nil {
|
||||||
|
details["recent_error"] = recentErr.Error()
|
||||||
|
} else {
|
||||||
|
details["recent_missing_media"] = recent
|
||||||
|
}
|
||||||
|
return componentProbe{
|
||||||
|
Name: "media_metadata",
|
||||||
|
Status: "down",
|
||||||
|
LatencyMs: time.Since(start).Milliseconds(),
|
||||||
|
Error: "messages_with_media=" + strconv.FormatInt(withMedia, 10) +
|
||||||
|
" missing_media_files=" + strconv.FormatInt(missingFiles, 10),
|
||||||
|
Details: details,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return componentProbe{Name: "media_metadata", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) recentMissingMedia(ctx context.Context) ([]map[string]any, error) {
|
||||||
|
rows, err := a.db.Query(ctx, `
|
||||||
|
SELECT
|
||||||
|
m.id,
|
||||||
|
m.tg_message_id,
|
||||||
|
m.date,
|
||||||
|
c.id AS channel_id,
|
||||||
|
c.identifier,
|
||||||
|
COALESCE(c.title, '') AS channel_title,
|
||||||
|
s.slug,
|
||||||
|
s.title AS section_title
|
||||||
|
FROM messages m
|
||||||
|
JOIN channels c ON c.id = m.channel_id
|
||||||
|
LEFT JOIN sections s ON s.id = c.section_id
|
||||||
|
WHERE m.has_media = true
|
||||||
|
AND jsonb_array_length(COALESCE(m.media_files, '[]'::jsonb)) = 0
|
||||||
|
ORDER BY m.date DESC, m.id DESC
|
||||||
|
LIMIT 5`)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
out := make([]map[string]any, 0, 5)
|
||||||
|
for rows.Next() {
|
||||||
|
var (
|
||||||
|
id int64
|
||||||
|
tgMessageID int64
|
||||||
|
date time.Time
|
||||||
|
channelID int64
|
||||||
|
identifier string
|
||||||
|
channelTitle string
|
||||||
|
sectionSlug sql.NullString
|
||||||
|
sectionTitle sql.NullString
|
||||||
|
)
|
||||||
|
if err := rows.Scan(&id, &tgMessageID, &date, &channelID, &identifier, &channelTitle, §ionSlug, §ionTitle); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
out = append(out, map[string]any{
|
||||||
|
"message_id": id,
|
||||||
|
"tg_message_id": tgMessageID,
|
||||||
|
"message_date": date,
|
||||||
|
"channel_id": channelID,
|
||||||
|
"identifier": identifier,
|
||||||
|
"channel_title": nullableString(channelTitle),
|
||||||
|
"section_slug": nullString(sectionSlug),
|
||||||
|
"section_title": nullString(sectionTitle),
|
||||||
|
"repair_action": "/api/v1/channels/" + strconv.FormatInt(channelID, 10) + "/backfill-media",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return out, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
func (a *app) handleAccessMe(w http.ResponseWriter, r *http.Request) {
|
func (a *app) handleAccessMe(w http.ResponseWriter, r *http.Request) {
|
||||||
scope := readAccess(r)
|
scope := readAccess(r)
|
||||||
writeJSON(w, http.StatusOK, map[string]any{
|
writeJSON(w, http.StatusOK, map[string]any{
|
||||||
@@ -1515,12 +1893,12 @@ func (a *app) readScope(w http.ResponseWriter, r *http.Request, manage bool) (ac
|
|||||||
}
|
}
|
||||||
|
|
||||||
func readAccess(r *http.Request) accessScope {
|
func readAccess(r *http.Request) accessScope {
|
||||||
admin := r.Header.Get("X-User-Is-Admin") == "1"
|
admin := commonmw.HeaderBool(r, "X-User-Is-Admin")
|
||||||
deptHead := r.Header.Get("X-User-Is-Department-Head") == "1"
|
deptHead := commonmw.HeaderBool(r, "X-User-Is-Department-Head")
|
||||||
canManage := r.Header.Get("X-Monitoring-TG-Can-Manage") == "1"
|
canManage := commonmw.HeaderBool(r, "X-Monitoring-TG-Can-Manage")
|
||||||
canAuth := r.Header.Get("X-Monitoring-TG-Can-Auth") == "1"
|
canAuth := commonmw.HeaderBool(r, "X-Monitoring-TG-Can-Auth")
|
||||||
deptID := strings.TrimSpace(r.Header.Get("X-User-Department-Id"))
|
deptID := strings.TrimSpace(r.Header.Get("X-User-Department-Id"))
|
||||||
deptIDs := parseCSVHeader(r.Header.Get("X-User-Department-Ids"))
|
deptIDs := commonmw.HeaderCSV(r, "X-User-Department-Ids")
|
||||||
if deptID != "" {
|
if deptID != "" {
|
||||||
deptIDs = appendUniqueString(deptIDs, deptID)
|
deptIDs = appendUniqueString(deptIDs, deptID)
|
||||||
}
|
}
|
||||||
@@ -1533,19 +1911,6 @@ func readAccess(r *http.Request) accessScope {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseCSVHeader(raw string) []string {
|
|
||||||
raw = strings.TrimSpace(raw)
|
|
||||||
if raw == "" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
parts := strings.Split(raw, ",")
|
|
||||||
out := make([]string, 0, len(parts))
|
|
||||||
for _, part := range parts {
|
|
||||||
out = appendUniqueString(out, strings.TrimSpace(part))
|
|
||||||
}
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
|
|
||||||
func appendUniqueString(items []string, value string) []string {
|
func appendUniqueString(items []string, value string) []string {
|
||||||
value = strings.TrimSpace(value)
|
value = strings.TrimSpace(value)
|
||||||
if value == "" {
|
if value == "" {
|
||||||
|
|||||||
1
go.mod
1
go.mod
@@ -3,6 +3,7 @@ module monitoring-tg
|
|||||||
go 1.25.7
|
go 1.25.7
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
gitea.estateliga.work/admin/portal-common v0.3.0
|
||||||
github.com/jackc/pgx/v5 v5.9.1
|
github.com/jackc/pgx/v5 v5.9.1
|
||||||
github.com/minio/minio-go/v7 v7.2.0
|
github.com/minio/minio-go/v7 v7.2.0
|
||||||
)
|
)
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -1,3 +1,5 @@
|
|||||||
|
gitea.estateliga.work/admin/portal-common v0.3.0 h1:xpr9UeLXk5pCcNXcTVGZzJZr0Ni7An7DV0OkuYv9qVM=
|
||||||
|
gitea.estateliga.work/admin/portal-common v0.3.0/go.mod h1:C860q6g38KVMsv+mKv6k1Vm7smVRCycl+N6r63TElnk=
|
||||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||||
|
|||||||
@@ -83,6 +83,41 @@ type ProviderStatus struct {
|
|||||||
Error string `json:"error,omitempty"`
|
Error string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Response model decoded from the AI service's GET /api/v1/stats endpoint.
type (
	// Stats is the top-level statistics document.
	Stats struct {
		At      time.Time     `json:"at"`
		Owners  []OwnerStat   `json:"owners,omitempty"`
		Errors  []ErrorStat   `json:"errors,omitempty"`
		Backlog []BacklogStat `json:"backlog,omitempty"`
	}

	// OwnerStat is a job total for one owner / task type / model profile /
	// status combination.
	OwnerStat struct {
		OwnerService string `json:"owner_service"`
		TaskType     string `json:"task_type"`
		ModelProfile string `json:"model_profile"`
		Status       string `json:"status"`
		Total        int64  `json:"total"`
	}

	// ErrorStat is an error counter per task type, model profile and error
	// code, with an all-time total and a last-24-hours total.
	ErrorStat struct {
		OwnerService string `json:"owner_service,omitempty"`
		TaskType     string `json:"task_type"`
		ModelProfile string `json:"model_profile"`
		ErrorCode    string `json:"error_code"`
		Total        int64  `json:"total"`
		Last24h      int64  `json:"last_24h"`
	}

	// BacklogStat describes queued and in-flight jobs for one owner / task
	// type / model profile, including the age of the oldest entries.
	BacklogStat struct {
		OwnerService            string `json:"owner_service"`
		TaskType                string `json:"task_type"`
		ModelProfile            string `json:"model_profile"`
		Pending                 int64  `json:"pending"`
		Running                 int64  `json:"running"`
		StaleRunning            int64  `json:"stale_running"`
		OldestPendingAgeSeconds int64  `json:"oldest_pending_age_seconds"`
		OldestRunningAgeSeconds int64  `json:"oldest_running_age_seconds"`
	}
)
|
||||||
|
|
||||||
func New(baseURL, token string, timeout time.Duration) *Client {
|
func New(baseURL, token string, timeout time.Duration) *Client {
|
||||||
baseURL = strings.TrimRight(strings.TrimSpace(baseURL), "/")
|
baseURL = strings.TrimRight(strings.TrimSpace(baseURL), "/")
|
||||||
if baseURL == "" {
|
if baseURL == "" {
|
||||||
@@ -223,6 +258,29 @@ func (c *Client) ProvidersStatus(ctx context.Context) (*ProvidersStatus, error)
|
|||||||
return &out, nil
|
return &out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) Stats(ctx context.Context) (*Stats, error) {
|
||||||
|
if c == nil {
|
||||||
|
return nil, fmt.Errorf("ai-service not configured")
|
||||||
|
}
|
||||||
|
req, err := c.request(ctx, http.MethodGet, "/api/v1/stats", nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
resp, err := c.http.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("ai stats: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||||
|
return nil, fmt.Errorf("ai stats: http %d: %s", resp.StatusCode, readSmall(resp.Body))
|
||||||
|
}
|
||||||
|
var out Stats
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||||||
|
return nil, fmt.Errorf("decode ai stats: %w", err)
|
||||||
|
}
|
||||||
|
return &out, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) request(ctx context.Context, method, path string, body []byte) (*http.Request, error) {
|
func (c *Client) request(ctx context.Context, method, path string, body []byte) (*http.Request, error) {
|
||||||
var r io.Reader
|
var r io.Reader
|
||||||
if body != nil {
|
if body != nil {
|
||||||
|
|||||||
@@ -78,6 +78,10 @@ class Channel(Base):
|
|||||||
is_active: Mapped[bool] = mapped_column(default=True, server_default="true")
|
is_active: Mapped[bool] = mapped_column(default=True, server_default="true")
|
||||||
last_message_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
|
last_message_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
|
||||||
last_polled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
|
last_polled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
|
||||||
|
last_poll_status: Mapped[str | None] = mapped_column(String(32), nullable=True)
|
||||||
|
last_poll_error_code: Mapped[str | None] = mapped_column(String(64), nullable=True)
|
||||||
|
last_poll_error: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||||
|
last_poll_error_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
|
||||||
created_at: Mapped[datetime] = mapped_column(
|
created_at: Mapped[datetime] = mapped_column(
|
||||||
DateTime(timezone=True), server_default=func.now()
|
DateTime(timezone=True), server_default=func.now()
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ from datetime import datetime, timezone
|
|||||||
|
|
||||||
import structlog
|
import structlog
|
||||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||||
from sqlalchemy import func, select
|
from sqlalchemy import func, or_, select
|
||||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||||
from sqlalchemy.exc import IntegrityError
|
from sqlalchemy.exc import IntegrityError
|
||||||
|
|
||||||
@@ -83,6 +83,17 @@ def _translate_telegram_error(exc: Exception, identifier: str) -> PollError:
|
|||||||
|
|
||||||
async def poll_channel(channel_id: int) -> int:
    """Poll one channel for new messages. Returns count of inserted rows.

    On failure the error is written to the channel's ``last_poll_*`` fields
    via ``_record_poll_error`` and the original exception is re-raised.
    ``PollError`` keeps its own code and message; any other exception is
    recorded under the ``"poll_failed"`` code.

    Recording is best-effort. If it fails too (for example, the database is
    what broke), that secondary failure is logged and the original exception
    still propagates, so callers catching ``PollError`` see the real cause.
    """
    try:
        return await _poll_channel(channel_id)
    except Exception as exc:
        if isinstance(exc, PollError):
            code, message = exc.code, exc.message
        else:
            code, message = "poll_failed", str(exc)
        try:
            await _record_poll_error(channel_id, code, message)
        except Exception:
            # Never let the bookkeeping failure replace the poll failure.
            log.warning(
                "record_poll_error_failed",
                channel_id=channel_id,
                error_code=code,
                exc_info=True,
            )
        # Re-raises the original poll exception, not the recording error.
        raise
|
||||||
|
|
||||||
|
|
||||||
|
async def _poll_channel(channel_id: int) -> int:
|
||||||
if not await is_authorized():
|
if not await is_authorized():
|
||||||
raise PollUnauthorizedError(
|
raise PollUnauthorizedError(
|
||||||
"Telegram is not authorized: open Monitoring TG in Portal and authorize it"
|
"Telegram is not authorized: open Monitoring TG in Portal and authorize it"
|
||||||
@@ -99,6 +110,10 @@ async def poll_channel(channel_id: int) -> int:
|
|||||||
channel.title = channel.title or source.title
|
channel.title = channel.title or source.title
|
||||||
channel.last_message_id = source.last_message_id
|
channel.last_message_id = source.last_message_id
|
||||||
channel.last_polled_at = source.last_polled_at
|
channel.last_polled_at = source.last_polled_at
|
||||||
|
channel.last_poll_status = "alias"
|
||||||
|
channel.last_poll_error_code = None
|
||||||
|
channel.last_poll_error = None
|
||||||
|
channel.last_poll_error_at = None
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
if channel.tg_id is None or channel.title is None:
|
if channel.tg_id is None or channel.title is None:
|
||||||
@@ -122,6 +137,10 @@ async def poll_channel(channel_id: int) -> int:
|
|||||||
channel.title = channel.title or resolved.title or (source.title if source else None)
|
channel.title = channel.title or resolved.title or (source.title if source else None)
|
||||||
channel.last_message_id = source.last_message_id if source else channel.last_message_id
|
channel.last_message_id = source.last_message_id if source else channel.last_message_id
|
||||||
channel.last_polled_at = source.last_polled_at if source else channel.last_polled_at
|
channel.last_polled_at = source.last_polled_at if source else channel.last_polled_at
|
||||||
|
channel.last_poll_status = "alias"
|
||||||
|
channel.last_poll_error_code = None
|
||||||
|
channel.last_poll_error = None
|
||||||
|
channel.last_poll_error_at = None
|
||||||
log.info(
|
log.info(
|
||||||
"linked_channel_alias",
|
"linked_channel_alias",
|
||||||
channel_id=channel.id,
|
channel_id=channel.id,
|
||||||
@@ -180,6 +199,10 @@ async def poll_channel(channel_id: int) -> int:
|
|||||||
channel.last_message_id or 0, msgs[-1].tg_message_id
|
channel.last_message_id or 0, msgs[-1].tg_message_id
|
||||||
)
|
)
|
||||||
channel.last_polled_at = datetime.now(timezone.utc)
|
channel.last_polled_at = datetime.now(timezone.utc)
|
||||||
|
channel.last_poll_status = "ok"
|
||||||
|
channel.last_poll_error_code = None
|
||||||
|
channel.last_poll_error = None
|
||||||
|
channel.last_poll_error_at = None
|
||||||
|
|
||||||
log.info(
|
log.info(
|
||||||
"polled_channel",
|
"polled_channel",
|
||||||
@@ -191,6 +214,17 @@ async def poll_channel(channel_id: int) -> int:
|
|||||||
return inserted
|
return inserted
|
||||||
|
|
||||||
|
|
||||||
|
async def _record_poll_error(channel_id: int, code: str, message: str) -> None:
    """Persist the outcome of a failed poll on the channel row.

    Sets ``last_poll_status`` to ``"error"`` and stores the error code, a
    truncated error message and the current UTC timestamp. Does nothing if
    the channel no longer exists.

    Args:
        channel_id: Primary key of the channel whose poll failed.
        code: Short machine-readable error code.
        message: Human-readable error description; a missing message is
            stored as an empty string.
    """
    # ``last_poll_error_code`` is declared as String(length=64) in migration
    # 0013; keep the value within that bound so an unexpectedly long code
    # cannot make the status update itself fail.
    max_code_len = 64
    max_message_len = 1000

    async with session_scope() as session:
        channel = await session.get(Channel, channel_id)
        if channel is None:
            return
        channel.last_poll_status = "error"
        channel.last_poll_error_code = code[:max_code_len]
        # Guard against a None message so slicing cannot raise TypeError.
        channel.last_poll_error = (message or "")[:max_message_len]
        channel.last_poll_error_at = datetime.now(timezone.utc)
|
||||||
|
|
||||||
|
|
||||||
async def poll_all() -> None:
|
async def poll_all() -> None:
|
||||||
if not await is_authorized():
|
if not await is_authorized():
|
||||||
log.debug("poll_skipped_not_authorized")
|
log.debug("poll_skipped_not_authorized")
|
||||||
@@ -228,10 +262,15 @@ async def backfill_media(channel_id: int, batch_size: int = 50) -> dict[str, int
|
|||||||
if channel is None:
|
if channel is None:
|
||||||
raise RuntimeError("channel not found")
|
raise RuntimeError("channel not found")
|
||||||
|
|
||||||
|
missing_media_condition = or_(
|
||||||
|
Message.media_files.is_(None),
|
||||||
|
func.jsonb_array_length(Message.media_files) == 0,
|
||||||
|
)
|
||||||
|
|
||||||
pending_q = select(func.count(Message.id)).where(
|
pending_q = select(func.count(Message.id)).where(
|
||||||
Message.channel_id == channel_id,
|
Message.channel_id == channel_id,
|
||||||
Message.has_media.is_(True),
|
Message.has_media.is_(True),
|
||||||
Message.media_files.is_(None),
|
missing_media_condition,
|
||||||
)
|
)
|
||||||
pending_total = (await session.execute(pending_q)).scalar_one()
|
pending_total = (await session.execute(pending_q)).scalar_one()
|
||||||
|
|
||||||
@@ -241,7 +280,7 @@ async def backfill_media(channel_id: int, batch_size: int = 50) -> dict[str, int
|
|||||||
.where(
|
.where(
|
||||||
Message.channel_id == channel_id,
|
Message.channel_id == channel_id,
|
||||||
Message.has_media.is_(True),
|
Message.has_media.is_(True),
|
||||||
Message.media_files.is_(None),
|
missing_media_condition,
|
||||||
)
|
)
|
||||||
.order_by(Message.tg_message_id.asc())
|
.order_by(Message.tg_message_id.asc())
|
||||||
.limit(batch_size)
|
.limit(batch_size)
|
||||||
|
|||||||
Reference in New Issue
Block a user