Update persistent networking parameters manager. #1762

Merged
merged 7 commits into from
Aug 8, 2023
@@ -1,6 +1,7 @@
use crate::DsnArgs;
use futures::StreamExt;
use parking_lot::Mutex;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Weak};
@@ -14,6 +15,7 @@ use subspace_networking::libp2p::identity::Keypair;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::utils::strip_peer_id;
use subspace_networking::{
create, Config, NetworkingParametersManager, Node, NodeRunner, PeerInfo, PeerInfoProvider,
PieceByHashRequest, PieceByHashRequestHandler, PieceByHashResponse,
@@ -53,7 +55,14 @@ pub(super) fn configure_dsn(
let networking_parameters_registry = {
let known_addresses_db_path = base_path.join("known_addresses_db");

NetworkingParametersManager::new(&known_addresses_db_path).map(|manager| manager.boxed())?
NetworkingParametersManager::new(
&known_addresses_db_path,
strip_peer_id(bootstrap_nodes.clone())
.into_iter()
.map(|(peer_id, _)| peer_id)
.collect::<HashSet<_>>(),
)
.map(|manager| manager.boxed())?
};

// TODO: Consider introducing and using global in-memory segment header cache (this comment is
2 changes: 1 addition & 1 deletion crates/subspace-networking/examples/get-peers-complex.rs
@@ -77,7 +77,7 @@ async fn main() {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
networking_parameters_registry: Some(
NetworkingParametersManager::new(db_path.as_ref())
NetworkingParametersManager::new(db_path.as_ref(), Default::default())
.unwrap()
.boxed(),
),
149 changes: 126 additions & 23 deletions crates/subspace-networking/src/behavior/persistent_parameters.rs
@@ -1,14 +1,15 @@
use crate::utils::{CollectionBatcher, PeerAddress};
use crate::utils::{CollectionBatcher, Handler, HandlerFn, PeerAddress};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use event_listener_primitives::HandlerId;
use futures::future::Fuse;
use futures::FutureExt;
use libp2p::multiaddr::Protocol;
use libp2p::{Multiaddr, PeerId};
use lru::LruCache;
use parity_db::{Db, Options};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::num::NonZeroUsize;
use std::ops::Add;
@@ -23,19 +24,32 @@ use tracing::{debug, trace};
/// Parity DB error type alias.
pub type ParityDbError = parity_db::Error;

// Defines optional time for address dial failure
/// Defines optional time for address dial failure
type FailureTime = Option<DateTime<Utc>>;

// Size of the LRU cache for peers.
/// Size of the LRU cache for peers.
const PEER_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(100).expect("Not zero; qed");
// Size of the LRU cache for addresses.
/// Size of the LRU cache for addresses.
const ADDRESSES_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(30).expect("Not zero; qed");
// Pause duration between network parameters save.
/// Pause duration between network parameters save.
const DATA_FLUSH_DURATION_SECS: u64 = 5;
// Defines a batch size for a combined collection for known peers addresses and bootstrap addresses.
/// Defines a batch size for a combined collection for known peers addresses and bootstrap addresses.
pub(crate) const PEERS_ADDRESSES_BATCH_SIZE: usize = 30;
// Defines an expiration period for the peer marked for the removal.
/// Defines an expiration period for a peer marked for removal from persistent storage.
const REMOVE_KNOWN_PEERS_GRACE_PERIOD_SECS: i64 = 86400; // 1 DAY
/// Defines an expiration period for a peer marked for removal from the Kademlia DHT.
const REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA_SECS: i64 = 3600; // 1 HOUR

/// Defines the event triggered when the peer address is removed from the permanent storage.
#[derive(Debug, Clone)]
pub struct PeerAddressRemovedEvent {
/// Peer ID
pub peer_id: PeerId,
/// Peer address
pub address: Multiaddr,
/// No address left in the permanent storage.
pub last_address: bool,
}

/// Defines operations with the networking parameters.
#[async_trait]
@@ -62,6 +76,15 @@ pub trait NetworkingParametersRegistry: Send + Sync {

/// Enables Clone implementation for `Box<dyn NetworkingParametersRegistry>`
fn clone_box(&self) -> Box<dyn NetworkingParametersRegistry>;

/// Triggers when we removed the peer address from the permanent storage. Returns optional
Member

I think this should be a bit more specific: we don't really remove it from persistent storage when this event is triggered. Maybe call it `on_unreachable` or something of that sort?

Member Author

Hmm, I like `on_unreachable_address`, but `NetworkingParametersRegistry` only knows about address removal, nothing about connectivity. I could add a comment on the intended usage, though.

Member

But it does have a timer, and it essentially checks how long the address has been unreachable.

Member Author

I updated the comment.

Member

I still think we need to remove it. The current name is misleading because the address is literally not removed from anywhere.

Member Author

OK, I'll change it to `on_unreachable_address`.
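The shape agreed in this thread (a registration method that returns an optional handler ID, so a stub implementation can decline) can be sketched as follows. This is a minimal, std-only illustration and not the code from this PR: `UnreachableAddressEvent`, `Registry`, `StubRegistry`, `RealRegistry`, `notify` and `demo` are hypothetical names, plain strings stand in for libp2p's `PeerId` and `Multiaddr`, and the handler list is a simplified stand-in for whatever the crate's `Handler`/`HandlerFn` utilities do.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for `PeerAddressRemovedEvent`.
#[derive(Debug, Clone, PartialEq)]
struct UnreachableAddressEvent {
    peer_id: String,
    address: String,
    last_address: bool,
}

// Hypothetical handler type and ID, not the `event_listener_primitives` API.
type HandlerFn = Arc<dyn Fn(&UnreachableAddressEvent) + Send + Sync>;
type HandlerId = usize;

trait Registry {
    // `None` signals that this implementation never emits the event.
    fn on_unreachable_address(&mut self, handler: HandlerFn) -> Option<HandlerId>;
}

struct StubRegistry;

impl Registry for StubRegistry {
    fn on_unreachable_address(&mut self, _handler: HandlerFn) -> Option<HandlerId> {
        None
    }
}

#[derive(Default)]
struct RealRegistry {
    handlers: Vec<HandlerFn>,
}

impl RealRegistry {
    // Calls every registered handler with the event.
    fn notify(&self, event: &UnreachableAddressEvent) {
        for handler in &self.handlers {
            handler(event);
        }
    }
}

impl Registry for RealRegistry {
    fn on_unreachable_address(&mut self, handler: HandlerFn) -> Option<HandlerId> {
        self.handlers.push(handler);
        Some(self.handlers.len() - 1)
    }
}

// Registers the same handler with both registries and fires one event.
fn demo() -> (Option<HandlerId>, Option<HandlerId>, Vec<UnreachableAddressEvent>) {
    let seen: Arc<Mutex<Vec<UnreachableAddressEvent>>> = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen);
    let handler: HandlerFn = Arc::new(move |event: &UnreachableAddressEvent| {
        sink.lock().unwrap().push(event.clone());
    });

    let mut stub = StubRegistry;
    let stub_id = stub.on_unreachable_address(Arc::clone(&handler));

    let mut registry = RealRegistry::default();
    let real_id = registry.on_unreachable_address(handler);
    registry.notify(&UnreachableAddressEvent {
        peer_id: "peer-a".to_string(),
        address: "/ip4/10.0.0.1/tcp/30333".to_string(),
        last_address: true,
    });

    let events = seen.lock().unwrap().clone();
    (stub_id, real_id, events)
}
```

In this sketch a caller can tell from the `None` result that the stub will never deliver events, which is the role the `Option` plays in the trait discussed above.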

/// event HandlerId. Option enables stub implementation. One of the usages is to notify
/// Kademlia about the expired (unreachable) address when it checks for how long the address was
/// unreachable.
fn on_unreachable_address(
&mut self,
handler: HandlerFn<PeerAddressRemovedEvent>,
) -> Option<HandlerId>;
}

impl Clone for Box<dyn NetworkingParametersRegistry> {
@@ -100,6 +123,13 @@ impl NetworkingParametersRegistry for StubNetworkingParametersManager {
fn clone_box(&self) -> Box<dyn NetworkingParametersRegistry> {
Box::new(self.clone())
}

fn on_unreachable_address(
&mut self,
_handler: HandlerFn<PeerAddressRemovedEvent>,
) -> Option<HandlerId> {
None
}
}

/// Networking parameters persistence errors.
@@ -130,12 +160,19 @@ pub struct NetworkingParametersManager {
object_id: &'static [u8],
// Provides batching capabilities for the address collection (it stores the last batch index)
collection_batcher: CollectionBatcher<PeerAddress>,
// Event handler triggered when we decide to remove address from the storage.
address_removed: Handler<PeerAddressRemovedEvent>,
// Peer ID list to filter on address adding.
ignore_peer_list: HashSet<PeerId>,
}

impl NetworkingParametersManager {
/// Object constructor. It accepts `NetworkingParametersProvider` implementation as a parameter.
/// On object creation it starts a job for networking parameters cache handling.
pub fn new(path: &Path) -> Result<Self, NetworkParametersPersistenceError> {
pub fn new(
path: &Path,
ignore_peer_list: HashSet<PeerId>,
) -> Result<Self, NetworkParametersPersistenceError> {
let mut options = Options::with_columns(path, 1);
// We don't use stats
options.stats = false;
@@ -170,6 +207,8 @@ impl NetworkingParametersManager {
NonZeroUsize::new(PEERS_ADDRESSES_BATCH_SIZE)
.expect("Manual non-zero initialization failed."),
),
address_removed: Default::default(),
ignore_peer_list,
})
}

@@ -222,6 +261,17 @@ fn clone_lru_cache<K: Clone + Hash + Eq, V: Clone>(
#[async_trait]
impl NetworkingParametersRegistry for NetworkingParametersManager {
async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
if self.ignore_peer_list.contains(&peer_id) {
debug!(
%peer_id,
addr_num=addresses.len(),
"Adding new peer addresses canceled (ignore list): {:?}",
addresses
);

return;
}

debug!(
%peer_id,
addr_num=addresses.len(),
@@ -259,13 +309,18 @@ impl NetworkingParametersRegistry for NetworkingParametersManager {
async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
trace!(%peer_id, "Remove peer addresses from the networking parameters registry: {:?}", addresses);

remove_known_peer_addresses_internal(
let removed_addresses = remove_known_peer_addresses_internal(
&mut self.known_peers,
peer_id,
addresses,
chrono::Duration::seconds(REMOVE_KNOWN_PEERS_GRACE_PERIOD_SECS),
chrono::Duration::seconds(REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA_SECS),
);

for event in removed_addresses {
self.address_removed.call_simple(&event);
}

self.cache_need_saving = true;
}

@@ -330,9 +385,20 @@ impl NetworkingParametersRegistry for NetworkingParametersManager {
column_id: self.column_id,
object_id: self.object_id,
collection_batcher: self.collection_batcher.clone(),
address_removed: self.address_removed.clone(),
ignore_peer_list: self.ignore_peer_list.clone(),
}
.boxed()
}

fn on_unreachable_address(
&mut self,
handler: HandlerFn<PeerAddressRemovedEvent>,
) -> Option<HandlerId> {
let handler_id = self.address_removed.add(handler);

Some(handler_id)
}
}

// Helper struct for NetworkingPersistence implementations (data transfer object).
@@ -370,48 +436,83 @@ impl NetworkingParameters {
}
}

// Removes a P2p protocol suffix from the multiaddress if any.
fn remove_p2p_suffix(address: Multiaddr) -> Multiaddr {
let mut modified_address = address.clone();
/// Removes a P2p protocol suffix from the multiaddress if any.
pub(crate) fn remove_p2p_suffix(mut address: Multiaddr) -> Multiaddr {
let last_protocol = address.pop();

if let Some(Protocol::P2p(_)) = &last_protocol {
return address;
}

if let Some(protocol) = last_protocol {
address.push(protocol)
}

address
}

/// Appends a P2p protocol suffix to the multiaddress if required.
pub(crate) fn append_p2p_suffix(peer_id: PeerId, mut address: Multiaddr) -> Multiaddr {
let last_protocol = address.pop();

if let Some(Protocol::P2p(_)) = modified_address.pop() {
modified_address
} else {
address
if let Some(protocol) = last_protocol {
if !matches!(protocol, Protocol::P2p(..)) {
address.push(protocol)
}
}
address.push(Protocol::P2p(peer_id));

address
}
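The intended behaviour of the two suffix helpers can be illustrated with a std-only model. This is a sketch under stated assumptions, not the PR's code: the `Protocol` enum and the `Address` alias below are hypothetical stand-ins for libp2p's `Protocol` and `Multiaddr`, and peer IDs are plain strings.

```rust
// Hypothetical, simplified stand-in for libp2p's `Protocol`.
#[derive(Debug, Clone, PartialEq)]
enum Protocol {
    Ip4(String),
    Tcp(u16),
    P2p(String),
}

// Hypothetical stand-in for `Multiaddr`: an ordered list of protocol components.
type Address = Vec<Protocol>;

// Drops a trailing P2p component if there is one; otherwise returns the address unchanged.
fn remove_p2p_suffix(mut address: Address) -> Address {
    if matches!(address.last(), Some(Protocol::P2p(_))) {
        address.pop();
    }
    address
}

// Ensures the address ends with exactly one P2p component carrying `peer_id`,
// replacing any P2p suffix that was already present.
fn append_p2p_suffix(peer_id: &str, address: Address) -> Address {
    let mut address = remove_p2p_suffix(address);
    address.push(Protocol::P2p(peer_id.to_string()));
    address
}
```

In this model, appending to an address that already carries a P2p suffix replaces that suffix instead of stacking a second one, which matches the pop, conditional push-back, then push sequence in the new `append_p2p_suffix` above.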

// Testable implementation of the `remove_known_peer_addresses`
pub(super) fn remove_known_peer_addresses_internal(
known_peers: &mut LruCache<PeerId, LruCache<Multiaddr, FailureTime>>,
peer_id: PeerId,
addresses: Vec<Multiaddr>,
expired_address_duration: chrono::Duration,
) {
expired_address_duration_persistent_storage: chrono::Duration,
expired_address_duration_kademlia: chrono::Duration,
) -> Vec<PeerAddressRemovedEvent> {
let mut address_removed_events = Vec::new();

addresses
.into_iter()
.map(remove_p2p_suffix)
.for_each(|addr| {
// if peer_id is present in the cache
if let Some(addresses) = known_peers.peek_mut(&peer_id) {
let last_address = addresses.contains(&addr) && addresses.len() == 1;
// Get mutable reference to first_failed_time for the address without updating
// the item's position in the cache
if let Some(first_failed_time) = addresses.peek_mut(&addr) {
// if we failed previously with this address
if let Some(time) = first_failed_time {
// if we failed first time more than a day ago
if time.add(expired_address_duration) < Utc::now() {
// if we failed first time more than an hour ago (for Kademlia)
if time.add(expired_address_duration_kademlia) < Utc::now() {
let address_removed = PeerAddressRemovedEvent{
peer_id,
address: addr.clone(),
last_address
};

address_removed_events.push(address_removed);
Member

Why not call the event handlers from here? It looks like you're collecting them into a vector just to iterate over it afterwards. Alternatively, you can turn this `for_each` into a `filter_map` and return an iterator of these events without collecting first.

Member Author

It's easier to test this way. I don't expect a lot of elements in this vector.
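For comparison, the `filter_map` alternative raised in this thread might look roughly like the sketch below. It is a simplified, std-only illustration and not the PR's implementation: `UnreachableAddress` and `unreachable_addresses` are hypothetical names, a `HashMap` of one peer's addresses replaces the nested `LruCache`, timestamps are plain seconds, and removal from persistent storage after the longer grace period is left out.

```rust
use std::collections::HashMap;

// Hypothetical, trimmed-down event: the peer ID is omitted for brevity.
#[derive(Debug, PartialEq)]
struct UnreachableAddress {
    address: String,
    last_address: bool,
}

// Lazily yields an event for every failed address whose first failure is older
// than `grace_secs`. `known` maps an address to the time of its first failure.
fn unreachable_addresses<'a>(
    known: &'a mut HashMap<String, Option<u64>>,
    failed: Vec<String>,
    now_secs: u64,
    grace_secs: u64,
) -> impl Iterator<Item = UnreachableAddress> + 'a {
    failed.into_iter().filter_map(move |address| {
        let last_address = known.len() == 1;
        // Unknown addresses are skipped.
        let first_failed = known.get_mut(&address)?;

        match *first_failed {
            // Failing for longer than the grace period: report the address.
            Some(since) if now_secs.saturating_sub(since) > grace_secs => {
                Some(UnreachableAddress {
                    address,
                    last_address,
                })
            }
            // Already failing, but still within the grace period.
            Some(_) => None,
            // First failure: remember when it happened.
            None => {
                *first_failed = Some(now_secs);
                None
            }
        }
    })
}
```

One trade-off of this form is that the iterator is lazy, so the first-failure timestamps are only recorded once the caller consumes it. Collecting into a `Vec`, as the PR does, applies all state changes before the function returns and keeps the result easy to assert on in tests.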


trace!(%peer_id, "Address was marked for removal from Kademlia: {:?}", addr);
}

// if we failed first time more than a day ago (for persistent cache)
if time.add(expired_address_duration_persistent_storage) < Utc::now() {
// Remove a failed address
addresses.pop(&addr);

// If the last address for peer
if addresses.is_empty() {
if last_address {
known_peers.pop(&peer_id);

trace!(%peer_id, "Peer removed from the cache");
}

trace!(%peer_id, "Address removed from the cache: {:?}", addr);
trace!(%peer_id, "Address removed from the persistent cache: {:?}", addr);
} else {
trace!(%peer_id, "Saving failed connection attempt to a peer: {:?}", addr);
}
@@ -424,4 +525,6 @@ pub(super) fn remove_known_peer_addresses_internal(
}
}
});

address_removed_events
}