feat: add file move and trash retention
This commit is contained in:
@@ -56,7 +56,9 @@ func main() {
|
||||
}
|
||||
|
||||
healthH := handler.NewHealthHandler(pool)
|
||||
nodeH := handler.NewNodeHandler(cfg, repository.NewNodeRepository(pool), store)
|
||||
nodeRepo := repository.NewNodeRepository(pool)
|
||||
nodeH := handler.NewNodeHandler(cfg, nodeRepo, store)
|
||||
go runTrashPurger(ctx, nodeRepo, store)
|
||||
|
||||
r := chi.NewRouter()
|
||||
r.Use(chimw.RequestID)
|
||||
@@ -73,6 +75,7 @@ func main() {
|
||||
r.Route("/nodes/{id}", func(r chi.Router) {
|
||||
r.Get("/", nodeH.Get)
|
||||
r.Patch("/", nodeH.Update)
|
||||
r.Post("/move", nodeH.Move)
|
||||
r.Delete("/", nodeH.Delete)
|
||||
r.Get("/download", nodeH.Download)
|
||||
r.Get("/access", nodeH.ListAccess)
|
||||
@@ -116,3 +119,37 @@ func main() {
|
||||
slog.Error("server shutdown error", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
func runTrashPurger(ctx context.Context, repo *repository.NodeRepository, store *storage.Storage) {
|
||||
ticker := time.NewTicker(time.Hour)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
purgeExpiredTrash(ctx, repo, store)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func purgeExpiredTrash(ctx context.Context, repo *repository.NodeRepository, store *storage.Storage) {
|
||||
keys, err := repo.ListPurgeableStorageKeys(ctx)
|
||||
if err != nil {
|
||||
slog.Warn("list purgeable files failed", "error", err)
|
||||
return
|
||||
}
|
||||
for _, key := range keys {
|
||||
if err := store.RemoveObject(ctx, key); err != nil {
|
||||
slog.Warn("remove purged file object failed", "key", key, "error", err)
|
||||
}
|
||||
}
|
||||
count, err := repo.PurgeExpired(ctx)
|
||||
if err != nil {
|
||||
slog.Warn("purge expired trash failed", "error", err)
|
||||
return
|
||||
}
|
||||
if count > 0 {
|
||||
slog.Info("purged expired trash", "nodes", count, "objects", len(keys))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -156,10 +156,38 @@ func (h *NodeHandler) Update(w http.ResponseWriter, r *http.Request) {
|
||||
writeJSON(w, http.StatusOK, node)
|
||||
}
|
||||
|
||||
func (h *NodeHandler) Move(w http.ResponseWriter, r *http.Request) {
|
||||
var req model.MoveNodeRequest
|
||||
if err := decodeJSON(r, &req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid json")
|
||||
return
|
||||
}
|
||||
userID := commonmw.GetUserID(r.Context())
|
||||
if !h.requireWritableParent(w, r, userID, req.ParentID) {
|
||||
return
|
||||
}
|
||||
node, err := h.repo.Move(r.Context(), chi.URLParam(r, "id"), userID, subordinates(r), req.ParentID)
|
||||
if errors.Is(err, repository.ErrNotFound) {
|
||||
writeError(w, http.StatusNotFound, "file not found")
|
||||
return
|
||||
}
|
||||
if errors.Is(err, model.ErrInvalidMove) {
|
||||
writeError(w, http.StatusBadRequest, "folder cannot be moved inside itself")
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
writeInternalError(w, r, err, "failed to move file")
|
||||
return
|
||||
}
|
||||
h.repo.Audit(r.Context(), userID, "files.node_move", "files_node", node.ID, "{}")
|
||||
writeJSON(w, http.StatusOK, node)
|
||||
}
|
||||
|
||||
func (h *NodeHandler) Delete(w http.ResponseWriter, r *http.Request) {
|
||||
userID := commonmw.GetUserID(r.Context())
|
||||
id := chi.URLParam(r, "id")
|
||||
if err := h.repo.SoftDelete(r.Context(), id, userID); errors.Is(err, repository.ErrNotFound) {
|
||||
purgeAfter := time.Now().Add(30 * 24 * time.Hour)
|
||||
if err := h.repo.SoftDelete(r.Context(), id, userID, purgeAfter); errors.Is(err, repository.ErrNotFound) {
|
||||
writeError(w, http.StatusNotFound, "file not found")
|
||||
return
|
||||
} else if err != nil {
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
package model
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
NodeTypeFolder = "folder"
|
||||
@@ -12,6 +15,8 @@ const (
|
||||
AccessEdit = "edit"
|
||||
)
|
||||
|
||||
var ErrInvalidMove = errors.New("invalid move")
|
||||
|
||||
type Node struct {
|
||||
ID string `json:"id"`
|
||||
ParentID *string `json:"parent_id,omitempty"`
|
||||
@@ -32,6 +37,8 @@ type Node struct {
|
||||
EffectiveAccess string `json:"effective_access"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
TrashedAt *time.Time `json:"trashed_at,omitempty"`
|
||||
PurgeAfter *time.Time `json:"purge_after,omitempty"`
|
||||
DeletedAt *time.Time `json:"deleted_at,omitempty"`
|
||||
}
|
||||
|
||||
@@ -52,6 +59,10 @@ type UpdateNodeRequest struct {
|
||||
Title *string `json:"title"`
|
||||
}
|
||||
|
||||
type MoveNodeRequest struct {
|
||||
ParentID *string `json:"parent_id"`
|
||||
}
|
||||
|
||||
type ReplaceAccessRequest struct {
|
||||
Access []Access `json:"access"`
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ func scanNode(scan func(dest ...any) error) (*model.Node, error) {
|
||||
&n.ID, &n.ParentID, &n.NodeType, &n.Title, &n.OwnerUserID, &n.OwnerDepartmentID,
|
||||
&n.CreatedBy, &n.UpdatedBy, &n.StorageKey, &n.OriginalFilename, &n.MimeType,
|
||||
&n.Extension, &n.SizeBytes, &n.OfficeFormat, &n.ExternalURL, &n.Version,
|
||||
&n.EffectiveAccess, &n.CreatedAt, &n.UpdatedAt, &n.DeletedAt,
|
||||
&n.EffectiveAccess, &n.CreatedAt, &n.UpdatedAt, &n.TrashedAt, &n.PurgeAfter, &n.DeletedAt,
|
||||
)
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
return nil, ErrNotFound
|
||||
@@ -67,7 +67,7 @@ func (r *NodeRepository) List(ctx context.Context, userID string, subordinateIDs
|
||||
n.created_by, n.updated_by, n.storage_key, n.original_filename, n.mime_type,
|
||||
n.extension, n.size_bytes, n.office_format, n.external_url, n.version,
|
||||
effective_node_access(n.id, $1, $2::text[]),
|
||||
n.created_at, n.updated_at, n.deleted_at
|
||||
n.created_at, n.updated_at, n.trashed_at, n.purge_after, n.deleted_at
|
||||
FROM files_nodes n
|
||||
WHERE ` + strings.Join(where, " AND ") + `
|
||||
ORDER BY CASE WHEN n.node_type = 'folder' THEN 0 ELSE 1 END, lower(n.title), n.created_at DESC`
|
||||
@@ -95,7 +95,7 @@ func (r *NodeRepository) GetForUser(ctx context.Context, id, userID string, subo
|
||||
n.created_by, n.updated_by, n.storage_key, n.original_filename, n.mime_type,
|
||||
n.extension, n.size_bytes, n.office_format, n.external_url, n.version,
|
||||
effective_node_access(n.id, $2, $3::text[]),
|
||||
n.created_at, n.updated_at, n.deleted_at
|
||||
n.created_at, n.updated_at, n.trashed_at, n.purge_after, n.deleted_at
|
||||
FROM files_nodes n
|
||||
WHERE n.id = $1
|
||||
AND n.deleted_at IS NULL
|
||||
@@ -110,7 +110,7 @@ func (r *NodeRepository) CreateFolder(ctx context.Context, title string, parentI
|
||||
RETURNING id, parent_id, node_type, title, owner_user_id, owner_department_id,
|
||||
created_by, updated_by, storage_key, original_filename, mime_type,
|
||||
extension, size_bytes, office_format, external_url, version,
|
||||
'edit' AS effective_access, created_at, updated_at, deleted_at
|
||||
'edit' AS effective_access, created_at, updated_at, trashed_at, purge_after, deleted_at
|
||||
`, parentID, title, ownerID).Scan)
|
||||
}
|
||||
|
||||
@@ -123,7 +123,7 @@ func (r *NodeRepository) CreateFile(ctx context.Context, n *model.Node) (*model.
|
||||
RETURNING id, parent_id, node_type, title, owner_user_id, owner_department_id,
|
||||
created_by, updated_by, storage_key, original_filename, mime_type,
|
||||
extension, size_bytes, office_format, external_url, version,
|
||||
'edit' AS effective_access, created_at, updated_at, deleted_at
|
||||
'edit' AS effective_access, created_at, updated_at, trashed_at, purge_after, deleted_at
|
||||
`, n.ParentID, n.Title, n.OwnerUserID, n.StorageKey, n.OriginalFilename, n.MimeType, n.Extension, n.SizeBytes).Scan)
|
||||
}
|
||||
|
||||
@@ -141,11 +141,44 @@ func (r *NodeRepository) Update(ctx context.Context, id, actorID string, req mod
|
||||
RETURNING id, parent_id, node_type, title, owner_user_id, owner_department_id,
|
||||
created_by, updated_by, storage_key, original_filename, mime_type,
|
||||
extension, size_bytes, office_format, external_url, version,
|
||||
'edit' AS effective_access, created_at, updated_at, deleted_at
|
||||
'edit' AS effective_access, created_at, updated_at, trashed_at, purge_after, deleted_at
|
||||
`, id, actorID, req.Title, req.ParentID).Scan)
|
||||
}
|
||||
|
||||
func (r *NodeRepository) SoftDelete(ctx context.Context, id, actorID string) error {
|
||||
func (r *NodeRepository) Move(ctx context.Context, id, actorID string, subordinateIDs []string, parentID *string) (*model.Node, error) {
|
||||
if parentID != nil && *parentID != "" {
|
||||
var wouldCycle bool
|
||||
if err := r.pool.QueryRow(ctx, `
|
||||
WITH RECURSIVE subtree AS (
|
||||
SELECT id FROM files_nodes WHERE id = $1 AND deleted_at IS NULL
|
||||
UNION ALL
|
||||
SELECT c.id FROM files_nodes c JOIN subtree s ON c.parent_id = s.id WHERE c.deleted_at IS NULL
|
||||
)
|
||||
SELECT EXISTS (SELECT 1 FROM subtree WHERE id = $2)
|
||||
`, id, *parentID).Scan(&wouldCycle); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if wouldCycle {
|
||||
return nil, model.ErrInvalidMove
|
||||
}
|
||||
}
|
||||
return scanNode(r.pool.QueryRow(ctx, `
|
||||
UPDATE files_nodes
|
||||
SET parent_id = $4,
|
||||
updated_by = $2,
|
||||
updated_at = now(),
|
||||
version = version + 1
|
||||
WHERE id = $1
|
||||
AND deleted_at IS NULL
|
||||
AND effective_node_access(id, $2, $3::text[]) = 'edit'
|
||||
RETURNING id, parent_id, node_type, title, owner_user_id, owner_department_id,
|
||||
created_by, updated_by, storage_key, original_filename, mime_type,
|
||||
extension, size_bytes, office_format, external_url, version,
|
||||
'edit' AS effective_access, created_at, updated_at, trashed_at, purge_after, deleted_at
|
||||
`, id, actorID, subordinateIDs, parentID).Scan)
|
||||
}
|
||||
|
||||
func (r *NodeRepository) SoftDelete(ctx context.Context, id, actorID string, purgeAfter time.Time) error {
|
||||
tag, err := r.pool.Exec(ctx, `
|
||||
WITH RECURSIVE subtree AS (
|
||||
SELECT id FROM files_nodes WHERE id = $1 AND deleted_at IS NULL
|
||||
@@ -153,10 +186,10 @@ func (r *NodeRepository) SoftDelete(ctx context.Context, id, actorID string) err
|
||||
SELECT c.id FROM files_nodes c JOIN subtree s ON c.parent_id = s.id WHERE c.deleted_at IS NULL
|
||||
)
|
||||
UPDATE files_nodes
|
||||
SET deleted_at = now(), updated_by = $2, updated_at = now()
|
||||
SET deleted_at = now(), trashed_at = now(), purge_after = $3, updated_by = $2, updated_at = now()
|
||||
WHERE id IN (SELECT id FROM subtree)
|
||||
AND effective_node_access($1, $2, '{}'::text[]) = 'edit'
|
||||
`, id, actorID)
|
||||
`, id, actorID, purgeAfter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -166,6 +199,41 @@ func (r *NodeRepository) SoftDelete(ctx context.Context, id, actorID string) err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *NodeRepository) ListPurgeableStorageKeys(ctx context.Context) ([]string, error) {
|
||||
rows, err := r.pool.Query(ctx, `
|
||||
SELECT storage_key
|
||||
FROM files_nodes
|
||||
WHERE purge_after IS NOT NULL
|
||||
AND purge_after <= now()
|
||||
AND storage_key IS NOT NULL
|
||||
`)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var out []string
|
||||
for rows.Next() {
|
||||
var key string
|
||||
if err := rows.Scan(&key); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, key)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
func (r *NodeRepository) PurgeExpired(ctx context.Context) (int64, error) {
|
||||
tag, err := r.pool.Exec(ctx, `
|
||||
DELETE FROM files_nodes
|
||||
WHERE purge_after IS NOT NULL
|
||||
AND purge_after <= now()
|
||||
`)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return tag.RowsAffected(), nil
|
||||
}
|
||||
|
||||
func (r *NodeRepository) ListAccess(ctx context.Context, nodeID string) ([]model.Access, error) {
|
||||
rows, err := r.pool.Query(ctx, `
|
||||
SELECT user_id, access_level, granted_by, created_at
|
||||
@@ -260,7 +328,7 @@ func (r *NodeRepository) GetByPublicToken(ctx context.Context, token string) (*m
|
||||
SELECT n.id, n.parent_id, n.node_type, n.title, n.owner_user_id, n.owner_department_id,
|
||||
n.created_by, n.updated_by, n.storage_key, n.original_filename, n.mime_type,
|
||||
n.extension, n.size_bytes, n.office_format, n.external_url, n.version,
|
||||
'view' AS effective_access, n.created_at, n.updated_at, n.deleted_at
|
||||
'view' AS effective_access, n.created_at, n.updated_at, n.trashed_at, n.purge_after, n.deleted_at
|
||||
FROM files_public_links l
|
||||
JOIN files_nodes n ON n.id = l.node_id
|
||||
WHERE l.token_hash = $1
|
||||
|
||||
@@ -137,6 +137,13 @@ func (s *Storage) GetObject(ctx context.Context, key string, rangeStart, rangeEn
|
||||
return obj, &ObjectInfo{Size: info.Size, ContentType: info.ContentType}, nil
|
||||
}
|
||||
|
||||
func (s *Storage) RemoveObject(ctx context.Context, key string) error {
|
||||
if !s.Configured() {
|
||||
return errors.New("storage not configured")
|
||||
}
|
||||
return s.client.RemoveObject(ctx, s.cfg.Bucket, key, minio.RemoveObjectOptions{})
|
||||
}
|
||||
|
||||
func ParseRange(header string, totalSize int64) (start, end int64, ok bool) {
|
||||
if !strings.HasPrefix(header, "bytes=") {
|
||||
return 0, 0, false
|
||||
|
||||
5
migrations/003_trash_retention.down.sql
Normal file
5
migrations/003_trash_retention.down.sql
Normal file
@@ -0,0 +1,5 @@
|
||||
DROP INDEX IF EXISTS files_nodes_purge_after_idx;
|
||||
|
||||
ALTER TABLE files_nodes
|
||||
DROP COLUMN IF EXISTS purge_after,
|
||||
DROP COLUMN IF EXISTS trashed_at;
|
||||
7
migrations/003_trash_retention.up.sql
Normal file
7
migrations/003_trash_retention.up.sql
Normal file
@@ -0,0 +1,7 @@
|
||||
ALTER TABLE files_nodes
|
||||
ADD COLUMN IF NOT EXISTS trashed_at TIMESTAMPTZ,
|
||||
ADD COLUMN IF NOT EXISTS purge_after TIMESTAMPTZ;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS files_nodes_purge_after_idx
|
||||
ON files_nodes(purge_after)
|
||||
WHERE purge_after IS NOT NULL;
|
||||
Reference in New Issue
Block a user