s3: commit a versioned PutObject and its latest pointer in one transaction (#9756)

* s3: commit a versioned PutObject and its latest pointer in one transaction

A versioned PutObject wrote the version file and flipped the .versions
latest pointer in two separate routed transactions. Fold the
RECOMPUTE_LATEST into the version file's PUT so both commit atomically
under the object's per-path lock: the recompute, applied after the PUT in
the same transaction, scans the directory and sees the new version. A
crash can no longer leave the version present with a stale pointer.

putToFiler now takes a putFinalize describing the finalize step — routed
mutations folded into the PUT, or an afterCreate run under the object
write lock off the ring. Suspended-versioning keeps its afterCreate-only
form; multipart, copy, and delete-marker finalizes are unchanged.

* s3: trim verbose finalize comments
This commit is contained in:
Chris Lu
2026-05-31 00:13:36 -07:00
committed by GitHub
parent d806778757
commit 6b06fe5ec4
4 changed files with 127 additions and 52 deletions
+41 -27
View File
@@ -352,6 +352,17 @@ func (s3a *S3ApiServer) withObjectWriteLock(bucket, object string, preconditionF
return fn()
}
// putFinalize folds an object write's finalize into its create. On the routed
// path its mutations ride in the entry's PUT transaction under lockKey, committing
// atomically (e.g. the .versions pointer flip); off the ring afterCreate does the
// equivalent under the object write lock. A finalize with no routed form (suspended
// IsLatest fixups) carries no mutations and runs only via afterCreate.
type putFinalize struct {
lockKey string
mutations []*filer_pb.ObjectMutation
afterCreate func(entry *filer_pb.Entry) s3err.ErrorCode
}
// putToFiler writes one chunk of object bytes (a full PutObject body, a
// single MPU part, a copy-part destination). lifecycleTTLSec is non-zero
// only for top-level PutObject paths where the lifecycle XML's
@@ -359,7 +370,7 @@ func (s3a *S3ApiServer) withObjectWriteLock(bucket, object string, preconditionF
// pass 0 because their own keys aren't the user-visible object the rule
// targets and a part write would otherwise bind a TTL clock starting
// before CompleteMultipartUpload.
func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader io.Reader, bucket string, object string, partNumber int, lifecycleTTLSec int32, afterCreate func(entry *filer_pb.Entry) s3err.ErrorCode, uniqueWritePath bool) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader io.Reader, bucket string, object string, partNumber int, lifecycleTTLSec int32, finalize *putFinalize, uniqueWritePath bool) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
// NEW OPTIMIZATION: Write directly to volume servers, bypassing filer proxy
// This eliminates the filer proxy overhead for PUT operations
// Note: filePath is now passed directly instead of URL (no parsing needed)
@@ -819,8 +830,8 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
return filerErrorToS3Error(createErr)
}
entryCreated = true
if afterCreate != nil {
if afterCreateCode := afterCreate(entry); afterCreateCode != s3err.ErrNone {
if finalize != nil && finalize.afterCreate != nil {
if afterCreateCode := finalize.afterCreate(entry); afterCreateCode != s3err.ErrNone {
rollbackErr = s3a.rmObject(path.Dir(filePath), path.Base(filePath), true, false)
if rollbackErr != nil {
glog.Errorf("putToFiler: failed to rollback created entry for %s after post-create error: %v", filePath, rollbackErr)
@@ -833,15 +844,19 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
return s3err.ErrNone
}
// Route the create to the object's owner filer, whose per-path lock
// serializes it, then run afterCreate (e.g. a versioned finalize that routes
// itself). Conditional/object-lock/non-reducible cases fall back to the
// distributed lock.
// Route the create to the object's owner filer (its per-path lock serializes
// it); conditional/object-lock/non-reducible cases fall back to the lock.
var createCode s3err.ErrorCode
routed := false
if owner := s3a.routableWriteOwner(bucket, object); owner != "" {
if cond, ok := routeWriteCondition(r, uniqueWritePath); ok {
resp, err := s3a.routedPut(owner, s3a.objectRouteKey(bucket, object), filePath, entry, cond)
// Routed mutations ride in the PUT's transaction (committing atomically),
// so lockKey is the object path they carry, not the version file path.
lockKey, finalizeMutations := filePath, []*filer_pb.ObjectMutation(nil)
if finalize != nil && len(finalize.mutations) > 0 {
lockKey, finalizeMutations = finalize.lockKey, finalize.mutations
}
resp, err := s3a.routedPut(owner, s3a.objectRouteKey(bucket, object), lockKey, filePath, entry, cond, finalizeMutations)
switch {
case err != nil:
glog.Warningf("putToFiler: routed PUT to %s failed for %s, falling back to lock: %v", owner, filePath, err)
@@ -852,8 +867,10 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
glog.Warningf("putToFiler: routed PUT to %s returned %q for %s, falling back to lock", owner, resp.Error, filePath)
default:
entryCreated, routed, createCode = true, true, s3err.ErrNone
if afterCreate != nil {
createCode = afterCreate(entry)
// Bundled mutations already finalized in the transaction above;
// a finalize with none (suspended versioning) runs off the lock.
if len(finalizeMutations) == 0 && finalize != nil && finalize.afterCreate != nil {
createCode = finalize.afterCreate(entry)
}
}
}
@@ -1300,22 +1317,19 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
}
}
// Upload the file using putToFiler - this will create the file with version metadata.
// Versioned/suspended bucket → resolver returns 0 by construction;
// pass 0 directly so the path is explicit at the call site.
//
// Clear the prior latest-version pointer (and stamp the displaced
// entry with NoncurrentSinceNs) inside the afterCreate callback so
// it runs while withObjectWriteLock is still held in putToFiler.
// Doing it after putToFiler returns would race a concurrent PUT
// promoting a newer latest, which we'd then incorrectly wipe.
etag, errCode, sseMetadata = s3a.putToFiler(r, filePath, body, bucket, normalizedObject, 1, 0, func(_ *filer_pb.Entry) s3err.ErrorCode {
if err := s3a.updateIsLatestFlagsForSuspendedVersioning(bucket, normalizedObject); err != nil {
// Best-effort: a stale IsLatest flag is recoverable on the
// next list-versions resync, so don't fail the PUT.
glog.Warningf("putSuspendedVersioningObject: failed to update IsLatest flags: %v", err)
}
return s3err.ErrNone
// Versioned/suspended bucket → resolver returns 0; pass it directly.
// afterCreate clears the prior latest pointer and stamps the displaced version
// with NoncurrentSinceNs — off-ring under the write lock, routed off-lock after
// the PUT. Best-effort either way: a stale flag self-heals on the next list.
etag, errCode, sseMetadata = s3a.putToFiler(r, filePath, body, bucket, normalizedObject, 1, 0, &putFinalize{
afterCreate: func(_ *filer_pb.Entry) s3err.ErrorCode {
if err := s3a.updateIsLatestFlagsForSuspendedVersioning(bucket, normalizedObject); err != nil {
// Best-effort: a stale IsLatest flag is recoverable on the
// next list-versions resync, so don't fail the PUT.
glog.Warningf("putSuspendedVersioningObject: failed to update IsLatest flags: %v", err)
}
return s3err.ErrNone
},
}, false)
if errCode != s3err.ErrNone {
glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode)
@@ -1483,7 +1497,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
// directly — versioned objects sit on regular volumes and the
// lifecycle worker handles their expiration.
etag, errCode, sseMetadata = s3a.putToFiler(r, versionFilePath, body, bucket, normalizedObject, 1, 0,
s3a.versionedAfterCreate(bucket, normalizedObject, versionId, versionFileName, useInvertedFormat), true)
s3a.versionedFinalize(bucket, normalizedObject, versionId, versionFileName, useInvertedFormat), true)
if errCode != s3err.ErrNone {
glog.Errorf("putVersionedObject: failed to upload version: %v", errCode)
return "", "", errCode, SSEResponseMetadata{}
+18 -11
View File
@@ -158,19 +158,25 @@ func (s3a *S3ApiServer) objectTxnOnFiler(owner pb.ServerAddress, req *filer_pb.O
return resp, err
}
// routedPut writes an object entry as a one-mutation ObjectTransaction on the
// owner filer. lock_key is the object's full path so the transaction shares the
// per-path lock with a concurrent create or delete of the same key.
func (s3a *S3ApiServer) routedPut(owner pb.ServerAddress, routeKey, filePath string, entry *filer_pb.Entry, cond *filer_pb.WriteCondition) (*filer_pb.ObjectTransactionResponse, error) {
// routedPut writes an entry as a PUT, optionally followed by finalize mutations,
// as one ObjectTransaction applied in order under lockKey on the owner filer.
// lockKey is normally the entry's own path; a versioned add instead passes the
// object path plus a RECOMPUTE_LATEST finalize, so the version's PUT and its
// .versions pointer flip commit atomically (the recompute scans .versions/ after
// the PUT and sees the new version).
func (s3a *S3ApiServer) routedPut(owner pb.ServerAddress, routeKey, lockKey, filePath string, entry *filer_pb.Entry, cond *filer_pb.WriteCondition, finalize []*filer_pb.ObjectMutation) (*filer_pb.ObjectTransactionResponse, error) {
mutations := make([]*filer_pb.ObjectMutation, 0, 1+len(finalize))
mutations = append(mutations, &filer_pb.ObjectMutation{
Type: filer_pb.ObjectMutation_PUT,
Directory: path.Dir(filePath),
Entry: entry,
})
mutations = append(mutations, finalize...)
return s3a.objectTxnOnFiler(owner, &filer_pb.ObjectTransactionRequest{
LockKey: filePath,
LockKey: lockKey,
RouteKey: routeKey,
Condition: cond,
Mutations: []*filer_pb.ObjectMutation{{
Type: filer_pb.ObjectMutation_PUT,
Directory: path.Dir(filePath),
Entry: entry,
}},
Mutations: mutations,
})
}
@@ -193,7 +199,8 @@ func (s3a *S3ApiServer) routedMkFile(owner pb.ServerAddress, routeKey, parentDir
if fn != nil {
fn(entry)
}
resp, err := s3a.routedPut(owner, routeKey, parentDir+"/"+name, entry, nil)
filePath := parentDir + "/" + name
resp, err := s3a.routedPut(owner, routeKey, filePath, filePath, entry, nil, nil)
if err != nil {
return err
}
+15 -14
View File
@@ -170,19 +170,20 @@ func (s3a *S3ApiServer) routedDeleteNullVersion(owner pb.ServerAddress, bucket,
}
}
// versionedAfterCreate returns the putToFiler hook that finalizes a versioned
// write: the routed RECOMPUTE_LATEST when the owner is known, else the existing
// lock-free updateLatestVersionInDirectory.
func (s3a *S3ApiServer) versionedAfterCreate(bucket, object, versionId, versionFileName string, useInvertedFormat bool) func(*filer_pb.Entry) s3err.ErrorCode {
owner := s3a.objectWriteOwner(bucket, object)
return func(versionEntry *filer_pb.Entry) s3err.ErrorCode {
if owner != "" {
return s3a.routedVersionedFinalize(owner, bucket, object, useInvertedFormat)
}
if err := s3a.updateLatestVersionInDirectory(bucket, object, versionId, versionFileName, versionEntry); err != nil {
glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
return s3err.ErrInternalError
}
return s3err.ErrNone
// versionedFinalize flips the .versions latest pointer for a versioned PutObject:
// on the routed path RECOMPUTE_LATEST rides in the version file's PUT transaction,
// committing atomically under the object's per-path lock; off the ring
// updateLatestVersionInDirectory does it under the object write lock.
func (s3a *S3ApiServer) versionedFinalize(bucket, object, versionId, versionFileName string, useInvertedFormat bool) *putFinalize {
return &putFinalize{
lockKey: s3a.toFilerPath(bucket, object),
mutations: []*filer_pb.ObjectMutation{s3a.latestPointerRecompute(bucket, object, useInvertedFormat, "", true)},
afterCreate: func(versionEntry *filer_pb.Entry) s3err.ErrorCode {
if err := s3a.updateLatestVersionInDirectory(bucket, object, versionId, versionFileName, versionEntry); err != nil {
glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
return s3err.ErrInternalError
}
return s3err.ErrNone
},
}
}
@@ -496,6 +496,59 @@ func TestObjectTransactionRecomputeDemoteAndAttrs(t *testing.T) {
}
}
// A versioned add is one transaction: the PUT writes the new version file and
// the RECOMPUTE_LATEST that follows, under the same lock, scans the directory,
// sees it, flips the .versions pointer to it, and demotes the prior latest. This
// is what lets putVersionedObject commit the version and its pointer atomically.
func TestObjectTransactionPutThenRecomputeLatest(t *testing.T) {
t0 := time.Unix(1700000000, 0)
t1 := time.Unix(1700000100, 0)
fs, store := newTxnTestServer(map[string]*filer.Entry{
"/buckets/b/obj/.versions": {
Attr: filer.Attr{Inode: 2, Mtime: t0, Crtime: t0, Mode: 0755 | (1 << 31)},
Extended: map[string][]byte{"latestName": []byte("v1.ver"), "latestVid": []byte("v1")},
},
"/buckets/b/obj/.versions/v1.ver": {
Attr: filer.Attr{Inode: 10, Mtime: t0, Crtime: t0, Mode: 0644, FileSize: 100},
Extended: map[string][]byte{"vid": []byte("v1")},
},
})
resp, err := fs.ObjectTransaction(context.Background(), &filer_pb.ObjectTransactionRequest{
LockKey: "/buckets/b/obj",
Mutations: []*filer_pb.ObjectMutation{
{Type: filer_pb.ObjectMutation_PUT, Directory: "/buckets/b/obj/.versions", Entry: &filer_pb.Entry{
Name: "v2.ver",
Attributes: &filer_pb.FuseAttributes{Mtime: t1.Unix(), FileMode: 0644, Inode: 11, FileSize: 250},
Extended: map[string][]byte{"vid": []byte("v2")},
}},
{Type: filer_pb.ObjectMutation_RECOMPUTE_LATEST, Directory: "/buckets/b/obj", Name: ".versions",
Recompute: &filer_pb.Recompute{
ScanDir: "/buckets/b/obj/.versions",
Descending: true,
CopyExtended: map[string]string{"latestVid": "vid"},
NameToKey: "latestName",
DemoteKey: "noncurrentSince",
DemoteValue: []byte("999"),
}},
},
})
if err != nil || resp.Error != "" {
t.Fatalf("txn failed: err=%v resp=%q", err, resp.Error)
}
if _, ok := store.entries["/buckets/b/obj/.versions/v2.ver"]; !ok {
t.Fatalf("the PUT should have created the new version")
}
ptr := store.entries["/buckets/b/obj/.versions"].Extended
if string(ptr["latestName"]) != "v2.ver" || string(ptr["latestVid"]) != "v2" {
t.Fatalf("pointer should flip to the just-PUT version, got name=%s vid=%s", ptr["latestName"], ptr["latestVid"])
}
if got := store.entries["/buckets/b/obj/.versions/v1.ver"].Extended["noncurrentSince"]; string(got) != "999" {
t.Errorf("prior latest v1.ver should be demoted, noncurrentSince=%q want 999", got)
}
}
// A version-specific delete locks the object (condition_key checks WORM on the
// version), recomputes the pointer excluding the version (repoint-before-delete),
// then deletes it. A legal-hold guard blocks the delete and preserves the entry.