Skip to content

Commit

Permalink
[ISSUE #459]🚀Enhance the functionality of BrokerStatsManager -2🎨 (#460)
Browse files Browse the repository at this point in the history
* [ISSUE #459]🚀Enhance the functionality of BrokerStatsManager -2🎨

* fix ci error
  • Loading branch information
mxsm authored Jun 8, 2024
1 parent eb4ab3f commit 84ff741
Show file tree
Hide file tree
Showing 23 changed files with 774 additions and 92 deletions.
8 changes: 6 additions & 2 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use rocketmq_runtime::RocketMQRuntime;
use rocketmq_store::{
base::store_enum::StoreType, config::message_store_config::MessageStoreConfig,
log_file::MessageStore, message_store::default_message_store::DefaultMessageStore,
timer::timer_message_store::TimerMessageStore,
stats::broker_stats_manager::BrokerStatsManager, timer::timer_message_store::TimerMessageStore,
};
use tracing::{info, warn};

Expand Down Expand Up @@ -91,6 +91,7 @@ pub(crate) struct BrokerRuntime {
drop: Arc<AtomicBool>,
shutdown: Arc<AtomicBool>,
shutdown_hook: Option<BrokerShutdownHook>,
broker_stats_manager: Arc<BrokerStatsManager>,
}

impl Clone for BrokerRuntime {
Expand All @@ -114,6 +115,7 @@ impl Clone for BrokerRuntime {
drop: self.drop.clone(),
shutdown: self.shutdown.clone(),
shutdown_hook: self.shutdown_hook.clone(),
broker_stats_manager: self.broker_stats_manager.clone(),
}
}
}
Expand All @@ -139,7 +141,7 @@ impl BrokerRuntime {
server_config: server_config.clone(),
topic_queue_mapping_manager: topic_queue_mapping_manager.clone(),
});

let broker_stats_manager = Arc::new(BrokerStatsManager::new(broker_config.clone()));
Self {
broker_config: broker_config.clone(),
message_store_config,
Expand All @@ -162,6 +164,7 @@ impl BrokerRuntime {
drop: Arc::new(AtomicBool::new(false)),
shutdown: Arc::new(AtomicBool::new(false)),
shutdown_hook: None,
broker_stats_manager,
}
}

Expand Down Expand Up @@ -243,6 +246,7 @@ impl BrokerRuntime {
self.message_store_config.clone(),
self.broker_config.clone(),
self.topic_config_manager.topic_config_table(),
Some(self.broker_stats_manager.clone()),
);
self.topic_config_manager
.set_message_store(Some(message_store.clone()));
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-broker/src/mqtrace/send_message_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use std::{any::Any, collections::HashMap};

use rocketmq_common::common::message::message_enum::MessageType;
use rocketmq_store::status::StatsType;
use rocketmq_store::stats::stats_type::StatsType;

#[derive(Debug, Default)]
pub struct SendMessageContext {
Expand Down
4 changes: 1 addition & 3 deletions rocketmq-broker/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ use rocketmq_remoting::{
},
runtime::{processor::RequestProcessor, server::ConnectionHandlerContext},
};
use rocketmq_store::{
log_file::MessageStore, status::manager::broker_stats_manager::BrokerStatsManager,
};
use rocketmq_store::{log_file::MessageStore, stats::broker_stats_manager::BrokerStatsManager};
use tracing::{info, warn};

use self::client_manage_processor::ClientManageProcessor;
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-broker/src/processor/send_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use rocketmq_remoting::{
};
use rocketmq_store::{
base::message_result::PutMessageResult, log_file::MessageStore,
status::manager::broker_stats_manager::BrokerStatsManager,
stats::broker_stats_manager::BrokerStatsManager,
};
use tracing::debug;

Expand Down
5 changes: 5 additions & 0 deletions rocketmq-broker/src/util/hook_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ mod tests {
config::message_store_config::MessageStoreConfig,
hook::put_message_hook::BoxedPutMessageHook,
log_file::MessageStore,
stats::broker_stats_manager::BrokerStatsManager,
store::running_flags::RunningFlags,
};

Expand Down Expand Up @@ -474,6 +475,10 @@ mod tests {
fn set_put_message_hook(&self, put_message_hook: BoxedPutMessageHook) {
todo!()
}

fn get_broker_stats_manager(&self) -> Option<Arc<BrokerStatsManager>> {
todo!()
}
// Implement required methods...
}

Expand Down
1 change: 1 addition & 0 deletions rocketmq-common/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub mod mix_all;
pub mod mq_version;
pub mod namesrv;
pub mod server;
pub mod statistics;
pub mod stats;
pub mod sys_flag;
pub mod thread;
Expand Down
2 changes: 2 additions & 0 deletions rocketmq-common/src/common/broker/broker_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ pub struct BrokerConfig {
pub revive_queue_num: u32,
pub enable_slave_acting_master: bool,
pub reject_transaction_message: bool,
pub enable_detail_stat: bool,
}

impl Default for BrokerConfig {
Expand Down Expand Up @@ -163,6 +164,7 @@ impl Default for BrokerConfig {
revive_queue_num: 8,
enable_slave_acting_master: false,
reject_transaction_message: false,
enable_detail_stat: true,
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions rocketmq-common/src/common/statistics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod interceptor;
pub mod state_getter;
pub mod statistics_brief;
pub mod statistics_item;
32 changes: 32 additions & 0 deletions rocketmq-common/src/common/statistics/interceptor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/// `Interceptor` is a trait that provides a common interface for objects that can intercept and
/// modify a sequence of integers. It provides two methods: `inc` and `reset`.
pub trait Interceptor {
/// The `inc` method accepts a vector of i64 integers, `deltas`, and applies some operation to
/// them. The specifics of the operation are determined by the implementing type.
///
/// # Arguments
///
/// * `deltas` - A vector of i64 integers to be intercepted and modified.
fn inc(&self, deltas: Vec<i64>);

/// The `reset` method resets the state of the implementing object. The specifics of what
/// "resetting" means are determined by the implementing type.
fn reset(&self);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
///
/// This trait is used to determine whether a specific instance, identified by its `instance_id`,
/// `group`, and `topic`, is online.
pub trait StateGetter {
pub trait StateGetter: Send + Sync + 'static {
/// Checks if the specified instance is online.
///
/// # Arguments
Expand Down
Loading

0 comments on commit 84ff741

Please sign in to comment.