Skip to content

Commit

Permalink
Update to soketto v0.4.0 (libp2p#1603)
Browse files Browse the repository at this point in the history
* Update to soketto v0.4.0

* Remove patch section from Cargo.toml

Co-authored-by: Roman Borschel <[email protected]>
  • Loading branch information
twittner and romanb authored Jun 15, 2020
1 parent 569113e commit b983c94
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 33 deletions.
5 changes: 2 additions & 3 deletions transports/websocket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,17 @@ categories = ["network-programming", "asynchronous"]

[dependencies]
async-tls = "0.7.0"
bytes = "0.5"
either = "1.5.3"
futures = "0.3.1"
libp2p-core = { version = "0.19.0", path = "../../core" }
log = "0.4.8"
quicksink = "0.1"
rustls = "0.17.0"
rw-stream-sink = "0.2.0"
soketto = { version = "0.3", features = ["deflate"] }
soketto = { version = "0.4", features = ["deflate"] }
url = "2.1"
webpki = "0.21"
webpki-roots = "0.18"

[dev-dependencies]
libp2p-tcp = { version = "0.19.0", path = "../tcp" }
libp2p-tcp = { version = "0.19.0", path = "../tcp", features = ["async-std"] }
83 changes: 58 additions & 25 deletions transports/websocket/src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
// DEALINGS IN THE SOFTWARE.

use async_tls::{client, server};
use bytes::BytesMut;
use crate::{error::Error, tls};
use either::Either;
use futures::{future::BoxFuture, prelude::*, ready, stream::BoxStream};
Expand All @@ -30,8 +29,8 @@ use libp2p_core::{
transport::{ListenerEvent, TransportError}
};
use log::{debug, trace};
use soketto::{connection, data, extension::deflate::Deflate, handshake};
use std::{convert::TryInto, fmt, io, pin::Pin, task::Context, task::Poll};
use soketto::{connection, extension::deflate::Deflate, handshake};
use std::{convert::TryInto, fmt, io, mem, pin::Pin, task::Context, task::Poll};
use url::Url;

/// Max. number of payload bytes of a single frame.
Expand Down Expand Up @@ -406,49 +405,68 @@ fn location_to_multiaddr<T>(location: &str) -> Result<Multiaddr, Error<T>> {

/// The websocket connection.
pub struct Connection<T> {
receiver: BoxStream<'static, Result<data::Incoming, connection::Error>>,
receiver: BoxStream<'static, Result<IncomingData, connection::Error>>,
sender: Pin<Box<dyn Sink<OutgoingData, Error = connection::Error> + Send>>,
_marker: std::marker::PhantomData<T>
}

/// Data received over the websocket connection.
#[derive(Debug, Clone)]
pub struct IncomingData(data::Incoming);
pub enum IncomingData {
/// Binary application data.
Binary(Vec<u8>),
/// UTF-8 encoded application data.
Text(Vec<u8>),
/// PONG control frame data.
Pong(Vec<u8>)
}

impl IncomingData {
pub fn is_data(&self) -> bool {
self.is_binary() || self.is_text()
}

pub fn is_binary(&self) -> bool {
self.0.is_binary()
if let IncomingData::Binary(_) = self { true } else { false }
}

pub fn is_text(&self) -> bool {
self.0.is_text()
if let IncomingData::Text(_) = self { true } else { false }
}

pub fn is_data(&self) -> bool {
self.0.is_data()
pub fn is_pong(&self) -> bool {
if let IncomingData::Pong(_) = self { true } else { false }
}

pub fn is_pong(&self) -> bool {
self.0.is_pong()
pub fn into_bytes(self) -> Vec<u8> {
match self {
IncomingData::Binary(d) => d,
IncomingData::Text(d) => d,
IncomingData::Pong(d) => d
}
}
}

impl AsRef<[u8]> for IncomingData {
fn as_ref(&self) -> &[u8] {
self.0.as_ref()
match self {
IncomingData::Binary(d) => d,
IncomingData::Text(d) => d,
IncomingData::Pong(d) => d
}
}
}

/// Data sent over the websocket connection.
#[derive(Debug, Clone)]
pub enum OutgoingData {
/// Send some bytes.
Binary(BytesMut),
Binary(Vec<u8>),
/// Send a PING message.
Ping(BytesMut),
Ping(Vec<u8>),
/// Send an unsolicited PONG message.
/// (Incoming PINGs are answered automatically.)
Pong(BytesMut)
Pong(Vec<u8>)
}

impl<T> fmt::Debug for Connection<T> {
Expand All @@ -469,13 +487,13 @@ where
sender.send_binary_mut(x).await?
}
quicksink::Action::Send(OutgoingData::Ping(x)) => {
let data = x.as_ref().try_into().map_err(|_| {
let data = x[..].try_into().map_err(|_| {
io::Error::new(io::ErrorKind::InvalidInput, "PING data must be < 126 bytes")
})?;
sender.send_ping(data).await?
}
quicksink::Action::Send(OutgoingData::Pong(x)) => {
let data = x.as_ref().try_into().map_err(|_| {
let data = x[..].try_into().map_err(|_| {
io::Error::new(io::ErrorKind::InvalidInput, "PONG data must be < 126 bytes")
})?;
sender.send_pong(data).await?
Expand All @@ -485,26 +503,41 @@ where
}
Ok(sender)
});
let stream = stream::unfold((Vec::new(), receiver), |(mut data, mut receiver)| async {
match receiver.receive(&mut data).await {
Ok(soketto::Incoming::Data(soketto::Data::Text(_))) => {
Some((Ok(IncomingData::Text(mem::take(&mut data))), (data, receiver)))
}
Ok(soketto::Incoming::Data(soketto::Data::Binary(_))) => {
Some((Ok(IncomingData::Binary(mem::take(&mut data))), (data, receiver)))
}
Ok(soketto::Incoming::Pong(pong)) => {
Some((Ok(IncomingData::Pong(Vec::from(pong))), (data, receiver)))
}
Err(connection::Error::Closed) => None,
Err(e) => Some((Err(e), (data, receiver)))
}
});
Connection {
receiver: connection::into_stream(receiver).boxed(),
receiver: stream.boxed(),
sender: Box::pin(sink),
_marker: std::marker::PhantomData
}
}

/// Send binary application data to the remote.
pub fn send_data(&mut self, data: impl Into<BytesMut>) -> sink::Send<'_, Self, OutgoingData> {
self.send(OutgoingData::Binary(data.into()))
pub fn send_data(&mut self, data: Vec<u8>) -> sink::Send<'_, Self, OutgoingData> {
self.send(OutgoingData::Binary(data))
}

/// Send a PING to the remote.
pub fn send_ping(&mut self, data: impl Into<BytesMut>) -> sink::Send<'_, Self, OutgoingData> {
self.send(OutgoingData::Ping(data.into()))
pub fn send_ping(&mut self, data: Vec<u8>) -> sink::Send<'_, Self, OutgoingData> {
self.send(OutgoingData::Ping(data))
}

/// Send an unsolicited PONG to the remote.
pub fn send_pong(&mut self, data: impl Into<BytesMut>) -> sink::Send<'_, Self, OutgoingData> {
self.send(OutgoingData::Pong(data.into()))
pub fn send_pong(&mut self, data: Vec<u8>) -> sink::Send<'_, Self, OutgoingData> {
self.send(OutgoingData::Pong(data))
}
}

Expand All @@ -517,7 +550,7 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let item = ready!(self.receiver.poll_next_unpin(cx));
let item = item.map(|result| {
result.map(IncomingData).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
result.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
});
Poll::Ready(item)
}
Expand Down
9 changes: 4 additions & 5 deletions transports/websocket/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ pub mod error;
pub mod framed;
pub mod tls;

