From 7e940b806f3c33aab04b75e314aca264ae9e2b60 Mon Sep 17 00:00:00 2001 From: mxsm Date: Tue, 14 May 2024 23:37:28 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#355]=F0=9F=9A=80Implement=20ConsumeQu?= =?UTF-8?q?eueStore=20recover=20(#356)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rocketmq-common/src/utils/util_all.rs | 19 +- rocketmq-store/src/base/store_checkpoint.rs | 148 +++++++++++++- rocketmq-store/src/config/broker_role.rs | 3 +- .../src/consume_queue/consume_queue_ext.rs | 32 +++- .../src/consume_queue/mapped_file_queue.rs | 2 +- rocketmq-store/src/filter.rs | 4 +- rocketmq-store/src/log_file/commit_log.rs | 10 +- .../src/log_file/mapped_file/default_impl.rs | 2 +- .../message_store/default_message_store.rs | 33 ++-- rocketmq-store/src/queue.rs | 35 +++- .../src/queue/batch_consume_queue.rs | 10 +- .../src/queue/build_consume_queue.rs | 14 +- rocketmq-store/src/queue/consume_queue_ext.rs | 6 +- .../queue/local_file_consume_queue_store.rs | 68 ++++++- .../src/queue/queue_offset_operator.rs | 4 +- .../src/queue/single_consume_queue.rs | 181 +++++++++++++++++- rocketmq-store/src/store/running_flags.rs | 6 + 17 files changed, 512 insertions(+), 65 deletions(-) diff --git a/rocketmq-common/src/utils/util_all.rs b/rocketmq-common/src/utils/util_all.rs index 2495f2ce..9da36912 100644 --- a/rocketmq-common/src/utils/util_all.rs +++ b/rocketmq-common/src/utils/util_all.rs @@ -15,7 +15,9 @@ * limitations under the License. */ -use chrono::{Datelike, TimeZone, Timelike, Utc}; +use std::path::PathBuf; + +use chrono::{DateTime, Datelike, TimeZone, Timelike, Utc}; const HEX_ARRAY: [char; 16] = [ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F', @@ -84,10 +86,25 @@ pub fn time_millis_to_human_string3(t: i64) -> String { ) } +pub fn time_millis_to_human_string(t: i64) -> String { + let dt = DateTime::::from_timestamp_millis(t); + dt.as_ref().unwrap().format("%Y%m%d%H%M%S%3f").to_string() +} + pub fn offset_to_file_name(offset: u64) -> String { format!("{:020}", offset) } +/*pub fn ensure_dir_ok(dir: impl AsRef) -> Result<(), std::io::Error> { + match dir.as_ref().exists() { + true => Ok(()), + false => Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("{:?}", dir.as_ref()), + )), + } +}*/ + #[cfg(test)] mod tests { use super::*; diff --git a/rocketmq-store/src/base/store_checkpoint.rs b/rocketmq-store/src/base/store_checkpoint.rs index fe487b7d..eed2dee5 100644 --- a/rocketmq-store/src/base/store_checkpoint.rs +++ b/rocketmq-store/src/base/store_checkpoint.rs @@ -15,21 +15,151 @@ * limitations under the License. */ -#[derive(Default, Clone)] -pub struct StoreCheckpoint {} +use std::{ + fs::{File, OpenOptions}, + io::Write, + path::Path, + sync::atomic::{AtomicU64, Ordering}, +}; + +use memmap2::MmapMut; +use tracing::info; + +use crate::log_file::mapped_file::default_impl::OS_PAGE_SIZE; + +pub struct StoreCheckpoint { + file: File, + mmap: parking_lot::Mutex, + physic_msg_timestamp: AtomicU64, + logics_msg_timestamp: AtomicU64, + index_msg_timestamp: AtomicU64, + master_flushed_offset: AtomicU64, + confirm_phy_offset: AtomicU64, +} impl StoreCheckpoint { - pub fn new(_scp_path: String) -> Self { - Self {} + pub fn new>(path: P) -> std::io::Result { + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(false) + .open(path.as_ref())?; + + let mmap = unsafe { MmapMut::map_mut(&file)? }; + let _ = file.set_len(OS_PAGE_SIZE); + if file.metadata()?.len() > 0 { + let buffer = &mmap[..8]; + let physic_msg_timestamp = u64::from_ne_bytes(buffer.try_into().unwrap()); + let logics_msg_timestamp = u64::from_ne_bytes(mmap[8..16].try_into().unwrap()); + let index_msg_timestamp = u64::from_ne_bytes(mmap[16..24].try_into().unwrap()); + let master_flushed_offset = u64::from_ne_bytes(mmap[24..32].try_into().unwrap()); + let confirm_phy_offset = u64::from_ne_bytes(mmap[32..40].try_into().unwrap()); + + info!("store checkpoint file exists, {}", path.as_ref().display()); + info!("physicMsgTimestamp: {}", physic_msg_timestamp); + info!("logicsMsgTimestamp: {}", logics_msg_timestamp); + info!("indexMsgTimestamp: {}", index_msg_timestamp); + info!("masterFlushedOffset: {}", master_flushed_offset); + info!("confirmPhyOffset: {}", confirm_phy_offset); + + Ok(Self { + file, + mmap: parking_lot::Mutex::new(mmap), + physic_msg_timestamp: AtomicU64::new(physic_msg_timestamp), + logics_msg_timestamp: AtomicU64::new(logics_msg_timestamp), + index_msg_timestamp: AtomicU64::new(index_msg_timestamp), + master_flushed_offset: AtomicU64::new(master_flushed_offset), + confirm_phy_offset: AtomicU64::new(confirm_phy_offset), + }) + } else { + //info!("store checkpoint file not exists, {}", path.as_ref()); + Ok(Self { + file, + mmap: parking_lot::Mutex::new(mmap), + physic_msg_timestamp: AtomicU64::new(0), + logics_msg_timestamp: AtomicU64::new(0), + index_msg_timestamp: AtomicU64::new(0), + master_flushed_offset: AtomicU64::new(0), + confirm_phy_offset: AtomicU64::new(0), + }) + } + } + + fn flush(&self) -> std::io::Result<()> { + let mut buffer = &mut self.mmap.lock()[..8]; + buffer.write_all( + self.physic_msg_timestamp + .load(Ordering::Relaxed) + .to_ne_bytes() + .as_ref(), + )?; + buffer.write_all( + self.logics_msg_timestamp + .load(Ordering::Relaxed) + .to_ne_bytes() + .as_ref(), + )?; + buffer.write_all( + self.index_msg_timestamp + .load(Ordering::Relaxed) + .to_ne_bytes() + .as_ref(), + )?; + buffer.write_all( + self.master_flushed_offset + .load(Ordering::Relaxed) + .to_ne_bytes() + .as_ref(), + )?; + buffer.write_all( + self.confirm_phy_offset + .load(Ordering::Relaxed) + .to_ne_bytes() + .as_ref(), + )?; + self.mmap.lock().flush()?; + Ok(()) } - pub fn get_master_flushed_offset(&self) -> i64 { - -1 + fn shutdown(&self) -> std::io::Result<()> { + self.flush() } - pub fn get_confirm_phy_offset(&self) -> i64 { - -1 + pub fn set_physic_msg_timestamp(&self, physic_msg_timestamp: u64) { + self.physic_msg_timestamp + .store(physic_msg_timestamp, Ordering::Relaxed); + } + pub fn set_logics_msg_timestamp(&self, logics_msg_timestamp: u64) { + self.logics_msg_timestamp + .store(logics_msg_timestamp, Ordering::Relaxed); + } + pub fn set_index_msg_timestamp(&self, index_msg_timestamp: u64) { + self.index_msg_timestamp + .store(index_msg_timestamp, Ordering::Relaxed); + } + pub fn set_master_flushed_offset(&self, master_flushed_offset: u64) { + self.master_flushed_offset + .store(master_flushed_offset, Ordering::Relaxed); + } + pub fn set_confirm_phy_offset(&self, confirm_phy_offset: u64) { + self.confirm_phy_offset + .store(confirm_phy_offset, Ordering::Relaxed); } - pub fn set_confirm_phy_offset(&mut self, _confirm_phy_offset: i64) {} + pub fn physic_msg_timestamp(&self) -> u64 { + self.physic_msg_timestamp.load(Ordering::Relaxed) + } + pub fn logics_msg_timestamp(&self) -> u64 { + self.logics_msg_timestamp.load(Ordering::Relaxed) + } + pub fn index_msg_timestamp(&self) -> u64 { + self.index_msg_timestamp.load(Ordering::Relaxed) + } + pub fn master_flushed_offset(&self) -> u64 { + self.master_flushed_offset.load(Ordering::Relaxed) + } + pub fn confirm_phy_offset(&self) -> u64 { + self.confirm_phy_offset.load(Ordering::Relaxed) + } } diff --git a/rocketmq-store/src/config/broker_role.rs b/rocketmq-store/src/config/broker_role.rs index 38fca5d6..7b568efe 100644 --- a/rocketmq-store/src/config/broker_role.rs +++ b/rocketmq-store/src/config/broker_role.rs @@ -18,8 +18,7 @@ use std::fmt; use serde::{Deserialize, Deserializer}; -#[allow(dead_code)] -#[derive(Debug, Copy, Clone, Default)] +#[derive(Debug, Copy, Clone, Default, Eq, PartialEq)] pub enum BrokerRole { #[default] AsyncMaster, diff --git a/rocketmq-store/src/consume_queue/consume_queue_ext.rs b/rocketmq-store/src/consume_queue/consume_queue_ext.rs index 03612868..3deba705 100644 --- a/rocketmq-store/src/consume_queue/consume_queue_ext.rs +++ b/rocketmq-store/src/consume_queue/consume_queue_ext.rs @@ -15,4 +15,34 @@ * limitations under the License. */ -pub struct ConsumeQueueExtCqExtUnit {} +const MIN_EXT_UNIT_SIZE: i16 = 2 // size, 32k max + + 8 * 2 // msg time + tagCode + + 2; // bitMapSize +const MAX_EXT_UNIT_SIZE: i16 = i16::MAX; + +#[derive(Clone, Default)] +pub struct CqExtUnit { + size: i16, + tags_code: i64, + msg_store_time: i64, + bit_map_size: i16, + filter_bit_map: Option>, +} + +impl CqExtUnit { + pub fn new(tags_code: i64, msg_store_time: i64, filter_bit_map: Option>) -> Self { + let bit_map_size = if let Some(val) = filter_bit_map.as_ref() { + val.len() as i16 + } else { + 0 + }; + let size = MIN_EXT_UNIT_SIZE + bit_map_size; + Self { + size, + tags_code, + msg_store_time, + bit_map_size, + filter_bit_map, + } + } +} diff --git a/rocketmq-store/src/consume_queue/mapped_file_queue.rs b/rocketmq-store/src/consume_queue/mapped_file_queue.rs index 216fce54..e48f1046 100644 --- a/rocketmq-store/src/consume_queue/mapped_file_queue.rs +++ b/rocketmq-store/src/consume_queue/mapped_file_queue.rs @@ -161,7 +161,7 @@ impl MappedFileQueue { self.mapped_files.last().cloned() } - pub async fn get_last_mapped_file_mut_start_offset( + pub fn get_last_mapped_file_mut_start_offset( &mut self, start_offset: u64, need_create: bool, diff --git a/rocketmq-store/src/filter.rs b/rocketmq-store/src/filter.rs index 17f685f4..d5741cd6 100644 --- a/rocketmq-store/src/filter.rs +++ b/rocketmq-store/src/filter.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; -use crate::consume_queue::consume_queue_ext::ConsumeQueueExtCqExtUnit; +use crate::consume_queue::consume_queue_ext::CqExtUnit; /// Represents a message filter. pub trait MessageFilter { @@ -26,7 +26,7 @@ pub trait MessageFilter { fn is_matched_by_consume_queue( &self, tags_code: Option, - cq_ext_unit: Option<&ConsumeQueueExtCqExtUnit>, + cq_ext_unit: Option<&CqExtUnit>, ) -> bool; /// Matches by message content which is stored in the commit log. diff --git a/rocketmq-store/src/log_file/commit_log.rs b/rocketmq-store/src/log_file/commit_log.rs index 919b0086..e5467e58 100644 --- a/rocketmq-store/src/log_file/commit_log.rs +++ b/rocketmq-store/src/log_file/commit_log.rs @@ -86,7 +86,7 @@ pub struct CommitLog { //local_file_message_store: Option>>, dispatcher: CommitLogDispatcherDefault, confirm_offset: i64, - store_checkpoint: StoreCheckpoint, + store_checkpoint: Arc, append_message_callback: Arc, } @@ -95,7 +95,7 @@ impl CommitLog { message_store_config: Arc, broker_config: Arc, dispatcher: &CommitLogDispatcherDefault, - store_checkpoint: StoreCheckpoint, + store_checkpoint: Arc, topic_config_table: Arc>>, ) -> Self { let enabled_append_prop_crc = message_store_config.enabled_append_prop_crc; @@ -139,7 +139,8 @@ impl CommitLog { pub fn set_confirm_offset(&mut self, phy_offset: i64) { self.confirm_offset = phy_offset; - self.store_checkpoint.set_confirm_phy_offset(phy_offset); + self.store_checkpoint + .set_confirm_phy_offset(phy_offset as u64); } pub async fn put_message(&mut self, msg: MessageExtBrokerInner) -> PutMessageResult { @@ -193,7 +194,6 @@ impl CommitLog { None => self .mapped_file_queue .get_last_mapped_file_mut_start_offset(0, true) - .await .unwrap(), Some(mapped_file) => mapped_file, }; @@ -336,7 +336,7 @@ impl CommitLog { } else { warn!( "The commitlog files are deleted, and delete the consume queue - files" + files" ); self.mapped_file_queue.set_flushed_where(0); self.mapped_file_queue.set_committed_where(0); diff --git a/rocketmq-store/src/log_file/mapped_file/default_impl.rs b/rocketmq-store/src/log_file/mapped_file/default_impl.rs index d0a42db7..903a98b0 100644 --- a/rocketmq-store/src/log_file/mapped_file/default_impl.rs +++ b/rocketmq-store/src/log_file/mapped_file/default_impl.rs @@ -41,7 +41,7 @@ use crate::{ log_file::mapped_file::MappedFile, }; -const OS_PAGE_SIZE: u64 = 1024 * 4; +pub const OS_PAGE_SIZE: u64 = 1024 * 4; static TOTAL_MAPPED_VIRTUAL_MEMORY: AtomicI64 = AtomicI64::new(0); static TOTAL_MAPPED_FILES: AtomicI32 = AtomicI32::new(0); diff --git a/rocketmq-store/src/message_store/default_message_store.rs b/rocketmq-store/src/message_store/default_message_store.rs index 65ed4daf..d04fb9e3 100644 --- a/rocketmq-store/src/message_store/default_message_store.rs +++ b/rocketmq-store/src/message_store/default_message_store.rs @@ -71,7 +71,7 @@ pub struct DefaultMessageStore { //message_store_runtime: Option, commit_log: CommitLog, compaction_service: CompactionService, - store_checkpoint: Option, + store_checkpoint: Option>, master_flushed_offset: Arc, index_service: IndexService, allocate_mapped_file_service: Arc, @@ -112,18 +112,29 @@ impl DefaultMessageStore { broker_config: Arc, ) -> Self { let index_service = IndexService {}; + let running_flags = Arc::new(RunningFlags::new()); + let store_checkpoint = Arc::new( + StoreCheckpoint::new(get_store_checkpoint( + message_store_config.store_path_root_dir.as_str(), + )) + .unwrap(), + ); let build_index = CommitLogDispatcherBuildIndex::new(index_service.clone(), message_store_config.clone()); let topic_config_table = Arc::new(parking_lot::Mutex::new(HashMap::new())); - let consume_queue_store = - ConsumeQueueStore::new(message_store_config.clone(), topic_config_table.clone()); + let consume_queue_store = ConsumeQueueStore::new( + message_store_config.clone(), + topic_config_table.clone(), + running_flags.clone(), + store_checkpoint.clone(), + ); let build_consume_queue = CommitLogDispatcherBuildConsumeQueue::new(consume_queue_store.clone()); let dispatcher = CommitLogDispatcherDefault { build_index, build_consume_queue, }; - let store_checkpoint = StoreCheckpoint {}; + let commit_log = CommitLog::new( message_store_config.clone(), broker_config.clone(), @@ -148,7 +159,7 @@ impl DefaultMessageStore { broker_init_max_offset: Arc::new(AtomicI64::new(-1)), state_machine_version: Arc::new(AtomicI64::new(0)), shutdown: Arc::new(AtomicBool::new(false)), - running_flags: Arc::new(RunningFlags::new()), + running_flags, } } } @@ -301,12 +312,12 @@ impl MessageStore for DefaultMessageStore { }, self.message_store_config.store_path_root_dir ); - //load Commit log + //load Commit log-- init commit mapped file queue let mut result = self.commit_log.load(); if !result { return result; } - // load Consume Queue + // load Consume Queue-- init Consume log mapped file queue result &= self.consume_queue_store.load(); if self.message_store_config.enable_compaction { @@ -317,13 +328,13 @@ impl MessageStore for DefaultMessageStore { } if result { - self.store_checkpoint = Some(StoreCheckpoint::new(get_store_checkpoint( + /*self.store_checkpoint = Some(StoreCheckpoint::new(get_store_checkpoint( self.message_store_config.store_path_root_dir.as_str(), - ))); + )));*/ let checkpoint = self.store_checkpoint.as_ref().unwrap(); self.master_flushed_offset = - Arc::new(AtomicI64::new(checkpoint.get_master_flushed_offset())); - self.set_confirm_offset(checkpoint.get_confirm_phy_offset()); + Arc::new(AtomicI64::new(checkpoint.master_flushed_offset() as i64)); + self.set_confirm_offset(checkpoint.confirm_phy_offset() as i64); result = self.index_service.load(last_exit_ok); self.recover(last_exit_ok).await; info!( diff --git a/rocketmq-store/src/queue.rs b/rocketmq-store/src/queue.rs index ece5141a..070ed2de 100644 --- a/rocketmq-store/src/queue.rs +++ b/rocketmq-store/src/queue.rs @@ -30,7 +30,7 @@ use rocketmq_common::common::{ use crate::{ base::{dispatch_request::DispatchRequest, swappable::Swappable}, - consume_queue::consume_queue_ext::ConsumeQueueExtCqExtUnit, + consume_queue::consume_queue_ext::CqExtUnit, filter::MessageFilter, queue::queue_offset_operator::QueueOffsetOperator, }; @@ -85,7 +85,7 @@ pub struct CqUnit { pub pos: i64, pub batch_num: i16, pub tags_code: i64, - pub cq_ext_unit: Option, + pub cq_ext_unit: Option, pub native_buffer: Vec, pub compacted_offset: i32, } @@ -168,12 +168,28 @@ pub trait ConsumeQueueStoreTrait: Send + Sync { /// /// `request`: Dispatch request. /// Throws RocksDBException only in rocksdb mode. - fn put_message_position_info_wrapper_single(&self, request: DispatchRequest); + fn put_message_position_info_wrapper(&self, request: &DispatchRequest); + + fn put_message_position_info_wrapper_with_cq( + &self, + consume_queue: &mut dyn ConsumeQueueTrait, + request: &DispatchRequest, + ); + + fn range_query( + &self, + topic: &str, + queue_id: i32, + start_index: i64, + num: i32, + ) -> Option>; + + fn get_signal(&self, topic: &str, queue_id: i32, start_index: i64) -> Option; /// Increase queue offset. /// `msg`: Message itself. /// `message_num`: Message number. - fn increase_queue_offset(&mut self, msg: MessageExtBrokerInner, message_num: i16); + fn increase_queue_offset(&mut self, msg: &MessageExtBrokerInner, message_num: i16); /// Increase lmq offset. /// `queue_key`: Queue key. @@ -258,12 +274,11 @@ pub trait ConsumeQueueTrait: Send + Sync + FileQueueLifeCycle { /// Get cq unit at specified index. fn get(&self, index: i64) -> CqUnit; + fn get_cq_unit_and_store_time(&self, index: i64) -> Option<(CqUnit, i64)>; + /// Get earliest cq unit. - /* fn get_earliest_unit_and_store_time( - &self, - index: i64, - ) -> Result, RocksDBException>; - */ + fn get_earliest_unit_and_store_time(&self) -> Option<(CqUnit, i64)>; + /// Get earliest cq unit. fn get_earliest_unit(&self) -> CqUnit; @@ -314,7 +329,7 @@ pub trait ConsumeQueueTrait: Send + Sync + FileQueueLifeCycle { fn correct_min_offset(&self, min_commit_log_offset: i64); /// Do dispatch. - fn put_message_position_info_wrapper(&self, request: DispatchRequest); + fn put_message_position_info_wrapper(&mut self, request: &DispatchRequest); /// Assign queue offset. /* fn assign_queue_offset( diff --git a/rocketmq-store/src/queue/batch_consume_queue.rs b/rocketmq-store/src/queue/batch_consume_queue.rs index d61117c5..5d748154 100644 --- a/rocketmq-store/src/queue/batch_consume_queue.rs +++ b/rocketmq-store/src/queue/batch_consume_queue.rs @@ -211,6 +211,14 @@ impl ConsumeQueueTrait for BatchConsumeQueue { todo!() } + fn get_cq_unit_and_store_time(&self, index: i64) -> Option<(CqUnit, i64)> { + todo!() + } + + fn get_earliest_unit_and_store_time(&self) -> Option<(CqUnit, i64)> { + todo!() + } + fn get_earliest_unit(&self) -> CqUnit { todo!() } @@ -271,7 +279,7 @@ impl ConsumeQueueTrait for BatchConsumeQueue { todo!() } - fn put_message_position_info_wrapper(&self, request: DispatchRequest) { + fn put_message_position_info_wrapper(&mut self, request: &DispatchRequest) { todo!() } diff --git a/rocketmq-store/src/queue/build_consume_queue.rs b/rocketmq-store/src/queue/build_consume_queue.rs index 8684a1db..01ebd38b 100644 --- a/rocketmq-store/src/queue/build_consume_queue.rs +++ b/rocketmq-store/src/queue/build_consume_queue.rs @@ -14,9 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use rocketmq_common::common::sys_flag::message_sys_flag::MessageSysFlag; + use crate::{ base::{commit_log_dispatcher::CommitLogDispatcher, dispatch_request::DispatchRequest}, - queue::local_file_consume_queue_store::ConsumeQueueStore, + queue::{local_file_consume_queue_store::ConsumeQueueStore, ConsumeQueueStoreTrait}, }; #[derive(Clone)] @@ -34,7 +36,13 @@ impl CommitLogDispatcherBuildConsumeQueue { impl CommitLogDispatcher for CommitLogDispatcherBuildConsumeQueue { fn dispatch(&mut self, dispatch_request: &DispatchRequest) { - self.consume_queue_store - .put_message_position_info_wrapper(dispatch_request); + let tran_type = MessageSysFlag::get_transaction_value(dispatch_request.sys_flag); + match tran_type { + MessageSysFlag::TRANSACTION_NOT_TYPE | MessageSysFlag::TRANSACTION_COMMIT_TYPE => { + self.consume_queue_store + .put_message_position_info_wrapper(dispatch_request); + } + _ => {} + } } } diff --git a/rocketmq-store/src/queue/consume_queue_ext.rs b/rocketmq-store/src/queue/consume_queue_ext.rs index 335f7453..6d81130c 100644 --- a/rocketmq-store/src/queue/consume_queue_ext.rs +++ b/rocketmq-store/src/queue/consume_queue_ext.rs @@ -18,7 +18,7 @@ use std::path::PathBuf; use tracing::info; -use crate::consume_queue::mapped_file_queue::MappedFileQueue; +use crate::consume_queue::{consume_queue_ext::CqExtUnit, mapped_file_queue::MappedFileQueue}; const END_BLANK_DATA_LENGTH: usize = 4; @@ -81,4 +81,8 @@ impl ConsumeQueueExt { } pub fn recover(&mut self) {} + + pub fn put(&self, cq_ext_unit: CqExtUnit) -> i64 { + unimplemented!() + } } diff --git a/rocketmq-store/src/queue/local_file_consume_queue_store.rs b/rocketmq-store/src/queue/local_file_consume_queue_store.rs index f0d6dd93..d7fdd458 100644 --- a/rocketmq-store/src/queue/local_file_consume_queue_store.rs +++ b/rocketmq-store/src/queue/local_file_consume_queue_store.rs @@ -18,6 +18,7 @@ use std::{collections::HashMap, fs, path::Path, sync::Arc}; +use bytes::Bytes; use rocketmq_common::{ common::{ attribute::cq_type::CQType, config::TopicConfig, @@ -27,18 +28,21 @@ use rocketmq_common::{ }; use crate::{ - base::dispatch_request::DispatchRequest, + base::{dispatch_request::DispatchRequest, store_checkpoint::StoreCheckpoint}, config::message_store_config::MessageStoreConfig, queue::{ batch_consume_queue::BatchConsumeQueue, queue_offset_operator::QueueOffsetOperator, single_consume_queue::ConsumeQueue, ConsumeQueueStoreTrait, ConsumeQueueTrait, CqUnit, }, + store::running_flags::RunningFlags, store_path_config_helper::{get_store_path_batch_consume_queue, get_store_path_consume_queue}, }; #[derive(Clone)] pub struct ConsumeQueueStore { inner: Arc, + running_flags: Arc, + store_checkpoint: Arc, topic_config_table: Arc>>, } @@ -53,11 +57,22 @@ struct ConsumeQueueStoreInner { pub(crate) consume_queue_table: ConsumeQueueTable, } +impl ConsumeQueueStoreInner { + fn put_message_position_info_wrapper( + &self, + consume_queue: &mut dyn ConsumeQueueTrait, + request: &DispatchRequest, + ) { + consume_queue.put_message_position_info_wrapper(request) + } +} + impl ConsumeQueueStore { pub fn new( message_store_config: Arc, topic_config_table: Arc>>, - //commit_log: Arc>, + running_flags: Arc, + store_checkpoint: Arc, ) -> Self { Self { inner: Arc::new(ConsumeQueueStoreInner { @@ -66,16 +81,18 @@ impl ConsumeQueueStore { queue_offset_operator: QueueOffsetOperator::new(), consume_queue_table: parking_lot::Mutex::new(HashMap::new()), }), + running_flags, + store_checkpoint, topic_config_table, } } - pub fn put_message_position_info_wrapper(&mut self, dispatch_request: &DispatchRequest) { + /* pub fn put_message_position_info_wrapper(&mut self, dispatch_request: &DispatchRequest) { println!( "put_message_position_info_wrapper-----{}", dispatch_request.topic ) - } + }*/ } #[allow(unused_variables)] @@ -103,10 +120,16 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore { } fn recover(&mut self) { - let mutex = &mut *self.inner.consume_queue_table.lock(); + let mut mutex = self.inner.consume_queue_table.lock().clone(); for (_topic, consume_queue_table) in mutex.iter_mut() { for (_queue_id, consume_queue) in consume_queue_table.iter_mut() { - consume_queue.lock().recover(); + //consume_queue.lock().recover(); + let guard = consume_queue.lock(); + let queue_id = guard.get_queue_id(); + let topic = guard.get_topic(); + drop(guard); + let file_queue_life_cycle = self.get_life_cycle(topic.as_str(), queue_id); + file_queue_life_cycle.lock().recover(); } } } @@ -174,11 +197,36 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore { } } - fn put_message_position_info_wrapper_single(&self, request: DispatchRequest) { + fn put_message_position_info_wrapper(&self, request: &DispatchRequest) { + let cq = self.find_or_create_consume_queue(request.topic.as_str(), request.queue_id); + self.put_message_position_info_wrapper_with_cq(cq.lock().as_mut(), request); + println!("put_message_position_info_wrapper-----{}", request.topic) + } + + fn put_message_position_info_wrapper_with_cq( + &self, + consume_queue: &mut dyn ConsumeQueueTrait, + request: &DispatchRequest, + ) { + self.inner + .put_message_position_info_wrapper(consume_queue, request); + } + + fn range_query( + &self, + topic: &str, + queue_id: i32, + start_index: i64, + num: i32, + ) -> Option> { + todo!() + } + + fn get_signal(&self, topic: &str, queue_id: i32, start_index: i64) -> Option { todo!() } - fn increase_queue_offset(&mut self, msg: MessageExtBrokerInner, message_num: i16) { + fn increase_queue_offset(&mut self, msg: &MessageExtBrokerInner, message_num: i16) { todo!() } @@ -258,6 +306,8 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore { .message_store_config .get_mapped_file_size_consume_queue(), self.inner.message_store_config.clone(), + self.running_flags.clone(), + self.store_checkpoint.clone(), )))), CQType::BatchCQ => { Arc::new(parking_lot::Mutex::new(Box::new(BatchConsumeQueue::new( @@ -393,6 +443,8 @@ impl ConsumeQueueStore { .message_store_config .get_mapped_file_size_consume_queue(), self.inner.message_store_config.clone(), + self.running_flags.clone(), + self.store_checkpoint.clone(), ); Box::new(consume_queue) } diff --git a/rocketmq-store/src/queue/queue_offset_operator.rs b/rocketmq-store/src/queue/queue_offset_operator.rs index b4a2a7bb..f5067a4a 100644 --- a/rocketmq-store/src/queue/queue_offset_operator.rs +++ b/rocketmq-store/src/queue/queue_offset_operator.rs @@ -17,6 +17,8 @@ use std::{collections::HashMap, sync::Arc}; +use tracing::info; + pub struct QueueOffsetOperator { topic_queue_table: Arc>>, batch_topic_queue_table: Arc>>, @@ -106,7 +108,7 @@ impl QueueOffsetOperator { let mut lmq_topic_queue_table = self.lmq_topic_queue_table.lock(); lmq_topic_queue_table.remove(&topic_queue_key); - println!( + info!( "removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queue_id ); diff --git a/rocketmq-store/src/queue/single_consume_queue.rs b/rocketmq-store/src/queue/single_consume_queue.rs index f532b9a9..b347f129 100644 --- a/rocketmq-store/src/queue/single_consume_queue.rs +++ b/rocketmq-store/src/queue/single_consume_queue.rs @@ -16,23 +16,26 @@ */ use std::{path::PathBuf, sync::Arc}; -use bytes::Buf; +use bytes::{Buf, BufMut, BytesMut}; use rocketmq_common::common::{ attribute::cq_type::CQType, boundary_type::BoundaryType, message::message_single::MessageExtBrokerInner, }; -use tracing::info; +use tracing::{error, info, warn}; use crate::{ - base::{dispatch_request::DispatchRequest, swappable::Swappable}, - config::message_store_config::MessageStoreConfig, - consume_queue::mapped_file_queue::MappedFileQueue, + base::{ + dispatch_request::DispatchRequest, store_checkpoint::StoreCheckpoint, swappable::Swappable, + }, + config::{broker_role::BrokerRole, message_store_config::MessageStoreConfig}, + consume_queue::{consume_queue_ext::CqExtUnit, mapped_file_queue::MappedFileQueue}, filter::MessageFilter, - log_file::mapped_file::MappedFile, + log_file::mapped_file::{default_impl::DefaultMappedFile, MappedFile}, queue::{ consume_queue_ext::ConsumeQueueExt, queue_offset_operator::QueueOffsetOperator, ConsumeQueueTrait, CqUnit, FileQueueLifeCycle, }, + store::running_flags::RunningFlags, store_path_config_helper::get_store_path_consume_queue_ext, }; @@ -63,6 +66,8 @@ pub struct ConsumeQueue { max_physic_offset: Arc>, min_logic_offset: Arc>, consume_queue_ext: Option, + running_flags: Arc, + store_checkpoint: Arc, } impl ConsumeQueue { @@ -72,6 +77,8 @@ impl ConsumeQueue { store_path: String, mapped_file_size: i32, message_store_config: Arc, + running_flags: Arc, + store_checkpoint: Arc, ) -> Self { let queue_dir = PathBuf::from(store_path.clone()) .join(topic.clone()) @@ -102,6 +109,8 @@ impl ConsumeQueue { max_physic_offset: Arc::new(parking_lot::Mutex::new(-1)), min_logic_offset: Arc::new(parking_lot::Mutex::new(0)), consume_queue_ext, + running_flags, + store_checkpoint, } } } @@ -194,6 +203,102 @@ impl ConsumeQueue { pub fn is_ext_addr(tags_code: i64) -> bool { ConsumeQueueExt::is_ext_addr(tags_code) } + + pub fn is_ext_write_enable(&self) -> bool { + self.consume_queue_ext.is_some() && self.message_store_config.enable_consume_queue_ext + } + + pub fn put_message_position_info( + &mut self, + offset: i64, + size: i32, + tags_code: i64, + cq_offset: i64, + ) -> bool { + if offset + size as i64 <= self.get_max_physic_offset() { + warn!( + "Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", + self.get_max_physic_offset(), + offset + ); + return true; + } + let mut bytes = BytesMut::with_capacity(CQ_STORE_UNIT_SIZE as usize); + bytes.put_i64(offset); + bytes.put_i32(size); + bytes.put_i64(tags_code); + + let expect_logic_offset = cq_offset + CQ_STORE_UNIT_SIZE as i64; + if let Some(mapped_file) = self + .mapped_file_queue + .get_last_mapped_file_mut_start_offset(expect_logic_offset as u64, true) + { + if mapped_file.is_first_create_in_queue() + && cq_offset != 0 + && mapped_file.get_wrote_position() == 0 + { + *self.min_logic_offset.lock() = expect_logic_offset; + self.mapped_file_queue + .set_flushed_where(expect_logic_offset); + self.mapped_file_queue + .set_committed_where(expect_logic_offset); + self.fill_pre_blank(&mapped_file, expect_logic_offset); + info!( + "fill pre blank space {} {}", + mapped_file.get_file_name(), + mapped_file.get_wrote_position() + ); + } + + if cq_offset != 0 { + let current_logic_offset = mapped_file.get_wrote_position() as i64 + + mapped_file.get_file_from_offset() as i64; + + if expect_logic_offset < current_logic_offset { + warn!( + "Build consume queue repeatedly, expectLogicOffset: {} \ + currentLogicOffset: {} Topic: {} QID: {} Diff: {}", + expect_logic_offset, + current_logic_offset, + self.topic, + self.queue_id, + expect_logic_offset - current_logic_offset + ); + return true; + } + + if expect_logic_offset != current_logic_offset { + warn!( + "[BUG]logic queue order maybe wrong, expectLogicOffset: {} \ + currentLogicOffset: {} Topic: {} QID: {} Diff: {}", + expect_logic_offset, + current_logic_offset, + self.topic, + self.queue_id, + expect_logic_offset - current_logic_offset + ); + } + } + self.set_max_physic_offset(offset + size as i64); + mapped_file.append_message_bytes(&bytes.freeze()) + } else { + false + } + } + + fn fill_pre_blank(&self, mapped_file: &Arc, until_where: i64) { + let mut bytes_mut = BytesMut::with_capacity(CQ_STORE_UNIT_SIZE as usize); + + bytes_mut.put_i64(0); + bytes_mut.put_i32(i32::MAX); + bytes_mut.put_i64(0); + let bytes = bytes_mut.freeze(); + let until = (until_where % self.mapped_file_queue.mapped_file_size as i64) as i32 + / CQ_STORE_UNIT_SIZE; + for n in 0..until { + mapped_file.append_message_bytes(&bytes); + } + } } impl FileQueueLifeCycle for ConsumeQueue { @@ -246,6 +351,7 @@ impl FileQueueLifeCycle for ConsumeQueue { if Self::is_ext_addr(tags_code) { max_ext_addr = tags_code; } + println!("offset {}, size {}, tags_code {}", offset, size, tags_code); } else { info!( "recover current consume queue file over, {}, {} {} {}", @@ -361,6 +467,14 @@ impl ConsumeQueueTrait for ConsumeQueue { todo!() } + fn get_cq_unit_and_store_time(&self, index: i64) -> Option<(CqUnit, i64)> { + todo!() + } + + fn get_earliest_unit_and_store_time(&self) -> Option<(CqUnit, i64)> { + todo!() + } + fn get_earliest_unit(&self) -> CqUnit { todo!() } @@ -421,8 +535,59 @@ impl ConsumeQueueTrait for ConsumeQueue { todo!() } - fn put_message_position_info_wrapper(&self, request: DispatchRequest) { - todo!() + fn put_message_position_info_wrapper(&mut self, request: &DispatchRequest) { + let max_retries = 30i32; + let can_write = self.running_flags.is_cq_writeable(); + let mut i = 0i32; + while i < max_retries && can_write { + let mut tags_code = request.tags_code; + if self.is_ext_write_enable() { + let ext_addr = self.consume_queue_ext.as_ref().unwrap().put(CqExtUnit::new( + tags_code, + request.store_timestamp, + Some(request.bit_map.clone()), + )); + + if Self::is_ext_addr(ext_addr) { + tags_code = ext_addr; + } else { + warn!( + "Save consume queue extend fail, So just save tagsCode! topic:{}, \ + queueId:{}, offset:{}", + self.topic, self.queue_id, request.commit_log_offset, + ) + } + } + if self.put_message_position_info( + request.commit_log_offset, + request.msg_size, + tags_code, + request.consume_queue_offset, + ) { + if self.message_store_config.broker_role == BrokerRole::Slave + || self.message_store_config.enable_dledger_commit_log + { + unimplemented!("slave or dledger commit log not support") + } + self.store_checkpoint + .set_logics_msg_timestamp(request.store_timestamp as u64); + //if (MultiDispatchUtils.checkMultiDispatchQueue(this.messageStore. + // getMessageStoreConfig(), request)) { + // multiDispatchLmqQueue(request, maxRetries); } + return; + } else { + warn!( + "[BUG]put commit log position info to {}:{} failed, retry {} times", + self.topic, self.queue_id, i + ); + } + i += 1; + } + error!( + "[BUG]consume queue can not write, {} {}", + self.topic, self.queue_id + ); + self.running_flags.make_logics_queue_error(); } fn increase_queue_offset( diff --git a/rocketmq-store/src/store/running_flags.rs b/rocketmq-store/src/store/running_flags.rs index f5706850..13cdb247 100644 --- a/rocketmq-store/src/store/running_flags.rs +++ b/rocketmq-store/src/store/running_flags.rs @@ -28,6 +28,12 @@ const DISK_FULL_BIT: u32 = 1 << 4; const FENCED_BIT: u32 = 1 << 5; const LOGIC_DISK_FULL_BIT: u32 = 1 << 6; +impl Default for RunningFlags { + fn default() -> Self { + Self::new() + } +} + impl RunningFlags { pub fn new() -> Self { Self {