From 9e98ec4b2e17dc4277e4c53e9f02ab21ed1899b0 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 9 Jun 2026 13:34:11 -0700 Subject: [PATCH] Share decoded metadata-log entries across subscriber replays (#9903) perf(filer): share decoded log entries across metadata replays Concurrent SubscribeMetadata replays of the same persisted log history each opened a chunk reader per source filer and re-decoded the same files, so a reconnect storm multiplied into many GB of buffers. Cache the decoded entries of completed log files in a bounded LRU, coalescing concurrent loads with single-flight and bounding concurrent decodes. Each hit is validated against the file's current chunk set, so a file that received a late append is reloaded rather than served stale; reads that stop on an unreachable chunk are delivered but not cached so a transient outage re-probes on the next replay. --- weed/filer/filer.go | 2 + weed/filer/filer_notify_read.go | 68 ++++++- weed/filer/persisted_log_cache.go | 242 +++++++++++++++++++++++++ weed/filer/persisted_log_cache_test.go | 226 +++++++++++++++++++++++ 4 files changed, 530 insertions(+), 8 deletions(-) create mode 100644 weed/filer/persisted_log_cache.go create mode 100644 weed/filer/persisted_log_cache_test.go diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 750862751..9e13898ae 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -64,6 +64,7 @@ type Filer struct { DeletionRetryQueue *DeletionRetryQueue EmptyFolderCleaner *empty_folder_cleanup.EmptyFolderCleaner EmptyFolderCleanupDelay time.Duration + persistedLogCache *persistedLogCache } func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, maxFilenameLength uint32, notifyFn func()) *Filer { @@ -78,6 +79,7 @@ func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerH MaxFilenameLength: maxFilenameLength, deletionQuit: make(chan 
struct{}), DeletionRetryQueue: NewDeletionRetryQueue(), + persistedLogCache: newPersistedLogCache(persistedLogCacheMaxBytes), } if f.UniqueFilerId < 0 { f.UniqueFilerId = -f.UniqueFilerId diff --git a/weed/filer/filer_notify_read.go b/weed/filer/filer_notify_read.go index c5c08caa8..695e38af5 100644 --- a/weed/filer/filer_notify_read.go +++ b/weed/filer/filer_notify_read.go @@ -300,7 +300,7 @@ func (c *LogFileEntryCollector) collectMore(v *OrderedLogVisitor) (err error) { } iter, found := v.perFilerIteratorMap[filerId] if !found { - iter = newLogFileQueueIterator(c.f.MasterClient, util.NewQueue[*LogFileEntry](), c.startTsNs, c.stopTsNs) + iter = newLogFileQueueIterator(c.f.MasterClient, c.f.persistedLogCache, util.NewQueue[*LogFileEntry](), c.startTsNs, c.stopTsNs) v.perFilerIteratorMap[filerId] = iter freshFilerIds[filerId] = hourMinuteEntry.Name() } @@ -340,15 +340,17 @@ func (c *LogFileEntryCollector) collectMore(v *OrderedLogVisitor) (err error) { type LogFileQueueIterator struct { q *util.Queue[*LogFileEntry] masterClient *wdclient.MasterClient + cache *persistedLogCache startTsNs int64 stopTsNs int64 currentFileIterator *LogFileIterator } -func newLogFileQueueIterator(masterClient *wdclient.MasterClient, q *util.Queue[*LogFileEntry], startTsNs, stopTsNs int64) *LogFileQueueIterator { +func newLogFileQueueIterator(masterClient *wdclient.MasterClient, cache *persistedLogCache, q *util.Queue[*LogFileEntry], startTsNs, stopTsNs int64) *LogFileQueueIterator { return &LogFileQueueIterator{ q: q, masterClient: masterClient, + cache: cache, startTsNs: startTsNs, stopTsNs: stopTsNs, } @@ -418,27 +420,48 @@ func (iter *LogFileQueueIterator) getNext(v *OrderedLogVisitor) (logEntry *filer if next != nil && next.TsNs <= iter.startTsNs { continue } - iter.currentFileIterator = newLogFileIterator(iter.masterClient, t.FileEntry, iter.startTsNs, iter.stopTsNs) + iter.currentFileIterator = newLogFileIterator(iter.masterClient, iter.cache, t.FileEntry, t.TsNs, iter.startTsNs, 
iter.stopTsNs) } } // ---------- type LogFileIterator struct { - r io.Reader - sizeBuf []byte + // streaming mode (current/incomplete file): one entry read at a time. + r io.Reader + sizeBuf []byte + // cached mode (completed file): iterate the shared decoded slice. Entries are + // read-only and shared across subscribers, so they are never mutated here. + cached []*filer_pb.LogEntry + cachedPos int + loadErr error startTsNs int64 stopTsNs int64 filePath string } -func newLogFileIterator(masterClient *wdclient.MasterClient, fileEntry *Entry, startTsNs, stopTsNs int64) *LogFileIterator { +func newLogFileIterator(masterClient *wdclient.MasterClient, cache *persistedLogCache, fileEntry *Entry, fileTsNs, startTsNs, stopTsNs int64) *LogFileIterator { + filePath := string(fileEntry.FullPath) + if cache != nil && logFileIsCacheable(fileTsNs) { + chunks := fileEntry.GetChunks() + fingerprint := chunksFingerprint(chunks) + entries, err := cache.getOrLoad(filePath, fingerprint, func() ([]*filer_pb.LogEntry, bool, error) { + return loadLogFileEntries(masterClient, filePath, chunks) + }) + return &LogFileIterator{ + cached: entries, + loadErr: err, + startTsNs: startTsNs, + stopTsNs: stopTsNs, + filePath: filePath, + } + } return &LogFileIterator{ r: NewChunkStreamReaderFromFiler(context.Background(), masterClient, fileEntry.Chunks), sizeBuf: make([]byte, 4), startTsNs: startTsNs, stopTsNs: stopTsNs, - filePath: string(fileEntry.FullPath), + filePath: filePath, } } @@ -451,6 +474,9 @@ func (iter *LogFileIterator) Close() error { // getNext will return io.EOF when done func (iter *LogFileIterator) getNext() (logEntry *filer_pb.LogEntry, err error) { + if iter.r == nil { + return iter.getNextCached() + } var n int for { n, err = iter.r.Read(iter.sizeBuf) @@ -461,7 +487,9 @@ func (iter *LogFileIterator) getNext() (logEntry *filer_pb.LogEntry, err error) return nil, fmt.Errorf("size %d bytes, expected 4 bytes", n) } size := util.BytesToUint32(iter.sizeBuf) - // println("entry 
size", size) + if size > maxLogEntrySize { + return nil, fmt.Errorf("%s entry size %d exceeds %d", iter.filePath, size, maxLogEntrySize) + } entryData := make([]byte, size) n, err = iter.r.Read(entryData) if err != nil { @@ -483,3 +511,27 @@ func (iter *LogFileIterator) getNext() (logEntry *filer_pb.LogEntry, err error) return } } + +// getNextCached yields entries from the shared cached slice, applying the same +// per-subscriber timestamp filtering as the streaming path. Any load error is +// surfaced only after the read entries are yielded, so a partial read delivers +// its prefix before failing, just like the streaming path. +func (iter *LogFileIterator) getNextCached() (logEntry *filer_pb.LogEntry, err error) { + for iter.cachedPos < len(iter.cached) { + logEntry = iter.cached[iter.cachedPos] + iter.cachedPos++ + if logEntry.TsNs <= iter.startTsNs { + continue + } + if iter.stopTsNs != 0 && logEntry.TsNs > iter.stopTsNs { + return nil, io.EOF + } + return logEntry, nil + } + if iter.loadErr != nil { + err = iter.loadErr + iter.loadErr = nil + return nil, err + } + return nil, io.EOF +} diff --git a/weed/filer/persisted_log_cache.go b/weed/filer/persisted_log_cache.go new file mode 100644 index 000000000..c24777cb7 --- /dev/null +++ b/weed/filer/persisted_log_cache.go @@ -0,0 +1,242 @@ +package filer + +import ( + "container/list" + "context" + "fmt" + "io" + "sync" + "time" + + "golang.org/x/sync/singleflight" + "google.golang.org/protobuf/proto" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/wdclient" +) + +const ( + // persistedLogCacheMaxBytes bounds retained entries regardless of subscriber + // count. + persistedLogCacheMaxBytes = 256 << 20 + // persistedLogCacheMinAge keeps the actively-written current file out of the + // cache to avoid reload churn. 
It is a heuristic, not a correctness gate: + // every hit is validated against the file's current chunk set (a fingerprint), + // so a file that turns out to have grown is reloaded rather than served stale. + persistedLogCacheMinAge = 2 * LogFlushInterval + // persistedLogCacheMaxLoads bounds how many distinct files decode at once, so + // the transient peak (an 8MB chunk reader plus the decoded slice per load) + // stays bounded no matter how many positions the subscriber fleet spans. + persistedLogCacheMaxLoads = 8 + // maxLogEntrySize guards the per-entry allocation against a corrupt size + // prefix. The write path caps a single entry well under this, so it never + // rejects a legitimate event. + maxLogEntrySize = 1 << 30 +) + +// persistedLogCache shares the decoded entries of completed metadata log files +// across concurrent SubscribeMetadata replays, so N replays of the same history +// read and decode each file once instead of N times. Cached entries are shared +// read-only; callers must not mutate them. +type persistedLogCache struct { + mu sync.Mutex + ll *list.List // front = most recently used; values are *logCacheItem + index map[string]*list.Element + curBytes int64 + maxBytes int64 + sf singleflight.Group + loadSem chan struct{} +} + +type logCacheItem struct { + key string + // fingerprint identifies the file's chunk set when it was decoded. A cached + // item is only served when the caller's fingerprint still matches; an + // appended-to file (e.g. a delayed flush landed after the file was first read) + // produces a new fingerprint and is reloaded. 
+ fingerprint string + entries []*filer_pb.LogEntry + bytes int64 +} + +func newPersistedLogCache(maxBytes int64) *persistedLogCache { + return &persistedLogCache{ + ll: list.New(), + index: make(map[string]*list.Element), + maxBytes: maxBytes, + loadSem: make(chan struct{}, persistedLogCacheMaxLoads), + } +} + +// logFileIsCacheable reports whether a log file identified by its minute +// timestamp is old enough that caching it is worthwhile (it is unlikely to still +// be receiving appends). Correctness does not depend on this being exact. +func logFileIsCacheable(fileTsNs int64) bool { + return time.Since(time.Unix(0, fileTsNs)) > persistedLogCacheMinAge +} + +// chunksFingerprint summarizes a log file's append-only chunk set; it changes +// whenever the file grows. +func chunksFingerprint(chunks []*filer_pb.FileChunk) string { + if len(chunks) == 0 { + return "empty" + } + var total uint64 + for _, c := range chunks { + total += c.Size + } + return fmt.Sprintf("%d/%d/%s", len(chunks), total, chunks[len(chunks)-1].GetFileIdString()) +} + +// logLoadResult is the value getOrLoad shares through the single-flight group. +// On a genuine error the partially-read entries are still carried alongside the +// error so the caller can deliver them before failing, matching the streaming +// path. +type logLoadResult struct { + entries []*filer_pb.LogEntry + err error +} + +// getOrLoad returns the decoded entries for key, loading them once on miss and +// coalescing concurrent misses for the same (key, fingerprint). A cached item +// whose fingerprint no longer matches is treated as a miss and replaced. The +// returned slice is shared and read-only. load returns the entries, whether they +// are stable enough to cache, and any error; only a clean, complete read is +// cached. A read that stopped on a missing chunk is delivered but not cached: it +// reflects transient volume availability, not the fingerprint, so replays re-probe it.
+func (c *persistedLogCache) getOrLoad(key, fingerprint string, load func() ([]*filer_pb.LogEntry, bool, error)) ([]*filer_pb.LogEntry, error) { + if entries, ok := c.lookup(key, fingerprint); ok { + return entries, nil + } + v, _, _ := c.sf.Do(key+"\x00"+fingerprint, func() (interface{}, error) { + if entries, ok := c.lookup(key, fingerprint); ok { + return logLoadResult{entries: entries}, nil + } + entries, cacheable, loadErr := c.loadGuarded(load) + if loadErr == nil && cacheable { + c.store(key, fingerprint, entries) + } + return logLoadResult{entries: entries, err: loadErr}, nil + }) + res := v.(logLoadResult) + return res.entries, res.err +} + +// loadGuarded runs load under the concurrent-load semaphore, releasing the slot +// via defer so an abnormal exit cannot leak slots and wedge future loads. +func (c *persistedLogCache) loadGuarded(load func() ([]*filer_pb.LogEntry, bool, error)) ([]*filer_pb.LogEntry, bool, error) { + c.loadSem <- struct{}{} + defer func() { <-c.loadSem }() + return load() +} + +func (c *persistedLogCache) lookup(key, fingerprint string) ([]*filer_pb.LogEntry, bool) { + c.mu.Lock() + defer c.mu.Unlock() + el, ok := c.index[key] + if !ok { + return nil, false + } + item := el.Value.(*logCacheItem) + if item.fingerprint != fingerprint { + // The file changed (grew) since it was cached; drop the stale snapshot so + // the caller reloads and stores the current version. + c.removeElement(el) + return nil, false + } + c.ll.MoveToFront(el) + return item.entries, true +} + +func (c *persistedLogCache) store(key, fingerprint string, entries []*filer_pb.LogEntry) { + bytes := estimateEntriesBytes(entries) + if bytes > c.maxBytes { + // A single file larger than the whole budget would evict everything else + // and still not fit; serve it this round but do not retain it. 
+ return + } + c.mu.Lock() + defer c.mu.Unlock() + if el, ok := c.index[key]; ok { + c.removeElement(el) // replace any older version of this file + } + el := c.ll.PushFront(&logCacheItem{key: key, fingerprint: fingerprint, entries: entries, bytes: bytes}) + c.index[key] = el + c.curBytes += bytes + // Evict least-recently-used until under budget, but always keep what we just + // inserted (a single file larger than the budget still serves this round). + for c.curBytes > c.maxBytes && c.ll.Len() > 1 { + c.removeElement(c.ll.Back()) + } +} + +// removeElement drops an element from both the list and the index. Caller holds mu. +func (c *persistedLogCache) removeElement(el *list.Element) { + item := el.Value.(*logCacheItem) + c.ll.Remove(el) + delete(c.index, item.key) + c.curBytes -= item.bytes +} + +func estimateEntriesBytes(entries []*filer_pb.LogEntry) int64 { + // LogEntry struct (~112B) + slice slot (8B) + a little slack, plus the + // payload backing arrays. Deliberately generous so curBytes does not run + // under the real retained heap. + total := int64(len(entries)) * 128 + for _, e := range entries { + total += int64(len(e.Data)+len(e.Key)) + 16 + } + return total +} + +// loadLogFileEntries reads a whole persisted log file from volume servers and +// decodes every LogEntry. The second result reports whether the read reached a +// clean end (cacheable). A chunk-not-found stop returns the readable prefix with +// cacheable=false: the file's chunk list (the cache fingerprint) is unchanged by +// a momentarily-unreachable volume, so caching the prefix would pin a truncation +// that a transient outage should self-heal. Such reads mirror the streaming +// path for this round but are re-probed on the next replay instead. 
+func loadLogFileEntries(masterClient *wdclient.MasterClient, filePath string, chunks []*filer_pb.FileChunk) (entries []*filer_pb.LogEntry, cacheable bool, err error) { + r := NewChunkStreamReaderFromFiler(context.Background(), masterClient, chunks) + defer r.Close() + + sizeBuf := make([]byte, 4) + for { + if _, readErr := io.ReadFull(r, sizeBuf); readErr != nil { + if readErr == io.EOF { + // clean end exactly at a record boundary: every record was read + return entries, true, nil + } + if readErr == io.ErrUnexpectedEOF { + // stopped mid-frame (a partial size prefix): ambiguous, possibly a + // transient short read, so deliver the prefix but do not cache it + glog.V(1).Infof("log file %s ends mid-record, not caching", filePath) + return entries, false, nil + } + if isChunkNotFoundError(readErr) { + glog.V(1).Infof("log file %s has an unreadable chunk, not caching: %v", filePath, readErr) + return entries, false, nil + } + return entries, false, readErr + } + size := util.BytesToUint32(sizeBuf) + if size > maxLogEntrySize { + return entries, false, fmt.Errorf("log file %s entry size %d exceeds %d", filePath, size, maxLogEntrySize) + } + entryData := make([]byte, size) + if _, readErr := io.ReadFull(r, entryData); readErr != nil { + if isChunkNotFoundError(readErr) { + glog.V(1).Infof("log file %s has an unreadable chunk, not caching: %v", filePath, readErr) + return entries, false, nil + } + return entries, false, readErr + } + logEntry := &filer_pb.LogEntry{} + if unmarshalErr := proto.Unmarshal(entryData, logEntry); unmarshalErr != nil { + return entries, false, unmarshalErr + } + entries = append(entries, logEntry) + } +} diff --git a/weed/filer/persisted_log_cache_test.go b/weed/filer/persisted_log_cache_test.go new file mode 100644 index 000000000..e3db7fbb4 --- /dev/null +++ b/weed/filer/persisted_log_cache_test.go @@ -0,0 +1,226 @@ +package filer + +import ( + "fmt" + "io" + "sync" + "sync/atomic" + "testing" + "time" + + 
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +func logEntriesAt(tsNs ...int64) []*filer_pb.LogEntry { + out := make([]*filer_pb.LogEntry, 0, len(tsNs)) + for _, ts := range tsNs { + out = append(out, &filer_pb.LogEntry{TsNs: ts, Data: []byte("x")}) + } + return out +} + +func TestPersistedLogCacheHitMiss(t *testing.T) { + c := newPersistedLogCache(persistedLogCacheMaxBytes) + var loads int32 + load := func() ([]*filer_pb.LogEntry, bool, error) { + atomic.AddInt32(&loads, 1) + return logEntriesAt(1, 2, 3), true, nil + } + + e1, err := c.getOrLoad("k", "fp", load) + if err != nil || len(e1) != 3 { + t.Fatalf("first getOrLoad: err=%v len=%d", err, len(e1)) + } + e2, err := c.getOrLoad("k", "fp", load) + if err != nil { + t.Fatal(err) + } + if n := atomic.LoadInt32(&loads); n != 1 { + t.Fatalf("expected loader called once, got %d", n) + } + if &e1[0] != &e2[0] { + t.Fatal("cache should return the same shared slice") + } +} + +func TestPersistedLogCacheFingerprintReload(t *testing.T) { + c := newPersistedLogCache(persistedLogCacheMaxBytes) + var loads int32 + loadV1 := func() ([]*filer_pb.LogEntry, bool, error) { + atomic.AddInt32(&loads, 1) + return logEntriesAt(1, 2), true, nil + } + loadV2 := func() ([]*filer_pb.LogEntry, bool, error) { + atomic.AddInt32(&loads, 1) + return logEntriesAt(1, 2, 3), true, nil // a delayed append grew the file + } + + if e, _ := c.getOrLoad("k", "fp1", loadV1); len(e) != 2 { + t.Fatalf("v1 len=%d", len(e)) + } + // a later replay observes the grown file (new fingerprint): it must reload, + // not serve the stale truncated snapshot. 
+ e, err := c.getOrLoad("k", "fp2", loadV2) + if err != nil { + t.Fatal(err) + } + if len(e) != 3 { + t.Fatalf("expected reload to 3 entries, got %d", len(e)) + } + if n := atomic.LoadInt32(&loads); n != 2 { + t.Fatalf("expected one reload (2 loads), got %d", n) + } + // the cache now holds the new version; the matching fingerprint hits + if _, err := c.getOrLoad("k", "fp2", loadV2); err != nil { + t.Fatal(err) + } + if n := atomic.LoadInt32(&loads); n != 2 { + t.Fatalf("expected fp2 to hit (still 2 loads), got %d", n) + } +} + +func TestPersistedLogCacheNotCachedWhenUncacheable(t *testing.T) { + c := newPersistedLogCache(persistedLogCacheMaxBytes) + var loads int32 + // cacheable=false models a chunk-not-found stop: deliver the prefix this + // round but never pin it, so a transient outage is re-probed next replay. + load := func() ([]*filer_pb.LogEntry, bool, error) { + atomic.AddInt32(&loads, 1) + return logEntriesAt(1, 2), false, nil + } + + if e, err := c.getOrLoad("k", "fp", load); err != nil || len(e) != 2 { + t.Fatalf("first: err=%v len=%d", err, len(e)) + } + if e, err := c.getOrLoad("k", "fp", load); err != nil || len(e) != 2 { + t.Fatalf("second: err=%v len=%d", err, len(e)) + } + if n := atomic.LoadInt32(&loads); n != 2 { + t.Fatalf("uncacheable result must re-load every time, got %d loads", n) + } +} + +func TestChunksFingerprintChangesOnGrowth(t *testing.T) { + c1 := []*filer_pb.FileChunk{{FileId: "3,01", Size: 100}} + c2 := []*filer_pb.FileChunk{{FileId: "3,01", Size: 100}, {FileId: "3,02", Size: 50}} + if chunksFingerprint(c1) == chunksFingerprint(c2) { + t.Fatal("fingerprint must change when a chunk is appended") + } +} + +func TestPersistedLogCacheSingleFlight(t *testing.T) { + c := newPersistedLogCache(persistedLogCacheMaxBytes) + var loads int32 + release := make(chan struct{}) + load := func() ([]*filer_pb.LogEntry, bool, error) { + atomic.AddInt32(&loads, 1) + <-release // hold the flight open so concurrent callers coalesce + return 
logEntriesAt(1), true, nil + } + + const n = 20 + var wg sync.WaitGroup + for i := 0; i < n; i++ { + wg.Add(1) + go func() { + defer wg.Done() + if _, err := c.getOrLoad("k", "fp", load); err != nil { + t.Error(err) + } + }() + } + time.Sleep(50 * time.Millisecond) + close(release) + wg.Wait() + + if got := atomic.LoadInt32(&loads); got != 1 { + t.Fatalf("expected 1 coalesced load across %d callers, got %d", n, got) + } +} + +func TestPersistedLogCacheEviction(t *testing.T) { + // One entry estimates to 128 + 100 + 16 = 244 bytes; budget holds one, not two. + c := newPersistedLogCache(300) + mk := func(ts int64) func() ([]*filer_pb.LogEntry, bool, error) { + return func() ([]*filer_pb.LogEntry, bool, error) { + return []*filer_pb.LogEntry{{TsNs: ts, Data: make([]byte, 100)}}, true, nil + } + } + + if _, err := c.getOrLoad("a", "fp", mk(1)); err != nil { + t.Fatal(err) + } + if _, err := c.getOrLoad("b", "fp", mk(2)); err != nil { // pushes over budget, evicts LRU "a" + t.Fatal(err) + } + + c.mu.Lock() + _, hasA := c.index["a"] + _, hasB := c.index["b"] + c.mu.Unlock() + if hasA { + t.Error("least-recently-used entry a should have been evicted") + } + if !hasB { + t.Error("most-recently-used entry b should remain") + } +} + +func TestLogFileIsCacheable(t *testing.T) { + now := time.Now() + if logFileIsCacheable(now.UnixNano()) { + t.Error("a current-minute file must not be cacheable") + } + old := now.Add(-persistedLogCacheMinAge - time.Minute).UnixNano() + if !logFileIsCacheable(old) { + t.Error("a file older than the min age should be cacheable") + } +} + +func TestLogFileIteratorCachedFiltering(t *testing.T) { + iter := &LogFileIterator{ + cached: logEntriesAt(10, 20, 30, 40), + startTsNs: 15, + stopTsNs: 35, + } + var got []int64 + for { + e, err := iter.getNext() + if err == io.EOF { + break + } + if err != nil { + t.Fatal(err) + } + got = append(got, e.TsNs) + } + want := []int64{20, 30} // 10 filtered by startTsNs, 40 cut off by stopTsNs + if fmt.Sprint(got) 
!= fmt.Sprint(want) { + t.Fatalf("cached filtering: got %v, want %v", got, want) + } +} + +func TestLogFileIteratorCachedLoadError(t *testing.T) { + iter := &LogFileIterator{loadErr: fmt.Errorf("boom")} + if _, err := iter.getNext(); err == nil || err.Error() != "boom" { + t.Fatalf("expected load error surfaced on first getNext, got %v", err) + } +} + +func TestLogFileIteratorCachedYieldsBeforeError(t *testing.T) { + iter := &LogFileIterator{cached: logEntriesAt(10, 20), loadErr: fmt.Errorf("boom")} + var got []int64 + for { + e, err := iter.getNext() + if err != nil { + if err.Error() != "boom" { + t.Fatalf("expected boom after entries, got %v", err) + } + break + } + got = append(got, e.TsNs) + } + if fmt.Sprint(got) != fmt.Sprint([]int64{10, 20}) { + t.Fatalf("partial read must yield entries before the error, got %v", got) + } +}