From 594fc667d5e11effdaf22894c0869f865d197da3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 10 Jun 2026 13:08:34 -0700 Subject: [PATCH] Cut per-subscriber replay decode and widen replay concurrency (#9917) * Filter metadata events before unmarshaling them per subscriber Every subscriber unmarshaled every log entry into a full event just to run the path filter, and entries carry complete chunk lists, so a fleet of path-filtered subscribers spends almost all replay CPU materializing events it then discards. A shallow wire scan now extracts just the directory, entry names and rename destination into a skeleton event, feeds the same matcher, and skips the decode for entries the subscriber cannot match. Any scan surprise (malformed bytes, merged duplicate message fields) falls back to the full decode, and the unsynced-events heartbeat keeps firing for skipped entries. * Raise the legacy replay cap The cap was sized when every replay pinned a private chunk reader per source filer. Replays now share decoded chunks, so sixteen needlessly serializes subscriber catch-up; the expensive part stays bounded by the cache's load gate. * Weight concurrent log-chunk loads by size The flat eight-load gate let eight tiny chunks through as reluctantly as eight full ones. Charge each load's chunk size against a 128MB in-flight budget instead: small chunks decode wide open while full-size ones still serialize enough to cap the transient peak. Oversized weights clamp to the budget so they can always acquire. * Propagate heartbeat send failures and reset the skip counter A failed heartbeat send means the stream is gone, so end the replay instead of scanning on. A delivered event also resets the skip counter, keeping the heartbeat cadence relative to the last thing the client actually received. * Share the unsynced-events counter across the prefilter and delivery Two independent counters could starve the heartbeat: alternating drops reset each side before either reached its threshold. One shared counter increments on every dropped entry, prefiltered or not, and only an actual delivery resets it, restoring the original cadence exactly. * Tighten comments * Benchmark the subscription match paths For a thousand-chunk event that the subscriber filters out, the shallow scan matches in 10us and 9 allocations against 175us and 4031 allocations for the full decode. --- weed/filer/filer_notify.go | 8 +- weed/filer/filer_notify_read.go | 2 +- weed/filer/persisted_log_cache.go | 30 ++-- weed/filer/persisted_log_cache_test.go | 30 ++-- weed/pb/filer_pb/filer_pb_event_scan.go | 142 ++++++++++++++++++ .../filer_pb_event_scan_bench_test.go | 60 ++++++++ weed/pb/filer_pb/filer_pb_event_scan_test.go | 112 ++++++++++++++ weed/server/filer_grpc_server_sub_meta.go | 44 ++++-- ...ler_grpc_server_sub_meta_prefilter_test.go | 96 ++++++++++++ .../server/filer_grpc_server_sub_meta_test.go | 6 +- 10 files changed, 492 insertions(+), 38 deletions(-) create mode 100644 weed/pb/filer_pb/filer_pb_event_scan.go create mode 100644 weed/pb/filer_pb/filer_pb_event_scan_bench_test.go create mode 100644 weed/pb/filer_pb/filer_pb_event_scan_test.go create mode 100644 weed/server/filer_grpc_server_sub_meta_prefilter_test.go diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index c66235afc..19f020185 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -200,11 +200,9 @@ func isChunkNotFoundError(err error) bool { httpNotFoundPattern.MatchString(errMsg) } -// persistedLogReplayLimit caps concurrent legacy replays; each holds a chunk -// reader per source filer, so a reconnect storm of pre-offload clients would -// otherwise pin many GB. Metadata-chunks clients take sendLogFileRefs and never -// reach this path. -const persistedLogReplayLimit = 16 +// persistedLogReplayLimit caps concurrent legacy replays; decodes are shared +// through the persisted-log cache, so this only bounds the listing fan-out. +const persistedLogReplayLimit = 64 var persistedLogReplaySem = make(chan struct{}, persistedLogReplayLimit) diff --git a/weed/filer/filer_notify_read.go b/weed/filer/filer_notify_read.go index 1f0dd0507..8e89805c8 100644 --- a/weed/filer/filer_notify_read.go +++ b/weed/filer/filer_notify_read.go @@ -557,7 +557,7 @@ func (iter *LogFileIterator) getNextCached() (logEntry *filer_pb.LogEntry, err e if chunk.ModifiedTsNs > 0 && chunk.ModifiedTsNs+int64(LogFlushInterval) <= iter.startTsNs { continue } - entries, loadErr := iter.cache.getOrLoad(chunk.GetFileIdString(), func() ([]*filer_pb.LogEntry, bool, error) { + entries, loadErr := iter.cache.getOrLoad(chunk.GetFileIdString(), int64(chunk.Size), func() ([]*filer_pb.LogEntry, bool, error) { return loadLogFileEntriesFn(iter.masterClient, chunk) }) if loadErr != nil { diff --git a/weed/filer/persisted_log_cache.go b/weed/filer/persisted_log_cache.go index 762e17d74..2d48fc6b6 100644 --- a/weed/filer/persisted_log_cache.go +++ b/weed/filer/persisted_log_cache.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "golang.org/x/sync/semaphore" "golang.org/x/sync/singleflight" "google.golang.org/protobuf/proto" @@ -19,8 +20,9 @@ import ( const ( // persistedLogCacheMaxBytes bounds retained entries regardless of subscriber count. persistedLogCacheMaxBytes = 256 << 20 - // persistedLogCacheMaxLoads bounds how many chunks fetch and decode at once. - persistedLogCacheMaxLoads = 8 + // persistedLogCacheLoadBudget bounds in-flight fetch+decode bytes, charged + // by chunk size: small chunks load wide, full-size ones cap the peak. + persistedLogCacheLoadBudget = 128 << 20 // persistedLogCacheIdleTTL frees entries no replay has touched recently, so // the cache holds memory only while subscribers actually replay. persistedLogCacheIdleTTL = 5 * time.Minute @@ -44,7 +46,7 @@ type persistedLogCache struct { curBytes int64 maxBytes int64 sf singleflight.Group - loadSem chan struct{} + loadSem *semaphore.Weighted } type logCacheItem struct { @@ -59,7 +61,7 @@ func newPersistedLogCache(maxBytes int64) *persistedLogCache { ll: list.New(), index: make(map[string]*list.Element), maxBytes: maxBytes, - loadSem: make(chan struct{}, persistedLogCacheMaxLoads), + loadSem: semaphore.NewWeighted(persistedLogCacheLoadBudget), } // the filer's cache lives for the process lifetime go c.loopEvictIdle() @@ -97,7 +99,7 @@ type logLoadResult struct { // coalescing concurrent misses. Only a clean, complete decode is cached: a // chunk-not-found read must be re-probed on later replays, and an incomplete // chunk stays with the streaming fallback. -func (c *persistedLogCache) getOrLoad(fileId string, load func() ([]*filer_pb.LogEntry, bool, error)) ([]*filer_pb.LogEntry, error) { +func (c *persistedLogCache) getOrLoad(fileId string, loadBytes int64, load func() ([]*filer_pb.LogEntry, bool, error)) ([]*filer_pb.LogEntry, error) { if entries, ok := c.lookup(fileId); ok { return entries, nil } @@ -105,7 +107,7 @@ func (c *persistedLogCache) getOrLoad(fileId string, load func() ([]*filer_pb.Lo if entries, ok := c.lookup(fileId); ok { return logLoadResult{entries: entries}, nil } - entries, cacheable, loadErr := c.loadGuarded(load) + entries, cacheable, loadErr := c.loadGuarded(loadBytes, load) if loadErr == nil && cacheable { c.store(fileId, entries) } @@ -115,9 +117,19 @@ func (c *persistedLogCache) getOrLoad(fileId string, load func() ([]*filer_pb.Lo return res.entries, res.err } -func (c *persistedLogCache) loadGuarded(load func() ([]*filer_pb.LogEntry, bool, error)) ([]*filer_pb.LogEntry, bool, error) { - c.loadSem <- struct{}{} - defer func() { <-c.loadSem }() +func (c *persistedLogCache) loadGuarded(loadBytes int64, load func() ([]*filer_pb.LogEntry, bool, error)) ([]*filer_pb.LogEntry, bool, error) { + weight := loadBytes + if weight < 1 { + weight = 1 + } + if weight > persistedLogCacheLoadBudget { + // never exceeds the semaphore size, or the acquire could not succeed + weight = persistedLogCacheLoadBudget + } + if err := c.loadSem.Acquire(context.Background(), weight); err != nil { + return nil, false, err + } + defer c.loadSem.Release(weight) return load() } diff --git a/weed/filer/persisted_log_cache_test.go b/weed/filer/persisted_log_cache_test.go index 3ab5b9ba2..5e0dec700 100644 --- a/weed/filer/persisted_log_cache_test.go +++ b/weed/filer/persisted_log_cache_test.go @@ -48,11 +48,11 @@ func TestPersistedLogCacheHitMiss(t *testing.T) { return logEntriesAt(1, 2, 3), true, nil } - e1, err := c.getOrLoad("3,01", load) + e1, err := c.getOrLoad("3,01", 1, load) if err != nil || len(e1) != 3 { t.Fatalf("first getOrLoad: err=%v len=%d", err, len(e1)) } - e2, err := c.getOrLoad("3,01", load) + e2, err := c.getOrLoad("3,01", 1, load) if err != nil { t.Fatal(err) } @@ -74,10 +74,10 @@ func TestPersistedLogCacheNotCachedWhenUncacheable(t *testing.T) { return logEntriesAt(1, 2), false, nil } - if e, err := c.getOrLoad("3,01", load); err != nil || len(e) != 2 { + if e, err := c.getOrLoad("3,01", 1, load); err != nil || len(e) != 2 { t.Fatalf("first: err=%v len=%d", err, len(e)) } - if e, err := c.getOrLoad("3,01", load); err != nil || len(e) != 2 { + if e, err := c.getOrLoad("3,01", 1, load); err != nil || len(e) != 2 { t.Fatalf("second: err=%v len=%d", err, len(e)) } if n := atomic.LoadInt32(&loads); n != 2 { @@ -104,7 +104,7 @@ func TestPersistedLogCacheSingleFlight(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - if _, err := c.getOrLoad("3,01", load); err != nil { + if _, err := c.getOrLoad("3,01", 1, load); err != nil { t.Error(err) } }() @@ -128,10 +128,10 @@ func TestPersistedLogCacheEviction(t *testing.T) { } } - if _, err := c.getOrLoad("a", mk(1)); err != nil { + if _, err := c.getOrLoad("a", 1, mk(1)); err != nil { t.Fatal(err) } - if _, err := c.getOrLoad("b", mk(2)); err != nil { // pushes over budget, evicts LRU "a" + if _, err := c.getOrLoad("b", 1, mk(2)); err != nil { // pushes over budget, evicts LRU "a" t.Fatal(err) } @@ -364,10 +364,10 @@ func TestPersistedLogCacheIdleEviction(t *testing.T) { load := func() ([]*filer_pb.LogEntry, bool, error) { return logEntriesAt(1), true, nil } - if _, err := c.getOrLoad("idle", load); err != nil { + if _, err := c.getOrLoad("idle", 1, load); err != nil { t.Fatal(err) } - if _, err := c.getOrLoad("hot", load); err != nil { + if _, err := c.getOrLoad("hot", 1, load); err != nil { t.Fatal(err) } @@ -413,3 +413,15 @@ func TestDecodeLogRecordsRejectsImplausibleRecords(t *testing.T) { t.Fatalf("non-increasing prefix: %v", entries) } } + +func TestPersistedLogCacheLoadLargerThanBudget(t *testing.T) { + // a load weight above the semaphore size could never be acquired; it must + // clamp and still run + c := newPersistedLogCache(persistedLogCacheMaxBytes) + e, err := c.getOrLoad("3,01", persistedLogCacheLoadBudget*4, func() ([]*filer_pb.LogEntry, bool, error) { + return logEntriesAt(1), true, nil + }) + if err != nil || len(e) != 1 { + t.Fatalf("oversized load: err=%v len=%d", err, len(e)) + } +} diff --git a/weed/pb/filer_pb/filer_pb_event_scan.go b/weed/pb/filer_pb/filer_pb_event_scan.go new file mode 100644 index 000000000..b99a73e87 --- /dev/null +++ b/weed/pb/filer_pb/filer_pb_event_scan.go @@ -0,0 +1,142 @@ +package filer_pb + +import ( + "google.golang.org/protobuf/encoding/protowire" +) + +// ScanMetadataEventSkeleton extracts just the fields the subscription matcher +// reads, without materializing the entries' chunk lists. ok=false means the +// caller must fall back to a full unmarshal. +func ScanMetadataEventSkeleton(data []byte) (skeleton *SubscribeMetadataResponse, ok bool) { + skeleton = &SubscribeMetadataResponse{} + rest := data + for len(rest) > 0 { + num, typ, n := protowire.ConsumeTag(rest) + if n < 0 { + return nil, false + } + rest = rest[n:] + switch { + case num == 1 && typ == protowire.BytesType: // directory + v, m := protowire.ConsumeBytes(rest) + if m < 0 { + return nil, false + } + skeleton.Directory = string(v) + rest = rest[m:] + case num == 2 && typ == protowire.BytesType: // event_notification + if skeleton.EventNotification != nil { + // repeated occurrences of a message field merge; punt to a full decode + return nil, false + } + v, m := protowire.ConsumeBytes(rest) + if m < 0 { + return nil, false + } + notification, scanOk := scanEventNotificationSkeleton(v) + if !scanOk { + return nil, false + } + skeleton.EventNotification = notification + rest = rest[m:] + case num == 3 && typ == protowire.VarintType: // ts_ns + v, m := protowire.ConsumeVarint(rest) + if m < 0 { + return nil, false + } + skeleton.TsNs = int64(v) + rest = rest[m:] + default: + m := protowire.ConsumeFieldValue(num, typ, rest) + if m < 0 { + return nil, false + } + rest = rest[m:] + } + } + return skeleton, true +} + +func scanEventNotificationSkeleton(data []byte) (*EventNotification, bool) { + notification := &EventNotification{} + rest := data + for len(rest) > 0 { + num, typ, n := protowire.ConsumeTag(rest) + if n < 0 { + return nil, false + } + rest = rest[n:] + switch { + case num == 1 && typ == protowire.BytesType: // old_entry + if notification.OldEntry != nil { + return nil, false + } + v, m := protowire.ConsumeBytes(rest) + if m < 0 { + return nil, false + } + name, scanOk := scanEntryName(v) + if !scanOk { + return nil, false + } + notification.OldEntry = &Entry{Name: name} + rest = rest[m:] + case num == 2 && typ == protowire.BytesType: // new_entry + if notification.NewEntry != nil { + return nil, false + } + v, m := protowire.ConsumeBytes(rest) + if m < 0 { + return nil, false + } + name, scanOk := scanEntryName(v) + if !scanOk { + return nil, false + } + notification.NewEntry = &Entry{Name: name} + rest = rest[m:] + case num == 4 && typ == protowire.BytesType: // new_parent_path + v, m := protowire.ConsumeBytes(rest) + if m < 0 { + return nil, false + } + notification.NewParentPath = string(v) + rest = rest[m:] + default: + m := protowire.ConsumeFieldValue(num, typ, rest) + if m < 0 { + return nil, false + } + rest = rest[m:] + } + } + return notification, true +} + +// scanEntryName walks the whole entry so field order does not matter. +func scanEntryName(data []byte) (string, bool) { + name := "" + rest := data + for len(rest) > 0 { + num, typ, n := protowire.ConsumeTag(rest) + if n < 0 { + return "", false + } + rest = rest[n:] + if num == 1 && typ == protowire.BytesType { // name; last occurrence wins like proto merge + v, m := protowire.ConsumeBytes(rest) + if m < 0 { + return "", false + } + name = string(v) + rest = rest[m:] + continue + } + m := protowire.ConsumeFieldValue(num, typ, rest) + if m < 0 { + return "", false + } + rest = rest[m:] + } + return name, true +} diff --git a/weed/pb/filer_pb/filer_pb_event_scan_bench_test.go b/weed/pb/filer_pb/filer_pb_event_scan_bench_test.go new file mode 100644 index 000000000..8710e0993 --- /dev/null +++ b/weed/pb/filer_pb/filer_pb_event_scan_bench_test.go @@ -0,0 +1,60 @@ +package filer_pb + +import ( + "testing" + + "google.golang.org/protobuf/proto" +) + +func benchmarkEventBytes(b *testing.B, chunkCount int) []byte { + b.Helper() + chunks := make([]*FileChunk, chunkCount) + for i := range chunks { + chunks[i] = &FileChunk{FileId: "3,1234567890ab", Offset: int64(i) * 4096, Size: 4096, ModifiedTsNs: int64(i)} + } + data, err := proto.Marshal(&SubscribeMetadataResponse{ + Directory: "/data/pvc-42", + TsNs: 123456789, + EventNotification: &EventNotification{ + OldEntry: &Entry{Name: "0.log", Chunks: chunks}, + NewEntry: &Entry{Name: "0.log", Chunks: chunks}, + }, + }) + if err != nil { + b.Fatal(err) + } + return data +} + +// the non-matching subscriber case, which is what a per-path fleet hits for +// almost every entry + +func BenchmarkSubscriptionMatchFullDecode(b *testing.B) { + data := benchmarkEventBytes(b, 1000) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + event := &SubscribeMetadataResponse{} + if err := proto.Unmarshal(data, event); err != nil { + b.Fatal(err) + } + if MetadataEventMatchesSubscription(event, "/data/pvc-7/", nil, nil) { + b.Fatal("unexpected match") + } + } +} + +func BenchmarkSubscriptionMatchSkeleton(b *testing.B) { + data := benchmarkEventBytes(b, 1000) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + skeleton, ok := ScanMetadataEventSkeleton(data) + if !ok { + b.Fatal("scan failed") + } + if MetadataEventMatchesSubscription(skeleton, "/data/pvc-7/", nil, nil) { + b.Fatal("unexpected match") + } + } +} diff --git a/weed/pb/filer_pb/filer_pb_event_scan_test.go b/weed/pb/filer_pb/filer_pb_event_scan_test.go new file mode 100644 index 000000000..aa8498ea4 --- /dev/null +++ b/weed/pb/filer_pb/filer_pb_event_scan_test.go @@ -0,0 +1,112 @@ +package filer_pb + +import ( + "testing" + + "google.golang.org/protobuf/encoding/protowire" + "google.golang.org/protobuf/proto" +) + +func marshalEvent(t *testing.T, event *SubscribeMetadataResponse) []byte { + t.Helper() + data, err := proto.Marshal(event) + if err != nil { + t.Fatal(err) + } + return data +} + +func bigEntry(name string) *Entry { + chunks := make([]*FileChunk, 50) + for i := range chunks { + chunks[i] = &FileChunk{FileId: "3,1234567890ab", Offset: int64(i) * 100, Size: 100} + } + return &Entry{Name: name, Chunks: chunks, Attributes: &FuseAttributes{FileSize: 5000, Mtime: 1234567890}} +} + +func TestScanMetadataEventSkeletonMatchesFullDecode(t *testing.T) { + events := map[string]*SubscribeMetadataResponse{ + "create": {Directory: "/data/pvc-7", TsNs: 100, + EventNotification: &EventNotification{NewEntry: bigEntry("a.log")}}, + "update": {Directory: "/data/pvc-7", TsNs: 200, + EventNotification: &EventNotification{OldEntry: bigEntry("a.log"), NewEntry: bigEntry("a.log")}}, + "delete": {Directory: "/data/pvc-7", TsNs: 300, + EventNotification: &EventNotification{OldEntry: bigEntry("a.log"), DeleteChunks: true}}, + "rename within dir": {Directory: "/data/pvc-7", TsNs: 400, + EventNotification: &EventNotification{OldEntry: bigEntry("a.log"), NewEntry: bigEntry("b.log")}}, + "rename across dirs": {Directory: "/data/pvc-7", TsNs: 500, + EventNotification: &EventNotification{OldEntry: bigEntry("a.log"), NewEntry: bigEntry("a.log"), NewParentPath: "/data/pvc-9"}}, + "empty notification": {Directory: "/data/pvc-7", TsNs: 600, + EventNotification: &EventNotification{}}, + "no notification": {Directory: "/data/pvc-7", TsNs: 700}, + } + filters := []struct { + prefix string + prefixes []string + dirs []string + }{ + {prefix: "/data/pvc-7/"}, + {prefix: "/data/pvc-9/"}, + {prefix: "/data/pvc-1/"}, + {prefix: "/"}, + {prefixes: []string{"/other/", "/data/pvc-9/"}}, + {dirs: []string{"/data/pvc-7"}}, + {dirs: []string{"/data/pvc-9"}}, + } + + for name, event := range events { + skeleton, ok := ScanMetadataEventSkeleton(marshalEvent(t, event)) + if !ok { + t.Fatalf("%s: scan failed", name) + } + if skeleton.TsNs != event.TsNs { + t.Errorf("%s: ts %d != %d", name, skeleton.TsNs, event.TsNs) + } + for _, f := range filters { + want := MetadataEventMatchesSubscription(event, f.prefix, f.prefixes, f.dirs) + got := MetadataEventMatchesSubscription(skeleton, f.prefix, f.prefixes, f.dirs) + if got != want { + t.Errorf("%s with filter %+v: skeleton match %v, full match %v", name, f, got, want) + } + } + } +} + +func TestScanMetadataEventSkeletonFallsBack(t *testing.T) { + if _, ok := ScanMetadataEventSkeleton([]byte{0xff, 0xff, 0xff}); ok { + t.Error("malformed payload must not scan") + } + + // two event_notification occurrences merge under proto semantics; the + // scanner must punt rather than guess + one := marshalEvent(t, &SubscribeMetadataResponse{ + EventNotification: &EventNotification{OldEntry: &Entry{Name: "a"}}}) + two := append(append([]byte{}, one...), one...) + if _, ok := ScanMetadataEventSkeleton(two); ok { + t.Error("duplicated message field must fall back to full decode") + } + + // same for a duplicated entry inside the notification + entry, err := proto.Marshal(&Entry{Name: "a"}) + if err != nil { + t.Fatal(err) + } + var notification []byte + for i := 0; i < 2; i++ { + notification = protowire.AppendTag(notification, 1, protowire.BytesType) // old_entry + notification = protowire.AppendBytes(notification, entry) + } + var response []byte + response = protowire.AppendTag(response, 2, protowire.BytesType) // event_notification + response = protowire.AppendBytes(response, notification) + if _, ok := ScanMetadataEventSkeleton(response); ok { + t.Error("duplicated nested entry field must fall back to full decode") + } +} + +func TestScanMetadataEventSkeletonEmptyPayload(t *testing.T) { + skeleton, ok := ScanMetadataEventSkeleton(nil) + if !ok || skeleton.Directory != "" || skeleton.EventNotification != nil { + t.Fatalf("empty payload: %+v ok=%v", skeleton, ok) + } +} diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 0b5a1c3f8..d1dffb358 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -190,14 +190,15 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, aggNotifyChan := fs.filer.MetaAggregator.MetaLogBuffer.RegisterSubscriber(aggNotifyName) defer fs.filer.MetaAggregator.MetaLogBuffer.UnregisterSubscriber(aggNotifyName) - eachEventNotificationFn := fs.eachEventNotificationFn(req, sender, clientName) + var unsyncedEvents int64 + eachEventNotificationFn := fs.eachEventNotificationFn(req, sender, clientName, &unsyncedEvents) // lastSeenTsNs tracks how far the subscriber has read so idle heartbeats are // only emitted once it is caught up to the buffer head. It is read and // written from this single goroutine, so no synchronization is needed. var lastSeenTsNs int64 var lastHeartbeatNs int64 - baseEachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) + baseEachLogEntryFn := eachLogEntryFn(req, sender, eachEventNotificationFn, &unsyncedEvents) eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (bool, error) { lastSeenTsNs = logEntry.TsNs return baseEachLogEntryFn(logEntry) @@ -331,14 +332,15 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq sender := newPipelinedSender(stream, 1024, req.ClientSupportsBatching) defer sender.Close() - eachEventNotificationFn := fs.eachEventNotificationFn(req, sender, clientName) + var unsyncedEvents int64 + eachEventNotificationFn := fs.eachEventNotificationFn(req, sender, clientName, &unsyncedEvents) // lastSeenTsNs tracks how far the subscriber has read so idle heartbeats are // only emitted once it is caught up to the buffer head. It is read and // written from this single goroutine, so no synchronization is needed. var lastSeenTsNs int64 var lastHeartbeatNs int64 - baseEachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) + baseEachLogEntryFn := eachLogEntryFn(req, sender, eachEventNotificationFn, &unsyncedEvents) eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (bool, error) { lastSeenTsNs = logEntry.TsNs return baseEachLogEntryFn(logEntry) @@ -490,8 +492,28 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq } -func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) log_buffer.EachLogEntryFuncType { +func eachLogEntryFn(req *filer_pb.SubscribeMetadataRequest, sender metadataStreamSender, eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error, filtered *int64) log_buffer.EachLogEntryFuncType { + // A shallow scan of the path fields skips unmarshaling chunk-heavy events + // this subscriber would filter out anyway; scan surprises fall back to the + // full decode. Only a delivery resets the shared unsynced-events counter. + prefilter := req.PathPrefix != "" || len(req.PathPrefixes) > 0 || len(req.Directories) > 0 return func(logEntry *filer_pb.LogEntry) (bool, error) { + if prefilter { + if skeleton, ok := filer_pb.ScanMetadataEventSkeleton(logEntry.Data); ok && + !filer_pb.MetadataEventMatchesSubscription(skeleton, req.PathPrefix, req.PathPrefixes, req.Directories) { + *filtered++ + if *filtered > MaxUnsyncedEvents { + if err := sender.Send(&filer_pb.SubscribeMetadataResponse{ + EventNotification: &filer_pb.EventNotification{}, + TsNs: skeleton.TsNs, + }); err != nil { + return false, err + } + *filtered = 0 + } + return false, nil + } + } event := &filer_pb.SubscribeMetadataResponse{} if err := proto.Unmarshal(logEntry.Data, event); err != nil { glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) @@ -574,22 +596,20 @@ func (fs *FilerServer) sendLogFileRefs(ctx context.Context, stream metadataStrea return lastTsNs, false, nil } -func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, sender metadataStreamSender, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { - filtered := 0 - +func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, sender metadataStreamSender, clientName string, filtered *int64) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { defer func() { - if filtered > MaxUnsyncedEvents { + if *filtered > MaxUnsyncedEvents { if err := sender.Send(&filer_pb.SubscribeMetadataResponse{ EventNotification: &filer_pb.EventNotification{}, TsNs: tsNs, }); err == nil { - filtered = 0 + *filtered = 0 } } }() - filtered++ + *filtered++ foundSelf := false for _, sig := range eventNotification.Signatures { if sig == req.Signature && req.Signature != 0 { @@ -636,7 +656,7 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe glog.V(0).Infof("=> client %v: %+v", clientName, err) return err } - filtered = 0 + *filtered = 0 return nil } } diff --git a/weed/server/filer_grpc_server_sub_meta_prefilter_test.go b/weed/server/filer_grpc_server_sub_meta_prefilter_test.go new file mode 100644 index 000000000..10652bf6b --- /dev/null +++ b/weed/server/filer_grpc_server_sub_meta_prefilter_test.go @@ -0,0 +1,96 @@ +package weed_server + +import ( + "testing" + + "google.golang.org/protobuf/proto" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +type recordingSender struct { + sent []*filer_pb.SubscribeMetadataResponse +} + +func (s *recordingSender) Send(resp *filer_pb.SubscribeMetadataResponse) error { + s.sent = append(s.sent, resp) + return nil +} + +func metadataLogEntry(t *testing.T, dir, name string, tsNs int64) *filer_pb.LogEntry { + t.Helper() + data, err := proto.Marshal(&filer_pb.SubscribeMetadataResponse{ + Directory: dir, + TsNs: tsNs, + EventNotification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: name}, + }, + }) + if err != nil { + t.Fatal(err) + } + return &filer_pb.LogEntry{TsNs: tsNs, Data: data} +} + +func TestEachLogEntryFnPrefilterSkipsDecode(t *testing.T) { + req := &filer_pb.SubscribeMetadataRequest{PathPrefix: "/data/pvc-7/"} + sender := &recordingSender{} + var decoded int + var unsyncedEvents int64 + fn := eachLogEntryFn(req, sender, func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + decoded++ + unsyncedEvents = 0 // emulate a delivery, like the notification fn after a send + return nil + }, &unsyncedEvents) + + // non-matching entries skip the full decode but keep heartbeating + for i := 0; i <= MaxUnsyncedEvents; i++ { + if _, err := fn(metadataLogEntry(t, "/data/pvc-1", "x.log", int64(i+1))); err != nil { + t.Fatal(err) + } + } + if decoded != 0 { + t.Fatalf("non-matching entries must not reach the notification fn, got %d", decoded) + } + if len(sender.sent) != 1 { + t.Fatalf("expected one unsynced heartbeat, got %d", len(sender.sent)) + } + if hb := sender.sent[0]; hb.TsNs != int64(MaxUnsyncedEvents+1) || len(hb.EventNotification.Signatures) != 0 { + t.Fatalf("unexpected heartbeat %+v", hb) + } + + // a matching entry takes the full decode path + if _, err := fn(metadataLogEntry(t, "/data/pvc-7", "y.log", 9999)); err != nil { + t.Fatal(err) + } + if decoded != 1 { + t.Fatalf("matching entry must be decoded and delivered, got %d", decoded) + } + + // the delivery reset the shared counter: a fresh full window passes + // before the next heartbeat + for i := 0; i <= MaxUnsyncedEvents; i++ { + if _, err := fn(metadataLogEntry(t, "/data/pvc-1", "x.log", int64(20000+i))); err != nil { + t.Fatal(err) + } + } + if len(sender.sent) != 2 { + t.Fatalf("expected a second heartbeat after a full window, got %d sends", len(sender.sent)) + } +} + +func TestEachLogEntryFnNoFilterDecodesEverything(t *testing.T) { + req := &filer_pb.SubscribeMetadataRequest{} + var decoded int + var unsyncedEvents int64 + fn := eachLogEntryFn(req, &recordingSender{}, func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + decoded++ + return nil + }, &unsyncedEvents) + if _, err := fn(metadataLogEntry(t, "/anywhere", "x", 1)); err != nil { + t.Fatal(err) + } + if decoded != 1 { + t.Fatalf("unfiltered subscriber must decode every entry, got %d", decoded) + } +} diff --git a/weed/server/filer_grpc_server_sub_meta_test.go b/weed/server/filer_grpc_server_sub_meta_test.go index 74b0f908b..072cf3cbf 100644 --- a/weed/server/filer_grpc_server_sub_meta_test.go +++ b/weed/server/filer_grpc_server_sub_meta_test.go @@ -188,7 +188,8 @@ func TestEachEventNotificationFnMatchesRenameTargetsForAllWatchTypes(t *testing. for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { stream := &collectingStream{} - eachEventFn := fs.eachEventNotificationFn(tt.req, stream, "client") + var unsyncedEvents int64 + eachEventFn := fs.eachEventNotificationFn(tt.req, stream, "client", &unsyncedEvents) newDir := "/etc/remote" if len(tt.req.Directories) > 0 { @@ -509,7 +510,8 @@ func TestFilteredEventsEmitMaxUnsyncedMarker(t *testing.T) { req := &filer_pb.SubscribeMetadataRequest{ClientName: "syncFrom_A_To_B", PathPrefix: "/watched/"} stream := &collectingStream{} - eachEventFn := fs.eachEventNotificationFn(req, stream, "client") + var unsyncedEvents int64 + eachEventFn := fs.eachEventNotificationFn(req, stream, "client", &unsyncedEvents) base := time.Now().UnixNano() var lastTsNs int64