diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 13724fee3..c63ebbfab 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -19,6 +19,11 @@ import ( const BufferSize = 8 * 1024 * 1024 const PreviousBufferCount = 4 +// flushQueueDepth bounds queued flush copies (BufferSize each); a full queue +// blocks producers, so a stalled flush backpressures writers instead of +// pinning hundreds of buffer copies. +const flushQueueDepth = 16 + // Errors that can be returned by log buffer operations var ( // ErrBufferCorrupted indicates the log buffer contains corrupted data @@ -103,7 +108,7 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc ReadFromDiskFn: readFromDiskFn, notifyFn: notifyFn, subscribers: make(map[string]chan struct{}), - flushChan: make(chan *dataToFlush, 256), + flushChan: make(chan *dataToFlush, flushQueueDepth), isStopping: new(atomic.Bool), shutdownCh: make(chan struct{}), offset: 0, // Will be initialized from existing data if available @@ -260,7 +265,11 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) err defer func() { logBuffer.Unlock() if toFlush != nil { - logBuffer.flushChan <- toFlush + select { + case logBuffer.flushChan <- toFlush: + case <-logBuffer.shutdownCh: + // shutting down; loopFlush may be gone, do not park forever + } } // Only notify if there was no error if marshalErr == nil { @@ -369,7 +378,11 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin defer func() { logBuffer.Unlock() if toFlush != nil { - logBuffer.flushChan <- toFlush + select { + case logBuffer.flushChan <- toFlush: + case <-logBuffer.shutdownCh: + // shutting down; loopFlush may be gone, do not park forever + } } // Only notify if there was no error if marshalErr == nil { @@ -472,18 +485,19 @@ func (logBuffer *LogBuffer) ForceFlush() { logBuffer.Unlock() if toFlush != nil { - // Send to flush channel (with reasonable timeout) + // The live buffer was already sealed and reset by copyToFlushWithCallback, + // so dropping toFlush on a timeout would lose it. Block until queued, + // bailing out only on shutdown. select { case logBuffer.flushChan <- toFlush: - // Successfully queued for flush - now WAIT for it to complete - select { - case <-toFlush.done: - // Flush completed successfully - case <-time.After(5 * time.Second): - // Timeout waiting for flush - this shouldn't happen - } - case <-time.After(2 * time.Second): - // If flush channel is still blocked after 2s, something is wrong + case <-logBuffer.shutdownCh: + return + } + select { + case <-toFlush.done: + // Flush completed + case <-time.After(5 * time.Second): + // Queued but not yet flushed; loopFlush will still persist it } } } @@ -498,9 +512,16 @@ func (logBuffer *LogBuffer) ShutdownLogBuffer() { // notice IsStopping() and exit promptly, even on an idle buffer where no // flush notification would otherwise fire. close(logBuffer.shutdownCh) + logBuffer.Lock() toFlush := logBuffer.copyToFlush() - logBuffer.flushChan <- toFlush - close(logBuffer.flushChan) + logBuffer.Unlock() + if toFlush != nil { + logBuffer.flushChan <- toFlush + } + // nil is the shutdown sentinel: loopFlush drains everything queued before + // it and exits. The channel is never closed, so a sender racing shutdown + // can never panic on a closed channel. + logBuffer.flushChan <- nil } // IsAllFlushed returns true if all data in the buffer has been flushed, after calling ShutdownLogBuffer(). @@ -510,27 +531,28 @@ func (logBuffer *LogBuffer) IsAllFlushed() bool { func (logBuffer *LogBuffer) loopFlush() { for d := range logBuffer.flushChan { - if d != nil { - logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes(), d.minOffset, d.maxOffset) - d.releaseMemory() - // local logbuffer is different from aggregate logbuffer here - if d.maxOffset >= 0 { - logBuffer.lastFlushedOffset.Store(d.maxOffset) - } - if !d.stopTime.IsZero() { - logBuffer.lastFlushTsNs.Store(d.stopTime.UnixNano()) - } + if d == nil { + break // shutdown sentinel + } + logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes(), d.minOffset, d.maxOffset) + d.releaseMemory() + // local logbuffer is different from aggregate logbuffer here + if d.maxOffset >= 0 { + logBuffer.lastFlushedOffset.Store(d.maxOffset) + } + if !d.stopTime.IsZero() { + logBuffer.lastFlushTsNs.Store(d.stopTime.UnixNano()) + } - // Wake readers that may be waiting to retry disk reads after the flush lands. - if logBuffer.notifyFn != nil { - logBuffer.notifyFn() - } - logBuffer.notifySubscribers() + // Wake readers that may be waiting to retry disk reads after the flush lands. + if logBuffer.notifyFn != nil { + logBuffer.notifyFn() + } + logBuffer.notifySubscribers() - // Signal completion if there's a callback channel - if d.done != nil { - close(d.done) - } + // Signal completion if there's a callback channel + if d.done != nil { + close(d.done) } } logBuffer.isAllFlushed = true @@ -547,7 +569,11 @@ func (logBuffer *LogBuffer) loopInterval() { toFlush := logBuffer.copyToFlush() logBuffer.Unlock() if toFlush != nil { - logBuffer.flushChan <- toFlush + select { + case logBuffer.flushChan <- toFlush: + case <-logBuffer.shutdownCh: + // shutting down; loopFlush may be gone, do not park forever + } } } }