Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[libp2p-kad] More control & insight for k-buckets. #1628

Merged
merged 6 commits into from
Jun 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
# 0.21.0 [????-??-??]

- More control and insight for k-buckets
([PR 1628](https://github.com/libp2p/rust-libp2p/pull/1628)).
In particular, `Kademlia::kbuckets_entries` has been removed and
replaced by `Kademlia::kbuckets`/`Kademlia::kbucket` which provide
more information than just the peer IDs. Furthermore `Kademlia::add_address`
now returns a result and two new events, `KademliaEvent::RoutablePeer`
and `KademliaEvent::PendingRoutablePeer` are introduced (but are not
required to be acted upon in order to retain existing behaviour).
For more details, see the PR description.

# 0.20.1 [2020-06-23]

Maintenance release ([PR 1623](https://github.com/libp2p/rust-libp2p/pull/1623)).
Expand Down
220 changes: 199 additions & 21 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,15 @@ use wasm_timer::Instant;

pub use crate::query::QueryStats;

/// Network behaviour that handles Kademlia.
/// `Kademlia` is a `NetworkBehaviour` that implements the libp2p
/// Kademlia protocol.
pub struct Kademlia<TStore> {
/// The Kademlia routing table.
kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,

/// The k-bucket insertion strategy.
kbucket_inserts: KademliaBucketInserts,

/// Configuration of the wire protocol.
protocol_config: KademliaProtocolConfig,

Expand Down Expand Up @@ -92,6 +96,30 @@ pub struct Kademlia<TStore> {
store: TStore,
}

/// The configurable strategies for the insertion of peers
/// and their addresses into the k-buckets of the Kademlia
/// routing table.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum KademliaBucketInserts {
/// Whenever a connection to a peer is established as a
/// result of a dialing attempt and that peer is not yet
/// in the routing table, it is inserted as long as there
/// is a free slot in the corresponding k-bucket. If the
/// k-bucket is full but still has a free pending slot,
/// it may be inserted into the routing table at a later time if an unresponsive
/// disconnected peer is evicted from the bucket.
OnConnected,
/// New peers and addresses are only added to the routing table via
/// explicit calls to [`Kademlia::add_address`].
///
/// > **Note**: Even though peers can only get into the
/// > routing table as a result of [`Kademlia::add_address`],
/// > routing table entries are still updated as peers
/// > connect and disconnect (i.e. the order of the entries
/// > as well as the network addresses).
Manual,
}

/// The configuration for the `Kademlia` behaviour.
///
/// The configuration is consumed by [`Kademlia::new`].
Expand All @@ -106,6 +134,7 @@ pub struct KademliaConfig {
provider_record_ttl: Option<Duration>,
provider_publication_interval: Option<Duration>,
connection_idle_timeout: Duration,
kbucket_inserts: KademliaBucketInserts,
}

impl Default for KademliaConfig {
Expand All @@ -120,6 +149,7 @@ impl Default for KademliaConfig {
provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
connection_idle_timeout: Duration::from_secs(10),
kbucket_inserts: KademliaBucketInserts::OnConnected,
}
}
}
Expand Down Expand Up @@ -275,6 +305,12 @@ impl KademliaConfig {
self.protocol_config.set_max_packet_size(size);
self
}

/// Sets the k-bucket insertion strategy for the Kademlia routing table.
pub fn set_kbucket_inserts(&mut self, inserts: KademliaBucketInserts) -> &mut Self {
self.kbucket_inserts = inserts;
self
}
}

impl<TStore> Kademlia<TStore>
Expand Down Expand Up @@ -312,6 +348,7 @@ where
Kademlia {
store,
kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
kbucket_inserts: config.kbucket_inserts,
protocol_config: config.protocol_config,
queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
queries: QueryPool::new(config.query_config),
Expand Down Expand Up @@ -381,7 +418,7 @@ where
///
/// If the routing table has been updated as a result of this operation,
/// a [`KademliaEvent::RoutingUpdated`] event is emitted.
pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) {
pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate {
let key = kbucket::Key::new(peer.clone());
match self.kbuckets.entry(&key) {
kbucket::Entry::Present(mut entry, _) => {
Expand All @@ -394,9 +431,11 @@ where
}
))
}
RoutingUpdate::Success
}
kbucket::Entry::Pending(mut entry, _) => {
entry.value().insert(address);
RoutingUpdate::Pending
}
kbucket::Entry::Absent(entry) => {
let addresses = Addresses::new(address);
Expand All @@ -415,26 +454,97 @@ where
old_peer: None,
}
));
RoutingUpdate::Success
},
kbucket::InsertResult::Full => {
debug!("Bucket full. Peer not added to routing table: {}", peer)
debug!("Bucket full. Peer not added to routing table: {}", peer);
RoutingUpdate::Failed
},
kbucket::InsertResult::Pending { disconnected } => {
self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: disconnected.into_preimage(),
condition: DialPeerCondition::Disconnected
})
});
RoutingUpdate::Pending
},
}
},
kbucket::Entry::SelfEntry => {},
kbucket::Entry::SelfEntry => RoutingUpdate::Failed,
}
}

