diff --git a/seaweed-volume/src/storage/needle_map.rs b/seaweed-volume/src/storage/needle_map.rs index 248604e1d..a21bc6caa 100644 --- a/seaweed-volume/src/storage/needle_map.rs +++ b/seaweed-volume/src/storage/needle_map.rs @@ -19,6 +19,7 @@ use compact_map::CompactMap; use redb::{Database, Durability, ReadableDatabase, ReadableTable, TableDefinition}; use crate::storage::idx; +use crate::storage::needle::needle::get_actual_size; use crate::storage::types::*; // ============================================================================ @@ -63,6 +64,11 @@ pub struct NeedleMapMetric { pub deletion_count: AtomicI64, pub deletion_byte_count: AtomicU64, pub max_file_key: AtomicU64, + /// Largest (offset.to_actual_offset() + get_actual_size(size, version)) + /// observed during the load walk. Used at volume load to verify that no + /// .idx entry references bytes past the end of .dat (issue #8928) + /// without paying for a second linear scan. + pub max_needle_end: AtomicI64, } impl NeedleMapMetric { @@ -108,6 +114,29 @@ impl NeedleMapMetric { } } } + + /// Update `max_needle_end` if this entry's (offset + actual size) exceeds + /// the running maximum. Skips deleted/zero-offset entries because they + /// don't reserve space in .dat. + fn maybe_set_max_needle_end(&self, offset: Offset, size: Size, version: Version) { + if offset.is_zero() || !size.is_valid() { + return; + } + let end = offset.to_actual_offset() + get_actual_size(size, version); + loop { + let current = self.max_needle_end.load(Ordering::Relaxed); + if end <= current { + break; + } + if self + .max_needle_end + .compare_exchange(current, end, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + break; + } + } + } } // ============================================================================ @@ -163,9 +192,10 @@ impl CompactNeedleMap { } /// Load from an .idx file, building the in-memory map. - pub fn load_from_idx(reader: &mut R) -> io::Result { + pub fn load_from_idx(reader: &mut R, version: Version) -> io::Result { let mut nm = CompactNeedleMap::new(); idx::walk_index_file(reader, 0, |key, offset, size| { + nm.metric.maybe_set_max_needle_end(offset, size, version); if offset.is_zero() || size.is_deleted() { nm.delete_from_map(key); } else { @@ -284,6 +314,12 @@ impl CompactNeedleMap { NeedleId(self.metric.max_file_key.load(Ordering::Relaxed)) } + /// Largest (offset + actual size) seen during the load walk; 0 if the + /// map is empty. See `NeedleMapMetric::maybe_set_max_needle_end`. + pub fn max_needle_end(&self) -> i64 { + self.metric.max_needle_end.load(Ordering::Relaxed) + } + pub fn index_file_size(&self) -> u64 { self.idx_file_offset } @@ -431,7 +467,7 @@ impl RedbNeedleMap { /// Rebuild metrics by scanning all entries in the redb table. /// Called when reusing an existing .rdb without a full rebuild. - fn rebuild_metrics_from_db(&self) -> io::Result<()> { + fn rebuild_metrics_from_db(&self, version: Version) -> io::Result<()> { let txn = self .db .begin_read() @@ -453,6 +489,8 @@ impl RedbNeedleMap { arr.copy_from_slice(bytes); let nv = unpack_needle_value(&arr); self.metric.maybe_set_max_file_key(key); + self.metric + .maybe_set_max_needle_end(nv.offset, nv.size, version); if nv.size.is_valid() { self.metric.file_count.fetch_add(1, Ordering::Relaxed); self.metric @@ -477,20 +515,24 @@ impl RedbNeedleMap { /// 2. If .idx size matches → reuse .rdb, rebuild metrics from scan /// 3. If .idx is larger → replay new entries incrementally /// 4. Otherwise (missing, corrupted, .idx smaller) → full rebuild - pub fn load_from_idx(db_path: &str, reader: &mut R) -> io::Result { + pub fn load_from_idx( + db_path: &str, + reader: &mut R, + version: Version, + ) -> io::Result { let idx_size = reader.seek(io::SeekFrom::End(0))?; reader.seek(io::SeekFrom::Start(0))?; // Try to reuse existing .rdb if Path::new(db_path).exists() { - if let Ok(nm) = Self::try_reuse_rdb(db_path, reader, idx_size) { + if let Ok(nm) = Self::try_reuse_rdb(db_path, reader, idx_size, version) { return Ok(nm); } // Reuse failed — fall through to full rebuild reader.seek(io::SeekFrom::Start(0))?; } - Self::full_rebuild(db_path, reader, idx_size) + Self::full_rebuild(db_path, reader, idx_size, version) } /// Try to reuse an existing .rdb file. Returns Ok if successful, @@ -499,6 +541,7 @@ impl RedbNeedleMap { db_path: &str, reader: &mut R, idx_size: u64, + version: Version, ) -> io::Result { let db = Database::open(db_path) .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb open: {}", e)))?; @@ -523,7 +566,7 @@ impl RedbNeedleMap { } // Rebuild metrics from existing data - nm.rebuild_metrics_from_db()?; + nm.rebuild_metrics_from_db(version)?; if stored_idx_size < idx_size { // .idx grew — replay new entries incrementally @@ -534,6 +577,7 @@ impl RedbNeedleMap { io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e)) })?; idx::walk_index_file(reader, start_entry, |key, offset, size| { + nm.metric.maybe_set_max_needle_end(offset, size, version); let key_u64: u64 = key.into(); if offset.is_zero() || size.is_deleted() { // Delete: look up old value for metric update, then @@ -607,6 +651,7 @@ impl RedbNeedleMap { db_path: &str, reader: &mut R, idx_size: u64, + version: Version, ) -> io::Result { let _ = std::fs::remove_file(db_path); let nm = RedbNeedleMap::new(db_path)?; @@ -614,6 +659,7 @@ impl RedbNeedleMap { // Collect entries from idx file, resolving duplicates/deletions let mut entries: HashMap> = HashMap::new(); idx::walk_index_file(reader, 0, |key, offset, size| { + nm.metric.maybe_set_max_needle_end(offset, size, version); if offset.is_zero() || size.is_deleted() { entries.insert(key, None); } else { @@ -788,6 +834,12 @@ impl RedbNeedleMap { NeedleId(self.metric.max_file_key.load(Ordering::Relaxed)) } + /// Largest (offset + actual size) seen during the load walk; 0 if the + /// map is empty. See `NeedleMapMetric::maybe_set_max_needle_end`. + pub fn max_needle_end(&self) -> i64 { + self.metric.max_needle_end.load(Ordering::Relaxed) + } + pub fn index_file_size(&self) -> u64 { self.idx_file_offset } @@ -988,6 +1040,16 @@ impl NeedleMap { } } + /// Largest (offset + actual size) seen during the load walk; 0 if the + /// map is empty. Used at volume load to detect .idx entries that + /// reference past the end of .dat (issue #8928) without a second scan. + pub fn max_needle_end(&self) -> i64 { + match self { + NeedleMap::InMemory(nm) => nm.max_needle_end(), + NeedleMap::Redb(nm) => nm.max_needle_end(), + } + } + /// Index file size in bytes. pub fn index_file_size(&self) -> u64 { match self { @@ -1157,7 +1219,7 @@ mod tests { .unwrap(); let mut cursor = Cursor::new(idx_data); - let nm = CompactNeedleMap::load_from_idx(&mut cursor).unwrap(); + let nm = CompactNeedleMap::load_from_idx(&mut cursor, Version::current()).unwrap(); assert!(nm.get(NeedleId(1)).is_some()); assert!(nm.get(NeedleId(2)).is_none()); // deleted @@ -1300,7 +1362,7 @@ mod tests { .unwrap(); let mut cursor = Cursor::new(idx_data); - let nm = RedbNeedleMap::load_from_idx(db_path.to_str().unwrap(), &mut cursor).unwrap(); + let nm = RedbNeedleMap::load_from_idx(db_path.to_str().unwrap(), &mut cursor, Version::current()).unwrap(); assert!(nm.get(NeedleId(1)).is_some()); assert!(nm.get(NeedleId(2)).is_none()); // deleted and removed @@ -1377,7 +1439,7 @@ mod tests { // Load back with CompactNeedleMap to verify let mut idx_file = std::fs::File::open(&idx_path).unwrap(); - let loaded = CompactNeedleMap::load_from_idx(&mut idx_file).unwrap(); + let loaded = CompactNeedleMap::load_from_idx(&mut idx_file, Version::current()).unwrap(); assert_eq!(loaded.file_count(), 2); // only live entries assert!(loaded.get(NeedleId(1)).is_some()); assert!(loaded.get(NeedleId(2)).is_none()); // deleted, not saved diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 2ee94b5b2..70794c8f0 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -19,6 +19,8 @@ use std::time::{SystemTime, UNIX_EPOCH}; use tracing::warn; +#[cfg(test)] +use crate::storage::idx; use crate::storage::needle::needle::{self, get_actual_size, Needle, NeedleError}; use crate::storage::needle_map::{CompactNeedleMap, NeedleMap, NeedleMapKind, RedbNeedleMap}; use crate::storage::super_block::{ReplicaPlacement, SuperBlock, SUPER_BLOCK_SIZE}; @@ -73,6 +75,24 @@ pub enum VolumeError { StreamingUnsupported, } +/// Returns true when a needle read failed because the on-disk bytes are +/// unreadable in a permanent way (offset past EOF, header corruption, CRC +/// mismatch, malformed v2/v3/v4 fields). Vacuum can safely drop such entries +/// during compaction. Anything else (real disk EIO, ERROR_CRC on Windows, +/// network timeouts, EROFS, etc.) is transient or environmental and must +/// abort the compaction so an operator notices. +fn is_skippable_needle_read_error(e: &VolumeError) -> bool { + match e { + VolumeError::Io(io_err) => io_err.kind() == io::ErrorKind::UnexpectedEof, + VolumeError::Needle(NeedleError::SizeMismatch { .. }) => true, + VolumeError::Needle(NeedleError::CrcMismatch { .. }) => true, + VolumeError::Needle(NeedleError::IndexOutOfRange(_)) => true, + VolumeError::Needle(NeedleError::TailTooShort) => true, + VolumeError::SizeMismatch => true, + _ => false, + } +} + // ============================================================================ // VolumeInfo (.vif persistence) // ============================================================================ @@ -712,6 +732,28 @@ impl Volume { "volumeDataIntegrityChecking failed" ); } + + // Structural check: no .idx entry may reference bytes past the + // end of .dat. The needle map's load walk above already + // populated max_needle_end, so this is a numeric comparison + // — no extra disk I/O. A violation marks the volume read-only + // so vacuum doesn't silently drop reachable data based on a + // corrupt .idx left over from a crashed batched write. + // See issue #8928. + if let Some(ref nm) = self.nm { + if let Ok(dat_size) = self.current_dat_file_size() { + let max_end = nm.max_needle_end(); + if dat_size > 0 && max_end > dat_size as i64 { + self.no_write_or_delete = true; + warn!( + volume_id = self.id.0, + max_needle_end = max_end, + dat_size, + "idx references bytes past end of .dat; marking volume read-only" + ); + } + } + } } } @@ -759,7 +801,7 @@ impl Volume { // Open read-only if Path::new(&idx_path).exists() { let mut idx_file = File::open(&idx_path)?; - let nm = CompactNeedleMap::load_from_idx(&mut idx_file)?; + let nm = CompactNeedleMap::load_from_idx(&mut idx_file, self.version())?; self.nm = Some(NeedleMap::InMemory(nm)); } else { // Missing .idx with existing .dat could orphan needles @@ -785,7 +827,7 @@ impl Volume { let idx_size = idx_file.metadata()?.len(); let mut idx_reader = io::BufReader::new(&idx_file); - let mut nm = CompactNeedleMap::load_from_idx(&mut idx_reader)?; + let mut nm = CompactNeedleMap::load_from_idx(&mut idx_reader, self.version())?; // Re-open for append-only writes let write_file = OpenOptions::new() @@ -808,7 +850,7 @@ impl Volume { // Open read-only if Path::new(&idx_path).exists() { let mut idx_file = File::open(&idx_path)?; - let nm = RedbNeedleMap::load_from_idx(&rdb_path, &mut idx_file)?; + let nm = RedbNeedleMap::load_from_idx(&rdb_path, &mut idx_file, self.version())?; self.nm = Some(NeedleMap::Redb(nm)); } else { // Missing .idx with existing .dat could orphan needles @@ -834,7 +876,7 @@ impl Volume { let idx_size = idx_file.metadata()?.len(); let mut idx_reader = io::BufReader::new(&idx_file); - let mut nm = RedbNeedleMap::load_from_idx(&rdb_path, &mut idx_reader)?; + let mut nm = RedbNeedleMap::load_from_idx(&rdb_path, &mut idx_reader, self.version())?; // Re-open for append-only writes let write_file = OpenOptions::new() @@ -1752,6 +1794,11 @@ impl Volume { let version = self.version(); + // The deeper-than-tail structural check (every (offset + actual size) + // fits inside .dat — issue #8928) is now handled in load() via the + // needle map's max_needle_end accumulator, so we don't pay for a + // second linear scan of the .idx here. + // Check last 10 index entries (matching Go's CheckVolumeDataIntegrity). // Go starts healthyIndexSize = indexSize and reduces on EOF. // On success: break (err != ErrorSizeMismatch when err == nil). @@ -2683,6 +2730,8 @@ impl Volume { } entries.sort_by_key(|(_, offset, _)| *offset); + let mut skipped_needles: u64 = 0; + let mut skipped_data_bytes: u64 = 0; for (id, offset, size) in entries { // Progress callback if !progress_fn(offset.to_actual_offset()) { @@ -2699,7 +2748,41 @@ impl Volume { id, ..Needle::default() }; - self.read_needle_data_at(&mut n, offset.to_actual_offset(), size)?; + match self.read_needle_data_at(&mut n, offset.to_actual_offset(), size) { + Ok(()) => {} + Err(e) => { + // Record EIO for health monitoring (parity with Go's checkReadWriteError). + if let VolumeError::Io(ref io_err) = e { + self.check_read_write_error(Some(io_err)); + } + // Only drop the entry when the failure is one of the well- + // known permanent-corruption shapes. A transient disk fault, + // a tiered-read timeout, or a Windows hardware error (which + // surfaces as a generic Io rather than UnexpectedEof) must + // abort so an operator notices, rather than silently + // compacting away data that might come back on retry. + // See issue #8928. + if !is_skippable_needle_read_error(&e) { + return Err(VolumeError::Io(io::Error::new( + io::ErrorKind::Other, + format!("cannot hydrate needle from file: {}", e), + ))); + } + skipped_needles += 1; + if size.is_valid() { + skipped_data_bytes += size.0 as u64; + } + warn!( + volume_id = self.id.0, + key = id.0, + offset = offset.to_actual_offset(), + size = size.0, + error = %e, + "vacuum: dropping unreadable needle" + ); + continue; + } + } // Skip TTL-expired needles using the volume's TTL (matches Go's volume_vacuum.go) if n.has_ttl() { @@ -2721,6 +2804,15 @@ impl Volume { new_offset += bytes.len() as i64; } + if skipped_needles > 0 { + warn!( + volume_id = self.id.0, + skipped_needles, + skipped_data_bytes, + "vacuum: dropped unreadable index entries during compaction" + ); + } + dst.sync_all()?; // Save new index @@ -3710,6 +3802,138 @@ mod tests { v.cleanup_compact().unwrap(); } + /// Vacuum compaction must tolerate an .idx entry whose offset points past + /// the end of the .dat file (the failure mode in issue #8928). The bad + /// entry is silently dropped from the resulting .cpx; healthy needles + /// survive untouched. + #[test] + fn test_compact_by_index_drops_dangling_needle() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + let mut v = make_test_volume(dir); + + // Write a handful of healthy needles to establish a baseline .dat/.idx. + for i in 1..=5u64 { + let mut n = Needle { + id: NeedleId(i), + cookie: Cookie(i as u32), + data: format!("payload-{}", i).into_bytes(), + data_size: format!("payload-{}", i).len() as u32, + ..Needle::default() + }; + v.write_needle(&mut n, true).unwrap(); + } + v.sync_to_disk().unwrap(); + + let dat_size = v.dat_file_size().unwrap(); + let bad_key = NeedleId(9999); + let bad_offset = Offset::from_actual_offset((dat_size + 1024 * 1024) as i64); + let bad_size = Size(2048); + v.nm + .as_mut() + .expect("needle map present") + .put(bad_key, bad_offset, bad_size) + .unwrap(); + v.nm.as_ref().unwrap().sync().unwrap(); + + // Vacuum must succeed in spite of the dangling entry. + v.compact_by_index(0, 0, |_| true) + .expect("compact_by_index should tolerate dangling entries"); + + // Walk the resulting .cpx and confirm the bad key was dropped while + // every healthy key made it through. + let cpx_path = v.file_name(".cpx"); + let mut cpx = File::open(&cpx_path).unwrap(); + let mut kept: Vec = Vec::new(); + idx::walk_index_file(&mut cpx, 0, |key, _, size| { + if !size.is_deleted() { + kept.push(key.0); + } + Ok(()) + }) + .unwrap(); + + assert!( + !kept.contains(&bad_key.0), + "dangling key {} should have been dropped from .cpx, got {:?}", + bad_key.0, + kept + ); + for i in 1..=5u64 { + assert!( + kept.contains(&i), + "healthy key {} missing from compacted .cpx, got {:?}", + i, + kept + ); + } + } + + /// The needle map's max_needle_end accumulator must let volume.load + /// detect an .idx whose entries point past the end of the .dat — the + /// deeper-than-tail corruption shape from issue #8928 that the existing + /// last-10-entries scan cannot see. The check is populated by the load + /// walk and read in volume.load() to flip the volume read-only. + #[test] + fn test_max_needle_end_detects_dangling_entry() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + let mut v = make_test_volume(dir); + + for i in 1..=4u64 { + let mut n = Needle { + id: NeedleId(i), + cookie: Cookie(i as u32), + data: format!("data-{}", i).into_bytes(), + data_size: format!("data-{}", i).len() as u32, + ..Needle::default() + }; + v.write_needle(&mut n, true).unwrap(); + } + v.sync_to_disk().unwrap(); + + let dat_size = v.dat_file_size().unwrap() as i64; + let idx_path = v.file_name(".idx"); + let version = v.version(); + + // Sanity: a fresh load walk over the healthy .idx puts max_needle_end + // somewhere inside the .dat. + let mut idx_reader = File::open(&idx_path).unwrap(); + let healthy_nm = + CompactNeedleMap::load_from_idx(&mut idx_reader, version).unwrap(); + let healthy_end = healthy_nm.max_needle_end(); + assert!( + healthy_end > 0 && healthy_end <= dat_size, + "healthy volume should have max_needle_end ({}) in [1, dat_size={}]", + healthy_end, + dat_size + ); + + // Inject a dangling entry by appending a bogus 16/17-byte record + // directly to the .idx, then reload. The load walk should observe + // max_needle_end past dat_size — which is exactly the signal + // volume.load uses to mark the volume read-only. + let bad_offset = Offset::from_actual_offset(dat_size + 4 * 1024 * 1024); + let mut idx_append = OpenOptions::new() + .write(true) + .append(true) + .open(&idx_path) + .unwrap(); + idx::write_index_entry(&mut idx_append, NeedleId(9999), bad_offset, Size(1024)) + .unwrap(); + idx_append.sync_all().unwrap(); + + let mut idx_reread = File::open(&idx_path).unwrap(); + let bad_nm = CompactNeedleMap::load_from_idx(&mut idx_reread, version).unwrap(); + let bad_end = bad_nm.max_needle_end(); + assert!( + bad_end > dat_size, + "after dangling-entry inject max_needle_end ({}) should exceed dat_size ({})", + bad_end, + dat_size + ); + } + #[test] fn test_compaction_revision_relookup() { // Verifies that re_lookup_needle_data_offset returns the correct data offset diff --git a/weed/storage/needle/needle_read.go b/weed/storage/needle/needle_read.go index 7374dc0ca..c771aadb4 100644 --- a/weed/storage/needle/needle_read.go +++ b/weed/storage/needle/needle_read.go @@ -27,6 +27,12 @@ const ( var ErrorSizeMismatch = errors.New("size mismatch") var ErrorSizeInvalid = errors.New("size invalid") +// ErrorCorrupted marks a needle whose on-disk bytes cannot be parsed because +// of data corruption: bad CRC, malformed v2/v3/v4 headers, or out-of-range +// fields. Wrap errors with %w so callers (e.g. vacuum compaction) can +// distinguish "the bytes are bad" from a genuine I/O fault. +var ErrorCorrupted = errors.New("needle data corrupted") + func (n *Needle) DiskSize(version Version) int64 { return GetActualSize(n.Size, version) } @@ -58,7 +64,7 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Versio return ErrorSizeMismatch } stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorSizeMismatch).Inc() - return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) + return fmt.Errorf("%w: entry not found: offset %d found id %x size %d, expected size %d", ErrorSizeMismatch, offset, n.Id, n.Size, size) } if version == Version1 { n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size] @@ -106,7 +112,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) { index = index + 4 if int(n.DataSize)+index > lenBytes { stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() - return fmt.Errorf("index out of range %d", 1) + return fmt.Errorf("index out of range %d: %w", 1, ErrorCorrupted) } n.Data = bytes[index : index+int(n.DataSize)] index = index + int(n.DataSize) @@ -125,7 +131,7 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err index = index + 1 if int(n.NameSize)+index > lenBytes { stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() - return index, fmt.Errorf("index out of range %d", 2) + return index, fmt.Errorf("index out of range %d: %w", 2, ErrorCorrupted) } n.Name = bytes[index : index+int(n.NameSize)] index = index + int(n.NameSize) @@ -135,7 +141,7 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err index = index + 1 if int(n.MimeSize)+index > lenBytes { stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() - return index, fmt.Errorf("index out of range %d", 3) + return index, fmt.Errorf("index out of range %d: %w", 3, ErrorCorrupted) } n.Mime = bytes[index : index+int(n.MimeSize)] index = index + int(n.MimeSize) @@ -143,7 +149,7 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err if index < lenBytes && n.HasLastModifiedDate() { if LastModifiedBytesLength+index > lenBytes { stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() - return index, fmt.Errorf("index out of range %d", 4) + return index, fmt.Errorf("index out of range %d: %w", 4, ErrorCorrupted) } n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength]) index = index + LastModifiedBytesLength @@ -151,7 +157,7 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err if index < lenBytes && n.HasTtl() { if TtlBytesLength+index > lenBytes { stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() - return index, fmt.Errorf("index out of range %d", 5) + return index, fmt.Errorf("index out of range %d: %w", 5, ErrorCorrupted) } n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength]) index = index + TtlBytesLength @@ -159,13 +165,13 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err if index < lenBytes && n.HasPairs() { if 2+index > lenBytes { stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() - return index, fmt.Errorf("index out of range %d", 6) + return index, fmt.Errorf("index out of range %d: %w", 6, ErrorCorrupted) } n.PairsSize = util.BytesToUint16(bytes[index : index+2]) index += 2 if int(n.PairsSize)+index > lenBytes { stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() - return index, fmt.Errorf("index out of range %d", 7) + return index, fmt.Errorf("index out of range %d: %w", 7, ErrorCorrupted) } end := index + int(n.PairsSize) n.Pairs = bytes[index:end] diff --git a/weed/storage/needle/needle_read_tail.go b/weed/storage/needle/needle_read_tail.go index 375f30b11..dfaab1706 100644 --- a/weed/storage/needle/needle_read_tail.go +++ b/weed/storage/needle/needle_read_tail.go @@ -18,7 +18,7 @@ func (n *Needle) readNeedleTail(needleBody []byte, version Version) error { // with seaweed version using crc.Value() instead of uint32(crc), which appears in commit 056c480eb // and switch appeared in version 3.09. stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorCRC).Inc() - return fmt.Errorf("invalid CRC for needle %v (got %08x, want %08x), data on disk corrupted", n.Id, dataChecksum, expectedChecksum) + return fmt.Errorf("invalid CRC for needle %v (got %08x, want %08x), data on disk corrupted: %w", n.Id, dataChecksum, expectedChecksum, ErrorCorrupted) } n.Checksum = dataChecksum } else { diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go index 5feb4a754..3311e931c 100644 --- a/weed/storage/needle_map.go +++ b/weed/storage/needle_map.go @@ -31,6 +31,7 @@ type NeedleMapper interface { FileCount() int DeletedCount() int MaxFileKey() NeedleId + MaxNeedleEnd() int64 IndexFileSize() uint64 Sync() error ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error) diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go index a28e1a5b3..fe4b14953 100644 --- a/weed/storage/needle_map_leveldb.go +++ b/weed/storage/needle_map_leveldb.go @@ -12,6 +12,7 @@ import ( "github.com/syndtr/goleveldb/leveldb/opt" "github.com/seaweedfs/seaweedfs/weed/storage/idx" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/syndtr/goleveldb/leveldb" @@ -39,7 +40,7 @@ type LevelDbNeedleMap struct { recordCount uint64 } -func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options, ldbTimeout int64) (m *LevelDbNeedleMap, err error) { +func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options, ldbTimeout int64, version needle.Version) (m *LevelDbNeedleMap, err error) { m = &LevelDbNeedleMap{dbFileName: dbFileName} m.indexFile = indexFile if !isLevelDbFresh(dbFileName, indexFile) { @@ -72,7 +73,7 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Option return } } - mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile) + mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile, version) if indexLoadError != nil { return nil, indexLoadError } @@ -334,6 +335,10 @@ func (m *LevelDbNeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts * func (m *LevelDbNeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) (err error) { glog.V(0).Infof("loading idx to leveldb from offset %d for file: %s", startFrom, indexFile.Name()) + version := needle.GetCurrentVersion() + if v != nil { + version = v.Version() + } dbFileName := v.FileName(".cpldb") db, dbErr := leveldb.OpenFile(dbFileName, nil) defer func() { @@ -356,6 +361,7 @@ func (m *LevelDbNeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startF err = idx.WalkIndexFile(indexFile, startFrom, func(key NeedleId, offset Offset, size Size) (e error) { m.mapMetric.FileCounter++ + m.mapMetric.MaybeSetMaxNeedleEnd(offset, size, version) bytes := make([]byte, NeedleIdSize) NeedleIdToBytes(bytes[0:NeedleIdSize], key) // fresh loading diff --git a/weed/storage/needle_map_leveldb_test.go b/weed/storage/needle_map_leveldb_test.go index 50f1b04ea..443bc1b3e 100644 --- a/weed/storage/needle_map_leveldb_test.go +++ b/weed/storage/needle_map_leveldb_test.go @@ -7,6 +7,7 @@ import ( "sync" "testing" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/types" ) @@ -25,7 +26,7 @@ func TestLevelDbNeedleMap_Concurrency(t *testing.T) { dbFileName := filepath.Join(dir, prefix+".ldb") // Create and initialize map - m, err := NewLevelDbNeedleMap(dbFileName, indexFile, nil, 1) + m, err := NewLevelDbNeedleMap(dbFileName, indexFile, nil, 1, needle.GetCurrentVersion()) if err != nil { t.Fatalf("NewLevelDbNeedleMap: %v", err) } diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go index 37f757b9d..4abb8891d 100644 --- a/weed/storage/needle_map_memory.go +++ b/weed/storage/needle_map_memory.go @@ -5,6 +5,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/storage/idx" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" . "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/syndtr/goleveldb/leveldb/opt" @@ -28,14 +29,15 @@ func NewCompactNeedleMap(file *os.File) *NeedleMap { return nm } -func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) { +func LoadCompactNeedleMap(file *os.File, version needle.Version) (*NeedleMap, error) { nm := NewCompactNeedleMap(file) - return doLoading(file, nm) + return doLoading(file, nm, version) } -func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) { +func doLoading(file *os.File, nm *NeedleMap, version needle.Version) (*NeedleMap, error) { e := idx.WalkIndexFile(file, 0, func(key NeedleId, offset Offset, size Size) error { nm.MaybeSetMaxFileKey(key) + nm.MaybeSetMaxNeedleEnd(offset, size, version) if !offset.IsZero() && !size.IsDeleted() { nm.FileCounter++ nm.FileByteCounter = nm.FileByteCounter + uint64(size) @@ -109,8 +111,13 @@ func (nm *NeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Op func (nm *NeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) error { glog.V(0).Infof("loading idx from offset %d for file: %s", startFrom, indexFile.Name()) + version := needle.GetCurrentVersion() + if v != nil { + version = v.Version() + } e := idx.WalkIndexFile(indexFile, startFrom, func(key NeedleId, offset Offset, size Size) error { nm.MaybeSetMaxFileKey(key) + nm.MaybeSetMaxNeedleEnd(offset, size, version) nm.FileCounter++ if !offset.IsZero() && !size.IsDeleted() { nm.FileByteCounter = nm.FileByteCounter + uint64(size) diff --git a/weed/storage/needle_map_metric.go b/weed/storage/needle_map_metric.go index d6d0a8730..e1de97c6e 100644 --- a/weed/storage/needle_map_metric.go +++ b/weed/storage/needle_map_metric.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "github.com/seaweedfs/seaweedfs/weed/storage/idx" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" . "github.com/seaweedfs/seaweedfs/weed/storage/types" boom "github.com/tylertreat/BoomFilters" ) @@ -17,6 +18,12 @@ type mapMetric struct { DeletionByteCounter uint64 `json:"DeletionByteCounter"` FileByteCounter uint64 `json:"FileByteCounter"` MaximumFileKey uint64 `json:"MaxFileKey"` + // MaximumNeedleEnd is the largest (offset.ToActualOffset() + + // GetActualSize(size, version)) seen during the index walk. It is used + // at volume load to verify that no .idx entry references bytes past + // the end of the .dat — the deeper-than-tail corruption shape from + // issue #8928 — without paying for a second linear scan of the index. + MaximumNeedleEnd int64 `json:"MaxNeedleEnd"` } func (mm *mapMetric) logDelete(deletedByteCount Size) { @@ -92,7 +99,27 @@ func (mm *mapMetric) MaybeSetMaxFileKey(key NeedleId) { } } -func needleMapMetricFromIndexFile(r *os.File, mm *mapMetric) error { +// MaybeSetMaxNeedleEnd updates MaximumNeedleEnd if the supplied entry's +// (offset + actual size) is larger than what we have seen so far. Skips +// deleted/zero-offset entries because they don't reserve space in .dat. +func (mm *mapMetric) MaybeSetMaxNeedleEnd(offset Offset, size Size, version needle.Version) { + if mm == nil || offset.IsZero() || !size.IsValid() { + return + } + end := offset.ToActualOffset() + needle.GetActualSize(size, version) + if end > atomic.LoadInt64(&mm.MaximumNeedleEnd) { + atomic.StoreInt64(&mm.MaximumNeedleEnd, end) + } +} + +func (mm *mapMetric) MaxNeedleEnd() int64 { + if mm == nil { + return 0 + } + return atomic.LoadInt64(&mm.MaximumNeedleEnd) +} + +func needleMapMetricFromIndexFile(r *os.File, mm *mapMetric, version needle.Version) error { var bf *boom.BloomFilter buf := make([]byte, NeedleIdSize) err := reverseWalkIndexFile(r, func(entryCount int64) { @@ -100,6 +127,7 @@ func needleMapMetricFromIndexFile(r *os.File, mm *mapMetric) error { }, func(key NeedleId, offset Offset, size Size) error { mm.MaybeSetMaxFileKey(key) + mm.MaybeSetMaxNeedleEnd(offset, size, version) NeedleIdToBytes(buf, key) if size.IsValid() { mm.FileByteCounter += uint64(size) @@ -124,9 +152,9 @@ func needleMapMetricFromIndexFile(r *os.File, mm *mapMetric) error { return err } -func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) { +func newNeedleMapMetricFromIndexFile(r *os.File, version needle.Version) (mm *mapMetric, err error) { mm = &mapMetric{} - err = needleMapMetricFromIndexFile(r, mm) + err = needleMapMetricFromIndexFile(r, mm, version) return } diff --git a/weed/storage/needle_map_metric_test.go b/weed/storage/needle_map_metric_test.go index 96919d103..5dbe89406 100644 --- a/weed/storage/needle_map_metric_test.go +++ b/weed/storage/needle_map_metric_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" . "github.com/seaweedfs/seaweedfs/weed/storage/types" ) @@ -21,7 +22,7 @@ func TestFastLoadingNeedleMapMetrics(t *testing.T) { } } - mm, _ := newNeedleMapMetricFromIndexFile(idxFile) + mm, _ := newNeedleMapMetricFromIndexFile(idxFile, needle.GetCurrentVersion()) glog.V(0).Infof("FileCount expected %d actual %d", nm.FileCount(), mm.FileCount()) glog.V(0).Infof("DeletedSize expected %d actual %d", nm.DeletedSize(), mm.DeletedSize()) diff --git a/weed/storage/needle_map_sorted_file.go b/weed/storage/needle_map_sorted_file.go index 5bd67ea86..15a40c791 100644 --- a/weed/storage/needle_map_sorted_file.go +++ b/weed/storage/needle_map_sorted_file.go @@ -5,6 +5,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" . "github.com/seaweedfs/seaweedfs/weed/storage/types" ) @@ -16,7 +17,7 @@ type SortedFileNeedleMap struct { dbFileSize int64 } -func NewSortedFileNeedleMap(indexBaseFileName string, indexFile *os.File) (m *SortedFileNeedleMap, err error) { +func NewSortedFileNeedleMap(indexBaseFileName string, indexFile *os.File, version needle.Version) (m *SortedFileNeedleMap, err error) { m = &SortedFileNeedleMap{baseFileName: indexBaseFileName} m.indexFile = indexFile fileName := indexBaseFileName + ".sdx" @@ -33,7 +34,7 @@ func NewSortedFileNeedleMap(indexBaseFileName string, indexFile *os.File) (m *So dbStat, _ := m.dbFile.Stat() m.dbFileSize = dbStat.Size() glog.V(1).Infof("Loading %s...", indexFile.Name()) - mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile) + mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile, version) if indexLoadError != nil { _ = m.dbFile.Close() return nil, indexLoadError diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index 4ae83394c..98eb71c06 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -122,6 +122,10 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uin if indexSize == 0 { return 0, nil } + // The deeper-than-tail structural check (every (offset + actual size) + // fits inside .dat — issue #8928) lives in volume.load(): it reads + // MaximumNeedleEnd from the needle map after the load walk, so we don't + // need a redundant linear scan of the .idx here. healthyIndexSize := indexSize for i := 1; i <= 10 && indexSize >= int64(i)*types.NeedleMapEntrySize; i++ { // check and fix last 10 entries @@ -165,6 +169,7 @@ func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) ( return lastAppendAtNs, nil } + func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) { if indexSize, err = util.GetFileSize(indexFile); err == nil { if indexSize%types.NeedleMapEntrySize != 0 { diff --git a/weed/storage/volume_checking_test.go b/weed/storage/volume_checking_test.go index 2945823c8..3da226d9d 100644 --- a/weed/storage/volume_checking_test.go +++ b/weed/storage/volume_checking_test.go @@ -9,6 +9,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage/backend" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/storage/types" ) func TestScrubVolumeData(t *testing.T) { @@ -35,8 +37,8 @@ func TestScrubVolumeData(t *testing.T) { version: needle.Version3, want: 27, wantErrs: []error{ - fmt.Errorf("needle 3 on volume 0: invalid CRC for needle 3 (got 0b243a0d, want 4af853fb), data on disk corrupted"), - fmt.Errorf("needle 48 on volume 0: invalid CRC for needle 30 (got 3c40e8d5, want 5077fea1), data on disk corrupted"), + fmt.Errorf("needle 3 on volume 0: invalid CRC for needle 3 (got 0b243a0d, want 4af853fb), data on disk corrupted: needle data corrupted"), + fmt.Errorf("needle 48 on volume 0: invalid CRC for needle 30 (got 3c40e8d5, want 5077fea1), data on disk corrupted: needle data corrupted"), fmt.Errorf("data file size for volume 0 (942864) doesn't match the size for 27 needles read (942856)"), }, }, @@ -78,3 +80,78 @@ func TestScrubVolumeData(t *testing.T) { }) } } + +// TestMaxNeedleEnd ensures the needle map's MaxNeedleEnd accumulator lets +// volume.load() detect an .idx that references bytes past the end of the .dat +// — the deeper-than-tail corruption shape from issue #8928 that the existing +// last-10-entries scan cannot see. The check is populated by the load walk +// and read by volume.load() to flip the volume read-only. +func TestMaxNeedleEnd(t *testing.T) { + dir := t.TempDir() + + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0) + if err != nil { + t.Fatalf("volume creation: %v", err) + } + defer v.Close() + + // A handful of healthy needles establishes a baseline .dat/.idx. + for i := 1; i <= 4; i++ { + n := newRandomNeedle(uint64(i)) + if _, _, _, err := v.writeNeedle2(n, true, false); err != nil { + t.Fatalf("write needle %d: %v", i, err) + } + } + if err := v.DataBackend.Sync(); err != nil { + t.Fatalf("sync .dat: %v", err) + } + if err := v.nm.Sync(); err != nil { + t.Fatalf("sync .idx: %v", err) + } + + datSize, _, err := v.DataBackend.GetStat() + if err != nil { + t.Fatalf("stat .dat: %v", err) + } + + // Sanity: a fresh load over the healthy .idx puts MaxNeedleEnd inside + // the .dat. + idxFile, err := os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644) + if err != nil { + t.Fatalf("open .idx: %v", err) + } + healthyNm, err := LoadCompactNeedleMap(idxFile, v.Version()) + idxFile.Close() + if err != nil { + t.Fatalf("load healthy .idx: %v", err) + } + healthyEnd := healthyNm.MaxNeedleEnd() + if healthyEnd <= 0 || healthyEnd > datSize { + t.Fatalf("healthy volume should have MaxNeedleEnd (%d) in (0, dat_size=%d]", healthyEnd, datSize) + } + + // Inject a dangling entry by appending to the .idx, then reload. The + // walk should observe MaxNeedleEnd past dat_size — exactly the signal + // volume.load uses to mark the volume read-only. + bogusOffset := types.ToOffset(datSize + 4*1024*1024) + if err := v.nm.Put(types.Uint64ToNeedleId(9999), bogusOffset, types.Size(1024)); err != nil { + t.Fatalf("inject dangling idx entry: %v", err) + } + if err := v.nm.Sync(); err != nil { + t.Fatalf("sync .idx after inject: %v", err) + } + + idxFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644) + if err != nil { + t.Fatalf("reopen .idx: %v", err) + } + defer idxFile.Close() + badNm, err := LoadCompactNeedleMap(idxFile, v.Version()) + if err != nil { + t.Fatalf("reload .idx after inject: %v", err) + } + badEnd := badNm.MaxNeedleEnd() + if badEnd <= datSize { + t.Fatalf("after dangling-entry inject MaxNeedleEnd (%d) should exceed dat_size (%d)", badEnd, datSize) + } +} diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 37be275b6..b6f6924a1 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -173,8 +173,13 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind } } + // The post-load structural check below uses the in-memory needle map + // to verify that no .idx entry references bytes past the end of .dat + // (issue #8928). The check piggybacks on MaxNeedleEnd, which the load + // walks below populate without a second linear scan. + if v.noWriteOrDelete || v.noWriteCanDelete { - if v.nm, err = NewSortedFileNeedleMap(v.IndexFileName(), indexFile); err != nil { + if v.nm, err = NewSortedFileNeedleMap(v.IndexFileName(), indexFile, v.Version()); err != nil { glog.V(0).Infof("loading sorted db %s error: %v", v.FileName(".sdx"), err) } } else { @@ -185,7 +190,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind err = v.tmpNm.UpdateNeedleMap(v, indexFile, nil, 0) } else { glog.V(2).Infoln("loading memory index", v.FileName(".idx"), "to memory") - if v.nm, err = LoadCompactNeedleMap(indexFile); err != nil { + if v.nm, err = LoadCompactNeedleMap(indexFile, v.Version()); err != nil { glog.V(0).Infof("loading index %s to memory error: %v", v.FileName(".idx"), err) } } @@ -200,7 +205,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout) } else { glog.V(0).Infoln("loading leveldb index", v.FileName(".ldb")) - if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout); err != nil { + if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version()); err != nil { glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) } } @@ -215,7 +220,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout) } else { glog.V(0).Infoln("loading leveldb medium index", v.FileName(".ldb")) - if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout); err != nil { + if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version()); err != nil { glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) } } @@ -230,12 +235,28 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout) } else { glog.V(0).Infoln("loading leveldb large index", v.FileName(".ldb")) - if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout); err != nil { + if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version()); err != nil { glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) } } } } + + // Structural check: no .idx entry may reference bytes past the end of + // the .dat. The needle map's load walk above already populated + // MaximumNeedleEnd, so this is just a numeric comparison — no extra + // disk I/O. A violation marks the volume read-only so a corrupt + // .idx left over from a crashed batched write does not silently + // power vacuum to drop reachable data. See issue #8928. + if !v.HasRemoteFile() && v.nm != nil && v.DataBackend != nil { + if datSize, _, statErr := v.DataBackend.GetStat(); statErr == nil && datSize > 0 { + if maxEnd := v.nm.MaxNeedleEnd(); maxEnd > datSize { + v.noWriteOrDelete = true + glog.V(0).Infof("volume %d: idx references end=%d but .dat is %d bytes; marking readonly", + v.Id, maxEnd, datSize) + } + } + } } if !hasVolumeInfoFile { diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index c5027204c..4814d1c7a 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -1,7 +1,9 @@ package storage import ( + "errors" "fmt" + "io" "os" "runtime" "time" @@ -17,6 +19,24 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) +// isSkippableNeedleReadError returns true when a needle read failed because +// the on-disk bytes are unreadable in a permanent way (offset past EOF, header +// corruption, CRC mismatch, malformed v2/v3/v4 fields). Vacuum can safely drop +// such entries during compaction. Anything else (real disk EIO on Unix, +// ERROR_CRC / ERROR_IO_DEVICE on Windows, network timeouts, EROFS, etc.) is +// transient or environmental and must abort the compaction so an operator +// notices, rather than silently dropping recoverable data. +func isSkippableNeedleReadError(err error) bool { + if err == nil { + return false + } + return errors.Is(err, io.EOF) || + errors.Is(err, io.ErrUnexpectedEOF) || + errors.Is(err, needle.ErrorSizeMismatch) || + errors.Is(err, needle.ErrorSizeInvalid) || + errors.Is(err, needle.ErrorCorrupted) +} + type ProgressFunc func(processed int64) bool func (v *Volume) garbageLevel() float64 { @@ -493,6 +513,10 @@ func (v *Volume) copyDataBasedOnIndexFile(opts *CompactOptions) (err error) { } writeThrottler := util.NewWriteThrottler(opts.MaxBytesPerSecond) + var ( + skippedNeedles int + skippedDataBytes uint64 + ) err = oldNm.AscendingVisit(func(value needle_map.NeedleValue) error { offset, size := value.Offset, value.Size @@ -510,7 +534,21 @@ func (v *Volume) copyDataBasedOnIndexFile(opts *CompactOptions) (err error) { n := new(needle.Needle) if err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, opts.version); err != nil { v.checkReadWriteError(err) - return fmt.Errorf("cannot hydrate needle from file: %w", err) + // Only drop the entry when the failure is one of the well-known + // permanent-corruption shapes. A transient disk fault, a tiered + // read timeout, or a Windows hardware error (ERROR_CRC etc.) must + // abort so an operator notices, rather than silently compacting + // away data that might come back on retry. See issue #8928. + if !isSkippableNeedleReadError(err) { + return fmt.Errorf("cannot hydrate needle from file: %w", err) + } + skippedNeedles++ + if size.IsValid() { + skippedDataBytes += uint64(size) + } + glog.Warningf("vacuum volume %d: dropping unreadable needle key=%d offset=%d size=%d: %v", + v.Id, value.Key, offset.ToActualOffset(), size, err) + return nil } if n.HasTtl() && now >= n.LastModified+uint64(opts.superBlock.Ttl.Minutes()*60) { @@ -533,6 +571,10 @@ func (v *Volume) copyDataBasedOnIndexFile(opts *CompactOptions) (err error) { if err != nil { return err } + if skippedNeedles > 0 { + glog.Warningf("vacuum volume %d: dropped %d unreadable index entries (%d data bytes) during compaction", + v.Id, skippedNeedles, skippedDataBytes) + } if v.Ttl.String() == "" && v.nm != nil { dstDatSize, _, err := dstDatBackend.GetStat() if err != nil { @@ -540,6 +582,14 @@ func (v *Volume) copyDataBasedOnIndexFile(opts *CompactOptions) (err error) { } if v.nm.ContentSize() > v.nm.DeletedSize() { expectedContentSize := v.nm.ContentSize() - v.nm.DeletedSize() + // Skipped needles still contribute to the source-side ContentSize but + // were not written to the destination, so subtract them before the + // safety check to avoid a false positive. + if skippedDataBytes >= expectedContentSize { + expectedContentSize = 0 + } else { + expectedContentSize -= skippedDataBytes + } if expectedContentSize > uint64(dstDatSize) { return fmt.Errorf("volume %s unexpected new data size: %d does not match size of content minus deleted: %d", v.Id.String(), dstDatSize, expectedContentSize) diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index d8d6201eb..8689c19bc 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/seaweedfs/seaweedfs/weed/storage/idx" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/types" @@ -233,6 +234,90 @@ func TestCleanupCompactRemovesTempFiles(t *testing.T) { } } +// TestCompactByIndex_DropsDanglingNeedle verifies that vacuum compaction +// tolerates an .idx entry whose offset points past the end of the .dat file +// (the failure mode reported in issue #8928). The bad entry should be silently +// dropped from the resulting .cpx, while every healthy needle is preserved. +func TestCompactByIndex_DropsDanglingNeedle(t *testing.T) { + dir := t.TempDir() + + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0) + if err != nil { + t.Fatalf("volume creation: %v", err) + } + + const goodNeedleCount = 8 + infos := make([]*needleInfo, goodNeedleCount) + for i := 1; i <= goodNeedleCount; i++ { + n := newRandomNeedle(uint64(i)) + _, size, _, err := v.writeNeedle2(n, true, false) + if err != nil { + t.Fatalf("write needle %d: %v", i, err) + } + infos[i-1] = &needleInfo{size: size, crc: n.Checksum} + } + + if err := v.DataBackend.Sync(); err != nil { + t.Fatalf("sync .dat: %v", err) + } + if err := v.nm.Sync(); err != nil { + t.Fatalf("sync .idx: %v", err) + } + + datSize, _, err := v.DataBackend.GetStat() + if err != nil { + t.Fatalf("stat .dat: %v", err) + } + + // Inject an .idx entry that points 1 MB past the end of the .dat. The + // bad entry must go through nm.Put so it ends up in both the in-memory + // map and the on-disk .idx — exactly the corruption pattern in #8928. + badKey := types.Uint64ToNeedleId(uint64(goodNeedleCount + 100)) + badOffset := types.ToOffset(datSize + 1024*1024) + badSize := types.Size(2048) + if err := v.nm.Put(badKey, badOffset, badSize); err != nil { + t.Fatalf("inject bad idx entry: %v", err) + } + if err := v.nm.Sync(); err != nil { + t.Fatalf("sync .idx after inject: %v", err) + } + + if err := v.CompactByIndex(nil); err != nil { + t.Fatalf("CompactByIndex should tolerate dangling entries, got: %v", err) + } + + // Walk the new index and confirm the dangling entry was dropped while + // all of the original keys made it through. + cpx, err := os.Open(filepath.Join(dir, "1.cpx")) + if err != nil { + t.Fatalf("open .cpx: %v", err) + } + defer cpx.Close() + keptKeys := map[types.NeedleId]bool{} + if err := idx.WalkIndexFile(cpx, 0, func(key types.NeedleId, _ types.Offset, size types.Size) error { + if !size.IsDeleted() { + keptKeys[key] = true + } + return nil + }); err != nil { + t.Fatalf("walk .cpx: %v", err) + } + if keptKeys[badKey] { + t.Fatalf("dangling key %d should have been dropped from .cpx", badKey) + } + for i := 1; i <= goodNeedleCount; i++ { + if infos[i-1].size == 0 { + continue + } + k := types.Uint64ToNeedleId(uint64(i)) + if !keptKeys[k] { + t.Fatalf("healthy key %d missing from compacted .cpx", k) + } + } + + v.Close() +} + func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) { n := newRandomNeedle(uint64(i)) _, size, _, err := v.writeNeedle2(n, true, false) diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go index c7d991cd8..4617e4ea1 100644 --- a/weed/util/chunk_cache/chunk_cache_on_disk.go +++ b/weed/util/chunk_cache/chunk_cache_on_disk.go @@ -10,6 +10,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/storage" "github.com/seaweedfs/seaweedfs/weed/storage/backend" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" ) @@ -69,7 +70,7 @@ func LoadOrCreateChunkCacheVolume(fileName string, preallocate int64) (*ChunkCac WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB CompactionTableSizeMultiplier: 10, // default value is 1 } - if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts, 0); err != nil { + if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts, 0, needle.GetCurrentVersion()); err != nil { return nil, fmt.Errorf("loading leveldb %s error: %v", v.fileName+".ldb", err) }