Skip to content

Commit

Permalink
feat(node): implement fraud-sub and services stopping on valid befp (#…
Browse files Browse the repository at this point in the history
…233)

* feat(node): implement fraud-sub and services stopping on valid befp

* add missing protocols

* cleanup the task that stops services on drop

* add a comment on token cancelling

* imports reorg

* add peers blacklisting

* rename the helper fn

* Update node/src/p2p.rs

* rename the node's cancellation token

* remove the network compromised token from pub api

---------

Signed-off-by: Maciej Zwoliński <[email protected]>
Co-authored-by: Mikołaj Florkiewicz <[email protected]>
  • Loading branch information
zvolin and fl0rek authored Feb 29, 2024
1 parent cd14d8b commit e7fc4be
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 15 deletions.
37 changes: 37 additions & 0 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@ use celestia_types::ExtendedHeader;
use libp2p::identity::Keypair;
use libp2p::swarm::NetworkInfo;
use libp2p::{Multiaddr, PeerId};
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::warn;

use crate::daser::{Daser, DaserArgs, DaserError};
use crate::executor::spawn;
use crate::p2p::{P2p, P2pArgs, P2pError};
use crate::peer_tracker::PeerTrackerInfo;
use crate::store::{Store, StoreError};
Expand Down Expand Up @@ -77,6 +81,7 @@ where
store: Arc<S>,
syncer: Arc<Syncer<S>>,
_daser: Arc<Daser>,
tasks_cancellation_token: CancellationToken,
}

impl<S> Node<S>
Expand Down Expand Up @@ -110,11 +115,33 @@ where
store: store.clone(),
})?);

// spawn the task that will stop the services when the fraud is detected
let network_compromised_token = p2p.get_network_compromised_token().await?;
let tasks_cancellation_token = CancellationToken::new();
spawn({
let syncer = syncer.clone();
let daser = daser.clone();
let tasks_cancellation_token = tasks_cancellation_token.child_token();
async move {
select! {
_ = tasks_cancellation_token.cancelled() => (),
_ = network_compromised_token.cancelled() => {
warn!("The network is compromised and should not be trusted.");
warn!("The node will stop synchronizing and sampling.");
warn!("You can still make some queries to the network.");
syncer.stop();
daser.stop();
}
}
}
});

Ok(Node {
p2p,
store,
syncer,
_daser: daser,
tasks_cancellation_token,
})
}

Expand Down Expand Up @@ -262,3 +289,13 @@ where
Ok(self.store.get_range(range).await?)
}
}

impl<S: Store> Drop for Node<S> {
    /// Cancels the node's task cancellation token when the node goes away.
    fn drop(&mut self) {
        // The background task spawned while constructing the node owns clones of
        // the service `Arc`s and waits on a child of this token. Cancelling here
        // lets that task finish, which in turn releases those `Arc`s.
        self.tasks_cancellation_token.cancel();
    }
}
111 changes: 102 additions & 9 deletions node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
//! It is a high level integration of various p2p protocols used by Celestia nodes.
//! Currently supporting:
//! - libp2p-identify
//! - header-sub topic on libp2p-gossipsub
//! - libp2p-kad
//! - libp2p-autonat
//! - libp2p-ping
//! - header-sub topic on libp2p-gossipsub
//! - fraud-sub topic on libp2p-gossipsub
//! - header-ex client
//! - header-ex server
//! - bitswap 1.2.0
//! - shwap - celestia's data availability protocol on top of bitswap
use std::collections::HashMap;
use std::future::poll_fn;
Expand All @@ -20,12 +23,12 @@ use std::time::Duration;
use blockstore::Blockstore;
use celestia_proto::p2p::pb::{header_request, HeaderRequest};
use celestia_tendermint_proto::Protobuf;
use celestia_types::hash::Hash;
use celestia_types::namespaced_data::NamespacedData;
use celestia_types::nmt::Namespace;
use celestia_types::row::Row;
use celestia_types::sample::Sample;
use celestia_types::ExtendedHeader;
use celestia_types::{fraud_proof::BadEncodingFraudProof, hash::Hash};
use celestia_types::{ExtendedHeader, FraudProof, Height};
use cid::Cid;
use futures::StreamExt;
use instant::Instant;
Expand All @@ -44,6 +47,7 @@ use libp2p::{
use smallvec::SmallVec;
use tokio::select;
use tokio::sync::{mpsc, oneshot, watch};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument, trace, warn};

