mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
fix(volume): stop flipping volumes read-only on a non-append-ordered .idx (#9726)
* fix(volume): verify the .dat-tail needle in the integrity check CheckVolumeDataIntegrity checked the last entry by file position in the .idx and, for a live needle, flipped the volume read-only when fileSize > fileTailOffset. That entry is the .dat tail only when the .idx is in append order; a key-sorted .idx (weed fix and other rebuilds listed entries by key) puts the highest-key needle last, whose tail sits mid-file, so healthy volumes went read-only on every load and re-running weed fix only reproduced the sorted index. Locate the needle at the maximum offset — the one physically last in the .dat — and verify the .dat ends exactly at it, regardless of .idx ordering. The append-ordered common case stays O(1) (the last entry's on-disk end matches the .dat size); only a key-sorted index pays a single linear scan. Deletion tombstones at the tail are now verified too, instead of skipping the file-size check. * fix(command): weed fix rebuilds the .idx in .dat offset order SaveToIdx wrote entries via AscendingVisit — sorted by key, the .sdx/.ecx shape — so the rebuilt .idx put the highest-key needle last instead of the .dat-tail needle, and dropped tombstones whose live needle was gone. Collect the live and deleted entries, sort by .dat offset, and write them in append order so the .idx stays a faithful log whose last entry is the real .dat tail.
This commit is contained in:
+27
-7
@@ -6,6 +6,7 @@ import (
|
||||
"io/fs"
|
||||
"os"
|
||||
"path"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
@@ -190,15 +191,34 @@ func SaveToIdx(scaner *VolumeFileScanner4Fix, idxName string) (ret error) {
|
||||
idxFile.Close()
|
||||
}()
|
||||
|
||||
return scaner.nm.AscendingVisit(func(value needle_map.NeedleValue) error {
|
||||
_, err := idxFile.Write(value.ToBytes())
|
||||
if scaner.includeDeleted && err == nil {
|
||||
if deleted, ok := scaner.nmDeleted.Get(value.Key); ok {
|
||||
_, err = idxFile.Write(deleted.ToBytes())
|
||||
}
|
||||
}
|
||||
// Emit entries in .dat offset (append) order so the .idx stays the
|
||||
// append-ordered log the volume server writes at runtime — not sorted by
|
||||
// key, which is the .sdx / .ecx shape. A key-sorted .idx puts the
|
||||
// highest-key needle last instead of the .dat-tail needle, which used to
|
||||
// flip volumes read-only on load. Every tombstone is included (including
|
||||
// any whose live needle is gone) so the last entry is the real .dat tail.
|
||||
var values []needle_map.NeedleValue
|
||||
collect := func(value needle_map.NeedleValue) error {
|
||||
values = append(values, value)
|
||||
return nil
|
||||
}
|
||||
if err = scaner.nm.AscendingVisit(collect); err != nil {
|
||||
return err
|
||||
}
|
||||
if scaner.includeDeleted {
|
||||
if err = scaner.nmDeleted.AscendingVisit(collect); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
sort.Slice(values, func(i, j int) bool {
|
||||
return values[i].Offset.ToActualOffset() < values[j].Offset.ToActualOffset()
|
||||
})
|
||||
for _, value := range values {
|
||||
if _, err = idxFile.Write(value.ToBytes()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func doFixOneVolume(basepath string, baseFileName string, collection string, volumeId int64, fixIncludeDeleted bool) {
|
||||
|
||||
@@ -0,0 +1,75 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
)
|
||||
|
||||
// TestSaveToIdxWritesOffsetOrder guards that weed fix rebuilds the .idx as an
|
||||
// append-ordered log (sorted by .dat offset), not sorted by key. A key-sorted
|
||||
// .idx puts the highest-key needle last instead of the .dat-tail needle, which
|
||||
// flipped volumes read-only on load (issue #9688). It must also carry every
|
||||
// tombstone — including one whose live needle is gone — so the last entry is
|
||||
// the real .dat tail.
|
||||
func TestSaveToIdxWritesOffsetOrder(t *testing.T) {
|
||||
nm := needle_map.NewMemDb()
|
||||
defer nm.Close()
|
||||
nmDeleted := needle_map.NewMemDb()
|
||||
defer nmDeleted.Close()
|
||||
|
||||
// Live needles with high keys at low offsets (as if written first).
|
||||
for _, e := range []struct {
|
||||
key uint64
|
||||
offset int64
|
||||
}{{30, 8}, {20, 128}, {10, 256}} {
|
||||
if err := nm.Set(types.Uint64ToNeedleId(e.key), types.ToOffset(e.offset), types.Size(100)); err != nil {
|
||||
t.Fatalf("nm.Set: %v", err)
|
||||
}
|
||||
}
|
||||
// A tombstone at the .dat tail whose live needle is gone; the old SaveToIdx
|
||||
// dropped these because the key was absent from the live map.
|
||||
if err := nmDeleted.Set(types.Uint64ToNeedleId(5), types.ToOffset(384), types.TombstoneFileSize); err != nil {
|
||||
t.Fatalf("nmDeleted.Set: %v", err)
|
||||
}
|
||||
|
||||
scanner := &VolumeFileScanner4Fix{nm: nm, nmDeleted: nmDeleted, includeDeleted: true}
|
||||
|
||||
idxPath := filepath.Join(t.TempDir(), "v.idx")
|
||||
if err := SaveToIdx(scanner, idxPath); err != nil {
|
||||
t.Fatalf("SaveToIdx: %v", err)
|
||||
}
|
||||
|
||||
f, err := os.Open(idxPath)
|
||||
if err != nil {
|
||||
t.Fatalf("open idx: %v", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
var offsets []int64
|
||||
var lastKey types.NeedleId
|
||||
if err := idx.WalkIndexFile(f, 0, func(key types.NeedleId, offset types.Offset, size types.Size) error {
|
||||
offsets = append(offsets, offset.ToActualOffset())
|
||||
lastKey = key
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatalf("walk idx: %v", err)
|
||||
}
|
||||
|
||||
if len(offsets) != 4 {
|
||||
t.Fatalf("expected 4 entries (3 live + 1 tombstone), got %d", len(offsets))
|
||||
}
|
||||
for i := 1; i < len(offsets); i++ {
|
||||
if offsets[i] < offsets[i-1] {
|
||||
t.Fatalf("entries not in offset order: %v", offsets)
|
||||
}
|
||||
}
|
||||
// The .dat-tail needle (the orphan tombstone at the highest offset) must be last.
|
||||
if lastKey != types.Uint64ToNeedleId(5) {
|
||||
t.Errorf("last entry should be the .dat-tail tombstone (key 5), got key %v", lastKey)
|
||||
}
|
||||
}
|
||||
@@ -146,22 +146,80 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uin
|
||||
// fits inside .dat — issue #8928) lives in volume.load(): it reads
|
||||
// MaximumNeedleEnd from the needle map after the load walk, so we don't
|
||||
// need a redundant linear scan of the .idx here.
|
||||
healthyIndexSize := indexSize
|
||||
for i := 1; i <= 10 && indexSize >= int64(i)*types.NeedleMapEntrySize; i++ {
|
||||
// check and fix last 10 entries
|
||||
lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, indexSize-int64(i)*types.NeedleMapEntrySize)
|
||||
if err == io.EOF {
|
||||
healthyIndexSize = indexSize - int64(i)*types.NeedleMapEntrySize
|
||||
continue
|
||||
}
|
||||
if err != ErrorSizeMismatch {
|
||||
break
|
||||
|
||||
// Verify the needle physically last in the .dat (the highest offset) and
|
||||
// that the .dat ends exactly at it. The .idx is not always in .dat append
|
||||
// order: weed fix and other rebuilds could rewrite it sorted by key, which
|
||||
// puts the highest-key needle last instead of the .dat-tail needle. Picking
|
||||
// the last file-position entry there compares a mid-file needle's tail
|
||||
// against the full .dat size and falsely flips the volume read-only on every
|
||||
// load (issue #9688).
|
||||
tailEntryPos, err := findDatTailEntryOffset(v, indexFile, indexSize)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("CheckVolumeDataIntegrity %s: %v", indexFile.Name(), err)
|
||||
}
|
||||
if tailEntryPos < 0 {
|
||||
// pre-allocated / all-zero index, nothing to verify against the .dat
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
if lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, tailEntryPos); err != nil {
|
||||
return lastAppendAtNs, fmt.Errorf("CheckVolumeDataIntegrity %s failed: %v", indexFile.Name(), err)
|
||||
}
|
||||
return lastAppendAtNs, nil
|
||||
}
|
||||
|
||||
// findDatTailEntryOffset returns the .idx file position of the entry whose
|
||||
// needle is physically last in the .dat (highest offset). The common case — an
|
||||
// append-ordered .idx — is resolved in O(1): the last entry's on-disk end
|
||||
// equals the .dat size. A key-sorted .idx (e.g. legacy weed fix output) falls
|
||||
// back to a single linear scan for the maximum offset. Returns -1 when no entry
|
||||
// has a non-zero offset (a pre-allocated / all-zero index).
|
||||
func findDatTailEntryOffset(v *Volume, indexFile *os.File, indexSize int64) (int64, error) {
|
||||
// Fast path: an append-ordered .idx ends with the .dat-tail needle, so the
|
||||
// last entry's on-disk end matches the .dat size.
|
||||
if v.DataBackend != nil {
|
||||
if datSize, _, statErr := v.DataBackend.GetStat(); statErr == nil && datSize > 0 {
|
||||
lastPos := indexSize - types.NeedleMapEntrySize
|
||||
entryBytes, err := readIndexEntryAtOffset(indexFile, lastPos)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("read last index entry: %v", err)
|
||||
}
|
||||
_, offset, size := idx.IdxFileEntry(entryBytes)
|
||||
if !offset.IsZero() && needleDiskEnd(offset, size, v.Version()) == datSize {
|
||||
return lastPos, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
if healthyIndexSize < indexSize {
|
||||
return 0, fmt.Errorf("CheckVolumeDataIntegrity %s failed: index size %d differs from healthy size %d", indexFile.Name(), indexSize, healthyIndexSize)
|
||||
|
||||
// Slow path: scan the whole .idx for the entry at the maximum offset.
|
||||
maxEntryPos := int64(-1)
|
||||
maxActualOffset := int64(-1)
|
||||
entryPos := int64(0)
|
||||
if err := idx.WalkIndexFile(indexFile, 0, func(_ types.NeedleId, offset types.Offset, _ types.Size) error {
|
||||
if !offset.IsZero() {
|
||||
if ao := offset.ToActualOffset(); ao > maxActualOffset {
|
||||
maxActualOffset = ao
|
||||
maxEntryPos = entryPos
|
||||
}
|
||||
}
|
||||
entryPos += types.NeedleMapEntrySize
|
||||
return nil
|
||||
}); err != nil {
|
||||
return 0, fmt.Errorf("scan index: %v", err)
|
||||
}
|
||||
return
|
||||
return maxEntryPos, nil
|
||||
}
|
||||
|
||||
// needleDiskEnd returns the byte offset just past the needle's on-disk record.
|
||||
// Deletion tombstones carry TombstoneFileSize (-1) in the .idx but are written
|
||||
// with DataSize=0, so their on-disk record is sized as 0.
|
||||
func needleDiskEnd(offset types.Offset, size types.Size, version needle.Version) int64 {
|
||||
onDiskSize := size
|
||||
if size.IsDeleted() {
|
||||
onDiskSize = 0
|
||||
}
|
||||
return offset.ToActualOffset() + needle.GetActualSize(onDiskSize, version)
|
||||
}
|
||||
|
||||
func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (lastAppendAtNs uint64, err error) {
|
||||
@@ -174,10 +232,10 @@ func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (
|
||||
return 0, nil
|
||||
}
|
||||
if size < 0 {
|
||||
// read the deletion entry. Pass io.EOF and ErrorSizeMismatch through
|
||||
// unwrapped so CheckVolumeDataIntegrity can recognize them and run its
|
||||
// trailing-truncation / wrap-around recovery loop, matching the live
|
||||
// branch below.
|
||||
// Deletion tombstone: the .idx carries TombstoneFileSize (-1) but the
|
||||
// appended needle in .dat carries Size=0. verifyDeletedNeedleIntegrity
|
||||
// reads it with Size=0 (the size check / 32GB wrap-around retry would
|
||||
// otherwise read past EOF) and still verifies the .dat ends at it.
|
||||
if lastAppendAtNs, err = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), offset.ToActualOffset(), key); err != nil {
|
||||
if err == io.EOF || err == ErrorSizeMismatch {
|
||||
return lastAppendAtNs, err
|
||||
@@ -274,8 +332,8 @@ func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.V
|
||||
// which reads past EOF and falsely marks the volume read-only.
|
||||
size := types.Size(0)
|
||||
if err = n.ReadData(datFile, offset, size, v); err != nil {
|
||||
// Preserve io.EOF and ErrorSizeMismatch as-is so CheckVolumeDataIntegrity
|
||||
// can detect trailing truncation and trigger its wrap-around retry.
|
||||
// A truncated tail surfaces here as io.EOF; pass it (and a size mismatch)
|
||||
// through so the volume is marked read-only.
|
||||
if err == io.EOF || err == ErrorSizeMismatch {
|
||||
return n.AppendAtNs, err
|
||||
}
|
||||
@@ -284,7 +342,17 @@ func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.V
|
||||
if n.Id != key {
|
||||
return n.AppendAtNs, fmt.Errorf("index key %v does not match needle's Id %v", key, n.Id)
|
||||
}
|
||||
return n.AppendAtNs, err
|
||||
// This tombstone is the needle at the .dat tail (the highest offset), so the
|
||||
// .dat must end exactly at it; extra bytes mean an unindexed trailing record.
|
||||
fileSize, _, statErr := datFile.GetStat()
|
||||
if statErr != nil {
|
||||
return n.AppendAtNs, fmt.Errorf("stat file %s: %v", datFile.Name(), statErr)
|
||||
}
|
||||
if fileTailOffset := offset + needle.GetActualSize(size, v); fileSize > fileTailOffset {
|
||||
glog.Warningf("data file %s actual %d bytes expected %d bytes!", datFile.Name(), fileSize, fileTailOffset)
|
||||
return n.AppendAtNs, fmt.Errorf("data file %s actual %d bytes expected %d bytes", datFile.Name(), fileSize, fileTailOffset)
|
||||
}
|
||||
return n.AppendAtNs, nil
|
||||
}
|
||||
|
||||
func (v *Volume) checkIdxFile() error {
|
||||
|
||||
@@ -4,11 +4,14 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
)
|
||||
@@ -179,6 +182,189 @@ func TestCheckVolumeDataIntegrityWithDeletionTombstone(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestCheckVolumeDataIntegritySortedIndex reproduces issue #9688: after the
|
||||
// .idx is rebuilt sorted by key — what weed fix and other rebuilds emitted via
|
||||
// needle_map.AscendingVisit — the last file-position entry is the highest-key
|
||||
// needle, not the needle at the .dat tail. The integrity check compared that
|
||||
// mid-file needle's tail against the full .dat size, so fileSize > fileTailOffset
|
||||
// tripped and flipped the volume read-only on every load even though the .dat
|
||||
// was intact. The check must locate and verify the needle at the .dat tail
|
||||
// (the maximum offset) regardless of .idx ordering.
|
||||
func TestCheckVolumeDataIntegritySortedIndex(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("volume creation: %v", err)
|
||||
}
|
||||
defer v.Close()
|
||||
|
||||
// Write keys in descending order so the highest key lands at the lowest
|
||||
// .dat offset; the last-written needle (lowest key) sits at the tail.
|
||||
for _, id := range []uint64{30, 20, 10} {
|
||||
if _, _, _, err := v.writeNeedle2(newRandomNeedle(id), true, false); err != nil {
|
||||
t.Fatalf("write needle %d: %v", id, err)
|
||||
}
|
||||
}
|
||||
if err := v.DataBackend.Sync(); err != nil {
|
||||
t.Fatalf("sync .dat: %v", err)
|
||||
}
|
||||
if err := v.nm.Sync(); err != nil {
|
||||
t.Fatalf("sync .idx: %v", err)
|
||||
}
|
||||
|
||||
// Rebuild the .idx sorted ascending by key, the shape weed fix's SaveToIdx
|
||||
// emitted. The last entry is now key 30, which is physically first in the
|
||||
// .dat — far from the .dat tail.
|
||||
rewriteIdxSortedByKey(t, v.FileName(".idx"))
|
||||
|
||||
idxFile, err := os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644)
|
||||
if err != nil {
|
||||
t.Fatalf("open .idx: %v", err)
|
||||
}
|
||||
defer idxFile.Close()
|
||||
|
||||
if _, err := CheckVolumeDataIntegrity(v, idxFile); err != nil {
|
||||
t.Fatalf("CheckVolumeDataIntegrity should pass for a key-sorted .idx: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCheckVolumeDataIntegrityVerifiesDeletionTail ensures a deletion tombstone
|
||||
// at the .dat tail is verified rather than skipped: unindexed bytes appended
|
||||
// past it must flip the volume read-only, not be silently tolerated.
|
||||
func TestCheckVolumeDataIntegrityVerifiesDeletionTail(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("volume creation: %v", err)
|
||||
}
|
||||
defer v.Close()
|
||||
|
||||
for i := 1; i <= 3; i++ {
|
||||
if _, _, _, err := v.writeNeedle2(newRandomNeedle(uint64(i)), true, false); err != nil {
|
||||
t.Fatalf("write needle %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
// Delete a needle so a tombstone is the last (highest-offset) record.
|
||||
if _, err := v.doDeleteRequest(newEmptyNeedle(2)); err != nil {
|
||||
t.Fatalf("delete needle: %v", err)
|
||||
}
|
||||
// Append unindexed bytes past the tombstone tail.
|
||||
datSize, _, err := v.DataBackend.GetStat()
|
||||
if err != nil {
|
||||
t.Fatalf("stat .dat: %v", err)
|
||||
}
|
||||
if _, err := v.DataBackend.WriteAt(make([]byte, 64), datSize); err != nil {
|
||||
t.Fatalf("append trailing bytes: %v", err)
|
||||
}
|
||||
if err := v.DataBackend.Sync(); err != nil {
|
||||
t.Fatalf("sync .dat: %v", err)
|
||||
}
|
||||
if err := v.nm.Sync(); err != nil {
|
||||
t.Fatalf("sync .idx: %v", err)
|
||||
}
|
||||
|
||||
idxFile, err := os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644)
|
||||
if err != nil {
|
||||
t.Fatalf("open .idx: %v", err)
|
||||
}
|
||||
defer idxFile.Close()
|
||||
|
||||
if _, err := CheckVolumeDataIntegrity(v, idxFile); err == nil {
|
||||
t.Fatal("CheckVolumeDataIntegrity should detect bytes past a deletion-tombstone tail")
|
||||
}
|
||||
}
|
||||
|
||||
// TestVolumeLoadStaysWritableWithKeySortedIndex reproduces issue #9688 end to
|
||||
// end through the real volume load path: a volume whose .idx is sorted by key
|
||||
// (the on-disk state weed fix left behind) must reload writable, not flip to
|
||||
// read-only — and a live needle must still be readable afterward.
|
||||
func TestVolumeLoadStaysWritableWithKeySortedIndex(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
// Build a volume: keys written descending so the highest key (30) lands at
|
||||
// the lowest .dat offset, then delete one so a tombstone sits at the tail —
|
||||
// the layout in the bug report (highest-key needle mid-file, tombstone last).
|
||||
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("volume creation: %v", err)
|
||||
}
|
||||
wanted := newRandomNeedle(10)
|
||||
for _, n := range []*needle.Needle{newRandomNeedle(30), newRandomNeedle(20), wanted} {
|
||||
if _, _, _, err := v.writeNeedle2(n, true, false); err != nil {
|
||||
t.Fatalf("write needle %d: %v", n.Id, err)
|
||||
}
|
||||
}
|
||||
if _, err := v.doDeleteRequest(newEmptyNeedle(20)); err != nil {
|
||||
t.Fatalf("delete needle: %v", err)
|
||||
}
|
||||
idxName := v.FileName(".idx")
|
||||
v.Close()
|
||||
|
||||
// Rewrite the .idx sorted by key — the shape old weed fix produced.
|
||||
rewriteIdxSortedByKey(t, idxName)
|
||||
|
||||
// Reload through the real load path; the integrity check must not flip it.
|
||||
reloaded, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("reload: %v", err)
|
||||
}
|
||||
defer reloaded.Close()
|
||||
if reloaded.noWriteOrDelete {
|
||||
t.Fatal("volume flipped read-only after reload with a key-sorted .idx (issue #9688)")
|
||||
}
|
||||
|
||||
// The surviving needle is still readable, and new writes still succeed.
|
||||
rn := &needle.Needle{Id: wanted.Id}
|
||||
if _, err := reloaded.readNeedle(rn, nil, nil); err != nil {
|
||||
t.Fatalf("read surviving needle after reload: %v", err)
|
||||
}
|
||||
if _, _, _, err := reloaded.writeNeedle2(newRandomNeedle(40), true, false); err != nil {
|
||||
t.Fatalf("write after reload should succeed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// rewriteIdxSortedByKey rewrites the .idx in ascending key order — the on-disk
|
||||
// shape weed fix's SaveToIdx emitted (needle_map.AscendingVisit) — so a test can
|
||||
// exercise an .idx whose file order differs from .dat append order.
|
||||
func rewriteIdxSortedByKey(t *testing.T, idxPath string) {
|
||||
t.Helper()
|
||||
f, err := os.OpenFile(idxPath, os.O_RDONLY, 0644)
|
||||
if err != nil {
|
||||
t.Fatalf("open .idx for read: %v", err)
|
||||
}
|
||||
var entries []needle_map.NeedleValue
|
||||
if walkErr := idx.WalkIndexFile(f, 0, func(key types.NeedleId, offset types.Offset, size types.Size) error {
|
||||
entries = append(entries, needle_map.NeedleValue{Key: key, Offset: offset, Size: size})
|
||||
return nil
|
||||
}); walkErr != nil {
|
||||
f.Close()
|
||||
t.Fatalf("walk .idx: %v", walkErr)
|
||||
}
|
||||
f.Close()
|
||||
|
||||
// Sort by key, then by offset, so a key's live entry precedes its tombstone
|
||||
// — the order weed fix's AscendingVisit emitted.
|
||||
sort.Slice(entries, func(i, j int) bool {
|
||||
if entries[i].Key != entries[j].Key {
|
||||
return entries[i].Key < entries[j].Key
|
||||
}
|
||||
return entries[i].Offset.ToActualOffset() < entries[j].Offset.ToActualOffset()
|
||||
})
|
||||
|
||||
out, err := os.OpenFile(idxPath, os.O_WRONLY|os.O_TRUNC, 0644)
|
||||
if err != nil {
|
||||
t.Fatalf("open .idx for rewrite: %v", err)
|
||||
}
|
||||
defer out.Close()
|
||||
for _, e := range entries {
|
||||
if _, err := out.Write(e.ToBytes()); err != nil {
|
||||
t.Fatalf("write idx entry: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestMaxNeedleEnd ensures the needle map's MaxNeedleEnd accumulator lets
|
||||
// volume.load() detect an .idx that references bytes past the end of the .dat
|
||||
// — the deeper-than-tail corruption shape from issue #8928 that the existing
|
||||
|
||||
Reference in New Issue
Block a user