From 1691bfa47ac561a2f27243e21b1b2fad2fb64be9 Mon Sep 17 00:00:00 2001 From: zeroqn Date: Fri, 18 Oct 2019 22:41:42 +0800 Subject: [PATCH] feat(network): log connected peer ips (#23) --- core/network/src/common.rs | 18 +++++- core/network/src/peer_manager/mod.rs | 83 ++++++++++++++++++++------- core/network/src/peer_manager/peer.rs | 24 +++++++- 3 files changed, 99 insertions(+), 26 deletions(-) diff --git a/core/network/src/common.rs b/core/network/src/common.rs index dc27291e3..f4491a49a 100644 --- a/core/network/src/common.rs +++ b/core/network/src/common.rs @@ -1,6 +1,6 @@ use std::{ future::Future, - net::SocketAddr, + net::{IpAddr, SocketAddr}, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -9,7 +9,7 @@ use std::{ use futures::{pin_mut, task::AtomicWaker}; use futures_timer::Delay; -use tentacle::multiaddr::{Multiaddr, Protocol}; +use tentacle::multiaddr::{Error as MultiaddrError, Multiaddr, Protocol}; #[macro_export] macro_rules! loop_ready { @@ -41,6 +41,20 @@ pub fn socket_to_multi_addr(socket_addr: SocketAddr) -> Multiaddr { multi_addr } +pub fn multi_addr_ip(addr: &Multiaddr) -> Result { + let comps = addr.iter().collect::>(); + + if comps.len() < 2 { + return Err(MultiaddrError::DataLessThanLen); + } + + match comps[0] { + Protocol::Ip4(ip) => Ok(IpAddr::V4(ip)), + Protocol::Ip6(ip) => Ok(IpAddr::V6(ip)), + _ => Err(MultiaddrError::InvalidMultiaddr), + } +} + pub struct HeartBeat { waker: Arc, interval: Duration, diff --git a/core/network/src/peer_manager/mod.rs b/core/network/src/peer_manager/mod.rs index e9e28a360..e698af64a 100644 --- a/core/network/src/peer_manager/mod.rs +++ b/core/network/src/peer_manager/mod.rs @@ -15,6 +15,7 @@ use std::{ collections::{HashMap, HashSet}, future::Future, hash::{Hash, Hasher}, + net::IpAddr, path::PathBuf, pin::Pin, sync::atomic::{AtomicBool, Ordering}, @@ -45,7 +46,7 @@ use tentacle::{ }; use crate::{ - common::HeartBeat, + common::{multi_addr_ip, HeartBeat}, error::NetworkError, event::{ConnectionEvent, ConnectionType, MultiUsersMessage, PeerManagerEvent, Session}, }; @@ -123,10 +124,44 @@ struct Inner { addr_pid: RwLock>, user_pid: RwLock>, pool: RwLock>, - listen: RwLock>, + + // Self PeerId + peer_id: Arc, + listen: RwLock>, } impl Inner { + pub fn new(pid: PeerId) -> Self { + Inner { + connecting: Default::default(), + connected: Default::default(), + + addr_pid: Default::default(), + user_pid: Default::default(), + pool: Default::default(), + + peer_id: Arc::new(pid), + listen: RwLock::new(None), + } + } + + pub fn connected_peers(&self) -> Vec { + self.connected + .read() + .iter() + .filter(|pid| pid != &self.peer_id.as_ref()) + .cloned() + .collect() + } + + pub fn connection_ip(&self, pid: &PeerId) -> Option { + if let Some(Some(ip)) = self.pool.read().get(pid).map(|peer| peer.connection_ip()) { + Some(ip) + } else { + None + } + } + pub fn peer_exist(&self, pid: &PeerId) -> bool { self.pool.read().contains_key(pid) } @@ -282,7 +317,7 @@ impl Inner { self.connecting.write().insert(peer_id.clone()); } - pub fn set_connected(&self, peer_id: &PeerId) { + pub fn set_connected(&self, peer_id: &PeerId, ip: Option) { // Clean outbound connection self.connecting.write().remove(peer_id); self.connected.write().insert(peer_id.clone()); @@ -290,6 +325,7 @@ impl Inner { let mut pool = self.pool.write(); if let Some(peer) = pool.get_mut(peer_id) { peer.update_connect(); + peer.set_connection_ip(ip); } } @@ -301,6 +337,7 @@ impl Inner { if let Some(peer) = pool.get_mut(peer_id) { peer.update_disconnect(); peer.update_alive(); + peer.set_connection_ip(None); } } @@ -309,7 +346,8 @@ impl Inner { } pub fn connection_count(&self) -> usize { - self.connected.read().len() + self.connecting.read().len() + // -1 to remove ourself + self.connected.read().len() + self.connecting.read().len() - 1 } pub fn unconnected_peers(&self, max: usize) -> Vec { @@ -390,20 +428,6 @@ impl Inner { } } -impl Default for Inner { - fn default() -> Self { - Inner { - connected: Default::default(), - connecting: Default::default(), - - addr_pid: Default::default(), - user_pid: Default::default(), - pool: Default::default(), - listen: Default::default(), - } - } -} - // TODO: Store our secret key? #[derive(Debug)] pub struct PeerManagerConfig { @@ -479,11 +503,12 @@ impl PeerManager { event_rx: UnboundedReceiver, conn_tx: UnboundedSender, ) -> Self { - let inner = Arc::new(Inner::default()); + let peer_id = config.our_id.clone(); + + let inner = Arc::new(Inner::new(peer_id.clone())); let waker = Arc::new(AtomicWaker::new()); let heart_beat = HeartBeat::new(Arc::clone(&waker), config.routine_interval); let persistence = Box::new(NoopPersistence); - let peer_id = config.our_id.clone(); // Register our self inner.register_self(config.our_id.clone(), config.pubkey.clone()); @@ -597,6 +622,7 @@ impl PeerManager { fn attach_peer_session(&mut self, pubkey: PublicKey, session: Session) { let Session { sid, addr, ty } = session; + let connection_ip = multi_addr_ip(&addr).ok(); let user_addr = Peer::pubkey_to_addr(&pubkey).as_hex(); let pid = pubkey.peer_id(); @@ -622,7 +648,7 @@ impl PeerManager { self.inner.add_peer_addr(&pid, addr); } - self.inner.set_connected(&pid); + self.inner.set_connected(&pid, connection_ip); self.peer_session.insert(pid.clone(), sid); self.session_peer.insert(sid, pid); @@ -727,6 +753,15 @@ impl PeerManager { } } + fn connected_peers_ip(&self) -> Vec<(PeerId, Option)> { + let connected_pids = self.inner.connected_peers(); + + connected_pids + .into_iter() + .map(|pid| (pid.clone(), self.inner.connection_ip(&pid))) + .collect() + } + fn unconnected_unknowns(&self, max: usize) -> Vec<&UnknownAddr> { self.unknown_addrs .iter() @@ -1000,6 +1035,12 @@ impl Future for PeerManager { self.process_event(event); } + info!( + "network: {:?}: connected peer_ip(s): {:?}", + self.peer_id, + self.connected_peers_ip() + ); + // Check connecting count let connection_count = self.inner.connection_count(); if connection_count < self.config.max_connections { diff --git a/core/network/src/peer_manager/peer.rs b/core/network/src/peer_manager/peer.rs index 6ce9de8ff..911b7ce64 100644 --- a/core/network/src/peer_manager/peer.rs +++ b/core/network/src/peer_manager/peer.rs @@ -1,6 +1,7 @@ use std::{ collections::HashSet, default::Default, + net::IpAddr, sync::Arc, time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; @@ -20,14 +21,17 @@ pub const VALID_ATTEMPT_INTERVAL: u64 = 4; // TODO: display next_retry #[derive(Debug, Clone, Serialize, Deserialize, Display)] #[display( - fmt = "addrs: {:?}, retry: {}, last_connect: {}, alive: {}", - addr_set, + fmt = "connection_ip: {:?}, retry: {}, last_connect: {}, alive: {}", + connection_ip, retry_count, connect_at, alive )] pub(super) struct PeerState { - // Peer listen address set + // Current connection ip + connection_ip: Option, + + // Peer address set (Multiple format, p2p, quic etc) addr_set: HashSet, #[serde(skip)] @@ -67,6 +71,7 @@ pub struct Peer { impl PeerState { pub fn new() -> Self { PeerState { + connection_ip: None, addr_set: Default::default(), retry_count: 0, next_retry: Instant::now(), @@ -79,6 +84,11 @@ impl PeerState { pub fn from_addrs(addrs: Vec) -> Self { let mut state = PeerState::new(); + + if addrs.is_empty() { + return state; + } + state.addr_set.extend(addrs.into_iter()); state @@ -116,6 +126,10 @@ impl Peer { &self.pubkey } + pub fn connection_ip(&self) -> Option { + self.state.connection_ip + } + pub fn user_addr(&self) -> &UserAddress { &self.user_addr } @@ -140,6 +154,10 @@ impl Peer { self.state = state } + pub fn set_connection_ip(&mut self, ip: Option) { + self.state.connection_ip = ip; + } + pub fn add_addr(&mut self, addr: Multiaddr) { self.state.addr_set.insert(addr); }