From 04a6592f4aebf44fd0bf9ec1a66aab14618374ef Mon Sep 17 00:00:00 2001 From: zhengpeng <847850277@qq.com> Date: Tue, 13 Aug 2024 11:41:32 +0800 Subject: [PATCH] [ISSUES #878] Support AdminBrokerProcessor get_broker_runtime_info part#2 --- Cargo.lock | 1 + Cargo.toml | 2 + rocketmq-broker/Cargo.toml | 1 + rocketmq-broker/src/broker_runtime.rs | 6 +- .../broker_config_request_handler.rs | 95 +++++++++++++++++++ rocketmq-common/src/common/mix_all.rs | 17 +++- .../src/base/transient_store_pool.rs | 82 +++++++++++++++- .../src/consume_queue/mapped_file_queue.rs | 15 +++ rocketmq-store/src/lib.rs | 2 +- rocketmq-store/src/log_file.rs | 22 +++++ rocketmq-store/src/log_file/commit_log.rs | 8 ++ .../message_store/default_message_store.rs | 42 +++++++- .../src/timer/timer_message_store.rs | 91 +++++++++++++++++- 13 files changed, 377 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4c7fc00b..981aa48d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1862,6 +1862,7 @@ dependencies = [ "serde", "serde_json", "static_assertions", + "sysinfo", "tokio", "tokio-stream", "tokio-util", diff --git a/Cargo.toml b/Cargo.toml index 2e676c63..69d4d2fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,3 +71,5 @@ once_cell = "1.19.0" mockall = "0.13.0" cfg-if = "1.0.0" + +sysinfo = "0.31.2" diff --git a/rocketmq-broker/Cargo.toml b/rocketmq-broker/Cargo.toml index d0ce8ff5..64643a51 100644 --- a/rocketmq-broker/Cargo.toml +++ b/rocketmq-broker/Cargo.toml @@ -60,6 +60,7 @@ log = "0.4.22" cfg-if = { workspace = true } lazy_static.workspace = true num_cpus = { workspace = true } +sysinfo = { workspace = true } [dev-dependencies] mockall = "0.13.0" static_assertions = { version = "1" } diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index db569c6c..1bd0a399 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -302,13 +302,17 @@ impl BrokerRuntime { async fn initialize_message_store(&mut self) -> bool { if self.message_store_config.store_type == StoreType::LocalFile { info!("Use local file as message store"); - let message_store = DefaultMessageStore::new( + let mut message_store = DefaultMessageStore::new( self.message_store_config.clone(), self.broker_config.clone(), self.topic_config_manager.topic_config_table(), Some(self.broker_stats_manager.clone()), false, ); + if self.message_store_config.is_timer_wheel_enable() { + let time_message_store = TimerMessageStore::new(Some(message_store.clone())); + message_store.set_timer_message_store(Arc::new(time_message_store)); + } self.consumer_offset_manager .set_message_store(Some(Arc::new(message_store.clone()))); self.topic_config_manager diff --git a/rocketmq-broker/src/processor/admin_broker_processor/broker_config_request_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/broker_config_request_handler.rs index b294040d..414e2f89 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor/broker_config_request_handler.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor/broker_config_request_handler.rs @@ -18,6 +18,7 @@ use std::collections::HashMap; use bytes::Bytes; +use rocketmq_common::common::mix_all; use rocketmq_common::common::mq_version::RocketMqVersion; use rocketmq_remoting::code::request_code::RequestCode; use rocketmq_remoting::net::channel::Channel; @@ -25,6 +26,7 @@ use rocketmq_remoting::protocol::body::kv_table::KVTable; use rocketmq_remoting::protocol::remoting_command::RemotingCommand; use rocketmq_remoting::runtime::server::ConnectionHandlerContext; use rocketmq_store::log_file::MessageStore; +use sysinfo::Disks; use crate::processor::admin_broker_processor::Inner; @@ -190,6 +192,99 @@ impl BrokerConfigRequestHandler { .get_start_accept_send_request_time_stamp() .to_string(), ); + let is_timer_wheel_enable = self.inner.message_store_config.is_timer_wheel_enable(); + if is_timer_wheel_enable { + runtime_info.insert( + "timerReadBehind".to_string(), + self.inner + .default_message_store + .get_timer_message_store() + .get_dequeue_behind() + .to_string(), + ); + runtime_info.insert( + "timerOffsetBehind".to_string(), + self.inner + .default_message_store + .get_timer_message_store() + .get_enqueue_behind_messages() + .to_string(), + ); + runtime_info.insert( + "timerCongestNum".to_string(), + self.inner + .default_message_store + .get_timer_message_store() + .get_all_congest_num() + .to_string(), + ); + runtime_info.insert( + "timerEnqueueTps".to_string(), + self.inner + .default_message_store + .get_timer_message_store() + .get_enqueue_tps() + .to_string(), + ); + runtime_info.insert( + "timerDequeueTps".to_string(), + self.inner + .default_message_store + .get_timer_message_store() + .get_dequeue_tps() + .to_string(), + ); + } else { + runtime_info.insert("timerReadBehind".to_string(), "0".to_string()); + runtime_info.insert("timerOffsetBehind".to_string(), "0".to_string()); + runtime_info.insert("timerCongestNum".to_string(), "0".to_string()); + runtime_info.insert("timerEnqueueTps".to_string(), "0.0".to_string()); + runtime_info.insert("timerDequeueTps".to_string(), "0.0".to_string()); + } + let default_message_store = self.inner.default_message_store.clone(); + runtime_info.insert( + "remainTransientStoreBufferNumbs".to_string(), + default_message_store + .remain_transient_store_buffer_nums() + .to_string(), + ); + if default_message_store.is_transient_store_pool_enable() { + runtime_info.insert( + "remainHowManyDataToCommit".to_string(), + mix_all::human_readable_byte_count( + default_message_store.remain_how_many_data_to_commit(), + false, + ), + ); + } + runtime_info.insert( + "remainHowManyDataToFlush".to_string(), + mix_all::human_readable_byte_count( + default_message_store.remain_how_many_data_to_flush(), + false, + ), + ); + let store_path_root_dir = &self.inner.message_store_config.store_path_root_dir; + let commit_log_dir = std::path::Path::new(&store_path_root_dir); + if commit_log_dir.exists() { + let disks = Disks::new_with_refreshed_list(); + let path_str = commit_log_dir.to_str().unwrap(); + for disk in &disks { + if disk.mount_point().to_str() == Some(path_str) { + runtime_info.insert( + "commitLogDirCapacity".to_string(), + format!( + "Total : {}, Free : {}.", + mix_all::human_readable_byte_count(disk.total_space() as i64, false), + mix_all::human_readable_byte_count( + disk.available_space() as i64, + false, + ) + ), + ); + } + } + } runtime_info } fn is_special_service_running(&self) -> bool { diff --git a/rocketmq-common/src/common/mix_all.rs b/rocketmq-common/src/common/mix_all.rs index 18dfcd57..e0ac997b 100644 --- a/rocketmq-common/src/common/mix_all.rs +++ b/rocketmq-common/src/common/mix_all.rs @@ -142,6 +142,22 @@ pub fn broker_vip_channel(is_change: bool, broker_addr: &str) -> String { broker_addr.to_string() } +pub fn human_readable_byte_count(bytes: i64, si: bool) -> String { + let bytes = bytes as f64; + let unit = if si { 1000.0 } else { 1024.0 }; + if bytes < unit { + return format!("{} B", bytes); + } + let exp = (bytes.ln() / unit.ln()).floor() as i32; + let pre = ['K', 'M', 'G', 'T', 'P', 'E'][(exp - 1) as usize]; + let pre = if si { + pre.to_string() + } else { + format!("{}i", pre) + }; + format!("{:.1} {}B", bytes / unit.powi(exp), pre) +} + #[cfg(test)] mod tests { use super::*; @@ -220,7 +236,6 @@ mod tests { let lmq_meta_data = Some("NonLMQSpecificInfo"); assert!(!is_lmq(lmq_meta_data)); } - #[test] fn returns_false_for_none_metadata() { assert!(!is_lmq(None)); diff --git a/rocketmq-store/src/base/transient_store_pool.rs b/rocketmq-store/src/base/transient_store_pool.rs index d5d205a9..1587cab4 100644 --- a/rocketmq-store/src/base/transient_store_pool.rs +++ b/rocketmq-store/src/base/transient_store_pool.rs @@ -15,4 +15,84 @@ * limitations under the License. */ -pub struct TransientStorePool; +use std::collections::VecDeque; +use std::sync::Arc; +use std::sync::Mutex; + +use tracing::warn; + +pub struct TransientStorePool { + pool_size: usize, + file_size: usize, + available_buffers: Arc>>>, + is_real_commit: Arc>, +} + +impl TransientStorePool { + pub fn new(pool_size: usize, file_size: usize) -> Self { + let available_buffers = Arc::new(Mutex::new(VecDeque::with_capacity(pool_size))); + let is_real_commit = Arc::new(Mutex::new(true)); + TransientStorePool { + pool_size, + file_size, + available_buffers, + is_real_commit, + } + } + + pub fn init(&self) { + let mut available_buffers = self.available_buffers.lock().unwrap(); + for _ in 0..self.pool_size { + let buffer = vec![0u8; self.file_size]; + available_buffers.push_back(buffer); + } + } + + pub fn destroy(&self) { + let mut available_buffers = self.available_buffers.lock().unwrap(); + available_buffers.clear(); + } + + pub fn return_buffer(&self, buffer: Vec) { + let mut available_buffers = self.available_buffers.lock().unwrap(); + available_buffers.push_front(buffer); + } + + pub fn borrow_buffer(&self) -> Option> { + let mut available_buffers = self.available_buffers.lock().unwrap(); + let buffer = available_buffers.pop_front(); + if available_buffers.len() < self.pool_size / 10 * 4 { + warn!( + "TransientStorePool only remain {} sheets.", + available_buffers.len() + ); + } + buffer + } + + pub fn available_buffer_nums(&self) -> usize { + let available_buffers = self.available_buffers.lock().unwrap(); + available_buffers.len() + } + + pub fn is_real_commit(&self) -> bool { + let is_real_commit = self.is_real_commit.lock().unwrap(); + *is_real_commit + } + + pub fn set_real_commit(&self, real_commit: bool) { + let mut is_real_commit = self.is_real_commit.lock().unwrap(); + *is_real_commit = real_commit; + } +} + +impl Clone for TransientStorePool { + fn clone(&self) -> Self { + TransientStorePool { + pool_size: self.pool_size, + file_size: self.file_size, + available_buffers: self.available_buffers.clone(), + is_real_commit: self.is_real_commit.clone(), + } + } +} diff --git a/rocketmq-store/src/consume_queue/mapped_file_queue.rs b/rocketmq-store/src/consume_queue/mapped_file_queue.rs index 63a73a72..d3017879 100644 --- a/rocketmq-store/src/consume_queue/mapped_file_queue.rs +++ b/rocketmq-store/src/consume_queue/mapped_file_queue.rs @@ -383,6 +383,21 @@ impl MappedFileQueue { } result } + + pub fn remain_how_many_data_to_commit(&self) -> i64 { + self.get_max_wrote_position() - self.get_committed_where() + } + + pub fn remain_how_many_data_to_flush(&self) -> i64 { + self.get_max_offset() - self.get_flushed_where() + } + fn get_max_wrote_position(&self) -> i64 { + let mapped_file = self.get_last_mapped_file(); + match mapped_file { + None => 0, + Some(file) => file.get_file_from_offset() as i64 + file.get_wrote_position() as i64, + } + } } #[cfg(test)] diff --git a/rocketmq-store/src/lib.rs b/rocketmq-store/src/lib.rs index 4f16b579..9e1a2275 100644 --- a/rocketmq-store/src/lib.rs +++ b/rocketmq-store/src/lib.rs @@ -34,4 +34,4 @@ pub mod stats; pub mod store; pub mod store_path_config_helper; pub mod timer; -mod utils; +pub mod utils; diff --git a/rocketmq-store/src/log_file.rs b/rocketmq-store/src/log_file.rs index 5027a102..adee3f04 100644 --- a/rocketmq-store/src/log_file.rs +++ b/rocketmq-store/src/log_file.rs @@ -34,6 +34,7 @@ use crate::hook::put_message_hook::BoxedPutMessageHook; use crate::queue::ArcConsumeQueue; use crate::stats::broker_stats_manager::BrokerStatsManager; use crate::store::running_flags::RunningFlags; +use crate::timer::timer_message_store::TimerMessageStore; pub(crate) mod cold_data_check_service; pub mod commit_log; @@ -411,4 +412,25 @@ pub trait RocketMQMessageStore: Clone + 'static { /// /// * `i64` - Timestamp of the earliest message in this store. fn get_earliest_message_time(&self) -> i64; + + /// Get the store time of the earliest message in this store. + fn get_timer_message_store(&self) -> Arc; + + /// Set the timer message store. + fn set_timer_message_store(&mut self, timer_message_store: Arc); + + /// Get remain transientStoreBuffer numbers + /// @return + /// * `i32` - The number of remaining transient store buffers. + fn remain_transient_store_buffer_nums(&self) -> i32; + + /// Get remain how many data to commit + /// @return + /// * `i64` - remain how many data to commit. + fn remain_how_many_data_to_commit(&self) -> i64; + + /// Get remain how many data to flush + /// @return + /// * `i64` - remain how many data to flush. + fn remain_how_many_data_to_flush(&self) -> i64; } diff --git a/rocketmq-store/src/log_file/commit_log.rs b/rocketmq-store/src/log_file/commit_log.rs index a3f8fafc..1df0cc23 100644 --- a/rocketmq-store/src/log_file/commit_log.rs +++ b/rocketmq-store/src/log_file/commit_log.rs @@ -1146,6 +1146,14 @@ impl CommitLog { pub fn begin_time_in_lock(&self) -> &Arc { &self.begin_time_in_lock } + + pub fn remain_how_many_data_to_commit(&self) -> i64 { + self.mapped_file_queue.remain_how_many_data_to_commit() + } + + pub fn remain_how_many_data_to_flush(&self) -> i64 { + self.mapped_file_queue.remain_how_many_data_to_flush() + } } pub fn check_message_and_return_size( diff --git a/rocketmq-store/src/message_store/default_message_store.rs b/rocketmq-store/src/message_store/default_message_store.rs index 788e9f6e..98cfc861 100644 --- a/rocketmq-store/src/message_store/default_message_store.rs +++ b/rocketmq-store/src/message_store/default_message_store.rs @@ -70,6 +70,7 @@ use crate::base::query_message_result::QueryMessageResult; use crate::base::select_result::SelectMappedBufferResult; use crate::base::store_checkpoint::StoreCheckpoint; use crate::base::store_stats_service::StoreStatsService; +use crate::base::transient_store_pool::TransientStorePool; use crate::config::broker_role::BrokerRole; use crate::config::message_store_config::MessageStoreConfig; use crate::config::store_path_config_helper::get_store_path_batch_consume_queue; @@ -94,6 +95,7 @@ use crate::store::running_flags::RunningFlags; use crate::store_path_config_helper::get_abort_file; use crate::store_path_config_helper::get_store_checkpoint; use crate::store_path_config_helper::get_store_path_consume_queue; +use crate::timer::timer_message_store::TimerMessageStore; use crate::utils::store_util::TOTAL_PHYSICAL_MEMORY_SIZE; ///Using local files to store message data, which is also the default method. @@ -126,6 +128,8 @@ pub struct DefaultMessageStore { notify_message_arrive_in_batch: bool, store_stats_service: Arc, compaction_store: Arc, + timer_message_store: Arc, + transient_store_pool: TransientStorePool, } impl Clone for DefaultMessageStore { @@ -156,6 +160,8 @@ impl Clone for DefaultMessageStore { notify_message_arrive_in_batch: self.notify_message_arrive_in_batch, store_stats_service: self.store_stats_service.clone(), compaction_store: self.compaction_store.clone(), + timer_message_store: self.timer_message_store.clone(), + transient_store_pool: self.transient_store_pool.clone(), } } } @@ -208,6 +214,10 @@ impl DefaultMessageStore { ensure_dir_ok(Self::get_store_path_logic(&message_store_config).as_str()); let identity = broker_config.broker_identity.clone(); + let transient_store_pool = TransientStorePool::new( + message_store_config.transient_store_pool_size, + message_store_config.mapped_file_size_commit_log, + ); Self { message_store_config: message_store_config.clone(), broker_config, @@ -240,6 +250,8 @@ impl DefaultMessageStore { notify_message_arrive_in_batch, store_stats_service: Arc::new(StoreStatsService::new(Some(identity))), compaction_store: Arc::new(CompactionStore), + timer_message_store: Arc::new(TimerMessageStore::new_empty()), + transient_store_pool, } } @@ -259,6 +271,12 @@ impl DefaultMessageStore { pub fn message_store_config(&self) -> Arc { self.message_store_config.clone() } + + pub fn is_transient_store_pool_enable(&self) -> bool { + self.message_store_config.transient_store_pool_enable + && (self.broker_config.enable_controller_mode + || self.message_store_config().broker_role != BrokerRole::Slave) + } } impl Drop for DefaultMessageStore { @@ -1255,9 +1273,31 @@ impl MessageStore for DefaultMessageStore { } fn get_earliest_message_time(&self) -> i64 { - //TODO -1 } + + fn get_timer_message_store(&self) -> Arc { + self.timer_message_store.clone() + } + + fn set_timer_message_store(&mut self, timer_message_store: Arc) { + self.timer_message_store = timer_message_store; + } + + fn remain_transient_store_buffer_nums(&self) -> i32 { + if self.is_transient_store_pool_enable() { + return self.transient_store_pool.available_buffer_nums() as i32; + } + i32::MAX + } + + fn remain_how_many_data_to_commit(&self) -> i64 { + self.commit_log.remain_how_many_data_to_commit() + } + + fn remain_how_many_data_to_flush(&self) -> i64 { + self.commit_log.remain_how_many_data_to_flush() + } } #[derive(Clone)] diff --git a/rocketmq-store/src/timer/timer_message_store.rs b/rocketmq-store/src/timer/timer_message_store.rs index f836c79e..51f16f0e 100644 --- a/rocketmq-store/src/timer/timer_message_store.rs +++ b/rocketmq-store/src/timer/timer_message_store.rs @@ -14,7 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::sync::atomic::AtomicI64; + use rocketmq_common::common::message::MessageConst; +use rocketmq_common::common::system_clock::SystemClock; + +use crate::log_file::MessageStore; +use crate::message_store::default_message_store::DefaultMessageStore; pub const TIMER_TOPIC: &str = concat!("rmq_sys_", "wheel_timer"); pub const TIMER_OUT_MS: &str = MessageConst::PROPERTY_TIMER_OUT_MS; @@ -37,8 +43,27 @@ pub const MAGIC_DEFAULT: i32 = 1; pub const MAGIC_ROLL: i32 = 1 << 1; pub const MAGIC_DELETE: i32 = 1 << 2; -#[derive(Default, Clone)] -pub struct TimerMessageStore {} +pub struct TimerMessageStore { + pub curr_read_time_ms: AtomicI64, + pub curr_queue_offset: AtomicI64, + pub default_message_store: Option, +} + +impl Clone for TimerMessageStore { + fn clone(&self) -> Self { + Self { + curr_read_time_ms: AtomicI64::new( + self.curr_read_time_ms + .load(std::sync::atomic::Ordering::Relaxed), + ), + curr_queue_offset: AtomicI64::new( + self.curr_queue_offset + .load(std::sync::atomic::Ordering::Relaxed), + ), + default_message_store: self.default_message_store.clone(), + } + } +} impl TimerMessageStore { pub fn load(&mut self) -> bool { @@ -50,4 +75,66 @@ impl TimerMessageStore { pub fn is_reject(&self, _deliver_ms: u64) -> bool { false } + + pub fn get_dequeue_behind(&self) -> i64 { + self.get_dequeue_behind_millis() / 1000 + } + + pub fn get_dequeue_behind_millis(&self) -> i64 { + (SystemClock::now() as i64) + - self + .curr_read_time_ms + .load(std::sync::atomic::Ordering::Relaxed) + } + + pub fn get_enqueue_behind_messages(&self) -> i64 { + let temp_queue_offset = self + .curr_queue_offset + .load(std::sync::atomic::Ordering::Relaxed); + let consume_queue = self + .default_message_store + .as_ref() + .unwrap() + .find_consume_queue(TIMER_TOPIC, 0); + let max_offset_in_queue = match consume_queue { + Some(queue) => queue.get_max_offset_in_queue(), + None => 0, + }; + max_offset_in_queue - temp_queue_offset + } + + pub fn get_all_congest_num(&self) -> i64 { + 0 + } + + pub fn get_enqueue_tps(&self) -> f32 { + 0.0 + } + + pub fn get_dequeue_tps(&self) -> f32 { + 0.0 + } + + pub fn new(default_message_store: Option) -> Self { + Self { + curr_read_time_ms: AtomicI64::new(0), + curr_queue_offset: AtomicI64::new(0), + default_message_store, + } + } + + pub fn new_empty() -> Self { + Self { + curr_read_time_ms: AtomicI64::new(0), + curr_queue_offset: AtomicI64::new(0), + default_message_store: None, + } + } + + pub fn set_default_message_store( + &mut self, + default_message_store: Option, + ) { + self.default_message_store = default_message_store; + } }