mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
fix(volume): keep vacuum running past dangling .idx entries (#9115)
* fix(volume): keep vacuum running past dangling .idx entries Vacuum compaction aborted entirely on the first .idx entry whose offset pointed past the end of the .dat file, surfacing as `cannot hydrate needle from file: EOF` and stalling progress on every other volume. In both Go and Rust: - During compaction, skip an unreadable needle and continue. The bytes it pointed at were already unreachable via reads, so dropping the index reference makes the post-vacuum volume consistent. Real EIO still bails out so a disk fault is not silently papered over. - At volume load, do a single linear scan of the .idx and confirm every (offset + actual size) fits inside .dat. The pre-existing integrity check only looked at the last 10 entries, so deeper corruption (e.g. left over from a crashed batched write) went undetected and only surfaced later as a vacuum EOF. A failure now marks the volume read-only at load time so an operator can react. Refs #8928 * fix(volume): only skip permanent-corruption needle reads during vacuum Address PR review feedback (gemini-code-assist + coderabbit): The original patch skipped any non-EIO read failure, which would silently drop needles on transient errors — Windows hardware bad-sector errors (ERROR_CRC etc.) never surface as syscall.EIO; tiered-storage network timeouts and EROFS would also slip through and shrink the volume. Switch to an explicit whitelist of permanent-corruption shapes: - Add needle.ErrorCorrupted sentinel and wrap CRC and "index out of range" errors with %w so callers can match via errors.Is. - copyDataBasedOnIndexFile now skips only when the read failure is io.EOF, io.ErrUnexpectedEOF, ErrorSizeMismatch, ErrorSizeInvalid, or ErrorCorrupted. Anything else (real disk faults, environmental errors, Windows hardware codes) aborts the compaction so an operator notices. - Mirror the same whitelist in the Rust volume server, matching on io::ErrorKind::UnexpectedEof and the NeedleError corruption variants (SizeMismatch, CrcMismatch, IndexOutOfRange, TailTooShort). Also add `defer v.Close()` in TestVerifyIndexFitsInDat so Windows t.TempDir() cleanup can release the .dat/.idx handles. Refs #8928 * fix(volume): wrap entry-not-found size-mismatch with ErrorSizeMismatch Address PR review: the fallback branch in ReadBytes returned an unwrapped fmt.Errorf, so isSkippableNeedleReadError (and any caller using errors.Is(..., ErrorSizeMismatch)) could not match it. Wrap with %w so the whitelist applies, while leaving the existing direct sentinel return for the OffsetSize==4 / offset<MaxPossibleVolumeSize retry path unchanged so ReadData's `err == ErrorSizeMismatch` retry still triggers. Refs #8928 * fix(volume): integrate dangling-idx check into existing index load walk Address PR review (gemini-code-assist, medium): the structural .idx check used to do a second linear scan of the index file at every volume load, doubling the disk-I/O cost on servers managing many volumes. Track the largest (offset + actual size) seen during the existing needle-map load walks (`LoadCompactNeedleMap`, `NewLevelDbNeedleMap`, `NewSortedFileNeedleMap`'s `newNeedleMapMetricFromIndexFile`, `DoOffsetLoading`) on a new `MaximumNeedleEnd` field on `mapMetric`, exposed as `MaxNeedleEnd()` on the NeedleMapper interface. `volume.load()` then compares `nm.MaxNeedleEnd()` to the .dat size after the load is complete — pure numeric comparison, no extra I/O. The standalone `verifyIndexFitsInDat` helper and its caller in `CheckVolumeDataIntegrity` are removed; the test that used to drive the helper directly now exercises the new path via `LoadCompactNeedleMap`. Mirror the same change in the Rust volume server: track `max_needle_end` on `NeedleMapMetric`, expose via `max_needle_end()` on `CompactNeedleMap`, `RedbNeedleMap`, and the `NeedleMap` enum. The Rust load walk already happens in `load_from_idx` for both map kinds, so the structural check becomes free. Refs #8928
This commit is contained in:
@@ -19,6 +19,7 @@ use compact_map::CompactMap;
|
||||
use redb::{Database, Durability, ReadableDatabase, ReadableTable, TableDefinition};
|
||||
|
||||
use crate::storage::idx;
|
||||
use crate::storage::needle::needle::get_actual_size;
|
||||
use crate::storage::types::*;
|
||||
|
||||
// ============================================================================
|
||||
@@ -63,6 +64,11 @@ pub struct NeedleMapMetric {
|
||||
pub deletion_count: AtomicI64,
|
||||
pub deletion_byte_count: AtomicU64,
|
||||
pub max_file_key: AtomicU64,
|
||||
/// Largest (offset.to_actual_offset() + get_actual_size(size, version))
|
||||
/// observed during the load walk. Used at volume load to verify that no
|
||||
/// .idx entry references bytes past the end of .dat (issue #8928)
|
||||
/// without paying for a second linear scan.
|
||||
pub max_needle_end: AtomicI64,
|
||||
}
|
||||
|
||||
impl NeedleMapMetric {
|
||||
@@ -108,6 +114,29 @@ impl NeedleMapMetric {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Update `max_needle_end` if this entry's (offset + actual size) exceeds
|
||||
/// the running maximum. Skips deleted/zero-offset entries because they
|
||||
/// don't reserve space in .dat.
|
||||
fn maybe_set_max_needle_end(&self, offset: Offset, size: Size, version: Version) {
|
||||
if offset.is_zero() || !size.is_valid() {
|
||||
return;
|
||||
}
|
||||
let end = offset.to_actual_offset() + get_actual_size(size, version);
|
||||
loop {
|
||||
let current = self.max_needle_end.load(Ordering::Relaxed);
|
||||
if end <= current {
|
||||
break;
|
||||
}
|
||||
if self
|
||||
.max_needle_end
|
||||
.compare_exchange(current, end, Ordering::Relaxed, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
@@ -163,9 +192,10 @@ impl CompactNeedleMap {
|
||||
}
|
||||
|
||||
/// Load from an .idx file, building the in-memory map.
|
||||
pub fn load_from_idx<R: Read + Seek>(reader: &mut R) -> io::Result<Self> {
|
||||
pub fn load_from_idx<R: Read + Seek>(reader: &mut R, version: Version) -> io::Result<Self> {
|
||||
let mut nm = CompactNeedleMap::new();
|
||||
idx::walk_index_file(reader, 0, |key, offset, size| {
|
||||
nm.metric.maybe_set_max_needle_end(offset, size, version);
|
||||
if offset.is_zero() || size.is_deleted() {
|
||||
nm.delete_from_map(key);
|
||||
} else {
|
||||
@@ -284,6 +314,12 @@ impl CompactNeedleMap {
|
||||
NeedleId(self.metric.max_file_key.load(Ordering::Relaxed))
|
||||
}
|
||||
|
||||
/// Largest (offset + actual size) seen during the load walk; 0 if the
|
||||
/// map is empty. See `NeedleMapMetric::maybe_set_max_needle_end`.
|
||||
pub fn max_needle_end(&self) -> i64 {
|
||||
self.metric.max_needle_end.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn index_file_size(&self) -> u64 {
|
||||
self.idx_file_offset
|
||||
}
|
||||
@@ -431,7 +467,7 @@ impl RedbNeedleMap {
|
||||
|
||||
/// Rebuild metrics by scanning all entries in the redb table.
|
||||
/// Called when reusing an existing .rdb without a full rebuild.
|
||||
fn rebuild_metrics_from_db(&self) -> io::Result<()> {
|
||||
fn rebuild_metrics_from_db(&self, version: Version) -> io::Result<()> {
|
||||
let txn = self
|
||||
.db
|
||||
.begin_read()
|
||||
@@ -453,6 +489,8 @@ impl RedbNeedleMap {
|
||||
arr.copy_from_slice(bytes);
|
||||
let nv = unpack_needle_value(&arr);
|
||||
self.metric.maybe_set_max_file_key(key);
|
||||
self.metric
|
||||
.maybe_set_max_needle_end(nv.offset, nv.size, version);
|
||||
if nv.size.is_valid() {
|
||||
self.metric.file_count.fetch_add(1, Ordering::Relaxed);
|
||||
self.metric
|
||||
@@ -477,20 +515,24 @@ impl RedbNeedleMap {
|
||||
/// 2. If .idx size matches → reuse .rdb, rebuild metrics from scan
|
||||
/// 3. If .idx is larger → replay new entries incrementally
|
||||
/// 4. Otherwise (missing, corrupted, .idx smaller) → full rebuild
|
||||
pub fn load_from_idx<R: Read + Seek>(db_path: &str, reader: &mut R) -> io::Result<Self> {
|
||||
pub fn load_from_idx<R: Read + Seek>(
|
||||
db_path: &str,
|
||||
reader: &mut R,
|
||||
version: Version,
|
||||
) -> io::Result<Self> {
|
||||
let idx_size = reader.seek(io::SeekFrom::End(0))?;
|
||||
reader.seek(io::SeekFrom::Start(0))?;
|
||||
|
||||
// Try to reuse existing .rdb
|
||||
if Path::new(db_path).exists() {
|
||||
if let Ok(nm) = Self::try_reuse_rdb(db_path, reader, idx_size) {
|
||||
if let Ok(nm) = Self::try_reuse_rdb(db_path, reader, idx_size, version) {
|
||||
return Ok(nm);
|
||||
}
|
||||
// Reuse failed — fall through to full rebuild
|
||||
reader.seek(io::SeekFrom::Start(0))?;
|
||||
}
|
||||
|
||||
Self::full_rebuild(db_path, reader, idx_size)
|
||||
Self::full_rebuild(db_path, reader, idx_size, version)
|
||||
}
|
||||
|
||||
/// Try to reuse an existing .rdb file. Returns Ok if successful,
|
||||
@@ -499,6 +541,7 @@ impl RedbNeedleMap {
|
||||
db_path: &str,
|
||||
reader: &mut R,
|
||||
idx_size: u64,
|
||||
version: Version,
|
||||
) -> io::Result<Self> {
|
||||
let db = Database::open(db_path)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb open: {}", e)))?;
|
||||
@@ -523,7 +566,7 @@ impl RedbNeedleMap {
|
||||
}
|
||||
|
||||
// Rebuild metrics from existing data
|
||||
nm.rebuild_metrics_from_db()?;
|
||||
nm.rebuild_metrics_from_db(version)?;
|
||||
|
||||
if stored_idx_size < idx_size {
|
||||
// .idx grew — replay new entries incrementally
|
||||
@@ -534,6 +577,7 @@ impl RedbNeedleMap {
|
||||
io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e))
|
||||
})?;
|
||||
idx::walk_index_file(reader, start_entry, |key, offset, size| {
|
||||
nm.metric.maybe_set_max_needle_end(offset, size, version);
|
||||
let key_u64: u64 = key.into();
|
||||
if offset.is_zero() || size.is_deleted() {
|
||||
// Delete: look up old value for metric update, then
|
||||
@@ -607,6 +651,7 @@ impl RedbNeedleMap {
|
||||
db_path: &str,
|
||||
reader: &mut R,
|
||||
idx_size: u64,
|
||||
version: Version,
|
||||
) -> io::Result<Self> {
|
||||
let _ = std::fs::remove_file(db_path);
|
||||
let nm = RedbNeedleMap::new(db_path)?;
|
||||
@@ -614,6 +659,7 @@ impl RedbNeedleMap {
|
||||
// Collect entries from idx file, resolving duplicates/deletions
|
||||
let mut entries: HashMap<NeedleId, Option<NeedleValue>> = HashMap::new();
|
||||
idx::walk_index_file(reader, 0, |key, offset, size| {
|
||||
nm.metric.maybe_set_max_needle_end(offset, size, version);
|
||||
if offset.is_zero() || size.is_deleted() {
|
||||
entries.insert(key, None);
|
||||
} else {
|
||||
@@ -788,6 +834,12 @@ impl RedbNeedleMap {
|
||||
NeedleId(self.metric.max_file_key.load(Ordering::Relaxed))
|
||||
}
|
||||
|
||||
/// Largest (offset + actual size) seen during the load walk; 0 if the
|
||||
/// map is empty. See `NeedleMapMetric::maybe_set_max_needle_end`.
|
||||
pub fn max_needle_end(&self) -> i64 {
|
||||
self.metric.max_needle_end.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn index_file_size(&self) -> u64 {
|
||||
self.idx_file_offset
|
||||
}
|
||||
@@ -988,6 +1040,16 @@ impl NeedleMap {
|
||||
}
|
||||
}
|
||||
|
||||
/// Largest (offset + actual size) seen during the load walk; 0 if the
|
||||
/// map is empty. Used at volume load to detect .idx entries that
|
||||
/// reference past the end of .dat (issue #8928) without a second scan.
|
||||
pub fn max_needle_end(&self) -> i64 {
|
||||
match self {
|
||||
NeedleMap::InMemory(nm) => nm.max_needle_end(),
|
||||
NeedleMap::Redb(nm) => nm.max_needle_end(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Index file size in bytes.
|
||||
pub fn index_file_size(&self) -> u64 {
|
||||
match self {
|
||||
@@ -1157,7 +1219,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let mut cursor = Cursor::new(idx_data);
|
||||
let nm = CompactNeedleMap::load_from_idx(&mut cursor).unwrap();
|
||||
let nm = CompactNeedleMap::load_from_idx(&mut cursor, Version::current()).unwrap();
|
||||
|
||||
assert!(nm.get(NeedleId(1)).is_some());
|
||||
assert!(nm.get(NeedleId(2)).is_none()); // deleted
|
||||
@@ -1300,7 +1362,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let mut cursor = Cursor::new(idx_data);
|
||||
let nm = RedbNeedleMap::load_from_idx(db_path.to_str().unwrap(), &mut cursor).unwrap();
|
||||
let nm = RedbNeedleMap::load_from_idx(db_path.to_str().unwrap(), &mut cursor, Version::current()).unwrap();
|
||||
|
||||
assert!(nm.get(NeedleId(1)).is_some());
|
||||
assert!(nm.get(NeedleId(2)).is_none()); // deleted and removed
|
||||
@@ -1377,7 +1439,7 @@ mod tests {
|
||||
|
||||
// Load back with CompactNeedleMap to verify
|
||||
let mut idx_file = std::fs::File::open(&idx_path).unwrap();
|
||||
let loaded = CompactNeedleMap::load_from_idx(&mut idx_file).unwrap();
|
||||
let loaded = CompactNeedleMap::load_from_idx(&mut idx_file, Version::current()).unwrap();
|
||||
assert_eq!(loaded.file_count(), 2); // only live entries
|
||||
assert!(loaded.get(NeedleId(1)).is_some());
|
||||
assert!(loaded.get(NeedleId(2)).is_none()); // deleted, not saved
|
||||
|
||||
@@ -19,6 +19,8 @@ use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use tracing::warn;
|
||||
|
||||
#[cfg(test)]
|
||||
use crate::storage::idx;
|
||||
use crate::storage::needle::needle::{self, get_actual_size, Needle, NeedleError};
|
||||
use crate::storage::needle_map::{CompactNeedleMap, NeedleMap, NeedleMapKind, RedbNeedleMap};
|
||||
use crate::storage::super_block::{ReplicaPlacement, SuperBlock, SUPER_BLOCK_SIZE};
|
||||
@@ -73,6 +75,24 @@ pub enum VolumeError {
|
||||
StreamingUnsupported,
|
||||
}
|
||||
|
||||
/// Returns true when a needle read failed because the on-disk bytes are
|
||||
/// unreadable in a permanent way (offset past EOF, header corruption, CRC
|
||||
/// mismatch, malformed v2/v3/v4 fields). Vacuum can safely drop such entries
|
||||
/// during compaction. Anything else (real disk EIO, ERROR_CRC on Windows,
|
||||
/// network timeouts, EROFS, etc.) is transient or environmental and must
|
||||
/// abort the compaction so an operator notices.
|
||||
fn is_skippable_needle_read_error(e: &VolumeError) -> bool {
|
||||
match e {
|
||||
VolumeError::Io(io_err) => io_err.kind() == io::ErrorKind::UnexpectedEof,
|
||||
VolumeError::Needle(NeedleError::SizeMismatch { .. }) => true,
|
||||
VolumeError::Needle(NeedleError::CrcMismatch { .. }) => true,
|
||||
VolumeError::Needle(NeedleError::IndexOutOfRange(_)) => true,
|
||||
VolumeError::Needle(NeedleError::TailTooShort) => true,
|
||||
VolumeError::SizeMismatch => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// VolumeInfo (.vif persistence)
|
||||
// ============================================================================
|
||||
@@ -712,6 +732,28 @@ impl Volume {
|
||||
"volumeDataIntegrityChecking failed"
|
||||
);
|
||||
}
|
||||
|
||||
// Structural check: no .idx entry may reference bytes past the
|
||||
// end of .dat. The needle map's load walk above already
|
||||
// populated max_needle_end, so this is a numeric comparison
|
||||
// — no extra disk I/O. A violation marks the volume read-only
|
||||
// so vacuum doesn't silently drop reachable data based on a
|
||||
// corrupt .idx left over from a crashed batched write.
|
||||
// See issue #8928.
|
||||
if let Some(ref nm) = self.nm {
|
||||
if let Ok(dat_size) = self.current_dat_file_size() {
|
||||
let max_end = nm.max_needle_end();
|
||||
if dat_size > 0 && max_end > dat_size as i64 {
|
||||
self.no_write_or_delete = true;
|
||||
warn!(
|
||||
volume_id = self.id.0,
|
||||
max_needle_end = max_end,
|
||||
dat_size,
|
||||
"idx references bytes past end of .dat; marking volume read-only"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -759,7 +801,7 @@ impl Volume {
|
||||
// Open read-only
|
||||
if Path::new(&idx_path).exists() {
|
||||
let mut idx_file = File::open(&idx_path)?;
|
||||
let nm = CompactNeedleMap::load_from_idx(&mut idx_file)?;
|
||||
let nm = CompactNeedleMap::load_from_idx(&mut idx_file, self.version())?;
|
||||
self.nm = Some(NeedleMap::InMemory(nm));
|
||||
} else {
|
||||
// Missing .idx with existing .dat could orphan needles
|
||||
@@ -785,7 +827,7 @@ impl Volume {
|
||||
|
||||
let idx_size = idx_file.metadata()?.len();
|
||||
let mut idx_reader = io::BufReader::new(&idx_file);
|
||||
let mut nm = CompactNeedleMap::load_from_idx(&mut idx_reader)?;
|
||||
let mut nm = CompactNeedleMap::load_from_idx(&mut idx_reader, self.version())?;
|
||||
|
||||
// Re-open for append-only writes
|
||||
let write_file = OpenOptions::new()
|
||||
@@ -808,7 +850,7 @@ impl Volume {
|
||||
// Open read-only
|
||||
if Path::new(&idx_path).exists() {
|
||||
let mut idx_file = File::open(&idx_path)?;
|
||||
let nm = RedbNeedleMap::load_from_idx(&rdb_path, &mut idx_file)?;
|
||||
let nm = RedbNeedleMap::load_from_idx(&rdb_path, &mut idx_file, self.version())?;
|
||||
self.nm = Some(NeedleMap::Redb(nm));
|
||||
} else {
|
||||
// Missing .idx with existing .dat could orphan needles
|
||||
@@ -834,7 +876,7 @@ impl Volume {
|
||||
|
||||
let idx_size = idx_file.metadata()?.len();
|
||||
let mut idx_reader = io::BufReader::new(&idx_file);
|
||||
let mut nm = RedbNeedleMap::load_from_idx(&rdb_path, &mut idx_reader)?;
|
||||
let mut nm = RedbNeedleMap::load_from_idx(&rdb_path, &mut idx_reader, self.version())?;
|
||||
|
||||
// Re-open for append-only writes
|
||||
let write_file = OpenOptions::new()
|
||||
@@ -1752,6 +1794,11 @@ impl Volume {
|
||||
|
||||
let version = self.version();
|
||||
|
||||
// The deeper-than-tail structural check (every (offset + actual size)
|
||||
// fits inside .dat — issue #8928) is now handled in load() via the
|
||||
// needle map's max_needle_end accumulator, so we don't pay for a
|
||||
// second linear scan of the .idx here.
|
||||
|
||||
// Check last 10 index entries (matching Go's CheckVolumeDataIntegrity).
|
||||
// Go starts healthyIndexSize = indexSize and reduces on EOF.
|
||||
// On success: break (err != ErrorSizeMismatch when err == nil).
|
||||
@@ -2683,6 +2730,8 @@ impl Volume {
|
||||
}
|
||||
entries.sort_by_key(|(_, offset, _)| *offset);
|
||||
|
||||
let mut skipped_needles: u64 = 0;
|
||||
let mut skipped_data_bytes: u64 = 0;
|
||||
for (id, offset, size) in entries {
|
||||
// Progress callback
|
||||
if !progress_fn(offset.to_actual_offset()) {
|
||||
@@ -2699,7 +2748,41 @@ impl Volume {
|
||||
id,
|
||||
..Needle::default()
|
||||
};
|
||||
self.read_needle_data_at(&mut n, offset.to_actual_offset(), size)?;
|
||||
match self.read_needle_data_at(&mut n, offset.to_actual_offset(), size) {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
// Record EIO for health monitoring (parity with Go's checkReadWriteError).
|
||||
if let VolumeError::Io(ref io_err) = e {
|
||||
self.check_read_write_error(Some(io_err));
|
||||
}
|
||||
// Only drop the entry when the failure is one of the well-
|
||||
// known permanent-corruption shapes. A transient disk fault,
|
||||
// a tiered-read timeout, or a Windows hardware error (which
|
||||
// surfaces as a generic Io rather than UnexpectedEof) must
|
||||
// abort so an operator notices, rather than silently
|
||||
// compacting away data that might come back on retry.
|
||||
// See issue #8928.
|
||||
if !is_skippable_needle_read_error(&e) {
|
||||
return Err(VolumeError::Io(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!("cannot hydrate needle from file: {}", e),
|
||||
)));
|
||||
}
|
||||
skipped_needles += 1;
|
||||
if size.is_valid() {
|
||||
skipped_data_bytes += size.0 as u64;
|
||||
}
|
||||
warn!(
|
||||
volume_id = self.id.0,
|
||||
key = id.0,
|
||||
offset = offset.to_actual_offset(),
|
||||
size = size.0,
|
||||
error = %e,
|
||||
"vacuum: dropping unreadable needle"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Skip TTL-expired needles using the volume's TTL (matches Go's volume_vacuum.go)
|
||||
if n.has_ttl() {
|
||||
@@ -2721,6 +2804,15 @@ impl Volume {
|
||||
new_offset += bytes.len() as i64;
|
||||
}
|
||||
|
||||
if skipped_needles > 0 {
|
||||
warn!(
|
||||
volume_id = self.id.0,
|
||||
skipped_needles,
|
||||
skipped_data_bytes,
|
||||
"vacuum: dropped unreadable index entries during compaction"
|
||||
);
|
||||
}
|
||||
|
||||
dst.sync_all()?;
|
||||
|
||||
// Save new index
|
||||
@@ -3710,6 +3802,138 @@ mod tests {
|
||||
v.cleanup_compact().unwrap();
|
||||
}
|
||||
|
||||
/// Vacuum compaction must tolerate an .idx entry whose offset points past
|
||||
/// the end of the .dat file (the failure mode in issue #8928). The bad
|
||||
/// entry is silently dropped from the resulting .cpx; healthy needles
|
||||
/// survive untouched.
|
||||
#[test]
|
||||
fn test_compact_by_index_drops_dangling_needle() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let dir = tmp.path().to_str().unwrap();
|
||||
let mut v = make_test_volume(dir);
|
||||
|
||||
// Write a handful of healthy needles to establish a baseline .dat/.idx.
|
||||
for i in 1..=5u64 {
|
||||
let mut n = Needle {
|
||||
id: NeedleId(i),
|
||||
cookie: Cookie(i as u32),
|
||||
data: format!("payload-{}", i).into_bytes(),
|
||||
data_size: format!("payload-{}", i).len() as u32,
|
||||
..Needle::default()
|
||||
};
|
||||
v.write_needle(&mut n, true).unwrap();
|
||||
}
|
||||
v.sync_to_disk().unwrap();
|
||||
|
||||
let dat_size = v.dat_file_size().unwrap();
|
||||
let bad_key = NeedleId(9999);
|
||||
let bad_offset = Offset::from_actual_offset((dat_size + 1024 * 1024) as i64);
|
||||
let bad_size = Size(2048);
|
||||
v.nm
|
||||
.as_mut()
|
||||
.expect("needle map present")
|
||||
.put(bad_key, bad_offset, bad_size)
|
||||
.unwrap();
|
||||
v.nm.as_ref().unwrap().sync().unwrap();
|
||||
|
||||
// Vacuum must succeed in spite of the dangling entry.
|
||||
v.compact_by_index(0, 0, |_| true)
|
||||
.expect("compact_by_index should tolerate dangling entries");
|
||||
|
||||
// Walk the resulting .cpx and confirm the bad key was dropped while
|
||||
// every healthy key made it through.
|
||||
let cpx_path = v.file_name(".cpx");
|
||||
let mut cpx = File::open(&cpx_path).unwrap();
|
||||
let mut kept: Vec<u64> = Vec::new();
|
||||
idx::walk_index_file(&mut cpx, 0, |key, _, size| {
|
||||
if !size.is_deleted() {
|
||||
kept.push(key.0);
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
assert!(
|
||||
!kept.contains(&bad_key.0),
|
||||
"dangling key {} should have been dropped from .cpx, got {:?}",
|
||||
bad_key.0,
|
||||
kept
|
||||
);
|
||||
for i in 1..=5u64 {
|
||||
assert!(
|
||||
kept.contains(&i),
|
||||
"healthy key {} missing from compacted .cpx, got {:?}",
|
||||
i,
|
||||
kept
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// The needle map's max_needle_end accumulator must let volume.load
|
||||
/// detect an .idx whose entries point past the end of the .dat — the
|
||||
/// deeper-than-tail corruption shape from issue #8928 that the existing
|
||||
/// last-10-entries scan cannot see. The check is populated by the load
|
||||
/// walk and read in volume.load() to flip the volume read-only.
|
||||
#[test]
|
||||
fn test_max_needle_end_detects_dangling_entry() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let dir = tmp.path().to_str().unwrap();
|
||||
let mut v = make_test_volume(dir);
|
||||
|
||||
for i in 1..=4u64 {
|
||||
let mut n = Needle {
|
||||
id: NeedleId(i),
|
||||
cookie: Cookie(i as u32),
|
||||
data: format!("data-{}", i).into_bytes(),
|
||||
data_size: format!("data-{}", i).len() as u32,
|
||||
..Needle::default()
|
||||
};
|
||||
v.write_needle(&mut n, true).unwrap();
|
||||
}
|
||||
v.sync_to_disk().unwrap();
|
||||
|
||||
let dat_size = v.dat_file_size().unwrap() as i64;
|
||||
let idx_path = v.file_name(".idx");
|
||||
let version = v.version();
|
||||
|
||||
// Sanity: a fresh load walk over the healthy .idx puts max_needle_end
|
||||
// somewhere inside the .dat.
|
||||
let mut idx_reader = File::open(&idx_path).unwrap();
|
||||
let healthy_nm =
|
||||
CompactNeedleMap::load_from_idx(&mut idx_reader, version).unwrap();
|
||||
let healthy_end = healthy_nm.max_needle_end();
|
||||
assert!(
|
||||
healthy_end > 0 && healthy_end <= dat_size,
|
||||
"healthy volume should have max_needle_end ({}) in [1, dat_size={}]",
|
||||
healthy_end,
|
||||
dat_size
|
||||
);
|
||||
|
||||
// Inject a dangling entry by appending a bogus 16/17-byte record
|
||||
// directly to the .idx, then reload. The load walk should observe
|
||||
// max_needle_end past dat_size — which is exactly the signal
|
||||
// volume.load uses to mark the volume read-only.
|
||||
let bad_offset = Offset::from_actual_offset(dat_size + 4 * 1024 * 1024);
|
||||
let mut idx_append = OpenOptions::new()
|
||||
.write(true)
|
||||
.append(true)
|
||||
.open(&idx_path)
|
||||
.unwrap();
|
||||
idx::write_index_entry(&mut idx_append, NeedleId(9999), bad_offset, Size(1024))
|
||||
.unwrap();
|
||||
idx_append.sync_all().unwrap();
|
||||
|
||||
let mut idx_reread = File::open(&idx_path).unwrap();
|
||||
let bad_nm = CompactNeedleMap::load_from_idx(&mut idx_reread, version).unwrap();
|
||||
let bad_end = bad_nm.max_needle_end();
|
||||
assert!(
|
||||
bad_end > dat_size,
|
||||
"after dangling-entry inject max_needle_end ({}) should exceed dat_size ({})",
|
||||
bad_end,
|
||||
dat_size
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_compaction_revision_relookup() {
|
||||
// Verifies that re_lookup_needle_data_offset returns the correct data offset
|
||||
|
||||
@@ -27,6 +27,12 @@ const (
|
||||
var ErrorSizeMismatch = errors.New("size mismatch")
|
||||
var ErrorSizeInvalid = errors.New("size invalid")
|
||||
|
||||
// ErrorCorrupted marks a needle whose on-disk bytes cannot be parsed because
|
||||
// of data corruption: bad CRC, malformed v2/v3/v4 headers, or out-of-range
|
||||
// fields. Wrap errors with %w so callers (e.g. vacuum compaction) can
|
||||
// distinguish "the bytes are bad" from a genuine I/O fault.
|
||||
var ErrorCorrupted = errors.New("needle data corrupted")
|
||||
|
||||
func (n *Needle) DiskSize(version Version) int64 {
|
||||
return GetActualSize(n.Size, version)
|
||||
}
|
||||
@@ -58,7 +64,7 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Versio
|
||||
return ErrorSizeMismatch
|
||||
}
|
||||
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorSizeMismatch).Inc()
|
||||
return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
|
||||
return fmt.Errorf("%w: entry not found: offset %d found id %x size %d, expected size %d", ErrorSizeMismatch, offset, n.Id, n.Size, size)
|
||||
}
|
||||
if version == Version1 {
|
||||
n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size]
|
||||
@@ -106,7 +112,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
|
||||
index = index + 4
|
||||
if int(n.DataSize)+index > lenBytes {
|
||||
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
|
||||
return fmt.Errorf("index out of range %d", 1)
|
||||
return fmt.Errorf("index out of range %d: %w", 1, ErrorCorrupted)
|
||||
}
|
||||
n.Data = bytes[index : index+int(n.DataSize)]
|
||||
index = index + int(n.DataSize)
|
||||
@@ -125,7 +131,7 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err
|
||||
index = index + 1
|
||||
if int(n.NameSize)+index > lenBytes {
|
||||
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
|
||||
return index, fmt.Errorf("index out of range %d", 2)
|
||||
return index, fmt.Errorf("index out of range %d: %w", 2, ErrorCorrupted)
|
||||
}
|
||||
n.Name = bytes[index : index+int(n.NameSize)]
|
||||
index = index + int(n.NameSize)
|
||||
@@ -135,7 +141,7 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err
|
||||
index = index + 1
|
||||
if int(n.MimeSize)+index > lenBytes {
|
||||
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
|
||||
return index, fmt.Errorf("index out of range %d", 3)
|
||||
return index, fmt.Errorf("index out of range %d: %w", 3, ErrorCorrupted)
|
||||
}
|
||||
n.Mime = bytes[index : index+int(n.MimeSize)]
|
||||
index = index + int(n.MimeSize)
|
||||
@@ -143,7 +149,7 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err
|
||||
if index < lenBytes && n.HasLastModifiedDate() {
|
||||
if LastModifiedBytesLength+index > lenBytes {
|
||||
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
|
||||
return index, fmt.Errorf("index out of range %d", 4)
|
||||
return index, fmt.Errorf("index out of range %d: %w", 4, ErrorCorrupted)
|
||||
}
|
||||
n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength])
|
||||
index = index + LastModifiedBytesLength
|
||||
@@ -151,7 +157,7 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err
|
||||
if index < lenBytes && n.HasTtl() {
|
||||
if TtlBytesLength+index > lenBytes {
|
||||
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
|
||||
return index, fmt.Errorf("index out of range %d", 5)
|
||||
return index, fmt.Errorf("index out of range %d: %w", 5, ErrorCorrupted)
|
||||
}
|
||||
n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
|
||||
index = index + TtlBytesLength
|
||||
@@ -159,13 +165,13 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err
|
||||
if index < lenBytes && n.HasPairs() {
|
||||
if 2+index > lenBytes {
|
||||
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
|
||||
return index, fmt.Errorf("index out of range %d", 6)
|
||||
return index, fmt.Errorf("index out of range %d: %w", 6, ErrorCorrupted)
|
||||
}
|
||||
n.PairsSize = util.BytesToUint16(bytes[index : index+2])
|
||||
index += 2
|
||||
if int(n.PairsSize)+index > lenBytes {
|
||||
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
|
||||
return index, fmt.Errorf("index out of range %d", 7)
|
||||
return index, fmt.Errorf("index out of range %d: %w", 7, ErrorCorrupted)
|
||||
}
|
||||
end := index + int(n.PairsSize)
|
||||
n.Pairs = bytes[index:end]
|
||||
|
||||
@@ -18,7 +18,7 @@ func (n *Needle) readNeedleTail(needleBody []byte, version Version) error {
|
||||
// with seaweed version using crc.Value() instead of uint32(crc), which appears in commit 056c480eb
|
||||
// and switch appeared in version 3.09.
|
||||
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorCRC).Inc()
|
||||
return fmt.Errorf("invalid CRC for needle %v (got %08x, want %08x), data on disk corrupted", n.Id, dataChecksum, expectedChecksum)
|
||||
return fmt.Errorf("invalid CRC for needle %v (got %08x, want %08x), data on disk corrupted: %w", n.Id, dataChecksum, expectedChecksum, ErrorCorrupted)
|
||||
}
|
||||
n.Checksum = dataChecksum
|
||||
} else {
|
||||
|
||||
@@ -31,6 +31,7 @@ type NeedleMapper interface {
|
||||
FileCount() int
|
||||
DeletedCount() int
|
||||
MaxFileKey() NeedleId
|
||||
MaxNeedleEnd() int64
|
||||
IndexFileSize() uint64
|
||||
Sync() error
|
||||
ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error)
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
@@ -39,7 +40,7 @@ type LevelDbNeedleMap struct {
|
||||
recordCount uint64
|
||||
}
|
||||
|
||||
func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options, ldbTimeout int64) (m *LevelDbNeedleMap, err error) {
|
||||
func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options, ldbTimeout int64, version needle.Version) (m *LevelDbNeedleMap, err error) {
|
||||
m = &LevelDbNeedleMap{dbFileName: dbFileName}
|
||||
m.indexFile = indexFile
|
||||
if !isLevelDbFresh(dbFileName, indexFile) {
|
||||
@@ -72,7 +73,7 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Option
|
||||
return
|
||||
}
|
||||
}
|
||||
mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile)
|
||||
mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile, version)
|
||||
if indexLoadError != nil {
|
||||
return nil, indexLoadError
|
||||
}
|
||||
@@ -334,6 +335,10 @@ func (m *LevelDbNeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *
|
||||
|
||||
func (m *LevelDbNeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) (err error) {
|
||||
glog.V(0).Infof("loading idx to leveldb from offset %d for file: %s", startFrom, indexFile.Name())
|
||||
version := needle.GetCurrentVersion()
|
||||
if v != nil {
|
||||
version = v.Version()
|
||||
}
|
||||
dbFileName := v.FileName(".cpldb")
|
||||
db, dbErr := leveldb.OpenFile(dbFileName, nil)
|
||||
defer func() {
|
||||
@@ -356,6 +361,7 @@ func (m *LevelDbNeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startF
|
||||
|
||||
err = idx.WalkIndexFile(indexFile, startFrom, func(key NeedleId, offset Offset, size Size) (e error) {
|
||||
m.mapMetric.FileCounter++
|
||||
m.mapMetric.MaybeSetMaxNeedleEnd(offset, size, version)
|
||||
bytes := make([]byte, NeedleIdSize)
|
||||
NeedleIdToBytes(bytes[0:NeedleIdSize], key)
|
||||
// fresh loading
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
)
|
||||
|
||||
@@ -25,7 +26,7 @@ func TestLevelDbNeedleMap_Concurrency(t *testing.T) {
|
||||
dbFileName := filepath.Join(dir, prefix+".ldb")
|
||||
|
||||
// Create and initialize map
|
||||
m, err := NewLevelDbNeedleMap(dbFileName, indexFile, nil, 1)
|
||||
m, err := NewLevelDbNeedleMap(dbFileName, indexFile, nil, 1, needle.GetCurrentVersion())
|
||||
if err != nil {
|
||||
t.Fatalf("NewLevelDbNeedleMap: %v", err)
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
|
||||
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
@@ -28,14 +29,15 @@ func NewCompactNeedleMap(file *os.File) *NeedleMap {
|
||||
return nm
|
||||
}
|
||||
|
||||
func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) {
|
||||
func LoadCompactNeedleMap(file *os.File, version needle.Version) (*NeedleMap, error) {
|
||||
nm := NewCompactNeedleMap(file)
|
||||
return doLoading(file, nm)
|
||||
return doLoading(file, nm, version)
|
||||
}
|
||||
|
||||
func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
|
||||
func doLoading(file *os.File, nm *NeedleMap, version needle.Version) (*NeedleMap, error) {
|
||||
e := idx.WalkIndexFile(file, 0, func(key NeedleId, offset Offset, size Size) error {
|
||||
nm.MaybeSetMaxFileKey(key)
|
||||
nm.MaybeSetMaxNeedleEnd(offset, size, version)
|
||||
if !offset.IsZero() && !size.IsDeleted() {
|
||||
nm.FileCounter++
|
||||
nm.FileByteCounter = nm.FileByteCounter + uint64(size)
|
||||
@@ -109,8 +111,13 @@ func (nm *NeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Op
|
||||
|
||||
func (nm *NeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) error {
|
||||
glog.V(0).Infof("loading idx from offset %d for file: %s", startFrom, indexFile.Name())
|
||||
version := needle.GetCurrentVersion()
|
||||
if v != nil {
|
||||
version = v.Version()
|
||||
}
|
||||
e := idx.WalkIndexFile(indexFile, startFrom, func(key NeedleId, offset Offset, size Size) error {
|
||||
nm.MaybeSetMaxFileKey(key)
|
||||
nm.MaybeSetMaxNeedleEnd(offset, size, version)
|
||||
nm.FileCounter++
|
||||
if !offset.IsZero() && !size.IsDeleted() {
|
||||
nm.FileByteCounter = nm.FileByteCounter + uint64(size)
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
boom "github.com/tylertreat/BoomFilters"
|
||||
)
|
||||
@@ -17,6 +18,12 @@ type mapMetric struct {
|
||||
DeletionByteCounter uint64 `json:"DeletionByteCounter"`
|
||||
FileByteCounter uint64 `json:"FileByteCounter"`
|
||||
MaximumFileKey uint64 `json:"MaxFileKey"`
|
||||
// MaximumNeedleEnd is the largest (offset.ToActualOffset() +
|
||||
// GetActualSize(size, version)) seen during the index walk. It is used
|
||||
// at volume load to verify that no .idx entry references bytes past
|
||||
// the end of the .dat — the deeper-than-tail corruption shape from
|
||||
// issue #8928 — without paying for a second linear scan of the index.
|
||||
MaximumNeedleEnd int64 `json:"MaxNeedleEnd"`
|
||||
}
|
||||
|
||||
func (mm *mapMetric) logDelete(deletedByteCount Size) {
|
||||
@@ -92,7 +99,27 @@ func (mm *mapMetric) MaybeSetMaxFileKey(key NeedleId) {
|
||||
}
|
||||
}
|
||||
|
||||
func needleMapMetricFromIndexFile(r *os.File, mm *mapMetric) error {
|
||||
// MaybeSetMaxNeedleEnd updates MaximumNeedleEnd if the supplied entry's
|
||||
// (offset + actual size) is larger than what we have seen so far. Skips
|
||||
// deleted/zero-offset entries because they don't reserve space in .dat.
|
||||
func (mm *mapMetric) MaybeSetMaxNeedleEnd(offset Offset, size Size, version needle.Version) {
|
||||
if mm == nil || offset.IsZero() || !size.IsValid() {
|
||||
return
|
||||
}
|
||||
end := offset.ToActualOffset() + needle.GetActualSize(size, version)
|
||||
if end > atomic.LoadInt64(&mm.MaximumNeedleEnd) {
|
||||
atomic.StoreInt64(&mm.MaximumNeedleEnd, end)
|
||||
}
|
||||
}
|
||||
|
||||
func (mm *mapMetric) MaxNeedleEnd() int64 {
|
||||
if mm == nil {
|
||||
return 0
|
||||
}
|
||||
return atomic.LoadInt64(&mm.MaximumNeedleEnd)
|
||||
}
|
||||
|
||||
func needleMapMetricFromIndexFile(r *os.File, mm *mapMetric, version needle.Version) error {
|
||||
var bf *boom.BloomFilter
|
||||
buf := make([]byte, NeedleIdSize)
|
||||
err := reverseWalkIndexFile(r, func(entryCount int64) {
|
||||
@@ -100,6 +127,7 @@ func needleMapMetricFromIndexFile(r *os.File, mm *mapMetric) error {
|
||||
}, func(key NeedleId, offset Offset, size Size) error {
|
||||
|
||||
mm.MaybeSetMaxFileKey(key)
|
||||
mm.MaybeSetMaxNeedleEnd(offset, size, version)
|
||||
NeedleIdToBytes(buf, key)
|
||||
if size.IsValid() {
|
||||
mm.FileByteCounter += uint64(size)
|
||||
@@ -124,9 +152,9 @@ func needleMapMetricFromIndexFile(r *os.File, mm *mapMetric) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
|
||||
func newNeedleMapMetricFromIndexFile(r *os.File, version needle.Version) (mm *mapMetric, err error) {
|
||||
mm = &mapMetric{}
|
||||
err = needleMapMetricFromIndexFile(r, mm)
|
||||
err = needleMapMetricFromIndexFile(r, mm, version)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
)
|
||||
|
||||
@@ -21,7 +22,7 @@ func TestFastLoadingNeedleMapMetrics(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
mm, _ := newNeedleMapMetricFromIndexFile(idxFile)
|
||||
mm, _ := newNeedleMapMetricFromIndexFile(idxFile, needle.GetCurrentVersion())
|
||||
|
||||
glog.V(0).Infof("FileCount expected %d actual %d", nm.FileCount(), mm.FileCount())
|
||||
glog.V(0).Infof("DeletedSize expected %d actual %d", nm.DeletedSize(), mm.DeletedSize())
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
|
||||
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
)
|
||||
@@ -16,7 +17,7 @@ type SortedFileNeedleMap struct {
|
||||
dbFileSize int64
|
||||
}
|
||||
|
||||
func NewSortedFileNeedleMap(indexBaseFileName string, indexFile *os.File) (m *SortedFileNeedleMap, err error) {
|
||||
func NewSortedFileNeedleMap(indexBaseFileName string, indexFile *os.File, version needle.Version) (m *SortedFileNeedleMap, err error) {
|
||||
m = &SortedFileNeedleMap{baseFileName: indexBaseFileName}
|
||||
m.indexFile = indexFile
|
||||
fileName := indexBaseFileName + ".sdx"
|
||||
@@ -33,7 +34,7 @@ func NewSortedFileNeedleMap(indexBaseFileName string, indexFile *os.File) (m *So
|
||||
dbStat, _ := m.dbFile.Stat()
|
||||
m.dbFileSize = dbStat.Size()
|
||||
glog.V(1).Infof("Loading %s...", indexFile.Name())
|
||||
mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile)
|
||||
mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile, version)
|
||||
if indexLoadError != nil {
|
||||
_ = m.dbFile.Close()
|
||||
return nil, indexLoadError
|
||||
|
||||
@@ -122,6 +122,10 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uin
|
||||
if indexSize == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
// The deeper-than-tail structural check (every (offset + actual size)
|
||||
// fits inside .dat — issue #8928) lives in volume.load(): it reads
|
||||
// MaximumNeedleEnd from the needle map after the load walk, so we don't
|
||||
// need a redundant linear scan of the .idx here.
|
||||
healthyIndexSize := indexSize
|
||||
for i := 1; i <= 10 && indexSize >= int64(i)*types.NeedleMapEntrySize; i++ {
|
||||
// check and fix last 10 entries
|
||||
@@ -165,6 +169,7 @@ func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (
|
||||
return lastAppendAtNs, nil
|
||||
}
|
||||
|
||||
|
||||
func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) {
|
||||
if indexSize, err = util.GetFileSize(indexFile); err == nil {
|
||||
if indexSize%types.NeedleMapEntrySize != 0 {
|
||||
|
||||
@@ -9,6 +9,8 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
)
|
||||
|
||||
func TestScrubVolumeData(t *testing.T) {
|
||||
@@ -35,8 +37,8 @@ func TestScrubVolumeData(t *testing.T) {
|
||||
version: needle.Version3,
|
||||
want: 27,
|
||||
wantErrs: []error{
|
||||
fmt.Errorf("needle 3 on volume 0: invalid CRC for needle 3 (got 0b243a0d, want 4af853fb), data on disk corrupted"),
|
||||
fmt.Errorf("needle 48 on volume 0: invalid CRC for needle 30 (got 3c40e8d5, want 5077fea1), data on disk corrupted"),
|
||||
fmt.Errorf("needle 3 on volume 0: invalid CRC for needle 3 (got 0b243a0d, want 4af853fb), data on disk corrupted: needle data corrupted"),
|
||||
fmt.Errorf("needle 48 on volume 0: invalid CRC for needle 30 (got 3c40e8d5, want 5077fea1), data on disk corrupted: needle data corrupted"),
|
||||
fmt.Errorf("data file size for volume 0 (942864) doesn't match the size for 27 needles read (942856)"),
|
||||
},
|
||||
},
|
||||
@@ -78,3 +80,78 @@ func TestScrubVolumeData(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestMaxNeedleEnd ensures the needle map's MaxNeedleEnd accumulator lets
|
||||
// volume.load() detect an .idx that references bytes past the end of the .dat
|
||||
// — the deeper-than-tail corruption shape from issue #8928 that the existing
|
||||
// last-10-entries scan cannot see. The check is populated by the load walk
|
||||
// and read by volume.load() to flip the volume read-only.
|
||||
func TestMaxNeedleEnd(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("volume creation: %v", err)
|
||||
}
|
||||
defer v.Close()
|
||||
|
||||
// A handful of healthy needles establishes a baseline .dat/.idx.
|
||||
for i := 1; i <= 4; i++ {
|
||||
n := newRandomNeedle(uint64(i))
|
||||
if _, _, _, err := v.writeNeedle2(n, true, false); err != nil {
|
||||
t.Fatalf("write needle %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
if err := v.DataBackend.Sync(); err != nil {
|
||||
t.Fatalf("sync .dat: %v", err)
|
||||
}
|
||||
if err := v.nm.Sync(); err != nil {
|
||||
t.Fatalf("sync .idx: %v", err)
|
||||
}
|
||||
|
||||
datSize, _, err := v.DataBackend.GetStat()
|
||||
if err != nil {
|
||||
t.Fatalf("stat .dat: %v", err)
|
||||
}
|
||||
|
||||
// Sanity: a fresh load over the healthy .idx puts MaxNeedleEnd inside
|
||||
// the .dat.
|
||||
idxFile, err := os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644)
|
||||
if err != nil {
|
||||
t.Fatalf("open .idx: %v", err)
|
||||
}
|
||||
healthyNm, err := LoadCompactNeedleMap(idxFile, v.Version())
|
||||
idxFile.Close()
|
||||
if err != nil {
|
||||
t.Fatalf("load healthy .idx: %v", err)
|
||||
}
|
||||
healthyEnd := healthyNm.MaxNeedleEnd()
|
||||
if healthyEnd <= 0 || healthyEnd > datSize {
|
||||
t.Fatalf("healthy volume should have MaxNeedleEnd (%d) in (0, dat_size=%d]", healthyEnd, datSize)
|
||||
}
|
||||
|
||||
// Inject a dangling entry by appending to the .idx, then reload. The
|
||||
// walk should observe MaxNeedleEnd past dat_size — exactly the signal
|
||||
// volume.load uses to mark the volume read-only.
|
||||
bogusOffset := types.ToOffset(datSize + 4*1024*1024)
|
||||
if err := v.nm.Put(types.Uint64ToNeedleId(9999), bogusOffset, types.Size(1024)); err != nil {
|
||||
t.Fatalf("inject dangling idx entry: %v", err)
|
||||
}
|
||||
if err := v.nm.Sync(); err != nil {
|
||||
t.Fatalf("sync .idx after inject: %v", err)
|
||||
}
|
||||
|
||||
idxFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644)
|
||||
if err != nil {
|
||||
t.Fatalf("reopen .idx: %v", err)
|
||||
}
|
||||
defer idxFile.Close()
|
||||
badNm, err := LoadCompactNeedleMap(idxFile, v.Version())
|
||||
if err != nil {
|
||||
t.Fatalf("reload .idx after inject: %v", err)
|
||||
}
|
||||
badEnd := badNm.MaxNeedleEnd()
|
||||
if badEnd <= datSize {
|
||||
t.Fatalf("after dangling-entry inject MaxNeedleEnd (%d) should exceed dat_size (%d)", badEnd, datSize)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,8 +173,13 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
|
||||
}
|
||||
}
|
||||
|
||||
// The post-load structural check below uses the in-memory needle map
|
||||
// to verify that no .idx entry references bytes past the end of .dat
|
||||
// (issue #8928). The check piggybacks on MaxNeedleEnd, which the load
|
||||
// walks below populate without a second linear scan.
|
||||
|
||||
if v.noWriteOrDelete || v.noWriteCanDelete {
|
||||
if v.nm, err = NewSortedFileNeedleMap(v.IndexFileName(), indexFile); err != nil {
|
||||
if v.nm, err = NewSortedFileNeedleMap(v.IndexFileName(), indexFile, v.Version()); err != nil {
|
||||
glog.V(0).Infof("loading sorted db %s error: %v", v.FileName(".sdx"), err)
|
||||
}
|
||||
} else {
|
||||
@@ -185,7 +190,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
|
||||
err = v.tmpNm.UpdateNeedleMap(v, indexFile, nil, 0)
|
||||
} else {
|
||||
glog.V(2).Infoln("loading memory index", v.FileName(".idx"), "to memory")
|
||||
if v.nm, err = LoadCompactNeedleMap(indexFile); err != nil {
|
||||
if v.nm, err = LoadCompactNeedleMap(indexFile, v.Version()); err != nil {
|
||||
glog.V(0).Infof("loading index %s to memory error: %v", v.FileName(".idx"), err)
|
||||
}
|
||||
}
|
||||
@@ -200,7 +205,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
|
||||
err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout)
|
||||
} else {
|
||||
glog.V(0).Infoln("loading leveldb index", v.FileName(".ldb"))
|
||||
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout); err != nil {
|
||||
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version()); err != nil {
|
||||
glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
|
||||
}
|
||||
}
|
||||
@@ -215,7 +220,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
|
||||
err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout)
|
||||
} else {
|
||||
glog.V(0).Infoln("loading leveldb medium index", v.FileName(".ldb"))
|
||||
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout); err != nil {
|
||||
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version()); err != nil {
|
||||
glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
|
||||
}
|
||||
}
|
||||
@@ -230,12 +235,28 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
|
||||
err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout)
|
||||
} else {
|
||||
glog.V(0).Infoln("loading leveldb large index", v.FileName(".ldb"))
|
||||
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout); err != nil {
|
||||
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version()); err != nil {
|
||||
glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Structural check: no .idx entry may reference bytes past the end of
|
||||
// the .dat. The needle map's load walk above already populated
|
||||
// MaximumNeedleEnd, so this is just a numeric comparison — no extra
|
||||
// disk I/O. A violation marks the volume read-only so a corrupt
|
||||
// .idx left over from a crashed batched write does not silently
|
||||
// power vacuum to drop reachable data. See issue #8928.
|
||||
if !v.HasRemoteFile() && v.nm != nil && v.DataBackend != nil {
|
||||
if datSize, _, statErr := v.DataBackend.GetStat(); statErr == nil && datSize > 0 {
|
||||
if maxEnd := v.nm.MaxNeedleEnd(); maxEnd > datSize {
|
||||
v.noWriteOrDelete = true
|
||||
glog.V(0).Infof("volume %d: idx references end=%d but .dat is %d bytes; marking readonly",
|
||||
v.Id, maxEnd, datSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !hasVolumeInfoFile {
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"runtime"
|
||||
"time"
|
||||
@@ -17,6 +19,24 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
// isSkippableNeedleReadError returns true when a needle read failed because
|
||||
// the on-disk bytes are unreadable in a permanent way (offset past EOF, header
|
||||
// corruption, CRC mismatch, malformed v2/v3/v4 fields). Vacuum can safely drop
|
||||
// such entries during compaction. Anything else (real disk EIO on Unix,
|
||||
// ERROR_CRC / ERROR_IO_DEVICE on Windows, network timeouts, EROFS, etc.) is
|
||||
// transient or environmental and must abort the compaction so an operator
|
||||
// notices, rather than silently dropping recoverable data.
|
||||
func isSkippableNeedleReadError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
return errors.Is(err, io.EOF) ||
|
||||
errors.Is(err, io.ErrUnexpectedEOF) ||
|
||||
errors.Is(err, needle.ErrorSizeMismatch) ||
|
||||
errors.Is(err, needle.ErrorSizeInvalid) ||
|
||||
errors.Is(err, needle.ErrorCorrupted)
|
||||
}
|
||||
|
||||
type ProgressFunc func(processed int64) bool
|
||||
|
||||
func (v *Volume) garbageLevel() float64 {
|
||||
@@ -493,6 +513,10 @@ func (v *Volume) copyDataBasedOnIndexFile(opts *CompactOptions) (err error) {
|
||||
}
|
||||
|
||||
writeThrottler := util.NewWriteThrottler(opts.MaxBytesPerSecond)
|
||||
var (
|
||||
skippedNeedles int
|
||||
skippedDataBytes uint64
|
||||
)
|
||||
err = oldNm.AscendingVisit(func(value needle_map.NeedleValue) error {
|
||||
|
||||
offset, size := value.Offset, value.Size
|
||||
@@ -510,7 +534,21 @@ func (v *Volume) copyDataBasedOnIndexFile(opts *CompactOptions) (err error) {
|
||||
n := new(needle.Needle)
|
||||
if err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, opts.version); err != nil {
|
||||
v.checkReadWriteError(err)
|
||||
return fmt.Errorf("cannot hydrate needle from file: %w", err)
|
||||
// Only drop the entry when the failure is one of the well-known
|
||||
// permanent-corruption shapes. A transient disk fault, a tiered
|
||||
// read timeout, or a Windows hardware error (ERROR_CRC etc.) must
|
||||
// abort so an operator notices, rather than silently compacting
|
||||
// away data that might come back on retry. See issue #8928.
|
||||
if !isSkippableNeedleReadError(err) {
|
||||
return fmt.Errorf("cannot hydrate needle from file: %w", err)
|
||||
}
|
||||
skippedNeedles++
|
||||
if size.IsValid() {
|
||||
skippedDataBytes += uint64(size)
|
||||
}
|
||||
glog.Warningf("vacuum volume %d: dropping unreadable needle key=%d offset=%d size=%d: %v",
|
||||
v.Id, value.Key, offset.ToActualOffset(), size, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if n.HasTtl() && now >= n.LastModified+uint64(opts.superBlock.Ttl.Minutes()*60) {
|
||||
@@ -533,6 +571,10 @@ func (v *Volume) copyDataBasedOnIndexFile(opts *CompactOptions) (err error) {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if skippedNeedles > 0 {
|
||||
glog.Warningf("vacuum volume %d: dropped %d unreadable index entries (%d data bytes) during compaction",
|
||||
v.Id, skippedNeedles, skippedDataBytes)
|
||||
}
|
||||
if v.Ttl.String() == "" && v.nm != nil {
|
||||
dstDatSize, _, err := dstDatBackend.GetStat()
|
||||
if err != nil {
|
||||
@@ -540,6 +582,14 @@ func (v *Volume) copyDataBasedOnIndexFile(opts *CompactOptions) (err error) {
|
||||
}
|
||||
if v.nm.ContentSize() > v.nm.DeletedSize() {
|
||||
expectedContentSize := v.nm.ContentSize() - v.nm.DeletedSize()
|
||||
// Skipped needles still contribute to the source-side ContentSize but
|
||||
// were not written to the destination, so subtract them before the
|
||||
// safety check to avoid a false positive.
|
||||
if skippedDataBytes >= expectedContentSize {
|
||||
expectedContentSize = 0
|
||||
} else {
|
||||
expectedContentSize -= skippedDataBytes
|
||||
}
|
||||
if expectedContentSize > uint64(dstDatSize) {
|
||||
return fmt.Errorf("volume %s unexpected new data size: %d does not match size of content minus deleted: %d",
|
||||
v.Id.String(), dstDatSize, expectedContentSize)
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
@@ -233,6 +234,90 @@ func TestCleanupCompactRemovesTempFiles(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestCompactByIndex_DropsDanglingNeedle verifies that vacuum compaction
|
||||
// tolerates an .idx entry whose offset points past the end of the .dat file
|
||||
// (the failure mode reported in issue #8928). The bad entry should be silently
|
||||
// dropped from the resulting .cpx, while every healthy needle is preserved.
|
||||
func TestCompactByIndex_DropsDanglingNeedle(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("volume creation: %v", err)
|
||||
}
|
||||
|
||||
const goodNeedleCount = 8
|
||||
infos := make([]*needleInfo, goodNeedleCount)
|
||||
for i := 1; i <= goodNeedleCount; i++ {
|
||||
n := newRandomNeedle(uint64(i))
|
||||
_, size, _, err := v.writeNeedle2(n, true, false)
|
||||
if err != nil {
|
||||
t.Fatalf("write needle %d: %v", i, err)
|
||||
}
|
||||
infos[i-1] = &needleInfo{size: size, crc: n.Checksum}
|
||||
}
|
||||
|
||||
if err := v.DataBackend.Sync(); err != nil {
|
||||
t.Fatalf("sync .dat: %v", err)
|
||||
}
|
||||
if err := v.nm.Sync(); err != nil {
|
||||
t.Fatalf("sync .idx: %v", err)
|
||||
}
|
||||
|
||||
datSize, _, err := v.DataBackend.GetStat()
|
||||
if err != nil {
|
||||
t.Fatalf("stat .dat: %v", err)
|
||||
}
|
||||
|
||||
// Inject an .idx entry that points 1 MB past the end of the .dat. The
|
||||
// bad entry must go through nm.Put so it ends up in both the in-memory
|
||||
// map and the on-disk .idx — exactly the corruption pattern in #8928.
|
||||
badKey := types.Uint64ToNeedleId(uint64(goodNeedleCount + 100))
|
||||
badOffset := types.ToOffset(datSize + 1024*1024)
|
||||
badSize := types.Size(2048)
|
||||
if err := v.nm.Put(badKey, badOffset, badSize); err != nil {
|
||||
t.Fatalf("inject bad idx entry: %v", err)
|
||||
}
|
||||
if err := v.nm.Sync(); err != nil {
|
||||
t.Fatalf("sync .idx after inject: %v", err)
|
||||
}
|
||||
|
||||
if err := v.CompactByIndex(nil); err != nil {
|
||||
t.Fatalf("CompactByIndex should tolerate dangling entries, got: %v", err)
|
||||
}
|
||||
|
||||
// Walk the new index and confirm the dangling entry was dropped while
|
||||
// all of the original keys made it through.
|
||||
cpx, err := os.Open(filepath.Join(dir, "1.cpx"))
|
||||
if err != nil {
|
||||
t.Fatalf("open .cpx: %v", err)
|
||||
}
|
||||
defer cpx.Close()
|
||||
keptKeys := map[types.NeedleId]bool{}
|
||||
if err := idx.WalkIndexFile(cpx, 0, func(key types.NeedleId, _ types.Offset, size types.Size) error {
|
||||
if !size.IsDeleted() {
|
||||
keptKeys[key] = true
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatalf("walk .cpx: %v", err)
|
||||
}
|
||||
if keptKeys[badKey] {
|
||||
t.Fatalf("dangling key %d should have been dropped from .cpx", badKey)
|
||||
}
|
||||
for i := 1; i <= goodNeedleCount; i++ {
|
||||
if infos[i-1].size == 0 {
|
||||
continue
|
||||
}
|
||||
k := types.Uint64ToNeedleId(uint64(i))
|
||||
if !keptKeys[k] {
|
||||
t.Fatalf("healthy key %d missing from compacted .cpx", k)
|
||||
}
|
||||
}
|
||||
|
||||
v.Close()
|
||||
}
|
||||
|
||||
func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
|
||||
n := newRandomNeedle(uint64(i))
|
||||
_, size, _, err := v.writeNeedle2(n, true, false)
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
)
|
||||
@@ -69,7 +70,7 @@ func LoadOrCreateChunkCacheVolume(fileName string, preallocate int64) (*ChunkCac
|
||||
WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
|
||||
CompactionTableSizeMultiplier: 10, // default value is 1
|
||||
}
|
||||
if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts, 0); err != nil {
|
||||
if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts, 0, needle.GetCurrentVersion()); err != nil {
|
||||
return nil, fmt.Errorf("loading leveldb %s error: %v", v.fileName+".ldb", err)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user