Skip to content

Commit

Permalink
[ISSUES mxsm#878] Support AdminBrokerProcessor get_broker_runtime_inf…
Browse files Browse the repository at this point in the history
…o part#2
  • Loading branch information
847850277 committed Aug 13, 2024
1 parent 2c2f79c commit 04a6592
Show file tree
Hide file tree
Showing 13 changed files with 377 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,5 @@ once_cell = "1.19.0"

mockall = "0.13.0"
cfg-if = "1.0.0"

sysinfo = "0.31.2"
1 change: 1 addition & 0 deletions rocketmq-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ log = "0.4.22"
cfg-if = { workspace = true }
lazy_static.workspace = true
num_cpus = { workspace = true }
sysinfo = { workspace = true }
[dev-dependencies]
mockall = "0.13.0"
static_assertions = { version = "1" }
Expand Down
6 changes: 5 additions & 1 deletion rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,13 +302,17 @@ impl BrokerRuntime {
async fn initialize_message_store(&mut self) -> bool {
if self.message_store_config.store_type == StoreType::LocalFile {
info!("Use local file as message store");
let message_store = DefaultMessageStore::new(
let mut message_store = DefaultMessageStore::new(
self.message_store_config.clone(),
self.broker_config.clone(),
self.topic_config_manager.topic_config_table(),
Some(self.broker_stats_manager.clone()),
false,
);
if self.message_store_config.is_timer_wheel_enable() {
let time_message_store = TimerMessageStore::new(Some(message_store.clone()));
message_store.set_timer_message_store(Arc::new(time_message_store));
}
self.consumer_offset_manager
.set_message_store(Some(Arc::new(message_store.clone())));
self.topic_config_manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
use std::collections::HashMap;

use bytes::Bytes;
use rocketmq_common::common::mix_all;
use rocketmq_common::common::mq_version::RocketMqVersion;
use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::body::kv_table::KVTable;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::runtime::server::ConnectionHandlerContext;
use rocketmq_store::log_file::MessageStore;
use sysinfo::Disks;

use crate::processor::admin_broker_processor::Inner;

Expand Down Expand Up @@ -190,6 +192,99 @@ impl BrokerConfigRequestHandler {
.get_start_accept_send_request_time_stamp()
.to_string(),
);
let is_timer_wheel_enable = self.inner.message_store_config.is_timer_wheel_enable();
if is_timer_wheel_enable {
runtime_info.insert(
"timerReadBehind".to_string(),
self.inner
.default_message_store
.get_timer_message_store()
.get_dequeue_behind()
.to_string(),
);
runtime_info.insert(
"timerOffsetBehind".to_string(),
self.inner
.default_message_store
.get_timer_message_store()
.get_enqueue_behind_messages()
.to_string(),
);
runtime_info.insert(
"timerCongestNum".to_string(),
self.inner
.default_message_store
.get_timer_message_store()
.get_all_congest_num()
.to_string(),
);
runtime_info.insert(
"timerEnqueueTps".to_string(),
self.inner
.default_message_store
.get_timer_message_store()
.get_enqueue_tps()
.to_string(),
);
runtime_info.insert(
"timerDequeueTps".to_string(),
self.inner
.default_message_store
.get_timer_message_store()
.get_dequeue_tps()
.to_string(),
);
} else {
runtime_info.insert("timerReadBehind".to_string(), "0".to_string());
runtime_info.insert("timerOffsetBehind".to_string(), "0".to_string());
runtime_info.insert("timerCongestNum".to_string(), "0".to_string());
runtime_info.insert("timerEnqueueTps".to_string(), "0.0".to_string());
runtime_info.insert("timerDequeueTps".to_string(), "0.0".to_string());
}
let default_message_store = self.inner.default_message_store.clone();
runtime_info.insert(
"remainTransientStoreBufferNumbs".to_string(),
default_message_store
.remain_transient_store_buffer_nums()
.to_string(),
);
if default_message_store.is_transient_store_pool_enable() {
runtime_info.insert(
"remainHowManyDataToCommit".to_string(),
mix_all::human_readable_byte_count(
default_message_store.remain_how_many_data_to_commit(),
false,
),
);
}
runtime_info.insert(
"remainHowManyDataToFlush".to_string(),
mix_all::human_readable_byte_count(
default_message_store.remain_how_many_data_to_flush(),
false,
),
);
let store_path_root_dir = &self.inner.message_store_config.store_path_root_dir;
let commit_log_dir = std::path::Path::new(&store_path_root_dir);
if commit_log_dir.exists() {
let disks = Disks::new_with_refreshed_list();
let path_str = commit_log_dir.to_str().unwrap();
for disk in &disks {
if disk.mount_point().to_str() == Some(path_str) {
runtime_info.insert(
"commitLogDirCapacity".to_string(),
format!(
"Total : {}, Free : {}.",
mix_all::human_readable_byte_count(disk.total_space() as i64, false),
mix_all::human_readable_byte_count(
disk.available_space() as i64,
false,
)
),
);
}
}
}
runtime_info
}
fn is_special_service_running(&self) -> bool {
Expand Down
17 changes: 16 additions & 1 deletion rocketmq-common/src/common/mix_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,22 @@ pub fn broker_vip_channel(is_change: bool, broker_addr: &str) -> String {
broker_addr.to_string()
}

pub fn human_readable_byte_count(bytes: i64, si: bool) -> String {
let bytes = bytes as f64;
let unit = if si { 1000.0 } else { 1024.0 };
if bytes < unit {
return format!("{} B", bytes);
}
let exp = (bytes.ln() / unit.ln()).floor() as i32;
let pre = ['K', 'M', 'G', 'T', 'P', 'E'][(exp - 1) as usize];
let pre = if si {
pre.to_string()
} else {
format!("{}i", pre)
};
format!("{:.1} {}B", bytes / unit.powi(exp), pre)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -220,7 +236,6 @@ mod tests {
let lmq_meta_data = Some("NonLMQSpecificInfo");
assert!(!is_lmq(lmq_meta_data));
}

#[test]
fn returns_false_for_none_metadata() {
assert!(!is_lmq(None));
Expand Down
82 changes: 81 additions & 1 deletion rocketmq-store/src/base/transient_store_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,84 @@
* limitations under the License.
*/

pub struct TransientStorePool;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Mutex;

use tracing::warn;

pub struct TransientStorePool {
pool_size: usize,
file_size: usize,
available_buffers: Arc<Mutex<VecDeque<Vec<u8>>>>,
is_real_commit: Arc<Mutex<bool>>,
}

impl TransientStorePool {
pub fn new(pool_size: usize, file_size: usize) -> Self {
let available_buffers = Arc::new(Mutex::new(VecDeque::with_capacity(pool_size)));
let is_real_commit = Arc::new(Mutex::new(true));
TransientStorePool {
pool_size,
file_size,
available_buffers,
is_real_commit,
}
}

pub fn init(&self) {
let mut available_buffers = self.available_buffers.lock().unwrap();
for _ in 0..self.pool_size {
let buffer = vec![0u8; self.file_size];
available_buffers.push_back(buffer);
}
}

pub fn destroy(&self) {
let mut available_buffers = self.available_buffers.lock().unwrap();
available_buffers.clear();
}

pub fn return_buffer(&self, buffer: Vec<u8>) {
let mut available_buffers = self.available_buffers.lock().unwrap();
available_buffers.push_front(buffer);
}

pub fn borrow_buffer(&self) -> Option<Vec<u8>> {
let mut available_buffers = self.available_buffers.lock().unwrap();
let buffer = available_buffers.pop_front();
if available_buffers.len() < self.pool_size / 10 * 4 {
warn!(
"TransientStorePool only remain {} sheets.",
available_buffers.len()
);
}
buffer
}

pub fn available_buffer_nums(&self) -> usize {
let available_buffers = self.available_buffers.lock().unwrap();
available_buffers.len()
}

pub fn is_real_commit(&self) -> bool {
let is_real_commit = self.is_real_commit.lock().unwrap();
*is_real_commit
}

pub fn set_real_commit(&self, real_commit: bool) {
let mut is_real_commit = self.is_real_commit.lock().unwrap();
*is_real_commit = real_commit;
}
}

impl Clone for TransientStorePool {
fn clone(&self) -> Self {
TransientStorePool {
pool_size: self.pool_size,
file_size: self.file_size,
available_buffers: self.available_buffers.clone(),
is_real_commit: self.is_real_commit.clone(),
}
}
}
15 changes: 15 additions & 0 deletions rocketmq-store/src/consume_queue/mapped_file_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,21 @@ impl MappedFileQueue {
}
result
}

pub fn remain_how_many_data_to_commit(&self) -> i64 {
self.get_max_wrote_position() - self.get_committed_where()
}

pub fn remain_how_many_data_to_flush(&self) -> i64 {
self.get_max_offset() - self.get_flushed_where()
}
fn get_max_wrote_position(&self) -> i64 {
let mapped_file = self.get_last_mapped_file();
match mapped_file {
None => 0,
Some(file) => file.get_file_from_offset() as i64 + file.get_wrote_position() as i64,
}
}
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ pub mod stats;
pub mod store;
pub mod store_path_config_helper;
pub mod timer;
mod utils;
pub mod utils;
22 changes: 22 additions & 0 deletions rocketmq-store/src/log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::hook::put_message_hook::BoxedPutMessageHook;
use crate::queue::ArcConsumeQueue;
use crate::stats::broker_stats_manager::BrokerStatsManager;
use crate::store::running_flags::RunningFlags;
use crate::timer::timer_message_store::TimerMessageStore;

pub(crate) mod cold_data_check_service;
pub mod commit_log;
Expand Down Expand Up @@ -411,4 +412,25 @@ pub trait RocketMQMessageStore: Clone + 'static {
///
/// * `i64` - Timestamp of the earliest message in this store.
fn get_earliest_message_time(&self) -> i64;

/// Get the store time of the earliest message in this store.
fn get_timer_message_store(&self) -> Arc<TimerMessageStore>;

/// Set the timer message store.
fn set_timer_message_store(&mut self, timer_message_store: Arc<TimerMessageStore>);

/// Get remain transientStoreBuffer numbers
/// @return
/// * `i32` - The number of remaining transient store buffers.
fn remain_transient_store_buffer_nums(&self) -> i32;

/// Get remain how many data to commit
/// @return
/// * `i64` - remain how many data to commit.
fn remain_how_many_data_to_commit(&self) -> i64;

/// Get remain how many data to flush
/// @return
/// * `i64` - remain how many data to flush.
fn remain_how_many_data_to_flush(&self) -> i64;
}
8 changes: 8 additions & 0 deletions rocketmq-store/src/log_file/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1146,6 +1146,14 @@ impl CommitLog {
pub fn begin_time_in_lock(&self) -> &Arc<AtomicU64> {
&self.begin_time_in_lock
}

pub fn remain_how_many_data_to_commit(&self) -> i64 {
self.mapped_file_queue.remain_how_many_data_to_commit()
}

pub fn remain_how_many_data_to_flush(&self) -> i64 {
self.mapped_file_queue.remain_how_many_data_to_flush()
}
}

pub fn check_message_and_return_size(
Expand Down
Loading

0 comments on commit 04a6592

Please sign in to comment.