Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: Add FromFn ConnectionHandler #2852

Closed
wants to merge 37 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
2c580d9
Add `ReadyUpgrade`
thomaseizinger Aug 24, 2022
b2d1c05
Add basic `FromFn` `ConnectionHandler`
thomaseizinger Aug 24, 2022
230fabb
Add `TState` abstraction
thomaseizinger Aug 28, 2022
58fe45b
Publicly expose types
thomaseizinger Aug 28, 2022
ff1db42
Fix docs
thomaseizinger Aug 28, 2022
1b175d2
Implement limit for max inbound streams and pending dials
thomaseizinger Aug 30, 2022
69fa94f
Implement std::error::Error for `OpenError`
thomaseizinger Aug 30, 2022
54e6437
Avoid dial terminology for streams
thomaseizinger Sep 5, 2022
635730a
Remove `idle_waker`
thomaseizinger Sep 5, 2022
b37fa5c
Make `TState` configurable
thomaseizinger Sep 5, 2022
d4079bf
Finish local work before producing new work
thomaseizinger Sep 5, 2022
bc361ec
Introduce `FromFnProto`
thomaseizinger Sep 5, 2022
712180b
Expose `remote_peer_id` and `connected_point` to closures
thomaseizinger Sep 5, 2022
f9f8e75
Merge branch 'master' into from-fn-connection-handler
thomaseizinger Nov 2, 2022
5d7f0bd
Implement `Shared` abstraction to automatically share state
thomaseizinger Nov 2, 2022
8ee59fd
Don't allow ConnectionHandlers to modify the state
thomaseizinger Nov 2, 2022
7fb4387
Implement test
thomaseizinger Nov 2, 2022
ac5c236
Remove unnecessary waker
thomaseizinger Nov 2, 2022
3e81e72
Separate clone-able registration data
thomaseizinger Nov 2, 2022
49c6eb1
WIP: Migrate `libp2p-rendezvous` to `from_fn`
thomaseizinger Nov 2, 2022
411c0d7
fixup! WIP: Migrate `libp2p-rendezvous` to `from_fn`
thomaseizinger Nov 11, 2022
5e185d8
fixup! WIP: Migrate `libp2p-rendezvous` to `from_fn`
thomaseizinger Nov 11, 2022
bdbeadc
Enforce the use of `Shared`
thomaseizinger Nov 11, 2022
660c0a3
Share state between connection handlers
thomaseizinger Nov 11, 2022
f5a3c50
Introduce builder pattern
thomaseizinger Nov 11, 2022
df74f14
Update docs
thomaseizinger Nov 11, 2022
f84388f
Make inbound streams impossible when no handler is configured
thomaseizinger Nov 11, 2022
6d1add5
Allow for streaming protocols
thomaseizinger Nov 11, 2022
913caf3
Pass `ConnectedPoint` by value
thomaseizinger Nov 11, 2022
e9cd5fc
Allow regular async functions to be used as handlers
thomaseizinger Nov 11, 2022
5801576
Flatten errors
thomaseizinger Nov 12, 2022
c15433f
Implement `libp2p-ping` with `from_fn` abstraction
thomaseizinger Nov 12, 2022
a418d2f
Merge branch 'master' into from-fn-connection-handler
thomaseizinger Nov 16, 2022
628e9ab
Fully migrate rendezvous
thomaseizinger Nov 16, 2022
7bdd953
Minimise diff
thomaseizinger Nov 16, 2022
4485c48
Merge branch 'master' into from-fn-connection-handler
thomaseizinger Nov 18, 2022
6f00703
Update to new behaviour interface
thomaseizinger Nov 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
430 changes: 0 additions & 430 deletions protocols/ping/src/handler.rs

This file was deleted.

348 changes: 293 additions & 55 deletions protocols/ping/src/lib.rs

Large diffs are not rendered by default.

