Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(katana): pending block provider #2763

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/katana/core/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::utils::get_current_timestamp;
pub(crate) const LOG_TARGET: &str = "katana::core::backend";

#[derive(Debug)]
pub struct Backend<EF: ExecutorFactory> {
pub struct Backend<EF> {
pub chain_spec: Arc<ChainSpec>,
/// stores all block related data in memory
pub blockchain: Blockchain,
Expand Down
261 changes: 260 additions & 1 deletion crates/katana/core/src/service/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,23 @@ use katana_executor::{BlockExecutor, ExecutionResult, ExecutionStats, ExecutorFa
use katana_pool::validation::stateful::TxValidator;
use katana_primitives::block::{BlockHashOrNumber, ExecutableBlock, PartialHeader};
use katana_primitives::da::L1DataAvailabilityMode;
use katana_primitives::env::BlockEnv;
use katana_primitives::receipt::Receipt;
use katana_primitives::trace::TxExecInfo;
use katana_primitives::transaction::{ExecutableTxWithHash, TxHash, TxWithHash};
use katana_provider::error::ProviderError;
use katana_provider::traits::block::{BlockHashProvider, BlockNumberProvider};
use katana_provider::traits::env::BlockEnvProvider;
use katana_provider::traits::state::StateFactoryProvider;
use katana_provider::traits::pending::PendingBlockProvider;
use katana_provider::traits::state::{StateFactoryProvider, StateProvider};
use katana_provider::traits::transaction::{
ReceiptProvider, TransactionProvider, TransactionTraceProvider,
};
use katana_provider::ProviderResult;
use katana_tasks::{BlockingTaskPool, BlockingTaskResult};
use parking_lot::lock_api::RawMutex;
use parking_lot::{Mutex, RwLock};
use starknet::core::types::{TransactionExecutionStatus, TransactionStatus};
use tokio::time::{interval_at, Instant, Interval};
use tracing::{error, info, trace, warn};

Expand Down Expand Up @@ -699,3 +706,255 @@ impl<EF: ExecutorFactory> Stream for InstantBlockProducer<EF> {
Poll::Pending
}
}

impl<EF: ExecutorFactory> PendingBlockProvider for BlockProducer<EF> {
fn pending_block(&self) -> ProviderResult<()> {
match &*self.producer.read() {
BlockProducerMode::Instant(bp) => bp.pending_block(),
BlockProducerMode::Interval(bp) => bp.pending_block(),
}
}

fn pending_transaction(&self, hash: TxHash) -> ProviderResult<Option<TxWithHash>> {
match &*self.producer.read() {
BlockProducerMode::Instant(bp) => bp.pending_transaction(hash),
BlockProducerMode::Interval(bp) => bp.pending_transaction(hash),
}
}

fn pending_receipt(&self, hash: TxHash) -> ProviderResult<Option<Receipt>> {
match &*self.producer.read() {
BlockProducerMode::Instant(bp) => bp.pending_receipt(hash),
BlockProducerMode::Interval(bp) => bp.pending_receipt(hash),
}
}

fn pending_block_env(&self) -> ProviderResult<BlockEnv> {
match &*self.producer.read() {
BlockProducerMode::Instant(bp) => bp.pending_block_env(),
BlockProducerMode::Interval(bp) => bp.pending_block_env(),
}
}

fn pending_state(&self) -> ProviderResult<Box<dyn StateProvider>> {
match &*self.producer.read() {
BlockProducerMode::Instant(bp) => bp.pending_state(),
BlockProducerMode::Interval(bp) => bp.pending_state(),
}
}

fn pending_transaction_status(
&self,
hash: TxHash,
) -> ProviderResult<Option<TransactionStatus>> {
match &*self.producer.read() {
BlockProducerMode::Instant(bp) => bp.pending_transaction_status(hash),
BlockProducerMode::Interval(bp) => bp.pending_transaction_status(hash),
}
}

fn pending_transactions(&self) -> ProviderResult<Vec<TxWithHash>> {
match &*self.producer.read() {
BlockProducerMode::Instant(bp) => bp.pending_transactions(),
BlockProducerMode::Interval(bp) => bp.pending_transactions(),
}
}

fn pending_receipts(&self) -> ProviderResult<Vec<Receipt>> {
match &*self.producer.read() {
BlockProducerMode::Instant(bp) => bp.pending_receipts(),
BlockProducerMode::Interval(bp) => bp.pending_receipts(),
}
}

fn pending_transaction_trace(&self, hash: TxHash) -> ProviderResult<Option<TxExecInfo>> {
match &*self.producer.read() {
BlockProducerMode::Instant(bp) => bp.pending_transaction_trace(hash),
BlockProducerMode::Interval(bp) => bp.pending_transaction_trace(hash),
}
}

fn pending_transaction_traces(&self) -> ProviderResult<Vec<TxExecInfo>> {
match &*self.producer.read() {
BlockProducerMode::Instant(bp) => bp.pending_transaction_traces(),
BlockProducerMode::Interval(bp) => bp.pending_transaction_traces(),
}
}
}

impl<EF: ExecutorFactory> PendingBlockProvider for InstantBlockProducer<EF> {
fn pending_transaction(&self, hash: TxHash) -> ProviderResult<Option<TxWithHash>> {
self.backend.blockchain.provider().transaction_by_hash(hash)
}

fn pending_receipt(&self, hash: TxHash) -> ProviderResult<Option<Receipt>> {
self.backend.blockchain.provider().receipt_by_hash(hash)
}

fn pending_block_env(&self) -> ProviderResult<BlockEnv> {
// In instant mining mode, we don't have a notion of 'pending' block as a block is mined
// instantly when a valid transaction is submitted. So we need to mimic how a new
// block env is created in interval mining mode.
//
// It is important that we update the block env here especially for the block timestamp.
// Because if a fee estimate is requested using the pending block in instant mode,
// and the timestamp is NOT UPDATED, it will be estimated using the block timestamp
// of the last mined block. This is not ideal as the fee estimate should be based on
// the block timestamp of the block that the transaction may

let num = self.backend.blockchain.provider().latest_number()?;
let env = self.backend.blockchain.provider().block_env_at(num.into())?;

let mut env = env.expect("latest block env must exist");
self.backend.update_block_env(&mut env);

Ok(env)
}

fn pending_state(&self) -> ProviderResult<Box<dyn StateProvider>> {
self.backend.blockchain.provider().latest()
}

fn pending_block(&self) -> ProviderResult<()> {
todo!()
}
Comment on lines +818 to +820
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! The pending_block method is unimplemented and needs attention.

In both InstantBlockProducer and IntervalBlockProducer, the pending_block method contains a todo!() macro. This indicates that the method is unimplemented. Please provide a suitable implementation to fulfill the PendingBlockProvider trait requirements.

Also applies to: 887-889


fn pending_transaction_status(
&self,
hash: TxHash,
) -> ProviderResult<Option<TransactionStatus>> {
if let Some(receipt) = dbg!(self.backend.blockchain.provider().receipt_by_hash(hash)?) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo! Remove debug print

The dbg! macro should be removed from production code.

Apply this diff:

-            if let Some(receipt) = dbg!(self.backend.blockchain.provider().receipt_by_hash(hash)?) {
+            if let Some(receipt) = self.backend.blockchain.provider().receipt_by_hash(hash)? {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if let Some(receipt) = dbg!(self.backend.blockchain.provider().receipt_by_hash(hash)?) {
if let Some(receipt) = self.backend.blockchain.provider().receipt_by_hash(hash)? {

if receipt.is_reverted() {
Ok(Some(TransactionStatus::AcceptedOnL2(TransactionExecutionStatus::Reverted)))
} else {
Ok(Some(TransactionStatus::AcceptedOnL2(TransactionExecutionStatus::Succeeded)))
}
} else {
Ok(None)
}
}

fn pending_transactions(&self) -> ProviderResult<Vec<TxWithHash>> {
Ok(Vec::new())
}

fn pending_receipts(&self) -> ProviderResult<Vec<Receipt>> {
Ok(Vec::new())
}

fn pending_transaction_trace(&self, hash: TxHash) -> ProviderResult<Option<TxExecInfo>> {
self.backend.blockchain.provider().transaction_execution(hash)
}

fn pending_transaction_traces(&self) -> ProviderResult<Vec<TxExecInfo>> {
let number = self.backend.blockchain.provider().latest_number()?;
let traces =
self.backend.blockchain.provider().transaction_executions_by_block(number.into())?;
Ok(traces.expect("traces for latest block must exist"))
}
}

impl<EF: ExecutorFactory> PendingBlockProvider for IntervalBlockProducer<EF> {
fn pending_transaction(&self, hash: TxHash) -> ProviderResult<Option<TxWithHash>> {
let transaction = self
.executor()
.read()
.transactions()
.iter()
.find_map(|(tx, _)| if tx.hash == hash { Some(tx.clone()) } else { None });

Ok(transaction)
}

fn pending_receipt(&self, hash: TxHash) -> ProviderResult<Option<Receipt>> {
let receipt = self
.executor()
.read()
.transactions()
.iter()
.find_map(|(tx, res)| if tx.hash == hash { res.receipt().cloned() } else { None });

Ok(receipt)
}

fn pending_transactions(&self) -> ProviderResult<Vec<TxWithHash>> {
Ok(self.executor().read().transactions().iter().map(|(tx, _)| tx.clone()).collect())
}

fn pending_state(&self) -> ProviderResult<Box<dyn StateProvider>> {
Ok(self.executor().read().state())
}

fn pending_block_env(&self) -> ProviderResult<BlockEnv> {
Ok(self.executor().read().block_env())
}

fn pending_block(&self) -> ProviderResult<()> {
todo!()
}

fn pending_receipts(&self) -> ProviderResult<Vec<Receipt>> {
let receipts = self.executor().read().transactions().iter().fold(
Vec::new(),
|mut acc, (_, result)| {
if let Some(receipt) = result.receipt() {
acc.push(receipt.clone());
}
acc
},
);

Ok(receipts)
}

fn pending_transaction_status(
&self,
hash: TxHash,
) -> ProviderResult<Option<TransactionStatus>> {
if let Some((_, res)) =
self.executor().read().transactions().iter().find(|(tx, _)| tx.hash == hash)
{
// TODO: should impl From<ExecutionResult> for TransactionStatus
let status = match res {
ExecutionResult::Failed { .. } => TransactionStatus::Rejected,
ExecutionResult::Success { receipt, .. } => {
if receipt.is_reverted() {
TransactionStatus::AcceptedOnL2(TransactionExecutionStatus::Reverted)
} else {
TransactionStatus::AcceptedOnL2(TransactionExecutionStatus::Succeeded)
}
}
};

Ok(Some(status))
} else {
Ok(None)
}
}

fn pending_transaction_trace(&self, hash: TxHash) -> ProviderResult<Option<TxExecInfo>> {
let result = self
.executor()
.read()
.transactions()
.iter()
.find(|(t, _)| t.hash == hash)
.map(|(_, res)| res)
.and_then(|res| res.trace())
.cloned();

Ok(result)
}

fn pending_transaction_traces(&self) -> ProviderResult<Vec<TxExecInfo>> {
let traces = self
.executor()
.read()
.transactions()
.iter()
.filter_map(|(_, r)| r.trace().cloned())
.collect::<Vec<_>>();

Ok(traces)
}
}
2 changes: 1 addition & 1 deletion crates/katana/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ pub async fn spawn<EF: ExecutorFactory>(
cfg,
)
} else {
StarknetApi::new(backend.clone(), pool.clone(), Some(block_producer.clone()), cfg)
StarknetApi::new(backend.clone(), pool.clone(), block_producer.clone(), cfg)
};

methods.merge(StarknetApiServer::into_rpc(server.clone()))?;
Expand Down
Loading
Loading