diff --git a/.github/workflows/protobuf.yaml b/.github/workflows/protobuf.yaml index 0b17cb74c008..bc8da1a3a347 100644 --- a/.github/workflows/protobuf.yaml +++ b/.github/workflows/protobuf.yaml @@ -73,4 +73,6 @@ jobs: with: github_token: ${{ github.token }} - name: buf breaking - run: buf breaking './after.binpb' --against './before.binpb' --config '{"version":"v1","breaking":{"use":["WIRE_JSON","WIRE"]}}' --error-format 'github-actions' + run: > + buf breaking './after.binpb' --against './before.binpb' --exclude-path 'zksync/config/experimental.proto' + --config '{"version":"v1","breaking":{"use":["WIRE_JSON","WIRE"]}}' --error-format 'github-actions' diff --git a/core/bin/external_node/src/config/mod.rs b/core/bin/external_node/src/config/mod.rs index ec2ecdd68b4b..c9ed4e5749bc 100644 --- a/core/bin/external_node/src/config/mod.rs +++ b/core/bin/external_node/src/config/mod.rs @@ -400,19 +400,6 @@ pub(crate) struct OptionalENConfig { pruning_data_retention_sec: u64, } -#[derive(Debug, Clone, PartialEq, Deserialize)] -pub struct ApiComponentConfig { - /// Address of the tree API used by this EN in case it does not have a - /// local tree component running and in this case needs to send requests - /// to some external tree API. - pub tree_api_remote_url: Option, -} - -#[derive(Debug, Clone, PartialEq, Deserialize)] -pub struct TreeComponentConfig { - pub api_port: Option, -} - impl OptionalENConfig { const fn default_filters_limit() -> usize { 10_000 @@ -725,8 +712,41 @@ impl PostgresConfig { } } +/// Experimental part of the external node config. All parameters in this group can change or disappear without notice. +/// Eventually, parameters from this group generally end up in the optional group. +#[derive(Debug, Clone, Deserialize)] +pub(crate) struct ExperimentalENConfig { + // State keeper cache config + /// Block cache capacity of the state keeper RocksDB cache. The default value is 128 MB. + #[serde(default = "ExperimentalENConfig::default_state_keeper_db_block_cache_capacity_mb")] + state_keeper_db_block_cache_capacity_mb: usize, + /// Maximum number of files concurrently opened by state keeper cache RocksDB. Useful to fit into OS limits; can be used + /// as a rudimentary way to control RAM usage of the cache. + pub state_keeper_db_max_open_files: Option, +} + +impl ExperimentalENConfig { + const fn default_state_keeper_db_block_cache_capacity_mb() -> usize { + 128 + } + + #[cfg(test)] + fn mock() -> Self { + Self { + state_keeper_db_block_cache_capacity_mb: + Self::default_state_keeper_db_block_cache_capacity_mb(), + state_keeper_db_max_open_files: None, + } + } + + /// Returns the size of block cache for the state keeper RocksDB cache in bytes. + pub fn state_keeper_db_block_cache_capacity(&self) -> usize { + self.state_keeper_db_block_cache_capacity_mb * BYTES_IN_MEGABYTE + } +} + pub(crate) fn read_consensus_secrets() -> anyhow::Result> { - let Ok(path) = std::env::var("EN_CONSENSUS_SECRETS_PATH") else { + let Ok(path) = env::var("EN_CONSENSUS_SECRETS_PATH") else { return Ok(None); }; let cfg = std::fs::read_to_string(&path).context(path)?; @@ -736,7 +756,7 @@ pub(crate) fn read_consensus_secrets() -> anyhow::Result anyhow::Result> { - let Ok(path) = std::env::var("EN_CONSENSUS_CONFIG_PATH") else { + let Ok(path) = env::var("EN_CONSENSUS_CONFIG_PATH") else { return Ok(None); }; let cfg = std::fs::read_to_string(&path).context(path)?; @@ -745,20 +765,34 @@ pub(crate) fn read_consensus_config() -> anyhow::Result> )) } -/// Configuration for snapshot recovery. 
Loaded optionally, only if the corresponding command-line argument -/// is supplied to the EN binary. -#[derive(Debug, Clone)] +/// Configuration for snapshot recovery. Loaded optionally, only if snapshot recovery is enabled. +#[derive(Debug)] pub(crate) struct SnapshotsRecoveryConfig { pub snapshots_object_store: ObjectStoreConfig, } -pub(crate) fn read_snapshots_recovery_config() -> anyhow::Result { - let snapshots_object_store = envy::prefixed("EN_SNAPSHOTS_OBJECT_STORE_") - .from_env::() - .context("failed loading snapshot object store config from env variables")?; - Ok(SnapshotsRecoveryConfig { - snapshots_object_store, - }) +impl SnapshotsRecoveryConfig { + pub fn new() -> anyhow::Result { + let snapshots_object_store = envy::prefixed("EN_SNAPSHOTS_OBJECT_STORE_") + .from_env::() + .context("failed loading snapshot object store config from env variables")?; + Ok(Self { + snapshots_object_store, + }) + } +} + +#[derive(Debug, Clone, PartialEq, Deserialize)] +pub struct ApiComponentConfig { + /// Address of the tree API used by this EN in case it does not have a + /// local tree component running and in this case needs to send requests + /// to some external tree API. + pub tree_api_remote_url: Option, +} + +#[derive(Debug, Clone, PartialEq, Deserialize)] +pub struct TreeComponentConfig { + pub api_port: Option, } /// External Node Config contains all the configuration required for the EN operation. @@ -769,6 +803,7 @@ pub(crate) struct ExternalNodeConfig { pub postgres: PostgresConfig, pub optional: OptionalENConfig, pub remote: RemoteENConfig, + pub experimental: ExperimentalENConfig, pub consensus: Option, pub api_component: ApiComponentConfig, pub tree_component: TreeComponentConfig, @@ -780,19 +815,20 @@ impl ExternalNodeConfig { pub async fn collect() -> anyhow::Result { let required = envy::prefixed("EN_") .from_env::() - .context("could not load external node config")?; - + .context("could not load external node config (required params)")?; let optional = envy::prefixed("EN_") .from_env::() - .context("could not load external node config")?; + .context("could not load external node config (optional params)")?; + let experimental = envy::prefixed("EN_EXPERIMENTAL_") + .from_env::() + .context("could not load external node config (experimental params)")?; let api_component_config = envy::prefixed("EN_API_") .from_env::() - .context("could not load external node config")?; - + .context("could not load external node config (API component params)")?; let tree_component_config = envy::prefixed("EN_TREE_") .from_env::() - .context("could not load external node config")?; + .context("could not load external node config (tree component params)")?; let client = L2Client::http(&required.main_node_url()?) .context("Unable to build HTTP client for main node")? 
@@ -844,6 +880,7 @@ impl ExternalNodeConfig { postgres, required, optional, + experimental, consensus: read_consensus_config().context("read_consensus_config()")?, tree_component: tree_component_config, api_component: api_component_config, @@ -857,6 +894,7 @@ impl ExternalNodeConfig { postgres: PostgresConfig::mock(test_pool), optional: OptionalENConfig::mock(), remote: RemoteENConfig::mock(), + experimental: ExperimentalENConfig::mock(), consensus: None, api_component: ApiComponentConfig { tree_api_remote_url: None, diff --git a/core/bin/external_node/src/config/tests.rs b/core/bin/external_node/src/config/tests.rs index 1d74201d8d81..89c2f86d73ca 100644 --- a/core/bin/external_node/src/config/tests.rs +++ b/core/bin/external_node/src/config/tests.rs @@ -107,3 +107,30 @@ fn parsing_optional_config_from_env() { L1BatchCommitDataGeneratorMode::Validium ); } + +#[test] +fn parsing_experimental_config_from_empty_env() { + let config: ExperimentalENConfig = envy::prefixed("EN_EXPERIMENTAL_").from_iter([]).unwrap(); + assert_eq!(config.state_keeper_db_block_cache_capacity(), 128 << 20); + assert_eq!(config.state_keeper_db_max_open_files, None); +} + +#[test] +fn parsing_experimental_config_from_env() { + let env_vars = [ + ( + "EN_EXPERIMENTAL_STATE_KEEPER_DB_BLOCK_CACHE_CAPACITY_MB", + "64", + ), + ("EN_EXPERIMENTAL_STATE_KEEPER_DB_MAX_OPEN_FILES", "100"), + ]; + let env_vars = env_vars + .into_iter() + .map(|(name, value)| (name.to_owned(), value.to_owned())); + + let config: ExperimentalENConfig = envy::prefixed("EN_EXPERIMENTAL_") + .from_iter(env_vars) + .unwrap(); + assert_eq!(config.state_keeper_db_block_cache_capacity(), 64 << 20); + assert_eq!(config.state_keeper_db_max_open_files, NonZeroU32::new(100)); +} diff --git a/core/bin/external_node/src/init.rs b/core/bin/external_node/src/init.rs index 95eb2ba5d19c..fb8eee9bce06 100644 --- a/core/bin/external_node/src/init.rs +++ b/core/bin/external_node/src/init.rs @@ -12,7 +12,7 @@ use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS}; use zksync_snapshots_applier::{SnapshotsApplierConfig, SnapshotsApplierTask}; use zksync_web3_decl::client::BoxedL2Client; -use crate::config::read_snapshots_recovery_config; +use crate::config::SnapshotsRecoveryConfig; #[derive(Debug)] enum InitDecision { @@ -85,7 +85,7 @@ pub(crate) async fn ensure_storage_initialized( ); tracing::warn!("Proceeding with snapshot recovery. 
This is an experimental feature; use at your own risk"); - let recovery_config = read_snapshots_recovery_config()?; + let recovery_config = SnapshotsRecoveryConfig::new()?; let blob_store = ObjectStoreFactory::new(recovery_config.snapshots_object_store) .create_store() .await; diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index f64de55f6d99..7034d99e9304 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -48,7 +48,7 @@ use zksync_db_connection::{ }; use zksync_eth_client::{clients::QueryClient, EthInterface}; use zksync_health_check::{AppHealthCheck, HealthStatus, ReactiveHealthCheck}; -use zksync_state::PostgresStorageCaches; +use zksync_state::{PostgresStorageCaches, RocksdbStorageOptions}; use zksync_storage::RocksDB; use zksync_types::L2ChainId; use zksync_utils::wait_for_tasks::ManagedTasks; @@ -85,15 +85,19 @@ async fn build_state_keeper( output_handler: OutputHandler, stop_receiver: watch::Receiver, chain_id: L2ChainId, - task_handles: &mut Vec>>, + task_handles: &mut Vec>>, ) -> anyhow::Result { // We only need call traces on the external node if the `debug_` namespace is enabled. let save_call_traces = config.optional.api_namespaces().contains(&Namespace::Debug); + let cache_options = RocksdbStorageOptions { + block_cache_capacity: config.experimental.state_keeper_db_block_cache_capacity(), + max_open_files: config.experimental.state_keeper_db_max_open_files, + }; let (storage_factory, task) = - AsyncRocksdbCache::new(connection_pool.clone(), state_keeper_db_path); + AsyncRocksdbCache::new(connection_pool.clone(), state_keeper_db_path, cache_options); let mut stop_receiver_clone = stop_receiver.clone(); - task_handles.push(tokio::task::spawn(async move { + task_handles.push(tokio::spawn(async move { let result = task.run(stop_receiver_clone.clone()).await; stop_receiver_clone.changed().await?; result diff --git a/core/lib/config/src/configs/database.rs b/core/lib/config/src/configs/database.rs index ba36d7085957..7c903878c5d2 100644 --- a/core/lib/config/src/configs/database.rs +++ b/core/lib/config/src/configs/database.rs @@ -3,6 +3,8 @@ use std::time::Duration; use anyhow::Context as _; use serde::{Deserialize, Serialize}; +use crate::configs::ExperimentalDBConfig; + /// Mode of operation for the Merkle tree. /// /// The mode does not influence how tree data is stored; i.e., a mode can be switched on the fly. @@ -110,8 +112,11 @@ pub struct DBConfig { /// Merkle tree configuration. #[serde(skip)] // ^ Filled in separately in `Self::from_env()`. We cannot use `serde(flatten)` because it - // doesn't work with 'envy`. + // doesn't work with `envy`. pub merkle_tree: MerkleTreeConfig, + /// Experimental parts of the config. + #[serde(skip)] // same reasoning as for `merkle_tree` + pub experimental: ExperimentalDBConfig, } impl DBConfig { diff --git a/core/lib/config/src/configs/experimental.rs b/core/lib/config/src/configs/experimental.rs new file mode 100644 index 000000000000..ad0ef5a4d5b8 --- /dev/null +++ b/core/lib/config/src/configs/experimental.rs @@ -0,0 +1,35 @@ +//! Experimental part of configuration. + +use std::num::NonZeroU32; + +use serde::Deserialize; + +#[derive(Debug, Clone, PartialEq, Deserialize)] +pub struct ExperimentalDBConfig { + /// Block cache capacity of the state keeper RocksDB cache. The default value is 128 MB. 
+ #[serde(default = "ExperimentalDBConfig::default_state_keeper_db_block_cache_capacity_mb")] + pub state_keeper_db_block_cache_capacity_mb: usize, + /// Maximum number of files concurrently opened by state keeper cache RocksDB. Useful to fit into OS limits; can be used + /// as a rudimentary way to control RAM usage of the cache. + pub state_keeper_db_max_open_files: Option, +} + +impl Default for ExperimentalDBConfig { + fn default() -> Self { + Self { + state_keeper_db_block_cache_capacity_mb: + Self::default_state_keeper_db_block_cache_capacity_mb(), + state_keeper_db_max_open_files: None, + } + } +} + +impl ExperimentalDBConfig { + const fn default_state_keeper_db_block_cache_capacity_mb() -> usize { + 128 + } + + pub fn state_keeper_db_block_cache_capacity(&self) -> usize { + self.state_keeper_db_block_cache_capacity_mb * super::BYTES_IN_MEGABYTE + } +} diff --git a/core/lib/config/src/configs/mod.rs b/core/lib/config/src/configs/mod.rs index 7a59a68c596c..2460fa04f920 100644 --- a/core/lib/config/src/configs/mod.rs +++ b/core/lib/config/src/configs/mod.rs @@ -6,6 +6,7 @@ pub use self::{ database::{DBConfig, PostgresConfig}, eth_sender::{EthConfig, GasAdjusterConfig}, eth_watch::EthWatchConfig, + experimental::ExperimentalDBConfig, fri_proof_compressor::FriProofCompressorConfig, fri_prover::FriProverConfig, fri_prover_gateway::FriProverGatewayConfig, @@ -28,6 +29,7 @@ pub mod contracts; pub mod database; pub mod eth_sender; pub mod eth_watch; +mod experimental; pub mod fri_proof_compressor; pub mod fri_prover; pub mod fri_prover_gateway; diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index fee02d662d7d..2df156811f5f 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -284,11 +284,21 @@ impl Distribution for EncodeDist { } } +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> configs::ExperimentalDBConfig { + configs::ExperimentalDBConfig { + state_keeper_db_block_cache_capacity_mb: self.sample(rng), + state_keeper_db_max_open_files: self.sample(rng), + } + } +} + impl Distribution for EncodeDist { fn sample(&self, rng: &mut R) -> configs::database::DBConfig { configs::database::DBConfig { state_keeper_db_path: self.sample(rng), merkle_tree: self.sample(rng), + experimental: self.sample(rng), } } } diff --git a/core/lib/env_config/src/database.rs b/core/lib/env_config/src/database.rs index 448eadde3ad3..cab275bb860f 100644 --- a/core/lib/env_config/src/database.rs +++ b/core/lib/env_config/src/database.rs @@ -23,6 +23,7 @@ impl FromEnv for DBConfig { fn from_env() -> anyhow::Result { Ok(Self { merkle_tree: envy_load("database_merkle_tree", "DATABASE_MERKLE_TREE_")?, + experimental: envy_load("database_experimental", "DATABASE_EXPERIMENTAL_")?, ..envy_load("database", "DATABASE_")? 
}) } @@ -65,7 +66,7 @@ impl FromEnv for PostgresConfig { #[cfg(test)] mod tests { - use std::time::Duration; + use std::{num::NonZeroU32, time::Duration}; use zksync_config::configs::database::MerkleTreeMode; @@ -85,6 +86,8 @@ mod tests { DATABASE_MERKLE_TREE_MEMTABLE_CAPACITY_MB=512 DATABASE_MERKLE_TREE_STALLED_WRITES_TIMEOUT_SEC=60 DATABASE_MERKLE_TREE_MAX_L1_BATCHES_PER_ITER=50 + DATABASE_EXPERIMENTAL_STATE_KEEPER_DB_BLOCK_CACHE_CAPACITY_MB=64 + DATABASE_EXPERIMENTAL_STATE_KEEPER_DB_MAX_OPEN_FILES=100 "#; lock.set_env(config); @@ -96,6 +99,16 @@ mod tests { assert_eq!(db_config.merkle_tree.max_l1_batches_per_iter, 50); assert_eq!(db_config.merkle_tree.memtable_capacity_mb, 512); assert_eq!(db_config.merkle_tree.stalled_writes_timeout_sec, 60); + assert_eq!( + db_config + .experimental + .state_keeper_db_block_cache_capacity_mb, + 64 + ); + assert_eq!( + db_config.experimental.state_keeper_db_max_open_files, + NonZeroU32::new(100) + ); } #[test] @@ -103,6 +116,8 @@ mod tests { let mut lock = MUTEX.lock(); lock.remove_env(&[ "DATABASE_STATE_KEEPER_DB_PATH", + "DATABASE_EXPERIMENTAL_STATE_KEEPER_DB_MAX_OPEN_FILES", + "DATABASE_EXPERIMENTAL_STATE_KEEPER_DB_BLOCK_CACHE_CAPACITY_MB", "DATABASE_MERKLE_TREE_BACKUP_PATH", "DATABASE_MERKLE_TREE_PATH", "DATABASE_MERKLE_TREE_MODE", @@ -122,6 +137,13 @@ mod tests { assert_eq!(db_config.merkle_tree.block_cache_size_mb, 128); assert_eq!(db_config.merkle_tree.memtable_capacity_mb, 256); assert_eq!(db_config.merkle_tree.stalled_writes_timeout_sec, 30); + assert_eq!( + db_config + .experimental + .state_keeper_db_block_cache_capacity_mb, + 128 + ); + assert_eq!(db_config.experimental.state_keeper_db_max_open_files, None); // Check that new env variable for Merkle tree path is supported lock.set_env("DATABASE_MERKLE_TREE_PATH=/db/tree/main"); diff --git a/core/lib/protobuf_config/src/database.rs b/core/lib/protobuf_config/src/database.rs index 0489c43512b7..e537e544f2d0 100644 --- a/core/lib/protobuf_config/src/database.rs +++ b/core/lib/protobuf_config/src/database.rs @@ -66,12 +66,20 @@ impl ProtoRepr for proto::MerkleTree { impl ProtoRepr for proto::Db { type Type = configs::database::DBConfig; + fn read(&self) -> anyhow::Result { Ok(Self::Type { state_keeper_db_path: required(&self.state_keeper_db_path) .context("state_keeper_db_path")? .clone(), merkle_tree: read_required_repr(&self.merkle_tree).context("merkle_tree")?, + experimental: self + .experimental + .as_ref() + .map(ProtoRepr::read) + .transpose() + .context("experimental")? 
+ .unwrap_or_default(), }) } @@ -79,6 +87,7 @@ impl ProtoRepr for proto::Db { Self { state_keeper_db_path: Some(this.state_keeper_db_path.clone()), merkle_tree: Some(ProtoRepr::build(&this.merkle_tree)), + experimental: Some(ProtoRepr::build(&this.experimental)), } } } diff --git a/core/lib/protobuf_config/src/experimental.rs b/core/lib/protobuf_config/src/experimental.rs new file mode 100644 index 000000000000..c4fe17aadf43 --- /dev/null +++ b/core/lib/protobuf_config/src/experimental.rs @@ -0,0 +1,39 @@ +use std::num::NonZeroU32; + +use anyhow::Context as _; +use zksync_config::configs; +use zksync_protobuf::{repr::ProtoRepr, required}; + +use crate::proto::experimental as proto; + +impl ProtoRepr for proto::Db { + type Type = configs::ExperimentalDBConfig; + + fn read(&self) -> anyhow::Result { + let state_keeper_db_block_cache_capacity_mb = + required(&self.state_keeper_db_block_cache_capacity_mb) + .and_then(|&capacity| Ok(capacity.try_into()?)) + .context("state_keeper_db_block_cache_capacity_mb")?; + Ok(configs::ExperimentalDBConfig { + state_keeper_db_block_cache_capacity_mb, + state_keeper_db_max_open_files: self + .state_keeper_db_max_open_files + .map(|count| NonZeroU32::new(count).context("cannot be 0")) + .transpose() + .context("state_keeper_db_max_open_files")?, + }) + } + + fn build(this: &Self::Type) -> Self { + Self { + state_keeper_db_block_cache_capacity_mb: Some( + this.state_keeper_db_block_cache_capacity_mb + .try_into() + .expect("state_keeper_db_block_cache_capacity_mb"), + ), + state_keeper_db_max_open_files: this + .state_keeper_db_max_open_files + .map(NonZeroU32::get), + } + } +} diff --git a/core/lib/protobuf_config/src/lib.rs b/core/lib/protobuf_config/src/lib.rs index 21cbcba283e9..f3d1614c98f8 100644 --- a/core/lib/protobuf_config/src/lib.rs +++ b/core/lib/protobuf_config/src/lib.rs @@ -12,6 +12,7 @@ mod contract_verifier; mod contracts; mod database; mod eth; +mod experimental; mod general; mod genesis; mod house_keeper; diff --git a/core/lib/protobuf_config/src/proto/config/database.proto b/core/lib/protobuf_config/src/proto/config/database.proto index 2e4821c654a0..1154bfc1846a 100644 --- a/core/lib/protobuf_config/src/proto/config/database.proto +++ b/core/lib/protobuf_config/src/proto/config/database.proto @@ -2,6 +2,8 @@ syntax = "proto3"; package zksync.config.database; +import "zksync/config/experimental.proto"; + enum MerkleTreeMode { FULL = 0; LIGHTWEIGHT = 1; @@ -20,6 +22,7 @@ message MerkleTree { message DB { optional string state_keeper_db_path = 1; // optional; fs path optional MerkleTree merkle_tree = 2; // optional + optional experimental.DB experimental = 3; // optional } message Postgres { diff --git a/core/lib/protobuf_config/src/proto/config/experimental.proto b/core/lib/protobuf_config/src/proto/config/experimental.proto new file mode 100644 index 000000000000..4f456b9aca39 --- /dev/null +++ b/core/lib/protobuf_config/src/proto/config/experimental.proto @@ -0,0 +1,11 @@ +// Experimental configuration types. Unlike other types, experimental types are unstable and do not undergo Protobuf compatibility checks in CI. + +syntax = "proto3"; + +package zksync.config.experimental; + +// Experimental part of the database configuration. 
+message DB { + optional uint64 state_keeper_db_block_cache_capacity_mb = 1; // MB; required + optional uint32 state_keeper_db_max_open_files = 2; // optional +} diff --git a/core/lib/state/src/catchup.rs b/core/lib/state/src/catchup.rs index 60c1b03f974e..4adf7547a301 100644 --- a/core/lib/state/src/catchup.rs +++ b/core/lib/state/src/catchup.rs @@ -8,7 +8,7 @@ use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS}; use zksync_storage::RocksDB; use zksync_types::L1BatchNumber; -use crate::{RocksdbStorage, StateKeeperColumnFamily}; +use crate::{RocksdbStorage, RocksdbStorageOptions, StateKeeperColumnFamily}; /// A runnable task that blocks until the provided RocksDB cache instance is caught up with /// Postgres. @@ -18,6 +18,7 @@ use crate::{RocksdbStorage, StateKeeperColumnFamily}; pub struct AsyncCatchupTask { pool: ConnectionPool, state_keeper_db_path: String, + state_keeper_db_options: RocksdbStorageOptions, rocksdb_cell: Arc>>, to_l1_batch_number: Option, } @@ -28,12 +29,14 @@ impl AsyncCatchupTask { pub fn new( pool: ConnectionPool, state_keeper_db_path: String, + state_keeper_db_options: RocksdbStorageOptions, rocksdb_cell: Arc>>, to_l1_batch_number: Option, ) -> Self { Self { pool, state_keeper_db_path, + state_keeper_db_options, rocksdb_cell, to_l1_batch_number, } @@ -48,10 +51,14 @@ impl AsyncCatchupTask { let started_at = Instant::now(); tracing::debug!("Catching up RocksDB asynchronously"); - let mut rocksdb_builder = RocksdbStorage::builder(self.state_keeper_db_path.as_ref()) - .await - .context("Failed creating RocksDB storage builder")?; - let mut connection = self.pool.connection().await?; + let mut rocksdb_builder = RocksdbStorage::builder_with_options( + self.state_keeper_db_path.as_ref(), + self.state_keeper_db_options, + ) + .await + .context("Failed creating RocksDB storage builder")?; + + let mut connection = self.pool.connection_tagged("state_keeper").await?; let was_recovered_from_snapshot = rocksdb_builder .ensure_ready(&mut connection, &stop_receiver) .await diff --git a/core/lib/state/src/lib.rs b/core/lib/state/src/lib.rs index 00e0e3abb4bd..0621e2b33a31 100644 --- a/core/lib/state/src/lib.rs +++ b/core/lib/state/src/lib.rs @@ -36,7 +36,9 @@ pub use self::{ // Note, that `test_infra` of the bootloader tests relies on this value to be exposed in_memory::IN_MEMORY_STORAGE_DEFAULT_NETWORK_ID, postgres::{PostgresStorage, PostgresStorageCaches, PostgresStorageCachesTask}, - rocksdb::{RocksdbStorage, RocksdbStorageBuilder, StateKeeperColumnFamily}, + rocksdb::{ + RocksdbStorage, RocksdbStorageBuilder, RocksdbStorageOptions, StateKeeperColumnFamily, + }, shadow_storage::ShadowStorage, storage_factory::{BatchDiff, PgOrRocksdbStorage, ReadStorageFactory, RocksdbWithMemory}, storage_view::{StorageView, StorageViewMetrics}, diff --git a/core/lib/state/src/rocksdb/mod.rs b/core/lib/state/src/rocksdb/mod.rs index 83dab6a8dc94..bda416cb433b 100644 --- a/core/lib/state/src/rocksdb/mod.rs +++ b/core/lib/state/src/rocksdb/mod.rs @@ -22,6 +22,7 @@ use std::{ collections::HashMap, convert::TryInto, mem, + num::NonZeroU32, path::{Path, PathBuf}, time::Instant, }; @@ -30,7 +31,7 @@ use anyhow::Context as _; use itertools::{Either, Itertools}; use tokio::sync::watch; use zksync_dal::{Connection, Core, CoreDal, DalError}; -use zksync_storage::{db::NamedColumnFamily, RocksDB}; +use zksync_storage::{db::NamedColumnFamily, RocksDB, RocksDBOptions}; use zksync_types::{L1BatchNumber, StorageKey, StorageValue, H256}; #[cfg(test)] @@ -124,6 +125,35 @@ impl From for RocksdbSyncError 
{ } } +/// Options for [`RocksdbStorage`]. +#[derive(Debug)] +pub struct RocksdbStorageOptions { + /// Size of the RocksDB block cache in bytes. The default value is 128 MiB. + pub block_cache_capacity: usize, + /// Number of open files that can be simultaneously opened by RocksDB. Default is `None`, for no limit. + /// Can be used to restrict memory usage of RocksDB. + pub max_open_files: Option, +} + +impl Default for RocksdbStorageOptions { + fn default() -> Self { + Self { + block_cache_capacity: 128 << 20, + max_open_files: None, + } + } +} + +impl RocksdbStorageOptions { + fn into_generic(self) -> RocksDBOptions { + RocksDBOptions { + block_cache_capacity: Some(self.block_cache_capacity), + max_open_files: self.max_open_files, + ..RocksDBOptions::default() + } + } +} + /// [`ReadStorage`] implementation backed by RocksDB. #[derive(Debug, Clone)] pub struct RocksdbStorage { @@ -250,15 +280,29 @@ impl RocksdbStorage { /// /// Propagates RocksDB I/O errors. pub async fn builder(path: &Path) -> anyhow::Result { - Self::new(path.to_path_buf()) + Self::builder_with_options(path, RocksdbStorageOptions::default()).await + } + + /// Creates a new storage builder with the provided RocksDB `path` and custom options. + /// + /// # Errors + /// + /// Propagates RocksDB I/O errors. + pub async fn builder_with_options( + path: &Path, + options: RocksdbStorageOptions, + ) -> anyhow::Result { + Self::new(path.to_path_buf(), options) .await .map(RocksdbStorageBuilder) } - async fn new(path: PathBuf) -> anyhow::Result { + async fn new(path: PathBuf, options: RocksdbStorageOptions) -> anyhow::Result { tokio::task::spawn_blocking(move || { + let db = RocksDB::with_options(&path, options.into_generic()) + .context("failed initializing state keeper RocksDB")?; Ok(Self { - db: RocksDB::new(&path).context("failed initializing state keeper RocksDB")?, + db, pending_patch: InMemoryStorage::default(), #[cfg(test)] listener: RocksdbStorageEventListener::default(), diff --git a/core/lib/state/src/rocksdb/tests.rs b/core/lib/state/src/rocksdb/tests.rs index 5c1423500042..a006fcba4750 100644 --- a/core/lib/state/src/rocksdb/tests.rs +++ b/core/lib/state/src/rocksdb/tests.rs @@ -43,7 +43,9 @@ impl Default for RocksdbStorageEventListener { #[tokio::test] async fn rocksdb_storage_basics() { let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); - let mut storage = RocksdbStorage::new(dir.path().into()).await.unwrap(); + let mut storage = RocksdbStorage::new(dir.path().into(), RocksdbStorageOptions::default()) + .await + .unwrap(); let mut storage_logs: HashMap<_, _> = gen_storage_logs(0..20) .into_iter() .map(|log| (log.key, log.value)) @@ -270,7 +272,9 @@ async fn low_level_snapshot_recovery(log_chunk_size: u64) { prepare_postgres_for_snapshot_recovery(&mut conn).await; let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); - let mut storage = RocksdbStorage::new(dir.path().into()).await.unwrap(); + let mut storage = RocksdbStorage::new(dir.path().into(), RocksdbStorageOptions::default()) + .await + .unwrap(); let (_stop_sender, stop_receiver) = watch::channel(false); let (_, next_l1_batch) = storage .ensure_ready(&mut conn, log_chunk_size, &stop_receiver) @@ -399,7 +403,9 @@ async fn recovery_fault_tolerance() { let log_chunk_size = storage_logs.len() as u64 / 5; let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); - let mut storage = RocksdbStorage::new(dir.path().into()).await.unwrap(); + let mut storage = 
RocksdbStorage::new(dir.path().into(), RocksdbStorageOptions::default()) + .await + .unwrap(); let (stop_sender, stop_receiver) = watch::channel(false); let mut synced_chunk_count = 0_u64; storage.listener.on_logs_chunk_recovered = Arc::new(RwLock::new(move |chunk_id| { @@ -419,7 +425,9 @@ async fn recovery_fault_tolerance() { // Resume recovery and check that no chunks are recovered twice. let (_stop_sender, stop_receiver) = watch::channel(false); - let mut storage = RocksdbStorage::new(dir.path().into()).await.unwrap(); + let mut storage = RocksdbStorage::new(dir.path().into(), RocksdbStorageOptions::default()) + .await + .unwrap(); storage.listener.on_logs_chunk_recovered = Arc::new(RwLock::new(|chunk_id| { assert!(chunk_id >= 2); })); diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index ba62647985c2..58145c48facd 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -62,7 +62,7 @@ use zksync_house_keeper::{ use zksync_object_store::{ObjectStore, ObjectStoreFactory}; use zksync_queued_job_processor::JobProcessor; use zksync_shared_metrics::{InitStage, APP_METRICS}; -use zksync_state::PostgresStorageCaches; +use zksync_state::{PostgresStorageCaches, RocksdbStorageOptions}; use zksync_types::{ethabi::Contract, fee_model::FeeModelConfig, Address, L2ChainId}; use crate::{ @@ -865,8 +865,17 @@ async fn add_state_keeper_to_task_futures( .build() .await .context("failed to build async_cache_pool")?; - let (async_cache, async_catchup_task) = - AsyncRocksdbCache::new(async_cache_pool, db_config.state_keeper_db_path.clone()); + let cache_options = RocksdbStorageOptions { + block_cache_capacity: db_config + .experimental + .state_keeper_db_block_cache_capacity(), + max_open_files: db_config.experimental.state_keeper_db_max_open_files, + }; + let (async_cache, async_catchup_task) = AsyncRocksdbCache::new( + async_cache_pool, + db_config.state_keeper_db_path.clone(), + cache_options, + ); let state_keeper = create_state_keeper( state_keeper_config, state_keeper_wallets, diff --git a/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs b/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs index b70e9dedfeb8..d3c230724808 100644 --- a/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs +++ b/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs @@ -12,7 +12,7 @@ use tokio::{sync::watch, task::JoinHandle}; use zksync_config::configs::chain::StateKeeperConfig; use zksync_contracts::{get_loadnext_contract, test_contracts::LoadnextContractExecutionParams}; use zksync_dal::{ConnectionPool, Core, CoreDal}; -use zksync_state::ReadStorageFactory; +use zksync_state::{ReadStorageFactory, RocksdbStorageOptions}; use zksync_test_account::{Account, DeployContractsTx, TxType}; use zksync_types::{ block::L2BlockHasher, ethabi::Token, fee::Fee, snapshots::SnapshotRecoveryStatus, @@ -101,8 +101,11 @@ impl Tester { match storage_type { StorageType::AsyncRocksdbCache => { let (l1_batch_env, system_env) = self.default_batch_params(); - let (state_keeper_storage, task) = - AsyncRocksdbCache::new(self.pool(), self.state_keeper_db_path()); + let (state_keeper_storage, task) = AsyncRocksdbCache::new( + self.pool(), + self.state_keeper_db_path(), + RocksdbStorageOptions::default(), + ); let handle = tokio::task::spawn(async move { let (_stop_sender, stop_receiver) = watch::channel(false); task.run(stop_receiver).await.unwrap() @@ -156,8 +159,11 @@ impl Tester { &mut self, snapshot: 
&SnapshotRecoveryStatus, ) -> BatchExecutorHandle { - let (storage_factory, task) = - AsyncRocksdbCache::new(self.pool(), self.state_keeper_db_path()); + let (storage_factory, task) = AsyncRocksdbCache::new( + self.pool(), + self.state_keeper_db_path(), + RocksdbStorageOptions::default(), + ); let (_, stop_receiver) = watch::channel(false); let handle = tokio::task::spawn(async move { task.run(stop_receiver).await.unwrap() }); self.tasks.push(handle); diff --git a/core/lib/zksync_core/src/state_keeper/state_keeper_storage.rs b/core/lib/zksync_core/src/state_keeper/state_keeper_storage.rs index 114f5644bd99..e0d7d20bf3fa 100644 --- a/core/lib/zksync_core/src/state_keeper/state_keeper_storage.rs +++ b/core/lib/zksync_core/src/state_keeper/state_keeper_storage.rs @@ -6,7 +6,8 @@ use once_cell::sync::OnceCell; use tokio::sync::watch; use zksync_dal::{ConnectionPool, Core}; use zksync_state::{ - AsyncCatchupTask, PgOrRocksdbStorage, ReadStorageFactory, StateKeeperColumnFamily, + AsyncCatchupTask, PgOrRocksdbStorage, ReadStorageFactory, RocksdbStorageOptions, + StateKeeperColumnFamily, }; use zksync_storage::RocksDB; use zksync_types::L1BatchNumber; @@ -54,11 +55,13 @@ impl AsyncRocksdbCache { pub fn new( pool: ConnectionPool, state_keeper_db_path: String, + state_keeper_db_options: RocksdbStorageOptions, ) -> (Self, AsyncCatchupTask) { let rocksdb_cell = Arc::new(OnceCell::new()); let task = AsyncCatchupTask::new( pool.clone(), state_keeper_db_path, + state_keeper_db_options, rocksdb_cell.clone(), None, ); diff --git a/core/lib/zksync_core/src/vm_runner/storage.rs b/core/lib/zksync_core/src/vm_runner/storage.rs index 019d5f6e80b1..d2524d7f0032 100644 --- a/core/lib/zksync_core/src/vm_runner/storage.rs +++ b/core/lib/zksync_core/src/vm_runner/storage.rs @@ -15,7 +15,7 @@ use vm_utils::storage::L1BatchParamsProvider; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; use zksync_state::{ AsyncCatchupTask, BatchDiff, PgOrRocksdbStorage, ReadStorageFactory, RocksdbStorage, - RocksdbStorageBuilder, RocksdbWithMemory, StateKeeperColumnFamily, + RocksdbStorageBuilder, RocksdbStorageOptions, RocksdbWithMemory, StateKeeperColumnFamily, }; use zksync_storage::RocksDB; use zksync_types::{block::L2BlockExecutionData, L1BatchNumber, L2ChainId}; @@ -254,6 +254,7 @@ impl StorageSyncTask { let catchup_task = AsyncCatchupTask::new( pool.clone(), rocksdb_path, + RocksdbStorageOptions::default(), rocksdb_cell.clone(), Some(loader.latest_processed_batch(&mut conn).await?), ); diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/main_batch_executor.rs b/core/node/node_framework/src/implementations/layers/state_keeper/main_batch_executor.rs index b65fe1449df1..d460b0725277 100644 --- a/core/node/node_framework/src/implementations/layers/state_keeper/main_batch_executor.rs +++ b/core/node/node_framework/src/implementations/layers/state_keeper/main_batch_executor.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use zksync_config::{configs::chain::StateKeeperConfig, DBConfig}; use zksync_core::state_keeper::{AsyncRocksdbCache, MainBatchExecutor}; -use zksync_state::AsyncCatchupTask; +use zksync_state::{AsyncCatchupTask, RocksdbStorageOptions}; use crate::{ implementations::resources::{ @@ -39,9 +39,17 @@ impl WiringLayer for MainBatchExecutorLayer { async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { let master_pool = context.get_resource::>().await?; + let cache_options = RocksdbStorageOptions { + block_cache_capacity: self + .db_config + .experimental + 
.state_keeper_db_block_cache_capacity(), + max_open_files: self.db_config.experimental.state_keeper_db_max_open_files, + }; let (storage_factory, task) = AsyncRocksdbCache::new( master_pool.get_singleton().await?, self.db_config.state_keeper_db_path, + cache_options, ); let builder = MainBatchExecutor::new( Arc::new(storage_factory), diff --git a/etc/env/file_based/general.yaml b/etc/env/file_based/general.yaml index 89d583c3bc56..c04002fe26ce 100644 --- a/etc/env/file_based/general.yaml +++ b/etc/env/file_based/general.yaml @@ -16,6 +16,8 @@ db: max_l1_batches_per_iter: 50 path: "./db/main/tree" mode: FULL + experimental: + state_keeper_db_block_cache_capacity_mb: 128 api: prometheus:
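The new cache parameters are read from the environment under dedicated prefixes: EN_EXPERIMENTAL_ on the external node and DATABASE_EXPERIMENTAL_ on the main node (mirroring the `db.experimental` section added to general.yaml above). A minimal configuration sketch; the values below are illustrative, not recommendations:

    # External node
    EN_EXPERIMENTAL_STATE_KEEPER_DB_BLOCK_CACHE_CAPACITY_MB=64
    EN_EXPERIMENTAL_STATE_KEEPER_DB_MAX_OPEN_FILES=100

    # Main node
    DATABASE_EXPERIMENTAL_STATE_KEEPER_DB_BLOCK_CACHE_CAPACITY_MB=64
    DATABASE_EXPERIMENTAL_STATE_KEEPER_DB_MAX_OPEN_FILES=100

Both parameters are optional: the block cache defaults to 128 MB, and the number of open files is unlimited unless set.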
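For tests or custom wiring, the options can also be constructed directly and handed to the state keeper cache. A minimal sketch, assuming an existing connection pool and cache directory (both are placeholders here, not values from this PR):

    use std::num::NonZeroU32;
    use zksync_core::state_keeper::AsyncRocksdbCache;
    use zksync_state::RocksdbStorageOptions;

    // Illustrative values: 64 MiB block cache, at most 512 simultaneously open files.
    let cache_options = RocksdbStorageOptions {
        block_cache_capacity: 64 << 20,
        max_open_files: NonZeroU32::new(512),
    };
    // `pool` (a `ConnectionPool<Core>`) and `db_path` (a `String`) are assumed to exist.
    let (storage_factory, catchup_task) = AsyncRocksdbCache::new(pool, db_path, cache_options);
    // `catchup_task` must then be spawned and driven to completion (see the
    // `task.run(stop_receiver)` call sites above); `storage_factory` is handed to the
    // batch executor.

Omitting the options (or passing RocksdbStorageOptions::default()) keeps the previous behavior: a 128 MiB block cache and no open-file limit.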