diff --git a/test/s3/spark/issue_8285_repro_test.go b/test/s3/spark/issue_8285_repro_test.go index 66b46439d..89cd531bd 100644 --- a/test/s3/spark/issue_8285_repro_test.go +++ b/test/s3/spark/issue_8285_repro_test.go @@ -70,7 +70,8 @@ print("WRITE_COUNT=" + str(count)) "issue-8285/output/_temporary/0/", "issue-8285/output/_temporary/0/_temporary/", } - lingering := waitForObjectsToDisappear(t, env, "test", temporaryCandidates, 35*time.Second) + // Empty folder cleanup has a 2m default delay + 30s processor interval + lingering := waitForObjectsToDisappear(t, env, "test", temporaryCandidates, 3*time.Minute) if len(lingering) > 0 { t.Fatalf("issue #8285 regression detected: lingering temporary directories: %v", lingering) } diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml index a0e04e32c..d65e97207 100644 --- a/weed/command/scaffold/filer.toml +++ b/weed/command/scaffold/filer.toml @@ -13,6 +13,9 @@ # recursive_delete will delete all sub folders and files, similar to "rm -Rf" recursive_delete = false #max_file_name_length = 255 +# for S3: how long to wait before deleting an empty folder. +# increase this if using tools like Spark that create temporary directories. +#s3.empty_folder_cleanup_delay = "2m" #################################################### # The following are filer store options diff --git a/weed/filer/empty_folder_cleanup/cleanup_queue.go b/weed/filer/empty_folder_cleanup/cleanup_queue.go index 5e2acd85c..44fa0b409 100644 --- a/weed/filer/empty_folder_cleanup/cleanup_queue.go +++ b/weed/filer/empty_folder_cleanup/cleanup_queue.go @@ -153,6 +153,27 @@ func (q *CleanupQueue) Pop() (string, string, bool) { return item.folder, item.triggeredBy, true } +// PopOlderThan removes and returns the oldest folder only if it has been in the queue +// for longer than the specified duration. Returns empty string and false if no item qualifies. +func (q *CleanupQueue) PopOlderThan(minAge time.Duration) (string, string, bool) { + q.mu.Lock() + defer q.mu.Unlock() + + front := q.items.Front() + if front == nil { + return "", "", false + } + + item := front.Value.(*queueItem) + if time.Since(item.queueTime) <= minAge { + return "", "", false + } + + q.items.Remove(front) + delete(q.itemsMap, item.folder) + return item.folder, item.triggeredBy, true +} + // Peek returns the oldest folder without removing it. // Returns the folder and queue time if available, or empty values if queue is empty. func (q *CleanupQueue) Peek() (folder string, triggeredBy string, queueTime time.Time, ok bool) { diff --git a/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go b/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go index 4feb16184..db41f49a6 100644 --- a/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go +++ b/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go @@ -18,8 +18,8 @@ const ( DefaultMaxCountCheck = 1000 DefaultCacheExpiry = 5 * time.Minute DefaultQueueMaxSize = 1000 - DefaultQueueMaxAge = 5 * time.Second - DefaultProcessorSleep = 10 * time.Second // How often to check queue + DefaultQueueMaxAge = 2 * time.Minute + DefaultProcessorSleep = 30 * time.Second // How often to check queue ) // FilerOperations defines the filer operations needed by EmptyFolderCleaner @@ -70,15 +70,20 @@ type EmptyFolderCleaner struct { stopCh chan struct{} } -// NewEmptyFolderCleaner creates a new EmptyFolderCleaner -func NewEmptyFolderCleaner(filer FilerOperations, lockRing *lock_manager.LockRing, host pb.ServerAddress, bucketPath string) *EmptyFolderCleaner { +// NewEmptyFolderCleaner creates a new EmptyFolderCleaner. +// cleanupDelay controls how long an empty folder must remain in the queue before deletion. +// If zero, DefaultQueueMaxAge is used. +func NewEmptyFolderCleaner(filer FilerOperations, lockRing *lock_manager.LockRing, host pb.ServerAddress, bucketPath string, cleanupDelay time.Duration) *EmptyFolderCleaner { + if cleanupDelay <= 0 { + cleanupDelay = DefaultQueueMaxAge + } efc := &EmptyFolderCleaner{ filer: filer, lockRing: lockRing, host: host, folderCounts: make(map[string]*folderState), bucketCleanupPolicies: make(map[string]*bucketCleanupPolicyState), - cleanupQueue: NewCleanupQueue(DefaultQueueMaxSize, DefaultQueueMaxAge), + cleanupQueue: NewCleanupQueue(DefaultQueueMaxSize, cleanupDelay), maxCountCheck: DefaultMaxCountCheck, cacheExpiry: DefaultCacheExpiry, processorSleep: DefaultProcessorSleep, @@ -207,27 +212,22 @@ func (efc *EmptyFolderCleaner) cleanupProcessor() { // processCleanupQueue processes items from the cleanup queue func (efc *EmptyFolderCleaner) processCleanupQueue() { - // Check if we should process - if !efc.cleanupQueue.ShouldProcess() { - if efc.cleanupQueue.Len() > 0 { - glog.Infof("EmptyFolderCleaner: pending queue not processed yet (len=%d, oldest_age=%v, max_size=%d, max_age=%v)", - efc.cleanupQueue.Len(), efc.cleanupQueue.OldestAge(), efc.cleanupQueue.maxSize, efc.cleanupQueue.maxAge) - } + if efc.cleanupQueue.Len() == 0 { return } - glog.V(3).Infof("EmptyFolderCleaner: processing cleanup queue (len=%d, age=%v)", + glog.V(3).Infof("EmptyFolderCleaner: processing cleanup queue (len=%d, oldest_age=%v)", efc.cleanupQueue.Len(), efc.cleanupQueue.OldestAge()) - // Process all items that are ready - for efc.cleanupQueue.Len() > 0 { + // Only process items that have been queued longer than maxAge + for { // Check if still enabled if !efc.IsEnabled() { return } - // Pop the oldest item - folder, triggeredBy, ok := efc.cleanupQueue.Pop() + // Only pop items old enough — newer items stay in the queue + folder, triggeredBy, ok := efc.cleanupQueue.PopOlderThan(efc.cleanupQueue.maxAge) if !ok { break } @@ -322,9 +322,18 @@ func (efc *EmptyFolderCleaner) executeCleanup(folder string, triggeredBy string) delete(efc.folderCounts, folder) efc.mu.Unlock() - // Note: No need to recursively check parent folder here. - // The deletion of this folder will generate a metadata event, - // which will trigger OnDeleteEvent for the parent folder. + // After deleting this folder, immediately try to clean the parent. + // Relying solely on cascading metadata events would re-enter the full + // delay queue for each ancestor level, causing multi-minute cascading + // waits (e.g. 3 levels × 2m = 6m+). Instead, walk up eagerly. + parentDir, _ := util.FullPath(folder).DirAndName() + if parentDir != "" && parentDir != folder && + efc.bucketPath != "" && isUnderBucketPath(parentDir, efc.bucketPath) { + // Remove any pending queue entry for the parent so we don't + // double-process it later from a stale event. + efc.cleanupQueue.Remove(parentDir) + efc.executeCleanup(parentDir, triggeredBy) + } } // countItems counts items in a folder (up to maxCountCheck) diff --git a/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go b/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go index e3621dcd3..02511ac33 100644 --- a/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go +++ b/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go @@ -655,7 +655,7 @@ func TestEmptyFolderCleaner_queueFIFOOrder(t *testing.T) { cleaner.Stop() } -func TestEmptyFolderCleaner_processCleanupQueue_drainsAllOnceTriggered(t *testing.T) { +func TestEmptyFolderCleaner_processCleanupQueue_onlyProcessesAgedItems(t *testing.T) { lockRing := lock_manager.NewLockRing(5 * time.Second) lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"}, 0) @@ -670,6 +670,7 @@ func TestEmptyFolderCleaner_processCleanupQueue_drainsAllOnceTriggered(t *testin }, } + maxAge := 100 * time.Millisecond cleaner := &EmptyFolderCleaner{ filer: mock, lockRing: lockRing, @@ -677,25 +678,27 @@ func TestEmptyFolderCleaner_processCleanupQueue_drainsAllOnceTriggered(t *testin bucketPath: "/buckets", enabled: true, folderCounts: make(map[string]*folderState), - cleanupQueue: NewCleanupQueue(2, time.Hour), + cleanupQueue: NewCleanupQueue(1000, maxAge), maxCountCheck: 1000, cacheExpiry: time.Minute, processorSleep: time.Second, stopCh: make(chan struct{}), } - now := time.Now() - cleaner.cleanupQueue.Add("/buckets/test/folder1", "i1", now) - cleaner.cleanupQueue.Add("/buckets/test/folder2", "i2", now.Add(time.Millisecond)) - cleaner.cleanupQueue.Add("/buckets/test/folder3", "i3", now.Add(2*time.Millisecond)) + // Add old items (well past maxAge) and a fresh item + old := time.Now().Add(-time.Second) + cleaner.cleanupQueue.Add("/buckets/test/folder1", "i1", old) + cleaner.cleanupQueue.Add("/buckets/test/folder2", "i2", old.Add(time.Millisecond)) + cleaner.cleanupQueue.Add("/buckets/test/folder3", "i3", time.Now()) // fresh, should NOT be processed cleaner.processCleanupQueue() - if got := cleaner.cleanupQueue.Len(); got != 0 { - t.Fatalf("expected queue to be drained, got len=%d", got) + // Only the two old items should have been processed + if len(deleted) != 2 { + t.Fatalf("expected 2 deleted folders (aged items only), got %d: %v", len(deleted), deleted) } - if len(deleted) != 3 { - t.Fatalf("expected 3 deleted folders, got %d", len(deleted)) + if got := cleaner.cleanupQueue.Len(); got != 1 { + t.Fatalf("expected 1 item remaining in queue, got %d", got) } } diff --git a/weed/filer/filer.go b/weed/filer/filer.go index c16c41f57..8a7ba3e27 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -61,7 +61,8 @@ type Filer struct { MaxFilenameLength uint32 deletionQuit chan struct{} DeletionRetryQueue *DeletionRetryQueue - EmptyFolderCleaner *empty_folder_cleanup.EmptyFolderCleaner + EmptyFolderCleaner *empty_folder_cleanup.EmptyFolderCleaner + EmptyFolderCleanupDelay time.Duration } func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, maxFilenameLength uint32, notifyFn func()) *Filer { @@ -123,7 +124,7 @@ func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*maste glog.V(0).Infof("%s aggregate from peers %+v", self, snapshot) // Initialize the empty folder cleaner using the same LockRing as Dlm for consistent hashing - f.EmptyFolderCleaner = empty_folder_cleanup.NewEmptyFolderCleaner(f, f.Dlm.LockRing, self, f.DirBucketsPath) + f.EmptyFolderCleaner = empty_folder_cleanup.NewEmptyFolderCleaner(f, f.Dlm.LockRing, self, f.DirBucketsPath, f.EmptyFolderCleanupDelay) f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption) f.MasterClient.SetOnPeerUpdateFn(func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) { diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 91564bb22..7c9669d9e 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -248,6 +248,10 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) glog.Fatalf("%s bootstrap from %+v: %v", option.Host, existingNodes, err) } } + v.SetDefault("filer.options.s3.empty_folder_cleanup_delay", "2m") + if d, err := time.ParseDuration(v.GetString("filer.options.s3.empty_folder_cleanup_delay")); err == nil { + fs.filer.EmptyFolderCleanupDelay = d + } fs.filer.AggregateFromPeers(option.Host, existingNodes, startFromTime) fs.filer.LoadFilerConf()