Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
rubdos committed Jul 9, 2020
1 parent 3007887 commit 4b74ef4
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 37 deletions.
12 changes: 10 additions & 2 deletions libsignal-service-actix/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ use libsignal_service::{
};

pub struct AwcWebSocket {
#[allow(dead_code)]
socket_sink: Box<dyn Sink<ws::Message, Error = WsProtocolError>>,
socket_sink: Box<dyn Sink<ws::Message, Error = WsProtocolError> + Unpin>,
}

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -130,6 +129,15 @@ impl AwcWebSocket {
}
}

#[async_trait::async_trait(?Send)]
impl WebSocketService for AwcWebSocket {
type Stream = Receiver<Bytes>;

async fn send_message(&mut self, msg: Bytes) -> Result<(), ServiceError> {
self.socket_sink
.send(ws::Message::Binary(msg))
.await
.map_err(AwcWebSocketError::from)?;
Ok(())
}
}
115 changes: 80 additions & 35 deletions libsignal-service/src/messagepipe.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::{
pin::Pin,
task::{Context, Poll},
use bytes::{Bytes, BytesMut};
use futures::{
channel::mpsc::{self, Sender},
prelude::*,
};

use bytes::Bytes;
use futures::prelude::*;
use pin_project::pin_project;
use prost::Message;

Expand All @@ -16,8 +14,11 @@ pub use crate::{
push_service::ServiceError,
};

#[async_trait::async_trait(?Send)]
pub trait WebSocketService {
type Stream: Stream<Item = Bytes>;
type Stream: Stream<Item = Bytes> + Unpin;

async fn send_message(&mut self, msg: Bytes) -> Result<(), ServiceError>;
}

#[pin_project]
Expand All @@ -31,41 +32,81 @@ impl<WS: WebSocketService> MessagePipe<WS> {
pub fn from_socket(ws: WS, stream: WS::Stream) -> Self {
MessagePipe { ws, stream }
}
}

impl<WS: WebSocketService + Unpin> Stream for MessagePipe<WS> {
type Item = Result<crate::envelope::Envelope, ServiceError>;

fn poll_next(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let frame = match futures::ready!(Stream::poll_next(
self.project().stream,
ctx
)) {
Some(frame) => frame,
None => return Poll::Ready(None),
async fn send_response(
&mut self,
r: WebSocketResponseMessage,
) -> Result<(), ServiceError> {
let msg = WebSocketMessage {
r#type: Some(web_socket_message::Type::Response.into()),
response: Some(r),
..Default::default()
};
let mut buffer = BytesMut::with_capacity(msg.encoded_len());
msg.encode(&mut buffer).unwrap();
self.ws.send_message(buffer.into()).await
}

/// Worker task that
async fn run(
mut self,
mut sink: Sender<Result<crate::envelope::Envelope, ServiceError>>,
) -> Result<(), mpsc::SendError> {
while let Some(frame) = self.stream.next().await {
// WebsocketConnection::onMessage(ByteString)
let msg = match WebSocketMessage::decode(frame) {
Ok(msg) => msg,
Err(e) => {
sink.send(Err(e.into())).await?;
continue;
},
};

// WebsocketConnection::onMessage(ByteString)
let msg = WebSocketMessage::decode(frame)?;
log::trace!("Decoded {:?}", msg);
log::trace!("Decoded {:?}", msg);

use web_socket_message::Type;
match msg.r#type() {
Type::Unknown => {
return Poll::Ready(Some(Err(
ServiceError::InvalidFrameError {
use web_socket_message::Type;
match (msg.r#type(), msg.request) {
(Type::Unknown, _) => {
sink.send(Err(ServiceError::InvalidFrameError {
reason: "Unknown frame type".into(),
},
)))
},
Type::Request => {},
Type::Response => {},
}))
.await?;
},
(Type::Request, Some(request)) => {
// Java: MessagePipe::read
let response =
WebSocketResponseMessage::from_request(&request);
if let Err(e) = self.send_response(response).await {
sink.send(Err(e)).await?;
}
},
(Type::Request, None) => {
sink.send(Err(ServiceError::InvalidFrameError {
reason:
"Type was request, but does not contain request."
.into(),
}))
.await?;
},
(Type::Response, _) => {},
}
}
Ok(())
}

Poll::Pending
/// Starts the stream of `Envelope`s and returns a worker `Future` that
/// needs to be polled.
pub fn stream(
self,
) -> (
impl Stream<Item = Result<crate::envelope::Envelope, ServiceError>>,
impl Future<Output = ()>,
) {
let (sink, stream) = mpsc::channel(1);
(
stream,
self.run(sink).map(|_| log::info!("Sink was closed.")),
)
}
}

Expand All @@ -75,4 +116,8 @@ pub struct PanicingWebSocketService;
#[async_trait::async_trait(?Send)]
impl WebSocketService for PanicingWebSocketService {
type Stream = futures::channel::mpsc::Receiver<Bytes>;

async fn send_message(&mut self, _msg: Bytes) -> Result<(), ServiceError> {
unimplemented!();
}
}

0 comments on commit 4b74ef4

Please sign in to comment.