fix(ec): mirror EC sidecars onto every shard-bearing disk at startup (#9525)

* fix(ec): mirror EC sidecars onto every shard-bearing disk at startup

In a multi-disk volume server, ec.balance and ec.rebuild can land shards
on a disk that does not also hold the matching .ecx / .ecj / .vif index
files. The orphan-shard reconciler in reconcileEcShardsAcrossDisks
already loads those shards by pointing the EcVolume at the sibling
disk's index files; reads work, but any failure on the index-owning
disk silently disables every shard on the other disk, even though those
shards are physically fine.

This change adds mirrorEcMetadataToShardDisks, a startup pass that
physically replicates .ecx / .ecj / .vif onto each disk that holds
shards but is missing them. Each copy is atomic (tmp + fsync + rename)
and idempotent (a destination that already has the sidecar is
preserved). After mirroring, the cross-disk reconciler prefers the
local IdxDirectory so the EcVolume mounts self-contained; the
cross-disk virtual mount remains as a fallback for volumes whose mirror
failed (read-only target, out of space, partial copy on a previous
boot).

The same-disk invariant the EC lifecycle (encode / decode / balance /
vacuum / repair) was already documented as promising is now actually
restored at boot, so a future failure of one disk in a split-shards
layout no longer takes the other disk's shards with it.

Tests cover the orphan-layout mirror (dir0 receives the .ecx / .ecj /
.vif from dir1) and idempotency (an existing destination .ecx is not
overwritten with the owner's copy).

* fix(ec): handle legacy pre-dir.idx sidecar layout in mirror skip-check

hasAllEcSidecarsLocally checked only the modern destination path
(IdxDirectory for .ecx/.ecj, Directory for .vif). A destination disk
that still had a legacy .ecx in its data dir (written before -dir.idx
was set) would report "not present" and the mirror would write a
second copy to IdxDirectory, leaving two .ecx files on disk.

Matches HasEcxFileOnDisk's open-with-fallback contract: check the
modern path first, then the opposite directory. Factored the
exists-and-not-a-dir check into a small statRegular helper so the
fallback ladder stays readable.

* rust(seaweed-volume): mirror EC sidecars onto shard-bearing disks at startup

Port of the Go fix (commit 088e26ea6) to the Rust volume server.
Adds Store::mirror_ec_metadata_to_shard_disks, called from
add_location / load_new_volumes before the cross-disk orphan
reconciler. Physically copies .ecx / .ecj / .vif from the disk that
owns the index files onto every disk holding shards but missing
sidecars, so each shard-bearing disk ends up self-contained.

The reconciler now prefers the local idx_directory when the mirror
has installed a .ecx there; the cross-disk virtual mount remains as
the fallback for volumes whose mirror failed (read-only target, out
of space, partial copy on a previous boot). Adds ec_local_ecx_path
helper shared between reconcile and mirror to detect the post-mirror
fast path.

Mirrors the Go-side fallback in hasAllEcSidecarsLocally: when
-dir.idx is configured and the destination still has a legacy .ecx
in its data dir, that's recognized so the mirror does not write a
duplicate copy into idx_directory.

Tests cover the two key cases: orphan layout (dir0 receives the
sidecars from dir1) and idempotency (a pre-existing destination .ecx
is not overwritten).

* trim verbose comments on EC mirror code

Comments now lead with the WHY (non-obvious constraints, the
post-mirror fast path, why local copies are authoritative) and drop
restate-the-code blocks, headers, and section dividers. Behavior is
unchanged; all existing tests still pass on both the Go volume
server and the seaweed-volume Rust port.

* drop github issue refs from added comments

Two stray "#9212" references slipped into comments I added on the
cross-disk reconciler call site. The git log carries the issue
history; comments stand on their own.

* test(ec): accept rebuild on either disk after sidecar mirror

TestEcLifecycleAcrossMultipleDisks asserted the rebuilt shard 9 must
land at the disk-0 path. With the boot-time sidecar mirror, every
shard-bearing disk owns its own .ecx, so VolumeEcShardsRebuild now
picks whichever disk hosts the most shards — disk 1 in this layout
after the deletion. The shard can legitimately rebuild on either
disk; the test now accepts both and uses the chosen path for the
subsequent mount + read verification.
This commit is contained in:
Chris Lu
2026-05-17 19:55:15 -07:00
committed by GitHub
parent 6b94701213
commit c11ff6657b
9 changed files with 976 additions and 37 deletions
+1
View File
@@ -4,6 +4,7 @@ pub mod idx;
pub mod needle;
pub mod needle_map;
pub mod store;
pub mod store_ec_mirror;
pub mod store_ec_reconcile;
pub mod super_block;
pub mod types;
+11 -8
View File
@@ -96,14 +96,16 @@ impl Store {
// re-loading shards we just cleaned up.
self.prune_incomplete_ec_with_sibling_dat();
// After every disk has finished its per-disk EC scan, sweep
// the store for shards that live on a disk without local index
// files and load them by reaching across to a sibling disk's
// .ecx / .ecj / .vif (seaweedfs/seaweedfs#9212 / #9244).
// ec.balance / ec.rebuild can move shards onto a destination
// node's second disk while leaving the index on the disk that
// already held the volume; without this pass those orphan
// shards stay invisible to the master.
// Physically mirror EC sidecars onto every shard-bearing disk
// so each disk mounts self-contained. Must run before the
// cross-disk reconciler so the orphan pass can prefer the
// local idx_directory.
self.mirror_ec_metadata_to_shard_disks();
// Cross-disk fallback for orphan shards — ec.balance can land
// shards on one disk while leaving the index on another. Still
// needed after the mirror pass for volumes whose mirror failed
// (read-only target, partial copy).
self.reconcile_ec_shards_across_disks();
Ok(())
@@ -118,6 +120,7 @@ impl Store {
}
}
self.prune_incomplete_ec_with_sibling_dat();
self.mirror_ec_metadata_to_shard_disks();
self.reconcile_ec_shards_across_disks();
}
@@ -0,0 +1,421 @@
//! Physical EC sidecar mirroring across disks of the same volume
//! server. Mirrors `weed/storage/store_ec_mirror.go`.
use std::collections::HashMap;
use std::fs;
use std::io::{self, Read, Write};
use std::path::Path;
use tracing::{info, warn};
use crate::storage::disk_location::{parse_collection_volume_id_pub, DiskLocation};
use crate::storage::store::Store;
use crate::storage::types::VolumeId;
// Listed in `EcVolume::new`'s open order.
const EC_MIRRORED_SIDECARS: &[&str] = &[".ecx", ".ecj", ".vif"];
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct EcKey {
collection: String,
vid: VolumeId,
}
#[derive(Clone, Debug)]
struct EcxOwner {
location: usize,
idx_dir: String,
data_dir: String,
}
impl Store {
/// Mirror EC sidecars onto every shard-bearing disk that lacks
/// them, so each disk mounts self-contained. Runs before
/// `reconcile_ec_shards_across_disks` so the orphan pass can
/// prefer the local idx_directory.
pub fn mirror_ec_metadata_to_shard_disks(&mut self) {
if self.locations.len() < 2 {
return;
}
let owners = self.index_ecx_owners_for_mirror();
if owners.is_empty() {
return;
}
// Two-pass: gather work under an immutable borrow, then apply
// copies under independent mutable borrows.
struct Mirror<'a> {
target_idx: usize,
owner: &'a EcxOwner,
collection: String,
vid: VolumeId,
}
let mut mirrors: Vec<Mirror> = Vec::new();
for (loc_idx, loc) in self.locations.iter().enumerate() {
let orphans = collect_shard_disk_volumes(loc);
for (key, _shards) in orphans {
let Some(owner) = owners.get(&key) else {
continue;
};
if owner.location == loc_idx {
continue;
}
if disk_has_all_sidecars(loc, &key.collection, key.vid) {
continue;
}
mirrors.push(Mirror {
target_idx: loc_idx,
owner,
collection: key.collection,
vid: key.vid,
});
}
}
for m in mirrors {
let loc_dir = self.locations[m.target_idx].directory.clone();
let loc_idx_dir = self.locations[m.target_idx].idx_directory.clone();
match mirror_sidecars_for_volume(
&m.owner.idx_dir,
&m.owner.data_dir,
&loc_dir,
&loc_idx_dir,
&m.collection,
m.vid,
) {
Ok(0) => {}
Ok(copied) => {
info!(
volume_id = m.vid.0,
collection = %m.collection,
from = %m.owner.data_dir,
to = %loc_dir,
copied,
"mirrored EC sidecar(s) for same-disk invariant",
);
}
Err(e) => {
warn!(
volume_id = m.vid.0,
collection = %m.collection,
from = %m.owner.data_dir,
to = %loc_dir,
error = %e,
"mirror EC sidecars failed; cross-disk fallback will handle this volume",
);
}
}
}
}
// Records both idx_dir (where .ecx was found) and data_dir, so
// the mirror can resolve .vif from data_dir even when .ecx lives
// in idx_directory.
fn index_ecx_owners_for_mirror(&self) -> HashMap<EcKey, EcxOwner> {
let mut owners: HashMap<EcKey, EcxOwner> = HashMap::new();
for (loc_idx, loc) in self.locations.iter().enumerate() {
let mut seen: Vec<&str> = Vec::with_capacity(2);
for scan in [loc.idx_directory.as_str(), loc.directory.as_str()] {
if scan.is_empty() || seen.contains(&scan) {
continue;
}
seen.push(scan);
let Ok(read) = fs::read_dir(scan) else {
continue;
};
for ent in read.flatten() {
if ent.file_type().map(|ft| ft.is_dir()).unwrap_or(false) {
continue;
}
let name = ent.file_name().to_string_lossy().into_owned();
let Some(base) = name.strip_suffix(".ecx") else {
continue;
};
let Some((collection, vid)) = parse_collection_volume_id_pub(base) else {
continue;
};
owners
.entry(EcKey { collection, vid })
.or_insert_with(|| EcxOwner {
location: loc_idx,
idx_dir: scan.to_string(),
data_dir: loc.directory.clone(),
});
}
}
}
owners
}
}
// Checks the modern routing and the opposite directory — without
// that fallback, a destination with a legacy pre-`-dir.idx` .ecx in
// its data dir would be re-mirrored into idx_directory.
fn disk_has_all_sidecars(loc: &DiskLocation, collection: &str, vid: VolumeId) -> bool {
for ext in EC_MIRRORED_SIDECARS {
let primary = sidecar_dest_path(&loc.directory, &loc.idx_directory, collection, vid, ext);
if path_is_regular_file(&primary) {
continue;
}
if loc.idx_directory != loc.directory {
let (fallback_data, fallback_idx) = if *ext == ".vif" {
(loc.idx_directory.as_str(), loc.directory.as_str())
} else {
(loc.directory.as_str(), loc.directory.as_str())
};
let fallback = sidecar_dest_path(fallback_data, fallback_idx, collection, vid, ext);
if path_is_regular_file(&fallback) {
continue;
}
}
return false;
}
true
}
fn path_is_regular_file(path: &str) -> bool {
fs::metadata(path).map(|m| !m.is_dir()).unwrap_or(false)
}
// `.ecx`/`.ecj` route to idx_directory, `.vif` to directory.
fn sidecar_dest_path(
data_dir: &str,
idx_dir: &str,
collection: &str,
vid: VolumeId,
ext: &str,
) -> String {
let dir = if ext == ".vif" { data_dir } else { idx_dir };
if collection.is_empty() {
format!("{}/{}{}", dir, vid.0, ext)
} else {
format!("{}/{}_{}{}", dir, collection, vid.0, ext)
}
}
fn mirror_sidecars_for_volume(
src_idx_dir: &str,
src_data_dir: &str,
dst_data_dir: &str,
dst_idx_dir: &str,
collection: &str,
vid: VolumeId,
) -> io::Result<usize> {
let mut copied = 0usize;
for ext in EC_MIRRORED_SIDECARS {
let dst = sidecar_dest_path(dst_data_dir, dst_idx_dir, collection, vid, ext);
// An existing local copy is authoritative — it may be newer
// than the owner's after a delete journal append.
if fs::metadata(&dst).is_ok() {
continue;
}
let candidates = [
sidecar_dest_path(src_data_dir, src_idx_dir, collection, vid, ext),
sidecar_dest_path(src_idx_dir, src_data_dir, collection, vid, ext),
];
let mut src_path: Option<String> = None;
for c in candidates.iter() {
if fs::metadata(c).map(|m| !m.is_dir()).unwrap_or(false) {
src_path = Some(c.clone());
break;
}
}
let Some(src) = src_path else {
continue;
};
copy_sidecar_atomic(Path::new(&src), Path::new(&dst))?;
copied += 1;
}
Ok(copied)
}
fn copy_sidecar_atomic(src: &Path, dst: &Path) -> io::Result<()> {
if let Some(parent) = dst.parent() {
fs::create_dir_all(parent)?;
}
let mut src_file = fs::File::open(src)?;
let tmp = {
let mut s = dst.as_os_str().to_owned();
s.push(".mirror.tmp");
std::path::PathBuf::from(s)
};
let _ = fs::remove_file(&tmp);
let mut dst_file = fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&tmp)?;
let mut buf = [0u8; 64 * 1024];
loop {
let n = src_file.read(&mut buf)?;
if n == 0 {
break;
}
dst_file.write_all(&buf[..n])?;
}
dst_file.sync_all()?;
drop(dst_file);
if let Err(e) = fs::rename(&tmp, dst) {
let _ = fs::remove_file(&tmp);
return Err(e);
}
Ok(())
}
fn collect_shard_disk_volumes(loc: &DiskLocation) -> HashMap<EcKey, Vec<String>> {
let mut out: HashMap<EcKey, Vec<String>> = HashMap::new();
let Ok(read) = fs::read_dir(&loc.directory) else {
return out;
};
for ent in read.flatten() {
if ent.file_type().map(|ft| ft.is_dir()).unwrap_or(false) {
continue;
}
let name = ent.file_name().to_string_lossy().into_owned();
let Some(dot) = name.rfind('.') else {
continue;
};
let (base, ext) = name.split_at(dot);
if crate::storage::disk_location::is_ec_shard_extension(ext).is_none() {
continue;
}
match ent.metadata() {
Ok(meta) if meta.len() > 0 => {}
_ => continue,
}
let Some((collection, vid)) = parse_collection_volume_id_pub(base) else {
continue;
};
out.entry(EcKey { collection, vid })
.or_default()
.push(name);
}
out
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::MinFreeSpace;
use crate::storage::needle_map::NeedleMapKind;
use crate::storage::types::DiskType;
use crate::storage::volume::{VifEcShardConfig, VifVolumeInfo};
use tempfile::TempDir;
fn plant_shard(dir: &Path, collection: &str, vid: u32, shard_id: u8) {
let path = if collection.is_empty() {
dir.join(format!("{}.ec{:02}", vid, shard_id))
} else {
dir.join(format!("{}_{}.ec{:02}", collection, vid, shard_id))
};
fs::write(&path, b"shard data nonempty").unwrap();
}
fn plant_ecx(dir: &Path, collection: &str, vid: u32, bytes: &[u8]) {
let path = dir.join(format!("{}_{}.ecx", collection, vid));
fs::write(&path, bytes).unwrap();
}
fn plant_ecj(dir: &Path, collection: &str, vid: u32, bytes: &[u8]) {
let path = dir.join(format!("{}_{}.ecj", collection, vid));
fs::write(&path, bytes).unwrap();
}
fn plant_vif(dir: &Path, collection: &str, vid: u32, data_shards: u32, parity_shards: u32) {
let vif = VifVolumeInfo {
version: 3,
ec_shard_config: Some(VifEcShardConfig {
data_shards,
parity_shards,
}),
..Default::default()
};
let path = dir.join(format!("{}_{}.vif", collection, vid));
fs::write(&path, serde_json::to_string(&vif).unwrap()).unwrap();
}
fn add_loc(store: &mut Store, dir: &Path) {
store
.add_location(
dir.to_str().unwrap(),
dir.to_str().unwrap(),
100,
DiskType::HardDrive,
MinFreeSpace::Percent(0.0),
Vec::new(),
)
.unwrap();
}
#[test]
fn mirror_copies_sidecars_to_shard_only_disk() {
let tmp = TempDir::new().unwrap();
let dir0 = tmp.path().join("data0");
let dir1 = tmp.path().join("data1");
fs::create_dir_all(&dir0).unwrap();
fs::create_dir_all(&dir1).unwrap();
let collection = "video-recordings";
let vid = 4121u32;
plant_shard(&dir0, collection, vid, 0);
plant_shard(&dir0, collection, vid, 12);
plant_shard(&dir1, collection, vid, 1);
let ecx = vec![0xA1u8; 20];
let ecj = vec![0xB2u8; 16];
plant_ecx(&dir1, collection, vid, &ecx);
plant_ecj(&dir1, collection, vid, &ecj);
plant_vif(&dir1, collection, vid, 10, 4);
let mut store = Store::new(NeedleMapKind::InMemory);
add_loc(&mut store, &dir0);
add_loc(&mut store, &dir1);
for ext in [".ecx", ".ecj", ".vif"] {
let dst = dir0.join(format!("{}_{}{}", collection, vid, ext));
assert!(
dst.exists(),
"mirror did not install sidecar {} on dir0",
ext
);
}
let ecx_dst = fs::read(dir0.join(format!("{}_{}.ecx", collection, vid))).unwrap();
assert_eq!(ecx_dst, ecx, ".ecx mirrored bytes differ from source");
let ecj_dst = fs::read(dir0.join(format!("{}_{}.ecj", collection, vid))).unwrap();
assert_eq!(ecj_dst, ecj, ".ecj mirrored bytes differ from source");
}
#[test]
fn mirror_preserves_existing_destination_sidecars() {
let tmp = TempDir::new().unwrap();
let dir0 = tmp.path().join("data0");
let dir1 = tmp.path().join("data1");
fs::create_dir_all(&dir0).unwrap();
fs::create_dir_all(&dir1).unwrap();
let collection = "video-recordings";
let vid = 7777u32;
plant_shard(&dir0, collection, vid, 0);
plant_shard(&dir0, collection, vid, 12);
plant_shard(&dir1, collection, vid, 1);
let ecx_owner = vec![0xC3u8; 20];
let ecx_local = vec![0x5Au8; 20];
let ecj_bytes = vec![0xD4u8; 16];
plant_ecx(&dir1, collection, vid, &ecx_owner);
plant_ecj(&dir1, collection, vid, &ecj_bytes);
plant_vif(&dir1, collection, vid, 10, 4);
plant_ecx(&dir0, collection, vid, &ecx_local);
plant_ecj(&dir0, collection, vid, &ecj_bytes);
plant_vif(&dir0, collection, vid, 10, 4);
let mut store = Store::new(NeedleMapKind::InMemory);
add_loc(&mut store, &dir0);
add_loc(&mut store, &dir1);
let post = fs::read(dir0.join(format!("{}_{}.ecx", collection, vid))).unwrap();
assert_eq!(post, ecx_local, "mirror overwrote dir0's existing .ecx");
}
}
@@ -26,6 +26,14 @@ use crate::storage::store::Store;
use crate::storage::super_block::SUPER_BLOCK_SIZE;
use crate::storage::types::VolumeId;
pub(crate) fn ec_local_ecx_path(dir: &str, collection: &str, vid: VolumeId) -> String {
if collection.is_empty() {
format!("{}/{}.ecx", dir, vid.0)
} else {
format!("{}/{}_{}.ecx", dir, collection, vid.0)
}
}
/// Sibling-disk `.dat` candidate for `prune_incomplete_ec_with_sibling_dat`.
/// We record both the disk index and the file's size: the size is
/// consulted before deleting any EC artefacts. A zero-byte or truncated
@@ -76,10 +84,10 @@ impl Store {
return;
}
// Snapshot of orphan shards, keyed by (loc_idx, ec_key) so we
// can release the immutable borrow on self.locations before
// calling mount_ec_shards_with_idx_dir (which needs &mut).
let mut to_load: Vec<(usize, EcKey, Vec<(String, u32)>, EcxOwnerInfo)> = Vec::new();
// `use_local_idx` is the post-mirror fast path: when the
// mirror already installed sidecars locally, mount against
// loc.idx_directory instead of the owner disk.
let mut to_load: Vec<(usize, EcKey, Vec<(String, u32)>, EcxOwnerInfo, bool)> = Vec::new();
for (loc_idx, loc) in self.locations.iter().enumerate() {
let orphans = collect_orphan_ec_shards(loc, loc_idx);
for (key, shards) in orphans {
@@ -93,34 +101,57 @@ impl Store {
);
continue;
};
if owner.location == loc_idx && owner.idx_dir == loc.idx_directory {
// Normal same-disk case: load_all_ec_shards already
// attempted the mount via `loc.idx_directory` and
// logged the underlying failure. No point retrying
// the same call.
let local_ecx = ec_local_ecx_path(&loc.idx_directory, &key.collection, key.vid);
let local_ecx_in_data = ec_local_ecx_path(&loc.directory, &key.collection, key.vid);
let use_local_idx = std::path::Path::new(&local_ecx).exists()
|| std::path::Path::new(&local_ecx_in_data).exists();
if !use_local_idx
&& owner.location == loc_idx
&& owner.idx_dir == loc.idx_directory
{
// Same-disk no-op: load_all_ec_shards already
// tried and logged the failure.
continue;
}
// Either a cross-disk owner OR a same-disk owner whose
// `.ecx` actually lives in `loc.directory` (the legacy
// pre-`-dir.idx` layout). The latter wasn't tried by
// load_all_ec_shards, which only looked in
// `self.idx_directory`, so we still need to retry it
// here with the owner's discovered idx_dir.
to_load.push((loc_idx, key, shards, owner.clone()));
to_load.push((loc_idx, key, shards, owner.clone(), use_local_idx));
}
}
for (loc_idx, key, shards, owner) in to_load {
for (loc_idx, key, shards, owner, use_local_idx) in to_load {
let shard_names: Vec<&str> = shards.iter().map(|(n, _)| n.as_str()).collect();
let loc_dir = self.locations[loc_idx].directory.clone();
let shard_ids: Vec<u32> = shards.iter().map(|(_, sid)| *sid).collect();
if use_local_idx {
info!(
volume_id = key.vid.0,
collection = %key.collection,
directory = %loc_dir,
"loading orphan EC shards against locally-mirrored sidecars: {:?}",
shard_names,
);
let loc = &mut self.locations[loc_idx];
if let Err(e) = loc.mount_ec_shards(key.vid, &key.collection, &shard_ids, "") {
loc.unmount_ec_shards(key.vid, &shard_ids);
warn!(
volume_id = key.vid.0,
directory = %loc_dir,
"local-mirror shard load failed: {}",
e,
);
}
continue;
}
info!(
volume_id = key.vid.0,
collection = %key.collection,
from = %self.locations[owner.location].directory,
to = %self.locations[loc_idx].directory,
to = %loc_dir,
"loading orphan EC shards using index files from sibling disk (issue #9212): {:?}",
shard_names,
);
let shard_ids: Vec<u32> = shards.iter().map(|(_, sid)| *sid).collect();
let owner_idx_dir = owner.idx_dir.clone();
let loc = &mut self.locations[loc_idx];
if let Err(e) = loc.mount_ec_shards_with_idx_dir(
@@ -141,7 +172,7 @@ impl Store {
loc.unmount_ec_shards(key.vid, &shard_ids);
warn!(
volume_id = key.vid.0,
directory = %loc.directory,
directory = %loc_dir,
"cross-disk shard load failed: {}",
e,
);
@@ -205,8 +205,21 @@ func TestEcLifecycleAcrossMultipleDisks(t *testing.T) {
t.Fatalf("VolumeEcShardsRebuild expected to rebuild shard %d, got %v",
repairTargetShard, rebuildResp.GetRebuiltShardIds())
}
if _, statErr := os.Stat(shardPath); statErr != nil {
t.Fatalf("rebuild did not restore shard %d on disk 0 (%s): %v", repairTargetShard, shardPath, statErr)
// The rebuilder picks whichever disk hosts the most shards plus a
// matching .ecx. After the boot-time mirror runs, every shard-
// bearing disk owns its own sidecars, so rebuild can legitimately
// restore the shard on either disk — accept both.
rebuiltOn := -1
for i, dir := range dataDirs {
candidate := filepath.Join(dir, shardFileName(collection, volumeID, repairTargetShard))
if _, statErr := os.Stat(candidate); statErr == nil {
rebuiltOn = i
shardPath = candidate
break
}
}
if rebuiltOn < 0 {
t.Fatalf("rebuild did not restore shard %d on any disk (checked %v)", repairTargetShard, dataDirs)
}
if _, err := grpcClient3.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
+9 -7
View File
@@ -164,13 +164,15 @@ func NewStore(
// we just cleaned up.
s.pruneIncompleteEcWithSiblingDat()
// After every DiskLocation has finished its per-disk EC scan, sweep the
// store for shards that live on a disk without local index files and
// load them by reaching across to a sibling disk's .ecx / .ecj / .vif.
// This is the volume-server side of issue #9212: ec.balance can move
// shards onto a destination node's second disk while leaving the index
// on the disk that already held the volume, and without this pass those
// orphan shards stay invisible to the master.
// Physically mirror EC sidecars onto every shard-bearing disk so
// each disk mounts self-contained. Must run before the cross-disk
// reconciler so the orphan pass can prefer the local IdxDirectory.
s.mirrorEcMetadataToShardDisks()
// Cross-disk fallback for orphan shards — ec.balance can land
// shards on one disk while leaving the index on another. Still
// needed after the mirror pass for volumes whose mirror failed
// (read-only target, out of space, partial copy).
s.reconcileEcShardsAcrossDisks()
// Resolve state.pb's directory via the first disk location so it inherits
+179
View File
@@ -0,0 +1,179 @@
package storage
import (
"fmt"
"io"
"os"
"path/filepath"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
// Listed in the order NewEcVolume opens them.
var ecMirroredSidecars = []string{".ecx", ".ecj", ".vif"}
// mirrorEcMetadataToShardDisks physically copies .ecx / .ecj / .vif
// onto every disk that holds EC shards but lacks the matching
// sidecars, so each shard-bearing disk mounts self-contained instead
// of reaching across to a sibling. Runs before
// reconcileEcShardsAcrossDisks; the cross-disk virtual mount stays
// as the fallback when mirroring fails.
func (s *Store) mirrorEcMetadataToShardDisks() {
if len(s.Locations) < 2 {
return
}
ecxOwners := s.indexEcxOwners()
if len(ecxOwners) == 0 {
return
}
for _, loc := range s.Locations {
orphans := loc.collectOrphanEcShards()
if len(orphans) == 0 {
continue
}
for key := range orphans {
owner, ok := ecxOwners[key]
if !ok {
continue
}
if owner.location == loc {
continue
}
if loc.hasAllEcSidecarsLocally(key.collection, key.vid) {
continue
}
copied, err := loc.mirrorEcSidecarsFrom(owner, key.collection, key.vid)
if err != nil {
glog.Warningf("ec volume %d (collection=%q): mirror sidecars from %s to %s failed after %d files: %v; cross-disk fallback will handle this volume",
key.vid, key.collection, owner.location.Directory, loc.Directory, copied, err)
continue
}
if copied > 0 {
glog.V(0).Infof("ec volume %d (collection=%q): mirrored %d sidecar(s) from %s to %s for same-disk invariant",
key.vid, key.collection, copied, owner.location.Directory, loc.Directory)
}
}
}
}
// hasAllEcSidecarsLocally checks both the modern routing and the
// opposite directory (legacy pre-`-dir.idx` layout) — without the
// fallback, a destination that still has .ecx in its data dir would
// be re-mirrored into IdxDirectory.
func (l *DiskLocation) hasAllEcSidecarsLocally(collection string, vid needle.VolumeId) bool {
for _, ext := range ecMirroredSidecars {
if statRegular(l.ecSidecarDestPath(collection, vid, ext)) {
continue
}
if l.IdxDirectory != l.Directory {
fallbackDir := l.Directory
if ext == ".vif" {
fallbackDir = l.IdxDirectory
}
if statRegular(erasure_coding.EcShardFileName(collection, fallbackDir, int(vid)) + ext) {
continue
}
}
return false
}
return true
}
func statRegular(path string) bool {
info, err := os.Stat(path)
return err == nil && !info.IsDir()
}
// ecSidecarDestPath routes .ecx/.ecj to IdxDirectory and .vif to
// Directory, matching NewEcVolume's open order.
func (l *DiskLocation) ecSidecarDestPath(collection string, vid needle.VolumeId, ext string) string {
if ext == ".vif" {
return erasure_coding.EcShardFileName(collection, l.Directory, int(vid)) + ext
}
return erasure_coding.EcShardFileName(collection, l.IdxDirectory, int(vid)) + ext
}
func (l *DiskLocation) mirrorEcSidecarsFrom(owner ecxOwnerInfo, collection string, vid needle.VolumeId) (int, error) {
srcIdxBase := erasure_coding.EcShardFileName(collection, owner.idxDir, int(vid))
srcDataBase := erasure_coding.EcShardFileName(collection, owner.location.Directory, int(vid))
copied := 0
for _, ext := range ecMirroredSidecars {
dst := l.ecSidecarDestPath(collection, vid, ext)
// An existing local copy is authoritative — it may be newer
// than the owner's after a delete journal append.
if _, err := os.Stat(dst); err == nil {
continue
} else if !os.IsNotExist(err) {
return copied, fmt.Errorf("stat %s: %w", dst, err)
}
var src string
for _, candidate := range []string{srcIdxBase + ext, srcDataBase + ext} {
if info, err := os.Stat(candidate); err == nil && !info.IsDir() {
src = candidate
break
}
}
if src == "" {
glog.V(1).Infof("ec volume %d (collection=%q): sidecar %s not found on owner %s; skipping mirror",
vid, collection, ext, owner.location.Directory)
continue
}
if err := copyEcSidecarAtomic(src, dst); err != nil {
return copied, fmt.Errorf("copy %s: %w", ext, err)
}
copied++
}
return copied, nil
}
// copyEcSidecarAtomic writes to <dst>.mirror.tmp, fsyncs, renames.
// Crash-safe: a partial write leaves the tmp orphaned and the
// canonical dst absent so retries recognise the file still needs
// copying.
func copyEcSidecarAtomic(src, dst string) error {
srcFile, err := os.Open(src)
if err != nil {
return fmt.Errorf("open source %s: %w", src, err)
}
defer srcFile.Close()
if err := os.MkdirAll(filepath.Dir(dst), 0o755); err != nil {
return fmt.Errorf("mkdir %s: %w", filepath.Dir(dst), err)
}
tmpDst := dst + ".mirror.tmp"
_ = os.Remove(tmpDst)
dstFile, err := os.OpenFile(tmpDst, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o644)
if err != nil {
return fmt.Errorf("create %s: %w", tmpDst, err)
}
cleanup := func() {
_ = dstFile.Close()
_ = os.Remove(tmpDst)
}
if _, err := io.Copy(dstFile, srcFile); err != nil {
cleanup()
return fmt.Errorf("copy bytes %s -> %s: %w", src, tmpDst, err)
}
if err := dstFile.Sync(); err != nil {
cleanup()
return fmt.Errorf("fsync %s: %w", tmpDst, err)
}
if err := dstFile.Close(); err != nil {
_ = os.Remove(tmpDst)
return fmt.Errorf("close %s: %w", tmpDst, err)
}
if err := os.Rename(tmpDst, dst); err != nil {
_ = os.Remove(tmpDst)
return fmt.Errorf("rename %s -> %s: %w", tmpDst, dst, err)
}
return nil
}
+278
View File
@@ -0,0 +1,278 @@
package storage
import (
"bytes"
"os"
"path/filepath"
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// Orphan layout: shards on dir0, sidecars on dir1. After NewStore,
// dir0 must own a local copy of every sidecar.
func TestMirrorEcMetadataOnStartup_PhysicallyCopiesSidecars(t *testing.T) {
tempDir := t.TempDir()
dir0 := filepath.Join(tempDir, "data1")
dir1 := filepath.Join(tempDir, "data2")
for _, d := range []string{dir0, dir1} {
if err := os.MkdirAll(d, 0o755); err != nil {
t.Fatalf("mkdir %s: %v", d, err)
}
}
collection := "video-recordings"
vid := needle.VolumeId(4121)
const dataShards, parityShards = 10, 4
const datSize int64 = 10 * 1024 * 1024
expectedShardSize := calculateExpectedShardSize(datSize, dataShards)
writeShard := func(dir string, shardId int) {
t.Helper()
base := erasure_coding.EcShardFileName(collection, dir, int(vid))
f, err := os.Create(base + erasure_coding.ToExt(shardId))
if err != nil {
t.Fatalf("create shard %d in %s: %v", shardId, dir, err)
}
if err := f.Truncate(expectedShardSize); err != nil {
f.Close()
t.Fatalf("truncate shard %d in %s: %v", shardId, dir, err)
}
f.Close()
}
writeShard(dir0, 0)
writeShard(dir0, 12)
writeShard(dir1, 1)
base1 := erasure_coding.EcShardFileName(collection, dir1, int(vid))
ecxBytes := bytes.Repeat([]byte{0xA1}, 20)
ecjBytes := bytes.Repeat([]byte{0xB2}, 16)
if err := os.WriteFile(base1+".ecx", ecxBytes, 0o644); err != nil {
t.Fatalf("write .ecx: %v", err)
}
if err := os.WriteFile(base1+".ecj", ecjBytes, 0o644); err != nil {
t.Fatalf("write .ecj: %v", err)
}
if err := volume_info.SaveVolumeInfo(base1+".vif", &volume_server_pb.VolumeInfo{
Version: uint32(needle.Version3),
DatFileSize: datSize,
EcShardConfig: &volume_server_pb.EcShardConfig{
DataShards: dataShards,
ParityShards: parityShards,
},
}); err != nil {
t.Fatalf("save .vif: %v", err)
}
store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id",
[]string{dir0, dir1},
[]int32{100, 100},
[]util.MinFreeSpace{{}, {}},
"",
NeedleMapInMemory,
[]types.DiskType{types.HardDriveType, types.HardDriveType},
nil,
3,
)
done := make(chan struct{})
go func() {
for {
select {
case <-store.NewEcShardsChan:
case <-store.NewVolumesChan:
case <-store.DeletedVolumesChan:
case <-store.DeletedEcShardsChan:
case <-store.StateUpdateChan:
case <-done:
return
}
}
}()
t.Cleanup(func() {
store.Close()
close(done)
})
base0 := erasure_coding.EcShardFileName(collection, dir0, int(vid))
// .vif drifts on first mount via version-correction, so byte
// equality is only the right invariant for .ecx and .ecj.
for _, ext := range []string{".ecx", ".ecj"} {
gotBytes, err := os.ReadFile(base0 + ext)
if err != nil {
t.Errorf("sidecar %s not mirrored to dir0: %v", ext, err)
continue
}
wantBytes, err := os.ReadFile(base1 + ext)
if err != nil {
t.Fatalf("read source %s: %v", base1+ext, err)
}
if !bytes.Equal(gotBytes, wantBytes) {
t.Errorf("sidecar %s content mismatch between dir0 and dir1", ext)
}
}
dir0Info, _, found, err := volume_info.MaybeLoadVolumeInfo(base0 + ".vif")
if err != nil || !found {
t.Errorf("dir0 .vif missing or unreadable after mirror: err=%v found=%v", err, found)
} else if dir0Info.EcShardConfig == nil ||
dir0Info.EcShardConfig.DataShards != dataShards ||
dir0Info.EcShardConfig.ParityShards != parityShards {
t.Errorf("dir0 .vif has wrong EC ratio after mirror: got %+v, want %d+%d",
dir0Info.EcShardConfig, dataShards, parityShards)
}
loc0 := store.Locations[0]
ev0, found := loc0.FindEcVolume(vid)
if !found {
t.Fatalf("dir0 EcVolume %d not loaded after startup mirror", vid)
}
for _, sid := range []erasure_coding.ShardId{0, 12} {
if _, ok := ev0.FindEcVolumeShard(sid); !ok {
t.Errorf("shard %d not registered on dir0 after mirror", sid)
}
}
if got, want := filepath.Dir(ev0.FileName(".ecx")), dir0; got != want {
t.Errorf("dir0 EcVolume .ecx resolved at %q, want %q (local mirrored copy)", got, want)
}
loc1 := store.Locations[1]
ev1, found := loc1.FindEcVolume(vid)
if !found {
t.Fatalf("dir1 EcVolume %d not loaded after startup mirror", vid)
}
if _, ok := ev1.FindEcVolumeShard(1); !ok {
t.Errorf("dir1 shard 1 missing after mirror")
}
for _, sid := range []int{0, 12} {
shardPath := base0 + erasure_coding.ToExt(sid)
fi, err := os.Stat(shardPath)
if err != nil {
t.Errorf("shard %d on dir0 lost after mirror: %v", sid, err)
continue
}
if fi.Size() != expectedShardSize {
t.Errorf("shard %d on dir0 truncated by mirror pass: size %d, want %d", sid, fi.Size(), expectedShardSize)
}
}
}
// A pre-existing destination .ecx must not be overwritten — local
// copies are authoritative because they may have absorbed delete
// journal updates not yet on the owner.
func TestMirrorEcMetadataOnStartup_NoOpWhenAlreadyMirrored(t *testing.T) {
tempDir := t.TempDir()
dir0 := filepath.Join(tempDir, "data1")
dir1 := filepath.Join(tempDir, "data2")
for _, d := range []string{dir0, dir1} {
if err := os.MkdirAll(d, 0o755); err != nil {
t.Fatalf("mkdir %s: %v", d, err)
}
}
collection := "video-recordings"
vid := needle.VolumeId(7777)
const dataShards, parityShards = 10, 4
const datSize int64 = 10 * 1024 * 1024
expectedShardSize := calculateExpectedShardSize(datSize, dataShards)
plantShard := func(dir string, shardId int) {
base := erasure_coding.EcShardFileName(collection, dir, int(vid))
f, err := os.Create(base + erasure_coding.ToExt(shardId))
if err != nil {
t.Fatalf("create shard %d in %s: %v", shardId, dir, err)
}
if err := f.Truncate(expectedShardSize); err != nil {
f.Close()
t.Fatalf("truncate shard %d in %s: %v", shardId, dir, err)
}
f.Close()
}
plantSidecars := func(dir string, ecx, ecj []byte) {
base := erasure_coding.EcShardFileName(collection, dir, int(vid))
if err := os.WriteFile(base+".ecx", ecx, 0o644); err != nil {
t.Fatalf("write %s.ecx: %v", base, err)
}
if err := os.WriteFile(base+".ecj", ecj, 0o644); err != nil {
t.Fatalf("write %s.ecj: %v", base, err)
}
if err := volume_info.SaveVolumeInfo(base+".vif", &volume_server_pb.VolumeInfo{
Version: uint32(needle.Version3),
DatFileSize: datSize,
EcShardConfig: &volume_server_pb.EcShardConfig{
DataShards: dataShards,
ParityShards: parityShards,
},
}); err != nil {
t.Fatalf("save %s.vif: %v", base, err)
}
}
plantShard(dir0, 0)
plantShard(dir0, 12)
plantShard(dir1, 1)
// Deliberately different .ecx bytes on each side so any overwrite
// shows up as a diff. .ecx never rewrites at mount, unlike .vif.
ecxOwner := bytes.Repeat([]byte{0xC3}, 20)
ecxLocal := bytes.Repeat([]byte{0x5A}, 20)
ecjBytes := bytes.Repeat([]byte{0xD4}, 16)
plantSidecars(dir1, ecxOwner, ecjBytes)
plantSidecars(dir0, ecxLocal, ecjBytes)
base0 := erasure_coding.EcShardFileName(collection, dir0, int(vid))
store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id",
[]string{dir0, dir1},
[]int32{100, 100},
[]util.MinFreeSpace{{}, {}},
"",
NeedleMapInMemory,
[]types.DiskType{types.HardDriveType, types.HardDriveType},
nil,
3,
)
done := make(chan struct{})
go func() {
for {
select {
case <-store.NewEcShardsChan:
case <-store.NewVolumesChan:
case <-store.DeletedVolumesChan:
case <-store.DeletedEcShardsChan:
case <-store.StateUpdateChan:
case <-done:
return
}
}
}()
t.Cleanup(func() {
store.Close()
close(done)
})
postEcx, err := os.ReadFile(base0 + ".ecx")
if err != nil {
t.Fatalf("read dir0 .ecx after NewStore: %v", err)
}
if !bytes.Equal(ecxLocal, postEcx) {
t.Errorf("dir0 .ecx was overwritten by mirror pass; pre-existing destination must be preserved (got first 4 bytes %x, want %x)",
postEcx[:4], ecxLocal[:4])
}
if loc0 := store.Locations[0]; loc0 == nil {
t.Fatal("loc0 unexpectedly nil")
} else if ev, found := loc0.FindEcVolume(vid); !found {
t.Errorf("dir0 EcVolume %d not loaded", vid)
} else if got, want := filepath.Dir(ev.FileName(".ecx")), dir0; got != want {
t.Errorf("dir0 .ecx resolved at %q, want %q", got, want)
}
}
+11
View File
@@ -79,6 +79,17 @@ func (s *Store) reconcileEcShardsAcrossDisks() {
key.vid, key.collection, loc.Directory, shards)
continue
}
// Post-mirror fast path: when the local .ecx is present,
// mount self-contained against IdxDirectory instead of
// the owner disk.
if loc.HasEcxFileOnDisk(key.collection, key.vid) {
glog.V(0).Infof("ec volume %d (collection=%q): loading orphan shards %v on %s against locally-mirrored sidecars",
key.vid, key.collection, shards, loc.Directory)
if err := loc.loadEcShards(shards, key.collection, key.vid, loc.ecShardNotifyHandler); err != nil {
glog.Errorf("ec volume %d on %s: local-mirror shard load failed: %v", key.vid, loc.Directory, err)
}
continue
}
if owner.location == loc {
// .ecx is on this same disk, but loadAllEcShards still
// did not load these shards — handleFoundEcxFile already