diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs
index a3af79508576..f6696d733482 100644
--- a/core/bin/external_node/src/main.rs
+++ b/core/bin/external_node/src/main.rs
@@ -41,7 +41,7 @@ use zksync_reorg_detector::ReorgDetector;
 use zksync_shared_metrics::rustc::RUST_METRICS;
 use zksync_state::{PostgresStorageCaches, RocksdbStorageOptions};
 use zksync_state_keeper::{
-    seal_criteria::NoopSealer, AsyncRocksdbCache, BatchExecutor, OutputHandler,
+    seal_criteria::NoopSealer, AsyncRocksdbCache, BatchExecutor, MainBatchExecutor, OutputHandler,
     StateKeeperPersistence, TreeWritesPersistence, ZkSyncStateKeeper,
 };
 use zksync_storage::RocksDB;
@@ -94,7 +94,8 @@ async fn build_state_keeper(
         stop_receiver_clone.changed().await?;
         result
     }));
-    let batch_executor_base = BatchExecutor::new(save_call_traces, true);
+    let batch_executor_base: Box<dyn BatchExecutor> =
+        Box::new(MainBatchExecutor::new(save_call_traces, true));
 
     let io = ExternalIO::new(
         connection_pool,
diff --git a/core/bin/external_node/src/node_builder.rs b/core/bin/external_node/src/node_builder.rs
index bf7fe4b1068b..ff851999f623 100644
--- a/core/bin/external_node/src/node_builder.rs
+++ b/core/bin/external_node/src/node_builder.rs
@@ -37,7 +37,7 @@ use zksync_node_framework::{
     reorg_detector::ReorgDetectorLayer,
     sigint::SigintHandlerLayer,
     state_keeper::{
-        batch_executor::BatchExecutorLayer, external_io::ExternalIOLayer,
+        external_io::ExternalIOLayer, main_batch_executor::MainBatchExecutorLayer,
         output_handler::OutputHandlerLayer, StateKeeperLayer,
     },
     sync_state_updater::SyncStateUpdaterLayer,
@@ -201,7 +201,7 @@ impl ExternalNodeBuilder {
             .api_namespaces()
             .contains(&Namespace::Debug);
         let main_node_batch_executor_builder_layer =
-            BatchExecutorLayer::new(save_call_traces, OPTIONAL_BYTECODE_COMPRESSION);
+            MainBatchExecutorLayer::new(save_call_traces, OPTIONAL_BYTECODE_COMPRESSION);
 
         let rocksdb_options = RocksdbStorageOptions {
             block_cache_capacity: self
diff --git a/core/bin/zksync_server/src/node_builder.rs b/core/bin/zksync_server/src/node_builder.rs
index 436a1dc1d47a..64039ddcc873 100644
--- a/core/bin/zksync_server/src/node_builder.rs
+++ b/core/bin/zksync_server/src/node_builder.rs
@@ -48,7 +48,7 @@ use zksync_node_framework::{
     query_eth_client::QueryEthClientLayer,
     sigint::SigintHandlerLayer,
     state_keeper::{
-        batch_executor::BatchExecutorLayer, mempool_io::MempoolIOLayer,
+        main_batch_executor::MainBatchExecutorLayer, mempool_io::MempoolIOLayer,
         output_handler::OutputHandlerLayer, RocksdbStorageOptions, StateKeeperLayer,
     },
     tee_verifier_input_producer::TeeVerifierInputProducerLayer,
@@ -232,7 +232,7 @@ impl MainNodeBuilder {
         );
         let db_config = try_load_config!(self.configs.db_config);
         let main_node_batch_executor_builder_layer =
-            BatchExecutorLayer::new(sk_config.save_call_traces, OPTIONAL_BYTECODE_COMPRESSION);
+            MainBatchExecutorLayer::new(sk_config.save_call_traces, OPTIONAL_BYTECODE_COMPRESSION);
 
         let rocksdb_options = RocksdbStorageOptions {
             block_cache_capacity: db_config
diff --git a/core/lib/state/src/rocksdb/mod.rs b/core/lib/state/src/rocksdb/mod.rs
index f0752a62d3c7..aab33c7dfe83 100644
--- a/core/lib/state/src/rocksdb/mod.rs
+++ b/core/lib/state/src/rocksdb/mod.rs
@@ -257,12 +257,6 @@ impl RocksdbStorageBuilder {
     ) -> anyhow::Result<()> {
         self.0.revert(storage, last_l1_batch_to_keep).await
     }
-
-    /// Returns the underlying storage without synchronizing it or checking that it's up-to-date.
-    /// Should not be used other than for tests.
-    pub fn build_unchecked(self) -> RocksdbStorage {
-        self.0
-    }
 }
 
 impl RocksdbStorage {
diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs
index d6a550bdfde0..a2009d14dece 100644
--- a/core/node/consensus/src/testonly.rs
+++ b/core/node/consensus/src/testonly.rs
@@ -34,10 +34,11 @@ use zksync_state_keeper::{
     io::{IoCursor, L1BatchParams, L2BlockParams},
     seal_criteria::NoopSealer,
     testonly::{
-        fund, l1_transaction, l2_transaction, test_batch_vm::MockReadStorageFactory, MockBatchVm,
+        fund, l1_transaction, l2_transaction, test_batch_executor::MockReadStorageFactory,
+        MockBatchExecutor,
     },
-    AsyncRocksdbCache, BatchExecutor, OutputHandler, StateKeeperPersistence, TreeWritesPersistence,
-    ZkSyncStateKeeper,
+    AsyncRocksdbCache, MainBatchExecutor, OutputHandler, StateKeeperPersistence,
+    TreeWritesPersistence, ZkSyncStateKeeper,
 };
 use zksync_test_account::Account;
 use zksync_types::{
@@ -546,7 +547,7 @@ impl StateKeeperRunner {
             ZkSyncStateKeeper::new(
                 stop_recv,
                 Box::new(io),
-                BatchExecutor::new(false, false),
+                Box::new(MainBatchExecutor::new(false, false)),
                 OutputHandler::new(Box::new(persistence.with_tx_insertion()))
                     .with_handler(Box::new(self.sync_state.clone())),
                 Arc::new(NoopSealer),
@@ -627,13 +628,12 @@ impl StateKeeperRunner {
             ZkSyncStateKeeper::new(
                 stop_recv,
                 Box::new(io),
-                BatchExecutor::new(false, false)
-                    .with_vm_factory(Arc::<MockBatchVm>::default()),
+                Box::new(MockBatchExecutor),
                 OutputHandler::new(Box::new(persistence.with_tx_insertion()))
                     .with_handler(Box::new(tree_writes_persistence))
                     .with_handler(Box::new(self.sync_state.clone())),
                 Arc::new(NoopSealer),
-                Arc::<MockReadStorageFactory>::default(),
+                Arc::new(MockReadStorageFactory),
             )
             .run()
             .await
diff --git a/core/node/node_framework/examples/main_node.rs b/core/node/node_framework/examples/main_node.rs
index bc9fc0f79cce..38f989bda85f 100644
--- a/core/node/node_framework/examples/main_node.rs
+++ b/core/node/node_framework/examples/main_node.rs
@@ -42,7 +42,7 @@ use zksync_node_framework::{
         query_eth_client::QueryEthClientLayer,
         sigint::SigintHandlerLayer,
         state_keeper::{
-            batch_executor::BatchExecutorLayer, mempool_io::MempoolIOLayer,
+            main_batch_executor::MainBatchExecutorLayer, mempool_io::MempoolIOLayer,
             output_handler::OutputHandlerLayer, StateKeeperLayer,
         },
         web3_api::{
@@ -161,7 +161,7 @@ impl MainNodeBuilder {
             wallets.state_keeper.context("State keeper wallets")?,
         );
         let main_node_batch_executor_builder_layer =
-            BatchExecutorLayer::new(StateKeeperConfig::from_env()?.save_call_traces, true);
+            MainBatchExecutorLayer::new(StateKeeperConfig::from_env()?.save_call_traces, true);
 
         let db_config = DBConfig::from_env()?;
         let rocksdb_options = RocksdbStorageOptions {
diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/batch_executor.rs b/core/node/node_framework/src/implementations/layers/state_keeper/batch_executor.rs
deleted file mode 100644
index 7e9f8500a6a3..000000000000
--- a/core/node/node_framework/src/implementations/layers/state_keeper/batch_executor.rs
+++ /dev/null
@@ -1,38 +0,0 @@
-use zksync_state_keeper::BatchExecutor;
-
-use crate::{
-    implementations::resources::state_keeper::BatchExecutorResource,
-    wiring_layer::{WiringError, WiringLayer},
-};
-
-/// Wiring layer for `BatchExecutor`, part of the state keeper responsible for running the VM.
-#[derive(Debug)]
-pub struct BatchExecutorLayer {
-    save_call_traces: bool,
-    optional_bytecode_compression: bool,
-}
-
-impl BatchExecutorLayer {
-    pub fn new(save_call_traces: bool, optional_bytecode_compression: bool) -> Self {
-        Self {
-            save_call_traces,
-            optional_bytecode_compression,
-        }
-    }
-}
-
-#[async_trait::async_trait]
-impl WiringLayer for BatchExecutorLayer {
-    type Input = ();
-    type Output = BatchExecutorResource;
-
-    fn layer_name(&self) -> &'static str {
-        "batch_executor_layer"
-    }
-
-    async fn wire(self, (): Self::Input) -> Result<Self::Output, WiringError> {
-        let batch_executor =
-            BatchExecutor::new(self.save_call_traces, self.optional_bytecode_compression);
-        Ok(batch_executor.into())
-    }
-}
diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/main_batch_executor.rs b/core/node/node_framework/src/implementations/layers/state_keeper/main_batch_executor.rs
new file mode 100644
index 000000000000..33d3b5676aac
--- /dev/null
+++ b/core/node/node_framework/src/implementations/layers/state_keeper/main_batch_executor.rs
@@ -0,0 +1,48 @@
+use zksync_state_keeper::MainBatchExecutor;
+
+use crate::{
+    implementations::resources::state_keeper::BatchExecutorResource,
+    wiring_layer::{WiringError, WiringLayer},
+    IntoContext,
+};
+
+/// Wiring layer for `MainBatchExecutor`, part of the state keeper responsible for running the VM.
+#[derive(Debug)]
+pub struct MainBatchExecutorLayer {
+    save_call_traces: bool,
+    optional_bytecode_compression: bool,
+}
+
+#[derive(Debug, IntoContext)]
+#[context(crate = crate)]
+pub struct Output {
+    pub batch_executor: BatchExecutorResource,
+}
+
+impl MainBatchExecutorLayer {
+    pub fn new(save_call_traces: bool, optional_bytecode_compression: bool) -> Self {
+        Self {
+            save_call_traces,
+            optional_bytecode_compression,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl WiringLayer for MainBatchExecutorLayer {
+    type Input = ();
+    type Output = Output;
+
+    fn layer_name(&self) -> &'static str {
+        "main_batch_executor_layer"
+    }
+
+    async fn wire(self, _input: Self::Input) -> Result<Self::Output, WiringError> {
+        let builder =
+            MainBatchExecutor::new(self.save_call_traces, self.optional_bytecode_compression);
+
+        Ok(Output {
+            batch_executor: builder.into(),
+        })
+    }
+}
diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs b/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs
index dc784bee4cbf..b0dfe0f1600c 100644
--- a/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs
+++ b/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs
@@ -8,8 +8,8 @@ use zksync_state_keeper::{
 };
 use zksync_storage::RocksDB;
 
-pub mod batch_executor;
 pub mod external_io;
+pub mod main_batch_executor;
 pub mod mempool_io;
 pub mod output_handler;
 
@@ -85,7 +85,7 @@ impl WiringLayer for StateKeeperLayer {
             .batch_executor
             .0
             .take()
-            .context("BatchExecutor was provided but taken by some other task")?;
+            .context("L1BatchExecutorBuilder was provided but taken by some other task")?;
         let output_handler = input
             .output_handler
             .0
@@ -125,7 +125,7 @@ impl WiringLayer for StateKeeperLayer {
 #[derive(Debug)]
 pub struct StateKeeperTask {
     io: Box<dyn StateKeeperIO>,
-    batch_executor_base: BatchExecutor,
+    batch_executor_base: Box<dyn BatchExecutor>,
     output_handler: OutputHandler,
     sealer: Arc<dyn ConditionalSealer>,
     storage_factory: Arc<dyn ReadStorageFactory>,
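For context, this is roughly how the renamed layer slots into a node builder. A minimal sketch, assuming the `ZkStackServiceBuilder::add_layer` API that the node builders touched above rely on; the helper function itself is hypothetical:

```rust
use zksync_node_framework::{
    implementations::layers::state_keeper::main_batch_executor::MainBatchExecutorLayer,
    service::ZkStackServiceBuilder,
};

// Hypothetical helper: registers the main batch executor for a node.
fn add_batch_executor(builder: &mut ZkStackServiceBuilder, save_call_traces: bool) {
    // `wire()` later converts the executor into a `BatchExecutorResource`
    // via the blanket `From<T: BatchExecutor>` impl in the resources diff below.
    builder.add_layer(MainBatchExecutorLayer::new(
        save_call_traces,
        /* optional_bytecode_compression */ false,
    ));
}
```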
diff --git a/core/node/node_framework/src/implementations/resources/state_keeper.rs b/core/node/node_framework/src/implementations/resources/state_keeper.rs
index afcab8139bd0..5db570d7989b 100644
--- a/core/node/node_framework/src/implementations/resources/state_keeper.rs
+++ b/core/node/node_framework/src/implementations/resources/state_keeper.rs
@@ -26,7 +26,7 @@ impl<T: StateKeeperIO> From<T> for StateKeeperIOResource {
 /// A resource that provides [`BatchExecutor`] implementation to the service.
 /// This resource is unique, e.g. it's expected to be consumed by a single service.
 #[derive(Debug, Clone)]
-pub struct BatchExecutorResource(pub Unique<BatchExecutor>);
+pub struct BatchExecutorResource(pub Unique<Box<dyn BatchExecutor>>);
 
 impl Resource for BatchExecutorResource {
     fn name() -> String {
@@ -34,9 +34,9 @@ impl Resource for BatchExecutorResource {
     }
 }
 
-impl From<BatchExecutor> for BatchExecutorResource {
-    fn from(executor: BatchExecutor) -> Self {
-        Self(Unique::new(executor))
+impl<T: BatchExecutor> From<T> for BatchExecutorResource {
+    fn from(executor: T) -> Self {
+        Self(Unique::new(Box::new(executor)))
     }
 }
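The blanket `From` impl means any type implementing the new `BatchExecutor` trait converts straight into the resource. A small illustrative sketch (the helper function is hypothetical; the import paths follow this diff's module layout):

```rust
use zksync_node_framework::implementations::resources::state_keeper::BatchExecutorResource;
use zksync_state_keeper::MainBatchExecutor;

// Hypothetical helper: boxing happens inside `From`, so call sites never
// have to name `Box<dyn BatchExecutor>` explicitly.
fn batch_executor_resource(save_call_traces: bool) -> BatchExecutorResource {
    MainBatchExecutor::new(save_call_traces, false).into()
}
```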
diff --git a/core/node/node_sync/src/tests.rs b/core/node/node_sync/src/tests.rs
index 54f13de353a4..510f9124c297 100644
--- a/core/node/node_sync/src/tests.rs
+++ b/core/node/node_sync/src/tests.rs
@@ -13,8 +13,8 @@ use zksync_node_test_utils::{
 use zksync_state_keeper::{
     io::{L1BatchParams, L2BlockParams},
     seal_criteria::NoopSealer,
-    testonly::test_batch_vm::{MockReadStorageFactory, TestBatchVmFactory},
-    BatchExecutor, OutputHandler, StateKeeperPersistence, TreeWritesPersistence, ZkSyncStateKeeper,
+    testonly::test_batch_executor::{MockReadStorageFactory, TestBatchExecutorBuilder},
+    OutputHandler, StateKeeperPersistence, TreeWritesPersistence, ZkSyncStateKeeper,
 };
 use zksync_types::{
     api,
@@ -121,19 +121,18 @@ impl StateKeeperHandles {
             .unwrap();
 
         let (stop_sender, stop_receiver) = watch::channel(false);
-        let mut vm_factory = TestBatchVmFactory::default();
+        let mut batch_executor_base = TestBatchExecutorBuilder::default();
         for &tx_hashes_in_l1_batch in tx_hashes {
-            vm_factory.push_successful_transactions(tx_hashes_in_l1_batch);
+            batch_executor_base.push_successful_transactions(tx_hashes_in_l1_batch);
         }
-        let batch_executor = BatchExecutor::new(false, false).with_vm_factory(Arc::new(vm_factory));
 
         let state_keeper = ZkSyncStateKeeper::new(
             stop_receiver,
             Box::new(io),
-            batch_executor,
+            Box::new(batch_executor_base),
             output_handler,
             Arc::new(NoopSealer),
-            Arc::<MockReadStorageFactory>::default(),
+            Arc::new(MockReadStorageFactory),
         );
 
         Self {
diff --git a/core/node/state_keeper/Cargo.toml b/core/node/state_keeper/Cargo.toml
index 890543bcd910..904d17718503 100644
--- a/core/node/state_keeper/Cargo.toml
+++ b/core/node/state_keeper/Cargo.toml
@@ -33,7 +33,6 @@ zksync_base_token_adjuster.workspace = true
 
 anyhow.workspace = true
 async-trait.workspace = true
-tempfile.workspace = true # used in `testonly` module
 tokio = { workspace = true, features = ["time"] }
 thiserror.workspace = true
 tracing.workspace = true
@@ -46,6 +45,7 @@ hex.workspace = true
 assert_matches.workspace = true
 test-casing.workspace = true
 futures.workspace = true
+tempfile.workspace = true
 
 zksync_eth_client.workspace = true
 zksync_system_constants.workspace = true
diff --git a/core/node/state_keeper/src/batch_executor/main_executor.rs b/core/node/state_keeper/src/batch_executor/main_executor.rs
index 196b187cea7c..e96cf9f1f7cb 100644
--- a/core/node/state_keeper/src/batch_executor/main_executor.rs
+++ b/core/node/state_keeper/src/batch_executor/main_executor.rs
@@ -1,32 +1,37 @@
 use std::sync::Arc;
 
 use anyhow::Context as _;
+use async_trait::async_trait;
+use once_cell::sync::OnceCell;
 use tokio::{
     runtime::Handle,
     sync::{mpsc, watch},
 };
-use zksync_multivm::interface::{FinishedL1Batch, L1BatchEnv, SystemEnv};
+use zksync_multivm::{
+    interface::{
+        ExecutionResult, FinishedL1Batch, Halt, L1BatchEnv, L2BlockEnv, SystemEnv,
+        VmExecutionResultAndLogs, VmFactory, VmInterface, VmInterfaceHistoryEnabled,
+    },
+    tracers::CallTracer,
+    vm_latest::HistoryEnabled,
+    MultiVMTracer, VmInstance,
+};
 use zksync_shared_metrics::{InteractionType, TxStage, APP_METRICS};
-use zksync_state::{PgOrRocksdbStorage, ReadStorage, ReadStorageFactory, StoragePtr, StorageView};
-use zksync_types::{L1BatchNumber, Transaction};
+use zksync_state::{ReadStorage, ReadStorageFactory, StorageView};
+use zksync_types::{vm_trace::Call, Transaction};
+use zksync_utils::bytecode::CompressedBytecodeInfo;
 
-use super::{
-    traits::{BatchVm, TraceCalls},
-    BatchExecutorHandle, Command, TxExecutionResult,
-};
+use super::{BatchExecutor, BatchExecutorHandle, Command, TxExecutionResult};
 use crate::{
-    batch_executor::traits::{BatchVmFactory, DefaultBatchVmFactory},
     metrics::{TxExecutionStage, BATCH_TIP_METRICS, EXECUTOR_METRICS, KEEPER_METRICS},
+    types::ExecutionMetricsForCriteria,
 };
 
-/// Concrete trait object for the [`BatchVmFactory`] used in [`BatchExecutor`].
-pub type DynVmFactory = dyn for<'a> BatchVmFactory<PgOrRocksdbStorage<'a>>;
-
-/// Batch executor which maintains the VM for the duration of a single L1 batch.
-#[derive(Debug)]
-pub struct BatchExecutor {
-    vm_factory: Arc<DynVmFactory>,
-    trace_calls: TraceCalls,
+/// The default implementation of [`BatchExecutor`].
+/// Creates a "real" batch executor which maintains the VM (as opposed to the test builder which doesn't use the VM).
+#[derive(Debug, Clone)]
+pub struct MainBatchExecutor {
+    save_call_traces: bool,
     /// Whether batch executor would allow transactions with bytecode that cannot be compressed.
     /// For new blocks, bytecode compression is mandatory -- if bytecode compression is not supported,
     /// the transaction will be rejected.
@@ -36,27 +41,18 @@ pub struct BatchExecutor {
     optional_bytecode_compression: bool,
 }
 
-impl BatchExecutor {
-    /// Creates an executor with the specified parameters.
+impl MainBatchExecutor {
     pub fn new(save_call_traces: bool, optional_bytecode_compression: bool) -> Self {
         Self {
-            vm_factory: Arc::new(DefaultBatchVmFactory),
-            trace_calls: if save_call_traces {
-                TraceCalls::Trace
-            } else {
-                TraceCalls::Skip
-            },
+            save_call_traces,
             optional_bytecode_compression,
         }
     }
+}
 
-    /// Sets the VM factory used by this executor.
-    pub fn with_vm_factory(mut self, vm_factory: Arc<DynVmFactory>) -> Self {
-        self.vm_factory = vm_factory;
-        self
-    }
-
-    pub async fn init_batch(
+#[async_trait]
+impl BatchExecutor for MainBatchExecutor {
+    async fn init_batch(
         &mut self,
         storage_factory: Arc<dyn ReadStorageFactory>,
         l1_batch_params: L1BatchEnv,
@@ -67,22 +63,20 @@ impl BatchExecutor {
         // until a previous command is processed), capacity 1 is enough for the commands channel.
         let (commands_sender, commands_receiver) = mpsc::channel(1);
         let executor = CommandReceiver {
-            trace_calls: self.trace_calls,
+            save_call_traces: self.save_call_traces,
             optional_bytecode_compression: self.optional_bytecode_compression,
             commands: commands_receiver,
         };
 
-        let vm_factory = self.vm_factory.clone();
         let stop_receiver = stop_receiver.clone();
         let handle = tokio::task::spawn_blocking(move || {
-            let l1_batch_number = l1_batch_params.number;
             if let Some(storage) = Handle::current()
-                .block_on(storage_factory.access_storage(&stop_receiver, l1_batch_number - 1))
+                .block_on(
+                    storage_factory.access_storage(&stop_receiver, l1_batch_params.number - 1),
+                )
                .context("failed accessing state keeper storage")?
             {
-                let storage_view = StorageView::new(storage).to_rc_ptr();
-                let vm = vm_factory.create_vm(l1_batch_params, system_env, storage_view.clone());
-                executor.run(storage_view, l1_batch_number, vm);
+                executor.run(storage, l1_batch_params, system_env);
             } else {
                 tracing::info!("Interrupted while trying to access state keeper storage");
             }
@@ -92,50 +86,54 @@ impl BatchExecutor {
     }
 }
 
-/// Command processor for `BatchExecutor`.
-/// Upon launch, it initializes the VM object with provided L1 batch context and properties, and keeps invoking the commands
+/// Implementation of the "primary" (non-test) batch executor.
+/// Upon launch, it initializes the VM object with provided block context and properties, and keeps invoking the commands
 /// sent to it one by one until the batch is finished.
 ///
 /// One `CommandReceiver` can execute exactly one batch, so once the batch is sealed, a new `CommandReceiver` object must
 /// be constructed.
 #[derive(Debug)]
 struct CommandReceiver {
-    trace_calls: TraceCalls,
+    save_call_traces: bool,
     optional_bytecode_compression: bool,
     commands: mpsc::Receiver<Command>,
 }
 
 impl CommandReceiver {
-    fn run<S: ReadStorage>(
+    pub(super) fn run<S: ReadStorage>(
         mut self,
-        storage_view: StoragePtr<StorageView<S>>,
-        l1_batch_number: L1BatchNumber,
-        mut vm: Box<dyn BatchVm + '_>,
+        secondary_storage: S,
+        l1_batch_params: L1BatchEnv,
+        system_env: SystemEnv,
     ) {
-        tracing::info!("Starting executing L1 batch #{l1_batch_number}");
+        tracing::info!("Starting executing L1 batch #{}", &l1_batch_params.number);
+
+        let storage_view = StorageView::new(secondary_storage).to_rc_ptr();
+
+        let mut vm = VmInstance::new(l1_batch_params, system_env, storage_view.clone());
 
         while let Some(cmd) = self.commands.blocking_recv() {
             match cmd {
                 Command::ExecuteTx(tx, resp) => {
-                    let result = self.execute_tx(&tx, &mut *vm);
+                    let result = self.execute_tx(&tx, &mut vm);
                     if resp.send(result).is_err() {
                         break;
                     }
                 }
                 Command::RollbackLastTx(resp) => {
-                    self.rollback_last_tx(&mut *vm);
+                    self.rollback_last_tx(&mut vm);
                     if resp.send(()).is_err() {
                         break;
                     }
                 }
                 Command::StartNextL2Block(l2_block_env, resp) => {
-                    vm.start_new_l2_block(l2_block_env);
+                    self.start_next_l2_block(l2_block_env, &mut vm);
                     if resp.send(()).is_err() {
                         break;
                     }
                 }
                 Command::FinishBatch(resp) => {
-                    let vm_block_result = self.finish_batch(&mut *vm);
+                    let vm_block_result = self.finish_batch(&mut vm);
                     if resp.send(vm_block_result).is_err() {
                         break;
                     }
@@ -150,7 +148,7 @@ impl CommandReceiver {
                     return;
                 }
                 Command::FinishBatchWithCache(resp) => {
-                    let vm_block_result = self.finish_batch(&mut *vm);
+                    let vm_block_result = self.finish_batch(&mut vm);
                     let cache = (*storage_view).borrow().cache();
                     if resp.send((vm_block_result, cache)).is_err() {
                         break;
@@ -164,28 +162,66 @@ impl CommandReceiver {
         tracing::info!("State keeper exited with an unfinished L1 batch");
     }
 
-    fn execute_tx(&self, tx: &Transaction, vm: &mut dyn BatchVm) -> TxExecutionResult {
+    fn execute_tx<S: ReadStorage>(
+        &self,
+        tx: &Transaction,
+        vm: &mut VmInstance<S, HistoryEnabled>,
+    ) -> TxExecutionResult {
+        // Executing a next transaction means that a previous transaction was either rolled back (in which case its snapshot
+        // was already removed), or that we build on top of it (in which case, it can be removed now).
+        vm.pop_snapshot_no_rollback();
+        // Save pre-execution VM snapshot.
+        vm.make_snapshot();
+
+        // Execute the transaction.
         let latency = KEEPER_METRICS.tx_execution_time[&TxExecutionStage::Execution].start();
-        let vm_output = if self.optional_bytecode_compression {
-            vm.execute_transaction_with_optional_compression(tx.clone(), self.trace_calls)
-        } else {
-            vm.execute_transaction(tx.clone(), self.trace_calls)
-        };
+        let (tx_result, compressed_bytecodes, call_tracer_result) =
+            if self.optional_bytecode_compression {
+                self.execute_tx_in_vm_with_optional_compression(tx, vm)
+            } else {
+                self.execute_tx_in_vm(tx, vm)
+            };
         latency.observe();
+
         APP_METRICS.processed_txs[&TxStage::StateKeeper].inc();
         APP_METRICS.processed_l1_txs[&TxStage::StateKeeper].inc_by(tx.is_l1().into());
 
-        vm_output
+        if let ExecutionResult::Halt { reason } = tx_result.result {
+            return match reason {
+                Halt::BootloaderOutOfGas => TxExecutionResult::BootloaderOutOfGasForTx,
+                _ => TxExecutionResult::RejectedByVm { reason },
+            };
+        }
+
+        let tx_metrics = ExecutionMetricsForCriteria::new(Some(tx), &tx_result);
+        let gas_remaining = vm.gas_remaining();
+
+        TxExecutionResult::Success {
+            tx_result: Box::new(tx_result),
+            tx_metrics: Box::new(tx_metrics),
+            compressed_bytecodes,
+            call_tracer_result,
+            gas_remaining,
+        }
     }
 
-    fn rollback_last_tx(&self, vm: &mut dyn BatchVm) {
+    fn rollback_last_tx<S: ReadStorage>(&self, vm: &mut VmInstance<S, HistoryEnabled>) {
         let latency = KEEPER_METRICS.tx_execution_time[&TxExecutionStage::TxRollback].start();
-        vm.rollback_last_transaction();
+        vm.rollback_to_the_latest_snapshot();
         latency.observe();
     }
 
-    fn finish_batch(&self, vm: &mut dyn BatchVm) -> FinishedL1Batch {
+    fn start_next_l2_block<S: ReadStorage>(
+        &self,
+        l2_block_env: L2BlockEnv,
+        vm: &mut VmInstance<S, HistoryEnabled>,
+    ) {
+        vm.start_new_l2_block(l2_block_env);
+    }
+
+    fn finish_batch<S: ReadStorage>(
+        &self,
+        vm: &mut VmInstance<S, HistoryEnabled>,
+    ) -> FinishedL1Batch {
         // The vm execution was paused right after the last transaction was executed.
         // There is some post-processing work that the VM needs to do before the block is fully processed.
         let result = vm.finish_batch();
@@ -199,4 +235,108 @@ impl CommandReceiver {
         BATCH_TIP_METRICS.observe(&result.block_tip_execution_result);
         result
     }
+
+    /// Attempts to execute transaction with or without bytecode compression.
+    /// If compression fails, the transaction will be re-executed without compression.
+    fn execute_tx_in_vm_with_optional_compression<S: ReadStorage>(
+        &self,
+        tx: &Transaction,
+        vm: &mut VmInstance<S, HistoryEnabled>,
+    ) -> (
+        VmExecutionResultAndLogs,
+        Vec<CompressedBytecodeInfo>,
+        Vec<Call>,
+    ) {
+        // Note, that the space where we can put the calldata for compressing transactions
+        // is limited and the transactions do not pay for taking it.
+        // In order to not let the accounts spam the space of compressed bytecodes with bytecodes
+        // that will not be published (e.g. due to out of gas), we use the following scheme:
+        // We try to execute the transaction with compressed bytecodes.
+        // If it fails and the compressed bytecodes have not been published,
+        // it means that there is no sense in polluting the space of compressed bytecodes,
+        // and so we re-execute the transaction, but without compression.
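+        // The `CallTracer` used below writes the collected call traces into a shared `OnceCell`;
+        // the cell can be filled only once, which is why a fresh cell and tracer are created
+        // again for the no-compression fallback at the end of this method.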
+
+        let call_tracer_result = Arc::new(OnceCell::default());
+        let tracer = if self.save_call_traces {
+            vec![CallTracer::new(call_tracer_result.clone()).into_tracer_pointer()]
+        } else {
+            vec![]
+        };
+
+        if let (Ok(()), result) =
+            vm.inspect_transaction_with_bytecode_compression(tracer.into(), tx.clone(), true)
+        {
+            let compressed_bytecodes = vm.get_last_tx_compressed_bytecodes();
+
+            let trace = Arc::try_unwrap(call_tracer_result)
+                .unwrap()
+                .take()
+                .unwrap_or_default();
+            return (result, compressed_bytecodes, trace);
+        }
+
+        // Roll back to the snapshot just before the transaction execution taken in `Self::execute_tx()`
+        // and create a snapshot at the same VM state again.
+        vm.rollback_to_the_latest_snapshot();
+        vm.make_snapshot();
+
+        let call_tracer_result = Arc::new(OnceCell::default());
+        let tracer = if self.save_call_traces {
+            vec![CallTracer::new(call_tracer_result.clone()).into_tracer_pointer()]
+        } else {
+            vec![]
+        };
+
+        let result =
+            vm.inspect_transaction_with_bytecode_compression(tracer.into(), tx.clone(), false);
+        result
+            .0
+            .expect("Compression can't fail if we don't apply it");
+        let compressed_bytecodes = vm.get_last_tx_compressed_bytecodes();
+
+        // TODO implement tracer manager which will be responsible
+        // for collecting result from all tracers and save it to the database
+        let trace = Arc::try_unwrap(call_tracer_result)
+            .unwrap()
+            .take()
+            .unwrap_or_default();
+        (result.1, compressed_bytecodes, trace)
+    }
+
+    /// Attempts to execute transaction with mandatory bytecode compression.
+    /// If bytecode compression fails, the transaction will be rejected.
+    fn execute_tx_in_vm<S: ReadStorage>(
+        &self,
+        tx: &Transaction,
+        vm: &mut VmInstance<S, HistoryEnabled>,
+    ) -> (
+        VmExecutionResultAndLogs,
+        Vec<CompressedBytecodeInfo>,
+        Vec<Call>,
+    ) {
+        let call_tracer_result = Arc::new(OnceCell::default());
+        let tracer = if self.save_call_traces {
+            vec![CallTracer::new(call_tracer_result.clone()).into_tracer_pointer()]
+        } else {
+            vec![]
+        };
+
+        let (published_bytecodes, mut result) =
+            vm.inspect_transaction_with_bytecode_compression(tracer.into(), tx.clone(), true);
+        if published_bytecodes.is_ok() {
+            let compressed_bytecodes = vm.get_last_tx_compressed_bytecodes();
+
+            let trace = Arc::try_unwrap(call_tracer_result)
+                .unwrap()
+                .take()
+                .unwrap_or_default();
+            (result, compressed_bytecodes, trace)
+        } else {
+            // Transaction failed to publish bytecodes, we reject it so initiator doesn't pay fee.
+            result.result = ExecutionResult::Halt {
+                reason: Halt::FailedToPublishCompressedBytecodes,
+            };
+            (result, Default::default(), Default::default())
+        }
+    }
 }
diff --git a/core/node/state_keeper/src/batch_executor/mod.rs b/core/node/state_keeper/src/batch_executor/mod.rs
index 80b626ffda38..4577ab1b360a 100644
--- a/core/node/state_keeper/src/batch_executor/mod.rs
+++ b/core/node/state_keeper/src/batch_executor/mod.rs
@@ -1,28 +1,26 @@
-use std::{error::Error as StdError, sync::Arc};
+use std::{error::Error as StdError, fmt, sync::Arc};
 
 use anyhow::Context as _;
+use async_trait::async_trait;
 use tokio::{
-    sync::{mpsc, oneshot},
+    sync::{mpsc, oneshot, watch},
     task::JoinHandle,
 };
-use zksync_multivm::interface::{FinishedL1Batch, Halt, L2BlockEnv, VmExecutionResultAndLogs};
-use zksync_state::StorageViewCache;
+use zksync_multivm::interface::{
+    FinishedL1Batch, Halt, L1BatchEnv, L2BlockEnv, SystemEnv, VmExecutionResultAndLogs,
+};
+use zksync_state::{ReadStorageFactory, StorageViewCache};
 use zksync_types::{vm_trace::Call, Transaction};
 use zksync_utils::bytecode::CompressedBytecodeInfo;
 
-pub use self::{
-    main_executor::{BatchExecutor, DynVmFactory},
-    traits::{BatchVm, BatchVmFactory, TraceCalls},
-};
 use crate::{
     metrics::{ExecutorCommand, EXECUTOR_METRICS},
     types::ExecutionMetricsForCriteria,
 };
 
-mod main_executor;
+pub mod main_executor;
 #[cfg(test)]
 mod tests;
-mod traits;
 
 /// Representation of a transaction executed in the virtual machine.
 #[derive(Debug, Clone)]
@@ -54,6 +52,20 @@ impl TxExecutionResult {
     }
 }
 
+/// An abstraction that allows us to create different kinds of batch executors.
+/// The only requirement is to return a [`BatchExecutorHandle`], which does its work
+/// by communicating with the externally initialized thread.
+#[async_trait]
+pub trait BatchExecutor: 'static + Send + Sync + fmt::Debug {
+    async fn init_batch(
+        &mut self,
+        storage_factory: Arc<dyn ReadStorageFactory>,
+        l1_batch_params: L1BatchEnv,
+        system_env: SystemEnv,
+        stop_receiver: &watch::Receiver<bool>,
+    ) -> Option<BatchExecutorHandle>;
+}
+
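To make the new contract concrete, here is a minimal sketch of an alternative implementation, modeled on the `MockBatchExecutor` that appears later in this diff. `NoopBatchExecutor` is illustrative only, and since `BatchExecutorHandle::from_raw` is crate-internal, such an implementation has to live inside `zksync_state_keeper`:

```rust
use std::sync::Arc;

use async_trait::async_trait;
use tokio::sync::{mpsc, watch};
use zksync_multivm::interface::{L1BatchEnv, SystemEnv};
use zksync_state::ReadStorageFactory;

use crate::batch_executor::{BatchExecutor, BatchExecutorHandle, Command};

#[derive(Debug)]
struct NoopBatchExecutor;

#[async_trait]
impl BatchExecutor for NoopBatchExecutor {
    async fn init_batch(
        &mut self,
        _storage_factory: Arc<dyn ReadStorageFactory>,
        _l1_batch_params: L1BatchEnv,
        _system_env: SystemEnv,
        _stop_receiver: &watch::Receiver<bool>,
    ) -> Option<BatchExecutorHandle> {
        let (commands_sender, mut commands) = mpsc::channel(1);
        // The handle does all its work by exchanging `Command`s with this task.
        let handle = tokio::task::spawn(async move {
            while let Some(_cmd) = commands.recv().await {
                // A real implementation must answer every command (cf. `MockBatchExecutor`).
            }
            anyhow::Ok(())
        });
        Some(BatchExecutorHandle::from_raw(handle, commands_sender))
    }
}
```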
+ #[doc(hidden)] pub(super) fn from_raw( handle: JoinHandle>, commands: mpsc::Sender, diff --git a/core/node/state_keeper/src/batch_executor/tests/mod.rs b/core/node/state_keeper/src/batch_executor/tests/mod.rs index 013b5a30ad3d..4b36965895fd 100644 --- a/core/node/state_keeper/src/batch_executor/tests/mod.rs +++ b/core/node/state_keeper/src/batch_executor/tests/mod.rs @@ -1,7 +1,6 @@ use assert_matches::assert_matches; use test_casing::{test_casing, Product}; use zksync_dal::{ConnectionPool, Core}; -use zksync_multivm::interface::ExecutionResult; use zksync_test_account::Account; use zksync_types::{get_nonce_key, utils::storage_key_for_eth_balance, PriorityOpId}; @@ -505,45 +504,3 @@ async fn catchup_rocksdb_cache() { let res = executor.execute_tx(tx).await.unwrap(); assert_rejected(&res); } - -#[tokio::test] -async fn transaction_with_large_packable_bytecode() { - const BYTECODE_LEN: usize = 350_016 + 32; // +32 to ensure validity of the bytecode - - let connection_pool = ConnectionPool::::constrained_test_pool(1).await; - let mut alice = Account::random(); - - let mut tester = Tester::new(connection_pool); - - tester.genesis().await; - tester.fund(&[alice.address()]).await; - let mut executor = tester - .create_batch_executor(StorageType::AsyncRocksdbCache) - .await; - - // Produces bytecode consisting of 2 distinct 4-byte chunks (i.e., well compressable). - let mut bytecode = Vec::with_capacity(BYTECODE_LEN); - while bytecode.len() < BYTECODE_LEN { - bytecode.extend_from_slice(&[(bytecode.len() % 8) as u8; 4]); - } - - // With bytecode compression, the transaction uses ~13M gas; w/o it, it would use ~28M. - let tx = alice.execute_with_factory_deps(vec![bytecode], 20_000_000); - let res = executor.execute_tx(tx).await.unwrap(); - if let TxExecutionResult::Success { - tx_result, - compressed_bytecodes, - .. - } = &res - { - assert_matches!(tx_result.result, ExecutionResult::Success { .. }); - assert_eq!(compressed_bytecodes.len(), 1); - assert_eq!(compressed_bytecodes[0].original.len(), BYTECODE_LEN); - let compressed_len = compressed_bytecodes[0].compressed.len(); - assert!(compressed_len < BYTECODE_LEN); - } else { - panic!("unexpected tx execution result: {res:#?}"); - } - - executor.finish_batch().await.unwrap(); -} diff --git a/core/node/state_keeper/src/batch_executor/tests/tester.rs b/core/node/state_keeper/src/batch_executor/tests/tester.rs index 878ac3cbc76a..579f3bee4819 100644 --- a/core/node/state_keeper/src/batch_executor/tests/tester.rs +++ b/core/node/state_keeper/src/batch_executor/tests/tester.rs @@ -38,7 +38,7 @@ use crate::{ testonly, testonly::BASE_SYSTEM_CONTRACTS, tests::{default_l1_batch_env, default_system_env}, - AsyncRocksdbCache, BatchExecutor, + AsyncRocksdbCache, BatchExecutor, MainBatchExecutor, }; /// Representation of configuration parameters used by the state keeper. @@ -147,7 +147,7 @@ impl Tester { l1_batch_env: L1BatchEnv, system_env: SystemEnv, ) -> BatchExecutorHandle { - let mut batch_executor = BatchExecutor::new(self.config.save_call_traces, false); + let mut batch_executor = MainBatchExecutor::new(self.config.save_call_traces, false); let (_stop_sender, stop_receiver) = watch::channel(false); batch_executor .init_batch(storage_factory, l1_batch_env, system_env, &stop_receiver) @@ -326,12 +326,6 @@ pub trait AccountLoadNextExecutable { /// Returns a valid `execute` transaction. /// Automatically increments nonce of the account. 
fn execute_with_gas_limit(&mut self, gas_limit: u32) -> Transaction; - /// Returns an `execute` transaction with the specified factory deps and gas limit. - fn execute_with_factory_deps( - &mut self, - factory_deps: Vec>, - gas_limit: u32, - ) -> Transaction; /// Returns a transaction to the loadnext contract with custom gas limit and expected burned gas amount. /// Increments the account nonce. fn loadnext_custom_gas_call( @@ -357,10 +351,14 @@ impl AccountLoadNextExecutable for Account { testonly::l1_transaction(self, serial_id) } + /// Returns a valid `execute` transaction. + /// Automatically increments nonce of the account. fn execute(&mut self) -> Transaction { self.execute_with_gas_limit(1_000_000) } + /// Returns a transaction to the loadnext contract with custom amount of write requests. + /// Increments the account nonce. fn loadnext_custom_writes_call( &mut self, address: Address, @@ -395,26 +393,14 @@ impl AccountLoadNextExecutable for Account { ) } + /// Returns a valid `execute` transaction. + /// Automatically increments nonce of the account. fn execute_with_gas_limit(&mut self, gas_limit: u32) -> Transaction { testonly::l2_transaction(self, gas_limit) } - fn execute_with_factory_deps( - &mut self, - factory_deps: Vec>, - gas_limit: u32, - ) -> Transaction { - self.get_l2_tx_for_execute( - Execute { - contract_address: self.address, - calldata: vec![], - value: 0.into(), - factory_deps, - }, - Some(testonly::fee(gas_limit)), - ) - } - + /// Returns a transaction to the loadnext contract with custom gas limit and expected burned gas amount. + /// Increments the account nonce. fn loadnext_custom_gas_call( &mut self, address: Address, diff --git a/core/node/state_keeper/src/batch_executor/traits.rs b/core/node/state_keeper/src/batch_executor/traits.rs deleted file mode 100644 index bf85a9905760..000000000000 --- a/core/node/state_keeper/src/batch_executor/traits.rs +++ /dev/null @@ -1,246 +0,0 @@ -//! Helper traits for the main executor. - -use std::{fmt, sync::Arc}; - -use once_cell::sync::OnceCell; -use zksync_multivm::{ - interface::{ - ExecutionResult, FinishedL1Batch, Halt, L1BatchEnv, L2BlockEnv, SystemEnv, - VmExecutionResultAndLogs, VmFactory, VmInterface, VmInterfaceHistoryEnabled, - }, - tracers::CallTracer, - vm_latest::HistoryEnabled, - MultiVMTracer, VmInstance, -}; -use zksync_state::{ReadStorage, StoragePtr, StorageView}; -use zksync_types::{vm_trace::Call, Transaction}; -use zksync_utils::bytecode::CompressedBytecodeInfo; - -use crate::{ExecutionMetricsForCriteria, TxExecutionResult}; - -/// Tracing configuration for transaction-related [`BatchVm`] methods. -#[derive(Debug, Clone, Copy)] -pub enum TraceCalls { - /// Trace calls during transaction execution. - Trace, - /// Do not trace calls during transaction execution. - Skip, -} - -/// Object-safe version of `VmInterface` used for batch execution. -/// -/// # Invariants -/// -/// - Transaction inspection methods must create a VM snapshot before the execution, and must not roll back these snapshots. -/// - All rollbacks happen via `rollback_last_transaction`. -pub trait BatchVm { - /// Attempts to execute transaction with mandatory bytecode compression. - /// If bytecode compression fails, the transaction will be rejected. - fn execute_transaction( - &mut self, - tx: Transaction, - trace_calls: TraceCalls, - ) -> TxExecutionResult; - - /// Attempts to execute transaction with or without bytecode compression. - /// If compression fails, the transaction will be re-executed without compression. 
- fn execute_transaction_with_optional_compression( - &mut self, - tx: Transaction, - trace_calls: TraceCalls, - ) -> TxExecutionResult; - - /// Rolls back the VM state to before the last transaction was executed. - fn rollback_last_transaction(&mut self); - - /// Starts a new L2 block with the provided parameters. - fn start_new_l2_block(&mut self, l2_block: L2BlockEnv); - - /// Finishes executing an L1 batch. This is guaranteed to be called exactly once at the end of the VM lifecycle. - fn finish_batch(&mut self) -> FinishedL1Batch; -} - -#[derive(Debug)] -struct VmTransactionOutput { - tx_result: VmExecutionResultAndLogs, - compressed_bytecodes: Vec, - /// Empty if call tracing was not requested. - call_tracer_result: Vec, - gas_remaining: u32, -} - -impl VmTransactionOutput { - fn from_err(err: VmTransactionError) -> Self { - let mut tx_result = *err.tx_result; - tx_result.result = ExecutionResult::Halt { - reason: Halt::FailedToPublishCompressedBytecodes, - }; - Self { - tx_result, - compressed_bytecodes: vec![], - call_tracer_result: vec![], - gas_remaining: 0, // is not used; see below - } - } - - fn into_tx_result(self, tx: &Transaction) -> TxExecutionResult { - if let ExecutionResult::Halt { reason } = self.tx_result.result { - return match reason { - Halt::BootloaderOutOfGas => TxExecutionResult::BootloaderOutOfGasForTx, - _ => TxExecutionResult::RejectedByVm { reason }, - }; - } - - let tx_metrics = ExecutionMetricsForCriteria::new(Some(tx), &self.tx_result); - - TxExecutionResult::Success { - tx_result: Box::new(self.tx_result), - tx_metrics: Box::new(tx_metrics), - compressed_bytecodes: self.compressed_bytecodes, - call_tracer_result: self.call_tracer_result, - gas_remaining: self.gas_remaining, - } - } -} - -#[derive(Debug)] -struct VmTransactionError { - tx_result: Box, -} - -fn inspect_transaction( - vm: &mut VmInstance, - tx: Transaction, - trace_calls: TraceCalls, - compress_bytecodes: bool, -) -> Result { - let call_tracer_result = Arc::new(OnceCell::default()); - let tracer = match trace_calls { - TraceCalls::Trace => { - vec![CallTracer::new(call_tracer_result.clone()).into_tracer_pointer()] - } - TraceCalls::Skip => vec![], - }; - - let (compression_result, tx_result) = - vm.inspect_transaction_with_bytecode_compression(tracer.into(), tx, compress_bytecodes); - if compression_result.is_err() { - return Err(VmTransactionError { - tx_result: Box::new(tx_result), - }); - } - - let call_tracer_result = Arc::try_unwrap(call_tracer_result) - .unwrap() - .take() - .unwrap_or_default(); - Ok(VmTransactionOutput { - gas_remaining: if matches!(tx_result.result, ExecutionResult::Halt { .. }) { - 0 // FIXME: will not be used; currently, the new VM diverges in this case - } else { - vm.gas_remaining() - }, - tx_result, - compressed_bytecodes: vm.get_last_tx_compressed_bytecodes(), - call_tracer_result, - }) -} - -impl BatchVm for VmInstance { - fn execute_transaction( - &mut self, - tx: Transaction, - trace_calls: TraceCalls, - ) -> TxExecutionResult { - // Executing a next transaction means that a previous transaction was either rolled back (in which case its snapshot - // was already removed), or that we build on top of it (in which case, it can be removed now). - self.pop_snapshot_no_rollback(); - // Save pre-execution VM snapshot. 
- self.make_snapshot(); - - inspect_transaction(self, tx.clone(), trace_calls, true) - .unwrap_or_else(VmTransactionOutput::from_err) - .into_tx_result(&tx) - } - - fn execute_transaction_with_optional_compression( - &mut self, - tx: Transaction, - trace_calls: TraceCalls, - ) -> TxExecutionResult { - // Executing a next transaction means that a previous transaction was either rolled back (in which case its snapshot - // was already removed), or that we build on top of it (in which case, it can be removed now). - self.pop_snapshot_no_rollback(); - // Save pre-execution VM snapshot. - self.make_snapshot(); - - // Note, that the space where we can put the calldata for compressing transactions - // is limited and the transactions do not pay for taking it. - // In order to not let the accounts spam the space of compressed bytecodes with bytecodes - // that will not be published (e.g. due to out of gas), we use the following scheme: - // We try to execute the transaction with compressed bytecodes. - // If it fails and the compressed bytecodes have not been published, - // it means that there is no sense in polluting the space of compressed bytecodes, - // and so we re-execute the transaction, but without compression. - - if let Ok(output) = inspect_transaction(self, tx.clone(), trace_calls, true) { - return output.into_tx_result(&tx); - } - - // Roll back to the snapshot just before the transaction execution taken in `Self::execute_tx()` - // and create a snapshot at the same VM state again. - self.rollback_to_the_latest_snapshot(); - self.make_snapshot(); - - inspect_transaction(self, tx.clone(), trace_calls, false) - .expect("Compression can't fail if we don't apply it") - .into_tx_result(&tx) - } - - fn rollback_last_transaction(&mut self) { - self.rollback_to_the_latest_snapshot(); - } - - fn start_new_l2_block(&mut self, l2_block: L2BlockEnv) { - VmInterface::start_new_l2_block(self, l2_block); - } - - fn finish_batch(&mut self) -> FinishedL1Batch { - VmInterface::finish_batch(self) - } -} - -/// VM factory used by the main batch executor. Encapsulates the storage type used by the executor -/// so that it's opaque from the implementor's perspective. -pub trait BatchVmFactory: fmt::Debug + Send + Sync { - fn create_vm<'a>( - &self, - l1_batch_params: L1BatchEnv, - system_env: SystemEnv, - storage: StoragePtr>, - ) -> Box - where - S: 'a; -} - -/// Default VM factory implementation used in the batch executor unless it's overridden. 
-#[derive(Debug)] -pub(super) struct DefaultBatchVmFactory; - -impl BatchVmFactory for DefaultBatchVmFactory { - fn create_vm<'a>( - &self, - l1_batch_params: L1BatchEnv, - system_env: SystemEnv, - storage: StoragePtr>, - ) -> Box - where - S: 'a, - { - Box::new(VmInstance::<_, HistoryEnabled>::new( - l1_batch_params, - system_env, - storage, - )) - } -} diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index 315c4ca7b35b..6c1718232a09 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -61,7 +61,7 @@ pub struct ZkSyncStateKeeper { stop_receiver: watch::Receiver, io: Box, output_handler: OutputHandler, - batch_executor_base: BatchExecutor, + batch_executor_base: Box, sealer: Arc, storage_factory: Arc, } @@ -70,7 +70,7 @@ impl ZkSyncStateKeeper { pub fn new( stop_receiver: watch::Receiver, sequencer: Box, - batch_executor_base: BatchExecutor, + batch_executor_base: Box, output_handler: OutputHandler, sealer: Arc, storage_factory: Arc, diff --git a/core/node/state_keeper/src/lib.rs b/core/node/state_keeper/src/lib.rs index e2ba887a6801..1c12f7825486 100644 --- a/core/node/state_keeper/src/lib.rs +++ b/core/node/state_keeper/src/lib.rs @@ -10,7 +10,9 @@ use zksync_node_fee_model::BatchFeeModelInputProvider; use zksync_types::L2ChainId; pub use self::{ - batch_executor::{BatchExecutor, BatchExecutorHandle, DynVmFactory, TxExecutionResult}, + batch_executor::{ + main_executor::MainBatchExecutor, BatchExecutor, BatchExecutorHandle, TxExecutionResult, + }, io::{ mempool::MempoolIO, L2BlockParams, L2BlockSealerTask, OutputHandler, StateKeeperIO, StateKeeperOutputHandler, StateKeeperPersistence, TreeWritesPersistence, @@ -50,7 +52,7 @@ pub async fn create_state_keeper( output_handler: OutputHandler, stop_receiver: watch::Receiver, ) -> ZkSyncStateKeeper { - let batch_executor_base = BatchExecutor::new(state_keeper_config.save_call_traces, false); + let batch_executor_base = MainBatchExecutor::new(state_keeper_config.save_call_traces, false); let io = MempoolIO::new( mempool, @@ -68,7 +70,7 @@ pub async fn create_state_keeper( ZkSyncStateKeeper::new( stop_receiver, Box::new(io), - batch_executor_base, + Box::new(batch_executor_base), output_handler, Arc::new(sealer), Arc::new(async_cache), diff --git a/core/node/state_keeper/src/testonly/mod.rs b/core/node/state_keeper/src/testonly/mod.rs index 216508504490..80514f51e8c3 100644 --- a/core/node/state_keeper/src/testonly/mod.rs +++ b/core/node/state_keeper/src/testonly/mod.rs @@ -1,16 +1,21 @@ -//! Test utilities that can be used for testing sequencer that may be useful outside of this crate. +//! Test utilities that can be used for testing sequencer that may +//! be useful outside of this crate. 
+use std::sync::Arc; + +use async_trait::async_trait; use once_cell::sync::Lazy; +use tokio::sync::{mpsc, watch}; use zksync_contracts::BaseSystemContracts; use zksync_dal::{ConnectionPool, Core, CoreDal as _}; use zksync_multivm::{ interface::{ - CurrentExecutionState, ExecutionResult, FinishedL1Batch, L1BatchEnv, L2BlockEnv, Refunds, - SystemEnv, VmExecutionResultAndLogs, VmExecutionStatistics, + CurrentExecutionState, ExecutionResult, FinishedL1Batch, L1BatchEnv, Refunds, SystemEnv, + VmExecutionResultAndLogs, VmExecutionStatistics, }, vm_latest::VmExecutionLogs, }; -use zksync_state::{ReadStorage, StoragePtr, StorageView}; +use zksync_state::{ReadStorageFactory, StorageViewCache}; use zksync_test_account::Account; use zksync_types::{ fee::Fee, utils::storage_key_for_standard_token_balance, AccountTreeId, Address, Execute, @@ -20,11 +25,11 @@ use zksync_types::{ use zksync_utils::u256_to_h256; use crate::{ - batch_executor::{BatchVm, BatchVmFactory, TraceCalls}, - ExecutionMetricsForCriteria, TxExecutionResult, + batch_executor::{BatchExecutor, BatchExecutorHandle, Command, TxExecutionResult}, + types::ExecutionMetricsForCriteria, }; -pub mod test_batch_vm; +pub mod test_batch_executor; pub(super) static BASE_SYSTEM_CONTRACTS: Lazy = Lazy::new(BaseSystemContracts::load_from_disk); @@ -71,51 +76,44 @@ pub(crate) fn successful_exec() -> TxExecutionResult { } } -/// `BatchVm` implementation which doesn't check anything at all. Accepts all transactions. -#[derive(Debug, Default)] -pub struct MockBatchVm; +pub(crate) fn storage_view_cache() -> StorageViewCache { + StorageViewCache::default() +} -impl BatchVm for MockBatchVm { - fn execute_transaction( - &mut self, - _tx: Transaction, - _trace_calls: TraceCalls, - ) -> TxExecutionResult { - successful_exec() - } +/// `BatchExecutor` which doesn't check anything at all. Accepts all transactions. +#[derive(Debug)] +pub struct MockBatchExecutor; - fn execute_transaction_with_optional_compression( +#[async_trait] +impl BatchExecutor for MockBatchExecutor { + async fn init_batch( &mut self, - tx: Transaction, - trace_calls: TraceCalls, - ) -> TxExecutionResult { - self.execute_transaction(tx, trace_calls) - } - - fn rollback_last_transaction(&mut self) { - panic!("unexpected rollback"); - } - - fn start_new_l2_block(&mut self, _l2_block: L2BlockEnv) { - // Do nothing - } - - fn finish_batch(&mut self) -> FinishedL1Batch { - default_vm_batch_result() - } -} - -impl BatchVmFactory for MockBatchVm { - fn create_vm<'a>( - &self, - _l1_batch_params: L1BatchEnv, + _storage_factory: Arc, + _l1batch_params: L1BatchEnv, _system_env: SystemEnv, - _storage: StoragePtr>, - ) -> Box - where - S: 'a, - { - Box::new(Self) + _stop_receiver: &watch::Receiver, + ) -> Option { + let (send, recv) = mpsc::channel(1); + let handle = tokio::task::spawn(async { + let mut recv = recv; + while let Some(cmd) = recv.recv().await { + match cmd { + Command::ExecuteTx(_, resp) => resp.send(successful_exec()).unwrap(), + Command::StartNextL2Block(_, resp) => resp.send(()).unwrap(), + Command::RollbackLastTx(_) => panic!("unexpected rollback"), + Command::FinishBatch(resp) => { + // Blanket result, it doesn't really matter. 
+ resp.send(default_vm_batch_result()).unwrap(); + break; + } + Command::FinishBatchWithCache(resp) => resp + .send((default_vm_batch_result(), storage_view_cache())) + .unwrap(), + } + } + anyhow::Ok(()) + }); + Some(BatchExecutorHandle::from_raw(handle, send)) } } diff --git a/core/node/state_keeper/src/testonly/test_batch_vm.rs b/core/node/state_keeper/src/testonly/test_batch_executor.rs similarity index 82% rename from core/node/state_keeper/src/testonly/test_batch_vm.rs rename to core/node/state_keeper/src/testonly/test_batch_executor.rs index f62f80c9e11e..1be84cfbf54e 100644 --- a/core/node/state_keeper/src/testonly/test_batch_vm.rs +++ b/core/node/state_keeper/src/testonly/test_batch_executor.rs @@ -1,7 +1,7 @@ // TODO(QIT-33): Some of the interfaces are public, and some are only used in tests within this crate. // This causes crate-local interfaces to spawn a warning without `cfg(test)`. The interfaces here must // be revisited and properly split into "truly public" (e.g. useful for other crates to test, say, different -// IO implementations) and "local-test-only" (e.g. used only in tests within this crate). +// IO or `BatchExecutor` implementations) and "local-test-only" (e.g. used only in tests within this crate). #![allow(dead_code)] use std::{ @@ -13,32 +13,29 @@ use std::{ }; use async_trait::async_trait; -use tokio::sync::{watch, watch::Receiver}; +use tokio::sync::{mpsc, watch, watch::Receiver}; use zksync_contracts::BaseSystemContracts; use zksync_multivm::{ - interface::{ - ExecutionResult, FinishedL1Batch, L1BatchEnv, L2BlockEnv, SystemEnv, - VmExecutionResultAndLogs, - }, + interface::{ExecutionResult, L1BatchEnv, SystemEnv, VmExecutionResultAndLogs}, vm_latest::constants::BATCH_COMPUTATIONAL_GAS_LIMIT, }; use zksync_node_test_utils::create_l2_transaction; -use zksync_state::{ - PgOrRocksdbStorage, ReadStorage, ReadStorageFactory, RocksdbStorage, StoragePtr, StorageView, -}; +use zksync_state::{PgOrRocksdbStorage, ReadStorageFactory}; use zksync_types::{ fee_model::BatchFeeInput, protocol_upgrade::ProtocolUpgradeTx, Address, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256, }; use crate::{ - batch_executor::{BatchVm, BatchVmFactory, TraceCalls, TxExecutionResult}, + batch_executor::{BatchExecutor, BatchExecutorHandle, Command, TxExecutionResult}, io::{IoCursor, L1BatchParams, L2BlockParams, PendingBatchData, StateKeeperIO}, seal_criteria::{IoSealCriteria, SequencerSealer, UnexecutableReason}, - testonly::{default_vm_batch_result, successful_exec, BASE_SYSTEM_CONTRACTS}, + testonly::{ + default_vm_batch_result, storage_view_cache, successful_exec, BASE_SYSTEM_CONTRACTS, + }, types::ExecutionMetricsForCriteria, updates::UpdatesManager, - BatchExecutor, OutputHandler, StateKeeperOutputHandler, ZkSyncStateKeeper, + OutputHandler, StateKeeperOutputHandler, ZkSyncStateKeeper, }; pub const FEE_ACCOUNT: Address = Address::repeat_byte(0x11); @@ -201,18 +198,16 @@ impl TestScenario { pub(crate) async fn run(self, sealer: SequencerSealer) { assert!(!self.actions.is_empty(), "Test scenario can't be empty"); - let batch_executor = BatchExecutor::new(false, false) - .with_vm_factory(Arc::new(TestBatchVmFactory::new(&self))); - + let batch_executor_base = TestBatchExecutorBuilder::new(&self); let (stop_sender, stop_receiver) = watch::channel(false); let (io, output_handler) = TestIO::new(stop_sender, self); let state_keeper = ZkSyncStateKeeper::new( stop_receiver, Box::new(io), - batch_executor, + Box::new(batch_executor_base), output_handler, 
Arc::new(sealer), - Arc::::default(), + Arc::new(MockReadStorageFactory), ); let sk_thread = tokio::spawn(state_keeper.run()); @@ -340,17 +335,17 @@ impl fmt::Debug for ScenarioItem { type ExpectedTransactions = VecDeque>>; #[derive(Debug, Default)] -pub struct TestBatchVmFactory { +pub struct TestBatchExecutorBuilder { /// Sequence of known transaction execution results per batch. /// We need to store txs for each batch separately, since the same transaction /// can be executed in several batches (e.g. after an `ExcludeAndSeal` rollback). /// When initializing each batch, we will `pop_front` known txs for the corresponding executor. - txs: Mutex, + txs: ExpectedTransactions, /// Set of transactions that would be rolled back at least once. rollback_set: HashSet, } -impl TestBatchVmFactory { +impl TestBatchExecutorBuilder { pub(crate) fn new(scenario: &TestScenario) -> Self { let mut txs = VecDeque::new(); let mut batch_txs = HashMap::new(); @@ -402,10 +397,7 @@ impl TestBatchVmFactory { // for the initialization of the "next-to-last" batch. txs.push_back(HashMap::default()); - Self { - txs: Mutex::new(txs), - rollback_set, - } + Self { txs, rollback_set } } /// Adds successful transactions to be executed in a single L1 batch. @@ -414,30 +406,37 @@ impl TestBatchVmFactory { .iter() .copied() .map(|tx_hash| (tx_hash, VecDeque::from([successful_exec()]))); - self.txs.lock().unwrap().push_back(txs.collect()); + self.txs.push_back(txs.collect()); } } -impl BatchVmFactory for TestBatchVmFactory { - fn create_vm<'a>( - &self, - _l1_batch_params: L1BatchEnv, +#[async_trait] +impl BatchExecutor for TestBatchExecutorBuilder { + async fn init_batch( + &mut self, + _storage_factory: Arc, + _l1batch_params: L1BatchEnv, _system_env: SystemEnv, - _storage: StoragePtr>, - ) -> Box - where - S: 'a, - { - Box::new(TestBatchVm { - txs: self.txs.lock().unwrap().pop_front().unwrap(), - rollback_set: self.rollback_set.clone(), - last_tx: H256::default(), // We don't expect rollbacks until the first tx is executed. - }) + _stop_receiver: &watch::Receiver, + ) -> Option { + let (commands_sender, commands_receiver) = mpsc::channel(1); + + let executor = TestBatchExecutor::new( + commands_receiver, + self.txs.pop_front().unwrap(), + self.rollback_set.clone(), + ); + let handle = tokio::task::spawn_blocking(move || { + executor.run(); + Ok(()) + }); + Some(BatchExecutorHandle::from_raw(handle, commands_sender)) } } #[derive(Debug)] -struct TestBatchVm { +pub(super) struct TestBatchExecutor { + commands: mpsc::Receiver, /// Mapping tx -> response. /// The same transaction can be executed several times, so we use a sequence of responses and consume them by one. 
txs: HashMap>, @@ -447,53 +446,66 @@ struct TestBatchVm { last_tx: H256, } -impl BatchVm for TestBatchVm { - fn execute_transaction( - &mut self, - tx: Transaction, - _trace_calls: TraceCalls, - ) -> TxExecutionResult { - let result = self - .txs - .get_mut(&tx.hash()) - .unwrap() - .pop_front() - .unwrap_or_else(|| { - panic!("Received a request to execute an unknown transaction: {tx:?}") - }); - self.last_tx = tx.hash(); - result - } - - fn execute_transaction_with_optional_compression( - &mut self, - tx: Transaction, - trace_calls: TraceCalls, - ) -> TxExecutionResult { - self.execute_transaction(tx, trace_calls) - } - - fn rollback_last_transaction(&mut self) { - // This is an additional safety check: IO would check that every rollback is included in the - // test scenario, but here we want to additionally check that each such request goes - // to the batch executor as well. - if !self.rollback_set.contains(&self.last_tx) { - // Request to rollback an unexpected tx. - panic!( - "Received a request to rollback an unexpected tx. Last executed tx: {:?}", - self.last_tx - ) +impl TestBatchExecutor { + pub(super) fn new( + commands: mpsc::Receiver, + txs: HashMap>, + rollback_set: HashSet, + ) -> Self { + Self { + commands, + txs, + rollback_set, + last_tx: H256::default(), // We don't expect rollbacks until the first tx is executed. } - // It's OK to not update `last_executed_tx`, since state keeper never should rollback more than 1 - // tx in a row, and it's going to cause a panic anyway. } - fn start_new_l2_block(&mut self, _l2_block: L2BlockEnv) { - // Do nothing - } - - fn finish_batch(&mut self) -> FinishedL1Batch { - default_vm_batch_result() + pub(super) fn run(mut self) { + while let Some(cmd) = self.commands.blocking_recv() { + match cmd { + Command::ExecuteTx(tx, resp) => { + let result = self + .txs + .get_mut(&tx.hash()) + .unwrap() + .pop_front() + .unwrap_or_else(|| { + panic!( + "Received a request to execute an unknown transaction: {:?}", + tx + ) + }); + resp.send(result).unwrap(); + self.last_tx = tx.hash(); + } + Command::StartNextL2Block(_, resp) => { + resp.send(()).unwrap(); + } + Command::RollbackLastTx(resp) => { + // This is an additional safety check: IO would check that every rollback is included in the + // test scenario, but here we want to additionally check that each such request goes to the + // the batch executor as well. + if !self.rollback_set.contains(&self.last_tx) { + // Request to rollback an unexpected tx. + panic!( + "Received a request to rollback an unexpected tx. Last executed tx: {:?}", + self.last_tx + ) + } + resp.send(()).unwrap(); + // It's OK to not update `last_executed_tx`, since state keeper never should rollback more than 1 + // tx in a row, and it's going to cause a panic anyway. + } + Command::FinishBatch(resp) => { + // Blanket result, it doesn't really matter. 
+ resp.send(default_vm_batch_result()).unwrap(); + return; + } + Command::FinishBatchWithCache(resp) => resp + .send((default_vm_batch_result(), storage_view_cache())) + .unwrap(), + } + } } } @@ -522,7 +534,7 @@ impl StateKeeperOutputHandler for TestPersistence { async fn handle_l2_block(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()> { let action = self.pop_next_item("seal_l2_block"); let ScenarioItem::L2BlockSeal(_, check_fn) = action else { - anyhow::bail!("Unexpected action: {action:?}"); + anyhow::bail!("Unexpected action: {:?}", action); }; if let Some(check_fn) = check_fn { check_fn(updates_manager); @@ -793,19 +805,47 @@ impl StateKeeperIO for TestIO { } } +/// `BatchExecutor` which doesn't check anything at all. Accepts all transactions. +// FIXME: move to `utils`? #[derive(Debug)] -pub struct MockReadStorageFactory { - rocksdb_dir: tempfile::TempDir, -} +pub(crate) struct MockBatchExecutor; -impl Default for MockReadStorageFactory { - fn default() -> Self { - Self { - rocksdb_dir: tempfile::TempDir::new().expect("failed creating temporary RocksDB dir"), - } +#[async_trait] +impl BatchExecutor for MockBatchExecutor { + async fn init_batch( + &mut self, + _storage_factory: Arc, + _l1batch_params: L1BatchEnv, + _system_env: SystemEnv, + _stop_receiver: &watch::Receiver, + ) -> Option { + let (send, recv) = mpsc::channel(1); + let handle = tokio::task::spawn(async { + let mut recv = recv; + while let Some(cmd) = recv.recv().await { + match cmd { + Command::ExecuteTx(_, resp) => resp.send(successful_exec()).unwrap(), + Command::StartNextL2Block(_, resp) => resp.send(()).unwrap(), + Command::RollbackLastTx(_) => panic!("unexpected rollback"), + Command::FinishBatch(resp) => { + // Blanket result, it doesn't really matter. + resp.send(default_vm_batch_result()).unwrap(); + break; + } + Command::FinishBatchWithCache(resp) => resp + .send((default_vm_batch_result(), storage_view_cache())) + .unwrap(), + } + } + anyhow::Ok(()) + }); + Some(BatchExecutorHandle::from_raw(handle, send)) } } +#[derive(Debug)] +pub struct MockReadStorageFactory; + #[async_trait] impl ReadStorageFactory for MockReadStorageFactory { async fn access_storage( @@ -813,10 +853,7 @@ impl ReadStorageFactory for MockReadStorageFactory { _stop_receiver: &Receiver, _l1_batch_number: L1BatchNumber, ) -> anyhow::Result>> { - let storage = RocksdbStorage::builder(self.rocksdb_dir.path()) - .await - .expect("failed creating RocksdbStorage") - .build_unchecked(); - Ok(Some(storage.into())) + // Presume that the storage is never accessed in mocked environment + unimplemented!() } } diff --git a/core/node/state_keeper/src/tests/mod.rs b/core/node/state_keeper/src/tests/mod.rs index 634c68fb5fdf..8bfc53c8f7b1 100644 --- a/core/node/state_keeper/src/tests/mod.rs +++ b/core/node/state_keeper/src/tests/mod.rs @@ -37,16 +37,16 @@ use crate::{ }, testonly::{ successful_exec, - test_batch_vm::{ + test_batch_executor::{ random_tx, random_upgrade_tx, rejected_exec, successful_exec_with_metrics, - MockReadStorageFactory, TestBatchVmFactory, TestIO, TestScenario, FEE_ACCOUNT, + MockReadStorageFactory, TestBatchExecutorBuilder, TestIO, TestScenario, FEE_ACCOUNT, }, BASE_SYSTEM_CONTRACTS, }, types::ExecutionMetricsForCriteria, updates::UpdatesManager, utils::l1_batch_base_cost, - BatchExecutor, ZkSyncStateKeeper, + ZkSyncStateKeeper, }; /// Creates a mock `PendingBatchData` object containing the provided sequence of L2 blocks. 
@@ -425,8 +425,7 @@ async fn pending_batch_is_applied() {
 async fn load_upgrade_tx() {
     let sealer = SequencerSealer::default();
     let scenario = TestScenario::new();
-    let batch_executor = BatchExecutor::new(false, false)
-        .with_vm_factory(Arc::new(TestBatchVmFactory::new(&scenario)));
+    let batch_executor_base = TestBatchExecutorBuilder::new(&scenario);
 
     let (stop_sender, stop_receiver) = watch::channel(false);
     let (mut io, output_handler) = TestIO::new(stop_sender, scenario);
@@ -436,10 +435,10 @@ async fn load_upgrade_tx() {
     let mut sk = ZkSyncStateKeeper::new(
         stop_receiver,
         Box::new(io),
-        batch_executor,
+        Box::new(batch_executor_base),
         output_handler,
         Arc::new(sealer),
-        Arc::<MockReadStorageFactory>::default(),
+        Arc::new(MockReadStorageFactory),
     );
 
     // Since the version hasn't changed, and we are not using shared bridge, we should not load any
diff --git a/core/node/vm_runner/src/impls/bwip.rs b/core/node/vm_runner/src/impls/bwip.rs
index ed1c21098ecc..7ab18397353d 100644
--- a/core/node/vm_runner/src/impls/bwip.rs
+++ b/core/node/vm_runner/src/impls/bwip.rs
@@ -6,7 +6,7 @@ use tokio::sync::watch;
 use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
 use zksync_object_store::ObjectStore;
 use zksync_prover_interface::inputs::VMRunWitnessInputData;
-use zksync_state_keeper::{BatchExecutor, StateKeeperOutputHandler, UpdatesManager};
+use zksync_state_keeper::{MainBatchExecutor, StateKeeperOutputHandler, UpdatesManager};
 use zksync_types::{
     block::StorageOracleInfo, witness_block_state::WitnessStorageState, L1BatchNumber, L2ChainId,
     H256,
@@ -47,13 +47,13 @@ impl BasicWitnessInputProducer {
         };
         let (output_handler_factory, output_handler_factory_task) =
             ConcurrentOutputHandlerFactory::new(pool.clone(), io.clone(), output_handler_factory);
-        let batch_processor = BatchExecutor::new(false, false);
+        let batch_processor = MainBatchExecutor::new(false, false);
         let vm_runner = VmRunner::new(
             pool,
             Box::new(io),
             Arc::new(loader),
             Box::new(output_handler_factory),
-            batch_processor,
+            Box::new(batch_processor),
         );
         Ok((
             Self { vm_runner },
diff --git a/core/node/vm_runner/src/impls/protective_reads.rs b/core/node/vm_runner/src/impls/protective_reads.rs
index bdf4a14bc0e4..3be37b77d114 100644
--- a/core/node/vm_runner/src/impls/protective_reads.rs
+++ b/core/node/vm_runner/src/impls/protective_reads.rs
@@ -4,7 +4,7 @@ use anyhow::Context;
 use async_trait::async_trait;
 use tokio::sync::watch;
 use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
-use zksync_state_keeper::{BatchExecutor, StateKeeperOutputHandler, UpdatesManager};
+use zksync_state_keeper::{MainBatchExecutor, StateKeeperOutputHandler, UpdatesManager};
 use zksync_types::{L1BatchNumber, L2ChainId, StorageLog};
 
 use crate::{
@@ -37,13 +37,13 @@ impl ProtectiveReadsWriter {
         let output_handler_factory = ProtectiveReadsOutputHandlerFactory { pool: pool.clone() };
         let (output_handler_factory, output_handler_factory_task) =
             ConcurrentOutputHandlerFactory::new(pool.clone(), io.clone(), output_handler_factory);
-        let batch_processor = BatchExecutor::new(false, false);
+        let batch_processor = MainBatchExecutor::new(false, false);
         let vm_runner = VmRunner::new(
             pool,
             Box::new(io),
             Arc::new(loader),
             Box::new(output_handler_factory),
-            batch_processor,
+            Box::new(batch_processor),
         );
         Ok((
             Self { vm_runner },
diff --git a/core/node/vm_runner/src/process.rs b/core/node/vm_runner/src/process.rs
index acd66fc6c121..e84ec76d0726 100644
--- a/core/node/vm_runner/src/process.rs
+++ b/core/node/vm_runner/src/process.rs
@@ -29,7 +29,7 @@ pub struct VmRunner {
     io: Box<dyn VmRunnerIo>,
     loader: Arc<dyn VmRunnerStorage>,
     output_handler_factory: Box<dyn OutputHandlerFactory>,
-    batch_processor: BatchExecutor,
+    batch_processor: Box<dyn BatchExecutor>,
 }
 
 impl VmRunner {
@@ -44,7 +44,7 @@ impl VmRunner {
         io: Box<dyn VmRunnerIo>,
         loader: Arc<dyn VmRunnerStorage>,
         output_handler_factory: Box<dyn OutputHandlerFactory>,
-        batch_processor: BatchExecutor,
+        batch_processor: Box<dyn BatchExecutor>,
     ) -> Self {
         Self {
             pool,
diff --git a/core/node/vm_runner/src/tests/process.rs b/core/node/vm_runner/src/tests/process.rs
index fc734e74d02c..664bdeebf855 100644
--- a/core/node/vm_runner/src/tests/process.rs
+++ b/core/node/vm_runner/src/tests/process.rs
@@ -4,7 +4,7 @@ use tempfile::TempDir;
 use tokio::sync::{watch, RwLock};
 use zksync_dal::{ConnectionPool, Core};
 use zksync_node_genesis::{insert_genesis_batch, GenesisParams};
-use zksync_state_keeper::BatchExecutor;
+use zksync_state_keeper::MainBatchExecutor;
 use zksync_test_account::Account;
 use zksync_types::L2ChainId;
 
@@ -65,13 +65,13 @@ async fn process_one_batch() -> anyhow::Result<()> {
     tokio::task::spawn(async move { task.run(output_stop_receiver).await.unwrap() });
 
     let storage = Arc::new(storage);
-    let batch_executor = BatchExecutor::new(false, false);
+    let batch_executor = MainBatchExecutor::new(false, false);
     let vm_runner = VmRunner::new(
         connection_pool,
         Box::new(io.clone()),
         storage,
         Box::new(output_factory),
-        batch_executor,
+        Box::new(batch_executor),
    );
    tokio::task::spawn(async move { vm_runner.run(&stop_receiver).await.unwrap() });
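// For illustration, a sketch of the wiring pattern that the `bwip.rs`,
// `protective_reads.rs` and test hunks above now share: construct a concrete
// `MainBatchExecutor` and box it into the `Box<dyn BatchExecutor>` slot of
// `VmRunner`. Assumes the `zksync_dal`, `zksync_state_keeper` and vm_runner
// types above are in scope; the helper name is ours.
fn wire_vm_runner(
    pool: ConnectionPool<Core>,
    io: Box<dyn VmRunnerIo>,
    loader: Arc<dyn VmRunnerStorage>,
    output_handler_factory: Box<dyn OutputHandlerFactory>,
) -> VmRunner {
    // First argument is `save_call_traces`, the second toggles optional
    // bytecode compression; both are disabled in the components above.
    let batch_processor = MainBatchExecutor::new(false, false);
    VmRunner::new(
        pool,
        io,
        loader,
        output_handler_factory,
        // Any other `dyn BatchExecutor` implementation could be swapped in here.
        Box::new(batch_processor),
    )
}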