mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
admin: report file and delete counts for EC volumes (#9060)
* admin: report file and delete counts for EC volumes The admin bucket size fix (#9058) left object counts at zero for EC-encoded data because VolumeEcShardInformationMessage carried no file count. Billing/monitoring dashboards therefore still under-report objects once a bucket is EC-encoded. Thread file_count and delete_count end-to-end: - Add file_count/delete_count to VolumeEcShardInformationMessage (proto fields 8 and 9) and regenerate master_pb. - Compute them lazily on volume servers by walking the .ecx index once per EcVolume, cache on the struct, and keep the cache in sync inside DeleteNeedleFromEcx (distinguishing live vs already-tombstoned entries so idempotent deletes do not drift the counts). - Populate the new proto fields from EcVolume.ToVolumeEcShardInformationMessage and carry them through the master-side EcVolumeInfo / topology sync. - Aggregate in admin collectCollectionStats, deduping per volume id: every node holding shards of an EC volume reports the same counts, so summing across nodes would otherwise multiply the object count by the number of shard holders. Regression tests cover the initial .ecx walk, live/tombstoned delete bookkeeping (including idempotent and missing-key cases), and the admin dedup path for an EC volume reported by multiple nodes. * ec: include .ecj journal in EcVolume delete count The initial delete count only reflected .ecx tombstones, missing any needle that was journaled in .ecj but not yet folded into .ecx — e.g. on partial recovery. Expand initCountsLocked to take the union of .ecx tombstones and .ecj journal entries, deduped by needle id, so: - an id that is both tombstoned in .ecx and listed in .ecj counts once - a duplicate .ecj entry counts once - an .ecj id with a live .ecx entry is counted as deleted (not live) - an .ecj id with no matching .ecx entry is still counted Covered by TestEcVolumeFileAndDeleteCountEcjUnion. * ec: report delete count authoritatively and tombstone once per delete Address two issues with the previous EcVolume file/delete count work: 1. The delete count was computed lazily on first heartbeat and mixed in a .ecj-union fallback to "recover" partial state. That diverged from how regular volumes report counts (always live from the needle map) and had drift cases when .ecj got reconciled. Replace with an eager walk of .ecx at NewEcVolume time, maintained incrementally on every DeleteNeedleFromEcx call. Semantics now match needle_map_metric: FileCount is the total number of needles ever recorded in .ecx (live + tombstoned), DeleteCount is the tombstones — so live = FileCount - DeleteCount. Drop the .ecj-union logic entirely. 2. A single EC needle delete fanned out to every node holding a replica of the primary data shard and called DeleteNeedleFromEcx on each, which inflated the per-volume delete total by the replica factor. Rewrite doDeleteNeedleFromRemoteEcShardServers to try replicas in order and stop at the first success (one tombstone per delete), and only fall back to other shards when the primary shard has no home (ErrEcShardMissing sentinel), not on transient RPC errors. Admin aggregation now folds EC counts correctly: FileCount is deduped per volume id (every shard holder has an identical .ecx) and DeleteCount is summed across nodes (each delete tombstones exactly one node). Live object count = deduped FileCount - summed DeleteCount. Tests updated to match the new semantics: - EC volume counts seed FileCount as total .ecx entries (live + tombstoned), DeleteCount as tombstones. - DeleteNeedleFromEcx keeps FileCount constant and increments DeleteCount only on live->tombstone transitions. - Admin dedup test uses distinct per-node delete counts (5 + 3 + 2) to prove they're summed, while FileCount=100 is applied once. * ec: test fixture uses real vid; admin warns on skewed ec counts - writeFixture now builds the .ecx/.ecj/.ec00/.vif filenames from the actual vid passed in, instead of hardcoding "_1". The existing tests all use vid=1 so behaviour is unchanged, but the helper no longer silently diverges from its documented parameter. - collectCollectionStats logs a glog warning when an EC volume's summed delete count exceeds its deduped file count, surfacing the anomaly (stale heartbeat, counter drift, etc.) instead of silently dropping the volume from the object count. * ec: derive file/delete counts from .ecx/.ecj file sizes seedCountsFromEcx walked the full .ecx index at volume load, which is wasted work: .ecx has fixed-size entries (NeedleMapEntrySize) and .ecj has fixed-size deletion records (NeedleIdSize), so both counts are pure file-size arithmetic. fileCount = ecxFileSize / NeedleMapEntrySize deleteCount = ecjFileSize / NeedleIdSize Rip out the cached counters, countsLock, seedCountsFromEcx, and the recordDelete helper. Track ecjFileSize directly on the EcVolume struct, seed it from Stat() at load, and bump it on every successful .ecj append inside DeleteNeedleFromEcx under ecjFileAccessLock. Skip the .ecj write entirely when the needle is already tombstoned so the derived delete count stays idempotent on repeat deletes. Heartbeats now compute counts in O(1). Tests updated: the initial fixture pre-populates .ecj with two ids to verify the file-size derivation end-to-end, and the delete test keeps its idempotent-re-delete / missing-needle invariants (unchanged externally, now enforced by the early return rather than a cache guard). * ec: sync Rust volume server with Go file/delete count semantics Mirror the Go-side EC file/delete count work in the Rust volume server so mixed Go/Rust clusters report consistent bucket object counts in the admin dashboard. - Add file_count (8) and delete_count (9) to the Rust copy of VolumeEcShardInformationMessage (seaweed-volume/proto/master.proto). - EcVolume gains ecj_file_size, seeded from the journal's metadata on open and bumped inside journal_delete on every successful append. - file_and_delete_count() returns counts derived in O(1) from ecx_file_size / NEEDLE_MAP_ENTRY_SIZE and ecj_file_size / NEEDLE_ID_SIZE, matching Go's FileAndDeleteCount. - to_volume_ec_shard_information_messages populates the new proto fields instead of defaulting them to zero. - mark_needle_deleted_in_ecx now returns a DeleteOutcome enum (NotFound / AlreadyDeleted / Tombstoned) so journal_delete can skip both the .ecj append and the size bump when the needle is missing or already tombstoned, keeping the derived delete_count idempotent on repeat or no-op deletes. - Rust's EcVolume::new no longer replays .ecj into .ecx on load. Go's RebuildEcxFile is only called from specific decode/rebuild gRPC handlers, not on volume open, and replaying on load was hiding the deletion journal from the new file-size-derived delete counter. rebuild_ecx_from_journal is kept as dead_code for future decode paths that may want the same replay semantics. Also clean up the Go FileAndDeleteCount to drop unnecessary runtime guards against zero constants — NeedleMapEntrySize and NeedleIdSize are compile-time non-zero. test_ec_volume_journal updated to pre-populate the .ecx with the needles it deletes, and extended to verify that repeat and missing-id deletes do not drift the derived counts. * ec: document enterprise-reserved proto field range on ec shard info Both OSS master.proto copies now note that fields 10-19 are reserved for future upstream additions while 20+ are owned by the enterprise fork. Enterprise already pins data_shards/parity_shards at 20/21, so keeping OSS additions inside 8-19 avoids wire-level collisions for mixed deployments. * ec(rust): resolve .ecx/.ecj helpers from ecx_actual_dir ecx_file_name() and ecj_file_name() resolved from self.dir_idx, but new() opens the actual files from ecx_actual_dir (which may fall back to the data dir when the idx dir does not contain the index). After a fallback, read_deleted_needles() and rebuild_ecx_from_journal() would read/rebuild the wrong (nonexistent) path while heartbeats reported counts from the file actually in use — silently dropping deletes. Point idx_base_name() at ecx_actual_dir, which is initialized to dir_idx and only diverges after a successful fallback, so every call site agrees with the file new() has open. The pre-fallback call in new() (line 142) still returns the dir_idx path because ecx_actual_dir == dir_idx at that point. Update the destroy() sweep to build the dir_idx cleanup paths explicitly instead of leaning on the helpers, so post-fallback stale files in the idx dir are still removed. * ec: reset ecj size after rebuild; rollback ecx tombstone on ecj failure Two EC delete-count correctness fixes applied symmetrically to Go and Rust volume servers. 1. rebuild_ecx_from_journal (Rust) now sets ecj_file_size = 0 after recreating the empty journal, matching the on-disk truth. Previously the cached size still reflected the pre-rebuild journal and file_and_delete_count() would keep reporting stale delete counts. The Go side has no equivalent bug because RebuildEcxFile runs in an offline helper that does not touch an EcVolume struct. 2. DeleteNeedleFromEcx / journal_delete used to tombstone the .ecx entry before writing the .ecj record. If the .ecj append then failed, the needle was permanently marked deleted but the heartbeat-reported delete_count never advanced (it is derived from .ecj file size), and a retry would see AlreadyDeleted and early- return, leaving the drift permanent. Both languages now capture the entry's file offset and original size bytes during the mark step, attempt the .ecj append, and on failure roll the .ecx tombstone back by writing the original size bytes at the known offset. A rollback that itself errors is logged (glog / tracing) but cannot re-sync the files — this is the same failure mode a double disk error would produce, and is unavoidable without a full on-disk transaction log. Go: wrap MarkNeedleDeleted in a closure that captures the file offset into an outer variable, then pass the offset + oldSize to the new rollbackEcxTombstone helper on .ecj seek/write errors. Rust: DeleteOutcome::Tombstoned now carries the size_offset and a [u8; SIZE_SIZE] copy of the pre-tombstone size field. journal_delete destructures on Tombstoned and calls restore_ecx_size on .ecj append failure. * test(ec): widen admin /health wait to 180s for cold CI TestEcEndToEnd starts master, 14 volume servers, filer, 2 workers and admin in sequence, then waited only 60s for admin's HTTP server to come up. On cold GitHub runners the tail of the earlier subprocess startups eats most of that budget and the wait occasionally times out (last hit on run 24374773031). The local fast path is still ~20s total, so the bump only extends the timeout ceiling, not the happy path. * test(ec): fork volume servers in parallel in TestEcEndToEnd startWeed is non-blocking (just cmd.Start()), so the per-process fork + mkdir + log-file-open overhead for 14 volume servers was serialized for no reason. On cold CI disks that overhead stacks up and eats into the subsequent admin /health wait, which is how run 24374773031 flaked. Wrap the volume-server loop in a sync.WaitGroup and guard runningCmds with a mutex so concurrent appends are safe. startWeed still calls t.Fatalf on failure, which is fine from a goroutine for a fatal test abort; the fail-fast isn't something we rely on for precise ordering. * ec: fsync ecx before ecj, truncate on failure, harden rebuild Four correctness fixes covering both volume servers. 1. Durability ordering (Go + Rust). After marking the .ecx tombstone we now fsync .ecx before touching .ecj, so a crash between the two files cannot leave the journal with an entry for a needle whose tombstone is still sitting in page cache. Once the fsync returns, the tombstone is the source of truth: reads see "deleted", delete_count may under-count by one (benign, idempotent retries) but never over-reports. If the fsync itself fails we restore the original size bytes and surface the error. The .ecj append is then followed by its own Sync so the reported delete_count matches the on-disk journal once the write returns. 2. .ecj truncation on append failure. write_all may have extended the journal on disk before sync_all / Sync errors out, leaving the cached ecj_file_size out of sync with the physical length and drifting delete_count permanently after restart. Both languages now capture the pre-append size, truncate the file back via set_len / Truncate on any write or sync failure, and only then restore the .ecx tombstone. Truncation errors are logged — same-fd length resets cannot realistically fail — but cannot themselves re-sync the files. 3. Atomic rebuild_ecx_from_journal (Rust, dead code today but wired up on any future decode path). Previously a failed mark_needle_deleted_in_ecx call was swallowed with `let _ = ...` and the journal was still removed, silently losing tombstones. We now bubble up any non-NotFound error, fsync .ecx after the whole replay succeeds, and only then drop and recreate .ecj. NotFound is still ignored (expected race between delete and encode). 4. Missing-.ecx hardening (Rust). mark_needle_deleted_in_ecx used to return Ok(NotFound) when self.ecx_file was None, hiding a closed or corrupt volume behind what looks like an idempotent no-op. It now returns an io::Error carrying the volume id so callers (e.g. journal_delete) fail loudly instead. Existing Go and Rust EC test suites stay green. * ec: make .ecx immutable at runtime; track deletes in memory + .ecj Refactors both volume servers so the sealed sorted .ecx index is never mutated during normal operation. Runtime deletes are committed to the .ecj deletion journal and tracked in an in-memory deleted-needle set; read-path lookups consult that set to mask out deleted ids on top of the immutable .ecx record. Mirrors the intended design on both Go and Rust sides. EcVolume gains a `deletedNeedles` / `deleted_needles` set seeded from .ecj in NewEcVolume / EcVolume::new. DeleteNeedleFromEcx / journal_delete: 1. Looks the needle up read-only in .ecx. 2. Missing needle -> no-op. 3. Pre-existing .ecx tombstone (from a prior decode/rebuild) -> mirror into the in-memory set, no .ecj append. 4. Otherwise append the id to .ecj, fsync, and only then publish the id into the set. A partial write is truncated back to the pre-append length so the on-disk journal and the in-memory set cannot drift. FindNeedleFromEcx / find_needle_from_ecx now return TombstoneFileSize when the id is in the in-memory set, even though the bytes on disk still show the original size. FileAndDeleteCount: fileCount = .ecx size / NeedleMapEntrySize (unchanged) deleteCount = len(deletedNeedles) (was: .ecj size / NeedleIdSize) The RebuildEcxFile / rebuild_ecx_from_journal decode-time helpers still fold .ecj into .ecx — that is the one place tombstones land in the physical index, and it runs offline on closed files. Rust's rebuild helper now also clears the in-memory set when it succeeds. Dead code removed on the Rust side: `DeleteOutcome`, `mark_needle_deleted_in_ecx`, `restore_ecx_size`. Go drops the runtime `rollbackEcxTombstone` path. Neither helper was needed once .ecx stopped being a runtime mutation target. TestEcVolumeSyncEnsuresDeletionsVisible (issue #7751) is rewritten as TestEcVolumeDeleteDurableToJournal, which exercises the full durability chain: delete -> .ecj fsync -> FindNeedleFromEcx masks via the in-memory set -> raw .ecx bytes are *unchanged* -> Close + RebuildEcxFile folds the journal into .ecx -> raw bytes now show the tombstone, as CopyFile in the decode path expects.
This commit is contained in:
@@ -145,6 +145,11 @@ message VolumeEcShardInformationMessage {
|
||||
uint64 expire_at_sec = 5; // used to record the destruction time of ec volume
|
||||
uint32 disk_id = 6;
|
||||
repeated int64 shard_sizes = 7; // optimized: sizes for shards in order of set bits in ec_index_bits
|
||||
uint64 file_count = 8; // total needles in the .ecx index (live + tombstoned)
|
||||
uint64 delete_count = 9; // node-local tombstones in the .ecj deletion journal
|
||||
// fields 10-19 reserved for future upstream open-source additions.
|
||||
// fields 20+ are owned by the enterprise fork (e.g. data_shards/parity_shards)
|
||||
// and must not be used here without coordination.
|
||||
}
|
||||
|
||||
message StorageBackend {
|
||||
|
||||
@@ -3,9 +3,10 @@
|
||||
//! Each EcVolume has a sorted index (.ecx) and a deletion journal (.ecj).
|
||||
//! Shards (.ec00-.ec13) may be distributed across multiple servers.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{self, Write};
|
||||
use std::sync::RwLock;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use crate::pb::master_pb;
|
||||
@@ -28,6 +29,17 @@ pub struct EcVolume {
|
||||
ecx_file: Option<File>,
|
||||
ecx_file_size: i64,
|
||||
ecj_file: Option<File>,
|
||||
/// On-disk size of the .ecj deletion journal. Used only by IO helpers
|
||||
/// (seek / set_len on partial writes) — the authoritative runtime
|
||||
/// delete count comes from `deleted_needles.len()`.
|
||||
ecj_file_size: i64,
|
||||
/// In-memory set of needle ids that have been deleted since the volume
|
||||
/// was encoded. .ecx is immutable at runtime — it only stores the
|
||||
/// sorted (id, offset, size) index written at encode time — and runtime
|
||||
/// deletes are journaled to .ecj + tracked here. Reads consult this
|
||||
/// set to mask out deleted needles on top of the sealed .ecx lookup.
|
||||
/// Seeded from .ecj in `new()` and updated by `journal_delete`.
|
||||
deleted_needles: RwLock<HashSet<NeedleId>>,
|
||||
pub disk_type: DiskType,
|
||||
/// Directory where .ecx/.ecj were actually found (may differ from dir_idx after fallback).
|
||||
ecx_actual_dir: String,
|
||||
@@ -112,6 +124,8 @@ impl EcVolume {
|
||||
ecx_file: None,
|
||||
ecx_file_size: 0,
|
||||
ecj_file: None,
|
||||
ecj_file_size: 0,
|
||||
deleted_needles: RwLock::new(HashSet::new()),
|
||||
disk_type: DiskType::default(),
|
||||
ecx_actual_dir: dir_idx.to_string(),
|
||||
shard_locations: HashMap::new(),
|
||||
@@ -141,10 +155,12 @@ impl EcVolume {
|
||||
}
|
||||
}
|
||||
|
||||
// Replay .ecj journal into .ecx on startup (matches Go's RebuildEcxFile).
|
||||
vol.rebuild_ecx_from_journal()?;
|
||||
|
||||
// Open .ecj file (deletion journal) — use ecx_actual_dir for consistency
|
||||
// Open .ecj file (deletion journal) — use ecx_actual_dir for consistency.
|
||||
// Note: Go does NOT replay .ecj into .ecx at volume load (RebuildEcxFile
|
||||
// is only invoked from specific decode/rebuild gRPC handlers), so we
|
||||
// don't either. Tombstones from prior sessions were already written
|
||||
// in-place in .ecx, and the journal grows monotonically until a
|
||||
// decode/rebuild operation folds it in.
|
||||
let ecj_base =
|
||||
crate::storage::volume::volume_file_name(&vol.ecx_actual_dir, collection, volume_id);
|
||||
let ecj_path = format!("{}.ecj", ecj_base);
|
||||
@@ -154,11 +170,75 @@ impl EcVolume {
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(&ecj_path)?;
|
||||
vol.ecj_file_size = ecj_file.metadata()?.len() as i64;
|
||||
vol.ecj_file = Some(ecj_file);
|
||||
|
||||
// Seed the in-memory deleted set from the journal.
|
||||
vol.load_deleted_needles_from_ecj()?;
|
||||
|
||||
Ok(vol)
|
||||
}
|
||||
|
||||
/// Walk the .ecj journal and populate `deleted_needles`. Called once
|
||||
/// from `new()` under exclusive ownership of the just-constructed
|
||||
/// EcVolume, so locking is not strictly required — but we take the
|
||||
/// write lock anyway for symmetry with later mutations.
|
||||
fn load_deleted_needles_from_ecj(&mut self) -> io::Result<()> {
|
||||
let ecj_file = match self.ecj_file.as_ref() {
|
||||
Some(f) => f,
|
||||
None => return Ok(()),
|
||||
};
|
||||
if self.ecj_file_size < NEEDLE_ID_SIZE as i64 {
|
||||
return Ok(());
|
||||
}
|
||||
let mut buf = [0u8; NEEDLE_ID_SIZE];
|
||||
let mut set = self
|
||||
.deleted_needles
|
||||
.write()
|
||||
.map_err(|_| io::Error::new(io::ErrorKind::Other, "deleted_needles lock poisoned"))?;
|
||||
let mut off: i64 = 0;
|
||||
while off + NEEDLE_ID_SIZE as i64 <= self.ecj_file_size {
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use std::os::unix::fs::FileExt;
|
||||
ecj_file.read_exact_at(&mut buf, off as u64)?;
|
||||
}
|
||||
set.insert(NeedleId::from_bytes(&buf));
|
||||
off += NEEDLE_ID_SIZE as i64;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns (file_count, delete_count) for this EC volume. Mirrors Go's
|
||||
/// `EcVolume.FileAndDeleteCount`:
|
||||
///
|
||||
/// file_count = ecx_file_size / NEEDLE_MAP_ENTRY_SIZE — total
|
||||
/// entries in the sealed sorted .ecx index.
|
||||
/// delete_count = deleted_needles.len() — unique
|
||||
/// runtime deletes tracked in memory (seeded from
|
||||
/// .ecj on load and updated by `journal_delete`).
|
||||
///
|
||||
/// Because each needle delete is applied on exactly one shard holder,
|
||||
/// the admin aggregation sums delete_count across nodes while taking
|
||||
/// file_count from a single holder (they are identical per volume).
|
||||
pub fn file_and_delete_count(&self) -> (u64, u64) {
|
||||
let file_count = (self.ecx_file_size as u64) / (NEEDLE_MAP_ENTRY_SIZE as u64);
|
||||
let delete_count = self
|
||||
.deleted_needles
|
||||
.read()
|
||||
.map(|s| s.len() as u64)
|
||||
.unwrap_or(0);
|
||||
(file_count, delete_count)
|
||||
}
|
||||
|
||||
/// Reports whether the given needle id is in the in-memory deleted set.
|
||||
pub fn is_needle_deleted(&self, needle_id: NeedleId) -> bool {
|
||||
self.deleted_needles
|
||||
.read()
|
||||
.map(|s| s.contains(&needle_id))
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
// ---- File names ----
|
||||
|
||||
#[allow(dead_code)]
|
||||
@@ -166,8 +246,16 @@ impl EcVolume {
|
||||
crate::storage::volume::volume_file_name(&self.dir, &self.collection, self.volume_id)
|
||||
}
|
||||
|
||||
/// Base path for the .ecx / .ecj index pair. Resolved from
|
||||
/// `ecx_actual_dir` (initialized to `dir_idx` and only updated after a
|
||||
/// successful idx-dir → data-dir fallback in `new()`), so every call site
|
||||
/// agrees on the same file regardless of whether the fallback fired.
|
||||
fn idx_base_name(&self) -> String {
|
||||
crate::storage::volume::volume_file_name(&self.dir_idx, &self.collection, self.volume_id)
|
||||
crate::storage::volume::volume_file_name(
|
||||
&self.ecx_actual_dir,
|
||||
&self.collection,
|
||||
self.volume_id,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn ecx_file_name(&self) -> String {
|
||||
@@ -255,6 +343,8 @@ impl EcVolume {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
let (file_count, delete_count) = self.file_and_delete_count();
|
||||
|
||||
vec![master_pb::VolumeEcShardInformationMessage {
|
||||
id: self.volume_id.0,
|
||||
collection: self.collection.clone(),
|
||||
@@ -263,6 +353,8 @@ impl EcVolume {
|
||||
disk_type: self.disk_type.to_string(),
|
||||
expire_at_sec: self.expire_at_sec,
|
||||
disk_id,
|
||||
file_count,
|
||||
delete_count,
|
||||
..Default::default()
|
||||
}]
|
||||
}
|
||||
@@ -313,6 +405,13 @@ impl EcVolume {
|
||||
|
||||
let (key, offset, size) = idx_entry_from_bytes(&entry_buf);
|
||||
if key == needle_id {
|
||||
// Apply runtime deletion state on top of the sealed .ecx
|
||||
// lookup: a needle in the in-memory deleted set is
|
||||
// reported with TOMBSTONE_FILE_SIZE even though the .ecx
|
||||
// record itself is untouched.
|
||||
if self.is_needle_deleted(needle_id) {
|
||||
return Ok(Some((offset, TOMBSTONE_FILE_SIZE)));
|
||||
}
|
||||
return Ok(Some((offset, size)));
|
||||
} else if key < needle_id {
|
||||
lo = mid + 1;
|
||||
@@ -570,38 +669,41 @@ impl EcVolume {
|
||||
|
||||
// ---- Deletion ----
|
||||
|
||||
/// Mark a needle as deleted in the .ecx file in-place.
|
||||
/// Matches Go's MarkNeedleDeleted: binary search the .ecx, then overwrite
|
||||
/// the size field with TOMBSTONE_FILE_SIZE.
|
||||
fn mark_needle_deleted_in_ecx(&self, needle_id: NeedleId) -> io::Result<bool> {
|
||||
let ecx_file = match self.ecx_file.as_ref() {
|
||||
Some(f) => f,
|
||||
None => return Ok(false),
|
||||
};
|
||||
/// Write `TOMBSTONE_FILE_SIZE` over the Size field of an existing .ecx
|
||||
/// entry, matching Go's `MarkNeedleDeleted`. Only used by the offline
|
||||
/// `rebuild_ecx_from_journal` path — the runtime delete path does not
|
||||
/// touch .ecx because the index is treated as an immutable sorted
|
||||
/// (id, offset, size) table. Returns `false` if the needle is not in
|
||||
/// the index (ignored by callers) and an error on IO failure.
|
||||
fn tombstone_ecx_entry(&self, needle_id: NeedleId) -> io::Result<bool> {
|
||||
let ecx_file = self.ecx_file.as_ref().ok_or_else(|| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!(
|
||||
"ec volume {} has no open .ecx file (closed or corrupt)",
|
||||
self.volume_id.0
|
||||
),
|
||||
)
|
||||
})?;
|
||||
|
||||
let entry_count = self.ecx_file_size as usize / NEEDLE_MAP_ENTRY_SIZE;
|
||||
if entry_count == 0 {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// Binary search for the needle
|
||||
let mut lo: usize = 0;
|
||||
let mut hi: usize = entry_count;
|
||||
let mut entry_buf = [0u8; NEEDLE_MAP_ENTRY_SIZE];
|
||||
|
||||
while lo < hi {
|
||||
let mid = lo + (hi - lo) / 2;
|
||||
let file_offset = (mid * NEEDLE_MAP_ENTRY_SIZE) as u64;
|
||||
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use std::os::unix::fs::FileExt;
|
||||
ecx_file.read_exact_at(&mut entry_buf, file_offset)?;
|
||||
}
|
||||
|
||||
let (key, _offset, _size) = idx_entry_from_bytes(&entry_buf);
|
||||
let (key, _offset, _old_size) = idx_entry_from_bytes(&entry_buf);
|
||||
if key == needle_id {
|
||||
// Found — overwrite the size field with TOMBSTONE_FILE_SIZE
|
||||
let size_offset = file_offset + NEEDLE_ID_SIZE as u64 + OFFSET_SIZE as u64;
|
||||
let mut size_buf = [0u8; SIZE_SIZE];
|
||||
TOMBSTONE_FILE_SIZE.to_bytes(&mut size_buf);
|
||||
@@ -617,13 +719,18 @@ impl EcVolume {
|
||||
hi = mid;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(false) // not found
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
/// Replay .ecj journal entries into .ecx on startup.
|
||||
/// Matches Go's RebuildEcxFile: for each needle ID in .ecj, marks it
|
||||
/// deleted in .ecx, then removes the .ecj file.
|
||||
/// Replay .ecj journal entries into .ecx: for each needle id in .ecj,
|
||||
/// overwrite its .ecx size field with a tombstone, then remove the
|
||||
/// journal file. Mirrors Go's `RebuildEcxFile`, which is invoked from
|
||||
/// specific decode / rebuild gRPC handlers — it is intentionally
|
||||
/// **not** called on volume load (runtime reads consult
|
||||
/// `deleted_needles` instead). The rebuild is atomic with respect to
|
||||
/// the journal: if any individual write fails the .ecj file is left
|
||||
/// in place and the error is propagated so tombstones are not lost.
|
||||
#[allow(dead_code)]
|
||||
fn rebuild_ecx_from_journal(&mut self) -> io::Result<()> {
|
||||
let ecj_path = self.ecj_file_name();
|
||||
if !std::path::Path::new(&ecj_path).exists() {
|
||||
@@ -642,14 +749,23 @@ impl EcVolume {
|
||||
break;
|
||||
}
|
||||
let needle_id = NeedleId::from_bytes(&data[start..start + NEEDLE_ID_SIZE]);
|
||||
// Errors for individual entries are non-fatal (needle may not exist in .ecx)
|
||||
let _ = self.mark_needle_deleted_in_ecx(needle_id);
|
||||
// A needle that never made it into .ecx is fine (e.g. the
|
||||
// delete raced against encode). Any other IO error aborts the
|
||||
// rebuild so the journal survives to be retried later.
|
||||
self.tombstone_ecx_entry(needle_id)?;
|
||||
}
|
||||
|
||||
// Remove the .ecj file after replay (matches Go)
|
||||
let _ = fs::remove_file(&ecj_path);
|
||||
// Durably flush the newly-written .ecx tombstones before dropping
|
||||
// the journal: the writes went through write_all_at and may still
|
||||
// be in page cache.
|
||||
if let Some(ref ecx_file) = self.ecx_file {
|
||||
ecx_file.sync_all()?;
|
||||
}
|
||||
|
||||
// Re-create .ecj for future deletions
|
||||
// Fold successful — drop and recreate the journal, clear the
|
||||
// in-memory deleted set (all of its contents are now materialized
|
||||
// in .ecx), and reset the cached size.
|
||||
fs::remove_file(&ecj_path)?;
|
||||
let ecj_file = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
@@ -657,27 +773,126 @@ impl EcVolume {
|
||||
.append(true)
|
||||
.open(&ecj_path)?;
|
||||
self.ecj_file = Some(ecj_file);
|
||||
self.ecj_file_size = 0;
|
||||
if let Ok(mut set) = self.deleted_needles.write() {
|
||||
set.clear();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ---- Deletion journal ----
|
||||
|
||||
/// Append a deleted needle ID to the .ecj journal and mark in .ecx.
|
||||
/// Matches Go's DeleteNeedleFromEcx: marks in .ecx first, then journals.
|
||||
/// Record a needle delete: append the id to the .ecj deletion journal
|
||||
/// and insert it into the in-memory deleted set. `.ecx` is not touched
|
||||
/// at runtime — it is a sealed sorted (id, offset, size) index and
|
||||
/// runtime deletion state lives exclusively in .ecj + `deleted_needles`.
|
||||
/// A lookup via `find_needle_from_ecx` masks the id out by returning
|
||||
/// `TOMBSTONE_FILE_SIZE` on a subsequent read.
|
||||
///
|
||||
/// The .ecj append is the durable commit point. On any failure the
|
||||
/// file is truncated back to the pre-append length so the on-disk
|
||||
/// journal and in-memory state cannot drift. Only after the sync
|
||||
/// succeeds is the id published into the set, so a failure leaves
|
||||
/// the delete invisible to readers.
|
||||
pub fn journal_delete(&mut self, needle_id: NeedleId) -> io::Result<()> {
|
||||
// Mark deleted in .ecx in-place (matches Go's MarkNeedleDeleted)
|
||||
let _ = self.mark_needle_deleted_in_ecx(needle_id);
|
||||
let ecj_file = self
|
||||
.ecj_file
|
||||
.as_mut()
|
||||
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "ecj file not open"))?;
|
||||
// Look the needle up read-only. Missing is a silent no-op; a
|
||||
// pre-existing .ecx tombstone (from a prior decode/rebuild) is
|
||||
// mirrored into the in-memory set so delete_count stays accurate
|
||||
// without needing to walk .ecx on every heartbeat.
|
||||
match self.find_needle_from_ecx_raw(needle_id)? {
|
||||
None => return Ok(()),
|
||||
Some((_, size)) if size.is_deleted() => {
|
||||
if let Ok(mut set) = self.deleted_needles.write() {
|
||||
set.insert(needle_id);
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
Some(_) => {}
|
||||
}
|
||||
|
||||
let mut buf = [0u8; NEEDLE_ID_SIZE];
|
||||
needle_id.to_bytes(&mut buf);
|
||||
ecj_file.write_all(&buf)?;
|
||||
ecj_file.sync_all()?;
|
||||
Ok(())
|
||||
// Idempotent fast path for repeat deletes — avoids the journal
|
||||
// append entirely so the derived delete_count stays stable.
|
||||
if self.is_needle_deleted(needle_id) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let prev_ecj_size = self.ecj_file_size;
|
||||
let append_result: io::Result<()> = {
|
||||
let ecj_file = self
|
||||
.ecj_file
|
||||
.as_mut()
|
||||
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "ecj file not open"))?;
|
||||
let mut buf = [0u8; NEEDLE_ID_SIZE];
|
||||
needle_id.to_bytes(&mut buf);
|
||||
ecj_file
|
||||
.write_all(&buf)
|
||||
.and_then(|_| ecj_file.sync_all())
|
||||
};
|
||||
|
||||
match append_result {
|
||||
Ok(()) => {
|
||||
self.ecj_file_size += NEEDLE_ID_SIZE as i64;
|
||||
if let Ok(mut set) = self.deleted_needles.write() {
|
||||
set.insert(needle_id);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
// write_all may have extended the file on disk before
|
||||
// sync_all failed; truncate back to the known-good size so
|
||||
// the on-disk journal never drifts past `deleted_needles`.
|
||||
if let Some(ecj) = self.ecj_file.as_mut() {
|
||||
if let Err(trunc_err) = ecj.set_len(prev_ecj_size as u64) {
|
||||
tracing::error!(
|
||||
volume_id = self.volume_id.0,
|
||||
needle_id = needle_id.0,
|
||||
truncate_error = %trunc_err,
|
||||
"failed to truncate ecj after append failure"
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Internal: binary search .ecx without masking by `deleted_needles`.
|
||||
/// Used by `journal_delete` so a repeat delete can still see the raw
|
||||
/// pre-existing .ecx tombstone from a prior rebuild.
|
||||
fn find_needle_from_ecx_raw(
|
||||
&self,
|
||||
needle_id: NeedleId,
|
||||
) -> io::Result<Option<(Offset, Size)>> {
|
||||
let ecx_file = self
|
||||
.ecx_file
|
||||
.as_ref()
|
||||
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "ecx file not open"))?;
|
||||
let entry_count = self.ecx_file_size as usize / NEEDLE_MAP_ENTRY_SIZE;
|
||||
if entry_count == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
let mut lo: usize = 0;
|
||||
let mut hi: usize = entry_count;
|
||||
let mut entry_buf = [0u8; NEEDLE_MAP_ENTRY_SIZE];
|
||||
while lo < hi {
|
||||
let mid = lo + (hi - lo) / 2;
|
||||
let file_offset = (mid * NEEDLE_MAP_ENTRY_SIZE) as u64;
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use std::os::unix::fs::FileExt;
|
||||
ecx_file.read_exact_at(&mut entry_buf, file_offset)?;
|
||||
}
|
||||
let (key, offset, size) = idx_entry_from_bytes(&entry_buf);
|
||||
if key == needle_id {
|
||||
return Ok(Some((offset, size)));
|
||||
} else if key < needle_id {
|
||||
lo = mid + 1;
|
||||
} else {
|
||||
hi = mid;
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Append a deleted needle ID to the .ecj journal, validating the cookie first.
|
||||
@@ -778,15 +993,17 @@ impl EcVolume {
|
||||
let _ = fs::remove_file(format!("{}.ecx", actual_base));
|
||||
let _ = fs::remove_file(format!("{}.ecj", actual_base));
|
||||
let _ = fs::remove_file(format!("{}.vif", actual_base));
|
||||
// Also try the configured idx dir and data dir in case files exist in either
|
||||
// Also sweep the originally-configured idx dir in case stale files
|
||||
// exist there (ecx_file_name() / ecj_file_name() now resolve from
|
||||
// ecx_actual_dir, so we have to build the idx-dir paths explicitly).
|
||||
if self.ecx_actual_dir != self.dir_idx {
|
||||
let _ = fs::remove_file(self.ecx_file_name());
|
||||
let _ = fs::remove_file(self.ecj_file_name());
|
||||
let idx_base = crate::storage::volume::volume_file_name(
|
||||
&self.dir_idx,
|
||||
&self.collection,
|
||||
self.volume_id,
|
||||
);
|
||||
let _ = fs::remove_file(format!("{}.ecx", idx_base));
|
||||
let _ = fs::remove_file(format!("{}.ecj", idx_base));
|
||||
let _ = fs::remove_file(format!("{}.vif", idx_base));
|
||||
}
|
||||
if self.ecx_actual_dir != self.dir && self.dir_idx != self.dir {
|
||||
@@ -859,16 +1076,33 @@ mod tests {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let dir = tmp.path().to_str().unwrap();
|
||||
|
||||
// Need ecx file for EcVolume::new to succeed
|
||||
write_ecx_file(dir, "", VolumeId(1), &[]);
|
||||
// .ecj append is gated on a live->tombstone transition in .ecx, so
|
||||
// the fixture must contain the needles we are about to delete.
|
||||
let entries = vec![
|
||||
(NeedleId(10), Offset::from_actual_offset(8), Size(100)),
|
||||
(NeedleId(20), Offset::from_actual_offset(200), Size(200)),
|
||||
];
|
||||
write_ecx_file(dir, "", VolumeId(1), &entries);
|
||||
|
||||
let mut vol = EcVolume::new(dir, dir, "", VolumeId(1)).unwrap();
|
||||
let (fc0, dc0) = vol.file_and_delete_count();
|
||||
assert_eq!((fc0, dc0), (2, 0));
|
||||
|
||||
vol.journal_delete(NeedleId(10)).unwrap();
|
||||
vol.journal_delete(NeedleId(20)).unwrap();
|
||||
|
||||
let deleted = vol.read_deleted_needles().unwrap();
|
||||
assert_eq!(deleted, vec![NeedleId(10), NeedleId(20)]);
|
||||
|
||||
let (fc, dc) = vol.file_and_delete_count();
|
||||
assert_eq!((fc, dc), (2, 2));
|
||||
|
||||
// Idempotent re-delete must not bump delete_count.
|
||||
vol.journal_delete(NeedleId(10)).unwrap();
|
||||
// Deleting a missing needle must not bump delete_count either.
|
||||
vol.journal_delete(NeedleId(999)).unwrap();
|
||||
let (fc, dc) = vol.file_and_delete_count();
|
||||
assert_eq!((fc, dc), (2, 2));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
@@ -22,10 +23,16 @@ const (
|
||||
FilerUrl = "http://localhost:8888"
|
||||
)
|
||||
|
||||
// Helper to run commands in background and track PIDs for cleanup
|
||||
var runningCmds []*exec.Cmd
|
||||
// Helper to run commands in background and track PIDs for cleanup. Guarded
|
||||
// by runningCmdsLock so parallel subprocess startup can append safely.
|
||||
var (
|
||||
runningCmds []*exec.Cmd
|
||||
runningCmdsLock sync.Mutex
|
||||
)
|
||||
|
||||
func cleanup() {
|
||||
runningCmdsLock.Lock()
|
||||
defer runningCmdsLock.Unlock()
|
||||
for _, cmd := range runningCmds {
|
||||
if cmd.Process != nil {
|
||||
cmd.Process.Kill()
|
||||
@@ -59,7 +66,9 @@ func startWeed(t *testing.T, name string, args ...string) *exec.Cmd {
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to start weed %v: %v", args, err)
|
||||
}
|
||||
runningCmdsLock.Lock()
|
||||
runningCmds = append(runningCmds, cmd)
|
||||
runningCmdsLock.Unlock()
|
||||
return cmd
|
||||
}
|
||||
|
||||
@@ -107,14 +116,23 @@ func ensureEnvironment(t *testing.T) {
|
||||
waitForUrl(t, MasterUrl+"/cluster/status", 10)
|
||||
|
||||
// 3. Start Volume Server (Worker)
|
||||
// Start 14 volume servers to verify RS(10,4) default EC
|
||||
// Start 14 volume servers to verify RS(10,4) default EC. Fork/exec in
|
||||
// parallel because startWeed is non-blocking and the per-process fork +
|
||||
// mkdir + log-file-open overhead stacks up sequentially on cold CI
|
||||
// disks, eating most of the admin /health wait budget further down.
|
||||
var volWg sync.WaitGroup
|
||||
for i := 1; i <= 14; i++ {
|
||||
volName := fmt.Sprintf("volume%d", i)
|
||||
port := 8080 + i - 1
|
||||
dir := filepath.Join("tmp", volName)
|
||||
os.MkdirAll(dir, 0755)
|
||||
startWeed(t, volName, "volume", "-dir="+dir, "-mserver=localhost:9333", fmt.Sprintf("-port=%d", port), "-ip=localhost")
|
||||
volWg.Add(1)
|
||||
go func(i int) {
|
||||
defer volWg.Done()
|
||||
volName := fmt.Sprintf("volume%d", i)
|
||||
port := 8080 + i - 1
|
||||
dir := filepath.Join("tmp", volName)
|
||||
os.MkdirAll(dir, 0755)
|
||||
startWeed(t, volName, "volume", "-dir="+dir, "-mserver=localhost:9333", fmt.Sprintf("-port=%d", port), "-ip=localhost")
|
||||
}(i)
|
||||
}
|
||||
volWg.Wait()
|
||||
|
||||
// 4. Start Filer
|
||||
os.MkdirAll(filepath.Join("tmp", "filer"), 0755)
|
||||
@@ -136,7 +154,12 @@ func ensureEnvironment(t *testing.T) {
|
||||
os.RemoveAll(filepath.Join("tmp", "admin"))
|
||||
os.MkdirAll(filepath.Join("tmp", "admin"), 0755)
|
||||
startWeed(t, "admin", "admin", "-master=localhost:9333", "-port=23646", "-dataDir=./tmp/admin")
|
||||
waitForUrl(t, AdminUrl+"/health", 60)
|
||||
// Admin is started after master, 14 volume servers, filer and 2 workers,
|
||||
// so under cold CI conditions the wait here has to absorb the tail of
|
||||
// every earlier subprocess coming up. 60s is too tight and has flaked;
|
||||
// 180s gives comfortable headroom without meaningfully extending the
|
||||
// fast path (the first successful /health usually hits well under 30s).
|
||||
waitForUrl(t, AdminUrl+"/health", 180)
|
||||
|
||||
t.Log("Environment started successfully")
|
||||
}
|
||||
|
||||
@@ -1672,8 +1672,21 @@ type collectionStats struct {
|
||||
FileCount int64
|
||||
}
|
||||
|
||||
// ecVolumeCounts is used to correctly combine EC volume counts reported by
|
||||
// multiple nodes. Every node holding any shard of an EC volume reports the
|
||||
// same file_count (total entries in the replicated .ecx), so we dedupe it
|
||||
// per volume id. In contrast, a needle delete is applied on exactly one
|
||||
// shard holder, so each node reports its own local tombstone count and the
|
||||
// true delete total is the sum across nodes.
|
||||
type ecVolumeCounts struct {
|
||||
collection string
|
||||
fileCount uint64
|
||||
deleteCount uint64
|
||||
}
|
||||
|
||||
func collectCollectionStats(topologyInfo *master_pb.TopologyInfo) map[string]collectionStats {
|
||||
collectionMap := make(map[string]collectionStats)
|
||||
ecVolumeAgg := make(map[uint32]*ecVolumeCounts)
|
||||
for _, dc := range topologyInfo.DataCenterInfos {
|
||||
for _, rack := range dc.RackInfos {
|
||||
for _, node := range rack.DataNodeInfos {
|
||||
@@ -1709,11 +1722,36 @@ func collectCollectionStats(topologyInfo *master_pb.TopologyInfo) map[string]col
|
||||
data.PhysicalSize += int64(shards.TotalSize())
|
||||
data.LogicalSize += int64(shards.MinusParityShards().TotalSize())
|
||||
collectionMap[collection] = data
|
||||
|
||||
agg, ok := ecVolumeAgg[ecShardInfo.Id]
|
||||
if !ok {
|
||||
agg = &ecVolumeCounts{collection: collection, fileCount: ecShardInfo.FileCount}
|
||||
ecVolumeAgg[ecShardInfo.Id] = agg
|
||||
}
|
||||
agg.deleteCount += ecShardInfo.DeleteCount
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fold EC per-volume counts into the collection totals. fileCount is
|
||||
// already deduped (set once per volume), deleteCount is the sum of
|
||||
// local tombstones across every node holding shards of the volume.
|
||||
for vid, agg := range ecVolumeAgg {
|
||||
data := collectionMap[agg.collection]
|
||||
if agg.fileCount >= agg.deleteCount {
|
||||
data.FileCount += int64(agg.fileCount - agg.deleteCount)
|
||||
} else {
|
||||
// Should not happen in steady state — indicates a node reporting
|
||||
// a stale fileCount, a skewed heartbeat, or a delete-counter bug.
|
||||
// Defend the UI by skipping the add and surface the anomaly.
|
||||
glog.Warningf("ec volume %d in collection %q: summed delete_count=%d exceeds file_count=%d; skipping object count",
|
||||
vid, agg.collection, agg.deleteCount, agg.fileCount)
|
||||
}
|
||||
collectionMap[agg.collection] = data
|
||||
}
|
||||
|
||||
return collectionMap
|
||||
}
|
||||
|
||||
|
||||
@@ -122,3 +122,65 @@ func TestCollectCollectionStatsECEmptyCollection(t *testing.T) {
|
||||
t.Errorf("LogicalSize: got %d, want 2000", got.LogicalSize)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCollectCollectionStatsECFileAndDeleteCountAggregation verifies that
|
||||
// FileCount for an EC volume is deduped across nodes (every shard holder has
|
||||
// an identical .ecx, so the total entry count is taken once) while
|
||||
// DeleteCount is summed (each needle delete tombstones exactly one node's
|
||||
// .ecx, so the true delete total is the sum of every holder's local count).
|
||||
func TestCollectCollectionStatsECFileAndDeleteCountAggregation(t *testing.T) {
|
||||
// Volume id=7 reported by three nodes. Every node reports file_count=100
|
||||
// (same .ecx). Local delete counts: 5 + 3 + 2 = 10 deletes total.
|
||||
// Expected live object count = 100 - 10 = 90.
|
||||
makeNode := func(bits uint32, sizes []int64, deleteCount uint64) *master_pb.DataNodeInfo {
|
||||
return &master_pb.DataNodeInfo{
|
||||
DiskInfos: map[string]*master_pb.DiskInfo{
|
||||
"disk1": {
|
||||
EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{
|
||||
{
|
||||
Id: 7,
|
||||
Collection: "bucket-a",
|
||||
EcIndexBits: bits,
|
||||
ShardSizes: sizes,
|
||||
FileCount: 100,
|
||||
DeleteCount: deleteCount,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
topo := &master_pb.TopologyInfo{
|
||||
DataCenterInfos: []*master_pb.DataCenterInfo{
|
||||
{
|
||||
RackInfos: []*master_pb.RackInfo{
|
||||
{
|
||||
DataNodeInfos: []*master_pb.DataNodeInfo{
|
||||
makeNode((1<<0)|(1<<1)|(1<<2)|(1<<3), []int64{1000, 1000, 1000, 1000}, 5),
|
||||
makeNode((1<<4)|(1<<5)|(1<<6)|(1<<7), []int64{1000, 1000, 1000, 1000}, 3),
|
||||
makeNode((1<<8)|(1<<9)|(1<<10)|(1<<11)|(1<<12)|(1<<13), []int64{1000, 1000, 1000, 1000, 1000, 1000}, 2),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
stats := collectCollectionStats(topo)
|
||||
|
||||
got, ok := stats["bucket-a"]
|
||||
if !ok {
|
||||
t.Fatalf("expected collection bucket-a in stats, got: %v", stats)
|
||||
}
|
||||
if got.FileCount != 90 {
|
||||
t.Errorf("FileCount: got %d, want 90 (100 total - 10 deletes summed)", got.FileCount)
|
||||
}
|
||||
// Sanity: 14 shards × 1000 bytes physical, 10 data shards logical.
|
||||
if got.PhysicalSize != 14000 {
|
||||
t.Errorf("PhysicalSize: got %d, want 14000", got.PhysicalSize)
|
||||
}
|
||||
if got.LogicalSize != 10000 {
|
||||
t.Errorf("LogicalSize: got %d, want 10000", got.LogicalSize)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -145,6 +145,11 @@ message VolumeEcShardInformationMessage {
|
||||
uint64 expire_at_sec = 5; // used to record the destruction time of ec volume
|
||||
uint32 disk_id = 6;
|
||||
repeated int64 shard_sizes = 7; // optimized: sizes for shards in order of set bits in ec_index_bits
|
||||
uint64 file_count = 8; // total needles in the .ecx index (live + tombstoned)
|
||||
uint64 delete_count = 9; // node-local tombstones in the .ecj deletion journal
|
||||
// fields 10-19 reserved for future upstream open-source additions.
|
||||
// fields 20+ are owned by the enterprise fork (e.g. data_shards/parity_shards)
|
||||
// and must not be used here without coordination.
|
||||
}
|
||||
|
||||
message StorageBackend {
|
||||
|
||||
@@ -639,6 +639,8 @@ type VolumeEcShardInformationMessage struct {
|
||||
ExpireAtSec uint64 `protobuf:"varint,5,opt,name=expire_at_sec,json=expireAtSec,proto3" json:"expire_at_sec,omitempty"` // used to record the destruction time of ec volume
|
||||
DiskId uint32 `protobuf:"varint,6,opt,name=disk_id,json=diskId,proto3" json:"disk_id,omitempty"`
|
||||
ShardSizes []int64 `protobuf:"varint,7,rep,packed,name=shard_sizes,json=shardSizes,proto3" json:"shard_sizes,omitempty"` // optimized: sizes for shards in order of set bits in ec_index_bits
|
||||
FileCount uint64 `protobuf:"varint,8,opt,name=file_count,json=fileCount,proto3" json:"file_count,omitempty"` // total needles in the .ecx index (live + tombstoned)
|
||||
DeleteCount uint64 `protobuf:"varint,9,opt,name=delete_count,json=deleteCount,proto3" json:"delete_count,omitempty"` // node-local tombstones in the .ecj deletion journal
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
@@ -722,6 +724,20 @@ func (x *VolumeEcShardInformationMessage) GetShardSizes() []int64 {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *VolumeEcShardInformationMessage) GetFileCount() uint64 {
|
||||
if x != nil {
|
||||
return x.FileCount
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *VolumeEcShardInformationMessage) GetDeleteCount() uint64 {
|
||||
if x != nil {
|
||||
return x.DeleteCount
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
type StorageBackend struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
|
||||
@@ -4423,7 +4439,7 @@ const file_master_proto_rawDesc = "" +
|
||||
"\x03ttl\x18\n" +
|
||||
" \x01(\rR\x03ttl\x12\x1b\n" +
|
||||
"\tdisk_type\x18\x0f \x01(\tR\bdiskType\x12\x17\n" +
|
||||
"\adisk_id\x18\x10 \x01(\rR\x06diskId\"\xf0\x01\n" +
|
||||
"\adisk_id\x18\x10 \x01(\rR\x06diskId\"\xb2\x02\n" +
|
||||
"\x1fVolumeEcShardInformationMessage\x12\x0e\n" +
|
||||
"\x02id\x18\x01 \x01(\rR\x02id\x12\x1e\n" +
|
||||
"\n" +
|
||||
@@ -4434,7 +4450,10 @@ const file_master_proto_rawDesc = "" +
|
||||
"\rexpire_at_sec\x18\x05 \x01(\x04R\vexpireAtSec\x12\x17\n" +
|
||||
"\adisk_id\x18\x06 \x01(\rR\x06diskId\x12\x1f\n" +
|
||||
"\vshard_sizes\x18\a \x03(\x03R\n" +
|
||||
"shardSizes\"\xbe\x01\n" +
|
||||
"shardSizes\x12\x1d\n" +
|
||||
"\n" +
|
||||
"file_count\x18\b \x01(\x04R\tfileCount\x12!\n" +
|
||||
"\fdelete_count\x18\t \x01(\x04R\vdeleteCount\"\xbe\x01\n" +
|
||||
"\x0eStorageBackend\x12\x12\n" +
|
||||
"\x04type\x18\x01 \x01(\tR\x04type\x12\x0e\n" +
|
||||
"\x02id\x18\x02 \x01(\tR\x02id\x12I\n" +
|
||||
|
||||
@@ -482,86 +482,98 @@ func TestEcxFileDeletionWithSeparateHandles(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestEcVolumeSyncEnsuresDeletionsVisible is an integration test for issue #7751
|
||||
// that verifies the EcVolume.Sync() method ensures deleted needles are visible
|
||||
// to external readers (like ec.decode's CopyFile).
|
||||
func TestEcVolumeSyncEnsuresDeletionsVisible(t *testing.T) {
|
||||
// TestEcVolumeDeleteDurableToJournal tracks issue #7751: a runtime needle
|
||||
// delete must be observable by the ec.decode CopyFile path. Under the
|
||||
// current design .ecx is an immutable sealed index at runtime — deletes
|
||||
// are journaled to .ecj and tracked in an in-memory set — so the
|
||||
// durability chain decode relies on is:
|
||||
//
|
||||
// 1. DeleteNeedleFromEcx appends the needle id to .ecj and fsyncs it.
|
||||
// 2. Runtime reads via FindNeedleFromEcx consult the in-memory set and
|
||||
// return TombstoneFileSize even though the sealed .ecx record on
|
||||
// disk still shows the original size.
|
||||
// 3. ec.decode later closes the EcVolume and calls RebuildEcxFile on
|
||||
// the now-quiescent files, which walks .ecj and writes tombstones
|
||||
// into .ecx. CopyFile then reads the rebuilt .ecx.
|
||||
//
|
||||
// This test exercises the full chain on a tempdir fixture.
|
||||
func TestEcVolumeDeleteDurableToJournal(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
collection := "test"
|
||||
vid := 1
|
||||
base := filepath.Join(dir, collection+"_1")
|
||||
|
||||
// Create initial .ecx file with live needle
|
||||
// Seed .ecx with two live needles.
|
||||
needle1 := makeNeedleMapEntry(types.NeedleId(1), types.ToOffset(64), types.Size(100))
|
||||
needle2 := makeNeedleMapEntry(types.NeedleId(2), types.ToOffset(128), types.Size(200))
|
||||
ecxData := append(needle1, needle2...)
|
||||
if err := os.WriteFile(base+".ecx", ecxData, 0644); err != nil {
|
||||
t.Fatalf("write ecx: %v", err)
|
||||
}
|
||||
|
||||
// Create empty .ecj file
|
||||
if err := os.WriteFile(base+".ecj", []byte{}, 0644); err != nil {
|
||||
t.Fatalf("write ecj: %v", err)
|
||||
}
|
||||
|
||||
// Create minimal EC shard file to allow EcVolume creation
|
||||
// Shards need super block header (8 bytes)
|
||||
shardData := make([]byte, 8)
|
||||
if err := os.WriteFile(base+".ec00", shardData, 0644); err != nil {
|
||||
if err := os.WriteFile(base+".ec00", make([]byte, 8), 0644); err != nil {
|
||||
t.Fatalf("write ec00: %v", err)
|
||||
}
|
||||
|
||||
// Create .vif file
|
||||
if err := os.WriteFile(base+".vif", []byte{}, 0644); err != nil {
|
||||
t.Fatalf("write vif: %v", err)
|
||||
}
|
||||
|
||||
// Create EcVolume
|
||||
ecVolume, err := erasure_coding.NewEcVolume(
|
||||
"hdd",
|
||||
dir, // data dir
|
||||
dir, // idx dir
|
||||
collection,
|
||||
needle.VolumeId(vid),
|
||||
)
|
||||
ecVolume, err := erasure_coding.NewEcVolume("hdd", dir, dir, collection, needle.VolumeId(vid))
|
||||
if err != nil {
|
||||
t.Fatalf("NewEcVolume: %v", err)
|
||||
}
|
||||
defer ecVolume.Close()
|
||||
|
||||
// Delete needle 2 via EcVolume (this writes to .ecx and .ecj)
|
||||
// Runtime delete must not mutate .ecx.
|
||||
if err := ecVolume.DeleteNeedleFromEcx(types.NeedleId(2)); err != nil {
|
||||
t.Fatalf("DeleteNeedleFromEcx: %v", err)
|
||||
}
|
||||
|
||||
// Before Sync: open a new reader handle to simulate CopyFile
|
||||
readerBefore, err := os.OpenFile(base+".ecx", os.O_RDONLY, 0644)
|
||||
// FindNeedleFromEcx masks the id via the in-memory set.
|
||||
_, size, err := ecVolume.FindNeedleFromEcx(types.NeedleId(2))
|
||||
if err != nil {
|
||||
t.Fatalf("open ecx for read (before sync): %v", err)
|
||||
t.Fatalf("FindNeedleFromEcx(2): %v", err)
|
||||
}
|
||||
defer readerBefore.Close()
|
||||
|
||||
// Now call Sync() - this is what fixes issue #7751
|
||||
ecVolume.Sync()
|
||||
|
||||
// After Sync: open another reader handle
|
||||
readerAfter, err := os.OpenFile(base+".ecx", os.O_RDONLY, 0644)
|
||||
if err != nil {
|
||||
t.Fatalf("open ecx for read (after sync): %v", err)
|
||||
}
|
||||
defer readerAfter.Close()
|
||||
|
||||
// Read needle 2's entry from the after-sync handle
|
||||
entrySize := types.NeedleIdSize + types.OffsetSize + types.SizeSize
|
||||
data := make([]byte, entrySize)
|
||||
if _, err := readerAfter.ReadAt(data, int64(entrySize)); err != nil {
|
||||
t.Fatalf("ReadAt after sync: %v", err)
|
||||
}
|
||||
|
||||
// Verify needle 2 is marked as deleted
|
||||
size := types.BytesToSize(data[types.NeedleIdSize+types.OffsetSize:])
|
||||
if !size.IsDeleted() {
|
||||
t.Fatalf("expected needle 2 to be deleted after Sync(), got size: %d", size)
|
||||
t.Fatalf("expected FindNeedleFromEcx to return tombstone for deleted needle, got size=%d", size)
|
||||
}
|
||||
|
||||
// Direct .ecx reader should still see the original size — .ecx is
|
||||
// immutable at runtime.
|
||||
entrySize := int64(types.NeedleIdSize + types.OffsetSize + types.SizeSize)
|
||||
rawBuf := make([]byte, entrySize)
|
||||
rawReader, err := os.Open(base + ".ecx")
|
||||
if err != nil {
|
||||
t.Fatalf("open ecx raw: %v", err)
|
||||
}
|
||||
if _, err := rawReader.ReadAt(rawBuf, entrySize); err != nil {
|
||||
t.Fatalf("read raw ecx entry: %v", err)
|
||||
}
|
||||
rawReader.Close()
|
||||
rawSize := types.BytesToSize(rawBuf[types.NeedleIdSize+types.OffsetSize:])
|
||||
if rawSize.IsDeleted() {
|
||||
t.Fatalf("runtime delete must not mutate .ecx on disk; got tombstone in raw entry")
|
||||
}
|
||||
|
||||
// Close the volume so RebuildEcxFile can operate on the files, then
|
||||
// fold .ecj into .ecx as the decode path does and verify the rebuilt
|
||||
// index has the tombstone visible to external readers.
|
||||
ecVolume.Close()
|
||||
if err := erasure_coding.RebuildEcxFile(base); err != nil {
|
||||
t.Fatalf("RebuildEcxFile: %v", err)
|
||||
}
|
||||
rebuilt, err := os.Open(base + ".ecx")
|
||||
if err != nil {
|
||||
t.Fatalf("open rebuilt ecx: %v", err)
|
||||
}
|
||||
defer rebuilt.Close()
|
||||
if _, err := rebuilt.ReadAt(rawBuf, entrySize); err != nil {
|
||||
t.Fatalf("read rebuilt ecx entry: %v", err)
|
||||
}
|
||||
rebuiltSize := types.BytesToSize(rawBuf[types.NeedleIdSize+types.OffsetSize:])
|
||||
if !rebuiltSize.IsDeleted() {
|
||||
t.Fatalf("expected needle 2 to be tombstoned in rebuilt .ecx, got size=%d", rebuiltSize)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,6 +43,22 @@ type EcVolume struct {
|
||||
datFileSize int64
|
||||
ExpireAtSec uint64 //ec volume destroy time, calculated from the ec volume was created
|
||||
ECContext *ECContext // EC encoding parameters
|
||||
|
||||
// ecjFileSize mirrors the on-disk size of the .ecj deletion journal and
|
||||
// is maintained under ecjFileAccessLock. It is only used by IO helpers
|
||||
// (seek/truncate) — the authoritative runtime delete count comes from
|
||||
// deletedNeedles.
|
||||
ecjFileSize int64
|
||||
|
||||
// deletedNeedles is the in-memory set of needle ids that have been
|
||||
// deleted since the volume was encoded. .ecx is immutable at runtime —
|
||||
// it only stores the sorted (id, offset, size) index written at encode
|
||||
// time — and runtime deletes are journaled to .ecj + tracked here.
|
||||
// Reads consult this set to mask out deleted needles on top of the
|
||||
// sealed .ecx lookup. Heartbeat delete_count is derived from len(set).
|
||||
// Seeded from .ecj in NewEcVolume and updated under deletedNeedlesLock.
|
||||
deletedNeedlesLock sync.RWMutex
|
||||
deletedNeedles map[types.NeedleId]struct{}
|
||||
}
|
||||
|
||||
func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
|
||||
@@ -75,10 +91,19 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection
|
||||
ev.ecxFileSize = ecxFi.Size()
|
||||
ev.ecxCreatedAt = ecxFi.ModTime()
|
||||
|
||||
// open ecj file
|
||||
// open ecj file and seed the in-memory deleted set from it.
|
||||
if ev.ecjFile, err = os.OpenFile(indexBaseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
|
||||
return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", indexBaseFileName, err)
|
||||
}
|
||||
if ecjFi, statErr := ev.ecjFile.Stat(); statErr == nil {
|
||||
ev.ecjFileSize = ecjFi.Size()
|
||||
} else {
|
||||
glog.Warningf("stat ec volume journal %s.ecj: %v", indexBaseFileName, statErr)
|
||||
}
|
||||
ev.deletedNeedles = make(map[types.NeedleId]struct{})
|
||||
if loadErr := ev.loadDeletedNeedlesFromEcj(); loadErr != nil {
|
||||
glog.Warningf("ec volume %d: load deleted needles from .ecj: %v", vid, loadErr)
|
||||
}
|
||||
|
||||
// read volume info
|
||||
ev.Version = needle.Version3
|
||||
@@ -254,6 +279,8 @@ func (ev *EcVolume) ShardIdList() (shardIds []ShardId) {
|
||||
func (ev *EcVolume) ToVolumeEcShardInformationMessage(diskId uint32) (messages []*master_pb.VolumeEcShardInformationMessage) {
|
||||
ecInfoPerVolume := map[needle.VolumeId]*master_pb.VolumeEcShardInformationMessage{}
|
||||
|
||||
fileCount, deleteCount := ev.FileAndDeleteCount()
|
||||
|
||||
for _, s := range ev.Shards {
|
||||
m, ok := ecInfoPerVolume[s.VolumeId]
|
||||
if !ok {
|
||||
@@ -263,6 +290,8 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage(diskId uint32) (messages [
|
||||
DiskType: string(ev.diskType),
|
||||
ExpireAtSec: ev.ExpireAtSec,
|
||||
DiskId: diskId,
|
||||
FileCount: fileCount,
|
||||
DeleteCount: deleteCount,
|
||||
}
|
||||
ecInfoPerVolume[s.VolumeId] = m
|
||||
}
|
||||
@@ -280,6 +309,67 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage(diskId uint32) (messages [
|
||||
return
|
||||
}
|
||||
|
||||
// FileAndDeleteCount returns the current (fileCount, deleteCount) for this
|
||||
// EC volume.
|
||||
//
|
||||
// - fileCount = .ecx size / NeedleMapEntrySize — the total number of
|
||||
// needles recorded in the sealed sorted index. Because .ecx is written
|
||||
// at encode time and only overwritten during decode/rebuild (which
|
||||
// preserves record count), this matches the "cumulative put count"
|
||||
// semantics of regular volume FileCount.
|
||||
//
|
||||
// - deleteCount = len(deletedNeedles) — the number of unique runtime
|
||||
// deletes tracked in memory. The set is seeded from .ecj on load and
|
||||
// appended to on every successful DeleteNeedleFromEcx. Because a
|
||||
// needle delete is applied on exactly one shard holder, the admin
|
||||
// aggregation sums deleteCount across nodes to get the volume's true
|
||||
// delete total.
|
||||
//
|
||||
// Both values are O(1) — no index walking.
|
||||
func (ev *EcVolume) FileAndDeleteCount() (fileCount, deleteCount uint64) {
|
||||
fileCount = uint64(ev.ecxFileSize) / uint64(types.NeedleMapEntrySize)
|
||||
ev.deletedNeedlesLock.RLock()
|
||||
deleteCount = uint64(len(ev.deletedNeedles))
|
||||
ev.deletedNeedlesLock.RUnlock()
|
||||
return
|
||||
}
|
||||
|
||||
// IsNeedleDeleted reports whether the given needle id is in the in-memory
|
||||
// deleted set. Callers that have already looked the needle up in .ecx
|
||||
// should consult this to apply runtime deletion state on top of the
|
||||
// sealed index.
|
||||
func (ev *EcVolume) IsNeedleDeleted(needleId types.NeedleId) bool {
|
||||
ev.deletedNeedlesLock.RLock()
|
||||
_, ok := ev.deletedNeedles[needleId]
|
||||
ev.deletedNeedlesLock.RUnlock()
|
||||
return ok
|
||||
}
|
||||
|
||||
// markNeedleDeletedInMemory inserts a needle id into the deleted set.
|
||||
func (ev *EcVolume) markNeedleDeletedInMemory(needleId types.NeedleId) {
|
||||
ev.deletedNeedlesLock.Lock()
|
||||
ev.deletedNeedles[needleId] = struct{}{}
|
||||
ev.deletedNeedlesLock.Unlock()
|
||||
}
|
||||
|
||||
// loadDeletedNeedlesFromEcj walks the .ecj journal and populates the
|
||||
// in-memory deleted set. Called once from NewEcVolume under the exclusive
|
||||
// ownership of the just-constructed (and not yet shared) EcVolume.
|
||||
func (ev *EcVolume) loadDeletedNeedlesFromEcj() error {
|
||||
if ev.ecjFile == nil || ev.ecjFileSize < int64(types.NeedleIdSize) {
|
||||
return nil
|
||||
}
|
||||
buf := make([]byte, types.NeedleIdSize)
|
||||
for off := int64(0); off+int64(types.NeedleIdSize) <= ev.ecjFileSize; off += int64(types.NeedleIdSize) {
|
||||
if _, err := ev.ecjFile.ReadAt(buf, off); err != nil {
|
||||
return fmt.Errorf("read ecj at %d: %w", off, err)
|
||||
}
|
||||
id := types.BytesToNeedleId(buf)
|
||||
ev.deletedNeedles[id] = struct{}{}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size types.Size, intervals []Interval, err error) {
|
||||
|
||||
// find the needle from ecx file
|
||||
@@ -313,7 +403,15 @@ func (ev *EcVolume) LocateEcShardNeedleInterval(version needle.Version, offset i
|
||||
}
|
||||
|
||||
func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size types.Size, err error) {
|
||||
return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
|
||||
offset, size, err = SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// Apply runtime deletion state on top of the sealed .ecx lookup.
|
||||
if ev.IsNeedleDeleted(needleId) {
|
||||
size = types.TombstoneFileSize
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size types.Size, err error) {
|
||||
|
||||
@@ -0,0 +1,125 @@
|
||||
package erasure_coding_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
)
|
||||
|
||||
// makeEntry builds one .ecx index entry (needle id, offset, size).
|
||||
func makeEntry(key types.NeedleId, offset types.Offset, size types.Size) []byte {
|
||||
b := make([]byte, types.NeedleIdSize+types.OffsetSize+types.SizeSize)
|
||||
types.NeedleIdToBytes(b[0:types.NeedleIdSize], key)
|
||||
types.OffsetToBytes(b[types.NeedleIdSize:types.NeedleIdSize+types.OffsetSize], offset)
|
||||
types.SizeToBytes(b[types.NeedleIdSize+types.OffsetSize:], size)
|
||||
return b
|
||||
}
|
||||
|
||||
// encodeEcjIds packs a slice of needle ids into the binary .ecj on-disk format.
|
||||
func encodeEcjIds(ids []types.NeedleId) []byte {
|
||||
buf := make([]byte, 0, len(ids)*types.NeedleIdSize)
|
||||
for _, id := range ids {
|
||||
b := make([]byte, types.NeedleIdSize)
|
||||
types.NeedleIdToBytes(b, id)
|
||||
buf = append(buf, b...)
|
||||
}
|
||||
return buf
|
||||
}
|
||||
|
||||
func writeFixture(t *testing.T, dir, collection string, vid int, ecxData []byte, ecjIds []types.NeedleId) *erasure_coding.EcVolume {
|
||||
t.Helper()
|
||||
base := filepath.Join(dir, fmt.Sprintf("%s_%d", collection, vid))
|
||||
|
||||
if err := os.WriteFile(base+".ecx", ecxData, 0644); err != nil {
|
||||
t.Fatalf("write ecx: %v", err)
|
||||
}
|
||||
if err := os.WriteFile(base+".ecj", encodeEcjIds(ecjIds), 0644); err != nil {
|
||||
t.Fatalf("write ecj: %v", err)
|
||||
}
|
||||
if err := os.WriteFile(base+".ec00", make([]byte, 8), 0644); err != nil {
|
||||
t.Fatalf("write ec00: %v", err)
|
||||
}
|
||||
if err := os.WriteFile(base+".vif", []byte{}, 0644); err != nil {
|
||||
t.Fatalf("write vif: %v", err)
|
||||
}
|
||||
|
||||
ev, err := erasure_coding.NewEcVolume("hdd", dir, dir, collection, needle.VolumeId(vid))
|
||||
if err != nil {
|
||||
t.Fatalf("NewEcVolume: %v", err)
|
||||
}
|
||||
return ev
|
||||
}
|
||||
|
||||
// TestEcVolumeFileAndDeleteCountInitial verifies that FileAndDeleteCount is
|
||||
// derived from file sizes at load: fileCount = .ecx size / NeedleMapEntrySize
|
||||
// and deleteCount = .ecj size / NeedleIdSize. No index walk is performed.
|
||||
func TestEcVolumeFileAndDeleteCountInitial(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
// 3 needles in .ecx, 2 deletions already recorded in .ecj.
|
||||
ecx := []byte{}
|
||||
ecx = append(ecx, makeEntry(1, types.ToOffset(64), 100)...)
|
||||
ecx = append(ecx, makeEntry(2, types.ToOffset(128), 200)...)
|
||||
ecx = append(ecx, makeEntry(3, types.ToOffset(256), 300)...)
|
||||
ecj := []types.NeedleId{2, 3}
|
||||
|
||||
ev := writeFixture(t, dir, "test", 1, ecx, ecj)
|
||||
defer ev.Close()
|
||||
|
||||
fileCount, deleteCount := ev.FileAndDeleteCount()
|
||||
if fileCount != 3 {
|
||||
t.Errorf("fileCount: got %d, want 3 (.ecx entries)", fileCount)
|
||||
}
|
||||
if deleteCount != 2 {
|
||||
t.Errorf("deleteCount: got %d, want 2 (.ecj entries)", deleteCount)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEcVolumeFileAndDeleteCountAfterDelete verifies that DeleteNeedleFromEcx
|
||||
// increments the derived delete count by appending to .ecj, while leaving
|
||||
// fileCount (derived from the sealed .ecx) unchanged. Idempotent re-deletes
|
||||
// and deletes of missing needles must not drift the count.
|
||||
func TestEcVolumeFileAndDeleteCountAfterDelete(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
ecx := []byte{}
|
||||
ecx = append(ecx, makeEntry(1, types.ToOffset(64), 100)...)
|
||||
ecx = append(ecx, makeEntry(2, types.ToOffset(128), 200)...)
|
||||
|
||||
ev := writeFixture(t, dir, "test", 1, ecx, nil)
|
||||
defer ev.Close()
|
||||
|
||||
if fc, dc := ev.FileAndDeleteCount(); fc != 2 || dc != 0 {
|
||||
t.Fatalf("initial: got (%d, %d), want (2, 0)", fc, dc)
|
||||
}
|
||||
|
||||
if err := ev.DeleteNeedleFromEcx(2); err != nil {
|
||||
t.Fatalf("DeleteNeedleFromEcx: %v", err)
|
||||
}
|
||||
if fc, dc := ev.FileAndDeleteCount(); fc != 2 || dc != 1 {
|
||||
t.Errorf("after first delete: got (%d, %d), want (2, 1)", fc, dc)
|
||||
}
|
||||
|
||||
// Re-deleting an already tombstoned needle is a no-op on .ecj, so
|
||||
// deleteCount must stay at 1.
|
||||
if err := ev.DeleteNeedleFromEcx(2); err != nil {
|
||||
t.Fatalf("idempotent DeleteNeedleFromEcx: %v", err)
|
||||
}
|
||||
if fc, dc := ev.FileAndDeleteCount(); fc != 2 || dc != 1 {
|
||||
t.Errorf("after idempotent delete: got (%d, %d), want (2, 1)", fc, dc)
|
||||
}
|
||||
|
||||
// Deleting a non-existent needle is a no-op: search returns NotFound,
|
||||
// no .ecj append.
|
||||
if err := ev.DeleteNeedleFromEcx(99); err != nil {
|
||||
t.Fatalf("missing DeleteNeedleFromEcx: %v", err)
|
||||
}
|
||||
if fc, dc := ev.FileAndDeleteCount(); fc != 2 || dc != 1 {
|
||||
t.Errorf("after missing delete: got (%d, %d), want (2, 1)", fc, dc)
|
||||
}
|
||||
}
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
)
|
||||
@@ -24,28 +25,71 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
// DeleteNeedleFromEcx marks the given needle as deleted. .ecx is treated
|
||||
// as an immutable sealed sorted index; runtime deletes are recorded by
|
||||
// appending the needle id to the .ecj deletion journal and inserting it
|
||||
// into the in-memory deletedNeedles set. A subsequent FindNeedleFromEcx
|
||||
// masks the id out by returning TombstoneFileSize.
|
||||
//
|
||||
// The .ecj append is the durable commit point — only after it syncs do
|
||||
// we publish the id into the in-memory set. A partial write is truncated
|
||||
// back to the known-good size so the on-disk journal and the set cannot
|
||||
// drift.
|
||||
func (ev *EcVolume) DeleteNeedleFromEcx(needleId types.NeedleId) (err error) {
|
||||
|
||||
_, _, err = SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, MarkNeedleDeleted)
|
||||
|
||||
// Look the needle up read-only. A missing needle is not an error
|
||||
// (already gone, e.g. from a race against encode); a pre-existing
|
||||
// .ecx tombstone means a prior decode/rebuild folded it in, in
|
||||
// which case there is nothing to journal but we still mirror it
|
||||
// into the in-memory set so delete_count stays consistent.
|
||||
_, oldSize, err := SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
|
||||
if err != nil {
|
||||
if err == NotFoundError {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
if oldSize.IsDeleted() {
|
||||
ev.markNeedleDeletedInMemory(needleId)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Serialise runtime deletes on ecjFileAccessLock so the idempotence
|
||||
// check, the journal append and the set insertion happen atomically
|
||||
// with respect to one another.
|
||||
ev.ecjFileAccessLock.Lock()
|
||||
defer ev.ecjFileAccessLock.Unlock()
|
||||
|
||||
if ev.IsNeedleDeleted(needleId) {
|
||||
return nil
|
||||
}
|
||||
|
||||
b := make([]byte, types.NeedleIdSize)
|
||||
types.NeedleIdToBytes(b, needleId)
|
||||
|
||||
ev.ecjFileAccessLock.Lock()
|
||||
prevEcjSize := ev.ecjFileSize
|
||||
if _, seekErr := ev.ecjFile.Seek(0, io.SeekEnd); seekErr != nil {
|
||||
return fmt.Errorf("seek ecj: %w", seekErr)
|
||||
}
|
||||
n, writeErr := ev.ecjFile.Write(b)
|
||||
if writeErr != nil {
|
||||
if truncErr := ev.ecjFile.Truncate(prevEcjSize); truncErr != nil {
|
||||
glog.Errorf("ec volume %d: failed to truncate ecj after write error: %v", ev.VolumeId, truncErr)
|
||||
}
|
||||
return fmt.Errorf("write ecj: %w", writeErr)
|
||||
}
|
||||
if syncErr := ev.ecjFile.Sync(); syncErr != nil {
|
||||
if truncErr := ev.ecjFile.Truncate(prevEcjSize); truncErr != nil {
|
||||
glog.Errorf("ec volume %d: failed to truncate ecj after sync error: %v", ev.VolumeId, truncErr)
|
||||
}
|
||||
return fmt.Errorf("sync ecj: %w", syncErr)
|
||||
}
|
||||
ev.ecjFileSize += int64(n)
|
||||
|
||||
ev.ecjFile.Seek(0, io.SeekEnd)
|
||||
ev.ecjFile.Write(b)
|
||||
// Publish into the in-memory set only after the journal is durable.
|
||||
ev.markNeedleDeletedInMemory(needleId)
|
||||
|
||||
ev.ecjFileAccessLock.Unlock()
|
||||
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
func RebuildEcxFile(baseFileName string) error {
|
||||
|
||||
@@ -13,6 +13,8 @@ type EcVolumeInfo struct {
|
||||
DiskId uint32 // ID of the disk this EC volume is on
|
||||
ExpireAtSec uint64 // ec volume destroy time, calculated from the ec volume was created
|
||||
ShardsInfo *ShardsInfo
|
||||
FileCount uint64 // live needle count for this EC volume (same on every node holding shards)
|
||||
DeleteCount uint64 // tombstoned needle count for this EC volume
|
||||
}
|
||||
|
||||
func (ecInfo *EcVolumeInfo) Minus(other *EcVolumeInfo) *EcVolumeInfo {
|
||||
@@ -23,6 +25,8 @@ func (ecInfo *EcVolumeInfo) Minus(other *EcVolumeInfo) *EcVolumeInfo {
|
||||
DiskType: ecInfo.DiskType,
|
||||
DiskId: ecInfo.DiskId,
|
||||
ExpireAtSec: ecInfo.ExpireAtSec,
|
||||
FileCount: ecInfo.FileCount,
|
||||
DeleteCount: ecInfo.DeleteCount,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,5 +39,7 @@ func (evi *EcVolumeInfo) ToVolumeEcShardInformationMessage() (ret *master_pb.Vol
|
||||
DiskType: evi.DiskType,
|
||||
ExpireAtSec: evi.ExpireAtSec,
|
||||
DiskId: evi.DiskId,
|
||||
FileCount: evi.FileCount,
|
||||
DeleteCount: evi.DeleteCount,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
@@ -35,6 +36,12 @@ func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle
|
||||
|
||||
}
|
||||
|
||||
// errEcShardMissing indicates that no data node in the topology currently
|
||||
// holds the requested EC shard. Callers use this to trigger fallback to
|
||||
// another shard, distinguishing "no home for this shard" from "the shard
|
||||
// holder is reachable but the RPC failed".
|
||||
var errEcShardMissing = fmt.Errorf("ec shard missing")
|
||||
|
||||
func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
|
||||
|
||||
_, _, intervals, err := ecVolume.LocateEcShardNeedle(needleId, ecVolume.Version)
|
||||
@@ -45,15 +52,22 @@ func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume *erasure_cod
|
||||
return erasure_coding.NotFoundError
|
||||
}
|
||||
|
||||
shardId, _ := intervals[0].ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
|
||||
primaryShardId, _ := intervals[0].ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
|
||||
|
||||
err = s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId)
|
||||
if err == nil {
|
||||
return nil
|
||||
// Normal path: delete on exactly one node holding the primary data shard.
|
||||
err = s.doDeleteNeedleFromRemoteEcShardServers(primaryShardId, ecVolume, needleId)
|
||||
if err == nil || !errors.Is(err, errEcShardMissing) {
|
||||
return err
|
||||
}
|
||||
|
||||
for shardId = erasure_coding.DataShardsCount; shardId < erasure_coding.TotalShardsCount; shardId++ {
|
||||
if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId); parityDeletionError == nil {
|
||||
// Primary data shard has no live holders; fall back to any other shard
|
||||
// (remaining data shards first, then parity) so a shard holder can still
|
||||
// tombstone the .ecx and the delete is durable.
|
||||
for shardId := erasure_coding.ShardId(0); shardId < erasure_coding.TotalShardsCount; shardId++ {
|
||||
if shardId == primaryShardId {
|
||||
continue
|
||||
}
|
||||
if fallbackErr := s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId); fallbackErr == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -68,18 +82,25 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(shardId erasure_coding.Sh
|
||||
sourceDataNodes, hasShardLocations := ecVolume.ShardLocations[shardId]
|
||||
ecVolume.ShardLocationsLock.RUnlock()
|
||||
|
||||
if !hasShardLocations {
|
||||
return fmt.Errorf("ec shard %d.%d not located", ecVolume.VolumeId, shardId)
|
||||
if !hasShardLocations || len(sourceDataNodes) == 0 {
|
||||
return fmt.Errorf("ec shard %d.%d: %w", ecVolume.VolumeId, shardId, errEcShardMissing)
|
||||
}
|
||||
|
||||
// Apply the delete on exactly one node. Replicas of the same shard all
|
||||
// hold identical .ecx copies, so tombstoning more than one would double
|
||||
// the delete count in heartbeat reporting. Try nodes in order and stop
|
||||
// at the first success; only return error if every replica failed.
|
||||
var lastErr error
|
||||
for _, sourceDataNode := range sourceDataNodes {
|
||||
glog.V(4).Infof("delete from remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNode)
|
||||
if err := s.doDeleteNeedleFromRemoteEcShard(sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId); err != nil {
|
||||
return err
|
||||
if err := s.doDeleteNeedleFromRemoteEcShard(sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId); err == nil {
|
||||
return nil
|
||||
} else {
|
||||
lastErr = err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return lastErr
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -26,6 +26,8 @@ func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInf
|
||||
DiskType: shardInfo.DiskType,
|
||||
DiskId: shardInfo.DiskId,
|
||||
ExpireAtSec: shardInfo.ExpireAtSec,
|
||||
FileCount: shardInfo.FileCount,
|
||||
DeleteCount: shardInfo.DeleteCount,
|
||||
}
|
||||
|
||||
shards = append(shards, ecVolumeInfo)
|
||||
@@ -53,6 +55,8 @@ func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards
|
||||
DiskType: shardInfo.DiskType,
|
||||
DiskId: shardInfo.DiskId,
|
||||
ExpireAtSec: shardInfo.ExpireAtSec,
|
||||
FileCount: shardInfo.FileCount,
|
||||
DeleteCount: shardInfo.DeleteCount,
|
||||
}
|
||||
|
||||
newShards = append(newShards, ecVolumeInfo)
|
||||
@@ -66,6 +70,8 @@ func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards
|
||||
DiskType: shardInfo.DiskType,
|
||||
DiskId: shardInfo.DiskId,
|
||||
ExpireAtSec: shardInfo.ExpireAtSec,
|
||||
FileCount: shardInfo.FileCount,
|
||||
DeleteCount: shardInfo.DeleteCount,
|
||||
}
|
||||
|
||||
deletedShards = append(deletedShards, ecVolumeInfo)
|
||||
|
||||
Reference in New Issue
Block a user