From be9996962d5cdbfb2bb89803686fbd71fc2784d8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 21 Apr 2026 23:33:57 -0700 Subject: [PATCH] fix(test): avoid port collision between master gRPC and volume ports MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AllocateMiniPorts(1) reserved masterPort and masterPort+GrpcPortOffset by holding listeners open, but closed them on return. The subsequent AllocatePorts call bound 127.0.0.1:0, so the OS could immediately reuse the just-released mini gRPC port as a volume port — causing the volume server to fail at bind time with "address already in use". Introduce AllocatePortSet(miniCount, regularCount) that holds every listener open until the full set is chosen, and route the five volume test cluster builders through it. --- test/testutil/ports.go | 60 +++++++++++++++++++ test/testutil/ports_test.go | 29 +++++++++ test/volume_server/framework/cluster.go | 11 +--- test/volume_server/framework/cluster_mixed.go | 13 ++-- test/volume_server/framework/cluster_multi.go | 13 ++-- .../framework/cluster_multi_rust.go | 13 ++-- test/volume_server/framework/cluster_rust.go | 11 +--- 7 files changed, 107 insertions(+), 43 deletions(-) create mode 100644 test/testutil/ports_test.go diff --git a/test/testutil/ports.go b/test/testutil/ports.go index f0c35f11c..687d98862 100644 --- a/test/testutil/ports.go +++ b/test/testutil/ports.go @@ -118,3 +118,63 @@ func MustFreeMiniPort(t *testing.T, name string) int { t.Helper() return MustFreeMiniPorts(t, []string{name})[0] } + +// AllocatePortSet atomically allocates miniCount master-style port pairs (each +// with port and port+GrpcPortOffset reserved) plus regularCount additional +// distinct ports. All listeners are held open until the entire batch is +// allocated, so a mini gRPC port cannot be recycled as a regular port mid-batch. +func AllocatePortSet(miniCount, regularCount int) (mini []int, regular []int, err error) { + const ( + minPort = 10000 + maxPort = 55000 + ) + reserved := make(map[int]bool) + mini = make([]int, 0, miniCount) + var listeners []net.Listener + defer func() { + for _, l := range listeners { + l.Close() + } + }() + + for idx := 0; idx < miniCount; idx++ { + found := false + for i := 0; i < 1000; i++ { + port := minPort + rand.Intn(maxPort-minPort) + grpcPort := port + GrpcPortOffset + if reserved[port] || reserved[grpcPort] { + continue + } + l1, lErr := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) + if lErr != nil { + continue + } + l2, lErr := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", grpcPort)) + if lErr != nil { + l1.Close() + continue + } + listeners = append(listeners, l1, l2) + reserved[port] = true + reserved[grpcPort] = true + mini = append(mini, port) + found = true + break + } + if !found { + return nil, nil, fmt.Errorf("failed to allocate mini port %d of %d", idx+1, miniCount) + } + } + + regular = make([]int, 0, regularCount) + for i := 0; i < regularCount; i++ { + l, lErr := net.Listen("tcp", "127.0.0.1:0") + if lErr != nil { + return nil, nil, lErr + } + listeners = append(listeners, l) + regular = append(regular, l.Addr().(*net.TCPAddr).Port) + } + + return mini, regular, nil +} diff --git a/test/testutil/ports_test.go b/test/testutil/ports_test.go new file mode 100644 index 000000000..3cfd6a11d --- /dev/null +++ b/test/testutil/ports_test.go @@ -0,0 +1,29 @@ +package testutil + +import "testing" + +func TestAllocatePortSetNoGrpcCollision(t *testing.T) { + // Run a few iterations to catch the OS-recycles-just-closed-port race + // that previously hit regular ports when the mini gRPC offset was freed + // between AllocateMiniPorts and AllocatePorts calls. + for iter := 0; iter < 20; iter++ { + mini, regular, err := AllocatePortSet(1, 3) + if err != nil { + t.Fatalf("iter %d: AllocatePortSet: %v", iter, err) + } + if len(mini) != 1 || len(regular) != 3 { + t.Fatalf("iter %d: unexpected counts mini=%d regular=%d", iter, len(mini), len(regular)) + } + reserved := map[int]bool{ + mini[0]: true, + mini[0] + GrpcPortOffset: true, + } + for _, p := range regular { + if reserved[p] { + t.Fatalf("iter %d: regular port %d collides with mini pair %d/%d", + iter, p, mini[0], mini[0]+GrpcPortOffset) + } + reserved[p] = true + } + } +} diff --git a/test/volume_server/framework/cluster.go b/test/volume_server/framework/cluster.go index 5153d6233..352e7f77c 100644 --- a/test/volume_server/framework/cluster.go +++ b/test/volume_server/framework/cluster.go @@ -85,17 +85,12 @@ func StartSingleVolumeCluster(t testing.TB, profile matrix.Profile) *Cluster { t.Fatalf("write security config: %v", err) } - miniPorts, err := testutil.AllocateMiniPorts(1) - if err != nil { - t.Fatalf("allocate master port pair: %v", err) - } - masterPort := miniPorts[0] - masterGrpcPort := masterPort + testutil.GrpcPortOffset - - ports, err := testutil.AllocatePorts(3) + miniPorts, ports, err := testutil.AllocatePortSet(1, 3) if err != nil { t.Fatalf("allocate ports: %v", err) } + masterPort := miniPorts[0] + masterGrpcPort := masterPort + testutil.GrpcPortOffset c := &Cluster{ testingTB: t, diff --git a/test/volume_server/framework/cluster_mixed.go b/test/volume_server/framework/cluster_mixed.go index 46189c93b..6c6b76adf 100644 --- a/test/volume_server/framework/cluster_mixed.go +++ b/test/volume_server/framework/cluster_mixed.go @@ -93,22 +93,17 @@ func StartMixedVolumeCluster(t testing.TB, profile matrix.Profile, goCount, rust t.Fatalf("write security config: %v", err) } - miniPorts, err := testutil.AllocateMiniPorts(1) - if err != nil { - t.Fatalf("allocate master port pair: %v", err) - } - masterPort := miniPorts[0] - masterGrpcPort := masterPort + testutil.GrpcPortOffset - // 2 ports per server (admin, grpc); add 1 more when public port is split out. portsPerServer := 2 if profile.SplitPublicPort { portsPerServer = 3 } - ports, err := testutil.AllocatePorts(total * portsPerServer) + miniPorts, ports, err := testutil.AllocatePortSet(1, total*portsPerServer) if err != nil { - t.Fatalf("allocate volume ports: %v", err) + t.Fatalf("allocate ports: %v", err) } + masterPort := miniPorts[0] + masterGrpcPort := masterPort + testutil.GrpcPortOffset isRust := make([]bool, total) for i := goCount; i < total; i++ { diff --git a/test/volume_server/framework/cluster_multi.go b/test/volume_server/framework/cluster_multi.go index 84a82f067..57748bcb0 100644 --- a/test/volume_server/framework/cluster_multi.go +++ b/test/volume_server/framework/cluster_multi.go @@ -75,13 +75,6 @@ func StartMultiVolumeCluster(t testing.TB, profile matrix.Profile, serverCount i t.Fatalf("write security config: %v", err) } - miniPorts, err := testutil.AllocateMiniPorts(1) - if err != nil { - t.Fatalf("allocate master port pair: %v", err) - } - masterPort := miniPorts[0] - masterGrpcPort := masterPort + testutil.GrpcPortOffset - // Allocate ports for all volume servers (3 ports per server: admin, grpc, public) // If SplitPublicPort is true, we need an additional port per server portsPerServer := 3 @@ -89,10 +82,12 @@ func StartMultiVolumeCluster(t testing.TB, profile matrix.Profile, serverCount i portsPerServer = 4 } totalPorts := serverCount * portsPerServer - ports, err := testutil.AllocatePorts(totalPorts) + miniPorts, ports, err := testutil.AllocatePortSet(1, totalPorts) if err != nil { - t.Fatalf("allocate volume ports: %v", err) + t.Fatalf("allocate ports: %v", err) } + masterPort := miniPorts[0] + masterGrpcPort := masterPort + testutil.GrpcPortOffset c := &MultiVolumeCluster{ testingTB: t, diff --git a/test/volume_server/framework/cluster_multi_rust.go b/test/volume_server/framework/cluster_multi_rust.go index 18030330a..dc2515ebd 100644 --- a/test/volume_server/framework/cluster_multi_rust.go +++ b/test/volume_server/framework/cluster_multi_rust.go @@ -86,13 +86,6 @@ func StartRustMultiVolumeCluster(t testing.TB, profile matrix.Profile, serverCou t.Fatalf("write security config: %v", err) } - miniPorts, err := testutil.AllocateMiniPorts(1) - if err != nil { - t.Fatalf("allocate master port pair: %v", err) - } - masterPort := miniPorts[0] - masterGrpcPort := masterPort + testutil.GrpcPortOffset - // Allocate ports for all volume servers (3 ports per server: admin, grpc, public) // If SplitPublicPort is true, we need an additional port per server portsPerServer := 3 @@ -100,10 +93,12 @@ func StartRustMultiVolumeCluster(t testing.TB, profile matrix.Profile, serverCou portsPerServer = 4 } totalPorts := serverCount * portsPerServer - ports, err := testutil.AllocatePorts(totalPorts) + miniPorts, ports, err := testutil.AllocatePortSet(1, totalPorts) if err != nil { - t.Fatalf("allocate volume ports: %v", err) + t.Fatalf("allocate ports: %v", err) } + masterPort := miniPorts[0] + masterGrpcPort := masterPort + testutil.GrpcPortOffset c := &RustMultiVolumeCluster{ testingTB: t, diff --git a/test/volume_server/framework/cluster_rust.go b/test/volume_server/framework/cluster_rust.go index 2c786c1f0..77b724e16 100644 --- a/test/volume_server/framework/cluster_rust.go +++ b/test/volume_server/framework/cluster_rust.go @@ -80,17 +80,12 @@ func StartRustVolumeCluster(t testing.TB, profile matrix.Profile) *RustCluster { t.Fatalf("write security config: %v", err) } - miniPorts, err := testutil.AllocateMiniPorts(1) - if err != nil { - t.Fatalf("allocate master port pair: %v", err) - } - masterPort := miniPorts[0] - masterGrpcPort := masterPort + testutil.GrpcPortOffset - - ports, err := testutil.AllocatePorts(3) + miniPorts, ports, err := testutil.AllocatePortSet(1, 3) if err != nil { t.Fatalf("allocate ports: %v", err) } + masterPort := miniPorts[0] + masterGrpcPort := masterPort + testutil.GrpcPortOffset rc := &RustCluster{ testingTB: t,