diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 76dcb4dee..614a96e13 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -9,8 +9,6 @@ import ( "path/filepath" "time" - "github.com/seaweedfs/seaweedfs/weed/cluster" - "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" @@ -186,6 +184,13 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr newEntry.TtlSec = 0 } + // Serialize concurrent mutations to the same path on this filer so the + // read (existence/condition) and the write are atomic. This local lock is + // sufficient only if callers route a key's writes to this owner filer. + fullpath := util.NewFullPath(req.Directory, req.Entry.Name) + pathLock := fs.entryLockTable.AcquireLock("CreateEntry", fullpath, util.ExclusiveLock) + defer fs.entryLockTable.ReleaseLock(fullpath, pathLock) + ctx, eventSink := filer.WithMetadataEventSink(ctx) createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures, req.SkipCheckParentDirectory, so.MaxFileNameLength) @@ -328,9 +333,11 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo glog.V(4).InfofCtx(ctx, "AppendToEntry %v", req) fullpath := util.NewFullPath(req.Directory, req.EntryName) - lockClient := cluster.NewLockClient(fs.grpcDialOption, fs.option.Host) - lock := lockClient.NewShortLivedLock(string(fullpath), string(fs.option.Host)) - defer lock.StopShortLivedLock() + // Serialize the read-modify-write against concurrent mutations to the same + // path on this filer. The append must route to this entry's owner filer for + // this local lock to be authoritative. 
+ pathLock := fs.entryLockTable.AcquireLock("AppendToEntry", fullpath, util.ExclusiveLock) + defer fs.entryLockTable.ReleaseLock(fullpath, pathLock) var offset int64 = 0 entry, err := fs.filer.FindEntry(ctx, fullpath) diff --git a/weed/server/filer_grpc_server_create_lock_test.go b/weed/server/filer_grpc_server_create_lock_test.go new file mode 100644 index 000000000..eb79886aa --- /dev/null +++ b/weed/server/filer_grpc_server_create_lock_test.go @@ -0,0 +1,59 @@ +package weed_server + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// Concurrent OExcl creates for the same path must yield exactly one winner. The +// filer's CreateEntry is a FindEntry-then-Insert; without the per-path lock both +// racers observe "not found" and both insert. The exclusive entry lock makes the +// check-then-act atomic so the losers see ErrEntryAlreadyExists (not asserted here). +func TestCreateEntryOExclSerialized(t *testing.T) { + store := newRenameTestStore() + store.findDelay = 5 * time.Millisecond + f := newRenameTestFiler(store) + f.DirBucketsPath = "/buckets" + + fs := &FilerServer{ + filer: f, + option: &FilerOption{}, + entryLockTable: util.NewLockTable[util.FullPath](), + } + + const racers = 8 + var success int32 + var wg sync.WaitGroup + start := make(chan struct{}) + for i := 0; i < racers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + <-start + resp, err := fs.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{ + Directory: "/test", + OExcl: true, + SkipCheckParentDirectory: true, + Entry: &filer_pb.Entry{ + Name: "obj", + Attributes: &filer_pb.FuseAttributes{Mtime: 1700000000, FileMode: 0644, Inode: 1}, + }, + }) + if err == nil && resp.Error == "" { + atomic.AddInt32(&success, 1) + } + }() + } + close(start) + wg.Wait() + + if success != 1 { + t.Fatalf("expected exactly 1 OExcl winner, got %d", success) + } +} diff --git 
a/weed/server/filer_grpc_server_rename_test.go b/weed/server/filer_grpc_server_rename_test.go index aec0c0740..3fe9392a1 100644 --- a/weed/server/filer_grpc_server_rename_test.go +++ b/weed/server/filer_grpc_server_rename_test.go @@ -29,6 +29,7 @@ type renameTestStore struct { findCalls map[string]int commitErr error deleteErr error + findDelay time.Duration // optional: FindEntry sleeps this long before taking mu, widening check-then-act windows in tests } func newRenameTestStore() *renameTestStore { @@ -69,6 +70,9 @@ func (s *renameTestStore) UpdateEntry(_ context.Context, entry *filer.Entry) err } func (s *renameTestStore) FindEntry(_ context.Context, p util.FullPath) (*filer.Entry, error) { + if s.findDelay > 0 { + time.Sleep(s.findDelay) + } s.mu.Lock() defer s.mu.Unlock() s.findCalls[string(p)]++ diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 52b42be3a..786dc1345 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -124,6 +124,13 @@ type FilerServer struct { // mountPeerRegistry backs the MountRegister / MountList RPCs for peer // chunk sharing (tier 1). Always populated. mountPeerRegistry *filer.MountPeerRegistry + + // entryLockTable serializes mutations to the same entry path on this filer. + // It is the local serialization point for read-modify-write operations + // (conditional create, append) once writers for a key are routed to this + // node, replacing the distributed lock for that purpose. Idle keys are + // evicted automatically, so the table stays bounded. 
+ entryLockTable *util.LockTable[util.FullPath] } func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { @@ -162,6 +169,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)), recentCopyRequests: make(map[string]recentCopyRequest), CredentialManager: option.CredentialManager, + entryLockTable: util.NewLockTable[util.FullPath](), } fs.mountPeerRegistry = filer.NewMountPeerRegistry() go fs.runMountPeerRegistrySweeper()