/// Returns an iterator over all peer IDs of nodes currently contained in a bucket
/// of the Kademlia routing table.
pub fn kbuckets_entries(&mut self) -> impl Iterator<Item = &PeerId> {
self.kbuckets.iter().map(|entry| entry.node.key.preimage())
/// Removes an address of a peer from the routing table.
///
/// If the given address is the last address of the peer in the
/// routing table, the peer is removed from the routing table
/// and `Some` is returned with a view of the removed entry.
/// The same applies if the peer is currently pending insertion
/// into the routing table.
///
/// If the given peer or address is not in the routing table,
/// this is a no-op.
pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr)
-> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>>
{
let key = kbucket::Key::new(peer.clone());
match self.kbuckets.entry(&key) {
kbucket::Entry::Present(mut entry, _) => {
if entry.value().remove(address).is_err() {
Some(entry.remove()) // it is the last address, thus remove the peer.
} else {
None
}
}
kbucket::Entry::Pending(mut entry, _) => {
if entry.value().remove(address).is_err() {
Some(entry.remove()) // it is the last address, thus remove the peer.
} else {
None
}
}
kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => {
None
}
}
}

/// Removes a peer from the routing table.
///
/// Returns `None` if the peer was not in the routing table,
/// not even pending insertion.
pub fn remove_peer(&mut self, peer: &PeerId)
-> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>>
{
let key = kbucket::Key::new(peer.clone());
match self.kbuckets.entry(&key) {
kbucket::Entry::Present(entry, _) => {
Some(entry.remove())
}
kbucket::Entry::Pending(entry, _) => {
Some(entry.remove())
}
kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => {
None
}
}
}

/// Returns an iterator over all non-empty buckets in the routing table.
pub fn kbuckets(&mut self)
mxinden marked this conversation as resolved.
Show resolved Hide resolved
-> impl Iterator<Item = kbucket::KBucketRef<kbucket::Key<PeerId>, Addresses>>
{
self.kbuckets.iter().filter(|b| !b.is_empty())
}

/// Returns the k-bucket for the distance to the given key.
///
/// Returns `None` if the given key refers to the local key.
pub fn kbucket<K>(&mut self, key: K)
-> Option<kbucket::KBucketRef<kbucket::Key<PeerId>, Addresses>>
where
K: Borrow<[u8]> + Clone
{
self.kbuckets.bucket(&kbucket::Key::new(key))
}

/// Initiates an iterative query for the closest peers to the given key.
Expand Down Expand Up @@ -723,7 +833,7 @@ where
self.queries.add_iter_closest(target.clone(), peers, inner);
}

