Skip to content

Commit

Permalink
[ISSUE #227]🚧Implement MessageStore put message(single message)-3 (#229)
Browse files Browse the repository at this point in the history
* [ISSUE #227]🚧Implement MessageStore put message(single message)-3

* fix ci

* fix test case error
  • Loading branch information
mxsm authored Feb 27, 2024
1 parent c2107f3 commit e20b9aa
Show file tree
Hide file tree
Showing 15 changed files with 320 additions and 60 deletions.
24 changes: 15 additions & 9 deletions rocketmq-common/src/common/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,34 +30,40 @@ pub mod message_queue;
pub mod message_single;

pub trait MessageTrait {
fn get_topic(&self) -> &str;
fn topic(&self) -> &str;

fn set_topic(&mut self, topic: impl Into<String>);
fn with_topic(&mut self, topic: impl Into<String>);

fn get_tags(&self) -> Option<&str>;
fn tags(&self) -> Option<&str>;

fn set_tags(&mut self, tags: impl Into<String>);
fn with_tags(&mut self, tags: impl Into<String>);

fn put_property(&mut self, key: impl Into<String>, value: impl Into<String>);

fn get_properties(&self) -> &HashMap<String, String>;
fn properties(&self) -> &HashMap<String, String>;

fn put_user_property(&mut self, name: impl Into<String>, value: impl Into<String>);

fn get_delay_time_level(&self) -> i32;
fn delay_time_level(&self) -> i32;

fn set_delay_time_level(&self, level: i32) -> i32;
fn with_delay_time_level(&self, level: i32) -> i32;
}

pub const MESSAGE_MAGIC_CODE_V1: i32 = -626843481;
pub const MESSAGE_MAGIC_CODE_V2: i32 = -626843477;

#[derive(Debug, PartialEq, Eq, Hash)]
enum MessageVersion {
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub enum MessageVersion {
V1(i32),
V2(i32),
}

impl Default for MessageVersion {
fn default() -> Self {
Self::V1(MESSAGE_MAGIC_CODE_V1)
}
}

impl MessageVersion {
fn value_of_magic_code(magic_code: i32) -> Result<MessageVersion, &'static str> {
match magic_code {
Expand Down
19 changes: 10 additions & 9 deletions rocketmq-common/src/common/message/message_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,41 @@ pub struct MessageBatch {
pub messages: Vec<Message>,
}

#[allow(unused_variables)]
impl MessageTrait for MessageBatch {
fn get_topic(&self) -> &str {
fn topic(&self) -> &str {
todo!()
}

fn set_topic(&mut self, _topic: impl Into<String>) {
fn with_topic(&mut self, topic: impl Into<String>) {
todo!()
}

fn get_tags(&self) -> Option<&str> {
fn tags(&self) -> Option<&str> {
todo!()
}

fn set_tags(&mut self, _tags: impl Into<String>) {
fn with_tags(&mut self, tags: impl Into<String>) {
todo!()
}

fn put_property(&mut self, _key: impl Into<String>, _value: impl Into<String>) {
fn put_property(&mut self, key: impl Into<String>, value: impl Into<String>) {
todo!()
}

fn get_properties(&self) -> &HashMap<String, String> {
fn properties(&self) -> &HashMap<String, String> {
todo!()
}

fn put_user_property(&mut self, _name: impl Into<String>, _value: impl Into<String>) {
fn put_user_property(&mut self, name: impl Into<String>, value: impl Into<String>) {
todo!()
}

fn get_delay_time_level(&self) -> i32 {
fn delay_time_level(&self) -> i32 {
todo!()
}

fn set_delay_time_level(&self, _level: i32) -> i32 {
fn with_delay_time_level(&self, level: i32) -> i32 {
todo!()
}
}
Expand Down
91 changes: 80 additions & 11 deletions rocketmq-common/src/common/message/message_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@

use std::{collections::HashMap, net::SocketAddr};

use crate::common::message::{MessageTrait, MessageVersion, MESSAGE_MAGIC_CODE_V1};
use crate::{
common::{
message::{MessageTrait, MessageVersion, MESSAGE_MAGIC_CODE_V1},
sys_flag::message_sys_flag::MessageSysFlag,
},
MessageUtils,
};

#[derive(Clone, Debug, Default)]
pub struct Message {
Expand All @@ -28,40 +34,47 @@ pub struct Message {
pub transaction_id: Option<String>,
}

impl Message {
pub fn clear_property(&mut self, name: impl Into<String>) {
self.properties.remove(name.into().as_str());
}
}

#[allow(unused_variables)]
impl MessageTrait for Message {
fn get_topic(&self) -> &str {
fn topic(&self) -> &str {
todo!()
}

fn set_topic(&mut self, _topic: impl Into<String>) {
fn with_topic(&mut self, topic: impl Into<String>) {
todo!()
}

fn get_tags(&self) -> Option<&str> {
fn tags(&self) -> Option<&str> {
todo!()
}

fn set_tags(&mut self, _tags: impl Into<String>) {
fn with_tags(&mut self, tags: impl Into<String>) {
todo!()
}

fn put_property(&mut self, _key: impl Into<String>, _value: impl Into<String>) {
fn put_property(&mut self, key: impl Into<String>, value: impl Into<String>) {
todo!()
}

fn get_properties(&self) -> &HashMap<String, String> {
fn properties(&self) -> &HashMap<String, String> {
todo!()
}

fn put_user_property(&mut self, _name: impl Into<String>, _value: impl Into<String>) {
fn put_user_property(&mut self, name: impl Into<String>, value: impl Into<String>) {
todo!()
}

fn get_delay_time_level(&self) -> i32 {
fn delay_time_level(&self) -> i32 {
todo!()
}

fn set_delay_time_level(&self, _level: i32) -> i32 {
fn with_delay_time_level(&self, level: i32) -> i32 {
todo!()
}
}
Expand All @@ -80,11 +93,33 @@ pub struct MessageExt {
pub store_host: SocketAddr,
pub msg_id: String,
pub commit_log_offset: i64,
pub body_crc: i32,
pub body_crc: u32,
pub reconsume_times: i32,
pub prepared_transaction_offset: i64,
}

impl MessageExt {
pub fn topic(&self) -> &str {
self.message_inner.topic()
}

pub fn born_host(&self) -> SocketAddr {
self.born_host
}

pub fn store_host(&self) -> SocketAddr {
self.store_host
}

pub fn with_born_host_v6_flag(&mut self) {
self.sys_flag |= MessageSysFlag::BORNHOST_V6_FLAG;
}

pub fn with_store_host_v6_flag(&mut self) {
self.sys_flag |= MessageSysFlag::STOREHOSTADDRESS_V6_FLAG;
}
}

impl Default for MessageExt {
fn default() -> Self {
Self {
Expand Down Expand Up @@ -119,8 +154,42 @@ pub struct MessageExtBrokerInner {
pub tags_code: i64,
pub encoded_buff: bytes::Bytes,
pub encode_completed: bool,
pub version: MessageVersion,
}

impl MessageExtBrokerInner {
const VERSION: MessageVersion = MessageVersion::V1(MESSAGE_MAGIC_CODE_V1);

pub fn delete_property(&mut self, name: impl Into<String>) {
let name = name.into();
self.message_ext_inner
.message_inner
.clear_property(name.as_str());
self.properties_string =
MessageUtils::delete_property(self.properties_string.as_str(), name.as_str());
}

pub fn with_version(&mut self, version: MessageVersion) {
self.version = version;
}

pub fn topic(&self) -> &str {
self.message_ext_inner.topic()
}

pub fn born_host(&self) -> SocketAddr {
self.message_ext_inner.born_host()
}

pub fn store_host(&self) -> SocketAddr {
self.message_ext_inner.store_host()
}

pub fn with_born_host_v6_flag(&mut self) {
self.message_ext_inner.with_born_host_v6_flag()
}

pub fn with_store_host_v6_flag(&mut self) {
self.message_ext_inner.with_store_host_v6_flag()
}
}
3 changes: 2 additions & 1 deletion rocketmq-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ pub use crate::{
},
utils::{
crc32_utils as CRC32Utils, env_utils as EnvUtils, file_utils as FileUtils,
parse_config_file as ParseConfigFile, time_utils as TimeUtils,
message_utils as MessageUtils, parse_config_file as ParseConfigFile,
time_utils as TimeUtils,
},
};

Expand Down
1 change: 1 addition & 0 deletions rocketmq-common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
pub mod crc32_utils;
pub mod env_utils;
pub mod file_utils;
pub mod message_utils;
pub mod parse_config_file;
pub mod queue_type_utils;
pub mod time_utils;
83 changes: 83 additions & 0 deletions rocketmq-common/src/utils/message_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::{
collections::{hash_map::DefaultHasher, HashMap, HashSet},
hash::{Hash, Hasher},
};

use crate::{
common::message::{message_single::MessageExt, MessageConst},
MessageDecoder::{NAME_VALUE_SEPARATOR, PROPERTY_SEPARATOR},
};

pub fn get_sharding_key_index(sharding_key: &str, index_size: usize) -> usize {
let mut hasher = DefaultHasher::new();
sharding_key.hash(&mut hasher);
let hash = hasher.finish() as usize;
hash % index_size
}

pub fn get_sharding_key_index_by_msg(msg: &MessageExt, index_size: usize) -> usize {
let sharding_key = match msg
.message_inner
.properties
.get(MessageConst::PROPERTY_SHARDING_KEY)
{
Some(key) => key,
None => "",
};
get_sharding_key_index(sharding_key, index_size)
}

pub fn get_sharding_key_indexes(msgs: &[MessageExt], index_size: usize) -> HashSet<usize> {
let mut index_set = HashSet::with_capacity(index_size);
for msg in msgs {
let idx = get_sharding_key_index_by_msg(msg, index_size);
index_set.insert(idx);
}
index_set
}

//need refactor
pub fn delete_property(properties_string: &str, name: &str) -> String {
if properties_string.is_empty() {
return properties_string.to_owned();
}
let index1 = properties_string.find(name);
if index1.is_none() {
return properties_string.to_owned();
}
properties_string
.split(PROPERTY_SEPARATOR)
.map(|s| s.to_owned())
.filter(|s| s.starts_with(name))
.collect::<Vec<String>>()
.join(PROPERTY_SEPARATOR.to_string().as_str())
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_delete_property() {
assert_eq!(
delete_property("aa\u{0001}bb\u{0002}cc\u{0001}bb\u{0002}", "a"),
"aa\u{0001}bb"
);
}
}
Loading

0 comments on commit e20b9aa

Please sign in to comment.