feat: expose files storage health detail
This commit is contained in:
@@ -55,7 +55,7 @@ func main() {
|
||||
slog.Warn("ensure bucket failed", "error", err)
|
||||
}
|
||||
|
||||
healthH := handler.NewHealthHandler(pool)
|
||||
healthH := handler.NewHealthHandler(pool, store)
|
||||
nodeRepo := repository.NewNodeRepository(pool)
|
||||
nodeH := handler.NewNodeHandler(cfg, nodeRepo, store)
|
||||
go runTrashPurger(ctx, nodeRepo, store)
|
||||
@@ -67,6 +67,7 @@ func main() {
|
||||
|
||||
r.Get("/healthz", healthH.Healthz)
|
||||
r.Get("/readyz", healthH.Readyz)
|
||||
r.Get("/health/detail", healthH.Detail)
|
||||
|
||||
mountFilesAPI := func(r chi.Router) {
|
||||
r.Get("/nodes", nodeH.List)
|
||||
|
||||
@@ -1,17 +1,24 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
|
||||
"files-service/internal/storage"
|
||||
)
|
||||
|
||||
type HealthHandler struct {
|
||||
pool *pgxpool.Pool
|
||||
pool *pgxpool.Pool
|
||||
store *storage.Storage
|
||||
}
|
||||
|
||||
func NewHealthHandler(pool *pgxpool.Pool) *HealthHandler {
|
||||
return &HealthHandler{pool: pool}
|
||||
func NewHealthHandler(pool *pgxpool.Pool, store *storage.Storage) *HealthHandler {
|
||||
return &HealthHandler{pool: pool, store: store}
|
||||
}
|
||||
|
||||
func (h *HealthHandler) Healthz(w http.ResponseWriter, _ *http.Request) {
|
||||
@@ -25,3 +32,75 @@ func (h *HealthHandler) Readyz(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ready"})
|
||||
}
|
||||
|
||||
func (h *HealthHandler) Detail(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
components := []componentProbe{
|
||||
h.probePostgres(ctx),
|
||||
h.probeStorage(ctx),
|
||||
h.probeTrash(ctx),
|
||||
h.probeStorageMetadata(ctx),
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{"components": components})
|
||||
}
|
||||
|
||||
type componentProbe struct {
|
||||
Name string `json:"name"`
|
||||
Status string `json:"status"`
|
||||
LatencyMs int64 `json:"latency_ms"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
func (h *HealthHandler) probePostgres(ctx context.Context) componentProbe {
|
||||
start := time.Now()
|
||||
if err := h.pool.Ping(ctx); err != nil {
|
||||
return componentProbe{Name: "postgres", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||
}
|
||||
return componentProbe{Name: "postgres", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||
}
|
||||
|
||||
func (h *HealthHandler) probeStorage(ctx context.Context) componentProbe {
|
||||
start := time.Now()
|
||||
if err := h.store.Check(ctx); err != nil {
|
||||
return componentProbe{Name: "minio_storage", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||
}
|
||||
return componentProbe{Name: "minio_storage", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||
}
|
||||
|
||||
func (h *HealthHandler) probeTrash(ctx context.Context) componentProbe {
|
||||
start := time.Now()
|
||||
var due int
|
||||
if err := h.pool.QueryRow(ctx, `
|
||||
SELECT COUNT(*)::int
|
||||
FROM files_nodes
|
||||
WHERE purge_after IS NOT NULL
|
||||
AND purge_after <= now()
|
||||
`).Scan(&due); err != nil {
|
||||
return componentProbe{Name: "trash_purger", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||
}
|
||||
if due > 0 {
|
||||
return componentProbe{Name: "trash_purger", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: "purge_due=" + strconv.Itoa(due)}
|
||||
}
|
||||
return componentProbe{Name: "trash_purger", Status: "ok", LatencyMs: time.Since(start).Milliseconds()}
|
||||
}
|
||||
|
||||
func (h *HealthHandler) probeStorageMetadata(ctx context.Context) componentProbe {
|
||||
start := time.Now()
|
||||
var activeObjects int
|
||||
if err := h.pool.QueryRow(ctx, `
|
||||
SELECT COUNT(*)::int
|
||||
FROM files_nodes
|
||||
WHERE deleted_at IS NULL
|
||||
AND storage_key IS NOT NULL
|
||||
`).Scan(&activeObjects); err != nil {
|
||||
return componentProbe{Name: "storage_metadata", Status: "down", LatencyMs: time.Since(start).Milliseconds(), Error: err.Error()}
|
||||
}
|
||||
return componentProbe{
|
||||
Name: "storage_metadata",
|
||||
Status: "ok",
|
||||
LatencyMs: time.Since(start).Milliseconds(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,6 +66,20 @@ func (s *Storage) EnsureBucket(ctx context.Context) error {
|
||||
return s.client.MakeBucket(ctx, s.cfg.Bucket, minio.MakeBucketOptions{})
|
||||
}
|
||||
|
||||
func (s *Storage) Check(ctx context.Context) error {
|
||||
if !s.Configured() {
|
||||
return errors.New("storage not configured")
|
||||
}
|
||||
exists, err := s.client.BucketExists(ctx, s.cfg.Bucket)
|
||||
if err != nil {
|
||||
return fmt.Errorf("check bucket: %w", err)
|
||||
}
|
||||
if !exists {
|
||||
return fmt.Errorf("bucket %q does not exist", s.cfg.Bucket)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func GenerateKey(ownerID, filename string) string {
|
||||
ext := strings.ToLower(strings.TrimPrefix(filepath.Ext(filename), "."))
|
||||
if !AllowedExtension(ext) {
|
||||
|
||||
Reference in New Issue
Block a user