Skip to content

Commit

Permalink
Custom ProtocolsHandler and multiple protocols.
Browse files Browse the repository at this point in the history
  1. Implement a custom ProtocolsHandler instead of using
     the OneShotHandler for better control and error handling.
     In particular, all request/response sending/receiving is
     kept in the substreams upgrades and thus the background
     task of a connection.
  2. Support multiple protocols (usually protocol versions)
     with a single `RequestResponse` instance, with
     configurable inbound/outbound support.
  • Loading branch information
Roman S. Borschel committed Jun 7, 2020
1 parent 521fe2d commit fbcf59c
Show file tree
Hide file tree
Showing 8 changed files with 725 additions and 209 deletions.
2 changes: 1 addition & 1 deletion protocols/request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
bytes = "0.5"
futures = "0.3.1"
libp2p-core = { version = "0.19.0", path = "../../core" }
libp2p-swarm = { version = "0.19.0", path = "../../swarm" }
smallvec = "1.4"
wasm-timer = "0.2"

[dev-dependencies]
async-std = "< 1.6"
Expand Down
53 changes: 53 additions & 0 deletions protocols/request-response/src/codec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

pub use libp2p_core::ProtocolName;

use futures::{prelude::*, future::BoxFuture};
use std::io;

/// A `RequestResponseCodec` defines the request and response types
/// for a [`RequestResponse`] protocol and how they are encoded / decoded
/// on an I/O stream.
pub trait RequestResponseCodec {
type Protocol: ProtocolName + Send + Sync + Clone;
type Request: Send + Clone;
type Response: Send;

fn read_request<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T)
-> BoxFuture<'a, Result<Self::Request, io::Error>>
where
T: AsyncRead + Unpin + Send;

fn read_response<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T)
-> BoxFuture<'a, Result<Self::Response, io::Error>>
where
T: AsyncRead + Unpin + Send;

fn write_request<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T, req: Self::Request)
-> BoxFuture<'a, Result<(), io::Error>>
where
T: AsyncWrite + Unpin + Send;

fn write_response<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T, res: Self::Response)
-> BoxFuture<'a, Result<(), io::Error>>
where
T: AsyncWrite + Unpin + Send;
}
303 changes: 303 additions & 0 deletions protocols/request-response/src/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

mod protocol;

use crate::{EMPTY_QUEUE_SHRINK_THRESHOLD, RequestId};
use crate::codec::RequestResponseCodec;

pub use protocol::{RequestProtocol, ResponseProtocol, ProtocolSupport};

use futures::{
channel::oneshot,
future::BoxFuture,
prelude::*,
stream::FuturesUnordered
};
use libp2p_core::{
upgrade::{UpgradeError, NegotiationError},
};
use libp2p_swarm::{
SubstreamProtocol,
protocols_handler::{
KeepAlive,
ProtocolsHandler,
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr,
}
};
use smallvec::SmallVec;
use std::{
collections::VecDeque,
io,
time::Duration,
task::{Context, Poll}
};
use wasm_timer::Instant;

/// A connection handler of a `RequestResponse` protocol.
#[doc(hidden)]
pub struct RequestResponseHandler<TCodec>
where
TCodec: RequestResponseCodec,
{
/// The supported inbound protocols.
inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
/// The request/response message codec.
codec: TCodec,
/// The keep-alive timeout of idle connections. A connection is considered
/// idle if there are no outbound substreams.
keep_alive_timeout: Duration,
/// The timeout for inbound and outbound substreams (i.e. request
/// and response processing).
substream_timeout: Duration,
/// The current connection keep-alive.
keep_alive: KeepAlive,
/// A pending fatal error that results in the connection being closed.
pending_error: Option<ProtocolsHandlerUpgrErr<io::Error>>,
/// Queue of events to emit in `poll()`.
pending_events: VecDeque<RequestResponseHandlerEvent<TCodec>>,
/// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`.
outbound: VecDeque<RequestProtocol<TCodec>>,
/// Inbound upgrades waiting for the incoming request.
inbound: FuturesUnordered<BoxFuture<'static,
Result<
(TCodec::Request, oneshot::Sender<TCodec::Response>),
oneshot::Canceled
>>>,
}

impl<TCodec> RequestResponseHandler<TCodec>
where
TCodec: RequestResponseCodec,
{
pub(super) fn new(
inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
codec: TCodec,
keep_alive_timeout: Duration,
substream_timeout: Duration,
) -> Self {
Self {
inbound_protocols,
codec,
keep_alive: KeepAlive::Yes,
keep_alive_timeout,
substream_timeout,
outbound: VecDeque::new(),
inbound: FuturesUnordered::new(),
pending_events: VecDeque::new(),
pending_error: None,
}
}
}

