Skip to content

Commit

Permalink
fix: fix network kad discovery issue
Browse files Browse the repository at this point in the history
  • Loading branch information
jjyr committed Nov 22, 2018
1 parent ae6b14d commit bc99452
Show file tree
Hide file tree
Showing 8 changed files with 450 additions and 245 deletions.
7 changes: 5 additions & 2 deletions network/src/ckb_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ where
Err(err) => {
return {
error!(target: "network", "failed to upgrade ckb_protocol");
future::err(IoError::new(IoErrorKind::Other, err))
future::err(IoError::new(
IoErrorKind::Other,
format!("faild to upgrade ckb_protocol, error: {}", err),
))
}
}
};
Expand Down Expand Up @@ -214,7 +217,7 @@ impl<T> CKBProtocol<T> {
}
}}
Err(e) => {
future::Either::A(future::err(e))
future::Either::A(future::err(IoError::new(IoErrorKind::Other, format!("error when receive data: {}", e))))
}
}
}
Expand Down
1 change: 1 addition & 0 deletions network/src/ckb_protocol_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ impl CKBProtocolContext for DefaultCKBProtocolContext {
}
// disconnect from peer
fn disconnect(&self, peer_index: PeerIndex) {
debug!(target: "network", "disconnect peer {}", peer_index);
let mut peers_registry = self.network.peers_registry().write();
if let Some(peer_id) = peers_registry
.get_peer_id(peer_index)
Expand Down
594 changes: 383 additions & 211 deletions network/src/discovery_service.rs

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions network/src/memory_peer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,12 @@ impl PeerStore for MemoryPeerStore {
}

fn bootnodes<'a>(&'a self) -> Box<Iterator<Item = (&'a PeerId, &'a Multiaddr)> + 'a> {
let iter = self
let mut bootnodes = self
.peers_to_attempt()
.chain(self.bootnodes.iter().map(|(peer_id, addr)| (peer_id, addr)));
.chain(self.bootnodes.iter().map(|(peer_id, addr)| (peer_id, addr)))
.collect::<Vec<_>>();
bootnodes.dedup();
let iter = bootnodes.into_iter();
Box::new(iter) as Box<_>
}
fn peer_addrs<'a>(
Expand Down
71 changes: 44 additions & 27 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use ckb_protocol_handler::CKBProtocolHandler;
use ckb_protocol_handler::DefaultCKBProtocolContext;
use ckb_service::CKBService;
use ckb_util::{Mutex, RwLock};
use discovery_service::DiscoveryService;
use discovery_service::{DiscoveryQueryService, DiscoveryService, KadManage};
use futures::future::{self, select_all, Future};
use futures::sync::mpsc::UnboundedSender;
use futures::sync::oneshot;
Expand Down Expand Up @@ -152,7 +152,6 @@ impl Network {
Ok(_) => {
let mut peer = peers_registry.get_mut(&peer_id).unwrap();
if let Some(addresses) = addresses {
// consider store all addresses in peerstore?
peer.append_addresses(addresses.clone());
}
if let Some(protocol_connec) = peer
Expand Down Expand Up @@ -429,30 +428,37 @@ impl Network {
),
));
}
let kad_upgrade = kad::KadConnecConfig::new();
let kad_manage = Arc::new(Mutex::new(KadManage::new(
Arc::clone(&network),
kad_upgrade.clone(),
)));
let kad_system = {
let peer_store = network.peer_store().read();
let known_initial_peers: Box<Iterator<Item = PeerId>> = Box::new(
peer_store
.bootnodes()
.map(|(peer_id, _)| peer_id.to_owned())
.take(100)
.collect::<Vec<_>>()
.into_iter(),
) as Box<_>;
Arc::new(kad::KadSystem::without_init(kad::KadSystemConfig {
parallelism: 1,
local_peer_id: local_peer_id.clone(),
kbuckets_timeout: Duration::from_secs(KBUCKETS_TIMEOUT),
request_timeout: config.discovery_timeout,
known_initial_peers,
}))
};

