diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs
index be95ed6de53bf..cd98c26809644 100644
--- a/bin/node/cli/src/service.rs
+++ b/bin/node/cli/src/service.rs
@@ -266,7 +266,7 @@ pub fn new_full_base(
Event::Dht(e) => Some(e),
_ => None,
}}).boxed();
- let authority_discovery = sc_authority_discovery::AuthorityDiscovery::new(
+ let (authority_discovery_worker, _service) = sc_authority_discovery::new_worker_and_service(
client.clone(),
network.clone(),
sentries,
@@ -275,7 +275,7 @@ pub fn new_full_base(
prometheus_registry.clone(),
);
- task_manager.spawn_handle().spawn("authority-discovery", authority_discovery);
+ task_manager.spawn_handle().spawn("authority-discovery-worker", authority_discovery_worker);
}
// if the node isn't actively participating in consensus then it doesn't
diff --git a/client/authority-discovery/build.rs b/client/authority-discovery/build.rs
index ed632575f3ba8..c44fe8578ba25 100644
--- a/client/authority-discovery/build.rs
+++ b/client/authority-discovery/build.rs
@@ -1,3 +1,3 @@
fn main() {
- prost_build::compile_protos(&["src/schema/dht.proto"], &["src/schema"]).unwrap();
+ prost_build::compile_protos(&["src/worker/schema/dht.proto"], &["src/worker/schema"]).unwrap();
}
diff --git a/client/authority-discovery/src/addr_cache.rs b/client/authority-discovery/src/addr_cache.rs
deleted file mode 100644
index f108afce0a92f..0000000000000
--- a/client/authority-discovery/src/addr_cache.rs
+++ /dev/null
@@ -1,205 +0,0 @@
-// Copyright 2019-2020 Parity Technologies (UK) Ltd.
-// This file is part of Substrate.
-
-// Substrate is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Substrate is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Substrate. If not, see .
-
-use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
-use std::{
- clone::Clone,
- cmp::{Eq, Ord, PartialEq},
- collections::BTreeMap,
- convert::AsRef,
- hash::Hash,
-};
-
-/// The maximum number of authority connections initialized through the authority discovery module.
-///
-/// In other words the maximum size of the `authority` peer set priority group.
-const MAX_NUM_AUTHORITY_CONN: usize = 10;
-
-/// Cache of Multiaddresses of authority nodes or their sentry nodes.
-//
-// The network peerset interface for priority groups lets us only set an entire group, but we
-// retrieve the addresses of other authorities one by one from the network. To use the peerset
-// interface we need to cache the addresses and always overwrite the entire peerset priority
-// group. To ensure this map doesn't grow indefinitely `purge_old_authorities_from_cache`
-// function is called each time we add a new entry.
-pub(super) struct AddrCache {
- cache: BTreeMap>,
-
- /// Random number to seed address selection RNG.
- ///
- /// A node should only try to connect to a subset of all authorities. To choose this subset one
- /// uses randomness. The choice should differ between nodes to prevent hot spots, but not within
- /// each node between each update to prevent connection churn. Thus before each selection we
- /// seed an RNG with the same seed.
- rand_addr_selection_seed: u64,
-}
-
-impl AddrCache
-where
- Id: Clone + Eq + Hash + Ord,
- Addr: Clone + PartialEq + AsRef<[u8]>,
-{
- pub fn new() -> Self {
- AddrCache {
- cache: BTreeMap::new(),
- rand_addr_selection_seed: rand::thread_rng().gen(),
- }
- }
-
- pub fn insert(&mut self, id: Id, mut addresses: Vec) {
- if addresses.is_empty() {
- return;
- }
-
- addresses.sort_by(|a, b| a.as_ref().cmp(b.as_ref()));
- self.cache.insert(id, addresses);
- }
-
- /// Returns the number of authority IDs in the cache.
- pub fn num_ids(&self) -> usize {
- self.cache.len()
- }
-
- // Each node should connect to a subset of all authorities. In order to prevent hot spots, this
- // selection is based on randomness. Selecting randomly each time we alter the address cache
- // would result in connection churn. To reduce this churn a node generates a seed on startup and
- // uses this seed for a new rng on each update. (One could as well use ones peer id as a seed.
- // Given that the peer id is publicly known, it would make this process predictable by others,
- // which might be used as an attack.)
- pub fn get_subset(&self) -> Vec {
- let mut rng = StdRng::seed_from_u64(self.rand_addr_selection_seed);
-
- let mut addresses = self
- .cache
- .iter()
- .map(|(_peer_id, addresses)| {
- addresses
- .choose(&mut rng)
- .expect("an empty address vector is never inserted into the cache")
- })
- .cloned()
- .collect::>();
-
- addresses.dedup();
- addresses.sort_by(|a, b| a.as_ref().cmp(b.as_ref()));
-
- addresses
- .choose_multiple(&mut rng, MAX_NUM_AUTHORITY_CONN)
- .cloned()
- .collect()
- }
-
- pub fn retain_ids(&mut self, ids: &Vec) {
- let to_remove = self
- .cache
- .iter()
- .filter(|(id, _addresses)| !ids.contains(id))
- .map(|entry| entry.0)
- .cloned()
- .collect::>();
-
- for key in to_remove {
- self.cache.remove(&key);
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use quickcheck::{QuickCheck, TestResult};
-
- #[test]
- fn returns_addresses_of_same_authorities_on_repeated_calls() {
- fn property(input: Vec<(u32, Vec)>) -> TestResult {
- // Expect less than 1000 authorities.
- if input.len() > 1000 {
- return TestResult::discard();
- }
-
- // Expect less than 100 addresses per authority.
- for i in &input {
- if i.1.len() > 100 {
- return TestResult::discard();
- }
- }
-
- let mut c = AddrCache::new();
-
- for (id, addresses) in input {
- c.insert(id, addresses);
- }
-
- let result = c.get_subset();
- assert!(result.len() <= MAX_NUM_AUTHORITY_CONN);
-
- for _ in 1..100 {
- assert_eq!(c.get_subset(), result);
- }
-
- TestResult::passed()
- }
-
- QuickCheck::new()
- .max_tests(10)
- .quickcheck(property as fn(Vec<(u32, Vec)>) -> TestResult)
- }
-
- #[test]
- fn returns_same_addresses_of_first_authority_when_second_authority_changes() {
- let mut c = AddrCache::new();
-
- // Insert addresses of first authority.
- let addresses = (1..100)
- .map(|i| format!("{:?}", i))
- .collect::>();
- c.insert(1, addresses);
- let first_subset = c.get_subset();
- assert_eq!(1, first_subset.len());
-
- // Insert address of second authority.
- c.insert(2, vec!["a".to_string()]);
- let second_subset = c.get_subset();
- assert_eq!(2, second_subset.len());
-
- // Expect same address of first authority.
- assert!(second_subset.contains(&first_subset[0]));
-
- // Alter address of second authority.
- c.insert(2, vec!["b".to_string()]);
- let second_subset = c.get_subset();
- assert_eq!(2, second_subset.len());
-
- // Expect same address of first authority.
- assert!(second_subset.contains(&first_subset[0]));
- }
-
- #[test]
- fn retains_only_entries_of_provided_ids() {
- let mut cache = AddrCache::new();
-
- cache.insert(1, vec![vec![10]]);
- cache.insert(2, vec![vec![20]]);
- cache.insert(3, vec![vec![30]]);
-
- cache.retain_ids(&vec![1, 3]);
-
- let mut subset = cache.get_subset();
- subset.sort();
-
- assert_eq!(vec![vec![10], vec![30]], subset);
- }
-}
diff --git a/client/authority-discovery/src/lib.rs b/client/authority-discovery/src/lib.rs
index 1a4473d665ce0..347deb8d9fc5d 100644
--- a/client/authority-discovery/src/lib.rs
+++ b/client/authority-discovery/src/lib.rs
@@ -18,705 +18,61 @@
//! Substrate authority discovery.
//!
-//! This crate enables Substrate authorities to directly connect to other authorities.
-//! [`AuthorityDiscovery`] implements the Future trait. By polling [`AuthorityDiscovery`] an
-//! authority:
+//! This crate enables Substrate authorities to discover and directly connect to
+//! other authorities. It is split into two components the [`Worker`] and the
+//! [`Service`].
//!
-//!
-//! 1. **Makes itself discoverable**
-//!
-//! 1. Retrieves its external addresses (including peer id) or the ones of its sentry nodes.
-//!
-//! 2. Signs the above.
-//!
-//! 3. Puts the signature and the addresses on the libp2p Kademlia DHT.
-//!
-//!
-//! 2. **Discovers other authorities**
-//!
-//! 1. Retrieves the current set of authorities.
-//!
-//! 2. Starts DHT queries for the ids of the authorities.
-//!
-//! 3. Validates the signatures of the retrieved key value pairs.
-//!
-//! 4. Adds the retrieved external addresses as priority nodes to the peerset.
-//!
-//! When run as a sentry node, the authority discovery module does not
-//! publish any addresses to the DHT but still discovers validators and
-//! sentry nodes of validators, i.e. only step 2 (Discovers other authorities)
-//! is executed.
+//! See [`Worker`] and [`Service`] for more documentation.
+
+pub use crate::{service::Service, worker::{NetworkProvider, Worker, Role}};
-use std::collections::{HashMap, HashSet};
-use std::convert::TryInto;
-use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
-use std::time::{Duration, Instant};
-use futures::task::{Context, Poll};
-use futures::{Future, FutureExt, ready, Stream, StreamExt};
-use futures_timer::Delay;
+use futures::channel::{mpsc, oneshot};
+use futures::Stream;
-use addr_cache::AddrCache;
-use codec::Decode;
-use error::{Error, Result};
-use libp2p::core::multiaddr;
-use log::{debug, error, log_enabled};
-use prometheus_endpoint::{Counter, CounterVec, Gauge, Opts, U64, register};
-use prost::Message;
use sc_client_api::blockchain::HeaderBackend;
-use sc_network::{
- config::MultiaddrWithPeerId,
- DhtEvent,
- ExHashT,
- Multiaddr,
- NetworkStateInfo,
- PeerId,
-};
-use sp_authority_discovery::{AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair};
-use sp_core::crypto::{key_types, Pair};
-use sp_core::traits::BareCryptoStorePtr;
-use sp_runtime::{traits::Block as BlockT, generic::BlockId};
+use sc_network::{config::MultiaddrWithPeerId, DhtEvent, Multiaddr, PeerId};
+use sp_authority_discovery::{AuthorityDiscoveryApi, AuthorityId};
+use sp_runtime::traits::Block as BlockT;
use sp_api::ProvideRuntimeApi;
+mod error;
+mod service;
#[cfg(test)]
mod tests;
+mod worker;
-mod error;
-mod addr_cache;
-/// Dht payload schemas generated from Protobuf definitions via Prost crate in build.rs.
-mod schema {
- include!(concat!(env!("OUT_DIR"), "/authority_discovery.rs"));
-}
-
-type Interval = Box + Unpin + Send + Sync>;
-
-const LOG_TARGET: &'static str = "sub-authority-discovery";
-
-/// Upper bound estimation on how long one should wait before accessing the Kademlia DHT.
-const LIBP2P_KADEMLIA_BOOTSTRAP_TIME: Duration = Duration::from_secs(30);
-
-/// Name of the Substrate peerset priority group for authorities discovered through the authority
-/// discovery module.
-const AUTHORITIES_PRIORITY_GROUP_NAME: &'static str = "authorities";
-
-/// Role an authority discovery module can run as.
-pub enum Role {
- /// Actual authority as well as a reference to its key store.
- Authority(BareCryptoStorePtr),
- /// Sentry node that guards an authority.
- ///
- /// No reference to its key store needed, as sentry nodes don't have an identity to sign
- /// addresses with in the first place.
- Sentry,
-}
-
-/// An `AuthorityDiscovery` makes a given authority discoverable and discovers other authorities.
-pub struct AuthorityDiscovery
-where
- Block: BlockT + 'static,
- Network: NetworkProvider,
- Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend,
- >::Api: AuthorityDiscoveryApi,
-{
+/// Create a new authority discovery [`Worker`] and [`Service`].
+pub fn new_worker_and_service(
client: Arc,
-
network: Arc,
- /// List of sentry node public addresses.
- //
- // There are 3 states:
- // - None: No addresses were specified.
- // - Some(vec![]): Addresses were specified, but none could be parsed as proper
- // Multiaddresses.
- // - Some(vec![a, b, c, ...]): Valid addresses were specified.
- sentry_nodes: Option>,
- /// Channel we receive Dht events on.
+ sentry_nodes: Vec,
dht_event_rx: Pin + Send>>,
-
- /// Interval to be proactive, publishing own addresses.
- publish_interval: Interval,
- /// Interval on which to query for addresses of other authorities.
- query_interval: Interval,
-
- addr_cache: addr_cache::AddrCache,
-
- metrics: Option,
-
role: Role,
-
- phantom: PhantomData,
-}
-
-impl AuthorityDiscovery
-where
- Block: BlockT + Unpin + 'static,
- Network: NetworkProvider,
- Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend,
- >::Api:
- AuthorityDiscoveryApi,
- Self: Future,
-{
- /// Return a new authority discovery.
- ///
- /// Note: When specifying `sentry_nodes` this module will not advertise the public addresses of
- /// the node itself but only the public addresses of its sentry nodes.
- pub fn new(
- client: Arc,
- network: Arc,
- sentry_nodes: Vec,
- dht_event_rx: Pin + Send>>,
- role: Role,
- prometheus_registry: Option,
- ) -> Self {
- // Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h.
- // Given that a node could restart at any point in time, one can not depend on the
- // republishing process, thus publishing own external addresses should happen on an interval
- // < 36h.
- let publish_interval = interval_at(
- Instant::now() + LIBP2P_KADEMLIA_BOOTSTRAP_TIME,
- Duration::from_secs(12 * 60 * 60),
- );
-
- // External addresses of other authorities can change at any given point in time. The
- // interval on which to query for external addresses of other authorities is a trade off
- // between efficiency and performance.
- let query_interval = interval_at(
- Instant::now() + LIBP2P_KADEMLIA_BOOTSTRAP_TIME,
- Duration::from_secs(10 * 60),
- );
-
- let sentry_nodes = if !sentry_nodes.is_empty() {
- Some(sentry_nodes.into_iter().map(|ma| ma.concat()).collect::>())
- } else {
- None
- };
-
- let addr_cache = AddrCache::new();
-
- let metrics = match prometheus_registry {
- Some(registry) => {
- match Metrics::register(®istry) {
- Ok(metrics) => Some(metrics),
- Err(e) => {
- error!(target: LOG_TARGET, "Failed to register metrics: {:?}", e);
- None
- },
- }
- },
- None => None,
- };
-
- AuthorityDiscovery {
- client,
- network,
- sentry_nodes,
- dht_event_rx,
- publish_interval,
- query_interval,
- addr_cache,
- role,
- metrics,
- phantom: PhantomData,
- }
- }
-
- /// Publish either our own or if specified the public addresses of our sentry nodes.
- fn publish_ext_addresses(&mut self) -> Result<()> {
- let key_store = match &self.role {
- Role::Authority(key_store) => key_store,
- // Only authority nodes can put addresses (their own or the ones of their sentry nodes)
- // on the Dht. Sentry nodes don't have a known identity to authenticate such addresses,
- // thus `publish_ext_addresses` becomes a no-op.
- Role::Sentry => return Ok(()),
- };
-
- if let Some(metrics) = &self.metrics {
- metrics.publish.inc()
- }
-
- let addresses: Vec<_> = match &self.sentry_nodes {
- Some(addrs) => addrs.clone().into_iter()
- .map(|a| a.to_vec())
- .collect(),
- None => self.network.external_addresses()
- .into_iter()
- .map(|a| a.with(multiaddr::Protocol::P2p(
- self.network.local_peer_id().into(),
- )))
- .map(|a| a.to_vec())
- .collect(),
- };
-
- if let Some(metrics) = &self.metrics {
- metrics.amount_last_published.set(addresses.len() as u64);
- }
-
- let mut serialized_addresses = vec![];
- schema::AuthorityAddresses { addresses }
- .encode(&mut serialized_addresses)
- .map_err(Error::EncodingProto)?;
-
- let keys = AuthorityDiscovery::get_own_public_keys_within_authority_set(
- &key_store,
- &self.client,
- )?.into_iter().map(Into::into).collect::>();
-
- let signatures = key_store.read()
- .sign_with_all(
- key_types::AUTHORITY_DISCOVERY,
- keys.clone(),
- serialized_addresses.as_slice(),
- )
- .map_err(|_| Error::Signing)?;
-
- for (sign_result, key) in signatures.into_iter().zip(keys) {
- let mut signed_addresses = vec![];
-
- // sign_with_all returns Result signature
- // is generated for a public key that is supported.
- // Verify that all signatures exist for all provided keys.
- let signature = sign_result.map_err(|_| Error::MissingSignature(key.clone()))?;
- schema::SignedAuthorityAddresses {
- addresses: serialized_addresses.clone(),
- signature,
- }
- .encode(&mut signed_addresses)
- .map_err(Error::EncodingProto)?;
-
- self.network.put_value(
- hash_authority_id(key.1.as_ref()),
- signed_addresses,
- );
- }
-
- Ok(())
- }
-
- fn request_addresses_of_others(&mut self) -> Result<()> {
- let id = BlockId::hash(self.client.info().best_hash);
-
- let authorities = self
- .client
- .runtime_api()
- .authorities(&id)
- .map_err(Error::CallingRuntime)?;
-
- let local_keys = match &self.role {
- Role::Authority(key_store) => {
- key_store.read()
- .sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
- .into_iter()
- .collect::>()
- },
- Role::Sentry => HashSet::new(),
- };
-
- for authority_id in authorities.iter() {
- // Make sure we don't look up our own keys.
- if !local_keys.contains(authority_id.as_ref()) {
- if let Some(metrics) = &self.metrics {
- metrics.request.inc();
- }
-
- self.network
- .get_value(&hash_authority_id(authority_id.as_ref()));
- }
- }
-
- Ok(())
- }
-
- /// Handle incoming Dht events.
- ///
- /// Returns either:
- /// - Poll::Pending when there are no more events to handle or
- /// - Poll::Ready(()) when the dht event stream terminated.
- fn handle_dht_events(&mut self, cx: &mut Context) -> Poll<()>{
- loop {
- match ready!(self.dht_event_rx.poll_next_unpin(cx)) {
- Some(DhtEvent::ValueFound(v)) => {
- if let Some(metrics) = &self.metrics {
- metrics.dht_event_received.with_label_values(&["value_found"]).inc();
- }
-
- if log_enabled!(log::Level::Debug) {
- let hashes = v.iter().map(|(hash, _value)| hash.clone());
- debug!(
- target: LOG_TARGET,
- "Value for hash '{:?}' found on Dht.", hashes,
- );
- }
-
- if let Err(e) = self.handle_dht_value_found_event(v) {
- if let Some(metrics) = &self.metrics {
- metrics.handle_value_found_event_failure.inc();
- }
-
- debug!(
- target: LOG_TARGET,
- "Failed to handle Dht value found event: {:?}", e,
- );
- }
- }
- Some(DhtEvent::ValueNotFound(hash)) => {
- if let Some(metrics) = &self.metrics {
- metrics.dht_event_received.with_label_values(&["value_not_found"]).inc();
- }
-
- debug!(
- target: LOG_TARGET,
- "Value for hash '{:?}' not found on Dht.", hash
- )
- },
- Some(DhtEvent::ValuePut(hash)) => {
- if let Some(metrics) = &self.metrics {
- metrics.dht_event_received.with_label_values(&["value_put"]).inc();
- }
-
- debug!(
- target: LOG_TARGET,
- "Successfully put hash '{:?}' on Dht.", hash,
- )
- },
- Some(DhtEvent::ValuePutFailed(hash)) => {
- if let Some(metrics) = &self.metrics {
- metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc();
- }
-
- debug!(
- target: LOG_TARGET,
- "Failed to put hash '{:?}' on Dht.", hash
- )
- },
- None => {
- debug!(target: LOG_TARGET, "Dht event stream terminated.");
- return Poll::Ready(());
- },
- }
- }
- }
-
- fn handle_dht_value_found_event(
- &mut self,
- values: Vec<(libp2p::kad::record::Key, Vec)>,
- ) -> Result<()> {
- // Ensure `values` is not empty and all its keys equal.
- let remote_key = values.iter().fold(Ok(None), |acc, (key, _)| {
- match acc {
- Ok(None) => Ok(Some(key.clone())),
- Ok(Some(ref prev_key)) if prev_key != key => Err(
- Error::ReceivingDhtValueFoundEventWithDifferentKeys
- ),
- x @ Ok(_) => x,
- Err(e) => Err(e),
- }
- })?.ok_or(Error::ReceivingDhtValueFoundEventWithNoRecords)?;
-
- let authorities = {
- let block_id = BlockId::hash(self.client.info().best_hash);
- // From the Dht we only get the hashed authority id. In order to retrieve the actual
- // authority id and to ensure it is actually an authority, we match the hash against the
- // hash of the authority id of all other authorities.
- let authorities = self.client.runtime_api().authorities(&block_id)?;
- self.addr_cache.retain_ids(&authorities);
- authorities
- .into_iter()
- .map(|id| (hash_authority_id(id.as_ref()), id))
- .collect::>()
- };
-
- // Check if the event origins from an authority in the current authority set.
- let authority_id: &AuthorityId = authorities
- .get(&remote_key)
- .ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?;
-
- let local_peer_id = self.network.local_peer_id();
-
- let remote_addresses: Vec = values.into_iter()
- .map(|(_k, v)| {
- let schema::SignedAuthorityAddresses { signature, addresses } =
- schema::SignedAuthorityAddresses::decode(v.as_slice())
- .map_err(Error::DecodingProto)?;
-
- let signature = AuthoritySignature::decode(&mut &signature[..])
- .map_err(Error::EncodingDecodingScale)?;
-
- if !AuthorityPair::verify(&signature, &addresses, authority_id) {
- return Err(Error::VerifyingDhtPayload);
- }
-
- let addresses = schema::AuthorityAddresses::decode(addresses.as_slice())
- .map(|a| a.addresses)
- .map_err(Error::DecodingProto)?
- .into_iter()
- .map(|a| a.try_into())
- .collect::>()
- .map_err(Error::ParsingMultiaddress)?;
-
- Ok(addresses)
- })
- .collect::>>>()?
- .into_iter()
- .flatten()
- // Ignore own addresses.
- .filter(|addr| !addr.iter().any(|protocol| {
- // Parse to PeerId first as Multihashes of old and new PeerId
- // representation don't equal.
- //
- // See https://github.com/libp2p/rust-libp2p/issues/555 for
- // details.
- if let multiaddr::Protocol::P2p(hash) = protocol {
- let peer_id = match PeerId::from_multihash(hash) {
- Ok(peer_id) => peer_id,
- Err(_) => return true, // Discard address.
- };
-
- return peer_id == local_peer_id;
- }
-
- false // Multiaddr does not contain a PeerId.
- }))
- .collect();
-
- if !remote_addresses.is_empty() {
- self.addr_cache.insert(authority_id.clone(), remote_addresses);
- if let Some(metrics) = &self.metrics {
- metrics.known_authorities_count.set(
- self.addr_cache.num_ids().try_into().unwrap_or(std::u64::MAX)
- );
- }
- self.update_peer_set_priority_group()?;
- }
-
- Ok(())
- }
-
- /// Retrieve our public keys within the current authority set.
- //
- // A node might have multiple authority discovery keys within its keystore, e.g. an old one and
- // one for the upcoming session. In addition it could be participating in the current authority
- // set with two keys. The function does not return all of the local authority discovery public
- // keys, but only the ones intersecting with the current authority set.
- fn get_own_public_keys_within_authority_set(
- key_store: &BareCryptoStorePtr,
- client: &Client,
- ) -> Result> {
- let local_pub_keys = key_store.read()
- .sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
- .into_iter()
- .collect::>();
-
- let id = BlockId::hash(client.info().best_hash);
- let current_authorities = client.runtime_api()
- .authorities(&id)
- .map_err(Error::CallingRuntime)?
- .into_iter()
- .map(std::convert::Into::into)
- .collect::>();
-
- let intersection = local_pub_keys.intersection(¤t_authorities)
- .cloned()
- .map(std::convert::Into::into)
- .collect();
-
- Ok(intersection)
- }
-
- /// Update the peer set 'authority' priority group.
- fn update_peer_set_priority_group(&self) -> Result<()> {
- let addresses = self.addr_cache.get_subset();
-
- if let Some(metrics) = &self.metrics {
- metrics.priority_group_size.set(addresses.len().try_into().unwrap_or(std::u64::MAX));
- }
-
- debug!(
- target: LOG_TARGET,
- "Applying priority group {:?} to peerset.", addresses,
- );
- self.network
- .set_priority_group(
- AUTHORITIES_PRIORITY_GROUP_NAME.to_string(),
- addresses.into_iter().collect(),
- )
- .map_err(Error::SettingPeersetPriorityGroup)?;
-
- Ok(())
- }
-}
-
-impl Future for AuthorityDiscovery
+ prometheus_registry: Option,
+) -> (Worker, Service)
where
Block: BlockT + Unpin + 'static,
Network: NetworkProvider,
Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend,
- >::Api:
- AuthorityDiscoveryApi,
-{
- type Output = ();
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll {
- // Process incoming events.
- if let Poll::Ready(()) = self.handle_dht_events(cx) {
- // `handle_dht_events` returns `Poll::Ready(())` when the Dht event stream terminated.
- // Termination of the Dht event stream implies that the underlying network terminated,
- // thus authority discovery should terminate as well.
- return Poll::Ready(());
- }
-
-
- // Publish own addresses.
- if let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) {
- // Register waker of underlying task for next interval.
- while let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) {}
-
- if let Err(e) = self.publish_ext_addresses() {
- error!(
- target: LOG_TARGET,
- "Failed to publish external addresses: {:?}", e,
- );
- }
- }
-
- // Request addresses of authorities.
- if let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {
- // Register waker of underlying task for next interval.
- while let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {}
-
- if let Err(e) = self.request_addresses_of_others() {
- error!(
- target: LOG_TARGET,
- "Failed to request addresses of authorities: {:?}", e,
- );
- }
- }
-
- Poll::Pending
- }
-}
-
-/// NetworkProvider provides AuthorityDiscovery with all necessary hooks into the underlying
-/// Substrate networking. Using this trait abstraction instead of NetworkService directly is
-/// necessary to unit test AuthorityDiscovery.
-pub trait NetworkProvider: NetworkStateInfo {
- /// Modify a peerset priority group.
- fn set_priority_group(
- &self,
- group_id: String,
- peers: HashSet,
- ) -> std::result::Result<(), String>;
-
- /// Start putting a value in the Dht.
- fn put_value(&self, key: libp2p::kad::record::Key, value: Vec);
-
- /// Start getting a value from the Dht.
- fn get_value(&self, key: &libp2p::kad::record::Key);
-}
-
-impl NetworkProvider for sc_network::NetworkService
-where
- B: BlockT + 'static,
- H: ExHashT,
+ >::Api: AuthorityDiscoveryApi,
{
- fn set_priority_group(
- &self,
- group_id: String,
- peers: HashSet,
- ) -> std::result::Result<(), String> {
- self.set_priority_group(group_id, peers)
- }
- fn put_value(&self, key: libp2p::kad::record::Key, value: Vec) {
- self.put_value(key, value)
- }
- fn get_value(&self, key: &libp2p::kad::record::Key) {
- self.get_value(key)
- }
-}
-
-fn hash_authority_id(id: &[u8]) -> libp2p::kad::record::Key {
- libp2p::kad::record::Key::new(&libp2p::multihash::Sha2_256::digest(id))
-}
+ let (to_worker, from_service) = mpsc::channel(0);
-fn interval_at(start: Instant, duration: Duration) -> Interval {
- let stream = futures::stream::unfold(start, move |next| {
- let time_until_next = next.saturating_duration_since(Instant::now());
-
- Delay::new(time_until_next).map(move |_| Some(((), next + duration)))
- });
-
- Box::new(stream)
-}
+ let worker = Worker::new(
+ from_service, client, network, sentry_nodes, dht_event_rx, role, prometheus_registry,
+ );
+ let service = Service::new(to_worker);
-/// Prometheus metrics for an `AuthorityDiscovery`.
-#[derive(Clone)]
-pub(crate) struct Metrics {
- publish: Counter,
- amount_last_published: Gauge,
- request: Counter,
- dht_event_received: CounterVec,
- handle_value_found_event_failure: Counter,
- known_authorities_count: Gauge,
- priority_group_size: Gauge,
+ (worker, service)
}
-impl Metrics {
- pub(crate) fn register(registry: &prometheus_endpoint::Registry) -> Result {
- Ok(Self {
- publish: register(
- Counter::new(
- "authority_discovery_times_published_total",
- "Number of times authority discovery has published external addresses."
- )?,
- registry,
- )?,
- amount_last_published: register(
- Gauge::new(
- "authority_discovery_amount_external_addresses_last_published",
- "Number of external addresses published when authority discovery last \
- published addresses."
- )?,
- registry,
- )?,
- request: register(
- Counter::new(
- "authority_discovery_authority_addresses_requested_total",
- "Number of times authority discovery has requested external addresses of a \
- single authority."
- )?,
- registry,
- )?,
- dht_event_received: register(
- CounterVec::new(
- Opts::new(
- "authority_discovery_dht_event_received",
- "Number of dht events received by authority discovery."
- ),
- &["name"],
- )?,
- registry,
- )?,
- handle_value_found_event_failure: register(
- Counter::new(
- "authority_discovery_handle_value_found_event_failure",
- "Number of times handling a dht value found event failed."
- )?,
- registry,
- )?,
- known_authorities_count: register(
- Gauge::new(
- "authority_discovery_known_authorities_count",
- "Number of authorities known by authority discovery."
- )?,
- registry,
- )?,
- priority_group_size: register(
- Gauge::new(
- "authority_discovery_priority_group_size",
- "Number of addresses passed to the peer set as a priority group."
- )?,
- registry,
- )?,
- })
- }
+/// Message send from the [`Service`] to the [`Worker`].
+pub(crate) enum ServicetoWorkerMsg {
+ /// See [`Service::get_addresses_by_authority_id`].
+ GetAddressesByAuthorityId(AuthorityId, oneshot::Sender>>),
+ /// See [`Service::get_authority_id_by_peer_id`].
+ GetAuthorityIdByPeerId(PeerId, oneshot::Sender >)
}
diff --git a/client/authority-discovery/src/service.rs b/client/authority-discovery/src/service.rs
new file mode 100644
index 0000000000000..01fb7134fb5d1
--- /dev/null
+++ b/client/authority-discovery/src/service.rs
@@ -0,0 +1,70 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see .
+
+use crate::ServicetoWorkerMsg;
+
+use futures::channel::{mpsc, oneshot};
+use futures::SinkExt;
+
+use sc_network::{Multiaddr, PeerId};
+use sp_authority_discovery::AuthorityId;
+
+/// Service to interact with the [`Worker`].
+#[derive(Clone)]
+pub struct Service {
+ to_worker: mpsc::Sender,
+}
+
+/// A [`Service`] allows to interact with a [`Worker`], e.g. by querying the
+/// [`Worker`]'s local address cache for a given [`AuthorityId`].
+impl Service {
+ pub(crate) fn new(to_worker: mpsc::Sender) -> Self {
+ Self {
+ to_worker,
+ }
+ }
+
+ /// Get the addresses for the given [`AuthorityId`] from the local address cache.
+ ///
+ /// Returns `None` if no entry was present or connection to the [`crate::Worker`] failed.
+ ///
+ /// [`Multiaddr`]s returned always include a [`libp2p::core::multiaddr:Protocol::P2p`]
+ /// component.
+ pub async fn get_addresses_by_authority_id(&mut self, authority: AuthorityId) -> Option> {
+ let (tx, rx) = oneshot::channel();
+
+ self.to_worker
+ .send(ServicetoWorkerMsg::GetAddressesByAuthorityId(authority, tx))
+ .await
+ .ok()?;
+
+ rx.await.ok().flatten()
+ }
+
+ /// Get the [`AuthorityId`] for the given [`PeerId`] from the local address cache.
+ ///
+ /// Returns `None` if no entry was present or connection to the [`crate::Worker`] failed.
+ pub async fn get_authority_id_by_peer_id(&mut self, peer_id: PeerId) -> Option {
+ let (tx, rx) = oneshot::channel();
+
+ self.to_worker
+ .send(ServicetoWorkerMsg::GetAuthorityIdByPeerId(peer_id, tx))
+ .await
+ .ok()?;
+
+ rx.await.ok().flatten()
+ }
+}
diff --git a/client/authority-discovery/src/tests.rs b/client/authority-discovery/src/tests.rs
index 09a65fd138c11..8e7367f2f7885 100644
--- a/client/authority-discovery/src/tests.rs
+++ b/client/authority-discovery/src/tests.rs
@@ -16,315 +16,42 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see .
-use std::{iter::FromIterator, sync::{Arc, Mutex}};
+use crate::{new_worker_and_service, worker::{tests::{TestApi, TestNetwork}, Role}};
+use std::sync::Arc;
+
+use futures::prelude::*;
use futures::channel::mpsc::channel;
-use futures::executor::{block_on, LocalPool};
-use futures::future::{poll_fn, FutureExt};
-use futures::sink::SinkExt;
+use futures::executor::LocalPool;
use futures::task::LocalSpawn;
-use futures::poll;
-use libp2p::{kad, core::multiaddr, PeerId};
-
-use sp_api::{ProvideRuntimeApi, ApiRef};
-use sp_core::{crypto::Public, testing::KeyStore};
-use sp_runtime::traits::{Zero, Block as BlockT, NumberFor};
-use substrate_test_runtime_client::runtime::Block;
-
-use super::*;
-
-#[test]
-fn interval_at_with_start_now() {
- let start = Instant::now();
-
- let mut interval = interval_at(
- std::time::Instant::now(),
- std::time::Duration::from_secs(10),
- );
-
- futures::executor::block_on(async {
- interval.next().await;
- });
-
- assert!(
- Instant::now().saturating_duration_since(start) < Duration::from_secs(1),
- "Expected low resolution instant interval to fire within less than a second.",
- );
-}
+use libp2p::core::{multiaddr::{Multiaddr, Protocol}, PeerId};
-#[test]
-fn interval_at_is_queuing_ticks() {
- let start = Instant::now();
-
- let interval = interval_at(start, std::time::Duration::from_millis(100));
-
- // Let's wait for 200ms, thus 3 elements should be queued up (1st at 0ms, 2nd at 100ms, 3rd
- // at 200ms).
- std::thread::sleep(Duration::from_millis(200));
-
- futures::executor::block_on(async {
- interval.take(3).collect::>().await;
- });
-
- // Make sure we did not wait for more than 300 ms, which would imply that `at_interval` is
- // not queuing ticks.
- assert!(
- Instant::now().saturating_duration_since(start) < Duration::from_millis(300),
- "Expect interval to /queue/ events when not polled for a while.",
- );
-}
+use sp_authority_discovery::AuthorityId;
+use sp_core::crypto::key_types;
+use sp_core::testing::KeyStore;
#[test]
-fn interval_at_with_initial_delay() {
- let start = Instant::now();
-
- let mut interval = interval_at(
- std::time::Instant::now() + Duration::from_millis(100),
- std::time::Duration::from_secs(10),
- );
-
- futures::executor::block_on(async {
- interval.next().await;
- });
-
- assert!(
- Instant::now().saturating_duration_since(start) > Duration::from_millis(100),
- "Expected interval with initial delay not to fire right away.",
- );
-}
-
-#[derive(Clone)]
-struct TestApi {
- authorities: Vec,
-}
-
-impl ProvideRuntimeApi for TestApi {
- type Api = RuntimeApi;
-
- fn runtime_api<'a>(&'a self) -> ApiRef<'a, Self::Api> {
- RuntimeApi {
- authorities: self.authorities.clone(),
- }.into()
- }
-}
-
-/// Blockchain database header backend. Does not perform any validation.
-impl HeaderBackend for TestApi {
- fn header(
- &self,
- _id: BlockId,
- ) -> std::result::Result, sp_blockchain::Error> {
- Ok(None)
- }
-
- fn info(&self) -> sc_client_api::blockchain::Info {
- sc_client_api::blockchain::Info {
- best_hash: Default::default(),
- best_number: Zero::zero(),
- finalized_hash: Default::default(),
- finalized_number: Zero::zero(),
- genesis_hash: Default::default(),
- number_leaves: Default::default(),
- }
- }
-
- fn status(
- &self,
- _id: BlockId,
- ) -> std::result::Result {
- Ok(sc_client_api::blockchain::BlockStatus::Unknown)
- }
-
- fn number(
- &self,
- _hash: Block::Hash,
- ) -> std::result::Result>, sp_blockchain::Error> {
- Ok(None)
- }
-
- fn hash(
- &self,
- _number: NumberFor,
- ) -> std::result::Result, sp_blockchain::Error> {
- Ok(None)
- }
-}
-
-struct RuntimeApi {
- authorities: Vec,
-}
-
-sp_api::mock_impl_runtime_apis! {
- impl AuthorityDiscoveryApi for RuntimeApi {
- type Error = sp_blockchain::Error;
-
- fn authorities(&self) -> Vec {
- self.authorities.clone()
- }
- }
-}
-
-struct TestNetwork {
- peer_id: PeerId,
- // Whenever functions on `TestNetwork` are called, the function arguments are added to the
- // vectors below.
- pub put_value_call: Arc)>>>,
- pub get_value_call: Arc>>,
- pub set_priority_group_call: Arc)>>>,
-}
-
-impl Default for TestNetwork {
- fn default() -> Self {
- TestNetwork {
- peer_id: PeerId::random(),
- put_value_call: Default::default(),
- get_value_call: Default::default(),
- set_priority_group_call: Default::default(),
- }
- }
-}
-
-impl NetworkProvider for TestNetwork {
- fn set_priority_group(
- &self,
- group_id: String,
- peers: HashSet,
- ) -> std::result::Result<(), String> {
- self.set_priority_group_call
- .lock()
- .unwrap()
- .push((group_id, peers));
- Ok(())
- }
- fn put_value(&self, key: kad::record::Key, value: Vec) {
- self.put_value_call.lock().unwrap().push((key, value));
- }
- fn get_value(&self, key: &kad::record::Key) {
- self.get_value_call.lock().unwrap().push(key.clone());
- }
-}
-
-impl NetworkStateInfo for TestNetwork {
- fn local_peer_id(&self) -> PeerId {
- self.peer_id.clone()
- }
-
- fn external_addresses(&self) -> Vec {
- vec!["/ip6/2001:db8::/tcp/30333".parse().unwrap()]
- }
-}
-
-#[test]
-fn new_registers_metrics() {
- let (_dht_event_tx, dht_event_rx) = channel(1000);
+fn get_addresses_and_authority_id() {
+ let (_dht_event_tx, dht_event_rx) = channel(0);
let network: Arc = Arc::new(Default::default());
- let key_store = KeyStore::new();
- let test_api = Arc::new(TestApi {
- authorities: vec![],
- });
-
- let registry = prometheus_endpoint::Registry::new();
-
- AuthorityDiscovery::new(
- test_api,
- network.clone(),
- vec![],
- dht_event_rx.boxed(),
- Role::Authority(key_store),
- Some(registry.clone()),
- );
-
- assert!(registry.gather().len() > 0);
-}
-
-#[test]
-fn request_addresses_of_others_triggers_dht_get_query() {
- let _ = ::env_logger::try_init();
- let (_dht_event_tx, dht_event_rx) = channel(1000);
- // Generate authority keys
- let authority_1_key_pair = AuthorityPair::from_seed_slice(&[1; 32]).unwrap();
- let authority_2_key_pair = AuthorityPair::from_seed_slice(&[2; 32]).unwrap();
-
- let test_api = Arc::new(TestApi {
- authorities: vec![authority_1_key_pair.public(), authority_2_key_pair.public()],
- });
-
- let network: Arc = Arc::new(Default::default());
let key_store = KeyStore::new();
-
- let mut authority_discovery = AuthorityDiscovery::new(
- test_api,
- network.clone(),
- vec![],
- dht_event_rx.boxed(),
- Role::Authority(key_store),
- None,
- );
-
- authority_discovery.request_addresses_of_others().unwrap();
-
- // Expect authority discovery to request new records from the dht.
- assert_eq!(network.get_value_call.lock().unwrap().len(), 2);
-}
-
-#[test]
-fn publish_discover_cycle() {
- let _ = ::env_logger::try_init();
-
- // Node A publishing its address.
-
- let (_dht_event_tx, dht_event_rx) = channel(1000);
-
- let network: Arc = Arc::new(Default::default());
- let node_a_multiaddr = {
- let peer_id = network.local_peer_id();
- let address = network.external_addresses().pop().unwrap();
-
- address.with(multiaddr::Protocol::P2p(
- peer_id.into(),
- ))
- };
-
- let key_store = KeyStore::new();
- let node_a_public = key_store
+ let remote_authority_id: AuthorityId = key_store
.write()
.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None)
- .unwrap();
- let test_api = Arc::new(TestApi {
- authorities: vec![node_a_public.into()],
- });
-
- let mut authority_discovery = AuthorityDiscovery::new(
- test_api,
- network.clone(),
- vec![],
- dht_event_rx.boxed(),
- Role::Authority(key_store),
- None,
- );
+ .unwrap()
+ .into();
- authority_discovery.publish_ext_addresses().unwrap();
+ let remote_peer_id = PeerId::random();
+ let remote_addr = "/ip6/2001:db8:0:0:0:0:0:2/tcp/30333".parse::()
+ .unwrap()
+ .with(Protocol::P2p(remote_peer_id.clone().into()));
- // Expect authority discovery to put a new record onto the dht.
- assert_eq!(network.put_value_call.lock().unwrap().len(), 1);
-
- let dht_event = {
- let (key, value) = network.put_value_call.lock().unwrap().pop().unwrap();
- sc_network::DhtEvent::ValueFound(vec![(key, value)])
- };
-
- // Node B discovering node A's address.
-
- let (mut dht_event_tx, dht_event_rx) = channel(1000);
let test_api = Arc::new(TestApi {
- // Make sure node B identifies node A as an authority.
- authorities: vec![node_a_public.into()],
+ authorities: vec![],
});
- let network: Arc = Arc::new(Default::default());
- let key_store = KeyStore::new();
- let mut authority_discovery = AuthorityDiscovery::new(
+ let (mut worker, mut service) = new_worker_and_service(
test_api,
network.clone(),
vec![],
@@ -333,230 +60,19 @@ fn publish_discover_cycle() {
None,
);
- dht_event_tx.try_send(dht_event).unwrap();
+ worker.inject_addresses(remote_authority_id.clone(), vec![remote_addr.clone()]);
- let f = |cx: &mut Context<'_>| -> Poll<()> {
- // Make authority discovery handle the event.
- if let Poll::Ready(e) = authority_discovery.handle_dht_events(cx) {
- panic!("Unexpected error: {:?}", e);
- }
-
- // Expect authority discovery to set the priority set.
- assert_eq!(network.set_priority_group_call.lock().unwrap().len(), 1);
+ let mut pool = LocalPool::new();
+ pool.spawner().spawn_local_obj(Box::pin(worker).into()).unwrap();
+ pool.run_until(async {
assert_eq!(
- network.set_priority_group_call.lock().unwrap()[0],
- (
- "authorities".to_string(),
- HashSet::from_iter(vec![node_a_multiaddr.clone()].into_iter())
- )
+ Some(vec![remote_addr]),
+ service.get_addresses_by_authority_id(remote_authority_id.clone()).await,
);
-
- Poll::Ready(())
- };
-
- let _ = block_on(poll_fn(f));
-}
-
-#[test]
-fn terminate_when_event_stream_terminates() {
- let (dht_event_tx, dht_event_rx) = channel(1000);
- let network: Arc = Arc::new(Default::default());
- let key_store = KeyStore::new();
- let test_api = Arc::new(TestApi {
- authorities: vec![],
- });
-
- let mut authority_discovery = AuthorityDiscovery::new(
- test_api,
- network.clone(),
- vec![],
- dht_event_rx.boxed(),
- Role::Authority(key_store),
- None,
- );
-
- block_on(async {
- assert_eq!(Poll::Pending, poll!(&mut authority_discovery));
-
- // Simulate termination of the network through dropping the sender side of the dht event
- // channel.
- drop(dht_event_tx);
-
assert_eq!(
- Poll::Ready(()), poll!(&mut authority_discovery),
- "Expect the authority discovery module to terminate once the sending side of the dht \
- event channel is terminated.",
+ Some(remote_authority_id),
+ service.get_authority_id_by_peer_id(remote_peer_id).await,
);
});
}
-
-#[test]
-fn dont_stop_polling_when_error_is_returned() {
- #[derive(PartialEq, Debug)]
- enum Event {
- Processed,
- End,
- };
-
- let (mut dht_event_tx, dht_event_rx) = channel(1000);
- let (mut discovery_update_tx, mut discovery_update_rx) = channel(1000);
- let network: Arc = Arc::new(Default::default());
- let key_store = KeyStore::new();
- let test_api = Arc::new(TestApi {
- authorities: vec![],
- });
- let mut pool = LocalPool::new();
-
- let mut authority_discovery = AuthorityDiscovery::new(
- test_api,
- network.clone(),
- vec![],
- dht_event_rx.boxed(),
- Role::Authority(key_store),
- None,
- );
-
- // Spawn the authority discovery to make sure it is polled independently.
- //
- // As this is a local pool, only one future at a time will have the CPU and
- // can make progress until the future returns `Pending`.
- pool.spawner().spawn_local_obj(
- futures::future::poll_fn(move |ctx| {
- match std::pin::Pin::new(&mut authority_discovery).poll(ctx) {
- Poll::Ready(()) => {},
- Poll::Pending => {
- discovery_update_tx.send(Event::Processed).now_or_never();
- return Poll::Pending;
- },
- }
- let _ = discovery_update_tx.send(Event::End).now_or_never().unwrap();
- Poll::Ready(())
- }).boxed_local().into(),
- ).expect("Spawns authority discovery");
-
- pool.run_until(
- // The future that drives the event stream
- async {
- // Send an event that should generate an error
- let _ = dht_event_tx.send(DhtEvent::ValueFound(Default::default())).now_or_never();
- // Send the same event again to make sure that the event stream needs to be polled twice
- // to be woken up again.
- let _ = dht_event_tx.send(DhtEvent::ValueFound(Default::default())).now_or_never();
-
- // Now we call `await` and give the control to the authority discovery future.
- assert_eq!(Some(Event::Processed), discovery_update_rx.next().await);
-
- // Drop the event rx to stop the authority discovery. If it was polled correctly, it
- // should end properly.
- drop(dht_event_tx);
-
- assert!(
- discovery_update_rx.collect::>()
- .await
- .into_iter()
- .any(|evt| evt == Event::End),
- "The authority discovery should have ended",
- );
- }
- );
-}
-
-/// In the scenario of a validator publishing the address of its sentry node to
-/// the DHT, said sentry node should not add its own Multiaddr to the
-/// peerset "authority" priority group.
-#[test]
-fn never_add_own_address_to_priority_group() {
- let validator_key_store = KeyStore::new();
- let validator_public = validator_key_store
- .write()
- .sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None)
- .unwrap();
-
- let sentry_network: Arc = Arc::new(Default::default());
-
- let sentry_multiaddr = {
- let peer_id = sentry_network.local_peer_id();
- let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:2/tcp/30333".parse().unwrap();
-
- address.with(multiaddr::Protocol::P2p(
- peer_id.into(),
- ))
- };
-
- // Address of some other sentry node of `validator`.
- let random_multiaddr = {
- let peer_id = PeerId::random();
- let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap();
-
- address.with(multiaddr::Protocol::P2p(
- peer_id.into(),
- ))
- };
-
- let dht_event = {
- let addresses = vec![
- sentry_multiaddr.to_vec(),
- random_multiaddr.to_vec(),
- ];
-
- let mut serialized_addresses = vec![];
- schema::AuthorityAddresses { addresses }
- .encode(&mut serialized_addresses)
- .map_err(Error::EncodingProto)
- .unwrap();
-
- let signature = validator_key_store.read()
- .sign_with(
- key_types::AUTHORITY_DISCOVERY,
- &validator_public.clone().into(),
- serialized_addresses.as_slice(),
- )
- .map_err(|_| Error::Signing)
- .unwrap();
-
- let mut signed_addresses = vec![];
- schema::SignedAuthorityAddresses {
- addresses: serialized_addresses.clone(),
- signature,
- }
- .encode(&mut signed_addresses)
- .map_err(Error::EncodingProto)
- .unwrap();
-
- let key = hash_authority_id(&validator_public.to_raw_vec());
- let value = signed_addresses;
- (key, value)
- };
-
- let (_dht_event_tx, dht_event_rx) = channel(1);
- let sentry_test_api = Arc::new(TestApi {
- // Make sure the sentry node identifies its validator as an authority.
- authorities: vec![validator_public.into()],
- });
-
- let mut sentry_authority_discovery = AuthorityDiscovery::new(
- sentry_test_api,
- sentry_network.clone(),
- vec![],
- dht_event_rx.boxed(),
- Role::Sentry,
- None,
- );
-
- sentry_authority_discovery.handle_dht_value_found_event(vec![dht_event]).unwrap();
-
- assert_eq!(
- sentry_network.set_priority_group_call.lock().unwrap().len(), 1,
- "Expect authority discovery to set the priority set.",
- );
-
- assert_eq!(
- sentry_network.set_priority_group_call.lock().unwrap()[0],
- (
- "authorities".to_string(),
- HashSet::from_iter(vec![random_multiaddr.clone()].into_iter(),)
- ),
- "Expect authority discovery to only add `random_multiaddr`."
- );
-}
diff --git a/client/authority-discovery/src/worker.rs b/client/authority-discovery/src/worker.rs
new file mode 100644
index 0000000000000..dd13b89278e2d
--- /dev/null
+++ b/client/authority-discovery/src/worker.rs
@@ -0,0 +1,785 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see .
+
+use crate::{error::{Error, Result}, ServicetoWorkerMsg};
+
+use std::collections::{HashMap, HashSet};
+use std::convert::TryInto;
+use std::marker::PhantomData;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use futures::channel::mpsc;
+use futures::task::{Context, Poll};
+use futures::{Future, FutureExt, ready, Stream, StreamExt, stream::Fuse};
+use futures_timer::Delay;
+
+use addr_cache::AddrCache;
+use codec::Decode;
+use libp2p::core::multiaddr;
+use log::{debug, error, log_enabled};
+use prometheus_endpoint::{Counter, CounterVec, Gauge, Opts, U64, register};
+use prost::Message;
+use sc_client_api::blockchain::HeaderBackend;
+use sc_network::{
+ config::MultiaddrWithPeerId,
+ DhtEvent,
+ ExHashT,
+ Multiaddr,
+ NetworkStateInfo,
+ PeerId,
+};
+use sp_authority_discovery::{AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair};
+use sp_core::crypto::{key_types, Pair};
+use sp_core::traits::BareCryptoStorePtr;
+use sp_runtime::{traits::Block as BlockT, generic::BlockId};
+use sp_api::ProvideRuntimeApi;
+
+mod addr_cache;
+/// Dht payload schemas generated from Protobuf definitions via Prost crate in build.rs.
+mod schema { include!(concat!(env!("OUT_DIR"), "/authority_discovery.rs")); }
+#[cfg(test)]
+pub mod tests;
+
+type Interval = Box + Unpin + Send + Sync>;
+
+const LOG_TARGET: &'static str = "sub-authority-discovery";
+
+/// Upper bound estimation on how long one should wait before accessing the Kademlia DHT.
+const LIBP2P_KADEMLIA_BOOTSTRAP_TIME: Duration = Duration::from_secs(30);
+
+/// Name of the Substrate peerset priority group for authorities discovered through the authority
+/// discovery module.
+const AUTHORITIES_PRIORITY_GROUP_NAME: &'static str = "authorities";
+
+/// Role an authority discovery module can run as.
+pub enum Role {
+ /// Actual authority as well as a reference to its key store.
+ Authority(BareCryptoStorePtr),
+ /// Sentry node that guards an authority.
+ ///
+ /// No reference to its key store needed, as sentry nodes don't have an identity to sign
+ /// addresses with in the first place.
+ Sentry,
+}
+
+/// A [`Worker`] makes a given authority discoverable and discovers other
+/// authorities.
+///
+/// The [`Worker`] implements the Future trait. By
+/// polling [`Worker`] an authority:
+///
+/// 1. **Makes itself discoverable**
+///
+/// 1. Retrieves its external addresses (including peer id) or the ones of
+/// its sentry nodes.
+///
+/// 2. Signs the above.
+///
+/// 3. Puts the signature and the addresses on the libp2p Kademlia DHT.
+///
+///
+/// 2. **Discovers other authorities**
+///
+/// 1. Retrieves the current set of authorities.
+///
+/// 2. Starts DHT queries for the ids of the authorities.
+///
+/// 3. Validates the signatures of the retrieved key value pairs.
+///
+/// 4. Adds the retrieved external addresses as priority nodes to the
+/// peerset.
+///
+/// When run as a sentry node, the [`Worker`] does not publish
+/// any addresses to the DHT but still discovers validators and sentry nodes of
+/// validators, i.e. only step 2 (Discovers other authorities) is executed.
+pub struct Worker
+where
+ Block: BlockT + 'static,
+ Network: NetworkProvider,
+ Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend,
+ >::Api: AuthorityDiscoveryApi,
+{
+ /// Channel receiver for messages send by an [`Service`].
+ from_service: Fuse>,
+
+ client: Arc,
+
+ network: Arc,
+ /// List of sentry node public addresses.
+ //
+ // There are 3 states:
+ // - None: No addresses were specified.
+ // - Some(vec![]): Addresses were specified, but none could be parsed as proper
+ // Multiaddresses.
+ // - Some(vec![a, b, c, ...]): Valid addresses were specified.
+ sentry_nodes: Option>,
+ /// Channel we receive Dht events on.
+ dht_event_rx: Pin + Send>>,
+
+ /// Interval to be proactive, publishing own addresses.
+ publish_interval: Interval,
+ /// Interval on which to query for addresses of other authorities.
+ query_interval: Interval,
+ /// Interval on which to set the peerset priority group to a new random
+ /// set of addresses.
+ priority_group_set_interval: Interval,
+
+ addr_cache: addr_cache::AddrCache,
+
+ metrics: Option,
+
+ role: Role,
+
+ phantom: PhantomData,
+}
+
+impl Worker
+where
+ Block: BlockT + Unpin + 'static,
+ Network: NetworkProvider,
+ Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend,
+ >::Api:
+ AuthorityDiscoveryApi,
+ Self: Future,
+{
+ /// Return a new [`Worker`].
+ ///
+ /// Note: When specifying `sentry_nodes` this module will not advertise the public addresses of
+ /// the node itself but only the public addresses of its sentry nodes.
+ pub(crate) fn new(
+ from_service: mpsc::Receiver,
+ client: Arc,
+ network: Arc,
+ sentry_nodes: Vec,
+ dht_event_rx: Pin + Send>>,
+ role: Role,
+ prometheus_registry: Option,
+ ) -> Self {
+ // Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h.
+ // Given that a node could restart at any point in time, one can not depend on the
+ // republishing process, thus publishing own external addresses should happen on an interval
+ // < 36h.
+ let publish_interval = interval_at(
+ Instant::now() + LIBP2P_KADEMLIA_BOOTSTRAP_TIME,
+ Duration::from_secs(12 * 60 * 60),
+ );
+
+ // External addresses of other authorities can change at any given point in time. The
+ // interval on which to query for external addresses of other authorities is a trade off
+ // between efficiency and performance.
+ let query_interval_duration = Duration::from_secs(60);
+ let query_interval_start = Instant::now() + LIBP2P_KADEMLIA_BOOTSTRAP_TIME;
+ let query_interval = interval_at(query_interval_start, query_interval_duration);
+
+ // Querying 500 [`AuthorityId`]s takes ~1m on the Kusama DHT (10th of August 2020) when
+ // comparing `authority_discovery_authority_addresses_requested_total` and
+ // `authority_discovery_dht_event_received`. With that in mind set the peerset priority
+ // group on the same interval as the [`query_interval`] above, just delayed by 2 minutes.
+ let priority_group_set_interval = interval_at(
+ query_interval_start + Duration::from_secs(2 * 60),
+ query_interval_duration,
+ );
+
+ let sentry_nodes = if !sentry_nodes.is_empty() {
+ Some(sentry_nodes.into_iter().map(|ma| ma.concat()).collect::>())
+ } else {
+ None
+ };
+
+ let addr_cache = AddrCache::new();
+
+ let metrics = match prometheus_registry {
+ Some(registry) => {
+ match Metrics::register(®istry) {
+ Ok(metrics) => Some(metrics),
+ Err(e) => {
+ error!(target: LOG_TARGET, "Failed to register metrics: {:?}", e);
+ None
+ },
+ }
+ },
+ None => None,
+ };
+
+ Worker {
+ from_service: from_service.fuse(),
+ client,
+ network,
+ sentry_nodes,
+ dht_event_rx,
+ publish_interval,
+ query_interval,
+ priority_group_set_interval,
+ addr_cache,
+ role,
+ metrics,
+ phantom: PhantomData,
+ }
+ }
+
+ /// Publish either our own or if specified the public addresses of our sentry nodes.
+ fn publish_ext_addresses(&mut self) -> Result<()> {
+ let key_store = match &self.role {
+ Role::Authority(key_store) => key_store,
+ // Only authority nodes can put addresses (their own or the ones of their sentry nodes)
+ // on the Dht. Sentry nodes don't have a known identity to authenticate such addresses,
+ // thus `publish_ext_addresses` becomes a no-op.
+ Role::Sentry => return Ok(()),
+ };
+
+ if let Some(metrics) = &self.metrics {
+ metrics.publish.inc()
+ }
+
+ let addresses: Vec<_> = match &self.sentry_nodes {
+ Some(addrs) => addrs.clone().into_iter()
+ .map(|a| a.to_vec())
+ .collect(),
+ None => self.network.external_addresses()
+ .into_iter()
+ .map(|a| a.with(multiaddr::Protocol::P2p(
+ self.network.local_peer_id().into(),
+ )))
+ .map(|a| a.to_vec())
+ .collect(),
+ };
+
+ if let Some(metrics) = &self.metrics {
+ metrics.amount_last_published.set(addresses.len() as u64);
+ }
+
+ let mut serialized_addresses = vec![];
+ schema::AuthorityAddresses { addresses }
+ .encode(&mut serialized_addresses)
+ .map_err(Error::EncodingProto)?;
+
+ let keys = Worker::get_own_public_keys_within_authority_set(
+ &key_store,
+ &self.client,
+ )?.into_iter().map(Into::into).collect::>();
+
+ let signatures = key_store.read()
+ .sign_with_all(
+ key_types::AUTHORITY_DISCOVERY,
+ keys.clone(),
+ serialized_addresses.as_slice(),
+ )
+ .map_err(|_| Error::Signing)?;
+
+ for (sign_result, key) in signatures.into_iter().zip(keys) {
+ let mut signed_addresses = vec![];
+
+ // sign_with_all returns Result signature
+ // is generated for a public key that is supported.
+ // Verify that all signatures exist for all provided keys.
+ let signature = sign_result.map_err(|_| Error::MissingSignature(key.clone()))?;
+ schema::SignedAuthorityAddresses {
+ addresses: serialized_addresses.clone(),
+ signature,
+ }
+ .encode(&mut signed_addresses)
+ .map_err(Error::EncodingProto)?;
+
+ self.network.put_value(
+ hash_authority_id(key.1.as_ref()),
+ signed_addresses,
+ );
+ }
+
+ Ok(())
+ }
+
+ fn request_addresses_of_others(&mut self) -> Result<()> {
+ let id = BlockId::hash(self.client.info().best_hash);
+
+ let authorities = self
+ .client
+ .runtime_api()
+ .authorities(&id)
+ .map_err(Error::CallingRuntime)?;
+
+ let local_keys = match &self.role {
+ Role::Authority(key_store) => {
+ key_store.read()
+ .sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
+ .into_iter()
+ .collect::>()
+ },
+ Role::Sentry => HashSet::new(),
+ };
+
+ for authority_id in authorities.iter() {
+ // Make sure we don't look up our own keys.
+ if !local_keys.contains(authority_id.as_ref()) {
+ if let Some(metrics) = &self.metrics {
+ metrics.request.inc();
+ }
+
+ self.network
+ .get_value(&hash_authority_id(authority_id.as_ref()));
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Handle incoming Dht events.
+ ///
+ /// Returns either:
+ /// - Poll::Pending when there are no more events to handle or
+ /// - Poll::Ready(()) when the dht event stream terminated.
+ fn handle_dht_events(&mut self, cx: &mut Context) -> Poll<()>{
+ loop {
+ match ready!(self.dht_event_rx.poll_next_unpin(cx)) {
+ Some(DhtEvent::ValueFound(v)) => {
+ if let Some(metrics) = &self.metrics {
+ metrics.dht_event_received.with_label_values(&["value_found"]).inc();
+ }
+
+ if log_enabled!(log::Level::Debug) {
+ let hashes = v.iter().map(|(hash, _value)| hash.clone());
+ debug!(
+ target: LOG_TARGET,
+ "Value for hash '{:?}' found on Dht.", hashes,
+ );
+ }
+
+ if let Err(e) = self.handle_dht_value_found_event(v) {
+ if let Some(metrics) = &self.metrics {
+ metrics.handle_value_found_event_failure.inc();
+ }
+
+ debug!(
+ target: LOG_TARGET,
+ "Failed to handle Dht value found event: {:?}", e,
+ );
+ }
+ }
+ Some(DhtEvent::ValueNotFound(hash)) => {
+ if let Some(metrics) = &self.metrics {
+ metrics.dht_event_received.with_label_values(&["value_not_found"]).inc();
+ }
+
+ debug!(
+ target: LOG_TARGET,
+ "Value for hash '{:?}' not found on Dht.", hash
+ )
+ },
+ Some(DhtEvent::ValuePut(hash)) => {
+ if let Some(metrics) = &self.metrics {
+ metrics.dht_event_received.with_label_values(&["value_put"]).inc();
+ }
+
+ debug!(
+ target: LOG_TARGET,
+ "Successfully put hash '{:?}' on Dht.", hash,
+ )
+ },
+ Some(DhtEvent::ValuePutFailed(hash)) => {
+ if let Some(metrics) = &self.metrics {
+ metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc();
+ }
+
+ debug!(
+ target: LOG_TARGET,
+ "Failed to put hash '{:?}' on Dht.", hash
+ )
+ },
+ None => {
+ debug!(target: LOG_TARGET, "Dht event stream terminated.");
+ return Poll::Ready(());
+ },
+ }
+ }
+ }
+
+ fn handle_dht_value_found_event(
+ &mut self,
+ values: Vec<(libp2p::kad::record::Key, Vec)>,
+ ) -> Result<()> {
+ // Ensure `values` is not empty and all its keys equal.
+ let remote_key = values.iter().fold(Ok(None), |acc, (key, _)| {
+ match acc {
+ Ok(None) => Ok(Some(key.clone())),
+ Ok(Some(ref prev_key)) if prev_key != key => Err(
+ Error::ReceivingDhtValueFoundEventWithDifferentKeys
+ ),
+ x @ Ok(_) => x,
+ Err(e) => Err(e),
+ }
+ })?.ok_or(Error::ReceivingDhtValueFoundEventWithNoRecords)?;
+
+ let authorities = {
+ let block_id = BlockId::hash(self.client.info().best_hash);
+ // From the Dht we only get the hashed authority id. In order to retrieve the actual
+ // authority id and to ensure it is actually an authority, we match the hash against the
+ // hash of the authority id of all other authorities.
+ let authorities = self.client.runtime_api().authorities(&block_id)?;
+ self.addr_cache.retain_ids(&authorities);
+ authorities
+ .into_iter()
+ .map(|id| (hash_authority_id(id.as_ref()), id))
+ .collect::>()
+ };
+
+ // Check if the event origins from an authority in the current authority set.
+ let authority_id: &AuthorityId = authorities
+ .get(&remote_key)
+ .ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?;
+
+ let local_peer_id = self.network.local_peer_id();
+
+ let remote_addresses: Vec = values.into_iter()
+ .map(|(_k, v)| {
+ let schema::SignedAuthorityAddresses { signature, addresses } =
+ schema::SignedAuthorityAddresses::decode(v.as_slice())
+ .map_err(Error::DecodingProto)?;
+
+ let signature = AuthoritySignature::decode(&mut &signature[..])
+ .map_err(Error::EncodingDecodingScale)?;
+
+ if !AuthorityPair::verify(&signature, &addresses, authority_id) {
+ return Err(Error::VerifyingDhtPayload);
+ }
+
+ let addresses = schema::AuthorityAddresses::decode(addresses.as_slice())
+ .map(|a| a.addresses)
+ .map_err(Error::DecodingProto)?
+ .into_iter()
+ .map(|a| a.try_into())
+ .collect::>()
+ .map_err(Error::ParsingMultiaddress)?;
+
+ Ok(addresses)
+ })
+ .collect::>>>()?
+ .into_iter()
+ .flatten()
+ // Ignore [`Multiaddr`]s without [`PeerId`] and own addresses.
+ .filter(|addr| addr.iter().any(|protocol| {
+ // Parse to PeerId first as Multihashes of old and new PeerId
+ // representation don't equal.
+ //
+ // See https://github.com/libp2p/rust-libp2p/issues/555 for
+ // details.
+ if let multiaddr::Protocol::P2p(hash) = protocol {
+ let peer_id = match PeerId::from_multihash(hash) {
+ Ok(peer_id) => peer_id,
+ Err(_) => return false, // Discard address.
+ };
+
+ // Discard if equal to local peer id, keep if it differs.
+ return !(peer_id == local_peer_id);
+ }
+
+ false // `protocol` is not a [`Protocol::P2p`], let's keep looking.
+ }))
+ .collect();
+
+ if !remote_addresses.is_empty() {
+ self.addr_cache.insert(authority_id.clone(), remote_addresses);
+ if let Some(metrics) = &self.metrics {
+ metrics.known_authorities_count.set(
+ self.addr_cache.num_ids().try_into().unwrap_or(std::u64::MAX)
+ );
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Retrieve our public keys within the current authority set.
+ //
+ // A node might have multiple authority discovery keys within its keystore, e.g. an old one and
+ // one for the upcoming session. In addition it could be participating in the current authority
+ // set with two keys. The function does not return all of the local authority discovery public
+ // keys, but only the ones intersecting with the current authority set.
+ fn get_own_public_keys_within_authority_set(
+ key_store: &BareCryptoStorePtr,
+ client: &Client,
+ ) -> Result> {
+ let local_pub_keys = key_store.read()
+ .sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
+ .into_iter()
+ .collect::>();
+
+ let id = BlockId::hash(client.info().best_hash);
+ let current_authorities = client.runtime_api()
+ .authorities(&id)
+ .map_err(Error::CallingRuntime)?
+ .into_iter()
+ .map(std::convert::Into::into)
+ .collect::>();
+
+ let intersection = local_pub_keys.intersection(¤t_authorities)
+ .cloned()
+ .map(std::convert::Into::into)
+ .collect();
+
+ Ok(intersection)
+ }
+
+ /// Set the peer set 'authority' priority group to a new random set of
+ /// [`Multiaddr`]s.
+ fn set_priority_group(&self) -> Result<()> {
+ let addresses = self.addr_cache.get_random_subset();
+
+ if addresses.is_empty() {
+ debug!(
+ target: LOG_TARGET,
+ "Got no addresses in cache for peerset priority group.",
+ );
+ return Ok(());
+ }
+
+ if let Some(metrics) = &self.metrics {
+ metrics.priority_group_size.set(addresses.len().try_into().unwrap_or(std::u64::MAX));
+ }
+
+ debug!(
+ target: LOG_TARGET,
+ "Applying priority group {:?} to peerset.", addresses,
+ );
+
+ self.network
+ .set_priority_group(
+ AUTHORITIES_PRIORITY_GROUP_NAME.to_string(),
+ addresses.into_iter().collect(),
+ )
+ .map_err(Error::SettingPeersetPriorityGroup)?;
+
+ Ok(())
+ }
+}
+
+impl Future for Worker
+where
+ Block: BlockT + Unpin + 'static,
+ Network: NetworkProvider,
+ Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend,
+ >::Api:
+ AuthorityDiscoveryApi,
+{
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll {
+ // Process incoming events.
+ if let Poll::Ready(()) = self.handle_dht_events(cx) {
+ // `handle_dht_events` returns `Poll::Ready(())` when the Dht event stream terminated.
+ // Termination of the Dht event stream implies that the underlying network terminated,
+ // thus authority discovery should terminate as well.
+ return Poll::Ready(());
+ }
+
+ // Publish own addresses.
+ if let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) {
+ // Register waker of underlying task for next interval.
+ while let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) {}
+
+ if let Err(e) = self.publish_ext_addresses() {
+ error!(
+ target: LOG_TARGET,
+ "Failed to publish external addresses: {:?}", e,
+ );
+ }
+ }
+
+ // Request addresses of authorities.
+ if let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {
+ // Register waker of underlying task for next interval.
+ while let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {}
+
+ if let Err(e) = self.request_addresses_of_others() {
+ error!(
+ target: LOG_TARGET,
+ "Failed to request addresses of authorities: {:?}", e,
+ );
+ }
+ }
+
+ // Set peerset priority group to a new random set of addresses.
+ if let Poll::Ready(_) = self.priority_group_set_interval.poll_next_unpin(cx) {
+ // Register waker of underlying task for next interval.
+ while let Poll::Ready(_) = self.priority_group_set_interval.poll_next_unpin(cx) {}
+
+ if let Err(e) = self.set_priority_group() {
+ error!(
+ target: LOG_TARGET,
+ "Failed to set priority group: {:?}", e,
+ );
+ }
+ }
+
+ // Handle messages from [`Service`].
+ while let Poll::Ready(Some(msg)) = self.from_service.poll_next_unpin(cx) {
+ match msg {
+ ServicetoWorkerMsg::GetAddressesByAuthorityId(authority, sender) => {
+ let _ = sender.send(
+ self.addr_cache.get_addresses_by_authority_id(&authority).map(Clone::clone),
+ );
+ }
+ ServicetoWorkerMsg::GetAuthorityIdByPeerId(peer_id, sender) => {
+ let _ = sender.send(
+ self.addr_cache.get_authority_id_by_peer_id(&peer_id).map(Clone::clone),
+ );
+ }
+ }
+ }
+
+ Poll::Pending
+ }
+}
+
+/// NetworkProvider provides [`Worker`] with all necessary hooks into the
+/// underlying Substrate networking. Using this trait abstraction instead of [`NetworkService`]
+/// directly is necessary to unit test [`Worker`].
+pub trait NetworkProvider: NetworkStateInfo {
+ /// Modify a peerset priority group.
+ fn set_priority_group(
+ &self,
+ group_id: String,
+ peers: HashSet,
+ ) -> std::result::Result<(), String>;
+
+ /// Start putting a value in the Dht.
+ fn put_value(&self, key: libp2p::kad::record::Key, value: Vec);
+
+ /// Start getting a value from the Dht.
+ fn get_value(&self, key: &libp2p::kad::record::Key);
+}
+
+impl NetworkProvider for sc_network::NetworkService
+where
+ B: BlockT + 'static,
+ H: ExHashT,
+{
+ fn set_priority_group(
+ &self,
+ group_id: String,
+ peers: HashSet,
+ ) -> std::result::Result<(), String> {
+ self.set_priority_group(group_id, peers)
+ }
+ fn put_value(&self, key: libp2p::kad::record::Key, value: Vec) {
+ self.put_value(key, value)
+ }
+ fn get_value(&self, key: &libp2p::kad::record::Key) {
+ self.get_value(key)
+ }
+}
+
+fn hash_authority_id(id: &[u8]) -> libp2p::kad::record::Key {
+ libp2p::kad::record::Key::new(&libp2p::multihash::Sha2_256::digest(id))
+}
+
+fn interval_at(start: Instant, duration: Duration) -> Interval {
+ let stream = futures::stream::unfold(start, move |next| {
+ let time_until_next = next.saturating_duration_since(Instant::now());
+
+ Delay::new(time_until_next).map(move |_| Some(((), next + duration)))
+ });
+
+ Box::new(stream)
+}
+
+/// Prometheus metrics for a [`Worker`].
+#[derive(Clone)]
+pub(crate) struct Metrics {
+ publish: Counter,
+ amount_last_published: Gauge,
+ request: Counter,
+ dht_event_received: CounterVec,
+ handle_value_found_event_failure: Counter,
+ known_authorities_count: Gauge,
+ priority_group_size: Gauge,
+}
+
+impl Metrics {
+ pub(crate) fn register(registry: &prometheus_endpoint::Registry) -> Result {
+ Ok(Self {
+ publish: register(
+ Counter::new(
+ "authority_discovery_times_published_total",
+ "Number of times authority discovery has published external addresses."
+ )?,
+ registry,
+ )?,
+ amount_last_published: register(
+ Gauge::new(
+ "authority_discovery_amount_external_addresses_last_published",
+ "Number of external addresses published when authority discovery last \
+ published addresses."
+ )?,
+ registry,
+ )?,
+ request: register(
+ Counter::new(
+ "authority_discovery_authority_addresses_requested_total",
+ "Number of times authority discovery has requested external addresses of a \
+ single authority."
+ )?,
+ registry,
+ )?,
+ dht_event_received: register(
+ CounterVec::new(
+ Opts::new(
+ "authority_discovery_dht_event_received",
+ "Number of dht events received by authority discovery."
+ ),
+ &["name"],
+ )?,
+ registry,
+ )?,
+ handle_value_found_event_failure: register(
+ Counter::new(
+ "authority_discovery_handle_value_found_event_failure",
+ "Number of times handling a dht value found event failed."
+ )?,
+ registry,
+ )?,
+ known_authorities_count: register(
+ Gauge::new(
+ "authority_discovery_known_authorities_count",
+ "Number of authorities known by authority discovery."
+ )?,
+ registry,
+ )?,
+ priority_group_size: register(
+ Gauge::new(
+ "authority_discovery_priority_group_size",
+ "Number of addresses passed to the peer set as a priority group."
+ )?,
+ registry,
+ )?,
+ })
+ }
+}
+
+// Helper functions for unit testing.
+#[cfg(test)]
+impl Worker
+where
+ Block: BlockT + 'static,
+ Network: NetworkProvider,
+ Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend,
+ >::Api: AuthorityDiscoveryApi,
+{
+ pub(crate) fn inject_addresses(&mut self, authority: AuthorityId, addresses: Vec) {
+ self.addr_cache.insert(authority, addresses);
+ }
+}
diff --git a/client/authority-discovery/src/worker/addr_cache.rs b/client/authority-discovery/src/worker/addr_cache.rs
new file mode 100644
index 0000000000000..a2cd3f33e9215
--- /dev/null
+++ b/client/authority-discovery/src/worker/addr_cache.rs
@@ -0,0 +1,233 @@
+// Copyright 2019-2020 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see .
+
+use libp2p::core::multiaddr::{Multiaddr, Protocol};
+use rand::seq::SliceRandom;
+use std::collections::HashMap;
+
+use sp_authority_discovery::AuthorityId;
+use sc_network::PeerId;
+
+/// The maximum number of authority connections initialized through the authority discovery module.
+///
+/// In other words the maximum size of the `authority` peerset priority group.
+const MAX_NUM_AUTHORITY_CONN: usize = 10;
+
+/// Cache for [`AuthorityId`] -> [`Vec`] and [`PeerId`] -> [`AuthorityId`] mappings.
+pub(super) struct AddrCache {
+ authority_id_to_addresses: HashMap>,
+ peer_id_to_authority_id: HashMap,
+}
+
+impl AddrCache {
+ pub fn new() -> Self {
+ AddrCache {
+ authority_id_to_addresses: HashMap::new(),
+ peer_id_to_authority_id: HashMap::new(),
+ }
+ }
+
+ /// Inserts the given [`AuthorityId`] and [`Vec`] pair for future lookups by
+ /// [`AuthorityId`] or [`PeerId`].
+ pub fn insert(&mut self, authority_id: AuthorityId, mut addresses: Vec) {
+ if addresses.is_empty() {
+ return;
+ }
+
+ // Insert into `self.peer_id_to_authority_id`.
+ let peer_ids = addresses.iter()
+ .map(|a| peer_id_from_multiaddr(a))
+ .filter_map(|peer_id| peer_id);
+ for peer_id in peer_ids {
+ self.peer_id_to_authority_id.insert(peer_id, authority_id.clone());
+ }
+
+ // Insert into `self.authority_id_to_addresses`.
+ addresses.sort_unstable_by(|a, b| a.as_ref().cmp(b.as_ref()));
+ self.authority_id_to_addresses.insert(authority_id, addresses);
+ }
+
+ /// Returns the number of authority IDs in the cache.
+ pub fn num_ids(&self) -> usize {
+ self.authority_id_to_addresses.len()
+ }
+
+ /// Returns the addresses for the given [`AuthorityId`].
+ pub fn get_addresses_by_authority_id(&self, authority_id: &AuthorityId) -> Option<&Vec> {
+ self.authority_id_to_addresses.get(&authority_id)
+ }
+
+ /// Returns the [`AuthorityId`] for the given [`PeerId`].
+ pub fn get_authority_id_by_peer_id(&self, peer_id: &PeerId) -> Option<&AuthorityId> {
+ self.peer_id_to_authority_id.get(peer_id)
+ }
+
+ /// Returns a single address for a random subset (maximum of [`MAX_NUM_AUTHORITY_CONN`]) of all
+ /// known authorities.
+ pub fn get_random_subset(&self) -> Vec {
+ let mut rng = rand::thread_rng();
+
+ let mut addresses = self
+ .authority_id_to_addresses
+ .iter()
+ .filter_map(|(_authority_id, addresses)| {
+ debug_assert!(!addresses.is_empty());
+ addresses
+ .choose(&mut rng)
+ })
+ .collect::>();
+
+ addresses.sort_unstable_by(|a, b| a.as_ref().cmp(b.as_ref()));
+ addresses.dedup();
+
+ addresses
+ .choose_multiple(&mut rng, MAX_NUM_AUTHORITY_CONN)
+ .map(|a| (**a).clone())
+ .collect()
+ }
+
+ /// Removes all [`PeerId`]s and [`Multiaddr`]s from the cache that are not related to the given
+ /// [`AuthorityId`]s.
+ pub fn retain_ids(&mut self, authority_ids: &Vec) {
+ // The below logic could be replaced by `BtreeMap::drain_filter` once it stabilized.
+ let authority_ids_to_remove = self.authority_id_to_addresses.iter()
+ .filter(|(id, _addresses)| !authority_ids.contains(id))
+ .map(|entry| entry.0)
+ .cloned()
+ .collect::>();
+
+ for authority_id_to_remove in authority_ids_to_remove {
+ // Remove other entries from `self.authority_id_to_addresses`.
+ let addresses = self.authority_id_to_addresses.remove(&authority_id_to_remove);
+
+ // Remove other entries from `self.peer_id_to_authority_id`.
+ let peer_ids = addresses.iter()
+ .flatten()
+ .map(|a| peer_id_from_multiaddr(a))
+ .filter_map(|peer_id| peer_id);
+ for peer_id in peer_ids {
+ if let Some(id) = self.peer_id_to_authority_id.remove(&peer_id) {
+ debug_assert_eq!(authority_id_to_remove, id);
+ }
+ }
+ }
+ }
+}
+
+fn peer_id_from_multiaddr(addr: &Multiaddr) -> Option {
+ addr.iter().last().and_then(|protocol| if let Protocol::P2p(multihash) = protocol {
+ PeerId::from_multihash(multihash).ok()
+ } else {
+ None
+ })
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use libp2p::multihash;
+ use quickcheck::{Arbitrary, Gen, QuickCheck, TestResult};
+ use rand::Rng;
+
+ use sp_authority_discovery::{AuthorityId, AuthorityPair};
+ use sp_core::crypto::Pair;
+
+ #[derive(Clone, Debug)]
+ struct TestAuthorityId(AuthorityId);
+
+ impl Arbitrary for TestAuthorityId {
+ fn arbitrary(g: &mut G) -> Self {
+ let seed: [u8; 32] = g.gen();
+ TestAuthorityId(AuthorityPair::from_seed_slice(&seed).unwrap().public())
+ }
+ }
+
+ #[derive(Clone, Debug)]
+ struct TestMultiaddr(Multiaddr);
+
+ impl Arbitrary for TestMultiaddr {
+ fn arbitrary(g: &mut G) -> Self {
+ let seed: [u8; 32] = g.gen();
+ let peer_id = PeerId::from_multihash(
+ multihash::wrap(multihash::Code::Sha2_256, &seed)
+ ).unwrap();
+ let multiaddr = "/ip6/2001:db8:0:0:0:0:0:2/tcp/30333".parse::()
+ .unwrap()
+ .with(Protocol::P2p(peer_id.into()));
+
+ TestMultiaddr(multiaddr)
+ }
+ }
+
+ #[test]
+ fn retains_only_entries_of_provided_authority_ids() {
+ fn property(
+ first: (TestAuthorityId, TestMultiaddr),
+ second: (TestAuthorityId, TestMultiaddr),
+ third: (TestAuthorityId, TestMultiaddr),
+ ) -> TestResult {
+ let first: (AuthorityId, Multiaddr) = ((first.0).0, (first.1).0);
+ let second: (AuthorityId, Multiaddr) = ((second.0).0, (second.1).0);
+ let third: (AuthorityId, Multiaddr) = ((third.0).0, (third.1).0);
+
+ let mut cache = AddrCache::new();
+
+ cache.insert(first.0.clone(), vec![first.1.clone()]);
+ cache.insert(second.0.clone(), vec![second.1.clone()]);
+ cache.insert(third.0.clone(), vec![third.1.clone()]);
+
+ let subset = cache.get_random_subset();
+ assert!(
+ subset.contains(&first.1) && subset.contains(&second.1) && subset.contains(&third.1),
+ "Expect initial subset to contain all authorities.",
+ );
+ assert_eq!(
+ Some(&vec![third.1.clone()]),
+ cache.get_addresses_by_authority_id(&third.0),
+ "Expect `get_addresses_by_authority_id` to return addresses of third authority."
+ );
+ assert_eq!(
+ Some(&third.0),
+ cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&third.1).unwrap()),
+ "Expect `get_authority_id_by_peer_id` to return `AuthorityId` of third authority."
+ );
+
+ cache.retain_ids(&vec![first.0, second.0]);
+
+ let subset = cache.get_random_subset();
+ assert!(
+ subset.contains(&first.1) || subset.contains(&second.1),
+ "Expected both first and second authority."
+ );
+ assert!(!subset.contains(&third.1), "Did not expect address from third authority");
+ assert_eq!(
+ None, cache.get_addresses_by_authority_id(&third.0),
+ "Expect `get_addresses_by_authority_id` to not return `None` for third authority."
+ );
+ assert_eq!(
+ None, cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&third.1).unwrap()),
+ "Expect `get_authority_id_by_peer_id` to return `None` for third authority."
+ );
+
+ TestResult::passed()
+ }
+
+ QuickCheck::new()
+ .max_tests(10)
+ .quickcheck(property as fn(_, _, _) -> TestResult)
+ }
+}
diff --git a/client/authority-discovery/src/schema/dht.proto b/client/authority-discovery/src/worker/schema/dht.proto
similarity index 100%
rename from client/authority-discovery/src/schema/dht.proto
rename to client/authority-discovery/src/worker/schema/dht.proto
diff --git a/client/authority-discovery/src/worker/tests.rs b/client/authority-discovery/src/worker/tests.rs
new file mode 100644
index 0000000000000..68aadca7a7f30
--- /dev/null
+++ b/client/authority-discovery/src/worker/tests.rs
@@ -0,0 +1,693 @@
+// This file is part of Substrate.
+
+// Copyright (C) 2017-2020 Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+use crate::worker::schema;
+
+use std::{iter::FromIterator, sync::{Arc, Mutex}};
+
+use futures::channel::mpsc::channel;
+use futures::executor::{block_on, LocalPool};
+use futures::future::{poll_fn, FutureExt};
+use futures::sink::SinkExt;
+use futures::task::LocalSpawn;
+use futures::poll;
+use libp2p::{kad, core::multiaddr, PeerId};
+
+use sp_api::{ProvideRuntimeApi, ApiRef};
+use sp_core::{crypto::Public, testing::KeyStore};
+use sp_runtime::traits::{Zero, Block as BlockT, NumberFor};
+use substrate_test_runtime_client::runtime::Block;
+
+use super::*;
+
+#[test]
+fn interval_at_with_start_now() {
+ let start = Instant::now();
+
+ let mut interval = interval_at(
+ std::time::Instant::now(),
+ std::time::Duration::from_secs(10),
+ );
+
+ futures::executor::block_on(async {
+ interval.next().await;
+ });
+
+ assert!(
+ Instant::now().saturating_duration_since(start) < Duration::from_secs(1),
+ "Expected low resolution instant interval to fire within less than a second.",
+ );
+}
+
+#[test]
+fn interval_at_is_queuing_ticks() {
+ let start = Instant::now();
+
+ let interval = interval_at(start, std::time::Duration::from_millis(100));
+
+ // Let's wait for 200ms, thus 3 elements should be queued up (1st at 0ms, 2nd at 100ms, 3rd
+ // at 200ms).
+ std::thread::sleep(Duration::from_millis(200));
+
+ futures::executor::block_on(async {
+ interval.take(3).collect::>().await;
+ });
+
+ // Make sure we did not wait for more than 300 ms, which would imply that `at_interval` is
+ // not queuing ticks.
+ assert!(
+ Instant::now().saturating_duration_since(start) < Duration::from_millis(300),
+ "Expect interval to /queue/ events when not polled for a while.",
+ );
+}
+
+#[test]
+fn interval_at_with_initial_delay() {
+ let start = Instant::now();
+
+ let mut interval = interval_at(
+ std::time::Instant::now() + Duration::from_millis(100),
+ std::time::Duration::from_secs(10),
+ );
+
+ futures::executor::block_on(async {
+ interval.next().await;
+ });
+
+ assert!(
+ Instant::now().saturating_duration_since(start) > Duration::from_millis(100),
+ "Expected interval with initial delay not to fire right away.",
+ );
+}
+
+#[derive(Clone)]
+pub(crate) struct TestApi {
+ pub(crate) authorities: Vec,
+}
+
+impl ProvideRuntimeApi for TestApi {
+ type Api = RuntimeApi;
+
+ fn runtime_api<'a>(&'a self) -> ApiRef<'a, Self::Api> {
+ RuntimeApi {
+ authorities: self.authorities.clone(),
+ }.into()
+ }
+}
+
+/// Blockchain database header backend. Does not perform any validation.
+impl HeaderBackend for TestApi {
+ fn header(
+ &self,
+ _id: BlockId,
+ ) -> std::result::Result, sp_blockchain::Error> {
+ Ok(None)
+ }
+
+ fn info(&self) -> sc_client_api::blockchain::Info {
+ sc_client_api::blockchain::Info {
+ best_hash: Default::default(),
+ best_number: Zero::zero(),
+ finalized_hash: Default::default(),
+ finalized_number: Zero::zero(),
+ genesis_hash: Default::default(),
+ number_leaves: Default::default(),
+ }
+ }
+
+ fn status(
+ &self,
+ _id: BlockId,
+ ) -> std::result::Result {
+ Ok(sc_client_api::blockchain::BlockStatus::Unknown)
+ }
+
+ fn number(
+ &self,
+ _hash: Block::Hash,
+ ) -> std::result::Result>, sp_blockchain::Error> {
+ Ok(None)
+ }
+
+ fn hash(
+ &self,
+ _number: NumberFor,
+ ) -> std::result::Result, sp_blockchain::Error> {
+ Ok(None)
+ }
+}
+
+pub(crate) struct RuntimeApi {
+ authorities: Vec,
+}
+
+sp_api::mock_impl_runtime_apis! {
+ impl AuthorityDiscoveryApi for RuntimeApi {
+ type Error = sp_blockchain::Error;
+
+ fn authorities(&self) -> Vec {
+ self.authorities.clone()
+ }
+ }
+}
+
+pub struct TestNetwork {
+ peer_id: PeerId,
+ // Whenever functions on `TestNetwork` are called, the function arguments are added to the
+ // vectors below.
+ pub put_value_call: Arc)>>>,
+ pub get_value_call: Arc>>,
+ pub set_priority_group_call: Arc)>>>,
+}
+
+impl Default for TestNetwork {
+ fn default() -> Self {
+ TestNetwork {
+ peer_id: PeerId::random(),
+ put_value_call: Default::default(),
+ get_value_call: Default::default(),
+ set_priority_group_call: Default::default(),
+ }
+ }
+}
+
+impl NetworkProvider for TestNetwork {
+ fn set_priority_group(
+ &self,
+ group_id: String,
+ peers: HashSet,
+ ) -> std::result::Result<(), String> {
+ self.set_priority_group_call
+ .lock()
+ .unwrap()
+ .push((group_id, peers));
+ Ok(())
+ }
+ fn put_value(&self, key: kad::record::Key, value: Vec) {
+ self.put_value_call.lock().unwrap().push((key, value));
+ }
+ fn get_value(&self, key: &kad::record::Key) {
+ self.get_value_call.lock().unwrap().push(key.clone());
+ }
+}
+
+impl NetworkStateInfo for TestNetwork {
+ fn local_peer_id(&self) -> PeerId {
+ self.peer_id.clone()
+ }
+
+ fn external_addresses(&self) -> Vec {
+ vec!["/ip6/2001:db8::/tcp/30333".parse().unwrap()]
+ }
+}
+
+#[test]
+fn new_registers_metrics() {
+ let (_dht_event_tx, dht_event_rx) = channel(1000);
+ let network: Arc = Arc::new(Default::default());
+ let key_store = KeyStore::new();
+ let test_api = Arc::new(TestApi {
+ authorities: vec![],
+ });
+
+ let registry = prometheus_endpoint::Registry::new();
+
+ let (_to_worker, from_service) = mpsc::channel(0);
+ Worker::new(
+ from_service,
+ test_api,
+ network.clone(),
+ vec![],
+ dht_event_rx.boxed(),
+ Role::Authority(key_store),
+ Some(registry.clone()),
+ );
+
+ assert!(registry.gather().len() > 0);
+}
+
+#[test]
+fn request_addresses_of_others_triggers_dht_get_query() {
+ let _ = ::env_logger::try_init();
+ let (_dht_event_tx, dht_event_rx) = channel(1000);
+
+ // Generate authority keys
+ let authority_1_key_pair = AuthorityPair::from_seed_slice(&[1; 32]).unwrap();
+ let authority_2_key_pair = AuthorityPair::from_seed_slice(&[2; 32]).unwrap();
+
+ let test_api = Arc::new(TestApi {
+ authorities: vec![authority_1_key_pair.public(), authority_2_key_pair.public()],
+ });
+
+ let network: Arc = Arc::new(Default::default());
+ let key_store = KeyStore::new();
+
+
+ let (_to_worker, from_service) = mpsc::channel(0);
+ let mut worker = Worker::new(
+ from_service,
+ test_api,
+ network.clone(),
+ vec![],
+ dht_event_rx.boxed(),
+ Role::Authority(key_store),
+ None,
+ );
+
+ worker.request_addresses_of_others().unwrap();
+
+ // Expect authority discovery to request new records from the dht.
+ assert_eq!(network.get_value_call.lock().unwrap().len(), 2);
+}
+
+#[test]
+fn publish_discover_cycle() {
+ let _ = ::env_logger::try_init();
+
+ // Node A publishing its address.
+
+ let (_dht_event_tx, dht_event_rx) = channel(1000);
+
+ let network: Arc = Arc::new(Default::default());
+ let node_a_multiaddr = {
+ let peer_id = network.local_peer_id();
+ let address = network.external_addresses().pop().unwrap();
+
+ address.with(multiaddr::Protocol::P2p(
+ peer_id.into(),
+ ))
+ };
+
+ let key_store = KeyStore::new();
+ let node_a_public = key_store
+ .write()
+ .sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None)
+ .unwrap();
+ let test_api = Arc::new(TestApi {
+ authorities: vec![node_a_public.into()],
+ });
+
+ let (_to_worker, from_service) = mpsc::channel(0);
+ let mut worker = Worker::new(
+ from_service,
+ test_api,
+ network.clone(),
+ vec![],
+ dht_event_rx.boxed(),
+ Role::Authority(key_store),
+ None,
+ );
+
+ worker.publish_ext_addresses().unwrap();
+
+ // Expect authority discovery to put a new record onto the dht.
+ assert_eq!(network.put_value_call.lock().unwrap().len(), 1);
+
+ let dht_event = {
+ let (key, value) = network.put_value_call.lock().unwrap().pop().unwrap();
+ sc_network::DhtEvent::ValueFound(vec![(key, value)])
+ };
+
+ // Node B discovering node A's address.
+
+ let (mut dht_event_tx, dht_event_rx) = channel(1000);
+ let test_api = Arc::new(TestApi {
+ // Make sure node B identifies node A as an authority.
+ authorities: vec![node_a_public.into()],
+ });
+ let network: Arc = Arc::new(Default::default());
+ let key_store = KeyStore::new();
+
+ let (_to_worker, from_service) = mpsc::channel(0);
+ let mut worker = Worker::new(
+ from_service,
+ test_api,
+ network.clone(),
+ vec![],
+ dht_event_rx.boxed(),
+ Role::Authority(key_store),
+ None,
+ );
+
+ dht_event_tx.try_send(dht_event).unwrap();
+
+ let f = |cx: &mut Context<'_>| -> Poll<()> {
+ // Make authority discovery handle the event.
+ if let Poll::Ready(e) = worker.handle_dht_events(cx) {
+ panic!("Unexpected error: {:?}", e);
+ }
+ worker.set_priority_group().unwrap();
+
+ // Expect authority discovery to set the priority set.
+ assert_eq!(network.set_priority_group_call.lock().unwrap().len(), 1);
+
+ assert_eq!(
+ network.set_priority_group_call.lock().unwrap()[0],
+ (
+ "authorities".to_string(),
+ HashSet::from_iter(vec![node_a_multiaddr.clone()].into_iter())
+ )
+ );
+
+ Poll::Ready(())
+ };
+
+ let _ = block_on(poll_fn(f));
+}
+
+#[test]
+fn terminate_when_event_stream_terminates() {
+ let (dht_event_tx, dht_event_rx) = channel(1000);
+ let network: Arc = Arc::new(Default::default());
+ let key_store = KeyStore::new();
+ let test_api = Arc::new(TestApi {
+ authorities: vec![],
+ });
+
+ let (_to_worker, from_service) = mpsc::channel(0);
+ let mut worker = Worker::new(
+ from_service,
+ test_api,
+ network.clone(),
+ vec![],
+ dht_event_rx.boxed(),
+ Role::Authority(key_store),
+ None,
+ );
+
+ block_on(async {
+ assert_eq!(Poll::Pending, poll!(&mut worker));
+
+ // Simulate termination of the network through dropping the sender side of the dht event
+ // channel.
+ drop(dht_event_tx);
+
+ assert_eq!(
+ Poll::Ready(()), poll!(&mut worker),
+ "Expect the authority discovery module to terminate once the sending side of the dht \
+ event channel is terminated.",
+ );
+ });
+}
+
+#[test]
+fn continue_operating_when_service_channel_is_dropped() {
+ let (_dht_event_tx, dht_event_rx) = channel(0);
+ let network: Arc = Arc::new(Default::default());
+ let key_store = KeyStore::new();
+ let test_api = Arc::new(TestApi {
+ authorities: vec![],
+ });
+
+ let (to_worker, from_service) = mpsc::channel(0);
+ let mut worker = Worker::new(
+ from_service,
+ test_api,
+ network.clone(),
+ vec![],
+ dht_event_rx.boxed(),
+ Role::Authority(key_store),
+ None,
+ );
+
+ block_on(async {
+ assert_eq!(Poll::Pending, poll!(&mut worker));
+
+ drop(to_worker);
+
+ for _ in 0..100 {
+ assert_eq!(
+ Poll::Pending, poll!(&mut worker),
+ "Expect authority discovery `Worker` not to panic when service channel is dropped.",
+ );
+ }
+ });
+}
+
+#[test]
+fn dont_stop_polling_when_error_is_returned() {
+ #[derive(PartialEq, Debug)]
+ enum Event {
+ Processed,
+ End,
+ };
+
+ let (mut dht_event_tx, dht_event_rx) = channel(1000);
+ let (mut discovery_update_tx, mut discovery_update_rx) = channel(1000);
+ let network: Arc = Arc::new(Default::default());
+ let key_store = KeyStore::new();
+ let test_api = Arc::new(TestApi {
+ authorities: vec![],
+ });
+ let mut pool = LocalPool::new();
+
+ let (_to_worker, from_service) = mpsc::channel(0);
+ let mut worker = Worker::new(
+ from_service,
+ test_api,
+ network.clone(),
+ vec![],
+ dht_event_rx.boxed(),
+ Role::Authority(key_store),
+ None,
+ );
+
+ // Spawn the authority discovery to make sure it is polled independently.
+ //
+ // As this is a local pool, only one future at a time will have the CPU and
+ // can make progress until the future returns `Pending`.
+ pool.spawner().spawn_local_obj(
+ futures::future::poll_fn(move |ctx| {
+ match std::pin::Pin::new(&mut worker).poll(ctx) {
+ Poll::Ready(()) => {},
+ Poll::Pending => {
+ discovery_update_tx.send(Event::Processed).now_or_never();
+ return Poll::Pending;
+ },
+ }
+ let _ = discovery_update_tx.send(Event::End).now_or_never().unwrap();
+ Poll::Ready(())
+ }).boxed_local().into(),
+ ).expect("Spawns authority discovery");
+
+ pool.run_until(
+ // The future that drives the event stream
+ async {
+ // Send an event that should generate an error
+ let _ = dht_event_tx.send(DhtEvent::ValueFound(Default::default())).now_or_never();
+ // Send the same event again to make sure that the event stream needs to be polled twice
+ // to be woken up again.
+ let _ = dht_event_tx.send(DhtEvent::ValueFound(Default::default())).now_or_never();
+
+ // Now we call `await` and give the control to the authority discovery future.
+ assert_eq!(Some(Event::Processed), discovery_update_rx.next().await);
+
+ // Drop the event rx to stop the authority discovery. If it was polled correctly, it
+ // should end properly.
+ drop(dht_event_tx);
+
+ assert!(
+ discovery_update_rx.collect::>()
+ .await
+ .into_iter()
+ .any(|evt| evt == Event::End),
+ "The authority discovery should have ended",
+ );
+ }
+ );
+}
+
+/// In the scenario of a validator publishing the address of its sentry node to
+/// the DHT, said sentry node should not add its own Multiaddr to the
+/// peerset "authority" priority group.
+#[test]
+fn never_add_own_address_to_priority_group() {
+ let validator_key_store = KeyStore::new();
+ let validator_public = validator_key_store
+ .write()
+ .sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None)
+ .unwrap();
+
+ let sentry_network: Arc = Arc::new(Default::default());
+
+ let sentry_multiaddr = {
+ let peer_id = sentry_network.local_peer_id();
+ let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:2/tcp/30333".parse().unwrap();
+
+ address.with(multiaddr::Protocol::P2p(peer_id.into()))
+ };
+
+ // Address of some other sentry node of `validator`.
+ let random_multiaddr = {
+ let peer_id = PeerId::random();
+ let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap();
+
+ address.with(multiaddr::Protocol::P2p(
+ peer_id.into(),
+ ))
+ };
+
+ let dht_event = {
+ let addresses = vec![
+ sentry_multiaddr.to_vec(),
+ random_multiaddr.to_vec(),
+ ];
+
+ let mut serialized_addresses = vec![];
+ schema::AuthorityAddresses { addresses }
+ .encode(&mut serialized_addresses)
+ .map_err(Error::EncodingProto)
+ .unwrap();
+
+ let signature = validator_key_store.read()
+ .sign_with(
+ key_types::AUTHORITY_DISCOVERY,
+ &validator_public.clone().into(),
+ serialized_addresses.as_slice(),
+ )
+ .map_err(|_| Error::Signing)
+ .unwrap();
+
+ let mut signed_addresses = vec![];
+ schema::SignedAuthorityAddresses {
+ addresses: serialized_addresses.clone(),
+ signature,
+ }
+ .encode(&mut signed_addresses)
+ .map_err(Error::EncodingProto)
+ .unwrap();
+
+ let key = hash_authority_id(&validator_public.to_raw_vec());
+ let value = signed_addresses;
+ (key, value)
+ };
+
+ let (_dht_event_tx, dht_event_rx) = channel(1);
+ let sentry_test_api = Arc::new(TestApi {
+ // Make sure the sentry node identifies its validator as an authority.
+ authorities: vec![validator_public.into()],
+ });
+
+ let (_to_worker, from_service) = mpsc::channel(0);
+ let mut sentry_worker = Worker::new(
+ from_service,
+ sentry_test_api,
+ sentry_network.clone(),
+ vec![],
+ dht_event_rx.boxed(),
+ Role::Sentry,
+ None,
+ );
+
+ sentry_worker.handle_dht_value_found_event(vec![dht_event]).unwrap();
+ sentry_worker.set_priority_group().unwrap();
+
+ assert_eq!(
+ sentry_network.set_priority_group_call.lock().unwrap().len(), 1,
+ "Expect authority discovery to set the priority set.",
+ );
+
+ assert_eq!(
+ sentry_network.set_priority_group_call.lock().unwrap()[0],
+ (
+ "authorities".to_string(),
+ HashSet::from_iter(vec![random_multiaddr.clone()].into_iter(),)
+ ),
+ "Expect authority discovery to only add `random_multiaddr`."
+ );
+}
+
+#[test]
+fn do_not_cache_addresses_without_peer_id() {
+ let remote_key_store = KeyStore::new();
+ let remote_public = remote_key_store
+ .write()
+ .sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None)
+ .unwrap();
+
+ let multiaddr_with_peer_id = {
+ let peer_id = PeerId::random();
+ let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:2/tcp/30333".parse().unwrap();
+
+ address.with(multiaddr::Protocol::P2p(peer_id.into()))
+ };
+
+ let multiaddr_without_peer_id: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap();
+
+ let dht_event = {
+ let addresses = vec![
+ multiaddr_with_peer_id.to_vec(),
+ multiaddr_without_peer_id.to_vec(),
+ ];
+
+ let mut serialized_addresses = vec![];
+ schema::AuthorityAddresses { addresses }
+ .encode(&mut serialized_addresses)
+ .map_err(Error::EncodingProto)
+ .unwrap();
+
+ let signature = remote_key_store.read()
+ .sign_with(
+ key_types::AUTHORITY_DISCOVERY,
+ &remote_public.clone().into(),
+ serialized_addresses.as_slice(),
+ )
+ .map_err(|_| Error::Signing)
+ .unwrap();
+
+ let mut signed_addresses = vec![];
+ schema::SignedAuthorityAddresses {
+ addresses: serialized_addresses.clone(),
+ signature,
+ }
+ .encode(&mut signed_addresses)
+ .map_err(Error::EncodingProto)
+ .unwrap();
+
+ let key = hash_authority_id(&remote_public.to_raw_vec());
+ let value = signed_addresses;
+ (key, value)
+ };
+
+ let (_dht_event_tx, dht_event_rx) = channel(1);
+ let local_test_api = Arc::new(TestApi {
+ // Make sure the sentry node identifies its validator as an authority.
+ authorities: vec![remote_public.into()],
+ });
+ let local_network: Arc = Arc::new(Default::default());
+ let local_key_store = KeyStore::new();
+
+ let (_to_worker, from_service) = mpsc::channel(0);
+ let mut local_worker = Worker::new(
+ from_service,
+ local_test_api,
+ local_network.clone(),
+ vec![],
+ dht_event_rx.boxed(),
+ Role::Authority(local_key_store),
+ None,
+ );
+
+ local_worker.handle_dht_value_found_event(vec![dht_event]).unwrap();
+
+ assert_eq!(
+ Some(&vec![multiaddr_with_peer_id]),
+ local_worker.addr_cache.get_addresses_by_authority_id(&remote_public.into()),
+ "Expect worker to only cache `Multiaddr`s with `PeerId`s.",
+ );
+}