diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go index ce0d34b71..a7b7d5926 100644 --- a/weed/mount/dirty_pages_chunked.go +++ b/weed/mount/dirty_pages_chunked.go @@ -92,6 +92,10 @@ func (pages *ChunkedDirtyPages) EvictOneWritableChunk() bool { return pages.uploadPipeline.EvictOneWritableChunk() } +func (pages *ChunkedDirtyPages) ProactiveFlush(nowNs, idleThresholdNs, maxHoldNs, fillRatio int64, frontierLag int, isSequential bool) bool { + return pages.uploadPipeline.ProactiveFlush(nowNs, idleThresholdNs, maxHoldNs, fillRatio, frontierLag, isSequential) +} + func (pages *ChunkedDirtyPages) LockForRead(startOffset, stopOffset int64) { pages.uploadPipeline.LockForRead(startOffset, stopOffset) } diff --git a/weed/mount/page_writer.go b/weed/mount/page_writer.go index fd262c557..4fb1d773b 100644 --- a/weed/mount/page_writer.go +++ b/weed/mount/page_writer.go @@ -85,6 +85,10 @@ func (pw *PageWriter) EvictOneWritableChunk() bool { return pw.randomWriter.EvictOneWritableChunk() } +func (pw *PageWriter) ProactiveFlush(nowNs, idleThresholdNs, maxHoldNs, fillRatio int64, frontierLag int, isSequential bool) bool { + return pw.randomWriter.ProactiveFlush(nowNs, idleThresholdNs, maxHoldNs, fillRatio, frontierLag, isSequential) +} + func max(x, y int64) int64 { if x > y { return x diff --git a/weed/mount/page_writer/activity_score.go b/weed/mount/page_writer/activity_score.go index 22da87e37..cec9a791a 100644 --- a/weed/mount/page_writer/activity_score.go +++ b/weed/mount/page_writer/activity_score.go @@ -1,39 +1,52 @@ package page_writer -import "time" +import ( + "sync/atomic" + "time" +) type ActivityScore struct { - lastActiveTsNs int64 - decayedActivenessScore int64 + lastActiveTsNs int64 // atomic + decayedActivenessScore int64 // atomic } func NewActivityScore() *ActivityScore { return &ActivityScore{} } -func (as ActivityScore) MarkRead() { +func (as *ActivityScore) MarkRead() { now := time.Now().UnixNano() - deltaTime := (now - as.lastActiveTsNs) >> 30 // about number of seconds - as.lastActiveTsNs = now - - as.decayedActivenessScore = as.decayedActivenessScore>>deltaTime + 256 - if as.decayedActivenessScore < 0 { - as.decayedActivenessScore = 0 + last := atomic.SwapInt64(&as.lastActiveTsNs, now) + deltaTime := (now - last) >> 30 // about number of seconds + for { + old := atomic.LoadInt64(&as.decayedActivenessScore) + score := old>>deltaTime + 256 + if score < 0 { + score = 0 + } + if atomic.CompareAndSwapInt64(&as.decayedActivenessScore, old, score) { + break + } } } -func (as ActivityScore) MarkWrite() { +func (as *ActivityScore) MarkWrite() { now := time.Now().UnixNano() - deltaTime := (now - as.lastActiveTsNs) >> 30 // about number of seconds - as.lastActiveTsNs = now - - as.decayedActivenessScore = as.decayedActivenessScore>>deltaTime + 1024 - if as.decayedActivenessScore < 0 { - as.decayedActivenessScore = 0 + last := atomic.SwapInt64(&as.lastActiveTsNs, now) + deltaTime := (now - last) >> 30 // about number of seconds + for { + old := atomic.LoadInt64(&as.decayedActivenessScore) + score := old>>deltaTime + 1024 + if score < 0 { + score = 0 + } + if atomic.CompareAndSwapInt64(&as.decayedActivenessScore, old, score) { + break + } } } -func (as ActivityScore) ActivityScore() int64 { - deltaTime := (time.Now().UnixNano() - as.lastActiveTsNs) >> 30 // about number of seconds - return as.decayedActivenessScore >> deltaTime +func (as *ActivityScore) ActivityScore() int64 { + deltaTime := (time.Now().UnixNano() - atomic.LoadInt64(&as.lastActiveTsNs)) >> 30 + return atomic.LoadInt64(&as.decayedActivenessScore) >> deltaTime } diff --git a/weed/mount/page_writer/dirty_pages.go b/weed/mount/page_writer/dirty_pages.go index 8c32a0be1..04a5f4dc0 100644 --- a/weed/mount/page_writer/dirty_pages.go +++ b/weed/mount/page_writer/dirty_pages.go @@ -8,6 +8,7 @@ type DirtyPages interface { LockForRead(startOffset, stopOffset int64) UnlockForRead(startOffset, stopOffset int64) EvictOneWritableChunk() bool + ProactiveFlush(nowNs, idleThresholdNs, maxHoldNs, fillRatio int64, frontierLag int, isSequential bool) bool } func max(x, y int64) int64 { diff --git a/weed/mount/page_writer/page_chunk.go b/weed/mount/page_writer/page_chunk.go index ac1d24622..246ef8a71 100644 --- a/weed/mount/page_writer/page_chunk.go +++ b/weed/mount/page_writer/page_chunk.go @@ -13,5 +13,6 @@ type PageChunk interface { IsComplete() bool ActivityScore() int64 WrittenSize() int64 + LastWriteTsNs() int64 SaveContent(saveFn SaveToStorageFunc) } diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go index 45922b2ff..a12227187 100644 --- a/weed/mount/page_writer/page_chunk_mem.go +++ b/weed/mount/page_writer/page_chunk_mem.go @@ -21,6 +21,7 @@ type MemChunk struct { chunkSize int64 logicChunkIndex LogicChunkIndex activityScore *ActivityScore + lastWriteTsNs atomic.Int64 } func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk { @@ -50,6 +51,7 @@ func (mc *MemChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) { n = copy(mc.buf[innerOffset:], src) mc.usage.MarkWritten(innerOffset, innerOffset+int64(n), tsNs) mc.activityScore.MarkWrite() + mc.lastWriteTsNs.Store(tsNs) return } @@ -90,6 +92,10 @@ func (mc *MemChunk) WrittenSize() int64 { return mc.usage.WrittenSize() } +func (mc *MemChunk) LastWriteTsNs() int64 { + return mc.lastWriteTsNs.Load() +} + func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) { mc.RLock() defer mc.RUnlock() diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go index bf60be67a..eab1bdcf9 100644 --- a/weed/mount/page_writer/page_chunk_swapfile.go +++ b/weed/mount/page_writer/page_chunk_swapfile.go @@ -4,6 +4,7 @@ import ( "io" "os" "sync" + "sync/atomic" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/util" @@ -32,6 +33,7 @@ type SwapFileChunk struct { logicChunkIndex LogicChunkIndex actualChunkIndex ActualChunkIndex activityScore *ActivityScore + lastWriteTsNs atomic.Int64 //memChunk *MemChunk } @@ -124,6 +126,7 @@ func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n in } //sc.memChunk.WriteDataAt(src, offset, tsNs) sc.activityScore.MarkWrite() + sc.lastWriteTsNs.Store(tsNs) return } @@ -179,6 +182,10 @@ func (sc *SwapFileChunk) WrittenSize() int64 { return sc.usage.WrittenSize() } +func (sc *SwapFileChunk) LastWriteTsNs() int64 { + return sc.lastWriteTsNs.Load() +} + func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) { sc.RLock() defer sc.RUnlock() diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go index ea3a15f39..6b5d8b293 100644 --- a/weed/mount/page_writer/upload_pipeline.go +++ b/weed/mount/page_writer/upload_pipeline.go @@ -12,20 +12,22 @@ import ( type LogicChunkIndex int type UploadPipeline struct { - uploaderCount int32 - uploaderCountCond *sync.Cond - filepath util.FullPath - ChunkSize int64 - uploaders *util.LimitedConcurrentExecutor - saveToStorageFn SaveToStorageFunc - writableChunkLimit int - swapFile *SwapFile - chunksLock sync.Mutex - writableChunks map[LogicChunkIndex]PageChunk - sealedChunks map[LogicChunkIndex]*SealedChunk - activeReadChunks map[LogicChunkIndex]int - readerCountCond *sync.Cond - accountant *WriteBufferAccountant + uploaderCount int32 + uploaderCountCond *sync.Cond + filepath util.FullPath + ChunkSize int64 + uploaders *util.LimitedConcurrentExecutor + saveToStorageFn SaveToStorageFunc + writableChunkLimit int + concurrentWriterMax int32 + swapFile *SwapFile + chunksLock sync.Mutex + writableChunks map[LogicChunkIndex]PageChunk + sealedChunks map[LogicChunkIndex]*SealedChunk + activeReadChunks map[LogicChunkIndex]int + readerCountCond *sync.Cond + accountant *WriteBufferAccountant + lastWriteChunkIndex int64 // atomic: highest LogicChunkIndex written } type SealedChunk struct { @@ -63,16 +65,17 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) { // any synchronization. func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int, swapFileDir string, accountant *WriteBufferAccountant) *UploadPipeline { t := &UploadPipeline{ - ChunkSize: chunkSize, - writableChunks: make(map[LogicChunkIndex]PageChunk), - sealedChunks: make(map[LogicChunkIndex]*SealedChunk), - uploaders: writers, - uploaderCountCond: sync.NewCond(&sync.Mutex{}), - saveToStorageFn: saveToStorageFn, - activeReadChunks: make(map[LogicChunkIndex]int), - writableChunkLimit: bufferChunkLimit, - swapFile: NewSwapFile(swapFileDir, chunkSize), - accountant: accountant, + ChunkSize: chunkSize, + writableChunks: make(map[LogicChunkIndex]PageChunk), + sealedChunks: make(map[LogicChunkIndex]*SealedChunk), + uploaders: writers, + uploaderCountCond: sync.NewCond(&sync.Mutex{}), + saveToStorageFn: saveToStorageFn, + activeReadChunks: make(map[LogicChunkIndex]int), + writableChunkLimit: bufferChunkLimit, + concurrentWriterMax: int32(bufferChunkLimit), + swapFile: NewSwapFile(swapFileDir, chunkSize), + accountant: accountant, } t.readerCountCond = sync.NewCond(&t.chunksLock) return t @@ -85,6 +88,17 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsN logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) + // track write frontier for proactive flushing (CAS to avoid regression) + for { + old := atomic.LoadInt64(&up.lastWriteChunkIndex) + if int64(logicChunkIndex) <= old { + break + } + if atomic.CompareAndSwapInt64(&up.lastWriteChunkIndex, old, int64(logicChunkIndex)) { + break + } + } + pageChunk, found := up.writableChunks[logicChunkIndex] if !found { // Reserve a chunk-sized slot against the global write budget before @@ -284,6 +298,59 @@ func (up *UploadPipeline) EvictOneWritableChunk() bool { return true } +// ProactiveFlush seals at most one idle writable chunk that is unlikely to +// receive further writes, submitting it for async upload. Returns true if a +// chunk was sealed. The caller (ChunkFlusher) invokes this periodically so +// that partially-written chunks drain continuously instead of piling up +// until fsync. +func (up *UploadPipeline) ProactiveFlush(nowNs int64, idleThresholdNs int64, maxHoldNs int64, fillRatio int64, frontierLag int, isSequential bool) bool { + if up.concurrentWriterMax <= 0 || atomic.LoadInt32(&up.uploaderCount)*2 >= up.concurrentWriterMax { + return false + } + + up.chunksLock.Lock() + defer up.chunksLock.Unlock() + + if len(up.writableChunks) == 0 { + return false + } + + frontier := atomic.LoadInt64(&up.lastWriteChunkIndex) + isSeq := isSequential + + var bestIdx LogicChunkIndex = -1 + var bestBytes int64 = -1 + + for lci, chunk := range up.writableChunks { + lastWrite := chunk.LastWriteTsNs() + if lastWrite == 0 { + continue + } + age := nowNs - lastWrite + if age < idleThresholdNs { + continue + } + written := chunk.WrittenSize() + nearlyFull := written >= fillRatio + behindFrontier := isSeq && int64(lci) <= frontier-int64(frontierLag) + stale := age >= maxHoldNs + + if !nearlyFull && !behindFrontier && !stale { + continue + } + if written > bestBytes { + bestIdx = lci + bestBytes = written + } + } + if bestIdx < 0 { + return false + } + glog.V(3).Infof("%s proactive flush chunk %d (%d bytes written)", up.filepath, bestIdx, bestBytes) + up.moveToSealed(up.writableChunks[bestIdx], bestIdx) + return true +} + func (up *UploadPipeline) Shutdown() { up.swapFile.FreeResource() diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index e6564c93e..6c4fa0de0 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -376,6 +376,7 @@ func (wfs *WFS) StartBackgroundTasks() error { go wfs.loopCheckQuota() go wfs.loopFlushDirtyMetadata() go wfs.loopEvictIdleDirCache() + go wfs.loopProactiveFlush() if wfs.option.WritebackCache { wfs.fileIdPool = NewFileIdPool(wfs) diff --git a/weed/mount/weedfs_chunk_flusher.go b/weed/mount/weedfs_chunk_flusher.go new file mode 100644 index 000000000..d98503429 --- /dev/null +++ b/weed/mount/weedfs_chunk_flusher.go @@ -0,0 +1,48 @@ +package mount + +import ( + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" +) + +const ( + proactiveFlushInterval = 200 * time.Millisecond + proactiveIdleThreshold = 500 * time.Millisecond + proactiveMaxHoldTime = 5 * time.Second + proactiveFillRatioNumer = 9 + proactiveFillRatioDenom = 10 + proactiveFrontierLag = 2 +) + +func (wfs *WFS) loopProactiveFlush() { + ticker := time.NewTicker(proactiveFlushInterval) + defer ticker.Stop() + + glog.V(0).Infof("proactive chunk flusher started (idle=%v maxHold=%v)", proactiveIdleThreshold, proactiveMaxHoldTime) + + for range ticker.C { + wfs.proactiveFlushOnce() + } +} + +func (wfs *WFS) proactiveFlushOnce() { + nowNs := time.Now().UnixNano() + idleNs := proactiveIdleThreshold.Nanoseconds() + maxHoldNs := proactiveMaxHoldTime.Nanoseconds() + fillRatio := wfs.option.ChunkSizeLimit * proactiveFillRatioNumer / proactiveFillRatioDenom + + var handles []*FileHandle + wfs.fhMap.RLock() + for _, fh := range wfs.fhMap.inode2fh { + if fh != nil && fh.dirtyPages != nil { + handles = append(handles, fh) + } + } + wfs.fhMap.RUnlock() + + for _, fh := range handles { + isSeq := fh.dirtyPages.writerPattern.IsSequentialMode() + fh.dirtyPages.ProactiveFlush(nowNs, idleNs, maxHoldNs, fillRatio, proactiveFrontierLag, isSeq) + } +}