Skip to content

Commit

Permalink
feat(kad): implement automatic client mode
Browse files Browse the repository at this point in the history
Currently, the kademlia behaviour can only learn that the remote node supports kademlia on a particular connection if we successfully negotiate a stream to them.

Using the newly introduced abstractions from #3651, we don't have to attempt to establish a stream to the remote to learn whether they support kademlia on a connection but we can directly learn it from the `ConnectionEvent::RemoteProtocolsChange` event. This happens directly once a connection is established which should overall benefit the DHT.

Clients do not advertise the kademlia protocol and thus we will immediately learn that a given connection is not suitable for kadmelia requests. We may receive inbound messages from it but this does not affect the routing table.

Resolves: #2032.

Pull-Request: #3877.
  • Loading branch information
thomaseizinger authored May 31, 2023
1 parent 7f86544 commit 92c8cc4
Show file tree
Hide file tree
Showing 8 changed files with 490 additions and 103 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@
- Remove deprecated public modules `handler`, `protocol` and `kbucket`.
See [PR 3896].

- Automatically configure client/server mode based on external addresses.
If we have or learn about an external address of our node, we operate in server-mode and thus allow inbound requests.
By default, a node is in client-mode and only allows outbound requests.
See [PR 3877].

[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3877]: https://github.com/libp2p/rust-libp2p/pull/3877
[PR 3896]: https://github.com/libp2p/rust-libp2p/pull/3896

## 0.43.3
Expand Down
4 changes: 4 additions & 0 deletions protocols/kad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@ serde = { version = "1.0", optional = true, features = ["derive"] }
thiserror = "1"

[dev-dependencies]
async-std = { version = "1.12.0", features = ["attributes"] }
env_logger = "0.10.0"
futures-timer = "3.0"
libp2p-identify = { path = "../identify" }
libp2p-noise = { workspace = true }
libp2p-swarm = { path = "../../swarm", features = ["macros"] }
libp2p-swarm-test = { path = "../../swarm-test" }
libp2p-yamux = { workspace = true }
quickcheck = { workspace = true }

Expand Down
151 changes: 125 additions & 26 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@
mod test;

use crate::addresses::Addresses;
use crate::handler::{
KademliaHandler, KademliaHandlerConfig, KademliaHandlerEvent, KademliaHandlerIn,
KademliaRequestId,
};
use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn, KademliaRequestId};
use crate::jobs::*;
use crate::kbucket::{self, Distance, KBucketsTable, NodeStatus};
use crate::protocol::{KadConnectionType, KadPeer, KademliaProtocolConfig};
Expand All @@ -52,7 +49,7 @@ use libp2p_swarm::{
};
use log::{debug, info, warn};
use smallvec::SmallVec;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::fmt;
use std::num::NonZeroUsize;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -109,11 +106,15 @@ pub struct Kademlia<TStore> {

external_addresses: ExternalAddresses,

connections: HashMap<ConnectionId, PeerId>,

/// See [`KademliaConfig::caching`].
caching: KademliaCaching,

local_peer_id: PeerId,

mode: Mode,

/// The record storage.
store: TStore,
}
Expand Down Expand Up @@ -453,6 +454,8 @@ where
connection_idle_timeout: config.connection_idle_timeout,
external_addresses: Default::default(),
local_peer_id: id,
connections: Default::default(),
mode: Mode::Client,
}
}

