diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index b941f6af9..afbf54bbd 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -352,6 +352,17 @@ func (s3a *S3ApiServer) withObjectWriteLock(bucket, object string, preconditionF return fn() } +// putFinalize folds an object write's finalize into its create. On the routed +// path its mutations ride in the entry's PUT transaction under lockKey, committing +// atomically (e.g. the .versions pointer flip); off the ring afterCreate does the +// equivalent under the object write lock. A finalize with no routed form (suspended +// IsLatest fixups) carries no mutations and runs only via afterCreate. putToFiler treats a nil *putFinalize as "no finalize step", and after a successful routed PUT that bundled mutations it does not call afterCreate. NOTE(review): lockKey is forwarded unchecked whenever mutations is non-empty — confirm every producer that sets mutations also sets lockKey. +type putFinalize struct { + lockKey string // lock key for the routed transaction; only read when mutations is non-empty + mutations []*filer_pb.ObjectMutation // sent after the PUT in the same routed transaction + afterCreate func(entry *filer_pb.Entry) s3err.ErrorCode // post-create hook; called with the created entry +} + // putToFiler writes one chunk of object bytes (a full PutObject body, a // single MPU part, a copy-part destination). lifecycleTTLSec is non-zero // only for top-level PutObject paths where the lifecycle XML's @@ -359,7 +370,7 @@ func (s3a *S3ApiServer) withObjectWriteLock(bucket, object string, preconditionF // pass 0 because their own keys aren't the user-visible object the rule // targets and a part write would otherwise bind a TTL clock starting // before CompleteMultipartUpload. 
-func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader io.Reader, bucket string, object string, partNumber int, lifecycleTTLSec int32, afterCreate func(entry *filer_pb.Entry) s3err.ErrorCode, uniqueWritePath bool) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) { +func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader io.Reader, bucket string, object string, partNumber int, lifecycleTTLSec int32, finalize *putFinalize, uniqueWritePath bool) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) { // NEW OPTIMIZATION: Write directly to volume servers, bypassing filer proxy // This eliminates the filer proxy overhead for PUT operations // Note: filePath is now passed directly instead of URL (no parsing needed) @@ -819,8 +830,8 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader return filerErrorToS3Error(createErr) } entryCreated = true - if afterCreate != nil { - if afterCreateCode := afterCreate(entry); afterCreateCode != s3err.ErrNone { + if finalize != nil && finalize.afterCreate != nil { + if afterCreateCode := finalize.afterCreate(entry); afterCreateCode != s3err.ErrNone { rollbackErr = s3a.rmObject(path.Dir(filePath), path.Base(filePath), true, false) if rollbackErr != nil { glog.Errorf("putToFiler: failed to rollback created entry for %s after post-create error: %v", filePath, rollbackErr) @@ -833,15 +844,19 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader return s3err.ErrNone } - // Route the create to the object's owner filer, whose per-path lock - // serializes it, then run afterCreate (e.g. a versioned finalize that routes - // itself). Conditional/object-lock/non-reducible cases fall back to the - // distributed lock. + // Route the create to the object's owner filer (its per-path lock serializes + // it); conditional/object-lock/non-reducible cases fall back to the lock. 
var createCode s3err.ErrorCode routed := false if owner := s3a.routableWriteOwner(bucket, object); owner != "" { if cond, ok := routeWriteCondition(r, uniqueWritePath); ok { - resp, err := s3a.routedPut(owner, s3a.objectRouteKey(bucket, object), filePath, entry, cond) + // Routed mutations ride in the PUT's transaction (committing atomically), + // so lockKey is the object path they carry, not the version file path. + lockKey, finalizeMutations := filePath, []*filer_pb.ObjectMutation(nil) + if finalize != nil && len(finalize.mutations) > 0 { + lockKey, finalizeMutations = finalize.lockKey, finalize.mutations + } + resp, err := s3a.routedPut(owner, s3a.objectRouteKey(bucket, object), lockKey, filePath, entry, cond, finalizeMutations) switch { case err != nil: glog.Warningf("putToFiler: routed PUT to %s failed for %s, falling back to lock: %v", owner, filePath, err) @@ -852,8 +867,10 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader glog.Warningf("putToFiler: routed PUT to %s returned %q for %s, falling back to lock", owner, resp.Error, filePath) default: entryCreated, routed, createCode = true, true, s3err.ErrNone - if afterCreate != nil { - createCode = afterCreate(entry) + // Bundled mutations already finalized in the transaction above; + // a finalize with none (suspended versioning) runs off the lock. + if len(finalizeMutations) == 0 && finalize != nil && finalize.afterCreate != nil { + createCode = finalize.afterCreate(entry) } } } @@ -1300,22 +1317,19 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob } } - // Upload the file using putToFiler - this will create the file with version metadata. - // Versioned/suspended bucket → resolver returns 0 by construction; - // pass 0 directly so the path is explicit at the call site. 
- // - // Clear the prior latest-version pointer (and stamp the displaced - // entry with NoncurrentSinceNs) inside the afterCreate callback so - // it runs while withObjectWriteLock is still held in putToFiler. - // Doing it after putToFiler returns would race a concurrent PUT - // promoting a newer latest, which we'd then incorrectly wipe. - etag, errCode, sseMetadata = s3a.putToFiler(r, filePath, body, bucket, normalizedObject, 1, 0, func(_ *filer_pb.Entry) s3err.ErrorCode { - if err := s3a.updateIsLatestFlagsForSuspendedVersioning(bucket, normalizedObject); err != nil { - // Best-effort: a stale IsLatest flag is recoverable on the - // next list-versions resync, so don't fail the PUT. - glog.Warningf("putSuspendedVersioningObject: failed to update IsLatest flags: %v", err) - } - return s3err.ErrNone + // Versioned/suspended bucket → resolver returns 0; pass it directly. + // afterCreate clears the prior latest pointer and stamps the displaced version + // with NoncurrentSinceNs — off-ring under the write lock, routed off-lock after + // the PUT. Best-effort either way: a stale flag self-heals on the next list. + etag, errCode, sseMetadata = s3a.putToFiler(r, filePath, body, bucket, normalizedObject, 1, 0, &putFinalize{ + afterCreate: func(_ *filer_pb.Entry) s3err.ErrorCode { + if err := s3a.updateIsLatestFlagsForSuspendedVersioning(bucket, normalizedObject); err != nil { + // Best-effort: a stale IsLatest flag is recoverable on the + // next list-versions resync, so don't fail the PUT. + glog.Warningf("putSuspendedVersioningObject: failed to update IsLatest flags: %v", err) + } + return s3err.ErrNone + }, }, false) if errCode != s3err.ErrNone { glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode) @@ -1483,7 +1497,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin // directly — versioned objects sit on regular volumes and the // lifecycle worker handles their expiration. 
etag, errCode, sseMetadata = s3a.putToFiler(r, versionFilePath, body, bucket, normalizedObject, 1, 0, - s3a.versionedAfterCreate(bucket, normalizedObject, versionId, versionFileName, useInvertedFormat), true) + s3a.versionedFinalize(bucket, normalizedObject, versionId, versionFileName, useInvertedFormat), true) if errCode != s3err.ErrNone { glog.Errorf("putVersionedObject: failed to upload version: %v", errCode) return "", "", errCode, SSEResponseMetadata{} diff --git a/weed/s3api/s3api_object_routed_write.go b/weed/s3api/s3api_object_routed_write.go index 1a4b072ff..af0b0b19f 100644 --- a/weed/s3api/s3api_object_routed_write.go +++ b/weed/s3api/s3api_object_routed_write.go @@ -158,19 +158,25 @@ func (s3a *S3ApiServer) objectTxnOnFiler(owner pb.ServerAddress, req *filer_pb.O return resp, err } -// routedPut writes an object entry as a one-mutation ObjectTransaction on the -// owner filer. lock_key is the object's full path so the transaction shares the -// per-path lock with a concurrent create or delete of the same key. -func (s3a *S3ApiServer) routedPut(owner pb.ServerAddress, routeKey, filePath string, entry *filer_pb.Entry, cond *filer_pb.WriteCondition) (*filer_pb.ObjectTransactionResponse, error) { +// routedPut writes an entry as a PUT, optionally followed by finalize mutations, +// as one ObjectTransaction applied in order under lockKey on the owner filer. +// lockKey is normally the entry's own path; a versioned add instead passes the +// object path plus a RECOMPUTE_LATEST finalize, so the version's PUT and its +// .versions pointer flip commit atomically (the recompute scans .versions/ after +// the PUT and sees the new version). A nil or empty finalize yields a PUT-only transaction; the PUT's Directory is always path.Dir(filePath), independent of lockKey. 
+func (s3a *S3ApiServer) routedPut(owner pb.ServerAddress, routeKey, lockKey, filePath string, entry *filer_pb.Entry, cond *filer_pb.WriteCondition, finalize []*filer_pb.ObjectMutation) (*filer_pb.ObjectTransactionResponse, error) { + mutations := make([]*filer_pb.ObjectMutation, 0, 1+len(finalize)) // one slot for the PUT plus the finalize steps + mutations = append(mutations, &filer_pb.ObjectMutation{ + Type: filer_pb.ObjectMutation_PUT, + Directory: path.Dir(filePath), + Entry: entry, + }) + mutations = append(mutations, finalize...) // finalize steps follow the PUT, in the caller's order return s3a.objectTxnOnFiler(owner, &filer_pb.ObjectTransactionRequest{ - LockKey: filePath, + LockKey: lockKey, RouteKey: routeKey, Condition: cond, - Mutations: []*filer_pb.ObjectMutation{{ - Type: filer_pb.ObjectMutation_PUT, - Directory: path.Dir(filePath), - Entry: entry, - }}, + Mutations: mutations, }) } @@ -193,7 +199,8 @@ func (s3a *S3ApiServer) routedMkFile(owner pb.ServerAddress, routeKey, parentDir if fn != nil { fn(entry) } - resp, err := s3a.routedPut(owner, routeKey, parentDir+"/"+name, entry, nil) + filePath := parentDir + "/" + name + resp, err := s3a.routedPut(owner, routeKey, filePath, filePath, entry, nil, nil) // lockKey is the entry's own path; no condition, no finalize if err != nil { return err } diff --git a/weed/s3api/s3api_object_versioned_finalize.go b/weed/s3api/s3api_object_versioned_finalize.go index 7844e6509..ca1bd978d 100644 --- a/weed/s3api/s3api_object_versioned_finalize.go +++ b/weed/s3api/s3api_object_versioned_finalize.go @@ -170,19 +170,20 @@ func (s3a *S3ApiServer) routedDeleteNullVersion(owner pb.ServerAddress, bucket, } } -// versionedAfterCreate returns the putToFiler hook that finalizes a versioned -// write: the routed RECOMPUTE_LATEST when the owner is known, else the existing -// lock-free updateLatestVersionInDirectory. 
-func (s3a *S3ApiServer) versionedAfterCreate(bucket, object, versionId, versionFileName string, useInvertedFormat bool) func(*filer_pb.Entry) s3err.ErrorCode { - owner := s3a.objectWriteOwner(bucket, object) - return func(versionEntry *filer_pb.Entry) s3err.ErrorCode { - if owner != "" { - return s3a.routedVersionedFinalize(owner, bucket, object, useInvertedFormat) - } - if err := s3a.updateLatestVersionInDirectory(bucket, object, versionId, versionFileName, versionEntry); err != nil { - glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err) - return s3err.ErrInternalError - } - return s3err.ErrNone +// versionedFinalize flips the .versions latest pointer for a versioned PutObject: +// on the routed path RECOMPUTE_LATEST rides in the version file's PUT transaction, +// committing atomically under the object's per-path lock; off the ring +// updateLatestVersionInDirectory does it under the object write lock. The returned putFinalize carries s3a.toFilerPath(bucket, object) as lockKey and exactly one mutation; its afterCreate logs an update failure and reports ErrInternalError. 
+func (s3a *S3ApiServer) versionedFinalize(bucket, object, versionId, versionFileName string, useInvertedFormat bool) *putFinalize { + return &putFinalize{ + lockKey: s3a.toFilerPath(bucket, object), // built from bucket/object, not from the version file name + mutations: []*filer_pb.ObjectMutation{s3a.latestPointerRecompute(bucket, object, useInvertedFormat, "", true)}, // NOTE(review): meaning of the trailing "", true arguments is not visible here — confirm against latestPointerRecompute + afterCreate: func(versionEntry *filer_pb.Entry) s3err.ErrorCode { + if err := s3a.updateLatestVersionInDirectory(bucket, object, versionId, versionFileName, versionEntry); err != nil { + glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err) + return s3err.ErrInternalError + } + return s3err.ErrNone + }, } } diff --git a/weed/server/filer_grpc_server_object_txn_test.go b/weed/server/filer_grpc_server_object_txn_test.go index 11502be64..098917d7c 100644 --- a/weed/server/filer_grpc_server_object_txn_test.go +++ b/weed/server/filer_grpc_server_object_txn_test.go @@ -496,6 +496,59 @@ func TestObjectTransactionRecomputeDemoteAndAttrs(t *testing.T) { } } +// A versioned add is one 
transaction: the PUT writes the new version file and +// the RECOMPUTE_LATEST that follows, under the same lock, scans the directory, +// sees it, flips the .versions pointer to it, and demotes the prior latest. This +// is what lets putVersionedObject commit the version and its pointer atomically. Fixture: .versions starts out pointing at v1.ver, and the transaction locks on the object path /buckets/b/obj. +func TestObjectTransactionPutThenRecomputeLatest(t *testing.T) { + t0 := time.Unix(1700000000, 0) + t1 := time.Unix(1700000100, 0) + fs, store := newTxnTestServer(map[string]*filer.Entry{ + "/buckets/b/obj/.versions": { + Attr: filer.Attr{Inode: 2, Mtime: t0, Crtime: t0, Mode: 0755 | (1 << 31)}, + Extended: map[string][]byte{"latestName": []byte("v1.ver"), "latestVid": []byte("v1")}, + }, + "/buckets/b/obj/.versions/v1.ver": { + Attr: filer.Attr{Inode: 10, Mtime: t0, Crtime: t0, Mode: 0644, FileSize: 100}, + Extended: map[string][]byte{"vid": []byte("v1")}, + }, + }) + + resp, err := fs.ObjectTransaction(context.Background(), &filer_pb.ObjectTransactionRequest{ + LockKey: "/buckets/b/obj", + Mutations: []*filer_pb.ObjectMutation{ + {Type: filer_pb.ObjectMutation_PUT, Directory: "/buckets/b/obj/.versions", Entry: &filer_pb.Entry{ + Name: "v2.ver", + Attributes: &filer_pb.FuseAttributes{Mtime: t1.Unix(), FileMode: 0644, Inode: 11, FileSize: 250}, + Extended: map[string][]byte{"vid": []byte("v2")}, + }}, + {Type: filer_pb.ObjectMutation_RECOMPUTE_LATEST, Directory: "/buckets/b/obj", Name: ".versions", + Recompute: &filer_pb.Recompute{ + ScanDir: "/buckets/b/obj/.versions", + Descending: true, + CopyExtended: map[string]string{"latestVid": "vid"}, + NameToKey: "latestName", + DemoteKey: "noncurrentSince", + DemoteValue: []byte("999"), + }}, + }, + }) + if err != nil || resp.Error != "" { // NOTE(review): when err != nil, resp may be nil and resp.Error in the Fatalf below would panic — confirm, or use a nil-safe accessor + t.Fatalf("txn failed: err=%v resp=%q", err, resp.Error) + } + + if _, ok := store.entries["/buckets/b/obj/.versions/v2.ver"]; !ok { + t.Fatalf("the PUT should have created the new version") + } + ptr := store.entries["/buckets/b/obj/.versions"].Extended // NOTE(review): no ok-check; presumably entries holds pointers, so a missing .versions entry would panic instead of failing with a message — confirm + if string(ptr["latestName"]) != 
"v2.ver" || string(ptr["latestVid"]) != "v2" { + t.Fatalf("pointer should flip to the just-PUT version, got name=%s vid=%s", ptr["latestName"], ptr["latestVid"]) + } + if got := store.entries["/buckets/b/obj/.versions/v1.ver"].Extended["noncurrentSince"]; string(got) != "999" { // "999" is the DemoteValue passed in the Recompute above + t.Errorf("prior latest v1.ver should be demoted, noncurrentSince=%q want 999", got) + } +} + // A version-specific delete locks the object (condition_key checks WORM on the // version), recomputes the pointer excluding the version (repoint-before-delete), // then deletes it. A legal-hold guard blocks the delete and preserves the entry.