diff --git a/network/src/network.rs b/network/src/network.rs index 768830ab804..59a0d3cd01f 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -670,7 +670,7 @@ impl ServiceHandle for EventHandler { match self.network_state.accept_peer(&session_context) { Ok(Some(evicted_peer)) => { debug!( - "evict peer (disonnect it), {} => {}", + "evict peer (disconnect it), {} => {}", evicted_peer.session_id, evicted_peer.connected_addr, ); if let Err(err) = disconnect_with_message( diff --git a/network/src/peer_registry.rs b/network/src/peer_registry.rs index bd17e75bf3c..784369a40d3 100644 --- a/network/src/peer_registry.rs +++ b/network/src/peer_registry.rs @@ -108,7 +108,7 @@ impl PeerRegistry { return Err(PeerError::ReachMaxOutboundLimit.into()); } } - peer_store.add_connected_peer(remote_addr.clone(), session_type)?; + peer_store.add_connected_peer(remote_addr.clone(), session_type); let peer = Peer::new(session_id, session_type, remote_addr, is_whitelist); self.peers.insert(session_id, peer); Ok(evicted_peer) diff --git a/network/src/peer_store/peer_store_impl.rs b/network/src/peer_store/peer_store_impl.rs index cebbaa15b67..66138baa08f 100644 --- a/network/src/peer_store/peer_store_impl.rs +++ b/network/src/peer_store/peer_store_impl.rs @@ -12,15 +12,14 @@ use crate::{ PeerId, SessionType, }; use ipnetwork::IpNetwork; -use std::cell::{Ref, RefCell}; use std::collections::{hash_map::Entry, HashMap}; /// Peer store #[derive(Default)] pub struct PeerStore { addr_manager: AddrManager, - ban_list: RefCell, - peers: RefCell>, + ban_list: BanList, + peers: HashMap, score_config: PeerScoreConfig, } @@ -29,19 +28,17 @@ impl PeerStore { pub fn new(addr_manager: AddrManager, ban_list: BanList) -> Self { PeerStore { addr_manager, - ban_list: RefCell::new(ban_list), + ban_list: ban_list, peers: Default::default(), score_config: Default::default(), } } - /// Add a peer and address into peer_store /// this method will assume peer is connected, which implies address is "verified". - pub fn add_connected_peer(&mut self, addr: Multiaddr, session_type: SessionType) -> Result<()> { + pub fn add_connected_peer(&mut self, addr: Multiaddr, session_type: SessionType) { let now_ms = faketime::unix_time_as_millis(); match self .peers - .get_mut() .entry(extract_peer_id(&addr).expect("connected addr should have peer id")) { Entry::Occupied(mut entry) => { @@ -55,14 +52,9 @@ impl PeerStore { entry.insert(peer); } } - let score = self.score_config.default_score; - if session_type.is_outbound() { - self.addr_manager.add(AddrInfo::new(addr, now_ms, score)); - } - Ok(()) } - /// Add discovered peer addresses + /// Add discovered peer address /// this method will assume peer and addr is untrust since we have not connected to it. pub fn add_addr(&mut self, addr: Multiaddr) -> Result<()> { self.check_purge()?; @@ -71,6 +63,13 @@ impl PeerStore { Ok(()) } + /// Add outbound peer address + pub fn add_outbound_addr(&mut self, addr: Multiaddr) { + let score = self.score_config.default_score; + self.addr_manager + .add(AddrInfo::new(addr, faketime::unix_time_as_millis(), score)); + } + /// Get address manager pub fn addr_manager(&self) -> &AddrManager { &self.addr_manager @@ -100,12 +99,12 @@ impl PeerStore { /// Remove peer id pub fn remove_disconnected_peer(&mut self, addr: &Multiaddr) -> Option { - extract_peer_id(addr).and_then(|peer_id| self.peers.borrow_mut().remove(&peer_id)) + extract_peer_id(addr).and_then(|peer_id| self.peers.remove(&peer_id)) } /// Get peer status pub fn peer_status(&self, peer_id: &PeerId) -> Status { - if self.peers.borrow().contains_key(peer_id) { + if self.peers.contains_key(peer_id) { Status::Connected } else { Status::Disconnected @@ -115,8 +114,8 @@ impl PeerStore { /// Get peers for outbound connection, this method randomly return non-connected peer addrs pub fn fetch_addrs_to_attempt(&mut self, count: usize) -> Vec { let now_ms = faketime::unix_time_as_millis(); - let ban_list = self.ban_list.borrow(); - let peers = self.peers.borrow(); + let ban_list = &self.ban_list; + let peers = &self.peers; // get addrs that can attempt. self.addr_manager .fetch_random(count, |peer_addr: &AddrInfo| { @@ -134,8 +133,8 @@ impl PeerStore { let now_ms = faketime::unix_time_as_millis(); let addr_expired_ms = now_ms - ADDR_TIMEOUT_MS; // get expired or never successed addrs. - let ban_list = self.ban_list.borrow(); - let peers = self.peers.borrow(); + let ban_list = &self.ban_list; + let peers = &self.peers; self.addr_manager .fetch_random(count, |peer_addr: &AddrInfo| { !ban_list.is_addr_banned(&peer_addr.addr) @@ -151,8 +150,8 @@ impl PeerStore { pub fn fetch_random_addrs(&mut self, count: usize) -> Vec { let now_ms = faketime::unix_time_as_millis(); let addr_expired_ms = now_ms - ADDR_TIMEOUT_MS; - let ban_list = self.ban_list.borrow(); - let peers = self.peers.borrow(); + let ban_list = &self.ban_list; + let peers = &self.peers; // get success connected addrs. self.addr_manager .fetch_random(count, |peer_addr: &AddrInfo| { @@ -189,18 +188,18 @@ impl PeerStore { } /// Get ban list - pub fn ban_list(&self) -> Ref { - self.ban_list.borrow() + pub fn ban_list(&self) -> &BanList { + &self.ban_list } /// Get mut ban list pub fn mut_ban_list(&mut self) -> &mut BanList { - self.ban_list.get_mut() + &mut self.ban_list } /// Clear ban list - pub fn clear_ban_list(&self) { - self.ban_list.replace(Default::default()); + pub fn clear_ban_list(&mut self) { + std::mem::take(&mut self.ban_list); } /// Check and try delete addrs if reach limit diff --git a/network/src/protocols/feeler.rs b/network/src/protocols/feeler.rs index 22a7a6bb8e9..d5505294e94 100644 --- a/network/src/protocols/feeler.rs +++ b/network/src/protocols/feeler.rs @@ -1,12 +1,11 @@ use crate::network::disconnect_with_message; use crate::NetworkState; -use ckb_logger::{debug, info}; +use ckb_logger::debug; use p2p::{ context::{ProtocolContext, ProtocolContextMutRef}, - secio::PublicKey, traits::ServiceProtocol, }; -use std::sync::Arc; +use std::sync::{atomic::Ordering, Arc}; /// Feeler /// Currently do nothing, CKBProtocol auto refresh peer_store after connected. @@ -20,28 +19,24 @@ impl Feeler { } } -//TODO -//1. report bad behaviours -//2. set peer feeler flag impl ServiceProtocol for Feeler { fn init(&mut self, _context: &mut ProtocolContext) {} - fn connected(&mut self, context: ProtocolContextMutRef, _: &str) { + fn connected(&mut self, context: ProtocolContextMutRef, version: &str) { let session = context.session; - let peer_id = session - .remote_pubkey - .as_ref() - .map(PublicKey::peer_id) - .expect("Secio must enabled"); - self.network_state.with_peer_store_mut(|peer_store| { - if let Err(err) = peer_store.add_connected_peer(session.address.clone(), session.ty) { - debug!( - "Failed to add connected peer to peer_store {:?} {:?} {:?}", - err, peer_id, session - ); - } - }); - info!("peer={} FeelerProtocol.connected", session.address); + if self.network_state.ckb2021.load(Ordering::SeqCst) && version != "2" { + self.network_state + .peer_store + .lock() + .mut_addr_manager() + .remove(&session.address); + } else if context.session.ty.is_outbound() { + self.network_state.with_peer_store_mut(|peer_store| { + peer_store.add_outbound_addr(session.address.clone()); + }); + } + + debug!("peer={} FeelerProtocol.connected", session.address); if let Err(err) = disconnect_with_message(context.control(), session.id, "feeler connection") { @@ -54,6 +49,6 @@ impl ServiceProtocol for Feeler { self.network_state.with_peer_registry_mut(|reg| { reg.remove_feeler(&session.address); }); - info!("peer={} FeelerProtocol.disconnected", session.address); + debug!("peer={} FeelerProtocol.disconnected", session.address); } } diff --git a/network/src/protocols/identify/mod.rs b/network/src/protocols/identify/mod.rs index 604d80db695..d316149217f 100644 --- a/network/src/protocols/identify/mod.rs +++ b/network/src/protocols/identify/mod.rs @@ -19,6 +19,7 @@ mod protocol; use crate::{NetworkState, PeerIdentifyInfo, SupportProtocols}; use ckb_types::{packed, prelude::*}; +use std::sync::atomic::Ordering; use protocol::IdentifyMessage; @@ -59,7 +60,7 @@ impl MisbehaveResult { /// The trait to communicate with underlying peer storage pub trait Callback: Clone + Send { // Register open protocol - fn register(&self, id: SessionId, pid: ProtocolId, version: &str); + fn register(&self, context: &ProtocolContextMutRef, version: &str); // remove registered identify protocol fn unregister(&self, id: SessionId, pid: ProtocolId); /// Received custom message @@ -229,8 +230,7 @@ impl ServiceProtocol for IdentifyProtocol { return; } - self.callback - .register(session.id, context.proto_id, version); + self.callback.register(&context, version); let remote_info = RemoteInfo::new(session.clone(), Duration::from_secs(DEFAULT_TIMEOUT)); trace!("IdentifyProtocol sconnected from {:?}", remote_info.peer_id); @@ -361,12 +361,23 @@ impl IdentifyCallback { } impl Callback for IdentifyCallback { - fn register(&self, id: SessionId, pid: ProtocolId, version: &str) { + fn register(&self, context: &ProtocolContextMutRef, version: &str) { self.network_state.with_peer_registry_mut(|reg| { - reg.get_peer_mut(id).map(|peer| { - peer.protocols.insert(pid, version.to_owned()); + reg.get_peer_mut(context.session.id).map(|peer| { + peer.protocols.insert(context.proto_id, version.to_owned()); }) }); + if self.network_state.ckb2021.load(Ordering::SeqCst) && version != "2" { + self.network_state + .peer_store + .lock() + .mut_addr_manager() + .remove(&context.session.address); + } else if context.session.ty.is_outbound() { + self.network_state.with_peer_store_mut(|peer_store| { + peer_store.add_outbound_addr(context.session.address.clone()); + }); + } } fn unregister(&self, id: SessionId, pid: ProtocolId) { diff --git a/network/src/tests/peer_store.rs b/network/src/tests/peer_store.rs index a570a170582..845afa66e8b 100644 --- a/network/src/tests/peer_store.rs +++ b/network/src/tests/peer_store.rs @@ -11,9 +11,8 @@ fn test_add_connected_peer() { let mut peer_store: PeerStore = Default::default(); let addr = random_addr(); assert_eq!(peer_store.fetch_random_addrs(2).len(), 0); - peer_store - .add_connected_peer(addr, SessionType::Outbound) - .unwrap(); + peer_store.add_connected_peer(addr.clone(), SessionType::Outbound); + peer_store.add_addr(addr).unwrap(); assert_eq!(peer_store.fetch_random_addrs(2).len(), 1); } @@ -39,9 +38,7 @@ fn test_report() { fn test_update_status() { let mut peer_store: PeerStore = Default::default(); let addr = random_addr(); - peer_store - .add_connected_peer(addr.clone(), SessionType::Inbound) - .unwrap(); + peer_store.add_connected_peer(addr.clone(), SessionType::Inbound); assert_eq!( peer_store.peer_status(&extract_peer_id(&addr).unwrap()), Status::Connected @@ -52,9 +49,7 @@ fn test_update_status() { fn test_ban_peer() { let mut peer_store: PeerStore = Default::default(); let addr = random_addr(); - peer_store - .add_connected_peer(addr.clone(), SessionType::Inbound) - .unwrap(); + peer_store.add_connected_peer(addr.clone(), SessionType::Inbound); peer_store.ban_addr(&addr, 10_000, "no reason".into()); assert!(peer_store.is_addr_banned(&addr)); } @@ -76,9 +71,7 @@ fn test_fetch_addrs_to_attempt() { let addr = random_addr(); peer_store.add_addr(addr.clone()).unwrap(); assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 1); - peer_store - .add_connected_peer(addr, SessionType::Outbound) - .unwrap(); + peer_store.add_connected_peer(addr, SessionType::Outbound); assert!(peer_store.fetch_addrs_to_attempt(1).is_empty()); } @@ -111,12 +104,15 @@ fn test_fetch_addrs_to_feeler() { assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1); // ignores connected peers' addrs - peer_store - .add_connected_peer(addr.clone(), SessionType::Outbound) - .unwrap(); + peer_store.add_connected_peer(addr.clone(), SessionType::Outbound); assert!(peer_store.fetch_addrs_to_feeler(1).is_empty()); // peer does not need feeler if it connected to us recently + peer_store + .mut_addr_manager() + .get_mut(&addr) + .unwrap() + .last_connected_at_ms = faketime::unix_time_as_millis(); peer_store.remove_disconnected_peer(&addr); assert!(peer_store.fetch_addrs_to_feeler(1).is_empty()); } @@ -137,27 +133,21 @@ fn test_fetch_random_addrs() { // random should not return peer that we have never connected to assert!(peer_store.fetch_random_addrs(1).is_empty()); // can't get peer addr from inbound - peer_store - .add_connected_peer(addr1.clone(), SessionType::Inbound) - .unwrap(); + peer_store.add_connected_peer(addr1.clone(), SessionType::Inbound); assert!(peer_store.fetch_random_addrs(1).is_empty()); // get peer addr from outbound - peer_store - .add_connected_peer(addr1, SessionType::Outbound) - .unwrap(); + peer_store.add_connected_peer(addr1.clone(), SessionType::Outbound); + peer_store.add_addr(addr1).unwrap(); assert_eq!(peer_store.fetch_random_addrs(2).len(), 1); // get peer addrs by limit - peer_store - .add_connected_peer(addr2, SessionType::Outbound) - .unwrap(); + peer_store.add_connected_peer(addr2.clone(), SessionType::Outbound); + peer_store.add_addr(addr2).unwrap(); assert_eq!(peer_store.fetch_random_addrs(2).len(), 2); assert_eq!(peer_store.fetch_random_addrs(1).len(), 1); // return old peer's addr peer_store.add_addr(addr3.clone()).unwrap(); - peer_store - .add_connected_peer(addr3.clone(), SessionType::Outbound) - .unwrap(); + peer_store.add_connected_peer(addr3.clone(), SessionType::Outbound); // set last_connected_at_ms to an expired timestamp // should still return peer's addr peer_store @@ -173,18 +163,16 @@ fn test_fetch_random_addrs() { #[test] fn test_get_random_restrict_addrs_from_same_ip() { let mut peer_store: PeerStore = Default::default(); - let addr1 = format!("/ip4/225.0.0.1/tcp/42/p2p/{}", PeerId::random().to_base58()) + let addr1: Multiaddr = format!("/ip4/225.0.0.1/tcp/42/p2p/{}", PeerId::random().to_base58()) .parse() .unwrap(); - let addr2 = format!("/ip4/225.0.0.1/tcp/43/p2p/{}", PeerId::random().to_base58()) + let addr2: Multiaddr = format!("/ip4/225.0.0.1/tcp/43/p2p/{}", PeerId::random().to_base58()) .parse() .unwrap(); - peer_store - .add_connected_peer(addr1, SessionType::Outbound) - .unwrap(); - peer_store - .add_connected_peer(addr2, SessionType::Outbound) - .unwrap(); + peer_store.add_connected_peer(addr1.clone(), SessionType::Outbound); + peer_store.add_connected_peer(addr2.clone(), SessionType::Outbound); + peer_store.add_addr(addr1).unwrap(); + peer_store.add_addr(addr2).unwrap(); assert_eq!(peer_store.fetch_random_addrs(2).len(), 1); }