Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a few counters for BlockSTM size #12581

Merged
merged 1 commit into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions aptos-move/aptos-aggregator/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,17 @@ impl DelayedFieldValue {
},
})
}

/// Approximate memory consumption of current DelayedFieldValue
pub fn get_approximate_memory_size(&self) -> usize {
// 32 + len
std::mem::size_of::<DelayedFieldValue>()
+ match &self {
DelayedFieldValue::Aggregator(_) | DelayedFieldValue::Snapshot(_) => 0,
// additional allocated memory for the data:
DelayedFieldValue::Derived(v) => v.len(),
}
}
}

impl TryFromMoveValue for DelayedFieldValue {
Expand Down
50 changes: 48 additions & 2 deletions aptos-move/block-executor/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use aptos_metrics_core::{
exponential_buckets, register_histogram, register_histogram_vec, register_int_counter,
register_int_counter_vec, Histogram, HistogramVec, IntCounter, IntCounterVec,
exponential_buckets, register_avg_counter_vec, register_histogram, register_histogram_vec,
register_int_counter, register_int_counter_vec, Histogram, HistogramVec, IntCounter,
IntCounterVec,
};
use aptos_mvhashmap::BlockStateStats;
use aptos_types::fee_statement::FeeStatement;
use once_cell::sync::Lazy;

Expand Down Expand Up @@ -211,6 +213,22 @@ pub static BLOCK_COMMITTED_TXNS: Lazy<HistogramVec> = Lazy::new(|| {
.unwrap()
});

pub static BLOCK_VIEW_DISTINCT_KEYS: Lazy<HistogramVec> = Lazy::new(|| {
register_avg_counter_vec(
"aptos_execution_block_view_distinct_keys",
"Size (number of keys) ",
&["mode", "object_type"],
)
});

pub static BLOCK_VIEW_BASE_VALUES_MEMORY_USAGE: Lazy<HistogramVec> = Lazy::new(|| {
register_avg_counter_vec(
"aptos_execution_block_view_base_values_memory_usage",
"Memory usage (in bytes) for base values",
&["mode", "object_type"],
)
});

fn observe_gas(counter: &Lazy<HistogramVec>, mode_str: &str, fee_statement: &FeeStatement) {
counter
.with_label_values(&[mode_str, GasType::TOTAL_GAS])
Expand Down Expand Up @@ -275,3 +293,31 @@ pub(crate) fn update_txn_gas_counters(txn_fee_statements: &Vec<FeeStatement>, is
observe_gas(&TXN_GAS, mode_str, fee_statement);
}
}

pub(crate) fn update_state_counters(block_state_stats: BlockStateStats, is_parallel: bool) {
let mode_str = if is_parallel {
Mode::PARALLEL
} else {
Mode::SEQUENTIAL
};

BLOCK_VIEW_DISTINCT_KEYS
.with_label_values(&[mode_str, "resource"])
.observe(block_state_stats.num_resources as f64);
BLOCK_VIEW_DISTINCT_KEYS
.with_label_values(&[mode_str, "resource_group"])
.observe(block_state_stats.num_resource_groups as f64);
BLOCK_VIEW_DISTINCT_KEYS
.with_label_values(&[mode_str, "delayed_field"])
.observe(block_state_stats.num_delayed_fields as f64);
BLOCK_VIEW_DISTINCT_KEYS
.with_label_values(&[mode_str, "module"])
.observe(block_state_stats.num_modules as f64);

BLOCK_VIEW_BASE_VALUES_MEMORY_USAGE
.with_label_values(&[mode_str, "resource"])
.observe(block_state_stats.base_resources_size as f64);
BLOCK_VIEW_BASE_VALUES_MEMORY_USAGE
.with_label_values(&[mode_str, "delayed_field"])
.observe(block_state_stats.base_delayed_fields_size as f64);
}
5 changes: 5 additions & 0 deletions aptos-move/block-executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,9 @@ where
}
});
drop(timer);

counters::update_state_counters(versioned_cache.stats(), true);

