Skip to content

Commit

Permalink
[ISSUE #272]🚀Implement remoting client-2 (#273)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Mar 13, 2024
1 parent a9f2aa4 commit 0b5ca38
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 6 deletions.
30 changes: 27 additions & 3 deletions rocketmq-remoting/src/clients/rocketmq_default_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::{error::Error, sync::Arc};
use std::{collections::HashMap, error::Error, sync::Arc};

use rocketmq_common::TokioExecutorService;

use crate::{
clients::RemotingClient,
clients::{Client, RemotingClient},
protocol::remoting_command::RemotingCommand,
remoting::{InvokeCallback, RemotingService},
runtime::{
Expand All @@ -31,17 +31,39 @@ use crate::{
pub struct RocketmqDefaultClient {
service_bridge: ServiceBridge,
tokio_client_config: TokioClientConfig,
//cache connection
connection_tables: HashMap<String /* ip:port */, Client>,
lock: std::sync::RwLock<()>,
}

impl RocketmqDefaultClient {
pub fn new(tokio_client_config: TokioClientConfig) -> Self {
Self {
service_bridge: ServiceBridge::new(),
tokio_client_config,
connection_tables: Default::default(),
lock: Default::default(),
}
}
}

impl RocketmqDefaultClient {
fn get_and_create_client(&mut self, addr: String) -> &mut Client {
let lc = self.lock.write().unwrap();

if self.connection_tables.contains_key(&addr) {
return self.connection_tables.get_mut(&addr).unwrap();
}

let addr_inner = addr.clone();
let client =
futures::executor::block_on(async move { Client::connect(addr_inner).await.unwrap() });
self.connection_tables.insert(addr.clone(), client);
drop(lc);
self.connection_tables.get_mut(&addr).unwrap()
}
}

#[allow(unused_variables)]
impl RemotingService for RocketmqDefaultClient {
async fn start(&mut self) {
Expand Down Expand Up @@ -91,7 +113,9 @@ impl RemotingClient for RocketmqDefaultClient {
timeout_millis: u64,
invoke_callback: impl InvokeCallback,
) -> Result<(), Box<dyn Error>> {
todo!()
let client = self.get_and_create_client(addr.clone());

unreachable!()
}

fn invoke_oneway(
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-remoting/src/remoting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ pub trait RemotingService: Send {

pub trait InvokeCallback {
fn operation_complete(&self, response_future: ResponseFuture);
fn operation_succeed(&self, _response: RemotingCommand) {}
fn operation_fail(&self, _throwable: Box<dyn std::error::Error>) {}
fn operation_succeed(&self, response: RemotingCommand);
fn operation_fail(&self, throwable: Box<dyn std::error::Error>);
}
21 changes: 20 additions & 1 deletion rocketmq-remoting/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
* limitations under the License.
*/

use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, sync::Arc, time::Duration};

use rocketmq_common::{common::Pair, TokioExecutorService};
use tokio::time;

use crate::{
clients::Client,
net::ResponseFuture,
protocol::{remoting_command::RemotingCommand, RemotingCommandType},
remoting::InvokeCallback,
runtime::{processor::RequestProcessor, server::ConnectionHandlerContext},
};

Expand Down Expand Up @@ -113,4 +116,20 @@ impl ServiceBridge {
_msg: RemotingCommand,
) {
}

pub async fn invoke_async(
&mut self,
client: &mut Client,
request: RemotingCommand,
timeout_millis: u64,
invoke_callback: impl InvokeCallback,
) {
if let Ok(resp) = time::timeout(Duration::from_millis(timeout_millis), async {
client.invoke(request).await.unwrap()
})
.await
{
invoke_callback.operation_succeed(resp)
}
}
}

0 comments on commit 0b5ca38

Please sign in to comment.