Skip to content

Commit

Permalink
refactor(kubectl-plugin): tidy up error handling and command execution
Browse files Browse the repository at this point in the history
Separate command execution from error handling.
Make use of common cli args and execution trait to remove redundant code.
Remove excessing error mapping code.
todo: make execution trait even more generic (in the rest-plugin crate side)
Currently the cli args for supportability rely on the fact that clap global
allows us to specify "duplicates". I'm not sure if this is a good idea..
todo: Build dump cli args explicitly?

Signed-off-by: Tiago Castro <[email protected]>
  • Loading branch information
tiagolobocastro committed Jan 24, 2024
1 parent af3dd6b commit ab912c1
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 416 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions k8s/plugin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ tokio = { version = "1.33.0" }
anyhow = "1.0.75"
clap = { version = "4.4.6", features = ["color", "derive"] }
humantime = "2.1.0"
async-trait = "0.1.73"
# Tracing
opentelemetry = { version = "0.20.0", features = ["rt-tokio-current-thread"] }
shutdown = { path = "../../dependencies/control-plane/utils/shutdown" }
274 changes: 49 additions & 225 deletions k8s/plugin/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,266 +1,89 @@
use crate::resources::GetResourcesK8s;
use anyhow::Result;
use clap::Parser;
use openapi::tower::client::Url;
use opentelemetry::global;
use plugin::{
operations::{
Cordoning, Drain, Get, GetBlockDevices, GetSnapshots, List, ListExt, RebuildHistory,
ReplicaTopology, Scale,
},
resources::{
blockdevice, cordon, drain, node, pool, snapshot, volume, CordonResources, DrainResources,
GetCordonArgs, GetDrainArgs, GetResources, ScaleResources,
},
rest_wrapper::RestClient,
};
use plugin::{rest_wrapper::RestClient, ExecuteOperation};
use resources::Operations;
use upgrade::plugin::{preflight_validations, upgrade::DeleteResources};

use std::{env, path::PathBuf};
use std::{env, ops::Deref};

mod resources;

#[derive(Parser, Debug)]
#[clap(name = utils::package_description!(), version = utils::version_info_str!())]
#[group(skip)]
struct CliArgs {
/// The rest endpoint to connect to.
#[clap(global = true, long, short)]
rest: Option<Url>,

/// Path to kubeconfig file.
#[clap(global = true, long, short = 'k')]
kube_config_path: Option<PathBuf>,

/// The operation to be performed.
#[clap(subcommand)]
operations: Operations,

/// The Output, viz yaml, json.
#[clap(global = true, default_value = plugin::resources::utils::OutputFormat::None.as_ref(), short, long)]
output: plugin::resources::utils::OutputFormat,

/// Trace rest requests to the Jaeger endpoint agent.
#[clap(global = true, long, short)]
jaeger: Option<String>,

/// Timeout for the REST operations.
#[clap(long, short, default_value = "10s")]
timeout: humantime::Duration,

/// Kubernetes namespace of mayastor service
#[clap(global = true, long, short = 'n', default_value = "mayastor")]
namespace: String,
#[clap(flatten)]
args: resources::CliArgs,
}

