Skip to content

Commit

Permalink
Don't panic in libp2p-websocket
Browse files Browse the repository at this point in the history
Return an error instead. `quicksink` panics if you call a method after
it returned an error once.

Fixes libp2p#2598.
  • Loading branch information
hrxi committed Mar 30, 2022
1 parent 6cc3b4e commit 4711b8a
Showing 1 changed file with 44 additions and 2 deletions.
46 changes: 44 additions & 2 deletions transports/websocket/src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ fn location_to_multiaddr<T>(location: &str) -> Result<Multiaddr, Error<T>> {
/// The websocket connection.
pub struct Connection<T> {
receiver: BoxStream<'static, Result<Incoming, connection::Error>>,
sender: Pin<Box<dyn Sink<OutgoingData, Error = connection::Error> + Send>>,
sender: SenderSink,
_marker: std::marker::PhantomData<T>,
}

Expand Down Expand Up @@ -577,6 +577,48 @@ impl<T> fmt::Debug for Connection<T> {
}
}

struct SenderSink {
inner: Option<Pin<Box<dyn Sink<OutgoingData, Error = connection::Error> + Send>>>,
}

impl SenderSink {
fn new(inner: Pin<Box<dyn Sink<OutgoingData, Error = connection::Error> + Send>>) -> SenderSink {
SenderSink {
inner: Some(inner),
}
}
fn handle_result(&mut self, result: Result<(), connection::Error>) -> Result<(), connection::Error> {
if result.is_err() {
self.inner = None;
}
result
}
}

impl Sink<OutgoingData> for SenderSink {
type Error = connection::Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.as_mut().map(|i| Pin::new(i).poll_ready(cx))
.unwrap_or_else(|| Poll::Ready(Err(connection::Error::Closed)))
.map(|r| self.handle_result(r))
}
fn start_send(mut self: Pin<&mut Self>, item: OutgoingData) -> Result<(), Self::Error> {
let result = self.inner.as_mut().map(|i| Pin::new(i).start_send(item))
.unwrap_or_else(|| Err(connection::Error::Closed));
self.handle_result(result)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.as_mut().map(|i| Pin::new(i).poll_flush(cx))
.unwrap_or_else(|| Poll::Ready(Err(connection::Error::Closed)))
.map(|r| self.handle_result(r))
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.as_mut().map(|i| Pin::new(i).poll_close(cx))
.unwrap_or_else(|| Poll::Ready(Err(connection::Error::Closed)))
.map(|r| self.handle_result(r))
}
}

impl<T> Connection<T>
where
T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
Expand Down Expand Up @@ -627,7 +669,7 @@ where
});
Connection {
receiver: stream.boxed(),
sender: Box::pin(sink),
sender: SenderSink::new(Box::pin(sink)),
_marker: std::marker::PhantomData,
}
}
Expand Down

0 comments on commit 4711b8a

Please sign in to comment.