mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
fix(shell): volume.balance no longer drains all volumes onto one server (#9579)
* fix(shell): volume.balance no longer drains all volumes onto one server The density-based capacity function reads per-disk VolumeInfos sizes, but adjustAfterMove only updated VolumeCount and the selectedVolumes map. The planner re-read a stale topology after every move, so the source node's density never dropped and it kept moving volumes until that node was empty. Move the volume's size accounting between disks after each planned move so the density recomputes and the loop converges to an even distribution. * refactor(shell): O(1) volume removal and direct disk lookup in adjustAfterMove removeVolumeInfo swaps with the last element instead of shifting, and the disk is fetched by key rather than ranging the DiskInfos map.
This commit is contained in:
@@ -564,6 +564,19 @@ func isGoodMove(placement *super_block.ReplicaPlacement, existingReplicas []*Vol
|
||||
return satisfyReplicaPlacement(placement, existingReplicasExceptSourceNode, targetLocation)
|
||||
}
|
||||
|
||||
func removeVolumeInfo(diskInfo *master_pb.DiskInfo, volumeId uint32) {
|
||||
for i, volumeInfo := range diskInfo.VolumeInfos {
|
||||
if volumeInfo.Id == volumeId {
|
||||
// order does not matter here, so swap with the last and truncate
|
||||
last := len(diskInfo.VolumeInfos) - 1
|
||||
diskInfo.VolumeInfos[i] = diskInfo.VolumeInfos[last]
|
||||
diskInfo.VolumeInfos[last] = nil
|
||||
diskInfo.VolumeInfos = diskInfo.VolumeInfos[:last]
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func adjustAfterMove(v *master_pb.VolumeInformationMessage, volumeReplicas map[uint32][]*VolumeReplica, fullNode *Node, emptyNode *Node) {
|
||||
delete(fullNode.selectedVolumes, v.Id)
|
||||
if emptyNode.selectedVolumes != nil {
|
||||
@@ -576,15 +589,17 @@ func adjustAfterMove(v *master_pb.VolumeInformationMessage, volumeReplicas map[u
|
||||
replica.location.dc == fullNode.dc {
|
||||
loc := newLocation(emptyNode.dc, emptyNode.rack, emptyNode.info)
|
||||
replica.location = &loc
|
||||
for diskType, diskInfo := range fullNode.info.DiskInfos {
|
||||
if diskType == v.DiskType {
|
||||
addVolumeCount(diskInfo, -1)
|
||||
}
|
||||
// Move the volume's size accounting between disks so that
|
||||
// capacityByMinVolumeDensity recomputes ratios correctly on the next
|
||||
// iteration. Without this the density view stays stale and the planner
|
||||
// keeps draining the same node, moving every volume onto one server.
|
||||
if fullDisk, found := fullNode.info.DiskInfos[v.DiskType]; found {
|
||||
removeVolumeInfo(fullDisk, v.Id)
|
||||
addVolumeCount(fullDisk, -1)
|
||||
}
|
||||
for diskType, diskInfo := range emptyNode.info.DiskInfos {
|
||||
if diskType == v.DiskType {
|
||||
addVolumeCount(diskInfo, 1)
|
||||
}
|
||||
if emptyDisk, found := emptyNode.info.DiskInfos[v.DiskType]; found {
|
||||
emptyDisk.VolumeInfos = append(emptyDisk.VolumeInfos, v)
|
||||
addVolumeCount(emptyDisk, 1)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -262,6 +262,64 @@ func TestBalance(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
// Regression test: a freshly added empty volume server must end up sharing the
|
||||
// data roughly evenly, not having every volume drained onto it. Before the fix,
|
||||
// adjustAfterMove never updated the per-disk VolumeInfos that the density-based
|
||||
// capacity function reads, so the planner saw a stale topology and moved every
|
||||
// volume from the full node onto the empty one.
|
||||
func TestBalanceDoesNotDrainOntoOneNode(t *testing.T) {
|
||||
const mb = 1024 * 1024
|
||||
volumeSizeLimitMb := uint64(100)
|
||||
|
||||
makeNode := func(id string, volumes []*master_pb.VolumeInformationMessage) *Node {
|
||||
return &Node{
|
||||
info: &master_pb.DataNodeInfo{
|
||||
Id: id,
|
||||
DiskInfos: map[string]*master_pb.DiskInfo{
|
||||
"": {
|
||||
MaxVolumeCount: 10,
|
||||
VolumeCount: int64(len(volumes)),
|
||||
VolumeInfos: volumes,
|
||||
},
|
||||
},
|
||||
},
|
||||
dc: "dc1",
|
||||
rack: "rack1",
|
||||
}
|
||||
}
|
||||
|
||||
var fullVolumes []*master_pb.VolumeInformationMessage
|
||||
for id := uint32(1); id <= 6; id++ {
|
||||
fullVolumes = append(fullVolumes, &master_pb.VolumeInformationMessage{Id: id, Size: 95 * mb})
|
||||
}
|
||||
fullNode := makeNode("full", fullVolumes)
|
||||
emptyNode := makeNode("empty", nil)
|
||||
nodes := []*Node{fullNode, emptyNode}
|
||||
|
||||
volumeReplicas := map[uint32][]*VolumeReplica{}
|
||||
for _, v := range fullVolumes {
|
||||
loc := newLocation("dc1", "rack1", fullNode.info)
|
||||
volumeReplicas[v.Id] = []*VolumeReplica{{location: &loc, info: v}}
|
||||
}
|
||||
|
||||
for _, n := range nodes {
|
||||
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool { return true })
|
||||
}
|
||||
|
||||
if err := balanceSelectedVolume(nil, types.HardDriveType, volumeReplicas, nodes, sortWritableVolumes, volumeSizeLimitMb, false); err != nil {
|
||||
t.Fatalf("balanceSelectedVolume: %v", err)
|
||||
}
|
||||
|
||||
fullCount := len(fullNode.info.DiskInfos[""].VolumeInfos)
|
||||
emptyCount := len(emptyNode.info.DiskInfos[""].VolumeInfos)
|
||||
if fullCount == 0 || emptyCount == 0 {
|
||||
t.Fatalf("expected volumes spread across both nodes, got full=%d empty=%d", fullCount, emptyCount)
|
||||
}
|
||||
if diff := fullCount - emptyCount; diff > 1 || diff < -1 {
|
||||
t.Fatalf("expected balanced distribution within one volume, got full=%d empty=%d", fullCount, emptyCount)
|
||||
}
|
||||
}
|
||||
|
||||
func TestVolumeSelection(t *testing.T) {
|
||||
topologyInfo := parseOutput(topoData)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user