Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: rename pipeline references to backfill sync #9223

Merged
merged 1 commit into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/engine/tree/src/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ mod tests {
.build(),
);

// force the pipeline to be "done" after 5 blocks
// force the pipeline to be "done" after `pipeline_done_after` blocks
let pipeline = TestPipelineBuilder::new()
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(BlockNumber::from(pipeline_done_after)),
Expand Down
74 changes: 37 additions & 37 deletions crates/engine/tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ use std::{
///
/// ## Control flow
///
/// The [`ChainOrchestrator`] is responsible for controlling the pipeline sync and additional hooks.
/// The [`ChainOrchestrator`] is responsible for controlling the backfill sync and additional hooks.
/// It polls the given `handler`, which is responsible for advancing the chain, how is up to the
/// handler. However, due to database restrictions (e.g. exclusive write access), following
/// invariants apply:
/// - If the handler requests a backfill run (e.g. [`BackfillAction::Start`]), the handler must
/// ensure that while the pipeline is running, no other write access is granted.
/// ensure that while the backfill sync is running, no other write access is granted.
/// - At any time the [`ChainOrchestrator`] can request exclusive write access to the database
/// (e.g. if pruning is required), but will not do so until the handler has acknowledged the
/// request for write access.
Expand All @@ -35,18 +35,18 @@ where
{
/// The handler for advancing the chain.
handler: T,
/// Controls pipeline sync.
pipeline: P,
/// Controls backfill sync.
backfill_sync: P,
}

impl<T, P> ChainOrchestrator<T, P>
where
T: ChainHandler + Unpin,
P: BackfillSync + Unpin,
{
/// Creates a new [`ChainOrchestrator`] with the given handler and pipeline.
pub const fn new(handler: T, pipeline: P) -> Self {
Self { handler, pipeline }
/// Creates a new [`ChainOrchestrator`] with the given handler and backfill sync.
pub const fn new(handler: T, backfill_sync: P) -> Self {
Self { handler, backfill_sync }
}

/// Returns the handler
Expand All @@ -68,34 +68,34 @@ where

// This loop polls the components
//
// 1. Polls the pipeline to completion, if active.
// 1. Polls the backfill sync to completion, if active.
// 2. Advances the chain by polling the handler.
'outer: loop {
// try to poll the pipeline to completion, if active
match this.pipeline.poll(cx) {
Poll::Ready(pipeline_event) => match pipeline_event {
// try to poll the backfill sync to completion, if active
match this.backfill_sync.poll(cx) {
Poll::Ready(backfill_sync_event) => match backfill_sync_event {
BackfillEvent::Idle => {}
BackfillEvent::Started(_) => {
// notify handler that pipeline started
this.handler.on_event(FromOrchestrator::PipelineStarted);
return Poll::Ready(ChainEvent::PipelineStarted);
// notify handler that backfill sync started
this.handler.on_event(FromOrchestrator::BackfillSyncStarted);
return Poll::Ready(ChainEvent::BackfillSyncStarted);
}
BackfillEvent::Finished(res) => {
return match res {
Ok(event) => {
tracing::debug!(?event, "pipeline finished");
// notify handler that pipeline finished
this.handler.on_event(FromOrchestrator::PipelineFinished);
Poll::Ready(ChainEvent::PipelineFinished)
tracing::debug!(?event, "backfill sync finished");
// notify handler that backfill sync finished
this.handler.on_event(FromOrchestrator::BackfillSyncFinished);
Poll::Ready(ChainEvent::BackfillSyncFinished)
}
Err(err) => {
tracing::error!( %err, "pipeline failed");
tracing::error!( %err, "backfill sync failed");
Poll::Ready(ChainEvent::FatalError)
}
}
}
BackfillEvent::TaskDropped(err) => {
tracing::error!( %err, "pipeline task dropped");
tracing::error!( %err, "backfill sync task dropped");
return Poll::Ready(ChainEvent::FatalError);
}
},
Expand All @@ -106,9 +106,9 @@ where
match this.handler.poll(cx) {
Poll::Ready(handler_event) => {
match handler_event {
HandlerEvent::Pipeline(target) => {
// trigger pipeline and start polling it
this.pipeline.on_action(BackfillAction::Start(target));
HandlerEvent::BackfillSync(target) => {
// trigger backfill sync and start polling it
this.backfill_sync.on_action(BackfillAction::Start(target));
continue 'outer
}
HandlerEvent::Event(ev) => {
Expand Down Expand Up @@ -153,10 +153,10 @@ enum SyncMode {
/// These are meant to be used for observability and debugging purposes.
#[derive(Debug)]
pub enum ChainEvent<T> {
/// Pipeline sync started
PipelineStarted,
/// Pipeline sync finished
PipelineFinished,
/// Backfill sync started
BackfillSyncStarted,
/// Backfill sync finished
BackfillSyncFinished,
/// Fatal error
FatalError,
/// Event emitted by the handler
Expand All @@ -180,35 +180,35 @@ pub trait ChainHandler: Send + Sync {
/// Events/Requests that the [`ChainHandler`] can emit to the [`ChainOrchestrator`].
#[derive(Clone, Debug)]
pub enum HandlerEvent<T> {
/// Request to start a pipeline sync
Pipeline(PipelineTarget),
/// Request to start a backfill sync
BackfillSync(PipelineTarget),
/// Other event emitted by the handler
Event(T),
}

/// Internal events issued by the [`ChainOrchestrator`].
#[derive(Clone, Debug)]
pub enum FromOrchestrator {
/// Invoked when pipeline sync finished
PipelineFinished,
/// Invoked when pipeline started
PipelineStarted,
/// Invoked when backfill sync finished
BackfillSyncFinished,
/// Invoked when backfill sync started
BackfillSyncStarted,
}

/// Represents the state of the chain.
#[derive(Clone, Copy, PartialEq, Eq, Default, Debug)]
pub enum OrchestratorState {
/// Orchestrator has exclusive write access to the database.
PipelineActive,
BackfillSyncActive,
/// Node is actively processing the chain.
#[default]
Idle,
}

impl OrchestratorState {
/// Returns `true` if the state is [`OrchestratorState::PipelineActive`].
pub const fn is_pipeline_active(&self) -> bool {
matches!(self, Self::PipelineActive)
/// Returns `true` if the state is [`OrchestratorState::BackfillSyncActive`].
pub const fn is_backfill_sync_active(&self) -> bool {
matches!(self, Self::BackfillSyncActive)
}

/// Returns `true` if the state is [`OrchestratorState::Idle`].
Expand Down
8 changes: 4 additions & 4 deletions crates/engine/tree/src/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tracing::trace;
/// A trait that can download blocks on demand.
pub trait BlockDownloader: Send + Sync {
/// Handle an action.
fn on_action(&mut self, event: DownloadAction);
fn on_action(&mut self, action: DownloadAction);

/// Advance in progress requests if any
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome>;
Expand Down Expand Up @@ -65,7 +65,7 @@ where
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
{
/// Create a new instance
pub(crate) fn new(client: Client, consensus: Arc<dyn Consensus>) -> Self {
pub fn new(client: Client, consensus: Arc<dyn Consensus>) -> Self {
Self {
full_block_client: FullBlockClient::new(client, consensus),
inflight_full_block_requests: Vec::new(),
Expand Down Expand Up @@ -154,8 +154,8 @@ where
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
{
/// Handles incoming download actions.
fn on_action(&mut self, event: DownloadAction) {
match event {
fn on_action(&mut self, action: DownloadAction) {
match action {
DownloadAction::Clear => self.clear(),
DownloadAction::Download(request) => self.download(request),
}
Expand Down
6 changes: 3 additions & 3 deletions crates/engine/tree/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ where
RequestHandlerEvent::Idle => break,
RequestHandlerEvent::HandlerEvent(ev) => {
return match ev {
HandlerEvent::Pipeline(target) => {
// bubble up pipeline request
HandlerEvent::BackfillSync(target) => {
// bubble up backfill sync request request
self.downloader.on_action(DownloadAction::Clear);
Poll::Ready(HandlerEvent::Pipeline(target))
Poll::Ready(HandlerEvent::BackfillSync(target))
}
HandlerEvent::Event(ev) => {
// bubble up the event
Expand Down
Loading