mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
fix(master): notify clients after manual volume grow (#9656)
Co-authored-by: Neetika Mittal <mneetika@users.noreply.github.com>
This commit is contained in:
@@ -427,6 +427,13 @@ func (ms *MasterServer) broadcastToClients(message *master_pb.KeepConnectedRespo
|
||||
ms.clientChansLock.RUnlock()
|
||||
}
|
||||
|
||||
// broadcastVolumeLocationsToClients notifies connected clients about newly created volume locations.
|
||||
func (ms *MasterServer) broadcastVolumeLocationsToClients(locations []*master_pb.VolumeLocation) {
|
||||
for _, location := range locations {
|
||||
ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: location})
|
||||
}
|
||||
}
|
||||
|
||||
func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedServer) error {
|
||||
leader, err := ms.Topo.Leader()
|
||||
if err != nil {
|
||||
|
||||
@@ -2,8 +2,10 @@ package weed_server
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/cluster"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@@ -35,3 +37,38 @@ func TestInitialLockRingUpdateSkipsNonFilers(t *testing.T) {
|
||||
|
||||
assert.Nil(t, ms.initialLockRingUpdate(cluster.BrokerType, "group-a"))
|
||||
}
|
||||
|
||||
// TestBroadcastVolumeLocationsToClients verifies grown volume locations are sent to registered clients.
|
||||
func TestBroadcastVolumeLocationsToClients(t *testing.T) {
|
||||
clientChan := make(chan *master_pb.KeepConnectedResponse, 2)
|
||||
ms := &MasterServer{
|
||||
clientChans: map[string]chan *master_pb.KeepConnectedResponse{
|
||||
"default.filer@127.0.0.1:8888": clientChan,
|
||||
},
|
||||
}
|
||||
|
||||
ms.broadcastVolumeLocationsToClients([]*master_pb.VolumeLocation{
|
||||
{Url: "volume-a:8080", NewVids: []uint32{7}},
|
||||
{Url: "volume-b:8080", NewVids: []uint32{8}},
|
||||
})
|
||||
|
||||
var first *master_pb.KeepConnectedResponse
|
||||
select {
|
||||
case first = <-clientChan:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("timed out waiting for first broadcast")
|
||||
}
|
||||
require.NotNil(t, first.GetVolumeLocation())
|
||||
assert.Equal(t, []uint32{7}, first.GetVolumeLocation().GetNewVids())
|
||||
assert.Equal(t, "volume-a:8080", first.GetVolumeLocation().GetUrl())
|
||||
|
||||
var second *master_pb.KeepConnectedResponse
|
||||
select {
|
||||
case second = <-clientChan:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("timed out waiting for second broadcast")
|
||||
}
|
||||
require.NotNil(t, second.GetVolumeLocation())
|
||||
assert.Equal(t, []uint32{8}, second.GetVolumeLocation().GetNewVids())
|
||||
assert.Equal(t, "volume-b:8080", second.GetVolumeLocation().GetUrl())
|
||||
}
|
||||
|
||||
@@ -42,9 +42,7 @@ func (ms *MasterServer) DoAutomaticVolumeGrow(req *topology.VolumeGrowRequest) {
|
||||
glog.V(1).Infof("automatic volume grow failed: %+v", err)
|
||||
return
|
||||
}
|
||||
for _, newVidLocation := range newVidLocations {
|
||||
ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: newVidLocation})
|
||||
}
|
||||
ms.broadcastVolumeLocationsToClients(newVidLocations)
|
||||
}
|
||||
|
||||
func (ms *MasterServer) ProcessGrowRequest() {
|
||||
|
||||
@@ -91,6 +91,9 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
|
||||
} else {
|
||||
var newVidLocations []*master_pb.VolumeLocation
|
||||
newVidLocations, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, uint32(count), option, ms.Topo)
|
||||
if len(newVidLocations) > 0 {
|
||||
ms.broadcastVolumeLocationsToClients(newVidLocations)
|
||||
}
|
||||
count = uint64(len(newVidLocations))
|
||||
}
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user