Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(app node): register csi nodes to control plane #730

Merged
merged 4 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions control-plane/csi-driver/src/bin/controller/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::CsiControllerConfig;
use std::collections::HashMap;
use stor_port::types::v0::openapi::{
clients,
clients::tower::StatusCode,
Expand All @@ -12,6 +11,7 @@ use stor_port::types::v0::openapi::{

use anyhow::{anyhow, Result};
use once_cell::sync::OnceCell;
use std::collections::HashMap;
use tracing::{debug, info, instrument};

#[derive(Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -93,18 +93,18 @@ impl From<clients::tower::Error<RestJsonError>> for ApiClientError {
}
}

static REST_CLIENT: OnceCell<IoEngineApiClient> = OnceCell::new();
static REST_CLIENT: OnceCell<RestApiClient> = OnceCell::new();

/// Single instance API client for accessing REST API gateway.
/// Encapsulates communication with REST API by exposing a set of
/// high-level API functions, which perform (de)serialization
/// of API request/response objects.
#[derive(Debug)]
pub struct IoEngineApiClient {
pub struct RestApiClient {
rest_client: clients::tower::ApiClient,
}

impl IoEngineApiClient {
impl RestApiClient {
/// Initialize API client instance. Must be called prior to
/// obtaining the client instance.
pub(crate) fn initialize() -> Result<()> {
Expand Down Expand Up @@ -143,7 +143,7 @@ impl IoEngineApiClient {

/// Obtain client instance. Panics if called before the client
/// has been initialized.
pub(crate) fn get_client() -> &'static IoEngineApiClient {
pub(crate) fn get_client() -> &'static RestApiClient {
REST_CLIENT.get().expect("Rest client is not initialized")
}
}
Expand All @@ -154,7 +154,7 @@ pub(crate) enum ListToken {
Number(isize),
}

impl IoEngineApiClient {
impl RestApiClient {
/// List all nodes available in IoEngine cluster.
pub(crate) async fn list_nodes(&self) -> Result<Vec<Node>, ApiClientError> {
let response = self.rest_client.nodes_api().get_nodes(None).await?;
Expand Down
114 changes: 54 additions & 60 deletions control-plane/csi-driver/src/bin/controller/controller.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{
client::ListToken, ApiClientError, CreateVolumeTopology, CsiControllerConfig, IoEngineApiClient,
client::ListToken, ApiClientError, CreateVolumeTopology, CsiControllerConfig, RestApiClient,
};

use csi_driver::context::{CreateParams, PublishParams};
use rpc::csi::{volume_content_source::Type, Topology as CsiTopology, *};
use stor_port::types::v0::openapi::{
Expand Down Expand Up @@ -289,7 +288,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
let mut volume_context = args.parameters.clone();

// First check if the volume already exists.
match IoEngineApiClient::get_client()
match RestApiClient::get_client()
.get_volume_for_create(&parsed_vol_uuid)
.await
{
Expand Down Expand Up @@ -319,7 +318,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {

let volume = match volume_content_source {
Some(snapshot_uuid) => {
IoEngineApiClient::get_client()
RestApiClient::get_client()
.create_snapshot_volume(
&parsed_vol_uuid,
&snapshot_uuid,
Expand All @@ -333,7 +332,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
.await?
}
None => {
IoEngineApiClient::get_client()
RestApiClient::get_client()
.create_volume(
&parsed_vol_uuid,
replica_count,
Expand Down Expand Up @@ -391,7 +390,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
Status::invalid_argument(format!("Malformed volume UUID: {}", args.volume_id))
})?;
let _guard = csi_driver::limiter::VolumeOpGuard::new(volume_uuid)?;
IoEngineApiClient::get_client()
RestApiClient::get_client()
.delete_volume(&volume_uuid)
.await
.map_err(|e| {
Expand Down Expand Up @@ -440,9 +439,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
}

// Check if the volume is already published.
let volume = IoEngineApiClient::get_client()
.get_volume(&volume_id)
.await?;
let volume = RestApiClient::get_client().get_volume(&volume_id).await?;

let params = PublishParams::try_from(&args.volume_context)?;

Expand Down Expand Up @@ -482,52 +479,52 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
return Err(Status::internal(m));
}
},
_ => {
_ => {

// Check for node being cordoned.
fn cordon_check(spec: Option<&NodeSpec>) -> bool {
if let Some(spec) = spec {
return spec.cordondrainstate.is_some()
// Check for node being cordoned.
fn cordon_check(spec: Option<&NodeSpec>) -> bool {
if let Some(spec) = spec {
return spec.cordondrainstate.is_some()
}
false
}
false
}

// if the csi-node happens to be a data-plane node, use that for nexus creation, otherwise
// let the control-plane select the target node.
let target_node = match IoEngineApiClient::get_client().get_node(&node_id).await {
Err(ApiClientError::ResourceNotExists(_)) => Ok(None),
Err(error) => Err(error),
// When nodes are not online for any reason (eg: io-engine no longer runs) on said node,
// then let the control-plane decide where to place the target. Node should not be cordoned.
Ok(node) if node.state.as_ref().map(|n| n.status).unwrap_or(NodeStatus::Unknown) != NodeStatus::Online || cordon_check(node.spec.as_ref()) => {
Ok(None)
},
// For 1-replica volumes, don't pre-select the target node. This will allow the
// control-plane to pin the target to the replica node.
Ok(_) if volume.spec.num_replicas == 1 => Ok(None),
Ok(_) => Ok(Some(node_id.as_str())),
}?;

// Volume is not published.
let v = IoEngineApiClient::get_client()
.publish_volume(&volume_id, target_node, protocol, args.node_id.clone(), &publish_context)
.await?;

if let Some((node, uri)) = get_volume_share_location(&v) {
debug!(
// if the csi-node happens to be a data-plane node, use that for nexus creation, otherwise
// let the control-plane select the target node.
let target_node = match RestApiClient::get_client().get_node(&node_id).await {
Err(ApiClientError::ResourceNotExists(_)) => Ok(None),
Err(error) => Err(error),
// When nodes are not online for any reason (eg: io-engine no longer runs) on said node,
// then let the control-plane decide where to place the target. Node should not be cordoned.
Ok(node) if node.state.as_ref().map(|n| n.status).unwrap_or(NodeStatus::Unknown) != NodeStatus::Online || cordon_check(node.spec.as_ref()) => {
Ok(None)
},
// For 1-replica volumes, don't pre-select the target node. This will allow the
// control-plane to pin the target to the replica node.
Ok(_) if volume.spec.num_replicas == 1 => Ok(None),
Ok(_) => Ok(Some(node_id.as_str())),
}?;

// Volume is not published.
let v = RestApiClient::get_client()
.publish_volume(&volume_id, target_node, protocol, args.node_id.clone(), &publish_context)
.await?;

if let Some((node, uri)) = get_volume_share_location(&v) {
debug!(
"Volume {} successfully published on node {} via {}",
volume_id, node, uri
);
uri
} else {
let m = format!(
"Volume {volume_id} has been successfully published but URI is not available"
);
error!("{}", m);
return Err(Status::internal(m));
uri
} else {
let m = format!(
"Volume {volume_id} has been successfully published but URI is not available"
);
error!("{}", m);
return Err(Status::internal(m));
}
}
}
};
};

publish_context.insert("uri".to_string(), uri);

Expand All @@ -553,10 +550,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
})?;
let _guard = csi_driver::limiter::VolumeOpGuard::new(volume_uuid)?;
// Check if target volume exists.
let volume = match IoEngineApiClient::get_client()
.get_volume(&volume_uuid)
.await
{
let volume = match RestApiClient::get_client().get_volume(&volume_uuid).await {
Ok(volume) => volume,
Err(ApiClientError::ResourceNotExists { .. }) => {
debug!("Volume {} does not exist, not unpublishing", args.volume_id);
Expand All @@ -575,7 +569,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
}

// Do forced volume upublish as Kubernetes already detached the volume.
IoEngineApiClient::get_client()
RestApiClient::get_client()
.unpublish_volume(&volume_uuid, true)
.await
.map_err(|e| {
Expand All @@ -602,7 +596,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
Status::invalid_argument(format!("Malformed volume UUID: {}", args.volume_id))
})?;
let _guard = csi_driver::limiter::VolumeOpGuard::new(volume_uuid)?;
let _volume = IoEngineApiClient::get_client()
let _volume = RestApiClient::get_client()
.get_volume(&volume_uuid)
.await
.map_err(|_e| Status::unimplemented("Not implemented"))?;
Expand Down Expand Up @@ -656,7 +650,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {

let vt_mapper = VolumeTopologyMapper::init().await?;

let volumes = IoEngineApiClient::get_client()
let volumes = RestApiClient::get_client()
.list_volumes(max_entries, ListToken::String(args.starting_token))
.await
.map_err(|e| Status::internal(format!("Failed to list volumes, error = {e:?}")))?;
Expand Down Expand Up @@ -708,7 +702,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {

let pools: Vec<Pool> = if let Some(node) = node {
debug!("Calculating pool capacity for node {}", node);
IoEngineApiClient::get_client()
RestApiClient::get_client()
.get_node_pools(node)
.await
.map_err(|e| {
Expand All @@ -718,7 +712,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
})?
} else {
debug!("Calculating overall pool capacity");
IoEngineApiClient::get_client()
RestApiClient::get_client()
.list_pools()
.await
.map_err(|e| {
Expand Down Expand Up @@ -805,7 +799,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
Status::invalid_argument(format!("Malformed snapshot ID: {}", request.name))
})?;

let snapshot = IoEngineApiClient::get_client()
let snapshot = RestApiClient::get_client()
.create_volume_snapshot(&volume_uuid, &snap_uuid)
.await
.map_err(|error| match error {
Expand All @@ -831,7 +825,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
Status::invalid_argument(format!("Malformed snapshot UUID: {}", args.snapshot_id))
})?;

IoEngineApiClient::get_client()
RestApiClient::get_client()
.delete_volume_snapshot(&snapshot_uuid)
.await
.map_err(|e| {
Expand Down Expand Up @@ -865,7 +859,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
return Err(Status::invalid_argument("max_entries can't be negative"));
}

let snapshots = IoEngineApiClient::get_client()
let snapshots = RestApiClient::get_client()
.list_volume_snapshots(vol_uuid, snap_uuid, max_entries, request.starting_token)
.await?;

Expand Down
5 changes: 3 additions & 2 deletions control-plane/csi-driver/src/bin/controller/identity.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{ApiClientError, IoEngineApiClient};
use crate::{ApiClientError, RestApiClient};
use csi_driver::CSI_PLUGIN_NAME;
use rpc::csi::*;

use std::collections::HashMap;
use tonic::{Request, Response, Status};
use tracing::{debug, error, instrument};
Expand Down Expand Up @@ -65,7 +66,7 @@ impl rpc::csi::identity_server::Identity for CsiIdentitySvc {
// communicates to the Container Orchestrator that the plugin is not yet initialised but
// should not be restarted. See the CSI spec:
// https://github.com/container-storage-interface/spec/blob/5b0d4540158a260cb3347ef1c87ede8600afb9bf/csi.proto#L252-L256
let ready = match IoEngineApiClient::get_client().list_nodes().await {
let ready = match RestApiClient::get_client().list_nodes().await {
Ok(_) => true,
Err(ApiClientError::ServerCommunication { .. }) => {
error!("Failed to access REST API gateway, CSI Controller plugin is not ready",);
Expand Down
13 changes: 6 additions & 7 deletions control-plane/csi-driver/src/bin/controller/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use tracing::info;

use clap::{Arg, ArgMatches};
use client::{ApiClientError, CreateVolumeTopology, RestApiClient};
use config::CsiControllerConfig;
mod client;
mod config;
mod controller;
mod identity;
mod pvwatcher;
mod server;

use client::{ApiClientError, CreateVolumeTopology, IoEngineApiClient};
use config::CsiControllerConfig;
use clap::{Arg, ArgMatches};
use tracing::info;

const CSI_SOCKET: &str = "/var/tmp/csi.sock";
const CONCURRENCY_LIMIT: usize = 10;
Expand All @@ -18,7 +17,7 @@ const REST_TIMEOUT: &str = "30s";
/// Initialize all components before starting the CSI controller.
fn initialize_controller(args: &ArgMatches) -> anyhow::Result<()> {
CsiControllerConfig::initialize(args)?;
IoEngineApiClient::initialize()
RestApiClient::initialize()
.map_err(|error| anyhow::anyhow!("Failed to initialize API client, error = {error}"))?;
Ok(())
}
Expand All @@ -27,7 +26,7 @@ fn initialize_controller(args: &ArgMatches) -> anyhow::Result<()> {
async fn ping_rest_api() {
info!("Checking REST API endpoint accessibility ...");

match IoEngineApiClient::get_client().list_nodes().await {
match RestApiClient::get_client().list_nodes().await {
Err(error) => tracing::error!(?error, "REST API endpoint is not accessible"),
Ok(nodes) => {
let names: Vec<String> = nodes.into_iter().map(|n| n.id).collect();
Expand Down
4 changes: 2 additions & 2 deletions control-plane/csi-driver/src/bin/controller/pvwatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tracing::{debug, error, info};
pub(crate) struct PvGarbageCollector {
pub(crate) pv_handle: Api<PersistentVolume>,
orphan_period: Option<humantime::Duration>,
rest_client: &'static crate::IoEngineApiClient,
rest_client: &'static crate::RestApiClient,
}

/// Methods implemented by PV Garbage Collector
Expand All @@ -24,7 +24,7 @@ impl PvGarbageCollector {
Ok(Self {
pv_handle: Api::<PersistentVolume>::all(client),
orphan_period,
rest_client: crate::IoEngineApiClient::get_client(),
rest_client: crate::RestApiClient::get_client(),
})
}
/// Starts watching PV events.
Expand Down
20 changes: 9 additions & 11 deletions control-plane/csi-driver/src/bin/controller/server.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use futures::TryFutureExt;
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::UnixListener,
};
use tonic::transport::{server::Connected, Server};
use tracing::{debug, error, info};
use crate::{controller::CsiControllerSvc, identity::CsiIdentitySvc};
use rpc::csi::{controller_server::ControllerServer, identity_server::IdentityServer};

use futures::TryFutureExt;
use std::{
fs,
io::ErrorKind,
Expand All @@ -14,10 +10,12 @@ use std::{
sync::Arc,
task::{Context, Poll},
};

use rpc::csi::{controller_server::ControllerServer, identity_server::IdentityServer};

use crate::{controller::CsiControllerSvc, identity::CsiIdentitySvc};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::UnixListener,
};
use tonic::transport::{server::Connected, Server};
use tracing::{debug, error, info};

#[derive(Debug)]
struct UnixStream(tokio::net::UnixStream);
Expand Down
Loading
Loading