From c090160bdd566db4e3ce7386c971a34c1de0e519 Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Wed, 27 Nov 2024 16:16:26 +0200 Subject: [PATCH] Merging dev #6655 --- applications/minotari_node/src/bootstrap.rs | 8 +- .../chain_metadata_service/service.rs | 3 + .../p2p/src/services/liveness/handle.rs | 27 +- base_layer/p2p/src/services/liveness/mock.rs | 12 +- base_layer/p2p/src/services/liveness/mod.rs | 4 +- .../p2p/src/services/liveness/service.rs | 54 ++- base_layer/p2p/src/services/liveness/state.rs | 20 +- base_layer/p2p/src/services/mod.rs | 1 + .../p2p/src/services/monitor_peers/mod.rs | 86 +++++ .../p2p/src/services/monitor_peers/service.rs | 315 ++++++++++++++++++ network/core/src/connection.rs | 6 + network/core/src/event.rs | 2 +- 12 files changed, 502 insertions(+), 36 deletions(-) create mode 100644 base_layer/p2p/src/services/monitor_peers/mod.rs create mode 100644 base_layer/p2p/src/services/monitor_peers/service.rs diff --git a/applications/minotari_node/src/bootstrap.rs b/applications/minotari_node/src/bootstrap.rs index 740e186b5a..d40af2c061 100644 --- a/applications/minotari_node/src/bootstrap.rs +++ b/applications/minotari_node/src/bootstrap.rs @@ -51,7 +51,10 @@ use tari_p2p::{ initialization::P2pInitializer, message::TariNodeMessageSpec, peer_seeds::SeedPeer, - services::liveness::{config::LivenessConfig, LivenessInitializer}, + services::{ + liveness::{config::LivenessConfig, LivenessInitializer}, + monitor_peers::MonitorPeersInitializer, + }, Dispatcher, P2pConfig, }; @@ -142,6 +145,9 @@ where B: BlockchainBackend + 'static }, dispatcher.clone(), )) + .add_initializer(MonitorPeersInitializer::new( + base_node_config.metadata_auto_ping_interval, + )) .add_initializer(ChainMetadataServiceInitializer) .add_initializer(BaseNodeStateMachineInitializer::new( self.db.clone().into(), diff --git a/base_layer/core/src/base_node/chain_metadata_service/service.rs b/base_layer/core/src/base_node/chain_metadata_service/service.rs index 69b2670df3..2a0a6e247f 100644 --- a/base_layer/core/src/base_node/chain_metadata_service/service.rs +++ b/base_layer/core/src/base_node/chain_metadata_service/service.rs @@ -365,6 +365,7 @@ mod test { metadata, peer_id, latency: None, + nonce: 0, }; let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event)); @@ -387,6 +388,7 @@ mod test { metadata, peer_id: node_id, latency: None, + nonce: 0, }; let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event)); @@ -405,6 +407,7 @@ mod test { metadata, peer_id: node_id, latency: None, + nonce: 0, }; let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event)); diff --git a/base_layer/p2p/src/services/liveness/handle.rs b/base_layer/p2p/src/services/liveness/handle.rs index 4ae77eee56..cd52d0e747 100644 --- a/base_layer/p2p/src/services/liveness/handle.rs +++ b/base_layer/p2p/src/services/liveness/handle.rs @@ -35,6 +35,8 @@ use crate::proto::liveness::MetadataKey; pub enum LivenessRequest { /// Send a ping to the given node ID SendPing(PeerId), + /// Ping a list of peers + SendPings(Vec), /// Retrieve the total number of pings received GetPingCount, /// Retrieve the total number of pongs received @@ -55,7 +57,7 @@ pub enum LivenessRequest { #[derive(Debug)] pub enum LivenessResponse { /// Indicates that the request succeeded - Ok, + Ok(Option>), /// Used to return a counter value from `GetPingCount` and `GetPongCount` Count(usize), /// Response for GetAvgLatency and GetNetworkAvgLatency @@ -82,14 +84,17 @@ pub struct PingPongEvent { pub latency: Option, /// Metadata of the corresponding node pub metadata: Metadata, + /// The nonce of the ping/pong message, for clients that want to match pings with pongs + pub nonce: u64, } impl PingPongEvent { - pub fn new(peer_id: PeerId, latency: Option, metadata: Metadata) -> Self { + pub fn new(peer_id: PeerId, latency: Option, metadata: Metadata, nonce: u64) -> Self { Self { peer_id, latency, metadata, + nonce, } } } @@ -120,9 +125,17 @@ impl LivenessHandle { } /// Send a ping to a given node ID - pub async fn send_ping(&mut self, peer_id: PeerId) -> Result<(), LivenessError> { + pub async fn send_ping(&mut self, peer_id: PeerId) -> Result { match self.handle.call(LivenessRequest::SendPing(peer_id)).await?? { - LivenessResponse::Ok => Ok(()), + LivenessResponse::Ok(Some(nonces)) => Ok(nonces[0]), + _ => Err(LivenessError::UnexpectedApiResponse), + } + } + + /// Send pings to a list of peers + pub async fn send_pings(&mut self, node_ids: Vec) -> Result, LivenessError> { + match self.handle.call(LivenessRequest::SendPings(node_ids)).await?? { + LivenessResponse::Ok(Some(nonces)) => Ok(nonces), _ => Err(LivenessError::UnexpectedApiResponse), } } @@ -150,7 +163,7 @@ impl LivenessHandle { .call(LivenessRequest::SetMetadataEntry(key, value)) .await?? { - LivenessResponse::Ok => Ok(()), + LivenessResponse::Ok(_) => Ok(()), _ => Err(LivenessError::UnexpectedApiResponse), } } @@ -158,7 +171,7 @@ impl LivenessHandle { /// Add a monitored peer to the basic config if not present pub async fn check_add_monitored_peer(&mut self, peer_id: PeerId) -> Result<(), LivenessError> { match self.handle.call(LivenessRequest::AddMonitoredPeer(peer_id)).await?? { - LivenessResponse::Ok => Ok(()), + LivenessResponse::Ok(_) => Ok(()), _ => Err(LivenessError::UnexpectedApiResponse), } } @@ -170,7 +183,7 @@ impl LivenessHandle { .call(LivenessRequest::RemoveMonitoredPeer(peer_id)) .await?? { - LivenessResponse::Ok => Ok(()), + LivenessResponse::Ok(_) => Ok(()), _ => Err(LivenessError::UnexpectedApiResponse), } } diff --git a/base_layer/p2p/src/services/liveness/mock.rs b/base_layer/p2p/src/services/liveness/mock.rs index 652531a17f..9b765fe5d3 100644 --- a/base_layer/p2p/src/services/liveness/mock.rs +++ b/base_layer/p2p/src/services/liveness/mock.rs @@ -125,7 +125,11 @@ impl LivenessMock { self.mock_state.add_request_call(req.clone()); match req { SendPing(_) => { - reply.send(Ok(LivenessResponse::Ok)).unwrap(); + reply.send(Ok(LivenessResponse::Ok(Some(vec![0])))).unwrap(); + }, + SendPings(node_ids) => { + let nonces: Vec = (0..node_ids.len() as u64).collect(); + reply.send(Ok(LivenessResponse::Ok(Some(nonces)))).unwrap(); }, GetPingCount => { reply.send(Ok(LivenessResponse::Count(1))).unwrap(); @@ -140,13 +144,13 @@ impl LivenessMock { reply.send(Ok(LivenessResponse::AvgLatency(None))).unwrap(); }, SetMetadataEntry(_, _) => { - reply.send(Ok(LivenessResponse::Ok)).unwrap(); + reply.send(Ok(LivenessResponse::Ok(None))).unwrap(); }, AddMonitoredPeer(_) => { - reply.send(Ok(LivenessResponse::Ok)).unwrap(); + reply.send(Ok(LivenessResponse::Ok(None))).unwrap(); }, RemoveMonitoredPeer(_) => { - reply.send(Ok(LivenessResponse::Ok)).unwrap(); + reply.send(Ok(LivenessResponse::Ok(None))).unwrap(); }, } } diff --git a/base_layer/p2p/src/services/liveness/mod.rs b/base_layer/p2p/src/services/liveness/mod.rs index 0076587f59..d91c0e77d2 100644 --- a/base_layer/p2p/src/services/liveness/mod.rs +++ b/base_layer/p2p/src/services/liveness/mod.rs @@ -45,6 +45,7 @@ pub use handle::{LivenessEvent, LivenessHandle, LivenessRequest, LivenessRespons mod message; mod service; +pub use service::MAX_INFLIGHT_TTL; mod state; pub use state::Metadata; @@ -64,6 +65,7 @@ use tari_service_framework::{ use tokio::sync::{broadcast, mpsc}; use self::service::LivenessService; +pub use crate::proto::liveness::MetadataKey; use crate::{ message::TariNodeMessageSpec, proto::message::TariMessageType, @@ -74,7 +76,7 @@ const LOG_TARGET: &str = "p2p::services::liveness"; /// Initializer for the Liveness service handle and service future. pub struct LivenessInitializer { - config: Option, + pub(crate) config: Option, dispatcher: Dispatcher, } diff --git a/base_layer/p2p/src/services/liveness/service.rs b/base_layer/p2p/src/services/liveness/service.rs index 5d84c4a682..bcabd667f6 100644 --- a/base_layer/p2p/src/services/liveness/service.rs +++ b/base_layer/p2p/src/services/liveness/service.rs @@ -20,7 +20,12 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{collections::HashSet, iter, sync::Arc, time::Instant}; +use std::{ + collections::HashSet, + iter, + sync::Arc, + time::{Duration, Instant}, +}; use futures::{future::Either, pin_mut, stream::StreamExt, Stream}; use log::*; @@ -45,6 +50,8 @@ use crate::{ services::liveness::{handle::LivenessEventSender, LivenessEvent, PingPongEvent}, }; +pub const MAX_INFLIGHT_TTL: Duration = Duration::from_secs(30); + /// Service responsible for testing Liveness of Peers. pub struct LivenessService { config: LivenessConfig, @@ -163,7 +170,8 @@ where TRequestStream: Stream { @@ -188,19 +196,29 @@ where TRequestStream: Stream Result<(), LivenessError> { + async fn send_ping(&mut self, peer_id: PeerId) -> Result { let msg = PingPongMessage::ping_with_metadata(self.state.metadata().clone()); - self.state.add_inflight_ping(msg.nonce, peer_id); + let nonce = msg.nonce; + self.state.add_inflight_ping( + msg.nonce, + peer_id, + self.config.auto_ping_interval.unwrap_or(MAX_INFLIGHT_TTL), + ); debug!(target: LOG_TARGET, "Sending ping to peer '{}'", peer_id); self.outbound_messaging.send_message(peer_id, msg).await?; - Ok(()) + Ok(nonce) } async fn send_pong(&mut self, nonce: u64, dest: PeerId) -> Result<(), LivenessError> { @@ -214,9 +232,17 @@ where TRequestStream: Stream { - self.send_ping(peer_id).await?; + let nonce = self.send_ping(peer_id).await?; self.state.inc_pings_sent(); - Ok(LivenessResponse::Ok) + Ok(LivenessResponse::Ok(Some(vec![nonce]))) + }, + SendPings(peer_ids) => { + let mut nonces = Vec::with_capacity(peer_ids.len()); + for peer_id in peer_ids { + nonces.push(self.send_ping(peer_id).await?); + self.state.inc_pings_sent(); + } + Ok(LivenessResponse::Ok(Some(nonces))) }, GetPingCount => { let ping_count = self.get_ping_count(); @@ -236,15 +262,15 @@ where TRequestStream: Stream { self.state.set_metadata_entry(key, value); - Ok(LivenessResponse::Ok) + Ok(LivenessResponse::Ok(None)) }, AddMonitoredPeer(peer_id) => { self.monitored_peers.insert(peer_id); - Ok(LivenessResponse::Ok) + Ok(LivenessResponse::Ok(None)) }, RemoveMonitoredPeer(peer_id) => { self.monitored_peers.remove(&peer_id); - Ok(LivenessResponse::Ok) + Ok(LivenessResponse::Ok(None)) }, } } @@ -270,7 +296,11 @@ where TRequestStream: Stream, +} + +impl MonitorPeersInitializer { + /// Create a new MonitorPeersInitializer from the inbound message subscriber + pub fn new(auto_ping_interval: Duration) -> Self { + Self { + auto_ping_interval: Some(auto_ping_interval), + } + } +} + +impl Default for MonitorPeersInitializer { + fn default() -> Self { + Self { + auto_ping_interval: Some(MAX_INFLIGHT_TTL), + } + } +} + +#[async_trait] +impl ServiceInitializer for MonitorPeersInitializer { + async fn initialize(&mut self, context: ServiceInitializerContext) -> Result<(), ServiceInitializationError> { + debug!(target: LOG_TARGET, "Initializing Peer Monitoring Service"); + + let auto_ping_interval = max( + self.auto_ping_interval + .take() + .expect("Monitor peers service initialized more than once."), + MAX_INFLIGHT_TTL, + ); + + // Spawn the MonitorPeers service on the executor + context.spawn_when_ready(move |handles| async move { + let liveness = handles.expect_handle::(); + let network = handles.expect_handle::(); + + let service = + MonitorPeersService::new(network, liveness, handles.get_shutdown_signal(), auto_ping_interval); + service.run().await; + debug!(target: LOG_TARGET, "Monitor peers service has shut down"); + }); + + debug!(target: LOG_TARGET, "Monitor peers service initialized"); + Ok(()) + } +} diff --git a/base_layer/p2p/src/services/monitor_peers/service.rs b/base_layer/p2p/src/services/monitor_peers/service.rs new file mode 100644 index 0000000000..67c4a37d28 --- /dev/null +++ b/base_layer/p2p/src/services/monitor_peers/service.rs @@ -0,0 +1,315 @@ +// Copyright 2022, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::collections::{HashMap, VecDeque}; + +use futures::pin_mut; +use log::*; +use tari_network::{identity::PeerId, Connection, ConnectionDirection, NetworkHandle, NetworkingService}; +use tari_shutdown::ShutdownSignal; +use tokio::{ + sync::broadcast::error::RecvError, + time::{self, Duration}, +}; + +use crate::services::{ + liveness::{LivenessEvent, LivenessHandle}, + monitor_peers::LOG_TARGET, +}; + +struct PeerLiveness { + vec: VecDeque, +} + +impl PeerLiveness { + pub fn new() -> Self { + Self { + vec: VecDeque::with_capacity(MAX_SIZE), + } + } + + pub fn push_pop(&mut self, item: T) { + if self.vec.len() == MAX_SIZE { + self.vec.pop_front(); + } + self.vec.push_back(item); + } + + pub fn iter(&self) -> std::collections::vec_deque::Iter { + self.vec.iter() + } +} + +struct Stats { + connected: bool, + responsive: bool, + loop_count: u64, +} + +struct PeerPingPong { + expected_nonce: u64, + received_nonce: Option, + node_id: PeerId, +} + +pub struct MonitorPeersService { + network: NetworkHandle, + liveness_handle: LivenessHandle, + shutdown_signal: ShutdownSignal, + auto_ping_interval: Duration, +} + +impl MonitorPeersService { + pub fn new( + network: NetworkHandle, + liveness_handle: LivenessHandle, + shutdown_signal: ShutdownSignal, + auto_ping_interval: Duration, + ) -> Self { + Self { + network, + liveness_handle, + shutdown_signal, + auto_ping_interval, + } + } + + /// Monitor the liveness of outbound peer connections and disconnect those that do not respond to pings + /// consecutively. The intent of the interval timer is to be significantly longer than the rate at which + /// metadata is requested from peers. + #[allow(clippy::too_many_lines)] + pub async fn run(mut self) { + let mut interval_timer = time::interval(self.auto_ping_interval * 10); + let liveness_events = self.liveness_handle.get_event_stream(); + pin_mut!(liveness_events); + + let mut peer_liveness_stats: HashMap> = HashMap::new(); + + let mut loop_count = 0u64; + loop { + loop_count += 1; + tokio::select! { + biased; + _ = self.shutdown_signal.wait() => { + break; + } + + _ = interval_timer.tick() => { + trace!(target: LOG_TARGET, "Starting monitor peers round (iter {})", loop_count); + let active_connections = match self.network.get_active_connections().await { + Ok(val) => val, + Err(e) => { + warn!(target: LOG_TARGET, "Failed to get active connections ({})", e); + continue; + }, + }; + let mut active_peer_connections = active_connections + .iter() + .filter(|p|p.is_node() && p.direction() == ConnectionDirection::Outbound) + .cloned() + .collect::>(); + if active_peer_connections.is_empty() { + trace!(target: LOG_TARGET, "No active connections found"); + continue; + } + let active_peer_node_ids = active_peer_connections + .iter() + .map(|p|*p.peer_id()) + .collect::>(); + + let known_peer_connections = peer_liveness_stats.keys().copied().collect::>(); + for peer_id in &known_peer_connections { + if !active_peer_node_ids.contains(peer_id) { + // Prior connections not connected now are considered inactive and unresponsive + peer_liveness_stats + .entry(*peer_id) + .and_modify(|item| item.push_pop( + Stats {connected: false, responsive: false, loop_count} + )); + } + } + for peer_id in &active_peer_node_ids { + if !known_peer_connections.contains(peer_id) { + // New connections are considered active and responsive + peer_liveness_stats.insert( *peer_id, PeerLiveness::new()); + } + } + + let mut peer_ping_pongs = match self.liveness_handle + .send_pings(active_peer_node_ids.clone()) + .await + { + Ok(nonces) => active_peer_node_ids + .iter() + .zip(nonces.iter()) + .map(|(node_id, &nonce)| PeerPingPong { + expected_nonce: nonce, + received_nonce: None, + node_id: *node_id, + }) + .collect::>(), + Err(e) => { + warn!(target: LOG_TARGET, "Failed to send pings to peers ({})", e); + continue; + }, + }; + + // Only listen for the expected pongs from the peers (ignore any other pongs) + let timeout_timer = time::sleep(self.auto_ping_interval); + tokio::pin!(timeout_timer); + loop { + tokio::select! { + biased; + _ = self.shutdown_signal.wait() => { + break; + } + + event = liveness_events.recv() => { + let event_str = format!("{:?}", event); + match event { + Ok(arc_event) => { + if let LivenessEvent::ReceivedPong(pong) = &*arc_event { + if let Some(ping_pong) = peer_ping_pongs.iter_mut().find(|p| p.expected_nonce == pong.nonce) { + ping_pong.received_nonce = Some(pong.nonce); + } + if peer_ping_pongs.iter().all(|p| p.received_nonce.is_some()) { + break; + } + } + }, + Err(RecvError::Closed) => { + return; + }, + Err(ref e) => { + debug!( + target: LOG_TARGET, + "Liveness event error: {:?} ({})", + event_str, e.to_string() + ); + }, + } + }, + + _ = &mut timeout_timer => { + trace!( + target: LOG_TARGET, + "Timed out waiting for pongs, received {} of {} (iter {})", + peer_ping_pongs.iter().filter(|p| p.received_nonce.is_some()).count(), + peer_ping_pongs.len(), + loop_count + ); + break; + }, + } + } + + // Compare nonces and close connections for peers that did not respond multiple times + self.update_stats_and_cull_unresponsive_connections( + &peer_ping_pongs, + &mut active_peer_connections, + &mut peer_liveness_stats, + loop_count + ).await; + }, + } + } + } + + async fn update_stats_and_cull_unresponsive_connections( + &mut self, + peer_ping_pongs: &[PeerPingPong], + active_peer_connections: &mut [Connection], + peer_liveness_stats: &mut HashMap>, + loop_count: u64, + ) { + let received_nonces_count = peer_ping_pongs.iter().filter(|p| p.received_nonce.is_some()).count(); + if received_nonces_count != peer_ping_pongs.len() { + trace!( + target: LOG_TARGET, + "Found {} of {} outbound base node peer connections that did not respond to pings", + peer_ping_pongs.len().saturating_sub(received_nonces_count), active_peer_connections.len() + ); + } + + let mut disconnect_peers = Vec::new(); + for &mut ref peer in active_peer_connections.iter_mut() { + if let Some(ping_pong) = peer_ping_pongs.iter().find(|p| &p.node_id == peer.peer_id()) { + if ping_pong.received_nonce.is_some() { + peer_liveness_stats.entry(*peer.peer_id()).and_modify(|item| { + item.push_pop(Stats { + connected: true, + responsive: true, + loop_count, + }) + }); + } else { + peer_liveness_stats.entry(*peer.peer_id()).and_modify(|item| { + item.push_pop(Stats { + connected: true, + responsive: false, + loop_count, + }) + }); + if let Some(stats) = peer_liveness_stats.get(peer.peer_id()) { + // Evaluate the last 3 entries in the stats + if stats + .iter() + .rev() + .take(3) + .filter(|s| s.connected && !s.responsive) + .count() >= + 3 + { + disconnect_peers.push(peer.clone()); + } else { + trace!( + target: LOG_TARGET, + "Peer {} stats - (iter, conn, resp) {:?}", + peer.peer_id(), + stats.iter().map(|s|(s.loop_count, s.connected, s.responsive)).collect::>(), + ); + } + } + } + } + } + + for peer in disconnect_peers { + if let Some(stats) = peer_liveness_stats.get(peer.peer_id()) { + debug!( + target: LOG_TARGET, + "Disconnecting {} as the peer is no longer responsive - (iter, conn, resp) {:?}", + peer.peer_id(), + stats.iter().map(|s|(s.loop_count, s.connected, s.responsive)).collect::>(), + ); + if let Err(e) = self.network.disconnect_peer(peer.peer_id).await { + warn!( + target: LOG_TARGET, + "Error while attempting to disconnect peer {}: {}", peer.peer_id(), e + ); + } + peer_liveness_stats.remove(peer.peer_id()); + trace!(target: LOG_TARGET, "Disconnected {} (iter, {})", peer.peer_id(), loop_count); + } + } + } +} diff --git a/network/core/src/connection.rs b/network/core/src/connection.rs index a142f8fdc8..4ef308b0aa 100644 --- a/network/core/src/connection.rs +++ b/network/core/src/connection.rs @@ -53,4 +53,10 @@ impl Connection { pub fn is_wallet_user_agent(&self) -> bool { self.user_agent.as_ref().map_or(false, |x| x.contains("wallet")) } + + pub fn is_node(&self) -> bool { + self.supported_protocols + .iter() + .any(|p| format!("{}", p).contains("/tari/mempool")) + } } diff --git a/network/core/src/event.rs b/network/core/src/event.rs index 545138ce42..3b43cb6924 100644 --- a/network/core/src/event.rs +++ b/network/core/src/event.rs @@ -25,7 +25,7 @@ pub enum NetworkEvent { }, } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Eq, PartialEq)] pub enum ConnectionDirection { Inbound, Outbound,