Skip to content

Commit

Permalink
disconnect peers
Browse files Browse the repository at this point in the history
  • Loading branch information
ddeguglielmo committed Nov 22, 2023
1 parent bf22da1 commit 97982b5
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 24 deletions.
49 changes: 27 additions & 22 deletions dht-cache/src/domocache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::utils;
use futures::prelude::*;
use libp2p::gossipsub::IdentTopic as Topic;
use libp2p::identity::Keypair;
use libp2p::mdns;
use libp2p::swarm::SwarmEvent;
use libp2p::{mdns, PeerId};
use libp2p::swarm::{dial_opts, SwarmEvent};
use rsa::pkcs8::EncodePrivateKey;
use rsa::RsaPrivateKey;
use serde::{Deserialize, Serialize};
Expand All @@ -21,6 +21,9 @@ use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};
use crate::utils::get_epoch_ms;
use std::str::FromStr;
use libp2p::swarm::dial_opts::DialOpts;
use crate::domocache::Event::{Continue, RefreshTime};

fn generate_rsa_key() -> (Vec<u8>, Vec<u8>) {
let mut rng = rand::thread_rng();
let bits = 2048;
Expand Down Expand Up @@ -381,6 +384,17 @@ impl DomoCache {
// }
}

pub fn remove_connections_of_peers(&mut self) {
for (peer_id, peer_data) in self.peers_caches_state.iter() {
if peer_data.publication_timestamp < (utils::get_epoch_ms() - 2 * 1000 * u128::from(SEND_CACHE_HASH_PERIOD)){
println!("DISCONNECTING {peer_id}");
if let Ok(peer_id) = PeerId::from_str(peer_id) {
let res = self.swarm.disconnect_peer_id(peer_id);
}
}
}
}

pub fn print_peers_cache(&self) {
for (peer_id, peer_data) in self.peers_caches_state.iter() {
println!(
Expand Down Expand Up @@ -410,10 +424,6 @@ impl DomoCache {
}
SwarmEvent::ConnectionEstablished { peer_id, connection_id, endpoint, .. } => {
println!("Connection established {peer_id:?}, {connection_id:?}, {endpoint:?}");
// self.swarm
// .behaviour_mut()
// .gossipsub
// .add_explicit_peer(&peer_id);
}
SwarmEvent::ConnectionClosed { peer_id, connection_id, endpoint, num_established: _, cause } => {
println!("Connection closed {peer_id:?}, {connection_id:?}, {endpoint:?} -> {cause:?}");
Expand All @@ -429,15 +439,6 @@ impl DomoCache {
}
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening in {address:?}");
/* if address.to_string().contains("10.44.89") {
if let Ok(wg_server_addr) = libp2p::Multiaddr::from_str("/ip4/10.44.89.1/tcp/4489") {
if let Ok(ret) = self.swarm.dial(wg_server_addr) {
println!("CONNECTED TO WG_SERVER");
} else {
println!("CONNECTION TO WG_SERVER FAILED");
}
}
}*/
}
SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Gossipsub(
libp2p::gossipsub::Event::Message {
Expand Down Expand Up @@ -470,26 +471,29 @@ impl DomoCache {

for (peer, addr) in list {
println!("MDNS for peer {peer} expired {local:?} {}", addr);
//self.swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer);
}
}
SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Mdns(
mdns::Event::Discovered(list),
)) => {
let local = OffsetDateTime::now_utc();
for (peer, multiaddr) in list {
println!("Discovered peer {peer} {multiaddr}");
for (peer_id, multiaddr) in list {
println!("Discovered peer {peer_id} {multiaddr}");
log::info!("{}", multiaddr);
let is_local_peer = utils::is_local_peer(&multiaddr.to_string());
if self.loopback_peers_only && !is_local_peer {
log::info!("Skipping peer since it is not local");
continue;
}

self.swarm
.behaviour_mut()
.gossipsub
.add_explicit_peer(&peer);
let dial_opts = DialOpts::from(peer_id);
self.swarm.dial(dial_opts);


// self.swarm
// .behaviour_mut()
// .gossipsub
// .add_explicit_peer(&peer);

}

Expand All @@ -511,6 +515,7 @@ impl DomoCache {
self.send_cache_state_timer = tokio::time::Instant::now()
+ Duration::from_secs(u64::from(SEND_CACHE_HASH_PERIOD));
self.send_cache_state().await;
self.remove_connections_of_peers();
}
PersistentData(data) => {
return self.handle_persistent_message_data(&data).await;
Expand Down
4 changes: 2 additions & 2 deletions dht-cache/src/domolibp2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub async fn start(
.with_behaviour(|key| {

let mdnsconf = mdns::Config {
ttl: Duration::from_secs(10),
ttl: Duration::from_secs(100),
query_interval: Duration::from_secs(5),
enable_ipv6: false
};
Expand Down Expand Up @@ -131,7 +131,7 @@ pub async fn start(
Ok(behaviour)

})?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(1)))
.build();


Expand Down

0 comments on commit 97982b5

Please sign in to comment.