Compare commits

..

3 Commits

Author SHA1 Message Date
Grendgi
a924cd832b Store monitoring TG media in MinIO
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 45s
2026-06-05 16:58:08 +03:00
Grendgi
4ac976b4eb Harden monitoring TG reanalyze reset 2026-06-05 16:35:58 +03:00
Grendgi
fd6fc6b931 Handle scalar TG extracted payloads 2026-06-05 16:31:20 +03:00
11 changed files with 261 additions and 11 deletions

View File

@@ -300,7 +300,12 @@ func (c *classifier) resolvePrompt(ctx context.Context, vertical, departmentID,
func (c *classifier) saveVerdict(ctx context.Context, id int64, key string, verdict json.RawMessage) error { func (c *classifier) saveVerdict(ctx context.Context, id int64, key string, verdict json.RawMessage) error {
_, err := c.db.Exec(ctx, ` _, err := c.db.Exec(ctx, `
UPDATE messages UPDATE messages
SET extracted = jsonb_set(COALESCE(extracted, '{}'::jsonb), ARRAY[$2], $3::jsonb, true) SET extracted = jsonb_set(
CASE WHEN jsonb_typeof(extracted) = 'object' THEN extracted ELSE '{}'::jsonb END,
ARRAY[$2],
$3::jsonb,
true
)
WHERE id = $1 WHERE id = $1
`, id, key, string(verdict)) `, id, key, string(verdict))
return err return err

View File

@@ -3,6 +3,7 @@ package main
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/tls"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"errors" "errors"
@@ -21,6 +22,8 @@ import (
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
) )
const ( const (
@@ -44,6 +47,13 @@ type config struct {
LLMAPIKey string LLMAPIKey string
LLMModel string LLMModel string
LLMTimeout time.Duration LLMTimeout time.Duration
MinioEndpoint string
MinioAccessKey string
MinioSecretKey string
MinioBucket string
MinioUseSSL bool
MinioRegion string
MinioInsecureTLS bool
} }
type app struct { type app struct {
@@ -51,6 +61,7 @@ type app struct {
db *pgxpool.Pool db *pgxpool.Pool
http *http.Client http *http.Client
python *http.Client python *http.Client
minio *minio.Client
} }
type accessScope struct { type accessScope struct {
@@ -126,11 +137,18 @@ func main() {
} }
defer pool.Close() defer pool.Close()
minioClient, err := newMinioClient(cfg)
if err != nil {
slog.Error("minio_init_failed", "error", err)
os.Exit(1)
}
srvApp := &app{ srvApp := &app{
cfg: cfg, cfg: cfg,
db: pool, db: pool,
http: &http.Client{Timeout: cfg.LLMTimeout}, http: &http.Client{Timeout: cfg.LLMTimeout},
python: &http.Client{Timeout: 15 * time.Minute}, python: &http.Client{Timeout: 15 * time.Minute},
minio: minioClient,
} }
server := &http.Server{ server := &http.Server{
@@ -778,7 +796,13 @@ func (a *app) reanalyzeChannel(ctx context.Context, w http.ResponseWriter, r *ht
return return
} }
key := verdictKey(ch.Vertical) key := verdictKey(ch.Vertical)
tag, err := a.db.Exec(ctx, `UPDATE messages SET extracted = COALESCE(extracted, '{}'::jsonb) - $1 WHERE channel_id = $2`, key, id) tag, err := a.db.Exec(ctx, `
UPDATE messages
SET extracted = (
CASE WHEN jsonb_typeof(extracted) = 'object' THEN extracted ELSE '{}'::jsonb END
) - $1
WHERE channel_id = $2
`, key, id)
if err != nil { if err != nil {
writeDBError(w, err) writeDBError(w, err)
return return
@@ -951,6 +975,9 @@ func (a *app) handleMedia(w http.ResponseWriter, r *http.Request, path string) {
writeError(w, http.StatusNotFound, "not found") writeError(w, http.StatusNotFound, "not found")
return return
} }
if a.serveMinioMedia(w, r, clean) {
return
}
base, err := filepath.Abs(a.cfg.MediaDir) base, err := filepath.Abs(a.cfg.MediaDir)
if err != nil { if err != nil {
writeError(w, http.StatusInternalServerError, "media directory unavailable") writeError(w, http.StatusInternalServerError, "media directory unavailable")
@@ -964,6 +991,32 @@ func (a *app) handleMedia(w http.ResponseWriter, r *http.Request, path string) {
http.ServeFile(w, r, full) http.ServeFile(w, r, full)
} }
func (a *app) serveMinioMedia(w http.ResponseWriter, r *http.Request, key string) bool {
if a.minio == nil || a.cfg.MinioBucket == "" {
return false
}
obj, err := a.minio.GetObject(r.Context(), a.cfg.MinioBucket, key, minio.GetObjectOptions{})
if err != nil {
slog.Warn("minio_get_media_failed", "key", key, "error", err)
return false
}
defer obj.Close()
info, err := obj.Stat()
if err != nil {
return false
}
if info.ContentType != "" {
w.Header().Set("Content-Type", info.ContentType)
}
if info.Size > 0 {
w.Header().Set("Content-Length", strconv.FormatInt(info.Size, 10))
}
if _, err := io.Copy(w, obj); err != nil {
slog.Warn("minio_media_stream_failed", "key", key, "error", err)
}
return true
}
func (a *app) canReadChannelMedia(ctx context.Context, scope accessScope, channelID int64) (bool, error) { func (a *app) canReadChannelMedia(ctx context.Context, scope accessScope, channelID int64) (bool, error) {
var dept sql.NullString var dept sql.NullString
err := a.db.QueryRow(ctx, ` err := a.db.QueryRow(ctx, `
@@ -1663,6 +1716,25 @@ func valueOrEmpty(v *string) string {
return *v return *v
} }
func newMinioClient(cfg config) (*minio.Client, error) {
if cfg.MinioEndpoint == "" || cfg.MinioAccessKey == "" || cfg.MinioSecretKey == "" || cfg.MinioBucket == "" {
return nil, nil
}
endpoint := strings.TrimPrefix(strings.TrimPrefix(cfg.MinioEndpoint, "https://"), "http://")
opts := &minio.Options{
Creds: credentials.NewStaticV4(cfg.MinioAccessKey, cfg.MinioSecretKey, ""),
Secure: cfg.MinioUseSSL,
Region: cfg.MinioRegion,
BucketLookup: minio.BucketLookupPath,
}
if cfg.MinioInsecureTLS {
opts.Transport = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, //nolint:gosec // optional intra-cluster MinIO mode
}
}
return minio.New(endpoint, opts)
}
func queryInt(raw string, fallback int) int { func queryInt(raw string, fallback int) int {
if raw == "" { if raw == "" {
return fallback return fallback
@@ -1709,6 +1781,13 @@ func loadConfig() config {
LLMAPIKey: env("LLM_API_KEY", ""), LLMAPIKey: env("LLM_API_KEY", ""),
LLMModel: env("LLM_MODEL", "qwen2.5-14b"), LLMModel: env("LLM_MODEL", "qwen2.5-14b"),
LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second, LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second,
MinioEndpoint: env("MINIO_ENDPOINT", ""),
MinioAccessKey: env("MINIO_ACCESS_KEY", ""),
MinioSecretKey: env("MINIO_SECRET_KEY", ""),
MinioBucket: env("MINIO_BUCKET", "monitoring-tg-media"),
MinioUseSSL: envBool("MINIO_USE_SSL", true),
MinioRegion: env("MINIO_REGION", "us-east-1"),
MinioInsecureTLS: envBool("MINIO_INSECURE_SKIP_VERIFY", false),
} }
} }

27
go.mod
View File

@@ -2,12 +2,33 @@ module monitoring-tg
go 1.25.7 go 1.25.7
require github.com/jackc/pgx/v5 v5.9.1 require (
github.com/jackc/pgx/v5 v5.9.1
github.com/minio/minio-go/v7 v7.2.0
)
require ( require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect
golang.org/x/sync v0.17.0 // indirect github.com/klauspost/compress v1.18.6 // indirect
golang.org/x/text v0.29.0 // indirect github.com/klauspost/cpuid/v2 v2.2.11 // indirect
github.com/klauspost/crc32 v1.3.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/minio/crc64nvme v1.1.1 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/philhofer/fwd v1.2.0 // indirect
github.com/rs/xid v1.6.0 // indirect
github.com/tinylib/msgp v1.6.1 // indirect
github.com/zeebo/xxh3 v1.1.0 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.51.0 // indirect
golang.org/x/net v0.53.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.44.0 // indirect
golang.org/x/text v0.37.0 // indirect
gopkg.in/ini.v1 v1.67.2 // indirect
) )

62
go.sum
View File

@@ -1,6 +1,13 @@
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
@@ -9,18 +16,65 @@ github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc=
github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/klauspost/compress v1.18.6 h1:2jupLlAwFm95+YDR+NwD2MEfFO9d4z4Prjl1XXDjuao=
github.com/klauspost/compress v1.18.6/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU=
github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/klauspost/crc32 v1.3.0 h1:sSmTt3gUt81RP655XGZPElI0PelVTZ6YwCRnPSupoFM=
github.com/klauspost/crc32 v1.3.0/go.mod h1:D7kQaZhnkX/Y0tstFGf8VUzv2UofNGqCjnC3zdHB0Hw=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/minio/crc64nvme v1.1.1 h1:8dwx/Pz49suywbO+auHCBpCtlW1OfpcLN7wYgVR6wAI=
github.com/minio/crc64nvme v1.1.1/go.mod h1:eVfm2fAzLlxMdUGc0EEBGSMmPwmXD5XiNRpnu9J3bvg=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
github.com/minio/minio-go/v7 v7.2.0 h1:RCJM0R1XOsRs+A3x3UCaf3ZYbByDaLjFeAi+YCQEPhs=
github.com/minio/minio-go/v7 v7.2.0/go.mod h1:EU9hENAStx/xXduNdrGO5e4X5vk19NtgB+RIPjZO8o0=
github.com/philhofer/fwd v1.2.0 h1:e6DnBTl7vGY+Gz322/ASL4Gyp1FspeMvx1RNDoToZuM=
github.com/philhofer/fwd v1.2.0/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU=
github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= github.com/tinylib/msgp v1.6.1 h1:ESRv8eL3u+DNHUoSAAQRE50Hm162zqAnBoGv9PzScPY=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= github.com/tinylib/msgp v1.6.1/go.mod h1:RSp0LW9oSxFut3KzESt5Voq4GVWyS+PSulT77roAqEA=
golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs=
github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI=
golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8=
golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA=
golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc=
golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/ini.v1 v1.67.2 h1:JtOSMb9OuaCZKr7h5D/h6iii14sK0hLbplTc6frx4Ss=
gopkg.in/ini.v1 v1.67.2/go.mod h1:x/cyOwCgZqOkJoDIJ3c1KNHMo10+nLGAhh+kn3Zizss=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -14,6 +14,11 @@ data:
POSTGRES_DB: "parser" POSTGRES_DB: "parser"
TG_SESSION_PATH: "/data/session/parser.session" TG_SESSION_PATH: "/data/session/parser.session"
MEDIA_DIR: "/data/media" MEDIA_DIR: "/data/media"
MINIO_ENDPOINT: "s3-minio.estateliga.work"
MINIO_BUCKET: "monitoring-tg-media"
MINIO_USE_SSL: "1"
MINIO_INSECURE_SKIP_VERIFY: "0"
MINIO_REGION: "us-east-1"
POLL_INTERVAL_SECONDS: "60" POLL_INTERVAL_SECONDS: "60"
POLL_HISTORY_LIMIT: "50" POLL_HISTORY_LIMIT: "50"
LLM_ENABLED: "1" LLM_ENABLED: "1"

View File

@@ -11,6 +11,8 @@ stringData:
TG_SESSION_STRING: "" TG_SESSION_STRING: ""
POSTGRES_PASSWORD: "parser" POSTGRES_PASSWORD: "parser"
LLM_API_KEY: "sk-111f838ccec43406e078cd9094b6797307cb895236179f32" LLM_API_KEY: "sk-111f838ccec43406e078cd9094b6797307cb895236179f32"
MINIO_ACCESS_KEY: "admjn"
MINIO_SECRET_KEY: "TropicalMacaw9Fantasize"
--- ---
apiVersion: v1 apiVersion: v1
kind: Secret kind: Secret

View File

@@ -27,6 +27,10 @@ spec:
labels: labels:
app: monitoring-tg-server app: monitoring-tg-server
spec: spec:
hostAliases:
- ip: "77.105.173.42"
hostnames:
- "s3-minio.estateliga.work"
terminationGracePeriodSeconds: 20 terminationGracePeriodSeconds: 20
securityContext: securityContext:
fsGroup: 1000 fsGroup: 1000

View File

@@ -14,6 +14,7 @@ dependencies = [
"pydantic>=2.9", "pydantic>=2.9",
"pydantic-settings>=2.6", "pydantic-settings>=2.6",
"structlog>=24.4", "structlog>=24.4",
"minio>=7.2",
] ]
[project.optional-dependencies] [project.optional-dependencies]

View File

@@ -28,6 +28,12 @@ class Settings(BaseSettings):
media_dir: str = Field("/data/media", alias="MEDIA_DIR") media_dir: str = Field("/data/media", alias="MEDIA_DIR")
media_max_bytes: int = Field(20 * 1024 * 1024, alias="MEDIA_MAX_BYTES") media_max_bytes: int = Field(20 * 1024 * 1024, alias="MEDIA_MAX_BYTES")
minio_endpoint: str = Field("", alias="MINIO_ENDPOINT")
minio_access_key: str = Field("", alias="MINIO_ACCESS_KEY")
minio_secret_key: str = Field("", alias="MINIO_SECRET_KEY")
minio_bucket: str = Field("monitoring-tg-media", alias="MINIO_BUCKET")
minio_use_ssl: bool = Field(True, alias="MINIO_USE_SSL")
minio_region: str = Field("us-east-1", alias="MINIO_REGION")
@property @property
def database_url(self) -> str: def database_url(self) -> str:

55
src/parser_bot/storage.py Normal file
View File

@@ -0,0 +1,55 @@
import asyncio
from pathlib import Path
from minio import Minio
from parser_bot.config import settings
_client: Minio | None = None
_bucket_ready = False
def configured() -> bool:
return bool(
settings.minio_endpoint
and settings.minio_access_key
and settings.minio_secret_key
and settings.minio_bucket
)
def client() -> Minio | None:
global _client
if not configured():
return None
if _client is None:
endpoint = settings.minio_endpoint.removeprefix("https://").removeprefix("http://")
_client = Minio(
endpoint,
access_key=settings.minio_access_key,
secret_key=settings.minio_secret_key,
secure=settings.minio_use_ssl,
region=settings.minio_region,
)
return _client
async def upload_file(path: Path, key: str, content_type: str | None) -> None:
cli = client()
if cli is None:
raise RuntimeError("minio is not configured")
def _upload() -> None:
global _bucket_ready
if not _bucket_ready:
if not cli.bucket_exists(settings.minio_bucket):
cli.make_bucket(settings.minio_bucket, location=settings.minio_region)
_bucket_ready = True
cli.fput_object(
settings.minio_bucket,
key,
str(path),
content_type=content_type or "application/octet-stream",
)
await asyncio.to_thread(_upload)

View File

@@ -15,6 +15,7 @@ from telethon.tl.types import (
) )
from parser_bot.config import settings from parser_bot.config import settings
from parser_bot import storage
log = structlog.get_logger() log = structlog.get_logger()
@@ -136,9 +137,26 @@ async def _download_message_media(
if path is None: if path is None:
info["skipped"] = "no_file" info["skipped"] = "no_file"
return [info] return [info]
filename = Path(path).name file_path = Path(path)
filename = file_path.name
media_key = f"{channel_id}/{filename}"
public_base = settings.public_base_path.rstrip("/") public_base = settings.public_base_path.rstrip("/")
info["url"] = f"{public_base}/media/{channel_id}/{filename}" info["url"] = f"{public_base}/media/{media_key}"
info["key"] = media_key
info["name"] = filename
if storage.configured():
try:
await storage.upload_file(file_path, media_key, mime)
info["storage"] = "minio"
try:
file_path.unlink()
except OSError:
pass
except Exception as exc:
log.warning("media_minio_upload_failed", msg_id=msg.id, key=media_key, error=str(exc))
info["storage"] = "local"
else:
info["storage"] = "local"
return [info] return [info]