seaweedfs/weed/command/filer_sync_jobs.go
Chris Lu c2f5db3a02 perf(filer.sync): don't serialize descendants behind dir attribute updates (#9079)
* perf(filer.sync): don't serialize descendants behind dir attribute updates

The MetadataProcessor treated every in-flight directory job as a subtree
barrier: any active dir job at /foo forced all file events under /foo to
wait, and because the admit loop runs on the single stream.Recv()
goroutine, a stalled descendant also stalled the whole gRPC stream. For
large directories this turned every attribute-only dir event (mtime /
xattr / chmod bumps) into a full-subtree pinch point.

Classify dir jobs as barrier (create / delete / rename) vs non-barrier
(filer_pb.IsUpdate on a directory — same parent and same name, i.e. an
in-place attribute update). Only barrier dirs block descendants and get
blocked by ancestor barrier dirs. Non-barrier dir updates still bump the
ancestor descendantCount, so an incoming barrier dir on an ancestor
still waits for them — preserving the "delete /a waits for in-flight
/a/b update" safety.

Tests cover the loosened cases and the preserved barriers:
non-barrier update doesn't block a file descendant, barrier create
still does, barrier delete still waits for in-flight descendants, and
a barrier ancestor still waits for a non-barrier descendant update.
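The rule in this first commit can be checked with a small standalone model that reproduces the four scenarios the tests cover. The sketch below is only a reduction, not the code in filer_sync_jobs.go. It assumes a simplified index with just two maps, and the `model`, `newModel`, `add`, `blocked` and `ancestors` names are hypothetical. The kind constants mirror the source. Same-path rules are deliberately left out.

```go
package main

import (
	"fmt"
	"path"
)

// jobKind mirrors the three kinds in filer_sync_jobs.go.
type jobKind int

const (
	kindFile jobKind = iota
	kindBarrierDir
	kindNonBarrierDir
)

// ancestors returns the proper ancestors of p, e.g. "/a/b" -> ["/a", "/"].
func ancestors(p string) []string {
	var out []string
	s := p
	for {
		parent := path.Dir(s)
		if parent == s {
			return out
		}
		out = append(out, parent)
		s = parent
	}
}

// model is a hypothetical, reduced version of the processor's indexes:
// barrierAt counts active barrier dirs per exact path, and descendants
// counts active jobs of any kind strictly below each directory.
type model struct {
	barrierAt   map[string]int
	descendants map[string]int
}

func newModel() *model {
	return &model{barrierAt: map[string]int{}, descendants: map[string]int{}}
}

// add registers an active job. Every kind, including a non-barrier
// attribute update, bumps the descendant counters of its ancestors.
func (m *model) add(p string, k jobKind) {
	if k == kindBarrierDir {
		m.barrierAt[p]++
	}
	for _, a := range ancestors(p) {
		m.descendants[a]++
	}
}

// blocked reports whether a new job of kind k at p must wait. Only the
// ancestor and descendant rules are modelled; same-path checks are omitted.
func (m *model) blocked(p string, k jobKind) bool {
	// A barrier dir waits for its whole in-flight subtree.
	if k == kindBarrierDir && m.descendants[p] > 0 {
		return true
	}
	// A barrier dir on any ancestor blocks every job below it.
	for _, a := range ancestors(p) {
		if m.barrierAt[a] > 0 {
			return true
		}
	}
	return false
}

func main() {
	m := newModel()
	m.add("/foo", kindNonBarrierDir)
	fmt.Println(m.blocked("/foo/x", kindFile)) // false: attribute update does not block

	m = newModel()
	m.add("/foo", kindBarrierDir)
	fmt.Println(m.blocked("/foo/x", kindFile)) // true: barrier create still blocks

	m = newModel()
	m.add("/a/b", kindFile)
	fmt.Println(m.blocked("/a", kindBarrierDir)) // true: delete /a waits for /a/b

	m = newModel()
	m.add("/a/b", kindNonBarrierDir)
	fmt.Println(m.blocked("/a", kindBarrierDir)) // true: waits for the non-barrier update
}
```

The key asymmetry is in `add`: a non-barrier update never enters `barrierAt`, so it cannot block anything below it. It still bumps `descendants`, so a barrier on an ancestor keeps waiting for it.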

* fix(filer.sync): serialize same-path barrier dir jobs against concurrent ops

Review (Gemini) flagged that pathConflicts had latent same-path gaps
that predated this PR but deserve fixing alongside the dir-conflict
loosening: two barrier dir jobs at the same path could run concurrently
(e.g. create /a and delete /a), and a file job at the same path as an
in-flight barrier dir wasn't blocked either.

Tighten pathConflicts so that:
- an active barrier dir at p blocks every incoming job at p (file,
  barrier dir, or non-barrier attribute update) — same-path promotions,
  renames, and delete/create collisions must serialize;
- an active file at p blocks incoming files and barrier dirs at p;
- non-barrier dir updates at the same path still overlap with each
  other (attribute bumps are last-writer-wins, intentional).

TestDirVsDirConflict and TestFileUnderActiveDirConflict flip their
"same path does not conflict" assertions to match. New
TestSamePathBarrierSerialization covers all five same-path cases
explicitly.

* fix(filer.sync): serialize incoming barrier dir against same-path non-barrier update

Bug introduced by the previous same-path tightening commit and caught
in review (CodeRabbit, critical): a kindNonBarrierDir at /dir1 was not
indexed at its own path, so a later kindBarrierDir at /dir1 saw neither
activeBarrierDirPaths["/dir1"] nor descendantCount["/dir1"] (the latter
only counts strict descendants) and was admitted concurrently with the
in-flight attribute update. That violated the "barrier at p serializes
all work at p" rule.
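The gap comes from descendantCount being keyed on proper ancestors only. The standalone sketch below copies the ancestor walk from pathAncestors. It uses a hypothetical `register` helper in place of addPathToIndex, and it shows that a job at /dir1 bumps "/" but leaves the counter for /dir1 itself at zero.

```go
package main

import (
	"fmt"
	"path"
)

// properAncestors copies the walk in pathAncestors: it returns every
// ancestor of p, nearest first, but never p itself.
func properAncestors(p string) []string {
	var out []string
	s := p
	for {
		parent := path.Dir(s)
		if parent == s {
			return out
		}
		out = append(out, parent)
		s = parent
	}
}

// register is a hypothetical stand-in for the descendantCount part of
// addPathToIndex: it bumps the counter of every proper ancestor of p.
func register(descendantCount map[string]int, p string) {
	for _, a := range properAncestors(p) {
		descendantCount[a]++
	}
}

func main() {
	counts := map[string]int{}
	register(counts, "/dir1") // in-flight attribute update at /dir1
	// An incoming barrier dir at /dir1 would consult counts["/dir1"].
	fmt.Println(counts["/dir1"], counts["/"]) // 0 1
	register(counts, "/dir1/f") // a real descendant is visible
	fmt.Println(counts["/dir1"]) // 1
}
```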

Track non-barrier dir jobs in a new activeNonBarrierDirPaths map and
check it only from the incoming-barrier-dir branch of pathConflicts.
The map is deliberately invisible to the ancestor check, so non-barrier
updates still don't serialize file descendants — the loosening this PR
is about stays intact.
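With the new map in place, the same-path part of pathConflicts reduces to a small decision table. The following sketch keeps only those checks for a single path. It uses a hypothetical `samePathIndex` struct to stand in for the three per-path maps and omits the ancestor and descendant checks. The kind constants mirror the source.

```go
package main

import "fmt"

type jobKind int

const (
	kindFile jobKind = iota
	kindBarrierDir
	kindNonBarrierDir
)

// samePathIndex holds per-kind counts of active jobs at one exact path,
// standing in for activeFilePaths, activeBarrierDirPaths and
// activeNonBarrierDirPaths at a single key.
type samePathIndex struct {
	files, barrierDirs, nonBarrierDirs int
}

// samePathConflict applies only the same-path checks of pathConflicts,
// in the same order.
func samePathConflict(idx samePathIndex, incoming jobKind) bool {
	if idx.barrierDirs > 0 {
		return true // a barrier at p serializes everything at p
	}
	if incoming == kindBarrierDir && idx.nonBarrierDirs > 0 {
		return true // the fix: a barrier waits for an attribute update at p
	}
	if idx.files > 0 && (incoming == kindFile || incoming == kindBarrierDir) {
		return true
	}
	return false
}

func main() {
	names := []string{"file", "barrier", "non-barrier"}
	for active := kindFile; active <= kindNonBarrierDir; active++ {
		var idx samePathIndex
		switch active {
		case kindFile:
			idx.files = 1
		case kindBarrierDir:
			idx.barrierDirs = 1
		case kindNonBarrierDir:
			idx.nonBarrierDirs = 1
		}
		for incoming := kindFile; incoming <= kindNonBarrierDir; incoming++ {
			fmt.Printf("active %-11s incoming %-11s conflict=%v\n",
				names[active], names[incoming], samePathConflict(idx, incoming))
		}
	}
}
```

Running it prints the 3x3 matrix. An active barrier blocks every incoming kind. An active non-barrier update blocks only an incoming barrier. An active file blocks files and barriers but lets a non-barrier update through.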

Regression test added in TestSamePathBarrierSerialization covers both
the admission conflict and the index cleanup on job completion.
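The index cleanup that the regression test checks relies on the decrement-and-delete pattern used throughout removePathFromIndex. A reduced version with hypothetical `inc` and `dec` helpers:

```go
package main

import "fmt"

// inc and dec maintain a reference count per key. dec deletes the key once
// the count would drop to zero, mirroring the pattern in removePathFromIndex.
func inc(m map[string]int, k string) { m[k]++ }

func dec(m map[string]int, k string) {
	if m[k] <= 1 {
		delete(m, k)
	} else {
		m[k]--
	}
}

func main() {
	active := map[string]int{}
	inc(active, "/dir1")
	inc(active, "/dir1")
	dec(active, "/dir1")
	fmt.Println(active["/dir1"], len(active)) // 1 1
	dec(active, "/dir1")
	fmt.Println(len(active)) // 0: no stale zero entry left behind
}
```

Deleting the key instead of storing a zero keeps each map limited to paths with in-flight work. A lookup of a finished path still reads as 0 through the map's zero value.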
2026-04-14 18:34:05 -07:00


package command

import (
	"container/heap"
	"path"
	"sync"
	"sync/atomic"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

// tsMinHeap implements heap.Interface for int64 timestamps.
type tsMinHeap []int64

func (h tsMinHeap) Len() int           { return len(h) }
func (h tsMinHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h tsMinHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *tsMinHeap) Push(x any)        { *h = append(*h, x.(int64)) }
func (h *tsMinHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// jobKind classifies a sync job for conflict detection. Directory events are
// split into "barrier" (create/delete/rename) and "non-barrier" (in-place
// attribute update) so that attribute-only directory updates, which do not
// reshape the namespace, no longer serialize every file operation in the
// subtree.
type jobKind int

const (
	// kindFile is a regular file event.
	kindFile jobKind = iota
	// kindBarrierDir is a directory create, delete, or rename. It acts as a
	// subtree barrier: it waits for all active descendants to drain, and it
	// blocks every event under it from being admitted until it completes.
	kindBarrierDir
	// kindNonBarrierDir is a directory attribute update (mtime/xattr/chmod
	// with the same parent and name). It does not block descendants, but it
	// still waits for any barrier dir on an ancestor path. It also bumps the
	// ancestor descendant counters, so an incoming barrier dir on an
	// ancestor path waits for it to drain.
	kindNonBarrierDir
)

type syncJobPaths struct {
	path    util.FullPath
	newPath util.FullPath // empty for non-renames
	kind    jobKind
}

type MetadataProcessor struct {
	activeJobs           map[int64]*syncJobPaths
	activeJobsLock       sync.Mutex
	activeJobsCond       *sync.Cond
	concurrencyLimit     int
	fn                   pb.ProcessMetadataFunc
	processedTsWatermark atomic.Int64

	// Indexes for O(depth) conflict detection, replacing O(n) linear scan.
	// activeFilePaths counts active file jobs at each exact path.
	activeFilePaths map[util.FullPath]int
	// activeBarrierDirPaths counts active barrier-dir jobs at each exact
	// path. Only barrier dirs are tracked here; non-barrier dir updates are
	// deliberately invisible to the ancestor check so that they don't
	// serialize every file descendant.
	activeBarrierDirPaths map[util.FullPath]int
	// activeNonBarrierDirPaths counts active non-barrier dir jobs at each
	// exact path. This is read *only* by incoming barrier dirs, so a
	// delete/rename/create at p correctly waits for an in-flight chmod/
	// xattr/mtime update at the same p. It is deliberately invisible to the
	// ancestor check, so non-barrier updates still don't serialize file
	// descendants.
	activeNonBarrierDirPaths map[util.FullPath]int
	// descendantCount counts active jobs (of any kind) strictly under each
	// directory. Read by incoming barrier dirs so they wait for their whole
	// subtree to drain before running, regardless of descendant kind.
	descendantCount map[util.FullPath]int
	// tsHeap is a min-heap of active job timestamps with lazy deletion,
	// used for O(log n) amortized watermark tracking.
	tsHeap tsMinHeap
}

func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor {
	t := &MetadataProcessor{
		fn:                       fn,
		activeJobs:               make(map[int64]*syncJobPaths),
		concurrencyLimit:         concurrency,
		activeFilePaths:          make(map[util.FullPath]int),
		activeBarrierDirPaths:    make(map[util.FullPath]int),
		activeNonBarrierDirPaths: make(map[util.FullPath]int),
		descendantCount:          make(map[util.FullPath]int),
	}
	t.processedTsWatermark.Store(offsetTsNs)
	t.activeJobsCond = sync.NewCond(&t.activeJobsLock)
	return t
}

// pathAncestors returns all proper ancestor directories of p.
// For "/a/b/c", returns ["/a/b", "/a", "/"].
func pathAncestors(p util.FullPath) []util.FullPath {
	var ancestors []util.FullPath
	s := string(p)
	for {
		parent := path.Dir(s)
		if parent == s {
			break
		}
		ancestors = append(ancestors, util.FullPath(parent))
		s = parent
	}
	return ancestors
}

// addPathToIndex registers a path in the conflict detection indexes.
// Must be called under activeJobsLock.
func (t *MetadataProcessor) addPathToIndex(p util.FullPath, kind jobKind) {
	switch kind {
	case kindFile:
		t.activeFilePaths[p]++
	case kindBarrierDir:
		t.activeBarrierDirPaths[p]++
	case kindNonBarrierDir:
		t.activeNonBarrierDirPaths[p]++
	}
	for _, ancestor := range pathAncestors(p) {
		t.descendantCount[ancestor]++
	}
}

// removePathFromIndex unregisters a path from the conflict detection indexes.
// Must be called under activeJobsLock.
func (t *MetadataProcessor) removePathFromIndex(p util.FullPath, kind jobKind) {
	switch kind {
	case kindFile:
		if t.activeFilePaths[p] <= 1 {
			delete(t.activeFilePaths, p)
		} else {
			t.activeFilePaths[p]--
		}
	case kindBarrierDir:
		if t.activeBarrierDirPaths[p] <= 1 {
			delete(t.activeBarrierDirPaths, p)
		} else {
			t.activeBarrierDirPaths[p]--
		}
	case kindNonBarrierDir:
		if t.activeNonBarrierDirPaths[p] <= 1 {
			delete(t.activeNonBarrierDirPaths, p)
		} else {
			t.activeNonBarrierDirPaths[p]--
		}
	}
	for _, ancestor := range pathAncestors(p) {
		if t.descendantCount[ancestor] <= 1 {
			delete(t.descendantCount, ancestor)
		} else {
			t.descendantCount[ancestor]--
		}
	}
}

// pathConflicts checks if a single path conflicts with any active job.
// Conflict rules (each reads "incoming job vs active job"):
//   - any kind vs same-path barrier dir: wait (a create/delete/rename on p
//     must fully serialize against any other operation touching p, including
//     non-barrier attribute updates and files at the same path)
//   - incoming barrier dir vs same-path non-barrier dir update: wait (a
//     delete/rename/create on p must wait for an in-flight chmod/xattr/mtime
//     update at the same p to drain)
//   - file vs same-path file: wait
//   - file vs same-path barrier dir: wait (covered by the barrier-at-p check
//     above; also serializes a file-to-dir / dir-to-file promotion)
//   - barrier dir vs same-path file: wait
//   - barrier dir vs any descendant (file or dir, barrier or not): wait
//   - barrier ancestor: always wait, regardless of incoming kind
//   - non-barrier dir vs descendants: never conflicts
//   - non-barrier dir vs same-path non-barrier dir: never conflicts (attribute
//     bumps are "last writer wins"; this intentionally lets rapid mtime /
//     xattr updates overlap)
//
// Must be called under activeJobsLock.
func (t *MetadataProcessor) pathConflicts(p util.FullPath, kind jobKind) bool {
	// A barrier dir in flight at p serializes every new job at p. This is the
	// strictest same-path rule and applies regardless of incoming kind.
	if t.activeBarrierDirPaths[p] > 0 {
		return true
	}
	// An incoming barrier dir must also wait for any in-flight non-barrier
	// dir update at the same path. Without this check, a delete or rename on
	// a directory could overlap with an attribute bump in progress for the
	// same directory.
	if kind == kindBarrierDir && t.activeNonBarrierDirPaths[p] > 0 {
		return true
	}
	// A file in flight at p blocks new file or barrier-dir jobs at p. A
	// non-barrier dir update at p is allowed through: by construction, files
	// and dirs at the same path only coexist across a promotion, which is a
	// barrier event handled by the barrier-at-p check at the top.
	if t.activeFilePaths[p] > 0 && (kind == kindFile || kind == kindBarrierDir) {
		return true
	}
	// Barrier dirs additionally wait for their whole in-flight subtree.
	if kind == kindBarrierDir && t.descendantCount[p] > 0 {
		return true
	}
	// Any barrier dir on a proper ancestor blocks everything under it.
	for _, ancestor := range pathAncestors(p) {
		if t.activeBarrierDirPaths[ancestor] > 0 {
			return true
		}
	}
	return false
}
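
// conflictsWith reports whether resp conflicts with any active job on its
// primary path or, for a rename, on its destination path.
// Must be called under activeJobsLock.
func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool {
	p, newPath, kind := extractJobInfo(resp)
	if t.pathConflicts(p, kind) {
		return true
	}
	if newPath != "" && t.pathConflicts(newPath, kind) {
		return true
	}
	return false
}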

// AddSyncJob ignores empty events. Otherwise it blocks until resp can be
// admitted (fewer than concurrencyLimit active jobs and no path conflict with
// any of them), registers it in the conflict indexes, and processes it
// asynchronously with t.fn, retried via util.Retry.
func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) {
	if filer_pb.IsEmpty(resp) {
		return
	}
	t.activeJobsLock.Lock()
	defer t.activeJobsLock.Unlock()
	for len(t.activeJobs) >= t.concurrencyLimit || t.conflictsWith(resp) {
		t.activeJobsCond.Wait()
	}
	p, newPath, kind := extractJobInfo(resp)
	jobPaths := &syncJobPaths{path: p, newPath: newPath, kind: kind}
	t.activeJobs[resp.TsNs] = jobPaths
	t.addPathToIndex(p, kind)
	if newPath != "" {
		t.addPathToIndex(newPath, kind)
	}
	heap.Push(&t.tsHeap, resp.TsNs)
	go func() {
		if err := util.Retry("metadata processor", func() error {
			return t.fn(resp)
		}); err != nil {
			glog.Errorf("process %v: %v", resp, err)
		}
		t.activeJobsLock.Lock()
		defer t.activeJobsLock.Unlock()
		delete(t.activeJobs, resp.TsNs)
		t.removePathFromIndex(jobPaths.path, jobPaths.kind)
		if jobPaths.newPath != "" {
			t.removePathFromIndex(jobPaths.newPath, jobPaths.kind)
		}
		// Lazy-clean stale entries from heap top (already-completed jobs).
		// Each entry is pushed once and popped once: O(log n) amortized.
		for t.tsHeap.Len() > 0 {
			if _, active := t.activeJobs[t.tsHeap[0]]; active {
				break
			}
			heap.Pop(&t.tsHeap)
		}
		// If this was the oldest job, advance the watermark.
		if t.tsHeap.Len() == 0 || resp.TsNs < t.tsHeap[0] {
			t.processedTsWatermark.Store(resp.TsNs)
		}
		// Wake the waiting admitter so it re-checks the limit and conflicts.
		t.activeJobsCond.Signal()
	}()
}

// extractJobInfo derives the conflict-detection path(s) and job kind for a
// metadata event. A rename returns both the source and destination paths; all
// other event shapes return only the primary path.
func extractJobInfo(resp *filer_pb.SubscribeMetadataResponse) (p, newPath util.FullPath, kind jobKind) {
	oldEntry := resp.EventNotification.OldEntry
	newEntry := resp.EventNotification.NewEntry
	// create
	if filer_pb.IsCreate(resp) {
		p = util.FullPath(resp.Directory).Child(newEntry.Name)
		kind = classifyDirEvent(newEntry.IsDirectory, false)
		return
	}
	if filer_pb.IsDelete(resp) {
		p = util.FullPath(resp.Directory).Child(oldEntry.Name)
		kind = classifyDirEvent(oldEntry.IsDirectory, false)
		return
	}
	if filer_pb.IsUpdate(resp) {
		p = util.FullPath(resp.Directory).Child(newEntry.Name)
		// In-place attribute update: non-barrier when the entry is a dir.
		kind = classifyDirEvent(newEntry.IsDirectory, true)
		return
	}
	// renaming: the namespace is reshaped on both sides, so a directory
	// rename is a barrier on both source and destination.
	p = util.FullPath(resp.Directory).Child(oldEntry.Name)
	newPath = util.FullPath(resp.EventNotification.NewParentPath).Child(newEntry.Name)
	kind = classifyDirEvent(oldEntry.IsDirectory, false)
	return
}

// classifyDirEvent maps an entry's (isDirectory, isAttributeUpdate) pair to a
// jobKind. Attribute-only updates on directories are the only non-barrier
// case; everything else on a directory (create/delete/rename) is a barrier,
// and everything on a file is kindFile.
func classifyDirEvent(isDirectory, isAttributeUpdate bool) jobKind {
	if !isDirectory {
		return kindFile
	}
	if isAttributeUpdate {
		return kindNonBarrierDir
	}
	return kindBarrierDir
}
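The watermark logic in AddSyncJob pairs tsHeap with lazy deletion. A finished timestamp is not removed from the heap right away. It is popped only once it reaches the top. The sketch below is a lock-free reduction of that logic. The `tracker` type and its `start`/`finish` methods are hypothetical stand-ins for AddSyncJob and its completion goroutine. The heap type is copied from the file above.

```go
package main

import (
	"container/heap"
	"fmt"
)

// tsMinHeap is a min-heap of int64 timestamps, as in filer_sync_jobs.go.
type tsMinHeap []int64

func (h tsMinHeap) Len() int           { return len(h) }
func (h tsMinHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h tsMinHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *tsMinHeap) Push(x any)        { *h = append(*h, x.(int64)) }
func (h *tsMinHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// tracker is a hypothetical reduction of the watermark bookkeeping:
// active plays the role of activeJobs, h the role of tsHeap.
type tracker struct {
	active    map[int64]bool
	h         tsMinHeap
	watermark int64
}

func (t *tracker) start(ts int64) {
	t.active[ts] = true
	heap.Push(&t.h, ts)
}

// finish completes the job at ts and returns the resulting watermark.
func (t *tracker) finish(ts int64) int64 {
	delete(t.active, ts)
	// Lazily drop completed timestamps sitting at the top of the heap.
	for t.h.Len() > 0 && !t.active[t.h[0]] {
		heap.Pop(&t.h)
	}
	// Advance only when the finished job is older than every active job.
	if t.h.Len() == 0 || ts < t.h[0] {
		t.watermark = ts
	}
	return t.watermark
}

func main() {
	t := &tracker{active: map[int64]bool{}}
	for _, ts := range []int64{10, 20, 30} {
		t.start(ts)
	}
	fmt.Println(t.finish(20)) // 0: job 10 is still in flight
	fmt.Println(t.finish(10)) // 10: job 20 is popped lazily here
	fmt.Println(t.finish(30)) // 30
}
```

After the second call, the watermark lands on 10, not 20, even though job 20 has also finished. The rule in AddSyncJob is conservative in the same way: the watermark takes the timestamp of the job that just completed, never a later one that finished earlier.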