diff --git a/CHANGELOG.md b/CHANGELOG.md index b18df2c8a41..a926161b0dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,14 @@ has no effect. [PR 1536](https://github.com/libp2p/rust-libp2p/pull/1536) +- `libp2p-kad`: Provide more insight into, and control of, the execution of + queries. All query results are now wrapped in `KademliaEvent::QueryResult`. + As a side-effect of these changes and for as long as the record storage + API is not asynchronous, local storage errors on `put_record` are reported + synchronously in a `Result`, instead of being reported asynchronously by + an event. + [PR 1567](https://github.com/libp2p/rust-libp2p/pull/1567) + - `libp2p-tcp`: On listeners started with an IPv6 multi-address the socket option `IPV6_V6ONLY` is set to true. Instead of relying on IPv4-mapped IPv6 address support, two listeners can be started if IPv4 and IPv6 should both diff --git a/examples/distributed-key-value-store.rs b/examples/distributed-key-value-store.rs index 5bf21855d32..014418d318b 100644 --- a/examples/distributed-key-value-store.rs +++ b/examples/distributed-key-value-store.rs @@ -32,7 +32,15 @@ use async_std::{io, task}; use futures::prelude::*; use libp2p::kad::record::store::MemoryStore; -use libp2p::kad::{record::Key, Kademlia, KademliaEvent, PutRecordOk, Quorum, Record}; +use libp2p::kad::{ + record::Key, + Kademlia, + KademliaEvent, + PutRecordOk, + QueryResult, + Quorum, + Record +}; use libp2p::{ NetworkBehaviour, PeerId, @@ -76,26 +84,29 @@ fn main() -> Result<(), Box> { // Called when `kademlia` produces an event. fn inject_event(&mut self, message: KademliaEvent) { match message { - KademliaEvent::GetRecordResult(Ok(result)) => { - for Record { key, value, .. } in result.records { + KademliaEvent::QueryResult { result, .. } => match result { + QueryResult::GetRecord(Ok(ok)) => { + for Record { key, value, .. } in ok.records { + println!( + "Got record {:?} {:?}", + std::str::from_utf8(key.as_ref()).unwrap(), + std::str::from_utf8(&value).unwrap(), + ); + } + } + QueryResult::GetRecord(Err(err)) => { + eprintln!("Failed to get record: {:?}", err); + } + QueryResult::PutRecord(Ok(PutRecordOk { key })) => { println!( - "Got record {:?} {:?}", - std::str::from_utf8(key.as_ref()).unwrap(), - std::str::from_utf8(&value).unwrap(), + "Successfully put record {:?}", + std::str::from_utf8(key.as_ref()).unwrap() ); } - } - KademliaEvent::GetRecordResult(Err(err)) => { - eprintln!("Failed to get record: {:?}", err); - } - KademliaEvent::PutRecordResult(Ok(PutRecordOk { key })) => { - println!( - "Successfully put record {:?}", - std::str::from_utf8(key.as_ref()).unwrap() - ); - } - KademliaEvent::PutRecordResult(Err(err)) => { - eprintln!("Failed to put record: {:?}", err); + QueryResult::PutRecord(Err(err)) => { + eprintln!("Failed to put record: {:?}", err); + } + _ => {} } _ => {} } @@ -188,7 +199,7 @@ fn handle_input_line(kademlia: &mut Kademlia, line: String) { publisher: None, expires: None, }; - kademlia.put_record(record, Quorum::One); + kademlia.put_record(record, Quorum::One).expect("Failed to store record locally."); } _ => { eprintln!("expected GET or PUT"); diff --git a/examples/ipfs-kad.rs b/examples/ipfs-kad.rs index bb1738e5815..ec48435db00 100644 --- a/examples/ipfs-kad.rs +++ b/examples/ipfs-kad.rs @@ -30,7 +30,13 @@ use libp2p::{ identity, build_development_transport }; -use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, GetClosestPeersError}; +use libp2p::kad::{ + Kademlia, + KademliaConfig, + KademliaEvent, + GetClosestPeersError, + QueryResult, +}; use libp2p::kad::record::store::MemoryStore; use std::{env, error::Error, time::Duration}; @@ -91,7 +97,10 @@ fn main() -> Result<(), Box> { task::block_on(async move { loop { let event = swarm.next().await; - if let KademliaEvent::GetClosestPeersResult(result) = event { + if let KademliaEvent::QueryResult { + result: QueryResult::GetClosestPeers(result), + .. + } = event { match result { Ok(ok) => if !ok.peers.is_empty() { diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 4401309efdf..6bfcdba7695 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -44,10 +44,14 @@ use log::{info, debug, warn}; use smallvec::SmallVec; use std::{borrow::{Borrow, Cow}, error, iter, time::Duration}; use std::collections::{HashSet, VecDeque}; +use std::fmt; use std::num::NonZeroUsize; use std::task::{Context, Poll}; +use std::vec; use wasm_timer::Instant; +pub use crate::query::QueryStats; + /// Network behaviour that handles Kademlia. pub struct Kademlia { /// The Kademlia routing table. @@ -286,6 +290,46 @@ where } } + /// Gets an iterator over immutable references to all running queries. + pub fn iter_queries<'a>(&'a self) -> impl Iterator> { + self.queries.iter().filter_map(|query| + if !query.is_finished() { + Some(QueryRef { query }) + } else { + None + }) + } + + /// Gets an iterator over mutable references to all running queries. + pub fn iter_queries_mut<'a>(&'a mut self) -> impl Iterator> { + self.queries.iter_mut().filter_map(|query| + if !query.is_finished() { + Some(QueryMut { query }) + } else { + None + }) + } + + /// Gets an immutable reference to a running query, if it exists. + pub fn query<'a>(&'a self, id: &QueryId) -> Option> { + self.queries.get(id).and_then(|query| + if !query.is_finished() { + Some(QueryRef { query }) + } else { + None + }) + } + + /// Gets a mutable reference to a running query, if it exists. + pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option> { + self.queries.get_mut(id).and_then(|query| + if !query.is_finished() { + Some(QueryMut { query }) + } else { + None + }) + } + /// Adds a known listen address of a peer participating in the DHT to the /// routing table. /// @@ -359,10 +403,11 @@ where self.kbuckets.iter().map(|entry| entry.node.key.preimage()) } - /// Performs a lookup for the closest peers to the given key. + /// Initiates an iterative query for the closest peers to the given key. /// - /// The result of this operation is delivered in [`KademliaEvent::GetClosestPeersResult`]. - pub fn get_closest_peers(&mut self, key: K) + /// The result of the query is delivered in a + /// [`KademliaEvent::QueryResult{QueryResult::GetClosestPeers}`]. + pub fn get_closest_peers(&mut self, key: K) -> QueryId where K: Borrow<[u8]> + Clone { @@ -370,13 +415,14 @@ where let target = kbucket::Key::new(key); let peers = self.kbuckets.closest_keys(&target); let inner = QueryInner::new(info); - self.queries.add_iter_closest(target.clone(), peers, inner); + self.queries.add_iter_closest(target.clone(), peers, inner) } /// Performs a lookup for a record in the DHT. /// - /// The result of this operation is delivered in [`KademliaEvent::GetRecordResult`]. - pub fn get_record(&mut self, key: &record::Key, quorum: Quorum) { + /// The result of this operation is delivered in a + /// [`KademliaEvent::QueryResult{QueryResult::GetRecord}`]. + pub fn get_record(&mut self, key: &record::Key, quorum: Quorum) -> QueryId { let quorum = quorum.eval(self.queries.config().replication_factor); let mut records = Vec::with_capacity(quorum.get()); @@ -385,25 +431,30 @@ where self.store.remove(key) } else { records.push(record.into_owned()); - if quorum.get() == 1 { - self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::GetRecordResult(Ok(GetRecordOk { records })) - )); - return; - } } } + let done = records.len() >= quorum.get(); let target = kbucket::Key::new(key.clone()); let info = QueryInfo::GetRecord { key: key.clone(), records, quorum, cache_at: None }; let peers = self.kbuckets.closest_keys(&target); let inner = QueryInner::new(info); - self.queries.add_iter_closest(target.clone(), peers, inner); + let id = self.queries.add_iter_closest(target.clone(), peers, inner); // (*) + + // Instantly finish the query if we already have enough records. + if done { + self.queries.get_mut(&id).expect("by (*)").finish(); + } + + id } /// Stores a record in the DHT. /// - /// The result of this operation is delivered in [`KademliaEvent::PutRecordResult`]. + /// Returns `Ok` if a record has been stored locally, providing the + /// `QueryId` of the initial query that replicates the record in the DHT. + /// The result of the query is eventually reported as a + /// [`KademliaEvent::QueryResult{QueryResult::PutRecord}`]. /// /// The record is always stored locally with the given expiration. If the record's /// expiration is `None`, the common case, it does not expire in local storage @@ -415,28 +466,23 @@ where /// does not update the record's expiration in local storage, thus a given record /// with an explicit expiration will always expire at that instant and until then /// is subject to regular (re-)replication and (re-)publication. - pub fn put_record(&mut self, mut record: Record, quorum: Quorum) { + pub fn put_record(&mut self, mut record: Record, quorum: Quorum) -> Result { record.publisher = Some(self.kbuckets.local_key().preimage().clone()); - if let Err(err) = self.store.put(record.clone()) { - self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::PutRecordResult(Err( - PutRecordError::LocalStorageError { - key: record.key, - cause: err, - } - )) - )); - } else { - record.expires = record.expires.or_else(|| - self.record_ttl.map(|ttl| Instant::now() + ttl)); - let quorum = quorum.eval(self.queries.config().replication_factor); - let target = kbucket::Key::new(record.key.clone()); - let peers = self.kbuckets.closest_keys(&target); - let context = PutRecordContext::Publish; - let info = QueryInfo::PreparePutRecord { record, quorum, context }; - let inner = QueryInner::new(info); - self.queries.add_iter_closest(target.clone(), peers, inner); - } + self.store.put(record.clone())?; + record.expires = record.expires.or_else(|| + self.record_ttl.map(|ttl| Instant::now() + ttl)); + let quorum = quorum.eval(self.queries.config().replication_factor); + let target = kbucket::Key::new(record.key.clone()); + let peers = self.kbuckets.closest_keys(&target); + let context = PutRecordContext::Publish; + let info = QueryInfo::PutRecord { + context, + record, + quorum, + phase: PutRecordPhase::GetClosestPeers + }; + let inner = QueryInner::new(info); + Ok(self.queries.add_iter_closest(target.clone(), peers, inner)) } /// Removes the record with the given key from _local_ storage, @@ -471,18 +517,28 @@ where /// refreshed by initiating an additional bootstrapping query for each such /// bucket with random keys. /// - /// The result(s) of this operation are delivered in [`KademliaEvent::BootstrapResult`], - /// with one event per bootstrapping query. + /// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the + /// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is + /// reported via [`KademliaEvent::QueryResult{QueryResult::Bootstrap}`] events, + /// with one such event per bootstrapping query. + /// + /// Returns `Err` if bootstrapping is impossible due an empty routing table. /// /// > **Note**: Bootstrapping requires at least one node of the DHT to be known. /// > See [`Kademlia::add_address`]. - pub fn bootstrap(&mut self) { + pub fn bootstrap(&mut self) -> Result { let local_key = self.kbuckets.local_key().clone(); - let info = QueryInfo::Bootstrap { peer: local_key.preimage().clone() }; + let info = QueryInfo::Bootstrap { + peer: local_key.preimage().clone(), + remaining: None + }; let peers = self.kbuckets.closest_keys(&local_key).collect::>(); - // TODO: Emit error if `peers` is empty? BootstrapError::NoPeers? - let inner = QueryInner::new(info); - self.queries.add_iter_closest(local_key, peers, inner); + if peers.is_empty() { + Err(NoKnownPeers()) + } else { + let inner = QueryInner::new(info); + Ok(self.queries.add_iter_closest(local_key, peers, inner)) + } } /// Establishes the local node as a provider of a value for the given key. @@ -491,6 +547,9 @@ where /// identity of the local node to the peers closest to the key, thus establishing /// the local node as a provider. /// + /// Returns `Ok` if a provider record has been stored locally, providing the + /// `QueryId` of the initial query that announces the local node as a provider. + /// /// The publication of the provider records is periodically repeated as per the /// configured interval, to renew the expiry and account for changes to the DHT /// topology. A provider record may be removed from local storage and @@ -503,23 +562,21 @@ where /// of the libp2p Kademlia provider API. /// /// The results of the (repeated) provider announcements sent by this node are - /// delivered in [`AddProviderResult`]. - pub fn start_providing(&mut self, key: record::Key) { + /// reported via [`KademliaEvent::QueryResult{QueryResult::AddProvider}`]. + pub fn start_providing(&mut self, key: record::Key) -> Result { let record = ProviderRecord::new(key.clone(), self.kbuckets.local_key().preimage().clone()); - if let Err(err) = self.store.add_provider(record) { - self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::StartProvidingResult(Err( - AddProviderError::LocalStorageError { key, cause: err } - )) - )); - } else { - let target = kbucket::Key::new(key.clone()); - let peers = self.kbuckets.closest_keys(&target); - let context = AddProviderContext::Publish; - let info = QueryInfo::PrepareAddProvider { key, context }; - let inner = QueryInner::new(info); - self.queries.add_iter_closest(target.clone(), peers, inner); - } + self.store.add_provider(record)?; + let target = kbucket::Key::new(key.clone()); + let peers = self.kbuckets.closest_keys(&target); + let context = AddProviderContext::Publish; + let info = QueryInfo::AddProvider { + context, + key, + phase: AddProviderPhase::GetClosestPeers + }; + let inner = QueryInner::new(info); + let id = self.queries.add_iter_closest(target.clone(), peers, inner); + Ok(id) } /// Stops the local node from announcing that it is a provider for the given key. @@ -532,8 +589,9 @@ where /// Performs a lookup for providers of a value to the given key. /// - /// The result of this operation is delivered in [`KademliaEvent::GetProvidersResult`]. - pub fn get_providers(&mut self, key: record::Key) { + /// The result of this operation is delivered in a + /// reported via [`KademliaEvent::QueryResult{QueryResult::GetProviders}`]. + pub fn get_providers(&mut self, key: record::Key) -> QueryId { let info = QueryInfo::GetProviders { key: key.clone(), providers: HashSet::new(), @@ -541,7 +599,7 @@ where let target = kbucket::Key::new(key); let peers = self.kbuckets.closest_keys(&target); let inner = QueryInner::new(info); - self.queries.add_iter_closest(target.clone(), peers, inner); + self.queries.add_iter_closest(target.clone(), peers, inner) } /// Processes discovered peers from a successful request in an iterative `Query`. @@ -608,7 +666,11 @@ where /// Starts an iterative `ADD_PROVIDER` query for the given key. fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) { - let info = QueryInfo::PrepareAddProvider { key: key.clone(), context }; + let info = QueryInfo::AddProvider { + context, + key: key.clone(), + phase: AddProviderPhase::GetClosestPeers + }; let target = kbucket::Key::new(key); let peers = self.kbuckets.closest_keys(&target); let inner = QueryInner::new(info); @@ -620,7 +682,9 @@ where let quorum = quorum.eval(self.queries.config().replication_factor); let target = kbucket::Key::new(record.key.clone()); let peers = self.kbuckets.closest_keys(&target); - let info = QueryInfo::PreparePutRecord { record, quorum, context }; + let info = QueryInfo::PutRecord { + record, quorum, context, phase: PutRecordPhase::GetClosestPeers + }; let inner = QueryInner::new(info); self.queries.add_iter_closest(target.clone(), peers, inner); } @@ -696,17 +760,19 @@ where fn query_finished(&mut self, q: Query, params: &mut impl PollParameters) -> Option { - log::trace!("Query {:?} finished.", q.id()); + let query_id = q.id(); + log::trace!("Query {:?} finished.", query_id); let result = q.into_result(); match result.inner.info { - QueryInfo::Bootstrap { peer } => { + QueryInfo::Bootstrap { peer, remaining } => { let local_key = self.kbuckets.local_key().clone(); - if &peer == local_key.preimage() { + let mut remaining = remaining.unwrap_or_else(|| { + debug_assert_eq!(&peer, local_key.preimage()); // The lookup for the local key finished. To complete the bootstrap process, // a bucket refresh should be performed for every bucket farther away than // the first non-empty bucket (which are most likely no more than the last // few, i.e. farthest, buckets). - let targets = self.kbuckets.buckets() + self.kbuckets.buckets() .skip_while(|b| b.num_entries() == 0) .skip(1) // Skip the bucket with the closest neighbour. .map(|b| { @@ -732,71 +798,112 @@ where target = kbucket::Key::new(PeerId::random()); } target - }).collect::>(); + }).collect::>().into_iter() + }); - for target in targets { - let info = QueryInfo::Bootstrap { peer: target.clone().into_preimage() }; - let peers = self.kbuckets.closest_keys(&target); - let inner = QueryInner::new(info); - self.queries.add_iter_closest(target.clone(), peers, inner); - } + let num_remaining = remaining.len().saturating_sub(1) as u32; + + if let Some(target) = remaining.next() { + let info = QueryInfo::Bootstrap { + peer: target.clone().into_preimage(), + remaining: Some(remaining) + }; + let peers = self.kbuckets.closest_keys(&target); + let inner = QueryInner::new(info); + self.queries.continue_iter_closest(query_id, target.clone(), peers, inner); } - Some(KademliaEvent::BootstrapResult(Ok(BootstrapOk { peer }))) + + Some(KademliaEvent::QueryResult { + id: query_id, + stats: result.stats, + result: QueryResult::Bootstrap(Ok(BootstrapOk { peer, num_remaining })) + }) } QueryInfo::GetClosestPeers { key, .. } => { - Some(KademliaEvent::GetClosestPeersResult(Ok( - GetClosestPeersOk { key, peers: result.peers.collect() } - ))) + Some(KademliaEvent::QueryResult { + id: query_id, + stats: result.stats, + result: QueryResult::GetClosestPeers(Ok( + GetClosestPeersOk { key, peers: result.peers.collect() } + )) + }) } QueryInfo::GetProviders { key, providers } => { - Some(KademliaEvent::GetProvidersResult(Ok( - GetProvidersOk { - key, - providers, - closest_peers: result.peers.collect() - } - ))) + Some(KademliaEvent::QueryResult { + id: query_id, + stats: result.stats, + result: QueryResult::GetProviders(Ok( + GetProvidersOk { + key, + providers, + closest_peers: result.peers.collect() + } + )) + }) } - QueryInfo::PrepareAddProvider { key, context } => { + QueryInfo::AddProvider { + context, + key, + phase: AddProviderPhase::GetClosestPeers + } => { let provider_id = params.local_peer_id().clone(); let external_addresses = params.external_addresses().collect(); let inner = QueryInner::new(QueryInfo::AddProvider { - key, - provider_id, - external_addresses, context, + key, + phase: AddProviderPhase::AddProvider { + provider_id, + external_addresses, + get_closest_peers_stats: result.stats + } }); - self.queries.add_fixed(result.peers, inner); + self.queries.continue_fixed(query_id, result.peers, inner); None } - QueryInfo::AddProvider { key, context, .. } => { + QueryInfo::AddProvider { + context, + key, + phase: AddProviderPhase::AddProvider { get_closest_peers_stats, .. } + } => { match context { AddProviderContext::Publish => { - Some(KademliaEvent::StartProvidingResult(Ok( - AddProviderOk { key } - ))) + Some(KademliaEvent::QueryResult { + id: query_id, + stats: get_closest_peers_stats.merge(result.stats), + result: QueryResult::StartProviding(Ok(AddProviderOk { key })) + }) } AddProviderContext::Republish => { - Some(KademliaEvent::RepublishProviderResult(Ok( - AddProviderOk { key } - ))) + Some(KademliaEvent::QueryResult { + id: query_id, + stats: get_closest_peers_stats.merge(result.stats), + result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })) + }) } } } QueryInfo::GetRecord { key, records, quorum, cache_at } => { - let result = if records.len() >= quorum.get() { // [not empty] + let results = if records.len() >= quorum.get() { // [not empty] if let Some(cache_key) = cache_at { // Cache the record at the closest node to the key that // did not return the record. let record = records.first().expect("[not empty]").clone(); let quorum = NonZeroUsize::new(1).expect("1 > 0"); let context = PutRecordContext::Cache; - let info = QueryInfo::PutRecord { record, quorum, context, num_results: 0 }; + let info = QueryInfo::PutRecord { + context, + record, + quorum, + phase: PutRecordPhase::PutRecord { + num_results: 0, + get_closest_peers_stats: QueryStats::empty() + } + }; let inner = QueryInner::new(info); self.queries.add_fixed(iter::once(cache_key.into_preimage()), inner); } @@ -809,18 +916,40 @@ where } else { Err(GetRecordError::QuorumFailed { key, records, quorum }) }; - Some(KademliaEvent::GetRecordResult(result)) + Some(KademliaEvent::QueryResult { + id: query_id, + stats: result.stats, + result: QueryResult::GetRecord(results) + }) } - QueryInfo::PreparePutRecord { record, quorum, context } => { - let info = QueryInfo::PutRecord { record, quorum, context, num_results: 0 }; + QueryInfo::PutRecord { + context, + record, + quorum, + phase: PutRecordPhase::GetClosestPeers + } => { + let info = QueryInfo::PutRecord { + context, + record, + quorum, + phase: PutRecordPhase::PutRecord { + num_results: 0, + get_closest_peers_stats: result.stats + } + }; let inner = QueryInner::new(info); - self.queries.add_fixed(result.peers, inner); + self.queries.continue_fixed(query_id, result.peers, inner); None } - QueryInfo::PutRecord { record, quorum, num_results, context } => { - let result = |key: record::Key| { + QueryInfo::PutRecord { + context, + record, + quorum, + phase: PutRecordPhase::PutRecord { num_results, get_closest_peers_stats } + } => { + let mk_result = |key: record::Key| { if num_results >= quorum.get() { Ok(PutRecordOk { key }) } else { @@ -829,9 +958,17 @@ where }; match context { PutRecordContext::Publish => - Some(KademliaEvent::PutRecordResult(result(record.key))), + Some(KademliaEvent::QueryResult { + id: query_id, + stats: get_closest_peers_stats.merge(result.stats), + result: QueryResult::PutRecord(mk_result(record.key)) + }), PutRecordContext::Republish => - Some(KademliaEvent::RepublishRecordResult(result(record.key))), + Some(KademliaEvent::QueryResult { + id: query_id, + stats: get_closest_peers_stats.merge(result.stats), + result: QueryResult::RepublishRecord(mk_result(record.key)) + }), PutRecordContext::Replicate => { debug!("Record replicated: {:?}", record.key); None @@ -846,99 +983,138 @@ where } /// Handles a query that timed out. - fn query_timeout(&self, query: Query) -> Option { - log::trace!("Query {:?} timed out.", query.id()); + fn query_timeout(&mut self, query: Query) -> Option { + let query_id = query.id(); + log::trace!("Query {:?} timed out.", query_id); let result = query.into_result(); match result.inner.info { - QueryInfo::Bootstrap { peer } => - Some(KademliaEvent::BootstrapResult(Err( - BootstrapError::Timeout { peer }))), + QueryInfo::Bootstrap { peer, mut remaining } => { + let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32); + + if let Some(mut remaining) = remaining.take() { + // Continue with the next bootstrap query if `remaining` is not empty. + if let Some(target) = remaining.next() { + let info = QueryInfo::Bootstrap { + peer: target.clone().into_preimage(), + remaining: Some(remaining) + }; + let peers = self.kbuckets.closest_keys(&target); + let inner = QueryInner::new(info); + self.queries.continue_iter_closest(query_id, target.clone(), peers, inner); + } + } - QueryInfo::PrepareAddProvider { key, context } => - Some(match context { - AddProviderContext::Publish => - KademliaEvent::StartProvidingResult(Err( - AddProviderError::Timeout { key })), - AddProviderContext::Republish => - KademliaEvent::RepublishProviderResult(Err( - AddProviderError::Timeout { key })), - }), + Some(KademliaEvent::QueryResult { + id: query_id, + stats: result.stats, + result: QueryResult::Bootstrap(Err( + BootstrapError::Timeout { peer, num_remaining } + )) + }) + } - QueryInfo::AddProvider { key, context, .. } => + QueryInfo::AddProvider { context, key, .. } => Some(match context { AddProviderContext::Publish => - KademliaEvent::StartProvidingResult(Err( - AddProviderError::Timeout { key })), + KademliaEvent::QueryResult { + id: query_id, + stats: result.stats, + result: QueryResult::StartProviding(Err( + AddProviderError::Timeout { key } + )) + }, AddProviderContext::Republish => - KademliaEvent::RepublishProviderResult(Err( - AddProviderError::Timeout { key })), + KademliaEvent::QueryResult { + id: query_id, + stats: result.stats, + result: QueryResult::RepublishProvider(Err( + AddProviderError::Timeout { key } + )) + } }), QueryInfo::GetClosestPeers { key } => { - Some(KademliaEvent::GetClosestPeersResult(Err( - GetClosestPeersError::Timeout { - key, - peers: result.peers.collect() - }))) + Some(KademliaEvent::QueryResult { + id: query_id, + stats: result.stats, + result: QueryResult::GetClosestPeers(Err( + GetClosestPeersError::Timeout { + key, + peers: result.peers.collect() + } + )) + }) }, - QueryInfo::PreparePutRecord { record, quorum, context, .. } => { + QueryInfo::PutRecord { record, quorum, context, phase } => { let err = Err(PutRecordError::Timeout { key: record.key, - num_results: 0, - quorum - }); - match context { - PutRecordContext::Publish => - Some(KademliaEvent::PutRecordResult(err)), - PutRecordContext::Republish => - Some(KademliaEvent::RepublishRecordResult(err)), - PutRecordContext::Replicate => { - warn!("Locating closest peers for replication failed: {:?}", err); - None + quorum, + num_results: match phase { + PutRecordPhase::GetClosestPeers => 0, + PutRecordPhase::PutRecord { num_results, .. } => num_results } - PutRecordContext::Cache => - // Caching a record at the closest peer to a key that did not return - // a record is never preceded by a lookup for the closest peers, i.e. - // it is a direct query to a single peer. - unreachable!() - } - } - - QueryInfo::PutRecord { record, quorum, num_results, context } => { - let err = Err(PutRecordError::Timeout { - key: record.key, - num_results, - quorum }); match context { PutRecordContext::Publish => - Some(KademliaEvent::PutRecordResult(err)), + Some(KademliaEvent::QueryResult { + id: query_id, + stats: result.stats, + result: QueryResult::PutRecord(err) + }), PutRecordContext::Republish => - Some(KademliaEvent::RepublishRecordResult(err)), - PutRecordContext::Replicate => { - debug!("Replicatiing record failed: {:?}", err); - None + Some(KademliaEvent::QueryResult { + id: query_id, + stats: result.stats, + result: QueryResult::RepublishRecord(err) + }), + PutRecordContext::Replicate => match phase { + PutRecordPhase::GetClosestPeers => { + warn!("Locating closest peers for replication failed: {:?}", err); + None + } + PutRecordPhase::PutRecord { .. } => { + debug!("Replicating record failed: {:?}", err); + None + } } - PutRecordContext::Cache => { - debug!("Caching record failed: {:?}", err); - None + PutRecordContext::Cache => match phase { + PutRecordPhase::GetClosestPeers => { + // Caching a record at the closest peer to a key that did not return + // a record is never preceded by a lookup for the closest peers, i.e. + // it is a direct query to a single peer. + unreachable!() + } + PutRecordPhase::PutRecord { .. } => { + debug!("Caching record failed: {:?}", err); + None + } } } } QueryInfo::GetRecord { key, records, quorum, .. } => - Some(KademliaEvent::GetRecordResult(Err( - GetRecordError::Timeout { key, records, quorum }))), + Some(KademliaEvent::QueryResult { + id: query_id, + stats: result.stats, + result: QueryResult::GetRecord(Err( + GetRecordError::Timeout { key, records, quorum } + )) + }), QueryInfo::GetProviders { key, providers } => - Some(KademliaEvent::GetProvidersResult(Err( - GetProvidersError::Timeout { - key, - providers, - closest_peers: result.peers.collect() - }))), - } + Some(KademliaEvent::QueryResult { + id: query_id, + stats: result.stats, + result: QueryResult::GetProviders(Err( + GetProvidersError::Timeout { + key, + providers, + closest_peers: result.peers.collect() + } + )) + }) + } } /// Processes a record received from a peer. @@ -1300,7 +1476,7 @@ where } = &mut query.inner.info { if let Some(record) = record { records.push(record); - if records.len() == quorum.get() { + if records.len() >= quorum.get() { query.finish() } } else if quorum.get() == 1 { @@ -1337,10 +1513,10 @@ where if let Some(query) = self.queries.get_mut(&user_data) { query.on_success(&source, vec![]); if let QueryInfo::PutRecord { - num_results, quorum, .. + phase: PutRecordPhase::PutRecord { num_results, .. }, quorum, .. } = &mut query.inner.info { *num_results += 1; - if *num_results == quorum.get() { + if *num_results >= quorum.get() { query.finish() } } @@ -1429,7 +1605,10 @@ where // better emit an event when the request has been sent (and report // an error if sending fails), instead of immediately reporting // "success" somewhat prematurely here. - if let QueryInfo::AddProvider { .. } = &query.inner.info { + if let QueryInfo::AddProvider { + phase: AddProviderPhase::AddProvider { .. }, + .. + } = &query.inner.info { query.on_success(&peer_id, vec![]) } if self.connected_peers.contains(&peer_id) { @@ -1488,29 +1667,15 @@ impl Quorum { /// See [`NetworkBehaviour::poll`]. #[derive(Debug)] pub enum KademliaEvent { - /// The result of [`Kademlia::bootstrap`]. - BootstrapResult(BootstrapResult), - - /// The result of [`Kademlia::get_closest_peers`]. - GetClosestPeersResult(GetClosestPeersResult), - - /// The result of [`Kademlia::get_providers`]. - GetProvidersResult(GetProvidersResult), - - /// The result of [`Kademlia::start_providing`]. - StartProvidingResult(AddProviderResult), - - /// The result of a (automatic) republishing of a provider record. - RepublishProviderResult(AddProviderResult), - - /// The result of [`Kademlia::get_record`]. - GetRecordResult(GetRecordResult), - - /// The result of [`Kademlia::put_record`]. - PutRecordResult(PutRecordResult), - - /// The result of a (automatic) republishing of a (value-)record. - RepublishRecordResult(PutRecordResult), + /// A query has produced a result. + QueryResult { + /// The ID of the query that finished. + id: QueryId, + /// The result of the query. + result: QueryResult, + /// Execution statistics from the query. + stats: QueryStats + }, /// A peer has been discovered during a query. Discovered { @@ -1543,6 +1708,34 @@ pub enum KademliaEvent { } } +/// The results of Kademlia queries. +#[derive(Debug)] +pub enum QueryResult { + /// The result of [`Kademlia::bootstrap`]. + Bootstrap(BootstrapResult), + + /// The result of [`Kademlia::get_closest_peers`]. + GetClosestPeers(GetClosestPeersResult), + + /// The result of [`Kademlia::get_providers`]. + GetProviders(GetProvidersResult), + + /// The result of [`Kademlia::start_providing`]. + StartProviding(AddProviderResult), + + /// The result of a (automatic) republishing of a provider record. + RepublishProvider(AddProviderResult), + + /// The result of [`Kademlia::get_record`]. + GetRecord(GetRecordResult), + + /// The result of [`Kademlia::put_record`]. + PutRecord(PutRecordResult), + + /// The result of a (automatic) republishing of a (value-)record. + RepublishRecord(PutRecordResult), +} + /// The result of [`Kademlia::get_record`]. pub type GetRecordResult = Result; @@ -1614,10 +1807,6 @@ pub enum PutRecordError { num_results: usize, quorum: NonZeroUsize }, - LocalStorageError { - key: record::Key, - cause: store::Error - } } impl PutRecordError { @@ -1626,7 +1815,6 @@ impl PutRecordError { match self { PutRecordError::QuorumFailed { key, .. } => key, PutRecordError::Timeout { key, .. } => key, - PutRecordError::LocalStorageError { key, .. } => key } } @@ -1636,7 +1824,6 @@ impl PutRecordError { match self { PutRecordError::QuorumFailed { key, .. } => key, PutRecordError::Timeout { key, .. } => key, - PutRecordError::LocalStorageError { key, .. } => key, } } } @@ -1647,13 +1834,17 @@ pub type BootstrapResult = Result; /// The successful result of [`Kademlia::bootstrap`]. #[derive(Debug, Clone)] pub struct BootstrapOk { - pub peer: PeerId + pub peer: PeerId, + pub num_remaining: u32, } /// The error result of [`Kademlia::bootstrap`]. #[derive(Debug, Clone)] pub enum BootstrapError { - Timeout { peer: PeerId } + Timeout { + peer: PeerId, + num_remaining: Option, + } } /// The result of [`Kademlia::get_closest_peers`]. @@ -1746,11 +1937,6 @@ pub enum AddProviderError { Timeout { key: record::Key, }, - /// The provider record could not be stored. - LocalStorageError { - key: record::Key, - cause: store::Error - } } impl AddProviderError { @@ -1758,16 +1944,13 @@ impl AddProviderError { pub fn key(&self) -> &record::Key { match self { AddProviderError::Timeout { key, .. } => key, - AddProviderError::LocalStorageError { key, .. } => key, } } /// Extracts the key for which the operation failed, - /// consuming the error. pub fn into_key(self) -> record::Key { match self { AddProviderError::Timeout { key, .. } => key, - AddProviderError::LocalStorageError { key, .. } => key, } } } @@ -1810,33 +1993,42 @@ impl QueryInner { } } +/// The context of a [`QueryInfo::AddProvider`] query. #[derive(Debug, Copy, Clone, PartialEq, Eq)] -enum AddProviderContext { +pub enum AddProviderContext { Publish, Republish, } +/// The context of a [`QueryInfo::PutRecord`] query. #[derive(Debug, Copy, Clone, PartialEq, Eq)] -enum PutRecordContext { +pub enum PutRecordContext { Publish, Republish, Replicate, Cache, } -/// The internal query state. -#[derive(Debug, Clone, PartialEq, Eq)] -enum QueryInfo { - /// A bootstrapping query. +/// Information about a running query. +#[derive(Debug, Clone)] +pub enum QueryInfo { + /// A query initiated by [`Kademlia::bootstrap`]. Bootstrap { /// The targeted peer ID. peer: PeerId, + /// The remaining random peer IDs to query, one per + /// bucket that still needs refreshing. + /// + /// This is `None` if the initial self-lookup has not + /// yet completed and `Some` with an exhausted iterator + /// if bootstrapping is complete. + remaining: Option>> }, - /// A query to find the closest peers to a key. + /// A query initiated by [`Kademlia::get_closest_peers`]. GetClosestPeers { key: Vec }, - /// A query for the providers of a key. + /// A query initiated by [`Kademlia::get_providers`]. GetProviders { /// The key for which to search for providers. key: record::Key, @@ -1844,49 +2036,39 @@ enum QueryInfo { providers: HashSet, }, - /// A query that searches for the closest closest nodes to a key to be - /// used in a subsequent `AddProvider` query. - PrepareAddProvider { - key: record::Key, - context: AddProviderContext, - }, - - /// A query that advertises the local node as a provider for a key. + /// A (repeated) query initiated by [`Kademlia::start_providing`]. AddProvider { + /// The record key. key: record::Key, - provider_id: PeerId, - external_addresses: Vec, + /// The current phase of the query. + phase: AddProviderPhase, + /// The execution context of the query. context: AddProviderContext, }, - /// A query that searches for the closest closest nodes to a key to be used - /// in a subsequent `PutValue` query. - PreparePutRecord { - record: Record, - quorum: NonZeroUsize, - context: PutRecordContext, - }, - - /// A query that replicates a record to other nodes. + /// A (repeated) query initiated by [`Kademlia::put_record`]. PutRecord { record: Record, + /// The expected quorum of responses w.r.t. the replication factor. quorum: NonZeroUsize, - num_results: usize, + /// The current phase of the query. + phase: PutRecordPhase, + /// The execution context of the query. context: PutRecordContext, }, - /// A query that searches for values for a key. + /// A query initiated by [`Kademlia::get_record`]. GetRecord { /// The key to look for. key: record::Key, - /// The records found. + /// The records found so far. records: Vec, /// The number of records to look for. quorum: NonZeroUsize, /// The closest peer to `key` that did not return a record. /// /// When a record is found in a standard Kademlia query (quorum == 1), - /// it is cached at this peer. + /// it is cached at this peer as soon as a record is found. cache_at: Option>, }, } @@ -1896,7 +2078,7 @@ impl QueryInfo { /// context of a query. fn to_request(&self, query_id: QueryId) -> KademliaHandlerIn { match &self { - QueryInfo::Bootstrap { peer } => KademliaHandlerIn::FindNodeReq { + QueryInfo::Bootstrap { peer, .. } => KademliaHandlerIn::FindNodeReq { key: peer.clone().into_bytes(), user_data: query_id, }, @@ -1908,35 +2090,135 @@ impl QueryInfo { key: key.clone(), user_data: query_id, }, - QueryInfo::PrepareAddProvider { key, .. } => KademliaHandlerIn::FindNodeReq { - key: key.to_vec(), - user_data: query_id, - }, - QueryInfo::AddProvider { - key, - provider_id, - external_addresses, - .. - } => KademliaHandlerIn::AddProvider { - key: key.clone(), - provider: crate::protocol::KadPeer { - node_id: provider_id.clone(), - multiaddrs: external_addresses.clone(), - connection_ty: crate::protocol::KadConnectionType::Connected, + QueryInfo::AddProvider { key, phase, .. } => match phase { + AddProviderPhase::GetClosestPeers => KademliaHandlerIn::FindNodeReq { + key: key.to_vec(), + user_data: query_id, + }, + AddProviderPhase::AddProvider { provider_id, external_addresses, .. } => { + KademliaHandlerIn::AddProvider { + key: key.clone(), + provider: crate::protocol::KadPeer { + node_id: provider_id.clone(), + multiaddrs: external_addresses.clone(), + connection_ty: crate::protocol::KadConnectionType::Connected, + } + } } }, QueryInfo::GetRecord { key, .. } => KademliaHandlerIn::GetRecord { key: key.clone(), user_data: query_id, }, - QueryInfo::PreparePutRecord { record, .. } => KademliaHandlerIn::FindNodeReq { - key: record.key.to_vec(), - user_data: query_id, - }, - QueryInfo::PutRecord { record, .. } => KademliaHandlerIn::PutRecord { - record: record.clone(), - user_data: query_id + QueryInfo::PutRecord { record, phase, .. } => match phase { + PutRecordPhase::GetClosestPeers => KademliaHandlerIn::FindNodeReq { + key: record.key.to_vec(), + user_data: query_id, + }, + PutRecordPhase::PutRecord { .. } => KademliaHandlerIn::PutRecord { + record: record.clone(), + user_data: query_id + } } } } } + +/// The phases of a [`QueryInfo::AddProvider`] query. +#[derive(Debug, Clone)] +pub enum AddProviderPhase { + /// The query is searching for the closest nodes to the record key. + GetClosestPeers, + + /// The query advertises the local node as a provider for the key to + /// the closest nodes to the key. + AddProvider { + /// The local peer ID that is advertised as a provider. + provider_id: PeerId, + /// The external addresses of the provider being advertised. + external_addresses: Vec, + /// Query statistics from the finished `GetClosestPeers` phase. + get_closest_peers_stats: QueryStats, + }, +} + +/// The phases of a [`QueryInfo::PutRecord`] query. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum PutRecordPhase { + /// The query is searching for the closest nodes to the record key. + GetClosestPeers, + + /// The query is replicating the record to the closest nodes to the key. + PutRecord { + /// The number of successful replication requests so far. + num_results: usize, + /// Query statistics from the finished `GetClosestPeers` phase. + get_closest_peers_stats: QueryStats, + }, +} + +/// A mutable reference to a running query. +pub struct QueryMut<'a> { + query: &'a mut Query, +} + +impl<'a> QueryMut<'a> { + pub fn id(&self) -> QueryId { + self.query.id() + } + + /// Gets information about the type and state of the query. + pub fn info(&self) -> &QueryInfo { + &self.query.inner.info + } + + /// Gets execution statistics about the query. + /// + /// For a multi-phase query such as `put_record`, these are the + /// statistics of the current phase. + pub fn stats(&self) -> &QueryStats { + self.query.stats() + } + + /// Finishes the query asap, without waiting for the + /// regular termination conditions. + pub fn finish(&mut self) { + self.query.finish() + } +} + +/// An immutable reference to a running query. +pub struct QueryRef<'a> { + query: &'a Query, +} + +impl<'a> QueryRef<'a> { + pub fn id(&self) -> QueryId { + self.query.id() + } + + /// Gets information about the type and state of the query. + pub fn info(&self) -> &QueryInfo { + &self.query.inner.info + } + + /// Gets execution statistics about the query. + /// + /// For a multi-phase query such as `put_record`, these are the + /// statistics of the current phase. + pub fn stats(&self) -> &QueryStats { + self.query.stats() + } +} + +/// An operation failed to due no known peers in the routing table. +#[derive(Debug, Clone)] +pub struct NoKnownPeers(); + +impl fmt::Display for NoKnownPeers { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "No known peers.") + } +} + +impl std::error::Error for NoKnownPeers {} diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index afeda26a98f..6e64f67a159 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -148,10 +148,11 @@ fn bootstrap() { .collect::>(); let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect(); - swarms[0].bootstrap(); + let qid = swarms[0].bootstrap().unwrap(); // Expected known peers let expected_known = swarm_ids.iter().skip(1).cloned().collect::>(); + let mut first = true; // Run test block_on( @@ -159,14 +160,23 @@ fn bootstrap() { for (i, swarm) in swarms.iter_mut().enumerate() { loop { match swarm.poll_next_unpin(ctx) { - Poll::Ready(Some(KademliaEvent::BootstrapResult(Ok(ok)))) => { + Poll::Ready(Some(KademliaEvent::QueryResult { + id, result: QueryResult::Bootstrap(Ok(ok)), .. + })) => { + assert_eq!(id, qid); assert_eq!(i, 0); - assert_eq!(ok.peer, swarm_ids[0]); - let known = swarm.kbuckets.iter() - .map(|e| e.node.key.preimage().clone()) - .collect::>(); - assert_eq!(expected_known, known); - return Poll::Ready(()) + if first { + // Bootstrapping must start with a self-lookup. + assert_eq!(ok.peer, swarm_ids[0]); + } + first = false; + if ok.num_remaining == 0 { + let known = swarm.kbuckets.iter() + .map(|e| e.node.key.preimage().clone()) + .collect::>(); + assert_eq!(expected_known, known); + return Poll::Ready(()) + } } // Ignore any other event. Poll::Ready(Some(_)) => (), @@ -206,7 +216,17 @@ fn query_iter() { // propagate forwards through the list of peers. let search_target = PeerId::random(); let search_target_key = kbucket::Key::new(search_target.clone()); - swarms[0].get_closest_peers(search_target.clone()); + let qid = swarms[0].get_closest_peers(search_target.clone()); + + match swarms[0].query(&qid) { + Some(q) => match q.info() { + QueryInfo::GetClosestPeers { key } => { + assert_eq!(&key[..], search_target.borrow() as &[u8]) + }, + i => panic!("Unexpected query info: {:?}", i) + } + None => panic!("Query not found: {:?}", qid) + } // Set up expectations. let expected_swarm_id = swarm_ids[0].clone(); @@ -220,7 +240,10 @@ fn query_iter() { for (i, swarm) in swarms.iter_mut().enumerate() { loop { match swarm.poll_next_unpin(ctx) { - Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => { + Poll::Ready(Some(KademliaEvent::QueryResult { + id, result: QueryResult::GetClosestPeers(Ok(ok)), .. + })) => { + assert_eq!(id, qid); assert_eq!(&ok.key[..], search_target.as_bytes()); assert_eq!(swarm_ids[i], expected_swarm_id); assert_eq!(swarm.queries.size(), 0); @@ -270,7 +293,9 @@ fn unresponsive_not_returned_direct() { for swarm in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { - Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => { + Poll::Ready(Some(KademliaEvent::QueryResult { + result: QueryResult::GetClosestPeers(Ok(ok)), .. + })) => { assert_eq!(&ok.key[..], search_target.as_bytes()); assert_eq!(ok.peers.len(), 0); return Poll::Ready(()); @@ -318,7 +343,9 @@ fn unresponsive_not_returned_indirect() { for swarm in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { - Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => { + Poll::Ready(Some(KademliaEvent::QueryResult { + result: QueryResult::GetClosestPeers(Ok(ok)), .. + })) => { assert_eq!(&ok.key[..], search_target.as_bytes()); assert_eq!(ok.peers.len(), 1); assert_eq!(ok.peers[0], first_peer_id); @@ -354,14 +381,17 @@ fn get_record_not_found() { let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::>(); let target_key = record::Key::from(random_multihash()); - swarms[0].get_record(&target_key, Quorum::One); + let qid = swarms[0].get_record(&target_key, Quorum::One); block_on( poll_fn(move |ctx| { for swarm in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { - Poll::Ready(Some(KademliaEvent::GetRecordResult(Err(e)))) => { + Poll::Ready(Some(KademliaEvent::QueryResult { + id, result: QueryResult::GetRecord(Err(e)), .. + })) => { + assert_eq!(id, qid); if let GetRecordError::NotFound { key, closest_peers, } = e { assert_eq!(key, target_key); assert_eq!(closest_peers.len(), 2); @@ -426,8 +456,23 @@ fn put_record() { }) .collect::>(); + // Initiate put_record queries. + let mut qids = HashSet::new(); for r in records.values() { - swarms[0].put_record(r.clone(), Quorum::All); + let qid = swarms[0].put_record(r.clone(), Quorum::All).unwrap(); + match swarms[0].query(&qid) { + Some(q) => match q.info() { + QueryInfo::PutRecord { phase, record, .. } => { + assert_eq!(phase, &PutRecordPhase::GetClosestPeers); + assert_eq!(record.key, r.key); + assert_eq!(record.value, r.value); + assert!(record.expires.is_some()); + qids.insert(qid); + }, + i => panic!("Unexpected query info: {:?}", i) + } + None => panic!("Query not found: {:?}", qid) + } } // Each test run republishes all records once. @@ -441,8 +486,17 @@ fn put_record() { for swarm in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { - Poll::Ready(Some(KademliaEvent::PutRecordResult(res))) | - Poll::Ready(Some(KademliaEvent::RepublishRecordResult(res))) => { + Poll::Ready(Some(KademliaEvent::QueryResult { + id, result: QueryResult::PutRecord(res), stats + })) | + Poll::Ready(Some(KademliaEvent::QueryResult { + id, result: QueryResult::RepublishRecord(res), stats + })) => { + assert!(qids.is_empty() || qids.remove(&id)); + assert!(stats.duration().is_some()); + assert!(stats.num_successes() >= replication_factor.get() as u32); + assert!(stats.num_requests() >= stats.num_successes()); + assert_eq!(stats.num_failures(), 0); match res { Err(e) => panic!("{:?}", e), Ok(ok) => { @@ -541,7 +595,7 @@ fn put_record() { } #[test] -fn get_value() { +fn get_record() { let mut swarms = build_nodes(3); // Let first peer know of second peer and second peer know of third peer. @@ -556,14 +610,17 @@ fn get_value() { let record = Record::new(random_multihash(), vec![4,5,6]); swarms[1].store.put(record.clone()).unwrap(); - swarms[0].get_record(&record.key, Quorum::One); + let qid = swarms[0].get_record(&record.key, Quorum::One); block_on( poll_fn(move |ctx| { for swarm in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { - Poll::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => { + Poll::Ready(Some(KademliaEvent::QueryResult { + id, result: QueryResult::GetRecord(Ok(ok)), .. + })) => { + assert_eq!(id, qid); assert_eq!(ok.records.len(), 1); assert_eq!(ok.records.first(), Some(&record)); return Poll::Ready(()); @@ -582,7 +639,7 @@ fn get_value() { } #[test] -fn get_value_many() { +fn get_record_many() { // TODO: Randomise let num_nodes = 12; let mut swarms = build_connected_nodes(num_nodes, 3).into_iter() @@ -597,14 +654,17 @@ fn get_value_many() { } let quorum = Quorum::N(NonZeroUsize::new(num_results).unwrap()); - swarms[0].get_record(&record.key, quorum); + let qid = swarms[0].get_record(&record.key, quorum); block_on( poll_fn(move |ctx| { for swarm in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { - Poll::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => { + Poll::Ready(Some(KademliaEvent::QueryResult { + id, result: QueryResult::GetRecord(Ok(ok)), .. + })) => { + assert_eq!(id, qid); assert_eq!(ok.records.len(), num_results); assert_eq!(ok.records.first(), Some(&record)); return Poll::Ready(()); @@ -661,8 +721,10 @@ fn add_provider() { let mut results = Vec::new(); // Initiate the first round of publishing. + let mut qids = HashSet::new(); for k in &keys { - swarms[0].start_providing(k.clone()); + let qid = swarms[0].start_providing(k.clone()).unwrap(); + qids.insert(qid); } block_on( @@ -671,8 +733,13 @@ fn add_provider() { for swarm in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { - Poll::Ready(Some(KademliaEvent::StartProvidingResult(res))) | - Poll::Ready(Some(KademliaEvent::RepublishProviderResult(res))) => { + Poll::Ready(Some(KademliaEvent::QueryResult { + id, result: QueryResult::StartProviding(res), .. + })) | + Poll::Ready(Some(KademliaEvent::QueryResult { + id, result: QueryResult::RepublishProvider(res), .. + })) => { + assert!(qids.is_empty() || qids.remove(&id)); match res { Err(e) => panic!(e), Ok(ok) => { @@ -773,7 +840,7 @@ fn exceed_jobs_max_queries() { let (_addr, mut swarm) = build_node(); let num = JOBS_MAX_QUERIES + 1; for _ in 0 .. num { - swarm.bootstrap(); + swarm.get_closest_peers(PeerId::random()); } assert_eq!(swarm.queries.size(), num); @@ -783,8 +850,10 @@ fn exceed_jobs_max_queries() { for _ in 0 .. num { // There are no other nodes, so the queries finish instantly. if let Poll::Ready(Some(e)) = swarm.poll_next_unpin(ctx) { - if let KademliaEvent::BootstrapResult(r) = e { - assert!(r.is_ok(), "Unexpected error") + if let KademliaEvent::QueryResult { + result: QueryResult::GetClosestPeers(Ok(r)), .. + } = e { + assert!(r.peers.is_empty()) } else { panic!("Unexpected event: {:?}", e) } diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs index 5da1fe07da3..aa13374f89d 100644 --- a/protocols/kad/src/lib.rs +++ b/protocols/kad/src/lib.rs @@ -41,6 +41,10 @@ mod dht_proto { pub use addresses::Addresses; pub use behaviour::{Kademlia, KademliaConfig, KademliaEvent, Quorum}; pub use behaviour::{ + QueryResult, + QueryInfo, + QueryStats, + BootstrapResult, BootstrapOk, BootstrapError, diff --git a/protocols/kad/src/query.rs b/protocols/kad/src/query.rs index 71d95f0d75d..706ca622d39 100644 --- a/protocols/kad/src/query.rs +++ b/protocols/kad/src/query.rs @@ -91,13 +91,38 @@ impl QueryPool { where I: IntoIterator { + let id = self.next_query_id(); + self.continue_fixed(id, peers, inner); + id + } + + /// Continues an earlier query with a fixed set of peers, reusing + /// the given query ID, which must be from a query that finished + /// earlier. + pub fn continue_fixed(&mut self, id: QueryId, peers: I, inner: TInner) + where + I: IntoIterator + { + assert!(!self.queries.contains_key(&id)); let parallelism = self.config.replication_factor.get(); let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism)); - self.add(peer_iter, inner) + let query = Query::new(id, peer_iter, inner); + self.queries.insert(id, query); } /// Adds a query to the pool that iterates towards the closest peers to the target. pub fn add_iter_closest(&mut self, target: T, peers: I, inner: TInner) -> QueryId + where + T: Into, + I: IntoIterator> + { + let id = self.next_query_id(); + self.continue_iter_closest(id, target, peers, inner); + id + } + + /// Adds a query to the pool that iterates towards the closest peers to the target. + pub fn continue_iter_closest(&mut self, id: QueryId, target: T, peers: I, inner: TInner) where T: Into, I: IntoIterator> @@ -107,14 +132,13 @@ impl QueryPool { .. ClosestPeersIterConfig::default() }; let peer_iter = QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers)); - self.add(peer_iter, inner) + let query = Query::new(id, peer_iter, inner); + self.queries.insert(id, query); } - fn add(&mut self, peer_iter: QueryPeerIter, inner: TInner) -> QueryId { + fn next_query_id(&mut self) -> QueryId { let id = QueryId(self.next_id); self.next_id = self.next_id.wrapping_add(1); - let query = Query::new(id, peer_iter, inner); - self.queries.insert(id, query); id } @@ -135,7 +159,7 @@ impl QueryPool { let mut waiting = None; for (&query_id, query) in self.queries.iter_mut() { - query.started = query.started.or(Some(now)); + query.stats.start = query.stats.start.or(Some(now)); match query.next(now) { PeersIterState::Finished => { finished = Some(query_id); @@ -147,7 +171,7 @@ impl QueryPool { break } PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => { - let elapsed = now - query.started.unwrap_or(now); + let elapsed = now - query.stats.start.unwrap_or(now); if elapsed >= self.config.timeout { timeout = Some(query_id); break @@ -162,12 +186,14 @@ impl QueryPool { } if let Some(query_id) = finished { - let query = self.queries.remove(&query_id).expect("s.a."); + let mut query = self.queries.remove(&query_id).expect("s.a."); + query.stats.end = Some(now); return QueryPoolState::Finished(query) } if let Some(query_id) = timeout { - let query = self.queries.remove(&query_id).expect("s.a."); + let mut query = self.queries.remove(&query_id).expect("s.a."); + query.stats.end = Some(now); return QueryPoolState::Timeout(query) } @@ -205,9 +231,8 @@ pub struct Query { id: QueryId, /// The peer iterator that drives the query state. peer_iter: QueryPeerIter, - /// The instant when the query started (i.e. began waiting for the first - /// result from a peer). - started: Option, + /// Execution statistics of the query. + stats: QueryStats, /// The opaque inner query state. pub inner: TInner, } @@ -221,7 +246,7 @@ enum QueryPeerIter { impl Query { /// Creates a new query without starting it. fn new(id: QueryId, peer_iter: QueryPeerIter, inner: TInner) -> Self { - Query { id, inner, peer_iter, started: None } + Query { id, inner, peer_iter, stats: QueryStats::empty() } } /// Gets the unique ID of the query. @@ -229,11 +254,19 @@ impl Query { self.id } + /// Gets the current execution statistics of the query. + pub fn stats(&self) -> &QueryStats { + &self.stats + } + /// Informs the query that the attempt to contact `peer` failed. pub fn on_failure(&mut self, peer: &PeerId) { - match &mut self.peer_iter { + let updated = match &mut self.peer_iter { QueryPeerIter::Closest(iter) => iter.on_failure(peer), QueryPeerIter::Fixed(iter) => iter.on_failure(peer) + }; + if updated { + self.stats.failure += 1; } } @@ -244,9 +277,12 @@ impl Query { where I: IntoIterator { - match &mut self.peer_iter { + let updated = match &mut self.peer_iter { QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers), QueryPeerIter::Fixed(iter) => iter.on_success(peer) + }; + if updated { + self.stats.success += 1; } } @@ -260,10 +296,16 @@ impl Query { /// Advances the state of the underlying peer iterator. fn next(&mut self, now: Instant) -> PeersIterState { - match &mut self.peer_iter { + let state = match &mut self.peer_iter { QueryPeerIter::Closest(iter) => iter.next(now), QueryPeerIter::Fixed(iter) => iter.next() + }; + + if let PeersIterState::Waiting(Some(_)) = state { + self.stats.requests += 1; } + + state } /// Finishes the query prematurely. @@ -277,13 +319,24 @@ impl Query { } } + /// Checks whether the query has finished. + /// + /// A finished query is eventually reported by `QueryPool::next()` and + /// removed from the pool. + pub fn is_finished(&self) -> bool { + match &self.peer_iter { + QueryPeerIter::Closest(iter) => iter.is_finished(), + QueryPeerIter::Fixed(iter) => iter.is_finished() + } + } + /// Consumes the query, producing the final `QueryResult`. pub fn into_result(self) -> QueryResult> { let peers = match self.peer_iter { QueryPeerIter::Closest(iter) => Either::Left(iter.into_result()), QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()) }; - QueryResult { inner: self.inner, peers } + QueryResult { peers, inner: self.inner, stats: self.stats } } } @@ -292,5 +345,90 @@ pub struct QueryResult { /// The opaque inner query state. pub inner: TInner, /// The successfully contacted peers. - pub peers: TPeers + pub peers: TPeers, + /// The collected query statistics. + pub stats: QueryStats +} + +/// Execution statistics of a query. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct QueryStats { + requests: u32, + success: u32, + failure: u32, + start: Option, + end: Option +} + +impl QueryStats { + pub fn empty() -> Self { + QueryStats { + requests: 0, + success: 0, + failure: 0, + start: None, + end: None, + } + } + + /// Gets the total number of requests initiated by the query. + pub fn num_requests(&self) -> u32 { + self.requests + } + + /// Gets the number of successful requests. + pub fn num_successes(&self) -> u32 { + self.success + } + + /// Gets the number of failed requests. + pub fn num_failures(&self) -> u32 { + self.failure + } + + /// Gets the number of pending requests. + /// + /// > **Note**: A query can finish while still having pending + /// > requests, if the termination conditions are already met. + pub fn num_pending(&self) -> u32 { + self.requests - (self.success + self.failure) + } + + /// Gets the duration of the query. + /// + /// If the query has not yet finished, the duration is measured from the + /// start of the query to the current instant. + /// + /// If the query did not yet start (i.e. yield the first peer to contact), + /// `None` is returned. + pub fn duration(&self) -> Option { + if let Some(s) = self.start { + if let Some(e) = self.end { + Some(e - s) + } else { + Some(Instant::now() - s) + } + } else { + None + } + } + + /// Merges these stats with the given stats of another query, + /// e.g. to accumulate statistics from a multi-phase query. + /// + /// Counters are merged cumulatively while the instants for + /// start and end of the queries are taken as the minimum and + /// maximum, respectively. + pub fn merge(self, other: QueryStats) -> Self { + QueryStats { + requests: self.requests + other.requests, + success: self.success + other.success, + failure: self.failure + other.failure, + start: match (self.start, other.start) { + (Some(a), Some(b)) => Some(std::cmp::min(a, b)), + (a, b) => a.or(b) + }, + end: std::cmp::max(self.end, other.end) + } + } } diff --git a/protocols/kad/src/query/peers/closest.rs b/protocols/kad/src/query/peers/closest.rs index 8879c58cb69..dda9b716349 100644 --- a/protocols/kad/src/query/peers/closest.rs +++ b/protocols/kad/src/query/peers/closest.rs @@ -122,8 +122,7 @@ impl ClosestPeersIter { } } - /// Callback for delivering the result of a successful request to a peer - /// that the iterator is waiting on. + /// Callback for delivering the result of a successful request to a peer. /// /// Delivering results of requests back to the iterator allows the iterator to make /// progress. The iterator is said to make progress either when the given @@ -131,18 +130,20 @@ impl ClosestPeersIter { /// or when the iterator did not yet accumulate `num_results` closest peers and /// `closer_peers` contains a new peer, regardless of its distance to the target. /// - /// After calling this function, `next` should eventually be called again - /// to advance the state of the iterator. + /// If the iterator is currently waiting for a result from `peer`, + /// the iterator state is updated and `true` is returned. In that + /// case, after calling this function, `next` should eventually be + /// called again to obtain the new state of the iterator. /// /// If the iterator is finished, it is not currently waiting for a /// result from `peer`, or a result for `peer` has already been reported, - /// calling this function has no effect. - pub fn on_success(&mut self, peer: &PeerId, closer_peers: I) + /// calling this function has no effect and `false` is returned. + pub fn on_success(&mut self, peer: &PeerId, closer_peers: I) -> bool where I: IntoIterator { if let State::Finished = self.state { - return + return false } let key = Key::from(peer.clone()); @@ -150,7 +151,7 @@ impl ClosestPeersIter { // Mark the peer as succeeded. match self.closest_peers.entry(distance) { - Entry::Vacant(..) => return, + Entry::Vacant(..) => return false, Entry::Occupied(mut e) => match e.get().state { PeerState::Waiting(..) => { debug_assert!(self.num_waiting > 0); @@ -162,7 +163,7 @@ impl ClosestPeersIter { } PeerState::NotContacted | PeerState::Failed - | PeerState::Succeeded => return + | PeerState::Succeeded => return false } } @@ -199,28 +200,31 @@ impl ClosestPeersIter { State::Stalled } State::Finished => State::Finished - } + }; + + true } - /// Callback for informing the iterator about a failed request to a peer - /// that the iterator is waiting on. + /// Callback for informing the iterator about a failed request to a peer. /// - /// After calling this function, `next` should eventually be called again - /// to advance the state of the iterator. + /// If the iterator is currently waiting for a result from `peer`, + /// the iterator state is updated and `true` is returned. In that + /// case, after calling this function, `next` should eventually be + /// called again to obtain the new state of the iterator. /// /// If the iterator is finished, it is not currently waiting for a /// result from `peer`, or a result for `peer` has already been reported, - /// calling this function has no effect. - pub fn on_failure(&mut self, peer: &PeerId) { + /// calling this function has no effect and `false` is returned. + pub fn on_failure(&mut self, peer: &PeerId) -> bool { if let State::Finished = self.state { - return + return false } let key = Key::from(peer.clone()); let distance = key.distance(&self.target); match self.closest_peers.entry(distance) { - Entry::Vacant(_) => return, + Entry::Vacant(_) => return false, Entry::Occupied(mut e) => match e.get().state { PeerState::Waiting(_) => { debug_assert!(self.num_waiting > 0); @@ -230,9 +234,13 @@ impl ClosestPeersIter { PeerState::Unresponsive => { e.get_mut().state = PeerState::Failed } - _ => {} + PeerState::NotContacted + | PeerState::Failed + | PeerState::Succeeded => return false } } + + true } /// Returns the list of peers for which the iterator is currently waiting @@ -343,7 +351,7 @@ impl ClosestPeersIter { } /// Checks whether the iterator has finished. - pub fn finished(&self) -> bool { + pub fn is_finished(&self) -> bool { self.state == State::Finished } @@ -649,7 +657,7 @@ mod tests { match iter.next(now) { PeersIterState::Waiting(Some(p)) => { let peer2 = p.into_owned(); - iter.on_success(&peer2, closer.clone()) + assert!(iter.on_success(&peer2, closer.clone())) } PeersIterState::Finished => {} _ => panic!("Unexpectedly iter state."), @@ -689,7 +697,7 @@ mod tests { Peer { state, .. } => panic!("Unexpected peer state: {:?}", state) } - let finished = iter.finished(); + let finished = iter.is_finished(); iter.on_success(&peer, iter::empty()); let closest = iter.into_result().collect::>(); diff --git a/protocols/kad/src/query/peers/fixed.rs b/protocols/kad/src/query/peers/fixed.rs index 402a4c2ba4f..edb86ef45c5 100644 --- a/protocols/kad/src/query/peers/fixed.rs +++ b/protocols/kad/src/query/peers/fixed.rs @@ -39,6 +39,7 @@ pub struct FixedPeersIter { state: State, } +#[derive(Debug, PartialEq, Eq)] enum State { Waiting { num_waiting: usize }, Finished @@ -71,22 +72,46 @@ impl FixedPeersIter { } } - pub fn on_success(&mut self, peer: &PeerId) { + /// Callback for delivering the result of a successful request to a peer. + /// + /// If the iterator is currently waiting for a result from `peer`, + /// the iterator state is updated and `true` is returned. In that + /// case, after calling this function, `next` should eventually be + /// called again to obtain the new state of the iterator. + /// + /// If the iterator is finished, it is not currently waiting for a + /// result from `peer`, or a result for `peer` has already been reported, + /// calling this function has no effect and `false` is returned. + pub fn on_success(&mut self, peer: &PeerId) -> bool { if let State::Waiting { num_waiting } = &mut self.state { if let Some(state @ PeerState::Waiting) = self.peers.get_mut(peer) { *state = PeerState::Succeeded; *num_waiting -= 1; + return true } } + false } - pub fn on_failure(&mut self, peer: &PeerId) { + /// Callback for informing the iterator about a failed request to a peer. + /// + /// If the iterator is currently waiting for a result from `peer`, + /// the iterator state is updated and `true` is returned. In that + /// case, after calling this function, `next` should eventually be + /// called again to obtain the new state of the iterator. + /// + /// If the iterator is finished, it is not currently waiting for a + /// result from `peer`, or a result for `peer` has already been reported, + /// calling this function has no effect and `false` is returned. + pub fn on_failure(&mut self, peer: &PeerId) -> bool { if let State::Waiting { num_waiting } = &mut self.state { if let Some(state @ PeerState::Waiting) = self.peers.get_mut(peer) { *state = PeerState::Failed; *num_waiting -= 1; + return true } } + false } pub fn is_waiting(&self, peer: &PeerId) -> bool { @@ -99,6 +124,11 @@ impl FixedPeersIter { } } + /// Checks whether the iterator has finished. + pub fn is_finished(&self) -> bool { + self.state == State::Finished + } + pub fn next(&mut self) -> PeersIterState { match &mut self.state { State::Finished => return PeersIterState::Finished,