Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

protocols/kad: Implement S-Kademlia's lookup over disjoint paths v2 #1473

Merged
merged 73 commits into from
Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
d8a2bad
protocols/kad: Implement S-Kademlia's lookup over disjoint paths
mxinden Feb 26, 2020
1fd2d35
protocols/kad: Add basic test for disjoint path peer iterator
mxinden Feb 26, 2020
0eabceb
protocols/kad/src/query/peers/closest.rs: Fix wrong comment
mxinden Feb 26, 2020
ba97589
protocols/kad/src/query.rs: Replace closest with disjoint of 1
mxinden Feb 26, 2020
1d963e5
src/query/peers/disjoint_closest: Only split up needed peers
mxinden Apr 1, 2020
9a2a705
protocols/kad/query/disjoint: Add license header
mxinden Apr 2, 2020
be0b9bf
protocols/kad: Expose configuration of disjoint path lookups
mxinden Apr 2, 2020
ca089ba
protocols/kad: Make disjoint_paths binary
mxinden Apr 2, 2020
17b9a68
protocols/kad/query/peers/disjoint: Add property test
mxinden Apr 3, 2020
f60ef67
Merge branch 'libp2p/master' into disjoint-paths-2
mxinden Apr 21, 2020
3e8097b
src/query/peers/disjoint: Have all closest iter share peers at init
mxinden Apr 21, 2020
4380758
protocols/kad/query: Move closest disjoint peers iter
mxinden Apr 21, 2020
189c330
protocols/kad/query/disjoint: Don't initialize will all peers
mxinden Apr 21, 2020
2b0a78f
Merge branch 'libp2p/master' into disjoint-paths-2
mxinden Apr 28, 2020
069d48a
protocols/kad/query/disjoint: Query closest iters fairly
mxinden Apr 29, 2020
5ba5616
protocols/kad/query/disjoint: Address minor TODOs
mxinden Apr 29, 2020
9d94642
protocols/kad: Fix intra doc link
mxinden Apr 29, 2020
2660a75
protocols/kad/query/disjoint: Ignore failures from other iterators
mxinden Apr 29, 2020
9e9224c
protocols/kad/query/disjoint: Shorten comment
mxinden May 6, 2020
0d7ba74
protocols/kad: Fix typo
mxinden May 6, 2020
ff358aa
protocols/kad/query/disjoint: Prevent index panics
mxinden May 6, 2020
dd714a6
protocols/kad/query/disjoint: Track state with Option
mxinden May 6, 2020
20650ee
protocols/kad/query/disjoint: Add comment
mxinden May 6, 2020
164c753
protocols/kad/query/disjoint: Reuse Arbitrary implementations
mxinden May 6, 2020
244aaa4
protocols/kad/query/disjoint: Rename test
mxinden May 6, 2020
8ea3c1b
protocols/kad/query/disjoint: Allow peer on multiple paths
mxinden May 6, 2020
79105d5
protocols/kad/query/disjoint: Rework IteratorIndex
mxinden May 11, 2020
fc727e5
protocols/kad/*: Make query parallelism configurable
mxinden May 11, 2020
c63922d
protocols/kad: Clarify parallelism for FixedPeersIter
mxinden May 12, 2020
c2e168d
protocols/kad: Improve comments
mxinden May 13, 2020
58c9447
protocols/kad/query/disjoint: Remove parallelism <= num_results
mxinden May 13, 2020
16fa97f
protocols/kad/query/disjoint: Remove additionally_awaited_by
mxinden May 13, 2020
2175b16
protocols/kad/query/disjoint: Use Iterator::cycle for iterator order
mxinden May 15, 2020
8b12f81
protocols/kad/query/disjoint: Introduce ResultIter to returning all p…
mxinden May 15, 2020
1b96ee4
Merge remote-tracking branch 'libp2p/master' into disjoint-paths-2
mxinden May 15, 2020
fe8941f
protocols/kad: Don't finish query early unless all paths are queried
mxinden May 16, 2020
1de9b99
Merge branch 'libp2p/master' into disjoint-paths-2
mxinden May 16, 2020
4e76145
protocols/kad/query/disjoint: Have quickcheck generate target
mxinden May 16, 2020
2336299
protocols/kad/behaviour/test: Have bootstrap test use disjoint paths
mxinden May 16, 2020
2a6303d
protocols/kad/behaviour/test: Make put_record&add_provider use disjoint
mxinden May 16, 2020
0fce409
protocols/kad/query/disjoint: Fix intra doc link
mxinden May 16, 2020
4c8fe6e
protocols/kad/query/disjoint: Do not initialize order for each next()
mxinden May 17, 2020
4be526e
protocols/kad/behaviour/test: Init put_record with at least 4 nodes
mxinden May 17, 2020
14a8e2f
protocols/kad/query/disjoint: Remove num_iters_query counter
mxinden May 18, 2020
ea7f3ed
protocols/kad/query/disjoint: Fix intra doc comment link
mxinden May 18, 2020
5b04baa
protocols/kad/behaviour: Rename successfull_peers to success
mxinden May 21, 2020
f5d917c
protocols/kad/behaviour/test: Adjust network size setup
mxinden May 21, 2020
05e28a7
Merge branch 'libp2p/master' into disjoint-paths-2
mxinden May 21, 2020
0500056
protocols/kad/behaviour/test: Remove newline
mxinden May 25, 2020
a39388c
protocols/kad/behaviour: Rename disjoint paths config option
mxinden Jun 10, 2020
1eb2436
protocols/kad/behaviour.rs: Introduce PeerRecord struct
mxinden Jun 10, 2020
7e64f2c
protocols/kad/behaviour: Expose PeerRecord on the public API
mxinden Jun 10, 2020
981c7aa
protocols/kad/behaviour/test.rs: Apply feedback
mxinden Jun 10, 2020
fac6375
protocols/kad/behaviour: Allow replication factor of 1 in add_provider
mxinden Jun 10, 2020
a01c960
protocols/kadquery.rs: Update may_finish comment
mxinden Jun 11, 2020
4955330
protocols/kad/query/disjoint: Prevent failure overwriting success
mxinden Jun 11, 2020
9e8e995
protocols/kad/query/closest/disjoint: Fix is_finished
mxinden Jun 11, 2020
2f10911
protocols/kad: Adjust naming and comments
mxinden Jun 11, 2020
3448f52
protocols/kad: Fix use_disjoint_query_paths renaming
mxinden Jun 11, 2020
57acffe
protocols/kad/src/lib: Expose PeerRecord and with /examples
mxinden Jun 11, 2020
80085c2
Merge branch 'libp2p/master' into disjoint-paths-2
mxinden Jun 11, 2020
5b76eda
protocols/kad/src/behaviour: Expose success peers on put record error
mxinden Jun 15, 2020
b69f24f
protocols/kad/query/disjoint: Log number finished on finish_paths
mxinden Jun 15, 2020
8e3dcf3
protocols/kad/query/disjoint: Don't notify initiator twice
mxinden Jun 15, 2020
692490a
protocols/kad/query/disjoint: Have `finish_paths` take reference
mxinden Jun 15, 2020
799f7f5
protocols/kad/behaviour: Improve `PeerRecord` documentation
mxinden Jun 15, 2020
e5542e8
Revert "protocols/kad/query/disjoint: Log number finished on finish_p…
mxinden Jun 16, 2020
d87141c
protocols/kad: Add try_finish debug logging
mxinden Jun 16, 2020
0c964ef
Merge branch 'libp2p/master' into disjoint-paths-2
mxinden Jun 16, 2020
1232c55
protocols/kad: Rework debug log and doc comments
mxinden Jun 17, 2020
63be1f3
Merge branch 'libp2p/master' into disjoint-paths-2
mxinden Jun 18, 2020
563ea78
Merge branch 'master' into disjoint-paths-2
mxinden Jun 19, 2020
6cfa7dd
Merge branch 'master' into disjoint-paths-2
mxinden Jun 19, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions examples/distributed-key-value-store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ use async_std::{io, task};
use futures::prelude::*;
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{
record::Key,
Kademlia,
KademliaEvent,
PeerRecord,
PutRecordOk,
QueryResult,
Quorum,
Record
Record,
record::Key,
};
use libp2p::{
NetworkBehaviour,
Expand Down Expand Up @@ -86,7 +87,7 @@ fn main() -> Result<(), Box<dyn Error>> {
match message {
KademliaEvent::QueryResult { result, .. } => match result {
QueryResult::GetRecord(Ok(ok)) => {
for Record { key, value, .. } in ok.records {
for PeerRecord { record: Record { key, value, .. }, ..} in ok.records {
println!(
"Got record {:?} {:?}",
std::str::from_utf8(key.as_ref()).unwrap(),
Expand Down
1 change: 1 addition & 0 deletions protocols/kad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ unsigned-varint = { version = "0.3", features = ["futures-codec"] }
void = "1.0"

[dev-dependencies]
futures-timer = "3.0"
libp2p-secio = { path = "../secio" }
libp2p-yamux = { path = "../../muxers/yamux" }
quickcheck = "0.9.0"
Expand Down
146 changes: 109 additions & 37 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,9 @@ impl Default for KademliaConfig {
impl KademliaConfig {
/// Sets a custom protocol name.
///
/// Kademlia nodes only communicate with other nodes using the same protocol name. Using a
/// custom name therefore allows to segregate the DHT from others, if that is desired.
/// Kademlia nodes only communicate with other nodes using the same protocol
/// name. Using a custom name therefore allows to segregate the DHT from
/// others, if that is desired.
romanb marked this conversation as resolved.
Show resolved Hide resolved
pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) -> &mut Self {
self.protocol_config.set_protocol_name(name);
self
Expand All @@ -154,10 +155,41 @@ impl KademliaConfig {
self
}

/// Sets the allowed level of parallelism for iterative queries.
///
/// The `α` parameter in the Kademlia paper. The maximum number of peers
/// that an iterative query is allowed to wait for in parallel while
/// iterating towards the closest nodes to a target. Defaults to
/// `ALPHA_VALUE`.
///
/// This only controls the level of parallelism of an iterative query, not
/// the level of parallelism of a query to a fixed set of peers.
///
/// When used with [`KademliaConfig::disjoint_query_paths`] it equals
/// the amount of disjoint paths used.
pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
self.query_config.parallelism = parallelism;
self
}

/// Require iterative queries to use disjoint paths for increased resiliency
/// in the presence of potentially adversarial nodes.
///
/// When enabled the number of disjoint paths used equals the configured
/// parallelism.
///
/// See the S/Kademlia paper for more information on the high level design
/// as well as its security improvements.
pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
self.query_config.disjoint_query_paths = enabled;
self
}

/// Sets the TTL for stored records.
///
/// The TTL should be significantly longer than the (re-)publication
/// interval, to avoid premature expiration of records. The default is 36 hours.
/// interval, to avoid premature expiration of records. The default is 36
/// hours.
///
/// `None` means records never expire.
///
Expand Down Expand Up @@ -191,10 +223,10 @@ impl KademliaConfig {

/// Sets the (re-)publication interval of stored records.
///
/// Records persist in the DHT until they expire. By default, published records
/// are re-published in regular intervals for as long as the record exists
/// in the local storage of the original publisher, thereby extending the
/// records lifetime.
/// Records persist in the DHT until they expire. By default, published
/// records are re-published in regular intervals for as long as the record
/// exists in the local storage of the original publisher, thereby extending
/// the records lifetime.
///
/// This interval should be significantly shorter than the record TTL, to
/// ensure records do not expire prematurely. The default is 24 hours.
Expand All @@ -220,7 +252,8 @@ impl KademliaConfig {
/// Sets the interval at which provider records for keys provided
/// by the local node are re-published.
///
/// `None` means that stored provider records are never automatically re-published.
/// `None` means that stored provider records are never automatically
/// re-published.
///
/// Must be significantly less than the provider record TTL.
pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
Expand All @@ -236,7 +269,8 @@ impl KademliaConfig {

/// Modifies the maximum allowed size of individual Kademlia packets.
///
/// It might be necessary to increase this value if trying to put large records.
/// It might be necessary to increase this value if trying to put large
/// records.
pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
self.protocol_config.set_max_packet_size(size);
self
Expand All @@ -247,7 +281,7 @@ impl<TStore> Kademlia<TStore>
where
for<'a> TStore: RecordStore<'a>
{
/// Creates a new `Kademlia` network behaviour with the given configuration.
/// Creates a new `Kademlia` network behaviour with a default configuration.
pub fn new(id: PeerId, store: TStore) -> Self {
Self::with_config(id, store, Default::default())
}
Expand Down Expand Up @@ -430,7 +464,7 @@ where
if record.is_expired(Instant::now()) {
self.store.remove(key)
} else {
records.push(record.into_owned());
records.push(PeerRecord{ peer: None, record: record.into_owned()});
}
}

Expand Down Expand Up @@ -892,15 +926,15 @@ where
if let Some(cache_key) = cache_at {
// Cache the record at the closest node to the key that
// did not return the record.
let record = records.first().expect("[not empty]").clone();
let record = records.first().expect("[not empty]").record.clone();
let quorum = NonZeroUsize::new(1).expect("1 > 0");
let context = PutRecordContext::Cache;
let info = QueryInfo::PutRecord {
context,
record,
quorum,
phase: PutRecordPhase::PutRecord {
num_results: 0,
success: vec![],
get_closest_peers_stats: QueryStats::empty()
}
};
Expand Down Expand Up @@ -934,7 +968,7 @@ where
record,
quorum,
phase: PutRecordPhase::PutRecord {
num_results: 0,
success: vec![],
get_closest_peers_stats: result.stats
}
};
Expand All @@ -947,13 +981,13 @@ where
context,
record,
quorum,
phase: PutRecordPhase::PutRecord { num_results, get_closest_peers_stats }
phase: PutRecordPhase::PutRecord { success, get_closest_peers_stats }
} => {
let mk_result = |key: record::Key| {
if num_results >= quorum.get() {
if success.len() >= quorum.get() {
Ok(PutRecordOk { key })
} else {
Err(PutRecordError::QuorumFailed { key, quorum, num_results })
Err(PutRecordError::QuorumFailed { key, quorum, success })
}
};
match context {
Expand Down Expand Up @@ -1050,9 +1084,9 @@ where
let err = Err(PutRecordError::Timeout {
key: record.key,
quorum,
num_results: match phase {
PutRecordPhase::GetClosestPeers => 0,
PutRecordPhase::PutRecord { num_results, .. } => num_results
success: match phase {
PutRecordPhase::GetClosestPeers => vec![],
PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
}
});
match context {
Expand Down Expand Up @@ -1098,7 +1132,7 @@ where
id: query_id,
stats: result.stats,
result: QueryResult::GetRecord(Err(
GetRecordError::Timeout { key, records, quorum }
GetRecordError::Timeout { key, records, quorum },
))
}),

Expand Down Expand Up @@ -1475,9 +1509,24 @@ where
key, records, quorum, cache_at
} = &mut query.inner.info {
if let Some(record) = record {
records.push(record);
if records.len() >= quorum.get() {
query.finish()
records.push(PeerRecord{ peer: Some(source.clone()), record });

let quorum = quorum.get();
if records.len() >= quorum {
// Desired quorum reached. The query may finish. See
// [`Query::try_finish`] for details.
let peers = records.iter()
.filter_map(|PeerRecord{ peer, .. }| peer.as_ref())
.cloned()
.collect::<Vec<_>>();
mxinden marked this conversation as resolved.
Show resolved Hide resolved
let finished = query.try_finish(peers.iter());
if !finished {
debug!(
"GetRecord query ({:?}) reached quorum ({}/{}) with \
response from peer {} but could not yet finish.",
user_data, peers.len(), quorum, source,
);
}
}
} else if quorum.get() == 1 {
// It is a "standard" Kademlia query, for which the
Expand Down Expand Up @@ -1513,11 +1562,21 @@ where
if let Some(query) = self.queries.get_mut(&user_data) {
query.on_success(&source, vec![]);
if let QueryInfo::PutRecord {
phase: PutRecordPhase::PutRecord { num_results, .. }, quorum, ..
phase: PutRecordPhase::PutRecord { success, .. }, quorum, ..
} = &mut query.inner.info {
*num_results += 1;
if *num_results >= quorum.get() {
query.finish()
success.push(source.clone());

let quorum = quorum.get();
if success.len() >= quorum {
let peers = success.clone();
let finished = query.try_finish(peers.iter());
if !finished {
debug!(
"PutRecord query ({:?}) reached quorum ({}/{}) with response \
from peer {} but could not yet finish.",
user_data, peers.len(), quorum, source,
);
}
}
}
}
Expand Down Expand Up @@ -1659,6 +1718,16 @@ impl Quorum {
}
}

/// A record either received by the given peer or retrieved from the local
/// record store.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerRecord {
/// The peer from whom the record was received. `None` if the record was
/// retrieved from local storage.
pub peer: Option<PeerId>,
mxinden marked this conversation as resolved.
Show resolved Hide resolved
pub record: Record,
}

//////////////////////////////////////////////////////////////////////////////
// Events

Expand Down Expand Up @@ -1742,7 +1811,7 @@ pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
/// The successful result of [`Kademlia::get_record`].
#[derive(Debug, Clone)]
pub struct GetRecordOk {
pub records: Vec<Record>
pub records: Vec<PeerRecord>
}

/// The error result of [`Kademlia::get_record`].
Expand All @@ -1754,12 +1823,12 @@ pub enum GetRecordError {
},
QuorumFailed {
key: record::Key,
records: Vec<Record>,
records: Vec<PeerRecord>,
quorum: NonZeroUsize
},
Timeout {
key: record::Key,
records: Vec<Record>,
records: Vec<PeerRecord>,
quorum: NonZeroUsize
}
}
Expand Down Expand Up @@ -1799,12 +1868,14 @@ pub struct PutRecordOk {
pub enum PutRecordError {
QuorumFailed {
key: record::Key,
num_results: usize,
/// [`PeerId`]s of the peers the record was successfully stored on.
success: Vec<PeerId>,
quorum: NonZeroUsize
},
Timeout {
key: record::Key,
num_results: usize,
/// [`PeerId`]s of the peers the record was successfully stored on.
success: Vec<PeerId>,
quorum: NonZeroUsize
},
}
Expand Down Expand Up @@ -2061,8 +2132,9 @@ pub enum QueryInfo {
GetRecord {
/// The key to look for.
key: record::Key,
/// The records found so far.
records: Vec<Record>,
/// The records with the id of the peer that returned them. `None` when
/// the record was found in the local store.
records: Vec<PeerRecord>,
/// The number of records to look for.
quorum: NonZeroUsize,
/// The closest peer to `key` that did not return a record.
Expand Down Expand Up @@ -2150,8 +2222,8 @@ pub enum PutRecordPhase {

/// The query is replicating the record to the closest nodes to the key.
PutRecord {
/// The number of successful replication requests so far.
num_results: usize,
/// A list of peers the given record has been successfully replicated to.
success: Vec<PeerId>,
/// Query statistics from the finished `GetClosestPeers` phase.
get_closest_peers_stats: QueryStats,
},
Expand Down
Loading