Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow customizing the delay before closing a Kademlia connection #1477

Merged
merged 7 commits into from
Mar 19, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mod test;

use crate::K_VALUE;
use crate::addresses::Addresses;
use crate::handler::{KademliaHandler, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn};
use crate::handler::{KademliaHandler, KademliaHandlerProto, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn};
use crate::jobs::*;
use crate::kbucket::{self, KBucketsTable, NodeStatus};
use crate::protocol::{KadConnectionType, KadPeer};
Expand Down Expand Up @@ -77,6 +77,9 @@ pub struct Kademlia<TStore> {
/// The TTL of provider records.
provider_record_ttl: Option<Duration>,

/// How long to keep connections alive when they're idle.
idle_keep_alive: Duration,
tomaka marked this conversation as resolved.
Show resolved Hide resolved

/// Queued events to return when the behaviour is being polled.
queued_events: VecDeque<NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaEvent>>,

Expand All @@ -97,6 +100,7 @@ pub struct KademliaConfig {
record_publication_interval: Option<Duration>,
provider_record_ttl: Option<Duration>,
provider_publication_interval: Option<Duration>,
idle_keep_alive: Duration,
}

impl Default for KademliaConfig {
Expand All @@ -110,6 +114,7 @@ impl Default for KademliaConfig {
record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)),
provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
idle_keep_alive: Duration::from_secs(10),
}
}
}
Expand Down Expand Up @@ -217,6 +222,12 @@ impl KademliaConfig {
self.provider_publication_interval = interval;
self
}

/// Sets the amount of time to keep connections alive when they're idle.
pub fn set_idle_keep_alive(&mut self, duration: Duration) -> &mut Self {
tomaka marked this conversation as resolved.
Show resolved Hide resolved
self.idle_keep_alive = duration;
self
}
}

impl<TStore> Kademlia<TStore>
Expand Down Expand Up @@ -264,6 +275,7 @@ where
put_record_job,
record_ttl: config.record_ttl,
provider_record_ttl: config.provider_record_ttl,
idle_keep_alive: config.idle_keep_alive,
}
}

