Skip to content

Commit

Permalink
muxers/yamux: Format with rustfmt
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed Aug 9, 2021
1 parent 7957253 commit cd247a5
Showing 1 changed file with 83 additions and 51 deletions.
134 changes: 83 additions & 51 deletions muxers/yamux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,20 @@
//! Implements the Yamux multiplexing protocol for libp2p, see also the
//! [specification](https://github.com/hashicorp/yamux/blob/master/spec.md).
use futures::{future, prelude::*, ready, stream::{BoxStream, LocalBoxStream}};
use futures::{
future,
prelude::*,
ready,
stream::{BoxStream, LocalBoxStream},
};
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use parking_lot::Mutex;
use std::{fmt, io, iter, pin::Pin, task::{Context, Poll}};
use std::{
fmt, io, iter,
pin::Pin,
task::{Context, Poll},
};
use thiserror::Error;

/// A Yamux connection.
Expand All @@ -50,7 +59,7 @@ pub struct OpenSubstreamToken(());

impl<C> Yamux<Incoming<C>>
where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static
C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
/// Create a new Yamux connection.
fn new(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self {
Expand All @@ -59,7 +68,7 @@ where
let inner = Inner {
incoming: Incoming {
stream: yamux::into_stream(conn).err_into().boxed(),
_marker: std::marker::PhantomData
_marker: std::marker::PhantomData,
},
control: ctrl,
};
Expand All @@ -69,7 +78,7 @@ where

impl<C> Yamux<LocalIncoming<C>>
where
C: AsyncRead + AsyncWrite + Unpin + 'static
C: AsyncRead + AsyncWrite + Unpin + 'static,
{
/// Create a new Yamux connection (which is ![`Send`]).
fn local(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self {
Expand All @@ -78,7 +87,7 @@ where
let inner = Inner {
incoming: LocalIncoming {
stream: yamux::into_stream(conn).err_into().boxed_local(),
_marker: std::marker::PhantomData
_marker: std::marker::PhantomData,
},
control: ctrl,
};
Expand All @@ -91,74 +100,93 @@ pub type YamuxResult<T> = Result<T, YamuxError>;
/// > **Note**: This implementation never emits [`StreamMuxerEvent::AddressChange`] events.
impl<S> StreamMuxer for Yamux<S>
where
S: Stream<Item = Result<yamux::Stream, YamuxError>> + Unpin
S: Stream<Item = Result<yamux::Stream, YamuxError>> + Unpin,
{
type Substream = yamux::Stream;
type OutboundSubstream = OpenSubstreamToken;
type Error = YamuxError;

fn poll_event(&self, c: &mut Context<'_>)
-> Poll<YamuxResult<StreamMuxerEvent<Self::Substream>>>
{
fn poll_event(
&self,
c: &mut Context<'_>,
) -> Poll<YamuxResult<StreamMuxerEvent<Self::Substream>>> {
let mut inner = self.0.lock();
match ready!(inner.incoming.poll_next_unpin(c)) {
Some(Ok(s)) => Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))),
Some(Err(e)) => Poll::Ready(Err(e)),
None => Poll::Ready(Err(yamux::ConnectionError::Closed.into()))
None => Poll::Ready(Err(yamux::ConnectionError::Closed.into())),
}
}

fn open_outbound(&self) -> Self::OutboundSubstream {
OpenSubstreamToken(())
}

fn poll_outbound(&self, c: &mut Context<'_>, _: &mut OpenSubstreamToken)
-> Poll<YamuxResult<Self::Substream>>
{
fn poll_outbound(
&self,
c: &mut Context<'_>,
_: &mut OpenSubstreamToken,
) -> Poll<YamuxResult<Self::Substream>> {
let mut inner = self.0.lock();
Pin::new(&mut inner.control).poll_open_stream(c).map_err(YamuxError)
Pin::new(&mut inner.control)
.poll_open_stream(c)
.map_err(YamuxError)
}

fn destroy_outbound(&self, _: Self::OutboundSubstream) {
self.0.lock().control.abort_open_stream()
}

fn read_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream, b: &mut [u8])
-> Poll<YamuxResult<usize>>
{
Pin::new(s).poll_read(c, b).map_err(|e| YamuxError(e.into()))
}

fn write_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream, b: &[u8])
-> Poll<YamuxResult<usize>>
{
Pin::new(s).poll_write(c, b).map_err(|e| YamuxError(e.into()))
}

fn flush_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream)
-> Poll<YamuxResult<()>>
{
fn read_substream(
&self,
c: &mut Context<'_>,
s: &mut Self::Substream,
b: &mut [u8],
) -> Poll<YamuxResult<usize>> {
Pin::new(s)
.poll_read(c, b)
.map_err(|e| YamuxError(e.into()))
}

fn write_substream(
&self,
c: &mut Context<'_>,
s: &mut Self::Substream,
b: &[u8],
) -> Poll<YamuxResult<usize>> {
Pin::new(s)
.poll_write(c, b)
.map_err(|e| YamuxError(e.into()))
}

fn flush_substream(
&self,
c: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<YamuxResult<()>> {
Pin::new(s).poll_flush(c).map_err(|e| YamuxError(e.into()))
}

fn shutdown_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream)
-> Poll<YamuxResult<()>>
{
fn shutdown_substream(
&self,
c: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<YamuxResult<()>> {
Pin::new(s).poll_close(c).map_err(|e| YamuxError(e.into()))
}

fn destroy_substream(&self, _: Self::Substream) { }
fn destroy_substream(&self, _: Self::Substream) {}

fn close(&self, c: &mut Context<'_>) -> Poll<YamuxResult<()>> {
let mut inner = self.0.lock();
if let std::task::Poll::Ready(x) = Pin::new(&mut inner.control).poll_close(c) {
return Poll::Ready(x.map_err(YamuxError))
return Poll::Ready(x.map_err(YamuxError));
}
while let std::task::Poll::Ready(x) = inner.incoming.poll_next_unpin(c) {
match x {
Some(Ok(_)) => {} // drop inbound stream
Some(Ok(_)) => {} // drop inbound stream
Some(Err(e)) => return Poll::Ready(Err(e)),
None => return Poll::Ready(Ok(()))
None => return Poll::Ready(Ok(())),
}
}
Poll::Pending
Expand All @@ -173,7 +201,7 @@ where
#[derive(Clone)]
pub struct YamuxConfig {
inner: yamux::Config,
mode: Option<yamux::Mode>
mode: Option<yamux::Mode>,
}

/// The window update mode determines when window updates are
Expand Down Expand Up @@ -299,7 +327,7 @@ impl UpgradeInfo for YamuxLocalConfig {

impl<C> InboundUpgrade<C> for YamuxConfig
where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static
C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
type Output = Yamux<Incoming<C>>;
type Error = io::Error;
Expand All @@ -313,7 +341,7 @@ where

impl<C> InboundUpgrade<C> for YamuxLocalConfig
where
C: AsyncRead + AsyncWrite + Unpin + 'static
C: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Output = Yamux<LocalIncoming<C>>;
type Error = io::Error;
Expand All @@ -328,7 +356,7 @@ where

impl<C> OutboundUpgrade<C> for YamuxConfig
where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static
C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
type Output = Yamux<Incoming<C>>;
type Error = io::Error;
Expand All @@ -342,7 +370,7 @@ where

impl<C> OutboundUpgrade<C> for YamuxLocalConfig
where
C: AsyncRead + AsyncWrite + Unpin + 'static
C: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Output = Yamux<LocalIncoming<C>>;
type Error = io::Error;
Expand All @@ -364,15 +392,15 @@ impl From<YamuxError> for io::Error {
fn from(err: YamuxError) -> Self {
match err.0 {
yamux::ConnectionError::Io(e) => e,
e => io::Error::new(io::ErrorKind::Other, e)
e => io::Error::new(io::ErrorKind::Other, e),
}
}
}

/// The [`futures::stream::Stream`] of incoming substreams.
pub struct Incoming<T> {
stream: BoxStream<'static, Result<yamux::Stream, YamuxError>>,
_marker: std::marker::PhantomData<T>
_marker: std::marker::PhantomData<T>,
}

impl<T> fmt::Debug for Incoming<T> {
Expand All @@ -384,7 +412,7 @@ impl<T> fmt::Debug for Incoming<T> {
/// The [`futures::stream::Stream`] of incoming substreams (`!Send`).
pub struct LocalIncoming<T> {
stream: LocalBoxStream<'static, Result<yamux::Stream, YamuxError>>,
_marker: std::marker::PhantomData<T>
_marker: std::marker::PhantomData<T>,
}

impl<T> fmt::Debug for LocalIncoming<T> {
Expand All @@ -396,7 +424,10 @@ impl<T> fmt::Debug for LocalIncoming<T> {
impl<T> Stream for Incoming<T> {
type Item = Result<yamux::Stream, YamuxError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Option<Self::Item>> {
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.stream.as_mut().poll_next_unpin(cx)
}

Expand All @@ -405,13 +436,15 @@ impl<T> Stream for Incoming<T> {
}
}

impl<T> Unpin for Incoming<T> {
}
impl<T> Unpin for Incoming<T> {}

impl<T> Stream for LocalIncoming<T> {
type Item = Result<yamux::Stream, YamuxError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Option<Self::Item>> {
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.stream.as_mut().poll_next_unpin(cx)
}

Expand All @@ -420,5 +453,4 @@ impl<T> Stream for LocalIncoming<T> {
}
}

impl<T> Unpin for LocalIncoming<T> {
}
impl<T> Unpin for LocalIncoming<T> {}

0 comments on commit cd247a5

Please sign in to comment.