diff --git a/seaweed-volume/src/storage/disk_location.rs b/seaweed-volume/src/storage/disk_location.rs index 18c702483..81dbe2809 100644 --- a/seaweed-volume/src/storage/disk_location.rs +++ b/seaweed-volume/src/storage/disk_location.rs @@ -192,8 +192,14 @@ impl DiskLocation { // the sibling-.dat prune deletes real shards against. Remote-tiered // volumes also have no local `.dat`, but their `.vif` points at // remote files and must still load via the remote path. + // Plain existence check (not check_dat_file_exists): an empty + // .dat here is a fresh, needle-less volume that must still load. + // A stat error other than NotFound counts as present so a + // transient error doesn't skip a real volume. let dat_path = format!("{}.dat", volume_name); - if !check_dat_file_exists(&dat_path) + let dat_missing = matches!(fs::metadata(&dat_path), + Err(ref e) if e.kind() == io::ErrorKind::NotFound); + if dat_missing && !vif_references_remote_file(&format!("{}.vif", volume_name)) && !vif_references_remote_file(&format!("{}.vif", idx_name)) { @@ -243,15 +249,21 @@ impl DiskLocation { let dat_path = format!("{}.dat", base); let mut expected_shard_size: Option = None; - let dat_exists = std::path::Path::new(&dat_path).exists(); - - if dat_exists { - if let Ok(meta) = fs::metadata(&dat_path) { + // An empty .dat (<= a superblock, zero needles) cannot be the encode + // source -- it is a leftover stub -- so treat it as absent rather than + // letting it mark a healthy distributed EC volume as an interrupted + // local encode, which would delete its shards. + let dat_exists = match fs::metadata(&dat_path) { + Ok(meta) if meta.len() > SUPER_BLOCK_SIZE as u64 => { expected_shard_size = Some(calculate_expected_shard_size(meta.len() as i64)); - } else { - return false; + true } - } + Ok(_) => false, + Err(e) if e.kind() == io::ErrorKind::NotFound => false, + // Unexpected stat error: don't risk classifying local EC as + // distributed; fail validation instead of deleting anything. + Err(_) => return false, + }; let mut shard_count = 0usize; let mut actual_shard_size: Option = None; @@ -1060,12 +1072,14 @@ fn parse_ec_shard_extension(ext: &str) -> Option { Some(id) } -/// Robust `.dat` existence check: any unexpected stat error (permission, -/// I/O) is treated as "exists" so we don't misclassify local EC as -/// distributed EC. Mirrors `checkDatFileExists` in Go. +/// Robust check that a `.dat` with actual data exists. An empty `.dat` +/// (<= a superblock, zero needles) is a leftover stub, not an encode source, +/// and counts as absent so it never justifies deleting shards. Any unexpected +/// stat error (permission, I/O) is treated as "exists" so we don't misclassify +/// local EC as distributed EC. Mirrors `checkDatFileExists` in Go. fn check_dat_file_exists(path: &str) -> bool { match fs::metadata(path) { - Ok(_) => true, + Ok(meta) => meta.len() > SUPER_BLOCK_SIZE as u64, Err(e) if e.kind() == io::ErrorKind::NotFound => false, Err(_) => true, } diff --git a/seaweed-volume/src/storage/store_ec_reconcile.rs b/seaweed-volume/src/storage/store_ec_reconcile.rs index 53d9a7589..0a3edab5e 100644 --- a/seaweed-volume/src/storage/store_ec_reconcile.rs +++ b/seaweed-volume/src/storage/store_ec_reconcile.rs @@ -613,6 +613,97 @@ mod tests { assert_eq!(total, 2, "real EC shards were deleted (total={})", total); } + /// A disk holding a few local shards of a healthy distributed EC volume + /// plus a leftover empty `.dat` stub: the stub must be swept before EC + /// validation can mistake the volume for an interrupted local encode + /// (fewer than data_shards local shards) and delete the only copies of + /// those shards. + #[test] + fn test_empty_dat_stub_next_to_ecx_does_not_delete_shards() { + let tmp = TempDir::new().unwrap(); + let d0 = tmp.path().join("data0"); + std::fs::create_dir_all(&d0).unwrap(); + let dir = d0.to_str().unwrap(); + let coll = "warp-rec"; + let vid = 87u32; + + write_shard(dir, coll, vid, 0); + write_shard(dir, coll, vid, 5); + write_index_files(dir, coll, vid, 10, 4); + // Zero-byte stub .dat: the phantom left by the pre-fix loader. + std::fs::write(d0.join(format!("{}_{}.dat", coll, vid)), b"").unwrap(); + + let mut store = Store::new(NeedleMapKind::InMemory); + store + .add_location( + dir, + dir, + 100, + DiskType::HardDrive, + MinFreeSpace::Percent(0.0), + Vec::new(), + ) + .unwrap(); + + assert!( + !d0.join(format!("{}_{}.dat", coll, vid)).exists(), + "empty .dat stub was not swept" + ); + assert_eq!( + store.locations[0].ec_shard_count(), + 2, + "EC shards were deleted on the stub's account" + ); + assert_eq!( + store.locations[0].volume_ids().len(), + 0, + "stub was loaded as a phantom volume" + ); + } + + /// Same shard-holding disk but the stub has no `.vif` at all, so the + /// sweep has no EC evidence and must leave it. The empty `.dat` still + /// must not count as an encode source: the shards survive and load as + /// a distributed EC volume. + #[test] + fn test_empty_dat_without_vif_does_not_delete_shards() { + let tmp = TempDir::new().unwrap(); + let d0 = tmp.path().join("data0"); + std::fs::create_dir_all(&d0).unwrap(); + let dir = d0.to_str().unwrap(); + let coll = "warp-rec"; + let vid = 88u32; + + write_shard(dir, coll, vid, 1); + write_shard(dir, coll, vid, 7); + write_index_files(dir, coll, vid, 10, 4); + std::fs::remove_file(d0.join(format!("{}_{}.vif", coll, vid))).unwrap(); + std::fs::write(d0.join(format!("{}_{}.dat", coll, vid)), b"").unwrap(); + + let mut store = Store::new(NeedleMapKind::InMemory); + store + .add_location( + dir, + dir, + 100, + DiskType::HardDrive, + MinFreeSpace::Percent(0.0), + Vec::new(), + ) + .unwrap(); + + assert_eq!( + store.locations[0].ec_shard_count(), + 2, + "EC shards were deleted on the empty .dat's account" + ); + assert_eq!( + store.locations[0].volume_ids().len(), + 0, + "empty .dat was loaded as a phantom volume" + ); + } + /// Regression for the prune credibility gate: when the EC `.vif` /// records no source size (`dat_file_size == 0`), an empty 8-byte /// sibling `.dat` stub must NOT justify deleting partial EC shards. diff --git a/test/volume_server/grpc/ec_verify_multi_disk_test.go b/test/volume_server/grpc/ec_verify_multi_disk_test.go index 79a82efb2..1de4ee199 100644 --- a/test/volume_server/grpc/ec_verify_multi_disk_test.go +++ b/test/volume_server/grpc/ec_verify_multi_disk_test.go @@ -159,15 +159,22 @@ func TestVolumeEcShardsInfoReturnsAllShardsAcrossDisks(t *testing.T) { // End-to-end assertion via the same helper the worker uses to gate // source-volume deletion (weed/worker/tasks/erasure_coding/ec_task.go // verifyEcShardsBeforeDelete). The union across destinations is what - // RequireFullShardSet measures; with one destination that holds every - // shard, the union must cover dataShards + parityShards. + // RequireRecoverableShardSet measures; with one destination that holds + // every shard, the union must cover dataShards + parityShards without + // being reported as degraded. dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) servers := []string{clusterHarness.VolumeServerAddress()} union, perServer := erasure_coding.VerifyShardsAcrossServers(ctx, volumeID, servers, dialOption) - if err := erasure_coding.RequireFullShardSet(volumeID, union, erasure_coding.TotalShardsCount); err != nil { + degraded, err := erasure_coding.RequireRecoverableShardSet( + volumeID, union, erasure_coding.DataShardsCount, erasure_coding.TotalShardsCount) + if err != nil { t.Fatalf("verifyEcShardsBeforeDelete-equivalent gate failed: %v\nper-server inventory: %s\nper-disk layout: %v", err, erasure_coding.SummarizeShardInventory(perServer), postReconcileLayout) } + if degraded { + t.Fatalf("verifyEcShardsBeforeDelete-equivalent gate reported a degraded shard set\nper-server inventory: %s\nper-disk layout: %v", + erasure_coding.SummarizeShardInventory(perServer), postReconcileLayout) + } if got, want := union.Count(), erasure_coding.TotalShardsCount; got != want { t.Fatalf("VerifyShardsAcrossServers union covered %d/%d shards (per-server=%s, layout=%v)", got, want, erasure_coding.SummarizeShardInventory(perServer), postReconcileLayout) diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 1e69f6172..7c775640b 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -594,12 +594,16 @@ func verifyEcShardsBeforeDelete(commandEnv *CommandEnv, volumeIds []needle.Volum // volume-server heartbeats, so freshly distributed shards may not all be // visible in the master topology immediately. Poll a few times before // concluding the shard set is incomplete, so a heartbeat-propagation lag is - // not mistaken for missing data (which would abort the encode). Genuine loss - // still fails after the retries are exhausted. + // not mistaken for missing data. After the retries: a volume below the + // recoverable threshold (dataShards) aborts the deletion; a recoverable + // but degraded set proceeds with a warning, since the missing shards can + // be rebuilt from the survivors while keeping the source next to live + // shards is the more dangerous mixed state. const maxAttempts = 10 const retryInterval = 2 * time.Second var lastErr error + var lastDegraded []string for attempt := 0; attempt < maxAttempts; attempt++ { topoInfo, _, err := collectTopologyInfo(commandEnv, 0) if err != nil { @@ -607,6 +611,7 @@ func verifyEcShardsBeforeDelete(commandEnv *CommandEnv, volumeIds []needle.Volum } lastErr = nil + lastDegraded = lastDegraded[:0] for _, vid := range volumeIds { nodeShards, _ := collectEcNodeShardsInfo(topoInfo, vid, diskType) @@ -616,7 +621,8 @@ func verifyEcShardsBeforeDelete(commandEnv *CommandEnv, volumeIds []needle.Volum } totalShards := erasure_coding.TotalShardsCount - if err := erasure_coding.RequireFullShardSet(uint32(vid), union, totalShards); err != nil { + degraded, err := erasure_coding.RequireRecoverableShardSet(uint32(vid), union, erasure_coding.DataShardsCount, totalShards) + if err != nil { summary := make([]string, 0, len(nodeShards)) for node, info := range nodeShards { summary = append(summary, fmt.Sprintf("%s=%s", node, info.String())) @@ -625,23 +631,32 @@ func verifyEcShardsBeforeDelete(commandEnv *CommandEnv, volumeIds []needle.Volum lastErr = fmt.Errorf("volume %d: %w (observed: %v)", vid, err, summary) break } + if degraded { + lastDegraded = append(lastDegraded, fmt.Sprintf("volume %d: %d/%d shards", vid, union.Count(), totalShards)) + continue + } glog.V(0).Infof("EC shard verification ok for volume %d on diskType %q: %d/%d shards present across %d nodes", vid, diskType.ReadableString(), union.Count(), totalShards, len(nodeShards)) } - if lastErr == nil { + if lastErr == nil && len(lastDegraded) == 0 { return nil } if attempt < maxAttempts-1 { - glog.V(0).Infof("EC shard verification incomplete (attempt %d/%d), waiting for shard locations to propagate: %v", - attempt+1, maxAttempts, lastErr) + glog.V(0).Infof("EC shard verification incomplete (attempt %d/%d), waiting for shard locations to propagate: %v %v", + attempt+1, maxAttempts, lastErr, lastDegraded) time.Sleep(retryInterval) } } - glog.Errorf("EC shard verification failed after %d attempts: %v", maxAttempts, lastErr) - return lastErr + if lastErr != nil { + glog.Errorf("EC shard verification failed after %d attempts: %v", maxAttempts, lastErr) + return lastErr + } + glog.Warningf("EC shard set incomplete but recoverable after %d attempts, proceeding with source deletion (rebuild missing shards with ec.rebuild): %v", + maxAttempts, lastDegraded) + return nil } // doDeleteVolumesWithLocations deletes volumes using pre-collected location information diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index eb47e1fcc..33312eb49 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -215,6 +215,14 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne return false } + // Sweep a leftover empty .dat stub before any EC presence checks below. + // It must go first: next to an .ecx it would otherwise make + // validateEcVolume mistake a healthy distributed EC volume for an + // interrupted local encode and delete its shards. + if l.removeEmptyEcDatStub(volumeName, vid, collection) { + return false + } + // .vif next to .ecx is EC shard metadata, not a regular volume. // Without this guard NewVolume below would create a phantom empty .dat. if strings.HasSuffix(basename, ".vif") && l.hasEcxFile(volumeName) { @@ -256,12 +264,6 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne return true } - // Sweep a leftover empty .dat stub (a phantom from the pre-fix loader) - // before it loads as a phantom volume. - if l.removeEmptyEcDatStub(volumeName, vid, collection) { - return false - } - // Load existing data only; never let NewVolume create a phantom .dat. A // lone .vif/.idx (e.g. an EC sidecar whose .ecx is on a sibling disk, // which the same-disk hasEcxFile() guard misses) would otherwise get an diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index c6e4c1824..24d43b337 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -13,6 +13,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/volume_info" ) @@ -388,12 +389,14 @@ func (l *DiskLocation) handleFoundEcxFile(shards []string, collection string, vo } } -// checkDatFileExists checks if .dat file exists with robust error handling. -// Unexpected errors (permission, I/O) are treated as "exists" to avoid misclassifying -// local EC as distributed EC, which is the safer fallback. +// checkDatFileExists checks if a .dat file with actual data exists with robust +// error handling. An empty .dat (<= a superblock, zero needles) is a leftover +// stub, not an encode source, and is treated as absent so it never justifies +// deleting shards. Unexpected errors (permission, I/O) are treated as "exists" +// to avoid misclassifying local EC as distributed EC, which is the safer fallback. func (l *DiskLocation) checkDatFileExists(datFileName string) bool { - if _, err := os.Stat(datFileName); err == nil { - return true + if fi, err := os.Stat(datFileName); err == nil { + return fi.Size() > int64(super_block.SuperBlockSize) } else if !os.IsNotExist(err) { glog.Warningf("Failed to stat .dat file %s: %v", datFileName, err) // Safer to assume local .dat exists to avoid misclassifying as distributed EC @@ -477,9 +480,15 @@ func (l *DiskLocation) validateEcVolume(collection string, vid needle.VolumeId) dataShards := l.ecDataShardsFromVif(collection, vid) // If .dat file exists, compute exact expected shard size from it. + // An empty .dat (<= a superblock, zero needles) cannot be the encode + // source -- it is a leftover stub -- so treat it as absent rather than + // letting it mark a healthy distributed EC volume as an interrupted + // local encode, which would delete its shards. if datFileInfo, err := os.Stat(datFileName); err == nil { - datExists = true - expectedShardSize = calculateExpectedShardSize(datFileInfo.Size(), dataShards) + if datFileInfo.Size() > int64(super_block.SuperBlockSize) { + datExists = true + expectedShardSize = calculateExpectedShardSize(datFileInfo.Size(), dataShards) + } } else if !os.IsNotExist(err) { // If stat fails with unexpected error (permission, I/O), fail validation // Don't treat this as "distributed EC" - it could be a temporary error diff --git a/weed/storage/erasure_coding/verification.go b/weed/storage/erasure_coding/verification.go index 0be6fa7c3..caf540154 100644 --- a/weed/storage/erasure_coding/verification.go +++ b/weed/storage/erasure_coding/verification.go @@ -19,7 +19,7 @@ type ServerShardInventory struct { // Query errors are recorded per-server and treated as zero shards rather // than aborting the scan, so the caller still sees partial coverage from // healthy peers when one server is down. The caller gates destructive -// actions on RequireFullShardSet against the returned union. +// actions on RequireRecoverableShardSet against the returned union. func VerifyShardsAcrossServers(ctx context.Context, volumeID uint32, servers []string, dialOption grpc.DialOption) ( union ShardBits, perServer map[string]ServerShardInventory) { @@ -63,14 +63,25 @@ func VerifyShardsAcrossServers(ctx context.Context, volumeID uint32, return union, perServer } -// totalShards is the configured DataShards+ParityShards for this volume. -// Passed as a parameter (not derived from TotalShardsCount) so enterprise -// builds with custom EC ratios share this helper verbatim. -func RequireFullShardSet(volumeID uint32, shardsPresent ShardBits, totalShards int) error { +// RequireRecoverableShardSet gates source-volume deletion after EC encode: +// a non-empty .dat may only be deleted when enough distinct shards exist to +// reconstruct the volume (>= dataShards). A full set returns (false, nil); a +// degraded-but-recoverable set returns (true, nil) so the caller can warn and +// proceed -- the missing shards can be rebuilt from the survivors, while +// keeping the source next to live shards is the more dangerous mixed state. +// Below dataShards it returns an error and the source must be kept. +// dataShards/totalShards are passed as parameters (not derived from the +// package constants) so enterprise builds with custom EC ratios share this +// helper verbatim. +func RequireRecoverableShardSet(volumeID uint32, shardsPresent ShardBits, dataShards, totalShards int) (degraded bool, err error) { if totalShards <= 0 || totalShards > MaxShardCount { - return fmt.Errorf("invalid totalShards %d for volume %d (must be in [1, %d])", + return false, fmt.Errorf("invalid totalShards %d for volume %d (must be in [1, %d])", totalShards, volumeID, MaxShardCount) } + if dataShards <= 0 || dataShards > totalShards { + return false, fmt.Errorf("invalid dataShards %d for volume %d (must be in [1, %d])", + dataShards, volumeID, totalShards) + } var missing []int for id := 0; id < totalShards; id++ { if !shardsPresent.Has(ShardId(id)) { @@ -78,11 +89,14 @@ func RequireFullShardSet(volumeID uint32, shardsPresent ShardBits, totalShards i } } if len(missing) == 0 { - return nil + return false, nil + } + if totalShards-len(missing) >= dataShards { + return true, nil } sort.Ints(missing) - return fmt.Errorf("EC shard set incomplete for volume %d: %d/%d shards present, missing shard ids %v", - volumeID, shardsPresent.Count(), totalShards, missing) + return false, fmt.Errorf("EC shard set unrecoverable for volume %d: %d/%d shards present, need %d to reconstruct, missing shard ids %v", + volumeID, totalShards-len(missing), totalShards, dataShards, missing) } func SummarizeShardInventory(perServer map[string]ServerShardInventory) string { diff --git a/weed/storage/erasure_coding/verification_test.go b/weed/storage/erasure_coding/verification_test.go index 8a268069a..78e19005a 100644 --- a/weed/storage/erasure_coding/verification_test.go +++ b/weed/storage/erasure_coding/verification_test.go @@ -5,17 +5,23 @@ import ( "testing" ) -func TestRequireFullShardSet_AllPresent(t *testing.T) { +func TestRequireRecoverableShardSet_AllPresent(t *testing.T) { var bits ShardBits for id := 0; id < TotalShardsCount; id++ { bits = bits.Set(ShardId(id)) } - if err := RequireFullShardSet(42, bits, TotalShardsCount); err != nil { + degraded, err := RequireRecoverableShardSet(42, bits, DataShardsCount, TotalShardsCount) + if err != nil { t.Fatalf("unexpected error for full set: %v", err) } + if degraded { + t.Error("full set must not be reported as degraded") + } } -func TestRequireFullShardSet_ReportsMissingIds(t *testing.T) { +func TestRequireRecoverableShardSet_DegradedButRecoverable(t *testing.T) { + // 12 of 14 shards: enough to reconstruct (>= 10), so the source may be + // deleted, but the caller is told to warn and schedule a rebuild. var bits ShardBits for id := 0; id < TotalShardsCount; id++ { if id == 3 || id == 7 { @@ -23,24 +29,39 @@ func TestRequireFullShardSet_ReportsMissingIds(t *testing.T) { } bits = bits.Set(ShardId(id)) } - err := RequireFullShardSet(42, bits, TotalShardsCount) + degraded, err := RequireRecoverableShardSet(42, bits, DataShardsCount, TotalShardsCount) + if err != nil { + t.Fatalf("recoverable set must not error: %v", err) + } + if !degraded { + t.Error("missing shards must be reported as degraded") + } +} + +func TestRequireRecoverableShardSet_BelowDataShards(t *testing.T) { + // 9 of 14 shards: one short of reconstructable; the source must be kept. + var bits ShardBits + for id := 0; id < DataShardsCount-1; id++ { + bits = bits.Set(ShardId(id)) + } + _, err := RequireRecoverableShardSet(42, bits, DataShardsCount, TotalShardsCount) if err == nil { - t.Fatal("expected error for incomplete set, got nil") + t.Fatal("expected error for unrecoverable set, got nil") } msg := err.Error() if !strings.Contains(msg, "volume 42") { t.Errorf("error should name the volume id: %s", msg) } - if !strings.Contains(msg, "[3 7]") { - t.Errorf("error should list missing ids 3 and 7: %s", msg) + if !strings.Contains(msg, "9/14") { + t.Errorf("error should report 9/14 shards present: %s", msg) } - if !strings.Contains(msg, "12/14") { - t.Errorf("error should report 12/14 shards present: %s", msg) + if !strings.Contains(msg, "[9 10 11 12 13]") { + t.Errorf("error should list the missing ids: %s", msg) } } -func TestRequireFullShardSet_EmptyBitmap(t *testing.T) { - err := RequireFullShardSet(1, 0, TotalShardsCount) +func TestRequireRecoverableShardSet_EmptyBitmap(t *testing.T) { + _, err := RequireRecoverableShardSet(1, 0, DataShardsCount, TotalShardsCount) if err == nil { t.Fatal("expected error for empty bitmap") } @@ -49,37 +70,49 @@ func TestRequireFullShardSet_EmptyBitmap(t *testing.T) { } } -func TestRequireFullShardSet_CustomRatio(t *testing.T) { +func TestRequireRecoverableShardSet_CustomRatio(t *testing.T) { // 6+3 ratio: total=9, all present var bits ShardBits for id := 0; id < 9; id++ { bits = bits.Set(ShardId(id)) } - if err := RequireFullShardSet(7, bits, 9); err != nil { - t.Fatalf("unexpected error for full 6+3 set: %v", err) + if degraded, err := RequireRecoverableShardSet(7, bits, 6, 9); err != nil || degraded { + t.Fatalf("full 6+3 set: degraded=%v err=%v", degraded, err) } - // 6+3, missing shard 5 + // 6+3, missing shard 5: 8 >= 6 remain, recoverable but degraded bits = bits.Clear(5) - err := RequireFullShardSet(7, bits, 9) + if degraded, err := RequireRecoverableShardSet(7, bits, 6, 9); err != nil || !degraded { + t.Fatalf("8/9 shards of 6+3: degraded=%v err=%v", degraded, err) + } + + // 6+3, only 5 shards left: below dataShards, must error + bits = ShardBits(0) + for id := 0; id < 5; id++ { + bits = bits.Set(ShardId(id)) + } + _, err := RequireRecoverableShardSet(7, bits, 6, 9) if err == nil { - t.Fatal("expected error when shard 5 is missing in 6+3 ratio") + t.Fatal("expected error with 5/9 shards in 6+3 ratio") } - if !strings.Contains(err.Error(), "8/9") { - t.Errorf("error should report 8/9: %s", err.Error()) - } - if !strings.Contains(err.Error(), "[5]") { - t.Errorf("error should list missing id 5: %s", err.Error()) + if !strings.Contains(err.Error(), "5/9") { + t.Errorf("error should report 5/9: %s", err.Error()) } } -func TestRequireFullShardSet_RejectsInvalidTotal(t *testing.T) { - if err := RequireFullShardSet(1, 0, 0); err == nil { +func TestRequireRecoverableShardSet_RejectsInvalidParams(t *testing.T) { + if _, err := RequireRecoverableShardSet(1, 0, DataShardsCount, 0); err == nil { t.Error("expected error for totalShards=0") } - if err := RequireFullShardSet(1, 0, MaxShardCount+1); err == nil { + if _, err := RequireRecoverableShardSet(1, 0, DataShardsCount, MaxShardCount+1); err == nil { t.Errorf("expected error for totalShards > MaxShardCount") } + if _, err := RequireRecoverableShardSet(1, 0, 0, TotalShardsCount); err == nil { + t.Error("expected error for dataShards=0") + } + if _, err := RequireRecoverableShardSet(1, 0, TotalShardsCount+1, TotalShardsCount); err == nil { + t.Error("expected error for dataShards > totalShards") + } } func TestSummarizeShardInventory_Deterministic(t *testing.T) { diff --git a/weed/storage/store_ec_phantom_dat_test.go b/weed/storage/store_ec_phantom_dat_test.go index e653549f1..9d15b26d9 100644 --- a/weed/storage/store_ec_phantom_dat_test.go +++ b/weed/storage/store_ec_phantom_dat_test.go @@ -244,3 +244,142 @@ func TestRemoveEmptyEcDatStubFindsVifInIdxDir(t *testing.T) { t.Error(".dat stub was not removed") } } + +// startEcTestStore starts a single-disk store over dir and drains its +// notification channels until cleanup. +func startEcTestStore(t *testing.T, dir string) *Store { + t.Helper() + store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id", + []string{dir}, + []int32{100}, + []util.MinFreeSpace{{}}, + "", + NeedleMapInMemory, + []types.DiskType{types.HardDriveType}, + nil, + 3, + stats.DefaultDiskIOProbeConfig(), + ) + done := make(chan struct{}) + go func() { + for { + select { + case <-store.NewVolumesChan: + case <-store.NewEcShardsChan: + case <-store.DeletedVolumesChan: + case <-store.DeletedEcShardsChan: + case <-store.StateUpdateChan: + case <-done: + return + } + } + }() + t.Cleanup(func() { + store.Close() + close(done) + }) + return store +} + +// writeEcShardFixture lays down .ecx, .ecj, and the given shards for a +// distributed EC volume in dir, each shard truncated to shardSize. +func writeEcShardFixture(t *testing.T, base string, shardIds []int, shardSize int64) { + t.Helper() + for _, sid := range shardIds { + f, err := os.Create(base + erasure_coding.ToExt(sid)) + if err != nil { + t.Fatalf("create shard %d: %v", sid, err) + } + if err := f.Truncate(shardSize); err != nil { + f.Close() + t.Fatalf("truncate shard %d: %v", sid, err) + } + if err := f.Close(); err != nil { + t.Fatalf("close shard %d: %v", sid, err) + } + } + if err := os.WriteFile(base+".ecx", make([]byte, 20), 0o644); err != nil { + t.Fatalf("write .ecx: %v", err) + } + if err := os.WriteFile(base+".ecj", nil, 0o644); err != nil { + t.Fatalf("write .ecj: %v", err) + } +} + +// TestEmptyDatStubNextToEcxDoesNotDeleteShards: a disk holding a few local +// shards of a healthy distributed EC volume plus a leftover empty .dat stub. +// The stub used to make startup classify the volume as an interrupted local +// encode (fewer than dataShards local shards) and delete the only copies of +// those shards. The stub must be swept and the shards must load. +func TestEmptyDatStubNextToEcxDoesNotDeleteShards(t *testing.T) { + dir := t.TempDir() + collection := "warp-rec" + vid := needle.VolumeId(87) + base := erasure_coding.EcShardFileName(collection, dir, int(vid)) + + shardIds := []int{0, 5} + writeEcShardFixture(t, base, shardIds, int64(erasure_coding.ErasureCodingSmallBlockSize)) + // Zero-byte stub .dat: the phantom left by the pre-fix loader. + if err := os.WriteFile(base+".dat", nil, 0o644); err != nil { + t.Fatalf("write stub .dat: %v", err) + } + if err := volume_info.SaveVolumeInfo(base+".vif", &volume_server_pb.VolumeInfo{ + Version: uint32(needle.Version3), + EcShardConfig: &volume_server_pb.EcShardConfig{DataShards: 10, ParityShards: 4}, + }); err != nil { + t.Fatalf("save .vif: %v", err) + } + + store := startEcTestStore(t, dir) + + loc := store.Locations[0] + for _, sid := range shardIds { + if !util.FileExists(base + erasure_coding.ToExt(sid)) { + t.Errorf("EC shard file %d was deleted", sid) + } + if _, found := loc.FindEcShard(vid, erasure_coding.ShardId(sid)); !found { + t.Errorf("EC shard %d was not loaded", sid) + } + } + if !util.FileExists(base + ".ecx") { + t.Error(".ecx was deleted") + } + if util.FileExists(base + ".dat") { + t.Error("empty .dat stub was not swept") + } + if store.findVolume(vid) != nil { + t.Errorf("stub was loaded as a phantom volume %d", vid) + } +} + +// TestEmptyDatWithoutVifDoesNotDeleteShards: same shard-holding disk but the +// stub has no .vif at all, so the sweep has no EC evidence and must leave it. +// The empty .dat still must not count as an encode source: the shards survive +// and load as a distributed EC volume. +func TestEmptyDatWithoutVifDoesNotDeleteShards(t *testing.T) { + dir := t.TempDir() + collection := "warp-rec" + vid := needle.VolumeId(88) + base := erasure_coding.EcShardFileName(collection, dir, int(vid)) + + shardIds := []int{1, 7} + writeEcShardFixture(t, base, shardIds, int64(erasure_coding.ErasureCodingSmallBlockSize)) + if err := os.WriteFile(base+".dat", nil, 0o644); err != nil { + t.Fatalf("write stub .dat: %v", err) + } + + store := startEcTestStore(t, dir) + + loc := store.Locations[0] + for _, sid := range shardIds { + if !util.FileExists(base + erasure_coding.ToExt(sid)) { + t.Errorf("EC shard file %d was deleted", sid) + } + if _, found := loc.FindEcShard(vid, erasure_coding.ShardId(sid)); !found { + t.Errorf("EC shard %d was not loaded", sid) + } + } + if store.findVolume(vid) != nil { + t.Errorf("stub was loaded as a phantom volume %d", vid) + } +} diff --git a/weed/worker/tasks/erasure_coding/ec_task.go b/weed/worker/tasks/erasure_coding/ec_task.go index 7bdf201f5..fc12021ca 100644 --- a/weed/worker/tasks/erasure_coding/ec_task.go +++ b/weed/worker/tasks/erasure_coding/ec_task.go @@ -708,7 +708,8 @@ func (t *ErasureCodingTask) verifyEcShardsBeforeDelete(ctx context.Context) erro "per_server": summary, }).Info("EC shard inventory before source deletion") - if err := erasure_coding.RequireFullShardSet(t.volumeID, union, totalShards); err != nil { + degraded, err := erasure_coding.RequireRecoverableShardSet(t.volumeID, union, int(t.dataShards), totalShards) + if err != nil { t.GetLogger().WithFields(map[string]interface{}{ "volume_id": t.volumeID, "per_server": summary, @@ -716,6 +717,17 @@ func (t *ErasureCodingTask) verifyEcShardsBeforeDelete(ctx context.Context) erro }).Error("EC shard verification failed — source volume will be kept") return err } + if degraded { + // Enough shards to reconstruct; the missing ones can be rebuilt from + // the survivors, while keeping the source next to live shards is the + // more dangerous mixed state. + t.GetLogger().WithFields(map[string]interface{}{ + "volume_id": t.volumeID, + "shards_seen": union.Count(), + "shards_total": totalShards, + "per_server": summary, + }).Warning("EC shard set incomplete but recoverable; proceeding with source deletion") + } return nil }