/// The events emitted by the [`RequestResponseHandler`].
#[doc(hidden)]
pub enum RequestResponseHandlerEvent<TCodec>
where
TCodec: RequestResponseCodec
{
/// An inbound request.
Request {
request: TCodec::Request,
sender: oneshot::Sender<TCodec::Response>
},
/// An inbound response.
Response {
request_id: RequestId,
response: TCodec::Response
},
/// An outbound upgrade (i.e. request) timed out.
OutboundTimeout(RequestId),
/// An outbound request failed to negotiate a mutually supported protocol.
OutboundUnsupportedProtocols(RequestId),
/// An inbound request timed out.
InboundTimeout,
/// An inbound request failed to negotiate a mutually supported protocol.
InboundUnsupportedProtocols,
}

impl<TCodec> ProtocolsHandler for RequestResponseHandler<TCodec>
where
TCodec: RequestResponseCodec + Send + Sync + Clone + 'static,
TCodec::Request: Send + Clone,
TCodec::Response: Send,
{
type InEvent = RequestProtocol<TCodec>;
type OutEvent = RequestResponseHandlerEvent<TCodec>;
type Error = ProtocolsHandlerUpgrErr<io::Error>;
type InboundProtocol = ResponseProtocol<TCodec>;
type OutboundProtocol = RequestProtocol<TCodec>;
type OutboundOpenInfo = RequestId;

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
let (upgrade, rq_recv, rs_send) = ResponseProtocol::new(
self.inbound_protocols.clone(), self.codec.clone());
// The handler waits for the request to come in.
self.inbound.push(rq_recv.map_ok(move |rq| (rq, rs_send)).boxed());
SubstreamProtocol::new(upgrade).with_timeout(self.substream_timeout)
}

fn inject_fully_negotiated_inbound(
&mut self,
(): (),
) {
// TODO
}

fn inject_fully_negotiated_outbound(
&mut self,
response: TCodec::Response,
request_id: RequestId,
) {
self.pending_events.push_back(
RequestResponseHandlerEvent::Response {
request_id, response
});
}

fn inject_event(&mut self, request: Self::InEvent) {
self.keep_alive = KeepAlive::Yes;
self.outbound.push_back(request);
}

fn inject_dial_upgrade_error(
&mut self,
info: RequestId,
error: ProtocolsHandlerUpgrErr<io::Error>,
) {
match error {
ProtocolsHandlerUpgrErr::Timeout => {
self.pending_events.push_back(
RequestResponseHandlerEvent::OutboundTimeout(info));
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
// The remote merely doesn't support the protocol(s) we requested.
// This is no reason to close the connection, which may
// successfully communicate with other protocols already.
// An event is reported to permit user code to react to the fact that
// the remote peer does not support the requested protocol(s).
self.pending_events.push_back(
RequestResponseHandlerEvent::OutboundUnsupportedProtocols(info));
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
self.pending_error = Some(error);
}
}
}

fn inject_listen_upgrade_error(
&mut self,
error: ProtocolsHandlerUpgrErr<io::Error>
) {
match error {
ProtocolsHandlerUpgrErr::Timeout => {
self.pending_events.push_back(
RequestResponseHandlerEvent::InboundTimeout);
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
// The remote merely doesn't support the protocol(s) we requested.
// This is no reason to close the connection, which may
// successfully communicate with other protocols already.
// An event is reported to permit user code to react to the fact that
// the local peer does not support the requested protocol(s).
self.pending_events.push_back(
RequestResponseHandlerEvent::InboundUnsupportedProtocols);
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
self.pending_error = Some(error);
}
}
}

fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}

fn poll(
&mut self,
cx: &mut Context,
) -> Poll<
ProtocolsHandlerEvent<RequestProtocol<TCodec>, RequestId, Self::OutEvent, Self::Error>,
> {
// Check for a pending (fatal) error.
if let Some(err) = self.pending_error.take() {
// The handler will not be polled again by the `Swarm`.
return Poll::Ready(ProtocolsHandlerEvent::Close(err))
}

// Drain pending events.
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(ProtocolsHandlerEvent::Custom(event))
} else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
self.pending_events.shrink_to_fit();
}

// Check for inbound requests.
while let Poll::Ready(Some(result)) = self.inbound.poll_next_unpin(cx) {
match result {
Ok((rq, rs_sender)) => {
// We received an inbound request.
self.keep_alive = KeepAlive::Yes;
return Poll::Ready(ProtocolsHandlerEvent::Custom(
RequestResponseHandlerEvent::Request {
request: rq, sender: rs_sender
}))
}
Err(oneshot::Canceled) => {
// The inbound upgrade has errored or timed out reading
// or waiting for the request. The handler is onformed
// via `inject_listen_upgrade_error`.
}
}
}

// Emit outbound requests.
if let Some(request) = self.outbound.pop_front() {
let info = request.request_id();
return Poll::Ready(
ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(request)
.with_timeout(self.substream_timeout),
info,
},
)
}

debug_assert!(self.outbound.is_empty());

if self.outbound.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
self.outbound.shrink_to_fit();
}

if self.inbound.is_empty() {
// No new inbound or outbound requests.
let until = Instant::now() + self.substream_timeout + self.keep_alive_timeout;
self.keep_alive = KeepAlive::Until(until);
}

Poll::Pending
}
}

Loading

0 comments on commit fbcf59c

Please sign in to comment.