Skip to content

Commit

Permalink
Subprotocol example (#8991)
Browse files Browse the repository at this point in the history
Co-authored-by: owanikin <[email protected]>
Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
3 people authored Jun 26, 2024
1 parent 1fde1dc commit 063c08f
Show file tree
Hide file tree
Showing 11 changed files with 446 additions and 0 deletions.
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ members = [
"examples/polygon-p2p/",
"examples/rpc-db/",
"examples/txpool-tracing/",
"examples/custom-rlpx-subprotocol",
"examples/exex/minimal/",
"examples/exex/op-bridge/",
"testing/ef-tests/",
"testing/testing-utils",
]
Expand Down
23 changes: 23 additions & 0 deletions examples/custom-rlpx-subprotocol/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "example-custom-rlpx-subprotocol"
version = "0.0.0"
publish = false
edition.workspace = true
license.workspace = true


[dependencies]
tokio = { workspace = true, features = ["full"] }
futures.workspace = true
reth-eth-wire.workspace = true
reth-network.workspace = true
reth-network-api.workspace = true
reth-node-ethereum.workspace = true
reth-provider.workspace = true
reth-primitives.workspace = true
reth-rpc-types.workspace = true
reth.workspace = true
tokio-stream.workspace = true
eyre.workspace = true
rand.workspace = true
tracing.workspace = true
104 changes: 104 additions & 0 deletions examples/custom-rlpx-subprotocol/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//! Example for how to customize the network layer by adding a custom rlpx subprotocol.
//!
//! Run with
//!
//! ```not_rust
//! cargo run -p example-custom-rlpx-subprotocol -- node
//! ```
//!
//! This launch a regular reth node with a custom rlpx subprotocol.
use reth::builder::NodeHandle;
use reth_network::{
config::SecretKey, protocol::IntoRlpxSubProtocol, NetworkConfig, NetworkManager,
NetworkProtocols,
};
use reth_network_api::NetworkInfo;
use reth_node_ethereum::EthereumNode;
use reth_provider::test_utils::NoopProvider;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use subprotocol::{
connection::CustomCommand,
protocol::{
event::ProtocolEvent,
handler::{CustomRlpxProtoHandler, ProtocolState},
},
};
use tokio::sync::{mpsc, oneshot};
use tracing::info;

mod subprotocol;

fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _args| async move {
// launch the node
let NodeHandle { node, node_exit_future } =
builder.node(EthereumNode::default()).launch().await?;
let peer_id = node.network.peer_id();
let peer_addr = node.network.local_addr();

// add the custom network subprotocol to the launched node
let (tx, mut from_peer0) = mpsc::unbounded_channel();
let custom_rlpx_handler = CustomRlpxProtoHandler { state: ProtocolState { events: tx } };
node.network.add_rlpx_sub_protocol(custom_rlpx_handler.into_rlpx_sub_protocol());

// creates a separate network instance and adds the custom network subprotocol
let secret_key = SecretKey::new(&mut rand::thread_rng());
let (tx, mut from_peer1) = mpsc::unbounded_channel();
let custom_rlpx_handler_2 = CustomRlpxProtoHandler { state: ProtocolState { events: tx } };
let net_cfg = NetworkConfig::builder(secret_key)
.listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
.disable_discovery()
.add_rlpx_sub_protocol(custom_rlpx_handler_2.into_rlpx_sub_protocol())
.build(NoopProvider::default());

// spawn the second network instance
let subnetwork = NetworkManager::new(net_cfg).await?;
let subnetwork_peer_id = *subnetwork.peer_id();
let subnetwork_peer_addr = subnetwork.local_addr();
let subnetwork_handle = subnetwork.peers_handle();
node.task_executor.spawn(subnetwork);

// connect the launched node to the subnetwork
node.network.peers_handle().add_peer(subnetwork_peer_id, subnetwork_peer_addr);

// connect the subnetwork to the launched node
subnetwork_handle.add_peer(*peer_id, peer_addr);

// establish connection between peer0 and peer1
let peer0_to_peer1 = from_peer0.recv().await.expect("peer0 connecting to peer1");
let peer0_conn = match peer0_to_peer1 {
ProtocolEvent::Established { direction: _, peer_id, to_connection } => {
assert_eq!(peer_id, subnetwork_peer_id);
to_connection
}
};

// establish connection between peer1 and peer0
let peer1_to_peer0 = from_peer1.recv().await.expect("peer1 connecting to peer0");
let peer1_conn = match peer1_to_peer0 {
ProtocolEvent::Established { direction: _, peer_id: peer1_id, to_connection } => {
assert_eq!(peer1_id, *peer_id);
to_connection
}
};
info!(target:"rlpx-subprotocol", "Connection established!");

// send a ping message from peer0 to peer1
let (tx, rx) = oneshot::channel();
peer0_conn.send(CustomCommand::Message { msg: "hello!".to_string(), response: tx })?;
let response = rx.await?;
assert_eq!(response, "hello!");
info!(target:"rlpx-subprotocol", ?response, "New message received");

// send a ping message from peer1 to peer0
let (tx, rx) = oneshot::channel();
peer1_conn.send(CustomCommand::Message { msg: "world!".to_string(), response: tx })?;
let response = rx.await?;
assert_eq!(response, "world!");
info!(target:"rlpx-subprotocol", ?response, "New message received");

info!(target:"rlpx-subprotocol", "Peers connected via custom rlpx subprotocol!");

node_exit_future.await
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use super::CustomRlpxConnection;
use crate::subprotocol::protocol::{
event::ProtocolEvent, handler::ProtocolState, proto::CustomRlpxProtoMessage,
};
use reth_eth_wire::{
capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol,
};
use reth_network::protocol::{ConnectionHandler, OnNotSupported};
use reth_network_api::Direction;
use reth_rpc_types::PeerId;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

/// The connection handler for the custom RLPx protocol.
pub(crate) struct CustomRlpxConnectionHandler {
pub(crate) state: ProtocolState,
}

impl ConnectionHandler for CustomRlpxConnectionHandler {
type Connection = CustomRlpxConnection;

fn protocol(&self) -> Protocol {
CustomRlpxProtoMessage::protocol()
}

fn on_unsupported_by_peer(
self,
_supported: &SharedCapabilities,
_direction: Direction,
_peer_id: PeerId,
) -> OnNotSupported {
OnNotSupported::KeepAlive
}

fn into_connection(
self,
direction: Direction,
peer_id: PeerId,
conn: ProtocolConnection,
) -> Self::Connection {
let (tx, rx) = mpsc::unbounded_channel();
self.state
.events
.send(ProtocolEvent::Established { direction, peer_id, to_connection: tx })
.ok();
CustomRlpxConnection {
conn,
initial_ping: direction.is_outgoing().then(CustomRlpxProtoMessage::ping),
commands: UnboundedReceiverStream::new(rx),
pending_pong: None,
}
}
}
76 changes: 76 additions & 0 deletions examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use super::protocol::proto::{CustomRlpxProtoMessage, CustomRlpxProtoMessageKind};
use futures::{Stream, StreamExt};
use reth_eth_wire::multiplex::ProtocolConnection;
use reth_primitives::BytesMut;
use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
use tokio_stream::wrappers::UnboundedReceiverStream;

pub(crate) mod handler;

/// We define some custom commands that the subprotocol supports.
pub(crate) enum CustomCommand {
/// Sends a message to the peer
Message {
msg: String,
/// The response will be sent to this channel.
response: oneshot::Sender<String>,
},
}

/// The connection handler for the custom RLPx protocol.
pub(crate) struct CustomRlpxConnection {
conn: ProtocolConnection,
initial_ping: Option<CustomRlpxProtoMessage>,
commands: UnboundedReceiverStream<CustomCommand>,
pending_pong: Option<oneshot::Sender<String>>,
}

impl Stream for CustomRlpxConnection {
type Item = BytesMut;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Some(initial_ping) = this.initial_ping.take() {
return Poll::Ready(Some(initial_ping.encoded()))
}

loop {
if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) {
return match cmd {
CustomCommand::Message { msg, response } => {
this.pending_pong = Some(response);
Poll::Ready(Some(CustomRlpxProtoMessage::ping_message(msg).encoded()))
}
}
}

let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) };

let Some(msg) = CustomRlpxProtoMessage::decode_message(&mut &msg[..]) else {
return Poll::Ready(None)
};

match msg.message {
CustomRlpxProtoMessageKind::Ping => {
return Poll::Ready(Some(CustomRlpxProtoMessage::pong().encoded()))
}
CustomRlpxProtoMessageKind::Pong => {}
CustomRlpxProtoMessageKind::PingMessage(msg) => {
return Poll::Ready(Some(CustomRlpxProtoMessage::pong_message(msg).encoded()))
}
CustomRlpxProtoMessageKind::PongMessage(msg) => {
if let Some(sender) = this.pending_pong.take() {
sender.send(msg).ok();
}
continue
}
}

return Poll::Pending
}
}
}
2 changes: 2 additions & 0 deletions examples/custom-rlpx-subprotocol/src/subprotocol/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub(crate) mod connection;
pub(crate) mod protocol;
15 changes: 15 additions & 0 deletions examples/custom-rlpx-subprotocol/src/subprotocol/protocol/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use crate::subprotocol::connection::CustomCommand;
use reth_network::Direction;
use reth_network_api::PeerId;
use tokio::sync::mpsc;

