diff --git a/app/worker.py b/app/worker.py index 85425fd..88c4560 100644 --- a/app/worker.py +++ b/app/worker.py @@ -10,6 +10,8 @@ import json import sys from typing import Any +from sqlalchemy import text + from app.db import SessionLocal, init_db from app.models import Project from app.services.monitor import ( @@ -131,7 +133,17 @@ def cmd_suggest(payload: dict[str, Any]) -> None: db.close() +def cmd_health(_: dict[str, Any]) -> None: + db = SessionLocal() + try: + db.execute(text("SELECT 1")) + _write({"status": "ok"}) + finally: + db.close() + + COMMANDS = { + "health": cmd_health, "add-listing": cmd_add_listing, "add-listings": cmd_add_listings, "check-project": cmd_check_project, diff --git a/internal/pf/http.go b/internal/pf/http.go index 17f946f..a2721a0 100644 --- a/internal/pf/http.go +++ b/internal/pf/http.go @@ -1,12 +1,14 @@ package pf import ( + "context" "database/sql" "encoding/json" "errors" "net/http" "strconv" "strings" + "time" commonmw "gitea.estateliga.work/admin/portal-common/middleware" ) @@ -23,11 +25,20 @@ type bulkPayload struct { URLs []string `json:"urls"` } +type componentProbe struct { + Name string `json:"name"` + Status string `json:"status"` + LatencyMs int64 `json:"latency_ms"` + Error string `json:"error,omitempty"` +} + func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { path := s.apiPath(r.URL.Path) switch { case path == "/healthz": writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) + case path == "/health/detail": + s.healthDetail(w, r) case path == "/": writeJSON(w, http.StatusOK, map[string]string{"service": "monitoring-pf", "ui": "portal", "api": "go"}) case !strings.HasPrefix(path, "/api/v1"): @@ -72,6 +83,104 @@ func (s Server) apiPath(path string) string { return path } +func (s Server) healthDetail(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + components := []componentProbe{ + s.probeDatabase(ctx), + s.probeWorker(ctx), + s.probeTelegram(ctx), + s.probeScheduler(ctx), + s.probeProjectIntegrity(ctx), + } + writeJSON(w, http.StatusOK, map[string]any{"components": components}) +} + +func (s Server) probeDatabase(ctx context.Context) componentProbe { + start := time.Now() + if err := s.App.DB.PingContext(ctx); err != nil { + return componentProbe{Name: "sqlite", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()} + } + return componentProbe{Name: "sqlite", Status: "ok", LatencyMs: time.Since(start).Milliseconds()} +} + +func (s Server) probeWorker(ctx context.Context) componentProbe { + start := time.Now() + if err := s.App.Worker.Health(ctx); err != nil { + return componentProbe{Name: "python_worker", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()} + } + return componentProbe{Name: "python_worker", Status: "ok", LatencyMs: time.Since(start).Milliseconds()} +} + +func (s Server) probeTelegram(ctx context.Context) componentProbe { + start := time.Now() + if !s.App.TG.Enabled() { + return componentProbe{Name: "telegram_bot", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "TG_BOT_TOKEN is not configured"} + } + if _, err := s.App.TG.BotUsername(ctx); err != nil { + return componentProbe{Name: "telegram_bot", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()} + } + return componentProbe{Name: "telegram_bot", Status: "ok", LatencyMs: time.Since(start).Milliseconds()} +} + +func (s Server) probeScheduler(ctx context.Context) componentProbe { + start := time.Now() + staleAfter := max(1, s.App.Cfg.ScrapeIntervalHours*2) * 3600 + staleModifier := "-" + strconv.Itoa(staleAfter) + " seconds" + var total, neverChecked, stale int64 + err := s.App.DB.QueryRowContext(ctx, ` + SELECT + COUNT(*) AS total, + COALESCE(SUM(CASE WHEN last_checked_at IS NULL THEN 1 ELSE 0 END), 0) AS never_checked, + COALESCE(SUM(CASE WHEN last_checked_at IS NOT NULL AND datetime(last_checked_at) < datetime('now', ?) THEN 1 ELSE 0 END), 0) AS stale + FROM projects`, + staleModifier, + ).Scan(&total, &neverChecked, &stale) + if err != nil { + return componentProbe{Name: "scheduler", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()} + } + if total > 0 && (neverChecked > 0 || stale > 0) { + return componentProbe{ + Name: "scheduler", + Status: "down", + LatencyMs: time.Since(start).Milliseconds(), + Error: "projects=" + strconv.FormatInt(total, 10) + + " never_checked=" + strconv.FormatInt(neverChecked, 10) + + " stale=" + strconv.FormatInt(stale, 10) + + " stale_after_sec=" + strconv.Itoa(staleAfter), + } + } + return componentProbe{Name: "scheduler", Status: "ok", LatencyMs: time.Since(start).Milliseconds()} +} + +func (s Server) probeProjectIntegrity(ctx context.Context) componentProbe { + start := time.Now() + var projects, missingOwner, autoWithoutPermit, searchURLs int64 + err := s.App.DB.QueryRowContext(ctx, ` + SELECT + (SELECT COUNT(*) FROM projects), + (SELECT COUNT(*) FROM projects p LEFT JOIN employees e ON e.id = p.owner_id WHERE e.id IS NULL), + (SELECT COUNT(*) FROM competitor_listings WHERE auto_discovered = 1 AND (permit_number IS NULL OR trim(permit_number) = '')), + (SELECT COUNT(*) FROM competitor_listings WHERE url LIKE '%/search%' OR url LIKE '%?%' AND external_id = '') + `).Scan(&projects, &missingOwner, &autoWithoutPermit, &searchURLs) + if err != nil { + return componentProbe{Name: "project_integrity", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()} + } + if missingOwner > 0 || autoWithoutPermit > 0 || searchURLs > 0 { + return componentProbe{ + Name: "project_integrity", + Status: "down", + LatencyMs: time.Since(start).Milliseconds(), + Error: "projects=" + strconv.FormatInt(projects, 10) + + " missing_owner=" + strconv.FormatInt(missingOwner, 10) + + " auto_without_permit=" + strconv.FormatInt(autoWithoutPermit, 10) + + " suspicious_search_urls=" + strconv.FormatInt(searchURLs, 10), + } + } + return componentProbe{Name: "project_integrity", Status: "ok", LatencyMs: time.Since(start).Milliseconds()} +} + func (s Server) accessMe(w http.ResponseWriter, r *http.Request) { portalID := portalUserID(r) emp, err := s.App.CurrentEmployee(r.Context(), portalID, false) diff --git a/internal/pf/worker.go b/internal/pf/worker.go index b186faf..752ee9b 100644 --- a/internal/pf/worker.go +++ b/internal/pf/worker.go @@ -34,6 +34,10 @@ type CheckResult struct { Changes int `json:"changes"` } +type HealthResult struct { + Status string `json:"status"` +} + type Suggestion struct { Source string `json:"source"` ExternalID string `json:"external_id"` @@ -104,6 +108,17 @@ func (w *Worker) Suggest(ctx context.Context, projectID int64) (*SuggestionsResp return &out, nil } +func (w *Worker) Health(ctx context.Context) error { + var out HealthResult + if err := w.call(ctx, "health", map[string]any{}, &out); err != nil { + return err + } + if out.Status != "ok" { + return fmt.Errorf("worker status=%s", out.Status) + } + return nil +} + func (w *Worker) call(ctx context.Context, command string, payload any, out any) error { ctx, cancel := context.WithTimeout(ctx, 15*time.Minute) defer cancel()