diff --git a/weed/mount/meta_cache/meta_cache.go b/weed/mount/meta_cache/meta_cache.go index ec2891238..f0135a2f4 100644 --- a/weed/mount/meta_cache/meta_cache.go +++ b/weed/mount/meta_cache/meta_cache.go @@ -72,6 +72,7 @@ const ( metadataBeginBuild metadataCompleteBuild metadataAbortBuild + metadataPurgeDir metadataShutdown ) @@ -82,6 +83,7 @@ type metadataApplyRequest struct { options MetadataResponseApplyOptions buildPath util.FullPath snapshotTsNs int64 + resetFn func() done chan error } @@ -272,6 +274,20 @@ func (mc *MetaCache) AbortDirectoryBuild(ctx context.Context, dirPath util.FullP }) } +// PurgeDirectoryChildren asynchronously clears a directory's cached children and +// resets its cached flag (resetFn) via the apply loop. Asynchronous so callers like +// kernel Forget don't block; the enqueue result is discarded, so failures go unreported. +// See purgeDirectoryChildrenNow for why off-loop callers must not wipe the store directly. +func (mc *MetaCache) PurgeDirectoryChildren(dirPath util.FullPath, resetFn func()) { + _ = mc.enqueueApplyRequest(metadataApplyRequest{ + ctx: context.Background(), + kind: metadataPurgeDir, + buildPath: dirPath, + resetFn: resetFn, + done: make(chan error, 1), + }) +} + func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error { mc.Lock() defer mc.Unlock() @@ -465,6 +481,8 @@ func (mc *MetaCache) handleApplyRequest(req metadataApplyRequest) error { return mc.completeDirectoryBuildNow(req.ctx, req.buildPath, req.snapshotTsNs) case metadataAbortBuild: return mc.abortDirectoryBuildNow(req.buildPath) + case metadataPurgeDir: + return mc.purgeDirectoryChildrenNow(req.ctx, req.buildPath, req.resetFn) case metadataShutdown: return nil default: @@ -621,6 +639,24 @@ func (mc *MetaCache) abortDirectoryBuildNow(dirPath util.FullPath) error { return nil } +// purgeDirectoryChildrenNow runs in the apply loop, serialized with +// completeDirectoryBuildNow's markCachedFn, so no build publish interleaves +// between resetFn (clears the cached flag) 
and the store wipe. Skipping a +// building directory avoids deleting entries the build inserted but hasn't yet +// published. Together these keep a directory from ending up flagged cached over +// an empty store — which hides every file in it though they remain on the filer. +func (mc *MetaCache) purgeDirectoryChildrenNow(ctx context.Context, dirPath util.FullPath, resetFn func()) error { + if mc.isBuildingDir(dirPath) { + return nil + } + if resetFn != nil { + resetFn() + } + mc.Lock() + defer mc.Unlock() + return mc.localStore.DeleteFolderChildren(ctx, dirPath) +} + func (mc *MetaCache) completeDirectoryBuildNow(ctx context.Context, dirPath util.FullPath, snapshotTsNs int64) error { state := mc.buildingDirs[dirPath] delete(mc.buildingDirs, dirPath) diff --git a/weed/mount/meta_cache/meta_cache_purge_test.go b/weed/mount/meta_cache/meta_cache_purge_test.go new file mode 100644 index 000000000..922925b2a --- /dev/null +++ b/weed/mount/meta_cache/meta_cache_purge_test.go @@ -0,0 +1,98 @@ +package meta_cache + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +func insertCacheEntry(t *testing.T, mc *MetaCache, path util.FullPath) { + t.Helper() + if err := mc.InsertEntry(context.Background(), &filer.Entry{ + FullPath: path, + Attr: filer.Attr{ + Crtime: time.Unix(1, 0), + Mtime: time.Unix(1, 0), + Mode: 0100644, + FileSize: 1, + }, + }); err != nil { + t.Fatalf("insert %s: %v", path, err) + } +} + +// barrier flushes the apply loop. PurgeDirectoryChildren is asynchronous, so a +// synchronous apply-loop call issued after it returns only once the earlier purge +// has been processed (the loop is FIFO and single-threaded). 
+func barrier(t *testing.T, mc *MetaCache) { + t.Helper() + if err := mc.AbortDirectoryBuild(context.Background(), util.FullPath("/__barrier__")); err != nil { + t.Fatalf("barrier: %v", err) + } +} + +// TestPurgeSkippedWhileDirectoryBuilding is the core regression guard: a purge +// (idle eviction / kernel Forget / copy-range fallback) that lands while a +// directory is being rebuilt must NOT wipe the entries the build just inserted. +// Otherwise CompleteDirectoryBuild marks the directory cached over an empty +// store, and every file in it vanishes from the mount though it is safe on the +// filer. +func TestPurgeSkippedWhileDirectoryBuilding(t *testing.T) { + mc, _, _, _ := newTestMetaCache(t, map[util.FullPath]bool{"/": true}) + defer mc.Shutdown() + + dir := util.FullPath("/dir") + if err := mc.BeginDirectoryBuild(context.Background(), dir); err != nil { + t.Fatalf("begin build: %v", err) + } + insertCacheEntry(t, mc, "/dir/a.txt") + insertCacheEntry(t, mc, "/dir/b.txt") + + // A concurrent eviction tries to purge the directory mid-build. It is + // enqueued before CompleteDirectoryBuild, so the apply loop processes it + // first — while /dir is still building. 
+ var resetCalls int32 + mc.PurgeDirectoryChildren(dir, func() { atomic.AddInt32(&resetCalls, 1) }) + + if err := mc.CompleteDirectoryBuild(context.Background(), dir, 0); err != nil { + t.Fatalf("complete build: %v", err) + } + + if got := atomic.LoadInt32(&resetCalls); got != 0 { + t.Fatalf("resetFn ran %d times during build; purge must be skipped", got) + } + if !mc.IsDirectoryCached(dir) { + t.Fatal("/dir should be cached after build completes") + } + for _, name := range []string{"/dir/a.txt", "/dir/b.txt"} { + if _, err := mc.FindEntry(context.Background(), util.FullPath(name)); err != nil { + t.Fatalf("%s missing after mid-build purge: %v", name, err) + } + } +} + +// TestPurgeClearsWhenNotBuilding verifies a purge routed through the apply loop +// still takes effect: with no build in flight, it runs resetFn exactly once and +// wipes the directory's entries from the store. +func TestPurgeClearsWhenNotBuilding(t *testing.T) { + mc, _, _, _ := newTestMetaCache(t, map[util.FullPath]bool{"/": true, "/dir": true}) + defer mc.Shutdown() + + dir := util.FullPath("/dir") + insertCacheEntry(t, mc, "/dir/a.txt") + + var resetCalls int32 + mc.PurgeDirectoryChildren(dir, func() { atomic.AddInt32(&resetCalls, 1) }) + barrier(t, mc) + + if got := atomic.LoadInt32(&resetCalls); got != 1 { + t.Fatalf("resetFn ran %d times; want 1", got) + } + if _, err := mc.FindEntry(context.Background(), util.FullPath("/dir/a.txt")); err == nil { + t.Fatal("/dir/a.txt should have been purged") + } +} diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index b560735c6..cf0f9fe73 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -505,10 +505,12 @@ func (wfs *WFS) StartBackgroundTasks() error { startTime := time.Now() go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano(), wfs.option.WritebackCache, func(lastTsNs int64, err error) { glog.Warningf("meta events follow retry from %v: %v", time.Unix(0, lastTsNs), err) - if deleteErr := 
wfs.metaCache.DeleteFolderChildren(context.Background(), util.FullPath(wfs.option.FilerMountRootPath)); deleteErr != nil { - glog.Warningf("meta cache cleanup failed: %v", deleteErr) - } + // A subscription gap may have dropped events, so distrust every cached + // listing. Reset the flags first (safe — it never deletes entries), then + // wipe the root's stale children through the apply loop so the delete + // cannot strand a concurrent rebuild cached-but-empty. wfs.inodeToPath.InvalidateAllChildrenCache() + wfs.purgeDirectoryCache(util.FullPath(wfs.option.FilerMountRootPath)) }, follower) go wfs.loopCheckQuota() go wfs.loopFlushDirtyMetadata() @@ -682,6 +684,9 @@ func (wfs *WFS) ClearCacheDir() { os.RemoveAll(wfs.option.getUniqueCacheDirForRead()) } +// markDirectoryReadThrough drops a hot directory's cached listing. Only safe +// from the apply loop (onDirectoryUpdate), where it serializes with a build's +// markCachedFn; off-loop callers must use purgeDirectoryCache. func (wfs *WFS) markDirectoryReadThrough(dirPath util.FullPath) { if !wfs.inodeToPath.MarkDirectoryReadThrough(dirPath, time.Now()) { return @@ -691,6 +696,15 @@ func (wfs *WFS) markDirectoryReadThrough(dirPath util.FullPath) { } } +// purgeDirectoryCache drops a directory's cached listing for callers running off the +// apply loop (idle eviction, kernel Forget, copy-range fallback, subscription retry). +// The wipe is queued on the loop so it can't strand a concurrently-rebuilt directory cached-but-empty. 
+func (wfs *WFS) purgeDirectoryCache(dirPath util.FullPath) { + wfs.metaCache.PurgeDirectoryChildren(dirPath, func() { + wfs.inodeToPath.InvalidateChildrenCache(dirPath) + }) +} + func (wfs *WFS) loopEvictIdleDirCache() { if wfs.dirIdleEvict <= 0 { return @@ -700,9 +714,7 @@ func (wfs *WFS) loopEvictIdleDirCache() { for range ticker.C { dirs := wfs.inodeToPath.CollectEvictableDirs(time.Now(), wfs.dirIdleEvict) for _, dir := range dirs { - if err := wfs.metaCache.DeleteFolderChildren(context.Background(), dir); err != nil { - glog.V(2).Infof("evict dir cache %s: %v", dir, err) - } + wfs.purgeDirectoryCache(dir) } } } diff --git a/weed/mount/weedfs_file_copy_range.go b/weed/mount/weedfs_file_copy_range.go index 5123a320c..1c9bdfb2d 100644 --- a/weed/mount/weedfs_file_copy_range.go +++ b/weed/mount/weedfs_file_copy_range.go @@ -267,7 +267,7 @@ func (wfs *WFS) updateServerSideWholeFileCopyMetaCache(dstPath util.FullPath, en event := metadataUpdateEvent(dir, entry) if applyErr := wfs.applyLocalMetadataEvent(context.Background(), event); applyErr != nil { glog.Warningf("CopyFileRange metadata update %s: %v", dstPath, applyErr) - wfs.markDirectoryReadThrough(util.FullPath(dir)) + wfs.purgeDirectoryCache(util.FullPath(dir)) } } diff --git a/weed/mount/weedfs_forget.go b/weed/mount/weedfs_forget.go index 29fc84251..fb7c986f7 100644 --- a/weed/mount/weedfs_forget.go +++ b/weed/mount/weedfs_forget.go @@ -1,8 +1,6 @@ package mount import ( - "context" - "github.com/seaweedfs/seaweedfs/weed/util" ) @@ -68,6 +66,9 @@ func (wfs *WFS) Forget(nodeid, nlookup uint64) { // fhMap here would couple two unrelated refcounts and could tear down a // still-live handle if Forget ever raced ahead of Release. 
wfs.inodeToPath.Forget(nodeid, nlookup, func(dir util.FullPath) { - wfs.metaCache.DeleteFolderChildren(context.Background(), dir) + // Runs after Forget releases its lock; a concurrent lookup+rebuild can + // re-cache the directory in that window, so purge through the apply loop + // rather than wiping the store directly. + wfs.purgeDirectoryCache(dir) }) }