Skip to content
This repository has been archived by the owner on Aug 28, 2024. It is now read-only.

Commit

Permalink
fix(api): Fix transaction methods for pruned transactions (matter-lab…
Browse files Browse the repository at this point in the history
…s#2168)

## What ❔

Reworks `TxCache` for full nodes to look more like a mempool:

- Retains transactions from it when they are sent to the main node
- Gets rid of querying transactions from the main node in `TxProxy`.

## Why ❔

Right now, two transaction-related methods (`eth_getTransactionByHash`
and `zks_getTransactionDetails`) return data for pruned transactions.
Some other methods (e.g., `eth_getTransactionReceipt`) do not return
data for pruned transactions (i.e., return `null`). This looks
inconsistent; also may be wasteful w.r.t. calls to the main node.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
slowli authored Jun 12, 2024
1 parent 1cb0887 commit 00c4cca
Showing 7 changed files with 681 additions and 119 deletions.
2 changes: 1 addition & 1 deletion core/lib/dal/src/models/storage_transaction.rs
Original file line number Diff line number Diff line change
@@ -397,7 +397,7 @@ impl From<StorageTransactionReceipt> for TransactionReceipt {
}

#[derive(Debug, Clone, sqlx::FromRow)]
pub struct StorageTransactionDetails {
pub(crate) struct StorageTransactionDetails {
pub is_priority: bool,
pub initiator_address: Vec<u8>,
pub gas_limit: Option<BigDecimal>,
76 changes: 75 additions & 1 deletion core/lib/db_connection/src/connection.rs
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ use sqlx::{
use crate::{
connection_pool::ConnectionPool,
error::{DalConnectionError, DalResult},
instrument::InstrumentExt,
metrics::CONNECTION_METRICS,
utils::InternalMarker,
};
@@ -183,6 +184,7 @@ impl<'a, DB: DbMarker> Connection<'a, DB> {
}
}

/// Starts a transaction or a new checkpoint within the current transaction.
pub async fn start_transaction(&mut self) -> DalResult<Connection<'_, DB>> {
let (conn, tags) = self.conn_and_tags();
let inner = ConnectionInner::Transaction {
@@ -198,6 +200,24 @@ impl<'a, DB: DbMarker> Connection<'a, DB> {
})
}

/// Starts building a new transaction with custom settings. Unlike [`Self::start_transaction()`], this method
/// will error if called from a transaction; it is a logical error to change transaction settings in the middle of it.
pub fn transaction_builder(&mut self) -> DalResult<TransactionBuilder<'_, 'a, DB>> {
if let ConnectionInner::Transaction { tags, .. } = &self.inner {
let err = io::Error::new(
io::ErrorKind::Other,
"`Connection::transaction_builder()` can only be invoked outside of a transaction",
);
return Err(
DalConnectionError::start_transaction(sqlx::Error::Io(err), tags.cloned()).into(),
);
}
Ok(TransactionBuilder {
connection: self,
is_readonly: false,
})
}

/// Checks if the `Connection` is currently within database transaction.
pub fn in_transaction(&self) -> bool {
matches!(self.inner, ConnectionInner::Transaction { .. })
@@ -260,9 +280,36 @@ impl<'a, DB: DbMarker> Connection<'a, DB> {
}
}

/// Builder of transactions allowing to configure transaction characteristics (for now, just its readonly status).
#[derive(Debug)]
pub struct TransactionBuilder<'a, 'c, DB: DbMarker> {
connection: &'a mut Connection<'c, DB>,
is_readonly: bool,
}

impl<'a, DB: DbMarker> TransactionBuilder<'a, '_, DB> {
/// Sets the readonly status of the created transaction.
pub fn set_readonly(mut self) -> Self {
self.is_readonly = true;
self
}

/// Builds the transaction with the provided characteristics.
pub async fn build(self) -> DalResult<Connection<'a, DB>> {
let mut transaction = self.connection.start_transaction().await?;
if self.is_readonly {
sqlx::query("SET TRANSACTION READ ONLY")
.instrument("set_transaction_characteristics")
.execute(&mut transaction)
.await?;
}
Ok(transaction)
}
}

