From 3e515b90b7e66e6daa5a6425a884425b995b97aa Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Thu, 29 Dec 2022 21:18:40 +0200 Subject: [PATCH] Add tests for `behaviour.rs` --- .../src/protocol/notifications/behaviour.rs | 1977 ++++++++++++++++- .../src/protocol/notifications/handler.rs | 86 +- .../protocol/notifications/upgrade/collec.rs | 26 +- 3 files changed, 2073 insertions(+), 16 deletions(-) diff --git a/client/network/src/protocol/notifications/behaviour.rs b/client/network/src/protocol/notifications/behaviour.rs index 04f6fe445ac63..9843911d55b22 100644 --- a/client/network/src/protocol/notifications/behaviour.rs +++ b/client/network/src/protocol/notifications/behaviour.rs @@ -46,6 +46,9 @@ use std::{ time::{Duration, Instant}, }; +/// Default ban duration. +const BAN_DURATION: Duration = Duration::from_secs(5); + /// Network behaviour that handles opening substreams for custom protocols with other peers. /// /// # How it works @@ -1416,8 +1419,8 @@ impl NetworkBehaviour for Notifications { let ban_duration = match st { PeerState::PendingRequest { timer_deadline, .. } if timer_deadline > now => - cmp::max(timer_deadline - now, Duration::from_secs(5)), - _ => Duration::from_secs(5), + cmp::max(timer_deadline - now, BAN_DURATION), + _ => BAN_DURATION, }; let delay_id = self.next_delay_id; @@ -2070,3 +2073,1973 @@ impl NetworkBehaviour for Notifications { Poll::Pending } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::protocol::notifications::handler::tests::*; + use libp2p::swarm::AddressRecord; + use std::{collections::HashSet, iter}; + + impl PartialEq for ConnectionState { + fn eq(&self, other: &ConnectionState) -> bool { + match (self, other) { + (ConnectionState::Closed, ConnectionState::Closed) => true, + (ConnectionState::Closing, ConnectionState::Closing) => true, + (ConnectionState::Opening, ConnectionState::Opening) => true, + (ConnectionState::OpeningThenClosing, ConnectionState::OpeningThenClosing) => true, + (ConnectionState::OpenDesiredByRemote, ConnectionState::OpenDesiredByRemote) => + true, + (ConnectionState::Open(_), ConnectionState::Open(_)) => true, + _ => false, + } + } + } + + #[derive(Clone)] + struct MockPollParams { + peer_id: PeerId, + addr: Multiaddr, + } + + impl PollParameters for MockPollParams { + type SupportedProtocolsIter = std::vec::IntoIter>; + type ListenedAddressesIter = std::vec::IntoIter; + type ExternalAddressesIter = std::vec::IntoIter; + + fn supported_protocols(&self) -> Self::SupportedProtocolsIter { + vec![].into_iter() + } + + fn listened_addresses(&self) -> Self::ListenedAddressesIter { + vec![self.addr.clone()].into_iter() + } + + fn external_addresses(&self) -> Self::ExternalAddressesIter { + vec![].into_iter() + } + + fn local_peer_id(&self) -> &PeerId { + &self.peer_id + } + } + + fn development_notifs() -> (Notifications, sc_peerset::PeersetHandle) { + let (peerset, peerset_handle) = { + let mut sets = Vec::with_capacity(1); + + sets.push(sc_peerset::SetConfig { + in_peers: 25, + out_peers: 25, + bootnodes: Vec::new(), + reserved_nodes: HashSet::new(), + reserved_only: false, + }); + + sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig { sets }) + }; + + ( + Notifications::new( + peerset, + iter::once(ProtocolConfig { + name: "/foo".into(), + fallback_names: Vec::new(), + handshake: vec![1, 2, 3, 4], + max_notification_size: u64::MAX, + }), + ), + peerset_handle, + ) + } + + #[test] + fn update_handshake() { + let (mut notif, _peerset) = development_notifs(); + + let inner = notif.notif_protocols.get_mut(0).unwrap().handshake.read().clone(); + assert_eq!(inner, vec![1, 2, 3, 4]); + + notif.set_notif_protocol_handshake(0.into(), vec![5, 6, 7, 8]); + + let inner = notif.notif_protocols.get_mut(0).unwrap().handshake.read().clone(); + assert_eq!(inner, vec![5, 6, 7, 8]); + } + + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn update_unknown_handshake() { + let (mut notif, _peerset) = development_notifs(); + + notif.set_notif_protocol_handshake(1337.into(), vec![5, 6, 7, 8]); + } + + #[test] + fn disconnect_disconnected_peer() { + let (mut notif, _peerset) = development_notifs(); + + let peer = PeerId::random(); + notif.peers.insert( + (peer, 0.into()), + PeerState::Backoff { timer: DelayId(0), timer_deadline: Instant::now() }, + ); + notif.disconnect_peer(&peer, 0.into()); + + assert!(std::matches!( + notif.peers.get(&(peer, 0.into())), + Some(PeerState::Backoff { timer: DelayId(0), .. }) + )); + + let peer = PeerId::random(); + notif.peers.insert( + (peer, 0.into()), + PeerState::PendingRequest { timer: DelayId(0), timer_deadline: Instant::now() }, + ); + notif.disconnect_peer(&peer, 0.into()); + + assert!(std::matches!( + notif.peers.get(&(peer, 0.into())), + Some(PeerState::PendingRequest { timer: DelayId(0), .. }) + )); + + let peer = PeerId::random(); + notif.peers.insert((peer, 0.into()), PeerState::Requested); + notif.disconnect_peer(&peer, 0.into()); + + assert!(std::matches!(notif.peers.get(&(peer, 0.into())), Some(PeerState::Requested))); + + let peer = PeerId::random(); + notif.peers.insert( + (peer, 0.into()), + PeerState::Disabled { backoff_until: None, connections: SmallVec::new() }, + ); + notif.disconnect_peer(&peer, 0.into()); + + assert!(std::matches!( + notif.peers.get(&(peer, 0.into())), + Some(PeerState::Disabled { backoff_until: None, .. }) + )); + } + + #[test] + fn remote_opens_connection_and_substream() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + + if let Some(&PeerState::Disabled { ref connections, backoff_until: None }) = + notif.peers.get(&(peer, 0.into())) + { + assert_eq!(connections[0], (conn, ConnectionState::Closed)); + } else { + panic!("invalid state"); + } + + // remote opens a substream, verify that peer state is updated to `Incoming` + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + + if let Some(&PeerState::Incoming { ref connections, backoff_until: None }) = + notif.peers.get(&(peer, 0.into())) + { + assert_eq!(connections.len(), 1); + assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote)); + } else { + panic!("invalid state"); + } + + assert!(std::matches!( + notif.incoming.pop(), + Some(IncomingPeer { alive: true, incoming_id: sc_peerset::IncomingIndex(0), .. }), + )); + } + + #[tokio::test] + async fn remote_substream_open_rejected() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + notif.disconnect_peer(&peer, 0.into()); + + if let Some(&PeerState::Disabled { ref connections, backoff_until: None }) = + notif.peers.get(&(peer, 0.into())) + { + assert_eq!(connections.len(), 1); + assert_eq!(connections[0], (conn, ConnectionState::Closing)); + } else { + panic!("invalid state"); + } + } + + #[test] + fn peerset_report_connect_backoff() { + let (mut notif, _peerset) = development_notifs(); + let set_id = sc_peerset::SetId::from(0); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // manually add backoff for the entry + if let Some(PeerState::Disabled { ref mut backoff_until, .. }) = + notif.peers.get_mut(&(peer, set_id)) + { + *backoff_until = + Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap()); + } + + notif.inject_connection_closed( + &peer, + &conn, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + + let timer = if let Some(&PeerState::Backoff { timer_deadline, .. }) = + notif.peers.get(&(peer, set_id)) + { + timer_deadline.clone() + } else { + panic!("invalid state"); + }; + + // attempt to connect the backed-off peer and verify that the request is pending + notif.peerset_report_connect(peer, set_id); + + if let Some(&PeerState::PendingRequest { timer_deadline, .. }) = + notif.peers.get(&(peer, set_id)) + { + assert_eq!(timer, timer_deadline); + } else { + panic!("invalid state"); + } + } + + #[test] + fn peerset_connect_incoming() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // remote opens a substream, verify that peer state is updated to `Incoming` + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); + + // attempt to connect to the peer and verify that the peer state is `Enabled` + notif.peerset_report_connect(peer, set_id); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. }))); + } + + #[test] + fn peerset_disconnect_disable_pending_enable() { + let (mut notif, _peerset) = development_notifs(); + let set_id = sc_peerset::SetId::from(0); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // manually add backoff for the entry + if let Some(PeerState::Disabled { ref mut backoff_until, .. }) = + notif.peers.get_mut(&(peer, set_id)) + { + *backoff_until = + Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap()); + } + + // switch state to `DisabledPendingEnable` + notif.peerset_report_connect(peer, set_id); + assert!(std::matches!( + notif.peers.get(&(peer, set_id)), + Some(&PeerState::DisabledPendingEnable { .. }) + )); + + notif.peerset_report_disconnect(peer, set_id); + + if let Some(PeerState::Disabled { backoff_until, .. }) = notif.peers.get(&(peer, set_id)) { + assert!(backoff_until.is_some()); + assert!(backoff_until.unwrap() > Instant::now()); + } else { + panic!("invalid state"); + } + } + + #[test] + fn peerset_disconnect_enabled() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + // Set peer into `Enabled` state. + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + notif.peerset_report_connect(peer, set_id); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. }))); + + // disconnect peer and verify that the state is `Disabled` + notif.peerset_report_disconnect(peer, set_id); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + } + + #[test] + fn peerset_disconnect_requested() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let set_id = sc_peerset::SetId::from(0); + + // Set peer into `Requested` state. + notif.peerset_report_connect(peer, set_id); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested))); + + // disconnect peer and verify that the state is `Disabled` + notif.peerset_report_disconnect(peer, set_id); + assert!(notif.peers.get(&(peer, set_id)).is_none()); + } + + #[test] + fn peerset_disconnect_pending_request() { + let (mut notif, _peerset) = development_notifs(); + let set_id = sc_peerset::SetId::from(0); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // manually add backoff for the entry + if let Some(PeerState::Disabled { ref mut backoff_until, .. }) = + notif.peers.get_mut(&(peer, set_id)) + { + *backoff_until = + Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap()); + } + + notif.inject_connection_closed( + &peer, + &conn, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. }))); + + // attempt to connect the backed-off peer and verify that the request is pending + notif.peerset_report_connect(peer, set_id); + assert!(std::matches!( + notif.peers.get(&(peer, set_id)), + Some(&PeerState::PendingRequest { .. }) + )); + + // attempt to connect the backed-off peer and verify that the request is pending + notif.peerset_report_disconnect(peer, set_id); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. }))); + } + + #[test] + fn peerset_accept_peer_not_alive() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // remote opens a substream, verify that peer state is updated to `Incoming` + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); + + assert!(std::matches!( + notif.incoming[0], + IncomingPeer { alive: true, incoming_id: sc_peerset::IncomingIndex(0), .. }, + )); + + notif.disconnect_peer(&peer, set_id); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + assert!(std::matches!( + notif.incoming[0], + IncomingPeer { alive: false, incoming_id: sc_peerset::IncomingIndex(0), .. }, + )); + + notif.peerset_report_accept(sc_peerset::IncomingIndex(0)); + } + + #[test] + fn secondary_connection_peer_state_incoming() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let conn2 = ConnectionId::new(1usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + if let Some(&PeerState::Incoming { ref connections, .. }) = notif.peers.get(&(peer, set_id)) + { + assert_eq!(connections.len(), 1); + assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote)); + } else { + panic!("invalid state"); + } + + // add another connection + notif.inject_connection_established(&peer, &conn2, &connected, None, 0usize); + + if let Some(&PeerState::Incoming { ref connections, .. }) = notif.peers.get(&(peer, set_id)) + { + assert_eq!(connections.len(), 2); + assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote)); + assert_eq!(connections[1], (conn2, ConnectionState::Closed)); + } else { + panic!("invalid state"); + } + } + + #[test] + fn close_connection_for_disabled_peer() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + notif.inject_connection_closed( + &peer, + &conn, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + assert!(notif.peers.get(&(peer, set_id)).is_none()); + } + + #[test] + fn close_connection_for_incoming_peer_one_connection() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); + + notif.inject_connection_closed( + &peer, + &conn, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + assert!(notif.peers.get(&(peer, set_id)).is_none()); + assert!(std::matches!( + notif.incoming[0], + IncomingPeer { alive: false, incoming_id: sc_peerset::IncomingIndex(0), .. }, + )); + } + + #[test] + fn close_connection_for_incoming_peer_two_connection() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let conn1 = ConnectionId::new(1usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + let mut conns = SmallVec::< + [(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER], + >::from(vec![(conn, ConnectionState::Closed)]); + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); + + notif.inject_connection_established(&peer, &conn1, &connected, None, 0usize); + conns.push((conn1, ConnectionState::Closed)); + + if let Some(PeerState::Incoming { ref connections, .. }) = notif.peers.get(&(peer, set_id)) + { + assert_eq!(connections.len(), 2); + assert_eq!(connections[0], (conn, ConnectionState::OpenDesiredByRemote)); + assert_eq!(connections[1], (conn1, ConnectionState::Closed)); + } + + notif.inject_connection_closed( + &peer, + &conn, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + } + + #[test] + fn connection_and_substream_open() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + let mut conn_yielder = ConnectionYielder::new(); + + // move the peer to `Enabled` state + notif.inject_connection_established( + &peer, + &conn, + &ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }, + None, + 0usize, + ); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); + + notif.peerset_report_connect(peer, set_id); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. }))); + + // open new substream + let event = conn_yielder.open_substream(peer, 0, connected, vec![1, 2, 3, 4]); + + notif.inject_event(peer, conn, event); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. }))); + + if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) { + assert_eq!(connections.len(), 1); + assert_eq!(connections[0].0, conn); + assert!(std::matches!(connections[0].1, ConnectionState::Open(_))); + } + + assert!(std::matches!( + notif.events[notif.events.len() - 1], + NetworkBehaviourAction::GenerateEvent(NotificationsOut::CustomProtocolOpen { .. }) + )); + } + + #[test] + fn connection_closed_sink_replaced() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn1 = ConnectionId::new(0usize); + let conn2 = ConnectionId::new(1usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + let mut conn_yielder = ConnectionYielder::new(); + + // open two connections + for conn_id in vec![conn1, conn2] { + notif.inject_connection_established( + &peer, + &conn_id, + &ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }, + None, + 0usize, + ); + } + + if let Some(&PeerState::Disabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) + { + assert_eq!(connections[0], (conn1, ConnectionState::Closed)); + assert_eq!(connections[1], (conn2, ConnectionState::Closed)); + } else { + panic!("invalid state"); + } + + // open substreams on both active connections + notif.peerset_report_connect(peer, set_id); + notif.inject_event( + peer, + conn2, + NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }, + ); + + if let Some(&PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) + { + assert_eq!(connections[0], (conn1, ConnectionState::Opening)); + assert_eq!(connections[1], (conn2, ConnectionState::Opening)); + } else { + panic!("invalid state"); + } + + // add two new substreams, one for each connection and verify that both are in open state + for conn in vec![conn1, conn2].iter() { + notif.inject_event( + peer, + *conn, + conn_yielder.open_substream(peer, 0, connected.clone(), vec![1, 2, 3, 4]), + ); + } + + if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) { + assert_eq!(connections[0].0, conn1); + assert!(std::matches!(connections[0].1, ConnectionState::Open(_))); + assert_eq!(connections[1].0, conn2); + assert!(std::matches!(connections[1].1, ConnectionState::Open(_))); + } else { + panic!("invalid state"); + } + + // check peer information + assert_eq!(notif.open_peers().collect::>(), vec![&peer],); + assert_eq!(notif.reserved_peers(set_id).collect::>(), Vec::<&PeerId>::new(),); + assert_eq!(notif.num_discovered_peers(), 0usize); + + // close the other connection and verify that notification replacement event is emitted + notif.inject_connection_closed( + &peer, + &conn1, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + + if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) { + assert_eq!(connections.len(), 1); + assert_eq!(connections[0].0, conn2); + assert!(std::matches!(connections[0].1, ConnectionState::Open(_))); + } else { + panic!("invalid state"); + } + + assert!(std::matches!( + notif.events[notif.events.len() - 1], + NetworkBehaviourAction::GenerateEvent(NotificationsOut::CustomProtocolReplaced { .. }) + )); + } + + #[test] + fn dial_failure_for_requested_peer() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let set_id = sc_peerset::SetId::from(0); + + // Set peer into `Requested` state. + notif.peerset_report_connect(peer, set_id); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested))); + + notif.inject_dial_failure( + Some(peer), + NotifsHandlerProto::new(vec![]), + &libp2p::swarm::DialError::Banned, + ); + + if let Some(PeerState::Backoff { timer_deadline, .. }) = notif.peers.get(&(peer, set_id)) { + assert!(timer_deadline > &Instant::now()); + } else { + panic!("invalid state"); + } + } + + #[tokio::test] + async fn write_notification() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + let mut conn_yielder = ConnectionYielder::new(); + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + notif.peerset_report_connect(peer, set_id); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. }))); + + notif.inject_event( + peer, + conn, + conn_yielder.open_substream(peer, 0, connected, vec![1, 2, 3, 4]), + ); + + if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) { + assert_eq!(connections[0].0, conn); + assert!(std::matches!(connections[0].1, ConnectionState::Open(_))); + } else { + panic!("invalid state"); + } + + notif.write_notification(&peer, set_id, vec![1, 3, 3, 7]); + assert_eq!(conn_yielder.get_next_event(peer, set_id.into()).await, Some(vec![1, 3, 3, 7])); + } + + #[test] + fn peerset_report_connect_backoff_expired() { + let (mut notif, _peerset) = development_notifs(); + let set_id = sc_peerset::SetId::from(0); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + let backoff_duration = Duration::from_secs(2); + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // manually add backoff for the entry + if let Some(PeerState::Disabled { ref mut backoff_until, .. }) = + notif.peers.get_mut(&(peer, set_id)) + { + *backoff_until = Some(Instant::now().checked_add(backoff_duration).unwrap()); + } + + notif.inject_connection_closed( + &peer, + &conn, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + + // wait until the backoff time has passed + std::thread::sleep(backoff_duration * 2); + + // attempt to connect the backed-off peer and verify that the request is pending + notif.peerset_report_connect(peer, set_id); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested { .. }))) + } + + #[test] + fn peerset_report_disconnect_disabled() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let set_id = sc_peerset::SetId::from(0); + let conn = ConnectionId::new(0usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + notif.peerset_report_disconnect(peer, set_id); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + } + + #[test] + fn peerset_report_disconnect_backoff() { + let (mut notif, _peerset) = development_notifs(); + let set_id = sc_peerset::SetId::from(0); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + let backoff_duration = Duration::from_secs(2); + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // manually add backoff for the entry + if let Some(PeerState::Disabled { ref mut backoff_until, .. }) = + notif.peers.get_mut(&(peer, set_id)) + { + *backoff_until = Some(Instant::now().checked_add(backoff_duration).unwrap()); + } + + notif.inject_connection_closed( + &peer, + &conn, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. }))); + + notif.peerset_report_disconnect(peer, set_id); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. }))); + } + + #[test] + fn inject_connection_closed_disable_pending_enable_two_connections() { + let (mut notif, _peerset) = development_notifs(); + let set_id = sc_peerset::SetId::from(0); + let peer = PeerId::random(); + let conn1 = ConnectionId::new(0usize); + let conn2 = ConnectionId::new(0usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn1, &connected, None, 0usize); + notif.inject_connection_established(&peer, &conn2, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // manually add backoff for the entry + if let Some(PeerState::Disabled { ref mut backoff_until, .. }) = + notif.peers.get_mut(&(peer, set_id)) + { + *backoff_until = + Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap()); + } + + // switch state to `DisabledPendingEnable` + notif.peerset_report_connect(peer, set_id); + assert!(std::matches!( + notif.peers.get(&(peer, set_id)), + Some(&PeerState::DisabledPendingEnable { .. }) + )); + + notif.inject_connection_closed( + &peer, + &conn1, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + assert!(std::matches!( + notif.peers.get(&(peer, set_id)), + Some(&PeerState::DisabledPendingEnable { .. }) + )); + + notif.inject_connection_closed( + &peer, + &conn1, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. }))); + } + + #[test] + fn inject_connection_closed_incoming_with_backoff() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let set_id = sc_peerset::SetId::from(0); + let conn = ConnectionId::new(0usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // remote opens a substream, verify that peer state is updated to `Incoming` + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + + if let Some(&mut PeerState::Incoming { ref mut backoff_until, .. }) = + notif.peers.get_mut(&(peer, 0.into())) + { + *backoff_until = + Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap()); + } else { + panic!("invalid state"); + } + + notif.inject_connection_closed( + &peer, + &conn, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. }))); + } + + #[test] + fn inject_connection_closed_incoming_with_two_connections_inactive_connection_closed() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn1 = ConnectionId::new(0usize); + let conn2 = ConnectionId::new(1usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + // open two connections + for conn_id in vec![conn1, conn2] { + notif.inject_connection_established( + &peer, + &conn_id, + &ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }, + None, + 0usize, + ); + } + + if let Some(&PeerState::Disabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) + { + assert_eq!(connections[0], (conn1, ConnectionState::Closed)); + assert_eq!(connections[1], (conn2, ConnectionState::Closed)); + } else { + panic!("invalid state"); + } + + // remote opens a substream, verify that peer state is updated to `Incoming` + notif.inject_event( + peer, + conn1, + NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }, + ); + + if let Some(&mut PeerState::Incoming { ref mut backoff_until, .. }) = + notif.peers.get_mut(&(peer, 0.into())) + { + *backoff_until = + Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap()); + } else { + panic!("invalid state"); + } + + notif.inject_connection_closed( + &peer, + &conn2, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); + } + + #[test] + fn inject_connection_closed_incoming_with_two_connections_active_connection_closed() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn1 = ConnectionId::new(0usize); + let conn2 = ConnectionId::new(1usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + // open two connections + for conn_id in vec![conn1, conn2] { + notif.inject_connection_established( + &peer, + &conn_id, + &ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }, + None, + 0usize, + ); + } + + if let Some(&PeerState::Disabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) + { + assert_eq!(connections[0], (conn1, ConnectionState::Closed)); + assert_eq!(connections[1], (conn2, ConnectionState::Closed)); + } else { + panic!("invalid state"); + } + + // remote opens a substream, verify that peer state is updated to `Incoming` + notif.inject_event( + peer, + conn1, + NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }, + ); + + if let Some(&mut PeerState::Incoming { ref mut backoff_until, .. }) = + notif.peers.get_mut(&(peer, 0.into())) + { + *backoff_until = + Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap()); + } else { + panic!("invalid state"); + } + + notif.inject_connection_closed( + &peer, + &conn1, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + } + + #[test] + fn inject_connection_closed_for_active_connection() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn1 = ConnectionId::new(0usize); + let conn2 = ConnectionId::new(1usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + let mut conn_yielder = ConnectionYielder::new(); + + // open two connections + for conn_id in vec![conn1, conn2] { + notif.inject_connection_established( + &peer, + &conn_id, + &ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }, + None, + 0usize, + ); + } + + if let Some(&PeerState::Disabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) + { + assert_eq!(connections[0], (conn1, ConnectionState::Closed)); + assert_eq!(connections[1], (conn2, ConnectionState::Closed)); + } else { + panic!("invalid state"); + } + + // open substreams on both active connections + notif.peerset_report_connect(peer, set_id); + + if let Some(&PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) + { + assert_eq!(connections[0], (conn1, ConnectionState::Opening)); + assert_eq!(connections[1], (conn2, ConnectionState::Closed)); + } else { + panic!("invalid state"); + } + + notif.inject_event( + peer, + conn1, + conn_yielder.open_substream(peer, 0, connected.clone(), vec![1, 2, 3, 4]), + ); + + if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) { + assert!(std::matches!(connections[0].1, ConnectionState::Open(_))); + assert_eq!(connections[0].0, conn1); + assert_eq!(connections[1], (conn2, ConnectionState::Closed)); + } else { + panic!("invalid state"); + } + + // close the other connection and verify that notification replacement event is emitted + notif.inject_connection_closed( + &peer, + &conn1, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + } + + #[test] + fn inject_dial_failure_for_pending_request() { + let (mut notif, _peerset) = development_notifs(); + let set_id = sc_peerset::SetId::from(0); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // manually add backoff for the entry + if let Some(PeerState::Disabled { ref mut backoff_until, .. }) = + notif.peers.get_mut(&(peer, set_id)) + { + *backoff_until = + Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap()); + } + + notif.inject_connection_closed( + &peer, + &conn, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. }))); + + // attempt to connect the backed-off peer and verify that the request is pending + notif.peerset_report_connect(peer, set_id); + assert!(std::matches!( + notif.peers.get(&(peer, set_id)), + Some(&PeerState::PendingRequest { .. }) + )); + + notif.inject_dial_failure( + Some(peer), + NotifsHandlerProto::new(vec![]), + &libp2p::swarm::DialError::Banned, + ); + + if let Some(PeerState::PendingRequest { ref timer_deadline, .. }) = + notif.peers.get(&(peer, set_id)) + { + assert!(timer_deadline > &(Instant::now() + BAN_DURATION)); + } + } + + #[test] + fn peerstate_incoming_open_desired_by_remote() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let set_id = sc_peerset::SetId::from(0); + let conn1 = ConnectionId::new(0usize); + let conn2 = ConnectionId::new(1usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn1, &connected, None, 0usize); + notif.inject_connection_established(&peer, &conn2, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // remote opens a substream, verify that peer state is updated to `Incoming` + notif.inject_event( + peer, + conn1, + NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }, + ); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); + + // add another open event from remote + notif.inject_event( + peer, + conn2, + NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }, + ); + + if let Some(PeerState::Incoming { ref connections, .. }) = notif.peers.get(&(peer, set_id)) + { + assert_eq!(connections[0], (conn1, ConnectionState::OpenDesiredByRemote)); + assert_eq!(connections[1], (conn2, ConnectionState::OpenDesiredByRemote)); + } + } + + #[tokio::test] + async fn remove_backoff_peer_after_timeout() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let set_id = sc_peerset::SetId::from(0); + let conn = ConnectionId::new(0usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + // assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. + // }))); + + if let Some(&mut PeerState::Disabled { ref mut backoff_until, .. }) = + notif.peers.get_mut(&(peer, 0.into())) + { + *backoff_until = + Some(Instant::now().checked_add(std::time::Duration::from_secs(2)).unwrap()); + } else { + panic!("invalid state"); + } + + notif.inject_connection_closed( + &peer, + &conn, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + + let duration = if let Some(&PeerState::Backoff { timer_deadline, .. }) = + notif.peers.get(&(peer, set_id)) + { + timer_deadline + } else { + panic!("invalid state"); + }; + + if duration > Instant::now() { + std::thread::sleep(duration - Instant::now()); + } + + assert!(notif.peers.get(&(peer, set_id)).is_some()); + + if tokio::time::timeout(Duration::from_secs(5), async { + let mut params = MockPollParams { peer_id: PeerId::random(), addr: Multiaddr::empty() }; + + loop { + futures::future::poll_fn(|cx| { + let _ = notif.poll(cx, &mut params); + Poll::Ready(()) + }) + .await; + + if notif.peers.get(&(peer, set_id)).is_none() { + break + } + } + }) + .await + .is_err() + { + panic!("backoff peer was not removed in time"); + } + + assert!(notif.peers.get(&(peer, set_id)).is_none()); + } + + #[tokio::test] + async fn reschedule_disabled_pending_enable_when_connection_not_closed() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + let mut conn_yielder = ConnectionYielder::new(); + + // move the peer to `Enabled` state + notif.inject_connection_established( + &peer, + &conn, + &ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }, + None, + 0usize, + ); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); + + notif.peerset_report_connect(peer, set_id); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. }))); + + let event = conn_yielder.open_substream(peer, 0, connected, vec![1, 2, 3, 4]); + + notif.inject_event(peer, conn, event); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. }))); + + if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) { + assert!(std::matches!(connections[0], (_, ConnectionState::Open(_)))); + assert_eq!(connections[0].0, conn); + } else { + panic!("invalid state"); + } + + notif.peerset_report_disconnect(peer, set_id); + + if let Some(PeerState::Disabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) + { + assert!(std::matches!(connections[0], (_, ConnectionState::Closing))); + assert_eq!(connections[0].0, conn); + } else { + panic!("invalid state"); + } + + notif.peerset_report_disconnect(peer, set_id); + + if let Some(PeerState::Disabled { ref connections, ref mut backoff_until }) = + notif.peers.get_mut(&(peer, set_id)) + { + assert!(std::matches!(connections[0], (_, ConnectionState::Closing))); + assert_eq!(connections[0].0, conn); + + *backoff_until = + Some(Instant::now().checked_add(std::time::Duration::from_secs(2)).unwrap()); + } else { + panic!("invalid state"); + } + + notif.peerset_report_connect(peer, set_id); + + let mut prev_instant = + if let Some(PeerState::DisabledPendingEnable { + ref connections, timer_deadline, .. + }) = notif.peers.get(&(peer, set_id)) + { + assert!(std::matches!(connections[0], (_, ConnectionState::Closing))); + assert_eq!(connections[0].0, conn); + + timer_deadline.clone() + } else { + panic!("invalid state"); + }; + + if tokio::time::timeout(Duration::from_secs(5), async { + let mut params = MockPollParams { peer_id: PeerId::random(), addr: Multiaddr::empty() }; + + loop { + futures::future::poll_fn(|cx| { + let _ = notif.poll(cx, &mut params); + Poll::Ready(()) + }) + .await; + + prev_instant = + if let Some(PeerState::DisabledPendingEnable { timer_deadline, .. }) = + notif.peers.get(&(peer, set_id)) + { + if timer_deadline != &prev_instant { + break + } + timer_deadline.clone() + } else { + panic!("invalid state"); + }; + } + }) + .await + .is_err() + { + panic!("backoff peer was not removed in time"); + } + } + + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn peerset_report_connect_with_enabled_peer() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + let mut conn_yielder = ConnectionYielder::new(); + + // move the peer to `Enabled` state + notif.inject_connection_established( + &peer, + &conn, + &ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }, + None, + 0usize, + ); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); + + notif.peerset_report_connect(peer, set_id); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. }))); + + let event = conn_yielder.open_substream(peer, 0, connected, vec![1, 2, 3, 4]); + + notif.inject_event(peer, conn, event); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. }))); + + if let Some(PeerState::Enabled { ref connections, .. }) = notif.peers.get(&(peer, set_id)) { + assert!(std::matches!(connections[0], (_, ConnectionState::Open(_)))); + assert_eq!(connections[0].0, conn); + } else { + panic!("invalid state"); + } + + notif.peerset_report_connect(peer, set_id); + } + + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn peerset_report_connect_with_disabled_pending_enable_peer() { + let (mut notif, _peerset) = development_notifs(); + let set_id = sc_peerset::SetId::from(0); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // manually add backoff for the entry + if let Some(PeerState::Disabled { ref mut backoff_until, .. }) = + notif.peers.get_mut(&(peer, set_id)) + { + *backoff_until = + Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap()); + } + + // switch state to `DisabledPendingEnable` + notif.peerset_report_connect(peer, set_id); + assert!(std::matches!( + notif.peers.get(&(peer, set_id)), + Some(&PeerState::DisabledPendingEnable { .. }) + )); + + notif.peerset_report_connect(peer, set_id); + } + + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn peerset_report_connect_with_requested_peer() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let set_id = sc_peerset::SetId::from(0); + + // Set peer into `Requested` state. + notif.peerset_report_connect(peer, set_id); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Requested))); + + notif.peerset_report_connect(peer, set_id); + } + + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn peerset_report_connect_with_pending_requested() { + let (mut notif, _peerset) = development_notifs(); + let set_id = sc_peerset::SetId::from(0); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // manually add backoff for the entry + if let Some(PeerState::Disabled { ref mut backoff_until, .. }) = + notif.peers.get_mut(&(peer, set_id)) + { + *backoff_until = + Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap()); + } + + notif.inject_connection_closed( + &peer, + &conn, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. }))); + + // attempt to connect the backed-off peer and verify that the request is pending + notif.peerset_report_connect(peer, set_id); + assert!(std::matches!( + notif.peers.get(&(peer, set_id)), + Some(&PeerState::PendingRequest { .. }) + )); + + notif.peerset_report_connect(peer, set_id); + } + + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn peerset_report_disconnect_with_incoming_peer() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let set_id = sc_peerset::SetId::from(0); + let conn = ConnectionId::new(0usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // remote opens a substream, verify that peer state is updated to `Incoming` + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); + + notif.peerset_report_disconnect(peer, set_id); + } + + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn peerset_report_accept_incoming_peer() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // remote opens a substream, verify that peer state is updated to `Incoming` + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); + + assert!(std::matches!( + notif.incoming[0], + IncomingPeer { alive: true, incoming_id: sc_peerset::IncomingIndex(0), .. }, + )); + + notif.peers.remove(&(peer, set_id)); + notif.peerset_report_accept(sc_peerset::IncomingIndex(0)); + } + + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn peerset_report_accept_not_incoming_peer() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + let mut conn_yielder = ConnectionYielder::new(); + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // remote opens a substream, verify that peer state is updated to `Incoming` + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); + + assert!(std::matches!( + notif.incoming[0], + IncomingPeer { alive: true, incoming_id: sc_peerset::IncomingIndex(0), .. }, + )); + + notif.peerset_report_connect(peer, set_id); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. }))); + + let event = conn_yielder.open_substream(peer, 0, connected, vec![1, 2, 3, 4]); + notif.inject_event(peer, conn, event); + + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. }))); + notif.incoming[0].alive = true; + notif.peerset_report_accept(sc_peerset::IncomingIndex(0)); + } + + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn inject_connection_closed_non_existent_peer() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let endpoint = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_closed( + &peer, + &ConnectionId::new(0usize), + &endpoint, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &endpoint), + 0usize, + ); + } + + #[test] + fn disconnect_non_existent_peer() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let set_id = sc_peerset::SetId::from(0); + + notif.peerset_report_disconnect(peer, set_id); + + assert!(notif.peers.is_empty()); + assert!(notif.incoming.is_empty()); + } + + #[test] + fn accept_non_existent_connection() { + let (mut notif, _peerset) = development_notifs(); + + notif.peerset_report_accept(0.into()); + + assert!(notif.peers.is_empty()); + assert!(notif.incoming.is_empty()); + } + + #[test] + fn reject_non_existent_connection() { + let (mut notif, _peerset) = development_notifs(); + + notif.peerset_report_reject(0.into()); + + assert!(notif.peers.is_empty()); + assert!(notif.incoming.is_empty()); + } + + #[test] + fn reject_non_active_connection() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // remote opens a substream, verify that peer state is updated to `Incoming` + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); + + notif.incoming[0].alive = false; + notif.peerset_report_reject(0.into()); + + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); + } + + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn reject_non_existent_peer_but_alive_connection() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // remote opens a substream, verify that peer state is updated to `Incoming` + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); + assert!(std::matches!( + notif.incoming[0], + IncomingPeer { alive: true, incoming_id: sc_peerset::IncomingIndex(0), .. }, + )); + + notif.peers.remove(&(peer, set_id)); + notif.peerset_report_reject(0.into()); + } + + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn inject_connection_closed_for_incoming_peer() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // remote opens a substream, verify that peer state is updated to `Incoming` + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); + + notif.inject_connection_closed( + &peer, + &ConnectionId::new(1337usize), + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + } + + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn inject_connection_closed_for_disabled_peer() { + let (mut notif, _peerset) = development_notifs(); + let set_id = sc_peerset::SetId::from(0); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + notif.inject_connection_closed( + &peer, + &ConnectionId::new(1337usize), + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + } + + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn inject_connection_closed_for_disabled_pending_enable() { + let (mut notif, _peerset) = development_notifs(); + let set_id = sc_peerset::SetId::from(0); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // manually add backoff for the entry + if let Some(PeerState::Disabled { ref mut backoff_until, .. }) = + notif.peers.get_mut(&(peer, set_id)) + { + *backoff_until = + Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap()); + } + + // switch state to `DisabledPendingEnable` + notif.peerset_report_connect(peer, set_id); + + assert!(std::matches!( + notif.peers.get(&(peer, set_id)), + Some(&PeerState::DisabledPendingEnable { .. }) + )); + + notif.inject_connection_closed( + &peer, + &ConnectionId::new(1337usize), + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + } + + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn inject_connection_closed_for_incoming_peer_state_mismatch() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // remote opens a substream, verify that peer state is updated to `Incoming` + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); + notif.incoming[0].alive = false; + + notif.inject_connection_closed( + &peer, + &conn, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + } + + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn inject_connection_closed_for_enabled_state_mismatch() { + let (mut notif, _peerset) = development_notifs(); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let set_id = sc_peerset::SetId::from(0); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // remote opens a substream, verify that peer state is updated to `Incoming` + notif.inject_event(peer, conn, NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); + + // attempt to connect to the peer and verify that the peer state is `Enabled` + notif.peerset_report_connect(peer, set_id); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. }))); + + notif.inject_connection_closed( + &peer, + &ConnectionId::new(1337usize), + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + } + + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn inject_connection_closed_for_backoff_peer() { + let (mut notif, _peerset) = development_notifs(); + let set_id = sc_peerset::SetId::from(0); + let peer = PeerId::random(); + let conn = ConnectionId::new(0usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + + notif.inject_connection_established(&peer, &conn, &connected, None, 0usize); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); + + // manually add backoff for the entry + if let Some(PeerState::Disabled { ref mut backoff_until, .. }) = + notif.peers.get_mut(&(peer, set_id)) + { + *backoff_until = + Some(Instant::now().checked_add(std::time::Duration::from_secs(5)).unwrap()); + } + + notif.inject_connection_closed( + &peer, + &conn, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Backoff { .. }))); + + notif.inject_connection_closed( + &peer, + &conn, + &connected, + NotifsHandlerProto::new(vec![]).into_handler(&peer, &connected), + 0usize, + ); + } + + #[test] + #[should_panic] + #[cfg(debug_assertions)] + fn open_desired_by_remote_non_existent_peer() { + let (mut notif, _peerset) = development_notifs(); + let conn = ConnectionId::new(0usize); + let connected = ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }; + let mut conn_yielder = ConnectionYielder::new(); + + notif.inject_event( + PeerId::random(), + conn, + conn_yielder.open_substream(PeerId::random(), 0, connected, vec![1, 2, 3, 4]), + ); + } +} diff --git a/client/network/src/protocol/notifications/handler.rs b/client/network/src/protocol/notifications/handler.rs index 7fe76c1bb6345..e45bd97adb79f 100644 --- a/client/network/src/protocol/notifications/handler.rs +++ b/client/network/src/protocol/notifications/handler.rs @@ -854,7 +854,7 @@ impl ConnectionHandler for NotifsHandler { } #[cfg(test)] -mod tests { +pub mod tests { use super::*; use crate::protocol::notifications::upgrade::{ NotificationsInOpen, NotificationsInSubstreamHandshake, NotificationsOutOpen, @@ -863,11 +863,93 @@ mod tests { use futures::prelude::*; use libp2p::{core::muxing::SubstreamBox, Multiaddr}; use multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version}; - use std::io::{Error, IoSlice, IoSliceMut}; + use std::{ + collections::HashMap, + io::{Error, IoSlice, IoSliceMut}, + }; use tokio::{net::TcpStream, sync::mpsc}; use tokio_util::compat::TokioAsyncReadCompatExt; use unsigned_varint::codec::UviBytes; + struct OpenSubstream { + notifications: stream::Peekable< + stream::Select< + stream::Fuse>, + stream::Fuse>, + >, + >, + in_substream: MockSubstream, + out_substream: MockSubstream, + } + + pub struct ConnectionYielder { + connections: HashMap<(PeerId, usize), OpenSubstream>, + } + + impl ConnectionYielder { + /// Create new [`ConnectionYielder`]. + pub fn new() -> Self { + Self { connections: HashMap::new() } + } + + /// Open a new substream for peer. + pub fn open_substream( + &mut self, + peer: PeerId, + protocol_index: usize, + endpoint: ConnectedPoint, + received_handshake: Vec, + ) -> NotifsHandlerOut { + let (async_tx, async_rx) = + futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE); + let (sync_tx, sync_rx) = + futures::channel::mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE); + let notifications_sink = NotificationsSink { + inner: Arc::new(NotificationsSinkInner { + peer_id: peer, + async_channel: FuturesMutex::new(async_tx), + sync_channel: Mutex::new(Some(sync_tx)), + }), + }; + let (in_substream, out_substream) = MockSubstream::new(); + + self.connections.insert( + (peer, protocol_index), + OpenSubstream { + notifications: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(), + in_substream, + out_substream, + }, + ); + + NotifsHandlerOut::OpenResultOk { + protocol_index, + negotiated_fallback: None, + endpoint, + received_handshake, + notifications_sink, + } + } + + /// Attempt to get next pending event from one of the notification sinks. + pub async fn get_next_event(&mut self, peer: PeerId, set: usize) -> Option> { + let substream = if let Some(info) = self.connections.get_mut(&(peer, set)) { + info + } else { + return None + }; + + futures::future::poll_fn(|cx| match substream.notifications.poll_next_unpin(cx) { + Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) => + Poll::Ready(Some(message)), + Poll::Pending => Poll::Ready(None), + Poll::Ready(Some(NotificationsSinkMessage::ForceClose)) | Poll::Ready(None) => + panic!("sink closed"), + }) + .await + } + } + struct MockSubstream { pub rx: mpsc::Receiver>, pub tx: mpsc::Sender>, diff --git a/client/network/src/protocol/notifications/upgrade/collec.rs b/client/network/src/protocol/notifications/upgrade/collec.rs index 057cd3a243ffd..77593b8f676c6 100644 --- a/client/network/src/protocol/notifications/upgrade/collec.rs +++ b/client/network/src/protocol/notifications/upgrade/collec.rs @@ -73,7 +73,7 @@ where } /// Groups a `ProtocolName` with a `usize`. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct ProtoNameWithUsize(T, usize); impl ProtocolName for ProtoNameWithUsize { @@ -131,11 +131,13 @@ mod tests { let upgrade: UpgradeCollec<_> = upgrades.into_iter().collect::>(); let protos = vec![ - ProtoNameWithUsize(ProtoNameWithUsize(ProtoName::from(format!("protocol1")), 1), 1), - ProtoNameWithUsize(ProtoNameWithUsize(ProtoName::from(format!("protocol2")), 2), 2), - ProtoNameWithUsize(ProtoNameWithUsize(ProtoName::from(format!("protocol3")), 3), 3), + ProtoNameWithUsize(ProtoNameWithUsize(ProtoName::from(format!("protocol1")), 1), 0), + ProtoNameWithUsize(ProtoNameWithUsize(ProtoName::from(format!("protocol2")), 2), 1), + ProtoNameWithUsize(ProtoNameWithUsize(ProtoName::from(format!("protocol3")), 3), 2), ]; - assert!(std::matches!(upgrade.protocol_info().collect::>(), protos)); + let upgrades = upgrade.protocol_info().collect::>(); + + assert_eq!(upgrades, protos,); } #[test] @@ -165,13 +167,13 @@ mod tests { let upgrade: UpgradeCollec<_> = upgrades.into_iter().collect::>(); let protos = vec![ - ProtoNameWithUsize(ProtoNameWithUsize(ProtoName::from(format!("protocol1")), 1), 1), - ProtoNameWithUsize(ProtoNameWithUsize(ProtoName::from(format!("protocol2")), 2), 2), - ProtoNameWithUsize(ProtoNameWithUsize(ProtoName::from(format!("protocol3")), 3), 3), - ProtoNameWithUsize(ProtoNameWithUsize(ProtoName::from(format!("protocol22")), 1), 4), - ProtoNameWithUsize(ProtoNameWithUsize(ProtoName::from(format!("protocol33")), 2), 5), - ProtoNameWithUsize(ProtoNameWithUsize(ProtoName::from(format!("protocol44")), 3), 6), + ProtoNameWithUsize(ProtoNameWithUsize(ProtoName::from(format!("protocol1")), 1), 0), + ProtoNameWithUsize(ProtoNameWithUsize(ProtoName::from(format!("protocol2")), 2), 1), + ProtoNameWithUsize(ProtoNameWithUsize(ProtoName::from(format!("protocol22")), 1), 2), + ProtoNameWithUsize(ProtoNameWithUsize(ProtoName::from(format!("protocol33")), 2), 2), + ProtoNameWithUsize(ProtoNameWithUsize(ProtoName::from(format!("protocol44")), 3), 2), ]; - assert!(std::matches!(upgrade.protocol_info().collect::>(), protos)); + let upgrades = upgrade.protocol_info().collect::>(); + assert_eq!(upgrades, protos,); } }