Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: relax ProviderFactory setup #13254

Merged
merged 5 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/cli/commands/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

use alloy_primitives::B256;
use clap::Parser;
use reth_beacon_consensus::EthBeaconConsensus;
use reth_chainspec::EthChainSpec;
use reth_cli::chainspec::ChainSpecParser;
use reth_config::{config::EtlConfig, Config};
use reth_consensus::noop::NoopConsensus;
use reth_db::{init_db, open_db_read_only, DatabaseEnv};
use reth_db_common::init::init_genesis;
use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
Expand Down Expand Up @@ -151,10 +151,10 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
.add_stages(DefaultStages::new(
factory.clone(),
tip_rx,
Arc::new(EthBeaconConsensus::new(self.chain.clone())),
Arc::new(NoopConsensus::default()),
NoopHeaderDownloader::default(),
NoopBodiesDownloader::default(),
NoopBlockExecutorProvider::default(),
NoopBlockExecutorProvider::<N::Primitives>::default(),
config.stages.clone(),
prune_modes.clone(),
))
Expand Down
11 changes: 3 additions & 8 deletions crates/cli/commands/src/stage/dump/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,7 @@ fn import_tables_with_range<N: NodeTypesWithDB>(
/// `PlainAccountState` safely. There might be some state dependency from an address
/// which hasn't been changed in the given range.
fn unwind_and_copy<
N: ProviderNodeTypes<
Primitives: NodePrimitives<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
BlockHeader = reth_primitives::Header,
>,
>,
N: ProviderNodeTypes<Primitives: NodePrimitives<BlockHeader = reth_primitives::Header>>,
>(
db_tool: &DbTool<N>,
from: u64,
Expand All @@ -155,7 +149,8 @@ fn unwind_and_copy<
) -> eyre::Result<()> {
let provider = db_tool.provider_factory.database_provider_rw()?;

let mut exec_stage = ExecutionStage::new_with_executor(NoopBlockExecutorProvider::default());
let mut exec_stage =
ExecutionStage::new_with_executor(NoopBlockExecutorProvider::<N::Primitives>::default());

exec_stage.unwind(
&provider,
Expand Down
2 changes: 1 addition & 1 deletion crates/cli/commands/src/stage/dump/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ fn unwind_and_copy<

// Bring Plainstate to TO (hashing stage execution requires it)
let mut exec_stage = ExecutionStage::new(
NoopBlockExecutorProvider::default(), // Not necessary for unwinding.
NoopBlockExecutorProvider::<N::Primitives>::default(), // Not necessary for unwinding.
ExecutionStageThresholds {
max_blocks: Some(u64::MAX),
max_changes: None,
Expand Down
2 changes: 1 addition & 1 deletion crates/cli/commands/src/stage/unwind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
let (tip_tx, tip_rx) = watch::channel(B256::ZERO);

// Unwinding does not require a valid executor
let executor = NoopBlockExecutorProvider::default();
let executor = NoopBlockExecutorProvider::<N::Primitives>::default();

let builder = if self.offline {
Pipeline::<N>::builder().add_stages(
Expand Down
4 changes: 2 additions & 2 deletions crates/engine/local/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use reth_engine_tree::{
EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineRequestHandler, FromEngine,
RequestHandlerEvent,
},
persistence::{PersistenceHandle, PersistenceNodeTypes},
persistence::PersistenceHandle,
tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig},
};
use reth_evm::execute::BlockExecutorProvider;
Expand Down Expand Up @@ -59,7 +59,7 @@ where

impl<N> LocalEngineService<N>
where
N: EngineNodeTypes + PersistenceNodeTypes,
N: EngineNodeTypes,
{
/// Constructor for [`LocalEngineService`].
#[allow(clippy::too_many_arguments)]
Expand Down
4 changes: 2 additions & 2 deletions crates/engine/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use reth_engine_tree::{
backfill::PipelineSync,
download::BasicBlockDownloader,
engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler},
persistence::{PersistenceHandle, PersistenceNodeTypes},
persistence::PersistenceHandle,
tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig},
};
pub use reth_engine_tree::{
Expand Down Expand Up @@ -59,7 +59,7 @@ where

impl<N, Client, E> EngineService<N, Client, E>
where
N: EngineNodeTypes + PersistenceNodeTypes,
N: EngineNodeTypes,
Client: EthBlockClient + 'static,
E: BlockExecutorProvider<Primitives = N::Primitives> + 'static,
{
Expand Down
21 changes: 9 additions & 12 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::metrics::PersistenceMetrics;
use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumHash;
use reth_chain_state::ExecutedBlock;
use reth_errors::ProviderError;
Expand All @@ -17,11 +18,6 @@ use thiserror::Error;
use tokio::sync::oneshot;
use tracing::{debug, error};

/// A helper trait with requirements for [`ProviderNodeTypes`] to be used within
/// [`PersistenceService`].
pub trait PersistenceNodeTypes: ProviderNodeTypes<Primitives = EthPrimitives> {}
impl<T> PersistenceNodeTypes for T where T: ProviderNodeTypes<Primitives = EthPrimitives> {}

/// Writes parts of reth's in memory tree state to the database and static files.
///
/// This is meant to be a spawned service that listens for various incoming persistence operations,
Expand All @@ -32,7 +28,7 @@ impl<T> PersistenceNodeTypes for T where T: ProviderNodeTypes<Primitives = EthPr
#[derive(Debug)]
pub struct PersistenceService<N>
where
N: PersistenceNodeTypes,
N: ProviderNodeTypes,
{
/// The provider factory to use
provider: ProviderFactory<N>,
Expand All @@ -48,7 +44,7 @@ where

impl<N> PersistenceService<N>
where
N: PersistenceNodeTypes,
N: ProviderNodeTypes,
{
/// Create a new persistence service
pub fn new(
Expand All @@ -74,7 +70,7 @@ where

impl<N> PersistenceService<N>
where
N: PersistenceNodeTypes,
N: ProviderNodeTypes,
{
/// This is the main loop, that will listen to database events and perform the requested
/// database actions
Expand Down Expand Up @@ -148,9 +144,10 @@ where
) -> Result<Option<BlockNumHash>, PersistenceError> {
debug!(target: "engine::persistence", first=?blocks.first().map(|b| b.block.num_hash()), last=?blocks.last().map(|b| b.block.num_hash()), "Saving range of blocks");
let start_time = Instant::now();
let last_block_hash_num = blocks
.last()
.map(|block| BlockNumHash { hash: block.block().hash(), number: block.block().number });
let last_block_hash_num = blocks.last().map(|block| BlockNumHash {
hash: block.block().hash(),
number: block.block().header().number(),
});

if last_block_hash_num.is_some() {
let provider_rw = self.provider.database_provider_rw()?;
Expand Down Expand Up @@ -219,7 +216,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
sync_metrics_tx: MetricEventsSender,
) -> PersistenceHandle<N::Primitives>
where
N: PersistenceNodeTypes,
N: ProviderNodeTypes,
{
// create the initial channels
let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();
Expand Down
24 changes: 12 additions & 12 deletions crates/evm/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use alloy_primitives::BlockNumber;
use core::fmt::Display;
use reth_execution_errors::BlockExecutionError;
use reth_execution_types::{BlockExecutionInput, BlockExecutionOutput, ExecutionOutcome};
use reth_primitives::{BlockWithSenders, EthPrimitives, Receipt};
use reth_primitives::{BlockWithSenders, NodePrimitives};
use reth_prune_types::PruneModes;
use reth_storage_errors::provider::ProviderError;
use revm::State;
Expand All @@ -20,10 +20,10 @@ const UNAVAILABLE_FOR_NOOP: &str = "execution unavailable for noop";
/// A [`BlockExecutorProvider`] implementation that does nothing.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct NoopBlockExecutorProvider;
pub struct NoopBlockExecutorProvider<P>(core::marker::PhantomData<P>);

impl BlockExecutorProvider for NoopBlockExecutorProvider {
type Primitives = EthPrimitives;
impl<P: NodePrimitives> BlockExecutorProvider for NoopBlockExecutorProvider<P> {
type Primitives = P;

type Executor<DB: Database<Error: Into<ProviderError> + Display>> = Self;

Expand All @@ -33,20 +33,20 @@ impl BlockExecutorProvider for NoopBlockExecutorProvider {
where
DB: Database<Error: Into<ProviderError> + Display>,
{
Self
Self::default()
}

fn batch_executor<DB>(&self, _: DB) -> Self::BatchExecutor<DB>
where
DB: Database<Error: Into<ProviderError> + Display>,
{
Self
Self::default()
}
}

impl<DB> Executor<DB> for NoopBlockExecutorProvider {
type Input<'a> = BlockExecutionInput<'a, BlockWithSenders>;
type Output = BlockExecutionOutput<Receipt>;
impl<DB, P: NodePrimitives> Executor<DB> for NoopBlockExecutorProvider<P> {
type Input<'a> = BlockExecutionInput<'a, BlockWithSenders<P::Block>>;
type Output = BlockExecutionOutput<P::Receipt>;
type Error = BlockExecutionError;

fn execute(self, _: Self::Input<'_>) -> Result<Self::Output, Self::Error> {
Expand Down Expand Up @@ -76,9 +76,9 @@ impl<DB> Executor<DB> for NoopBlockExecutorProvider {
}
}

impl<DB> BatchExecutor<DB> for NoopBlockExecutorProvider {
type Input<'a> = BlockExecutionInput<'a, BlockWithSenders>;
type Output = ExecutionOutcome;
impl<DB, P: NodePrimitives> BatchExecutor<DB> for NoopBlockExecutorProvider<P> {
type Input<'a> = BlockExecutionInput<'a, BlockWithSenders<P::Block>>;
type Output = ExecutionOutcome<P::Receipt>;
type Error = BlockExecutionError;

fn execute_and_verify_one(&mut self, _: Self::Input<'_>) -> Result<(), Self::Error> {
Expand Down
13 changes: 6 additions & 7 deletions crates/net/downloaders/src/bodies/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,23 @@ use reth_network_p2p::{
bodies::{downloader::BodyDownloader, response::BlockResponse},
error::{DownloadError, DownloadResult},
};
use reth_primitives::BlockBody;
use std::ops::RangeInclusive;
use std::{fmt::Debug, ops::RangeInclusive};

/// A [`BodyDownloader`] implementation that does nothing.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct NoopBodiesDownloader;
pub struct NoopBodiesDownloader<B>(std::marker::PhantomData<B>);

impl BodyDownloader for NoopBodiesDownloader {
type Body = BlockBody;
impl<B: Debug + Send + Sync + Unpin + 'static> BodyDownloader for NoopBodiesDownloader<B> {
type Body = B;

fn set_download_range(&mut self, _: RangeInclusive<BlockNumber>) -> DownloadResult<()> {
Ok(())
}
}

impl Stream for NoopBodiesDownloader {
type Item = Result<Vec<BlockResponse<alloy_consensus::Header, BlockBody>>, DownloadError>;
impl<B> Stream for NoopBodiesDownloader<B> {
type Item = Result<Vec<BlockResponse<alloy_consensus::Header, B>>, DownloadError>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
Expand Down
14 changes: 7 additions & 7 deletions crates/net/downloaders/src/headers/noop.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
use alloy_consensus::Header;
use futures::Stream;
use reth_network_p2p::headers::{
downloader::{HeaderDownloader, SyncTarget},
error::HeadersDownloaderError,
};
use reth_primitives::SealedHeader;
use std::fmt::Debug;

/// A [`HeaderDownloader`] implementation that does nothing.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct NoopHeaderDownloader;
pub struct NoopHeaderDownloader<H>(std::marker::PhantomData<H>);

impl HeaderDownloader for NoopHeaderDownloader {
type Header = Header;
impl<H: Debug + Send + Sync + Unpin + 'static> HeaderDownloader for NoopHeaderDownloader<H> {
type Header = H;

fn update_local_head(&mut self, _: SealedHeader) {}
fn update_local_head(&mut self, _: SealedHeader<H>) {}

fn update_sync_target(&mut self, _: SyncTarget) {}

fn set_batch_size(&mut self, _: usize) {}
}

impl Stream for NoopHeaderDownloader {
type Item = Result<Vec<SealedHeader>, HeadersDownloaderError<Header>>;
impl<H> Stream for NoopHeaderDownloader<H> {
type Item = Result<Vec<SealedHeader<H>>, HeadersDownloaderError<H>>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
Expand Down
20 changes: 5 additions & 15 deletions crates/node/builder/src/launch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use crate::{
use alloy_primitives::{BlockNumber, B256};
use eyre::{Context, OptionExt};
use rayon::ThreadPoolBuilder;
use reth_beacon_consensus::EthBeaconConsensus;
use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
use reth_config::{config::EtlConfig, PruneConfig};
use reth_consensus::noop::NoopConsensus;
use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
use reth_db_common::init::{init_genesis, InitStorageError};
use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
Expand Down Expand Up @@ -383,12 +383,7 @@ where
pub async fn create_provider_factory<N>(&self) -> eyre::Result<ProviderFactory<N>>
where
N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
N::Primitives: FullNodePrimitives<
Block = reth_primitives::Block,
BlockBody = reth_primitives::BlockBody,
Receipt = reth_primitives::Receipt,
BlockHeader = reth_primitives::Header,
>,
N::Primitives: FullNodePrimitives<BlockHeader = reth_primitives::Header>,
{
let factory = ProviderFactory::new(
self.right().clone(),
Expand Down Expand Up @@ -420,10 +415,10 @@ where
.add_stages(DefaultStages::new(
factory.clone(),
tip_rx,
Arc::new(EthBeaconConsensus::new(self.chain_spec())),
Arc::new(NoopConsensus::default()),
NoopHeaderDownloader::default(),
NoopBodiesDownloader::default(),
NoopBlockExecutorProvider::default(),
NoopBlockExecutorProvider::<N::Primitives>::default(),
self.toml_config().stages.clone(),
self.prune_modes(),
))
Expand Down Expand Up @@ -455,12 +450,7 @@ where
) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
where
N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
N::Primitives: FullNodePrimitives<
Block = reth_primitives::Block,
BlockBody = reth_primitives::BlockBody,
Receipt = reth_primitives::Receipt,
BlockHeader = reth_primitives::Header,
>,
N::Primitives: FullNodePrimitives<BlockHeader = reth_primitives::Header>,
{
let factory = self.create_provider_factory().await?;
let ctx = LaunchContextWith {
Expand Down
Loading
Loading