mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
2386fa550a
A Canceled/DeadlineExceeded from the caller's per-request context was treated like a dead channel: it closed the shared cached ClientConn and cancelled every other in-flight RPC on it with "the client connection is closing". Under a burst of concurrent chunk assigns (e.g. a large S3 multipart upload) one slow assign hitting its 10s attempt timeout could poison the connection for all the rest, cascading into a flood of 500s. Thread the caller's context into shouldInvalidateConnection and only invalidate on Canceled/DeadlineExceeded while that context is still live, which isolates the genuine stale-channel signal (a peer restart behind a k8s Service VIP). To carry the context, add a ctx parameter to the existing WithGrpcClient, WithMasterClient, and WithMasterServerClient; the master assign and volume-lookup paths pass their per-attempt context and every other caller passes context.Background().
39 lines
1.2 KiB
Go
39 lines
1.2 KiB
Go
package cluster
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/dustin/go-humanize/english"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
func ListExistingPeerUpdates(master pb.ServerAddress, grpcDialOption grpc.DialOption, filerGroup string, clientType string) (existingNodes []*master_pb.ClusterNodeUpdate) {
|
|
|
|
if grpcErr := pb.WithMasterClient(context.Background(), false, master, grpcDialOption, false, func(client master_pb.SeaweedClient) error {
|
|
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
|
|
ClientType: clientType,
|
|
FilerGroup: filerGroup,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
glog.V(0).Infof("the cluster has %d %s\n", len(resp.ClusterNodes), english.PluralWord(len(resp.ClusterNodes), clientType, ""))
|
|
for _, node := range resp.ClusterNodes {
|
|
existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{
|
|
NodeType: FilerType,
|
|
Address: node.Address,
|
|
IsAdd: true,
|
|
CreatedAtNs: node.CreatedAtNs,
|
|
})
|
|
}
|
|
return nil
|
|
}); grpcErr != nil {
|
|
glog.V(0).Infof("connect to %s: %v", master, grpcErr)
|
|
}
|
|
return
|
|
}
|