diff --git a/core/bin/snapshots_creator/src/chunking.rs b/core/bin/snapshots_creator/src/chunking.rs index 11768febd44f..047a6a23d24e 100644 --- a/core/bin/snapshots_creator/src/chunking.rs +++ b/core/bin/snapshots_creator/src/chunking.rs @@ -5,10 +5,10 @@ use zksync_utils::u256_to_h256; pub(crate) fn get_chunk_hashed_keys_range( chunk_id: u64, - chunks_count: u64, + chunk_count: u64, ) -> ops::RangeInclusive { - assert!(chunks_count > 0); - let mut stride = U256::MAX / chunks_count; + assert!(chunk_count > 0); + let mut stride = U256::MAX / chunk_count; let stride_minus_one = if stride < U256::MAX { stride += U256::one(); stride - 1 diff --git a/core/bin/snapshots_creator/src/creator.rs b/core/bin/snapshots_creator/src/creator.rs new file mode 100644 index 000000000000..da2ac930dbd4 --- /dev/null +++ b/core/bin/snapshots_creator/src/creator.rs @@ -0,0 +1,336 @@ +//! [`SnapshotCreator`] and tightly related types. + +use anyhow::Context as _; +use tokio::sync::Semaphore; +use zksync_config::SnapshotsCreatorConfig; +use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_object_store::ObjectStore; +use zksync_types::{ + snapshots::{ + SnapshotFactoryDependencies, SnapshotMetadata, SnapshotStorageLogsChunk, + SnapshotStorageLogsStorageKey, + }, + L1BatchNumber, MiniblockNumber, +}; +use zksync_utils::ceil_div; + +#[cfg(test)] +use crate::tests::HandleEvent; +use crate::{ + chunking::get_chunk_hashed_keys_range, + metrics::{FactoryDepsStage, StorageChunkStage, METRICS}, +}; + +/// Encapsulates progress of creating a particular storage snapshot. +#[derive(Debug)] +struct SnapshotProgress { + l1_batch_number: L1BatchNumber, + /// `true` if the snapshot is new (i.e., its progress is not recovered from Postgres). + is_new_snapshot: bool, + chunk_count: u64, + remaining_chunk_ids: Vec, +} + +impl SnapshotProgress { + fn new(l1_batch_number: L1BatchNumber, chunk_count: u64) -> Self { + Self { + l1_batch_number, + is_new_snapshot: true, + chunk_count, + remaining_chunk_ids: (0..chunk_count).collect(), + } + } + + fn from_existing_snapshot(snapshot: &SnapshotMetadata) -> Self { + let remaining_chunk_ids = snapshot + .storage_logs_filepaths + .iter() + .enumerate() + .filter_map(|(chunk_id, path)| path.is_none().then_some(chunk_id as u64)) + .collect(); + + Self { + l1_batch_number: snapshot.l1_batch_number, + is_new_snapshot: false, + chunk_count: snapshot.storage_logs_filepaths.len() as u64, + remaining_chunk_ids, + } + } +} + +/// Creator of a single storage snapshot. 
+#[derive(Debug)] +pub(crate) struct SnapshotCreator { + pub blob_store: Box, + pub master_pool: ConnectionPool, + pub replica_pool: ConnectionPool, + #[cfg(test)] + pub event_listener: Box, +} + +impl SnapshotCreator { + async fn connect_to_replica(&self) -> anyhow::Result> { + self.replica_pool + .access_storage_tagged("snapshots_creator") + .await + } + + async fn process_storage_logs_single_chunk( + &self, + semaphore: &Semaphore, + miniblock_number: MiniblockNumber, + l1_batch_number: L1BatchNumber, + chunk_id: u64, + chunk_count: u64, + ) -> anyhow::Result<()> { + let _permit = semaphore.acquire().await?; + #[cfg(test)] + if self.event_listener.on_chunk_started().should_exit() { + return Ok(()); + } + + let hashed_keys_range = get_chunk_hashed_keys_range(chunk_id, chunk_count); + let mut conn = self.connect_to_replica().await?; + + let latency = + METRICS.storage_logs_processing_duration[&StorageChunkStage::LoadFromPostgres].start(); + let logs = conn + .snapshots_creator_dal() + .get_storage_logs_chunk(miniblock_number, hashed_keys_range) + .await + .context("Error fetching storage logs count")?; + drop(conn); + let latency = latency.observe(); + tracing::info!( + "Loaded chunk {chunk_id} ({} logs) from Postgres in {latency:?}", + logs.len() + ); + + let latency = + METRICS.storage_logs_processing_duration[&StorageChunkStage::SaveToGcs].start(); + let storage_logs_chunk = SnapshotStorageLogsChunk { storage_logs: logs }; + let key = SnapshotStorageLogsStorageKey { + l1_batch_number, + chunk_id, + }; + let filename = self + .blob_store + .put(key, &storage_logs_chunk) + .await + .context("Error storing storage logs chunk in blob store")?; + let output_filepath_prefix = self + .blob_store + .get_storage_prefix::(); + let output_filepath = format!("{output_filepath_prefix}/{filename}"); + let latency = latency.observe(); + + let mut master_conn = self + .master_pool + .access_storage_tagged("snapshots_creator") + .await?; + master_conn + .snapshots_dal() + .add_storage_logs_filepath_for_snapshot(l1_batch_number, chunk_id, &output_filepath) + .await?; + #[cfg(test)] + self.event_listener.on_chunk_saved(); + + let tasks_left = METRICS.storage_logs_chunks_left_to_process.dec_by(1) - 1; + tracing::info!( + "Saved chunk {chunk_id} (overall progress {}/{chunk_count}) in {latency:?} to location: {output_filepath}", + chunk_count - tasks_left as u64 + ); + Ok(()) + } + + async fn process_factory_deps( + &self, + miniblock_number: MiniblockNumber, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result { + let mut conn = self.connect_to_replica().await?; + + tracing::info!("Loading factory deps from Postgres..."); + let latency = + METRICS.factory_deps_processing_duration[&FactoryDepsStage::LoadFromPostgres].start(); + let factory_deps = conn + .snapshots_creator_dal() + .get_all_factory_deps(miniblock_number) + .await?; + drop(conn); + let latency = latency.observe(); + tracing::info!("Loaded {} factory deps in {latency:?}", factory_deps.len()); + + tracing::info!("Saving factory deps to GCS..."); + let latency = + METRICS.factory_deps_processing_duration[&FactoryDepsStage::SaveToGcs].start(); + let factory_deps = SnapshotFactoryDependencies { factory_deps }; + let filename = self + .blob_store + .put(l1_batch_number, &factory_deps) + .await + .context("Error storing factory deps in blob store")?; + let output_filepath_prefix = self + .blob_store + .get_storage_prefix::(); + let output_filepath = format!("{output_filepath_prefix}/{filename}"); + let latency = latency.observe(); + 
tracing::info!( + "Saved {} factory deps in {latency:?} to location: {output_filepath}", + factory_deps.factory_deps.len() + ); + + Ok(output_filepath) + } + + /// Returns `Ok(None)` if the created snapshot would coincide with `latest_snapshot`. + async fn initialize_snapshot_progress( + config: &SnapshotsCreatorConfig, + min_chunk_count: u64, + latest_snapshot: Option<&SnapshotMetadata>, + conn: &mut StorageProcessor<'_>, + ) -> anyhow::Result> { + // We subtract 1 so that after restore, EN node has at least one L1 batch to fetch + let sealed_l1_batch_number = conn.blocks_dal().get_sealed_l1_batch_number().await?; + let sealed_l1_batch_number = sealed_l1_batch_number.context("No L1 batches in Postgres")?; + anyhow::ensure!( + sealed_l1_batch_number != L1BatchNumber(0), + "Cannot create snapshot when only the genesis L1 batch is present in Postgres" + ); + let l1_batch_number = sealed_l1_batch_number - 1; + + let latest_snapshot_l1_batch_number = + latest_snapshot.map(|snapshot| snapshot.l1_batch_number); + if latest_snapshot_l1_batch_number == Some(l1_batch_number) { + tracing::info!( + "Snapshot at expected L1 batch #{l1_batch_number} is already created; exiting" + ); + return Ok(None); + } + + let distinct_storage_logs_keys_count = conn + .snapshots_creator_dal() + .get_distinct_storage_logs_keys_count(l1_batch_number) + .await?; + let chunk_size = config.storage_logs_chunk_size; + // We force the minimum number of chunks to avoid situations where only one chunk is created in tests. + let chunk_count = + ceil_div(distinct_storage_logs_keys_count, chunk_size).max(min_chunk_count); + + tracing::info!( + "Selected storage logs chunking for L1 batch {l1_batch_number}: \ + {chunk_count} chunks of expected size {chunk_size}" + ); + Ok(Some(SnapshotProgress::new(l1_batch_number, chunk_count))) + } + + /// Returns `Ok(None)` if a snapshot should not be created / resumed. + async fn load_or_initialize_snapshot_progress( + &self, + config: &SnapshotsCreatorConfig, + min_chunk_count: u64, + ) -> anyhow::Result> { + let mut master_conn = self + .master_pool + .access_storage_tagged("snapshots_creator") + .await?; + let latest_snapshot = master_conn + .snapshots_dal() + .get_newest_snapshot_metadata() + .await?; + drop(master_conn); + + let pending_snapshot = latest_snapshot + .as_ref() + .filter(|snapshot| !snapshot.is_complete()); + if let Some(snapshot) = pending_snapshot { + Ok(Some(SnapshotProgress::from_existing_snapshot(snapshot))) + } else { + Self::initialize_snapshot_progress( + config, + min_chunk_count, + latest_snapshot.as_ref(), + &mut self.connect_to_replica().await?, + ) + .await + } + } + + pub async fn run( + self, + config: SnapshotsCreatorConfig, + min_chunk_count: u64, + ) -> anyhow::Result<()> { + let latency = METRICS.snapshot_generation_duration.start(); + + let Some(progress) = self + .load_or_initialize_snapshot_progress(&config, min_chunk_count) + .await? + else { + // No snapshot creation is necessary; a snapshot for the current L1 batch is already created + return Ok(()); + }; + + let mut conn = self.connect_to_replica().await?; + let (_, last_miniblock_number_in_batch) = conn + .blocks_dal() + .get_miniblock_range_of_l1_batch(progress.l1_batch_number) + .await? 
+ .context("Error fetching last miniblock number")?; + drop(conn); + + METRICS.storage_logs_chunks_count.set(progress.chunk_count); + tracing::info!( + "Creating snapshot for storage logs up to miniblock {last_miniblock_number_in_batch}, \ + L1 batch {}", + progress.l1_batch_number + ); + + if progress.is_new_snapshot { + let factory_deps_output_file = self + .process_factory_deps(last_miniblock_number_in_batch, progress.l1_batch_number) + .await?; + + let mut master_conn = self + .master_pool + .access_storage_tagged("snapshots_creator") + .await?; + master_conn + .snapshots_dal() + .add_snapshot( + progress.l1_batch_number, + progress.chunk_count, + &factory_deps_output_file, + ) + .await?; + } + + METRICS + .storage_logs_chunks_left_to_process + .set(progress.remaining_chunk_ids.len()); + let semaphore = Semaphore::new(config.concurrent_queries_count as usize); + let tasks = progress.remaining_chunk_ids.into_iter().map(|chunk_id| { + self.process_storage_logs_single_chunk( + &semaphore, + last_miniblock_number_in_batch, + progress.l1_batch_number, + chunk_id, + progress.chunk_count, + ) + }); + futures::future::try_join_all(tasks).await?; + + METRICS + .snapshot_l1_batch + .set(progress.l1_batch_number.0.into()); + + let elapsed = latency.observe(); + tracing::info!("snapshot_generation_duration: {elapsed:?}"); + tracing::info!("snapshot_l1_batch: {}", METRICS.snapshot_l1_batch.get()); + tracing::info!( + "storage_logs_chunks_count: {}", + METRICS.storage_logs_chunks_count.get() + ); + Ok(()) + } +} diff --git a/core/bin/snapshots_creator/src/main.rs b/core/bin/snapshots_creator/src/main.rs index e8f1fa3bf7fe..0571500615bb 100644 --- a/core/bin/snapshots_creator/src/main.rs +++ b/core/bin/snapshots_creator/src/main.rs @@ -1,29 +1,26 @@ //! Snapshot creator utility. Intended to run on a schedule, with each run creating a new snapshot. +//! +//! # Assumptions +//! +//! The snapshot creator is fault-tolerant; if it stops in the middle of creating a snapshot, +//! this snapshot will be continued from roughly the same point after the restart. If this is +//! undesired, remove the `snapshots` table record corresponding to the pending snapshot. +//! +//! It is assumed that the snapshot creator is run as a singleton process (no more than 1 instance +//! at a time). 
use anyhow::Context as _; use prometheus_exporter::PrometheusExporterConfig; -use tokio::{ - sync::{watch, Semaphore}, - task::JoinHandle, -}; +use tokio::{sync::watch, task::JoinHandle}; use zksync_config::{configs::PrometheusConfig, PostgresConfig, SnapshotsCreatorConfig}; use zksync_dal::ConnectionPool; use zksync_env_config::{object_store::SnapshotsObjectStoreConfig, FromEnv}; -use zksync_object_store::{ObjectStore, ObjectStoreFactory}; -use zksync_types::{ - snapshots::{ - SnapshotFactoryDependencies, SnapshotStorageLogsChunk, SnapshotStorageLogsStorageKey, - }, - L1BatchNumber, MiniblockNumber, -}; -use zksync_utils::ceil_div; +use zksync_object_store::ObjectStoreFactory; -use crate::{ - chunking::get_chunk_hashed_keys_range, - metrics::{FactoryDepsStage, StorageChunkStage, METRICS}, -}; +use crate::creator::SnapshotCreator; mod chunking; +mod creator; mod metrics; #[cfg(test)] mod tests; @@ -47,205 +44,6 @@ async fn maybe_enable_prometheus_metrics( } } -async fn process_storage_logs_single_chunk( - blob_store: &dyn ObjectStore, - pool: &ConnectionPool, - semaphore: &Semaphore, - miniblock_number: MiniblockNumber, - l1_batch_number: L1BatchNumber, - chunk_id: u64, - chunks_count: u64, -) -> anyhow::Result { - let _permit = semaphore.acquire().await?; - let hashed_keys_range = get_chunk_hashed_keys_range(chunk_id, chunks_count); - let mut conn = pool.access_storage_tagged("snapshots_creator").await?; - - let latency = - METRICS.storage_logs_processing_duration[&StorageChunkStage::LoadFromPostgres].start(); - let logs = conn - .snapshots_creator_dal() - .get_storage_logs_chunk(miniblock_number, hashed_keys_range) - .await - .context("Error fetching storage logs count")?; - drop(conn); - let latency = latency.observe(); - tracing::info!( - "Loaded chunk {chunk_id} ({} logs) from Postgres in {latency:?}", - logs.len() - ); - - let latency = METRICS.storage_logs_processing_duration[&StorageChunkStage::SaveToGcs].start(); - let storage_logs_chunk = SnapshotStorageLogsChunk { storage_logs: logs }; - let key = SnapshotStorageLogsStorageKey { - l1_batch_number, - chunk_id, - }; - let filename = blob_store - .put(key, &storage_logs_chunk) - .await - .context("Error storing storage logs chunk in blob store")?; - let output_filepath_prefix = blob_store.get_storage_prefix::(); - let output_filepath = format!("{output_filepath_prefix}/{filename}"); - let latency = latency.observe(); - - let tasks_left = METRICS.storage_logs_chunks_left_to_process.dec_by(1) - 1; - tracing::info!( - "Saved chunk {chunk_id} (overall progress {}/{chunks_count}) in {latency:?} to location: {output_filepath}", - chunks_count - tasks_left - ); - Ok(output_filepath) -} - -async fn process_factory_deps( - blob_store: &dyn ObjectStore, - pool: &ConnectionPool, - miniblock_number: MiniblockNumber, - l1_batch_number: L1BatchNumber, -) -> anyhow::Result { - let mut conn = pool.access_storage_tagged("snapshots_creator").await?; - - tracing::info!("Loading factory deps from Postgres..."); - let latency = - METRICS.factory_deps_processing_duration[&FactoryDepsStage::LoadFromPostgres].start(); - let factory_deps = conn - .snapshots_creator_dal() - .get_all_factory_deps(miniblock_number) - .await?; - drop(conn); - let latency = latency.observe(); - tracing::info!("Loaded {} factory deps in {latency:?}", factory_deps.len()); - - tracing::info!("Saving factory deps to GCS..."); - let latency = METRICS.factory_deps_processing_duration[&FactoryDepsStage::SaveToGcs].start(); - let factory_deps = SnapshotFactoryDependencies { 
factory_deps }; - let filename = blob_store - .put(l1_batch_number, &factory_deps) - .await - .context("Error storing factory deps in blob store")?; - let output_filepath_prefix = blob_store.get_storage_prefix::(); - let output_filepath = format!("{output_filepath_prefix}/{filename}"); - let latency = latency.observe(); - tracing::info!( - "Saved {} factory deps in {latency:?} to location: {output_filepath}", - factory_deps.factory_deps.len() - ); - - Ok(output_filepath) -} - -async fn run( - blob_store: Box, - replica_pool: ConnectionPool, - master_pool: ConnectionPool, - min_chunk_count: u64, -) -> anyhow::Result<()> { - let latency = METRICS.snapshot_generation_duration.start(); - let config = SnapshotsCreatorConfig::from_env().context("SnapshotsCreatorConfig::from_env")?; - - let mut conn = replica_pool - .access_storage_tagged("snapshots_creator") - .await?; - - // We subtract 1 so that after restore, EN node has at least one L1 batch to fetch - let sealed_l1_batch_number = conn.blocks_dal().get_sealed_l1_batch_number().await?; - let sealed_l1_batch_number = sealed_l1_batch_number.context("No L1 batches in Postgres")?; - anyhow::ensure!( - sealed_l1_batch_number != L1BatchNumber(0), - "Cannot create snapshot when only the genesis L1 batch is present in Postgres" - ); - let l1_batch_number = sealed_l1_batch_number - 1; - - let mut master_conn = master_pool - .access_storage_tagged("snapshots_creator") - .await?; - if master_conn - .snapshots_dal() - .get_snapshot_metadata(l1_batch_number) - .await? - .is_some() - { - tracing::info!("Snapshot for L1 batch number {l1_batch_number} already exists, exiting"); - return Ok(()); - } - drop(master_conn); - - let (_, last_miniblock_number_in_batch) = conn - .blocks_dal() - .get_miniblock_range_of_l1_batch(l1_batch_number) - .await? - .context("Error fetching last miniblock number")?; - let distinct_storage_logs_keys_count = conn - .snapshots_creator_dal() - .get_distinct_storage_logs_keys_count(l1_batch_number) - .await?; - drop(conn); - - let chunk_size = config.storage_logs_chunk_size; - // We force the minimum number of chunks to avoid situations where only one chunk is created in tests. - let chunks_count = ceil_div(distinct_storage_logs_keys_count, chunk_size).max(min_chunk_count); - - METRICS.storage_logs_chunks_count.set(chunks_count); - - tracing::info!( - "Creating snapshot for storage logs up to miniblock {last_miniblock_number_in_batch}, \ - L1 batch {l1_batch_number}" - ); - tracing::info!("Starting to generate {chunks_count} chunks of expected size {chunk_size}"); - - let factory_deps_output_file = process_factory_deps( - &*blob_store, - &replica_pool, - last_miniblock_number_in_batch, - l1_batch_number, - ) - .await?; - - METRICS - .storage_logs_chunks_left_to_process - .set(chunks_count); - - let semaphore = Semaphore::new(config.concurrent_queries_count as usize); - let tasks = (0..chunks_count).map(|chunk_id| { - process_storage_logs_single_chunk( - &*blob_store, - &replica_pool, - &semaphore, - last_miniblock_number_in_batch, - l1_batch_number, - chunk_id, - chunks_count, - ) - }); - let mut storage_logs_output_files = futures::future::try_join_all(tasks).await?; - // Sanity check: the number of files should equal the number of chunks. 
- assert_eq!(storage_logs_output_files.len(), chunks_count as usize); - storage_logs_output_files.sort(); - - tracing::info!("Finished generating snapshot, storing progress in Postgres"); - let mut master_conn = master_pool - .access_storage_tagged("snapshots_creator") - .await?; - master_conn - .snapshots_dal() - .add_snapshot( - l1_batch_number, - &storage_logs_output_files, - &factory_deps_output_file, - ) - .await?; - - METRICS.snapshot_l1_batch.set(l1_batch_number.0 as u64); - - let elapsed = latency.observe(); - tracing::info!("snapshot_generation_duration: {elapsed:?}"); - tracing::info!("snapshot_l1_batch: {}", METRICS.snapshot_l1_batch.get()); - tracing::info!( - "storage_logs_chunks_count: {}", - METRICS.storage_logs_chunks_count.get() - ); - Ok(()) -} - /// Minimum number of storage log chunks to produce. const MIN_CHUNK_COUNT: u64 = 10; @@ -292,7 +90,15 @@ async fn main() -> anyhow::Result<()> { .build() .await?; - run(blob_store, replica_pool, master_pool, MIN_CHUNK_COUNT).await?; + let creator = SnapshotCreator { + blob_store, + master_pool, + replica_pool, + #[cfg(test)] + event_listener: Box::new(()), + }; + creator.run(creator_config, MIN_CHUNK_COUNT).await?; + tracing::info!("Finished running snapshot creator!"); stop_sender.send(true).ok(); if let Some(prometheus_exporter_task) = prometheus_exporter_task { diff --git a/core/bin/snapshots_creator/src/metrics.rs b/core/bin/snapshots_creator/src/metrics.rs index 194ed8f1e680..5eb1984712e5 100644 --- a/core/bin/snapshots_creator/src/metrics.rs +++ b/core/bin/snapshots_creator/src/metrics.rs @@ -24,7 +24,7 @@ pub(crate) struct SnapshotsCreatorMetrics { /// Number of chunks in the most recently generated snapshot. Set when a snapshot generation starts. pub storage_logs_chunks_count: Gauge, /// Number of chunks left to process for the snapshot being currently generated. - pub storage_logs_chunks_left_to_process: Gauge, + pub storage_logs_chunks_left_to_process: Gauge, /// Total latency of snapshot generation. #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] pub snapshot_generation_duration: Histogram, diff --git a/core/bin/snapshots_creator/src/tests.rs b/core/bin/snapshots_creator/src/tests.rs index 33d7a225b7d6..c0e8dd0cbc28 100644 --- a/core/bin/snapshots_creator/src/tests.rs +++ b/core/bin/snapshots_creator/src/tests.rs @@ -1,17 +1,107 @@ //! Lower-level tests for the snapshot creator component. 
-use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet}, + fmt, + sync::atomic::{AtomicUsize, Ordering}, +}; use rand::{thread_rng, Rng}; use zksync_dal::StorageProcessor; +use zksync_object_store::ObjectStore; use zksync_types::{ block::{BlockGasCount, L1BatchHeader, MiniblockHeader}, - snapshots::{SnapshotFactoryDependency, SnapshotStorageLog}, - AccountTreeId, Address, ProtocolVersion, StorageKey, StorageLog, H256, + snapshots::{ + SnapshotFactoryDependencies, SnapshotFactoryDependency, SnapshotStorageLog, + SnapshotStorageLogsChunk, SnapshotStorageLogsStorageKey, + }, + AccountTreeId, Address, L1BatchNumber, MiniblockNumber, ProtocolVersion, StorageKey, + StorageLog, H256, }; use super::*; +const TEST_CONFIG: SnapshotsCreatorConfig = SnapshotsCreatorConfig { + storage_logs_chunk_size: 1_000_000, + concurrent_queries_count: 10, +}; +const SEQUENTIAL_TEST_CONFIG: SnapshotsCreatorConfig = SnapshotsCreatorConfig { + storage_logs_chunk_size: 1_000_000, + concurrent_queries_count: 1, +}; + +#[derive(Debug)] +struct TestEventListener { + stop_after_chunk_count: usize, + processed_chunk_count: AtomicUsize, +} + +impl TestEventListener { + fn new(stop_after_chunk_count: usize) -> Self { + Self { + stop_after_chunk_count, + processed_chunk_count: AtomicUsize::new(0), + } + } +} + +impl HandleEvent for TestEventListener { + fn on_chunk_started(&self) -> TestBehavior { + let should_stop = + self.processed_chunk_count.load(Ordering::SeqCst) >= self.stop_after_chunk_count; + TestBehavior::new(should_stop) + } + + fn on_chunk_saved(&self) { + self.processed_chunk_count.fetch_add(1, Ordering::SeqCst); + } +} + +impl SnapshotCreator { + fn for_tests(blob_store: Box, pool: ConnectionPool) -> Self { + Self { + blob_store, + master_pool: pool.clone(), + replica_pool: pool, + event_listener: Box::new(()), + } + } + + fn stop_after_chunk_count(self, stop_after_chunk_count: usize) -> Self { + Self { + event_listener: Box::new(TestEventListener::new(stop_after_chunk_count)), + ..self + } + } +} + +#[derive(Debug)] +pub(crate) struct TestBehavior { + should_exit: bool, +} + +impl TestBehavior { + fn new(should_exit: bool) -> Self { + Self { should_exit } + } + + pub fn should_exit(&self) -> bool { + self.should_exit + } +} + +pub(crate) trait HandleEvent: fmt::Debug { + fn on_chunk_started(&self) -> TestBehavior { + TestBehavior::new(false) + } + + fn on_chunk_saved(&self) { + // Do nothing + } +} + +impl HandleEvent for () {} + fn gen_storage_logs(rng: &mut impl Rng, count: usize) -> Vec { (0..count) .map(|_| { @@ -159,12 +249,17 @@ async fn persisting_snapshot_metadata() { let mut conn = pool.access_storage().await.unwrap(); prepare_postgres(&mut rng, &mut conn, 10).await; - run(object_store, pool.clone(), pool.clone(), MIN_CHUNK_COUNT) + SnapshotCreator::for_tests(object_store, pool.clone()) + .run(TEST_CONFIG, MIN_CHUNK_COUNT) .await .unwrap(); // Check snapshot metadata in Postgres. 
- let snapshots = conn.snapshots_dal().get_all_snapshots().await.unwrap(); + let snapshots = conn + .snapshots_dal() + .get_all_complete_snapshots() + .await + .unwrap(); assert_eq!(snapshots.snapshots_l1_batch_numbers.len(), 1); let snapshot_l1_batch_number = snapshots.snapshots_l1_batch_numbers[0]; assert_eq!(snapshot_l1_batch_number, L1BatchNumber(8)); @@ -183,7 +278,11 @@ async fn persisting_snapshot_metadata() { MIN_CHUNK_COUNT as usize ); for path in &snapshot_metadata.storage_logs_filepaths { - let path = path.strip_prefix("storage_logs_snapshots/").unwrap(); + let path = path + .as_ref() + .unwrap() + .strip_prefix("storage_logs_snapshots/") + .unwrap(); assert!(path.ends_with(".proto.gzip")); } } @@ -194,12 +293,11 @@ async fn persisting_snapshot_factory_deps() { let mut rng = thread_rng(); let object_store_factory = ObjectStoreFactory::mock(); let object_store = object_store_factory.create_store().await; - - // Insert some data to Postgres. let mut conn = pool.access_storage().await.unwrap(); let expected_outputs = prepare_postgres(&mut rng, &mut conn, 10).await; - run(object_store, pool.clone(), pool.clone(), MIN_CHUNK_COUNT) + SnapshotCreator::for_tests(object_store, pool.clone()) + .run(TEST_CONFIG, MIN_CHUNK_COUNT) .await .unwrap(); let snapshot_l1_batch_number = L1BatchNumber(8); @@ -217,17 +315,24 @@ async fn persisting_snapshot_logs() { let mut rng = thread_rng(); let object_store_factory = ObjectStoreFactory::mock(); let object_store = object_store_factory.create_store().await; - - // Insert some data to Postgres. let mut conn = pool.access_storage().await.unwrap(); let expected_outputs = prepare_postgres(&mut rng, &mut conn, 10).await; - run(object_store, pool.clone(), pool.clone(), MIN_CHUNK_COUNT) + SnapshotCreator::for_tests(object_store, pool.clone()) + .run(TEST_CONFIG, MIN_CHUNK_COUNT) .await .unwrap(); let snapshot_l1_batch_number = L1BatchNumber(8); let object_store = object_store_factory.create_store().await; + assert_storage_logs(&*object_store, snapshot_l1_batch_number, &expected_outputs).await; +} + +async fn assert_storage_logs( + object_store: &dyn ObjectStore, + snapshot_l1_batch_number: L1BatchNumber, + expected_outputs: &ExpectedOutputs, +) { let mut actual_logs = HashSet::new(); for chunk_id in 0..MIN_CHUNK_COUNT { let key = SnapshotStorageLogsStorageKey { @@ -239,3 +344,114 @@ async fn persisting_snapshot_logs() { } assert_eq!(actual_logs, expected_outputs.storage_logs); } + +#[tokio::test] +async fn recovery_workflow() { + let pool = ConnectionPool::test_pool().await; + let mut rng = thread_rng(); + let object_store_factory = ObjectStoreFactory::mock(); + let object_store = object_store_factory.create_store().await; + let mut conn = pool.access_storage().await.unwrap(); + let expected_outputs = prepare_postgres(&mut rng, &mut conn, 10).await; + + SnapshotCreator::for_tests(object_store, pool.clone()) + .stop_after_chunk_count(0) + .run(SEQUENTIAL_TEST_CONFIG, MIN_CHUNK_COUNT) + .await + .unwrap(); + + let snapshot_l1_batch_number = L1BatchNumber(8); + let snapshot_metadata = conn + .snapshots_dal() + .get_snapshot_metadata(snapshot_l1_batch_number) + .await + .unwrap() + .expect("No snapshot metadata"); + assert!(snapshot_metadata + .storage_logs_filepaths + .iter() + .all(Option::is_none)); + + let object_store = object_store_factory.create_store().await; + let SnapshotFactoryDependencies { factory_deps } = + object_store.get(snapshot_l1_batch_number).await.unwrap(); + let actual_deps: HashSet<_> = factory_deps.into_iter().collect(); + 
assert_eq!(actual_deps, expected_outputs.deps); + + // Process 2 storage log chunks, then stop. + SnapshotCreator::for_tests(object_store, pool.clone()) + .stop_after_chunk_count(2) + .run(SEQUENTIAL_TEST_CONFIG, MIN_CHUNK_COUNT) + .await + .unwrap(); + + let snapshot_metadata = conn + .snapshots_dal() + .get_snapshot_metadata(snapshot_l1_batch_number) + .await + .unwrap() + .expect("No snapshot metadata"); + assert_eq!( + snapshot_metadata + .storage_logs_filepaths + .iter() + .flatten() + .count(), + 2 + ); + + // Process the remaining chunks. + let object_store = object_store_factory.create_store().await; + SnapshotCreator::for_tests(object_store, pool.clone()) + .run(SEQUENTIAL_TEST_CONFIG, MIN_CHUNK_COUNT) + .await + .unwrap(); + + let object_store = object_store_factory.create_store().await; + assert_storage_logs(&*object_store, snapshot_l1_batch_number, &expected_outputs).await; +} + +#[tokio::test] +async fn recovery_workflow_with_varying_chunk_size() { + let pool = ConnectionPool::test_pool().await; + let mut rng = thread_rng(); + let object_store_factory = ObjectStoreFactory::mock(); + let object_store = object_store_factory.create_store().await; + let mut conn = pool.access_storage().await.unwrap(); + let expected_outputs = prepare_postgres(&mut rng, &mut conn, 10).await; + + SnapshotCreator::for_tests(object_store, pool.clone()) + .stop_after_chunk_count(2) + .run(SEQUENTIAL_TEST_CONFIG, MIN_CHUNK_COUNT) + .await + .unwrap(); + + let snapshot_l1_batch_number = L1BatchNumber(8); + let snapshot_metadata = conn + .snapshots_dal() + .get_snapshot_metadata(snapshot_l1_batch_number) + .await + .unwrap() + .expect("No snapshot metadata"); + assert_eq!( + snapshot_metadata + .storage_logs_filepaths + .iter() + .flatten() + .count(), + 2 + ); + + let config_with_other_size = SnapshotsCreatorConfig { + storage_logs_chunk_size: 1, // << should be ignored + ..SEQUENTIAL_TEST_CONFIG + }; + let object_store = object_store_factory.create_store().await; + SnapshotCreator::for_tests(object_store, pool.clone()) + .run(config_with_other_size, MIN_CHUNK_COUNT) + .await + .unwrap(); + + let object_store = object_store_factory.create_store().await; + assert_storage_logs(&*object_store, snapshot_l1_batch_number, &expected_outputs).await; +} diff --git a/core/lib/dal/sqlx-data.json b/core/lib/dal/sqlx-data.json index 1caa9f54a1e8..183ab24fc458 100644 --- a/core/lib/dal/sqlx-data.json +++ b/core/lib/dal/sqlx-data.json @@ -339,6 +339,24 @@ }, "query": "\n SELECT\n storage_refunds\n FROM\n l1_batches\n WHERE\n number = $1\n " }, + "040eaa878c3473f5edc73b77e572b5ea100f59295cd693d14ee0d5ee089c7981": { + "describe": { + "columns": [ + { + "name": "l1_batch_number", + "ordinal": 0, + "type_info": "Int8" + } + ], + "nullable": [ + false + ], + "parameters": { + "Left": [] + } + }, + "query": "\n SELECT\n l1_batch_number\n FROM\n snapshots\n WHERE\n NOT (''::TEXT = ANY (storage_logs_filepaths))\n ORDER BY\n l1_batch_number DESC\n " + }, "04fbbd198108d2614a3b29fa795994723ebe57b3ed209069bd3db906921ef1a3": { "describe": { "columns": [ @@ -3543,6 +3561,20 @@ }, "query": "\n UPDATE transactions\n SET\n in_mempool = FALSE\n FROM\n UNNEST($1::bytea[]) AS s (address)\n WHERE\n transactions.in_mempool = TRUE\n AND transactions.initiator_address = s.address\n " }, + "31334f2878b1ac7d828d5bc22d65ef6676b2eac623c0f78634cae9072fe0498a": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [ + "Int8", + "Int4", + "Text" + ] + } + }, + "query": "\n INSERT INTO\n snapshots (\n l1_batch_number,\n 
storage_logs_filepaths,\n factory_deps_filepath,\n created_at,\n updated_at\n )\n VALUES\n ($1, ARRAY_FILL(''::TEXT, ARRAY[$2::INTEGER]), $3, NOW(), NOW())\n " + }, "314f7e619a34efa89255a58c89f85d4402ff6005446bbded68c8d3dbca510f37": { "describe": { "columns": [], @@ -4575,36 +4607,6 @@ }, "query": "\n INSERT INTO\n prover_fri_protocol_versions (\n id,\n recursion_scheduler_level_vk_hash,\n recursion_node_level_vk_hash,\n recursion_leaf_level_vk_hash,\n recursion_circuits_set_vks_hash,\n created_at\n )\n VALUES\n ($1, $2, $3, $4, $5, NOW())\n ON CONFLICT (id) DO NOTHING\n " }, - "4e2cb66131a524d1bd628424d0c0735d7f9b0b5820ae3a07467d2e76cd6280f9": { - "describe": { - "columns": [ - { - "name": "l1_batch_number", - "ordinal": 0, - "type_info": "Int8" - }, - { - "name": "factory_deps_filepath", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "storage_logs_filepaths", - "ordinal": 2, - "type_info": "TextArray" - } - ], - "nullable": [ - false, - false, - false - ], - "parameters": { - "Left": [] - } - }, - "query": "\n SELECT\n l1_batch_number,\n factory_deps_filepath,\n storage_logs_filepaths\n FROM\n snapshots\n " - }, "525123d4ec2b427f1c171f30d0937d8d542b4f14cf560972c005ab3cc13d1f63": { "describe": { "columns": [ @@ -6724,20 +6726,6 @@ }, "query": "\n INSERT INTO\n storage (hashed_key, address, key, value, tx_hash, created_at, updated_at)\n SELECT\n u.hashed_key,\n u.address,\n u.key,\n u.value,\n u.tx_hash,\n NOW(),\n NOW()\n FROM\n UNNEST($1::bytea[], $2::bytea[], $3::bytea[], $4::bytea[], $5::bytea[]) AS u (hashed_key, address, key, value, tx_hash)\n ON CONFLICT (hashed_key) DO\n UPDATE\n SET\n tx_hash = excluded.tx_hash,\n value = excluded.value,\n updated_at = NOW()\n " }, - "83134807aee4b6154a1aee4f76dd989d5b4637a97f815b84ace70587acc95e7c": { - "describe": { - "columns": [], - "nullable": [], - "parameters": { - "Left": [ - "Int8", - "TextArray", - "Text" - ] - } - }, - "query": "\n INSERT INTO\n snapshots (\n l1_batch_number,\n storage_logs_filepaths,\n factory_deps_filepath,\n created_at,\n updated_at\n )\n VALUES\n ($1, $2, $3, NOW(), NOW())\n " - }, "83a931ceddf34e1c760649d613f534014b9ab9ca7725e14fb17aa050d9f35eb8": { "describe": { "columns": [ @@ -7570,6 +7558,36 @@ }, "query": "\n SELECT\n factory_deps.bytecode,\n transactions.data AS \"data?\",\n transactions.contract_address AS \"contract_address?\"\n FROM\n (\n SELECT\n *\n FROM\n storage_logs\n WHERE\n storage_logs.hashed_key = $1\n ORDER BY\n miniblock_number DESC,\n operation_number DESC\n LIMIT\n 1\n ) storage_logs\n JOIN factory_deps ON factory_deps.bytecode_hash = storage_logs.value\n LEFT JOIN transactions ON transactions.hash = storage_logs.tx_hash\n WHERE\n storage_logs.value != $2\n " }, + "995cecd37a5235d1acc2e6fc418d9b6a1a6fe629f9a02c8e33330a0efda64068": { + "describe": { + "columns": [ + { + "name": "l1_batch_number", + "ordinal": 0, + "type_info": "Int8" + }, + { + "name": "factory_deps_filepath", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "storage_logs_filepaths", + "ordinal": 2, + "type_info": "TextArray" + } + ], + "nullable": [ + false, + false, + false + ], + "parameters": { + "Left": [] + } + }, + "query": "\n SELECT\n l1_batch_number,\n factory_deps_filepath,\n storage_logs_filepaths\n FROM\n snapshots\n ORDER BY\n l1_batch_number DESC\n LIMIT\n 1\n " + }, "99acb091650478fe0feb367b1d64561347b81f8931cc2addefa907c9aa9355e6": { "describe": { "columns": [ @@ -7779,6 +7797,20 @@ }, "query": "\n UPDATE contract_verification_requests\n SET\n status = 'successful',\n updated_at = NOW()\n 
WHERE\n id = $1\n " }, + "9de5acb3de1b96ff8eb62a6324e8e221a8ef9014458cc7f1dbc60c056a0768a0": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [ + "Int8", + "Int4", + "Text" + ] + } + }, + "query": "\n UPDATE snapshots\n SET\n storage_logs_filepaths[$2] = $3,\n updated_at = NOW()\n WHERE\n l1_batch_number = $1\n " + }, "9ef2f43e6201cc00a0e1425a666a36532fee1450733849852dfd20e18ded1f03": { "describe": { "columns": [], diff --git a/core/lib/dal/src/snapshots_dal.rs b/core/lib/dal/src/snapshots_dal.rs index 9582b3a72094..3b2e62085bb6 100644 --- a/core/lib/dal/src/snapshots_dal.rs +++ b/core/lib/dal/src/snapshots_dal.rs @@ -5,6 +5,27 @@ use zksync_types::{ use crate::{instrument::InstrumentExt, StorageProcessor}; +#[derive(Debug, sqlx::FromRow)] +struct StorageSnapshotMetadata { + l1_batch_number: i64, + storage_logs_filepaths: Vec, + factory_deps_filepath: String, +} + +impl From for SnapshotMetadata { + fn from(row: StorageSnapshotMetadata) -> Self { + Self { + l1_batch_number: L1BatchNumber(row.l1_batch_number as u32), + storage_logs_filepaths: row + .storage_logs_filepaths + .into_iter() + .map(|path| (!path.is_empty()).then_some(path)) + .collect(), + factory_deps_filepath: row.factory_deps_filepath, + } + } +} + #[derive(Debug)] pub struct SnapshotsDal<'a, 'c> { pub(crate) storage: &'a mut StorageProcessor<'c>, @@ -14,9 +35,9 @@ impl SnapshotsDal<'_, '_> { pub async fn add_snapshot( &mut self, l1_batch_number: L1BatchNumber, - storage_logs_filepaths: &[String], + storage_logs_chunk_count: u64, factory_deps_filepaths: &str, - ) -> Result<(), sqlx::Error> { + ) -> sqlx::Result<()> { sqlx::query!( r#" INSERT INTO @@ -28,10 +49,10 @@ impl SnapshotsDal<'_, '_> { updated_at ) VALUES - ($1, $2, $3, NOW(), NOW()) + ($1, ARRAY_FILL(''::TEXT, ARRAY[$2::INTEGER]), $3, NOW(), NOW()) "#, l1_batch_number.0 as i32, - storage_logs_filepaths, + storage_logs_chunk_count as i32, factory_deps_filepaths, ) .instrument("add_snapshot") @@ -41,34 +62,89 @@ impl SnapshotsDal<'_, '_> { Ok(()) } - pub async fn get_all_snapshots(&mut self) -> Result { - let records: Vec = sqlx::query!( + pub async fn add_storage_logs_filepath_for_snapshot( + &mut self, + l1_batch_number: L1BatchNumber, + chunk_id: u64, + storage_logs_filepath: &str, + ) -> sqlx::Result<()> { + sqlx::query!( + r#" + UPDATE snapshots + SET + storage_logs_filepaths[$2] = $3, + updated_at = NOW() + WHERE + l1_batch_number = $1 + "#, + l1_batch_number.0 as i32, + chunk_id as i32 + 1, + storage_logs_filepath, + ) + .execute(self.storage.conn()) + .await?; + + Ok(()) + } + + pub async fn get_all_complete_snapshots(&mut self) -> sqlx::Result { + let rows = sqlx::query!( r#" SELECT - l1_batch_number, - factory_deps_filepath, - storage_logs_filepaths + l1_batch_number FROM snapshots + WHERE + NOT (''::TEXT = ANY (storage_logs_filepaths)) + ORDER BY + l1_batch_number DESC "# ) - .instrument("get_all_snapshots") + .instrument("get_all_complete_snapshots") .report_latency() .fetch_all(self.storage.conn()) - .await? 
- .into_iter() - .map(|r| L1BatchNumber(r.l1_batch_number as u32)) - .collect(); + .await?; + + let snapshots_l1_batch_numbers = rows + .into_iter() + .map(|row| L1BatchNumber(row.l1_batch_number as u32)) + .collect(); + Ok(AllSnapshots { - snapshots_l1_batch_numbers: records, + snapshots_l1_batch_numbers, }) } + pub async fn get_newest_snapshot_metadata(&mut self) -> sqlx::Result> { + let row = sqlx::query_as!( + StorageSnapshotMetadata, + r#" + SELECT + l1_batch_number, + factory_deps_filepath, + storage_logs_filepaths + FROM + snapshots + ORDER BY + l1_batch_number DESC + LIMIT + 1 + "# + ) + .instrument("get_newest_snapshot_metadata") + .report_latency() + .fetch_optional(self.storage.conn()) + .await?; + + Ok(row.map(Into::into)) + } + pub async fn get_snapshot_metadata( &mut self, l1_batch_number: L1BatchNumber, - ) -> Result, sqlx::Error> { - let record: Option = sqlx::query!( + ) -> sqlx::Result> { + let row = sqlx::query_as!( + StorageSnapshotMetadata, r#" SELECT l1_batch_number, @@ -84,13 +160,9 @@ impl SnapshotsDal<'_, '_> { .instrument("get_snapshot_metadata") .report_latency() .fetch_optional(self.storage.conn()) - .await? - .map(|r| SnapshotMetadata { - l1_batch_number: L1BatchNumber(r.l1_batch_number as u32), - factory_deps_filepath: r.factory_deps_filepath, - storage_logs_filepaths: r.storage_logs_filepaths, - }); - Ok(record) + .await?; + + Ok(row.map(Into::into)) } } @@ -106,29 +178,38 @@ mod tests { let mut conn = pool.access_storage().await.unwrap(); let mut dal = conn.snapshots_dal(); let l1_batch_number = L1BatchNumber(100); - dal.add_snapshot(l1_batch_number, &[], "gs:///bucket/factory_deps.bin") + dal.add_snapshot(l1_batch_number, 2, "gs:///bucket/factory_deps.bin") .await .expect("Failed to add snapshot"); let snapshots = dal - .get_all_snapshots() + .get_all_complete_snapshots() .await .expect("Failed to retrieve snapshots"); - assert_eq!(1, snapshots.snapshots_l1_batch_numbers.len()); - assert_eq!( - snapshots.snapshots_l1_batch_numbers[0], - l1_batch_number as L1BatchNumber - ); + assert_eq!(snapshots.snapshots_l1_batch_numbers, []); + + for i in 0..2 { + dal.add_storage_logs_filepath_for_snapshot( + l1_batch_number, + i, + "gs:///bucket/chunk.bin", + ) + .await + .unwrap(); + } + + let snapshots = dal + .get_all_complete_snapshots() + .await + .expect("Failed to retrieve snapshots"); + assert_eq!(snapshots.snapshots_l1_batch_numbers, [l1_batch_number]); let snapshot_metadata = dal .get_snapshot_metadata(l1_batch_number) .await .expect("Failed to retrieve snapshot") .unwrap(); - assert_eq!( - snapshot_metadata.l1_batch_number, - l1_batch_number as L1BatchNumber - ); + assert_eq!(snapshot_metadata.l1_batch_number, l1_batch_number); } #[tokio::test] @@ -137,16 +218,14 @@ mod tests { let mut conn = pool.access_storage().await.unwrap(); let mut dal = conn.snapshots_dal(); let l1_batch_number = L1BatchNumber(100); - dal.add_snapshot( - l1_batch_number, - &[ - "gs:///bucket/test_file1.bin".to_string(), - "gs:///bucket/test_file2.bin".to_string(), - ], - "gs:///bucket/factory_deps.bin", - ) - .await - .expect("Failed to add snapshot"); + dal.add_snapshot(l1_batch_number, 2, "gs:///bucket/factory_deps.bin") + .await + .expect("Failed to add snapshot"); + + let storage_log_filepaths = ["gs:///bucket/test_file1.bin", "gs:///bucket/test_file2.bin"]; + dal.add_storage_logs_filepath_for_snapshot(l1_batch_number, 1, storage_log_filepaths[1]) + .await + .unwrap(); let files = dal .get_snapshot_metadata(l1_batch_number) @@ -154,7 +233,27 @@ mod tests { .expect("Failed to 
retrieve snapshot") .unwrap() .storage_logs_filepaths; - assert!(files.contains(&"gs:///bucket/test_file1.bin".to_string())); - assert!(files.contains(&"gs:///bucket/test_file2.bin".to_string())); + assert_eq!( + files, + [None, Some("gs:///bucket/test_file2.bin".to_string())] + ); + + dal.add_storage_logs_filepath_for_snapshot(l1_batch_number, 0, storage_log_filepaths[0]) + .await + .unwrap(); + + let files = dal + .get_snapshot_metadata(l1_batch_number) + .await + .expect("Failed to retrieve snapshot") + .unwrap() + .storage_logs_filepaths; + assert_eq!( + files, + [ + Some("gs:///bucket/test_file1.bin".to_string()), + Some("gs:///bucket/test_file2.bin".to_string()) + ] + ); } } diff --git a/core/lib/types/src/snapshots.rs b/core/lib/types/src/snapshots.rs index c4804880c2ac..b71a8e34b4be 100644 --- a/core/lib/types/src/snapshots.rs +++ b/core/lib/types/src/snapshots.rs @@ -7,28 +7,42 @@ use zksync_protobuf::{required, ProtoFmt}; use crate::{commitment::L1BatchWithMetadata, Bytes, StorageKey, StorageValue}; +/// Information about all snapshots persisted by the node. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct AllSnapshots { + /// L1 batch numbers for complete snapshots. Ordered by descending number (i.e., 0th element + /// corresponds to the newest snapshot). pub snapshots_l1_batch_numbers: Vec, } -// used in dal to fetch certain snapshot data -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] +/// Storage snapshot metadata. Used in DAL to fetch certain snapshot data. +#[derive(Debug, Clone)] pub struct SnapshotMetadata { + /// L1 batch for the snapshot. The data in the snapshot captures node storage at the end of this batch. pub l1_batch_number: L1BatchNumber, + /// Path to the factory dependencies blob. pub factory_deps_filepath: String, - pub storage_logs_filepaths: Vec, + /// Paths to the storage log blobs. Ordered by the chunk ID. If a certain chunk is not produced yet, + /// the corresponding path is `None`. + pub storage_logs_filepaths: Vec>, +} + +impl SnapshotMetadata { + /// Checks whether a snapshot is complete (contains all information to restore from). + pub fn is_complete(&self) -> bool { + self.storage_logs_filepaths.iter().all(Option::is_some) + } } -//contains all data not contained in factory_deps/storage_logs files to perform restore process +/// Snapshot data returned by using JSON-RPC API. +/// Contains all data not contained in factory deps / storage logs files to perform restore process. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct SnapshotHeader { pub l1_batch_number: L1BatchNumber, pub miniblock_number: MiniblockNumber, - //ordered by chunk ids + /// Ordered by chunk IDs. 
pub storage_logs_chunks: Vec, pub factory_deps_filepath: String, pub last_l1_batch_with_metadata: L1BatchWithMetadata, @@ -42,7 +56,7 @@ pub struct SnapshotStorageLogsChunkMetadata { pub filepath: String, } -#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct SnapshotStorageLogsStorageKey { pub l1_batch_number: L1BatchNumber, diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/snapshots.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/snapshots.rs index bd192e872866..b45fe9a472ee 100644 --- a/core/lib/zksync_core/src/api_server/web3/namespaces/snapshots.rs +++ b/core/lib/zksync_core/src/api_server/web3/namespaces/snapshots.rs @@ -29,7 +29,7 @@ impl SnapshotsNamespace { .map_err(|err| internal_error(method_name, err))?; let mut snapshots_dal = storage_processor.snapshots_dal(); let response = snapshots_dal - .get_all_snapshots() + .get_all_complete_snapshots() .await .map_err(|err| internal_error(method_name, err)); method_latency.observe(); @@ -48,45 +48,61 @@ impl SnapshotsNamespace { .access_storage_tagged("api") .await .map_err(|err| internal_error(method_name, err))?; - let mut snapshots_dal = storage_processor.snapshots_dal(); - let snapshot_metadata = snapshots_dal + let snapshot_metadata = storage_processor + .snapshots_dal() .get_snapshot_metadata(l1_batch_number) .await .map_err(|err| internal_error(method_name, err))?; - if let Some(snapshot_metadata) = snapshot_metadata { - let snapshot_files = snapshot_metadata.storage_logs_filepaths.clone(); - let chunks = snapshot_files - .iter() - .enumerate() - .map(|(chunk_id, filepath)| SnapshotStorageLogsChunkMetadata { - chunk_id: chunk_id as u64, - filepath: filepath.clone(), - }) - .collect(); - let l1_batch_with_metadata = storage_processor - .blocks_dal() - .get_l1_batch_metadata(l1_batch_number) - .await - .map_err(|err| internal_error(method_name, err))? - .unwrap(); - let miniblock_number = storage_processor - .blocks_dal() - .get_miniblock_range_of_l1_batch(l1_batch_number) - .await - .map_err(|err| internal_error(method_name, err))? - .unwrap() - .1; + + let Some(snapshot_metadata) = snapshot_metadata else { method_latency.observe(); - Ok(Some(SnapshotHeader { - l1_batch_number: snapshot_metadata.l1_batch_number, - miniblock_number, - last_l1_batch_with_metadata: l1_batch_with_metadata, - storage_logs_chunks: chunks, - factory_deps_filepath: snapshot_metadata.factory_deps_filepath, - })) - } else { + return Ok(None); + }; + + let snapshot_files = snapshot_metadata.storage_logs_filepaths; + let is_complete = snapshot_files.iter().all(Option::is_some); + if !is_complete { + // We don't return incomplete snapshots via API. method_latency.observe(); - Ok(None) + return Ok(None); } + + let chunks = snapshot_files + .into_iter() + .enumerate() + .filter_map(|(chunk_id, filepath)| { + Some(SnapshotStorageLogsChunkMetadata { + chunk_id: chunk_id as u64, + filepath: filepath?, + }) + }) + .collect(); + let l1_batch_with_metadata = storage_processor + .blocks_dal() + .get_l1_batch_metadata(l1_batch_number) + .await + .map_err(|err| internal_error(method_name, err))? + .ok_or_else(|| { + let err = format!("missing metadata for L1 batch #{l1_batch_number}"); + internal_error(method_name, err) + })?; + let (_, miniblock_number) = storage_processor + .blocks_dal() + .get_miniblock_range_of_l1_batch(l1_batch_number) + .await + .map_err(|err| internal_error(method_name, err))? 
+ .ok_or_else(|| { + let err = format!("missing miniblocks for L1 batch #{l1_batch_number}"); + internal_error(method_name, err) + })?; + + method_latency.observe(); + Ok(Some(SnapshotHeader { + l1_batch_number: snapshot_metadata.l1_batch_number, + miniblock_number, + last_l1_batch_with_metadata: l1_batch_with_metadata, + storage_logs_chunks: chunks, + factory_deps_filepath: snapshot_metadata.factory_deps_filepath, + })) } } diff --git a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs index 7160975d8706..3333e72faf54 100644 --- a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs @@ -30,6 +30,7 @@ use crate::{ state_keeper::tests::create_l2_transaction, }; +mod snapshots; mod ws; const TEST_TIMEOUT: Duration = Duration::from_secs(10); @@ -138,6 +139,9 @@ async fn spawn_server( .await; let (pub_sub_events_sender, pub_sub_events_receiver) = mpsc::unbounded_channel(); + let mut namespaces = Namespace::DEFAULT.to_vec(); + namespaces.push(Namespace::Snapshots); + let server_builder = match transport { ApiTransportLabel::Http => ApiBuilder::jsonrpsee_backend(api_config, pool).http(0), ApiTransportLabel::Ws => { @@ -156,7 +160,7 @@ async fn spawn_server( .with_polling_interval(POLL_INTERVAL) .with_tx_sender(tx_sender, vm_barrier) .with_pub_sub_events(pub_sub_events_sender) - .enable_api_namespaces(Namespace::DEFAULT.to_vec()) + .enable_api_namespaces(namespaces) .build(stop_receiver) .await .expect("Failed spawning JSON-RPC server"); @@ -221,8 +225,9 @@ fn create_miniblock(number: u32) -> MiniblockHeader { } } -async fn store_block(pool: &ConnectionPool) -> anyhow::Result<(MiniblockHeader, H256)> { - let mut storage = pool.access_storage().await?; +async fn store_miniblock( + storage: &mut StorageProcessor<'_>, +) -> anyhow::Result<(MiniblockHeader, H256)> { let new_tx = create_l2_transaction(1, 2); let new_tx_hash = new_tx.hash(); let tx_submission_result = storage @@ -295,10 +300,10 @@ async fn store_events( } #[derive(Debug)] -struct HttpServerBasics; +struct HttpServerBasicsTest; #[async_trait] -impl HttpTest for HttpServerBasics { +impl HttpTest for HttpServerBasicsTest { async fn test(&self, client: &HttpClient, _pool: &ConnectionPool) -> anyhow::Result<()> { let block_number = client.get_block_number().await?; assert_eq!(block_number, U64::from(0)); @@ -317,19 +322,20 @@ impl HttpTest for HttpServerBasics { #[tokio::test] async fn http_server_basics() { - test_http_server(HttpServerBasics).await; + test_http_server(HttpServerBasicsTest).await; } #[derive(Debug)] -struct BasicFilterChanges; +struct BasicFilterChangesTest; #[async_trait] -impl HttpTest for BasicFilterChanges { +impl HttpTest for BasicFilterChangesTest { async fn test(&self, client: &HttpClient, pool: &ConnectionPool) -> anyhow::Result<()> { let block_filter_id = client.new_block_filter().await?; let tx_filter_id = client.new_pending_transaction_filter().await?; - let (new_miniblock, new_tx_hash) = store_block(pool).await?; + let (new_miniblock, new_tx_hash) = + store_miniblock(&mut pool.access_storage().await?).await?; let block_filter_changes = client.get_filter_changes(block_filter_id).await?; assert_matches!( @@ -364,14 +370,14 @@ impl HttpTest for BasicFilterChanges { #[tokio::test] async fn basic_filter_changes() { - test_http_server(BasicFilterChanges).await; + test_http_server(BasicFilterChangesTest).await; } #[derive(Debug)] -struct LogFilterChanges; +struct LogFilterChangesTest; #[async_trait] 
-impl HttpTest for LogFilterChanges { +impl HttpTest for LogFilterChangesTest { async fn test(&self, client: &HttpClient, pool: &ConnectionPool) -> anyhow::Result<()> { let all_logs_filter_id = client.new_filter(Filter::default()).await?; let address_filter = Filter { @@ -419,14 +425,14 @@ impl HttpTest for LogFilterChanges { #[tokio::test] async fn log_filter_changes() { - test_http_server(LogFilterChanges).await; + test_http_server(LogFilterChangesTest).await; } #[derive(Debug)] -struct LogFilterChangesWithBlockBoundaries; +struct LogFilterChangesWithBlockBoundariesTest; #[async_trait] -impl HttpTest for LogFilterChangesWithBlockBoundaries { +impl HttpTest for LogFilterChangesWithBlockBoundariesTest { async fn test(&self, client: &HttpClient, pool: &ConnectionPool) -> anyhow::Result<()> { let lower_bound_filter = Filter { from_block: Some(api::BlockNumber::Number(2.into())), @@ -516,5 +522,5 @@ impl HttpTest for LogFilterChangesWithBlockBoundaries { #[tokio::test] async fn log_filter_changes_with_block_boundaries() { - test_http_server(LogFilterChangesWithBlockBoundaries).await; + test_http_server(LogFilterChangesWithBlockBoundariesTest).await; } diff --git a/core/lib/zksync_core/src/api_server/web3/tests/snapshots.rs b/core/lib/zksync_core/src/api_server/web3/tests/snapshots.rs new file mode 100644 index 000000000000..608f2845065f --- /dev/null +++ b/core/lib/zksync_core/src/api_server/web3/tests/snapshots.rs @@ -0,0 +1,125 @@ +//! Tests for the `snapshots` Web3 namespace. + +use std::collections::HashSet; + +use zksync_types::block::{BlockGasCount, L1BatchHeader}; +use zksync_web3_decl::namespaces::SnapshotsNamespaceClient; + +use super::*; +use crate::state_keeper::tests::create_l1_batch_metadata; + +async fn seal_l1_batch( + storage: &mut StorageProcessor<'_>, + number: L1BatchNumber, +) -> anyhow::Result<()> { + let header = L1BatchHeader::new( + number, + number.0.into(), + Address::repeat_byte(1), + BaseSystemContractsHashes::default(), + ProtocolVersionId::latest(), + ); + storage + .blocks_dal() + .insert_l1_batch(&header, &[], BlockGasCount::default(), &[], &[]) + .await?; + storage + .blocks_dal() + .mark_miniblocks_as_executed_in_l1_batch(number) + .await?; + let metadata = create_l1_batch_metadata(number.0); + storage + .blocks_dal() + .save_l1_batch_metadata(number, &metadata, H256::zero(), false) + .await?; + Ok(()) +} + +#[derive(Debug)] +struct SnapshotBasicsTest { + chunk_ids: HashSet, +} + +impl SnapshotBasicsTest { + const CHUNK_COUNT: u64 = 5; + + fn new(chunk_ids: impl IntoIterator) -> Self { + let chunk_ids: HashSet<_> = chunk_ids.into_iter().collect(); + assert!(chunk_ids.iter().all(|&id| id < Self::CHUNK_COUNT)); + Self { chunk_ids } + } + + fn is_complete_snapshot(&self) -> bool { + self.chunk_ids == HashSet::from_iter(0..Self::CHUNK_COUNT) + } +} + +#[async_trait] +impl HttpTest for SnapshotBasicsTest { + async fn test(&self, client: &HttpClient, pool: &ConnectionPool) -> anyhow::Result<()> { + let mut storage = pool.access_storage().await.unwrap(); + store_miniblock(&mut storage).await?; + seal_l1_batch(&mut storage, L1BatchNumber(1)).await?; + storage + .snapshots_dal() + .add_snapshot(L1BatchNumber(1), Self::CHUNK_COUNT, "file:///factory_deps") + .await?; + + for &chunk_id in &self.chunk_ids { + let path = format!("file:///storage_logs/chunk{chunk_id}"); + storage + .snapshots_dal() + .add_storage_logs_filepath_for_snapshot(L1BatchNumber(1), chunk_id, &path) + .await?; + } + + let all_snapshots = client.get_all_snapshots().await?; + if 
self.is_complete_snapshot() { + assert_eq!(all_snapshots.snapshots_l1_batch_numbers, [L1BatchNumber(1)]); + } else { + assert_eq!(all_snapshots.snapshots_l1_batch_numbers, []); + } + + let snapshot_header = client + .get_snapshot_by_l1_batch_number(L1BatchNumber(1)) + .await?; + let snapshot_header = if self.is_complete_snapshot() { + snapshot_header.context("no snapshot for L1 batch #1")? + } else { + assert!(snapshot_header.is_none()); + return Ok(()); + }; + + assert_eq!(snapshot_header.l1_batch_number, L1BatchNumber(1)); + assert_eq!(snapshot_header.miniblock_number, MiniblockNumber(1)); + assert_eq!( + snapshot_header.factory_deps_filepath, + "file:///factory_deps" + ); + + assert_eq!( + snapshot_header.storage_logs_chunks.len(), + self.chunk_ids.len() + ); + for chunk in &snapshot_header.storage_logs_chunks { + assert!(self.chunk_ids.contains(&chunk.chunk_id)); + assert!(chunk.filepath.starts_with("file:///storage_logs/")); + } + Ok(()) + } +} + +#[tokio::test] +async fn snapshot_without_chunks() { + test_http_server(SnapshotBasicsTest::new([])).await; +} + +#[tokio::test] +async fn snapshot_with_some_chunks() { + test_http_server(SnapshotBasicsTest::new([0, 2, 4])).await; +} + +#[tokio::test] +async fn snapshot_with_all_chunks() { + test_http_server(SnapshotBasicsTest::new(0..SnapshotBasicsTest::CHUNK_COUNT)).await; +} diff --git a/core/lib/zksync_core/src/api_server/web3/tests/ws.rs b/core/lib/zksync_core/src/api_server/web3/tests/ws.rs index af062f8367b3..509b1e194e7f 100644 --- a/core/lib/zksync_core/src/api_server/web3/tests/ws.rs +++ b/core/lib/zksync_core/src/api_server/web3/tests/ws.rs @@ -114,10 +114,10 @@ async fn test_ws_server(test: impl WsTest) { } #[derive(Debug)] -struct WsServerCanStart; +struct WsServerCanStartTest; #[async_trait] -impl WsTest for WsServerCanStart { +impl WsTest for WsServerCanStartTest { async fn test( &self, client: &WsClient, @@ -141,14 +141,14 @@ impl WsTest for WsServerCanStart { #[tokio::test] async fn ws_server_can_start() { - test_ws_server(WsServerCanStart).await; + test_ws_server(WsServerCanStartTest).await; } #[derive(Debug)] -struct BasicSubscriptions; +struct BasicSubscriptionsTest; #[async_trait] -impl WsTest for BasicSubscriptions { +impl WsTest for BasicSubscriptionsTest { async fn test( &self, client: &WsClient, @@ -172,7 +172,8 @@ impl WsTest for BasicSubscriptions { .await?; wait_for_subscription(&mut pub_sub_events, SubscriptionType::Txs).await; - let (new_miniblock, new_tx_hash) = store_block(pool).await?; + let (new_miniblock, new_tx_hash) = + store_miniblock(&mut pool.access_storage().await?).await?; let received_tx_hash = tokio::time::timeout(TEST_TIMEOUT, txs_subscription.next()) .await @@ -193,11 +194,11 @@ impl WsTest for BasicSubscriptions { #[tokio::test] async fn basic_subscriptions() { - test_ws_server(BasicSubscriptions).await; + test_ws_server(BasicSubscriptionsTest).await; } #[derive(Debug)] -struct LogSubscriptions; +struct LogSubscriptionsTest; #[derive(Debug)] struct Subscriptions { @@ -248,7 +249,7 @@ impl Subscriptions { } #[async_trait] -impl WsTest for LogSubscriptions { +impl WsTest for LogSubscriptionsTest { async fn test( &self, client: &WsClient, @@ -314,14 +315,14 @@ async fn collect_logs( #[tokio::test] async fn log_subscriptions() { - test_ws_server(LogSubscriptions).await; + test_ws_server(LogSubscriptionsTest).await; } #[derive(Debug)] -struct LogSubscriptionsWithNewBlock; +struct LogSubscriptionsWithNewBlockTest; #[async_trait] -impl WsTest for LogSubscriptionsWithNewBlock { +impl WsTest for 
LogSubscriptionsWithNewBlockTest { async fn test( &self, client: &WsClient, @@ -362,14 +363,14 @@ impl WsTest for LogSubscriptionsWithNewBlock { #[tokio::test] async fn log_subscriptions_with_new_block() { - test_ws_server(LogSubscriptionsWithNewBlock).await; + test_ws_server(LogSubscriptionsWithNewBlockTest).await; } #[derive(Debug)] -struct LogSubscriptionsWithManyBlocks; +struct LogSubscriptionsWithManyBlocksTest; #[async_trait] -impl WsTest for LogSubscriptionsWithManyBlocks { +impl WsTest for LogSubscriptionsWithManyBlocksTest { async fn test( &self, client: &WsClient, @@ -408,14 +409,14 @@ impl WsTest for LogSubscriptionsWithManyBlocks { #[tokio::test] async fn log_subscriptions_with_many_new_blocks_at_once() { - test_ws_server(LogSubscriptionsWithManyBlocks).await; + test_ws_server(LogSubscriptionsWithManyBlocksTest).await; } #[derive(Debug)] -struct LogSubscriptionsWithDelay; +struct LogSubscriptionsWithDelayTest; #[async_trait] -impl WsTest for LogSubscriptionsWithDelay { +impl WsTest for LogSubscriptionsWithDelayTest { async fn test( &self, client: &WsClient, @@ -472,14 +473,14 @@ impl WsTest for LogSubscriptionsWithDelay { #[tokio::test] async fn log_subscriptions_with_delay() { - test_ws_server(LogSubscriptionsWithDelay).await; + test_ws_server(LogSubscriptionsWithDelayTest).await; } #[derive(Debug)] -struct RateLimiting; +struct RateLimitingTest; #[async_trait] -impl WsTest for RateLimiting { +impl WsTest for RateLimitingTest { async fn test( &self, client: &WsClient, @@ -509,14 +510,14 @@ impl WsTest for RateLimiting { #[tokio::test] async fn rate_limiting() { - test_ws_server(RateLimiting).await; + test_ws_server(RateLimitingTest).await; } #[derive(Debug)] -struct BatchGetsRateLimited; +struct BatchGetsRateLimitedTest; #[async_trait] -impl WsTest for BatchGetsRateLimited { +impl WsTest for BatchGetsRateLimitedTest { async fn test( &self, client: &WsClient, @@ -553,5 +554,5 @@ impl WsTest for BatchGetsRateLimited { #[tokio::test] async fn batch_rate_limiting() { - test_ws_server(BatchGetsRateLimited).await; + test_ws_server(BatchGetsRateLimitedTest).await; }
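A note on the chunking change at the top of this diff: `get_chunk_hashed_keys_range(chunk_id, chunk_count)` splits the hashed-key space into `chunk_count` contiguous inclusive ranges by rounding the stride up and saturating at the top of the range. Below is a minimal, self-contained sketch of that arithmetic, with `u64` standing in for the `U256`/`H256` hashed keys and the name `chunk_range` chosen purely for illustration:

use std::ops::RangeInclusive;

// Sketch only: the real code in `chunking.rs` does the same arithmetic over
// `U256` hashed keys; `u64` keeps this example self-contained.
fn chunk_range(chunk_id: u64, chunk_count: u64) -> RangeInclusive<u64> {
    assert!(chunk_count > 0);
    assert!(chunk_id < chunk_count);

    let mut stride = u64::MAX / chunk_count;
    // Round the stride up (unless a single chunk already covers everything) so
    // that `chunk_count` chunks span the whole key space without gaps.
    let stride_minus_one = if stride < u64::MAX {
        stride += 1;
        stride - 1
    } else {
        stride
    };
    let start = stride.saturating_mul(chunk_id);
    let end = start.saturating_add(stride_minus_one);
    start..=end
}

fn main() {
    // Chunks are contiguous and jointly cover 0..=u64::MAX.
    for chunk_id in 0..4 {
        println!("chunk {chunk_id}: {:?}", chunk_range(chunk_id, 4));
    }
}

Every hashed key falls into exactly one chunk, which is what lets the `process_storage_logs_single_chunk` calls run independently and in parallel under the semaphore.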
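The resumption logic in `SnapshotCreator::run` hinges on how chunk filepaths are persisted: `add_snapshot` pre-fills a `TEXT[]` column with empty strings (one slot per chunk), `add_storage_logs_filepath_for_snapshot` overwrites a single slot per saved chunk (using `chunk_id + 1`, since Postgres arrays are 1-based), and the DAL maps empty strings to `None`. A minimal sketch of that model, using a simplified stand-in type rather than the real `SnapshotMetadata`/`SnapshotProgress`:

/// Simplified stand-in for the metadata model introduced in this diff: the
/// Postgres row stores a TEXT[] pre-filled with empty strings (one slot per
/// chunk); an empty string maps to `None`, and a snapshot is complete once
/// every slot is `Some`.
struct Metadata {
    storage_logs_filepaths: Vec<Option<String>>,
}

impl Metadata {
    // Mirrors the `StorageSnapshotMetadata -> SnapshotMetadata` conversion:
    // '' (the ARRAY_FILL placeholder) becomes `None`, anything else `Some(path)`.
    fn from_row(paths: Vec<String>) -> Self {
        let storage_logs_filepaths = paths
            .into_iter()
            .map(|path| (!path.is_empty()).then_some(path))
            .collect();
        Self { storage_logs_filepaths }
    }

    fn is_complete(&self) -> bool {
        self.storage_logs_filepaths.iter().all(Option::is_some)
    }

    // Mirrors `SnapshotProgress::from_existing_snapshot`: chunks whose slot is
    // still `None` are the ones a restarted creator has to redo.
    fn remaining_chunk_ids(&self) -> Vec<u64> {
        self.storage_logs_filepaths
            .iter()
            .enumerate()
            .filter_map(|(chunk_id, path)| path.is_none().then_some(chunk_id as u64))
            .collect()
    }
}

fn main() {
    // A row as written by `add_snapshot` (all placeholders) after one chunk was
    // saved via `add_storage_logs_filepath_for_snapshot`.
    let row = vec![
        String::new(),
        "gs:///bucket/chunk1.bin".to_string(),
        String::new(),
    ];
    let metadata = Metadata::from_row(row);
    assert!(!metadata.is_complete());
    assert_eq!(metadata.remaining_chunk_ids(), [0, 2]);
}

With this representation, `get_all_complete_snapshots` only needs to check `NOT (''::TEXT = ANY (storage_logs_filepaths))`, and a restarted creator simply re-processes the chunks whose slots are still empty.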
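Chunk-count selection in `initialize_snapshot_progress` is a ceiling division clamped from below by `min_chunk_count` (`MIN_CHUNK_COUNT = 10` in `main.rs`). A small sketch with made-up numbers; `ceil_div` from `zksync_utils` is written out inline here:

// Sketch of the chunk-count selection; the concrete numbers are illustrative only.
fn select_chunk_count(distinct_keys: u64, chunk_size: u64, min_chunk_count: u64) -> u64 {
    // Equivalent to `ceil_div(distinct_keys, chunk_size).max(min_chunk_count)`:
    // the clamp avoids degenerate single-chunk snapshots (important for tests).
    let chunks = (distinct_keys + chunk_size - 1) / chunk_size;
    chunks.max(min_chunk_count)
}

fn main() {
    assert_eq!(select_chunk_count(2_500_000, 1_000_000, 10), 10); // clamped up to the minimum
    assert_eq!(select_chunk_count(25_000_000, 1_000_000, 10), 25);
}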