Expand Down Expand Up @@ -1035,11 +1047,12 @@ where
for<'a> TStore: RecordStore<'a>,
TStore: Send + 'static,
{
type ProtocolsHandler = KademliaHandler<QueryId>;
type ProtocolsHandler = KademliaHandlerProto<QueryId>;
type OutEvent = KademliaEvent;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
let mut handler = KademliaHandler::dial_and_listen();
let mut handler = KademliaHandlerProto::dial_and_listen()
.with_idle_keep_alive(self.idle_keep_alive);
if let Some(name) = self.protocol_name_override.as_ref() {
handler = handler.with_protocol_name(name.clone());
}
Expand Down Expand Up @@ -1322,7 +1335,7 @@ where

fn poll(&mut self, cx: &mut Context, parameters: &mut impl PollParameters) -> Poll<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
<KademliaHandler<QueryId> as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
Expand Down
85 changes: 68 additions & 17 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,36 @@ use libp2p_swarm::{
NegotiatedSubstream,
KeepAlive,
SubstreamProtocol,
IntoProtocolsHandler,
ProtocolsHandler,
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr
};
use libp2p_core::{
PeerId,
ConnectedPoint,
either::EitherOutput,
upgrade::{self, InboundUpgrade, OutboundUpgrade}
};
use log::trace;
use std::{borrow::Cow, error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration};
use std::{borrow::Cow, error, fmt, io, marker::PhantomData, pin::Pin, task::{Context, Poll}, time::Duration};
use wasm_timer::Instant;

/// Prototype for a [`KademliaHandler`].
pub struct KademliaHandlerProto<TUserData> {
/// Configuration for the Kademlia protocol.
config: KademliaProtocolConfig,

/// If false, we always refuse incoming Kademlia substreams.
allow_listening: bool,

/// How long to keep connections alive when they're idle.
idle_keep_alive: Duration,

/// Marker to keep the generic pinned.
marker: PhantomData<TUserData>,
}

/// Protocol handler that handles Kademlia communications with the remote.
///
/// The handler will automatically open a Kademlia substream with the remote for each request we
Expand All @@ -61,6 +79,9 @@ pub struct KademliaHandler<TUserData> {

/// Until when to keep the connection alive.
keep_alive: KeepAlive,

/// How long to keep connections alive when they're idle.
idle_keep_alive: Duration,
}

/// State of an active substream, opened either by us or by the remote.
Expand Down Expand Up @@ -368,28 +389,27 @@ pub struct KademliaRequestId {
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct UniqueConnecId(u64);

impl<TUserData> KademliaHandler<TUserData> {
/// Create a `KademliaHandler` that only allows sending messages to the remote but denying
/// incoming connections.
impl<TUserData> KademliaHandlerProto<TUserData> {
/// Create a `KademliaHandlerProto` that only allows sending messages to the remote but
/// denying incoming connections.
pub fn dial_only() -> Self {
KademliaHandler::with_allow_listening(false)
KademliaHandlerProto::with_allow_listening(false)
}

/// Create a `KademliaHandler` that only allows sending messages but also receive incoming
/// requests.
/// Create a `KademliaHandlerProto` that only allows sending messages but also receive
/// incoming requests.
///
/// The `Default` trait implementation wraps around this function.
pub fn dial_and_listen() -> Self {
KademliaHandler::with_allow_listening(true)
KademliaHandlerProto::with_allow_listening(true)
}

fn with_allow_listening(allow_listening: bool) -> Self {
KademliaHandler {
KademliaHandlerProto {
config: Default::default(),
allow_listening,
next_connec_unique_id: UniqueConnecId(0),
substreams: Vec::new(),
keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(10)),
idle_keep_alive: Duration::from_secs(10),
marker: PhantomData,
}
}

Expand All @@ -399,12 +419,43 @@ impl<TUserData> KademliaHandler<TUserData> {
self.config = self.config.with_protocol_name(name);
self
}

/// Modifies the amount of time to keep connections alive when they're idle.
pub fn with_idle_keep_alive(mut self, duration: Duration) -> Self {
self.idle_keep_alive = duration;
self
}
}

impl<TUserData> Default for KademliaHandler<TUserData> {
#[inline]
impl<TUserData> Default for KademliaHandlerProto<TUserData> {
fn default() -> Self {
KademliaHandler::dial_and_listen()
KademliaHandlerProto::dial_and_listen()
}
}

impl<TUserData> IntoProtocolsHandler for KademliaHandlerProto<TUserData>
where
TUserData: Clone + Send + 'static,
{
type Handler = KademliaHandler<TUserData>;

fn into_handler(self, _: &PeerId, _: &ConnectedPoint) -> Self::Handler {
KademliaHandler {
config: self.config,
allow_listening: self.allow_listening,
next_connec_unique_id: UniqueConnecId(0),
substreams: Vec::new(),
keep_alive: KeepAlive::Until(Instant::now() + self.idle_keep_alive),
idle_keep_alive: self.idle_keep_alive,
}
}

fn inbound_protocol(&self) -> <Self::Handler as ProtocolsHandler>::InboundProtocol {
if self.allow_listening {
upgrade::EitherUpgrade::A(self.config.clone())
} else {
upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade)
}
}
}

Expand Down Expand Up @@ -642,7 +693,7 @@ where
}
(None, Some(event), _) => {
if self.substreams.is_empty() {
self.keep_alive = KeepAlive::Until(Instant::now() + Duration::from_secs(10));
self.keep_alive = KeepAlive::Until(Instant::now() + self.idle_keep_alive);
}
return Poll::Ready(event);
}
Expand All @@ -663,7 +714,7 @@ where

if self.substreams.is_empty() {
// We destroyed all substreams in this function.
self.keep_alive = KeepAlive::Until(Instant::now() + Duration::from_secs(10));
self.keep_alive = KeepAlive::Until(Instant::now() + self.idle_keep_alive);
} else {
self.keep_alive = KeepAlive::Yes;
}
Expand Down