Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

chore(network): adjust default log level output #5

Merged
merged 2 commits into from
Sep 26, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 98 additions & 51 deletions core/network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ use std::{
};

use futures::{
channel::mpsc::{UnboundedReceiver, UnboundedSender},
channel::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
},
future::TryFutureExt,
pin_mut,
stream::Stream,
task::AtomicWaker,
};
use log::{debug, error, info};
use log::{debug, error, info, warn};
use parking_lot::RwLock;
use protocol::types::UserAddress;
use rand::seq::IteratorRandom;
Expand Down Expand Up @@ -114,12 +117,18 @@ impl Inner {
user_pid.get(user).and_then(|pid| pool.get(pid).cloned())
}

pub fn pid_user_addr(&self, pid: &PeerId) -> Option<UserAddress> {
let pool = self.pool.read();

pool.get(pid).map(|peer| peer.user_addr().clone())
}

pub fn add_peer(&self, peer: Peer) {
let mut pool = self.pool.write();
let mut addr_pid = self.addr_pid.write();
let mut user_pid = self.user_pid.write();

user_pid.insert(Peer::pubkey_to_addr(peer.pubkey()), peer.id().clone());
user_pid.insert(peer.user_addr().clone(), peer.id().clone());

for addr in peer.addrs().into_iter() {
addr_pid.insert(addr.clone(), peer.id().clone());
Expand Down Expand Up @@ -487,32 +496,117 @@ impl PeerManager {
condidates.into_iter().choose_multiple(&mut rng, max)
}

fn route_multi_users_message(
&mut self,
users_msg: MultiUsersMessage,
miss_tx: oneshot::Sender<Vec<UserAddress>>,
) {
let mut no_peers = vec![];
let mut connected = vec![];
let mut unconnected_peers = vec![];

for user_addr in users_msg.user_addrs.into_iter() {
if let Some(peer) = self.inner.user_peer(&user_addr) {
if let Some(sid) = self.peer_session.get(&peer.id()) {
connected.push(*sid);
} else {
unconnected_peers.push(peer);
}
} else {
no_peers.push(user_addr);
}
}

warn!("network: peer manager: no peers {:?}", no_peers);

// Send message to connected users
let tar = TargetSession::Multi(connected);
let MultiUsersMessage { msg, pri, .. } = users_msg;
let send_msg = ConnectionEvent::SendMsg { tar, msg, pri };

if self.conn_tx.unbounded_send(send_msg).is_err() {
debug!("network: connection service exit");
}

// Try connect to unconnected peers
let unconnected_addrs = unconnected_peers
.iter()
.map(Peer::owned_addrs)
.flatten()
.collect::<Vec<_>>();

self.connect_peers(unconnected_addrs);

// Report missed user addresses
let mut missed_accounts = unconnected_peers
.iter()
.map(Peer::user_addr)
.cloned()
.collect::<Vec<_>>();

missed_accounts.extend(no_peers);

if miss_tx.send(missed_accounts).is_err() {
debug!("network: peer manager route multi accounts message dropped")
}
}

fn process_event(&mut self, event: PeerManagerEvent) {
match event {
PeerManagerEvent::AddPeer { pubkey, addr, .. } => {
let user_addr = Peer::pubkey_to_addr(&pubkey).as_hex();

info!(
"network: connected: user addr: {}, multiaddr: {}",
user_addr, addr
);

self.add_peer(pubkey, addr);
}
PeerManagerEvent::UpdatePeerSession { pid, sid } => {
let user_addr = self.inner.pid_user_addr(&pid);

if let Some(sid) = sid {
info!(
"network: connected: user addr {:?} session id {}",
user_addr, sid
);

self.attach_peer_session(pid, sid);
} else {
info!("network: disconnect: user addr {:?}", user_addr);

self.detach_peer_session(&pid);
}
}
PeerManagerEvent::PeerAlive { pid } => {
let user_addr = self.inner.pid_user_addr(&pid);
info!("network: peer alive, user addr {:?}", user_addr);

self.inner.reset_retry(&pid);
}
PeerManagerEvent::RemovePeer { pid, .. } => {
let user_addr = self.inner.pid_user_addr(&pid);
info!("network: remove, user addr {:?}", user_addr);

self.remove_peer(&pid);
}
PeerManagerEvent::RemovePeerBySession { sid, .. } => {
if let Some(pid) = self.session_peer.get(&sid).cloned() {
let user_addr = self.inner.pid_user_addr(&pid);
info!("network: remove, user addr {:?}", user_addr);

self.remove_peer(&pid);
} else {
info!("network: disconnect session {}", sid);

self.detach_session_id(sid);
}
}
PeerManagerEvent::RetryPeerLater { pid, .. } => {
let user_addr = self.inner.pid_user_addr(&pid);
info!("network: retry user addr {:?} later", user_addr);

self.inner.increase_retry_count(&pid);

// TODO: reduce score base on kind, may be ban this peer for a
Expand Down Expand Up @@ -579,54 +673,7 @@ impl PeerManager {
self.inner.remove_peer_addr(&self.config.our_id, &addr);
}
PeerManagerEvent::RouteMultiUsersMessage { users_msg, miss_tx } => {
let mut no_peers = vec![];
let mut connected = vec![];
let mut unconnected_peers = vec![];

for user_addr in users_msg.user_addrs.into_iter() {
if let Some(peer) = self.inner.user_peer(&user_addr) {
if let Some(sid) = self.peer_session.get(&peer.id()) {
connected.push(*sid);
} else {
unconnected_peers.push(peer);
}
} else {
no_peers.push(user_addr);
}
}

debug!("network: peer manager: no peers {:?}", no_peers);

// Send message to connected users
let tar = TargetSession::Multi(connected);
let MultiUsersMessage { msg, pri, .. } = users_msg;
let send_msg = ConnectionEvent::SendMsg { tar, msg, pri };

if self.conn_tx.unbounded_send(send_msg).is_err() {
debug!("network: connection service exit");
}

// Try connect to unconnected peers
let unconnected_addrs = unconnected_peers
.iter()
.map(Peer::owned_addrs)
.flatten()
.collect::<Vec<_>>();

self.connect_peers(unconnected_addrs);

// Report missed user addresses
let mut missed_accounts = unconnected_peers
.iter()
.map(Peer::user_addr)
.cloned()
.collect::<Vec<_>>();

missed_accounts.extend(no_peers);

if miss_tx.send(missed_accounts).is_err() {
debug!("network: peer manager route multi accounts message dropped")
}
self.route_multi_users_message(users_msg, miss_tx);
}
}
}
Expand Down