Skip to content

Commit

Permalink
fix: Add missing finalize option to N2C (txpipe#314)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored and kodemill committed Jun 24, 2022
1 parent d0fde3c commit 1321bbb
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/bin/oura/dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {
since: None,
intersect,
retry_policy: None,
finalize: None,
}),
};

Expand Down
1 change: 1 addition & 0 deletions src/bin/oura/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {
since: None,
intersect,
retry_policy: None,
finalize: None,
}),
};

Expand Down
18 changes: 17 additions & 1 deletion src/sources/n2c/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ use pallas::network::{
multiplexer::StdChannel,
};

use crate::{mapper::EventWriter, sources::n2c::blocks::CborHolder, Error};
use crate::{
mapper::EventWriter,
sources::{n2c::blocks::CborHolder, should_finalize, FinalizeConfig},
Error,
};

use super::blocks::MultiEraBlock;

Expand All @@ -14,6 +18,8 @@ struct ChainObserver {
min_depth: usize,
blocks: HashMap<Point, CborHolder>,
event_writer: EventWriter,
finalize_config: Option<FinalizeConfig>,
block_count: u64,
}

// workaround to put a stop on excessive debug requirement coming from Pallas
Expand Down Expand Up @@ -69,6 +75,13 @@ impl<'b> chainsync::Observer<chainsync::BlockContent> for ChainObserver {
.event_writer
.crawl_shelley_with_cbor(&model, block.cbor(), era.into())?,
};

self.block_count += 1;

// evaluate if we should finalize the thread according to config
if should_finalize(&self.finalize_config, &point, self.block_count) {
return Ok(chainsync::Continuation::DropOut);
}
}

log_buffer_state(&self.chain_buffer);
Expand Down Expand Up @@ -111,12 +124,15 @@ pub(crate) fn observe_forever(
event_writer: EventWriter,
known_points: Option<Vec<Point>>,
min_depth: usize,
finalize_config: Option<FinalizeConfig>,
) -> Result<(), Error> {
let observer = ChainObserver {
chain_buffer: Default::default(),
blocks: HashMap::new(),
min_depth,
event_writer,
block_count: 0,
finalize_config,
};

let agent = chainsync::BlockConsumer::initial(known_points, observer);
Expand Down
7 changes: 5 additions & 2 deletions src/sources/n2c/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
pipelining::{new_inter_stage_channel, PartialBootstrapResult, SourceProvider},
sources::{
common::{AddressArg, MagicArg, PointArg},
define_start_point, setup_multiplexer, IntersectArg, RetryPolicy,
define_start_point, setup_multiplexer, FinalizeConfig, IntersectArg, RetryPolicy,
},
utils::{ChainWellKnownInfo, WithUtils},
Error,
Expand Down Expand Up @@ -49,6 +49,8 @@ pub struct Config {
pub min_depth: usize,

pub retry_policy: Option<RetryPolicy>,

pub finalize: Option<FinalizeConfig>,
}

fn do_handshake(channel: &mut StdChannel, magic: u64) -> Result<(), Error> {
Expand Down Expand Up @@ -98,8 +100,9 @@ impl SourceProvider for WithUtils<Config> {
log::info!("starting chain sync from: {:?}", &known_points);

let min_depth = self.inner.min_depth;
let finalize = self.inner.finalize.clone();
let handle = std::thread::spawn(move || {
observe_forever(cs_channel, writer, known_points, min_depth)
observe_forever(cs_channel, writer, known_points, min_depth, finalize)
.expect("chainsync loop failed");
});

Expand Down

0 comments on commit 1321bbb

Please sign in to comment.