Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #459]🚀Enhance the functionality of BrokerStatsManager -2🎨 #460

Merged
merged 2 commits into from
Jun 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
use rocketmq_store::{
base::store_enum::StoreType, config::message_store_config::MessageStoreConfig,
log_file::MessageStore, message_store::default_message_store::DefaultMessageStore,
timer::timer_message_store::TimerMessageStore,
stats::broker_stats_manager::BrokerStatsManager, timer::timer_message_store::TimerMessageStore,
};
use tracing::{info, warn};

Expand Down Expand Up @@ -91,6 +91,7 @@
drop: Arc<AtomicBool>,
shutdown: Arc<AtomicBool>,
shutdown_hook: Option<BrokerShutdownHook>,
broker_stats_manager: Arc<BrokerStatsManager>,
}

impl Clone for BrokerRuntime {
Expand All @@ -114,6 +115,7 @@
drop: self.drop.clone(),
shutdown: self.shutdown.clone(),
shutdown_hook: self.shutdown_hook.clone(),
broker_stats_manager: self.broker_stats_manager.clone(),

Check warning on line 118 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L118

Added line #L118 was not covered by tests
}
}
}
Expand All @@ -139,7 +141,7 @@
server_config: server_config.clone(),
topic_queue_mapping_manager: topic_queue_mapping_manager.clone(),
});

let broker_stats_manager = Arc::new(BrokerStatsManager::new(broker_config.clone()));

Check warning on line 144 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L144

Added line #L144 was not covered by tests
Self {
broker_config: broker_config.clone(),
message_store_config,
Expand All @@ -162,6 +164,7 @@
drop: Arc::new(AtomicBool::new(false)),
shutdown: Arc::new(AtomicBool::new(false)),
shutdown_hook: None,
broker_stats_manager,

Check warning on line 167 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L167

Added line #L167 was not covered by tests
}
}

Expand Down Expand Up @@ -243,6 +246,7 @@
self.message_store_config.clone(),
self.broker_config.clone(),
self.topic_config_manager.topic_config_table(),
Some(self.broker_stats_manager.clone()),

Check warning on line 249 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L249

Added line #L249 was not covered by tests
);
self.topic_config_manager
.set_message_store(Some(message_store.clone()));
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-broker/src/mqtrace/send_message_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use std::{any::Any, collections::HashMap};

use rocketmq_common::common::message::message_enum::MessageType;
use rocketmq_store::status::StatsType;
use rocketmq_store::stats::stats_type::StatsType;

#[derive(Debug, Default)]
pub struct SendMessageContext {
Expand Down
4 changes: 1 addition & 3 deletions rocketmq-broker/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ use rocketmq_remoting::{
},
runtime::{processor::RequestProcessor, server::ConnectionHandlerContext},
};
use rocketmq_store::{
log_file::MessageStore, status::manager::broker_stats_manager::BrokerStatsManager,
};
use rocketmq_store::{log_file::MessageStore, stats::broker_stats_manager::BrokerStatsManager};
use tracing::{info, warn};

use self::client_manage_processor::ClientManageProcessor;
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-broker/src/processor/send_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use rocketmq_remoting::{
};
use rocketmq_store::{
base::message_result::PutMessageResult, log_file::MessageStore,
status::manager::broker_stats_manager::BrokerStatsManager,
stats::broker_stats_manager::BrokerStatsManager,
};
use tracing::debug;

Expand Down
5 changes: 5 additions & 0 deletions rocketmq-broker/src/util/hook_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@
config::message_store_config::MessageStoreConfig,
hook::put_message_hook::BoxedPutMessageHook,
log_file::MessageStore,
stats::broker_stats_manager::BrokerStatsManager,
store::running_flags::RunningFlags,
};

Expand Down Expand Up @@ -474,6 +475,10 @@
fn set_put_message_hook(&self, put_message_hook: BoxedPutMessageHook) {
todo!()
}

fn get_broker_stats_manager(&self) -> Option<Arc<BrokerStatsManager>> {
todo!()

Check warning on line 480 in rocketmq-broker/src/util/hook_utils.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/util/hook_utils.rs#L479-L480

Added lines #L479 - L480 were not covered by tests
}
// Implement required methods...
}

Expand Down
1 change: 1 addition & 0 deletions rocketmq-common/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub mod mix_all;
pub mod mq_version;
pub mod namesrv;
pub mod server;
pub mod statistics;
pub mod stats;
pub mod sys_flag;
pub mod thread;
Expand Down
2 changes: 2 additions & 0 deletions rocketmq-common/src/common/broker/broker_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
pub revive_queue_num: u32,
pub enable_slave_acting_master: bool,
pub reject_transaction_message: bool,
pub enable_detail_stat: bool,

Check warning on line 120 in rocketmq-common/src/common/broker/broker_config.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/broker/broker_config.rs#L120

Added line #L120 was not covered by tests
}

impl Default for BrokerConfig {
Expand Down Expand Up @@ -163,6 +164,7 @@
revive_queue_num: 8,
enable_slave_acting_master: false,
reject_transaction_message: false,
enable_detail_stat: true,
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions rocketmq-common/src/common/statistics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod interceptor;
pub mod state_getter;
pub mod statistics_brief;
pub mod statistics_item;
32 changes: 32 additions & 0 deletions rocketmq-common/src/common/statistics/interceptor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/// `Interceptor` is a trait that provides a common interface for objects that can intercept and
/// modify a sequence of integers. It provides two methods: `inc` and `reset`.
pub trait Interceptor {
/// The `inc` method accepts a vector of i64 integers, `deltas`, and applies some operation to
/// them. The specifics of the operation are determined by the implementing type.
///
/// # Arguments
///
/// * `deltas` - A vector of i64 integers to be intercepted and modified.
fn inc(&self, deltas: Vec<i64>);

/// The `reset` method resets the state of the implementing object. The specifics of what
/// "resetting" means are determined by the implementing type.
fn reset(&self);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
///
/// This trait is used to determine whether a specific instance, identified by its `instance_id`,
/// `group`, and `topic`, is online.
pub trait StateGetter {
pub trait StateGetter: Send + Sync + 'static {
/// Checks if the specified instance is online.
///
/// # Arguments
Expand Down
Loading
Loading