mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
s3: route versioned MPU-complete + lifecycle markers off the DLM
Versioned multipart-upload completion and the lifecycle expiration delete-marker both flip the .versions latest pointer without holding the per-path object-key lock, so they race with routed versioned/suspended writes (the per-path entry lock and the distributed lock don't serialize against each other).

- completeMultipartUpload (versioning enabled) now routes its latest-pointer update through routedVersionedFinalize on the object's owner, carrying the precondition so the filer evaluates it atomically under the per-path lock. The completion then takes no distributed lock at all (mirroring the versioned PutObject path); only object-lock, suspended versioning, unsupported preconditions, or a missing ring view fall back to the lock. Suspended MPU only writes the main object entry (last-writer-wins, benign) and is unchanged.
- The lifecycle expiration delete-marker (enabled and suspended) resolves the object owner and routes its marker pointer flip the same way.

Both paths fall back to the lock-based updateLatestVersionInDirectory when the owner isn't known.
This commit is contained in:
@@ -554,14 +554,26 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
|
||||
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId
|
||||
entryName, dirName := s3a.getEntryNameAndDir(input)
|
||||
var completionState *multipartCompletionState
|
||||
// Non-versioned destination: the final write is a single CreateEntry, so route
|
||||
// it (with the precondition) to the object's owner and skip the distributed
|
||||
// lock. Assembly and the idempotency replay below are read-only and safe
|
||||
// outside any lock. Versioned/suspended completions keep the lock path.
|
||||
// Route the completion to the object's owner and skip the distributed lock
|
||||
// when it can carry its own precondition. A non-versioned completion is a
|
||||
// single CreateEntry; a versioning-enabled completion is a unique version
|
||||
// file plus an atomic latest-pointer flip (FinalizeVersionedWrite). Both
|
||||
// evaluate the precondition under the per-path lock, so the lock wrapper is
|
||||
// only kept for paths that can't route: object-lock, suspended versioning,
|
||||
// unsupported preconditions, or before a ring view arrives. Assembly and the
|
||||
// idempotency replay below are read-only and safe outside any lock.
|
||||
mpuCond, mpuCondOk := buildWriteCondition(r)
|
||||
var nvOwner pb.ServerAddress
|
||||
if vs, vErr := s3a.getVersioningState(*input.Bucket); vErr == nil && vs == "" && mpuCondOk {
|
||||
nvOwner, _ = s3a.routedObjectOwner(*input.Bucket, *input.Key)
|
||||
var versionedOwner pb.ServerAddress
|
||||
if mpuCondOk {
|
||||
if vs, vErr := s3a.getVersioningState(*input.Bucket); vErr == nil {
|
||||
switch vs {
|
||||
case "":
|
||||
nvOwner, _ = s3a.routedObjectOwner(*input.Bucket, *input.Key)
|
||||
case s3_constants.VersioningEnabled:
|
||||
versionedOwner = s3a.objectWriteOwner(*input.Bucket, *input.Key)
|
||||
}
|
||||
}
|
||||
}
|
||||
finalizeBody := func() s3err.ErrorCode {
|
||||
var prepCode s3err.ErrorCode
|
||||
@@ -657,9 +669,20 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
|
||||
versionEntryForCache.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
|
||||
}
|
||||
|
||||
// Update the .versions directory metadata to indicate this is the latest version
|
||||
// Pass entry to cache its metadata for single-scan list efficiency
|
||||
if err := s3a.updateLatestVersionInDirectory(*input.Bucket, *input.Key, versionId, versionFileName, versionEntryForCache); err != nil {
|
||||
// Update the .versions directory metadata to indicate this is the
|
||||
// latest version. When the owner is known (computed above as
|
||||
// versionedOwner) the pointer flip is routed: it serializes on the
|
||||
// object key against routed versioned writes and evaluates the
|
||||
// precondition atomically, so the completion takes no distributed lock.
|
||||
// Fall back to the lock-based update when the owner is unknown.
|
||||
if versionedOwner != "" {
|
||||
if code := s3a.routedVersionedFinalize(versionedOwner, *input.Bucket, *input.Key, versionId, versionFileName, versionEntryForCache, mpuCond, ""); code != s3err.ErrNone {
|
||||
if rollbackErr := s3a.rollbackMultipartVersion(versionDir, versionFileName); rollbackErr != nil {
|
||||
glog.Errorf("completeMultipartUpload: failed to rollback version %s for %s/%s after routed finalize error: %v", versionId, *input.Bucket, *input.Key, rollbackErr)
|
||||
}
|
||||
return code
|
||||
}
|
||||
} else if err := s3a.updateLatestVersionInDirectory(*input.Bucket, *input.Key, versionId, versionFileName, versionEntryForCache); err != nil {
|
||||
if rollbackErr := s3a.rollbackMultipartVersion(versionDir, versionFileName); rollbackErr != nil {
|
||||
glog.Errorf("completeMultipartUpload: failed to rollback version %s for %s/%s after latest pointer update error: %v", versionId, *input.Bucket, *input.Key, rollbackErr)
|
||||
}
|
||||
@@ -819,9 +842,9 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
|
||||
return s3err.ErrNone
|
||||
}
|
||||
var finalizeCode s3err.ErrorCode
|
||||
if nvOwner != "" {
|
||||
// Routed non-versioned completion: no distributed lock, no gateway
|
||||
// precondition here — the routed write carries the condition.
|
||||
if nvOwner != "" || versionedOwner != "" {
|
||||
// Routed completion: no distributed lock, no gateway precondition here —
|
||||
// the routed write carries the condition.
|
||||
finalizeCode = finalizeBody()
|
||||
} else {
|
||||
finalizeCode = s3a.withObjectWriteLock(*input.Bucket, *input.Key, func() s3err.ErrorCode {
|
||||
|
||||
@@ -69,9 +69,14 @@ func (s3a *S3ApiServer) lifecycleDispatch(ctx context.Context, req *s3_lifecycle
|
||||
}
|
||||
return retryLater("TRANSPORT_ERROR: versioning lookup: " + vErr.Error()), nil
|
||||
}
|
||||
// Route the pointer flip to the object's owner so it serializes on the
|
||||
// object key against routed versioned/suspended writes (it would otherwise
|
||||
// flip the latest pointer off that lock). Falls back to the lock path when
|
||||
// the owner is unknown.
|
||||
markerOwner := s3a.objectWriteOwner(req.Bucket, req.ObjectPath)
|
||||
switch state {
|
||||
case s3_constants.VersioningEnabled:
|
||||
if _, err := s3a.createDeleteMarker(req.Bucket, req.ObjectPath, "", nil, ""); err != nil {
|
||||
if _, err := s3a.createDeleteMarker(req.Bucket, req.ObjectPath, markerOwner, nil, ""); err != nil {
|
||||
return retryLater("TRANSPORT_ERROR: createDeleteMarker: " + err.Error()), nil
|
||||
}
|
||||
return done(), nil
|
||||
@@ -82,7 +87,7 @@ func (s3a *S3ApiServer) lifecycleDispatch(ctx context.Context, req *s3_lifecycle
|
||||
return retryLater("TRANSPORT_ERROR: deleteNullVersion: " + err.Error()), nil
|
||||
}
|
||||
}
|
||||
if _, err := s3a.createDeleteMarker(req.Bucket, req.ObjectPath, "", nil, ""); err != nil {
|
||||
if _, err := s3a.createDeleteMarker(req.Bucket, req.ObjectPath, markerOwner, nil, ""); err != nil {
|
||||
return retryLater("TRANSPORT_ERROR: createDeleteMarker: " + err.Error()), nil
|
||||
}
|
||||
return done(), nil
|
||||
|
||||
Reference in New Issue
Block a user