// Explicit async drops.
DEFAULT_DROPPER.schedule_drop((last_input_output, scheduler, versioned_cache));

Expand Down Expand Up @@ -1272,6 +1275,8 @@ where

ret.resize_with(num_txns, E::Output::skip_output);

counters::update_state_counters(unsync_map.stats(), false);

// TODO add block end info to output.
// block_limit_processor.is_block_limit_reached();

Expand Down
2 changes: 1 addition & 1 deletion aptos-move/block-executor/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ impl<'a, T: Transaction, X: Executable> SequentialState<'a, T, X> {
}

pub(crate) fn set_delayed_field_value(&self, id: T::Identifier, base_value: DelayedFieldValue) {
self.unsync_map.write_delayed_field(id, base_value)
self.unsync_map.set_base_delayed_field(id, base_value)
}

pub(crate) fn read_delayed_field(&self, id: T::Identifier) -> Option<DelayedFieldValue> {
Expand Down
21 changes: 21 additions & 0 deletions aptos-move/mvhashmap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ impl<
}
}

pub fn stats(&self) -> BlockStateStats {
BlockStateStats {
num_resources: self.data.num_keys(),
num_resource_groups: self.group_data.num_keys(),
num_delayed_fields: self.delayed_fields.num_keys(),
num_modules: self.modules.num_keys(),
base_resources_size: self.data.total_base_value_size(),
base_delayed_fields_size: self.delayed_fields.total_base_value_size(),
}
}

/// Contains 'simple' versioned data (nothing contained in groups).
pub fn data(&self) -> &VersionedData<K, V> {
&self.data
Expand Down Expand Up @@ -92,3 +103,13 @@ impl<
Self::new()
}
}

pub struct BlockStateStats {
pub num_resources: usize,
pub num_resource_groups: usize,
pub num_delayed_fields: usize,
pub num_modules: usize,

pub base_resources_size: u64,
pub base_delayed_fields_size: u64,
}
44 changes: 42 additions & 2 deletions aptos-move/mvhashmap/src/unsync_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use crate::{
types::{GroupReadResult, MVModulesOutput, UnsyncGroupError, ValueWithLayout},
utils::module_hash,
BlockStateStats,
};
use aptos_aggregator::types::{code_invariant_error, DelayedFieldValue};
use aptos_crypto::hash::HashValue;
Expand All @@ -16,7 +17,16 @@ use aptos_vm_types::resource_group_adapter::group_size_as_sum;
use move_binary_format::errors::PartialVMResult;
use move_core_types::value::MoveTypeLayout;
use serde::Serialize;
use std::{cell::RefCell, collections::HashMap, fmt::Debug, hash::Hash, sync::Arc};
use std::{
cell::RefCell,
collections::HashMap,
fmt::Debug,
hash::Hash,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};

/// UnsyncMap is designed to mimic the functionality of MVHashMap for sequential execution.
/// In this case only the latest recorded version is relevant, simplifying the implementation.
Expand All @@ -38,6 +48,9 @@ pub struct UnsyncMap<
executable_cache: RefCell<HashMap<HashValue, Arc<X>>>,
executable_bytes: RefCell<usize>,
delayed_field_map: RefCell<HashMap<I, DelayedFieldValue>>,

total_base_resource_size: AtomicU64,
total_base_delayed_field_size: AtomicU64,
}

impl<
Expand All @@ -56,6 +69,8 @@ impl<
executable_cache: RefCell::new(HashMap::new()),
executable_bytes: RefCell::new(0),
delayed_field_map: RefCell::new(HashMap::new()),
total_base_resource_size: AtomicU64::new(0),
total_base_delayed_field_size: AtomicU64::new(0),
}
}
}
Expand All @@ -72,6 +87,17 @@ impl<
Self::default()
}

