Skip to content

Commit

Permalink
[ISSUE #2151]♻️Refactor RegisterBrokerResponseHeader with derive marc…
Browse files Browse the repository at this point in the history
…o RequestHeaderCodec
  • Loading branch information
mxsm committed Jan 7, 2025
1 parent 2c21491 commit 6866069
Showing 1 changed file with 130 additions and 47 deletions.
177 changes: 130 additions & 47 deletions rocketmq-remoting/src/protocol/header/namesrv/register_broker_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,11 @@
* limitations under the License.
*/

use std::collections::HashMap;

use anyhow::Error;
use cheetah_string::CheetahString;
use rocketmq_macros::RequestHeaderCodec;
use serde::Deserialize;
use serde::Serialize;

use crate::protocol::command_custom_header::CommandCustomHeader;
use crate::protocol::command_custom_header::FromMap;

/// Represents the header for a broker registration request.
#[derive(Debug, Clone, Deserialize, Serialize, Default, RequestHeaderCodec)]
pub struct RegisterBrokerRequestHeader {
Expand Down Expand Up @@ -116,17 +110,14 @@ impl RegisterBrokerRequestHeader {
}
}

#[derive(Debug, Clone, Deserialize, Serialize, Default)]
#[derive(Debug, Clone, Deserialize, Serialize, Default, RequestHeaderCodec)]

Check warning on line 113 in rocketmq-remoting/src/protocol/header/namesrv/register_broker_header.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/header/namesrv/register_broker_header.rs#L113

Added line #L113 was not covered by tests
#[serde(rename_all = "camelCase")]
pub struct RegisterBrokerResponseHeader {
pub ha_server_addr: Option<CheetahString>,
pub master_addr: Option<CheetahString>,
}

impl RegisterBrokerResponseHeader {
const HA_SERVER_ADDR: &'static str = "haServerAddr";
const MASTER_ADDR: &'static str = "masterAddr";

pub fn new(ha_server_addr: Option<CheetahString>, master_addr: Option<CheetahString>) -> Self {
RegisterBrokerResponseHeader {
ha_server_addr,
Expand All @@ -135,48 +126,140 @@ impl RegisterBrokerResponseHeader {
}
}

impl CommandCustomHeader for RegisterBrokerResponseHeader {
fn check_fields(&self) -> anyhow::Result<(), Error> {
Ok(())
#[cfg(test)]
mod tests {
use cheetah_string::CheetahString;

use super::*;

#[test]
fn register_broker_request_header_new() {
let header = RegisterBrokerRequestHeader::new(
CheetahString::from("broker1"),
CheetahString::from("127.0.0.1"),
CheetahString::from("cluster1"),
CheetahString::from("127.0.0.2"),
1,
Some(3000),
Some(true),
true,
12345,
);
assert_eq!(header.broker_name, CheetahString::from("broker1"));
assert_eq!(header.broker_addr, CheetahString::from("127.0.0.1"));
assert_eq!(header.cluster_name, CheetahString::from("cluster1"));
assert_eq!(header.ha_server_addr, CheetahString::from("127.0.0.2"));
assert_eq!(header.broker_id, 1);
assert_eq!(header.heartbeat_timeout_millis, Some(3000));
assert_eq!(header.enable_acting_master, Some(true));
assert!(header.compressed);
assert_eq!(header.body_crc32, 12345);
}

fn to_map(&self) -> Option<HashMap<CheetahString, CheetahString>> {
let mut map = HashMap::<CheetahString, CheetahString>::new();
#[test]
fn register_broker_request_header_serialization() {
let header = RegisterBrokerRequestHeader::new(
CheetahString::from("broker1"),
CheetahString::from("127.0.0.1"),
CheetahString::from("cluster1"),
CheetahString::from("127.0.0.2"),
1,
Some(3000),
Some(true),
true,
12345,
);
let serialized = serde_json::to_string(&header).unwrap();
assert_eq!(
serialized,
r#"{"brokerName":"broker1","brokerAddr":"127.0.0.1","clusterName":"cluster1","haServerAddr":"127.0.0.2","brokerId":1,"heartbeatTimeoutMillis":3000,"enableActingMaster":true,"compressed":true,"bodyCrc32":12345}"#
);
}

if let Some(ref ha_server_addr) = self.ha_server_addr {
map.insert(
CheetahString::from_static_str(RegisterBrokerResponseHeader::HA_SERVER_ADDR),
ha_server_addr.clone(),
);
}
if let Some(ref master_addr) = self.master_addr {
map.insert(
CheetahString::from_static_str(RegisterBrokerResponseHeader::MASTER_ADDR),
master_addr.clone(),
);
}
#[test]
fn register_broker_request_header_deserialization() {
let json = r#"{"brokerName":"broker1","brokerAddr":"127.0.0.1","clusterName":"cluster1","haServerAddr":"127.0.0.2","brokerId":1,"heartbeatTimeoutMillis":3000,"enableActingMaster":true,"compressed":true,"bodyCrc32":12345}"#;
let deserialized: RegisterBrokerRequestHeader = serde_json::from_str(json).unwrap();
assert_eq!(deserialized.broker_name, CheetahString::from("broker1"));
assert_eq!(deserialized.broker_addr, CheetahString::from("127.0.0.1"));
assert_eq!(deserialized.cluster_name, CheetahString::from("cluster1"));
assert_eq!(
deserialized.ha_server_addr,
CheetahString::from("127.0.0.2")
);
assert_eq!(deserialized.broker_id, 1);
assert_eq!(deserialized.heartbeat_timeout_millis, Some(3000));
assert_eq!(deserialized.enable_acting_master, Some(true));
assert!(deserialized.compressed);
assert_eq!(deserialized.body_crc32, 12345);
}

Some(map)
#[test]
fn register_broker_request_header_deserialization_missing_fields() {
let json = r#"{"brokerName":"broker1","brokerAddr":"127.0.0.1","clusterName":"cluster1","haServerAddr":"127.0.0.2","brokerId":1,"compressed":true,"bodyCrc32":12345}"#;
let deserialized: RegisterBrokerRequestHeader = serde_json::from_str(json).unwrap();
assert_eq!(deserialized.broker_name, CheetahString::from("broker1"));
assert_eq!(deserialized.broker_addr, CheetahString::from("127.0.0.1"));
assert_eq!(deserialized.cluster_name, CheetahString::from("cluster1"));
assert_eq!(
deserialized.ha_server_addr,
CheetahString::from("127.0.0.2")
);
assert_eq!(deserialized.broker_id, 1);
assert_eq!(deserialized.heartbeat_timeout_millis, None);
assert_eq!(deserialized.enable_acting_master, None);
assert!(deserialized.compressed);
assert_eq!(deserialized.body_crc32, 12345);
}

#[test]
fn register_broker_response_header_new() {
let header = RegisterBrokerResponseHeader::new(
Some(CheetahString::from("127.0.0.2")),
Some(CheetahString::from("127.0.0.3")),
);
assert_eq!(
header.ha_server_addr,
Some(CheetahString::from("127.0.0.2"))
);
assert_eq!(header.master_addr, Some(CheetahString::from("127.0.0.3")));
}

#[test]
fn register_broker_response_header_serialization() {
let header = RegisterBrokerResponseHeader::new(
Some(CheetahString::from("127.0.0.2")),
Some(CheetahString::from("127.0.0.3")),
);
let serialized = serde_json::to_string(&header).unwrap();
assert_eq!(
serialized,
r#"{"haServerAddr":"127.0.0.2","masterAddr":"127.0.0.3"}"#
);
}

#[test]
fn register_broker_response_header_deserialization() {
let json = r#"{"haServerAddr":"127.0.0.2","masterAddr":"127.0.0.3"}"#;
let deserialized: RegisterBrokerResponseHeader = serde_json::from_str(json).unwrap();
assert_eq!(
deserialized.ha_server_addr,
Some(CheetahString::from("127.0.0.2"))
);
assert_eq!(
deserialized.master_addr,
Some(CheetahString::from("127.0.0.3"))
);
}
}

impl FromMap for RegisterBrokerResponseHeader {
type Error = crate::remoting_error::RemotingError;

type Target = Self;

fn from(map: &HashMap<CheetahString, CheetahString>) -> Result<Self::Target, Self::Error> {
Ok(RegisterBrokerResponseHeader {
ha_server_addr: map
.get(&CheetahString::from_static_str(
RegisterBrokerResponseHeader::HA_SERVER_ADDR,
))
.cloned(),
master_addr: map
.get(&CheetahString::from_static_str(
RegisterBrokerResponseHeader::MASTER_ADDR,
))
.cloned(),
})
#[test]
fn register_broker_response_header_deserialization_missing_fields() {
let json = r#"{"haServerAddr":"127.0.0.2"}"#;
let deserialized: RegisterBrokerResponseHeader = serde_json::from_str(json).unwrap();
assert_eq!(
deserialized.ha_server_addr,
Some(CheetahString::from("127.0.0.2"))
);
assert_eq!(deserialized.master_addr, None);
}
}

0 comments on commit 6866069

Please sign in to comment.