From 0ab679bf4ae6096db7e93fcb55180e1003ada700 Mon Sep 17 00:00:00 2001 From: Nick Mosher Date: Fri, 22 Jan 2021 10:57:47 -0500 Subject: [PATCH 01/10] Consolidate start and upgrade checks --- Cargo.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d137fe4f2e..5f73b13356 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2560,9 +2560,9 @@ dependencies = [ [[package]] name = "rand" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18519b42a40024d661e1714153e9ad0c3de27cd495760ceb09710920f1098b1e" +checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e" dependencies = [ "libc", "rand_chacha 0.3.0", @@ -3127,7 +3127,7 @@ checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" dependencies = [ "cfg-if 1.0.0", "libc", - "rand 0.8.2", + "rand 0.8.3", "redox_syscall 0.2.4", "remove_dir_all", "winapi", From e27827e5d8e0ae1bee258354b76d78c69a859236 Mon Sep 17 00:00:00 2001 From: Nick Mosher Date: Mon, 25 Jan 2021 11:58:40 -0500 Subject: [PATCH 02/10] Factor out SysInstaller from k8 and local installers --- Cargo.lock | 75 ++++++++++- src/cluster/Cargo.toml | 1 + src/cluster/src/cli/start/k8.rs | 22 ---- src/cluster/src/cli/start/mod.rs | 4 +- src/cluster/src/cli/start/sys.rs | 40 ++++++ src/cluster/src/error.rs | 10 ++ src/cluster/src/lib.rs | 14 ++ src/cluster/src/start/k8.rs | 83 +++--------- src/cluster/src/start/local.rs | 25 ++-- src/cluster/src/start/mod.rs | 77 ----------- src/cluster/src/sys.rs | 214 +++++++++++++++++++++++++++++++ 11 files changed, 384 insertions(+), 181 deletions(-) create mode 100644 src/cluster/src/cli/start/sys.rs create mode 100644 src/cluster/src/sys.rs diff --git a/Cargo.lock b/Cargo.lock index 5f73b13356..062d5f2bb1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -560,7 +560,7 @@ dependencies = [ "ansi_term 0.11.0", "atty", "bitflags", - "strsim", + "strsim 0.8.0", "textwrap", "unicode-width", "vec_map", @@ -764,6 +764,41 @@ dependencies = [ "winapi", ] +[[package]] +name = "darling" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d706e75d87e35569db781a9b5e2416cff1236a47ed380831f959382ccd5f858" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0c960ae2da4de88a91b2d920c2a7233b400bc33cb28453a2987822d8392519b" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.9.3", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b5a2f4ac4969822c62224815d069952656cadc7084fdca9751e6d959189b72" +dependencies = [ + "darling_core", + "quote", + "syn", +] + [[package]] name = "data-encoding" version = "2.3.1" @@ -796,6 +831,31 @@ dependencies = [ "rusticata-macros", ] +[[package]] +name = "derive_builder" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2658621297f2cf68762a6f7dc0bb7e1ff2cfd6583daef8ee0fed6f7ec468ec0" +dependencies = [ + "darling", + "derive_builder_core", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "derive_builder_core" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2791ea3e372c8495c0bc2033991d76b512cd799d07491fbd6890124db9458bef" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "digest" version = "0.9.0" @@ -987,6 +1047,7 @@ dependencies = [ "async-trait", "color-eyre", "colored", + "derive_builder", "fluvio", "fluvio-command", "fluvio-controlplane-metadata", @@ -1901,6 +1962,12 @@ dependencies = [ "want", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.2.0" @@ -3068,6 +3135,12 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" +[[package]] +name = "strsim" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6446ced80d6c486436db5c078dde11a9f73d42b57fb273121e160b84f63d894c" + [[package]] name = "structopt" version = "0.3.21" diff --git a/src/cluster/Cargo.toml b/src/cluster/Cargo.toml index 5896f79f4e..afd602c05f 100644 --- a/src/cluster/Cargo.toml +++ b/src/cluster/Cargo.toml @@ -43,6 +43,7 @@ async-channel = "1.5.1" futures-lite = "1.11.0" tokio = { version = "0.2.21", features = ["macros"] } once_cell = "1.5" +derive_builder = "0.9.0" # Fluvio dependencies fluvio = { version = "0.4.0", path = "../client", default-features = false } diff --git a/src/cluster/src/cli/start/k8.rs b/src/cluster/src/cli/start/k8.rs index 1eeb567f8f..201d0bd940 100644 --- a/src/cluster/src/cli/start/k8.rs +++ b/src/cluster/src/cli/start/k8.rs @@ -115,28 +115,6 @@ pub async fn install_core( Ok(()) } -pub fn install_sys(opt: StartOpt, upgrade: bool) -> Result<(), ClusterCliError> { - let mut builder = ClusterInstaller::new() - .with_namespace(opt.k8_config.namespace) - .with_upgrade(upgrade); - - match opt.k8_config.chart_location { - // If a chart location is given, use it - Some(chart_location) => { - builder = builder.with_local_chart(chart_location); - } - // If we're in develop mode (but no explicit chart location), use local path - None if opt.develop => { - builder = builder.with_local_chart("./k8-util/helm"); - } - _ => (), - } - let installer = builder.build()?; - installer._install_sys().map_err(ClusterError::InstallK8)?; - println!("fluvio sys chart has been installed"); - Ok(()) -} - pub async fn run_setup(opt: StartOpt) -> Result<(), ClusterCliError> { let mut builder = ClusterInstaller::new().with_namespace(opt.k8_config.namespace); match opt.k8_config.chart_location { diff --git a/src/cluster/src/cli/start/mod.rs b/src/cluster/src/cli/start/mod.rs index c1ce5a6895..bf2b8a1a14 100644 --- a/src/cluster/src/cli/start/mod.rs +++ b/src/cluster/src/cli/start/mod.rs @@ -5,6 +5,7 @@ use structopt::StructOpt; mod local; mod k8; +mod sys; mod tls; use crate::cli::ClusterCliError; @@ -153,10 +154,9 @@ pub struct StartOpt { impl StartOpt { pub async fn process(self, upgrade: bool, skip_sys: bool) -> Result<(), ClusterCliError> { - use k8::install_sys; use k8::install_core; use k8::run_setup; - + use sys::install_sys; use local::{install_local, run_local_setup}; if self.sys { diff --git a/src/cluster/src/cli/start/sys.rs b/src/cluster/src/cli/start/sys.rs new file mode 100644 index 0000000000..3005636757 --- /dev/null +++ b/src/cluster/src/cli/start/sys.rs @@ -0,0 +1,40 @@ +use crate::cli::start::StartOpt; +use crate::cli::ClusterCliError; +use crate::sys::{SysConfig, SysInstaller}; +use crate::ClusterError; +use crate::error::SysInstallError; + +pub fn install_sys(opt: StartOpt, upgrade: bool) -> Result<(), ClusterCliError> { + install_sys_impl(opt, upgrade).map_err(ClusterError::InstallSys)?; + Ok(()) +} + +fn install_sys_impl(opt: StartOpt, upgrade: bool) -> Result<(), SysInstallError> { + let mut builder = SysConfig::builder(); + builder.with_namespace(opt.k8_config.namespace); + + match opt.k8_config.chart_location { + // If a chart location is given, use it + Some(chart_location) => { + builder.with_local_chart(chart_location); + } + // If we're in develop mode (but no explicit chart location), use local path + None if opt.develop => { + builder.with_local_chart("./k8-util/helm"); + } + _ => (), + } + + let config = builder.build()?; + let installer = SysInstaller::with_config(config)?; + + if upgrade { + installer.install()?; + println!("Fluvio system chart has been installed"); + } else { + installer.upgrade()?; + println!("Fluvio system chart has been upgraded"); + } + + Ok(()) +} diff --git a/src/cluster/src/error.rs b/src/cluster/src/error.rs index b77bc2f8d6..06c1eefc1c 100644 --- a/src/cluster/src/error.rs +++ b/src/cluster/src/error.rs @@ -16,11 +16,21 @@ pub enum ClusterError { /// An error occurred while trying to install Fluvio locally #[error("Failed to install Fluvio locally")] InstallLocal(#[from] LocalInstallError), + /// An error occurred while trying to install Fluvio system charts + #[error("Failed to install Fluvio system charts")] + InstallSys(#[from] SysInstallError), /// An error occurred while trying to uninstall Fluvio #[error("Failed to uninstall Fluvio")] Uninstall(#[from] UninstallError), } +#[derive(thiserror::Error, Debug)] +pub enum SysInstallError { + /// An error occurred while running helm. + #[error("Helm client error")] + HelmError(#[from] HelmError), +} + /// Errors that may occur while trying to install Fluvio on Kubernetes #[derive(thiserror::Error, Debug)] pub enum K8InstallError { diff --git a/src/cluster/src/lib.rs b/src/cluster/src/lib.rs index 46c9c6c9a5..031877ad0f 100644 --- a/src/cluster/src/lib.rs +++ b/src/cluster/src/lib.rs @@ -18,10 +18,13 @@ #![warn(missing_docs)] +use std::path::PathBuf; + mod check; mod start; mod delete; mod error; +mod sys; /// extensions #[cfg(feature = "cli")] @@ -36,6 +39,7 @@ pub use helm::HelmError; pub use check::{ClusterChecker, CheckStatus, CheckStatuses, CheckResult, CheckResults}; pub use check::{RecoverableCheck, UnrecoverableCheck, CheckFailed, CheckSuggestion}; pub use delete::ClusterUninstaller; +pub use sys::{SysConfig, SysConfigBuilder, SysInstaller}; #[cfg(feature = "platform")] const VERSION: &str = include_str!("../../../VERSION"); @@ -47,6 +51,7 @@ pub(crate) const DEFAULT_NAMESPACE: &str = "default"; pub(crate) const DEFAULT_HELM_VERSION: &str = "3.3.4"; pub(crate) const DEFAULT_CHART_SYS_REPO: &str = "fluvio-sys"; pub(crate) const DEFAULT_CHART_APP_REPO: &str = "fluvio"; +pub(crate) const DEFAULT_CHART_REMOTE: &str = "https://charts.fluvio.io"; /// The result of a successful startup of a Fluvio cluster /// @@ -75,3 +80,12 @@ impl StartStatus { self.port } } + +/// Distinguishes between a Local and Remote helm chart +#[derive(Debug, Clone)] +pub enum ChartLocation { + /// Local charts must be located at a valid filesystem path. + Local(PathBuf), + /// Remote charts will be located at a URL such as `https://...` + Remote(String), +} diff --git a/src/cluster/src/start/k8.rs b/src/cluster/src/start/k8.rs index 420fa9cb55..1174b06145 100644 --- a/src/cluster/src/start/k8.rs +++ b/src/cluster/src/start/k8.rs @@ -23,20 +23,18 @@ use k8_config::K8Config; use k8_client::meta_client::MetadataClient; use k8_types::core::service::{ServiceSpec, TargetPort}; -use crate::helm::{HelmClient, Chart, InstalledChart}; +use crate::helm::{HelmClient, Chart}; use crate::check::{UnrecoverableCheck, CheckFailed, RecoverableCheck, CheckResults, AlreadyInstalled}; -use crate::error::K8InstallError; +use crate::error::{K8InstallError, SysInstallError}; use crate::{ - ClusterError, StartStatus, DEFAULT_NAMESPACE, DEFAULT_CHART_SYS_REPO, DEFAULT_CHART_APP_REPO, - CheckStatus, ClusterChecker, CheckStatuses, + ClusterError, StartStatus, DEFAULT_NAMESPACE, DEFAULT_CHART_APP_REPO, CheckStatus, + ClusterChecker, CheckStatuses, DEFAULT_CHART_REMOTE, ChartLocation, }; -use crate::start::{ChartLocation, DEFAULT_CHART_REMOTE}; +use crate::sys::{SysConfig, SysInstaller}; use fluvio_command::CommandExt; const DEFAULT_REGISTRY: &str = "infinyon"; const DEFAULT_APP_NAME: &str = "fluvio-app"; -const DEFAULT_SYS_NAME: &str = "fluvio-sys"; -const DEFAULT_CHART_SYS_NAME: &str = "fluvio/fluvio-sys"; const DEFAULT_CHART_APP_NAME: &str = "fluvio/fluvio-app"; const DEFAULT_GROUP_NAME: &str = "main"; const DEFAULT_CLOUD_NAME: &str = "minikube"; @@ -641,13 +639,6 @@ impl ClusterInstaller { Ok(versions) } - /// Get installed system chart - pub fn sys_charts() -> Result, K8InstallError> { - let helm_client = HelmClient::new()?; - let sys_charts = helm_client.get_installed_chart_by_name(DEFAULT_CHART_SYS_REPO, None)?; - Ok(sys_charts) - } - /// Checks if all of the prerequisites for installing Fluvio are met /// /// This will attempt to automatically fix any missing prerequisites, @@ -695,8 +686,18 @@ impl ClusterInstaller { match error { RecoverableCheck::MissingSystemChart if self.config.install_sys => { println!("Fluvio system chart not installed. Attempting to install"); - self._install_sys() - .map_err(|_| UnrecoverableCheck::FailedRecovery(error))?; + + // Use closure to catch errors + let result = (|| -> Result<(), SysInstallError> { + let config: SysConfig = SysConfig::builder() + .with_namespace(&self.config.namespace) + .with_cloud(&self.config.cloud) + .build()?; + let sys_installer = SysInstaller::with_config(config)?; + sys_installer.install()?; + Ok(()) + })(); + result.map_err(|_| UnrecoverableCheck::FailedRecovery(error))?; } RecoverableCheck::MinikubeTunnelNotFoundRetry => { println!( @@ -803,56 +804,6 @@ impl ClusterInstaller { }) } - /// Install the Fluvio System chart on the configured cluster - #[doc(hidden)] - #[instrument(skip(self))] - pub fn _install_sys(&self) -> Result<(), K8InstallError> { - use fluvio_helm::InstallArg; - - let install_settings = vec![("cloud".to_owned(), self.config.cloud.to_owned())]; - match &self.config.chart_location { - ChartLocation::Remote(chart_location) => { - debug!( - chart_location = &**chart_location, - "Using remote helm chart:" - ); - self.helm_client - .repo_add(DEFAULT_CHART_APP_REPO, chart_location)?; - self.helm_client.repo_update()?; - let args = InstallArg::new(DEFAULT_CHART_SYS_REPO, DEFAULT_CHART_SYS_NAME) - .namespace(&self.config.namespace) - .opts(install_settings) - .develop(); - if self.config.upgrade { - self.helm_client.upgrade(&args)?; - } else { - self.helm_client.install(&args)?; - } - } - ChartLocation::Local(chart_home) => { - let chart_location = chart_home.join(DEFAULT_SYS_NAME); - let chart_string = chart_location.to_string_lossy(); - debug!( - chart_location = chart_string.as_ref(), - "Using local helm chart:" - ); - println!("installing"); - let args = InstallArg::new(DEFAULT_CHART_SYS_REPO, chart_string) - .namespace(&self.config.namespace) - .develop() - .opts(install_settings); - if self.config.upgrade { - self.helm_client.upgrade(&args)?; - } else { - self.helm_client.install(&args)?; - } - } - } - - info!("Fluvio sys chart has been installed"); - Ok(()) - } - /// Install Fluvio Core chart on the configured cluster #[instrument(skip(self))] fn install_app(&self) -> Result<(), K8InstallError> { diff --git a/src/cluster/src/start/local.rs b/src/cluster/src/start/local.rs index 9b4501f125..c50b5ff093 100644 --- a/src/cluster/src/start/local.rs +++ b/src/cluster/src/start/local.rs @@ -17,13 +17,12 @@ use k8_types::{InputK8Obj, InputObjectMeta}; use k8_client::SharedK8Client; use crate::{ - LocalInstallError, ClusterError, UnrecoverableCheck, StartStatus, DEFAULT_NAMESPACE, - ClusterChecker, + LocalInstallError, ClusterError, UnrecoverableCheck, StartStatus, ClusterChecker, + ChartLocation, DEFAULT_CHART_REMOTE, }; use crate::check::{RecoverableCheck, CheckResults}; -use crate::start::k8::ClusterInstaller; -use crate::start::{ChartLocation, DEFAULT_CHART_REMOTE}; use crate::check::render::render_check_progress; +use crate::sys::{SysConfig, SysInstaller}; use fluvio_command::CommandExt; const LOCAL_SC_ADDRESS: &str = "localhost:9003"; @@ -269,8 +268,8 @@ impl LocalClusterInstallerBuilder { /// # Example /// /// ```no_run - /// # use fluvio_cluster::ClusterInstaller; - /// let installer = ClusterInstaller::new() + /// # use fluvio_cluster::LocalClusterInstaller; + /// let installer = LocalClusterInstaller::new() /// .with_render_checks(true) /// .build() /// .unwrap(); @@ -345,7 +344,7 @@ impl LocalClusterInstaller { #[instrument(skip(error))] async fn pre_install_fix( install_sys: bool, - chart_location: ChartLocation, + sys_chart_location: ChartLocation, error: RecoverableCheck, ) -> Result<(), UnrecoverableCheck> { // Depending on what error occurred, try to fix the error. @@ -357,14 +356,14 @@ impl LocalClusterInstaller { // Use closure to catch any errors let result = (|| -> Result<_, ClusterError> { - let mut builder = ClusterInstaller::new().with_namespace(DEFAULT_NAMESPACE); - - if let ChartLocation::Local(chart) = &chart_location { - builder = builder.with_local_chart(chart); + let mut builder = SysConfig::builder(); + if let ChartLocation::Local(path) = &sys_chart_location { + builder.with_local_chart(path); } - let installer = builder.build()?; - installer._install_sys()?; + let config: SysConfig = builder.build()?; + let installer = SysInstaller::with_config(config)?; + installer.install()?; Ok(()) })(); diff --git a/src/cluster/src/start/mod.rs b/src/cluster/src/start/mod.rs index 3d32b07381..8fe1ad74da 100644 --- a/src/cluster/src/start/mod.rs +++ b/src/cluster/src/start/mod.rs @@ -1,79 +1,2 @@ -use std::path::PathBuf; - pub mod k8; pub mod local; - -const DEFAULT_CHART_REMOTE: &str = "https://charts.fluvio.io"; - -/// Distinguishes between a Local and Remote helm chart -#[derive(Debug, Clone)] -pub enum ChartLocation { - /// Local charts must be located at a valid filesystem path. - Local(PathBuf), - /// Remote charts will be located at a URL such as `https://...` - Remote(String), -} - -// /// Runs all of the given checks and attempts to fix any errors -// /// -// /// This requires a fixing-function to be given, which takes a -// /// `RecoverableCheck` and returns a `Result<(), UnrecoverableCheck>`. -// /// -// /// If the fixing function was able to fix the problem, it returns -// /// `Ok(())`. Otherwise, it may wrap the failed check and return it, -// /// such as: -// /// -// /// ```ignore -// /// # use fluvio_cluster::{RecoverableCheck, UnrecoverableCheck}; -// /// async fn fix(check: RecoverableCheck) -> Result<(), UnrecoverableCheck> { -// /// // Try to fix the check... -// /// // If the fix did not succeed: -// /// Err(UnrecoverableCheck::FailedRecovery(check)) -// /// } -// /// -// /// # async fn do_check_and_fix() { -// /// let check_results = check_and_fix(&[todo!("Add some checks")], fix).await; -// /// # } -// /// ``` -// pub(crate) async fn check_and_fix(checks: &[Box], fix: F) -> CheckResults -// where -// F: Fn(RecoverableCheck) -> R, -// R: Future>, -// { -// // We want to collect all of the results of the checks -// let mut results: Vec = vec![]; -// -// for check in checks { -// // Perform one individual check -// let check_result = check.perform_check().await; -// match check_result { -// // If the check passed, add it to the results list -// it @ Ok(CheckStatus::Pass(_)) => results.push(it), -// // If the check failed but is potentially auto-recoverable, try to recover it -// Ok(CheckStatus::Fail(CheckFailed::AutoRecoverable(it))) => { -// let err = format!("{}", it); -// let fix_result = fix(it).await; -// match fix_result { -// // If the fix worked, return a passed check -// Ok(_) => results.push(Ok(CheckStatus::pass(format!("Fixed: {}", err)))), -// Err(e) => { -// // If the fix failed, wrap the original failed check in Unrecoverable -// results.push(Ok(CheckStatus::fail(CheckFailed::Unrecoverable(e)))); -// // We return upon the first check failure -// return CheckResults::from(results); -// } -// } -// } -// it @ Ok(CheckStatus::Fail(_)) => { -// results.push(it); -// return CheckResults::from(results); -// } -// it @ Err(_) => { -// results.push(it); -// return CheckResults::from(results); -// } -// } -// } -// -// CheckResults::from(results) -// } diff --git a/src/cluster/src/sys.rs b/src/cluster/src/sys.rs new file mode 100644 index 0000000000..39b035cfca --- /dev/null +++ b/src/cluster/src/sys.rs @@ -0,0 +1,214 @@ +use tracing::{info, debug, instrument}; +use derive_builder::Builder; +use crate::{ChartLocation, DEFAULT_CHART_APP_REPO, DEFAULT_CHART_SYS_REPO, DEFAULT_NAMESPACE, DEFAULT_CHART_REMOTE}; +use fluvio_helm::{HelmClient, InstallArg}; +use std::path::{Path, PathBuf}; +use crate::error::SysInstallError; + +const DEFAULT_SYS_NAME: &str = "fluvio-sys"; +const DEFAULT_CHART_SYS_NAME: &str = "fluvio/fluvio-sys"; +const DEFAULT_CLOUD_NAME: &str = "minikube"; + +/// Configuration options for installing Fluvio system charts +#[derive(Builder, Debug)] +#[builder(build_fn(skip), setter(prefix = "with"))] +pub struct SysConfig { + /// The type of cloud infrastructure the cluster will be running on + /// + /// # Example + /// + /// ``` + /// # use fluvio_cluster::SysConfigBuilder; + /// # fn add_cloud(builder: &mut SysConfigBuilder) { + /// builder.with_cloud("minikube"); + /// # } + /// ``` + #[builder(setter(into))] + pub cloud: String, + /// The namespace in which to install the system chart + /// + /// # Example + /// + /// ``` + /// # use fluvio_cluster::SysConfigBuilder; + /// # fn add_namespace(builder: &mut SysConfigBuilder) { + /// builder.with_namespace("fluvio"); + /// # } + /// ``` + #[builder(setter(into))] + pub namespace: String, + /// The location at which to find the system chart to install + #[builder(private)] + pub chart_location: ChartLocation, +} + +impl SysConfig { + /// Creates a default [`SysConfigBuilder`]. + /// + /// # Example + /// + /// ``` + /// use fluvio_cluster::SysConfig; + /// let builder = SysConfig::builder(); + /// ``` + pub fn builder() -> SysConfigBuilder { + SysConfigBuilder::default() + } +} + +impl SysConfigBuilder { + /// Validates all builder options and constructs a `SysConfig` + pub fn build(&self) -> Result { + let cloud = self.cloud.clone().unwrap_or_else(|| DEFAULT_CLOUD_NAME.to_string()); + let namespace = self.namespace.clone().unwrap_or_else(|| DEFAULT_NAMESPACE.to_string()); + let chart_location = self.chart_location.clone() + .unwrap_or_else(|| ChartLocation::Remote(DEFAULT_CHART_REMOTE.to_string())); + + Ok(SysConfig { + cloud, + namespace, + chart_location, + }) + } + + /// The local chart location to install sys charts from + /// + /// # Example + /// + /// ``` + /// # use fluvio_cluster::SysConfigBuilder; + /// # fn add_local_chart(builder: &mut SysConfigBuilder) { + /// builder.with_local_chart("./helm/fluvio-sys"); + /// # } + /// ``` + pub fn with_local_chart>(&mut self, path: P) -> &mut Self { + self.chart_location = Some(ChartLocation::Local(path.into())); + self + } + + /// The remote chart location to install sys charts from + /// + /// # Example + /// + /// ``` + /// # use fluvio_cluster::SysConfigBuilder; + /// # fn add_remote_chart(builder: &mut SysConfigBuilder) { + /// builder.with_remote_chart("https://charts.fluvio.io"); + /// } + /// ``` + pub fn with_remote_chart>(&mut self, location: S) -> &mut Self { + self.chart_location = Some(ChartLocation::Remote(location.into())); + self + } +} + +/// Installs or upgrades the Fluvio system charts +#[derive(Debug)] +pub struct SysInstaller { + config: SysConfig, + helm_client: HelmClient, +} + +impl SysInstaller { + /// Create a new `SysInstaller` using the default config + pub fn new() -> Result { + let config = SysConfig::builder().build()?; + Self::with_config(config) + } + + /// Create a new `SysInstaller` using the given config + pub fn with_config(config: SysConfig) -> Result { + let helm_client = HelmClient::new()?; + Ok(Self { + config, + helm_client, + }) + } + + /// Install the Fluvio System chart on the configured cluster + pub fn install(&self) -> Result<(), SysInstallError> { + self.process(false) + } + + /// Upgrade the Fluvio System chart on the configured cluster + pub fn upgrade(&self) -> Result<(), SysInstallError> { + self.process(true) + } + + #[instrument(skip(self))] + fn process(&self, upgrade: bool) -> Result<(), SysInstallError> { + let settings = vec![("cloud".to_owned(), self.config.cloud.to_owned())]; + match &self.config.chart_location { + ChartLocation::Remote(chart_location) => { + self.process_remote_chart(chart_location, upgrade, settings)?; + } + ChartLocation::Local(chart_home) => { + self.process_local_chart(chart_home, upgrade, settings)?; + } + } + + info!("Fluvio sys chart has been installed"); + Ok(()) + } + + #[instrument(skip(self, upgrade))] + fn process_remote_chart( + &self, + chart: &str, + upgrade: bool, + settings: Vec<(String, String)>, + ) -> Result<(), SysInstallError> { + debug!(?chart, "Using remote helm chart:"); + self.helm_client + .repo_add(DEFAULT_CHART_APP_REPO, chart)?; + self.helm_client.repo_update()?; + let args = InstallArg::new(DEFAULT_CHART_SYS_REPO, DEFAULT_CHART_SYS_NAME) + .namespace(&self.config.namespace) + .opts(settings) + .develop(); + if upgrade { + self.helm_client.upgrade(&args)?; + } else { + self.helm_client.install(&args)?; + } + Ok(()) + } + + #[instrument(skip(self, upgrade))] + fn process_local_chart( + &self, + chart_home: &Path, + upgrade: bool, + settings: Vec<(String, String)>, + ) -> Result<(), SysInstallError> { + let chart_location = chart_home.join(DEFAULT_SYS_NAME); + let chart_string = chart_location.to_string_lossy(); + debug!(chart_location = %chart_location.display(), "Using local helm chart:"); + + let args = InstallArg::new(DEFAULT_CHART_SYS_REPO, chart_string) + .namespace(&self.config.namespace) + .develop() + .opts(settings); + if upgrade { + self.helm_client.upgrade(&args)?; + } else { + self.helm_client.install(&args)?; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_config_builder() { + let config = SysConfig::builder() + .with_namespace("fluvio") + .with_cloud("minikube") + .with_local_chart("./helm/fluvio-sys") + .build() + .expect("should build config"); + } +} From 42fbcd31065b26986d071d88b9975c2e07c2d5f3 Mon Sep 17 00:00:00 2001 From: Nick Mosher Date: Mon, 25 Jan 2021 12:06:47 -0500 Subject: [PATCH 03/10] Fix broken intra-doc links --- src/cluster/src/check/mod.rs | 41 +++++++++++++++++++++++------------- src/cluster/src/lib.rs | 1 + 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/src/cluster/src/check/mod.rs b/src/cluster/src/check/mod.rs index fbc2737691..f5fb67de93 100644 --- a/src/cluster/src/check/mod.rs +++ b/src/cluster/src/check/mod.rs @@ -354,8 +354,11 @@ impl ClusterCheck for LoadBalancer { /// Manages all cluster check operations /// /// A `ClusterChecker` can be configured with different sets of checks to run. -/// It can wait for all checks to run sequentially using [`run`], or spawn a +/// It can wait for all checks to run sequentially using [`run_wait`], or spawn a /// task and receive progress updates about checks using [`run_with_progress`]. +/// +/// [`run_wait`]: ClusterChecker::run_wait +/// [`run_with_progress`]: ClusterChecker::run_with_progress #[derive(Debug)] #[non_exhaustive] pub struct ClusterChecker { @@ -374,6 +377,8 @@ impl ClusterChecker { /// # use fluvio_cluster::ClusterChecker; /// let checker: ClusterChecker = ClusterChecker::empty(); /// ``` + /// + /// [`with_check`]: ClusterChecker::with_check pub fn empty() -> Self { ClusterChecker { checks: vec![] } } @@ -388,10 +393,10 @@ impl ClusterChecker { /// /// Note that no checks are run until one of the `run` methods are invoked. /// - /// - [`run_wait`] - /// - [`run_wait_and_fix`] - /// - [`run_with_progress`] - /// - [`run_and_fix_with_progress`] + /// - [`run_wait`](ClusterChecker::run_wait) + /// - [`run_wait_and_fix`](ClusterChecker::run_wait_and_fix) + /// - [`run_with_progress`](ClusterChecker::run_with_progress) + /// - [`run_and_fix_with_progress`](ClusterChecker::run_and_fix_with_progress) pub fn with_preflight_checks(mut self) -> Self { let checks: Vec> = vec![ Box::new(LoadableConfig), @@ -411,10 +416,10 @@ impl ClusterChecker { /// /// Note that no checks are run until one of the `run` methods are invoked. /// - /// - [`run_wait`] - /// - [`run_wait_and_fix`] - /// - [`run_with_progress`] - /// - [`run_and_fix_with_progress`] + /// - [`run_wait`](ClusterChecker::run_wait) + /// - [`run_wait_and_fix`](ClusterChecker::run_wait_and_fix) + /// - [`run_with_progress`](ClusterChecker::run_with_progress) + /// - [`run_and_fix_with_progress`](ClusterChecker::run_and_fix_with_progress) pub fn with_k8_checks(mut self) -> Self { let checks: Vec> = vec![ Box::new(LoadableConfig), @@ -430,10 +435,10 @@ impl ClusterChecker { /// /// Note that no checks are run until one of the `run` methods are invoked. /// - /// - [`run_wait`] - /// - [`run_wait_and_fix`] - /// - [`run_with_progress`] - /// - [`run_and_fix_with_progress`] + /// - [`run_wait`](ClusterChecker::run_wait) + /// - [`run_wait_and_fix`](ClusterChecker::run_wait_and_fix) + /// - [`run_with_progress`](ClusterChecker::run_with_progress) + /// - [`run_and_fix_with_progress`](ClusterChecker::run_and_fix_with_progress) pub fn with_local_checks(mut self) -> Self { let checks: Vec> = vec![ Box::new(HelmVersion), @@ -462,6 +467,8 @@ impl ClusterChecker { /// .await; /// # } /// ``` + /// + /// [`run_with_progress`]: ClusterChecker::run_with_progress pub async fn run_wait(&self) -> CheckResults { let mut check_results = vec![]; for check in &self.checks { @@ -523,7 +530,7 @@ impl ClusterChecker { /// updates about checks as they are run. /// /// If you want to run the checks as a single batch and receive all of the results - /// at once, use [`run`] instead. + /// at once, use [`run_wait`] instead. /// /// # Example /// @@ -539,6 +546,8 @@ impl ClusterChecker { /// } /// # } /// ``` + /// + /// [`run_wait`]: ClusterChecker::run_wait pub fn run_with_progress(self) -> Receiver { let (sender, receiver) = async_channel::bounded(100); spawn(async move { @@ -563,7 +572,9 @@ impl ClusterChecker { /// progress updates about checks and fixes as they run. /// /// If you want to run checks and fixes as a single batch and receive all of - /// the results at once, use [`run`] instead. + /// the results at once, use [`run_wait`] instead. + /// + /// [`run_wait`]: ClusterChecker::run_wait pub fn run_and_fix_with_progress(self, fix: F) -> Receiver where F: Fn(RecoverableCheck) -> R + Send + Sync + 'static, diff --git a/src/cluster/src/lib.rs b/src/cluster/src/lib.rs index 031877ad0f..21213f8696 100644 --- a/src/cluster/src/lib.rs +++ b/src/cluster/src/lib.rs @@ -17,6 +17,7 @@ //! [`ClusterInstaller`]: ./struct.ClusterInstaller.html #![warn(missing_docs)] +#![deny(broken_intra_doc_links)] use std::path::PathBuf; From c9245573c694d231e86a4387f5a3b9ddc223b98b Mon Sep 17 00:00:00 2001 From: Nick Mosher Date: Mon, 25 Jan 2021 16:35:19 -0500 Subject: [PATCH 04/10] Add 'attempt_fix' to ClusterCheck --- src/cluster/src/check/mod.rs | 185 ++++++++++++++++++++------------- src/cluster/src/cli/check.rs | 7 +- src/cluster/src/start/k8.rs | 140 +++++++++++++------------ src/cluster/src/start/local.rs | 102 +++++++++--------- src/cluster/src/sys.rs | 3 +- 5 files changed, 247 insertions(+), 190 deletions(-) diff --git a/src/cluster/src/check/mod.rs b/src/cluster/src/check/mod.rs index f5fb67de93..697aaba92c 100644 --- a/src/cluster/src/check/mod.rs +++ b/src/cluster/src/check/mod.rs @@ -2,7 +2,6 @@ use std::io::Error as IoError; use std::fmt::Debug; use std::time::Duration; use std::process::{Command}; -use std::future::Future; pub mod render; @@ -22,7 +21,9 @@ use k8_types::InputObjectMeta; use k8_types::core::service::ServiceSpec; use k8_client::ClientError as K8ClientError; -use crate::{DEFAULT_NAMESPACE, DEFAULT_CHART_SYS_REPO, DEFAULT_CHART_APP_REPO, DEFAULT_HELM_VERSION}; +use crate::{DEFAULT_NAMESPACE, DEFAULT_CHART_SYS_REPO, DEFAULT_CHART_APP_REPO, DEFAULT_HELM_VERSION, SysConfig, SysInstaller}; +use crate::error::SysInstallError; +use std::fs::File; const DUMMY_LB_SERVICE: &str = "fluvio-dummy-service"; const DELAY: u64 = 1000; @@ -256,6 +257,15 @@ impl CheckSuggestion for UnrecoverableCheck { pub trait ClusterCheck: Debug + Send + Sync + 'static { /// perform check, if successful return success message, if fail, return fail message async fn perform_check(&self) -> CheckResult; + + /// Attempt to fix a recoverable error. + /// + /// The default implementation is to fail with `FailedRecovery`. Concrete instances + /// may override this implementation with functionality to actually attempt to fix + /// errors. + async fn attempt_fix(&self, check: RecoverableCheck) -> Result<(), UnrecoverableCheck> { + Err(UnrecoverableCheck::FailedRecovery(check)) + } } #[derive(Debug)] @@ -290,14 +300,39 @@ impl ClusterCheck for HelmVersion { } #[derive(Debug)] -pub(crate) struct SysChart; +pub(crate) struct SysChartCheck { + config: SysConfig, +} + +impl SysChartCheck { + pub(crate) fn new(config: SysConfig) -> Self { + Self { config } + } +} #[async_trait] -impl ClusterCheck for SysChart { +impl ClusterCheck for SysChartCheck { async fn perform_check(&self) -> CheckResult { let helm_client = HelmClient::new().map_err(CheckError::HelmError)?; check_system_chart(&helm_client, DEFAULT_CHART_SYS_REPO) } + + async fn attempt_fix(&self, error: RecoverableCheck) -> Result<(), UnrecoverableCheck> { + println!("Fluvio system chart not installed. Attempting to install"); + + // Use closure to catch errors + let result = (|| -> Result<(), SysInstallError> { + let config: SysConfig = SysConfig::builder() + .with_namespace(&self.config.namespace) + .with_cloud(&self.config.cloud) + .build()?; + let sys_installer = SysInstaller::with_config(config)?; + sys_installer.install()?; + Ok(()) + })(); + result.map_err(|_| UnrecoverableCheck::FailedRecovery(error))?; + Ok(()) + } } #[derive(Debug)] @@ -347,7 +382,67 @@ pub(crate) struct LoadBalancer; #[async_trait] impl ClusterCheck for LoadBalancer { async fn perform_check(&self) -> CheckResult { - check_load_balancer_status().await + let config = K8Config::load().map_err(CheckError::K8ConfigError)?; + let context = match config { + K8Config::Pod(_) => return Ok(CheckStatus::pass("Pod config found, ignoring the check")), + K8Config::KubeConfig(context) => context, + }; + + let cluster_context = match context.config.current_context() { + Some(context) => context, + None => { + return Ok(CheckStatus::fail( + UnrecoverableCheck::NoActiveKubernetesContext, + )); + } + }; + + let username = &cluster_context.context.user; + + // create dummy service + create_dummy_service()?; + if wait_for_service_exist(DEFAULT_NAMESPACE).await?.is_some() { + // IP found, everything good + delete_service()?; + } else { + delete_service()?; + if username == MINIKUBE_USERNAME { + // In case of macos we need to run tunnel with elevated context of sudo + // hence handle both separately + return Ok(CheckStatus::fail(get_tunnel_error())); + } + return Ok(CheckStatus::fail( + UnrecoverableCheck::LoadBalancerServiceNotAvailable, + )); + } + + Ok(CheckStatus::pass("Load balancer is up")) + } + + /// Attempt to fix missing load balancer by running `minikube tunnel` + async fn attempt_fix(&self, error: RecoverableCheck) -> Result<(), UnrecoverableCheck> { + use std::process::Stdio; + + // Use closure to catch potential errors + let result = (|| -> Result<(), std::io::Error> { + let log_file = File::create("/tmp/tunnel.out")?; + let error_file = log_file.try_clone()?; + + // run minikube tunnel > /tmp/tunnel.out 2> /tmp/tunnel.out + Command::new("minikube") + .arg("tunnel") + .stdout(Stdio::from(log_file)) + .stderr(Stdio::from(error_file)) + .spawn()?; + Ok(()) + })(); + + if let Err(_) = result { + return Err(UnrecoverableCheck::FailedRecovery(error)); + } + + sleep(Duration::from_millis(DELAY)).await; + Ok(()) } } @@ -384,8 +479,8 @@ impl ClusterChecker { } /// Adds a check to this `ClusterChecker` - pub fn with_check(mut self, check: Box) -> Self { - self.checks.push(check); + pub fn with_check>>(mut self, check: B) -> Self { + self.checks.push(check.into()); self } @@ -402,7 +497,6 @@ impl ClusterChecker { Box::new(LoadableConfig), Box::new(K8Version), Box::new(HelmVersion), - Box::new(SysChart), Box::new(CreateServicePermission), Box::new(CreateCrdPermission), Box::new(CreateServiceAccountPermission), @@ -424,7 +518,6 @@ impl ClusterChecker { let checks: Vec> = vec![ Box::new(LoadableConfig), Box::new(HelmVersion), - Box::new(SysChart), Box::new(LoadBalancer), ]; self.checks.extend(checks); @@ -444,7 +537,6 @@ impl ClusterChecker { Box::new(HelmVersion), Box::new(K8Version), Box::new(LoadableConfig), - Box::new(SysChart), ]; self.checks.extend(checks); self @@ -481,27 +573,22 @@ impl ClusterChecker { /// Performs all checks sequentially, attempting to fix any problems along the way. /// /// This may appear to "hang" if there are many checks, or if fixes take a long time. - pub async fn run_wait_and_fix(&self, fix: F) -> CheckResults - where - F: Fn(RecoverableCheck) -> R, - R: Future>, - { + pub async fn run_wait_and_fix(&self) -> CheckResults { // We want to collect all of the results of the checks let mut results: Vec = vec![]; for check in &self.checks { // Perform one individual check - let check_result = check.perform_check().await; - match check_result { + match check.perform_check().await { // If the check passed, add it to the results list it @ Ok(CheckStatus::Pass(_)) => results.push(it), // If the check failed but is potentially auto-recoverable, try to recover it - Ok(CheckStatus::Fail(CheckFailed::AutoRecoverable(it))) => { - let err = format!("{}", it); - let fix_result = fix(it).await; - match fix_result { - // If the fix worked, return a passed check - Ok(_) => results.push(Ok(CheckStatus::pass(format!("Fixed: {}", err)))), + Ok(CheckStatus::Fail(CheckFailed::AutoRecoverable(recoverable))) => { + let err = format!("{}", recoverable); + match check.attempt_fix(recoverable).await { + Ok(_) => { + results.push(Ok(CheckStatus::pass(format!("Fixed: {}", err)))); + } Err(e) => { // If the fix failed, wrap the original failed check in Unrecoverable results.push(Ok(CheckStatus::fail(CheckFailed::Unrecoverable(e)))); @@ -575,11 +662,7 @@ impl ClusterChecker { /// the results at once, use [`run_wait`] instead. /// /// [`run_wait`]: ClusterChecker::run_wait - pub fn run_and_fix_with_progress(self, fix: F) -> Receiver - where - F: Fn(RecoverableCheck) -> R + Send + Sync + 'static, - R: Future> + Send + Sync, - { + pub fn run_and_fix_with_progress(self) -> Receiver { let (sender, receiver) = async_channel::bounded(100); spawn(async move { for check in &self.checks { @@ -589,10 +672,9 @@ impl ClusterChecker { // If the check passed, add it to the results list it @ Ok(CheckStatus::Pass(_)) => sender.send(it).await, // If the check failed but is potentially auto-recoverable, try to recover it - Ok(CheckStatus::Fail(CheckFailed::AutoRecoverable(it))) => { - let err = format!("{}", it); - let fix_result = fix(it).await; - match fix_result { + Ok(CheckStatus::Fail(CheckFailed::AutoRecoverable(recoverable))) => { + let err = format!("{}", recoverable); + match check.attempt_fix(recoverable).await { // If the fix worked, return a passed check Ok(_) => { sender @@ -668,45 +750,6 @@ pub(crate) fn check_already_installed(helm: &HelmClient, app_repo: &str) -> Chec Ok(CheckStatus::pass("Previous fluvio installation not found")) } -/// Check if load balancer is up -pub(crate) async fn check_load_balancer_status() -> CheckResult { - let config = K8Config::load().map_err(CheckError::K8ConfigError)?; - let context = match config { - K8Config::Pod(_) => return Ok(CheckStatus::pass("Pod config found, ignoring the check")), - K8Config::KubeConfig(context) => context, - }; - - let cluster_context = match context.config.current_context() { - Some(context) => context, - None => { - return Ok(CheckStatus::fail( - UnrecoverableCheck::NoActiveKubernetesContext, - )); - } - }; - - let username = &cluster_context.context.user; - - // create dummy service - create_dummy_service()?; - if wait_for_service_exist(DEFAULT_NAMESPACE).await?.is_some() { - // IP found, everything good - delete_service()?; - } else { - delete_service()?; - if username == MINIKUBE_USERNAME { - // In case of macos we need to run tunnel with elevated context of sudo - // hence handle both separately - return Ok(CheckStatus::fail(get_tunnel_error())); - } - return Ok(CheckStatus::fail( - UnrecoverableCheck::LoadBalancerServiceNotAvailable, - )); - } - - Ok(CheckStatus::pass("Load balancer is up")) -} - fn create_dummy_service() -> Result<(), CheckError> { Command::new("kubectl") .arg("create") diff --git a/src/cluster/src/cli/check.rs b/src/cluster/src/cli/check.rs index 2b63d94243..13e0d0e286 100644 --- a/src/cluster/src/cli/check.rs +++ b/src/cluster/src/cli/check.rs @@ -1,8 +1,9 @@ use structopt::StructOpt; -use crate::ClusterChecker; +use crate::{ClusterChecker, SysConfig, ClusterError}; use crate::cli::ClusterCliError; use crate::check::render::{render_check_progress, render_results_next_steps}; +use crate::check::SysChartCheck; #[derive(Debug, StructOpt)] pub struct CheckOpt {} @@ -11,8 +12,12 @@ impl CheckOpt { pub async fn process(self) -> Result<(), ClusterCliError> { use colored::*; println!("{}", "Running pre-startup checks...".bold()); + let sys_config: SysConfig = SysConfig::builder() + .build() + .map_err(ClusterError::InstallSys)?; let mut progress = ClusterChecker::empty() .with_preflight_checks() + .with_check(SysChartCheck::new(sys_config)) .run_with_progress(); let results = render_check_progress(&mut progress).await; diff --git a/src/cluster/src/start/k8.rs b/src/cluster/src/start/k8.rs index 1174b06145..1b16bf4339 100644 --- a/src/cluster/src/start/k8.rs +++ b/src/cluster/src/start/k8.rs @@ -5,8 +5,6 @@ use std::borrow::Cow; use std::process::Command; use std::time::Duration; use std::net::SocketAddr; -use std::process::Stdio; -use std::fs::File; use std::env; use tracing::{info, warn, debug, error, instrument}; @@ -24,13 +22,12 @@ use k8_client::meta_client::MetadataClient; use k8_types::core::service::{ServiceSpec, TargetPort}; use crate::helm::{HelmClient, Chart}; -use crate::check::{UnrecoverableCheck, CheckFailed, RecoverableCheck, CheckResults, AlreadyInstalled}; -use crate::error::{K8InstallError, SysInstallError}; +use crate::check::{CheckFailed, CheckResults, AlreadyInstalled, SysChartCheck}; +use crate::error::K8InstallError; use crate::{ ClusterError, StartStatus, DEFAULT_NAMESPACE, DEFAULT_CHART_APP_REPO, CheckStatus, - ClusterChecker, CheckStatuses, DEFAULT_CHART_REMOTE, ChartLocation, + ClusterChecker, CheckStatuses, DEFAULT_CHART_REMOTE, ChartLocation, SysConfig, }; -use crate::sys::{SysConfig, SysInstaller}; use fluvio_command::CommandExt; const DEFAULT_REGISTRY: &str = "infinyon"; @@ -50,8 +47,9 @@ static MAX_SC_NETWORK_LOOP: Lazy = Lazy::new(|| { var_value.parse().unwrap_or(30) }); const NETWORK_SLEEP_MS: u64 = 1000; -/// time betwen network check -const DELAY: u64 = 3000; + +// /// time betwen network check +// const DELAY: u64 = 3000; /// A builder for cluster startup options #[derive(Debug)] @@ -652,69 +650,77 @@ impl ClusterInstaller { /// [`with_update_context`]: ./struct.ClusterInstaller.html#method.with_update_context #[instrument(skip(self))] pub async fn setup(&self) -> CheckResults { - let fix = |err| self.pre_install_fix(err); - let mut checker = ClusterChecker::empty().with_k8_checks(); + let sys_config: SysConfig = SysConfig::builder() + .with_namespace(&self.config.namespace) + .with_chart_location(self.config.chart_location.clone()) + .with_cloud(&self.config.cloud) + .build() + .unwrap(); + + let mut checker = ClusterChecker::empty() + .with_k8_checks() + .with_check(SysChartCheck::new(sys_config)); if !self.config.upgrade { - checker = checker.with_check(Box::new(AlreadyInstalled)); + checker = checker.with_check(AlreadyInstalled); } - checker.run_wait_and_fix(fix).await - } - - async fn _try_minikube_tunnel(&self) -> Result<(), K8InstallError> { - let log_file = File::create("/tmp/tunnel.out")?; - let error_file = log_file.try_clone()?; - - // run minikube tunnel > /tmp/tunnel.out 2> /tmp/tunnel.out - Command::new("minikube") - .arg("tunnel") - .stdout(Stdio::from(log_file)) - .stderr(Stdio::from(error_file)) - .spawn()?; - sleep(Duration::from_millis(DELAY)).await; - Ok(()) + checker.run_wait_and_fix().await } - /// Given a pre-check error, attempt to automatically correct it - #[instrument(skip(self, error))] - pub(crate) async fn pre_install_fix( - &self, - error: RecoverableCheck, - ) -> Result<(), UnrecoverableCheck> { - // Depending on what error occurred, try to fix the error. - // If we handle the error successfully, return Ok(()) to indicate success - // If we cannot handle this error, wrap it in UnrecoverableCheck::FailedRecovery - match error { - RecoverableCheck::MissingSystemChart if self.config.install_sys => { - println!("Fluvio system chart not installed. Attempting to install"); - - // Use closure to catch errors - let result = (|| -> Result<(), SysInstallError> { - let config: SysConfig = SysConfig::builder() - .with_namespace(&self.config.namespace) - .with_cloud(&self.config.cloud) - .build()?; - let sys_installer = SysInstaller::with_config(config)?; - sys_installer.install()?; - Ok(()) - })(); - result.map_err(|_| UnrecoverableCheck::FailedRecovery(error))?; - } - RecoverableCheck::MinikubeTunnelNotFoundRetry => { - println!( - "Load balancer service is not available, trying to bring up minikube tunnel" - ); - self._try_minikube_tunnel() - .await - .map_err(|_| UnrecoverableCheck::FailedRecovery(error))?; - } - unhandled => { - warn!("Pre-install was unable to autofix an error"); - return Err(UnrecoverableCheck::FailedRecovery(unhandled)); - } - } - - Ok(()) - } + // async fn _try_minikube_tunnel(&self) -> Result<(), K8InstallError> { + // let log_file = File::create("/tmp/tunnel.out")?; + // let error_file = log_file.try_clone()?; + // + // // run minikube tunnel > /tmp/tunnel.out 2> /tmp/tunnel.out + // Command::new("minikube") + // .arg("tunnel") + // .stdout(Stdio::from(log_file)) + // .stderr(Stdio::from(error_file)) + // .spawn()?; + // sleep(Duration::from_millis(DELAY)).await; + // Ok(()) + // } + + // /// Given a pre-check error, attempt to automatically correct it + // #[instrument(skip(self, error))] + // pub(crate) async fn pre_install_fix( + // &self, + // error: RecoverableCheck, + // ) -> Result<(), UnrecoverableCheck> { + // // Depending on what error occurred, try to fix the error. + // // If we handle the error successfully, return Ok(()) to indicate success + // // If we cannot handle this error, wrap it in UnrecoverableCheck::FailedRecovery + // match error { + // RecoverableCheck::MissingSystemChart if self.config.install_sys => { + // println!("Fluvio system chart not installed. Attempting to install"); + // + // // Use closure to catch errors + // let result = (|| -> Result<(), SysInstallError> { + // let config: SysConfig = SysConfig::builder() + // .with_namespace(&self.config.namespace) + // .with_cloud(&self.config.cloud) + // .build()?; + // let sys_installer = SysInstaller::with_config(config)?; + // sys_installer.install()?; + // Ok(()) + // })(); + // result.map_err(|_| UnrecoverableCheck::FailedRecovery(error))?; + // } + // RecoverableCheck::MinikubeTunnelNotFoundRetry => { + // println!( + // "Load balancer service is not available, trying to bring up minikube tunnel" + // ); + // self._try_minikube_tunnel() + // .await + // .map_err(|_| UnrecoverableCheck::FailedRecovery(error))?; + // } + // unhandled => { + // warn!("Pre-install was unable to autofix an error"); + // return Err(UnrecoverableCheck::FailedRecovery(unhandled)); + // } + // } + // + // Ok(()) + // } /// Installs Fluvio according to the installer's configuration /// diff --git a/src/cluster/src/start/local.rs b/src/cluster/src/start/local.rs index c50b5ff093..649815b246 100644 --- a/src/cluster/src/start/local.rs +++ b/src/cluster/src/start/local.rs @@ -17,12 +17,11 @@ use k8_types::{InputK8Obj, InputObjectMeta}; use k8_client::SharedK8Client; use crate::{ - LocalInstallError, ClusterError, UnrecoverableCheck, StartStatus, ClusterChecker, - ChartLocation, DEFAULT_CHART_REMOTE, + LocalInstallError, ClusterError, StartStatus, ClusterChecker, ChartLocation, + DEFAULT_CHART_REMOTE, SysConfig, }; -use crate::check::{RecoverableCheck, CheckResults}; +use crate::check::{CheckResults, SysChartCheck}; use crate::check::render::render_check_progress; -use crate::sys::{SysConfig, SysInstaller}; use fluvio_command::CommandExt; const LOCAL_SC_ADDRESS: &str = "localhost:9003"; @@ -323,63 +322,68 @@ impl LocalClusterInstaller { /// and tries to auto-fix the issues observed pub async fn setup(&self) -> CheckResults { println!("Performing pre-flight checks"); - let install_sys = self.config.install_sys; - let chart_location = self.config.chart_location.clone(); - let fix = move |err| Self::pre_install_fix(install_sys, chart_location.clone(), err); + // let install_sys = self.config.install_sys; + // let chart_location = self.config.chart_location.clone(); + let sys_config: SysConfig = SysConfig::builder() + .with_chart_location(self.config.chart_location.clone()) + .build() + .expect("TODO remove"); if self.config.render_checks { let mut progress = ClusterChecker::empty() .with_local_checks() - .run_and_fix_with_progress(fix); + .with_check(SysChartCheck::new(sys_config)) + .run_and_fix_with_progress(); render_check_progress(&mut progress).await } else { ClusterChecker::empty() .with_local_checks() - .run_wait_and_fix(fix) + .with_check(SysChartCheck::new(sys_config)) + .run_wait_and_fix() .await } } - /// Given a pre-check error, attempt to automatically correct it - #[instrument(skip(error))] - async fn pre_install_fix( - install_sys: bool, - sys_chart_location: ChartLocation, - error: RecoverableCheck, - ) -> Result<(), UnrecoverableCheck> { - // Depending on what error occurred, try to fix the error. - // If we handle the error successfully, return Ok(()) to indicate success - // If we cannot handle this error, return it to bubble up - match error { - RecoverableCheck::MissingSystemChart if install_sys => { - debug!("Fluvio system chart not installed. Attempting to install"); - - // Use closure to catch any errors - let result = (|| -> Result<_, ClusterError> { - let mut builder = SysConfig::builder(); - if let ChartLocation::Local(path) = &sys_chart_location { - builder.with_local_chart(path); - } - - let config: SysConfig = builder.build()?; - let installer = SysInstaller::with_config(config)?; - installer.install()?; - Ok(()) - })(); - - // If any errors occurred, recovery failed - if result.is_err() { - return Err(UnrecoverableCheck::FailedRecovery(error)); - } - } - unhandled => { - warn!("Pre-install was unable to autofix an error"); - return Err(UnrecoverableCheck::FailedRecovery(unhandled)); - } - } - - Ok(()) - } + // /// Given a pre-check error, attempt to automatically correct it + // #[instrument(skip(error))] + // async fn pre_install_fix( + // install_sys: bool, + // sys_chart_location: ChartLocation, + // error: RecoverableCheck, + // ) -> Result<(), UnrecoverableCheck> { + // // Depending on what error occurred, try to fix the error. + // // If we handle the error successfully, return Ok(()) to indicate success + // // If we cannot handle this error, return it to bubble up + // match error { + // RecoverableCheck::MissingSystemChart if install_sys => { + // debug!("Fluvio system chart not installed. Attempting to install"); + // + // // Use closure to catch any errors + // let result = (|| -> Result<_, ClusterError> { + // let mut builder = SysConfig::builder(); + // if let ChartLocation::Local(path) = &sys_chart_location { + // builder.with_local_chart(path); + // } + // + // let config: SysConfig = builder.build()?; + // let installer = SysInstaller::with_config(config)?; + // installer.install()?; + // Ok(()) + // })(); + // + // // If any errors occurred, recovery failed + // if result.is_err() { + // return Err(UnrecoverableCheck::FailedRecovery(error)); + // } + // } + // unhandled => { + // warn!("Pre-install was unable to autofix an error"); + // return Err(UnrecoverableCheck::FailedRecovery(unhandled)); + // } + // } + // + // Ok(()) + // } /// Install fluvio locally #[instrument(skip(self))] diff --git a/src/cluster/src/sys.rs b/src/cluster/src/sys.rs index 39b035cfca..9baa49bbec 100644 --- a/src/cluster/src/sys.rs +++ b/src/cluster/src/sys.rs @@ -38,7 +38,6 @@ pub struct SysConfig { #[builder(setter(into))] pub namespace: String, /// The location at which to find the system chart to install - #[builder(private)] pub chart_location: ChartLocation, } @@ -204,7 +203,7 @@ mod tests { #[test] fn test_config_builder() { - let config = SysConfig::builder() + let _config = SysConfig::builder() .with_namespace("fluvio") .with_cloud("minikube") .with_local_chart("./helm/fluvio-sys") From bc6dcf906da43f11dfda7b49c2d3837a4006623a Mon Sep 17 00:00:00 2001 From: Nick Mosher Date: Mon, 25 Jan 2021 17:14:13 -0500 Subject: [PATCH 05/10] Add progress rendering checks to K8Installer --- src/cluster/src/cli/start/k8.rs | 15 +++--- src/cluster/src/start/k8.rs | 87 ++++++++++++--------------------- src/cluster/src/start/local.rs | 43 ---------------- 3 files changed, 38 insertions(+), 107 deletions(-) diff --git a/src/cluster/src/cli/start/k8.rs b/src/cluster/src/cli/start/k8.rs index 201d0bd940..3106607613 100644 --- a/src/cluster/src/cli/start/k8.rs +++ b/src/cluster/src/cli/start/k8.rs @@ -9,7 +9,8 @@ use crate::{ClusterInstaller, ClusterError, K8InstallError, StartStatus}; use crate::cli::ClusterCliError; use crate::cli::start::StartOpt; use crate::check::render::{ - render_check_statuses, render_statuses_next_steps, render_check_results, + render_statuses_next_steps, + render_check_results, render_results_next_steps, }; @@ -27,6 +28,7 @@ pub async fn install_core( .with_save_profile(!opt.skip_profile_creation) .with_tls(client, server) .with_chart_values(opt.k8_config.chart_values) + .with_render_checks(true) .with_upgrade(upgrade); if skip_sys { @@ -97,15 +99,14 @@ pub async fn install_core( println!("Successfully installed Fluvio!"); } // Successfully performed startup with pre-checks - Ok(StartStatus { - checks: Some(checks), - .. - }) => { - render_check_statuses(&checks); + Ok(StartStatus { checks, .. }) => { + if checks.is_none() { + println!("Skipped pre-start checks"); + } + println!("Successfully installed Fluvio!"); } // Aborted startup because pre-checks failed Err(ClusterError::InstallK8(K8InstallError::FailedPrecheck(check_statuses))) => { - render_check_statuses(&check_statuses); render_statuses_next_steps(&check_statuses); } // Another type of error occurred during checking or startup diff --git a/src/cluster/src/start/k8.rs b/src/cluster/src/start/k8.rs index 1b16bf4339..19be34890a 100644 --- a/src/cluster/src/start/k8.rs +++ b/src/cluster/src/start/k8.rs @@ -28,6 +28,7 @@ use crate::{ ClusterError, StartStatus, DEFAULT_NAMESPACE, DEFAULT_CHART_APP_REPO, CheckStatus, ClusterChecker, CheckStatuses, DEFAULT_CHART_REMOTE, ChartLocation, SysConfig, }; +use crate::check::render::render_check_progress; use fluvio_command::CommandExt; const DEFAULT_REGISTRY: &str = "infinyon"; @@ -96,6 +97,8 @@ pub struct ClusterInstallerBuilder { skip_spu_liveness_check: bool, /// chart values chart_values: Vec, + /// Whether to print check results as they are performed + render_checks: bool, } impl ClusterInstallerBuilder { @@ -520,6 +523,24 @@ impl ClusterInstallerBuilder { self.chart_values = values; self } + + /// Whether to render pre-install checks to stdout as they are performed. + /// + /// Defaults to `false`. + /// + /// # Example + /// + /// ```no_run + /// # use fluvio_cluster::LocalClusterInstaller; + /// let installer = LocalClusterInstaller::new() + /// .with_render_checks(true) + /// .build() + /// .unwrap(); + /// ``` + pub fn with_render_checks(mut self, render_checks: bool) -> Self { + self.render_checks = render_checks; + self + } } /// Allows installing Fluvio on a Kubernetes cluster @@ -625,6 +646,7 @@ impl ClusterInstaller { use_cluster_ip: false, skip_spu_liveness_check: false, chart_values: vec![], + render_checks: false, } } @@ -660,67 +682,18 @@ impl ClusterInstaller { let mut checker = ClusterChecker::empty() .with_k8_checks() .with_check(SysChartCheck::new(sys_config)); + if !self.config.upgrade { checker = checker.with_check(AlreadyInstalled); } - checker.run_wait_and_fix().await - } - // async fn _try_minikube_tunnel(&self) -> Result<(), K8InstallError> { - // let log_file = File::create("/tmp/tunnel.out")?; - // let error_file = log_file.try_clone()?; - // - // // run minikube tunnel > /tmp/tunnel.out 2> /tmp/tunnel.out - // Command::new("minikube") - // .arg("tunnel") - // .stdout(Stdio::from(log_file)) - // .stderr(Stdio::from(error_file)) - // .spawn()?; - // sleep(Duration::from_millis(DELAY)).await; - // Ok(()) - // } - - // /// Given a pre-check error, attempt to automatically correct it - // #[instrument(skip(self, error))] - // pub(crate) async fn pre_install_fix( - // &self, - // error: RecoverableCheck, - // ) -> Result<(), UnrecoverableCheck> { - // // Depending on what error occurred, try to fix the error. - // // If we handle the error successfully, return Ok(()) to indicate success - // // If we cannot handle this error, wrap it in UnrecoverableCheck::FailedRecovery - // match error { - // RecoverableCheck::MissingSystemChart if self.config.install_sys => { - // println!("Fluvio system chart not installed. Attempting to install"); - // - // // Use closure to catch errors - // let result = (|| -> Result<(), SysInstallError> { - // let config: SysConfig = SysConfig::builder() - // .with_namespace(&self.config.namespace) - // .with_cloud(&self.config.cloud) - // .build()?; - // let sys_installer = SysInstaller::with_config(config)?; - // sys_installer.install()?; - // Ok(()) - // })(); - // result.map_err(|_| UnrecoverableCheck::FailedRecovery(error))?; - // } - // RecoverableCheck::MinikubeTunnelNotFoundRetry => { - // println!( - // "Load balancer service is not available, trying to bring up minikube tunnel" - // ); - // self._try_minikube_tunnel() - // .await - // .map_err(|_| UnrecoverableCheck::FailedRecovery(error))?; - // } - // unhandled => { - // warn!("Pre-install was unable to autofix an error"); - // return Err(UnrecoverableCheck::FailedRecovery(unhandled)); - // } - // } - // - // Ok(()) - // } + if self.config.render_checks { + let mut progress = checker.run_and_fix_with_progress(); + render_check_progress(&mut progress).await + } else { + checker.run_wait_and_fix().await + } + } /// Installs Fluvio according to the installer's configuration /// diff --git a/src/cluster/src/start/local.rs b/src/cluster/src/start/local.rs index 649815b246..571e6d802e 100644 --- a/src/cluster/src/start/local.rs +++ b/src/cluster/src/start/local.rs @@ -322,8 +322,6 @@ impl LocalClusterInstaller { /// and tries to auto-fix the issues observed pub async fn setup(&self) -> CheckResults { println!("Performing pre-flight checks"); - // let install_sys = self.config.install_sys; - // let chart_location = self.config.chart_location.clone(); let sys_config: SysConfig = SysConfig::builder() .with_chart_location(self.config.chart_location.clone()) .build() @@ -344,47 +342,6 @@ impl LocalClusterInstaller { } } - // /// Given a pre-check error, attempt to automatically correct it - // #[instrument(skip(error))] - // async fn pre_install_fix( - // install_sys: bool, - // sys_chart_location: ChartLocation, - // error: RecoverableCheck, - // ) -> Result<(), UnrecoverableCheck> { - // // Depending on what error occurred, try to fix the error. - // // If we handle the error successfully, return Ok(()) to indicate success - // // If we cannot handle this error, return it to bubble up - // match error { - // RecoverableCheck::MissingSystemChart if install_sys => { - // debug!("Fluvio system chart not installed. Attempting to install"); - // - // // Use closure to catch any errors - // let result = (|| -> Result<_, ClusterError> { - // let mut builder = SysConfig::builder(); - // if let ChartLocation::Local(path) = &sys_chart_location { - // builder.with_local_chart(path); - // } - // - // let config: SysConfig = builder.build()?; - // let installer = SysInstaller::with_config(config)?; - // installer.install()?; - // Ok(()) - // })(); - // - // // If any errors occurred, recovery failed - // if result.is_err() { - // return Err(UnrecoverableCheck::FailedRecovery(error)); - // } - // } - // unhandled => { - // warn!("Pre-install was unable to autofix an error"); - // return Err(UnrecoverableCheck::FailedRecovery(unhandled)); - // } - // } - // - // Ok(()) - // } - /// Install fluvio locally #[instrument(skip(self))] pub async fn install(&self) -> Result { From e4f8ebd29c467be2a06574c91f7c265ca6d468b9 Mon Sep 17 00:00:00 2001 From: Nick Mosher Date: Wed, 27 Jan 2021 09:57:33 -0500 Subject: [PATCH 06/10] Move check implementations into ClusterCheck impls --- src/cluster/src/check/mod.rs | 274 +++++++++++++++++------------------ 1 file changed, 131 insertions(+), 143 deletions(-) diff --git a/src/cluster/src/check/mod.rs b/src/cluster/src/check/mod.rs index 697aaba92c..539fa16e7e 100644 --- a/src/cluster/src/check/mod.rs +++ b/src/cluster/src/check/mod.rs @@ -21,7 +21,10 @@ use k8_types::InputObjectMeta; use k8_types::core::service::ServiceSpec; use k8_client::ClientError as K8ClientError; -use crate::{DEFAULT_NAMESPACE, DEFAULT_CHART_SYS_REPO, DEFAULT_CHART_APP_REPO, DEFAULT_HELM_VERSION, SysConfig, SysInstaller}; +use crate::{ + DEFAULT_NAMESPACE, DEFAULT_CHART_SYS_REPO, DEFAULT_CHART_APP_REPO, DEFAULT_HELM_VERSION, + SysConfig, SysInstaller, +}; use crate::error::SysInstallError; use std::fs::File; @@ -273,8 +276,50 @@ pub(crate) struct LoadableConfig; #[async_trait] impl ClusterCheck for LoadableConfig { + /// Checks that we can connect to Kubernetes via the active context async fn perform_check(&self) -> CheckResult { - check_cluster_connection() + let config = match K8Config::load() { + Ok(config) => config, + Err(K8ConfigError::NoCurrentContext) => { + return Ok(CheckStatus::fail( + UnrecoverableCheck::NoActiveKubernetesContext, + )); + } + Err(other) => return Err(CheckError::K8ConfigError(other)), + }; + + let context = match config { + K8Config::Pod(_) => { + return Ok(CheckStatus::pass("Pod config found, ignoring the check")) + } + K8Config::KubeConfig(context) => context, + }; + + let cluster_context = match context.config.current_cluster() { + Some(context) => context, + None => { + return Ok(CheckStatus::fail( + UnrecoverableCheck::NoActiveKubernetesContext, + )); + } + }; + + let server_url = &cluster_context.cluster.server; + + // Check that the server URL has a hostname, not just an IP + let host_present = Url::parse(server_url) + .ok() + .and_then(|it| it.host().map(|host| host.to_string())) + .map(|it| !it.is_empty()) + .unwrap_or(false); + + if !host_present { + return Ok(CheckStatus::fail( + UnrecoverableCheck::MissingKubernetesServerHost, + )); + } + + Ok(CheckStatus::pass("Kubernetes config is loadable")) } } @@ -283,8 +328,53 @@ pub(crate) struct K8Version; #[async_trait] impl ClusterCheck for K8Version { + /// Check if required kubectl version is installed async fn perform_check(&self) -> CheckResult { - k8_version_check() + let kube_version = Command::new("kubectl") + .arg("version") + .arg("-o=json") + .output() + .map_err(CheckError::KubectlNotFoundError)?; + let version_text = String::from_utf8(kube_version.stdout).unwrap(); + + #[derive(Debug, serde::Deserialize)] + #[serde(rename_all = "camelCase")] + struct ComponentVersion { + git_version: String, + } + + #[derive(Debug, serde::Deserialize)] + #[serde(rename_all = "camelCase")] + struct KubernetesVersion { + client_version: ComponentVersion, + server_version: Option, + } + + let kube_versions: KubernetesVersion = + serde_json::from_str(&version_text).map_err(CheckError::KubectlVersionJsonError)?; + + let server_version = match kube_versions.server_version { + Some(version) => version.git_version, + None => { + return Ok(CheckStatus::fail( + UnrecoverableCheck::CannotConnectToKubernetes, + )) + } + }; + + // Trim off the `v` in v0.1.2 to get just "0.1.2" + let server_version = &server_version[1..]; + if Version::parse(&server_version) < Version::parse(KUBE_VERSION) { + return Ok(CheckStatus::fail( + UnrecoverableCheck::IncompatibleKubectlVersion { + installed: server_version.to_string(), + required: KUBE_VERSION.to_string(), + }, + )); + } + Ok(CheckStatus::pass( + "Supported kubernetes version is installed", + )) } } @@ -293,9 +383,20 @@ pub(crate) struct HelmVersion; #[async_trait] impl ClusterCheck for HelmVersion { + /// Checks that the installed helm version is compatible with the installer requirements async fn perform_check(&self) -> CheckResult { - let helm_client = HelmClient::new().map_err(CheckError::HelmError)?; - check_helm_version(&helm_client, DEFAULT_HELM_VERSION) + let helm = HelmClient::new().map_err(CheckError::HelmError)?; + let helm_version = helm.get_helm_version().map_err(CheckError::HelmError)?; + let required = DEFAULT_HELM_VERSION; + if Version::parse(&helm_version) < Version::parse(required) { + return Ok(CheckStatus::fail( + UnrecoverableCheck::IncompatibleHelmVersion { + installed: helm_version, + required: required.to_string(), + }, + )); + } + Ok(CheckStatus::pass("Supported helm version is installed")) } } @@ -312,9 +413,20 @@ impl SysChartCheck { #[async_trait] impl ClusterCheck for SysChartCheck { + /// Check that the system chart is installed + /// This uses whatever namespace it is being called async fn perform_check(&self) -> CheckResult { - let helm_client = HelmClient::new().map_err(CheckError::HelmError)?; - check_system_chart(&helm_client, DEFAULT_CHART_SYS_REPO) + let helm = HelmClient::new().map_err(CheckError::HelmError)?; + // check installed system chart version + let sys_charts = helm + .get_installed_chart_by_name(DEFAULT_CHART_SYS_REPO, None) + .map_err(CheckError::HelmError)?; + if sys_charts.is_empty() { + return Ok(CheckStatus::fail(RecoverableCheck::MissingSystemChart)); + } else if sys_charts.len() > 1 { + return Ok(CheckStatus::fail(UnrecoverableCheck::MultipleSystemCharts)); + } + Ok(CheckStatus::pass("Fluvio system charts are installed")) } async fn attempt_fix(&self, error: RecoverableCheck) -> Result<(), UnrecoverableCheck> { @@ -340,9 +452,16 @@ pub(crate) struct AlreadyInstalled; #[async_trait] impl ClusterCheck for AlreadyInstalled { + /// Checks that Fluvio is not already installed async fn perform_check(&self) -> CheckResult { - let helm_client = HelmClient::new().map_err(CheckError::HelmError)?; - check_already_installed(&helm_client, DEFAULT_CHART_APP_REPO) + let helm = HelmClient::new().map_err(CheckError::HelmError)?; + let app_charts = helm + .get_installed_chart_by_name(DEFAULT_CHART_APP_REPO, None) + .map_err(CheckError::HelmError)?; + if !app_charts.is_empty() { + return Ok(CheckStatus::fail(CheckFailed::AlreadyInstalled)); + } + Ok(CheckStatus::pass("Previous fluvio installation not found")) } } @@ -384,7 +503,9 @@ impl ClusterCheck for LoadBalancer { async fn perform_check(&self) -> CheckResult { let config = K8Config::load().map_err(CheckError::K8ConfigError)?; let context = match config { - K8Config::Pod(_) => return Ok(CheckStatus::pass("Pod config found, ignoring the check")), + K8Config::Pod(_) => { + return Ok(CheckStatus::pass("Pod config found, ignoring the check")) + } K8Config::KubeConfig(context) => context, }; @@ -710,46 +831,6 @@ impl ClusterChecker { } } -/// Checks that the installed helm version is compatible with the installer requirements -pub(crate) fn check_helm_version(helm: &HelmClient, required: &str) -> CheckResult { - let helm_version = helm.get_helm_version().map_err(CheckError::HelmError)?; - if Version::parse(&helm_version) < Version::parse(required) { - return Ok(CheckStatus::fail( - UnrecoverableCheck::IncompatibleHelmVersion { - installed: helm_version, - required: required.to_string(), - }, - )); - } - Ok(CheckStatus::pass("Supported helm version is installed")) -} - -/// Check that the system chart is installed -/// This uses whatever namespace it is being called -pub(crate) fn check_system_chart(helm: &HelmClient, sys_repo: &str) -> CheckResult { - // check installed system chart version - let sys_charts = helm - .get_installed_chart_by_name(sys_repo, None) - .map_err(CheckError::HelmError)?; - if sys_charts.is_empty() { - return Ok(CheckStatus::fail(RecoverableCheck::MissingSystemChart)); - } else if sys_charts.len() > 1 { - return Ok(CheckStatus::fail(UnrecoverableCheck::MultipleSystemCharts)); - } - Ok(CheckStatus::pass("Fluvio system charts are installed")) -} - -/// Checks that Fluvio is not already installed -pub(crate) fn check_already_installed(helm: &HelmClient, app_repo: &str) -> CheckResult { - let app_charts = helm - .get_installed_chart_by_name(app_repo, None) - .map_err(CheckError::HelmError)?; - if !app_charts.is_empty() { - return Ok(CheckStatus::fail(CheckFailed::AlreadyInstalled)); - } - Ok(CheckStatus::pass("Previous fluvio installation not found")) -} - fn create_dummy_service() -> Result<(), CheckError> { Command::new("kubectl") .arg("create") @@ -811,99 +892,6 @@ fn get_tunnel_error() -> CheckFailed { RecoverableCheck::MinikubeTunnelNotFoundRetry.into() } -/// Checks that we can connect to Kubernetes via the active context -fn check_cluster_connection() -> CheckResult { - let config = match K8Config::load() { - Ok(config) => config, - Err(K8ConfigError::NoCurrentContext) => { - return Ok(CheckStatus::fail( - UnrecoverableCheck::NoActiveKubernetesContext, - )); - } - Err(other) => return Err(CheckError::K8ConfigError(other)), - }; - - let context = match config { - K8Config::Pod(_) => return Ok(CheckStatus::pass("Pod config found, ignoring the check")), - K8Config::KubeConfig(context) => context, - }; - - let cluster_context = match context.config.current_cluster() { - Some(context) => context, - None => { - return Ok(CheckStatus::fail( - UnrecoverableCheck::NoActiveKubernetesContext, - )); - } - }; - - let server_url = &cluster_context.cluster.server; - - // Check that the server URL has a hostname, not just an IP - let host_present = Url::parse(server_url) - .ok() - .and_then(|it| it.host().map(|host| host.to_string())) - .map(|it| !it.is_empty()) - .unwrap_or(false); - - if !host_present { - return Ok(CheckStatus::fail( - UnrecoverableCheck::MissingKubernetesServerHost, - )); - } - - Ok(CheckStatus::pass("Kubernetes config is loadable")) -} - -// Check if required kubectl version is installed -fn k8_version_check() -> CheckResult { - let kube_version = Command::new("kubectl") - .arg("version") - .arg("-o=json") - .output() - .map_err(CheckError::KubectlNotFoundError)?; - let version_text = String::from_utf8(kube_version.stdout).unwrap(); - - #[derive(Debug, serde::Deserialize)] - #[serde(rename_all = "camelCase")] - struct ComponentVersion { - git_version: String, - } - - #[derive(Debug, serde::Deserialize)] - #[serde(rename_all = "camelCase")] - struct KubernetesVersion { - client_version: ComponentVersion, - server_version: Option, - } - - let kube_versions: KubernetesVersion = - serde_json::from_str(&version_text).map_err(CheckError::KubectlVersionJsonError)?; - - let server_version = match kube_versions.server_version { - Some(version) => version.git_version, - None => { - return Ok(CheckStatus::fail( - UnrecoverableCheck::CannotConnectToKubernetes, - )) - } - }; - - // Trim off the `v` in v0.1.2 to get just "0.1.2" - let server_version = &server_version[1..]; - if Version::parse(&server_version) < Version::parse(KUBE_VERSION) { - return Ok(CheckStatus::fail( - UnrecoverableCheck::IncompatibleKubectlVersion { - installed: server_version.to_string(), - required: KUBE_VERSION.to_string(), - }, - )); - } - Ok(CheckStatus::pass( - "Supported kubernetes version is installed", - )) -} - fn check_permission(resource: &str) -> CheckResult { let res = check_create_permission(resource)?; if !res { From 778817f0dd4727e09cdd22b154fe6735d1d97dfb Mon Sep 17 00:00:00 2001 From: Nick Mosher Date: Wed, 27 Jan 2021 10:17:12 -0500 Subject: [PATCH 07/10] Apply cargo fmt --- src/cluster/src/cli/start/k8.rs | 4 +--- src/cluster/src/sys.rs | 22 ++++++++++++++++------ 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/cluster/src/cli/start/k8.rs b/src/cluster/src/cli/start/k8.rs index 3106607613..58fba8ce29 100644 --- a/src/cluster/src/cli/start/k8.rs +++ b/src/cluster/src/cli/start/k8.rs @@ -9,9 +9,7 @@ use crate::{ClusterInstaller, ClusterError, K8InstallError, StartStatus}; use crate::cli::ClusterCliError; use crate::cli::start::StartOpt; use crate::check::render::{ - render_statuses_next_steps, - render_check_results, - render_results_next_steps, + render_statuses_next_steps, render_check_results, render_results_next_steps, }; pub async fn install_core( diff --git a/src/cluster/src/sys.rs b/src/cluster/src/sys.rs index 9baa49bbec..17935887fd 100644 --- a/src/cluster/src/sys.rs +++ b/src/cluster/src/sys.rs @@ -1,6 +1,9 @@ use tracing::{info, debug, instrument}; use derive_builder::Builder; -use crate::{ChartLocation, DEFAULT_CHART_APP_REPO, DEFAULT_CHART_SYS_REPO, DEFAULT_NAMESPACE, DEFAULT_CHART_REMOTE}; +use crate::{ + ChartLocation, DEFAULT_CHART_APP_REPO, DEFAULT_CHART_SYS_REPO, DEFAULT_NAMESPACE, + DEFAULT_CHART_REMOTE, +}; use fluvio_helm::{HelmClient, InstallArg}; use std::path::{Path, PathBuf}; use crate::error::SysInstallError; @@ -58,9 +61,17 @@ impl SysConfig { impl SysConfigBuilder { /// Validates all builder options and constructs a `SysConfig` pub fn build(&self) -> Result { - let cloud = self.cloud.clone().unwrap_or_else(|| DEFAULT_CLOUD_NAME.to_string()); - let namespace = self.namespace.clone().unwrap_or_else(|| DEFAULT_NAMESPACE.to_string()); - let chart_location = self.chart_location.clone() + let cloud = self + .cloud + .clone() + .unwrap_or_else(|| DEFAULT_CLOUD_NAME.to_string()); + let namespace = self + .namespace + .clone() + .unwrap_or_else(|| DEFAULT_NAMESPACE.to_string()); + let chart_location = self + .chart_location + .clone() .unwrap_or_else(|| ChartLocation::Remote(DEFAULT_CHART_REMOTE.to_string())); Ok(SysConfig { @@ -158,8 +169,7 @@ impl SysInstaller { settings: Vec<(String, String)>, ) -> Result<(), SysInstallError> { debug!(?chart, "Using remote helm chart:"); - self.helm_client - .repo_add(DEFAULT_CHART_APP_REPO, chart)?; + self.helm_client.repo_add(DEFAULT_CHART_APP_REPO, chart)?; self.helm_client.repo_update()?; let args = InstallArg::new(DEFAULT_CHART_SYS_REPO, DEFAULT_CHART_SYS_NAME) .namespace(&self.config.namespace) From 82534e527d4c2f039584c65e903c230e2c494845 Mon Sep 17 00:00:00 2001 From: Nick Mosher Date: Wed, 27 Jan 2021 10:24:58 -0500 Subject: [PATCH 08/10] Apply cargo clippy --- src/cluster/src/check/mod.rs | 2 +- src/cluster/src/check/render.rs | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/cluster/src/check/mod.rs b/src/cluster/src/check/mod.rs index 539fa16e7e..4d21f15c9b 100644 --- a/src/cluster/src/check/mod.rs +++ b/src/cluster/src/check/mod.rs @@ -558,7 +558,7 @@ impl ClusterCheck for LoadBalancer { Ok(()) })(); - if let Err(_) = result { + if result.is_err() { return Err(UnrecoverableCheck::FailedRecovery(error)); } diff --git a/src/cluster/src/check/render.rs b/src/cluster/src/check/render.rs index 5f2810ee8b..429b9dd5e0 100644 --- a/src/cluster/src/check/render.rs +++ b/src/cluster/src/check/render.rs @@ -1,3 +1,5 @@ +#![allow(unused)] + use async_channel::Receiver; use crate::{CheckStatus, CheckResult, CheckResults, CheckFailed, CheckSuggestion}; From 97c806b69a5109d225ca8a377196cb02e31672a2 Mon Sep 17 00:00:00 2001 From: Nick Mosher Date: Wed, 27 Jan 2021 13:44:57 -0500 Subject: [PATCH 09/10] Add builder method for conditional chaining --- src/cluster/src/check/mod.rs | 24 ++++------------------- src/cluster/src/cli/start/sys.rs | 26 ++++++++++--------------- src/cluster/src/error.rs | 1 + src/cluster/src/lib.rs | 2 +- src/cluster/src/sys.rs | 33 ++++++++++++++++++++++++++++++++ 5 files changed, 49 insertions(+), 37 deletions(-) diff --git a/src/cluster/src/check/mod.rs b/src/cluster/src/check/mod.rs index 4d21f15c9b..4a00addc29 100644 --- a/src/cluster/src/check/mod.rs +++ b/src/cluster/src/check/mod.rs @@ -8,7 +8,7 @@ pub mod render; use tracing::warn; use async_trait::async_trait; use async_channel::Receiver; -use url::{Url, ParseError}; +use url::ParseError; use semver::Version; use serde_json::Error as JsonError; @@ -295,7 +295,7 @@ impl ClusterCheck for LoadableConfig { K8Config::KubeConfig(context) => context, }; - let cluster_context = match context.config.current_cluster() { + let _cluster_context = match context.config.current_cluster() { Some(context) => context, None => { return Ok(CheckStatus::fail( @@ -304,21 +304,6 @@ impl ClusterCheck for LoadableConfig { } }; - let server_url = &cluster_context.cluster.server; - - // Check that the server URL has a hostname, not just an IP - let host_present = Url::parse(server_url) - .ok() - .and_then(|it| it.host().map(|host| host.to_string())) - .map(|it| !it.is_empty()) - .unwrap_or(false); - - if !host_present { - return Ok(CheckStatus::fail( - UnrecoverableCheck::MissingKubernetesServerHost, - )); - } - Ok(CheckStatus::pass("Kubernetes config is loadable")) } } @@ -335,7 +320,6 @@ impl ClusterCheck for K8Version { .arg("-o=json") .output() .map_err(CheckError::KubectlNotFoundError)?; - let version_text = String::from_utf8(kube_version.stdout).unwrap(); #[derive(Debug, serde::Deserialize)] #[serde(rename_all = "camelCase")] @@ -350,8 +334,8 @@ impl ClusterCheck for K8Version { server_version: Option, } - let kube_versions: KubernetesVersion = - serde_json::from_str(&version_text).map_err(CheckError::KubectlVersionJsonError)?; + let kube_versions: KubernetesVersion = serde_json::from_slice(&kube_version.stdout) + .map_err(CheckError::KubectlVersionJsonError)?; let server_version = match kube_versions.server_version { Some(version) => version.git_version, diff --git a/src/cluster/src/cli/start/sys.rs b/src/cluster/src/cli/start/sys.rs index 3005636757..6818552e42 100644 --- a/src/cluster/src/cli/start/sys.rs +++ b/src/cluster/src/cli/start/sys.rs @@ -10,22 +10,16 @@ pub fn install_sys(opt: StartOpt, upgrade: bool) -> Result<(), ClusterCliError> } fn install_sys_impl(opt: StartOpt, upgrade: bool) -> Result<(), SysInstallError> { - let mut builder = SysConfig::builder(); - builder.with_namespace(opt.k8_config.namespace); - - match opt.k8_config.chart_location { - // If a chart location is given, use it - Some(chart_location) => { - builder.with_local_chart(chart_location); - } - // If we're in develop mode (but no explicit chart location), use local path - None if opt.develop => { - builder.with_local_chart("./k8-util/helm"); - } - _ => (), - } - - let config = builder.build()?; + let config = SysConfig::builder() + .with_namespace(&opt.k8_config.namespace) + .with(|builder| match &opt.k8_config.chart_location { + // If a chart location is given, use it + Some(chart_location) => builder.with_local_chart(chart_location), + // If we're in develop mode (but no explicit chart location), use local path + None if opt.develop => builder.with_local_chart("./k8-util/helm"), + _ => builder, + }) + .build()?; let installer = SysInstaller::with_config(config)?; if upgrade { diff --git a/src/cluster/src/error.rs b/src/cluster/src/error.rs index 06c1eefc1c..7af70a5808 100644 --- a/src/cluster/src/error.rs +++ b/src/cluster/src/error.rs @@ -24,6 +24,7 @@ pub enum ClusterError { Uninstall(#[from] UninstallError), } +/// Errors that may occur while trying to install Fluvio system charts #[derive(thiserror::Error, Debug)] pub enum SysInstallError { /// An error occurred while running helm. diff --git a/src/cluster/src/lib.rs b/src/cluster/src/lib.rs index 21213f8696..d44a5e5151 100644 --- a/src/cluster/src/lib.rs +++ b/src/cluster/src/lib.rs @@ -35,7 +35,7 @@ use fluvio_helm as helm; pub use start::k8::{ClusterInstaller, ClusterInstallerBuilder}; pub use start::local::LocalClusterInstaller; -pub use error::{ClusterError, K8InstallError, LocalInstallError, UninstallError}; +pub use error::{ClusterError, K8InstallError, LocalInstallError, UninstallError, SysInstallError}; pub use helm::HelmError; pub use check::{ClusterChecker, CheckStatus, CheckStatuses, CheckResult, CheckResults}; pub use check::{RecoverableCheck, UnrecoverableCheck, CheckFailed, CheckSuggestion}; diff --git a/src/cluster/src/sys.rs b/src/cluster/src/sys.rs index 17935887fd..294257c0d6 100644 --- a/src/cluster/src/sys.rs +++ b/src/cluster/src/sys.rs @@ -110,6 +110,39 @@ impl SysConfigBuilder { self.chart_location = Some(ChartLocation::Remote(location.into())); self } + + /// A builder helper for conditionally setting options + /// + /// This is useful for maintaining a fluid call chain even when + /// we only want to set certain options conditionally. + /// + /// # Example + /// + /// ``` + /// # use fluvio_cluster::{SysConfig, SysInstallError}; + /// # fn do_thing() -> Result<(), SysInstallError> { + /// let should_use_custom_namespace = true; + /// let config: SysConfig = SysConfig::builder() + /// .with(|builder| { + /// if should_use_custom_namespace { + /// // Only use custom namespace in this condition + /// builder.with_namespace("my-namespace") + /// } + /// else { + /// // Otherwise don't edit the builder + /// builder + /// } + /// }) + /// .build()?; + /// # Ok(()) + /// # } + /// ``` + pub fn with(&mut self, f: F) -> &mut Self + where + F: Fn(&mut Self) -> &mut Self, + { + f(self) + } } /// Installs or upgrades the Fluvio system charts From cc5eca28a5c0d5e565bb74eceeada486bb8d30de Mon Sep 17 00:00:00 2001 From: Nick Mosher Date: Wed, 27 Jan 2021 15:49:58 -0500 Subject: [PATCH 10/10] Bump fluvio-helm dependency to 0.4.1 --- Cargo.lock | 21 ++++++++++++++++----- src/cluster/Cargo.toml | 2 +- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 062d5f2bb1..ddff06258e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1049,7 +1049,7 @@ dependencies = [ "colored", "derive_builder", "fluvio", - "fluvio-command", + "fluvio-command 0.1.0", "fluvio-controlplane-metadata", "fluvio-extension-common", "fluvio-future", @@ -1083,6 +1083,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "fluvio-command" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2dcf39df27ca6854ecf6adaf0e5cf83eafd668d27c1efc891ecc3a0b8a1f097" +dependencies = [ + "once_cell", + "thiserror", + "tracing", +] + [[package]] name = "fluvio-controlplane" version = "0.4.0" @@ -1200,11 +1211,11 @@ dependencies = [ [[package]] name = "fluvio-helm" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16a817c2db1feda2efcb63d7fa73438439ede7404a4db1bae6966d3e564c893b" +checksum = "10b6e3e6219becb39f3cc3a436ebbcdaf9a4cc1db70d310891e9b08a9f8baadf" dependencies = [ - "flv-util", + "fluvio-command 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde", "serde_json", "thiserror", @@ -1544,7 +1555,7 @@ dependencies = [ "async-trait", "bytes 0.5.6", "fluvio", - "fluvio-command", + "fluvio-command 0.1.0", "fluvio-controlplane-metadata", "fluvio-dataplane-protocol", "fluvio-future", diff --git a/src/cluster/Cargo.toml b/src/cluster/Cargo.toml index afd602c05f..81b7518e88 100644 --- a/src/cluster/Cargo.toml +++ b/src/cluster/Cargo.toml @@ -47,7 +47,7 @@ derive_builder = "0.9.0" # Fluvio dependencies fluvio = { version = "0.4.0", path = "../client", default-features = false } -fluvio-helm = "0.4.0" +fluvio-helm = "0.4.1" fluvio-future = { version = "0.1.13" } fluvio-command = { path = "../command" } fluvio-runner-local = { version = "0.2.0", path = "../extension-runner-local", optional = true }