chore(network): adjust default log level output (#5)
* chore(network): adjust default log level output

* fix(network): clippy warning
zeroqn authored Sep 26, 2019
1 parent e1188f9 commit 4fc9997
Showing 1 changed file with 98 additions and 51 deletions.
core/network/src/peer_manager/mod.rs
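For context on the diff below: with the `log` crate, records below the active filter level are simply discarded, so promoting the "no peers" report from `debug!` to `warn!` (and adding `info!` lines for connect/disconnect) changes what a default configuration actually prints. A minimal standalone sketch of that filtering behavior; the `env_logger` setup and the `Info` default here are assumptions for illustration, not this repo's logger configuration:

```rust
// Sketch only: demonstrates log-level filtering, not this repo's setup.
use log::{debug, info, warn, LevelFilter};

fn main() {
    // Hypothetical logger with an Info default filter.
    env_logger::Builder::new()
        .filter_level(LevelFilter::Info)
        .init();

    debug!("network: peer manager: no peers [..]"); // dropped below Info
    warn!("network: peer manager: no peers [..]"); // printed
    info!("network: connected: user addr ..., multiaddr ..."); // printed
}
```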
@@ -21,13 +21,16 @@ use std::{
 };
 
 use futures::{
-    channel::mpsc::{UnboundedReceiver, UnboundedSender},
+    channel::{
+        mpsc::{UnboundedReceiver, UnboundedSender},
+        oneshot,
+    },
     future::TryFutureExt,
     pin_mut,
     stream::Stream,
     task::AtomicWaker,
 };
-use log::{debug, error, info};
+use log::{debug, error, info, warn};
 use parking_lot::RwLock;
 use protocol::types::UserAddress;
 use rand::seq::IteratorRandom;
@@ -114,12 +117,18 @@ impl Inner {
         user_pid.get(user).and_then(|pid| pool.get(pid).cloned())
     }
 
+    pub fn pid_user_addr(&self, pid: &PeerId) -> Option<UserAddress> {
+        let pool = self.pool.read();
+
+        pool.get(pid).map(|peer| peer.user_addr().clone())
+    }
+
     pub fn add_peer(&self, peer: Peer) {
         let mut pool = self.pool.write();
         let mut addr_pid = self.addr_pid.write();
         let mut user_pid = self.user_pid.write();
 
-        user_pid.insert(Peer::pubkey_to_addr(peer.pubkey()), peer.id().clone());
+        user_pid.insert(peer.user_addr().clone(), peer.id().clone());
 
         for addr in peer.addrs().into_iter() {
             addr_pid.insert(addr.clone(), peer.id().clone());
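`Inner::add_peer` above takes the write side of several `parking_lot` locks, while the new `pid_user_addr` helper only needs a shared read lock. A self-contained sketch of that read/write pattern with toy types (not the repo's actual maps):

```rust
// Sketch of the parking_lot::RwLock pattern used by Inner: many concurrent
// readers, exclusive writers, and no lock poisoning to unwrap.
use parking_lot::RwLock;
use std::collections::HashMap;

fn main() {
    let pool: RwLock<HashMap<u64, String>> = RwLock::new(HashMap::new());

    {
        // Write lock: exclusive access while inserting, as in add_peer.
        let mut guard = pool.write();
        guard.insert(1, "peer-one".to_string());
    } // guard dropped here, releasing the lock

    // Read lock: shared access for lookups, as in pid_user_addr.
    let guard = pool.read();
    println!("{:?}", guard.get(&1));
}
```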
@@ -487,32 +496,117 @@ impl PeerManager {
         condidates.into_iter().choose_multiple(&mut rng, max)
     }
 
+    fn route_multi_users_message(
+        &mut self,
+        users_msg: MultiUsersMessage,
+        miss_tx: oneshot::Sender<Vec<UserAddress>>,
+    ) {
+        let mut no_peers = vec![];
+        let mut connected = vec![];
+        let mut unconnected_peers = vec![];
+
+        for user_addr in users_msg.user_addrs.into_iter() {
+            if let Some(peer) = self.inner.user_peer(&user_addr) {
+                if let Some(sid) = self.peer_session.get(&peer.id()) {
+                    connected.push(*sid);
+                } else {
+                    unconnected_peers.push(peer);
+                }
+            } else {
+                no_peers.push(user_addr);
+            }
+        }
+
+        warn!("network: peer manager: no peers {:?}", no_peers);
+
+        // Send message to connected users
+        let tar = TargetSession::Multi(connected);
+        let MultiUsersMessage { msg, pri, .. } = users_msg;
+        let send_msg = ConnectionEvent::SendMsg { tar, msg, pri };
+
+        if self.conn_tx.unbounded_send(send_msg).is_err() {
+            debug!("network: connection service exit");
+        }
+
+        // Try connect to unconnected peers
+        let unconnected_addrs = unconnected_peers
+            .iter()
+            .map(Peer::owned_addrs)
+            .flatten()
+            .collect::<Vec<_>>();
+
+        self.connect_peers(unconnected_addrs);
+
+        // Report missed user addresses
+        let mut missed_accounts = unconnected_peers
+            .iter()
+            .map(Peer::user_addr)
+            .cloned()
+            .collect::<Vec<_>>();
+
+        missed_accounts.extend(no_peers);
+
+        if miss_tx.send(missed_accounts).is_err() {
+            debug!("network: peer manager route multi accounts message dropped")
+        }
+    }
+
     fn process_event(&mut self, event: PeerManagerEvent) {
         match event {
             PeerManagerEvent::AddPeer { pubkey, addr, .. } => {
+                let user_addr = Peer::pubkey_to_addr(&pubkey).as_hex();
+
+                info!(
+                    "network: connected: user addr: {}, multiaddr: {}",
+                    user_addr, addr
+                );
+
                 self.add_peer(pubkey, addr);
             }
             PeerManagerEvent::UpdatePeerSession { pid, sid } => {
+                let user_addr = self.inner.pid_user_addr(&pid);
+
                 if let Some(sid) = sid {
+                    info!(
+                        "network: connected: user addr {:?} session id {}",
+                        user_addr, sid
+                    );
+
                     self.attach_peer_session(pid, sid);
                 } else {
+                    info!("network: disconnect: user addr {:?}", user_addr);
+
                     self.detach_peer_session(&pid);
                 }
             }
             PeerManagerEvent::PeerAlive { pid } => {
+                let user_addr = self.inner.pid_user_addr(&pid);
+                info!("network: peer alive, user addr {:?}", user_addr);
+
                 self.inner.reset_retry(&pid);
             }
             PeerManagerEvent::RemovePeer { pid, .. } => {
+                let user_addr = self.inner.pid_user_addr(&pid);
+                info!("network: remove, user addr {:?}", user_addr);
+
                 self.remove_peer(&pid);
             }
             PeerManagerEvent::RemovePeerBySession { sid, .. } => {
                 if let Some(pid) = self.session_peer.get(&sid).cloned() {
+                    let user_addr = self.inner.pid_user_addr(&pid);
+                    info!("network: remove, user addr {:?}", user_addr);
+
                     self.remove_peer(&pid);
                 } else {
+                    info!("network: disconnect session {}", sid);
+
                     self.detach_session_id(sid);
                 }
             }
             PeerManagerEvent::RetryPeerLater { pid, .. } => {
+                let user_addr = self.inner.pid_user_addr(&pid);
+                info!("network: retry user addr {:?} later", user_addr);
+
                 self.inner.increase_retry_count(&pid);
 
                 // TODO: reduce score base on kind, may be ban this peer for a
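`route_multi_users_message` above reports unreachable accounts back to the caller through a `futures` oneshot channel, and deliberately only logs when the receiver has gone away. A standalone sketch of that miss-reporting pattern, with `UserAddress` stubbed as a `String` and a made-up address value:

```rust
// Sketch of the oneshot miss-reporting pattern; UserAddress is stubbed.
use futures::channel::oneshot;
use futures::executor::block_on;

type UserAddress = String;

fn route(miss_tx: oneshot::Sender<Vec<UserAddress>>) {
    let missed = vec!["0xdeadbeef".to_string()]; // hypothetical address
    // The receiver may already be dropped; as in the diff, just log it.
    if miss_tx.send(missed).is_err() {
        println!("network: peer manager route multi accounts message dropped");
    }
}

fn main() {
    let (miss_tx, miss_rx) = oneshot::channel();
    route(miss_tx);
    let missed = block_on(miss_rx).expect("sender dropped");
    println!("missed accounts: {:?}", missed);
}
```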
@@ -579,54 +673,7 @@ impl PeerManager {
                 self.inner.remove_peer_addr(&self.config.our_id, &addr);
             }
             PeerManagerEvent::RouteMultiUsersMessage { users_msg, miss_tx } => {
-                let mut no_peers = vec![];
-                let mut connected = vec![];
-                let mut unconnected_peers = vec![];
-
-                for user_addr in users_msg.user_addrs.into_iter() {
-                    if let Some(peer) = self.inner.user_peer(&user_addr) {
-                        if let Some(sid) = self.peer_session.get(&peer.id()) {
-                            connected.push(*sid);
-                        } else {
-                            unconnected_peers.push(peer);
-                        }
-                    } else {
-                        no_peers.push(user_addr);
-                    }
-                }
-
-                debug!("network: peer manager: no peers {:?}", no_peers);
-
-                // Send message to connected users
-                let tar = TargetSession::Multi(connected);
-                let MultiUsersMessage { msg, pri, .. } = users_msg;
-                let send_msg = ConnectionEvent::SendMsg { tar, msg, pri };
-
-                if self.conn_tx.unbounded_send(send_msg).is_err() {
-                    debug!("network: connection service exit");
-                }
-
-                // Try connect to unconnected peers
-                let unconnected_addrs = unconnected_peers
-                    .iter()
-                    .map(Peer::owned_addrs)
-                    .flatten()
-                    .collect::<Vec<_>>();
-
-                self.connect_peers(unconnected_addrs);
-
-                // Report missed user addresses
-                let mut missed_accounts = unconnected_peers
-                    .iter()
-                    .map(Peer::user_addr)
-                    .cloned()
-                    .collect::<Vec<_>>();
-
-                missed_accounts.extend(no_peers);
-
-                if miss_tx.send(missed_accounts).is_err() {
-                    debug!("network: peer manager route multi accounts message dropped")
-                }
+                self.route_multi_users_message(users_msg, miss_tx);
             }
         }
     }
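`process_event` is driven by events arriving on an unbounded `futures` channel, the same mechanism `conn_tx` uses toward the connection service. A simplified, self-contained sketch of that event-loop pattern; the enum and its variants are placeholders, not the repo's `PeerManagerEvent`:

```rust
// Sketch of the unbounded-channel event loop; Event is a placeholder type.
use futures::channel::mpsc::unbounded;
use futures::executor::block_on;
use futures::StreamExt;

#[derive(Debug)]
enum Event {
    PeerAlive { pid: u64 },
    RemovePeer { pid: u64 },
}

fn main() {
    let (tx, mut rx) = unbounded();
    tx.unbounded_send(Event::PeerAlive { pid: 1 }).unwrap();
    tx.unbounded_send(Event::RemovePeer { pid: 1 }).unwrap();
    drop(tx); // close the channel so the loop below terminates

    block_on(async {
        while let Some(event) = rx.next().await {
            match event {
                Event::PeerAlive { pid } => println!("network: peer alive, pid {}", pid),
                Event::RemovePeer { pid } => println!("network: remove, pid {}", pid),
            }
        }
    });
}
```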