Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(node): implement fraud-sub and services stopping on valid befp #233

Merged
merged 11 commits into from
Feb 29, 2024
37 changes: 37 additions & 0 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@ use celestia_types::ExtendedHeader;
use libp2p::identity::Keypair;
use libp2p::swarm::NetworkInfo;
use libp2p::{Multiaddr, PeerId};
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::warn;

use crate::daser::{Daser, DaserArgs, DaserError};
use crate::executor::spawn;
use crate::p2p::{P2p, P2pArgs, P2pError};
use crate::peer_tracker::PeerTrackerInfo;
use crate::store::{Store, StoreError};
Expand Down Expand Up @@ -77,6 +81,7 @@ where
store: Arc<S>,
syncer: Arc<Syncer<S>>,
_daser: Arc<Daser>,
on_network_compromised_task_cancellation_token: CancellationToken,
zvolin marked this conversation as resolved.
Show resolved Hide resolved
}

impl<S> Node<S>
Expand Down Expand Up @@ -110,11 +115,33 @@ where
store: store.clone(),
})?);

// spawn the task that will stop the services when the fraud is detected
let network_compromised_token = p2p.get_network_compromised_token().await?;
let on_network_compromised_task_cancellation_token = CancellationToken::new();
spawn({
let syncer = syncer.clone();
let daser = daser.clone();
let cancellation_token = on_network_compromised_task_cancellation_token.clone();
zvolin marked this conversation as resolved.
Show resolved Hide resolved
async move {
select! {
_ = cancellation_token.cancelled() => (),
_ = network_compromised_token.cancelled() => {
warn!("The network is compromised and should not be trusted.");
warn!("The node will stop synchronizing and sampling.");
warn!("You can still make some queries to the network.");
syncer.stop();
daser.stop();
}
}
}
});

Ok(Node {
p2p,
store,
syncer,
_daser: daser,
on_network_compromised_task_cancellation_token,
})
}

Expand Down Expand Up @@ -262,3 +289,13 @@ where
Ok(self.store.get_range(range).await?)
}
}

impl<S> Drop for Node<S>
where
S: Store,
{
fn drop(&mut self) {
// we have to cancel the task to drop the Arc's passed to it
self.on_network_compromised_task_cancellation_token.cancel();
}
}
100 changes: 94 additions & 6 deletions node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
//! It is a high level integration of various p2p protocols used by Celestia nodes.
//! Currently supporting:
//! - libp2p-identitfy
//! - header-sub topic on libp2p-gossipsub
//! - libp2p-kad
//! - libp2p-autonat
//! - libp2p-ping
//! - header-sub topic on libp2p-gossipsub
//! - fraud-sub topic on libp2p-gossipsub
//! - header-ex client
//! - header-ex server
//! - bitswap 1.2.0
//! - shwap - celestia's data availability protocol on top of bitswap

use std::collections::HashMap;
use std::future::poll_fn;
Expand All @@ -20,12 +23,12 @@ use std::time::Duration;
use blockstore::Blockstore;
use celestia_proto::p2p::pb::{header_request, HeaderRequest};
use celestia_tendermint_proto::Protobuf;
use celestia_types::hash::Hash;
use celestia_types::namespaced_data::NamespacedData;
use celestia_types::nmt::Namespace;
use celestia_types::row::Row;
use celestia_types::sample::Sample;
use celestia_types::ExtendedHeader;
use celestia_types::{fraud_proof::BadEncodingFraudProof, hash::Hash};
use celestia_types::{ExtendedHeader, FraudProof, Height};
use cid::Cid;
use futures::StreamExt;
use instant::Instant;
Expand All @@ -44,6 +47,7 @@ use libp2p::{
use smallvec::SmallVec;
use tokio::select;
use tokio::sync::{mpsc, oneshot, watch};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument, trace, warn};

mod header_ex;
Expand All @@ -60,8 +64,8 @@ use crate::peer_tracker::PeerTracker;
use crate::peer_tracker::PeerTrackerInfo;
use crate::store::Store;
use crate::utils::{
celestia_protocol_id, gossipsub_ident_topic, MultiaddrExt, OneshotResultSender,
OneshotResultSenderExt, OneshotSenderExt,
celestia_protocol_id, fraudsub_ident_topic, gossipsub_ident_topic, MultiaddrExt,
OneshotResultSender, OneshotResultSenderExt, OneshotSenderExt,
};

pub use crate::p2p::header_ex::HeaderExError;
Expand All @@ -84,6 +88,10 @@ pub(crate) const MAX_MH_SIZE: usize = 64;

const GET_SAMPLE_TIMEOUT: Duration = Duration::from_secs(10);

// all fraud proofs for height bigger than head height by this threshold
// will be ignored
const FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD: u64 = 20;

type Result<T, E = P2pError> = std::result::Result<T, E>;

/// Representation of all the errors that can occur when interacting with [`P2p`].
Expand Down Expand Up @@ -207,6 +215,9 @@ pub(crate) enum P2pCmd {
cid: Cid,
respond_to: OneshotResultSender<Vec<u8>, P2pError>,
},
GetNetworkCompromisedToken {
respond_to: oneshot::Sender<CancellationToken>,
},
}

impl P2p {
Expand Down Expand Up @@ -481,6 +492,19 @@ impl P2p {
})
.await
}

/// Get the cancellation token which will be cancelled when the network gets compromised.
///
/// After this token is cancelled, the network should be treated as insincere
/// and should not be trusted.
pub async fn get_network_compromised_token(&self) -> Result<CancellationToken> {
oblique marked this conversation as resolved.
Show resolved Hide resolved
let (tx, rx) = oneshot::channel();

self.send_command(P2pCmd::GetNetworkCompromisedToken { respond_to: tx })
.await?;

Ok(rx.await?)
}
}

