Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

subsystem benchmarks: add cpu profiling #2601

Closed
wants to merge 48 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
01af630
skeleton
sandreim Oct 25, 2023
7c22abe
wip
sandreim Nov 6, 2023
c3adc77
measure tput and fixes
sandreim Nov 6, 2023
31b0351
add network emulation
sandreim Nov 7, 2023
e4bb037
cleanup
sandreim Nov 7, 2023
a694924
Add latency emulation
sandreim Nov 7, 2023
7ca4dba
support multiple pov sizes
sandreim Nov 8, 2023
0430b5b
new metric in recovery and more testing
sandreim Nov 8, 2023
027bcd8
CLI update and fixes
sandreim Nov 9, 2023
5a05da0
peer stats
sandreim Nov 9, 2023
895e8d6
Switch stats to atomics
sandreim Nov 10, 2023
a2fb0c9
add more network metrics, new load generator
sandreim Nov 12, 2023
d1b9fa3
refactor
sandreim Nov 14, 2023
c5937ab
pretty cli + minor refactor + remove unused
sandreim Nov 15, 2023
d6c259d
update
sandreim Nov 15, 2023
050529b
remove comment
sandreim Nov 15, 2023
cb38be5
separate cli options for availability
sandreim Nov 17, 2023
24a736a
implement unified and extensible configuration
sandreim Nov 17, 2023
2843865
Prepare to swtich to overseer
sandreim Nov 24, 2023
fd4620e
Merge branch 'master' of github.com:paritytech/polkadot-sdk into sand…
sandreim Nov 24, 2023
b17a147
add mocked subsystems
sandreim Nov 27, 2023
4724d8c
full overseer based implementation complete
sandreim Nov 27, 2023
7aed30f
make clean
sandreim Nov 27, 2023
b51485b
more cleaning
sandreim Nov 27, 2023
7e46444
more cleaning
sandreim Nov 27, 2023
d3df927
proper overseer control
sandreim Nov 27, 2023
7557768
refactor CLI display of env stats
sandreim Nov 27, 2023
787dc00
Add grafana dashboards for DA read
sandreim Nov 28, 2023
cd18f8d
network stats fixes
sandreim Nov 28, 2023
e8506b3
move examples and grafana
sandreim Nov 28, 2023
cbb6772
Add readme
sandreim Nov 28, 2023
1a80870
fmt + readme updates
sandreim Nov 28, 2023
eb49ea0
update dashboard and sample
sandreim Nov 28, 2023
b249056
remove unused
sandreim Nov 28, 2023
7fbcdfc
Merge branch 'master' of github.com:paritytech/polkadot-sdk into sand…
sandreim Nov 28, 2023
fb34181
revert unneeded changes
sandreim Nov 28, 2023
3a716a5
add missing comments and minor fixes
sandreim Nov 29, 2023
a092b76
clippy
sandreim Nov 29, 2023
ca27370
zepter format features --fix
sandreim Nov 29, 2023
be814e5
fix markdown
sandreim Nov 29, 2023
11ce8f5
remove sleep till end of block
sandreim Nov 29, 2023
8d93abc
review
sandreim Nov 29, 2023
af141ee
Emulated network improvements
sandreim Dec 1, 2023
29d80fa
fix comment
sandreim Dec 1, 2023
70ac38e
Add cpu profiling
AndreiEres Dec 4, 2023
a06f2a5
Merge branch 'sandreim/subsystem-bench' into AndreiEres/cpu-profiling
AndreiEres Dec 5, 2023
b9f4dd9
Update polkadot/node/subsystem-bench/README.md
AndreiEres Dec 5, 2023
8736689
Update
AndreiEres Dec 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update
Signed-off-by: Andrei Sandu <[email protected]>
sandreim committed Nov 15, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit d6c259df9ff7eaaa5f7207364b87a7f3a76b165e
3 changes: 1 addition & 2 deletions polkadot/node/network/availability-recovery/src/lib.rs
Original file line number Diff line number Diff line change
@@ -582,6 +582,7 @@ impl AvailabilityRecoverySubsystem {
}
}

