diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml index b9cd82897c24..21863d0ed39f 100644 --- a/.github/workflows/docker-image.yml +++ b/.github/workflows/docker-image.yml @@ -30,7 +30,7 @@ jobs: images: ghcr.io/${{ github.repository }}-server - name: Build and push - uses: docker/build-push-action@v5 + uses: docker/build-push-action@v6 with: context: . file: ./misc/server/Dockerfile diff --git a/Cargo.lock b/Cargo.lock index efa620c9f412..9b25b574a46d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1212,16 +1212,15 @@ dependencies = [ [[package]] name = "curve25519-dalek" -version = "4.1.2" +version = "4.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a677b8922c94e01bdbb12126b0bc852f00447528dee1782229af9c720c3f348" +checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" dependencies = [ "cfg-if", "cpufeatures", "curve25519-dalek-derive", "digest 0.10.7", "fiat-crypto", - "platforms", "rustc_version", "subtle", "zeroize", @@ -2837,7 +2836,7 @@ dependencies = [ [[package]] name = "libp2p-identify" -version = "0.44.2" +version = "0.45.0" dependencies = [ "async-std", "asynchronous-codec", @@ -4355,12 +4354,6 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" -[[package]] -name = "platforms" -version = "3.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3d7ddaed09e0eb771a79ab0fd64609ba0afb0a8366421957936ad14cbd13630" - [[package]] name = "plotters" version = "0.3.5" diff --git a/Cargo.toml b/Cargo.toml index 3f7d8ab7c0d4..76bb4f18b3d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,7 @@ libp2p-dcutr = { version = "0.11.1", path = "protocols/dcutr" } libp2p-dns = { version = "0.41.1", path = "transports/dns" } libp2p-floodsub = { version = "0.44.0", path = "protocols/floodsub" } libp2p-gossipsub = { version = "0.46.2", path = "protocols/gossipsub" } -libp2p-identify = { version = "0.44.2", path = "protocols/identify" } +libp2p-identify = { version = "0.45.0", path = "protocols/identify" } libp2p-identity = { version = "0.2.9" } libp2p-kad = { version = "0.46.0", path = "protocols/kad" } libp2p-mdns = { version = "0.45.1", path = "protocols/mdns" } diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 633b627d41db..b7f3c3c4640a 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.41.3 +## 0.41.3 - Use `web-time` instead of `instant`. See [PR 5347](https://github.com/libp2p/rust-libp2p/pull/5347). diff --git a/core/Cargo.toml b/core/Cargo.toml index 6831eb54c94c..76021c151865 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -35,8 +35,8 @@ void = "1" [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } -libp2p-mplex = { path = "../muxers/mplex" } # Using `path` here because this is a cyclic dev-dependency which otherwise breaks releasing. -libp2p-noise = { path = "../transports/noise" } # Using `path` here because this is a cyclic dev-dependency which otherwise breaks releasing. +libp2p-mplex = { path = "../muxers/mplex" } # Using `path` here because this is a cyclic dev-dependency which otherwise breaks releasing. +libp2p-noise = { path = "../transports/noise" } # Using `path` here because this is a cyclic dev-dependency which otherwise breaks releasing. multihash = { workspace = true, features = ["arb"] } quickcheck = { workspace = true } libp2p-identity = { workspace = true, features = ["ed25519", "rand"] } diff --git a/core/src/lib.rs b/core/src/lib.rs index abb83481d6cf..3ba153e19273 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -56,6 +56,7 @@ pub mod transport; pub mod upgrade; pub use connection::{ConnectedPoint, Endpoint}; +pub use libp2p_identity::PeerId; pub use multiaddr::Multiaddr; pub use multihash; pub use muxing::StreamMuxer; diff --git a/libp2p/CHANGELOG.md b/libp2p/CHANGELOG.md index 23efbc21fd97..977d9f919247 100644 --- a/libp2p/CHANGELOG.md +++ b/libp2p/CHANGELOG.md @@ -2,6 +2,7 @@ - Update individual crates. - Update to [`libp2p-kad` `v0.46.0`](protocols/kad/CHANGELOG.md#0460). + - Update to [`libp2p-identify` `v0.45.0`](protocols/identify/CHANGELOG.md#0450). - Raise MSRV to 1.73. See [PR 5266](https://github.com/libp2p/rust-libp2p/pull/5266). diff --git a/misc/server/CHANGELOG.md b/misc/server/CHANGELOG.md index 254ab1d92be8..5369163460c7 100644 --- a/misc/server/CHANGELOG.md +++ b/misc/server/CHANGELOG.md @@ -4,6 +4,8 @@ - Use periodic and automatic bootstrap of Kademlia. See [PR 4838](https://github.com/libp2p/rust-libp2p/pull/4838). +- Update to [`libp2p-identify` `v0.45.0`](protocols/identify/CHANGELOG.md#0450). + See [PR 4981](https://github.com/libp2p/rust-libp2p/pull/4981). ## 0.12.6 diff --git a/misc/server/src/main.rs b/misc/server/src/main.rs index 6dfa035f3b10..eb28b9ad5c27 100644 --- a/misc/server/src/main.rs +++ b/misc/server/src/main.rs @@ -138,6 +138,7 @@ async fn main() -> Result<(), Box> { protocols, .. }, + .. } = e { if protocols.iter().any(|p| *p == kad::PROTOCOL_NAME) { diff --git a/protocols/identify/CHANGELOG.md b/protocols/identify/CHANGELOG.md index 83984448d073..b8ab3f8df507 100644 --- a/protocols/identify/CHANGELOG.md +++ b/protocols/identify/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.45.0 + +- Add `ConnectionId` in `Event`. + See [PR 4981](https://github.com/libp2p/rust-libp2p/pull/4981). + ## 0.44.2 - Emit `ToSwarm::NewExternalAddrOfPeer` for all external addresses of remote peers. diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 86425d580e93..320c8465650f 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-identify" edition = "2021" rust-version = { workspace = true } description = "Nodes identification protocol for libp2p" -version = "0.44.2" +version = "0.45.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 43bddb52fe7f..92a0dc461035 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -258,7 +258,7 @@ impl NetworkBehaviour for Behaviour { fn on_connection_handler_event( &mut self, peer_id: PeerId, - id: ConnectionId, + connection_id: ConnectionId, event: THandlerOutEvent, ) { match event { @@ -270,6 +270,7 @@ impl NetworkBehaviour for Behaviour { let observed = info.observed_addr.clone(); self.events .push_back(ToSwarm::GenerateEvent(Event::Received { + connection_id, peer_id, info: info.clone(), })); @@ -285,7 +286,7 @@ impl NetworkBehaviour for Behaviour { } } - match self.our_observed_addresses.entry(id) { + match self.our_observed_addresses.entry(connection_id) { Entry::Vacant(not_yet_observed) => { not_yet_observed.insert(observed.clone()); self.events @@ -298,7 +299,7 @@ impl NetworkBehaviour for Behaviour { tracing::info!( old_address=%already_observed.get(), new_address=%observed, - "Our observed address on connection {id} changed", + "Our observed address on connection {connection_id} changed", ); *already_observed.get_mut() = observed.clone(); @@ -308,16 +309,24 @@ impl NetworkBehaviour for Behaviour { } } handler::Event::Identification => { - self.events - .push_back(ToSwarm::GenerateEvent(Event::Sent { peer_id })); + self.events.push_back(ToSwarm::GenerateEvent(Event::Sent { + connection_id, + peer_id, + })); } handler::Event::IdentificationPushed(info) => { - self.events - .push_back(ToSwarm::GenerateEvent(Event::Pushed { peer_id, info })); + self.events.push_back(ToSwarm::GenerateEvent(Event::Pushed { + connection_id, + peer_id, + info, + })); } handler::Event::IdentificationError(error) => { - self.events - .push_back(ToSwarm::GenerateEvent(Event::Error { peer_id, error })); + self.events.push_back(ToSwarm::GenerateEvent(Event::Error { + connection_id, + peer_id, + error, + })); } } } @@ -415,6 +424,8 @@ impl NetworkBehaviour for Behaviour { pub enum Event { /// Identification information has been received from a peer. Received { + /// Identifier of the connection. + connection_id: ConnectionId, /// The peer that has been identified. peer_id: PeerId, /// The information provided by the peer. @@ -423,12 +434,16 @@ pub enum Event { /// Identification information of the local node has been sent to a peer in /// response to an identification request. Sent { + /// Identifier of the connection. + connection_id: ConnectionId, /// The peer that the information has been sent to. peer_id: PeerId, }, /// Identification information of the local node has been actively pushed to /// a peer. Pushed { + /// Identifier of the connection. + connection_id: ConnectionId, /// The peer that the information has been sent to. peer_id: PeerId, /// The full Info struct we pushed to the remote peer. Clients must @@ -437,6 +452,8 @@ pub enum Event { }, /// Error while attempting to identify the remote. Error { + /// Identifier of the connection. + connection_id: ConnectionId, /// The peer with whom the error originated. peer_id: PeerId, /// The error that occurred. @@ -444,6 +461,17 @@ pub enum Event { }, } +impl Event { + pub fn connection_id(&self) -> ConnectionId { + match self { + Event::Received { connection_id, .. } + | Event::Sent { connection_id, .. } + | Event::Pushed { connection_id, .. } + | Event::Error { connection_id, .. } => *connection_id, + } + } +} + /// If there is a given peer_id in the multiaddr, make sure it is the same as /// the given peer_id. If there is no peer_id for the peer in the mutiaddr, this returns true. fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool { diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 7e9e766f5568..f88484bafa18 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,5 +1,7 @@ ## 0.46.0 +- Included multiaddresses of found peers alongside peer IDs in `GetClosestPeers` query results. + See [PR 5475](https://github.com/libp2p/rust-libp2p/pull/5475) - Changed `FIND_NODE` response: now includes a list of closest peers when querying the recipient peer ID. Previously, this request yielded an empty response. See [PR 5270](https://github.com/libp2p/rust-libp2p/pull/5270) - Update to DHT republish interval and expiration time defaults to 22h and 48h respectively, rationale in [libp2p/specs#451](https://github.com/libp2p/specs/pull/451) diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index d09cea65860d..a00bf4e4040f 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -33,9 +33,8 @@ use crate::record::{ store::{self, RecordStore}, ProviderRecord, Record, }; -use crate::K_VALUE; use crate::{jobs::*, protocol}; -use fnv::{FnvHashMap, FnvHashSet}; +use fnv::FnvHashSet; use libp2p_core::{ConnectedPoint, Endpoint, Multiaddr}; use libp2p_identity::PeerId; use libp2p_swarm::behaviour::{ @@ -47,7 +46,6 @@ use libp2p_swarm::{ ListenAddresses, NetworkBehaviour, NotifyHandler, StreamProtocol, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; -use smallvec::SmallVec; use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::fmt; use std::num::NonZeroUsize; @@ -76,7 +74,7 @@ pub struct Behaviour { record_filtering: StoreInserts, /// The currently active (i.e. in-progress) queries. - queries: QueryPool, + queries: QueryPool, /// The currently connected peers. /// @@ -270,7 +268,7 @@ impl Config { /// Sets the replication factor to use. /// /// The replication factor determines to how many closest peers - /// a record is replicated. The default is [`K_VALUE`]. + /// a record is replicated. The default is [`crate::K_VALUE`]. pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self { self.query_config.replication_factor = replication_factor; self @@ -429,7 +427,7 @@ impl Config { /// Sets the time to wait before calling [`Behaviour::bootstrap`] after a new peer is inserted in the routing table. /// This prevent cascading bootstrap requests when multiple peers are inserted into the routing table "at the same time". /// This also allows to wait a little bit for other potential peers to be inserted into the routing table before - /// triggering a bootstrap, giving more context to the future bootstrap request. + /// triggering a bootstrap, giving more context to the future bootstrap request. /// /// * Default to `500` ms. /// * Set to `Some(Duration::ZERO)` to never wait before triggering a bootstrap request when a new peer @@ -726,8 +724,7 @@ where step: ProgressStep::first(), }; let peer_keys: Vec> = self.kbuckets.closest_keys(&target).collect(); - let inner = QueryInner::new(info); - self.queries.add_iter_closest(target, peer_keys, inner) + self.queries.add_iter_closest(target, peer_keys, info) } /// Returns closest peers to the given key; takes peers from local routing table only. @@ -776,8 +773,7 @@ where } }; let peers = self.kbuckets.closest_keys(&target); - let inner = QueryInner::new(info); - let id = self.queries.add_iter_closest(target.clone(), peers, inner); + let id = self.queries.add_iter_closest(target.clone(), peers, info); // No queries were actually done for the results yet. let stats = QueryStats::empty(); @@ -833,8 +829,7 @@ where quorum, phase: PutRecordPhase::GetClosestPeers, }; - let inner = QueryInner::new(info); - Ok(self.queries.add_iter_closest(target.clone(), peers, inner)) + Ok(self.queries.add_iter_closest(target.clone(), peers, info)) } /// Stores a record at specific peers, without storing it locally. @@ -879,8 +874,7 @@ where get_closest_peers_stats: QueryStats::empty(), }, }; - let inner = QueryInner::new(info); - self.queries.add_fixed(peers, inner) + self.queries.add_fixed(peers, info) } /// Removes the record with the given key from _local_ storage, @@ -944,8 +938,7 @@ where Err(NoKnownPeers()) } else { self.bootstrap_status.on_started(); - let inner = QueryInner::new(info); - Ok(self.queries.add_iter_closest(local_key, peers, inner)) + Ok(self.queries.add_iter_closest(local_key, peers, info)) } } @@ -990,8 +983,7 @@ where key, phase: AddProviderPhase::GetClosestPeers, }; - let inner = QueryInner::new(info); - let id = self.queries.add_iter_closest(target.clone(), peers, inner); + let id = self.queries.add_iter_closest(target.clone(), peers, info); Ok(id) } @@ -1031,8 +1023,7 @@ where let target = kbucket::Key::new(key.clone()); let peers = self.kbuckets.closest_keys(&target); - let inner = QueryInner::new(info); - let id = self.queries.add_iter_closest(target.clone(), peers, inner); + let id = self.queries.add_iter_closest(target.clone(), peers, info); // No queries were actually done for the results yet. let stats = QueryStats::empty(); @@ -1166,7 +1157,7 @@ where "Peer reported by source in query" ); let addrs = peer.multiaddrs.iter().cloned().collect(); - query.inner.addresses.insert(peer.node_id, addrs); + query.peers.addresses.insert(peer.node_id, addrs); } query.on_success(source, others_iter.cloned().map(|kp| kp.node_id)) } @@ -1255,8 +1246,7 @@ where }; let target = kbucket::Key::new(key); let peers = self.kbuckets.closest_keys(&target); - let inner = QueryInner::new(info); - self.queries.add_iter_closest(target.clone(), peers, inner); + self.queries.add_iter_closest(target.clone(), peers, info); } /// Starts an iterative `PUT_VALUE` query for the given record. @@ -1270,8 +1260,7 @@ where context, phase: PutRecordPhase::GetClosestPeers, }; - let inner = QueryInner::new(info); - self.queries.add_iter_closest(target.clone(), peers, inner); + self.queries.add_iter_closest(target.clone(), peers, info); } /// Updates the routing table with a new connection status and address of a peer. @@ -1403,11 +1392,10 @@ where } /// Handles a finished (i.e. successful) query. - fn query_finished(&mut self, q: Query) -> Option { + fn query_finished(&mut self, q: Query) -> Option { let query_id = q.id(); tracing::trace!(query=?query_id, "Query finished"); - let result = q.into_result(); - match result.inner.info { + match q.info { QueryInfo::Bootstrap { peer, remaining, @@ -1461,9 +1449,8 @@ where step: step.next(), }; let peers = self.kbuckets.closest_keys(&target); - let inner = QueryInner::new(info); self.queries - .continue_iter_closest(query_id, target, peers, inner); + .continue_iter_closest(query_id, target, peers, info); } else { step.last = true; self.bootstrap_status.on_finish(); @@ -1471,7 +1458,7 @@ where Some(Event::OutboundQueryProgressed { id: query_id, - stats: result.stats, + stats: q.stats, result: QueryResult::Bootstrap(Ok(BootstrapOk { peer, num_remaining, @@ -1485,10 +1472,10 @@ where Some(Event::OutboundQueryProgressed { id: query_id, - stats: result.stats, + stats: q.stats, result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk { key, - peers: result.peers.collect(), + peers: q.peers.into_peerinfos_iter().collect(), })), step, }) @@ -1499,10 +1486,10 @@ where Some(Event::OutboundQueryProgressed { id: query_id, - stats: result.stats, + stats: q.stats, result: QueryResult::GetProviders(Ok( GetProvidersOk::FinishedWithNoAdditionalRecord { - closest_peers: result.peers.collect(), + closest_peers: q.peers.into_peerids_iter().collect(), }, )), step, @@ -1516,16 +1503,17 @@ where } => { let provider_id = self.local_peer_id; let external_addresses = self.external_addresses.iter().cloned().collect(); - let inner = QueryInner::new(QueryInfo::AddProvider { + let info = QueryInfo::AddProvider { context, key, phase: AddProviderPhase::AddProvider { provider_id, external_addresses, - get_closest_peers_stats: result.stats, + get_closest_peers_stats: q.stats, }, - }); - self.queries.continue_fixed(query_id, result.peers, inner); + }; + self.queries + .continue_fixed(query_id, q.peers.into_peerids_iter(), info); None } @@ -1540,13 +1528,13 @@ where } => match context { AddProviderContext::Publish => Some(Event::OutboundQueryProgressed { id: query_id, - stats: get_closest_peers_stats.merge(result.stats), + stats: get_closest_peers_stats.merge(q.stats), result: QueryResult::StartProviding(Ok(AddProviderOk { key })), step: ProgressStep::first_and_last(), }), AddProviderContext::Republish => Some(Event::OutboundQueryProgressed { id: query_id, - stats: get_closest_peers_stats.merge(result.stats), + stats: get_closest_peers_stats.merge(q.stats), result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })), step: ProgressStep::first_and_last(), }), @@ -1565,12 +1553,12 @@ where } else { Err(GetRecordError::NotFound { key, - closest_peers: result.peers.collect(), + closest_peers: q.peers.into_peerids_iter().collect(), }) }; Some(Event::OutboundQueryProgressed { id: query_id, - stats: result.stats, + stats: q.stats, result: QueryResult::GetRecord(results), step, }) @@ -1588,11 +1576,11 @@ where quorum, phase: PutRecordPhase::PutRecord { success: vec![], - get_closest_peers_stats: result.stats, + get_closest_peers_stats: q.stats, }, }; - let inner = QueryInner::new(info); - self.queries.continue_fixed(query_id, result.peers, inner); + self.queries + .continue_fixed(query_id, q.peers.into_peerids_iter(), info); None } @@ -1621,14 +1609,14 @@ where PutRecordContext::Publish | PutRecordContext::Custom => { Some(Event::OutboundQueryProgressed { id: query_id, - stats: get_closest_peers_stats.merge(result.stats), + stats: get_closest_peers_stats.merge(q.stats), result: QueryResult::PutRecord(mk_result(record.key)), step: ProgressStep::first_and_last(), }) } PutRecordContext::Republish => Some(Event::OutboundQueryProgressed { id: query_id, - stats: get_closest_peers_stats.merge(result.stats), + stats: get_closest_peers_stats.merge(q.stats), result: QueryResult::RepublishRecord(mk_result(record.key)), step: ProgressStep::first_and_last(), }), @@ -1642,11 +1630,10 @@ where } /// Handles a query that timed out. - fn query_timeout(&mut self, query: Query) -> Option { + fn query_timeout(&mut self, query: Query) -> Option { let query_id = query.id(); tracing::trace!(query=?query_id, "Query timed out"); - let result = query.into_result(); - match result.inner.info { + match query.info { QueryInfo::Bootstrap { peer, mut remaining, @@ -1664,9 +1651,8 @@ where step: step.next(), }; let peers = self.kbuckets.closest_keys(&target); - let inner = QueryInner::new(info); self.queries - .continue_iter_closest(query_id, target, peers, inner); + .continue_iter_closest(query_id, target, peers, info); } else { step.last = true; self.bootstrap_status.on_finish(); @@ -1674,7 +1660,7 @@ where Some(Event::OutboundQueryProgressed { id: query_id, - stats: result.stats, + stats: query.stats, result: QueryResult::Bootstrap(Err(BootstrapError::Timeout { peer, num_remaining, @@ -1686,13 +1672,13 @@ where QueryInfo::AddProvider { context, key, .. } => Some(match context { AddProviderContext::Publish => Event::OutboundQueryProgressed { id: query_id, - stats: result.stats, + stats: query.stats, result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })), step: ProgressStep::first_and_last(), }, AddProviderContext::Republish => Event::OutboundQueryProgressed { id: query_id, - stats: result.stats, + stats: query.stats, result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })), step: ProgressStep::first_and_last(), }, @@ -1700,13 +1686,12 @@ where QueryInfo::GetClosestPeers { key, mut step } => { step.last = true; - Some(Event::OutboundQueryProgressed { id: query_id, - stats: result.stats, + stats: query.stats, result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout { key, - peers: result.peers.collect(), + peers: query.peers.into_peerinfos_iter().collect(), })), step, }) @@ -1730,14 +1715,14 @@ where PutRecordContext::Publish | PutRecordContext::Custom => { Some(Event::OutboundQueryProgressed { id: query_id, - stats: result.stats, + stats: query.stats, result: QueryResult::PutRecord(err), step: ProgressStep::first_and_last(), }) } PutRecordContext::Republish => Some(Event::OutboundQueryProgressed { id: query_id, - stats: result.stats, + stats: query.stats, result: QueryResult::RepublishRecord(err), step: ProgressStep::first_and_last(), }), @@ -1762,7 +1747,7 @@ where Some(Event::OutboundQueryProgressed { id: query_id, - stats: result.stats, + stats: query.stats, result: QueryResult::GetRecord(Err(GetRecordError::Timeout { key })), step, }) @@ -1773,10 +1758,10 @@ where Some(Event::OutboundQueryProgressed { id: query_id, - stats: result.stats, + stats: query.stats, result: QueryResult::GetProviders(Err(GetProvidersError::Timeout { key, - closest_peers: result.peers.collect(), + closest_peers: query.peers.into_peerids_iter().collect(), })), step, }) @@ -1974,7 +1959,7 @@ where } for query in self.queries.iter_mut() { - if let Some(addrs) = query.inner.addresses.get_mut(&peer_id) { + if let Some(addrs) = query.peers.addresses.get_mut(&peer_id) { addrs.retain(|a| a != address); } } @@ -2046,10 +2031,10 @@ where // // Given two connected nodes: local node A and remote node B. Say node B // is not in node A's routing table. Additionally node B is part of the - // `QueryInner::addresses` list of an ongoing query on node A. Say Node + // `Query::addresses` list of an ongoing query on node A. Say Node // B triggers an address change and then disconnects. Later on the // earlier mentioned query on node A would like to connect to node B. - // Without replacing the address in the `QueryInner::addresses` set node + // Without replacing the address in the `Query::addresses` set node // A would attempt to dial the old and not the new address. // // While upholding correctness, iterating through all discovered @@ -2057,7 +2042,7 @@ where // large performance impact. If so, the code below might be worth // revisiting. for query in self.queries.iter_mut() { - if let Some(addrs) = query.inner.addresses.get_mut(&peer) { + if let Some(addrs) = query.peers.addresses.get_mut(&peer) { for addr in addrs.iter_mut() { if addr == old { *addr = new.clone(); @@ -2132,11 +2117,10 @@ where // Queue events for sending pending RPCs to the connected peer. // There can be only one pending RPC for a particular peer and query per definition. for (_peer_id, event) in self.queries.iter_mut().filter_map(|q| { - q.inner - .pending_rpcs + q.pending_rpcs .iter() .position(|(p, _)| p == &peer) - .map(|p| q.inner.pending_rpcs.remove(p)) + .map(|p| q.pending_rpcs.remove(p)) }) { handler.on_behaviour_event(event) } @@ -2227,7 +2211,7 @@ where // We add to that a temporary list of addresses from the ongoing queries. for query in self.queries.iter() { - if let Some(addrs) = query.inner.addresses.get(&peer_id) { + if let Some(addrs) = query.peers.addresses.get(&peer_id) { peer_addrs.extend(addrs.iter().cloned()) } } @@ -2328,7 +2312,7 @@ where ref mut providers_found, ref mut step, .. - } = query.inner.info + } = query.info { *providers_found += provider_peers.len(); let providers = provider_peers.iter().map(|p| p.node_id).collect(); @@ -2420,7 +2404,7 @@ where ref mut step, ref mut found_a_record, cache_candidates, - } = &mut query.inner.info + } = &mut query.info { if let Some(record) = record { *found_a_record = true; @@ -2474,7 +2458,7 @@ where phase: PutRecordPhase::PutRecord { success, .. }, quorum, .. - } = &mut query.inner.info + } = &mut query.info { success.push(source); @@ -2586,7 +2570,7 @@ where } } QueryPoolState::Waiting(Some((query, peer_id))) => { - let event = query.inner.info.to_request(query.id()); + let event = query.info.to_request(query.id()); // TODO: AddProvider requests yield no response, so the query completes // as soon as all requests have been sent. However, the handler should // better emit an event when the request has been sent (and report @@ -2595,7 +2579,7 @@ where if let QueryInfo::AddProvider { phase: AddProviderPhase::AddProvider { .. }, .. - } = &query.inner.info + } = &query.info { query.on_success(&peer_id, vec![]) } @@ -2607,7 +2591,7 @@ where handler: NotifyHandler::Any, }); } else if &peer_id != self.kbuckets.local_key().preimage() { - query.inner.pending_rpcs.push((peer_id, event)); + query.pending_rpcs.push((peer_id, event)); self.queued_events.push_back(ToSwarm::Dial { opts: DialOpts::peer_id(peer_id).build(), }); @@ -2656,6 +2640,13 @@ where } } +/// Peer Info combines a Peer ID with a set of multiaddrs that the peer is listening on. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PeerInfo { + pub peer_id: PeerId, + pub addrs: Vec, +} + /// A quorum w.r.t. the configured replication factor specifies the minimum /// number of distinct nodes that must be successfully contacted in order /// for a query to succeed. @@ -3000,14 +2991,14 @@ pub type GetClosestPeersResult = Result #[derive(Debug, Clone)] pub struct GetClosestPeersOk { pub key: Vec, - pub peers: Vec, + pub peers: Vec, } /// The error result of [`Behaviour::get_closest_peers`]. #[derive(Debug, Clone, Error)] pub enum GetClosestPeersError { #[error("the request timed out")] - Timeout { key: Vec, peers: Vec }, + Timeout { key: Vec, peers: Vec }, } impl GetClosestPeersError { @@ -3115,31 +3106,6 @@ impl From, Addresses>> for KadPeer { } } -////////////////////////////////////////////////////////////////////////////// -// Internal query state - -struct QueryInner { - /// The query-specific state. - info: QueryInfo, - /// Addresses of peers discovered during a query. - addresses: FnvHashMap>, - /// A map of pending requests to peers. - /// - /// A request is pending if the targeted peer is not currently connected - /// and these requests are sent as soon as a connection to the peer is established. - pending_rpcs: SmallVec<[(PeerId, HandlerIn); K_VALUE.get()]>, -} - -impl QueryInner { - fn new(info: QueryInfo) -> Self { - QueryInner { - info, - addresses: Default::default(), - pending_rpcs: SmallVec::default(), - } - } -} - /// The context of a [`QueryInfo::AddProvider`] query. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum AddProviderContext { @@ -3325,7 +3291,7 @@ pub enum PutRecordPhase { /// A mutable reference to a running query. pub struct QueryMut<'a> { - query: &'a mut Query, + query: &'a mut Query, } impl<'a> QueryMut<'a> { @@ -3335,7 +3301,7 @@ impl<'a> QueryMut<'a> { /// Gets information about the type and state of the query. pub fn info(&self) -> &QueryInfo { - &self.query.inner.info + &self.query.info } /// Gets execution statistics about the query. @@ -3355,7 +3321,7 @@ impl<'a> QueryMut<'a> { /// An immutable reference to a running query. pub struct QueryRef<'a> { - query: &'a Query, + query: &'a Query, } impl<'a> QueryRef<'a> { @@ -3365,7 +3331,7 @@ impl<'a> QueryRef<'a> { /// Gets information about the type and state of the query. pub fn info(&self) -> &QueryInfo { - &self.query.inner.info + &self.query.info } /// Gets execution statistics about the query. diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 7005f39e5e67..b82ec966f896 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -23,7 +23,7 @@ use super::*; use crate::record::{store::MemoryStore, Key}; -use crate::{PROTOCOL_NAME, SHA_256_MH}; +use crate::{K_VALUE, PROTOCOL_NAME, SHA_256_MH}; use futures::{executor::block_on, future::poll_fn, prelude::*}; use futures_timer::Delay; use libp2p_core::{ @@ -294,9 +294,11 @@ fn query_iter() { assert_eq!(&ok.key[..], search_target.to_bytes().as_slice()); assert_eq!(swarm_ids[i], expected_swarm_id); assert_eq!(swarm.behaviour_mut().queries.size(), 0); - assert!(expected_peer_ids.iter().all(|p| ok.peers.contains(p))); + let peer_ids = + ok.peers.into_iter().map(|p| p.peer_id).collect::>(); + assert!(expected_peer_ids.iter().all(|p| peer_ids.contains(p))); let key = kbucket::Key::new(ok.key); - assert_eq!(expected_distances, distances(&key, ok.peers)); + assert_eq!(expected_distances, distances(&key, peer_ids)); return Poll::Ready(()); } // Ignore any other event. @@ -408,7 +410,7 @@ fn unresponsive_not_returned_indirect() { }))) => { assert_eq!(&ok.key[..], search_target.to_bytes().as_slice()); assert_eq!(ok.peers.len(), 1); - assert_eq!(ok.peers[0], first_peer_id); + assert_eq!(ok.peers[0].peer_id, first_peer_id); return Poll::Ready(()); } // Ignore any other event. @@ -1185,7 +1187,7 @@ fn disjoint_query_does_not_finish_before_all_paths_did() { .behaviour() .queries .iter() - .for_each(|q| match &q.inner.info { + .for_each(|q| match &q.info { QueryInfo::GetRecord { step, .. } => { assert_eq!(usize::from(step.count), 2); } diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs index bb7c2ace663e..681d135f79b8 100644 --- a/protocols/kad/src/lib.rs +++ b/protocols/kad/src/lib.rs @@ -59,9 +59,9 @@ pub use behaviour::{ AddProviderContext, AddProviderError, AddProviderOk, AddProviderPhase, AddProviderResult, BootstrapError, BootstrapOk, BootstrapResult, GetClosestPeersError, GetClosestPeersOk, GetClosestPeersResult, GetProvidersError, GetProvidersOk, GetProvidersResult, GetRecordError, - GetRecordOk, GetRecordResult, InboundRequest, Mode, NoKnownPeers, PeerRecord, PutRecordContext, - PutRecordError, PutRecordOk, PutRecordPhase, PutRecordResult, QueryInfo, QueryMut, QueryRef, - QueryResult, QueryStats, RoutingUpdate, + GetRecordOk, GetRecordResult, InboundRequest, Mode, NoKnownPeers, PeerInfo, PeerRecord, + PutRecordContext, PutRecordError, PutRecordOk, PutRecordPhase, PutRecordResult, QueryInfo, + QueryMut, QueryRef, QueryResult, QueryStats, RoutingUpdate, }; pub use behaviour::{ Behaviour, BucketInserts, Caching, Config, Event, ProgressStep, Quorum, StoreInserts, diff --git a/protocols/kad/src/query.rs b/protocols/kad/src/query.rs index cf102040a7a7..c598bac012ee 100644 --- a/protocols/kad/src/query.rs +++ b/protocols/kad/src/query.rs @@ -20,14 +20,18 @@ mod peers; +use libp2p_core::Multiaddr; use peers::closest::{ disjoint::ClosestDisjointPeersIter, ClosestPeersIter, ClosestPeersIterConfig, }; use peers::fixed::FixedPeersIter; use peers::PeersIterState; +use smallvec::SmallVec; +use crate::behaviour::PeerInfo; +use crate::handler::HandlerIn; use crate::kbucket::{Key, KeyBytes}; -use crate::{ALPHA_VALUE, K_VALUE}; +use crate::{QueryInfo, ALPHA_VALUE, K_VALUE}; use either::Either; use fnv::FnvHashMap; use libp2p_identity::PeerId; @@ -39,26 +43,26 @@ use web_time::Instant; /// Internally, a `Query` is in turn driven by an underlying `QueryPeerIter` /// that determines the peer selection strategy, i.e. the order in which the /// peers involved in the query should be contacted. -pub(crate) struct QueryPool { +pub(crate) struct QueryPool { next_id: usize, config: QueryConfig, - queries: FnvHashMap>, + queries: FnvHashMap, } /// The observable states emitted by [`QueryPool::poll`]. -pub(crate) enum QueryPoolState<'a, TInner> { +pub(crate) enum QueryPoolState<'a> { /// The pool is idle, i.e. there are no queries to process. Idle, /// At least one query is waiting for results. `Some(request)` indicates /// that a new request is now being waited on. - Waiting(Option<(&'a mut Query, PeerId)>), + Waiting(Option<(&'a mut Query, PeerId)>), /// A query has finished. - Finished(Query), + Finished(Query), /// A query has timed out. - Timeout(Query), + Timeout(Query), } -impl QueryPool { +impl QueryPool { /// Creates a new `QueryPool` with the given configuration. pub(crate) fn new(config: QueryConfig) -> Self { QueryPool { @@ -74,7 +78,7 @@ impl QueryPool { } /// Returns an iterator over the queries in the pool. - pub(crate) fn iter(&self) -> impl Iterator> { + pub(crate) fn iter(&self) -> impl Iterator { self.queries.values() } @@ -84,42 +88,42 @@ impl QueryPool { } /// Returns an iterator that allows modifying each query in the pool. - pub(crate) fn iter_mut(&mut self) -> impl Iterator> { + pub(crate) fn iter_mut(&mut self) -> impl Iterator { self.queries.values_mut() } /// Adds a query to the pool that contacts a fixed set of peers. - pub(crate) fn add_fixed(&mut self, peers: I, inner: TInner) -> QueryId + pub(crate) fn add_fixed(&mut self, peers: I, info: QueryInfo) -> QueryId where I: IntoIterator, { let id = self.next_query_id(); - self.continue_fixed(id, peers, inner); + self.continue_fixed(id, peers, info); id } /// Continues an earlier query with a fixed set of peers, reusing /// the given query ID, which must be from a query that finished /// earlier. - pub(crate) fn continue_fixed(&mut self, id: QueryId, peers: I, inner: TInner) + pub(crate) fn continue_fixed(&mut self, id: QueryId, peers: I, info: QueryInfo) where I: IntoIterator, { assert!(!self.queries.contains_key(&id)); let parallelism = self.config.replication_factor; let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism)); - let query = Query::new(id, peer_iter, inner); + let query = Query::new(id, peer_iter, info); self.queries.insert(id, query); } /// Adds a query to the pool that iterates towards the closest peers to the target. - pub(crate) fn add_iter_closest(&mut self, target: T, peers: I, inner: TInner) -> QueryId + pub(crate) fn add_iter_closest(&mut self, target: T, peers: I, info: QueryInfo) -> QueryId where T: Into + Clone, I: IntoIterator>, { let id = self.next_query_id(); - self.continue_iter_closest(id, target, peers, inner); + self.continue_iter_closest(id, target, peers, info); id } @@ -129,7 +133,7 @@ impl QueryPool { id: QueryId, target: T, peers: I, - inner: TInner, + info: QueryInfo, ) where T: Into + Clone, I: IntoIterator>, @@ -148,7 +152,7 @@ impl QueryPool { QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers)) }; - let query = Query::new(id, peer_iter, inner); + let query = Query::new(id, peer_iter, info); self.queries.insert(id, query); } @@ -159,17 +163,17 @@ impl QueryPool { } /// Returns a reference to a query with the given ID, if it is in the pool. - pub(crate) fn get(&self, id: &QueryId) -> Option<&Query> { + pub(crate) fn get(&self, id: &QueryId) -> Option<&Query> { self.queries.get(id) } /// Returns a mutablereference to a query with the given ID, if it is in the pool. - pub(crate) fn get_mut(&mut self, id: &QueryId) -> Option<&mut Query> { + pub(crate) fn get_mut(&mut self, id: &QueryId) -> Option<&mut Query> { self.queries.get_mut(id) } /// Polls the pool to advance the queries. - pub(crate) fn poll(&mut self, now: Instant) -> QueryPoolState<'_, TInner> { + pub(crate) fn poll(&mut self, now: Instant) -> QueryPoolState<'_> { let mut finished = None; let mut timeout = None; let mut waiting = None; @@ -264,15 +268,53 @@ impl Default for QueryConfig { } /// A query in a `QueryPool`. -pub(crate) struct Query { +pub(crate) struct Query { /// The unique ID of the query. id: QueryId, /// The peer iterator that drives the query state. - peer_iter: QueryPeerIter, + pub(crate) peers: QueryPeers, /// Execution statistics of the query. - stats: QueryStats, - /// The opaque inner query state. - pub(crate) inner: TInner, + pub(crate) stats: QueryStats, + /// The query-specific state. + pub(crate) info: QueryInfo, + /// A map of pending requests to peers. + /// + /// A request is pending if the targeted peer is not currently connected + /// and these requests are sent as soon as a connection to the peer is established. + pub(crate) pending_rpcs: SmallVec<[(PeerId, HandlerIn); K_VALUE.get()]>, +} + +/// The peer iterator that drives the query state, +pub(crate) struct QueryPeers { + /// Addresses of peers discovered during a query. + pub(crate) addresses: FnvHashMap>, + /// The peer iterator that drives the query state. + peer_iter: QueryPeerIter, +} + +impl QueryPeers { + /// Consumes the peers iterator, producing a final `Iterator` over the discovered `PeerId`s. + pub(crate) fn into_peerids_iter(self) -> impl Iterator { + match self.peer_iter { + QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())), + QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())), + QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()), + } + } + + /// Consumes the peers iterator, producing a final `Iterator` over the discovered `PeerId`s + /// with their matching `Multiaddr`s. + pub(crate) fn into_peerinfos_iter(mut self) -> impl Iterator { + match self.peer_iter { + QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())), + QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())), + QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()), + } + .map(move |peer_id| { + let addrs = self.addresses.remove(&peer_id).unwrap_or_default().to_vec(); + PeerInfo { peer_id, addrs } + }) + } } /// The peer selection strategies that can be used by queries. @@ -282,13 +324,17 @@ enum QueryPeerIter { Fixed(FixedPeersIter), } -impl Query { +impl Query { /// Creates a new query without starting it. - fn new(id: QueryId, peer_iter: QueryPeerIter, inner: TInner) -> Self { + fn new(id: QueryId, peer_iter: QueryPeerIter, info: QueryInfo) -> Self { Query { id, - inner, - peer_iter, + info, + peers: QueryPeers { + addresses: Default::default(), + peer_iter, + }, + pending_rpcs: SmallVec::default(), stats: QueryStats::empty(), } } @@ -305,7 +351,7 @@ impl Query { /// Informs the query that the attempt to contact `peer` failed. pub(crate) fn on_failure(&mut self, peer: &PeerId) { - let updated = match &mut self.peer_iter { + let updated = match &mut self.peers.peer_iter { QueryPeerIter::Closest(iter) => iter.on_failure(peer), QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer), QueryPeerIter::Fixed(iter) => iter.on_failure(peer), @@ -322,7 +368,7 @@ impl Query { where I: IntoIterator, { - let updated = match &mut self.peer_iter { + let updated = match &mut self.peers.peer_iter { QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers), QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, new_peers), QueryPeerIter::Fixed(iter) => iter.on_success(peer), @@ -334,7 +380,7 @@ impl Query { /// Advances the state of the underlying peer iterator. fn next(&mut self, now: Instant) -> PeersIterState<'_> { - let state = match &mut self.peer_iter { + let state = match &mut self.peers.peer_iter { QueryPeerIter::Closest(iter) => iter.next(now), QueryPeerIter::ClosestDisjoint(iter) => iter.next(now), QueryPeerIter::Fixed(iter) => iter.next(), @@ -368,7 +414,7 @@ impl Query { where I: IntoIterator, { - match &mut self.peer_iter { + match &mut self.peers.peer_iter { QueryPeerIter::Closest(iter) => { iter.finish(); true @@ -386,7 +432,7 @@ impl Query { /// A finished query immediately stops yielding new peers to contact and will be /// reported by [`QueryPool::poll`] via [`QueryPoolState::Finished`]. pub(crate) fn finish(&mut self) { - match &mut self.peer_iter { + match &mut self.peers.peer_iter { QueryPeerIter::Closest(iter) => iter.finish(), QueryPeerIter::ClosestDisjoint(iter) => iter.finish(), QueryPeerIter::Fixed(iter) => iter.finish(), @@ -398,36 +444,12 @@ impl Query { /// A finished query is eventually reported by `QueryPool::next()` and /// removed from the pool. pub(crate) fn is_finished(&self) -> bool { - match &self.peer_iter { + match &self.peers.peer_iter { QueryPeerIter::Closest(iter) => iter.is_finished(), QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(), QueryPeerIter::Fixed(iter) => iter.is_finished(), } } - - /// Consumes the query, producing the final `QueryResult`. - pub(crate) fn into_result(self) -> QueryResult> { - let peers = match self.peer_iter { - QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())), - QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())), - QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()), - }; - QueryResult { - peers, - inner: self.inner, - stats: self.stats, - } - } -} - -/// The result of a `Query`. -pub(crate) struct QueryResult { - /// The opaque inner query state. - pub(crate) inner: TInner, - /// The successfully contacted peers. - pub(crate) peers: TPeers, - /// The collected query statistics. - pub(crate) stats: QueryStats, } /// Execution statistics of a query.