diff --git a/Cargo.lock b/Cargo.lock index 54f5d3f694f7..93324652a9d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7124,6 +7124,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "reth-ethereum-engine" +version = "1.0.0" +dependencies = [ + "futures", + "pin-project", + "reth-beacon-consensus", + "reth-chainspec", + "reth-db-api", + "reth-engine-tree", + "reth-ethereum-engine-primitives", + "reth-network-p2p", + "reth-stages-api", + "reth-tasks", + "tokio", + "tokio-stream", +] + [[package]] name = "reth-ethereum-engine-primitives" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index 936de53b63b4..417fe3ef8293 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ members = [ "crates/errors/", "crates/ethereum-forks/", "crates/ethereum/consensus/", + "crates/ethereum/engine/", "crates/ethereum/engine-primitives/", "crates/ethereum/evm", "crates/ethereum/node", diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index bcc8ae34bddd..e2a1c462d6c8 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -30,6 +30,7 @@ reth-payload-validator.workspace = true reth-primitives.workspace = true reth-provider.workspace = true reth-prune.workspace = true +reth-prune-types.workspace = true reth-revm.workspace = true reth-rpc-types.workspace = true reth-stages-api.workspace = true @@ -54,11 +55,24 @@ aquamarine.workspace = true parking_lot.workspace = true tracing.workspace = true +# optional deps for test-utils +reth-stages = { workspace = true, optional = true } +reth-tracing = { workspace = true, optional = true } + [dev-dependencies] # reth +reth-db = { workspace = true, features = ["test-utils"] } reth-network-p2p = { workspace = true, features = ["test-utils"] } reth-prune-types.workspace = true reth-stages = { workspace = true, features = ["test-utils"] } reth-tracing.workspace = true -assert_matches.workspace = true \ No newline at end of file +assert_matches.workspace = true + +[features] +test-utils = [ + "reth-db/test-utils", + "reth-network-p2p/test-utils", + "reth-stages/test-utils", + "reth-tracing" +] diff --git a/crates/engine/tree/src/backfill.rs b/crates/engine/tree/src/backfill.rs index 3060ddd1d4e7..24153bed24c0 100644 --- a/crates/engine/tree/src/backfill.rs +++ b/crates/engine/tree/src/backfill.rs @@ -206,23 +206,17 @@ impl PipelineState { #[cfg(test)] mod tests { use super::*; - use crate::test_utils::insert_headers_into_client; + use crate::test_utils::{insert_headers_into_client, TestPipelineBuilder}; use assert_matches::assert_matches; use futures::poll; - use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET}; + use reth_chainspec::{ChainSpecBuilder, MAINNET}; use reth_db::{mdbx::DatabaseEnv, test_utils::TempDatabase}; use reth_network_p2p::test_utils::TestFullBlockClient; use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, BlockNumber, Header, B256}; - use reth_provider::{ - test_utils::create_test_provider_factory_with_chain_spec, ExecutionOutcome, - }; - use reth_prune_types::PruneModes; - use reth_stages::{test_utils::TestStages, ExecOutput, StageError}; + use reth_stages::ExecOutput; use reth_stages_api::StageCheckpoint; - use reth_static_file::StaticFileProducer; use reth_tasks::TokioTaskExecutor; use std::{collections::VecDeque, future::poll_fn, sync::Arc}; - use tokio::sync::watch; struct TestHarness { pipeline_sync: PipelineSync>>, @@ -263,52 +257,6 @@ mod tests { } } - struct TestPipelineBuilder { - pipeline_exec_outputs: VecDeque>, - executor_results: Vec, - } - - impl TestPipelineBuilder { - /// Create a new [`TestPipelineBuilder`]. - const fn new() -> Self { - Self { pipeline_exec_outputs: VecDeque::new(), executor_results: Vec::new() } - } - - /// Set the pipeline execution outputs to use for the test consensus engine. - fn with_pipeline_exec_outputs( - mut self, - pipeline_exec_outputs: VecDeque>, - ) -> Self { - self.pipeline_exec_outputs = pipeline_exec_outputs; - self - } - - /// Set the executor results to use for the test consensus engine. - #[allow(dead_code)] - fn with_executor_results(mut self, executor_results: Vec) -> Self { - self.executor_results = executor_results; - self - } - - /// Builds the pipeline. - fn build(self, chain_spec: Arc) -> Pipeline>> { - reth_tracing::init_test_tracing(); - - // Setup pipeline - let (tip_tx, _tip_rx) = watch::channel(B256::default()); - let pipeline = Pipeline::builder() - .add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default())) - .with_tip_sender(tip_tx); - - let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec); - - let static_file_producer = - StaticFileProducer::new(provider_factory.clone(), PruneModes::default()); - - pipeline.build(provider_factory, static_file_producer) - } - } - #[tokio::test] async fn pipeline_started_and_finished() { const TOTAL_BLOCKS: usize = 10; diff --git a/crates/engine/tree/src/engine.rs b/crates/engine/tree/src/engine.rs index 2010c3768a42..25d4fabf7832 100644 --- a/crates/engine/tree/src/engine.rs +++ b/crates/engine/tree/src/engine.rs @@ -10,9 +10,10 @@ use reth_engine_primitives::EngineTypes; use reth_primitives::{SealedBlockWithSenders, B256}; use std::{ collections::HashSet, + sync::mpsc::Sender, task::{Context, Poll}, }; -use tokio::sync::mpsc; +use tokio::sync::mpsc::UnboundedReceiver; /// Advances the chain based on incoming requests. /// @@ -146,13 +147,23 @@ pub trait EngineRequestHandler: Send + Sync { #[derive(Debug)] pub struct EngineApiRequestHandler { /// channel to send messages to the tree to execute the payload. - to_tree: std::sync::mpsc::Sender>>, + to_tree: Sender>>, /// channel to receive messages from the tree. - from_tree: mpsc::UnboundedReceiver, + from_tree: UnboundedReceiver, // TODO add db controller } -impl EngineApiRequestHandler where T: EngineTypes {} +impl EngineApiRequestHandler +where + T: EngineTypes, +{ + pub const fn new( + to_tree: Sender>>, + from_tree: UnboundedReceiver, + ) -> Self { + Self { to_tree, from_tree } + } +} impl EngineRequestHandler for EngineApiRequestHandler where diff --git a/crates/engine/tree/src/lib.rs b/crates/engine/tree/src/lib.rs index 52ef90e4b637..8f40119b2cc0 100644 --- a/crates/engine/tree/src/lib.rs +++ b/crates/engine/tree/src/lib.rs @@ -1,4 +1,8 @@ //! This crate includes the core components for advancing a reth chain. +//! +//! ## Feature Flags +//! +//! - `test-utils`: Export utilities for testing #![doc( html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png", @@ -27,5 +31,5 @@ pub mod persistence; /// Support for interacting with the blockchain tree. pub mod tree; -#[cfg(test)] -mod test_utils; +#[cfg(any(test, feature = "test-utils"))] +pub mod test_utils; diff --git a/crates/engine/tree/src/test_utils.rs b/crates/engine/tree/src/test_utils.rs index eed483e29932..0a5fbd5ad560 100644 --- a/crates/engine/tree/src/test_utils.rs +++ b/crates/engine/tree/src/test_utils.rs @@ -1,6 +1,62 @@ +use reth_chainspec::ChainSpec; +use reth_db::{mdbx::DatabaseEnv, test_utils::TempDatabase}; use reth_network_p2p::test_utils::TestFullBlockClient; -use reth_primitives::{BlockBody, SealedHeader}; -use std::ops::Range; +use reth_primitives::{BlockBody, SealedHeader, B256}; +use reth_provider::{test_utils::create_test_provider_factory_with_chain_spec, ExecutionOutcome}; +use reth_prune_types::PruneModes; +use reth_stages::{test_utils::TestStages, ExecOutput, StageError}; +use reth_stages_api::Pipeline; +use reth_static_file::StaticFileProducer; +use std::{collections::VecDeque, ops::Range, sync::Arc}; +use tokio::sync::watch; + +/// Test pipeline builder. +#[derive(Default)] +pub struct TestPipelineBuilder { + pipeline_exec_outputs: VecDeque>, + executor_results: Vec, +} + +impl TestPipelineBuilder { + /// Create a new [`TestPipelineBuilder`]. + pub const fn new() -> Self { + Self { pipeline_exec_outputs: VecDeque::new(), executor_results: Vec::new() } + } + + /// Set the pipeline execution outputs to use for the test consensus engine. + pub fn with_pipeline_exec_outputs( + mut self, + pipeline_exec_outputs: VecDeque>, + ) -> Self { + self.pipeline_exec_outputs = pipeline_exec_outputs; + self + } + + /// Set the executor results to use for the test consensus engine. + #[allow(dead_code)] + pub fn with_executor_results(mut self, executor_results: Vec) -> Self { + self.executor_results = executor_results; + self + } + + /// Builds the pipeline. + pub fn build(self, chain_spec: Arc) -> Pipeline>> { + reth_tracing::init_test_tracing(); + + // Setup pipeline + let (tip_tx, _tip_rx) = watch::channel(B256::default()); + let pipeline = Pipeline::builder() + .add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default())) + .with_tip_sender(tip_tx); + + let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec); + + let static_file_producer = + StaticFileProducer::new(provider_factory.clone(), PruneModes::default()); + + pipeline.build(provider_factory, static_file_producer) + } +} pub(crate) fn insert_headers_into_client( client: &TestFullBlockClient, diff --git a/crates/ethereum/engine/Cargo.toml b/crates/ethereum/engine/Cargo.toml new file mode 100644 index 000000000000..05fbc4386cde --- /dev/null +++ b/crates/ethereum/engine/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "reth-ethereum-engine" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[lints] +workspace = true + +[dependencies] +# reth +reth-beacon-consensus.workspace = true +reth-chainspec.workspace = true +reth-db-api.workspace = true +reth-engine-tree.workspace = true +reth-ethereum-engine-primitives.workspace = true +reth-network-p2p.workspace = true +reth-stages-api.workspace = true +reth-tasks.workspace = true + +# async +futures.workspace = true +pin-project.workspace = true +tokio = { workspace = true, features = ["sync"] } +tokio-stream.workspace = true + +[dev-dependencies] +reth-engine-tree = { workspace = true, features = ["test-utils"] } diff --git a/crates/ethereum/engine/src/lib.rs b/crates/ethereum/engine/src/lib.rs new file mode 100644 index 000000000000..e623dd733052 --- /dev/null +++ b/crates/ethereum/engine/src/lib.rs @@ -0,0 +1,12 @@ +//! Ethereum engine implementation. + +#![doc( + html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png", + html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", + issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/" +)] +#![cfg_attr(not(test), warn(unused_crate_dependencies))] +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +/// Ethereum engine orchestrator. +pub mod orchestrator; diff --git a/crates/ethereum/engine/src/orchestrator.rs b/crates/ethereum/engine/src/orchestrator.rs new file mode 100644 index 000000000000..0abf352eeab7 --- /dev/null +++ b/crates/ethereum/engine/src/orchestrator.rs @@ -0,0 +1,138 @@ +use futures::{ready, StreamExt}; +use pin_project::pin_project; +use reth_beacon_consensus::{BeaconEngineMessage, EthBeaconConsensus}; +use reth_chainspec::ChainSpec; +use reth_db_api::database::Database; +use reth_engine_tree::{ + backfill::PipelineSync, + chain::ChainOrchestrator, + download::BasicBlockDownloader, + engine::{EngineApiEvent, EngineApiRequestHandler, EngineHandler, FromEngine}, +}; +use reth_ethereum_engine_primitives::EthEngineTypes; +use reth_network_p2p::{bodies::client::BodiesClient, headers::client::HeadersClient}; +use reth_stages_api::Pipeline; +use reth_tasks::TaskSpawner; +use std::{ + future::Future, + pin::Pin, + sync::{mpsc::Sender, Arc}, + task::{Context, Poll}, +}; +use tokio::sync::mpsc::UnboundedReceiver; +use tokio_stream::wrappers::UnboundedReceiverStream; + +/// Alias for Ethereum chain orchestrator. +type EthServiceType = ChainOrchestrator< + EngineHandler< + EngineApiRequestHandler, + UnboundedReceiverStream>, + BasicBlockDownloader, + >, + PipelineSync, +>; + +/// The type that drives the Ethereum chain forward and communicates progress. +#[pin_project] +#[allow(missing_debug_implementations)] +pub struct EthService +where + DB: Database + 'static, + Client: HeadersClient + BodiesClient + Clone + Unpin + 'static, +{ + orchestrator: EthServiceType, +} + +impl EthService +where + DB: Database + 'static, + Client: HeadersClient + BodiesClient + Clone + Unpin + 'static, +{ + /// Constructor for `EthService`. + pub fn new( + chain_spec: Arc, + client: Client, + to_tree: Sender>>, + from_tree: UnboundedReceiver, + incoming_requests: UnboundedReceiverStream>, + pipeline: Pipeline, + pipeline_task_spawner: Box, + ) -> Self { + let consensus = Arc::new(EthBeaconConsensus::new(chain_spec)); + let downloader = BasicBlockDownloader::new(client, consensus); + + let engine_handler = EngineApiRequestHandler::new(to_tree, from_tree); + let handler = EngineHandler::new(engine_handler, downloader, incoming_requests); + + let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner); + + Self { orchestrator: ChainOrchestrator::new(handler, backfill_sync) } + } +} + +impl Future for EthService +where + DB: Database + 'static, + Client: HeadersClient + BodiesClient + Clone + Unpin + 'static, +{ + type Output = Result<(), EthServiceError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Call poll on the inner orchestrator. + let mut orchestrator = self.project().orchestrator; + loop { + match ready!(StreamExt::poll_next_unpin(&mut orchestrator, cx)) { + Some(_event) => continue, + None => return Poll::Ready(Ok(())), + } + } + } +} + +/// Potential error returned by `EthService`. +#[derive(Debug)] +pub struct EthServiceError {} + +#[cfg(test)] +mod tests { + use super::*; + use reth_chainspec::{ChainSpecBuilder, MAINNET}; + use reth_engine_tree::test_utils::TestPipelineBuilder; + use reth_ethereum_engine_primitives::EthEngineTypes; + use reth_network_p2p::test_utils::TestFullBlockClient; + use reth_tasks::TokioTaskExecutor; + use std::sync::{mpsc::channel, Arc}; + use tokio::sync::mpsc::unbounded_channel; + + #[test] + fn eth_chain_orchestrator_build() { + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .paris_activated() + .build(), + ); + + let client = TestFullBlockClient::default(); + + let (_tx, rx) = unbounded_channel::>(); + let incoming_requests = UnboundedReceiverStream::new(rx); + + let pipeline = TestPipelineBuilder::new().build(chain_spec.clone()); + let pipeline_task_spawner = Box::::default(); + + let (to_tree_tx, _to_tree_rx) = channel(); + let (_from_tree_tx, from_tree_rx) = unbounded_channel(); + + let _eth_chain_orchestrator = EthService::new( + chain_spec, + client, + to_tree_tx, + from_tree_rx, + incoming_requests, + pipeline, + pipeline_task_spawner, + ); + } +}