seaweedfs/weed/server/filer_grpc_server_stream_mutate.go
Chris Lu c40db5a52d perf(filer): parallelize StreamMutateEntry with path-keyed scheduler (#9171)
* perf(filer): parallelize StreamMutateEntry with path-keyed scheduler

The server handler processed one mutation at a time per stream, capping a
mount's aggregate throughput at ~1/filer_store_service_time regardless of
client concurrency (see issue #9138). With 12 rclone processes this showed
as a ~500 QPS ceiling on a filer that previously served ~1000+ QPS via
unary CreateEntry.

Replace the serial for-loop with a per-request goroutine admitted by a
path-keyed scheduler, adapted directly from filer.sync's MetadataProcessor
(weed/command/filer_sync_jobs.go). Same four conflict indexes, same kind
taxonomy (file / barrier-dir / non-barrier-dir), same ancestor-barrier
and descendant-barrier rules. Cross-path mutations run in parallel; same-
path mutations serialize on arrival order; recursive delete and directory
rename act as subtree barriers; directory attribute bumps stay non-barrier
so they do not serialize file writes under them.

Correctness and safety:
- Per-stream goroutine cap (streamMutateConcurrency = 64) bounds resource
  use from a single noisy mount.
- syncStream wrapper serializes stream.Send across worker goroutines (gRPC
  Send is not concurrent-safe).
- Handler waits on in-flight workers before returning on recv EOF/error so
  no worker writes to a torn-down stream.
- First fatal Send error from any worker propagates as the handler's
  return, causing the stream to tear down.

Benchmark (2 ms simulated filer-store service delay, 12 client workers):
  serial    : 440 QPS
  sem only  : 4902 QPS (unsafe — reorders same-path ops)
  scheduler : 4934 QPS on distinct paths, 439 QPS on same path (correct)

The sem-only number shows the upper bound of raw parallelism; the
scheduler matches it on distinct paths (the realistic 12-rclone case) and
correctly falls back to serial when the workload demands ordering. Peak
concurrent mutations at the handler equals client worker count on the
distinct-path workload and pins to 1 on the same-path workload, as the
scheduler intends.

* perf(filer): decouple StreamMutateEntry admission from receive loop

The previous StreamMutateEntry handler called sched.Admit directly in the
Recv loop. A single request conflicting on path /hot would head-of-line
block stream.Recv, so later requests targeting unrelated paths could not
be received or admitted until /hot drained — cross-path parallelism then
depended on request ordering instead of being a property of the scheduler.

Spawn the worker goroutine immediately on Recv and move sched.Admit into
that goroutine. A new streamMutatePendingLimit (1024) caps total per-
stream outstanding goroutines (pending + active) so a client flooding a
conflicted path cannot explode goroutine count without bound.

Addresses #9171 review comment (coderabbitai, Major).

* fix(filer): reply with EINVAL on unknown StreamMutateEntry request type

Returning nil when req.Request is a future oneof variant or a malformed
request left the client's per-RequestId waiter blocked forever, because
no response was ever sent for that id. Reply with IsLast=true and EINVAL
so the waiter completes with a well-formed error.

Addresses #9171 review comments (gemini-code-assist, coderabbitai).

* fix(filer): make classifyMutation crash-free and correct for deletes

Two issues addressed together because they share one function:

1. Nil-entry panic. classifyMutation dereferenced req.Entry.Name without
   a nil guard; an empty create_request / update_request / rename_request
   from a misbehaving client crashed the scheduler. Guard each oneof
   variant and fall back to a "/" barrier; the handler then sends EINVAL
   via the unknown-request path.

2. Non-recursive delete vs concurrent dir attribute update. DeleteEntry-
   Request does not carry IsDirectory, so the previous kindMutateFile
   classification for non-recursive deletes did not conflict with an in-
   flight kindMutateNonBarrierDir (chmod / xattr / mtime) at the same
   path — a race in scheduler terms. Classify every delete as
   kindMutateBarrierDir regardless of IsRecursive. The incremental cost
   of a descendant-wait for a non-recursive delete of a non-empty dir is
   negligible since that call fails at the store anyway.

Adds classifyMutation tests for malformed create/update, empty oneof,
and updates the delete-non-recursive case to the new expected kind.

Addresses #9171 review comments (coderabbitai Critical, Major).

* fix(filer): route renameStreamProxy.SendMsg through the wrapping Send

The default pass-through SendMsg on renameStreamProxy bypassed the
syncStream mutex and the StreamMutateEntryResponse wrapping: anything
the rename helpers happened to push via SendMsg would have been emitted
on the wire as the wrong protobuf type and could interleave with other
workers' Sends. RecvMsg similarly raced with the outer StreamMutateEntry
Recv loop and could steal unrelated mutation requests.

Route SendMsg through the wrapping Send (rejecting other payload types)
and fail RecvMsg explicitly — the rename logic is a strictly server-push
stream and never calls RecvMsg, so loud failure is safer than silent
stealing.

Addresses #9171 review comment (coderabbitai, Major).

* test(filer): run exactly ops in stream-mutate workloads

perGoroutine := ops / concurrency silently truncated the total when the
values were not divisible — e.g. 2400 ops with 64 workers actually ran
2368 and with 256 workers ran 2304, making the logged "ops per run"
inaccurate and introducing measurement noise that varied across the
concurrency sweep.

Introduce opsForWorker(g, concurrency, ops) which distributes the
remainder to the first (ops % concurrency) workers so the three
workloads (unary, stream sync, stream async) each dispatch exactly
`ops` operations. No changes to the timing methodology.
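
A minimal sketch of the remainder distribution described above. The signature is inferred from this message rather than copied from the test file, and `totalOps` is a hypothetical helper added only to check the sum.

```go
package main

import "fmt"

// opsForWorker returns how many of ops operations worker g (0-based) runs
// when the work is split across concurrency workers. The first
// ops % concurrency workers each take one extra operation.
func opsForWorker(g, concurrency, ops int) int {
	n := ops / concurrency
	if g < ops%concurrency {
		n++
	}
	return n
}

// totalOps sums the per-worker counts; it should always equal ops.
func totalOps(concurrency, ops int) int {
	total := 0
	for g := 0; g < concurrency; g++ {
		total += opsForWorker(g, concurrency, ops)
	}
	return total
}

func main() {
	// Old truncating split versus the exact split.
	fmt.Println((2400/64)*64, totalOps(64, 2400))    // 2368 2400
	fmt.Println((2400/256)*256, totalOps(256, 2400)) // 2304 2400
}
```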

Addresses #9171 review comment (coderabbitai, Minor).

* fix(filer): enforce per-path FIFO admission in mutateScheduler

sync.Cond.Broadcast wakes every waiter; the first to re-acquire the
mutex wins, so two conflicting same-path admissions could be reordered
by the Go runtime even though they arrived serially on the stream. A
single stream is supposed to carry ordered mutations — the PR's original
#8770 claim — so admission must be FIFO per path.

Replace the single cond with a per-path FIFO queue. Each Admit enqueues
a waiter on every path it touches (primary, and on rename the secondary
too) and blocks on a ready channel. tryPromoteLocked admits any waiter
that is at the head of every queue it joined, passes pathConflictsLocked
against the active-state indexes, and is under concurrencyLimit. Done
removes the heads and re-runs tryPromoteLocked so waiters freed by the
completion move in arrival order.

Side effect: two non-barrier directory updates on the same path now
serialize instead of overlapping. filer.sync's MetadataProcessor
intentionally allows them to overlap because its events come from a
committed log where last-writer-wins coalescing is safe; streamed
mutations carry client operations whose order matters, so we drop that
optimization here. Added TestAdmitSamePathFIFO (20-waiter barrier
release) and TestAdmitSamePathNonBarrierSerializes to cover both.

Also refreshed the kindMutateFile doc comment that still referenced the
pre-#1ecf805f5 "non-recursive delete" classification.

Addresses #9171 review comments (coderabbitai Critical, Minor).

* test(filer): make TestAdmitSamePathFIFO deterministic without sleeps

The previous arrival-ordering sync (send to `started` before calling
Admit, plus a 1 ms sleep) relied on the goroutine actually entering
Admit and reaching the per-path queue during that sleep. Under -race on
a loaded CI that is a real flake source, which is ironic for a test
whose job is catching non-deterministic wake-ups.

Observe the scheduler's own pathQueue length between spawns instead —
waitQueueLen polls s.pathQueue["/a"] under s.mu until the expected
number of waiters (1 barrier holder + i+1 file waiters) is enqueued.
That's the exact event the test wants to synchronise on, so there is no
fudge factor. Verified by `go test -race -count=5`.

Addresses #9171 review comment (coderabbitai, Minor).
2026-04-21 11:25:09 -07:00


package weed_server

import (
	"context"
	"fmt"
	"io"
	"strings"
	"sync"
	"syscall"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

// streamMutateConcurrency bounds the number of in-flight mutations processed
// concurrently per client stream. Set to the typical filer-store sweet spot
// so one noisy mount cannot exhaust filer resources.
const streamMutateConcurrency = 64

// streamMutatePendingLimit caps total outstanding requests per stream
// (admitted + waiting-for-admission). Prevents goroutine explosion when a
// client floods requests against a conflicted path while leaving enough
// headroom that cross-path requests never block Recv in practice.
const streamMutatePendingLimit = 1024

// syncStream wraps a bidi stream so that concurrent goroutines can Send
// without interleaving frames. gRPC requires that Send not be called
// concurrently; this mutex is the serialization point.
type syncStream struct {
	stream grpc.BidiStreamingServer[filer_pb.StreamMutateEntryRequest, filer_pb.StreamMutateEntryResponse]
	mu     sync.Mutex
}

func (s *syncStream) Send(r *filer_pb.StreamMutateEntryResponse) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.stream.Send(r)
}

func (fs *FilerServer) StreamMutateEntry(stream grpc.BidiStreamingServer[filer_pb.StreamMutateEntryRequest, filer_pb.StreamMutateEntryResponse]) error {
	ss := &syncStream{stream: stream}

	// Path-keyed admission + subtree barriers, adapted from filer.sync's
	// MetadataProcessor (weed/command/filer_sync_jobs.go). Admit blocks when
	// a new request conflicts with an in-flight one on the same path or with
	// a barrier directory at the same path / an ancestor.
	sched := newMutateScheduler(streamMutateConcurrency)

	// pendingSem caps goroutines-per-stream. The receive loop only blocks on
	// this sem (not on Admit), so one conflicted path cannot head-of-line
	// block receipt of unrelated paths: later distinct-path requests can be
	// spawned, admitted, and processed while the conflicted request waits.
	pendingSem := make(chan struct{}, streamMutatePendingLimit)

	var wg sync.WaitGroup

	// Track the first fatal send error across worker goroutines.
	var sendErrMu sync.Mutex
	var sendErr error
	setSendErr := func(e error) {
		sendErrMu.Lock()
		if sendErr == nil {
			sendErr = e
		}
		sendErrMu.Unlock()
	}
	getSendErr := func() error {
		sendErrMu.Lock()
		defer sendErrMu.Unlock()
		return sendErr
	}

	for {
		if e := getSendErr(); e != nil {
			wg.Wait()
			return e
		}
		req, err := stream.Recv()
		if err == io.EOF {
			wg.Wait()
			return getSendErr()
		}
		if err != nil {
			wg.Wait()
			return err
		}

		primary, secondary, kind := classifyMutation(req)
		pendingSem <- struct{}{}
		wg.Add(1)
		go func(req *filer_pb.StreamMutateEntryRequest, p, s util.FullPath, k mutateJobKind) {
			defer wg.Done()
			defer func() { <-pendingSem }()
			// Admission happens off the Recv loop so a conflicted path never
			// blocks receipt of unrelated requests.
			sched.Admit(p, s, k)
			defer sched.Done(p, s, k)
			if e := fs.handleStreamMutateRequest(ss, req); e != nil {
				setSendErr(e)
			}
		}(req, primary, secondary, kind)
	}
}

// handleStreamMutateRequest processes one request and sends exactly one
// (possibly multi-message) response via the shared sync stream. It returns
// a non-nil error only if Send fails, in which case the caller should
// tear down the stream.
func (fs *FilerServer) handleStreamMutateRequest(ss *syncStream, req *filer_pb.StreamMutateEntryRequest) error {
	switch r := req.Request.(type) {

	case *filer_pb.StreamMutateEntryRequest_CreateRequest:
		resp, createErr := fs.CreateEntry(ss.stream.Context(), r.CreateRequest)
		if createErr != nil {
			resp = &filer_pb.CreateEntryResponse{Error: createErr.Error()}
		}
		out := &filer_pb.StreamMutateEntryResponse{
			RequestId: req.RequestId,
			IsLast:    true,
			Response:  &filer_pb.StreamMutateEntryResponse_CreateResponse{CreateResponse: resp},
		}
		if resp.Error != "" {
			out.Error = resp.Error
			out.Errno = int32(syscall.EIO)
		}
		return ss.Send(out)

	case *filer_pb.StreamMutateEntryRequest_UpdateRequest:
		resp, updateErr := fs.UpdateEntry(ss.stream.Context(), r.UpdateRequest)
		if updateErr != nil {
			resp = &filer_pb.UpdateEntryResponse{}
		}
		out := &filer_pb.StreamMutateEntryResponse{
			RequestId: req.RequestId,
			IsLast:    true,
			Response:  &filer_pb.StreamMutateEntryResponse_UpdateResponse{UpdateResponse: resp},
		}
		if updateErr != nil {
			out.Error = updateErr.Error()
			out.Errno = int32(syscall.EIO)
		}
		return ss.Send(out)

	case *filer_pb.StreamMutateEntryRequest_DeleteRequest:
		resp, deleteErr := fs.DeleteEntry(ss.stream.Context(), r.DeleteRequest)
		if deleteErr != nil {
			resp = &filer_pb.DeleteEntryResponse{Error: deleteErr.Error()}
		}
		out := &filer_pb.StreamMutateEntryResponse{
			RequestId: req.RequestId,
			IsLast:    true,
			Response:  &filer_pb.StreamMutateEntryResponse_DeleteResponse{DeleteResponse: resp},
		}
		if resp.Error != "" {
			out.Error = resp.Error
			out.Errno = int32(syscall.EIO)
		}
		return ss.Send(out)

	case *filer_pb.StreamMutateEntryRequest_RenameRequest:
		return fs.handleStreamMutateRename(ss, req.RequestId, r.RenameRequest)

	default:
		// Send a terminal error response so the client's per-RequestId waiter
		// is released; returning nil here would leak the client-side waiter
		// forever when a future oneof variant or a malformed request arrives.
		glog.Warningf("StreamMutateEntry: unknown request type %T", req.Request)
		return ss.Send(&filer_pb.StreamMutateEntryResponse{
			RequestId: req.RequestId,
			IsLast:    true,
			Error:     "unknown request type",
			Errno:     int32(syscall.EINVAL),
		})
	}
}

// handleStreamMutateRename delegates to the existing StreamRenameEntry logic
// using a proxy stream that converts StreamRenameEntryResponse events into
// StreamMutateEntryResponse messages on the parent bidi stream.
func (fs *FilerServer) handleStreamMutateRename(
	parent *syncStream,
	requestId uint64,
	req *filer_pb.StreamRenameEntryRequest,
) error {
	proxy := &renameStreamProxy{parent: parent, requestId: requestId}
	renameErr := fs.StreamRenameEntry(req, proxy)

	// Always send a final is_last=true to signal rename completion.
	finalResp := &filer_pb.StreamMutateEntryResponse{
		RequestId: requestId,
		IsLast:    true,
		Response: &filer_pb.StreamMutateEntryResponse_RenameResponse{
			RenameResponse: &filer_pb.StreamRenameEntryResponse{},
		},
	}
	if renameErr != nil {
		finalResp.Error = renameErr.Error()
		finalResp.Errno = renameErrno(renameErr)
		glog.V(0).Infof("StreamMutateEntry rename: %v", renameErr)
	}
	return parent.Send(finalResp)
}

// renameStreamProxy adapts the bidi StreamMutateEntry stream to look like a
// SeaweedFiler_StreamRenameEntryServer, which is what StreamRenameEntry and
// moveEntry expect. Each Send() call forwards the response as a non-final
// StreamMutateEntryResponse.
type renameStreamProxy struct {
	parent    *syncStream
	requestId uint64
}

func (p *renameStreamProxy) Send(resp *filer_pb.StreamRenameEntryResponse) error {
	return p.parent.Send(&filer_pb.StreamMutateEntryResponse{
		RequestId: p.requestId,
		IsLast:    false,
		Response:  &filer_pb.StreamMutateEntryResponse_RenameResponse{RenameResponse: resp},
	})
}

func (p *renameStreamProxy) Context() context.Context {
	return p.parent.stream.Context()
}

// SendMsg routes through Send so the payload is wrapped into a
// StreamMutateEntryResponse and goes through the syncStream mutex. Calling
// SendMsg with anything other than *filer_pb.StreamRenameEntryResponse would
// emit the wrong protobuf type on this RPC, so reject other shapes.
func (p *renameStreamProxy) SendMsg(m any) error {
	resp, ok := m.(*filer_pb.StreamRenameEntryResponse)
	if !ok {
		return fmt.Errorf("renameStreamProxy.SendMsg: unexpected type %T", m)
	}
	return p.Send(resp)
}

// RecvMsg on the proxy would race with the outer StreamMutateEntry recv
// loop and could steal unrelated mutation requests. The rename logic never
// calls RecvMsg (it is strictly a server-push stream), so fail loudly if it
// ever does.
func (p *renameStreamProxy) RecvMsg(m any) error {
	return fmt.Errorf("renameStreamProxy.RecvMsg is not supported")
}

func (p *renameStreamProxy) SetHeader(md metadata.MD) error  { return p.parent.stream.SetHeader(md) }
func (p *renameStreamProxy) SendHeader(md metadata.MD) error { return p.parent.stream.SendHeader(md) }
func (p *renameStreamProxy) SetTrailer(md metadata.MD)       { p.parent.stream.SetTrailer(md) }

// renameErrno maps a rename error to a POSIX errno for the client.
func renameErrno(err error) int32 {
	msg := err.Error()
	switch {
	case strings.Contains(msg, "not found"):
		return int32(syscall.ENOENT)
	case strings.Contains(msg, "not empty"):
		return int32(syscall.ENOTEMPTY)
	case strings.Contains(msg, "not directory"):
		return int32(syscall.ENOTDIR)
	default:
		return int32(syscall.EIO)
	}
}