fix(en): Fix race condition in EN storage initialization #3515

Merged
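Judging from the diff alone, the fix has two parts: the reorg detector now reads the local L1 batch and L2 block numbers inside a single read-only transaction, so the two queries can no longer observe different storage snapshots while batches are being sealed, and EN storage initialization now only asks whether a reorg exists at all (`RevertStorage::is_reorg_needed`, backed by the new `ReorgDetector::check_reorg_presence`) instead of binary-searching for the last correct batch via `last_correct_batch_for_reorg`. Below is a minimal sketch of the consistent-read pattern, using the DAL calls visible in the diff; the import paths and the `ConnectionPool<Core>` type are assumptions rather than something this PR shows.

```rust
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_types::{L1BatchNumber, L2BlockNumber};

// Sketch only: read both "how far is local storage" numbers under one read-only
// transaction so they describe the same snapshot.
async fn local_chain_state(
    pool: &ConnectionPool<Core>,
) -> anyhow::Result<(Option<L1BatchNumber>, Option<L2BlockNumber>)> {
    let mut storage = pool.connection().await?;
    let mut tx = storage
        .transaction_builder()?
        .set_readonly()
        .build()
        .await?;
    let l1_batch = tx
        .blocks_dal()
        .get_last_l1_batch_number_with_tree_data()
        .await?;
    let l2_block = tx.blocks_dal().get_sealed_l2_block_number().await?;
    drop(tx); // read-only transaction: nothing to commit
    Ok((l1_batch, l2_block))
}
```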
core/node/node_storage_init/src/external_node/revert.rs (6 additions, 0 deletions)
@@ -34,6 +34,12 @@ impl RevertStorage for ExternalNodeReverter {
Ok(())
}

async fn is_reorg_needed(&self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<bool> {
ReorgDetector::new(self.client.clone(), self.pool.clone())
.check_reorg_presence(stop_receiver)
.await
}

async fn last_correct_batch_for_reorg(
&self,
stop_receiver: watch::Receiver<bool>,
core/node/node_storage_init/src/lib.rs (1 addition, 4 deletions)
@@ -182,10 +182,7 @@ impl NodeStorageInitializer {
) -> anyhow::Result<bool> {
// May be `true` if stop signal is received, but the node will shut down without launching any tasks anyway.
let initialized = if let Some(reverter) = &self.strategy.block_reverter {
reverter
.last_correct_batch_for_reorg(stop_receiver)
.await?
.is_none()
!reverter.is_reorg_needed(stop_receiver).await?
} else {
true
};
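The initialization check only needs a yes/no answer, not the exact batch to roll back to, which is what the one-line change above captures. Here is an illustrative comparison of the old and new predicates, written as free functions generic over the `RevertStorage` trait from `traits.rs` below (the helper names are made up for this sketch, and the trait is assumed to be in scope):

```rust
use tokio::sync::watch;

// Old shape: run the full search for the last correct batch and only look at
// whether it found anything.
async fn was_initialized<R: RevertStorage>(
    reverter: &R,
    stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<bool> {
    Ok(reverter
        .last_correct_batch_for_reorg(stop_receiver)
        .await?
        .is_none())
}

// New shape: ask the cheaper presence question directly.
async fn is_initialized<R: RevertStorage>(
    reverter: &R,
    stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<bool> {
    Ok(!reverter.is_reorg_needed(stop_receiver).await?)
}
```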
core/node/node_storage_init/src/traits.rs (3 additions, 0 deletions)
@@ -18,6 +18,9 @@ pub trait InitializeStorage: fmt::Debug + Send + Sync + 'static {
/// This trait assumes that for any invalid state there exists a batch number to which the storage can be rolled back.
#[async_trait::async_trait]
pub trait RevertStorage: fmt::Debug + Send + Sync + 'static {
/// Checks whether a reorg is needed for the storage.
async fn is_reorg_needed(&self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<bool>;

/// Checks if the storage is invalid state and has to be rolled back.
async fn last_correct_batch_for_reorg(
&self,
core/node/reorg_detector/src/lib.rs (66 additions, 20 deletions)
@@ -266,26 +266,32 @@ impl ReorgDetector {
&self.health_check
}

async fn check_consistency(&mut self) -> Result<(), Error> {
async fn find_last_diverged_batch(&mut self) -> Result<Option<L1BatchNumber>, HashMatchError> {
let mut storage = self.pool.connection().await?;
let Some(local_l1_batch) = storage
// Create a readonly transaction to get a consistent view of the storage.
let mut storage_tx = storage
.transaction_builder()?
.set_readonly()
.build()
.await?;
let Some(local_l1_batch) = storage_tx
.blocks_dal()
.get_last_l1_batch_number_with_tree_data()
.await?
else {
return Ok(());
return Ok(None);
};
let Some(local_l2_block) = storage.blocks_dal().get_sealed_l2_block_number().await? else {
return Ok(());
let Some(local_l2_block) = storage_tx.blocks_dal().get_sealed_l2_block_number().await?
else {
return Ok(None);
};
drop(storage_tx);
drop(storage);

let remote_l1_batch = self.client.sealed_l1_batch_number().await?;
let remote_l2_block = self.client.sealed_l2_block_number().await?;

let checked_l1_batch = local_l1_batch.min(remote_l1_batch);
let checked_l2_block = local_l2_block.min(remote_l2_block);

let root_hashes_match = self.root_hashes_match(checked_l1_batch).await?;
let l2_block_hashes_match = self.l2_block_hashes_match(checked_l2_block).await?;

@@ -295,13 +301,21 @@ impl ReorgDetector {
// In other cases either there is only a height mismatch which means that one of
// the nodes needs to do catching up; however, it is not certain that there is actually
// a re-org taking place.
if root_hashes_match && l2_block_hashes_match {
Ok(if root_hashes_match && l2_block_hashes_match {
self.event_handler
.update_correct_block(checked_l2_block, checked_l1_batch);
None
} else {
let diverged_l1_batch = checked_l1_batch + (root_hashes_match as u32);
self.event_handler.report_divergence(diverged_l1_batch);
Some(diverged_l1_batch)
})
}

async fn check_consistency(&mut self) -> Result<(), Error> {
let Some(diverged_l1_batch) = self.find_last_diverged_batch().await? else {
return Ok(());
}
let diverged_l1_batch = checked_l1_batch + (root_hashes_match as u32);
self.event_handler.report_divergence(diverged_l1_batch);
};

// Check that the first L1 batch matches, to make sure that
// we are actually tracking the same chain as the main node.
@@ -455,15 +469,7 @@
) -> Result<(), Error> {
while !*stop_receiver.borrow_and_update() {
let sleep_interval = match self.check_consistency().await {
Err(Error::HashMatch(HashMatchError::MissingData(MissingData::RootHash))) => {
tracing::debug!("Last L1 batch on the main node doesn't have a state root hash; waiting until it is computed");
self.sleep_interval / 10
}
Err(err) if err.is_retriable() => {
tracing::warn!("Following transient error occurred: {err}");
tracing::info!("Trying again after a delay");
self.sleep_interval
}
Err(Error::HashMatch(err)) => self.handle_hash_err(err)?,
Err(err) => return Err(err),
Ok(()) if stop_after_success => return Ok(()),
Ok(()) => self.sleep_interval,
@@ -480,6 +486,46 @@
}
Ok(())
}

/// Returns the sleep interval if the error is transient.
fn handle_hash_err(&self, err: HashMatchError) -> Result<Duration, HashMatchError> {
match err {
HashMatchError::MissingData(MissingData::RootHash) => {
tracing::debug!("Last L1 batch on the main node doesn't have a state root hash; waiting until it is computed");
Ok(self.sleep_interval / 10)
}
err if err.is_retriable() => {
tracing::warn!("Following transient error occurred: {err}");
tracing::info!("Trying again after a delay");
Ok(self.sleep_interval)
}
err => Err(err),
}
}

/// Checks whether a reorg is present. Unlike [`Self::run_once()`], this method doesn't pinpoint the first diverged L1 batch;
/// it just checks whether diverged batches / blocks exist in general.
///
/// Internally retries transient errors. Returns `Ok(false)` if a stop signal is received.
pub async fn check_reorg_presence(
&mut self,
mut stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<bool> {
while !*stop_receiver.borrow_and_update() {
let sleep_interval = match self.find_last_diverged_batch().await {
Err(err) => self.handle_hash_err(err)?,
Ok(maybe_diverged_batch) => return Ok(maybe_diverged_batch.is_some()),
};

if tokio::time::timeout(sleep_interval, stop_receiver.changed())
.await
.is_ok()
{
break;
}
}
Ok(false)
}
}

/// Fallible and async predicate for binary search.
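For the `ReorgDetector` changes above, here is a hedged sketch of how a caller might drive the new `check_reorg_presence` entry point with a stop channel, mirroring both `ExternalNodeReverter::is_reorg_needed` and the tests below; constructing the `ReorgDetector` itself is out of scope for this sketch.

```rust
use tokio::sync::watch;

// Usage sketch only; `detector` is assumed to be an already configured `ReorgDetector`.
async fn ensure_no_reorg(mut detector: ReorgDetector) -> anyhow::Result<()> {
    let (_stop_sender, stop_receiver) = watch::channel(false);
    if detector.check_reorg_presence(stop_receiver).await? {
        // Some local batch or block diverges from the main node; find the last correct
        // batch (e.g. via the full consistency check in `run_once`) and roll back first.
        anyhow::bail!("chain reorg detected; storage must be rolled back first");
    }
    // Either storage is consistent with the main node, or a stop signal was received
    // (`check_reorg_presence` returns `Ok(false)` in that case as well).
    Ok(())
}
```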
core/node/reorg_detector/src/tests.rs (10 additions, 0 deletions)
@@ -312,12 +312,19 @@ async fn reorg_is_detected_on_batch_hash_mismatch() {
store_l2_block(&mut storage, 2, l2_block_hash).await;
detector.check_consistency().await.unwrap();

let (_stop_sender, stop_receiver) = watch::channel(false);
assert!(!detector
.check_reorg_presence(stop_receiver.clone())
.await
.unwrap());

seal_l1_batch(&mut storage, 2, H256::repeat_byte(0xff)).await;
// ^ Hash of L1 batch #2 differs from that on the main node.
assert_matches!(
detector.check_consistency().await,
Err(Error::ReorgDetected(L1BatchNumber(1)))
);
assert!(detector.check_reorg_presence(stop_receiver).await.unwrap());
}

#[tokio::test]
@@ -621,6 +628,9 @@ async fn reorg_is_detected_based_on_l2_block_hashes(last_correct_l1_batch: u32)
detector.check_consistency().await,
Err(Error::ReorgDetected(L1BatchNumber(num))) if num == last_correct_l1_batch
);

let (_stop_sender, stop_receiver) = watch::channel(false);
assert!(detector.check_reorg_presence(stop_receiver).await.unwrap());
}

#[derive(Debug)]