diff --git a/Cargo.lock b/Cargo.lock
index 6ff0fcc69..60bea3533 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1830,6 +1830,7 @@ name = "kubectl-plugin"
 version = "1.0.0"
 dependencies = [
  "anyhow",
+ "async-trait",
  "clap",
  "console-logger",
  "humantime",
@@ -3371,6 +3372,7 @@ dependencies = [
  "openapi",
  "platform",
  "pstor",
+ "rest-plugin",
  "schemars",
  "serde",
  "serde_json",
diff --git a/dependencies/control-plane b/dependencies/control-plane
index 6869243c9..3ea84a8c9 160000
--- a/dependencies/control-plane
+++ b/dependencies/control-plane
@@ -1 +1 @@
-Subproject commit 6869243c9f17783e7b8bbafe9218975cb18c692e
+Subproject commit 3ea84a8c9c64a27a317293e9793405e41a659b7e
diff --git a/k8s/plugin/Cargo.toml b/k8s/plugin/Cargo.toml
index 3aae17fe9..7e9240eee 100644
--- a/k8s/plugin/Cargo.toml
+++ b/k8s/plugin/Cargo.toml
@@ -28,6 +28,7 @@ tokio = { version = "1.33.0" }
 anyhow = "1.0.75"
 clap = { version = "4.4.6", features = ["color", "derive"] }
 humantime = "2.1.0"
+async-trait = "0.1.73"
 # Tracing
 opentelemetry = { version = "0.20.0", features = ["rt-tokio-current-thread"] }
 shutdown = { path = "../../dependencies/control-plane/utils/shutdown" }
diff --git a/k8s/plugin/src/main.rs b/k8s/plugin/src/main.rs
index 86db7e2f6..ef59040c9 100644
--- a/k8s/plugin/src/main.rs
+++ b/k8s/plugin/src/main.rs
@@ -1,266 +1,89 @@
-use crate::resources::GetResourcesK8s;
-use anyhow::Result;
 use clap::Parser;
 use openapi::tower::client::Url;
-use opentelemetry::global;
-use plugin::{
-    operations::{
-        Cordoning, Drain, Get, GetBlockDevices, GetSnapshots, List, ListExt, RebuildHistory,
-        ReplicaTopology, Scale,
-    },
-    resources::{
-        blockdevice, cordon, drain, node, pool, snapshot, volume, CordonResources, DrainResources,
-        GetCordonArgs, GetDrainArgs, GetResources, ScaleResources,
-    },
-    rest_wrapper::RestClient,
-};
+use plugin::{rest_wrapper::RestClient, ExecuteOperation};
 use resources::Operations;
-use upgrade::plugin::{preflight_validations, upgrade::DeleteResources};
-use std::{env, path::PathBuf};
+use std::{env, ops::Deref};

 mod resources;

 #[derive(Parser, Debug)]
 #[clap(name = utils::package_description!(), version = utils::version_info_str!())]
+#[group(skip)]
 struct CliArgs {
     /// The rest endpoint to connect to.
     #[clap(global = true, long, short)]
     rest: Option<Url>,

-    /// Path to kubeconfig file.
-    #[clap(global = true, long, short = 'k')]
-    kube_config_path: Option<PathBuf>,
-
     /// The operation to be performed.
     #[clap(subcommand)]
     operations: Operations,

-    /// The Output, viz yaml, json.
-    #[clap(global = true, default_value = plugin::resources::utils::OutputFormat::None.as_ref(), short, long)]
-    output: plugin::resources::utils::OutputFormat,
-
-    /// Trace rest requests to the Jaeger endpoint agent.
-    #[clap(global = true, long, short)]
-    jaeger: Option<String>,
-
-    /// Timeout for the REST operations.
-    #[clap(long, short, default_value = "10s")]
-    timeout: humantime::Duration,
-
-    /// Kubernetes namespace of mayastor service
-    #[clap(global = true, long, short = 'n', default_value = "mayastor")]
-    namespace: String,
+    #[clap(flatten)]
+    args: resources::CliArgs,
 }
+
 impl CliArgs {
     fn args() -> Self {
         CliArgs::parse()
     }
 }
+impl Deref for CliArgs {
+    type Target = plugin::CliArgs;

-#[tokio::main]
-async fn main() {
-    plugin::init_tracing(CliArgs::args().jaeger.as_ref());
-
-    execute(CliArgs::args()).await;
-
-    global::shutdown_tracer_provider();
-}
-
-async fn execute(cli_args: CliArgs) {
-    // Initialise the REST client.
-    if let Err(e) = init_rest(&cli_args).await {
-        eprintln!("Failed to initialise the REST client. Error {e}");
-        std::process::exit(1);
+    fn deref(&self) -> &Self::Target {
+        &self.args
     }
+}

-    // Perform the operations based on the subcommand, with proper output format.
-    let fut = async move {
-        let result: std::result::Result<(), Error> = match cli_args.operations {
-            Operations::Get(resource) => match resource {
-                GetResourcesK8s::Rest(resource) => match resource {
-                    GetResources::Cordon(get_cordon_resource) => match get_cordon_resource {
-                        GetCordonArgs::Node { id: node_id } => {
-                            cordon::NodeCordon::get(&node_id, &cli_args.output)
-                                .await
-                                .map_err(|e| e.into())
-                        }
-                        GetCordonArgs::Nodes => cordon::NodeCordons::list(&cli_args.output)
-                            .await
-                            .map_err(|e| e.into()),
-                    },
-                    GetResources::Drain(get_drain_resource) => match get_drain_resource {
-                        GetDrainArgs::Node { id: node_id } => {
-                            drain::NodeDrain::get(&node_id, &cli_args.output)
-                                .await
-                                .map_err(|e| e.into())
-                        }
-                        GetDrainArgs::Nodes => drain::NodeDrains::list(&cli_args.output)
-                            .await
-                            .map_err(|e| e.into()),
-                    },
-                    GetResources::Volumes(volume_args) => {
-                        volume::Volumes::list(&cli_args.output, &volume_args)
-                            .await
-                            .map_err(|e| e.into())
-                    }
-                    GetResources::Volume { id } => volume::Volume::get(&id, &cli_args.output)
-                        .await
-                        .map_err(|e| e.into()),
-                    GetResources::VolumeReplicaTopologies(volume_args) => {
-                        volume::Volume::topologies(&cli_args.output, &volume_args)
-                            .await
-                            .map_err(|e| e.into())
-                    }
-                    GetResources::VolumeReplicaTopology { id } => {
-                        volume::Volume::topology(&id, &cli_args.output)
-                            .await
-                            .map_err(|e| e.into())
-                    }
-                    GetResources::Pools => pool::Pools::list(&cli_args.output)
-                        .await
-                        .map_err(|e| e.into()),
-                    GetResources::Pool { id } => pool::Pool::get(&id, &cli_args.output)
-                        .await
-                        .map_err(|e| e.into()),
-                    GetResources::Nodes => node::Nodes::list(&cli_args.output)
-                        .await
-                        .map_err(|e| e.into()),
-                    GetResources::Node(args) => node::Node::get(&args.node_id(), &cli_args.output)
-                        .await
-                        .map_err(|e| e.into()),
-                    GetResources::BlockDevices(bdargs) => {
-                        blockdevice::BlockDevice::get_blockdevices(
-                            &bdargs.node_id(),
-                            &bdargs.all(),
-                            &cli_args.output,
-                        )
-                        .await
-                        .map_err(|e| e.into())
-                    }
-                    GetResources::VolumeSnapshots(snapargs) => {
-                        snapshot::VolumeSnapshots::get_snapshots(
-                            &snapargs.volume(),
-                            &snapargs.snapshot(),
-                            &cli_args.output,
-                        )
-                        .await
-                        .map_err(|e| e.into())
-                    }
-                    GetResources::RebuildHistory { id } => {
-                        volume::Volume::rebuild_history(&id, &cli_args.output)
-                            .await
-                            .map_err(|e| e.into())
-                    }
-                },
-                GetResourcesK8s::UpgradeStatus(resources) => resources
-                    .get_upgrade(&cli_args.namespace)
-                    .await
-                    .map_err(|e| e.into()),
-            },
-            Operations::Drain(resource) => match resource {
-                DrainResources::Node(drain_node_args) => node::Node::drain(
-                    &drain_node_args.node_id(),
-                    drain_node_args.label(),
-                    drain_node_args.drain_timeout(),
-                    &cli_args.output,
-                )
-                .await
-                .map_err(|e| e.into()),
-            },
-            Operations::Scale(resource) => match resource {
-                ScaleResources::Volume { id, replica_count } => {
-                    volume::Volume::scale(&id, replica_count, &cli_args.output)
-                        .await
-                        .map_err(|e| e.into())
-                }
-            },
-            Operations::Cordon(resource) => match resource {
-                CordonResources::Node { id, label } => {
-                    node::Node::cordon(&id, &label, &cli_args.output)
-                        .await
-                        .map_err(|e| e.into())
-                }
-            },
-            Operations::Uncordon(resource) => match resource {
-                CordonResources::Node { id, label } => {
-                    node::Node::uncordon(&id, &label, &cli_args.output)
-                        .await
-                        .map_err(|e| e.into())
-                }
-            },
-            Operations::Dump(resources) => {
-                resources
-                    .dump(cli_args.kube_config_path)
-                    .await
-                    .map_err(|e| {
-                        println!("Partially collected dump information: ");
-                        e.into()
-                    })
+#[tokio::main]
+async fn main() {
+    let cli_args = CliArgs::args();
+    let _tracer_flusher = cli_args.init_tracing();
+
+    if let Err(error) = cli_args.execute().await {
+        let mut exit_code = 1;
+        match error {
+            Error::RestPlugin(error) => eprintln!("{error}"),
+            Error::RestClient(error) => {
+                eprintln!("Failed to initialise the REST client. Error {error}")
             }
-            Operations::Upgrade(resources) => {
-                match preflight_validations::preflight_check(
-                    &cli_args.namespace,
-                    cli_args.kube_config_path.clone(),
-                    cli_args.timeout,
-                    &resources,
-                )
-                .await
-                {
-                    Ok(_) => {
-                        if resources.dry_run {
-                            resources
-                                .dummy_apply(&cli_args.namespace)
-                                .await
-                                .map_err(|e| e.into())
-                        } else {
-                            resources
-                                .apply(&cli_args.namespace)
-                                .await
-                                .map_err(|e| e.into())
-                        }
-                    }
-                    Err(e) => Err(e.into()),
-                }
+            Error::Upgrade(error) => {
+                eprintln!("{error}");
+                exit_code = error.into();
             }
+            Error::Generic(error) => eprintln!("{error}"),
+        }
+        std::process::exit(exit_code);
+    }
+}

-            Operations::Delete(resource) => match resource {
-                DeleteResources::Upgrade(res) => {
-                    res.delete(&cli_args.namespace).await.map_err(|e| e.into())
-                }
-            },
-        };
+impl CliArgs {
+    async fn execute(self) -> Result<(), Error> {
+        // Initialise the REST client.
+        init_rest(&self).await?;

-        if let Err(error) = result {
-            let mut exit_code = 1;
-            match error {
-                Error::RestPlugin(err) => eprintln!("{err}"),
-                Error::Upgrade(err) => {
-                    eprintln!("{err}");
-                    exit_code = err.into();
-                }
-                Error::Generic(err) => eprintln!("{err}"),
+        tokio::select! {
+            shutdown = shutdown::Shutdown::wait_sig() => {
+                Err(anyhow::anyhow!("Interrupted by {shutdown:?}").into())
+            },
+            done = self.operations.execute(&self.args) => {
+                done
             }
-            std::process::exit(exit_code);
-        };
-    };
-
-    tokio::select! {
-        _shutdown = shutdown::Shutdown::wait_sig() => {},
-        _done = fut => {}
+        }
     }
 }

 /// Initialise the REST client.
-async fn init_rest(args: &CliArgs) -> Result<()> {
+async fn init_rest(cli_args: &CliArgs) -> Result<(), Error> {
     // Use the supplied URL if there is one otherwise obtain one from the kubeconfig file.
-    match args.rest.clone() {
-        Some(url) => RestClient::init(url, *args.timeout),
+    match cli_args.rest.clone() {
+        Some(url) => RestClient::init(url, *cli_args.timeout).map_err(Error::RestClient),
         None => {
             let config = kube_proxy::ConfigBuilder::default_api_rest()
-                .with_kube_config(args.kube_config_path.clone())
-                .with_timeout(*args.timeout)
-                .with_target_mod(|t| t.with_namespace(&args.namespace))
+                .with_kube_config(cli_args.args.kube_config_path.clone())
+                .with_timeout(*cli_args.timeout)
+                .with_target_mod(|t| t.with_namespace(&cli_args.args.namespace))
                 .build()
                 .await?;
             RestClient::init_with_config(config)?;
@@ -269,9 +92,10 @@ async fn init_rest(args: &CliArgs) -> Result<()> {
     }
 }

-enum Error {
+pub enum Error {
     Upgrade(upgrade::error::Error),
     RestPlugin(plugin::resources::error::Error),
+    RestClient(anyhow::Error),
     Generic(anyhow::Error),
 }
diff --git a/k8s/plugin/src/resources/mod.rs b/k8s/plugin/src/resources/mod.rs
index d1d761354..9e2166b0c 100644
--- a/k8s/plugin/src/resources/mod.rs
+++ b/k8s/plugin/src/resources/mod.rs
@@ -1,7 +1,37 @@
+use crate::Error;
 use clap::Parser;
-use plugin::resources::{CordonResources, DrainResources, GetResources, ScaleResources};
+use plugin::{
+    resources::{CordonResources, DrainResources, GetResources, ScaleResources},
+    ExecuteOperation,
+};
+use std::{ops::Deref, path::PathBuf};
 use supportability::DumpArgs;
-use upgrade::plugin::upgrade::{DeleteResources, GetUpgradeArgs, UpgradeArgs};
+use upgrade::{
+    plugin::upgrade::{DeleteResources, GetUpgradeArgs, UpgradeArgs},
+    preflight_validations,
+};
+
+#[derive(Parser, Debug)]
+#[group(skip)]
+pub struct CliArgs {
+    /// Path to kubeconfig file.
+    #[clap(global = true, long, short = 'k')]
+    pub(super) kube_config_path: Option<PathBuf>,
+
+    /// Kubernetes namespace of mayastor service
+    #[clap(global = true, long, short = 'n', default_value = "mayastor")]
+    pub(super) namespace: String,
+
+    #[clap(flatten)]
+    cli_args: plugin::CliArgs,
+}
+impl Deref for CliArgs {
+    type Target = plugin::CliArgs;
+
+    fn deref(&self) -> &Self::Target {
+        &self.cli_args
+    }
+}

 #[derive(clap::Subcommand, Debug)]
 pub enum GetResourcesK8s {
@@ -37,3 +67,48 @@ pub enum Operations {
     #[clap(subcommand)]
     Delete(DeleteResources),
 }
+
+#[async_trait::async_trait(?Send)]
+impl ExecuteOperation for Operations {
+    type Args = CliArgs;
+    type Error = Error;
+    async fn execute(&self, cli_args: &CliArgs) -> Result<(), Error> {
+        match self {
+            Operations::Get(resource) => match resource {
+                GetResourcesK8s::Rest(resource) => resource.execute(cli_args).await?,
+                GetResourcesK8s::UpgradeStatus(resources) => {
+                    // todo: use generic execute trait
+                    resources.get_upgrade(&cli_args.namespace).await?
+                }
+            },
+            Operations::Drain(resource) => resource.execute(cli_args).await?,
+            Operations::Scale(resource) => resource.execute(cli_args).await?,
+            Operations::Cordon(resource) => resource.execute(cli_args).await?,
+            Operations::Uncordon(resource) => resource.execute(cli_args).await?,
+            Operations::Dump(resources) => {
+                // todo: build and pass arguments
+                resources.execute(&()).await.map_err(|e| {
+                    // todo: check why is this here, can it be removed?
+                    println!("Partially collected dump information: ");
+                    e
+                })?
+            }
+            Operations::Upgrade(resources) => {
+                // todo: use generic execute trait
+                preflight_validations::preflight_check(
+                    &cli_args.namespace,
+                    cli_args.kube_config_path.clone(),
+                    cli_args.timeout,
+                    resources,
+                )
+                .await?;
+                resources.execute(&cli_args.namespace).await?
+            }
+            Operations::Delete(resource) => match resource {
+                // todo: use generic execute trait
+                DeleteResources::Upgrade(res) => res.delete(&cli_args.namespace).await?,
+            },
+        }
+        Ok(())
+    }
+}
diff --git a/k8s/supportability/Cargo.toml b/k8s/supportability/Cargo.toml
index 130149009..30d775a5a 100644
--- a/k8s/supportability/Cargo.toml
+++ b/k8s/supportability/Cargo.toml
@@ -45,3 +45,4 @@ pstor = { path = "../../dependencies/control-plane/utils/pstor" }
 platform = { path = "../../dependencies/control-plane/utils/platform" }
 openapi = { path = "../../dependencies/control-plane/openapi", default-features = false, features = [ "tower-client", "tower-trace" ] }
 kube-proxy = { path = "../proxy" }
+rest-plugin = { path = "../../dependencies/control-plane/control-plane/plugin", default-features = false }
diff --git a/k8s/supportability/src/lib.rs b/k8s/supportability/src/lib.rs
index da6f865f9..c88ac49ab 100644
--- a/k8s/supportability/src/lib.rs
+++ b/k8s/supportability/src/lib.rs
@@ -2,18 +2,19 @@ pub mod collect;
 pub mod operations;

 use collect::{
-    common::DumpConfig,
+    common::{DumpConfig, OutputFormat},
     error::Error,
     resource_dump::ResourceDumper,
     resources::{node::NodeClientWrapper, Resourcer},
     rest_wrapper,
+    utils::log,
 };
-use operations::{Operations, Resource};
+use operations::Resource;

 #[cfg(debug_assertions)]
 use collect::resources::{pool::PoolClientWrapper, traits::Topologer, volume::VolumeClientWrapper};

-use crate::collect::{common::OutputFormat, utils::log};
+use plugin::ExecuteOperation;
 use std::path::PathBuf;

 /// Collects state & log information of mayastor services running in the system and dump them.
@@ -44,6 +45,10 @@ pub struct SupportArgs {
     /// Kubernetes namespace of mayastor service
     #[clap(global = true, long, short = 'n', default_value = "mayastor")]
     namespace: String,
+
+    /// Path to kubeconfig file.
+    #[clap(global = true, long, short = 'k')]
+    kube_config_path: Option<PathBuf>,
 }

 /// Supportability - collects state & log information of services and dumps it to a tar file.
@@ -58,214 +63,209 @@ pub struct DumpArgs {
     resource: Resource,
 }

-impl DumpArgs {
-    /// Execute the dump of the specified resources.
-    pub async fn dump(self, kube_config: Option<PathBuf>) -> anyhow::Result<()> {
-        self.args
-            .execute(kube_config, Operations::Dump(self.resource))
-            .await
+#[async_trait::async_trait(?Send)]
+impl ExecuteOperation for DumpArgs {
+    type Args = ();
+    type Error = anyhow::Error;
+    async fn execute(&self, _: &Self::Args) -> Result<(), Self::Error> {
+        self.resource.execute(&self.args).await
     }
 }

-impl SupportArgs {
-    /// Execute the specified operation.
-    pub(crate) async fn execute(
-        self,
-        kube_config_path: Option<PathBuf>,
-        operation: Operations,
-    ) -> anyhow::Result<()> {
-        // Initialise the REST client.
+#[async_trait::async_trait(?Send)]
+impl ExecuteOperation for Resource {
+    type Args = SupportArgs;
+    type Error = anyhow::Error;
+
+    async fn execute(&self, cli_args: &Self::Args) -> Result<(), Self::Error> {
         let config = kube_proxy::ConfigBuilder::default_api_rest()
-            .with_kube_config(kube_config_path.clone())
-            .with_timeout(*self.timeout)
-            .with_target_mod(|t| t.with_namespace(&self.namespace))
+            .with_kube_config(cli_args.kube_config_path.clone())
+            .with_timeout(*cli_args.timeout)
+            .with_target_mod(|t| t.with_namespace(&cli_args.namespace))
             .build()
             .await?;
         let rest_client = rest_wrapper::RestClient::new_with_config(config);

-        // TODO: Move code inside options to some generic function
-        // Perform the operations based on user chosen subcommands
-        match operation {
-            Operations::Dump(resource) => self
-                .execute_resource_dump(rest_client, kube_config_path, resource)
-                .await
-                .map_err(|e| anyhow::anyhow!("{:?}", e)),
-        }
+        execute_resource_dump(
+            cli_args.clone(),
+            rest_client,
+            cli_args.kube_config_path.clone(),
+            self.clone(),
+        )
+        .await
+        .map_err(|e| anyhow::anyhow!("{:?}", e))
     }
+}

-    async fn execute_resource_dump(
-        self,
-        rest_client: rest_wrapper::RestClient,
-        kube_config_path: Option<PathBuf>,
-        resource: Resource,
-    ) -> Result<(), Error> {
-        let cli_args = self;
+async fn execute_resource_dump(
+    cli_args: SupportArgs,
+    rest_client: rest_wrapper::RestClient,
+    kube_config_path: Option<PathBuf>,
+    resource: Resource,
+) -> Result<(), Error> {
+    #[cfg(debug_assertions)]
+    let topologer: Box<dyn Topologer>;
+    let mut config = DumpConfig {
+        rest_client: rest_client.clone(),
+        output_directory: cli_args.output_directory_path,
+        namespace: cli_args.namespace,
+        loki_uri: cli_args.loki_endpoint,
+        etcd_uri: cli_args.etcd_endpoint,
+        since: cli_args.since,
+        kube_config_path,
+        timeout: cli_args.timeout,
         #[cfg(debug_assertions)]
-        let topologer: Box<dyn Topologer>;
-        let mut config = DumpConfig {
-            rest_client: rest_client.clone(),
-            output_directory: cli_args.output_directory_path,
-            namespace: cli_args.namespace,
-            loki_uri: cli_args.loki_endpoint,
-            etcd_uri: cli_args.etcd_endpoint,
-            since: cli_args.since,
-            kube_config_path,
-            timeout: cli_args.timeout,
-            #[cfg(debug_assertions)]
-            topologer: None,
-            output_format: OutputFormat::Tar,
-        };
-        let mut errors = Vec::new();
-        match resource {
-            Resource::Loki => {
-                let mut system_dumper =
-                    collect::system_dump::SystemDumper::get_or_panic_system_dumper(config, true)
-                        .await;
-                let node_topologer = NodeClientWrapper::new(system_dumper.rest_client())
-                    .get_topologer(None)
-                    .await
-                    .ok();
-                log("Completed collection of topology information".to_string());
+        topologer: None,
+        output_format: OutputFormat::Tar,
+    };
+    let mut errors = Vec::new();
+    match resource {
+        Resource::Loki => {
+            let mut system_dumper =
+                collect::system_dump::SystemDumper::get_or_panic_system_dumper(config, true).await;
+            let node_topologer = NodeClientWrapper::new(system_dumper.rest_client())
+                .get_topologer(None)
+                .await
+                .ok();
+            log("Completed collection of topology information".to_string());

-                system_dumper
-                    .collect_and_dump_loki_logs(node_topologer)
-                    .await?;
-                if let Err(e) = system_dumper.fill_archive_and_delete_tmp() {
-                    log(format!("Failed to copy content to archive, error: {e:?}"));
-                    errors.push(e);
-                }
+            system_dumper
+                .collect_and_dump_loki_logs(node_topologer)
+                .await?;
+            if let Err(e) = system_dumper.fill_archive_and_delete_tmp() {
+                log(format!("Failed to copy content to archive, error: {e:?}"));
+                errors.push(e);
+            }
+        }
+        Resource::System(args) => {
+            let mut system_dumper = collect::system_dump::SystemDumper::get_or_panic_system_dumper(
+                config,
+                args.disable_log_collection,
+            )
+            .await;
+            if let Err(e) = system_dumper.dump_system().await {
+                // NOTE: We also need to log error content into Supportability log file
+                log(format!("Failed to dump system state, error: {e:?}"));
+                errors.push(e);
+            }
+            if let Err(e) = system_dumper.fill_archive_and_delete_tmp() {
+                log(format!("Failed to copy content to archive, error: {e:?}"));
+                errors.push(e);
+            }
+        }
+        #[cfg(debug_assertions)]
+        Resource::Volumes => {
+            let volume_client = VolumeClientWrapper::new(rest_client);
+            topologer = volume_client.get_topologer(None).await?;
+            config.topologer = Some(topologer);
+            let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await;
+            if let Err(e) = dumper.dump_info("topology/volume".to_string()).await {
+                log(format!("Failed to dump volumes information, Error: {e:?}"));
+                errors.push(e);
+            }
+            if let Err(e) = dumper.fill_archive_and_delete_tmp() {
+                log(format!("Failed to copy content to archive, error: {e:?}"));
+                errors.push(e);
             }
-            Resource::System(args) => {
-                let mut system_dumper =
-                    collect::system_dump::SystemDumper::get_or_panic_system_dumper(
-                        config,
-                        args.disable_log_collection,
-                    )
-                    .await;
-                if let Err(e) = system_dumper.dump_system().await {
-                    // NOTE: We also need to log error content into Supportability log file
-                    log(format!("Failed to dump system state, error: {e:?}"));
-                    errors.push(e);
-                }
-                if let Err(e) = system_dumper.fill_archive_and_delete_tmp() {
-                    log(format!("Failed to copy content to archive, error: {e:?}"));
-                    errors.push(e);
-                }
+        }
+        #[cfg(debug_assertions)]
+        Resource::Volume { id } => {
+            let volume_client = VolumeClientWrapper::new(rest_client);
+            topologer = volume_client.get_topologer(Some(id)).await?;
+            config.topologer = Some(topologer);
+            let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await;
+            if let Err(e) = dumper.dump_info("topology/volume".to_string()).await {
+                log(format!(
+                    "Failed to dump volume {id} information, Error: {e:?}"
+                ));
+                errors.push(e);
             }
-            #[cfg(debug_assertions)]
-            Resource::Volumes => {
-                let volume_client = VolumeClientWrapper::new(rest_client);
-                topologer = volume_client.get_topologer(None).await?;
-                config.topologer = Some(topologer);
-                let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await;
-                if let Err(e) = dumper.dump_info("topology/volume".to_string()).await {
-                    log(format!("Failed to dump volumes information, Error: {e:?}"));
-                    errors.push(e);
-                }
-                if let Err(e) = dumper.fill_archive_and_delete_tmp() {
-                    log(format!("Failed to copy content to archive, error: {e:?}"));
-                    errors.push(e);
-                }
+            if let Err(e) = dumper.fill_archive_and_delete_tmp() {
+                log(format!("Failed to copy content to archive, error: {e:?}"));
+                errors.push(e);
             }
-            #[cfg(debug_assertions)]
-            Resource::Volume { id } => {
-                let volume_client = VolumeClientWrapper::new(rest_client);
-                topologer = volume_client.get_topologer(Some(id)).await?;
-                config.topologer = Some(topologer);
-                let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await;
-                if let Err(e) = dumper.dump_info("topology/volume".to_string()).await {
-                    log(format!(
-                        "Failed to dump volume {id} information, Error: {e:?}"
-                    ));
-                    errors.push(e);
-                }
-                if let Err(e) = dumper.fill_archive_and_delete_tmp() {
-                    log(format!("Failed to copy content to archive, error: {e:?}"));
-                    errors.push(e);
-                }
+        }
+        #[cfg(debug_assertions)]
+        Resource::Pools => {
+            let pool_client = PoolClientWrapper::new(rest_client);
+            topologer = pool_client.get_topologer(None).await?;
+            config.topologer = Some(topologer);
+            let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await;
+            if let Err(e) = dumper.dump_info("topology/pool".to_string()).await {
+                log(format!("Failed to dump pools information, Error: {e:?}"));
+                errors.push(e);
+            }
+            if let Err(e) = dumper.fill_archive_and_delete_tmp() {
+                log(format!("Failed to copy content to archive, error: {e:?}"));
+                errors.push(e);
+            }
+        }
+        #[cfg(debug_assertions)]
+        Resource::Pool { id } => {
+            let pool_client = PoolClientWrapper::new(rest_client);
+            topologer = pool_client.get_topologer(Some(id.to_string())).await?;
+            config.topologer = Some(topologer);
+            let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await;
+            if let Err(e) = dumper.dump_info("topology/pool".to_string()).await {
+                log(format!(
+                    "Failed to dump pool {id} information, Error: {e:?}"
+                ));
+                errors.push(e);
             }
-            #[cfg(debug_assertions)]
-            Resource::Pools => {
-                let pool_client = PoolClientWrapper::new(rest_client);
-                topologer = pool_client.get_topologer(None).await?;
-                config.topologer = Some(topologer);
-                let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await;
-                if let Err(e) = dumper.dump_info("topology/pool".to_string()).await {
-                    log(format!("Failed to dump pools information, Error: {e:?}"));
-                    errors.push(e);
-                }
-                if let Err(e) = dumper.fill_archive_and_delete_tmp() {
-                    log(format!("Failed to copy content to archive, error: {e:?}"));
-                    errors.push(e);
-                }
+            if let Err(e) = dumper.fill_archive_and_delete_tmp() {
+                log(format!("Failed to copy content to archive, error: {e:?}"));
+                errors.push(e);
             }
-            #[cfg(debug_assertions)]
-            Resource::Pool { id } => {
-                let pool_client = PoolClientWrapper::new(rest_client);
-                topologer = pool_client.get_topologer(Some(id.to_string())).await?;
-                config.topologer = Some(topologer);
-                let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await;
-                if let Err(e) = dumper.dump_info("topology/pool".to_string()).await {
-                    log(format!(
-                        "Failed to dump pool {id} information, Error: {e:?}"
-                    ));
-                    errors.push(e);
-                }
-                if let Err(e) = dumper.fill_archive_and_delete_tmp() {
-                    log(format!("Failed to copy content to archive, error: {e:?}"));
-                    errors.push(e);
-                }
+        }
+        #[cfg(debug_assertions)]
+        Resource::Nodes => {
+            let node_client = NodeClientWrapper { rest_client };
+            topologer = node_client.get_topologer(None).await?;
+            config.topologer = Some(topologer);
+            let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await;
+            if let Err(e) = dumper.dump_info("topology/node".to_string()).await {
+                log(format!("Failed to dump nodes information, Error: {e:?}"));
+                errors.push(e);
             }
-            #[cfg(debug_assertions)]
-            Resource::Nodes => {
-                let node_client = NodeClientWrapper { rest_client };
-                topologer = node_client.get_topologer(None).await?;
-                config.topologer = Some(topologer);
-                let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await;
-                if let Err(e) = dumper.dump_info("topology/node".to_string()).await {
-                    log(format!("Failed to dump nodes information, Error: {e:?}"));
-                    errors.push(e);
-                }
-                if let Err(e) = dumper.fill_archive_and_delete_tmp() {
-                    log(format!("Failed to copy content to archive, error: {e:?}"));
-                    errors.push(e);
-                }
+            if let Err(e) = dumper.fill_archive_and_delete_tmp() {
+                log(format!("Failed to copy content to archive, error: {e:?}"));
+                errors.push(e);
             }
-            #[cfg(debug_assertions)]
-            Resource::Node { id } => {
-                let node_client = NodeClientWrapper { rest_client };
-                topologer = node_client.get_topologer(Some(id.to_string())).await?;
-                config.topologer = Some(topologer);
-                let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await;
-                if let Err(e) = dumper.dump_info("topology/node".to_string()).await {
-                    log(format!(
-                        "Failed to dump node {id} information, Error: {e:?}"
-                    ));
-                    errors.push(e);
-                }
-                if let Err(e) = dumper.fill_archive_and_delete_tmp() {
-                    log(format!("Failed to copy content to archive, error: {e:?}"));
-                    errors.push(e);
-                }
+        }
+        #[cfg(debug_assertions)]
+        Resource::Node { id } => {
+            let node_client = NodeClientWrapper { rest_client };
+            topologer = node_client.get_topologer(Some(id.to_string())).await?;
+            config.topologer = Some(topologer);
+            let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await;
+            if let Err(e) = dumper.dump_info("topology/node".to_string()).await {
+                log(format!(
+                    "Failed to dump node {id} information, Error: {e:?}"
+                ));
+                errors.push(e);
             }
-            Resource::Etcd { stdout } => {
-                config.output_format = if stdout {
-                    OutputFormat::Stdout
-                } else {
-                    OutputFormat::Tar
-                };
-                let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await;
-                if let Err(e) = dumper.dump_etcd().await {
-                    log(format!("Failed to dump etcd information, Error: {e:?}"));
-                    errors.push(e);
-                }
+            if let Err(e) = dumper.fill_archive_and_delete_tmp() {
+                log(format!("Failed to copy content to archive, error: {e:?}"));
+                errors.push(e);
             }
         }
-        if !errors.is_empty() {
-            return Err(Error::MultipleErrors(errors));
+        Resource::Etcd { stdout } => {
+            config.output_format = if stdout {
+                OutputFormat::Stdout
+            } else {
+                OutputFormat::Tar
+            };
+            let mut dumper = ResourceDumper::get_or_panic_resource_dumper(config).await;
+            if let Err(e) = dumper.dump_etcd().await {
+                log(format!("Failed to dump etcd information, Error: {e:?}"));
+                errors.push(e);
+            }
         }
-        println!("Completed collection of dump !!");
-        Ok(())
     }
+    if !errors.is_empty() {
+        return Err(Error::MultipleErrors(errors));
+    }
+    println!("Completed collection of dump !!");
+    Ok(())
 }
diff --git a/k8s/upgrade/src/plugin/upgrade.rs b/k8s/upgrade/src/plugin/upgrade.rs
index 461b7b255..d8c80a884 100644
--- a/k8s/upgrade/src/plugin/upgrade.rs
+++ b/k8s/upgrade/src/plugin/upgrade.rs
@@ -198,6 +198,15 @@ impl UpgradeArgs {
         Ok(())
     }

+    /// Execute the upgrade command.
+    pub async fn execute(&self, namespace: &str) -> error::Result<()> {
+        if self.dry_run {
+            self.dummy_apply(namespace).await
+        } else {
+            self.apply(namespace).await
+        }
+    }
+
     /// Dummy upgrade the resources.
     pub async fn dummy_apply(&self, namespace: &str) -> error::Result<()> {
         let mut pods_names: Vec<String> = Vec::new();
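
Note (outside the diff): the refactor above routes every subcommand through the `ExecuteOperation` trait exposed by the control-plane rest-plugin crate (used here as `plugin::ExecuteOperation`). The diff only contains implementations of that trait, so the snippet below is a minimal, inferred sketch of the trait shape they code against — the associated-type names match the impls above, while the toy `Ping` command is an illustrative assumption, not part of the crate.

```rust
// Sketch only: an approximation of the trait the impls in this diff appear to
// target. The real definition lives in the control-plane rest-plugin crate.
use async_trait::async_trait;

#[async_trait(?Send)]
pub trait ExecuteOperation {
    /// Shared CLI context handed to every operation.
    type Args;
    /// Operation-specific error type.
    type Error;

    /// Run the operation with the given CLI arguments.
    async fn execute(&self, args: &Self::Args) -> Result<(), Self::Error>;
}

// Hypothetical subcommand showing how a new operation would plug in.
struct Ping;

#[async_trait(?Send)]
impl ExecuteOperation for Ping {
    type Args = String; // e.g. the target namespace
    type Error = anyhow::Error;

    async fn execute(&self, namespace: &Self::Args) -> Result<(), Self::Error> {
        println!("pinging services in namespace {namespace}");
        Ok(())
    }
}
```

The `(?Send)` relaxation mirrors its use throughout the diff: the resulting futures need not be `Send`, which keeps the trait usable from the plugin's current-thread runtime.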