Cut per-subscriber replay decode and widen replay concurrency (#9917)

* Filter metadata events before unmarshaling them per subscriber

Every subscriber unmarshaled every log entry into a full event just to
run the path filter, and entries carry complete chunk lists, so a fleet
of path-filtered subscribers spends almost all replay CPU materializing
events it then discards. A shallow wire scan now extracts just the
directory, entry names and rename destination into a skeleton event,
feeds the same matcher, and skips the decode for entries the subscriber
cannot match. Any scan surprise (malformed bytes, merged duplicate
message fields) falls back to the full decode, and the unsynced-events
heartbeat keeps firing for skipped entries.

* Raise the legacy replay cap

The cap was sized when every replay pinned a private chunk reader per
source filer. Replays now share decoded chunks, so a cap of sixteen
needlessly serializes subscriber catch-up. Raise it to 64; the expensive
part stays bounded by the cache's load gate.

* Weight concurrent log-chunk loads by size

The flat eight-load gate let eight tiny chunks through as reluctantly as
eight full ones. Charge each load's chunk size against a 128MB in-flight
budget instead: small chunks decode wide open while full-size ones still
serialize enough to cap the transient peak. Oversized weights clamp to
the budget so they can always acquire.

* Propagate heartbeat send failures and reset the skip counter

A failed heartbeat send means the stream is gone, so end the replay
instead of scanning on. A delivered event also resets the skip counter,
keeping the heartbeat cadence relative to the last thing the client
actually received.

* Share the unsynced-events counter across the prefilter and delivery

Two independent counters could starve the heartbeat: alternating drops
reset each side before either reached its threshold. One shared counter
increments on every dropped entry, prefiltered or not, and only an
actual delivery resets it, restoring the original cadence exactly.

* Tighten comments

* Benchmark the subscription match paths

For a thousand-chunk event that the subscriber filters out, the shallow
scan matches in 10us and 9 allocations against 175us and 4031
allocations for the full decode.
This commit is contained in:
Chris Lu
2026-06-10 13:08:34 -07:00
committed by GitHub
parent e56a1c4c05
commit 594fc667d5
10 changed files with 492 additions and 38 deletions
+3 -5
View File
@@ -200,11 +200,9 @@ func isChunkNotFoundError(err error) bool {
httpNotFoundPattern.MatchString(errMsg)
}
// persistedLogReplayLimit caps concurrent legacy replays; each holds a chunk
// reader per source filer, so a reconnect storm of pre-offload clients would
// otherwise pin many GB. Metadata-chunks clients take sendLogFileRefs and never
// reach this path.
const persistedLogReplayLimit = 16
// persistedLogReplayLimit caps concurrent legacy replays; decodes are shared
// through the persisted-log cache, so this only bounds the listing fan-out.
const persistedLogReplayLimit = 64
var persistedLogReplaySem = make(chan struct{}, persistedLogReplayLimit)
+1 -1
View File
@@ -557,7 +557,7 @@ func (iter *LogFileIterator) getNextCached() (logEntry *filer_pb.LogEntry, err e
if chunk.ModifiedTsNs > 0 && chunk.ModifiedTsNs+int64(LogFlushInterval) <= iter.startTsNs {
continue
}
entries, loadErr := iter.cache.getOrLoad(chunk.GetFileIdString(), func() ([]*filer_pb.LogEntry, bool, error) {
entries, loadErr := iter.cache.getOrLoad(chunk.GetFileIdString(), int64(chunk.Size), func() ([]*filer_pb.LogEntry, bool, error) {
return loadLogFileEntriesFn(iter.masterClient, chunk)
})
if loadErr != nil {
+21 -9
View File
@@ -8,6 +8,7 @@ import (
"sync"
"time"
"golang.org/x/sync/semaphore"
"golang.org/x/sync/singleflight"
"google.golang.org/protobuf/proto"
@@ -19,8 +20,9 @@ import (
const (
// persistedLogCacheMaxBytes bounds retained entries regardless of subscriber count.
persistedLogCacheMaxBytes = 256 << 20
// persistedLogCacheMaxLoads bounds how many chunks fetch and decode at once.
persistedLogCacheMaxLoads = 8
// persistedLogCacheLoadBudget bounds in-flight fetch+decode bytes, charged
// by chunk size: small chunks load wide, full-size ones cap the peak.
persistedLogCacheLoadBudget = 128 << 20
// persistedLogCacheIdleTTL frees entries no replay has touched recently, so
// the cache holds memory only while subscribers actually replay.
persistedLogCacheIdleTTL = 5 * time.Minute
@@ -44,7 +46,7 @@ type persistedLogCache struct {
curBytes int64
maxBytes int64
sf singleflight.Group
loadSem chan struct{}
loadSem *semaphore.Weighted
}
type logCacheItem struct {
@@ -59,7 +61,7 @@ func newPersistedLogCache(maxBytes int64) *persistedLogCache {
ll: list.New(),
index: make(map[string]*list.Element),
maxBytes: maxBytes,
loadSem: make(chan struct{}, persistedLogCacheMaxLoads),
loadSem: semaphore.NewWeighted(persistedLogCacheLoadBudget),
}
// the filer's cache lives for the process lifetime
go c.loopEvictIdle()
@@ -97,7 +99,7 @@ type logLoadResult struct {
// coalescing concurrent misses. Only a clean, complete decode is cached: a
// chunk-not-found read must be re-probed on later replays, and an incomplete
// chunk stays with the streaming fallback.
func (c *persistedLogCache) getOrLoad(fileId string, load func() ([]*filer_pb.LogEntry, bool, error)) ([]*filer_pb.LogEntry, error) {
func (c *persistedLogCache) getOrLoad(fileId string, loadBytes int64, load func() ([]*filer_pb.LogEntry, bool, error)) ([]*filer_pb.LogEntry, error) {
if entries, ok := c.lookup(fileId); ok {
return entries, nil
}
@@ -105,7 +107,7 @@ func (c *persistedLogCache) getOrLoad(fileId string, load func() ([]*filer_pb.Lo
if entries, ok := c.lookup(fileId); ok {
return logLoadResult{entries: entries}, nil
}
entries, cacheable, loadErr := c.loadGuarded(load)
entries, cacheable, loadErr := c.loadGuarded(loadBytes, load)
if loadErr == nil && cacheable {
c.store(fileId, entries)
}
@@ -115,9 +117,19 @@ func (c *persistedLogCache) getOrLoad(fileId string, load func() ([]*filer_pb.Lo
return res.entries, res.err
}
func (c *persistedLogCache) loadGuarded(load func() ([]*filer_pb.LogEntry, bool, error)) ([]*filer_pb.LogEntry, bool, error) {
c.loadSem <- struct{}{}
defer func() { <-c.loadSem }()
// loadGuarded runs load while holding a share of the cache's weighted load
// semaphore. The share is loadBytes clamped into
// [1, persistedLogCacheLoadBudget]. If the acquire fails, load is not run and
// the acquire error is returned with a nil result.
func (c *persistedLogCache) loadGuarded(loadBytes int64, load func() ([]*filer_pb.LogEntry, bool, error)) ([]*filer_pb.LogEntry, bool, error) {
	var weight int64
	switch {
	case loadBytes < 1:
		// non-positive sizes are still charged one unit
		weight = 1
	case loadBytes > persistedLogCacheLoadBudget:
		// a request above the semaphore size could never be acquired, so
		// charge the whole budget instead
		weight = persistedLogCacheLoadBudget
	default:
		weight = loadBytes
	}

	acquireErr := c.loadSem.Acquire(context.Background(), weight)
	if acquireErr != nil {
		return nil, false, acquireErr
	}
	defer c.loadSem.Release(weight)

	return load()
}
+21 -9
View File
@@ -48,11 +48,11 @@ func TestPersistedLogCacheHitMiss(t *testing.T) {
return logEntriesAt(1, 2, 3), true, nil
}
e1, err := c.getOrLoad("3,01", load)
e1, err := c.getOrLoad("3,01", 1, load)
if err != nil || len(e1) != 3 {
t.Fatalf("first getOrLoad: err=%v len=%d", err, len(e1))
}
e2, err := c.getOrLoad("3,01", load)
e2, err := c.getOrLoad("3,01", 1, load)
if err != nil {
t.Fatal(err)
}
@@ -74,10 +74,10 @@ func TestPersistedLogCacheNotCachedWhenUncacheable(t *testing.T) {
return logEntriesAt(1, 2), false, nil
}
if e, err := c.getOrLoad("3,01", load); err != nil || len(e) != 2 {
if e, err := c.getOrLoad("3,01", 1, load); err != nil || len(e) != 2 {
t.Fatalf("first: err=%v len=%d", err, len(e))
}
if e, err := c.getOrLoad("3,01", load); err != nil || len(e) != 2 {
if e, err := c.getOrLoad("3,01", 1, load); err != nil || len(e) != 2 {
t.Fatalf("second: err=%v len=%d", err, len(e))
}
if n := atomic.LoadInt32(&loads); n != 2 {
@@ -104,7 +104,7 @@ func TestPersistedLogCacheSingleFlight(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
if _, err := c.getOrLoad("3,01", load); err != nil {
if _, err := c.getOrLoad("3,01", 1, load); err != nil {
t.Error(err)
}
}()
@@ -128,10 +128,10 @@ func TestPersistedLogCacheEviction(t *testing.T) {
}
}
if _, err := c.getOrLoad("a", mk(1)); err != nil {
if _, err := c.getOrLoad("a", 1, mk(1)); err != nil {
t.Fatal(err)
}
if _, err := c.getOrLoad("b", mk(2)); err != nil { // pushes over budget, evicts LRU "a"
if _, err := c.getOrLoad("b", 1, mk(2)); err != nil { // pushes over budget, evicts LRU "a"
t.Fatal(err)
}
@@ -364,10 +364,10 @@ func TestPersistedLogCacheIdleEviction(t *testing.T) {
load := func() ([]*filer_pb.LogEntry, bool, error) {
return logEntriesAt(1), true, nil
}
if _, err := c.getOrLoad("idle", load); err != nil {
if _, err := c.getOrLoad("idle", 1, load); err != nil {
t.Fatal(err)
}
if _, err := c.getOrLoad("hot", load); err != nil {
if _, err := c.getOrLoad("hot", 1, load); err != nil {
t.Fatal(err)
}
@@ -413,3 +413,15 @@ func TestDecodeLogRecordsRejectsImplausibleRecords(t *testing.T) {
t.Fatalf("non-increasing prefix: %v", entries)
}
}
// TestPersistedLogCacheLoadLargerThanBudget asks the cache to load a chunk
// whose declared size is four times the load budget and expects the load to
// run anyway: a weight above the semaphore size could never be acquired, so it
// has to be clamped.
func TestPersistedLogCacheLoadLargerThanBudget(t *testing.T) {
	cache := newPersistedLogCache(persistedLogCacheMaxBytes)
	loadOne := func() ([]*filer_pb.LogEntry, bool, error) {
		return logEntriesAt(1), true, nil
	}

	entries, err := cache.getOrLoad("3,01", persistedLogCacheLoadBudget*4, loadOne)
	if err != nil || len(entries) != 1 {
		t.Fatalf("oversized load: err=%v len=%d", err, len(entries))
	}
}
+142
View File
@@ -0,0 +1,142 @@
package filer_pb
import (
"google.golang.org/protobuf/encoding/protowire"
)
// ScanMetadataEventSkeleton makes a shallow pass over a wire-encoded
// SubscribeMetadataResponse and returns an event holding only the directory,
// the timestamp and a pared-down event notification (entry names and the
// rename destination). All other fields, including the entries' chunk lists,
// are stepped over without being materialized.
//
// ok is false when the bytes cannot be parsed or when event_notification
// appears more than once; the caller must then fall back to a full unmarshal.
func ScanMetadataEventSkeleton(data []byte) (skeleton *SubscribeMetadataResponse, ok bool) {
	skeleton = &SubscribeMetadataResponse{}
	for buf := data; len(buf) > 0; {
		field, wire, tagLen := protowire.ConsumeTag(buf)
		if tagLen < 0 {
			return nil, false
		}
		buf = buf[tagLen:]

		// width is the encoded length of this field's value; each branch
		// leaves it negative when the value cannot be parsed.
		width := -1
		switch {
		case field == 1 && wire == protowire.BytesType: // directory
			var raw []byte
			raw, width = protowire.ConsumeBytes(buf)
			if width >= 0 {
				skeleton.Directory = string(raw)
			}
		case field == 2 && wire == protowire.BytesType: // event_notification
			if skeleton.EventNotification != nil {
				// a repeated message field would merge under proto
				// semantics; leave that to the full decode
				return nil, false
			}
			var raw []byte
			raw, width = protowire.ConsumeBytes(buf)
			if width >= 0 {
				inner, innerOk := scanEventNotificationSkeleton(raw)
				if !innerOk {
					return nil, false
				}
				skeleton.EventNotification = inner
			}
		case field == 3 && wire == protowire.VarintType: // ts_ns
			var raw uint64
			raw, width = protowire.ConsumeVarint(buf)
			if width >= 0 {
				skeleton.TsNs = int64(raw)
			}
		default:
			// not a field the matcher reads: step over its value
			width = protowire.ConsumeFieldValue(field, wire, buf)
		}
		if width < 0 {
			return nil, false
		}
		buf = buf[width:]
	}
	return skeleton, true
}
// scanEventNotificationSkeleton walks a wire-encoded EventNotification and
// keeps only the old entry's name, the new entry's name and the new parent
// path. It reports false when the bytes cannot be parsed or when old_entry or
// new_entry appears more than once, since repeated message fields would merge
// under proto semantics.
func scanEventNotificationSkeleton(data []byte) (*EventNotification, bool) {
	result := &EventNotification{}
	for buf := data; len(buf) > 0; {
		field, wire, tagLen := protowire.ConsumeTag(buf)
		if tagLen < 0 {
			return nil, false
		}
		buf = buf[tagLen:]

		wanted := wire == protowire.BytesType && (field == 1 || field == 2 || field == 4)
		if !wanted {
			// not a field the matcher reads: step over its value
			width := protowire.ConsumeFieldValue(field, wire, buf)
			if width < 0 {
				return nil, false
			}
			buf = buf[width:]
			continue
		}

		raw, width := protowire.ConsumeBytes(buf)
		if width < 0 {
			return nil, false
		}
		if field == 4 { // new_parent_path
			result.NewParentPath = string(raw)
		} else {
			// field 1 is old_entry, field 2 is new_entry
			slot := &result.OldEntry
			if field == 2 {
				slot = &result.NewEntry
			}
			if *slot != nil {
				return nil, false
			}
			name, nameOk := scanEntryName(raw)
			if !nameOk {
				return nil, false
			}
			*slot = &Entry{Name: name}
		}
		buf = buf[width:]
	}
	return result, true
}
// scanEntryName returns the name (field 1) of a wire-encoded Entry. It visits
// every field instead of stopping at the first name, so field order does not
// matter and a later occurrence of the name overrides an earlier one, as a
// proto merge would. It reports false when the bytes cannot be parsed.
func scanEntryName(data []byte) (string, bool) {
	var name string
	for buf := data; len(buf) > 0; {
		field, wire, tagLen := protowire.ConsumeTag(buf)
		if tagLen < 0 {
			return "", false
		}
		buf = buf[tagLen:]

		var width int
		if field == 1 && wire == protowire.BytesType {
			var raw []byte
			raw, width = protowire.ConsumeBytes(buf)
			if width >= 0 {
				name = string(raw)
			}
		} else {
			width = protowire.ConsumeFieldValue(field, wire, buf)
		}
		if width < 0 {
			return "", false
		}
		buf = buf[width:]
	}
	return name, true
}
@@ -0,0 +1,60 @@
package filer_pb
import (
"testing"
"google.golang.org/protobuf/proto"
)
// benchmarkEventBytes marshals an event under /data/pvc-42 whose old and new
// entries are both named 0.log and share one list of chunkCount chunks. It
// fails the benchmark if marshaling fails.
func benchmarkEventBytes(b *testing.B, chunkCount int) []byte {
	b.Helper()

	chunks := make([]*FileChunk, 0, chunkCount)
	for i := 0; i < chunkCount; i++ {
		chunks = append(chunks, &FileChunk{
			FileId:       "3,1234567890ab",
			Offset:       int64(i) * 4096,
			Size:         4096,
			ModifiedTsNs: int64(i),
		})
	}

	event := &SubscribeMetadataResponse{
		Directory: "/data/pvc-42",
		TsNs:      123456789,
		EventNotification: &EventNotification{
			OldEntry: &Entry{Name: "0.log", Chunks: chunks},
			NewEntry: &Entry{Name: "0.log", Chunks: chunks},
		},
	}
	payload, err := proto.Marshal(event)
	if err != nil {
		b.Fatal(err)
	}
	return payload
}
// BenchmarkSubscriptionMatchFullDecode measures the non-matching subscriber
// case, which is what a per-path fleet hits for almost every entry: fully
// unmarshal a thousand-chunk event, then run the matcher against a prefix the
// event does not fall under.
func BenchmarkSubscriptionMatchFullDecode(b *testing.B) {
	payload := benchmarkEventBytes(b, 1000)
	b.ReportAllocs()
	b.ResetTimer()
	for iter := 0; iter < b.N; iter++ {
		event := new(SubscribeMetadataResponse)
		err := proto.Unmarshal(payload, event)
		if err != nil {
			b.Fatal(err)
		}
		matched := MetadataEventMatchesSubscription(event, "/data/pvc-7/", nil, nil)
		if matched {
			b.Fatal("unexpected match")
		}
	}
}
// BenchmarkSubscriptionMatchSkeleton measures the same non-matching case as
// the full-decode benchmark, but builds the matcher's input with the shallow
// skeleton scan instead of a full unmarshal.
func BenchmarkSubscriptionMatchSkeleton(b *testing.B) {
	payload := benchmarkEventBytes(b, 1000)
	b.ReportAllocs()
	b.ResetTimer()
	for iter := 0; iter < b.N; iter++ {
		skeleton, scanned := ScanMetadataEventSkeleton(payload)
		if !scanned {
			b.Fatal("scan failed")
		}
		matched := MetadataEventMatchesSubscription(skeleton, "/data/pvc-7/", nil, nil)
		if matched {
			b.Fatal("unexpected match")
		}
	}
}
@@ -0,0 +1,112 @@
package filer_pb
import (
"testing"
"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
)
// marshalEvent returns the wire encoding of event, failing the test
// immediately if marshaling returns an error.
func marshalEvent(t *testing.T, event *SubscribeMetadataResponse) []byte {
	t.Helper()
	encoded, marshalErr := proto.Marshal(event)
	if marshalErr != nil {
		t.Fatal(marshalErr)
	}
	return encoded
}
// bigEntry builds an entry with the given name, fifty 100-byte chunks and a
// small attribute set, so the entry name is surrounded by other fields on the
// wire.
func bigEntry(name string) *Entry {
	const chunkCount = 50
	chunks := make([]*FileChunk, 0, chunkCount)
	for i := 0; i < chunkCount; i++ {
		chunks = append(chunks, &FileChunk{
			FileId: "3,1234567890ab",
			Offset: int64(i) * 100,
			Size:   100,
		})
	}
	return &Entry{
		Name:       name,
		Chunks:     chunks,
		Attributes: &FuseAttributes{FileSize: 5000, Mtime: 1234567890},
	}
}
// TestScanMetadataEventSkeletonMatchesFullDecode marshals a set of
// representative events (create, update, delete, renames, empty and missing
// notifications), scans each into a skeleton, and checks that the skeleton
// carries the same timestamp and gets the same matcher verdict as the full
// event for every filter shape.
func TestScanMetadataEventSkeletonMatchesFullDecode(t *testing.T) {
	cases := map[string]*SubscribeMetadataResponse{
		"create": {
			Directory:         "/data/pvc-7",
			TsNs:              100,
			EventNotification: &EventNotification{NewEntry: bigEntry("a.log")},
		},
		"update": {
			Directory: "/data/pvc-7",
			TsNs:      200,
			EventNotification: &EventNotification{
				OldEntry: bigEntry("a.log"),
				NewEntry: bigEntry("a.log"),
			},
		},
		"delete": {
			Directory: "/data/pvc-7",
			TsNs:      300,
			EventNotification: &EventNotification{
				OldEntry:     bigEntry("a.log"),
				DeleteChunks: true,
			},
		},
		"rename within dir": {
			Directory: "/data/pvc-7",
			TsNs:      400,
			EventNotification: &EventNotification{
				OldEntry: bigEntry("a.log"),
				NewEntry: bigEntry("b.log"),
			},
		},
		"rename across dirs": {
			Directory: "/data/pvc-7",
			TsNs:      500,
			EventNotification: &EventNotification{
				OldEntry:      bigEntry("a.log"),
				NewEntry:      bigEntry("a.log"),
				NewParentPath: "/data/pvc-9",
			},
		},
		"empty notification": {
			Directory:         "/data/pvc-7",
			TsNs:              600,
			EventNotification: &EventNotification{},
		},
		"no notification": {
			Directory: "/data/pvc-7",
			TsNs:      700,
		},
	}

	// field names are kept as-is because they show up in the %+v failure text
	type subscriptionFilter struct {
		prefix   string
		prefixes []string
		dirs     []string
	}
	filters := []subscriptionFilter{
		{prefix: "/data/pvc-7/"},
		{prefix: "/data/pvc-9/"},
		{prefix: "/data/pvc-1/"},
		{prefix: "/"},
		{prefixes: []string{"/other/", "/data/pvc-9/"}},
		{dirs: []string{"/data/pvc-7"}},
		{dirs: []string{"/data/pvc-9"}},
	}

	for name, full := range cases {
		skeleton, scanned := ScanMetadataEventSkeleton(marshalEvent(t, full))
		if !scanned {
			t.Fatalf("%s: scan failed", name)
		}
		if skeleton.TsNs != full.TsNs {
			t.Errorf("%s: ts %d != %d", name, skeleton.TsNs, full.TsNs)
		}
		for _, f := range filters {
			want := MetadataEventMatchesSubscription(full, f.prefix, f.prefixes, f.dirs)
			if got := MetadataEventMatchesSubscription(skeleton, f.prefix, f.prefixes, f.dirs); got != want {
				t.Errorf("%s with filter %+v: skeleton match %v, full match %v", name, f, got, want)
			}
		}
	}
}
// TestScanMetadataEventSkeletonFallsBack covers the inputs the scanner must
// refuse so the caller falls back to a full decode: a malformed payload, a
// response with two event_notification occurrences, and a notification with
// two old_entry occurrences.
func TestScanMetadataEventSkeletonFallsBack(t *testing.T) {
	garbage := []byte{0xff, 0xff, 0xff}
	if _, ok := ScanMetadataEventSkeleton(garbage); ok {
		t.Error("malformed payload must not scan")
	}

	// Two event_notification occurrences merge under proto semantics; the
	// scanner must punt rather than guess. Concatenating an encoded message
	// with itself repeats each of its fields.
	single := marshalEvent(t, &SubscribeMetadataResponse{
		EventNotification: &EventNotification{OldEntry: &Entry{Name: "a"}},
	})
	doubled := make([]byte, 0, 2*len(single))
	doubled = append(doubled, single...)
	doubled = append(doubled, single...)
	if _, ok := ScanMetadataEventSkeleton(doubled); ok {
		t.Error("duplicated message field must fall back to full decode")
	}

	// Same for a duplicated entry inside the notification, built by hand:
	// one old_entry field (number 1) written twice, then wrapped as the
	// response's event_notification field (number 2).
	entryBytes, err := proto.Marshal(&Entry{Name: "a"})
	if err != nil {
		t.Fatal(err)
	}
	oldEntryField := protowire.AppendBytes(protowire.AppendTag(nil, 1, protowire.BytesType), entryBytes)
	notificationBytes := make([]byte, 0, 2*len(oldEntryField))
	notificationBytes = append(notificationBytes, oldEntryField...)
	notificationBytes = append(notificationBytes, oldEntryField...)
	responseBytes := protowire.AppendBytes(protowire.AppendTag(nil, 2, protowire.BytesType), notificationBytes)
	if _, ok := ScanMetadataEventSkeleton(responseBytes); ok {
		t.Error("duplicated nested entry field must fall back to full decode")
	}
}
// TestScanMetadataEventSkeletonEmptyPayload checks that a nil payload scans
// successfully into an event with no directory and no notification.
func TestScanMetadataEventSkeletonEmptyPayload(t *testing.T) {
	skeleton, ok := ScanMetadataEventSkeleton(nil)
	if ok && skeleton.Directory == "" && skeleton.EventNotification == nil {
		return
	}
	t.Fatalf("empty payload: %+v ok=%v", skeleton, ok)
}
+32 -12
View File
@@ -190,14 +190,15 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
aggNotifyChan := fs.filer.MetaAggregator.MetaLogBuffer.RegisterSubscriber(aggNotifyName)
defer fs.filer.MetaAggregator.MetaLogBuffer.UnregisterSubscriber(aggNotifyName)
eachEventNotificationFn := fs.eachEventNotificationFn(req, sender, clientName)
var unsyncedEvents int64
eachEventNotificationFn := fs.eachEventNotificationFn(req, sender, clientName, &unsyncedEvents)
// lastSeenTsNs tracks how far the subscriber has read so idle heartbeats are
// only emitted once it is caught up to the buffer head. It is read and
// written from this single goroutine, so no synchronization is needed.
var lastSeenTsNs int64
var lastHeartbeatNs int64
baseEachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
baseEachLogEntryFn := eachLogEntryFn(req, sender, eachEventNotificationFn, &unsyncedEvents)
eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (bool, error) {
lastSeenTsNs = logEntry.TsNs
return baseEachLogEntryFn(logEntry)
@@ -331,14 +332,15 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
sender := newPipelinedSender(stream, 1024, req.ClientSupportsBatching)
defer sender.Close()
eachEventNotificationFn := fs.eachEventNotificationFn(req, sender, clientName)
var unsyncedEvents int64
eachEventNotificationFn := fs.eachEventNotificationFn(req, sender, clientName, &unsyncedEvents)
// lastSeenTsNs tracks how far the subscriber has read so idle heartbeats are
// only emitted once it is caught up to the buffer head. It is read and
// written from this single goroutine, so no synchronization is needed.
var lastSeenTsNs int64
var lastHeartbeatNs int64
baseEachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
baseEachLogEntryFn := eachLogEntryFn(req, sender, eachEventNotificationFn, &unsyncedEvents)
eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (bool, error) {
lastSeenTsNs = logEntry.TsNs
return baseEachLogEntryFn(logEntry)
@@ -490,8 +492,28 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
}
func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) log_buffer.EachLogEntryFuncType {
func eachLogEntryFn(req *filer_pb.SubscribeMetadataRequest, sender metadataStreamSender, eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error, filtered *int64) log_buffer.EachLogEntryFuncType {
// A shallow scan of the path fields skips unmarshaling chunk-heavy events
// this subscriber would filter out anyway; scan surprises fall back to the
// full decode. Only a delivery resets the shared unsynced-events counter.
prefilter := req.PathPrefix != "" || len(req.PathPrefixes) > 0 || len(req.Directories) > 0
return func(logEntry *filer_pb.LogEntry) (bool, error) {
if prefilter {
if skeleton, ok := filer_pb.ScanMetadataEventSkeleton(logEntry.Data); ok &&
!filer_pb.MetadataEventMatchesSubscription(skeleton, req.PathPrefix, req.PathPrefixes, req.Directories) {
*filtered++
if *filtered > MaxUnsyncedEvents {
if err := sender.Send(&filer_pb.SubscribeMetadataResponse{
EventNotification: &filer_pb.EventNotification{},
TsNs: skeleton.TsNs,
}); err != nil {
return false, err
}
*filtered = 0
}
return false, nil
}
}
event := &filer_pb.SubscribeMetadataResponse{}
if err := proto.Unmarshal(logEntry.Data, event); err != nil {
glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
@@ -574,22 +596,20 @@ func (fs *FilerServer) sendLogFileRefs(ctx context.Context, stream metadataStrea
return lastTsNs, false, nil
}
func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, sender metadataStreamSender, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
filtered := 0
func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, sender metadataStreamSender, clientName string, filtered *int64) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
defer func() {
if filtered > MaxUnsyncedEvents {
if *filtered > MaxUnsyncedEvents {
if err := sender.Send(&filer_pb.SubscribeMetadataResponse{
EventNotification: &filer_pb.EventNotification{},
TsNs: tsNs,
}); err == nil {
filtered = 0
*filtered = 0
}
}
}()
filtered++
*filtered++
foundSelf := false
for _, sig := range eventNotification.Signatures {
if sig == req.Signature && req.Signature != 0 {
@@ -636,7 +656,7 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe
glog.V(0).Infof("=> client %v: %+v", clientName, err)
return err
}
filtered = 0
*filtered = 0
return nil
}
}
@@ -0,0 +1,96 @@
package weed_server
import (
"testing"
"google.golang.org/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
// recordingSender is a test sender that keeps every response handed to Send,
// in call order, and never fails.
type recordingSender struct {
	sent []*filer_pb.SubscribeMetadataResponse
}

// Send appends resp to the recorded responses and always returns nil.
func (r *recordingSender) Send(resp *filer_pb.SubscribeMetadataResponse) error {
	r.sent = append(r.sent, resp)
	return nil
}
// metadataLogEntry builds a log entry stamped tsNs whose payload is a
// marshaled create event for name under dir. It fails the test if marshaling
// fails.
func metadataLogEntry(t *testing.T, dir, name string, tsNs int64) *filer_pb.LogEntry {
	t.Helper()
	event := &filer_pb.SubscribeMetadataResponse{
		Directory: dir,
		TsNs:      tsNs,
		EventNotification: &filer_pb.EventNotification{
			NewEntry: &filer_pb.Entry{Name: name},
		},
	}
	payload, marshalErr := proto.Marshal(event)
	if marshalErr != nil {
		t.Fatal(marshalErr)
	}
	return &filer_pb.LogEntry{TsNs: tsNs, Data: payload}
}
func TestEachLogEntryFnPrefilterSkipsDecode(t *testing.T) {
req := &filer_pb.SubscribeMetadataRequest{PathPrefix: "/data/pvc-7/"}
sender := &recordingSender{}
var decoded int
var unsyncedEvents int64
fn := eachLogEntryFn(req, sender, func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
decoded++
unsyncedEvents = 0 // emulate a delivery, like the notification fn after a send
return nil
}, &unsyncedEvents)
// non-matching entries skip the full decode but keep heartbeating
for i := 0; i <= MaxUnsyncedEvents; i++ {
if _, err := fn(metadataLogEntry(t, "/data/pvc-1", "x.log", int64(i+1))); err != nil {
t.Fatal(err)
}
}
if decoded != 0 {
t.Fatalf("non-matching entries must not reach the notification fn, got %d", decoded)
}
if len(sender.sent) != 1 {
t.Fatalf("expected one unsynced heartbeat, got %d", len(sender.sent))
}
if hb := sender.sent[0]; hb.TsNs != int64(MaxUnsyncedEvents+1) || len(hb.EventNotification.Signatures) != 0 {
t.Fatalf("unexpected heartbeat %+v", hb)
}
// a matching entry takes the full decode path
if _, err := fn(metadataLogEntry(t, "/data/pvc-7", "y.log", 9999)); err != nil {
t.Fatal(err)
}
if decoded != 1 {
t.Fatalf("matching entry must be decoded and delivered, got %d", decoded)
}
// the delivery reset the shared counter: a fresh full window passes
// before the next heartbeat
for i := 0; i <= MaxUnsyncedEvents; i++ {
if _, err := fn(metadataLogEntry(t, "/data/pvc-1", "x.log", int64(20000+i))); err != nil {
t.Fatal(err)
}
}
if len(sender.sent) != 2 {
t.Fatalf("expected a second heartbeat after a full window, got %d sends", len(sender.sent))
}
}
// TestEachLogEntryFnNoFilterDecodesEverything checks that a subscriber with no
// path filter has its entry decoded and handed to the notification fn.
func TestEachLogEntryFnNoFilterDecodesEverything(t *testing.T) {
	var (
		delivered      int
		unsyncedEvents int64
	)
	onEvent := func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
		delivered++
		return nil
	}
	fn := eachLogEntryFn(&filer_pb.SubscribeMetadataRequest{}, &recordingSender{}, onEvent, &unsyncedEvents)

	_, err := fn(metadataLogEntry(t, "/anywhere", "x", 1))
	if err != nil {
		t.Fatal(err)
	}
	if delivered != 1 {
		t.Fatalf("unfiltered subscriber must decode every entry, got %d", delivered)
	}
}
@@ -188,7 +188,8 @@ func TestEachEventNotificationFnMatchesRenameTargetsForAllWatchTypes(t *testing.
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
stream := &collectingStream{}
eachEventFn := fs.eachEventNotificationFn(tt.req, stream, "client")
var unsyncedEvents int64
eachEventFn := fs.eachEventNotificationFn(tt.req, stream, "client", &unsyncedEvents)
newDir := "/etc/remote"
if len(tt.req.Directories) > 0 {
@@ -509,7 +510,8 @@ func TestFilteredEventsEmitMaxUnsyncedMarker(t *testing.T) {
req := &filer_pb.SubscribeMetadataRequest{ClientName: "syncFrom_A_To_B", PathPrefix: "/watched/"}
stream := &collectingStream{}
eachEventFn := fs.eachEventNotificationFn(req, stream, "client")
var unsyncedEvents int64
eachEventFn := fs.eachEventNotificationFn(req, stream, "client", &unsyncedEvents)
base := time.Now().UnixNano()
var lastTsNs int64