Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Implement TxSink abstraction #1204

Merged
merged 3 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion checks-config/era.dic
Original file line number Diff line number Diff line change
Expand Up @@ -902,4 +902,6 @@ reimplementation
composability
md5
shivini
balancer
balancer
lookups
stateful
31 changes: 24 additions & 7 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use zksync_core::{
api_server::{
execution_sandbox::VmConcurrencyLimiter,
healthcheck::HealthCheckHandle,
tx_sender::{ApiContracts, TxSenderBuilder},
tx_sender::{proxy::TxProxy, ApiContracts, TxSenderBuilder},
web3::{ApiBuilder, Namespace},
},
block_reverter::{BlockReverter, BlockReverterFlags, L1ExecutedBatchesRevert},
Expand Down Expand Up @@ -275,11 +275,22 @@ async fn init_tasks(
let fee_params_fetcher_handle =
tokio::spawn(fee_params_fetcher.clone().run(stop_receiver.clone()));

let (tx_sender, vm_barrier, cache_update_handle) = {
let tx_sender_builder =
TxSenderBuilder::new(config.clone().into(), connection_pool.clone())
.with_main_connection_pool(connection_pool.clone())
.with_tx_proxy(main_node_client);
let (tx_sender, vm_barrier, cache_update_handle, proxy_cache_updater_handle) = {
let tx_proxy = TxProxy::new(main_node_client);
let proxy_cache_updater_pool = singleton_pool_builder
.build()
.await
.context("failed to build a tree_pool")?;
let proxy_cache_updater_handle = tokio::spawn(
tx_proxy
.run_account_nonce_sweeper(proxy_cache_updater_pool.clone(), stop_receiver.clone()),
);

let tx_sender_builder = TxSenderBuilder::new(
config.clone().into(),
connection_pool.clone(),
Arc::new(tx_proxy),
);

if config.optional.transactions_per_sec_limit.is_some() {
tracing::warn!("`transactions_per_sec_limit` option is deprecated and ignored");
Expand Down Expand Up @@ -308,7 +319,12 @@ async fn init_tasks(
storage_caches,
)
.await;
(tx_sender, vm_barrier, cache_update_handle)
(
tx_sender,
vm_barrier,
cache_update_handle,
proxy_cache_updater_handle,
)
};

let http_server_handles =
Expand Down Expand Up @@ -359,6 +375,7 @@ async fn init_tasks(
task_handles.extend(http_server_handles.tasks);
task_handles.extend(ws_server_handles.tasks);
task_handles.extend(cache_update_handle);
task_handles.push(proxy_cache_updater_handle);
task_handles.extend([
sk_handle,
fee_address_migration_handle,
Expand Down
37 changes: 37 additions & 0 deletions core/lib/zksync_core/src/api_server/tx_sender/master_pool_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use zksync_dal::{transactions_dal::L2TxSubmissionResult, ConnectionPool};
use zksync_types::{fee::TransactionExecutionMetrics, l2::L2Tx};

use super::{tx_sink::TxSink, SubmitTxError};
use crate::metrics::{TxStage, APP_METRICS};

/// Wrapper for the master DB pool that allows to submit transactions to the mempool.
#[derive(Debug)]
pub struct MasterPoolSink {
master_pool: ConnectionPool,
}

impl MasterPoolSink {
pub fn new(master_pool: ConnectionPool) -> Self {
Self { master_pool }
}
}

#[async_trait::async_trait]
impl TxSink for MasterPoolSink {
async fn submit_tx(
&self,
tx: L2Tx,
execution_metrics: TransactionExecutionMetrics,
) -> Result<L2TxSubmissionResult, SubmitTxError> {
let submission_res_handle = self
.master_pool
.access_storage_tagged("api")
.await?
.transactions_dal()
.insert_transaction_l2(tx, execution_metrics)
.await;

APP_METRICS.processed_txs[&TxStage::Mempool(submission_res_handle)].inc();
Ok(submission_res_handle)
}
}
85 changes: 24 additions & 61 deletions core/lib/zksync_core/src/api_server/tx_sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use zksync_types::{
MAX_NEW_FACTORY_DEPS, U256,
};
use zksync_utils::h256_to_u256;
use zksync_web3_decl::jsonrpsee::http_client::HttpClient;

pub(super) use self::{proxy::TxProxy, result::SubmitTxError};
pub(super) use self::result::SubmitTxError;
use self::tx_sink::TxSink;
use crate::{
api_server::{
execution_sandbox::{
Expand All @@ -38,15 +38,16 @@ use crate::{
tx_sender::result::ApiCallResult,
},
fee_model::BatchFeeModelInputProvider,
metrics::{TxStage, APP_METRICS},
state_keeper::seal_criteria::{ConditionalSealer, NoopSealer, SealData},
utils::pending_protocol_version,
};

mod proxy;
pub mod master_pool_sink;
pub mod proxy;
mod result;
#[cfg(test)]
pub(crate) mod tests;
pub mod tx_sink;

#[derive(Debug, Clone)]
pub struct MultiVMBaseSystemContracts {
Expand Down Expand Up @@ -141,21 +142,22 @@ pub struct TxSenderBuilder {
config: TxSenderConfig,
/// Connection pool for read requests.
replica_connection_pool: ConnectionPool,
/// Connection pool for write requests. If not set, `proxy` must be set.
master_connection_pool: Option<ConnectionPool>,
/// Proxy to submit transactions to the network. If not set, `master_connection_pool` must be set.
proxy: Option<TxProxy>,
/// Sink to be used to persist transactions.
tx_sink: Arc<dyn TxSink>,
/// Batch sealer used to check whether transaction can be executed by the sequencer.
sealer: Option<Arc<dyn ConditionalSealer>>,
}

impl TxSenderBuilder {
pub fn new(config: TxSenderConfig, replica_connection_pool: ConnectionPool) -> Self {
pub fn new(
config: TxSenderConfig,
replica_connection_pool: ConnectionPool,
tx_sink: Arc<dyn TxSink>,
) -> Self {
Self {
config,
replica_connection_pool,
master_connection_pool: None,
proxy: None,
tx_sink,
sealer: None,
}
}
Expand All @@ -165,38 +167,22 @@ impl TxSenderBuilder {
self
}

pub fn with_tx_proxy(mut self, client: HttpClient) -> Self {
self.proxy = Some(TxProxy::new(client));
self
}

pub fn with_main_connection_pool(mut self, master_connection_pool: ConnectionPool) -> Self {
self.master_connection_pool = Some(master_connection_pool);
self
}

pub async fn build(
self,
batch_fee_input_provider: Arc<dyn BatchFeeModelInputProvider>,
vm_concurrency_limiter: Arc<VmConcurrencyLimiter>,
api_contracts: ApiContracts,
storage_caches: PostgresStorageCaches,
) -> TxSender {
assert!(
self.master_connection_pool.is_some() || self.proxy.is_some(),
"Either master connection pool or proxy must be set"
);

// Use noop sealer if no sealer was explicitly provided.
let sealer = self.sealer.unwrap_or_else(|| Arc::new(NoopSealer));

TxSender(Arc::new(TxSenderInner {
sender_config: self.config,
master_connection_pool: self.master_connection_pool,
tx_sink: self.tx_sink,
replica_connection_pool: self.replica_connection_pool,
batch_fee_input_provider,
api_contracts,
proxy: self.proxy,
vm_concurrency_limiter,
storage_caches,
sealer,
Expand Down Expand Up @@ -244,13 +230,12 @@ impl TxSenderConfig {

pub struct TxSenderInner {
pub(super) sender_config: TxSenderConfig,
pub master_connection_pool: Option<ConnectionPool>,
/// Sink to be used to persist transactions.
pub tx_sink: Arc<dyn TxSink>,
pub replica_connection_pool: ConnectionPool,
// Used to keep track of gas prices for the fee ticker.
pub batch_fee_input_provider: Arc<dyn BatchFeeModelInputProvider>,
pub(super) api_contracts: ApiContracts,
/// Optional transaction proxy to be used for transaction submission.
pub(super) proxy: Option<TxProxy>,
/// Used to limit the amount of VMs that can be executed simultaneously.
pub(super) vm_concurrency_limiter: Arc<VmConcurrencyLimiter>,
// Caches used in VM execution.
Expand Down Expand Up @@ -348,41 +333,14 @@ impl TxSender {
let stage_started_at = Instant::now();
self.ensure_tx_executable(tx.clone().into(), &execution_output.metrics, true)?;

if let Some(proxy) = &self.0.proxy {
// We're running an external node: we have to proxy the transaction to the main node.
// But before we do that, save the tx to cache in case someone will request it
// Before it reaches the main node.
proxy.save_tx(tx.clone()).await;
proxy.submit_tx(&tx).await?;
// Now, after we are sure that the tx is on the main node, remove it from cache
// since we don't want to store txs that might have been replaced or otherwise removed
// from the mempool.
proxy.forget_tx(tx.hash()).await;
SANDBOX_METRICS.submit_tx[&SubmitTxStage::TxProxy].observe(stage_started_at.elapsed());
APP_METRICS.processed_txs[&TxStage::Proxied].inc();
return Ok(L2TxSubmissionResult::Proxied);
} else {
assert!(
self.0.master_connection_pool.is_some(),
"TxSender is instantiated without both master connection pool and tx proxy"
);
}

let nonce = tx.common_data.nonce.0;
let hash = tx.hash();
let initiator_account = tx.initiator_account();
let submission_res_handle = self
.0
.master_connection_pool
.as_ref()
.unwrap() // Checked above
.access_storage_tagged("api")
.await?
.transactions_dal()
.insert_transaction_l2(tx, execution_output.metrics)
.await;

APP_METRICS.processed_txs[&TxStage::Mempool(submission_res_handle)].inc();
.tx_sink
.submit_tx(tx, execution_output.metrics)
.await?;

match submission_res_handle {
L2TxSubmissionResult::AlreadyExecuted => {
Expand All @@ -399,6 +357,11 @@ impl TxSender {
))
}
L2TxSubmissionResult::Duplicate => Err(SubmitTxError::IncorrectTx(TxDuplication(hash))),
L2TxSubmissionResult::Proxied => {
SANDBOX_METRICS.submit_tx[&SubmitTxStage::TxProxy]
.observe(stage_started_at.elapsed());
Ok(submission_res_handle)
}
_ => {
SANDBOX_METRICS.submit_tx[&SubmitTxStage::DbInsert]
.observe(stage_started_at.elapsed());
Expand Down
Loading
Loading