Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(en): Support Merkle tree recovery with pruning enabled #3172

Merged
6 commits merged on Nov 6, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Check pruning info during recovery
slowli committed Oct 24, 2024

Verified

This commit was signed with the committer’s verified signature. The key has expired.
tdmorello Tim Morello
commit 3067f0302edc66e6b782ed33c7eb0e01e1c8d649
31 changes: 28 additions & 3 deletions core/node/metadata_calculator/src/recovery/mod.rs
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@ use std::{
};

use anyhow::Context as _;
use async_trait::async_trait;
use futures::future;
use tokio::sync::{watch, Mutex, Semaphore};
use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
@@ -51,12 +52,13 @@ mod tests;

/// Handler of recovery life cycle events. This functionality is encapsulated in a trait to be able
/// to control recovery behavior in tests.
#[async_trait]
trait HandleRecoveryEvent: fmt::Debug + Send + Sync {
fn recovery_started(&mut self, _chunk_count: u64, _recovered_chunk_count: u64) {
// Default implementation does nothing
}

fn chunk_recovered(&self) {
async fn chunk_recovered(&self) {
// Default implementation does nothing
}
}
@@ -79,6 +81,7 @@ impl<'a> RecoveryHealthUpdater<'a> {
}
}

#[async_trait]
impl HandleRecoveryEvent for RecoveryHealthUpdater<'_> {
fn recovery_started(&mut self, chunk_count: u64, recovered_chunk_count: u64) {
self.chunk_count = chunk_count;
@@ -88,7 +91,7 @@ impl HandleRecoveryEvent for RecoveryHealthUpdater<'_> {
.set(recovered_chunk_count);
}

