mirror of https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
f0d2a0d417
* admin/topology: carry the volume server address on DiskInfo
  The planning DiskInfo exposed only the node id, which can be an opaque label rather than ip:port. Record the address too so callers can resolve the physical machine a disk sits on.
* ec.balance: spread a volume's shards across machines, not just nodes
  Volume servers sharing a host are one fault domain, but the within-rack spread treated them as independent nodes, so one box could end up holding more shards of a volume than EC can afford to lose. Add a machine (host) tier between rack and node: the within-rack pass spreads each volume across machines, and the global load phase no longer re-concentrates a volume onto a machine it already sits on. Host defaults to the node id, so clusters with one server per host are unchanged.
* ec placement: prefer machines holding fewer of a volume's shards
  EC allocation and repair picked the least-loaded node in a rack with no regard for which physical machine it sits on, so a volume's shards could pile onto several servers of one box. Rank candidate nodes by their machine's shard count first, then the node's own. The machine is derived from the volume server address carried on DiskInfo, falling back to the node id, matching how the balancer resolves it.
* volume.balance: don't move a replica onto a machine already holding one
  isGoodMove only rejected a move onto the same data node, so two replicas could land on two volume servers of one box and a single machine failure would lose both. Reject a target whose host already holds another replica of the volume. Best-effort: balancing simply skips and tries the next target.
* volume allocation: spread same-rack replicas across machines
  PickNodesByWeight filled the same-rack replica picks by weight alone, so replicas could co-locate on one box. Prefer candidates on not-yet-used hosts, falling back when too few distinct machines exist. Data-center and rack tiers have no host, so their ordering is unchanged.
* ec.balance: harden machine spread against re-concentration and capped machines
  Two cases where the machine-aware spread could still leave a volume badly placed:
  - The global load phase could move a shard of a volume onto a machine that already held it, raising that machine's count and undoing the within-rack spread (a 4/4/3/3 layout could become 3/5/3/3, past parity for 10+4). Limit the load-only fallback to same-machine moves, which leave a machine's count unchanged; cross-machine concentration is no longer allowed for load alone.
  - The within-rack spread chose a destination machine by free slots alone, so if that machine's only nodes were already at the SameRackCount cap it skipped the move instead of trying another machine. Require a machine to have a node that can actually take the shard before selecting it.
* reduce comments across the machine-affinity change
  Trim narration down to the non-obvious why; one terse line where a block was overkill.
* ec.balance: gate machine spread on fault-tolerance feasibility
  Spreading a volume evenly across machines only helps when there are enough that each can stay within EC's parity tolerance (numMachines >= ceil(total/parity)). With fewer -- or wildly unequal -- machines it can't make a machine loss survivable anyway, and forcing it fights capacity: e.g. a cluster of 12 volume servers on one host and 2 on another would have half of every volume crammed onto the 2-server box. So spread across machines only when it's achievable; otherwise fall back to per-node spread and let capacity/global balancing decide. The global load phase applies the same test: it protects a volume's machine spread (no cross-machine move that raises a machine's count past the source's) only where that spread is achievable, so heterogeneous clusters still level by fullness.
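The feasibility test described above (numMachines >= ceil(total/parity)) can be sketched in a few lines. This is only an illustration of the arithmetic, not the code in ecbalancer; the names `ceilDiv` and `machineSpreadFeasible` are hypothetical and do not appear in the source.

```go
package main

import "fmt"

// ceilDiv returns ceil(a/b) for non-negative a and positive b.
func ceilDiv(a, b int) int {
	return (a + b - 1) / b
}

// machineSpreadFeasible reports whether numMachines machines are enough for
// an even spread to keep every machine at or below `parity` shards of a
// volume that has `shards` shards in scope. Hypothetical helper.
func machineSpreadFeasible(shards, parity, numMachines int) bool {
	if parity <= 0 || numMachines <= 0 {
		return false
	}
	return numMachines >= ceilDiv(shards, parity)
}

func main() {
	// 10+4 volume: 14 shards, parity 4, so at least ceil(14/4) = 4 machines.
	fmt.Println(machineSpreadFeasible(14, 4, 4)) // true
	// Two hosts (e.g. 12 servers on one, 2 on the other) cannot satisfy it.
	fmt.Println(machineSpreadFeasible(14, 4, 2)) // false
}
```

When the check fails, the commit message says the balancer falls back to per-node spread rather than forcing an even split across too few machines.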
* ec.balance worker: group servers by host when planning
  The worker built its planner topology without recording each server's host, so automated ec.balance treated ports on one machine as independent nodes and could concentrate a volume's shards on one physical box. Set the host from the volume server address, matching the shell path.
* volume.balance worker: don't move a replica onto a machine holding one
  The worker compared only node ids, and the replica map dropped the server address, so it could move replicas onto different ports of one machine. Carry the host on ReplicaLocation (from the server address) and reject a target whose host already holds another replica of the volume. Best-effort, matching the shell.
* ec.balance: judge machine-spread feasibility by the rack's shards
  The within-rack and global feasibility checks compared the whole volume's shard count against a rack's machine count, so a rack holding only part of a volume after cross-rack spreading -- e.g. 7 of a 10+4 volume across 2 machines -- was wrongly judged infeasible and fell back to node spread, which could pile 6 shards onto one host, past parity. Gate on the rack's own shard count of the volume instead.
* ec.balance: spread a volume's shards across machines by combined count
  EC recovers from any loss within parity regardless of shard type, so what bounds a machine's exposure is its total shards of the volume, not data and parity separately. Spreading the two independently let each type's remainder land on the same machine -- ceil(d/M)+ceil(p/M) can exceed ceil(total/M), e.g. a 5/3 split where 4/4 was achievable, past parity. Balance the combined count in one pass; disk-level data/parity anti-affinity stays in pickBestDiskOnNode.
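The inequality in the last entry, ceil(d/M)+ceil(p/M) versus ceil(total/M), is easy to check numerically. The sketch below assumes a rack holding 5 data and 3 parity shards of one volume across M = 2 machines, which reproduces the "5/3 split where 4/4 was achievable" case; the function names are hypothetical and not taken from the planner.

```go
package main

import "fmt"

// ceilDiv returns ceil(a/b) for non-negative a and positive b.
func ceilDiv(a, b int) int {
	return (a + b - 1) / b
}

// perTypeWorstCase is the most shards one machine can end up with when data
// and parity shards are spread independently and both remainders land on the
// same machine. Hypothetical helper for illustration.
func perTypeWorstCase(data, parity, machines int) int {
	return ceilDiv(data, machines) + ceilDiv(parity, machines)
}

// combinedCap is the per-machine cap when the combined shard count is
// balanced in one pass. Hypothetical helper for illustration.
func combinedCap(data, parity, machines int) int {
	return ceilDiv(data+parity, machines)
}

func main() {
	data, parity, machines := 5, 3, 2
	fmt.Println(perTypeWorstCase(data, parity, machines)) // 5
	fmt.Println(combinedCap(data, parity, machines))      // 4
}
```

With a parity tolerance of 4, the independent spread can put 5 shards on one machine, while balancing the combined count keeps every machine at 4.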
* ec.balance: don't let the imbalance threshold skip an over-parity machine
  The within-rack spread gated on relative skew ((max-min)/avg > threshold), so a worker threshold of 0.5 skipped an exactly-50%-skewed layout like 5/4/3 for a 10+4 volume, leaving 5 shards -- past parity -- on one machine. The even cap (ceil(shards/groups)) is the real bound and the move loop already sheds only what exceeds it, so drop the threshold gate from the within-rack phase (machine and node): a balanced rack stays a no-op while any over-cap machine is always fixed.
* ec.balance: keep the imbalance threshold for the node fallback
  Dropping the threshold from the whole within-rack phase made the node fallback too eager: it runs only when machine fault tolerance is unachievable, so it is cosmetic load distribution that should defer to the global utilization phase. Without the gate it would, for a one-server-per-host 6/4 split at threshold 0.5, schedule a count move that worsens utilization balance. Restore the threshold there; machine spreading keeps bypassing it, since that bound is durability, not cosmetic skew.
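The two gates discussed above, relative skew ((max-min)/avg > threshold) and the even cap (ceil(shards/groups)), give different answers on the layouts mentioned. The following is a minimal sketch of that comparison under the stated formulas; `relativeSkew` and `overEvenCap` are hypothetical names and the real planner may compute these differently.

```go
package main

import "fmt"

// relativeSkew returns (max-min)/avg over the per-group shard counts,
// or 0 for an empty or all-zero layout. Hypothetical helper.
func relativeSkew(counts []int) float64 {
	if len(counts) == 0 {
		return 0
	}
	lo, hi, sum := counts[0], counts[0], 0
	for _, c := range counts {
		if c < lo {
			lo = c
		}
		if c > hi {
			hi = c
		}
		sum += c
	}
	if sum == 0 {
		return 0
	}
	avg := float64(sum) / float64(len(counts))
	return float64(hi-lo) / avg
}

// overEvenCap reports whether any group holds more than ceil(sum/len(counts)).
func overEvenCap(counts []int) bool {
	if len(counts) == 0 {
		return false
	}
	sum := 0
	for _, c := range counts {
		sum += c
	}
	limit := (sum + len(counts) - 1) / len(counts)
	for _, c := range counts {
		if c > limit {
			return true
		}
	}
	return false
}

func main() {
	const threshold = 0.5
	machines := []int{5, 4, 3} // skew is exactly 0.5, cap is 4
	fmt.Println(relativeSkew(machines) > threshold, overEvenCap(machines)) // false true
	nodes := []int{6, 4} // skew is 0.4, cap is 5
	fmt.Println(relativeSkew(nodes) > threshold, overEvenCap(nodes)) // false true
}
```

For the 5/4/3 machine layout the skew gate stays silent while the cap flags the 5-shard machine, which is why machine spreading bypasses the threshold. For the 6/4 node fallback the same skew gate is what suppresses the cosmetic move.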
960 lines
31 KiB
Go
package shell

import (
	"context"
	"fmt"
	"regexp"
	"slices"
	"sort"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/ecbalancer"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"google.golang.org/grpc"
)

type DataCenterId string
type EcNodeId string
type RackId string

// EcDisk represents a single disk on a volume server
type EcDisk struct {
	diskId       uint32
	diskType     string
	freeEcSlots  int
	ecShardCount int // Total EC shards on this disk
	// Map of volumeId -> ShardsInfo for shards on this disk
	ecShards map[needle.VolumeId]*erasure_coding.ShardsInfo
}

type EcNode struct {
	info       *master_pb.DataNodeInfo
	dc         DataCenterId
	rack       RackId
	freeEcSlot int
	// disks maps diskId -> EcDisk for disk-level balancing
	disks map[uint32]*EcDisk
}

type CandidateEcNode struct {
	ecNode     *EcNode
	shardCount int
}

type EcRack struct {
	ecNodes    map[EcNodeId]*EcNode
	freeEcSlot int
}

var (
	ecBalanceAlgorithmDescription = `
	func EcBalance() {
		for each collection:
			balanceEcVolumes(collectionName)
		for each rack:
			balanceEcRack(rack)
	}

	func balanceEcVolumes(collectionName){
		for each volume:
			doDeduplicateEcShards(volumeId)

		tracks rack~shardCount mapping
		for each volume:
			doBalanceEcShardsAcrossRacks(volumeId)

		for each volume:
			doBalanceEcShardsWithinRacks(volumeId)
	}

	// spread ec shards into more racks
	func doBalanceEcShardsAcrossRacks(volumeId){
		tracks rack~volumeIdShardCount mapping
		averageShardsPerEcRack = totalShardNumber / numRacks  // totalShardNumber is 14 for now, and could later vary for each dc
		ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
		for each ecShardsToMove {
			destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, ecShardReplicaPlacement)
			destVolumeServers = volume servers on the destRack
			pickOneEcNodeAndMoveOneShard(destVolumeServers)
		}
	}

	func doBalanceEcShardsWithinRacks(volumeId){
		racks = collect all racks that the volume id is on
		for rack, shards := range racks
			doBalanceEcShardsWithinOneRack(volumeId, shards, rack)
	}

	// move ec shards
	func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){
		tracks volumeServer~volumeIdShardCount mapping
		averageShardCount = len(shards) / numVolumeServers
		volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardCount
		ecShardsToMove = select overflown ec shards from volumeServersOverAverage
		for each ecShardsToMove {
			destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, ecShardReplicaPlacement)
			pickOneEcNodeAndMoveOneShard(destVolumeServer)
		}
	}

	// move ec shards while keeping shard distribution for the same volume unchanged or more even
	func balanceEcRack(rack){
		averageShardCount = total shards / numVolumeServers
		for hasMovedOneEcShard {
			sort all volume servers ordered by the number of local ec shards
			pick the volume server A with the lowest number of ec shards x
			pick the volume server B with the highest number of ec shards y
			if y > averageShardCount and x +1 <= averageShardCount {
				if B has an ec shard with volume id v that A does not have {
					move one ec shard v from B to A
					hasMovedOneEcShard = true
				}
			}
		}
	}
	`
	// Overridable functions for testing.
	getDefaultReplicaPlacement = _getDefaultReplicaPlacement
)

func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
	var resp *master_pb.GetMasterConfigurationResponse
	var err error

	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		resp, err = client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
		return err
	})
	if err != nil {
		return nil, err
	}

	return super_block.NewReplicaPlacementFromString(resp.DefaultReplication)
}

func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super_block.ReplicaPlacement, error) {
	var rp *super_block.ReplicaPlacement
	var err error

	if replicaStr != "" {
		rp, err = super_block.NewReplicaPlacementFromString(replicaStr)
		if err != nil {
			return rp, err
		}
		glog.V(1).Infof("using replica placement %q for EC volumes\n", rp.String())
	} else {
		// No replica placement argument provided, resolve from master default settings.
		rp, err = getDefaultReplicaPlacement(commandEnv)
		if err != nil {
			return rp, err
		}
		glog.V(1).Infof("using master default replica placement %q for EC volumes\n", rp.String())
	}

	return rp, nil
}

func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
	if delayBeforeCollecting > 0 {
		time.Sleep(delayBeforeCollecting)
	}

	var resp *master_pb.VolumeListResponse
	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		return err
	})
	if err != nil {
		return
	}

	return resp.TopologyInfo, resp.VolumeSizeLimitMb, nil
}

func collectDataNodes(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) ([]*master_pb.DataNodeInfo, error) {
	dataNodes := []*master_pb.DataNodeInfo{}

	topo, _, err := collectTopologyInfo(commandEnv, delayBeforeCollecting)
	if err != nil {
		return nil, err
	}

	for _, dci := range topo.GetDataCenterInfos() {
		for _, r := range dci.GetRackInfos() {
			for _, dn := range r.GetDataNodeInfos() {
				dataNodes = append(dataNodes, dn)
			}
		}
	}

	return dataNodes, nil
}

func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
	// list all possible locations
	// collect topology information
	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
	if err != nil {
		return
	}

	// find out all volume servers with one slot left.
	ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter, diskType)

	sortEcNodesByFreeslotsDescending(ecNodes)

	return
}

func collectEcNodes(commandEnv *CommandEnv, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
	return collectEcNodesForDC(commandEnv, "", diskType)
}

// collectVolumeIdToCollection returns a map from volume ID to its collection name
func collectVolumeIdToCollection(t *master_pb.TopologyInfo, vids []needle.VolumeId) map[needle.VolumeId]string {
	result := make(map[needle.VolumeId]string)
	if len(vids) == 0 {
		return result
	}

	vidSet := make(map[needle.VolumeId]bool)
	for _, vid := range vids {
		vidSet[vid] = true
	}

	for _, dc := range t.DataCenterInfos {
		for _, r := range dc.RackInfos {
			for _, dn := range r.DataNodeInfos {
				for _, diskInfo := range dn.DiskInfos {
					for _, vi := range diskInfo.VolumeInfos {
						vid := needle.VolumeId(vi.Id)
						if vidSet[vid] {
							result[vid] = vi.Collection
						}
					}
				}
			}
		}
	}
	return result
}

// collectCollectionsForVolumeIds returns the sorted, de-duplicated collection
// names of the given volume IDs, looking at both normal volumes and EC shards.
func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId) []string {
	if len(vids) == 0 {
		return nil
	}

	found := map[string]bool{}
	for _, dc := range t.DataCenterInfos {
		for _, r := range dc.RackInfos {
			for _, dn := range r.DataNodeInfos {
				for _, diskInfo := range dn.DiskInfos {
					for _, vi := range diskInfo.VolumeInfos {
						for _, vid := range vids {
							if needle.VolumeId(vi.Id) == vid {
								found[vi.Collection] = true
							}
						}
					}
					for _, ecs := range diskInfo.EcShardInfos {
						for _, vid := range vids {
							if needle.VolumeId(ecs.Id) == vid {
								found[ecs.Collection] = true
							}
						}
					}
				}
			}
		}
	}
	if len(found) == 0 {
		return nil
	}

	collections := []string{}
	for k := range found {
		collections = append(collections, k)
	}
	sort.Strings(collections)
	return collections
}

func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool, diskType types.DiskType) (err error) {

	if !commandEnv.isLocked() {
		return fmt.Errorf("lock is lost")
	}

	copiedShardIds := []erasure_coding.ShardId{shardId}

	if applyBalancing {

		existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)

		// ask destination node to copy shard and the ecx file from source node, and mount it
		copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []erasure_coding.ShardId{shardId}, vid, collection, existingServerAddress, destDiskId)
		if err != nil {
			return err
		}

		// unmount the to be deleted shards
		err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingServerAddress, copiedShardIds)
		if err != nil {
			return err
		}

		// ask source node to delete the shard, and maybe the ecx file
		err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingServerAddress, copiedShardIds)
		if err != nil {
			return err
		}

		if destDiskId > 0 {
			fmt.Printf("moved ec shard %d.%d %s => %s (disk %d)\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id, destDiskId)
		} else {
			fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
		}

	}

	destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds, diskType)
	existingLocation.deleteEcVolumeShards(vid, copiedShardIds, diskType)

	return nil

}

// oneServerCopyAndMountEcShardsFromSource asks targetServer to copy the given
// shards from existingLocation (skipped when both are the same server) and
// mount them. copiedShardIds stays empty when no copy was needed.
func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
	targetServer *EcNode, shardIdsToCopy []erasure_coding.ShardId,
	volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress, destDiskId uint32) (copiedShardIds []erasure_coding.ShardId, err error) {

	fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)

	targetAddress := pb.NewServerAddressFromDataNode(targetServer.info)
	err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {

		if targetAddress != existingLocation {
			fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
			_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
				VolumeId:       uint32(volumeId),
				Collection:     collection,
				ShardIds:       erasure_coding.ShardIdsToUint32(shardIdsToCopy),
				CopyEcxFile:    true,
				CopyEcjFile:    true,
				CopyVifFile:    true,
				CopyEcsumFile:  true, // propagate the bitrot sidecar with the shards (no-op if the source has none)
				SourceDataNode: string(existingLocation),
				DiskId:         destDiskId,
			})
			if copyErr != nil {
				return fmt.Errorf("copy %d.%v %s => %s : %w", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
			}
		}

		fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
		_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   erasure_coding.ShardIdsToUint32(shardIdsToCopy),
		})
		if mountErr != nil {
			return fmt.Errorf("mount %d.%v on %s : %w", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
		}

		if targetAddress != existingLocation {
			copiedShardIds = shardIdsToCopy
			glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
		}

		return nil
	})

	return
}

func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo)) {
	for _, dc := range topo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, dn := range rack.DataNodeInfos {
				fn(DataCenterId(dc.Id), RackId(rack.Id), dn)
			}
		}
	}
}

func sortEcNodesByFreeslotsDescending(ecNodes []*EcNode) {
	slices.SortFunc(ecNodes, func(a, b *EcNode) int {
		return b.freeEcSlot - a.freeEcSlot
	})
}

func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
	slices.SortFunc(ecNodes, func(a, b *EcNode) int {
		return a.freeEcSlot - b.freeEcSlot
	})
}

func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
	for _, eci := range ecShardInfos {
		count += erasure_coding.GetShardCount(eci)
	}
	return
}

func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (count int) {
	if dn.DiskInfos == nil {
		return 0
	}
	diskInfo := dn.DiskInfos[string(diskType)]
	if diskInfo == nil {
		return 0
	}

	slots := int(diskInfo.MaxVolumeCount-diskInfo.VolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
	if slots < 0 {
		return 0
	}

	return slots
}

func (ecNode *EcNode) localShardIdCount(vid uint32) int {
	for _, diskInfo := range ecNode.info.DiskInfos {
		for _, eci := range diskInfo.EcShardInfos {
			if vid == eci.Id {
				return erasure_coding.GetShardCount(eci)
			}
		}
	}
	return 0
}

func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int) {
	eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
		if selectedDataCenter != "" && selectedDataCenter != string(dc) {
			return
		}

		freeEcSlots := countFreeShardSlots(dn, diskType)
		ecNode := &EcNode{
			info:       dn,
			dc:         dc,
			rack:       rack,
			freeEcSlot: int(freeEcSlots),
			disks:      make(map[uint32]*EcDisk),
		}

		// Build disk-level information from volumes and EC shards
		// First, discover all unique disk IDs from VolumeInfos (includes empty disks)
		allDiskIds := make(map[uint32]string) // diskId -> diskType
		for diskTypeKey, diskInfo := range dn.DiskInfos {
			if diskInfo == nil {
				continue
			}
			// Get all disk IDs from volumes
			for _, vi := range diskInfo.VolumeInfos {
				allDiskIds[vi.DiskId] = diskTypeKey
			}
			// Also get disk IDs from EC shards
			for _, ecShardInfo := range diskInfo.EcShardInfos {
				allDiskIds[ecShardInfo.DiskId] = diskTypeKey
			}
		}

		// Group EC shards by disk_id
		diskShards := make(map[uint32]map[needle.VolumeId]*erasure_coding.ShardsInfo)
		for _, diskInfo := range dn.DiskInfos {
			if diskInfo == nil {
				continue
			}
			for _, eci := range diskInfo.EcShardInfos {
				diskId := eci.DiskId
				if diskShards[diskId] == nil {
					diskShards[diskId] = make(map[needle.VolumeId]*erasure_coding.ShardsInfo)
				}
				vid := needle.VolumeId(eci.Id)
				diskShards[diskId][vid] = erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(eci)
			}
		}

		// Create EcDisk for each discovered disk
		diskCount := len(allDiskIds)
		if diskCount == 0 {
			diskCount = 1
		}
		freePerDisk := int(freeEcSlots) / diskCount

		for diskId, diskTypeStr := range allDiskIds {
			shards := diskShards[diskId]
			if shards == nil {
				shards = make(map[needle.VolumeId]*erasure_coding.ShardsInfo)
			}
			totalShardCount := 0
			for _, shardsInfo := range shards {
				totalShardCount += shardsInfo.Count()
			}

			ecNode.disks[diskId] = &EcDisk{
				diskId:       diskId,
				diskType:     diskTypeStr,
				freeEcSlots:  freePerDisk,
				ecShardCount: totalShardCount,
				ecShards:     shards,
			}
		}

		ecNodes = append(ecNodes, ecNode)
		totalFreeEcSlots += freeEcSlots
	})
	return
}

func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []erasure_coding.ShardId) error {

	fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   erasure_coding.ShardIdsToUint32(toBeDeletedShardIds),
		})
		return deleteErr
	})

}

func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedShardIds []erasure_coding.ShardId) error {

	fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, unmountErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
			VolumeId: uint32(volumeId),
			ShardIds: erasure_coding.ShardIdsToUint32(toBeUnmountedShardIds),
		})
		return unmountErr
	})
}

func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedShardIds []erasure_coding.ShardId) error {

	fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   erasure_coding.ShardIdsToUint32(toBeMountedShardIds),
		})
		return mountErr
	})
}

func ceilDivide(a, b int) int {
	var r int
	if (a % b) != 0 {
		r = 1
	}
	return (a / b) + r
}

func findEcVolumeShardsInfo(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType) *erasure_coding.ShardsInfo {
	if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if needle.VolumeId(shardInfo.Id) == vid {
				return erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(shardInfo)
			}
		}
	}

	// Returns an empty ShardsInfo struct on failure, to avoid potential nil dereferences.
	return erasure_coding.NewShardsInfo()
}

// TODO: simplify me
func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []erasure_coding.ShardId, diskType types.DiskType) *EcNode {

	foundVolume := false
	diskInfo, found := ecNode.info.DiskInfos[string(diskType)]
	if found {
		for _, ecsi := range diskInfo.EcShardInfos {
			if needle.VolumeId(ecsi.Id) == vid {
				si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(ecsi)
				oldShardCount := si.Count()
				for _, shardId := range shardIds {
					si.Set(erasure_coding.NewShardInfo(shardId, 0))
				}
				ecsi.EcIndexBits = si.Bitmap()
				ecsi.ShardSizes = si.SizesInt64()
				ecNode.freeEcSlot -= si.Count() - oldShardCount
				foundVolume = true
				break
			}
		}
	} else {
		diskInfo = &master_pb.DiskInfo{
			Type: string(diskType),
		}
		ecNode.info.DiskInfos[string(diskType)] = diskInfo
	}

	if !foundVolume {
		si := erasure_coding.NewShardsInfo()
		for _, id := range shardIds {
			si.Set(erasure_coding.NewShardInfo(id, 0))
		}
		diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
			Id:          uint32(vid),
			Collection:  collection,
			EcIndexBits: si.Bitmap(),
			ShardSizes:  si.SizesInt64(),
			DiskType:    string(diskType),
		})
		ecNode.freeEcSlot -= si.Count()
	}

	return ecNode
}

func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []erasure_coding.ShardId, diskType types.DiskType) *EcNode {

	if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found {
		for _, eci := range diskInfo.EcShardInfos {
			if needle.VolumeId(eci.Id) == vid {
				si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(eci)
				oldCount := si.Count()
				for _, shardId := range shardIds {
					si.Delete(shardId)
				}
				eci.EcIndexBits = si.Bitmap()
				eci.ShardSizes = si.SizesInt64()
				ecNode.freeEcSlot -= si.Count() - oldCount
			}
		}
	}

	return ecNode
}

// pickBestDiskOnNode selects the best disk on a node for placing a new EC shard.
// It prefers disks of the specified type with fewer shards and more free slots.
// When shardId is provided and dataShardCount > 0, it applies anti-affinity:
//   - For data shards (shardId < dataShardCount): prefer disks without parity shards
//   - For parity shards (shardId >= dataShardCount): prefer disks without data shards
//
// If strictDiskType is false, it will fall back to other disk types if no matching disk is found.
func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType, strictDiskType bool, shardId erasure_coding.ShardId, dataShardCount int) uint32 {
	if len(ecNode.disks) == 0 {
		return 0 // No disk info available, let the server decide
	}

	var bestDiskId uint32
	bestScore := -1
	var fallbackDiskId uint32
	fallbackScore := -1

	// Determine if we're placing a data or parity shard
	isDataShard := dataShardCount > 0 && int(shardId) < dataShardCount

	for diskId, disk := range ecNode.disks {
		if disk.freeEcSlots <= 0 {
			continue
		}

		// Check existing shards on this disk for this volume
		existingShards := 0
		hasDataShards := false
		hasParityShards := false
		if si, ok := disk.ecShards[vid]; ok {
			existingShards = si.Count()
			// Check what type of shards are on this disk
			if dataShardCount > 0 {
				for _, existingShardId := range si.Ids() {
					if int(existingShardId) < dataShardCount {
						hasDataShards = true
					} else {
						hasParityShards = true
					}
				}
			}
		}

		// Score: prefer disks with fewer total shards and fewer shards of this volume
		// Lower score is better
		score := disk.ecShardCount*10 + existingShards*100

		// Apply anti-affinity penalty if applicable
		if dataShardCount > 0 {
			if isDataShard && hasParityShards {
				// Penalize placing data shard on disk with parity shards
				score += 1000
			} else if !isDataShard && hasDataShards {
				// Penalize placing parity shard on disk with data shards
				score += 1000
			}
		}

		if disk.diskType == string(diskType) {
			// Matching disk type - this is preferred
			if bestScore == -1 || score < bestScore {
				bestScore = score
				bestDiskId = diskId
			}
		} else if !strictDiskType {
			// Non-matching disk type - use as fallback if allowed
			if fallbackScore == -1 || score < fallbackScore {
				fallbackScore = score
				fallbackDiskId = diskId
			}
		}
	}

	// Return the matching-type disk if one was scored, otherwise the fallback.
	// Test bestScore rather than bestDiskId, since disk ID 0 can be a valid match.
	if bestScore != -1 {
		return bestDiskId
	}
	return fallbackDiskId
}

// ecBalancer drives an EC balance run: it collects the cluster's EC nodes, hands
// them to the shared ecbalancer planner, and executes the planned shard moves.
// The balancing policy lives in weed/storage/erasure_coding/ecbalancer, shared
// with the EC balance worker so the two cannot drift.
type ecBalancer struct {
	commandEnv         *CommandEnv
	ecNodes            []*EcNode
	replicaPlacement   *super_block.ReplicaPlacement
	applyBalancing     bool
	maxParallelization int
	diskType           types.DiskType
}

func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, diskType types.DiskType, maxParallelization int, applyBalancing bool) (err error) {
|
|
// collect all ec nodes
|
|
allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc, diskType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if totalFreeEcSlots < 1 {
|
|
return fmt.Errorf("no free ec shard slots. only %d left", totalFreeEcSlots)
|
|
}
|
|
|
|
ecb := &ecBalancer{
|
|
commandEnv: commandEnv,
|
|
ecNodes: allEcNodes,
|
|
replicaPlacement: ecReplicaPlacement,
|
|
applyBalancing: applyBalancing,
|
|
maxParallelization: maxParallelization,
|
|
diskType: diskType,
|
|
}
|
|
|
|
if len(collections) == 0 {
|
|
glog.V(1).Infof("WARNING: No collections to balance EC volumes across.\n")
|
|
}
|
|
return ecb.balance(collections)
|
|
}

// shellECRatio resolves a collection's EC data/parity counts, defaulting to the
// standard scheme. This is the shell's plug-in point for custom ratios.
func shellECRatio(_ string) (int, int) {
	// Custom EC ratios are an enterprise feature; OSS uses the standard scheme.
	return erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount
}

// balance plans EC shard moves with the shared planner and executes them. When
// collections is empty all collections present are balanced.
func (ecb *ecBalancer) balance(collections []string) error {
	topo := toBalancerTopology(ecb.ecNodes, collections, ecb.diskType)
	moves := ecbalancer.Plan(topo, ecbalancer.Options{
		DiskType:           string(ecb.diskType),
		ImbalanceThreshold: 0, // the shell balances to an even distribution
		ReplicaPlacement:   ecb.replicaPlacement,
		Ratio:              shellECRatio,
		// Balance the global phase by fractional fullness so heterogeneous-capacity
		// nodes fill proportionally (matching the worker). This is identical to raw
		// shard count when capacities are uniform.
		GlobalUtilizationBased: true,
	})
	return ecb.executeMoves(moves)
}

// toBalancerTopology builds an ecbalancer.Topology from the shell's EcNode model,
// including the shards of the requested collections (all collections when empty).
func toBalancerTopology(ecNodes []*EcNode, collections []string, diskType types.DiskType) *ecbalancer.Topology {
	allowed := make(map[string]bool, len(collections))
	for _, c := range collections {
		allowed[c] = true
	}

	topo := ecbalancer.NewTopology()
	for _, en := range ecNodes {
		rackKey := string(en.dc) + ":" + string(en.rack)
		node := topo.AddNode(en.info.Id, string(en.dc), rackKey, en.freeEcSlot)
		// Group by physical machine (host) so shards spread across machines, not just
		// nodes; the id stays the node identity used for moves.
		node.SetHost(pb.NewServerAddressFromDataNode(en.info).ToHost())
		for diskId, d := range en.disks {
			node.AddDisk(diskId, d.diskType, d.freeEcSlots, d.ecShardCount)
		}
		diskInfo, found := en.info.DiskInfos[string(diskType)]
		if !found {
			continue
		}
		for _, eci := range diskInfo.EcShardInfos {
			if len(allowed) > 0 && !allowed[eci.Collection] {
				continue
			}
			node.AddShards(eci.Id, eci.Collection, eci.DiskId, erasure_coding.ShardBits(eci.EcIndexBits))
		}
	}
	return topo
}

// executeMoves carries out the planned moves. Phases run in order (a within-rack
// move can depend on a cross-rack move's result), and the independent moves
// within a phase run with up to maxParallelization concurrency. Apply mode does
// only the RPCs; dry-run mode runs sequentially and mutates the in-memory EcNode
// model so callers/tests can inspect the planned end state.
func (ecb *ecBalancer) executeMoves(moves []ecbalancer.Move) error {
	byID := make(map[string]*EcNode, len(ecb.ecNodes))
	for _, en := range ecb.ecNodes {
		byID[en.info.Id] = en
	}

	// Plan emits moves grouped by phase; run each contiguous same-phase group
	// together, waiting before the next so cross-phase dependencies hold.
	for i := 0; i < len(moves); {
		j := i
		for j < len(moves) && moves[j].Phase == moves[i].Phase {
			j++
		}
		if err := ecb.executePhase(byID, moves[i:j]); err != nil {
			return err
		}
		i = j
	}
	return nil
}

// executePhase runs the moves of a single phase: sequentially in dry-run mode,
// and in apply mode concurrently across volumes while keeping each volume's
// moves in their planned order.
func (ecb *ecBalancer) executePhase(byID map[string]*EcNode, moves []ecbalancer.Move) error {
	if !ecb.applyBalancing {
		// Dry-run: sequential so the in-memory model updates are race-free and
		// reflect the full plan for inspection.
		for _, m := range moves {
			if err := ecb.executeMove(byID, m); err != nil {
				return err
			}
		}
		return nil
	}
	// Apply mode: parallelize across volumes, but run one volume's moves within a
	// phase sequentially. Concurrent moves of the same volume to a node can race
	// on its shared .ecx/.ecj/.vif sidecar files.
	var order []uint32
	byVol := make(map[uint32][]ecbalancer.Move)
	for _, m := range moves {
		if _, ok := byVol[m.VolumeID]; !ok {
			order = append(order, m.VolumeID)
		}
		byVol[m.VolumeID] = append(byVol[m.VolumeID], m)
	}
	ewg := NewErrorWaitGroup(ecb.maxParallelization)
	for _, vid := range order {
		group := byVol[vid]
		ewg.Add(func() error {
			for _, m := range group {
				if err := ecb.executeMove(byID, m); err != nil {
					return err
				}
			}
			return nil
		})
	}
	return ewg.Wait()
}

// executeMove performs one planned move. A move whose source or target node is
// missing from byID is skipped without error. A "dedup" move only deletes the
// shard on the source node; any other phase relocates it to the target node.
func (ecb *ecBalancer) executeMove(byID map[string]*EcNode, m ecbalancer.Move) error {
	src := byID[m.SourceNode]
	if src == nil {
		return nil
	}
	vid := needle.VolumeId(m.VolumeID)
	shardId := erasure_coding.ShardId(m.ShardID)
	shardIds := []erasure_coding.ShardId{shardId}

	if m.Phase == "dedup" {
		fmt.Printf("dedup: delete ec shard %d.%d on %s\n", vid, shardId, m.SourceNode)
		if !ecb.applyBalancing {
			src.deleteEcVolumeShards(vid, shardIds, ecb.diskType)
			return nil
		}
		grpcDialOption := ecb.commandEnv.option.GrpcDialOption
		addr := pb.NewServerAddressFromDataNode(src.info)
		if err := unmountEcShards(grpcDialOption, vid, addr, shardIds); err != nil {
			return err
		}
		return sourceServerDeleteEcShards(grpcDialOption, m.Collection, vid, addr, shardIds)
	}

	dst := byID[m.TargetNode]
	if dst == nil {
		return nil
	}
	if m.TargetDisk > 0 {
		fmt.Printf("%s moves ec shard %d.%d to %s (disk %d)\n", m.SourceNode, vid, shardId, m.TargetNode, m.TargetDisk)
	} else {
		fmt.Printf("%s moves ec shard %d.%d to %s\n", m.SourceNode, vid, shardId, m.TargetNode)
	}
	if !ecb.applyBalancing {
		// Dry-run: update the in-memory model only.
		return moveMountedShardToEcNode(ecb.commandEnv, src, m.Collection, vid, shardId, dst, m.TargetDisk, false, ecb.diskType)
	}
	return ecb.applyShardMoveRPC(src, dst, m.Collection, vid, shardId, m.TargetDisk)
}

// applyShardMoveRPC copies a shard to the destination disk, then unmounts and
// deletes it on the source. It does not touch the in-memory model, so it is safe
// to run concurrently across the moves of a phase.
func (ecb *ecBalancer) applyShardMoveRPC(src, dst *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destDiskId uint32) error {
	grpcDialOption := ecb.commandEnv.option.GrpcDialOption
	srcAddr := pb.NewServerAddressFromDataNode(src.info)
	copiedShardIds, err := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, dst, []erasure_coding.ShardId{shardId}, vid, collection, srcAddr, destDiskId)
	if err != nil {
		return err
	}
	if len(copiedShardIds) == 0 {
		return nil
	}
	if err := unmountEcShards(grpcDialOption, vid, srcAddr, copiedShardIds); err != nil {
		return err
	}
	return sourceServerDeleteEcShards(grpcDialOption, collection, vid, srcAddr, copiedShardIds)
}

// compileCollectionPattern compiles a regex pattern for collection matching.
// Empty patterns match empty collections only.
// The special keyword CollectionDefault ("_default") matches empty collections.
func compileCollectionPattern(pattern string) (*regexp.Regexp, error) {
	if pattern == "" {
		// empty pattern matches empty collection
		return regexp.Compile("^$")
	}
	if pattern == CollectionDefault {
		// CollectionDefault keyword matches empty collection
		return regexp.Compile("^$")
	}
	return regexp.Compile(pattern)
}
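
// Illustration only, not part of this file: a self-contained sketch of how a
// pattern compiled under the rules above behaves. compilePattern is a local
// copy for the example, and the assumption that callers test collection names
// with MatchString is ours; the collection names are made up.

```go
package main

import (
	"fmt"
	"regexp"
)

// collectionDefault mirrors the "_default" keyword named in the comment above.
const collectionDefault = "_default"

// compilePattern follows the same rules as compileCollectionPattern: the empty
// pattern and the default keyword both match only the empty collection name.
func compilePattern(pattern string) (*regexp.Regexp, error) {
	if pattern == "" || pattern == collectionDefault {
		return regexp.Compile("^$")
	}
	return regexp.Compile(pattern)
}

func main() {
	for _, p := range []string{"", "_default", "^logs-.*$", "logs", "("} {
		re, err := compilePattern(p)
		if err != nil {
			fmt.Printf("%q: invalid pattern: %v\n", p, err)
			continue
		}
		fmt.Printf("%q: empty=%v logs-2024=%v mylogs=%v\n",
			p, re.MatchString(""), re.MatchString("logs-2024"), re.MatchString("mylogs"))
	}
}
```

// Go regular expressions are unanchored unless the pattern says otherwise, so
// "logs" also matches "mylogs"; use "^logs$" to select exactly one collection.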