Skip to content

Commit

Permalink
fix: use correct cursor on chainsync roll forward #2
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Dec 15, 2021
1 parent 2f560cb commit 23d76bf
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 57 deletions.
41 changes: 21 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 16 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,32 @@ authors = [


[dependencies]
pallas = "0.3.1"
pallas = "0.3.3"
# pallas = { path = "../pallas/pallas" }
hex = "0.4.3"
net2 = "0.2.37"
bech32 = "0.8.1"
clap = "2.33.3"
log = "0.4.14"
env_logger = "0.9.0"
crossterm = "0.20"
tui = { version = "0.16", default-features = false, features = ['crossterm'] }
merge = "0.1.0"
config = "0.11.0"
serde = "1.0.130"
serde_derive = "1.0.130"
kafka = "0.8.0"
serde_json = "1.0.72"
bech32 = "0.8.1"
serde_derive = "1.0.130"

# feature: tuisink
tui = { version = "0.16", optional = true, default-features = false, features = ['crossterm'] }

# feature: kafkasink
kafka = { version = "0.8.0", optional = true }

# required for CI to complete successfully
openssl = { version = "0.10", features = ["vendored"] }
openssl = { version = "0.10", optional = true, features = ["vendored"] }
minicbor = "0.12.0"

[features]
default = []
kafkasink = ["kafka", "openssl"]
tuisink = ["tui"]
8 changes: 7 additions & 1 deletion src/bin/oura/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use clap::{value_t, ArgMatches};
use config::{Config, ConfigError, Environment, File};
use log::debug;
use oura::framework::{BootstrapResult, Event, SinkConfig, SourceConfig};
use oura::sinks::kafka::Config as KafkaConfig;
use oura::sinks::terminal::Config as TerminalConfig;
use oura::sources::n2c::Config as N2CConfig;
use oura::sources::n2n::Config as N2NConfig;
use serde_derive::Deserialize;

#[cfg(kafkasink)]
use oura::sinks::kafka::Config as KafkaConfig;

use crate::Error;

#[derive(Debug, Deserialize)]
Expand All @@ -32,13 +34,17 @@ impl SourceConfig for Source {
#[serde(tag = "type")]
enum Sink {
Terminal(TerminalConfig),

#[cfg(kafkasink)]
Kafka(KafkaConfig),
}

impl SinkConfig for Sink {
fn bootstrap(&self, input: Receiver<Event>) -> BootstrapResult {
match self {
Sink::Terminal(c) => c.bootstrap(input),

#[cfg(kafkasink)]
Sink::Kafka(c) => c.bootstrap(input),
}
}
Expand Down
6 changes: 5 additions & 1 deletion src/sinks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
pub mod kafka;
pub mod terminal;

#[cfg(tuisink)]
pub mod tui;

#[cfg(kafkasink)]
pub mod kafka;
60 changes: 39 additions & 21 deletions src/sources/n2c/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ mod setup;

pub use setup::*;

use log::error;
use log::{error, info};

use pallas::{
ledger::alonzo::{BlockWrapper, Fragment},
ledger::alonzo::{crypto, Block, BlockWrapper, Fragment},
ouroboros::network::{
chainsync::{BlockBody, ClientConsumer, Observer, Tip},
machines::{primitives::Point, run_agent},
chainsync::{BlockLike, Consumer, Observer, Tip},
machines::{
primitives::Point, run_agent, DecodePayload, EncodePayload, PayloadDecoder,
PayloadEncoder,
},
multiplexer::Channel,
},
};
Expand All @@ -20,27 +23,42 @@ use crate::{
mapping::ToHex,
};

#[derive(Debug)]
pub struct Content(Block);

impl EncodePayload for Content {
fn encode_payload(&self, _e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
todo!()
}
}

impl DecodePayload for Content {
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.tag()?;
let bytes = d.bytes()?;
let BlockWrapper(_, block) = BlockWrapper::decode_fragment(bytes)?;
Ok(Content(block))
}
}

impl BlockLike for Content {
fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>> {
let hash = crypto::hash_block_header(&self.0.header)?;
Ok(Point(self.0.header.header_body.slot, Vec::from(hash)))
}
}

#[derive(Debug)]
pub struct ChainObserver(EventWriter);

impl Observer<BlockBody> for ChainObserver {
impl Observer<Content> for ChainObserver {
fn on_block(
&self,
_cursor: &Option<Point>,
content: &BlockBody,
content: &Content,
) -> Result<(), Box<dyn std::error::Error>> {
let BlockBody(bytes) = content;
let maybe_block = BlockWrapper::decode_fragment(&bytes[..]);

match maybe_block {
Ok(BlockWrapper(_, block)) => {
block.write_events(&self.0)?;
}
Err(err) => {
log::error!("{:?}", err);
log::info!("{}", hex::encode(bytes));
}
};
let Content(block) = content;
block.write_events(&self.0)?;

Ok(())
}
Expand All @@ -53,20 +71,20 @@ impl Observer<BlockBody> for ChainObserver {
}

fn on_intersect_found(&self, point: &Point, _tip: &Tip) -> Result<(), Error> {
println!("intersect found {:#?}", point);
info!("intersect found {:?}", point);
Ok(())
}

fn on_tip_reached(&self) -> Result<(), Error> {
println!("tip reached");
info!("tip reached");
Ok(())
}
}

fn observe_forever(mut channel: Channel, from: Point, output: Sender<Event>) -> Result<(), Error> {
let writer = EventWriter::new(output);
let observer = ChainObserver(writer);
let agent = ClientConsumer::initial(vec![from], observer);
let agent = Consumer::<Content, _>::initial(vec![from], observer);
let agent = run_agent(agent, &mut channel)?;
error!("chainsync agent final state: {:?}", agent.state);

Expand Down
Loading

0 comments on commit 23d76bf

Please sign in to comment.