use bytes::BytesMut;
use error::Error;
use framed::Connection;
use futures::{future::BoxFuture, prelude::*, stream::BoxStream, ready};
Expand Down Expand Up @@ -142,13 +141,13 @@ impl<T> Stream for BytesConnection<T>
where
T: AsyncRead + AsyncWrite + Send + Unpin + 'static
{
type Item = io::Result<BytesMut>;
type Item = io::Result<Vec<u8>>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop {
if let Some(item) = ready!(self.0.try_poll_next_unpin(cx)?) {
if item.is_data() {
return Poll::Ready(Some(Ok(BytesMut::from(item.as_ref()))))
return Poll::Ready(Some(Ok(item.into_bytes())))
}
} else {
return Poll::Ready(None)
Expand All @@ -157,7 +156,7 @@ where
}
}

impl<T> Sink<BytesMut> for BytesConnection<T>
impl<T> Sink<Vec<u8>> for BytesConnection<T>
where
T: AsyncRead + AsyncWrite + Send + Unpin + 'static
{
Expand All @@ -167,7 +166,7 @@ where
Pin::new(&mut self.0).poll_ready(cx)
}

fn start_send(mut self: Pin<&mut Self>, item: BytesMut) -> io::Result<()> {
fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> io::Result<()> {
Pin::new(&mut self.0).start_send(framed::OutgoingData::Binary(item))
}

Expand Down

0 comments on commit b983c94

Please sign in to comment.