From 7bf2dfc9ab59b7e6c438ac8f0df4f48ecbb258d2 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 10 Jun 2026 10:57:30 -0700 Subject: [PATCH] Bound the metadata-log flush queue (#9907) * Bound the metadata-log flush queue A stalled flush, e.g. slow volume servers under a reconnect storm, let up to 256 queued 8MB buffer copies pin two gigabytes per log buffer while producers kept filling the queue. Cap the queue at 16 so a sustained stall backpressures writers instead of growing the heap. The flush goroutine never feeds back into the buffer (system-log paths skip event notification), so blocked producers cannot deadlock the consumer. * Don't drop a force-flushed buffer on a full queue ForceFlush enqueued with a two-second timeout, but by then the live buffer was already sealed and reset, so a timed-out send silently lost the copy. Block until the flush is queued; the wait for completion stays bounded since the data is durable once the flush loop drains it. * Never close the flush channel ShutdownLogBuffer closed flushChan while producers could still be blocked sending into it, which panics. Terminate loopFlush with a nil sentinel instead, so the channel is never closed, and give every producer-side send a shutdown escape so none parks forever once the flush loop exits. Everything queued before the sentinel still drains, preserving IsAllFlushed semantics. * Copy the shutdown flush under the buffer lock Every other copyToFlush call site holds the lock; the shutdown path read the live buffer unlocked while producers could still be appending. --- weed/util/log_buffer/log_buffer.go | 96 +++++++++++++++++++----------- 1 file changed, 61 insertions(+), 35 deletions(-) diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 13724fee3..c63ebbfab 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -19,6 +19,11 @@ import ( const BufferSize = 8 * 1024 * 1024 const PreviousBufferCount = 4 +// flushQueueDepth bounds queued flush copies (BufferSize each); a full queue +// blocks producers, so a stalled flush backpressures writers instead of +// pinning hundreds of buffer copies. +const flushQueueDepth = 16 + // Errors that can be returned by log buffer operations var ( // ErrBufferCorrupted indicates the log buffer contains corrupted data @@ -103,7 +108,7 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc ReadFromDiskFn: readFromDiskFn, notifyFn: notifyFn, subscribers: make(map[string]chan struct{}), - flushChan: make(chan *dataToFlush, 256), + flushChan: make(chan *dataToFlush, flushQueueDepth), isStopping: new(atomic.Bool), shutdownCh: make(chan struct{}), offset: 0, // Will be initialized from existing data if available @@ -260,7 +265,11 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) err defer func() { logBuffer.Unlock() if toFlush != nil { - logBuffer.flushChan <- toFlush + select { + case logBuffer.flushChan <- toFlush: + case <-logBuffer.shutdownCh: + // shutting down; loopFlush may be gone, do not park forever + } } // Only notify if there was no error if marshalErr == nil { @@ -369,7 +378,11 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin defer func() { logBuffer.Unlock() if toFlush != nil { - logBuffer.flushChan <- toFlush + select { + case logBuffer.flushChan <- toFlush: + case <-logBuffer.shutdownCh: + // shutting down; loopFlush may be gone, do not park forever + } } // Only notify if there was no error if marshalErr == nil { @@ -472,18 +485,19 @@ func (logBuffer *LogBuffer) ForceFlush() { logBuffer.Unlock() if toFlush != nil { - // Send to flush channel (with reasonable timeout) + // The live buffer was already sealed and reset by copyToFlushWithCallback, + // so dropping toFlush on a timeout would lose it. Block until queued, + // bailing out only on shutdown. select { case logBuffer.flushChan <- toFlush: - // Successfully queued for flush - now WAIT for it to complete - select { - case <-toFlush.done: - // Flush completed successfully - case <-time.After(5 * time.Second): - // Timeout waiting for flush - this shouldn't happen - } - case <-time.After(2 * time.Second): - // If flush channel is still blocked after 2s, something is wrong + case <-logBuffer.shutdownCh: + return + } + select { + case <-toFlush.done: + // Flush completed + case <-time.After(5 * time.Second): + // Queued but not yet flushed; loopFlush will still persist it } } } @@ -498,9 +512,16 @@ func (logBuffer *LogBuffer) ShutdownLogBuffer() { // notice IsStopping() and exit promptly, even on an idle buffer where no // flush notification would otherwise fire. close(logBuffer.shutdownCh) + logBuffer.Lock() toFlush := logBuffer.copyToFlush() - logBuffer.flushChan <- toFlush - close(logBuffer.flushChan) + logBuffer.Unlock() + if toFlush != nil { + logBuffer.flushChan <- toFlush + } + // nil is the shutdown sentinel: loopFlush drains everything queued before + // it and exits. The channel is never closed, so a sender racing shutdown + // can never panic on a closed channel. + logBuffer.flushChan <- nil } // IsAllFlushed returns true if all data in the buffer has been flushed, after calling ShutdownLogBuffer(). @@ -510,27 +531,28 @@ func (logBuffer *LogBuffer) IsAllFlushed() bool { func (logBuffer *LogBuffer) loopFlush() { for d := range logBuffer.flushChan { - if d != nil { - logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes(), d.minOffset, d.maxOffset) - d.releaseMemory() - // local logbuffer is different from aggregate logbuffer here - if d.maxOffset >= 0 { - logBuffer.lastFlushedOffset.Store(d.maxOffset) - } - if !d.stopTime.IsZero() { - logBuffer.lastFlushTsNs.Store(d.stopTime.UnixNano()) - } + if d == nil { + break // shutdown sentinel + } + logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes(), d.minOffset, d.maxOffset) + d.releaseMemory() + // local logbuffer is different from aggregate logbuffer here + if d.maxOffset >= 0 { + logBuffer.lastFlushedOffset.Store(d.maxOffset) + } + if !d.stopTime.IsZero() { + logBuffer.lastFlushTsNs.Store(d.stopTime.UnixNano()) + } - // Wake readers that may be waiting to retry disk reads after the flush lands. - if logBuffer.notifyFn != nil { - logBuffer.notifyFn() - } - logBuffer.notifySubscribers() + // Wake readers that may be waiting to retry disk reads after the flush lands. + if logBuffer.notifyFn != nil { + logBuffer.notifyFn() + } + logBuffer.notifySubscribers() - // Signal completion if there's a callback channel - if d.done != nil { - close(d.done) - } + // Signal completion if there's a callback channel + if d.done != nil { + close(d.done) } } logBuffer.isAllFlushed = true @@ -547,7 +569,11 @@ func (logBuffer *LogBuffer) loopInterval() { toFlush := logBuffer.copyToFlush() logBuffer.Unlock() if toFlush != nil { - logBuffer.flushChan <- toFlush + select { + case logBuffer.flushChan <- toFlush: + case <-logBuffer.shutdownCh: + // shutting down; loopFlush may be gone, do not park forever + } } } }