fix(volume.list): show one entry per physical disk on multi-disk nodes (#9541)

* fix(volume.list): show one entry per physical disk on multi-disk nodes

DataNodeInfo.DiskInfos is keyed by disk type, so several same-type
physical disks on one node collapse to a single map entry at the master.
volume.list iterated that map directly and reported one "Disk hdd ...
id:0" line per node, hiding the per-disk volume and shard layout. EC
operators on multi-disk volume servers had no way to verify which
physical disk a shard landed on.

Lift the per-physical-disk split into a DiskInfo.SplitByPhysicalDisk()
method on the proto type so consumers outside admin/topology can use
it. Apply it in writeDataNodeInfo so the verbose Disk block shows one
entry per physical disk, ordered by DiskId. Capacity counters are
split evenly across reconstructed disks since the wire format doesn't
carry per-disk capacity yet.

This is a display-only change. ActiveTopology already did the split on
its own and is now updated to call the shared helper.

* fix(volume.list): preserve totals, count active/remote exactly, dedupe header

Address review feedback on the per-physical-disk split:

- share() truncated remainders so reconstructed per-disk counters could
  sum to less than the original aggregate (10 / 3 = 3+3+3). Distribute
  the remainder to the lowest disk ids so MaxVolumeCount and
  FreeVolumeCount sum exactly back to the node totals.
- ActiveVolumeCount and RemoteVolumeCount are derivable per disk from
  the VolumeInfos already grouped by DiskId, so count them exactly
  (ReadOnly=false and RemoteStorageName!="" respectively) instead of
  approximating with an even split.
- writeDataNodeInfo's per-disk callback fired the DataNode header on
  every iteration after the split, so a node with 6 physical disks
  emitted 6 DataNode headers. Guard the callback with headerPrinted so
  the header still appears at most once per node.
- Sort split disks deterministically using explicit DiskId comparison
  to avoid int overflow risk on 32-bit systems.
- Tighten the volume.list test substring to "id:N\n" so unrelated
  tokens like "ec volume id:101" don't accidentally match the id:1
  needle, and assert the rack callback fires once.
This commit is contained in:
Chris Lu
2026-05-18 14:43:44 -07:00
committed by GitHub
parent a761441926
commit 41b6ad002b
5 changed files with 424 additions and 77 deletions
+2 -70
View File
@@ -9,74 +9,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)
// splitDiskInfoByPhysicalDisk returns one master_pb.DiskInfo per physical
// disk_id observed in VolumeInfos / EcShardInfos. Multiple same-type physical
// disks collapse to one DiskInfo at the master; per-volume/per-shard records
// keep the original disk_id and are the authoritative signal here. Capacity
// is split evenly — the wire format doesn't carry per-disk capacity yet.
func splitDiskInfoByPhysicalDisk(diskInfo *master_pb.DiskInfo) []*master_pb.DiskInfo {
if diskInfo == nil {
return nil
}
// Records with DiskId=0 and a non-zero outer DiskId belong to the outer
// disk — handles older payloads / fixtures that omit the per-record id.
normalize := func(id uint32) uint32 {
if id == 0 && diskInfo.DiskId != 0 {
return diskInfo.DiskId
}
return id
}
diskIDs := make(map[uint32]struct{})
for _, vi := range diskInfo.VolumeInfos {
diskIDs[normalize(vi.DiskId)] = struct{}{}
}
for _, eci := range diskInfo.EcShardInfos {
diskIDs[normalize(eci.DiskId)] = struct{}{}
}
if len(diskIDs) == 0 {
diskIDs[diskInfo.DiskId] = struct{}{}
}
if len(diskIDs) == 1 {
for diskID := range diskIDs {
if diskID == diskInfo.DiskId {
return []*master_pb.DiskInfo{diskInfo}
}
}
}
perDiskVolumes := make(map[uint32][]*master_pb.VolumeInformationMessage)
for _, vi := range diskInfo.VolumeInfos {
perDiskVolumes[normalize(vi.DiskId)] = append(perDiskVolumes[normalize(vi.DiskId)], vi)
}
perDiskShards := make(map[uint32][]*master_pb.VolumeEcShardInformationMessage)
for _, eci := range diskInfo.EcShardInfos {
perDiskShards[normalize(eci.DiskId)] = append(perDiskShards[normalize(eci.DiskId)], eci)
}
count := int64(len(diskIDs))
share := func(total int64) int64 { return total / count }
result := make([]*master_pb.DiskInfo, 0, len(diskIDs))
for diskID := range diskIDs {
result = append(result, &master_pb.DiskInfo{
Type: diskInfo.Type,
MaxVolumeCount: share(diskInfo.MaxVolumeCount),
VolumeCount: int64(len(perDiskVolumes[diskID])),
FreeVolumeCount: share(diskInfo.FreeVolumeCount),
ActiveVolumeCount: share(diskInfo.ActiveVolumeCount),
RemoteVolumeCount: share(diskInfo.RemoteVolumeCount),
VolumeInfos: perDiskVolumes[diskID],
EcShardInfos: perDiskShards[diskID],
DiskId: diskID,
Tags: append([]string(nil), diskInfo.Tags...),
})
}
return result
}
// CountTopologyResources counts datacenters, nodes, and disks in topology info
func CountTopologyResources(topologyInfo *master_pb.TopologyInfo) (dcCount, nodeCount, diskCount int) {
if topologyInfo == nil {
@@ -142,10 +74,10 @@ func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) e
disks: make(map[uint32]*activeDisk),
}
// One activeDisk per physical disk_id (#9369): the master keys
// One activeDisk per physical disk_id: the master keys
// DiskInfos by disk type, so same-type disks must be split out.
for diskType, diskInfo := range nodeInfo.DiskInfos {
perDiskInfos := splitDiskInfoByPhysicalDisk(diskInfo)
perDiskInfos := diskInfo.SplitByPhysicalDisk()
for _, perDisk := range perDiskInfos {
disk := &activeDisk{
DiskInfo: &DiskInfo{
+106
View File
@@ -1,5 +1,111 @@
package master_pb
import "sort"
func (v *VolumeLocation) IsEmptyUrl() bool {
return v.Url == "" || v.Url == ":0"
}
// SplitByPhysicalDisk returns one DiskInfo per physical disk_id observed in
// VolumeInfos / EcShardInfos. The wire format keys DataNodeInfo.DiskInfos by
// disk type, so multiple same-type physical disks on one DataNode collapse
// into a single DiskInfo entry. Per-volume and per-shard records carry the
// real physical DiskId; this helper rebuilds a per-physical-disk view from
// those records so consumers (topology indexes, shell output) can target
// individual disks instead of treating each node as one big disk.
//
// ActiveVolumeCount and RemoteVolumeCount are computed exactly from each
// disk's VolumeInfos (read-only and remote-backed are known per-volume).
// MaxVolumeCount and FreeVolumeCount are not derivable from per-volume
// records, so they are split across reconstructed disks with the remainder
// distributed to the lowest disk ids — the sums are preserved exactly.
func (d *DiskInfo) SplitByPhysicalDisk() []*DiskInfo {
if d == nil {
return nil
}
normalize := func(id uint32) uint32 {
if id == 0 && d.DiskId != 0 {
return d.DiskId
}
return id
}
diskIDs := make(map[uint32]struct{})
for _, vi := range d.VolumeInfos {
diskIDs[normalize(vi.DiskId)] = struct{}{}
}
for _, eci := range d.EcShardInfos {
diskIDs[normalize(eci.DiskId)] = struct{}{}
}
if len(diskIDs) == 0 {
diskIDs[d.DiskId] = struct{}{}
}
if len(diskIDs) == 1 {
for diskID := range diskIDs {
if diskID == d.DiskId {
return []*DiskInfo{d}
}
}
}
perDiskVolumes := make(map[uint32][]*VolumeInformationMessage)
for _, vi := range d.VolumeInfos {
id := normalize(vi.DiskId)
perDiskVolumes[id] = append(perDiskVolumes[id], vi)
}
perDiskShards := make(map[uint32][]*VolumeEcShardInformationMessage)
for _, eci := range d.EcShardInfos {
id := normalize(eci.DiskId)
perDiskShards[id] = append(perDiskShards[id], eci)
}
// Sort disk IDs so the remainder distribution is deterministic and the
// reconstructed slice is in DiskId order, which is what downstream
// renderers expect.
ids := make([]uint32, 0, len(diskIDs))
for id := range diskIDs {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
count := int64(len(ids))
// share returns total / count, plus one extra for the first
// (total % count) entries so the sum of shares equals total. Without
// the remainder distribution, splitting 10 across 3 disks would yield
// 3+3+3 = 9 and under-report aggregate capacity.
share := func(total int64, idx int) int64 {
base := total / count
if int64(idx) < total%count {
return base + 1
}
return base
}
result := make([]*DiskInfo, 0, len(ids))
for i, diskID := range ids {
var activeCount, remoteCount int64
for _, vi := range perDiskVolumes[diskID] {
if !vi.ReadOnly {
activeCount++
}
if vi.RemoteStorageName != "" {
remoteCount++
}
}
result = append(result, &DiskInfo{
Type: d.Type,
MaxVolumeCount: share(d.MaxVolumeCount, i),
VolumeCount: int64(len(perDiskVolumes[diskID])),
FreeVolumeCount: share(d.FreeVolumeCount, i),
ActiveVolumeCount: activeCount,
RemoteVolumeCount: remoteCount,
VolumeInfos: perDiskVolumes[diskID],
EcShardInfos: perDiskShards[diskID],
DiskId: diskID,
Tags: append([]string(nil), d.Tags...),
})
}
return result
}
+219
View File
@@ -0,0 +1,219 @@
package master_pb
import (
"sort"
"testing"
)
func TestDiskInfoSplitByPhysicalDisk_collapsesOnSingleDisk(t *testing.T) {
d := &DiskInfo{
Type: "hdd",
MaxVolumeCount: 10,
VolumeInfos: []*VolumeInformationMessage{
{Id: 1, DiskId: 0},
{Id: 2, DiskId: 0},
},
DiskId: 0,
}
got := d.SplitByPhysicalDisk()
if len(got) != 1 {
t.Fatalf("want 1 split disk, got %d", len(got))
}
if got[0] != d {
t.Errorf("single-disk input should be returned unchanged; got a copy")
}
}
func TestDiskInfoSplitByPhysicalDisk_splitsByVolumeDiskId(t *testing.T) {
d := &DiskInfo{
Type: "hdd",
MaxVolumeCount: 60,
FreeVolumeCount: 30,
ActiveVolumeCount: 12,
VolumeInfos: []*VolumeInformationMessage{
{Id: 10, DiskId: 0},
{Id: 11, DiskId: 0},
{Id: 20, DiskId: 1},
{Id: 21, DiskId: 2},
{Id: 22, DiskId: 2},
{Id: 23, DiskId: 2},
},
}
got := d.SplitByPhysicalDisk()
if len(got) != 3 {
t.Fatalf("want 3 split disks, got %d", len(got))
}
byID := map[uint32]*DiskInfo{}
for _, di := range got {
byID[di.DiskId] = di
}
for _, want := range []uint32{0, 1, 2} {
if _, ok := byID[want]; !ok {
t.Errorf("missing DiskId=%d in split result", want)
}
}
if byID[0].VolumeCount != 2 {
t.Errorf("disk 0: want 2 volumes, got %d", byID[0].VolumeCount)
}
if byID[1].VolumeCount != 1 {
t.Errorf("disk 1: want 1 volume, got %d", byID[1].VolumeCount)
}
if byID[2].VolumeCount != 3 {
t.Errorf("disk 2: want 3 volumes, got %d", byID[2].VolumeCount)
}
// Capacity is split evenly across the reconstructed disks. With 3 disks
// and a max of 60, every reconstructed disk gets 20.
for id, di := range byID {
if di.MaxVolumeCount != 20 {
t.Errorf("disk %d: want MaxVolumeCount=20 (60/3), got %d", id, di.MaxVolumeCount)
}
}
// Disk type is preserved on every reconstructed entry so writers that
// label by type still see "hdd".
for id, di := range byID {
if di.Type != "hdd" {
t.Errorf("disk %d: want Type=hdd, got %q", id, di.Type)
}
}
}
// TestDiskInfoSplitByPhysicalDisk_preservesAggregateCapacityWithRemainder
// pins the invariant that the sum of reconstructed counters equals the
// original aggregate, even when the count does not divide evenly.
func TestDiskInfoSplitByPhysicalDisk_preservesAggregateCapacityWithRemainder(t *testing.T) {
d := &DiskInfo{
Type: "hdd",
MaxVolumeCount: 10,
FreeVolumeCount: 7,
VolumeInfos: []*VolumeInformationMessage{
{Id: 1, DiskId: 0},
{Id: 2, DiskId: 1},
{Id: 3, DiskId: 2},
},
}
got := d.SplitByPhysicalDisk()
if len(got) != 3 {
t.Fatalf("want 3 disks, got %d", len(got))
}
var sumMax, sumFree int64
for _, di := range got {
sumMax += di.MaxVolumeCount
sumFree += di.FreeVolumeCount
}
if sumMax != 10 {
t.Errorf("sum of MaxVolumeCount = %d, want 10 (lossless split)", sumMax)
}
if sumFree != 7 {
t.Errorf("sum of FreeVolumeCount = %d, want 7 (lossless split)", sumFree)
}
}
// TestDiskInfoSplitByPhysicalDisk_countsActiveAndRemoteExactly verifies
// the per-disk ActiveVolumeCount and RemoteVolumeCount are derived from
// the actual VolumeInfos rather than an even split of the node totals.
func TestDiskInfoSplitByPhysicalDisk_countsActiveAndRemoteExactly(t *testing.T) {
d := &DiskInfo{
Type: "hdd",
VolumeInfos: []*VolumeInformationMessage{
{Id: 1, DiskId: 0, ReadOnly: false},
{Id: 2, DiskId: 0, ReadOnly: true},
{Id: 3, DiskId: 1, ReadOnly: false, RemoteStorageName: "s3"},
{Id: 4, DiskId: 2, ReadOnly: false},
{Id: 5, DiskId: 2, ReadOnly: false, RemoteStorageName: "s3"},
},
}
got := d.SplitByPhysicalDisk()
byID := map[uint32]*DiskInfo{}
for _, di := range got {
byID[di.DiskId] = di
}
cases := []struct {
id uint32
wantActive int64
wantRemote int64
}{
{0, 1, 0},
{1, 1, 1},
{2, 2, 1},
}
for _, c := range cases {
di := byID[c.id]
if di == nil {
t.Errorf("missing disk %d", c.id)
continue
}
if di.ActiveVolumeCount != c.wantActive {
t.Errorf("disk %d: ActiveVolumeCount = %d, want %d", c.id, di.ActiveVolumeCount, c.wantActive)
}
if di.RemoteVolumeCount != c.wantRemote {
t.Errorf("disk %d: RemoteVolumeCount = %d, want %d", c.id, di.RemoteVolumeCount, c.wantRemote)
}
}
}
func TestDiskInfoSplitByPhysicalDisk_splitsByEcShardDiskId(t *testing.T) {
d := &DiskInfo{
Type: "hdd",
EcShardInfos: []*VolumeEcShardInformationMessage{
{Id: 100, DiskId: 4},
{Id: 101, DiskId: 4},
{Id: 102, DiskId: 5},
},
}
got := d.SplitByPhysicalDisk()
ids := make([]int, 0, len(got))
for _, di := range got {
ids = append(ids, int(di.DiskId))
}
sort.Ints(ids)
if len(ids) != 2 || ids[0] != 4 || ids[1] != 5 {
t.Fatalf("want split disk ids [4,5], got %v", ids)
}
}
func TestDiskInfoSplitByPhysicalDisk_normalizesZeroToOuterDiskId(t *testing.T) {
// Older payloads / fixtures omit the per-record DiskId. The outer DiskId
// is the authoritative fallback.
d := &DiskInfo{
Type: "hdd",
DiskId: 7,
VolumeInfos: []*VolumeInformationMessage{
{Id: 1, DiskId: 0}, // normalize to 7
{Id: 2, DiskId: 0}, // normalize to 7
},
}
got := d.SplitByPhysicalDisk()
if len(got) != 1 {
t.Fatalf("want 1 disk, got %d", len(got))
}
if got[0].DiskId != 7 {
t.Errorf("want DiskId=7 after normalization, got %d", got[0].DiskId)
}
}
func TestDiskInfoSplitByPhysicalDisk_nilSafe(t *testing.T) {
var d *DiskInfo
if got := d.SplitByPhysicalDisk(); got != nil {
t.Errorf("nil receiver should return nil slice, got %v", got)
}
}
func TestDiskInfoSplitByPhysicalDisk_emptyDiskReturnsSelf(t *testing.T) {
d := &DiskInfo{Type: "hdd", MaxVolumeCount: 10, DiskId: 3}
got := d.SplitByPhysicalDisk()
if len(got) != 1 || got[0] != d {
t.Errorf("empty disk should be passed through unchanged; got %v", got)
}
}
+31 -7
View File
@@ -181,14 +181,38 @@ func (c *commandVolumeList) writeRackInfo(writer io.Writer, t *master_pb.RackInf
func (c *commandVolumeList) writeDataNodeInfo(writer io.Writer, t *master_pb.DataNodeInfo, verbosityLevel int, outRackInfo func()) statistics {
var s statistics
diskInfoFound := false
headerPrinted := false
// DataNodeInfo.DiskInfos keys by disk type, so a node with several disks
// of the same type collapses into a single entry. Split each one back
// into per-physical-disk views (using the DiskId carried on each
// VolumeInfo / EcShardInfo) so the verbose Disk block shows one entry
// per physical disk instead of summing them all under "id:0". headerPrinted
// guards against the inner callback firing once per split disk and
// emitting the DataNode header on every iteration.
for _, diskType := range sortMapKey(t.DiskInfos) {
diskInfo := t.DiskInfos[diskType]
s.add(c.writeDiskInfo(writer, diskInfo, verbosityLevel, func() {
outRackInfo()
output(verbosityLevel >= 3, writer, " DataNode %s%s\n", t.Id, diskInfosToString(t.DiskInfos))
}))
if !diskInfoFound && !s.isEmpty() {
diskInfoFound = true
perDiskInfos := t.DiskInfos[diskType].SplitByPhysicalDisk()
slices.SortFunc(perDiskInfos, func(a, b *master_pb.DiskInfo) int {
switch {
case a.DiskId < b.DiskId:
return -1
case a.DiskId > b.DiskId:
return 1
default:
return 0
}
})
for _, diskInfo := range perDiskInfos {
s.add(c.writeDiskInfo(writer, diskInfo, verbosityLevel, func() {
if headerPrinted {
return
}
outRackInfo()
output(verbosityLevel >= 3, writer, " DataNode %s%s\n", t.Id, diskInfosToString(t.DiskInfos))
headerPrinted = true
}))
if !diskInfoFound && !s.isEmpty() {
diskInfoFound = true
}
}
}
output(diskInfoFound && verbosityLevel >= 3, writer, " DataNode %s %+v \n", t.Id, s)
+66
View File
@@ -1,6 +1,10 @@
package shell
import (
"bytes"
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/stretchr/testify/assert"
@@ -127,3 +131,65 @@ func parseOutput(output string) *master_pb.TopologyInfo {
return topo
}
// TestWriteDataNodeInfo_SplitsCollapsedDisksByPhysicalDiskId verifies that
// the verbose Disk block in volume.list shows one entry per physical disk
// when the master collapsed several same-type disks under a single
// DiskInfos["hdd"] map entry. Before the split, six physical disks would
// appear as "Disk hdd ... id:0" with all volumes stacked on it.
func TestWriteDataNodeInfo_SplitsCollapsedDisksByPhysicalDiskId(t *testing.T) {
dn := &master_pb.DataNodeInfo{
Id: "node1:8081",
DiskInfos: map[string]*master_pb.DiskInfo{
"hdd": {
Type: "hdd",
MaxVolumeCount: 60,
VolumeInfos: []*master_pb.VolumeInformationMessage{
{Id: 1, DiskId: 0, Collection: "c"},
{Id: 2, DiskId: 1, Collection: "c"},
{Id: 3, DiskId: 2, Collection: "c"},
},
EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{
{Id: 100, DiskId: 3, Collection: "c", EcIndexBits: 1},
{Id: 101, DiskId: 4, Collection: "c", EcIndexBits: 1},
{Id: 102, DiskId: 5, Collection: "c", EcIndexBits: 1},
},
},
},
}
c := &commandVolumeList{}
fs := flag.NewFlagSet("volume.list", flag.ContinueOnError)
c.collectionPattern = fs.String("collection", "", "")
c.dataCenter = fs.String("dataCenter", "", "")
c.rack = fs.String("rack", "", "")
c.dataNode = fs.String("dataNode", "", "")
c.readonly = fs.Bool("readonly", false, "")
c.writable = fs.Bool("writable", false, "")
c.volumeId = fs.Uint64("volumeId", 0, "")
var buf bytes.Buffer
verbosity := 5
rackInvocations := 0
c.writeDataNodeInfo(&buf, dn, verbosity, func() { rackInvocations++ })
out := buf.String()
// Match the "id:N\n" line ending so substring checks don't accept
// unrelated tokens like "ec volume id:101" as a match for id:1.
for diskID := 0; diskID < 6; diskID++ {
needle := fmt.Sprintf("id:%d\n", diskID)
if !strings.Contains(out, needle) {
t.Errorf("output missing %q; got:\n%s", needle, out)
}
}
// The parent's outRackInfo callback rides the same code path as the
// DataNode header — it fires from the inner writeDiskInfo callback,
// once per disk before the fix. After the fix the header guard runs
// outRackInfo at most once per DataNode. This proxy lets us pin the
// "header printed once" invariant without depending on the exact
// format of the rendered DataNode line.
if rackInvocations != 1 {
t.Errorf("DataNode header callback ran %d times; want 1 (regression: header printed once per split disk)", rackInvocations)
}
}