mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
test(ec): end-to-end encode over a multi-server multi-disk stuck layout (#9728)
* test(framework): support multiple disks per server in MultiVolumeCluster StartMultiVolumeClusterWithDisks gives each volume server N data directories (one DiskLocation each), passed to -dir as a comma list, with a per-server disk-dir accessor for file inspection. StartMultiVolumeCluster keeps its one-disk default. * test(ec): end-to-end encode over a multi-server multi-disk stuck layout A volume in the stuck state — real .dat source, a 0-byte stub replica, and partial stale EC shards from an interrupted encode — must converge to one valid EC layout. Asserts the full shard set across servers, .ecx/.vif kept per server (info file survives the source-volume delete), stale shards cleared, and no regular .dat/.idx left behind.
This commit is contained in:
@@ -7,6 +7,7 @@ import (
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
@@ -24,6 +25,7 @@ type MultiVolumeCluster struct {
|
||||
logsDir string
|
||||
keepLogs bool
|
||||
volumeServerCount int
|
||||
disksPerServer int
|
||||
|
||||
masterPort int
|
||||
masterGrpcPort int
|
||||
@@ -31,6 +33,7 @@ type MultiVolumeCluster struct {
|
||||
volumePorts []int
|
||||
volumeGrpcPorts []int
|
||||
volumePubPorts []int
|
||||
volumeDiskDirs [][]string // per server: the data directories (one DiskLocation each)
|
||||
|
||||
masterCmd *exec.Cmd
|
||||
volumeCmds []*exec.Cmd
|
||||
@@ -38,13 +41,25 @@ type MultiVolumeCluster struct {
|
||||
cleanupOnce sync.Once
|
||||
}
|
||||
|
||||
// StartMultiVolumeCluster starts a cluster with a specified number of volume servers
|
||||
// StartMultiVolumeCluster starts a cluster with serverCount volume servers, one
|
||||
// data directory (DiskLocation) each.
|
||||
func StartMultiVolumeCluster(t testing.TB, profile matrix.Profile, serverCount int) *MultiVolumeCluster {
|
||||
return StartMultiVolumeClusterWithDisks(t, profile, serverCount, 1)
|
||||
}
|
||||
|
||||
// StartMultiVolumeClusterWithDisks starts serverCount volume servers, each with
|
||||
// disksPerServer data directories passed to -dir as a comma list so every
|
||||
// directory becomes its own DiskLocation. Lets tests exercise per-disk EC
|
||||
// layouts (e.g. shards on one disk, a stub/index on a sibling disk).
|
||||
func StartMultiVolumeClusterWithDisks(t testing.TB, profile matrix.Profile, serverCount, disksPerServer int) *MultiVolumeCluster {
|
||||
t.Helper()
|
||||
|
||||
if serverCount < 1 {
|
||||
t.Fatalf("serverCount must be at least 1, got %d", serverCount)
|
||||
}
|
||||
if disksPerServer < 1 {
|
||||
t.Fatalf("disksPerServer must be at least 1, got %d", disksPerServer)
|
||||
}
|
||||
|
||||
weedBinary, err := FindOrBuildWeedBinary()
|
||||
if err != nil {
|
||||
@@ -60,10 +75,21 @@ func StartMultiVolumeCluster(t testing.TB, profile matrix.Profile, serverCount i
|
||||
logsDir := filepath.Join(baseDir, "logs")
|
||||
masterDataDir := filepath.Join(baseDir, "master")
|
||||
|
||||
// Create directories for master and all volume servers
|
||||
// Create directories for master and all volume servers. With one disk the
|
||||
// layout stays at baseDir/volumeN; multi-disk servers get baseDir/volumeN/diskD.
|
||||
dirs := []string{configDir, logsDir, masterDataDir}
|
||||
volumeDiskDirs := make([][]string, serverCount)
|
||||
for i := 0; i < serverCount; i++ {
|
||||
dirs = append(dirs, filepath.Join(baseDir, fmt.Sprintf("volume%d", i)))
|
||||
serverDir := filepath.Join(baseDir, fmt.Sprintf("volume%d", i))
|
||||
volumeDiskDirs[i] = make([]string, disksPerServer)
|
||||
for d := 0; d < disksPerServer; d++ {
|
||||
if disksPerServer == 1 {
|
||||
volumeDiskDirs[i][d] = serverDir
|
||||
} else {
|
||||
volumeDiskDirs[i][d] = filepath.Join(serverDir, fmt.Sprintf("disk%d", d))
|
||||
}
|
||||
dirs = append(dirs, volumeDiskDirs[i][d])
|
||||
}
|
||||
}
|
||||
for _, dir := range dirs {
|
||||
if mkErr := os.MkdirAll(dir, 0o755); mkErr != nil {
|
||||
@@ -98,11 +124,13 @@ func StartMultiVolumeCluster(t testing.TB, profile matrix.Profile, serverCount i
|
||||
logsDir: logsDir,
|
||||
keepLogs: keepLogs,
|
||||
volumeServerCount: serverCount,
|
||||
disksPerServer: disksPerServer,
|
||||
masterPort: masterPort,
|
||||
masterGrpcPort: masterGrpcPort,
|
||||
volumePorts: make([]int, serverCount),
|
||||
volumeGrpcPorts: make([]int, serverCount),
|
||||
volumePubPorts: make([]int, serverCount),
|
||||
volumeDiskDirs: volumeDiskDirs,
|
||||
volumeCmds: make([]*exec.Cmd, serverCount),
|
||||
}
|
||||
|
||||
@@ -133,8 +161,7 @@ func StartMultiVolumeCluster(t testing.TB, profile matrix.Profile, serverCount i
|
||||
|
||||
// Start all volume servers
|
||||
for i := 0; i < serverCount; i++ {
|
||||
volumeDataDir := filepath.Join(baseDir, fmt.Sprintf("volume%d", i))
|
||||
if err = c.startVolume(i, volumeDataDir); err != nil {
|
||||
if err = c.startVolume(i, c.volumeDiskDirs[i]); err != nil {
|
||||
// Log current server's log for debugging startup failures
|
||||
volumeLog := fmt.Sprintf("volume%d.log", i)
|
||||
c.Stop()
|
||||
@@ -207,13 +234,17 @@ func (c *MultiVolumeCluster) startMaster(dataDir string) error {
|
||||
return c.masterCmd.Start()
|
||||
}
|
||||
|
||||
func (c *MultiVolumeCluster) startVolume(index int, dataDir string) error {
|
||||
func (c *MultiVolumeCluster) startVolume(index int, dataDirs []string) error {
|
||||
logName := fmt.Sprintf("volume%d.log", index)
|
||||
logFile, err := os.Create(filepath.Join(c.logsDir, logName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
maxPerDir := make([]string, len(dataDirs))
|
||||
for i := range dataDirs {
|
||||
maxPerDir[i] = "16"
|
||||
}
|
||||
args := []string{
|
||||
"-config_dir=" + c.configDir,
|
||||
"volume",
|
||||
@@ -221,8 +252,8 @@ func (c *MultiVolumeCluster) startVolume(index int, dataDir string) error {
|
||||
"-port=" + strconv.Itoa(c.volumePorts[index]),
|
||||
"-port.grpc=" + strconv.Itoa(c.volumeGrpcPorts[index]),
|
||||
"-port.public=" + strconv.Itoa(c.volumePubPorts[index]),
|
||||
"-dir=" + dataDir,
|
||||
"-max=16",
|
||||
"-dir=" + strings.Join(dataDirs, ","),
|
||||
"-max=" + strings.Join(maxPerDir, ","),
|
||||
"-master=127.0.0.1:" + strconv.Itoa(c.masterPort),
|
||||
"-readMode=" + c.profile.ReadMode,
|
||||
"-concurrentUploadLimitMB=" + strconv.Itoa(c.profile.ConcurrentUploadLimitMB),
|
||||
@@ -301,3 +332,17 @@ func (c *MultiVolumeCluster) VolumePublicURL(index int) string {
|
||||
func (c *MultiVolumeCluster) BaseDir() string {
|
||||
return c.baseDir
|
||||
}
|
||||
|
||||
// VolumeDiskDir returns the data directory backing disk diskIndex on volume
|
||||
// server serverIndex — i.e. the directory for that DiskLocation, for file
|
||||
// inspection in tests.
|
||||
func (c *MultiVolumeCluster) VolumeDiskDir(serverIndex, diskIndex int) string {
|
||||
if serverIndex < 0 || serverIndex >= len(c.volumeDiskDirs) {
|
||||
return ""
|
||||
}
|
||||
disks := c.volumeDiskDirs[serverIndex]
|
||||
if diskIndex < 0 || diskIndex >= len(disks) {
|
||||
return ""
|
||||
}
|
||||
return disks[diskIndex]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user