From 7519f68ff1054a84df486fdde5ef40a594af03ab Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 23 May 2026 10:22:52 -0700 Subject: [PATCH] s3: route versioned MPU-complete + lifecycle markers off the DLM Both flip the .versions latest pointer off the object-key lock, racing routed versioned/suspended writes (the per-path entry lock and the distributed lock don't serialize against each other). - completeMultipartUpload (versioning enabled) now routes its latest-pointer update through routedVersionedFinalize on the object's owner, carrying the precondition so the filer evaluates it atomically under the per-path lock. The completion then takes no distributed lock at all (mirroring the versioned PutObject path); only object-lock, suspended versioning, unsupported preconditions, or a missing ring view fall back to the lock. Suspended MPU only writes the main object entry (last-writer-wins, benign) and is unchanged. - The lifecycle expiration delete-marker (enabled and suspended) resolves the object owner and routes its marker pointer flip the same way. Both fall back to the lock-based updateLatestVersionInDirectory when the owner isn't known. --- weed/s3api/filer_multipart.go | 47 +++++++++++++++++++------- weed/s3api/s3api_internal_lifecycle.go | 9 +++-- 2 files changed, 42 insertions(+), 14 deletions(-) diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index 126a91589..c6454229f 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -554,14 +554,26 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId entryName, dirName := s3a.getEntryNameAndDir(input) var completionState *multipartCompletionState - // Non-versioned destination: the final write is a single CreateEntry, so route - // it (with the precondition) to the object's owner and skip the distributed - // lock. Assembly and the idempotency replay below are read-only and safe - // outside any lock. Versioned/suspended completions keep the lock path. + // Route the completion to the object's owner and skip the distributed lock + // when it can carry its own precondition. A non-versioned completion is a + // single CreateEntry; a versioning-enabled completion is a unique version + // file plus an atomic latest-pointer flip (FinalizeVersionedWrite). Both + // evaluate the precondition under the per-path lock, so the lock wrapper is + // only kept for paths that can't route: object-lock, suspended versioning, + // unsupported preconditions, or before a ring view arrives. Assembly and the + // idempotency replay below are read-only and safe outside any lock. mpuCond, mpuCondOk := buildWriteCondition(r) var nvOwner pb.ServerAddress - if vs, vErr := s3a.getVersioningState(*input.Bucket); vErr == nil && vs == "" && mpuCondOk { - nvOwner, _ = s3a.routedObjectOwner(*input.Bucket, *input.Key) + var versionedOwner pb.ServerAddress + if mpuCondOk { + if vs, vErr := s3a.getVersioningState(*input.Bucket); vErr == nil { + switch vs { + case "": + nvOwner, _ = s3a.routedObjectOwner(*input.Bucket, *input.Key) + case s3_constants.VersioningEnabled: + versionedOwner = s3a.objectWriteOwner(*input.Bucket, *input.Key) + } + } } finalizeBody := func() s3err.ErrorCode { var prepCode s3err.ErrorCode @@ -657,9 +669,20 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl versionEntryForCache.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId) } - // Update the .versions directory metadata to indicate this is the latest version - // Pass entry to cache its metadata for single-scan list efficiency - if err := s3a.updateLatestVersionInDirectory(*input.Bucket, *input.Key, versionId, versionFileName, versionEntryForCache); err != nil { + // Update the .versions directory metadata to indicate this is the + // latest version. When the owner is known (computed above as + // versionedOwner) the pointer flip is routed: it serializes on the + // object key against routed versioned writes and evaluates the + // precondition atomically, so the completion takes no distributed lock. + // Fall back to the lock-based update when the owner is unknown. + if versionedOwner != "" { + if code := s3a.routedVersionedFinalize(versionedOwner, *input.Bucket, *input.Key, versionId, versionFileName, versionEntryForCache, mpuCond, ""); code != s3err.ErrNone { + if rollbackErr := s3a.rollbackMultipartVersion(versionDir, versionFileName); rollbackErr != nil { + glog.Errorf("completeMultipartUpload: failed to rollback version %s for %s/%s after routed finalize error: %v", versionId, *input.Bucket, *input.Key, rollbackErr) + } + return code + } + } else if err := s3a.updateLatestVersionInDirectory(*input.Bucket, *input.Key, versionId, versionFileName, versionEntryForCache); err != nil { if rollbackErr := s3a.rollbackMultipartVersion(versionDir, versionFileName); rollbackErr != nil { glog.Errorf("completeMultipartUpload: failed to rollback version %s for %s/%s after latest pointer update error: %v", versionId, *input.Bucket, *input.Key, rollbackErr) } @@ -819,9 +842,9 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl return s3err.ErrNone } var finalizeCode s3err.ErrorCode - if nvOwner != "" { - // Routed non-versioned completion: no distributed lock, no gateway - // precondition here — the routed write carries the condition. + if nvOwner != "" || versionedOwner != "" { + // Routed completion: no distributed lock, no gateway precondition here — + // the routed write carries the condition. finalizeCode = finalizeBody() } else { finalizeCode = s3a.withObjectWriteLock(*input.Bucket, *input.Key, func() s3err.ErrorCode { diff --git a/weed/s3api/s3api_internal_lifecycle.go b/weed/s3api/s3api_internal_lifecycle.go index a636d4aeb..3a110fbd7 100644 --- a/weed/s3api/s3api_internal_lifecycle.go +++ b/weed/s3api/s3api_internal_lifecycle.go @@ -69,9 +69,14 @@ func (s3a *S3ApiServer) lifecycleDispatch(ctx context.Context, req *s3_lifecycle } return retryLater("TRANSPORT_ERROR: versioning lookup: " + vErr.Error()), nil } + // Route the pointer flip to the object's owner so it serializes on the + // object key against routed versioned/suspended writes (it would otherwise + // flip the latest pointer off that lock). Falls back to the lock path when + // the owner is unknown. + markerOwner := s3a.objectWriteOwner(req.Bucket, req.ObjectPath) switch state { case s3_constants.VersioningEnabled: - if _, err := s3a.createDeleteMarker(req.Bucket, req.ObjectPath, "", nil, ""); err != nil { + if _, err := s3a.createDeleteMarker(req.Bucket, req.ObjectPath, markerOwner, nil, ""); err != nil { return retryLater("TRANSPORT_ERROR: createDeleteMarker: " + err.Error()), nil } return done(), nil @@ -82,7 +87,7 @@ func (s3a *S3ApiServer) lifecycleDispatch(ctx context.Context, req *s3_lifecycle return retryLater("TRANSPORT_ERROR: deleteNullVersion: " + err.Error()), nil } } - if _, err := s3a.createDeleteMarker(req.Bucket, req.ObjectPath, "", nil, ""); err != nil { + if _, err := s3a.createDeleteMarker(req.Bucket, req.ObjectPath, markerOwner, nil, ""); err != nil { return retryLater("TRANSPORT_ERROR: createDeleteMarker: " + err.Error()), nil } return done(), nil