Files
files/internal/repository/node.go
Grendgi c831d2c7c6
All checks were successful
CI / hygiene (push) Successful in 2s
Build and Deploy / build-and-deploy (push) Successful in 34s
CI / test (push) Successful in 19s
feat: add visible trash restore for files
2026-06-16 15:48:09 +03:00

512 lines
17 KiB
Go

package repository
import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"strings"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"files-service/internal/model"
)
// ErrNotFound is the sentinel error this package returns when a query matches
// no row: scanNode maps pgx.ErrNoRows to it, and several write methods return
// it when their access-guarded statement affects or returns nothing.
var ErrNotFound = errors.New("not found")
// NodeRepository groups the database operations on files_nodes and its
// companion tables (files_access, files_public_links, files_audit_events).
type NodeRepository struct {
// pool is the pgx connection pool every query in this type runs on.
pool *pgxpool.Pool
}
// NewNodeRepository builds a NodeRepository that runs its queries on pool.
func NewNodeRepository(pool *pgxpool.Pool) *NodeRepository {
	repo := new(NodeRepository)
	repo.pool = pool
	return repo
}
// scanNode reads a single node row through scan, which is expected to be the
// Scan method of a pgx row or rows cursor. The destination order matches the
// column list used by every node query in this file (21 table columns plus the
// computed effective_access in position 17).
//
// pgx.ErrNoRows is translated to ErrNotFound. On any error the returned node is
// nil, so callers never observe a partially populated value.
func scanNode(scan func(dest ...any) error) (*model.Node, error) {
	var n model.Node
	err := scan(
		&n.ID, &n.ParentID, &n.NodeType, &n.Title, &n.OwnerUserID, &n.OwnerDepartmentID,
		&n.CreatedBy, &n.UpdatedBy, &n.StorageKey, &n.OriginalFilename, &n.MimeType,
		&n.Extension, &n.SizeBytes, &n.OfficeFormat, &n.ExternalURL, &n.Version,
		&n.EffectiveAccess, &n.CreatedAt, &n.UpdatedAt, &n.TrashedAt, &n.PurgeAfter, &n.DeletedAt,
	)
	switch {
	case err == nil:
		return &n, nil
	case errors.Is(err, pgx.ErrNoRows):
		return nil, ErrNotFound
	default:
		return nil, err
	}
}
// List returns the nodes visible to userID for the requested scope.
//
//   - scope "trash": deleted nodes whose parent is absent or not deleted, and
//     which the user owns, which are owned by one of subordinateIDs, or on which
//     the user holds an 'edit' grant in files_access. parentID is ignored.
//     Results are ordered by trashed_at (newest first), then title.
//   - scope "shared": non-deleted children of parentID that pass
//     has_node_access for the user or are owned by one of subordinateIDs; at the
//     root level nodes owned by the user are excluded.
//   - any other scope: non-deleted children of parentID owned by the user.
//
// A nil or empty parentID selects root-level nodes. Non-trash results list
// folders first, then sort by title and creation time. The returned slice is
// never nil on success.
func (r *NodeRepository) List(ctx context.Context, userID string, subordinateIDs []string, scope string, parentID *string) ([]model.Node, error) {
	params := []any{userID, subordinateIDs}

	var stmt string
	if scope == "trash" {
		stmt = `
SELECT n.id, n.parent_id, n.node_type, n.title, n.owner_user_id, n.owner_department_id,
n.created_by, n.updated_by, n.storage_key, n.original_filename, n.mime_type,
n.extension, n.size_bytes, n.office_format, n.external_url, n.version,
'edit' AS effective_access,
n.created_at, n.updated_at, n.trashed_at, n.purge_after, n.deleted_at
FROM files_nodes n
LEFT JOIN files_nodes p ON p.id = n.parent_id
WHERE n.deleted_at IS NOT NULL
AND (n.parent_id IS NULL OR p.deleted_at IS NULL)
AND (
n.owner_user_id = $1
OR n.owner_user_id::text = ANY($2::text[])
OR EXISTS (
SELECT 1 FROM files_access a
WHERE a.node_id = n.id
AND a.user_id = $1
AND a.access_level = 'edit'
)
)
ORDER BY n.trashed_at DESC NULLS LAST, lower(n.title)`
	} else {
		conds := []string{"n.deleted_at IS NULL"}

		atRoot := parentID == nil || *parentID == ""
		if atRoot {
			conds = append(conds, "n.parent_id IS NULL")
		} else {
			// The parent ID becomes the next positional parameter ($3).
			params = append(params, *parentID)
			conds = append(conds, fmt.Sprintf("n.parent_id = $%d", len(params)))
		}

		if scope == "shared" {
			if atRoot {
				conds = append(conds, "n.owner_user_id <> $1")
			}
			conds = append(conds, `(
has_node_access(n.id, $1)
OR n.owner_user_id::text = ANY($2::text[])
)`)
		} else {
			conds = append(conds, "n.owner_user_id = $1")
		}

		stmt = `
SELECT n.id, n.parent_id, n.node_type, n.title, n.owner_user_id, n.owner_department_id,
n.created_by, n.updated_by, n.storage_key, n.original_filename, n.mime_type,
n.extension, n.size_bytes, n.office_format, n.external_url, n.version,
effective_node_access(n.id, $1, $2::text[]),
n.created_at, n.updated_at, n.trashed_at, n.purge_after, n.deleted_at
FROM files_nodes n
WHERE ` + strings.Join(conds, " AND ") + `
ORDER BY CASE WHEN n.node_type = 'folder' THEN 0 ELSE 1 END, lower(n.title), n.created_at DESC`
	}

	rows, err := r.pool.Query(ctx, stmt, params...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	nodes := []model.Node{}
	for rows.Next() {
		node, scanErr := scanNode(rows.Scan)
		if scanErr != nil {
			return nil, scanErr
		}
		nodes = append(nodes, *node)
	}
	return nodes, rows.Err()
}
// ListChildrenForPublic returns the non-deleted direct children of parentID,
// each reported with a fixed effective access of 'view'. The query itself
// applies no per-user access filter. Folders come first, then nodes sort by
// title and creation time. The returned slice is never nil on success.
func (r *NodeRepository) ListChildrenForPublic(ctx context.Context, parentID string) ([]model.Node, error) {
	const stmt = `
SELECT n.id, n.parent_id, n.node_type, n.title, n.owner_user_id, n.owner_department_id,
n.created_by, n.updated_by, n.storage_key, n.original_filename, n.mime_type,
n.extension, n.size_bytes, n.office_format, n.external_url, n.version,
'view' AS effective_access,
n.created_at, n.updated_at, n.trashed_at, n.purge_after, n.deleted_at
FROM files_nodes n
WHERE n.deleted_at IS NULL
AND n.parent_id = $1
ORDER BY CASE WHEN n.node_type = 'folder' THEN 0 ELSE 1 END, lower(n.title), n.created_at DESC
`
	rows, err := r.pool.Query(ctx, stmt, parentID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	children := []model.Node{}
	for rows.Next() {
		child, scanErr := scanNode(rows.Scan)
		if scanErr != nil {
			return nil, scanErr
		}
		children = append(children, *child)
	}
	return children, rows.Err()
}
// GetForUser loads the non-deleted node id together with the access level that
// effective_node_access computes for userID and subordinateIDs. A node for
// which that function yields an empty string is treated as missing, so the
// result is ErrNotFound.
func (r *NodeRepository) GetForUser(ctx context.Context, id, userID string, subordinateIDs []string) (*model.Node, error) {
	const stmt = `
SELECT n.id, n.parent_id, n.node_type, n.title, n.owner_user_id, n.owner_department_id,
n.created_by, n.updated_by, n.storage_key, n.original_filename, n.mime_type,
n.extension, n.size_bytes, n.office_format, n.external_url, n.version,
effective_node_access(n.id, $2, $3::text[]),
n.created_at, n.updated_at, n.trashed_at, n.purge_after, n.deleted_at
FROM files_nodes n
WHERE n.id = $1
AND n.deleted_at IS NULL
AND effective_node_access(n.id, $2, $3::text[]) <> ''
`
	row := r.pool.QueryRow(ctx, stmt, id, userID, subordinateIDs)
	return scanNode(row.Scan)
}
// CreateFolder inserts a 'folder' node titled title under parentID (nil for the
// root level), with ownerID recorded as both owner and creator. The new row is
// returned with effective access 'edit'.
func (r *NodeRepository) CreateFolder(ctx context.Context, title string, parentID *string, ownerID string) (*model.Node, error) {
	const stmt = `
INSERT INTO files_nodes (parent_id, node_type, title, owner_user_id, created_by)
VALUES ($1, 'folder', $2, $3, $3)
RETURNING id, parent_id, node_type, title, owner_user_id, owner_department_id,
created_by, updated_by, storage_key, original_filename, mime_type,
extension, size_bytes, office_format, external_url, version,
'edit' AS effective_access, created_at, updated_at, trashed_at, purge_after, deleted_at
`
	row := r.pool.QueryRow(ctx, stmt, parentID, title, ownerID)
	return scanNode(row.Scan)
}
// CreateFile inserts a 'file' node from the parent, title, owner, storage key,
// original filename, MIME type, extension and size carried by n. The owner is
// also recorded as creator. The stored row is returned with effective access
// 'edit'.
func (r *NodeRepository) CreateFile(ctx context.Context, n *model.Node) (*model.Node, error) {
	const stmt = `
INSERT INTO files_nodes
(parent_id, node_type, title, owner_user_id, created_by, storage_key,
original_filename, mime_type, extension, size_bytes)
VALUES ($1, 'file', $2, $3, $3, $4, $5, $6, $7, $8)
RETURNING id, parent_id, node_type, title, owner_user_id, owner_department_id,
created_by, updated_by, storage_key, original_filename, mime_type,
extension, size_bytes, office_format, external_url, version,
'edit' AS effective_access, created_at, updated_at, trashed_at, purge_after, deleted_at
`
	row := r.pool.QueryRow(ctx, stmt,
		n.ParentID, n.Title, n.OwnerUserID, n.StorageKey,
		n.OriginalFilename, n.MimeType, n.Extension, n.SizeBytes,
	)
	return scanNode(row.Scan)
}
// CreateOfficeDocument inserts an 'office_document' node from the parent,
// title, owner, original filename, MIME type, extension, size, office format
// and external URL carried by n. No storage key is written. The owner is also
// recorded as creator, and the stored row is returned with effective access
// 'edit'.
func (r *NodeRepository) CreateOfficeDocument(ctx context.Context, n *model.Node) (*model.Node, error) {
	const stmt = `
INSERT INTO files_nodes
(parent_id, node_type, title, owner_user_id, created_by, original_filename,
mime_type, extension, size_bytes, office_format, external_url)
VALUES ($1, 'office_document', $2, $3, $3, $4, $5, $6, $7, $8, $9)
RETURNING id, parent_id, node_type, title, owner_user_id, owner_department_id,
created_by, updated_by, storage_key, original_filename, mime_type,
extension, size_bytes, office_format, external_url, version,
'edit' AS effective_access, created_at, updated_at, trashed_at, purge_after, deleted_at
`
	row := r.pool.QueryRow(ctx, stmt,
		n.ParentID, n.Title, n.OwnerUserID, n.OriginalFilename, n.MimeType,
		n.Extension, n.SizeBytes, n.OfficeFormat, n.ExternalURL,
	)
	return scanNode(row.Scan)
}
// ListOfficeExternalURLs returns the distinct, sorted external URLs of
// non-deleted office documents that userID owns, passes has_node_access for, or
// that are owned by one of subordinateIDs. The returned slice is never nil on
// success.
func (r *NodeRepository) ListOfficeExternalURLs(ctx context.Context, userID string, subordinateIDs []string) ([]string, error) {
	const stmt = `
SELECT DISTINCT n.external_url
FROM files_nodes n
WHERE n.deleted_at IS NULL
AND n.node_type = 'office_document'
AND n.external_url IS NOT NULL
AND (
n.owner_user_id = $1
OR has_node_access(n.id, $1)
OR n.owner_user_id::text = ANY($2::text[])
)
ORDER BY n.external_url
`
	rows, err := r.pool.Query(ctx, stmt, userID, subordinateIDs)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	links := []string{}
	for rows.Next() {
		var link string
		if scanErr := rows.Scan(&link); scanErr != nil {
			return nil, scanErr
		}
		links = append(links, link)
	}
	return links, rows.Err()
}
// Update applies req to the non-deleted node id on behalf of actorID. Title and
// parent are each wrapped in COALESCE, so a NULL parameter keeps the current
// column value. updated_by, updated_at and version are always bumped.
//
// The statement only matches when effective_node_access reports 'edit' for the
// actor, evaluated with an empty subordinate list. Otherwise no row is returned
// and the result is ErrNotFound.
func (r *NodeRepository) Update(ctx context.Context, id, actorID string, req model.UpdateNodeRequest) (*model.Node, error) {
	const stmt = `
UPDATE files_nodes
SET title = COALESCE($3, title),
parent_id = COALESCE($4, parent_id),
updated_by = $2,
updated_at = now(),
version = version + 1
WHERE id = $1
AND deleted_at IS NULL
AND effective_node_access(id, $2, '{}'::text[]) = 'edit'
RETURNING id, parent_id, node_type, title, owner_user_id, owner_department_id,
created_by, updated_by, storage_key, original_filename, mime_type,
extension, size_bytes, office_format, external_url, version,
'edit' AS effective_access, created_at, updated_at, trashed_at, purge_after, deleted_at
`
	row := r.pool.QueryRow(ctx, stmt, id, actorID, req.Title, req.ParentID)
	return scanNode(row.Scan)
}
// Move re-parents the non-deleted node id under parentID on behalf of actorID.
//
// A nil or empty parentID moves the node to the root level; the empty string is
// normalised to nil so it is stored as NULL rather than sent to the database as
// an invalid identifier (List treats "" as root in the same way).
//
// For a non-root destination the method first walks the non-deleted subtree of
// id and returns model.ErrInvalidMove if the destination lies inside it (which
// includes moving a node under itself). The update only matches when
// effective_node_access reports 'edit' for the actor and subordinateIDs;
// otherwise the result is ErrNotFound.
//
// NOTE(review): the cycle check and the update are separate statements, so a
// concurrent move could still slip between them — confirm whether callers
// serialise moves.
func (r *NodeRepository) Move(ctx context.Context, id, actorID string, subordinateIDs []string, parentID *string) (*model.Node, error) {
	if parentID != nil && *parentID == "" {
		parentID = nil
	}

	if parentID != nil {
		var wouldCycle bool
		if err := r.pool.QueryRow(ctx, `
WITH RECURSIVE subtree AS (
SELECT id FROM files_nodes WHERE id = $1 AND deleted_at IS NULL
UNION ALL
SELECT c.id FROM files_nodes c JOIN subtree s ON c.parent_id = s.id WHERE c.deleted_at IS NULL
)
SELECT EXISTS (SELECT 1 FROM subtree WHERE id = $2)
`, id, *parentID).Scan(&wouldCycle); err != nil {
			return nil, err
		}
		if wouldCycle {
			return nil, model.ErrInvalidMove
		}
	}

	return scanNode(r.pool.QueryRow(ctx, `
UPDATE files_nodes
SET parent_id = $4,
updated_by = $2,
updated_at = now(),
version = version + 1
WHERE id = $1
AND deleted_at IS NULL
AND effective_node_access(id, $2, $3::text[]) = 'edit'
RETURNING id, parent_id, node_type, title, owner_user_id, owner_department_id,
created_by, updated_by, storage_key, original_filename, mime_type,
extension, size_bytes, office_format, external_url, version,
'edit' AS effective_access, created_at, updated_at, trashed_at, purge_after, deleted_at
`, id, actorID, subordinateIDs, parentID).Scan)
}
// SoftDelete marks node id and all of its non-deleted descendants as deleted,
// stamping deleted_at and trashed_at with now(), purge_after with purgeAfter,
// and updated_by with actorID.
//
// The update only applies when effective_node_access on the root node reports
// 'edit' for the actor (evaluated with an empty subordinate list). If no row is
// affected the method returns ErrNotFound.
func (r *NodeRepository) SoftDelete(ctx context.Context, id, actorID string, purgeAfter time.Time) error {
	const stmt = `
WITH RECURSIVE subtree AS (
SELECT id FROM files_nodes WHERE id = $1 AND deleted_at IS NULL
UNION ALL
SELECT c.id FROM files_nodes c JOIN subtree s ON c.parent_id = s.id WHERE c.deleted_at IS NULL
)
UPDATE files_nodes
SET deleted_at = now(), trashed_at = now(), purge_after = $3, updated_by = $2, updated_at = now()
WHERE id IN (SELECT id FROM subtree)
AND effective_node_access($1, $2, '{}'::text[]) = 'edit'
`
	result, err := r.pool.Exec(ctx, stmt, id, actorID, purgeAfter)
	switch {
	case err != nil:
		return err
	case result.RowsAffected() == 0:
		return ErrNotFound
	}
	return nil
}
// Restore brings the deleted node id and all of its deleted descendants back
// out of the trash and returns the restored root with effective access 'edit'.
//
// For every node in the subtree deleted_at, trashed_at and purge_after are
// cleared and updated_by, updated_at and version are bumped. If the root's
// parent is itself still deleted, the root is detached to the top level
// (parent_id = NULL) so it does not reappear under a trashed folder.
//
// The update only applies when the root is deleted and actorID owns it, it is
// owned by one of subordinateIDs, or the actor holds an 'edit' grant on it in
// files_access. When nothing is restored the result is ErrNotFound.
//
// The restored row is read from the UPDATE's RETURNING clause rather than from
// files_nodes: PostgreSQL runs a data-modifying CTE and the main query against
// the same snapshot, so a SELECT on files_nodes in this statement would still
// see the row as deleted and return nothing.
func (r *NodeRepository) Restore(ctx context.Context, id, actorID string, subordinateIDs []string) (*model.Node, error) {
	return scanNode(r.pool.QueryRow(ctx, `
WITH RECURSIVE subtree AS (
SELECT id FROM files_nodes WHERE id = $1 AND deleted_at IS NOT NULL
UNION ALL
SELECT c.id FROM files_nodes c JOIN subtree s ON c.parent_id = s.id WHERE c.deleted_at IS NOT NULL
),
restored AS (
UPDATE files_nodes n
SET deleted_at = NULL,
trashed_at = NULL,
purge_after = NULL,
parent_id = CASE
WHEN n.id = $1 AND EXISTS (
SELECT 1 FROM files_nodes p WHERE p.id = n.parent_id AND p.deleted_at IS NOT NULL
) THEN NULL
ELSE n.parent_id
END,
updated_by = $2,
updated_at = now(),
version = version + 1
WHERE n.id IN (SELECT id FROM subtree)
AND EXISTS (
SELECT 1 FROM files_nodes root
WHERE root.id = $1
AND root.deleted_at IS NOT NULL
AND (
root.owner_user_id = $2
OR root.owner_user_id::text = ANY($3::text[])
OR EXISTS (
SELECT 1 FROM files_access a
WHERE a.node_id = root.id
AND a.user_id = $2
AND a.access_level = 'edit'
)
)
)
RETURNING n.id, n.parent_id, n.node_type, n.title, n.owner_user_id, n.owner_department_id,
n.created_by, n.updated_by, n.storage_key, n.original_filename, n.mime_type,
n.extension, n.size_bytes, n.office_format, n.external_url, n.version,
n.created_at, n.updated_at, n.trashed_at, n.purge_after, n.deleted_at
)
SELECT n.id, n.parent_id, n.node_type, n.title, n.owner_user_id, n.owner_department_id,
n.created_by, n.updated_by, n.storage_key, n.original_filename, n.mime_type,
n.extension, n.size_bytes, n.office_format, n.external_url, n.version,
'edit' AS effective_access,
n.created_at, n.updated_at, n.trashed_at, n.purge_after, n.deleted_at
FROM restored n
WHERE n.id = $1
`, id, actorID, subordinateIDs).Scan)
}
// ListPurgeableStorageKeys returns the storage keys of all nodes whose
// purge_after has passed and that have a storage key. The result is nil when no
// row matches.
func (r *NodeRepository) ListPurgeableStorageKeys(ctx context.Context) ([]string, error) {
	const stmt = `
SELECT storage_key
FROM files_nodes
WHERE purge_after IS NOT NULL
AND purge_after <= now()
AND storage_key IS NOT NULL
`
	rows, err := r.pool.Query(ctx, stmt)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var keys []string
	for rows.Next() {
		var storageKey string
		if scanErr := rows.Scan(&storageKey); scanErr != nil {
			return nil, scanErr
		}
		keys = append(keys, storageKey)
	}
	return keys, rows.Err()
}
// PurgeExpired permanently deletes every node whose purge_after has passed and
// reports how many rows were removed.
func (r *NodeRepository) PurgeExpired(ctx context.Context) (int64, error) {
	const stmt = `
DELETE FROM files_nodes
WHERE purge_after IS NOT NULL
AND purge_after <= now()
`
	result, err := r.pool.Exec(ctx, stmt)
	if err == nil {
		return result.RowsAffected(), nil
	}
	return 0, err
}
// ListAccess returns the files_access grants recorded for nodeID, newest first.
// The query applies no access check of its own. The returned slice is never nil
// on success.
func (r *NodeRepository) ListAccess(ctx context.Context, nodeID string) ([]model.Access, error) {
	const stmt = `
SELECT user_id, access_level, granted_by, created_at
FROM files_access
WHERE node_id = $1
ORDER BY created_at DESC
`
	rows, err := r.pool.Query(ctx, stmt, nodeID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	grants := []model.Access{}
	for rows.Next() {
		var grant model.Access
		scanErr := rows.Scan(&grant.UserID, &grant.AccessLevel, &grant.GrantedBy, &grant.CreatedAt)
		if scanErr != nil {
			return nil, scanErr
		}
		grants = append(grants, grant)
	}
	return grants, rows.Err()
}
// ReplaceAccess swaps the full set of grants on nodeID for the normalised
// contents of access, inside a single transaction.
//
// The actor must have 'edit' according to effective_node_access (evaluated with
// an empty subordinate list); otherwise ErrNotFound is returned and nothing
// changes. Every new grant records actorID as granted_by.
func (r *NodeRepository) ReplaceAccess(ctx context.Context, nodeID, actorID string, access []model.Access) error {
	const (
		checkStmt  = `SELECT effective_node_access($1, $2, '{}'::text[]) = 'edit'`
		deleteStmt = `DELETE FROM files_access WHERE node_id = $1`
		insertStmt = `
INSERT INTO files_access (node_id, user_id, access_level, granted_by)
VALUES ($1, $2, $3, $4)
ON CONFLICT (node_id, user_id)
DO UPDATE SET access_level = EXCLUDED.access_level, granted_by = EXCLUDED.granted_by
`
	)

	tx, err := r.pool.Begin(ctx)
	if err != nil {
		return err
	}
	// Undoes the work on every early return; its result is not inspected.
	defer tx.Rollback(ctx)

	var allowed bool
	if err := tx.QueryRow(ctx, checkStmt, nodeID, actorID).Scan(&allowed); err != nil {
		return err
	}
	if !allowed {
		return ErrNotFound
	}

	if _, err := tx.Exec(ctx, deleteStmt, nodeID); err != nil {
		return err
	}
	for _, grant := range normalizeAccess(access) {
		_, err := tx.Exec(ctx, insertStmt, nodeID, grant.UserID, grant.AccessLevel, actorID)
		if err != nil {
			return err
		}
	}
	return tx.Commit(ctx)
}
// normalizeAccess cleans a list of grants so it holds at most one entry per
// user:
//
//   - entries with an empty UserID are dropped;
//   - any level other than model.AccessEdit is coerced to model.AccessView;
//   - for a repeated user, an existing edit grant is kept; otherwise the later
//     entry replaces the earlier one.
//
// The result keeps users in order of first appearance, so the output is
// deterministic for a given input. It is never nil.
func normalizeAccess(access []model.Access) []model.Access {
	out := make([]model.Access, 0, len(access))
	position := make(map[string]int, len(access)) // UserID -> index in out
	for _, a := range access {
		if a.UserID == "" {
			continue
		}
		if a.AccessLevel != model.AccessEdit {
			a.AccessLevel = model.AccessView
		}
		if i, seen := position[a.UserID]; seen {
			// An edit grant already recorded for this user wins over anything later.
			if out[i].AccessLevel != model.AccessEdit {
				out[i] = a
			}
			continue
		}
		position[a.UserID] = len(out)
		out = append(out, a)
	}
	return out
}
// CreatePublicLink stores a public link for nodeID that expires at expiresAt
// and returns the new link's ID. Only TokenHash(token) is written, never the
// token itself.
//
// The insert only happens when effective_node_access reports 'edit' for actorID
// (evaluated with an empty subordinate list); otherwise no row is produced and
// the result is ErrNotFound.
func (r *NodeRepository) CreatePublicLink(ctx context.Context, nodeID, actorID, token string, expiresAt time.Time) (string, error) {
	const stmt = `
INSERT INTO files_public_links (node_id, token_hash, expires_at, created_by)
SELECT $1, $2, $3, $4
WHERE effective_node_access($1, $4, '{}'::text[]) = 'edit'
RETURNING id
`
	var linkID string
	row := r.pool.QueryRow(ctx, stmt, nodeID, TokenHash(token), expiresAt, actorID)
	if err := row.Scan(&linkID); err != nil {
		if errors.Is(err, pgx.ErrNoRows) {
			return "", ErrNotFound
		}
		return linkID, err
	}
	return linkID, nil
}
// GetByPublicToken resolves a public link token to its node. The link is looked
// up by TokenHash(token) and must be neither revoked nor expired, and the node
// must not be deleted; otherwise the result is ErrNotFound. The node is
// returned with effective access 'view'.
func (r *NodeRepository) GetByPublicToken(ctx context.Context, token string) (*model.Node, error) {
	const stmt = `
SELECT n.id, n.parent_id, n.node_type, n.title, n.owner_user_id, n.owner_department_id,
n.created_by, n.updated_by, n.storage_key, n.original_filename, n.mime_type,
n.extension, n.size_bytes, n.office_format, n.external_url, n.version,
'view' AS effective_access, n.created_at, n.updated_at, n.trashed_at, n.purge_after, n.deleted_at
FROM files_public_links l
JOIN files_nodes n ON n.id = l.node_id
WHERE l.token_hash = $1
AND l.revoked_at IS NULL
AND l.expires_at > now()
AND n.deleted_at IS NULL
`
	row := r.pool.QueryRow(ctx, stmt, TokenHash(token))
	return scanNode(row.Scan)
}
// Audit writes one row to files_audit_events. Empty actorID or entityID values
// are stored as NULL, and an empty meta is replaced by the empty JSON object
// "{}" before being cast to jsonb.
//
// The method has no return value: a failed insert is not reported to the caller.
func (r *NodeRepository) Audit(ctx context.Context, actorID, action, entityType, entityID string, meta string) {
	const stmt = `
INSERT INTO files_audit_events (actor_user_id, action, entity_type, entity_id, meta)
VALUES (NULLIF($1, '')::uuid, $2, $3, NULLIF($4, '')::uuid, $5::jsonb)
`
	payload := meta
	if payload == "" {
		payload = "{}"
	}
	// Result and error are deliberately discarded (see the doc comment).
	_, _ = r.pool.Exec(ctx, stmt, actorID, action, entityType, entityID, payload)
}
// TokenHash returns the lowercase hexadecimal SHA-256 digest of token. It is
// the form in which public-link tokens are stored and looked up.
func TokenHash(token string) string {
	hasher := sha256.New()
	hasher.Write([]byte(token)) // hash.Hash.Write never returns an error
	return hex.EncodeToString(hasher.Sum(nil))
}