diff --git a/seaweed-volume/src/main.rs b/seaweed-volume/src/main.rs index e2aaeb8aa..081d97458 100644 --- a/seaweed-volume/src/main.rs +++ b/seaweed-volume/src/main.rs @@ -288,6 +288,10 @@ async fn run( config.jwt_read_signing_expires_seconds, ); let master_url = config.masters.first().cloned().unwrap_or_default(); + // Defensive-copy the configured seed masters before freezing the lookup + // set, so any later mutation of config.masters cannot desync them. + let master_urls: Vec<String> = config.masters.clone(); + let seed_master_set = VolumeServerState::build_seed_master_set(&master_urls); let self_url = format!("{}:{}", config.ip, config.port); let (http_client, outgoing_http_scheme) = build_outgoing_http_client(&config)?; let outgoing_grpc_tls = load_outgoing_grpc_tls(&config)?; @@ -324,7 +328,9 @@ async fn run( ), read_mode: config.read_mode, master_url, - master_urls: config.masters.clone(), + master_urls, + seed_master_set, + current_master_url: tokio::sync::RwLock::new(String::new()), self_url, http_client, outgoing_http_scheme, diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 10632443e..ebb8c885a 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -3898,6 +3898,24 @@ impl VolumeServer for VolumeGrpcService { let start = now_ns(); + // Empty target is a self-liveness probe and stays unauthenticated. + // Otherwise gate the dial on cluster membership: volume servers only + // know masters, so any other target type is refused. Mirrors Go's + // volume_grpc_admin.go Ping admission check. tonic forbids returning + // a body alongside an error, so we surface the InvalidArgument status + // alone — behaviour-identical to Go's status.Errorf return. 
+ if !req.target.is_empty() + && !self + .state + .is_known_ping_target(&req.target, &req.target_type) + .await + { + return Err(Status::invalid_argument(format!( + "unknown ping target {} of type {}", + req.target, req.target_type + ))); + } + // Route ping based on target type (matches Go's volume_grpc_admin.go Ping) let remote_time_ns = if req.target_type == "volumeServer" { match ping_volume_server_target(&req.target, self.state.outgoing_grpc_tls.as_ref()) @@ -4560,6 +4578,8 @@ mod tests { read_mode: crate::config::ReadMode::Local, master_url: String::new(), master_urls: Vec::new(), + seed_master_set: std::collections::HashSet::new(), + current_master_url: tokio::sync::RwLock::new(String::new()), self_url: String::new(), http_client: reqwest::Client::new(), outgoing_http_scheme: "http".to_string(), @@ -4662,6 +4682,8 @@ mod tests { read_mode: crate::config::ReadMode::Local, master_url: String::new(), master_urls: Vec::new(), + seed_master_set: std::collections::HashSet::new(), + current_master_url: tokio::sync::RwLock::new(String::new()), self_url: String::new(), http_client: reqwest::Client::new(), outgoing_http_scheme: "http".to_string(), @@ -4710,6 +4732,206 @@ mod tests { .remove("s3.incr_copy_test"); } + /// Build a bare-bones service with no on-disk store but a configurable + /// seed master set, for Ping admission tests. Matches the structure of + /// `make_local_service_with_volume` minus the volume bits. 
+ fn make_service_with_seed_masters(seeds: &[&str]) -> (VolumeGrpcService, TempDir) { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + + let mut store = Store::new(NeedleMapKind::InMemory); + store + .add_location( + dir, + dir, + 10, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); + + let master_urls: Vec<String> = seeds.iter().map(|s| (*s).to_string()).collect(); + let seed_master_set = + crate::server::volume_server::VolumeServerState::build_seed_master_set(&master_urls); + + let state = Arc::new(VolumeServerState { + store: RwLock::new(store), + guard: RwLock::new(Guard::new( + &[], + SigningKey(vec![]), + 0, + SigningKey(vec![]), + 0, + )), + is_stopping: RwLock::new(false), + maintenance: std::sync::atomic::AtomicBool::new(false), + state_version: std::sync::atomic::AtomicU32::new(0), + concurrent_upload_limit: 0, + concurrent_download_limit: 0, + inflight_upload_data_timeout: std::time::Duration::from_secs(60), + inflight_download_data_timeout: std::time::Duration::from_secs(60), + inflight_upload_bytes: std::sync::atomic::AtomicI64::new(0), + inflight_download_bytes: std::sync::atomic::AtomicI64::new(0), + upload_notify: tokio::sync::Notify::new(), + download_notify: tokio::sync::Notify::new(), + data_center: String::new(), + rack: String::new(), + file_size_limit_bytes: 0, + maintenance_byte_per_second: 0, + is_heartbeating: std::sync::atomic::AtomicBool::new(true), + has_master: false, + pre_stop_seconds: 0, + volume_state_notify: tokio::sync::Notify::new(), + write_queue: std::sync::OnceLock::new(), + s3_tier_registry: std::sync::RwLock::new( + crate::remote_storage::s3_tier::S3TierRegistry::new(), + ), + read_mode: crate::config::ReadMode::Local, + master_url: master_urls.first().cloned().unwrap_or_default(), + master_urls, + seed_master_set, + current_master_url: tokio::sync::RwLock::new(String::new()), + self_url: String::new(), + http_client: reqwest::Client::new(), + outgoing_http_scheme: 
"http".to_string(), + outgoing_grpc_tls: None, + metrics_runtime: std::sync::RwLock::new( + crate::server::volume_server::RuntimeMetricsConfig::default(), + ), + metrics_notify: tokio::sync::Notify::new(), + fix_jpg_orientation: false, + has_slow_read: false, + read_buffer_size_bytes: 1024 * 1024, + security_file: String::new(), + cli_white_list: vec![], + state_file_path: String::new(), + }); + + (VolumeGrpcService { state }, tmp) + } + + #[tokio::test] + async fn test_ping_empty_target_is_self_probe() { + // Empty target stays unauthenticated and returns Ok with timing fields + // populated — it is the local liveness probe path. + let (service, _tmp) = make_service_with_seed_masters(&[]); + let response = service + .ping(Request::new(volume_server_pb::PingRequest { + target: String::new(), + target_type: String::new(), + })) + .await + .expect("empty target ping must succeed"); + let inner = response.into_inner(); + assert!(inner.start_time_ns > 0); + assert!(inner.stop_time_ns >= inner.start_time_ns); + } + + #[tokio::test] + async fn test_ping_seed_master_target_passes_admission() { + // A target that matches a configured seed master clears admission. + // The dial itself may or may not succeed depending on what's listening + // on the loopback; either way, the response must not be the + // InvalidArgument the gate would surface. 
+ let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333"]); + let result = service + .ping(Request::new(volume_server_pb::PingRequest { + target: "localhost:9333".to_string(), + target_type: "master".to_string(), + })) + .await; + if let Err(err) = result { + assert_ne!(err.code(), tonic::Code::InvalidArgument, "got {err:?}"); + assert!( + !err.message().contains("unknown ping target"), + "admission gate should have allowed this target: {}", + err.message() + ); + } + } + + #[tokio::test] + async fn test_ping_unknown_master_target_rejected() { + // A master-type target not in the seed list and not the current + // master is refused with InvalidArgument. + let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333"]); + let err = service + .ping(Request::new(volume_server_pb::PingRequest { + target: "localhost:9999".to_string(), + target_type: "master".to_string(), + })) + .await + .expect_err("unknown master target must be rejected"); + assert_eq!(err.code(), tonic::Code::InvalidArgument); + assert_eq!( + err.message(), + "unknown ping target localhost:9999 of type master" + ); + } + + #[tokio::test] + async fn test_ping_volume_server_target_always_rejected() { + // Volume servers do not maintain a peer-volume list, so volumeServer + // pings are refused regardless of address. + let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333"]); + let err = service + .ping(Request::new(volume_server_pb::PingRequest { + target: "localhost:8080".to_string(), + target_type: "volumeServer".to_string(), + })) + .await + .expect_err("volumeServer target must be rejected"); + assert_eq!(err.code(), tonic::Code::InvalidArgument); + assert_eq!( + err.message(), + "unknown ping target localhost:8080 of type volumeServer" + ); + } + + #[tokio::test] + async fn test_ping_current_master_target_passes_admission() { + // A target that matches the current (post-leader-change) master also + // clears admission, even if it is not in the seed list. 
Pick a port + // that is extremely unlikely to be live so the test does not flake on + // a developer machine that happens to be running a real master. + let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333"]); + // Simulate the heartbeat loop having moved to a new leader. + *service.state.current_master_url.write().await = "127.0.0.1:1".to_string(); + + let result = service + .ping(Request::new(volume_server_pb::PingRequest { + target: "127.0.0.1:1".to_string(), + target_type: "master".to_string(), + })) + .await; + if let Err(err) = result { + assert_ne!(err.code(), tonic::Code::InvalidArgument, "got {err:?}"); + assert!( + !err.message().contains("unknown ping target"), + "leader-change master should be admitted: {}", + err.message() + ); + } + } + + #[tokio::test] + async fn test_ping_target_with_grpc_port_suffix_is_normalised() { + // Seed masters in pb.ServerAddress form (`host:port.grpcPort`) must + // match a Ping target sent in plain `host:port` form, since the gate + // normalises both sides through to_http_address. 
+ let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333.19333"]); + let result = service + .ping(Request::new(volume_server_pb::PingRequest { + target: "localhost:9333".to_string(), + target_type: "master".to_string(), + })) + .await; + if let Err(err) = result { + assert_ne!(err.code(), tonic::Code::InvalidArgument, "got {err:?}"); + } + } + #[tokio::test] async fn test_volume_ec_shards_generate_persists_expire_at_sec() { let ttl = crate::storage::needle::ttl::TTL::read("3m").unwrap(); diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index ceed62696..4809393cf 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -397,6 +397,15 @@ async fn do_heartbeat( let mut response_stream = client.send_heartbeat(stream).await?.into_inner(); info!("Heartbeat stream established with {}", grpc_addr); + // Publish the master we're now talking to in canonical http host:port + // form so Ping admission can recognise it once a leader change moves us + // off the seed list. Mirrors Go's vs.setCurrentMaster(masterAddress). 
+ { + let normalised = + super::volume_server::to_http_address(current_master).into_owned(); + let mut guard = state.current_master_url.write().await; + *guard = normalised; + } if is_stopping(state) { state.is_heartbeating.store(false, Ordering::Relaxed); send_deregister_heartbeat(config, state, &tx).await; @@ -1033,6 +1042,8 @@ mod tests { read_mode: ReadMode::Local, master_url: String::new(), master_urls: Vec::new(), + seed_master_set: std::collections::HashSet::new(), + current_master_url: tokio::sync::RwLock::new(String::new()), self_url: String::new(), http_client: reqwest::Client::new(), outgoing_http_scheme: "http".to_string(), diff --git a/seaweed-volume/src/server/volume_server.rs b/seaweed-volume/src/server/volume_server.rs index b3dccf353..513ef92aa 100644 --- a/seaweed-volume/src/server/volume_server.rs +++ b/seaweed-volume/src/server/volume_server.rs @@ -81,6 +81,16 @@ pub struct VolumeServerState { pub master_url: String, /// Seed master addresses for UI rendering. pub master_urls: Vec<String>, + /// Canonical http `host:port` form of every configured seed master. + /// Built once at construction so Ping admission stays O(1). Mirrors + /// Go's `seedMasterSet` on `VolumeServer`. + pub seed_master_set: std::collections::HashSet<String>, + /// Current master this server is heartbeating with, in canonical http + /// `host:port` form. Empty when no heartbeat connection is active. The + /// heartbeat loop writes; admission reads — the lock keeps them + /// from racing on a leader change. Mirrors Go's `currentMaster` plus + /// `currentMasterLock`. + pub current_master_url: tokio::sync::RwLock<String>, /// This server's own address (ip:port) for filtering self from lookup results. pub self_url: String, /// HTTP client for proxy requests and master lookups. 
@@ -117,6 +127,35 @@ impl VolumeServerState { } Ok(()) } + + /// Build the seed master set from a list of raw `host:port[.grpcPort]` + /// addresses, normalised the same way Go's `pb.ServerAddress.ToHttpAddress` + /// does (drop the `.grpcPort` suffix, preserve everything else). + pub fn build_seed_master_set(master_urls: &[String]) -> std::collections::HashSet<String> { + master_urls + .iter() + .map(|m| to_http_address(m).into_owned()) + .collect() + } + + /// Returns true iff `target` (normalised to canonical http `host:port`) + /// is a master this server already knows about. Volume servers do not + /// keep a peer-volume or peer-filer list, so Ping is scoped to masters. + /// Mirrors Go's `VolumeServer.isKnownPingTarget`. + pub async fn is_known_ping_target(&self, target: &str, target_type: &str) -> bool { + if target_type != "master" { + return false; + } + let key = to_http_address(target).into_owned(); + if key.is_empty() { + return false; + } + // No clone needed: an empty (unset) current master can never equal the non-empty key. + if *self.current_master_url.read().await == key { + return true; + } + self.seed_master_set.contains(&key) + } } pub fn build_metrics_router() -> Router { diff --git a/seaweed-volume/src/server/write_queue.rs b/seaweed-volume/src/server/write_queue.rs index 112ae5684..3f99d517b 100644 --- a/seaweed-volume/src/server/write_queue.rs +++ b/seaweed-volume/src/server/write_queue.rs @@ -207,6 +207,8 @@ mod tests { read_mode: crate::config::ReadMode::Local, master_url: String::new(), master_urls: Vec::new(), + seed_master_set: std::collections::HashSet::new(), + current_master_url: tokio::sync::RwLock::new(String::new()), self_url: String::new(), http_client: reqwest::Client::new(), outgoing_http_scheme: "http".to_string(), diff --git a/seaweed-volume/tests/http_integration.rs b/seaweed-volume/tests/http_integration.rs index c1a69248f..d8fe04c80 100644 --- a/seaweed-volume/tests/http_integration.rs +++ b/seaweed-volume/tests/http_integration.rs @@ -101,6 +101,8 @@ fn 
test_state_with_guard( read_mode: seaweed_volume::config::ReadMode::Local, master_url: String::new(), master_urls: Vec::new(), + seed_master_set: std::collections::HashSet::new(), + current_master_url: tokio::sync::RwLock::new(String::new()), self_url: String::new(), http_client: reqwest::Client::new(), outgoing_http_scheme: "http".to_string(), diff --git a/test/volume_server/grpc/admin_extra_test.go b/test/volume_server/grpc/admin_extra_test.go index 85afa1ade..e562d3cd4 100644 --- a/test/volume_server/grpc/admin_extra_test.go +++ b/test/volume_server/grpc/admin_extra_test.go @@ -8,6 +8,9 @@ import ( "testing" "time" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "github.com/seaweedfs/seaweedfs/test/volume_server/framework" "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" "github.com/seaweedfs/seaweedfs/weed/cluster" @@ -321,15 +324,18 @@ func TestPingVolumeTargetAndLeaveAffectsHealthz(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() + // Volume servers only track their masters as ping peers, so drive the + // success path through the master target and rely on the master->volume + // admission for cross-type coverage. 
pingResp, err := grpcClient.Ping(ctx, &volume_server_pb.PingRequest{ - TargetType: cluster.VolumeServerType, - Target: clusterHarness.VolumeServerAddress(), + TargetType: cluster.MasterType, + Target: clusterHarness.MasterAddress(), }) if err != nil { - t.Fatalf("Ping target volume server failed: %v", err) + t.Fatalf("Ping master from volume server failed: %v", err) } if pingResp.GetRemoteTimeNs() == 0 { - t.Fatalf("expected remote timestamp from ping target volume server") + t.Fatalf("expected remote timestamp from ping master") } if _, err = grpcClient.VolumeServerLeave(ctx, &volume_server_pb.VolumeServerLeaveRequest{}); err != nil { @@ -399,20 +405,21 @@ func TestPingUnknownAndUnreachableTargetPaths(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - unknownResp, err := grpcClient.Ping(ctx, &volume_server_pb.PingRequest{ + // The volume server gates Ping on a known peer list. Unknown target types + // and addresses outside that list are rejected with InvalidArgument + // before any outbound dial is attempted. + _, err := grpcClient.Ping(ctx, &volume_server_pb.PingRequest{ TargetType: "unknown-type", Target: "127.0.0.1:12345", }) - if err != nil { - t.Fatalf("Ping unknown target type should not return grpc error, got: %v", err) + if err == nil { + t.Fatalf("Ping unknown target type should be rejected by admission") } - if unknownResp.GetRemoteTimeNs() != 0 { - t.Fatalf("Ping unknown target type expected remote_time_ns=0, got %d", unknownResp.GetRemoteTimeNs()) - } - if unknownResp.GetStopTimeNs() < unknownResp.GetStartTimeNs() { - t.Fatalf("Ping unknown target type expected stop_time_ns >= start_time_ns") + if got := status.Code(err); got != codes.InvalidArgument { + t.Fatalf("Ping unknown target type expected InvalidArgument, got %s: %v", got, err) } + // Empty target stays as an unauthenticated self-liveness probe. 
emptyTargetResp, err := grpcClient.Ping(ctx, &volume_server_pb.PingRequest{}) if err != nil { t.Fatalf("Ping empty target should not return grpc error, got: %v", err) @@ -424,26 +431,30 @@ func TestPingUnknownAndUnreachableTargetPaths(t *testing.T) { t.Fatalf("Ping empty target expected stop_time_ns >= start_time_ns") } + // 127.0.0.1:1 is not in the seed/current master set, so admission rejects + // the call before it can reach the network. _, err = grpcClient.Ping(ctx, &volume_server_pb.PingRequest{ TargetType: cluster.MasterType, Target: "127.0.0.1:1", }) if err == nil { - t.Fatalf("Ping master target should fail when target is unreachable") + t.Fatalf("Ping master target should be rejected when not in the known-peer set") } - if !strings.Contains(err.Error(), "ping master") { - t.Fatalf("Ping master unreachable error mismatch: %v", err) + if got := status.Code(err); got != codes.InvalidArgument { + t.Fatalf("Ping unknown master expected InvalidArgument, got %s: %v", got, err) } + // Volume servers do not carry a peer-filer list at all; any filer target + // is rejected at admission regardless of reachability. 
_, err = grpcClient.Ping(ctx, &volume_server_pb.PingRequest{ TargetType: cluster.FilerType, Target: "127.0.0.1:1", }) if err == nil { - t.Fatalf("Ping filer target should fail when target is unreachable") + t.Fatalf("Ping filer target should be rejected by admission") } - if !strings.Contains(err.Error(), "ping filer") { - t.Fatalf("Ping filer unreachable error mismatch: %v", err) + if got := status.Code(err); got != codes.InvalidArgument { + t.Fatalf("Ping filer expected InvalidArgument, got %s: %v", got, err) } } @@ -479,6 +490,10 @@ func TestPingFilerTargetSuccess(t *testing.T) { t.Skip("skipping integration test in short mode") } + // Volume servers do not maintain a peer-filer index, so drive the + // filer-presence success path through the volume server's master peer: + // reaching that master in a filer-bearing cluster is sufficient signal + // that filer joined the cluster without breaking volume-server ping. clusterHarness := framework.StartSingleVolumeClusterWithFiler(t, matrix.P1()) conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) defer conn.Close() @@ -487,16 +502,16 @@ func TestPingFilerTargetSuccess(t *testing.T) { defer cancel() resp, err := grpcClient.Ping(ctx, &volume_server_pb.PingRequest{ - TargetType: cluster.FilerType, - Target: clusterHarness.FilerServerAddress(), + TargetType: cluster.MasterType, + Target: clusterHarness.MasterAddress(), }) if err != nil { - t.Fatalf("Ping filer target success path failed: %v", err) + t.Fatalf("Ping master from filer-bearing cluster failed: %v", err) } if resp.GetRemoteTimeNs() == 0 { - t.Fatalf("Ping filer target expected non-zero remote time") + t.Fatalf("Ping master expected non-zero remote time") } if resp.GetStopTimeNs() < resp.GetStartTimeNs() { - t.Fatalf("Ping filer target expected stop >= start, got start=%d stop=%d", resp.GetStartTimeNs(), resp.GetStopTimeNs()) + t.Fatalf("Ping master expected stop >= start, got start=%d stop=%d", resp.GetStartTimeNs(), 
resp.GetStopTimeNs()) } } diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go index 4d4614fb0..8837b9804 100644 --- a/weed/cluster/cluster.go +++ b/weed/cluster/cluster.go @@ -138,6 +138,38 @@ func (cluster *Cluster) ListClusterNode(filerGroup FilerGroupName, nodeType stri return } +// IsKnownNode reports whether address is currently registered under nodeType +// in any filer group. The lookup is intentionally group-agnostic because callers +// (e.g. Ping admission) only know the target address, not the group it joined. +func (cluster *Cluster) IsKnownNode(nodeType string, address pb.ServerAddress) bool { + var groups *ClusterNodeGroups + switch nodeType { + case FilerType: + groups = cluster.filerGroups + case BrokerType: + groups = cluster.brokerGroups + case S3Type: + groups = cluster.s3Groups + default: + return false + } + groups.RLock() + defer groups.RUnlock() + for _, members := range groups.groupMembers { + if _, found := members.members[address]; found { + return true + } + // fall back to a port-tolerant comparison so callers that omit the + // grpc-port suffix still match a registered peer + for stored := range members.members { + if stored.Equals(address) { + return true + } + } + } + return false +} + func buildClusterNodeUpdateMessage(isAdd bool, filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) { result = append(result, &master_pb.KeepConnectedResponse{ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ diff --git a/weed/cluster/cluster_test.go b/weed/cluster/cluster_test.go index d7a0047e2..90f695d83 100644 --- a/weed/cluster/cluster_test.go +++ b/weed/cluster/cluster_test.go @@ -39,3 +39,28 @@ func TestConcurrentAddRemoveNodes(t *testing.T) { } wg.Wait() } + +func TestIsKnownNode(t *testing.T) { + c := NewCluster() + filer := pb.ServerAddress("10.0.0.20:8888") + c.AddClusterNode("", FilerType, "dc1", "rack1", filer, "test") + + if !c.IsKnownNode(FilerType, filer) { + 
t.Fatalf("registered filer %s should be known", filer) + } + if c.IsKnownNode(VolumeServerType, filer) { + t.Fatalf("filer address must not be accepted as a volume server target") + } + if c.IsKnownNode(FilerType, pb.ServerAddress("127.0.0.1:1")) { + t.Fatalf("unregistered low-port target must be rejected") + } + if c.IsKnownNode(FilerType, pb.ServerAddress("127.0.0.1:65000")) { + t.Fatalf("unregistered high-port target must be rejected") + } + if c.IsKnownNode(FilerType, pb.ServerAddress("example.com:443")) { + t.Fatalf("unrelated host must be rejected") + } + if c.IsKnownNode("garbage", filer) { + t.Fatalf("unknown node type must be rejected") + } +} diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index a8f85f730..f6a706082 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -87,6 +87,23 @@ func (ma *MetaAggregator) HasRemotePeers() bool { return false } +// HasPeer reports whether address is currently a tracked filer peer (or this +// filer's own address). Callers use this to gate operations on known cluster +// members. 
+func (ma *MetaAggregator) HasPeer(address pb.ServerAddress) bool { + if address == ma.self || address.Equals(ma.self) { + return true + } + ma.peerChansLock.Lock() + defer ma.peerChansLock.Unlock() + for peer := range ma.peerChans { + if peer == address || peer.Equals(address) { + return true + } + } + return false +} + func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time, stopChan chan struct{}) { lastTsNs := startFrom.UnixNano() for { diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go index 914fca56c..b8282f8c9 100644 --- a/weed/server/filer_grpc_server_admin.go +++ b/weed/server/filer_grpc_server_admin.go @@ -5,6 +5,9 @@ import ( "fmt" "time" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -44,10 +47,57 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR }, nil } +// isKnownPingTarget reports whether target is a peer the filer has learned +// about from its master subscription (other filers, volume servers) or from +// its own master list. Restricting Ping prevents the RPC from being used as +// an arbitrary outbound dialer. All lookups are O(1) so the gate adds no +// noticeable overhead even in large clusters. 
+func (fs *FilerServer) isKnownPingTarget(ctx context.Context, target string, targetType string) bool { + addr := pb.ServerAddress(target) + switch targetType { + case cluster.FilerType: + if fs.filer != nil && fs.filer.MetaAggregator != nil && fs.filer.MetaAggregator.HasPeer(addr) { + return true + } + return false + case cluster.VolumeServerType: + if fs.filer != nil && fs.filer.MasterClient != nil { + return fs.filer.MasterClient.HasVolumeServer(addr) + } + return false + case cluster.MasterType: + key := addr.ToHttpAddress() + if fs.option != nil && fs.option.Masters != nil { + if _, ok := fs.option.Masters.GetInstancesAsMap()[string(addr)]; ok { + return true + } + // Fall back to a port-tolerant compare for callers that supply + // the http form when masters were registered with grpc suffix. + for _, master := range fs.option.Masters.GetInstances() { + if master.ToHttpAddress() == key { + return true + } + } + } + if fs.filer != nil && fs.filer.MasterClient != nil { + if _, ok := fs.filer.MasterClient.ListMasterSet()[key]; ok { + return true + } + } + return false + } + return false +} + func (fs *FilerServer) Ping(ctx context.Context, req *filer_pb.PingRequest) (resp *filer_pb.PingResponse, pingErr error) { resp = &filer_pb.PingResponse{ StartTimeNs: time.Now().UnixNano(), } + // Empty target is a self-liveness probe and stays unauthenticated. 
+ if req.Target != "" && !fs.isKnownPingTarget(ctx, req.Target, req.TargetType) { + resp.StopTimeNs = time.Now().UnixNano() + return resp, status.Errorf(codes.InvalidArgument, "unknown ping target %s of type %s", req.Target, req.TargetType) + } if req.TargetType == cluster.FilerType { pingErr = pb.WithFilerClient(false, 0, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{}) diff --git a/weed/server/master_grpc_server_admin.go b/weed/server/master_grpc_server_admin.go index ce90f5cac..a374c3530 100644 --- a/weed/server/master_grpc_server_admin.go +++ b/weed/server/master_grpc_server_admin.go @@ -15,6 +15,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/stats" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) /* @@ -158,10 +160,42 @@ func (ms *MasterServer) ReleaseAdminToken(ctx context.Context, req *master_pb.Re return resp, nil } +// isKnownPingTarget reports whether target is a peer that the master has +// learned about as part of cluster membership. Restricting Ping to known +// peers avoids turning the RPC into a generic outbound dialer. The lookups +// are O(1) so the gate stays cheap on clusters with thousands of nodes. 
+func (ms *MasterServer) isKnownPingTarget(ctx context.Context, target string, targetType string) bool { + addr := pb.ServerAddress(target) + switch targetType { + case cluster.FilerType, cluster.BrokerType, cluster.S3Type: + return ms.Cluster.IsKnownNode(targetType, addr) + case cluster.VolumeServerType: + if ms.Topo == nil { + return false + } + return ms.Topo.LookupDataNodeByAddress(addr) != nil + case cluster.MasterType: + if ms.option != nil && ms.option.Master.Equals(addr) { + return true + } + if ms.MasterClient != nil { + _, ok := ms.MasterClient.ListMasterSet()[addr.ToHttpAddress()] + return ok + } + return false + } + return false +} + func (ms *MasterServer) Ping(ctx context.Context, req *master_pb.PingRequest) (resp *master_pb.PingResponse, pingErr error) { resp = &master_pb.PingResponse{ StartTimeNs: time.Now().UnixNano(), } + // Empty target is a self-liveness probe and stays unauthenticated. + if req.Target != "" && !ms.isKnownPingTarget(ctx, req.Target, req.TargetType) { + resp.StopTimeNs = time.Now().UnixNano() + return resp, status.Errorf(codes.InvalidArgument, "unknown ping target %s of type %s", req.Target, req.TargetType) + } if req.TargetType == cluster.FilerType { pingErr = pb.WithFilerClient(false, 0, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{}) diff --git a/weed/server/master_grpc_server_admin_ping_test.go b/weed/server/master_grpc_server_admin_ping_test.go new file mode 100644 index 000000000..5c8de4325 --- /dev/null +++ b/weed/server/master_grpc_server_admin_ping_test.go @@ -0,0 +1,97 @@ +package weed_server + +import ( + "context" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/cluster" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/sequence" + "github.com/seaweedfs/seaweedfs/weed/topology" +) + +func TestMasterIsKnownPingTarget(t *testing.T) { + topo := topology.NewTopology("test", 
sequence.NewMemorySequencer(), 32*1024, 5, false) + dc := topology.NewDataCenter("dc1") + topo.LinkChildNode(dc) + rack := topology.NewRack("rack1") + dc.LinkChildNode(rack) + + dn := topology.NewDataNode("vol1") + dn.Ip = "10.0.0.10" + dn.Port = 8080 + dn.GrpcPort = 18080 + rack.LinkChildNode(dn) + + c := cluster.NewCluster() + filerAddr := pb.ServerAddress("10.0.0.20:8888") + c.AddClusterNode("", cluster.FilerType, "dc1", "rack1", filerAddr, "test") + + ms := &MasterServer{ + option: &MasterOption{Master: pb.ServerAddress("10.0.0.1:9333")}, + Topo: topo, + Cluster: c, + } + + ctx := context.Background() + cases := []struct { + name string + target string + targetType string + want bool + }{ + {"known filer", string(filerAddr), cluster.FilerType, true}, + {"known volume server", "10.0.0.10:8080.18080", cluster.VolumeServerType, true}, + {"known volume server http addr", "10.0.0.10:8080", cluster.VolumeServerType, true}, + {"known self master", "10.0.0.1:9333", cluster.MasterType, true}, + {"unknown localhost low port", "127.0.0.1:1", cluster.VolumeServerType, false}, + {"unknown localhost high port", "127.0.0.1:65000", cluster.FilerType, false}, + {"unrelated host", "example.com:443", cluster.MasterType, false}, + {"unknown target type", string(filerAddr), "garbage", false}, + {"filer address checked as volume server", string(filerAddr), cluster.VolumeServerType, false}, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + got := ms.isKnownPingTarget(ctx, tc.target, tc.targetType) + if got != tc.want { + t.Fatalf("isKnownPingTarget(%q,%q) = %v, want %v", tc.target, tc.targetType, got, tc.want) + } + }) + } +} + +func TestVolumeServerIsKnownPingTarget(t *testing.T) { + seed := pb.ServerAddress("10.0.0.1:9333") + vs := &VolumeServer{ + SeedMasterNodes: []pb.ServerAddress{seed}, + seedMasterSet: map[string]struct{}{seed.ToHttpAddress(): {}}, + } + vs.setCurrentMaster(pb.ServerAddress("10.0.0.2:9333")) + + cases := []struct { + name 
string + target string + targetType string + want bool + }{ + {"seed master", string(seed), cluster.MasterType, true}, + {"current master", "10.0.0.2:9333", cluster.MasterType, true}, + {"other volume server", "10.0.0.5:8080", cluster.VolumeServerType, false}, + {"random filer", "10.0.0.6:8888", cluster.FilerType, false}, + {"unknown low port", "127.0.0.1:1", cluster.MasterType, false}, + {"unknown high port", "127.0.0.1:65000", cluster.MasterType, false}, + {"unrelated host", "example.com:443", cluster.MasterType, false}, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + got := vs.isKnownPingTarget(tc.target, tc.targetType) + if got != tc.want { + t.Fatalf("isKnownPingTarget(%q,%q) = %v, want %v", tc.target, tc.targetType, got, tc.want) + } + }) + } +} diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index 1ea290fc8..0d54d3160 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -447,10 +447,37 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv } +// isKnownPingTarget reports whether target is a master this volume server +// already knows about. Volume servers do not maintain a peer-volume or +// peer-filer list, so Ping is scoped to the masters they heartbeat with. +// The current-master read is taken under a lock to avoid racing with the +// heartbeat goroutine that rewrites it on leader changes, and the seed +// list is consulted via a pre-built set so the check stays O(1). 
+func (vs *VolumeServer) isKnownPingTarget(target string, targetType string) bool { + if targetType != cluster.MasterType { + return false + } + addr := pb.ServerAddress(target) + key := addr.ToHttpAddress() + if key == "" { + return false + } + if current := vs.getCurrentMaster(); current != "" && current.ToHttpAddress() == key { + return true + } + _, ok := vs.seedMasterSet[key] + return ok +} + func (vs *VolumeServer) Ping(ctx context.Context, req *volume_server_pb.PingRequest) (resp *volume_server_pb.PingResponse, pingErr error) { resp = &volume_server_pb.PingResponse{ StartTimeNs: time.Now().UnixNano(), } + // Empty target is a self-liveness probe and stays unauthenticated. + if req.Target != "" && !vs.isKnownPingTarget(req.Target, req.TargetType) { + resp.StopTimeNs = time.Now().UnixNano() + return resp, status.Errorf(codes.InvalidArgument, "unknown ping target %s of type %s", req.Target, req.TargetType) + } if req.TargetType == cluster.FilerType { pingErr = pb.WithFilerClient(false, 0, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{}) diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 6214711e4..f48fc51f7 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -24,9 +24,26 @@ import ( ) func (vs *VolumeServer) GetMaster(ctx context.Context) pb.ServerAddress { + return vs.getCurrentMaster() +} + +// getCurrentMaster returns vs.currentMaster under a read lock so callers +// (e.g. Ping admission) do not race with the heartbeat goroutine that +// rewrites it on leader changes. +func (vs *VolumeServer) getCurrentMaster() pb.ServerAddress { + vs.currentMasterLock.RLock() + defer vs.currentMasterLock.RUnlock() return vs.currentMaster } +// setCurrentMaster updates vs.currentMaster under a write lock. 
The +// heartbeat goroutine calls this whenever it (re)connects to a master. +func (vs *VolumeServer) setCurrentMaster(master pb.ServerAddress) { + vs.currentMasterLock.Lock() + vs.currentMaster = master + vs.currentMasterLock.Unlock() +} + func (vs *VolumeServer) checkWithMaster() (err error) { for { for _, master := range vs.SeedMasterNodes { @@ -126,7 +143,7 @@ func (vs *VolumeServer) doHeartbeatWithRetry(masterAddress pb.ServerAddress, grp return "", err } glog.V(0).Infof("Heartbeat to: %v", masterAddress) - vs.currentMaster = masterAddress + vs.setCurrentMaster(masterAddress) doneChan := make(chan error, 1) @@ -176,16 +193,19 @@ func (vs *VolumeServer) doHeartbeatWithRetry(masterAddress pb.ServerAddress, grp if volumeOptsChanged { if vs.store.MaybeAdjustVolumeMax() { if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { - glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err) + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.getCurrentMaster(), err) return } } } - if in.GetLeader() != "" && !vs.currentMaster.Equals(pb.ServerAddress(in.GetLeader())) { - glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster) - newLeader = pb.ServerAddress(in.GetLeader()) - doneChan <- nil - return + if in.GetLeader() != "" { + current := vs.getCurrentMaster() + if !current.Equals(pb.ServerAddress(in.GetLeader())) { + glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), current) + newLeader = pb.ServerAddress(in.GetLeader()) + doneChan <- nil + return + } } } }() diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 974156e28..d53d2919a 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -34,9 +34,14 @@ type VolumeServer struct { readBufferSizeMB int SeedMasterNodes []pb.ServerAddress - whiteList []string - currentMaster pb.ServerAddress - pulsePeriod 
time.Duration + // seedMasterSet mirrors SeedMasterNodes keyed by the canonical http + // form. It is computed once in NewVolumeServer so admission paths can + // answer is-this-a-seed-master in O(1). + seedMasterSet map[string]struct{} + whiteList []string + currentMaster pb.ServerAddress + currentMasterLock sync.RWMutex + pulsePeriod time.Duration dataCenter string rack string store *storage.Store @@ -117,7 +122,15 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, } whiteList = append(whiteList, util.StringSplit(v.GetString("guard.white_list"), ",")...) - vs.SeedMasterNodes = masterNodes + // Copy the caller's slice so subsequent external mutation cannot desync + // SeedMasterNodes from the frozen lookup set built below. + seedMasters := make([]pb.ServerAddress, len(masterNodes)) + copy(seedMasters, masterNodes) + vs.SeedMasterNodes = seedMasters + vs.seedMasterSet = make(map[string]struct{}, len(seedMasters)) + for _, m := range seedMasters { + vs.seedMasterSet[m.ToHttpAddress()] = struct{}{} + } vs.checkWithMaster() diff --git a/weed/shell/command_cluster_check.go b/weed/shell/command_cluster_check.go index 4a1460e27..86c58ff71 100644 --- a/weed/shell/command_cluster_check.go +++ b/weed/shell/command_cluster_check.go @@ -213,28 +213,11 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i } } - // check between volume servers - for _, sourceVolumeServer := range volumeServers { - for _, targetVolumeServer := range volumeServers { - if sourceVolumeServer == targetVolumeServer { - continue - } - fmt.Fprintf(writer, "checking volume server %s to %s ... 
", string(sourceVolumeServer), string(targetVolumeServer)) - err := pb.WithVolumeServerClient(false, sourceVolumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - pong, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{ - Target: string(targetVolumeServer), - TargetType: cluster.VolumeServerType, - }) - if err == nil { - printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs) - } - return err - }) - if err != nil { - fmt.Fprintf(writer, "%v\n", err) - } - } - } + // Direct volume-to-volume connectivity is intentionally not validated + // here. Each volume server now restricts Ping to peers it can identify + // (its configured/current masters), so it does not carry a peer-volume + // list to drive a mesh check from. The master->volume and filer->volume + // probes above do not exercise volume-to-volume reachability. // check between filers, and need to connect to itself for _, sourceFiler := range filers { diff --git a/weed/topology/node.go b/weed/topology/node.go index 3a11d0433..f6073326f 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -424,6 +424,13 @@ func (n *NodeImpl) doLinkChildNode(node Node) { } n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) node.SetParent(n) + // Maintain the topology's address index so Ping admission and other + // callers can resolve a data node from its address in O(1). + if dn, ok := node.GetValue().(*DataNode); ok { + if topo := n.GetTopology(); topo != nil { + topo.registerDataNodeAddress(dn) + } + } glog.V(0).Infoln(n, "adds child", node.Id()) } } @@ -433,6 +440,13 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { defer n.Unlock() node := n.children[nodeId] if node != nil { + // Drop the topology address index before clearing the parent pointer + // so GetTopology() can still walk up to the root. 
+ if dn, ok := node.GetValue().(*DataNode); ok { + if topo := n.GetTopology(); topo != nil { + topo.unregisterDataNodeAddress(dn.ServerAddress(), dn) + } + } node.SetParent(nil) delete(n.children, node.Id()) for dt, du := range node.GetDiskUsages().negative().usages { @@ -484,10 +498,13 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHoldUnixTime int64, } func (n *NodeImpl) GetTopology() *Topology { - var p Node - p = n + var p Node = n for p.Parent() != nil { p = p.Parent() } - return p.GetValue().(*Topology) + // A detached subtree (no Topology root in scope) must not panic; the + // callers above check the returned value for nil and skip the + // address-index maintenance in that case. + topo, _ := p.GetValue().(*Topology) + return topo } diff --git a/weed/topology/rack.go b/weed/topology/rack.go index 1e5c8b632..4facd30f6 100644 --- a/weed/topology/rack.go +++ b/weed/topology/rack.go @@ -6,6 +6,7 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" @@ -60,7 +61,10 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl if c, ok := r.children[NodeId(nodeId)]; ok { dn := c.(*DataNode) // Log if IP or Port changed (e.g., pod rescheduled in K8s) - if dn.Ip != ip || dn.Port != port { + addrChanged := dn.Ip != ip || dn.Port != port + var oldAddr pb.ServerAddress + if addrChanged { + oldAddr = dn.ServerAddress() glog.V(0).Infof("DataNode %s address changed from %s:%d to %s:%d", nodeId, dn.Ip, dn.Port, ip, port) } // Update the IP/Port in case they changed @@ -69,6 +73,12 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl dn.GrpcPort = grpcPort dn.PublicUrl = publicUrl dn.LastSeen = time.Now().Unix() + if addrChanged { + if topo := r.GetTopology(); topo != nil { + 
topo.unregisterDataNodeAddress(oldAddr, dn) + topo.registerDataNodeAddress(dn) + } + } return dn } @@ -92,7 +102,8 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl delete(r.children, oldId) dn.id = NodeId(nodeId) r.children[NodeId(nodeId)] = dn - // Update connection info in case they changed + // Update connection info in case they changed; address itself is + // unchanged on legacy transition, so the index entry stays valid. dn.GrpcPort = grpcPort dn.PublicUrl = publicUrl dn.LastSeen = time.Now().Unix() diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 5c779f343..b99bbd816 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -73,6 +73,14 @@ type Topology struct { lastLeaderChangeTime time.Time hadVolumesAtLeaderChange bool lastLeaderChangeTimeLock sync.RWMutex + + // dataNodeIndex is an address -> *DataNode lookup so callers (e.g. the + // Ping admission gate) do not have to walk every dc/rack/node tier on + // every request. Keys use the canonical http form returned by + // pb.ServerAddress.ToHttpAddress so a target like "1.2.3.4:8080" finds + // the same node whether or not the grpc port suffix is present. + dataNodeIndex map[string]*DataNode + dataNodeIndexLock sync.RWMutex } func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology { @@ -95,10 +103,65 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls t.chanCrowdedVolumes = make(chan storage.VolumeInfo) t.Configuration = &Configuration{} + t.dataNodeIndex = make(map[string]*DataNode) return t } +// LookupDataNodeByAddress returns the registered DataNode that serves addr, +// or nil if no such node has been observed. Lookup is O(1) and uses the +// canonical http form of the address so callers that pass either +// "host:port" or "host:port.grpc" find the same node. 
+func (t *Topology) LookupDataNodeByAddress(addr pb.ServerAddress) *DataNode { + if addr == "" { + return nil + } + t.dataNodeIndexLock.RLock() + defer t.dataNodeIndexLock.RUnlock() + if t.dataNodeIndex == nil { + return nil + } + return t.dataNodeIndex[addr.ToHttpAddress()] +} + +// registerDataNodeAddress records dn in the address index under its current +// http address. Callers must invoke unregisterDataNodeAddress with the prior +// address whenever a node's Ip or Port changes (e.g. k8s pod reschedule). +func (t *Topology) registerDataNodeAddress(dn *DataNode) { + if dn == nil { + return + } + key := dn.ServerAddress().ToHttpAddress() + if key == "" { + return + } + t.dataNodeIndexLock.Lock() + defer t.dataNodeIndexLock.Unlock() + if t.dataNodeIndex == nil { + t.dataNodeIndex = make(map[string]*DataNode) + } + t.dataNodeIndex[key] = dn +} + +// unregisterDataNodeAddress removes the index entry for addr, but only when +// the entry still points at dn. The conditional guard avoids dropping a +// freshly re-registered node whose address happens to alias the one being +// removed (e.g. legacy id transitions or a fast restart). 
+func (t *Topology) unregisterDataNodeAddress(addr pb.ServerAddress, dn *DataNode) { + if addr == "" { + return + } + key := addr.ToHttpAddress() + if key == "" { + return + } + t.dataNodeIndexLock.Lock() + defer t.dataNodeIndexLock.Unlock() + if existing, ok := t.dataNodeIndex[key]; ok && (dn == nil || existing == dn) { + delete(t.dataNodeIndex, key) + } +} + func (t *Topology) IsChildLocked() (bool, error) { if t.IsLocked() { return true, errors.New("topology is locked") diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go index 739136199..8772ecb89 100644 --- a/weed/topology/topology_event_handling.go +++ b/weed/topology/topology_event_handling.go @@ -108,6 +108,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { } dn.DeltaUpdateVolumes([]storage.VolumeInfo{}, dn.GetVolumes()) dn.DeltaUpdateEcShards([]*erasure_coding.EcVolumeInfo{}, dn.GetEcShards()) + t.unregisterDataNodeAddress(dn.ServerAddress(), dn) if dn.Parent() != nil { dn.Parent().UnlinkChildNode(dn.Id()) } diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go index 5885bbbb4..415a7198d 100644 --- a/weed/topology/topology_test.go +++ b/weed/topology/topology_test.go @@ -3,6 +3,7 @@ package topology import ( "reflect" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/sequence" "github.com/seaweedfs/seaweedfs/weed/storage" @@ -530,3 +531,45 @@ func TestDataNodeIdBasedIdentification(t *testing.T) { t.Errorf("expected 4 DataNodes, got %d", len(children)) } } + +func TestLookupDataNodeByAddress(t *testing.T) { + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) + dc := topo.GetOrCreateDataCenter("dc1") + rack := dc.GetOrCreateRack("rack1") + + maxVolumeCounts := map[string]uint32{"": 10} + + // Brand-new registration must be discoverable by both the http and + // grpc forms of the address. 
+ dn := rack.GetOrCreateDataNode("10.1.2.3", 8080, 18080, "10.1.2.3:8080", "n1", maxVolumeCounts) + if got := topo.LookupDataNodeByAddress(pb.ServerAddress("10.1.2.3:8080")); got != dn { + t.Fatalf("lookup by http address: got %v, want %v", got, dn) + } + if got := topo.LookupDataNodeByAddress(pb.ServerAddress("10.1.2.3:8080.18080")); got != dn { + t.Fatalf("lookup by grpc-suffix address: got %v, want %v", got, dn) + } + + // Unknown addresses must miss. + if got := topo.LookupDataNodeByAddress(pb.ServerAddress("127.0.0.1:1")); got != nil { + t.Fatalf("unknown address must not be found, got %v", got) + } + + // Heartbeat from a moved pod (same id, new ip) updates the index in + // place: the old address is dropped and the new one resolves. + dnMoved := rack.GetOrCreateDataNode("10.9.9.9", 8080, 18080, "10.9.9.9:8080", "n1", maxVolumeCounts) + if dnMoved != dn { + t.Fatalf("expected same node instance after move, got different") + } + if got := topo.LookupDataNodeByAddress(pb.ServerAddress("10.1.2.3:8080")); got != nil { + t.Fatalf("old address must be unregistered after move, got %v", got) + } + if got := topo.LookupDataNodeByAddress(pb.ServerAddress("10.9.9.9:8080")); got != dn { + t.Fatalf("new address lookup: got %v, want %v", got, dn) + } + + // UnRegisterDataNode evicts the index entry. + topo.UnRegisterDataNode(dn) + if got := topo.LookupDataNodeByAddress(pb.ServerAddress("10.9.9.9:8080")); got != nil { + t.Fatalf("address must be unregistered after UnRegisterDataNode, got %v", got) + } +} diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 3070019c1..4dd3a86a4 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -439,6 +439,18 @@ func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress { return mc.masters.GetInstances() } +// ListMasterSet returns a set of configured master addresses keyed by their +// canonical http form. 
Unlike GetMasters this does not wait for a connection, +// so it is safe to call from admission paths that must stay non-blocking. +func (mc *MasterClient) ListMasterSet() map[string]struct{} { + addrs := mc.masters.GetInstances() + set := make(map[string]struct{}, len(addrs)) + for _, a := range addrs { + set[a.ToHttpAddress()] = struct{}{} + } + return set +} + // WaitUntilConnected blocks until a master connection is established or ctx is canceled. // This does NOT initiate connections - it only waits for KeepConnectedToMaster to succeed. func (mc *MasterClient) WaitUntilConnected(ctx context.Context) { diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go index e3acaf81a..de5b09de1 100644 --- a/weed/wdclient/vid_map.go +++ b/weed/wdclient/vid_map.go @@ -36,18 +36,32 @@ type vidMap struct { sync.RWMutex vid2Locations map[uint32][]Location ecVid2Locations map[uint32][]Location - DataCenter string - cache atomic.Pointer[vidMap] + // serverRefCount tracks how many vid locations (regular + EC) currently + // reference each volume server address. Maintaining it incrementally lets + // hasVolumeServer answer in O(1) instead of walking every volume entry. + // Keys are the canonical http form of pb.ServerAddress, so callers that + // pass either "host:port" or "host:port.grpc" find the same entry. + serverRefCount map[string]int + DataCenter string + cache atomic.Pointer[vidMap] } func newVidMap(dataCenter string) *vidMap { return &vidMap{ vid2Locations: make(map[uint32][]Location), ecVid2Locations: make(map[uint32][]Location), + serverRefCount: make(map[string]int), DataCenter: dataCenter, } } +// locationServerKey returns the index key used by serverRefCount for a +// Location. The key normalises away the optional grpc-port suffix so the +// counter stays consistent with hasVolumeServer's lookup. 
+func locationServerKey(loc Location) string { + return loc.ServerAddress().ToHttpAddress() +} + func (vc *vidMap) isSameDataCenter(loc *Location) bool { if vc.DataCenter == "" || loc.DataCenter == "" || vc.DataCenter != loc.DataCenter { return false @@ -152,6 +166,28 @@ func (vc *vidMap) getLocations(vid uint32) (locations []Location, found bool) { return } +// hasVolumeServer reports whether any tracked volume (regular or EC) is hosted +// on addr. It walks the cache chain so recently expired maps are still +// considered. Used to gate admission of operations targeting a volume server. +// The lookup is O(1) thanks to serverRefCount; we still consult the cache +// chain to keep covering volume servers that just rolled out of the live map. +func (vc *vidMap) hasVolumeServer(addr pb.ServerAddress) bool { + key := addr.ToHttpAddress() + if key == "" { + return false + } + vc.RLock() + count := vc.serverRefCount[key] + vc.RUnlock() + if count > 0 { + return true + } + if cachedMap := vc.cache.Load(); cachedMap != nil { + return cachedMap.hasVolumeServer(addr) + } + return false +} + func (vc *vidMap) addLocation(vid uint32, location Location) { vc.Lock() defer vc.Unlock() @@ -161,6 +197,7 @@ func (vc *vidMap) addLocation(vid uint32, location Location) { locations, found := vc.vid2Locations[vid] if !found { vc.vid2Locations[vid] = []Location{location} + vc.incrementServerRef(locationServerKey(location)) return } @@ -171,6 +208,7 @@ func (vc *vidMap) addLocation(vid uint32, location Location) { } vc.vid2Locations[vid] = append(locations, location) + vc.incrementServerRef(locationServerKey(location)) } @@ -183,6 +221,7 @@ func (vc *vidMap) addEcLocation(vid uint32, location Location) { locations, found := vc.ecVid2Locations[vid] if !found { vc.ecVid2Locations[vid] = []Location{location} + vc.incrementServerRef(locationServerKey(location)) return } @@ -193,6 +232,7 @@ func (vc *vidMap) addEcLocation(vid uint32, location Location) { } vc.ecVid2Locations[vid] = 
append(locations, location) + vc.incrementServerRef(locationServerKey(location)) } @@ -214,6 +254,7 @@ func (vc *vidMap) deleteLocation(vid uint32, location Location) { for i, loc := range locations { if loc.Url == location.Url { vc.vid2Locations[vid] = append(locations[0:i], locations[i+1:]...) + vc.decrementServerRef(locationServerKey(loc)) break } } @@ -237,6 +278,7 @@ func (vc *vidMap) deleteEcLocation(vid uint32, location Location) { for i, loc := range locations { if loc.Url == location.Url { vc.ecVid2Locations[vid] = append(locations[0:i], locations[i+1:]...) + vc.decrementServerRef(locationServerKey(loc)) break } } @@ -250,6 +292,38 @@ func (vc *vidMap) deleteVid(vid uint32) { vc.Lock() defer vc.Unlock() + for _, loc := range vc.vid2Locations[vid] { + vc.decrementServerRef(locationServerKey(loc)) + } + for _, loc := range vc.ecVid2Locations[vid] { + vc.decrementServerRef(locationServerKey(loc)) + } delete(vc.vid2Locations, vid) delete(vc.ecVid2Locations, vid) } + +// incrementServerRef increases the refcount for key. Empty keys are skipped +// so a zero-value Location (which serialises to "") does not leak a permanent +// bucket that hasVolumeServer and decrementServerRef both ignore. Callers +// must hold vc's write lock. +func (vc *vidMap) incrementServerRef(key string) { + if key == "" { + return + } + vc.serverRefCount[key]++ +} + +// decrementServerRef decreases the refcount for key and removes the entry +// once it falls to zero. Callers must hold vc's write lock. 
+func (vc *vidMap) decrementServerRef(key string) { + if key == "" { + return + } + if n, ok := vc.serverRefCount[key]; ok { + if n <= 1 { + delete(vc.serverRefCount, key) + } else { + vc.serverRefCount[key] = n - 1 + } + } +} diff --git a/weed/wdclient/vid_map_test.go b/weed/wdclient/vid_map_test.go index ec0aa86e9..161c0cfe0 100644 --- a/weed/wdclient/vid_map_test.go +++ b/weed/wdclient/vid_map_test.go @@ -117,3 +117,50 @@ func TestConcurrentGetLocations(t *testing.T) { cancel() wg.Wait() } + +func TestHasVolumeServer(t *testing.T) { + mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", pb.ServerDiscovery{}) + + regular := Location{Url: "10.0.0.1:8080", GrpcPort: 18080} + ecOnly := Location{Url: "10.0.0.2:8080", GrpcPort: 18080} + + mc.addLocation(7, regular) + mc.addEcLocation(9, ecOnly) + + addr := func(u string) pb.ServerAddress { return pb.ServerAddress(u) } + + if !mc.HasVolumeServer(addr("10.0.0.1:8080")) { + t.Fatalf("regular volume server must be known by http address") + } + if !mc.HasVolumeServer(addr("10.0.0.1:8080.18080")) { + t.Fatalf("regular volume server must be known by grpc-suffix address") + } + if !mc.HasVolumeServer(addr("10.0.0.2:8080")) { + t.Fatalf("ec-only volume server must be known") + } + if mc.HasVolumeServer(addr("127.0.0.1:1")) { + t.Fatalf("unknown address must not be known") + } + + // Adding the same location twice must not double-count: + // deleting once should evict the server. + mc.addLocation(7, regular) + mc.deleteLocation(7, regular) + if mc.HasVolumeServer(addr("10.0.0.1:8080")) { + t.Fatalf("server should be evicted after deleteLocation") + } + + // Removing the EC entry must also drop the index entry. + mc.deleteEcLocation(9, ecOnly) + if mc.HasVolumeServer(addr("10.0.0.2:8080")) { + t.Fatalf("server should be evicted after deleteEcLocation") + } + + // deleteVid removes every reference held by that vid in one call. 
+ mc.addLocation(11, regular) + mc.addEcLocation(11, regular) + mc.InvalidateCache("11,abc") + if mc.HasVolumeServer(addr("10.0.0.1:8080")) { + t.Fatalf("server should be evicted after InvalidateCache") + } +} diff --git a/weed/wdclient/vidmap_client.go b/weed/wdclient/vidmap_client.go index 579291bfc..9636f8991 100644 --- a/weed/wdclient/vidmap_client.go +++ b/weed/wdclient/vidmap_client.go @@ -13,6 +13,7 @@ import ( "golang.org/x/sync/singleflight" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" ) // VolumeLocationProvider is the interface for looking up volume locations @@ -290,6 +291,13 @@ func (vc *vidMapClient) LookupVolumeServerUrl(vid string) (serverUrls []string, return vc.getStableVidMap().LookupVolumeServerUrl(vid) } +// HasVolumeServer reports whether addr is currently a known volume server +// (hosts at least one volume or EC shard) in the cached vid map. Used by +// admission paths that must only contact peers learned from the master. +func (vc *vidMapClient) HasVolumeServer(addr pb.ServerAddress) bool { + return vc.getStableVidMap().hasVolumeServer(addr) +} + // GetDataCenter safely retrieves the data center func (vc *vidMapClient) GetDataCenter() string { return vc.getStableVidMap().DataCenter