From e96190d128824e89bd63950d9f94980e2632971c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 6 May 2026 15:26:56 -0700 Subject: [PATCH] fix(mount): skip pressure-eviction of gappy page chunks (#9330) (#9334) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(mount): skip pressure-eviction of gappy page chunks (#9330) A page chunk whose written-interval list has an internal hole was being sealed under buffer-pressure eviction, then SaveContent would emit one volume chunk per maximal adjacent run with no chunk covering the hole; reads then silently zero-fill the gap (filer/stream.go:177-186). On a sequential cp through FUSE, that bakes in-flight 4 KiB writes into split volume chunks and leaves chunk-sized blocks of zeros on the destination. Filter the pressure-driven sealers (SaveDataAt's over-limit path, EvictOneWritableChunk, ProactiveFlush) to only seal chunks whose written intervals form one unbroken run. The flush-on-close path (FlushAll) is unchanged: at close every gap is by definition a sparse-file write that the app legitimately never made. * fix(mount): also gate IsContiguouslyWritten on leading zero-offset Tighten IsContiguouslyWritten to also reject empty lists and lists whose first interval does not start at offset 0. The internal-gap and leading-gap cases are symmetric for pressure-driven sealing: both put in-flight FUSE writeback for the missing range at risk of being baked into split volume chunks. The flush-on-close path is still unfiltered (sparse writes are sealed legitimately at FlushAll). Also align EvictOneWritableChunk's bestBytes initialization with SaveDataAt (start at 0) so an empty chunk is never picked, matching the new semantic. Addresses gemini-code-assist review on PR #9334. * fix(mount): preserve cap-pressure liveness in EvictOneWritableChunk The previous version of this fix had EvictOneWritableChunk return false whenever every dirty chunk was gappy. 
That broke the accountant's Reserve loop: cond.Wait only wakes on
Release, Release only fires on upload completion, and refusing to seal
anything means no upload starts — the writer hangs at the
-writeBufferSizeMB cap forever.

Two-pass selection: prefer the fullest gap-free chunk (issue #9330: this
is what protects sequential cp from racing FUSE writeback), fall back to
the oldest non-empty writer when nothing is gap-free. Oldest-first
maximizes the chance that FUSE writeback for the gap range has already
settled.

The actual sealing path is unchanged — SaveContent still emits one
volume chunk per maximal adjacent run; pages that arrive after the seal
land in a fresh MemChunk for the same logicChunkIndex and are sealed in
turn, so coverage is reconstructed at read time by readResolvedChunks.

Sequential cp at default settings always hits the strict pass (writes
arrive contiguous-from-0 within their logicChunkIndex), so the bug-fix
behavior is preserved; the fallback only runs under genuinely sparse
workloads or under FUSE writeback so backed up that no chunk has
settled, where forced progress is preferable to a hung mount.

* test(mount): pin ProactiveFlush gap-skip behavior (#9330)

Sibling regression test for the ProactiveFlush guard added in this
series: same 3-chunk setup as TestEvictOneWritableChunk_SkipsGappyChunks
(internal gap, leading gap, contiguous). Verifies ProactiveFlush picks
the contiguous chunk when staleness criteria are otherwise satisfied,
returns false when only gappy chunks remain (no liveness fallback like
EvictOneWritableChunk has — failing here is just a missed optimization),
and that filling the holes lets the chunks auto-seal via
maybeMoveToSealed.
* style(mount): trim verbose comments on #9330 fix --- weed/mount/page_writer/chunk_interval_list.go | 18 ++ .../page_writer/chunk_interval_list_test.go | 31 +++ weed/mount/page_writer/page_chunk.go | 1 + weed/mount/page_writer/page_chunk_mem.go | 7 + weed/mount/page_writer/page_chunk_swapfile.go | 6 + weed/mount/page_writer/upload_pipeline.go | 85 ++++--- .../mount/page_writer/upload_pipeline_test.go | 212 ++++++++++++++++++ 7 files changed, 328 insertions(+), 32 deletions(-) diff --git a/weed/mount/page_writer/chunk_interval_list.go b/weed/mount/page_writer/chunk_interval_list.go index 524d398c7..5cb48f903 100644 --- a/weed/mount/page_writer/chunk_interval_list.go +++ b/weed/mount/page_writer/chunk_interval_list.go @@ -86,6 +86,24 @@ func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) { return } +// IsContiguouslyWritten reports whether the written bytes form one +// unbroken run starting at offset 0. Pressure-driven sealing uses this +// to avoid racing in-flight FUSE writeback on a gap range — sealing a +// gappy chunk would emit volume chunks with no coverage for the gap and +// reads would silently zero-fill it (issue #9330). 
+func (list *ChunkWrittenIntervalList) IsContiguouslyWritten() bool { + first := list.head.next + if first == list.tail || first.StartOffset != 0 { + return false + } + for t := first; t.next != list.tail; t = t.next { + if t.stopOffset != t.next.StartOffset { + return false + } + } + return true +} + func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) { //t := list.head diff --git a/weed/mount/page_writer/chunk_interval_list_test.go b/weed/mount/page_writer/chunk_interval_list_test.go index 558a1f448..77bb58988 100644 --- a/weed/mount/page_writer/chunk_interval_list_test.go +++ b/weed/mount/page_writer/chunk_interval_list_test.go @@ -81,6 +81,37 @@ func hasData(usage *ChunkWrittenIntervalList, chunkStartOffset, x int64) bool { return false } +func TestIsContiguouslyWritten(t *testing.T) { + const chunkSize int64 = 2 * 1024 * 1024 + cases := []struct { + name string + writes [][2]int64 + expected bool + }{ + {name: "empty", writes: nil, expected: false}, + {name: "single full", writes: [][2]int64{{0, chunkSize}}, expected: true}, + {name: "single partial at start", writes: [][2]int64{{0, chunkSize / 4}}, expected: true}, + {name: "two adjacent halves in order", writes: [][2]int64{{0, chunkSize / 2}, {chunkSize / 2, chunkSize}}, expected: true}, + {name: "two adjacent halves out of order", writes: [][2]int64{{chunkSize / 2, chunkSize}, {0, chunkSize / 2}}, expected: true}, + {name: "overlapping covers everything", writes: [][2]int64{{0, chunkSize*3/4 + 1}, {chunkSize / 2, chunkSize}}, expected: true}, + {name: "three adjacent thirds", writes: [][2]int64{{0, chunkSize / 3}, {chunkSize / 3, 2 * chunkSize / 3}, {2 * chunkSize / 3, chunkSize}}, expected: true}, + {name: "single partial in middle", writes: [][2]int64{{chunkSize / 4, chunkSize / 2}}, expected: false}, + {name: "first byte missing", writes: [][2]int64{{1, chunkSize}}, expected: false}, + {name: "gap in middle", writes: [][2]int64{{0, chunkSize / 4}, {chunkSize / 2, 
chunkSize}}, expected: false}, + {name: "two intervals with 1-byte gap", writes: [][2]int64{{0, 100}, {101, 200}}, expected: false}, + {name: "three intervals with middle gap", writes: [][2]int64{{0, chunkSize / 4}, {chunkSize / 3, chunkSize / 2}, {chunkSize / 2, chunkSize}}, expected: false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + list := newChunkWrittenIntervalList() + for i, w := range tc.writes { + list.MarkWritten(w[0], w[1], int64(i+1)) + } + assert.Equal(t, tc.expected, list.IsContiguouslyWritten(), "IsContiguouslyWritten mismatch") + }) + } +} + func TestIsComplete_AdjacentIntervals(t *testing.T) { // Linux FUSE delivers writes up to FUSE_MAX_PAGES_PER_REQ // (typically 1 MiB) per op, so a 2 MiB chunk filled by sequential diff --git a/weed/mount/page_writer/page_chunk.go b/weed/mount/page_writer/page_chunk.go index 246ef8a71..7c59db8d8 100644 --- a/weed/mount/page_writer/page_chunk.go +++ b/weed/mount/page_writer/page_chunk.go @@ -11,6 +11,7 @@ type PageChunk interface { WriteDataAt(src []byte, offset int64, tsNs int64) (n int) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) IsComplete() bool + IsContiguouslyWritten() bool ActivityScore() int64 WrittenSize() int64 LastWriteTsNs() int64 diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go index a12227187..9d2b13d5d 100644 --- a/weed/mount/page_writer/page_chunk_mem.go +++ b/weed/mount/page_writer/page_chunk_mem.go @@ -81,6 +81,13 @@ func (mc *MemChunk) IsComplete() bool { return mc.usage.IsComplete(mc.chunkSize) } +func (mc *MemChunk) IsContiguouslyWritten() bool { + mc.RLock() + defer mc.RUnlock() + + return mc.usage.IsContiguouslyWritten() +} + func (mc *MemChunk) ActivityScore() int64 { return mc.activityScore.ActivityScore() } diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go index eab1bdcf9..dbe69f80c 100644 --- a/weed/mount/page_writer/page_chunk_swapfile.go 
+++ b/weed/mount/page_writer/page_chunk_swapfile.go @@ -172,6 +172,12 @@ func (sc *SwapFileChunk) IsComplete() bool { return sc.usage.IsComplete(sc.swapfile.chunkSize) } +func (sc *SwapFileChunk) IsContiguouslyWritten() bool { + sc.RLock() + defer sc.RUnlock() + return sc.usage.IsContiguouslyWritten() +} + func (sc *SwapFileChunk) ActivityScore() int64 { return sc.activityScore.ActivityScore() } diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go index 6b5d8b293..275a174eb 100644 --- a/weed/mount/page_writer/upload_pipeline.go +++ b/weed/mount/page_writer/upload_pipeline.go @@ -2,6 +2,7 @@ package page_writer import ( "fmt" + "math" "sync" "sync/atomic" @@ -118,34 +119,23 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsN } if !found { if len(up.writableChunks) > up.writableChunkLimit { - // if current file chunks is over the per file buffer count limit + // Per-pipeline soft cap. Seal the fullest gap-free chunk; + // gappy chunks must wait so in-flight FUSE writeback can fill + // the gap (issue #9330). If none qualifies, fall through — + // the memChunkCounter ceiling below redirects to swap. 
candidateChunkIndex, fullness := LogicChunkIndex(-1), int64(0) for lci, mc := range up.writableChunks { - chunkFullness := mc.WrittenSize() - if fullness < chunkFullness { + if !mc.IsContiguouslyWritten() { + continue + } + if b := mc.WrittenSize(); b > fullness { candidateChunkIndex = lci - fullness = chunkFullness + fullness = b } } - /* // this algo generates too many chunks - candidateChunkIndex, lowestActivityScore := LogicChunkIndex(-1), int64(math.MaxInt64) - for wci, wc := range up.writableChunks { - activityScore := wc.ActivityScore() - if lowestActivityScore >= activityScore { - if lowestActivityScore == activityScore { - chunkFullness := wc.WrittenSize() - if fullness < chunkFullness { - candidateChunkIndex = lci - fullness = chunkFullness - } - } - candidateChunkIndex = wci - lowestActivityScore = activityScore - } + if candidateChunkIndex >= 0 { + up.moveToSealed(up.writableChunks[candidateChunkIndex], candidateChunkIndex) } - */ - up.moveToSealed(up.writableChunks[candidateChunkIndex], candidateChunkIndex) - // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness) } // fmt.Printf("isSequential:%v len(up.writableChunks):%v memChunkCounter:%v", isSequential, len(up.writableChunks), memChunkCounter) if isSequential && @@ -270,28 +260,52 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic up.chunksLock.Lock() } -// EvictOneWritableChunk force-seals the fullest writable chunk in this -// pipeline, submitting it for async upload. Called by the accountant's -// evictor when Reserve would block. Returns true if a chunk was sealed. -// The fullest-chunk heuristic matches the over-limit path in SaveDataAt: -// sealing the chunk closest to full maximizes the upload's usefulness -// and avoids thrashing on repeatedly re-creating the same half-empty -// chunk. Callers must not hold up.chunksLock. +// EvictOneWritableChunk force-seals one writable chunk so the accountant's +// Reserve loop can make progress. 
Strict pass picks the fullest gap-free +// chunk (issue #9330). Fallback picks the oldest non-empty writer when +// nothing is gap-free; without it Reserve deadlocks (cond.Wait only wakes +// on Release, Release only fires on upload completion). Callers must not +// hold up.chunksLock. func (up *UploadPipeline) EvictOneWritableChunk() bool { up.chunksLock.Lock() defer up.chunksLock.Unlock() if len(up.writableChunks) == 0 { return false } - var bestIndex LogicChunkIndex - var bestBytes int64 = -1 + + bestIndex := LogicChunkIndex(-1) + var bestBytes int64 for lci, wc := range up.writableChunks { + if !wc.IsContiguouslyWritten() { + continue + } if b := wc.WrittenSize(); b > bestBytes { bestIndex = lci bestBytes = b } } - if bestBytes < 0 { + + if bestIndex < 0 { + oldestTsNs := int64(math.MaxInt64) + var fallbackBytes int64 + for lci, wc := range up.writableChunks { + b := wc.WrittenSize() + if b == 0 { + continue + } + ts := wc.LastWriteTsNs() + if ts == 0 { + continue + } + if ts < oldestTsNs || (ts == oldestTsNs && b > fallbackBytes) { + bestIndex = lci + oldestTsNs = ts + fallbackBytes = b + } + } + } + + if bestIndex < 0 { return false } up.moveToSealed(up.writableChunks[bestIndex], bestIndex) @@ -330,6 +344,13 @@ func (up *UploadPipeline) ProactiveFlush(nowNs int64, idleThresholdNs int64, max if age < idleThresholdNs { continue } + // Skip gappy chunks; FUSE writeback may still be in flight for the + // hole and SaveContent would bake it into split volume chunks + // (issue #9330). Failing here is just a missed flush — FlushAll + // at close still seals everything. 
+ if !chunk.IsContiguouslyWritten() { + continue + } written := chunk.WrittenSize() nearlyFull := written >= fillRatio behindFrontier := isSeq && int64(lci) <= frontier-int64(frontierLag) diff --git a/weed/mount/page_writer/upload_pipeline_test.go b/weed/mount/page_writer/upload_pipeline_test.go index 82f6acf14..d4f95a0b9 100644 --- a/weed/mount/page_writer/upload_pipeline_test.go +++ b/weed/mount/page_writer/upload_pipeline_test.go @@ -46,3 +46,215 @@ func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOf } } } + +// Pressure-driven eviction must not seal a chunk with a leading or +// internal gap (issue #9330). +func TestEvictOneWritableChunk_SkipsGappyChunks(t *testing.T) { + const cs int64 = 2 * 1024 * 1024 + // nil saveToStorage so the async upload SaveContent is a no-op. + up := NewUploadPipeline(util.NewLimitedConcurrentExecutor(2), cs, nil, 16, "", nil) + + block := make([]byte, cs/4) + + // chunk 0: internal gap; chunk 1: leading gap; chunk 2: contiguous from 0. 
+ if _, err := up.SaveDataAt(block, 0, true, 1); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, 3*cs/4, true, 2); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, cs+cs/4, true, 3); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, cs+cs/2, true, 4); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, 2*cs, true, 5); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, 2*cs+cs/4, true, 6); err != nil { + t.Fatal(err) + } + + if !up.EvictOneWritableChunk() { + t.Fatalf("EvictOneWritableChunk returned false; expected contiguous chunk 2 to be evictable") + } + if _, stillWritable := up.writableChunks[LogicChunkIndex(2)]; stillWritable { + t.Errorf("chunk 2 should have moved to sealed") + } + if _, stillWritable := up.writableChunks[LogicChunkIndex(0)]; !stillWritable { + t.Errorf("chunk 0 (internal gap) must remain writable") + } + if _, stillWritable := up.writableChunks[LogicChunkIndex(1)]; !stillWritable { + t.Errorf("chunk 1 (leading gap) must remain writable") + } + + // Filling holes makes IsComplete true; maybeMoveToSealed auto-seals. + if _, err := up.SaveDataAt(block, cs/4, true, 7); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, cs/2, true, 8); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, cs, true, 9); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, cs+3*cs/4, true, 10); err != nil { + t.Fatal(err) + } + if _, stillWritable := up.writableChunks[LogicChunkIndex(0)]; stillWritable { + t.Errorf("chunk 0 should have auto-sealed after gap was filled") + } + if _, stillWritable := up.writableChunks[LogicChunkIndex(1)]; stillWritable { + t.Errorf("chunk 1 should have auto-sealed after leading range was filled") + } +} + +// When every chunk is gappy, the fallback must still seal one so +// accountant.Reserve can wake; oldest-LastWriteTsNs wins. 
+func TestEvictOneWritableChunk_FallbackPicksOldestGappy(t *testing.T) { + const cs int64 = 2 * 1024 * 1024 + up := NewUploadPipeline(util.NewLimitedConcurrentExecutor(2), cs, nil, 16, "", nil) + + block := make([]byte, cs/4) + + // Strictly increasing tsNs so chunk 0 is oldest; equal WrittenSize. + if _, err := up.SaveDataAt(block, 0, true, 100); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, 3*cs/4, true, 101); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, cs+cs/4, true, 200); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, cs+cs/2, true, 201); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, 2*cs, true, 300); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, 2*cs+3*cs/4, true, 301); err != nil { + t.Fatal(err) + } + + if !up.EvictOneWritableChunk() { + t.Fatalf("fallback must seal a gappy chunk to free a Reserve slot") + } + if _, stillWritable := up.writableChunks[LogicChunkIndex(0)]; stillWritable { + t.Errorf("oldest gappy chunk (0) should have been picked by the fallback") + } + if _, stillWritable := up.writableChunks[LogicChunkIndex(1)]; !stillWritable { + t.Errorf("chunk 1 should remain writable") + } + if _, stillWritable := up.writableChunks[LogicChunkIndex(2)]; !stillWritable { + t.Errorf("chunk 2 should remain writable") + } +} + +// Strict pass preempts the fallback even when a gappy chunk is older. +func TestEvictOneWritableChunk_PrefersStrictOverFallback(t *testing.T) { + const cs int64 = 2 * 1024 * 1024 + up := NewUploadPipeline(util.NewLimitedConcurrentExecutor(2), cs, nil, 16, "", nil) + + block := make([]byte, cs/4) + + // chunk 0 (older, gappy), chunk 1 (newer, contiguous from 0). 
+ if _, err := up.SaveDataAt(block, 0, true, 100); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, 3*cs/4, true, 101); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, cs, true, 200); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, cs+cs/4, true, 201); err != nil { + t.Fatal(err) + } + + if !up.EvictOneWritableChunk() { + t.Fatalf("EvictOneWritableChunk returned false") + } + if _, stillWritable := up.writableChunks[LogicChunkIndex(1)]; stillWritable { + t.Errorf("contiguous chunk 1 must be picked over older gappy chunk 0") + } + if _, stillWritable := up.writableChunks[LogicChunkIndex(0)]; !stillWritable { + t.Errorf("gappy chunk 0 must remain writable") + } +} + +// ProactiveFlush also skips gappy chunks (issue #9330). No liveness +// fallback here — unlike EvictOneWritableChunk, returning false is just +// a missed optimization. +func TestProactiveFlush_SkipsGappyChunks(t *testing.T) { + const cs int64 = 2 * 1024 * 1024 + up := NewUploadPipeline(util.NewLimitedConcurrentExecutor(2), cs, nil, 16, "", nil) + + block := make([]byte, cs/4) + + // chunk 0: internal gap; chunk 1: leading gap; chunk 2: contiguous from 0. + if _, err := up.SaveDataAt(block, 0, true, 1); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, 3*cs/4, true, 2); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, cs+cs/4, true, 3); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, cs+cs/2, true, 4); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, 2*cs, true, 5); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, 2*cs+cs/4, true, 6); err != nil { + t.Fatal(err) + } + + // nowNs >> chunk timestamps so age clears the idle/maxHold gates; + // fillRatio < WrittenSize so nearlyFull is true. Leaves + // IsContiguouslyWritten as the only differentiator. 
+ const ( + nowNs int64 = 1_000_000_000 + idleThresholdNs int64 = 100 + maxHoldNs int64 = 200 + fillRatio int64 = cs / 8 + ) + if !up.ProactiveFlush(nowNs, idleThresholdNs, maxHoldNs, fillRatio, 0, false) { + t.Fatalf("ProactiveFlush returned false; expected contiguous chunk 2 to be sealed") + } + if _, stillWritable := up.writableChunks[LogicChunkIndex(2)]; stillWritable { + t.Errorf("chunk 2 should have moved to sealed") + } + if _, stillWritable := up.writableChunks[LogicChunkIndex(0)]; !stillWritable { + t.Errorf("chunk 0 (internal gap) must remain writable") + } + if _, stillWritable := up.writableChunks[LogicChunkIndex(1)]; !stillWritable { + t.Errorf("chunk 1 (leading gap) must remain writable") + } + + if up.ProactiveFlush(nowNs, idleThresholdNs, maxHoldNs, fillRatio, 0, false) { + t.Errorf("ProactiveFlush returned true with only gappy chunks left") + } + + if _, err := up.SaveDataAt(block, cs/4, true, 7); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, cs/2, true, 8); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, cs, true, 9); err != nil { + t.Fatal(err) + } + if _, err := up.SaveDataAt(block, cs+3*cs/4, true, 10); err != nil { + t.Fatal(err) + } + if _, stillWritable := up.writableChunks[LogicChunkIndex(0)]; stillWritable { + t.Errorf("chunk 0 should have auto-sealed after gap was filled") + } + if _, stillWritable := up.writableChunks[LogicChunkIndex(1)]; stillWritable { + t.Errorf("chunk 1 should have auto-sealed after leading range was filled") + } +}