impl CliArgs {
fn args() -> Self {
CliArgs::parse()
}
}
impl Deref for CliArgs {
type Target = plugin::CliArgs;

#[tokio::main]
async fn main() {
plugin::init_tracing(CliArgs::args().jaeger.as_ref());

execute(CliArgs::args()).await;

global::shutdown_tracer_provider();
}

async fn execute(cli_args: CliArgs) {
// Initialise the REST client.
if let Err(e) = init_rest(&cli_args).await {
eprintln!("Failed to initialise the REST client. Error {e}");
std::process::exit(1);
fn deref(&self) -> &Self::Target {
&self.args
}
}

// Perform the operations based on the subcommand, with proper output format.
let fut = async move {
let result: std::result::Result<(), Error> = match cli_args.operations {
Operations::Get(resource) => match resource {
GetResourcesK8s::Rest(resource) => match resource {
GetResources::Cordon(get_cordon_resource) => match get_cordon_resource {
GetCordonArgs::Node { id: node_id } => {
cordon::NodeCordon::get(&node_id, &cli_args.output)
.await
.map_err(|e| e.into())
}
GetCordonArgs::Nodes => cordon::NodeCordons::list(&cli_args.output)
.await
.map_err(|e| e.into()),
},
GetResources::Drain(get_drain_resource) => match get_drain_resource {
GetDrainArgs::Node { id: node_id } => {
drain::NodeDrain::get(&node_id, &cli_args.output)
.await
.map_err(|e| e.into())
}
GetDrainArgs::Nodes => drain::NodeDrains::list(&cli_args.output)
.await
.map_err(|e| e.into()),
},
GetResources::Volumes(volume_args) => {
volume::Volumes::list(&cli_args.output, &volume_args)
.await
.map_err(|e| e.into())
}
GetResources::Volume { id } => volume::Volume::get(&id, &cli_args.output)
.await
.map_err(|e| e.into()),
GetResources::VolumeReplicaTopologies(volume_args) => {
volume::Volume::topologies(&cli_args.output, &volume_args)
.await
.map_err(|e| e.into())
}
GetResources::VolumeReplicaTopology { id } => {
volume::Volume::topology(&id, &cli_args.output)
.await
.map_err(|e| e.into())
}
GetResources::Pools => pool::Pools::list(&cli_args.output)
.await
.map_err(|e| e.into()),
GetResources::Pool { id } => pool::Pool::get(&id, &cli_args.output)
.await
.map_err(|e| e.into()),
GetResources::Nodes => node::Nodes::list(&cli_args.output)
.await
.map_err(|e| e.into()),
GetResources::Node(args) => node::Node::get(&args.node_id(), &cli_args.output)
.await
.map_err(|e| e.into()),
GetResources::BlockDevices(bdargs) => {
blockdevice::BlockDevice::get_blockdevices(
&bdargs.node_id(),
&bdargs.all(),
&cli_args.output,
)
.await
.map_err(|e| e.into())
}
GetResources::VolumeSnapshots(snapargs) => {
snapshot::VolumeSnapshots::get_snapshots(
&snapargs.volume(),
&snapargs.snapshot(),
&cli_args.output,
)
.await
.map_err(|e| e.into())
}
GetResources::RebuildHistory { id } => {
volume::Volume::rebuild_history(&id, &cli_args.output)
.await
.map_err(|e| e.into())
}
},
GetResourcesK8s::UpgradeStatus(resources) => resources
.get_upgrade(&cli_args.namespace)
.await
.map_err(|e| e.into()),
},
Operations::Drain(resource) => match resource {
DrainResources::Node(drain_node_args) => node::Node::drain(
&drain_node_args.node_id(),
drain_node_args.label(),
drain_node_args.drain_timeout(),
&cli_args.output,
)
.await
.map_err(|e| e.into()),
},
Operations::Scale(resource) => match resource {
ScaleResources::Volume { id, replica_count } => {
volume::Volume::scale(&id, replica_count, &cli_args.output)
.await
.map_err(|e| e.into())
}
},
Operations::Cordon(resource) => match resource {
CordonResources::Node { id, label } => {
node::Node::cordon(&id, &label, &cli_args.output)
.await
.map_err(|e| e.into())
}
},
Operations::Uncordon(resource) => match resource {
CordonResources::Node { id, label } => {
node::Node::uncordon(&id, &label, &cli_args.output)
.await
.map_err(|e| e.into())
}
},
Operations::Dump(resources) => {
resources
.dump(cli_args.kube_config_path)
.await
.map_err(|e| {
println!("Partially collected dump information: ");
e.into()
})
#[tokio::main]
async fn main() {
let cli_args = CliArgs::args();
let _tracer_flusher = cli_args.init_tracing();

if let Err(error) = cli_args.execute().await {
let mut exit_code = 1;
match error {
Error::RestPlugin(error) => eprintln!("{error}"),
Error::RestClient(error) => {
eprintln!("Failed to initialise the REST client. Error {error}")
}
Operations::Upgrade(resources) => {
match preflight_validations::preflight_check(
&cli_args.namespace,
cli_args.kube_config_path.clone(),
cli_args.timeout,
&resources,
)
.await
{
Ok(_) => {
if resources.dry_run {
resources
.dummy_apply(&cli_args.namespace)
.await
.map_err(|e| e.into())
} else {
resources
.apply(&cli_args.namespace)
.await
.map_err(|e| e.into())
}
}
Err(e) => Err(e.into()),
}
Error::Upgrade(error) => {
eprintln!("{error}");
exit_code = error.into();
}
Error::Generic(error) => eprintln!("{error}"),
}
std::process::exit(exit_code);
}
}

Operations::Delete(resource) => match resource {
DeleteResources::Upgrade(res) => {
res.delete(&cli_args.namespace).await.map_err(|e| e.into())
}
},
};
impl CliArgs {
async fn execute(self) -> Result<(), Error> {
// Initialise the REST client.
init_rest(&self).await?;

if let Err(error) = result {
let mut exit_code = 1;
match error {
Error::RestPlugin(err) => eprintln!("{err}"),
Error::Upgrade(err) => {
eprintln!("{err}");
exit_code = err.into();
}
Error::Generic(err) => eprintln!("{err}"),
tokio::select! {
shutdown = shutdown::Shutdown::wait_sig() => {
Err(anyhow::anyhow!("Interrupted by {shutdown:?}").into())
},
done = self.operations.execute(&self.args) => {
done
}
std::process::exit(exit_code);
};
};

tokio::select! {
_shutdown = shutdown::Shutdown::wait_sig() => {},
_done = fut => {}
}
}
}

/// Initialise the REST client.
async fn init_rest(args: &CliArgs) -> Result<()> {
async fn init_rest(cli_args: &CliArgs) -> Result<(), Error> {
// Use the supplied URL if there is one otherwise obtain one from the kubeconfig file.
match args.rest.clone() {
Some(url) => RestClient::init(url, *args.timeout),
match cli_args.rest.clone() {
Some(url) => RestClient::init(url, *cli_args.timeout).map_err(Error::RestClient),
None => {
let config = kube_proxy::ConfigBuilder::default_api_rest()
.with_kube_config(args.kube_config_path.clone())
.with_timeout(*args.timeout)
.with_target_mod(|t| t.with_namespace(&args.namespace))
.with_kube_config(cli_args.args.kube_config_path.clone())
.with_timeout(*cli_args.timeout)
.with_target_mod(|t| t.with_namespace(&cli_args.args.namespace))
.build()
.await?;
RestClient::init_with_config(config)?;
Expand All @@ -269,9 +92,10 @@ async fn init_rest(args: &CliArgs) -> Result<()> {
}
}

enum Error {
pub enum Error {
Upgrade(upgrade::error::Error),
RestPlugin(plugin::resources::error::Error),
RestClient(anyhow::Error),
Generic(anyhow::Error),
}

Expand Down
Loading

0 comments on commit ab912c1

Please sign in to comment.