From 0e5540e32dd11e8a008889f24f2a829eca774298 Mon Sep 17 00:00:00 2001 From: Sehyo Chang Date: Fri, 4 Aug 2023 19:58:27 -0700 Subject: [PATCH 1/4] move spu smartmodule to controlplane only --- .../src/smartmodule/mod.rs | 31 ------------------- crates/fluvio-controlplane/src/message/mod.rs | 7 ++--- .../src/spu_api/update_smartmodule.rs | 31 +++++++++++++++++-- .../src/core/smartmodule/metadata.rs | 2 +- .../src/services/public/tests/mod.rs | 3 +- .../src/services/public/tests/stream_fetch.rs | 3 +- 6 files changed, 37 insertions(+), 40 deletions(-) diff --git a/crates/fluvio-controlplane-metadata/src/smartmodule/mod.rs b/crates/fluvio-controlplane-metadata/src/smartmodule/mod.rs index fa720c8801..8818356577 100644 --- a/crates/fluvio-controlplane-metadata/src/smartmodule/mod.rs +++ b/crates/fluvio-controlplane-metadata/src/smartmodule/mod.rs @@ -8,42 +8,11 @@ pub use self::spec::*; pub use self::status::*; pub use self::package::*; -use std::fmt; - -use fluvio_stream_model::core::MetadataItem; -use fluvio_stream_model::store::MetadataStoreObject; -use fluvio_types::SmartModuleName; -use fluvio_protocol::{Encoder, Decoder}; - #[cfg(feature = "k8")] mod k8; #[cfg(feature = "k8")] pub use k8::*; -/// SmartModule object that can be used to transport from SC to SPU -#[derive(Debug, Default, Clone, Eq, PartialEq, Encoder, Decoder)] -pub struct SmartModule { - pub name: SmartModuleName, - pub spec: SmartModuleSpec, -} - -impl fmt::Display for SmartModule { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "SmartModule({})", self.name) - } -} - -impl From> for SmartModule -where - C: MetadataItem, -{ - fn from(mso: MetadataStoreObject) -> Self { - let name = mso.key_owned(); - let spec = mso.spec; - Self { name, spec } - } -} - mod metadata { use crate::core::{Spec, Status, Removable, Creatable}; diff --git a/crates/fluvio-controlplane/src/message/mod.rs b/crates/fluvio-controlplane/src/message/mod.rs index 8ced5f760a..c9938f0bf8 100644 --- a/crates/fluvio-controlplane/src/message/mod.rs +++ b/crates/fluvio-controlplane/src/message/mod.rs @@ -13,10 +13,9 @@ mod spu_msg { } mod smartmodule_msg { - use fluvio_controlplane_metadata::{ - smartmodule::SmartModule, - message::{Message, Messages}, - }; + use fluvio_controlplane_metadata::message::{Message, Messages}; + + use crate::spu_api::update_smartmodule::SmartModule; pub type SmartModuleMsg = Message; pub type SmartModuleMsgs = Messages; diff --git a/crates/fluvio-controlplane/src/spu_api/update_smartmodule.rs b/crates/fluvio-controlplane/src/spu_api/update_smartmodule.rs index 1576499864..2c0c07232e 100644 --- a/crates/fluvio-controlplane/src/spu_api/update_smartmodule.rs +++ b/crates/fluvio-controlplane/src/spu_api/update_smartmodule.rs @@ -1,9 +1,12 @@ -#![allow(clippy::assign_op_pattern)] +use std::fmt; +use fluvio_controlplane_metadata::core::MetadataItem; +use fluvio_controlplane_metadata::smartmodule::SmartModuleSpec; +use fluvio_controlplane_metadata::store::MetadataStoreObject; use fluvio_protocol::Decoder; use fluvio_protocol::Encoder; use fluvio_protocol::api::Request; -use fluvio_controlplane_metadata::smartmodule::SmartModule; +use fluvio_types::SmartModuleName; use crate::requests::ControlPlaneRequest; @@ -19,3 +22,27 @@ impl Request for UpdateSmartModuleRequest { #[derive(Decoder, Encoder, Default, Debug)] pub struct UpdateSmartModuleResponse {} + +/// SmartModule object that can be used to transport from SC to SPU +#[derive(Debug, Default, Clone, Eq, PartialEq, Encoder, Decoder)] +pub struct SmartModule { + pub name: SmartModuleName, + pub spec: SmartModuleSpec, +} + +impl fmt::Display for SmartModule { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "SmartModule({})", self.name) + } +} + +impl From> for SmartModule +where + C: MetadataItem, +{ + fn from(mso: MetadataStoreObject) -> Self { + let name = mso.key_owned(); + let spec = mso.spec; + Self { name, spec } + } +} diff --git a/crates/fluvio-spu/src/core/smartmodule/metadata.rs b/crates/fluvio-spu/src/core/smartmodule/metadata.rs index 58161c8a75..4c4a70adb8 100644 --- a/crates/fluvio-spu/src/core/smartmodule/metadata.rs +++ b/crates/fluvio-spu/src/core/smartmodule/metadata.rs @@ -1,6 +1,6 @@ use anyhow::Result; -use fluvio_controlplane_metadata::smartmodule::SmartModule; +use fluvio_controlplane::spu_api::update_smartmodule::SmartModule; use fluvio_controlplane_metadata::smartmodule::SmartModulePackageKey; use fluvio_types::SmartModuleName; diff --git a/crates/fluvio-spu/src/services/public/tests/mod.rs b/crates/fluvio-spu/src/services/public/tests/mod.rs index 2fa23de850..23eb20f84d 100644 --- a/crates/fluvio-spu/src/services/public/tests/mod.rs +++ b/crates/fluvio-spu/src/services/public/tests/mod.rs @@ -5,8 +5,9 @@ use std::{ use chrono::Utc; use flate2::{bufread::GzEncoder, Compression}; +use fluvio_controlplane::spu_api::update_smartmodule::SmartModule; use fluvio_controlplane_metadata::smartmodule::{ - SmartModuleSpec, SmartModule, SmartModuleWasm, SmartModuleWasmFormat, + SmartModuleSpec, SmartModuleWasm, SmartModuleWasmFormat, }; use fluvio_protocol::{ fixture::BatchProducer, diff --git a/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs b/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs index 5577743e46..6716612b0a 100644 --- a/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs +++ b/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs @@ -3,11 +3,12 @@ use std::sync::Arc; use chrono::{Utc, Days}; use fluvio_controlplane::replica::Replica; +use fluvio_controlplane::spu_api::update_smartmodule::SmartModule; use fluvio_smartmodule::dataplane::smartmodule::Lookback; use tracing::{debug, info}; use fluvio_controlplane_metadata::smartmodule::{ - SmartModule, SmartModuleWasm, SmartModuleWasmFormat, SmartModuleSpec, + SmartModuleWasm, SmartModuleWasmFormat, SmartModuleSpec, }; use fluvio_storage::FileReplica; use flv_util::fixture::ensure_clean_dir; From e53d8797a1eb14c471ee1a1654f6c9595b1493df Mon Sep 17 00:00:00 2001 From: Sehyo Chang Date: Fri, 4 Aug 2023 20:21:46 -0700 Subject: [PATCH 2/4] migrate anyhow for sc dispatcher --- .../src/control_plane/dispatcher.rs | 37 ++++++++----------- crates/fluvio-spu/src/lib.rs | 1 - 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/crates/fluvio-spu/src/control_plane/dispatcher.rs b/crates/fluvio-spu/src/control_plane/dispatcher.rs index 63c8c636cc..cbd5e22bdb 100644 --- a/crates/fluvio-spu/src/control_plane/dispatcher.rs +++ b/crates/fluvio-spu/src/control_plane/dispatcher.rs @@ -1,5 +1,9 @@ use std::time::Duration; -use std::io::Error as IoError; + +use tracing::{info, trace, error, debug, warn, instrument}; +use tokio::select; +use futures_util::stream::StreamExt; +use anyhow::{anyhow, Result}; use fluvio_controlplane::sc_api::register_spu::RegisterSpuRequest; use fluvio_controlplane::sc_api::update_lrs::UpdateLrsRequest; @@ -7,19 +11,14 @@ use fluvio_controlplane::spu_api::api::{InternalSpuRequest, InternalSpuApi}; use fluvio_controlplane::spu_api::update_replica::UpdateReplicaRequest; use fluvio_controlplane::spu_api::update_smartmodule::UpdateSmartModuleRequest; use fluvio_controlplane::spu_api::update_spu::UpdateSpuRequest; -use tracing::{info, trace, error, debug, warn, instrument}; -use tokio::select; -use futures_util::stream::StreamExt; - use flv_util::print_cli_err; use fluvio_future::task::spawn; use fluvio_future::timer::sleep; use fluvio_protocol::api::RequestMessage; -use fluvio_socket::{FluvioSocket, SocketError, FluvioSink}; +use fluvio_socket::{FluvioSocket, FluvioSink}; use fluvio_storage::FileReplica; use crate::core::SharedGlobalContext; -use crate::InternalServerError; use super::message_sink::SharedStatusUpdate; @@ -112,7 +111,7 @@ impl ScDispatcher { socket = socket.id() ) )] - async fn request_loop(&mut self, socket: FluvioSocket) -> Result<(), SocketError> { + async fn request_loop(&mut self, socket: FluvioSocket) -> Result<()> { use async_io::Timer; /// Interval between each send to SC @@ -175,10 +174,7 @@ impl ScDispatcher { /// send status back to sc, if there is error return false #[instrument(skip(self))] - async fn send_status_back_to_sc( - &mut self, - sc_sink: &mut FluvioSink, - ) -> Result<(), SocketError> { + async fn send_status_back_to_sc(&mut self, sc_sink: &mut FluvioSink) -> Result<()> { let requests = self.status_update.remove_all().await; if requests.is_empty() { @@ -188,10 +184,10 @@ impl ScDispatcher { } let message = RequestMessage::new_request(UpdateLrsRequest::new(requests)); - sc_sink.send_request(&message).await.map_err(|err| { - error!(?err, "error sending status back"); - err - }) + sc_sink + .send_request(&message) + .await + .map_err(|err| anyhow!("error sending status back to sc: {}", err)) } /// register local spu to sc @@ -199,10 +195,7 @@ impl ScDispatcher { skip(self), fields(socket = socket.id()) )] - async fn send_spu_registration( - &self, - socket: &mut FluvioSocket, - ) -> Result { + async fn send_spu_registration(&self, socket: &mut FluvioSocket) -> Result { let local_spu_id = self.ctx.local_spu_id(); debug!(%local_spu_id, "sending spu registration request",); @@ -299,7 +292,7 @@ impl ScDispatcher { async fn handle_update_spu_request( &mut self, req_msg: RequestMessage, - ) -> Result<(), IoError> { + ) -> Result<()> { let (_, request) = req_msg.get_header_request(); debug!( message = ?request,"starting spu update"); @@ -336,7 +329,7 @@ impl ScDispatcher { async fn handle_update_smartmodule_request( &mut self, req_msg: RequestMessage, - ) -> Result<(), IoError> { + ) -> Result<()> { let (_, request) = req_msg.get_header_request(); debug!( message = ?request,"starting SmartModule update"); diff --git a/crates/fluvio-spu/src/lib.rs b/crates/fluvio-spu/src/lib.rs index 8a67b2a684..25e854730b 100644 --- a/crates/fluvio-spu/src/lib.rs +++ b/crates/fluvio-spu/src/lib.rs @@ -15,7 +15,6 @@ cfg_if::cfg_if! { } } -use self::error::InternalServerError; pub use config::SpuOpt; const VERSION: &str = include_str!("../../../VERSION"); From 4338493c417dd093e127c71b0c859c6e85ee26b6 Mon Sep 17 00:00:00 2001 From: Sehyo Chang Date: Fri, 4 Aug 2023 20:05:31 -0700 Subject: [PATCH 3/4] remove unneeded clippy rules --- crates/fluvio-controlplane/src/message/replica_msg.rs | 2 -- crates/fluvio-controlplane/src/sc_api/register_spu.rs | 2 -- crates/fluvio-controlplane/src/sc_api/remove.rs | 2 -- crates/fluvio-controlplane/src/sc_api/update_lrs.rs | 2 -- crates/fluvio-controlplane/src/spu_api/update_replica.rs | 2 -- crates/fluvio-controlplane/src/spu_api/update_spu.rs | 2 -- 6 files changed, 12 deletions(-) diff --git a/crates/fluvio-controlplane/src/message/replica_msg.rs b/crates/fluvio-controlplane/src/message/replica_msg.rs index 34541b773c..6c018e276e 100644 --- a/crates/fluvio-controlplane/src/message/replica_msg.rs +++ b/crates/fluvio-controlplane/src/message/replica_msg.rs @@ -1,5 +1,3 @@ -#![allow(clippy::assign_op_pattern)] - //! //! # Replica Messages //! diff --git a/crates/fluvio-controlplane/src/sc_api/register_spu.rs b/crates/fluvio-controlplane/src/sc_api/register_spu.rs index e1b1a9d05b..a19b78be6e 100644 --- a/crates/fluvio-controlplane/src/sc_api/register_spu.rs +++ b/crates/fluvio-controlplane/src/sc_api/register_spu.rs @@ -1,5 +1,3 @@ -#![allow(clippy::assign_op_pattern)] - //! //! # Register SPU //! diff --git a/crates/fluvio-controlplane/src/sc_api/remove.rs b/crates/fluvio-controlplane/src/sc_api/remove.rs index 42648c6dc3..f4e0071800 100644 --- a/crates/fluvio-controlplane/src/sc_api/remove.rs +++ b/crates/fluvio-controlplane/src/sc_api/remove.rs @@ -1,5 +1,3 @@ -#![allow(clippy::assign_op_pattern)] - use std::fmt; use fluvio_protocol::api::Request; diff --git a/crates/fluvio-controlplane/src/sc_api/update_lrs.rs b/crates/fluvio-controlplane/src/sc_api/update_lrs.rs index cc54cbcba1..3b3a19e592 100644 --- a/crates/fluvio-controlplane/src/sc_api/update_lrs.rs +++ b/crates/fluvio-controlplane/src/sc_api/update_lrs.rs @@ -1,5 +1,3 @@ -#![allow(clippy::assign_op_pattern)] - use std::fmt; use std::hash::{Hash, Hasher}; diff --git a/crates/fluvio-controlplane/src/spu_api/update_replica.rs b/crates/fluvio-controlplane/src/spu_api/update_replica.rs index b2c37153e2..dff62d0ae8 100644 --- a/crates/fluvio-controlplane/src/spu_api/update_replica.rs +++ b/crates/fluvio-controlplane/src/spu_api/update_replica.rs @@ -1,5 +1,3 @@ -#![allow(clippy::assign_op_pattern)] - use fluvio_protocol::api::Request; use fluvio_protocol::Decoder; use fluvio_protocol::Encoder; diff --git a/crates/fluvio-controlplane/src/spu_api/update_spu.rs b/crates/fluvio-controlplane/src/spu_api/update_spu.rs index fdd81bdad3..4a402571f3 100644 --- a/crates/fluvio-controlplane/src/spu_api/update_spu.rs +++ b/crates/fluvio-controlplane/src/spu_api/update_spu.rs @@ -1,5 +1,3 @@ -#![allow(clippy::assign_op_pattern)] - use fluvio_protocol::api::Request; use fluvio_protocol::Decoder; use fluvio_protocol::Encoder; From 9480292173506a4e35a78bbe0886d2010c6ec9bc Mon Sep 17 00:00:00 2001 From: Sehyo Chang Date: Fri, 4 Aug 2023 20:25:03 -0700 Subject: [PATCH 4/4] cleanup --- crates/fluvio-socket/src/sink.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/fluvio-socket/src/sink.rs b/crates/fluvio-socket/src/sink.rs index e7488aef1f..8d6976b296 100644 --- a/crates/fluvio-socket/src/sink.rs +++ b/crates/fluvio-socket/src/sink.rs @@ -3,11 +3,11 @@ use std::fmt::Debug; use std::sync::Arc; use tracing::{trace, instrument}; -use futures_util::{SinkExt}; +use futures_util::SinkExt; use async_lock::Mutex; use async_lock::MutexGuard; use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt}; -use tokio_util::codec::{FramedWrite}; +use tokio_util::codec::FramedWrite; use fluvio_protocol::api::{RequestMessage, ResponseMessage}; use fluvio_protocol::codec::FluvioCodec;