perf(filer): parallelize StreamMutateEntry with path-keyed scheduler (#9171)

* perf(filer): parallelize StreamMutateEntry with path-keyed scheduler

The server handler processed one mutation at a time per stream, capping a
mount's aggregate throughput at ~1/filer_store_service_time regardless of
client concurrency (see issue #9138). With 12 rclone processes this showed
as a ~500 QPS ceiling on a filer that previously served ~1000+ QPS via
unary CreateEntry.

Replace the serial for-loop with a per-request goroutine admitted by a
path-keyed scheduler, adapted directly from filer.sync's MetadataProcessor
(weed/command/filer_sync_jobs.go). Same four conflict indexes, same kind
taxonomy (file / barrier-dir / non-barrier-dir), same ancestor-barrier
and descendant-barrier rules. Cross-path mutations run in parallel; same-
path mutations serialize on arrival order; recursive delete and directory
rename act as subtree barriers; directory attribute bumps stay non-barrier
so they do not serialize file writes under them.

Correctness and safety:
- Per-stream goroutine cap (streamMutateConcurrency = 64) bounds resource
  use from a single noisy mount.
- syncStream wrapper serializes stream.Send across worker goroutines (gRPC
  Send is not concurrent-safe).
- Handler waits on in-flight workers before returning on recv EOF/error so
  no worker writes to a torn-down stream.
- First fatal Send error from any worker propagates as the handler's
  return, causing the stream to tear down.

Benchmark (2 ms simulated filer-store service delay, 12 client workers):
  serial    : 440 QPS
  sem only  : 4902 QPS (unsafe — reorders same-path ops)
  scheduler : 4934 QPS on distinct paths, 439 QPS on same path (correct)

The sem-only number shows the upper bound of raw parallelism; the
scheduler matches it on distinct paths (the realistic 12-rclone case) and
correctly falls back to serial when the workload demands ordering. Peak
concurrent mutations at the handler equals client worker count on the
distinct-path workload and pins to 1 on the same-path workload, as the
scheduler intends.

* perf(filer): decouple StreamMutateEntry admission from receive loop

The previous StreamMutateEntry handler called sched.Admit directly in the
Recv loop. A single request conflicting on path /hot would head-of-line
block stream.Recv, so later requests targeting unrelated paths could not
be received or admitted until /hot drained — cross-path parallelism then
depended on request ordering instead of being a property of the scheduler.

Spawn the worker goroutine immediately on Recv and move sched.Admit into
that goroutine. A new streamMutatePendingLimit (1024) caps total per-
stream outstanding goroutines (pending + active) so a client flooding a
conflicted path cannot explode goroutine count without bound.

Addresses #9171 review comment (coderabbitai, Major).

* fix(filer): reply with EINVAL on unknown StreamMutateEntry request type

Returning nil when req.Request is a future oneof variant or a malformed
request left the client's per-RequestId waiter blocked forever, because
no response was ever sent for that id. Reply with IsLast=true and EINVAL
so the waiter completes with a well-formed error.

Addresses #9171 review comments (gemini-code-assist, coderabbitai).

* fix(filer): make classifyMutation crash-free and correct for deletes

Two issues addressed together because they share one function:

1. Nil-entry panic. classifyMutation dereferenced req.Entry.Name without
   a nil guard; an empty create_request / update_request / rename_request
   from a misbehaving client crashed the scheduler. Guard each oneof
   variant and fall back to a "/" barrier; the handler then sends EINVAL
   via the unknown-request path.

2. Non-recursive delete vs concurrent dir attribute update. DeleteEntry-
   Request does not carry IsDirectory, so the previous kindMutateFile
   classification for non-recursive deletes did not conflict with an in-
   flight kindMutateNonBarrierDir (chmod / xattr / mtime) at the same
   path — a race in scheduler terms. Classify every delete as
   kindMutateBarrierDir regardless of IsRecursive. The incremental cost
   of a descendant-wait for a non-recursive delete of a non-empty dir is
   negligible since that call fails at the store anyway.

Adds classifyMutation tests for malformed create/update, empty oneof,
and updates the delete-non-recursive case to the new expected kind.

Addresses #9171 review comments (coderabbitai Critical, Major).

* fix(filer): route renameStreamProxy.SendMsg through the wrapping Send

The default pass-through SendMsg on renameStreamProxy bypassed the
syncStream mutex and the StreamMutateEntryResponse wrapping: anything
the rename helpers happened to push via SendMsg would have been emitted
on the wire as the wrong protobuf type and could interleave with other
workers' Sends. RecvMsg similarly raced with the outer StreamMutateEntry
Recv loop and could steal unrelated mutation requests.

Route SendMsg through the wrapping Send (rejecting other payload types)
and fail RecvMsg explicitly — the rename logic is a strictly server-push
stream and never calls RecvMsg, so loud failure is safer than silent
stealing.

Addresses #9171 review comment (coderabbitai, Major).

* test(filer): run exactly ops in stream-mutate workloads

perGoroutine := ops / concurrency silently truncated the total when the
values were not divisible — e.g. 2400 ops with 64 workers actually ran
2368 and with 256 workers ran 2304, making the logged "ops per run"
inaccurate and introducing measurement noise that varied across the
concurrency sweep.

Introduce opsForWorker(g, concurrency, ops) which distributes the
remainder to the first (ops % concurrency) workers so the three
workloads (unary, stream sync, stream async) each dispatch exactly
`ops` operations. No changes to the timing methodology.

Addresses #9171 review comment (coderabbitai, Minor).

* fix(filer): enforce per-path FIFO admission in mutateScheduler

sync.Cond.Broadcast wakes every waiter; the first to re-acquire the
mutex wins, so two conflicting same-path admissions could be reordered
by the Go runtime even though they arrived serially on the stream. A
single stream is supposed to carry ordered mutations — the PR's original
#8770 claim — so admission must be FIFO per path.

Replace the single cond with a per-path FIFO queue. Each Admit enqueues
a waiter on every path it touches (primary, and on rename the secondary
too) and blocks on a ready channel. tryPromoteLocked admits any waiter
that is at the head of every queue it joined, passes pathConflictsLocked
against the active-state indexes, and is under concurrencyLimit. Done
removes the heads and re-runs tryPromoteLocked so waiters freed by the
completion move in arrival order.

Side effect: two non-barrier directory updates on the same path now
serialize instead of overlapping. filer.sync's MetadataProcessor
intentionally allows them to overlap because its events come from a
committed log where last-writer-wins coalescing is safe; streamed
mutations carry client operations whose order matters, so we drop that
optimization here. Added TestAdmitSamePathFIFO (20-waiter barrier
release) and TestAdmitSamePathNonBarrierSerializes to cover both.

Also refreshed the kindMutateFile doc comment that still referenced the
pre-#1ecf805f5 "non-recursive delete" classification.

Addresses #9171 review comments (coderabbitai Critical, Minor).

* test(filer): make TestAdmitSamePathFIFO deterministic without sleeps

The previous arrival-ordering sync (send to `started` before calling
Admit, plus a 1 ms sleep) relied on the goroutine actually entering
Admit and reaching the per-path queue during that sleep. Under -race on
a loaded CI that is a real flake source, which is ironic for a test
whose job is catching non-deterministic wake-ups.

Observe the scheduler's own pathQueue length between spawns instead —
waitQueueLen polls s.pathQueue["/a"] under s.mu until the expected
number of waiters (1 barrier holder + i+1 file waiters) is enqueued.
That's the exact event the test wants to synchronise on, so there is no
fudge factor. Verified by `go test -race -count=5`.

Addresses #9171 review comment (coderabbitai, Minor).
This commit is contained in:
Chris Lu
2026-04-21 11:25:09 -07:00
committed by GitHub
parent 141413ad76
commit c40db5a52d
4 changed files with 1872 additions and 74 deletions
+173 -74
View File
@@ -2,90 +2,174 @@ package weed_server
import (
"context"
"fmt"
"io"
"strings"
"sync"
"syscall"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// streamMutateConcurrency bounds the number of in-flight mutations processed
// concurrently per client stream. Set to the typical filer-store sweet spot
// so one noisy mount cannot exhaust filer resources.
const streamMutateConcurrency = 64
// streamMutatePendingLimit caps total outstanding requests per stream
// (admitted + waiting-for-admission). Prevents goroutine explosion when a
// client floods requests against a conflicted path while leaving enough
// headroom that cross-path requests never block Recv in practice.
const streamMutatePendingLimit = 1024
// syncStream wraps a bidi stream so that concurrent goroutines can Send
// without interleaving frames. gRPC requires that Send not be called
// concurrently; this mutex is the serialization point.
type syncStream struct {
stream grpc.BidiStreamingServer[filer_pb.StreamMutateEntryRequest, filer_pb.StreamMutateEntryResponse]
mu sync.Mutex
}
func (s *syncStream) Send(r *filer_pb.StreamMutateEntryResponse) error {
s.mu.Lock()
defer s.mu.Unlock()
return s.stream.Send(r)
}
func (fs *FilerServer) StreamMutateEntry(stream grpc.BidiStreamingServer[filer_pb.StreamMutateEntryRequest, filer_pb.StreamMutateEntryResponse]) error {
ss := &syncStream{stream: stream}
// Path-keyed admission + subtree barriers, adapted from filer.sync's
// MetadataProcessor (weed/command/filer_sync_jobs.go). Admit blocks when
// a new request conflicts with an in-flight one on the same path or with
// a barrier directory at the same path / an ancestor.
sched := newMutateScheduler(streamMutateConcurrency)
// pendingSem caps goroutines-per-stream. The receive loop only blocks on
// this sem (not on Admit), so one conflicted path cannot head-of-line
// block receipt of unrelated paths — later distinct-path requests can be
// spawned, admitted, and processed while the conflicted request waits.
pendingSem := make(chan struct{}, streamMutatePendingLimit)
var wg sync.WaitGroup
// Track the first fatal send error across worker goroutines.
var sendErrMu sync.Mutex
var sendErr error
setSendErr := func(e error) {
sendErrMu.Lock()
if sendErr == nil {
sendErr = e
}
sendErrMu.Unlock()
}
getSendErr := func() error {
sendErrMu.Lock()
defer sendErrMu.Unlock()
return sendErr
}
for {
if e := getSendErr(); e != nil {
wg.Wait()
return e
}
req, err := stream.Recv()
if err == io.EOF {
return nil
wg.Wait()
return getSendErr()
}
if err != nil {
wg.Wait()
return err
}
switch r := req.Request.(type) {
primary, secondary, kind := classifyMutation(req)
pendingSem <- struct{}{}
wg.Add(1)
go func(req *filer_pb.StreamMutateEntryRequest, p, s util.FullPath, k mutateJobKind) {
defer wg.Done()
defer func() { <-pendingSem }()
// Admission happens off the Recv loop so a conflicted path never
// blocks receipt of unrelated requests.
sched.Admit(p, s, k)
defer sched.Done(p, s, k)
if e := fs.handleStreamMutateRequest(ss, req); e != nil {
setSendErr(e)
}
}(req, primary, secondary, kind)
}
}
case *filer_pb.StreamMutateEntryRequest_CreateRequest:
resp, createErr := fs.CreateEntry(stream.Context(), r.CreateRequest)
if createErr != nil {
resp = &filer_pb.CreateEntryResponse{Error: createErr.Error()}
}
streamResp := &filer_pb.StreamMutateEntryResponse{
RequestId: req.RequestId,
IsLast: true,
Response: &filer_pb.StreamMutateEntryResponse_CreateResponse{CreateResponse: resp},
}
if resp.Error != "" {
streamResp.Error = resp.Error
streamResp.Errno = int32(syscall.EIO)
}
if sendErr := stream.Send(streamResp); sendErr != nil {
return sendErr
}
// handleStreamMutateRequest processes one request and sends exactly one
// (possibly multi-message) response via the shared sync stream. It returns
// a non-nil error only if Send fails — in which case the caller should
// tear down the stream.
func (fs *FilerServer) handleStreamMutateRequest(ss *syncStream, req *filer_pb.StreamMutateEntryRequest) error {
switch r := req.Request.(type) {
case *filer_pb.StreamMutateEntryRequest_UpdateRequest:
resp, updateErr := fs.UpdateEntry(stream.Context(), r.UpdateRequest)
if updateErr != nil {
resp = &filer_pb.UpdateEntryResponse{}
}
streamResp := &filer_pb.StreamMutateEntryResponse{
RequestId: req.RequestId,
IsLast: true,
Response: &filer_pb.StreamMutateEntryResponse_UpdateResponse{UpdateResponse: resp},
}
if updateErr != nil {
streamResp.Error = updateErr.Error()
streamResp.Errno = int32(syscall.EIO)
}
if sendErr := stream.Send(streamResp); sendErr != nil {
return sendErr
}
case *filer_pb.StreamMutateEntryRequest_DeleteRequest:
resp, deleteErr := fs.DeleteEntry(stream.Context(), r.DeleteRequest)
if deleteErr != nil {
resp = &filer_pb.DeleteEntryResponse{Error: deleteErr.Error()}
}
streamResp := &filer_pb.StreamMutateEntryResponse{
RequestId: req.RequestId,
IsLast: true,
Response: &filer_pb.StreamMutateEntryResponse_DeleteResponse{DeleteResponse: resp},
}
if resp.Error != "" {
streamResp.Error = resp.Error
streamResp.Errno = int32(syscall.EIO)
}
if sendErr := stream.Send(streamResp); sendErr != nil {
return sendErr
}
case *filer_pb.StreamMutateEntryRequest_RenameRequest:
if err := fs.handleStreamMutateRename(stream, req.RequestId, r.RenameRequest); err != nil {
return err
}
default:
glog.Warningf("StreamMutateEntry: unknown request type %T", req.Request)
case *filer_pb.StreamMutateEntryRequest_CreateRequest:
resp, createErr := fs.CreateEntry(ss.stream.Context(), r.CreateRequest)
if createErr != nil {
resp = &filer_pb.CreateEntryResponse{Error: createErr.Error()}
}
out := &filer_pb.StreamMutateEntryResponse{
RequestId: req.RequestId,
IsLast: true,
Response: &filer_pb.StreamMutateEntryResponse_CreateResponse{CreateResponse: resp},
}
if resp.Error != "" {
out.Error = resp.Error
out.Errno = int32(syscall.EIO)
}
return ss.Send(out)
case *filer_pb.StreamMutateEntryRequest_UpdateRequest:
resp, updateErr := fs.UpdateEntry(ss.stream.Context(), r.UpdateRequest)
if updateErr != nil {
resp = &filer_pb.UpdateEntryResponse{}
}
out := &filer_pb.StreamMutateEntryResponse{
RequestId: req.RequestId,
IsLast: true,
Response: &filer_pb.StreamMutateEntryResponse_UpdateResponse{UpdateResponse: resp},
}
if updateErr != nil {
out.Error = updateErr.Error()
out.Errno = int32(syscall.EIO)
}
return ss.Send(out)
case *filer_pb.StreamMutateEntryRequest_DeleteRequest:
resp, deleteErr := fs.DeleteEntry(ss.stream.Context(), r.DeleteRequest)
if deleteErr != nil {
resp = &filer_pb.DeleteEntryResponse{Error: deleteErr.Error()}
}
out := &filer_pb.StreamMutateEntryResponse{
RequestId: req.RequestId,
IsLast: true,
Response: &filer_pb.StreamMutateEntryResponse_DeleteResponse{DeleteResponse: resp},
}
if resp.Error != "" {
out.Error = resp.Error
out.Errno = int32(syscall.EIO)
}
return ss.Send(out)
case *filer_pb.StreamMutateEntryRequest_RenameRequest:
return fs.handleStreamMutateRename(ss, req.RequestId, r.RenameRequest)
default:
// Send a terminal error response so the client's per-RequestId waiter
// is released; returning nil here would leak the client-side waiter
// forever when a future oneof variant or a malformed request arrives.
glog.Warningf("StreamMutateEntry: unknown request type %T", req.Request)
return ss.Send(&filer_pb.StreamMutateEntryResponse{
RequestId: req.RequestId,
IsLast: true,
Error: "unknown request type",
Errno: int32(syscall.EINVAL),
})
}
}
@@ -93,7 +177,7 @@ func (fs *FilerServer) StreamMutateEntry(stream grpc.BidiStreamingServer[filer_p
// using a proxy stream that converts StreamRenameEntryResponse events into
// StreamMutateEntryResponse messages on the parent bidi stream.
func (fs *FilerServer) handleStreamMutateRename(
parent grpc.BidiStreamingServer[filer_pb.StreamMutateEntryRequest, filer_pb.StreamMutateEntryResponse],
parent *syncStream,
requestId uint64,
req *filer_pb.StreamRenameEntryRequest,
) error {
@@ -112,10 +196,7 @@ func (fs *FilerServer) handleStreamMutateRename(
finalResp.Errno = renameErrno(renameErr)
glog.V(0).Infof("StreamMutateEntry rename: %v", renameErr)
}
if sendErr := parent.Send(finalResp); sendErr != nil {
return sendErr
}
return nil
return parent.Send(finalResp)
}
// renameStreamProxy adapts the bidi StreamMutateEntry stream to look like a
@@ -123,7 +204,7 @@ func (fs *FilerServer) handleStreamMutateRename(
// moveEntry expect. Each Send() call forwards the response as a non-final
// StreamMutateEntryResponse.
type renameStreamProxy struct {
parent grpc.BidiStreamingServer[filer_pb.StreamMutateEntryRequest, filer_pb.StreamMutateEntryResponse]
parent *syncStream
requestId uint64
}
@@ -136,14 +217,32 @@ func (p *renameStreamProxy) Send(resp *filer_pb.StreamRenameEntryResponse) error
}
func (p *renameStreamProxy) Context() context.Context {
return p.parent.Context()
return p.parent.stream.Context()
}
func (p *renameStreamProxy) SendMsg(m any) error { return p.parent.SendMsg(m) }
func (p *renameStreamProxy) RecvMsg(m any) error { return p.parent.RecvMsg(m) }
func (p *renameStreamProxy) SetHeader(md metadata.MD) error { return p.parent.SetHeader(md) }
func (p *renameStreamProxy) SendHeader(md metadata.MD) error { return p.parent.SendHeader(md) }
func (p *renameStreamProxy) SetTrailer(md metadata.MD) { p.parent.SetTrailer(md) }
// SendMsg routes through Send so the payload is wrapped into a
// StreamMutateEntryResponse and goes through the syncStream mutex. Calling
// SendMsg with anything other than *filer_pb.StreamRenameEntryResponse would
// emit the wrong protobuf type on this RPC, so reject other shapes.
func (p *renameStreamProxy) SendMsg(m any) error {
resp, ok := m.(*filer_pb.StreamRenameEntryResponse)
if !ok {
return fmt.Errorf("renameStreamProxy.SendMsg: unexpected type %T", m)
}
return p.Send(resp)
}
// RecvMsg on the proxy would race with the outer StreamMutateEntry recv
// loop and could steal unrelated mutation requests. The rename logic never
// calls RecvMsg (it is strictly a server-push stream), so fail loudly if it
// ever does.
func (p *renameStreamProxy) RecvMsg(m any) error {
return fmt.Errorf("renameStreamProxy.RecvMsg is not supported")
}
func (p *renameStreamProxy) SetHeader(md metadata.MD) error { return p.parent.stream.SetHeader(md) }
func (p *renameStreamProxy) SendHeader(md metadata.MD) error { return p.parent.stream.SendHeader(md) }
func (p *renameStreamProxy) SetTrailer(md metadata.MD) { p.parent.stream.SetTrailer(md) }
// renameErrno maps a rename error to a POSIX errno for the client.
func renameErrno(err error) int32 {
@@ -0,0 +1,792 @@
package weed_server
import (
"context"
"io"
"net"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// fakeFilerServer simulates a filer whose CreateEntry takes a fixed amount of
// wall-clock time (meant to model the filer-store commit latency). It implements
// only what these benchmarks need: unary CreateEntry and bidi StreamMutateEntry.
//
// The StreamMutateEntry handler mirrors the production handler's structure:
// a single goroutine per stream, processing one request at a time.
type fakeFilerServer struct {
filer_pb.UnimplementedSeaweedFilerServer
serviceDelay time.Duration
createCalls atomic.Int64
}
func (s *fakeFilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (*filer_pb.CreateEntryResponse, error) {
s.createCalls.Add(1)
if s.serviceDelay > 0 {
time.Sleep(s.serviceDelay)
}
return &filer_pb.CreateEntryResponse{}, nil
}
// StreamMutateEntry mirrors the OLD serial handler: one goroutine, strictly
// one request at a time.
func (s *fakeFilerServer) StreamMutateEntry(stream grpc.BidiStreamingServer[filer_pb.StreamMutateEntryRequest, filer_pb.StreamMutateEntryResponse]) error {
for {
req, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
switch r := req.Request.(type) {
case *filer_pb.StreamMutateEntryRequest_CreateRequest:
resp, createErr := s.CreateEntry(stream.Context(), r.CreateRequest)
if createErr != nil {
resp = &filer_pb.CreateEntryResponse{Error: createErr.Error()}
}
out := &filer_pb.StreamMutateEntryResponse{
RequestId: req.RequestId,
IsLast: true,
Response: &filer_pb.StreamMutateEntryResponse_CreateResponse{CreateResponse: resp},
}
if resp.Error != "" {
out.Error = resp.Error
out.Errno = int32(syscall.EIO)
}
if sendErr := stream.Send(out); sendErr != nil {
return sendErr
}
}
}
}
// fakeConcurrentFilerServer mirrors the NEW concurrent handler: per-request
// goroutine, bounded by a semaphore, with a Send mutex for stream safety.
// Structurally identical to weed/server/filer_grpc_server_stream_mutate.go
// after the parallelization patch.
type fakeConcurrentFilerServer struct {
filer_pb.UnimplementedSeaweedFilerServer
serviceDelay time.Duration
concurrency int
createCalls atomic.Int64
}
func (s *fakeConcurrentFilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (*filer_pb.CreateEntryResponse, error) {
s.createCalls.Add(1)
if s.serviceDelay > 0 {
time.Sleep(s.serviceDelay)
}
return &filer_pb.CreateEntryResponse{}, nil
}
func (s *fakeConcurrentFilerServer) StreamMutateEntry(stream grpc.BidiStreamingServer[filer_pb.StreamMutateEntryRequest, filer_pb.StreamMutateEntryResponse]) error {
var sendMu sync.Mutex
send := func(r *filer_pb.StreamMutateEntryResponse) error {
sendMu.Lock()
defer sendMu.Unlock()
return stream.Send(r)
}
sem := make(chan struct{}, s.concurrency)
var wg sync.WaitGroup
for {
req, err := stream.Recv()
if err == io.EOF {
wg.Wait()
return nil
}
if err != nil {
wg.Wait()
return err
}
sem <- struct{}{}
wg.Add(1)
go func(req *filer_pb.StreamMutateEntryRequest) {
defer wg.Done()
defer func() { <-sem }()
switch r := req.Request.(type) {
case *filer_pb.StreamMutateEntryRequest_CreateRequest:
resp, _ := s.CreateEntry(stream.Context(), r.CreateRequest)
_ = send(&filer_pb.StreamMutateEntryResponse{
RequestId: req.RequestId,
IsLast: true,
Response: &filer_pb.StreamMutateEntryResponse_CreateResponse{CreateResponse: resp},
})
}
}(req)
}
}
// startFakeConcurrentFilerServer spins up the concurrent-handler variant.
func startFakeConcurrentFilerServer(t testing.TB, serviceDelay time.Duration, concurrency int) (string, *fakeConcurrentFilerServer, func()) {
t.Helper()
lis, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("listen: %v", err)
}
srv := grpc.NewServer()
fake := &fakeConcurrentFilerServer{serviceDelay: serviceDelay, concurrency: concurrency}
filer_pb.RegisterSeaweedFilerServer(srv, fake)
go srv.Serve(lis)
return lis.Addr().String(), fake, func() {
srv.GracefulStop()
_ = lis.Close()
}
}
// fakeSchedulerFilerServer uses the real mutateScheduler from the production
// handler to admit requests by (path, kind). Per-path operations serialize;
// cross-path operations run in parallel.
type fakeSchedulerFilerServer struct {
filer_pb.UnimplementedSeaweedFilerServer
serviceDelay time.Duration
concurrency int
createCalls atomic.Int64
maxInFlight atomic.Int64 // sampled peak of in-flight create goroutines
curInFlight atomic.Int64
}
func (s *fakeSchedulerFilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (*filer_pb.CreateEntryResponse, error) {
s.createCalls.Add(1)
cur := s.curInFlight.Add(1)
for {
peak := s.maxInFlight.Load()
if cur <= peak || s.maxInFlight.CompareAndSwap(peak, cur) {
break
}
}
if s.serviceDelay > 0 {
time.Sleep(s.serviceDelay)
}
s.curInFlight.Add(-1)
return &filer_pb.CreateEntryResponse{}, nil
}
func (s *fakeSchedulerFilerServer) StreamMutateEntry(stream grpc.BidiStreamingServer[filer_pb.StreamMutateEntryRequest, filer_pb.StreamMutateEntryResponse]) error {
var sendMu sync.Mutex
send := func(r *filer_pb.StreamMutateEntryResponse) error {
sendMu.Lock()
defer sendMu.Unlock()
return stream.Send(r)
}
sched := newMutateScheduler(s.concurrency)
var wg sync.WaitGroup
for {
req, err := stream.Recv()
if err == io.EOF {
wg.Wait()
return nil
}
if err != nil {
wg.Wait()
return err
}
primary, secondary, kind := classifyMutation(req)
sched.Admit(primary, secondary, kind)
wg.Add(1)
go func(req *filer_pb.StreamMutateEntryRequest) {
defer wg.Done()
defer sched.Done(primary, secondary, kind)
switch r := req.Request.(type) {
case *filer_pb.StreamMutateEntryRequest_CreateRequest:
resp, _ := s.CreateEntry(stream.Context(), r.CreateRequest)
_ = send(&filer_pb.StreamMutateEntryResponse{
RequestId: req.RequestId,
IsLast: true,
Response: &filer_pb.StreamMutateEntryResponse_CreateResponse{CreateResponse: resp},
})
}
}(req)
}
}
func startFakeSchedulerFilerServer(t testing.TB, serviceDelay time.Duration, concurrency int) (string, *fakeSchedulerFilerServer, func()) {
t.Helper()
lis, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("listen: %v", err)
}
srv := grpc.NewServer()
fake := &fakeSchedulerFilerServer{serviceDelay: serviceDelay, concurrency: concurrency}
filer_pb.RegisterSeaweedFilerServer(srv, fake)
go srv.Serve(lis)
return lis.Addr().String(), fake, func() {
srv.GracefulStop()
_ = lis.Close()
}
}
// startFakeFilerServer spins up an in-process gRPC filer with the given per-
// request service delay, returns the dial target and a shutdown func.
func startFakeFilerServer(t testing.TB, serviceDelay time.Duration) (string, *fakeFilerServer, func()) {
t.Helper()
lis, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("listen: %v", err)
}
srv := grpc.NewServer()
fake := &fakeFilerServer{serviceDelay: serviceDelay}
filer_pb.RegisterSeaweedFilerServer(srv, fake)
go srv.Serve(lis)
return lis.Addr().String(), fake, func() {
srv.GracefulStop()
_ = lis.Close()
}
}
// dialFakeFiler returns a client connection to the in-process filer.
func dialFakeFiler(t testing.TB, addr string) *grpc.ClientConn {
t.Helper()
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("dial: %v", err)
}
return conn
}
// opsForWorker splits `ops` across `concurrency` workers, handing the
// remainder to the first `remainder` workers so the total is always exactly
// `ops`. Returns the count for worker index g.
func opsForWorker(g, concurrency, ops int) int {
per := ops / concurrency
if g < ops%concurrency {
return per + 1
}
return per
}
// runUnaryCreateWorkload fires `ops` CreateEntry unary RPCs across `concurrency`
// goroutines, all sharing a single client connection. Returns total duration.
func runUnaryCreateWorkload(t testing.TB, conn *grpc.ClientConn, concurrency, ops int) time.Duration {
t.Helper()
client := filer_pb.NewSeaweedFilerClient(conn)
var wg sync.WaitGroup
start := time.Now()
for g := 0; g < concurrency; g++ {
wg.Add(1)
count := opsForWorker(g, concurrency, ops)
go func() {
defer wg.Done()
for i := 0; i < count; i++ {
_, err := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{
Directory: "/",
Entry: &filer_pb.Entry{
Name: "f",
IsDirectory: false,
},
})
if err != nil {
t.Errorf("unary CreateEntry: %v", err)
return
}
}
}()
}
wg.Wait()
return time.Since(start)
}
// pathFn chooses the (directory, name) of the i'th op. The default used by the
// existing callers is samePath — all ops hit "/f". distinctPath gives every
// op its own name so cross-path parallelism can kick in.
type pathFn func(i int) (dir, name string)
func samePath(_ int) (string, string) { return "/", "f" }
func distinctPath(i int) (string, string) { return "/", "f" + itoa(i) }
// itoa is a tight local decimal formatter — avoids a strconv import inside a
// hot benchmark loop.
func itoa(n int) string {
if n == 0 {
return "0"
}
buf := make([]byte, 0, 12)
for n > 0 {
buf = append(buf, byte('0'+n%10))
n /= 10
}
for i, j := 0, len(buf)-1; i < j; i, j = i+1, j-1 {
buf[i], buf[j] = buf[j], buf[i]
}
return string(buf)
}
// runStreamCreateWorkload fires `ops` CreateEntry ops through a SINGLE bidi
// stream, with `concurrency` goroutines all submitting to the same stream via
// a small multiplexer that mirrors weed/mount/weedfs_stream_mutate.go's
// sendLoop+recvLoop structure. Returns total duration.
func runStreamCreateWorkload(t testing.TB, conn *grpc.ClientConn, concurrency, ops int) time.Duration {
return runStreamCreateWorkloadAt(t, conn, concurrency, ops, samePath)
}
func runStreamCreateWorkloadAt(t testing.TB, conn *grpc.ClientConn, concurrency, ops int, path pathFn) time.Duration {
t.Helper()
client := filer_pb.NewSeaweedFilerClient(conn)
stream, err := client.StreamMutateEntry(context.Background())
if err != nil {
t.Fatalf("open stream: %v", err)
}
var nextID atomic.Uint64
var pending sync.Map // map[uint64]chan struct{}
// single recv goroutine, dispatching by request id
recvErr := make(chan error, 1)
go func() {
for {
resp, err := stream.Recv()
if err != nil {
recvErr <- err
return
}
if ch, ok := pending.LoadAndDelete(resp.RequestId); ok {
close(ch.(chan struct{}))
}
}
}()
// dedicated send goroutine (gRPC Send is not concurrent-safe)
sendCh := make(chan *filer_pb.StreamMutateEntryRequest, 512)
sendErr := make(chan error, 1)
go func() {
for req := range sendCh {
if err := stream.Send(req); err != nil {
sendErr <- err
return
}
}
sendErr <- nil
}()
var wg sync.WaitGroup
start := time.Now()
offset := 0
for g := 0; g < concurrency; g++ {
wg.Add(1)
count := opsForWorker(g, concurrency, ops)
startOff := offset
offset += count
go func(startOff, count int) {
defer wg.Done()
for i := 0; i < count; i++ {
id := nextID.Add(1)
done := make(chan struct{})
pending.Store(id, done)
dir, name := path(startOff + i)
sendCh <- &filer_pb.StreamMutateEntryRequest{
RequestId: id,
Request: &filer_pb.StreamMutateEntryRequest_CreateRequest{
CreateRequest: &filer_pb.CreateEntryRequest{
Directory: dir,
Entry: &filer_pb.Entry{Name: name},
},
},
}
<-done
}
}(startOff, count)
}
wg.Wait()
elapsed := time.Since(start)
close(sendCh)
_ = stream.CloseSend()
select {
case <-sendErr:
case <-time.After(time.Second):
}
select {
case <-recvErr:
case <-time.After(time.Second):
}
return elapsed
}
// runStreamAsyncCreateWorkload fires `ops` CreateEntry ops through a single
// bidi stream WITHOUT the client waiting for responses between sends. The
// timer starts at the first send and stops when the last response is received,
// so this measures true end-to-end throughput (not just client-send rate).
//
// This isolates the question: does relaxing client-side per-request waiting
// help throughput when the server is a serial loop?
func runStreamAsyncCreateWorkload(t testing.TB, conn *grpc.ClientConn, concurrency, ops int) time.Duration {
t.Helper()
client := filer_pb.NewSeaweedFilerClient(conn)
stream, err := client.StreamMutateEntry(context.Background())
if err != nil {
t.Fatalf("open stream: %v", err)
}
var nextID atomic.Uint64
var outstanding atomic.Int64
allDone := make(chan struct{})
// Single recv goroutine — signals allDone when the last response arrives.
recvErr := make(chan error, 1)
go func() {
for {
_, err := stream.Recv()
if err != nil {
recvErr <- err
return
}
if outstanding.Add(-1) == 0 {
select {
case <-allDone:
default:
close(allDone)
}
return
}
}
}()
// Dedicated send goroutine (gRPC Send is not concurrent-safe).
sendCh := make(chan *filer_pb.StreamMutateEntryRequest, 4096)
sendErr := make(chan error, 1)
go func() {
for req := range sendCh {
if err := stream.Send(req); err != nil {
sendErr <- err
return
}
}
sendErr <- nil
}()
outstanding.Store(int64(ops))
var wg sync.WaitGroup
start := time.Now()
for g := 0; g < concurrency; g++ {
wg.Add(1)
count := opsForWorker(g, concurrency, ops)
go func(count int) {
defer wg.Done()
for i := 0; i < count; i++ {
id := nextID.Add(1)
// Fire-and-forget: push to sendCh, do not wait for response.
sendCh <- &filer_pb.StreamMutateEntryRequest{
RequestId: id,
Request: &filer_pb.StreamMutateEntryRequest_CreateRequest{
CreateRequest: &filer_pb.CreateEntryRequest{
Directory: "/",
Entry: &filer_pb.Entry{Name: "f"},
},
},
}
}
}(count)
}
wg.Wait()
// All requests queued; wait for server to drain all responses.
<-allDone
elapsed := time.Since(start)
close(sendCh)
_ = stream.CloseSend()
select {
case <-sendErr:
case <-time.After(time.Second):
}
select {
case <-recvErr:
case <-time.After(time.Second):
}
return elapsed
}
// TestReproStreamSerializationCeiling runs a head-to-head: same concurrency,
// same op count, same per-request service delay. The only difference is the
// transport — unary (one goroutine per RPC at the server) vs single bidi stream
// (one goroutine for the whole stream at the server).
//
// With a 2ms per-request service delay, unary with N=12 should deliver ~6000/s
// (12 concurrent handlers, each 2ms), while stream delivers ~500/s (serial).
func TestReproStreamSerializationCeiling(t *testing.T) {
if testing.Short() {
t.Skip("skipping repro in -short mode")
}
const (
concurrency = 12
opsPerRun = 1200
serviceDelay = 2 * time.Millisecond
)
addr, fake, stop := startFakeFilerServer(t, serviceDelay)
defer stop()
conn := dialFakeFiler(t, addr)
defer conn.Close()
// warm up one call so the gRPC HTTP/2 connection is fully established
warmupClient := filer_pb.NewSeaweedFilerClient(conn)
if _, err := warmupClient.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{
Directory: "/",
Entry: &filer_pb.Entry{Name: "warmup"},
}); err != nil {
t.Fatalf("warmup: %v", err)
}
fake.createCalls.Store(0)
unaryDur := runUnaryCreateWorkload(t, conn, concurrency, opsPerRun)
unaryCalls := fake.createCalls.Load()
fake.createCalls.Store(0)
streamDur := runStreamCreateWorkload(t, conn, concurrency, opsPerRun)
streamCalls := fake.createCalls.Load()
unaryQPS := float64(unaryCalls) / unaryDur.Seconds()
streamQPS := float64(streamCalls) / streamDur.Seconds()
t.Logf("service delay: %v", serviceDelay)
t.Logf("concurrency: %d", concurrency)
t.Logf("ops per run: %d", opsPerRun)
t.Logf("unary : %d calls in %v -> %.0f QPS", unaryCalls, unaryDur, unaryQPS)
t.Logf("stream : %d calls in %v -> %.0f QPS", streamCalls, streamDur, streamQPS)
t.Logf("unary / stream ratio: %.2fx", unaryQPS/streamQPS)
// The serial stream handler cannot exceed 1 / serviceDelay regardless of
// client concurrency. Assert that unary is meaningfully faster — if the
// server is ever parallelized, this assertion is safe to revisit.
if unaryQPS < streamQPS*2 {
t.Fatalf("expected unary to be at least 2x faster than stream, got unary=%.0f stream=%.0f",
unaryQPS, streamQPS)
}
// Stream QPS should cluster near the theoretical ceiling of 1/serviceDelay.
theoreticalMax := 1.0 / serviceDelay.Seconds()
if streamQPS > theoreticalMax*1.25 {
t.Fatalf("stream QPS %.0f exceeds theoretical serial ceiling %.0f — handler may have changed",
streamQPS, theoreticalMax)
}
}
// TestServerSerialVsConcurrentHandler measures the lift from changing the
// server-side handler from serial (one goroutine per stream) to concurrent
// (per-request goroutine, bounded by a semaphore, Send protected by a mutex).
//
// Client is identical in both runs (sync stream, N concurrent submitters).
// Only the server handler differs.
func TestServerSerialVsConcurrentHandler(t *testing.T) {
if testing.Short() {
t.Skip("skipping repro in -short mode")
}
const (
opsPerRun = 2400
serviceDelay = 2 * time.Millisecond
serverConcurrency = 64
)
// Serial server (the OLD handler).
serialAddr, serialFake, stopSerial := startFakeFilerServer(t, serviceDelay)
defer stopSerial()
serialConn := dialFakeFiler(t, serialAddr)
defer serialConn.Close()
// Concurrent server (the NEW handler).
concAddr, concFake, stopConc := startFakeConcurrentFilerServer(t, serviceDelay, serverConcurrency)
defer stopConc()
concConn := dialFakeFiler(t, concAddr)
defer concConn.Close()
// Warm up both connections.
for _, c := range []*grpc.ClientConn{serialConn, concConn} {
client := filer_pb.NewSeaweedFilerClient(c)
if _, err := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{
Directory: "/",
Entry: &filer_pb.Entry{Name: "warmup"},
}); err != nil {
t.Fatalf("warmup: %v", err)
}
}
clientConcurrencies := []int{1, 12, 64, 256}
theoreticalSerial := 1.0 / serviceDelay.Seconds()
theoreticalConcurrent := float64(serverConcurrency) / serviceDelay.Seconds()
t.Logf("service delay: %v", serviceDelay)
t.Logf("server-side concurrency cap: %d", serverConcurrency)
t.Logf("theoretical serial ceiling: %.0f QPS", theoreticalSerial)
t.Logf("theoretical concurrent cap: %.0f QPS", theoreticalConcurrent)
t.Logf("ops per run: %d", opsPerRun)
t.Logf("")
t.Logf("%-16s %-14s %-14s %-10s",
"client workers", "serial QPS", "concurrent QPS", "lift")
for _, c := range clientConcurrencies {
serialFake.createCalls.Store(0)
serialDur := runStreamCreateWorkload(t, serialConn, c, opsPerRun)
serialCalls := serialFake.createCalls.Load()
serialQPS := float64(serialCalls) / serialDur.Seconds()
concFake.createCalls.Store(0)
concDur := runStreamCreateWorkload(t, concConn, c, opsPerRun)
concCalls := concFake.createCalls.Load()
concQPS := float64(concCalls) / concDur.Seconds()
t.Logf("%-16d %-14.0f %-14.0f %.2fx",
c, serialQPS, concQPS, concQPS/serialQPS)
}
}
// TestSchedulerOrderedParallelism compares three server-side handler shapes
// under two workloads:
//
// serial : OLD handler — one goroutine per stream
// sem : NEW handler v1 — per-request goroutine + semaphore, no ordering
// scheduler : NEW handler v2 — per-path admission + subtree barriers
// (mirrors filer.sync's MetadataProcessor)
//
// Workload "distinct" puts each op on its own path; workload "same" puts every
// op on the same path.
//
// Expected shape:
// - serial — both workloads ~1/serviceDelay (no parallelism)
// - sem — both workloads lift ~serverConcurrency× (wrong: reorders
// same-path ops, acceptable for our filer store but undesired)
// - scheduler — distinct workload lifts ~serverConcurrency×
// same workload falls back to ~1/serviceDelay (correctness)
func TestSchedulerOrderedParallelism(t *testing.T) {
if testing.Short() {
t.Skip("skipping repro in -short mode")
}
const (
opsPerRun = 2400
serviceDelay = 2 * time.Millisecond
serverConcurrency = 64
clientWorkers = 12
)
// Serial, sem-only concurrent, scheduler-based concurrent.
serialAddr, serialFake, stopSerial := startFakeFilerServer(t, serviceDelay)
defer stopSerial()
serialConn := dialFakeFiler(t, serialAddr)
defer serialConn.Close()
semAddr, semFake, stopSem := startFakeConcurrentFilerServer(t, serviceDelay, serverConcurrency)
defer stopSem()
semConn := dialFakeFiler(t, semAddr)
defer semConn.Close()
schedAddr, schedFake, stopSched := startFakeSchedulerFilerServer(t, serviceDelay, serverConcurrency)
defer stopSched()
schedConn := dialFakeFiler(t, schedAddr)
defer schedConn.Close()
// Warm up all three connections.
for _, c := range []*grpc.ClientConn{serialConn, semConn, schedConn} {
client := filer_pb.NewSeaweedFilerClient(c)
if _, err := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{
Directory: "/",
Entry: &filer_pb.Entry{Name: "warmup"},
}); err != nil {
t.Fatalf("warmup: %v", err)
}
}
t.Logf("service delay: %v", serviceDelay)
t.Logf("server concurrency: %d", serverConcurrency)
t.Logf("client workers: %d", clientWorkers)
t.Logf("ops per run: %d", opsPerRun)
t.Logf("serial ceiling: %.0f QPS", 1.0/serviceDelay.Seconds())
t.Logf("concurrent ceiling: %.0f QPS (%d x %.0f)", float64(serverConcurrency)/serviceDelay.Seconds(), serverConcurrency, 1.0/serviceDelay.Seconds())
t.Logf("")
workloads := []struct {
name string
fn pathFn
}{
{"distinct paths", distinctPath},
{"same path ", samePath},
}
t.Logf("%-16s %-14s %-14s %-14s %-14s",
"workload", "serial QPS", "sem QPS", "sched QPS", "sched peak-conc")
for _, w := range workloads {
serialFake.createCalls.Store(0)
serialDur := runStreamCreateWorkloadAt(t, serialConn, clientWorkers, opsPerRun, w.fn)
serialQPS := float64(serialFake.createCalls.Load()) / serialDur.Seconds()
semFake.createCalls.Store(0)
semDur := runStreamCreateWorkloadAt(t, semConn, clientWorkers, opsPerRun, w.fn)
semQPS := float64(semFake.createCalls.Load()) / semDur.Seconds()
schedFake.createCalls.Store(0)
schedFake.maxInFlight.Store(0)
schedDur := runStreamCreateWorkloadAt(t, schedConn, clientWorkers, opsPerRun, w.fn)
schedQPS := float64(schedFake.createCalls.Load()) / schedDur.Seconds()
schedPeak := schedFake.maxInFlight.Load()
t.Logf("%-16s %-14.0f %-14.0f %-14.0f %d",
w.name, serialQPS, semQPS, schedQPS, schedPeak)
}
}
// TestStreamSyncVsAsyncClient measures whether making the CLIENT side
// asynchronous (fire-and-forget, no per-request waits) lifts the server-set
// ceiling. It runs sync and async workloads against the same serial server
// and compares end-to-end throughput.
//
// Expectation: at steady state, async cannot exceed 1/serviceDelay because the
// server still processes one request at a time. Any "improvement" is transient
// (in-flight pipelining up to the HTTP/2 window) and disappears as ops >> window.
func TestStreamSyncVsAsyncClient(t *testing.T) {
if testing.Short() {
t.Skip("skipping repro in -short mode")
}
const (
opsPerRun = 2400
serviceDelay = 2 * time.Millisecond
)
addr, fake, stop := startFakeFilerServer(t, serviceDelay)
defer stop()
conn := dialFakeFiler(t, addr)
defer conn.Close()
warmupClient := filer_pb.NewSeaweedFilerClient(conn)
if _, err := warmupClient.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{
Directory: "/",
Entry: &filer_pb.Entry{Name: "warmup"},
}); err != nil {
t.Fatalf("warmup: %v", err)
}
// Sweep concurrency to show that async does not break the 1/serviceDelay
// ceiling no matter how much client-side parallelism we throw at it.
concurrencies := []int{1, 12, 64, 256}
t.Logf("service delay: %v (theoretical serial ceiling = %.0f QPS)",
serviceDelay, 1.0/serviceDelay.Seconds())
t.Logf("ops per run: %d", opsPerRun)
t.Logf("")
t.Logf("%-12s %-12s %-12s %-10s", "concurrency", "sync QPS", "async QPS", "ratio")
for _, c := range concurrencies {
fake.createCalls.Store(0)
syncDur := runStreamCreateWorkload(t, conn, c, opsPerRun)
syncCalls := fake.createCalls.Load()
syncQPS := float64(syncCalls) / syncDur.Seconds()
fake.createCalls.Store(0)
asyncDur := runStreamAsyncCreateWorkload(t, conn, c, opsPerRun)
asyncCalls := fake.createCalls.Load()
asyncQPS := float64(asyncCalls) / asyncDur.Seconds()
t.Logf("%-12d %-12.0f %-12.0f %-10.2fx",
c, syncQPS, asyncQPS, asyncQPS/syncQPS)
}
}
@@ -0,0 +1,379 @@
package weed_server
import (
"path"
"sync"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// mutateJobKind classifies a streamed mutation for conflict detection. This is
// the same taxonomy MetadataProcessor uses in weed/command/filer_sync_jobs.go:
// separate a "barrier" directory event (create/delete/rename) from an in-place
// attribute-only directory update so that mtime/xattr bumps do not serialize
// every file operation in the subtree.
type mutateJobKind int
const (
// kindMutateFile: a regular file event. Conflicts with any in-flight job
// on the same path and with any barrier directory on the same path or on
// an ancestor.
kindMutateFile mutateJobKind = iota
// kindMutateBarrierDir: a directory create, a directory rename, or a
// delete (target type unknown on the wire). Acts as a subtree barrier:
// must drain every active descendant, and blocks every new job under it
// until it completes.
kindMutateBarrierDir
// kindMutateNonBarrierDir: an in-place directory attribute update
// (mode / xattr / mtime with unchanged name). Conflicts with any other
// in-flight job at the same path, but never with descendants.
kindMutateNonBarrierDir
)
// mutateScheduler serializes mutations by path while allowing cross-path work
// to run in parallel. The conflict taxonomy mirrors filer.sync's
// MetadataProcessor (weed/command/filer_sync_jobs.go); admission adds
// per-path FIFO ordering so two requests arriving on the same stream on the
// same path are always processed in arrival order, even when a Cond broadcast
// would otherwise race them.
//
// Invariants:
// - pathQueue[p] contains every waiter (pending + admitted) that is
// interested in path p, in arrival order. admit dequeues nothing; Done
// dequeues the head.
// - A waiter is admissible iff it is the head of every path queue it
// joined (primary and secondary), pathConflictsLocked passes for each
// path against *active* state, and totalActive < concurrencyLimit.
type mutateScheduler struct {
concurrencyLimit int
mu sync.Mutex
totalActive int
activeFilePaths map[util.FullPath]int
activeBarrierDirPaths map[util.FullPath]int
activeNonBarrierDirPaths map[util.FullPath]int
descendantCount map[util.FullPath]int
// pathQueue maps each path to the FIFO list of waiters (pending or
// admitted) interested in it. Entries are removed by Done.
pathQueue map[util.FullPath][]*mutateWaiter
}
// mutateWaiter is one outstanding Admit call. admitted flips under mu when
// the waiter moves from pending to active; ready is closed at the same time
// so the Admit caller unblocks.
type mutateWaiter struct {
primary, secondary util.FullPath
kind mutateJobKind
admitted bool
ready chan struct{}
}
func newMutateScheduler(concurrency int) *mutateScheduler {
return &mutateScheduler{
concurrencyLimit: concurrency,
activeFilePaths: make(map[util.FullPath]int),
activeBarrierDirPaths: make(map[util.FullPath]int),
activeNonBarrierDirPaths: make(map[util.FullPath]int),
descendantCount: make(map[util.FullPath]int),
pathQueue: make(map[util.FullPath][]*mutateWaiter),
}
}
// Admit blocks until this (primary, secondary, kind) tuple can be admitted
// without violating the conflict rules and without exceeding concurrencyLimit,
// and until every earlier waiter on any of its paths has itself been admitted.
// On return the job is registered in the indexes; the caller must call Done
// with the same arguments when the work is finished.
//
// For single-path operations (create / update / delete) pass secondary="".
// For rename, pass old path as primary and new path as secondary; kind is
// kindMutateBarrierDir.
func (s *mutateScheduler) Admit(primary, secondary util.FullPath, kind mutateJobKind) {
w := &mutateWaiter{
primary: primary,
secondary: secondary,
kind: kind,
ready: make(chan struct{}),
}
s.mu.Lock()
s.pathQueue[primary] = append(s.pathQueue[primary], w)
if secondary != "" && secondary != primary {
s.pathQueue[secondary] = append(s.pathQueue[secondary], w)
}
s.tryPromoteLocked()
s.mu.Unlock()
<-w.ready
}
// Done releases the slot reserved by Admit and promotes any waiters that
// became admissible as a result. Must be called exactly once per successful
// Admit with the same arguments.
func (s *mutateScheduler) Done(primary, secondary util.FullPath, kind mutateJobKind) {
s.mu.Lock()
defer s.mu.Unlock()
s.removePathLocked(primary, kind)
if secondary != "" {
s.removePathLocked(secondary, kind)
}
s.totalActive--
s.dequeueHeadLocked(primary)
if secondary != "" && secondary != primary {
s.dequeueHeadLocked(secondary)
}
s.tryPromoteLocked()
}
// dequeueHeadLocked removes the current head of path p and deletes the map
// entry when the queue becomes empty. Must be called under s.mu.
func (s *mutateScheduler) dequeueHeadLocked(p util.FullPath) {
q := s.pathQueue[p]
if len(q) == 0 {
return
}
// Shift left without reallocating; drop the reference so the waiter can
// be garbage-collected before the tail shrinks.
q[0] = nil
copy(q, q[1:])
q = q[:len(q)-1]
if len(q) == 0 {
delete(s.pathQueue, p)
} else {
s.pathQueue[p] = q
}
}
// tryPromoteLocked admits as many queue heads as possible while respecting
// path-FIFO order, the active-state conflict rules, and concurrencyLimit.
// The admitted set can grow in one call because admitting one waiter frees
// zero or more paths whose new heads may now pass the conflict check.
// Must be called under s.mu.
func (s *mutateScheduler) tryPromoteLocked() {
for s.totalActive < s.concurrencyLimit {
promoted := false
// Walk distinct head waiters across all path queues. Map iteration
// order is randomized, which is fine: path-FIFO is preserved by the
// head-of-queue check inside admitIfHeadLocked, and cross-path order
// is not constrained.
for _, q := range s.pathQueue {
if len(q) == 0 {
continue
}
w := q[0]
if w.admitted {
continue
}
if s.admitIfHeadLocked(w) {
promoted = true
if s.totalActive >= s.concurrencyLimit {
return
}
}
}
if !promoted {
return
}
}
}
// admitIfHeadLocked admits w if w is the head of every path queue it joined
// and pathConflictsLocked passes. Returns true if admitted. Must be called
// under s.mu.
func (s *mutateScheduler) admitIfHeadLocked(w *mutateWaiter) bool {
if s.pathQueue[w.primary][0] != w {
return false
}
if w.secondary != "" && w.secondary != w.primary {
q := s.pathQueue[w.secondary]
if len(q) == 0 || q[0] != w {
return false
}
}
if s.pathConflictsLocked(w.primary, w.kind) {
return false
}
if w.secondary != "" && s.pathConflictsLocked(w.secondary, w.kind) {
return false
}
s.addPathLocked(w.primary, w.kind)
if w.secondary != "" {
s.addPathLocked(w.secondary, w.kind)
}
s.totalActive++
w.admitted = true
close(w.ready)
return true
}
// pathConflictsLocked mirrors MetadataProcessor.pathConflicts exactly.
func (s *mutateScheduler) pathConflictsLocked(p util.FullPath, kind mutateJobKind) bool {
if s.activeBarrierDirPaths[p] > 0 {
return true
}
if kind == kindMutateBarrierDir && s.activeNonBarrierDirPaths[p] > 0 {
return true
}
if s.activeFilePaths[p] > 0 && (kind == kindMutateFile || kind == kindMutateBarrierDir) {
return true
}
if kind == kindMutateBarrierDir && s.descendantCount[p] > 0 {
return true
}
for _, ancestor := range mutatePathAncestors(p) {
if s.activeBarrierDirPaths[ancestor] > 0 {
return true
}
}
return false
}
func (s *mutateScheduler) addPathLocked(p util.FullPath, kind mutateJobKind) {
switch kind {
case kindMutateFile:
s.activeFilePaths[p]++
case kindMutateBarrierDir:
s.activeBarrierDirPaths[p]++
case kindMutateNonBarrierDir:
s.activeNonBarrierDirPaths[p]++
}
for _, ancestor := range mutatePathAncestors(p) {
s.descendantCount[ancestor]++
}
}
func (s *mutateScheduler) removePathLocked(p util.FullPath, kind mutateJobKind) {
switch kind {
case kindMutateFile:
if s.activeFilePaths[p] <= 1 {
delete(s.activeFilePaths, p)
} else {
s.activeFilePaths[p]--
}
case kindMutateBarrierDir:
if s.activeBarrierDirPaths[p] <= 1 {
delete(s.activeBarrierDirPaths, p)
} else {
s.activeBarrierDirPaths[p]--
}
case kindMutateNonBarrierDir:
if s.activeNonBarrierDirPaths[p] <= 1 {
delete(s.activeNonBarrierDirPaths, p)
} else {
s.activeNonBarrierDirPaths[p]--
}
}
for _, ancestor := range mutatePathAncestors(p) {
if s.descendantCount[ancestor] <= 1 {
delete(s.descendantCount, ancestor)
} else {
s.descendantCount[ancestor]--
}
}
}
// mutatePathAncestors mirrors filer_sync_jobs.go:pathAncestors — returns the
// proper ancestor directories of p. For "/a/b/c", returns ["/a/b", "/a", "/"].
func mutatePathAncestors(p util.FullPath) []util.FullPath {
var ancestors []util.FullPath
s := string(p)
for {
parent := path.Dir(s)
if parent == s {
break
}
ancestors = append(ancestors, util.FullPath(parent))
s = parent
}
return ancestors
}
// classifyMutation extracts the admission key(s) and kind for a mutation
// request. It returns primary, secondary (empty unless rename), and kind.
//
// Malformed requests (missing oneof payload, nil Entry, missing rename fields)
// are routed to a barrier at "/" so admission still runs under the full stream
// lock; the handler will then send EINVAL to the client. This keeps the
// scheduler and Recv loop crash-free regardless of client-side validation.
//
// Deletes are always classified as kindMutateBarrierDir because a
// DeleteEntryRequest can target a directory (empty or, with IsRecursive, with
// contents) but does not carry IsDirectory on the wire. Treating every delete
// as a barrier at its target path makes it conflict with an in-flight
// non-barrier directory update on the same path (e.g. chmod), which a
// kindMutateFile classification would miss.
func classifyMutation(req *filer_pb.StreamMutateEntryRequest) (primary, secondary util.FullPath, kind mutateJobKind) {
// Default fallback for any shape we cannot classify safely.
primary = util.FullPath("/")
kind = kindMutateBarrierDir
switch r := req.Request.(type) {
case *filer_pb.StreamMutateEntryRequest_CreateRequest:
cr := r.CreateRequest
if cr == nil || cr.Entry == nil {
return
}
primary = util.FullPath(cr.Directory).Child(cr.Entry.Name)
kind = classifyEntry(cr.Entry.IsDirectory, false)
return
case *filer_pb.StreamMutateEntryRequest_UpdateRequest:
ur := r.UpdateRequest
if ur == nil || ur.Entry == nil {
return
}
primary = util.FullPath(ur.Directory).Child(ur.Entry.Name)
// UpdateEntry never changes the name, so directory updates are always
// in-place attribute updates. File updates (chunk manifests, xattrs)
// are kindMutateFile and thus serialize against same-path file ops.
kind = classifyEntry(ur.Entry.IsDirectory, true)
return
case *filer_pb.StreamMutateEntryRequest_DeleteRequest:
dr := r.DeleteRequest
if dr == nil {
return
}
primary = util.FullPath(dr.Directory).Child(dr.Name)
// Barrier regardless of IsRecursive: the request does not carry the
// target's IsDirectory, and barrier classification correctly blocks
// concurrent non-barrier dir updates at the same path. Descendant
// wait for a non-recursive delete of a non-empty dir is wasted but
// not wrong — that call fails at the store anyway.
kind = kindMutateBarrierDir
return
case *filer_pb.StreamMutateEntryRequest_RenameRequest:
rr := r.RenameRequest
if rr == nil {
return
}
primary = util.FullPath(rr.OldDirectory).Child(rr.OldName)
secondary = util.FullPath(rr.NewDirectory).Child(rr.NewName)
// Renames reshape the namespace on both sides; conservatively treat as
// a subtree barrier so any in-flight descendant drains before the move
// and no new descendant is admitted until it completes.
kind = kindMutateBarrierDir
return
default:
return
}
}
func classifyEntry(isDirectory, isAttributeUpdate bool) mutateJobKind {
if !isDirectory {
return kindMutateFile
}
if isAttributeUpdate {
return kindMutateNonBarrierDir
}
return kindMutateBarrierDir
}
@@ -0,0 +1,528 @@
package weed_server
import (
"sync/atomic"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// TestMutatePathAncestors mirrors TestPathAncestors in filer_sync_jobs_test.go.
func TestMutatePathAncestors(t *testing.T) {
tests := []struct {
path util.FullPath
want []util.FullPath
}{
{"/a/b/c/file.txt", []util.FullPath{"/a/b/c", "/a/b", "/a", "/"}},
{"/a/b", []util.FullPath{"/a", "/"}},
{"/a", []util.FullPath{"/"}},
{"/", nil},
}
for _, tt := range tests {
got := mutatePathAncestors(tt.path)
if len(got) != len(tt.want) {
t.Errorf("mutatePathAncestors(%q) = %v, want %v", tt.path, got, tt.want)
continue
}
for i := range got {
if got[i] != tt.want[i] {
t.Errorf("mutatePathAncestors(%q)[%d] = %q, want %q", tt.path, i, got[i], tt.want[i])
}
}
}
}
// TestClassifyMutation covers the four request shapes and the delete-recursive
// barrier upgrade.
func TestClassifyMutation(t *testing.T) {
tests := []struct {
name string
req *filer_pb.StreamMutateEntryRequest
wantPrimary util.FullPath
wantSecondary util.FullPath
wantKind mutateJobKind
}{
{
name: "create file",
req: &filer_pb.StreamMutateEntryRequest{
Request: &filer_pb.StreamMutateEntryRequest_CreateRequest{
CreateRequest: &filer_pb.CreateEntryRequest{
Directory: "/a",
Entry: &filer_pb.Entry{Name: "f", IsDirectory: false},
},
},
},
wantPrimary: "/a/f",
wantKind: kindMutateFile,
},
{
name: "create directory (barrier)",
req: &filer_pb.StreamMutateEntryRequest{
Request: &filer_pb.StreamMutateEntryRequest_CreateRequest{
CreateRequest: &filer_pb.CreateEntryRequest{
Directory: "/a",
Entry: &filer_pb.Entry{Name: "d", IsDirectory: true},
},
},
},
wantPrimary: "/a/d",
wantKind: kindMutateBarrierDir,
},
{
name: "update file",
req: &filer_pb.StreamMutateEntryRequest{
Request: &filer_pb.StreamMutateEntryRequest_UpdateRequest{
UpdateRequest: &filer_pb.UpdateEntryRequest{
Directory: "/a",
Entry: &filer_pb.Entry{Name: "f", IsDirectory: false},
},
},
},
wantPrimary: "/a/f",
wantKind: kindMutateFile,
},
{
name: "update directory (non-barrier attr bump)",
req: &filer_pb.StreamMutateEntryRequest{
Request: &filer_pb.StreamMutateEntryRequest_UpdateRequest{
UpdateRequest: &filer_pb.UpdateEntryRequest{
Directory: "/",
Entry: &filer_pb.Entry{Name: "a", IsDirectory: true},
},
},
},
wantPrimary: "/a",
wantKind: kindMutateNonBarrierDir,
},
{
name: "delete non-recursive (barrier: target type unknown)",
req: &filer_pb.StreamMutateEntryRequest{
Request: &filer_pb.StreamMutateEntryRequest_DeleteRequest{
DeleteRequest: &filer_pb.DeleteEntryRequest{
Directory: "/a", Name: "f",
},
},
},
wantPrimary: "/a/f",
wantKind: kindMutateBarrierDir,
},
{
name: "delete recursive (barrier)",
req: &filer_pb.StreamMutateEntryRequest{
Request: &filer_pb.StreamMutateEntryRequest_DeleteRequest{
DeleteRequest: &filer_pb.DeleteEntryRequest{
Directory: "/a", Name: "d", IsRecursive: true,
},
},
},
wantPrimary: "/a/d",
wantKind: kindMutateBarrierDir,
},
{
name: "malformed create (nil Entry)",
req: &filer_pb.StreamMutateEntryRequest{
Request: &filer_pb.StreamMutateEntryRequest_CreateRequest{
CreateRequest: &filer_pb.CreateEntryRequest{Directory: "/a"},
},
},
wantPrimary: "/",
wantKind: kindMutateBarrierDir,
},
{
name: "malformed update (nil Entry)",
req: &filer_pb.StreamMutateEntryRequest{
Request: &filer_pb.StreamMutateEntryRequest_UpdateRequest{
UpdateRequest: &filer_pb.UpdateEntryRequest{Directory: "/a"},
},
},
wantPrimary: "/",
wantKind: kindMutateBarrierDir,
},
{
name: "empty oneof",
req: &filer_pb.StreamMutateEntryRequest{},
wantPrimary: "/",
wantKind: kindMutateBarrierDir,
},
{
name: "rename",
req: &filer_pb.StreamMutateEntryRequest{
Request: &filer_pb.StreamMutateEntryRequest_RenameRequest{
RenameRequest: &filer_pb.StreamRenameEntryRequest{
OldDirectory: "/src", OldName: "a",
NewDirectory: "/dst", NewName: "b",
},
},
},
wantPrimary: "/src/a",
wantSecondary: "/dst/b",
wantKind: kindMutateBarrierDir,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p, s, k := classifyMutation(tt.req)
if p != tt.wantPrimary {
t.Errorf("primary = %q, want %q", p, tt.wantPrimary)
}
if s != tt.wantSecondary {
t.Errorf("secondary = %q, want %q", s, tt.wantSecondary)
}
if k != tt.wantKind {
t.Errorf("kind = %v, want %v", k, tt.wantKind)
}
})
}
}
// TestPathConflictSamePathFile: two file ops on the same path conflict; on
// different paths they do not.
func TestPathConflictSamePathFile(t *testing.T) {
s := newMutateScheduler(100)
s.addPathLocked("/a/f", kindMutateFile)
if !s.pathConflictsLocked("/a/f", kindMutateFile) {
t.Error("same-path file+file should conflict")
}
if s.pathConflictsLocked("/a/g", kindMutateFile) {
t.Error("different-path file+file should not conflict")
}
}
// TestPathConflictBarrierBlocksSamePath: a barrier dir in flight at /a blocks
// any new job (file, barrier, non-barrier) at /a.
func TestPathConflictBarrierBlocksSamePath(t *testing.T) {
s := newMutateScheduler(100)
s.addPathLocked("/a", kindMutateBarrierDir)
for _, k := range []mutateJobKind{kindMutateFile, kindMutateBarrierDir, kindMutateNonBarrierDir} {
if !s.pathConflictsLocked("/a", k) {
t.Errorf("barrier at /a should block kind %v at /a", k)
}
}
}
// TestPathConflictBarrierBlocksDescendants: a barrier at /a blocks any job
// strictly under /a, regardless of kind.
func TestPathConflictBarrierBlocksDescendants(t *testing.T) {
s := newMutateScheduler(100)
s.addPathLocked("/a", kindMutateBarrierDir)
for _, child := range []util.FullPath{"/a/f", "/a/b", "/a/b/c"} {
for _, k := range []mutateJobKind{kindMutateFile, kindMutateBarrierDir, kindMutateNonBarrierDir} {
if !s.pathConflictsLocked(child, k) {
t.Errorf("barrier at /a should block %v at %q", k, child)
}
}
}
}
// TestPathConflictIncomingBarrierWaitsForDescendants: a new barrier at /a must
// wait for any active descendant (even a file) to drain.
func TestPathConflictIncomingBarrierWaitsForDescendants(t *testing.T) {
s := newMutateScheduler(100)
s.addPathLocked("/a/f", kindMutateFile)
if !s.pathConflictsLocked("/a", kindMutateBarrierDir) {
t.Error("incoming barrier at /a should wait for active file at /a/f")
}
}
// TestPathConflictNonBarrierDirAllowsDescendants: an in-flight attribute-only
// dir update at /a does NOT block descendants. This is the filer.sync
// optimization that prevents mtime bumps from serializing file writes.
func TestPathConflictNonBarrierDirAllowsDescendants(t *testing.T) {
s := newMutateScheduler(100)
s.addPathLocked("/a", kindMutateNonBarrierDir)
if s.pathConflictsLocked("/a/f", kindMutateFile) {
t.Error("non-barrier dir update at /a should not block file at /a/f")
}
if s.pathConflictsLocked("/a/b", kindMutateNonBarrierDir) {
t.Error("non-barrier dir update at /a should not block another non-barrier at /a/b")
}
}
// TestPathConflictIncomingBarrierWaitsForSamePathNonBarrier: a delete/rename on
// a dir must wait for an in-flight chmod/xattr/mtime update at the same dir.
func TestPathConflictIncomingBarrierWaitsForSamePathNonBarrier(t *testing.T) {
s := newMutateScheduler(100)
s.addPathLocked("/a", kindMutateNonBarrierDir)
if !s.pathConflictsLocked("/a", kindMutateBarrierDir) {
t.Error("incoming barrier at /a should wait for non-barrier update at /a")
}
// But non-barrier vs non-barrier at the same path may overlap (attr bumps).
if s.pathConflictsLocked("/a", kindMutateNonBarrierDir) {
t.Error("non-barrier vs non-barrier at /a should not conflict")
}
}
// TestAdmitDoneLifecycle: Admit marks state; Done unmarks; a subsequent
// conflicting Admit that was blocked must unblock after Done.
func TestAdmitDoneLifecycle(t *testing.T) {
s := newMutateScheduler(100)
s.Admit("/a/f", "", kindMutateFile)
// State is registered.
s.mu.Lock()
if s.activeFilePaths["/a/f"] != 1 {
s.mu.Unlock()
t.Fatalf("activeFilePaths[/a/f] = %d, want 1", s.activeFilePaths["/a/f"])
}
s.mu.Unlock()
// Start a blocked admit on the same path; it must not complete until Done.
admitted := make(chan struct{})
go func() {
s.Admit("/a/f", "", kindMutateFile)
close(admitted)
}()
select {
case <-admitted:
t.Fatal("second Admit should block while first is active")
case <-time.After(50 * time.Millisecond):
}
s.Done("/a/f", "", kindMutateFile)
select {
case <-admitted:
case <-time.After(time.Second):
t.Fatal("second Admit did not unblock after Done")
}
// Clean up.
s.Done("/a/f", "", kindMutateFile)
}
// TestAdmitParallelDistinctPaths: different paths do not block each other.
func TestAdmitParallelDistinctPaths(t *testing.T) {
s := newMutateScheduler(100)
done := make(chan struct{}, 3)
for i, p := range []util.FullPath{"/a", "/b", "/c"} {
go func(p util.FullPath, i int) {
s.Admit(p, "", kindMutateFile)
done <- struct{}{}
}(p, i)
}
for i := 0; i < 3; i++ {
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("distinct-path Admits blocked each other")
}
}
for _, p := range []util.FullPath{"/a", "/b", "/c"} {
s.Done(p, "", kindMutateFile)
}
}
// TestAdmitConcurrencyCap: with concurrencyLimit=2 and three Admits, the
// third must block until one of the first two calls Done.
func TestAdmitConcurrencyCap(t *testing.T) {
s := newMutateScheduler(2)
s.Admit("/a", "", kindMutateFile)
s.Admit("/b", "", kindMutateFile)
admittedThird := make(chan struct{})
go func() {
s.Admit("/c", "", kindMutateFile)
close(admittedThird)
}()
select {
case <-admittedThird:
t.Fatal("third Admit should block when cap is reached")
case <-time.After(50 * time.Millisecond):
}
s.Done("/a", "", kindMutateFile)
select {
case <-admittedThird:
case <-time.After(time.Second):
t.Fatal("third Admit did not unblock after Done")
}
s.Done("/b", "", kindMutateFile)
s.Done("/c", "", kindMutateFile)
}
// TestAdmitRenameTwoPathConflict: a rename holds both src and dst. A later
// single-path admit on either must block until Done clears both.
func TestAdmitRenameTwoPathConflict(t *testing.T) {
s := newMutateScheduler(100)
s.Admit("/src/a", "/dst/b", kindMutateBarrierDir)
for _, blocked := range []util.FullPath{"/src/a", "/dst/b"} {
admitted := make(chan struct{})
go func(p util.FullPath) {
s.Admit(p, "", kindMutateFile)
close(admitted)
}(blocked)
select {
case <-admitted:
t.Fatalf("Admit at %q should block while rename holds both paths", blocked)
case <-time.After(20 * time.Millisecond):
}
}
s.Done("/src/a", "/dst/b", kindMutateBarrierDir)
// Two waiters now become admissible; drain them.
deadline := time.After(time.Second)
drained := 0
for drained < 2 {
select {
case <-deadline:
t.Fatalf("only %d waiters unblocked after Done", drained)
default:
// Attempt to Done both waiters; they may still be racing to Admit.
time.Sleep(10 * time.Millisecond)
s.mu.Lock()
active := s.totalActive
s.mu.Unlock()
if active == 2 {
s.Done("/src/a", "", kindMutateFile)
s.Done("/dst/b", "", kindMutateFile)
drained = 2
}
}
}
}
// waitQueueLen blocks until pathQueue[p] has exactly n entries, or fails the
// test on timeout. Uses the scheduler's own mutex to observe state so the
// caller never needs a sleep to "let things settle".
func waitQueueLen(t *testing.T, s *mutateScheduler, p util.FullPath, n int) {
t.Helper()
deadline := time.Now().Add(time.Second)
for {
s.mu.Lock()
got := len(s.pathQueue[p])
s.mu.Unlock()
if got == n {
return
}
if time.Now().After(deadline) {
t.Fatalf("pathQueue[%q] len = %d, want %d (timeout)", p, got, n)
}
time.Sleep(100 * time.Microsecond)
}
}
// TestAdmitSamePathFIFO verifies arrival order is preserved for same-path
// admits. Regression test for the Cond.Broadcast race where later admits
// could be woken and registered before earlier ones.
//
// The arrival ordering is made deterministic by observing the scheduler's
// per-path queue length between spawns — no sleep-based fudging.
func TestAdmitSamePathFIFO(t *testing.T) {
s := newMutateScheduler(100)
// A barrier on /a holds the whole path while we enqueue N waiters.
// After this call pathQueue["/a"] has 1 entry (the barrier holder).
s.Admit("/a", "", kindMutateBarrierDir)
const N = 20
order := make(chan int, N)
for i := 0; i < N; i++ {
i := i
go func() {
s.Admit("/a", "", kindMutateFile)
order <- i
s.Done("/a", "", kindMutateFile)
}()
// Before spawning the next goroutine, wait until this one has
// observably enqueued itself. The barrier holder plus i+1 file
// waiters should be present on /a.
waitQueueLen(t, s, "/a", 1+i+1)
}
// Still held — nothing admitted yet.
select {
case got := <-order:
t.Fatalf("unexpected early admit %d while /a is held", got)
case <-time.After(20 * time.Millisecond):
}
s.Done("/a", "", kindMutateBarrierDir)
for i := 0; i < N; i++ {
select {
case got := <-order:
if got != i {
t.Fatalf("admit order[%d] = %d, want %d (FIFO violated)", i, got, i)
}
case <-time.After(time.Second):
t.Fatalf("only %d/%d admits completed", i, N)
}
}
}
// TestAdmitSamePathNonBarrierSerializes verifies that two non-barrier dir
// updates at the same path no longer overlap (filer.sync's last-writer-wins
// optimization is intentionally dropped for streamed mutations, which carry
// client-submitted operations whose order matters).
func TestAdmitSamePathNonBarrierSerializes(t *testing.T) {
s := newMutateScheduler(100)
s.Admit("/a", "", kindMutateNonBarrierDir)
second := make(chan struct{})
go func() {
s.Admit("/a", "", kindMutateNonBarrierDir)
close(second)
}()
select {
case <-second:
t.Fatal("second non-barrier dir update should not run concurrently with first at same path")
case <-time.After(30 * time.Millisecond):
}
s.Done("/a", "", kindMutateNonBarrierDir)
select {
case <-second:
case <-time.After(time.Second):
t.Fatal("second non-barrier admit did not unblock after first Done")
}
s.Done("/a", "", kindMutateNonBarrierDir)
}
// TestAdmitPressureFromManyWaiters: 100 goroutines all want /a; exactly one at
// a time is active; all 100 eventually complete. This is both a smoke test for
// the broadcast/signal wake-up and a guard against lost wake-ups.
func TestAdmitPressureFromManyWaiters(t *testing.T) {
s := newMutateScheduler(10) // cap higher than peak needed — path conflict should be the bottleneck
const N = 100
var activeAtOnce atomic.Int32
var peak atomic.Int32
done := make(chan struct{}, N)
for i := 0; i < N; i++ {
go func() {
s.Admit("/a", "", kindMutateFile)
a := activeAtOnce.Add(1)
for {
p := peak.Load()
if a <= p || peak.CompareAndSwap(p, a) {
break
}
}
time.Sleep(time.Millisecond) // keep admission held so overlap is observable
activeAtOnce.Add(-1)
s.Done("/a", "", kindMutateFile)
done <- struct{}{}
}()
}
for i := 0; i < N; i++ {
select {
case <-done:
case <-time.After(10 * time.Second):
t.Fatalf("only %d/%d admits completed — possible lost wake-up", i, N)
}
}
if peak.Load() != 1 {
t.Errorf("peak concurrent same-path admits = %d, want 1", peak.Load())
}
}