pub fn stats(&self) -> BlockStateStats {
BlockStateStats {
num_resources: self.resource_map.borrow().len(),
num_resource_groups: self.group_cache.borrow().len(),
num_delayed_fields: self.delayed_field_map.borrow().len(),
num_modules: self.module_map.borrow().len(),
base_resources_size: self.total_base_resource_size.load(Ordering::Relaxed),
base_delayed_fields_size: self.total_base_delayed_field_size.load(Ordering::Relaxed),
}
}

pub fn set_group_base_values(
&self,
group_key: K,
Expand Down Expand Up @@ -245,7 +271,13 @@ impl<
}

pub fn set_base_value(&self, key: K, value: ValueWithLayout<V>) {
self.resource_map.borrow_mut().insert(key, value);
let cur_size = value.bytes_len();
if self.resource_map.borrow_mut().insert(key, value).is_none() {
if let Some(cur_size) = cur_size {
self.total_base_resource_size
.fetch_add(cur_size as u64, Ordering::Relaxed);
}
}
}

/// We return false if the executable was already stored, as this isn't supposed to happen
Expand Down Expand Up @@ -274,6 +306,14 @@ impl<
pub fn write_delayed_field(&self, id: I, value: DelayedFieldValue) {
self.delayed_field_map.borrow_mut().insert(id, value);
}

pub fn set_base_delayed_field(&self, id: I, value: DelayedFieldValue) {
self.total_base_delayed_field_size.fetch_add(
value.get_approximate_memory_size() as u64,
Ordering::Relaxed,
);
self.delayed_field_map.borrow_mut().insert(id, value);
}
}

#[cfg(test)]
Expand Down
19 changes: 18 additions & 1 deletion aptos-move/mvhashmap/src/versioned_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use std::{
collections::btree_map::{self, BTreeMap},
fmt::Debug,
hash::Hash,
sync::Arc,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};

