Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Implement prune only stagnant check mode #5761

Merged
merged 11 commits into from
Jul 9, 2022
62 changes: 57 additions & 5 deletions node/core/chain-selection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type Timestamp = u64;
// If a block isn't approved in 120 seconds, nodes will abandon it
// and begin building on another chain.
const STAGNANT_TIMEOUT: Timestamp = 120;
// Delay prunning of the stagnant keys in prune only mode by 25 hours to avoid interception with the finality
const STAGNANT_PRUNE_DELAY: Timestamp = 25 * 60 * 60;
// Maximum number of stagnant entries cleaned during one `STAGNANT_TIMEOUT` iteration
const MAX_STAGNANT_ENTRIES: usize = 1000;

Expand Down Expand Up @@ -297,13 +299,28 @@ impl StagnantCheckInterval {
}
}

/// Mode of the stagnant check operations: check and prune or prune only
#[derive(Debug, Clone)]
pub enum StagnantCheckMode {
CheckAndPrune,
PruneOnly,
}

impl Default for StagnantCheckMode {
fn default() -> Self {
StagnantCheckMode::PruneOnly
}
}

/// Configuration for the chain selection subsystem.
#[derive(Debug, Clone)]
pub struct Config {
/// The column in the database that the storage should use.
pub col_data: u32,
/// How often to check for stagnant blocks.
pub stagnant_check_interval: StagnantCheckInterval,
/// Mode of stagnant checks
pub stagnant_check_mode: StagnantCheckMode,
}

/// The chain selection subsystem.
Expand Down Expand Up @@ -340,9 +357,15 @@ impl<Context> ChainSelectionSubsystem {
);

SpawnedSubsystem {
future: run(ctx, backend, self.config.stagnant_check_interval, Box::new(SystemClock))
.map(Ok)
.boxed(),
future: run(
ctx,
backend,
self.config.stagnant_check_interval,
self.config.stagnant_check_mode,
Box::new(SystemClock),
)
.map(Ok)
.boxed(),
name: "chain-selection-subsystem",
}
}
Expand All @@ -353,12 +376,20 @@ async fn run<Context, B>(
mut ctx: Context,
mut backend: B,
stagnant_check_interval: StagnantCheckInterval,
stagnant_check_mode: StagnantCheckMode,
clock: Box<dyn Clock + Send + Sync>,
) where
B: Backend,
{
loop {
let res = run_until_error(&mut ctx, &mut backend, &stagnant_check_interval, &*clock).await;
let res = run_until_error(
&mut ctx,
&mut backend,
&stagnant_check_interval,
&stagnant_check_mode,
&*clock,
)
.await;
match res {
Err(e) => {
e.trace();
Expand All @@ -383,6 +414,7 @@ async fn run_until_error<Context, B>(
ctx: &mut Context,
backend: &mut B,
stagnant_check_interval: &StagnantCheckInterval,
stagnant_check_mode: &StagnantCheckMode,
clock: &(dyn Clock + Sync),
) -> Result<(), Error>
where
Expand Down Expand Up @@ -437,7 +469,13 @@ where
}
}
_ = stagnant_check_stream.next().fuse() => {
detect_stagnant(backend, clock.timestamp_now(), MAX_STAGNANT_ENTRIES)?;
match stagnant_check_mode {
StagnantCheckMode::CheckAndPrune => detect_stagnant(backend, clock.timestamp_now(), MAX_STAGNANT_ENTRIES),
StagnantCheckMode::PruneOnly => {
let now_timestamp = clock.timestamp_now();
prune_only_stagnant(backend, now_timestamp - STAGNANT_PRUNE_DELAY, MAX_STAGNANT_ENTRIES)
},
}?;
}
}
}
Expand Down Expand Up @@ -653,6 +691,20 @@ fn detect_stagnant(
backend.write(ops)
}

fn prune_only_stagnant(
backend: &mut impl Backend,
up_to: Timestamp,
max_elements: usize,
) -> Result<(), Error> {
let ops = {
let overlay = tree::prune_only_stagnant(&*backend, up_to, max_elements)?;

overlay.into_write_ops()
};

backend.write(ops)
}

// Load the leaves from the backend. If there are no leaves, then return
// the finalized block.
async fn load_leaves(
Expand Down
1 change: 1 addition & 0 deletions node/core/chain-selection/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
context,
backend.clone(),
StagnantCheckInterval::new(TEST_STAGNANT_INTERVAL),
StagnantCheckMode::CheckAndPrune,
Box::new(clock.clone()),
);

Expand Down
35 changes: 34 additions & 1 deletion node/core/chain-selection/src/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ pub(super) fn detect_stagnant<'a, B: 'a + Backend>(
?up_to,
?min_ts,
?max_ts,
"Prepared {} stagnant entries for pruning",
"Prepared {} stagnant entries for checking/pruning",
stagnant_up_to.len()
);

Expand Down Expand Up @@ -594,6 +594,39 @@ pub(super) fn detect_stagnant<'a, B: 'a + Backend>(
Ok(backend)
}

/// Prune stagnant entries at some timestamp without other checks
/// This function is intended just to clean leftover entries when the real
/// stagnant checks are disabled
pub(super) fn prune_only_stagnant<'a, B: 'a + Backend>(
backend: &'a B,
up_to: Timestamp,
max_elements: usize,
) -> Result<OverlayedBackend<'a, B>, Error> {
let stagnant_up_to = backend.load_stagnant_at_up_to(up_to, max_elements)?;
let mut backend = OverlayedBackend::new(backend);

let (min_ts, max_ts) = match stagnant_up_to.len() {
0 => (0 as Timestamp, 0 as Timestamp),
1 => (stagnant_up_to[0].0, stagnant_up_to[0].0),
n => (stagnant_up_to[0].0, stagnant_up_to[n - 1].0),
};

gum::debug!(
target: LOG_TARGET,
?up_to,
?min_ts,
?max_ts,
"Prepared {} stagnant entries for pruning",
stagnant_up_to.len()
);

for (timestamp, _) in stagnant_up_to {
backend.delete_stagnant_at(timestamp);
}

Ok(backend)
}

/// Revert the tree to the block relative to `hash`.
///
/// This accepts a fresh backend and returns an overlay on top of it representing
Expand Down
4 changes: 3 additions & 1 deletion node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,8 @@ where

let chain_selection_config = ChainSelectionConfig {
col_data: parachains_db::REAL_COLUMNS.col_chain_selection_data,
stagnant_check_interval: chain_selection_subsystem::StagnantCheckInterval::never(),
stagnant_check_interval: Default::default(),
stagnant_check_mode: chain_selection_subsystem::StagnantCheckMode::PruneOnly,
};

let dispute_coordinator_config = DisputeCoordinatorConfig {
Expand Down Expand Up @@ -1477,6 +1478,7 @@ fn revert_chain_selection(db: Arc<dyn Database>, hash: Hash) -> sp_blockchain::R
let config = chain_selection_subsystem::Config {
col_data: parachains_db::REAL_COLUMNS.col_chain_selection_data,
stagnant_check_interval: chain_selection_subsystem::StagnantCheckInterval::never(),
stagnant_check_mode: chain_selection_subsystem::StagnantCheckMode::PruneOnly,
ordian marked this conversation as resolved.
Show resolved Hide resolved
};

let chain_selection = chain_selection_subsystem::ChainSelectionSubsystem::new(config, db);
Expand Down