From f7c2f1fc2ba10f23ac6cfcaa926222bc0fec98ac Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Sat, 30 Nov 2024 12:22:59 -0500 Subject: [PATCH 1/2] dont run sequencing task in pipeline lol --- Cargo.lock | 2 +- crates/katana/node/src/lib.rs | 15 ++---- .../katana/pipeline/src/stage/sequencing.rs | 49 +++++++++---------- 3 files changed, 29 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0330436cbb..5673493b1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8419,7 +8419,7 @@ dependencies = [ [[package]] name = "katana-feeder-gateway" -version = "1.0.2" +version = "1.0.3" dependencies = [ "katana-primitives", "katana-rpc-types", diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index e2037730a2..4675e1acbe 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -32,7 +32,7 @@ use katana_core::service::messaging::MessagingConfig; use katana_db::mdbx::DbEnv; use katana_executor::implementation::blockifier::BlockifierFactory; use katana_executor::{ExecutionFlags, ExecutorFactory}; -use katana_pipeline::{stage, Pipeline}; +use katana_pipeline::stage::Sequencing; use katana_pool::ordering::FiFo; use katana_pool::validation::stateful::TxValidator; use katana_pool::TxPool; @@ -126,9 +126,9 @@ impl Node { let block_producer = self.block_producer.clone(); let validator = self.block_producer.validator().clone(); - // --- build sequencing stage + // --- build and run sequencing task - let sequencing = stage::Sequencing::new( + let sequencing = Sequencing::new( pool.clone(), backend.clone(), self.task_manager.task_spawner(), @@ -136,17 +136,12 @@ impl Node { self.messaging_config.clone(), ); - // --- build and start the pipeline - - let mut pipeline = Pipeline::new(); - pipeline.add_stage(Box::new(sequencing)); - self.task_manager .task_spawner() .build_task() .critical() - .name("Pipeline") - .spawn(pipeline.into_future()); + .name("Sequencing") + .spawn(sequencing.into_future()); let node_components = (pool, backend, block_producer, validator, self.forked_client.take()); let rpc = spawn(node_components, self.rpc_config.clone()).await?; diff --git a/crates/katana/pipeline/src/stage/sequencing.rs b/crates/katana/pipeline/src/stage/sequencing.rs index c988ecab57..50b199caf7 100644 --- a/crates/katana/pipeline/src/stage/sequencing.rs +++ b/crates/katana/pipeline/src/stage/sequencing.rs @@ -1,7 +1,8 @@ +use std::future::IntoFuture; use std::sync::Arc; use anyhow::Result; -use futures::future; +use futures::future::{self, BoxFuture}; use katana_core::backend::Backend; use katana_core::service::block_producer::{BlockProducer, BlockProductionError}; use katana_core::service::messaging::{MessagingConfig, MessagingService, MessagingTask}; @@ -11,8 +12,7 @@ use katana_pool::{TransactionPool, TxPool}; use katana_tasks::{TaskHandle, TaskSpawner}; use tracing::error; -use super::{StageId, StageResult}; -use crate::Stage; +pub type SequencingFut = BoxFuture<'static, ()>; /// The sequencing stage is responsible for advancing the chain state. #[allow(missing_debug_implementations)] @@ -61,31 +61,28 @@ impl Sequencing { } } -#[async_trait::async_trait] -impl Stage for Sequencing { - fn id(&self) -> StageId { - StageId::Sequencing - } +impl IntoFuture for Sequencing { + type Output = (); + type IntoFuture = SequencingFut; - #[tracing::instrument(skip(self), name = "Stage", fields(id = %self.id()))] - async fn execute(&mut self) -> StageResult { - // Build the messaging and block production tasks. - let messaging = self.run_messaging().await?; - let block_production = self.run_block_production(); + fn into_future(self) -> Self::IntoFuture { + Box::pin(async move { + // Build the messaging and block production tasks. + let messaging = self.run_messaging().await.unwrap(); + let block_production = self.run_block_production(); - // Neither of these tasks should complete as they are meant to be run forever, - // but if either of them do complete, the sequencing stage should return. - // - // Select on the tasks completion to prevent the task from failing silently (if any). - tokio::select! { - res = messaging => { - error!(target: "pipeline", reason = ?res, "Messaging task finished unexpectedly."); - }, - res = block_production => { - error!(target: "pipeline", reason = ?res, "Block production task finished unexpectedly."); + // Neither of these tasks should complete as they are meant to be run forever, + // but if either of them do complete, the sequencing stage should return. + // + // Select on the tasks completion to prevent the task from failing silently (if any). + tokio::select! { + res = messaging => { + error!(target: "sequencing", reason = ?res, "Messaging task finished unexpectedly."); + }, + res = block_production => { + error!(target: "sequencing", reason = ?res, "Block production task finished unexpectedly."); + } } - } - - Ok(()) + }) } } From 1e08603a829d70d4629eb7129689b222782eb316 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Sat, 30 Nov 2024 12:30:06 -0500 Subject: [PATCH 2/2] remove unwrap --- crates/katana/pipeline/src/stage/sequencing.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/katana/pipeline/src/stage/sequencing.rs b/crates/katana/pipeline/src/stage/sequencing.rs index 50b199caf7..c1998e7341 100644 --- a/crates/katana/pipeline/src/stage/sequencing.rs +++ b/crates/katana/pipeline/src/stage/sequencing.rs @@ -12,7 +12,7 @@ use katana_pool::{TransactionPool, TxPool}; use katana_tasks::{TaskHandle, TaskSpawner}; use tracing::error; -pub type SequencingFut = BoxFuture<'static, ()>; +pub type SequencingFut = BoxFuture<'static, Result<()>>; /// The sequencing stage is responsible for advancing the chain state. #[allow(missing_debug_implementations)] @@ -62,13 +62,13 @@ impl Sequencing { } impl IntoFuture for Sequencing { - type Output = (); + type Output = Result<()>; type IntoFuture = SequencingFut; fn into_future(self) -> Self::IntoFuture { Box::pin(async move { // Build the messaging and block production tasks. - let messaging = self.run_messaging().await.unwrap(); + let messaging = self.run_messaging().await?; let block_production = self.run_block_production(); // Neither of these tasks should complete as they are meant to be run forever, @@ -83,6 +83,8 @@ impl IntoFuture for Sequencing { error!(target: "sequencing", reason = ?res, "Block production task finished unexpectedly."); } } + + Ok(()) }) } }