300 changes: 197 additions & 103 deletions protocols/rendezvous/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,38 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::codec::{Cookie, ErrorCode, Namespace, NewRegistration, Registration, Ttl};
use crate::handler;
use crate::handler::outbound;
use crate::handler::outbound::OpenInfo;
use crate::substream_handler::SubstreamConnectionHandler;
use crate::codec::{
Cookie, Error, ErrorCode, Message, Namespace, NewRegistrationRequest, Registration,
RendezvousCodec, Ttl,
};
use crate::PROTOCOL_IDENT;
use asynchronous_codec::Framed;
use futures::future::BoxFuture;
use futures::future::FutureExt;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use instant::Duration;
use futures::SinkExt;
use libp2p_core::connection::ConnectionId;
use libp2p_core::identity::error::SigningError;
use libp2p_core::identity::Keypair;
use libp2p_core::{Multiaddr, PeerId, PeerRecord};
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, PeerRecord};
use libp2p_swarm::behaviour::FromSwarm;
use libp2p_swarm::handler::from_fn;
use libp2p_swarm::{
CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
};
use std::collections::{HashMap, VecDeque};
use std::iter::FromIterator;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use void::Void;

pub struct Behaviour {
events: VecDeque<
NetworkBehaviourAction<
Event,
SubstreamConnectionHandler<void::Void, outbound::Stream, outbound::OpenInfo>,
from_fn::FromFnProto<Void, Result<OutboundEvent, Error>, OpenInfo, ()>,
>,
>,
keypair: Keypair,
Expand Down Expand Up @@ -87,9 +92,7 @@ impl Behaviour {
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: rendezvous_node,
event: handler::OutboundInEvent::NewSubstream {
open_info: OpenInfo::UnregisterRequest(namespace),
},
event: from_fn::InEvent::NewOutbound(OpenInfo::UnregisterRequest(namespace)),
handler: NotifyHandler::Any,
});
}
Expand All @@ -111,13 +114,11 @@ impl Behaviour {
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: rendezvous_node,
event: handler::OutboundInEvent::NewSubstream {
open_info: OpenInfo::DiscoverRequest {
namespace: ns,
cookie,
limit,
},
},
event: from_fn::InEvent::NewOutbound(OpenInfo::DiscoverRequest {
namespace: ns,
cookie,
limit,
}),
handler: NotifyHandler::Any,
});
}
Expand Down Expand Up @@ -164,15 +165,45 @@ pub enum Event {
Expired { peer: PeerId },
}

#[derive(Debug, Clone)]
pub enum OutboundEvent {
Registered {
namespace: Namespace,
ttl: Ttl,
},
RegisterFailed(Namespace, ErrorCode),
Discovered {
registrations: Vec<Registration>,
cookie: Cookie,
},
DiscoverFailed {
namespace: Option<Namespace>,
error: ErrorCode,
},
}

#[allow(clippy::large_enum_variant)]
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Clone)]
pub enum OpenInfo {
RegisterRequest(NewRegistrationRequest),
UnregisterRequest(Namespace),
DiscoverRequest {
namespace: Option<Namespace>,
cookie: Option<Cookie>,
limit: Option<Ttl>,
},
}