mod header_ex;
Expand All @@ -60,8 +64,8 @@ use crate::peer_tracker::PeerTracker;
use crate::peer_tracker::PeerTrackerInfo;
use crate::store::Store;
use crate::utils::{
celestia_protocol_id, gossipsub_ident_topic, MultiaddrExt, OneshotResultSender,
OneshotResultSenderExt, OneshotSenderExt,
celestia_protocol_id, fraudsub_ident_topic, gossipsub_ident_topic, MultiaddrExt,
OneshotResultSender, OneshotResultSenderExt, OneshotSenderExt,
};

pub use crate::p2p::header_ex::HeaderExError;
Expand All @@ -84,6 +88,10 @@ pub(crate) const MAX_MH_SIZE: usize = 64;

const GET_SAMPLE_TIMEOUT: Duration = Duration::from_secs(10);

/// How far above the current head height a fraud proof's height may be.
///
/// Fraud proofs whose height exceeds the head height by more than this
/// threshold are ignored.
const FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD: u64 = 20;

type Result<T, E = P2pError> = std::result::Result<T, E>;

/// Representation of all the errors that can occur when interacting with [`P2p`].
Expand Down Expand Up @@ -207,6 +215,9 @@ pub(crate) enum P2pCmd {
cid: Cid,
respond_to: OneshotResultSender<Vec<u8>, P2pError>,
},
GetNetworkCompromisedToken {
respond_to: oneshot::Sender<CancellationToken>,
},
}

impl P2p {
Expand Down Expand Up @@ -481,6 +492,19 @@ impl P2p {
})
.await
}

/// Request the token that signals a compromised network.
///
/// The returned token gets cancelled once the network is found to be compromised.
/// From that point on the network must be considered dishonest and must not be
/// trusted.
///
/// # Errors
///
/// Fails if the command cannot be sent to the worker or if the worker drops
/// the response channel before answering.
pub(crate) async fn get_network_compromised_token(&self) -> Result<CancellationToken> {
    let (respond_to, response) = oneshot::channel();
    let cmd = P2pCmd::GetNetworkCompromisedToken { respond_to };

    self.send_command(cmd).await?;

    let token = response.await?;
    Ok(token)
}
}

/// Our network behaviour.
Expand All @@ -506,10 +530,13 @@ where
{
swarm: Swarm<Behaviour<B, S>>,
header_sub_topic_hash: TopicHash,
bad_encoding_fraud_sub_topic: TopicHash,
cmd_rx: mpsc::Receiver<P2pCmd>,
peer_tracker: Arc<PeerTracker>,
header_sub_watcher: watch::Sender<Option<ExtendedHeader>>,
bitswap_queries: HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
network_compromised_token: CancellationToken,
store: Arc<S>,
}

impl<B, S> Worker<B, S>
Expand All @@ -534,7 +561,9 @@ where
));

let header_sub_topic = gossipsub_ident_topic(&args.network_id, "/header-sub/v0.0.1");
let gossipsub = init_gossipsub(&args, [&header_sub_topic])?;
let bad_encoding_fraud_sub_topic =
fraudsub_ident_topic(BadEncodingFraudProof::TYPE, &args.network_id);
let gossipsub = init_gossipsub(&args, [&header_sub_topic, &bad_encoding_fraud_sub_topic])?;

let kademlia = init_kademlia(&args)?;
let bitswap = init_bitswap(args.blockstore, args.store.clone(), &args.network_id)?;
Expand Down Expand Up @@ -572,10 +601,13 @@ where
Ok(Worker {
cmd_rx,
swarm,
bad_encoding_fraud_sub_topic: bad_encoding_fraud_sub_topic.hash(),
header_sub_topic_hash: header_sub_topic.hash(),
peer_tracker,
header_sub_watcher,
bitswap_queries: HashMap::new(),
network_compromised_token: CancellationToken::new(),
store: args.store,
})
}

Expand Down Expand Up @@ -712,6 +744,9 @@ where
P2pCmd::GetShwapCid { cid, respond_to } => {
self.on_get_shwap_cid(cid, respond_to);
}
P2pCmd::GetNetworkCompromisedToken { respond_to } => {
respond_to.maybe_send(self.network_compromised_token.child_token())
}
}

Ok(())
Expand Down Expand Up @@ -759,16 +794,21 @@ where
return;
};

// We may discovered a new peer
self.peer_maybe_discovered(peer);

let acceptance = if message.topic == self.header_sub_topic_hash {
self.on_header_sub_message(&message.data[..]).await
} else if message.topic == self.bad_encoding_fraud_sub_topic {
self.on_bad_encoding_fraud_sub_message(&message.data[..], &peer)
.await
} else {
trace!("Unhandled gossipsub message");
gossipsub::MessageAcceptance::Ignore
};

