diff --git a/weed/mount/page_writer/chunk_interval_list.go b/weed/mount/page_writer/chunk_interval_list.go
index 524d398c7..5cb48f903 100644
--- a/weed/mount/page_writer/chunk_interval_list.go
+++ b/weed/mount/page_writer/chunk_interval_list.go
@@ -86,6 +86,29 @@ func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) {
 	return
 }
 
+// IsContiguouslyWritten reports whether the recorded intervals form one
+// unbroken run starting at offset 0: the first interval must start at 0
+// and each later interval must start exactly where the previous one
+// stops. A list with no intervals is not contiguous.
+//
+// Pressure-driven sealing uses this to avoid racing in-flight FUSE
+// writeback on a gap range — sealing a gappy chunk would emit volume
+// chunks with no coverage for the gap and reads would silently zero-fill
+// it (issue #9330).
+func (list *ChunkWrittenIntervalList) IsContiguouslyWritten() bool {
+	prev := list.head.next
+	if prev == list.tail || prev.StartOffset != 0 {
+		return false
+	}
+	for cur := prev.next; cur != list.tail; cur = cur.next {
+		if cur.StartOffset != prev.stopOffset {
+			return false
+		}
+		prev = cur
+	}
+	return true
+}
+
 func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) {
 
 	//t := list.head
diff --git a/weed/mount/page_writer/chunk_interval_list_test.go b/weed/mount/page_writer/chunk_interval_list_test.go
index 558a1f448..77bb58988 100644
--- a/weed/mount/page_writer/chunk_interval_list_test.go
+++ b/weed/mount/page_writer/chunk_interval_list_test.go
@@ -81,6 +81,39 @@ func hasData(usage *ChunkWrittenIntervalList, chunkStartOffset, x int64) bool {
 	return false
 }
 
+// TestIsContiguouslyWritten replays each case's ranges through MarkWritten,
+// passing the 1-based position as its third argument, and checks the verdict.
+func TestIsContiguouslyWritten(t *testing.T) {
+	const chunkSize int64 = 2 * 1024 * 1024
+	testCases := []struct {
+		name   string
+		ranges [][2]int64 // each pair supplies MarkWritten's first two arguments
+		want   bool
+	}{
+		{name: "empty", ranges: nil, want: false},
+		{name: "single full", ranges: [][2]int64{{0, chunkSize}}, want: true},
+		{name: "single partial at start", ranges: [][2]int64{{0, chunkSize / 4}}, want: true},
+		{name: "two adjacent halves in order", ranges: [][2]int64{{0, chunkSize / 2}, {chunkSize / 2, chunkSize}}, want: true},
+		{name: "two adjacent halves out of order", ranges: [][2]int64{{chunkSize / 2, chunkSize}, {0, chunkSize / 2}}, want: true},
+		{name: "overlapping covers everything", ranges: [][2]int64{{0, chunkSize*3/4 + 1}, {chunkSize / 2, chunkSize}}, want: true},
+		{name: "three adjacent thirds", ranges: [][2]int64{{0, chunkSize / 3}, {chunkSize / 3, 2 * chunkSize / 3}, {2 * chunkSize / 3, chunkSize}}, want: true},
+		{name: "single partial in middle", ranges: [][2]int64{{chunkSize / 4, chunkSize / 2}}, want: false},
+		{name: "first byte missing", ranges: [][2]int64{{1, chunkSize}}, want: false},
+		{name: "gap in middle", ranges: [][2]int64{{0, chunkSize / 4}, {chunkSize / 2, chunkSize}}, want: false},
+		{name: "two intervals with 1-byte gap", ranges: [][2]int64{{0, 100}, {101, 200}}, want: false},
+		{name: "three intervals with middle gap", ranges: [][2]int64{{0, chunkSize / 4}, {chunkSize / 3, chunkSize / 2}, {chunkSize / 2, chunkSize}}, want: false},
+	}
+	for _, testCase := range testCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			intervals := newChunkWrittenIntervalList()
+			for n, r := range testCase.ranges {
+				intervals.MarkWritten(r[0], r[1], int64(n+1))
+			}
+			assert.Equal(t, testCase.want, intervals.IsContiguouslyWritten(), "IsContiguouslyWritten mismatch")
+		})
+	}
+}
+
 func TestIsComplete_AdjacentIntervals(t *testing.T) {
 	// Linux FUSE delivers writes up to FUSE_MAX_PAGES_PER_REQ
 	// (typically 1 MiB) per op, so a 2 MiB chunk filled by sequential
diff --git a/weed/mount/page_writer/page_chunk.go b/weed/mount/page_writer/page_chunk.go
index 246ef8a71..7c59db8d8 100644
--- a/weed/mount/page_writer/page_chunk.go
+++ b/weed/mount/page_writer/page_chunk.go
@@ -11,6 +11,9 @@ type PageChunk interface {
 	WriteDataAt(src []byte, offset int64, tsNs int64) (n int)
 	ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64)
 	IsComplete() bool
+	// IsContiguouslyWritten reports whether the bytes written so far form
+	// one unbroken run starting at offset 0 of the chunk.
+	IsContiguouslyWritten() bool
 	ActivityScore() int64
 	WrittenSize() int64
 	LastWriteTsNs() int64
diff --git a/weed/mount/page_writer/page_chunk_mem.go
b/weed/mount/page_writer/page_chunk_mem.go
index a12227187..9d2b13d5d 100644
--- a/weed/mount/page_writer/page_chunk_mem.go
+++ b/weed/mount/page_writer/page_chunk_mem.go
@@ -81,6 +81,15 @@ func (mc *MemChunk) IsComplete() bool {
 	return mc.usage.IsComplete(mc.chunkSize)
 }
 
+// IsContiguouslyWritten reports, under the chunk's read lock, whether
+// mc.usage records one unbroken written run starting at offset 0.
+func (mc *MemChunk) IsContiguouslyWritten() bool {
+	mc.RLock()
+	defer mc.RUnlock()
+
+	return mc.usage.IsContiguouslyWritten()
+}
+
 func (mc *MemChunk) ActivityScore() int64 {
 	return mc.activityScore.ActivityScore()
 }
diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go
index eab1bdcf9..dbe69f80c 100644
--- a/weed/mount/page_writer/page_chunk_swapfile.go
+++ b/weed/mount/page_writer/page_chunk_swapfile.go
@@ -172,6 +172,14 @@ func (sc *SwapFileChunk) IsComplete() bool {
 	return sc.usage.IsComplete(sc.swapfile.chunkSize)
 }
 
+// IsContiguouslyWritten reports, under the chunk's read lock, whether
+// sc.usage records one unbroken written run starting at offset 0.
+func (sc *SwapFileChunk) IsContiguouslyWritten() bool {
+	sc.RLock()
+	defer sc.RUnlock()
+	return sc.usage.IsContiguouslyWritten()
+}
+
 func (sc *SwapFileChunk) ActivityScore() int64 {
 	return sc.activityScore.ActivityScore()
 }
diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go
index 6b5d8b293..275a174eb 100644
--- a/weed/mount/page_writer/upload_pipeline.go
+++ b/weed/mount/page_writer/upload_pipeline.go
@@ -2,6 +2,7 @@ package page_writer
 
 import (
 	"fmt"
+	"math"
 	"sync"
 	"sync/atomic"
 
@@ -118,34 +119,13 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsN
 	}
 	if !found {
 		if len(up.writableChunks) > up.writableChunkLimit {
-			// if current file chunks is over the per file buffer count limit
-			candidateChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
-			for lci, mc := range up.writableChunks {
-				chunkFullness := mc.WrittenSize()
-				if fullness < chunkFullness {
-					candidateChunkIndex = lci
-					fullness = chunkFullness
-				}
-			}
-			/*  // this algo generates too many chunks
-			candidateChunkIndex, lowestActivityScore := LogicChunkIndex(-1), int64(math.MaxInt64)
-			for wci, wc := range up.writableChunks {
-				activityScore := wc.ActivityScore()
-				if lowestActivityScore >= activityScore {
-					if lowestActivityScore == activityScore {
-						chunkFullness := wc.WrittenSize()
-						if fullness < chunkFullness {
-							candidateChunkIndex = lci
-							fullness = chunkFullness
-						}
-					}
-					candidateChunkIndex = wci
-					lowestActivityScore = activityScore
-				}
-			}
-			*/
-			up.moveToSealed(up.writableChunks[candidateChunkIndex], candidateChunkIndex)
-			// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness)
+			// Per-pipeline soft cap. Seal the fullest gap-free chunk;
+			// gappy chunks must wait so in-flight FUSE writeback can fill
+			// the gap (issue #9330). If none qualifies, fall through —
+			// the memChunkCounter ceiling below redirects to swap.
+			if candidateChunkIndex := up.fullestContiguousChunk(); candidateChunkIndex >= 0 {
+				up.moveToSealed(up.writableChunks[candidateChunkIndex], candidateChunkIndex)
+			}
 		}
 		// fmt.Printf("isSequential:%v len(up.writableChunks):%v memChunkCounter:%v", isSequential, len(up.writableChunks), memChunkCounter)
 		if isSequential &&
@@ -270,28 +250,68 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic
 	up.chunksLock.Lock()
 }
 
-// EvictOneWritableChunk force-seals the fullest writable chunk in this
-// pipeline, submitting it for async upload. Called by the accountant's
-// evictor when Reserve would block. Returns true if a chunk was sealed.
-// The fullest-chunk heuristic matches the over-limit path in SaveDataAt:
-// sealing the chunk closest to full maximizes the upload's usefulness
-// and avoids thrashing on repeatedly re-creating the same half-empty
-// chunk. Callers must not hold up.chunksLock.
+// fullestContiguousChunk returns the index of the writable chunk with the
+// most written bytes among those that are contiguously written from
+// offset 0, or -1 when none qualifies. Gappy chunks are skipped so that
+// in-flight FUSE writeback can still fill the gap before the chunk is
+// sealed (issue #9330). It iterates up.writableChunks without locking, so
+// the caller must hold up.chunksLock (TODO confirm for SaveDataAt).
+func (up *UploadPipeline) fullestContiguousChunk() LogicChunkIndex {
+	bestIndex, bestBytes := LogicChunkIndex(-1), int64(0)
+	for lci, wc := range up.writableChunks {
+		if !wc.IsContiguouslyWritten() {
+			continue
+		}
+		if b := wc.WrittenSize(); b > bestBytes {
+			bestIndex, bestBytes = lci, b
+		}
+	}
+	return bestIndex
+}
+
+// oldestNonEmptyChunk returns the index of the writable chunk with the
+// smallest non-zero LastWriteTsNs among chunks whose WrittenSize is
+// non-zero, preferring the larger WrittenSize when timestamps tie, or -1
+// when there is no such chunk. It does not look at gaps; it is the
+// liveness fallback for EvictOneWritableChunk. It iterates
+// up.writableChunks without locking, so the caller must hold up.chunksLock.
+func (up *UploadPipeline) oldestNonEmptyChunk() LogicChunkIndex {
+	bestIndex, bestBytes := LogicChunkIndex(-1), int64(0)
+	oldestTsNs := int64(math.MaxInt64)
+	for lci, wc := range up.writableChunks {
+		b := wc.WrittenSize()
+		if b == 0 {
+			continue
+		}
+		ts := wc.LastWriteTsNs()
+		if ts == 0 {
+			continue
+		}
+		if ts < oldestTsNs || (ts == oldestTsNs && b > bestBytes) {
+			bestIndex, oldestTsNs, bestBytes = lci, ts, b
+		}
+	}
+	return bestIndex
+}
+
+// EvictOneWritableChunk force-seals one writable chunk so the accountant's
+// Reserve loop can make progress, and reports whether a chunk was sealed.
+// The strict pass picks the fullest gap-free chunk (issue #9330). The
+// fallback picks the oldest non-empty chunk when nothing is gap-free;
+// without it Reserve deadlocks (cond.Wait only wakes on Release, Release
+// only fires on upload completion). Callers must not hold up.chunksLock.
 func (up *UploadPipeline) EvictOneWritableChunk() bool {
 	up.chunksLock.Lock()
 	defer up.chunksLock.Unlock()
 	if len(up.writableChunks) == 0 {
 		return false
 	}
-	var bestIndex LogicChunkIndex
-	var bestBytes int64 = -1
-	for lci, wc := range up.writableChunks {
-		if b := wc.WrittenSize(); b > bestBytes {
-			bestIndex = lci
-			bestBytes = b
-		}
-	}
-	if bestBytes < 0 {
+
+	bestIndex := up.fullestContiguousChunk()
+	if bestIndex < 0 {
+		bestIndex = up.oldestNonEmptyChunk()
+	}
+	if bestIndex < 0 {
 		return false
 	}
 	up.moveToSealed(up.writableChunks[bestIndex], bestIndex)
@@ -330,6 +350,13 @@ func (up *UploadPipeline) ProactiveFlush(nowNs int64, idleThresholdNs int64, max
 		if age < idleThresholdNs {
 			continue
 		}
+		// Skip gappy chunks; FUSE writeback may still be in flight for the
+		// hole and SaveContent would bake it into split volume chunks
+		// (issue #9330). Failing here is just a missed flush — FlushAll
+		// at close still seals everything.
+		if !chunk.IsContiguouslyWritten() {
+			continue
+		}
 		written := chunk.WrittenSize()
 		nearlyFull := written >= fillRatio
 		behindFrontier := isSeq && int64(lci) <= frontier-int64(frontierLag)
diff --git a/weed/mount/page_writer/upload_pipeline_test.go b/weed/mount/page_writer/upload_pipeline_test.go
index 82f6acf14..d4f95a0b9 100644
--- a/weed/mount/page_writer/upload_pipeline_test.go
+++ b/weed/mount/page_writer/upload_pipeline_test.go
@@ -46,3 +46,168 @@ func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOf
 		}
 	}
 }
+
+// saveOrFatal writes data to the pipeline at file offset off, stamped with
+// tsNs and flagged as sequential, and aborts the calling test if
+// SaveDataAt reports an error.
+func saveOrFatal(t *testing.T, up *UploadPipeline, data []byte, off, tsNs int64) {
+	t.Helper()
+	if _, err := up.SaveDataAt(data, off, true, tsNs); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// stillWritable reports whether logical chunk lci is still present in
+// up.writableChunks.
+func stillWritable(up *UploadPipeline, lci LogicChunkIndex) bool {
+	_, present := up.writableChunks[lci]
+	return present
+}
+
+// Pressure-driven eviction must not seal a chunk with a leading or
+// internal gap (issue #9330).
+func TestEvictOneWritableChunk_SkipsGappyChunks(t *testing.T) {
+	const cs int64 = 2 * 1024 * 1024
+	// nil saveToStorage so the async upload SaveContent is a no-op.
+	up := NewUploadPipeline(util.NewLimitedConcurrentExecutor(2), cs, nil, 16, "", nil)
+	block := make([]byte, cs/4)
+
+	// chunk 0: internal gap; chunk 1: leading gap; chunk 2: contiguous from 0.
+	saveOrFatal(t, up, block, 0, 1)
+	saveOrFatal(t, up, block, 3*cs/4, 2)
+	saveOrFatal(t, up, block, cs+cs/4, 3)
+	saveOrFatal(t, up, block, cs+cs/2, 4)
+	saveOrFatal(t, up, block, 2*cs, 5)
+	saveOrFatal(t, up, block, 2*cs+cs/4, 6)
+
+	if !up.EvictOneWritableChunk() {
+		t.Fatalf("EvictOneWritableChunk returned false; expected contiguous chunk 2 to be evictable")
+	}
+	if stillWritable(up, 2) {
+		t.Errorf("chunk 2 should have moved to sealed")
+	}
+	if !stillWritable(up, 0) {
+		t.Errorf("chunk 0 (internal gap) must remain writable")
+	}
+	if !stillWritable(up, 1) {
+		t.Errorf("chunk 1 (leading gap) must remain writable")
+	}
+
+	// Filling holes makes IsComplete true; maybeMoveToSealed auto-seals.
+	saveOrFatal(t, up, block, cs/4, 7)
+	saveOrFatal(t, up, block, cs/2, 8)
+	saveOrFatal(t, up, block, cs, 9)
+	saveOrFatal(t, up, block, cs+3*cs/4, 10)
+	if stillWritable(up, 0) {
+		t.Errorf("chunk 0 should have auto-sealed after gap was filled")
+	}
+	if stillWritable(up, 1) {
+		t.Errorf("chunk 1 should have auto-sealed after leading range was filled")
+	}
+}
+
+// When every chunk is gappy, the fallback must still seal one so
+// accountant.Reserve can wake; oldest-LastWriteTsNs wins.
+func TestEvictOneWritableChunk_FallbackPicksOldestGappy(t *testing.T) {
+	const cs int64 = 2 * 1024 * 1024
+	up := NewUploadPipeline(util.NewLimitedConcurrentExecutor(2), cs, nil, 16, "", nil)
+	block := make([]byte, cs/4)
+
+	// Strictly increasing tsNs so chunk 0 is oldest; equal WrittenSize.
+	saveOrFatal(t, up, block, 0, 100)
+	saveOrFatal(t, up, block, 3*cs/4, 101)
+	saveOrFatal(t, up, block, cs+cs/4, 200)
+	saveOrFatal(t, up, block, cs+cs/2, 201)
+	saveOrFatal(t, up, block, 2*cs, 300)
+	saveOrFatal(t, up, block, 2*cs+3*cs/4, 301)
+
+	if !up.EvictOneWritableChunk() {
+		t.Fatalf("fallback must seal a gappy chunk to free a Reserve slot")
+	}
+	if stillWritable(up, 0) {
+		t.Errorf("oldest gappy chunk (0) should have been picked by the fallback")
+	}
+	if !stillWritable(up, 1) {
+		t.Errorf("chunk 1 should remain writable")
+	}
+	if !stillWritable(up, 2) {
+		t.Errorf("chunk 2 should remain writable")
+	}
+}
+
+// Strict pass preempts the fallback even when a gappy chunk is older.
+func TestEvictOneWritableChunk_PrefersStrictOverFallback(t *testing.T) {
+	const cs int64 = 2 * 1024 * 1024
+	up := NewUploadPipeline(util.NewLimitedConcurrentExecutor(2), cs, nil, 16, "", nil)
+	block := make([]byte, cs/4)
+
+	// chunk 0 (older, gappy), chunk 1 (newer, contiguous from 0).
+	saveOrFatal(t, up, block, 0, 100)
+	saveOrFatal(t, up, block, 3*cs/4, 101)
+	saveOrFatal(t, up, block, cs, 200)
+	saveOrFatal(t, up, block, cs+cs/4, 201)
+
+	if !up.EvictOneWritableChunk() {
+		t.Fatalf("EvictOneWritableChunk returned false")
+	}
+	if stillWritable(up, 1) {
+		t.Errorf("contiguous chunk 1 must be picked over older gappy chunk 0")
+	}
+	if !stillWritable(up, 0) {
+		t.Errorf("gappy chunk 0 must remain writable")
+	}
+}
+
+// ProactiveFlush also skips gappy chunks (issue #9330). No liveness
+// fallback here — unlike EvictOneWritableChunk, returning false is just
+// a missed optimization.
+func TestProactiveFlush_SkipsGappyChunks(t *testing.T) {
+	const cs int64 = 2 * 1024 * 1024
+	up := NewUploadPipeline(util.NewLimitedConcurrentExecutor(2), cs, nil, 16, "", nil)
+	block := make([]byte, cs/4)
+
+	// chunk 0: internal gap; chunk 1: leading gap; chunk 2: contiguous from 0.
+	saveOrFatal(t, up, block, 0, 1)
+	saveOrFatal(t, up, block, 3*cs/4, 2)
+	saveOrFatal(t, up, block, cs+cs/4, 3)
+	saveOrFatal(t, up, block, cs+cs/2, 4)
+	saveOrFatal(t, up, block, 2*cs, 5)
+	saveOrFatal(t, up, block, 2*cs+cs/4, 6)
+
+	// nowNs >> chunk timestamps so age clears the idle/maxHold gates;
+	// fillRatio < WrittenSize so nearlyFull is true. Leaves
+	// IsContiguouslyWritten as the only differentiator.
+	const (
+		nowNs           int64 = 1_000_000_000
+		idleThresholdNs int64 = 100
+		maxHoldNs       int64 = 200
+		fillRatio       int64 = cs / 8
+	)
+	if !up.ProactiveFlush(nowNs, idleThresholdNs, maxHoldNs, fillRatio, 0, false) {
+		t.Fatalf("ProactiveFlush returned false; expected contiguous chunk 2 to be sealed")
+	}
+	if stillWritable(up, 2) {
+		t.Errorf("chunk 2 should have moved to sealed")
+	}
+	if !stillWritable(up, 0) {
+		t.Errorf("chunk 0 (internal gap) must remain writable")
+	}
+	if !stillWritable(up, 1) {
+		t.Errorf("chunk 1 (leading gap) must remain writable")
+	}
+
+	if up.ProactiveFlush(nowNs, idleThresholdNs, maxHoldNs, fillRatio, 0, false) {
+		t.Errorf("ProactiveFlush returned true with only gappy chunks left")
+	}
+
+	saveOrFatal(t, up, block, cs/4, 7)
+	saveOrFatal(t, up, block, cs/2, 8)
+	saveOrFatal(t, up, block, cs, 9)
+	saveOrFatal(t, up, block, cs+3*cs/4, 10)
+	if stillWritable(up, 0) {
+		t.Errorf("chunk 0 should have auto-sealed after gap was filled")
+	}
+	if stillWritable(up, 1) {
+		t.Errorf("chunk 1 should have auto-sealed after leading range was filled")
+	}
+}