This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Refactor PVF preparation memory stats #6693

Merged · 7 commits · Feb 14, 2023
Changes from 6 commits
7 changes: 4 additions & 3 deletions node/core/candidate-validation/src/lib.rs
@@ -24,7 +24,8 @@
#![warn(missing_docs)]

use polkadot_node_core_pvf::{
InvalidCandidate as WasmInvalidCandidate, PrepareError, Pvf, ValidationError, ValidationHost,
InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, Pvf, ValidationError,
ValidationHost,
};
use polkadot_node_primitives::{
BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT,
@@ -654,7 +655,7 @@ trait ValidationBackend {
validation_result
}

async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError>;
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<PrepareStats, PrepareError>;
}

#[async_trait]
@@ -680,7 +681,7 @@ impl ValidationBackend for ValidationHost {
.map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?
}

async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError> {
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<PrepareStats, PrepareError> {
let (tx, rx) = oneshot::channel();
if let Err(err) = self.precheck_pvf(pvf, tx).await {
// Return an IO error if there was an error communicating with the host.
12 changes: 6 additions & 6 deletions node/core/candidate-validation/src/tests.rs
@@ -377,7 +377,7 @@ impl ValidationBackend for MockValidateCandidateBackend {
result
}

async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<Duration, PrepareError> {
async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<PrepareStats, PrepareError> {
unreachable!()
}
}
@@ -894,11 +894,11 @@ fn pov_decompression_failure_is_invalid() {
}

struct MockPreCheckBackend {
result: Result<Duration, PrepareError>,
result: Result<PrepareStats, PrepareError>,
}

impl MockPreCheckBackend {
fn with_hardcoded_result(result: Result<Duration, PrepareError>) -> Self {
fn with_hardcoded_result(result: Result<PrepareStats, PrepareError>) -> Self {
Self { result }
}
}
@@ -914,7 +914,7 @@ impl ValidationBackend for MockPreCheckBackend {
unreachable!()
}

async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<Duration, PrepareError> {
async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<PrepareStats, PrepareError> {
self.result.clone()
}
}
@@ -931,7 +931,7 @@ fn precheck_works() {

let (check_fut, check_result) = precheck_pvf(
ctx.sender(),
MockPreCheckBackend::with_hardcoded_result(Ok(Duration::default())),
MockPreCheckBackend::with_hardcoded_result(Ok(PrepareStats::default())),
relay_parent,
validation_code_hash,
)
@@ -977,7 +977,7 @@ fn precheck_invalid_pvf_blob_compression() {

let (check_fut, check_result) = precheck_pvf(
ctx.sender(),
MockPreCheckBackend::with_hardcoded_result(Ok(Duration::default())),
MockPreCheckBackend::with_hardcoded_result(Ok(PrepareStats::default())),
relay_parent,
validation_code_hash,
)
10 changes: 5 additions & 5 deletions node/core/pvf/src/artifacts.rs
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::{error::PrepareError, host::PrepareResultSender};
use crate::{error::PrepareError, host::PrepareResultSender, prepare::PrepareStats};
use always_assert::always;
use polkadot_parachain::primitives::ValidationCodeHash;
use std::{
@@ -101,8 +101,8 @@ pub enum ArtifactState {
/// This is updated when we get the heads up for this artifact or when we just discover
/// this file.
last_time_needed: SystemTime,
/// The CPU time that was taken preparing this artifact.
cpu_time_elapsed: Duration,
/// Stats produced by successful preparation.
prepare_stats: PrepareStats,
},
/// A task to prepare this artifact is scheduled.
Preparing {
@@ -177,12 +177,12 @@ impl Artifacts {
&mut self,
artifact_id: ArtifactId,
last_time_needed: SystemTime,
cpu_time_elapsed: Duration,
prepare_stats: PrepareStats,
) {
// See the precondition.
always!(self
.artifacts
.insert(artifact_id, ArtifactState::Prepared { last_time_needed, cpu_time_elapsed })
.insert(artifact_id, ArtifactState::Prepared { last_time_needed, prepare_stats })
.is_none());
}

7 changes: 4 additions & 3 deletions node/core/pvf/src/error.rs
@@ -14,12 +14,13 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::prepare::PrepareStats;
use parity_scale_codec::{Decode, Encode};
use std::{any::Any, fmt, time::Duration};
use std::{any::Any, fmt};

/// Result of PVF preparation performed by the validation host. Contains the elapsed CPU time if
/// Result of PVF preparation performed by the validation host. Contains stats about the preparation if
/// successful
pub type PrepareResult = Result<Duration, PrepareError>;
pub type PrepareResult = Result<PrepareStats, PrepareError>;

/// An error that occurred during the prepare part of the PVF pipeline.
#[derive(Debug, Clone, Encode, Decode)]
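Note: the new PrepareStats and MemoryStats types are defined under node/core/pvf/src/prepare/, which this diff does not show. The sketch below is a rough reconstruction of their shape, inferred only from how they are used in the hunks on this page (PrepareStats::default(), prepare_stats.clone(), memory_stats.max_rss, and memory_stats.memory_tracker_stats with resident and allocated fields); exact field names, integer widths, and derives beyond what that usage implies are assumptions.

// Sketch only: the real definitions live in node/core/pvf/src/prepare/ and
// are not part of this diff. Field names are inferred from usage.
use std::time::Duration;

/// Stats returned on successful preparation, replacing the bare `Duration`
/// (elapsed CPU time) that `PrepareResult` previously carried.
#[derive(Debug, Clone, Default)]
pub struct PrepareStats {
    /// CPU time taken by the preparation job (the old return value).
    pub cpu_time_elapsed: Duration,
    /// Memory statistics gathered during preparation (assumed field name).
    pub memory_stats: MemoryStats,
}

/// Memory statistics for one preparation job. Each source is only available
/// on some platforms, hence the `Option`s and the `cfg` gates in `metrics.rs`.
#[derive(Debug, Clone, Default)]
pub struct MemoryStats {
    /// Peak resident set size (`ru_maxrss` from `getrusage`), in kilobytes; Linux only.
    pub max_rss: Option<i64>,
    /// Peak resident/allocated bytes reported by the memory tracker thread
    /// (Linux, or with the `jemalloc-allocator` feature).
    pub memory_tracker_stats: Option<MemoryAllocationStats>,
}

/// Peak allocation figures from the memory tracker, in bytes.
#[derive(Debug, Clone, Default)]
pub struct MemoryAllocationStats {
    pub resident: u64,
    pub allocated: u64,
}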
30 changes: 17 additions & 13 deletions node/core/pvf/src/host.rs
@@ -456,9 +456,9 @@ async fn handle_precheck_pvf(

if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state {
ArtifactState::Prepared { last_time_needed, cpu_time_elapsed } => {
ArtifactState::Prepared { last_time_needed, prepare_stats } => {
*last_time_needed = SystemTime::now();
let _ = result_sender.send(Ok(*cpu_time_elapsed));
let _ = result_sender.send(Ok(prepare_stats.clone()));
},
ArtifactState::Preparing { waiting_for_response, num_failures: _ } =>
waiting_for_response.push(result_sender),
@@ -725,8 +725,8 @@ async fn handle_prepare_done(
}

*state = match result {
Ok(cpu_time_elapsed) =>
ArtifactState::Prepared { last_time_needed: SystemTime::now(), cpu_time_elapsed },
Ok(prepare_stats) =>
ArtifactState::Prepared { last_time_needed: SystemTime::now(), prepare_stats },
Err(error) => {
let last_time_failed = SystemTime::now();
let num_failures = *num_failures + 1;
@@ -834,7 +834,7 @@ fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()>
#[cfg(test)]
mod tests {
use super::*;
use crate::{InvalidCandidate, PrepareError};
use crate::{prepare::PrepareStats, InvalidCandidate, PrepareError};
use assert_matches::assert_matches;
use futures::future::BoxFuture;

@@ -1056,8 +1056,12 @@ mod tests {
let mut builder = Builder::default();
builder.cleanup_pulse_interval = Duration::from_millis(100);
builder.artifact_ttl = Duration::from_millis(500);
builder.artifacts.insert_prepared(artifact_id(1), mock_now, Duration::default());
builder.artifacts.insert_prepared(artifact_id(2), mock_now, Duration::default());
builder
.artifacts
.insert_prepared(artifact_id(1), mock_now, PrepareStats::default());
builder
.artifacts
.insert_prepared(artifact_id(2), mock_now, PrepareStats::default());
let mut test = builder.build();
let mut host = test.host_handle();

@@ -1129,7 +1133,7 @@ mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
result: Ok(PrepareStats::default()),
})
.await
.unwrap();
@@ -1145,7 +1149,7 @@ mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(2),
result: Ok(Duration::default()),
result: Ok(PrepareStats::default()),
})
.await
.unwrap();
@@ -1197,7 +1201,7 @@ mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
result: Ok(PrepareStats::default()),
})
.await
.unwrap();
@@ -1304,7 +1308,7 @@ mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(2),
result: Ok(Duration::default()),
result: Ok(PrepareStats::default()),
})
.await
.unwrap();
@@ -1454,7 +1458,7 @@ mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
result: Ok(PrepareStats::default()),
})
.await
.unwrap();
@@ -1630,7 +1634,7 @@ mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
result: Ok(PrepareStats::default()),
})
.await
.unwrap();
1 change: 1 addition & 0 deletions node/core/pvf/src/lib.rs
@@ -108,6 +108,7 @@ pub mod testing;
pub use sp_tracing;

pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError};
pub use prepare::PrepareStats;
pub use priority::Priority;
pub use pvf::Pvf;

37 changes: 22 additions & 15 deletions node/core/pvf/src/metrics.rs
@@ -16,6 +16,7 @@

//! Prometheus metrics related to the validation host.

use crate::prepare::MemoryStats;
use polkadot_node_metrics::metrics::{self, prometheus};

/// Validation host metrics.
@@ -73,24 +74,24 @@ impl Metrics {
self.0.as_ref().map(|metrics| metrics.execution_time.start_timer())
}

/// Observe max_rss for preparation.
pub(crate) fn observe_preparation_max_rss(&self, max_rss: f64) {
/// Observe memory stats for preparation.
#[allow(unused_variables)]
pub(crate) fn observe_preparation_memory_metrics(&self, memory_stats: MemoryStats) {
if let Some(metrics) = &self.0 {
metrics.preparation_max_rss.observe(max_rss);
}
}
#[cfg(target_os = "linux")]
if let Some(max_rss) = memory_stats.max_rss {
metrics.preparation_max_rss.observe(max_rss as f64);
}

/// Observe max resident memory for preparation.
pub(crate) fn observe_preparation_max_resident(&self, max_resident_kb: f64) {
if let Some(metrics) = &self.0 {
metrics.preparation_max_resident.observe(max_resident_kb);
}
}
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
if let Some(tracker_stats) = memory_stats.memory_tracker_stats {
// We convert these stats from B to KB to match the unit of `ru_maxrss` from `getrusage`.
let max_resident_kb = (tracker_stats.resident / 1024) as f64;
let max_allocated_kb = (tracker_stats.allocated / 1024) as f64;

/// Observe max allocated memory for preparation.
pub(crate) fn observe_preparation_max_allocated(&self, max_allocated_kb: f64) {
if let Some(metrics) = &self.0 {
metrics.preparation_max_allocated.observe(max_allocated_kb);
metrics.preparation_max_resident.observe(max_resident_kb);
metrics.preparation_max_allocated.observe(max_allocated_kb);
}
}
}
}
Expand All @@ -106,8 +107,11 @@ struct MetricsInner {
execute_finished: prometheus::Counter<prometheus::U64>,
preparation_time: prometheus::Histogram,
execution_time: prometheus::Histogram,
#[cfg(target_os = "linux")]
preparation_max_rss: prometheus::Histogram,
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
preparation_max_allocated: prometheus::Histogram,
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
preparation_max_resident: prometheus::Histogram,
}

@@ -226,6 +230,7 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
#[cfg(target_os = "linux")]
preparation_max_rss: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
@@ -238,6 +243,7 @@
)?,
registry,
)?,
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
preparation_max_resident: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
@@ -250,6 +256,7 @@
)?,
registry,
)?,
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
preparation_max_allocated: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
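The call site for the new observe_preparation_memory_metrics method is outside this diff. As illustration only, here is a minimal sketch of how the host side could feed a finished preparation result into these metrics; the on_prepare_result helper and the memory_stats field on PrepareStats are assumptions, not part of the visible changes.

// Hypothetical glue code (not in this diff): record memory metrics when a
// preparation result arrives, before caching the artifact as prepared.
fn on_prepare_result(metrics: &Metrics, result: &PrepareResult) {
    if let Ok(prepare_stats) = result {
        // `observe_preparation_memory_metrics` takes `MemoryStats` by value,
        // so clone the stats out of the borrowed result.
        metrics.observe_preparation_memory_metrics(prepare_stats.memory_stats.clone());
    }
}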