mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
fix(storage): never let an empty .dat delete healthy distributed EC shards (#9930)
* fix(storage): never let an empty .dat delete healthy distributed EC shards

  A leftover empty .dat stub (a phantom from the pre-fix loader; zero needles) next to a distributed EC volume's local shards made startup classify the volume as an interrupted local encode: validateEcVolume requires >= dataShards local shards when a .dat is present, fails with the 1-2 shards a distributed volume keeps per disk, and the cleanup deletes those shards -- the only copies of that part of the volume. Repeated across restart waves this destroys enough shards cluster-wide to make the volume unrecoverable.

  Go:
  - loadExistingVolume: hoist the empty-stub sweep above the EC presence checks. Previously the .vif-next-to-.ecx guard returned before the sweep ever ran, so exactly the dangerous layout (stub + .ecx + local shards) kept its stub and then lost its shards in loadAllEcShards.
  - validateEcVolume / checkDatFileExists: treat a .dat <= a superblock (zero needles) as absent. An empty .dat cannot be the encode source, so it must never gate shard deletion; this also covers stubs without a .vif, which the sweep cannot prove are EC leftovers.

  Rust mirror (seaweed-volume): the same gate in validate_ec_volume and check_dat_file_exists (the Rust sweep already ran before validation); the volume-load skip keeps a plain existence check so fresh, needle-less volumes still load.

  Regression tests in Go and Rust reproduce the production layout (a zero-byte .dat beside .ecx/.ecj and two shards of a 10+4 volume, with and without a .vif) and fail without the fix because the shards are deleted.

* fix(ec): gate source volume deletion on a recoverable shard set

  After EC encode, the shell command and the (plugin) worker task refused to delete the source volume unless every shard was present, and aborted otherwise -- leaving the source .dat next to live shards, exactly the mixed state the startup cleanup mishandles.

  Replace the full-set requirement with a recoverability gate shared by both callers (RequireRecoverableShardSet): deleting a non-empty source .dat requires at least dataShards distinct shards cluster-wide. Below that the source is kept and the encode fails as before. A degraded but recoverable set (>= dataShards, < total) now proceeds with a warning instead of aborting: the missing shards can be rebuilt from the survivors, while keeping the source would preserve the dangerous mixed state.

  Empty stub replicas are still swept unguarded (OnlyEmpty) -- an empty .dat has nothing to lose.

  dataShards/totalShards stay parameters so enterprise custom EC ratios share the helper verbatim.

* test(ec): use recoverable shard verification gate
This commit is contained in:
@@ -192,8 +192,14 @@ impl DiskLocation {
|
||||
// the sibling-.dat prune deletes real shards against. Remote-tiered
|
||||
// volumes also have no local `.dat`, but their `.vif` points at
|
||||
// remote files and must still load via the remote path.
|
||||
// Plain existence check (not check_dat_file_exists): an empty
|
||||
// .dat here is a fresh, needle-less volume that must still load.
|
||||
// A stat error other than NotFound counts as present so a
|
||||
// transient error doesn't skip a real volume.
|
||||
let dat_path = format!("{}.dat", volume_name);
|
||||
if !check_dat_file_exists(&dat_path)
|
||||
let dat_missing = matches!(fs::metadata(&dat_path),
|
||||
Err(ref e) if e.kind() == io::ErrorKind::NotFound);
|
||||
if dat_missing
|
||||
&& !vif_references_remote_file(&format!("{}.vif", volume_name))
|
||||
&& !vif_references_remote_file(&format!("{}.vif", idx_name))
|
||||
{
|
||||
@@ -243,15 +249,21 @@ impl DiskLocation {
|
||||
let dat_path = format!("{}.dat", base);
|
||||
|
||||
let mut expected_shard_size: Option<i64> = None;
|
||||
let dat_exists = std::path::Path::new(&dat_path).exists();
|
||||
|
||||
if dat_exists {
|
||||
if let Ok(meta) = fs::metadata(&dat_path) {
|
||||
// An empty .dat (<= a superblock, zero needles) cannot be the encode
|
||||
// source -- it is a leftover stub -- so treat it as absent rather than
|
||||
// letting it mark a healthy distributed EC volume as an interrupted
|
||||
// local encode, which would delete its shards.
|
||||
let dat_exists = match fs::metadata(&dat_path) {
|
||||
Ok(meta) if meta.len() > SUPER_BLOCK_SIZE as u64 => {
|
||||
expected_shard_size = Some(calculate_expected_shard_size(meta.len() as i64));
|
||||
} else {
|
||||
return false;
|
||||
true
|
||||
}
|
||||
}
|
||||
Ok(_) => false,
|
||||
Err(e) if e.kind() == io::ErrorKind::NotFound => false,
|
||||
// Unexpected stat error: don't risk classifying local EC as
|
||||
// distributed; fail validation instead of deleting anything.
|
||||
Err(_) => return false,
|
||||
};
|
||||
|
||||
let mut shard_count = 0usize;
|
||||
let mut actual_shard_size: Option<i64> = None;
|
||||
@@ -1060,12 +1072,14 @@ fn parse_ec_shard_extension(ext: &str) -> Option<u32> {
|
||||
Some(id)
|
||||
}
|
||||
|
||||
/// Robust `.dat` existence check: any unexpected stat error (permission,
|
||||
/// I/O) is treated as "exists" so we don't misclassify local EC as
|
||||
/// distributed EC. Mirrors `checkDatFileExists` in Go.
|
||||
/// Robust check that a `.dat` with actual data exists. An empty `.dat`
|
||||
/// (<= a superblock, zero needles) is a leftover stub, not an encode source,
|
||||
/// and counts as absent so it never justifies deleting shards. Any unexpected
|
||||
/// stat error (permission, I/O) is treated as "exists" so we don't misclassify
|
||||
/// local EC as distributed EC. Mirrors `checkDatFileExists` in Go.
|
||||
fn check_dat_file_exists(path: &str) -> bool {
|
||||
match fs::metadata(path) {
|
||||
Ok(_) => true,
|
||||
Ok(meta) => meta.len() > SUPER_BLOCK_SIZE as u64,
|
||||
Err(e) if e.kind() == io::ErrorKind::NotFound => false,
|
||||
Err(_) => true,
|
||||
}
|
||||
|
||||
@@ -613,6 +613,97 @@ mod tests {
|
||||
assert_eq!(total, 2, "real EC shards were deleted (total={})", total);
|
||||
}
|
||||
|
||||
/// A disk holding a few local shards of a healthy distributed EC volume
|
||||
/// plus a leftover empty `.dat` stub: the stub must be swept before EC
|
||||
/// validation can mistake the volume for an interrupted local encode
|
||||
/// (fewer than data_shards local shards) and delete the only copies of
|
||||
/// those shards.
|
||||
#[test]
|
||||
fn test_empty_dat_stub_next_to_ecx_does_not_delete_shards() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let d0 = tmp.path().join("data0");
|
||||
std::fs::create_dir_all(&d0).unwrap();
|
||||
let dir = d0.to_str().unwrap();
|
||||
let coll = "warp-rec";
|
||||
let vid = 87u32;
|
||||
|
||||
write_shard(dir, coll, vid, 0);
|
||||
write_shard(dir, coll, vid, 5);
|
||||
write_index_files(dir, coll, vid, 10, 4);
|
||||
// Zero-byte stub .dat: the phantom left by the pre-fix loader.
|
||||
std::fs::write(d0.join(format!("{}_{}.dat", coll, vid)), b"").unwrap();
|
||||
|
||||
let mut store = Store::new(NeedleMapKind::InMemory);
|
||||
store
|
||||
.add_location(
|
||||
dir,
|
||||
dir,
|
||||
100,
|
||||
DiskType::HardDrive,
|
||||
MinFreeSpace::Percent(0.0),
|
||||
Vec::new(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert!(
|
||||
!d0.join(format!("{}_{}.dat", coll, vid)).exists(),
|
||||
"empty .dat stub was not swept"
|
||||
);
|
||||
assert_eq!(
|
||||
store.locations[0].ec_shard_count(),
|
||||
2,
|
||||
"EC shards were deleted on the stub's account"
|
||||
);
|
||||
assert_eq!(
|
||||
store.locations[0].volume_ids().len(),
|
||||
0,
|
||||
"stub was loaded as a phantom volume"
|
||||
);
|
||||
}
|
||||
|
||||
/// Same shard-holding disk but the stub has no `.vif` at all, so the
|
||||
/// sweep has no EC evidence and must leave it. The empty `.dat` still
|
||||
/// must not count as an encode source: the shards survive and load as
|
||||
/// a distributed EC volume.
|
||||
#[test]
|
||||
fn test_empty_dat_without_vif_does_not_delete_shards() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let d0 = tmp.path().join("data0");
|
||||
std::fs::create_dir_all(&d0).unwrap();
|
||||
let dir = d0.to_str().unwrap();
|
||||
let coll = "warp-rec";
|
||||
let vid = 88u32;
|
||||
|
||||
write_shard(dir, coll, vid, 1);
|
||||
write_shard(dir, coll, vid, 7);
|
||||
write_index_files(dir, coll, vid, 10, 4);
|
||||
std::fs::remove_file(d0.join(format!("{}_{}.vif", coll, vid))).unwrap();
|
||||
std::fs::write(d0.join(format!("{}_{}.dat", coll, vid)), b"").unwrap();
|
||||
|
||||
let mut store = Store::new(NeedleMapKind::InMemory);
|
||||
store
|
||||
.add_location(
|
||||
dir,
|
||||
dir,
|
||||
100,
|
||||
DiskType::HardDrive,
|
||||
MinFreeSpace::Percent(0.0),
|
||||
Vec::new(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
store.locations[0].ec_shard_count(),
|
||||
2,
|
||||
"EC shards were deleted on the empty .dat's account"
|
||||
);
|
||||
assert_eq!(
|
||||
store.locations[0].volume_ids().len(),
|
||||
0,
|
||||
"empty .dat was loaded as a phantom volume"
|
||||
);
|
||||
}
|
||||
|
||||
/// Regression for the prune credibility gate: when the EC `.vif`
|
||||
/// records no source size (`dat_file_size == 0`), an empty 8-byte
|
||||
/// sibling `.dat` stub must NOT justify deleting partial EC shards.
|
||||
|
||||
@@ -159,15 +159,22 @@ func TestVolumeEcShardsInfoReturnsAllShardsAcrossDisks(t *testing.T) {
|
||||
// End-to-end assertion via the same helper the worker uses to gate
|
||||
// source-volume deletion (weed/worker/tasks/erasure_coding/ec_task.go
|
||||
// verifyEcShardsBeforeDelete). The union across destinations is what
|
||||
// RequireFullShardSet measures; with one destination that holds every
|
||||
// shard, the union must cover dataShards + parityShards.
|
||||
// RequireRecoverableShardSet measures; with one destination that holds
|
||||
// every shard, the union must cover dataShards + parityShards without
|
||||
// being reported as degraded.
|
||||
dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
|
||||
servers := []string{clusterHarness.VolumeServerAddress()}
|
||||
union, perServer := erasure_coding.VerifyShardsAcrossServers(ctx, volumeID, servers, dialOption)
|
||||
if err := erasure_coding.RequireFullShardSet(volumeID, union, erasure_coding.TotalShardsCount); err != nil {
|
||||
degraded, err := erasure_coding.RequireRecoverableShardSet(
|
||||
volumeID, union, erasure_coding.DataShardsCount, erasure_coding.TotalShardsCount)
|
||||
if err != nil {
|
||||
t.Fatalf("verifyEcShardsBeforeDelete-equivalent gate failed: %v\nper-server inventory: %s\nper-disk layout: %v",
|
||||
err, erasure_coding.SummarizeShardInventory(perServer), postReconcileLayout)
|
||||
}
|
||||
if degraded {
|
||||
t.Fatalf("verifyEcShardsBeforeDelete-equivalent gate reported a degraded shard set\nper-server inventory: %s\nper-disk layout: %v",
|
||||
erasure_coding.SummarizeShardInventory(perServer), postReconcileLayout)
|
||||
}
|
||||
if got, want := union.Count(), erasure_coding.TotalShardsCount; got != want {
|
||||
t.Fatalf("VerifyShardsAcrossServers union covered %d/%d shards (per-server=%s, layout=%v)",
|
||||
got, want, erasure_coding.SummarizeShardInventory(perServer), postReconcileLayout)
|
||||
|
||||
@@ -594,12 +594,16 @@ func verifyEcShardsBeforeDelete(commandEnv *CommandEnv, volumeIds []needle.Volum
|
||||
// volume-server heartbeats, so freshly distributed shards may not all be
|
||||
// visible in the master topology immediately. Poll a few times before
|
||||
// concluding the shard set is incomplete, so a heartbeat-propagation lag is
|
||||
// not mistaken for missing data (which would abort the encode). Genuine loss
|
||||
// still fails after the retries are exhausted.
|
||||
// not mistaken for missing data. After the retries: a volume below the
|
||||
// recoverable threshold (dataShards) aborts the deletion; a recoverable
|
||||
// but degraded set proceeds with a warning, since the missing shards can
|
||||
// be rebuilt from the survivors while keeping the source next to live
|
||||
// shards is the more dangerous mixed state.
|
||||
const maxAttempts = 10
|
||||
const retryInterval = 2 * time.Second
|
||||
|
||||
var lastErr error
|
||||
var lastDegraded []string
|
||||
for attempt := 0; attempt < maxAttempts; attempt++ {
|
||||
topoInfo, _, err := collectTopologyInfo(commandEnv, 0)
|
||||
if err != nil {
|
||||
@@ -607,6 +611,7 @@ func verifyEcShardsBeforeDelete(commandEnv *CommandEnv, volumeIds []needle.Volum
|
||||
}
|
||||
|
||||
lastErr = nil
|
||||
lastDegraded = lastDegraded[:0]
|
||||
for _, vid := range volumeIds {
|
||||
nodeShards, _ := collectEcNodeShardsInfo(topoInfo, vid, diskType)
|
||||
|
||||
@@ -616,7 +621,8 @@ func verifyEcShardsBeforeDelete(commandEnv *CommandEnv, volumeIds []needle.Volum
|
||||
}
|
||||
|
||||
totalShards := erasure_coding.TotalShardsCount
|
||||
if err := erasure_coding.RequireFullShardSet(uint32(vid), union, totalShards); err != nil {
|
||||
degraded, err := erasure_coding.RequireRecoverableShardSet(uint32(vid), union, erasure_coding.DataShardsCount, totalShards)
|
||||
if err != nil {
|
||||
summary := make([]string, 0, len(nodeShards))
|
||||
for node, info := range nodeShards {
|
||||
summary = append(summary, fmt.Sprintf("%s=%s", node, info.String()))
|
||||
@@ -625,23 +631,32 @@ func verifyEcShardsBeforeDelete(commandEnv *CommandEnv, volumeIds []needle.Volum
|
||||
lastErr = fmt.Errorf("volume %d: %w (observed: %v)", vid, err, summary)
|
||||
break
|
||||
}
|
||||
if degraded {
|
||||
lastDegraded = append(lastDegraded, fmt.Sprintf("volume %d: %d/%d shards", vid, union.Count(), totalShards))
|
||||
continue
|
||||
}
|
||||
|
||||
glog.V(0).Infof("EC shard verification ok for volume %d on diskType %q: %d/%d shards present across %d nodes",
|
||||
vid, diskType.ReadableString(), union.Count(), totalShards, len(nodeShards))
|
||||
}
|
||||
|
||||
if lastErr == nil {
|
||||
if lastErr == nil && len(lastDegraded) == 0 {
|
||||
return nil
|
||||
}
|
||||
if attempt < maxAttempts-1 {
|
||||
glog.V(0).Infof("EC shard verification incomplete (attempt %d/%d), waiting for shard locations to propagate: %v",
|
||||
attempt+1, maxAttempts, lastErr)
|
||||
glog.V(0).Infof("EC shard verification incomplete (attempt %d/%d), waiting for shard locations to propagate: %v %v",
|
||||
attempt+1, maxAttempts, lastErr, lastDegraded)
|
||||
time.Sleep(retryInterval)
|
||||
}
|
||||
}
|
||||
|
||||
glog.Errorf("EC shard verification failed after %d attempts: %v", maxAttempts, lastErr)
|
||||
return lastErr
|
||||
if lastErr != nil {
|
||||
glog.Errorf("EC shard verification failed after %d attempts: %v", maxAttempts, lastErr)
|
||||
return lastErr
|
||||
}
|
||||
glog.Warningf("EC shard set incomplete but recoverable after %d attempts, proceeding with source deletion (rebuild missing shards with ec.rebuild): %v",
|
||||
maxAttempts, lastDegraded)
|
||||
return nil
|
||||
}
|
||||
|
||||
// doDeleteVolumesWithLocations deletes volumes using pre-collected location information
|
||||
|
||||
@@ -215,6 +215,14 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne
|
||||
return false
|
||||
}
|
||||
|
||||
// Sweep a leftover empty .dat stub before any EC presence checks below.
|
||||
// It must go first: next to an .ecx it would otherwise make
|
||||
// validateEcVolume mistake a healthy distributed EC volume for an
|
||||
// interrupted local encode and delete its shards.
|
||||
if l.removeEmptyEcDatStub(volumeName, vid, collection) {
|
||||
return false
|
||||
}
|
||||
|
||||
// .vif next to .ecx is EC shard metadata, not a regular volume.
|
||||
// Without this guard NewVolume below would create a phantom empty .dat.
|
||||
if strings.HasSuffix(basename, ".vif") && l.hasEcxFile(volumeName) {
|
||||
@@ -256,12 +264,6 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne
|
||||
return true
|
||||
}
|
||||
|
||||
// Sweep a leftover empty .dat stub (a phantom from the pre-fix loader)
|
||||
// before it loads as a phantom volume.
|
||||
if l.removeEmptyEcDatStub(volumeName, vid, collection) {
|
||||
return false
|
||||
}
|
||||
|
||||
// Load existing data only; never let NewVolume create a phantom .dat. A
|
||||
// lone .vif/.idx (e.g. an EC sidecar whose .ecx is on a sibling disk,
|
||||
// which the same-disk hasEcxFile() guard misses) would otherwise get an
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
|
||||
)
|
||||
|
||||
@@ -388,12 +389,14 @@ func (l *DiskLocation) handleFoundEcxFile(shards []string, collection string, vo
|
||||
}
|
||||
}
|
||||
|
||||
// checkDatFileExists checks if .dat file exists with robust error handling.
|
||||
// Unexpected errors (permission, I/O) are treated as "exists" to avoid misclassifying
|
||||
// local EC as distributed EC, which is the safer fallback.
|
||||
// checkDatFileExists checks if a .dat file with actual data exists with robust
|
||||
// error handling. An empty .dat (<= a superblock, zero needles) is a leftover
|
||||
// stub, not an encode source, and is treated as absent so it never justifies
|
||||
// deleting shards. Unexpected errors (permission, I/O) are treated as "exists"
|
||||
// to avoid misclassifying local EC as distributed EC, which is the safer fallback.
|
||||
func (l *DiskLocation) checkDatFileExists(datFileName string) bool {
|
||||
if _, err := os.Stat(datFileName); err == nil {
|
||||
return true
|
||||
if fi, err := os.Stat(datFileName); err == nil {
|
||||
return fi.Size() > int64(super_block.SuperBlockSize)
|
||||
} else if !os.IsNotExist(err) {
|
||||
glog.Warningf("Failed to stat .dat file %s: %v", datFileName, err)
|
||||
// Safer to assume local .dat exists to avoid misclassifying as distributed EC
|
||||
@@ -477,9 +480,15 @@ func (l *DiskLocation) validateEcVolume(collection string, vid needle.VolumeId)
|
||||
dataShards := l.ecDataShardsFromVif(collection, vid)
|
||||
|
||||
// If .dat file exists, compute exact expected shard size from it.
|
||||
// An empty .dat (<= a superblock, zero needles) cannot be the encode
|
||||
// source -- it is a leftover stub -- so treat it as absent rather than
|
||||
// letting it mark a healthy distributed EC volume as an interrupted
|
||||
// local encode, which would delete its shards.
|
||||
if datFileInfo, err := os.Stat(datFileName); err == nil {
|
||||
datExists = true
|
||||
expectedShardSize = calculateExpectedShardSize(datFileInfo.Size(), dataShards)
|
||||
if datFileInfo.Size() > int64(super_block.SuperBlockSize) {
|
||||
datExists = true
|
||||
expectedShardSize = calculateExpectedShardSize(datFileInfo.Size(), dataShards)
|
||||
}
|
||||
} else if !os.IsNotExist(err) {
|
||||
// If stat fails with unexpected error (permission, I/O), fail validation
|
||||
// Don't treat this as "distributed EC" - it could be a temporary error
|
||||
|
||||
@@ -19,7 +19,7 @@ type ServerShardInventory struct {
|
||||
// Query errors are recorded per-server and treated as zero shards rather
|
||||
// than aborting the scan, so the caller still sees partial coverage from
|
||||
// healthy peers when one server is down. The caller gates destructive
|
||||
// actions on RequireFullShardSet against the returned union.
|
||||
// actions on RequireRecoverableShardSet against the returned union.
|
||||
func VerifyShardsAcrossServers(ctx context.Context, volumeID uint32,
|
||||
servers []string, dialOption grpc.DialOption) (
|
||||
union ShardBits, perServer map[string]ServerShardInventory) {
|
||||
@@ -63,14 +63,25 @@ func VerifyShardsAcrossServers(ctx context.Context, volumeID uint32,
|
||||
return union, perServer
|
||||
}
|
||||
|
||||
// totalShards is the configured DataShards+ParityShards for this volume.
|
||||
// Passed as a parameter (not derived from TotalShardsCount) so enterprise
|
||||
// builds with custom EC ratios share this helper verbatim.
|
||||
func RequireFullShardSet(volumeID uint32, shardsPresent ShardBits, totalShards int) error {
|
||||
// RequireRecoverableShardSet gates source-volume deletion after EC encode:
|
||||
// a non-empty .dat may only be deleted when enough distinct shards exist to
|
||||
// reconstruct the volume (>= dataShards). A full set returns (false, nil); a
|
||||
// degraded-but-recoverable set returns (true, nil) so the caller can warn and
|
||||
// proceed -- the missing shards can be rebuilt from the survivors, while
|
||||
// keeping the source next to live shards is the more dangerous mixed state.
|
||||
// Below dataShards it returns an error and the source must be kept.
|
||||
// dataShards/totalShards are passed as parameters (not derived from the
|
||||
// package constants) so enterprise builds with custom EC ratios share this
|
||||
// helper verbatim.
|
||||
func RequireRecoverableShardSet(volumeID uint32, shardsPresent ShardBits, dataShards, totalShards int) (degraded bool, err error) {
|
||||
if totalShards <= 0 || totalShards > MaxShardCount {
|
||||
return fmt.Errorf("invalid totalShards %d for volume %d (must be in [1, %d])",
|
||||
return false, fmt.Errorf("invalid totalShards %d for volume %d (must be in [1, %d])",
|
||||
totalShards, volumeID, MaxShardCount)
|
||||
}
|
||||
if dataShards <= 0 || dataShards > totalShards {
|
||||
return false, fmt.Errorf("invalid dataShards %d for volume %d (must be in [1, %d])",
|
||||
dataShards, volumeID, totalShards)
|
||||
}
|
||||
var missing []int
|
||||
for id := 0; id < totalShards; id++ {
|
||||
if !shardsPresent.Has(ShardId(id)) {
|
||||
@@ -78,11 +89,14 @@ func RequireFullShardSet(volumeID uint32, shardsPresent ShardBits, totalShards i
|
||||
}
|
||||
}
|
||||
if len(missing) == 0 {
|
||||
return nil
|
||||
return false, nil
|
||||
}
|
||||
if totalShards-len(missing) >= dataShards {
|
||||
return true, nil
|
||||
}
|
||||
sort.Ints(missing)
|
||||
return fmt.Errorf("EC shard set incomplete for volume %d: %d/%d shards present, missing shard ids %v",
|
||||
volumeID, shardsPresent.Count(), totalShards, missing)
|
||||
return false, fmt.Errorf("EC shard set unrecoverable for volume %d: %d/%d shards present, need %d to reconstruct, missing shard ids %v",
|
||||
volumeID, totalShards-len(missing), totalShards, dataShards, missing)
|
||||
}
|
||||
|
||||
func SummarizeShardInventory(perServer map[string]ServerShardInventory) string {
|
||||
|
||||
@@ -5,17 +5,23 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestRequireFullShardSet_AllPresent(t *testing.T) {
|
||||
func TestRequireRecoverableShardSet_AllPresent(t *testing.T) {
|
||||
var bits ShardBits
|
||||
for id := 0; id < TotalShardsCount; id++ {
|
||||
bits = bits.Set(ShardId(id))
|
||||
}
|
||||
if err := RequireFullShardSet(42, bits, TotalShardsCount); err != nil {
|
||||
degraded, err := RequireRecoverableShardSet(42, bits, DataShardsCount, TotalShardsCount)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error for full set: %v", err)
|
||||
}
|
||||
if degraded {
|
||||
t.Error("full set must not be reported as degraded")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequireFullShardSet_ReportsMissingIds(t *testing.T) {
|
||||
func TestRequireRecoverableShardSet_DegradedButRecoverable(t *testing.T) {
|
||||
// 12 of 14 shards: enough to reconstruct (>= 10), so the source may be
|
||||
// deleted, but the caller is told to warn and schedule a rebuild.
|
||||
var bits ShardBits
|
||||
for id := 0; id < TotalShardsCount; id++ {
|
||||
if id == 3 || id == 7 {
|
||||
@@ -23,24 +29,39 @@ func TestRequireFullShardSet_ReportsMissingIds(t *testing.T) {
|
||||
}
|
||||
bits = bits.Set(ShardId(id))
|
||||
}
|
||||
err := RequireFullShardSet(42, bits, TotalShardsCount)
|
||||
degraded, err := RequireRecoverableShardSet(42, bits, DataShardsCount, TotalShardsCount)
|
||||
if err != nil {
|
||||
t.Fatalf("recoverable set must not error: %v", err)
|
||||
}
|
||||
if !degraded {
|
||||
t.Error("missing shards must be reported as degraded")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequireRecoverableShardSet_BelowDataShards(t *testing.T) {
|
||||
// 9 of 14 shards: one short of reconstructable; the source must be kept.
|
||||
var bits ShardBits
|
||||
for id := 0; id < DataShardsCount-1; id++ {
|
||||
bits = bits.Set(ShardId(id))
|
||||
}
|
||||
_, err := RequireRecoverableShardSet(42, bits, DataShardsCount, TotalShardsCount)
|
||||
if err == nil {
|
||||
t.Fatal("expected error for incomplete set, got nil")
|
||||
t.Fatal("expected error for unrecoverable set, got nil")
|
||||
}
|
||||
msg := err.Error()
|
||||
if !strings.Contains(msg, "volume 42") {
|
||||
t.Errorf("error should name the volume id: %s", msg)
|
||||
}
|
||||
if !strings.Contains(msg, "[3 7]") {
|
||||
t.Errorf("error should list missing ids 3 and 7: %s", msg)
|
||||
if !strings.Contains(msg, "9/14") {
|
||||
t.Errorf("error should report 9/14 shards present: %s", msg)
|
||||
}
|
||||
if !strings.Contains(msg, "12/14") {
|
||||
t.Errorf("error should report 12/14 shards present: %s", msg)
|
||||
if !strings.Contains(msg, "[9 10 11 12 13]") {
|
||||
t.Errorf("error should list the missing ids: %s", msg)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequireFullShardSet_EmptyBitmap(t *testing.T) {
|
||||
err := RequireFullShardSet(1, 0, TotalShardsCount)
|
||||
func TestRequireRecoverableShardSet_EmptyBitmap(t *testing.T) {
|
||||
_, err := RequireRecoverableShardSet(1, 0, DataShardsCount, TotalShardsCount)
|
||||
if err == nil {
|
||||
t.Fatal("expected error for empty bitmap")
|
||||
}
|
||||
@@ -49,37 +70,49 @@ func TestRequireFullShardSet_EmptyBitmap(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequireFullShardSet_CustomRatio(t *testing.T) {
|
||||
func TestRequireRecoverableShardSet_CustomRatio(t *testing.T) {
|
||||
// 6+3 ratio: total=9, all present
|
||||
var bits ShardBits
|
||||
for id := 0; id < 9; id++ {
|
||||
bits = bits.Set(ShardId(id))
|
||||
}
|
||||
if err := RequireFullShardSet(7, bits, 9); err != nil {
|
||||
t.Fatalf("unexpected error for full 6+3 set: %v", err)
|
||||
if degraded, err := RequireRecoverableShardSet(7, bits, 6, 9); err != nil || degraded {
|
||||
t.Fatalf("full 6+3 set: degraded=%v err=%v", degraded, err)
|
||||
}
|
||||
|
||||
// 6+3, missing shard 5
|
||||
// 6+3, missing shard 5: 8 >= 6 remain, recoverable but degraded
|
||||
bits = bits.Clear(5)
|
||||
err := RequireFullShardSet(7, bits, 9)
|
||||
if degraded, err := RequireRecoverableShardSet(7, bits, 6, 9); err != nil || !degraded {
|
||||
t.Fatalf("8/9 shards of 6+3: degraded=%v err=%v", degraded, err)
|
||||
}
|
||||
|
||||
// 6+3, only 5 shards left: below dataShards, must error
|
||||
bits = ShardBits(0)
|
||||
for id := 0; id < 5; id++ {
|
||||
bits = bits.Set(ShardId(id))
|
||||
}
|
||||
_, err := RequireRecoverableShardSet(7, bits, 6, 9)
|
||||
if err == nil {
|
||||
t.Fatal("expected error when shard 5 is missing in 6+3 ratio")
|
||||
t.Fatal("expected error with 5/9 shards in 6+3 ratio")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "8/9") {
|
||||
t.Errorf("error should report 8/9: %s", err.Error())
|
||||
}
|
||||
if !strings.Contains(err.Error(), "[5]") {
|
||||
t.Errorf("error should list missing id 5: %s", err.Error())
|
||||
if !strings.Contains(err.Error(), "5/9") {
|
||||
t.Errorf("error should report 5/9: %s", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequireFullShardSet_RejectsInvalidTotal(t *testing.T) {
|
||||
if err := RequireFullShardSet(1, 0, 0); err == nil {
|
||||
func TestRequireRecoverableShardSet_RejectsInvalidParams(t *testing.T) {
|
||||
if _, err := RequireRecoverableShardSet(1, 0, DataShardsCount, 0); err == nil {
|
||||
t.Error("expected error for totalShards=0")
|
||||
}
|
||||
if err := RequireFullShardSet(1, 0, MaxShardCount+1); err == nil {
|
||||
if _, err := RequireRecoverableShardSet(1, 0, DataShardsCount, MaxShardCount+1); err == nil {
|
||||
t.Errorf("expected error for totalShards > MaxShardCount")
|
||||
}
|
||||
if _, err := RequireRecoverableShardSet(1, 0, 0, TotalShardsCount); err == nil {
|
||||
t.Error("expected error for dataShards=0")
|
||||
}
|
||||
if _, err := RequireRecoverableShardSet(1, 0, TotalShardsCount+1, TotalShardsCount); err == nil {
|
||||
t.Error("expected error for dataShards > totalShards")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSummarizeShardInventory_Deterministic(t *testing.T) {
|
||||
|
||||
@@ -244,3 +244,142 @@ func TestRemoveEmptyEcDatStubFindsVifInIdxDir(t *testing.T) {
|
||||
t.Error(".dat stub was not removed")
|
||||
}
|
||||
}
|
||||
|
||||
// startEcTestStore starts a single-disk store over dir and drains its
|
||||
// notification channels until cleanup.
|
||||
func startEcTestStore(t *testing.T, dir string) *Store {
|
||||
t.Helper()
|
||||
store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id",
|
||||
[]string{dir},
|
||||
[]int32{100},
|
||||
[]util.MinFreeSpace{{}},
|
||||
"",
|
||||
NeedleMapInMemory,
|
||||
[]types.DiskType{types.HardDriveType},
|
||||
nil,
|
||||
3,
|
||||
stats.DefaultDiskIOProbeConfig(),
|
||||
)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-store.NewVolumesChan:
|
||||
case <-store.NewEcShardsChan:
|
||||
case <-store.DeletedVolumesChan:
|
||||
case <-store.DeletedEcShardsChan:
|
||||
case <-store.StateUpdateChan:
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
t.Cleanup(func() {
|
||||
store.Close()
|
||||
close(done)
|
||||
})
|
||||
return store
|
||||
}
|
||||
|
||||
// writeEcShardFixture lays down .ecx, .ecj, and the given shards for a
|
||||
// distributed EC volume under the path prefix base, each shard truncated to shardSize.
|
||||
func writeEcShardFixture(t *testing.T, base string, shardIds []int, shardSize int64) {
|
||||
t.Helper()
|
||||
for _, sid := range shardIds {
|
||||
f, err := os.Create(base + erasure_coding.ToExt(sid))
|
||||
if err != nil {
|
||||
t.Fatalf("create shard %d: %v", sid, err)
|
||||
}
|
||||
if err := f.Truncate(shardSize); err != nil {
|
||||
f.Close()
|
||||
t.Fatalf("truncate shard %d: %v", sid, err)
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
t.Fatalf("close shard %d: %v", sid, err)
|
||||
}
|
||||
}
|
||||
if err := os.WriteFile(base+".ecx", make([]byte, 20), 0o644); err != nil {
|
||||
t.Fatalf("write .ecx: %v", err)
|
||||
}
|
||||
if err := os.WriteFile(base+".ecj", nil, 0o644); err != nil {
|
||||
t.Fatalf("write .ecj: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEmptyDatStubNextToEcxDoesNotDeleteShards: a disk holding a few local
|
||||
// shards of a healthy distributed EC volume plus a leftover empty .dat stub.
|
||||
// The stub used to make startup classify the volume as an interrupted local
|
||||
// encode (fewer than dataShards local shards) and delete the only copies of
|
||||
// those shards. The stub must be swept and the shards must load.
|
||||
func TestEmptyDatStubNextToEcxDoesNotDeleteShards(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
collection := "warp-rec"
|
||||
vid := needle.VolumeId(87)
|
||||
base := erasure_coding.EcShardFileName(collection, dir, int(vid))
|
||||
|
||||
shardIds := []int{0, 5}
|
||||
writeEcShardFixture(t, base, shardIds, int64(erasure_coding.ErasureCodingSmallBlockSize))
|
||||
// Zero-byte stub .dat: the phantom left by the pre-fix loader.
|
||||
if err := os.WriteFile(base+".dat", nil, 0o644); err != nil {
|
||||
t.Fatalf("write stub .dat: %v", err)
|
||||
}
|
||||
if err := volume_info.SaveVolumeInfo(base+".vif", &volume_server_pb.VolumeInfo{
|
||||
Version: uint32(needle.Version3),
|
||||
EcShardConfig: &volume_server_pb.EcShardConfig{DataShards: 10, ParityShards: 4},
|
||||
}); err != nil {
|
||||
t.Fatalf("save .vif: %v", err)
|
||||
}
|
||||
|
||||
store := startEcTestStore(t, dir)
|
||||
|
||||
loc := store.Locations[0]
|
||||
for _, sid := range shardIds {
|
||||
if !util.FileExists(base + erasure_coding.ToExt(sid)) {
|
||||
t.Errorf("EC shard file %d was deleted", sid)
|
||||
}
|
||||
if _, found := loc.FindEcShard(vid, erasure_coding.ShardId(sid)); !found {
|
||||
t.Errorf("EC shard %d was not loaded", sid)
|
||||
}
|
||||
}
|
||||
if !util.FileExists(base + ".ecx") {
|
||||
t.Error(".ecx was deleted")
|
||||
}
|
||||
if util.FileExists(base + ".dat") {
|
||||
t.Error("empty .dat stub was not swept")
|
||||
}
|
||||
if store.findVolume(vid) != nil {
|
||||
t.Errorf("stub was loaded as a phantom volume %d", vid)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEmptyDatWithoutVifDoesNotDeleteShards: same shard-holding disk but the
|
||||
// stub has no .vif at all, so the sweep has no EC evidence and must leave it.
|
||||
// The empty .dat still must not count as an encode source: the shards survive
|
||||
// and load as a distributed EC volume.
|
||||
func TestEmptyDatWithoutVifDoesNotDeleteShards(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
collection := "warp-rec"
|
||||
vid := needle.VolumeId(88)
|
||||
base := erasure_coding.EcShardFileName(collection, dir, int(vid))
|
||||
|
||||
shardIds := []int{1, 7}
|
||||
writeEcShardFixture(t, base, shardIds, int64(erasure_coding.ErasureCodingSmallBlockSize))
|
||||
if err := os.WriteFile(base+".dat", nil, 0o644); err != nil {
|
||||
t.Fatalf("write stub .dat: %v", err)
|
||||
}
|
||||
|
||||
store := startEcTestStore(t, dir)
|
||||
|
||||
loc := store.Locations[0]
|
||||
for _, sid := range shardIds {
|
||||
if !util.FileExists(base + erasure_coding.ToExt(sid)) {
|
||||
t.Errorf("EC shard file %d was deleted", sid)
|
||||
}
|
||||
if _, found := loc.FindEcShard(vid, erasure_coding.ShardId(sid)); !found {
|
||||
t.Errorf("EC shard %d was not loaded", sid)
|
||||
}
|
||||
}
|
||||
if store.findVolume(vid) != nil {
|
||||
t.Errorf("stub was loaded as a phantom volume %d", vid)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -708,7 +708,8 @@ func (t *ErasureCodingTask) verifyEcShardsBeforeDelete(ctx context.Context) erro
|
||||
"per_server": summary,
|
||||
}).Info("EC shard inventory before source deletion")
|
||||
|
||||
if err := erasure_coding.RequireFullShardSet(t.volumeID, union, totalShards); err != nil {
|
||||
degraded, err := erasure_coding.RequireRecoverableShardSet(t.volumeID, union, int(t.dataShards), totalShards)
|
||||
if err != nil {
|
||||
t.GetLogger().WithFields(map[string]interface{}{
|
||||
"volume_id": t.volumeID,
|
||||
"per_server": summary,
|
||||
@@ -716,6 +717,17 @@ func (t *ErasureCodingTask) verifyEcShardsBeforeDelete(ctx context.Context) erro
|
||||
}).Error("EC shard verification failed — source volume will be kept")
|
||||
return err
|
||||
}
|
||||
if degraded {
|
||||
// Enough shards to reconstruct; the missing ones can be rebuilt from
|
||||
// the survivors, while keeping the source next to live shards is the
|
||||
// more dangerous mixed state.
|
||||
t.GetLogger().WithFields(map[string]interface{}{
|
||||
"volume_id": t.volumeID,
|
||||
"shards_seen": union.Count(),
|
||||
"shards_total": totalShards,
|
||||
"per_server": summary,
|
||||
}).Warning("EC shard set incomplete but recoverable; proceeding with source deletion")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user