diff --git a/Cargo.lock b/Cargo.lock index a193a5143bd..46387da4b4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3162,21 +3162,26 @@ dependencies = [ [[package]] name = "fluvio-stream-dispatcher" -version = "0.10.2" +version = "0.11.0" dependencies = [ + "anyhow", "async-channel", - "async-rwlock", + "async-lock", "async-trait", "event-listener 3.0.0", "fluvio-future", "fluvio-stream-model", "fluvio-types", "futures-lite", + "futures-util", + "k8-diff", "k8-metadata-client", "k8-types", "once_cell", "serde", "serde_json", + "serde_yaml 0.9.25", + "thiserror", "tokio", "tracing", ] @@ -4517,10 +4522,11 @@ dependencies = [ [[package]] name = "k8-client" -version = "10.1.0" +version = "11.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2b634e8388eaeab70b77666c6aa05b1d02943775528d55e8ce6c99bb4ee0f85" +checksum = "995113c3a7e549a780a67d0a38467b6b050773e36b70377ca2a479c1a0325376" dependencies = [ + "anyhow", "async-trait", "base64 0.13.1", "bytes 1.5.0", @@ -4537,7 +4543,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "serde_qs 0.10.1", + "serde_qs 0.12.0", "tokio", "tracing", ] @@ -4570,10 +4576,11 @@ dependencies = [ [[package]] name = "k8-metadata-client" -version = "5.1.0" +version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c229be3cd93fc47d21cca9ff4cac6ca32ec9b94a3443d125a839ddbbd1c642a8" +checksum = "576a47a3a1cbd5daae3a197601f378987c92ec22d2918d6b5e1c822d5b984f5d" dependencies = [ + "anyhow", "async-trait", "futures-util", "k8-diff", @@ -4586,9 +4593,9 @@ dependencies = [ [[package]] name = "k8-types" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8da4ee9d80b9ad358d185c48f222ce0e1ac5216bc8597e9776db8bbf6ccd4da8" +checksum = "9f604a19941d3a538c86e8d2944b99cec1acd46043d9544804f2aaf0342a8401" dependencies = [ "serde", "serde_json", @@ -6524,9 +6531,9 @@ dependencies = [ [[package]] name = "serde_qs" -version = "0.10.1" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cac3f1e2ca2fe333923a1ae72caca910b98ed0630bb35ef6f8c8517d6e81afa" +checksum = "0431a35568651e363364210c91983c1da5eb29404d9f0928b67d4ebcfa7d330c" dependencies = [ "percent-encoding", "serde", diff --git a/Cargo.toml b/Cargo.toml index a05410d7f77..4a29e1c63e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -144,10 +144,11 @@ fluvio-future = { version = "0.6.0", default-features = false } fluvio-helm = { version = "0.4.1" } flv-tls-proxy = { version = "0.8" } flv-util = { version = "0.5.2", default-features = false } -k8-client = { version = "10.1.0" } +k8-client = { version = "11.0.0" } k8-config = { version = "2.0.0" } -k8-metadata-client = { version = "5.1.0" } -k8-types = { version = "0.8.3" } +k8-metadata-client = { version = "6.0.0" } +k8-types = { version = "0.8.5" } +k8-diff = { version = "0.1.2" } trybuild = { branch = "check_option", git = "https://github.com/infinyon/trybuild" } # Internal fluvio dependencies @@ -170,7 +171,7 @@ fluvio-smartmodule = { version = "0.7.0", path = "crates/fluvio-smartmodule", de fluvio-socket = { version = "0.14.3", path = "crates/fluvio-socket", default-features = false } fluvio-spu-schema = { version = "0.14.0", path = "crates/fluvio-spu-schema", default-features = false } fluvio-storage = { path = "crates/fluvio-storage" } -fluvio-stream-dispatcher = { path = "crates/fluvio-stream-dispatcher" } +fluvio-stream-dispatcher = { version = "0.11.0", path = "crates/fluvio-stream-dispatcher" } fluvio-stream-model = { version = "0.9.3", path = "crates/fluvio-stream-model", default-features = false } fluvio-types = { version = "0.4.4", path = "crates/fluvio-types", default-features = false } diff --git a/crates/fluvio-auth/src/x509/identity.rs b/crates/fluvio-auth/src/x509/identity.rs index 1bb6baebf1e..46be341b91b 100644 --- a/crates/fluvio-auth/src/x509/identity.rs +++ b/crates/fluvio-auth/src/x509/identity.rs @@ -2,7 +2,7 @@ use serde::{Serialize, Deserialize}; use futures_util::stream::StreamExt; -use fluvio_protocol::api::{ResponseMessage}; +use fluvio_protocol::api::ResponseMessage; use fluvio_socket::FluvioSocket; use super::request::{AuthorizationScopes, AuthorizationApiRequest, AuthResponse}; diff --git a/crates/fluvio-cli/src/error.rs b/crates/fluvio-cli/src/error.rs index e488bd68a39..58f0ac5dc52 100644 --- a/crates/fluvio-cli/src/error.rs +++ b/crates/fluvio-cli/src/error.rs @@ -1,4 +1,4 @@ -use std::{convert::Infallible}; +use std::convert::Infallible; use handlebars::TemplateError; use indicatif::style::TemplateError as ProgressTemplateError; @@ -32,10 +32,6 @@ pub enum CliError { #[error("Kubernetes config error: {0}")] K8ConfigError(#[from] k8_config::ConfigError), - #[cfg(feature = "k8s")] - #[error("Kubernetes client error: {0}")] - K8ClientError(#[from] k8_client::ClientError), - /// An error occurred while processing the connector yaml #[error("Fluvio connector config: {0}")] ConnectorConfig(#[from] serde_yaml::Error), diff --git a/crates/fluvio-cli/src/profile/sync/k8.rs b/crates/fluvio-cli/src/profile/sync/k8.rs index 0c46af7fc7d..5a20ff50480 100644 --- a/crates/fluvio-cli/src/profile/sync/k8.rs +++ b/crates/fluvio-cli/src/profile/sync/k8.rs @@ -10,7 +10,7 @@ use k8_config::K8Config; use k8_client::K8Client; use k8_client::meta_client::MetadataClient; use k8_types::core::service::ServiceSpec; -use k8_types::InputObjectMeta; +use k8_types::{InputObjectMeta, MetaStatus}; use crate::common::tls::TlsClientOpt; @@ -112,22 +112,22 @@ pub async fn set_k8_context(opt: K8Opt, external_addr: String) -> Result) -> Result> { - use k8_client::http::status::StatusCode; - let ns = namespace.unwrap_or("default"); let svc = match K8Client::try_default()? .retrieve_item::(&InputObjectMeta::named("fluvio-sc-public", ns)) .await { Ok(svc) => svc, - Err(err) => match err { - k8_client::ClientError::ApiResponse(status) - if status.code == Some(StatusCode::NOT_FOUND.as_u16()) => + Err(err) => { + if let Some(MetaStatus { + code: Some(404), .. + }) = err.downcast_ref() { - return Ok(None) + return Ok(None); + } else { + return Err(anyhow!("unable to look up fluvio service in k8: {}", err)); } - _ => return Err(anyhow!("unable to look up fluvio service in k8: {}", err)), - }, + } }; debug!("fluvio svc: {:#?}", svc); diff --git a/crates/fluvio-cluster/src/check/mod.rs b/crates/fluvio-cluster/src/check/mod.rs index 4f12af849ec..ecf7dd37e62 100644 --- a/crates/fluvio-cluster/src/check/mod.rs +++ b/crates/fluvio-cluster/src/check/mod.rs @@ -19,7 +19,6 @@ use sysinfo::{ProcessExt, System, SystemExt}; use fluvio_helm::{HelmClient, HelmError}; use k8_config::{ConfigError as K8ConfigError, K8Config}; -use k8_client::ClientError as K8ClientError; use crate::charts::{DEFAULT_HELM_VERSION, APP_CHART_NAME}; use crate::progress::ProgressBarFactory; @@ -53,10 +52,6 @@ pub enum ClusterCheckError { #[error("Kubernetes config error")] K8ConfigError(#[from] K8ConfigError), - /// Could not connect to K8 client - #[error("Kubernetes client error")] - K8ClientError(#[from] K8ClientError), - /// Failed to parse kubernetes cluster server URL #[error("Failed to parse server url from Kubernetes context")] BadKubernetesServerUrl(#[from] ParseError), @@ -111,10 +106,6 @@ pub enum ClusterAutoFixError { #[error("Kubernetes config error")] K8Config(#[from] K8ConfigError), - /// Could not connect to K8 client - #[error("Kubernetes client error")] - K8Client(#[from] K8ClientError), - #[error("Chart Install error")] ChartInstall(#[from] ChartInstallError), } diff --git a/crates/fluvio-cluster/src/cli/error.rs b/crates/fluvio-cluster/src/cli/error.rs index 27e9201c60c..6d04b60939e 100644 --- a/crates/fluvio-cluster/src/cli/error.rs +++ b/crates/fluvio-cluster/src/cli/error.rs @@ -50,20 +50,7 @@ impl ClusterCliError { impl ClusterError { /// Converts the plain error type into a CLI-formatted Report pub fn into_report(self) -> color_eyre::Report { - #[allow(unused)] - use color_eyre::Section; use color_eyre::Report; - use k8_client::ClientError as K8; - - // In the future when we want to annotate errors, we do it here - match &self { - Self::InstallLocal(LocalInstallError::K8ClientError(K8::ApiResponse(it))) - if it.code == Some(409) => - { - let report = Report::from(self); - report.suggestion("Run `fluvio cluster delete --local`, then retry") - } - _ => Report::from(self), - } + Report::from(self) } } diff --git a/crates/fluvio-cluster/src/delete.rs b/crates/fluvio-cluster/src/delete.rs index e6238487ce8..4ca08cef961 100644 --- a/crates/fluvio-cluster/src/delete.rs +++ b/crates/fluvio-cluster/src/delete.rs @@ -2,6 +2,7 @@ use std::process::Command; use std::fs::{remove_dir_all, remove_file}; use derive_builder::Builder; +use k8_metadata_client::MetadataClient; use tracing::{info, warn, debug, instrument}; use sysinfo::{ProcessExt, System, SystemExt}; @@ -283,17 +284,13 @@ impl ClusterUninstaller { /// in order to remove partitions, finalizers need to be cleared #[instrument(skip(self))] - async fn remove_finalizers_for_partitions( - &self, - namespace: &str, - ) -> Result<(), UninstallError> { + async fn remove_finalizers_for_partitions(&self, namespace: &str) -> anyhow::Result<()> { use fluvio_controlplane_metadata::partition::PartitionSpec; use fluvio_controlplane_metadata::store::k8::K8ExtendedSpec; use k8_client::load_and_share; - use k8_metadata_client::MetadataClient; use k8_metadata_client::PatchMergeType::JsonMerge; - let client = load_and_share().map_err(UninstallError::K8ClientError)?; + let client = load_and_share()?; let partitions = client .retrieve_items::<::K8Spec, _>(namespace) diff --git a/crates/fluvio-cluster/src/error.rs b/crates/fluvio-cluster/src/error.rs index 675a312284e..e1c406c6b61 100644 --- a/crates/fluvio-cluster/src/error.rs +++ b/crates/fluvio-cluster/src/error.rs @@ -6,7 +6,6 @@ use indicatif::style::TemplateError; use fluvio::FluvioError; use k8_config::ConfigError as K8ConfigError; -use k8_client::ClientError as K8ClientError; use fluvio_helm::HelmError; use fluvio_command::CommandError; @@ -39,9 +38,6 @@ pub enum K8InstallError { /// An error occurred with the Kubernetes config. #[error("Kubernetes config error: {0}")] K8ConfigError(#[from] K8ConfigError), - /// An error occurred with the Kubernetes client. - #[error("Kubernetes client error: {0}")] - K8ClientError(#[from] K8ClientError), /// An error occurred while running helm. #[error("Helm client error: {0}")] HelmError(#[from] HelmError), @@ -115,9 +111,6 @@ pub enum LocalInstallError { /// An error occurred with the Kubernetes config. #[error("Kubernetes config error: {0}")] K8ConfigError(#[from] K8ConfigError), - /// An error occurred with the Kubernetes client. - #[error("Kubernetes client error: {0}")] - K8ClientError(#[from] K8ClientError), /// An error occurred while running helm. #[error("Helm client error: {0}")] HelmError(#[from] HelmError), @@ -179,9 +172,6 @@ pub enum UninstallError { /// An error occurred with the Kubernetes config. #[error("Kubernetes config error: {0}")] K8ConfigError(#[from] K8ConfigError), - /// An error occurred with the Kubernetes client. - #[error("Kubernetes client error: {0}")] - K8ClientError(#[from] K8ClientError), /// An error occurred while running helm. #[error("Helm client error: {0}")] HelmError(#[from] HelmError), diff --git a/crates/fluvio-cluster/src/start/common.rs b/crates/fluvio-cluster/src/start/common.rs index 6a3197e55d5..63836fee890 100644 --- a/crates/fluvio-cluster/src/start/common.rs +++ b/crates/fluvio-cluster/src/start/common.rs @@ -4,7 +4,7 @@ use std::{ }; use fluvio_controlplane_metadata::spu::SpuSpec; -use k8_client::{SharedK8Client, ClientError}; +use k8_client::SharedK8Client; use once_cell::sync::Lazy; use semver::Version; use tracing::{debug, error, instrument, warn}; @@ -88,7 +88,7 @@ pub async fn try_connect_to_sc( } // hack -pub async fn check_crd(client: SharedK8Client) -> Result<(), ClientError> { +pub async fn check_crd(client: SharedK8Client) -> anyhow::Result<()> { use k8_metadata_client::MetadataClient; for i in 0..100 { @@ -103,5 +103,5 @@ pub async fn check_crd(client: SharedK8Client) -> Result<(), ClientError> { } } - Err(ClientError::Other("Fluvio CRD not ready".to_string())) + Err(anyhow::anyhow!("Fluvio CRD not ready")) } diff --git a/crates/fluvio-cluster/src/start/k8.rs b/crates/fluvio-cluster/src/start/k8.rs index 9e9e089a1d6..806f54b377b 100644 --- a/crates/fluvio-cluster/src/start/k8.rs +++ b/crates/fluvio-cluster/src/start/k8.rs @@ -573,15 +573,15 @@ impl ClusterInstaller { /// # Example /// /// ``` - /// # use fluvio_cluster::{ClusterConfig, ClusterError, ClusterInstaller}; - /// # fn example(config: ClusterConfig) -> Result<(), ClusterError> { + /// # use fluvio_cluster::{ClusterConfig, ClusterInstaller}; + /// # fn example(config: ClusterConfig) -> anyhow::Result<()> { /// let installer = ClusterInstaller::from_config(config)?; /// # Ok(()) /// # } /// ``` - pub fn from_config(config: ClusterConfig) -> Result { + pub fn from_config(config: ClusterConfig) -> Result { Ok(Self { - kube_client: load_and_share().map_err(K8InstallError::K8ClientError)?, + kube_client: load_and_share()?, pb_factory: ProgressBarFactory::new(config.hide_spinner), config, }) @@ -650,9 +650,7 @@ impl ClusterInstaller { self.install_app().await?; // before we do let's try make sure SPU are installed. - check_crd(self.kube_client.clone()) - .await - .map_err(K8InstallError::from)?; + check_crd(self.kube_client.clone()).await?; let pb = self.pb_factory.create()?; @@ -924,7 +922,7 @@ impl ClusterInstaller { /// Looks up the external address of a Fluvio SC instance in the given namespace #[instrument(skip(self))] - async fn discover_sc_service(&self) -> Result, K8InstallError> { + async fn discover_sc_service(&self) -> Result> { use tokio::select; use futures_util::stream::StreamExt; @@ -940,7 +938,7 @@ impl ClusterInstaller { select! { _ = &mut timer => { debug!(timer = *MAX_SC_SERVICE_WAIT,"timer expired"); - return Err(K8InstallError::SCServiceTimeout) + return Err(K8InstallError::SCServiceTimeout.into()) }, service_next = service_stream.next() => { if let Some(service_watches) = service_next { @@ -962,7 +960,7 @@ impl ClusterInstaller { } } else { debug!("service stream ended"); - return Err(K8InstallError::SCServiceTimeout) + return Err(K8InstallError::SCServiceTimeout.into()) } } } @@ -971,7 +969,7 @@ impl ClusterInstaller { /// Waits for SC pod #[instrument(skip(self))] - async fn wait_for_sc_availability(&self) -> Result, K8InstallError> { + async fn wait_for_sc_availability(&self) -> Result> { use tokio::select; use futures_util::stream::StreamExt; @@ -987,7 +985,7 @@ impl ClusterInstaller { select! { _ = &mut timer => { debug!(timer = *MAX_SC_DEPLOYMENT_AVAILABLE_WAIT, "timer expired"); - return Err(K8InstallError::SCDeploymentTimeout) + return Err(K8InstallError::SCDeploymentTimeout.into()) }, deployment_next = deployment_stream.next() => { if let Some(deployment_watches) = deployment_next { @@ -1014,7 +1012,7 @@ impl ClusterInstaller { } } else { debug!("deployment stream ended"); - return Err(K8InstallError::SCDeploymentTimeout) + return Err(K8InstallError::SCDeploymentTimeout.into()) } } } diff --git a/crates/fluvio-cluster/src/start/local.rs b/crates/fluvio-cluster/src/start/local.rs index df014665c3f..8bf01826ac6 100644 --- a/crates/fluvio-cluster/src/start/local.rs +++ b/crates/fluvio-cluster/src/start/local.rs @@ -1,6 +1,6 @@ use std::path::{Path, PathBuf}; use std::fs::create_dir_all; -use std::process::{Command}; +use std::process::Command; use std::time::{Duration, SystemTime}; use colored::Colorize; @@ -11,7 +11,7 @@ use once_cell::sync::Lazy; use fluvio::{Fluvio, FluvioConfig}; use fluvio::config::{TlsPolicy, ConfigFile, LOCAL_PROFILE}; -use fluvio_controlplane_metadata::spu::{SpuSpec}; +use fluvio_controlplane_metadata::spu::SpuSpec; use fluvio_future::timer::sleep; use fluvio_command::CommandExt; use k8_types::{InputK8Obj, InputObjectMeta}; @@ -19,7 +19,7 @@ use k8_client::SharedK8Client; use crate::render::{ProgressRenderedText, ProgressRenderer}; use crate::{ClusterChecker, LocalInstallError, StartStatus, UserChartLocation}; -use crate::charts::{ChartConfig}; +use crate::charts::ChartConfig; use crate::check::{SysChartCheck, ClusterCheckError}; use crate::runtime::local::{LocalSpuProcessClusterManager, ScProcess}; use crate::progress::{InstallProgressMessage, ProgressBarFactory}; @@ -523,7 +523,7 @@ impl LocalInstaller { client: SharedK8Client, ) -> Result<(), LocalInstallError> { use k8_client::meta_client::MetadataClient; - use crate::runtime::spu::{SpuClusterManager}; + use crate::runtime::spu::SpuClusterManager; let spu_process = cluster_manager.create_spu_relative(spu_index); diff --git a/crates/fluvio-sc-schema/src/objects/create.rs b/crates/fluvio-sc-schema/src/objects/create.rs index 50302e00c05..6f1df2eca3a 100644 --- a/crates/fluvio-sc-schema/src/objects/create.rs +++ b/crates/fluvio-sc-schema/src/objects/create.rs @@ -65,7 +65,7 @@ use classic::*; mod classic { use std::io::{Error as IoError, ErrorKind, Cursor}; - use std::fmt::{Debug}; + use std::fmt::Debug; use anyhow::Result; diff --git a/crates/fluvio-sc/src/error.rs b/crates/fluvio-sc/src/error.rs index 77663a76ecb..6fe7d8d9812 100644 --- a/crates/fluvio-sc/src/error.rs +++ b/crates/fluvio-sc/src/error.rs @@ -6,16 +6,12 @@ use std::fmt; use std::io::Error as IoError; use fluvio_types::PartitionError; -#[cfg(feature = "k8")] -use k8_client::ClientError; use fluvio_socket::SocketError; use fluvio_auth::AuthError; #[derive(Debug)] pub enum ScError { Io(IoError), - #[cfg(feature = "k8")] - Client(ClientError), Socket(SocketError), Partition(PartitionError), Auth(AuthError), @@ -26,8 +22,6 @@ impl fmt::Display for ScError { match self { Self::Io(err) => write!(f, "{err}"), // Self::SendError(err) => write!(f, "{}", err), - #[cfg(feature = "k8")] - Self::Client(err) => write!(f, "{err}"), Self::Socket(err) => write!(f, "{err}"), Self::Partition(err) => write!(f, "{err}"), Self::Auth(err) => write!(f, "{err}"), @@ -47,13 +41,6 @@ impl From for ScError { } } -#[cfg(feature = "k8")] -impl From for ScError { - fn from(error: ClientError) -> Self { - Self::Client(error) - } -} - impl From for ScError { fn from(error: SocketError) -> Self { Self::Socket(error) diff --git a/crates/fluvio-sc/src/k8/controllers/spg_stateful.rs b/crates/fluvio-sc/src/k8/controllers/spg_stateful.rs index 5932615b3e4..b22c869d21e 100644 --- a/crates/fluvio-sc/src/k8/controllers/spg_stateful.rs +++ b/crates/fluvio-sc/src/k8/controllers/spg_stateful.rs @@ -1,6 +1,7 @@ -use std::{time::Duration}; +use std::time::Duration; -use fluvio_stream_dispatcher::{store::K8ChangeListener}; +use anyhow::Result; +use fluvio_stream_dispatcher::store::K8ChangeListener; use tracing::debug; use tracing::error; use tracing::trace; @@ -9,14 +10,13 @@ use tracing::instrument; use fluvio_future::task::spawn; use fluvio_future::timer::sleep; -use k8_client::ClientError; -use crate::stores::{StoreContext}; +use crate::stores::StoreContext; use crate::stores::spg::{SpuGroupSpec, SpuGroupStatus}; -use crate::stores::spu::{SpuSpec}; +use crate::stores::spu::SpuSpec; use crate::cli::TlsConfig; -use crate::k8::objects::spg_group::{SpuGroupObj}; +use crate::k8::objects::spg_group::SpuGroupObj; use crate::k8::objects::spu_k8_config::ScK8Config; use crate::k8::objects::statefulset::StatefulsetSpec; use crate::k8::objects::spg_service::SpgServiceSpec; @@ -66,7 +66,7 @@ impl SpgStatefulSetController { } #[instrument(skip(self), name = "SpgStatefulSetController")] - async fn inner_loop(&mut self) -> Result<(), ClientError> { + async fn inner_loop(&mut self) -> Result<()> { use tokio::select; let mut spg_listener = self.groups.change_listener(); @@ -99,7 +99,7 @@ impl SpgStatefulSetController { async fn sync_with_config( &mut self, listener: &mut K8ChangeListener, - ) -> Result<(), ClientError> { + ) -> Result<()> { if !listener.has_change() { trace!("no config change, skipping"); return Ok(()); @@ -132,7 +132,7 @@ impl SpgStatefulSetController { async fn sync_spgs_to_statefulset( &mut self, listener: &mut K8ChangeListener, - ) -> Result<(), ClientError> { + ) -> Result<()> { if !listener.has_change() { debug!("no spg change, skipping"); return Ok(()); @@ -170,7 +170,7 @@ impl SpgStatefulSetController { &mut self, spu_group: SpuGroupObj, spu_k8_config: &ScK8Config, - ) -> Result<(), ClientError> { + ) -> Result<()> { let spg_name = spu_group.key(); // ensure we don't have conflict with existing spu group diff --git a/crates/fluvio-sc/src/k8/controllers/spu_controller.rs b/crates/fluvio-sc/src/k8/controllers/spu_controller.rs index b9917929779..f60a289eabe 100644 --- a/crates/fluvio-sc/src/k8/controllers/spu_controller.rs +++ b/crates/fluvio-sc/src/k8/controllers/spu_controller.rs @@ -1,12 +1,13 @@ use std::{collections::HashMap, fmt, net::IpAddr, time::Duration}; +use anyhow::Result; + use fluvio_controlplane_metadata::{ spu::IngressPort, store::{MetadataStoreObject, k8::K8MetaItem}, }; use fluvio_stream_dispatcher::actions::WSAction; -use k8_client::ClientError; use tracing::{debug, error, instrument, info}; use fluvio_future::task::spawn; @@ -67,7 +68,7 @@ impl K8SpuController { } } - async fn inner_loop(&mut self) -> Result<(), ClientError> { + async fn inner_loop(&mut self) -> Result<()> { use tokio::select; debug!("initializing listeners"); @@ -150,7 +151,7 @@ impl K8SpuController { #[instrument(skip(self))] /// synchronize change from spg to spu - async fn sync_spu(&mut self) -> Result<(), ClientError> { + async fn sync_spu(&mut self) -> Result<()> { // get all models let spg = self.groups.store().clone_values().await; let services = self.get_spu_services().await; @@ -205,7 +206,7 @@ impl K8SpuController { fn get_ingress_from_service( svc_md: &MetadataStoreObject, -) -> Result { +) -> Result { // Get the external ingress from the service // Look at svc_md to identify if LoadBalancer let lb_type = svc_md.spec().inner().r#type.as_ref(); @@ -219,7 +220,7 @@ fn get_ingress_from_service( Some(LoadBalancerType::NodePort) => { let port = svc_md.spec().inner().ports[0] .node_port - .ok_or_else(|| ClientError::Other("SPU service missing NodePort".into()))?; + .ok_or_else(|| anyhow::anyhow!("SPU service missing NodePort"))?; IngressPort { port, ..Default::default() diff --git a/crates/fluvio-sc/src/k8/controllers/spu_service.rs b/crates/fluvio-sc/src/k8/controllers/spu_service.rs index cad663513a3..1aca3d7dfaa 100644 --- a/crates/fluvio-sc/src/k8/controllers/spu_service.rs +++ b/crates/fluvio-sc/src/k8/controllers/spu_service.rs @@ -2,12 +2,12 @@ use std::{collections::HashMap, fmt, time::Duration}; use tracing::{debug, error, info, instrument, trace, warn}; +use anyhow::{Result, anyhow}; use fluvio_future::task::spawn; use fluvio_future::timer::sleep; use fluvio_controlplane_metadata::store::MetadataStoreObject; use fluvio_controlplane_metadata::store::k8::K8MetaItem; use fluvio_stream_dispatcher::actions::WSAction; -use k8_client::ClientError; use crate::stores::spg::SpuGroupSpec; use crate::stores::{StoreContext, K8ChangeListener}; @@ -60,7 +60,7 @@ impl SpuServiceController { } #[instrument(skip(self), name = "SpuSvcLoop")] - async fn inner_loop(&mut self) -> Result<(), ClientError> { + async fn inner_loop(&mut self) -> Result<()> { use tokio::select; let mut spg_listener = self.groups.change_listener(); @@ -100,7 +100,7 @@ impl SpuServiceController { async fn sync_with_config( &mut self, listener: &mut K8ChangeListener, - ) -> Result<(), ClientError> { + ) -> Result<()> { if !listener.has_change() { trace!("no config change, skipping"); return Ok(()); @@ -126,10 +126,7 @@ impl SpuServiceController { Ok(()) } - async fn sync_with_spg( - &mut self, - listener: &mut K8ChangeListener, - ) -> Result<(), ClientError> { + async fn sync_with_spg(&mut self, listener: &mut K8ChangeListener) -> Result<()> { if !listener.has_change() { trace!("no spg changes, skipping"); return Ok(()); @@ -150,7 +147,7 @@ impl SpuServiceController { self.update_services(updates, config.inner_owned().spec) .await?; } else { - return Err(ClientError::Other("fluvio config not found".to_owned())); + return Err(anyhow!("fluvio config not found")); } Ok(()) @@ -161,7 +158,7 @@ impl SpuServiceController { &self, updates: Vec>, config: ScK8Config, - ) -> Result<(), ClientError> { + ) -> Result<()> { for group_item in updates.into_iter() { let spg_obj = SpuGroupObj::new(group_item); @@ -186,7 +183,7 @@ impl SpuServiceController { spg_obj: &SpuGroupObj, spu_name: &str, spu_k8_config: &ScK8Config, - ) -> Result<(), ClientError> { + ) -> Result<()> { use k8_types::core::service::ServiceSpec as K8ServiceSpec; let mut selector = HashMap::new(); diff --git a/crates/fluvio-sc/src/k8/objects/spg_group.rs b/crates/fluvio-sc/src/k8/objects/spg_group.rs index 4743086210e..69195194322 100644 --- a/crates/fluvio-sc/src/k8/objects/spg_group.rs +++ b/crates/fluvio-sc/src/k8/objects/spg_group.rs @@ -13,13 +13,13 @@ use crate::stores::MetadataStoreObject; use crate::stores::spg::SpuGroupSpec; use crate::stores::spu::is_conflict; use crate::stores::k8::K8MetaItem; -use crate::stores::spu::{SpuSpec}; -use crate::stores::{LocalStore}; +use crate::stores::spu::SpuSpec; +use crate::stores::LocalStore; use crate::stores::actions::WSAction; use crate::cli::TlsConfig; use super::spu_k8_config::ScK8Config; -use super::statefulset::{StatefulsetSpec}; +use super::statefulset::StatefulsetSpec; use super::spg_service::SpgServiceSpec; #[derive(Debug)] @@ -191,7 +191,7 @@ mod k8_convert { }; use crate::stores::spg::SpuGroupSpec; - use super::super::statefulset::{K8StatefulSetSpec}; + use super::super::statefulset::K8StatefulSetSpec; use super::{ScK8Config, TlsConfig}; /// convert spu group spec into k8 statefulset spec diff --git a/crates/fluvio-sc/src/k8/objects/spu_k8_config.rs b/crates/fluvio-sc/src/k8/objects/spu_k8_config.rs index 7bc3abb474a..4bdeb84ac46 100644 --- a/crates/fluvio-sc/src/k8/objects/spu_k8_config.rs +++ b/crates/fluvio-sc/src/k8/objects/spu_k8_config.rs @@ -1,12 +1,12 @@ use std::collections::{BTreeMap, HashMap}; use std::fmt; +use anyhow::Result; use serde::Deserialize; use tracing::{debug, info}; use fluvio_controlplane_metadata::core::MetadataContext; use fluvio_types::defaults::SPU_PUBLIC_PORT; -use k8_client::ClientError; use k8_types::Env; use k8_types::core::pod::{ ResourceRequirements, PodSecurityContext, ContainerSpec, VolumeMount, VolumeSpec, @@ -47,11 +47,11 @@ pub struct ScK8Config { } impl ScK8Config { - fn from(mut data: BTreeMap) -> Result { + fn from(mut data: BTreeMap) -> Result { debug!("ConfigMap {} data: {:?}", CONFIG_MAP_NAME, data); let image = data.remove("image").ok_or_else(|| { - ClientError::Other("image not found in ConfigMap spu-k8 data".to_owned()) + anyhow::anyhow!("image not found in ConfigMap spu-k8 data".to_owned()) })?; let pod_security_context = @@ -75,9 +75,8 @@ impl ScK8Config { }; let spu_pod_config = if let Some(config_str) = data.remove("spuPodConfig") { - serde_json::from_str(&config_str).map_err(|err| { - ClientError::Other(format!("not able to parse spu pod config: {err:#?}")) - })? + serde_json::from_str(&config_str) + .map_err(|err| anyhow::anyhow!("not able to parse spu pod config: {err:#?}"))? } else { info!("spu pod config not found, using default"); PodConfig::default() diff --git a/crates/fluvio-stream-dispatcher/Cargo.toml b/crates/fluvio-stream-dispatcher/Cargo.toml index 6be8d1b4e21..29fb27a569e 100644 --- a/crates/fluvio-stream-dispatcher/Cargo.toml +++ b/crates/fluvio-stream-dispatcher/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "fluvio-stream-dispatcher" edition = "2021" -version = "0.10.2" +version = "0.11.0" authors = ["Fluvio Contributors "] description = "Fluvio Event Stream access" repository = "https://github.com/infinyon/fluvio" @@ -13,20 +13,25 @@ path = "src/lib.rs" [dependencies] +anyhow = { workspace = true } async-trait = { workspace = true } -async-rwlock = { workspace = true } -futures-lite = { workspace = true } +async-lock = { workspace = true } async-channel = { workspace = true } event-listener = { workspace = true } +futures-lite = { workspace = true } +futures-util = { workspace = true } once_cell = { workspace = true } serde = { workspace = true, features = ['derive'] } serde_json = { workspace = true } +serde_yaml = { workspace = true } +thiserror = { workspace = true } tokio = { workspace = true, features = ["macros"] } tracing = { workspace = true } # Fluvio dependencies fluvio-types = { workspace = true } fluvio-stream-model = { workspace = true, features = [ "k8"] } +k8-diff = { workspace = true } k8-metadata-client = { workspace = true } k8-types = { workspace = true } fluvio-future = { workspace = true, features = ["task", "timer"] } diff --git a/crates/fluvio-stream-dispatcher/src/dispatcher/k8_dispatcher.rs b/crates/fluvio-stream-dispatcher/src/dispatcher/k8_dispatcher.rs index 2779acca5b9..6743f4621bc 100644 --- a/crates/fluvio-stream-dispatcher/src/dispatcher/k8_dispatcher.rs +++ b/crates/fluvio-stream-dispatcher/src/dispatcher/k8_dispatcher.rs @@ -405,8 +405,8 @@ mod convert { /// Translates watch events into metadata action and apply into local store /// #[instrument(skip(stream, local_store))] - pub async fn k8_watch_events_to_metadata_actions( - stream: TokenStreamResult, + pub async fn k8_watch_events_to_metadata_actions( + stream: TokenStreamResult, local_store: &LocalStore, multi_namespace_context: bool, ) -> Option @@ -415,7 +415,6 @@ mod convert { S::IndexKey: Display, ::Owner: K8ExtendedSpec, S::Status: PartialEq, - E: MetadataClientError, S::IndexKey: Display, { let events = stream.unwrap(); diff --git a/crates/fluvio-stream-dispatcher/src/dispatcher/k8_ws_service.rs b/crates/fluvio-stream-dispatcher/src/dispatcher/k8_ws_service.rs index 588da4ba878..ea2bff39cdb 100644 --- a/crates/fluvio-stream-dispatcher/src/dispatcher/k8_ws_service.rs +++ b/crates/fluvio-stream-dispatcher/src/dispatcher/k8_ws_service.rs @@ -5,6 +5,7 @@ use std::fmt::Display; use std::convert::Into; use std::marker::PhantomData; +use anyhow::Result; use tracing::trace; use tracing::debug; use serde::de::DeserializeOwned; @@ -40,10 +41,7 @@ where } /// add/update - pub async fn apply( - &self, - value: MetadataStoreObject, - ) -> Result<(), C::MetadataClientError> { + pub async fn apply(&self, value: MetadataStoreObject) -> Result<()> { debug!("K8 Applying {}:{}", S::LABEL, value.key()); trace!("adding KV {:#?} to k8 kv", value); @@ -87,7 +85,7 @@ where &self, metadata: K8MetaItem, status: S::Status, - ) -> Result, C::MetadataClientError> { + ) -> Result> { debug!( "K8 Update Status: {} key: {} value: {}", S::LABEL, @@ -110,11 +108,7 @@ where } /// update spec only - pub async fn update_spec( - &self, - metadata: K8MetaItem, - spec: S, - ) -> Result<(), C::MetadataClientError> { + pub async fn update_spec(&self, metadata: K8MetaItem, spec: S) -> Result<()> { debug!("K8 Update Spec: {} key: {}", S::LABEL, metadata.name); trace!("K8 Update Spec: {:#?}", spec); @@ -133,7 +127,7 @@ where self.client.apply(k8_input).await.map(|_| ()) } - pub async fn delete(&self, meta: K8MetaItem) -> Result<(), C::MetadataClientError> { + pub async fn delete(&self, meta: K8MetaItem) -> Result<()> { use k8_types::options::{DeleteOptions, PropogationPolicy}; let options = if S::DELETE_WAIT_DEPENDENTS { @@ -153,7 +147,7 @@ where .map(|_| ()) } - pub async fn final_delete(&self, meta: K8MetaItem) -> Result<(), C::MetadataClientError> { + pub async fn final_delete(&self, meta: K8MetaItem) -> Result<()> { use once_cell::sync::Lazy; use serde_json::Value; diff --git a/crates/fluvio-test-util/test_runner/test_driver/mod.rs b/crates/fluvio-test-util/test_runner/test_driver/mod.rs index 614a64aac0e..6a9740c868d 100644 --- a/crates/fluvio-test-util/test_runner/test_driver/mod.rs +++ b/crates/fluvio-test-util/test_runner/test_driver/mod.rs @@ -4,7 +4,7 @@ use tracing::debug; use anyhow::Result; use fluvio::consumer::PartitionSelectionStrategy; -use fluvio::{Fluvio}; +use fluvio::Fluvio; use fluvio::metadata::topic::TopicSpec; use fluvio::{TopicProducer, RecordKey, PartitionConsumer, MultiplePartitionConsumer}; use fluvio::TopicProducerConfig; diff --git a/crates/fluvio/src/producer/accumulator.rs b/crates/fluvio/src/producer/accumulator.rs index 0399013516e..6400a95b1ec 100644 --- a/crates/fluvio/src/producer/accumulator.rs +++ b/crates/fluvio/src/producer/accumulator.rs @@ -12,7 +12,7 @@ use tracing::trace; use fluvio_future::sync::Condvar; use futures_util::future::{BoxFuture, Either, Shared}; use futures_util::{FutureExt, ready}; -use fluvio_protocol::record::{Batch}; +use fluvio_protocol::record::Batch; use fluvio_compression::Compression; use fluvio_protocol::record::Offset; use fluvio_protocol::link::ErrorCode; diff --git a/crates/fluvio/src/producer/mod.rs b/crates/fluvio/src/producer/mod.rs index bb577c5dc9e..c180e7a567e 100644 --- a/crates/fluvio/src/producer/mod.rs +++ b/crates/fluvio/src/producer/mod.rs @@ -32,7 +32,7 @@ pub use crate::producer::partitioning::{Partitioner, PartitionerConfig}; #[cfg(feature = "stats")] use crate::stats::{ClientStats, ClientStatsDataCollect, metrics::ClientStatsDataFrame}; -use self::accumulator::{BatchHandler}; +use self::accumulator::BatchHandler; pub use self::config::{ TopicProducerConfigBuilder, TopicProducerConfig, TopicProducerConfigBuilderError, DeliverySemantic, RetryPolicy, RetryStrategy,