Skip to content

Commit

Permalink
fix: only terminate the stream if range is empty (#13281)
Browse files Browse the repository at this point in the history
Co-authored-by: Alexey Shekhirin <[email protected]>
  • Loading branch information
mattsse and shekhirin authored Dec 11, 2024
1 parent b6e682e commit 1602bae
Showing 1 changed file with 46 additions and 38 deletions.
84 changes: 46 additions & 38 deletions crates/exex/exex/src/backfill/stream.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
use super::job::BackfillJobResult;
use crate::{BackfillJob, SingleBlockBackfillJob};
use std::{
ops::RangeInclusive,
pin::Pin,
task::{ready, Context, Poll},
};

use alloy_primitives::BlockNumber;
use futures::{
stream::{FuturesOrdered, Stream},
Expand All @@ -17,10 +12,13 @@ use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
use reth_prune_types::PruneModes;
use reth_stages_api::ExecutionStageThresholds;
use reth_tracing::tracing::debug;
use std::{
ops::RangeInclusive,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::task::JoinHandle;

use super::job::BackfillJobResult;

/// The default parallelism for active tasks in [`StreamBackfillJob`].
///
// NOTE(review): presumably this is the default for the `parallelism` value that `poll_next`
// compares against `tasks.len()` before spawning another batch job — confirm at the place
// where the stream is constructed.
pub(crate) const DEFAULT_PARALLELISM: usize = 4;
/// The default batch size for active tasks in [`StreamBackfillJob`].
Expand Down Expand Up @@ -157,33 +155,44 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

// Spawn new tasks only if we are below the parallelism configured.
while this.tasks.len() < this.parallelism {
// Take the next `batch_size` blocks from the range and calculate the range bounds
let mut range = this.range.by_ref().take(this.batch_size);
let start = range.next();
let range_bounds = start.zip(range.last().or(start));

// Create the range from the range bounds. If it is empty, we are done.
let Some(range) = range_bounds.map(|(first, last)| first..=last) else {
debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more block batches to backfill");
break;
};

// Spawn a new task for that range
debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?range, "Spawning new block batch backfill task");
let job = Box::new(BackfillJob {
executor: this.executor.clone(),
provider: this.provider.clone(),
prune_modes: this.prune_modes.clone(),
thresholds: this.thresholds.clone(),
range,
stream_parallelism: this.parallelism,
}) as BackfillTaskIterator<_>;
this.push_back(job);
loop {
// Spawn new tasks only if we are below the parallelism configured.
while this.tasks.len() < this.parallelism {
// Take the next `batch_size` blocks from the range and calculate the range bounds
let mut range = this.range.by_ref().take(this.batch_size);
let start = range.next();
let range_bounds = start.zip(range.last().or(start));

// Create the range from the range bounds. If it is empty, we are done.
let Some(range) = range_bounds.map(|(first, last)| first..=last) else {
debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more block batches to backfill");
break;
};

// Spawn a new task for that range
debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?range, "Spawning new block batch backfill task");
let job = Box::new(BackfillJob {
executor: this.executor.clone(),
provider: this.provider.clone(),
prune_modes: this.prune_modes.clone(),
thresholds: this.thresholds.clone(),
range,
stream_parallelism: this.parallelism,
}) as BackfillTaskIterator<_>;
this.push_back(job);
}

let res = ready!(this.poll_next_task(cx));

if res.is_some() {
return Poll::Ready(res);
}

if this.range.is_empty() {
// only terminate the stream if there are no more blocks to process
return Poll::Ready(None);
}
}

this.poll_next_task(cx)
}
}

Expand Down Expand Up @@ -310,10 +319,9 @@ mod tests {
blocks_and_execution_outcome(provider_factory, chain_spec, key_pair)?;

// Backfill the same range
let factory =
BackfillJobFactory::new(executor.clone(), blockchain_db.clone()).with_thresholds(
ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() },
);
let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone())
.with_thresholds(ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() })
.with_stream_parallelism(1);
let mut backfill_stream = factory.backfill(1..=2).into_stream();
let mut chain = backfill_stream.next().await.unwrap().unwrap();
chain.execution_outcome_mut().state_mut().reverts.sort();
Expand Down

0 comments on commit 1602bae

Please sign in to comment.