diff --git a/control-plane/csi-driver/src/bin/node/client.rs b/control-plane/csi-driver/src/bin/node/client.rs index 4a4894c78..c95853d6a 100644 --- a/control-plane/csi-driver/src/bin/node/client.rs +++ b/control-plane/csi-driver/src/bin/node/client.rs @@ -102,7 +102,13 @@ pub(crate) struct AppNodesClientWrapper { impl AppNodesClientWrapper { /// Initialize AppNodes API client instance. - pub(crate) fn initialize(endpoint: &String) -> anyhow::Result { + pub(crate) fn initialize( + endpoint: Option<&String>, + ) -> anyhow::Result> { + let Some(endpoint) = endpoint else { + return Ok(None); + }; + let url = clients::tower::Url::parse(endpoint) .map_err(|error| anyhow!("Invalid API endpoint URL {}: {:?}", endpoint, error))?; @@ -121,9 +127,9 @@ impl AppNodesClientWrapper { endpoint, DEFAULT_TIMEOUT_FOR_REST_REQUESTS, ); - Ok(Self { + Ok(Some(Self { client: AppNodesClient::new(Arc::new(tower)), - }) + })) } /// Register an app node. diff --git a/control-plane/csi-driver/src/bin/node/main_.rs b/control-plane/csi-driver/src/bin/node/main_.rs index 827a29f59..c54dd2947 100644 --- a/control-plane/csi-driver/src/bin/node/main_.rs +++ b/control-plane/csi-driver/src/bin/node/main_.rs @@ -21,7 +21,6 @@ use stor_port::platform; use utils::tracing_telemetry::{FmtLayer, FmtStyle}; use crate::client::AppNodesClientWrapper; -use anyhow::anyhow; use clap::Arg; use futures::TryFutureExt; use serde_json::json; @@ -103,16 +102,15 @@ pub(super) async fn main() -> anyhow::Result<()> { Arg::new("rest-endpoint") .long("rest-endpoint") .env("ENDPOINT") - .default_value("http://ksnode-1:30011") .help("A URL endpoint to the control plane's rest endpoint") - .required(true), ) .arg( - Arg::new("instance-endpoint") - .long("instance-endpoint") - .env("MY_POD_IP") - .help("Endpoint of current instance") - .required(true) + Arg::new("enable-registration") + .long("enable-registration") + .action(clap::ArgAction::SetTrue) + .value_name("BOOLEAN") + .requires("rest-endpoint") + .help("Enable registration of the csi node with the control plane") ) .arg( Arg::new("csi-socket") @@ -133,10 +131,9 @@ pub(super) async fn main() -> anyhow::Result<()> { Arg::new("grpc-endpoint") .short('g') .long("grpc-endpoint") - .value_name("NAME") + .value_name("ENDPOINT") .help("ip address where this instance runs, and optionally the gRPC port") - .default_value("0.0.0.0") - .required(false) + .required(true) ) .arg( Arg::new("v") @@ -299,32 +296,31 @@ pub(super) async fn main() -> anyhow::Result<()> { } // Initialize the rest api client. - let client = - AppNodesClientWrapper::initialize(matches.get_one::("rest-endpoint").unwrap())?; + let client = AppNodesClientWrapper::initialize(matches.get_one::("rest-endpoint"))?; + + let registration_enabled = matches.get_flag("enable-registration"); // Parse instance and grpc endpoints from the command line arguments and validate. - let (instance_sock_addr, grpc_sock_addr) = validate_endpoints( - matches.get_one::("instance-endpoint").unwrap(), + let grpc_sock_addr = validate_endpoints( matches.get_one::("grpc-endpoint").unwrap(), + registration_enabled, )?; - // Start the CSI server, node plugin grpc server and registration loop. + // Start the CSI server, node plugin grpc server and registration loop if registration is + // enabled. *crate::config::config().nvme_as_mut() = TryFrom::try_from(&matches)?; - tokio::select! { - result = CsiServer::run(csi_socket, &matches)? => { - result?; - } - result = NodePluginGrpcServer::run(grpc_sock_addr) => { - result?; - } - _ = run_registration_loop(node_name.clone(), instance_sock_addr.to_string(), Some(csi_labels), &client) => {} - } - - // Deregister the node from the control plane on termination. - if let Err(error) = client.deregister_app_node(node_name).await { - error!("Failed to deregister node, {:?}", error); - } - Ok(()) + let (csi, grpc, registration) = tokio::join!( + CsiServer::run(csi_socket, &matches)?, + NodePluginGrpcServer::run(grpc_sock_addr), + run_registration_loop( + node_name.clone(), + grpc_sock_addr.to_string(), + Some(csi_labels), + &client, + registration_enabled + ) + ); + vec![csi, grpc, registration].into_iter().collect() } struct CsiServer {} @@ -400,41 +396,23 @@ async fn check_ana_and_label_node( Ok(()) } -/// Validate that the instance endpoint and grpc endpoint are valid, and returns the instance -/// endpoint. +/// Validate that the grpc endpoint is valid. fn validate_endpoints( - instance_endpoint: &str, grpc_endpoint: &str, -) -> anyhow::Result<(SocketAddr, SocketAddr)> { - // Append the port to the grpc endpoint if it is not specified. + registration_enabled: bool, +) -> anyhow::Result { let grpc_endpoint = if grpc_endpoint.contains(':') { grpc_endpoint.to_string() } else { format!("{grpc_endpoint}:{GRPC_PORT}") }; - - // Append the port to the instance endpoint with the grpc endpoint's port if it is not - // specified. - let instance_endpoint = if instance_endpoint.contains(':') { - instance_endpoint.to_string() - } else { - format!( - "{instance_endpoint}:{}", - grpc_endpoint - .split(':') - .last() - .ok_or(anyhow!("gRPC endpoint must have a port"))? - ) - }; - - let instance_url = SocketAddr::from_str(&instance_endpoint)?; let grpc_endpoint_url = SocketAddr::from_str(&grpc_endpoint)?; - - if instance_url.port() != grpc_endpoint_url.port() { - return Err(anyhow!( - "instance endpoint and gRPC endpoint must have the same port" + // Should not allow using an unspecified ip if registration is enabled as grpc endpoint gets + // sent in registration request. + if registration_enabled && grpc_endpoint_url.ip().is_unspecified() { + return Err(anyhow::format_err!( + "gRPC endpoint: `0.0.0.0` is not allowed if registration is enabled" )); } - - Ok((instance_url, grpc_endpoint_url)) + Ok(grpc_endpoint_url) } diff --git a/control-plane/csi-driver/src/bin/node/registration.rs b/control-plane/csi-driver/src/bin/node/registration.rs index 9c77c6323..d22536e3a 100644 --- a/control-plane/csi-driver/src/bin/node/registration.rs +++ b/control-plane/csi-driver/src/bin/node/registration.rs @@ -1,4 +1,4 @@ -use crate::client::AppNodesClientWrapper; +use crate::{client::AppNodesClientWrapper, shutdown_event::Shutdown}; use snafu::Snafu; use std::{collections::HashMap, time::Duration}; use tokio::task::JoinError; @@ -18,12 +18,29 @@ pub(crate) async fn run_registration_loop( id: String, endpoint: String, labels: Option>, - client: &AppNodesClientWrapper, -) { + client: &Option, + registration_enabled: bool, +) -> anyhow::Result<()> { + if !registration_enabled { + return Ok(()); + } + + let Some(client) = client else { + return Err(anyhow::anyhow!( + "Rest API Client should have been initialized if registration is enabled" + )); + }; + let mut logged_error = false; loop { let interval_duration = match client.register_app_node(&id, &endpoint, &labels).await { - Ok(_) => REGISTRATION_INTERVAL_ON_SUCCESS, + Ok(_) => { + if logged_error { + tracing::info!("Successfully re-registered the app node"); + logged_error = false; + } + REGISTRATION_INTERVAL_ON_SUCCESS + } Err(e) => { if !logged_error { error!("Failed to register app node: {:?}", e); @@ -32,6 +49,16 @@ pub(crate) async fn run_registration_loop( REGISTRATION_INTERVAL_ON_ERROR } }; - tokio::time::sleep(interval_duration).await; + tokio::select! { + _ = tokio::time::sleep(interval_duration) => {} + _ = Shutdown::wait() => { + break; + } + } + } + // Deregister the node from the control plane on termination. + if let Err(error) = client.deregister_app_node(&id).await { + error!("Failed to deregister node, {:?}", error); } + Ok(()) }