let ping_service = Arc::new(PingService::new(config.ping_interval, config.ping_timeout));
let discovery_service = Arc::new(DiscoveryService::new(
config.discovery_timeout,
config.discovery_response_count,
config.discovery_interval,
{
let peer_store = network.peer_store().read();
let known_initial_peers: Box<Iterator<Item = PeerId>> = Box::new(
peer_store
.bootnodes()
.map(|(peer_id, _)| peer_id.to_owned())
.take(100)
.collect::<Vec<_>>()
.into_iter(),
) as Box<_>;
kad::KadSystemConfig {
parallelism: 5,
local_peer_id: local_peer_id.clone(),
kbuckets_timeout: Duration::from_secs(KBUCKETS_TIMEOUT),
request_timeout: config.discovery_timeout,
known_initial_peers,
}
},
Arc::clone(&kad_manage),
Arc::clone(&kad_system),
));
let identify_service = Arc::new(IdentifyService {
client_version,
Expand All @@ -462,7 +468,7 @@ impl Network {
});

let ckb_protocol_service = Arc::new(CKBService {
kad_system: Arc::clone(&discovery_service.kad_system),
kad_system: Arc::clone(&kad_system),
});
let timer_service = Arc::new(TimerService {
timer_registry: Arc::clone(&timer_registry),
Expand All @@ -476,7 +482,7 @@ impl Network {
let transport = basic_transport.clone();
transport.and_then({
let network = Arc::clone(&network);
let kad_upgrade = discovery_service.kad_upgrade.clone();
let kad_upgrade = kad_upgrade.clone();
move |out, endpoint, fut| {
let peer_id = Arc::new(out.peer_id);
let original_addr = out.original_addr;
Expand Down Expand Up @@ -587,19 +593,30 @@ impl Network {
}
}

let discovery_query_service = DiscoveryQueryService::new(
Arc::clone(&network),
swarm_controller.clone(),
basic_transport.clone(),
config.discovery_interval,
Arc::clone(&kad_system),
Arc::clone(&kad_manage),
kad_upgrade.clone(),
);

// prepare services futures
let futures: Vec<Box<Future<Item = (), Error = IoError> + Send>> = vec![
Box::new(swarm_events.for_each(|_| Ok(()))),
Box::new(
discovery_query_service
.into_future()
.map(|_| ())
.map_err(|(err, _)| err),
) as Box<Future<Item = (), Error = IoError> + Send>,
ping_service.start_protocol(
Arc::clone(&network),
swarm_controller.clone(),
basic_transport.clone(),
),
// discovery_service.start_protocol(
// Arc::clone(&network),
// swarm_controller.clone(),
// basic_transport.clone(),
// ),
identify_service.start_protocol(
Arc::clone(&network),
swarm_controller.clone(),
Expand Down
2 changes: 1 addition & 1 deletion network/src/network_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl Default for NetworkConfig {
ping_timeout: Duration::from_secs(30),
discovery_timeout: Duration::from_secs(20),
discovery_response_count: 20,
discovery_interval: Duration::from_secs(30),
discovery_interval: Duration::from_secs(15),
identify_timeout: Duration::from_secs(30),
identify_interval: Duration::from_secs(15),
outgoing_timeout: Duration::from_secs(30),
Expand Down
8 changes: 7 additions & 1 deletion network/src/peers_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ impl PeersRegistry {

// registry a new peer
pub fn new_peer(&mut self, peer_id: PeerId, endpoint: Endpoint) -> Result<(), Error> {
if let Some(_) = self.peer_connections.get(&peer_id) {
return Ok(());
}
let is_reserved = self.peer_store.read().is_reserved(&peer_id);

if !is_reserved {
Expand Down Expand Up @@ -224,7 +227,8 @@ impl PeersRegistry {
}
}
let peer = PeerConnection::new(endpoint);
self.add_peer(peer_id, peer);
let peer_index = self.add_peer(peer_id.clone(), peer);
debug!(target: "network", "allocate peer_index {} to peer {:?}", peer_index,peer_id);
Ok(())
}

Expand Down Expand Up @@ -288,10 +292,12 @@ impl PeersRegistry {

#[inline]
pub fn drop_all(&mut self) {
debug!(target: "network", "drop_all");
self.peer_connections = Default::default();
}

pub(crate) fn ban_peer(&mut self, peer_id: PeerId, timeout: Duration) {
debug!(target: "network", "ban_peer: {:?}", peer_id);
self.drop_peer(&peer_id);
self.deny_list.ban_peer(peer_id, timeout);
}
Expand Down
5 changes: 4 additions & 1 deletion network/src/ping_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ impl<T: Send> ProtocolService<T> for PingService {
let peer_id = peer_id.clone();
move |mut pinger| {
pinger.ping().map(|_| peer_id).map_err(|err| {
IoError::new(IoErrorKind::Other, err)
IoError::new(
IoErrorKind::Other,
format!("pinger error {}", err),
)
})
}
});
Expand Down

0 comments on commit bc99452

Please sign in to comment.