Skip to content

Commit

Permalink
refactor(request-response): don't use upgrade infrastructure
Browse files Browse the repository at this point in the history
This patch refactors `libp2p-request-response` to not use the "upgrade infrastructure" provided by `libp2p-swarm`. Instead, we directly convert the negotiated streams into futures that read and write the messages.

Related: #3268.
Related: #2863.

Pull-Request: #3914.

Co-authored-by: Yiannis Marangos <[email protected]>
  • Loading branch information
thomaseizinger and oblique authored Oct 26, 2023
1 parent fc6efaf commit fd7b5ac
Show file tree
Hide file tree
Showing 16 changed files with 1,030 additions and 330 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions examples/file-sharing/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use libp2p::{
identity, kad,
multiaddr::Protocol,
noise,
request_response::{self, ProtocolSupport, RequestId, ResponseChannel},
request_response::{self, OutboundRequestId, ProtocolSupport, ResponseChannel},
swarm::{NetworkBehaviour, Swarm, SwarmEvent},
tcp, yamux, PeerId,
};
Expand Down Expand Up @@ -175,7 +175,7 @@ pub(crate) struct EventLoop {
pending_start_providing: HashMap<kad::QueryId, oneshot::Sender<()>>,
pending_get_providers: HashMap<kad::QueryId, oneshot::Sender<HashSet<PeerId>>>,
pending_request_file:
HashMap<RequestId, oneshot::Sender<Result<Vec<u8>, Box<dyn Error + Send>>>>,
HashMap<OutboundRequestId, oneshot::Sender<Result<Vec<u8>, Box<dyn Error + Send>>>>,
}

