From ab912c1e9658eac80aa578733d3d646f097093dd Mon Sep 17 00:00:00 2001 From: Tiago Castro Date: Tue, 23 Jan 2024 11:54:56 +0000 Subject: [PATCH] refactor(kubectl-plugin): tidy up error handling and command execution Separate command execution from error handling. Make use of common cli args and execution trait to remove redundant code. Remove excessing error mapping code. todo: make execution trait even more generic (in the rest-plugin crate side) Currently the cli args for supportability rely on the fact that clap global allows us to specify "duplicates". I'm not sure if this is a good idea.. todo: Build dump cli args explicitly? Signed-off-by: Tiago Castro --- Cargo.lock | 2 + dependencies/control-plane | 2 +- k8s/plugin/Cargo.toml | 1 + k8s/plugin/src/main.rs | 274 ++++------------------ k8s/plugin/src/resources/mod.rs | 79 ++++++- k8s/supportability/Cargo.toml | 1 + k8s/supportability/src/lib.rs | 376 +++++++++++++++--------------- k8s/upgrade/src/plugin/upgrade.rs | 9 + 8 files changed, 328 insertions(+), 416 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6ff0fcc69..60bea3533 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1830,6 +1830,7 @@ name = "kubectl-plugin" version = "1.0.0" dependencies = [ "anyhow", + "async-trait", "clap", "console-logger", "humantime", @@ -3371,6 +3372,7 @@ dependencies = [ "openapi", "platform", "pstor", + "rest-plugin", "schemars", "serde", "serde_json", diff --git a/dependencies/control-plane b/dependencies/control-plane index 6869243c9..3ea84a8c9 160000 --- a/dependencies/control-plane +++ b/dependencies/control-plane @@ -1 +1 @@ -Subproject commit 6869243c9f17783e7b8bbafe9218975cb18c692e +Subproject commit 3ea84a8c9c64a27a317293e9793405e41a659b7e diff --git a/k8s/plugin/Cargo.toml b/k8s/plugin/Cargo.toml index 3aae17fe9..7e9240eee 100644 --- a/k8s/plugin/Cargo.toml +++ b/k8s/plugin/Cargo.toml @@ -28,6 +28,7 @@ tokio = { version = "1.33.0" } anyhow = "1.0.75" clap = { version = "4.4.6", features = ["color", "derive"] } humantime = "2.1.0" +async-trait = "0.1.73" # Tracing opentelemetry = { version = "0.20.0", features = ["rt-tokio-current-thread"] } shutdown = { path = "../../dependencies/control-plane/utils/shutdown" } diff --git a/k8s/plugin/src/main.rs b/k8s/plugin/src/main.rs index 86db7e2f6..ef59040c9 100644 --- a/k8s/plugin/src/main.rs +++ b/k8s/plugin/src/main.rs @@ -1,266 +1,89 @@ -use crate::resources::GetResourcesK8s; -use anyhow::Result; use clap::Parser; use openapi::tower::client::Url; -use opentelemetry::global; -use plugin::{ - operations::{ - Cordoning, Drain, Get, GetBlockDevices, GetSnapshots, List, ListExt, RebuildHistory, - ReplicaTopology, Scale, - }, - resources::{ - blockdevice, cordon, drain, node, pool, snapshot, volume, CordonResources, DrainResources, - GetCordonArgs, GetDrainArgs, GetResources, ScaleResources, - }, - rest_wrapper::RestClient, -}; +use plugin::{rest_wrapper::RestClient, ExecuteOperation}; use resources::Operations; -use upgrade::plugin::{preflight_validations, upgrade::DeleteResources}; -use std::{env, path::PathBuf}; +use std::{env, ops::Deref}; mod resources; #[derive(Parser, Debug)] #[clap(name = utils::package_description!(), version = utils::version_info_str!())] +#[group(skip)] struct CliArgs { /// The rest endpoint to connect to. #[clap(global = true, long, short)] rest: Option, - /// Path to kubeconfig file. - #[clap(global = true, long, short = 'k')] - kube_config_path: Option, - /// The operation to be performed. #[clap(subcommand)] operations: Operations, - /// The Output, viz yaml, json. - #[clap(global = true, default_value = plugin::resources::utils::OutputFormat::None.as_ref(), short, long)] - output: plugin::resources::utils::OutputFormat, - - /// Trace rest requests to the Jaeger endpoint agent. - #[clap(global = true, long, short)] - jaeger: Option, - - /// Timeout for the REST operations. - #[clap(long, short, default_value = "10s")] - timeout: humantime::Duration, - - /// Kubernetes namespace of mayastor service - #[clap(global = true, long, short = 'n', default_value = "mayastor")] - namespace: String, + #[clap(flatten)] + args: resources::CliArgs, } + impl CliArgs { fn args() -> Self { CliArgs::parse() } } +impl Deref for CliArgs { + type Target = plugin::CliArgs; -#[tokio::main] -async fn main() { - plugin::init_tracing(CliArgs::args().jaeger.as_ref()); - - execute(CliArgs::args()).await; - - global::shutdown_tracer_provider(); -} - -async fn execute(cli_args: CliArgs) { - // Initialise the REST client. - if let Err(e) = init_rest(&cli_args).await { - eprintln!("Failed to initialise the REST client. Error {e}"); - std::process::exit(1); + fn deref(&self) -> &Self::Target { + &self.args } +} - // Perform the operations based on the subcommand, with proper output format. - let fut = async move { - let result: std::result::Result<(), Error> = match cli_args.operations { - Operations::Get(resource) => match resource { - GetResourcesK8s::Rest(resource) => match resource { - GetResources::Cordon(get_cordon_resource) => match get_cordon_resource { - GetCordonArgs::Node { id: node_id } => { - cordon::NodeCordon::get(&node_id, &cli_args.output) - .await - .map_err(|e| e.into()) - } - GetCordonArgs::Nodes => cordon::NodeCordons::list(&cli_args.output) - .await - .map_err(|e| e.into()), - }, - GetResources::Drain(get_drain_resource) => match get_drain_resource { - GetDrainArgs::Node { id: node_id } => { - drain::NodeDrain::get(&node_id, &cli_args.output) - .await - .map_err(|e| e.into()) - } - GetDrainArgs::Nodes => drain::NodeDrains::list(&cli_args.output) - .await - .map_err(|e| e.into()), - }, - GetResources::Volumes(volume_args) => { - volume::Volumes::list(&cli_args.output, &volume_args) - .await - .map_err(|e| e.into()) - } - GetResources::Volume { id } => volume::Volume::get(&id, &cli_args.output) - .await - .map_err(|e| e.into()), - GetResources::VolumeReplicaTopologies(volume_args) => { - volume::Volume::topologies(&cli_args.output, &volume_args) - .await - .map_err(|e| e.into()) - } - GetResources::VolumeReplicaTopology { id } => { - volume::Volume::topology(&id, &cli_args.output) - .await - .map_err(|e| e.into()) - } - GetResources::Pools => pool::Pools::list(&cli_args.output) - .await - .map_err(|e| e.into()), - GetResources::Pool { id } => pool::Pool::get(&id, &cli_args.output) - .await - .map_err(|e| e.into()), - GetResources::Nodes => node::Nodes::list(&cli_args.output) - .await - .map_err(|e| e.into()), - GetResources::Node(args) => node::Node::get(&args.node_id(), &cli_args.output) - .await - .map_err(|e| e.into()), - GetResources::BlockDevices(bdargs) => { - blockdevice::BlockDevice::get_blockdevices( - &bdargs.node_id(), - &bdargs.all(), - &cli_args.output, - ) - .await - .map_err(|e| e.into()) - } - GetResources::VolumeSnapshots(snapargs) => { - snapshot::VolumeSnapshots::get_snapshots( - &snapargs.volume(), - &snapargs.snapshot(), - &cli_args.output, - ) - .await - .map_err(|e| e.into()) - } - GetResources::RebuildHistory { id } => { - volume::Volume::rebuild_history(&id, &cli_args.output) - .await - .map_err(|e| e.into()) - } - }, - GetResourcesK8s::UpgradeStatus(resources) => resources - .get_upgrade(&cli_args.namespace) - .await - .map_err(|e| e.into()), - }, - Operations::Drain(resource) => match resource { - DrainResources::Node(drain_node_args) => node::Node::drain( - &drain_node_args.node_id(), - drain_node_args.label(), - drain_node_args.drain_timeout(), - &cli_args.output, - ) - .await - .map_err(|e| e.into()), - }, - Operations::Scale(resource) => match resource { - ScaleResources::Volume { id, replica_count } => { - volume::Volume::scale(&id, replica_count, &cli_args.output) - .await - .map_err(|e| e.into()) - } - }, - Operations::Cordon(resource) => match resource { - CordonResources::Node { id, label } => { - node::Node::cordon(&id, &label, &cli_args.output) - .await - .map_err(|e| e.into()) - } - }, - Operations::Uncordon(resource) => match resource { - CordonResources::Node { id, label } => { - node::Node::uncordon(&id, &label, &cli_args.output) - .await - .map_err(|e| e.into()) - } - }, - Operations::Dump(resources) => { - resources - .dump(cli_args.kube_config_path) - .await - .map_err(|e| { - println!("Partially collected dump information: "); - e.into() - }) +#[tokio::main] +async fn main() { + let cli_args = CliArgs::args(); + let _tracer_flusher = cli_args.init_tracing(); + + if let Err(error) = cli_args.execute().await { + let mut exit_code = 1; + match error { + Error::RestPlugin(error) => eprintln!("{error}"), + Error::RestClient(error) => { + eprintln!("Failed to initialise the REST client. Error {error}") } - Operations::Upgrade(resources) => { - match preflight_validations::preflight_check( - &cli_args.namespace, - cli_args.kube_config_path.clone(), - cli_args.timeout, - &resources, - ) - .await - { - Ok(_) => { - if resources.dry_run { - resources - .dummy_apply(&cli_args.namespace) - .await - .map_err(|e| e.into()) - } else { - resources - .apply(&cli_args.namespace) - .await - .map_err(|e| e.into()) - } - } - Err(e) => Err(e.into()), - } + Error::Upgrade(error) => { + eprintln!("{error}"); + exit_code = error.into(); } + Error::Generic(error) => eprintln!("{error}"), + } + std::process::exit(exit_code); + } +} - Operations::Delete(resource) => match resource { - DeleteResources::Upgrade(res) => { - res.delete(&cli_args.namespace).await.map_err(|e| e.into()) - } - }, - }; +impl CliArgs { + async fn execute(self) -> Result<(), Error> { + // Initialise the REST client. + init_rest(&self).await?; - if let Err(error) = result { - let mut exit_code = 1; - match error { - Error::RestPlugin(err) => eprintln!("{err}"), - Error::Upgrade(err) => { - eprintln!("{err}"); - exit_code = err.into(); - } - Error::Generic(err) => eprintln!("{err}"), + tokio::select! { + shutdown = shutdown::Shutdown::wait_sig() => { + Err(anyhow::anyhow!("Interrupted by {shutdown:?}").into()) + }, + done = self.operations.execute(&self.args) => { + done } - std::process::exit(exit_code); - }; - }; - - tokio::select! { - _shutdown = shutdown::Shutdown::wait_sig() => {}, - _done = fut => {} + } } } /// Initialise the REST client. -async fn init_rest(args: &CliArgs) -> Result<()> { +async fn init_rest(cli_args: &CliArgs) -> Result<(), Error> { // Use the supplied URL if there is one otherwise obtain one from the kubeconfig file. - match args.rest.clone() { - Some(url) => RestClient::init(url, *args.timeout), + match cli_args.rest.clone() { + Some(url) => RestClient::init(url, *cli_args.timeout).map_err(Error::RestClient), None => { let config = kube_proxy::ConfigBuilder::default_api_rest() - .with_kube_config(args.kube_config_path.clone()) - .with_timeout(*args.timeout) - .with_target_mod(|t| t.with_namespace(&args.namespace)) + .with_kube_config(cli_args.args.kube_config_path.clone()) + .with_timeout(*cli_args.timeout) + .with_target_mod(|t| t.with_namespace(&cli_args.args.namespace)) .build() .await?; RestClient::init_with_config(config)?; @@ -269,9 +92,10 @@ async fn init_rest(args: &CliArgs) -> Result<()> { } } -enum Error { +pub enum Error { Upgrade(upgrade::error::Error), RestPlugin(plugin::resources::error::Error), + RestClient(anyhow::Error), Generic(anyhow::Error), } diff --git a/k8s/plugin/src/resources/mod.rs b/k8s/plugin/src/resources/mod.rs index d1d761354..9e2166b0c 100644 --- a/k8s/plugin/src/resources/mod.rs +++ b/k8s/plugin/src/resources/mod.rs @@ -1,7 +1,37 @@ +use crate::Error; use clap::Parser; -use plugin::resources::{CordonResources, DrainResources, GetResources, ScaleResources}; +use plugin::{ + resources::{CordonResources, DrainResources, GetResources, ScaleResources}, + ExecuteOperation, +}; +use std::{ops::Deref, path::PathBuf}; use supportability::DumpArgs; -use upgrade::plugin::upgrade::{DeleteResources, GetUpgradeArgs, UpgradeArgs}; +use upgrade::{ + plugin::upgrade::{DeleteResources, GetUpgradeArgs, UpgradeArgs}, + preflight_validations, +}; + +#[derive(Parser, Debug)] +#[group(skip)] +pub struct CliArgs { + /// Path to kubeconfig file. + #[clap(global = true, long, short = 'k')] + pub(super) kube_config_path: Option, + + /// Kubernetes namespace of mayastor service + #[clap(global = true, long, short = 'n', default_value = "mayastor")] + pub(super) namespace: String, + + #[clap(flatten)] + cli_args: plugin::CliArgs, +} +impl Deref for CliArgs { + type Target = plugin::CliArgs; + + fn deref(&self) -> &Self::Target { + &self.cli_args + } +} #[derive(clap::Subcommand, Debug)] pub enum GetResourcesK8s { @@ -37,3 +67,48 @@ pub enum Operations { #[clap(subcommand)] Delete(DeleteResources), } + +#[async_trait::async_trait(?Send)] +impl ExecuteOperation for Operations { + type Args = CliArgs; + type Error = Error; + async fn execute(&self, cli_args: &CliArgs) -> Result<(), Error> { + match self { + Operations::Get(resource) => match resource { + GetResourcesK8s::Rest(resource) => resource.execute(cli_args).await?, + GetResourcesK8s::UpgradeStatus(resources) => { + // todo: use generic execute trait + resources.get_upgrade(&cli_args.namespace).await? + } + }, + Operations::Drain(resource) => resource.execute(cli_args).await?, + Operations::Scale(resource) => resource.execute(cli_args).await?, + Operations::Cordon(resource) => resource.execute(cli_args).await?, + Operations::Uncordon(resource) => resource.execute(cli_args).await?, + Operations::Dump(resources) => { + // todo: build and pass arguments + resources.execute(&()).await.map_err(|e| { + // todo: check why is this here, can it be removed? + println!("Partially collected dump information: "); + e + })? + } + Operations::Upgrade(resources) => { + // todo: use generic execute trait + preflight_validations::preflight_check( + &cli_args.namespace, + cli_args.kube_config_path.clone(), + cli_args.timeout, + resources, + ) + .await?; + resources.execute(&cli_args.namespace).await? + } + Operations::Delete(resource) => match resource { + // todo: use generic execute trait + DeleteResources::Upgrade(res) => res.delete(&cli_args.namespace).await?, + }, + } + Ok(()) + } +} diff --git a/k8s/supportability/Cargo.toml b/k8s/supportability/Cargo.toml index 130149009..30d775a5a 100644 --- a/k8s/supportability/Cargo.toml +++ b/k8s/supportability/Cargo.toml @@ -45,3 +45,4 @@ pstor = { path = "../../dependencies/control-plane/utils/pstor" } platform = { path = "../../dependencies/control-plane/utils/platform" } openapi = { path = "../../dependencies/control-plane/openapi", default-features = false, features = [ "tower-client", "tower-trace" ] } kube-proxy = { path = "../proxy" } +rest-plugin = { path = "../../dependencies/control-plane/control-plane/plugin", default-features = false } diff --git a/k8s/supportability/src/lib.rs b/k8s/supportability/src/lib.rs index da6f865f9..c88ac49ab 100644 --- a/k8s/supportability/src/lib.rs +++ b/k8s/supportability/src/lib.rs @@ -2,18 +2,19 @@ pub mod collect; pub mod operations; use collect::{ - common::DumpConfig, + common::{DumpConfig, OutputFormat}, error::Error, resource_dump::ResourceDumper, resources::{node::NodeClientWrapper, Resourcer}, rest_wrapper, + utils::log, }; -use operations::{Operations, Resource}; +use operations::Resource; #[cfg(debug_assertions)] use collect::resources::{pool::PoolClientWrapper, traits::Topologer, volume::VolumeClientWrapper}; -use crate::collect::{common::OutputFormat, utils::log}; +use plugin::ExecuteOperation; use std::path::PathBuf; /// Collects state & log information of mayastor services running in the system and dump them. @@ -44,6 +45,10 @@ pub struct SupportArgs { /// Kubernetes namespace of mayastor service #[clap(global = true, long, short = 'n', default_value = "mayastor")] namespace: String, + + /// Path to kubeconfig file. + #[clap(global = true, long, short = 'k')] + kube_config_path: Option, } /// Supportability - collects state & log information of services and dumps it to a tar file. @@ -58,214 +63,209 @@ pub struct DumpArgs { resource: Resource, } -impl DumpArgs { - /// Execute the dump of the specified resources. - pub async fn dump(self, kube_config: Option) -> anyhow::Result<()> { - self.args - .execute(kube_config, Operations::Dump(self.resource)) - .await +#[async_trait::async_trait(?Send)] +impl ExecuteOperation for DumpArgs { + type Args = (); + type Error = anyhow::Error; + async fn execute(&self, _: &Self::Args) -> Result<(), Self::Error> { + self.resource.execute(&self.args).await } } -impl SupportArgs { - /// Execute the specified operation. - pub(crate) async fn execute( - self, - kube_config_path: Option, - operation: Operations, - ) -> anyhow::Result<()> { - // Initialise the REST client. +#[async_trait::async_trait(?Send)] +impl ExecuteOperation for Resource { + type Args = SupportArgs; + type Error = anyhow::Error; + + async fn execute(&self, cli_args: &Self::Args) -> Result<(), Self::Error> { let config = kube_proxy::ConfigBuilder::default_api_rest() - .with_kube_config(kube_config_path.clone()) - .with_timeout(*self.timeout) - .with_target_mod(|t| t.with_namespace(&self.namespace)) + .with_kube_config(cli_args.kube_config_path.clone()) + .with_timeout(*cli_args.timeout) + .with_target_mod(|t| t.with_namespace(&cli_args.namespace)) .build() .await?; let rest_client = rest_wrapper::RestClient::new_with_config(config); - // TODO: Move code inside options to some generic function - // Perform the operations based on user chosen subcommands - match operation { - Operations::Dump(resource) => self - .execute_resource_dump(rest_client, kube_config_path, resource) - .await - .map_err(|e| anyhow::anyhow!("{:?}", e)), - } + execute_resource_dump( + cli_args.clone(), + rest_client, + cli_args.kube_config_path.clone(), + self.clone(), + ) + .await + .map_err(|e| anyhow::anyhow!("{:?}", e)) } +} - async fn execute_resource_dump( - self, - rest_client: rest_wrapper::RestClient, - kube_config_path: Option, - resource: Resource, - ) -> Result<(), Error> { - let cli_args = self; +async fn execute_resource_dump( + cli_args: SupportArgs, + rest_client: rest_wrapper::RestClient, + kube_config_path: Option, + resource: Resource, +) -> Result<(), Error> { + #[cfg(debug_assertions)] + let topologer: Box; + let mut config = DumpConfig { + rest_client: rest_client.clone(), + output_directory: cli_args.output_directory_path, + namespace: cli_args.namespace, + loki_uri: cli_args.loki_endpoint, + etcd_uri: cli_args.etcd_endpoint, + since: cli_args.since, + kube_config_path, + timeout: cli_args.timeout, #[cfg(debug_assertions)] - let topologer: Box; - let mut config = DumpConfig { - rest_client: rest_client.clone(), - output_directory: cli_args.output_directory_path, - namespace: cli_args.namespace, - loki_uri: cli_args.loki_endpoint, - etcd_uri: cli_args.etcd_endpoint, - since: cli_args.since, - kube_config_path, - timeout: cli_args.timeout, - #[cfg(debug_assertions)] - topologer: None, - output_format: OutputFormat::Tar, - }; - let mut errors = Vec::new(); - match resource { - Resource::Loki => { - let mut system_dumper = - collect::system_dump::SystemDumper::get_or_panic_system_dumper(config, true) - .await; - let node_topologer = NodeClientWrapper::new(system_dumper.rest_client()) - .get_topologer(None) - .await - .ok(); - log("Completed collection of topology information".to_string()); + topologer: None, + output_format: OutputFormat::Tar, + }; + let mut errors = Vec::new(); + match resource { + Resource::Loki => { + let mut system_dumper = + collect::system_dump::SystemDumper::get_or_panic_system_dumper(config, true).await; + let node_topologer = NodeClientWrapper::new(system_dumper.rest_client()) + .get_topologer(None) + .await + .ok(); + log("Completed collection of topology information".to_string()); - system_dumper - .collect_and_dump_loki_logs(node_topologer) - .await?; - if let Err(e) = system_dumper.fill_archive_and_delete_tmp() { - log(format!("Failed to copy content to archive, error: {e:?}")); - errors.push(e); - } + system_dumper + .collect_and_dump_loki_logs(node_topologer) + .await?; + if let Err(e) = system_dumper.fill_archive_and_delete_tmp() { + log(format!("Failed to copy content to archive, error: {e:?}")); + errors.push(e); + } + } + Resource::System(args) => { + let mut system_dumper = collect::system_dump::SystemDumper::get_or_panic_system_dumper( + config, + args.disable_log_collection, + ) + .await; + if let Err(e) = system_dumper.dump_system().await { + // NOTE: We also need to log error content into Supportability log file + log(format!("Failed to dump system state, error: {e:?}")); + errors.push(e); + } + if let Err(e) = system_dumper.fill_archive_and_delete_tmp() { + log(format!("Failed to copy content to archive, error: {e:?}")); + errors.push(e); + } + } + #[cfg(debug_assertions)] + Resource::Volumes => { + let volume_client = VolumeClientWrapper::new(rest_client); + topologer = volume_client.get_topologer(None).await?; + config.topologer = Some(topologer); + let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await; + if let Err(e) = dumper.dump_info("topology/volume".to_string()).await { + log(format!("Failed to dump volumes information, Error: {e:?}")); + errors.push(e); + } + if let Err(e) = dumper.fill_archive_and_delete_tmp() { + log(format!("Failed to copy content to archive, error: {e:?}")); + errors.push(e); } - Resource::System(args) => { - let mut system_dumper = - collect::system_dump::SystemDumper::get_or_panic_system_dumper( - config, - args.disable_log_collection, - ) - .await; - if let Err(e) = system_dumper.dump_system().await { - // NOTE: We also need to log error content into Supportability log file - log(format!("Failed to dump system state, error: {e:?}")); - errors.push(e); - } - if let Err(e) = system_dumper.fill_archive_and_delete_tmp() { - log(format!("Failed to copy content to archive, error: {e:?}")); - errors.push(e); - } + } + #[cfg(debug_assertions)] + Resource::Volume { id } => { + let volume_client = VolumeClientWrapper::new(rest_client); + topologer = volume_client.get_topologer(Some(id)).await?; + config.topologer = Some(topologer); + let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await; + if let Err(e) = dumper.dump_info("topology/volume".to_string()).await { + log(format!( + "Failed to dump volume {id} information, Error: {e:?}" + )); + errors.push(e); } - #[cfg(debug_assertions)] - Resource::Volumes => { - let volume_client = VolumeClientWrapper::new(rest_client); - topologer = volume_client.get_topologer(None).await?; - config.topologer = Some(topologer); - let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await; - if let Err(e) = dumper.dump_info("topology/volume".to_string()).await { - log(format!("Failed to dump volumes information, Error: {e:?}")); - errors.push(e); - } - if let Err(e) = dumper.fill_archive_and_delete_tmp() { - log(format!("Failed to copy content to archive, error: {e:?}")); - errors.push(e); - } + if let Err(e) = dumper.fill_archive_and_delete_tmp() { + log(format!("Failed to copy content to archive, error: {e:?}")); + errors.push(e); } - #[cfg(debug_assertions)] - Resource::Volume { id } => { - let volume_client = VolumeClientWrapper::new(rest_client); - topologer = volume_client.get_topologer(Some(id)).await?; - config.topologer = Some(topologer); - let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await; - if let Err(e) = dumper.dump_info("topology/volume".to_string()).await { - log(format!( - "Failed to dump volume {id} information, Error: {e:?}" - )); - errors.push(e); - } - if let Err(e) = dumper.fill_archive_and_delete_tmp() { - log(format!("Failed to copy content to archive, error: {e:?}")); - errors.push(e); - } + } + #[cfg(debug_assertions)] + Resource::Pools => { + let pool_client = PoolClientWrapper::new(rest_client); + topologer = pool_client.get_topologer(None).await?; + config.topologer = Some(topologer); + let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await; + if let Err(e) = dumper.dump_info("topology/pool".to_string()).await { + log(format!("Failed to dump pools information, Error: {e:?}")); + errors.push(e); + } + if let Err(e) = dumper.fill_archive_and_delete_tmp() { + log(format!("Failed to copy content to archive, error: {e:?}")); + errors.push(e); + } + } + #[cfg(debug_assertions)] + Resource::Pool { id } => { + let pool_client = PoolClientWrapper::new(rest_client); + topologer = pool_client.get_topologer(Some(id.to_string())).await?; + config.topologer = Some(topologer); + let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await; + if let Err(e) = dumper.dump_info("topology/pool".to_string()).await { + log(format!( + "Failed to dump pool {id} information, Error: {e:?}" + )); + errors.push(e); } - #[cfg(debug_assertions)] - Resource::Pools => { - let pool_client = PoolClientWrapper::new(rest_client); - topologer = pool_client.get_topologer(None).await?; - config.topologer = Some(topologer); - let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await; - if let Err(e) = dumper.dump_info("topology/pool".to_string()).await { - log(format!("Failed to dump pools information, Error: {e:?}")); - errors.push(e); - } - if let Err(e) = dumper.fill_archive_and_delete_tmp() { - log(format!("Failed to copy content to archive, error: {e:?}")); - errors.push(e); - } + if let Err(e) = dumper.fill_archive_and_delete_tmp() { + log(format!("Failed to copy content to archive, error: {e:?}")); + errors.push(e); } - #[cfg(debug_assertions)] - Resource::Pool { id } => { - let pool_client = PoolClientWrapper::new(rest_client); - topologer = pool_client.get_topologer(Some(id.to_string())).await?; - config.topologer = Some(topologer); - let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await; - if let Err(e) = dumper.dump_info("topology/pool".to_string()).await { - log(format!( - "Failed to dump pool {id} information, Error: {e:?}" - )); - errors.push(e); - } - if let Err(e) = dumper.fill_archive_and_delete_tmp() { - log(format!("Failed to copy content to archive, error: {e:?}")); - errors.push(e); - } + } + #[cfg(debug_assertions)] + Resource::Nodes => { + let node_client = NodeClientWrapper { rest_client }; + topologer = node_client.get_topologer(None).await?; + config.topologer = Some(topologer); + let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await; + if let Err(e) = dumper.dump_info("topology/node".to_string()).await { + log(format!("Failed to dump nodes information, Error: {e:?}")); + errors.push(e); } - #[cfg(debug_assertions)] - Resource::Nodes => { - let node_client = NodeClientWrapper { rest_client }; - topologer = node_client.get_topologer(None).await?; - config.topologer = Some(topologer); - let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await; - if let Err(e) = dumper.dump_info("topology/node".to_string()).await { - log(format!("Failed to dump nodes information, Error: {e:?}")); - errors.push(e); - } - if let Err(e) = dumper.fill_archive_and_delete_tmp() { - log(format!("Failed to copy content to archive, error: {e:?}")); - errors.push(e); - } + if let Err(e) = dumper.fill_archive_and_delete_tmp() { + log(format!("Failed to copy content to archive, error: {e:?}")); + errors.push(e); } - #[cfg(debug_assertions)] - Resource::Node { id } => { - let node_client = NodeClientWrapper { rest_client }; - topologer = node_client.get_topologer(Some(id.to_string())).await?; - config.topologer = Some(topologer); - let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await; - if let Err(e) = dumper.dump_info("topology/node".to_string()).await { - log(format!( - "Failed to dump node {id} information, Error: {e:?}" - )); - errors.push(e); - } - if let Err(e) = dumper.fill_archive_and_delete_tmp() { - log(format!("Failed to copy content to archive, error: {e:?}")); - errors.push(e); - } + } + #[cfg(debug_assertions)] + Resource::Node { id } => { + let node_client = NodeClientWrapper { rest_client }; + topologer = node_client.get_topologer(Some(id.to_string())).await?; + config.topologer = Some(topologer); + let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await; + if let Err(e) = dumper.dump_info("topology/node".to_string()).await { + log(format!( + "Failed to dump node {id} information, Error: {e:?}" + )); + errors.push(e); } - Resource::Etcd { stdout } => { - config.output_format = if stdout { - OutputFormat::Stdout - } else { - OutputFormat::Tar - }; - let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await; - if let Err(e) = dumper.dump_etcd().await { - log(format!("Failed to dump etcd information, Error: {e:?}")); - errors.push(e); - } + if let Err(e) = dumper.fill_archive_and_delete_tmp() { + log(format!("Failed to copy content to archive, error: {e:?}")); + errors.push(e); } } - if !errors.is_empty() { - return Err(Error::MultipleErrors(errors)); + Resource::Etcd { stdout } => { + config.output_format = if stdout { + OutputFormat::Stdout + } else { + OutputFormat::Tar + }; + let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await; + if let Err(e) = dumper.dump_etcd().await { + log(format!("Failed to dump etcd information, Error: {e:?}")); + errors.push(e); + } } - println!("Completed collection of dump !!"); - Ok(()) } + if !errors.is_empty() { + return Err(Error::MultipleErrors(errors)); + } + println!("Completed collection of dump !!"); + Ok(()) } diff --git a/k8s/upgrade/src/plugin/upgrade.rs b/k8s/upgrade/src/plugin/upgrade.rs index 461b7b255..d8c80a884 100644 --- a/k8s/upgrade/src/plugin/upgrade.rs +++ b/k8s/upgrade/src/plugin/upgrade.rs @@ -198,6 +198,15 @@ impl UpgradeArgs { Ok(()) } + /// Execute the upgrade command. + pub async fn execute(&self, namespace: &str) -> error::Result<()> { + if self.dry_run { + self.dummy_apply(namespace).await + } else { + self.apply(namespace).await + } + } + /// Dummy upgrade the resources. pub async fn dummy_apply(&self, namespace: &str) -> error::Result<()> { let mut pods_names: Vec = Vec::new();