Skip to content

Commit

Permalink
refactor(katana): dont run sequencing task inside of pipeline (dojoen…
Browse files Browse the repository at this point in the history
  • Loading branch information
kariy authored and augustin-v committed Dec 4, 2024
1 parent 18214ea commit cca0fe3
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 36 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 5 additions & 10 deletions crates/katana/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use katana_core::service::messaging::MessagingConfig;
use katana_db::mdbx::DbEnv;
use katana_executor::implementation::blockifier::BlockifierFactory;
use katana_executor::{ExecutionFlags, ExecutorFactory};
use katana_pipeline::{stage, Pipeline};
use katana_pipeline::stage::Sequencing;
use katana_pool::ordering::FiFo;
use katana_pool::validation::stateful::TxValidator;
use katana_pool::TxPool;
Expand Down Expand Up @@ -128,27 +128,22 @@ impl Node {
let block_producer = self.block_producer.clone();
let validator = self.block_producer.validator().clone();

// --- build sequencing stage
// --- build and run sequencing task

let sequencing = stage::Sequencing::new(
let sequencing = Sequencing::new(
pool.clone(),
backend.clone(),
self.task_manager.task_spawner(),
block_producer.clone(),
self.messaging_config.clone(),
);

// --- build and start the pipeline

let mut pipeline = Pipeline::new();
pipeline.add_stage(Box::new(sequencing));

self.task_manager
.task_spawner()
.build_task()
.critical()
.name("Pipeline")
.spawn(pipeline.into_future());
.name("Sequencing")
.spawn(sequencing.into_future());

let node_components = (pool, backend, block_producer, validator, self.forked_client.take());
let rpc = spawn(node_components, self.rpc_config.clone()).await?;
Expand Down
49 changes: 24 additions & 25 deletions crates/katana/pipeline/src/stage/sequencing.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::future::IntoFuture;
use std::sync::Arc;

use anyhow::Result;
use futures::future;
use futures::future::{self, BoxFuture};
use katana_core::backend::Backend;
use katana_core::service::block_producer::{BlockProducer, BlockProductionError};
use katana_core::service::messaging::{MessagingConfig, MessagingService, MessagingTask};
Expand All @@ -11,8 +12,7 @@ use katana_pool::{TransactionPool, TxPool};
use katana_tasks::{TaskHandle, TaskSpawner};
use tracing::error;

use super::{StageId, StageResult};
use crate::Stage;
pub type SequencingFut = BoxFuture<'static, Result<()>>;

/// The sequencing stage is responsible for advancing the chain state.
#[allow(missing_debug_implementations)]
Expand Down Expand Up @@ -61,31 +61,30 @@ impl<EF: ExecutorFactory> Sequencing<EF> {
}
}

#[async_trait::async_trait]
impl<EF: ExecutorFactory> Stage for Sequencing<EF> {
fn id(&self) -> StageId {
StageId::Sequencing
}
impl<EF: ExecutorFactory> IntoFuture for Sequencing<EF> {
type Output = Result<()>;
type IntoFuture = SequencingFut;

#[tracing::instrument(skip(self), name = "Stage", fields(id = %self.id()))]
async fn execute(&mut self) -> StageResult {
// Build the messaging and block production tasks.
let messaging = self.run_messaging().await?;
let block_production = self.run_block_production();
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
// Build the messaging and block production tasks.
let messaging = self.run_messaging().await?;
let block_production = self.run_block_production();

// Neither of these tasks should complete as they are meant to be run forever,
// but if either of them do complete, the sequencing stage should return.
//
// Select on the tasks completion to prevent the task from failing silently (if any).
tokio::select! {
res = messaging => {
error!(target: "pipeline", reason = ?res, "Messaging task finished unexpectedly.");
},
res = block_production => {
error!(target: "pipeline", reason = ?res, "Block production task finished unexpectedly.");
// Neither of these tasks should complete as they are meant to be run forever,
// but if either of them do complete, the sequencing stage should return.
//
// Select on the tasks completion to prevent the task from failing silently (if any).
tokio::select! {
res = messaging => {
error!(target: "sequencing", reason = ?res, "Messaging task finished unexpectedly.");
},
res = block_production => {
error!(target: "sequencing", reason = ?res, "Block production task finished unexpectedly.");
}
}
}

Ok(())
Ok(())
})
}
}

0 comments on commit cca0fe3

Please sign in to comment.