Store monitoring TG media in MinIO
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 45s
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 45s
This commit is contained in:
@@ -3,6 +3,7 @@ package main
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
@@ -21,6 +22,8 @@ import (
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -44,6 +47,13 @@ type config struct {
|
||||
LLMAPIKey string
|
||||
LLMModel string
|
||||
LLMTimeout time.Duration
|
||||
MinioEndpoint string
|
||||
MinioAccessKey string
|
||||
MinioSecretKey string
|
||||
MinioBucket string
|
||||
MinioUseSSL bool
|
||||
MinioRegion string
|
||||
MinioInsecureTLS bool
|
||||
}
|
||||
|
||||
type app struct {
|
||||
@@ -51,6 +61,7 @@ type app struct {
|
||||
db *pgxpool.Pool
|
||||
http *http.Client
|
||||
python *http.Client
|
||||
minio *minio.Client
|
||||
}
|
||||
|
||||
type accessScope struct {
|
||||
@@ -126,11 +137,18 @@ func main() {
|
||||
}
|
||||
defer pool.Close()
|
||||
|
||||
minioClient, err := newMinioClient(cfg)
|
||||
if err != nil {
|
||||
slog.Error("minio_init_failed", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
srvApp := &app{
|
||||
cfg: cfg,
|
||||
db: pool,
|
||||
http: &http.Client{Timeout: cfg.LLMTimeout},
|
||||
python: &http.Client{Timeout: 15 * time.Minute},
|
||||
minio: minioClient,
|
||||
}
|
||||
|
||||
server := &http.Server{
|
||||
@@ -957,6 +975,9 @@ func (a *app) handleMedia(w http.ResponseWriter, r *http.Request, path string) {
|
||||
writeError(w, http.StatusNotFound, "not found")
|
||||
return
|
||||
}
|
||||
if a.serveMinioMedia(w, r, clean) {
|
||||
return
|
||||
}
|
||||
base, err := filepath.Abs(a.cfg.MediaDir)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "media directory unavailable")
|
||||
@@ -970,6 +991,32 @@ func (a *app) handleMedia(w http.ResponseWriter, r *http.Request, path string) {
|
||||
http.ServeFile(w, r, full)
|
||||
}
|
||||
|
||||
func (a *app) serveMinioMedia(w http.ResponseWriter, r *http.Request, key string) bool {
|
||||
if a.minio == nil || a.cfg.MinioBucket == "" {
|
||||
return false
|
||||
}
|
||||
obj, err := a.minio.GetObject(r.Context(), a.cfg.MinioBucket, key, minio.GetObjectOptions{})
|
||||
if err != nil {
|
||||
slog.Warn("minio_get_media_failed", "key", key, "error", err)
|
||||
return false
|
||||
}
|
||||
defer obj.Close()
|
||||
info, err := obj.Stat()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
if info.ContentType != "" {
|
||||
w.Header().Set("Content-Type", info.ContentType)
|
||||
}
|
||||
if info.Size > 0 {
|
||||
w.Header().Set("Content-Length", strconv.FormatInt(info.Size, 10))
|
||||
}
|
||||
if _, err := io.Copy(w, obj); err != nil {
|
||||
slog.Warn("minio_media_stream_failed", "key", key, "error", err)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (a *app) canReadChannelMedia(ctx context.Context, scope accessScope, channelID int64) (bool, error) {
|
||||
var dept sql.NullString
|
||||
err := a.db.QueryRow(ctx, `
|
||||
@@ -1669,6 +1716,25 @@ func valueOrEmpty(v *string) string {
|
||||
return *v
|
||||
}
|
||||
|
||||
func newMinioClient(cfg config) (*minio.Client, error) {
|
||||
if cfg.MinioEndpoint == "" || cfg.MinioAccessKey == "" || cfg.MinioSecretKey == "" || cfg.MinioBucket == "" {
|
||||
return nil, nil
|
||||
}
|
||||
endpoint := strings.TrimPrefix(strings.TrimPrefix(cfg.MinioEndpoint, "https://"), "http://")
|
||||
opts := &minio.Options{
|
||||
Creds: credentials.NewStaticV4(cfg.MinioAccessKey, cfg.MinioSecretKey, ""),
|
||||
Secure: cfg.MinioUseSSL,
|
||||
Region: cfg.MinioRegion,
|
||||
BucketLookup: minio.BucketLookupPath,
|
||||
}
|
||||
if cfg.MinioInsecureTLS {
|
||||
opts.Transport = &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, //nolint:gosec // optional intra-cluster MinIO mode
|
||||
}
|
||||
}
|
||||
return minio.New(endpoint, opts)
|
||||
}
|
||||
|
||||
func queryInt(raw string, fallback int) int {
|
||||
if raw == "" {
|
||||
return fallback
|
||||
@@ -1715,6 +1781,13 @@ func loadConfig() config {
|
||||
LLMAPIKey: env("LLM_API_KEY", ""),
|
||||
LLMModel: env("LLM_MODEL", "qwen2.5-14b"),
|
||||
LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second,
|
||||
MinioEndpoint: env("MINIO_ENDPOINT", ""),
|
||||
MinioAccessKey: env("MINIO_ACCESS_KEY", ""),
|
||||
MinioSecretKey: env("MINIO_SECRET_KEY", ""),
|
||||
MinioBucket: env("MINIO_BUCKET", "monitoring-tg-media"),
|
||||
MinioUseSSL: envBool("MINIO_USE_SSL", true),
|
||||
MinioRegion: env("MINIO_REGION", "us-east-1"),
|
||||
MinioInsecureTLS: envBool("MINIO_INSECURE_SKIP_VERIFY", false),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user