Skip to content

Commit

Permalink
chore: make registration configurable, use only grpc endpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Abhinandan Purkait <[email protected]>
  • Loading branch information
Abhinandan-Purkait committed Feb 6, 2024
1 parent 4f9ed99 commit 6aca045
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 50 deletions.
93 changes: 44 additions & 49 deletions control-plane/csi-driver/src/bin/node/main_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use stor_port::platform;
use utils::tracing_telemetry::{FmtLayer, FmtStyle};

use crate::client::AppNodesClientWrapper;
use anyhow::anyhow;
use clap::Arg;
use futures::TryFutureExt;
use serde_json::json;
Expand Down Expand Up @@ -108,11 +107,11 @@ pub(super) async fn main() -> anyhow::Result<()> {
.required(true),
)
.arg(
Arg::new("instance-endpoint")
.long("instance-endpoint")
.env("MY_POD_IP")
.help("Endpoint of current instance")
.required(true)
Arg::new("enable-registration")
.long("enable-registration")
.action(clap::ArgAction::SetTrue)
.value_name("BOOLEAN")
.help("Disable registration of the node with the control plane")
)
.arg(
Arg::new("csi-socket")
Expand All @@ -133,10 +132,10 @@ pub(super) async fn main() -> anyhow::Result<()> {
Arg::new("grpc-endpoint")
.short('g')
.long("grpc-endpoint")
.value_name("NAME")
.value_name("IP_ADDRESS")
.help("ip address where this instance runs, and optionally the gRPC port")
.default_value("0.0.0.0")
.required(false)
.env("MY_POD_IP")
.required(true)
)
.arg(
Arg::new("v")
Expand Down Expand Up @@ -302,28 +301,42 @@ pub(super) async fn main() -> anyhow::Result<()> {
let client =
AppNodesClientWrapper::initialize(matches.get_one::<String>("rest-endpoint").unwrap())?;

let registration_enabled = matches.get_flag("enable-registration");

// Parse instance and grpc endpoints from the command line arguments and validate.
let (instance_sock_addr, grpc_sock_addr) = validate_endpoints(
matches.get_one::<String>("instance-endpoint").unwrap(),
let grpc_sock_addr = validate_endpoints(
matches.get_one::<String>("grpc-endpoint").unwrap(),
registration_enabled,
)?;

// Start the CSI server, node plugin grpc server and registration loop.
// Start the CSI server, node plugin grpc server and registration loop if registration is
// enabled.
*crate::config::config().nvme_as_mut() = TryFrom::try_from(&matches)?;
tokio::select! {
result = CsiServer::run(csi_socket, &matches)? => {
result?;
if registration_enabled {
tokio::select! {
result = CsiServer::run(csi_socket, &matches)? => {
result?;
}
result = NodePluginGrpcServer::run(grpc_sock_addr) => {
result?;
}
_ = run_registration_loop(node_name.clone(), grpc_sock_addr.to_string(), Some(csi_labels), &client, registration_enabled) => {}
}
result = NodePluginGrpcServer::run(grpc_sock_addr) => {
result?;
// Deregister the node from the control plane on termination.
if let Err(error) = client.deregister_app_node(node_name).await {
error!("Failed to deregister node, {:?}", error);
}
} else {
tokio::select! {
result = CsiServer::run(csi_socket, &matches)? => {
result?;
}
result = NodePluginGrpcServer::run(grpc_sock_addr) => {
result?;
}
}
_ = run_registration_loop(node_name.clone(), instance_sock_addr.to_string(), Some(csi_labels), &client) => {}
}

// Deregister the node from the control plane on termination.
if let Err(error) = client.deregister_app_node(node_name).await {
error!("Failed to deregister node, {:?}", error);
}
Ok(())
}

Expand Down Expand Up @@ -400,41 +413,23 @@ async fn check_ana_and_label_node(
Ok(())
}

/// Validate that the instance endpoint and grpc endpoint are valid, and returns the instance
/// endpoint.
/// Validate that the grpc endpoint is valid.
fn validate_endpoints(
instance_endpoint: &str,
grpc_endpoint: &str,
) -> anyhow::Result<(SocketAddr, SocketAddr)> {
// Append the port to the grpc endpoint if it is not specified.
registration_enabled: bool,
) -> anyhow::Result<SocketAddr> {
let grpc_endpoint = if grpc_endpoint.contains(':') {
grpc_endpoint.to_string()
} else {
format!("{grpc_endpoint}:{GRPC_PORT}")
};

// Append the port to the instance endpoint with the grpc endpoint's port if it is not
// specified.
let instance_endpoint = if instance_endpoint.contains(':') {
instance_endpoint.to_string()
} else {
format!(
"{instance_endpoint}:{}",
grpc_endpoint
.split(':')
.last()
.ok_or(anyhow!("gRPC endpoint must have a port"))?
)
};

let instance_url = SocketAddr::from_str(&instance_endpoint)?;
let grpc_endpoint_url = SocketAddr::from_str(&grpc_endpoint)?;

if instance_url.port() != grpc_endpoint_url.port() {
return Err(anyhow!(
"instance endpoint and gRPC endpoint must have the same port"
// Should not allow using an unspecified ip if registration is enabled as grpc endpoint gets
// sent in registration request.
if registration_enabled && grpc_endpoint_url.ip().is_unspecified() {
return Err(anyhow::format_err!(
"gRPC endpoint: `0.0.0.0` is not allowed if registration is enabled"
));
}

Ok((instance_url, grpc_endpoint_url))
Ok(grpc_endpoint_url)
}
12 changes: 11 additions & 1 deletion control-plane/csi-driver/src/bin/node/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,21 @@ pub(crate) async fn run_registration_loop(
endpoint: String,
labels: Option<HashMap<String, String>>,
client: &AppNodesClientWrapper,
registration_enabled: bool,
) {
if !registration_enabled {
return;
}
let mut logged_error = false;
loop {
let interval_duration = match client.register_app_node(&id, &endpoint, &labels).await {
Ok(_) => REGISTRATION_INTERVAL_ON_SUCCESS,
Ok(_) => {
if logged_error {
tracing::info!("Successfully re-registered the app_node");
logged_error = false;
}
REGISTRATION_INTERVAL_ON_SUCCESS
}
Err(e) => {
if !logged_error {
error!("Failed to register app node: {:?}", e);
Expand Down

0 comments on commit 6aca045

Please sign in to comment.