diff --git a/control-plane/csi-driver/src/bin/node/main_.rs b/control-plane/csi-driver/src/bin/node/main_.rs index 827a29f59..5bda459cf 100644 --- a/control-plane/csi-driver/src/bin/node/main_.rs +++ b/control-plane/csi-driver/src/bin/node/main_.rs @@ -21,7 +21,6 @@ use stor_port::platform; use utils::tracing_telemetry::{FmtLayer, FmtStyle}; use crate::client::AppNodesClientWrapper; -use anyhow::anyhow; use clap::Arg; use futures::TryFutureExt; use serde_json::json; @@ -108,11 +107,11 @@ pub(super) async fn main() -> anyhow::Result<()> { .required(true), ) .arg( - Arg::new("instance-endpoint") - .long("instance-endpoint") - .env("MY_POD_IP") - .help("Endpoint of current instance") - .required(true) + Arg::new("enable-registration") + .long("enable-registration") + .action(clap::ArgAction::SetTrue) + .value_name("BOOLEAN") + .help("Disable registration of the node with the control plane") ) .arg( Arg::new("csi-socket") @@ -133,10 +132,10 @@ pub(super) async fn main() -> anyhow::Result<()> { Arg::new("grpc-endpoint") .short('g') .long("grpc-endpoint") - .value_name("NAME") + .value_name("IP_ADDRESS") .help("ip address where this instance runs, and optionally the gRPC port") - .default_value("0.0.0.0") - .required(false) + .env("MY_POD_IP") + .required(true) ) .arg( Arg::new("v") @@ -302,28 +301,42 @@ pub(super) async fn main() -> anyhow::Result<()> { let client = AppNodesClientWrapper::initialize(matches.get_one::("rest-endpoint").unwrap())?; + let registration_enabled = matches.get_flag("enable-registration"); + // Parse instance and grpc endpoints from the command line arguments and validate. - let (instance_sock_addr, grpc_sock_addr) = validate_endpoints( - matches.get_one::("instance-endpoint").unwrap(), + let grpc_sock_addr = validate_endpoints( matches.get_one::("grpc-endpoint").unwrap(), + registration_enabled, )?; - // Start the CSI server, node plugin grpc server and registration loop. + // Start the CSI server, node plugin grpc server and registration loop if registration is + // enabled. *crate::config::config().nvme_as_mut() = TryFrom::try_from(&matches)?; - tokio::select! { - result = CsiServer::run(csi_socket, &matches)? => { - result?; + if registration_enabled { + tokio::select! { + result = CsiServer::run(csi_socket, &matches)? => { + result?; + } + result = NodePluginGrpcServer::run(grpc_sock_addr) => { + result?; + } + _ = run_registration_loop(node_name.clone(), grpc_sock_addr.to_string(), Some(csi_labels), &client, registration_enabled) => {} } - result = NodePluginGrpcServer::run(grpc_sock_addr) => { - result?; + // Deregister the node from the control plane on termination. + if let Err(error) = client.deregister_app_node(node_name).await { + error!("Failed to deregister node, {:?}", error); + } + } else { + tokio::select! { + result = CsiServer::run(csi_socket, &matches)? => { + result?; + } + result = NodePluginGrpcServer::run(grpc_sock_addr) => { + result?; + } } - _ = run_registration_loop(node_name.clone(), instance_sock_addr.to_string(), Some(csi_labels), &client) => {} } - // Deregister the node from the control plane on termination. - if let Err(error) = client.deregister_app_node(node_name).await { - error!("Failed to deregister node, {:?}", error); - } Ok(()) } @@ -400,41 +413,23 @@ async fn check_ana_and_label_node( Ok(()) } -/// Validate that the instance endpoint and grpc endpoint are valid, and returns the instance -/// endpoint. +/// Validate that the grpc endpoint is valid. fn validate_endpoints( - instance_endpoint: &str, grpc_endpoint: &str, -) -> anyhow::Result<(SocketAddr, SocketAddr)> { - // Append the port to the grpc endpoint if it is not specified. + registration_enabled: bool, +) -> anyhow::Result { let grpc_endpoint = if grpc_endpoint.contains(':') { grpc_endpoint.to_string() } else { format!("{grpc_endpoint}:{GRPC_PORT}") }; - - // Append the port to the instance endpoint with the grpc endpoint's port if it is not - // specified. - let instance_endpoint = if instance_endpoint.contains(':') { - instance_endpoint.to_string() - } else { - format!( - "{instance_endpoint}:{}", - grpc_endpoint - .split(':') - .last() - .ok_or(anyhow!("gRPC endpoint must have a port"))? - ) - }; - - let instance_url = SocketAddr::from_str(&instance_endpoint)?; let grpc_endpoint_url = SocketAddr::from_str(&grpc_endpoint)?; - - if instance_url.port() != grpc_endpoint_url.port() { - return Err(anyhow!( - "instance endpoint and gRPC endpoint must have the same port" + // Should not allow using an unspecified ip if registration is enabled as grpc endpoint gets + // sent in registration request. + if registration_enabled && grpc_endpoint_url.ip().is_unspecified() { + return Err(anyhow::format_err!( + "gRPC endpoint: `0.0.0.0` is not allowed if registration is enabled" )); } - - Ok((instance_url, grpc_endpoint_url)) + Ok(grpc_endpoint_url) } diff --git a/control-plane/csi-driver/src/bin/node/registration.rs b/control-plane/csi-driver/src/bin/node/registration.rs index 9c77c6323..c071e3d35 100644 --- a/control-plane/csi-driver/src/bin/node/registration.rs +++ b/control-plane/csi-driver/src/bin/node/registration.rs @@ -19,11 +19,21 @@ pub(crate) async fn run_registration_loop( endpoint: String, labels: Option>, client: &AppNodesClientWrapper, + registration_enabled: bool, ) { + if !registration_enabled { + return; + } let mut logged_error = false; loop { let interval_duration = match client.register_app_node(&id, &endpoint, &labels).await { - Ok(_) => REGISTRATION_INTERVAL_ON_SUCCESS, + Ok(_) => { + if logged_error { + tracing::info!("Successfully re-registered the app_node"); + logged_error = false; + } + REGISTRATION_INTERVAL_ON_SUCCESS + } Err(e) => { if !logged_error { error!("Failed to register app node: {:?}", e);