Skip to content

Commit

Permalink
feat(network): log connected peer ips (nervosnetwork#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroqn authored and yejiayu committed Oct 31, 2019
1 parent 6bf92fe commit 77ed393
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 26 deletions.
18 changes: 16 additions & 2 deletions core/network/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
future::Future,
net::SocketAddr,
net::{IpAddr, SocketAddr},
pin::Pin,
sync::Arc,
task::{Context, Poll},
Expand All @@ -9,7 +9,7 @@ use std::{

use futures::{pin_mut, task::AtomicWaker};
use futures_timer::Delay;
use tentacle::multiaddr::{Multiaddr, Protocol};
use tentacle::multiaddr::{Error as MultiaddrError, Multiaddr, Protocol};

#[macro_export]
macro_rules! loop_ready {
Expand Down Expand Up @@ -41,6 +41,20 @@ pub fn socket_to_multi_addr(socket_addr: SocketAddr) -> Multiaddr {
multi_addr
}

pub fn multi_addr_ip(addr: &Multiaddr) -> Result<IpAddr, MultiaddrError> {
let comps = addr.iter().collect::<Vec<_>>();

if comps.len() < 2 {
return Err(MultiaddrError::DataLessThanLen);
}

match comps[0] {
Protocol::Ip4(ip) => Ok(IpAddr::V4(ip)),
Protocol::Ip6(ip) => Ok(IpAddr::V6(ip)),
_ => Err(MultiaddrError::InvalidMultiaddr),
}
}

pub struct HeartBeat {
waker: Arc<AtomicWaker>,
interval: Duration,
Expand Down
83 changes: 62 additions & 21 deletions core/network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::{
collections::{HashMap, HashSet},
future::Future,
hash::{Hash, Hasher},
net::IpAddr,
path::PathBuf,
pin::Pin,
sync::atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -45,7 +46,7 @@ use tentacle::{
};

use crate::{
common::HeartBeat,
common::{multi_addr_ip, HeartBeat},
error::NetworkError,
event::{ConnectionEvent, ConnectionType, MultiUsersMessage, PeerManagerEvent, Session},
};
Expand Down Expand Up @@ -123,10 +124,44 @@ struct Inner {
addr_pid: RwLock<HashMap<Multiaddr, PeerId>>,
user_pid: RwLock<HashMap<UserAddress, PeerId>>,
pool: RwLock<HashMap<PeerId, Peer>>,
listen: RwLock<Option<Multiaddr>>,

// Self PeerId
peer_id: Arc<PeerId>,
listen: RwLock<Option<Multiaddr>>,
}

impl Inner {
pub fn new(pid: PeerId) -> Self {
Inner {
connecting: Default::default(),
connected: Default::default(),

addr_pid: Default::default(),
user_pid: Default::default(),
pool: Default::default(),

peer_id: Arc::new(pid),
listen: RwLock::new(None),
}
}

pub fn connected_peers(&self) -> Vec<PeerId> {
self.connected
.read()
.iter()
.filter(|pid| pid != &self.peer_id.as_ref())
.cloned()
.collect()
}

pub fn connection_ip(&self, pid: &PeerId) -> Option<IpAddr> {
if let Some(Some(ip)) = self.pool.read().get(pid).map(|peer| peer.connection_ip()) {
Some(ip)
} else {
None
}
}

pub fn peer_exist(&self, pid: &PeerId) -> bool {
self.pool.read().contains_key(pid)
}
Expand Down Expand Up @@ -282,14 +317,15 @@ impl Inner {
self.connecting.write().insert(peer_id.clone());
}

pub fn set_connected(&self, peer_id: &PeerId) {
pub fn set_connected(&self, peer_id: &PeerId, ip: Option<IpAddr>) {
// Clean outbound connection
self.connecting.write().remove(peer_id);
self.connected.write().insert(peer_id.clone());

let mut pool = self.pool.write();
if let Some(peer) = pool.get_mut(peer_id) {
peer.update_connect();
peer.set_connection_ip(ip);
}
}

Expand All @@ -301,6 +337,7 @@ impl Inner {
if let Some(peer) = pool.get_mut(peer_id) {
peer.update_disconnect();
peer.update_alive();
peer.set_connection_ip(None);
}
}

Expand All @@ -309,7 +346,8 @@ impl Inner {
}

pub fn connection_count(&self) -> usize {
self.connected.read().len() + self.connecting.read().len()
// -1 to remove ourself
self.connected.read().len() + self.connecting.read().len() - 1
}

pub fn unconnected_peers(&self, max: usize) -> Vec<PeerId> {
Expand Down Expand Up @@ -390,20 +428,6 @@ impl Inner {
}
}

impl Default for Inner {
fn default() -> Self {
Inner {
connected: Default::default(),
connecting: Default::default(),

addr_pid: Default::default(),
user_pid: Default::default(),
pool: Default::default(),
listen: Default::default(),
}
}
}

// TODO: Store our secret key?
#[derive(Debug)]
pub struct PeerManagerConfig {
Expand Down Expand Up @@ -479,11 +503,12 @@ impl PeerManager {
event_rx: UnboundedReceiver<PeerManagerEvent>,
conn_tx: UnboundedSender<ConnectionEvent>,
) -> Self {
let inner = Arc::new(Inner::default());
let peer_id = config.our_id.clone();

let inner = Arc::new(Inner::new(peer_id.clone()));
let waker = Arc::new(AtomicWaker::new());
let heart_beat = HeartBeat::new(Arc::clone(&waker), config.routine_interval);
let persistence = Box::new(NoopPersistence);
let peer_id = config.our_id.clone();

// Register our self
inner.register_self(config.our_id.clone(), config.pubkey.clone());
Expand Down Expand Up @@ -597,6 +622,7 @@ impl PeerManager {
fn attach_peer_session(&mut self, pubkey: PublicKey, session: Session) {
let Session { sid, addr, ty } = session;

let connection_ip = multi_addr_ip(&addr).ok();
let user_addr = Peer::pubkey_to_addr(&pubkey).as_hex();
let pid = pubkey.peer_id();

Expand All @@ -622,7 +648,7 @@ impl PeerManager {
self.inner.add_peer_addr(&pid, addr);
}

self.inner.set_connected(&pid);
self.inner.set_connected(&pid, connection_ip);

self.peer_session.insert(pid.clone(), sid);
self.session_peer.insert(sid, pid);
Expand Down Expand Up @@ -727,6 +753,15 @@ impl PeerManager {
}
}

fn connected_peers_ip(&self) -> Vec<(PeerId, Option<IpAddr>)> {
let connected_pids = self.inner.connected_peers();

connected_pids
.into_iter()
.map(|pid| (pid.clone(), self.inner.connection_ip(&pid)))
.collect()
}

fn unconnected_unknowns(&self, max: usize) -> Vec<&UnknownAddr> {
self.unknown_addrs
.iter()
Expand Down Expand Up @@ -1000,6 +1035,12 @@ impl Future for PeerManager {
self.process_event(event);
}

info!(
"network: {:?}: connected peer_ip(s): {:?}",
self.peer_id,
self.connected_peers_ip()
);

// Check connecting count
let connection_count = self.inner.connection_count();
if connection_count < self.config.max_connections {
Expand Down
24 changes: 21 additions & 3 deletions core/network/src/peer_manager/peer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
collections::HashSet,
default::Default,
net::IpAddr,
sync::Arc,
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
Expand All @@ -20,14 +21,17 @@ pub const VALID_ATTEMPT_INTERVAL: u64 = 4;
// TODO: display next_retry
#[derive(Debug, Clone, Serialize, Deserialize, Display)]
#[display(
fmt = "addrs: {:?}, retry: {}, last_connect: {}, alive: {}",
addr_set,
fmt = "connection_ip: {:?}, retry: {}, last_connect: {}, alive: {}",
connection_ip,
retry_count,
connect_at,
alive
)]
pub(super) struct PeerState {
// Peer listen address set
// Current connection ip
connection_ip: Option<IpAddr>,

// Peer address set (Multiple format, p2p, quic etc)
addr_set: HashSet<Multiaddr>,

#[serde(skip)]
Expand Down Expand Up @@ -67,6 +71,7 @@ pub struct Peer {
impl PeerState {
pub fn new() -> Self {
PeerState {
connection_ip: None,
addr_set: Default::default(),
retry_count: 0,
next_retry: Instant::now(),
Expand All @@ -79,6 +84,11 @@ impl PeerState {

pub fn from_addrs(addrs: Vec<Multiaddr>) -> Self {
let mut state = PeerState::new();

if addrs.is_empty() {
return state;
}

state.addr_set.extend(addrs.into_iter());

state
Expand Down Expand Up @@ -116,6 +126,10 @@ impl Peer {
&self.pubkey
}

pub fn connection_ip(&self) -> Option<IpAddr> {
self.state.connection_ip
}

pub fn user_addr(&self) -> &UserAddress {
&self.user_addr
}
Expand All @@ -140,6 +154,10 @@ impl Peer {
self.state = state
}

pub fn set_connection_ip(&mut self, ip: Option<IpAddr>) {
self.state.connection_ip = ip;
}

pub fn add_addr(&mut self, addr: Multiaddr) {
self.state.addr_set.insert(addr);
}
Expand Down

0 comments on commit 77ed393

Please sign in to comment.