Skip to content

Commit

Permalink
[ISSUE #465]🔖Enhance the functionality of BrokerStatsManager -3
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jun 9, 2024
1 parent 6b4b742 commit f3fc52a
Show file tree
Hide file tree
Showing 8 changed files with 577 additions and 18 deletions.
4 changes: 4 additions & 0 deletions rocketmq-common/src/common/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ pub mod state_getter;
pub mod statistics_brief;
pub mod statistics_item;
pub mod statistics_item_formatter;
pub mod statistics_item_scheduled_printer;
pub mod statistics_item_state_getter;
pub mod statistics_kind_meta;
pub mod statistics_manager;
4 changes: 2 additions & 2 deletions rocketmq-common/src/common/statistics/statistics_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ impl StatisticsItem {
&self.invoke_times
}

pub fn last_timestamp(&self) -> &AtomicU64 {
&self.last_timestamp
pub fn last_timestamp(&self) -> u64 {
self.last_timestamp.load(Ordering::Relaxed)

Check warning on line 108 in rocketmq-common/src/common/statistics/statistics_item.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_item.rs#L107-L108

Added lines #L107 - L108 were not covered by tests
}

pub fn item_accumulate(&self, item_name: &str) -> Option<&AtomicI64> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::common::statistics::statistics_item::StatisticsItem;

pub struct StatisticsItemScheduledPrinter;

impl StatisticsItemScheduledPrinter {
pub fn schedule(&self, _statistics_item: &StatisticsItem) {
unimplemented!()

Check warning on line 23 in rocketmq-common/src/common/statistics/statistics_item_scheduled_printer.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_item_scheduled_printer.rs#L22-L23

Added lines #L22 - L23 were not covered by tests
}

pub fn remove(&self, _statistics_item: &StatisticsItem) {
unimplemented!()

Check warning on line 27 in rocketmq-common/src/common/statistics/statistics_item_scheduled_printer.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_item_scheduled_printer.rs#L26-L27

Added lines #L26 - L27 were not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::common::statistics::statistics_item::StatisticsItem;

/// `StatisticsItemStateGetter` is a trait that provides a common interface for objects that can
/// determine the online status of a `StatisticsItem`.
pub trait StatisticsItemStateGetter {
/// The `online` method accepts a reference to a `StatisticsItem` and returns a boolean
/// indicating whether the item is online or not.
///
/// # Arguments
///
/// * `item` - A reference to a `StatisticsItem` whose online status is to be determined.
///
/// # Returns
///
/// * `bool` - A boolean indicating whether the `StatisticsItem` is online (`true`) or not
/// (`false`).
fn online(&self, item: &StatisticsItem) -> bool;
}
61 changes: 61 additions & 0 deletions rocketmq-common/src/common/statistics/statistics_kind_meta.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::common::statistics::statistics_item_scheduled_printer::StatisticsItemScheduledPrinter;

pub struct StatisticsKindMeta {
name: String,
item_names: Vec<String>,
scheduled_printer: StatisticsItemScheduledPrinter,
}

impl StatisticsKindMeta {
pub fn new(

Check warning on line 26 in rocketmq-common/src/common/statistics/statistics_kind_meta.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_kind_meta.rs#L26

Added line #L26 was not covered by tests
name: String,
item_names: Vec<String>,
scheduled_printer: StatisticsItemScheduledPrinter,
) -> Self {
StatisticsKindMeta {

Check warning on line 31 in rocketmq-common/src/common/statistics/statistics_kind_meta.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_kind_meta.rs#L31

Added line #L31 was not covered by tests
name,
item_names,
scheduled_printer,
}
}

Check warning on line 36 in rocketmq-common/src/common/statistics/statistics_kind_meta.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_kind_meta.rs#L36

Added line #L36 was not covered by tests

pub fn get_name(&self) -> &str {
&self.name
}

Check warning on line 40 in rocketmq-common/src/common/statistics/statistics_kind_meta.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_kind_meta.rs#L38-L40

Added lines #L38 - L40 were not covered by tests

pub fn set_name(&mut self, name: String) {
self.name = name;
}

Check warning on line 44 in rocketmq-common/src/common/statistics/statistics_kind_meta.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_kind_meta.rs#L42-L44

Added lines #L42 - L44 were not covered by tests

pub fn get_item_names(&self) -> &Vec<String> {
&self.item_names
}

Check warning on line 48 in rocketmq-common/src/common/statistics/statistics_kind_meta.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_kind_meta.rs#L46-L48

Added lines #L46 - L48 were not covered by tests

pub fn set_item_names(&mut self, item_names: Vec<String>) {
self.item_names = item_names;
}

Check warning on line 52 in rocketmq-common/src/common/statistics/statistics_kind_meta.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_kind_meta.rs#L50-L52

Added lines #L50 - L52 were not covered by tests

pub fn get_scheduled_printer(&self) -> &StatisticsItemScheduledPrinter {
&self.scheduled_printer
}

Check warning on line 56 in rocketmq-common/src/common/statistics/statistics_kind_meta.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_kind_meta.rs#L54-L56

Added lines #L54 - L56 were not covered by tests

pub fn set_scheduled_printer(&mut self, scheduled_printer: StatisticsItemScheduledPrinter) {

Check warning on line 58 in rocketmq-common/src/common/statistics/statistics_kind_meta.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_kind_meta.rs#L58

Added line #L58 was not covered by tests
self.scheduled_printer = scheduled_printer;
}

Check warning on line 60 in rocketmq-common/src/common/statistics/statistics_kind_meta.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_kind_meta.rs#L60

Added line #L60 was not covered by tests
}
175 changes: 175 additions & 0 deletions rocketmq-common/src/common/statistics/statistics_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, SystemTime},
};

use parking_lot::RwLock;
use tokio::{task, time::interval};

use crate::{
common::statistics::{
statistics_item::StatisticsItem, statistics_item_state_getter::StatisticsItemStateGetter,
statistics_kind_meta::StatisticsKindMeta,
},
TimeUtils::get_current_millis,
};

type StatsTable = Arc<RwLock<HashMap<String, HashMap<String, Arc<StatisticsItem>>>>>;

pub struct StatisticsManager {
kind_meta_map: Arc<RwLock<HashMap<String, Arc<StatisticsKindMeta>>>>,
brief_metas: Option<Vec<(String, Vec<Vec<i64>>)>>,
stats_table: StatsTable,
statistics_item_state_getter: Option<Arc<dyn StatisticsItemStateGetter + Send + Sync>>,
}

impl Default for StatisticsManager {
fn default() -> Self {
Self::new()
}

Check warning on line 46 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L44-L46

Added lines #L44 - L46 were not covered by tests
}

impl StatisticsManager {
const MAX_IDLE_TIME: u64 = 10 * 60 * 1000;

pub fn new() -> Self {
let manager = Self {
kind_meta_map: Arc::new(RwLock::new(HashMap::new())),
brief_metas: None,
stats_table: Arc::new(RwLock::new(HashMap::new())),
statistics_item_state_getter: None,
};
manager.start();
manager
}

Check warning on line 61 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L52-L61

Added lines #L52 - L61 were not covered by tests

pub fn with_kind_meta(kind_meta: HashMap<String, Arc<StatisticsKindMeta>>) -> Self {
let manager = Self {
kind_meta_map: Arc::new(RwLock::new(kind_meta)),
brief_metas: None,
stats_table: Arc::new(RwLock::new(HashMap::new())),
statistics_item_state_getter: None,
};
manager.start();
manager
}

Check warning on line 72 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L63-L72

Added lines #L63 - L72 were not covered by tests

pub fn add_statistics_kind_meta(&self, kind_meta: Arc<StatisticsKindMeta>) {
let mut kind_meta_map = self.kind_meta_map.write();
kind_meta_map.insert(kind_meta.get_name().to_string(), kind_meta.clone());
let mut stats_table = self.stats_table.write();
stats_table
.entry(kind_meta.get_name().to_string())

Check warning on line 79 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L74-L79

Added lines #L74 - L79 were not covered by tests
.or_default();
}

Check warning on line 81 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L81

Added line #L81 was not covered by tests

pub fn set_brief_meta(&mut self, brief_metas: Vec<(String, Vec<Vec<i64>>)>) {
self.brief_metas = Some(brief_metas);
}

Check warning on line 85 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L83-L85

Added lines #L83 - L85 were not covered by tests

fn start(&self) {
let stats_table = self.stats_table.clone();
let kind_meta_map = self.kind_meta_map.clone();
let statistics_item_state_getter = self.statistics_item_state_getter.clone();

Check warning on line 90 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L87-L90

Added lines #L87 - L90 were not covered by tests

task::spawn(async move {
let mut interval = interval(Duration::from_millis(Self::MAX_IDLE_TIME / 3));
let stats_table_clone = stats_table.clone();

Check warning on line 94 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L92-L94

Added lines #L92 - L94 were not covered by tests
loop {
interval.tick().await;

Check warning on line 96 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L96

Added line #L96 was not covered by tests

let stats_table = stats_table.read();
for (_kind, item_map) in stats_table.iter() {
let tmp_item_map: HashMap<_, _> = item_map.clone().into_iter().collect();

Check warning on line 100 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L98-L100

Added lines #L98 - L100 were not covered by tests

for item in tmp_item_map.values() {
let last_time_stamp = item.last_timestamp();
if get_current_millis() - last_time_stamp > Self::MAX_IDLE_TIME
&& (statistics_item_state_getter.is_none()
|| !statistics_item_state_getter.as_ref().unwrap().online(item))

Check warning on line 106 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L102-L106

Added lines #L102 - L106 were not covered by tests
{
// Remove expired item
remove(item, &stats_table_clone, &kind_meta_map);

Check warning on line 109 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L109

Added line #L109 was not covered by tests
}
}
}
}
});
}

Check warning on line 115 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L112-L115

Added lines #L112 - L115 were not covered by tests

pub async fn inc(&self, kind: &str, key: &str, item_accumulates: Vec<i64>) -> bool {

Check warning on line 117 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L117

Added line #L117 was not covered by tests
if let Some(item_map) = self.stats_table.write().get_mut(kind) {
if let Some(item) = item_map.get(key) {
item.inc_items(item_accumulates.clone());
return true;
} else {
let kind_meta_map = self.kind_meta_map.read();
if let Some(kind_meta) = kind_meta_map.get(kind) {
let new_item = Arc::new(StatisticsItem::new(
kind,
key,
kind_meta
.get_item_names()
.iter()
.map(|item| item.as_str())
.collect(),
));
item_map.insert(key.to_string(), new_item.clone());
new_item.inc_items(item_accumulates);
self.schedule_statistics_item(new_item);
return true;
}
}
}
false
}

Check warning on line 142 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L142

Added line #L142 was not covered by tests

fn schedule_statistics_item(&self, item: Arc<StatisticsItem>) {
let kind_meta_map = self.kind_meta_map.read();
if let Some(kind_meta) = kind_meta_map.get(item.stat_kind()) {
kind_meta.get_scheduled_printer().schedule(item.as_ref());

Check warning on line 147 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L144-L147

Added lines #L144 - L147 were not covered by tests
}
}

Check warning on line 149 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L149

Added line #L149 was not covered by tests

pub fn set_statistics_item_state_getter(

Check warning on line 151 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L151

Added line #L151 was not covered by tests
&mut self,
getter: Arc<dyn StatisticsItemStateGetter + Send + Sync>,
) {
self.statistics_item_state_getter = Some(getter);
}

Check warning on line 156 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L155-L156

Added lines #L155 - L156 were not covered by tests
}

pub fn remove(

Check warning on line 159 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L159

Added line #L159 was not covered by tests
item: &StatisticsItem,
stats_table: &StatsTable,
kind_meta_map: &Arc<RwLock<HashMap<String, Arc<StatisticsKindMeta>>>>,
) {
let stat_kind = item.stat_kind();
let stat_object = item.stat_object();
if let Some(item_map) = stats_table.write().get_mut(stat_kind) {
item_map.remove(stat_object);
}

Check warning on line 168 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L164-L168

Added lines #L164 - L168 were not covered by tests

// Remove from scheduled printer
let kind_meta_map = kind_meta_map.write();
if let Some(kind_meta) = kind_meta_map.get(stat_kind) {
kind_meta.get_scheduled_printer().remove(item);

Check warning on line 173 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L171-L173

Added lines #L171 - L173 were not covered by tests
}
}

Check warning on line 175 in rocketmq-common/src/common/statistics/statistics_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/statistics/statistics_manager.rs#L175

Added line #L175 was not covered by tests
6 changes: 6 additions & 0 deletions rocketmq-common/src/common/stats/stats_item_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,9 @@
*/
#[derive(Debug)]
pub struct StatsItemSet {}

impl StatsItemSet {
pub fn new(_stats_name: String) -> Self {

Check warning on line 21 in rocketmq-common/src/common/stats/stats_item_set.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/stats/stats_item_set.rs#L21

Added line #L21 was not covered by tests
StatsItemSet {}
}

Check warning on line 23 in rocketmq-common/src/common/stats/stats_item_set.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/stats/stats_item_set.rs#L23

Added line #L23 was not covered by tests
}
Loading

0 comments on commit f3fc52a

Please sign in to comment.