Skip to content

Commit

Permalink
[ISSUE #667]🍻Optimize socket connection (#668)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Jun 20, 2024
1 parent fe98391 commit 52fd565
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 6 deletions.
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions rocketmq-remoting/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,8 @@ futures-io = "0.3"
num_cpus.workspace = true
parking_lot.workspace = true
trait-variant.workspace = true
uuid = { version = "1.8.0", features = ["v4", # Lets you generate random UUIDs
"fast-rng", # Use a faster (but still sufficiently random) RNG
"macro-diagnostics", ] }
[dev-dependencies]
bytes = "1.6.0"
6 changes: 4 additions & 2 deletions rocketmq-remoting/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use tokio::net::TcpStream;
use tokio_util::codec::Framed;

use crate::codec::remoting_command_codec::RemotingCommandCodec;
use crate::net::channel::Channel;

/// Send and receive `Frame` values from a remote peer.
///
Expand All @@ -37,7 +38,7 @@ use crate::codec::remoting_command_codec::RemotingCommandCodec;
pub struct Connection {
pub(crate) framed: Framed<TcpStream, RemotingCommandCodec>,
pub(crate) remote_addr: SocketAddr,
pub(crate) channel: Channel,
}

impl Connection {
Expand All @@ -51,9 +52,10 @@ impl Connection {
///
/// A new `Connection` instance.
pub fn new(tcp_stream: TcpStream, remote_addr: SocketAddr) -> Connection {
let channel = Channel::new(tcp_stream.local_addr().unwrap(), remote_addr);
Self {
framed: Framed::new(tcp_stream, RemotingCommandCodec::new()),
remote_addr,
channel,
}
}
}
2 changes: 1 addition & 1 deletion rocketmq-remoting/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

pub mod channel;
pub struct ResponseFuture;
98 changes: 98 additions & 0 deletions rocketmq-remoting/src/net/channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use std::net::SocketAddr;

use uuid::Uuid;

#[derive(Clone, PartialEq, Hash)]
pub struct Channel {
local_address: SocketAddr,
remote_address: SocketAddr,
channel_id: String,
}

impl Channel {
pub fn new(local_address: SocketAddr, remote_address: SocketAddr) -> Self {
let channel_id = Uuid::new_v4().to_string();
Self {
local_address,
remote_address,
channel_id,
}
}
}

impl Channel {
pub fn set_local_address(&mut self, local_address: SocketAddr) {
self.local_address = local_address;
}
pub fn set_remote_address(&mut self, remote_address: SocketAddr) {
self.remote_address = remote_address;
}
pub fn set_channel_id(&mut self, channel_id: String) {
self.channel_id = channel_id;
}

pub fn local_address(&self) -> SocketAddr {
self.local_address
}
pub fn remote_address(&self) -> SocketAddr {
self.remote_address
}
pub fn channel_id(&self) -> &str {
self.channel_id.as_str()
}
}

#[cfg(test)]
mod tests {
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;

use super::*;

#[test]
fn channel_creation_with_new() {
let local_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let remote_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 0, 1)), 8080);
let channel = Channel::new(local_address, remote_address);

assert_eq!(channel.local_address(), local_address);
assert_eq!(channel.remote_address(), remote_address);
assert!(Uuid::parse_str(channel.channel_id()).is_ok());
}

#[test]
fn channel_setters_work_correctly() {
let local_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let remote_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 0, 1)), 8080);
let new_local_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 8080);
let new_remote_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 16, 0, 1)), 8080);
let new_channel_id = Uuid::new_v4().to_string();

let mut channel = Channel::new(local_address, remote_address);
channel.set_local_address(new_local_address);
channel.set_remote_address(new_remote_address);
channel.set_channel_id(new_channel_id.clone());

assert_eq!(channel.local_address(), new_local_address);
assert_eq!(channel.remote_address(), new_remote_address);
assert_eq!(channel.channel_id(), new_channel_id);
}
}
5 changes: 2 additions & 3 deletions rocketmq-remoting/src/runtime/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub struct ConnectionHandler<RP> {
impl<RP> Drop for ConnectionHandler<RP> {
fn drop(&mut self) {
if let Some(ref sender) = self.conn_disconnect_notify {
let socket_addr = self.connection.remote_addr;
let socket_addr = self.connection.channel.remote_address();
warn!(
"connection[{}] disconnected, Send notify message.",
socket_addr
Expand All @@ -69,7 +69,6 @@ impl<RP> Drop for ConnectionHandler<RP> {

impl<RP: RequestProcessor + Sync + 'static> ConnectionHandler<RP> {
async fn handle(&mut self) -> anyhow::Result<()> {
let _remote_addr = self.connection.remote_addr;
while !self.shutdown.is_shutdown {
let frame = tokio::select! {
res = self.connection.framed.next() => res,
Expand Down Expand Up @@ -340,7 +339,7 @@ impl<'a> ConnectionHandlerContext<'a> {
}

pub fn remoting_address(&self) -> SocketAddr {
self.connection.remote_addr
self.connection.channel.remote_address()
}
}

Expand Down

0 comments on commit 52fd565

Please sign in to comment.