/// Updates the connection status of a peer in the Kademlia routing table.
/// Updates the routing table with a new connection status and address of a peer.
fn connection_updated(&mut self, peer: PeerId, address: Option<Multiaddr>, new_status: NodeStatus) {
let key = kbucket::Key::new(peer.clone());
match self.kbuckets.entry(&key) {
Expand Down Expand Up @@ -755,9 +865,22 @@ where

kbucket::Entry::Absent(entry) => {
// Only connected nodes with a known address are newly inserted.
if new_status == NodeStatus::Connected {
if let Some(address) = address {
let addresses = Addresses::new(address);
if new_status != NodeStatus::Connected {
return
}
match (address, self.kbucket_inserts) {
(None, _) => {
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::UnroutablePeer { peer }
));
}
(Some(a), KademliaBucketInserts::Manual) => {
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::RoutablePeer { peer, address: a }
));
}
(Some(a), KademliaBucketInserts::OnConnected) => {
let addresses = Addresses::new(a);
match entry.insert(addresses.clone(), new_status) {
kbucket::InsertResult::Inserted => {
let event = KademliaEvent::RoutingUpdated {
Expand All @@ -769,20 +892,24 @@ where
NetworkBehaviourAction::GenerateEvent(event));
},
kbucket::InsertResult::Full => {
debug!("Bucket full. Peer not added to routing table: {}", peer)
debug!("Bucket full. Peer not added to routing table: {}", peer);
let address = addresses.first().clone();
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::RoutablePeer { peer, address }
));
},
kbucket::InsertResult::Pending { disconnected } => {
debug_assert!(!self.connected_peers.contains(disconnected.preimage()));
let address = addresses.first().clone();
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::PendingRoutablePeer { peer, address }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc says

A connection to a peer has been established

I'm not sure if that's true at this point. DialPeer is being scheduled, it may fail. Also, DialPeer may result in dialing not addresses.first(), but some other address.

Just wanted to state that this is a bit misleading :) Maybe reflect that in the doc comment on PendingRoutablePeer?

Copy link
Contributor Author

@romanb romanb Jul 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if that's true at this point. DialPeer is being scheduled, it may fail. Also, DialPeer may result in dialing not addresses.first(), but some other address.

A DialPeer is scheduled for the peer that is potentially replaced if it does not respond, i.e. for disconnected (this is a PeerId). peer is the one to whom we just connected via address and is pending insertion (if disconnected turns out unresponsive). So I think this is correct. Does that help? If you have suggestions for clarifying the documentation, feel free to open a PR!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying! I definitely missed that DialPeer is issued to disconnected, not to freshly connected peer. I was wrong, thank you for explanation!

));
self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: disconnected.into_preimage(),
condition: DialPeerCondition::Disconnected
})
},
}
} else {
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::UnroutablePeer { peer }
));
}
}
},
Expand All @@ -806,8 +933,8 @@ where
// a bucket refresh should be performed for every bucket farther away than
// the first non-empty bucket (which are most likely no more than the last
// few, i.e. farthest, buckets).
self.kbuckets.buckets()
.skip_while(|b| b.num_entries() == 0)
self.kbuckets.iter()
.skip_while(|b| b.is_empty())
.skip(1) // Skip the bucket with the closest neighbour.
.map(|b| {
// Try to find a key that falls into the bucket. While such keys can
Expand Down Expand Up @@ -1770,10 +1897,42 @@ pub enum KademliaEvent {

/// A peer has connected for whom no listen address is known.
///
/// If the peer is to be added to the local node's routing table, a known
/// If the peer is to be added to the routing table, a known
/// listen address for the peer must be provided via [`Kademlia::add_address`].
UnroutablePeer {
peer: PeerId
},

/// A connection to a peer has been established for whom a listen address
/// is known but the peer has not been added to the routing table either
/// because [`KademliaBucketInserts::Manual`] is configured or because
/// the corresponding bucket is full.
///
/// If the peer is to be included in the routing table, it must
/// must be explicitly added via [`Kademlia::add_address`], possibly after
/// removing another peer.
///
/// See [`Kademlia::kbucket`] for insight into the contents of
/// the k-bucket of `peer`.
RoutablePeer {
peer: PeerId,
address: Multiaddr,
},

/// A connection to a peer has been established for whom a listen address
/// is known but the peer is only pending insertion into the routing table
/// if the least-recently disconnected peer is unresponsive, i.e. the peer
/// may not make it into the routing table.
///
/// If the peer is to be unconditionally included in the routing table,
/// it should be explicitly added via [`Kademlia::add_address`] after
/// removing another peer.
///
/// See [`Kademlia::kbucket`] for insight into the contents of
/// the k-bucket of `peer`.
PendingRoutablePeer {
peer: PeerId,
address: Multiaddr,
}
}

Expand Down Expand Up @@ -2294,3 +2453,22 @@ impl fmt::Display for NoKnownPeers {
}

impl std::error::Error for NoKnownPeers {}

/// The possible outcomes of [`Kademlia::add_address`].
pub enum RoutingUpdate {
/// The given peer and address has been added to the routing
/// table.
mxinden marked this conversation as resolved.
Show resolved Hide resolved
Success,
/// The peer and address is pending insertion into
/// the routing table, if a disconnected peer fails
/// to respond. If the given peer and address ends up
/// in the routing table, [`KademliaEvent::RoutingUpdated`]
/// is eventually emitted.
Pending,
/// The routing table update failed, either because the
/// corresponding bucket for the peer is full and the
/// pending slot(s) are occupied, or because the given
/// peer ID is deemed invalid (e.g. refers to the local
/// peer ID).
Failed,
}
Loading