Skip to content

Commit

Permalink
network: Upgrade litep2p to v0.6.2 (paritytech#4799)
Browse files Browse the repository at this point in the history
This PR upgrades `litep2p` to the latest version and includes the two
fixes:

1. Enables incoming DHT record validation with `litep2p` network
backend.
2. Sets `TCP_NODELAY` flag on TCP & WS sockets in `litep2p` backend, as
it is currently done in `libp2p` backend.

---------

Signed-off-by: Oliver Tale-Yazdi <[email protected]>
Co-authored-by: Oliver Tale-Yazdi <[email protected]>
  • Loading branch information
2 people authored and TarekkMA committed Aug 2, 2024
1 parent 335dab9 commit bc48cc8
Show file tree
Hide file tree
Showing 8 changed files with 539 additions and 440 deletions.
827 changes: 413 additions & 414 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ linked-hash-map = { version = "0.5.4" }
linked_hash_set = { version = "0.1.4" }
linregress = { version = "0.5.1" }
lite-json = { version = "0.2.0", default-features = false }
litep2p = { version = "0.5.0" }
litep2p = { version = "0.6.2" }
log = { version = "0.4.21", default-features = false }
macro_magic = { version = "0.5.0" }
maplit = { version = "1.0.2" }
Expand Down
24 changes: 24 additions & 0 deletions prdoc/pr_4799.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: "network: Upgrade `litep2p` to v0.6.0"

doc:
- audience: Node Operator
description: |
This PR brings the latest `litep2p` v0.6.0 to polkadot-sdk with stability improvements,
security fixes, and performance optimizations.

Specifically:
- Incoming DHT records are now validated also with experimental litep2p network backend.
- Performance of TCP & WebSocket connections improved by setting `TCP_NODELAY` flag.
- Stability of secondary connection establishment improved.
- Multiple possible panics in litep2p library eliminated.

crates:
- name: sc-authority-discovery
bump: patch
- name: sc-network
bump: patch
- name: sc-network-types
bump: patch
2 changes: 1 addition & 1 deletion substrate/client/authority-discovery/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ where

debug!(
target: LOG_TARGET,
"Authority DHT record peer_id='{local_peer_id}' addresses='{addresses:?}'",
"Publishing authority DHT record peer_id='{local_peer_id}' addresses='{addresses:?}'",
);

// The address must include the local peer id.
Expand Down
50 changes: 46 additions & 4 deletions substrate/client/network/src/litep2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ use litep2p::{
libp2p::{
identify::{Config as IdentifyConfig, IdentifyEvent},
kademlia::{
Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder, KademliaEvent,
KademliaHandle, QueryId, Quorum, Record, RecordKey, RecordsType,
Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder,
IncomingRecordValidationMode, KademliaEvent, KademliaHandle, QueryId, Quorum,
Record, RecordKey, RecordsType,
},
ping::{Config as PingConfig, PingEvent},
},
Expand All @@ -52,7 +53,7 @@ use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
time::{Duration, Instant},
};

/// Logging target for the file.
Expand Down Expand Up @@ -138,6 +139,12 @@ pub enum DiscoveryEvent {
/// Query ID.
query_id: QueryId,
},

/// Incoming record to store.
IncomingRecord {
/// Record.
record: Record,
},
}

/// Discovery.
Expand Down Expand Up @@ -249,6 +256,7 @@ impl Discovery {
KademliaConfigBuilder::new()
.with_known_peers(known_peers)
.with_protocol_names(protocol_names)
.with_incoming_records_validation_mode(IncomingRecordValidationMode::Manual)
.build()
};

