diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 7a8fbac24..c7fa357fe 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -316,6 +316,7 @@ message ObjectMutation { Recompute recompute = 9; // RECOMPUTE_LATEST parameters bool set_content = 10; // PATCH_EXTENDED: replace Entry.content with content bytes content = 11; // PATCH_EXTENDED: new Entry.content when set_content + bool touch_mtime = 12; // PATCH_EXTENDED: set the entry's Mtime to now (e.g. a metadata-replace copy) } // Recompute re-derives a pointer entry (directory/name on the mutation) from the diff --git a/weed/pb/filer.proto b/weed/pb/filer.proto index b29f72aa9..5f3d2f407 100644 --- a/weed/pb/filer.proto +++ b/weed/pb/filer.proto @@ -316,6 +316,7 @@ message ObjectMutation { Recompute recompute = 9; // RECOMPUTE_LATEST parameters bool set_content = 10; // PATCH_EXTENDED: replace Entry.content with content bytes content = 11; // PATCH_EXTENDED: new Entry.content when set_content + bool touch_mtime = 12; // PATCH_EXTENDED: set the entry's Mtime to now (e.g. a metadata-replace copy) } // Recompute re-derives a pointer entry (directory/name on the mutation) from the diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go index 6f494c701..555958632 100644 --- a/weed/pb/filer_pb/filer.pb.go +++ b/weed/pb/filer_pb/filer.pb.go @@ -1430,6 +1430,7 @@ type ObjectMutation struct { Recompute *Recompute `protobuf:"bytes,9,opt,name=recompute,proto3" json:"recompute,omitempty"` // RECOMPUTE_LATEST parameters SetContent bool `protobuf:"varint,10,opt,name=set_content,json=setContent,proto3" json:"set_content,omitempty"` // PATCH_EXTENDED: replace Entry.content with content Content []byte `protobuf:"bytes,11,opt,name=content,proto3" json:"content,omitempty"` // PATCH_EXTENDED: new Entry.content when set_content + TouchMtime bool `protobuf:"varint,12,opt,name=touch_mtime,json=touchMtime,proto3" json:"touch_mtime,omitempty"` // PATCH_EXTENDED: set the entry's Mtime to now (e.g. a metadata-replace copy) unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -1541,6 +1542,13 @@ func (x *ObjectMutation) GetContent() []byte { return nil } +func (x *ObjectMutation) GetTouchMtime() bool { + if x != nil { + return x.TouchMtime + } + return false +} + // Recompute re-derives a pointer entry (directory/name on the mutation) from the // current contents of a scanned directory, atomically under the transaction's // lock. It is mechanical: the filer picks the child that sorts first or last by @@ -6398,7 +6406,7 @@ const file_filer_proto_rawDesc = "" + "\x13IF_UNMODIFIED_SINCE\x10\x05\x12\x15\n" + "\x11IF_MODIFIED_SINCE\x10\x06\x12\x19\n" + "\x15IF_EXTENDED_NOT_EQUAL\x10\a\x12\x1c\n" + - "\x18IF_EXTENDED_TIME_ELAPSED\x10\b\"\xd1\x04\n" + + "\x18IF_EXTENDED_TIME_ELAPSED\x10\b\"\xf2\x04\n" + "\x0eObjectMutation\x121\n" + "\x04type\x18\x01 \x01(\x0e2\x1d.filer_pb.ObjectMutation.TypeR\x04type\x12\x1c\n" + "\tdirectory\x18\x02 \x01(\tR\tdirectory\x12\x12\n" + @@ -6412,7 +6420,9 @@ const file_filer_proto_rawDesc = "" + "\vset_content\x18\n" + " \x01(\bR\n" + "setContent\x12\x18\n" + - "\acontent\x18\v \x01(\fR\acontent\x1a>\n" + + "\acontent\x18\v \x01(\fR\acontent\x12\x1f\n" + + "\vtouch_mtime\x18\f \x01(\bR\n" + + "touchMtime\x1a>\n" + "\x10SetExtendedEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + "\x05value\x18\x02 \x01(\fR\x05value:\x028\x01\"E\n" + diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go index 23260dab3..72942c34c 100644 --- a/weed/s3api/s3api_object_handlers_copy.go +++ b/weed/s3api/s3api_object_handlers_copy.go @@ -158,9 +158,12 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request if sameDestination && (replaceMeta || replaceTagging) && s3a.canUseMetadataOnlySelfCopy(entry, r, dstBucket, dstObject) { var dstVersionId string var etag string - updateCode := s3a.withObjectWriteLock(dstBucket, dstObject, func() s3err.ErrorCode { - return s3a.checkConditionalHeaders(r, dstBucket, dstObject) - }, func() s3err.ErrorCode { + // A non-versioned in-place metadata replace routes to the owner as a + // serialized PATCH (off the distributed lock); versioned/suspended (which + // create a new version) and the no-owner bootstrap keep the lock. + owner := s3a.objectWriteOwner(dstBucket, dstObject) + routeInPlace := owner != "" && dstVersioningState == "" + selfCopyBody := func() s3err.ErrorCode { currentEntry, currentErr := s3a.resolveCopySourceEntry(srcBucket, srcObject, srcVersionId, srcVersioningState) if currentErr != nil || currentEntry.IsDirectory { return s3err.ErrInvalidCopySource @@ -168,26 +171,41 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request if errCode := s3a.validateConditionalCopyHeaders(r, currentEntry); errCode != s3err.ErrNone { return errCode } - - updatedEntry := cloneProtoEntry(currentEntry) - updatedMetadata, metadataErr := processMetadataBytes(r.Header, updatedEntry.Extended, replaceMeta, replaceTagging) - currentErr = metadataErr - if currentErr != nil { - glog.Errorf("CopyObjectHandler ValidateTags error %s: %v", r.URL, currentErr) + updatedMetadata, metadataErr := processMetadataBytes(r.Header, currentEntry.Extended, replaceMeta, replaceTagging) + if metadataErr != nil { + glog.Errorf("CopyObjectHandler ValidateTags error %s: %v", r.URL, metadataErr) return s3err.ErrInvalidTag } + if routeInPlace { + if err := s3a.routedMetadataReplace(owner, dstBucket, dstObject, currentEntry, updatedMetadata); err != nil { + return filerErrorToS3Error(err) + } + etag = getEtagFromEntry(currentEntry) + return s3err.ErrNone + } + updatedEntry := cloneProtoEntry(currentEntry) updatedEntry.Extended = mergeCopyMetadata(updatedEntry.Extended, updatedMetadata) if updatedEntry.Attributes == nil { updatedEntry.Attributes = &filer_pb.FuseAttributes{} } updatedEntry.Attributes.Mtime = t.Unix() - - dstVersionId, etag, currentErr = s3a.finalizeCopyDestination(dstBucket, dstObject, dstVersioningState, updatedEntry) - if currentErr != nil { - return filerErrorToS3Error(currentErr) + var finErr error + dstVersionId, etag, finErr = s3a.finalizeCopyDestination(dstBucket, dstObject, dstVersioningState, updatedEntry) + if finErr != nil { + return filerErrorToS3Error(finErr) } return s3err.ErrNone - }) + } + var updateCode s3err.ErrorCode + if routeInPlace { + if updateCode = s3a.checkConditionalHeaders(r, dstBucket, dstObject); updateCode == s3err.ErrNone { + updateCode = selfCopyBody() + } + } else { + updateCode = s3a.withObjectWriteLock(dstBucket, dstObject, func() s3err.ErrorCode { + return s3a.checkConditionalHeaders(r, dstBucket, dstObject) + }, selfCopyBody) + } if updateCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, updateCode) return diff --git a/weed/s3api/s3api_object_routed_write.go b/weed/s3api/s3api_object_routed_write.go index 95340a00a..4febcc4a5 100644 --- a/weed/s3api/s3api_object_routed_write.go +++ b/weed/s3api/s3api_object_routed_write.go @@ -215,3 +215,41 @@ func (s3a *S3ApiServer) routedDelete(owner pb.ServerAddress, bucket, object stri }}, }) } + +// routedMetadataReplace applies a metadata-only self-copy (REPLACE directive) to +// an existing object in place via a routed PATCH_EXTENDED. The owner merges the +// new managed metadata onto a fresh read of the entry under its per-path lock — +// so a concurrent change to non-managed keys (legal hold, retention, version id) +// is preserved rather than clobbered by a whole-entry rewrite — and bumps mtime. +// updatedMetadata is the full managed-metadata set (processMetadataBytes); the +// delete list is the managed keys the replace dropped. +func (s3a *S3ApiServer) routedMetadataReplace(owner pb.ServerAddress, bucket, object string, current *filer_pb.Entry, updatedMetadata map[string][]byte) error { + fullpath := util.NewFullPath(s3a.bucketDir(bucket), object) + dir, name := fullpath.DirAndName() + var del []string + for k := range current.Extended { + if isManagedCopyMetadataKey(k) { + if _, keep := updatedMetadata[k]; !keep { + del = append(del, k) + } + } + } + resp, err := s3a.objectTxnOnFiler(owner, &filer_pb.ObjectTransactionRequest{ + LockKey: string(fullpath), + Mutations: []*filer_pb.ObjectMutation{{ + Type: filer_pb.ObjectMutation_PATCH_EXTENDED, + Directory: dir, + Name: name, + SetExtended: updatedMetadata, + DeleteExtended: del, + TouchMtime: true, + }}, + }) + if err != nil { + return err + } + if resp.Error != "" { + return fmt.Errorf("routed metadata replace %s/%s: %s", bucket, object, resp.Error) + } + return nil +} diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 2bf466882..5d9e4f5ef 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -379,6 +379,9 @@ func (fs *FilerServer) applyObjectMutation(ctx context.Context, m *filer_pb.Obje newEntry.FileSize = uint64(len(m.Content)) } } + if m.TouchMtime { + newEntry.Attr.Mtime = time.Now() + } if err := fs.filer.UpdateEntry(ctx, oldEntry, newEntry); err != nil { return err } diff --git a/weed/server/filer_grpc_server_object_txn_test.go b/weed/server/filer_grpc_server_object_txn_test.go index ddf5a42b2..05c03cacc 100644 --- a/weed/server/filer_grpc_server_object_txn_test.go +++ b/weed/server/filer_grpc_server_object_txn_test.go @@ -560,3 +560,38 @@ func TestObjectTransactionVersionDeleteWithWorm(t *testing.T) { t.Errorf("pointer should recompute to v_b/v2, got name=%s vid=%s", ptr["latestName"], ptr["latestVid"]) } } + +// PATCH_EXTENDED with touch_mtime bumps the entry's Mtime (a metadata-replace +// copy) while merging Extended. +func TestObjectTransactionPatchTouchMtime(t *testing.T) { + old := time.Unix(1600000000, 0) + fs, store := newTxnTestServer(map[string]*filer.Entry{ + "/buckets/b/obj": { + FullPath: "/buckets/b/obj", + Attr: filer.Attr{Inode: 1, Mtime: old, Crtime: old, Mode: 0644}, + Extended: map[string][]byte{"X-Amz-Meta-old": []byte("1")}, + }, + }) + resp, err := fs.ObjectTransaction(context.Background(), &filer_pb.ObjectTransactionRequest{ + LockKey: "/buckets/b/obj", + Mutations: []*filer_pb.ObjectMutation{{ + Type: filer_pb.ObjectMutation_PATCH_EXTENDED, Directory: "/buckets/b", Name: "obj", + SetExtended: map[string][]byte{"X-Amz-Meta-new": []byte("2")}, + DeleteExtended: []string{"X-Amz-Meta-old"}, + TouchMtime: true, + }}, + }) + if err != nil || resp.Error != "" { + t.Fatalf("patch failed: err=%v resp=%q", err, resp.Error) + } + e := store.entries["/buckets/b/obj"] + if !e.Attr.Mtime.After(old) { + t.Errorf("touch_mtime should bump Mtime past %v, got %v", old, e.Attr.Mtime) + } + if _, ok := e.Extended["X-Amz-Meta-old"]; ok { + t.Errorf("old meta should be deleted") + } + if string(e.Extended["X-Amz-Meta-new"]) != "2" { + t.Errorf("new meta not set: %v", e.Extended) + } +}