Skip to content

Commit

Permalink
add peer on connect
Browse files Browse the repository at this point in the history
  • Loading branch information
ddeguglielmo committed Nov 16, 2023
1 parent e00fdb2 commit 5b9370f
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 17 deletions.
26 changes: 15 additions & 11 deletions dht-cache/src/domocache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub enum DomoEvent {
}

// period at which we send messages containing our cache hash
const SEND_CACHE_HASH_PERIOD: u8 = 10;
pub const SEND_CACHE_HASH_PERIOD: u8 = 10;

#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct DomoCacheElement {
Expand Down Expand Up @@ -186,7 +186,7 @@ impl DomoCache {
Ok(DomoEvent::PersistentData(m))
}
_ => {
log::info!("Old message received");
//log::info!("Old message received");
Ok(DomoEvent::None)
}
}
Expand Down Expand Up @@ -308,15 +308,14 @@ impl DomoCache {

async fn handle_config_data(&mut self, message: &str) {
let m: DomoCacheStateMessage = serde_json::from_str(message).unwrap();
log::info!(
println!(
"Received cache message from {}, check caches ...",
m.peer_id
);

let mut need_check = false;

if let Some(old) = self.peers_caches_state.get(&m.peer_id) {

if old.publication_timestamp < (get_epoch_ms() - 2 * 1000 * u128::from(SEND_CACHE_HASH_PERIOD) ) {
need_check = true;
}
Expand All @@ -332,7 +331,7 @@ impl DomoCache {
}

async fn check_caches_desynchronization(&mut self) {
log::info!("CHECK CACHE DESYNC SINCE NEW NODE JOINED THE NET");
println!("CHECK CACHE DESYNC SINCE NEW NODE JOINED THE NET");
let local_hash = self.get_cache_hash();
let (sync, leader) = self.is_synchronized(local_hash, &self.peers_caches_state);
if !sync {
Expand Down Expand Up @@ -410,7 +409,11 @@ impl DomoCache {
log::debug!("Address {address:?} expired");
}
SwarmEvent::ConnectionEstablished { peer_id, connection_id, endpoint, .. } => {
log::info!("Connection established {peer_id:?}, {connection_id:?}, {endpoint:?}");
println!("Connection established {peer_id:?}, {connection_id:?}, {endpoint:?}");
self.swarm
.behaviour_mut()
.gossipsub
.add_explicit_peer(&peer_id);
}
SwarmEvent::ConnectionClosed { peer_id, connection_id, endpoint, num_established: _, cause } => {
log::info!("Connection closed {peer_id:?}, {connection_id:?}, {endpoint:?} -> {cause:?}");
Expand All @@ -425,7 +428,7 @@ impl DomoCache {
log::info!("Listener Closed");
}
SwarmEvent::NewListenAddr { address, .. } => {
log::info!("Listening in {address:?}");
println!("Listening in {address:?}");
}
SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Gossipsub(
libp2p::gossipsub::Event::Message {
Expand Down Expand Up @@ -465,17 +468,18 @@ impl DomoCache {
)) => {
let local = OffsetDateTime::now_utc();
for (peer, multiaddr) in list {
println!("Discovered peer {peer} {multiaddr}");
log::info!("{}", multiaddr);
let is_local_peer = utils::is_local_peer(&multiaddr.to_string());
if self.loopback_peers_only && !is_local_peer {
log::info!("Skipping peer since it is not local");
continue;
}
self.swarm
.behaviour_mut()
.gossipsub
.add_explicit_peer(&peer);
log::info!("Discovered peer {peer} {local:?}");
.behaviour_mut()
.gossipsub
.add_explicit_peer(&peer);

}

}
Expand Down
2 changes: 1 addition & 1 deletion dht-cache/src/domolibp2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ pub async fn start(
Ok(behaviour)

})?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(2 * crate::domocache::SEND_CACHE_HASH_PERIOD as u64)))
.build();


Expand Down
2 changes: 1 addition & 1 deletion src/domobroker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ impl DomoBroker {
deleted: m.deleted,
},
);
println!("SENT DATA ON WS {}", get_epoch_ms());
//println!("SENT DATA ON WS {}", get_epoch_ms());
DomoEvent::PersistentData(m2)
}
Ok(DomoEvent::VolatileData(m)) => {
Expand Down
8 changes: 4 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@ async fn main() -> Result<(), Box<dyn Error>> {
}

fn report_event(m: &DomoEvent) {
println!("Domo Event received");
//println!("Domo Event received");
match m {
DomoEvent::None => {
println!("None {:?}", m);
//println!("None {:?}", m);
}
DomoEvent::VolatileData(_v) => {
println!("Volatile");
//println!("Volatile");
}
DomoEvent::PersistentData(_v) => {
println!("Persistent");
//println!("Persistent");
}
}
}
Expand Down

0 comments on commit 5b9370f

Please sign in to comment.