diff --git a/rocketmq-remoting/src/clients/rocketmq_default_impl.rs b/rocketmq-remoting/src/clients/rocketmq_default_impl.rs index b5bdbc60..570cb4a4 100644 --- a/rocketmq-remoting/src/clients/rocketmq_default_impl.rs +++ b/rocketmq-remoting/src/clients/rocketmq_default_impl.rs @@ -14,12 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -use std::{error::Error, sync::Arc}; +use std::{collections::HashMap, error::Error, sync::Arc}; use rocketmq_common::TokioExecutorService; use crate::{ - clients::RemotingClient, + clients::{Client, RemotingClient}, protocol::remoting_command::RemotingCommand, remoting::{InvokeCallback, RemotingService}, runtime::{ @@ -31,6 +31,9 @@ use crate::{ pub struct RocketmqDefaultClient { service_bridge: ServiceBridge, tokio_client_config: TokioClientConfig, + //cache connection + connection_tables: HashMap, + lock: std::sync::RwLock<()>, } impl RocketmqDefaultClient { @@ -38,10 +41,29 @@ impl RocketmqDefaultClient { Self { service_bridge: ServiceBridge::new(), tokio_client_config, + connection_tables: Default::default(), + lock: Default::default(), } } } +impl RocketmqDefaultClient { + fn get_and_create_client(&mut self, addr: String) -> &mut Client { + let lc = self.lock.write().unwrap(); + + if self.connection_tables.contains_key(&addr) { + return self.connection_tables.get_mut(&addr).unwrap(); + } + + let addr_inner = addr.clone(); + let client = + futures::executor::block_on(async move { Client::connect(addr_inner).await.unwrap() }); + self.connection_tables.insert(addr.clone(), client); + drop(lc); + self.connection_tables.get_mut(&addr).unwrap() + } +} + #[allow(unused_variables)] impl RemotingService for RocketmqDefaultClient { async fn start(&mut self) { @@ -91,7 +113,9 @@ impl RemotingClient for RocketmqDefaultClient { timeout_millis: u64, invoke_callback: impl InvokeCallback, ) -> Result<(), Box> { - todo!() + let client = self.get_and_create_client(addr.clone()); + + unreachable!() } fn invoke_oneway( diff --git a/rocketmq-remoting/src/remoting.rs b/rocketmq-remoting/src/remoting.rs index c8a64e88..82fa5b32 100644 --- a/rocketmq-remoting/src/remoting.rs +++ b/rocketmq-remoting/src/remoting.rs @@ -30,6 +30,6 @@ pub trait RemotingService: Send { pub trait InvokeCallback { fn operation_complete(&self, response_future: ResponseFuture); - fn operation_succeed(&self, _response: RemotingCommand) {} - fn operation_fail(&self, _throwable: Box) {} + fn operation_succeed(&self, response: RemotingCommand); + fn operation_fail(&self, throwable: Box); } diff --git a/rocketmq-remoting/src/runtime.rs b/rocketmq-remoting/src/runtime.rs index d089965a..86ec5e68 100644 --- a/rocketmq-remoting/src/runtime.rs +++ b/rocketmq-remoting/src/runtime.rs @@ -15,13 +15,16 @@ * limitations under the License. */ -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use rocketmq_common::{common::Pair, TokioExecutorService}; +use tokio::time; use crate::{ + clients::Client, net::ResponseFuture, protocol::{remoting_command::RemotingCommand, RemotingCommandType}, + remoting::InvokeCallback, runtime::{processor::RequestProcessor, server::ConnectionHandlerContext}, }; @@ -113,4 +116,20 @@ impl ServiceBridge { _msg: RemotingCommand, ) { } + + pub async fn invoke_async( + &mut self, + client: &mut Client, + request: RemotingCommand, + timeout_millis: u64, + invoke_callback: impl InvokeCallback, + ) { + if let Ok(resp) = time::timeout(Duration::from_millis(timeout_millis), async { + client.invoke(request).await.unwrap() + }) + .await + { + invoke_callback.operation_succeed(resp) + } + } }