/// Every entry in shared multi-version data-structure has an "estimate" flag
Expand Down Expand Up @@ -52,6 +55,7 @@ struct VersionedValue<V> {
/// Maps each key (access path) to an internal versioned value representation.
pub struct VersionedData<K, V> {
values: DashMap<K, VersionedValue<V>>,
total_base_value_size: AtomicU64,
}

impl<V> Entry<V> {
Expand Down Expand Up @@ -213,9 +217,18 @@ impl<K: Hash + Clone + Debug + Eq, V: TransactionWrite> VersionedData<K, V> {
pub(crate) fn new() -> Self {
Self {
values: DashMap::new(),
total_base_value_size: AtomicU64::new(0),
}
}

pub(crate) fn num_keys(&self) -> usize {
self.values.len()
}

pub(crate) fn total_base_value_size(&self) -> u64 {
self.total_base_value_size.load(Ordering::Relaxed)
}

pub fn add_delta(&self, key: K, txn_idx: TxnIndex, delta: DeltaOp) {
let mut v = self.values.entry(key).or_default();
v.versioned_map.insert(
Expand Down Expand Up @@ -278,6 +291,10 @@ impl<K: Hash + Clone + Debug + Eq, V: TransactionWrite> VersionedData<K, V> {
use ValueWithLayout::*;
match v.versioned_map.entry(ShiftedTxnIndex::zero_idx()) {
Vacant(v) => {
if let Some(base_size) = value.bytes_len() {
georgemitenkov marked this conversation as resolved.
Show resolved Hide resolved
self.total_base_value_size
.fetch_add(base_size as u64, Ordering::Relaxed);
}
v.insert(CachePadded::new(Entry::new_write_from(0, value)));
},
Occupied(mut o) => {
Expand Down
27 changes: 21 additions & 6 deletions aptos-move/mvhashmap/src/versioned_delayed_fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{
hash::Hash,
iter::DoubleEndedIterator,
ops::Deref,
sync::atomic::Ordering,
sync::atomic::{AtomicU64, Ordering},
};

pub enum CommitError {
Expand Down Expand Up @@ -382,7 +382,9 @@ pub struct VersionedDelayedFields<K: Clone> {

/// No deltas are allowed below next_idx_to_commit version, as all deltas (and snapshots)
/// must be materialized and converted to Values during commit.
next_idx_to_commit: AtomicTxnIndex,
next_idx_to_commit: CachePadded<AtomicTxnIndex>,

total_base_value_size: CachePadded<AtomicU64>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You make this cache padded to ensure so that both counters are not colocated on the same cache line, and we avoid invalidations? But what about values and next_idx_to_commit. Is the effect actually observable here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's sync up with @gelash when he is back. for now this makes new counter not conflict, doesn't improve on existing conflicts

}

impl<K: Eq + Hash + Clone + Debug + Copy> VersionedDelayedFields<K> {
Expand All @@ -392,20 +394,33 @@ impl<K: Eq + Hash + Clone + Debug + Copy> VersionedDelayedFields<K> {
pub(crate) fn new() -> Self {
Self {
values: DashMap::new(),
next_idx_to_commit: AtomicTxnIndex::new(0),
next_idx_to_commit: CachePadded::new(AtomicTxnIndex::new(0)),
total_base_value_size: CachePadded::new(AtomicU64::new(0)),
}
}

pub(crate) fn num_keys(&self) -> usize {
self.values.len()
}

pub(crate) fn total_base_value_size(&self) -> u64 {
self.total_base_value_size.load(Ordering::Relaxed)
}

/// Must be called when an delayed field from storage is resolved, with ID replacing the
/// base value. This ensures that VersionedValue exists for the delayed field before any
/// other uses (adding deltas, etc).
///
/// Setting base value multiple times, even concurrently, is okay for the same ID,
/// because the corresponding value prior to the block is fixed.
pub fn set_base_value(&self, id: K, base_value: DelayedFieldValue) {
self.values
.entry(id)
.or_insert(VersionedValue::new(Some(base_value)));
self.values.entry(id).or_insert_with(|| {
self.total_base_value_size.fetch_add(
base_value.get_approximate_memory_size() as u64,
Ordering::Relaxed,
);
VersionedValue::new(Some(base_value))
});
}

/// Must be called when an delayed field creation with a given ID and initial value is
Expand Down
4 changes: 4 additions & 0 deletions aptos-move/mvhashmap/src/versioned_group_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,10 @@ impl<
}
}

pub(crate) fn num_keys(&self) -> usize {
self.group_values.len()
}

pub fn set_raw_base_values(&self, key: K, base_values: impl IntoIterator<Item = (T, V)>) {
// Incarnation is irrelevant for storage version, set to 0.
self.group_values
Expand Down
4 changes: 4 additions & 0 deletions aptos-move/mvhashmap/src/versioned_modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ impl<K: Hash + Clone + Eq, V: TransactionWrite, X: Executable> VersionedModules<
}
}

pub(crate) fn num_keys(&self) -> usize {
self.values.len()
}

/// Mark an entry from transaction 'txn_idx' at access path 'key' as an estimated write
/// (for future incarnation). Will panic if the entry is not in the data-structure.
pub fn mark_estimate(&self, key: &K, txn_idx: TxnIndex) {
Expand Down
15 changes: 13 additions & 2 deletions crates/aptos-metrics-core/src/avg_counter.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
// Copyright © Aptos Foundation

use prometheus::{register_histogram, Histogram};
use prometheus::{register_histogram, register_histogram_vec, Histogram, HistogramVec};

// use histogram, instead of pair of sum/count counters, to guarantee
// atomicity of observing and fetching (which Histogram handles correctly)
pub fn register_avg_counter(name: &str, desc: &str) -> Histogram {
register_histogram!(
name,
desc,
// We need to have at least one bucket in histogram, otherwise default buckets are used
// We need to have at least one bucket in histogram, otherwise default buckets are used.
vec![0.5],
)
.unwrap()
}

pub fn register_avg_counter_vec(name: &str, desc: &str, labels: &[&str]) -> HistogramVec {
register_histogram_vec!(
name,
desc,
labels,
// We need to have at least one bucket in histogram, otherwise default buckets are used.
vec![0.5],
)
.unwrap()
Expand Down
Loading
Loading