From 719ee762ab7c2f67540bc11f5b141233bcaae370 Mon Sep 17 00:00:00 2001 From: Domenico De Guglielmo <domenico.deguglielmo@domo-iot.com> Date: Thu, 23 Nov 2023 11:51:39 +0100 Subject: [PATCH] ping and removal of connections --- dht-cache/Cargo.toml | 2 +- dht-cache/src/domocache.rs | 31 +++++++++++-------------------- dht-cache/src/domolibp2p.rs | 25 +++++++++++++++++-------- 3 files changed, 29 insertions(+), 29 deletions(-) diff --git a/dht-cache/Cargo.toml b/dht-cache/Cargo.toml index ab9069c..ece9744 100644 --- a/dht-cache/Cargo.toml +++ b/dht-cache/Cargo.toml @@ -13,7 +13,7 @@ async-trait = "0.1.68" futures = "0.3.21" futures-util = "0.3.29" jsonpath_lib = "0.3.0" -libp2p = { version = "0.53.0", features = ["tokio", "mdns", "gossipsub", "noise", "yamux", "pnet", "rsa", "tcp", "macros"] } +libp2p = { version="0.53.1", features = ["tokio", "mdns", "gossipsub", "noise", "ping", "yamux", "pnet", "rsa", "tcp", "macros"] } log = "0.4.17" rand = "0.8" sea-query = "0.28.3" diff --git a/dht-cache/src/domocache.rs b/dht-cache/src/domocache.rs index b963939..9a719b6 100644 --- a/dht-cache/src/domocache.rs +++ b/dht-cache/src/domocache.rs @@ -3,7 +3,7 @@ use crate::utils; use futures::prelude::*; use libp2p::gossipsub::IdentTopic as Topic; use libp2p::identity::Keypair; -use libp2p::{mdns, PeerId}; +use libp2p::{mdns, ping}; use libp2p::swarm::{SwarmEvent}; use rsa::pkcs8::EncodePrivateKey; use rsa::RsaPrivateKey; @@ -20,7 +20,6 @@ use time::OffsetDateTime; use tokio::sync::mpsc; use tokio::sync::mpsc::{Receiver, Sender}; use crate::utils::get_epoch_ms; -use std::str::FromStr; use libp2p::swarm::dial_opts::DialOpts; fn generate_rsa_key() -> (Vec<u8>, Vec<u8>) { @@ -384,19 +383,6 @@ impl DomoCache { // } } - pub fn remove_connections_of_peers(&mut self) { - for (peer_id, last_mdns_rec_timestamp) in self.mdns_peers_cache.iter() { - - if last_mdns_rec_timestamp.to_owned() < (utils::get_epoch_ms() - 1000 * 10 as u128) { - if let Ok(peer_id) = PeerId::from_str(peer_id) { - if let Ok(_res) = self.swarm.disconnect_peer_id(peer_id) { - println!("DISCONNECTING LOCAL CONNECTIONS TO {peer_id}"); - } - } - } - } - - } pub fn print_peers_cache(&self) { for (peer_id, peer_data) in self.peers_caches_state.iter() { @@ -467,10 +453,16 @@ impl DomoCache { } } } - SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Mdns( - mdns::Event::Expired(_list), + SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Ping( + ping::Event { peer, connection, result} )) => { - println!("MDNS TTL Expired"); + + if let Ok(_res) = result { + println!("PING OK {} {}", peer.to_string(), connection); + } else { + println!("PING FAILED {} {}, CLOSE CONNECTION", peer.to_string(), connection); + self.swarm.close_connection(connection); + } } SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Mdns( mdns::Event::Discovered(list), @@ -487,7 +479,7 @@ impl DomoCache { let dial_opts = DialOpts::from(peer_id); let _res = self.swarm.dial(dial_opts); - println!("INSERT INTO MDNS CACHE {} {} ", peer_id.to_string(), get_epoch_ms()); + println!("INSERT INTO MDNS CACHE {} {} ", peer_id.to_string(), get_epoch_ms()/1000); self.mdns_peers_cache.insert(peer_id.to_string(), get_epoch_ms()); println!("{:?}", self.mdns_peers_cache); @@ -517,7 +509,6 @@ impl DomoCache { self.send_cache_state_timer = tokio::time::Instant::now() + Duration::from_secs(u64::from(SEND_CACHE_HASH_PERIOD)); self.send_cache_state().await; - self.remove_connections_of_peers(); } PersistentData(data) => { return self.handle_persistent_message_data(&data).await; diff --git a/dht-cache/src/domolibp2p.rs b/dht-cache/src/domolibp2p.rs index 9ed49d2..8d783ab 100644 --- a/dht-cache/src/domolibp2p.rs +++ b/dht-cache/src/domolibp2p.rs @@ -15,7 +15,7 @@ use libp2p::yamux; //use libp2p::tcp::TcpConfig; use libp2p::Transport; -use libp2p::{identity, mdns, swarm::NetworkBehaviour, PeerId, Swarm}; +use libp2p::{identity, mdns, swarm::NetworkBehaviour, PeerId, Swarm, ping}; use std::error::Error; use std::io; @@ -70,9 +70,6 @@ pub async fn start( listen_addr: String ) -> Result<Swarm<DomoBehaviour>, Box<dyn Error>> { let local_peer_id = PeerId::from(local_key_pair.public()); - - - let arr = parse_hex_key(&shared_key)?; let psk = PreSharedKey::new(arr); @@ -93,8 +90,8 @@ pub async fn start( .with_behaviour(|key| { let mdnsconf = mdns::Config { - ttl: Duration::from_secs(10), - query_interval: Duration::from_secs(5), + ttl: Duration::from_secs(30), + query_interval: Duration::from_secs(10), enable_ipv6: false }; @@ -121,7 +118,11 @@ pub async fn start( gossipsub_config, )?; - let behaviour = DomoBehaviour { mdns, gossipsub }; + let ping_config = ping::Config::new().with_interval(Duration::from_secs(5)).with_timeout(Duration::from_secs(1)); + + let ping = ping::Behaviour::new(ping_config); + + let behaviour = DomoBehaviour { mdns, gossipsub , ping }; Ok(behaviour) @@ -150,12 +151,13 @@ pub async fn start( Ok(swarm) } -// We create a custom network behaviour that combines mDNS and gossipsub. +// We create a custom network behaviour that combines mDNS and gossipsub and ping. #[derive(NetworkBehaviour)] #[behaviour(to_swarm = "OutEvent")] pub struct DomoBehaviour { pub mdns: mdns::tokio::Behaviour, pub gossipsub: gossipsub::Behaviour, + pub ping: ping::Behaviour } #[allow(clippy::large_enum_variant)] @@ -163,6 +165,7 @@ pub struct DomoBehaviour { pub enum OutEvent { Gossipsub(gossipsub::Event), Mdns(mdns::Event), + Ping(ping::Event) } impl From<mdns::Event> for OutEvent { @@ -176,3 +179,9 @@ impl From<gossipsub::Event> for OutEvent { Self::Gossipsub(v) } } + +impl From<ping::Event> for OutEvent { + fn from(v: ping::Event) -> Self { + Self::Ping(v) + } +}