Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update persistent networking parameters manager. #1762

Merged
merged 7 commits into from
Aug 8, 2023
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::DsnArgs;
use futures::StreamExt;
use parking_lot::Mutex;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
Expand All @@ -14,6 +15,7 @@ use subspace_farmer::{NodeClient, NodeRpcClient};
use subspace_networking::libp2p::identity::Keypair;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_networking::utils::convert_multiaddresses;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::{
create, peer_id, Config, NetworkingParametersManager, Node, NodeRunner,
Expand Down Expand Up @@ -58,7 +60,14 @@ pub(super) fn configure_dsn(
let networking_parameters_registry = {
let known_addresses_db_path = base_path.join("known_addresses_db");

NetworkingParametersManager::new(&known_addresses_db_path).map(|manager| manager.boxed())?
NetworkingParametersManager::new(
&known_addresses_db_path,
convert_multiaddresses(bootstrap_nodes.clone())
.into_iter()
.map(|(peer_id, _)| peer_id)
.collect::<HashSet<_>>(),
)
.map(|manager| manager.boxed())?
};

let weak_readers_and_pieces = Arc::downgrade(readers_and_pieces);
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-networking/examples/get-peers-complex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async fn main() {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
networking_parameters_registry: Some(
NetworkingParametersManager::new(db_path.as_ref())
NetworkingParametersManager::new(db_path.as_ref(), Default::default())
.unwrap()
.boxed(),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use libp2p::{Multiaddr, PeerId};
use lru::LruCache;
use parity_db::{Db, Options};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::num::NonZeroUsize;
use std::ops::Add;
Expand Down Expand Up @@ -160,12 +160,17 @@ pub struct NetworkingParametersManager {
collection_batcher: CollectionBatcher<PeerAddress>,
// Event handler triggered when we decide to remove address from the storage.
address_removed: Handler<PeerAddressRemovedEvent>,
// Peer ID list to filter on address adding.
ignore_peer_list: HashSet<PeerId>,
}

impl NetworkingParametersManager {
/// Object constructor. It accepts `NetworkingParametersProvider` implementation as a parameter.
/// On object creation it starts a job for networking parameters cache handling.
pub fn new(path: &Path) -> Result<Self, NetworkParametersPersistenceError> {
pub fn new(
path: &Path,
ignore_peer_list: HashSet<PeerId>,
) -> Result<Self, NetworkParametersPersistenceError> {
let mut options = Options::with_columns(path, 1);
// We don't use stats
options.stats = false;
Expand Down Expand Up @@ -201,6 +206,7 @@ impl NetworkingParametersManager {
.expect("Manual non-zero initialization failed."),
),
address_removed: Default::default(),
ignore_peer_list,
})
}

Expand Down Expand Up @@ -253,6 +259,17 @@ fn clone_lru_cache<K: Clone + Hash + Eq, V: Clone>(
#[async_trait]
impl NetworkingParametersRegistry for NetworkingParametersManager {
async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
if self.ignore_peer_list.contains(&peer_id) {
debug!(
%peer_id,
addr_num=addresses.len(),
"Adding new peer addresses canceled (ignore list): {:?}",
addresses
);

return;
}

debug!(
%peer_id,
addr_num=addresses.len(),
Expand Down Expand Up @@ -367,6 +384,7 @@ impl NetworkingParametersRegistry for NetworkingParametersManager {
object_id: self.object_id,
collection_batcher: self.collection_batcher.clone(),
address_removed: self.address_removed.clone(),
ignore_peer_list: self.ignore_peer_list.clone(),
}
.boxed()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ use either::Either;
use libp2p::identity::ed25519::Keypair;
use libp2p::{identity, Multiaddr, PeerId};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;
use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_networking::utils::convert_multiaddresses;
use subspace_networking::{
peer_id, Config, NetworkingParametersManager, ParityDbProviderStorage, VoidProviderStorage,
};
Expand Down Expand Up @@ -162,8 +164,14 @@ async fn main() -> anyhow::Result<()> {
.map(|path| {
let known_addresses_db = path.join("known_addresses_db");

NetworkingParametersManager::new(&known_addresses_db)
.map(|manager| manager.boxed())
NetworkingParametersManager::new(
&known_addresses_db,
convert_multiaddresses(bootstrap_nodes.clone())
.into_iter()
.map(|(peer_id, _)| peer_id)
.collect::<HashSet<_>>(),
)
.map(|manager| manager.boxed())
})
.transpose()
.map_err(|err| anyhow!(err))?
Expand Down
6 changes: 3 additions & 3 deletions crates/subspace-networking/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ impl<T: Clone> CollectionBatcher<T> {
// Convenience alias for peer ID and its multiaddresses.
pub(crate) type PeerAddress = (PeerId, Multiaddr);

// Helper function. Converts multiaddresses to a tuple with peer ID removing the peer Id suffix.
// It logs incorrect multiaddresses.
pub(crate) fn convert_multiaddresses(addresses: Vec<Multiaddr>) -> Vec<PeerAddress> {
/// Helper function. Converts multiaddresses to a tuple with peer ID removing the peer Id suffix.
/// It logs incorrect multiaddresses.
pub fn convert_multiaddresses(addresses: Vec<Multiaddr>) -> Vec<PeerAddress> {
shamil-gadelshin marked this conversation as resolved.
Show resolved Hide resolved
addresses
.into_iter()
.filter_map(|multiaddr| {
Expand Down
11 changes: 10 additions & 1 deletion crates/subspace-service/src/dsn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ pub mod import_blocks;
use crate::piece_cache::PieceCache;
use sc_client_api::AuxStore;
use sc_consensus_subspace::SegmentHeadersStore;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use subspace_core_primitives::{SegmentHeader, SegmentIndex};
use subspace_networking::libp2p::{identity, Multiaddr};
use subspace_networking::utils::convert_multiaddresses;
use subspace_networking::{
CreationError, NetworkParametersPersistenceError, NetworkingParametersManager, Node,
NodeRunner, ParityDbError, PeerInfoProvider, PieceByHashRequestHandler, PieceByHashResponse,
Expand Down Expand Up @@ -89,7 +91,14 @@ where
.map(|path| {
let db_path = path.join("known_addresses_db");

NetworkingParametersManager::new(&db_path).map(|manager| manager.boxed())
NetworkingParametersManager::new(
&db_path,
convert_multiaddresses(dsn_config.bootstrap_nodes.clone())
.into_iter()
.map(|(peer_id, _)| peer_id)
.collect::<HashSet<_>>(),
)
.map(|manager| manager.boxed())
})
.transpose()?
};
Expand Down