/// Our network behaviour.
Expand All @@ -506,10 +530,13 @@ where
{
swarm: Swarm<Behaviour<B, S>>,
header_sub_topic_hash: TopicHash,
bad_encoding_fraud_sub_topic: TopicHash,
cmd_rx: mpsc::Receiver<P2pCmd>,
peer_tracker: Arc<PeerTracker>,
header_sub_watcher: watch::Sender<Option<ExtendedHeader>>,
bitswap_queries: HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
network_compromised_token: CancellationToken,
store: Arc<S>,
}

impl<B, S> Worker<B, S>
Expand All @@ -534,7 +561,9 @@ where
));

let header_sub_topic = gossipsub_ident_topic(&args.network_id, "/header-sub/v0.0.1");
let gossipsub = init_gossipsub(&args, [&header_sub_topic])?;
let bad_encoding_fraud_sub_topic =
fraudsub_ident_topic(BadEncodingFraudProof::TYPE, &args.network_id);
let gossipsub = init_gossipsub(&args, [&header_sub_topic, &bad_encoding_fraud_sub_topic])?;

let kademlia = init_kademlia(&args)?;
let bitswap = init_bitswap(args.blockstore, args.store.clone(), &args.network_id)?;
Expand Down Expand Up @@ -572,10 +601,13 @@ where
Ok(Worker {
cmd_rx,
swarm,
bad_encoding_fraud_sub_topic: bad_encoding_fraud_sub_topic.hash(),
header_sub_topic_hash: header_sub_topic.hash(),
peer_tracker,
header_sub_watcher,
bitswap_queries: HashMap::new(),
network_compromised_token: CancellationToken::new(),
store: args.store,
})
}

Expand Down Expand Up @@ -712,6 +744,9 @@ where
P2pCmd::GetShwapCid { cid, respond_to } => {
self.on_get_shwap_cid(cid, respond_to);
}
P2pCmd::GetNetworkCompromisedToken { respond_to } => {
respond_to.maybe_send(self.network_compromised_token.child_token())
}
}

Ok(())
Expand Down Expand Up @@ -764,6 +799,9 @@ where

let acceptance = if message.topic == self.header_sub_topic_hash {
self.on_header_sub_message(&message.data[..]).await
} else if message.topic == self.bad_encoding_fraud_sub_topic {
self.on_bad_encoding_fraud_sub_message(&message.data[..])
.await
} else {
trace!("Unhandled gossipsub message");
gossipsub::MessageAcceptance::Ignore
Expand Down Expand Up @@ -899,6 +937,52 @@ where
gossipsub::MessageAcceptance::Ignore
}
}

#[instrument(skip_all)]
async fn on_bad_encoding_fraud_sub_message(
&mut self,
data: &[u8],
) -> gossipsub::MessageAcceptance {
let Ok(befp) = BadEncodingFraudProof::decode(data) else {
trace!("Malformed or invalid bad encoding fraud proof from froud-sub");
// TODO: celestia blacklists here, should we too?
zvolin marked this conversation as resolved.
Show resolved Hide resolved
return gossipsub::MessageAcceptance::Reject;
};

let height = befp.height().value();
let current_height = if let Some(network_height) = network_head(&self.header_sub_watcher) {
network_height.value()
} else if let Ok(local_head) = self.store.get_head().await {
local_head.height().value()
} else {
// we aren't tracking the network and have uninitialized store
return gossipsub::MessageAcceptance::Ignore;
};

if height > current_height + FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD {
// does this threshold make any sense if we're gonna ignore it anyway
// since we won't have the header
return gossipsub::MessageAcceptance::Ignore;
}
oblique marked this conversation as resolved.
Show resolved Hide resolved

let hash = befp.header_hash();
let Ok(header) = self.store.get_by_hash(&hash).await else {
// we can't verify the proof without a header
// TODO: should we then store it and wait for the height? celestia doesn't
return gossipsub::MessageAcceptance::Ignore;
zvolin marked this conversation as resolved.
Show resolved Hide resolved
};

if let Err(e) = befp.validate(&header) {
trace!("Received invalid bad encoding fraud proof: {e}");
zvolin marked this conversation as resolved.
Show resolved Hide resolved
return gossipsub::MessageAcceptance::Reject;
}

warn!("Received a valid bad encoding fraud proof");
// trigger cancellation for all services
self.network_compromised_token.cancel();

gossipsub::MessageAcceptance::Accept
}
}

/// Awaits at least one channel from the `bitswap_queries` to close.
Expand Down Expand Up @@ -1009,3 +1093,7 @@ where
.client_set_send_dont_have(false)
.build())
}

fn network_head(watcher: &watch::Sender<Option<ExtendedHeader>>) -> Option<Height> {
watcher.borrow().as_ref().map(|header| header.height())
}
7 changes: 7 additions & 0 deletions node/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ pub(crate) fn gossipsub_ident_topic(network: &str, topic: &str) -> IdentTopic {
IdentTopic::new(s)
}

pub(crate) fn fraudsub_ident_topic(proof_type: &str, network: &str) -> IdentTopic {
let proof_type = proof_type.trim_matches('/');
let network = network.trim_matches('/');
let s = format!("/{proof_type}/fraud-sub/{network}/v0.0.1");
IdentTopic::new(s)
}

pub(crate) type OneshotResultSender<T, E> = oneshot::Sender<Result<T, E>>;

pub(crate) trait OneshotSenderExt<T>
Expand Down
Loading
Loading