Skip to content

Commit

Permalink
[ISSUE #355]🚀Implement ConsumeQueueStore recover (#356)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored May 14, 2024
1 parent 0fa694a commit 7e940b8
Show file tree
Hide file tree
Showing 17 changed files with 512 additions and 65 deletions.
19 changes: 18 additions & 1 deletion rocketmq-common/src/utils/util_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
* limitations under the License.
*/

use chrono::{Datelike, TimeZone, Timelike, Utc};
use std::path::PathBuf;

use chrono::{DateTime, Datelike, TimeZone, Timelike, Utc};

const HEX_ARRAY: [char; 16] = [
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F',
Expand Down Expand Up @@ -84,10 +86,25 @@ pub fn time_millis_to_human_string3(t: i64) -> String {
)
}

pub fn time_millis_to_human_string(t: i64) -> String {
let dt = DateTime::<Utc>::from_timestamp_millis(t);
dt.as_ref().unwrap().format("%Y%m%d%H%M%S%3f").to_string()
}

pub fn offset_to_file_name(offset: u64) -> String {
format!("{:020}", offset)
}

/*pub fn ensure_dir_ok(dir: impl AsRef<PathBuf>) -> Result<(), std::io::Error> {
match dir.as_ref().exists() {
true => Ok(()),
false => Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("{:?}", dir.as_ref()),
)),
}
}*/

#[cfg(test)]
mod tests {
use super::*;
Expand Down
148 changes: 139 additions & 9 deletions rocketmq-store/src/base/store_checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,151 @@
* limitations under the License.
*/

#[derive(Default, Clone)]
pub struct StoreCheckpoint {}
use std::{
fs::{File, OpenOptions},
io::Write,
path::Path,
sync::atomic::{AtomicU64, Ordering},
};

use memmap2::MmapMut;
use tracing::info;

use crate::log_file::mapped_file::default_impl::OS_PAGE_SIZE;

pub struct StoreCheckpoint {
file: File,
mmap: parking_lot::Mutex<MmapMut>,
physic_msg_timestamp: AtomicU64,
logics_msg_timestamp: AtomicU64,
index_msg_timestamp: AtomicU64,
master_flushed_offset: AtomicU64,
confirm_phy_offset: AtomicU64,
}

impl StoreCheckpoint {
pub fn new(_scp_path: String) -> Self {
Self {}
pub fn new<P: AsRef<Path>>(path: P) -> std::io::Result<Self> {
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(path.as_ref())?;

let mmap = unsafe { MmapMut::map_mut(&file)? };
let _ = file.set_len(OS_PAGE_SIZE);
if file.metadata()?.len() > 0 {
let buffer = &mmap[..8];
let physic_msg_timestamp = u64::from_ne_bytes(buffer.try_into().unwrap());
let logics_msg_timestamp = u64::from_ne_bytes(mmap[8..16].try_into().unwrap());
let index_msg_timestamp = u64::from_ne_bytes(mmap[16..24].try_into().unwrap());
let master_flushed_offset = u64::from_ne_bytes(mmap[24..32].try_into().unwrap());
let confirm_phy_offset = u64::from_ne_bytes(mmap[32..40].try_into().unwrap());

info!("store checkpoint file exists, {}", path.as_ref().display());
info!("physicMsgTimestamp: {}", physic_msg_timestamp);
info!("logicsMsgTimestamp: {}", logics_msg_timestamp);
info!("indexMsgTimestamp: {}", index_msg_timestamp);
info!("masterFlushedOffset: {}", master_flushed_offset);
info!("confirmPhyOffset: {}", confirm_phy_offset);

Ok(Self {
file,
mmap: parking_lot::Mutex::new(mmap),
physic_msg_timestamp: AtomicU64::new(physic_msg_timestamp),
logics_msg_timestamp: AtomicU64::new(logics_msg_timestamp),
index_msg_timestamp: AtomicU64::new(index_msg_timestamp),
master_flushed_offset: AtomicU64::new(master_flushed_offset),
confirm_phy_offset: AtomicU64::new(confirm_phy_offset),
})
} else {
//info!("store checkpoint file not exists, {}", path.as_ref());
Ok(Self {
file,
mmap: parking_lot::Mutex::new(mmap),
physic_msg_timestamp: AtomicU64::new(0),
logics_msg_timestamp: AtomicU64::new(0),
index_msg_timestamp: AtomicU64::new(0),
master_flushed_offset: AtomicU64::new(0),
confirm_phy_offset: AtomicU64::new(0),
})
}
}

fn flush(&self) -> std::io::Result<()> {
let mut buffer = &mut self.mmap.lock()[..8];
buffer.write_all(
self.physic_msg_timestamp
.load(Ordering::Relaxed)
.to_ne_bytes()
.as_ref(),
)?;
buffer.write_all(
self.logics_msg_timestamp
.load(Ordering::Relaxed)
.to_ne_bytes()
.as_ref(),
)?;
buffer.write_all(
self.index_msg_timestamp
.load(Ordering::Relaxed)
.to_ne_bytes()
.as_ref(),
)?;
buffer.write_all(
self.master_flushed_offset
.load(Ordering::Relaxed)
.to_ne_bytes()
.as_ref(),
)?;
buffer.write_all(
self.confirm_phy_offset
.load(Ordering::Relaxed)
.to_ne_bytes()
.as_ref(),
)?;
self.mmap.lock().flush()?;
Ok(())
}

pub fn get_master_flushed_offset(&self) -> i64 {
-1
fn shutdown(&self) -> std::io::Result<()> {
self.flush()
}

pub fn get_confirm_phy_offset(&self) -> i64 {
-1
pub fn set_physic_msg_timestamp(&self, physic_msg_timestamp: u64) {
self.physic_msg_timestamp
.store(physic_msg_timestamp, Ordering::Relaxed);
}
pub fn set_logics_msg_timestamp(&self, logics_msg_timestamp: u64) {
self.logics_msg_timestamp
.store(logics_msg_timestamp, Ordering::Relaxed);
}
pub fn set_index_msg_timestamp(&self, index_msg_timestamp: u64) {
self.index_msg_timestamp
.store(index_msg_timestamp, Ordering::Relaxed);
}
pub fn set_master_flushed_offset(&self, master_flushed_offset: u64) {
self.master_flushed_offset
.store(master_flushed_offset, Ordering::Relaxed);
}
pub fn set_confirm_phy_offset(&self, confirm_phy_offset: u64) {
self.confirm_phy_offset
.store(confirm_phy_offset, Ordering::Relaxed);
}

pub fn set_confirm_phy_offset(&mut self, _confirm_phy_offset: i64) {}
pub fn physic_msg_timestamp(&self) -> u64 {
self.physic_msg_timestamp.load(Ordering::Relaxed)
}
pub fn logics_msg_timestamp(&self) -> u64 {
self.logics_msg_timestamp.load(Ordering::Relaxed)
}
pub fn index_msg_timestamp(&self) -> u64 {
self.index_msg_timestamp.load(Ordering::Relaxed)
}
pub fn master_flushed_offset(&self) -> u64 {
self.master_flushed_offset.load(Ordering::Relaxed)
}
pub fn confirm_phy_offset(&self) -> u64 {
self.confirm_phy_offset.load(Ordering::Relaxed)
}
}
3 changes: 1 addition & 2 deletions rocketmq-store/src/config/broker_role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ use std::fmt;

use serde::{Deserialize, Deserializer};

#[allow(dead_code)]
#[derive(Debug, Copy, Clone, Default)]
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq)]
pub enum BrokerRole {
#[default]
AsyncMaster,
Expand Down
32 changes: 31 additions & 1 deletion rocketmq-store/src/consume_queue/consume_queue_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,34 @@
* limitations under the License.
*/

pub struct ConsumeQueueExtCqExtUnit {}
const MIN_EXT_UNIT_SIZE: i16 = 2 // size, 32k max
+ 8 * 2 // msg time + tagCode
+ 2; // bitMapSize
const MAX_EXT_UNIT_SIZE: i16 = i16::MAX;

#[derive(Clone, Default)]
pub struct CqExtUnit {
size: i16,
tags_code: i64,
msg_store_time: i64,
bit_map_size: i16,
filter_bit_map: Option<Vec<u8>>,
}

impl CqExtUnit {
pub fn new(tags_code: i64, msg_store_time: i64, filter_bit_map: Option<Vec<u8>>) -> Self {
let bit_map_size = if let Some(val) = filter_bit_map.as_ref() {
val.len() as i16
} else {
0
};
let size = MIN_EXT_UNIT_SIZE + bit_map_size;
Self {
size,
tags_code,
msg_store_time,
bit_map_size,
filter_bit_map,
}
}
}
2 changes: 1 addition & 1 deletion rocketmq-store/src/consume_queue/mapped_file_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl MappedFileQueue {
self.mapped_files.last().cloned()
}

pub async fn get_last_mapped_file_mut_start_offset(
pub fn get_last_mapped_file_mut_start_offset(
&mut self,
start_offset: u64,
need_create: bool,
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-store/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use std::collections::HashMap;

use crate::consume_queue::consume_queue_ext::ConsumeQueueExtCqExtUnit;
use crate::consume_queue::consume_queue_ext::CqExtUnit;

/// Represents a message filter.
pub trait MessageFilter {
Expand All @@ -26,7 +26,7 @@ pub trait MessageFilter {
fn is_matched_by_consume_queue(
&self,
tags_code: Option<i64>,
cq_ext_unit: Option<&ConsumeQueueExtCqExtUnit>,
cq_ext_unit: Option<&CqExtUnit>,
) -> bool;

/// Matches by message content which is stored in the commit log.
Expand Down
10 changes: 5 additions & 5 deletions rocketmq-store/src/log_file/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub struct CommitLog {
//local_file_message_store: Option<Weak<Mutex<LocalFileMessageStore>>>,
dispatcher: CommitLogDispatcherDefault,
confirm_offset: i64,
store_checkpoint: StoreCheckpoint,
store_checkpoint: Arc<StoreCheckpoint>,
append_message_callback: Arc<DefaultAppendMessageCallback>,
}

Expand All @@ -95,7 +95,7 @@ impl CommitLog {
message_store_config: Arc<MessageStoreConfig>,
broker_config: Arc<BrokerConfig>,
dispatcher: &CommitLogDispatcherDefault,
store_checkpoint: StoreCheckpoint,
store_checkpoint: Arc<StoreCheckpoint>,
topic_config_table: Arc<parking_lot::Mutex<HashMap<String, TopicConfig>>>,
) -> Self {
let enabled_append_prop_crc = message_store_config.enabled_append_prop_crc;
Expand Down Expand Up @@ -139,7 +139,8 @@ impl CommitLog {

pub fn set_confirm_offset(&mut self, phy_offset: i64) {
self.confirm_offset = phy_offset;
self.store_checkpoint.set_confirm_phy_offset(phy_offset);
self.store_checkpoint
.set_confirm_phy_offset(phy_offset as u64);
}

pub async fn put_message(&mut self, msg: MessageExtBrokerInner) -> PutMessageResult {
Expand Down Expand Up @@ -193,7 +194,6 @@ impl CommitLog {
None => self
.mapped_file_queue
.get_last_mapped_file_mut_start_offset(0, true)
.await
.unwrap(),
Some(mapped_file) => mapped_file,
};
Expand Down Expand Up @@ -336,7 +336,7 @@ impl CommitLog {
} else {
warn!(
"The commitlog files are deleted, and delete the consume queue
files"
files"
);
self.mapped_file_queue.set_flushed_where(0);
self.mapped_file_queue.set_committed_where(0);
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-store/src/log_file/mapped_file/default_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{
log_file::mapped_file::MappedFile,
};

const OS_PAGE_SIZE: u64 = 1024 * 4;
pub const OS_PAGE_SIZE: u64 = 1024 * 4;

static TOTAL_MAPPED_VIRTUAL_MEMORY: AtomicI64 = AtomicI64::new(0);
static TOTAL_MAPPED_FILES: AtomicI32 = AtomicI32::new(0);
Expand Down
Loading

0 comments on commit 7e940b8

Please sign in to comment.