From f7493206cf7b9f1419eb37d6834978c999104088 Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Wed, 22 Jan 2025 11:45:50 +0200
Subject: [PATCH] Fix race condition in EN storage initialization

---
 .../src/external_node/revert.rs           |  6 ++
 core/node/node_storage_init/src/lib.rs    |  5 +-
 core/node/node_storage_init/src/traits.rs |  3 +
 core/node/reorg_detector/src/lib.rs       | 86 ++++++++++++++-----
 core/node/reorg_detector/src/tests.rs     | 10 +++
 5 files changed, 86 insertions(+), 24 deletions(-)

diff --git a/core/node/node_storage_init/src/external_node/revert.rs b/core/node/node_storage_init/src/external_node/revert.rs
index 86d137c6b660..db06a4492bb7 100644
--- a/core/node/node_storage_init/src/external_node/revert.rs
+++ b/core/node/node_storage_init/src/external_node/revert.rs
@@ -34,6 +34,12 @@ impl RevertStorage for ExternalNodeReverter {
         Ok(())
     }
 
+    async fn is_reorg_needed(&self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<bool> {
+        ReorgDetector::new(self.client.clone(), self.pool.clone())
+            .check_reorg_presence(stop_receiver)
+            .await
+    }
+
     async fn last_correct_batch_for_reorg(
         &self,
         stop_receiver: watch::Receiver<bool>,
diff --git a/core/node/node_storage_init/src/lib.rs b/core/node/node_storage_init/src/lib.rs
index 10b0131908ca..a8b72b769a18 100644
--- a/core/node/node_storage_init/src/lib.rs
+++ b/core/node/node_storage_init/src/lib.rs
@@ -182,10 +182,7 @@ impl NodeStorageInitializer {
     ) -> anyhow::Result<bool> {
         // May be `true` if stop signal is received, but the node will shut down without launching any tasks anyway.
         let initialized = if let Some(reverter) = &self.strategy.block_reverter {
-            reverter
-                .last_correct_batch_for_reorg(stop_receiver)
-                .await?
-                .is_none()
+            !reverter.is_reorg_needed(stop_receiver).await?
         } else {
             true
         };
diff --git a/core/node/node_storage_init/src/traits.rs b/core/node/node_storage_init/src/traits.rs
index 3b6467764d97..d28b0226d845 100644
--- a/core/node/node_storage_init/src/traits.rs
+++ b/core/node/node_storage_init/src/traits.rs
@@ -18,6 +18,9 @@ pub trait InitializeStorage: fmt::Debug + Send + Sync + 'static {
 /// This trait assumes that for any invalid state there exists a batch number to which the storage can be rolled back.
 #[async_trait::async_trait]
 pub trait RevertStorage: fmt::Debug + Send + Sync + 'static {
+    /// Checks whether a reorg is needed for the storage.
+    async fn is_reorg_needed(&self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<bool>;
+
     /// Checks if the storage is invalid state and has to be rolled back.
     async fn last_correct_batch_for_reorg(
         &self,
diff --git a/core/node/reorg_detector/src/lib.rs b/core/node/reorg_detector/src/lib.rs
index d1954ca4b74b..ec5b505d7803 100644
--- a/core/node/reorg_detector/src/lib.rs
+++ b/core/node/reorg_detector/src/lib.rs
@@ -266,26 +266,32 @@ impl ReorgDetector {
         &self.health_check
     }
 
-    async fn check_consistency(&mut self) -> Result<(), Error> {
+    async fn find_last_diverged_batch(&mut self) -> Result<Option<L1BatchNumber>, HashMatchError> {
         let mut storage = self.pool.connection().await?;
-        let Some(local_l1_batch) = storage
+        // Create a readonly transaction to get a consistent view of the storage.
+        let mut storage_tx = storage
+            .transaction_builder()?
+            .set_readonly()
+            .build()
+            .await?;
+        let Some(local_l1_batch) = storage_tx
             .blocks_dal()
             .get_last_l1_batch_number_with_tree_data()
             .await?
         else {
-            return Ok(());
+            return Ok(None);
         };
-        let Some(local_l2_block) = storage.blocks_dal().get_sealed_l2_block_number().await?
-        else {
-            return Ok(());
+        let Some(local_l2_block) = storage_tx.blocks_dal().get_sealed_l2_block_number().await?
+        else {
+            return Ok(None);
         };
+        drop(storage_tx);
         drop(storage);
 
         let remote_l1_batch = self.client.sealed_l1_batch_number().await?;
         let remote_l2_block = self.client.sealed_l2_block_number().await?;
-
         let checked_l1_batch = local_l1_batch.min(remote_l1_batch);
         let checked_l2_block = local_l2_block.min(remote_l2_block);
-
         let root_hashes_match = self.root_hashes_match(checked_l1_batch).await?;
         let l2_block_hashes_match = self.l2_block_hashes_match(checked_l2_block).await?;
@@ -295,13 +301,21 @@ impl ReorgDetector {
         // In other cases either there is only a height mismatch which means that one of
         // the nodes needs to do catching up; however, it is not certain that there is actually
         // a re-org taking place.
-        if root_hashes_match && l2_block_hashes_match {
+        Ok(if root_hashes_match && l2_block_hashes_match {
             self.event_handler
                 .update_correct_block(checked_l2_block, checked_l1_batch);
+            None
+        } else {
+            let diverged_l1_batch = checked_l1_batch + (root_hashes_match as u32);
+            self.event_handler.report_divergence(diverged_l1_batch);
+            Some(diverged_l1_batch)
+        })
+    }
+
+    async fn check_consistency(&mut self) -> Result<(), Error> {
+        let Some(diverged_l1_batch) = self.find_last_diverged_batch().await? else {
             return Ok(());
-        }
-        let diverged_l1_batch = checked_l1_batch + (root_hashes_match as u32);
-        self.event_handler.report_divergence(diverged_l1_batch);
+        };
 
         // Check that the first L1 batch matches, to make sure that
         // we are actually tracking the same chain as the main node.
@@ -455,15 +469,7 @@ impl ReorgDetector {
     ) -> Result<(), Error> {
         while !*stop_receiver.borrow_and_update() {
             let sleep_interval = match self.check_consistency().await {
-                Err(Error::HashMatch(HashMatchError::MissingData(MissingData::RootHash))) => {
-                    tracing::debug!("Last L1 batch on the main node doesn't have a state root hash; waiting until it is computed");
-                    self.sleep_interval / 10
-                }
-                Err(err) if err.is_retriable() => {
-                    tracing::warn!("Following transient error occurred: {err}");
-                    tracing::info!("Trying again after a delay");
-                    self.sleep_interval
-                }
+                Err(Error::HashMatch(err)) => self.handle_hash_err(err)?,
                 Err(err) => return Err(err),
                 Ok(()) if stop_after_success => return Ok(()),
                 Ok(()) => self.sleep_interval,
@@ -480,6 +486,46 @@ impl ReorgDetector {
         }
         Ok(())
     }
+
+    /// Returns the sleep interval if the error is transient.
+    fn handle_hash_err(&self, err: HashMatchError) -> Result<Duration, HashMatchError> {
+        match err {
+            HashMatchError::MissingData(MissingData::RootHash) => {
+                tracing::debug!("Last L1 batch on the main node doesn't have a state root hash; waiting until it is computed");
+                Ok(self.sleep_interval / 10)
+            }
+            err if err.is_retriable() => {
+                tracing::warn!("Following transient error occurred: {err}");
+                tracing::info!("Trying again after a delay");
+                Ok(self.sleep_interval)
+            }
+            err => Err(err),
+        }
+    }
+
+    /// Checks whether a reorg is present. Unlike [`Self::run_once()`], this method doesn't pinpoint the first diverged L1 batch;
+    /// it just checks whether diverged batches / blocks exist in general.
+    ///
+    /// Internally retries transient errors. Returns `Ok(false)` if a stop signal is received.
+    pub async fn check_reorg_presence(
+        &mut self,
+        mut stop_receiver: watch::Receiver<bool>,
+    ) -> anyhow::Result<bool> {
+        while !*stop_receiver.borrow_and_update() {
+            let sleep_interval = match self.find_last_diverged_batch().await {
+                Err(err) => self.handle_hash_err(err)?,
+                Ok(maybe_diverged_batch) => return Ok(maybe_diverged_batch.is_some()),
+            };
+
+            if tokio::time::timeout(sleep_interval, stop_receiver.changed())
+                .await
+                .is_ok()
+            {
+                break;
+            }
+        }
+        Ok(false)
+    }
 }
 
 /// Fallible and async predicate for binary search.
diff --git a/core/node/reorg_detector/src/tests.rs b/core/node/reorg_detector/src/tests.rs
index 5465cf8662d6..64e9c224d224 100644
--- a/core/node/reorg_detector/src/tests.rs
+++ b/core/node/reorg_detector/src/tests.rs
@@ -312,12 +312,19 @@ async fn reorg_is_detected_on_batch_hash_mismatch() {
     store_l2_block(&mut storage, 2, l2_block_hash).await;
     detector.check_consistency().await.unwrap();
 
+    let (_stop_sender, stop_receiver) = watch::channel(false);
+    assert!(!detector
+        .check_reorg_presence(stop_receiver.clone())
+        .await
+        .unwrap());
+
     seal_l1_batch(&mut storage, 2, H256::repeat_byte(0xff)).await;
     // ^ Hash of L1 batch #2 differs from that on the main node.
     assert_matches!(
         detector.check_consistency().await,
         Err(Error::ReorgDetected(L1BatchNumber(1)))
     );
+    assert!(detector.check_reorg_presence(stop_receiver).await.unwrap());
 }
 
 #[tokio::test]
@@ -621,6 +628,9 @@ async fn reorg_is_detected_based_on_l2_block_hashes(last_correct_l1_batch: u32)
         detector.check_consistency().await,
         Err(Error::ReorgDetected(L1BatchNumber(num))) if num == last_correct_l1_batch
     );
+
+    let (_stop_sender, stop_receiver) = watch::channel(false);
+    assert!(detector.check_reorg_presence(stop_receiver).await.unwrap());
 }
 
 #[derive(Debug)]