EC encode: place shards via ecbalancer.Place + configurable replica placement (#9623)

* Add shared super_block.ResolveReplicaPlacement; use it in ec_balance

* Add ecbalancer.FromActiveTopology snapshot constructor for EC encode/repair

* Add ecbalancer.Place greenfield/repair placement core (strict + durability-first)

* topology: add GetEffectiveAvailableEcShardSlots; FromActiveTopology uses shard-granular free slots

GetDisksWithEffectiveCapacity flattens reserved shard slots into volume slots via
integer truncation, so an in-flight EC task reserving a non-multiple-of-
DataShardsCount number of shards was lost from the snapshot and freeSlots was
over-reported. GetEffectiveAvailableEcShardSlots subtracts the full reservation
impact at shard granularity.
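
The truncation described above can be illustrated with a minimal sketch. The function names and the fixed 10-shards-per-volume-slot constant are illustrative assumptions, not the actual topology code:

```go
package main

import "fmt"

// shardsPerVolumeSlot is an assumed constant mirroring the standard
// 10+4 layout's data-shard count (hypothetical name).
const shardsPerVolumeSlot = 10

// freeShardSlotsTruncated converts the reservation to whole volume slots
// first, so a reservation smaller than one volume slot is truncated to zero.
func freeShardSlotsTruncated(freeVolumeSlots, reservedShards int) int {
	reservedVolumeSlots := reservedShards / shardsPerVolumeSlot // integer truncation
	return (freeVolumeSlots - reservedVolumeSlots) * shardsPerVolumeSlot
}

// freeShardSlotsExact subtracts the reservation at shard granularity.
func freeShardSlotsExact(freeVolumeSlots, reservedShards int) int {
	return freeVolumeSlots*shardsPerVolumeSlot - reservedShards
}

func main() {
	// Two free volume slots, four shards reserved by an in-flight EC task.
	fmt.Println(freeShardSlotsTruncated(2, 4)) // 20: the reservation is lost
	fmt.Println(freeShardSlotsExact(2, 4))     // 16
}
```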

* ecbalancer.Place: reject nodes without a free disk of the requested type

FromActiveTopology keeps all disk types in the snapshot, so an SSD-only request
could be routed to a node with only HDD capacity (pickBestDiskOnNode then returns
disk 0 on the wrong tier). Filter rack/node selection to those with a free disk
of the requested type.

* ecbalancer.Place: enforce ReplicaPlacement DiffDataCenterCount (per-DC shard cap)

* ecbalancer: enforce DiffDataCenterCount in balance (cross-DC phase + cross-rack DC cap)

Adds a cross-DC corrective phase that drains data centers holding more than
DiffDataCenterCount shards of a volume, and a per-DC cap on cross-rack move
targets. Both are no-ops when DiffDataCenterCount is unset, so balance output is
unchanged for non-DC placements.

* topology: ratio-aware EC shard slots and provisional empty-disk slot

GetEffectiveAvailableEcShardSlots now takes the target collection's data-shard
count, so a 4+2 volume's larger shards are not over-counted at 10 per volume slot;
and it keeps the one provisional slot for freshly started empty servers that
report max=0, matching getEffectiveAvailableCapacityUnsafe. FromActiveTopology
threads the ratio through.

* ecbalancer.Place: explicit disk-type filter signal (fix HDD vs any ambiguity)

HardDriveType normalizes to "", which collided with "" meaning any disk. Add
Constraints.FilterDiskType and normalize both sides so an hdd request matches disks
reported as "" and never leaks to SSD, while filter=false still means any.
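
A sketch of the disambiguation, assuming a simplified constraints struct and matching helper. Only the FilterDiskType idea comes from this change; the type and function names below are hypothetical:

```go
package main

import (
	"fmt"
	"strings"
)

// constraints is a hypothetical stand-in for the placement constraints.
type constraints struct {
	DiskType       string
	FilterDiskType bool // false means "any disk type"
}

// normalizeDiskType maps the hard-drive spelling to "" so that a request
// for "hdd" and a disk reported as "" compare equal.
func normalizeDiskType(t string) string {
	t = strings.ToLower(strings.TrimSpace(t))
	if t == "hdd" {
		return ""
	}
	return t
}

// diskMatches applies the filter only when it is explicitly requested.
func diskMatches(c constraints, reported string) bool {
	if !c.FilterDiskType {
		return true
	}
	return normalizeDiskType(c.DiskType) == normalizeDiskType(reported)
}

func main() {
	hdd := constraints{DiskType: "hdd", FilterDiskType: true}
	fmt.Println(diskMatches(hdd, ""), diskMatches(hdd, "ssd")) // true false
	fmt.Println(diskMatches(constraints{}, "ssd"))             // true
}
```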

* ecbalancer: add clearShardAccounting for repair snapshot reconciliation

Clears one disk's copy of a shard from per-domain accounting and recomputes the
node-level union (preserving a kept copy on another disk of the same node), without
crediting capacity. Repair uses it to drop to-be-deleted copies before placing
missing shards.

* ecbalancer: don't cap cross-DC target racks when DiffRackCount is unset

len(racks)+1 wrongly limited each target rack (3 in a 2-rack cluster), so draining
a DC could stop short of the DiffDataCenterCount cap. Use MaxShardCount+1 as the
effectively-unlimited default.

* topology/ecbalancer: ratio-correct EC capacity accounting

Reservation shard slots (default ShardsPerVolumeSlot units) are now converted to
the target ratio before subtracting, and existing EC shards are charged by size
(targetDataShards/shardDataShards) so a 2+1 shard isn't counted as one 10+4 slot.
Per-shard ratio lookup is behind shardDataShards (OSS uses the standard ratio).

* ecbalancer.Place: candidate tiering and eligible-rack caps

Adds a per-disk eligibility/preference abstraction so Place supports:
- preferred-tag whole-plan retry (try disks carrying the earliest tags first,
  widen to all only if a tier cannot place every shard; reports
  SpilledOutsidePreferredTags),
- soft disk-type spill via DiskTypePolicy (Any/Prefer/Require): Prefer fills the
  preferred type then spills, reporting SpilledToOtherDiskType; Require filters,
- even per-rack caps that divide by racks holding an eligible disk, so a tiered
  cluster (e.g. SSDs in 2 of 4 racks) isn't capped impossibly low.
Disk tags carried via Node.AddDiskTags + FromActiveTopology.

* ecbalancer: export ClearShardAccounting for repair snapshot reconciliation

* ecbalancer: address review feedback (ratio rounding, bitmap walk, same-DC moves)

- topology/ecbalancer: round shard-reservation and existing-shard footprint up
  when converting to target-ratio shard slots, so a sub-slot reservation is not
  truncated to zero and free capacity is not overstated for low-data-shard
  layouts (targetDataShards < ds).
- erasure_coding: add ShardBits.All iterator and use it across the balancer,
  cross-DC phase, and placement scoring instead of scanning 0..MaxShardCount and
  probing Has on every id.
- ecbalancer: allow same-DC cross-rack moves when a DC already sits at its
  DiffDataCenterCount cap; a same-DC move leaves the DC total unchanged. Add a
  regression test that fails without the guard.
- ecbalancer cross-DC phase: pick targets via the eligible-aware
  pickNodeInRackEligible/pickBestDiskEligible helpers so the disk-type filter is
  honored and a 0 disk id is not mistaken for a valid selection.

* ecbalancer: test ecShardSlotsOnDisk fractional round-up

Cover the mixed-ratio path (targetDataShards < existing data shards) so a
shard's fractional footprint is never floored to zero and free capacity is not
overstated. Exercises the round-up via the targetDataShards parameter; OSS uses
the standard ratio at runtime while the enterprise build hits it with real
per-volume ratios.
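
The round-up can be sketched with integer ceiling division. The helper names are hypothetical, and the real ecShardSlotsOnDisk may aggregate or round differently; this only shows why flooring overstates free capacity when targetDataShards < ds:

```go
package main

import "fmt"

// slotsFloor charges `count` existing shards of a layout with `ds` data
// shards in units of target-ratio shard slots, rounding down.
func slotsFloor(count, targetDataShards, ds int) int {
	return count * targetDataShards / ds
}

// slotsCeil does the same conversion but rounds up, so a fractional
// footprint is never charged as zero.
func slotsCeil(count, targetDataShards, ds int) int {
	return (count*targetDataShards + ds - 1) / ds
}

func main() {
	// One shard of a 10-data-shard volume measured against a 4-data-shard target.
	fmt.Println(slotsFloor(1, 4, 10), slotsCeil(1, 4, 10)) // 0 1
	// One shard of a 2+1 volume measured against the standard 10-data-shard target.
	fmt.Println(slotsFloor(1, 10, 2), slotsCeil(1, 10, 2)) // 5 5
}
```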

* ecbalancer: assert node B rack in TestFromActiveTopology

* ecbalancer: split Destination into separate DataCenter and bare Rack

Replace the composite "dc:rack" Rack field on Destination with separate
DataCenter and bare Rack values, matching topology.DiskInfo and the worker-task
convention. Callers (and tests) read the data center directly instead of parsing
the composite with strings.SplitN.

* shell ec.balance: use utilization-based global balancing (parity with worker)

The shell's global rebalance phase balanced by raw shard count; switch it to
fractional fullness (shards/capacity), as the worker already does. On uniform
capacity the two agree; on heterogeneous capacity it fills nodes proportionally
instead of driving small-capacity nodes toward full.

Updates the heterogeneous-capacity regression test to assert even fullness
(~equal shards/capacity per node) rather than even shard count.
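
As a rough illustration of the difference, the sketch below picks a target node by raw shard count and by fractional fullness. The node type and both pickers are hypothetical, and capacities are assumed to be positive:

```go
package main

import "fmt"

// node is a hypothetical view of an EC node for balancing decisions.
type node struct {
	name     string
	shards   int
	capacity int // assumed > 0
}

// leastByCount picks the node holding the fewest shards.
func leastByCount(nodes []node) string {
	if len(nodes) == 0 {
		return ""
	}
	best := nodes[0]
	for _, n := range nodes[1:] {
		if n.shards < best.shards {
			best = n
		}
	}
	return best.name
}

// leastByFullness picks the node with the lowest shards/capacity ratio,
// cross-multiplying to avoid floating point.
func leastByFullness(nodes []node) string {
	if len(nodes) == 0 {
		return ""
	}
	best := nodes[0]
	for _, n := range nodes[1:] {
		if n.shards*best.capacity < best.shards*n.capacity {
			best = n
		}
	}
	return best.name
}

func main() {
	nodes := []node{{"small", 4, 5}, {"big", 10, 100}}
	fmt.Println(leastByCount(nodes))    // small
	fmt.Println(leastByFullness(nodes)) // big
}
```

With count-based selection the small node keeps receiving shards until it is full; with fullness-based selection the large node absorbs them first.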

* ecbalancer: bounded-proportional per-DC shard spread

DiffDataCenterCount was enforced only as a ceiling (drain-to-cap), which could
leave a within-cap-but-lopsided DC distribution under a loose cap (e.g. 10/4 of 14
with cap=10). Now the cross-DC phase, the cross-rack DC guard, and Place all target
boundedMaxPerDC = min(DiffDataCenterCount, max(ceil(total/numDCs), parityShards)):
shards spread proportionally across DCs, but no tighter than the durability floor
(once each DC holds <= parityShards a DC loss is recoverable, so further spreading
only adds cross-DC/WAN traffic). No-op when DiffDataCenterCount is 0; identical to
before when the cap is the binding constraint.
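
The formula above can be worked through in a few lines. This is a sketch of the arithmetic only (the function signature is an assumption), and the logic it illustrates is removed again later in this same squashed change:

```go
package main

import "fmt"

// boundedMaxPerDC returns min(diffDCCount, max(ceil(total/numDCs), parityShards)).
// A return value of 0 means "no per-DC cap" (diffDCCount unset).
func boundedMaxPerDC(diffDCCount, total, numDCs, parityShards int) int {
	if diffDCCount <= 0 || numDCs <= 0 {
		return 0
	}
	target := (total + numDCs - 1) / numDCs // proportional share, rounded up
	if target < parityShards {
		target = parityShards // durability floor: no need to spread further
	}
	if target > diffDCCount {
		target = diffDCCount // the configured cap stays the ceiling
	}
	return target
}

func main() {
	fmt.Println(boundedMaxPerDC(10, 14, 2, 4)) // 7: even split beats the loose cap of 10
	fmt.Println(boundedMaxPerDC(10, 14, 7, 4)) // 4: floor at parityShards
	fmt.Println(boundedMaxPerDC(5, 14, 2, 4))  // 5: the cap is the binding constraint
}
```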

* ecbalancer: drop DiffDataCenterCount enforcement for EC placement

The 1-byte volume ReplicaPlacement packs xyz into x*100+y*10+z<=255, so the DC
digit can only be 0-2 -- far too small to be a meaningful per-DC EC shard cap (a
cap of 1-2 would demand 7-14 DCs for a 10+4 volume). It's volume replica-placement,
not an EC spec. Removes the cross-DC balance phase, the DC guard in the cross-rack
phase, and the per-DC cap in Place (and the just-added bounded-proportional logic);
EC relies on the RP-independent rack/node even spread instead. Rack/node caps
(DiffRackCount/SameRackCount) are unchanged. Per-domain EC caps are left for a real
EC placement spec.
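
The arithmetic behind this argument, as a small sketch. The helper names are hypothetical and do not reflect the super_block API; only the x*100+y*10+z<=255 packing and the 10+4 example come from the message:

```go
package main

import (
	"errors"
	"fmt"
)

// packReplicaPlacement packs three decimal digits into one byte as
// dc*100 + rack*10 + node and rejects values that do not fit.
func packReplicaPlacement(dc, rack, node int) (byte, error) {
	for _, d := range []int{dc, rack, node} {
		if d < 0 || d > 9 {
			return 0, errors.New("each digit must be 0-9")
		}
	}
	v := dc*100 + rack*10 + node
	if v > 255 {
		return 0, errors.New("value does not fit in one byte")
	}
	return byte(v), nil
}

// minDCsForCap is the number of data centers needed to hold totalShards
// when each DC may hold at most perDCCap of them.
func minDCsForCap(totalShards, perDCCap int) int {
	return (totalShards + perDCCap - 1) / perDCCap
}

func main() {
	_, err := packReplicaPlacement(3, 0, 0)
	fmt.Println(err != nil)                               // true: a DC digit of 3 overflows
	fmt.Println(minDCsForCap(14, 2), minDCsForCap(14, 1)) // 7 14
}
```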

* ecbalancer: enforce per-disk durability cap; symmetric reserve/release

Place now refuses to put more than parityShards shards of a volume on a single
disk (pickBestDiskEligible skips a disk once it holds parityShards of the volume,
a hard cap not relaxed even in durability-first). Previously Place assigned by
free capacity, so a skewed near-full cluster could pile >parityShards onto one
disk, and losing that disk would lose the volume; only distinct-disk count was
checked. This covers encode and repair (both route through Place); the caller
skips/leaves the volume rather than minting an unrecoverable layout.
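
A minimal sketch of the cap and the disk-count floor it implies. The minDisks formula matches the minTotalDisks expression in planECDestinations; the function names and the eligibility helper are hypothetical:

```go
package main

import "fmt"

// minDisks is the fewest disks that can hold all shards of a volume while
// no single disk holds more than parityShards of them.
func minDisks(dataShards, parityShards int) int {
	total := dataShards + parityShards
	return (total + parityShards - 1) / parityShards
}

// diskEligible reports whether a disk already holding `held` shards of the
// volume may take one more without exceeding the per-disk cap.
func diskEligible(held, parityShards int) bool {
	return held < parityShards
}

func main() {
	fmt.Println(minDisks(10, 4))                        // 4
	fmt.Println(minDisks(4, 2))                         // 3
	fmt.Println(diskEligible(3, 4), diskEligible(4, 4)) // true false
}
```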

Also makes reserveShard decrement freeSlots unconditionally, symmetric with
releaseShard's unconditional increment (the old guarded decrement could credit a
phantom slot on release if a shard were ever reserved onto a full disk).
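
The phantom-slot problem can be reproduced with a toy counter. The disk type and method names here are illustrative assumptions, not the ecbalancer types:

```go
package main

import "fmt"

// disk is a hypothetical capacity counter.
type disk struct{ freeSlots int }

// reserveGuarded is the old behavior: it never decrements below zero.
func (d *disk) reserveGuarded() {
	if d.freeSlots > 0 {
		d.freeSlots--
	}
}

// reserve is the symmetric behavior: it always decrements.
func (d *disk) reserve() { d.freeSlots-- }

// release always increments.
func (d *disk) release() { d.freeSlots++ }

func main() {
	full := &disk{freeSlots: 0}
	full.reserveGuarded()
	full.release()
	fmt.Println(full.freeSlots) // 1: a slot that never existed

	full = &disk{freeSlots: 0}
	full.reserve()
	full.release()
	fmt.Println(full.freeSlots) // 0
}
```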

* ecbalancer: add Topology.ReleaseVolumeShards (clear + credit) for greenfield encode

Releases all of a volume's shards from the snapshot and credits the freed disk
capacity, so a greenfield encode can plan as if stale EC shards from a prior failed
attempt are gone. Safe to credit because the encode task deletes stale shards
(cleanupStaleEcShards) before distributing the new ones. Distinct from
ClearShardAccounting (repair), which does not credit.

* ecbalancer: ReleaseVolumeShards credits node freeSlots, not just disks

releaseShard only increments per-disk freeSlots, but rack capacity is summed from
node freeSlots (buildRacks) and node freeSlots gates node eligibility. Crediting
only disks left a node/rack looking full after releasing stale shards, so a
greenfield encode still couldn't use the freed capacity. Now credits the node by
the total disk-slots freed.

* ecbalancer: correct PlacementMode docs (encode uses durability-first)

PlaceStrict was labeled '(encode)' but encode uses PlaceDurabilityFirst. Clarify
that durability-first is used by both encode and repair, reports relaxations in
PlaceResult.Relaxed, and never relaxes the per-disk durability cap.

* ecbalancer: treat SameRackCount as a direct per-node shard cap

The 3rd ReplicaPlacement digit now caps shards per node at exactly the digit
value, matching how DiffRackCount (2nd digit) caps per rack, instead of allowing
digit+1 per node. This makes the per-rack and per-node caps consistent and
matches the documented "digits cap EC shards per rack and per node" semantics;
e.g. 011 now means at most one shard per rack and one per node.

* EC encode: place shards via ecbalancer.Place + configurable replica placement

Encode now plans destinations through the shared ecbalancer.Place policy
(durability-first: prefers the source disk type and honors replica placement /
caps / anti-affinity, relaxing rather than failing when capacity is tight) instead
of the EC-only placement planner. Targets and capacity reservations use Place's
actual per-disk shard assignment, not a round-robin guess; cross-volume in-cycle
capacity is tracked by ActiveTopology's pending task, so the cached planner is no
longer consulted. Adds a configurable replica_placement (proto field 6 + worker
form + reader) that overrides the master default replication.

The placement-package planner code is left in place (now unused) and removed in a
follow-up that drops the package.

* EC encode: drop unused dataShards param from createECTargets

Addresses review feedback: after switching to Place's per-disk shardsPerPlan
assignment, createECTargets no longer needs the data-shard count.

* EC encode: fix packed-target validation, greenfield stale-shard accounting, RP docs

- Validate counts distinct shard ids across targets, not target rows, so packed
  plans (fewer (node,disk) targets than shards) aren't rejected.
- planECDestinations releases the volume's stale EC shards from the snapshot before
  Place (ReleaseVolumeShards), crediting their capacity. The encode task deletes
  stale shards before distributing, so a retry on tight capacity no longer fails
  planning by counting shards that are about to be removed.
- replica_placement config/form help no longer claims a data-center limit (the DC
  digit is ignored for EC); detection logs a warning when a DC digit is set.

* EC encode: surface relaxed placement; mark replica_placement best-effort

Encode places with PlaceDurabilityFirst (the chosen lenient behavior), which can
relax caps/anti-affinity/replica-placement to avoid deferring. That was silent
(only disk-type/tag spills were logged). Now logs PlaceResult.Relaxed so a tight
replica placement isn't weakened unnoticed, and the config/form help states the
rack/node caps are best-effort during encode (enforced by rebalancing).

* EC encode: key per-disk shard grouping by struct, not formatted string

planECDestinations grouped destinations using a fmt.Sprintf("%s:%d") map key
per shard; use a {node,diskID} struct key and pre-size the map/slice to the
shard count to drop the per-shard string allocation.
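
A standalone sketch of the struct-keyed grouping, loosely modeled on the loop in planECDestinations. The groupByDisk helper and its slice-based input are assumptions made for illustration:

```go
package main

import "fmt"

// diskKey identifies a (node, disk) pair; a comparable struct can be used
// directly as a map key, with no string formatting per lookup.
type diskKey struct {
	node   string
	diskID uint32
}

// groupByDisk takes dest[shardID] = destination disk and returns the disks
// in first-seen order plus the shard ids assigned to each.
func groupByDisk(dest []diskKey) ([]diskKey, map[diskKey][]uint32) {
	groups := make(map[diskKey][]uint32, len(dest))
	order := make([]diskKey, 0, len(dest))
	for sid, key := range dest {
		if _, seen := groups[key]; !seen {
			order = append(order, key)
		}
		groups[key] = append(groups[key], uint32(sid))
	}
	return order, groups
}

func main() {
	dest := []diskKey{{"a", 0}, {"b", 1}, {"a", 0}}
	order, groups := groupByDisk(dest)
	fmt.Println(len(order), groups[diskKey{"a", 0}]) // 2 [0 2]
}
```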
Chris Lu
2026-05-22 20:22:30 -07:00
committed by GitHub
parent d4e39b499b
commit 0566fbd552
8 changed files with 216 additions and 168 deletions
+1
@@ -374,6 +374,7 @@ message ErasureCodingTaskConfig {
int32 min_volume_size_mb = 3; // Minimum volume size for EC
string collection_filter = 4; // Only process volumes from specific collections
repeated string preferred_tags = 5; // Disk tags to prioritize for EC shard placement
string replica_placement = 6; // EC shard replica placement (e.g. "020"); empty falls back to master default replication
}
// BalanceTaskConfig contains balance-specific configuration
+11 -2
@@ -2960,6 +2960,7 @@ type ErasureCodingTaskConfig struct {
MinVolumeSizeMb int32 `protobuf:"varint,3,opt,name=min_volume_size_mb,json=minVolumeSizeMb,proto3" json:"min_volume_size_mb,omitempty"` // Minimum volume size for EC
CollectionFilter string `protobuf:"bytes,4,opt,name=collection_filter,json=collectionFilter,proto3" json:"collection_filter,omitempty"` // Only process volumes from specific collections
PreferredTags []string `protobuf:"bytes,5,rep,name=preferred_tags,json=preferredTags,proto3" json:"preferred_tags,omitempty"` // Disk tags to prioritize for EC shard placement
ReplicaPlacement string `protobuf:"bytes,6,opt,name=replica_placement,json=replicaPlacement,proto3" json:"replica_placement,omitempty"` // EC shard replica placement (e.g. "020"); empty falls back to master default replication
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -3029,6 +3030,13 @@ func (x *ErasureCodingTaskConfig) GetPreferredTags() []string {
return nil
}
func (x *ErasureCodingTaskConfig) GetReplicaPlacement() string {
if x != nil {
return x.ReplicaPlacement
}
return ""
}
// BalanceTaskConfig contains balance-specific configuration
type BalanceTaskConfig struct {
state protoimpl.MessageState `protogen:"open.v1"`
@@ -4218,13 +4226,14 @@ const file_worker_proto_rawDesc = "" +
"\x10VacuumTaskConfig\x12+\n" +
"\x11garbage_threshold\x18\x01 \x01(\x01R\x10garbageThreshold\x12/\n" +
"\x14min_volume_age_hours\x18\x02 \x01(\x05R\x11minVolumeAgeHours\x120\n" +
"\x14min_interval_seconds\x18\x03 \x01(\x05R\x12minIntervalSeconds\"\xed\x01\n" +
"\x14min_interval_seconds\x18\x03 \x01(\x05R\x12minIntervalSeconds\"\x9a\x02\n" +
"\x17ErasureCodingTaskConfig\x12%\n" +
"\x0efullness_ratio\x18\x01 \x01(\x01R\rfullnessRatio\x12*\n" +
"\x11quiet_for_seconds\x18\x02 \x01(\x05R\x0fquietForSeconds\x12+\n" +
"\x12min_volume_size_mb\x18\x03 \x01(\x05R\x0fminVolumeSizeMb\x12+\n" +
"\x11collection_filter\x18\x04 \x01(\tR\x10collectionFilter\x12%\n" +
"\x0epreferred_tags\x18\x05 \x03(\tR\rpreferredTags\"n\n" +
"\x0epreferred_tags\x18\x05 \x03(\tR\rpreferredTags\x12+\n" +
"\x11replica_placement\x18\x06 \x01(\tR\x10replicaPlacement\"n\n" +
"\x11BalanceTaskConfig\x12/\n" +
"\x13imbalance_threshold\x18\x01 \x01(\x01R\x12imbalanceThreshold\x12(\n" +
"\x10min_server_count\x18\x02 \x01(\x05R\x0eminServerCount\"I\n" +
@@ -17,6 +17,7 @@ type Config struct {
CollectionFilter string `json:"collection_filter"`
MinSizeMB int `json:"min_size_mb"`
PreferredTags []string `json:"preferred_tags"`
ReplicaPlacement string `json:"replica_placement"` // e.g. "020"; empty falls back to the master default replication
}
// NewDefaultConfig creates a new default erasure coding configuration
@@ -157,6 +158,19 @@ func GetConfigSpec() base.ConfigSpec {
InputType: "text",
CSSClasses: "form-control",
},
{
Name: "replica_placement",
JSONName: "replica_placement",
Type: config.FieldTypeString,
DefaultValue: "",
Required: false,
DisplayName: "Replica Placement",
Description: "EC shard replica placement constraint (e.g. 020)",
HelpText: "Leave empty to use the master default replication. When set, the 2nd/3rd digits cap EC shards per rack and per node (best-effort during encode: relaxed rather than failing if the cluster can't satisfy them, then enforced by rebalancing). The 1st (data-center) digit is ignored for EC placement",
Placeholder: "020",
InputType: "text",
CSSClasses: "form-control",
},
},
}
}
@@ -177,6 +191,7 @@ func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy {
MinVolumeSizeMb: int32(c.MinSizeMB),
CollectionFilter: c.CollectionFilter,
PreferredTags: preferredTagsCopy,
ReplicaPlacement: c.ReplicaPlacement,
},
},
}
@@ -200,6 +215,7 @@ func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
c.MinSizeMB = int(ecConfig.MinVolumeSizeMb)
c.CollectionFilter = ecConfig.CollectionFilter
c.PreferredTags = append([]string(nil), ecConfig.PreferredTags...)
c.ReplicaPlacement = ecConfig.ReplicaPlacement
}
return nil
+123 -128
@@ -14,7 +14,9 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/ecbalancer"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
@@ -56,7 +58,17 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste
skippedTooFewNodes := 0
consecutivePlanningFailures := 0
var planner *ecPlacementPlanner
// EC shard replica placement: explicit config wins, else the master default.
var replicaPlacement *super_block.ReplicaPlacement
if clusterInfo != nil {
replicaPlacement = super_block.ResolveReplicaPlacement(ecConfig.ReplicaPlacement, clusterInfo.DefaultReplicaPlacement)
}
// EC placement honors only the rack/node digits; the data-center digit can't
// express a useful per-DC EC shard cap (it maxes at 2). Warn once per cycle so a
// 1xx/2xx setting isn't silently ineffective.
if replicaPlacement != nil && replicaPlacement.DiffDataCenterCount > 0 {
glog.Warningf("EC Detection: replica placement data-center digit (%d) is ignored for EC; only rack/node digits are honored", replicaPlacement.DiffDataCenterCount)
}
allowedCollections := wildcard.CompileWildcardMatchers(ecConfig.CollectionFilter)
@@ -219,12 +231,9 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste
}
glog.Infof("EC Detection: ActiveTopology available, planning destinations for volume %d", metric.VolumeID)
if planner == nil {
planner = newECPlacementPlanner(clusterInfo.ActiveTopology, ecConfig.PreferredTags)
}
dataShards := erasure_coding.DataShardsCount
parityShards := erasure_coding.ParityShardsCount
multiPlan, err := planECDestinations(planner, metric, ecConfig, dataShards, parityShards)
multiPlan, shardsPerPlan, err := planECDestinations(clusterInfo.ActiveTopology, metric, ecConfig, replicaPlacement, dataShards, parityShards)
if err != nil {
glog.V(2).Infof("Failed to plan EC destinations for volume %d: %v", metric.VolumeID, err)
consecutivePlanningFailures++
@@ -304,14 +313,13 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste
glog.V(2).Infof("Found %d volume replicas and %d existing EC shards for volume %d (total %d cleanup sources)",
len(replicaLocations), len(existingECShards), metric.VolumeID, len(sources))
// Convert shard destinations to TaskDestinationSpec. With fewer
// disks than shards a destination holds several shards, so reserve
// capacity for the actual per-disk shard count (round-robin matches
// createECTargets) rather than assuming one shard each.
// Convert shard destinations to TaskDestinationSpec. A destination may
// hold several shards (small clusters), so reserve capacity for the
// actual per-disk shard count that Place assigned (shardsPerPlan),
// which is exactly what createECTargets writes.
destinations := make([]topology.TaskDestinationSpec, len(shardDestinations))
shardsPerDest := distributeECShards(dataShards+parityShards, len(shardDestinations))
for i, dest := range shardDestinations {
shardCount := len(shardsPerDest[i])
shardCount := len(shardsPerPlan[i])
shardImpact := topology.CalculateECShardStorageImpact(int32(shardCount), int64(expectedShardSize))
destSize := int64(expectedShardSize) * int64(shardCount)
destinations[i] = topology.TaskDestinationSpec{
@@ -342,9 +350,9 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste
continue // Skip this volume if topology task addition fails
}
if planner != nil {
planner.applyTaskReservations(int64(metric.Size), sources, destinations)
}
// Cross-volume in-cycle capacity is tracked by ActiveTopology via the
// pending task above, which the next volume's FromActiveTopology snapshot
// reflects; no separate planner reservation is needed.
glog.V(2).Infof("Added pending EC shard task %s to ActiveTopology for volume %d with %d cleanup sources and %d shard destinations",
taskID, metric.VolumeID, len(sources), len(multiPlan.Plans))
@@ -360,7 +368,7 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste
Sources: sourcesProto,
// Unified targets - all EC shard destinations
Targets: createECTargets(multiPlan, dataShards, parityShards),
Targets: createECTargets(multiPlan, shardsPerPlan),
TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
ErasureCodingParams: createECTaskParams(dataShards, parityShards, metric.DiskType),
@@ -699,12 +707,20 @@ func countTopologyNodes(at *topology.ActiveTopology) int {
return n
}
func planECDestinations(planner *ecPlacementPlanner, metric *types.VolumeHealthMetrics, ecConfig *Config, dataShards, parityShards int) (*topology.MultiDestinationPlan, error) {
if planner == nil || planner.activeTopology == nil {
return nil, fmt.Errorf("active topology not available for EC placement")
// planECDestinations places all shards of the volume via the shared ecbalancer
// policy and returns the per-disk destination plans plus, parallel to them, the
// shard ids ecbalancer.Place assigned to each disk (so createECTargets and the
// capacity reservations use the real assignment, not a round-robin guess).
//
// Encode is lenient (PlaceDurabilityFirst): it relaxes caps/anti-affinity/RP as
// needed rather than fail, and prefers the source disk type but spills if that
// type can't hold every shard. rp is the resolved replica placement (may be nil).
func planECDestinations(at *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config, rp *super_block.ReplicaPlacement, dataShards, parityShards int) (*topology.MultiDestinationPlan, [][]uint32, error) {
if at == nil {
return nil, nil, fmt.Errorf("active topology not available for EC placement")
}
if dataShards <= 0 || parityShards <= 0 {
return nil, fmt.Errorf("invalid EC ratio: dataShards=%d parityShards=%d", dataShards, parityShards)
return nil, nil, fmt.Errorf("invalid EC ratio: dataShards=%d parityShards=%d", dataShards, parityShards)
}
totalShards := dataShards + parityShards
// Survive losing one disk: each disk holds at most parityShards shards,
@@ -712,105 +728,103 @@ func planECDestinations(planner *ecPlacementPlanner, metric *types.VolumeHealthM
minTotalDisks := (totalShards + parityShards - 1) / parityShards
expectedShardSize := uint64(metric.Size) / uint64(dataShards)
// Get source node information from topology
var sourceRack, sourceDC string
// Extract rack and DC from topology info
topologyInfo := planner.activeTopology.GetTopologyInfo()
if topologyInfo != nil {
for _, dc := range topologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, dataNodeInfo := range rack.DataNodeInfos {
if dataNodeInfo.Id == metric.Server {
sourceDC = dc.Id
sourceRack = rack.Id
break
}
}
if sourceRack != "" {
break
}
}
if sourceDC != "" {
break
}
}
snap := ecbalancer.FromActiveTopology(at, dataShards)
// Encode is greenfield: any EC shards already present for this volume are stale
// leftovers from a prior failed attempt, which the task deletes
// (cleanupStaleEcShards) before distributing the new shards. Release them so they
// don't occupy capacity or skew anti-affinity / per-disk caps during planning.
snap.ReleaseVolumeShards(metric.Collection, metric.VolumeID)
need := make([]int, totalShards)
for i := range need {
need[i] = i
}
// Select best disks for EC placement with rack/DC diversity using the cached planner.
// Pass source disk type so placement prefers matching-type disks (#9423).
selectedDisks, err := planner.selectDestinations(sourceRack, sourceDC, metric.DiskType, totalShards)
res, err := snap.Place(metric.VolumeID, metric.Collection, need, ecbalancer.Constraints{
DiskType: metric.DiskType,
DiskTypePolicy: ecbalancer.DiskTypePrefer,
PreferredTags: ecConfig.PreferredTags,
ReplicaPlacement: rp,
Ratio: func(string) (int, int) { return dataShards, parityShards },
}, ecbalancer.PlaceDurabilityFirst)
if err != nil {
return nil, err
return nil, nil, err
}
if len(selectedDisks) < minTotalDisks {
return nil, fmt.Errorf("found %d disks, but EC %d+%d needs at least %d disks so no disk holds more than %d shards",
len(selectedDisks), dataShards, parityShards, minTotalDisks, parityShards)
if res.SpilledToOtherDiskType {
glog.Warningf("EC volume %d: placed shards outside preferred disk type %q", metric.VolumeID, metric.DiskType)
}
// Fewer than totalShards disks is fine: createECTargets round-robins the
// shards across the available disks, packing several distinct shards onto a
// disk when needed (matching ec.encode's "spread as 4,4,3,3" fallback for
// small clusters). A disk holding several shards of one volume is safe —
// each is a separate .ecNN file and ReceiveFile keys by that extension. The
// minTotalDisks floor above keeps any single disk under parityShards shards,
// so the volume still survives losing any one disk.
if len(selectedDisks) < totalShards {
glog.V(1).Infof("EC volume %d: only %d disks for %d shards, packing up to %d shards per disk",
metric.VolumeID, len(selectedDisks), totalShards, (totalShards+len(selectedDisks)-1)/len(selectedDisks))
if res.SpilledOutsidePreferredTags {
glog.Warningf("EC volume %d: placed shards outside preferred tags %v", metric.VolumeID, ecConfig.PreferredTags)
}
if len(res.Relaxed) > 0 {
// Encode is best-effort (PlaceDurabilityFirst): it relaxes these constraints
// rather than defer when the cluster can't satisfy them. Surface it so a tight
// replica placement isn't silently weakened; rebalancing tightens the spread.
glog.Warningf("EC volume %d: placed with relaxed constraints %v; replica placement not fully satisfied (rebalancing will adjust)", metric.VolumeID, res.Relaxed)
}
// Group the per-shard destinations into one plan per (node,disk), iterating
// shard ids in order for determinism.
type diskGroup struct {
node, rack, dc string
diskID uint32
shards []uint32
}
type diskKey struct {
node string
diskID uint32
}
groups := make(map[diskKey]*diskGroup, totalShards)
order := make([]diskKey, 0, totalShards)
for sid := 0; sid < totalShards; sid++ {
d, ok := res.Destinations[sid]
if !ok {
return nil, nil, fmt.Errorf("EC volume %d: shard %d was not placed", metric.VolumeID, sid)
}
key := diskKey{node: d.Node, diskID: d.DiskID}
g := groups[key]
if g == nil {
g = &diskGroup{node: d.Node, rack: d.Rack, dc: d.DataCenter, diskID: d.DiskID}
groups[key] = g
order = append(order, key)
}
g.shards = append(g.shards, uint32(sid))
}
if len(order) < minTotalDisks {
return nil, nil, fmt.Errorf("placed onto %d disks, but EC %d+%d needs at least %d so no disk holds more than %d shards",
len(order), dataShards, parityShards, minTotalDisks, parityShards)
}
var plans []*topology.DestinationPlan
shardsPerPlan := make([][]uint32, 0, len(order))
rackCount := make(map[string]int)
dcCount := make(map[string]int)
for _, disk := range selectedDisks {
// Get the target server address
targetAddress, err := workerutil.ResolveServerAddress(disk.NodeID, planner.activeTopology)
for _, key := range order {
g := groups[key]
targetAddress, err := workerutil.ResolveServerAddress(g.node, at)
if err != nil {
return nil, fmt.Errorf("failed to resolve address for target server %s: %v", disk.NodeID, err)
return nil, nil, fmt.Errorf("failed to resolve address for target server %s: %v", g.node, err)
}
plan := &topology.DestinationPlan{
TargetNode: disk.NodeID,
TargetAddress: targetAddress,
TargetDisk: disk.DiskID,
TargetRack: disk.Rack,
TargetDC: disk.DataCenter,
ExpectedSize: expectedShardSize, // Set calculated EC shard size
PlacementScore: calculateECScoreCandidate(disk, sourceRack, sourceDC),
}
plans = append(plans, plan)
// Count rack and DC diversity
rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
rackCount[rackKey]++
dcCount[disk.DataCenter]++
plans = append(plans, &topology.DestinationPlan{
TargetNode: g.node,
TargetAddress: targetAddress,
TargetDisk: g.diskID,
TargetRack: g.rack,
TargetDC: g.dc,
ExpectedSize: expectedShardSize,
})
shardsPerPlan = append(shardsPerPlan, g.shards)
rackCount[fmt.Sprintf("%s:%s", g.dc, g.rack)]++
dcCount[g.dc]++
}
// Log capacity utilization information using ActiveTopology's encapsulated logic
totalEffectiveCapacity := int64(0)
for _, plan := range plans {
key := ecDiskKey(plan.TargetNode, plan.TargetDisk)
if candidate, ok := planner.candidateByKey[key]; ok {
totalEffectiveCapacity += int64(candidate.FreeSlots)
}
}
glog.V(1).Infof("Planned EC destinations for volume %d (size=%d bytes): expected shard size=%d bytes, %d shards across %d racks, %d DCs, total effective capacity=%d slots",
metric.VolumeID, metric.Size, expectedShardSize, len(plans), len(rackCount), len(dcCount), totalEffectiveCapacity)
// Log storage impact for EC task (source only - EC has multiple targets handled individually)
sourceChange, _ := topology.CalculateTaskStorageImpact(topology.TaskTypeErasureCoding, int64(metric.Size))
glog.V(2).Infof("EC task capacity management: source_reserves_with_zero_impact={VolumeSlots:%d, ShardSlots:%d}, %d_targets_will_receive_shards, estimated_size=%d",
sourceChange.VolumeSlots, sourceChange.ShardSlots, len(plans), metric.Size)
glog.V(2).Infof("EC source reserves capacity but with zero StorageSlotChange impact")
glog.V(1).Infof("Planned EC destinations for volume %d (size=%d bytes): expected shard size=%d bytes, %d shards across %d disks, %d racks, %d DCs",
metric.VolumeID, metric.Size, expectedShardSize, totalShards, len(plans), len(rackCount), len(dcCount))
return &topology.MultiDestinationPlan{
Plans: plans,
TotalShards: len(plans),
TotalShards: totalShards,
SuccessfulRack: len(rackCount),
SuccessfulDCs: len(dcCount),
}, nil
}, shardsPerPlan, nil
}
// distributeECShards assigns shard ids 0..totalShards-1 across numTargets
@@ -830,41 +844,22 @@ func distributeECShards(totalShards, numTargets int) [][]uint32 {
return targetShards
}
// createECTargets builds TaskTargets, round-robining shards across the plan
// entries. With fewer disks than shards a target receives several shard ids.
func createECTargets(multiPlan *topology.MultiDestinationPlan, dataShards, parityShards int) []*worker_pb.TaskTarget {
var targets []*worker_pb.TaskTarget
numTargets := len(multiPlan.Plans)
totalShards := dataShards + parityShards
targetShards := distributeECShards(totalShards, numTargets)
// createECTargets builds TaskTargets from the per-disk plans and the shard ids
// ecbalancer.Place assigned to each (shardsPerPlan is parallel to multiPlan.Plans).
func createECTargets(multiPlan *topology.MultiDestinationPlan, shardsPerPlan [][]uint32) []*worker_pb.TaskTarget {
targets := make([]*worker_pb.TaskTarget, 0, len(multiPlan.Plans))
for i, plan := range multiPlan.Plans {
target := &worker_pb.TaskTarget{
shardIDs := shardsPerPlan[i]
targets = append(targets, &worker_pb.TaskTarget{
Node: plan.TargetAddress,
DiskId: plan.TargetDisk,
Rack: plan.TargetRack,
DataCenter: plan.TargetDC,
ShardIds: targetShards[i],
ShardIds: shardIDs,
EstimatedSize: plan.ExpectedSize,
}
targets = append(targets, target)
assignedData := make([]uint32, 0)
assignedParity := make([]uint32, 0)
for _, shardId := range targetShards[i] {
if int(shardId) < dataShards {
assignedData = append(assignedData, shardId)
} else {
assignedParity = append(assignedParity, shardId)
}
}
glog.V(2).Infof("EC planning: target %s assigned shards %v (data: %v, parity: %v)",
plan.TargetNode, targetShards[i], assignedData, assignedParity)
})
glog.V(2).Infof("EC planning: target %s disk %d assigned shards %v", plan.TargetNode, plan.TargetDisk, shardIDs)
}
glog.V(1).Infof("EC planning: distributed %d shards across %d targets using round-robin (data shards 0-%d, parity shards %d-%d)",
totalShards, numTargets, dataShards-1, dataShards, totalShards-1)
return targets
}
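The new createECTargets indexes shardsPerPlan by plan position, so the two slices have to stay parallel. A minimal standalone sketch of that pairing follows. The `plan` and `target` types are hypothetical stand-ins for the real plan and worker_pb.TaskTarget structs, and the explicit length check is an assumption added for illustration; the diff does not show one.

```go
package main

import (
	"errors"
	"fmt"
)

// plan and target are hypothetical stand-ins for the real plan entry and
// worker_pb.TaskTarget; only the fields needed for the illustration are kept.
type plan struct {
	Node string
	Disk uint32
}

type target struct {
	Node     string
	Disk     uint32
	ShardIds []uint32
}

// buildTargets pairs plans[i] with shardsPerPlan[i]. The length check is an
// assumption of this sketch, not something taken from the diff.
func buildTargets(plans []plan, shardsPerPlan [][]uint32) ([]target, error) {
	if len(plans) != len(shardsPerPlan) {
		return nil, errors.New("plans and shardsPerPlan must be parallel")
	}
	targets := make([]target, 0, len(plans))
	for i, p := range plans {
		targets = append(targets, target{Node: p.Node, Disk: p.Disk, ShardIds: shardsPerPlan[i]})
	}
	return targets, nil
}

func main() {
	plans := []plan{{"a:8080", 0}, {"b:8080", 1}}
	targets, err := buildTargets(plans, [][]uint32{{0, 2}, {1}})
	fmt.Println(len(targets), err)
}
```

Returning an error on a length mismatch, rather than indexing blindly, would turn a planner bug into a rejected task instead of a panic.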
@@ -19,9 +19,6 @@ func TestPlanECDestinationsPrefersSourceDiskType_FullCluster(t *testing.T) {
// for a 10+4 layout with one-shard-per-(server,disk) diversity.
activeTopology := buildActiveTopology(t, erasure_coding.TotalShardsCount, []string{"hdd", "ssd"}, 100, 0)
planner := newECPlacementPlanner(activeTopology, nil)
require.NotNil(t, planner)
metric := &types.VolumeHealthMetrics{
VolumeID: 1,
Server: "10.0.0.1:8080",
@@ -30,7 +27,7 @@ func TestPlanECDestinationsPrefersSourceDiskType_FullCluster(t *testing.T) {
DiskType: "ssd", // the property being plumbed end-to-end
}
plan, err := planECDestinations(planner, metric, NewDefaultConfig(), erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
plan, _, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
require.NoError(t, err)
require.Len(t, plan.Plans, erasure_coding.TotalShardsCount)
@@ -67,9 +64,6 @@ func TestPlanECDestinationsSpillsToOtherDiskType_WhenPreferredScarce(t *testing.
}
require.NoError(t, activeTopology.UpdateTopology(topo))
planner := newECPlacementPlanner(activeTopology, nil)
require.NotNil(t, planner)
metric := &types.VolumeHealthMetrics{
VolumeID: 2,
Server: "10.0.0.1:8080",
@@ -78,7 +72,7 @@ func TestPlanECDestinationsSpillsToOtherDiskType_WhenPreferredScarce(t *testing.
DiskType: "ssd",
}
plan, err := planECDestinations(planner, metric, NewDefaultConfig(), erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
plan, _, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
require.NoError(t, err)
require.Len(t, plan.Plans, erasure_coding.TotalShardsCount)
@@ -47,8 +47,6 @@ func TestECPlacementPlannerApplyReservations(t *testing.T) {
func TestPlanECDestinationsUsesPlanner(t *testing.T) {
activeTopology := buildActiveTopology(t, 7, []string{"hdd", "ssd"}, 100, 0)
planner := newECPlacementPlanner(activeTopology, nil)
require.NotNil(t, planner)
metric := &types.VolumeHealthMetrics{
VolumeID: 1,
@@ -57,10 +55,10 @@ func TestPlanECDestinationsUsesPlanner(t *testing.T) {
Collection: "",
}
plan, err := planECDestinations(planner, metric, NewDefaultConfig(), erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
plan, shardsPerPlan, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
require.NoError(t, err)
require.NotNil(t, plan)
assert.Equal(t, erasure_coding.TotalShardsCount, len(plan.Plans))
requireAllShardsPlaced(t, plan, shardsPerPlan)
}
func TestECPlacementPlannerPrefersTaggedDisks(t *testing.T) {
@@ -363,9 +361,6 @@ func TestPlanECDestinationsSpreadsAcrossPhysicalDisks(t *testing.T) {
}},
}))
planner := newECPlacementPlanner(activeTopology, nil)
require.NotNil(t, planner)
metric := &types.VolumeHealthMetrics{
VolumeID: 42,
Server: "127.0.0.1:8081",
@@ -373,23 +368,14 @@ func TestPlanECDestinationsSpreadsAcrossPhysicalDisks(t *testing.T) {
Collection: "",
}
plan, err := planECDestinations(planner, metric, NewDefaultConfig(), erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
plan, shardsPerPlan, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
require.NoError(t, err)
require.NotNil(t, plan)
require.Equal(t, erasure_coding.TotalShardsCount, len(plan.Plans))
seen := make(map[string]bool, len(plan.Plans))
for _, p := range plan.Plans {
key := fmt.Sprintf("%s:%d", p.TargetNode, p.TargetDisk)
assert.False(t, seen[key], "duplicate (server,disk_id) target %s", key)
seen[key] = true
}
requireAllShardsPlaced(t, plan, shardsPerPlan)
}
func TestPlanECDestinationsFailsWithInsufficientCapacity(t *testing.T) {
activeTopology := buildActiveTopology(t, 1, []string{"hdd"}, 1, 1)
planner := newECPlacementPlanner(activeTopology, nil)
require.NotNil(t, planner)
metric := &types.VolumeHealthMetrics{
VolumeID: 2,
@@ -398,7 +384,7 @@ func TestPlanECDestinationsFailsWithInsufficientCapacity(t *testing.T) {
Collection: "",
}
_, err := planECDestinations(planner, metric, NewDefaultConfig(), erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
_, _, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
require.Error(t, err)
}
@@ -440,9 +426,6 @@ func TestPlanECDestinationsPacksWhenFewerDisksThanShards(t *testing.T) {
DataCenterInfos: []*master_pb.DataCenterInfo{{Id: "dc1", RackInfos: rackInfos}},
}))
planner := newECPlacementPlanner(activeTopology, nil)
require.NotNil(t, planner)
metric := &types.VolumeHealthMetrics{
VolumeID: 4569,
Server: "192.168.1.145:8081",
@@ -450,16 +433,18 @@ func TestPlanECDestinationsPacksWhenFewerDisksThanShards(t *testing.T) {
Collection: "",
}
plan, err := planECDestinations(planner, metric, NewDefaultConfig(), erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
plan, shardsPerPlan, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
require.NoError(t, err)
require.NotNil(t, plan)
// One plan entry per available disk; fewer than the 14 shards.
require.Equal(t, numServers, len(plan.Plans))
// Packed onto the available disks: some disks hold more than one shard, the plan
// never uses more than the 8 disks, and it uses at least the durability floor of distinct disks.
require.LessOrEqual(t, len(plan.Plans), numServers)
require.GreaterOrEqual(t, len(plan.Plans), (erasure_coding.TotalShardsCount+erasure_coding.ParityShardsCount-1)/erasure_coding.ParityShardsCount)
// createECTargets must cover all 14 shards exactly once, packing onto the
// available disks without any disk exceeding parityShards shards.
targets := createECTargets(plan, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
require.Equal(t, numServers, len(targets))
targets := createECTargets(plan, shardsPerPlan)
require.Equal(t, len(plan.Plans), len(targets))
seenShards := make(map[uint32]bool)
for _, target := range targets {
@@ -473,6 +458,28 @@ func TestPlanECDestinationsPacksWhenFewerDisksThanShards(t *testing.T) {
require.Len(t, seenShards, erasure_coding.TotalShardsCount, "every shard must be placed exactly once")
}
// requireAllShardsPlaced asserts every EC shard landed exactly once, on a distinct
// (node,disk) target, with no disk holding more than parityShards shards (so losing
// any one disk cannot lose the volume). shardsPerPlan is parallel to plan.Plans.
func requireAllShardsPlaced(t *testing.T, plan *topology.MultiDestinationPlan, shardsPerPlan [][]uint32) {
t.Helper()
require.Equal(t, len(plan.Plans), len(shardsPerPlan), "one shard list per plan entry")
keys := make(map[string]bool, len(plan.Plans))
seen := make(map[uint32]bool)
for i, p := range plan.Plans {
key := fmt.Sprintf("%s:%d", p.TargetNode, p.TargetDisk)
require.False(t, keys[key], "duplicate (node,disk) target %s", key)
keys[key] = true
require.LessOrEqual(t, len(shardsPerPlan[i]), erasure_coding.ParityShardsCount,
"disk %s holds %d shards, over parityShards", key, len(shardsPerPlan[i]))
for _, s := range shardsPerPlan[i] {
require.False(t, seen[s], "shard %d placed more than once", s)
seen[s] = true
}
}
require.Len(t, seen, erasure_coding.TotalShardsCount, "every shard must be placed exactly once")
}
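The durability floor asserted in TestPlanECDestinationsPacksWhenFewerDisksThanShards follows from the per-disk cap checked in requireAllShardsPlaced: if no disk may hold more than parityShards shards, at least ceil(totalShards / parityShards) distinct disks are needed. A small sketch of that arithmetic, with `minDistinctDisks` as a hypothetical helper name that is not part of the PR:

```go
package main

import "fmt"

// minDistinctDisks returns the smallest number of disks that can hold
// totalShards shards when no disk may hold more than maxPerDisk of them,
// i.e. ceil(totalShards / maxPerDisk) computed in integer arithmetic.
func minDistinctDisks(totalShards, maxPerDisk int) int {
	return (totalShards + maxPerDisk - 1) / maxPerDisk
}

func main() {
	// 10 data + 4 parity shards, at most 4 shards per disk.
	fmt.Println(minDistinctDisks(14, 4)) // prints 4
}
```

For the 10+4 layout this gives 4, which matches the `(TotalShardsCount+ParityShardsCount-1)/ParityShardsCount` expression in the test.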
func buildVolumeMetricsForIDs(count int) []*types.VolumeHealthMetrics {
metrics := make([]*types.VolumeHealthMetrics, 0, count)
now := time.Now()
@@ -271,8 +271,17 @@ func (t *ErasureCodingTask) Validate(params *worker_pb.TaskParams) error {
return fmt.Errorf("invalid parity shards: %d (must be >= 1)", ecParams.ParityShards)
}
if len(params.Targets) < int(ecParams.DataShards+ecParams.ParityShards) {
return fmt.Errorf("insufficient targets: got %d, need %d", len(params.Targets), ecParams.DataShards+ecParams.ParityShards)
// Count distinct shard ids across targets, not target rows: Place packs several
// shards onto one (node,disk) target when there are fewer disks than shards, so
// a valid plan can have fewer target rows than total shards.
distinctShards := make(map[uint32]struct{})
for _, target := range params.Targets {
for _, sid := range target.ShardIds {
distinctShards[sid] = struct{}{}
}
}
if total := int(ecParams.DataShards + ecParams.ParityShards); len(distinctShards) < total {
return fmt.Errorf("insufficient shard targets: got %d distinct shards across %d targets, need %d", len(distinctShards), len(params.Targets), total)
}
return nil
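The Validate change above counts distinct shard ids instead of target rows. A standalone sketch of that counting, assuming the shard lists are passed as plain slices rather than worker_pb.TaskTarget values (`countDistinctShards` is a hypothetical name):

```go
package main

import "fmt"

// countDistinctShards counts unique shard ids across all targets. Each inner
// slice is the shard id list of one (node,disk) target.
func countDistinctShards(shardsPerTarget [][]uint32) int {
	distinct := make(map[uint32]struct{})
	for _, ids := range shardsPerTarget {
		for _, id := range ids {
			distinct[id] = struct{}{}
		}
	}
	return len(distinct)
}

func main() {
	// Six shards packed onto three targets: 3 rows, 6 distinct shard ids.
	packed := [][]uint32{{0, 3}, {1, 4}, {2, 5}}
	fmt.Println(len(packed), countDistinctShards(packed)) // prints 3 6
}
```

As in the diff, this only gives a lower bound on coverage: a shard id listed on two targets is counted once, and the ids are not checked against the valid range.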
@@ -138,6 +138,14 @@ func (h *ErasureCodingHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
},
{
Name: "replica_placement",
Label: "Replica Placement",
Description: "EC shard placement (e.g. 020): 2nd/3rd digits cap shards per rack/node (best-effort during encode, enforced by rebalancing); the data-center digit is ignored. Empty uses the master default.",
Placeholder: "020",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
},
},
},
},
@@ -154,6 +162,9 @@ func (h *ErasureCodingHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
"preferred_tags": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
},
"replica_placement": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
},
},
},
AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{
@@ -217,7 +228,11 @@ func (h *ErasureCodingHandler) Detect(
return err
}
clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology, GrpcDialOption: h.grpcDialOption}
clusterInfo := &workertypes.ClusterInfo{
ActiveTopology: activeTopology,
GrpcDialOption: h.grpcDialOption,
DefaultReplicaPlacement: pluginworker.FetchDefaultReplicaPlacement(ctx, masters, h.grpcDialOption),
}
maxResults := int(request.MaxResults)
if maxResults < 0 {
maxResults = 0
@@ -592,6 +607,8 @@ func deriveErasureCodingWorkerConfig(values map[string]*plugin_pb.ConfigValue) *
taskConfig.PreferredTags = util.NormalizeTagList(pluginworker.ReadStringListConfig(values, "preferred_tags"))
taskConfig.ReplicaPlacement = strings.TrimSpace(pluginworker.ReadStringConfig(values, "replica_placement", taskConfig.ReplicaPlacement))
return &erasureCodingWorkerConfig{
TaskConfig: taskConfig,
}
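For readers unfamiliar with the three-digit form used by the new replica_placement field, here is a hedged sketch of how a value such as "020" splits into its digits. It is not the super_block.ResolveReplicaPlacement implementation; `placementCaps` and `parsePlacement` are hypothetical names, and the strict three-digit requirement is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// placementCaps is a hypothetical holder for the three digits, in the order
// the field description gives them: data center, rack, node.
type placementCaps struct {
	DataCenter, Rack, Node int
}

// parsePlacement splits a three-digit string such as "020" into its digits.
// An empty string returns ok=false so the caller can fall back to a default.
func parsePlacement(s string) (caps placementCaps, ok bool, err error) {
	s = strings.TrimSpace(s)
	if s == "" {
		return caps, false, nil
	}
	if len(s) != 3 {
		return caps, false, fmt.Errorf("replica placement %q: want 3 digits", s)
	}
	var digits [3]int
	for i := 0; i < 3; i++ {
		c := s[i]
		if c < '0' || c > '9' {
			return caps, false, fmt.Errorf("replica placement %q: non-digit %q", s, c)
		}
		digits[i] = int(c - '0')
	}
	return placementCaps{digits[0], digits[1], digits[2]}, true, nil
}

func main() {
	caps, ok, err := parsePlacement("020")
	fmt.Println(caps, ok, err)
}
```

Treating the empty string as "not set" mirrors the field description, where an empty value falls back to the master default.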