mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
s3: route metadata-only self-copy off the distributed lock (#9638)
A non-versioned metadata-only self-copy (CopyObject with source == destination and the REPLACE directive) is a read-modify-write of one entry, which is why it held the distributed lock. It now routes to the owner as a serialized PATCH_EXTENDED: the owner merges the new managed metadata (setting the replacements and deleting the dropped keys) onto a fresh read of the entry under its per-path lock, and bumps mtime. As a result, a concurrent change to non-managed keys (legal hold, retention, version id) is preserved instead of clobbered. PATCH_EXTENDED gains touch_mtime for the mtime bump. Versioned and suspended self-copies, which create a new version (already routed via the copy finalize), and the no-owner bootstrap keep the lock.
This commit is contained in:
@@ -316,6 +316,7 @@ message ObjectMutation {
|
||||
Recompute recompute = 9; // RECOMPUTE_LATEST parameters
|
||||
bool set_content = 10; // PATCH_EXTENDED: replace Entry.content with content
|
||||
bytes content = 11; // PATCH_EXTENDED: new Entry.content when set_content
|
||||
bool touch_mtime = 12; // PATCH_EXTENDED: set the entry's Mtime to now (e.g. a metadata-replace copy)
|
||||
}
|
||||
|
||||
// Recompute re-derives a pointer entry (directory/name on the mutation) from the
|
||||
|
||||
@@ -316,6 +316,7 @@ message ObjectMutation {
|
||||
Recompute recompute = 9; // RECOMPUTE_LATEST parameters
|
||||
bool set_content = 10; // PATCH_EXTENDED: replace Entry.content with content
|
||||
bytes content = 11; // PATCH_EXTENDED: new Entry.content when set_content
|
||||
bool touch_mtime = 12; // PATCH_EXTENDED: set the entry's Mtime to now (e.g. a metadata-replace copy)
|
||||
}
|
||||
|
||||
// Recompute re-derives a pointer entry (directory/name on the mutation) from the
|
||||
|
||||
@@ -1430,6 +1430,7 @@ type ObjectMutation struct {
|
||||
Recompute *Recompute `protobuf:"bytes,9,opt,name=recompute,proto3" json:"recompute,omitempty"` // RECOMPUTE_LATEST parameters
|
||||
SetContent bool `protobuf:"varint,10,opt,name=set_content,json=setContent,proto3" json:"set_content,omitempty"` // PATCH_EXTENDED: replace Entry.content with content
|
||||
Content []byte `protobuf:"bytes,11,opt,name=content,proto3" json:"content,omitempty"` // PATCH_EXTENDED: new Entry.content when set_content
|
||||
TouchMtime bool `protobuf:"varint,12,opt,name=touch_mtime,json=touchMtime,proto3" json:"touch_mtime,omitempty"` // PATCH_EXTENDED: set the entry's Mtime to now (e.g. a metadata-replace copy)
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
@@ -1541,6 +1542,13 @@ func (x *ObjectMutation) GetContent() []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *ObjectMutation) GetTouchMtime() bool {
|
||||
if x != nil {
|
||||
return x.TouchMtime
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Recompute re-derives a pointer entry (directory/name on the mutation) from the
|
||||
// current contents of a scanned directory, atomically under the transaction's
|
||||
// lock. It is mechanical: the filer picks the child that sorts first or last by
|
||||
@@ -6398,7 +6406,7 @@ const file_filer_proto_rawDesc = "" +
|
||||
"\x13IF_UNMODIFIED_SINCE\x10\x05\x12\x15\n" +
|
||||
"\x11IF_MODIFIED_SINCE\x10\x06\x12\x19\n" +
|
||||
"\x15IF_EXTENDED_NOT_EQUAL\x10\a\x12\x1c\n" +
|
||||
"\x18IF_EXTENDED_TIME_ELAPSED\x10\b\"\xd1\x04\n" +
|
||||
"\x18IF_EXTENDED_TIME_ELAPSED\x10\b\"\xf2\x04\n" +
|
||||
"\x0eObjectMutation\x121\n" +
|
||||
"\x04type\x18\x01 \x01(\x0e2\x1d.filer_pb.ObjectMutation.TypeR\x04type\x12\x1c\n" +
|
||||
"\tdirectory\x18\x02 \x01(\tR\tdirectory\x12\x12\n" +
|
||||
@@ -6412,7 +6420,9 @@ const file_filer_proto_rawDesc = "" +
|
||||
"\vset_content\x18\n" +
|
||||
" \x01(\bR\n" +
|
||||
"setContent\x12\x18\n" +
|
||||
"\acontent\x18\v \x01(\fR\acontent\x1a>\n" +
|
||||
"\acontent\x18\v \x01(\fR\acontent\x12\x1f\n" +
|
||||
"\vtouch_mtime\x18\f \x01(\bR\n" +
|
||||
"touchMtime\x1a>\n" +
|
||||
"\x10SetExtendedEntry\x12\x10\n" +
|
||||
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
|
||||
"\x05value\x18\x02 \x01(\fR\x05value:\x028\x01\"E\n" +
|
||||
|
||||
@@ -158,9 +158,12 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
|
||||
if sameDestination && (replaceMeta || replaceTagging) && s3a.canUseMetadataOnlySelfCopy(entry, r, dstBucket, dstObject) {
|
||||
var dstVersionId string
|
||||
var etag string
|
||||
updateCode := s3a.withObjectWriteLock(dstBucket, dstObject, func() s3err.ErrorCode {
|
||||
return s3a.checkConditionalHeaders(r, dstBucket, dstObject)
|
||||
}, func() s3err.ErrorCode {
|
||||
// A non-versioned in-place metadata replace routes to the owner as a
|
||||
// serialized PATCH (off the distributed lock); versioned/suspended (which
|
||||
// create a new version) and the no-owner bootstrap keep the lock.
|
||||
owner := s3a.objectWriteOwner(dstBucket, dstObject)
|
||||
routeInPlace := owner != "" && dstVersioningState == ""
|
||||
selfCopyBody := func() s3err.ErrorCode {
|
||||
currentEntry, currentErr := s3a.resolveCopySourceEntry(srcBucket, srcObject, srcVersionId, srcVersioningState)
|
||||
if currentErr != nil || currentEntry.IsDirectory {
|
||||
return s3err.ErrInvalidCopySource
|
||||
@@ -168,26 +171,41 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
|
||||
if errCode := s3a.validateConditionalCopyHeaders(r, currentEntry); errCode != s3err.ErrNone {
|
||||
return errCode
|
||||
}
|
||||
|
||||
updatedEntry := cloneProtoEntry(currentEntry)
|
||||
updatedMetadata, metadataErr := processMetadataBytes(r.Header, updatedEntry.Extended, replaceMeta, replaceTagging)
|
||||
currentErr = metadataErr
|
||||
if currentErr != nil {
|
||||
glog.Errorf("CopyObjectHandler ValidateTags error %s: %v", r.URL, currentErr)
|
||||
updatedMetadata, metadataErr := processMetadataBytes(r.Header, currentEntry.Extended, replaceMeta, replaceTagging)
|
||||
if metadataErr != nil {
|
||||
glog.Errorf("CopyObjectHandler ValidateTags error %s: %v", r.URL, metadataErr)
|
||||
return s3err.ErrInvalidTag
|
||||
}
|
||||
if routeInPlace {
|
||||
if err := s3a.routedMetadataReplace(owner, dstBucket, dstObject, currentEntry, updatedMetadata); err != nil {
|
||||
return filerErrorToS3Error(err)
|
||||
}
|
||||
etag = getEtagFromEntry(currentEntry)
|
||||
return s3err.ErrNone
|
||||
}
|
||||
updatedEntry := cloneProtoEntry(currentEntry)
|
||||
updatedEntry.Extended = mergeCopyMetadata(updatedEntry.Extended, updatedMetadata)
|
||||
if updatedEntry.Attributes == nil {
|
||||
updatedEntry.Attributes = &filer_pb.FuseAttributes{}
|
||||
}
|
||||
updatedEntry.Attributes.Mtime = t.Unix()
|
||||
|
||||
dstVersionId, etag, currentErr = s3a.finalizeCopyDestination(dstBucket, dstObject, dstVersioningState, updatedEntry)
|
||||
if currentErr != nil {
|
||||
return filerErrorToS3Error(currentErr)
|
||||
var finErr error
|
||||
dstVersionId, etag, finErr = s3a.finalizeCopyDestination(dstBucket, dstObject, dstVersioningState, updatedEntry)
|
||||
if finErr != nil {
|
||||
return filerErrorToS3Error(finErr)
|
||||
}
|
||||
return s3err.ErrNone
|
||||
})
|
||||
}
|
||||
var updateCode s3err.ErrorCode
|
||||
if routeInPlace {
|
||||
if updateCode = s3a.checkConditionalHeaders(r, dstBucket, dstObject); updateCode == s3err.ErrNone {
|
||||
updateCode = selfCopyBody()
|
||||
}
|
||||
} else {
|
||||
updateCode = s3a.withObjectWriteLock(dstBucket, dstObject, func() s3err.ErrorCode {
|
||||
return s3a.checkConditionalHeaders(r, dstBucket, dstObject)
|
||||
}, selfCopyBody)
|
||||
}
|
||||
if updateCode != s3err.ErrNone {
|
||||
s3err.WriteErrorResponse(w, r, updateCode)
|
||||
return
|
||||
|
||||
@@ -215,3 +215,41 @@ func (s3a *S3ApiServer) routedDelete(owner pb.ServerAddress, bucket, object stri
|
||||
}},
|
||||
})
|
||||
}
|
||||
|
||||
// routedMetadataReplace applies a metadata-only self-copy (REPLACE directive) to
|
||||
// an existing object in place via a routed PATCH_EXTENDED. The owner merges the
|
||||
// new managed metadata onto a fresh read of the entry under its per-path lock —
|
||||
// so a concurrent change to non-managed keys (legal hold, retention, version id)
|
||||
// is preserved rather than clobbered by a whole-entry rewrite — and bumps mtime.
|
||||
// updatedMetadata is the full managed-metadata set (processMetadataBytes); the
|
||||
// delete list is the managed keys the replace dropped.
|
||||
func (s3a *S3ApiServer) routedMetadataReplace(owner pb.ServerAddress, bucket, object string, current *filer_pb.Entry, updatedMetadata map[string][]byte) error {
|
||||
fullpath := util.NewFullPath(s3a.bucketDir(bucket), object)
|
||||
dir, name := fullpath.DirAndName()
|
||||
var del []string
|
||||
for k := range current.Extended {
|
||||
if isManagedCopyMetadataKey(k) {
|
||||
if _, keep := updatedMetadata[k]; !keep {
|
||||
del = append(del, k)
|
||||
}
|
||||
}
|
||||
}
|
||||
resp, err := s3a.objectTxnOnFiler(owner, &filer_pb.ObjectTransactionRequest{
|
||||
LockKey: string(fullpath),
|
||||
Mutations: []*filer_pb.ObjectMutation{{
|
||||
Type: filer_pb.ObjectMutation_PATCH_EXTENDED,
|
||||
Directory: dir,
|
||||
Name: name,
|
||||
SetExtended: updatedMetadata,
|
||||
DeleteExtended: del,
|
||||
TouchMtime: true,
|
||||
}},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if resp.Error != "" {
|
||||
return fmt.Errorf("routed metadata replace %s/%s: %s", bucket, object, resp.Error)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -379,6 +379,9 @@ func (fs *FilerServer) applyObjectMutation(ctx context.Context, m *filer_pb.Obje
|
||||
newEntry.FileSize = uint64(len(m.Content))
|
||||
}
|
||||
}
|
||||
if m.TouchMtime {
|
||||
newEntry.Attr.Mtime = time.Now()
|
||||
}
|
||||
if err := fs.filer.UpdateEntry(ctx, oldEntry, newEntry); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -560,3 +560,38 @@ func TestObjectTransactionVersionDeleteWithWorm(t *testing.T) {
|
||||
t.Errorf("pointer should recompute to v_b/v2, got name=%s vid=%s", ptr["latestName"], ptr["latestVid"])
|
||||
}
|
||||
}
|
||||
|
||||
// PATCH_EXTENDED with touch_mtime bumps the entry's Mtime (a metadata-replace
|
||||
// copy) while merging Extended.
|
||||
func TestObjectTransactionPatchTouchMtime(t *testing.T) {
|
||||
old := time.Unix(1600000000, 0)
|
||||
fs, store := newTxnTestServer(map[string]*filer.Entry{
|
||||
"/buckets/b/obj": {
|
||||
FullPath: "/buckets/b/obj",
|
||||
Attr: filer.Attr{Inode: 1, Mtime: old, Crtime: old, Mode: 0644},
|
||||
Extended: map[string][]byte{"X-Amz-Meta-old": []byte("1")},
|
||||
},
|
||||
})
|
||||
resp, err := fs.ObjectTransaction(context.Background(), &filer_pb.ObjectTransactionRequest{
|
||||
LockKey: "/buckets/b/obj",
|
||||
Mutations: []*filer_pb.ObjectMutation{{
|
||||
Type: filer_pb.ObjectMutation_PATCH_EXTENDED, Directory: "/buckets/b", Name: "obj",
|
||||
SetExtended: map[string][]byte{"X-Amz-Meta-new": []byte("2")},
|
||||
DeleteExtended: []string{"X-Amz-Meta-old"},
|
||||
TouchMtime: true,
|
||||
}},
|
||||
})
|
||||
if err != nil || resp.Error != "" {
|
||||
t.Fatalf("patch failed: err=%v resp=%q", err, resp.Error)
|
||||
}
|
||||
e := store.entries["/buckets/b/obj"]
|
||||
if !e.Attr.Mtime.After(old) {
|
||||
t.Errorf("touch_mtime should bump Mtime past %v, got %v", old, e.Attr.Mtime)
|
||||
}
|
||||
if _, ok := e.Extended["X-Amz-Meta-old"]; ok {
|
||||
t.Errorf("old meta should be deleted")
|
||||
}
|
||||
if string(e.Extended["X-Amz-Meta-new"]) != "2" {
|
||||
t.Errorf("new meta not set: %v", e.Extended)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user