mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
s3: collapse suspended-versioning deletes onto one null marker (#9845)
A suspended-versioning DELETE was recorded with createDeleteMarker, which mints a fresh real version id each time, so repeated suspended deletes piled up delete markers instead of overwriting a single null marker as S3 specifies. Record the suspended delete as a 'null' marker with a fixed file name (v_null) and point the latest-version pointer at it explicitly; putSuspendedVersioningObject's existing null-version cleanup removes it on the next suspended PUT, so the object undeletes cleanly and at most one null marker exists. Enabled-versioning deletes are unchanged (still distinct historical markers). Update TestSuspendedVersioningDeleteBehavior to the AWS-correct counts: one null marker after a suspended delete, and the null marker plus one real marker after a re-enabled delete.
This commit is contained in:
@@ -543,12 +543,12 @@ func TestSuspendedVersioningDeleteBehavior(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, listResp.Versions, 3, "Should be back to 3 versions after deleting null version")
|
||||
// A suspended-versioning DELETE removes the null version and records a delete
|
||||
// marker so the object reads as deleted (without one, an older version would
|
||||
// resurface). AWS keeps a single null marker that overwrites; SeaweedFS does
|
||||
// not yet collapse to one (suspended-marker id/dedup is tracked separately),
|
||||
// so assert the invariant - a marker is recorded - rather than an exact count.
|
||||
assert.NotEmpty(t, listResp.DeleteMarkers, "Suspended delete should record a delete marker")
|
||||
// A suspended-versioning DELETE removes the null version and records a single
|
||||
// null delete marker that overwrites any prior one (S3 spec), so the object
|
||||
// reads as deleted without markers piling up.
|
||||
require.Len(t, listResp.DeleteMarkers, 1, "Suspended delete should record exactly one null delete marker")
|
||||
assert.Equal(t, "null", aws.ToString(listResp.DeleteMarkers[0].VersionId), "Suspended delete marker should have version id null")
|
||||
assert.Equal(t, objectKey, aws.ToString(listResp.DeleteMarkers[0].Key), "Delete marker should be for the deleted object")
|
||||
|
||||
// Verify null version is gone
|
||||
nullVersionFound = false
|
||||
@@ -609,7 +609,19 @@ func TestSuspendedVersioningDeleteBehavior(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, listResp.Versions, 4, "Should have 3 original versions + 1 new version")
|
||||
assert.NotEmpty(t, listResp.DeleteMarkers, "Re-enabled delete should record a delete marker")
|
||||
// The historical null suspended marker plus the new enabled-mode delete marker.
|
||||
require.Len(t, listResp.DeleteMarkers, 2, "Should have the null suspended marker and the enabled-mode delete marker")
|
||||
var sawNull, sawEnabled bool
|
||||
for _, dm := range listResp.DeleteMarkers {
|
||||
assert.Equal(t, objectKey, aws.ToString(dm.Key), "Delete marker should be for the deleted object")
|
||||
if aws.ToString(dm.VersionId) == "null" {
|
||||
sawNull = true
|
||||
} else {
|
||||
sawEnabled = true
|
||||
}
|
||||
}
|
||||
assert.True(t, sawNull, "Suspended phase should leave a null delete marker")
|
||||
assert.True(t, sawEnabled, "Enabled delete should add a real-version-id delete marker")
|
||||
|
||||
t.Logf("Successfully verified suspended versioning delete behavior")
|
||||
}
|
||||
|
||||
@@ -152,12 +152,13 @@ func (s3a *S3ApiServer) deleteVersionedObject(r *http.Request, bucket, object, v
|
||||
glog.Errorf("deleteVersionedObject: failed to delete null version for %s/%s: %v", bucket, object, err)
|
||||
return result, s3err.ErrInternalError
|
||||
}
|
||||
deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object)
|
||||
if err != nil {
|
||||
glog.Errorf("deleteVersionedObject: failed to create delete marker for suspended versioning %s/%s: %v", bucket, object, err)
|
||||
// Suspended versioning overwrites the null version with a single null delete
|
||||
// marker (S3 spec), so the marker replaces any prior one instead of piling up.
|
||||
if err := s3a.createNullDeleteMarker(bucket, object); err != nil {
|
||||
glog.Errorf("deleteVersionedObject: failed to create null delete marker for suspended versioning %s/%s: %v", bucket, object, err)
|
||||
return result, s3err.ErrInternalError
|
||||
}
|
||||
result.versionId = deleteMarkerVersionId
|
||||
result.versionId = "null"
|
||||
result.deleteMarker = true
|
||||
return result, s3err.ErrNone
|
||||
}
|
||||
|
||||
@@ -257,6 +257,56 @@ func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error
|
||||
return versionId, nil
|
||||
}
|
||||
|
||||
// createNullDeleteMarker records a suspended-versioning delete as a single "null"
|
||||
// delete marker. Unlike createDeleteMarker (enabled versioning, where each delete is
|
||||
// a distinct historical marker), a suspended delete overwrites the null version per
|
||||
// the S3 spec, so this reuses the "null" version id and its fixed file name (v_null):
|
||||
// repeated suspended deletes collapse onto one marker instead of accumulating, and a
|
||||
// later suspended PUT removes it via putSuspendedVersioningObject's null-version
|
||||
// cleanup. The latest-version pointer is set explicitly (not recomputed) because
|
||||
// "null" does not sort as the newest version id.
|
||||
func (s3a *S3ApiServer) createNullDeleteMarker(bucket, object string) error {
|
||||
cleanObject := strings.TrimPrefix(object, "/")
|
||||
bucketDir := s3a.bucketDir(bucket)
|
||||
versionsDir := bucketDir + "/" + cleanObject + s3_constants.VersionsFolder
|
||||
versionFileName := s3a.getVersionFileName("null")
|
||||
|
||||
mtime := time.Now().Unix()
|
||||
markerExtended := map[string][]byte{
|
||||
s3_constants.ExtVersionIdKey: []byte("null"),
|
||||
s3_constants.ExtDeleteMarkerKey: []byte("true"),
|
||||
}
|
||||
|
||||
if err := s3a.mkFile(versionsDir, versionFileName, nil, func(entry *filer_pb.Entry) {
|
||||
entry.IsDirectory = false
|
||||
if entry.Attributes == nil {
|
||||
entry.Attributes = &filer_pb.FuseAttributes{}
|
||||
}
|
||||
entry.Attributes.Mtime = mtime
|
||||
if entry.Extended == nil {
|
||||
entry.Extended = make(map[string][]byte)
|
||||
}
|
||||
for k, v := range markerExtended {
|
||||
entry.Extended[k] = v
|
||||
}
|
||||
}); err != nil {
|
||||
return fmt.Errorf("failed to create null delete marker in .versions directory: %w", err)
|
||||
}
|
||||
|
||||
markerEntry := &filer_pb.Entry{
|
||||
Name: versionFileName,
|
||||
IsDirectory: false,
|
||||
Attributes: &filer_pb.FuseAttributes{Mtime: mtime},
|
||||
Extended: markerExtended,
|
||||
}
|
||||
if err := s3a.updateLatestVersionInDirectory(bucket, cleanObject, "null", versionFileName, markerEntry); err != nil {
|
||||
return fmt.Errorf("failed to point latest at null delete marker for %s/%s: %w", bucket, object, err)
|
||||
}
|
||||
|
||||
glog.V(2).Infof("createNullDeleteMarker: recorded null delete marker for %s/%s", bucket, object)
|
||||
return nil
|
||||
}
|
||||
|
||||
// versionListItem represents an item in the unified version/prefix list
|
||||
type versionListItem struct {
|
||||
key string
|
||||
|
||||
Reference in New Issue
Block a user