Expand Down Expand Up @@ -1937,9 +1940,12 @@ where
ConnectionClosed {
peer_id,
remaining_established,
connection_id,
..
}: ConnectionClosed<<Self as NetworkBehaviour>::ConnectionHandler>,
) {
self.connections.remove(&connection_id);

if remaining_established == 0 {
for query in self.queries.iter_mut() {
query.on_failure(&peer_id);
Expand All @@ -1964,43 +1970,45 @@ where

fn handle_established_inbound_connection(
&mut self,
_connection_id: ConnectionId,
connection_id: ConnectionId,
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
let connected_point = ConnectedPoint::Listener {
local_addr: local_addr.clone(),
send_back_addr: remote_addr.clone(),
};
self.connections.insert(connection_id, peer);

Ok(KademliaHandler::new(
KademliaHandlerConfig {
protocol_config: self.protocol_config.clone(),
allow_listening: true,
idle_timeout: self.connection_idle_timeout,
},
ConnectedPoint::Listener {
local_addr: local_addr.clone(),
send_back_addr: remote_addr.clone(),
},
self.protocol_config.clone(),
self.connection_idle_timeout,
connected_point,
peer,
self.mode,
))
}

fn handle_established_outbound_connection(
&mut self,
_connection_id: ConnectionId,
connection_id: ConnectionId,
peer: PeerId,
addr: &Multiaddr,
role_override: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
let connected_point = ConnectedPoint::Dialer {
address: addr.clone(),
role_override,
};
self.connections.insert(connection_id, peer);

Ok(KademliaHandler::new(
KademliaHandlerConfig {
protocol_config: self.protocol_config.clone(),
allow_listening: true,
idle_timeout: self.connection_idle_timeout,
},
ConnectedPoint::Dialer {
address: addr.clone(),
role_override,
},
self.protocol_config.clone(),
self.connection_idle_timeout,
connected_point,
peer,
self.mode,
))
}

Expand Down Expand Up @@ -2055,9 +2063,18 @@ where
ConnectedPoint::Dialer { address, .. } => Some(address),
ConnectedPoint::Listener { .. } => None,
};

self.connection_updated(source, address, NodeStatus::Connected);
}

KademliaHandlerEvent::ProtocolNotSupported { endpoint } => {
let address = match endpoint {
ConnectedPoint::Dialer { address, .. } => Some(address),
ConnectedPoint::Listener { .. } => None,
};
self.connection_updated(source, address, NodeStatus::Disconnected);
}

KademliaHandlerEvent::FindNodeReq { key, request_id } => {
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);

Expand Down Expand Up @@ -2419,7 +2436,63 @@ where

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
self.listen_addresses.on_swarm_event(&event);
self.external_addresses.on_swarm_event(&event);
let external_addresses_changed = self.external_addresses.on_swarm_event(&event);

self.mode = match (self.external_addresses.as_slice(), self.mode) {
([], Mode::Server) => {
log::debug!("Switching to client-mode because we no longer have any confirmed external addresses");

Mode::Client
}
([], Mode::Client) => {
// Previously client-mode, now also client-mode because no external addresses.

Mode::Client
}
(confirmed_external_addresses, Mode::Client) => {
if log::log_enabled!(log::Level::Debug) {
let confirmed_external_addresses =
to_comma_separated_list(confirmed_external_addresses);

log::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable");
}

Mode::Server
}
(confirmed_external_addresses, Mode::Server) => {
debug_assert!(
!confirmed_external_addresses.is_empty(),
"Previous match arm handled empty list"
);

// Previously, server-mode, now also server-mode because > 1 external address. Don't log anything to avoid spam.

Mode::Server
}
};

if external_addresses_changed && !self.connections.is_empty() {
let num_connections = self.connections.len();

log::debug!(
"External addresses changed, re-configuring {} established connection{}",
num_connections,
if num_connections > 1 { "s" } else { "" }
);

self.queued_events
.extend(
self.connections
.iter()
.map(|(conn_id, peer_id)| ToSwarm::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::One(*conn_id),
event: KademliaHandlerIn::ReconfigureMode {
new_mode: self.mode,
},
}),
);
}

match event {
FromSwarm::ConnectionEstablished(connection_established) => {
Expand Down Expand Up @@ -3187,3 +3260,29 @@ pub enum RoutingUpdate {
/// peer ID).
Failed,
}

#[derive(PartialEq, Copy, Clone, Debug)]
pub enum Mode {
Client,
Server,
}

impl fmt::Display for Mode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Mode::Client => write!(f, "client"),
Mode::Server => write!(f, "server"),
}
}
}

fn to_comma_separated_list<T>(confirmed_external_addresses: &[T]) -> String
where
T: ToString,
{
confirmed_external_addresses
.iter()
.map(|addr| addr.to_string())
.collect::<Vec<_>>()
.join(", ")
}
1 change: 1 addition & 0 deletions protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) {

let address: Multiaddr = Protocol::Memory(random::<u64>()).into();
swarm.listen_on(address.clone()).unwrap();
swarm.add_external_address(address.clone());

(address, swarm)
}
Expand Down
Loading

0 comments on commit 92c8cc4

Please sign in to comment.