Bound the metadata-log flush queue (#9907)

* Bound the metadata-log flush queue

A stalled flush, e.g. slow volume servers under a reconnect storm, let up
to 256 queued 8MB buffer copies pin two gigabytes per log buffer while
producers kept filling the queue. Cap the queue at 16 so a sustained
stall backpressures writers instead of growing the heap. The flush
goroutine never feeds back into the buffer (system-log paths skip event
notification), so blocked producers cannot deadlock the consumer.

* Don't drop a force-flushed buffer on a full queue

ForceFlush enqueued with a two-second timeout, but by then the live
buffer was already sealed and reset, so a timed-out send silently lost
the copy. Block until the flush is queued; the wait for completion stays
bounded since the data is durable once the flush loop drains it.

* Never close the flush channel

ShutdownLogBuffer closed flushChan while producers could still be
blocked sending into it, which panics. Terminate loopFlush with a nil
sentinel instead, so the channel is never closed, and give every
producer-side send a shutdown escape so none parks forever once the
flush loop exits. Everything queued before the sentinel still drains,
preserving IsAllFlushed semantics.

* Copy the shutdown flush under the buffer lock

Every other copyToFlush call site holds the lock; the shutdown path read
the live buffer unlocked while producers could still be appending.
This commit is contained in:
Chris Lu
2026-06-10 10:57:30 -07:00
committed by GitHub
parent bf76040046
commit 7bf2dfc9ab
+61 -35
View File
@@ -19,6 +19,11 @@ import (
const BufferSize = 8 * 1024 * 1024
const PreviousBufferCount = 4
// flushQueueDepth bounds queued flush copies (BufferSize each); a full queue
// blocks producers, so a stalled flush backpressures writers instead of
// pinning hundreds of buffer copies.
const flushQueueDepth = 16
// Errors that can be returned by log buffer operations
var (
// ErrBufferCorrupted indicates the log buffer contains corrupted data
@@ -103,7 +108,7 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc
ReadFromDiskFn: readFromDiskFn,
notifyFn: notifyFn,
subscribers: make(map[string]chan struct{}),
flushChan: make(chan *dataToFlush, 256),
flushChan: make(chan *dataToFlush, flushQueueDepth),
isStopping: new(atomic.Bool),
shutdownCh: make(chan struct{}),
offset: 0, // Will be initialized from existing data if available
@@ -260,7 +265,11 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) err
defer func() {
logBuffer.Unlock()
if toFlush != nil {
logBuffer.flushChan <- toFlush
select {
case logBuffer.flushChan <- toFlush:
case <-logBuffer.shutdownCh:
// shutting down; loopFlush may be gone, do not park forever
}
}
// Only notify if there was no error
if marshalErr == nil {
@@ -369,7 +378,11 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
defer func() {
logBuffer.Unlock()
if toFlush != nil {
logBuffer.flushChan <- toFlush
select {
case logBuffer.flushChan <- toFlush:
case <-logBuffer.shutdownCh:
// shutting down; loopFlush may be gone, do not park forever
}
}
// Only notify if there was no error
if marshalErr == nil {
@@ -472,18 +485,19 @@ func (logBuffer *LogBuffer) ForceFlush() {
logBuffer.Unlock()
if toFlush != nil {
// Send to flush channel (with reasonable timeout)
// The live buffer was already sealed and reset by copyToFlushWithCallback,
// so dropping toFlush on a timeout would lose it. Block until queued,
// bailing out only on shutdown.
select {
case logBuffer.flushChan <- toFlush:
// Successfully queued for flush - now WAIT for it to complete
select {
case <-toFlush.done:
// Flush completed successfully
case <-time.After(5 * time.Second):
// Timeout waiting for flush - this shouldn't happen
}
case <-time.After(2 * time.Second):
// If flush channel is still blocked after 2s, something is wrong
case <-logBuffer.shutdownCh:
return
}
select {
case <-toFlush.done:
// Flush completed
case <-time.After(5 * time.Second):
// Queued but not yet flushed; loopFlush will still persist it
}
}
}
@@ -498,9 +512,16 @@ func (logBuffer *LogBuffer) ShutdownLogBuffer() {
// notice IsStopping() and exit promptly, even on an idle buffer where no
// flush notification would otherwise fire.
close(logBuffer.shutdownCh)
logBuffer.Lock()
toFlush := logBuffer.copyToFlush()
logBuffer.flushChan <- toFlush
close(logBuffer.flushChan)
logBuffer.Unlock()
if toFlush != nil {
logBuffer.flushChan <- toFlush
}
// nil is the shutdown sentinel: loopFlush drains everything queued before
// it and exits. The channel is never closed, so a sender racing shutdown
// can never panic on a closed channel.
logBuffer.flushChan <- nil
}
// IsAllFlushed returns true if all data in the buffer has been flushed, after calling ShutdownLogBuffer().
@@ -510,27 +531,28 @@ func (logBuffer *LogBuffer) IsAllFlushed() bool {
func (logBuffer *LogBuffer) loopFlush() {
for d := range logBuffer.flushChan {
if d != nil {
logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes(), d.minOffset, d.maxOffset)
d.releaseMemory()
// local logbuffer is different from aggregate logbuffer here
if d.maxOffset >= 0 {
logBuffer.lastFlushedOffset.Store(d.maxOffset)
}
if !d.stopTime.IsZero() {
logBuffer.lastFlushTsNs.Store(d.stopTime.UnixNano())
}
if d == nil {
break // shutdown sentinel
}
logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes(), d.minOffset, d.maxOffset)
d.releaseMemory()
// local logbuffer is different from aggregate logbuffer here
if d.maxOffset >= 0 {
logBuffer.lastFlushedOffset.Store(d.maxOffset)
}
if !d.stopTime.IsZero() {
logBuffer.lastFlushTsNs.Store(d.stopTime.UnixNano())
}
// Wake readers that may be waiting to retry disk reads after the flush lands.
if logBuffer.notifyFn != nil {
logBuffer.notifyFn()
}
logBuffer.notifySubscribers()
// Wake readers that may be waiting to retry disk reads after the flush lands.
if logBuffer.notifyFn != nil {
logBuffer.notifyFn()
}
logBuffer.notifySubscribers()
// Signal completion if there's a callback channel
if d.done != nil {
close(d.done)
}
// Signal completion if there's a callback channel
if d.done != nil {
close(d.done)
}
}
logBuffer.isAllFlushed = true
@@ -547,7 +569,11 @@ func (logBuffer *LogBuffer) loopInterval() {
toFlush := logBuffer.copyToFlush()
logBuffer.Unlock()
if toFlush != nil {
logBuffer.flushChan <- toFlush
select {
case logBuffer.flushChan <- toFlush:
case <-logBuffer.shutdownCh:
// shutting down; loopFlush may be gone, do not park forever
}
}
}
}