fix(ec): correct multi-disk disk counting and EC balance shard attribution (#9594)

* fix(shell): count physical disks in cluster.status on multi-disk nodes

The master keys DataNodeInfo.DiskInfos by disk type, so several same-type
physical disks on one node collapse into a single DiskInfo entry. cluster.status
(printClusterInfo) and CountTopologyResources counted len(DiskInfos), reporting
one disk per node instead of the real physical disk count, while volume.list and
the admin ActiveTopology already split per physical disk.

Route both counters through DiskInfo.SplitByPhysicalDisk so a node with N
same-type disks reports N. Cosmetic/diagnostic only; placement already uses the
per-disk activeDisk map.
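
As a rough illustration of why len(DiskInfos) under-counts, the sketch below uses simplified stand-in types. shardInfo, diskInfo, and countPhysicalDisks are hypothetical names, not the real master_pb messages or the SplitByPhysicalDisk implementation. It counts the distinct disk ids found inside each type-keyed entry, and it assumes an entry with no per-disk records still stands for one disk.

```go
package main

import "fmt"

// shardInfo is a hypothetical per-shard record that carries the real disk id.
type shardInfo struct {
	VolumeID uint32
	DiskID   uint32
}

// diskInfo is a hypothetical stand-in for one type-keyed DiskInfo entry.
type diskInfo struct {
	Shards []shardInfo
}

// countPhysicalDisks counts distinct disk ids per type-keyed entry instead of
// counting the entries themselves.
func countPhysicalDisks(diskInfos map[string]diskInfo) int {
	total := 0
	for _, di := range diskInfos {
		seen := map[uint32]struct{}{}
		for _, s := range di.Shards {
			seen[s.DiskID] = struct{}{}
		}
		if len(seen) == 0 {
			// Assumption: an entry without per-disk records is one disk.
			total++
			continue
		}
		total += len(seen)
	}
	return total
}

func main() {
	node := map[string]diskInfo{
		"": {Shards: []shardInfo{
			{VolumeID: 1, DiskID: 0},
			{VolumeID: 2, DiskID: 1},
			{VolumeID: 3, DiskID: 2},
		}},
	}
	fmt.Println("type-keyed entries:", len(node))            // 1
	fmt.Println("physical disks:", countPhysicalDisks(node)) // 3
}
```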

* fix(ec): attribute EC balance source disk per shard and reject same-node moves

On multi-disk nodes the EC balance worker built a node-level view that kept only
the first physical disk id per (node, volume), so a move of a shard living on a
different disk reported the wrong source disk. That source disk drives the
per-disk capacity reservation, so the wrong disk drifts the capacity model the
EC placement planner relies on. Track shards per physical disk and resolve the
actual source disk for every emitted move (dedup, cross-rack, within-rack,
global), keeping the per-disk view consistent as simulated moves are applied.

Also close a data-loss trap: VolumeEcShardsDelete is node-wide (it removes the
shard from every disk on the node) and copyAndMountShard skips the copy when
source and target addresses match, so a same-node move would erase a shard it
never copied. isDedupPhase now requires the same node AND disk, and Validate /
Execute reject same-node cross-disk moves outright.
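
The guard described above can be sketched as follows. This is a minimal illustration with hypothetical names (shardMove, isDedup, validateMove), not the worker's actual Validate/Execute code. A move counts as dedup only when node and disk both match, and a same-node move to a different disk is refused.

```go
package main

import (
	"errors"
	"fmt"
)

// shardMove is a hypothetical, simplified description of one planned move.
type shardMove struct {
	SourceNode string
	SourceDisk uint32
	TargetNode string
	TargetDisk uint32
}

var errSameNodeCrossDisk = errors.New("same-node cross-disk EC shard move is not allowed")

// isDedup reports whether the move stays on the same node AND the same disk.
func isDedup(m shardMove) bool {
	return m.SourceNode == m.TargetNode && m.SourceDisk == m.TargetDisk
}

// validateMove rejects a move whose node-wide delete would erase a shard that
// was never copied, because source and target share one server address.
func validateMove(m shardMove) error {
	if m.SourceNode == m.TargetNode && m.SourceDisk != m.TargetDisk {
		return errSameNodeCrossDisk
	}
	return nil
}

func main() {
	sameNode := shardMove{SourceNode: "n1", SourceDisk: 0, TargetNode: "n1", TargetDisk: 2}
	crossNode := shardMove{SourceNode: "n1", SourceDisk: 0, TargetNode: "n2", TargetDisk: 2}
	fmt.Println(validateMove(sameNode))  // rejected
	fmt.Println(validateMove(crossNode)) // <nil>
	fmt.Println(isDedup(sameNode))       // false
}
```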

* fix(ec): spread EC balance moves across destination disks

Port the shell ec.balance pickBestDiskOnNode heuristic to the EC balance
worker so a moved shard is placed on a good physical disk instead of always
deferring to the volume server (target disk 0). The detection now builds a
per-physical-disk view of each node (free slots split from the node total, exact
EC shard count, disk type, discovered from both regular volumes and EC shards)
and, for each cross-rack, within-rack, and global move, chooses the destination
disk by ascending score:
  - fewer total EC shards on the disk,
  - fewer shards of the same volume on the disk, weighted more heavily (spread a
    volume's shards across disks for fault tolerance), and
  - data/parity anti-affinity (a data shard avoids disks holding the volume's
    parity shards and vice versa).

Planned placements are reserved on the in-memory model during a run so multiple
shards moved to the same node spread across its disks rather than piling on one.
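
A minimal sketch of lowest-score disk selection with in-run reservation follows. The types, the weights (sameVolumeWeight, antiAffinityWeight), and the function names are illustrative assumptions, not the values or code used by pickBestDiskOnNode.

```go
package main

import "fmt"

// diskState is a hypothetical per-physical-disk view used for scoring.
type diskState struct {
	ID           uint32
	TotalShards  int // all EC shards on the disk
	VolumeData   int // data shards of the volume being moved
	VolumeParity int // parity shards of the volume being moved
}

// Assumed weights: same-volume shards and anti-affinity dominate raw counts.
const (
	sameVolumeWeight   = 100
	antiAffinityWeight = 1000
)

// diskScore returns a score where lower is better.
func diskScore(d diskState, movingData bool) int {
	score := d.TotalShards + sameVolumeWeight*(d.VolumeData+d.VolumeParity)
	if movingData {
		score += antiAffinityWeight * d.VolumeParity
	} else {
		score += antiAffinityWeight * d.VolumeData
	}
	return score
}

// pickAndReserve chooses the lowest-scoring disk (first one wins ties) and
// records the placement so the next pick sees the updated counts.
func pickAndReserve(disks []diskState, movingData bool) (uint32, bool) {
	best := -1
	for i := range disks {
		if best < 0 || diskScore(disks[i], movingData) < diskScore(disks[best], movingData) {
			best = i
		}
	}
	if best < 0 {
		return 0, false
	}
	disks[best].TotalShards++
	if movingData {
		disks[best].VolumeData++
	} else {
		disks[best].VolumeParity++
	}
	return disks[best].ID, true
}

func main() {
	disks := []diskState{{ID: 0}, {ID: 1}, {ID: 2}}
	for i := 0; i < 3; i++ {
		id, _ := pickAndReserve(disks, true)
		fmt.Println("data shard -> disk", id)
	}
}
```

Because each pick is reserved on the in-memory slice, three data shards sent to the same three-disk node land on three different disks in this sketch.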

* fix(ec): bring EC balance worker to parity with shell ec.balance

The worker's cross-rack and within-rack phases balanced shards by total
count; the shell balances data and parity shards separately with anti-affinity
and honors replica placement. Port that logic so the automatic balancer makes
the same fault-tolerance-aware decisions as the manual command:

- Cross-rack and within-rack now run a two-pass balance: data shards spread
  first, then parity shards spread while avoiding racks/nodes that already hold
  the volume's data shards (anti-affinity), mirroring doBalanceEcShardsAcrossRacks
  and doBalanceEcShardsWithinOneRack.
- Optional replica placement: a new replica_placement config (e.g. "020")
  constrains shards per rack (DiffRackCount) and per node (SameRackCount); empty
  keeps the previous even-spread behavior.
- The data/parity boundary is resolved from a per-collection EC ratio (standard
  10+4 here), replacing the previously hardcoded constant at the call sites.

Selection is deterministic (sorted keys) to keep behavior reproducible.

* refactor(ec): extract shared ecbalancer package for shell and worker

The EC shard balancing policy was duplicated between the shell ec.balance
command and the admin EC balance worker, and the two had drifted (multi-disk
handling, data/parity anti-affinity, replica placement). Extract the policy into
a new pure package, weed/storage/erasure_coding/ecbalancer, that both callers
share so it cannot drift again.

- ecbalancer.Plan(topology, options) runs the full policy (dedup, cross-rack and
  within-rack data/parity two-pass with anti-affinity, global per-rack balance,
  and diversity-aware disk selection) over a caller-built Topology snapshot and
  returns the shard Moves. It depends only on erasure_coding and super_block.
- The worker builds the Topology from the master topology and turns Moves into
  task proposals; the shell builds it from its EcNode model and executes Moves
  via the existing move/delete RPCs. Per-collection EC ratio resolution stays in
  each caller (passed as Options.Ratio).
- Options expose the two genuine policy differences: GlobalUtilizationBased
  (worker balances by fractional fullness; shell by raw count) and
  GlobalMaxMovesPerRack (worker moves incrementally across cycles; shell drains
  in one pass).

The shell keeps pickBestDiskOnNode for the evacuate command. Policy tests move to
the ecbalancer package; the shell and worker keep their adapter/execution tests.

* fix(ec): restore parallelism and per-type/full-range balancing after ecbalancer refactor

Address regressions and gaps from the ecbalancer extraction:

- Shell ec.balance honors -maxParallelization again: planned moves run phase by
  phase (preserving cross-phase dependencies) with bounded concurrency within a
  phase. Apply mode runs only the RPCs concurrently; dry-run stays sequential and
  updates the in-memory model for inspection.
- Rack and node balancing gate on per-type spread (data and parity separately)
  instead of combined totals, so a data/parity skew is corrected even when the
  per-rack/node totals are even.
- Global rack balancing iterates the full shard-id space (MaxShardCount) so
  custom EC ratios with more than the standard total are candidates.
- Cross-rack planning decrements the destination node's free slots per planned
  move, so limited-capacity targets are no longer over-planned.

* fix(ec): make EC dedup keeper deterministic and capacity-aware

When a shard is duplicated across nodes, keep the copy on the node with the most
free slots and delete the duplicates from the more-constrained nodes, relieving
capacity pressure where it is tightest. Tie-break on node id so the choice is
deterministic. This unifies the shell and worker (the shell previously kept the
least-free node, an incidental default) on the more sensible behavior.
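
The keeper rule can be sketched like this, assuming a simplified shardHolder type and a hypothetical pickKeeper helper: sort by free slots descending, break ties on node id ascending, keep the first entry, and delete the rest.

```go
package main

import (
	"fmt"
	"sort"
)

// shardHolder is a hypothetical record of one node holding a duplicated shard.
type shardHolder struct {
	NodeID    string
	FreeSlots int
}

// pickKeeper returns the holder to keep and the holders to delete from.
// Most free slots wins; node id breaks ties so the result is deterministic.
func pickKeeper(holders []shardHolder) (shardHolder, []shardHolder) {
	if len(holders) == 0 {
		return shardHolder{}, nil
	}
	sorted := append([]shardHolder(nil), holders...)
	sort.Slice(sorted, func(i, j int) bool {
		if sorted[i].FreeSlots != sorted[j].FreeSlots {
			return sorted[i].FreeSlots > sorted[j].FreeSlots
		}
		return sorted[i].NodeID < sorted[j].NodeID
	})
	return sorted[0], sorted[1:]
}

func main() {
	keeper, dups := pickKeeper([]shardHolder{
		{NodeID: "b", FreeSlots: 5},
		{NodeID: "a", FreeSlots: 5},
		{NodeID: "c", FreeSlots: 2},
	})
	fmt.Println("keep:", keeper.NodeID) // a
	for _, d := range dups {
		fmt.Println("delete from:", d.NodeID) // b, then c
	}
}
```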

* fix(ec): restore global volume-diversity and per-volume move serialization

Two more behaviors lost in the ecbalancer refactor:

- Global rack balancing again prefers moving a shard of a volume the destination
  does not hold at all before adding another shard of an already-present volume
  (two-pass, mirroring the old balanceEcRack), keeping each volume's shards
  spread across nodes.
- Shell apply-mode execution serializes a single volume's moves within a phase
  while still running different volumes in parallel, so concurrent moves of the
  same volume cannot race on its shared .ecx/.ecj/.vif sidecar files.
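
One way to get "serial per volume, parallel across volumes" is to group a phase's moves by volume and give each group its own goroutine behind a semaphore. The sketch below assumes hypothetical shardMove, volumeKey, and runPhase names, and it is not the shell's actual executor.

```go
package main

import (
	"fmt"
	"sync"
)

// shardMove is a hypothetical planned move within one phase.
type shardMove struct {
	Collection string
	VolumeID   uint32
	ShardID    int
}

type volumeKey struct {
	Collection string
	VolumeID   uint32
}

// runPhase executes one phase: moves of the same volume run in planned order
// on a single goroutine, while up to maxParallel volumes run concurrently.
func runPhase(moves []shardMove, maxParallel int, exec func(shardMove)) {
	if maxParallel < 1 {
		maxParallel = 1
	}
	groups := map[volumeKey][]shardMove{}
	var order []volumeKey // first-seen order keeps scheduling deterministic
	for _, m := range moves {
		k := volumeKey{m.Collection, m.VolumeID}
		if _, ok := groups[k]; !ok {
			order = append(order, k)
		}
		groups[k] = append(groups[k], m)
	}

	sem := make(chan struct{}, maxParallel)
	var wg sync.WaitGroup
	for _, k := range order {
		wg.Add(1)
		sem <- struct{}{} // blocks while maxParallel volumes are in flight
		go func(ms []shardMove) {
			defer wg.Done()
			defer func() { <-sem }()
			for _, m := range ms {
				exec(m)
			}
		}(groups[k])
	}
	wg.Wait()
}

func main() {
	moves := []shardMove{
		{"test", 1, 0}, {"test", 2, 0}, {"test", 1, 1}, {"test", 2, 1},
	}
	runPhase(moves, 2, func(m shardMove) {
		fmt.Printf("volume %d: moving shard %d\n", m.VolumeID, m.ShardID)
	})
}
```

Calling runPhase once per phase, in order, would preserve cross-phase dependencies because each call returns only after all of its moves finish.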

* fix(ec): key EC balance shards by (collection, volume id)

A numeric volume id can be reused across collections, and EC identity is
(collection, vid) (see store_ec_attach_reservation.go). The ecbalancer keyed
Node.shards by vid alone, so volumes sharing an id across collections merged into
one entry — letting dedup delete a "duplicate" that is actually a different
collection's shard, and letting moves act across collections. Key shards by
(collection, vid) throughout so each volume stays distinct.
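
The effect of the key change can be shown with a small sketch. The shardRecord and volumeKey types are hypothetical and the shard sets are plain bitmasks; the real ecbalancer structures differ.

```go
package main

import "fmt"

// shardRecord is a hypothetical report of shards held for one volume.
type shardRecord struct {
	Collection string
	VolumeID   uint32
	ShardBits  uint32
}

// volumeKey identifies an EC volume by collection and numeric id together.
type volumeKey struct {
	Collection string
	VolumeID   uint32
}

// groupByVolumeID shows the old behavior: same-id volumes merge.
func groupByVolumeID(records []shardRecord) map[uint32]uint32 {
	out := map[uint32]uint32{}
	for _, r := range records {
		out[r.VolumeID] |= r.ShardBits
	}
	return out
}

// groupByCollectionAndVolume keeps each collection's volume distinct.
func groupByCollectionAndVolume(records []shardRecord) map[volumeKey]uint32 {
	out := map[volumeKey]uint32{}
	for _, r := range records {
		out[volumeKey{r.Collection, r.VolumeID}] |= r.ShardBits
	}
	return out
}

func main() {
	records := []shardRecord{
		{Collection: "pics", VolumeID: 7, ShardBits: 0b01},
		{Collection: "logs", VolumeID: 7, ShardBits: 0b10},
	}
	fmt.Println("keyed by vid:", len(groupByVolumeID(records)))                          // 1
	fmt.Println("keyed by (collection, vid):", len(groupByCollectionAndVolume(records))) // 2
}
```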

* fix(ec): credit freed capacity from dedup before later balance phases

Dedup deletions are simulated only by applyMovesToTopology, which cleared shard
bits but did not return the freed disk/node/rack slots. Later phases reject
destinations with no free slots, so a slot opened by dedup could not be reused in
the same Plan/ec.balance run. applyMovesToTopology now credits the freed
disk/node/rack capacity for dedup moves (non-dedup moves still rely on the inline
accounting their phase already did).

* test(ec): add multi-disk EC balance integration test

Cover issue 9593 end-to-end at the unit level the old tests missed: build the
master's actual multi-disk wire format (same-type disks collapsed into one
DiskInfo, real DiskId only in per-shard records), run it through a real
ActiveTopology and the Detection entry point, then replay the planned moves with
the volume server's true semantics (node-wide VolumeEcShardsDelete) and assert no
EC shard is ever lost. Covers a balanced spread, a one-node-concentrated volume,
and a multi-rack spread, and asserts moves are safe (no same-node cross-disk),
correctly attributed to the source disk, and redistribute concentrated volumes
across both other racks and multiple destination disks.

* fix(ec): aggregate per-disk EC shards when verifying multi-disk volumes

collectEcNodeShardsInfo overwrote its per-server entry for each EcShardInfo of a
volume. A multi-disk node reports one EcShardInfo per physical disk holding shards
of the volume, so only the last disk's shards survived — the node looked like it
was missing shards it actually had. This made ec.encode's pre-delete verification
(and ec.decode) under-count volumes whose shards are spread across disks on one
server, falsely aborting the encode on multi-disk clusters. Union the per-disk
shard sets per server instead.
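
The difference between overwriting and unioning per-disk reports can be sketched with bitmasks. The ecShardInfo type and helper names below are illustrative, not the real collectEcNodeShardsInfo code; each set bit stands for one shard id.

```go
package main

import (
	"fmt"
	"math/bits"
)

// ecShardInfo is a hypothetical per-disk report of one volume's shards.
type ecShardInfo struct {
	DiskID      uint32
	EcIndexBits uint32 // bit i set means shard i is present on this disk
}

// lastWins mimics the bug: each report overwrites the previous one.
func lastWins(infos []ecShardInfo) uint32 {
	var b uint32
	for _, info := range infos {
		b = info.EcIndexBits
	}
	return b
}

// unionBits merges the reports so shards on every disk are counted.
func unionBits(infos []ecShardInfo) uint32 {
	var b uint32
	for _, info := range infos {
		b |= info.EcIndexBits
	}
	return b
}

// shardCount returns how many shards a bitmask represents.
func shardCount(b uint32) int {
	return bits.OnesCount32(b)
}

func main() {
	infos := []ecShardInfo{
		{DiskID: 0, EcIndexBits: 0x007F}, // shards 0..6
		{DiskID: 1, EcIndexBits: 0x3F80}, // shards 7..13
	}
	fmt.Println("overwrite:", shardCount(lastWins(infos))) // 7
	fmt.Println("union:", shardCount(unionBits(infos)))    // 14
}
```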

Also make verifyEcShardsBeforeDelete poll briefly: shard relocations reach the
master via volume-server heartbeats, so a freshly distributed shard set may not be
fully visible the instant the balance returns. Retry before concluding the set is
incomplete; genuine loss still fails after the retries are exhausted.
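
A bounded poll of this kind might look like the sketch below. pollUntil is a hypothetical helper with made-up attempt and interval values, not the signature or retry budget of verifyEcShardsBeforeDelete.

```go
package main

import (
	"fmt"
	"time"
)

// pollUntil calls check up to attempts times, sleeping interval between
// tries, and reports whether check ever succeeded.
func pollUntil(attempts int, interval time.Duration, check func() bool) bool {
	for i := 0; i < attempts; i++ {
		if check() {
			return true
		}
		if i < attempts-1 {
			time.Sleep(interval)
		}
	}
	return false
}

func main() {
	calls := 0
	// Stand-in for "the master now lists every shard": true on the third look.
	complete := pollUntil(5, 10*time.Millisecond, func() bool {
		calls++
		return calls >= 3
	})
	fmt.Println(complete, calls) // true 3
}
```

Genuine loss still surfaces in this shape: when the check never succeeds, the helper returns false once the attempts run out.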

* test(ec): end-to-end multi-disk EC balance shard-loss regression

Start a real cluster of multi-disk volume servers (3 servers x 4 disks),
EC-encode a volume, run ec.balance, and assert hard invariants the prior
integration tests only logged: after encode all 14 shards exist, ec.balance loses
no shard, shards span more than one disk per node, and cluster.status counts
physical disks (not one per node). This reproduces issue 9593 end to end and would
have caught the multi-disk shard-aggregation bug fixed alongside it.

* fix(ec): bring EC balance worker/plugin path to parity with shell

- Per-volume serialization and phase order: key the plugin proposal dedupe by
  (collection, volume) instead of (volume, shard, source), so the scheduler runs
  only one of a volume's moves at a time (within a run and against in-flight jobs).
  Concurrent same-volume moves raced on the volume's .ecx/.ecj/.vif sidecars.
  Because the planner emits a volume's moves in phase order, they now also
  execute in order across detection cycles, matching the shell.
- disk_type "hdd": normalize via ToDiskType (hdd -> "" HardDriveType) while keeping
  a "filter requested" flag, so disk_type=hdd matches the empty-keyed HDD disks
  instead of nothing; apply the canonical type to planner options and move params.
- Replica placement: expose shard_replica_placement in the admin config form and
  read it into the worker config, mirroring ec.balance -shardReplicaPlacement.
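
The disk_type handling in the second bullet can be sketched as follows. normalizeDiskType and diskMatches are hypothetical helpers that only imitate the described behavior ("hdd" maps to the empty canonical type while a separate flag records that a filter was requested); they are not the real ToDiskType.

```go
package main

import (
	"fmt"
	"strings"
)

// normalizeDiskType returns the canonical type plus whether the caller asked
// for a filter at all. "hdd" canonicalizes to "" but still requests a filter.
func normalizeDiskType(raw string) (canonical string, filterRequested bool) {
	trimmed := strings.ToLower(strings.TrimSpace(raw))
	if trimmed == "" {
		return "", false
	}
	if trimmed == "hdd" {
		return "", true
	}
	return trimmed, true
}

// diskMatches applies the filter to a disk's canonical type.
func diskMatches(diskType, canonical string, filterRequested bool) bool {
	if !filterRequested {
		return true
	}
	return diskType == canonical
}

func main() {
	canonical, requested := normalizeDiskType("hdd")
	fmt.Println(diskMatches("", canonical, requested))    // true: empty-keyed HDD disk
	fmt.Println(diskMatches("ssd", canonical, requested)) // false
}
```

Without the flag, an "hdd" filter and "no filter" would both collapse to the empty string and become indistinguishable.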

* test(ec): rename worker in-process test (not a real integration test)

The worker-package multi-disk tests build a fake master topology and simulate
move execution; they are not real-cluster integration tests. Rename
integration_test.go -> multidisk_detection_test.go and drop the Integration
prefix so 'integration' refers only to the real-cluster E2Es in test/erasure_coding.

* ci(ec): remove redundant ec-integration workflow

ec-integration.yml duplicated EC Integration Tests under the same workflow name
but ran only 'go test ec_integration_test.go' (one file), so it never ran new
test files (e.g. multidisk_shardloss_test.go) and was a strict, path-filtered
subset of ec-integration-tests.yml, which already runs 'go test -v' over the whole
test/erasure_coding package on every push/PR.

* fix(ec): worker falls back to master default replication for EC balance

For strict parity with the shell, the EC balance worker now uses the master's
configured default replication as the replica-placement fallback when no explicit
shard_replica_placement is set, instead of always defaulting to even spread.

The maintenance scanner reads it via GetMasterConfiguration each cycle and passes
it through ClusterInfo.DefaultReplicaPlacement; detection resolves the constraint
(explicit config wins, else master default, else none) in resolveReplicaPlacement.
A zero-replication default (the common 000 case) still means even spread, so the
common configuration is unchanged.
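
The precedence can be sketched as below. The placement type, the strict three-digit parser, and resolvePlacement are simplifying assumptions for illustration; the real resolveReplicaPlacement and the super_block parser may accept other inputs.

```go
package main

import "fmt"

// placement is a hypothetical parsed form of a string such as "020".
type placement struct {
	DiffDataCenter int
	DiffRack       int
	SameRack       int
}

// parsePlacement accepts exactly three decimal digits (an assumption).
func parsePlacement(s string) (placement, error) {
	if len(s) != 3 {
		return placement{}, fmt.Errorf("replica placement %q must be three digits", s)
	}
	var d [3]int
	for i := 0; i < 3; i++ {
		if s[i] < '0' || s[i] > '9' {
			return placement{}, fmt.Errorf("replica placement %q has a non-digit", s)
		}
		d[i] = int(s[i] - '0')
	}
	return placement{DiffDataCenter: d[0], DiffRack: d[1], SameRack: d[2]}, nil
}

// resolvePlacement picks explicit config first, then the master default.
// The bool is false when no constraint applies (even spread).
func resolvePlacement(explicit, masterDefault string) (placement, bool, error) {
	chosen := explicit
	if chosen == "" {
		chosen = masterDefault
	}
	if chosen == "" {
		return placement{}, false, nil
	}
	p, err := parsePlacement(chosen)
	if err != nil {
		return placement{}, false, err
	}
	if p == (placement{}) {
		return p, false, nil // "000" still means even spread
	}
	return p, true, nil
}

func main() {
	fmt.Println(resolvePlacement("", "020"))    // master default applies
	fmt.Println(resolvePlacement("010", "020")) // explicit config wins
	fmt.Println(resolvePlacement("", "000"))    // no constraint
}
```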

* fix(ec): plugin path populates master default replication too

The plugin worker built ClusterInfo with only ActiveTopology, so the master
default replication fallback added for the maintenance path never reached
plugin-driven EC balance detection — empty shard_replica_placement still meant
even spread there. Fetch the master default via GetMasterConfiguration (new
pluginworker.FetchDefaultReplicaPlacement) and set ClusterInfo.DefaultReplicaPlacement
so both detection paths resolve replica placement identically to the shell.

* docs(ec): empty shard replica placement uses master default, not even spread

The EC balance config text (admin plugin form, legacy form help text, and
the struct/proto field comments) still said an empty shard_replica_placement
spreads evenly. The runtime resolves empty to the master default replication
(resolveReplicaPlacement), matching shell ec.balance, with even spread only
when that default is empty or zero. Update the text to match and regenerate
worker_pb for the proto comment change.
Chris Lu
2026-05-20 23:31:21 -07:00
committed by GitHub
parent afcc491517
commit 391f543ff2
27 changed files with 2799 additions and 2656 deletions
-49
@@ -1,49 +0,0 @@
name: EC Integration Tests
on:
  push:
    branches: [ master ]
    paths:
      - 'weed/admin/**'
      - 'weed/worker/**'
      - 'test/erasure_coding/admin_dockertest/**'
      - '.github/workflows/ec-integration.yml'
  pull_request:
    branches: [ master ]
    paths:
      - 'weed/admin/**'
      - 'weed/worker/**'
      - 'test/erasure_coding/admin_dockertest/**'
      - '.github/workflows/ec-integration.yml'
jobs:
  ec-integration-test:
    runs-on: ubuntu-latest
    timeout-minutes: 15
    steps:
      - name: Checkout code
        uses: actions/checkout@v6
      - name: Set up Go
        uses: actions/setup-go@v6
        with:
          go-version-file: 'go.mod'
      - name: Build weed binary
        run: |
          cd weed
          go build -o ../weed_bin
      - name: Run EC integration tests
        run: |
          cd test/erasure_coding/admin_dockertest
          go test -v -timeout 15m ec_integration_test.go
      - name: Upload test logs on failure
        if: failure()
        uses: actions/upload-artifact@v7
        with:
          name: ec-test-logs
          path: test/erasure_coding/admin_dockertest/tmp/logs/
          retention-days: 7
@@ -0,0 +1,190 @@
package erasure_coding
import (
"context"
"fmt"
"path/filepath"
"regexp"
"strconv"
"strings"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/shell"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)
// TestMultiDiskECBalanceNoShardLoss is the end-to-end regression for issue 9593.
// It runs a real cluster of multi-disk volume servers (3 servers x 4 disks),
// EC-encodes a volume, then runs ec.balance, asserting hard invariants the older
// integration tests only logged:
//
// - after encode the full set of 14 EC shards exists,
// - ec.balance never loses a shard (still 14 distinct shards afterwards),
// - shards end up spread across more than one disk per node, and
// - cluster.status counts physical disks (not one per node) and matches the
// real on-disk distribution.
func TestMultiDiskECBalanceNoShardLoss(t *testing.T) {
if testing.Short() {
t.Skip("Skipping multi-disk EC integration test in short mode")
}
testDir := t.TempDir()
ctx, cancel := context.WithTimeout(context.Background(), 240*time.Second)
defer cancel()
cluster, err := startMultiDiskCluster(ctx, testDir)
require.NoError(t, err)
defer cluster.Stop()
require.NoError(t, waitForServer("127.0.0.1:9334", 30*time.Second))
for i := 0; i < 3; i++ {
require.NoError(t, waitForServer(fmt.Sprintf("127.0.0.1:809%d", i), 30*time.Second))
}
t.Log("waiting for multi-disk volume servers to register...")
time.Sleep(10 * time.Second)
commandEnv := shell.NewCommandEnv(&shell.ShellOptions{
Masters: stringPtr("127.0.0.1:9334"),
GrpcDialOption: grpc.WithInsecure(),
FilerGroup: stringPtr("default"),
})
connectToMasterAndSync(ctx, t, commandEnv)
// Upload enough small files that the volume holds real data to encode.
var volumeId needle.VolumeId
for retry := 0; retry < 5; retry++ {
volumeId, err = uploadTestDataToMaster([]byte(strings.Repeat("multidisk-ec-9593 ", 64)), "127.0.0.1:9334")
if err == nil {
break
}
time.Sleep(3 * time.Second)
}
require.NoError(t, err, "failed to upload test data")
for i := 0; i < 40; i++ {
if _, e := uploadTestDataToMaster([]byte(strings.Repeat("filler ", 128)), "127.0.0.1:9334"); e != nil {
break
}
}
t.Logf("using volume %d", volumeId)
time.Sleep(3 * time.Second)
locked, unlock := tryLockWithTimeout(t, commandEnv, 15*time.Second)
require.True(t, locked, "could not acquire shell lock")
defer unlock()
// EC-encode the volume.
out, err := captureCommandOutput(t, shell.Commands[findCommandIndex("ec.encode")],
[]string{"-volumeId", fmt.Sprintf("%d", volumeId), "-collection", "test", "-force"}, commandEnv)
t.Logf("ec.encode output:\n%s", out)
require.NoError(t, err, "ec.encode failed")
// All 14 shards must exist after encoding.
require.Eventually(t, func() bool {
return len(collectDistinctShardIDs(testDir, uint32(volumeId))) == erasureShardCount
}, 30*time.Second, time.Second, "expected all %d EC shards after encode, got %v",
erasureShardCount, collectDistinctShardIDs(testDir, uint32(volumeId)))
beforeBalance := collectDistinctShardIDs(testDir, uint32(volumeId))
t.Logf("after encode: %d distinct shards on %d disks", len(beforeBalance), disksWithShards(testDir, uint32(volumeId)))
// Run ec.balance.
out, err = captureCommandOutput(t, shell.Commands[findCommandIndex("ec.balance")],
[]string{"-collection", "test", "-force"}, commandEnv)
t.Logf("ec.balance output:\n%s", out)
require.NoError(t, err, "ec.balance failed")
time.Sleep(3 * time.Second)
// The core regression: ec.balance must not lose any shard.
afterBalance := collectDistinctShardIDs(testDir, uint32(volumeId))
require.Equal(t, erasureShardCount, len(afterBalance),
"ec.balance lost shards on multi-disk nodes: had %v, now %v", sortedKeysOf(beforeBalance), sortedKeysOf(afterBalance))
// Shards must be spread across more than one physical disk per node overall.
usedDisks := disksWithShards(testDir, uint32(volumeId))
assert.Greater(t, usedDisks, 3, "EC shards should span more than one disk per node (got %d disks across 3 nodes)", usedDisks)
// cluster.status must count physical disks, not collapse to one per node: it
// must report at least the disks actually holding this volume's shards (which
// is already >3 across the 3 nodes). Before the fix it reported 3 (node count).
require.Eventually(t, func() bool {
n, ok := clusterStatusDiskCount(t, commandEnv)
return ok && n >= usedDisks
}, 30*time.Second, 2*time.Second, "cluster.status never reported the >=%d physical disks holding shards (multi-disk count)", usedDisks)
n, _ := clusterStatusDiskCount(t, commandEnv)
t.Logf("cluster.status reports %d physical disks (>= %d holding this volume's shards)", n, usedDisks)
}
const erasureShardCount = 14 // 10 data + 4 parity
// collectDistinctShardIDs returns the set of EC shard ids present for a volume
// across every disk of every server in the multi-disk test layout.
func collectDistinctShardIDs(testDir string, volumeId uint32) map[int]bool {
ids := map[int]bool{}
for server := 0; server < 3; server++ {
for disk := 0; disk < 4; disk++ {
diskDir := filepath.Join(testDir, fmt.Sprintf("server%d_disk%d", server, disk))
files, err := listECShardFiles(diskDir, volumeId)
if err != nil {
continue
}
for _, f := range files {
i := strings.LastIndex(f, ".ec")
if i < 0 {
continue
}
if n, err := strconv.Atoi(f[i+3:]); err == nil && n >= 0 && n < erasureShardCount {
ids[n] = true
}
}
}
}
return ids
}
// disksWithShards counts how many physical disks hold at least one shard.
func disksWithShards(testDir string, volumeId uint32) int {
n := 0
for _, disks := range countShardsPerDisk(testDir, volumeId) {
for _, c := range disks {
if c > 0 {
n++
}
}
}
return n
}
var diskCountRe = regexp.MustCompile(`(\d+)\s+disks?`)
// clusterStatusDiskCount runs cluster.status and parses the reported disk count.
func clusterStatusDiskCount(t *testing.T, commandEnv *shell.CommandEnv) (int, bool) {
t.Helper()
out, err := captureCommandOutput(t, shell.Commands[findCommandIndex("cluster.status")], []string{}, commandEnv)
if err != nil {
return 0, false
}
m := diskCountRe.FindStringSubmatch(out)
if m == nil {
return 0, false
}
n, err := strconv.Atoi(m[1])
return n, err == nil
}
func sortedKeysOf(m map[int]bool) []int {
out := make([]int, 0, len(m))
for k := range m {
out = append(out, k)
}
for i := 1; i < len(out); i++ {
for j := i; j > 0 && out[j-1] > out[j]; j-- {
out[j-1], out[j] = out[j], out[j-1]
}
}
return out
}
@@ -26,6 +26,10 @@ type MaintenanceIntegration struct {
// Active topology for task detection and target selection
activeTopology *topology.ActiveTopology
// Master's default replication, refreshed by the scanner each cycle and
// passed to detectors as the replica-placement fallback (matches the shell).
defaultReplicaPlacement string
// Type conversion maps
taskTypeMap map[types.TaskType]MaintenanceTaskType
revTaskTypeMap map[MaintenanceTaskType]types.TaskType
@@ -219,9 +223,10 @@ func (s *MaintenanceIntegration) ScanWithTaskDetectors(volumeMetrics []*types.Vo
// Create cluster info
clusterInfo := &types.ClusterInfo{
-	TotalVolumes:   len(filteredMetrics),
-	LastUpdated:    time.Now(),
-	ActiveTopology: s.activeTopology, // Provide ActiveTopology for destination planning
+	TotalVolumes:            len(filteredMetrics),
+	LastUpdated:             time.Now(),
+	ActiveTopology:          s.activeTopology, // Provide ActiveTopology for destination planning
+	DefaultReplicaPlacement: s.defaultReplicaPlacement,
}
// Run detection for each registered task type
@@ -271,6 +276,12 @@ func (s *MaintenanceIntegration) ScanWithTaskDetectors(volumeMetrics []*types.Vo
return allResults, nil
}
// SetDefaultReplicaPlacement records the master's default replication so detectors
// can use it as the replica-placement fallback (matching the shell).
func (s *MaintenanceIntegration) SetDefaultReplicaPlacement(replicaPlacement string) {
s.defaultReplicaPlacement = replicaPlacement
}
// UpdateTopologyInfo updates the volume shard tracker with topology information for empty servers
func (s *MaintenanceIntegration) UpdateTopologyInfo(topologyInfo *master_pb.TopologyInfo) error {
// Log topology details before update for diagnostics
@@ -51,6 +51,10 @@ func (ms *MaintenanceScanner) ScanForMaintenanceTasks() ([]*TaskDetectionResult,
}
}
// Refresh the master's default replication so detectors can use it as the
// replica-placement fallback (matches the shell ec.balance default).
ms.integration.SetDefaultReplicaPlacement(ms.getDefaultReplicaPlacement())
// Use task detection system with complete cluster information
results, err := ms.integration.ScanWithTaskDetectors(taskMetrics)
if err != nil {
@@ -67,6 +71,26 @@ func (ms *MaintenanceScanner) ScanForMaintenanceTasks() ([]*TaskDetectionResult,
return []*TaskDetectionResult{}, nil
}
// getDefaultReplicaPlacement reads the master's configured default replication,
// used by detectors as the replica-placement fallback. Returns "" on error so
// detectors fall back to even spread rather than failing the scan.
func (ms *MaintenanceScanner) getDefaultReplicaPlacement() string {
	var replicaPlacement string
	err := ms.adminClient.WithMasterClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
		if err != nil {
			return err
		}
		replicaPlacement = resp.DefaultReplication
		return nil
	})
	if err != nil {
		glog.V(1).Infof("could not fetch master default replication: %v", err)
		return ""
	}
	return replicaPlacement
}
// getVolumeHealthMetrics collects health information for all volumes.
// Returns metrics in task-system format directly (no intermediate copy) and
// the topology info for updating the active topology.
@@ -0,0 +1,54 @@
package topology
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
// TestCountTopologyResources_multiDiskPerNode covers the case where the master
// keys DiskInfos by disk type, so several same-type physical disks on a node
// collapse into a single DiskInfo entry. Counting len(DiskInfos) under-reports
// the physical disk count and disagrees with the per-disk activeDisk map that
// the rest of the admin topology builds via SplitByPhysicalDisk.
func TestCountTopologyResources_multiDiskPerNode(t *testing.T) {
makeNode := func(id string) *master_pb.DataNodeInfo {
var ecShardInfos []*master_pb.VolumeEcShardInformationMessage
for diskId := uint32(0); diskId < 6; diskId++ {
ecShardInfos = append(ecShardInfos, &master_pb.VolumeEcShardInformationMessage{
Id: diskId + 1,
DiskId: diskId,
EcIndexBits: 1,
})
}
return &master_pb.DataNodeInfo{
Id: id,
DiskInfos: map[string]*master_pb.DiskInfo{
"": {Type: "", MaxVolumeCount: 60, EcShardInfos: ecShardInfos},
},
}
}
topo := &master_pb.TopologyInfo{
Id: "multi_disk_topo",
DataCenterInfos: []*master_pb.DataCenterInfo{{
Id: "dc1",
RackInfos: []*master_pb.RackInfo{{
Id: "rack1",
DataNodeInfos: []*master_pb.DataNodeInfo{
makeNode("node1"), makeNode("node2"), makeNode("node3"),
},
}},
}},
}
dcCount, nodeCount, diskCount := CountTopologyResources(topo)
if dcCount != 1 {
t.Errorf("dcCount = %d, want 1", dcCount)
}
if nodeCount != 3 {
t.Errorf("nodeCount = %d, want 3", nodeCount)
}
if diskCount != 18 {
t.Errorf("diskCount = %d, want 18 (6 physical disks x 3 nodes)", diskCount)
}
}
+6 -1
@@ -19,7 +19,12 @@ func CountTopologyResources(topologyInfo *master_pb.TopologyInfo) (dcCount, node
for _, rack := range dc.RackInfos {
nodeCount += len(rack.DataNodeInfos)
for _, node := range rack.DataNodeInfos {
-diskCount += len(node.DiskInfos)
+// DiskInfos is keyed by disk type, so same-type physical disks
+// collapse into one entry. Count physical disks so the number
+// matches the per-disk activeDisk map.
+for _, diskInfo := range node.DiskInfos {
+	diskCount += len(diskInfo.SplitByPhysicalDisk())
+}
}
}
}
+1
@@ -413,6 +413,7 @@ message EcBalanceTaskConfig {
string collection_filter = 3; // Collection filter
string disk_type = 4; // Disk type filter
repeated string preferred_tags = 5; // Preferred disk tags for placement
string replica_placement = 6; // EC shard replica placement (e.g. "020"); empty falls back to master default replication
}
// ========== Task Persistence Messages ==========
+11 -2
@@ -3297,6 +3297,7 @@ type EcBalanceTaskConfig struct {
CollectionFilter string `protobuf:"bytes,3,opt,name=collection_filter,json=collectionFilter,proto3" json:"collection_filter,omitempty"` // Collection filter
DiskType string `protobuf:"bytes,4,opt,name=disk_type,json=diskType,proto3" json:"disk_type,omitempty"` // Disk type filter
PreferredTags []string `protobuf:"bytes,5,rep,name=preferred_tags,json=preferredTags,proto3" json:"preferred_tags,omitempty"` // Preferred disk tags for placement
ReplicaPlacement string `protobuf:"bytes,6,opt,name=replica_placement,json=replicaPlacement,proto3" json:"replica_placement,omitempty"` // EC shard replica placement (e.g. "020"); empty falls back to master default replication
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -3366,6 +3367,13 @@ func (x *EcBalanceTaskConfig) GetPreferredTags() []string {
return nil
}
func (x *EcBalanceTaskConfig) GetReplicaPlacement() string {
if x != nil {
return x.ReplicaPlacement
}
return ""
}
// MaintenanceTaskData represents complete task state for persistence
type MaintenanceTaskData struct {
state protoimpl.MessageState `protogen:"open.v1"`
@@ -4238,13 +4246,14 @@ const file_worker_proto_rawDesc = "" +
"\x0esource_disk_id\x18\x05 \x01(\rR\fsourceDiskId\x12\x1f\n" +
"\vtarget_node\x18\x06 \x01(\tR\n" +
"targetNode\x12$\n" +
-"\x0etarget_disk_id\x18\a \x01(\rR\ftargetDiskId\"\xe1\x01\n" +
+"\x0etarget_disk_id\x18\a \x01(\rR\ftargetDiskId\"\x8e\x02\n" +
"\x13EcBalanceTaskConfig\x12/\n" +
"\x13imbalance_threshold\x18\x01 \x01(\x01R\x12imbalanceThreshold\x12(\n" +
"\x10min_server_count\x18\x02 \x01(\x05R\x0eminServerCount\x12+\n" +
"\x11collection_filter\x18\x03 \x01(\tR\x10collectionFilter\x12\x1b\n" +
"\tdisk_type\x18\x04 \x01(\tR\bdiskType\x12%\n" +
-"\x0epreferred_tags\x18\x05 \x03(\tR\rpreferredTags\"\xae\a\n" +
+"\x0epreferred_tags\x18\x05 \x03(\tR\rpreferredTags\x12+\n" +
+"\x11replica_placement\x18\x06 \x01(\tR\x10replicaPlacement\"\xae\a\n" +
"\x13MaintenanceTaskData\x12\x0e\n" +
"\x02id\x18\x01 \x01(\tR\x02id\x12\x12\n" +
"\x04type\x18\x02 \x01(\tR\x04type\x12\x1a\n" +
+32
@@ -59,6 +59,38 @@ func CollectVolumeMetricsFromMasters(
// FetchVolumeList dials the given master address (trying both the address as
// given and the gRPC port variant) and returns the master's volume list. Used
// by detection helpers that already know which master address to talk to.
// FetchDefaultReplicaPlacement returns the master's configured default replication
// (GetMasterConfiguration), used by detectors as the replica-placement fallback so
// the plugin path matches the shell. Returns "" if it cannot be fetched, so callers
// fall back to even spread rather than failing detection.
func FetchDefaultReplicaPlacement(ctx context.Context, masterAddresses []string, grpcDialOption grpc.DialOption) string {
	if grpcDialOption == nil {
		return ""
	}
	for _, address := range masterAddresses {
		for _, candidate := range MasterAddressCandidates(address) {
			if ctx.Err() != nil {
				return ""
			}
			dialCtx, cancelDial := context.WithTimeout(ctx, 5*time.Second)
			conn, err := pb.GrpcDial(dialCtx, candidate, false, grpcDialOption)
			cancelDial()
			if err != nil {
				continue
			}
			client := master_pb.NewSeaweedClient(conn)
			callCtx, cancelCall := context.WithTimeout(ctx, 10*time.Second)
			resp, callErr := client.GetMasterConfiguration(callCtx, &master_pb.GetMasterConfigurationRequest{})
			cancelCall()
			_ = conn.Close()
			if callErr == nil {
				return resp.DefaultReplication
			}
		}
	}
	return ""
}
func FetchVolumeList(ctx context.Context, address string, grpcDialOption grpc.DialOption) (*master_pb.VolumeListResponse, error) {
var lastErr error
for _, candidate := range MasterAddressCandidates(address) {
+7 -1
@@ -327,7 +327,13 @@ func (sp *ClusterStatusPrinter) printClusterInfo() {
for _, ri := range dci.RackInfos {
for _, dni := range ri.DataNodeInfos {
nodes++
-disks += len(dni.DiskInfos)
+// The master keys DiskInfos by disk type, so multiple
+// same-type physical disks on one node collapse into a single
+// entry. Count physical disks via SplitByPhysicalDisk so a node
+// with N disks of one type reports N, not 1.
+for _, di := range dni.DiskInfos {
+	disks += len(di.SplitByPhysicalDisk())
+}
}
}
}
+59
@@ -51,6 +51,65 @@ func TestPrintClusterInfo(t *testing.T) {
}
}
// TestPrintClusterInfo_multiDiskPerNode covers a node whose several physical
// disks of the same type collapse into a single DiskInfo on the wire (keyed by
// disk type), so counting len(DiskInfos) under-reports the physical disk count.
// Three nodes with six disks each must report 18 disks, not 3.
func TestPrintClusterInfo_multiDiskPerNode(t *testing.T) {
makeNode := func(id string) *master_pb.DataNodeInfo {
var ecShardInfos []*master_pb.VolumeEcShardInformationMessage
// One EC volume per physical disk, each carrying its own DiskId 0..5.
for diskId := uint32(0); diskId < 6; diskId++ {
ecShardInfos = append(ecShardInfos, &master_pb.VolumeEcShardInformationMessage{
Id: diskId + 1,
DiskId: diskId,
EcIndexBits: 1, // a single shard present
})
}
return &master_pb.DataNodeInfo{
Id: id,
DiskInfos: map[string]*master_pb.DiskInfo{
"": {
Type: "",
MaxVolumeCount: 60,
EcShardInfos: ecShardInfos,
},
},
}
}
topo := &master_pb.TopologyInfo{
Id: "multi_disk_topo",
DataCenterInfos: []*master_pb.DataCenterInfo{{
Id: "dc1",
RackInfos: []*master_pb.RackInfo{{
Id: "rack1",
DataNodeInfos: []*master_pb.DataNodeInfo{
makeNode("node1"), makeNode("node2"), makeNode("node3"),
},
}},
}},
}
var buf bytes.Buffer
sp := &ClusterStatusPrinter{
writer: &buf,
humanize: true,
topology: topo,
}
sp.printClusterInfo()
got := buf.String()
want := `cluster:
id: multi_disk_topo
status: unlocked
nodes: 3
topology: 1 DC, 18 disks on 1 rack
`
if got != want {
t.Errorf("multi-disk cluster info:\ngot:\n%s\nwant:\n%s", got, want)
}
}
func TestPrintVolumeInfo(t *testing.T) {
testCases := []struct {
topology *master_pb.TopologyInfo
File diff suppressed because it is too large
-377
@@ -1,377 +0,0 @@
package shell
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
func TestPickRackForShardType_AntiAffinityRacks(t *testing.T) {
// Setup topology with 3 racks, each with 1 node, enough free slots
topo := &master_pb.TopologyInfo{
Id: "test_topo",
DataCenterInfos: []*master_pb.DataCenterInfo{
{
Id: "dc1",
RackInfos: []*master_pb.RackInfo{
buildRackWithEcShards("rack0", "node0:8080", 100, nil),
buildRackWithEcShards("rack1", "node1:8080", 100, nil),
buildRackWithEcShards("rack2", "node2:8080", 100, nil),
},
},
},
}
ecNodes, _ := collectEcVolumeServersByDc(topo, "", types.HardDriveType)
ecb := &ecBalancer{
ecNodes: ecNodes,
diskType: types.HardDriveType,
}
racks := ecb.racks()
rackToShardCount := make(map[string]int)
shardsPerRack := make(map[string][]erasure_coding.ShardId)
maxPerRack := 2
// Case 1: Avoid rack0
antiAffinityRacks := map[string]bool{"rack0": true}
// Try multiple times to ensure randomness doesn't accidentally pass
for i := 0; i < 20; i++ {
picked, err := ecb.pickRackForShardType(racks, shardsPerRack, maxPerRack, rackToShardCount, antiAffinityRacks)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if picked == "rack0" {
t.Errorf("picked avoided rack rack0")
}
}
// Case 2: Fallback - avoid all racks
avoidAll := map[string]bool{"rack0": true, "rack1": true, "rack2": true}
picked, err := ecb.pickRackForShardType(racks, shardsPerRack, maxPerRack, rackToShardCount, avoidAll)
if err != nil {
t.Fatalf("fallback failed: %v", err)
}
if picked == "" {
t.Errorf("expected some rack to be picked in fallback")
}
}
func TestPickRackForShardType_EdgeCases(t *testing.T) {
t.Run("NoFreeSlots", func(t *testing.T) {
topo := &master_pb.TopologyInfo{
Id: "test_topo",
DataCenterInfos: []*master_pb.DataCenterInfo{
{
Id: "dc1",
RackInfos: []*master_pb.RackInfo{
buildRackWithEcShards("rack0", "node0:8080", 0, nil), // maxVolumes=0
buildRackWithEcShards("rack1", "node1:8080", 0, nil),
},
},
},
}
ecNodes, _ := collectEcVolumeServersByDc(topo, "", types.HardDriveType)
ecb := &ecBalancer{
ecNodes: ecNodes,
diskType: types.HardDriveType,
}
racks := ecb.racks()
_, err := ecb.pickRackForShardType(racks, make(map[string][]erasure_coding.ShardId), 2, make(map[string]int), nil)
if err == nil {
t.Error("expected error when no free slots, got nil")
}
})
t.Run("AllRacksAtMaxCapacity", func(t *testing.T) {
topo := &master_pb.TopologyInfo{
Id: "test_topo",
DataCenterInfos: []*master_pb.DataCenterInfo{
{
Id: "dc1",
RackInfos: []*master_pb.RackInfo{
buildRackWithEcShards("rack0", "node0:8080", 100, nil),
buildRackWithEcShards("rack1", "node1:8080", 100, nil),
},
},
},
}
ecNodes, _ := collectEcVolumeServersByDc(topo, "", types.HardDriveType)
ecb := &ecBalancer{
ecNodes: ecNodes,
diskType: types.HardDriveType,
}
racks := ecb.racks()
shardsPerRack := map[string][]erasure_coding.ShardId{
"rack0": {0, 1}, // 2 shards
"rack1": {2, 3}, // 2 shards
}
maxPerRack := 2
_, err := ecb.pickRackForShardType(racks, shardsPerRack, maxPerRack, make(map[string]int), nil)
if err == nil {
t.Error("expected error when all racks at max capacity, got nil")
}
})
t.Run("ReplicaPlacementLimit", func(t *testing.T) {
topo := &master_pb.TopologyInfo{
Id: "test_topo",
DataCenterInfos: []*master_pb.DataCenterInfo{
{
Id: "dc1",
RackInfos: []*master_pb.RackInfo{
buildRackWithEcShards("rack0", "node0:8080", 100, nil),
buildRackWithEcShards("rack1", "node1:8080", 100, nil),
},
},
},
}
ecNodes, _ := collectEcVolumeServersByDc(topo, "", types.HardDriveType)
rp, _ := super_block.NewReplicaPlacementFromString("012") // DiffRackCount = 1
ecb := &ecBalancer{
ecNodes: ecNodes,
diskType: types.HardDriveType,
replicaPlacement: rp,
}
racks := ecb.racks()
rackToShardCount := map[string]int{
"rack0": 1, // At limit
"rack1": 0,
}
picked, err := ecb.pickRackForShardType(racks, make(map[string][]erasure_coding.ShardId), 5, rackToShardCount, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if picked != "rack1" {
t.Errorf("expected rack1 (not at limit), got %v", picked)
}
})
t.Run("PreferFewerShards", func(t *testing.T) {
topo := &master_pb.TopologyInfo{
Id: "test_topo",
DataCenterInfos: []*master_pb.DataCenterInfo{
{
Id: "dc1",
RackInfos: []*master_pb.RackInfo{
buildRackWithEcShards("rack0", "node0:8080", 100, nil),
buildRackWithEcShards("rack1", "node1:8080", 100, nil),
buildRackWithEcShards("rack2", "node2:8080", 100, nil),
},
},
},
}
ecNodes, _ := collectEcVolumeServersByDc(topo, "", types.HardDriveType)
ecb := &ecBalancer{
ecNodes: ecNodes,
diskType: types.HardDriveType,
}
racks := ecb.racks()
shardsPerRack := map[string][]erasure_coding.ShardId{
"rack0": {0, 1}, // 2 shards
"rack1": {2}, // 1 shard
"rack2": {}, // 0 shards
}
// Should pick rack2 (fewest shards)
picked, err := ecb.pickRackForShardType(racks, shardsPerRack, 5, make(map[string]int), nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if picked != "rack2" {
t.Errorf("expected rack2 (fewest shards), got %v", picked)
}
})
}
func TestPickNodeForShardType_AntiAffinityNodes(t *testing.T) {
// Setup topology with 1 rack, 3 nodes
topo := &master_pb.TopologyInfo{
Id: "test_topo",
DataCenterInfos: []*master_pb.DataCenterInfo{
{
Id: "dc1",
RackInfos: []*master_pb.RackInfo{
{
Id: "rack0",
DataNodeInfos: []*master_pb.DataNodeInfo{
buildDataNode("node0:8080", 100),
buildDataNode("node1:8080", 100),
buildDataNode("node2:8080", 100),
},
},
},
},
},
}
ecNodes, _ := collectEcVolumeServersByDc(topo, "", types.HardDriveType)
ecb := &ecBalancer{
ecNodes: ecNodes,
diskType: types.HardDriveType,
}
nodeToShardCount := make(map[string]int)
shardsPerNode := make(map[string][]erasure_coding.ShardId)
maxPerNode := 2
// Case 1: Avoid node0
antiAffinityNodes := map[string]bool{"node0:8080": true}
for i := 0; i < 20; i++ {
picked, err := ecb.pickNodeForShardType(ecNodes, shardsPerNode, maxPerNode, nodeToShardCount, antiAffinityNodes)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if picked.info.Id == "node0:8080" {
t.Errorf("picked avoided node node0")
}
}
}
func TestPickNodeForShardType_EdgeCases(t *testing.T) {
t.Run("NoFreeSlots", func(t *testing.T) {
topo := &master_pb.TopologyInfo{
Id: "test_topo",
DataCenterInfos: []*master_pb.DataCenterInfo{
{
Id: "dc1",
RackInfos: []*master_pb.RackInfo{
{
Id: "rack0",
DataNodeInfos: []*master_pb.DataNodeInfo{
buildDataNode("node0:8080", 0), // No capacity
},
},
},
},
},
}
ecNodes, _ := collectEcVolumeServersByDc(topo, "", types.HardDriveType)
ecb := &ecBalancer{
ecNodes: ecNodes,
diskType: types.HardDriveType,
}
_, err := ecb.pickNodeForShardType(ecNodes, make(map[string][]erasure_coding.ShardId), 2, make(map[string]int), nil)
if err == nil {
t.Error("expected error when no free slots, got nil")
}
})
t.Run("ReplicaPlacementSameRackLimit", func(t *testing.T) {
topo := &master_pb.TopologyInfo{
Id: "test_topo",
DataCenterInfos: []*master_pb.DataCenterInfo{
{
Id: "dc1",
RackInfos: []*master_pb.RackInfo{
{
Id: "rack0",
DataNodeInfos: []*master_pb.DataNodeInfo{
buildDataNode("node0:8080", 100),
buildDataNode("node1:8080", 100),
},
},
},
},
},
}
ecNodes, _ := collectEcVolumeServersByDc(topo, "", types.HardDriveType)
rp, _ := super_block.NewReplicaPlacementFromString("021") // SameRackCount = 1
ecb := &ecBalancer{
ecNodes: ecNodes,
diskType: types.HardDriveType,
replicaPlacement: rp,
}
nodeToShardCount := map[string]int{
"node0:8080": 3, // Exceeds SameRackCount + 1
"node1:8080": 0,
}
picked, err := ecb.pickNodeForShardType(ecNodes, make(map[string][]erasure_coding.ShardId), 5, nodeToShardCount, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if picked.info.Id != "node1:8080" {
t.Errorf("expected node1 (not at limit), got %v", picked.info.Id)
}
})
}
func TestShardsByType(t *testing.T) {
vid := needle.VolumeId(123)
// Create mock nodes with shards
nodes := []*EcNode{
{
info: &master_pb.DataNodeInfo{
Id: "node1",
DiskInfos: map[string]*master_pb.DiskInfo{
string(types.HardDriveType): {
EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{
{
Id: uint32(vid),
EcIndexBits: uint32((1 << 0) | (1 << 1) | (1 << 10) | (1 << 11)), // data: 0,1 parity: 10,11
},
},
},
},
},
rack: "rack1",
},
}
t.Run("Standard10Plus4", func(t *testing.T) {
dataPerRack, parityPerRack := shardsByTypePerRack(vid, nodes, types.HardDriveType, 10)
if len(dataPerRack["rack1"]) != 2 {
t.Errorf("expected 2 data shards, got %d", len(dataPerRack["rack1"]))
}
if len(parityPerRack["rack1"]) != 2 {
t.Errorf("expected 2 parity shards, got %d", len(parityPerRack["rack1"]))
}
})
t.Run("NodeGrouping", func(t *testing.T) {
dataPerNode, parityPerNode := shardsByTypePerNode(vid, nodes, types.HardDriveType, 10)
if len(dataPerNode["node1"]) != 2 {
t.Errorf("expected 2 data shards on node1, got %d", len(dataPerNode["node1"]))
}
if len(parityPerNode["node1"]) != 2 {
t.Errorf("expected 2 parity shards on node1, got %d", len(parityPerNode["node1"]))
}
})
}
func buildDataNode(nodeId string, maxVolumes int64) *master_pb.DataNodeInfo {
return &master_pb.DataNodeInfo{
Id: nodeId,
DiskInfos: map[string]*master_pb.DiskInfo{
string(types.HardDriveType): {
Type: string(types.HardDriveType),
MaxVolumeCount: maxVolumes,
VolumeCount: 0,
},
},
}
}
+11 -1
@@ -405,9 +405,19 @@ func collectEcNodeShardsInfo(topoInfo *master_pb.TopologyInfo, vid needle.Volume
res := make(map[pb.ServerAddress]*erasure_coding.ShardsInfo)
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
+// A node may report several EcShardInfos for one volume, one per
+// physical disk holding shards of it (multi-disk nodes). Union them
+// rather than overwriting, or only the last disk's shards survive and
+// the node looks like it is missing shards it actually has.
for _, v := range diskInfo.EcShardInfos {
if v.Id == uint32(vid) {
-res[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(v)
+addr := pb.NewServerAddressFromDataNode(dn)
+si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(v)
+if existing, ok := res[addr]; ok {
+existing.Add(si)
+} else {
+res[addr] = si
+}
}
}
}
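The difference between overwriting and unioning per-disk reports can be sketched with plain bitmasks. This is an illustration only: `shardReport`, `unionByNode`, and `lastWins` are hypothetical names, and a bare `uint32` stands in for the `ShardsInfo` type used in the hunk.

```go
package main

import (
	"fmt"
	"math/bits"
)

// shardReport is a hypothetical, flattened form of one EC shard report:
// a node says which shards of a volume one of its disks holds.
type shardReport struct {
	Node      string
	VolumeID  uint32
	ShardBits uint32 // bit i set means shard i is present
}

// unionByNode ORs together every report a node sends for the given volume.
func unionByNode(reports []shardReport, vid uint32) map[string]uint32 {
	res := make(map[string]uint32)
	for _, r := range reports {
		if r.VolumeID != vid {
			continue
		}
		res[r.Node] |= r.ShardBits
	}
	return res
}

// lastWins models the overwrite behaviour, kept only for comparison.
func lastWins(reports []shardReport, vid uint32) map[string]uint32 {
	res := make(map[string]uint32)
	for _, r := range reports {
		if r.VolumeID == vid {
			res[r.Node] = r.ShardBits
		}
	}
	return res
}

// twoDiskReports describes node1 holding shards 0,1 on one disk and 2,3 on another.
func twoDiskReports() []shardReport {
	return []shardReport{
		{Node: "node1", VolumeID: 7, ShardBits: 0b0011},
		{Node: "node1", VolumeID: 7, ShardBits: 0b1100},
	}
}

func main() {
	u := unionByNode(twoDiskReports(), 7)["node1"]
	o := lastWins(twoDiskReports(), 7)["node1"]
	fmt.Println("union shards:", bits.OnesCount32(u), "overwrite shards:", bits.OnesCount32(o))
}
```

With two disks reporting two shards each, the union sees four shards on the node while the overwrite variant sees only the last disk's two.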
+44 -22
@@ -340,36 +340,58 @@ func doEcEncode(commandEnv *CommandEnv, writer io.Writer, volumeIdToCollection m
}
func verifyEcShardsBeforeDelete(commandEnv *CommandEnv, volumeIds []needle.VolumeId, diskType types.DiskType) error {
-topoInfo, _, err := collectTopologyInfo(commandEnv, 0)
-if err != nil {
-return fmt.Errorf("fetch topology for shard verification: %w", err)
-}
+// Shard relocations from the preceding EC balance reach the master via
+// volume-server heartbeats, so freshly distributed shards may not all be
+// visible in the master topology immediately. Poll a few times before
+// concluding the shard set is incomplete, so a heartbeat-propagation lag is
+// not mistaken for missing data (which would abort the encode). Genuine loss
+// still fails after the retries are exhausted.
+const maxAttempts = 10
+const retryInterval = 2 * time.Second
-for _, vid := range volumeIds {
-nodeShards := collectEcNodeShardsInfo(topoInfo, vid, diskType)
-var union erasure_coding.ShardBits
-for _, info := range nodeShards {
-union = erasure_coding.ShardBits(uint32(union) | info.Bitmap())
+var lastErr error
+for attempt := 0; attempt < maxAttempts; attempt++ {
+topoInfo, _, err := collectTopologyInfo(commandEnv, 0)
+if err != nil {
+return fmt.Errorf("fetch topology for shard verification: %w", err)
}
-totalShards := erasure_coding.TotalShardsCount
-if err := erasure_coding.RequireFullShardSet(uint32(vid), union, totalShards); err != nil {
-summary := make([]string, 0, len(nodeShards))
-for node, info := range nodeShards {
-summary = append(summary, fmt.Sprintf("%s=%s", node, info.String()))
+lastErr = nil
+for _, vid := range volumeIds {
+nodeShards := collectEcNodeShardsInfo(topoInfo, vid, diskType)
+var union erasure_coding.ShardBits
+for _, info := range nodeShards {
+union = erasure_coding.ShardBits(uint32(union) | info.Bitmap())
}
-sort.Strings(summary)
-glog.Errorf("EC shard verification failed for volume %d on diskType %q: %v; observed: %v",
-vid, diskType.ReadableString(), err, summary)
-return fmt.Errorf("volume %d: %w (observed: %v)", vid, err, summary)
+totalShards := erasure_coding.TotalShardsCount
+if err := erasure_coding.RequireFullShardSet(uint32(vid), union, totalShards); err != nil {
+summary := make([]string, 0, len(nodeShards))
+for node, info := range nodeShards {
+summary = append(summary, fmt.Sprintf("%s=%s", node, info.String()))
+}
+sort.Strings(summary)
+lastErr = fmt.Errorf("volume %d: %w (observed: %v)", vid, err, summary)
+break
}
+glog.V(0).Infof("EC shard verification ok for volume %d on diskType %q: %d/%d shards present across %d nodes",
+vid, diskType.ReadableString(), union.Count(), totalShards, len(nodeShards))
+}
-glog.V(0).Infof("EC shard verification ok for volume %d on diskType %q: %d/%d shards present across %d nodes",
-vid, diskType.ReadableString(), union.Count(), totalShards, len(nodeShards))
+if lastErr == nil {
+return nil
+}
+if attempt < maxAttempts-1 {
+glog.V(0).Infof("EC shard verification incomplete (attempt %d/%d), waiting for shard locations to propagate: %v",
+attempt+1, maxAttempts, lastErr)
+time.Sleep(retryInterval)
+}
}
-return nil
+glog.Errorf("EC shard verification failed after %d attempts: %v", maxAttempts, lastErr)
+return lastErr
}
// doDeleteVolumesWithLocations deletes volumes using pre-collected location information
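The polling pattern introduced in `verifyEcShardsBeforeDelete` can be reduced to a small generic helper. The sketch below is an assumption-laden illustration rather than the shell's code: `retry` and `succeedAfter` are hypothetical names, and the check is an arbitrary closure instead of a topology fetch.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retry calls check up to maxAttempts times, sleeping interval between
// failed attempts. It returns nil on the first success, otherwise the last
// error. With maxAttempts <= 0 the check never runs and nil is returned.
func retry(maxAttempts int, interval time.Duration, check func() error) error {
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if lastErr = check(); lastErr == nil {
			return nil
		}
		// No sleep after the final attempt: there is nothing left to wait for.
		if attempt < maxAttempts-1 {
			time.Sleep(interval)
		}
	}
	return lastErr
}

// succeedAfter returns a check that fails n times before succeeding,
// plus a pointer to its call counter.
func succeedAfter(n int) (func() error, *int) {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return errors.New("shard set incomplete")
		}
		return nil
	}, &calls
}

func main() {
	check, calls := succeedAfter(2)
	err := retry(10, time.Millisecond, check)
	fmt.Println("err:", err, "calls:", *calls)
}
```

A transient failure that clears on the third poll succeeds, while a persistent one still surfaces its last error once the attempts run out, which mirrors the intent stated in the hunk's comment.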
+9 -12
@@ -19,7 +19,7 @@ func TestCommandEcBalanceSmall(t *testing.T) {
diskType: types.HardDriveType,
}
-ecb.balanceEcVolumes("c1")
+ecb.balance([]string{"c1"})
}
func TestCommandEcBalanceNothingToMove(t *testing.T) {
@@ -36,7 +36,7 @@ func TestCommandEcBalanceNothingToMove(t *testing.T) {
diskType: types.HardDriveType,
}
-ecb.balanceEcVolumes("c1")
+ecb.balance([]string{"c1"})
}
func TestCommandEcBalanceAddNewServers(t *testing.T) {
@@ -55,7 +55,7 @@ func TestCommandEcBalanceAddNewServers(t *testing.T) {
diskType: types.HardDriveType,
}
-ecb.balanceEcVolumes("c1")
+ecb.balance([]string{"c1"})
}
func TestCommandEcBalanceAddNewRacks(t *testing.T) {
@@ -74,7 +74,7 @@ func TestCommandEcBalanceAddNewRacks(t *testing.T) {
diskType: types.HardDriveType,
}
-ecb.balanceEcVolumes("c1")
+ecb.balance([]string{"c1"})
}
func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) {
@@ -118,8 +118,7 @@ func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) {
diskType: types.HardDriveType,
}
-ecb.balanceEcVolumes("c1")
-ecb.balanceEcRacks()
+ecb.balance([]string{"c1"})
}
func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNode {
@@ -158,7 +157,7 @@ func TestCommandEcBalanceEvenDataAndParityDistribution(t *testing.T) {
diskType: types.HardDriveType,
}
-ecb.balanceEcVolumes("c1")
+ecb.balance([]string{"c1"})
// After balancing (dry-run), verify the PLANNED distribution by checking what moves were proposed
// The ecb.ecNodes state is updated during dry-run to track planned moves
@@ -262,7 +261,7 @@ func TestCommandEcBalanceMultipleVolumesEvenDistribution(t *testing.T) {
diskType: types.HardDriveType,
}
-ecb.balanceEcVolumes("c1")
+ecb.balance([]string{"c1"})
// Check both volumes
for _, vid := range []needle.VolumeId{1, 2} {
@@ -316,8 +315,7 @@ func TestCommandEcBalanceAllNodesShareAllVolumes(t *testing.T) {
diskType: types.HardDriveType,
}
-ecb.balanceEcVolumes("c1")
-ecb.balanceEcRacks()
+ecb.balance([]string{"c1"})
// Count total shards per node after balancing
for _, node := range ecb.ecNodes {
@@ -396,8 +394,7 @@ func TestCommandEcBalanceIssue8793Topology(t *testing.T) {
t.Logf("BEFORE node %s (max %d): %d shards", node.info.Id, node.freeEcSlot+count, count)
}
-ecb.balanceEcVolumes("cldata")
-ecb.balanceEcRacks()
+ecb.balance([]string{"cldata"})
// Verify: no node should exceed the average
totalShards := 0
File diff suppressed because it is too large
@@ -0,0 +1,431 @@
package ecbalancer
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
)
func bits(ids ...int) erasure_coding.ShardBits {
var b erasure_coding.ShardBits
for _, id := range ids {
b = b.Set(erasure_coding.ShardId(id))
}
return b
}
// addEmptyNode adds an EC-empty destination node with six disks and capacity.
func addEmptyNode(t *Topology, id, rackKey string) {
n := t.AddNode(id, "dc1", rackKey, 600)
for d := uint32(1); d <= 6; d++ {
n.AddDisk(d, "", 100, 0)
}
}
func ratio(d, p int) func(string) (int, int) {
return func(string) (int, int) { return d, p }
}
func TestPickBestDiskOnNode(t *testing.T) {
const vid = uint32(100)
const ds = erasure_coding.DataShardsCount
vk := volKey{collection: "c", vid: vid}
t.Run("skips disks with no free slots", func(t *testing.T) {
topo := NewTopology()
n := topo.AddNode("n1", "dc1", "dc1:r1", 100)
n.AddDisk(1, "", 0, 0)
n.AddDisk(2, "", 10, 0)
if got := pickBestDiskOnNode(n, vk, "", 0, ds); got != 2 {
t.Errorf("got disk %d, want 2", got)
}
})
t.Run("spreads a volume's shards across disks", func(t *testing.T) {
topo := NewTopology()
n := topo.AddNode("n1", "dc1", "dc1:r1", 100)
n.AddDisk(1, "", 10, 1)
n.AddDisk(2, "", 10, 0)
n.AddShards(vid, "c", 1, bits(0))
if got := pickBestDiskOnNode(n, vk, "", 5, ds); got != 2 {
t.Errorf("got disk %d, want 2 (disk 1 already holds this volume)", got)
}
})
t.Run("data shard avoids disk holding parity", func(t *testing.T) {
topo := NewTopology()
n := topo.AddNode("n1", "dc1", "dc1:r1", 100)
n.AddDisk(1, "", 10, 1)
n.AddDisk(2, "", 10, 0)
n.AddShards(vid, "c", 1, bits(ds)) // parity on disk 1
if got := pickBestDiskOnNode(n, vk, "", 0, ds); got != 2 {
t.Errorf("got disk %d, want 2 (anti-affinity)", got)
}
})
t.Run("anti-affinity follows the supplied ratio boundary", func(t *testing.T) {
topo := NewTopology()
n := topo.AddNode("n1", "dc1", "dc1:r1", 100)
n.AddDisk(1, "", 10, 1)
n.AddDisk(2, "", 10, 2)
n.AddShards(vid, "c", 1, bits(7)) // parity at 6+3
n.AddShards(vid, "c", 2, bits(2)) // data
if got := pickBestDiskOnNode(n, vk, "", 1, 6); got != 2 {
t.Errorf("ratio 6: got disk %d, want 2", got)
}
if got := pickBestDiskOnNode(n, vk, "", 1, erasure_coding.DataShardsCount); got != 1 {
t.Errorf("boundary 10: got disk %d, want 1", got)
}
})
t.Run("only matching disk type when set", func(t *testing.T) {
topo := NewTopology()
n := topo.AddNode("n1", "dc1", "dc1:r1", 100)
n.AddDisk(1, "ssd", 10, 0)
n.AddDisk(2, "hdd", 10, 0)
if got := pickBestDiskOnNode(n, vk, "hdd", 0, ds); got != 2 {
t.Errorf("got disk %d, want 2 (only hdd)", got)
}
})
}
func TestPlanSourceDiskAttribution(t *testing.T) {
shardsByDisk := map[uint32][]int{0: {0, 1, 2}, 1: {3, 4, 5}, 2: {6, 7}, 3: {8, 9}, 4: {10, 11}, 5: {12, 13}}
shardToDisk := map[int]uint32{}
for d, ss := range shardsByDisk {
for _, s := range ss {
shardToDisk[s] = d
}
}
topo := NewTopology()
src := topo.AddNode("node1", "dc1", "dc1:rack1", 0)
for d, ss := range shardsByDisk {
src.AddDisk(d, "", 0, len(ss))
src.AddShards(100, "col1", d, bits(ss...))
}
addEmptyNode(topo, "node2", "dc1:rack2")
moves := Plan(topo, Options{ImbalanceThreshold: 0.01, Ratio: ratio(10, 4)})
if len(moves) == 0 {
t.Fatal("expected moves")
}
for _, m := range moves {
if m.Phase != "cross_rack" {
continue
}
if want := shardToDisk[m.ShardID]; m.SourceDisk != want {
t.Errorf("shard %d: source disk %d, want %d", m.ShardID, m.SourceDisk, want)
}
}
}
func TestPlanSpreadsAcrossDestinationDisks(t *testing.T) {
topo := NewTopology()
src := topo.AddNode("node1", "dc1", "dc1:rack1", 0)
src.AddDisk(0, "", 0, 14)
src.AddShards(100, "col1", 0, bits(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13))
addEmptyNode(topo, "node2", "dc1:rack2")
moves := Plan(topo, Options{ImbalanceThreshold: 0.01, Ratio: ratio(10, 4)})
distinct := map[uint32]bool{}
crossRack := 0
for _, m := range moves {
if m.Phase == "cross_rack" {
crossRack++
distinct[m.TargetDisk] = true
}
}
if crossRack != 7 {
t.Fatalf("got %d cross-rack moves, want 7", crossRack)
}
if len(distinct) != 6 {
t.Errorf("cross-rack moves used %d distinct disks, want 6: %v", len(distinct), distinct)
}
}
func TestPlanCrossRackParityAntiAffinity(t *testing.T) {
topo := NewTopology()
src := topo.AddNode("node1", "dc1", "dc1:rack1", 0)
src.AddDisk(0, "", 0, 3)
src.AddShards(100, "col1", 0, bits(0, 1, 2)) // 1 data + 2 parity at ratio 1+2
addEmptyNode(topo, "node2", "dc1:rack2")
addEmptyNode(topo, "node3", "dc1:rack3")
moves := Plan(topo, Options{ImbalanceThreshold: 0.01, Ratio: ratio(1, 2)})
if len(moves) == 0 {
t.Fatal("expected parity moves across racks")
}
for _, m := range moves {
if m.ShardID < 1 {
t.Errorf("data shard %d moved; it fits in rack1", m.ShardID)
}
if m.TargetNode == "node1" {
t.Errorf("parity shard %d moved onto data-bearing node1", m.ShardID)
}
}
}
func TestWithinRackParityAntiAffinity(t *testing.T) {
// Test the within-rack phase in isolation (the global phase, which balances
// total load, would otherwise also act on this single rack).
topo := NewTopology()
src := topo.AddNode("node1", "dc1", "dc1:rack1", 600)
src.AddDisk(0, "", 600, 3)
src.AddShards(100, "col1", 0, bits(0, 1, 2))
addEmptyNode(topo, "node2", "dc1:rack1")
addEmptyNode(topo, "node3", "dc1:rack1")
racks := buildRacks(topo.nodes)
moves := detectWithinRackImbalance(volKey{collection: "col1", vid: 100}, topo.nodes, racks, "", 0.01, 1, 2, nil)
if len(moves) == 0 {
t.Fatal("expected parity moves within rack")
}
for _, m := range moves {
if m.shardID < 1 {
t.Errorf("data shard %d moved; it fits on node1", m.shardID)
}
if m.target.id == "node1" {
t.Errorf("parity shard %d moved onto data-bearing node1", m.shardID)
}
}
}
func TestPlanReplicaPlacementCapsPerRack(t *testing.T) {
build := func() *Topology {
topo := NewTopology()
src := topo.AddNode("node1", "dc1", "dc1:rack1", 0)
src.AddDisk(0, "", 0, 6)
src.AddShards(100, "col1", 0, bits(0, 1, 2, 3, 4, 5)) // all data at ratio 6+0
addEmptyNode(topo, "node2", "dc1:rack2")
addEmptyNode(topo, "node3", "dc1:rack3")
return topo
}
countCross := func(moves []Move) int {
n := 0
for _, m := range moves {
if m.Phase == "cross_rack" {
n++
}
}
return n
}
if got := countCross(Plan(build(), Options{ImbalanceThreshold: 0.01, Ratio: ratio(6, 0)})); got != 4 {
t.Fatalf("without replica placement: %d cross-rack moves, want 4", got)
}
rp := &super_block.ReplicaPlacement{DiffRackCount: 1}
if got := countCross(Plan(build(), Options{ImbalanceThreshold: 0.01, ReplicaPlacement: rp, Ratio: ratio(6, 0)})); got != 2 {
t.Errorf("with DiffRackCount=1: %d cross-rack moves, want 2", got)
}
}
func TestPlanDedup(t *testing.T) {
topo := NewTopology()
n1 := topo.AddNode("node1", "dc1", "dc1:rack1", 5)
n1.AddDisk(0, "", 5, 2)
n1.AddShards(100, "col1", 0, bits(0, 1))
n2 := topo.AddNode("node2", "dc1", "dc1:rack2", 10)
n2.AddDisk(0, "", 10, 1)
n2.AddShards(100, "col1", 0, bits(0)) // shard 0 duplicated on node2
var dedup []Move
for _, m := range Plan(topo, Options{ImbalanceThreshold: 0.01, Ratio: ratio(10, 4)}) {
if m.Phase == "dedup" {
dedup = append(dedup, m)
}
}
if len(dedup) != 1 {
t.Fatalf("got %d dedup moves, want 1", len(dedup))
}
if dedup[0].ShardID != 0 || dedup[0].SourceNode != "node1" || dedup[0].TargetNode != "node1" {
t.Errorf("dedup move = %+v, want shard 0 deleted on node1 (fewer free slots)", dedup[0])
}
}
func TestCeilDivide(t *testing.T) {
for _, tc := range []struct{ a, b, want int }{{14, 3, 5}, {10, 3, 4}, {0, 5, 0}, {5, 0, 0}} {
if got := ceilDivide(tc.a, tc.b); got != tc.want {
t.Errorf("ceilDivide(%d,%d)=%d want %d", tc.a, tc.b, got, tc.want)
}
}
}
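The body of `ceilDivide` sits in a suppressed diff, so the following is only a sketch of a function that satisfies the four rows of the test table above, including the zero-divisor row. The name `ceilDiv` is hypothetical and chosen to avoid implying this is the package's implementation.

```go
package main

import "fmt"

// ceilDiv returns a/b rounded up for non-negative a and positive b.
// It returns 0 when b is 0, matching the {5, 0, 0} row in the test table.
func ceilDiv(a, b int) int {
	if b == 0 {
		return 0
	}
	return (a + b - 1) / b
}

func main() {
	for _, tc := range [][2]int{{14, 3}, {10, 3}, {0, 5}, {5, 0}} {
		fmt.Printf("ceilDiv(%d, %d) = %d\n", tc[0], tc[1], ceilDiv(tc[0], tc[1]))
	}
}
```

For 14 shards over 3 racks this yields 5, which is the kind of per-rack ceiling the balance tests reason about.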
func allBits(n int) erasure_coding.ShardBits {
var b erasure_coding.ShardBits
for i := 0; i < n; i++ {
b = b.Set(erasure_coding.ShardId(i))
}
return b
}
func TestGlobalImbalanceMovesFromFullToEmpty(t *testing.T) {
topo := NewTopology()
n1 := topo.AddNode("node1", "dc1", "dc1:rack1", 5)
n1.AddShards(100, "col1", 0, allBits(14))
n1.AddShards(200, "col1", 0, allBits(6))
n2 := topo.AddNode("node2", "dc1", "dc1:rack1", 30)
n2.AddShards(300, "col1", 0, allBits(2))
moves := detectGlobalImbalance(topo.nodes, buildRacks(topo.nodes), "", 0.01, nil, 0, true)
if len(moves) == 0 {
t.Fatal("expected global balance moves")
}
for _, m := range moves {
if m.phase != "global" || m.source.id != "node1" || m.target.id != "node2" {
t.Errorf("move = %+v, want global node1->node2", m)
}
}
}
// TestGlobalImbalanceHeterogeneousCapacity: node1 holds more shards but is less
// utilized (high capacity); moves must drain the more-utilized node2.
func TestGlobalImbalanceHeterogeneousCapacity(t *testing.T) {
topo := NewTopology()
n1 := topo.AddNode("node1", "dc1", "dc1:rack1", 90)
n1.AddShards(100, "col1", 0, allBits(10))
n2 := topo.AddNode("node2", "dc1", "dc1:rack1", 2)
n2.AddShards(200, "col1", 0, allBits(3))
moves := detectGlobalImbalance(topo.nodes, buildRacks(topo.nodes), "", 0.01, nil, 0, true)
if len(moves) == 0 {
t.Fatal("expected moves from high-util node2 to low-util node1")
}
seen := map[[2]int]bool{}
for _, m := range moves {
if m.source.id != "node2" || m.target.id != "node1" {
t.Errorf("move = %+v, want node2->node1", m)
}
key := [2]int{int(m.volumeID), m.shardID}
if seen[key] {
t.Errorf("duplicate move for volume %d shard %d", m.volumeID, m.shardID)
}
seen[key] = true
}
}
func TestGlobalImbalanceSkipsFullNodes(t *testing.T) {
topo := NewTopology()
n1 := topo.AddNode("node1", "dc1", "dc1:rack1", 10)
n1.AddShards(100, "col1", 0, allBits(14))
n2 := topo.AddNode("node2", "dc1", "dc1:rack1", 0) // full, cannot receive
n2.AddShards(200, "col1", 0, allBits(2))
if moves := detectGlobalImbalance(topo.nodes, buildRacks(topo.nodes), "", 0.01, nil, 0, true); len(moves) != 0 {
t.Fatalf("expected 0 moves (node2 full), got %d", len(moves))
}
}
// TestPlanBalancesSkewedDataParityWithEvenTotals guards the per-type gate: two
// racks hold equal shard totals (7 each) but the data shards are skewed (7 vs 3).
// A total-count gate would skip balancing; the per-type gate must still act.
func TestPlanBalancesSkewedDataParityWithEvenTotals(t *testing.T) {
topo := NewTopology()
n1 := topo.AddNode("node1", "dc1", "dc1:rack1", 100)
n1.AddDisk(0, "", 100, 7)
n1.AddShards(100, "col1", 0, bits(0, 1, 2, 3, 4, 5, 6)) // 7 data shards
n2 := topo.AddNode("node2", "dc1", "dc1:rack2", 100)
n2.AddDisk(0, "", 100, 7)
n2.AddShards(100, "col1", 0, bits(7, 8, 9, 10, 11, 12, 13)) // 3 data + 4 parity
moves := Plan(topo, Options{ImbalanceThreshold: 0, Ratio: ratio(10, 4)})
crossRack, dataMoved := 0, 0
for _, m := range moves {
if m.Phase == "cross_rack" {
crossRack++
if m.ShardID < 10 {
dataMoved++
}
}
}
if crossRack == 0 {
t.Fatal("even totals masked a data/parity skew: no cross-rack moves produced")
}
if dataMoved == 0 {
t.Error("expected skewed data shards to rebalance across racks")
}
}
// TestGlobalPrefersVolumeAbsentFromDestination guards the global phase's
// volume-diversity preference: when draining a node, move a shard of a volume the
// destination does not hold at all before piling a second shard of an
// already-present volume onto it. node1 (full, 0 free slots) holds vol100 and
// vol200; node2 (3 free slots) holds only one vol100 shard, so the first global
// move should be a vol200 shard.
func TestGlobalPrefersVolumeAbsentFromDestination(t *testing.T) {
topo := NewTopology()
n1 := topo.AddNode("node1", "dc1", "dc1:rack1", 0)
n1.AddShards(100, "col1", 0, bits(0, 1))
n1.AddShards(200, "col1", 0, bits(0, 1))
n2 := topo.AddNode("node2", "dc1", "dc1:rack1", 3)
n2.AddShards(100, "col1", 0, bits(2))
moves := detectGlobalImbalance(topo.nodes, buildRacks(topo.nodes), "", 0.01, nil, 0, true)
if len(moves) == 0 {
t.Fatal("expected a global move from the full node")
}
if moves[0].volumeID != 200 {
t.Errorf("first global move is volume %d, want 200 (the volume absent from node2)", moves[0].volumeID)
}
for _, m := range moves {
if m.source.id != "node1" || m.target.id != "node2" {
t.Errorf("move %+v, want node1->node2", m)
}
}
}
// TestPlanKeepsCollectionsWithSameVolumeIdDistinct guards EC identity: a numeric
// volume id reused across collections must not be merged. (A,5) on node1 and
// (B,5) on node2 are different volumes, so neither dedup nor any move should
// treat them as copies of one another.
func TestPlanKeepsCollectionsWithSameVolumeIdDistinct(t *testing.T) {
topo := NewTopology()
n1 := topo.AddNode("node1", "dc1", "dc1:rack1", 100)
n1.AddDisk(0, "", 100, 1)
n1.AddShards(5, "A", 0, bits(0))
n2 := topo.AddNode("node2", "dc1", "dc1:rack2", 100)
n2.AddDisk(0, "", 100, 1)
n2.AddShards(5, "B", 0, bits(0))
for _, m := range Plan(topo, Options{ImbalanceThreshold: 0.01, Ratio: ratio(10, 4)}) {
if m.Phase == "dedup" {
t.Errorf("dedup %+v: (A,5) and (B,5) are different volumes and must not be deduped", m)
}
}
}
// TestDedupFreesCapacityForLaterPhases checks that capacity opened by deleting a
// duplicate is usable by a later phase in the same Plan. node2 is full (0 free)
// but holds a duplicate of node1's shard 0; node1 is roomier, so dedup deletes
// node2's copy, freeing a slot. The within-rack phase must then be able to move a
// shard onto node2.
func TestDedupFreesCapacityForLaterPhases(t *testing.T) {
topo := NewTopology()
n1 := topo.AddNode("node1", "dc1", "dc1:rack1", 5)
n1.AddDisk(0, "", 5, 7)
n1.AddShards(100, "col1", 0, bits(0, 1, 2, 3, 4, 5, 6))
n2 := topo.AddNode("node2", "dc1", "dc1:rack1", 0) // full
n2.AddDisk(0, "", 0, 1)
n2.AddShards(100, "col1", 0, bits(0)) // duplicate of node1's shard 0
moves := Plan(topo, Options{ImbalanceThreshold: 0.01, Ratio: ratio(10, 4)})
dedup := false
toNode2 := 0
for _, m := range moves {
if m.Phase == "dedup" {
dedup = true
continue
}
if m.TargetNode == "node2" {
toNode2++
}
}
if !dedup {
t.Fatal("expected a dedup move for the duplicated shard 0")
}
if toNode2 == 0 {
t.Error("slot freed by dedup on node2 was not usable by a later phase")
}
}
+17 -1
@@ -17,7 +17,8 @@ type Config struct {
CollectionFilter string `json:"collection_filter"`
DiskType string `json:"disk_type"`
PreferredTags []string `json:"preferred_tags"`
-DataCenterFilter string `json:"-"` // per-detection-run, not persisted
+ReplicaPlacement string `json:"replica_placement"` // e.g. "020"; empty falls back to the master default replication (even spread only when that default is empty or zero)
+DataCenterFilter string `json:"-"` // per-detection-run, not persisted
}
// NewDefaultConfig creates a new default EC balance configuration
@@ -155,6 +156,19 @@ func GetConfigSpec() base.ConfigSpec {
InputType: "text",
CSSClasses: "form-control",
},
+{
+Name: "replica_placement",
+JSONName: "replica_placement",
+Type: config.FieldTypeString,
+DefaultValue: "",
+Required: false,
+DisplayName: "Replica Placement",
+Description: "EC shard replica placement constraint (e.g. 020)",
+HelpText: "Leave empty to use the master default replication (even spread only when that default is empty or zero). When set, limits shards per rack/node per the placement digits (dc/rack/node)",
+Placeholder: "020",
+InputType: "text",
+CSSClasses: "form-control",
+},
},
}
}
@@ -174,6 +188,7 @@ func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy {
CollectionFilter: c.CollectionFilter,
DiskType: c.DiskType,
PreferredTags: preferredTagsCopy,
ReplicaPlacement: c.ReplicaPlacement,
},
},
}
@@ -195,6 +210,7 @@ func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
c.CollectionFilter = ecbConfig.CollectionFilter
c.DiskType = ecbConfig.DiskType
c.PreferredTags = append([]string(nil), ecbConfig.PreferredTags...)
c.ReplicaPlacement = ecbConfig.ReplicaPlacement
}
return nil
+148 -734
@@ -3,56 +3,25 @@ package ec_balance
import (
"context"
"fmt"
"math"
"sort"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/ecbalancer"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
storagetypes "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// ecNodeInfo represents a volume server with EC shard information for detection
type ecNodeInfo struct {
nodeID string
address string
dc string
rack string // dc:rack composite key
freeSlots int
// volumeID -> shardBits (bitmask of shard IDs present on this node)
ecShards map[uint32]*ecVolumeInfo
}
type ecVolumeInfo struct {
collection string
shardBits uint32 // bitmask
diskID uint32
}
// ecRackInfo represents a rack with EC node information
type ecRackInfo struct {
nodes map[string]*ecNodeInfo
freeSlots int
}
// shardMove represents a proposed EC shard move
type shardMove struct {
volumeID uint32
shardID int
collection string
source *ecNodeInfo
sourceDisk uint32
target *ecNodeInfo
targetDisk uint32
phase string // "dedup", "cross_rack", "within_rack", "global"
}
// Detection implements the multi-phase EC shard balance detection algorithm.
// It analyzes EC shard distribution and proposes moves to achieve even distribution.
// Detection builds an EC balance topology snapshot from the cluster's active
// topology, runs the shared ecbalancer planner, and converts the planned moves
// into worker task proposals. The balancing policy lives in
// weed/storage/erasure_coding/ecbalancer, shared with the shell ec.balance
// command so the two cannot drift.
func Detection(
ctx context.Context,
metrics []*types.VolumeHealthMetrics,
@@ -72,139 +41,114 @@ func Detection(
if clusterInfo == nil || clusterInfo.ActiveTopology == nil {
return nil, false, fmt.Errorf("active topology not available for EC balance detection")
}
topoInfo := clusterInfo.ActiveTopology.GetTopologyInfo()
if topoInfo == nil {
return nil, false, fmt.Errorf("topology info not available")
}
// Build EC topology view
nodes, racks := buildECTopology(topoInfo, ecConfig)
if len(nodes) < ecConfig.MinServerCount {
glog.V(1).Infof("EC balance: only %d servers, need at least %d", len(nodes), ecConfig.MinServerCount)
topo, nodeCount := buildBalancerTopology(topoInfo, ecConfig)
if nodeCount < ecConfig.MinServerCount {
glog.V(1).Infof("EC balance: only %d servers, need at least %d", nodeCount, ecConfig.MinServerCount)
return nil, false, nil
}
// Collect all EC volumes grouped by collection
collections := collectECCollections(nodes, ecConfig)
if len(collections) == 0 {
glog.V(1).Infof("EC balance: no EC volumes found matching filters")
replicaPlacement := resolveReplicaPlacement(ecConfig, clusterInfo)
if ctx != nil {
if err := ctx.Err(); err != nil {
return nil, false, err
}
}
// Canonical disk type for placement/execution: "hdd" -> "" (HardDriveType),
// matching the topology's disk keys and the volume server's move RPCs.
normalizedDiskType := storagetypes.ToDiskType(ecConfig.DiskType).String()
moves := ecbalancer.Plan(topo, ecbalancer.Options{
DiskType: normalizedDiskType,
ImbalanceThreshold: ecConfig.ImbalanceThreshold,
ReplicaPlacement: replicaPlacement,
Ratio: func(collection string) (int, int) {
return resolveECRatio(clusterInfo, collection)
},
// Move incrementally across detection cycles rather than draining a rack
// in one batch; the scheduler re-evaluates each cycle.
GlobalMaxMovesPerRack: 10,
// Balance heterogeneous-capacity racks by fractional fullness.
GlobalUtilizationBased: true,
})
if len(moves) == 0 {
return nil, false, nil
}
threshold := ecConfig.ImbalanceThreshold
var allMoves []*shardMove
// Build set of allowed collections for global phase filtering
allowedVids := make(map[uint32]bool)
for _, volumeIDs := range collections {
for _, vid := range volumeIDs {
allowedVids[vid] = true
}
}
for collection, volumeIDs := range collections {
if ctx != nil {
if err := ctx.Err(); err != nil {
return nil, false, err
}
}
// Phase 1: Detect duplicate shards (always run, duplicates are errors not imbalance)
for _, vid := range volumeIDs {
moves := detectDuplicateShards(vid, collection, nodes, ecConfig.DiskType)
applyMovesToTopology(moves)
allMoves = append(allMoves, moves...)
}
// Phase 2: Balance shards across racks (operates on updated topology from phase 1)
for _, vid := range volumeIDs {
moves := detectCrossRackImbalance(vid, collection, nodes, racks, ecConfig.DiskType, threshold)
applyMovesToTopology(moves)
allMoves = append(allMoves, moves...)
}
// Phase 3: Balance shards within racks (operates on updated topology from phases 1-2)
for _, vid := range volumeIDs {
moves := detectWithinRackImbalance(vid, collection, nodes, racks, ecConfig.DiskType, threshold)
applyMovesToTopology(moves)
allMoves = append(allMoves, moves...)
}
}
// Phase 4: Global node balance across racks (only for volumes in allowed collections)
globalMoves := detectGlobalImbalance(nodes, racks, ecConfig, allowedVids)
allMoves = append(allMoves, globalMoves...)
// Cap results
hasMore := false
if maxResults > 0 && len(allMoves) > maxResults {
allMoves = allMoves[:maxResults]
if maxResults > 0 && len(moves) > maxResults {
moves = moves[:maxResults]
hasMore = true
}
// Convert moves to TaskDetectionResults
now := time.Now()
results := make([]*types.TaskDetectionResult, 0, len(allMoves))
for i, move := range allMoves {
// Include loop index and source/target in TaskID for uniqueness
results := make([]*types.TaskDetectionResult, 0, len(moves))
for i, m := range moves {
taskID := fmt.Sprintf("ec_balance_%d_%d_%s_%s_%d_%d",
move.volumeID, move.shardID,
move.source.nodeID, move.target.nodeID,
now.UnixNano(), i)
result := &types.TaskDetectionResult{
m.VolumeID, m.ShardID, m.SourceNode, m.TargetNode, now.UnixNano(), i)
results = append(results, &types.TaskDetectionResult{
TaskID: taskID,
TaskType: types.TaskTypeECBalance,
VolumeID: move.volumeID,
Server: move.source.nodeID,
Collection: move.collection,
Priority: movePhasePriority(move.phase),
Reason: fmt.Sprintf("EC shard %d.%d %s: %s → %s (%s)",
move.volumeID, move.shardID, move.phase,
move.source.nodeID, move.target.nodeID, move.phase),
VolumeID: m.VolumeID,
Server: m.SourceNode,
Collection: m.Collection,
Priority: movePhasePriority(m.Phase),
Reason: fmt.Sprintf("EC shard %d.%d %s: %s → %s",
m.VolumeID, m.ShardID, m.Phase, m.SourceNode, m.TargetNode),
ScheduleAt: now,
TypedParams: &worker_pb.TaskParams{
TaskId: taskID,
VolumeId: move.volumeID,
Collection: move.collection,
VolumeId: m.VolumeID,
Collection: m.Collection,
Sources: []*worker_pb.TaskSource{{
Node: move.source.address,
DiskId: move.sourceDisk,
Rack: move.source.rack,
ShardIds: []uint32{uint32(move.shardID)},
Node: m.SourceNode,
DiskId: m.SourceDisk,
Rack: m.SourceRack,
ShardIds: []uint32{uint32(m.ShardID)},
}},
Targets: []*worker_pb.TaskTarget{{
Node: move.target.address,
DiskId: move.targetDisk,
Rack: move.target.rack,
ShardIds: []uint32{uint32(move.shardID)},
Node: m.TargetNode,
DiskId: m.TargetDisk,
Rack: m.TargetRack,
ShardIds: []uint32{uint32(m.ShardID)},
}},
TaskParams: &worker_pb.TaskParams_EcBalanceParams{
EcBalanceParams: &worker_pb.EcBalanceTaskParams{
DiskType: ecConfig.DiskType,
DiskType: normalizedDiskType,
TimeoutSeconds: 600,
},
},
},
}
results = append(results, result)
})
}
glog.V(1).Infof("EC balance detection: %d moves proposed across %d collections",
len(results), len(collections))
glog.V(1).Infof("EC balance detection: %d moves proposed", len(results))
return results, hasMore, nil
}
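Detection normalizes the configured disk type so that "hdd" maps to the empty-string key the topology uses for hard drives. A minimal sketch of that normalization follows, together with the separate "filter requested" flag. `normalizeDiskType` and `diskMatches` are hypothetical stand-ins written for illustration. They are not the `storagetypes.ToDiskType` implementation.

```go
package main

import "strings"

// normalizeDiskType maps "hdd" (any case) and "" to the empty string, the key
// under which the topology reports hard drives. Other values are lowercased
// and returned unchanged. Hypothetical helper for illustration only.
func normalizeDiskType(configured string) string {
	t := strings.ToLower(configured)
	if t == "hdd" {
		return ""
	}
	return t
}

// diskMatches reports whether a disk stored under topologyKey passes the
// configured filter. An empty configured value means "no filter requested",
// which must be tracked separately because "hdd" also normalizes to "".
func diskMatches(configured, topologyKey string) bool {
	if configured == "" {
		return true
	}
	return topologyKey == normalizeDiskType(configured)
}
```

Here `diskMatches("hdd", "ssd")` is false while `diskMatches("", "ssd")` is true, which is the distinction the `filterByDiskType` flag preserves.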
// buildECTopology constructs EC node and rack structures from topology info.
// Rack keys are dc:rack composites to avoid cross-DC name collisions.
// Only racks with eligible nodes (matching disk type, having EC shards or capacity) are included.
func buildECTopology(topoInfo *master_pb.TopologyInfo, config *Config) (map[string]*ecNodeInfo, map[string]*ecRackInfo) {
nodes := make(map[string]*ecNodeInfo)
racks := make(map[string]*ecRackInfo)
// buildBalancerTopology builds an ecbalancer.Topology from the master topology,
// applying the data-center, disk-type, and collection filters. Rack keys are
// dc:rack composites to avoid cross-DC name collisions. Per-disk free capacity
// is split evenly from the node total because the wire collapses same-type disks.
// Returns the topology and the number of eligible nodes (for MinServerCount).
func buildBalancerTopology(topoInfo *master_pb.TopologyInfo, config *Config) (*ecbalancer.Topology, int) {
topo := ecbalancer.NewTopology()
allowedCollections := wildcard.CompileWildcardMatchers(config.CollectionFilter)
// Normalize the disk-type filter: "hdd" (and the default "") map to the
// HardDriveType, which the topology reports under the empty-string key. Keep a
// separate "filter requested" flag so a configured "hdd" still filters to HDD
// disks instead of being mistaken for "all disk types".
filterByDiskType := config.DiskType != ""
wantDiskType := storagetypes.ToDiskType(config.DiskType).String()
nodeCount := 0
for _, dc := range topoInfo.DataCenterInfos {
if config.DataCenterFilter != "" {
matchers := wildcard.CompileWildcardMatchers(config.DataCenterFilter)
@@ -214,41 +158,38 @@ func buildECTopology(topoInfo *master_pb.TopologyInfo, config *Config) (map[stri
}
for _, rack := range dc.RackInfos {
// Use dc:rack composite key to avoid cross-DC name collisions
rackKey := dc.Id + ":" + rack.Id
for _, dn := range rack.DataNodeInfos {
node := &ecNodeInfo{
nodeID: dn.Id,
address: dn.Id,
dc: dc.Id,
rack: rackKey,
ecShards: make(map[uint32]*ecVolumeInfo),
}
freeSlots := 0
diskTypeOf := make(map[uint32]string) // physical disk_id -> disk type
diskShardCount := make(map[uint32]int)
hasMatchingDisk := false
for diskType, diskInfo := range dn.DiskInfos {
if config.DiskType != "" && diskType != config.DiskType {
if filterByDiskType && diskType != wantDiskType {
continue
}
hasMatchingDisk = true
freeSlots := int(diskInfo.MaxVolumeCount-diskInfo.VolumeCount)*erasure_coding.DataShardsCount - countEcShards(diskInfo.EcShardInfos)
if freeSlots > 0 {
node.freeSlots += freeSlots
fs := int(diskInfo.MaxVolumeCount-diskInfo.VolumeCount)*erasure_coding.DataShardsCount - countEcShards(diskInfo.EcShardInfos)
if fs > 0 {
freeSlots += fs
}
for _, ecShardInfo := range diskInfo.EcShardInfos {
vid := ecShardInfo.Id
existing, ok := node.ecShards[vid]
if !ok {
existing = &ecVolumeInfo{
collection: ecShardInfo.Collection,
diskID: ecShardInfo.DiskId,
}
node.ecShards[vid] = existing
// Discover physical disks from regular volumes too, so an
// EC-empty disk is still a candidate destination.
for _, vi := range diskInfo.VolumeInfos {
if _, ok := diskTypeOf[vi.DiskId]; !ok {
diskTypeOf[vi.DiskId] = diskType
}
existing.shardBits |= ecShardInfo.EcIndexBits
}
for _, eci := range diskInfo.EcShardInfos {
if _, ok := diskTypeOf[eci.DiskId]; !ok {
diskTypeOf[eci.DiskId] = diskType
}
// Disk occupancy counts ALL volumes' shards (capacity model),
// independent of the collection filter below.
diskShardCount[eci.DiskId] += erasure_coding.GetShardCount(eci)
}
}
@@ -256,583 +197,79 @@ func buildECTopology(topoInfo *master_pb.TopologyInfo, config *Config) (map[stri
continue
}
nodes[dn.Id] = node
node := topo.AddNode(dn.Id, dc.Id, rackKey, freeSlots)
// Only create rack entry when we have an eligible node
if _, ok := racks[rackKey]; !ok {
racks[rackKey] = &ecRackInfo{nodes: make(map[string]*ecNodeInfo)}
perDiskFree := 0
if diskCount := len(diskTypeOf); diskCount > 0 && freeSlots > 0 {
perDiskFree = freeSlots / diskCount
}
racks[rackKey].nodes[dn.Id] = node
racks[rackKey].freeSlots += node.freeSlots
}
}
}
return nodes, racks
}
// collectECCollections groups EC volume IDs by collection, applying filters
func collectECCollections(nodes map[string]*ecNodeInfo, config *Config) map[string][]uint32 {
allowedCollections := wildcard.CompileWildcardMatchers(config.CollectionFilter)
// Collect unique volume IDs per collection
collectionVids := make(map[string]map[uint32]bool)
for _, node := range nodes {
for vid, info := range node.ecShards {
if len(allowedCollections) > 0 && !wildcard.MatchesAnyWildcard(allowedCollections, info.collection) {
continue
}
if _, ok := collectionVids[info.collection]; !ok {
collectionVids[info.collection] = make(map[uint32]bool)
}
collectionVids[info.collection][vid] = true
}
}
// Convert to sorted slices
result := make(map[string][]uint32, len(collectionVids))
for collection, vids := range collectionVids {
vidSlice := make([]uint32, 0, len(vids))
for vid := range vids {
vidSlice = append(vidSlice, vid)
}
sort.Slice(vidSlice, func(i, j int) bool { return vidSlice[i] < vidSlice[j] })
result[collection] = vidSlice
}
return result
}
// detectDuplicateShards finds shards that exist on multiple nodes.
// Duplicates are always returned regardless of threshold since they are data errors.
func detectDuplicateShards(vid uint32, collection string, nodes map[string]*ecNodeInfo, diskType string) []*shardMove {
// Build shard -> list of nodes mapping
shardLocations := make(map[int][]*ecNodeInfo)
for _, node := range nodes {
info, ok := node.ecShards[vid]
if !ok {
continue
}
for shardID := 0; shardID < erasure_coding.MaxShardCount; shardID++ {
if info.shardBits&(1<<uint(shardID)) != 0 {
shardLocations[shardID] = append(shardLocations[shardID], node)
}
}
}
var moves []*shardMove
for shardID, locs := range shardLocations {
if len(locs) <= 1 {
continue
}
// Keep the copy on the node with most free slots (ascending sort, keep last)
sort.Slice(locs, func(i, j int) bool { return locs[i].freeSlots < locs[j].freeSlots })
// Propose deletion of all other copies (skip the keeper at the end).
// Set target=source so isDedupPhase() recognizes this as unmount+delete only.
for _, node := range locs[:len(locs)-1] {
moves = append(moves, &shardMove{
volumeID: vid,
shardID: shardID,
collection: collection,
source: node,
sourceDisk: ecShardDiskID(node, vid),
target: node,
targetDisk: ecShardDiskID(node, vid),
phase: "dedup",
})
}
}
return moves
}
// detectCrossRackImbalance detects shards that should be moved across racks for even distribution.
// Returns nil if imbalance is below the threshold.
func detectCrossRackImbalance(vid uint32, collection string, nodes map[string]*ecNodeInfo, racks map[string]*ecRackInfo, diskType string, threshold float64) []*shardMove {
numRacks := len(racks)
if numRacks <= 1 {
return nil
}
// Count shards per rack for this volume
rackShardCount := make(map[string]int)
rackShardNodes := make(map[string][]*ecNodeInfo)
totalShards := 0
for _, node := range nodes {
info, ok := node.ecShards[vid]
if !ok {
continue
}
count := shardBitCount(info.shardBits)
if count > 0 {
rackShardCount[node.rack] += count
rackShardNodes[node.rack] = append(rackShardNodes[node.rack], node)
totalShards += count
}
}
if totalShards == 0 {
return nil
}
// Check if imbalance exceeds threshold
if !exceedsImbalanceThreshold(rackShardCount, totalShards, numRacks, threshold) {
return nil
}
maxPerRack := ceilDivide(totalShards, numRacks)
var moves []*shardMove
// Find over-loaded racks and move excess shards to under-loaded racks
for rackID, count := range rackShardCount {
if count <= maxPerRack {
continue
}
excess := count - maxPerRack
movedFromRack := 0
// Find shards to move from this rack
for _, node := range rackShardNodes[rackID] {
if movedFromRack >= excess {
break
}
info := node.ecShards[vid]
for shardID := 0; shardID < erasure_coding.TotalShardsCount; shardID++ {
if movedFromRack >= excess {
break
}
if info.shardBits&(1<<uint(shardID)) == 0 {
continue
for diskID, diskType := range diskTypeOf {
node.AddDisk(diskID, diskType, perDiskFree, diskShardCount[diskID])
}
// Find destination: rack with fewest shards of this volume
destNode := findDestNodeInUnderloadedRack(vid, racks, rackShardCount, maxPerRack, rackID, nodes)
if destNode == nil {
continue
}
moves = append(moves, &shardMove{
volumeID: vid,
shardID: shardID,
collection: collection,
source: node,
sourceDisk: ecShardDiskID(node, vid),
target: destNode,
targetDisk: ecShardDiskID(destNode, vid),
phase: "cross_rack",
})
movedFromRack++
// Reserve capacity on destination so it isn't picked again,
// and release one slot on the source so later volumes in this
// same detection run see its true available capacity.
rackShardCount[destNode.rack]++
rackShardCount[rackID]--
node.freeSlots++
destNode.freeSlots--
}
}
}
return moves
}
// detectWithinRackImbalance detects shards that should be moved within racks for even node distribution.
// Returns nil if imbalance is below the threshold.
func detectWithinRackImbalance(vid uint32, collection string, nodes map[string]*ecNodeInfo, racks map[string]*ecRackInfo, diskType string, threshold float64) []*shardMove {
var moves []*shardMove
for _, rack := range racks {
if len(rack.nodes) <= 1 {
continue
}
// Count shards per node in this rack for this volume
nodeShardCount := make(map[string]int)
totalInRack := 0
for nodeID, node := range rack.nodes {
info, ok := node.ecShards[vid]
if !ok {
continue
}
count := shardBitCount(info.shardBits)
nodeShardCount[nodeID] = count
totalInRack += count
}
if totalInRack == 0 {
continue
}
// Check if imbalance exceeds threshold
if !exceedsImbalanceThreshold(nodeShardCount, totalInRack, len(rack.nodes), threshold) {
continue
}
maxPerNode := ceilDivide(totalInRack, len(rack.nodes))
// Find over-loaded nodes and move excess
for nodeID, count := range nodeShardCount {
if count <= maxPerNode {
continue
}
excess := count - maxPerNode
node := rack.nodes[nodeID]
info := node.ecShards[vid]
moved := 0
for shardID := 0; shardID < erasure_coding.TotalShardsCount; shardID++ {
if moved >= excess {
break
}
if info.shardBits&(1<<uint(shardID)) == 0 {
continue
}
// Find least-loaded node in same rack
destNode := findLeastLoadedNodeInRack(vid, rack, nodeID, nodeShardCount, maxPerNode)
if destNode == nil {
continue
}
moves = append(moves, &shardMove{
volumeID: vid,
shardID: shardID,
collection: collection,
source: node,
sourceDisk: ecShardDiskID(node, vid),
target: destNode,
targetDisk: 0,
phase: "within_rack",
})
moved++
nodeShardCount[nodeID]--
nodeShardCount[destNode.nodeID]++
node.freeSlots++
destNode.freeSlots--
}
}
}
return moves
}
// detectGlobalImbalance detects total shard count imbalance across nodes in each rack.
// Respects ImbalanceThreshold from config. Only considers volumes in allowedVids.
func detectGlobalImbalance(nodes map[string]*ecNodeInfo, racks map[string]*ecRackInfo, config *Config, allowedVids map[uint32]bool) []*shardMove {
var moves []*shardMove
for _, rack := range racks {
if len(rack.nodes) <= 1 {
continue
}
// Count total EC shards per node (only for allowed volumes)
nodeShardCounts := make(map[string]int)
totalShards := 0
for nodeID, node := range rack.nodes {
count := 0
for vid, info := range node.ecShards {
if len(allowedVids) > 0 && !allowedVids[vid] {
continue
}
count += shardBitCount(info.shardBits)
}
nodeShardCounts[nodeID] = count
totalShards += count
}
if totalShards == 0 {
continue
}
// Snapshot each node's total shard capacity (current shards from allowed
// volumes plus any remaining free slots). Capacity is fixed for the
// duration of this loop — moves conserve total shards across the rack,
// so the denominator does not change as nodeShardCounts shift.
nodeCapacity := make(map[string]int, len(rack.nodes))
for nodeID, count := range nodeShardCounts {
nodeCapacity[nodeID] = count + rack.nodes[nodeID].freeSlots
}
// Check if imbalance exceeds threshold using utilization ratios
// (count/capacity), not raw shard counts. Raw counts would say a
// cluster is imbalanced whenever a large-capacity node holds more
// shards than a small-capacity node, even when both are at the
// same fractional fullness.
if !exceedsUtilImbalanceThreshold(nodeShardCounts, nodeCapacity, config.ImbalanceThreshold) {
continue
}
// Iteratively move shards from most-utilized to least-utilized
for i := 0; i < 10; i++ { // cap iterations to avoid infinite loops
// Find min and max nodes by utilization ratio. Min must have free
// slots so it can receive a shard; max can be any node with shards
// (we move shards out of it). Utilization-based selection is
// critical on heterogeneous racks: a large-capacity node with many
// shards in absolute terms may still be the LEAST utilized, and
// moving shards into it from a small, nearly-full node is the
// correct direction even though raw counts would suggest otherwise.
var minNode, maxNode *ecNodeInfo
minUtil := math.Inf(1)
maxUtil := -1.0
var minCount, maxCount int
for nodeID, count := range nodeShardCounts {
node := rack.nodes[nodeID]
cap := nodeCapacity[nodeID]
if cap <= 0 {
continue
}
util := float64(count) / float64(cap)
if util < minUtil && node.freeSlots > 0 {
minUtil = util
minCount = count
minNode = node
}
if util > maxUtil {
maxUtil = util
maxCount = count
maxNode = rack.nodes[nodeID]
}
}
if maxNode == nil || minNode == nil || maxNode.nodeID == minNode.nodeID {
break
}
// Per-move convergence guard: reject any move where the
// destination's post-move utilization would strictly exceed the
// source's post-move utilization. This mirrors the guard in
// weed/worker/tasks/balance/detection.go and terminates the loop
// once no further beneficial move exists, preventing oscillation
// and overshoot on heterogeneous racks.
maxCap := nodeCapacity[maxNode.nodeID]
minCap := nodeCapacity[minNode.nodeID]
if maxCap <= 0 || minCap <= 0 {
break
}
newSrcUtil := float64(maxCount-1) / float64(maxCap)
newDstUtil := float64(minCount+1) / float64(minCap)
if newDstUtil > newSrcUtil {
break
}
// Pick a shard from maxNode that doesn't already exist on minNode
moved := false
for vid, info := range maxNode.ecShards {
if moved {
break
}
if len(allowedVids) > 0 && !allowedVids[vid] {
continue
}
// Check minNode doesn't have this volume's shards already (avoid same-volume overlap)
minInfo := minNode.ecShards[vid]
for shardID := 0; shardID < erasure_coding.TotalShardsCount; shardID++ {
if info.shardBits&(1<<uint(shardID)) == 0 {
// Add shards only for volumes whose collection passes the filter;
// those are the volumes the planner will balance.
for diskType, diskInfo := range dn.DiskInfos {
if filterByDiskType && diskType != wantDiskType {
continue
}
// Skip if destination already has this shard
if minInfo != nil && minInfo.shardBits&(1<<uint(shardID)) != 0 {
continue
}
moves = append(moves, &shardMove{
volumeID: vid,
shardID: shardID,
collection: info.collection,
source: maxNode,
sourceDisk: info.diskID,
target: minNode,
targetDisk: 0,
phase: "global",
})
// Update in-memory shard placement so the next iteration
// of this loop picks a different shard. Without this, the
// inner loop always finds the lowest-set bit and emits
// duplicate move requests for the same physical shard.
shardBit := uint32(1 << uint(shardID))
info.shardBits &^= shardBit
if minInfo == nil {
minInfo = &ecVolumeInfo{
collection: info.collection,
diskID: info.diskID,
for _, eci := range diskInfo.EcShardInfos {
if len(allowedCollections) > 0 && !wildcard.MatchesAnyWildcard(allowedCollections, eci.Collection) {
continue
}
minNode.ecShards[vid] = minInfo
node.AddShards(eci.Id, eci.Collection, eci.DiskId, erasure_coding.ShardBits(eci.EcIndexBits))
}
minInfo.shardBits |= shardBit
nodeShardCounts[maxNode.nodeID]--
nodeShardCounts[minNode.nodeID]++
maxNode.freeSlots++
minNode.freeSlots--
moved = true
break
}
}
if !moved {
break
nodeCount++
}
}
}
return moves
return topo, nodeCount
}
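The capacity arithmetic in buildBalancerTopology can be sketched on its own: free slots per DiskInfo entry, then an even split across the physical disks discovered on the node. The function names are hypothetical, and `dataShards = 10` assumes the standard 10+4 scheme used by the tests in this change.

```go
package main

// dataShards is the data shard count of the standard 10+4 EC scheme.
// Assumption for this sketch; the worker reads erasure_coding.DataShardsCount.
const dataShards = 10

// freeSlotsForDisk estimates the free EC shard slots of one DiskInfo entry:
// each free volume slot holds dataShards shards, minus shards already stored.
// A negative result contributes nothing, matching the "if fs > 0" guard.
func freeSlotsForDisk(maxVolumes, volumes, ecShards int) int {
	free := (maxVolumes-volumes)*dataShards - ecShards
	if free < 0 {
		return 0
	}
	return free
}

// perDiskFree splits a node's total free slots evenly across its physical
// disks. Integer division drops the remainder, so the per-disk figures can
// sum to slightly less than the node total.
func perDiskFree(freeSlots, diskCount int) int {
	if diskCount <= 0 || freeSlots <= 0 {
		return 0
	}
	return freeSlots / diskCount
}
```

For a node with MaxVolumeCount 100, no volumes, and 14 EC shards, `freeSlotsForDisk(100, 0, 14)` gives 986, and `perDiskFree(986, 4)` gives 246 per disk.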
// findDestNodeInUnderloadedRack finds a node in a rack that has fewer than maxPerRack shards
func findDestNodeInUnderloadedRack(vid uint32, racks map[string]*ecRackInfo, rackShardCount map[string]int, maxPerRack int, excludeRack string, nodes map[string]*ecNodeInfo) *ecNodeInfo {
var bestNode *ecNodeInfo
bestFreeSlots := -1
for rackID, rack := range racks {
if rackID == excludeRack {
continue
}
if rackShardCount[rackID] >= maxPerRack {
continue
}
if rack.freeSlots <= 0 {
continue
}
for _, node := range rack.nodes {
if node.freeSlots <= 0 {
continue
}
if node.freeSlots > bestFreeSlots {
bestFreeSlots = node.freeSlots
bestNode = node
}
}
}
return bestNode
// resolveECRatio returns the (dataShards, parityShards) for a collection from the
// admin EC config snapshot when present, else the local default. This keeps the
// enterprise-only custom-ratio plumbing out of the shared planner.
func resolveECRatio(_ *types.ClusterInfo, _ string) (int, int) {
// Custom EC ratios are an enterprise feature; OSS uses the standard scheme.
return normalizeECShardCounts(0, 0)
}
// findLeastLoadedNodeInRack finds the node with fewest shards in a rack
func findLeastLoadedNodeInRack(vid uint32, rack *ecRackInfo, excludeNode string, nodeShardCount map[string]int, maxPerNode int) *ecNodeInfo {
var bestNode *ecNodeInfo
bestCount := maxPerNode + 1
for nodeID, node := range rack.nodes {
if nodeID == excludeNode {
continue
}
if node.freeSlots <= 0 {
continue
}
count := nodeShardCount[nodeID]
if count >= maxPerNode {
continue
}
if count < bestCount {
bestCount = count
bestNode = node
}
// resolveReplicaPlacement picks the EC shard replica placement constraint: an
// explicit config value wins; otherwise it falls back to the master's default
// replication (matching the shell ec.balance default). A missing, invalid, or
// zero-replication value yields nil, meaning even spread / no constraint.
func resolveReplicaPlacement(ecConfig *Config, clusterInfo *types.ClusterInfo) *super_block.ReplicaPlacement {
spec := ecConfig.ReplicaPlacement
if spec == "" && clusterInfo != nil {
spec = clusterInfo.DefaultReplicaPlacement
}
return bestNode
if spec == "" {
return nil
}
rp, err := super_block.NewReplicaPlacementFromString(spec)
if err != nil {
glog.Warningf("EC balance: ignoring invalid replica placement %q: %v", spec, err)
return nil
}
if !rp.HasReplication() {
return nil
}
return rp
}
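The fallback order in resolveReplicaPlacement (explicit config, then master default, then nil) can be sketched without the SeaweedFS types. The `placement` struct and `parsePlacement` below are hypothetical simplifications: they require exactly three digits, read as other data centers, other racks, and other servers in the same rack, and they are not the `super_block.NewReplicaPlacementFromString` implementation.

```go
package main

import "fmt"

// placement is a hypothetical stand-in for a parsed replica placement.
type placement struct {
	DiffDataCenters int // first digit
	DiffRacks       int // second digit
	SameRack        int // third digit
}

// parsePlacement parses a three-digit spec such as "020".
func parsePlacement(spec string) (placement, error) {
	if len(spec) != 3 {
		return placement{}, fmt.Errorf("placement %q: want 3 digits", spec)
	}
	var d [3]int
	for i := 0; i < 3; i++ {
		c := spec[i]
		if c < '0' || c > '9' {
			return placement{}, fmt.Errorf("placement %q: non-digit %q", spec, c)
		}
		d[i] = int(c - '0')
	}
	return placement{DiffDataCenters: d[0], DiffRacks: d[1], SameRack: d[2]}, nil
}

// resolve applies the fallback order: configured value, then master default.
// A missing, invalid, or all-zero spec yields nil (no constraint).
func resolve(configured, masterDefault string) *placement {
	spec := configured
	if spec == "" {
		spec = masterDefault
	}
	if spec == "" {
		return nil
	}
	p, err := parsePlacement(spec)
	if err != nil || p.DiffDataCenters+p.DiffRacks+p.SameRack == 0 {
		return nil
	}
	return &p
}
```

In this sketch `resolve("020", "001")` returns a placement with `DiffRacks == 2`, while `resolve("", "000")` returns nil and the planner would fall back to an even spread.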
// exceedsImbalanceThreshold checks if the distribution of counts exceeds the threshold.
// numGroups is the total number of groups (including those with 0 shards that aren't in the map).
// imbalanceRatio = (maxCount - minCount) / avgCount
func exceedsImbalanceThreshold(counts map[string]int, total int, numGroups int, threshold float64) bool {
if numGroups <= 1 || total == 0 {
return false
func normalizeECShardCounts(dataShards, parityShards int) (int, int) {
if dataShards <= 0 {
dataShards = erasure_coding.DataShardsCount
}
minCount := 0 // groups not in map have 0 shards
if len(counts) >= numGroups {
// All groups have entries; find actual min
minCount = total + 1
for _, count := range counts {
if count < minCount {
minCount = count
}
}
if parityShards <= 0 {
parityShards = erasure_coding.ParityShardsCount
}
maxCount := -1
for _, count := range counts {
if count > maxCount {
maxCount = count
}
}
avg := float64(total) / float64(numGroups)
if avg == 0 {
return false
}
imbalanceRatio := float64(maxCount-minCount) / avg
return imbalanceRatio > threshold
return dataShards, parityShards
}
// exceedsUtilImbalanceThreshold checks whether the per-node utilization ratio
// (shard count / shard slot capacity) is skewed beyond the given threshold.
// Unlike exceedsImbalanceThreshold, it compares fractional fullness rather
// than raw counts so that racks with heterogeneous MaxVolumeCount are
// evaluated correctly — a large-capacity node holding more shards than a
// small-capacity node is not considered imbalanced if both are at the same
// fractional fullness. Nodes with zero capacity are skipped.
func exceedsUtilImbalanceThreshold(counts map[string]int, capacities map[string]int, threshold float64) bool {
minUtil := math.Inf(1)
maxUtil := -1.0
seen := 0
for nodeID, count := range counts {
cap := capacities[nodeID]
if cap <= 0 {
continue
}
util := float64(count) / float64(cap)
if util < minUtil {
minUtil = util
}
if util > maxUtil {
maxUtil = util
}
seen++
}
if seen < 2 || maxUtil <= 0 {
return false
}
avg := (maxUtil + minUtil) / 2
if avg == 0 {
return false
}
return (maxUtil-minUtil)/avg > threshold
}
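A small worked example may make the fractional-fullness comparison above concrete. The sketch below restates the same check over a slice instead of two maps. The `nodeLoad` type and `exceedsUtilThreshold` name are hypothetical, and it illustrates the helper shown here rather than the internals of the shared ecbalancer planner.

```go
package main

import "math"

// nodeLoad is a hypothetical pair of shard count and shard slot capacity.
type nodeLoad struct {
	Shards   int
	Capacity int
}

// exceedsUtilThreshold reports whether the spread between the most and least
// utilized nodes, relative to the midpoint of the two, exceeds threshold.
// Nodes with zero capacity are skipped.
func exceedsUtilThreshold(nodes []nodeLoad, threshold float64) bool {
	minUtil, maxUtil := math.Inf(1), -1.0
	seen := 0
	for _, n := range nodes {
		if n.Capacity <= 0 {
			continue
		}
		u := float64(n.Shards) / float64(n.Capacity)
		minUtil = math.Min(minUtil, u)
		maxUtil = math.Max(maxUtil, u)
		seen++
	}
	if seen < 2 || maxUtil <= 0 {
		return false
	}
	mid := (maxUtil + minUtil) / 2
	return (maxUtil-minUtil)/mid > threshold
}
```

A node holding 40 of 100 slots and a node holding 10 of 25 are both 40% full, so the check reports no imbalance even though the raw counts differ by 30. Raising the second node to 20 of 25 (80% full) gives a spread of 0.4 over a midpoint of 0.6, which exceeds a 0.1 threshold.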
// applyMovesToTopology simulates planned moves on the in-memory topology
// so subsequent detection phases see updated shard placement.
func applyMovesToTopology(moves []*shardMove) {
for _, move := range moves {
shardBit := uint32(1 << uint(move.shardID))
// Remove shard from source
if srcInfo, ok := move.source.ecShards[move.volumeID]; ok {
srcInfo.shardBits &^= shardBit
}
// For non-dedup moves, add shard to target
if move.source.nodeID != move.target.nodeID {
dstInfo, ok := move.target.ecShards[move.volumeID]
if !ok {
dstInfo = &ecVolumeInfo{
collection: move.collection,
diskID: move.targetDisk,
}
move.target.ecShards[move.volumeID] = dstInfo
}
dstInfo.shardBits |= shardBit
}
}
}
// Helper functions
func countEcShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) int {
count := 0
for _, eci := range ecShardInfos {
@@ -841,29 +278,6 @@ func countEcShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) in
return count
}
func shardBitCount(bits uint32) int {
count := 0
for bits != 0 {
count += int(bits & 1)
bits >>= 1
}
return count
}
func ecShardDiskID(node *ecNodeInfo, vid uint32) uint32 {
if info, ok := node.ecShards[vid]; ok {
return info.diskID
}
return 0
}
func ceilDivide(a, b int) int {
if b == 0 {
return 0
}
return (a + b - 1) / b
}
func movePhasePriority(phase string) types.TaskPriority {
switch phase {
case "dedup":
+50 -532
@@ -6,519 +6,59 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/ecbalancer"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
func TestShardBitCount(t *testing.T) {
tests := []struct {
bits uint32
expected int
}{
{0, 0},
{1, 1},
{0b111, 3},
{0x3FFF, 14}, // all 14 shards
{0b10101010, 4},
// The EC balance policy itself is tested in the shared ecbalancer package; these
// tests cover the worker adapter: building the planner topology from the master
// topology (filters, capacity) and the Detection entry point.
func ecTopo(node1Collection string) *master_pb.TopologyInfo {
node1 := &master_pb.DataNodeInfo{
Id: "node1",
DiskInfos: map[string]*master_pb.DiskInfo{
"": {Type: "", MaxVolumeCount: 100, EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{
{Id: 100, Collection: node1Collection, DiskId: 0, EcIndexBits: 0x3FFF}, // 14 shards
}},
},
}
for _, tt := range tests {
got := shardBitCount(tt.bits)
if got != tt.expected {
t.Errorf("shardBitCount(%b) = %d, want %d", tt.bits, got, tt.expected)
}
node2 := &master_pb.DataNodeInfo{
Id: "node2",
DiskInfos: map[string]*master_pb.DiskInfo{"": {Type: "", MaxVolumeCount: 100}},
}
return &master_pb.TopologyInfo{
DataCenterInfos: []*master_pb.DataCenterInfo{{
Id: "dc1",
RackInfos: []*master_pb.RackInfo{
{Id: "rack1", DataNodeInfos: []*master_pb.DataNodeInfo{node1}},
{Id: "rack2", DataNodeInfos: []*master_pb.DataNodeInfo{node2}},
},
}},
}
}
func TestCeilDivide(t *testing.T) {
tests := []struct {
a, b int
expected int
}{
{14, 3, 5},
{14, 7, 2},
{10, 3, 4},
{0, 5, 0},
{5, 0, 0},
}
for _, tt := range tests {
got := ceilDivide(tt.a, tt.b)
if got != tt.expected {
t.Errorf("ceilDivide(%d, %d) = %d, want %d", tt.a, tt.b, got, tt.expected)
}
}
}
func TestDetectDuplicateShards(t *testing.T) {
nodes := map[string]*ecNodeInfo{
"node1": {
nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 5,
ecShards: map[uint32]*ecVolumeInfo{
100: {collection: "col1", shardBits: 0b11}, // shard 0, 1
},
},
"node2": {
nodeID: "node2", address: "node2:8080", rack: "dc1:rack2", freeSlots: 10,
ecShards: map[uint32]*ecVolumeInfo{
100: {collection: "col1", shardBits: 0b01}, // shard 0 (duplicate)
},
},
}
moves := detectDuplicateShards(100, "col1", nodes, "")
if len(moves) != 1 {
t.Fatalf("expected 1 dedup move, got %d", len(moves))
}
move := moves[0]
if move.phase != "dedup" {
t.Errorf("expected phase 'dedup', got %q", move.phase)
}
if move.shardID != 0 {
t.Errorf("expected shard 0 to be deduplicated, got %d", move.shardID)
}
// node1 has fewer free slots, so the duplicate on node1 should be removed (keeper is node2)
if move.source.nodeID != "node1" {
t.Errorf("expected source node1 (fewer free slots), got %s", move.source.nodeID)
}
// Dedup moves set target=source so isDedupPhase recognizes unmount+delete only
if move.target.nodeID != "node1" {
t.Errorf("expected target node1 (same as source for dedup), got %s", move.target.nodeID)
}
}
func TestDetectCrossRackImbalance(t *testing.T) {
// 14 shards all on rack1, 2 racks available — large imbalance
nodes := map[string]*ecNodeInfo{
"node1": {
nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 0,
ecShards: map[uint32]*ecVolumeInfo{
100: {collection: "col1", shardBits: 0x3FFF}, // all 14 shards
},
},
"node2": {
nodeID: "node2", address: "node2:8080", rack: "dc1:rack2", freeSlots: 20,
ecShards: map[uint32]*ecVolumeInfo{},
},
}
racks := map[string]*ecRackInfo{
"dc1:rack1": {
nodes: map[string]*ecNodeInfo{"node1": nodes["node1"]},
freeSlots: 0,
},
"dc1:rack2": {
nodes: map[string]*ecNodeInfo{"node2": nodes["node2"]},
freeSlots: 20,
},
}
// Use very low threshold so this triggers
moves := detectCrossRackImbalance(100, "col1", nodes, racks, "", 0.01)
// With 14 shards across 2 racks, max per rack = 7
// rack1 has 14 -> excess = 7, should move 7 to rack2
if len(moves) != 7 {
t.Fatalf("expected 7 cross-rack moves, got %d", len(moves))
}
for _, move := range moves {
if move.phase != "cross_rack" {
t.Errorf("expected phase 'cross_rack', got %q", move.phase)
}
if move.source.rack != "dc1:rack1" {
t.Errorf("expected source dc1:rack1, got %s", move.source.rack)
}
if move.target.rack != "dc1:rack2" {
t.Errorf("expected target dc1:rack2, got %s", move.target.rack)
}
}
}
func TestDetectCrossRackImbalanceBelowThreshold(t *testing.T) {
// Slight imbalance: rack1 has 8, rack2 has 6 — imbalance = 2/7 ≈ 0.29
nodes := map[string]*ecNodeInfo{
"node1": {
nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 10,
ecShards: map[uint32]*ecVolumeInfo{
100: {collection: "col1", shardBits: 0xFF}, // 8 shards
},
},
"node2": {
nodeID: "node2", address: "node2:8080", rack: "dc1:rack2", freeSlots: 10,
ecShards: map[uint32]*ecVolumeInfo{
100: {collection: "col1", shardBits: 0x3F00}, // 6 shards
},
},
}
racks := map[string]*ecRackInfo{
"dc1:rack1": {
nodes: map[string]*ecNodeInfo{"node1": nodes["node1"]},
freeSlots: 10,
},
"dc1:rack2": {
nodes: map[string]*ecNodeInfo{"node2": nodes["node2"]},
freeSlots: 10,
},
}
// High threshold should skip this
moves := detectCrossRackImbalance(100, "col1", nodes, racks, "", 0.5)
if len(moves) != 0 {
t.Fatalf("expected 0 moves below threshold, got %d", len(moves))
}
}
func TestDetectWithinRackImbalance(t *testing.T) {
// rack1 has 2 nodes: node1 has 10 shards, node2 has 0 shards
nodes := map[string]*ecNodeInfo{
"node1": {
nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 5,
ecShards: map[uint32]*ecVolumeInfo{
100: {collection: "col1", shardBits: 0b1111111111}, // shards 0-9
},
},
"node2": {
nodeID: "node2", address: "node2:8080", rack: "dc1:rack1", freeSlots: 20,
ecShards: map[uint32]*ecVolumeInfo{},
},
}
racks := map[string]*ecRackInfo{
"dc1:rack1": {
nodes: map[string]*ecNodeInfo{"node1": nodes["node1"], "node2": nodes["node2"]},
freeSlots: 25,
},
}
moves := detectWithinRackImbalance(100, "col1", nodes, racks, "", 0.01)
// 10 shards on 2 nodes, max per node = 5
// node1 has 10 -> excess = 5, should move 5 to node2
if len(moves) != 5 {
t.Fatalf("expected 5 within-rack moves, got %d", len(moves))
}
for _, move := range moves {
if move.phase != "within_rack" {
t.Errorf("expected phase 'within_rack', got %q", move.phase)
}
if move.source.nodeID != "node1" {
t.Errorf("expected source node1, got %s", move.source.nodeID)
}
if move.target.nodeID != "node2" {
t.Errorf("expected target node2, got %s", move.target.nodeID)
}
}
}
func TestDetectGlobalImbalance(t *testing.T) {
// node1 has 20 total shards, node2 has 2 total shards (same rack)
nodes := map[string]*ecNodeInfo{
"node1": {
nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 5,
ecShards: map[uint32]*ecVolumeInfo{
100: {collection: "col1", shardBits: 0x3FFF}, // 14 shards
200: {collection: "col1", shardBits: 0b111111}, // 6 shards
},
},
"node2": {
nodeID: "node2", address: "node2:8080", rack: "dc1:rack1", freeSlots: 30,
ecShards: map[uint32]*ecVolumeInfo{
300: {collection: "col1", shardBits: 0b11}, // 2 shards
},
},
}
racks := map[string]*ecRackInfo{
"dc1:rack1": {
nodes: map[string]*ecNodeInfo{"node1": nodes["node1"], "node2": nodes["node2"]},
freeSlots: 35,
},
}
func TestBuildBalancerTopology(t *testing.T) {
config := NewDefaultConfig()
config.ImbalanceThreshold = 0.01 // low threshold to ensure moves happen
moves := detectGlobalImbalance(nodes, racks, config, nil)
// Total = 22 shards, avg = 11. node1 has 20, node2 has 2.
// Should move shards until balanced (max 10 iterations)
topo, nodeCount := buildBalancerTopology(ecTopo("col1"), config)
if nodeCount != 2 {
t.Fatalf("nodeCount = %d, want 2", nodeCount)
}
moves := ecbalancer.Plan(topo, ecbalancer.Options{ImbalanceThreshold: 0.01})
if len(moves) == 0 {
t.Fatal("expected global balance moves, got 0")
}
for _, move := range moves {
if move.phase != "global" {
t.Errorf("expected phase 'global', got %q", move.phase)
}
if move.source.nodeID != "node1" {
t.Errorf("expected moves from node1, got %s", move.source.nodeID)
}
if move.target.nodeID != "node2" {
t.Errorf("expected moves to node2, got %s", move.target.nodeID)
}
t.Error("expected cross-rack moves for an all-on-one-rack volume")
}
}
// TestDetectGlobalImbalance_HeterogeneousCapacity is a regression test for
// the Phase 4 rebalancer on heterogeneous racks. node1 holds more shards in
// absolute terms but has much higher capacity, so it is actually the LESS
// utilized node; node2 holds fewer shards but is nearly full. The greedy
// algorithm must pick the most-utilized node as the source and move shards
// in the direction that reduces fractional fullness, NOT in the direction
// that would equalize raw counts (which here would overfill node2).
//
// Scenario:
//
// node1: 10 shards, freeSlots=90 → capacity 100, util 10%
// node2: 3 shards, freeSlots=2 → capacity 5, util 60%
//
// Correct behavior: move shards FROM node2 TO node1 (draining the
// most-utilized node), until no further improvement is possible. Also
// verifies that moves are de-duplicated — the inner loop must update
// shardBits between iterations so each proposed move refers to a distinct
// physical shard.
func TestDetectGlobalImbalance_HeterogeneousCapacity(t *testing.T) {
nodes := map[string]*ecNodeInfo{
"node1": {
nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 90,
ecShards: map[uint32]*ecVolumeInfo{
100: {collection: "col1", shardBits: 0x3FF}, // 10 shards
},
},
"node2": {
nodeID: "node2", address: "node2:8080", rack: "dc1:rack1", freeSlots: 2,
ecShards: map[uint32]*ecVolumeInfo{
200: {collection: "col1", shardBits: 0b111}, // 3 shards
},
},
}
racks := map[string]*ecRackInfo{
"dc1:rack1": {
nodes: map[string]*ecNodeInfo{"node1": nodes["node1"], "node2": nodes["node2"]},
freeSlots: 92,
},
}
func TestBuildBalancerTopologyCollectionFilter(t *testing.T) {
config := NewDefaultConfig()
config.ImbalanceThreshold = 0.01
moves := detectGlobalImbalance(nodes, racks, config, nil)
if len(moves) == 0 {
t.Fatal("expected moves from high-util node2 to low-util node1, got 0")
config.CollectionFilter = "other" // does not match the volume's collection
topo, nodeCount := buildBalancerTopology(ecTopo("col1"), config)
if nodeCount != 2 {
t.Fatalf("nodeCount = %d, want 2", nodeCount)
}
// Every move must drain the higher-util node (node2) and target the
// lower-util node (node1). A raw-count-based greedy algorithm would
// pick the opposite direction — that is the bug this test guards.
for _, move := range moves {
if move.source.nodeID != "node2" {
t.Errorf("expected source node2 (util 0.60), got %s", move.source.nodeID)
}
if move.target.nodeID != "node1" {
t.Errorf("expected target node1 (util 0.10), got %s", move.target.nodeID)
}
}
// Verify no duplicate (volumeID, shardID) pairs — the inner loop must
// update shardBits between iterations so each move refers to a distinct
// physical shard.
seen := make(map[[2]int]bool, len(moves))
for _, move := range moves {
key := [2]int{int(move.volumeID), move.shardID}
if seen[key] {
t.Errorf("duplicate move for volume %d shard %d", move.volumeID, move.shardID)
}
seen[key] = true
}
}
func TestDetectGlobalImbalanceSkipsFullNodes(t *testing.T) {
// node2 has 0 free slots — should not be chosen as destination
nodes := map[string]*ecNodeInfo{
"node1": {
nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 10,
ecShards: map[uint32]*ecVolumeInfo{
100: {collection: "col1", shardBits: 0x3FFF}, // 14 shards
},
},
"node2": {
nodeID: "node2", address: "node2:8080", rack: "dc1:rack1", freeSlots: 0,
ecShards: map[uint32]*ecVolumeInfo{
200: {collection: "col1", shardBits: 0b11}, // 2 shards
},
},
}
racks := map[string]*ecRackInfo{
"dc1:rack1": {
nodes: map[string]*ecNodeInfo{"node1": nodes["node1"], "node2": nodes["node2"]},
freeSlots: 10,
},
}
config := NewDefaultConfig()
config.ImbalanceThreshold = 0.01
moves := detectGlobalImbalance(nodes, racks, config, nil)
// node2 has no free slots so no moves should be proposed
if len(moves) != 0 {
t.Fatalf("expected 0 moves (node2 full), got %d", len(moves))
}
}
func TestBuildECTopology(t *testing.T) {
topoInfo := &master_pb.TopologyInfo{
DataCenterInfos: []*master_pb.DataCenterInfo{
{
Id: "dc1",
RackInfos: []*master_pb.RackInfo{
{
Id: "rack1",
DataNodeInfos: []*master_pb.DataNodeInfo{
{
Id: "server1:8080",
DiskInfos: map[string]*master_pb.DiskInfo{
"": {
MaxVolumeCount: 100,
VolumeCount: 50,
EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{
{
Id: 1,
Collection: "test",
EcIndexBits: 0x3FFF, // all 14 shards
},
},
},
},
},
},
},
},
},
},
}
config := NewDefaultConfig()
nodes, racks := buildECTopology(topoInfo, config)
if len(nodes) != 1 {
t.Fatalf("expected 1 node, got %d", len(nodes))
}
if len(racks) != 1 {
t.Fatalf("expected 1 rack, got %d", len(racks))
}
node := nodes["server1:8080"]
if node == nil {
t.Fatal("expected node server1:8080")
}
if node.dc != "dc1" {
t.Errorf("expected dc=dc1, got %s", node.dc)
}
// Rack key should be dc:rack composite
if node.rack != "dc1:rack1" {
t.Errorf("expected rack=dc1:rack1, got %s", node.rack)
}
ecInfo, ok := node.ecShards[1]
if !ok {
t.Fatal("expected EC shard info for volume 1")
}
if ecInfo.collection != "test" {
t.Errorf("expected collection=test, got %s", ecInfo.collection)
}
if shardBitCount(ecInfo.shardBits) != 14 {
t.Errorf("expected 14 shards, got %d", shardBitCount(ecInfo.shardBits))
}
}
func TestBuildECTopologyCrossDCRackNames(t *testing.T) {
// Two DCs with identically-named racks should produce distinct rack keys
topoInfo := &master_pb.TopologyInfo{
DataCenterInfos: []*master_pb.DataCenterInfo{
{
Id: "dc1",
RackInfos: []*master_pb.RackInfo{{
Id: "rack1",
DataNodeInfos: []*master_pb.DataNodeInfo{{
Id: "node-dc1:8080",
DiskInfos: map[string]*master_pb.DiskInfo{
"": {MaxVolumeCount: 10, VolumeCount: 0},
},
}},
}},
},
{
Id: "dc2",
RackInfos: []*master_pb.RackInfo{{
Id: "rack1",
DataNodeInfos: []*master_pb.DataNodeInfo{{
Id: "node-dc2:8080",
DiskInfos: map[string]*master_pb.DiskInfo{
"": {MaxVolumeCount: 10, VolumeCount: 0},
},
}},
}},
},
},
}
config := NewDefaultConfig()
_, racks := buildECTopology(topoInfo, config)
if len(racks) != 2 {
t.Fatalf("expected 2 distinct racks, got %d", len(racks))
}
if _, ok := racks["dc1:rack1"]; !ok {
t.Error("expected dc1:rack1 rack key")
}
if _, ok := racks["dc2:rack1"]; !ok {
t.Error("expected dc2:rack1 rack key")
}
}
func TestCollectECCollections(t *testing.T) {
nodes := map[string]*ecNodeInfo{
"node1": {
ecShards: map[uint32]*ecVolumeInfo{
100: {collection: "col1"},
200: {collection: "col2"},
},
},
"node2": {
ecShards: map[uint32]*ecVolumeInfo{
100: {collection: "col1"},
300: {collection: "col2"},
},
},
}
config := NewDefaultConfig()
collections := collectECCollections(nodes, config)
if len(collections) != 2 {
t.Fatalf("expected 2 collections, got %d", len(collections))
}
if len(collections["col1"]) != 1 {
t.Errorf("expected 1 volume in col1, got %d", len(collections["col1"]))
}
if len(collections["col2"]) != 2 {
t.Errorf("expected 2 volumes in col2, got %d", len(collections["col2"]))
}
}
func TestCollectECCollectionsWithFilter(t *testing.T) {
nodes := map[string]*ecNodeInfo{
"node1": {
ecShards: map[uint32]*ecVolumeInfo{
100: {collection: "col1"},
200: {collection: "col2"},
},
},
}
config := NewDefaultConfig()
config.CollectionFilter = "col1"
collections := collectECCollections(nodes, config)
if len(collections) != 1 {
t.Fatalf("expected 1 collection, got %d", len(collections))
}
if _, ok := collections["col1"]; !ok {
t.Error("expected col1 to be present")
if moves := ecbalancer.Plan(topo, ecbalancer.Options{ImbalanceThreshold: 0.01}); len(moves) != 0 {
t.Errorf("filtered-out collection should produce no moves, got %d", len(moves))
}
}
@@ -542,46 +82,24 @@ func TestDetectionNilTopology(t *testing.T) {
config := NewDefaultConfig()
clusterInfo := &types.ClusterInfo{ActiveTopology: nil}
_, _, err := Detection(context.Background(), nil, clusterInfo, config, 0)
if err == nil {
if _, _, err := Detection(context.Background(), nil, clusterInfo, config, 0); err == nil {
t.Fatal("expected error for nil topology")
}
}
func TestMovePhasePriority(t *testing.T) {
if movePhasePriority("dedup") != types.TaskPriorityHigh {
t.Error("dedup should be high priority")
cases := map[string]types.TaskPriority{
"dedup": types.TaskPriorityHigh,
"cross_rack": types.TaskPriorityMedium,
"within_rack": types.TaskPriorityLow,
"global": types.TaskPriorityLow,
}
if movePhasePriority("cross_rack") != types.TaskPriorityMedium {
t.Error("cross_rack should be medium priority")
}
if movePhasePriority("within_rack") != types.TaskPriorityLow {
t.Error("within_rack should be low priority")
}
if movePhasePriority("global") != types.TaskPriorityLow {
t.Error("global should be low priority")
for phase, want := range cases {
if got := movePhasePriority(phase); got != want {
t.Errorf("movePhasePriority(%q) = %v, want %v", phase, got, want)
}
}
}
func TestExceedsImbalanceThreshold(t *testing.T) {
// 14 vs 0 across 2 groups: imbalance = 14/7 = 2.0 > any reasonable threshold
counts := map[string]int{"a": 14, "b": 0}
if !exceedsImbalanceThreshold(counts, 14, 2, 0.2) {
t.Error("expected imbalance to exceed 0.2 threshold")
}
// Only one group has shards but numGroups=2: min is 0 from absent group
counts2 := map[string]int{"a": 14}
if !exceedsImbalanceThreshold(counts2, 14, 2, 0.2) {
t.Error("expected imbalance with absent group to exceed 0.2 threshold")
}
// 7 vs 7: perfectly balanced
counts3 := map[string]int{"a": 7, "b": 7}
if exceedsImbalanceThreshold(counts3, 14, 2, 0.01) {
t.Error("expected balanced distribution to not exceed threshold")
}
}
// helper to avoid unused import
// keep the erasure_coding import meaningful for future adapter tests
var _ = erasure_coding.DataShardsCount
@@ -73,6 +73,17 @@ func (t *ECBalanceTask) Execute(ctx context.Context, params *worker_pb.TaskParam
isDedupDelete := ecParams != nil && isDedupPhase(params)
// Guard against a same-node, cross-disk "move". copyAndMountShard skips the
// copy when source and target addresses match, but deleteShard is node-wide
// (it removes the shard from every disk on the node), so this sequence would
// erase the shard after never copying it. EC shards also cannot be relocated
// between disks of one node via these RPCs, so such a move is meaningless.
// Reject it rather than lose data.
if source.Node == target.Node && source.DiskId != target.DiskId {
return fmt.Errorf("refusing same-node cross-disk EC shard move for volume %d shard(s) %v on %s (source disk %d, target disk %d): EC shard delete is node-wide and would erase the shard after a skipped copy",
params.VolumeId, source.ShardIds, source.Node, source.DiskId, target.DiskId)
}
glog.Infof("EC balance: moving shard(s) %v of volume %d from %s to %s",
source.ShardIds, params.VolumeId, source.Node, target.Node)
@@ -199,6 +210,14 @@ func (t *ECBalanceTask) Validate(params *worker_pb.TaskParams) error {
if len(params.Targets[0].ShardIds) == 0 {
return fmt.Errorf("ECBalanceTask.Validate: Targets[0].ShardIds is empty")
}
// A same-node, cross-disk move is unsafe: the node-wide EC shard delete would
// erase the shard after copyAndMountShard skips the same-address copy. Such a
// move cannot be expressed by these RPCs anyway. Dedup (same node and disk) is
// allowed.
if params.Sources[0].Node == params.Targets[0].Node && params.Sources[0].DiskId != params.Targets[0].DiskId {
return fmt.Errorf("ECBalanceTask.Validate: refusing same-node cross-disk move on %s (source disk %d, target disk %d): EC shard delete is node-wide",
params.Sources[0].Node, params.Sources[0].DiskId, params.Targets[0].DiskId)
}
return nil
}
@@ -223,10 +242,16 @@ func (t *ECBalanceTask) reportProgress(progress float64, stage string) {
glog.Infof("EC balance volume %d: [%.2f] %s", t.volumeID, progress, stage)
}
// isDedupPhase checks if this is a dedup-phase task (source and target are the same node)
// isDedupPhase checks if this is a dedup-phase task: an unmount+delete on a
// single location, encoded by detection as source==target on the same node AND
// the same disk. Comparing the disk too is essential — VolumeEcShardsDelete is
// node-wide (it removes the shard from every disk on the node), so a same-node
// but cross-disk request must NOT be treated as a benign dedup; see Validate
// and Execute, which reject it outright.
func isDedupPhase(params *worker_pb.TaskParams) bool {
if len(params.Sources) > 0 && len(params.Targets) > 0 {
return params.Sources[0].Node == params.Targets[0].Node
s, t := params.Sources[0], params.Targets[0]
return s.Node == t.Node && s.DiskId == t.DiskId
}
return false
}
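The three cases that isDedupPhase, Validate and Execute distinguish can be summarised in one place. This is a minimal sketch, not the task code: the location type and the classify function are hypothetical stand-ins for the Node and DiskId fields of worker_pb.TaskSource / worker_pb.TaskTarget.

```go
package main

import "fmt"

// location is a hypothetical stand-in for the Node/DiskId pair carried by
// a task source or target.
type location struct {
	Node   string
	DiskID uint32
}

// classify names the three cases described in the comments above:
//   - same node and same disk: a dedup (unmount+delete only)
//   - same node, different disk: unsafe, must be rejected
//   - different node: an ordinary copy-then-delete move
func classify(src, dst location) string {
	switch {
	case src.Node == dst.Node && src.DiskID == dst.DiskID:
		return "dedup"
	case src.Node == dst.Node:
		return "reject"
	default:
		return "move"
	}
}

func main() {
	fmt.Println(classify(location{"node1", 2}, location{"node1", 2})) // dedup
	fmt.Println(classify(location{"node1", 0}, location{"node1", 3})) // reject
	fmt.Println(classify(location{"node1", 0}, location{"node2", 0})) // move
}
```

Checking the disk as well as the node is what keeps the middle case from being treated as a dedup.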
@@ -0,0 +1,335 @@
package ec_balance
import (
"context"
"sort"
"testing"
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// In-process (no real cluster) tests of the detection/planning path against a
// topology shaped the way the master reports a multi-disk cluster: several
// same-type physical disks on a node collapse into a single DiskInfo, with each
// shard's real DiskId surviving only in the per-shard records (issue 9593). They
// run the master-wire-format -> ActiveTopology -> Detection path, then simulate
// executing the planned moves with the volume server's actual semantics
// (VolumeEcShardsDelete is node-wide) to assert no EC shard is ever lost. The
// real-cluster end-to-end equivalent lives in test/erasure_coding.
// nodeSpec describes one volume server: its rack and the volume's shards per
// physical disk (diskID -> shard ids).
type nodeSpec struct {
id string
rack string
disks map[uint32][]int
}
const integDisksPerNode = 6
// buildMasterTopology renders nodeSpecs into a *master_pb.TopologyInfo exactly as
// the master would: one DiskInfo per node keyed by disk type (""), carrying one
// EcShardInfo per (volume, physical disk) plus regular VolumeInfos so every
// physical disk is discoverable even when it holds no EC shards.
func buildMasterTopology(collection string, vid uint32, maxVolPerDisk int, specs []nodeSpec) *master_pb.TopologyInfo {
rackByID := map[string]*master_pb.RackInfo{}
var rackOrder []string
for _, spec := range specs {
var ecShards []*master_pb.VolumeEcShardInformationMessage
for diskID, shards := range spec.disks {
var bits erasure_coding.ShardBits
for _, s := range shards {
bits = bits.Set(erasure_coding.ShardId(s))
}
ecShards = append(ecShards, &master_pb.VolumeEcShardInformationMessage{
Id: vid,
Collection: collection,
DiskId: diskID,
EcIndexBits: uint32(bits),
})
}
// Expose all physical disks (incl. EC-empty ones) via regular volumes.
var volInfos []*master_pb.VolumeInformationMessage
for d := uint32(0); d < integDisksPerNode; d++ {
volInfos = append(volInfos, &master_pb.VolumeInformationMessage{Id: 90000 + d, DiskId: d})
}
dn := &master_pb.DataNodeInfo{
Id: spec.id,
DiskInfos: map[string]*master_pb.DiskInfo{
"": {
Type: "",
MaxVolumeCount: int64(maxVolPerDisk * integDisksPerNode),
VolumeCount: int64(integDisksPerNode),
EcShardInfos: ecShards,
VolumeInfos: volInfos,
},
},
}
r, ok := rackByID[spec.rack]
if !ok {
r = &master_pb.RackInfo{Id: spec.rack}
rackByID[spec.rack] = r
rackOrder = append(rackOrder, spec.rack)
}
r.DataNodeInfos = append(r.DataNodeInfos, dn)
}
dc := &master_pb.DataCenterInfo{Id: "dc1"}
for _, rk := range rackOrder {
dc.RackInfos = append(dc.RackInfos, rackByID[rk])
}
return &master_pb.TopologyInfo{Id: "integ", DataCenterInfos: []*master_pb.DataCenterInfo{dc}}
}
// runDetection wires the topology through a real ActiveTopology and runs Detection.
func runDetection(t *testing.T, topoInfo *master_pb.TopologyInfo, cfg *Config) []*types.TaskDetectionResult {
t.Helper()
at := topology.NewActiveTopology(0)
if err := at.UpdateTopology(topoInfo); err != nil {
t.Fatalf("UpdateTopology: %v", err)
}
results, _, err := Detection(context.Background(), nil, &types.ClusterInfo{ActiveTopology: at}, cfg, 0)
if err != nil {
t.Fatalf("Detection: %v", err)
}
return results
}
// shardModel is nodeID -> diskID -> set of shard ids for one volume.
type shardModel map[string]map[uint32]map[int]bool
func modelFromSpecs(specs []nodeSpec) shardModel {
m := make(shardModel)
for _, spec := range specs {
m[spec.id] = make(map[uint32]map[int]bool)
for diskID, shards := range spec.disks {
set := make(map[int]bool)
for _, s := range shards {
set[s] = true
}
m[spec.id][diskID] = set
}
}
return m
}
func (m shardModel) distinctShards() []int {
seen := map[int]bool{}
for _, disks := range m {
for _, set := range disks {
for s := range set {
seen[s] = true
}
}
}
out := make([]int, 0, len(seen))
for s := range seen {
out = append(out, s)
}
sort.Ints(out)
return out
}
// apply replays planned moves with the volume server's actual
// behavior: a move copies the shard to the destination disk, then deletes it on
// the source node — and EC shard delete is node-wide (removes the shard from
// every disk on the source node). A dedup move (same node+disk) is delete-only.
func (m shardModel) apply(results []*types.TaskDetectionResult) {
nodeWideDelete := func(node string, shard int) {
for diskID := range m[node] {
delete(m[node][diskID], shard)
}
}
for _, r := range results {
p := r.TypedParams
if p == nil || len(p.Sources) == 0 || len(p.Targets) == 0 {
continue
}
src, dst := p.Sources[0], p.Targets[0]
if len(src.ShardIds) == 0 {
continue
}
shard := int(src.ShardIds[0])
dedup := src.Node == dst.Node && src.DiskId == dst.DiskId
if !dedup {
if m[dst.Node] == nil {
m[dst.Node] = make(map[uint32]map[int]bool)
}
if m[dst.Node][dst.DiskId] == nil {
m[dst.Node][dst.DiskId] = make(map[int]bool)
}
m[dst.Node][dst.DiskId][shard] = true
}
nodeWideDelete(src.Node, shard)
}
}
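The data-loss trap this model guards against can be reproduced in a few lines. The sketch below is an illustration only, with hypothetical names (disks, nodeWideDelete, unsafeSameNodeMove): it assumes, as the commit message states, that the copy is skipped when source and target addresses match and that the delete removes the shard from every disk on the node.

```go
package main

import "fmt"

// disks maps diskID -> set of shard ids held on that disk of a single node.
type disks map[uint32]map[int]bool

// nodeWideDelete removes the shard from every disk of the node, which is
// how the commit message describes VolumeEcShardsDelete.
func nodeWideDelete(d disks, shard int) {
	for id := range d {
		delete(d[id], shard)
	}
}

// holds reports whether any disk of the node still has the shard.
func holds(d disks, shard int) bool {
	for _, set := range d {
		if set[shard] {
			return true
		}
	}
	return false
}

// unsafeSameNodeMove models a same-node, cross-disk "move": the copy step
// is skipped because source and target share an address, and the node-wide
// delete then removes the only copy.
func unsafeSameNodeMove(d disks, shard int) {
	// (copy skipped: same address)
	nodeWideDelete(d, shard)
}

func main() {
	node := disks{0: {3: true}, 3: {}}
	fmt.Println(holds(node, 3)) // true
	unsafeSameNodeMove(node, 3)
	fmt.Println(holds(node, 3)) // false: the shard is gone
}
```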
// TestMultiDiskBalanceNeverLosesShards is the core regression for
// issue 9593: a freshly-encoded volume on a 3-node, 6-disk-per-node cluster,
// balanced and then concentrated, must never lose a shard when the planned moves
// are executed with real node-wide-delete semantics.
func TestMultiDiskBalanceNeverLosesShards(t *testing.T) {
cfg := NewDefaultConfig()
cfg.ImbalanceThreshold = 0.0 // balance to even; surface any move the planner makes
cases := []struct {
name string
specs []nodeSpec
}{
{
// Healthy post-encode spread: 14 shards across 3 nodes (5/5/4), each
// node spreading its shards over distinct physical disks.
name: "balanced across disks",
specs: []nodeSpec{
{id: "n1", rack: "rack1", disks: map[uint32][]int{0: {0}, 1: {1}, 2: {2}, 3: {3}, 4: {4}}},
{id: "n2", rack: "rack1", disks: map[uint32][]int{0: {5}, 1: {6}, 2: {7}, 3: {8}, 4: {9}}},
{id: "n3", rack: "rack1", disks: map[uint32][]int{0: {10}, 1: {11}, 2: {12}, 3: {13}}},
},
},
{
// Worst case from the report: all 14 shards landed on one node's disks;
// the balancer must redistribute without losing any.
name: "concentrated on one node",
specs: []nodeSpec{
{id: "n1", rack: "rack1", disks: map[uint32][]int{0: {0, 1, 2}, 1: {3, 4, 5}, 2: {6, 7}, 3: {8, 9}, 4: {10, 11}, 5: {12, 13}}},
{id: "n2", rack: "rack2", disks: map[uint32][]int{}},
{id: "n3", rack: "rack3", disks: map[uint32][]int{}},
},
},
{
// Multi-rack healthy spread.
name: "spread across racks",
specs: []nodeSpec{
{id: "n1", rack: "rack1", disks: map[uint32][]int{0: {0, 1}, 1: {2, 3}, 2: {4}}},
{id: "n2", rack: "rack2", disks: map[uint32][]int{0: {5, 6}, 1: {7, 8}, 2: {9}}},
{id: "n3", rack: "rack3", disks: map[uint32][]int{0: {10, 11}, 1: {12, 13}}},
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
topoInfo := buildMasterTopology("col1", 28, 50, tc.specs)
results := runDetection(t, topoInfo, cfg)
model := modelFromSpecs(tc.specs)
before := model.distinctShards()
for _, r := range results {
// No move may be a same-node cross-disk move (node-wide delete
// would erase the shard after a skipped copy).
p := r.TypedParams
s, d := p.Sources[0], p.Targets[0]
if s.Node == d.Node && s.DiskId != d.DiskId {
t.Errorf("unsafe same-node cross-disk move: vol %d shard %v on %s disk %d->%d", p.VolumeId, s.ShardIds, s.Node, s.DiskId, d.DiskId)
}
// Source disk must actually hold the shard being moved.
shard := int(s.ShardIds[0])
if !model[s.Node][s.DiskId][shard] && s.Node != d.Node {
t.Errorf("move sources vol %d shard %d from %s disk %d, which does not hold it", p.VolumeId, shard, s.Node, s.DiskId)
}
}
model.apply(results)
after := model.distinctShards()
if len(after) != len(before) {
t.Errorf("[%s] shard loss: had %v (%d), now %v (%d) after %d moves",
tc.name, before, len(before), after, len(after), len(results))
}
})
}
}
// TestConcentratedVolumeSpreadsAcrossNodesAndDisks asserts the
// remediation actually happens: a one-node-concentrated volume is redistributed
// to the other racks, landing on multiple distinct destination disks.
func TestConcentratedVolumeSpreadsAcrossNodesAndDisks(t *testing.T) {
cfg := NewDefaultConfig()
cfg.ImbalanceThreshold = 0.0
specs := []nodeSpec{
{id: "n1", rack: "rack1", disks: map[uint32][]int{0: {0, 1, 2}, 1: {3, 4, 5}, 2: {6, 7}, 3: {8, 9}, 4: {10, 11}, 5: {12, 13}}},
{id: "n2", rack: "rack2", disks: map[uint32][]int{}},
{id: "n3", rack: "rack3", disks: map[uint32][]int{}},
}
results := runDetection(t, buildMasterTopology("col1", 28, 50, specs), cfg)
if len(results) == 0 {
t.Fatal("expected redistribution moves for a one-node-concentrated volume")
}
destNodes := map[string]bool{}
destDisksByNode := map[string]map[uint32]bool{}
for _, r := range results {
d := r.TypedParams.Targets[0]
if d.Node == "n1" {
continue
}
destNodes[d.Node] = true
if destDisksByNode[d.Node] == nil {
destDisksByNode[d.Node] = map[uint32]bool{}
}
destDisksByNode[d.Node][d.DiskId] = true
}
if len(destNodes) < 2 {
t.Errorf("shards spread to only %d destination nodes, want both other racks", len(destNodes))
}
for node, disks := range destDisksByNode {
if len(disks) < 2 {
t.Errorf("destination %s received shards on only %d disk(s); expected spread across disks: %v", node, len(disks), disks)
}
}
}
// TestBuildBalancerTopologyNormalizesHddDiskType guards the disk-type filter:
// the master reports default-HDD disks under the empty-string key, so a config of
// "hdd" must match them (not filter everything out), while "ssd" must exclude them.
func TestBuildBalancerTopologyNormalizesHddDiskType(t *testing.T) {
specs := []nodeSpec{
{id: "n1", rack: "r1", disks: map[uint32][]int{0: {0, 1}}},
{id: "n2", rack: "r1", disks: map[uint32][]int{0: {2}}},
}
topoInfo := buildMasterTopology("c", 100, 50, specs)
if _, n := buildBalancerTopology(topoInfo, &Config{DiskType: "hdd"}); n != 2 {
t.Errorf("disk_type=hdd matched %d nodes on an all-HDD cluster, want 2 (hdd must map to the empty HDD key)", n)
}
if _, n := buildBalancerTopology(topoInfo, &Config{DiskType: ""}); n != 2 {
t.Errorf("disk_type=empty matched %d nodes, want 2 (all)", n)
}
if _, n := buildBalancerTopology(topoInfo, &Config{DiskType: "ssd"}); n != 0 {
t.Errorf("disk_type=ssd matched %d nodes on an all-HDD cluster, want 0", n)
}
}
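The matching rule this test pins down can be sketched as follows. This is an assumption-laden illustration rather than the code in buildBalancerTopology: normalizeDiskType and diskTypeMatches are hypothetical helpers, and the only behaviour taken from the test is that "hdd" matches the empty key, an empty filter matches everything, and "ssd" does not match the empty key.

```go
package main

import (
	"fmt"
	"strings"
)

// normalizeDiskType maps the explicit "hdd" name onto the empty string,
// the key under which the master reports default-HDD disks.
func normalizeDiskType(s string) string {
	s = strings.ToLower(strings.TrimSpace(s))
	if s == "hdd" {
		return ""
	}
	return s
}

// diskTypeMatches reports whether a configured filter selects a disk that
// the master reports under reportedKey. An empty filter means "no filter".
func diskTypeMatches(filter, reportedKey string) bool {
	if strings.TrimSpace(filter) == "" {
		return true
	}
	return normalizeDiskType(filter) == normalizeDiskType(reportedKey)
}

func main() {
	fmt.Println(diskTypeMatches("hdd", "")) // true
	fmt.Println(diskTypeMatches("", ""))    // true
	fmt.Println(diskTypeMatches("ssd", "")) // false
}
```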
// TestResolveReplicaPlacementFallsBackToMasterDefault verifies the worker mirrors
// the shell: explicit config wins, otherwise the master's default replication is
// the fallback, and an empty or zero-replication value means no constraint.
func TestResolveReplicaPlacementFallsBackToMasterDefault(t *testing.T) {
cases := []struct {
name string
configRP string
defaultRP string
wantApplied bool
}{
{"explicit config used", "010", "", true},
{"explicit config wins over default", "010", "100", true},
{"falls back to master default", "", "010", true},
{"zero master default = no constraint", "", "000", false},
{"empty everywhere = no constraint", "", "", false},
{"invalid value ignored", "", "nonsense", false},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
rp := resolveReplicaPlacement(&Config{ReplicaPlacement: tc.configRP},
&types.ClusterInfo{DefaultReplicaPlacement: tc.defaultRP})
if (rp != nil) != tc.wantApplied {
t.Errorf("config=%q default=%q: applied=%v, want %v", tc.configRP, tc.defaultRP, rp != nil, tc.wantApplied)
}
})
}
}
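The fallback order exercised by this table can be sketched in isolation. Everything here is hypothetical: placement, parsePlacement and resolve are illustrative names, the three-digit parse is a simplification, and the choice to consult the master default only when the config value is empty is an assumption that happens to satisfy the cases listed above.

```go
package main

import "fmt"

// placement is a hypothetical stand-in for a parsed replica placement.
type placement struct {
	diffDC, diffRack, sameRack int
}

// parsePlacement accepts exactly three decimal digits; anything else is
// treated as invalid. This is a simplification for the sketch.
func parsePlacement(s string) (*placement, bool) {
	if len(s) != 3 {
		return nil, false
	}
	var d [3]int
	for i := 0; i < 3; i++ {
		c := s[i]
		if c < '0' || c > '9' {
			return nil, false
		}
		d[i] = int(c - '0')
	}
	return &placement{d[0], d[1], d[2]}, true
}

// resolve prefers the explicit config value and otherwise falls back to the
// master default. Invalid or all-zero values yield nil (no constraint).
func resolve(configRP, defaultRP string) *placement {
	chosen := configRP
	if chosen == "" {
		chosen = defaultRP
	}
	rp, ok := parsePlacement(chosen)
	if !ok || (rp.diffDC == 0 && rp.diffRack == 0 && rp.sameRack == 0) {
		return nil
	}
	return rp
}

func main() {
	fmt.Println(resolve("010", "100") != nil) // true: explicit config wins
	fmt.Println(resolve("", "010") != nil)    // true: master default used
	fmt.Println(resolve("", "000") != nil)    // false: zero means no constraint
}
```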
@@ -0,0 +1,58 @@
package ec_balance
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
)
// These cover the worker execution layer (ec_balance_task.go). The shard-balance
// policy and multi-disk placement are tested in the shared ecbalancer package.
// TestValidateRejectsSameNodeCrossDiskMove covers the data-loss trap of a
// same-node, cross-disk shard move. copyAndMountShard skips the copy when source
// and target addresses match, but VolumeEcShardsDelete is node-wide, so executing
// such a move would erase the shard. Validate must reject it, while allowing a
// cross-node move and a same-node/same-disk dedup.
func TestValidateRejectsSameNodeCrossDiskMove(t *testing.T) {
task := NewECBalanceTask("t1", 100, "col1", nil)
mk := func(srcNode string, srcDisk uint32, dstNode string, dstDisk uint32) *worker_pb.TaskParams {
return &worker_pb.TaskParams{
VolumeId: 100,
Sources: []*worker_pb.TaskSource{{Node: srcNode, DiskId: srcDisk, ShardIds: []uint32{3}}},
Targets: []*worker_pb.TaskTarget{{Node: dstNode, DiskId: dstDisk, ShardIds: []uint32{3}}},
}
}
if err := task.Validate(mk("node1", 0, "node1", 3)); err == nil {
t.Error("Validate accepted a same-node cross-disk move; it must reject it to avoid node-wide delete data loss")
}
if err := task.Validate(mk("node1", 0, "node2", 0)); err != nil {
t.Errorf("Validate rejected a legitimate cross-node move: %v", err)
}
if err := task.Validate(mk("node1", 2, "node1", 2)); err != nil {
t.Errorf("Validate rejected a same-node/same-disk dedup: %v", err)
}
}
// TestIsDedupPhaseRequiresSameDisk confirms dedup classification keys on both
// node and disk, so a same-node cross-disk request is never silently routed to
// the unmount+delete path.
func TestIsDedupPhaseRequiresSameDisk(t *testing.T) {
withParams := func(srcDisk, dstDisk uint32) *worker_pb.TaskParams {
return &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{{Node: "node1", DiskId: srcDisk}},
Targets: []*worker_pb.TaskTarget{{Node: "node1", DiskId: dstDisk}},
TaskParams: &worker_pb.TaskParams_EcBalanceParams{
EcBalanceParams: &worker_pb.EcBalanceTaskParams{},
},
}
}
if !isDedupPhase(withParams(2, 2)) {
t.Error("same node and disk should be a dedup phase")
}
if isDedupPhase(withParams(0, 3)) {
t.Error("same node but different disk must NOT be a dedup phase")
}
}
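The two tests above pin down one rule: a move is a dedup only when source and target share both node and disk, and a same-node move across disks is refused. A minimal sketch of that rule follows. The `endpoint` and `moveKind` types and the `classify` function are hypothetical names used for illustration; they are not the identifiers in ec_balance_task.go.

```go
package main

import "fmt"

// endpoint is a hypothetical (node, disk) pair. The real task carries these
// in worker_pb.TaskSource and worker_pb.TaskTarget.
type endpoint struct {
	Node   string
	DiskID uint32
}

// moveKind is an illustrative classification, not a type from the PR.
type moveKind int

const (
	crossNodeMove moveKind = iota // copy to another node, then delete at source
	dedupMove                     // same node and same disk: unmount+delete only
	unsafeMove                    // same node, different disk: must be rejected
)

func (k moveKind) String() string {
	switch k {
	case crossNodeMove:
		return "cross-node"
	case dedupMove:
		return "dedup"
	default:
		return "unsafe"
	}
}

// classify applies the rule the tests describe. A same-node cross-disk move is
// unsafe because the copy would be skipped (addresses match) while the delete
// is node-wide.
func classify(src, dst endpoint) moveKind {
	if src.Node != dst.Node {
		return crossNodeMove
	}
	if src.DiskID == dst.DiskID {
		return dedupMove
	}
	return unsafeMove
}

func main() {
	fmt.Println(classify(endpoint{"node1", 0}, endpoint{"node1", 3}))
	fmt.Println(classify(endpoint{"node1", 0}, endpoint{"node2", 0}))
	fmt.Println(classify(endpoint{"node1", 2}, endpoint{"node1", 2}))
}
```

The three calls in `main` mirror the three cases in TestValidateRejectsSameNodeCrossDiskMove: rejected, accepted as a cross-node move, and accepted as a dedup.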
@@ -102,13 +102,22 @@ func (h *ECBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
},
+{
+Name: "shard_replica_placement",
+Label: "Shard Replica Placement",
+Description: "EC shard replica placement constraint (e.g. 020); empty uses the master default replication (even spread only when that default is empty or zero). Mirrors the ec.balance -shardReplicaPlacement flag.",
+Placeholder: "master default",
+FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
+Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
+},
},
},
},
DefaultValues: map[string]*plugin_pb.ConfigValue{
-"collection_filter": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
-"data_center_filter": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
-"disk_type": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
+"collection_filter": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
+"data_center_filter": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
+"disk_type": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
+"shard_replica_placement": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
},
},
WorkerConfigForm: &plugin_pb.ConfigForm{
@@ -207,6 +216,10 @@ func (h *ECBalanceHandler) Detect(
if diskType != "" {
workerConfig.TaskConfig.DiskType = diskType
}
+replicaPlacement := strings.TrimSpace(pluginworker.ReadStringConfig(request.GetAdminConfigValues(), "shard_replica_placement", ""))
+if replicaPlacement != "" {
+workerConfig.TaskConfig.ReplicaPlacement = replicaPlacement
+}
masters := make([]string, 0)
if request.ClusterContext != nil {
@@ -218,7 +231,10 @@ func (h *ECBalanceHandler) Detect(
return err
}
-clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology}
+clusterInfo := &workertypes.ClusterInfo{
+ActiveTopology: activeTopology,
+DefaultReplicaPlacement: pluginworker.FetchDefaultReplicaPlacement(ctx, masters, h.grpcDialOption),
+}
maxResults := int(request.MaxResults)
if maxResults < 0 {
maxResults = 0
@@ -427,15 +443,15 @@ func buildECBalanceProposal(result *workertypes.TaskDetectionResult) (*plugin_pb
proposalID = fmt.Sprintf("ec-balance-%d-%d", result.VolumeID, time.Now().UnixNano())
}
-// Dedupe key includes volume ID, shard ID, source node, and collection
-// to distinguish moves of the same shard from different source nodes (e.g. dedup)
+// Dedupe per (collection, volume), NOT per shard. A volume's EC shards can be
+// spread across several disks on one server, and concurrent moves of the same
+// volume race on its shared .ecx/.ecj/.vif sidecar files. Keying by volume
+// makes the scheduler run only one of a volume's moves at a time — both within
+// a detection run and against in-flight jobs — and because the planner emits a
+// volume's moves in phase order (dedup, cross-rack, within-rack, global), the
+// phases then execute in order across detection cycles. This mirrors the shell,
+// which serializes a volume's moves and waits between phases.
dedupeKey := fmt.Sprintf("ec_balance:%d", result.VolumeID)
-if len(result.TypedParams.Sources) > 0 {
-src := result.TypedParams.Sources[0]
-if len(src.ShardIds) > 0 {
-dedupeKey = fmt.Sprintf("ec_balance:%d:%d:%s", result.VolumeID, src.ShardIds[0], src.Node)
-}
-}
if result.Collection != "" {
dedupeKey += ":" + result.Collection
}
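To make the effect of the per-volume key concrete, here is a minimal sketch. `dedupeKey` mirrors the key construction in the hunk above. The `proposal` type and the `admit` function are hypothetical stand-ins for a scheduler that keeps at most one in-flight job per key; the real scheduler's behavior is assumed here, not taken from this diff.

```go
package main

import "fmt"

// dedupeKey mirrors the per-volume key built in buildECBalanceProposal:
// the volume ID, plus the collection when it is non-empty.
func dedupeKey(volumeID uint32, collection string) string {
	key := fmt.Sprintf("ec_balance:%d", volumeID)
	if collection != "" {
		key += ":" + collection
	}
	return key
}

// proposal is a hypothetical stand-in for one planned shard move.
type proposal struct {
	VolumeID   uint32
	Collection string
	ShardID    uint32
}

// admit is an assumed scheduler policy for illustration: it keeps the first
// proposal per dedupe key and skips any whose key is already in flight.
func admit(inFlight map[string]bool, planned []proposal) []proposal {
	var accepted []proposal
	for _, p := range planned {
		key := dedupeKey(p.VolumeID, p.Collection)
		if inFlight[key] {
			continue // another move of this volume is pending; retry next cycle
		}
		inFlight[key] = true
		accepted = append(accepted, p)
	}
	return accepted
}

func main() {
	planned := []proposal{
		{VolumeID: 42, Collection: "test-col", ShardID: 5},
		{VolumeID: 42, Collection: "test-col", ShardID: 9}, // same volume, skipped
		{VolumeID: 43, Collection: "test-col", ShardID: 1},
	}
	for _, p := range admit(map[string]bool{}, planned) {
		fmt.Println(dedupeKey(p.VolumeID, p.Collection), "shard", p.ShardID)
	}
}
```

Under the old per-shard key, shards 5 and 9 of volume 42 would have had distinct keys and could have run concurrently. With the per-volume key the second move waits for a later detection cycle.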
@@ -118,8 +118,11 @@ func TestBuildECBalanceProposal(t *testing.T) {
if proposal.ProposalId != "test-task-123" {
t.Errorf("expected proposal ID test-task-123, got %s", proposal.ProposalId)
}
-if proposal.DedupeKey != "ec_balance:42:5:source:8080:test-col" {
-t.Errorf("expected dedupe key ec_balance:42:5:source:8080:test-col, got %s", proposal.DedupeKey)
+// Dedupe is per (volume, collection) so a volume's shard moves serialize and
+// run in phase order, rather than per shard which allowed concurrent same-volume
+// moves that race on the volume's .ecx/.ecj/.vif sidecars.
+if proposal.DedupeKey != "ec_balance:42:test-col" {
+t.Errorf("expected dedupe key ec_balance:42:test-col, got %s", proposal.DedupeKey)
}
if proposal.Labels["source_node"] != "source:8080" {
t.Errorf("expected source_node label source:8080, got %s", proposal.Labels["source_node"])
@@ -22,6 +22,10 @@ type ClusterInfo struct {
LastUpdated time.Time
ActiveTopology *topology.ActiveTopology // Added for destination planning in detection
VolumeReplicaMap map[uint32][]ReplicaLocation
+// DefaultReplicaPlacement is the master's configured default replication
+// (GetMasterConfiguration). Detectors use it as the fallback when no explicit
+// replica placement is set, matching the shell's behavior. Empty = none.
+DefaultReplicaPlacement string
// GrpcDialOption is set when a detector needs to make targeted gRPC calls
// during detection (e.g., the EC detector auto-cleans up an orphaned
// regular replica that survived a previous encode; see #9448). Optional: