mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
cfc08fbf6c
* fix(volume): pass on-disk tombstone size to ReadData in verifyDeletedNeedleIntegrity verifyDeletedNeedleIntegrity was forwarding TombstoneFileSize (-1) into Needle.ReadData. A deletion tombstone is appended to .dat with DataSize=0 so the on-disk needle header carries Size=0; TombstoneFileSize is only the .idx sentinel for "this entry is deleted" and is never written into a needle header. ReadBytes' size check therefore mismatched on every tombstone (-1 != 0), returned ErrorSizeMismatch, and triggered the 4-byte-offset wrap-around retry in ReadData (offset + 32 GB). On any volume large enough that offset+32 GB exceeds dat fileSize the retry read EOF, CheckVolumeDataIntegrity reported corruption, and the loader set noWriteOrDelete = true. Every volume whose last 10 .idx entries included a deletion went read-only on startup — i.e. any healthy volume where the most recent operations included a delete. Pass Size(0) so the size check matches the on-disk tombstone header. Add a regression test that writes three needles, deletes one, and asserts CheckVolumeDataIntegrity succeeds with a tombstone at the .idx tail. Without this fix the test reproduces the exact log shape from the bug report: read 0 dataSize 32 offset <orig+32GB> fileSize <much smaller>: EOF verifyDeletedNeedleIntegrity ...idx failed: read data [N,N+32) : EOF The Rust port guards its integrity-check size comparison with !size.is_deleted() (seaweed-volume/src/storage/volume.rs) and never hits this path, so no Rust mirror change is needed. * test(seaweed-volume): mirror Go regression for deletion-tombstone integrity The Rust integrity check already guards its size-mismatch comparison with !size.is_deleted() (volume.rs:1859) and reads tombstone AppendAtNs with body_size=0, so the Go regression fixed in the previous commit does not apply. Lock that guarantee in with a parallel reload test: write three needles, delete one, sync, reopen via Volume::new, assert the volume is not flipped read-only. Catches any future change that removes the deleted-entry guard or re-introduces a size-strict path in check_volume_data_integrity for tombstones. * fix(volume): propagate io.EOF and ErrorSizeMismatch from verifyDeletedNeedleIntegrity CheckVolumeDataIntegrity relies on identity comparison against io.EOF and ErrorSizeMismatch to walk back through the last ten .idx entries and tolerate a partial truncation at the tail (the "fix and continue" loop). The live-needle branch in doCheckAndFixVolumeData already returns those sentinels unwrapped; the deletion branch wrapped them in fmt.Errorf, so a genuine .dat truncation past a tombstone offset broke the recovery and flipped the volume read-only. Mirror the live-needle handling: both verifyDeletedNeedleIntegrity and doCheckAndFixVolumeData now short-circuit on io.EOF / ErrorSizeMismatch and pass them through unwrapped. Other errors keep their existing context wrapping. Also tighten the regression test to capture lastAppendAtNs and assert it's non-zero, so a future regression that skips the tombstone body (and therefore never populates AppendAtNs) is caught even when the err check still passes.
285 lines
10 KiB
Go
285 lines
10 KiB
Go
package storage
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
)
|
|
|
|
// openIndex returns a file descriptor for the volume's index, and the index size in bytes.
|
|
func (v *Volume) openIndex() (*os.File, int64, error) {
|
|
idxFileName := v.FileName(".idx")
|
|
idxFile, err := os.OpenFile(idxFileName, os.O_RDONLY, 0644)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("failed to open IDX file %s for volume %v: %v", idxFileName, v.Id, err)
|
|
}
|
|
|
|
idxStat, err := idxFile.Stat()
|
|
if err != nil {
|
|
idxFile.Close()
|
|
return nil, 0, fmt.Errorf("failed to stat IDX file %s for volume %v: %v", idxFileName, v.Id, err)
|
|
}
|
|
if idxStat.Size() == 0 {
|
|
idxFile.Close()
|
|
return nil, 0, fmt.Errorf("zero-size IDX file for volume %v at %s", v.Id, idxFileName)
|
|
}
|
|
|
|
return idxFile, idxStat.Size(), nil
|
|
}
|
|
|
|
// ScrubIndex checks the volume's index for issues.
|
|
func (v *Volume) ScrubIndex() (int64, []error) {
|
|
v.dataFileAccessLock.RLock()
|
|
defer v.dataFileAccessLock.RUnlock()
|
|
|
|
idxFile, idxFileSize, err := v.openIndex()
|
|
if err != nil {
|
|
return 0, []error{err}
|
|
}
|
|
defer idxFile.Close()
|
|
|
|
return idx.CheckIndexFile(idxFile, idxFileSize, v.Version())
|
|
}
|
|
|
|
// scrubVolumeData checks a volume content + index for issues.
|
|
func (v *Volume) scrubVolumeData(dataFile backend.BackendStorageFile, idxFile *os.File, idxFileSize int64) (int64, []error) {
|
|
// full scrubbing means also scrubbing the index
|
|
var count int64
|
|
_, errs := idx.CheckIndexFile(idxFile, idxFileSize, v.Version())
|
|
|
|
// read and check every indexed needle
|
|
var totalRead int64
|
|
version := v.Version()
|
|
err := idx.WalkIndexFile(idxFile, 0, func(id types.NeedleId, offset types.Offset, size types.Size) error {
|
|
count++
|
|
// compute the actual size of the needle in disk, including needle header, body and alignment padding.
|
|
actualSize := int64(needle.GetActualSize(size, version))
|
|
|
|
// TODO: Needle.ReadData() is currently broken for deleted files, which have a types.Size < 0. Fix
|
|
// so deleted needles get properly scrubbed as well.
|
|
// TODO: idx.WalkIndexFile() returns a size -1 (and actual size of 32 bytes) for deleted needles. We
|
|
// want to scrub deleted needles whenever possible.
|
|
if size.IsDeleted() {
|
|
totalRead += actualSize
|
|
return nil
|
|
}
|
|
|
|
n := needle.Needle{}
|
|
if err := n.ReadData(dataFile, offset.ToActualOffset(), size, version); err != nil {
|
|
errs = append(errs, fmt.Errorf("needle %d on volume %d: %v", id, v.Id, err))
|
|
}
|
|
|
|
totalRead += actualSize
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
errs = append(errs, err)
|
|
}
|
|
|
|
// check total volume file size
|
|
wantSize := totalRead + super_block.SuperBlockSize
|
|
dataSize, _, err := dataFile.GetStat()
|
|
if err != nil {
|
|
errs = append(errs, fmt.Errorf("failed to stat data file for volume %d: %v", v.Id, err))
|
|
} else {
|
|
if dataSize < wantSize {
|
|
errs = append(errs, fmt.Errorf("data file for volume %d is smaller (%d) than the %d needles it contains (%d)", v.Id, dataSize, count, wantSize))
|
|
} else if dataSize != wantSize {
|
|
errs = append(errs, fmt.Errorf("data file size for volume %d (%d) doesn't match the size for %d needles read (%d)", v.Id, dataSize, count, wantSize))
|
|
}
|
|
}
|
|
|
|
return count, errs
|
|
}
|
|
|
|
// Scrub checks the entire volume content for issues.
|
|
func (v *Volume) Scrub() (int64, []error) {
|
|
v.dataFileAccessLock.RLock()
|
|
defer v.dataFileAccessLock.RUnlock()
|
|
|
|
idxFile, idxFileSize, err := v.openIndex()
|
|
if err != nil {
|
|
return 0, []error{err}
|
|
}
|
|
defer idxFile.Close()
|
|
|
|
return v.scrubVolumeData(v.DataBackend, idxFile, idxFileSize)
|
|
}
|
|
|
|
func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) {
|
|
var indexSize int64
|
|
if indexSize, err = verifyIndexFileIntegrity(indexFile); err != nil {
|
|
return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), err)
|
|
}
|
|
if indexSize == 0 {
|
|
return 0, nil
|
|
}
|
|
// The deeper-than-tail structural check (every (offset + actual size)
|
|
// fits inside .dat — issue #8928) lives in volume.load(): it reads
|
|
// MaximumNeedleEnd from the needle map after the load walk, so we don't
|
|
// need a redundant linear scan of the .idx here.
|
|
healthyIndexSize := indexSize
|
|
for i := 1; i <= 10 && indexSize >= int64(i)*types.NeedleMapEntrySize; i++ {
|
|
// check and fix last 10 entries
|
|
lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, indexSize-int64(i)*types.NeedleMapEntrySize)
|
|
if err == io.EOF {
|
|
healthyIndexSize = indexSize - int64(i)*types.NeedleMapEntrySize
|
|
continue
|
|
}
|
|
if err != ErrorSizeMismatch {
|
|
break
|
|
}
|
|
}
|
|
if healthyIndexSize < indexSize {
|
|
return 0, fmt.Errorf("CheckVolumeDataIntegrity %s failed: index size %d differs from healthy size %d", indexFile.Name(), indexSize, healthyIndexSize)
|
|
}
|
|
return
|
|
}
|
|
|
|
func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (lastAppendAtNs uint64, err error) {
|
|
var lastIdxEntry []byte
|
|
if lastIdxEntry, err = readIndexEntryAtOffset(indexFile, indexOffset); err != nil {
|
|
return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), err)
|
|
}
|
|
key, offset, size := idx.IdxFileEntry(lastIdxEntry)
|
|
if offset.IsZero() {
|
|
return 0, nil
|
|
}
|
|
if size < 0 {
|
|
// read the deletion entry. Pass io.EOF and ErrorSizeMismatch through
|
|
// unwrapped so CheckVolumeDataIntegrity can recognize them and run its
|
|
// trailing-truncation / wrap-around recovery loop, matching the live
|
|
// branch below.
|
|
if lastAppendAtNs, err = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), offset.ToActualOffset(), key); err != nil {
|
|
if err == io.EOF || err == ErrorSizeMismatch {
|
|
return lastAppendAtNs, err
|
|
}
|
|
return lastAppendAtNs, fmt.Errorf("verifyDeletedNeedleIntegrity %s failed: %v", indexFile.Name(), err)
|
|
}
|
|
} else {
|
|
if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToActualOffset(), key, size); err != nil {
|
|
if err == ErrorSizeMismatch {
|
|
return verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToActualOffset()+int64(types.MaxPossibleVolumeSize), key, size)
|
|
}
|
|
return lastAppendAtNs, err
|
|
}
|
|
}
|
|
return lastAppendAtNs, nil
|
|
}
|
|
|
|
|
|
func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) {
|
|
if indexSize, err = util.GetFileSize(indexFile); err == nil {
|
|
if indexSize%types.NeedleMapEntrySize != 0 {
|
|
err = fmt.Errorf("index file's size is %d bytes, maybe corrupted", indexSize)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err error) {
|
|
if offset < 0 {
|
|
err = fmt.Errorf("offset %d for index file is invalid", offset)
|
|
return
|
|
}
|
|
bytes = make([]byte, types.NeedleMapEntrySize)
|
|
var readCount int
|
|
readCount, err = indexFile.ReadAt(bytes, offset)
|
|
if err == io.EOF && readCount == types.NeedleMapEntrySize {
|
|
err = nil
|
|
}
|
|
return
|
|
}
|
|
|
|
func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key types.NeedleId, size types.Size) (lastAppendAtNs uint64, err error) {
|
|
n, _, _, err := needle.ReadNeedleHeader(datFile, v, offset)
|
|
if err == io.EOF {
|
|
return 0, err
|
|
}
|
|
if err != nil {
|
|
return 0, fmt.Errorf("read %s at %d", datFile.Name(), offset)
|
|
}
|
|
if n.Size != size {
|
|
return 0, ErrorSizeMismatch
|
|
}
|
|
if v == needle.Version3 {
|
|
bytes := make([]byte, types.TimestampSize)
|
|
var readCount int
|
|
readCount, err = datFile.ReadAt(bytes, offset+types.NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize)
|
|
if err == io.EOF && readCount == types.TimestampSize {
|
|
err = nil
|
|
}
|
|
if err == io.EOF {
|
|
return 0, err
|
|
}
|
|
if err != nil {
|
|
return 0, fmt.Errorf("verifyNeedleIntegrity check %s entry offset %d size %d: %v", datFile.Name(), offset, size, err)
|
|
}
|
|
n.AppendAtNs = util.BytesToUint64(bytes)
|
|
fileTailOffset := offset + needle.GetActualSize(size, v)
|
|
fileSize, _, err := datFile.GetStat()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("stat file %s: %v", datFile.Name(), err)
|
|
}
|
|
if fileSize == fileTailOffset {
|
|
return n.AppendAtNs, nil
|
|
}
|
|
if fileSize > fileTailOffset {
|
|
glog.Warningf("data file %s actual %d bytes expected %d bytes!", datFile.Name(), fileSize, fileTailOffset)
|
|
return n.AppendAtNs, fmt.Errorf("data file %s actual %d bytes expected %d bytes", datFile.Name(), fileSize, fileTailOffset)
|
|
}
|
|
glog.Warningf("data file %s has %d bytes, less than expected %d bytes!", datFile.Name(), fileSize, fileTailOffset)
|
|
}
|
|
if err = n.ReadData(datFile, offset, size, v); err != nil {
|
|
return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", offset, offset+int64(size), err)
|
|
}
|
|
if n.Id != key {
|
|
return n.AppendAtNs, fmt.Errorf("index key %v does not match needle's Id %v", key, n.Id)
|
|
}
|
|
return n.AppendAtNs, err
|
|
}
|
|
|
|
func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key types.NeedleId) (lastAppendAtNs uint64, err error) {
|
|
n := new(needle.Needle)
|
|
// Tombstones are appended with DataSize=0, so the on-disk header carries
|
|
// Size=0. TombstoneFileSize (-1) lives only in the .idx; passing it to
|
|
// ReadData fails the size check and triggers the 32GB wrap-around retry,
|
|
// which reads past EOF and falsely marks the volume read-only.
|
|
size := types.Size(0)
|
|
if err = n.ReadData(datFile, offset, size, v); err != nil {
|
|
// Preserve io.EOF and ErrorSizeMismatch as-is so CheckVolumeDataIntegrity
|
|
// can detect trailing truncation and trigger its wrap-around retry.
|
|
if err == io.EOF || err == ErrorSizeMismatch {
|
|
return n.AppendAtNs, err
|
|
}
|
|
return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", offset, offset+needle.GetActualSize(size, v), err)
|
|
}
|
|
if n.Id != key {
|
|
return n.AppendAtNs, fmt.Errorf("index key %v does not match needle's Id %v", key, n.Id)
|
|
}
|
|
return n.AppendAtNs, err
|
|
}
|
|
|
|
func (v *Volume) checkIdxFile() error {
|
|
datFileSize, _, err := v.DataBackend.GetStat()
|
|
if err != nil {
|
|
return fmt.Errorf("get stat %s: %v", v.FileName(".dat"), err)
|
|
}
|
|
if datFileSize <= super_block.SuperBlockSize {
|
|
return nil
|
|
}
|
|
indexFileName := v.FileName(".idx")
|
|
if util.FileExists(indexFileName) {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("idx file %s does not exists", indexFileName)
|
|
}
|