Share decoded metadata-log entries across subscriber replays (#9903)

perf(filer): share decoded log entries across metadata replays

Concurrent SubscribeMetadata replays of the same persisted log history each
opened a chunk reader per source filer and re-decoded the same files, so a
reconnect storm multiplied into many GB of buffers. Cache the decoded entries
of completed log files in a bounded LRU, coalescing concurrent loads with
single-flight and bounding concurrent decodes. Each hit is validated against
the file's current chunk set, so a file that received a late append is
reloaded rather than served stale; reads that stop on an unreachable chunk are
delivered but not cached so a transient outage re-probes on the next replay.
This commit is contained in:
Chris Lu
2026-06-09 13:34:11 -07:00
committed by GitHub
parent e12052ee6b
commit 9e98ec4b2e
4 changed files with 530 additions and 8 deletions
+2
View File
@@ -64,6 +64,7 @@ type Filer struct {
DeletionRetryQueue *DeletionRetryQueue
EmptyFolderCleaner *empty_folder_cleanup.EmptyFolderCleaner
EmptyFolderCleanupDelay time.Duration
persistedLogCache *persistedLogCache
}
func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, maxFilenameLength uint32, notifyFn func()) *Filer {
@@ -78,6 +79,7 @@ func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerH
MaxFilenameLength: maxFilenameLength,
deletionQuit: make(chan struct{}),
DeletionRetryQueue: NewDeletionRetryQueue(),
persistedLogCache: newPersistedLogCache(persistedLogCacheMaxBytes),
}
if f.UniqueFilerId < 0 {
f.UniqueFilerId = -f.UniqueFilerId
+60 -8
View File
@@ -300,7 +300,7 @@ func (c *LogFileEntryCollector) collectMore(v *OrderedLogVisitor) (err error) {
}
iter, found := v.perFilerIteratorMap[filerId]
if !found {
iter = newLogFileQueueIterator(c.f.MasterClient, util.NewQueue[*LogFileEntry](), c.startTsNs, c.stopTsNs)
iter = newLogFileQueueIterator(c.f.MasterClient, c.f.persistedLogCache, util.NewQueue[*LogFileEntry](), c.startTsNs, c.stopTsNs)
v.perFilerIteratorMap[filerId] = iter
freshFilerIds[filerId] = hourMinuteEntry.Name()
}
@@ -340,15 +340,17 @@ func (c *LogFileEntryCollector) collectMore(v *OrderedLogVisitor) (err error) {
type LogFileQueueIterator struct {
q *util.Queue[*LogFileEntry]
masterClient *wdclient.MasterClient
cache *persistedLogCache
startTsNs int64
stopTsNs int64
currentFileIterator *LogFileIterator
}
func newLogFileQueueIterator(masterClient *wdclient.MasterClient, q *util.Queue[*LogFileEntry], startTsNs, stopTsNs int64) *LogFileQueueIterator {
func newLogFileQueueIterator(masterClient *wdclient.MasterClient, cache *persistedLogCache, q *util.Queue[*LogFileEntry], startTsNs, stopTsNs int64) *LogFileQueueIterator {
return &LogFileQueueIterator{
q: q,
masterClient: masterClient,
cache: cache,
startTsNs: startTsNs,
stopTsNs: stopTsNs,
}
@@ -418,27 +420,48 @@ func (iter *LogFileQueueIterator) getNext(v *OrderedLogVisitor) (logEntry *filer
if next != nil && next.TsNs <= iter.startTsNs {
continue
}
iter.currentFileIterator = newLogFileIterator(iter.masterClient, t.FileEntry, iter.startTsNs, iter.stopTsNs)
iter.currentFileIterator = newLogFileIterator(iter.masterClient, iter.cache, t.FileEntry, t.TsNs, iter.startTsNs, iter.stopTsNs)
}
}
// ----------
type LogFileIterator struct {
r io.Reader
sizeBuf []byte
// streaming mode (current/incomplete file): one entry read at a time.
r io.Reader
sizeBuf []byte
// cached mode (completed file): iterate the shared decoded slice. Entries are
// read-only and shared across subscribers, so they are never mutated here.
cached []*filer_pb.LogEntry
cachedPos int
loadErr error
startTsNs int64
stopTsNs int64
filePath string
}
func newLogFileIterator(masterClient *wdclient.MasterClient, fileEntry *Entry, startTsNs, stopTsNs int64) *LogFileIterator {
func newLogFileIterator(masterClient *wdclient.MasterClient, cache *persistedLogCache, fileEntry *Entry, fileTsNs, startTsNs, stopTsNs int64) *LogFileIterator {
filePath := string(fileEntry.FullPath)
if cache != nil && logFileIsCacheable(fileTsNs) {
chunks := fileEntry.GetChunks()
fingerprint := chunksFingerprint(chunks)
entries, err := cache.getOrLoad(filePath, fingerprint, func() ([]*filer_pb.LogEntry, bool, error) {
return loadLogFileEntries(masterClient, filePath, chunks)
})
return &LogFileIterator{
cached: entries,
loadErr: err,
startTsNs: startTsNs,
stopTsNs: stopTsNs,
filePath: filePath,
}
}
return &LogFileIterator{
r: NewChunkStreamReaderFromFiler(context.Background(), masterClient, fileEntry.Chunks),
sizeBuf: make([]byte, 4),
startTsNs: startTsNs,
stopTsNs: stopTsNs,
filePath: string(fileEntry.FullPath),
filePath: filePath,
}
}
@@ -451,6 +474,9 @@ func (iter *LogFileIterator) Close() error {
// getNext will return io.EOF when done
func (iter *LogFileIterator) getNext() (logEntry *filer_pb.LogEntry, err error) {
if iter.r == nil {
return iter.getNextCached()
}
var n int
for {
n, err = iter.r.Read(iter.sizeBuf)
@@ -461,7 +487,9 @@ func (iter *LogFileIterator) getNext() (logEntry *filer_pb.LogEntry, err error)
return nil, fmt.Errorf("size %d bytes, expected 4 bytes", n)
}
size := util.BytesToUint32(iter.sizeBuf)
// println("entry size", size)
if size > maxLogEntrySize {
return nil, fmt.Errorf("%s entry size %d exceeds %d", iter.filePath, size, maxLogEntrySize)
}
entryData := make([]byte, size)
n, err = iter.r.Read(entryData)
if err != nil {
@@ -483,3 +511,27 @@ func (iter *LogFileIterator) getNext() (logEntry *filer_pb.LogEntry, err error)
return
}
}
// getNextCached yields entries from the shared cached slice, applying the same
// per-subscriber timestamp filtering as the streaming path. Any load error is
// surfaced only after the read entries are yielded, so a partial read delivers
// its prefix before failing, just like the streaming path.
func (iter *LogFileIterator) getNextCached() (logEntry *filer_pb.LogEntry, err error) {
for iter.cachedPos < len(iter.cached) {
logEntry = iter.cached[iter.cachedPos]
iter.cachedPos++
if logEntry.TsNs <= iter.startTsNs {
continue
}
if iter.stopTsNs != 0 && logEntry.TsNs > iter.stopTsNs {
return nil, io.EOF
}
return logEntry, nil
}
if iter.loadErr != nil {
err = iter.loadErr
iter.loadErr = nil
return nil, err
}
return nil, io.EOF
}
+242
View File
@@ -0,0 +1,242 @@
package filer
import (
"container/list"
"context"
"fmt"
"io"
"sync"
"time"
"golang.org/x/sync/singleflight"
"google.golang.org/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
const (
// persistedLogCacheMaxBytes bounds retained entries regardless of subscriber
// count.
persistedLogCacheMaxBytes = 256 << 20
// persistedLogCacheMinAge keeps the actively-written current file out of the
// cache to avoid reload churn. It is a heuristic, not a correctness gate:
// every hit is validated against the file's current chunk set (a fingerprint),
// so a file that turns out to have grown is reloaded rather than served stale.
persistedLogCacheMinAge = 2 * LogFlushInterval
// persistedLogCacheMaxLoads bounds how many distinct files decode at once, so
// the transient peak (an 8MB chunk reader plus the decoded slice per load)
// stays bounded no matter how many positions the subscriber fleet spans.
persistedLogCacheMaxLoads = 8
// maxLogEntrySize guards the per-entry allocation against a corrupt size
// prefix. The write path caps a single entry well under this, so it never
// rejects a legitimate event.
maxLogEntrySize = 1 << 30
)
// persistedLogCache shares the decoded entries of completed metadata log files
// across concurrent SubscribeMetadata replays, so N replays of the same history
// read and decode each file once instead of N times. Cached entries are shared
// read-only; callers must not mutate them.
type persistedLogCache struct {
mu sync.Mutex
ll *list.List // front = most recently used; values are *logCacheItem
index map[string]*list.Element
curBytes int64
maxBytes int64
sf singleflight.Group
loadSem chan struct{}
}
type logCacheItem struct {
key string
// fingerprint identifies the file's chunk set when it was decoded. A cached
// item is only served when the caller's fingerprint still matches; an
// appended-to file (e.g. a delayed flush landed after the file was first read)
// produces a new fingerprint and is reloaded.
fingerprint string
entries []*filer_pb.LogEntry
bytes int64
}
func newPersistedLogCache(maxBytes int64) *persistedLogCache {
return &persistedLogCache{
ll: list.New(),
index: make(map[string]*list.Element),
maxBytes: maxBytes,
loadSem: make(chan struct{}, persistedLogCacheMaxLoads),
}
}
// logFileIsCacheable reports whether a log file identified by its minute
// timestamp is old enough that caching it is worthwhile (it is unlikely to still
// be receiving appends). Correctness does not depend on this being exact.
func logFileIsCacheable(fileTsNs int64) bool {
return time.Since(time.Unix(0, fileTsNs)) > persistedLogCacheMinAge
}
// chunksFingerprint summarizes a log file's append-only chunk set; it changes
// whenever the file grows.
func chunksFingerprint(chunks []*filer_pb.FileChunk) string {
if len(chunks) == 0 {
return "empty"
}
var total uint64
for _, c := range chunks {
total += c.Size
}
return fmt.Sprintf("%d/%d/%s", len(chunks), total, chunks[len(chunks)-1].GetFileIdString())
}
// getOrLoad returns the decoded entries for key, loading them once on miss and
// coalescing concurrent misses for the same (key, fingerprint). A cached item
// whose fingerprint no longer matches is treated as a miss and replaced. The
// returned slice is shared and must be treated as read-only.
type logLoadResult struct {
entries []*filer_pb.LogEntry
err error
}
// load returns the decoded entries, whether the result is stable enough to
// cache, and any error. A read that stopped on a missing chunk is delivered to
// this caller but not cached: the truncation point depends on transient volume
// availability, not on the (cached) fingerprint, so it must be re-probed on
// later replays. On a genuine error the partially-read entries are still
// returned alongside the error so the caller can deliver them before failing,
// matching the streaming path; only a clean, complete read is cached.
func (c *persistedLogCache) getOrLoad(key, fingerprint string, load func() ([]*filer_pb.LogEntry, bool, error)) ([]*filer_pb.LogEntry, error) {
if entries, ok := c.lookup(key, fingerprint); ok {
return entries, nil
}
v, _, _ := c.sf.Do(key+"\x00"+fingerprint, func() (interface{}, error) {
if entries, ok := c.lookup(key, fingerprint); ok {
return logLoadResult{entries: entries}, nil
}
entries, cacheable, loadErr := c.loadGuarded(load)
if loadErr == nil && cacheable {
c.store(key, fingerprint, entries)
}
return logLoadResult{entries: entries, err: loadErr}, nil
})
res := v.(logLoadResult)
return res.entries, res.err
}
// loadGuarded runs load under the concurrent-load semaphore, releasing the slot
// via defer so an abnormal exit cannot leak slots and wedge future loads.
func (c *persistedLogCache) loadGuarded(load func() ([]*filer_pb.LogEntry, bool, error)) ([]*filer_pb.LogEntry, bool, error) {
c.loadSem <- struct{}{}
defer func() { <-c.loadSem }()
return load()
}
func (c *persistedLogCache) lookup(key, fingerprint string) ([]*filer_pb.LogEntry, bool) {
c.mu.Lock()
defer c.mu.Unlock()
el, ok := c.index[key]
if !ok {
return nil, false
}
item := el.Value.(*logCacheItem)
if item.fingerprint != fingerprint {
// The file changed (grew) since it was cached; drop the stale snapshot so
// the caller reloads and stores the current version.
c.removeElement(el)
return nil, false
}
c.ll.MoveToFront(el)
return item.entries, true
}
func (c *persistedLogCache) store(key, fingerprint string, entries []*filer_pb.LogEntry) {
bytes := estimateEntriesBytes(entries)
if bytes > c.maxBytes {
// A single file larger than the whole budget would evict everything else
// and still not fit; serve it this round but do not retain it.
return
}
c.mu.Lock()
defer c.mu.Unlock()
if el, ok := c.index[key]; ok {
c.removeElement(el) // replace any older version of this file
}
el := c.ll.PushFront(&logCacheItem{key: key, fingerprint: fingerprint, entries: entries, bytes: bytes})
c.index[key] = el
c.curBytes += bytes
// Evict least-recently-used until under budget, but always keep what we just
// inserted (a single file larger than the budget still serves this round).
for c.curBytes > c.maxBytes && c.ll.Len() > 1 {
c.removeElement(c.ll.Back())
}
}
// removeElement drops an element from both the list and the index. Caller holds mu.
func (c *persistedLogCache) removeElement(el *list.Element) {
item := el.Value.(*logCacheItem)
c.ll.Remove(el)
delete(c.index, item.key)
c.curBytes -= item.bytes
}
func estimateEntriesBytes(entries []*filer_pb.LogEntry) int64 {
// LogEntry struct (~112B) + slice slot (8B) + a little slack, plus the
// payload backing arrays. Deliberately generous so curBytes does not run
// under the real retained heap.
total := int64(len(entries)) * 128
for _, e := range entries {
total += int64(len(e.Data)+len(e.Key)) + 16
}
return total
}
// loadLogFileEntries reads a whole persisted log file from volume servers and
// decodes every LogEntry. The second result reports whether the read reached a
// clean end (cacheable). A chunk-not-found stop returns the readable prefix with
// cacheable=false: the file's chunk list (the cache fingerprint) is unchanged by
// a momentarily-unreachable volume, so caching the prefix would pin a truncation
// that a transient outage should self-heal. Such reads mirror the streaming
// path for this round but are re-probed on the next replay instead.
func loadLogFileEntries(masterClient *wdclient.MasterClient, filePath string, chunks []*filer_pb.FileChunk) (entries []*filer_pb.LogEntry, cacheable bool, err error) {
r := NewChunkStreamReaderFromFiler(context.Background(), masterClient, chunks)
defer r.Close()
sizeBuf := make([]byte, 4)
for {
if _, readErr := io.ReadFull(r, sizeBuf); readErr != nil {
if readErr == io.EOF {
// clean end exactly at a record boundary: every record was read
return entries, true, nil
}
if readErr == io.ErrUnexpectedEOF {
// stopped mid-frame (a partial size prefix): ambiguous, possibly a
// transient short read, so deliver the prefix but do not cache it
glog.V(1).Infof("log file %s ends mid-record, not caching", filePath)
return entries, false, nil
}
if isChunkNotFoundError(readErr) {
glog.V(1).Infof("log file %s has an unreadable chunk, not caching: %v", filePath, readErr)
return entries, false, nil
}
return entries, false, readErr
}
size := util.BytesToUint32(sizeBuf)
if size > maxLogEntrySize {
return entries, false, fmt.Errorf("log file %s entry size %d exceeds %d", filePath, size, maxLogEntrySize)
}
entryData := make([]byte, size)
if _, readErr := io.ReadFull(r, entryData); readErr != nil {
if isChunkNotFoundError(readErr) {
glog.V(1).Infof("log file %s has an unreadable chunk, not caching: %v", filePath, readErr)
return entries, false, nil
}
return entries, false, readErr
}
logEntry := &filer_pb.LogEntry{}
if unmarshalErr := proto.Unmarshal(entryData, logEntry); unmarshalErr != nil {
return entries, false, unmarshalErr
}
entries = append(entries, logEntry)
}
}
+226
View File
@@ -0,0 +1,226 @@
package filer
import (
"fmt"
"io"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
func logEntriesAt(tsNs ...int64) []*filer_pb.LogEntry {
out := make([]*filer_pb.LogEntry, 0, len(tsNs))
for _, ts := range tsNs {
out = append(out, &filer_pb.LogEntry{TsNs: ts, Data: []byte("x")})
}
return out
}
func TestPersistedLogCacheHitMiss(t *testing.T) {
c := newPersistedLogCache(persistedLogCacheMaxBytes)
var loads int32
load := func() ([]*filer_pb.LogEntry, bool, error) {
atomic.AddInt32(&loads, 1)
return logEntriesAt(1, 2, 3), true, nil
}
e1, err := c.getOrLoad("k", "fp", load)
if err != nil || len(e1) != 3 {
t.Fatalf("first getOrLoad: err=%v len=%d", err, len(e1))
}
e2, err := c.getOrLoad("k", "fp", load)
if err != nil {
t.Fatal(err)
}
if n := atomic.LoadInt32(&loads); n != 1 {
t.Fatalf("expected loader called once, got %d", n)
}
if &e1[0] != &e2[0] {
t.Fatal("cache should return the same shared slice")
}
}
func TestPersistedLogCacheFingerprintReload(t *testing.T) {
c := newPersistedLogCache(persistedLogCacheMaxBytes)
var loads int32
loadV1 := func() ([]*filer_pb.LogEntry, bool, error) {
atomic.AddInt32(&loads, 1)
return logEntriesAt(1, 2), true, nil
}
loadV2 := func() ([]*filer_pb.LogEntry, bool, error) {
atomic.AddInt32(&loads, 1)
return logEntriesAt(1, 2, 3), true, nil // a delayed append grew the file
}
if e, _ := c.getOrLoad("k", "fp1", loadV1); len(e) != 2 {
t.Fatalf("v1 len=%d", len(e))
}
// a later replay observes the grown file (new fingerprint): it must reload,
// not serve the stale truncated snapshot.
e, err := c.getOrLoad("k", "fp2", loadV2)
if err != nil {
t.Fatal(err)
}
if len(e) != 3 {
t.Fatalf("expected reload to 3 entries, got %d", len(e))
}
if n := atomic.LoadInt32(&loads); n != 2 {
t.Fatalf("expected one reload (2 loads), got %d", n)
}
// the cache now holds the new version; the matching fingerprint hits
if _, err := c.getOrLoad("k", "fp2", loadV2); err != nil {
t.Fatal(err)
}
if n := atomic.LoadInt32(&loads); n != 2 {
t.Fatalf("expected fp2 to hit (still 2 loads), got %d", n)
}
}
func TestPersistedLogCacheNotCachedWhenUncacheable(t *testing.T) {
c := newPersistedLogCache(persistedLogCacheMaxBytes)
var loads int32
// cacheable=false models a chunk-not-found stop: deliver the prefix this
// round but never pin it, so a transient outage is re-probed next replay.
load := func() ([]*filer_pb.LogEntry, bool, error) {
atomic.AddInt32(&loads, 1)
return logEntriesAt(1, 2), false, nil
}
if e, err := c.getOrLoad("k", "fp", load); err != nil || len(e) != 2 {
t.Fatalf("first: err=%v len=%d", err, len(e))
}
if e, err := c.getOrLoad("k", "fp", load); err != nil || len(e) != 2 {
t.Fatalf("second: err=%v len=%d", err, len(e))
}
if n := atomic.LoadInt32(&loads); n != 2 {
t.Fatalf("uncacheable result must re-load every time, got %d loads", n)
}
}
func TestChunksFingerprintChangesOnGrowth(t *testing.T) {
c1 := []*filer_pb.FileChunk{{FileId: "3,01", Size: 100}}
c2 := []*filer_pb.FileChunk{{FileId: "3,01", Size: 100}, {FileId: "3,02", Size: 50}}
if chunksFingerprint(c1) == chunksFingerprint(c2) {
t.Fatal("fingerprint must change when a chunk is appended")
}
}
func TestPersistedLogCacheSingleFlight(t *testing.T) {
c := newPersistedLogCache(persistedLogCacheMaxBytes)
var loads int32
release := make(chan struct{})
load := func() ([]*filer_pb.LogEntry, bool, error) {
atomic.AddInt32(&loads, 1)
<-release // hold the flight open so concurrent callers coalesce
return logEntriesAt(1), true, nil
}
const n = 20
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if _, err := c.getOrLoad("k", "fp", load); err != nil {
t.Error(err)
}
}()
}
time.Sleep(50 * time.Millisecond)
close(release)
wg.Wait()
if got := atomic.LoadInt32(&loads); got != 1 {
t.Fatalf("expected 1 coalesced load across %d callers, got %d", n, got)
}
}
func TestPersistedLogCacheEviction(t *testing.T) {
// One entry estimates to 128 + 100 + 16 = 244 bytes; budget holds one, not two.
c := newPersistedLogCache(300)
mk := func(ts int64) func() ([]*filer_pb.LogEntry, bool, error) {
return func() ([]*filer_pb.LogEntry, bool, error) {
return []*filer_pb.LogEntry{{TsNs: ts, Data: make([]byte, 100)}}, true, nil
}
}
if _, err := c.getOrLoad("a", "fp", mk(1)); err != nil {
t.Fatal(err)
}
if _, err := c.getOrLoad("b", "fp", mk(2)); err != nil { // pushes over budget, evicts LRU "a"
t.Fatal(err)
}
c.mu.Lock()
_, hasA := c.index["a"]
_, hasB := c.index["b"]
c.mu.Unlock()
if hasA {
t.Error("least-recently-used entry a should have been evicted")
}
if !hasB {
t.Error("most-recently-used entry b should remain")
}
}
func TestLogFileIsCacheable(t *testing.T) {
now := time.Now()
if logFileIsCacheable(now.UnixNano()) {
t.Error("a current-minute file must not be cacheable")
}
old := now.Add(-persistedLogCacheMinAge - time.Minute).UnixNano()
if !logFileIsCacheable(old) {
t.Error("a file older than the min age should be cacheable")
}
}
func TestLogFileIteratorCachedFiltering(t *testing.T) {
iter := &LogFileIterator{
cached: logEntriesAt(10, 20, 30, 40),
startTsNs: 15,
stopTsNs: 35,
}
var got []int64
for {
e, err := iter.getNext()
if err == io.EOF {
break
}
if err != nil {
t.Fatal(err)
}
got = append(got, e.TsNs)
}
want := []int64{20, 30} // 10 filtered by startTsNs, 40 cut off by stopTsNs
if fmt.Sprint(got) != fmt.Sprint(want) {
t.Fatalf("cached filtering: got %v, want %v", got, want)
}
}
func TestLogFileIteratorCachedLoadError(t *testing.T) {
iter := &LogFileIterator{loadErr: fmt.Errorf("boom")}
if _, err := iter.getNext(); err == nil || err.Error() != "boom" {
t.Fatalf("expected load error surfaced on first getNext, got %v", err)
}
}
func TestLogFileIteratorCachedYieldsBeforeError(t *testing.T) {
iter := &LogFileIterator{cached: logEntriesAt(10, 20), loadErr: fmt.Errorf("boom")}
var got []int64
for {
e, err := iter.getNext()
if err != nil {
if err.Error() != "boom" {
t.Fatalf("expected boom after entries, got %v", err)
}
break
}
got = append(got, e.TsNs)
}
if fmt.Sprint(got) != fmt.Sprint([]int64{10, 20}) {
t.Fatalf("partial read must yield entries before the error, got %v", got)
}
}