From d5c0a7b153d2b66aabfe478432deef347db5bc88 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Wed, 13 May 2026 13:56:20 -0700
Subject: [PATCH] fix(ec): make multi-disk same-server EC reads work + full-lifecycle integration test (#9487)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* fix(master): include GrpcPort in LookupEcVolume response

LookupVolume already passes loc.GrpcPort through to the client;
LookupEcVolume builds Location with only Url / PublicUrl / DataCenter,
so callers fall back to ServerToGrpcAddress (httpPort + 10000). On any
deployment where that convention does not hold — multi-disk integration
tests, custom port layouts — EC reads dial the wrong port and quietly
degrade to parity recovery.

* fix(volume/ec): probe every DiskLocation when serving local shard reads

reconcileEcShardsAcrossDisks (issue 9212) registers each .ec?? against
the DiskLocation that physically owns it, so a multi-disk volume server
can hold shards for the same vid in two separate ecVolumes — one per
disk — with .ecx on whichever disk owned the original .dat. The read
path only consulted the single EcVolume FindEcVolume picked, so requests
for shards on the sibling disk fell through to errShardNotLocal and then
to remote/loopback recovery.

Walk all DiskLocations after the first probe in both
readLocalEcShardInterval and the VolumeEcShardRead gRPC handler; the
latter also covers the loopback that recoverOneRemoteEcShardInterval
falls back to when a peer dial fails.

* test(volume/ec): cover the multi-disk EC lifecycle end-to-end

Two integration tests against a real volume server with two data dirs:

TestEcLifecycleAcrossMultipleDisks drives encode -> mount -> HTTP read
-> drop .dat -> stop -> redistribute shards across disks -> restart ->
verify reconcileEcShardsAcrossDisks attached the orphan shards and reads
still work -> blob delete -> stop -> drop a shard -> restart ->
VolumeEcShardsRebuild pulls input from both disks -> reads still work.

TestEcPartialShardsOnSiblingDiskCleanedUpOnRestart is the issue 9478
reproducer at the cluster level: seed a healthy .dat on disk 0, plant
the on-disk footprint of an interrupted EC encode on disk 1, restart,
and assert pruneIncompleteEcWithSiblingDat wipes disk 1 without touching
disk 0.

Framework gets RestartVolumeServer / StopVolumeServer helpers; the
previous run's volume.log is rotated to volume.log.previous so a startup
regression on the second run does not lose the first run's diagnostics.

* review: trim verbose comments

* review: drop racy fast-path, use locked findEcShard directly

gemini-code-assist flagged the two-step lookup in
readLocalEcShardInterval and VolumeEcShardRead: the first probe
(ecVolume.FindEcVolumeShard) reads the EcVolume's Shards slice without
holding ecVolumesLock, so a concurrent mount / unmount could race with
it. findEcShard already walks every DiskLocation under the right lock,
so the fast-path adds nothing but the race. Collapse both call sites to
a single locked call.

Also note in RestartVolumeServer why the log-rotation error is
swallowed: absence on first call is benign; anything else surfaces in
the next os.Create in startVolume.

--- test/volume_server/framework/cluster.go | 32 ++ .../grpc/ec_multi_disk_lifecycle_test.go | 487 ++++++++++++++++++ weed/server/master_grpc_server_volume.go | 2 + weed/server/volume_grpc_erasure_coding.go | 4 +- weed/storage/store_ec.go | 10 +- 5 files changed, 533 insertions(+), 2 deletions(-) create mode 100644 test/volume_server/grpc/ec_multi_disk_lifecycle_test.go diff --git a/test/volume_server/framework/cluster.go b/test/volume_server/framework/cluster.go index 86d75415d..ae84ede07 100644 --- a/test/volume_server/framework/cluster.go +++ b/test/volume_server/framework/cluster.go @@ -184,6 +184,38 @@ func (c *Cluster) Stop() { }) } +// RestartVolumeServer kills the volume server and starts a new one against +// the same data dirs and ports. The master keeps running. The previous run's +// volume.log is moved to volume.log.previous so the first run's logs survive +// a second-run startup failure. +func (c *Cluster) RestartVolumeServer() { + c.testingTB.Helper() + stopProcess(c.volumeCmd) + c.volumeCmd = nil + // Rotate the log; absent on the first call after a clean start, which + // is fine — startVolume will create it. Any real filesystem failure + // surfaces immediately on the next os.Create in startVolume. + oldLog := filepath.Join(c.logsDir, "volume.log") + _ = os.Rename(oldLog, filepath.Join(c.logsDir, "volume.log.previous")) + if err := c.startVolume(c.volumeDataDirs); err != nil { + c.testingTB.Fatalf("restart volume server: %v", err) + } + if err := c.waitForHTTP(c.VolumeAdminURL() + "/status"); err != nil { + c.testingTB.Fatalf("wait for volume admin readiness after restart: %v\nvolume log tail:\n%s", err, c.tailLog("volume.log")) + } + if err := c.waitForTCP(c.VolumeGRPCAddress()); err != nil { + c.testingTB.Fatalf("wait for volume grpc readiness after restart: %v\nvolume log tail:\n%s", err, c.tailLog("volume.log")) + } +} + +// StopVolumeServer kills the volume server but leaves the master and data +// dirs alone. 
Pair with RestartVolumeServer or Stop. +func (c *Cluster) StopVolumeServer() { + c.testingTB.Helper() + stopProcess(c.volumeCmd) + c.volumeCmd = nil +} + func (c *Cluster) startMaster(dataDir string) error { logFile, err := os.Create(filepath.Join(c.logsDir, "master.log")) if err != nil { diff --git a/test/volume_server/grpc/ec_multi_disk_lifecycle_test.go b/test/volume_server/grpc/ec_multi_disk_lifecycle_test.go new file mode 100644 index 000000000..6b2325ae8 --- /dev/null +++ b/test/volume_server/grpc/ec_multi_disk_lifecycle_test.go @@ -0,0 +1,487 @@ +package volume_server_grpc_test + +import ( + "context" + "fmt" + "net/http" + "os" + "path/filepath" + "sort" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/test/volume_server/framework" + "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" +) + +// TestEcLifecycleAcrossMultipleDisks drives encode, mount, read, drop-dat, +// cross-disk redistribute + restart + reconcile, blob delete, and rebuild on +// a 2-disk volume server. 
+func TestEcLifecycleAcrossMultipleDisks(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + const ( + dataDirCount = 2 + volumeID = uint32(9500) + collection = "ec-multi-disk" + ) + + clusterHarness := framework.StartSingleVolumeClusterWithDataDirs(t, matrix.P1(), dataDirCount) + dataDirs := clusterHarness.VolumeDataDirs() + if len(dataDirs) != dataDirCount { + t.Fatalf("expected %d data dirs, got %d: %v", dataDirCount, len(dataDirs), dataDirs) + } + + conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) + defer conn.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + framework.AllocateVolume(t, grpcClient, volumeID, collection) + + // populate + type needleSpec struct { + fid string + payload []byte + } + needles := []needleSpec{ + {framework.NewFileID(volumeID, 9501, 0xAA000001), bytesOfLen(11, 0xA1)}, + {framework.NewFileID(volumeID, 9502, 0xAA000002), bytesOfLen(1024, 0xA2)}, + {framework.NewFileID(volumeID, 9503, 0xAA000003), bytesOfLen(63*1024+17, 0xA3)}, + {framework.NewFileID(volumeID, 9504, 0xAA000004), bytesOfLen(2, 0xA4)}, + {framework.NewFileID(volumeID, 9505, 0xAA000005), bytesOfLen(513*1024, 0xA5)}, + } + httpClient := framework.NewHTTPClient() + for _, n := range needles { + resp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), n.fid, n.payload) + _ = framework.ReadAllAndClose(t, resp) + if resp.StatusCode != http.StatusCreated { + t.Fatalf("upload %s expected 201, got %d", n.fid, resp.StatusCode) + } + } + + // encode — every shard lands on the .dat's disk + if _, err := grpcClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{ + VolumeId: volumeID, + Collection: collection, + }); err != nil { + t.Fatalf("VolumeEcShardsGenerate: %v", err) + } + postGenerateLayout := scanShardLayout(t, dataDirs, collection, volumeID) + if len(postGenerateLayout[0]) != 
erasure_coding.TotalShardsCount { + t.Fatalf("post-generate: expected all %d shards on disk 0, got per-disk layout %v", + erasure_coding.TotalShardsCount, postGenerateLayout) + } + if len(postGenerateLayout[1]) != 0 { + t.Fatalf("post-generate: disk 1 should be empty, got %v", postGenerateLayout[1]) + } + // .ecj is created lazily on first EC delete + for _, side := range []string{".ecx", ".vif"} { + if !fileExistsIn(dataDirs[0], collection, volumeID, side) { + t.Fatalf("post-generate: expected %s on disk 0", side) + } + } + + // mount + read via EC + if _, err := grpcClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: volumeID, + Collection: collection, + ShardIds: dataShardIds(), + }); err != nil { + t.Fatalf("VolumeEcShardsMount data shards: %v", err) + } + for _, n := range needles { + verifyHTTPRead(t, httpClient, clusterHarness.VolumeAdminURL(), n.fid, n.payload, "after-mount") + } + + // drop the original volume so EC shards are the only source + if _, err := grpcClient.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{ + VolumeId: volumeID, + }); err != nil { + t.Fatalf("VolumeDelete (drop .dat): %v", err) + } + if fileExistsIn(dataDirs[0], collection, volumeID, ".dat") { + t.Fatalf("VolumeDelete left .dat in place on disk 0") + } + for _, n := range needles { + verifyHTTPRead(t, httpClient, clusterHarness.VolumeAdminURL(), n.fid, n.payload, "after-dat-delete") + } + + // move half the shards onto disk 1, leaving .ecx on disk 0 + clusterHarness.StopVolumeServer() + const splitAt = 7 + for shard := 0; shard < splitAt; shard++ { + movedFile(t, dataDirs[0], dataDirs[1], collection, volumeID, erasure_coding.ToExt(shard)) + } + postMoveLayout := scanShardLayout(t, dataDirs, collection, volumeID) + if len(postMoveLayout[0]) != erasure_coding.TotalShardsCount-splitAt { + t.Fatalf("post-move: expected %d shards on disk 0, got %v", erasure_coding.TotalShardsCount-splitAt, postMoveLayout[0]) + } + if len(postMoveLayout[1]) 
!= splitAt { + t.Fatalf("post-move: expected %d shards on disk 1, got %v", splitAt, postMoveLayout[1]) + } + if !fileExistsIn(dataDirs[0], collection, volumeID, ".ecx") { + t.Fatalf("post-move: .ecx must stay on disk 0 to exercise the cross-disk reconcile path") + } + if fileExistsIn(dataDirs[1], collection, volumeID, ".ecx") { + t.Fatalf("post-move: .ecx leaked onto disk 1; reconcile path would not be exercised") + } + + clusterHarness.RestartVolumeServer() + conn2, grpcClient2 := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) + defer conn2.Close() + + // VolumeEcShardsInfo only sees one disk's EcVolume; filesystem layout is + // the ground truth for the whole-store shard count. + postReconcileLayout := scanShardLayout(t, dataDirs, collection, volumeID) + if got, want := totalShardsInLayout(postReconcileLayout), erasure_coding.TotalShardsCount; got != want { + t.Fatalf("post-reconcile: total shards on disk mismatch: got %d, want %d (layout=%v)", got, want, postReconcileLayout) + } + if got, want := len(postReconcileLayout[0]), erasure_coding.TotalShardsCount-splitAt; got != want { + t.Fatalf("post-reconcile: disk 0 shard count drift: got %d, want %d (layout=%v)", got, want, postReconcileLayout) + } + if got, want := len(postReconcileLayout[1]), splitAt; got != want { + t.Fatalf("post-reconcile: disk 1 shard count drift: got %d, want %d (layout=%v)", got, want, postReconcileLayout) + } + if _, err := grpcClient2.VolumeEcShardsInfo(ctx, &volume_server_pb.VolumeEcShardsInfoRequest{ + VolumeId: volumeID, + }); err != nil { + t.Fatalf("VolumeEcShardsInfo after redistribute restart: %v", err) + } + for _, n := range needles { + verifyHTTPRead(t, httpClient, clusterHarness.VolumeAdminURL(), n.fid, n.payload, "after-cross-disk-reconcile") + } + + // blob delete on a cross-disk layout + deletedNeedle := needles[2] + deleteReq, err := http.NewRequest(http.MethodDelete, clusterHarness.VolumeAdminURL()+"/"+deletedNeedle.fid, nil) + if err != nil { + 
t.Fatalf("build delete request: %v", err) + } + deleteResp := framework.DoRequest(t, httpClient, deleteReq) + _ = framework.ReadAllAndClose(t, deleteResp) + if deleteResp.StatusCode != http.StatusAccepted { + t.Fatalf("HTTP DELETE %s expected 202, got %d", deletedNeedle.fid, deleteResp.StatusCode) + } + readResp := framework.ReadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), deletedNeedle.fid) + _ = framework.ReadAllAndClose(t, readResp) + if readResp.StatusCode == http.StatusOK { + t.Fatalf("HTTP GET on deleted EC needle expected non-200, got %d", readResp.StatusCode) + } + for _, n := range needles { + if n.fid == deletedNeedle.fid { + continue + } + verifyHTTPRead(t, httpClient, clusterHarness.VolumeAdminURL(), n.fid, n.payload, "after-blob-delete") + } + + // rebuild a shard whose inputs are split across two disks + clusterHarness.StopVolumeServer() + const repairTargetShard = 9 // sits on disk 0 after the split + shardPath := filepath.Join(dataDirs[0], shardFileName(collection, volumeID, repairTargetShard)) + if _, statErr := os.Stat(shardPath); statErr != nil { + t.Fatalf("repair setup: shard %d not on disk 0 (%s): %v", repairTargetShard, shardPath, statErr) + } + if err := os.Remove(shardPath); err != nil { + t.Fatalf("repair setup: remove shard %d on disk 0: %v", repairTargetShard, err) + } + clusterHarness.RestartVolumeServer() + conn3, grpcClient3 := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) + defer conn3.Close() + + rebuildResp, err := grpcClient3.VolumeEcShardsRebuild(ctx, &volume_server_pb.VolumeEcShardsRebuildRequest{ + VolumeId: volumeID, + Collection: collection, + }) + if err != nil { + t.Fatalf("VolumeEcShardsRebuild after cross-disk shard loss: %v", err) + } + if !containsUint32(rebuildResp.GetRebuiltShardIds(), repairTargetShard) { + t.Fatalf("VolumeEcShardsRebuild expected to rebuild shard %d, got %v", + repairTargetShard, rebuildResp.GetRebuiltShardIds()) + } + if _, statErr := os.Stat(shardPath); statErr != nil 
{ + t.Fatalf("rebuild did not restore shard %d on disk 0 (%s): %v", repairTargetShard, shardPath, statErr) + } + + if _, err := grpcClient3.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: volumeID, + Collection: collection, + ShardIds: []uint32{repairTargetShard}, + }); err != nil { + t.Fatalf("VolumeEcShardsMount rebuilt shard %d: %v", repairTargetShard, err) + } + for _, n := range needles { + if n.fid == deletedNeedle.fid { + continue + } + verifyHTTPRead(t, httpClient, clusterHarness.VolumeAdminURL(), n.fid, n.payload, "after-repair") + } +} + +// TestEcPartialShardsOnSiblingDiskCleanedUpOnRestart seeds a healthy .dat on +// disk 0, plants the on-disk footprint of an interrupted EC encode on disk 1 +// (one shard plus .ecx / .ecj / .vif, no .dat), and asserts the startup prune +// wipes disk 1 without touching disk 0. +func TestEcPartialShardsOnSiblingDiskCleanedUpOnRestart(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + const ( + dataDirCount = 2 + volumeID = uint32(9501) + collection = "ec-9478" + partialShard = 1 + ) + + clusterHarness := framework.StartSingleVolumeClusterWithDataDirs(t, matrix.P1(), dataDirCount) + dataDirs := clusterHarness.VolumeDataDirs() + if len(dataDirs) != dataDirCount { + t.Fatalf("expected %d data dirs, got %d: %v", dataDirCount, len(dataDirs), dataDirs) + } + + conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) + defer conn.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + framework.AllocateVolume(t, grpcClient, volumeID, collection) + + httpClient := framework.NewHTTPClient() + needleFid := framework.NewFileID(volumeID, 7777, 0xCAFEBABE) + needlePayload := bytesOfLen(4096, 0xC7) + upResp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), needleFid, needlePayload) + _ = framework.ReadAllAndClose(t, upResp) + if upResp.StatusCode != 
http.StatusCreated { + t.Fatalf("seed upload expected 201, got %d", upResp.StatusCode) + } + + datPath := filepath.Join(dataDirs[0], datFileName(collection, volumeID)) + datInfo, err := os.Stat(datPath) + if err != nil { + t.Fatalf("seed setup: .dat must exist on disk 0 (%s): %v", datPath, err) + } + // prune's size guard refuses cleanup if the sibling .dat looks truncated + if datInfo.Size() == 0 { + t.Fatalf("seed setup: .dat on disk 0 is zero bytes") + } + + clusterHarness.StopVolumeServer() + plantPartialEc(t, dataDirs[1], collection, volumeID, partialShard, datInfo.Size()) + + preRestartLayout := scanShardLayout(t, dataDirs, collection, volumeID) + if len(preRestartLayout[0]) != 0 { + t.Fatalf("seed setup: disk 0 should hold no EC shards before restart, got %v", preRestartLayout[0]) + } + if len(preRestartLayout[1]) != 1 { + t.Fatalf("seed setup: disk 1 should hold exactly one planted shard, got %v", preRestartLayout[1]) + } + + clusterHarness.RestartVolumeServer() + + conn2, grpcClient2 := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) + defer conn2.Close() + + // the volume admin port answers before the startup prune finishes + postPruneLayout := waitForLayout(t, dataDirs, collection, volumeID, func(layout map[int][]int) bool { + return len(layout[1]) == 0 + }, 10*time.Second) + + if len(postPruneLayout[1]) != 0 { + t.Fatalf("partial EC shards survived restart on disk 1: %v", postPruneLayout[1]) + } + // .vif is intentionally left behind: with no .ecx / shard to anchor it, + // the next startup ignores it. 
+ for _, side := range []string{".ecx", ".ecj"} { + if fileExistsIn(dataDirs[1], collection, volumeID, side) { + t.Fatalf("%s survived prune on disk 1", side) + } + } + if !fileExistsIn(dataDirs[0], collection, volumeID, ".dat") { + t.Fatalf("prune wiped the healthy .dat on disk 0") + } + if !fileExistsIn(dataDirs[0], collection, volumeID, ".idx") { + t.Fatalf("prune wiped the healthy .idx on disk 0") + } + if _, err = grpcClient2.VolumeEcShardsInfo(ctx, &volume_server_pb.VolumeEcShardsInfoRequest{ + VolumeId: volumeID, + }); err == nil { + t.Fatalf("VolumeEcShardsInfo returned success after prune; the partial EC mount should be gone") + } + verifyHTTPRead(t, httpClient, clusterHarness.VolumeAdminURL(), needleFid, needlePayload, "after-prune") +} + +func bytesOfLen(n int, seed byte) []byte { + out := make([]byte, n) + for i := range out { + out[i] = byte(int(seed) + i*31) + } + return out +} + +func dataShardIds() []uint32 { + ids := make([]uint32, erasure_coding.DataShardsCount) + for i := range ids { + ids[i] = uint32(i) + } + return ids +} + +func datFileName(collection string, volumeID uint32) string { + if collection == "" { + return fmt.Sprintf("%d.dat", volumeID) + } + return fmt.Sprintf("%s_%d.dat", collection, volumeID) +} + +func shardFileName(collection string, volumeID uint32, shardID int) string { + if collection == "" { + return fmt.Sprintf("%d%s", volumeID, erasure_coding.ToExt(shardID)) + } + return fmt.Sprintf("%s_%d%s", collection, volumeID, erasure_coding.ToExt(shardID)) +} + +func sideFileName(collection string, volumeID uint32, ext string) string { + if collection == "" { + return fmt.Sprintf("%d%s", volumeID, ext) + } + return fmt.Sprintf("%s_%d%s", collection, volumeID, ext) +} + +func fileExistsIn(dir, collection string, volumeID uint32, ext string) bool { + _, err := os.Stat(filepath.Join(dir, sideFileName(collection, volumeID, ext))) + return err == nil +} + +func scanShardLayout(t testing.TB, dataDirs []string, collection string, volumeID 
uint32) map[int][]int { + t.Helper() + out := make(map[int][]int, len(dataDirs)) + for diskIdx, dir := range dataDirs { + out[diskIdx] = nil + for shard := 0; shard < erasure_coding.TotalShardsCount; shard++ { + if _, err := os.Stat(filepath.Join(dir, shardFileName(collection, volumeID, shard))); err == nil { + out[diskIdx] = append(out[diskIdx], shard) + } + } + sort.Ints(out[diskIdx]) + } + return out +} + +func waitForLayout(t testing.TB, dataDirs []string, collection string, volumeID uint32, pred func(map[int][]int) bool, timeout time.Duration) map[int][]int { + t.Helper() + deadline := time.Now().Add(timeout) + var layout map[int][]int + for time.Now().Before(deadline) { + layout = scanShardLayout(t, dataDirs, collection, volumeID) + if pred(layout) { + return layout + } + time.Sleep(100 * time.Millisecond) + } + return layout +} + +func movedFile(t testing.TB, fromDir, toDir, collection string, volumeID uint32, ext string) { + t.Helper() + name := sideFileName(collection, volumeID, ext) + src := filepath.Join(fromDir, name) + dst := filepath.Join(toDir, name) + if err := os.Rename(src, dst); err != nil { + t.Fatalf("move %s → %s: %v", src, dst, err) + } +} + +func totalShardsInLayout(layout map[int][]int) int { + n := 0 + for _, ids := range layout { + n += len(ids) + } + return n +} + +func containsUint32(s []uint32, v uint32) bool { + for _, x := range s { + if x == v { + return true + } + } + return false +} + +func verifyHTTPRead(t testing.TB, client *http.Client, base, fid string, want []byte, stage string) { + t.Helper() + resp := framework.ReadBytes(t, client, base, fid) + body := framework.ReadAllAndClose(t, resp) + if resp.StatusCode != http.StatusOK { + t.Fatalf("[%s] GET %s expected 200, got %d", stage, fid, resp.StatusCode) + } + if len(body) != len(want) { + t.Fatalf("[%s] GET %s size mismatch: got %d, want %d", stage, fid, len(body), len(want)) + } + for i := range want { + if body[i] != want[i] { + t.Fatalf("[%s] GET %s byte %d mismatch: got 
%d, want %d", stage, fid, i, body[i], want[i]) + } + } +} + +// plantPartialEc writes the footprint of an interrupted EC encode (one shard +// sized for datFileSize, plus .ecx / .ecj / .vif, no .dat) into dir. +func plantPartialEc(t testing.TB, dir, collection string, volumeID uint32, shardID int, datFileSize int64) { + t.Helper() + shardPath := filepath.Join(dir, shardFileName(collection, volumeID, shardID)) + if f, err := os.Create(shardPath); err == nil { + expectedShardSize := plantedShardSize(datFileSize) + if err := f.Truncate(expectedShardSize); err != nil { + f.Close() + t.Fatalf("truncate planted shard: %v", err) + } + f.Close() + } else { + t.Fatalf("create planted shard %s: %v", shardPath, err) + } + for _, side := range []struct { + ext string + content []byte + }{ + {".ecx", []byte("dummy ecx")}, + {".ecj", nil}, + {".vif", nil}, + } { + p := filepath.Join(dir, sideFileName(collection, volumeID, side.ext)) + f, err := os.Create(p) + if err != nil { + t.Fatalf("create planted %s: %v", side.ext, err) + } + if side.content != nil { + if _, err := f.Write(side.content); err != nil { + f.Close() + t.Fatalf("write planted %s: %v", side.ext, err) + } + } + f.Close() + } +} + +// mirrors calculateExpectedShardSize in weed/storage/disk_location_ec.go +func plantedShardSize(datFileSize int64) int64 { + const dataShards = int64(erasure_coding.DataShardsCount) + largeBatch := int64(erasure_coding.ErasureCodingLargeBlockSize) * dataShards + numLarge := datFileSize / largeBatch + shardSize := numLarge * int64(erasure_coding.ErasureCodingLargeBlockSize) + remaining := datFileSize - numLarge*largeBatch + if remaining > 0 { + smallBatch := int64(erasure_coding.ErasureCodingSmallBlockSize) * dataShards + numSmall := (remaining + smallBatch - 1) / smallBatch + shardSize += numSmall * int64(erasure_coding.ErasureCodingSmallBlockSize) + } + return shardSize +} diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 
dc74b5709..cb49e7f3e 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -274,6 +274,8 @@ func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.Looku Url: dn.Url(), PublicUrl: dn.PublicUrl, DataCenter: dn.GetDataCenterId(), + // without this, clients derive grpc as httpPort+10000 + GrpcPort: uint32(dn.GrpcPort), }) } resp.ShardIdLocations = append(resp.ShardIdLocations, &master_pb.LookupEcVolumeResponse_EcShardIdLocation{ diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 1ce00127c..389e33207 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -482,7 +482,9 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea if !found { return fmt.Errorf("VolumeEcShardRead not found ec volume id %d", req.VolumeId) } - ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId)) + // shard may live on a sibling disk of this server; walk all of them + // under ecVolumesLock. + _, ecShard, found := vs.store.FindEcShard(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(req.ShardId)) if !found { return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId) } diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index a0157e353..7512d0430 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -230,6 +230,12 @@ func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) return 0, nil, false } +// FindEcShard returns the shard if any DiskLocation on this server holds it, +// along with that disk's id. 
+func (s *Store) FindEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (diskId uint32, shard *erasure_coding.EcVolumeShard, found bool) { + return s.findEcShard(vid, shardId) +} + func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) { for _, location := range s.Locations { if s, found := location.FindEcVolume(vid); found { @@ -412,7 +418,9 @@ func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume) } func (s *Store) readLocalEcShardInterval(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.ShardId, buf []byte, offset int64) error { - shard, found := ecVolume.FindEcVolumeShard(shardId) + // findEcShard walks every DiskLocation under ecVolumesLock; the + // shard may live on a sibling disk of this server. + _, shard, found := s.findEcShard(ecVolume.VolumeId, shardId) if !found { return fmt.Errorf("shard %d for volume %d: %w", shardId, ecVolume.VolumeId, errShardNotLocal) }