diff --git a/seaweed-volume/proto/master.proto b/seaweed-volume/proto/master.proto index f3253c2a4..4d112232c 100644 --- a/seaweed-volume/proto/master.proto +++ b/seaweed-volume/proto/master.proto @@ -145,6 +145,11 @@ message VolumeEcShardInformationMessage { uint64 expire_at_sec = 5; // used to record the destruction time of ec volume uint32 disk_id = 6; repeated int64 shard_sizes = 7; // optimized: sizes for shards in order of set bits in ec_index_bits + uint64 file_count = 8; // total needles in the .ecx index (live + tombstoned) + uint64 delete_count = 9; // node-local tombstones in the .ecj deletion journal + // fields 10-19 reserved for future upstream open-source additions. + // fields 20+ are owned by the enterprise fork (e.g. data_shards/parity_shards) + // and must not be used here without coordination. } message StorageBackend { diff --git a/seaweed-volume/src/storage/erasure_coding/ec_volume.rs b/seaweed-volume/src/storage/erasure_coding/ec_volume.rs index 24967c04a..c7464ba97 100644 --- a/seaweed-volume/src/storage/erasure_coding/ec_volume.rs +++ b/seaweed-volume/src/storage/erasure_coding/ec_volume.rs @@ -3,9 +3,10 @@ //! Each EcVolume has a sorted index (.ecx) and a deletion journal (.ecj). //! Shards (.ec00-.ec13) may be distributed across multiple servers. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fs::{self, File, OpenOptions}; use std::io::{self, Write}; +use std::sync::RwLock; use std::time::{SystemTime, UNIX_EPOCH}; use crate::pb::master_pb; @@ -28,6 +29,17 @@ pub struct EcVolume { ecx_file: Option, ecx_file_size: i64, ecj_file: Option, + /// On-disk size of the .ecj deletion journal. Used only by IO helpers + /// (seek / set_len on partial writes) — the authoritative runtime + /// delete count comes from `deleted_needles.len()`. + ecj_file_size: i64, + /// In-memory set of needle ids that have been deleted since the volume + /// was encoded. 
.ecx is immutable at runtime — it only stores the + /// sorted (id, offset, size) index written at encode time — and runtime + /// deletes are journaled to .ecj + tracked here. Reads consult this + /// set to mask out deleted needles on top of the sealed .ecx lookup. + /// Seeded from .ecj in `new()` and updated by `journal_delete`. + deleted_needles: RwLock>, pub disk_type: DiskType, /// Directory where .ecx/.ecj were actually found (may differ from dir_idx after fallback). ecx_actual_dir: String, @@ -112,6 +124,8 @@ impl EcVolume { ecx_file: None, ecx_file_size: 0, ecj_file: None, + ecj_file_size: 0, + deleted_needles: RwLock::new(HashSet::new()), disk_type: DiskType::default(), ecx_actual_dir: dir_idx.to_string(), shard_locations: HashMap::new(), @@ -141,10 +155,12 @@ impl EcVolume { } } - // Replay .ecj journal into .ecx on startup (matches Go's RebuildEcxFile). - vol.rebuild_ecx_from_journal()?; - - // Open .ecj file (deletion journal) — use ecx_actual_dir for consistency + // Open .ecj file (deletion journal) — use ecx_actual_dir for consistency. + // Note: Go does NOT replay .ecj into .ecx at volume load (RebuildEcxFile + // is only invoked from specific decode/rebuild gRPC handlers), so we + // don't either. Tombstones from prior sessions were already written + // in-place in .ecx, and the journal grows monotonically until a + // decode/rebuild operation folds it in. let ecj_base = crate::storage::volume::volume_file_name(&vol.ecx_actual_dir, collection, volume_id); let ecj_path = format!("{}.ecj", ecj_base); @@ -154,11 +170,75 @@ impl EcVolume { .create(true) .append(true) .open(&ecj_path)?; + vol.ecj_file_size = ecj_file.metadata()?.len() as i64; vol.ecj_file = Some(ecj_file); + // Seed the in-memory deleted set from the journal. + vol.load_deleted_needles_from_ecj()?; + Ok(vol) } + /// Walk the .ecj journal and populate `deleted_needles`. 
Called once + /// from `new()` under exclusive ownership of the just-constructed + /// EcVolume, so locking is not strictly required — but we take the + /// write lock anyway for symmetry with later mutations. + fn load_deleted_needles_from_ecj(&mut self) -> io::Result<()> { + let ecj_file = match self.ecj_file.as_ref() { + Some(f) => f, + None => return Ok(()), + }; + if self.ecj_file_size < NEEDLE_ID_SIZE as i64 { + return Ok(()); + } + let mut buf = [0u8; NEEDLE_ID_SIZE]; + let mut set = self + .deleted_needles + .write() + .map_err(|_| io::Error::new(io::ErrorKind::Other, "deleted_needles lock poisoned"))?; + let mut off: i64 = 0; + while off + NEEDLE_ID_SIZE as i64 <= self.ecj_file_size { + #[cfg(unix)] + { + use std::os::unix::fs::FileExt; + ecj_file.read_exact_at(&mut buf, off as u64)?; + } + set.insert(NeedleId::from_bytes(&buf)); + off += NEEDLE_ID_SIZE as i64; + } + Ok(()) + } + + /// Returns (file_count, delete_count) for this EC volume. Mirrors Go's + /// `EcVolume.FileAndDeleteCount`: + /// + /// file_count = ecx_file_size / NEEDLE_MAP_ENTRY_SIZE — total + /// entries in the sealed sorted .ecx index. + /// delete_count = deleted_needles.len() — unique + /// runtime deletes tracked in memory (seeded from + /// .ecj on load and updated by `journal_delete`). + /// + /// Because each needle delete is applied on exactly one shard holder, + /// the admin aggregation sums delete_count across nodes while taking + /// file_count from a single holder (they are identical per volume). + pub fn file_and_delete_count(&self) -> (u64, u64) { + let file_count = (self.ecx_file_size as u64) / (NEEDLE_MAP_ENTRY_SIZE as u64); + let delete_count = self + .deleted_needles + .read() + .map(|s| s.len() as u64) + .unwrap_or(0); + (file_count, delete_count) + } + + /// Reports whether the given needle id is in the in-memory deleted set. 
+ pub fn is_needle_deleted(&self, needle_id: NeedleId) -> bool { + self.deleted_needles + .read() + .map(|s| s.contains(&needle_id)) + .unwrap_or(false) + } + // ---- File names ---- #[allow(dead_code)] @@ -166,8 +246,16 @@ impl EcVolume { crate::storage::volume::volume_file_name(&self.dir, &self.collection, self.volume_id) } + /// Base path for the .ecx / .ecj index pair. Resolved from + /// `ecx_actual_dir` (initialized to `dir_idx` and only updated after a + /// successful idx-dir → data-dir fallback in `new()`), so every call site + /// agrees on the same file regardless of whether the fallback fired. fn idx_base_name(&self) -> String { - crate::storage::volume::volume_file_name(&self.dir_idx, &self.collection, self.volume_id) + crate::storage::volume::volume_file_name( + &self.ecx_actual_dir, + &self.collection, + self.volume_id, + ) } pub fn ecx_file_name(&self) -> String { @@ -255,6 +343,8 @@ impl EcVolume { return Vec::new(); } + let (file_count, delete_count) = self.file_and_delete_count(); + vec![master_pb::VolumeEcShardInformationMessage { id: self.volume_id.0, collection: self.collection.clone(), @@ -263,6 +353,8 @@ impl EcVolume { disk_type: self.disk_type.to_string(), expire_at_sec: self.expire_at_sec, disk_id, + file_count, + delete_count, ..Default::default() }] } @@ -313,6 +405,13 @@ impl EcVolume { let (key, offset, size) = idx_entry_from_bytes(&entry_buf); if key == needle_id { + // Apply runtime deletion state on top of the sealed .ecx + // lookup: a needle in the in-memory deleted set is + // reported with TOMBSTONE_FILE_SIZE even though the .ecx + // record itself is untouched. + if self.is_needle_deleted(needle_id) { + return Ok(Some((offset, TOMBSTONE_FILE_SIZE))); + } return Ok(Some((offset, size))); } else if key < needle_id { lo = mid + 1; @@ -570,38 +669,41 @@ impl EcVolume { // ---- Deletion ---- - /// Mark a needle as deleted in the .ecx file in-place. 
- /// Matches Go's MarkNeedleDeleted: binary search the .ecx, then overwrite - /// the size field with TOMBSTONE_FILE_SIZE. - fn mark_needle_deleted_in_ecx(&self, needle_id: NeedleId) -> io::Result { - let ecx_file = match self.ecx_file.as_ref() { - Some(f) => f, - None => return Ok(false), - }; + /// Write `TOMBSTONE_FILE_SIZE` over the Size field of an existing .ecx + /// entry, matching Go's `MarkNeedleDeleted`. Only used by the offline + /// `rebuild_ecx_from_journal` path — the runtime delete path does not + /// touch .ecx because the index is treated as an immutable sorted + /// (id, offset, size) table. Returns `false` if the needle is not in + /// the index (ignored by callers) and an error on IO failure. + fn tombstone_ecx_entry(&self, needle_id: NeedleId) -> io::Result { + let ecx_file = self.ecx_file.as_ref().ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + format!( + "ec volume {} has no open .ecx file (closed or corrupt)", + self.volume_id.0 + ), + ) + })?; let entry_count = self.ecx_file_size as usize / NEEDLE_MAP_ENTRY_SIZE; if entry_count == 0 { return Ok(false); } - // Binary search for the needle let mut lo: usize = 0; let mut hi: usize = entry_count; let mut entry_buf = [0u8; NEEDLE_MAP_ENTRY_SIZE]; - while lo < hi { let mid = lo + (hi - lo) / 2; let file_offset = (mid * NEEDLE_MAP_ENTRY_SIZE) as u64; - #[cfg(unix)] { use std::os::unix::fs::FileExt; ecx_file.read_exact_at(&mut entry_buf, file_offset)?; } - - let (key, _offset, _size) = idx_entry_from_bytes(&entry_buf); + let (key, _offset, _old_size) = idx_entry_from_bytes(&entry_buf); if key == needle_id { - // Found — overwrite the size field with TOMBSTONE_FILE_SIZE let size_offset = file_offset + NEEDLE_ID_SIZE as u64 + OFFSET_SIZE as u64; let mut size_buf = [0u8; SIZE_SIZE]; TOMBSTONE_FILE_SIZE.to_bytes(&mut size_buf); @@ -617,13 +719,18 @@ impl EcVolume { hi = mid; } } - - Ok(false) // not found + Ok(false) } - /// Replay .ecj journal entries into .ecx on startup. 
- /// Matches Go's RebuildEcxFile: for each needle ID in .ecj, marks it - /// deleted in .ecx, then removes the .ecj file. + /// Replay .ecj journal entries into .ecx: for each needle id in .ecj, + /// overwrite its .ecx size field with a tombstone, then remove the + /// journal file. Mirrors Go's `RebuildEcxFile`, which is invoked from + /// specific decode / rebuild gRPC handlers — it is intentionally + /// **not** called on volume load (runtime reads consult + /// `deleted_needles` instead). The rebuild is atomic with respect to + /// the journal: if any individual write fails the .ecj file is left + /// in place and the error is propagated so tombstones are not lost. + #[allow(dead_code)] fn rebuild_ecx_from_journal(&mut self) -> io::Result<()> { let ecj_path = self.ecj_file_name(); if !std::path::Path::new(&ecj_path).exists() { @@ -642,14 +749,23 @@ impl EcVolume { break; } let needle_id = NeedleId::from_bytes(&data[start..start + NEEDLE_ID_SIZE]); - // Errors for individual entries are non-fatal (needle may not exist in .ecx) - let _ = self.mark_needle_deleted_in_ecx(needle_id); + // A needle that never made it into .ecx is fine (e.g. the + // delete raced against encode). Any other IO error aborts the + // rebuild so the journal survives to be retried later. + self.tombstone_ecx_entry(needle_id)?; } - // Remove the .ecj file after replay (matches Go) - let _ = fs::remove_file(&ecj_path); + // Durably flush the newly-written .ecx tombstones before dropping + // the journal: the writes went through write_all_at and may still + // be in page cache. + if let Some(ref ecx_file) = self.ecx_file { + ecx_file.sync_all()?; + } - // Re-create .ecj for future deletions + // Fold successful — drop and recreate the journal, clear the + // in-memory deleted set (all of its contents are now materialized + // in .ecx), and reset the cached size. 
+ fs::remove_file(&ecj_path)?; let ecj_file = OpenOptions::new() .read(true) .write(true) @@ -657,27 +773,126 @@ impl EcVolume { .append(true) .open(&ecj_path)?; self.ecj_file = Some(ecj_file); + self.ecj_file_size = 0; + if let Ok(mut set) = self.deleted_needles.write() { + set.clear(); + } Ok(()) } // ---- Deletion journal ---- - /// Append a deleted needle ID to the .ecj journal and mark in .ecx. - /// Matches Go's DeleteNeedleFromEcx: marks in .ecx first, then journals. + /// Record a needle delete: append the id to the .ecj deletion journal + /// and insert it into the in-memory deleted set. `.ecx` is not touched + /// at runtime — it is a sealed sorted (id, offset, size) index and + /// runtime deletion state lives exclusively in .ecj + `deleted_needles`. + /// A lookup via `find_needle_from_ecx` masks the id out by returning + /// `TOMBSTONE_FILE_SIZE` on a subsequent read. + /// + /// The .ecj append is the durable commit point. If the write or sync + /// fails, the file is truncated back to the pre-append length on a + /// best-effort basis (a truncate error is only logged). Only after the + /// sync succeeds is the id published into the set, so a failure leaves + /// the delete invisible to readers. pub fn journal_delete(&mut self, needle_id: NeedleId) -> io::Result<()> { - // Mark deleted in .ecx in-place (matches Go's MarkNeedleDeleted) - let _ = self.mark_needle_deleted_in_ecx(needle_id); - let ecj_file = self - .ecj_file - .as_mut() - .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "ecj file not open"))?; + // Look the needle up read-only. Missing is a silent no-op; a + // pre-existing .ecx tombstone (from a prior decode/rebuild) is + // mirrored into the in-memory set so delete_count stays accurate + // without needing to walk .ecx on every heartbeat. + match self.find_needle_from_ecx_raw(needle_id)? 
{ + None => return Ok(()), + Some((_, size)) if size.is_deleted() => { + if let Ok(mut set) = self.deleted_needles.write() { + set.insert(needle_id); + } + return Ok(()); + } + Some(_) => {} + } - let mut buf = [0u8; NEEDLE_ID_SIZE]; - needle_id.to_bytes(&mut buf); - ecj_file.write_all(&buf)?; - ecj_file.sync_all()?; - Ok(()) + // Idempotent fast path for repeat deletes — avoids the journal + // append entirely so the derived delete_count stays stable. + if self.is_needle_deleted(needle_id) { + return Ok(()); + } + + let prev_ecj_size = self.ecj_file_size; + let append_result: io::Result<()> = { + let ecj_file = self + .ecj_file + .as_mut() + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "ecj file not open"))?; + let mut buf = [0u8; NEEDLE_ID_SIZE]; + needle_id.to_bytes(&mut buf); + ecj_file + .write_all(&buf) + .and_then(|_| ecj_file.sync_all()) + }; + + match append_result { + Ok(()) => { + self.ecj_file_size += NEEDLE_ID_SIZE as i64; + if let Ok(mut set) = self.deleted_needles.write() { + set.insert(needle_id); + } + Ok(()) + } + Err(e) => { + // write_all may have extended the file on disk before + // sync_all failed; truncate back to the known-good size so + // the on-disk journal never drifts past `deleted_needles`. + if let Some(ecj) = self.ecj_file.as_mut() { + if let Err(trunc_err) = ecj.set_len(prev_ecj_size as u64) { + tracing::error!( + volume_id = self.volume_id.0, + needle_id = needle_id.0, + truncate_error = %trunc_err, + "failed to truncate ecj after append failure" + ); + } + } + Err(e) + } + } + } + + /// Internal: binary search .ecx without masking by `deleted_needles`. + /// Used by `journal_delete` so a repeat delete can still see the raw + /// pre-existing .ecx tombstone from a prior rebuild. 
+ fn find_needle_from_ecx_raw( + &self, + needle_id: NeedleId, + ) -> io::Result> { + let ecx_file = self + .ecx_file + .as_ref() + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "ecx file not open"))?; + let entry_count = self.ecx_file_size as usize / NEEDLE_MAP_ENTRY_SIZE; + if entry_count == 0 { + return Ok(None); + } + let mut lo: usize = 0; + let mut hi: usize = entry_count; + let mut entry_buf = [0u8; NEEDLE_MAP_ENTRY_SIZE]; + while lo < hi { + let mid = lo + (hi - lo) / 2; + let file_offset = (mid * NEEDLE_MAP_ENTRY_SIZE) as u64; + #[cfg(unix)] + { + use std::os::unix::fs::FileExt; + ecx_file.read_exact_at(&mut entry_buf, file_offset)?; + } + let (key, offset, size) = idx_entry_from_bytes(&entry_buf); + if key == needle_id { + return Ok(Some((offset, size))); + } else if key < needle_id { + lo = mid + 1; + } else { + hi = mid; + } + } + Ok(None) } /// Append a deleted needle ID to the .ecj journal, validating the cookie first. @@ -778,15 +993,17 @@ impl EcVolume { let _ = fs::remove_file(format!("{}.ecx", actual_base)); let _ = fs::remove_file(format!("{}.ecj", actual_base)); let _ = fs::remove_file(format!("{}.vif", actual_base)); - // Also try the configured idx dir and data dir in case files exist in either + // Also sweep the originally-configured idx dir in case stale files + // exist there (ecx_file_name() / ecj_file_name() now resolve from + // ecx_actual_dir, so we have to build the idx-dir paths explicitly). 
+ if self.ecx_actual_dir != self.dir_idx { - let _ = fs::remove_file(self.ecx_file_name()); - let _ = fs::remove_file(self.ecj_file_name()); let idx_base = crate::storage::volume::volume_file_name( &self.dir_idx, &self.collection, self.volume_id, ); + let _ = fs::remove_file(format!("{}.ecx", idx_base)); + let _ = fs::remove_file(format!("{}.ecj", idx_base)); let _ = fs::remove_file(format!("{}.vif", idx_base)); } if self.ecx_actual_dir != self.dir && self.dir_idx != self.dir { @@ -859,16 +1076,33 @@ mod tests { let tmp = TempDir::new().unwrap(); let dir = tmp.path().to_str().unwrap(); - // Need ecx file for EcVolume::new to succeed - write_ecx_file(dir, "", VolumeId(1), &[]); + // journal_delete only appends to .ecj for needles that are present and + // still live in .ecx, so the fixture must contain the needles we delete. + let entries = vec![ + (NeedleId(10), Offset::from_actual_offset(8), Size(100)), + (NeedleId(20), Offset::from_actual_offset(200), Size(200)), + ]; + write_ecx_file(dir, "", VolumeId(1), &entries); let mut vol = EcVolume::new(dir, dir, "", VolumeId(1)).unwrap(); + let (fc0, dc0) = vol.file_and_delete_count(); + assert_eq!((fc0, dc0), (2, 0)); vol.journal_delete(NeedleId(10)).unwrap(); vol.journal_delete(NeedleId(20)).unwrap(); let deleted = vol.read_deleted_needles().unwrap(); assert_eq!(deleted, vec![NeedleId(10), NeedleId(20)]); + + let (fc, dc) = vol.file_and_delete_count(); + assert_eq!((fc, dc), (2, 2)); + + // Idempotent re-delete must not bump delete_count. + vol.journal_delete(NeedleId(10)).unwrap(); + // Deleting a missing needle must not bump delete_count either. 
+ vol.journal_delete(NeedleId(999)).unwrap(); + let (fc, dc) = vol.file_and_delete_count(); + assert_eq!((fc, dc), (2, 2)); } #[test] diff --git a/test/erasure_coding/admin_dockertest/ec_integration_test.go b/test/erasure_coding/admin_dockertest/ec_integration_test.go index 1bd1b2095..b17924774 100644 --- a/test/erasure_coding/admin_dockertest/ec_integration_test.go +++ b/test/erasure_coding/admin_dockertest/ec_integration_test.go @@ -12,6 +12,7 @@ import ( "path/filepath" "regexp" "strings" + "sync" "testing" "time" ) @@ -22,10 +23,16 @@ const ( FilerUrl = "http://localhost:8888" ) -// Helper to run commands in background and track PIDs for cleanup -var runningCmds []*exec.Cmd +// Helper to run commands in background and track PIDs for cleanup. Guarded +// by runningCmdsLock so parallel subprocess startup can append safely. +var ( + runningCmds []*exec.Cmd + runningCmdsLock sync.Mutex +) func cleanup() { + runningCmdsLock.Lock() + defer runningCmdsLock.Unlock() for _, cmd := range runningCmds { if cmd.Process != nil { cmd.Process.Kill() @@ -59,7 +66,9 @@ func startWeed(t *testing.T, name string, args ...string) *exec.Cmd { if err != nil { t.Fatalf("Failed to start weed %v: %v", args, err) } + runningCmdsLock.Lock() runningCmds = append(runningCmds, cmd) + runningCmdsLock.Unlock() return cmd } @@ -107,14 +116,23 @@ func ensureEnvironment(t *testing.T) { waitForUrl(t, MasterUrl+"/cluster/status", 10) // 3. Start Volume Server (Worker) - // Start 14 volume servers to verify RS(10,4) default EC + // Start 14 volume servers to verify RS(10,4) default EC. Fork/exec in + // parallel because startWeed is non-blocking and the per-process fork + + // mkdir + log-file-open overhead stacks up sequentially on cold CI + // disks, eating most of the admin /health wait budget further down. 
+ var volWg sync.WaitGroup for i := 1; i <= 14; i++ { - volName := fmt.Sprintf("volume%d", i) - port := 8080 + i - 1 - dir := filepath.Join("tmp", volName) - os.MkdirAll(dir, 0755) - startWeed(t, volName, "volume", "-dir="+dir, "-mserver=localhost:9333", fmt.Sprintf("-port=%d", port), "-ip=localhost") + volWg.Add(1) + go func(i int) { + defer volWg.Done() + volName := fmt.Sprintf("volume%d", i) + port := 8080 + i - 1 + dir := filepath.Join("tmp", volName) + os.MkdirAll(dir, 0755) + startWeed(t, volName, "volume", "-dir="+dir, "-mserver=localhost:9333", fmt.Sprintf("-port=%d", port), "-ip=localhost") + }(i) } + volWg.Wait() // 4. Start Filer os.MkdirAll(filepath.Join("tmp", "filer"), 0755) @@ -136,7 +154,12 @@ func ensureEnvironment(t *testing.T) { os.RemoveAll(filepath.Join("tmp", "admin")) os.MkdirAll(filepath.Join("tmp", "admin"), 0755) startWeed(t, "admin", "admin", "-master=localhost:9333", "-port=23646", "-dataDir=./tmp/admin") - waitForUrl(t, AdminUrl+"/health", 60) + // Admin is started after master, 14 volume servers, filer and 2 workers, + // so under cold CI conditions the wait here has to absorb the tail of + // every earlier subprocess coming up. 60s is too tight and has flaked; + // 180s gives comfortable headroom without meaningfully extending the + // fast path (the first successful /health usually hits well under 30s). + waitForUrl(t, AdminUrl+"/health", 180) t.Log("Environment started successfully") } diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go index ef7750348..2516abcf9 100644 --- a/weed/admin/dash/admin_server.go +++ b/weed/admin/dash/admin_server.go @@ -1672,8 +1672,21 @@ type collectionStats struct { FileCount int64 } +// ecVolumeCounts is used to correctly combine EC volume counts reported by +// multiple nodes. Every node holding any shard of an EC volume reports the +// same file_count (total entries in the replicated .ecx), so we dedupe it +// per volume id. 
In contrast, a needle delete is applied on exactly one +// shard holder, so each node reports its own local tombstone count and the +// true delete total is the sum across nodes. +type ecVolumeCounts struct { + collection string + fileCount uint64 + deleteCount uint64 +} + func collectCollectionStats(topologyInfo *master_pb.TopologyInfo) map[string]collectionStats { collectionMap := make(map[string]collectionStats) + ecVolumeAgg := make(map[uint32]*ecVolumeCounts) for _, dc := range topologyInfo.DataCenterInfos { for _, rack := range dc.RackInfos { for _, node := range rack.DataNodeInfos { @@ -1709,11 +1722,36 @@ func collectCollectionStats(topologyInfo *master_pb.TopologyInfo) map[string]col data.PhysicalSize += int64(shards.TotalSize()) data.LogicalSize += int64(shards.MinusParityShards().TotalSize()) collectionMap[collection] = data + + agg, ok := ecVolumeAgg[ecShardInfo.Id] + if !ok { + agg = &ecVolumeCounts{collection: collection, fileCount: ecShardInfo.FileCount} + ecVolumeAgg[ecShardInfo.Id] = agg + } + agg.deleteCount += ecShardInfo.DeleteCount } } } } } + + // Fold EC per-volume counts into the collection totals. fileCount is + // already deduped (set once per volume), deleteCount is the sum of + // local tombstones across every node holding shards of the volume. + for vid, agg := range ecVolumeAgg { + data := collectionMap[agg.collection] + if agg.fileCount >= agg.deleteCount { + data.FileCount += int64(agg.fileCount - agg.deleteCount) + } else { + // Should not happen in steady state — indicates a node reporting + // a stale fileCount, a skewed heartbeat, or a delete-counter bug. + // Defend the UI by skipping the add and surface the anomaly. 
+ glog.Warningf("ec volume %d in collection %q: summed delete_count=%d exceeds file_count=%d; skipping object count", + vid, agg.collection, agg.deleteCount, agg.fileCount) + } + collectionMap[agg.collection] = data + } + return collectionMap } diff --git a/weed/admin/dash/collect_collection_stats_test.go b/weed/admin/dash/collect_collection_stats_test.go index 3cceab33c..f390b35e0 100644 --- a/weed/admin/dash/collect_collection_stats_test.go +++ b/weed/admin/dash/collect_collection_stats_test.go @@ -122,3 +122,65 @@ func TestCollectCollectionStatsECEmptyCollection(t *testing.T) { t.Errorf("LogicalSize: got %d, want 2000", got.LogicalSize) } } + +// TestCollectCollectionStatsECFileAndDeleteCountAggregation verifies that +// FileCount for an EC volume is deduped across nodes (every shard holder has +// an identical .ecx, so the total entry count is taken once) while +// DeleteCount is summed (each needle delete tombstones exactly one node's +// .ecx, so the true delete total is the sum of every holder's local count). +func TestCollectCollectionStatsECFileAndDeleteCountAggregation(t *testing.T) { + // Volume id=7 reported by three nodes. Every node reports file_count=100 + // (same .ecx). Local delete counts: 5 + 3 + 2 = 10 deletes total. + // Expected live object count = 100 - 10 = 90. 
+ makeNode := func(bits uint32, sizes []int64, deleteCount uint64) *master_pb.DataNodeInfo { + return &master_pb.DataNodeInfo{ + DiskInfos: map[string]*master_pb.DiskInfo{ + "disk1": { + EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{ + { + Id: 7, + Collection: "bucket-a", + EcIndexBits: bits, + ShardSizes: sizes, + FileCount: 100, + DeleteCount: deleteCount, + }, + }, + }, + }, + } + } + + topo := &master_pb.TopologyInfo{ + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + RackInfos: []*master_pb.RackInfo{ + { + DataNodeInfos: []*master_pb.DataNodeInfo{ + makeNode((1<<0)|(1<<1)|(1<<2)|(1<<3), []int64{1000, 1000, 1000, 1000}, 5), + makeNode((1<<4)|(1<<5)|(1<<6)|(1<<7), []int64{1000, 1000, 1000, 1000}, 3), + makeNode((1<<8)|(1<<9)|(1<<10)|(1<<11)|(1<<12)|(1<<13), []int64{1000, 1000, 1000, 1000, 1000, 1000}, 2), + }, + }, + }, + }, + }, + } + + stats := collectCollectionStats(topo) + + got, ok := stats["bucket-a"] + if !ok { + t.Fatalf("expected collection bucket-a in stats, got: %v", stats) + } + if got.FileCount != 90 { + t.Errorf("FileCount: got %d, want 90 (100 total - 10 deletes summed)", got.FileCount) + } + // Sanity: 14 shards × 1000 bytes physical, 10 data shards logical. 
+ if got.PhysicalSize != 14000 { + t.Errorf("PhysicalSize: got %d, want 14000", got.PhysicalSize) + } + if got.LogicalSize != 10000 { + t.Errorf("LogicalSize: got %d, want 10000", got.LogicalSize) + } +} diff --git a/weed/pb/master.proto b/weed/pb/master.proto index c127bafb1..57fa1f268 100644 --- a/weed/pb/master.proto +++ b/weed/pb/master.proto @@ -145,6 +145,11 @@ message VolumeEcShardInformationMessage { uint64 expire_at_sec = 5; // used to record the destruction time of ec volume uint32 disk_id = 6; repeated int64 shard_sizes = 7; // optimized: sizes for shards in order of set bits in ec_index_bits + uint64 file_count = 8; // total needles in the .ecx index (live + tombstoned) + uint64 delete_count = 9; // node-local tombstones in the .ecj deletion journal + // fields 10-19 reserved for future upstream open-source additions. + // fields 20+ are owned by the enterprise fork (e.g. data_shards/parity_shards) + // and must not be used here without coordination. } message StorageBackend { diff --git a/weed/pb/master_pb/master.pb.go b/weed/pb/master_pb/master.pb.go index 24d1ab64f..eb47afb5d 100644 --- a/weed/pb/master_pb/master.pb.go +++ b/weed/pb/master_pb/master.pb.go @@ -639,6 +639,8 @@ type VolumeEcShardInformationMessage struct { ExpireAtSec uint64 `protobuf:"varint,5,opt,name=expire_at_sec,json=expireAtSec,proto3" json:"expire_at_sec,omitempty"` // used to record the destruction time of ec volume DiskId uint32 `protobuf:"varint,6,opt,name=disk_id,json=diskId,proto3" json:"disk_id,omitempty"` ShardSizes []int64 `protobuf:"varint,7,rep,packed,name=shard_sizes,json=shardSizes,proto3" json:"shard_sizes,omitempty"` // optimized: sizes for shards in order of set bits in ec_index_bits + FileCount uint64 `protobuf:"varint,8,opt,name=file_count,json=fileCount,proto3" json:"file_count,omitempty"` // total needles in the .ecx index (live + tombstoned) + DeleteCount uint64 `protobuf:"varint,9,opt,name=delete_count,json=deleteCount,proto3" json:"delete_count,omitempty"` 
// node-local tombstones in the .ecj deletion journal unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -722,6 +724,20 @@ func (x *VolumeEcShardInformationMessage) GetShardSizes() []int64 { return nil } +func (x *VolumeEcShardInformationMessage) GetFileCount() uint64 { + if x != nil { + return x.FileCount + } + return 0 +} + +func (x *VolumeEcShardInformationMessage) GetDeleteCount() uint64 { + if x != nil { + return x.DeleteCount + } + return 0 +} + type StorageBackend struct { state protoimpl.MessageState `protogen:"open.v1"` Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` @@ -4423,7 +4439,7 @@ const file_master_proto_rawDesc = "" + "\x03ttl\x18\n" + " \x01(\rR\x03ttl\x12\x1b\n" + "\tdisk_type\x18\x0f \x01(\tR\bdiskType\x12\x17\n" + - "\adisk_id\x18\x10 \x01(\rR\x06diskId\"\xf0\x01\n" + + "\adisk_id\x18\x10 \x01(\rR\x06diskId\"\xb2\x02\n" + "\x1fVolumeEcShardInformationMessage\x12\x0e\n" + "\x02id\x18\x01 \x01(\rR\x02id\x12\x1e\n" + "\n" + @@ -4434,7 +4450,10 @@ const file_master_proto_rawDesc = "" + "\rexpire_at_sec\x18\x05 \x01(\x04R\vexpireAtSec\x12\x17\n" + "\adisk_id\x18\x06 \x01(\rR\x06diskId\x12\x1f\n" + "\vshard_sizes\x18\a \x03(\x03R\n" + - "shardSizes\"\xbe\x01\n" + + "shardSizes\x12\x1d\n" + + "\n" + + "file_count\x18\b \x01(\x04R\tfileCount\x12!\n" + + "\fdelete_count\x18\t \x01(\x04R\vdeleteCount\"\xbe\x01\n" + "\x0eStorageBackend\x12\x12\n" + "\x04type\x18\x01 \x01(\tR\x04type\x12\x0e\n" + "\x02id\x18\x02 \x01(\tR\x02id\x12I\n" + diff --git a/weed/storage/erasure_coding/ec_decoder_test.go b/weed/storage/erasure_coding/ec_decoder_test.go index f1f54bb25..31aeeb3e3 100644 --- a/weed/storage/erasure_coding/ec_decoder_test.go +++ b/weed/storage/erasure_coding/ec_decoder_test.go @@ -482,86 +482,98 @@ func TestEcxFileDeletionWithSeparateHandles(t *testing.T) { } } -// TestEcVolumeSyncEnsuresDeletionsVisible is an integration test for issue #7751 -// that verifies the EcVolume.Sync() method ensures deleted 
needles are visible -// to external readers (like ec.decode's CopyFile). -func TestEcVolumeSyncEnsuresDeletionsVisible(t *testing.T) { +// TestEcVolumeDeleteDurableToJournal tracks issue #7751: a runtime needle +// delete must be observable by the ec.decode CopyFile path. Under the +// current design .ecx is an immutable sealed index at runtime — deletes +// are journaled to .ecj and tracked in an in-memory set — so the +// durability chain decode relies on is: +// +// 1. DeleteNeedleFromEcx appends the needle id to .ecj and fsyncs it. +// 2. Runtime reads via FindNeedleFromEcx consult the in-memory set and +// return TombstoneFileSize even though the sealed .ecx record on +// disk still shows the original size. +// 3. ec.decode later closes the EcVolume and calls RebuildEcxFile on +// the now-quiescent files, which walks .ecj and writes tombstones +// into .ecx. CopyFile then reads the rebuilt .ecx. +// +// This test exercises the full chain on a tempdir fixture. +func TestEcVolumeDeleteDurableToJournal(t *testing.T) { dir := t.TempDir() collection := "test" vid := 1 base := filepath.Join(dir, collection+"_1") - // Create initial .ecx file with live needle + // Seed .ecx with two live needles. needle1 := makeNeedleMapEntry(types.NeedleId(1), types.ToOffset(64), types.Size(100)) needle2 := makeNeedleMapEntry(types.NeedleId(2), types.ToOffset(128), types.Size(200)) ecxData := append(needle1, needle2...) 
if err := os.WriteFile(base+".ecx", ecxData, 0644); err != nil { t.Fatalf("write ecx: %v", err) } - - // Create empty .ecj file if err := os.WriteFile(base+".ecj", []byte{}, 0644); err != nil { t.Fatalf("write ecj: %v", err) } - - // Create minimal EC shard file to allow EcVolume creation - // Shards need super block header (8 bytes) - shardData := make([]byte, 8) - if err := os.WriteFile(base+".ec00", shardData, 0644); err != nil { + if err := os.WriteFile(base+".ec00", make([]byte, 8), 0644); err != nil { t.Fatalf("write ec00: %v", err) } - - // Create .vif file if err := os.WriteFile(base+".vif", []byte{}, 0644); err != nil { t.Fatalf("write vif: %v", err) } - // Create EcVolume - ecVolume, err := erasure_coding.NewEcVolume( - "hdd", - dir, // data dir - dir, // idx dir - collection, - needle.VolumeId(vid), - ) + ecVolume, err := erasure_coding.NewEcVolume("hdd", dir, dir, collection, needle.VolumeId(vid)) if err != nil { t.Fatalf("NewEcVolume: %v", err) } - defer ecVolume.Close() - // Delete needle 2 via EcVolume (this writes to .ecx and .ecj) + // Runtime delete must not mutate .ecx. if err := ecVolume.DeleteNeedleFromEcx(types.NeedleId(2)); err != nil { t.Fatalf("DeleteNeedleFromEcx: %v", err) } - // Before Sync: open a new reader handle to simulate CopyFile - readerBefore, err := os.OpenFile(base+".ecx", os.O_RDONLY, 0644) + // FindNeedleFromEcx masks the id via the in-memory set. 
+ _, size, err := ecVolume.FindNeedleFromEcx(types.NeedleId(2)) if err != nil { - t.Fatalf("open ecx for read (before sync): %v", err) + t.Fatalf("FindNeedleFromEcx(2): %v", err) } - defer readerBefore.Close() - - // Now call Sync() - this is what fixes issue #7751 - ecVolume.Sync() - - // After Sync: open another reader handle - readerAfter, err := os.OpenFile(base+".ecx", os.O_RDONLY, 0644) - if err != nil { - t.Fatalf("open ecx for read (after sync): %v", err) - } - defer readerAfter.Close() - - // Read needle 2's entry from the after-sync handle - entrySize := types.NeedleIdSize + types.OffsetSize + types.SizeSize - data := make([]byte, entrySize) - if _, err := readerAfter.ReadAt(data, int64(entrySize)); err != nil { - t.Fatalf("ReadAt after sync: %v", err) - } - - // Verify needle 2 is marked as deleted - size := types.BytesToSize(data[types.NeedleIdSize+types.OffsetSize:]) if !size.IsDeleted() { - t.Fatalf("expected needle 2 to be deleted after Sync(), got size: %d", size) + t.Fatalf("expected FindNeedleFromEcx to return tombstone for deleted needle, got size=%d", size) + } + + // Direct .ecx reader should still see the original size — .ecx is + // immutable at runtime. + entrySize := int64(types.NeedleIdSize + types.OffsetSize + types.SizeSize) + rawBuf := make([]byte, entrySize) + rawReader, err := os.Open(base + ".ecx") + if err != nil { + t.Fatalf("open ecx raw: %v", err) + } + if _, err := rawReader.ReadAt(rawBuf, entrySize); err != nil { + t.Fatalf("read raw ecx entry: %v", err) + } + rawReader.Close() + rawSize := types.BytesToSize(rawBuf[types.NeedleIdSize+types.OffsetSize:]) + if rawSize.IsDeleted() { + t.Fatalf("runtime delete must not mutate .ecx on disk; got tombstone in raw entry") + } + + // Close the volume so RebuildEcxFile can operate on the files, then + // fold .ecj into .ecx as the decode path does and verify the rebuilt + // index has the tombstone visible to external readers. 
+ ecVolume.Close() + if err := erasure_coding.RebuildEcxFile(base); err != nil { + t.Fatalf("RebuildEcxFile: %v", err) + } + rebuilt, err := os.Open(base + ".ecx") + if err != nil { + t.Fatalf("open rebuilt ecx: %v", err) + } + defer rebuilt.Close() + if _, err := rebuilt.ReadAt(rawBuf, entrySize); err != nil { + t.Fatalf("read rebuilt ecx entry: %v", err) + } + rebuiltSize := types.BytesToSize(rawBuf[types.NeedleIdSize+types.OffsetSize:]) + if !rebuiltSize.IsDeleted() { + t.Fatalf("expected needle 2 to be tombstoned in rebuilt .ecx, got size=%d", rebuiltSize) } } diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index 0061f1222..2bf96d0d8 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -43,6 +43,22 @@ type EcVolume struct { datFileSize int64 ExpireAtSec uint64 //ec volume destroy time, calculated from the ec volume was created ECContext *ECContext // EC encoding parameters + + // ecjFileSize mirrors the on-disk size of the .ecj deletion journal and + // is maintained under ecjFileAccessLock. It is only used by IO helpers + // (seek/truncate) — the authoritative runtime delete count comes from + // deletedNeedles. + ecjFileSize int64 + + // deletedNeedles is the in-memory set of needle ids that have been + // deleted since the volume was encoded. .ecx is immutable at runtime — + // it only stores the sorted (id, offset, size) index written at encode + // time — and runtime deletes are journaled to .ecj + tracked here. + // Reads consult this set to mask out deleted needles on top of the + // sealed .ecx lookup. Heartbeat delete_count is derived from len(set). + // Seeded from .ecj in NewEcVolume and updated under deletedNeedlesLock. 
+ deletedNeedlesLock sync.RWMutex + deletedNeedles map[types.NeedleId]struct{} } func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) { @@ -75,10 +91,19 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection ev.ecxFileSize = ecxFi.Size() ev.ecxCreatedAt = ecxFi.ModTime() - // open ecj file + // open ecj file and seed the in-memory deleted set from it. if ev.ecjFile, err = os.OpenFile(indexBaseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil { return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", indexBaseFileName, err) } + if ecjFi, statErr := ev.ecjFile.Stat(); statErr == nil { + ev.ecjFileSize = ecjFi.Size() + } else { + glog.Warningf("stat ec volume journal %s.ecj: %v", indexBaseFileName, statErr) + } + ev.deletedNeedles = make(map[types.NeedleId]struct{}) + if loadErr := ev.loadDeletedNeedlesFromEcj(); loadErr != nil { + glog.Warningf("ec volume %d: load deleted needles from .ecj: %v", vid, loadErr) + } // read volume info ev.Version = needle.Version3 @@ -254,6 +279,8 @@ func (ev *EcVolume) ShardIdList() (shardIds []ShardId) { func (ev *EcVolume) ToVolumeEcShardInformationMessage(diskId uint32) (messages []*master_pb.VolumeEcShardInformationMessage) { ecInfoPerVolume := map[needle.VolumeId]*master_pb.VolumeEcShardInformationMessage{} + fileCount, deleteCount := ev.FileAndDeleteCount() + for _, s := range ev.Shards { m, ok := ecInfoPerVolume[s.VolumeId] if !ok { @@ -263,6 +290,8 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage(diskId uint32) (messages [ DiskType: string(ev.diskType), ExpireAtSec: ev.ExpireAtSec, DiskId: diskId, + FileCount: fileCount, + DeleteCount: deleteCount, } ecInfoPerVolume[s.VolumeId] = m } @@ -280,6 +309,67 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage(diskId uint32) (messages [ return } +// FileAndDeleteCount returns the current (fileCount, deleteCount) for this +// EC volume. 
+// +// - fileCount = .ecx size / NeedleMapEntrySize — the total number of +// needles recorded in the sealed sorted index. Because .ecx is written +// at encode time and only overwritten during decode/rebuild (which +// preserves record count), this matches the "cumulative put count" +// semantics of regular volume FileCount. +// +// - deleteCount = len(deletedNeedles) — the number of unique runtime +// deletes tracked in memory. The set is seeded from .ecj on load and +// appended to on every successful DeleteNeedleFromEcx. Because a +// needle delete is applied on exactly one shard holder, the admin +// aggregation sums deleteCount across nodes to get the volume's true +// delete total. +// +// Both values are O(1) — no index walking. +func (ev *EcVolume) FileAndDeleteCount() (fileCount, deleteCount uint64) { + fileCount = uint64(ev.ecxFileSize) / uint64(types.NeedleMapEntrySize) + ev.deletedNeedlesLock.RLock() + deleteCount = uint64(len(ev.deletedNeedles)) + ev.deletedNeedlesLock.RUnlock() + return +} + +// IsNeedleDeleted reports whether the given needle id is in the in-memory +// deleted set. Callers that have already looked the needle up in .ecx +// should consult this to apply runtime deletion state on top of the +// sealed index. +func (ev *EcVolume) IsNeedleDeleted(needleId types.NeedleId) bool { + ev.deletedNeedlesLock.RLock() + _, ok := ev.deletedNeedles[needleId] + ev.deletedNeedlesLock.RUnlock() + return ok +} + +// markNeedleDeletedInMemory inserts a needle id into the deleted set. +func (ev *EcVolume) markNeedleDeletedInMemory(needleId types.NeedleId) { + ev.deletedNeedlesLock.Lock() + ev.deletedNeedles[needleId] = struct{}{} + ev.deletedNeedlesLock.Unlock() +} + +// loadDeletedNeedlesFromEcj walks the .ecj journal and populates the +// in-memory deleted set. Called once from NewEcVolume under the exclusive +// ownership of the just-constructed (and not yet shared) EcVolume. 
+func (ev *EcVolume) loadDeletedNeedlesFromEcj() error { + if ev.ecjFile == nil || ev.ecjFileSize < int64(types.NeedleIdSize) { + return nil + } + buf := make([]byte, types.NeedleIdSize) + for off := int64(0); off+int64(types.NeedleIdSize) <= ev.ecjFileSize; off += int64(types.NeedleIdSize) { + if _, err := ev.ecjFile.ReadAt(buf, off); err != nil { + return fmt.Errorf("read ecj at %d: %w", off, err) + } + id := types.BytesToNeedleId(buf) + ev.deletedNeedles[id] = struct{}{} + } + return nil +} + func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size types.Size, intervals []Interval, err error) { // find the needle from ecx file @@ -313,7 +403,15 @@ func (ev *EcVolume) LocateEcShardNeedleInterval(version needle.Version, offset i } func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size types.Size, err error) { - return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil) + offset, size, err = SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil) + if err != nil { + return + } + // Apply runtime deletion state on top of the sealed .ecx lookup. 
+ if ev.IsNeedleDeleted(needleId) { + size = types.TombstoneFileSize + } + return } func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size types.Size, err error) { diff --git a/weed/storage/erasure_coding/ec_volume_counts_test.go b/weed/storage/erasure_coding/ec_volume_counts_test.go new file mode 100644 index 000000000..cb5e9c5f1 --- /dev/null +++ b/weed/storage/erasure_coding/ec_volume_counts_test.go @@ -0,0 +1,125 @@ +package erasure_coding_test + +import ( + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +// makeEntry builds one .ecx index entry (needle id, offset, size). +func makeEntry(key types.NeedleId, offset types.Offset, size types.Size) []byte { + b := make([]byte, types.NeedleIdSize+types.OffsetSize+types.SizeSize) + types.NeedleIdToBytes(b[0:types.NeedleIdSize], key) + types.OffsetToBytes(b[types.NeedleIdSize:types.NeedleIdSize+types.OffsetSize], offset) + types.SizeToBytes(b[types.NeedleIdSize+types.OffsetSize:], size) + return b +} + +// encodeEcjIds packs a slice of needle ids into the binary .ecj on-disk format. +func encodeEcjIds(ids []types.NeedleId) []byte { + buf := make([]byte, 0, len(ids)*types.NeedleIdSize) + for _, id := range ids { + b := make([]byte, types.NeedleIdSize) + types.NeedleIdToBytes(b, id) + buf = append(buf, b...) 
+ } + return buf +} + +func writeFixture(t *testing.T, dir, collection string, vid int, ecxData []byte, ecjIds []types.NeedleId) *erasure_coding.EcVolume { + t.Helper() + base := filepath.Join(dir, fmt.Sprintf("%s_%d", collection, vid)) + + if err := os.WriteFile(base+".ecx", ecxData, 0644); err != nil { + t.Fatalf("write ecx: %v", err) + } + if err := os.WriteFile(base+".ecj", encodeEcjIds(ecjIds), 0644); err != nil { + t.Fatalf("write ecj: %v", err) + } + if err := os.WriteFile(base+".ec00", make([]byte, 8), 0644); err != nil { + t.Fatalf("write ec00: %v", err) + } + if err := os.WriteFile(base+".vif", []byte{}, 0644); err != nil { + t.Fatalf("write vif: %v", err) + } + + ev, err := erasure_coding.NewEcVolume("hdd", dir, dir, collection, needle.VolumeId(vid)) + if err != nil { + t.Fatalf("NewEcVolume: %v", err) + } + return ev +} + +// TestEcVolumeFileAndDeleteCountInitial verifies FileAndDeleteCount right after +// load: fileCount = .ecx size / NeedleMapEntrySize and deleteCount = number of +// unique needle ids seeded from .ecj. The .ecx index itself is never walked. +func TestEcVolumeFileAndDeleteCountInitial(t *testing.T) { + dir := t.TempDir() + + // 3 needles in .ecx, 2 deletions already recorded in .ecj. + ecx := []byte{} + ecx = append(ecx, makeEntry(1, types.ToOffset(64), 100)...) + ecx = append(ecx, makeEntry(2, types.ToOffset(128), 200)...) + ecx = append(ecx, makeEntry(3, types.ToOffset(256), 300)...) + ecj := []types.NeedleId{2, 3} + + ev := writeFixture(t, dir, "test", 1, ecx, ecj) + defer ev.Close() + + fileCount, deleteCount := ev.FileAndDeleteCount() + if fileCount != 3 { + t.Errorf("fileCount: got %d, want 3 (.ecx entries)", fileCount) + } + if deleteCount != 2 { + t.Errorf("deleteCount: got %d, want 2 (.ecj entries)", deleteCount) + } +} + +// TestEcVolumeFileAndDeleteCountAfterDelete verifies that DeleteNeedleFromEcx +// increments the derived delete count by appending to .ecj, while leaving +// fileCount (derived from the sealed .ecx) unchanged. 
Idempotent re-deletes +// and deletes of missing needles must not drift the count. +func TestEcVolumeFileAndDeleteCountAfterDelete(t *testing.T) { + dir := t.TempDir() + + ecx := []byte{} + ecx = append(ecx, makeEntry(1, types.ToOffset(64), 100)...) + ecx = append(ecx, makeEntry(2, types.ToOffset(128), 200)...) + + ev := writeFixture(t, dir, "test", 1, ecx, nil) + defer ev.Close() + + if fc, dc := ev.FileAndDeleteCount(); fc != 2 || dc != 0 { + t.Fatalf("initial: got (%d, %d), want (2, 0)", fc, dc) + } + + if err := ev.DeleteNeedleFromEcx(2); err != nil { + t.Fatalf("DeleteNeedleFromEcx: %v", err) + } + if fc, dc := ev.FileAndDeleteCount(); fc != 2 || dc != 1 { + t.Errorf("after first delete: got (%d, %d), want (2, 1)", fc, dc) + } + + // Re-deleting an already tombstoned needle is a no-op on .ecj, so + // deleteCount must stay at 1. + if err := ev.DeleteNeedleFromEcx(2); err != nil { + t.Fatalf("idempotent DeleteNeedleFromEcx: %v", err) + } + if fc, dc := ev.FileAndDeleteCount(); fc != 2 || dc != 1 { + t.Errorf("after idempotent delete: got (%d, %d), want (2, 1)", fc, dc) + } + + // Deleting a non-existent needle is a no-op: search returns NotFound, + // no .ecj append. + if err := ev.DeleteNeedleFromEcx(99); err != nil { + t.Fatalf("missing DeleteNeedleFromEcx: %v", err) + } + if fc, dc := ev.FileAndDeleteCount(); fc != 2 || dc != 1 { + t.Errorf("after missing delete: got (%d, %d), want (2, 1)", fc, dc) + } +} diff --git a/weed/storage/erasure_coding/ec_volume_delete.go b/weed/storage/erasure_coding/ec_volume_delete.go index 076176bea..55490fc67 100644 --- a/weed/storage/erasure_coding/ec_volume_delete.go +++ b/weed/storage/erasure_coding/ec_volume_delete.go @@ -5,6 +5,7 @@ import ( "io" "os" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" ) @@ -24,28 +25,71 @@ var ( } ) +// DeleteNeedleFromEcx marks the given needle as deleted. 
.ecx is treated +// as an immutable sealed sorted index; runtime deletes are recorded by +// appending the needle id to the .ecj deletion journal and inserting it +// into the in-memory deletedNeedles set. A subsequent FindNeedleFromEcx +// masks the id out by returning TombstoneFileSize. +// +// The .ecj append is the durable commit point — only after it syncs do +// we publish the id into the in-memory set. A partial write is truncated +// back to the known-good size so the on-disk journal and the set cannot +// drift. func (ev *EcVolume) DeleteNeedleFromEcx(needleId types.NeedleId) (err error) { - _, _, err = SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, MarkNeedleDeleted) - + // Look the needle up read-only. A missing needle is not an error + // (already gone, e.g. from a race against encode); a pre-existing + // .ecx tombstone means a prior decode/rebuild folded it in, in + // which case there is nothing to journal but we still mirror it + // into the in-memory set so delete_count stays consistent. + _, oldSize, err := SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil) if err != nil { if err == NotFoundError { return nil } return err } + if oldSize.IsDeleted() { + ev.markNeedleDeletedInMemory(needleId) + return nil + } + + // Serialise runtime deletes on ecjFileAccessLock so the idempotence + // check, the journal append and the set insertion happen atomically + // with respect to one another. 
+ ev.ecjFileAccessLock.Lock() + defer ev.ecjFileAccessLock.Unlock() + + if ev.IsNeedleDeleted(needleId) { + return nil + } b := make([]byte, types.NeedleIdSize) types.NeedleIdToBytes(b, needleId) - ev.ecjFileAccessLock.Lock() + prevEcjSize := ev.ecjFileSize + if _, seekErr := ev.ecjFile.Seek(0, io.SeekEnd); seekErr != nil { + return fmt.Errorf("seek ecj: %w", seekErr) + } + n, writeErr := ev.ecjFile.Write(b) + if writeErr != nil { + if truncErr := ev.ecjFile.Truncate(prevEcjSize); truncErr != nil { + glog.Errorf("ec volume %d: failed to truncate ecj after write error: %v", ev.VolumeId, truncErr) + } + return fmt.Errorf("write ecj: %w", writeErr) + } + if syncErr := ev.ecjFile.Sync(); syncErr != nil { + if truncErr := ev.ecjFile.Truncate(prevEcjSize); truncErr != nil { + glog.Errorf("ec volume %d: failed to truncate ecj after sync error: %v", ev.VolumeId, truncErr) + } + return fmt.Errorf("sync ecj: %w", syncErr) + } + ev.ecjFileSize += int64(n) - ev.ecjFile.Seek(0, io.SeekEnd) - ev.ecjFile.Write(b) + // Publish into the in-memory set only after the journal is durable. 
+ ev.markNeedleDeletedInMemory(needleId) + - ev.ecjFileAccessLock.Unlock() - - return + return nil } func RebuildEcxFile(baseFileName string) error { diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go index 8a4359a53..13d22ee60 100644 --- a/weed/storage/erasure_coding/ec_volume_info.go +++ b/weed/storage/erasure_coding/ec_volume_info.go @@ -13,6 +13,8 @@ type EcVolumeInfo struct { DiskId uint32 // ID of the disk this EC volume is on ExpireAtSec uint64 // ec volume destroy time, calculated from the ec volume was created ShardsInfo *ShardsInfo + FileCount uint64 // total needles in the .ecx index, live + tombstoned (same on every node holding shards) + DeleteCount uint64 // node-local count of needles deleted via the .ecj journal; not the volume-wide total } func (ecInfo *EcVolumeInfo) Minus(other *EcVolumeInfo) *EcVolumeInfo { @@ -23,6 +25,8 @@ func (ecInfo *EcVolumeInfo) Minus(other *EcVolumeInfo) *EcVolumeInfo { DiskType: ecInfo.DiskType, DiskId: ecInfo.DiskId, ExpireAtSec: ecInfo.ExpireAtSec, + FileCount: ecInfo.FileCount, + DeleteCount: ecInfo.DeleteCount, } } @@ -35,5 +39,7 @@ func (evi *EcVolumeInfo) ToVolumeEcShardInformationMessage() (ret *master_pb.Vol DiskType: evi.DiskType, ExpireAtSec: evi.ExpireAtSec, DiskId: evi.DiskId, + FileCount: evi.FileCount, + DeleteCount: evi.DeleteCount, } } diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go index 9fcb092a2..da20ffd52 100644 --- a/weed/storage/store_ec_delete.go +++ b/weed/storage/store_ec_delete.go @@ -2,6 +2,7 @@ package storage import ( "context" + "errors" "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -35,6 +36,12 @@ func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle } +// errEcShardMissing indicates that no data node in the topology currently +// holds the requested EC shard. 
Callers use this to trigger fallback to +// another shard, distinguishing "no home for this shard" from "the shard +// holder is reachable but the RPC failed". 
+var errEcShardMissing = fmt.Errorf("ec shard missing") + func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error { _, _, intervals, err := ecVolume.LocateEcShardNeedle(needleId, ecVolume.Version) @@ -45,15 +52,22 @@ func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume *erasure_cod return erasure_coding.NotFoundError } - shardId, _ := intervals[0].ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize) + primaryShardId, _ := intervals[0].ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize) - err = s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId) - if err == nil { - return nil + // Normal path: delete on exactly one node holding the primary data shard. + err = s.doDeleteNeedleFromRemoteEcShardServers(primaryShardId, ecVolume, needleId) + if err == nil || !errors.Is(err, errEcShardMissing) { + return err } - for shardId = erasure_coding.DataShardsCount; shardId < erasure_coding.TotalShardsCount; shardId++ { - if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId); parityDeletionError == nil { + // Primary data shard has no live holders; fall back to any other shard + // (remaining data shards first, then parity) so a shard holder can still + // journal the delete in its .ecj and the delete is durable. 
+ for shardId := erasure_coding.ShardId(0); shardId < erasure_coding.TotalShardsCount; shardId++ { + if shardId == primaryShardId { + continue + } + if fallbackErr := s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId); fallbackErr == nil { return nil } } @@ -68,18 +82,25 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(shardId erasure_coding.Sh sourceDataNodes, hasShardLocations := ecVolume.ShardLocations[shardId] ecVolume.ShardLocationsLock.RUnlock() - if !hasShardLocations { - return fmt.Errorf("ec shard %d.%d not located", ecVolume.VolumeId, shardId) + if !hasShardLocations || len(sourceDataNodes) == 0 { + return fmt.Errorf("ec shard %d.%d: %w", ecVolume.VolumeId, shardId, errEcShardMissing) } + // Apply the delete on exactly one node. Replicas of the same shard all + // hold identical .ecx copies, so tombstoning more than one would double + // the delete count in heartbeat reporting. Try nodes in order and stop + // at the first success; only return error if every replica failed. 
+ var lastErr error for _, sourceDataNode := range sourceDataNodes { glog.V(4).Infof("delete from remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNode) - if err := s.doDeleteNeedleFromRemoteEcShard(sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId); err != nil { - return err + if err := s.doDeleteNeedleFromRemoteEcShard(sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId); err == nil { + return nil + } else { + lastErr = err } } - return nil + return lastErr } diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go index 827e2e801..30c174941 100644 --- a/weed/topology/topology_ec.go +++ b/weed/topology/topology_ec.go @@ -26,6 +26,8 @@ func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInf DiskType: shardInfo.DiskType, DiskId: shardInfo.DiskId, ExpireAtSec: shardInfo.ExpireAtSec, + FileCount: shardInfo.FileCount, + DeleteCount: shardInfo.DeleteCount, } shards = append(shards, ecVolumeInfo) @@ -53,6 +55,8 @@ func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards DiskType: shardInfo.DiskType, DiskId: shardInfo.DiskId, ExpireAtSec: shardInfo.ExpireAtSec, + FileCount: shardInfo.FileCount, + DeleteCount: shardInfo.DeleteCount, } newShards = append(newShards, ecVolumeInfo) @@ -66,6 +70,8 @@ func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards DiskType: shardInfo.DiskType, DiskId: shardInfo.DiskId, ExpireAtSec: shardInfo.ExpireAtSec, + FileCount: shardInfo.FileCount, + DeleteCount: shardInfo.DeleteCount, } deletedShards = append(deletedShards, ecVolumeInfo)