Skip to content

Commit

Permalink
feat(node): Generate syncing related events
Browse files Browse the repository at this point in the history
  • Loading branch information
oblique committed Jun 26, 2024
1 parent e932d3e commit 8ae9fdf
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 30 deletions.
9 changes: 4 additions & 5 deletions cli/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub(crate) async fn run(args: Params) -> Result<()> {
info!("Initialised store, present headers: {stored_ranges}");
}

let node = Node::new(NodeConfig {
let (_node, mut events) = Node::new_subscribed(NodeConfig {
network_id,
genesis_hash,
p2p_local_keypair,
Expand All @@ -80,14 +80,13 @@ pub(crate) async fn run(args: Params) -> Result<()> {
.await
.context("Failed to start node")?;

node.wait_connected_trusted().await?;
let mut events = node.event_subscriber();

// We have nothing else to do, but we want to keep main alive
while let Ok(ev) = events.recv().await {
match ev.event {
// Skip noisy events
NodeEvent::ShareSamplingResult { .. } => continue,
event @ (NodeEvent::FatalDaserError { .. } | NodeEvent::NetworkCompromised) => {
warn!("{event}");
}
event => info!("{event}"),
}
}
Expand Down
23 changes: 15 additions & 8 deletions node-wasm/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ struct NodeWorker {
}

impl NodeWorker {
async fn new(config: WasmNodeConfig) -> Result<Self> {
async fn new(events_channel_name: &str, config: WasmNodeConfig) -> Result<Self> {
let config = config.into_node_config().await?;

if let Ok(store_height) = config.store.head_height().await {
Expand All @@ -64,18 +64,16 @@ impl NodeWorker {
info!("Initialised new empty store");
}

let node = Node::new(config).await?;
let (node, events_sub) = Node::new_subscribed(config).await?;

let events_channel_name = format!("NodeEventChannel-{}", get_crypto()?.random_uuid());
let events_channel = BroadcastChannel::new(&events_channel_name)
let events_channel = BroadcastChannel::new(events_channel_name)
.context("Failed to allocate BroadcastChannel")?;

let events_sub = node.event_subscriber();
spawn_local(event_forwarder_task(events_sub, events_channel));

Ok(Self {
node,
events_channel_name,
events_channel_name: events_channel_name.to_owned(),
})
}

Expand Down Expand Up @@ -238,9 +236,10 @@ impl NodeWorker {
}

#[wasm_bindgen]
pub async fn run_worker(queued_events: Vec<MessageEvent>) {
pub async fn run_worker(queued_events: Vec<MessageEvent>) -> Result<()> {
info!("Entered run_worker");
let (tx, mut rx) = mpsc::channel(WORKER_MESSAGE_SERVER_INCOMING_QUEUE_LENGTH);
let events_channel_name = format!("NodeEventChannel-{}", get_crypto()?.random_uuid());

let mut message_server: Box<dyn MessageServer> = if SharedWorker::is_worker_type() {
Box::new(SharedWorkerMessageServer::new(tx.clone(), queued_events))
Expand All @@ -265,8 +264,14 @@ pub async fn run_worker(queued_events: Vec<MessageEvent>) {
NodeCommand::IsRunning => {
message_server.respond_to(client_id, WorkerResponse::IsRunning(false));
}
NodeCommand::GetEventsChannelName => {
message_server.respond_to(
client_id,
WorkerResponse::EventsChannelName(events_channel_name.clone()),
);
}
NodeCommand::StartNode(config) => {
match NodeWorker::new(config).await {
match NodeWorker::new(&events_channel_name, config).await {
Ok(node) => {
worker = Some(node);
message_server
Expand All @@ -293,6 +298,8 @@ pub async fn run_worker(queued_events: Vec<MessageEvent>) {
}

info!("Channel to WorkerMessageServer closed, exiting the SharedWorker");

Ok(())
}

async fn event_forwarder_task(mut events_sub: EventSubscriber, events_channel: BroadcastChannel) {
Expand Down
49 changes: 49 additions & 0 deletions node/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ impl EventChannel {
rx: self.tx.subscribe(),
}
}

/// Returns if there are any active subscribers or not.
pub fn has_subscribers(&self) -> bool {
self.tx.receiver_count() > 0
}
}

impl Default for EventChannel {
Expand All @@ -94,6 +99,10 @@ impl EventPublisher {
file_line: location.line(),
});
}

pub(crate) fn has_subscribers(&self) -> bool {
self.tx.receiver_count() > 0
}
}

impl EventSubscriber {
Expand Down Expand Up @@ -219,6 +228,23 @@ pub enum NodeEvent {
/// A human readable error.
error: String,
},

NewSubjectiveHead {
height: u64,
},

AddedHeaderFromHeaderSub {
height: u64,
},

FetchingHeadHeader,

FetchingHeaders {
from_height: u64,
to_height: u64,
},

NetworkCompromised,
}

impl fmt::Display for NodeEvent {
Expand Down Expand Up @@ -272,6 +298,29 @@ impl fmt::Display for NodeEvent {
NodeEvent::FatalDaserError { error } => {
write!(f, "Daser stopped because of a fatal error: {error}")
}
NodeEvent::NewSubjectiveHead { height } => {
write!(f, "New subjective head: {height}")
}
NodeEvent::AddedHeaderFromHeaderSub { height } => {
write!(f, "Added header {height} from HeaderSub")
}
NodeEvent::FetchingHeadHeader => {
write!(f, "Fetching header of network head block")
}
NodeEvent::FetchingHeaders {
from_height,
to_height,
} => {
if from_height == to_height {
write!(f, "Featching header of {from_height} block")
} else {
write!(f, "Featching headers of {from_height}-{to_height} blocks")
}
}
NodeEvent::NetworkCompromised => {
write!(f, "The network is compromised and should not be trusted. ")?;
write!(f, "Node stopped synchronizing and sampling, but you can still make some queries to the network.")
}
}
}
}
Expand Down
36 changes: 30 additions & 6 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tokio_util::sync::CancellationToken;
use tracing::warn;

use crate::daser::{Daser, DaserArgs, DaserError};
use crate::events::{EventChannel, EventSubscriber};
use crate::events::{EventChannel, EventSubscriber, NodeEvent};
use crate::executor::spawn;
use crate::p2p::{P2p, P2pArgs, P2pError};
use crate::peer_tracker::PeerTrackerInfo;
Expand Down Expand Up @@ -92,10 +92,23 @@ where
{
/// Creates and starts a new celestia node with a given config.
pub async fn new<B>(config: NodeConfig<B, S>) -> Result<Self>
where
B: Blockstore + 'static,
{
let (node, _) = Node::new_subscribed(config).await?;
Ok(node)
}

/// Creates and starts a new celestia node with a given config.
///
/// Returns `Node` alogn with `EventSubscriber`. Use this to avoid missing any
/// events that will be generated on the construction of the node.
pub async fn new_subscribed<B>(config: NodeConfig<B, S>) -> Result<(Self, EventSubscriber)>
where
B: Blockstore + 'static,
{
let event_channel = EventChannel::new();
let event_sub = event_channel.subscribe();
let store = Arc::new(config.store);

let p2p = Arc::new(P2p::start(P2pArgs {
Expand All @@ -111,6 +124,7 @@ where
let syncer = Arc::new(Syncer::start(SyncerArgs {
store: store.clone(),
p2p: p2p.clone(),
event_pub: event_channel.publisher(),
})?);

let daser = Arc::new(Daser::start(DaserArgs {
Expand All @@ -122,32 +136,42 @@ where
// spawn the task that will stop the services when the fraud is detected
let network_compromised_token = p2p.get_network_compromised_token().await?;
let tasks_cancellation_token = CancellationToken::new();

spawn({
let syncer = syncer.clone();
let daser = daser.clone();
let tasks_cancellation_token = tasks_cancellation_token.child_token();
let event_pub = event_channel.publisher();

async move {
select! {
_ = tasks_cancellation_token.cancelled() => (),
_ = network_compromised_token.cancelled() => {
warn!("The network is compromised and should not be trusted.");
warn!("The node will stop synchronizing and sampling.");
warn!("You can still make some queries to the network.");
syncer.stop();
daser.stop();

if event_pub.has_subscribers() {
event_pub.send(NodeEvent::NetworkCompromised);
} else {
// This is a very important message and we want to log it if user
// does not consume our events.
warn!("{}", NodeEvent::NetworkCompromised);
}
}
}
}
});

Ok(Node {
let node = Node {
event_channel,
p2p,
store,
syncer,
_daser: daser,
tasks_cancellation_token,
})
};

Ok((node, event_sub))
}

/// Returns a new `EventSubscriber`.
Expand Down
Loading

0 comments on commit 8ae9fdf

Please sign in to comment.