#[cfg(test)]
mod tests {
use crate::{connection_pool::ConnectionPool, utils::InternalMarker};
use super::*;

#[tokio::test]
async fn processor_tags_propagate_to_transactions() {
@@ -296,4 +343,31 @@ mod tests {
assert!(traced.is_empty());
}
}

#[tokio::test]
async fn creating_readonly_transaction() {
let pool = ConnectionPool::<InternalMarker>::constrained_test_pool(1).await;
let mut connection = pool.connection().await.unwrap();
let mut readonly_transaction = connection
.transaction_builder()
.unwrap()
.set_readonly()
.build()
.await
.unwrap();
assert!(readonly_transaction.in_transaction());

sqlx::query("SELECT COUNT(*) AS \"count?\" FROM miniblocks")
.instrument("test")
.fetch_optional(&mut readonly_transaction)
.await
.unwrap()
.expect("no row returned");
// Check that it's impossible to execute write statements in the transaction.
sqlx::query("DELETE FROM miniblocks")
.instrument("test")
.execute(&mut readonly_transaction)
.await
.unwrap_err();
}
}
674 changes: 565 additions & 109 deletions core/node/api_server/src/tx_sender/proxy.rs

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions core/node/api_server/src/tx_sender/tx_sink.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use zksync_dal::transactions_dal::L2TxSubmissionResult;
use zksync_dal::{transactions_dal::L2TxSubmissionResult, Connection, Core};
use zksync_types::{
api::{Transaction, TransactionDetails, TransactionId},
fee::TransactionExecutionMetrics,
@@ -42,14 +42,19 @@ pub trait TxSink: std::fmt::Debug + Send + Sync + 'static {

/// Attempts to look up the transaction by its API ID in the sink-specific storage.
/// By default, returns `Ok(None)`.
async fn lookup_tx(&self, _id: TransactionId) -> Result<Option<Transaction>, Web3Error> {
async fn lookup_tx(
&self,
_storage: &mut Connection<'_, Core>,
_id: TransactionId,
) -> Result<Option<Transaction>, Web3Error> {
Ok(None)
}

/// Attempts to look up the transaction details by its hash in the sink-specific storage.
/// By default, returns `Ok(None)`.
async fn lookup_tx_details(
&self,
_storage: &mut Connection<'_, Core>,
_hash: H256,
) -> Result<Option<TransactionDetails>, Web3Error> {
Ok(None)
15 changes: 15 additions & 0 deletions core/node/api_server/src/utils.rs
Original file line number Diff line number Diff line change
@@ -6,6 +6,21 @@ use std::{
time::{Duration, Instant},
};

use zksync_dal::{Connection, Core, DalError};
use zksync_web3_decl::error::Web3Error;

/// Opens a readonly transaction over the specified connection.
pub(crate) async fn open_readonly_transaction<'r>(
conn: &'r mut Connection<'_, Core>,
) -> Result<Connection<'r, Core>, Web3Error> {
let builder = conn.transaction_builder().map_err(DalError::generalize)?;
Ok(builder
.set_readonly()
.build()
.await
.map_err(DalError::generalize)?)
}

/// Allows filtering events (e.g., for logging) so that they are reported no more frequently than with a configurable interval.
///
/// Current implementation uses thread-local vars in order to not rely on mutexes or other cross-thread primitives.
10 changes: 7 additions & 3 deletions core/node/api_server/src/web3/namespaces/eth.rs
Original file line number Diff line number Diff line change
@@ -18,8 +18,9 @@ use zksync_web3_decl::{
types::{Address, Block, Filter, FilterChanges, Log, U64},
};

use crate::web3::{
backend_jsonrpsee::MethodTracer, metrics::API_METRICS, state::RpcState, TypedFilter,
use crate::{
utils::open_readonly_transaction,
web3::{backend_jsonrpsee::MethodTracer, metrics::API_METRICS, state::RpcState, TypedFilter},
};

pub const EVENT_TOPIC_NUMBER_LIMIT: usize = 4;
@@ -463,6 +464,9 @@ impl EthNamespace {
id: TransactionId,
) -> Result<Option<Transaction>, Web3Error> {
let mut storage = self.state.acquire_connection().await?;
// Open a readonly transaction to have a consistent view of Postgres
let mut storage = open_readonly_transaction(&mut storage).await?;

let chain_id = self.state.api_config.l2_chain_id;
let mut transaction = match id {
TransactionId::Hash(hash) => storage
@@ -497,7 +501,7 @@ impl EthNamespace {
};

if transaction.is_none() {
transaction = self.state.tx_sink().lookup_tx(id).await?;
transaction = self.state.tx_sink().lookup_tx(&mut storage, id).await?;
}
Ok(transaction)
}
14 changes: 11 additions & 3 deletions core/node/api_server/src/web3/namespaces/zks.rs
Original file line number Diff line number Diff line change
@@ -29,7 +29,10 @@ use zksync_web3_decl::{
types::{Address, Token, H256},
};

use crate::web3::{backend_jsonrpsee::MethodTracer, metrics::API_METRICS, RpcState};
use crate::{
utils::open_readonly_transaction,
web3::{backend_jsonrpsee::MethodTracer, metrics::API_METRICS, RpcState},
};

#[derive(Debug)]
pub(crate) struct ZksNamespace {
@@ -399,15 +402,20 @@ impl ZksNamespace {
hash: H256,
) -> Result<Option<TransactionDetails>, Web3Error> {
let mut storage = self.state.acquire_connection().await?;
// Open a readonly transaction to have a consistent view of Postgres
let mut storage = open_readonly_transaction(&mut storage).await?;
let mut tx_details = storage
.transactions_web3_dal()
.get_transaction_details(hash)
.await
.map_err(DalError::generalize)?;
drop(storage);

if tx_details.is_none() {
tx_details = self.state.tx_sink().lookup_tx_details(hash).await?;
tx_details = self
.state
.tx_sink()
.lookup_tx_details(&mut storage, hash)
.await?;
}
Ok(tx_details)
}

0 comments on commit 00c4cca

Please sign in to comment.