Skip to content

Commit

Permalink
refactor: rewrite the peer store to add strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Jul 28, 2021
1 parent fb27f98 commit 19003f5
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 91 deletions.
2 changes: 1 addition & 1 deletion network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ impl<T: ExitHandler> ServiceHandle for EventHandler<T> {
match self.network_state.accept_peer(&session_context) {
Ok(Some(evicted_peer)) => {
debug!(
"evict peer (disonnect it), {} => {}",
"evict peer (disconnect it), {} => {}",
evicted_peer.session_id, evicted_peer.connected_addr,
);
if let Err(err) = disconnect_with_message(
Expand Down
2 changes: 1 addition & 1 deletion network/src/peer_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl PeerRegistry {
return Err(PeerError::ReachMaxOutboundLimit.into());
}
}
peer_store.add_connected_peer(remote_addr.clone(), session_type)?;
peer_store.add_connected_peer(remote_addr.clone(), session_type);
let peer = Peer::new(session_id, session_type, remote_addr, is_whitelist);
self.peers.insert(session_id, peer);
Ok(evicted_peer)
Expand Down
51 changes: 25 additions & 26 deletions network/src/peer_store/peer_store_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ use crate::{
PeerId, SessionType,
};
use ipnetwork::IpNetwork;
use std::cell::{Ref, RefCell};
use std::collections::{hash_map::Entry, HashMap};

/// Peer store
#[derive(Default)]
pub struct PeerStore {
addr_manager: AddrManager,
ban_list: RefCell<BanList>,
peers: RefCell<HashMap<PeerId, PeerInfo>>,
ban_list: BanList,
peers: HashMap<PeerId, PeerInfo>,
score_config: PeerScoreConfig,
}

Expand All @@ -29,19 +28,17 @@ impl PeerStore {
pub fn new(addr_manager: AddrManager, ban_list: BanList) -> Self {
PeerStore {
addr_manager,
ban_list: RefCell::new(ban_list),
ban_list: ban_list,
peers: Default::default(),
score_config: Default::default(),
}
}

/// Add a peer and address into peer_store
/// this method will assume peer is connected, which implies address is "verified".
pub fn add_connected_peer(&mut self, addr: Multiaddr, session_type: SessionType) -> Result<()> {
pub fn add_connected_peer(&mut self, addr: Multiaddr, session_type: SessionType) {
let now_ms = faketime::unix_time_as_millis();
match self
.peers
.get_mut()
.entry(extract_peer_id(&addr).expect("connected addr should have peer id"))
{
Entry::Occupied(mut entry) => {
Expand All @@ -55,14 +52,9 @@ impl PeerStore {
entry.insert(peer);
}
}
let score = self.score_config.default_score;
if session_type.is_outbound() {
self.addr_manager.add(AddrInfo::new(addr, now_ms, score));
}
Ok(())
}

/// Add discovered peer addresses
/// Add discovered peer address
/// this method will assume peer and addr is untrust since we have not connected to it.
pub fn add_addr(&mut self, addr: Multiaddr) -> Result<()> {
self.check_purge()?;
Expand All @@ -71,6 +63,13 @@ impl PeerStore {
Ok(())
}

/// Add outbound peer address
pub fn add_outbound_addr(&mut self, addr: Multiaddr) {
let score = self.score_config.default_score;
self.addr_manager
.add(AddrInfo::new(addr, faketime::unix_time_as_millis(), score));
}

/// Get address manager
pub fn addr_manager(&self) -> &AddrManager {
&self.addr_manager
Expand Down Expand Up @@ -100,12 +99,12 @@ impl PeerStore {

/// Remove peer id
pub fn remove_disconnected_peer(&mut self, addr: &Multiaddr) -> Option<PeerInfo> {
extract_peer_id(addr).and_then(|peer_id| self.peers.borrow_mut().remove(&peer_id))
extract_peer_id(addr).and_then(|peer_id| self.peers.remove(&peer_id))
}

/// Get peer status
pub fn peer_status(&self, peer_id: &PeerId) -> Status {
if self.peers.borrow().contains_key(peer_id) {
if self.peers.contains_key(peer_id) {
Status::Connected
} else {
Status::Disconnected
Expand All @@ -115,8 +114,8 @@ impl PeerStore {
/// Get peers for outbound connection, this method randomly return non-connected peer addrs
pub fn fetch_addrs_to_attempt(&mut self, count: usize) -> Vec<AddrInfo> {
let now_ms = faketime::unix_time_as_millis();
let ban_list = self.ban_list.borrow();
let peers = self.peers.borrow();
let ban_list = &self.ban_list;
let peers = &self.peers;
// get addrs that can attempt.
self.addr_manager
.fetch_random(count, |peer_addr: &AddrInfo| {
Expand All @@ -134,8 +133,8 @@ impl PeerStore {
let now_ms = faketime::unix_time_as_millis();
let addr_expired_ms = now_ms - ADDR_TIMEOUT_MS;
// get expired or never successed addrs.
let ban_list = self.ban_list.borrow();
let peers = self.peers.borrow();
let ban_list = &self.ban_list;
let peers = &self.peers;
self.addr_manager
.fetch_random(count, |peer_addr: &AddrInfo| {
!ban_list.is_addr_banned(&peer_addr.addr)
Expand All @@ -151,8 +150,8 @@ impl PeerStore {
pub fn fetch_random_addrs(&mut self, count: usize) -> Vec<AddrInfo> {
let now_ms = faketime::unix_time_as_millis();
let addr_expired_ms = now_ms - ADDR_TIMEOUT_MS;
let ban_list = self.ban_list.borrow();
let peers = self.peers.borrow();
let ban_list = &self.ban_list;
let peers = &self.peers;
// get success connected addrs.
self.addr_manager
.fetch_random(count, |peer_addr: &AddrInfo| {
Expand Down Expand Up @@ -189,18 +188,18 @@ impl PeerStore {
}

/// Get ban list
pub fn ban_list(&self) -> Ref<BanList> {
self.ban_list.borrow()
pub fn ban_list(&self) -> &BanList {
&self.ban_list
}

/// Get mut ban list
pub fn mut_ban_list(&mut self) -> &mut BanList {
self.ban_list.get_mut()
&mut self.ban_list
}

/// Clear ban list
pub fn clear_ban_list(&self) {
self.ban_list.replace(Default::default());
pub fn clear_ban_list(&mut self) {
std::mem::take(&mut self.ban_list);
}

/// Check and try delete addrs if reach limit
Expand Down
39 changes: 17 additions & 22 deletions network/src/protocols/feeler.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use crate::network::disconnect_with_message;
use crate::NetworkState;
use ckb_logger::{debug, info};
use ckb_logger::debug;
use p2p::{
context::{ProtocolContext, ProtocolContextMutRef},
secio::PublicKey,
traits::ServiceProtocol,
};
use std::sync::Arc;
use std::sync::{atomic::Ordering, Arc};

/// Feeler
/// Currently do nothing, CKBProtocol auto refresh peer_store after connected.
Expand All @@ -20,28 +19,24 @@ impl Feeler {
}
}

//TODO
//1. report bad behaviours
//2. set peer feeler flag
impl ServiceProtocol for Feeler {
fn init(&mut self, _context: &mut ProtocolContext) {}

fn connected(&mut self, context: ProtocolContextMutRef, _: &str) {
fn connected(&mut self, context: ProtocolContextMutRef, version: &str) {
let session = context.session;
let peer_id = session
.remote_pubkey
.as_ref()
.map(PublicKey::peer_id)
.expect("Secio must enabled");
self.network_state.with_peer_store_mut(|peer_store| {
if let Err(err) = peer_store.add_connected_peer(session.address.clone(), session.ty) {
debug!(
"Failed to add connected peer to peer_store {:?} {:?} {:?}",
err, peer_id, session
);
}
});
info!("peer={} FeelerProtocol.connected", session.address);
if self.network_state.ckb2021.load(Ordering::SeqCst) && version != "2" {
self.network_state
.peer_store
.lock()
.mut_addr_manager()
.remove(&session.address);
} else if context.session.ty.is_outbound() {
self.network_state.with_peer_store_mut(|peer_store| {
peer_store.add_outbound_addr(session.address.clone());
});
}

debug!("peer={} FeelerProtocol.connected", session.address);
if let Err(err) =
disconnect_with_message(context.control(), session.id, "feeler connection")
{
Expand All @@ -54,6 +49,6 @@ impl ServiceProtocol for Feeler {
self.network_state.with_peer_registry_mut(|reg| {
reg.remove_feeler(&session.address);
});
info!("peer={} FeelerProtocol.disconnected", session.address);
debug!("peer={} FeelerProtocol.disconnected", session.address);
}
}
23 changes: 17 additions & 6 deletions network/src/protocols/identify/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod protocol;

use crate::{NetworkState, PeerIdentifyInfo, SupportProtocols};
use ckb_types::{packed, prelude::*};
use std::sync::atomic::Ordering;

use protocol::IdentifyMessage;

Expand Down Expand Up @@ -59,7 +60,7 @@ impl MisbehaveResult {
/// The trait to communicate with underlying peer storage
pub trait Callback: Clone + Send {
// Register open protocol
fn register(&self, id: SessionId, pid: ProtocolId, version: &str);
fn register(&self, context: &ProtocolContextMutRef, version: &str);
// remove registered identify protocol
fn unregister(&self, id: SessionId, pid: ProtocolId);
/// Received custom message
Expand Down Expand Up @@ -229,8 +230,7 @@ impl<T: Callback> ServiceProtocol for IdentifyProtocol<T> {
return;
}

self.callback
.register(session.id, context.proto_id, version);
self.callback.register(&context, version);

let remote_info = RemoteInfo::new(session.clone(), Duration::from_secs(DEFAULT_TIMEOUT));
trace!("IdentifyProtocol sconnected from {:?}", remote_info.peer_id);
Expand Down Expand Up @@ -361,12 +361,23 @@ impl IdentifyCallback {
}

impl Callback for IdentifyCallback {
fn register(&self, id: SessionId, pid: ProtocolId, version: &str) {
fn register(&self, context: &ProtocolContextMutRef, version: &str) {
self.network_state.with_peer_registry_mut(|reg| {
reg.get_peer_mut(id).map(|peer| {
peer.protocols.insert(pid, version.to_owned());
reg.get_peer_mut(context.session.id).map(|peer| {
peer.protocols.insert(context.proto_id, version.to_owned());
})
});
if self.network_state.ckb2021.load(Ordering::SeqCst) && version != "2" {
self.network_state
.peer_store
.lock()
.mut_addr_manager()
.remove(&context.session.address);
} else if context.session.ty.is_outbound() {
self.network_state.with_peer_store_mut(|peer_store| {
peer_store.add_outbound_addr(context.session.address.clone());
});
}
}

fn unregister(&self, id: SessionId, pid: ProtocolId) {
Expand Down
Loading

0 comments on commit 19003f5

Please sign in to comment.