/// Starts the inner subsystem loop.
pub async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()> {
let mut state = State::default();
let Self { mut req_receiver, metrics, recovery_strategy_kind, bypass_availability_store } =
@@ -726,8 +727,6 @@ impl AvailabilityRecoverySubsystem {
}
}
output = state.ongoing_recoveries.select_next_some() => {
// No caching for benchmark.
#[cfg(not(feature = "subsystem-benchmarks"))]
if let Some((candidate_hash, result)) = output {
if let Ok(recovery) = CachedRecovery::try_from(result) {
state.availability_lru.insert(candidate_hash, recovery);
17 changes: 17 additions & 0 deletions polkadot/node/subsystem-bench/src/availability/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use super::*;
19 changes: 10 additions & 9 deletions polkadot/node/subsystem-bench/src/availability/configuration.rs
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@
use std::path::Path;

use super::*;
use serde::{Deserialize,Serialize};
use serde::{Deserialize, Serialize};
/// Peer response latency configuration.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PeerLatency {
@@ -56,7 +56,6 @@ pub struct TestConfiguration {
pub num_blocks: usize,
}


impl Default for TestConfiguration {
fn default() -> Self {
Self {
@@ -69,8 +68,8 @@ impl Default for TestConfiguration {
latency: None,
error: 0,
num_blocks: 1,
min_pov_size: 5*1024*1024,
max_pov_size: 5*1024*1024,
min_pov_size: 5 * 1024 * 1024,
max_pov_size: 5 * 1024 * 1024,
}
}
}
@@ -79,25 +78,26 @@ fn generate_pov_sizes(count: usize, min: usize, max: usize) -> Vec<usize> {
(0..count).map(|_| random_pov_size(min, max)).collect()
}

#[derive(Serialize,Deserialize)]
#[derive(Serialize, Deserialize)]
pub struct TestSequence {
#[serde(rename(serialize = "TestConfiguration", deserialize = "TestConfiguration"))]
test_configurations: Vec<TestConfiguration>
test_configurations: Vec<TestConfiguration>,
}

impl TestSequence {
pub fn to_vec(mut self) -> Vec<TestConfiguration> {
// Generate Pov sizes

for config in self.test_configurations.iter_mut() {
config.pov_sizes = generate_pov_sizes(config.n_cores, config.min_pov_size, config.max_pov_size);
config.pov_sizes =
generate_pov_sizes(config.n_cores, config.min_pov_size, config.max_pov_size);
}

self.test_configurations
}
}

impl TestSequence {
impl TestSequence {
pub fn new_from_file(path: &Path) -> std::io::Result<TestSequence> {
let string = String::from_utf8(std::fs::read(&path)?).expect("File is valid UTF8");
Ok(toml::from_str(&string).expect("File is valid test sequence TOML"))
@@ -107,7 +107,8 @@ impl TestSequence {
impl TestConfiguration {
pub fn write_to_disk(&self) {
// Serialize a slice of configurations
let toml = toml::to_string(&TestSequence{ test_configurations: vec![self.clone()] }).unwrap();
let toml =
toml::to_string(&TestSequence { test_configurations: vec![self.clone()] }).unwrap();
std::fs::write("last_test.toml", toml).unwrap();
}

27 changes: 19 additions & 8 deletions polkadot/node/subsystem-bench/src/availability/mod.rs
Original file line number Diff line number Diff line change
@@ -748,13 +748,15 @@ pub async fn bench_chunk_recovery(env: &mut TestEnvironment) {
env.metrics().set_n_cores(config.n_cores);

for block_num in 0..env.config().num_blocks {
gum::info!(target: LOG_TARGET, "Current block {}/{}", block_num, env.config().num_blocks);
gum::info!(target: LOG_TARGET, "Current block {}/{}", block_num + 1, env.config().num_blocks);
env.metrics().set_current_block(block_num);

let block_start_ts = Instant::now();
for candidate_num in 0..config.n_cores as u64 {
let candidate =
env.state.next_candidate().expect("We always send up to n_cores*num_blocks; qed");
let candidate = env
.state
.next_candidate()
.expect("We always send up to n_cores*num_blocks; qed");
let (tx, rx) = oneshot::channel();
batch.push(rx);

@@ -769,7 +771,7 @@ pub async fn bench_chunk_recovery(env: &mut TestEnvironment) {
.await;
}

gum::info!("{}", format!("{} requests pending", batch.len()).bright_black());
gum::info!("{}", format!("{} recoveries pending", batch.len()).bright_black());
while let Some(completed) = batch.next().await {
let available_data = completed.unwrap().unwrap();
env.metrics().on_pov_size(available_data.encoded_size());
@@ -778,21 +780,26 @@ pub async fn bench_chunk_recovery(env: &mut TestEnvironment) {

let block_time_delta =
Duration::from_secs(6).saturating_sub(Instant::now().sub(block_start_ts));

let block_time = Instant::now().sub(block_start_ts).as_millis() as u64;
env.metrics().set_block_time(block_time);
gum::info!("Block time {}", format!("{:?}ms", block_time).cyan());
gum::info!(target: LOG_TARGET,"{}", format!("Sleeping till end of block ({}ms)", block_time_delta.as_millis()).bright_black());
tokio::time::sleep(block_time_delta).await;
}

env.send_signal(OverseerSignal::Conclude).await;
let duration = start_marker.elapsed().as_millis();
let availability_bytes = availability_bytes / 1024;
gum::info!("Benchmark completed in {}", format!("{:?}ms", duration).cyan());
gum::info!("All blocks processed in {}", format!("{:?}ms", duration).cyan());
gum::info!(
"Throughput: {}",
format!("{} KiB/block", availability_bytes / env.config().num_blocks as u128).bright_red()
);
gum::info!(
"Block time: {}",
format!("{} ms", start_marker.elapsed().as_millis() / env.config().num_blocks as u128).red()
format!("{} ms", start_marker.elapsed().as_millis() / env.config().num_blocks as u128)
.red()
);

let stats = env.network().stats();
@@ -812,9 +819,13 @@ pub async fn bench_chunk_recovery(env: &mut TestEnvironment) {
let test_metrics = super::core::display::parse_metrics(&env.registry());
let subsystem_cpu_metrics =
test_metrics.subset_with_label_value("task_group", "availability-recovery-subsystem");
gum::info!(target: LOG_TARGET, "Total subsystem CPU usage {}", format!("{:.2}s", subsystem_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum")).bright_purple());
let total_cpu = subsystem_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum");
gum::info!(target: LOG_TARGET, "Total subsystem CPU usage {}", format!("{:.2}s", total_cpu).bright_purple());
gum::info!(target: LOG_TARGET, "CPU usage per block {}", format!("{:.2}s", total_cpu/env.config().num_blocks as f64).bright_purple());

let test_env_cpu_metrics =
test_metrics.subset_with_label_value("task_group", "test-environment");
gum::info!(target: LOG_TARGET, "Total test environment CPU usage {}", format!("{:.2}s", test_env_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum")).bright_purple());
let total_cpu = test_env_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum");
gum::info!(target: LOG_TARGET, "Total test environment CPU usage {}", format!("{:.2}s", total_cpu).bright_purple());
gum::info!(target: LOG_TARGET, "CPU usage per block {}", format!("{:.2}s", total_cpu/env.config().num_blocks as f64).bright_purple());
}
2 changes: 1 addition & 1 deletion polkadot/node/subsystem-bench/src/core/mod.rs
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ const LOG_TARGET: &str = "subsystem-bench::core";
use polkadot_primitives::AuthorityDiscoveryId;
use sc_service::SpawnTaskHandle;

pub mod display;
pub mod keyring;
pub mod network;
pub mod test_env;
pub mod display;
8 changes: 6 additions & 2 deletions polkadot/node/subsystem-bench/src/core/network.rs
Original file line number Diff line number Diff line change
@@ -14,9 +14,11 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::*;
use colored::Colorize;
use prometheus_endpoint::U64;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::mpsc::UnboundedSender;

// An emulated node egress traffic rate_limiter.
#[derive(Debug)]
pub struct RateLimit {
@@ -282,6 +284,8 @@ impl NetworkEmulator {
spawn_task_handle: SpawnTaskHandle,
registry: &Registry,
) -> Self {
gum::info!(target: LOG_TARGET, "{}",format!("Initializing network emulation for {} peers.", n_peers).bright_blue());

let metrics = Metrics::new(&registry).expect("Metrics always register succesfully");
let mut validator_authority_id_mapping = HashMap::new();

@@ -337,8 +341,8 @@ impl NetworkEmulator {
}
}

use polkadot_node_subsystem_util::metrics::{
prometheus::{CounterVec, Opts, PrometheusError, Registry},
use polkadot_node_subsystem_util::metrics::prometheus::{
self, CounterVec, Opts, PrometheusError, Registry,
};

/// Emulated network metrics.
10 changes: 10 additions & 0 deletions polkadot/node/subsystem-bench/src/core/test_env.rs
Original file line number Diff line number Diff line change
@@ -31,6 +31,8 @@ pub struct TestEnvironmentMetrics {
pov_size: Histogram,
/// Current block
current_block: Gauge<U64>,
/// Time taken to complete all requests of a block, in milliseconds
block_time: Gauge<U64>,
}

impl TestEnvironmentMetrics {
@@ -58,6 +60,10 @@ impl TestEnvironmentMetrics {
Gauge::new("subsystem_benchmark_current_block", "The current test block")?,
registry,
)?,
block_time: prometheus::register(
Gauge::new("subsystem_benchmark_block_time", "The time it takes for the target subsystems(s) to complete all the requests in a block")?,
registry,
)?,
pov_size: prometheus::register(
Histogram::with_opts(
prometheus::HistogramOpts::new(
@@ -83,6 +89,10 @@ impl TestEnvironmentMetrics {
self.current_block.set(current_block as u64);
}

/// Stores the latest measured block time in the `block_time` gauge.
///
/// `block_time_ms` is the measured duration expressed in milliseconds; the
/// gauge is overwritten with this value on every call.
pub fn set_block_time(&self, block_time_ms: u64) {
    let block_time_gauge = &self.block_time;
    block_time_gauge.set(block_time_ms);
}

/// Adds one PoV size sample to the `pov_size` histogram.
///
/// The size is converted to `f64` before being handed to the histogram.
pub fn on_pov_size(&self, pov_size: usize) {
    let sample = pov_size as f64;
    self.pov_size.observe(sample);
}
41 changes: 24 additions & 17 deletions polkadot/node/subsystem-bench/src/subsystem-bench.rs
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ use clap::Parser;
use color_eyre::eyre;

use colored::Colorize;
use std::{time::Duration, path::Path};
use std::{path::Path, time::Duration};

pub(crate) mod availability;
pub(crate) mod core;
@@ -82,7 +82,6 @@ pub struct DataAvailabilityReadOptions {
pub num_blocks: usize,
}


#[derive(Debug, clap::Parser)]
#[clap(rename_all = "kebab-case")]
#[allow(missing_docs)]
@@ -91,12 +90,10 @@ pub struct TestSequenceOptions {
pub path: String,
}



/// Define the supported benchmarks targets
#[derive(Debug, Parser)]
#[command(about = "Target subsystems", version, rename_all = "kebab-case")]
enum BenchmarkTarget {
#[command(about = "Test objectives", version, rename_all = "kebab-case")]
enum TestObjective {
/// Benchmark availability recovery strategies.
DataAvailabilityRead(DataAvailabilityReadOptions),
/// Run a test sequence specified in a file
@@ -131,7 +128,7 @@ struct BenchCli {
pub peer_max_latency: Option<u64>,

#[command(subcommand)]
pub target: BenchmarkTarget,
pub objective: TestObjective,
}

fn new_runtime() -> tokio::runtime::Runtime {
@@ -150,25 +147,35 @@ impl BenchCli {

let runtime = new_runtime();

let mut test_config = match self.target {
BenchmarkTarget::TestSequence(options) => {
let test_sequence = availability::TestSequence::new_from_file(Path::new(&options.path)).expect("File exists").to_vec();
let mut test_config = match self.objective {
TestObjective::TestSequence(options) => {
let test_sequence =
availability::TestSequence::new_from_file(Path::new(&options.path))
.expect("File exists")
.to_vec();
let num_steps = test_sequence.len();
gum::info!("{}", format!("Sequence contains {} step(s)",num_steps).bright_purple());
for (index, test_config) in test_sequence.into_iter().enumerate(){
gum::info!("{}", format!("Current step {}/{}", index + 1, num_steps).bright_purple());
gum::info!(
"{}",
format!("Sequence contains {} step(s)", num_steps).bright_purple()
);
for (index, test_config) in test_sequence.into_iter().enumerate() {
gum::info!(
"{}",
format!("Current step {}/{}", index + 1, num_steps).bright_purple()
);

let candidate_count = test_config.n_cores * test_config.num_blocks;

let mut state = TestState::new(test_config);
state.generate_candidates(candidate_count);
let mut env = TestEnvironment::new(runtime.handle().clone(), state, Registry::new());

let mut env =
TestEnvironment::new(runtime.handle().clone(), state, Registry::new());

runtime.block_on(availability::bench_chunk_recovery(&mut env));
}
return Ok(())
}
BenchmarkTarget::DataAvailabilityRead(options) => match self.network {
},
TestObjective::DataAvailabilityRead(options) => match self.network {
NetworkEmulation::Healthy => TestConfiguration::healthy_network(
options.num_blocks,
options.fetch_from_backers,
Loading