Files
Chris Lu cd15ae1395 fix(ec): bring ec.encode worker and EC/volume helpers to parity with shell (#9599)
* refactor(volume): extract replica sync/select into shared volume_replica package

Move the volume replica reconciliation helpers (status, union builder,
SyncAndSelectBestReplica, ReadNeedleMeta) out of the shell into a new
weed/storage/volume_replica package so both the shell (ec.encode, volume.tier.move,
volume.check.disk) and the EC encode worker can reuse them. No behavior change.

* fix(ec): bring ec.encode worker to parity with the shell

- Sync replicas and encode the most-complete one (via the shared
  volume_replica.SyncAndSelectBestReplica) instead of a possibly-stale replica,
  marking all replicas readonly first. Prevents silent data loss when a stale
  replica is encoded and the originals deleted.
- Skip remote/tiered volumes in detection (shell ec.encode excludes them).
- Min-node safety gate: refuse to encode when cluster nodes < parity shards.
- Align default thresholds with the shell (fullness 0.95, quiet 1h).

* fix(vacuum): plugin path honors min_volume_age_seconds override

deriveVacuumConfig hard-coded MinVolumeAgeSeconds=0, dropping any configured
value. Read it from worker config (default 0, matching the shell/master vacuum
which has no age gate) so an explicit override is honored.

* address review feedback

- config.go: align GetConfigSpec schema defaults (quiet_for_seconds=3600,
  fullness_ratio=0.95) with the runtime defaults so UI/bootstrap flows match the
  shell (coderabbitai).
- ec_task.go: roll back readonly when markReplicasReadonly fails partway, so
  already-marked replicas don't stay readonly (coderabbitai).
- volume_replica: pass the caller's replica statuses into buildUnionReplica instead
  of re-fetching them, and skip the per-needle ReadNeedleMeta RPC when the source
  replica is read-only (gemini-code-assist).

* test(plugin_workers/ec): make fixtures eligible under the new defaults

The default EC encode thresholds were raised to match the shell (fullness 0.95,
quiet 1h), but the plugin-worker integration fixtures still used 90%-full /
10-minute-old volumes, so detection found no eligible volumes and the tests failed
in CI. Bump the eligible fixtures to 96% full and 2h old.
2026-05-21 02:16:28 -07:00

127 lines
3.9 KiB
Go

package erasure_coding_test
import (
"context"
"fmt"
"testing"
"time"
pluginworkers "github.com/seaweedfs/seaweedfs/test/plugin_workers"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// TestErasureCodingDetectionLargeTopology exercises erasure-coding detection
// against a large synthetic cluster: one data center, 100 racks, 1000 servers
// and 300 volumes per server. Volumes with an even ID are sized as eligible,
// so half of all volumes are expected to qualify. The test requires detection
// to succeed and to return at least 10 proposals; returning fewer proposals
// than the eligible count is only logged, never treated as a failure.
func TestErasureCodingDetectionLargeTopology(t *testing.T) {
	const (
		rackCount       = 100
		serverCount     = 1000
		volumesPerNode  = 300
		volumeSizeLimit = uint64(100)

		jobType = "erasure_coding"
		mib     = uint64(1024 * 1024)
	)

	// Servers are spread evenly over racks, so the counts must divide cleanly.
	if serverCount%rackCount != 0 {
		t.Fatalf("serverCount (%d) must be divisible by rackCount (%d)", serverCount, rackCount)
	}
	perRack := serverCount / rackCount

	// Eligible volumes sit above the default fullness (0.95) and quiet (1h)
	// thresholds; ineligible ones stay below the fullness threshold.
	var (
		fullBytes   = 96 * mib
		sparseBytes = 10 * mib
		lastWrite   = time.Now().Add(-2 * time.Hour).Unix()
		nextID      = uint32(1)
	)

	// newVolume builds the next volume entry and advances the shared ID
	// counter. Even IDs get the eligible size, odd IDs the ineligible one.
	// DiskId, DeletedByteCount and ReadOnly are left at their zero values.
	newVolume := func() *master_pb.VolumeInformationMessage {
		size := sparseBytes
		if nextID%2 == 0 {
			size = fullBytes
		}
		vol := &master_pb.VolumeInformationMessage{
			Id:               nextID,
			Collection:       "ec-bulk",
			Size:             size,
			ModifiedAtSecond: lastWrite,
			ReplicaPlacement: 1,
		}
		nextID++
		return vol
	}

	racks := make([]*master_pb.RackInfo, rackCount)
	for r := range racks {
		nodes := make([]*master_pb.DataNodeInfo, perRack)
		for n := range nodes {
			addr := fmt.Sprintf("10.0.%d.%d:8080", r, n+1)

			volumes := make([]*master_pb.VolumeInformationMessage, volumesPerNode)
			for v := range volumes {
				volumes[v] = newVolume()
			}

			nodes[n] = &master_pb.DataNodeInfo{
				Id:      addr,
				Address: addr,
				DiskInfos: map[string]*master_pb.DiskInfo{
					"hdd": &master_pb.DiskInfo{
						MaxVolumeCount: int64(volumesPerNode + 10),
						VolumeCount:    int64(volumesPerNode),
						VolumeInfos:    volumes,
					},
				},
			}
		}
		racks[r] = &master_pb.RackInfo{
			Id:            fmt.Sprintf("rack-%d", r+1),
			DataNodeInfos: nodes,
		}
	}

	// The whole cluster lives in a single data center.
	topology := &master_pb.VolumeListResponse{
		VolumeSizeLimitMb: volumeSizeLimit,
		TopologyInfo: &master_pb.TopologyInfo{
			DataCenterInfos: []*master_pb.DataCenterInfo{
				&master_pb.DataCenterInfo{
					Id:        "dc-1",
					RackInfos: racks,
				},
			},
		},
	}
	master := pluginworkers.NewMasterServer(t, topology)

	creds := grpc.WithTransportCredentials(insecure.NewCredentials())
	ecHandler := erasure_coding.NewErasureCodingHandler(creds, t.TempDir())
	harness := pluginworkers.NewHarness(t, pluginworkers.HarnessConfig{
		WorkerOptions: pluginworker.WorkerOptions{GrpcDialOption: creds},
		Handlers:      []pluginworker.JobHandler{ecHandler},
	})
	harness.WaitForJobType(jobType)

	// Every second volume ID is even, hence eligible.
	expectedEligible := (serverCount * volumesPerNode) / 2

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	began := time.Now()
	proposals, err := harness.Plugin().RunDetection(
		ctx,
		jobType,
		&plugin_pb.ClusterContext{MasterGrpcAddresses: []string{master.Address()}},
		0,
	)
	elapsed := time.Since(began)

	require.NoError(t, err)
	found := len(proposals)
	require.GreaterOrEqual(t, found, 10, "should detect at least some proposals")

	t.Logf("large topology detection completed in %s (proposals=%d, eligible=%d)", elapsed, found, expectedEligible)
	if found < expectedEligible {
		// Informational only: a shortfall against the eligible count is not a failure.
		t.Logf("large topology detection stopped early: %d proposals vs %d eligible", found, expectedEligible)
	}
}