|
|
|
|
@@ -23,6 +23,7 @@ import (
|
|
|
|
|
"monitoring-tg/internal/aiservice"
|
|
|
|
|
"monitoring-tg/internal/dbretry"
|
|
|
|
|
|
|
|
|
|
commonmw "gitea.estateliga.work/admin/portal-common/middleware"
|
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
|
|
|
"github.com/minio/minio-go/v7"
|
|
|
|
|
@@ -72,7 +73,9 @@ type app struct {
|
|
|
|
|
type accessScope struct {
|
|
|
|
|
IsAdmin bool
|
|
|
|
|
CanManage bool
|
|
|
|
|
CanManageAll bool
|
|
|
|
|
CanAuth bool
|
|
|
|
|
CanViewAll bool
|
|
|
|
|
DeptID string
|
|
|
|
|
DeptIDs []string
|
|
|
|
|
}
|
|
|
|
|
@@ -129,6 +132,14 @@ type messageOut struct {
|
|
|
|
|
FetchedAt time.Time `json:"fetched_at"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type componentProbe struct {
|
|
|
|
|
Name string `json:"name"`
|
|
|
|
|
Status string `json:"status"`
|
|
|
|
|
LatencyMs int64 `json:"latency_ms"`
|
|
|
|
|
Error string `json:"error,omitempty"`
|
|
|
|
|
Details any `json:"details,omitempty"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func main() {
|
|
|
|
|
cfg := loadConfig()
|
|
|
|
|
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
|
|
|
|
|
@@ -185,6 +196,10 @@ func (a *app) serveHTTP(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if path == "/health/detail" {
|
|
|
|
|
a.handleHealthDetail(w, r)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if path == "/" {
|
|
|
|
|
writeJSON(w, http.StatusOK, map[string]string{"service": "monitoring-tg", "ui": "portal", "api": "go"})
|
|
|
|
|
return
|
|
|
|
|
@@ -193,10 +208,11 @@ func (a *app) serveHTTP(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
writeError(w, http.StatusNotFound, "not found")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if !a.checkInternalAuth(w, r) {
|
|
|
|
|
return
|
|
|
|
|
commonmw.InternalAuth(a.cfg.InternalAPIKey)(http.HandlerFunc(a.serveAPI)).ServeHTTP(w, r)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *app) serveAPI(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
path := a.apiPath(r.URL.Path)
|
|
|
|
|
ctx := r.Context()
|
|
|
|
|
switch {
|
|
|
|
|
case r.Method == http.MethodGet && path == "/api/v1/access/me":
|
|
|
|
|
@@ -232,18 +248,6 @@ func (a *app) serveHTTP(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *app) checkInternalAuth(w http.ResponseWriter, r *http.Request) bool {
|
|
|
|
|
want := strings.TrimSpace(a.cfg.InternalAPIKey)
|
|
|
|
|
if want == "" {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
if r.Header.Get("X-Internal-Key") != want {
|
|
|
|
|
writeError(w, http.StatusUnauthorized, "unauthorized")
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *app) apiPath(path string) string {
|
|
|
|
|
base := strings.TrimRight(a.cfg.PublicBasePath, "/")
|
|
|
|
|
if base != "" && strings.HasPrefix(path, base+"/") {
|
|
|
|
|
@@ -255,12 +259,390 @@ func (a *app) apiPath(path string) string {
|
|
|
|
|
return path
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *app) handleHealthDetail(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
|
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
|
|
components := []componentProbe{
|
|
|
|
|
a.probePostgres(ctx),
|
|
|
|
|
a.probeAIService(ctx),
|
|
|
|
|
a.probeClassificationQueue(ctx),
|
|
|
|
|
a.probeAIJobs(ctx),
|
|
|
|
|
a.probePoller(ctx),
|
|
|
|
|
a.probePollErrors(ctx),
|
|
|
|
|
a.probeMediaStorage(ctx),
|
|
|
|
|
a.probeMediaMetadata(ctx),
|
|
|
|
|
}
|
|
|
|
|
writeJSON(w, http.StatusOK, map[string]any{"components": components})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *app) probePostgres(ctx context.Context) componentProbe {
|
|
|
|
|
start := time.Now()
|
|
|
|
|
if err := a.db.Ping(ctx); err != nil {
|
|
|
|
|
return componentProbe{Name: "postgres", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
|
|
|
|
}
|
|
|
|
|
return componentProbe{Name: "postgres", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *app) probeAIService(ctx context.Context) componentProbe {
|
|
|
|
|
start := time.Now()
|
|
|
|
|
if !a.cfg.LLMEnabled {
|
|
|
|
|
return componentProbe{Name: "ai_service", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "llm disabled"}
|
|
|
|
|
}
|
|
|
|
|
status, err := a.ai.ProvidersStatus(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return componentProbe{Name: "ai_service", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
|
|
|
|
}
|
|
|
|
|
for _, provider := range status.Providers {
|
|
|
|
|
if provider.Name != "llm" {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if provider.Configured && provider.OK {
|
|
|
|
|
return componentProbe{Name: "ai_service", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
|
|
|
|
}
|
|
|
|
|
errMsg := strings.TrimSpace(provider.Error)
|
|
|
|
|
if errMsg == "" {
|
|
|
|
|
errMsg = "llm provider is not ready"
|
|
|
|
|
}
|
|
|
|
|
return componentProbe{Name: "ai_service", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: errMsg}
|
|
|
|
|
}
|
|
|
|
|
return componentProbe{Name: "ai_service", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "llm provider not found"}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *app) probeClassificationQueue(ctx context.Context) componentProbe {
|
|
|
|
|
start := time.Now()
|
|
|
|
|
var pending, pending24h int64
|
|
|
|
|
err := a.db.QueryRow(ctx, `
|
|
|
|
|
SELECT
|
|
|
|
|
COUNT(*)::bigint,
|
|
|
|
|
COUNT(*) FILTER (WHERE m.fetched_at >= now() - interval '24 hours')::bigint
|
|
|
|
|
FROM channels c
|
|
|
|
|
JOIN channels src ON src.id = COALESCE(c.source_channel_id, c.id)
|
|
|
|
|
JOIN messages m ON m.channel_id = src.id
|
|
|
|
|
JOIN sections s ON s.id = c.section_id
|
|
|
|
|
LEFT JOIN message_classifications mc ON mc.message_id = m.id AND mc.section_id = s.id
|
|
|
|
|
WHERE m.text IS NOT NULL
|
|
|
|
|
AND mc.id IS NULL`,
|
|
|
|
|
).Scan(&pending, &pending24h)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return componentProbe{Name: "classification_queue", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
|
|
|
|
}
|
|
|
|
|
if pending > 0 {
|
|
|
|
|
return componentProbe{
|
|
|
|
|
Name: "classification_queue",
|
|
|
|
|
Status: "down",
|
|
|
|
|
LatencyMs: time.Since(start).Milliseconds(),
|
|
|
|
|
Error: "pending=" + strconv.FormatInt(pending, 10) + " pending_24h=" + strconv.FormatInt(pending24h, 10),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return componentProbe{Name: "classification_queue", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *app) probeAIJobs(ctx context.Context) componentProbe {
|
|
|
|
|
start := time.Now()
|
|
|
|
|
if !a.cfg.LLMEnabled {
|
|
|
|
|
return componentProbe{Name: "ai_jobs", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "llm disabled"}
|
|
|
|
|
}
|
|
|
|
|
stats, err := a.ai.Stats(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return componentProbe{Name: "ai_jobs", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
|
|
|
|
}
|
|
|
|
|
var pending, running, staleRunning, failed, failed24h int64
|
|
|
|
|
for _, row := range stats.Backlog {
|
|
|
|
|
if row.OwnerService != "monitoring-tg" || row.TaskType != "telegram_classification" {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
pending += row.Pending
|
|
|
|
|
running += row.Running
|
|
|
|
|
staleRunning += row.StaleRunning
|
|
|
|
|
}
|
|
|
|
|
for _, row := range stats.Owners {
|
|
|
|
|
if row.OwnerService != "monitoring-tg" || row.TaskType != "telegram_classification" {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if row.Status == "failed" {
|
|
|
|
|
failed += row.Total
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
for _, row := range stats.Errors {
|
|
|
|
|
if row.OwnerService != "monitoring-tg" || row.TaskType != "telegram_classification" {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
failed24h += row.Last24h
|
|
|
|
|
}
|
|
|
|
|
if staleRunning > 0 || failed24h > 0 {
|
|
|
|
|
return componentProbe{
|
|
|
|
|
Name: "ai_jobs",
|
|
|
|
|
Status: "down",
|
|
|
|
|
LatencyMs: time.Since(start).Milliseconds(),
|
|
|
|
|
Error: "pending=" + strconv.FormatInt(pending, 10) +
|
|
|
|
|
" running=" + strconv.FormatInt(running, 10) +
|
|
|
|
|
" stale_running=" + strconv.FormatInt(staleRunning, 10) +
|
|
|
|
|
" failed=" + strconv.FormatInt(failed, 10) +
|
|
|
|
|
" failed_24h=" + strconv.FormatInt(failed24h, 10),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if pending > 0 || running > 0 || failed > 0 {
|
|
|
|
|
return componentProbe{
|
|
|
|
|
Name: "ai_jobs",
|
|
|
|
|
Status: "degraded",
|
|
|
|
|
LatencyMs: time.Since(start).Milliseconds(),
|
|
|
|
|
Error: "pending=" + strconv.FormatInt(pending, 10) +
|
|
|
|
|
" running=" + strconv.FormatInt(running, 10) +
|
|
|
|
|
" stale_running=" + strconv.FormatInt(staleRunning, 10) +
|
|
|
|
|
" failed=" + strconv.FormatInt(failed, 10),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return componentProbe{Name: "ai_jobs", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *app) probePoller(ctx context.Context) componentProbe {
|
|
|
|
|
start := time.Now()
|
|
|
|
|
staleAfter := maxInt(a.cfg.PollIntervalSeconds*3, 900)
|
|
|
|
|
var active, neverPolled, stale int64
|
|
|
|
|
err := a.db.QueryRow(ctx, `
|
|
|
|
|
SELECT
|
|
|
|
|
COUNT(*)::bigint,
|
|
|
|
|
COUNT(*) FILTER (WHERE last_polled_at IS NULL)::bigint,
|
|
|
|
|
COUNT(*) FILTER (WHERE last_polled_at IS NOT NULL AND last_polled_at < now() - ($1::int * interval '1 second'))::bigint
|
|
|
|
|
FROM channels
|
|
|
|
|
WHERE is_active = true
|
|
|
|
|
AND source_channel_id IS NULL`,
|
|
|
|
|
staleAfter,
|
|
|
|
|
).Scan(&active, &neverPolled, &stale)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return componentProbe{Name: "poller", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
|
|
|
|
}
|
|
|
|
|
if active > 0 && (neverPolled > 0 || stale > 0) {
|
|
|
|
|
return componentProbe{
|
|
|
|
|
Name: "poller",
|
|
|
|
|
Status: "down",
|
|
|
|
|
LatencyMs: time.Since(start).Milliseconds(),
|
|
|
|
|
Error: "active=" + strconv.FormatInt(active, 10) +
|
|
|
|
|
" never_polled=" + strconv.FormatInt(neverPolled, 10) +
|
|
|
|
|
" stale=" + strconv.FormatInt(stale, 10) +
|
|
|
|
|
" stale_after_sec=" + strconv.Itoa(staleAfter),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return componentProbe{Name: "poller", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *app) probePollErrors(ctx context.Context) componentProbe {
|
|
|
|
|
start := time.Now()
|
|
|
|
|
var total, floodWait, unavailable, other int64
|
|
|
|
|
err := a.db.QueryRow(ctx, `
|
|
|
|
|
SELECT
|
|
|
|
|
COUNT(*) FILTER (WHERE last_poll_status = 'error')::bigint,
|
|
|
|
|
COUNT(*) FILTER (WHERE last_poll_status = 'error' AND last_poll_error_code = 'telegram_flood_wait')::bigint,
|
|
|
|
|
COUNT(*) FILTER (WHERE last_poll_status = 'error' AND last_poll_error_code = 'telegram_channel_unavailable')::bigint,
|
|
|
|
|
COUNT(*) FILTER (
|
|
|
|
|
WHERE last_poll_status = 'error'
|
|
|
|
|
AND COALESCE(last_poll_error_code, '') NOT IN ('telegram_flood_wait', 'telegram_channel_unavailable')
|
|
|
|
|
)::bigint
|
|
|
|
|
FROM channels
|
|
|
|
|
WHERE is_active = true
|
|
|
|
|
AND source_channel_id IS NULL`,
|
|
|
|
|
).Scan(&total, &floodWait, &unavailable, &other)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return componentProbe{Name: "poll_errors", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
|
|
|
|
}
|
|
|
|
|
if total > 0 {
|
|
|
|
|
recent, recentErr := a.recentPollErrors(ctx)
|
|
|
|
|
status := "degraded"
|
|
|
|
|
if other > 0 {
|
|
|
|
|
status = "down"
|
|
|
|
|
}
|
|
|
|
|
details := map[string]any{
|
|
|
|
|
"total": total,
|
|
|
|
|
"flood_wait": floodWait,
|
|
|
|
|
"unavailable": unavailable,
|
|
|
|
|
"other": other,
|
|
|
|
|
}
|
|
|
|
|
if recentErr != nil {
|
|
|
|
|
details["recent_error"] = recentErr.Error()
|
|
|
|
|
} else {
|
|
|
|
|
details["recent_errors"] = recent
|
|
|
|
|
}
|
|
|
|
|
return componentProbe{
|
|
|
|
|
Name: "poll_errors",
|
|
|
|
|
Status: status,
|
|
|
|
|
LatencyMs: time.Since(start).Milliseconds(),
|
|
|
|
|
Error: "total=" + strconv.FormatInt(total, 10) +
|
|
|
|
|
" flood_wait=" + strconv.FormatInt(floodWait, 10) +
|
|
|
|
|
" unavailable=" + strconv.FormatInt(unavailable, 10) +
|
|
|
|
|
" other=" + strconv.FormatInt(other, 10),
|
|
|
|
|
Details: details,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return componentProbe{Name: "poll_errors", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *app) recentPollErrors(ctx context.Context) ([]map[string]any, error) {
|
|
|
|
|
rows, err := a.db.Query(ctx, `
|
|
|
|
|
SELECT
|
|
|
|
|
c.id,
|
|
|
|
|
c.identifier,
|
|
|
|
|
COALESCE(c.title, '') AS title,
|
|
|
|
|
s.slug,
|
|
|
|
|
s.title AS section_title,
|
|
|
|
|
COALESCE(c.last_poll_error_code, '') AS error_code,
|
|
|
|
|
COALESCE(c.last_poll_error, '') AS error_text,
|
|
|
|
|
c.last_poll_error_at
|
|
|
|
|
FROM channels c
|
|
|
|
|
LEFT JOIN sections s ON s.id = c.section_id
|
|
|
|
|
WHERE c.is_active = true
|
|
|
|
|
AND c.source_channel_id IS NULL
|
|
|
|
|
AND c.last_poll_status = 'error'
|
|
|
|
|
ORDER BY c.last_poll_error_at DESC NULLS LAST, c.id DESC
|
|
|
|
|
LIMIT 5`)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
defer rows.Close()
|
|
|
|
|
|
|
|
|
|
out := make([]map[string]any, 0, 5)
|
|
|
|
|
for rows.Next() {
|
|
|
|
|
var (
|
|
|
|
|
id int64
|
|
|
|
|
identifier string
|
|
|
|
|
title string
|
|
|
|
|
sectionSlug sql.NullString
|
|
|
|
|
sectionTitle sql.NullString
|
|
|
|
|
code string
|
|
|
|
|
text string
|
|
|
|
|
at sql.NullTime
|
|
|
|
|
)
|
|
|
|
|
if err := rows.Scan(&id, &identifier, &title, §ionSlug, §ionTitle, &code, &text, &at); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
out = append(out, map[string]any{
|
|
|
|
|
"channel_id": id,
|
|
|
|
|
"identifier": identifier,
|
|
|
|
|
"title": nullableString(title),
|
|
|
|
|
"section_slug": nullString(sectionSlug),
|
|
|
|
|
"section_title": nullString(sectionTitle),
|
|
|
|
|
"error_code": nullableString(code),
|
|
|
|
|
"error": nullableString(text),
|
|
|
|
|
"error_at": nullTime(at),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
return out, rows.Err()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *app) probeMediaStorage(ctx context.Context) componentProbe {
|
|
|
|
|
start := time.Now()
|
|
|
|
|
if a.minio == nil || a.cfg.MinioBucket == "" {
|
|
|
|
|
return componentProbe{Name: "media_storage", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "minio not configured"}
|
|
|
|
|
}
|
|
|
|
|
exists, err := a.minio.BucketExists(ctx, a.cfg.MinioBucket)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return componentProbe{Name: "media_storage", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
|
|
|
|
}
|
|
|
|
|
if !exists {
|
|
|
|
|
return componentProbe{Name: "media_storage", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "bucket not found: " + a.cfg.MinioBucket}
|
|
|
|
|
}
|
|
|
|
|
return componentProbe{Name: "media_storage", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *app) probeMediaMetadata(ctx context.Context) componentProbe {
|
|
|
|
|
start := time.Now()
|
|
|
|
|
var withMedia, missingFiles int64
|
|
|
|
|
err := a.db.QueryRow(ctx, `
|
|
|
|
|
SELECT
|
|
|
|
|
COUNT(*) FILTER (WHERE has_media = true)::bigint,
|
|
|
|
|
COUNT(*) FILTER (
|
|
|
|
|
WHERE has_media = true
|
|
|
|
|
AND jsonb_array_length(COALESCE(media_files, '[]'::jsonb)) = 0
|
|
|
|
|
)::bigint
|
|
|
|
|
FROM messages`,
|
|
|
|
|
).Scan(&withMedia, &missingFiles)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return componentProbe{Name: "media_metadata", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
|
|
|
|
}
|
|
|
|
|
if missingFiles > 0 {
|
|
|
|
|
recent, recentErr := a.recentMissingMedia(ctx)
|
|
|
|
|
details := map[string]any{
|
|
|
|
|
"messages_with_media": withMedia,
|
|
|
|
|
"missing_media_files": missingFiles,
|
|
|
|
|
}
|
|
|
|
|
if recentErr != nil {
|
|
|
|
|
details["recent_error"] = recentErr.Error()
|
|
|
|
|
} else {
|
|
|
|
|
details["recent_missing_media"] = recent
|
|
|
|
|
}
|
|
|
|
|
return componentProbe{
|
|
|
|
|
Name: "media_metadata",
|
|
|
|
|
Status: "down",
|
|
|
|
|
LatencyMs: time.Since(start).Milliseconds(),
|
|
|
|
|
Error: "messages_with_media=" + strconv.FormatInt(withMedia, 10) +
|
|
|
|
|
" missing_media_files=" + strconv.FormatInt(missingFiles, 10),
|
|
|
|
|
Details: details,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return componentProbe{Name: "media_metadata", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *app) recentMissingMedia(ctx context.Context) ([]map[string]any, error) {
|
|
|
|
|
rows, err := a.db.Query(ctx, `
|
|
|
|
|
SELECT
|
|
|
|
|
m.id,
|
|
|
|
|
m.tg_message_id,
|
|
|
|
|
m.date,
|
|
|
|
|
c.id AS channel_id,
|
|
|
|
|
c.identifier,
|
|
|
|
|
COALESCE(c.title, '') AS channel_title,
|
|
|
|
|
s.slug,
|
|
|
|
|
s.title AS section_title
|
|
|
|
|
FROM messages m
|
|
|
|
|
JOIN channels c ON c.id = m.channel_id
|
|
|
|
|
LEFT JOIN sections s ON s.id = c.section_id
|
|
|
|
|
WHERE m.has_media = true
|
|
|
|
|
AND jsonb_array_length(COALESCE(m.media_files, '[]'::jsonb)) = 0
|
|
|
|
|
ORDER BY m.date DESC, m.id DESC
|
|
|
|
|
LIMIT 5`)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
defer rows.Close()
|
|
|
|
|
|
|
|
|
|
out := make([]map[string]any, 0, 5)
|
|
|
|
|
for rows.Next() {
|
|
|
|
|
var (
|
|
|
|
|
id int64
|
|
|
|
|
tgMessageID int64
|
|
|
|
|
date time.Time
|
|
|
|
|
channelID int64
|
|
|
|
|
identifier string
|
|
|
|
|
channelTitle string
|
|
|
|
|
sectionSlug sql.NullString
|
|
|
|
|
sectionTitle sql.NullString
|
|
|
|
|
)
|
|
|
|
|
if err := rows.Scan(&id, &tgMessageID, &date, &channelID, &identifier, &channelTitle, §ionSlug, §ionTitle); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
out = append(out, map[string]any{
|
|
|
|
|
"message_id": id,
|
|
|
|
|
"tg_message_id": tgMessageID,
|
|
|
|
|
"message_date": date,
|
|
|
|
|
"channel_id": channelID,
|
|
|
|
|
"identifier": identifier,
|
|
|
|
|
"channel_title": nullableString(channelTitle),
|
|
|
|
|
"section_slug": nullString(sectionSlug),
|
|
|
|
|
"section_title": nullString(sectionTitle),
|
|
|
|
|
"repair_action": "/api/v1/channels/" + strconv.FormatInt(channelID, 10) + "/backfill-media",
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
return out, rows.Err()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *app) handleAccessMe(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
scope := readAccess(r)
|
|
|
|
|
writeJSON(w, http.StatusOK, map[string]any{
|
|
|
|
|
"is_admin": scope.IsAdmin,
|
|
|
|
|
"can_manage_department": scope.CanManage,
|
|
|
|
|
"can_manage_all": scope.CanManageAll,
|
|
|
|
|
"can_auth_telegram": scope.CanAuth,
|
|
|
|
|
"can_view_all": scope.CanViewAll,
|
|
|
|
|
"department_id": nullableString(scope.DeptID),
|
|
|
|
|
"department_ids": scope.departmentIDs(),
|
|
|
|
|
})
|
|
|
|
|
@@ -289,7 +671,7 @@ func (a *app) listSections(ctx context.Context, w http.ResponseWriter, r *http.R
|
|
|
|
|
|
|
|
|
|
args := []any{vertical}
|
|
|
|
|
deptFilter := ""
|
|
|
|
|
if !scope.IsAdmin {
|
|
|
|
|
if !scope.canReadAll() {
|
|
|
|
|
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -353,6 +735,7 @@ func (a *app) createSection(ctx context.Context, w http.ResponseWriter, r *http.
|
|
|
|
|
}
|
|
|
|
|
var payload struct {
|
|
|
|
|
Vertical string `json:"vertical"`
|
|
|
|
|
DepartmentID *string `json:"department_id"`
|
|
|
|
|
Slug string `json:"slug"`
|
|
|
|
|
Title string `json:"title"`
|
|
|
|
|
Emoji *string `json:"emoji"`
|
|
|
|
|
@@ -368,7 +751,11 @@ func (a *app) createSection(ctx context.Context, w http.ResponseWriter, r *http.
|
|
|
|
|
writeError(w, http.StatusBadRequest, "vertical, slug and title are required")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
dept := nullableString(scope.primaryDepartmentID())
|
|
|
|
|
deptID := scope.primaryDepartmentID()
|
|
|
|
|
if scope.CanManageAll && payload.DepartmentID != nil {
|
|
|
|
|
deptID = strings.TrimSpace(*payload.DepartmentID)
|
|
|
|
|
}
|
|
|
|
|
dept := nullableString(deptID)
|
|
|
|
|
row := a.db.QueryRow(ctx, `
|
|
|
|
|
INSERT INTO sections (vertical, department_id, slug, title, emoji, description)
|
|
|
|
|
VALUES ($1, $2, $3, $4, $5, $6)
|
|
|
|
|
@@ -459,7 +846,7 @@ func (a *app) updateSection(ctx context.Context, w http.ResponseWriter, r *http.
|
|
|
|
|
}
|
|
|
|
|
args = append(args, vertical, slug)
|
|
|
|
|
where := fmt.Sprintf("vertical = $%d AND slug = $%d", len(args)-1, len(args))
|
|
|
|
|
if !scope.IsAdmin {
|
|
|
|
|
if !scope.CanManageAll {
|
|
|
|
|
var deptFilter string
|
|
|
|
|
args, deptFilter = appendDepartmentFilter(args, scope, "department_id")
|
|
|
|
|
where += deptFilter
|
|
|
|
|
@@ -487,7 +874,7 @@ func (a *app) deleteSection(ctx context.Context, w http.ResponseWriter, r *http.
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
section, err := a.findSection(ctx, vertical, slug, scope)
|
|
|
|
|
section, err := a.findSection(ctx, vertical, slug, scope.forManageLookup())
|
|
|
|
|
if err != nil {
|
|
|
|
|
writeDBError(w, err)
|
|
|
|
|
return
|
|
|
|
|
@@ -511,7 +898,7 @@ func (a *app) deleteSection(ctx context.Context, w http.ResponseWriter, r *http.
|
|
|
|
|
func (a *app) findSection(ctx context.Context, vertical, slug string, scope accessScope) (sectionOut, error) {
|
|
|
|
|
args := []any{vertical, slug}
|
|
|
|
|
where := "s.vertical = $1 AND s.slug = $2"
|
|
|
|
|
if !scope.IsAdmin {
|
|
|
|
|
if !scope.canReadAll() {
|
|
|
|
|
var deptFilter string
|
|
|
|
|
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
|
|
|
|
|
where += deptFilter
|
|
|
|
|
@@ -555,7 +942,7 @@ func (a *app) listChannels(ctx context.Context, w http.ResponseWriter, r *http.R
|
|
|
|
|
args = append(args, section)
|
|
|
|
|
where += fmt.Sprintf(" AND s.slug = $%d", len(args))
|
|
|
|
|
}
|
|
|
|
|
if !scope.IsAdmin {
|
|
|
|
|
if !scope.canReadAll() {
|
|
|
|
|
var deptFilter string
|
|
|
|
|
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
|
|
|
|
|
where += deptFilter
|
|
|
|
|
@@ -612,7 +999,7 @@ func (a *app) createChannel(ctx context.Context, w http.ResponseWriter, r *http.
|
|
|
|
|
writeError(w, http.StatusBadRequest, "identifier, vertical and section are required")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
section, err := a.findSection(ctx, payload.Vertical, payload.Section, scope)
|
|
|
|
|
section, err := a.findSection(ctx, payload.Vertical, payload.Section, scope.forManageLookup())
|
|
|
|
|
if err != nil {
|
|
|
|
|
writeDBError(w, err)
|
|
|
|
|
return
|
|
|
|
|
@@ -705,7 +1092,7 @@ func (a *app) updateChannel(ctx context.Context, w http.ResponseWriter, r *http.
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if _, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil {
|
|
|
|
|
if _, err := a.findChannel(ctx, id, scope.forManageLookup(), r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil {
|
|
|
|
|
writeDBError(w, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
@@ -732,7 +1119,7 @@ func (a *app) updateChannel(ctx context.Context, w http.ResponseWriter, r *http.
|
|
|
|
|
if payload.Vertical != nil && strings.TrimSpace(*payload.Vertical) != "" {
|
|
|
|
|
vertical = strings.TrimSpace(*payload.Vertical)
|
|
|
|
|
}
|
|
|
|
|
section, err := a.findSection(ctx, vertical, strings.TrimSpace(*payload.Section), scope)
|
|
|
|
|
section, err := a.findSection(ctx, vertical, strings.TrimSpace(*payload.Section), scope.forManageLookup())
|
|
|
|
|
if err != nil {
|
|
|
|
|
writeDBError(w, err)
|
|
|
|
|
return
|
|
|
|
|
@@ -766,7 +1153,7 @@ func (a *app) deleteChannel(ctx context.Context, w http.ResponseWriter, r *http.
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if _, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil {
|
|
|
|
|
if _, err := a.findChannel(ctx, id, scope.forManageLookup(), r.URL.Query().Get("vertical"), r.URL.Query().Get("section")); err != nil {
|
|
|
|
|
writeDBError(w, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
@@ -788,7 +1175,7 @@ func (a *app) findChannel(ctx context.Context, id int64, scope accessScope, vert
|
|
|
|
|
args = append(args, strings.TrimSpace(section))
|
|
|
|
|
where += fmt.Sprintf(" AND s.slug = $%d", len(args))
|
|
|
|
|
}
|
|
|
|
|
if !scope.IsAdmin {
|
|
|
|
|
if !scope.canReadAll() {
|
|
|
|
|
var deptFilter string
|
|
|
|
|
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
|
|
|
|
|
where += deptFilter
|
|
|
|
|
@@ -856,7 +1243,7 @@ func (a *app) reanalyzeChannel(ctx context.Context, w http.ResponseWriter, r *ht
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
ch, err := a.findChannel(ctx, id, scope, r.URL.Query().Get("vertical"), r.URL.Query().Get("section"))
|
|
|
|
|
ch, err := a.findChannel(ctx, id, scope.forManageLookup(), r.URL.Query().Get("vertical"), r.URL.Query().Get("section"))
|
|
|
|
|
if err != nil {
|
|
|
|
|
writeDBError(w, err)
|
|
|
|
|
return
|
|
|
|
|
@@ -938,7 +1325,7 @@ func (a *app) handleMessages(ctx context.Context, w http.ResponseWriter, r *http
|
|
|
|
|
args = append(args, key, field)
|
|
|
|
|
where += fmt.Sprintf(" AND COALESCE(mc.verdict ->> $%d, m.extracted -> $%d ->> $%d) = 'true'", len(args), len(args)-1, len(args))
|
|
|
|
|
}
|
|
|
|
|
if !scope.IsAdmin {
|
|
|
|
|
if !scope.canReadAll() {
|
|
|
|
|
var deptFilter string
|
|
|
|
|
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
|
|
|
|
|
where += deptFilter
|
|
|
|
|
@@ -1007,7 +1394,7 @@ func (a *app) handleMessageItem(ctx context.Context, w http.ResponseWriter, r *h
|
|
|
|
|
}
|
|
|
|
|
args := []any{id}
|
|
|
|
|
where := "m.id = $1"
|
|
|
|
|
if !scope.IsAdmin {
|
|
|
|
|
if !scope.canReadAll() {
|
|
|
|
|
var deptFilter string
|
|
|
|
|
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
|
|
|
|
|
where += deptFilter
|
|
|
|
|
@@ -1130,7 +1517,7 @@ func (a *app) canReadChannelMedia(ctx context.Context, scope accessScope, channe
|
|
|
|
|
FROM channels c
|
|
|
|
|
JOIN sections s ON s.id = c.section_id
|
|
|
|
|
WHERE c.id = $1 OR c.source_channel_id = $1
|
|
|
|
|
`, channelID, scope.departmentIDs(), scope.IsAdmin).Scan(&allowed)
|
|
|
|
|
`, channelID, scope.departmentIDs(), scope.canReadAll()).Scan(&allowed)
|
|
|
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
|
|
|
return false, nil
|
|
|
|
|
}
|
|
|
|
|
@@ -1160,7 +1547,7 @@ func (a *app) handleStats(ctx context.Context, w http.ResponseWriter, r *http.Re
|
|
|
|
|
args = append(args, section)
|
|
|
|
|
where += fmt.Sprintf(" AND s.slug = $%d", len(args))
|
|
|
|
|
}
|
|
|
|
|
if !scope.IsAdmin {
|
|
|
|
|
if !scope.canReadAll() {
|
|
|
|
|
var deptFilter string
|
|
|
|
|
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
|
|
|
|
|
where += deptFilter
|
|
|
|
|
@@ -1279,7 +1666,7 @@ func (a *app) pendingLLM(ctx context.Context, scope accessScope, vertical, secti
|
|
|
|
|
args = append(args, section)
|
|
|
|
|
where += fmt.Sprintf(" AND s.slug = $%d", len(args))
|
|
|
|
|
}
|
|
|
|
|
if !scope.IsAdmin {
|
|
|
|
|
if !scope.canReadAll() {
|
|
|
|
|
var deptFilter string
|
|
|
|
|
args, deptFilter = appendDepartmentFilter(args, scope, "s.department_id")
|
|
|
|
|
where += deptFilter
|
|
|
|
|
@@ -1370,7 +1757,7 @@ func (a *app) savePrompt(ctx context.Context, w http.ResponseWriter, r *http.Req
|
|
|
|
|
writeError(w, http.StatusBadRequest, "prompt is too long (max 30000 chars)")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
deptID, err := a.promptDepartmentID(ctx, scope, vertical, section)
|
|
|
|
|
deptID, err := a.promptDepartmentID(ctx, scope.forManageLookup(), vertical, section)
|
|
|
|
|
if err != nil {
|
|
|
|
|
writeDBError(w, err)
|
|
|
|
|
return
|
|
|
|
|
@@ -1398,7 +1785,7 @@ func (a *app) resetPrompt(ctx context.Context, w http.ResponseWriter, r *http.Re
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
section := strings.TrimSpace(r.URL.Query().Get("section"))
|
|
|
|
|
deptID, err := a.promptDepartmentID(ctx, scope, vertical, section)
|
|
|
|
|
deptID, err := a.promptDepartmentID(ctx, scope.forManageLookup(), vertical, section)
|
|
|
|
|
if err != nil {
|
|
|
|
|
writeDBError(w, err)
|
|
|
|
|
return
|
|
|
|
|
@@ -1503,11 +1890,11 @@ func (a *app) readScope(w http.ResponseWriter, r *http.Request, manage bool) (ac
|
|
|
|
|
writeError(w, http.StatusNotFound, "not found")
|
|
|
|
|
return scope, false
|
|
|
|
|
}
|
|
|
|
|
} else if !scope.IsAdmin && len(scope.departmentIDs()) == 0 {
|
|
|
|
|
} else if !scope.canReadAll() && len(scope.departmentIDs()) == 0 {
|
|
|
|
|
writeError(w, http.StatusForbidden, "department is required")
|
|
|
|
|
return scope, false
|
|
|
|
|
}
|
|
|
|
|
if manage && !scope.IsAdmin && len(scope.departmentIDs()) == 0 {
|
|
|
|
|
if manage && !scope.CanManageAll && len(scope.departmentIDs()) == 0 {
|
|
|
|
|
writeError(w, http.StatusForbidden, "department is required")
|
|
|
|
|
return scope, false
|
|
|
|
|
}
|
|
|
|
|
@@ -1515,35 +1902,37 @@ func (a *app) readScope(w http.ResponseWriter, r *http.Request, manage bool) (ac
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func readAccess(r *http.Request) accessScope {
|
|
|
|
|
admin := r.Header.Get("X-User-Is-Admin") == "1"
|
|
|
|
|
deptHead := r.Header.Get("X-User-Is-Department-Head") == "1"
|
|
|
|
|
canManage := r.Header.Get("X-Monitoring-TG-Can-Manage") == "1"
|
|
|
|
|
canAuth := r.Header.Get("X-Monitoring-TG-Can-Auth") == "1"
|
|
|
|
|
admin := commonmw.HeaderBool(r, "X-User-Is-Admin")
|
|
|
|
|
deptHead := commonmw.HeaderBool(r, "X-User-Is-Department-Head")
|
|
|
|
|
canManagePermission := commonmw.HeaderBool(r, "X-Monitoring-TG-Can-Manage")
|
|
|
|
|
canAuth := commonmw.HeaderBool(r, "X-Monitoring-TG-Can-Auth")
|
|
|
|
|
canViewAll := commonmw.HeaderBool(r, "X-Monitoring-TG-Can-View-All")
|
|
|
|
|
deptID := strings.TrimSpace(r.Header.Get("X-User-Department-Id"))
|
|
|
|
|
deptIDs := parseCSVHeader(r.Header.Get("X-User-Department-Ids"))
|
|
|
|
|
deptIDs := commonmw.HeaderCSV(r, "X-User-Department-Ids")
|
|
|
|
|
if deptID != "" {
|
|
|
|
|
deptIDs = appendUniqueString(deptIDs, deptID)
|
|
|
|
|
}
|
|
|
|
|
return accessScope{
|
|
|
|
|
IsAdmin: admin,
|
|
|
|
|
CanManage: admin || deptHead || canManage,
|
|
|
|
|
CanManage: admin || deptHead || canManagePermission,
|
|
|
|
|
CanManageAll: admin || (canManagePermission && canViewAll),
|
|
|
|
|
CanAuth: admin || canAuth,
|
|
|
|
|
CanViewAll: admin || canViewAll,
|
|
|
|
|
DeptID: deptID,
|
|
|
|
|
DeptIDs: deptIDs,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func parseCSVHeader(raw string) []string {
|
|
|
|
|
raw = strings.TrimSpace(raw)
|
|
|
|
|
if raw == "" {
|
|
|
|
|
return nil
|
|
|
|
|
func (s accessScope) canReadAll() bool {
|
|
|
|
|
return s.IsAdmin || s.CanViewAll
|
|
|
|
|
}
|
|
|
|
|
parts := strings.Split(raw, ",")
|
|
|
|
|
out := make([]string, 0, len(parts))
|
|
|
|
|
for _, part := range parts {
|
|
|
|
|
out = appendUniqueString(out, strings.TrimSpace(part))
|
|
|
|
|
|
|
|
|
|
func (s accessScope) forManageLookup() accessScope {
|
|
|
|
|
if s.CanManageAll {
|
|
|
|
|
return s
|
|
|
|
|
}
|
|
|
|
|
return out
|
|
|
|
|
s.CanViewAll = false
|
|
|
|
|
return s
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func appendUniqueString(items []string, value string) []string {
|
|
|
|
|
|