fn chunk_recovered(&self) {
async fn chunk_recovered(&self) {
let recovered_chunk_count = self.recovered_chunk_count.fetch_add(1, Ordering::SeqCst) + 1;
let chunks_left = self.chunk_count.saturating_sub(recovered_chunk_count);
tracing::info!(
@@ -294,7 +297,7 @@ impl AsyncTreeRecovery {
if Self::recover_key_chunk(&tree, init_params.l2_block, chunk, pool, stop_receiver)
.await?
{
options.events.chunk_recovered();
options.events.chunk_recovered().await;
}
anyhow::Ok(())
});
@@ -317,6 +320,11 @@ impl AsyncTreeRecovery {
);
}

// Check pruning info one last time before finalizing the tree.
let mut storage = pool.connection_tagged("metadata_calculator").await?;
Self::check_pruning_info(&mut storage, init_params.l2_block).await?;
drop(storage);

let tree = tree.finalize().await?;
finalize_latency.observe();
tracing::info!(
@@ -371,6 +379,21 @@ impl AsyncTreeRecovery {
Ok(output)
}

/// Verifies that storage pruning has not progressed past the snapshot L2 block that the
/// tree is being recovered at. Recovery reads tree entries at `snapshot_l2_block`, so if
/// additional blocks were hard-pruned concurrently, the recovered tree would be incomplete;
/// in that case this returns an error instructing the operator how to restart recovery.
async fn check_pruning_info(
    storage: &mut Connection<'_, Core>,
    snapshot_l2_block: L2BlockNumber,
) -> anyhow::Result<()> {
    let pruning_info = storage.pruning_dal().get_pruning_info().await?;
    match pruning_info.last_hard_pruned_l2_block {
        // Pruning exactly at the snapshot block is expected (that's how snapshot recovery
        // initializes Postgres); anything else means data needed for recovery is gone.
        Some(pruned_l2_block) if pruned_l2_block != snapshot_l2_block => anyhow::bail!(
            "Additional data was pruned compared to tree recovery L2 block #{snapshot_l2_block}: {pruning_info:?}. \
            Continuing recovery is impossible; to recover the tree, drop its RocksDB directory, stop pruning and restart recovery"
        ),
        _ => Ok(()),
    }
}

/// Returns `Ok(true)` if the chunk was recovered, `Ok(false)` if the recovery process was interrupted.
async fn recover_key_chunk(
tree: &Mutex<AsyncTreeRecovery>,
@@ -394,7 +417,9 @@ impl AsyncTreeRecovery {
.storage_logs_dal()
.get_tree_entries_for_l2_block(snapshot_l2_block, key_chunk.clone())
.await?;
Self::check_pruning_info(&mut storage, snapshot_l2_block).await?;
drop(storage);

let entries_latency = entries_latency.observe();
tracing::debug!(
"Loaded {} entries for chunk {key_chunk:?} in {entries_latency:?}",
55 changes: 54 additions & 1 deletion core/node/metadata_calculator/src/recovery/tests.rs
Original file line number Diff line number Diff line change
@@ -207,12 +207,13 @@ impl TestEventListener {
}
}

#[async_trait]
impl HandleRecoveryEvent for TestEventListener {
fn recovery_started(&mut self, _chunk_count: u64, recovered_chunk_count: u64) {
assert_eq!(recovered_chunk_count, self.expected_recovered_chunks);
}

fn chunk_recovered(&self) {
async fn chunk_recovered(&self) {
let processed_chunk_count = self.processed_chunk_count.fetch_add(1, Ordering::SeqCst) + 1;
if processed_chunk_count >= self.stop_threshold {
self.stop_sender.send_replace(true);
@@ -491,3 +492,55 @@ async fn recovery_with_further_pruning(pruned_batches: u32) {
let (calculator, _) = setup_calculator(temp_dir.path(), pool, true).await;
assert_eq!(run_calculator(calculator).await, expected_root_hash);
}

/// Test event listener that emulates storage pruning racing with tree recovery:
/// it prunes the storage after every recovered chunk (see its `chunk_recovered()` impl).
#[derive(Debug)]
struct PruningEventListener {
    // Connection pool to the storage being pruned.
    pool: ConnectionPool<Core>,
    // L1 batch up to which storage is pruned on each recovered chunk.
    pruned_l1_batch: L1BatchNumber,
}

#[async_trait]
impl HandleRecoveryEvent for PruningEventListener {
    async fn chunk_recovered(&self) {
        // Prune storage after each recovered chunk to emulate pruning running
        // concurrently with recovery; recovery is expected to detect this and bail.
        prune_storage(&self.pool, self.pruned_l1_batch).await;
    }
}

/// Checks that pruning data past the recovery snapshot block while recovery is in
/// progress aborts recovery with a descriptive error instead of silently producing
/// an incomplete tree.
#[tokio::test]
async fn pruning_during_recovery_is_detected() {
    let pool = ConnectionPool::<Core>::test_pool().await;
    let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB");

    // Seed Postgres with genesis + several L1 batches, then prune up to batch #1
    // so that recovery starts from a pruned snapshot.
    {
        let mut conn = pool.connection().await.unwrap();
        insert_genesis_batch(&mut conn, &GenesisParams::mock())
            .await
            .unwrap();
        extend_db_state(&mut conn, gen_storage_logs(200..400, 5)).await;
    }
    prune_storage(&pool, L1BatchNumber(1)).await;

    let config = MetadataCalculatorRecoveryConfig::default();
    let recovery_db_path = temp_dir.path().join("recovery");
    let (tree, _) = create_tree_recovery(&recovery_db_path, L1BatchNumber(1), &config).await;
    let (_stop_sender, stop_receiver) = watch::channel(false);
    // The listener prunes up to batch #3 after every recovered chunk, invalidating
    // the snapshot mid-recovery.
    let recovery_options = RecoveryOptions {
        chunk_count: 5,
        concurrency_limit: 1,
        events: Box::new(PruningEventListener {
            pool: pool.clone(),
            pruned_l1_batch: L1BatchNumber(3),
        }),
    };
    let init_params = InitParameters::new(&pool, &config)
        .await
        .unwrap()
        .expect("no init params");

    let err = tree
        .recover(init_params, recovery_options, &pool, &stop_receiver)
        .await
        .unwrap_err();
    // `{err:#}` renders the full anyhow context chain; match case-insensitively.
    let err = format!("{err:#}").to_lowercase();
    assert!(err.contains("continuing recovery is impossible"), "{err}");
}