Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mdns): emit ToSwarm::NewExternalAddrOfPeer on discovery #5753

Merged
merged 11 commits into from
Dec 23, 2024
41 changes: 28 additions & 13 deletions protocols/mdns/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ mod iface;
mod socket;
mod timer;

use futures::{channel::mpsc, Stream, StreamExt};
use if_watch::IfEvent;
use libp2p_core::{transport::PortUse, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::{
behaviour::FromSwarm, dummy, ConnectionDenied, ConnectionId, ListenAddresses, NetworkBehaviour,
THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use smallvec::SmallVec;
hopinheimer marked this conversation as resolved.
Show resolved Hide resolved
use std::collections::VecDeque;
use std::{
cmp,
collections::hash_map::{Entry, HashMap},
Expand All @@ -34,17 +44,7 @@ use std::{
task::{Context, Poll},
time::Instant,
};

use futures::{channel::mpsc, Stream, StreamExt};
use if_watch::IfEvent;
use libp2p_core::{transport::PortUse, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::{
behaviour::FromSwarm, dummy, ConnectionDenied, ConnectionId, ListenAddresses, NetworkBehaviour,
THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use smallvec::SmallVec;

use std::convert::Infallible;
use self::iface::InterfaceState;
use crate::{
behaviour::{socket::AsyncSocket, timer::Builder},
Expand Down Expand Up @@ -188,6 +188,9 @@ where
listen_addresses: Arc<RwLock<ListenAddresses>>,

local_peer_id: PeerId,

/// Pending behaviour events to be emitted.
pending_events: VecDeque<ToSwarm<Event, Infallible>>,
}

impl<P> Behaviour<P>
Expand All @@ -208,6 +211,7 @@ where
closest_expiration: Default::default(),
listen_addresses: Default::default(),
local_peer_id,
pending_events: Default::default(),
})
}

Expand Down Expand Up @@ -304,6 +308,11 @@ where
&mut self,
cx: &mut Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
// Checking for pending events and emit them
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(event);
}
hopinheimer marked this conversation as resolved.
Show resolved Hide resolved

// Poll ifwatch.
while let Poll::Ready(Some(event)) = Pin::new(&mut self.if_watch).poll_next(cx) {
match event {
Expand Down Expand Up @@ -359,13 +368,19 @@ where
} else {
tracing::info!(%peer, address=%addr, "discovered peer on address");
self.discovered_nodes.push((peer, addr.clone(), expiration));
discovered.push((peer, addr));
discovered.push((peer, addr.clone()));

self.pending_events
.push_back(ToSwarm::NewExternalAddrOfPeer {
peer_id: peer,
address: addr,
});
}
}

if !discovered.is_empty() {
let event = Event::Discovered(discovered);
return Poll::Ready(ToSwarm::GenerateEvent(event));
self.pending_events.push_back(ToSwarm::GenerateEvent(event));
hopinheimer marked this conversation as resolved.
Show resolved Hide resolved
}
// Emit expired event.
let now = Instant::now();
Expand Down
Loading