mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
filer: serialize same-path mutations with a local lock
CreateEntry is a FindEntry-then-write with no lock, so concurrent creates to the same path race: OExcl can admit two creators, and a conditional write has no atomic check-then-act. Add a per-path exclusive lock (util.LockTable, which evicts idle keys so it stays bounded) in the CreateEntry handler so the read and the write are atomic on this filer. Once callers route a key's writes to its owner filer, this local lock is the authoritative serialization point. AppendToEntry moves from the distributed lock to the same per-path lock.
This commit is contained in:
@@ -9,8 +9,6 @@ import (
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/cluster"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/operation"
|
||||
@@ -186,6 +184,13 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
|
||||
newEntry.TtlSec = 0
|
||||
}
|
||||
|
||||
// Serialize concurrent mutations to the same path on this filer so the
|
||||
// read (existence/condition) and the write are atomic. This local lock is
|
||||
// sufficient only if callers route a key's writes to this owner filer.
|
||||
fullpath := util.NewFullPath(req.Directory, req.Entry.Name)
|
||||
pathLock := fs.entryLockTable.AcquireLock("CreateEntry", fullpath, util.ExclusiveLock)
|
||||
defer fs.entryLockTable.ReleaseLock(fullpath, pathLock)
|
||||
|
||||
ctx, eventSink := filer.WithMetadataEventSink(ctx)
|
||||
createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures, req.SkipCheckParentDirectory, so.MaxFileNameLength)
|
||||
|
||||
@@ -328,9 +333,11 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
|
||||
glog.V(4).InfofCtx(ctx, "AppendToEntry %v", req)
|
||||
fullpath := util.NewFullPath(req.Directory, req.EntryName)
|
||||
|
||||
lockClient := cluster.NewLockClient(fs.grpcDialOption, fs.option.Host)
|
||||
lock := lockClient.NewShortLivedLock(string(fullpath), string(fs.option.Host))
|
||||
defer lock.StopShortLivedLock()
|
||||
// Serialize the read-modify-write against concurrent mutations to the same
|
||||
// path on this filer. The append must route to this entry's owner filer for
|
||||
// this local lock to be authoritative.
|
||||
pathLock := fs.entryLockTable.AcquireLock("AppendToEntry", fullpath, util.ExclusiveLock)
|
||||
defer fs.entryLockTable.ReleaseLock(fullpath, pathLock)
|
||||
|
||||
var offset int64 = 0
|
||||
entry, err := fs.filer.FindEntry(ctx, fullpath)
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
// TestCreateEntryOExclSerialized checks that concurrent OExcl creates for the
|
||||
// same path yield exactly one winner. The filer's CreateEntry is described as a
|
||||
// FindEntry-then-Insert; without the per-path lock several racers can observe
|
||||
// "not found" and all insert. The exclusive entry lock makes the check-then-act
// atomic, so the losers are expected to be rejected (presumably with
// ErrEntryAlreadyExists; this test only counts successes).
|
||||
func TestCreateEntryOExclSerialized(t *testing.T) {
|
||||
store := newRenameTestStore()
|
||||
// Delay every store FindEntry so the check-then-act window is wide enough
// for the racers to overlap if they are not serialized.
store.findDelay = 5 * time.Millisecond
|
||||
f := newRenameTestFiler(store)
|
||||
f.DirBucketsPath = "/buckets"
|
||||

||||
fs := &FilerServer{
|
||||
filer: f,
|
||||
option: &FilerOption{},
|
||||
entryLockTable: util.NewLockTable[util.FullPath](),
|
||||
}
|
||||

||||
const racers = 8
|
||||
var success int32
|
||||
var wg sync.WaitGroup
|
||||
// start is closed after all goroutines are spawned, releasing them together.
start := make(chan struct{})
|
||||
for i := 0; i < racers; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
<-start
|
||||
resp, err := fs.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{
|
||||
Directory: "/test",
|
||||
OExcl: true,
|
||||
SkipCheckParentDirectory: true,
|
||||
Entry: &filer_pb.Entry{
|
||||
Name: "obj",
|
||||
Attributes: &filer_pb.FuseAttributes{Mtime: 1700000000, FileMode: 0644, Inode: 1},
|
||||
},
|
||||
})
|
||||
// A winner is a call that returned no RPC error and an empty resp.Error.
if err == nil && resp.Error == "" {
|
||||
atomic.AddInt32(&success, 1)
|
||||
}
|
||||
}()
|
||||
}
|
||||
close(start)
|
||||
wg.Wait()
|
||||

||||
if success != 1 {
|
||||
t.Fatalf("expected exactly 1 OExcl winner, got %d", success)
|
||||
}
|
||||
}
|
||||
@@ -29,6 +29,7 @@ type renameTestStore struct {
|
||||
findCalls map[string]int
|
||||
commitErr error
|
||||
deleteErr error
|
||||
findDelay time.Duration // optional: widen check-then-act windows in tests
|
||||
}
|
||||
|
||||
func newRenameTestStore() *renameTestStore {
|
||||
@@ -69,6 +70,9 @@ func (s *renameTestStore) UpdateEntry(_ context.Context, entry *filer.Entry) err
|
||||
}
|
||||
|
||||
func (s *renameTestStore) FindEntry(_ context.Context, p util.FullPath) (*filer.Entry, error) {
|
||||
if s.findDelay > 0 {
|
||||
time.Sleep(s.findDelay)
|
||||
}
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.findCalls[string(p)]++
|
||||
|
||||
@@ -124,6 +124,13 @@ type FilerServer struct {
|
||||
// mountPeerRegistry backs the MountRegister / MountList RPCs for peer
|
||||
// chunk sharing (tier 1). Always populated.
|
||||
mountPeerRegistry *filer.MountPeerRegistry
|
||||
|
||||
// entryLockTable serializes mutations to the same entry path on this filer.
|
||||
// It is the local serialization point for read-modify-write operations
|
||||
// (conditional create, append) once writers for a key are routed to this
|
||||
// node, replacing the distributed lock for that purpose. Idle keys are
|
||||
// evicted automatically, so the table stays bounded.
|
||||
entryLockTable *util.LockTable[util.FullPath]
|
||||
}
|
||||
|
||||
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
|
||||
@@ -162,6 +169,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
|
||||
inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
|
||||
recentCopyRequests: make(map[string]recentCopyRequest),
|
||||
CredentialManager: option.CredentialManager,
|
||||
entryLockTable: util.NewLockTable[util.FullPath](),
|
||||
}
|
||||
fs.mountPeerRegistry = filer.NewMountPeerRegistry()
|
||||
go fs.runMountPeerRegistrySweeper()
|
||||
|
||||
Reference in New Issue
Block a user