mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
fix(s3api): self-heal stale .versions latest-version pointer on read (#9125)
* fix(s3api): self-heal stale .versions latest-version pointer on read When the `.versions` directory metadata points at a version file that has gone missing (e.g. a crash between deleting the latest version and rewriting the pointer, or a concurrent delete racing with a read), `getLatestObjectVersion` bailed with a hard error that required manual repair. Tagging, ACL, retention, copy-source, and HEAD/GET all surfaced NoSuchKey even though other versions remained on disk. On `ErrNotFound`/`codes.NotFound` from the pointed-at version lookup, rescan the `.versions` directory, pick the newest remaining non-delete- marker entry, persist the repaired pointer (best-effort), and return that entry. If only delete markers (or nothing) remain, the caller still sees an error and the object correctly appears absent. Extracted the selection logic into a pure `selectLatestContentVersion` helper so `updateLatestVersionAfterDeletion` and the new self-heal path share a single implementation. A warning is logged whenever the heal kicks in so stale-pointer incidents remain visible in operator logs. * fix(s3api): self-heal must promote newest version even if delete marker Review feedback (gemini-code-assist): the self-heal path used `selectLatestContentVersion`, which skips delete markers. That had two bugs: 1. If the chronologically newest entry was a delete marker, an older content version would be promoted, effectively "undeleting" an object that was actually deleted. 2. If only delete markers remained, heal returned an error and the caller surfaced a hard 500 instead of the correct 404-with- x-amz-delete-marker response. Add `selectLatestVersion` that picks the newest entry regardless of type (content or delete marker) and use it in `healStaleLatestVersionPointer`. The promoted entry flows back through `doGetLatestObjectVersion` unchanged; downstream handlers already detect `ExtDeleteMarkerKey` on the returned entry and render NoSuchKey + `x-amz-delete-marker: true` (see `s3api_object_handlers.go:722-728`). Kept `selectLatestContentVersion` in place for `updateLatestVersionAfterDeletion`, which deliberately limits the pointer to a live content version in the post-deletion flow — changing that is out of scope for this fix. Added four tests for the new selector: - promotes newest delete marker over older content (the reviewer case) - picks content when content is newest - promotes newest delete marker when only delete markers remain - returns nil latestEntry on empty/untagged input * fix(s3api): paginate .versions scan in self-heal path Review feedback (gemini-code-assist, coderabbitai): the self-heal rescan did a single-shot list(..., 1000). For objects whose version ids use the old raw-timestamp format, filer ordering is lexicographic-ascending = oldest-first. If the .versions directory held more than one page of entries, the first page contained only the oldest, and the heal would promote an older version as the new "latest" — silently surfacing stale data on subsequent reads. Paginate through the whole directory with `filer.PaginationSize`, running `selectLatestVersion` per page and keeping a single running best candidate across pages (via `compareVersionIds`). This mirrors the pagination pattern already used by `getObjectVersionList` in the same file and closes the window for old-format stacks larger than one page. New-format (inverted-timestamp) ids were not affected because their lexicographic order matches newest-first, but paginating is still the right fix. Also updated the function doc to reflect that self-heal now promotes the newest entry regardless of type (content version or delete marker). * fix(s3api): don't resurrect deleted objects; wrap ErrNotFound sentinel Two review findings addressed: 1. `updateLatestVersionAfterDeletion` was using `selectLatestContentVersion` which skipped delete markers. Scenario: PUT v1, DELETE (dm1 written, pointer->dm1), PUT v3 (pointer->v3), user explicitly deletes v3 by versionId. Remaining files: v1, dm1. S3 semantics say the current version is the chronologically newest = dm1 (object appears deleted). Old code would promote v1, "undeleting" the object silently. Switched to `selectLatestVersion`, which picks the newest entry regardless of type. Also paginated the scan the same way the self-heal path does — otherwise a single-page `list(1000)` still mis-selects the latest for old-format version id stacks that exceed one page. Removed the `hasDeleteMarkers || !isLast` branch: with `selectLatestVersion` any delete marker participates in the comparison and shows up as the latest when it is newest, so the "keep the directory on ambiguity" guard becomes unreachable. Full pagination also makes the `!isLast` guard unnecessary — we either see every entry or surface a list error. 2. `healStaleLatestVersionPointer`'s "no remaining version" error was a plain `fmt.Errorf`, so callers could not distinguish genuine object-absence from a scan failure via `errors.Is`. Wrap it with `filer_pb.ErrNotFound` so the sentinel flows through the chain (the outer wrap already uses `%w`). No test additions needed — `TestSelectLatestVersion_PromotesNewestDeleteMarker` already asserts the "newer delete marker beats older content" invariant that drives both fixes. * refactor(s3api): drop unused selectLatestContentVersion after review Review feedback flagged a stale comment claiming selectLatestContentVersion "mirrors" the post-deletion semantics of updateLatestVersionAfterDeletion. That claim became false when the post-deletion path switched to selectLatestVersion in the previous commit. Verified no production callers remain — only the helper's own tests referenced it — so the cleaner fix is to delete the dead code rather than rewrite the comment to explain "this exists but isn't used." - Removed selectLatestContentVersion. - Removed the four TestSelectLatestContentVersion_* tests. - Preserved a renamed TestSelectLatestVersion_MixedFormats so the mixed-format comparator coverage still runs against the active selector.
This commit is contained in:
@@ -21,6 +21,8 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
s3_constants "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// ErrDeleteMarker is returned when the latest version is a delete marker (expected condition)
|
||||
@@ -1035,7 +1037,12 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateLatestVersionAfterDeletion finds the new latest version after deleting the current latest
|
||||
// updateLatestVersionAfterDeletion finds the new latest version after deleting
|
||||
// the current latest. The pointer may refer to a delete marker: if a delete
|
||||
// marker is chronologically newer than the most recent remaining content
|
||||
// version, S3 semantics treat the object as deleted and the pointer must
|
||||
// reflect that. Restricting the scan to content versions here would resurrect
|
||||
// the object by promoting an older content version over a newer delete marker.
|
||||
func (s3a *S3ApiServer) updateLatestVersionAfterDeletion(bucket, object string) error {
|
||||
bucketDir := s3a.bucketDir(bucket)
|
||||
versionsObjectPath := object + s3_constants.VersionsFolder
|
||||
@@ -1043,51 +1050,40 @@ func (s3a *S3ApiServer) updateLatestVersionAfterDeletion(bucket, object string)
|
||||
|
||||
glog.V(1).Infof("updateLatestVersionAfterDeletion: updating latest version for %s/%s, listing %s", bucket, object, versionsDir)
|
||||
|
||||
// List all remaining version files in the .versions directory
|
||||
entries, isLast, err := s3a.list(versionsDir, "", "", false, 1000)
|
||||
if err != nil {
|
||||
glog.Errorf("updateLatestVersionAfterDeletion: failed to list versions in %s: %v", versionsDir, err)
|
||||
return fmt.Errorf("failed to list versions: %v", err)
|
||||
// Paginate through all remaining entries and keep a running best candidate.
|
||||
// A single-shot list would miss the true latest for old-format (raw
|
||||
// timestamp) version ids when the directory exceeds one page, since filer
|
||||
// order is lexicographic-ascending = oldest-first for that format.
|
||||
var (
|
||||
latestVersionEntry *filer_pb.Entry
|
||||
latestVersionId string
|
||||
latestVersionFileName string
|
||||
latestIsDeleteMarker bool
|
||||
totalEntries int
|
||||
startFrom string
|
||||
)
|
||||
for {
|
||||
entries, isLast, err := s3a.list(versionsDir, "", startFrom, false, filer.PaginationSize)
|
||||
if err != nil {
|
||||
glog.Errorf("updateLatestVersionAfterDeletion: failed to list versions in %s: %v", versionsDir, err)
|
||||
return fmt.Errorf("failed to list versions: %v", err)
|
||||
}
|
||||
totalEntries += len(entries)
|
||||
if pageEntry, pageId, pageFile, pageDM := selectLatestVersion(entries); pageEntry != nil {
|
||||
if latestVersionEntry == nil || compareVersionIds(pageId, latestVersionId) < 0 {
|
||||
latestVersionEntry = pageEntry
|
||||
latestVersionId = pageId
|
||||
latestVersionFileName = pageFile
|
||||
latestIsDeleteMarker = pageDM
|
||||
}
|
||||
}
|
||||
if isLast || len(entries) == 0 {
|
||||
break
|
||||
}
|
||||
startFrom = entries[len(entries)-1].Name
|
||||
}
|
||||
|
||||
glog.V(1).Infof("updateLatestVersionAfterDeletion: found %d entries in %s", len(entries), versionsDir)
|
||||
|
||||
// Find the most recent remaining version (latest timestamp in version ID)
|
||||
var latestVersionId string
|
||||
var latestVersionFileName string
|
||||
var latestVersionEntry *filer_pb.Entry
|
||||
hasDeleteMarkers := false
|
||||
|
||||
for _, entry := range entries {
|
||||
if entry.Extended == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey]
|
||||
if !hasVersionId {
|
||||
continue
|
||||
}
|
||||
|
||||
versionId := string(versionIdBytes)
|
||||
|
||||
// Skip delete markers when finding latest content version
|
||||
isDeleteMarkerBytes, _ := entry.Extended[s3_constants.ExtDeleteMarkerKey]
|
||||
if string(isDeleteMarkerBytes) == "true" {
|
||||
hasDeleteMarkers = true
|
||||
continue
|
||||
}
|
||||
|
||||
// Compare version IDs chronologically using unified comparator (handles both old and new formats)
|
||||
// compareVersionIds returns negative if first arg is newer
|
||||
if latestVersionId == "" || compareVersionIds(versionId, latestVersionId) < 0 {
|
||||
glog.V(1).Infof("updateLatestVersionAfterDeletion: found newer version %s (file: %s)", versionId, entry.Name)
|
||||
latestVersionId = versionId
|
||||
latestVersionFileName = entry.Name
|
||||
latestVersionEntry = entry
|
||||
} else {
|
||||
glog.V(1).Infof("updateLatestVersionAfterDeletion: skipping older or equal version %s", versionId)
|
||||
}
|
||||
}
|
||||
glog.V(1).Infof("updateLatestVersionAfterDeletion: scanned %d entries in %s", totalEntries, versionsDir)
|
||||
|
||||
// Update the .versions directory metadata
|
||||
versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
|
||||
@@ -1099,16 +1095,14 @@ func (s3a *S3ApiServer) updateLatestVersionAfterDeletion(bucket, object string)
|
||||
versionsEntry.Extended = make(map[string][]byte)
|
||||
}
|
||||
|
||||
if latestVersionId != "" {
|
||||
// Update metadata to point to new latest version
|
||||
if latestVersionEntry != nil {
|
||||
// Update metadata to point at the new latest (content version or
|
||||
// delete marker — whichever is chronologically newest).
|
||||
versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey] = []byte(latestVersionId)
|
||||
versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey] = []byte(latestVersionFileName)
|
||||
|
||||
// Update cached list metadata from the new latest version entry
|
||||
setCachedListMetadata(versionsEntry, latestVersionEntry)
|
||||
|
||||
glog.V(2).Infof("updateLatestVersionAfterDeletion: new latest version for %s/%s is %s", bucket, object, latestVersionId)
|
||||
// Update the .versions directory entry with new latest version metadata
|
||||
glog.V(2).Infof("updateLatestVersionAfterDeletion: new latest version for %s/%s is %s (deleteMarker=%v)", bucket, object, latestVersionId, latestIsDeleteMarker)
|
||||
err = s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
|
||||
updatedEntry.Extended = versionsEntry.Extended
|
||||
updatedEntry.Attributes = versionsEntry.Attributes
|
||||
@@ -1117,13 +1111,11 @@ func (s3a *S3ApiServer) updateLatestVersionAfterDeletion(bucket, object string)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update .versions directory metadata: %v", err)
|
||||
}
|
||||
} else if hasDeleteMarkers || !isLast {
|
||||
// Delete markers still exist in the .versions directory, or the listing was
|
||||
// truncated so there may be more entries. Either way, keep the directory.
|
||||
glog.V(2).Infof("updateLatestVersionAfterDeletion: no content versions found for %s/%s but .versions directory still has entries (deleteMarkers=%v, isLast=%v), keeping directory",
|
||||
bucket, object, hasDeleteMarkers, isLast)
|
||||
} else {
|
||||
// No entries at all - delete the .versions directory entirely
|
||||
// No version-tagged entries remain - delete the .versions directory.
|
||||
// rm is non-recursive, so any stray non-version entries will cause
|
||||
// rm to fail; we log and continue, treating the directory cleanup as
|
||||
// best-effort since the version data is already gone.
|
||||
glog.V(2).Infof("updateLatestVersionAfterDeletion: no versions left for %s/%s, deleting .versions directory", bucket, object)
|
||||
|
||||
err = s3a.rm(bucketDir, versionsObjectPath, true, false)
|
||||
@@ -1283,12 +1275,134 @@ func (s3a *S3ApiServer) doGetLatestObjectVersion(bucket, object string, maxRetri
|
||||
latestVersionPath := versionsObjectPath + "/" + latestVersionFile
|
||||
latestVersionEntry, err := s3a.getEntry(bucketDir, latestVersionPath)
|
||||
if err != nil {
|
||||
// The pointer refers to a version file that no longer exists. Rather than
|
||||
// surfacing a hard error that requires manual repair, rescan the .versions
|
||||
// directory and self-heal the pointer to whatever remains.
|
||||
if errors.Is(err, filer_pb.ErrNotFound) || status.Code(err) == codes.NotFound {
|
||||
healed, healErr := s3a.healStaleLatestVersionPointer(bucket, normalizedObject, versionsEntry, latestVersionFile)
|
||||
if healErr == nil {
|
||||
return healed, nil
|
||||
}
|
||||
return nil, fmt.Errorf("stale latest-version pointer for %s/%s (file %s) could not self-heal: %w", bucket, normalizedObject, latestVersionFile, healErr)
|
||||
}
|
||||
return nil, fmt.Errorf("failed to get latest version file %s: %v", latestVersionPath, err)
|
||||
}
|
||||
|
||||
return latestVersionEntry, nil
|
||||
}
|
||||
|
||||
// selectLatestVersion returns the chronologically newest entry with a version
|
||||
// id, including delete markers. isDeleteMarker reflects whether the selected
|
||||
// entry is a delete marker. Returns nil for latestEntry when the directory
|
||||
// contains no version-id-tagged entries.
|
||||
//
|
||||
// This is the correct selector for the self-heal path: the .versions pointer
|
||||
// tracks the current-version-regardless-of-type (see createDeleteMarker), and
|
||||
// promoting a delete marker keeps S3 semantics — downstream handlers observe
|
||||
// ExtDeleteMarkerKey on the returned entry and respond with NoSuchKey.
|
||||
func selectLatestVersion(entries []*filer_pb.Entry) (latestEntry *filer_pb.Entry, latestVersionId, latestVersionFileName string, isDeleteMarker bool) {
|
||||
for _, entry := range entries {
|
||||
if entry == nil || entry.Extended == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey]
|
||||
if !hasVersionId {
|
||||
continue
|
||||
}
|
||||
|
||||
versionId := string(versionIdBytes)
|
||||
// compareVersionIds returns negative when the first arg is newer
|
||||
if latestVersionId == "" || compareVersionIds(versionId, latestVersionId) < 0 {
|
||||
latestVersionId = versionId
|
||||
latestVersionFileName = entry.Name
|
||||
latestEntry = entry
|
||||
isDeleteMarker = string(entry.Extended[s3_constants.ExtDeleteMarkerKey]) == "true"
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// healStaleLatestVersionPointer is invoked when the .versions directory metadata
|
||||
// points to a version file that no longer exists. It paginates the directory,
|
||||
// picks the chronologically newest remaining entry (content version or delete
|
||||
// marker), updates the directory pointer metadata best-effort, and returns the
|
||||
// rescanned entry. Downstream handlers detect ExtDeleteMarkerKey on the
|
||||
// returned entry and render NoSuchKey, so promoting a delete marker preserves
|
||||
// correct S3 semantics. If no version-tagged entry remains an error is
|
||||
// returned and the caller surfaces it as not found.
|
||||
func (s3a *S3ApiServer) healStaleLatestVersionPointer(bucket, normalizedObject string, versionsEntry *filer_pb.Entry, stalePointerFile string) (*filer_pb.Entry, error) {
|
||||
bucketDir := s3a.bucketDir(bucket)
|
||||
versionsObjectPath := normalizedObject + s3_constants.VersionsFolder
|
||||
versionsDir := bucketDir + "/" + versionsObjectPath
|
||||
|
||||
glog.Warningf("healStaleLatestVersionPointer: stale pointer for %s/%s - version file %q missing, rescanning %s", bucket, normalizedObject, stalePointerFile, versionsDir)
|
||||
|
||||
// Paginate through all version entries and keep a running best candidate.
|
||||
// A single-shot list would miss the true latest when old-format (raw
|
||||
// timestamp) version ids spill past one page, since filesystem order is
|
||||
// lexicographic-ascending = oldest-first for that format.
|
||||
//
|
||||
// Pick the chronologically newest entry regardless of type. Promoting a
|
||||
// delete marker is correct: S3 semantics treat it as the current version
|
||||
// and the caller renders NoSuchKey (with x-amz-delete-marker) from the
|
||||
// returned entry. Restricting to content versions here would "undelete"
|
||||
// the object by promoting an older content version over a newer marker.
|
||||
var (
|
||||
latestEntry *filer_pb.Entry
|
||||
latestVersionId string
|
||||
latestVersionFileName string
|
||||
isDeleteMarker bool
|
||||
startFrom string
|
||||
)
|
||||
for {
|
||||
entries, isLast, err := s3a.list(versionsDir, "", startFrom, false, filer.PaginationSize)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list %s: %w", versionsDir, err)
|
||||
}
|
||||
if pageEntry, pageId, pageFile, pageDM := selectLatestVersion(entries); pageEntry != nil {
|
||||
if latestEntry == nil || compareVersionIds(pageId, latestVersionId) < 0 {
|
||||
latestEntry = pageEntry
|
||||
latestVersionId = pageId
|
||||
latestVersionFileName = pageFile
|
||||
isDeleteMarker = pageDM
|
||||
}
|
||||
}
|
||||
if isLast || len(entries) == 0 {
|
||||
break
|
||||
}
|
||||
startFrom = entries[len(entries)-1].Name
|
||||
}
|
||||
|
||||
if latestEntry == nil {
|
||||
// Wrap filer_pb.ErrNotFound so callers can distinguish genuine
|
||||
// object-absence (nothing left to promote) from scan failures
|
||||
// (I/O errors during list) via errors.Is.
|
||||
return nil, fmt.Errorf("%w: no remaining version in %s", filer_pb.ErrNotFound, versionsDir)
|
||||
}
|
||||
|
||||
if versionsEntry.Extended == nil {
|
||||
versionsEntry.Extended = make(map[string][]byte)
|
||||
}
|
||||
versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey] = []byte(latestVersionId)
|
||||
versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey] = []byte(latestVersionFileName)
|
||||
setCachedListMetadata(versionsEntry, latestEntry)
|
||||
|
||||
if mkErr := s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
|
||||
updatedEntry.Extended = versionsEntry.Extended
|
||||
updatedEntry.Attributes = versionsEntry.Attributes
|
||||
updatedEntry.Chunks = versionsEntry.Chunks
|
||||
}); mkErr != nil {
|
||||
// Persisting the repair is best-effort. Surface a warning but still
|
||||
// return the rescanned entry so the read succeeds; a subsequent write
|
||||
// on the object will persist a fresh pointer.
|
||||
glog.Warningf("healStaleLatestVersionPointer: failed to persist repaired pointer for %s/%s: %v (returning rescanned entry)", bucket, normalizedObject, mkErr)
|
||||
} else {
|
||||
glog.V(1).Infof("healStaleLatestVersionPointer: repaired pointer for %s/%s to version %s (file %s, deleteMarker=%v)", bucket, normalizedObject, latestVersionId, latestVersionFileName, isDeleteMarker)
|
||||
}
|
||||
return latestEntry, nil
|
||||
}
|
||||
|
||||
// getLatestVersionEntryFromDirectoryEntry creates a logical entry for list operations using cached metadata
|
||||
// from the .versions directory entry. This achieves SINGLE-SCAN efficiency - no additional getEntry calls needed.
|
||||
//
|
||||
|
||||
@@ -0,0 +1,127 @@
|
||||
package s3api
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// newVersionEntry builds a .versions directory child entry with the given
|
||||
// version id and name, optionally tagged as a delete marker.
|
||||
func newVersionEntry(name, versionId string, isDeleteMarker bool) *filer_pb.Entry {
|
||||
ext := map[string][]byte{
|
||||
s3_constants.ExtVersionIdKey: []byte(versionId),
|
||||
}
|
||||
if isDeleteMarker {
|
||||
ext[s3_constants.ExtDeleteMarkerKey] = []byte("true")
|
||||
}
|
||||
return &filer_pb.Entry{
|
||||
Name: name,
|
||||
Attributes: &filer_pb.FuseAttributes{},
|
||||
Extended: ext,
|
||||
}
|
||||
}
|
||||
|
||||
// TestSelectLatestVersion_MixedFormats ensures the chronological comparator
|
||||
// is used when the directory contains both old- and new-format version ids
|
||||
// created across a format upgrade.
|
||||
func TestSelectLatestVersion_MixedFormats(t *testing.T) {
|
||||
baseTs := int64(1700000000000000000)
|
||||
oldId := createOldFormatVersionId(baseTs)
|
||||
newIdLater := createNewFormatVersionId(baseTs + int64(time.Minute)) // chronologically newer
|
||||
|
||||
entries := []*filer_pb.Entry{
|
||||
newVersionEntry("v-old."+oldId, oldId, false),
|
||||
newVersionEntry("v-new-later."+newIdLater, newIdLater, false),
|
||||
}
|
||||
|
||||
latest, latestId, latestName, isDM := selectLatestVersion(entries)
|
||||
|
||||
assert.NotNil(t, latest)
|
||||
assert.Equal(t, newIdLater, latestId, "newer timestamp should win across formats")
|
||||
assert.Equal(t, "v-new-later."+newIdLater, latestName)
|
||||
assert.False(t, isDM)
|
||||
}
|
||||
|
||||
// TestSelectLatestVersion_PromotesNewestDeleteMarker verifies the
|
||||
// selector promotes a delete marker when it is the chronologically newest
|
||||
// entry. Returning the older content version would "undelete" the object.
|
||||
func TestSelectLatestVersion_PromotesNewestDeleteMarker(t *testing.T) {
|
||||
baseTs := int64(1700000000000000000)
|
||||
olderContentId := createOldFormatVersionId(baseTs)
|
||||
newerDmId := createOldFormatVersionId(baseTs + int64(time.Minute))
|
||||
|
||||
entries := []*filer_pb.Entry{
|
||||
newVersionEntry("v-content."+olderContentId, olderContentId, false),
|
||||
newVersionEntry("dm-newer."+newerDmId, newerDmId, true),
|
||||
}
|
||||
|
||||
latest, latestId, latestName, isDM := selectLatestVersion(entries)
|
||||
|
||||
assert.NotNil(t, latest)
|
||||
assert.Equal(t, newerDmId, latestId, "newest delete marker must win")
|
||||
assert.Equal(t, "dm-newer."+newerDmId, latestName)
|
||||
assert.True(t, isDM, "selected entry is a delete marker")
|
||||
}
|
||||
|
||||
// TestSelectLatestVersion_ContentWinsWhenNewer verifies that when a content
|
||||
// version is chronologically newest, it is selected and isDeleteMarker=false.
|
||||
func TestSelectLatestVersion_ContentWinsWhenNewer(t *testing.T) {
|
||||
baseTs := int64(1700000000000000000)
|
||||
olderDmId := createOldFormatVersionId(baseTs)
|
||||
newerContentId := createOldFormatVersionId(baseTs + int64(time.Minute))
|
||||
|
||||
entries := []*filer_pb.Entry{
|
||||
newVersionEntry("dm-older."+olderDmId, olderDmId, true),
|
||||
newVersionEntry("v-newer."+newerContentId, newerContentId, false),
|
||||
}
|
||||
|
||||
latest, latestId, latestName, isDM := selectLatestVersion(entries)
|
||||
|
||||
assert.NotNil(t, latest)
|
||||
assert.Equal(t, newerContentId, latestId)
|
||||
assert.Equal(t, "v-newer."+newerContentId, latestName)
|
||||
assert.False(t, isDM)
|
||||
}
|
||||
|
||||
// TestSelectLatestVersion_OnlyDeleteMarkers verifies that when only delete
|
||||
// markers are present, the self-heal selector still returns the newest one
|
||||
// so the pointer can be repaired and the caller correctly renders 404.
|
||||
func TestSelectLatestVersion_OnlyDeleteMarkers(t *testing.T) {
|
||||
baseTs := int64(1700000000000000000)
|
||||
dmOlder := createOldFormatVersionId(baseTs)
|
||||
dmNewer := createOldFormatVersionId(baseTs + int64(time.Minute))
|
||||
|
||||
entries := []*filer_pb.Entry{
|
||||
newVersionEntry("dm-older."+dmOlder, dmOlder, true),
|
||||
newVersionEntry("dm-newer."+dmNewer, dmNewer, true),
|
||||
}
|
||||
|
||||
latest, latestId, latestName, isDM := selectLatestVersion(entries)
|
||||
|
||||
assert.NotNil(t, latest, "self-heal must still promote the newest delete marker when that is all that remains")
|
||||
assert.Equal(t, dmNewer, latestId)
|
||||
assert.Equal(t, "dm-newer."+dmNewer, latestName)
|
||||
assert.True(t, isDM)
|
||||
}
|
||||
|
||||
// TestSelectLatestVersion_EmptyOrUntagged verifies nil latestEntry when there
|
||||
// is no version-id-tagged entry at all.
|
||||
func TestSelectLatestVersion_EmptyOrUntagged(t *testing.T) {
|
||||
entries := []*filer_pb.Entry{
|
||||
nil,
|
||||
{Name: "no-extended"},
|
||||
{Name: "empty-extended", Extended: map[string][]byte{}},
|
||||
{Name: "no-version-id", Extended: map[string][]byte{"some-other-key": []byte("x")}},
|
||||
}
|
||||
|
||||
latest, latestId, latestName, isDM := selectLatestVersion(entries)
|
||||
|
||||
assert.Nil(t, latest)
|
||||
assert.Empty(t, latestId)
|
||||
assert.Empty(t, latestName)
|
||||
assert.False(t, isDM)
|
||||
}
|
||||
Reference in New Issue
Block a user