Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eth-sender): handle transactions for different operators separately to increase throughtput #2341

Merged
merged 10 commits into from
Jun 28, 2024
Merged

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 17 additions & 6 deletions core/lib/dal/src/eth_sender_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ pub struct EthSenderDal<'a, 'c> {
}

impl EthSenderDal<'_, '_> {
pub async fn get_inflight_txs(&mut self) -> sqlx::Result<Vec<EthTx>> {
pub async fn get_inflight_txs(
&mut self,
operator_address: Option<Address>,
) -> sqlx::Result<Vec<EthTx>> {
let txs = sqlx::query_as!(
StorageEthTx,
r#"
Expand All @@ -31,7 +34,8 @@ impl EthSenderDal<'_, '_> {
FROM
eth_txs
WHERE
confirmed_eth_tx_history_id IS NULL
from_addr IS NOT DISTINCT FROM $1 -- can't just use equality as NULL != NULL
AND confirmed_eth_tx_history_id IS NULL
AND id <= (
SELECT
COALESCE(MAX(eth_tx_id), 0)
Expand All @@ -42,7 +46,8 @@ impl EthSenderDal<'_, '_> {
)
ORDER BY
id
"#
"#,
operator_address.as_ref().map(|h160| h160.as_bytes()),
)
.fetch_all(self.storage.conn())
.await?;
Expand Down Expand Up @@ -121,7 +126,11 @@ impl EthSenderDal<'_, '_> {
.map(Into::into))
}

pub async fn get_new_eth_txs(&mut self, limit: u64) -> sqlx::Result<Vec<EthTx>> {
pub async fn get_new_eth_txs(
&mut self,
limit: u64,
operator_address: &Option<Address>,
) -> sqlx::Result<Vec<EthTx>> {
let txs = sqlx::query_as!(
StorageEthTx,
r#"
Expand All @@ -130,7 +139,8 @@ impl EthSenderDal<'_, '_> {
FROM
eth_txs
WHERE
id > (
from_addr IS NOT DISTINCT FROM $2 -- can't just use equality as NULL != NULL
AND id > (
SELECT
COALESCE(MAX(eth_tx_id), 0)
FROM
Expand All @@ -141,7 +151,8 @@ impl EthSenderDal<'_, '_> {
LIMIT
$1
"#,
limit as i64
limit as i64,
operator_address.as_ref().map(|h160| h160.as_bytes()),
)
.fetch_all(self.storage.conn())
.await?;
Expand Down
41 changes: 15 additions & 26 deletions core/node/eth_sender/src/abstract_l1_interface.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::fmt;

use async_trait::async_trait;
use vise::{EncodeLabelSet, EncodeLabelValue};
use zksync_eth_client::{
clients::{DynClient, L1},
BoundEthInterface, EnrichedClientResult, EthInterface, ExecutedTxStatus, FailureInfo, Options,
Expand Down Expand Up @@ -32,6 +33,13 @@ pub(crate) struct L1BlockNumbers {
pub latest: L1BlockNumber,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet, EncodeLabelValue)]
#[metrics(label = "type", rename_all = "snake_case")]
pub(crate) enum OperatorType {
NonBlob,
Blob,
}

#[async_trait]
pub(super) trait AbstractL1Interface: 'static + Sync + Send + fmt::Debug {
async fn failure_reason(&self, tx_hash: H256) -> Option<FailureInfo>;
Expand All @@ -51,11 +59,7 @@ pub(super) trait AbstractL1Interface: 'static + Sync + Send + fmt::Debug {
async fn get_operator_nonce(
&self,
block_numbers: L1BlockNumbers,
) -> Result<OperatorNonce, EthSenderError>;

async fn get_blobs_operator_nonce(
&self,
block_numbers: L1BlockNumbers,
operator_type: OperatorType,
) -> Result<Option<OperatorNonce>, EthSenderError>;

async fn sign_tx(
Expand Down Expand Up @@ -122,28 +126,13 @@ impl AbstractL1Interface for RealL1Interface {
async fn get_operator_nonce(
&self,
block_numbers: L1BlockNumbers,
) -> Result<OperatorNonce, EthSenderError> {
let finalized = self
.ethereum_gateway()
.nonce_at(block_numbers.finalized.0.into())
.await?
.as_u32()
.into();

let latest = self
.ethereum_gateway()
.nonce_at(block_numbers.latest.0.into())
.await?
.as_u32()
.into();
Ok(OperatorNonce { finalized, latest })
}

async fn get_blobs_operator_nonce(
&self,
block_numbers: L1BlockNumbers,
operator_type: OperatorType,
) -> Result<Option<OperatorNonce>, EthSenderError> {
match &self.ethereum_gateway_blobs() {
let gateway = match operator_type {
OperatorType::NonBlob => Some(self.ethereum_gateway()),
OperatorType::Blob => self.ethereum_gateway_blobs(),
};
match gateway {
None => Ok(None),
Some(gateway) => {
let finalized = gateway
Expand Down
Loading
Loading