Expand Down Expand Up @@ -295,7 +303,7 @@ impl Discovery {
) {
if self.local_protocols.is_disjoint(&supported_protocols) {
log::trace!(
target: "sub-libp2p",
target: LOG_TARGET,
"Ignoring self-reported address of peer {peer} as remote node is not part of the \
Kademlia DHT supported by the local node.",
);
Expand Down Expand Up @@ -340,6 +348,30 @@ impl Discovery {
.await
}

/// Store record in the local DHT store.
pub async fn store_record(
&mut self,
key: KademliaKey,
value: Vec<u8>,
publisher: Option<sc_network_types::PeerId>,
expires: Option<Instant>,
) {
log::debug!(
target: LOG_TARGET,
"Storing DHT record with key {key:?}, originally published by {publisher:?}, \
expires {expires:?}.",
);

self.kademlia_handle
.store_record(Record {
key: RecordKey::new(&key.to_vec()),
value,
publisher: publisher.map(Into::into),
expires,
})
.await;
}

/// Check if the observed address is a known address.
fn is_known_address(known: &Multiaddr, observed: &Multiaddr) -> bool {
let mut known = known.iter();
Expand Down Expand Up @@ -481,6 +513,16 @@ impl Stream for Discovery {
false => return Poll::Ready(Some(DiscoveryEvent::QueryFailed { query_id })),
}
},
Poll::Ready(Some(KademliaEvent::IncomingRecord { record })) => {
log::trace!(
target: LOG_TARGET,
"incoming `PUT_RECORD` request with key {:?} from publisher {:?}",
record.key,
record.publisher,
);

return Poll::Ready(Some(DiscoveryEvent::IncomingRecord { record }))
},
}

match Pin::new(&mut this.identify_event_stream).poll_next(cx) {
Expand Down
17 changes: 16 additions & 1 deletion substrate/client/network/src/litep2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use litep2p::{
protocol::{
libp2p::{
bitswap::Config as BitswapConfig,
kademlia::{QueryId, RecordsType},
kademlia::{QueryId, Record, RecordsType},
},
request_response::ConfigBuilder as RequestResponseConfigBuilder,
},
Expand Down Expand Up @@ -369,11 +369,13 @@ impl Litep2pNetworkBackend {
.with_websocket(WebSocketTransportConfig {
listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
yamux_config: yamux_config.clone(),
nodelay: true,
..Default::default()
})
.with_tcp(TcpTransportConfig {
listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
yamux_config,
nodelay: true,
..Default::default()
})
}
Expand Down Expand Up @@ -698,6 +700,9 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
let query_id = self.discovery.put_value(key.clone(), value).await;
self.pending_put_values.insert(query_id, (key, Instant::now()));
}
NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
}
NetworkServiceCommand::EventStream { tx } => {
self.event_streams.push(tx);
}
Expand Down Expand Up @@ -915,6 +920,16 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
"ping time with {peer:?}: {rtt:?}",
);
}
Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
self.event_streams.send(Event::Dht(
DhtEvent::PutRecordRequest(
libp2p::kad::RecordKey::new(&key),
value,
publisher.map(Into::into),
expires,
)
));
}
},
event = self.litep2p.next_event() => match event {
Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
Expand Down
31 changes: 25 additions & 6 deletions substrate/client/network/src/litep2p/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ pub enum NetworkServiceCommand {
value: Vec<u8>,
},

/// Store record in the local DHT store.
StoreRecord {
/// Record key.
key: KademliaKey,

/// Record value.
value: Vec<u8>,

/// Original publisher of the record.
publisher: Option<PeerId>,

/// Record expiration time as measured by a local, monothonic clock.
expires: Option<Instant>,
},

/// Query network status.
Status {
/// `oneshot::Sender` for sending the status.
Expand Down Expand Up @@ -240,13 +255,17 @@ impl NetworkDHTProvider for Litep2pNetworkService {

fn store_record(
&self,
_key: KademliaKey,
_value: Vec<u8>,
_publisher: Option<PeerId>,
_expires: Option<Instant>,
key: KademliaKey,
value: Vec<u8>,
publisher: Option<PeerId>,
expires: Option<Instant>,
) {
// Will be added once litep2p is released with: https://github.com/paritytech/litep2p/pull/135
log::warn!(target: LOG_TARGET, "Store record is not implemented for litep2p");
let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::StoreRecord {
key,
value,
publisher,
expires,
});
}
}

Expand Down
26 changes: 13 additions & 13 deletions substrate/client/network/types/src/ed25519.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ impl fmt::Debug for Keypair {

impl From<litep2p_ed25519::Keypair> for Keypair {
fn from(kp: litep2p_ed25519::Keypair) -> Self {
Self::try_from_bytes(&mut kp.encode())
Self::try_from_bytes(&mut kp.to_bytes())
.expect("ed25519_dalek in substrate & litep2p to use the same format")
}
}

impl From<Keypair> for litep2p_ed25519::Keypair {
fn from(kp: Keypair) -> Self {
Self::decode(&mut kp.to_bytes())
Self::try_from_bytes(&mut kp.to_bytes())
.expect("ed25519_dalek in substrate & litep2p to use the same format")
}
}
Expand Down Expand Up @@ -191,14 +191,14 @@ impl PublicKey {

impl From<litep2p_ed25519::PublicKey> for PublicKey {
fn from(k: litep2p_ed25519::PublicKey) -> Self {
Self::try_from_bytes(&k.encode())
Self::try_from_bytes(&k.to_bytes())
.expect("ed25519_dalek in substrate & litep2p to use the same format")
}
}

impl From<PublicKey> for litep2p_ed25519::PublicKey {
fn from(k: PublicKey) -> Self {
Self::decode(&k.to_bytes())
Self::try_from_bytes(&k.to_bytes())
.expect("ed25519_dalek in substrate & litep2p to use the same format")
}
}
Expand Down Expand Up @@ -272,7 +272,7 @@ impl From<litep2p_ed25519::SecretKey> for SecretKey {

impl From<SecretKey> for litep2p_ed25519::SecretKey {
fn from(sk: SecretKey) -> Self {
Self::from_bytes(&mut sk.to_bytes())
Self::try_from_bytes(&mut sk.to_bytes())
.expect("litep2p `SecretKey` to accept 32 bytes as Ed25519 key")
}
}
Expand Down Expand Up @@ -357,10 +357,10 @@ mod tests {
let kp1: libp2p_ed25519::Keypair = kp.clone().into();
let kp2: litep2p_ed25519::Keypair = kp.clone().into();
let kp3 = libp2p_ed25519::Keypair::try_from_bytes(&mut kp_bytes.clone()).unwrap();
let kp4 = litep2p_ed25519::Keypair::decode(&mut kp_bytes.clone()).unwrap();
let kp4 = litep2p_ed25519::Keypair::try_from_bytes(&mut kp_bytes.clone()).unwrap();

assert_eq!(kp_bytes, kp1.to_bytes());
assert_eq!(kp_bytes, kp2.encode());
assert_eq!(kp_bytes, kp2.to_bytes());

let msg = "hello world".as_bytes();
let sig = kp.sign(msg);
Expand Down Expand Up @@ -389,9 +389,9 @@ mod tests {
fn litep2p_kp_to_substrate_kp() {
let kp = litep2p_ed25519::Keypair::generate();
let kp1: Keypair = kp.clone().into();
let kp2 = Keypair::try_from_bytes(&mut kp.encode()).unwrap();
let kp2 = Keypair::try_from_bytes(&mut kp.to_bytes()).unwrap();

assert_eq!(kp.encode(), kp1.to_bytes());
assert_eq!(kp.to_bytes(), kp1.to_bytes());

let msg = "hello world".as_bytes();
let sig = kp.sign(msg);
Expand Down Expand Up @@ -439,10 +439,10 @@ mod tests {
let pk1: libp2p_ed25519::PublicKey = pk.clone().into();
let pk2: litep2p_ed25519::PublicKey = pk.clone().into();
let pk3 = libp2p_ed25519::PublicKey::try_from_bytes(&pk_bytes).unwrap();
let pk4 = litep2p_ed25519::PublicKey::decode(&pk_bytes).unwrap();
let pk4 = litep2p_ed25519::PublicKey::try_from_bytes(&pk_bytes).unwrap();

assert_eq!(pk_bytes, pk1.to_bytes());
assert_eq!(pk_bytes, pk2.encode());
assert_eq!(pk_bytes, pk2.to_bytes());

let msg = "hello world".as_bytes();
let sig = kp.sign(msg);
Expand All @@ -458,7 +458,7 @@ mod tests {
fn litep2p_pk_to_substrate_pk() {
let kp = litep2p_ed25519::Keypair::generate();
let pk = kp.public();
let pk_bytes = pk.clone().encode();
let pk_bytes = pk.clone().to_bytes();
let pk1: PublicKey = pk.clone().into();
let pk2 = PublicKey::try_from_bytes(&pk_bytes).unwrap();

Expand Down Expand Up @@ -497,7 +497,7 @@ mod tests {
let sk1: libp2p_ed25519::SecretKey = sk.clone().into();
let sk2: litep2p_ed25519::SecretKey = sk.clone().into();
let sk3 = libp2p_ed25519::SecretKey::try_from_bytes(&mut sk_bytes.clone()).unwrap();
let sk4 = litep2p_ed25519::SecretKey::from_bytes(&mut sk_bytes.clone()).unwrap();
let sk4 = litep2p_ed25519::SecretKey::try_from_bytes(&mut sk_bytes.clone()).unwrap();

let kp: Keypair = sk.into();
let kp1: libp2p_ed25519::Keypair = sk1.into();
Expand Down

0 comments on commit bc48cc8

Please sign in to comment.