cluster: restrict Ping RPC to known peers of the requested type (#9445)

Ping previously dialled whatever host:port the caller asked for. Gate
each server's Ping handler on cluster membership: masters check the
topology, registered cluster nodes, and configured master peers; volume
servers only accept their seed/current masters; filers accept tracked
peer filers, the master-learned volume server set, and configured
masters.

Use address-indexed peer lookups to keep Ping target validation O(1):
- topology maintains a pb.ServerAddress -> *DataNode index alongside
  the dc/rack/node tree, kept in sync from doLinkChildNode and
  UnlinkChildNode plus the ip/port-rewrite branch in
  GetOrCreateDataNode. GetTopology now returns nil on a detached
  subtree instead of panicking, so the linkage hooks can no-op safely.
- vid_map tracks a refcount per volume-server address so
  hasVolumeServer answers without scanning every vid location. The
  add path skips empty-address entries the same way the delete path
  already does, so a zero-value Location cannot leak a permanent
  serverRefCount[""] bucket.
- masters reuse a cached master-address set from MasterClient instead
  of walking the configured peer slice on every request.
- volume servers compare against a pre-built seed-master set and
  protect currentMaster reads/writes with an RWMutex, fixing the
  data race with the heartbeat goroutine. The seed slice is copied
  on construction so external mutation cannot desync it from the
  frozen lookup set.
- cluster.check drops the direct volume-to-volume sweep; volume
  servers no longer carry a peer-volume list, and the note next to
  the dropped probe is reworded to make clear that direct
  volume-to-volume reachability is intentionally not validated by
  this command.

Update the volume-server integration tests that drove Ping through the
new admission gate: success-path coverage now targets the master peer
(the only type a volume server tracks), and the unknown/unreachable
path asserts the InvalidArgument the gate now returns instead of the
old downstream dial error.

Mirror the same admission gate in the Rust volume server crate: a
seed-master HashSet built once at startup plus a tokio RwLock over the
heartbeat-tracked current master, both consulted in is_known_ping_target
on every Ping, with InvalidArgument returned for any target that isn't
a recognised master.
This commit is contained in:
Chris Lu
2026-05-12 13:00:52 -07:00
committed by GitHub
parent 5004b4e542
commit 10cc06333b
26 changed files with 935 additions and 64 deletions
+7 -1
View File
@@ -288,6 +288,10 @@ async fn run(
config.jwt_read_signing_expires_seconds,
);
let master_url = config.masters.first().cloned().unwrap_or_default();
// Defensive-copy the configured seed masters before freezing the lookup
// set, so any later mutation of config.masters cannot desync them.
let master_urls: Vec<String> = config.masters.clone();
let seed_master_set = VolumeServerState::build_seed_master_set(&master_urls);
let self_url = format!("{}:{}", config.ip, config.port);
let (http_client, outgoing_http_scheme) = build_outgoing_http_client(&config)?;
let outgoing_grpc_tls = load_outgoing_grpc_tls(&config)?;
@@ -324,7 +328,9 @@ async fn run(
),
read_mode: config.read_mode,
master_url,
master_urls: config.masters.clone(),
master_urls,
seed_master_set,
current_master_url: tokio::sync::RwLock::new(String::new()),
self_url,
http_client,
outgoing_http_scheme,
+222
View File
@@ -3898,6 +3898,24 @@ impl VolumeServer for VolumeGrpcService {
let start = now_ns();
// Empty target is a self-liveness probe and stays unauthenticated.
// Otherwise gate the dial on cluster membership: volume servers only
// know masters, so any other target type is refused. Mirrors Go's
// volume_grpc_admin.go Ping admission check. tonic forbids returning
// a body alongside an error, so we surface the InvalidArgument status
// alone — behaviour-identical to Go's status.Errorf return.
if !req.target.is_empty()
&& !self
.state
.is_known_ping_target(&req.target, &req.target_type)
.await
{
return Err(Status::invalid_argument(format!(
"unknown ping target {} of type {}",
req.target, req.target_type
)));
}
// Route ping based on target type (matches Go's volume_grpc_admin.go Ping)
let remote_time_ns = if req.target_type == "volumeServer" {
match ping_volume_server_target(&req.target, self.state.outgoing_grpc_tls.as_ref())
@@ -4560,6 +4578,8 @@ mod tests {
read_mode: crate::config::ReadMode::Local,
master_url: String::new(),
master_urls: Vec::new(),
seed_master_set: std::collections::HashSet::new(),
current_master_url: tokio::sync::RwLock::new(String::new()),
self_url: String::new(),
http_client: reqwest::Client::new(),
outgoing_http_scheme: "http".to_string(),
@@ -4662,6 +4682,8 @@ mod tests {
read_mode: crate::config::ReadMode::Local,
master_url: String::new(),
master_urls: Vec::new(),
seed_master_set: std::collections::HashSet::new(),
current_master_url: tokio::sync::RwLock::new(String::new()),
self_url: String::new(),
http_client: reqwest::Client::new(),
outgoing_http_scheme: "http".to_string(),
@@ -4710,6 +4732,206 @@ mod tests {
.remove("s3.incr_copy_test");
}
/// Build a bare-bones service with no on-disk store but a configurable
/// seed master set, for Ping admission tests. Matches the structure of
/// `make_local_service_with_volume` minus the volume bits.
fn make_service_with_seed_masters(seeds: &[&str]) -> (VolumeGrpcService, TempDir) {
let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap();
let mut store = Store::new(NeedleMapKind::InMemory);
store
.add_location(
dir,
dir,
10,
DiskType::HardDrive,
MinFreeSpace::Percent(1.0),
Vec::new(),
)
.unwrap();
let master_urls: Vec<String> = seeds.iter().map(|s| (*s).to_string()).collect();
let seed_master_set =
crate::server::volume_server::VolumeServerState::build_seed_master_set(&master_urls);
let state = Arc::new(VolumeServerState {
store: RwLock::new(store),
guard: RwLock::new(Guard::new(
&[],
SigningKey(vec![]),
0,
SigningKey(vec![]),
0,
)),
is_stopping: RwLock::new(false),
maintenance: std::sync::atomic::AtomicBool::new(false),
state_version: std::sync::atomic::AtomicU32::new(0),
concurrent_upload_limit: 0,
concurrent_download_limit: 0,
inflight_upload_data_timeout: std::time::Duration::from_secs(60),
inflight_download_data_timeout: std::time::Duration::from_secs(60),
inflight_upload_bytes: std::sync::atomic::AtomicI64::new(0),
inflight_download_bytes: std::sync::atomic::AtomicI64::new(0),
upload_notify: tokio::sync::Notify::new(),
download_notify: tokio::sync::Notify::new(),
data_center: String::new(),
rack: String::new(),
file_size_limit_bytes: 0,
maintenance_byte_per_second: 0,
is_heartbeating: std::sync::atomic::AtomicBool::new(true),
has_master: false,
pre_stop_seconds: 0,
volume_state_notify: tokio::sync::Notify::new(),
write_queue: std::sync::OnceLock::new(),
s3_tier_registry: std::sync::RwLock::new(
crate::remote_storage::s3_tier::S3TierRegistry::new(),
),
read_mode: crate::config::ReadMode::Local,
master_url: master_urls.first().cloned().unwrap_or_default(),
master_urls,
seed_master_set,
current_master_url: tokio::sync::RwLock::new(String::new()),
self_url: String::new(),
http_client: reqwest::Client::new(),
outgoing_http_scheme: "http".to_string(),
outgoing_grpc_tls: None,
metrics_runtime: std::sync::RwLock::new(
crate::server::volume_server::RuntimeMetricsConfig::default(),
),
metrics_notify: tokio::sync::Notify::new(),
fix_jpg_orientation: false,
has_slow_read: false,
read_buffer_size_bytes: 1024 * 1024,
security_file: String::new(),
cli_white_list: vec![],
state_file_path: String::new(),
});
(VolumeGrpcService { state }, tmp)
}
#[tokio::test]
async fn test_ping_empty_target_is_self_probe() {
// Empty target stays unauthenticated and returns Ok with timing fields
// populated — it is the local liveness probe path.
let (service, _tmp) = make_service_with_seed_masters(&[]);
let response = service
.ping(Request::new(volume_server_pb::PingRequest {
target: String::new(),
target_type: String::new(),
}))
.await
.expect("empty target ping must succeed");
let inner = response.into_inner();
assert!(inner.start_time_ns > 0);
assert!(inner.stop_time_ns >= inner.start_time_ns);
}
#[tokio::test]
async fn test_ping_seed_master_target_passes_admission() {
// A target that matches a configured seed master clears admission.
// The dial itself may or may not succeed depending on what's listening
// on the loopback; either way, the response must not be the
// InvalidArgument the gate would surface.
let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333"]);
let result = service
.ping(Request::new(volume_server_pb::PingRequest {
target: "localhost:9333".to_string(),
target_type: "master".to_string(),
}))
.await;
if let Err(err) = result {
assert_ne!(err.code(), tonic::Code::InvalidArgument, "got {err:?}");
assert!(
!err.message().contains("unknown ping target"),
"admission gate should have allowed this target: {}",
err.message()
);
}
}
#[tokio::test]
async fn test_ping_unknown_master_target_rejected() {
// A master-type target not in the seed list and not the current
// master is refused with InvalidArgument.
let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333"]);
let err = service
.ping(Request::new(volume_server_pb::PingRequest {
target: "localhost:9999".to_string(),
target_type: "master".to_string(),
}))
.await
.expect_err("unknown master target must be rejected");
assert_eq!(err.code(), tonic::Code::InvalidArgument);
assert_eq!(
err.message(),
"unknown ping target localhost:9999 of type master"
);
}
#[tokio::test]
async fn test_ping_volume_server_target_always_rejected() {
// Volume servers do not maintain a peer-volume list, so volumeServer
// pings are refused regardless of address.
let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333"]);
let err = service
.ping(Request::new(volume_server_pb::PingRequest {
target: "localhost:8080".to_string(),
target_type: "volumeServer".to_string(),
}))
.await
.expect_err("volumeServer target must be rejected");
assert_eq!(err.code(), tonic::Code::InvalidArgument);
assert_eq!(
err.message(),
"unknown ping target localhost:8080 of type volumeServer"
);
}
#[tokio::test]
async fn test_ping_current_master_target_passes_admission() {
// A target that matches the current (post-leader-change) master also
// clears admission, even if it is not in the seed list. Pick a port
// that is extremely unlikely to be live so the test does not flake on
// a developer machine that happens to be running a real master.
let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333"]);
// Simulate the heartbeat goroutine having moved to a new leader.
*service.state.current_master_url.write().await = "127.0.0.1:1".to_string();
let result = service
.ping(Request::new(volume_server_pb::PingRequest {
target: "127.0.0.1:1".to_string(),
target_type: "master".to_string(),
}))
.await;
if let Err(err) = result {
assert_ne!(err.code(), tonic::Code::InvalidArgument, "got {err:?}");
assert!(
!err.message().contains("unknown ping target"),
"leader-change master should be admitted: {}",
err.message()
);
}
}
#[tokio::test]
async fn test_ping_target_with_grpc_port_suffix_is_normalised() {
// Seed masters in pb.ServerAddress form (`host:port.grpcPort`) must
// match a Ping target sent in plain `host:port` form, since the gate
// normalises both sides through to_http_address.
let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333.19333"]);
let result = service
.ping(Request::new(volume_server_pb::PingRequest {
target: "localhost:9333".to_string(),
target_type: "master".to_string(),
}))
.await;
if let Err(err) = result {
assert_ne!(err.code(), tonic::Code::InvalidArgument, "got {err:?}");
}
}
#[tokio::test]
async fn test_volume_ec_shards_generate_persists_expire_at_sec() {
let ttl = crate::storage::needle::ttl::TTL::read("3m").unwrap();
+11
View File
@@ -397,6 +397,15 @@ async fn do_heartbeat(
let mut response_stream = client.send_heartbeat(stream).await?.into_inner();
info!("Heartbeat stream established with {}", grpc_addr);
// Publish the master we're now talking to in canonical http host:port
// form so Ping admission can recognise it once a leader change moves us
// off the seed list. Mirrors Go's vs.setCurrentMaster(masterAddress).
{
let normalised =
super::volume_server::to_http_address(current_master).into_owned();
let mut guard = state.current_master_url.write().await;
*guard = normalised;
}
if is_stopping(state) {
state.is_heartbeating.store(false, Ordering::Relaxed);
send_deregister_heartbeat(config, state, &tx).await;
@@ -1033,6 +1042,8 @@ mod tests {
read_mode: ReadMode::Local,
master_url: String::new(),
master_urls: Vec::new(),
seed_master_set: std::collections::HashSet::new(),
current_master_url: tokio::sync::RwLock::new(String::new()),
self_url: String::new(),
http_client: reqwest::Client::new(),
outgoing_http_scheme: "http".to_string(),
@@ -81,6 +81,16 @@ pub struct VolumeServerState {
pub master_url: String,
/// Seed master addresses for UI rendering.
pub master_urls: Vec<String>,
/// Canonical http `host:port` form of every configured seed master.
/// Built once at construction so Ping admission stays O(1). Mirrors
/// Go's `seedMasterSet` on `VolumeServer`.
pub seed_master_set: std::collections::HashSet<String>,
/// Current master this server is heartbeating with, in canonical http
/// `host:port` form. Empty when no heartbeat connection is active. The
/// heartbeat goroutine writes; admission reads — the lock keeps them
/// from racing on a leader change. Mirrors Go's `currentMaster` plus
/// `currentMasterLock`.
pub current_master_url: tokio::sync::RwLock<String>,
/// This server's own address (ip:port) for filtering self from lookup results.
pub self_url: String,
/// HTTP client for proxy requests and master lookups.
@@ -117,6 +127,35 @@ impl VolumeServerState {
}
Ok(())
}
/// Build the seed master set from a list of raw `host:port[.grpcPort]`
/// addresses, normalised the same way Go's `pb.ServerAddress.ToHttpAddress`
/// does (drop the `.grpcPort` suffix, preserve everything else).
pub fn build_seed_master_set(master_urls: &[String]) -> std::collections::HashSet<String> {
master_urls
.iter()
.map(|m| to_http_address(m).into_owned())
.collect()
}
/// Returns true iff `target` (normalised to canonical http `host:port`)
/// is a master this server already knows about. Volume servers do not
/// keep a peer-volume or peer-filer list, so Ping is scoped to masters.
/// Mirrors Go's `VolumeServer.isKnownPingTarget`.
pub async fn is_known_ping_target(&self, target: &str, target_type: &str) -> bool {
if target_type != "master" {
return false;
}
let key = to_http_address(target).into_owned();
if key.is_empty() {
return false;
}
let current = self.current_master_url.read().await.clone();
if !current.is_empty() && current == key {
return true;
}
self.seed_master_set.contains(&key)
}
}
pub fn build_metrics_router() -> Router {
+2
View File
@@ -207,6 +207,8 @@ mod tests {
read_mode: crate::config::ReadMode::Local,
master_url: String::new(),
master_urls: Vec::new(),
seed_master_set: std::collections::HashSet::new(),
current_master_url: tokio::sync::RwLock::new(String::new()),
self_url: String::new(),
http_client: reqwest::Client::new(),
outgoing_http_scheme: "http".to_string(),
+2
View File
@@ -101,6 +101,8 @@ fn test_state_with_guard(
read_mode: seaweed_volume::config::ReadMode::Local,
master_url: String::new(),
master_urls: Vec::new(),
seed_master_set: std::collections::HashSet::new(),
current_master_url: tokio::sync::RwLock::new(String::new()),
self_url: String::new(),
http_client: reqwest::Client::new(),
outgoing_http_scheme: "http".to_string(),