if !matches!(acceptance, gossipsub::MessageAcceptance::Reject) {
// We may have discovered a new peer
self.peer_maybe_discovered(peer);
}

let _ = self
.swarm
.behaviour_mut()
Expand Down Expand Up @@ -899,6 +939,55 @@ where
gossipsub::MessageAcceptance::Ignore
}
}

/// Handles a message received on the bad encoding fraud-sub topic.
///
/// `data` is decoded as a [`BadEncodingFraudProof`] and validated against the
/// header with the matching hash taken from the store. The result is:
/// - `Reject`, with `peer` blacklisted in gossipsub, when the proof cannot be
///   decoded or fails validation,
/// - `Ignore` when no head height is known, when the proof is too far ahead of
///   the head, or when the referenced header is not in the store,
/// - `Accept` when the proof is valid; the network compromised token is
///   cancelled in that case.
#[instrument(skip_all)]
async fn on_bad_encoding_fraud_sub_message(
    &mut self,
    data: &[u8],
    peer: &PeerId,
) -> gossipsub::MessageAcceptance {
    let befp = match BadEncodingFraudProof::decode(data) {
        Ok(proof) => proof,
        Err(_) => {
            trace!("Malformed bad encoding fraud proof from {peer}");
            self.swarm.behaviour_mut().gossipsub.blacklist_peer(peer);
            return gossipsub::MessageAcceptance::Reject;
        }
    };

    let proof_height = befp.height().value();

    // Prefer the head announced by the network; fall back to the local store's head.
    let head_height = match network_head_height(&self.header_sub_watcher) {
        Some(network_height) => network_height.value(),
        None => match self.store.get_head().await {
            Ok(local_head) => local_head.height().value(),
            // Not tracking the network yet and the store is uninitialized.
            Err(_) => return gossipsub::MessageAcceptance::Ignore,
        },
    };

    // Open question: is this threshold useful at all? A proof that far ahead
    // would be ignored below anyway because its header won't be in the store.
    let highest_accepted = head_height + FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD;
    if proof_height > highest_accepted {
        return gossipsub::MessageAcceptance::Ignore;
    }

    let hash = befp.header_hash();
    let header = match self.store.get_by_hash(&hash).await {
        Ok(header) => header,
        // Without the header the proof cannot be verified.
        // TODO: should the proof be stored until its height is reached? celestia doesn't
        Err(_) => return gossipsub::MessageAcceptance::Ignore,
    };

    match befp.validate(&header) {
        Ok(_) => {
            warn!("Received a valid bad encoding fraud proof");
            // Cancelling this token triggers the shutdown of all services.
            self.network_compromised_token.cancel();

            gossipsub::MessageAcceptance::Accept
        }
        Err(e) => {
            trace!("Received invalid bad encoding fraud proof from {peer}: {e}");
            self.swarm.behaviour_mut().gossipsub.blacklist_peer(peer);

            gossipsub::MessageAcceptance::Reject
        }
    }
}
}

/// Awaits at least one channel from the `bitswap_queries` to close.
Expand Down Expand Up @@ -1009,3 +1098,7 @@ where
.client_set_send_dont_have(false)
.build())
}

/// Returns the height of the header currently held by `watcher`.
///
/// Yields `None` when the watch channel holds no header.
fn network_head_height(watcher: &watch::Sender<Option<ExtendedHeader>>) -> Option<Height> {
    let current = watcher.borrow();
    let header = current.as_ref()?;

    Some(header.height())
}
7 changes: 7 additions & 0 deletions node/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ pub(crate) fn gossipsub_ident_topic(network: &str, topic: &str) -> IdentTopic {
IdentTopic::new(s)
}

/// Builds the fraud-sub gossipsub topic for the given proof type and network.
///
/// Leading and trailing `/` are stripped from both arguments, so the resulting
/// topic always has the form `/<proof_type>/fraud-sub/<network>/v0.0.1`.
pub(crate) fn fraudsub_ident_topic(proof_type: &str, network: &str) -> IdentTopic {
    let [proof_type, network] = [proof_type, network].map(|part| part.trim_matches('/'));

    IdentTopic::new(format!("/{proof_type}/fraud-sub/{network}/v0.0.1"))
}

pub(crate) type OneshotResultSender<T, E> = oneshot::Sender<Result<T, E>>;

pub(crate) trait OneshotSenderExt<T>
Expand Down
Loading

0 comments on commit e7fc4be

Please sign in to comment.