From cd247a5bd11e4267f6369dcabfee1dfb6c1ae8b5 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 9 Aug 2021 14:23:39 +0200 Subject: [PATCH] muxers/yamux: Format with rustfmt --- muxers/yamux/src/lib.rs | 134 +++++++++++++++++++++++++--------------- 1 file changed, 83 insertions(+), 51 deletions(-) diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index eb47ad8d0ce..941e0fefd8e 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -21,11 +21,20 @@ //! Implements the Yamux multiplexing protocol for libp2p, see also the //! [specification](https://github.com/hashicorp/yamux/blob/master/spec.md). -use futures::{future, prelude::*, ready, stream::{BoxStream, LocalBoxStream}}; +use futures::{ + future, + prelude::*, + ready, + stream::{BoxStream, LocalBoxStream}, +}; use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use parking_lot::Mutex; -use std::{fmt, io, iter, pin::Pin, task::{Context, Poll}}; +use std::{ + fmt, io, iter, + pin::Pin, + task::{Context, Poll}, +}; use thiserror::Error; /// A Yamux connection. @@ -50,7 +59,7 @@ pub struct OpenSubstreamToken(()); impl Yamux> where - C: AsyncRead + AsyncWrite + Send + Unpin + 'static + C: AsyncRead + AsyncWrite + Send + Unpin + 'static, { /// Create a new Yamux connection. fn new(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { @@ -59,7 +68,7 @@ where let inner = Inner { incoming: Incoming { stream: yamux::into_stream(conn).err_into().boxed(), - _marker: std::marker::PhantomData + _marker: std::marker::PhantomData, }, control: ctrl, }; @@ -69,7 +78,7 @@ where impl Yamux> where - C: AsyncRead + AsyncWrite + Unpin + 'static + C: AsyncRead + AsyncWrite + Unpin + 'static, { /// Create a new Yamux connection (which is ![`Send`]). fn local(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { @@ -78,7 +87,7 @@ where let inner = Inner { incoming: LocalIncoming { stream: yamux::into_stream(conn).err_into().boxed_local(), - _marker: std::marker::PhantomData + _marker: std::marker::PhantomData, }, control: ctrl, }; @@ -91,20 +100,21 @@ pub type YamuxResult = Result; /// > **Note**: This implementation never emits [`StreamMuxerEvent::AddressChange`] events. impl StreamMuxer for Yamux where - S: Stream> + Unpin + S: Stream> + Unpin, { type Substream = yamux::Stream; type OutboundSubstream = OpenSubstreamToken; type Error = YamuxError; - fn poll_event(&self, c: &mut Context<'_>) - -> Poll>> - { + fn poll_event( + &self, + c: &mut Context<'_>, + ) -> Poll>> { let mut inner = self.0.lock(); match ready!(inner.incoming.poll_next_unpin(c)) { Some(Ok(s)) => Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))), Some(Err(e)) => Poll::Ready(Err(e)), - None => Poll::Ready(Err(yamux::ConnectionError::Closed.into())) + None => Poll::Ready(Err(yamux::ConnectionError::Closed.into())), } } @@ -112,53 +122,71 @@ where OpenSubstreamToken(()) } - fn poll_outbound(&self, c: &mut Context<'_>, _: &mut OpenSubstreamToken) - -> Poll> - { + fn poll_outbound( + &self, + c: &mut Context<'_>, + _: &mut OpenSubstreamToken, + ) -> Poll> { let mut inner = self.0.lock(); - Pin::new(&mut inner.control).poll_open_stream(c).map_err(YamuxError) + Pin::new(&mut inner.control) + .poll_open_stream(c) + .map_err(YamuxError) } fn destroy_outbound(&self, _: Self::OutboundSubstream) { self.0.lock().control.abort_open_stream() } - fn read_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream, b: &mut [u8]) - -> Poll> - { - Pin::new(s).poll_read(c, b).map_err(|e| YamuxError(e.into())) - } - - fn write_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream, b: &[u8]) - -> Poll> - { - Pin::new(s).poll_write(c, b).map_err(|e| YamuxError(e.into())) - } - - fn flush_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream) - -> Poll> - { + fn read_substream( + &self, + c: &mut Context<'_>, + s: &mut Self::Substream, + b: &mut [u8], + ) -> Poll> { + Pin::new(s) + .poll_read(c, b) + .map_err(|e| YamuxError(e.into())) + } + + fn write_substream( + &self, + c: &mut Context<'_>, + s: &mut Self::Substream, + b: &[u8], + ) -> Poll> { + Pin::new(s) + .poll_write(c, b) + .map_err(|e| YamuxError(e.into())) + } + + fn flush_substream( + &self, + c: &mut Context<'_>, + s: &mut Self::Substream, + ) -> Poll> { Pin::new(s).poll_flush(c).map_err(|e| YamuxError(e.into())) } - fn shutdown_substream(&self, c: &mut Context<'_>, s: &mut Self::Substream) - -> Poll> - { + fn shutdown_substream( + &self, + c: &mut Context<'_>, + s: &mut Self::Substream, + ) -> Poll> { Pin::new(s).poll_close(c).map_err(|e| YamuxError(e.into())) } - fn destroy_substream(&self, _: Self::Substream) { } + fn destroy_substream(&self, _: Self::Substream) {} fn close(&self, c: &mut Context<'_>) -> Poll> { let mut inner = self.0.lock(); if let std::task::Poll::Ready(x) = Pin::new(&mut inner.control).poll_close(c) { - return Poll::Ready(x.map_err(YamuxError)) + return Poll::Ready(x.map_err(YamuxError)); } while let std::task::Poll::Ready(x) = inner.incoming.poll_next_unpin(c) { match x { - Some(Ok(_)) => {} // drop inbound stream + Some(Ok(_)) => {} // drop inbound stream Some(Err(e)) => return Poll::Ready(Err(e)), - None => return Poll::Ready(Ok(())) + None => return Poll::Ready(Ok(())), } } Poll::Pending @@ -173,7 +201,7 @@ where #[derive(Clone)] pub struct YamuxConfig { inner: yamux::Config, - mode: Option + mode: Option, } /// The window update mode determines when window updates are @@ -299,7 +327,7 @@ impl UpgradeInfo for YamuxLocalConfig { impl InboundUpgrade for YamuxConfig where - C: AsyncRead + AsyncWrite + Send + Unpin + 'static + C: AsyncRead + AsyncWrite + Send + Unpin + 'static, { type Output = Yamux>; type Error = io::Error; @@ -313,7 +341,7 @@ where impl InboundUpgrade for YamuxLocalConfig where - C: AsyncRead + AsyncWrite + Unpin + 'static + C: AsyncRead + AsyncWrite + Unpin + 'static, { type Output = Yamux>; type Error = io::Error; @@ -328,7 +356,7 @@ where impl OutboundUpgrade for YamuxConfig where - C: AsyncRead + AsyncWrite + Send + Unpin + 'static + C: AsyncRead + AsyncWrite + Send + Unpin + 'static, { type Output = Yamux>; type Error = io::Error; @@ -342,7 +370,7 @@ where impl OutboundUpgrade for YamuxLocalConfig where - C: AsyncRead + AsyncWrite + Unpin + 'static + C: AsyncRead + AsyncWrite + Unpin + 'static, { type Output = Yamux>; type Error = io::Error; @@ -364,7 +392,7 @@ impl From for io::Error { fn from(err: YamuxError) -> Self { match err.0 { yamux::ConnectionError::Io(e) => e, - e => io::Error::new(io::ErrorKind::Other, e) + e => io::Error::new(io::ErrorKind::Other, e), } } } @@ -372,7 +400,7 @@ impl From for io::Error { /// The [`futures::stream::Stream`] of incoming substreams. pub struct Incoming { stream: BoxStream<'static, Result>, - _marker: std::marker::PhantomData + _marker: std::marker::PhantomData, } impl fmt::Debug for Incoming { @@ -384,7 +412,7 @@ impl fmt::Debug for Incoming { /// The [`futures::stream::Stream`] of incoming substreams (`!Send`). pub struct LocalIncoming { stream: LocalBoxStream<'static, Result>, - _marker: std::marker::PhantomData + _marker: std::marker::PhantomData, } impl fmt::Debug for LocalIncoming { @@ -396,7 +424,10 @@ impl fmt::Debug for LocalIncoming { impl Stream for Incoming { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll> { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> std::task::Poll> { self.stream.as_mut().poll_next_unpin(cx) } @@ -405,13 +436,15 @@ impl Stream for Incoming { } } -impl Unpin for Incoming { -} +impl Unpin for Incoming {} impl Stream for LocalIncoming { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll> { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> std::task::Poll> { self.stream.as_mut().poll_next_unpin(cx) } @@ -420,5 +453,4 @@ impl Stream for LocalIncoming { } } -impl Unpin for LocalIncoming { -} +impl Unpin for LocalIncoming {}