From 069923147b0c9fb204d9cc01f64c5229243c4d2b Mon Sep 17 00:00:00 2001 From: TeslaRustor <77013810+TeslaRustor@users.noreply.github.com> Date: Wed, 26 Jun 2024 08:00:00 +0000 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#696]=F0=9F=9A=80Support=20pull=20mess?= =?UTF-8?q?age=20consume-5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../header/pull_message_request_header.rs | 6 ++ rocketmq-store/src/base/select_result.rs | 11 +++ .../src/consume_queue/consume_queue_ext.rs | 16 +++ .../src/log_file/mapped_file/default_impl.rs | 6 +- .../message_store/default_message_store.rs | 31 +++++- rocketmq-store/src/queue.rs | 17 +++- .../src/queue/batch_consume_queue.rs | 2 +- rocketmq-store/src/queue/consume_queue_ext.rs | 4 + .../src/queue/single_consume_queue.rs | 98 ++++++++++++++++++- 9 files changed, 181 insertions(+), 10 deletions(-) diff --git a/rocketmq-remoting/src/protocol/header/pull_message_request_header.rs b/rocketmq-remoting/src/protocol/header/pull_message_request_header.rs index 011b46d7..78fd7360 100644 --- a/rocketmq-remoting/src/protocol/header/pull_message_request_header.rs +++ b/rocketmq-remoting/src/protocol/header/pull_message_request_header.rs @@ -25,6 +25,7 @@ use crate::protocol::command_custom_header::CommandCustomHeader; use crate::protocol::command_custom_header::FromMap; use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait; use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader; +use crate::rpc::rpc_request_header::RpcRequestHeader; #[derive(Debug, Clone, Deserialize, Serialize, Default)] #[serde(rename_all = "camelCase")] @@ -239,6 +240,11 @@ impl CommandCustomHeader for PullMessageRequestHeader { self.proxy_forward_client_id = Some(str.clone()); } + self.topic_request = Some(TopicRequestHeader { + rpc: Some(RpcRequestHeader::default()), + ..TopicRequestHeader::default() + }); + if let Some(str) = fields.get("lo") { self.topic_request.as_mut().unwrap().lo = Some(str.parse::().unwrap()); } diff --git a/rocketmq-store/src/base/select_result.rs b/rocketmq-store/src/base/select_result.rs index 2279fb7a..23ae373c 100644 --- a/rocketmq-store/src/base/select_result.rs +++ b/rocketmq-store/src/base/select_result.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use crate::log_file::mapped_file::default_impl::DefaultMappedFile; +use crate::log_file::mapped_file::MappedFile; /// Represents the result of selecting a mapped buffer. pub struct SelectMappedBufferResult { @@ -38,4 +39,14 @@ impl SelectMappedBufferResult { [self.start_offset as usize..(self.start_offset + self.size as u64) as usize] .as_ref() } + + pub fn is_in_mem(&self) -> bool { + match self.mapped_file.as_ref() { + None => true, + Some(inner) => { + let pos = self.start_offset - inner.get_file_from_offset(); + inner.is_loaded(pos as i64, self.size as usize) + } + } + } } diff --git a/rocketmq-store/src/consume_queue/consume_queue_ext.rs b/rocketmq-store/src/consume_queue/consume_queue_ext.rs index 3deba705..81631695 100644 --- a/rocketmq-store/src/consume_queue/consume_queue_ext.rs +++ b/rocketmq-store/src/consume_queue/consume_queue_ext.rs @@ -45,4 +45,20 @@ impl CqExtUnit { filter_bit_map, } } + + pub fn size(&self) -> i16 { + self.size + } + pub fn tags_code(&self) -> i64 { + self.tags_code + } + pub fn msg_store_time(&self) -> i64 { + self.msg_store_time + } + pub fn bit_map_size(&self) -> i16 { + self.bit_map_size + } + pub fn filter_bit_map(&self) -> &Option> { + &self.filter_bit_map + } } diff --git a/rocketmq-store/src/log_file/mapped_file/default_impl.rs b/rocketmq-store/src/log_file/mapped_file/default_impl.rs index 419f02a4..7a6724ca 100644 --- a/rocketmq-store/src/log_file/mapped_file/default_impl.rs +++ b/rocketmq-store/src/log_file/mapped_file/default_impl.rs @@ -366,7 +366,7 @@ impl MappedFile for DefaultMappedFile { start_offset: self.file_from_offset + pos as u64, size, mapped_file: Some(self), - is_in_cache: false, + is_in_cache: true, }) } else { None @@ -388,7 +388,7 @@ impl MappedFile for DefaultMappedFile { start_offset: self.get_file_from_offset() + pos as u64, size: read_position - pos, mapped_file: Some(self), - is_in_cache: false, + is_in_cache: true, }) } else { None @@ -588,7 +588,7 @@ impl MappedFile for DefaultMappedFile { }*/ fn is_loaded(&self, position: i64, size: usize) -> bool { - todo!() + true } } diff --git a/rocketmq-store/src/message_store/default_message_store.rs b/rocketmq-store/src/message_store/default_message_store.rs index cfefbe6c..434c0a4d 100644 --- a/rocketmq-store/src/message_store/default_message_store.rs +++ b/rocketmq-store/src/message_store/default_message_store.rs @@ -437,6 +437,14 @@ impl DefaultMessageStore { } next_offset } + + fn check_in_mem_by_commit_offset(&self, offset_py: i64, size: i32) -> bool { + let message = self.commit_log.get_message(offset_py, size); + match message { + None => false, + Some(msg) => msg.is_in_mem(), + } + } } fn estimate_in_mem_by_commit_offset( @@ -926,7 +934,28 @@ impl MessageStore for DefaultMessageStore { consume_offset: i64, batch_size: i32, ) -> bool { - todo!() + let consume_queue = self + .consume_queue_store + .find_or_create_consume_queue(topic, queue_id); + let first_cqitem = consume_queue.lock().get(consume_offset); + if first_cqitem.is_none() { + return false; + } + let cq = first_cqitem.as_ref().unwrap(); + let start_offset_py = cq.pos; + if batch_size <= 1 { + let size = cq.size; + return self.check_in_mem_by_commit_offset(start_offset_py, size); + } + let last_cqitem = consume_queue.lock().get(consume_offset + batch_size as i64); + if last_cqitem.is_none() { + let size = cq.size; + return self.check_in_mem_by_commit_offset(start_offset_py, size); + } + let last_cqitem = last_cqitem.as_ref().unwrap(); + let end_offset_py = last_cqitem.pos; + let size = (end_offset_py - start_offset_py) + last_cqitem.size as i64; + self.check_in_mem_by_commit_offset(start_offset_py, size as i32) } } diff --git a/rocketmq-store/src/queue.rs b/rocketmq-store/src/queue.rs index 3bb50400..2ddf91ef 100644 --- a/rocketmq-store/src/queue.rs +++ b/rocketmq-store/src/queue.rs @@ -90,6 +90,21 @@ pub struct CqUnit { pub compacted_offset: i32, } +impl Default for CqUnit { + fn default() -> Self { + CqUnit { + queue_offset: 0, + size: 0, + pos: 0, + batch_num: 1, + tags_code: 0, + cq_ext_unit: None, + native_buffer: vec![], + compacted_offset: 0, + } + } +} + impl CqUnit { pub fn get_valid_tags_code_as_long(&self) -> Option { if !self.is_tags_code_valid() { @@ -291,7 +306,7 @@ pub trait ConsumeQueueTrait: Send + Sync + FileQueueLifeCycle { ) -> Result>, RocksDBException>;*/ /// Get cq unit at specified index. - fn get(&self, index: i64) -> CqUnit; + fn get(&self, index: i64) -> Option; fn get_cq_unit_and_store_time(&self, index: i64) -> Option<(CqUnit, i64)>; diff --git a/rocketmq-store/src/queue/batch_consume_queue.rs b/rocketmq-store/src/queue/batch_consume_queue.rs index aba66597..2a811e03 100644 --- a/rocketmq-store/src/queue/batch_consume_queue.rs +++ b/rocketmq-store/src/queue/batch_consume_queue.rs @@ -205,7 +205,7 @@ impl ConsumeQueueTrait for BatchConsumeQueue { todo!() } - fn get(&self, index: i64) -> CqUnit { + fn get(&self, index: i64) -> Option { todo!() } diff --git a/rocketmq-store/src/queue/consume_queue_ext.rs b/rocketmq-store/src/queue/consume_queue_ext.rs index f60a28c4..423c2542 100644 --- a/rocketmq-store/src/queue/consume_queue_ext.rs +++ b/rocketmq-store/src/queue/consume_queue_ext.rs @@ -92,4 +92,8 @@ impl ConsumeQueueExt { pub fn destroy(&mut self) { self.mapped_file_queue.destroy(); } + + pub fn get(&self, address: i64, cq_ext_unit: &CqExtUnit) -> bool { + unimplemented!() + } } diff --git a/rocketmq-store/src/queue/single_consume_queue.rs b/rocketmq-store/src/queue/single_consume_queue.rs index 2ea350b5..b36272f6 100644 --- a/rocketmq-store/src/queue/single_consume_queue.rs +++ b/rocketmq-store/src/queue/single_consume_queue.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use bytes::Buf; use bytes::BufMut; +use bytes::Bytes; use bytes::BytesMut; use rocketmq_common::common::attribute::cq_type::CQType; use rocketmq_common::common::boundary_type::BoundaryType; @@ -31,6 +32,7 @@ use tracing::info; use tracing::warn; use crate::base::dispatch_request::DispatchRequest; +use crate::base::select_result::SelectMappedBufferResult; use crate::base::store_checkpoint::StoreCheckpoint; use crate::base::swappable::Swappable; use crate::config::broker_role::BrokerRole; @@ -315,6 +317,20 @@ impl ConsumeQueue { mapped_file.append_message_bytes(&bytes); } } + + pub fn get_index_buffer(&self, start_index: i64) -> Option { + let mapped_file_size = self.mapped_file_size; + let offset = start_index * CQ_STORE_UNIT_SIZE as i64; + if offset >= self.get_min_logic_offset() { + if let Some(mapped_file) = self + .mapped_file_queue + .find_mapped_file_by_offset(offset, false) + { + return mapped_file.select_mapped_buffer((offset % mapped_file_size as i64) as i32); + } + } + None + } } impl FileQueueLifeCycle for ConsumeQueue { @@ -485,8 +501,11 @@ impl ConsumeQueueTrait for ConsumeQueue { self.queue_id } - fn get(&self, index: i64) -> CqUnit { - todo!() + fn get(&self, index: i64) -> Option { + match self.iterate_from(index) { + None => None, + Some(value) => None, + } } fn get_cq_unit_and_store_time(&self, index: i64) -> Option<(CqUnit, i64)> { @@ -538,7 +557,7 @@ impl ConsumeQueueTrait for ConsumeQueue { } fn get_min_logic_offset(&self) -> i64 { - todo!() + self.min_logic_offset.load(Ordering::Relaxed) } fn get_cq_type(&self) -> CQType { @@ -776,7 +795,15 @@ impl ConsumeQueueTrait for ConsumeQueue { } fn iterate_from(&self, start_index: i64) -> Option>> { - todo!() + match self.get_index_buffer(start_index) { + None => None, + Some(value) => Some(Box::new(ConsumeQueueIterator { + smbr: Some(value), + relative_pos: 0, + counter: 0, + consume_queue_ext: self.consume_queue_ext.clone(), + })), + } } fn iterate_from_inner( @@ -787,3 +814,66 @@ impl ConsumeQueueTrait for ConsumeQueue { todo!() } } + +struct ConsumeQueueIterator { + smbr: Option, + relative_pos: i32, + counter: i32, + consume_queue_ext: Option, +} + +impl ConsumeQueueIterator { + fn get_ext(&self, offset: i64, cq_ext_unit: &CqExtUnit) -> bool { + match self.consume_queue_ext.as_ref() { + None => false, + Some(value) => value.get(offset, cq_ext_unit), + } + } +} + +impl Iterator for ConsumeQueueIterator { + type Item = CqUnit; + + fn next(&mut self) -> Option { + match self.smbr.as_ref() { + None => None, + Some(value) => { + if self.counter * CQ_STORE_UNIT_SIZE >= value.size { + return None; + } + let mmp = value.mapped_file.as_ref().unwrap().get_mapped_file(); + let start = + value.start_offset as usize + (self.counter * CQ_STORE_UNIT_SIZE) as usize; + self.counter += 1; + let end = start + CQ_STORE_UNIT_SIZE as usize; + let mut bytes = Bytes::copy_from_slice(&mmp[start..end]); + let pos = bytes.get_i64(); + let size = bytes.get_i32(); + let tags_code = bytes.get_i64(); + let mut cq_unit = CqUnit { + queue_offset: start as i64 / CQ_STORE_UNIT_SIZE as i64, + size, + pos, + tags_code, + ..CqUnit::default() + }; + + if ConsumeQueueExt::is_ext_addr(cq_unit.tags_code) { + let cq_ext_unit = CqExtUnit::default(); + let ext_ret = self.get_ext(cq_unit.tags_code, &cq_ext_unit); + if ext_ret { + cq_unit.tags_code = cq_ext_unit.tags_code(); + cq_unit.cq_ext_unit = Some(cq_ext_unit); + } else { + error!( + "[BUG] can't find consume queue extend file content! addr={}, \ + offsetPy={}, sizePy={}", + cq_unit.tags_code, cq_unit.pos, cq_unit.pos, + ); + } + } + Some(cq_unit) + } + } + } +}