feat(mount): proactive flush of idle writable chunks (#9094)

* feat(mount): proactive flush of idle writable chunks

Add a background goroutine that periodically scans writable chunks
across all open file handles and seals those that are idle and
unlikely to receive further writes, submitting them for async upload.

A writable chunk is proactively flushed when it has been idle for
500ms AND meets one of: nearly full (>=90%), behind the sequential
write frontier by 2+ chunks, or stale for 5+ seconds. Flushing only
happens when the upload pipeline has spare capacity (< half of
concurrent writer slots in use).

This prevents partial chunks from accumulating until fsync/close,
which is particularly beneficial for bursty or small-file workloads
where chunks may never reach IsComplete().

Also fixes a latent bug in ActivityScore where MarkRead/MarkWrite
used value receivers, silently discarding all mutations.

* refactor(mount): reuse WriterPattern instead of duplicating sequential detection

Remove the isSequential atomic from UploadPipeline. The proactive
flusher now reads IsSequentialMode() from the existing WriterPattern
on PageWriter and passes it as a parameter to ProactiveFlush. This
avoids duplicating the sequential/random detection that WriterPattern
already maintains.

* fix(mount): address PR review feedback

- Make ActivityScore thread-safe using atomics (CAS loop for score
  updates, atomic load/swap for timestamp). Previously MarkRead was
  called under RLock while MarkWrite held a write lock, creating a
  data race on the shared fields.

- Fix ProactiveFlush half-capacity guard: use multiplication
  (uploaderCount*2 >= max) instead of floor division (max/2) which
  misbehaves for odd or small concurrentWriterMax values.

* fix(mount): review fixes for proactive flush

- Fix TOCTOU race in lastWriteChunkIndex update: use CAS loop so
  concurrent writers cannot regress the frontier.
- Remove unused UploaderCount() getter.
- Reuse the caller-provided tsNs instead of calling time.Now() again
  in WriteDataAt for lastWriteTsNs, eliminating a redundant syscall
  per write.
This commit is contained in:
Chris Lu
2026-04-16 00:44:24 -07:00
committed by GitHub
parent 46b801aedb
commit d7865909ba
10 changed files with 196 additions and 44 deletions
+4
View File
@@ -92,6 +92,10 @@ func (pages *ChunkedDirtyPages) EvictOneWritableChunk() bool {
return pages.uploadPipeline.EvictOneWritableChunk()
}
func (pages *ChunkedDirtyPages) ProactiveFlush(nowNs, idleThresholdNs, maxHoldNs, fillRatio int64, frontierLag int, isSequential bool) bool {
return pages.uploadPipeline.ProactiveFlush(nowNs, idleThresholdNs, maxHoldNs, fillRatio, frontierLag, isSequential)
}
func (pages *ChunkedDirtyPages) LockForRead(startOffset, stopOffset int64) {
pages.uploadPipeline.LockForRead(startOffset, stopOffset)
}
+4
View File
@@ -85,6 +85,10 @@ func (pw *PageWriter) EvictOneWritableChunk() bool {
return pw.randomWriter.EvictOneWritableChunk()
}
func (pw *PageWriter) ProactiveFlush(nowNs, idleThresholdNs, maxHoldNs, fillRatio int64, frontierLag int, isSequential bool) bool {
return pw.randomWriter.ProactiveFlush(nowNs, idleThresholdNs, maxHoldNs, fillRatio, frontierLag, isSequential)
}
func max(x, y int64) int64 {
if x > y {
return x
+33 -20
View File
@@ -1,39 +1,52 @@
package page_writer
import "time"
import (
"sync/atomic"
"time"
)
type ActivityScore struct {
lastActiveTsNs int64
decayedActivenessScore int64
lastActiveTsNs int64 // atomic
decayedActivenessScore int64 // atomic
}
func NewActivityScore() *ActivityScore {
return &ActivityScore{}
}
func (as ActivityScore) MarkRead() {
func (as *ActivityScore) MarkRead() {
now := time.Now().UnixNano()
deltaTime := (now - as.lastActiveTsNs) >> 30 // about number of seconds
as.lastActiveTsNs = now
as.decayedActivenessScore = as.decayedActivenessScore>>deltaTime + 256
if as.decayedActivenessScore < 0 {
as.decayedActivenessScore = 0
last := atomic.SwapInt64(&as.lastActiveTsNs, now)
deltaTime := (now - last) >> 30 // about number of seconds
for {
old := atomic.LoadInt64(&as.decayedActivenessScore)
score := old>>deltaTime + 256
if score < 0 {
score = 0
}
if atomic.CompareAndSwapInt64(&as.decayedActivenessScore, old, score) {
break
}
}
}
func (as ActivityScore) MarkWrite() {
func (as *ActivityScore) MarkWrite() {
now := time.Now().UnixNano()
deltaTime := (now - as.lastActiveTsNs) >> 30 // about number of seconds
as.lastActiveTsNs = now
as.decayedActivenessScore = as.decayedActivenessScore>>deltaTime + 1024
if as.decayedActivenessScore < 0 {
as.decayedActivenessScore = 0
last := atomic.SwapInt64(&as.lastActiveTsNs, now)
deltaTime := (now - last) >> 30 // about number of seconds
for {
old := atomic.LoadInt64(&as.decayedActivenessScore)
score := old>>deltaTime + 1024
if score < 0 {
score = 0
}
if atomic.CompareAndSwapInt64(&as.decayedActivenessScore, old, score) {
break
}
}
}
func (as ActivityScore) ActivityScore() int64 {
deltaTime := (time.Now().UnixNano() - as.lastActiveTsNs) >> 30 // about number of seconds
return as.decayedActivenessScore >> deltaTime
func (as *ActivityScore) ActivityScore() int64 {
deltaTime := (time.Now().UnixNano() - atomic.LoadInt64(&as.lastActiveTsNs)) >> 30
return atomic.LoadInt64(&as.decayedActivenessScore) >> deltaTime
}
+1
View File
@@ -8,6 +8,7 @@ type DirtyPages interface {
LockForRead(startOffset, stopOffset int64)
UnlockForRead(startOffset, stopOffset int64)
EvictOneWritableChunk() bool
ProactiveFlush(nowNs, idleThresholdNs, maxHoldNs, fillRatio int64, frontierLag int, isSequential bool) bool
}
func max(x, y int64) int64 {
+1
View File
@@ -13,5 +13,6 @@ type PageChunk interface {
IsComplete() bool
ActivityScore() int64
WrittenSize() int64
LastWriteTsNs() int64
SaveContent(saveFn SaveToStorageFunc)
}
+6
View File
@@ -21,6 +21,7 @@ type MemChunk struct {
chunkSize int64
logicChunkIndex LogicChunkIndex
activityScore *ActivityScore
lastWriteTsNs atomic.Int64
}
func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk {
@@ -50,6 +51,7 @@ func (mc *MemChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) {
n = copy(mc.buf[innerOffset:], src)
mc.usage.MarkWritten(innerOffset, innerOffset+int64(n), tsNs)
mc.activityScore.MarkWrite()
mc.lastWriteTsNs.Store(tsNs)
return
}
@@ -90,6 +92,10 @@ func (mc *MemChunk) WrittenSize() int64 {
return mc.usage.WrittenSize()
}
func (mc *MemChunk) LastWriteTsNs() int64 {
return mc.lastWriteTsNs.Load()
}
func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
mc.RLock()
defer mc.RUnlock()
@@ -4,6 +4,7 @@ import (
"io"
"os"
"sync"
"sync/atomic"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
@@ -32,6 +33,7 @@ type SwapFileChunk struct {
logicChunkIndex LogicChunkIndex
actualChunkIndex ActualChunkIndex
activityScore *ActivityScore
lastWriteTsNs atomic.Int64
//memChunk *MemChunk
}
@@ -124,6 +126,7 @@ func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n in
}
//sc.memChunk.WriteDataAt(src, offset, tsNs)
sc.activityScore.MarkWrite()
sc.lastWriteTsNs.Store(tsNs)
return
}
@@ -179,6 +182,10 @@ func (sc *SwapFileChunk) WrittenSize() int64 {
return sc.usage.WrittenSize()
}
func (sc *SwapFileChunk) LastWriteTsNs() int64 {
return sc.lastWriteTsNs.Load()
}
func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
sc.RLock()
defer sc.RUnlock()
+91 -24
View File
@@ -12,20 +12,22 @@ import (
type LogicChunkIndex int
type UploadPipeline struct {
uploaderCount int32
uploaderCountCond *sync.Cond
filepath util.FullPath
ChunkSize int64
uploaders *util.LimitedConcurrentExecutor
saveToStorageFn SaveToStorageFunc
writableChunkLimit int
swapFile *SwapFile
chunksLock sync.Mutex
writableChunks map[LogicChunkIndex]PageChunk
sealedChunks map[LogicChunkIndex]*SealedChunk
activeReadChunks map[LogicChunkIndex]int
readerCountCond *sync.Cond
accountant *WriteBufferAccountant
uploaderCount int32
uploaderCountCond *sync.Cond
filepath util.FullPath
ChunkSize int64
uploaders *util.LimitedConcurrentExecutor
saveToStorageFn SaveToStorageFunc
writableChunkLimit int
concurrentWriterMax int32
swapFile *SwapFile
chunksLock sync.Mutex
writableChunks map[LogicChunkIndex]PageChunk
sealedChunks map[LogicChunkIndex]*SealedChunk
activeReadChunks map[LogicChunkIndex]int
readerCountCond *sync.Cond
accountant *WriteBufferAccountant
lastWriteChunkIndex int64 // atomic: highest LogicChunkIndex written
}
type SealedChunk struct {
@@ -63,16 +65,17 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) {
// any synchronization.
func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int, swapFileDir string, accountant *WriteBufferAccountant) *UploadPipeline {
t := &UploadPipeline{
ChunkSize: chunkSize,
writableChunks: make(map[LogicChunkIndex]PageChunk),
sealedChunks: make(map[LogicChunkIndex]*SealedChunk),
uploaders: writers,
uploaderCountCond: sync.NewCond(&sync.Mutex{}),
saveToStorageFn: saveToStorageFn,
activeReadChunks: make(map[LogicChunkIndex]int),
writableChunkLimit: bufferChunkLimit,
swapFile: NewSwapFile(swapFileDir, chunkSize),
accountant: accountant,
ChunkSize: chunkSize,
writableChunks: make(map[LogicChunkIndex]PageChunk),
sealedChunks: make(map[LogicChunkIndex]*SealedChunk),
uploaders: writers,
uploaderCountCond: sync.NewCond(&sync.Mutex{}),
saveToStorageFn: saveToStorageFn,
activeReadChunks: make(map[LogicChunkIndex]int),
writableChunkLimit: bufferChunkLimit,
concurrentWriterMax: int32(bufferChunkLimit),
swapFile: NewSwapFile(swapFileDir, chunkSize),
accountant: accountant,
}
t.readerCountCond = sync.NewCond(&t.chunksLock)
return t
@@ -85,6 +88,17 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsN
logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
// track write frontier for proactive flushing (CAS to avoid regression)
for {
old := atomic.LoadInt64(&up.lastWriteChunkIndex)
if int64(logicChunkIndex) <= old {
break
}
if atomic.CompareAndSwapInt64(&up.lastWriteChunkIndex, old, int64(logicChunkIndex)) {
break
}
}
pageChunk, found := up.writableChunks[logicChunkIndex]
if !found {
// Reserve a chunk-sized slot against the global write budget before
@@ -284,6 +298,59 @@ func (up *UploadPipeline) EvictOneWritableChunk() bool {
return true
}
// ProactiveFlush seals at most one idle writable chunk that is unlikely to
// receive further writes, submitting it for async upload. Returns true if a
// chunk was sealed. The caller (ChunkFlusher) invokes this periodically so
// that partially-written chunks drain continuously instead of piling up
// until fsync.
func (up *UploadPipeline) ProactiveFlush(nowNs int64, idleThresholdNs int64, maxHoldNs int64, fillRatio int64, frontierLag int, isSequential bool) bool {
if up.concurrentWriterMax <= 0 || atomic.LoadInt32(&up.uploaderCount)*2 >= up.concurrentWriterMax {
return false
}
up.chunksLock.Lock()
defer up.chunksLock.Unlock()
if len(up.writableChunks) == 0 {
return false
}
frontier := atomic.LoadInt64(&up.lastWriteChunkIndex)
isSeq := isSequential
var bestIdx LogicChunkIndex = -1
var bestBytes int64 = -1
for lci, chunk := range up.writableChunks {
lastWrite := chunk.LastWriteTsNs()
if lastWrite == 0 {
continue
}
age := nowNs - lastWrite
if age < idleThresholdNs {
continue
}
written := chunk.WrittenSize()
nearlyFull := written >= fillRatio
behindFrontier := isSeq && int64(lci) <= frontier-int64(frontierLag)
stale := age >= maxHoldNs
if !nearlyFull && !behindFrontier && !stale {
continue
}
if written > bestBytes {
bestIdx = lci
bestBytes = written
}
}
if bestIdx < 0 {
return false
}
glog.V(3).Infof("%s proactive flush chunk %d (%d bytes written)", up.filepath, bestIdx, bestBytes)
up.moveToSealed(up.writableChunks[bestIdx], bestIdx)
return true
}
func (up *UploadPipeline) Shutdown() {
up.swapFile.FreeResource()
+1
View File
@@ -376,6 +376,7 @@ func (wfs *WFS) StartBackgroundTasks() error {
go wfs.loopCheckQuota()
go wfs.loopFlushDirtyMetadata()
go wfs.loopEvictIdleDirCache()
go wfs.loopProactiveFlush()
if wfs.option.WritebackCache {
wfs.fileIdPool = NewFileIdPool(wfs)
+48
View File
@@ -0,0 +1,48 @@
package mount
import (
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
)
const (
proactiveFlushInterval = 200 * time.Millisecond
proactiveIdleThreshold = 500 * time.Millisecond
proactiveMaxHoldTime = 5 * time.Second
proactiveFillRatioNumer = 9
proactiveFillRatioDenom = 10
proactiveFrontierLag = 2
)
func (wfs *WFS) loopProactiveFlush() {
ticker := time.NewTicker(proactiveFlushInterval)
defer ticker.Stop()
glog.V(0).Infof("proactive chunk flusher started (idle=%v maxHold=%v)", proactiveIdleThreshold, proactiveMaxHoldTime)
for range ticker.C {
wfs.proactiveFlushOnce()
}
}
func (wfs *WFS) proactiveFlushOnce() {
nowNs := time.Now().UnixNano()
idleNs := proactiveIdleThreshold.Nanoseconds()
maxHoldNs := proactiveMaxHoldTime.Nanoseconds()
fillRatio := wfs.option.ChunkSizeLimit * proactiveFillRatioNumer / proactiveFillRatioDenom
var handles []*FileHandle
wfs.fhMap.RLock()
for _, fh := range wfs.fhMap.inode2fh {
if fh != nil && fh.dirtyPages != nil {
handles = append(handles, fh)
}
}
wfs.fhMap.RUnlock()
for _, fh := range handles {
isSeq := fh.dirtyPages.writerPattern.IsSequentialMode()
fh.dirtyPages.ProactiveFlush(nowNs, idleNs, maxHoldNs, fillRatio, proactiveFrontierLag, isSeq)
}
}