diff --git a/weed/server/filer_grpc_server_stream_mutate.go b/weed/server/filer_grpc_server_stream_mutate.go index 36a5e02bb..b66c8800c 100644 --- a/weed/server/filer_grpc_server_stream_mutate.go +++ b/weed/server/filer_grpc_server_stream_mutate.go @@ -2,90 +2,174 @@ package weed_server import ( "context" + "fmt" "io" "strings" + "sync" "syscall" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) +// streamMutateConcurrency bounds the number of in-flight mutations processed +// concurrently per client stream. Set to the typical filer-store sweet spot +// so one noisy mount cannot exhaust filer resources. +const streamMutateConcurrency = 64 + +// streamMutatePendingLimit caps total outstanding requests per stream +// (admitted + waiting-for-admission). Prevents goroutine explosion when a +// client floods requests against a conflicted path while leaving enough +// headroom that cross-path requests never block Recv in practice. +const streamMutatePendingLimit = 1024 + +// syncStream wraps a bidi stream so that concurrent goroutines can Send +// without interleaving frames. gRPC requires that Send not be called +// concurrently; this mutex is the serialization point. +type syncStream struct { + stream grpc.BidiStreamingServer[filer_pb.StreamMutateEntryRequest, filer_pb.StreamMutateEntryResponse] + mu sync.Mutex +} + +func (s *syncStream) Send(r *filer_pb.StreamMutateEntryResponse) error { + s.mu.Lock() + defer s.mu.Unlock() + return s.stream.Send(r) +} + func (fs *FilerServer) StreamMutateEntry(stream grpc.BidiStreamingServer[filer_pb.StreamMutateEntryRequest, filer_pb.StreamMutateEntryResponse]) error { + ss := &syncStream{stream: stream} + // Path-keyed admission + subtree barriers, adapted from filer.sync's + // MetadataProcessor (weed/command/filer_sync_jobs.go). 
Admit blocks when + // a new request conflicts with an in-flight one on the same path or with + // a barrier directory at the same path / an ancestor. + sched := newMutateScheduler(streamMutateConcurrency) + // pendingSem caps goroutines-per-stream. The receive loop only blocks on + // this sem (not on Admit), so one conflicted path cannot head-of-line + // block receipt of unrelated paths — later distinct-path requests can be + // spawned, admitted, and processed while the conflicted request waits. + pendingSem := make(chan struct{}, streamMutatePendingLimit) + var wg sync.WaitGroup + // Track the first fatal send error across worker goroutines. + var sendErrMu sync.Mutex + var sendErr error + setSendErr := func(e error) { + sendErrMu.Lock() + if sendErr == nil { + sendErr = e + } + sendErrMu.Unlock() + } + getSendErr := func() error { + sendErrMu.Lock() + defer sendErrMu.Unlock() + return sendErr + } + for { + if e := getSendErr(); e != nil { + wg.Wait() + return e + } req, err := stream.Recv() if err == io.EOF { - return nil + wg.Wait() + return getSendErr() } if err != nil { + wg.Wait() return err } - switch r := req.Request.(type) { + primary, secondary, kind := classifyMutation(req) + pendingSem <- struct{}{} + wg.Add(1) + go func(req *filer_pb.StreamMutateEntryRequest, p, s util.FullPath, k mutateJobKind) { + defer wg.Done() + defer func() { <-pendingSem }() + // Admission happens off the Recv loop so a conflicted path never + // blocks receipt of unrelated requests. 
+ sched.Admit(p, s, k) + defer sched.Done(p, s, k) + if e := fs.handleStreamMutateRequest(ss, req); e != nil { + setSendErr(e) + } + }(req, primary, secondary, kind) + } +} - case *filer_pb.StreamMutateEntryRequest_CreateRequest: - resp, createErr := fs.CreateEntry(stream.Context(), r.CreateRequest) - if createErr != nil { - resp = &filer_pb.CreateEntryResponse{Error: createErr.Error()} - } - streamResp := &filer_pb.StreamMutateEntryResponse{ - RequestId: req.RequestId, - IsLast: true, - Response: &filer_pb.StreamMutateEntryResponse_CreateResponse{CreateResponse: resp}, - } - if resp.Error != "" { - streamResp.Error = resp.Error - streamResp.Errno = int32(syscall.EIO) - } - if sendErr := stream.Send(streamResp); sendErr != nil { - return sendErr - } +// handleStreamMutateRequest processes one request and sends exactly one +// (possibly multi-message) response via the shared sync stream. It returns +// a non-nil error only if Send fails — in which case the caller should +// tear down the stream. 
+func (fs *FilerServer) handleStreamMutateRequest(ss *syncStream, req *filer_pb.StreamMutateEntryRequest) error { + switch r := req.Request.(type) { - case *filer_pb.StreamMutateEntryRequest_UpdateRequest: - resp, updateErr := fs.UpdateEntry(stream.Context(), r.UpdateRequest) - if updateErr != nil { - resp = &filer_pb.UpdateEntryResponse{} - } - streamResp := &filer_pb.StreamMutateEntryResponse{ - RequestId: req.RequestId, - IsLast: true, - Response: &filer_pb.StreamMutateEntryResponse_UpdateResponse{UpdateResponse: resp}, - } - if updateErr != nil { - streamResp.Error = updateErr.Error() - streamResp.Errno = int32(syscall.EIO) - } - if sendErr := stream.Send(streamResp); sendErr != nil { - return sendErr - } - - case *filer_pb.StreamMutateEntryRequest_DeleteRequest: - resp, deleteErr := fs.DeleteEntry(stream.Context(), r.DeleteRequest) - if deleteErr != nil { - resp = &filer_pb.DeleteEntryResponse{Error: deleteErr.Error()} - } - streamResp := &filer_pb.StreamMutateEntryResponse{ - RequestId: req.RequestId, - IsLast: true, - Response: &filer_pb.StreamMutateEntryResponse_DeleteResponse{DeleteResponse: resp}, - } - if resp.Error != "" { - streamResp.Error = resp.Error - streamResp.Errno = int32(syscall.EIO) - } - if sendErr := stream.Send(streamResp); sendErr != nil { - return sendErr - } - - case *filer_pb.StreamMutateEntryRequest_RenameRequest: - if err := fs.handleStreamMutateRename(stream, req.RequestId, r.RenameRequest); err != nil { - return err - } - - default: - glog.Warningf("StreamMutateEntry: unknown request type %T", req.Request) + case *filer_pb.StreamMutateEntryRequest_CreateRequest: + resp, createErr := fs.CreateEntry(ss.stream.Context(), r.CreateRequest) + if createErr != nil { + resp = &filer_pb.CreateEntryResponse{Error: createErr.Error()} } + out := &filer_pb.StreamMutateEntryResponse{ + RequestId: req.RequestId, + IsLast: true, + Response: &filer_pb.StreamMutateEntryResponse_CreateResponse{CreateResponse: resp}, + } + if resp.Error != "" { + 
out.Error = resp.Error + out.Errno = int32(syscall.EIO) + } + return ss.Send(out) + + case *filer_pb.StreamMutateEntryRequest_UpdateRequest: + resp, updateErr := fs.UpdateEntry(ss.stream.Context(), r.UpdateRequest) + if updateErr != nil { + resp = &filer_pb.UpdateEntryResponse{} + } + out := &filer_pb.StreamMutateEntryResponse{ + RequestId: req.RequestId, + IsLast: true, + Response: &filer_pb.StreamMutateEntryResponse_UpdateResponse{UpdateResponse: resp}, + } + if updateErr != nil { + out.Error = updateErr.Error() + out.Errno = int32(syscall.EIO) + } + return ss.Send(out) + + case *filer_pb.StreamMutateEntryRequest_DeleteRequest: + resp, deleteErr := fs.DeleteEntry(ss.stream.Context(), r.DeleteRequest) + if deleteErr != nil { + resp = &filer_pb.DeleteEntryResponse{Error: deleteErr.Error()} + } + out := &filer_pb.StreamMutateEntryResponse{ + RequestId: req.RequestId, + IsLast: true, + Response: &filer_pb.StreamMutateEntryResponse_DeleteResponse{DeleteResponse: resp}, + } + if resp.Error != "" { + out.Error = resp.Error + out.Errno = int32(syscall.EIO) + } + return ss.Send(out) + + case *filer_pb.StreamMutateEntryRequest_RenameRequest: + return fs.handleStreamMutateRename(ss, req.RequestId, r.RenameRequest) + + default: + // Send a terminal error response so the client's per-RequestId waiter + // is released; returning nil here would leak the client-side waiter + // forever when a future oneof variant or a malformed request arrives. + glog.Warningf("StreamMutateEntry: unknown request type %T", req.Request) + return ss.Send(&filer_pb.StreamMutateEntryResponse{ + RequestId: req.RequestId, + IsLast: true, + Error: "unknown request type", + Errno: int32(syscall.EINVAL), + }) } } @@ -93,7 +177,7 @@ func (fs *FilerServer) StreamMutateEntry(stream grpc.BidiStreamingServer[filer_p // using a proxy stream that converts StreamRenameEntryResponse events into // StreamMutateEntryResponse messages on the parent bidi stream. 
func (fs *FilerServer) handleStreamMutateRename( - parent grpc.BidiStreamingServer[filer_pb.StreamMutateEntryRequest, filer_pb.StreamMutateEntryResponse], + parent *syncStream, requestId uint64, req *filer_pb.StreamRenameEntryRequest, ) error { @@ -112,10 +196,7 @@ func (fs *FilerServer) handleStreamMutateRename( finalResp.Errno = renameErrno(renameErr) glog.V(0).Infof("StreamMutateEntry rename: %v", renameErr) } - if sendErr := parent.Send(finalResp); sendErr != nil { - return sendErr - } - return nil + return parent.Send(finalResp) } // renameStreamProxy adapts the bidi StreamMutateEntry stream to look like a @@ -123,7 +204,7 @@ func (fs *FilerServer) handleStreamMutateRename( // moveEntry expect. Each Send() call forwards the response as a non-final // StreamMutateEntryResponse. type renameStreamProxy struct { - parent grpc.BidiStreamingServer[filer_pb.StreamMutateEntryRequest, filer_pb.StreamMutateEntryResponse] + parent *syncStream requestId uint64 } @@ -136,14 +217,32 @@ func (p *renameStreamProxy) Send(resp *filer_pb.StreamRenameEntryResponse) error } func (p *renameStreamProxy) Context() context.Context { - return p.parent.Context() + return p.parent.stream.Context() } -func (p *renameStreamProxy) SendMsg(m any) error { return p.parent.SendMsg(m) } -func (p *renameStreamProxy) RecvMsg(m any) error { return p.parent.RecvMsg(m) } -func (p *renameStreamProxy) SetHeader(md metadata.MD) error { return p.parent.SetHeader(md) } -func (p *renameStreamProxy) SendHeader(md metadata.MD) error { return p.parent.SendHeader(md) } -func (p *renameStreamProxy) SetTrailer(md metadata.MD) { p.parent.SetTrailer(md) } +// SendMsg routes through Send so the payload is wrapped into a +// StreamMutateEntryResponse and goes through the syncStream mutex. Calling +// SendMsg with anything other than *filer_pb.StreamRenameEntryResponse would +// emit the wrong protobuf type on this RPC, so reject other shapes. 
+func (p *renameStreamProxy) SendMsg(m any) error { + resp, ok := m.(*filer_pb.StreamRenameEntryResponse) + if !ok { + return fmt.Errorf("renameStreamProxy.SendMsg: unexpected type %T", m) + } + return p.Send(resp) +} + +// RecvMsg on the proxy would race with the outer StreamMutateEntry recv +// loop and could steal unrelated mutation requests. The rename logic never +// calls RecvMsg (it is strictly a server-push stream), so fail loudly if it +// ever does. +func (p *renameStreamProxy) RecvMsg(m any) error { + return fmt.Errorf("renameStreamProxy.RecvMsg is not supported") +} + +func (p *renameStreamProxy) SetHeader(md metadata.MD) error { return p.parent.stream.SetHeader(md) } +func (p *renameStreamProxy) SendHeader(md metadata.MD) error { return p.parent.stream.SendHeader(md) } +func (p *renameStreamProxy) SetTrailer(md metadata.MD) { p.parent.stream.SetTrailer(md) } // renameErrno maps a rename error to a POSIX errno for the client. func renameErrno(err error) int32 { diff --git a/weed/server/filer_grpc_server_stream_mutate_bench_test.go b/weed/server/filer_grpc_server_stream_mutate_bench_test.go new file mode 100644 index 000000000..cfd8a7981 --- /dev/null +++ b/weed/server/filer_grpc_server_stream_mutate_bench_test.go @@ -0,0 +1,792 @@ +package weed_server + +import ( + "context" + "io" + "net" + "sync" + "sync/atomic" + "syscall" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// fakeFilerServer simulates a filer whose CreateEntry takes a fixed amount of +// wall-clock time (meant to model the filer-store commit latency). It implements +// only what these benchmarks need: unary CreateEntry and bidi StreamMutateEntry. +// +// The StreamMutateEntry handler mirrors the OLD (pre-parallelization) production handler's structure: +// a single goroutine per stream, processing one request at a time. 
+type fakeFilerServer struct { + filer_pb.UnimplementedSeaweedFilerServer + serviceDelay time.Duration + createCalls atomic.Int64 +} + +func (s *fakeFilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (*filer_pb.CreateEntryResponse, error) { + s.createCalls.Add(1) + if s.serviceDelay > 0 { + time.Sleep(s.serviceDelay) + } + return &filer_pb.CreateEntryResponse{}, nil +} + +// StreamMutateEntry mirrors the OLD serial handler: one goroutine, strictly +// one request at a time. +func (s *fakeFilerServer) StreamMutateEntry(stream grpc.BidiStreamingServer[filer_pb.StreamMutateEntryRequest, filer_pb.StreamMutateEntryResponse]) error { + for { + req, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + switch r := req.Request.(type) { + case *filer_pb.StreamMutateEntryRequest_CreateRequest: + resp, createErr := s.CreateEntry(stream.Context(), r.CreateRequest) + if createErr != nil { + resp = &filer_pb.CreateEntryResponse{Error: createErr.Error()} + } + out := &filer_pb.StreamMutateEntryResponse{ + RequestId: req.RequestId, + IsLast: true, + Response: &filer_pb.StreamMutateEntryResponse_CreateResponse{CreateResponse: resp}, + } + if resp.Error != "" { + out.Error = resp.Error + out.Errno = int32(syscall.EIO) + } + if sendErr := stream.Send(out); sendErr != nil { + return sendErr + } + } + } +} + +// fakeConcurrentFilerServer mirrors the NEW concurrent handler: per-request +// goroutine, bounded by a semaphore, with a Send mutex for stream safety. +// Structurally similar to weed/server/filer_grpc_server_stream_mutate.go +// after the parallelization patch, minus the mutateScheduler admission step. 
+type fakeConcurrentFilerServer struct { + filer_pb.UnimplementedSeaweedFilerServer + serviceDelay time.Duration + concurrency int + createCalls atomic.Int64 +} + +func (s *fakeConcurrentFilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (*filer_pb.CreateEntryResponse, error) { + s.createCalls.Add(1) + if s.serviceDelay > 0 { + time.Sleep(s.serviceDelay) + } + return &filer_pb.CreateEntryResponse{}, nil +} + +func (s *fakeConcurrentFilerServer) StreamMutateEntry(stream grpc.BidiStreamingServer[filer_pb.StreamMutateEntryRequest, filer_pb.StreamMutateEntryResponse]) error { + var sendMu sync.Mutex + send := func(r *filer_pb.StreamMutateEntryResponse) error { + sendMu.Lock() + defer sendMu.Unlock() + return stream.Send(r) + } + sem := make(chan struct{}, s.concurrency) + var wg sync.WaitGroup + for { + req, err := stream.Recv() + if err == io.EOF { + wg.Wait() + return nil + } + if err != nil { + wg.Wait() + return err + } + sem <- struct{}{} + wg.Add(1) + go func(req *filer_pb.StreamMutateEntryRequest) { + defer wg.Done() + defer func() { <-sem }() + switch r := req.Request.(type) { + case *filer_pb.StreamMutateEntryRequest_CreateRequest: + resp, _ := s.CreateEntry(stream.Context(), r.CreateRequest) + _ = send(&filer_pb.StreamMutateEntryResponse{ + RequestId: req.RequestId, + IsLast: true, + Response: &filer_pb.StreamMutateEntryResponse_CreateResponse{CreateResponse: resp}, + }) + } + }(req) + } +} + +// startFakeConcurrentFilerServer spins up the concurrent-handler variant. 
+func startFakeConcurrentFilerServer(t testing.TB, serviceDelay time.Duration, concurrency int) (string, *fakeConcurrentFilerServer, func()) { + t.Helper() + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + srv := grpc.NewServer() + fake := &fakeConcurrentFilerServer{serviceDelay: serviceDelay, concurrency: concurrency} + filer_pb.RegisterSeaweedFilerServer(srv, fake) + go srv.Serve(lis) + return lis.Addr().String(), fake, func() { + srv.GracefulStop() + _ = lis.Close() + } +} + +// fakeSchedulerFilerServer uses the real mutateScheduler from the production +// handler to admit requests by (path, kind). Per-path operations serialize; +// cross-path operations run in parallel. +type fakeSchedulerFilerServer struct { + filer_pb.UnimplementedSeaweedFilerServer + serviceDelay time.Duration + concurrency int + createCalls atomic.Int64 + maxInFlight atomic.Int64 // sampled peak of in-flight create goroutines + curInFlight atomic.Int64 +} + +func (s *fakeSchedulerFilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (*filer_pb.CreateEntryResponse, error) { + s.createCalls.Add(1) + cur := s.curInFlight.Add(1) + for { + peak := s.maxInFlight.Load() + if cur <= peak || s.maxInFlight.CompareAndSwap(peak, cur) { + break + } + } + if s.serviceDelay > 0 { + time.Sleep(s.serviceDelay) + } + s.curInFlight.Add(-1) + return &filer_pb.CreateEntryResponse{}, nil +} + +func (s *fakeSchedulerFilerServer) StreamMutateEntry(stream grpc.BidiStreamingServer[filer_pb.StreamMutateEntryRequest, filer_pb.StreamMutateEntryResponse]) error { + var sendMu sync.Mutex + send := func(r *filer_pb.StreamMutateEntryResponse) error { + sendMu.Lock() + defer sendMu.Unlock() + return stream.Send(r) + } + sched := newMutateScheduler(s.concurrency) + var wg sync.WaitGroup + for { + req, err := stream.Recv() + if err == io.EOF { + wg.Wait() + return nil + } + if err != nil { + wg.Wait() + return err + } + primary, secondary, kind := 
classifyMutation(req) + sched.Admit(primary, secondary, kind) + wg.Add(1) + go func(req *filer_pb.StreamMutateEntryRequest) { + defer wg.Done() + defer sched.Done(primary, secondary, kind) + switch r := req.Request.(type) { + case *filer_pb.StreamMutateEntryRequest_CreateRequest: + resp, _ := s.CreateEntry(stream.Context(), r.CreateRequest) + _ = send(&filer_pb.StreamMutateEntryResponse{ + RequestId: req.RequestId, + IsLast: true, + Response: &filer_pb.StreamMutateEntryResponse_CreateResponse{CreateResponse: resp}, + }) + } + }(req) + } +} + +func startFakeSchedulerFilerServer(t testing.TB, serviceDelay time.Duration, concurrency int) (string, *fakeSchedulerFilerServer, func()) { + t.Helper() + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + srv := grpc.NewServer() + fake := &fakeSchedulerFilerServer{serviceDelay: serviceDelay, concurrency: concurrency} + filer_pb.RegisterSeaweedFilerServer(srv, fake) + go srv.Serve(lis) + return lis.Addr().String(), fake, func() { + srv.GracefulStop() + _ = lis.Close() + } +} + +// startFakeFilerServer spins up an in-process gRPC filer with the given per- +// request service delay, returns the dial target and a shutdown func. +func startFakeFilerServer(t testing.TB, serviceDelay time.Duration) (string, *fakeFilerServer, func()) { + t.Helper() + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + srv := grpc.NewServer() + fake := &fakeFilerServer{serviceDelay: serviceDelay} + filer_pb.RegisterSeaweedFilerServer(srv, fake) + go srv.Serve(lis) + return lis.Addr().String(), fake, func() { + srv.GracefulStop() + _ = lis.Close() + } +} + +// dialFakeFiler returns a client connection to the in-process filer. 
+func dialFakeFiler(t testing.TB, addr string) *grpc.ClientConn { + t.Helper() + conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("dial: %v", err) + } + return conn +} + +// opsForWorker splits `ops` across `concurrency` workers, handing the +// remainder to the first `remainder` workers so the total is always exactly +// `ops`. Returns the count for worker index g. +func opsForWorker(g, concurrency, ops int) int { + per := ops / concurrency + if g < ops%concurrency { + return per + 1 + } + return per +} + +// runUnaryCreateWorkload fires `ops` CreateEntry unary RPCs across `concurrency` +// goroutines, all sharing a single client connection. Returns total duration. +func runUnaryCreateWorkload(t testing.TB, conn *grpc.ClientConn, concurrency, ops int) time.Duration { + t.Helper() + client := filer_pb.NewSeaweedFilerClient(conn) + var wg sync.WaitGroup + start := time.Now() + for g := 0; g < concurrency; g++ { + wg.Add(1) + count := opsForWorker(g, concurrency, ops) + go func() { + defer wg.Done() + for i := 0; i < count; i++ { + _, err := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{ + Directory: "/", + Entry: &filer_pb.Entry{ + Name: "f", + IsDirectory: false, + }, + }) + if err != nil { + t.Errorf("unary CreateEntry: %v", err) + return + } + } + }() + } + wg.Wait() + return time.Since(start) +} + +// pathFn chooses the (directory, name) of the i'th op. The default used by the +// existing callers is samePath — all ops hit "/f". distinctPath gives every +// op its own name so cross-path parallelism can kick in. +type pathFn func(i int) (dir, name string) + +func samePath(_ int) (string, string) { return "/", "f" } +func distinctPath(i int) (string, string) { return "/", "f" + itoa(i) } + +// itoa is a tight local decimal formatter — avoids a strconv import inside a +// hot benchmark loop. 
+func itoa(n int) string { + if n == 0 { + return "0" + } + buf := make([]byte, 0, 12) + for n > 0 { + buf = append(buf, byte('0'+n%10)) + n /= 10 + } + for i, j := 0, len(buf)-1; i < j; i, j = i+1, j-1 { + buf[i], buf[j] = buf[j], buf[i] + } + return string(buf) +} + +// runStreamCreateWorkload fires `ops` CreateEntry ops through a SINGLE bidi +// stream, with `concurrency` goroutines all submitting to the same stream via +// a small multiplexer that mirrors weed/mount/weedfs_stream_mutate.go's +// sendLoop+recvLoop structure. Returns total duration. +func runStreamCreateWorkload(t testing.TB, conn *grpc.ClientConn, concurrency, ops int) time.Duration { + return runStreamCreateWorkloadAt(t, conn, concurrency, ops, samePath) +} + +func runStreamCreateWorkloadAt(t testing.TB, conn *grpc.ClientConn, concurrency, ops int, path pathFn) time.Duration { + t.Helper() + client := filer_pb.NewSeaweedFilerClient(conn) + stream, err := client.StreamMutateEntry(context.Background()) + if err != nil { + t.Fatalf("open stream: %v", err) + } + + var nextID atomic.Uint64 + var pending sync.Map // map[uint64]chan struct{} + + // single recv goroutine, dispatching by request id + recvErr := make(chan error, 1) + go func() { + for { + resp, err := stream.Recv() + if err != nil { + recvErr <- err + return + } + if ch, ok := pending.LoadAndDelete(resp.RequestId); ok { + close(ch.(chan struct{})) + } + } + }() + + // dedicated send goroutine (gRPC Send is not concurrent-safe) + sendCh := make(chan *filer_pb.StreamMutateEntryRequest, 512) + sendErr := make(chan error, 1) + go func() { + for req := range sendCh { + if err := stream.Send(req); err != nil { + sendErr <- err + return + } + } + sendErr <- nil + }() + + var wg sync.WaitGroup + start := time.Now() + offset := 0 + for g := 0; g < concurrency; g++ { + wg.Add(1) + count := opsForWorker(g, concurrency, ops) + startOff := offset + offset += count + go func(startOff, count int) { + defer wg.Done() + for i := 0; i < count; i++ { + id 
:= nextID.Add(1) + done := make(chan struct{}) + pending.Store(id, done) + dir, name := path(startOff + i) + sendCh <- &filer_pb.StreamMutateEntryRequest{ + RequestId: id, + Request: &filer_pb.StreamMutateEntryRequest_CreateRequest{ + CreateRequest: &filer_pb.CreateEntryRequest{ + Directory: dir, + Entry: &filer_pb.Entry{Name: name}, + }, + }, + } + <-done + } + }(startOff, count) + } + wg.Wait() + elapsed := time.Since(start) + + close(sendCh) + _ = stream.CloseSend() + select { + case <-sendErr: + case <-time.After(time.Second): + } + select { + case <-recvErr: + case <-time.After(time.Second): + } + return elapsed +} + +// runStreamAsyncCreateWorkload fires `ops` CreateEntry ops through a single +// bidi stream WITHOUT the client waiting for responses between sends. The +// timer starts at the first send and stops when the last response is received, +// so this measures true end-to-end throughput (not just client-send rate). +// +// This isolates the question: does relaxing client-side per-request waiting +// help throughput when the server is a serial loop? +func runStreamAsyncCreateWorkload(t testing.TB, conn *grpc.ClientConn, concurrency, ops int) time.Duration { + t.Helper() + client := filer_pb.NewSeaweedFilerClient(conn) + stream, err := client.StreamMutateEntry(context.Background()) + if err != nil { + t.Fatalf("open stream: %v", err) + } + + var nextID atomic.Uint64 + var outstanding atomic.Int64 + allDone := make(chan struct{}) + + // Single recv goroutine — signals allDone when the last response arrives. + recvErr := make(chan error, 1) + go func() { + for { + _, err := stream.Recv() + if err != nil { + recvErr <- err + return + } + if outstanding.Add(-1) == 0 { + select { + case <-allDone: + default: + close(allDone) + } + return + } + } + }() + + // Dedicated send goroutine (gRPC Send is not concurrent-safe). 
+ sendCh := make(chan *filer_pb.StreamMutateEntryRequest, 4096) + sendErr := make(chan error, 1) + go func() { + for req := range sendCh { + if err := stream.Send(req); err != nil { + sendErr <- err + return + } + } + sendErr <- nil + }() + + outstanding.Store(int64(ops)) + + var wg sync.WaitGroup + start := time.Now() + for g := 0; g < concurrency; g++ { + wg.Add(1) + count := opsForWorker(g, concurrency, ops) + go func(count int) { + defer wg.Done() + for i := 0; i < count; i++ { + id := nextID.Add(1) + // Fire-and-forget: push to sendCh, do not wait for response. + sendCh <- &filer_pb.StreamMutateEntryRequest{ + RequestId: id, + Request: &filer_pb.StreamMutateEntryRequest_CreateRequest{ + CreateRequest: &filer_pb.CreateEntryRequest{ + Directory: "/", + Entry: &filer_pb.Entry{Name: "f"}, + }, + }, + } + } + }(count) + } + wg.Wait() + // All requests queued; wait for server to drain all responses. + <-allDone + elapsed := time.Since(start) + + close(sendCh) + _ = stream.CloseSend() + select { + case <-sendErr: + case <-time.After(time.Second): + } + select { + case <-recvErr: + case <-time.After(time.Second): + } + return elapsed +} + +// TestReproStreamSerializationCeiling runs a head-to-head: same concurrency, +// same op count, same per-request service delay. The only difference is the +// transport — unary (one goroutine per RPC at the server) vs single bidi stream +// (one goroutine for the whole stream at the server). +// +// With a 2ms per-request service delay, unary with N=12 should deliver ~6000/s +// (12 concurrent handlers, each 2ms), while stream delivers ~500/s (serial). 
+func TestReproStreamSerializationCeiling(t *testing.T) { + if testing.Short() { + t.Skip("skipping repro in -short mode") + } + + const ( + concurrency = 12 + opsPerRun = 1200 + serviceDelay = 2 * time.Millisecond + ) + + addr, fake, stop := startFakeFilerServer(t, serviceDelay) + defer stop() + + conn := dialFakeFiler(t, addr) + defer conn.Close() + + // warm up one call so the gRPC HTTP/2 connection is fully established + warmupClient := filer_pb.NewSeaweedFilerClient(conn) + if _, err := warmupClient.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{ + Directory: "/", + Entry: &filer_pb.Entry{Name: "warmup"}, + }); err != nil { + t.Fatalf("warmup: %v", err) + } + fake.createCalls.Store(0) + + unaryDur := runUnaryCreateWorkload(t, conn, concurrency, opsPerRun) + unaryCalls := fake.createCalls.Load() + fake.createCalls.Store(0) + + streamDur := runStreamCreateWorkload(t, conn, concurrency, opsPerRun) + streamCalls := fake.createCalls.Load() + + unaryQPS := float64(unaryCalls) / unaryDur.Seconds() + streamQPS := float64(streamCalls) / streamDur.Seconds() + + t.Logf("service delay: %v", serviceDelay) + t.Logf("concurrency: %d", concurrency) + t.Logf("ops per run: %d", opsPerRun) + t.Logf("unary : %d calls in %v -> %.0f QPS", unaryCalls, unaryDur, unaryQPS) + t.Logf("stream : %d calls in %v -> %.0f QPS", streamCalls, streamDur, streamQPS) + t.Logf("unary / stream ratio: %.2fx", unaryQPS/streamQPS) + + // The serial stream handler cannot exceed 1 / serviceDelay regardless of + // client concurrency. Assert that unary is meaningfully faster — if the + // server is ever parallelized, this assertion is safe to revisit. + if unaryQPS < streamQPS*2 { + t.Fatalf("expected unary to be at least 2x faster than stream, got unary=%.0f stream=%.0f", + unaryQPS, streamQPS) + } + + // Stream QPS should cluster near the theoretical ceiling of 1/serviceDelay. 
+ theoreticalMax := 1.0 / serviceDelay.Seconds() + if streamQPS > theoreticalMax*1.25 { + t.Fatalf("stream QPS %.0f exceeds theoretical serial ceiling %.0f — handler may have changed", + streamQPS, theoreticalMax) + } +} + +// TestServerSerialVsConcurrentHandler measures the lift from changing the +// server-side handler from serial (one goroutine per stream) to concurrent +// (per-request goroutine, bounded by a semaphore, Send protected by a mutex). +// +// Client is identical in both runs (sync stream, N concurrent submitters). +// Only the server handler differs. +func TestServerSerialVsConcurrentHandler(t *testing.T) { + if testing.Short() { + t.Skip("skipping repro in -short mode") + } + + const ( + opsPerRun = 2400 + serviceDelay = 2 * time.Millisecond + serverConcurrency = 64 + ) + + // Serial server (the OLD handler). + serialAddr, serialFake, stopSerial := startFakeFilerServer(t, serviceDelay) + defer stopSerial() + serialConn := dialFakeFiler(t, serialAddr) + defer serialConn.Close() + + // Concurrent server (the NEW handler). + concAddr, concFake, stopConc := startFakeConcurrentFilerServer(t, serviceDelay, serverConcurrency) + defer stopConc() + concConn := dialFakeFiler(t, concAddr) + defer concConn.Close() + + // Warm up both connections. 
+ for _, c := range []*grpc.ClientConn{serialConn, concConn} { + client := filer_pb.NewSeaweedFilerClient(c) + if _, err := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{ + Directory: "/", + Entry: &filer_pb.Entry{Name: "warmup"}, + }); err != nil { + t.Fatalf("warmup: %v", err) + } + } + + clientConcurrencies := []int{1, 12, 64, 256} + theoreticalSerial := 1.0 / serviceDelay.Seconds() + theoreticalConcurrent := float64(serverConcurrency) / serviceDelay.Seconds() + + t.Logf("service delay: %v", serviceDelay) + t.Logf("server-side concurrency cap: %d", serverConcurrency) + t.Logf("theoretical serial ceiling: %.0f QPS", theoreticalSerial) + t.Logf("theoretical concurrent cap: %.0f QPS", theoreticalConcurrent) + t.Logf("ops per run: %d", opsPerRun) + t.Logf("") + t.Logf("%-16s %-14s %-14s %-10s", + "client workers", "serial QPS", "concurrent QPS", "lift") + + for _, c := range clientConcurrencies { + serialFake.createCalls.Store(0) + serialDur := runStreamCreateWorkload(t, serialConn, c, opsPerRun) + serialCalls := serialFake.createCalls.Load() + serialQPS := float64(serialCalls) / serialDur.Seconds() + + concFake.createCalls.Store(0) + concDur := runStreamCreateWorkload(t, concConn, c, opsPerRun) + concCalls := concFake.createCalls.Load() + concQPS := float64(concCalls) / concDur.Seconds() + + t.Logf("%-16d %-14.0f %-14.0f %.2fx", + c, serialQPS, concQPS, concQPS/serialQPS) + } +} + +// TestSchedulerOrderedParallelism compares three server-side handler shapes +// under two workloads: +// +// serial : OLD handler — one goroutine per stream +// sem : NEW handler v1 — per-request goroutine + semaphore, no ordering +// scheduler : NEW handler v2 — per-path admission + subtree barriers +// (mirrors filer.sync's MetadataProcessor) +// +// Workload "distinct" puts each op on its own path; workload "same" puts every +// op on the same path. 
+// +// Expected shape: +// - serial — both workloads ~1/serviceDelay (no parallelism) +// - sem — both workloads lift ~serverConcurrency× (wrong: reorders +// same-path ops, acceptable for our filer store but undesired) +// - scheduler — distinct workload lifts ~serverConcurrency× +// same workload falls back to ~1/serviceDelay (correctness) +func TestSchedulerOrderedParallelism(t *testing.T) { + if testing.Short() { + t.Skip("skipping repro in -short mode") + } + + const ( + opsPerRun = 2400 + serviceDelay = 2 * time.Millisecond + serverConcurrency = 64 + clientWorkers = 12 + ) + + // Serial, sem-only concurrent, scheduler-based concurrent. + serialAddr, serialFake, stopSerial := startFakeFilerServer(t, serviceDelay) + defer stopSerial() + serialConn := dialFakeFiler(t, serialAddr) + defer serialConn.Close() + + semAddr, semFake, stopSem := startFakeConcurrentFilerServer(t, serviceDelay, serverConcurrency) + defer stopSem() + semConn := dialFakeFiler(t, semAddr) + defer semConn.Close() + + schedAddr, schedFake, stopSched := startFakeSchedulerFilerServer(t, serviceDelay, serverConcurrency) + defer stopSched() + schedConn := dialFakeFiler(t, schedAddr) + defer schedConn.Close() + + // Warm up all three connections. 
+ for _, c := range []*grpc.ClientConn{serialConn, semConn, schedConn} { + client := filer_pb.NewSeaweedFilerClient(c) + if _, err := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{ + Directory: "/", + Entry: &filer_pb.Entry{Name: "warmup"}, + }); err != nil { + t.Fatalf("warmup: %v", err) + } + } + + t.Logf("service delay: %v", serviceDelay) + t.Logf("server concurrency: %d", serverConcurrency) + t.Logf("client workers: %d", clientWorkers) + t.Logf("ops per run: %d", opsPerRun) + t.Logf("serial ceiling: %.0f QPS", 1.0/serviceDelay.Seconds()) + t.Logf("concurrent ceiling: %.0f QPS (%d x %.0f)", float64(serverConcurrency)/serviceDelay.Seconds(), serverConcurrency, 1.0/serviceDelay.Seconds()) + t.Logf("") + + workloads := []struct { + name string + fn pathFn + }{ + {"distinct paths", distinctPath}, + {"same path ", samePath}, + } + + t.Logf("%-16s %-14s %-14s %-14s %-14s", + "workload", "serial QPS", "sem QPS", "sched QPS", "sched peak-conc") + + for _, w := range workloads { + serialFake.createCalls.Store(0) + serialDur := runStreamCreateWorkloadAt(t, serialConn, clientWorkers, opsPerRun, w.fn) + serialQPS := float64(serialFake.createCalls.Load()) / serialDur.Seconds() + + semFake.createCalls.Store(0) + semDur := runStreamCreateWorkloadAt(t, semConn, clientWorkers, opsPerRun, w.fn) + semQPS := float64(semFake.createCalls.Load()) / semDur.Seconds() + + schedFake.createCalls.Store(0) + schedFake.maxInFlight.Store(0) + schedDur := runStreamCreateWorkloadAt(t, schedConn, clientWorkers, opsPerRun, w.fn) + schedQPS := float64(schedFake.createCalls.Load()) / schedDur.Seconds() + schedPeak := schedFake.maxInFlight.Load() + + t.Logf("%-16s %-14.0f %-14.0f %-14.0f %d", + w.name, serialQPS, semQPS, schedQPS, schedPeak) + } +} + +// TestStreamSyncVsAsyncClient measures whether making the CLIENT side +// asynchronous (fire-and-forget, no per-request waits) lifts the server-set +// ceiling. 
It runs sync and async workloads against the same serial server +// and compares end-to-end throughput. +// +// Expectation: at steady state, async cannot exceed 1/serviceDelay because the +// server still processes one request at a time. Any "improvement" is transient +// (in-flight pipelining up to the HTTP/2 window) and disappears as ops >> window. +func TestStreamSyncVsAsyncClient(t *testing.T) { + if testing.Short() { + t.Skip("skipping repro in -short mode") + } + + const ( + opsPerRun = 2400 + serviceDelay = 2 * time.Millisecond + ) + + addr, fake, stop := startFakeFilerServer(t, serviceDelay) + defer stop() + + conn := dialFakeFiler(t, addr) + defer conn.Close() + + warmupClient := filer_pb.NewSeaweedFilerClient(conn) + if _, err := warmupClient.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{ + Directory: "/", + Entry: &filer_pb.Entry{Name: "warmup"}, + }); err != nil { + t.Fatalf("warmup: %v", err) + } + + // Sweep concurrency to show that async does not break the 1/serviceDelay + // ceiling no matter how much client-side parallelism we throw at it. 
+ concurrencies := []int{1, 12, 64, 256} + t.Logf("service delay: %v (theoretical serial ceiling = %.0f QPS)", + serviceDelay, 1.0/serviceDelay.Seconds()) + t.Logf("ops per run: %d", opsPerRun) + t.Logf("") + t.Logf("%-12s %-12s %-12s %-10s", "concurrency", "sync QPS", "async QPS", "ratio") + + for _, c := range concurrencies { + fake.createCalls.Store(0) + syncDur := runStreamCreateWorkload(t, conn, c, opsPerRun) + syncCalls := fake.createCalls.Load() + syncQPS := float64(syncCalls) / syncDur.Seconds() + + fake.createCalls.Store(0) + asyncDur := runStreamAsyncCreateWorkload(t, conn, c, opsPerRun) + asyncCalls := fake.createCalls.Load() + asyncQPS := float64(asyncCalls) / asyncDur.Seconds() + + t.Logf("%-12d %-12.0f %-12.0f %-10.2fx", + c, syncQPS, asyncQPS, asyncQPS/syncQPS) + } +} diff --git a/weed/server/filer_grpc_server_stream_mutate_scheduler.go b/weed/server/filer_grpc_server_stream_mutate_scheduler.go new file mode 100644 index 000000000..91efa3def --- /dev/null +++ b/weed/server/filer_grpc_server_stream_mutate_scheduler.go @@ -0,0 +1,379 @@ +package weed_server + +import ( + "path" + "sync" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// mutateJobKind classifies a streamed mutation for conflict detection. This is +// the same taxonomy MetadataProcessor uses in weed/command/filer_sync_jobs.go: +// separate a "barrier" directory event (create/delete/rename) from an in-place +// attribute-only directory update so that mtime/xattr bumps do not serialize +// every file operation in the subtree. +type mutateJobKind int + +const ( + // kindMutateFile: a regular file event. Conflicts with any in-flight job + // on the same path and with any barrier directory on the same path or on + // an ancestor. + kindMutateFile mutateJobKind = iota + // kindMutateBarrierDir: a directory create, a directory rename, or a + // delete (target type unknown on the wire). 
// mutateScheduler serializes mutations by path while allowing cross-path work
// to run in parallel. Admission combines two gates: a per-path FIFO queue
// (only the head waiter of a path may be admitted) and a conflict check
// against the currently active jobs (pathConflictsLocked), bounded overall by
// concurrencyLimit.
//
// Invariants:
//   - pathQueue[p] contains every waiter (pending + admitted) that is
//     interested in path p, in the order their Admit calls appended them.
//     Admission dequeues nothing; Done dequeues the head.
//   - A waiter is admissible iff it is the head of every path queue it
//     joined (primary and secondary), pathConflictsLocked passes for each
//     path against *active* state, and totalActive < concurrencyLimit.
type mutateScheduler struct {
	// concurrencyLimit is the maximum value totalActive may reach.
	concurrencyLimit int

	// mu guards every field below; the *Locked helpers expect it to be held.
	mu sync.Mutex

	// totalActive counts admitted jobs that have not yet called Done.
	totalActive int
	// Per-kind counts of active jobs keyed by path, plus descendantCount,
	// which holds for each directory the number of active paths registered
	// strictly beneath it (incremented for every ancestor of an added path).
	activeFilePaths          map[util.FullPath]int
	activeBarrierDirPaths    map[util.FullPath]int
	activeNonBarrierDirPaths map[util.FullPath]int
	descendantCount          map[util.FullPath]int

	// pathQueue maps each path to the FIFO list of waiters (pending or
	// admitted) interested in it. Entries are removed by Done.
	pathQueue map[util.FullPath][]*mutateWaiter
}
+type mutateWaiter struct { + primary, secondary util.FullPath + kind mutateJobKind + admitted bool + ready chan struct{} +} + +func newMutateScheduler(concurrency int) *mutateScheduler { + return &mutateScheduler{ + concurrencyLimit: concurrency, + activeFilePaths: make(map[util.FullPath]int), + activeBarrierDirPaths: make(map[util.FullPath]int), + activeNonBarrierDirPaths: make(map[util.FullPath]int), + descendantCount: make(map[util.FullPath]int), + pathQueue: make(map[util.FullPath][]*mutateWaiter), + } +} + +// Admit blocks until this (primary, secondary, kind) tuple can be admitted +// without violating the conflict rules and without exceeding concurrencyLimit, +// and until every earlier waiter on any of its paths has itself been admitted. +// On return the job is registered in the indexes; the caller must call Done +// with the same arguments when the work is finished. +// +// For single-path operations (create / update / delete) pass secondary="". +// For rename, pass old path as primary and new path as secondary; kind is +// kindMutateBarrierDir. +func (s *mutateScheduler) Admit(primary, secondary util.FullPath, kind mutateJobKind) { + w := &mutateWaiter{ + primary: primary, + secondary: secondary, + kind: kind, + ready: make(chan struct{}), + } + + s.mu.Lock() + s.pathQueue[primary] = append(s.pathQueue[primary], w) + if secondary != "" && secondary != primary { + s.pathQueue[secondary] = append(s.pathQueue[secondary], w) + } + s.tryPromoteLocked() + s.mu.Unlock() + + <-w.ready +} + +// Done releases the slot reserved by Admit and promotes any waiters that +// became admissible as a result. Must be called exactly once per successful +// Admit with the same arguments. 
+func (s *mutateScheduler) Done(primary, secondary util.FullPath, kind mutateJobKind) { + s.mu.Lock() + defer s.mu.Unlock() + + s.removePathLocked(primary, kind) + if secondary != "" { + s.removePathLocked(secondary, kind) + } + s.totalActive-- + + s.dequeueHeadLocked(primary) + if secondary != "" && secondary != primary { + s.dequeueHeadLocked(secondary) + } + + s.tryPromoteLocked() +} + +// dequeueHeadLocked removes the current head of path p and deletes the map +// entry when the queue becomes empty. Must be called under s.mu. +func (s *mutateScheduler) dequeueHeadLocked(p util.FullPath) { + q := s.pathQueue[p] + if len(q) == 0 { + return + } + // Shift left without reallocating; drop the reference so the waiter can + // be garbage-collected before the tail shrinks. + q[0] = nil + copy(q, q[1:]) + q = q[:len(q)-1] + if len(q) == 0 { + delete(s.pathQueue, p) + } else { + s.pathQueue[p] = q + } +} + +// tryPromoteLocked admits as many queue heads as possible while respecting +// path-FIFO order, the active-state conflict rules, and concurrencyLimit. +// The admitted set can grow in one call because admitting one waiter frees +// zero or more paths whose new heads may now pass the conflict check. +// Must be called under s.mu. +func (s *mutateScheduler) tryPromoteLocked() { + for s.totalActive < s.concurrencyLimit { + promoted := false + // Walk distinct head waiters across all path queues. Map iteration + // order is randomized, which is fine: path-FIFO is preserved by the + // head-of-queue check inside admitIfHeadLocked, and cross-path order + // is not constrained. + for _, q := range s.pathQueue { + if len(q) == 0 { + continue + } + w := q[0] + if w.admitted { + continue + } + if s.admitIfHeadLocked(w) { + promoted = true + if s.totalActive >= s.concurrencyLimit { + return + } + } + } + if !promoted { + return + } + } +} + +// admitIfHeadLocked admits w if w is the head of every path queue it joined +// and pathConflictsLocked passes. Returns true if admitted. 
Must be called +// under s.mu. +func (s *mutateScheduler) admitIfHeadLocked(w *mutateWaiter) bool { + if s.pathQueue[w.primary][0] != w { + return false + } + if w.secondary != "" && w.secondary != w.primary { + q := s.pathQueue[w.secondary] + if len(q) == 0 || q[0] != w { + return false + } + } + if s.pathConflictsLocked(w.primary, w.kind) { + return false + } + if w.secondary != "" && s.pathConflictsLocked(w.secondary, w.kind) { + return false + } + s.addPathLocked(w.primary, w.kind) + if w.secondary != "" { + s.addPathLocked(w.secondary, w.kind) + } + s.totalActive++ + w.admitted = true + close(w.ready) + return true +} + +// pathConflictsLocked mirrors MetadataProcessor.pathConflicts exactly. +func (s *mutateScheduler) pathConflictsLocked(p util.FullPath, kind mutateJobKind) bool { + if s.activeBarrierDirPaths[p] > 0 { + return true + } + if kind == kindMutateBarrierDir && s.activeNonBarrierDirPaths[p] > 0 { + return true + } + if s.activeFilePaths[p] > 0 && (kind == kindMutateFile || kind == kindMutateBarrierDir) { + return true + } + if kind == kindMutateBarrierDir && s.descendantCount[p] > 0 { + return true + } + for _, ancestor := range mutatePathAncestors(p) { + if s.activeBarrierDirPaths[ancestor] > 0 { + return true + } + } + return false +} + +func (s *mutateScheduler) addPathLocked(p util.FullPath, kind mutateJobKind) { + switch kind { + case kindMutateFile: + s.activeFilePaths[p]++ + case kindMutateBarrierDir: + s.activeBarrierDirPaths[p]++ + case kindMutateNonBarrierDir: + s.activeNonBarrierDirPaths[p]++ + } + for _, ancestor := range mutatePathAncestors(p) { + s.descendantCount[ancestor]++ + } +} + +func (s *mutateScheduler) removePathLocked(p util.FullPath, kind mutateJobKind) { + switch kind { + case kindMutateFile: + if s.activeFilePaths[p] <= 1 { + delete(s.activeFilePaths, p) + } else { + s.activeFilePaths[p]-- + } + case kindMutateBarrierDir: + if s.activeBarrierDirPaths[p] <= 1 { + delete(s.activeBarrierDirPaths, p) + } else { + 
s.activeBarrierDirPaths[p]-- + } + case kindMutateNonBarrierDir: + if s.activeNonBarrierDirPaths[p] <= 1 { + delete(s.activeNonBarrierDirPaths, p) + } else { + s.activeNonBarrierDirPaths[p]-- + } + } + for _, ancestor := range mutatePathAncestors(p) { + if s.descendantCount[ancestor] <= 1 { + delete(s.descendantCount, ancestor) + } else { + s.descendantCount[ancestor]-- + } + } +} + +// mutatePathAncestors mirrors filer_sync_jobs.go:pathAncestors — returns the +// proper ancestor directories of p. For "/a/b/c", returns ["/a/b", "/a", "/"]. +func mutatePathAncestors(p util.FullPath) []util.FullPath { + var ancestors []util.FullPath + s := string(p) + for { + parent := path.Dir(s) + if parent == s { + break + } + ancestors = append(ancestors, util.FullPath(parent)) + s = parent + } + return ancestors +} + +// classifyMutation extracts the admission key(s) and kind for a mutation +// request. It returns primary, secondary (empty unless rename), and kind. +// +// Malformed requests (missing oneof payload, nil Entry, missing rename fields) +// are routed to a barrier at "/" so admission still runs under the full stream +// lock; the handler will then send EINVAL to the client. This keeps the +// scheduler and Recv loop crash-free regardless of client-side validation. +// +// Deletes are always classified as kindMutateBarrierDir because a +// DeleteEntryRequest can target a directory (empty or, with IsRecursive, with +// contents) but does not carry IsDirectory on the wire. Treating every delete +// as a barrier at its target path makes it conflict with an in-flight +// non-barrier directory update on the same path (e.g. chmod), which a +// kindMutateFile classification would miss. +func classifyMutation(req *filer_pb.StreamMutateEntryRequest) (primary, secondary util.FullPath, kind mutateJobKind) { + // Default fallback for any shape we cannot classify safely. 
+ primary = util.FullPath("/") + kind = kindMutateBarrierDir + + switch r := req.Request.(type) { + + case *filer_pb.StreamMutateEntryRequest_CreateRequest: + cr := r.CreateRequest + if cr == nil || cr.Entry == nil { + return + } + primary = util.FullPath(cr.Directory).Child(cr.Entry.Name) + kind = classifyEntry(cr.Entry.IsDirectory, false) + return + + case *filer_pb.StreamMutateEntryRequest_UpdateRequest: + ur := r.UpdateRequest + if ur == nil || ur.Entry == nil { + return + } + primary = util.FullPath(ur.Directory).Child(ur.Entry.Name) + // UpdateEntry never changes the name, so directory updates are always + // in-place attribute updates. File updates (chunk manifests, xattrs) + // are kindMutateFile and thus serialize against same-path file ops. + kind = classifyEntry(ur.Entry.IsDirectory, true) + return + + case *filer_pb.StreamMutateEntryRequest_DeleteRequest: + dr := r.DeleteRequest + if dr == nil { + return + } + primary = util.FullPath(dr.Directory).Child(dr.Name) + // Barrier regardless of IsRecursive: the request does not carry the + // target's IsDirectory, and barrier classification correctly blocks + // concurrent non-barrier dir updates at the same path. Descendant + // wait for a non-recursive delete of a non-empty dir is wasted but + // not wrong — that call fails at the store anyway. + kind = kindMutateBarrierDir + return + + case *filer_pb.StreamMutateEntryRequest_RenameRequest: + rr := r.RenameRequest + if rr == nil { + return + } + primary = util.FullPath(rr.OldDirectory).Child(rr.OldName) + secondary = util.FullPath(rr.NewDirectory).Child(rr.NewName) + // Renames reshape the namespace on both sides; conservatively treat as + // a subtree barrier so any in-flight descendant drains before the move + // and no new descendant is admitted until it completes. 
+ kind = kindMutateBarrierDir + return + + default: + return + } +} + +func classifyEntry(isDirectory, isAttributeUpdate bool) mutateJobKind { + if !isDirectory { + return kindMutateFile + } + if isAttributeUpdate { + return kindMutateNonBarrierDir + } + return kindMutateBarrierDir +} diff --git a/weed/server/filer_grpc_server_stream_mutate_scheduler_test.go b/weed/server/filer_grpc_server_stream_mutate_scheduler_test.go new file mode 100644 index 000000000..ac8b6bfec --- /dev/null +++ b/weed/server/filer_grpc_server_stream_mutate_scheduler_test.go @@ -0,0 +1,528 @@ +package weed_server + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// TestMutatePathAncestors mirrors TestPathAncestors in filer_sync_jobs_test.go. +func TestMutatePathAncestors(t *testing.T) { + tests := []struct { + path util.FullPath + want []util.FullPath + }{ + {"/a/b/c/file.txt", []util.FullPath{"/a/b/c", "/a/b", "/a", "/"}}, + {"/a/b", []util.FullPath{"/a", "/"}}, + {"/a", []util.FullPath{"/"}}, + {"/", nil}, + } + for _, tt := range tests { + got := mutatePathAncestors(tt.path) + if len(got) != len(tt.want) { + t.Errorf("mutatePathAncestors(%q) = %v, want %v", tt.path, got, tt.want) + continue + } + for i := range got { + if got[i] != tt.want[i] { + t.Errorf("mutatePathAncestors(%q)[%d] = %q, want %q", tt.path, i, got[i], tt.want[i]) + } + } + } +} + +// TestClassifyMutation covers the four request shapes and the delete-recursive +// barrier upgrade. 
+func TestClassifyMutation(t *testing.T) { + tests := []struct { + name string + req *filer_pb.StreamMutateEntryRequest + wantPrimary util.FullPath + wantSecondary util.FullPath + wantKind mutateJobKind + }{ + { + name: "create file", + req: &filer_pb.StreamMutateEntryRequest{ + Request: &filer_pb.StreamMutateEntryRequest_CreateRequest{ + CreateRequest: &filer_pb.CreateEntryRequest{ + Directory: "/a", + Entry: &filer_pb.Entry{Name: "f", IsDirectory: false}, + }, + }, + }, + wantPrimary: "/a/f", + wantKind: kindMutateFile, + }, + { + name: "create directory (barrier)", + req: &filer_pb.StreamMutateEntryRequest{ + Request: &filer_pb.StreamMutateEntryRequest_CreateRequest{ + CreateRequest: &filer_pb.CreateEntryRequest{ + Directory: "/a", + Entry: &filer_pb.Entry{Name: "d", IsDirectory: true}, + }, + }, + }, + wantPrimary: "/a/d", + wantKind: kindMutateBarrierDir, + }, + { + name: "update file", + req: &filer_pb.StreamMutateEntryRequest{ + Request: &filer_pb.StreamMutateEntryRequest_UpdateRequest{ + UpdateRequest: &filer_pb.UpdateEntryRequest{ + Directory: "/a", + Entry: &filer_pb.Entry{Name: "f", IsDirectory: false}, + }, + }, + }, + wantPrimary: "/a/f", + wantKind: kindMutateFile, + }, + { + name: "update directory (non-barrier attr bump)", + req: &filer_pb.StreamMutateEntryRequest{ + Request: &filer_pb.StreamMutateEntryRequest_UpdateRequest{ + UpdateRequest: &filer_pb.UpdateEntryRequest{ + Directory: "/", + Entry: &filer_pb.Entry{Name: "a", IsDirectory: true}, + }, + }, + }, + wantPrimary: "/a", + wantKind: kindMutateNonBarrierDir, + }, + { + name: "delete non-recursive (barrier: target type unknown)", + req: &filer_pb.StreamMutateEntryRequest{ + Request: &filer_pb.StreamMutateEntryRequest_DeleteRequest{ + DeleteRequest: &filer_pb.DeleteEntryRequest{ + Directory: "/a", Name: "f", + }, + }, + }, + wantPrimary: "/a/f", + wantKind: kindMutateBarrierDir, + }, + { + name: "delete recursive (barrier)", + req: &filer_pb.StreamMutateEntryRequest{ + Request: 
&filer_pb.StreamMutateEntryRequest_DeleteRequest{ + DeleteRequest: &filer_pb.DeleteEntryRequest{ + Directory: "/a", Name: "d", IsRecursive: true, + }, + }, + }, + wantPrimary: "/a/d", + wantKind: kindMutateBarrierDir, + }, + { + name: "malformed create (nil Entry)", + req: &filer_pb.StreamMutateEntryRequest{ + Request: &filer_pb.StreamMutateEntryRequest_CreateRequest{ + CreateRequest: &filer_pb.CreateEntryRequest{Directory: "/a"}, + }, + }, + wantPrimary: "/", + wantKind: kindMutateBarrierDir, + }, + { + name: "malformed update (nil Entry)", + req: &filer_pb.StreamMutateEntryRequest{ + Request: &filer_pb.StreamMutateEntryRequest_UpdateRequest{ + UpdateRequest: &filer_pb.UpdateEntryRequest{Directory: "/a"}, + }, + }, + wantPrimary: "/", + wantKind: kindMutateBarrierDir, + }, + { + name: "empty oneof", + req: &filer_pb.StreamMutateEntryRequest{}, + wantPrimary: "/", + wantKind: kindMutateBarrierDir, + }, + { + name: "rename", + req: &filer_pb.StreamMutateEntryRequest{ + Request: &filer_pb.StreamMutateEntryRequest_RenameRequest{ + RenameRequest: &filer_pb.StreamRenameEntryRequest{ + OldDirectory: "/src", OldName: "a", + NewDirectory: "/dst", NewName: "b", + }, + }, + }, + wantPrimary: "/src/a", + wantSecondary: "/dst/b", + wantKind: kindMutateBarrierDir, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p, s, k := classifyMutation(tt.req) + if p != tt.wantPrimary { + t.Errorf("primary = %q, want %q", p, tt.wantPrimary) + } + if s != tt.wantSecondary { + t.Errorf("secondary = %q, want %q", s, tt.wantSecondary) + } + if k != tt.wantKind { + t.Errorf("kind = %v, want %v", k, tt.wantKind) + } + }) + } +} + +// TestPathConflictSamePathFile: two file ops on the same path conflict; on +// different paths they do not. 
+func TestPathConflictSamePathFile(t *testing.T) { + s := newMutateScheduler(100) + s.addPathLocked("/a/f", kindMutateFile) + + if !s.pathConflictsLocked("/a/f", kindMutateFile) { + t.Error("same-path file+file should conflict") + } + if s.pathConflictsLocked("/a/g", kindMutateFile) { + t.Error("different-path file+file should not conflict") + } +} + +// TestPathConflictBarrierBlocksSamePath: a barrier dir in flight at /a blocks +// any new job (file, barrier, non-barrier) at /a. +func TestPathConflictBarrierBlocksSamePath(t *testing.T) { + s := newMutateScheduler(100) + s.addPathLocked("/a", kindMutateBarrierDir) + + for _, k := range []mutateJobKind{kindMutateFile, kindMutateBarrierDir, kindMutateNonBarrierDir} { + if !s.pathConflictsLocked("/a", k) { + t.Errorf("barrier at /a should block kind %v at /a", k) + } + } +} + +// TestPathConflictBarrierBlocksDescendants: a barrier at /a blocks any job +// strictly under /a, regardless of kind. +func TestPathConflictBarrierBlocksDescendants(t *testing.T) { + s := newMutateScheduler(100) + s.addPathLocked("/a", kindMutateBarrierDir) + + for _, child := range []util.FullPath{"/a/f", "/a/b", "/a/b/c"} { + for _, k := range []mutateJobKind{kindMutateFile, kindMutateBarrierDir, kindMutateNonBarrierDir} { + if !s.pathConflictsLocked(child, k) { + t.Errorf("barrier at /a should block %v at %q", k, child) + } + } + } +} + +// TestPathConflictIncomingBarrierWaitsForDescendants: a new barrier at /a must +// wait for any active descendant (even a file) to drain. +func TestPathConflictIncomingBarrierWaitsForDescendants(t *testing.T) { + s := newMutateScheduler(100) + s.addPathLocked("/a/f", kindMutateFile) + + if !s.pathConflictsLocked("/a", kindMutateBarrierDir) { + t.Error("incoming barrier at /a should wait for active file at /a/f") + } +} + +// TestPathConflictNonBarrierDirAllowsDescendants: an in-flight attribute-only +// dir update at /a does NOT block descendants. 
This is the filer.sync +// optimization that prevents mtime bumps from serializing file writes. +func TestPathConflictNonBarrierDirAllowsDescendants(t *testing.T) { + s := newMutateScheduler(100) + s.addPathLocked("/a", kindMutateNonBarrierDir) + + if s.pathConflictsLocked("/a/f", kindMutateFile) { + t.Error("non-barrier dir update at /a should not block file at /a/f") + } + if s.pathConflictsLocked("/a/b", kindMutateNonBarrierDir) { + t.Error("non-barrier dir update at /a should not block another non-barrier at /a/b") + } +} + +// TestPathConflictIncomingBarrierWaitsForSamePathNonBarrier: a delete/rename on +// a dir must wait for an in-flight chmod/xattr/mtime update at the same dir. +func TestPathConflictIncomingBarrierWaitsForSamePathNonBarrier(t *testing.T) { + s := newMutateScheduler(100) + s.addPathLocked("/a", kindMutateNonBarrierDir) + + if !s.pathConflictsLocked("/a", kindMutateBarrierDir) { + t.Error("incoming barrier at /a should wait for non-barrier update at /a") + } + // But non-barrier vs non-barrier at the same path may overlap (attr bumps). + if s.pathConflictsLocked("/a", kindMutateNonBarrierDir) { + t.Error("non-barrier vs non-barrier at /a should not conflict") + } +} + +// TestAdmitDoneLifecycle: Admit marks state; Done unmarks; a subsequent +// conflicting Admit that was blocked must unblock after Done. +func TestAdmitDoneLifecycle(t *testing.T) { + s := newMutateScheduler(100) + s.Admit("/a/f", "", kindMutateFile) + // State is registered. + s.mu.Lock() + if s.activeFilePaths["/a/f"] != 1 { + s.mu.Unlock() + t.Fatalf("activeFilePaths[/a/f] = %d, want 1", s.activeFilePaths["/a/f"]) + } + s.mu.Unlock() + + // Start a blocked admit on the same path; it must not complete until Done. 
+ admitted := make(chan struct{}) + go func() { + s.Admit("/a/f", "", kindMutateFile) + close(admitted) + }() + select { + case <-admitted: + t.Fatal("second Admit should block while first is active") + case <-time.After(50 * time.Millisecond): + } + + s.Done("/a/f", "", kindMutateFile) + + select { + case <-admitted: + case <-time.After(time.Second): + t.Fatal("second Admit did not unblock after Done") + } + + // Clean up. + s.Done("/a/f", "", kindMutateFile) +} + +// TestAdmitParallelDistinctPaths: different paths do not block each other. +func TestAdmitParallelDistinctPaths(t *testing.T) { + s := newMutateScheduler(100) + done := make(chan struct{}, 3) + for i, p := range []util.FullPath{"/a", "/b", "/c"} { + go func(p util.FullPath, i int) { + s.Admit(p, "", kindMutateFile) + done <- struct{}{} + }(p, i) + } + for i := 0; i < 3; i++ { + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("distinct-path Admits blocked each other") + } + } + for _, p := range []util.FullPath{"/a", "/b", "/c"} { + s.Done(p, "", kindMutateFile) + } +} + +// TestAdmitConcurrencyCap: with concurrencyLimit=2 and three Admits, the +// third must block until one of the first two calls Done. +func TestAdmitConcurrencyCap(t *testing.T) { + s := newMutateScheduler(2) + s.Admit("/a", "", kindMutateFile) + s.Admit("/b", "", kindMutateFile) + + admittedThird := make(chan struct{}) + go func() { + s.Admit("/c", "", kindMutateFile) + close(admittedThird) + }() + select { + case <-admittedThird: + t.Fatal("third Admit should block when cap is reached") + case <-time.After(50 * time.Millisecond): + } + + s.Done("/a", "", kindMutateFile) + select { + case <-admittedThird: + case <-time.After(time.Second): + t.Fatal("third Admit did not unblock after Done") + } + + s.Done("/b", "", kindMutateFile) + s.Done("/c", "", kindMutateFile) +} + +// TestAdmitRenameTwoPathConflict: a rename holds both src and dst. A later +// single-path admit on either must block until Done clears both. 
+func TestAdmitRenameTwoPathConflict(t *testing.T) { + s := newMutateScheduler(100) + s.Admit("/src/a", "/dst/b", kindMutateBarrierDir) + + for _, blocked := range []util.FullPath{"/src/a", "/dst/b"} { + admitted := make(chan struct{}) + go func(p util.FullPath) { + s.Admit(p, "", kindMutateFile) + close(admitted) + }(blocked) + select { + case <-admitted: + t.Fatalf("Admit at %q should block while rename holds both paths", blocked) + case <-time.After(20 * time.Millisecond): + } + } + + s.Done("/src/a", "/dst/b", kindMutateBarrierDir) + + // Two waiters now become admissible; drain them. + deadline := time.After(time.Second) + drained := 0 + for drained < 2 { + select { + case <-deadline: + t.Fatalf("only %d waiters unblocked after Done", drained) + default: + // Attempt to Done both waiters; they may still be racing to Admit. + time.Sleep(10 * time.Millisecond) + s.mu.Lock() + active := s.totalActive + s.mu.Unlock() + if active == 2 { + s.Done("/src/a", "", kindMutateFile) + s.Done("/dst/b", "", kindMutateFile) + drained = 2 + } + } + } +} + +// waitQueueLen blocks until pathQueue[p] has exactly n entries, or fails the +// test on timeout. Uses the scheduler's own mutex to observe state so the +// caller never needs a sleep to "let things settle". +func waitQueueLen(t *testing.T, s *mutateScheduler, p util.FullPath, n int) { + t.Helper() + deadline := time.Now().Add(time.Second) + for { + s.mu.Lock() + got := len(s.pathQueue[p]) + s.mu.Unlock() + if got == n { + return + } + if time.Now().After(deadline) { + t.Fatalf("pathQueue[%q] len = %d, want %d (timeout)", p, got, n) + } + time.Sleep(100 * time.Microsecond) + } +} + +// TestAdmitSamePathFIFO verifies arrival order is preserved for same-path +// admits. Regression test for the Cond.Broadcast race where later admits +// could be woken and registered before earlier ones. 
+// +// The arrival ordering is made deterministic by observing the scheduler's +// per-path queue length between spawns — no sleep-based fudging. +func TestAdmitSamePathFIFO(t *testing.T) { + s := newMutateScheduler(100) + + // A barrier on /a holds the whole path while we enqueue N waiters. + // After this call pathQueue["/a"] has 1 entry (the barrier holder). + s.Admit("/a", "", kindMutateBarrierDir) + + const N = 20 + order := make(chan int, N) + for i := 0; i < N; i++ { + i := i + go func() { + s.Admit("/a", "", kindMutateFile) + order <- i + s.Done("/a", "", kindMutateFile) + }() + // Before spawning the next goroutine, wait until this one has + // observably enqueued itself. The barrier holder plus i+1 file + // waiters should be present on /a. + waitQueueLen(t, s, "/a", 1+i+1) + } + + // Still held — nothing admitted yet. + select { + case got := <-order: + t.Fatalf("unexpected early admit %d while /a is held", got) + case <-time.After(20 * time.Millisecond): + } + + s.Done("/a", "", kindMutateBarrierDir) + + for i := 0; i < N; i++ { + select { + case got := <-order: + if got != i { + t.Fatalf("admit order[%d] = %d, want %d (FIFO violated)", i, got, i) + } + case <-time.After(time.Second): + t.Fatalf("only %d/%d admits completed", i, N) + } + } +} + +// TestAdmitSamePathNonBarrierSerializes verifies that two non-barrier dir +// updates at the same path no longer overlap (filer.sync's last-writer-wins +// optimization is intentionally dropped for streamed mutations, which carry +// client-submitted operations whose order matters). 
+func TestAdmitSamePathNonBarrierSerializes(t *testing.T) { + s := newMutateScheduler(100) + s.Admit("/a", "", kindMutateNonBarrierDir) + + second := make(chan struct{}) + go func() { + s.Admit("/a", "", kindMutateNonBarrierDir) + close(second) + }() + select { + case <-second: + t.Fatal("second non-barrier dir update should not run concurrently with first at same path") + case <-time.After(30 * time.Millisecond): + } + s.Done("/a", "", kindMutateNonBarrierDir) + select { + case <-second: + case <-time.After(time.Second): + t.Fatal("second non-barrier admit did not unblock after first Done") + } + s.Done("/a", "", kindMutateNonBarrierDir) +} + +// TestAdmitPressureFromManyWaiters: 100 goroutines all want /a; exactly one at +// a time is active; all 100 eventually complete. This is both a smoke test for +// the broadcast/signal wake-up and a guard against lost wake-ups. +func TestAdmitPressureFromManyWaiters(t *testing.T) { + s := newMutateScheduler(10) // cap higher than peak needed — path conflict should be the bottleneck + const N = 100 + + var activeAtOnce atomic.Int32 + var peak atomic.Int32 + done := make(chan struct{}, N) + + for i := 0; i < N; i++ { + go func() { + s.Admit("/a", "", kindMutateFile) + a := activeAtOnce.Add(1) + for { + p := peak.Load() + if a <= p || peak.CompareAndSwap(p, a) { + break + } + } + time.Sleep(time.Millisecond) // keep admission held so overlap is observable + activeAtOnce.Add(-1) + s.Done("/a", "", kindMutateFile) + done <- struct{}{} + }() + } + + for i := 0; i < N; i++ { + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatalf("only %d/%d admits completed — possible lost wake-up", i, N) + } + } + if peak.Load() != 1 { + t.Errorf("peak concurrent same-path admits = %d, want 1", peak.Load()) + } +}