mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
s3: route single-entry object writes to the owner filer, bypassing the distributed lock manager (DLM)
For non-versioned, non-object-lock buckets, PutObject and unversioned DeleteObject now send their metadata write straight to the object key's owner filer (resolved from the lock-ring view) with the precondition attached. The owner serializes the write with its local per-path lock and evaluates the condition atomically, so these paths no longer acquire a distributed lock. The fast path is opt-in per request and falls back to withObjectWriteLock for anything it does not fully cover: versioned or object-lock buckets, conditions that do not reduce to a single primitive (ETag lists, weak ETags, time-based, combined headers), an unresolved owner, or an unreachable owner filer. So behavior is unchanged outside the safe subset. Multi-step finalizations (copy, CompleteMultipartUpload) and versioned writes keep the distributed lock: they mutate several entries under one held lock, which a single conditional create or delete does not cover. DeleteEntry gains the same optional WriteCondition and per-path lock as CreateEntry so a routed conditional delete is atomic on the owner.
This commit is contained in:
@@ -304,11 +304,16 @@ message DeleteEntryRequest {
|
||||
bool is_from_other_cluster = 7;
|
||||
repeated int32 signatures = 8;
|
||||
int64 if_not_modified_after = 9;
|
||||
// Optional precondition evaluated against the current entry atomically with
|
||||
// the delete, under the filer's per-path lock. Only honored for a
|
||||
// non-recursive single-entry delete routed to the entry's owner filer.
|
||||
WriteCondition condition = 10;
|
||||
}
|
||||
|
||||
// DeleteEntryResponse reports the outcome of a DeleteEntry request: a free-form
// error string, a metadata event, and a machine-readable error code.
message DeleteEntryResponse {
|
||||
// Error text; presumably empty on success (confirm against the filer server).
string error = 1;
|
||||
// Metadata event attached to the response; presumably describes the delete
// that was applied (confirm).
SubscribeMetadataResponse metadata_event = 2;
|
||||
FilerError error_code = 3; // machine-readable error code (e.g. PRECONDITION_FAILED)
|
||||
}
|
||||
|
||||
message AtomicRenameEntryRequest {
|
||||
|
||||
@@ -70,6 +70,18 @@ func (lc *LockClient) hostForKey(key string) pb.ServerAddress {
|
||||
return lc.seedFiler
|
||||
}
|
||||
|
||||
// PrimaryForKey returns the filer that owns key per the current ring view, or
|
||||
// "" when no ring view has been received yet. Callers use it to route a key's
|
||||
// operations to a single owner so that owner's local lock serializes them.
|
||||
func (lc *LockClient) PrimaryForKey(key string) pb.ServerAddress {
|
||||
// Hold the ring read lock for the whole lookup; the deferred unlock
// releases it on every return path.
lc.ringMu.RLock()
|
||||
defer lc.ringMu.RUnlock()
|
||||
// A nil ring yields the empty address, which callers must treat as
// "no owner resolved".
if lc.ring == nil {
|
||||
return ""
|
||||
}
|
||||
// Delegate the key -> owner mapping to the ring itself.
return lc.ring.GetPrimary(key)
|
||||
}
|
||||
|
||||
type LiveLock struct {
|
||||
key string
|
||||
renewToken string
|
||||
|
||||
@@ -304,11 +304,16 @@ message DeleteEntryRequest {
|
||||
bool is_from_other_cluster = 7;
|
||||
repeated int32 signatures = 8;
|
||||
int64 if_not_modified_after = 9;
|
||||
// Optional precondition evaluated against the current entry atomically with
|
||||
// the delete, under the filer's per-path lock. Only honored for a
|
||||
// non-recursive single-entry delete routed to the entry's owner filer.
|
||||
WriteCondition condition = 10;
|
||||
}
|
||||
|
||||
// DeleteEntryResponse reports the outcome of a DeleteEntry request: a free-form
// error string, a metadata event, and a machine-readable error code.
message DeleteEntryResponse {
|
||||
// Error text; presumably empty on success (confirm against the filer server).
string error = 1;
|
||||
// Metadata event attached to the response; presumably describes the delete
// that was applied (confirm).
SubscribeMetadataResponse metadata_event = 2;
|
||||
FilerError error_code = 3; // machine-readable error code (e.g. PRECONDITION_FAILED)
|
||||
}
|
||||
|
||||
message AtomicRenameEntryRequest {
|
||||
|
||||
+121
-96
@@ -1763,8 +1763,12 @@ type DeleteEntryRequest struct {
|
||||
IsFromOtherCluster bool `protobuf:"varint,7,opt,name=is_from_other_cluster,json=isFromOtherCluster,proto3" json:"is_from_other_cluster,omitempty"`
|
||||
Signatures []int32 `protobuf:"varint,8,rep,packed,name=signatures,proto3" json:"signatures,omitempty"`
|
||||
IfNotModifiedAfter int64 `protobuf:"varint,9,opt,name=if_not_modified_after,json=ifNotModifiedAfter,proto3" json:"if_not_modified_after,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
// Optional precondition evaluated against the current entry atomically with
|
||||
// the delete, under the filer's per-path lock. Only honored for a
|
||||
// non-recursive single-entry delete routed to the entry's owner filer.
|
||||
Condition *WriteCondition `protobuf:"bytes,10,opt,name=condition,proto3" json:"condition,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
|
||||
func (x *DeleteEntryRequest) Reset() {
|
||||
@@ -1853,10 +1857,18 @@ func (x *DeleteEntryRequest) GetIfNotModifiedAfter() int64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// GetCondition returns the request's Condition field. It returns nil when the
// receiver itself is nil, so it is safe to call on a nil *DeleteEntryRequest.
func (x *DeleteEntryRequest) GetCondition() *WriteCondition {
|
||||
if x != nil {
|
||||
return x.Condition
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteEntryResponse is the Go struct for the DeleteEntryResponse protobuf
// message. Per the struct tags, Error is field 1, MetadataEvent is field 2 and
// ErrorCode is field 3 (enum filer_pb.FilerError).
type DeleteEntryResponse struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
|
||||
MetadataEvent *SubscribeMetadataResponse `protobuf:"bytes,2,opt,name=metadata_event,json=metadataEvent,proto3" json:"metadata_event,omitempty"`
|
||||
ErrorCode FilerError `protobuf:"varint,3,opt,name=error_code,json=errorCode,proto3,enum=filer_pb.FilerError" json:"error_code,omitempty"` // machine-readable error code (e.g. PRECONDITION_FAILED)
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
@@ -1905,6 +1917,13 @@ func (x *DeleteEntryResponse) GetMetadataEvent() *SubscribeMetadataResponse {
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetErrorCode returns the response's ErrorCode field. It returns
// FilerError_OK when the receiver itself is nil, so it is safe to call on a
// nil *DeleteEntryResponse.
func (x *DeleteEntryResponse) GetErrorCode() FilerError {
|
||||
if x != nil {
|
||||
return x.ErrorCode
|
||||
}
|
||||
return FilerError_OK
|
||||
}
|
||||
|
||||
type AtomicRenameEntryRequest struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
OldDirectory string `protobuf:"bytes,1,opt,name=old_directory,json=oldDirectory,proto3" json:"old_directory,omitempty"`
|
||||
@@ -5779,7 +5798,7 @@ const file_filer_proto_rawDesc = "" +
|
||||
"\n" +
|
||||
"entry_name\x18\x02 \x01(\tR\tentryName\x12+\n" +
|
||||
"\x06chunks\x18\x03 \x03(\v2\x13.filer_pb.FileChunkR\x06chunks\"\x17\n" +
|
||||
"\x15AppendToEntryResponse\"\xcb\x02\n" +
|
||||
"\x15AppendToEntryResponse\"\x83\x03\n" +
|
||||
"\x12DeleteEntryRequest\x12\x1c\n" +
|
||||
"\tdirectory\x18\x01 \x01(\tR\tdirectory\x12\x12\n" +
|
||||
"\x04name\x18\x02 \x01(\tR\x04name\x12$\n" +
|
||||
@@ -5790,10 +5809,14 @@ const file_filer_proto_rawDesc = "" +
|
||||
"\n" +
|
||||
"signatures\x18\b \x03(\x05R\n" +
|
||||
"signatures\x121\n" +
|
||||
"\x15if_not_modified_after\x18\t \x01(\x03R\x12ifNotModifiedAfter\"w\n" +
|
||||
"\x15if_not_modified_after\x18\t \x01(\x03R\x12ifNotModifiedAfter\x126\n" +
|
||||
"\tcondition\x18\n" +
|
||||
" \x01(\v2\x18.filer_pb.WriteConditionR\tcondition\"\xac\x01\n" +
|
||||
"\x13DeleteEntryResponse\x12\x14\n" +
|
||||
"\x05error\x18\x01 \x01(\tR\x05error\x12J\n" +
|
||||
"\x0emetadata_event\x18\x02 \x01(\v2#.filer_pb.SubscribeMetadataResponseR\rmetadataEvent\"\xba\x01\n" +
|
||||
"\x0emetadata_event\x18\x02 \x01(\v2#.filer_pb.SubscribeMetadataResponseR\rmetadataEvent\x123\n" +
|
||||
"\n" +
|
||||
"error_code\x18\x03 \x01(\x0e2\x14.filer_pb.FilerErrorR\terrorCode\"\xba\x01\n" +
|
||||
"\x18AtomicRenameEntryRequest\x12#\n" +
|
||||
"\rold_directory\x18\x01 \x01(\tR\foldDirectory\x12\x19\n" +
|
||||
"\bold_name\x18\x02 \x01(\tR\aoldName\x12#\n" +
|
||||
@@ -6278,97 +6301,99 @@ var file_filer_proto_depIdxs = []int32{
|
||||
83, // 19: filer_pb.UpdateEntryRequest.expected_extended:type_name -> filer_pb.UpdateEntryRequest.ExpectedExtendedEntry
|
||||
48, // 20: filer_pb.UpdateEntryResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse
|
||||
11, // 21: filer_pb.AppendToEntryRequest.chunks:type_name -> filer_pb.FileChunk
|
||||
48, // 22: filer_pb.DeleteEntryResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse
|
||||
10, // 23: filer_pb.StreamRenameEntryResponse.event_notification:type_name -> filer_pb.EventNotification
|
||||
34, // 24: filer_pb.AssignVolumeResponse.location:type_name -> filer_pb.Location
|
||||
34, // 25: filer_pb.Locations.locations:type_name -> filer_pb.Location
|
||||
84, // 26: filer_pb.LookupVolumeResponse.locations_map:type_name -> filer_pb.LookupVolumeResponse.LocationsMapEntry
|
||||
36, // 27: filer_pb.CollectionListResponse.collections:type_name -> filer_pb.Collection
|
||||
10, // 28: filer_pb.SubscribeMetadataResponse.event_notification:type_name -> filer_pb.EventNotification
|
||||
48, // 29: filer_pb.SubscribeMetadataResponse.events:type_name -> filer_pb.SubscribeMetadataResponse
|
||||
49, // 30: filer_pb.SubscribeMetadataResponse.log_file_refs:type_name -> filer_pb.LogFileChunkRef
|
||||
11, // 31: filer_pb.LogFileChunkRef.chunks:type_name -> filer_pb.FileChunk
|
||||
8, // 32: filer_pb.TraverseBfsMetadataResponse.entry:type_name -> filer_pb.Entry
|
||||
85, // 33: filer_pb.LocateBrokerResponse.resources:type_name -> filer_pb.LocateBrokerResponse.Resource
|
||||
86, // 34: filer_pb.FilerConf.locations:type_name -> filer_pb.FilerConf.PathConf
|
||||
8, // 35: filer_pb.CacheRemoteObjectToLocalClusterResponse.entry:type_name -> filer_pb.Entry
|
||||
48, // 36: filer_pb.CacheRemoteObjectToLocalClusterResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse
|
||||
70, // 37: filer_pb.TransferLocksRequest.locks:type_name -> filer_pb.Lock
|
||||
15, // 38: filer_pb.StreamMutateEntryRequest.create_request:type_name -> filer_pb.CreateEntryRequest
|
||||
18, // 39: filer_pb.StreamMutateEntryRequest.update_request:type_name -> filer_pb.UpdateEntryRequest
|
||||
24, // 40: filer_pb.StreamMutateEntryRequest.delete_request:type_name -> filer_pb.DeleteEntryRequest
|
||||
28, // 41: filer_pb.StreamMutateEntryRequest.rename_request:type_name -> filer_pb.StreamRenameEntryRequest
|
||||
17, // 42: filer_pb.StreamMutateEntryResponse.create_response:type_name -> filer_pb.CreateEntryResponse
|
||||
19, // 43: filer_pb.StreamMutateEntryResponse.update_response:type_name -> filer_pb.UpdateEntryResponse
|
||||
25, // 44: filer_pb.StreamMutateEntryResponse.delete_response:type_name -> filer_pb.DeleteEntryResponse
|
||||
29, // 45: filer_pb.StreamMutateEntryResponse.rename_response:type_name -> filer_pb.StreamRenameEntryResponse
|
||||
81, // 46: filer_pb.MountListResponse.mounts:type_name -> filer_pb.MountInfo
|
||||
33, // 47: filer_pb.LookupVolumeResponse.LocationsMapEntry.value:type_name -> filer_pb.Locations
|
||||
3, // 48: filer_pb.SeaweedFiler.LookupDirectoryEntry:input_type -> filer_pb.LookupDirectoryEntryRequest
|
||||
5, // 49: filer_pb.SeaweedFiler.ListEntries:input_type -> filer_pb.ListEntriesRequest
|
||||
15, // 50: filer_pb.SeaweedFiler.CreateEntry:input_type -> filer_pb.CreateEntryRequest
|
||||
18, // 51: filer_pb.SeaweedFiler.UpdateEntry:input_type -> filer_pb.UpdateEntryRequest
|
||||
20, // 52: filer_pb.SeaweedFiler.TouchAccessTime:input_type -> filer_pb.TouchAccessTimeRequest
|
||||
22, // 53: filer_pb.SeaweedFiler.AppendToEntry:input_type -> filer_pb.AppendToEntryRequest
|
||||
24, // 54: filer_pb.SeaweedFiler.DeleteEntry:input_type -> filer_pb.DeleteEntryRequest
|
||||
26, // 55: filer_pb.SeaweedFiler.AtomicRenameEntry:input_type -> filer_pb.AtomicRenameEntryRequest
|
||||
28, // 56: filer_pb.SeaweedFiler.StreamRenameEntry:input_type -> filer_pb.StreamRenameEntryRequest
|
||||
75, // 57: filer_pb.SeaweedFiler.StreamMutateEntry:input_type -> filer_pb.StreamMutateEntryRequest
|
||||
30, // 58: filer_pb.SeaweedFiler.AssignVolume:input_type -> filer_pb.AssignVolumeRequest
|
||||
32, // 59: filer_pb.SeaweedFiler.LookupVolume:input_type -> filer_pb.LookupVolumeRequest
|
||||
37, // 60: filer_pb.SeaweedFiler.CollectionList:input_type -> filer_pb.CollectionListRequest
|
||||
39, // 61: filer_pb.SeaweedFiler.DeleteCollection:input_type -> filer_pb.DeleteCollectionRequest
|
||||
41, // 62: filer_pb.SeaweedFiler.Statistics:input_type -> filer_pb.StatisticsRequest
|
||||
43, // 63: filer_pb.SeaweedFiler.Ping:input_type -> filer_pb.PingRequest
|
||||
45, // 64: filer_pb.SeaweedFiler.GetFilerConfiguration:input_type -> filer_pb.GetFilerConfigurationRequest
|
||||
50, // 65: filer_pb.SeaweedFiler.TraverseBfsMetadata:input_type -> filer_pb.TraverseBfsMetadataRequest
|
||||
47, // 66: filer_pb.SeaweedFiler.SubscribeMetadata:input_type -> filer_pb.SubscribeMetadataRequest
|
||||
47, // 67: filer_pb.SeaweedFiler.SubscribeLocalMetadata:input_type -> filer_pb.SubscribeMetadataRequest
|
||||
57, // 68: filer_pb.SeaweedFiler.KvGet:input_type -> filer_pb.KvGetRequest
|
||||
59, // 69: filer_pb.SeaweedFiler.KvPut:input_type -> filer_pb.KvPutRequest
|
||||
62, // 70: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:input_type -> filer_pb.CacheRemoteObjectToLocalClusterRequest
|
||||
64, // 71: filer_pb.SeaweedFiler.DistributedLock:input_type -> filer_pb.LockRequest
|
||||
66, // 72: filer_pb.SeaweedFiler.DistributedUnlock:input_type -> filer_pb.UnlockRequest
|
||||
68, // 73: filer_pb.SeaweedFiler.FindLockOwner:input_type -> filer_pb.FindLockOwnerRequest
|
||||
71, // 74: filer_pb.SeaweedFiler.TransferLocks:input_type -> filer_pb.TransferLocksRequest
|
||||
73, // 75: filer_pb.SeaweedFiler.ReplicateLock:input_type -> filer_pb.ReplicateLockRequest
|
||||
77, // 76: filer_pb.SeaweedFiler.MountRegister:input_type -> filer_pb.MountRegisterRequest
|
||||
79, // 77: filer_pb.SeaweedFiler.MountList:input_type -> filer_pb.MountListRequest
|
||||
4, // 78: filer_pb.SeaweedFiler.LookupDirectoryEntry:output_type -> filer_pb.LookupDirectoryEntryResponse
|
||||
6, // 79: filer_pb.SeaweedFiler.ListEntries:output_type -> filer_pb.ListEntriesResponse
|
||||
17, // 80: filer_pb.SeaweedFiler.CreateEntry:output_type -> filer_pb.CreateEntryResponse
|
||||
19, // 81: filer_pb.SeaweedFiler.UpdateEntry:output_type -> filer_pb.UpdateEntryResponse
|
||||
21, // 82: filer_pb.SeaweedFiler.TouchAccessTime:output_type -> filer_pb.TouchAccessTimeResponse
|
||||
23, // 83: filer_pb.SeaweedFiler.AppendToEntry:output_type -> filer_pb.AppendToEntryResponse
|
||||
25, // 84: filer_pb.SeaweedFiler.DeleteEntry:output_type -> filer_pb.DeleteEntryResponse
|
||||
27, // 85: filer_pb.SeaweedFiler.AtomicRenameEntry:output_type -> filer_pb.AtomicRenameEntryResponse
|
||||
29, // 86: filer_pb.SeaweedFiler.StreamRenameEntry:output_type -> filer_pb.StreamRenameEntryResponse
|
||||
76, // 87: filer_pb.SeaweedFiler.StreamMutateEntry:output_type -> filer_pb.StreamMutateEntryResponse
|
||||
31, // 88: filer_pb.SeaweedFiler.AssignVolume:output_type -> filer_pb.AssignVolumeResponse
|
||||
35, // 89: filer_pb.SeaweedFiler.LookupVolume:output_type -> filer_pb.LookupVolumeResponse
|
||||
38, // 90: filer_pb.SeaweedFiler.CollectionList:output_type -> filer_pb.CollectionListResponse
|
||||
40, // 91: filer_pb.SeaweedFiler.DeleteCollection:output_type -> filer_pb.DeleteCollectionResponse
|
||||
42, // 92: filer_pb.SeaweedFiler.Statistics:output_type -> filer_pb.StatisticsResponse
|
||||
44, // 93: filer_pb.SeaweedFiler.Ping:output_type -> filer_pb.PingResponse
|
||||
46, // 94: filer_pb.SeaweedFiler.GetFilerConfiguration:output_type -> filer_pb.GetFilerConfigurationResponse
|
||||
51, // 95: filer_pb.SeaweedFiler.TraverseBfsMetadata:output_type -> filer_pb.TraverseBfsMetadataResponse
|
||||
48, // 96: filer_pb.SeaweedFiler.SubscribeMetadata:output_type -> filer_pb.SubscribeMetadataResponse
|
||||
48, // 97: filer_pb.SeaweedFiler.SubscribeLocalMetadata:output_type -> filer_pb.SubscribeMetadataResponse
|
||||
58, // 98: filer_pb.SeaweedFiler.KvGet:output_type -> filer_pb.KvGetResponse
|
||||
60, // 99: filer_pb.SeaweedFiler.KvPut:output_type -> filer_pb.KvPutResponse
|
||||
63, // 100: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:output_type -> filer_pb.CacheRemoteObjectToLocalClusterResponse
|
||||
65, // 101: filer_pb.SeaweedFiler.DistributedLock:output_type -> filer_pb.LockResponse
|
||||
67, // 102: filer_pb.SeaweedFiler.DistributedUnlock:output_type -> filer_pb.UnlockResponse
|
||||
69, // 103: filer_pb.SeaweedFiler.FindLockOwner:output_type -> filer_pb.FindLockOwnerResponse
|
||||
72, // 104: filer_pb.SeaweedFiler.TransferLocks:output_type -> filer_pb.TransferLocksResponse
|
||||
74, // 105: filer_pb.SeaweedFiler.ReplicateLock:output_type -> filer_pb.ReplicateLockResponse
|
||||
78, // 106: filer_pb.SeaweedFiler.MountRegister:output_type -> filer_pb.MountRegisterResponse
|
||||
80, // 107: filer_pb.SeaweedFiler.MountList:output_type -> filer_pb.MountListResponse
|
||||
78, // [78:108] is the sub-list for method output_type
|
||||
48, // [48:78] is the sub-list for method input_type
|
||||
48, // [48:48] is the sub-list for extension type_name
|
||||
48, // [48:48] is the sub-list for extension extendee
|
||||
0, // [0:48] is the sub-list for field type_name
|
||||
16, // 22: filer_pb.DeleteEntryRequest.condition:type_name -> filer_pb.WriteCondition
|
||||
48, // 23: filer_pb.DeleteEntryResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse
|
||||
1, // 24: filer_pb.DeleteEntryResponse.error_code:type_name -> filer_pb.FilerError
|
||||
10, // 25: filer_pb.StreamRenameEntryResponse.event_notification:type_name -> filer_pb.EventNotification
|
||||
34, // 26: filer_pb.AssignVolumeResponse.location:type_name -> filer_pb.Location
|
||||
34, // 27: filer_pb.Locations.locations:type_name -> filer_pb.Location
|
||||
84, // 28: filer_pb.LookupVolumeResponse.locations_map:type_name -> filer_pb.LookupVolumeResponse.LocationsMapEntry
|
||||
36, // 29: filer_pb.CollectionListResponse.collections:type_name -> filer_pb.Collection
|
||||
10, // 30: filer_pb.SubscribeMetadataResponse.event_notification:type_name -> filer_pb.EventNotification
|
||||
48, // 31: filer_pb.SubscribeMetadataResponse.events:type_name -> filer_pb.SubscribeMetadataResponse
|
||||
49, // 32: filer_pb.SubscribeMetadataResponse.log_file_refs:type_name -> filer_pb.LogFileChunkRef
|
||||
11, // 33: filer_pb.LogFileChunkRef.chunks:type_name -> filer_pb.FileChunk
|
||||
8, // 34: filer_pb.TraverseBfsMetadataResponse.entry:type_name -> filer_pb.Entry
|
||||
85, // 35: filer_pb.LocateBrokerResponse.resources:type_name -> filer_pb.LocateBrokerResponse.Resource
|
||||
86, // 36: filer_pb.FilerConf.locations:type_name -> filer_pb.FilerConf.PathConf
|
||||
8, // 37: filer_pb.CacheRemoteObjectToLocalClusterResponse.entry:type_name -> filer_pb.Entry
|
||||
48, // 38: filer_pb.CacheRemoteObjectToLocalClusterResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse
|
||||
70, // 39: filer_pb.TransferLocksRequest.locks:type_name -> filer_pb.Lock
|
||||
15, // 40: filer_pb.StreamMutateEntryRequest.create_request:type_name -> filer_pb.CreateEntryRequest
|
||||
18, // 41: filer_pb.StreamMutateEntryRequest.update_request:type_name -> filer_pb.UpdateEntryRequest
|
||||
24, // 42: filer_pb.StreamMutateEntryRequest.delete_request:type_name -> filer_pb.DeleteEntryRequest
|
||||
28, // 43: filer_pb.StreamMutateEntryRequest.rename_request:type_name -> filer_pb.StreamRenameEntryRequest
|
||||
17, // 44: filer_pb.StreamMutateEntryResponse.create_response:type_name -> filer_pb.CreateEntryResponse
|
||||
19, // 45: filer_pb.StreamMutateEntryResponse.update_response:type_name -> filer_pb.UpdateEntryResponse
|
||||
25, // 46: filer_pb.StreamMutateEntryResponse.delete_response:type_name -> filer_pb.DeleteEntryResponse
|
||||
29, // 47: filer_pb.StreamMutateEntryResponse.rename_response:type_name -> filer_pb.StreamRenameEntryResponse
|
||||
81, // 48: filer_pb.MountListResponse.mounts:type_name -> filer_pb.MountInfo
|
||||
33, // 49: filer_pb.LookupVolumeResponse.LocationsMapEntry.value:type_name -> filer_pb.Locations
|
||||
3, // 50: filer_pb.SeaweedFiler.LookupDirectoryEntry:input_type -> filer_pb.LookupDirectoryEntryRequest
|
||||
5, // 51: filer_pb.SeaweedFiler.ListEntries:input_type -> filer_pb.ListEntriesRequest
|
||||
15, // 52: filer_pb.SeaweedFiler.CreateEntry:input_type -> filer_pb.CreateEntryRequest
|
||||
18, // 53: filer_pb.SeaweedFiler.UpdateEntry:input_type -> filer_pb.UpdateEntryRequest
|
||||
20, // 54: filer_pb.SeaweedFiler.TouchAccessTime:input_type -> filer_pb.TouchAccessTimeRequest
|
||||
22, // 55: filer_pb.SeaweedFiler.AppendToEntry:input_type -> filer_pb.AppendToEntryRequest
|
||||
24, // 56: filer_pb.SeaweedFiler.DeleteEntry:input_type -> filer_pb.DeleteEntryRequest
|
||||
26, // 57: filer_pb.SeaweedFiler.AtomicRenameEntry:input_type -> filer_pb.AtomicRenameEntryRequest
|
||||
28, // 58: filer_pb.SeaweedFiler.StreamRenameEntry:input_type -> filer_pb.StreamRenameEntryRequest
|
||||
75, // 59: filer_pb.SeaweedFiler.StreamMutateEntry:input_type -> filer_pb.StreamMutateEntryRequest
|
||||
30, // 60: filer_pb.SeaweedFiler.AssignVolume:input_type -> filer_pb.AssignVolumeRequest
|
||||
32, // 61: filer_pb.SeaweedFiler.LookupVolume:input_type -> filer_pb.LookupVolumeRequest
|
||||
37, // 62: filer_pb.SeaweedFiler.CollectionList:input_type -> filer_pb.CollectionListRequest
|
||||
39, // 63: filer_pb.SeaweedFiler.DeleteCollection:input_type -> filer_pb.DeleteCollectionRequest
|
||||
41, // 64: filer_pb.SeaweedFiler.Statistics:input_type -> filer_pb.StatisticsRequest
|
||||
43, // 65: filer_pb.SeaweedFiler.Ping:input_type -> filer_pb.PingRequest
|
||||
45, // 66: filer_pb.SeaweedFiler.GetFilerConfiguration:input_type -> filer_pb.GetFilerConfigurationRequest
|
||||
50, // 67: filer_pb.SeaweedFiler.TraverseBfsMetadata:input_type -> filer_pb.TraverseBfsMetadataRequest
|
||||
47, // 68: filer_pb.SeaweedFiler.SubscribeMetadata:input_type -> filer_pb.SubscribeMetadataRequest
|
||||
47, // 69: filer_pb.SeaweedFiler.SubscribeLocalMetadata:input_type -> filer_pb.SubscribeMetadataRequest
|
||||
57, // 70: filer_pb.SeaweedFiler.KvGet:input_type -> filer_pb.KvGetRequest
|
||||
59, // 71: filer_pb.SeaweedFiler.KvPut:input_type -> filer_pb.KvPutRequest
|
||||
62, // 72: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:input_type -> filer_pb.CacheRemoteObjectToLocalClusterRequest
|
||||
64, // 73: filer_pb.SeaweedFiler.DistributedLock:input_type -> filer_pb.LockRequest
|
||||
66, // 74: filer_pb.SeaweedFiler.DistributedUnlock:input_type -> filer_pb.UnlockRequest
|
||||
68, // 75: filer_pb.SeaweedFiler.FindLockOwner:input_type -> filer_pb.FindLockOwnerRequest
|
||||
71, // 76: filer_pb.SeaweedFiler.TransferLocks:input_type -> filer_pb.TransferLocksRequest
|
||||
73, // 77: filer_pb.SeaweedFiler.ReplicateLock:input_type -> filer_pb.ReplicateLockRequest
|
||||
77, // 78: filer_pb.SeaweedFiler.MountRegister:input_type -> filer_pb.MountRegisterRequest
|
||||
79, // 79: filer_pb.SeaweedFiler.MountList:input_type -> filer_pb.MountListRequest
|
||||
4, // 80: filer_pb.SeaweedFiler.LookupDirectoryEntry:output_type -> filer_pb.LookupDirectoryEntryResponse
|
||||
6, // 81: filer_pb.SeaweedFiler.ListEntries:output_type -> filer_pb.ListEntriesResponse
|
||||
17, // 82: filer_pb.SeaweedFiler.CreateEntry:output_type -> filer_pb.CreateEntryResponse
|
||||
19, // 83: filer_pb.SeaweedFiler.UpdateEntry:output_type -> filer_pb.UpdateEntryResponse
|
||||
21, // 84: filer_pb.SeaweedFiler.TouchAccessTime:output_type -> filer_pb.TouchAccessTimeResponse
|
||||
23, // 85: filer_pb.SeaweedFiler.AppendToEntry:output_type -> filer_pb.AppendToEntryResponse
|
||||
25, // 86: filer_pb.SeaweedFiler.DeleteEntry:output_type -> filer_pb.DeleteEntryResponse
|
||||
27, // 87: filer_pb.SeaweedFiler.AtomicRenameEntry:output_type -> filer_pb.AtomicRenameEntryResponse
|
||||
29, // 88: filer_pb.SeaweedFiler.StreamRenameEntry:output_type -> filer_pb.StreamRenameEntryResponse
|
||||
76, // 89: filer_pb.SeaweedFiler.StreamMutateEntry:output_type -> filer_pb.StreamMutateEntryResponse
|
||||
31, // 90: filer_pb.SeaweedFiler.AssignVolume:output_type -> filer_pb.AssignVolumeResponse
|
||||
35, // 91: filer_pb.SeaweedFiler.LookupVolume:output_type -> filer_pb.LookupVolumeResponse
|
||||
38, // 92: filer_pb.SeaweedFiler.CollectionList:output_type -> filer_pb.CollectionListResponse
|
||||
40, // 93: filer_pb.SeaweedFiler.DeleteCollection:output_type -> filer_pb.DeleteCollectionResponse
|
||||
42, // 94: filer_pb.SeaweedFiler.Statistics:output_type -> filer_pb.StatisticsResponse
|
||||
44, // 95: filer_pb.SeaweedFiler.Ping:output_type -> filer_pb.PingResponse
|
||||
46, // 96: filer_pb.SeaweedFiler.GetFilerConfiguration:output_type -> filer_pb.GetFilerConfigurationResponse
|
||||
51, // 97: filer_pb.SeaweedFiler.TraverseBfsMetadata:output_type -> filer_pb.TraverseBfsMetadataResponse
|
||||
48, // 98: filer_pb.SeaweedFiler.SubscribeMetadata:output_type -> filer_pb.SubscribeMetadataResponse
|
||||
48, // 99: filer_pb.SeaweedFiler.SubscribeLocalMetadata:output_type -> filer_pb.SubscribeMetadataResponse
|
||||
58, // 100: filer_pb.SeaweedFiler.KvGet:output_type -> filer_pb.KvGetResponse
|
||||
60, // 101: filer_pb.SeaweedFiler.KvPut:output_type -> filer_pb.KvPutResponse
|
||||
63, // 102: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:output_type -> filer_pb.CacheRemoteObjectToLocalClusterResponse
|
||||
65, // 103: filer_pb.SeaweedFiler.DistributedLock:output_type -> filer_pb.LockResponse
|
||||
67, // 104: filer_pb.SeaweedFiler.DistributedUnlock:output_type -> filer_pb.UnlockResponse
|
||||
69, // 105: filer_pb.SeaweedFiler.FindLockOwner:output_type -> filer_pb.FindLockOwnerResponse
|
||||
72, // 106: filer_pb.SeaweedFiler.TransferLocks:output_type -> filer_pb.TransferLocksResponse
|
||||
74, // 107: filer_pb.SeaweedFiler.ReplicateLock:output_type -> filer_pb.ReplicateLockResponse
|
||||
78, // 108: filer_pb.SeaweedFiler.MountRegister:output_type -> filer_pb.MountRegisterResponse
|
||||
80, // 109: filer_pb.SeaweedFiler.MountList:output_type -> filer_pb.MountListResponse
|
||||
80, // [80:110] is the sub-list for method output_type
|
||||
50, // [50:80] is the sub-list for method input_type
|
||||
50, // [50:50] is the sub-list for extension type_name
|
||||
50, // [50:50] is the sub-list for extension extendee
|
||||
0, // [0:50] is the sub-list for field type_name
|
||||
}
|
||||
|
||||
func init() { file_filer_proto_init() }
|
||||
|
||||
@@ -214,33 +214,60 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
|
||||
}
|
||||
|
||||
var deleteResult deleteMutationResult
|
||||
deleteCode := s3a.withObjectWriteLock(bucket, object, func() s3err.ErrorCode {
|
||||
return s3a.checkDeleteIfMatch(bucket, object, versionId, versioningState, r.Header.Get(s3_constants.IfMatch), s3err.ErrPreconditionFailed)
|
||||
}, func() s3err.ErrorCode {
|
||||
if versioningConfigured {
|
||||
result, errCode := s3a.deleteVersionedObject(r, bucket, object, versionId, versioningState)
|
||||
if errCode != s3err.ErrNone {
|
||||
return errCode
|
||||
var deleteCode s3err.ErrorCode
|
||||
|
||||
// Fast path: route an unversioned object delete to its owner filer, which
|
||||
// serializes it and evaluates the If-Match precondition under the per-path
|
||||
// lock, avoiding the distributed lock. Falls back below for versioned /
|
||||
// object-lock buckets, unsupported conditions, or an unreachable owner.
|
||||
deleteHandled := false
|
||||
if !versioningConfigured {
|
||||
if cond, condOk := buildDeleteCondition(r); condOk {
|
||||
if owner, ownerOk := s3a.routedObjectOwner(bucket, object); ownerOk {
|
||||
resp, err := s3a.deleteEntryOnFiler(owner, bucket, object, cond)
|
||||
switch {
|
||||
case err != nil:
|
||||
glog.Warningf("DeleteObjectHandler: routed delete to %s failed for %s/%s, falling back to lock: %v", owner, bucket, object, err)
|
||||
case resp.ErrorCode == filer_pb.FilerError_PRECONDITION_FAILED:
|
||||
deleteCode, deleteHandled = s3err.ErrPreconditionFailed, true
|
||||
case resp.Error != "":
|
||||
glog.Errorf("DeleteObjectHandler: routed delete failed for %s/%s: %s", bucket, object, resp.Error)
|
||||
deleteCode, deleteHandled = s3err.ErrInternalError, true
|
||||
default:
|
||||
deleteCode, deleteHandled = s3err.ErrNone, true
|
||||
}
|
||||
}
|
||||
deleteResult = result
|
||||
}
|
||||
}
|
||||
if !deleteHandled {
|
||||
deleteCode = s3a.withObjectWriteLock(bucket, object, func() s3err.ErrorCode {
|
||||
return s3a.checkDeleteIfMatch(bucket, object, versionId, versioningState, r.Header.Get(s3_constants.IfMatch), s3err.ErrPreconditionFailed)
|
||||
}, func() s3err.ErrorCode {
|
||||
if versioningConfigured {
|
||||
result, errCode := s3a.deleteVersionedObject(r, bucket, object, versionId, versioningState)
|
||||
if errCode != s3err.ErrNone {
|
||||
return errCode
|
||||
}
|
||||
deleteResult = result
|
||||
return s3err.ErrNone
|
||||
}
|
||||
|
||||
governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object)
|
||||
if err := s3a.enforceObjectLockProtections(r, bucket, object, "", governanceBypassAllowed); err != nil {
|
||||
glog.V(2).Infof("DeleteObjectHandler: object lock check failed for %s/%s: %v", bucket, object, err)
|
||||
return s3err.ErrAccessDenied
|
||||
}
|
||||
|
||||
if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
return s3a.deleteUnversionedObjectWithClient(client, bucket, object, false)
|
||||
}); err != nil {
|
||||
glog.Errorf("DeleteObjectHandler: failed to delete %s/%s: %v", bucket, object, err)
|
||||
return s3err.ErrInternalError
|
||||
}
|
||||
|
||||
return s3err.ErrNone
|
||||
}
|
||||
|
||||
governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object)
|
||||
if err := s3a.enforceObjectLockProtections(r, bucket, object, "", governanceBypassAllowed); err != nil {
|
||||
glog.V(2).Infof("DeleteObjectHandler: object lock check failed for %s/%s: %v", bucket, object, err)
|
||||
return s3err.ErrAccessDenied
|
||||
}
|
||||
|
||||
if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
return s3a.deleteUnversionedObjectWithClient(client, bucket, object, false)
|
||||
}); err != nil {
|
||||
glog.Errorf("DeleteObjectHandler: failed to delete %s/%s: %v", bucket, object, err)
|
||||
return s3err.ErrInternalError
|
||||
}
|
||||
|
||||
return s3err.ErrNone
|
||||
})
|
||||
})
|
||||
}
|
||||
if deleteCode != s3err.ErrNone {
|
||||
s3err.WriteErrorResponse(w, r, deleteCode)
|
||||
return
|
||||
|
||||
@@ -802,7 +802,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
|
||||
}
|
||||
return s3a.checkConditionalHeaders(r, bucket, object)
|
||||
}
|
||||
createCode := s3a.withObjectWriteLock(bucket, object, preconditionFn, func() s3err.ErrorCode {
|
||||
createUnderLock := func() s3err.ErrorCode {
|
||||
createErr = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
req := &filer_pb.CreateEntryRequest{
|
||||
Directory: path.Dir(filePath),
|
||||
@@ -831,7 +831,37 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
|
||||
}
|
||||
}
|
||||
return s3err.ErrNone
|
||||
})
|
||||
}
|
||||
|
||||
// Fast path: route the create to the object key's owner filer, which
|
||||
// serializes it with its local per-path lock and evaluates the precondition,
|
||||
// avoiding the distributed lock. Restricted to the safe subset by
|
||||
// routedObjectWrite; falls back to the lock for everything else and if the
|
||||
// owner is unreachable.
|
||||
var createCode s3err.ErrorCode
|
||||
routed := false
|
||||
if owner, cond, ok := s3a.routedObjectWrite(r, bucket, object, afterCreate != nil); ok {
|
||||
req := &filer_pb.CreateEntryRequest{
|
||||
Directory: path.Dir(filePath),
|
||||
Entry: entry,
|
||||
Condition: cond,
|
||||
}
|
||||
resp, err := s3a.createEntryOnFiler(owner, req)
|
||||
switch {
|
||||
case err != nil:
|
||||
glog.Warningf("putToFiler: routed create to %s failed for %s, falling back to lock: %v", owner, filePath, err)
|
||||
case resp.ErrorCode == filer_pb.FilerError_PRECONDITION_FAILED:
|
||||
createCode, routed = s3err.ErrPreconditionFailed, true
|
||||
case resp.Error != "":
|
||||
createErr = fmt.Errorf("%s", resp.Error)
|
||||
createCode, routed = filerErrorToS3Error(createErr), true
|
||||
default:
|
||||
entryCreated, createCode, routed = true, s3err.ErrNone, true
|
||||
}
|
||||
}
|
||||
if !routed {
|
||||
createCode = s3a.withObjectWriteLock(bucket, object, preconditionFn, createUnderLock)
|
||||
}
|
||||
if createCode != s3err.ErrNone {
|
||||
if createErr != nil {
|
||||
glog.Errorf("putToFiler: failed to create entry for %s: %v", filePath, createErr)
|
||||
|
||||
@@ -0,0 +1,168 @@
|
||||
package s3api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
// routedObjectOwner returns the filer that owns this object's metadata for
|
||||
// route-by-key, or ok=false when the object's writes must keep the distributed
|
||||
// lock. Versioned and object-lock buckets stay on the lock path: their
|
||||
// mutations span multiple entries / extra metadata checks a single conditional
|
||||
// create or delete does not cover. On any lookup error it falls back to be safe.
|
||||
func (s3a *S3ApiServer) routedObjectOwner(bucket, object string) (pb.ServerAddress, bool) {
|
||||
if object == "" || s3a.objectWriteLockClient == nil {
|
||||
return "", false
|
||||
}
|
||||
if configured, err := s3a.isVersioningConfigured(bucket); err != nil || configured {
|
||||
return "", false
|
||||
}
|
||||
if locked, err := s3a.isObjectLockEnabled(bucket); err != nil || locked {
|
||||
return "", false
|
||||
}
|
||||
lockKey := fmt.Sprintf("s3.object.write:%s", s3a.toFilerPath(bucket, object))
|
||||
owner := s3a.objectWriteLockClient.PrimaryForKey(lockKey)
|
||||
if owner == "" {
|
||||
return "", false
|
||||
}
|
||||
return owner, true
|
||||
}
|
||||
|
||||
// routedObjectWrite decides whether an object PUT can take the route-by-key
|
||||
// fast path: the metadata write goes straight to the key's owner filer, which
|
||||
// serializes it with its local per-path lock and evaluates the precondition,
|
||||
// instead of acquiring a distributed lock. It returns the owner filer and the
|
||||
// reduced precondition, or ok=false to fall back to withObjectWriteLock.
|
||||
//
|
||||
// The fast path additionally requires no post-create hook and a precondition
|
||||
// that reduces to one primitive.
|
||||
func (s3a *S3ApiServer) routedObjectWrite(r *http.Request, bucket, object string, hasAfterCreate bool) (owner pb.ServerAddress, cond *filer_pb.WriteCondition, ok bool) {
|
||||
if hasAfterCreate {
|
||||
return "", nil, false
|
||||
}
|
||||
cond, condOk := buildWriteCondition(r)
|
||||
if !condOk {
|
||||
return "", nil, false
|
||||
}
|
||||
owner, ok = s3a.routedObjectOwner(bucket, object)
|
||||
if !ok {
|
||||
return "", nil, false
|
||||
}
|
||||
return owner, cond, true
|
||||
}
|
||||
|
||||
// buildWriteCondition reduces the request's conditional headers to a single
|
||||
// WriteCondition primitive the filer can evaluate. It only handles the
|
||||
// unambiguous single-condition cases; anything else (header combinations,
|
||||
// time-based conditions, ETag lists, weak ETags) returns ok=false so the caller
|
||||
// keeps the existing gateway-side evaluation under the distributed lock.
|
||||
func buildWriteCondition(r *http.Request) (*filer_pb.WriteCondition, bool) {
|
||||
headers, errCode := parseConditionalHeaders(r)
|
||||
if errCode != s3err.ErrNone {
|
||||
return nil, false
|
||||
}
|
||||
if !headers.isSet {
|
||||
return &filer_pb.WriteCondition{Kind: filer_pb.WriteCondition_NONE}, true
|
||||
}
|
||||
// Time-based conditions are rare on writes; let the lock path handle them.
|
||||
if !headers.ifModifiedSince.IsZero() || !headers.ifUnmodifiedSince.IsZero() {
|
||||
return nil, false
|
||||
}
|
||||
hasMatch := headers.ifMatch != ""
|
||||
hasNoneMatch := headers.ifNoneMatch != ""
|
||||
switch {
|
||||
case hasMatch && !hasNoneMatch:
|
||||
if headers.ifMatch == "*" {
|
||||
return &filer_pb.WriteCondition{Kind: filer_pb.WriteCondition_IF_EXISTS}, true
|
||||
}
|
||||
if etag, single := singleStrongETag(headers.ifMatch); single {
|
||||
return &filer_pb.WriteCondition{Kind: filer_pb.WriteCondition_IF_ETAG_MATCH, Etag: etag}, true
|
||||
}
|
||||
return nil, false
|
||||
case hasNoneMatch && !hasMatch:
|
||||
if headers.ifNoneMatch == "*" {
|
||||
return &filer_pb.WriteCondition{Kind: filer_pb.WriteCondition_IF_NOT_EXISTS}, true
|
||||
}
|
||||
if etag, single := singleStrongETag(headers.ifNoneMatch); single {
|
||||
return &filer_pb.WriteCondition{Kind: filer_pb.WriteCondition_IF_ETAG_NOT_MATCH, Etag: etag}, true
|
||||
}
|
||||
return nil, false
|
||||
default:
|
||||
// Both If-Match and If-None-Match present — leave to the lock path.
|
||||
return nil, false
|
||||
}
|
||||
}
|
||||
|
||||
// singleStrongETag extracts the ETag from a header value that holds exactly
// one strong ETag. Surrounding whitespace and double quotes are stripped from
// the result. It reports false, with an empty string, when the value contains
// a comma (an ETag list) or starts with the weak marker "W/" or "w/".
func singleStrongETag(v string) (string, bool) {
	trimmed := strings.TrimSpace(v)
	switch {
	case strings.ContainsRune(trimmed, ','):
		// ETag list.
		return "", false
	case strings.HasPrefix(trimmed, "W/"), strings.HasPrefix(trimmed, "w/"):
		// Weak ETag.
		return "", false
	}
	return strings.Trim(trimmed, `"`), true
}
|
||||
|
||||
// createEntryOnFiler sends a CreateEntry directly to the given owner filer so
|
||||
// its local per-path lock serializes the write. The raw response is returned so
|
||||
// the caller can distinguish PRECONDITION_FAILED from other outcomes.
|
||||
func (s3a *S3ApiServer) createEntryOnFiler(owner pb.ServerAddress, req *filer_pb.CreateEntryRequest) (*filer_pb.CreateEntryResponse, error) {
|
||||
var resp *filer_pb.CreateEntryResponse
|
||||
err := pb.WithFilerClient(false, 0, owner, s3a.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||
var e error
|
||||
resp, e = client.CreateEntry(context.Background(), req)
|
||||
return e
|
||||
})
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// buildDeleteCondition reduces a DeleteObject's If-Match header to a primitive.
|
||||
// DeleteObject only honors If-Match (matching checkDeleteIfMatch), so other
|
||||
// conditional headers are ignored here as they are on the existing path.
|
||||
func buildDeleteCondition(r *http.Request) (*filer_pb.WriteCondition, bool) {
|
||||
ifMatch := strings.TrimSpace(r.Header.Get(s3_constants.IfMatch))
|
||||
switch {
|
||||
case ifMatch == "":
|
||||
return &filer_pb.WriteCondition{Kind: filer_pb.WriteCondition_NONE}, true
|
||||
case ifMatch == "*":
|
||||
return &filer_pb.WriteCondition{Kind: filer_pb.WriteCondition_IF_EXISTS}, true
|
||||
default:
|
||||
if etag, single := singleStrongETag(ifMatch); single {
|
||||
return &filer_pb.WriteCondition{Kind: filer_pb.WriteCondition_IF_ETAG_MATCH, Etag: etag}, true
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
}
|
||||
|
||||
// deleteEntryOnFiler sends a non-recursive DeleteEntry for the object directly
|
||||
// to its owner filer, which serializes it and evaluates the precondition under
|
||||
// the per-path lock. Flags mirror the unversioned delete path (doDeleteEntry).
|
||||
func (s3a *S3ApiServer) deleteEntryOnFiler(owner pb.ServerAddress, bucket, object string, cond *filer_pb.WriteCondition) (*filer_pb.DeleteEntryResponse, error) {
|
||||
dir, name := util.NewFullPath(s3a.bucketDir(bucket), object).DirAndName()
|
||||
var resp *filer_pb.DeleteEntryResponse
|
||||
err := pb.WithFilerClient(false, 0, owner, s3a.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||
var e error
|
||||
resp, e = client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{
|
||||
Directory: dir,
|
||||
Name: name,
|
||||
IsDeleteData: true,
|
||||
IsRecursive: false,
|
||||
IgnoreRecursiveError: true,
|
||||
Condition: cond,
|
||||
})
|
||||
return e
|
||||
})
|
||||
return resp, err
|
||||
}
|
||||
@@ -0,0 +1,83 @@
|
||||
package s3api
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
)
|
||||
|
||||
// reqWith builds a PUT request for "/b/o" carrying the given headers. A nil
// map yields a request with no headers. The construction error is discarded
// because the method and URL are fixed, valid literals.
func reqWith(headers map[string]string) *http.Request {
	req, _ := http.NewRequest(http.MethodPut, "/b/o", nil)
	for name := range headers {
		req.Header.Set(name, headers[name])
	}
	return req
}
|
||||
|
||||
func TestBuildWriteCondition(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
headers map[string]string
|
||||
wantOk bool
|
||||
want filer_pb.WriteCondition_Kind
|
||||
wantTag string
|
||||
}{
|
||||
{"none", nil, true, filer_pb.WriteCondition_NONE, ""},
|
||||
{"ifnonematch-star", map[string]string{s3_constants.IfNoneMatch: "*"}, true, filer_pb.WriteCondition_IF_NOT_EXISTS, ""},
|
||||
{"ifmatch-star", map[string]string{s3_constants.IfMatch: "*"}, true, filer_pb.WriteCondition_IF_EXISTS, ""},
|
||||
{"ifmatch-etag", map[string]string{s3_constants.IfMatch: `"abc"`}, true, filer_pb.WriteCondition_IF_ETAG_MATCH, "abc"},
|
||||
{"ifnonematch-etag", map[string]string{s3_constants.IfNoneMatch: `"abc"`}, true, filer_pb.WriteCondition_IF_ETAG_NOT_MATCH, "abc"},
|
||||
// Cases that must fall back to the lock path:
|
||||
{"both", map[string]string{s3_constants.IfMatch: "*", s3_constants.IfNoneMatch: "*"}, false, 0, ""},
|
||||
{"etag-list", map[string]string{s3_constants.IfMatch: `"a","b"`}, false, 0, ""},
|
||||
{"weak-etag", map[string]string{s3_constants.IfMatch: `W/"abc"`}, false, 0, ""},
|
||||
{"time-based", map[string]string{s3_constants.IfUnmodifiedSince: "Wed, 21 Oct 2015 07:28:00 GMT"}, false, 0, ""},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
cond, ok := buildWriteCondition(reqWith(tc.headers))
|
||||
if ok != tc.wantOk {
|
||||
t.Errorf("%s: ok=%v want %v", tc.name, ok, tc.wantOk)
|
||||
continue
|
||||
}
|
||||
if ok {
|
||||
if cond.Kind != tc.want {
|
||||
t.Errorf("%s: kind=%v want %v", tc.name, cond.Kind, tc.want)
|
||||
}
|
||||
if cond.Etag != tc.wantTag {
|
||||
t.Errorf("%s: etag=%q want %q", tc.name, cond.Etag, tc.wantTag)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildDeleteCondition(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
ifMatch string
|
||||
wantOk bool
|
||||
want filer_pb.WriteCondition_Kind
|
||||
wantTag string
|
||||
}{
|
||||
{"none", "", true, filer_pb.WriteCondition_NONE, ""},
|
||||
{"star", "*", true, filer_pb.WriteCondition_IF_EXISTS, ""},
|
||||
{"etag", `"abc"`, true, filer_pb.WriteCondition_IF_ETAG_MATCH, "abc"},
|
||||
{"list", `"a","b"`, false, 0, ""},
|
||||
{"weak", `W/"abc"`, false, 0, ""},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
h := map[string]string{}
|
||||
if tc.ifMatch != "" {
|
||||
h[s3_constants.IfMatch] = tc.ifMatch
|
||||
}
|
||||
cond, ok := buildDeleteCondition(reqWith(h))
|
||||
if ok != tc.wantOk {
|
||||
t.Errorf("%s: ok=%v want %v", tc.name, ok, tc.wantOk)
|
||||
continue
|
||||
}
|
||||
if ok && (cond.Kind != tc.want || cond.Etag != tc.wantTag) {
|
||||
t.Errorf("%s: got kind=%v etag=%q want kind=%v etag=%q", tc.name, cond.Kind, cond.Etag, tc.want, tc.wantTag)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -96,6 +96,11 @@ type S3ApiServer struct {
|
||||
stsHandlers *STSHandlers // STS HTTP handlers for AssumeRoleWithWebIdentity
|
||||
cipher bool // encrypt data on volume servers
|
||||
newObjectWriteLock func(bucket, object string) objectWriteLock
|
||||
// objectWriteLockClient holds the lock-ring view used to route an object's
|
||||
// metadata writes to its owner filer (route-by-key), so that owner's local
|
||||
// per-path lock serializes them without acquiring a distributed lock. Nil
|
||||
// when no filers are configured.
|
||||
objectWriteLockClient *cluster.LockClient
|
||||
// Shared ReaderCache used by the S3 GET streaming path. It lives for the
|
||||
// lifetime of the server so that concurrent and repeat reads share a
|
||||
// single in-flight download per chunk, and so that no per-request
|
||||
@@ -266,6 +271,7 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
|
||||
|
||||
if len(option.Filers) > 0 {
|
||||
objectWriteLockClient := cluster.NewLockClient(option.GrpcDialOption, option.Filers[0])
|
||||
s3ApiServer.objectWriteLockClient = objectWriteLockClient
|
||||
// Mirror the master's lock-ring view so each object lock dials the key's
|
||||
// primary filer directly instead of forwarding through the seed filer.
|
||||
// The masterClient already filters updates to this server's filer group.
|
||||
|
||||
@@ -441,9 +441,35 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
|
||||
|
||||
glog.V(4).InfofCtx(ctx, "DeleteEntry %v", req)
|
||||
|
||||
ctx, eventSink := filer.WithMetadataEventSink(ctx)
|
||||
err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures, req.IfNotModifiedAfter)
|
||||
fullpath := util.JoinPath(req.Directory, req.Name)
|
||||
resp = &filer_pb.DeleteEntryResponse{}
|
||||
|
||||
// A single-entry delete serializes against concurrent mutations to the same
|
||||
// path so a conditional delete's check and the removal are atomic. Recursive
|
||||
// deletes span many paths and keep the prior unlocked behavior.
|
||||
if !req.IsRecursive {
|
||||
pathLock := fs.entryLockTable.AcquireLock("DeleteEntry", fullpath, util.ExclusiveLock)
|
||||
defer fs.entryLockTable.ReleaseLock(fullpath, pathLock)
|
||||
|
||||
if req.Condition != nil && req.Condition.Kind != filer_pb.WriteCondition_NONE {
|
||||
current, findErr := fs.filer.FindEntry(ctx, fullpath)
|
||||
if findErr != nil && findErr != filer_pb.ErrNotFound {
|
||||
return &filer_pb.DeleteEntryResponse{}, fmt.Errorf("DeleteEntry condition check %s: %w", fullpath, findErr)
|
||||
}
|
||||
if findErr == filer_pb.ErrNotFound {
|
||||
current = nil
|
||||
}
|
||||
if !writeConditionSatisfied(req.Condition, current) {
|
||||
glog.V(3).InfofCtx(ctx, "DeleteEntry %s: precondition %v failed", fullpath, req.Condition.Kind)
|
||||
resp.Error = "precondition failed"
|
||||
resp.ErrorCode = filer_pb.FilerError_PRECONDITION_FAILED
|
||||
return resp, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ctx, eventSink := filer.WithMetadataEventSink(ctx)
|
||||
err = fs.filer.DeleteEntryMetaAndData(ctx, fullpath, req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures, req.IfNotModifiedAfter)
|
||||
if err != nil && err != filer_pb.ErrNotFound {
|
||||
resp.Error = err.Error()
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user