impl EventLoop {
Expand Down
3 changes: 3 additions & 0 deletions protocols/autonat/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 0.12.0 - unreleased

- Remove `Clone`, `PartialEq` and `Eq` implementations on `Event` and its sub-structs.
The `Event` also contains errors which are not clonable or comparable.
See [PR 3914](https://github.com/libp2p/rust-libp2p/pull/3914).

## 0.11.0

Expand Down
16 changes: 9 additions & 7 deletions protocols/autonat/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use instant::Instant;
use libp2p_core::{multiaddr::Protocol, ConnectedPoint, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_request_response::{
self as request_response, ProtocolSupport, RequestId, ResponseChannel,
self as request_response, InboundRequestId, OutboundRequestId, ProtocolSupport, ResponseChannel,
};
use libp2p_swarm::{
behaviour::{
Expand Down Expand Up @@ -133,7 +133,7 @@ impl ProbeId {
}

/// Event produced by [`Behaviour`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug)]
pub enum Event {
/// Event on an inbound probe.
InboundProbe(InboundProbeEvent),
Expand Down Expand Up @@ -187,14 +187,14 @@ pub struct Behaviour {
PeerId,
(
ProbeId,
RequestId,
InboundRequestId,
Vec<Multiaddr>,
ResponseChannel<DialResponse>,
),
>,

// Ongoing outbound probes and mapped to the inner request id.
ongoing_outbound: HashMap<RequestId, ProbeId>,
ongoing_outbound: HashMap<OutboundRequestId, ProbeId>,

// Connected peers with the observed address of each connection.
// If the endpoint of a connection is relayed or not global (in case of Config::only_global_ips),
Expand All @@ -220,9 +220,11 @@ pub struct Behaviour {
impl Behaviour {
pub fn new(local_peer_id: PeerId, config: Config) -> Self {
let protocols = iter::once((DEFAULT_PROTOCOL_NAME, ProtocolSupport::Full));
let mut cfg = request_response::Config::default();
cfg.set_request_timeout(config.timeout);
let inner = request_response::Behaviour::with_codec(AutoNatCodec, protocols, cfg);
let inner = request_response::Behaviour::with_codec(
AutoNatCodec,
protocols,
request_response::Config::default().with_request_timeout(config.timeout),
);
Self {
local_peer_id,
inner,
Expand Down
10 changes: 5 additions & 5 deletions protocols/autonat/src/behaviour/as_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use futures_timer::Delay;
use instant::Instant;
use libp2p_core::Multiaddr;
use libp2p_identity::PeerId;
use libp2p_request_response::{self as request_response, OutboundFailure, RequestId};
use libp2p_request_response::{self as request_response, OutboundFailure, OutboundRequestId};
use libp2p_swarm::{ConnectionId, ListenAddresses, ToSwarm};
use rand::{seq::SliceRandom, thread_rng};
use std::{
Expand All @@ -39,7 +39,7 @@ use std::{
};

/// Outbound probe failed or was aborted.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug)]
pub enum OutboundProbeError {
/// Probe was aborted because no server is known, or all servers
/// are throttled through [`Config::throttle_server_period`].
Expand All @@ -53,7 +53,7 @@ pub enum OutboundProbeError {
Response(ResponseError),
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug)]
pub enum OutboundProbeEvent {
/// A dial-back request was sent to a remote peer.
Request {
Expand Down Expand Up @@ -91,7 +91,7 @@ pub(crate) struct AsClient<'a> {
pub(crate) throttled_servers: &'a mut Vec<(PeerId, Instant)>,
pub(crate) nat_status: &'a mut NatStatus,
pub(crate) confidence: &'a mut usize,
pub(crate) ongoing_outbound: &'a mut HashMap<RequestId, ProbeId>,
pub(crate) ongoing_outbound: &'a mut HashMap<OutboundRequestId, ProbeId>,
pub(crate) last_probe: &'a mut Option<Instant>,
pub(crate) schedule_probe: &'a mut Delay,
pub(crate) listen_addresses: &'a ListenAddresses,
Expand All @@ -117,7 +117,7 @@ impl<'a> HandleInnerEvent for AsClient<'a> {
let probe_id = self
.ongoing_outbound
.remove(&request_id)
.expect("RequestId exists.");
.expect("OutboundRequestId exists.");

let event = match response.result.clone() {
Ok(address) => OutboundProbeEvent::Response {
Expand Down
8 changes: 4 additions & 4 deletions protocols/autonat/src/behaviour/as_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use instant::Instant;
use libp2p_core::{multiaddr::Protocol, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_request_response::{
self as request_response, InboundFailure, RequestId, ResponseChannel,
self as request_response, InboundFailure, InboundRequestId, ResponseChannel,
};
use libp2p_swarm::{
dial_opts::{DialOpts, PeerCondition},
Expand All @@ -38,15 +38,15 @@ use std::{
};

/// Inbound probe failed.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug)]
pub enum InboundProbeError {
/// Receiving the dial-back request or sending a response failed.
InboundRequest(InboundFailure),
/// We refused or failed to dial the client.
Response(ResponseError),
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug)]
pub enum InboundProbeEvent {
/// A dial-back request was received from a remote peer.
Request {
Expand Down Expand Up @@ -85,7 +85,7 @@ pub(crate) struct AsServer<'a> {
PeerId,
(
ProbeId,
RequestId,
InboundRequestId,
Vec<Multiaddr>,
ResponseChannel<DialResponse>,
),
Expand Down
8 changes: 4 additions & 4 deletions protocols/autonat/tests/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn test_auto_probe() {
match client.next_behaviour_event().await {
Event::OutboundProbe(OutboundProbeEvent::Error { peer, error, .. }) => {
assert!(peer.is_none());
assert_eq!(error, OutboundProbeError::NoAddresses);
assert!(matches!(error, OutboundProbeError::NoAddresses));
}
other => panic!("Unexpected behaviour event: {other:?}."),
}
Expand Down Expand Up @@ -181,10 +181,10 @@ async fn test_confidence() {
peer,
error,
} if !test_public => {
assert_eq!(
assert!(matches!(
error,
OutboundProbeError::Response(ResponseError::DialError)
);
));
(peer.unwrap(), probe_id)
}
other => panic!("Unexpected Outbound Event: {other:?}"),
Expand Down Expand Up @@ -261,7 +261,7 @@ async fn test_throttle_server_period() {
match client.next_behaviour_event().await {
Event::OutboundProbe(OutboundProbeEvent::Error { peer, error, .. }) => {
assert!(peer.is_none());
assert_eq!(error, OutboundProbeError::NoServer);
assert!(matches!(error, OutboundProbeError::NoServer));
}
other => panic!("Unexpected behaviour event: {other:?}."),
}
Expand Down
9 changes: 6 additions & 3 deletions protocols/autonat/tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,10 @@ async fn test_dial_error() {
}) => {
assert_eq!(probe_id, request_probe_id);
assert_eq!(peer, client_id);
assert_eq!(error, InboundProbeError::Response(ResponseError::DialError));
assert!(matches!(
error,
InboundProbeError::Response(ResponseError::DialError)
));
}
other => panic!("Unexpected behaviour event: {other:?}."),
}
Expand Down Expand Up @@ -252,10 +255,10 @@ async fn test_throttle_peer_max() {
}) => {
assert_eq!(client_id, peer);
assert_ne!(first_probe_id, probe_id);
assert_eq!(
assert!(matches!(
error,
InboundProbeError::Response(ResponseError::DialRefused)
)
));
}
other => panic!("Unexpected behaviour event: {other:?}."),
};
Expand Down
14 changes: 9 additions & 5 deletions protocols/rendezvous/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use libp2p_core::{Endpoint, Multiaddr, PeerRecord};
use libp2p_identity::{Keypair, PeerId, SigningError};
use libp2p_request_response::{ProtocolSupport, RequestId};
use libp2p_request_response::{OutboundRequestId, ProtocolSupport};
use libp2p_swarm::{
ConnectionDenied, ConnectionId, ExternalAddresses, FromSwarm, NetworkBehaviour, THandler,
THandlerInEvent, THandlerOutEvent, ToSwarm,
Expand All @@ -41,8 +41,8 @@ pub struct Behaviour {

keypair: Keypair,

waiting_for_register: HashMap<RequestId, (PeerId, Namespace)>,
waiting_for_discovery: HashMap<RequestId, (PeerId, Option<Namespace>)>,
waiting_for_register: HashMap<OutboundRequestId, (PeerId, Namespace)>,
waiting_for_discovery: HashMap<OutboundRequestId, (PeerId, Option<Namespace>)>,

/// Hold addresses of all peers that we have discovered so far.
///
Expand Down Expand Up @@ -336,7 +336,7 @@ impl NetworkBehaviour for Behaviour {
}

impl Behaviour {
fn event_for_outbound_failure(&mut self, req_id: &RequestId) -> Option<Event> {
fn event_for_outbound_failure(&mut self, req_id: &OutboundRequestId) -> Option<Event> {
if let Some((rendezvous_node, namespace)) = self.waiting_for_register.remove(req_id) {
return Some(Event::RegisterFailed {
rendezvous_node,
Expand All @@ -356,7 +356,11 @@ impl Behaviour {
None
}

fn handle_response(&mut self, request_id: &RequestId, response: Message) -> Option<Event> {
fn handle_response(
&mut self,
request_id: &OutboundRequestId,
response: Message,
) -> Option<Event> {
match response {
RegisterResponse(Ok(ttl)) => {
if let Some((rendezvous_node, namespace)) =
Expand Down
8 changes: 7 additions & 1 deletion protocols/request-response/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

- Remove `request_response::Config::set_connection_keep_alive` in favor of `SwarmBuilder::idle_connection_timeout`.
See [PR 4679](https://github.com/libp2p/rust-libp2p/pull/4679).

- Allow at most 100 concurrent inbound + outbound streams per instance of `request_response::Behaviour`.
This limit is configurable via `Config::with_max_concurrent_streams`.
See [PR 3914](https://github.com/libp2p/rust-libp2p/pull/3914).
- Report IO failures on inbound and outbound streams.
See [PR 3914](https://github.com/libp2p/rust-libp2p/pull/3914).
- Introduce dedicated types for `InboundRequestId` and `OutboundRequestId`.
See [PR 3914](https://github.com/libp2p/rust-libp2p/pull/3914).
- Keep peer addresses in `HashSet` instead of `SmallVec` to prevent adding duplicate addresses.
See [PR 4700](https://github.com/libp2p/rust-libp2p/pull/4700).

Expand Down
3 changes: 3 additions & 0 deletions protocols/request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ serde_json = { version = "1.0.107", optional = true }
smallvec = "1.11.1"
void = "1.0.2"
log = "0.4.20"
futures-timer = "3.0.2"
futures-bounded = { workspace = true }

[features]
json = ["dep:serde", "dep:serde_json", "libp2p-swarm/macros"]
cbor = ["dep:serde", "dep:cbor4ii", "libp2p-swarm/macros"]

[dev-dependencies]
anyhow = "1.0.75"
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.10.0"
libp2p-noise = { workspace = true }
Expand Down
Loading

0 comments on commit fd7b5ac

Please sign in to comment.