/// The events that can be emitted by our custom protocol.
#[derive(Debug)]
pub(crate) enum ProtocolEvent {
Established {
#[allow(dead_code)]
direction: Direction,
peer_id: PeerId,
to_connection: mpsc::UnboundedSender<CustomCommand>,
},
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use super::event::ProtocolEvent;
use crate::subprotocol::connection::handler::CustomRlpxConnectionHandler;
use reth_network::protocol::ProtocolHandler;
use reth_network_api::PeerId;
use std::net::SocketAddr;
use tokio::sync::mpsc;

/// Protocol state is an helper struct to store the protocol events.
#[derive(Clone, Debug)]
pub(crate) struct ProtocolState {
pub(crate) events: mpsc::UnboundedSender<ProtocolEvent>,
}

/// The protocol handler takes care of incoming and outgoing connections.
#[derive(Debug)]
pub(crate) struct CustomRlpxProtoHandler {
pub state: ProtocolState,
}

impl ProtocolHandler for CustomRlpxProtoHandler {
type ConnectionHandler = CustomRlpxConnectionHandler;

fn on_incoming(&self, _socket_addr: SocketAddr) -> Option<Self::ConnectionHandler> {
Some(CustomRlpxConnectionHandler { state: self.state.clone() })
}

fn on_outgoing(
&self,
_socket_addr: SocketAddr,
_peer_id: PeerId,
) -> Option<Self::ConnectionHandler> {
Some(CustomRlpxConnectionHandler { state: self.state.clone() })
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub(crate) mod event;
pub(crate) mod handler;
pub(crate) mod proto;
Loading

0 comments on commit 063c08f

Please sign in to comment.