Skip to content

Commit

Permalink
feat(core): removes multiple tokio runtimes and worker number setting. (
Browse files Browse the repository at this point in the history
#826)

## What ❔

<!-- What are the changes this PR brings about? -->
<!-- Example: This PR adds a PR template to the repo. -->
<!-- (For bigger PRs adding more context is appreciated) -->

## Why ❔

<!-- Why are these changes done? What goal do they contribute to? What
are the principles behind them? -->
<!-- Example: PR templates ensure PR reviewers, observers, and future
iterators are in context about the evolution of repos. -->

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
- [ ] Spellcheck has been run via `cargo spellcheck
--cfg=./spellcheck/era.cfg --code 1`.
  • Loading branch information
montekki authored Jan 9, 2024
1 parent 997db87 commit b8b190f
Show file tree
Hide file tree
Showing 12 changed files with 27 additions and 105 deletions.
2 changes: 0 additions & 2 deletions core/bin/external_node/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,6 @@ pub struct RequiredENConfig {
pub ws_port: u16,
/// Port on which the healthcheck REST server is listening.
pub healthcheck_port: u16,
/// Number of threads per API server
pub threads_per_server: usize,
/// Address of the Ethereum node API.
/// Intentionally private: use getter method as it manages the missing port.
eth_client_url: String,
Expand Down
10 changes: 1 addition & 9 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ async fn init_tasks(
.with_filter_limit(config.optional.filters_limit)
.with_batch_request_size_limit(config.optional.max_batch_request_size)
.with_response_body_size_limit(config.optional.max_response_body_size())
.with_threads(config.required.threads_per_server)
.with_tx_sender(tx_sender.clone(), vm_barrier.clone())
.with_sync_state(sync_state.clone())
.enable_api_namespaces(config.optional.api_namespaces())
Expand All @@ -294,7 +293,6 @@ async fn init_tasks(
.with_batch_request_size_limit(config.optional.max_batch_request_size)
.with_response_body_size_limit(config.optional.max_response_body_size())
.with_polling_interval(config.optional.polling_interval())
.with_threads(config.required.threads_per_server)
.with_tx_sender(tx_sender, vm_barrier)
.with_sync_state(sync_state)
.enable_api_namespaces(config.optional.api_namespaces())
Expand Down Expand Up @@ -485,17 +483,11 @@ async fn main() -> anyhow::Result<()> {
if let Some(last_correct_batch) = reorg_detector_last_correct_batch {
tracing::info!("Performing rollback to L1 batch #{last_correct_batch}");

let block_reverter_connection_pool =
ConnectionPool::builder(&config.postgres.database_url, 1)
.build()
.await
.context("failed to build a block reverter connection pool")?;

let reverter = BlockReverter::new(
config.required.state_cache_path,
config.required.merkle_tree_path,
None,
block_reverter_connection_pool,
connection_pool,
L1ExecutedBatchesRevert::Allowed,
);
reverter
Expand Down
21 changes: 0 additions & 21 deletions core/lib/config/src/configs/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ pub struct Web3JsonRpcConfig {
pub subscriptions_limit: Option<u32>,
/// Interval between polling db for pubsub (in ms).
pub pubsub_polling_interval: Option<u64>,
/// number of threads per server
pub threads_per_server: u32,
/// Tx nonce: how far ahead from the committed nonce can it be.
pub max_nonce_ahead: u32,
/// The multiplier to use when suggesting gas price. Should be higher than one,
Expand Down Expand Up @@ -69,12 +67,6 @@ pub struct Web3JsonRpcConfig {
/// Latest values cache size in MiBs. The default value is 128 MiB. If set to 0, the latest
/// values cache will be disabled.
pub latest_values_cache_size_mb: Option<usize>,
/// Override value for the amount of threads used for HTTP RPC server.
/// If not set, the value from `threads_per_server` is used.
pub http_threads: Option<u32>,
/// Override value for the amount of threads used for WebSocket RPC server.
/// If not set, the value from `threads_per_server` is used.
pub ws_threads: Option<u32>,
/// Limit for fee history block range.
pub fee_history_limit: Option<u64>,
/// Maximum number of requests in a single batch JSON RPC request. Default is 500.
Expand Down Expand Up @@ -103,7 +95,6 @@ impl Web3JsonRpcConfig {
filters_limit: Some(10000),
subscriptions_limit: Some(10000),
pubsub_polling_interval: Some(200),
threads_per_server: 1,
max_nonce_ahead: 50,
gas_price_scale_factor: 1.2,
request_timeout: Default::default(),
Expand All @@ -116,8 +107,6 @@ impl Web3JsonRpcConfig {
factory_deps_cache_size_mb: Default::default(),
initial_writes_cache_size_mb: Default::default(),
latest_values_cache_size_mb: Default::default(),
http_threads: Default::default(),
ws_threads: Default::default(),
fee_history_limit: Default::default(),
max_batch_request_size: Default::default(),
max_response_body_size_mb: Default::default(),
Expand Down Expand Up @@ -180,14 +169,6 @@ impl Web3JsonRpcConfig {
self.latest_values_cache_size_mb.unwrap_or(128) * super::BYTES_IN_MEGABYTE
}

pub fn http_server_threads(&self) -> usize {
self.http_threads.unwrap_or(self.threads_per_server) as usize
}

pub fn ws_server_threads(&self) -> usize {
self.ws_threads.unwrap_or(self.threads_per_server) as usize
}

pub fn fee_history_limit(&self) -> u64 {
self.fee_history_limit.unwrap_or(1024)
}
Expand Down Expand Up @@ -230,8 +211,6 @@ pub struct ContractVerificationApiConfig {
pub port: u16,
/// URL to access REST server.
pub url: String,
/// number of threads per server
pub threads_per_server: u32,
}

impl ContractVerificationApiConfig {
Expand Down
8 changes: 0 additions & 8 deletions core/lib/env_config/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ mod tests {
filters_limit: Some(10000),
subscriptions_limit: Some(10000),
pubsub_polling_interval: Some(200),
threads_per_server: 128,
max_nonce_ahead: 5,
request_timeout: Some(10),
account_pks: Some(vec![
Expand All @@ -82,8 +81,6 @@ mod tests {
factory_deps_cache_size_mb: Some(128),
initial_writes_cache_size_mb: Some(32),
latest_values_cache_size_mb: Some(256),
http_threads: Some(128),
ws_threads: Some(256),
fee_history_limit: Some(100),
max_batch_request_size: Some(200),
max_response_body_size_mb: Some(10),
Expand All @@ -93,7 +90,6 @@ mod tests {
contract_verification: ContractVerificationApiConfig {
port: 3070,
url: "http://127.0.0.1:3070".into(),
threads_per_server: 128,
},
prometheus: PrometheusConfig {
listener_port: 3312,
Expand All @@ -117,7 +113,6 @@ mod tests {
API_WEB3_JSON_RPC_FILTERS_LIMIT=10000
API_WEB3_JSON_RPC_SUBSCRIPTIONS_LIMIT=10000
API_WEB3_JSON_RPC_PUBSUB_POLLING_INTERVAL=200
API_WEB3_JSON_RPC_THREADS_PER_SERVER=128
API_WEB3_JSON_RPC_MAX_NONCE_AHEAD=5
API_WEB3_JSON_RPC_GAS_PRICE_SCALE_FACTOR=1.2
API_WEB3_JSON_RPC_REQUEST_TIMEOUT=10
Expand All @@ -129,14 +124,11 @@ mod tests {
API_WEB3_JSON_RPC_FACTORY_DEPS_CACHE_SIZE_MB=128
API_WEB3_JSON_RPC_INITIAL_WRITES_CACHE_SIZE_MB=32
API_WEB3_JSON_RPC_LATEST_VALUES_CACHE_SIZE_MB=256
API_WEB3_JSON_RPC_HTTP_THREADS=128
API_WEB3_JSON_RPC_WS_THREADS=256
API_WEB3_JSON_RPC_FEE_HISTORY_LIMIT=100
API_WEB3_JSON_RPC_MAX_BATCH_REQUEST_SIZE=200
API_WEB3_JSON_RPC_WEBSOCKET_REQUESTS_PER_MINUTE_LIMIT=10
API_CONTRACT_VERIFICATION_PORT="3070"
API_CONTRACT_VERIFICATION_URL="http://127.0.0.1:3070"
API_CONTRACT_VERIFICATION_THREADS_PER_SERVER=128
API_WEB3_JSON_RPC_MAX_RESPONSE_BODY_SIZE_MB=10
API_PROMETHEUS_LISTENER_PORT="3312"
API_PROMETHEUS_PUSHGATEWAY_URL="http://127.0.0.1:9091"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod api_decl;
mod api_impl;
mod metrics;

fn start_server(api: RestApi, bind_to: SocketAddr, threads: usize) -> Server {
fn start_server(api: RestApi, bind_to: SocketAddr) -> Server {
HttpServer::new(move || {
let api = api.clone();
App::new()
Expand All @@ -32,7 +32,6 @@ fn start_server(api: RestApi, bind_to: SocketAddr, threads: usize) -> Server {
web::get().to(|| async { HttpResponse::Ok().finish() }),
)
})
.workers(threads)
.bind(bind_to)
.unwrap()
.shutdown_timeout(60)
Expand All @@ -58,10 +57,9 @@ pub fn start_server_thread_detached(

actix_rt::System::new().block_on(async move {
let bind_address = api_config.bind_addr();
let threads = api_config.threads_per_server as usize;
let api = RestApi::new(master_connection_pool, replica_connection_pool);

let server = start_server(api, bind_address, threads);
let server = start_server(api, bind_address);
let close_handle = server.handle();
actix_rt::spawn(async move {
if stop_receiver.changed().await.is_ok() {
Expand Down
54 changes: 16 additions & 38 deletions core/lib/zksync_core/src/api_server/web3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ struct FullApiParams {
transport: ApiTransport,
tx_sender: TxSender,
vm_barrier: VmConcurrencyBarrier,
threads: usize,
polling_interval: Duration,
namespaces: Vec<Namespace>,
optional: OptionalApiParams,
Expand All @@ -141,7 +140,6 @@ pub struct ApiBuilder {
transport: Option<ApiTransport>,
tx_sender: Option<TxSender>,
vm_barrier: Option<VmConcurrencyBarrier>,
threads: Option<usize>,
// Optional params that may or may not be set using builder methods. We treat `namespaces`
// specially because we want to output a warning if they are not set.
namespaces: Option<Vec<Namespace>>,
Expand All @@ -160,7 +158,6 @@ impl ApiBuilder {
transport: None,
tx_sender: None,
vm_barrier: None,
threads: None,
namespaces: None,
optional: OptionalApiParams::default(),
}
Expand Down Expand Up @@ -224,11 +221,6 @@ impl ApiBuilder {
self
}

pub fn with_threads(mut self, threads: usize) -> Self {
self.threads = Some(threads);
self
}

pub fn with_polling_interval(mut self, polling_interval: Duration) -> Self {
self.polling_interval = polling_interval;
self
Expand Down Expand Up @@ -258,7 +250,6 @@ impl ApiBuilder {
transport: self.transport.context("API transport not set")?,
tx_sender: self.tx_sender.context("Transaction sender not set")?,
vm_barrier: self.vm_barrier.context("VM barrier not set")?,
threads: self.threads.context("Number of server threads not set")?,
polling_interval: self.polling_interval,
namespaces: self.namespaces.unwrap_or_else(|| {
tracing::warn!(
Expand Down Expand Up @@ -399,9 +390,9 @@ impl FullApiParams {
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<ApiServerHandles> {
let transport = self.transport;
let (runtime_thread_name, health_check_name) = match transport {
ApiTransport::Http(_) => ("jsonrpsee-http-worker", "http_api"),
ApiTransport::WebSocket(_) => ("jsonrpsee-ws-worker", "ws_api"),
let health_check_name = match transport {
ApiTransport::Http(_) => "http_api",
ApiTransport::WebSocket(_) => "ws_api",
};
let (health_check, health_updater) = ReactiveHealthCheck::new(health_check_name);
let vm_barrier = self.vm_barrier.clone();
Expand All @@ -419,21 +410,12 @@ impl FullApiParams {
let websocket_requests_per_minute_limit = self.optional.websocket_requests_per_minute_limit;
let subscriptions_limit = self.optional.subscriptions_limit;

let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name(runtime_thread_name)
.worker_threads(self.threads)
.build()
.with_context(|| {
format!("Failed creating Tokio runtime for {health_check_name} jsonrpsee server")
})?;

let mut tasks = vec![];
let mut pubsub = None;
if matches!(transport, ApiTransport::WebSocket(_))
&& self.namespaces.contains(&Namespace::Pubsub)
{
let mut pub_sub = EthSubscribe::new(runtime.handle().clone());
let mut pub_sub = EthSubscribe::new();
if let Some(sender) = &self.optional.pub_sub_events_sender {
pub_sub.set_events_sender(sender.clone());
}
Expand All @@ -449,22 +431,18 @@ impl FullApiParams {
let rpc = self.build_rpc_module(pubsub).await;
// Start the server in a separate tokio runtime from a dedicated thread.
let (local_addr_sender, local_addr) = oneshot::channel();
let server_task = tokio::task::spawn_blocking(move || {
let res = runtime.block_on(Self::run_jsonrpsee_server(
rpc,
transport,
stop_receiver,
local_addr_sender,
health_updater,
vm_barrier,
batch_request_config,
response_body_size_limit,
subscriptions_limit,
websocket_requests_per_minute_limit,
));
runtime.shutdown_timeout(GRACEFUL_SHUTDOWN_TIMEOUT);
res
});
let server_task = tokio::spawn(Self::run_jsonrpsee_server(
rpc,
transport,
stop_receiver,
local_addr_sender,
health_updater,
vm_barrier,
batch_request_config,
response_body_size_limit,
subscriptions_limit,
websocket_requests_per_minute_limit,
));

let local_addr = match local_addr.await {
Ok(addr) => addr,
Expand Down
22 changes: 8 additions & 14 deletions core/lib/zksync_core/src/api_server/web3/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,10 @@ pub(super) struct EthSubscribe {
transactions: broadcast::Sender<Vec<PubSubResult>>,
logs: broadcast::Sender<Vec<PubSubResult>>,
events_sender: Option<mpsc::UnboundedSender<PubSubEvent>>,
handle: tokio::runtime::Handle,
}

impl EthSubscribe {
pub fn new(handle: tokio::runtime::Handle) -> Self {
pub fn new() -> Self {
let (blocks, _) = broadcast::channel(BROADCAST_CHANNEL_CAPACITY);
let (transactions, _) = broadcast::channel(BROADCAST_CHANNEL_CAPACITY);
let (logs, _) = broadcast::channel(BROADCAST_CHANNEL_CAPACITY);
Expand All @@ -214,7 +213,6 @@ impl EthSubscribe {
transactions,
logs,
events_sender: None,
handle,
}
}

Expand Down Expand Up @@ -323,7 +321,7 @@ impl EthSubscribe {
return;
};
let blocks_rx = self.blocks.subscribe();
self.handle.spawn(Self::run_subscriber(
tokio::spawn(Self::run_subscriber(
sink,
SubscriptionType::Blocks,
blocks_rx,
Expand All @@ -337,7 +335,7 @@ impl EthSubscribe {
return;
};
let transactions_rx = self.transactions.subscribe();
self.handle.spawn(Self::run_subscriber(
tokio::spawn(Self::run_subscriber(
sink,
SubscriptionType::Txs,
transactions_rx,
Expand All @@ -357,7 +355,7 @@ impl EthSubscribe {
return;
};
let logs_rx = self.logs.subscribe();
self.handle.spawn(Self::run_subscriber(
tokio::spawn(Self::run_subscriber(
sink,
SubscriptionType::Logs,
logs_rx,
Expand All @@ -371,7 +369,7 @@ impl EthSubscribe {
return;
};

self.handle.spawn(async move {
tokio::spawn(async move {
sink.send_timeout(
SubscriptionMessage::from_json(&PubSubResult::Syncing(false)).unwrap(),
SUBSCRIPTION_SINK_SEND_TIMEOUT,
Expand Down Expand Up @@ -408,9 +406,7 @@ impl EthSubscribe {
polling_interval,
events_sender: self.events_sender.clone(),
};
let notifier_task = self
.handle
.spawn(notifier.notify_blocks(stop_receiver.clone()));
let notifier_task = tokio::spawn(notifier.notify_blocks(stop_receiver.clone()));
notifier_tasks.push(notifier_task);

let notifier = PubSubNotifier {
Expand All @@ -419,9 +415,7 @@ impl EthSubscribe {
polling_interval,
events_sender: self.events_sender.clone(),
};
let notifier_task = self
.handle
.spawn(notifier.notify_txs(stop_receiver.clone()));
let notifier_task = tokio::spawn(notifier.notify_txs(stop_receiver.clone()));
notifier_tasks.push(notifier_task);

let notifier = PubSubNotifier {
Expand All @@ -430,7 +424,7 @@ impl EthSubscribe {
polling_interval,
events_sender: self.events_sender.clone(),
};
let notifier_task = self.handle.spawn(notifier.notify_logs(stop_receiver));
let notifier_task = tokio::spawn(notifier.notify_logs(stop_receiver));

notifier_tasks.push(notifier_task);
notifier_tasks
Expand Down
1 change: 0 additions & 1 deletion core/lib/zksync_core/src/api_server/web3/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ async fn spawn_server(
}
};
let server_handles = server_builder
.with_threads(1)
.with_polling_interval(POLL_INTERVAL)
.with_tx_sender(tx_sender, vm_barrier)
.with_pub_sub_events(pub_sub_events_sender)
Expand Down
Loading

0 comments on commit b8b190f

Please sign in to comment.