diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index aa90a4ed7..1ac56b025 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -304,11 +304,16 @@ message DeleteEntryRequest { bool is_from_other_cluster = 7; repeated int32 signatures = 8; int64 if_not_modified_after = 9; + // Optional precondition evaluated against the current entry atomically with + // the delete, under the filer's per-path lock. Only honored for a + // non-recursive single-entry delete routed to the entry's owner filer. + WriteCondition condition = 10; } message DeleteEntryResponse { string error = 1; SubscribeMetadataResponse metadata_event = 2; + FilerError error_code = 3; // machine-readable error code (e.g. PRECONDITION_FAILED) } message AtomicRenameEntryRequest { diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go index 18ffc61bf..dcbe7ffd7 100644 --- a/weed/cluster/lock_client.go +++ b/weed/cluster/lock_client.go @@ -70,6 +70,18 @@ func (lc *LockClient) hostForKey(key string) pb.ServerAddress { return lc.seedFiler } +// PrimaryForKey returns the filer that owns key per the current ring view, or +// "" when no ring view has been received yet. Callers use it to route a key's +// operations to a single owner so that owner's local lock serializes them. +func (lc *LockClient) PrimaryForKey(key string) pb.ServerAddress { + lc.ringMu.RLock() + defer lc.ringMu.RUnlock() + if lc.ring == nil { + return "" + } + return lc.ring.GetPrimary(key) +} + type LiveLock struct { key string renewToken string diff --git a/weed/pb/filer.proto b/weed/pb/filer.proto index aa90a4ed7..1ac56b025 100644 --- a/weed/pb/filer.proto +++ b/weed/pb/filer.proto @@ -304,11 +304,16 @@ message DeleteEntryRequest { bool is_from_other_cluster = 7; repeated int32 signatures = 8; int64 if_not_modified_after = 9; + // Optional precondition evaluated against the current entry atomically with + // the delete, under the filer's per-path lock. Only honored for a + // non-recursive single-entry delete routed to the entry's owner filer. + WriteCondition condition = 10; } message DeleteEntryResponse { string error = 1; SubscribeMetadataResponse metadata_event = 2; + FilerError error_code = 3; // machine-readable error code (e.g. PRECONDITION_FAILED) } message AtomicRenameEntryRequest { diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go index cc26203a2..547e9e6fe 100644 --- a/weed/pb/filer_pb/filer.pb.go +++ b/weed/pb/filer_pb/filer.pb.go @@ -1763,8 +1763,12 @@ type DeleteEntryRequest struct { IsFromOtherCluster bool `protobuf:"varint,7,opt,name=is_from_other_cluster,json=isFromOtherCluster,proto3" json:"is_from_other_cluster,omitempty"` Signatures []int32 `protobuf:"varint,8,rep,packed,name=signatures,proto3" json:"signatures,omitempty"` IfNotModifiedAfter int64 `protobuf:"varint,9,opt,name=if_not_modified_after,json=ifNotModifiedAfter,proto3" json:"if_not_modified_after,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // Optional precondition evaluated against the current entry atomically with + // the delete, under the filer's per-path lock. Only honored for a + // non-recursive single-entry delete routed to the entry's owner filer. + Condition *WriteCondition `protobuf:"bytes,10,opt,name=condition,proto3" json:"condition,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *DeleteEntryRequest) Reset() { @@ -1853,10 +1857,18 @@ func (x *DeleteEntryRequest) GetIfNotModifiedAfter() int64 { return 0 } +func (x *DeleteEntryRequest) GetCondition() *WriteCondition { + if x != nil { + return x.Condition + } + return nil +} + type DeleteEntryResponse struct { state protoimpl.MessageState `protogen:"open.v1"` Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` MetadataEvent *SubscribeMetadataResponse `protobuf:"bytes,2,opt,name=metadata_event,json=metadataEvent,proto3" json:"metadata_event,omitempty"` + ErrorCode FilerError `protobuf:"varint,3,opt,name=error_code,json=errorCode,proto3,enum=filer_pb.FilerError" json:"error_code,omitempty"` // machine-readable error code (e.g. PRECONDITION_FAILED) unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -1905,6 +1917,13 @@ func (x *DeleteEntryResponse) GetMetadataEvent() *SubscribeMetadataResponse { return nil } +func (x *DeleteEntryResponse) GetErrorCode() FilerError { + if x != nil { + return x.ErrorCode + } + return FilerError_OK +} + type AtomicRenameEntryRequest struct { state protoimpl.MessageState `protogen:"open.v1"` OldDirectory string `protobuf:"bytes,1,opt,name=old_directory,json=oldDirectory,proto3" json:"old_directory,omitempty"` @@ -5779,7 +5798,7 @@ const file_filer_proto_rawDesc = "" + "\n" + "entry_name\x18\x02 \x01(\tR\tentryName\x12+\n" + "\x06chunks\x18\x03 \x03(\v2\x13.filer_pb.FileChunkR\x06chunks\"\x17\n" + - "\x15AppendToEntryResponse\"\xcb\x02\n" + + "\x15AppendToEntryResponse\"\x83\x03\n" + "\x12DeleteEntryRequest\x12\x1c\n" + "\tdirectory\x18\x01 \x01(\tR\tdirectory\x12\x12\n" + "\x04name\x18\x02 \x01(\tR\x04name\x12$\n" + @@ -5790,10 +5809,14 @@ const file_filer_proto_rawDesc = "" + "\n" + "signatures\x18\b \x03(\x05R\n" + "signatures\x121\n" + - "\x15if_not_modified_after\x18\t \x01(\x03R\x12ifNotModifiedAfter\"w\n" + + "\x15if_not_modified_after\x18\t \x01(\x03R\x12ifNotModifiedAfter\x126\n" + + "\tcondition\x18\n" + + " \x01(\v2\x18.filer_pb.WriteConditionR\tcondition\"\xac\x01\n" + "\x13DeleteEntryResponse\x12\x14\n" + "\x05error\x18\x01 \x01(\tR\x05error\x12J\n" + - "\x0emetadata_event\x18\x02 \x01(\v2#.filer_pb.SubscribeMetadataResponseR\rmetadataEvent\"\xba\x01\n" + + "\x0emetadata_event\x18\x02 \x01(\v2#.filer_pb.SubscribeMetadataResponseR\rmetadataEvent\x123\n" + + "\n" + + "error_code\x18\x03 \x01(\x0e2\x14.filer_pb.FilerErrorR\terrorCode\"\xba\x01\n" + "\x18AtomicRenameEntryRequest\x12#\n" + "\rold_directory\x18\x01 \x01(\tR\foldDirectory\x12\x19\n" + "\bold_name\x18\x02 \x01(\tR\aoldName\x12#\n" + @@ -6278,97 +6301,99 @@ var file_filer_proto_depIdxs = []int32{ 83, // 19: filer_pb.UpdateEntryRequest.expected_extended:type_name -> filer_pb.UpdateEntryRequest.ExpectedExtendedEntry 48, // 20: filer_pb.UpdateEntryResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse 11, // 21: filer_pb.AppendToEntryRequest.chunks:type_name -> filer_pb.FileChunk - 48, // 22: filer_pb.DeleteEntryResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse - 10, // 23: filer_pb.StreamRenameEntryResponse.event_notification:type_name -> filer_pb.EventNotification - 34, // 24: filer_pb.AssignVolumeResponse.location:type_name -> filer_pb.Location - 34, // 25: filer_pb.Locations.locations:type_name -> filer_pb.Location - 84, // 26: filer_pb.LookupVolumeResponse.locations_map:type_name -> filer_pb.LookupVolumeResponse.LocationsMapEntry - 36, // 27: filer_pb.CollectionListResponse.collections:type_name -> filer_pb.Collection - 10, // 28: filer_pb.SubscribeMetadataResponse.event_notification:type_name -> filer_pb.EventNotification - 48, // 29: filer_pb.SubscribeMetadataResponse.events:type_name -> filer_pb.SubscribeMetadataResponse - 49, // 30: filer_pb.SubscribeMetadataResponse.log_file_refs:type_name -> filer_pb.LogFileChunkRef - 11, // 31: filer_pb.LogFileChunkRef.chunks:type_name -> filer_pb.FileChunk - 8, // 32: filer_pb.TraverseBfsMetadataResponse.entry:type_name -> filer_pb.Entry - 85, // 33: filer_pb.LocateBrokerResponse.resources:type_name -> filer_pb.LocateBrokerResponse.Resource - 86, // 34: filer_pb.FilerConf.locations:type_name -> filer_pb.FilerConf.PathConf - 8, // 35: filer_pb.CacheRemoteObjectToLocalClusterResponse.entry:type_name -> filer_pb.Entry - 48, // 36: filer_pb.CacheRemoteObjectToLocalClusterResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse - 70, // 37: filer_pb.TransferLocksRequest.locks:type_name -> filer_pb.Lock - 15, // 38: filer_pb.StreamMutateEntryRequest.create_request:type_name -> filer_pb.CreateEntryRequest - 18, // 39: filer_pb.StreamMutateEntryRequest.update_request:type_name -> filer_pb.UpdateEntryRequest - 24, // 40: filer_pb.StreamMutateEntryRequest.delete_request:type_name -> filer_pb.DeleteEntryRequest - 28, // 41: filer_pb.StreamMutateEntryRequest.rename_request:type_name -> filer_pb.StreamRenameEntryRequest - 17, // 42: filer_pb.StreamMutateEntryResponse.create_response:type_name -> filer_pb.CreateEntryResponse - 19, // 43: filer_pb.StreamMutateEntryResponse.update_response:type_name -> filer_pb.UpdateEntryResponse - 25, // 44: filer_pb.StreamMutateEntryResponse.delete_response:type_name -> filer_pb.DeleteEntryResponse - 29, // 45: filer_pb.StreamMutateEntryResponse.rename_response:type_name -> filer_pb.StreamRenameEntryResponse - 81, // 46: filer_pb.MountListResponse.mounts:type_name -> filer_pb.MountInfo - 33, // 47: filer_pb.LookupVolumeResponse.LocationsMapEntry.value:type_name -> filer_pb.Locations - 3, // 48: filer_pb.SeaweedFiler.LookupDirectoryEntry:input_type -> filer_pb.LookupDirectoryEntryRequest - 5, // 49: filer_pb.SeaweedFiler.ListEntries:input_type -> filer_pb.ListEntriesRequest - 15, // 50: filer_pb.SeaweedFiler.CreateEntry:input_type -> filer_pb.CreateEntryRequest - 18, // 51: filer_pb.SeaweedFiler.UpdateEntry:input_type -> filer_pb.UpdateEntryRequest - 20, // 52: filer_pb.SeaweedFiler.TouchAccessTime:input_type -> filer_pb.TouchAccessTimeRequest - 22, // 53: filer_pb.SeaweedFiler.AppendToEntry:input_type -> filer_pb.AppendToEntryRequest - 24, // 54: filer_pb.SeaweedFiler.DeleteEntry:input_type -> filer_pb.DeleteEntryRequest - 26, // 55: filer_pb.SeaweedFiler.AtomicRenameEntry:input_type -> filer_pb.AtomicRenameEntryRequest - 28, // 56: filer_pb.SeaweedFiler.StreamRenameEntry:input_type -> filer_pb.StreamRenameEntryRequest - 75, // 57: filer_pb.SeaweedFiler.StreamMutateEntry:input_type -> filer_pb.StreamMutateEntryRequest - 30, // 58: filer_pb.SeaweedFiler.AssignVolume:input_type -> filer_pb.AssignVolumeRequest - 32, // 59: filer_pb.SeaweedFiler.LookupVolume:input_type -> filer_pb.LookupVolumeRequest - 37, // 60: filer_pb.SeaweedFiler.CollectionList:input_type -> filer_pb.CollectionListRequest - 39, // 61: filer_pb.SeaweedFiler.DeleteCollection:input_type -> filer_pb.DeleteCollectionRequest - 41, // 62: filer_pb.SeaweedFiler.Statistics:input_type -> filer_pb.StatisticsRequest - 43, // 63: filer_pb.SeaweedFiler.Ping:input_type -> filer_pb.PingRequest - 45, // 64: filer_pb.SeaweedFiler.GetFilerConfiguration:input_type -> filer_pb.GetFilerConfigurationRequest - 50, // 65: filer_pb.SeaweedFiler.TraverseBfsMetadata:input_type -> filer_pb.TraverseBfsMetadataRequest - 47, // 66: filer_pb.SeaweedFiler.SubscribeMetadata:input_type -> filer_pb.SubscribeMetadataRequest - 47, // 67: filer_pb.SeaweedFiler.SubscribeLocalMetadata:input_type -> filer_pb.SubscribeMetadataRequest - 57, // 68: filer_pb.SeaweedFiler.KvGet:input_type -> filer_pb.KvGetRequest - 59, // 69: filer_pb.SeaweedFiler.KvPut:input_type -> filer_pb.KvPutRequest - 62, // 70: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:input_type -> filer_pb.CacheRemoteObjectToLocalClusterRequest - 64, // 71: filer_pb.SeaweedFiler.DistributedLock:input_type -> filer_pb.LockRequest - 66, // 72: filer_pb.SeaweedFiler.DistributedUnlock:input_type -> filer_pb.UnlockRequest - 68, // 73: filer_pb.SeaweedFiler.FindLockOwner:input_type -> filer_pb.FindLockOwnerRequest - 71, // 74: filer_pb.SeaweedFiler.TransferLocks:input_type -> filer_pb.TransferLocksRequest - 73, // 75: filer_pb.SeaweedFiler.ReplicateLock:input_type -> filer_pb.ReplicateLockRequest - 77, // 76: filer_pb.SeaweedFiler.MountRegister:input_type -> filer_pb.MountRegisterRequest - 79, // 77: filer_pb.SeaweedFiler.MountList:input_type -> filer_pb.MountListRequest - 4, // 78: filer_pb.SeaweedFiler.LookupDirectoryEntry:output_type -> filer_pb.LookupDirectoryEntryResponse - 6, // 79: filer_pb.SeaweedFiler.ListEntries:output_type -> filer_pb.ListEntriesResponse - 17, // 80: filer_pb.SeaweedFiler.CreateEntry:output_type -> filer_pb.CreateEntryResponse - 19, // 81: filer_pb.SeaweedFiler.UpdateEntry:output_type -> filer_pb.UpdateEntryResponse - 21, // 82: filer_pb.SeaweedFiler.TouchAccessTime:output_type -> filer_pb.TouchAccessTimeResponse - 23, // 83: filer_pb.SeaweedFiler.AppendToEntry:output_type -> filer_pb.AppendToEntryResponse - 25, // 84: filer_pb.SeaweedFiler.DeleteEntry:output_type -> filer_pb.DeleteEntryResponse - 27, // 85: filer_pb.SeaweedFiler.AtomicRenameEntry:output_type -> filer_pb.AtomicRenameEntryResponse - 29, // 86: filer_pb.SeaweedFiler.StreamRenameEntry:output_type -> filer_pb.StreamRenameEntryResponse - 76, // 87: filer_pb.SeaweedFiler.StreamMutateEntry:output_type -> filer_pb.StreamMutateEntryResponse - 31, // 88: filer_pb.SeaweedFiler.AssignVolume:output_type -> filer_pb.AssignVolumeResponse - 35, // 89: filer_pb.SeaweedFiler.LookupVolume:output_type -> filer_pb.LookupVolumeResponse - 38, // 90: filer_pb.SeaweedFiler.CollectionList:output_type -> filer_pb.CollectionListResponse - 40, // 91: filer_pb.SeaweedFiler.DeleteCollection:output_type -> filer_pb.DeleteCollectionResponse - 42, // 92: filer_pb.SeaweedFiler.Statistics:output_type -> filer_pb.StatisticsResponse - 44, // 93: filer_pb.SeaweedFiler.Ping:output_type -> filer_pb.PingResponse - 46, // 94: filer_pb.SeaweedFiler.GetFilerConfiguration:output_type -> filer_pb.GetFilerConfigurationResponse - 51, // 95: filer_pb.SeaweedFiler.TraverseBfsMetadata:output_type -> filer_pb.TraverseBfsMetadataResponse - 48, // 96: filer_pb.SeaweedFiler.SubscribeMetadata:output_type -> filer_pb.SubscribeMetadataResponse - 48, // 97: filer_pb.SeaweedFiler.SubscribeLocalMetadata:output_type -> filer_pb.SubscribeMetadataResponse - 58, // 98: filer_pb.SeaweedFiler.KvGet:output_type -> filer_pb.KvGetResponse - 60, // 99: filer_pb.SeaweedFiler.KvPut:output_type -> filer_pb.KvPutResponse - 63, // 100: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:output_type -> filer_pb.CacheRemoteObjectToLocalClusterResponse - 65, // 101: filer_pb.SeaweedFiler.DistributedLock:output_type -> filer_pb.LockResponse - 67, // 102: filer_pb.SeaweedFiler.DistributedUnlock:output_type -> filer_pb.UnlockResponse - 69, // 103: filer_pb.SeaweedFiler.FindLockOwner:output_type -> filer_pb.FindLockOwnerResponse - 72, // 104: filer_pb.SeaweedFiler.TransferLocks:output_type -> filer_pb.TransferLocksResponse - 74, // 105: filer_pb.SeaweedFiler.ReplicateLock:output_type -> filer_pb.ReplicateLockResponse - 78, // 106: filer_pb.SeaweedFiler.MountRegister:output_type -> filer_pb.MountRegisterResponse - 80, // 107: filer_pb.SeaweedFiler.MountList:output_type -> filer_pb.MountListResponse - 78, // [78:108] is the sub-list for method output_type - 48, // [48:78] is the sub-list for method input_type - 48, // [48:48] is the sub-list for extension type_name - 48, // [48:48] is the sub-list for extension extendee - 0, // [0:48] is the sub-list for field type_name + 16, // 22: filer_pb.DeleteEntryRequest.condition:type_name -> filer_pb.WriteCondition + 48, // 23: filer_pb.DeleteEntryResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse + 1, // 24: filer_pb.DeleteEntryResponse.error_code:type_name -> filer_pb.FilerError + 10, // 25: filer_pb.StreamRenameEntryResponse.event_notification:type_name -> filer_pb.EventNotification + 34, // 26: filer_pb.AssignVolumeResponse.location:type_name -> filer_pb.Location + 34, // 27: filer_pb.Locations.locations:type_name -> filer_pb.Location + 84, // 28: filer_pb.LookupVolumeResponse.locations_map:type_name -> filer_pb.LookupVolumeResponse.LocationsMapEntry + 36, // 29: filer_pb.CollectionListResponse.collections:type_name -> filer_pb.Collection + 10, // 30: filer_pb.SubscribeMetadataResponse.event_notification:type_name -> filer_pb.EventNotification + 48, // 31: filer_pb.SubscribeMetadataResponse.events:type_name -> filer_pb.SubscribeMetadataResponse + 49, // 32: filer_pb.SubscribeMetadataResponse.log_file_refs:type_name -> filer_pb.LogFileChunkRef + 11, // 33: filer_pb.LogFileChunkRef.chunks:type_name -> filer_pb.FileChunk + 8, // 34: filer_pb.TraverseBfsMetadataResponse.entry:type_name -> filer_pb.Entry + 85, // 35: filer_pb.LocateBrokerResponse.resources:type_name -> filer_pb.LocateBrokerResponse.Resource + 86, // 36: filer_pb.FilerConf.locations:type_name -> filer_pb.FilerConf.PathConf + 8, // 37: filer_pb.CacheRemoteObjectToLocalClusterResponse.entry:type_name -> filer_pb.Entry + 48, // 38: filer_pb.CacheRemoteObjectToLocalClusterResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse + 70, // 39: filer_pb.TransferLocksRequest.locks:type_name -> filer_pb.Lock + 15, // 40: filer_pb.StreamMutateEntryRequest.create_request:type_name -> filer_pb.CreateEntryRequest + 18, // 41: filer_pb.StreamMutateEntryRequest.update_request:type_name -> filer_pb.UpdateEntryRequest + 24, // 42: filer_pb.StreamMutateEntryRequest.delete_request:type_name -> filer_pb.DeleteEntryRequest + 28, // 43: filer_pb.StreamMutateEntryRequest.rename_request:type_name -> filer_pb.StreamRenameEntryRequest + 17, // 44: filer_pb.StreamMutateEntryResponse.create_response:type_name -> filer_pb.CreateEntryResponse + 19, // 45: filer_pb.StreamMutateEntryResponse.update_response:type_name -> filer_pb.UpdateEntryResponse + 25, // 46: filer_pb.StreamMutateEntryResponse.delete_response:type_name -> filer_pb.DeleteEntryResponse + 29, // 47: filer_pb.StreamMutateEntryResponse.rename_response:type_name -> filer_pb.StreamRenameEntryResponse + 81, // 48: filer_pb.MountListResponse.mounts:type_name -> filer_pb.MountInfo + 33, // 49: filer_pb.LookupVolumeResponse.LocationsMapEntry.value:type_name -> filer_pb.Locations + 3, // 50: filer_pb.SeaweedFiler.LookupDirectoryEntry:input_type -> filer_pb.LookupDirectoryEntryRequest + 5, // 51: filer_pb.SeaweedFiler.ListEntries:input_type -> filer_pb.ListEntriesRequest + 15, // 52: filer_pb.SeaweedFiler.CreateEntry:input_type -> filer_pb.CreateEntryRequest + 18, // 53: filer_pb.SeaweedFiler.UpdateEntry:input_type -> filer_pb.UpdateEntryRequest + 20, // 54: filer_pb.SeaweedFiler.TouchAccessTime:input_type -> filer_pb.TouchAccessTimeRequest + 22, // 55: filer_pb.SeaweedFiler.AppendToEntry:input_type -> filer_pb.AppendToEntryRequest + 24, // 56: filer_pb.SeaweedFiler.DeleteEntry:input_type -> filer_pb.DeleteEntryRequest + 26, // 57: filer_pb.SeaweedFiler.AtomicRenameEntry:input_type -> filer_pb.AtomicRenameEntryRequest + 28, // 58: filer_pb.SeaweedFiler.StreamRenameEntry:input_type -> filer_pb.StreamRenameEntryRequest + 75, // 59: filer_pb.SeaweedFiler.StreamMutateEntry:input_type -> filer_pb.StreamMutateEntryRequest + 30, // 60: filer_pb.SeaweedFiler.AssignVolume:input_type -> filer_pb.AssignVolumeRequest + 32, // 61: filer_pb.SeaweedFiler.LookupVolume:input_type -> filer_pb.LookupVolumeRequest + 37, // 62: filer_pb.SeaweedFiler.CollectionList:input_type -> filer_pb.CollectionListRequest + 39, // 63: filer_pb.SeaweedFiler.DeleteCollection:input_type -> filer_pb.DeleteCollectionRequest + 41, // 64: filer_pb.SeaweedFiler.Statistics:input_type -> filer_pb.StatisticsRequest + 43, // 65: filer_pb.SeaweedFiler.Ping:input_type -> filer_pb.PingRequest + 45, // 66: filer_pb.SeaweedFiler.GetFilerConfiguration:input_type -> filer_pb.GetFilerConfigurationRequest + 50, // 67: filer_pb.SeaweedFiler.TraverseBfsMetadata:input_type -> filer_pb.TraverseBfsMetadataRequest + 47, // 68: filer_pb.SeaweedFiler.SubscribeMetadata:input_type -> filer_pb.SubscribeMetadataRequest + 47, // 69: filer_pb.SeaweedFiler.SubscribeLocalMetadata:input_type -> filer_pb.SubscribeMetadataRequest + 57, // 70: filer_pb.SeaweedFiler.KvGet:input_type -> filer_pb.KvGetRequest + 59, // 71: filer_pb.SeaweedFiler.KvPut:input_type -> filer_pb.KvPutRequest + 62, // 72: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:input_type -> filer_pb.CacheRemoteObjectToLocalClusterRequest + 64, // 73: filer_pb.SeaweedFiler.DistributedLock:input_type -> filer_pb.LockRequest + 66, // 74: filer_pb.SeaweedFiler.DistributedUnlock:input_type -> filer_pb.UnlockRequest + 68, // 75: filer_pb.SeaweedFiler.FindLockOwner:input_type -> filer_pb.FindLockOwnerRequest + 71, // 76: filer_pb.SeaweedFiler.TransferLocks:input_type -> filer_pb.TransferLocksRequest + 73, // 77: filer_pb.SeaweedFiler.ReplicateLock:input_type -> filer_pb.ReplicateLockRequest + 77, // 78: filer_pb.SeaweedFiler.MountRegister:input_type -> filer_pb.MountRegisterRequest + 79, // 79: filer_pb.SeaweedFiler.MountList:input_type -> filer_pb.MountListRequest + 4, // 80: filer_pb.SeaweedFiler.LookupDirectoryEntry:output_type -> filer_pb.LookupDirectoryEntryResponse + 6, // 81: filer_pb.SeaweedFiler.ListEntries:output_type -> filer_pb.ListEntriesResponse + 17, // 82: filer_pb.SeaweedFiler.CreateEntry:output_type -> filer_pb.CreateEntryResponse + 19, // 83: filer_pb.SeaweedFiler.UpdateEntry:output_type -> filer_pb.UpdateEntryResponse + 21, // 84: filer_pb.SeaweedFiler.TouchAccessTime:output_type -> filer_pb.TouchAccessTimeResponse + 23, // 85: filer_pb.SeaweedFiler.AppendToEntry:output_type -> filer_pb.AppendToEntryResponse + 25, // 86: filer_pb.SeaweedFiler.DeleteEntry:output_type -> filer_pb.DeleteEntryResponse + 27, // 87: filer_pb.SeaweedFiler.AtomicRenameEntry:output_type -> filer_pb.AtomicRenameEntryResponse + 29, // 88: filer_pb.SeaweedFiler.StreamRenameEntry:output_type -> filer_pb.StreamRenameEntryResponse + 76, // 89: filer_pb.SeaweedFiler.StreamMutateEntry:output_type -> filer_pb.StreamMutateEntryResponse + 31, // 90: filer_pb.SeaweedFiler.AssignVolume:output_type -> filer_pb.AssignVolumeResponse + 35, // 91: filer_pb.SeaweedFiler.LookupVolume:output_type -> filer_pb.LookupVolumeResponse + 38, // 92: filer_pb.SeaweedFiler.CollectionList:output_type -> filer_pb.CollectionListResponse + 40, // 93: filer_pb.SeaweedFiler.DeleteCollection:output_type -> filer_pb.DeleteCollectionResponse + 42, // 94: filer_pb.SeaweedFiler.Statistics:output_type -> filer_pb.StatisticsResponse + 44, // 95: filer_pb.SeaweedFiler.Ping:output_type -> filer_pb.PingResponse + 46, // 96: filer_pb.SeaweedFiler.GetFilerConfiguration:output_type -> filer_pb.GetFilerConfigurationResponse + 51, // 97: filer_pb.SeaweedFiler.TraverseBfsMetadata:output_type -> filer_pb.TraverseBfsMetadataResponse + 48, // 98: filer_pb.SeaweedFiler.SubscribeMetadata:output_type -> filer_pb.SubscribeMetadataResponse + 48, // 99: filer_pb.SeaweedFiler.SubscribeLocalMetadata:output_type -> filer_pb.SubscribeMetadataResponse + 58, // 100: filer_pb.SeaweedFiler.KvGet:output_type -> filer_pb.KvGetResponse + 60, // 101: filer_pb.SeaweedFiler.KvPut:output_type -> filer_pb.KvPutResponse + 63, // 102: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:output_type -> filer_pb.CacheRemoteObjectToLocalClusterResponse + 65, // 103: filer_pb.SeaweedFiler.DistributedLock:output_type -> filer_pb.LockResponse + 67, // 104: filer_pb.SeaweedFiler.DistributedUnlock:output_type -> filer_pb.UnlockResponse + 69, // 105: filer_pb.SeaweedFiler.FindLockOwner:output_type -> filer_pb.FindLockOwnerResponse + 72, // 106: filer_pb.SeaweedFiler.TransferLocks:output_type -> filer_pb.TransferLocksResponse + 74, // 107: filer_pb.SeaweedFiler.ReplicateLock:output_type -> filer_pb.ReplicateLockResponse + 78, // 108: filer_pb.SeaweedFiler.MountRegister:output_type -> filer_pb.MountRegisterResponse + 80, // 109: filer_pb.SeaweedFiler.MountList:output_type -> filer_pb.MountListResponse + 80, // [80:110] is the sub-list for method output_type + 50, // [50:80] is the sub-list for method input_type + 50, // [50:50] is the sub-list for extension type_name + 50, // [50:50] is the sub-list for extension extendee + 0, // [0:50] is the sub-list for field type_name } func init() { file_filer_proto_init() } diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go index 304a27825..636548198 100644 --- a/weed/s3api/s3api_object_handlers_delete.go +++ b/weed/s3api/s3api_object_handlers_delete.go @@ -214,33 +214,60 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque } var deleteResult deleteMutationResult - deleteCode := s3a.withObjectWriteLock(bucket, object, func() s3err.ErrorCode { - return s3a.checkDeleteIfMatch(bucket, object, versionId, versioningState, r.Header.Get(s3_constants.IfMatch), s3err.ErrPreconditionFailed) - }, func() s3err.ErrorCode { - if versioningConfigured { - result, errCode := s3a.deleteVersionedObject(r, bucket, object, versionId, versioningState) - if errCode != s3err.ErrNone { - return errCode + var deleteCode s3err.ErrorCode + + // Fast path: route an unversioned object delete to its owner filer, which + // serializes it and evaluates the If-Match precondition under the per-path + // lock, avoiding the distributed lock. Falls back below for versioned / + // object-lock buckets, unsupported conditions, or an unreachable owner. + deleteHandled := false + if !versioningConfigured { + if cond, condOk := buildDeleteCondition(r); condOk { + if owner, ownerOk := s3a.routedObjectOwner(bucket, object); ownerOk { + resp, err := s3a.deleteEntryOnFiler(owner, bucket, object, cond) + switch { + case err != nil: + glog.Warningf("DeleteObjectHandler: routed delete to %s failed for %s/%s, falling back to lock: %v", owner, bucket, object, err) + case resp.ErrorCode == filer_pb.FilerError_PRECONDITION_FAILED: + deleteCode, deleteHandled = s3err.ErrPreconditionFailed, true + case resp.Error != "": + glog.Errorf("DeleteObjectHandler: routed delete failed for %s/%s: %s", bucket, object, resp.Error) + deleteCode, deleteHandled = s3err.ErrInternalError, true + default: + deleteCode, deleteHandled = s3err.ErrNone, true + } } - deleteResult = result + } + } + if !deleteHandled { + deleteCode = s3a.withObjectWriteLock(bucket, object, func() s3err.ErrorCode { + return s3a.checkDeleteIfMatch(bucket, object, versionId, versioningState, r.Header.Get(s3_constants.IfMatch), s3err.ErrPreconditionFailed) + }, func() s3err.ErrorCode { + if versioningConfigured { + result, errCode := s3a.deleteVersionedObject(r, bucket, object, versionId, versioningState) + if errCode != s3err.ErrNone { + return errCode + } + deleteResult = result + return s3err.ErrNone + } + + governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object) + if err := s3a.enforceObjectLockProtections(r, bucket, object, "", governanceBypassAllowed); err != nil { + glog.V(2).Infof("DeleteObjectHandler: object lock check failed for %s/%s: %v", bucket, object, err) + return s3err.ErrAccessDenied + } + + if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return s3a.deleteUnversionedObjectWithClient(client, bucket, object, false) + }); err != nil { + glog.Errorf("DeleteObjectHandler: failed to delete %s/%s: %v", bucket, object, err) + return s3err.ErrInternalError + } + return s3err.ErrNone - } - - governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object) - if err := s3a.enforceObjectLockProtections(r, bucket, object, "", governanceBypassAllowed); err != nil { - glog.V(2).Infof("DeleteObjectHandler: object lock check failed for %s/%s: %v", bucket, object, err) - return s3err.ErrAccessDenied - } - - if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - return s3a.deleteUnversionedObjectWithClient(client, bucket, object, false) - }); err != nil { - glog.Errorf("DeleteObjectHandler: failed to delete %s/%s: %v", bucket, object, err) - return s3err.ErrInternalError - } - - return s3err.ErrNone - }) + }) + } if deleteCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, deleteCode) return diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 01ef148d7..578d0ba0f 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -802,7 +802,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader } return s3a.checkConditionalHeaders(r, bucket, object) } - createCode := s3a.withObjectWriteLock(bucket, object, preconditionFn, func() s3err.ErrorCode { + createUnderLock := func() s3err.ErrorCode { createErr = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { req := &filer_pb.CreateEntryRequest{ Directory: path.Dir(filePath), @@ -831,7 +831,37 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader } } return s3err.ErrNone - }) + } + + // Fast path: route the create to the object key's owner filer, which + // serializes it with its local per-path lock and evaluates the precondition, + // avoiding the distributed lock. Restricted to the safe subset by + // routedObjectWrite; falls back to the lock for everything else and if the + // owner is unreachable. + var createCode s3err.ErrorCode + routed := false + if owner, cond, ok := s3a.routedObjectWrite(r, bucket, object, afterCreate != nil); ok { + req := &filer_pb.CreateEntryRequest{ + Directory: path.Dir(filePath), + Entry: entry, + Condition: cond, + } + resp, err := s3a.createEntryOnFiler(owner, req) + switch { + case err != nil: + glog.Warningf("putToFiler: routed create to %s failed for %s, falling back to lock: %v", owner, filePath, err) + case resp.ErrorCode == filer_pb.FilerError_PRECONDITION_FAILED: + createCode, routed = s3err.ErrPreconditionFailed, true + case resp.Error != "": + createErr = fmt.Errorf("%s", resp.Error) + createCode, routed = filerErrorToS3Error(createErr), true + default: + entryCreated, createCode, routed = true, s3err.ErrNone, true + } + } + if !routed { + createCode = s3a.withObjectWriteLock(bucket, object, preconditionFn, createUnderLock) + } if createCode != s3err.ErrNone { if createErr != nil { glog.Errorf("putToFiler: failed to create entry for %s: %v", filePath, createErr) diff --git a/weed/s3api/s3api_object_routed_write.go b/weed/s3api/s3api_object_routed_write.go new file mode 100644 index 000000000..4d3144410 --- /dev/null +++ b/weed/s3api/s3api_object_routed_write.go @@ -0,0 +1,168 @@ +package s3api + +import ( + "context" + "fmt" + "net/http" + "strings" + + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// routedObjectOwner returns the filer that owns this object's metadata for +// route-by-key, or ok=false when the object's writes must keep the distributed +// lock. Versioned and object-lock buckets stay on the lock path: their +// mutations span multiple entries / extra metadata checks a single conditional +// create or delete does not cover. On any lookup error it falls back to be safe. +func (s3a *S3ApiServer) routedObjectOwner(bucket, object string) (pb.ServerAddress, bool) { + if object == "" || s3a.objectWriteLockClient == nil { + return "", false + } + if configured, err := s3a.isVersioningConfigured(bucket); err != nil || configured { + return "", false + } + if locked, err := s3a.isObjectLockEnabled(bucket); err != nil || locked { + return "", false + } + lockKey := fmt.Sprintf("s3.object.write:%s", s3a.toFilerPath(bucket, object)) + owner := s3a.objectWriteLockClient.PrimaryForKey(lockKey) + if owner == "" { + return "", false + } + return owner, true +} + +// routedObjectWrite decides whether an object PUT can take the route-by-key +// fast path: the metadata write goes straight to the key's owner filer, which +// serializes it with its local per-path lock and evaluates the precondition, +// instead of acquiring a distributed lock. It returns the owner filer and the +// reduced precondition, or ok=false to fall back to withObjectWriteLock. +// +// The fast path additionally requires no post-create hook and a precondition +// that reduces to one primitive. +func (s3a *S3ApiServer) routedObjectWrite(r *http.Request, bucket, object string, hasAfterCreate bool) (owner pb.ServerAddress, cond *filer_pb.WriteCondition, ok bool) { + if hasAfterCreate { + return "", nil, false + } + cond, condOk := buildWriteCondition(r) + if !condOk { + return "", nil, false + } + owner, ok = s3a.routedObjectOwner(bucket, object) + if !ok { + return "", nil, false + } + return owner, cond, true +} + +// buildWriteCondition reduces the request's conditional headers to a single +// WriteCondition primitive the filer can evaluate. It only handles the +// unambiguous single-condition cases; anything else (header combinations, +// time-based conditions, ETag lists, weak ETags) returns ok=false so the caller +// keeps the existing gateway-side evaluation under the distributed lock. +func buildWriteCondition(r *http.Request) (*filer_pb.WriteCondition, bool) { + headers, errCode := parseConditionalHeaders(r) + if errCode != s3err.ErrNone { + return nil, false + } + if !headers.isSet { + return &filer_pb.WriteCondition{Kind: filer_pb.WriteCondition_NONE}, true + } + // Time-based conditions are rare on writes; let the lock path handle them. + if !headers.ifModifiedSince.IsZero() || !headers.ifUnmodifiedSince.IsZero() { + return nil, false + } + hasMatch := headers.ifMatch != "" + hasNoneMatch := headers.ifNoneMatch != "" + switch { + case hasMatch && !hasNoneMatch: + if headers.ifMatch == "*" { + return &filer_pb.WriteCondition{Kind: filer_pb.WriteCondition_IF_EXISTS}, true + } + if etag, single := singleStrongETag(headers.ifMatch); single { + return &filer_pb.WriteCondition{Kind: filer_pb.WriteCondition_IF_ETAG_MATCH, Etag: etag}, true + } + return nil, false + case hasNoneMatch && !hasMatch: + if headers.ifNoneMatch == "*" { + return &filer_pb.WriteCondition{Kind: filer_pb.WriteCondition_IF_NOT_EXISTS}, true + } + if etag, single := singleStrongETag(headers.ifNoneMatch); single { + return &filer_pb.WriteCondition{Kind: filer_pb.WriteCondition_IF_ETAG_NOT_MATCH, Etag: etag}, true + } + return nil, false + default: + // Both If-Match and If-None-Match present — leave to the lock path. + return nil, false + } +} + +// singleStrongETag returns the normalized ETag when the header carries exactly +// one strong ETag, and false for ETag lists or weak ("W/") ETags, which the +// fast path does not replicate. +func singleStrongETag(v string) (string, bool) { + v = strings.TrimSpace(v) + if strings.Contains(v, ",") { + return "", false + } + if strings.HasPrefix(v, "W/") || strings.HasPrefix(v, "w/") { + return "", false + } + return strings.Trim(v, `"`), true +} + +// createEntryOnFiler sends a CreateEntry directly to the given owner filer so +// its local per-path lock serializes the write. The raw response is returned so +// the caller can distinguish PRECONDITION_FAILED from other outcomes. +func (s3a *S3ApiServer) createEntryOnFiler(owner pb.ServerAddress, req *filer_pb.CreateEntryRequest) (*filer_pb.CreateEntryResponse, error) { + var resp *filer_pb.CreateEntryResponse + err := pb.WithFilerClient(false, 0, owner, s3a.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + var e error + resp, e = client.CreateEntry(context.Background(), req) + return e + }) + return resp, err +} + +// buildDeleteCondition reduces a DeleteObject's If-Match header to a primitive. +// DeleteObject only honors If-Match (matching checkDeleteIfMatch), so other +// conditional headers are ignored here as they are on the existing path. +func buildDeleteCondition(r *http.Request) (*filer_pb.WriteCondition, bool) { + ifMatch := strings.TrimSpace(r.Header.Get(s3_constants.IfMatch)) + switch { + case ifMatch == "": + return &filer_pb.WriteCondition{Kind: filer_pb.WriteCondition_NONE}, true + case ifMatch == "*": + return &filer_pb.WriteCondition{Kind: filer_pb.WriteCondition_IF_EXISTS}, true + default: + if etag, single := singleStrongETag(ifMatch); single { + return &filer_pb.WriteCondition{Kind: filer_pb.WriteCondition_IF_ETAG_MATCH, Etag: etag}, true + } + return nil, false + } +} + +// deleteEntryOnFiler sends a non-recursive DeleteEntry for the object directly +// to its owner filer, which serializes it and evaluates the precondition under +// the per-path lock. Flags mirror the unversioned delete path (doDeleteEntry). +func (s3a *S3ApiServer) deleteEntryOnFiler(owner pb.ServerAddress, bucket, object string, cond *filer_pb.WriteCondition) (*filer_pb.DeleteEntryResponse, error) { + dir, name := util.NewFullPath(s3a.bucketDir(bucket), object).DirAndName() + var resp *filer_pb.DeleteEntryResponse + err := pb.WithFilerClient(false, 0, owner, s3a.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + var e error + resp, e = client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{ + Directory: dir, + Name: name, + IsDeleteData: true, + IsRecursive: false, + IgnoreRecursiveError: true, + Condition: cond, + }) + return e + }) + return resp, err +} diff --git a/weed/s3api/s3api_object_routed_write_test.go b/weed/s3api/s3api_object_routed_write_test.go new file mode 100644 index 000000000..888735297 --- /dev/null +++ b/weed/s3api/s3api_object_routed_write_test.go @@ -0,0 +1,83 @@ +package s3api + +import ( + "net/http" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" +) + +func reqWith(headers map[string]string) *http.Request { + r, _ := http.NewRequest(http.MethodPut, "/b/o", nil) + for k, v := range headers { + r.Header.Set(k, v) + } + return r +} + +func TestBuildWriteCondition(t *testing.T) { + cases := []struct { + name string + headers map[string]string + wantOk bool + want filer_pb.WriteCondition_Kind + wantTag string + }{ + {"none", nil, true, filer_pb.WriteCondition_NONE, ""}, + {"ifnonematch-star", map[string]string{s3_constants.IfNoneMatch: "*"}, true, filer_pb.WriteCondition_IF_NOT_EXISTS, ""}, + {"ifmatch-star", map[string]string{s3_constants.IfMatch: "*"}, true, filer_pb.WriteCondition_IF_EXISTS, ""}, + {"ifmatch-etag", map[string]string{s3_constants.IfMatch: `"abc"`}, true, filer_pb.WriteCondition_IF_ETAG_MATCH, "abc"}, + {"ifnonematch-etag", map[string]string{s3_constants.IfNoneMatch: `"abc"`}, true, filer_pb.WriteCondition_IF_ETAG_NOT_MATCH, "abc"}, + // Cases that must fall back to the lock path: + {"both", map[string]string{s3_constants.IfMatch: "*", s3_constants.IfNoneMatch: "*"}, false, 0, ""}, + {"etag-list", map[string]string{s3_constants.IfMatch: `"a","b"`}, false, 0, ""}, + {"weak-etag", map[string]string{s3_constants.IfMatch: `W/"abc"`}, false, 0, ""}, + {"time-based", map[string]string{s3_constants.IfUnmodifiedSince: "Wed, 21 Oct 2015 07:28:00 GMT"}, false, 0, ""}, + } + for _, tc := range cases { + cond, ok := buildWriteCondition(reqWith(tc.headers)) + if ok != tc.wantOk { + t.Errorf("%s: ok=%v want %v", tc.name, ok, tc.wantOk) + continue + } + if ok { + if cond.Kind != tc.want { + t.Errorf("%s: kind=%v want %v", tc.name, cond.Kind, tc.want) + } + if cond.Etag != tc.wantTag { + t.Errorf("%s: etag=%q want %q", tc.name, cond.Etag, tc.wantTag) + } + } + } +} + +func TestBuildDeleteCondition(t *testing.T) { + cases := []struct { + name string + ifMatch string + wantOk bool + want filer_pb.WriteCondition_Kind + wantTag string + }{ + {"none", "", true, filer_pb.WriteCondition_NONE, ""}, + {"star", "*", true, filer_pb.WriteCondition_IF_EXISTS, ""}, + {"etag", `"abc"`, true, filer_pb.WriteCondition_IF_ETAG_MATCH, "abc"}, + {"list", `"a","b"`, false, 0, ""}, + {"weak", `W/"abc"`, false, 0, ""}, + } + for _, tc := range cases { + h := map[string]string{} + if tc.ifMatch != "" { + h[s3_constants.IfMatch] = tc.ifMatch + } + cond, ok := buildDeleteCondition(reqWith(h)) + if ok != tc.wantOk { + t.Errorf("%s: ok=%v want %v", tc.name, ok, tc.wantOk) + continue + } + if ok && (cond.Kind != tc.want || cond.Etag != tc.wantTag) { + t.Errorf("%s: got kind=%v etag=%q want kind=%v etag=%q", tc.name, cond.Kind, cond.Etag, tc.want, tc.wantTag) + } + } +} diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 607cb4dc6..66f032774 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -96,6 +96,11 @@ type S3ApiServer struct { stsHandlers *STSHandlers // STS HTTP handlers for AssumeRoleWithWebIdentity cipher bool // encrypt data on volume servers newObjectWriteLock func(bucket, object string) objectWriteLock + // objectWriteLockClient holds the lock-ring view used to route an object's + // metadata writes to its owner filer (route-by-key), so that owner's local + // per-path lock serializes them without acquiring a distributed lock. Nil + // when no filers are configured. + objectWriteLockClient *cluster.LockClient // Shared ReaderCache used by the S3 GET streaming path. It lives for the // lifetime of the server so that concurrent and repeat reads share a // single in-flight download per chunk, and so that no per-request @@ -266,6 +271,7 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl if len(option.Filers) > 0 { objectWriteLockClient := cluster.NewLockClient(option.GrpcDialOption, option.Filers[0]) + s3ApiServer.objectWriteLockClient = objectWriteLockClient // Mirror the master's lock-ring view so each object lock dials the key's // primary filer directly instead of forwarding through the seed filer. // The masterClient already filters updates to this server's filer group. diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 2709baefd..ad457b2aa 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -441,9 +441,35 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr glog.V(4).InfofCtx(ctx, "DeleteEntry %v", req) - ctx, eventSink := filer.WithMetadataEventSink(ctx) - err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures, req.IfNotModifiedAfter) + fullpath := util.JoinPath(req.Directory, req.Name) resp = &filer_pb.DeleteEntryResponse{} + + // A single-entry delete serializes against concurrent mutations to the same + // path so a conditional delete's check and the removal are atomic. Recursive + // deletes span many paths and keep the prior unlocked behavior. + if !req.IsRecursive { + pathLock := fs.entryLockTable.AcquireLock("DeleteEntry", fullpath, util.ExclusiveLock) + defer fs.entryLockTable.ReleaseLock(fullpath, pathLock) + + if req.Condition != nil && req.Condition.Kind != filer_pb.WriteCondition_NONE { + current, findErr := fs.filer.FindEntry(ctx, fullpath) + if findErr != nil && findErr != filer_pb.ErrNotFound { + return &filer_pb.DeleteEntryResponse{}, fmt.Errorf("DeleteEntry condition check %s: %w", fullpath, findErr) + } + if findErr == filer_pb.ErrNotFound { + current = nil + } + if !writeConditionSatisfied(req.Condition, current) { + glog.V(3).InfofCtx(ctx, "DeleteEntry %s: precondition %v failed", fullpath, req.Condition.Kind) + resp.Error = "precondition failed" + resp.ErrorCode = filer_pb.FilerError_PRECONDITION_FAILED + return resp, nil + } + } + } + + ctx, eventSink := filer.WithMetadataEventSink(ctx) + err = fs.filer.DeleteEntryMetaAndData(ctx, fullpath, req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures, req.IfNotModifiedAfter) if err != nil && err != filer_pb.ErrNotFound { resp.Error = err.Error() } else {