diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index dcbc9217d..30bec6791 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -115,6 +115,15 @@ func (l *DiskLocation) HasEcxFileOnDisk(collection string, vid needle.VolumeId) } func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolume, error) { + return l.loadEcShardWithIdxDir(collection, vid, shardId, l.IdxDirectory) +} + +// loadEcShardWithIdxDir is like LoadEcShard but uses the supplied idxDir as +// the source of .ecx / .ecj rather than this disk's own IdxDirectory. The +// orphan-shard reconciliation calls this with a sibling disk's idx folder +// when shards live on a disk that does not own the index files itself +// (issue #9212). +func (l *DiskLocation) loadEcShardWithIdxDir(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, idxDir string) (*erasure_coding.EcVolume, error) { ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.DiskType, l.Directory, collection, vid, shardId) if err != nil { @@ -127,7 +136,7 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard defer l.ecVolumesLock.Unlock() ecVolume, found := l.ecVolumes[vid] if !found { - ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, l.IdxDirectory, collection, vid) + ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, idxDir, collection, vid) if err != nil { return nil, fmt.Errorf("failed to create ec volume %d: %v", vid, err) } @@ -261,6 +270,38 @@ func (l *DiskLocation) loadAllEcShards(onShardLoad func(collection string, vid n return nil } +// loadEcShardsWithIdxDir loads each shard file in shards into l.ecVolumes, +// using idxDir as the source of .ecx / .ecj / .vif (NewEcVolume falls back +// to dirIdx for .vif when the data dir does not have one). Used by the +// store-level orphan-shard reconciliation in #9212; stops on the first +// failure so the caller can log and continue with other volumes. +func (l *DiskLocation) loadEcShardsWithIdxDir(shards []string, collection string, vid needle.VolumeId, idxDir string, onShardLoad func(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume)) error { + + for _, shard := range shards { + ext := path.Ext(shard) + if len(ext) < 4 { + return fmt.Errorf("unexpected ec shard name %v", shard) + } + shardId, err := strconv.ParseInt(ext[3:], 10, 64) + if err != nil { + return fmt.Errorf("failed to parse ec shard name %v: %w", shard, err) + } + if shardId < 0 || shardId > 255 { + return fmt.Errorf("shard ID out of range: %d", shardId) + } + + ecVolume, err := l.loadEcShardWithIdxDir(collection, vid, erasure_coding.ShardId(shardId), idxDir) + if err != nil { + return fmt.Errorf("failed to load ec shard %v: %w", shard, err) + } + if onShardLoad != nil { + onShardLoad(collection, vid, erasure_coding.ShardId(shardId), ecVolume) + } + } + + return nil +} + func (l *DiskLocation) deleteEcVolumeById(vid needle.VolumeId) (e error) { // Add write lock since we're modifying the ecVolumes map l.ecVolumesLock.Lock() @@ -376,21 +417,29 @@ func (l *DiskLocation) checkOrphanedShards(shards []string, collection string, v // calculateExpectedShardSize computes the exact expected shard size based on .dat file size // The EC encoding process is deterministic: -// 1. Data is processed in batches of (LargeBlockSize * DataShardsCount) for large blocks -// 2. Remaining data is processed in batches of (SmallBlockSize * DataShardsCount) for small blocks +// 1. Data is processed in batches of (LargeBlockSize * dataShardCount) for large blocks +// 2. Remaining data is processed in batches of (SmallBlockSize * dataShardCount) for small blocks // 3. Each shard gets exactly its portion, with zero-padding applied to incomplete blocks -func calculateExpectedShardSize(datFileSize int64) int64 { +// +// dataShardCount is taken as a parameter rather than read from +// erasure_coding.DataShardsCount so that tests writing a custom layout +// to .vif compute the matching shard size, and so custom-ratio builds +// (e.g. enterprise) can swap the default without touching this helper. +func calculateExpectedShardSize(datFileSize int64, dataShardCount int) int64 { + if dataShardCount <= 0 { + return 0 + } var shardSize int64 - // Process large blocks (1GB * 10 = 10GB batches) - largeBatchSize := int64(erasure_coding.ErasureCodingLargeBlockSize) * int64(erasure_coding.DataShardsCount) + // Process large blocks (1GB * dataShardCount per batch) + largeBatchSize := int64(erasure_coding.ErasureCodingLargeBlockSize) * int64(dataShardCount) numLargeBatches := datFileSize / largeBatchSize shardSize = numLargeBatches * int64(erasure_coding.ErasureCodingLargeBlockSize) remainingSize := datFileSize - (numLargeBatches * largeBatchSize) - // Process remaining data in small blocks (1MB * 10 = 10MB batches) + // Process remaining data in small blocks (1MB * dataShardCount per batch) if remainingSize > 0 { - smallBatchSize := int64(erasure_coding.ErasureCodingSmallBlockSize) * int64(erasure_coding.DataShardsCount) + smallBatchSize := int64(erasure_coding.ErasureCodingSmallBlockSize) * int64(dataShardCount) numSmallBatches := (remainingSize + smallBatchSize - 1) / smallBatchSize // Ceiling division shardSize += numSmallBatches * int64(erasure_coding.ErasureCodingSmallBlockSize) } @@ -410,10 +459,13 @@ func (l *DiskLocation) validateEcVolume(collection string, vid needle.VolumeId) var expectedShardSize int64 = -1 datExists := false - // If .dat file exists, compute exact expected shard size from it + // If .dat file exists, compute exact expected shard size from it. + // Pass the build's default data-shard count; calculateExpectedShardSize + // takes it as a parameter so tests / enterprise builds can supply + // their own. if datFileInfo, err := os.Stat(datFileName); err == nil { datExists = true - expectedShardSize = calculateExpectedShardSize(datFileInfo.Size()) + expectedShardSize = calculateExpectedShardSize(datFileInfo.Size(), erasure_coding.DataShardsCount) } else if !os.IsNotExist(err) { // If stat fails with unexpected error (permission, I/O), fail validation // Don't treat this as "distributed EC" - it could be a temporary error diff --git a/weed/storage/disk_location_ec_realworld_test.go b/weed/storage/disk_location_ec_realworld_test.go index 3a21ccb6c..ea92d3f00 100644 --- a/weed/storage/disk_location_ec_realworld_test.go +++ b/weed/storage/disk_location_ec_realworld_test.go @@ -83,7 +83,7 @@ func TestCalculateExpectedShardSizeWithRealEncoding(t *testing.T) { datFile.Close() // Calculate expected shard size using our function - expectedShardSize := calculateExpectedShardSize(tt.datFileSize) + expectedShardSize := calculateExpectedShardSize(tt.datFileSize, erasure_coding.DataShardsCount) // Run actual EC encoding err = erasure_coding.WriteEcFiles(baseFileName) @@ -164,7 +164,7 @@ func TestCalculateExpectedShardSizeEdgeCases(t *testing.T) { datFile.Close() // Calculate expected - expectedShardSize := calculateExpectedShardSize(tt.datFileSize) + expectedShardSize := calculateExpectedShardSize(tt.datFileSize, erasure_coding.DataShardsCount) // Run actual EC encoding err = erasure_coding.WriteEcFiles(baseFileName) diff --git a/weed/storage/disk_location_ec_shard_size_test.go b/weed/storage/disk_location_ec_shard_size_test.go index e58c1c129..a601e2567 100644 --- a/weed/storage/disk_location_ec_shard_size_test.go +++ b/weed/storage/disk_location_ec_shard_size_test.go @@ -128,7 +128,7 @@ func TestCalculateExpectedShardSize(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - actualShardSize := calculateExpectedShardSize(tt.datFileSize) + actualShardSize := calculateExpectedShardSize(tt.datFileSize, dataShards) if actualShardSize != tt.expectedShardSize { t.Errorf("Expected shard size %d, got %d. %s", @@ -143,6 +143,7 @@ func TestCalculateExpectedShardSize(t *testing.T) { // TestShardSizeValidationScenarios tests realistic scenarios func TestShardSizeValidationScenarios(t *testing.T) { + const dataShards = 10 scenarios := []struct { name string datFileSize int64 @@ -183,7 +184,7 @@ func TestShardSizeValidationScenarios(t *testing.T) { for _, scenario := range scenarios { t.Run(scenario.name, func(t *testing.T) { - expectedSize := calculateExpectedShardSize(scenario.datFileSize) + expectedSize := calculateExpectedShardSize(scenario.datFileSize, dataShards) isValid := scenario.actualShardSize == expectedSize if isValid != scenario.shouldBeValid { diff --git a/weed/storage/disk_location_ec_test.go b/weed/storage/disk_location_ec_test.go index e40870b7f..0210890f3 100644 --- a/weed/storage/disk_location_ec_test.go +++ b/weed/storage/disk_location_ec_test.go @@ -123,7 +123,7 @@ func TestIncompleteEcEncodingCleanup(t *testing.T) { // Use deterministic but small size: 10MB .dat => 1MB per shard datFileSize := int64(10 * 1024 * 1024) // 10MB - expectedShardSize := calculateExpectedShardSize(datFileSize) + expectedShardSize := calculateExpectedShardSize(datFileSize, erasure_coding.DataShardsCount) // Create .dat file if needed if tt.createDatFile { @@ -325,7 +325,7 @@ func TestValidateEcVolume(t *testing.T) { // For test purposes, use a small .dat file size that still exercises the logic // 10MB .dat file = 1MB per shard (one small batch, fast and deterministic) datFileSize := int64(10 * 1024 * 1024) // 10MB - expectedShardSize := calculateExpectedShardSize(datFileSize) + expectedShardSize := calculateExpectedShardSize(datFileSize, erasure_coding.DataShardsCount) // Create .dat file if needed if tt.createDatFile { diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index 2bf96d0d8..f0421f7f0 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -105,9 +105,23 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection glog.Warningf("ec volume %d: load deleted needles from .ecj: %v", vid, loadErr) } - // read volume info + // read volume info. Prefer .vif at the data dir (where shards live), but + // fall back to the index dir when the data dir does not have one — the + // orphan-shard reconciliation in Store loads shards on a disk whose only + // EC artefacts are .ec?? files, with .ecx / .ecj / .vif on a sibling disk + // (issue #9212). Without this fallback we'd write a stub .vif on the + // shard disk and lose the real EC config + datFileSize. + vifFileName := dataBaseFileName + ".vif" + if dirIdx != dir { + if _, statErr := os.Stat(vifFileName); statErr != nil && os.IsNotExist(statErr) { + altVif := EcShardFileName(collection, dirIdx, int(vid)) + ".vif" + if _, altStatErr := os.Stat(altVif); altStatErr == nil { + vifFileName = altVif + } + } + } ev.Version = needle.Version3 - if volumeInfo, _, found, _ := volume_info.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found { + if volumeInfo, _, found, _ := volume_info.MaybeLoadVolumeInfo(vifFileName); found { ev.Version = needle.Version(volumeInfo.Version) ev.datFileSize = volumeInfo.DatFileSize ev.ExpireAtSec = volumeInfo.ExpireAtSec @@ -134,7 +148,7 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection ev.ECContext = NewDefaultECContext(collection, vid) } } else { - glog.Warningf("vif file not found,volumeId:%d, filename:%s", vid, dataBaseFileName) + glog.Warningf("vif file not found,volumeId:%d, filename:%s", vid, vifFileName) volume_info.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)}) ev.ECContext = NewDefaultECContext(collection, vid) } diff --git a/weed/storage/store.go b/weed/storage/store.go index 205aad7b8..fab43ef9a 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -154,6 +154,15 @@ func NewStore( } wg.Wait() + // After every DiskLocation has finished its per-disk EC scan, sweep the + // store for shards that live on a disk without local index files and + // load them by reaching across to a sibling disk's .ecx / .ecj / .vif. + // This is the volume-server side of issue #9212: ec.balance can move + // shards onto a destination node's second disk while leaving the index + // on the disk that already held the volume, and without this pass those + // orphan shards stay invisible to the master. + s.reconcileEcShardsAcrossDisks() + // Resolve state.pb's directory via the first disk location so it inherits // the same `~` expansion and empty-idxFolder fallback used for .idx files, // and is never written as a relative path against the process CWD (#9173). diff --git a/weed/storage/store_ec_orphan_shard_test.go b/weed/storage/store_ec_orphan_shard_test.go new file mode 100644 index 000000000..2b8f37660 --- /dev/null +++ b/weed/storage/store_ec_orphan_shard_test.go @@ -0,0 +1,458 @@ +package storage + +import ( + "os" + "path/filepath" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/storage/volume_info" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// TestLoadEcShardsWhenIndexFilesOnDifferentDisk reproduces issue #9212. +// On the user's volume server, ec.balance moved EC shards onto a different +// physical disk than the disk that holds the .ecx/.ecj/.vif index files. +// The volume server must still load those orphan shards on startup and +// register them with the master — otherwise ec.rebuild reports the volume +// as unrepairable even though all shards are physically present. +// +// Layout under test (mirrors volume-0 + volume-2 rows for grafana-loki_1093 +// in the ls -l attached to the issue): +// +// dir0 (diskId 0): .ec00, .ec12 <-- shards but no .ecx / .ecj / .vif +// dir1 (diskId 1): .ec01, .ecx, .ecj, .vif +func TestLoadEcShardsWhenIndexFilesOnDifferentDisk(t *testing.T) { + tempDir := t.TempDir() + dir0 := filepath.Join(tempDir, "data1") + dir1 := filepath.Join(tempDir, "data2") + if err := os.MkdirAll(dir0, 0o755); err != nil { + t.Fatalf("mkdir dir0: %v", err) + } + if err := os.MkdirAll(dir1, 0o755); err != nil { + t.Fatalf("mkdir dir1: %v", err) + } + + collection := "grafana-loki" + vid := needle.VolumeId(1093) + + // EC shape used to populate .vif. Kept as locals (not the package + // constants) so the test stays valid when enterprise builds use a + // different default ratio. + const dataShards, parityShards = 10, 4 + + // Use a small but realistic shard size so calculateExpectedShardSize + // validation lines up with the .dat-derived size if it ever runs. + const datSize int64 = 10 * 1024 * 1024 + expectedShardSize := calculateExpectedShardSize(datSize, dataShards) + + writeShard := func(dir string, shardId int) { + t.Helper() + base := erasure_coding.EcShardFileName(collection, dir, int(vid)) + f, err := os.Create(base + erasure_coding.ToExt(shardId)) + if err != nil { + t.Fatalf("create shard %d in %s: %v", shardId, dir, err) + } + if err := f.Truncate(expectedShardSize); err != nil { + f.Close() + t.Fatalf("truncate shard %d in %s: %v", shardId, dir, err) + } + if err := f.Close(); err != nil { + t.Fatalf("close shard %d in %s: %v", shardId, dir, err) + } + } + + // dir0: orphan shards. No .ecx / .ecj / .vif on this disk. + writeShard(dir0, 0) + writeShard(dir0, 12) + + // dir1: one shard plus the index files. + writeShard(dir1, 1) + + base1 := erasure_coding.EcShardFileName(collection, dir1, int(vid)) + + // Build a valid sealed .ecx with one entry so NewEcVolume can open it. + // Layout: NeedleId(8) + Offset(8) + Size(4) = 20 bytes per entry. + ecxBytes := make([]byte, 20) + if err := os.WriteFile(base1+".ecx", ecxBytes, 0o644); err != nil { + t.Fatalf("write .ecx: %v", err) + } + + // Empty .ecj is fine (no deletes journaled yet). + if err := os.WriteFile(base1+".ecj", nil, 0o644); err != nil { + t.Fatalf("write .ecj: %v", err) + } + + // .vif with the EC config the test set above so EcVolume picks up the + // right shape regardless of the build's default ratio. + if err := volume_info.SaveVolumeInfo(base1+".vif", &volume_server_pb.VolumeInfo{ + Version: uint32(needle.Version3), + DatFileSize: datSize, + EcShardConfig: &volume_server_pb.EcShardConfig{ + DataShards: dataShards, + ParityShards: parityShards, + }, + }); err != nil { + t.Fatalf("save .vif: %v", err) + } + + // Build the Store with both disks. NewStore triggers per-disk loading + // during construction, which is the codepath under test. + store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id", + []string{dir0, dir1}, + []int32{100, 100}, + []util.MinFreeSpace{{}, {}}, + "", + NeedleMapInMemory, + []types.DiskType{types.HardDriveType, types.HardDriveType}, + nil, + 3, + ) + + // Drain heartbeat-style channels so loading does not block. + done := make(chan struct{}) + go func() { + for { + select { + case <-store.NewVolumesChan: + case <-store.NewEcShardsChan: + case <-store.DeletedVolumesChan: + case <-store.DeletedEcShardsChan: + case <-store.StateUpdateChan: + case <-done: + return + } + } + }() + t.Cleanup(func() { + store.Close() + close(done) + }) + + if got, want := len(store.Locations), 2; got != want { + t.Fatalf("store has %d disk locations, want %d", got, want) + } + + // Sanity: dir1's shard ec01 lives on the disk that owns .ecx and must load. + loc1 := store.Locations[1] + if _, found := loc1.FindEcShard(vid, 1); !found { + t.Fatalf("baseline broken: shard 1 on dir1 (which has .ecx) was not loaded") + } + + // The bug: shards on dir0 have no local .ecx/.vif — they should still be + // loaded by reaching across to dir1's index files, but currently the + // per-disk loader silently drops them when no .dat file is present. + loc0 := store.Locations[0] + for _, sid := range []erasure_coding.ShardId{0, 12} { + if _, found := loc0.FindEcShard(vid, sid); !found { + t.Errorf("issue #9212: shard %d on dir0 was not loaded; .ecx is on dir1", sid) + } + } + + // Files on dir0 must not have been deleted by any cleanup path. + base0 := erasure_coding.EcShardFileName(collection, dir0, int(vid)) + for _, sid := range []int{0, 12} { + shardPath := base0 + erasure_coding.ToExt(sid) + fi, err := os.Stat(shardPath) + if err != nil { + t.Errorf("orphan shard %d was destroyed: %v", sid, err) + continue + } + if fi.Size() != expectedShardSize { + t.Errorf("orphan shard %d truncated: size %d, want %d", sid, fi.Size(), expectedShardSize) + } + } +} + +// TestLoadEcShardsOrphanWithoutSiblingEcx exercises the truly-orphaned +// case from issue #9212: shard files exist on a disk but no .ecx exists +// anywhere on the volume server. We must not crash, and we must leave the +// shard files alone so an operator can restore the index later. +func TestLoadEcShardsOrphanWithoutSiblingEcx(t *testing.T) { + tempDir := t.TempDir() + dir0 := filepath.Join(tempDir, "data1") + dir1 := filepath.Join(tempDir, "data2") + if err := os.MkdirAll(dir0, 0o755); err != nil { + t.Fatalf("mkdir dir0: %v", err) + } + if err := os.MkdirAll(dir1, 0o755); err != nil { + t.Fatalf("mkdir dir1: %v", err) + } + + collection := "grafana-loki" + vid := needle.VolumeId(2222) + const dataShards = 10 + const datSize int64 = 10 * 1024 * 1024 + expectedShardSize := calculateExpectedShardSize(datSize, dataShards) + + base0 := erasure_coding.EcShardFileName(collection, dir0, int(vid)) + for _, sid := range []int{0, 12} { + f, err := os.Create(base0 + erasure_coding.ToExt(sid)) + if err != nil { + t.Fatalf("create shard %d: %v", sid, err) + } + if err := f.Truncate(expectedShardSize); err != nil { + f.Close() + t.Fatalf("truncate shard %d: %v", sid, err) + } + f.Close() + } + + store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id", + []string{dir0, dir1}, + []int32{100, 100}, + []util.MinFreeSpace{{}, {}}, + "", + NeedleMapInMemory, + []types.DiskType{types.HardDriveType, types.HardDriveType}, + nil, + 3, + ) + done := make(chan struct{}) + go func() { + for { + select { + case <-store.NewVolumesChan: + case <-store.NewEcShardsChan: + case <-store.DeletedVolumesChan: + case <-store.DeletedEcShardsChan: + case <-store.StateUpdateChan: + case <-done: + return + } + } + }() + t.Cleanup(func() { + store.Close() + close(done) + }) + + loc0 := store.Locations[0] + if _, found := loc0.FindEcShard(vid, 0); found { + t.Errorf("shard 0 should remain unloaded when no .ecx exists anywhere; reconciliation must not fabricate one") + } + + for _, sid := range []int{0, 12} { + shardPath := base0 + erasure_coding.ToExt(sid) + fi, err := os.Stat(shardPath) + if err != nil { + t.Errorf("orphan shard %d destroyed by reconciliation: %v", sid, err) + continue + } + if fi.Size() != expectedShardSize { + t.Errorf("orphan shard %d truncated: size %d, want %d", sid, fi.Size(), expectedShardSize) + } + } +} + +// TestReconcileNoOpWhenEachDiskIsSelfContained guards against the +// reconciliation pass double-loading shards or stomping on EcVolumes that +// were already populated by the per-disk pass. +func TestReconcileNoOpWhenEachDiskIsSelfContained(t *testing.T) { + tempDir := t.TempDir() + dir0 := filepath.Join(tempDir, "data1") + dir1 := filepath.Join(tempDir, "data2") + if err := os.MkdirAll(dir0, 0o755); err != nil { + t.Fatalf("mkdir dir0: %v", err) + } + if err := os.MkdirAll(dir1, 0o755); err != nil { + t.Fatalf("mkdir dir1: %v", err) + } + + collection := "grafana-loki" + vid := needle.VolumeId(3333) + const dataShards, parityShards = 10, 4 + const datSize int64 = 10 * 1024 * 1024 + expectedShardSize := calculateExpectedShardSize(datSize, dataShards) + + writeFullEcLayout := func(dir string, shardIds []int) { + base := erasure_coding.EcShardFileName(collection, dir, int(vid)) + for _, sid := range shardIds { + f, err := os.Create(base + erasure_coding.ToExt(sid)) + if err != nil { + t.Fatalf("create shard %d in %s: %v", sid, dir, err) + } + if err := f.Truncate(expectedShardSize); err != nil { + f.Close() + t.Fatalf("truncate shard %d in %s: %v", sid, dir, err) + } + f.Close() + } + if err := os.WriteFile(base+".ecx", make([]byte, 20), 0o644); err != nil { + t.Fatalf("write .ecx in %s: %v", dir, err) + } + if err := os.WriteFile(base+".ecj", nil, 0o644); err != nil { + t.Fatalf("write .ecj in %s: %v", dir, err) + } + if err := volume_info.SaveVolumeInfo(base+".vif", &volume_server_pb.VolumeInfo{ + Version: uint32(needle.Version3), + DatFileSize: datSize, + EcShardConfig: &volume_server_pb.EcShardConfig{ + DataShards: dataShards, + ParityShards: parityShards, + }, + }); err != nil { + t.Fatalf("save .vif in %s: %v", dir, err) + } + } + + // Each disk is a fully self-contained EC layout — reconciliation + // should leave them untouched. + writeFullEcLayout(dir0, []int{0, 4}) + writeFullEcLayout(dir1, []int{8, 12}) + + store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id", + []string{dir0, dir1}, + []int32{100, 100}, + []util.MinFreeSpace{{}, {}}, + "", + NeedleMapInMemory, + []types.DiskType{types.HardDriveType, types.HardDriveType}, + nil, + 3, + ) + done := make(chan struct{}) + go func() { + for { + select { + case <-store.NewVolumesChan: + case <-store.NewEcShardsChan: + case <-store.DeletedVolumesChan: + case <-store.DeletedEcShardsChan: + case <-store.StateUpdateChan: + case <-done: + return + } + } + }() + t.Cleanup(func() { + store.Close() + close(done) + }) + + for diskIdx, sids := range [][]erasure_coding.ShardId{{0, 4}, {8, 12}} { + loc := store.Locations[diskIdx] + ev, found := loc.FindEcVolume(vid) + if !found { + t.Fatalf("disk %d: EcVolume for vid %d not loaded", diskIdx, vid) + } + if got := len(ev.Shards); got != len(sids) { + t.Errorf("disk %d: EcVolume has %d shards, want %d (reconciliation may have double-loaded)", diskIdx, got, len(sids)) + } + for _, sid := range sids { + if _, ok := loc.FindEcShard(vid, sid); !ok { + t.Errorf("disk %d: shard %d missing", diskIdx, sid) + } + } + } +} + +// TestLoadEcShardsWhenOwnerEcxIsInDataDir covers the legacy layout flagged +// in PR #9244 review: -dir.idx is configured (so every DiskLocation has +// IdxDirectory != Directory), but the owner's .ecx / .ecj / .vif were +// written into the owner's data dir before -dir.idx was set. indexEcxOwners +// must record the directory the .ecx was actually found in (Directory), +// not just the owner's IdxDirectory — otherwise NewEcVolume's same-disk +// fallback retries the orphan disk's data dir and ENOENTs there too. +func TestLoadEcShardsWhenOwnerEcxIsInDataDir(t *testing.T) { + tempDir := t.TempDir() + dir0 := filepath.Join(tempDir, "data1") // orphan: shards only + dir1 := filepath.Join(tempDir, "data2") // owner: .ecx in data dir + idxDir := filepath.Join(tempDir, "idx") // shared idx folder, intentionally empty + for _, d := range []string{dir0, dir1, idxDir} { + if err := os.MkdirAll(d, 0o755); err != nil { + t.Fatalf("mkdir %s: %v", d, err) + } + } + + collection := "grafana-loki" + vid := needle.VolumeId(4242) + const dataShards, parityShards = 10, 4 + const datSize int64 = 10 * 1024 * 1024 + expectedShardSize := calculateExpectedShardSize(datSize, dataShards) + + writeShard := func(dir string, shardId int) { + t.Helper() + base := erasure_coding.EcShardFileName(collection, dir, int(vid)) + f, err := os.Create(base + erasure_coding.ToExt(shardId)) + if err != nil { + t.Fatalf("create shard %d in %s: %v", shardId, dir, err) + } + if err := f.Truncate(expectedShardSize); err != nil { + f.Close() + t.Fatalf("truncate shard %d in %s: %v", shardId, dir, err) + } + f.Close() + } + + writeShard(dir0, 0) + writeShard(dir0, 12) + writeShard(dir1, 1) + + // Owner's .ecx / .ecj / .vif live in dir1 (data dir), NOT idxDir. + // This mirrors a server that ran without -dir.idx, then later got it + // configured — the index files stay in their original on-disk home. + base1Data := erasure_coding.EcShardFileName(collection, dir1, int(vid)) + if err := os.WriteFile(base1Data+".ecx", make([]byte, 20), 0o644); err != nil { + t.Fatalf("write .ecx in data dir: %v", err) + } + if err := os.WriteFile(base1Data+".ecj", nil, 0o644); err != nil { + t.Fatalf("write .ecj in data dir: %v", err) + } + if err := volume_info.SaveVolumeInfo(base1Data+".vif", &volume_server_pb.VolumeInfo{ + Version: uint32(needle.Version3), + DatFileSize: datSize, + EcShardConfig: &volume_server_pb.EcShardConfig{ + DataShards: dataShards, + ParityShards: parityShards, + }, + }); err != nil { + t.Fatalf("save .vif in data dir: %v", err) + } + + // idxDir is configured but intentionally empty for this volume — we + // want IdxDirectory != Directory while the .ecx lives in Directory. + store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id", + []string{dir0, dir1}, + []int32{100, 100}, + []util.MinFreeSpace{{}, {}}, + idxDir, + NeedleMapInMemory, + []types.DiskType{types.HardDriveType, types.HardDriveType}, + nil, + 3, + ) + done := make(chan struct{}) + go func() { + for { + select { + case <-store.NewVolumesChan: + case <-store.NewEcShardsChan: + case <-store.DeletedVolumesChan: + case <-store.DeletedEcShardsChan: + case <-store.StateUpdateChan: + case <-done: + return + } + } + }() + t.Cleanup(func() { + store.Close() + close(done) + }) + + loc1 := store.Locations[1] + if _, found := loc1.FindEcShard(vid, 1); !found { + t.Fatalf("baseline broken: shard 1 on dir1 (which has .ecx in its data dir) was not loaded") + } + + loc0 := store.Locations[0] + for _, sid := range []erasure_coding.ShardId{0, 12} { + if _, found := loc0.FindEcShard(vid, sid); !found { + t.Errorf("issue #9212 (PR #9244 review): orphan shard %d on dir0 not loaded; reconcile pointed loader at IdxDirectory but .ecx was actually in owner.Directory", sid) + } + } +} diff --git a/weed/storage/store_ec_reconcile.go b/weed/storage/store_ec_reconcile.go new file mode 100644 index 000000000..afb7ca65d --- /dev/null +++ b/weed/storage/store_ec_reconcile.go @@ -0,0 +1,170 @@ +package storage + +import ( + "os" + "path" + "strconv" + "strings" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" +) + +// ecKeyForReconcile keys orphan-shard reconciliation by collection + volume +// id. Per-collection grouping matters because two collections can re-use the +// same volume id, and we must only pair shards with their own .ecx file. +type ecKeyForReconcile struct { + collection string + vid needle.VolumeId +} + +// ecxOwnerInfo records both the disk that owns the .ecx and the actual +// directory it lives in (IdxDirectory or Directory). The directory matters +// because indexEcxOwners scans both — when .ecx lives in Directory (the +// legacy "written before -dir.idx was set" layout that removeEcVolumeFiles +// in disk_location_ec.go also keeps cleaning up), passing the owner's +// IdxDirectory to NewEcVolume would ENOENT both the primary and the +// same-disk fallback path, which uses the orphan disk's data dir, not the +// owner's. Tracking the actual scan dir lets reconcile point loaders at +// the directory the .ecx is really in. +type ecxOwnerInfo struct { + location *DiskLocation + idxDir string +} + +// reconcileEcShardsAcrossDisks loads EC shards that the per-disk scan in +// loadAllEcShards skipped because the disk holding the .ec?? files does not +// also hold the matching .ecx / .ecj / .vif index files. The index files +// are located on a different disk of the same volume server (issue #9212). +// +// Per-disk loadAllEcShards correctly leaves these orphan shards on disk — +// it does not have visibility into other DiskLocations on the same store — +// so the cross-disk fan-out must happen here, after every disk's initial +// pass has completed. We register each shard against its physical disk's +// ecVolumes map (so heartbeat reporting carries the right DiskId per +// shard), but point the EcVolume at the sibling disk's index files so it +// can serve reads and route deletes through a real .ecx / .ecj. +func (s *Store) reconcileEcShardsAcrossDisks() { + if len(s.Locations) < 2 { + return + } + + ecxOwners := s.indexEcxOwners() + if len(ecxOwners) == 0 { + return + } + + for _, loc := range s.Locations { + orphans := loc.collectOrphanEcShards() + if len(orphans) == 0 { + continue + } + for key, shards := range orphans { + owner, ok := ecxOwners[key] + if !ok { + glog.Warningf("ec volume %d (collection=%q) has shards on %s without a matching .ecx anywhere on this volume server; shards %v will stay unloaded until the missing .ecx is restored", + key.vid, key.collection, loc.Directory, shards) + continue + } + if owner.location == loc { + // .ecx is on this same disk, but loadAllEcShards still + // did not load these shards — handleFoundEcxFile already + // logged the underlying failure. Don't try again here. + continue + } + glog.V(0).Infof("ec volume %d (collection=%q): loading orphan shards %v on %s using index files from %s (issue #9212)", + key.vid, key.collection, shards, loc.Directory, owner.idxDir) + if err := loc.loadEcShardsWithIdxDir(shards, key.collection, key.vid, owner.idxDir, loc.ecShardNotifyHandler); err != nil { + glog.Errorf("ec volume %d on %s: cross-disk shard load failed: %v", key.vid, loc.Directory, err) + } + } + } +} + +// indexEcxOwners returns the disk and the actual directory that owns the +// .ecx file for each (collection, vid) on this store. .ecx normally lives +// in IdxDirectory but may have been written into the data directory before +// -dir.idx was set, so we check both — and we record which one matched so +// downstream loaders point NewEcVolume at the directory that really has +// the file. The first owner found wins; duplicates across disks are +// unusual but tolerated. +func (s *Store) indexEcxOwners() map[ecKeyForReconcile]ecxOwnerInfo { + owners := make(map[ecKeyForReconcile]ecxOwnerInfo) + for _, loc := range s.Locations { + seen := make(map[string]bool, 2) + for _, scan := range []string{loc.IdxDirectory, loc.Directory} { + if scan == "" || seen[scan] { + continue + } + seen[scan] = true + entries, err := os.ReadDir(scan) + if err != nil { + continue + } + for _, entry := range entries { + if entry.IsDir() { + continue + } + name := entry.Name() + if !strings.HasSuffix(name, ".ecx") { + continue + } + base := name[:len(name)-len(".ecx")] + collection, vid, err := parseCollectionVolumeId(base) + if err != nil { + continue + } + key := ecKeyForReconcile{collection: collection, vid: vid} + if _, exists := owners[key]; !exists { + owners[key] = ecxOwnerInfo{location: loc, idxDir: scan} + } + } + } + } + return owners +} + +// collectOrphanEcShards walks the disk's data directory and returns the +// .ec?? shard files that are present on disk but not yet registered to an +// EcVolume in memory. The map is keyed by (collection, vid) so callers can +// match each group against the .ecx-owning disk in one lookup. +// +// Zero-byte shard files are ignored — loadAllEcShards already treats them +// as cleanup-worthy noise and we want the same shape here. +func (l *DiskLocation) collectOrphanEcShards() map[ecKeyForReconcile][]string { + entries, err := os.ReadDir(l.Directory) + if err != nil { + return nil + } + orphans := make(map[ecKeyForReconcile][]string) + for _, entry := range entries { + if entry.IsDir() { + continue + } + name := entry.Name() + ext := path.Ext(name) + if !re.MatchString(ext) { + continue + } + info, err := entry.Info() + if err != nil || info.Size() == 0 { + continue + } + shardId, err := strconv.ParseInt(ext[3:], 10, 64) + if err != nil || shardId < 0 || shardId > 255 { + continue + } + base := name[:len(name)-len(ext)] + collection, vid, err := parseCollectionVolumeId(base) + if err != nil { + continue + } + if _, loaded := l.FindEcShard(vid, erasure_coding.ShardId(shardId)); loaded { + continue + } + key := ecKeyForReconcile{collection: collection, vid: vid} + orphans[key] = append(orphans[key], name) + } + return orphans +}