From c6eb46a0a155079c9f2deb1f4943824c963f0d8a Mon Sep 17 00:00:00 2001 From: owanikin Date: Sun, 5 May 2024 17:53:51 +0100 Subject: [PATCH 01/10] draft pr: Add example crate for custom rlpx subprotocol #7130 --- Cargo.lock | 19 ++ Cargo.toml | 3 + examples/custom-rlpx-subprotocol/Cargo.toml | 21 ++ examples/custom-rlpx-subprotocol/src/main.rs | 292 +++++++++++++++++++ 4 files changed, 335 insertions(+) create mode 100644 examples/custom-rlpx-subprotocol/Cargo.toml create mode 100644 examples/custom-rlpx-subprotocol/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index d8642eb7f132..0b0c7e40943d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2207,6 +2207,25 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "custom-rlpx-subprotocol" +version = "0.1.0" +dependencies = [ + "eyre", + "futures", + "reth", + "reth-eth-wire", + "reth-network", + "reth-network-api", + "reth-node-ethereum", + "reth-primitives", + "reth-provider", + "reth-rpc-types", + "reth-tracing", + "tokio", + "tokio-stream", +] + [[package]] name = "darling" version = "0.20.9" diff --git a/Cargo.toml b/Cargo.toml index 4cf1778781be..5188c9121188 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -123,6 +123,9 @@ members = [ "examples/polygon-p2p/", "examples/rpc-db/", "examples/txpool-tracing/", + "examples/custom-rlpx-subprotocol", + "examples/exex/minimal/", + "examples/exex/op-bridge/", "testing/ef-tests/", "testing/testing-utils", ] diff --git a/examples/custom-rlpx-subprotocol/Cargo.toml b/examples/custom-rlpx-subprotocol/Cargo.toml new file mode 100644 index 000000000000..e0c0f4143919 --- /dev/null +++ b/examples/custom-rlpx-subprotocol/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "custom-rlpx-subprotocol" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio = { workspace = true, features = ["full"] } +futures.workspace = true +reth-eth-wire.workspace = true +reth-network.workspace = true +reth-network-api.workspace = true +reth-node-ethereum.workspace = true +reth-provider.workspace = true +reth-primitives.workspace = true +tokio-stream.workspace = true +reth-rpc-types.workspace = true +reth-tracing.workspace = true +reth.workspace = true +eyre.workspace = true diff --git a/examples/custom-rlpx-subprotocol/src/main.rs b/examples/custom-rlpx-subprotocol/src/main.rs new file mode 100644 index 000000000000..4b6bc29b6935 --- /dev/null +++ b/examples/custom-rlpx-subprotocol/src/main.rs @@ -0,0 +1,292 @@ +// This example showcase rlpx subprotocols + +// Look closely into this crates/imports(use) that the code is importing + +use crate::proto::{CustomRlpxProtoMessage, CustomRlpxProtoMessageKind}; +use futures::{Stream, StreamExt}; +use reth::{builder::NodeHandle, network}; +use reth_eth_wire::{ + capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol, +}; +use reth_network::{ + protocol::{ConnectionHandler, OnNotSupported, ProtocolHandler}, + test_utils::Testnet, + NetworkProtocols, +}; +use reth_network_api::Direction; +use reth_node_ethereum::EthereumNode; +use reth_primitives::BytesMut; +use reth_provider::test_utils::MockEthProvider; +use reth_rpc_types::PeerId; +use std::{ + net::SocketAddr, + pin::Pin, + task::{ready, Context, Poll}, +}; +use tokio::sync::{mpsc, oneshot}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +// Custom Rlpx Subprotocol +pub mod proto { + use super::*; + use reth_eth_wire::capability::Capability; + use reth_primitives::{Buf, BufMut}; + + #[repr(u8)] + #[derive(Clone, Copy, Debug, PartialEq, Eq)] + pub enum CustomRlpxProtoMessageId { + Ping = 0x00, + Pong = 0x01, + CustomMessage = 0x02, + } + + #[derive(Clone, Debug, PartialEq, Eq)] + pub enum CustomRlpxProtoMessageKind { + Ping, + Pong, + CustomMessage(String), + } + + #[derive(Clone, Debug, PartialEq, Eq)] + pub struct CustomRlpxProtoMessage { + pub message_type: CustomRlpxProtoMessageId, + pub message: CustomRlpxProtoMessageKind, + } + + impl CustomRlpxProtoMessage { + /// Returns the capability for the `custom_rlpx` protocol. + pub fn capability() -> Capability { + Capability::new_static("custom_rlpx", 1) + } + + /// Returns the protocol for the `custom_rlpx` protocol. + pub fn protocol() -> Protocol { + Protocol::new(Self::capability(), 3) + } + + /// Creates a ping message + pub fn ping() -> Self { + Self { + message_type: CustomRlpxProtoMessageId::Ping, + message: CustomRlpxProtoMessageKind::Ping, + } + } + + /// Creates a pong message + pub fn pong() -> Self { + Self { + message_type: CustomRlpxProtoMessageId::Pong, + message: CustomRlpxProtoMessageKind::Pong, + } + } + + /// Creates a custom message + pub fn custom_message(msg: impl Into) -> Self { + Self { + message_type: CustomRlpxProtoMessageId::CustomMessage, + message: CustomRlpxProtoMessageKind::CustomMessage(msg.into()), + } + } + + /// Creates a new `CustomRlpxProtoMessage` with the given message ID and payload. + pub fn encoded(&self) -> BytesMut { + let mut buf = BytesMut::new(); + buf.put_u8(self.message_type as u8); + match &self.message { + CustomRlpxProtoMessageKind::Ping => {} + CustomRlpxProtoMessageKind::Pong => {} + CustomRlpxProtoMessageKind::CustomMessage(msg) => { + buf.put(msg.as_bytes()); + } + } + buf + } + + /// Decodes a `CustomRlpxProtoMessage` from the given message buffer. + pub fn decode_message(buf: &mut &[u8]) -> Option { + if buf.is_empty() { + return None; + } + let id = buf[0]; + buf.advance(1); + let message_type = match id { + 0x00 => CustomRlpxProtoMessageId::Ping, + 0x01 => CustomRlpxProtoMessageId::Pong, + 0x02 => CustomRlpxProtoMessageId::CustomMessage, + _ => return None, + }; + let message = match message_type { + CustomRlpxProtoMessageId::Ping => CustomRlpxProtoMessageKind::Ping, + CustomRlpxProtoMessageId::Pong => CustomRlpxProtoMessageKind::Pong, + CustomRlpxProtoMessageId::CustomMessage => { + CustomRlpxProtoMessageKind::CustomMessage( + String::from_utf8_lossy(&buf[..]).into_owned(), + ) + } + }; + Some(Self { message_type, message }) + } + } +} + +/// Custom Rlpx Subprotocol Handler +#[derive(Debug)] +struct CustomRlpxProtoHandler { + state: ProtocolState, +} + +impl ProtocolHandler for CustomRlpxProtoHandler { + type ConnectionHandler = CustomRlpxConnectionHandler; + + fn on_incoming(&self, _socket_addr: SocketAddr) -> Option { + Some(CustomRlpxConnectionHandler { state: self.state.clone() }) + } + + fn on_outgoing( + &self, + _socket_addr: SocketAddr, + _peer_id: PeerId, + ) -> Option { + Some(CustomRlpxConnectionHandler { state: self.state.clone() }) + } +} + +#[derive(Clone, Debug)] +struct ProtocolState { + events: mpsc::UnboundedSender, +} + +#[derive(Debug)] +enum ProtocolEvent { + Established { + #[allow(dead_code)] + direction: Direction, + peer_id: PeerId, + to_connection: mpsc::UnboundedSender, + }, +} + +enum Command { + /// Send a custom message to the peer + CustomMessage { + msg: String, + /// The response will be sent to this channel. + response: oneshot::Sender, + }, +} + +struct CustomRlpxConnectionHandler { + state: ProtocolState, +} + +impl ConnectionHandler for CustomRlpxConnectionHandler { + type Connection = CustomRlpxConnection; + + fn protocol(&self) -> Protocol { + CustomRlpxProtoMessage::protocol() + } + + fn on_unsupported_by_peer( + self, + _supported: &SharedCapabilities, + _direction: Direction, + _peer_id: PeerId, + ) -> OnNotSupported { + OnNotSupported::KeepAlive + } + + fn into_connection( + self, + direction: Direction, + peer_id: PeerId, + conn: ProtocolConnection, + ) -> Self::Connection { + let (tx, rx) = mpsc::unbounded_channel(); + self.state + .events + .send(ProtocolEvent::Established { direction, peer_id, to_connection: tx }) + .ok(); + CustomRlpxConnection { + conn, + initial_ping: direction.is_outgoing().then(CustomRlpxProtoMessage::ping), + commands: UnboundedReceiverStream::new(rx), + pending_pong: None, + } + } +} + +struct CustomRlpxConnection { + conn: ProtocolConnection, + initial_ping: Option, + commands: UnboundedReceiverStream, + pending_pong: Option>, +} + +impl Stream for CustomRlpxConnection { + type Item = BytesMut; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + if let Some(initial_ping) = this.initial_ping.take() { + return Poll::Ready(Some(initial_ping.encoded())); + } + + loop { + if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) { + return match cmd { + Command::CustomMessage { msg, response } => { + this.pending_pong = Some(response); + Poll::Ready(Some(CustomRlpxProtoMessage::ping().encoded())) + } + }; + } + let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) }; + let Some(msg) = CustomRlpxProtoMessage::decode_message(&mut &msg[..]) else { + return Poll::Ready(None); + }; + + match msg.message { + CustomRlpxProtoMessageKind::Ping => { + return Poll::Ready(Some(CustomRlpxProtoMessage::pong().encoded())) + } + CustomRlpxProtoMessageKind::Pong => {} + CustomRlpxProtoMessageKind::CustomMessage(msg) => { + if let Some(sender) = this.pending_pong.take() { + sender.send(msg).ok(); + } + continue; + } + } + return Poll::Pending; + } + } +} + +fn main() -> eyre::Result<()> { + reth::cli::Cli::parse_args().run(|builder, args| async move { + // launch the node + let NodeHandle { mut node, node_exit_future } = + builder.node(EthereumNode::default()).launch().await?; + + // After lauch and after launch we inject a new rlpx protocol handler via the network + // node.network the rlpx can be similar to the test example, could even be something + // like simple string message exchange + + // let custom_rlpx_handler = CustomRlpxProtoHandler{ state: ProtocolState { events: + // node.network.events.clone()}}; node.network. + // add_rlpx_sub_protocol(custom_rlpx_handler); + + // Spawn a task to handle incoming messages from the custom RLPx protocol + + node_exit_future.await + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use reth_tracing::init_test_tracing; + + #[tokio::test(flavor = "multi_thread")] + async fn test_custom_rlpx_proto() {} +} From 2b4b410c1227f76a61e61d848c10c8ea80c77beb Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 21 May 2024 13:50:24 +0200 Subject: [PATCH 02/10] some boilderplate --- Cargo.lock | 1 - examples/custom-rlpx-subprotocol/Cargo.toml | 1 - examples/custom-rlpx-subprotocol/src/main.rs | 142 ++++-------------- examples/custom-rlpx-subprotocol/src/proto.rs | 97 ++++++++++++ 4 files changed, 123 insertions(+), 118 deletions(-) create mode 100644 examples/custom-rlpx-subprotocol/src/proto.rs diff --git a/Cargo.lock b/Cargo.lock index 0b0c7e40943d..6732f1534977 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2221,7 +2221,6 @@ dependencies = [ "reth-primitives", "reth-provider", "reth-rpc-types", - "reth-tracing", "tokio", "tokio-stream", ] diff --git a/examples/custom-rlpx-subprotocol/Cargo.toml b/examples/custom-rlpx-subprotocol/Cargo.toml index e0c0f4143919..0050f263ed07 100644 --- a/examples/custom-rlpx-subprotocol/Cargo.toml +++ b/examples/custom-rlpx-subprotocol/Cargo.toml @@ -16,6 +16,5 @@ reth-provider.workspace = true reth-primitives.workspace = true tokio-stream.workspace = true reth-rpc-types.workspace = true -reth-tracing.workspace = true reth.workspace = true eyre.workspace = true diff --git a/examples/custom-rlpx-subprotocol/src/main.rs b/examples/custom-rlpx-subprotocol/src/main.rs index 4b6bc29b6935..929f3ade8032 100644 --- a/examples/custom-rlpx-subprotocol/src/main.rs +++ b/examples/custom-rlpx-subprotocol/src/main.rs @@ -1,6 +1,9 @@ -// This example showcase rlpx subprotocols +//! This example showcase custom rlpx subprotocols +//! +//! This installs a custom rlpx subprotocol and negotiates it with peers. If a remote peer also +//! supports this protocol, it will be used to exchange custom messages. -// Look closely into this crates/imports(use) that the code is importing +#![cfg_attr(not(test), warn(unused_crate_dependencies))] use crate::proto::{CustomRlpxProtoMessage, CustomRlpxProtoMessageKind}; use futures::{Stream, StreamExt}; @@ -23,111 +26,25 @@ use std::{ pin::Pin, task::{ready, Context, Poll}, }; + +use crate::proto::{CustomRlpxProtoMessage, CustomRlpxProtoMessageKind}; +use futures::{Stream, StreamExt}; +use reth::builder::NodeHandle; +use reth_eth_wire::{ + capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol, +}; +use reth_network::{ + protocol::{ConnectionHandler, OnNotSupported, ProtocolHandler}, + NetworkEvents, NetworkProtocols, +}; +use reth_network_api::Direction; +use reth_node_ethereum::EthereumNode; +use reth_primitives::BytesMut; +use reth_rpc_types::PeerId; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; -// Custom Rlpx Subprotocol -pub mod proto { - use super::*; - use reth_eth_wire::capability::Capability; - use reth_primitives::{Buf, BufMut}; - - #[repr(u8)] - #[derive(Clone, Copy, Debug, PartialEq, Eq)] - pub enum CustomRlpxProtoMessageId { - Ping = 0x00, - Pong = 0x01, - CustomMessage = 0x02, - } - - #[derive(Clone, Debug, PartialEq, Eq)] - pub enum CustomRlpxProtoMessageKind { - Ping, - Pong, - CustomMessage(String), - } - - #[derive(Clone, Debug, PartialEq, Eq)] - pub struct CustomRlpxProtoMessage { - pub message_type: CustomRlpxProtoMessageId, - pub message: CustomRlpxProtoMessageKind, - } - - impl CustomRlpxProtoMessage { - /// Returns the capability for the `custom_rlpx` protocol. - pub fn capability() -> Capability { - Capability::new_static("custom_rlpx", 1) - } - - /// Returns the protocol for the `custom_rlpx` protocol. - pub fn protocol() -> Protocol { - Protocol::new(Self::capability(), 3) - } - - /// Creates a ping message - pub fn ping() -> Self { - Self { - message_type: CustomRlpxProtoMessageId::Ping, - message: CustomRlpxProtoMessageKind::Ping, - } - } - - /// Creates a pong message - pub fn pong() -> Self { - Self { - message_type: CustomRlpxProtoMessageId::Pong, - message: CustomRlpxProtoMessageKind::Pong, - } - } - - /// Creates a custom message - pub fn custom_message(msg: impl Into) -> Self { - Self { - message_type: CustomRlpxProtoMessageId::CustomMessage, - message: CustomRlpxProtoMessageKind::CustomMessage(msg.into()), - } - } - - /// Creates a new `CustomRlpxProtoMessage` with the given message ID and payload. - pub fn encoded(&self) -> BytesMut { - let mut buf = BytesMut::new(); - buf.put_u8(self.message_type as u8); - match &self.message { - CustomRlpxProtoMessageKind::Ping => {} - CustomRlpxProtoMessageKind::Pong => {} - CustomRlpxProtoMessageKind::CustomMessage(msg) => { - buf.put(msg.as_bytes()); - } - } - buf - } - - /// Decodes a `CustomRlpxProtoMessage` from the given message buffer. - pub fn decode_message(buf: &mut &[u8]) -> Option { - if buf.is_empty() { - return None; - } - let id = buf[0]; - buf.advance(1); - let message_type = match id { - 0x00 => CustomRlpxProtoMessageId::Ping, - 0x01 => CustomRlpxProtoMessageId::Pong, - 0x02 => CustomRlpxProtoMessageId::CustomMessage, - _ => return None, - }; - let message = match message_type { - CustomRlpxProtoMessageId::Ping => CustomRlpxProtoMessageKind::Ping, - CustomRlpxProtoMessageId::Pong => CustomRlpxProtoMessageKind::Pong, - CustomRlpxProtoMessageId::CustomMessage => { - CustomRlpxProtoMessageKind::CustomMessage( - String::from_utf8_lossy(&buf[..]).into_owned(), - ) - } - }; - Some(Self { message_type, message }) - } - } -} +mod proto; /// Custom Rlpx Subprotocol Handler #[derive(Debug)] @@ -272,21 +189,14 @@ fn main() -> eyre::Result<()> { // node.network the rlpx can be similar to the test example, could even be something // like simple string message exchange - // let custom_rlpx_handler = CustomRlpxProtoHandler{ state: ProtocolState { events: - // node.network.events.clone()}}; node.network. - // add_rlpx_sub_protocol(custom_rlpx_handler); + let (tx, rx) = mpsc::unbounded_channel(); + + let custom_rlpx_handler = CustomRlpxProtoHandler { state: ProtocolState { events: tx } }; + // TODO implement traits + node.network.add_rlpx_sub_protocol(custom_rlpx_handler); // Spawn a task to handle incoming messages from the custom RLPx protocol node_exit_future.await }) } - -#[cfg(test)] -mod tests { - use super::*; - use reth_tracing::init_test_tracing; - - #[tokio::test(flavor = "multi_thread")] - async fn test_custom_rlpx_proto() {} -} diff --git a/examples/custom-rlpx-subprotocol/src/proto.rs b/examples/custom-rlpx-subprotocol/src/proto.rs new file mode 100644 index 000000000000..bc196683f545 --- /dev/null +++ b/examples/custom-rlpx-subprotocol/src/proto.rs @@ -0,0 +1,97 @@ +use reth_eth_wire::{protocol::Protocol, Capability}; +use reth_network::protocol::{ConnectionHandler, ProtocolHandler}; +use reth_primitives::{Buf, BufMut, BytesMut}; + +#[repr(u8)] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum CustomRlpxProtoMessageId { + Ping = 0x00, + Pong = 0x01, + CustomMessage = 0x02, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum CustomRlpxProtoMessageKind { + Ping, + Pong, + CustomMessage(String), +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct CustomRlpxProtoMessage { + pub message_type: CustomRlpxProtoMessageId, + pub message: CustomRlpxProtoMessageKind, +} + +impl CustomRlpxProtoMessage { + /// Returns the capability for the `custom_rlpx` protocol. + pub fn capability() -> Capability { + Capability::new_static("custom_rlpx", 1) + } + + /// Returns the protocol for the `custom_rlpx` protocol. + pub fn protocol() -> Protocol { + Protocol::new(Self::capability(), 3) + } + + /// Creates a ping message + pub fn ping() -> Self { + Self { + message_type: CustomRlpxProtoMessageId::Ping, + message: CustomRlpxProtoMessageKind::Ping, + } + } + + /// Creates a pong message + pub fn pong() -> Self { + Self { + message_type: CustomRlpxProtoMessageId::Pong, + message: CustomRlpxProtoMessageKind::Pong, + } + } + + /// Creates a custom message + pub fn custom_message(msg: impl Into) -> Self { + Self { + message_type: CustomRlpxProtoMessageId::CustomMessage, + message: CustomRlpxProtoMessageKind::CustomMessage(msg.into()), + } + } + + /// Creates a new `CustomRlpxProtoMessage` with the given message ID and payload. + pub fn encoded(&self) -> BytesMut { + let mut buf = BytesMut::new(); + buf.put_u8(self.message_type as u8); + match &self.message { + CustomRlpxProtoMessageKind::Ping => {} + CustomRlpxProtoMessageKind::Pong => {} + CustomRlpxProtoMessageKind::CustomMessage(msg) => { + buf.put(msg.as_bytes()); + } + } + buf + } + + /// Decodes a `CustomRlpxProtoMessage` from the given message buffer. + pub fn decode_message(buf: &mut &[u8]) -> Option { + if buf.is_empty() { + return None; + } + let id = buf[0]; + buf.advance(1); + let message_type = match id { + 0x00 => CustomRlpxProtoMessageId::Ping, + 0x01 => CustomRlpxProtoMessageId::Pong, + 0x02 => CustomRlpxProtoMessageId::CustomMessage, + _ => return None, + }; + let message = match message_type { + CustomRlpxProtoMessageId::Ping => CustomRlpxProtoMessageKind::Ping, + CustomRlpxProtoMessageId::Pong => CustomRlpxProtoMessageKind::Pong, + CustomRlpxProtoMessageId::CustomMessage => CustomRlpxProtoMessageKind::CustomMessage( + String::from_utf8_lossy(&buf[..]).into_owned(), + ), + }; + Some(Self { message_type, message }) + } +} From 69cb33686a35940288125c43bd98e1680d0d97b8 Mon Sep 17 00:00:00 2001 From: Loocapro Date: Thu, 20 Jun 2024 17:55:15 +0200 Subject: [PATCH 03/10] add rlpx subprotocol --- examples/custom-rlpx-subprotocol/src/main.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/examples/custom-rlpx-subprotocol/src/main.rs b/examples/custom-rlpx-subprotocol/src/main.rs index 929f3ade8032..f3018918506d 100644 --- a/examples/custom-rlpx-subprotocol/src/main.rs +++ b/examples/custom-rlpx-subprotocol/src/main.rs @@ -34,8 +34,8 @@ use reth_eth_wire::{ capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol, }; use reth_network::{ - protocol::{ConnectionHandler, OnNotSupported, ProtocolHandler}, - NetworkEvents, NetworkProtocols, + protocol::{ConnectionHandler, IntoRlpxSubProtocol, OnNotSupported, ProtocolHandler}, + NetworkProtocols, }; use reth_network_api::Direction; use reth_node_ethereum::EthereumNode; @@ -151,7 +151,7 @@ impl Stream for CustomRlpxConnection { loop { if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) { return match cmd { - Command::CustomMessage { msg, response } => { + Command::CustomMessage { msg: _, response } => { this.pending_pong = Some(response); Poll::Ready(Some(CustomRlpxProtoMessage::ping().encoded())) } @@ -180,20 +180,20 @@ impl Stream for CustomRlpxConnection { } fn main() -> eyre::Result<()> { - reth::cli::Cli::parse_args().run(|builder, args| async move { + reth::cli::Cli::parse_args().run(|builder, _args| async move { // launch the node - let NodeHandle { mut node, node_exit_future } = + let NodeHandle { node, node_exit_future } = builder.node(EthereumNode::default()).launch().await?; // After lauch and after launch we inject a new rlpx protocol handler via the network // node.network the rlpx can be similar to the test example, could even be something // like simple string message exchange - let (tx, rx) = mpsc::unbounded_channel(); + let (tx, _rx) = mpsc::unbounded_channel(); let custom_rlpx_handler = CustomRlpxProtoHandler { state: ProtocolState { events: tx } }; - // TODO implement traits - node.network.add_rlpx_sub_protocol(custom_rlpx_handler); + + node.network.add_rlpx_sub_protocol(custom_rlpx_handler.into_rlpx_sub_protocol()); // Spawn a task to handle incoming messages from the custom RLPx protocol From 42c1e837dd068171ed44848b931796ed0bbcc38f Mon Sep 17 00:00:00 2001 From: Luca Provini Date: Mon, 24 Jun 2024 17:15:27 +0200 Subject: [PATCH 04/10] reordered files --- Cargo.lock | 36 ++-- examples/custom-rlpx-subprotocol/Cargo.toml | 2 +- examples/custom-rlpx-subprotocol/src/main.rs | 187 +----------------- .../src/subprotocol/connection/handler.rs | 53 +++++ .../src/subprotocol/connection/mod.rs | 71 +++++++ .../src/subprotocol/mod.rs | 2 + .../src/subprotocol/protocol/event.rs | 15 ++ .../src/subprotocol/protocol/handler.rs | 34 ++++ .../protocol/message.rs} | 7 +- .../src/subprotocol/protocol/mod.rs | 3 + 10 files changed, 204 insertions(+), 206 deletions(-) create mode 100644 examples/custom-rlpx-subprotocol/src/subprotocol/connection/handler.rs create mode 100644 examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs create mode 100644 examples/custom-rlpx-subprotocol/src/subprotocol/mod.rs create mode 100644 examples/custom-rlpx-subprotocol/src/subprotocol/protocol/event.rs create mode 100644 examples/custom-rlpx-subprotocol/src/subprotocol/protocol/handler.rs rename examples/custom-rlpx-subprotocol/src/{proto.rs => subprotocol/protocol/message.rs} (94%) create mode 100644 examples/custom-rlpx-subprotocol/src/subprotocol/protocol/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 6732f1534977..cbc882ef40ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2207,24 +2207,6 @@ dependencies = [ "syn 2.0.66", ] -[[package]] -name = "custom-rlpx-subprotocol" -version = "0.1.0" -dependencies = [ - "eyre", - "futures", - "reth", - "reth-eth-wire", - "reth-network", - "reth-network-api", - "reth-node-ethereum", - "reth-primitives", - "reth-provider", - "reth-rpc-types", - "tokio", - "tokio-stream", -] - [[package]] name = "darling" version = "0.20.9" @@ -2880,6 +2862,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "example-custom-rlpx-subprotocol" +version = "0.1.0" +dependencies = [ + "eyre", + "futures", + "reth", + "reth-eth-wire", + "reth-network", + "reth-network-api", + "reth-node-ethereum", + "reth-primitives", + "reth-provider", + "reth-rpc-types", + "tokio", + "tokio-stream", +] + [[package]] name = "example-db-access" version = "0.0.0" diff --git a/examples/custom-rlpx-subprotocol/Cargo.toml b/examples/custom-rlpx-subprotocol/Cargo.toml index 0050f263ed07..ed92fc9b1611 100644 --- a/examples/custom-rlpx-subprotocol/Cargo.toml +++ b/examples/custom-rlpx-subprotocol/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "custom-rlpx-subprotocol" +name = "example-custom-rlpx-subprotocol" version = "0.1.0" edition = "2021" diff --git a/examples/custom-rlpx-subprotocol/src/main.rs b/examples/custom-rlpx-subprotocol/src/main.rs index f3018918506d..b41d1532f31d 100644 --- a/examples/custom-rlpx-subprotocol/src/main.rs +++ b/examples/custom-rlpx-subprotocol/src/main.rs @@ -1,196 +1,17 @@ -//! This example showcase custom rlpx subprotocols -//! -//! This installs a custom rlpx subprotocol and negotiates it with peers. If a remote peer also -//! supports this protocol, it will be used to exchange custom messages. - -#![cfg_attr(not(test), warn(unused_crate_dependencies))] - -use crate::proto::{CustomRlpxProtoMessage, CustomRlpxProtoMessageKind}; -use futures::{Stream, StreamExt}; -use reth::{builder::NodeHandle, network}; -use reth_eth_wire::{ - capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol, -}; -use reth_network::{ - protocol::{ConnectionHandler, OnNotSupported, ProtocolHandler}, - test_utils::Testnet, - NetworkProtocols, -}; -use reth_network_api::Direction; -use reth_node_ethereum::EthereumNode; -use reth_primitives::BytesMut; -use reth_provider::test_utils::MockEthProvider; -use reth_rpc_types::PeerId; -use std::{ - net::SocketAddr, - pin::Pin, - task::{ready, Context, Poll}, -}; - -use crate::proto::{CustomRlpxProtoMessage, CustomRlpxProtoMessageKind}; -use futures::{Stream, StreamExt}; use reth::builder::NodeHandle; -use reth_eth_wire::{ - capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol, -}; -use reth_network::{ - protocol::{ConnectionHandler, IntoRlpxSubProtocol, OnNotSupported, ProtocolHandler}, - NetworkProtocols, -}; -use reth_network_api::Direction; +use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols}; use reth_node_ethereum::EthereumNode; -use reth_primitives::BytesMut; -use reth_rpc_types::PeerId; -use tokio::sync::{mpsc, oneshot}; -use tokio_stream::wrappers::UnboundedReceiverStream; - -mod proto; - -/// Custom Rlpx Subprotocol Handler -#[derive(Debug)] -struct CustomRlpxProtoHandler { - state: ProtocolState, -} - -impl ProtocolHandler for CustomRlpxProtoHandler { - type ConnectionHandler = CustomRlpxConnectionHandler; - - fn on_incoming(&self, _socket_addr: SocketAddr) -> Option { - Some(CustomRlpxConnectionHandler { state: self.state.clone() }) - } - - fn on_outgoing( - &self, - _socket_addr: SocketAddr, - _peer_id: PeerId, - ) -> Option { - Some(CustomRlpxConnectionHandler { state: self.state.clone() }) - } -} - -#[derive(Clone, Debug)] -struct ProtocolState { - events: mpsc::UnboundedSender, -} - -#[derive(Debug)] -enum ProtocolEvent { - Established { - #[allow(dead_code)] - direction: Direction, - peer_id: PeerId, - to_connection: mpsc::UnboundedSender, - }, -} - -enum Command { - /// Send a custom message to the peer - CustomMessage { - msg: String, - /// The response will be sent to this channel. - response: oneshot::Sender, - }, -} - -struct CustomRlpxConnectionHandler { - state: ProtocolState, -} - -impl ConnectionHandler for CustomRlpxConnectionHandler { - type Connection = CustomRlpxConnection; - - fn protocol(&self) -> Protocol { - CustomRlpxProtoMessage::protocol() - } - - fn on_unsupported_by_peer( - self, - _supported: &SharedCapabilities, - _direction: Direction, - _peer_id: PeerId, - ) -> OnNotSupported { - OnNotSupported::KeepAlive - } - - fn into_connection( - self, - direction: Direction, - peer_id: PeerId, - conn: ProtocolConnection, - ) -> Self::Connection { - let (tx, rx) = mpsc::unbounded_channel(); - self.state - .events - .send(ProtocolEvent::Established { direction, peer_id, to_connection: tx }) - .ok(); - CustomRlpxConnection { - conn, - initial_ping: direction.is_outgoing().then(CustomRlpxProtoMessage::ping), - commands: UnboundedReceiverStream::new(rx), - pending_pong: None, - } - } -} +use subprotocol::protocol::handler::{CustomRlpxProtoHandler, ProtocolState}; +use tokio::sync::mpsc; -struct CustomRlpxConnection { - conn: ProtocolConnection, - initial_ping: Option, - commands: UnboundedReceiverStream, - pending_pong: Option>, -} - -impl Stream for CustomRlpxConnection { - type Item = BytesMut; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.get_mut(); - if let Some(initial_ping) = this.initial_ping.take() { - return Poll::Ready(Some(initial_ping.encoded())); - } - - loop { - if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) { - return match cmd { - Command::CustomMessage { msg: _, response } => { - this.pending_pong = Some(response); - Poll::Ready(Some(CustomRlpxProtoMessage::ping().encoded())) - } - }; - } - let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) }; - let Some(msg) = CustomRlpxProtoMessage::decode_message(&mut &msg[..]) else { - return Poll::Ready(None); - }; - - match msg.message { - CustomRlpxProtoMessageKind::Ping => { - return Poll::Ready(Some(CustomRlpxProtoMessage::pong().encoded())) - } - CustomRlpxProtoMessageKind::Pong => {} - CustomRlpxProtoMessageKind::CustomMessage(msg) => { - if let Some(sender) = this.pending_pong.take() { - sender.send(msg).ok(); - } - continue; - } - } - return Poll::Pending; - } - } -} +mod subprotocol; fn main() -> eyre::Result<()> { reth::cli::Cli::parse_args().run(|builder, _args| async move { - // launch the node let NodeHandle { node, node_exit_future } = builder.node(EthereumNode::default()).launch().await?; - // After lauch and after launch we inject a new rlpx protocol handler via the network - // node.network the rlpx can be similar to the test example, could even be something - // like simple string message exchange - let (tx, _rx) = mpsc::unbounded_channel(); - let custom_rlpx_handler = CustomRlpxProtoHandler { state: ProtocolState { events: tx } }; node.network.add_rlpx_sub_protocol(custom_rlpx_handler.into_rlpx_sub_protocol()); diff --git a/examples/custom-rlpx-subprotocol/src/subprotocol/connection/handler.rs b/examples/custom-rlpx-subprotocol/src/subprotocol/connection/handler.rs new file mode 100644 index 000000000000..fd37cb221342 --- /dev/null +++ b/examples/custom-rlpx-subprotocol/src/subprotocol/connection/handler.rs @@ -0,0 +1,53 @@ +use super::CustomRlpxConnection; +use crate::subprotocol::protocol::{ + event::ProtocolEvent, handler::ProtocolState, message::CustomRlpxProtoMessage, +}; +use reth_eth_wire::{ + capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol, +}; +use reth_network::protocol::{ConnectionHandler, OnNotSupported}; +use reth_network_api::Direction; +use reth_rpc_types::PeerId; +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; + +/// The connection handler for the custom RLPx protocol. +pub(crate) struct CustomRlpxConnectionHandler { + pub(crate) state: ProtocolState, +} + +impl ConnectionHandler for CustomRlpxConnectionHandler { + type Connection = CustomRlpxConnection; + + fn protocol(&self) -> Protocol { + CustomRlpxProtoMessage::protocol() + } + + fn on_unsupported_by_peer( + self, + _supported: &SharedCapabilities, + _direction: Direction, + _peer_id: PeerId, + ) -> OnNotSupported { + OnNotSupported::KeepAlive + } + + fn into_connection( + self, + direction: Direction, + peer_id: PeerId, + conn: ProtocolConnection, + ) -> Self::Connection { + let (tx, rx) = mpsc::unbounded_channel(); + self.state + .events + .send(ProtocolEvent::Established { direction, peer_id, to_connection: tx }) + .ok(); + CustomRlpxConnection { + conn, + initial_ping: direction.is_outgoing().then(CustomRlpxProtoMessage::ping), + commands: UnboundedReceiverStream::new(rx), + pending_pong: None, + } + } +} diff --git a/examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs b/examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs new file mode 100644 index 000000000000..fcd904b2accb --- /dev/null +++ b/examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs @@ -0,0 +1,71 @@ +use futures::{Stream, StreamExt}; +use reth_eth_wire::multiplex::ProtocolConnection; +use reth_primitives::BytesMut; +use std::{ + pin::Pin, + task::{ready, Context, Poll}, +}; +use tokio::sync::oneshot; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use super::protocol::message::{CustomRlpxProtoMessage, CustomRlpxProtoMessageKind}; + +pub(crate) mod handler; + +/// We define some custom commands that the subprotocol supports. +pub(crate) enum CustomCommand { + /// Sends a message to the peer + Message { + msg: String, + /// The response will be sent to this channel. + response: oneshot::Sender, + }, +} + +/// The connection handler for the custom RLPx protocol. +pub(crate) struct CustomRlpxConnection { + conn: ProtocolConnection, + initial_ping: Option, + commands: UnboundedReceiverStream, + pending_pong: Option>, +} + +impl Stream for CustomRlpxConnection { + type Item = BytesMut; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + if let Some(initial_ping) = this.initial_ping.take() { + return Poll::Ready(Some(initial_ping.encoded())); + } + + loop { + if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) { + return match cmd { + CustomCommand::Message { msg: _, response } => { + this.pending_pong = Some(response); + Poll::Ready(Some(CustomRlpxProtoMessage::ping().encoded())) + } + }; + } + let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) }; + let Some(msg) = CustomRlpxProtoMessage::decode_message(&mut &msg[..]) else { + return Poll::Ready(None); + }; + + match msg.message { + CustomRlpxProtoMessageKind::Ping => { + return Poll::Ready(Some(CustomRlpxProtoMessage::pong().encoded())) + } + CustomRlpxProtoMessageKind::Pong => {} + CustomRlpxProtoMessageKind::CustomMessage(msg) => { + if let Some(sender) = this.pending_pong.take() { + sender.send(msg).ok(); + } + continue; + } + } + return Poll::Pending; + } + } +} diff --git a/examples/custom-rlpx-subprotocol/src/subprotocol/mod.rs b/examples/custom-rlpx-subprotocol/src/subprotocol/mod.rs new file mode 100644 index 000000000000..53ec0dc1d4e7 --- /dev/null +++ b/examples/custom-rlpx-subprotocol/src/subprotocol/mod.rs @@ -0,0 +1,2 @@ +pub(crate) mod connection; +pub(crate) mod protocol; diff --git a/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/event.rs b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/event.rs new file mode 100644 index 000000000000..7c34374f1b76 --- /dev/null +++ b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/event.rs @@ -0,0 +1,15 @@ +use crate::subprotocol::connection::CustomCommand; +use reth_network::Direction; +use reth_network_api::PeerId; +use tokio::sync::mpsc; + +/// The event that can be emitted by our custom protocol. +#[derive(Debug)] +pub(crate) enum ProtocolEvent { + Established { + #[allow(dead_code)] + direction: Direction, + peer_id: PeerId, + to_connection: mpsc::UnboundedSender, + }, +} diff --git a/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/handler.rs b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/handler.rs new file mode 100644 index 000000000000..d5a35398dae1 --- /dev/null +++ b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/handler.rs @@ -0,0 +1,34 @@ +use super::event::ProtocolEvent; +use crate::subprotocol::connection::handler::CustomRlpxConnectionHandler; +use reth_network::protocol::ProtocolHandler; +use reth_network_api::PeerId; +use std::net::SocketAddr; +use tokio::sync::mpsc; + +/// Protocol state is an helper struct to store the protocol events. +#[derive(Clone, Debug)] +pub(crate) struct ProtocolState { + pub(crate) events: mpsc::UnboundedSender, +} + +/// The protocol handler takes care of incoming and outgoing connections. +#[derive(Debug)] +pub(crate) struct CustomRlpxProtoHandler { + pub state: ProtocolState, +} + +impl ProtocolHandler for CustomRlpxProtoHandler { + type ConnectionHandler = CustomRlpxConnectionHandler; + + fn on_incoming(&self, _socket_addr: SocketAddr) -> Option { + Some(CustomRlpxConnectionHandler { state: self.state.clone() }) + } + + fn on_outgoing( + &self, + _socket_addr: SocketAddr, + _peer_id: PeerId, + ) -> Option { + Some(CustomRlpxConnectionHandler { state: self.state.clone() }) + } +} diff --git a/examples/custom-rlpx-subprotocol/src/proto.rs b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/message.rs similarity index 94% rename from examples/custom-rlpx-subprotocol/src/proto.rs rename to examples/custom-rlpx-subprotocol/src/subprotocol/protocol/message.rs index bc196683f545..16c0c85800e5 100644 --- a/examples/custom-rlpx-subprotocol/src/proto.rs +++ b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/message.rs @@ -1,24 +1,23 @@ use reth_eth_wire::{protocol::Protocol, Capability}; -use reth_network::protocol::{ConnectionHandler, ProtocolHandler}; use reth_primitives::{Buf, BufMut, BytesMut}; #[repr(u8)] #[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum CustomRlpxProtoMessageId { +pub(crate) enum CustomRlpxProtoMessageId { Ping = 0x00, Pong = 0x01, CustomMessage = 0x02, } #[derive(Clone, Debug, PartialEq, Eq)] -pub enum CustomRlpxProtoMessageKind { +pub(crate) enum CustomRlpxProtoMessageKind { Ping, Pong, CustomMessage(String), } #[derive(Clone, Debug, PartialEq, Eq)] -pub struct CustomRlpxProtoMessage { +pub(crate) struct CustomRlpxProtoMessage { pub message_type: CustomRlpxProtoMessageId, pub message: CustomRlpxProtoMessageKind, } diff --git a/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/mod.rs b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/mod.rs new file mode 100644 index 000000000000..dd774c892fdd --- /dev/null +++ b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/mod.rs @@ -0,0 +1,3 @@ +pub(crate) mod event; +pub(crate) mod handler; +pub(crate) mod message; From ef069d71a416636ee9ddef00bc558a934f468628 Mon Sep 17 00:00:00 2001 From: Luca Provini Date: Mon, 24 Jun 2024 17:25:48 +0200 Subject: [PATCH 05/10] added new network spawn --- Cargo.lock | 1 + examples/custom-rlpx-subprotocol/Cargo.toml | 1 + examples/custom-rlpx-subprotocol/src/main.rs | 18 +++++++++++++++--- .../src/subprotocol/protocol/event.rs | 2 +- 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cbc882ef40ea..0946d9ba07c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2868,6 +2868,7 @@ version = "0.1.0" dependencies = [ "eyre", "futures", + "rand 0.8.5", "reth", "reth-eth-wire", "reth-network", diff --git a/examples/custom-rlpx-subprotocol/Cargo.toml b/examples/custom-rlpx-subprotocol/Cargo.toml index ed92fc9b1611..1e0a2c4dad66 100644 --- a/examples/custom-rlpx-subprotocol/Cargo.toml +++ b/examples/custom-rlpx-subprotocol/Cargo.toml @@ -18,3 +18,4 @@ tokio-stream.workspace = true reth-rpc-types.workspace = true reth.workspace = true eyre.workspace = true +rand.workspace = true diff --git a/examples/custom-rlpx-subprotocol/src/main.rs b/examples/custom-rlpx-subprotocol/src/main.rs index b41d1532f31d..fca0e5e39b92 100644 --- a/examples/custom-rlpx-subprotocol/src/main.rs +++ b/examples/custom-rlpx-subprotocol/src/main.rs @@ -1,6 +1,10 @@ use reth::builder::NodeHandle; -use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols}; +use reth_network::{ + config::SecretKey, protocol::IntoRlpxSubProtocol, NetworkConfig, NetworkManager, + NetworkProtocols, +}; use reth_node_ethereum::EthereumNode; +use reth_provider::test_utils::NoopProvider; use subprotocol::protocol::handler::{CustomRlpxProtoHandler, ProtocolState}; use tokio::sync::mpsc; @@ -12,11 +16,19 @@ fn main() -> eyre::Result<()> { builder.node(EthereumNode::default()).launch().await?; let (tx, _rx) = mpsc::unbounded_channel(); - let custom_rlpx_handler = CustomRlpxProtoHandler { state: ProtocolState { events: tx } }; + let custom_rlpx_handler = + CustomRlpxProtoHandler { state: ProtocolState { events: tx.clone() } }; node.network.add_rlpx_sub_protocol(custom_rlpx_handler.into_rlpx_sub_protocol()); - // Spawn a task to handle incoming messages from the custom RLPx protocol + let custom_rlpx_handler_2 = CustomRlpxProtoHandler { state: ProtocolState { events: tx } }; + let secret_key = SecretKey::new(&mut rand::thread_rng()); + let net_cfg = NetworkConfig::builder(secret_key) + .add_rlpx_sub_protocol(custom_rlpx_handler_2.into_rlpx_sub_protocol()) + .build(NoopProvider::default()); + let network = NetworkManager::new(net_cfg).await.unwrap(); + + tokio::spawn(network); node_exit_future.await }) diff --git a/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/event.rs b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/event.rs index 7c34374f1b76..ea9e588e592b 100644 --- a/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/event.rs +++ b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/event.rs @@ -3,7 +3,7 @@ use reth_network::Direction; use reth_network_api::PeerId; use tokio::sync::mpsc; -/// The event that can be emitted by our custom protocol. +/// The events that can be emitted by our custom protocol. #[derive(Debug)] pub(crate) enum ProtocolEvent { Established { From bb3bd793c4a7cedf029b981bf6c9f85e62eac1c7 Mon Sep 17 00:00:00 2001 From: Loocapro Date: Tue, 25 Jun 2024 15:37:26 +0200 Subject: [PATCH 06/10] connecting launched node, establishing session and send pings --- examples/custom-rlpx-subprotocol/src/main.rs | 70 ++++++++++++++++--- .../src/subprotocol/connection/mod.rs | 1 + .../src/subprotocol/protocol/message.rs | 8 --- 3 files changed, 62 insertions(+), 17 deletions(-) diff --git a/examples/custom-rlpx-subprotocol/src/main.rs b/examples/custom-rlpx-subprotocol/src/main.rs index fca0e5e39b92..dc29743d0908 100644 --- a/examples/custom-rlpx-subprotocol/src/main.rs +++ b/examples/custom-rlpx-subprotocol/src/main.rs @@ -3,32 +3,84 @@ use reth_network::{ config::SecretKey, protocol::IntoRlpxSubProtocol, NetworkConfig, NetworkManager, NetworkProtocols, }; +use reth_network_api::NetworkInfo; use reth_node_ethereum::EthereumNode; +use reth_primitives::revm_primitives::FixedBytes; use reth_provider::test_utils::NoopProvider; -use subprotocol::protocol::handler::{CustomRlpxProtoHandler, ProtocolState}; -use tokio::sync::mpsc; +use subprotocol::{ + connection::CustomCommand, + protocol::{ + event::ProtocolEvent, + handler::{CustomRlpxProtoHandler, ProtocolState}, + }, +}; +use tokio::sync::{mpsc, oneshot}; mod subprotocol; fn main() -> eyre::Result<()> { reth::cli::Cli::parse_args().run(|builder, _args| async move { + // launch the node let NodeHandle { node, node_exit_future } = builder.node(EthereumNode::default()).launch().await?; + let peer_id = node.network.peer_id(); + let peer_addr = node.network.local_addr(); - let (tx, _rx) = mpsc::unbounded_channel(); - let custom_rlpx_handler = - CustomRlpxProtoHandler { state: ProtocolState { events: tx.clone() } }; - + // add the custom network subprotocol to the launched node + let (tx, mut from_peer0) = mpsc::unbounded_channel(); + let custom_rlpx_handler = CustomRlpxProtoHandler { state: ProtocolState { events: tx } }; node.network.add_rlpx_sub_protocol(custom_rlpx_handler.into_rlpx_sub_protocol()); - let custom_rlpx_handler_2 = CustomRlpxProtoHandler { state: ProtocolState { events: tx } }; + // creates a separate network instance and adds the custom network subprotocol let secret_key = SecretKey::new(&mut rand::thread_rng()); + let (tx, mut from_peer1) = mpsc::unbounded_channel(); + let custom_rlpx_handler_2 = CustomRlpxProtoHandler { state: ProtocolState { events: tx } }; let net_cfg = NetworkConfig::builder(secret_key) .add_rlpx_sub_protocol(custom_rlpx_handler_2.into_rlpx_sub_protocol()) .build(NoopProvider::default()); - let network = NetworkManager::new(net_cfg).await.unwrap(); - tokio::spawn(network); + // spawn the second network instance + let subnetwork = NetworkManager::new(net_cfg).await?; + let subnetwork_peer_id = *subnetwork.peer_id(); + let subnetwork_peer_addr = subnetwork.local_addr(); + let subnetwork_handle = subnetwork.peers_handle(); + node.task_executor.spawn(subnetwork); + + // connect the launched node to the subnetwork + node.network.peers_handle().add_peer(FixedBytes(*subnetwork_peer_id), subnetwork_peer_addr); + + // connect the subnetwork to the launched node + subnetwork_handle.add_peer(*peer_id, peer_addr); + + // establish connection between peer0 and peer1 + let peer0_to_peer1 = from_peer0.recv().await.expect("peer0 connecting to peer1"); + let peer0_conn = match peer0_to_peer1 { + ProtocolEvent::Established { direction: _, peer_id, to_connection } => { + assert_eq!(peer_id, subnetwork_peer_id); + to_connection + } + }; + + // establish connection between peer1 and peer0 + let peer1_to_peer0 = from_peer1.recv().await.expect("peer1 connecting to peer0"); + let peer1_conn = match peer1_to_peer0 { + ProtocolEvent::Established { direction: _, peer_id: peer1_id, to_connection } => { + assert_eq!(peer1_id, *peer_id); + to_connection + } + }; + + // send a ping message from peer0 to peer1 + let (tx, rx) = oneshot::channel(); + peer0_conn.send(CustomCommand::Message { msg: "hello!".to_owned(), response: tx })?; + let response = rx.await?; + assert_eq!(response, "hello!"); + + // send a ping message from peer1 to peer0 + let (tx, rx) = oneshot::channel(); + peer1_conn.send(CustomCommand::Message { msg: "world!".to_owned(), response: tx })?; + let response = rx.await?; + assert_eq!(response, "world!"); node_exit_future.await }) diff --git a/examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs b/examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs index fcd904b2accb..c5e627b94023 100644 --- a/examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs +++ b/examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs @@ -16,6 +16,7 @@ pub(crate) mod handler; pub(crate) enum CustomCommand { /// Sends a message to the peer Message { + #[allow(dead_code)] msg: String, /// The response will be sent to this channel. response: oneshot::Sender, diff --git a/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/message.rs b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/message.rs index 16c0c85800e5..fa60bf6e40dc 100644 --- a/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/message.rs +++ b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/message.rs @@ -49,14 +49,6 @@ impl CustomRlpxProtoMessage { } } - /// Creates a custom message - pub fn custom_message(msg: impl Into) -> Self { - Self { - message_type: CustomRlpxProtoMessageId::CustomMessage, - message: CustomRlpxProtoMessageKind::CustomMessage(msg.into()), - } - } - /// Creates a new `CustomRlpxProtoMessage` with the given message ID and payload. pub fn encoded(&self) -> BytesMut { let mut buf = BytesMut::new(); From 74f5a852365c451217cbfa49d90bb55623c05b23 Mon Sep 17 00:00:00 2001 From: Loocapro Date: Tue, 25 Jun 2024 15:41:26 +0200 Subject: [PATCH 07/10] deny --- Cargo.lock | 2 +- examples/custom-rlpx-subprotocol/Cargo.toml | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0946d9ba07c1..407e00a579ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2864,7 +2864,7 @@ dependencies = [ [[package]] name = "example-custom-rlpx-subprotocol" -version = "0.1.0" +version = "0.0.0" dependencies = [ "eyre", "futures", diff --git a/examples/custom-rlpx-subprotocol/Cargo.toml b/examples/custom-rlpx-subprotocol/Cargo.toml index 1e0a2c4dad66..80d0341a3a01 100644 --- a/examples/custom-rlpx-subprotocol/Cargo.toml +++ b/examples/custom-rlpx-subprotocol/Cargo.toml @@ -1,9 +1,10 @@ [package] name = "example-custom-rlpx-subprotocol" -version = "0.1.0" -edition = "2021" +version = "0.0.0" +publish = false +edition.workspace = true +license.workspace = true -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] tokio = { workspace = true, features = ["full"] } @@ -14,8 +15,8 @@ reth-network-api.workspace = true reth-node-ethereum.workspace = true reth-provider.workspace = true reth-primitives.workspace = true -tokio-stream.workspace = true reth-rpc-types.workspace = true reth.workspace = true +tokio-stream.workspace = true eyre.workspace = true rand.workspace = true From b1283d74075757846f8e50e9d7e1f69ea5ce741e Mon Sep 17 00:00:00 2001 From: Loocapro Date: Wed, 26 Jun 2024 11:56:57 +0200 Subject: [PATCH 08/10] review --- Cargo.lock | 1 + examples/custom-rlpx-subprotocol/Cargo.toml | 1 + examples/custom-rlpx-subprotocol/src/main.rs | 15 +++++++++++++-- .../src/subprotocol/connection/handler.rs | 2 +- .../src/subprotocol/connection/mod.rs | 2 +- .../src/subprotocol/protocol/mod.rs | 2 +- .../subprotocol/protocol/{message.rs => proto.rs} | 3 +++ 7 files changed, 21 insertions(+), 5 deletions(-) rename examples/custom-rlpx-subprotocol/src/subprotocol/protocol/{message.rs => proto.rs} (94%) diff --git a/Cargo.lock b/Cargo.lock index 407e00a579ae..cbf16aae5329 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2879,6 +2879,7 @@ dependencies = [ "reth-rpc-types", "tokio", "tokio-stream", + "tracing", ] [[package]] diff --git a/examples/custom-rlpx-subprotocol/Cargo.toml b/examples/custom-rlpx-subprotocol/Cargo.toml index 80d0341a3a01..ae3a7c088c04 100644 --- a/examples/custom-rlpx-subprotocol/Cargo.toml +++ b/examples/custom-rlpx-subprotocol/Cargo.toml @@ -20,3 +20,4 @@ reth.workspace = true tokio-stream.workspace = true eyre.workspace = true rand.workspace = true +tracing.workspace = true diff --git a/examples/custom-rlpx-subprotocol/src/main.rs b/examples/custom-rlpx-subprotocol/src/main.rs index dc29743d0908..7e9bdc1e2ade 100644 --- a/examples/custom-rlpx-subprotocol/src/main.rs +++ b/examples/custom-rlpx-subprotocol/src/main.rs @@ -1,3 +1,12 @@ +//! Example for how to customize the network layer by adding a custom rlpx subprotocol. +//! +//! Run with +//! +//! ```not_rust +//! cargo run -p custom-rlpx-subprotocol -- node +//! ``` +//! +//! This launch a regular reth node with a custom rlpx subprotocol. use reth::builder::NodeHandle; use reth_network::{ config::SecretKey, protocol::IntoRlpxSubProtocol, NetworkConfig, NetworkManager, @@ -5,7 +14,6 @@ use reth_network::{ }; use reth_network_api::NetworkInfo; use reth_node_ethereum::EthereumNode; -use reth_primitives::revm_primitives::FixedBytes; use reth_provider::test_utils::NoopProvider; use subprotocol::{ connection::CustomCommand, @@ -15,6 +23,7 @@ use subprotocol::{ }, }; use tokio::sync::{mpsc, oneshot}; +use tracing::info; mod subprotocol; @@ -47,7 +56,7 @@ fn main() -> eyre::Result<()> { node.task_executor.spawn(subnetwork); // connect the launched node to the subnetwork - node.network.peers_handle().add_peer(FixedBytes(*subnetwork_peer_id), subnetwork_peer_addr); + node.network.peers_handle().add_peer(subnetwork_peer_id, subnetwork_peer_addr); // connect the subnetwork to the launched node subnetwork_handle.add_peer(*peer_id, peer_addr); @@ -82,6 +91,8 @@ fn main() -> eyre::Result<()> { let response = rx.await?; assert_eq!(response, "world!"); + info!("Peers connected via custom rlpx subprotocol!"); + node_exit_future.await }) } diff --git a/examples/custom-rlpx-subprotocol/src/subprotocol/connection/handler.rs b/examples/custom-rlpx-subprotocol/src/subprotocol/connection/handler.rs index fd37cb221342..dae2d5c8679e 100644 --- a/examples/custom-rlpx-subprotocol/src/subprotocol/connection/handler.rs +++ b/examples/custom-rlpx-subprotocol/src/subprotocol/connection/handler.rs @@ -1,6 +1,6 @@ use super::CustomRlpxConnection; use crate::subprotocol::protocol::{ - event::ProtocolEvent, handler::ProtocolState, message::CustomRlpxProtoMessage, + event::ProtocolEvent, handler::ProtocolState, proto::CustomRlpxProtoMessage, }; use reth_eth_wire::{ capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol, diff --git a/examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs b/examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs index c5e627b94023..06b862544566 100644 --- a/examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs +++ b/examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs @@ -8,7 +8,7 @@ use std::{ use tokio::sync::oneshot; use tokio_stream::wrappers::UnboundedReceiverStream; -use super::protocol::message::{CustomRlpxProtoMessage, CustomRlpxProtoMessageKind}; +use super::protocol::proto::{CustomRlpxProtoMessage, CustomRlpxProtoMessageKind}; pub(crate) mod handler; diff --git a/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/mod.rs b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/mod.rs index dd774c892fdd..8aba9a4e3506 100644 --- a/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/mod.rs +++ b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/mod.rs @@ -1,3 +1,3 @@ pub(crate) mod event; pub(crate) mod handler; -pub(crate) mod message; +pub(crate) mod proto; diff --git a/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/message.rs b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/proto.rs similarity index 94% rename from examples/custom-rlpx-subprotocol/src/subprotocol/protocol/message.rs rename to examples/custom-rlpx-subprotocol/src/subprotocol/protocol/proto.rs index fa60bf6e40dc..63e09fd40c47 100644 --- a/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/message.rs +++ b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/proto.rs @@ -1,3 +1,6 @@ +//! Simple RLPx Ping Pong protocol that also support sending messages, +//! following [RLPx specs](https://github.com/ethereum/devp2p/blob/master/rlpx.md) + use reth_eth_wire::{protocol::Protocol, Capability}; use reth_primitives::{Buf, BufMut, BytesMut}; From 3d2b9335774947fe8a98c0d12f7266c6e4798126 Mon Sep 17 00:00:00 2001 From: Loocapro Date: Wed, 26 Jun 2024 14:40:22 +0200 Subject: [PATCH 09/10] more logs and default prots --- examples/custom-rlpx-subprotocol/src/main.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/examples/custom-rlpx-subprotocol/src/main.rs b/examples/custom-rlpx-subprotocol/src/main.rs index 7e9bdc1e2ade..5b0543101a82 100644 --- a/examples/custom-rlpx-subprotocol/src/main.rs +++ b/examples/custom-rlpx-subprotocol/src/main.rs @@ -15,6 +15,7 @@ use reth_network::{ use reth_network_api::NetworkInfo; use reth_node_ethereum::EthereumNode; use reth_provider::test_utils::NoopProvider; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use subprotocol::{ connection::CustomCommand, protocol::{ @@ -45,6 +46,7 @@ fn main() -> eyre::Result<()> { let (tx, mut from_peer1) = mpsc::unbounded_channel(); let custom_rlpx_handler_2 = CustomRlpxProtoHandler { state: ProtocolState { events: tx } }; let net_cfg = NetworkConfig::builder(secret_key) + .listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))) .add_rlpx_sub_protocol(custom_rlpx_handler_2.into_rlpx_sub_protocol()) .build(NoopProvider::default()); @@ -78,20 +80,23 @@ fn main() -> eyre::Result<()> { to_connection } }; + info!(target:"rlpx-subprotocol", "Connection established!"); // send a ping message from peer0 to peer1 let (tx, rx) = oneshot::channel(); peer0_conn.send(CustomCommand::Message { msg: "hello!".to_owned(), response: tx })?; let response = rx.await?; assert_eq!(response, "hello!"); + info!(target:"rlpx-subprotocol", ?response, "New message received"); // send a ping message from peer1 to peer0 let (tx, rx) = oneshot::channel(); peer1_conn.send(CustomCommand::Message { msg: "world!".to_owned(), response: tx })?; let response = rx.await?; assert_eq!(response, "world!"); + info!(target:"rlpx-subprotocol", ?response, "New message received"); - info!("Peers connected via custom rlpx subprotocol!"); + info!(target:"rlpx-subprotocol", "Peers connected via custom rlpx subprotocol!"); node_exit_future.await }) From 32b12c64a8cf43ef87ef0755605bc09329c899d4 Mon Sep 17 00:00:00 2001 From: Loocapro Date: Wed, 26 Jun 2024 17:40:35 +0200 Subject: [PATCH 10/10] correcting cargo run command --- examples/custom-rlpx-subprotocol/src/main.rs | 7 ++-- .../src/subprotocol/connection/mod.rs | 26 +++++++------ .../src/subprotocol/protocol/proto.rs | 38 +++++++++++++++---- 3 files changed, 49 insertions(+), 22 deletions(-) diff --git a/examples/custom-rlpx-subprotocol/src/main.rs b/examples/custom-rlpx-subprotocol/src/main.rs index 5b0543101a82..3a198c38d285 100644 --- a/examples/custom-rlpx-subprotocol/src/main.rs +++ b/examples/custom-rlpx-subprotocol/src/main.rs @@ -3,7 +3,7 @@ //! Run with //! //! ```not_rust -//! cargo run -p custom-rlpx-subprotocol -- node +//! cargo run -p example-custom-rlpx-subprotocol -- node //! ``` //! //! This launch a regular reth node with a custom rlpx subprotocol. @@ -47,6 +47,7 @@ fn main() -> eyre::Result<()> { let custom_rlpx_handler_2 = CustomRlpxProtoHandler { state: ProtocolState { events: tx } }; let net_cfg = NetworkConfig::builder(secret_key) .listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))) + .disable_discovery() .add_rlpx_sub_protocol(custom_rlpx_handler_2.into_rlpx_sub_protocol()) .build(NoopProvider::default()); @@ -84,14 +85,14 @@ fn main() -> eyre::Result<()> { // send a ping message from peer0 to peer1 let (tx, rx) = oneshot::channel(); - peer0_conn.send(CustomCommand::Message { msg: "hello!".to_owned(), response: tx })?; + peer0_conn.send(CustomCommand::Message { msg: "hello!".to_string(), response: tx })?; let response = rx.await?; assert_eq!(response, "hello!"); info!(target:"rlpx-subprotocol", ?response, "New message received"); // send a ping message from peer1 to peer0 let (tx, rx) = oneshot::channel(); - peer1_conn.send(CustomCommand::Message { msg: "world!".to_owned(), response: tx })?; + peer1_conn.send(CustomCommand::Message { msg: "world!".to_string(), response: tx })?; let response = rx.await?; assert_eq!(response, "world!"); info!(target:"rlpx-subprotocol", ?response, "New message received"); diff --git a/examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs b/examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs index 06b862544566..a6d835b70c26 100644 --- a/examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs +++ b/examples/custom-rlpx-subprotocol/src/subprotocol/connection/mod.rs @@ -1,3 +1,4 @@ +use super::protocol::proto::{CustomRlpxProtoMessage, CustomRlpxProtoMessageKind}; use futures::{Stream, StreamExt}; use reth_eth_wire::multiplex::ProtocolConnection; use reth_primitives::BytesMut; @@ -8,15 +9,12 @@ use std::{ use tokio::sync::oneshot; use tokio_stream::wrappers::UnboundedReceiverStream; -use super::protocol::proto::{CustomRlpxProtoMessage, CustomRlpxProtoMessageKind}; - pub(crate) mod handler; /// We define some custom commands that the subprotocol supports. pub(crate) enum CustomCommand { /// Sends a message to the peer Message { - #[allow(dead_code)] msg: String, /// The response will be sent to this channel. response: oneshot::Sender, @@ -37,21 +35,23 @@ impl Stream for CustomRlpxConnection { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); if let Some(initial_ping) = this.initial_ping.take() { - return Poll::Ready(Some(initial_ping.encoded())); + return Poll::Ready(Some(initial_ping.encoded())) } loop { if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) { return match cmd { - CustomCommand::Message { msg: _, response } => { + CustomCommand::Message { msg, response } => { this.pending_pong = Some(response); - Poll::Ready(Some(CustomRlpxProtoMessage::ping().encoded())) + Poll::Ready(Some(CustomRlpxProtoMessage::ping_message(msg).encoded())) } - }; + } } + let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) }; + let Some(msg) = CustomRlpxProtoMessage::decode_message(&mut &msg[..]) else { - return Poll::Ready(None); + return Poll::Ready(None) }; match msg.message { @@ -59,14 +59,18 @@ impl Stream for CustomRlpxConnection { return Poll::Ready(Some(CustomRlpxProtoMessage::pong().encoded())) } CustomRlpxProtoMessageKind::Pong => {} - CustomRlpxProtoMessageKind::CustomMessage(msg) => { + CustomRlpxProtoMessageKind::PingMessage(msg) => { + return Poll::Ready(Some(CustomRlpxProtoMessage::pong_message(msg).encoded())) + } + CustomRlpxProtoMessageKind::PongMessage(msg) => { if let Some(sender) = this.pending_pong.take() { sender.send(msg).ok(); } - continue; + continue } } - return Poll::Pending; + + return Poll::Pending } } } diff --git a/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/proto.rs b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/proto.rs index 63e09fd40c47..8b179a447d9f 100644 --- a/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/proto.rs +++ b/examples/custom-rlpx-subprotocol/src/subprotocol/protocol/proto.rs @@ -9,14 +9,16 @@ use reth_primitives::{Buf, BufMut, BytesMut}; pub(crate) enum CustomRlpxProtoMessageId { Ping = 0x00, Pong = 0x01, - CustomMessage = 0x02, + PingMessage = 0x02, + PongMessage = 0x03, } #[derive(Clone, Debug, PartialEq, Eq)] pub(crate) enum CustomRlpxProtoMessageKind { Ping, Pong, - CustomMessage(String), + PingMessage(String), + PongMessage(String), } #[derive(Clone, Debug, PartialEq, Eq)] @@ -33,7 +35,22 @@ impl CustomRlpxProtoMessage { /// Returns the protocol for the `custom_rlpx` protocol. pub fn protocol() -> Protocol { - Protocol::new(Self::capability(), 3) + Protocol::new(Self::capability(), 4) + } + + /// Creates a ping message + pub fn ping_message(msg: impl Into) -> Self { + Self { + message_type: CustomRlpxProtoMessageId::PingMessage, + message: CustomRlpxProtoMessageKind::PingMessage(msg.into()), + } + } + /// Creates a ping message + pub fn pong_message(msg: impl Into) -> Self { + Self { + message_type: CustomRlpxProtoMessageId::PongMessage, + message: CustomRlpxProtoMessageKind::PongMessage(msg.into()), + } } /// Creates a ping message @@ -57,9 +74,9 @@ impl CustomRlpxProtoMessage { let mut buf = BytesMut::new(); buf.put_u8(self.message_type as u8); match &self.message { - CustomRlpxProtoMessageKind::Ping => {} - CustomRlpxProtoMessageKind::Pong => {} - CustomRlpxProtoMessageKind::CustomMessage(msg) => { + CustomRlpxProtoMessageKind::Ping | CustomRlpxProtoMessageKind::Pong => {} + CustomRlpxProtoMessageKind::PingMessage(msg) | + CustomRlpxProtoMessageKind::PongMessage(msg) => { buf.put(msg.as_bytes()); } } @@ -76,16 +93,21 @@ impl CustomRlpxProtoMessage { let message_type = match id { 0x00 => CustomRlpxProtoMessageId::Ping, 0x01 => CustomRlpxProtoMessageId::Pong, - 0x02 => CustomRlpxProtoMessageId::CustomMessage, + 0x02 => CustomRlpxProtoMessageId::PingMessage, + 0x03 => CustomRlpxProtoMessageId::PongMessage, _ => return None, }; let message = match message_type { CustomRlpxProtoMessageId::Ping => CustomRlpxProtoMessageKind::Ping, CustomRlpxProtoMessageId::Pong => CustomRlpxProtoMessageKind::Pong, - CustomRlpxProtoMessageId::CustomMessage => CustomRlpxProtoMessageKind::CustomMessage( + CustomRlpxProtoMessageId::PingMessage => CustomRlpxProtoMessageKind::PingMessage( + String::from_utf8_lossy(&buf[..]).into_owned(), + ), + CustomRlpxProtoMessageId::PongMessage => CustomRlpxProtoMessageKind::PongMessage( String::from_utf8_lossy(&buf[..]).into_owned(), ), }; + Some(Self { message_type, message }) } }