diff --git a/control-plane/plugin/src/bin/rest-plugin/main.rs b/control-plane/plugin/src/bin/rest-plugin/main.rs index b71e5cfeb..7b195b23c 100644 --- a/control-plane/plugin/src/bin/rest-plugin/main.rs +++ b/control-plane/plugin/src/bin/rest-plugin/main.rs @@ -1,17 +1,8 @@ use clap::Parser; use openapi::tower::client::Url; -use plugin::{ - operations::{ - Cordoning, Drain, Get, GetBlockDevices, GetSnapshots, List, ListExt, Operations, - RebuildHistory, ReplicaTopology, Scale, - }, - resources::{ - blockdevice, cordon, drain, node, pool, snapshot, volume, CordonResources, DrainResources, - GetCordonArgs, GetDrainArgs, GetResources, ScaleResources, - }, - rest_wrapper::RestClient, -}; -use std::env; +use plugin::rest_wrapper::RestClient; +use snafu::ResultExt; +use std::{env, ops::Deref}; #[derive(clap::Parser, Debug)] #[clap(name = utils::package_description!(), version = utils::version_info_str!())] @@ -20,122 +11,46 @@ struct CliArgs { #[clap(global = true, long, short, default_value = "http://localhost:8081")] rest: Url, - /// The operation to be performed. - #[clap(subcommand)] - operations: Operations, - - /// The Output, viz yaml, json. - #[clap(global = true, default_value = plugin::resources::utils::OutputFormat::None.as_ref(), short, long)] - output: plugin::resources::utils::OutputFormat, - - /// Trace rest requests to the Jaeger endpoint agent. - #[clap(global = true, long, short)] - jaeger: Option, - - /// Timeout for the REST operations. - #[clap(long, short, default_value = "10s")] - timeout: humantime::Duration, + #[clap(flatten)] + args: plugin::RestCliArgs, } impl CliArgs { fn args() -> Self { CliArgs::parse() } } +impl Deref for CliArgs { + type Target = plugin::RestCliArgs; + + fn deref(&self) -> &Self::Target { + &self.args + } +} #[tokio::main] async fn main() { - plugin::init_tracing(CliArgs::args().jaeger.as_ref()); + let cli_args = CliArgs::args(); + let _trace_flush = cli_args.init_tracing(); - execute(CliArgs::args()).await; + if let Err(error) = cli_args.execute().await { + eprintln!("{error}"); + std::process::exit(1); + } +} - utils::tracing_telemetry::flush_traces(); +#[derive(Debug, snafu::Snafu)] +enum Error { + #[snafu(display("Failed to initialise the REST client. Error {source}"))] + RestClient { source: anyhow::Error }, + #[snafu(display("{source}"))] + Resources { source: plugin::resources::Error }, } -async fn execute(cli_args: CliArgs) { - // Initialise the REST client. - if let Err(e) = RestClient::init(cli_args.rest.clone(), *cli_args.timeout) { - println!("Failed to initialise the REST client. Error {e}"); +impl CliArgs { + async fn execute(&self) -> Result<(), Error> { + // todo: client connection is lazy, we should do sanity connection test here. + // Example, we can use use rest liveness probe. + RestClient::init(self.rest.clone(), *self.timeout).context(RestClientSnafu)?; + self.args.execute().await.context(ResourcesSnafu) } - - // Perform the operations based on the subcommand, with proper output format. - let result = match &cli_args.operations { - Operations::Drain(resource) => match resource { - DrainResources::Node(drain_node_args) => { - node::Node::drain( - &drain_node_args.node_id(), - drain_node_args.label(), - drain_node_args.drain_timeout(), - &cli_args.output, - ) - .await - } - }, - Operations::Get(resource) => match resource { - GetResources::Cordon(get_cordon_resource) => match get_cordon_resource { - GetCordonArgs::Node { id: node_id } => { - cordon::NodeCordon::get(node_id, &cli_args.output).await - } - GetCordonArgs::Nodes => cordon::NodeCordons::list(&cli_args.output).await, - }, - GetResources::Drain(get_drain_resource) => match get_drain_resource { - GetDrainArgs::Node { id: node_id } => { - drain::NodeDrain::get(node_id, &cli_args.output).await - } - GetDrainArgs::Nodes => drain::NodeDrains::list(&cli_args.output).await, - }, - GetResources::Volumes(vol_args) => { - volume::Volumes::list(&cli_args.output, vol_args).await - } - GetResources::Volume { id } => volume::Volume::get(id, &cli_args.output).await, - GetResources::RebuildHistory { id } => { - volume::Volume::rebuild_history(id, &cli_args.output).await - } - GetResources::VolumeReplicaTopologies(vol_args) => { - volume::Volume::topologies(&cli_args.output, vol_args).await - } - GetResources::VolumeReplicaTopology { id } => { - volume::Volume::topology(id, &cli_args.output).await - } - GetResources::Pools => pool::Pools::list(&cli_args.output).await, - GetResources::Pool { id } => pool::Pool::get(id, &cli_args.output).await, - GetResources::Nodes => node::Nodes::list(&cli_args.output).await, - GetResources::Node(args) => node::Node::get(&args.node_id(), &cli_args.output).await, - GetResources::BlockDevices(bdargs) => { - blockdevice::BlockDevice::get_blockdevices( - &bdargs.node_id(), - &bdargs.all(), - &cli_args.output, - ) - .await - } - GetResources::VolumeSnapshots(snapargs) => { - snapshot::VolumeSnapshots::get_snapshots( - &snapargs.volume(), - &snapargs.snapshot(), - &cli_args.output, - ) - .await - } - }, - Operations::Scale(resource) => match resource { - ScaleResources::Volume { id, replica_count } => { - volume::Volume::scale(id, *replica_count, &cli_args.output).await - } - }, - Operations::Cordon(resource) => match resource { - CordonResources::Node { id, label } => { - node::Node::cordon(id, label, &cli_args.output).await - } - }, - Operations::Uncordon(resource) => match resource { - CordonResources::Node { id, label } => { - node::Node::uncordon(id, label, &cli_args.output).await - } - }, - }; - - if let Err(error) = result { - eprintln!("{error}"); - std::process::exit(1); - }; } diff --git a/control-plane/plugin/src/lib.rs b/control-plane/plugin/src/lib.rs index eb793b4da..d019e72ab 100644 --- a/control-plane/plugin/src/lib.rs +++ b/control-plane/plugin/src/lib.rs @@ -3,26 +3,147 @@ extern crate prettytable; #[macro_use] extern crate lazy_static; +use crate::{ + operations::{ + Cordoning, Drain, Get, GetBlockDevices, GetSnapshots, List, ListExt, Operations, + PluginResult, RebuildHistory, ReplicaTopology, Scale, + }, + resources::{ + blockdevice, cordon, drain, node, pool, snapshot, volume, CordonResources, DrainResources, + GetCordonArgs, GetDrainArgs, GetResources, ScaleResources, + }, +}; + pub mod operations; pub mod resources; pub mod rest_wrapper; -/// Initialize tracing (including opentelemetry). -pub fn init_tracing(jaeger: Option<&String>) { - let git_version = option_env!("GIT_VERSION").unwrap_or_else(utils::raw_version_str); - let tags = - utils::tracing_telemetry::default_tracing_tags(git_version, env!("CARGO_PKG_VERSION")); - - let fmt_layer = match std::env::var("RUST_LOG") { - Ok(_) => utils::tracing_telemetry::FmtLayer::Stderr, - Err(_) => utils::tracing_telemetry::FmtLayer::None, - }; - - utils::tracing_telemetry::init_tracing_ext( - env!("CARGO_PKG_NAME"), - tags, - jaeger, - fmt_layer, - None, - ); +/// Flush traces on `Drop`. +pub struct TracingFlusher {} +impl Drop for TracingFlusher { + fn drop(&mut self) { + utils::tracing_telemetry::flush_traces(); + } +} + +#[derive(clap::Parser, Debug)] +#[clap(name = utils::package_description!(), version = utils::version_info_str!())] +pub struct RestCliArgs { + /// The operation to be performed. + #[clap(subcommand)] + pub operation: Operations, + + /// The Output, viz yaml, json. + #[clap(global = true, default_value = resources::utils::OutputFormat::None.as_ref(), short, long)] + pub output: resources::utils::OutputFormat, + + /// Trace rest requests to the Jaeger endpoint agent. + #[clap(global = true, long, short)] + pub jaeger: Option, + + /// Timeout for the REST operations. + #[clap(long, short, default_value = "10s")] + pub timeout: humantime::Duration, +} + +impl RestCliArgs { + /// Execute the operation specified in `Self::operations`, with proper output format. + pub async fn execute(&self) -> PluginResult { + match &self.operation { + Operations::Drain(resource) => match resource { + DrainResources::Node(drain_node_args) => { + node::Node::drain( + &drain_node_args.node_id(), + drain_node_args.label(), + drain_node_args.drain_timeout(), + &self.output, + ) + .await + } + }, + Operations::Get(resource) => match resource { + GetResources::Cordon(get_cordon_resource) => match get_cordon_resource { + GetCordonArgs::Node { id: node_id } => { + cordon::NodeCordon::get(node_id, &self.output).await + } + GetCordonArgs::Nodes => cordon::NodeCordons::list(&self.output).await, + }, + GetResources::Drain(get_drain_resource) => match get_drain_resource { + GetDrainArgs::Node { id: node_id } => { + drain::NodeDrain::get(node_id, &self.output).await + } + GetDrainArgs::Nodes => drain::NodeDrains::list(&self.output).await, + }, + GetResources::Volumes(vol_args) => { + volume::Volumes::list(&self.output, vol_args).await + } + GetResources::Volume { id } => volume::Volume::get(id, &self.output).await, + GetResources::RebuildHistory { id } => { + volume::Volume::rebuild_history(id, &self.output).await + } + GetResources::VolumeReplicaTopologies(vol_args) => { + volume::Volume::topologies(&self.output, vol_args).await + } + GetResources::VolumeReplicaTopology { id } => { + volume::Volume::topology(id, &self.output).await + } + GetResources::Pools => pool::Pools::list(&self.output).await, + GetResources::Pool { id } => pool::Pool::get(id, &self.output).await, + GetResources::Nodes => node::Nodes::list(&self.output).await, + GetResources::Node(args) => node::Node::get(&args.node_id(), &self.output).await, + GetResources::BlockDevices(bdargs) => { + blockdevice::BlockDevice::get_blockdevices( + &bdargs.node_id(), + &bdargs.all(), + &self.output, + ) + .await + } + GetResources::VolumeSnapshots(snapargs) => { + snapshot::VolumeSnapshots::get_snapshots( + &snapargs.volume(), + &snapargs.snapshot(), + &self.output, + ) + .await + } + }, + Operations::Scale(resource) => match resource { + ScaleResources::Volume { id, replica_count } => { + volume::Volume::scale(id, *replica_count, &self.output).await + } + }, + Operations::Cordon(resource) => match resource { + CordonResources::Node { id, label } => { + node::Node::cordon(id, label, &self.output).await + } + }, + Operations::Uncordon(resource) => match resource { + CordonResources::Node { id, label } => { + node::Node::uncordon(id, label, &self.output).await + } + }, + } + } + /// Initialize tracing (including opentelemetry). + pub fn init_tracing(&self) -> TracingFlusher { + let git_version = option_env!("GIT_VERSION").unwrap_or_else(utils::raw_version_str); + let tags = + utils::tracing_telemetry::default_tracing_tags(git_version, env!("CARGO_PKG_VERSION")); + + let fmt_layer = match std::env::var("RUST_LOG") { + Ok(_) => utils::tracing_telemetry::FmtLayer::Stderr, + Err(_) => utils::tracing_telemetry::FmtLayer::None, + }; + + utils::tracing_telemetry::init_tracing_ext( + env!("CARGO_PKG_NAME"), + tags, + self.jaeger.as_ref(), + fmt_layer, + None, + ); + + TracingFlusher {} + } } diff --git a/control-plane/plugin/src/resources/error.rs b/control-plane/plugin/src/resources/error.rs index 6add2fc5b..fd60ee084 100644 --- a/control-plane/plugin/src/resources/error.rs +++ b/control-plane/plugin/src/resources/error.rs @@ -1,12 +1,12 @@ use snafu::Snafu; -/// All errors returned when plugin command fails +/// All errors returned when resources command fails. #[derive(Debug, Snafu)] #[snafu(visibility(pub))] #[allow(clippy::enum_variant_names)] pub enum Error { /// Error when listing block devices fails. - #[snafu(display("Failed to list blockdevices for node {id} . Error {source}"))] + #[snafu(display("Failed to list blockdevices for node {id}. Error {source}"))] GetBlockDevicesError { id: String, source: openapi::tower::client::Error, diff --git a/control-plane/plugin/src/resources/mod.rs b/control-plane/plugin/src/resources/mod.rs index f811de54a..7cf863505 100644 --- a/control-plane/plugin/src/resources/mod.rs +++ b/control-plane/plugin/src/resources/mod.rs @@ -15,6 +15,7 @@ pub mod snapshot; pub mod utils; pub mod volume; +pub use error::Error; pub type VolumeId = openapi::apis::Uuid; pub type SnapshotId = openapi::apis::Uuid; pub type ReplicaCount = u8; @@ -101,6 +102,6 @@ pub enum GetDrainArgs { Nodes, } -/// Tabular Output Tests +/// Tabular Output Tests. #[cfg(test)] mod tests;