impl NetworkBehaviour for Behaviour {
type ConnectionHandler =
SubstreamConnectionHandler<void::Void, outbound::Stream, outbound::OpenInfo>;
type ConnectionHandler = from_fn::FromFnProto<Void, Result<OutboundEvent, Error>, OpenInfo, ()>;
type OutEvent = Event;

fn new_handler(&mut self) -> Self::ConnectionHandler {
let initial_keep_alive = Duration::from_secs(30);

SubstreamConnectionHandler::new_outbound_only(initial_keep_alive)
from_fn::from_fn(PROTOCOL_IDENT)
.without_state()
.without_inbound_handler()
.with_outbound_handler(10, outbound_stream_handler)
}

fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
Expand All @@ -187,25 +218,89 @@ impl NetworkBehaviour for Behaviour {
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
event: handler::OutboundOutEvent,
_: ConnectionId,
event: from_fn::OutEvent<Void, Result<OutboundEvent, Error>, OpenInfo>,
) {
let new_events = match event {
handler::OutboundOutEvent::InboundEvent { message, .. } => void::unreachable(message),
handler::OutboundOutEvent::OutboundEvent { message, .. } => handle_outbound_event(
message,
peer_id,
&mut self.discovered_peers,
&mut self.expiring_registrations,
),
handler::OutboundOutEvent::InboundError { error, .. } => void::unreachable(error),
handler::OutboundOutEvent::OutboundError { error, .. } => {
log::warn!("Connection with peer {} failed: {}", peer_id, error);

vec![NetworkBehaviourAction::CloseConnection {
peer_id,
connection: CloseConnection::One(connection_id),
}]
from_fn::OutEvent::InboundEmitted(never) => void::unreachable(never),
from_fn::OutEvent::OutboundEmitted(Ok(OutboundEvent::Discovered {
registrations,
cookie,
})) => {
self.discovered_peers
.extend(registrations.iter().map(|registration| {
let peer_id = registration.record.peer_id();
let namespace = registration.namespace.clone();
let addresses = registration.record.addresses().to_vec();

((peer_id, namespace), addresses)
}));
self.expiring_registrations
.extend(registrations.iter().cloned().map(|registration| {
async move {
// if the timer errors we consider it expired
futures_timer::Delay::new(Duration::from_secs(registration.ttl as u64))
.await;

(registration.record.peer_id(), registration.namespace)
}
.boxed()
}));

vec![NetworkBehaviourAction::GenerateEvent(Event::Discovered {
rendezvous_node: peer_id,
registrations,
cookie,
})]
}
from_fn::OutEvent::OutboundEmitted(Ok(OutboundEvent::Registered {
namespace,
ttl,
})) => {
vec![NetworkBehaviourAction::GenerateEvent(Event::Registered {
rendezvous_node: peer_id,
ttl,
namespace,
})]
}
from_fn::OutEvent::OutboundEmitted(Ok(OutboundEvent::DiscoverFailed {
namespace,
error,
})) => {
vec![NetworkBehaviourAction::GenerateEvent(
Event::DiscoverFailed {
rendezvous_node: peer_id,
namespace,
error,
},
)]
}
from_fn::OutEvent::OutboundEmitted(Ok(OutboundEvent::RegisterFailed(
namespace,
error,
))) => {
vec![NetworkBehaviourAction::GenerateEvent(
Event::RegisterFailed(RegisterError::Remote {
rendezvous_node: peer_id,
namespace,
error,
}),
)]
}
from_fn::OutEvent::OutboundEmitted(Err(_)) => {
todo!()
}
from_fn::OutEvent::FailedToOpen(from_fn::OpenError::Timeout(_)) => {
todo!()
}
from_fn::OutEvent::FailedToOpen(from_fn::OpenError::NegotiationFailed(..)) => {
todo!()
}
from_fn::OutEvent::FailedToOpen(from_fn::OpenError::LimitExceeded { .. }) => {
todo!()
}
from_fn::OutEvent::FailedToOpen(from_fn::OpenError::Unsupported { .. }) => {
todo!()
}
};

Expand Down Expand Up @@ -238,13 +333,13 @@ impl NetworkBehaviour for Behaviour {
let action = match PeerRecord::new(&self.keypair, external_addresses) {
Ok(peer_record) => NetworkBehaviourAction::NotifyHandler {
peer_id: rendezvous_node,
event: handler::OutboundInEvent::NewSubstream {
open_info: OpenInfo::RegisterRequest(NewRegistration {
event: from_fn::InEvent::NewOutbound(OpenInfo::RegisterRequest(
NewRegistrationRequest {
namespace,
record: peer_record,
ttl,
}),
},
},
)),
handler: NotifyHandler::Any,
},
Err(signing_error) => NetworkBehaviourAction::GenerateEvent(Event::RegisterFailed(
Expand Down Expand Up @@ -285,70 +380,69 @@ impl NetworkBehaviour for Behaviour {
}
}

fn handle_outbound_event(
event: outbound::OutEvent,
peer_id: PeerId,
discovered_peers: &mut HashMap<(PeerId, Namespace), Vec<Multiaddr>>,
expiring_registrations: &mut FuturesUnordered<BoxFuture<'static, (PeerId, Namespace)>>,
) -> Vec<
NetworkBehaviourAction<
Event,
SubstreamConnectionHandler<void::Void, outbound::Stream, outbound::OpenInfo>,
>,
> {
match event {
outbound::OutEvent::Registered { namespace, ttl } => {
vec![NetworkBehaviourAction::GenerateEvent(Event::Registered {
rendezvous_node: peer_id,
ttl,
async fn outbound_stream_handler(
substream: NegotiatedSubstream,
_: PeerId,
_: ConnectedPoint,
_: Arc<()>,
request: OpenInfo,
) -> Result<OutboundEvent, Error> {
let mut substream = Framed::new(substream, RendezvousCodec::default());

substream
.send(match request.clone() {
OpenInfo::RegisterRequest(new_registration) => Message::Register(new_registration),
OpenInfo::UnregisterRequest(namespace) => Message::Unregister(namespace),
OpenInfo::DiscoverRequest {
namespace,
})]
cookie,
limit,
} => Message::Discover {
namespace,
cookie,
limit,
},
})
.await?;

let response = substream.next().await.transpose()?;

let out_event = match (request, response) {
(OpenInfo::RegisterRequest(r), Some(Message::RegisterResponse(Ok(ttl)))) => {
OutboundEvent::Registered {
namespace: r.namespace,
ttl,
}
}
outbound::OutEvent::RegisterFailed(namespace, error) => {
vec![NetworkBehaviourAction::GenerateEvent(
Event::RegisterFailed(RegisterError::Remote {
rendezvous_node: peer_id,
namespace,
error,
}),
)]
(OpenInfo::RegisterRequest(r), Some(Message::RegisterResponse(Err(e)))) => {
OutboundEvent::RegisterFailed(r.namespace, e)
}
outbound::OutEvent::Discovered {
(
OpenInfo::DiscoverRequest { .. },
Some(Message::DiscoverResponse(Ok((registrations, cookie)))),
) => OutboundEvent::Discovered {
registrations,
cookie,
} => {
discovered_peers.extend(registrations.iter().map(|registration| {
let peer_id = registration.record.peer_id();
let namespace = registration.namespace.clone();

let addresses = registration.record.addresses().to_vec();

((peer_id, namespace), addresses)
}));
expiring_registrations.extend(registrations.iter().cloned().map(|registration| {
async move {
// if the timer errors we consider it expired
futures_timer::Delay::new(Duration::from_secs(registration.ttl)).await;

(registration.record.peer_id(), registration.namespace)
}
.boxed()
}));
},
(
OpenInfo::DiscoverRequest { namespace, .. },
Some(Message::DiscoverResponse(Err(error))),
) => OutboundEvent::DiscoverFailed { namespace, error },
(OpenInfo::UnregisterRequest(_), None) => {
// All good.

vec![NetworkBehaviourAction::GenerateEvent(Event::Discovered {
rendezvous_node: peer_id,
registrations,
cookie,
})]
todo!()
}
outbound::OutEvent::DiscoverFailed { namespace, error } => {
vec![NetworkBehaviourAction::GenerateEvent(
Event::DiscoverFailed {
rendezvous_node: peer_id,
namespace,
error,
},
)]
(_, None) => {
// EOF?
todo!()
}
}
_ => {
panic!("protocol violation") // TODO: Make two different codecs to avoid this?
}
};

substream.close().await?;

Ok(out_event)
}
Loading