diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index 7dfe6c750ee..d0f5f63e76b 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -11,7 +11,6 @@ categories = ["network-programming", "asynchronous"] [dependencies] async-tls = "0.7.0" -bytes = "0.5" either = "1.5.3" futures = "0.3.1" libp2p-core = { version = "0.19.0", path = "../../core" } @@ -19,10 +18,10 @@ log = "0.4.8" quicksink = "0.1" rustls = "0.17.0" rw-stream-sink = "0.2.0" -soketto = { version = "0.3", features = ["deflate"] } +soketto = { version = "0.4", features = ["deflate"] } url = "2.1" webpki = "0.21" webpki-roots = "0.18" [dev-dependencies] -libp2p-tcp = { version = "0.19.0", path = "../tcp" } +libp2p-tcp = { version = "0.19.0", path = "../tcp", features = ["async-std"] } diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index 182b81ab68b..4d2f9a7ac07 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -19,7 +19,6 @@ // DEALINGS IN THE SOFTWARE. use async_tls::{client, server}; -use bytes::BytesMut; use crate::{error::Error, tls}; use either::Either; use futures::{future::BoxFuture, prelude::*, ready, stream::BoxStream}; @@ -30,8 +29,8 @@ use libp2p_core::{ transport::{ListenerEvent, TransportError} }; use log::{debug, trace}; -use soketto::{connection, data, extension::deflate::Deflate, handshake}; -use std::{convert::TryInto, fmt, io, pin::Pin, task::Context, task::Poll}; +use soketto::{connection, extension::deflate::Deflate, handshake}; +use std::{convert::TryInto, fmt, io, mem, pin::Pin, task::Context, task::Poll}; use url::Url; /// Max. number of payload bytes of a single frame. @@ -406,36 +405,55 @@ fn location_to_multiaddr(location: &str) -> Result> { /// The websocket connection. pub struct Connection { - receiver: BoxStream<'static, Result>, + receiver: BoxStream<'static, Result>, sender: Pin + Send>>, _marker: std::marker::PhantomData } /// Data received over the websocket connection. #[derive(Debug, Clone)] -pub struct IncomingData(data::Incoming); +pub enum IncomingData { + /// Binary application data. + Binary(Vec), + /// UTF-8 encoded application data. + Text(Vec), + /// PONG control frame data. + Pong(Vec) +} impl IncomingData { + pub fn is_data(&self) -> bool { + self.is_binary() || self.is_text() + } + pub fn is_binary(&self) -> bool { - self.0.is_binary() + if let IncomingData::Binary(_) = self { true } else { false } } pub fn is_text(&self) -> bool { - self.0.is_text() + if let IncomingData::Text(_) = self { true } else { false } } - pub fn is_data(&self) -> bool { - self.0.is_data() + pub fn is_pong(&self) -> bool { + if let IncomingData::Pong(_) = self { true } else { false } } - pub fn is_pong(&self) -> bool { - self.0.is_pong() + pub fn into_bytes(self) -> Vec { + match self { + IncomingData::Binary(d) => d, + IncomingData::Text(d) => d, + IncomingData::Pong(d) => d + } } } impl AsRef<[u8]> for IncomingData { fn as_ref(&self) -> &[u8] { - self.0.as_ref() + match self { + IncomingData::Binary(d) => d, + IncomingData::Text(d) => d, + IncomingData::Pong(d) => d + } } } @@ -443,12 +461,12 @@ impl AsRef<[u8]> for IncomingData { #[derive(Debug, Clone)] pub enum OutgoingData { /// Send some bytes. - Binary(BytesMut), + Binary(Vec), /// Send a PING message. - Ping(BytesMut), + Ping(Vec), /// Send an unsolicited PONG message. /// (Incoming PINGs are answered automatically.) - Pong(BytesMut) + Pong(Vec) } impl fmt::Debug for Connection { @@ -469,13 +487,13 @@ where sender.send_binary_mut(x).await? } quicksink::Action::Send(OutgoingData::Ping(x)) => { - let data = x.as_ref().try_into().map_err(|_| { + let data = x[..].try_into().map_err(|_| { io::Error::new(io::ErrorKind::InvalidInput, "PING data must be < 126 bytes") })?; sender.send_ping(data).await? } quicksink::Action::Send(OutgoingData::Pong(x)) => { - let data = x.as_ref().try_into().map_err(|_| { + let data = x[..].try_into().map_err(|_| { io::Error::new(io::ErrorKind::InvalidInput, "PONG data must be < 126 bytes") })?; sender.send_pong(data).await? @@ -485,26 +503,41 @@ where } Ok(sender) }); + let stream = stream::unfold((Vec::new(), receiver), |(mut data, mut receiver)| async { + match receiver.receive(&mut data).await { + Ok(soketto::Incoming::Data(soketto::Data::Text(_))) => { + Some((Ok(IncomingData::Text(mem::take(&mut data))), (data, receiver))) + } + Ok(soketto::Incoming::Data(soketto::Data::Binary(_))) => { + Some((Ok(IncomingData::Binary(mem::take(&mut data))), (data, receiver))) + } + Ok(soketto::Incoming::Pong(pong)) => { + Some((Ok(IncomingData::Pong(Vec::from(pong))), (data, receiver))) + } + Err(connection::Error::Closed) => None, + Err(e) => Some((Err(e), (data, receiver))) + } + }); Connection { - receiver: connection::into_stream(receiver).boxed(), + receiver: stream.boxed(), sender: Box::pin(sink), _marker: std::marker::PhantomData } } /// Send binary application data to the remote. - pub fn send_data(&mut self, data: impl Into) -> sink::Send<'_, Self, OutgoingData> { - self.send(OutgoingData::Binary(data.into())) + pub fn send_data(&mut self, data: Vec) -> sink::Send<'_, Self, OutgoingData> { + self.send(OutgoingData::Binary(data)) } /// Send a PING to the remote. - pub fn send_ping(&mut self, data: impl Into) -> sink::Send<'_, Self, OutgoingData> { - self.send(OutgoingData::Ping(data.into())) + pub fn send_ping(&mut self, data: Vec) -> sink::Send<'_, Self, OutgoingData> { + self.send(OutgoingData::Ping(data)) } /// Send an unsolicited PONG to the remote. - pub fn send_pong(&mut self, data: impl Into) -> sink::Send<'_, Self, OutgoingData> { - self.send(OutgoingData::Pong(data.into())) + pub fn send_pong(&mut self, data: Vec) -> sink::Send<'_, Self, OutgoingData> { + self.send(OutgoingData::Pong(data)) } } @@ -517,7 +550,7 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let item = ready!(self.receiver.poll_next_unpin(cx)); let item = item.map(|result| { - result.map(IncomingData).map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + result.map_err(|e| io::Error::new(io::ErrorKind::Other, e)) }); Poll::Ready(item) } diff --git a/transports/websocket/src/lib.rs b/transports/websocket/src/lib.rs index b55dd25851b..5327496d736 100644 --- a/transports/websocket/src/lib.rs +++ b/transports/websocket/src/lib.rs @@ -24,7 +24,6 @@ pub mod error; pub mod framed; pub mod tls; -use bytes::BytesMut; use error::Error; use framed::Connection; use futures::{future::BoxFuture, prelude::*, stream::BoxStream, ready}; @@ -142,13 +141,13 @@ impl Stream for BytesConnection where T: AsyncRead + AsyncWrite + Send + Unpin + 'static { - type Item = io::Result; + type Item = io::Result>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { loop { if let Some(item) = ready!(self.0.try_poll_next_unpin(cx)?) { if item.is_data() { - return Poll::Ready(Some(Ok(BytesMut::from(item.as_ref())))) + return Poll::Ready(Some(Ok(item.into_bytes()))) } } else { return Poll::Ready(None) @@ -157,7 +156,7 @@ where } } -impl Sink for BytesConnection +impl Sink> for BytesConnection where T: AsyncRead + AsyncWrite + Send + Unpin + 'static { @@ -167,7 +166,7 @@ where Pin::new(&mut self.0).poll_ready(cx) } - fn start_send(mut self: Pin<&mut Self>, item: BytesMut) -> io::Result<()> { + fn start_send(mut self: Pin<&mut Self>, item: Vec) -> io::Result<()> { Pin::new(&mut self.0).start_send(framed::OutgoingData::Binary(item)) }