From a924cd832bccffb4a927ae4c2380a244937331b1 Mon Sep 17 00:00:00 2001 From: Grendgi Date: Fri, 5 Jun 2026 16:58:08 +0300 Subject: [PATCH] Store monitoring TG media in MinIO --- cmd/server/main.go | 73 +++++++++++++++++++++++++++++++ go.mod | 27 ++++++++++-- go.sum | 62 ++++++++++++++++++++++++-- k8s/configmap.yaml | 5 +++ k8s/secrets.yaml | 2 + k8s/server-deployment.yaml | 4 ++ pyproject.toml | 1 + src/parser_bot/config.py | 6 +++ src/parser_bot/storage.py | 55 +++++++++++++++++++++++ src/parser_bot/telegram/client.py | 22 +++++++++- 10 files changed, 248 insertions(+), 9 deletions(-) create mode 100644 src/parser_bot/storage.py diff --git a/cmd/server/main.go b/cmd/server/main.go index 1395381..d500f38 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -3,6 +3,7 @@ package main import ( "bytes" "context" + "crypto/tls" "database/sql" "encoding/json" "errors" @@ -21,6 +22,8 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" ) const ( @@ -44,6 +47,13 @@ type config struct { LLMAPIKey string LLMModel string LLMTimeout time.Duration + MinioEndpoint string + MinioAccessKey string + MinioSecretKey string + MinioBucket string + MinioUseSSL bool + MinioRegion string + MinioInsecureTLS bool } type app struct { @@ -51,6 +61,7 @@ type app struct { db *pgxpool.Pool http *http.Client python *http.Client + minio *minio.Client } type accessScope struct { @@ -126,11 +137,18 @@ func main() { } defer pool.Close() + minioClient, err := newMinioClient(cfg) + if err != nil { + slog.Error("minio_init_failed", "error", err) + os.Exit(1) + } + srvApp := &app{ cfg: cfg, db: pool, http: &http.Client{Timeout: cfg.LLMTimeout}, python: &http.Client{Timeout: 15 * time.Minute}, + minio: minioClient, } server := &http.Server{ @@ -957,6 +975,9 @@ func (a *app) handleMedia(w http.ResponseWriter, r *http.Request, path string) { writeError(w, http.StatusNotFound, "not found") return } + if a.serveMinioMedia(w, r, clean) { + return + } base, err := filepath.Abs(a.cfg.MediaDir) if err != nil { writeError(w, http.StatusInternalServerError, "media directory unavailable") @@ -970,6 +991,32 @@ func (a *app) handleMedia(w http.ResponseWriter, r *http.Request, path string) { http.ServeFile(w, r, full) } +func (a *app) serveMinioMedia(w http.ResponseWriter, r *http.Request, key string) bool { + if a.minio == nil || a.cfg.MinioBucket == "" { + return false + } + obj, err := a.minio.GetObject(r.Context(), a.cfg.MinioBucket, key, minio.GetObjectOptions{}) + if err != nil { + slog.Warn("minio_get_media_failed", "key", key, "error", err) + return false + } + defer obj.Close() + info, err := obj.Stat() + if err != nil { + return false + } + if info.ContentType != "" { + w.Header().Set("Content-Type", info.ContentType) + } + if info.Size > 0 { + w.Header().Set("Content-Length", strconv.FormatInt(info.Size, 10)) + } + if _, err := io.Copy(w, obj); err != nil { + slog.Warn("minio_media_stream_failed", "key", key, "error", err) + } + return true +} + func (a *app) canReadChannelMedia(ctx context.Context, scope accessScope, channelID int64) (bool, error) { var dept sql.NullString err := a.db.QueryRow(ctx, ` @@ -1669,6 +1716,25 @@ func valueOrEmpty(v *string) string { return *v } +func newMinioClient(cfg config) (*minio.Client, error) { + if cfg.MinioEndpoint == "" || cfg.MinioAccessKey == "" || cfg.MinioSecretKey == "" || cfg.MinioBucket == "" { + return nil, nil + } + endpoint := strings.TrimPrefix(strings.TrimPrefix(cfg.MinioEndpoint, "https://"), "http://") + opts := &minio.Options{ + Creds: credentials.NewStaticV4(cfg.MinioAccessKey, cfg.MinioSecretKey, ""), + Secure: cfg.MinioUseSSL, + Region: cfg.MinioRegion, + BucketLookup: minio.BucketLookupPath, + } + if cfg.MinioInsecureTLS { + opts.Transport = &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, //nolint:gosec // optional intra-cluster MinIO mode + } + } + return minio.New(endpoint, opts) +} + func queryInt(raw string, fallback int) int { if raw == "" { return fallback @@ -1715,6 +1781,13 @@ func loadConfig() config { LLMAPIKey: env("LLM_API_KEY", ""), LLMModel: env("LLM_MODEL", "qwen2.5-14b"), LLMTimeout: time.Duration(envInt("LLM_TIMEOUT_SECONDS", 120)) * time.Second, + MinioEndpoint: env("MINIO_ENDPOINT", ""), + MinioAccessKey: env("MINIO_ACCESS_KEY", ""), + MinioSecretKey: env("MINIO_SECRET_KEY", ""), + MinioBucket: env("MINIO_BUCKET", "monitoring-tg-media"), + MinioUseSSL: envBool("MINIO_USE_SSL", true), + MinioRegion: env("MINIO_REGION", "us-east-1"), + MinioInsecureTLS: envBool("MINIO_INSECURE_SKIP_VERIFY", false), } } diff --git a/go.mod b/go.mod index f88e5f3..dde352b 100644 --- a/go.mod +++ b/go.mod @@ -2,12 +2,33 @@ module monitoring-tg go 1.25.7 -require github.com/jackc/pgx/v5 v5.9.1 +require ( + github.com/jackc/pgx/v5 v5.9.1 + github.com/minio/minio-go/v7 v7.2.0 +) require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect - golang.org/x/sync v0.17.0 // indirect - golang.org/x/text v0.29.0 // indirect + github.com/klauspost/compress v1.18.6 // indirect + github.com/klauspost/cpuid/v2 v2.2.11 // indirect + github.com/klauspost/crc32 v1.3.0 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/minio/crc64nvme v1.1.1 // indirect + github.com/minio/md5-simd v1.1.2 // indirect + github.com/philhofer/fwd v1.2.0 // indirect + github.com/rs/xid v1.6.0 // indirect + github.com/tinylib/msgp v1.6.1 // indirect + github.com/zeebo/xxh3 v1.1.0 // indirect + go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/crypto v0.51.0 // indirect + golang.org/x/net v0.53.0 // indirect + golang.org/x/sync v0.20.0 // indirect + golang.org/x/sys v0.44.0 // indirect + golang.org/x/text v0.37.0 // indirect + gopkg.in/ini.v1 v1.67.2 // indirect ) diff --git a/go.sum b/go.sum index 8e29ab9..77992d7 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,13 @@ +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= @@ -9,18 +16,65 @@ github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc= github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/klauspost/compress v1.18.6 h1:2jupLlAwFm95+YDR+NwD2MEfFO9d4z4Prjl1XXDjuao= +github.com/klauspost/compress v1.18.6/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= +github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU= +github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/klauspost/crc32 v1.3.0 h1:sSmTt3gUt81RP655XGZPElI0PelVTZ6YwCRnPSupoFM= +github.com/klauspost/crc32 v1.3.0/go.mod h1:D7kQaZhnkX/Y0tstFGf8VUzv2UofNGqCjnC3zdHB0Hw= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/minio/crc64nvme v1.1.1 h1:8dwx/Pz49suywbO+auHCBpCtlW1OfpcLN7wYgVR6wAI= +github.com/minio/crc64nvme v1.1.1/go.mod h1:eVfm2fAzLlxMdUGc0EEBGSMmPwmXD5XiNRpnu9J3bvg= +github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= +github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= +github.com/minio/minio-go/v7 v7.2.0 h1:RCJM0R1XOsRs+A3x3UCaf3ZYbByDaLjFeAi+YCQEPhs= +github.com/minio/minio-go/v7 v7.2.0/go.mod h1:EU9hENAStx/xXduNdrGO5e4X5vk19NtgB+RIPjZO8o0= +github.com/philhofer/fwd v1.2.0 h1:e6DnBTl7vGY+Gz322/ASL4Gyp1FspeMvx1RNDoToZuM= +github.com/philhofer/fwd v1.2.0/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= +github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= -golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= -golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= -golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= -golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= +github.com/tinylib/msgp v1.6.1 h1:ESRv8eL3u+DNHUoSAAQRE50Hm162zqAnBoGv9PzScPY= +github.com/tinylib/msgp v1.6.1/go.mod h1:RSp0LW9oSxFut3KzESt5Voq4GVWyS+PSulT77roAqEA= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs= +github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s= +go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= +go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= +golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI= +golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8= +golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= +golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= +golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ= +golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc= +golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/ini.v1 v1.67.2 h1:JtOSMb9OuaCZKr7h5D/h6iii14sK0hLbplTc6frx4Ss= +gopkg.in/ini.v1 v1.67.2/go.mod h1:x/cyOwCgZqOkJoDIJ3c1KNHMo10+nLGAhh+kn3Zizss= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/k8s/configmap.yaml b/k8s/configmap.yaml index 2764db0..db0e551 100644 --- a/k8s/configmap.yaml +++ b/k8s/configmap.yaml @@ -14,6 +14,11 @@ data: POSTGRES_DB: "parser" TG_SESSION_PATH: "/data/session/parser.session" MEDIA_DIR: "/data/media" + MINIO_ENDPOINT: "s3-minio.estateliga.work" + MINIO_BUCKET: "monitoring-tg-media" + MINIO_USE_SSL: "1" + MINIO_INSECURE_SKIP_VERIFY: "0" + MINIO_REGION: "us-east-1" POLL_INTERVAL_SECONDS: "60" POLL_HISTORY_LIMIT: "50" LLM_ENABLED: "1" diff --git a/k8s/secrets.yaml b/k8s/secrets.yaml index 481408e..cafcfaf 100644 --- a/k8s/secrets.yaml +++ b/k8s/secrets.yaml @@ -11,6 +11,8 @@ stringData: TG_SESSION_STRING: "" POSTGRES_PASSWORD: "parser" LLM_API_KEY: "sk-111f838ccec43406e078cd9094b6797307cb895236179f32" + MINIO_ACCESS_KEY: "admjn" + MINIO_SECRET_KEY: "TropicalMacaw9Fantasize" --- apiVersion: v1 kind: Secret diff --git a/k8s/server-deployment.yaml b/k8s/server-deployment.yaml index c53bda4..96d8d11 100644 --- a/k8s/server-deployment.yaml +++ b/k8s/server-deployment.yaml @@ -27,6 +27,10 @@ spec: labels: app: monitoring-tg-server spec: + hostAliases: + - ip: "77.105.173.42" + hostnames: + - "s3-minio.estateliga.work" terminationGracePeriodSeconds: 20 securityContext: fsGroup: 1000 diff --git a/pyproject.toml b/pyproject.toml index 0c3a479..5b388a7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ dependencies = [ "pydantic>=2.9", "pydantic-settings>=2.6", "structlog>=24.4", + "minio>=7.2", ] [project.optional-dependencies] diff --git a/src/parser_bot/config.py b/src/parser_bot/config.py index 1322583..fe37834 100644 --- a/src/parser_bot/config.py +++ b/src/parser_bot/config.py @@ -28,6 +28,12 @@ class Settings(BaseSettings): media_dir: str = Field("/data/media", alias="MEDIA_DIR") media_max_bytes: int = Field(20 * 1024 * 1024, alias="MEDIA_MAX_BYTES") + minio_endpoint: str = Field("", alias="MINIO_ENDPOINT") + minio_access_key: str = Field("", alias="MINIO_ACCESS_KEY") + minio_secret_key: str = Field("", alias="MINIO_SECRET_KEY") + minio_bucket: str = Field("monitoring-tg-media", alias="MINIO_BUCKET") + minio_use_ssl: bool = Field(True, alias="MINIO_USE_SSL") + minio_region: str = Field("us-east-1", alias="MINIO_REGION") @property def database_url(self) -> str: diff --git a/src/parser_bot/storage.py b/src/parser_bot/storage.py new file mode 100644 index 0000000..c9015b1 --- /dev/null +++ b/src/parser_bot/storage.py @@ -0,0 +1,55 @@ +import asyncio +from pathlib import Path + +from minio import Minio + +from parser_bot.config import settings + +_client: Minio | None = None +_bucket_ready = False + + +def configured() -> bool: + return bool( + settings.minio_endpoint + and settings.minio_access_key + and settings.minio_secret_key + and settings.minio_bucket + ) + + +def client() -> Minio | None: + global _client + if not configured(): + return None + if _client is None: + endpoint = settings.minio_endpoint.removeprefix("https://").removeprefix("http://") + _client = Minio( + endpoint, + access_key=settings.minio_access_key, + secret_key=settings.minio_secret_key, + secure=settings.minio_use_ssl, + region=settings.minio_region, + ) + return _client + + +async def upload_file(path: Path, key: str, content_type: str | None) -> None: + cli = client() + if cli is None: + raise RuntimeError("minio is not configured") + + def _upload() -> None: + global _bucket_ready + if not _bucket_ready: + if not cli.bucket_exists(settings.minio_bucket): + cli.make_bucket(settings.minio_bucket, location=settings.minio_region) + _bucket_ready = True + cli.fput_object( + settings.minio_bucket, + key, + str(path), + content_type=content_type or "application/octet-stream", + ) + + await asyncio.to_thread(_upload) diff --git a/src/parser_bot/telegram/client.py b/src/parser_bot/telegram/client.py index 31b6332..d8e390b 100644 --- a/src/parser_bot/telegram/client.py +++ b/src/parser_bot/telegram/client.py @@ -15,6 +15,7 @@ from telethon.tl.types import ( ) from parser_bot.config import settings +from parser_bot import storage log = structlog.get_logger() @@ -136,9 +137,26 @@ async def _download_message_media( if path is None: info["skipped"] = "no_file" return [info] - filename = Path(path).name + file_path = Path(path) + filename = file_path.name + media_key = f"{channel_id}/{filename}" public_base = settings.public_base_path.rstrip("/") - info["url"] = f"{public_base}/media/{channel_id}/{filename}" + info["url"] = f"{public_base}/media/{media_key}" + info["key"] = media_key + info["name"] = filename + if storage.configured(): + try: + await storage.upload_file(file_path, media_key, mime) + info["storage"] = "minio" + try: + file_path.unlink() + except OSError: + pass + except Exception as exc: + log.warning("media_minio_upload_failed", msg_id=msg.id, key=media_key, error=str(exc)) + info["storage"] = "local" + else: + info["storage"] = "local" return [info]