Skip to content

Commit

Permalink
Merge pull request #2 from Michael-F-Bryan/MessageReceiver
Browse files Browse the repository at this point in the history
Implementation of MessageReceiver and MessagePipe.
  • Loading branch information
rubdos authored Jul 13, 2020
2 parents 41dd104 + 523b0ca commit 44dda63
Show file tree
Hide file tree
Showing 14 changed files with 924 additions and 43 deletions.
7 changes: 7 additions & 0 deletions libsignal-service-actix/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,15 @@ libsignal-service = { path = "../libsignal-service" }
libsignal-protocol = { git = "https://github.com/Michael-F-Bryan/libsignal-protocol-rs" }

awc = { version = "2.0.0-alpha.2", features=["rustls"] }
actix = "0.10.0-alpha.3"
actix-rt = "1.1"
serde_json = "1.0"
futures = "0.3"
bytes = "0.5"
rustls = "0.17"
url = "2.1"
serde = "1.0"
log = "0.4.8"

failure = "0.1.5"
thiserror = "1.0"
Expand Down
5 changes: 5 additions & 0 deletions libsignal-service-actix/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
pub mod push_service;
pub mod websocket;

pub mod prelude {
pub use crate::push_service::*;
}
126 changes: 118 additions & 8 deletions libsignal-service-actix/src/push_service.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,134 @@
use libsignal_service::{configuration::*, push_service::*};
use std::{sync::Arc, time::Duration};

use awc::Connector;
use libsignal_service::{
configuration::*, messagepipe::WebSocketService, push_service::*,
};
use serde::Deserialize;
use url::Url;

use crate::websocket::AwcWebSocket;

#[derive(Clone)]
pub struct AwcPushService {
cfg: ServiceConfiguration,
base_url: Url,
client: awc::Client,
}

#[async_trait::async_trait(?Send)]
impl PushService for AwcPushService {
async fn get(&mut self, _path: &str) -> Result<(), ServiceError> { Ok(()) }
type WebSocket = AwcWebSocket;

async fn get<T>(&mut self, path: &str) -> Result<T, ServiceError>
where
for<'de> T: Deserialize<'de>,
{
// In principle, we should be using http::uri::Uri,
// but that doesn't seem like an owned type where we can do this kind of
// constructions on.
// https://docs.rs/http/0.2.1/http/uri/struct.Uri.html
let url = self.base_url.join(path).expect("valid url");

log::debug!("AwcPushService::get({:?})", url);
let mut response =
self.client.get(url.as_str()).send().await.map_err(|e| {
ServiceError::SendError {
reason: e.to_string(),
}
})?;

log::debug!("AwcPushService::get response: {:?}", response);

ServiceError::from_status(response.status())?;

// In order to debug the output, we collect the whole response.
// The actix-web api is meant to used as a streaming deserializer,
// so we have this little awkward switch.
//
// This is also the reason we depend directly on serde_json, however
// actix already imports that anyway.
if log::log_enabled!(log::Level::Debug) {
let text = response.body().await.map_err(|e| {
ServiceError::JsonDecodeError {
reason: e.to_string(),
}
})?;
log::debug!("GET response: {:?}", String::from_utf8_lossy(&text));
serde_json::from_slice(&text).map_err(|e| {
ServiceError::JsonDecodeError {
reason: e.to_string(),
}
})
} else {
response
.json()
.await
.map_err(|e| ServiceError::JsonDecodeError {
reason: e.to_string(),
})
}
}

async fn ws(
&mut self,
credentials: Credentials,
) -> Result<
(
Self::WebSocket,
<Self::WebSocket as WebSocketService>::Stream,
),
ServiceError,
> {
Ok(AwcWebSocket::with_client(
&mut self.client,
&self.base_url,
Some(&credentials),
)
.await?)
}
}

impl AwcPushService {
pub fn new<T: CredentialsProvider>(
_cfg: ServiceConfiguration,
_credentials: T,
/// Creates a new AwcPushService
///
/// Panics on invalid service url.
pub fn new(
cfg: ServiceConfiguration,
credentials: Credentials,
user_agent: &str,
root_ca: &str,
) -> Self {
let base_url =
Url::parse(&cfg.service_urls[0]).expect("valid service url");

// SSL setup
let mut ssl_config = rustls::ClientConfig::new();
ssl_config.alpn_protocols = vec![b"http/1.1".to_vec()];
ssl_config
.root_store
.add_pem_file(&mut std::io::Cursor::new(root_ca))
.unwrap();
let connector = Connector::new()
.rustls(Arc::new(ssl_config))
.timeout(Duration::from_secs(10)) // https://github.com/actix/actix-web/issues/1047
.finish();
let client = awc::ClientBuilder::new()
.connector(connector)
.header("X-Signal-Agent", user_agent)
.timeout(Duration::from_secs(65)); // as in Signal-Android

let client = if let Some((ident, pass)) = credentials.authorization() {
client.basic_auth(ident, Some(pass))
} else {
client
};
let client = client.finish();

Self {
client: awc::ClientBuilder::new()
.header("X-Signal-Agent", user_agent)
.finish(),
cfg,
base_url,
client,
}
}
}
143 changes: 143 additions & 0 deletions libsignal-service-actix/src/websocket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
use actix::Arbiter;

use awc::{error::WsProtocolError, ws, ws::Frame};
use bytes::Bytes;
use futures::{channel::mpsc::*, prelude::*};
use url::Url;

use libsignal_service::{
configuration::Credentials, messagepipe::*, push_service::ServiceError,
};

pub struct AwcWebSocket {
socket_sink: Box<dyn Sink<ws::Message, Error = WsProtocolError> + Unpin>,
}

#[derive(thiserror::Error, Debug)]
pub enum AwcWebSocketError {
#[error("Could not connect to the Signal Server")]
ConnectionError(#[from] awc::error::WsClientError),
}

impl From<AwcWebSocketError> for ServiceError {
fn from(e: AwcWebSocketError) -> ServiceError {
todo!("error conversion {:?}", e)
}
}

impl From<WsProtocolError> for AwcWebSocketError {
fn from(e: WsProtocolError) -> AwcWebSocketError {
todo!("error conversion {:?}", e)
// return Some(Err(ServiceError::WsError {
// reason: e.to_string(),
// }));
}
}

/// Process the WebSocket, until it times out.
async fn process<S: Stream>(
mut socket_stream: S,
mut incoming_sink: Sender<Bytes>,
) -> Result<(), AwcWebSocketError>
where
S: Unpin,
S: Stream<Item = Result<Frame, WsProtocolError>>,
{
while let Some(frame) = socket_stream.next().await {
let frame = match frame? {
Frame::Binary(s) => s,

Frame::Continuation(_c) => todo!(),
Frame::Ping(msg) => {
log::warn!("Received Ping({:?})", msg);
// XXX: send pong and make the above log::debug
continue;
},
Frame::Pong(msg) => {
log::trace!("Received Pong({:?})", msg);

continue;
},
Frame::Text(frame) => {
log::warn!("Frame::Text {:?}", frame);

// this is a protocol violation, maybe break; is better?
continue;
},

Frame::Close(c) => {
log::warn!("Websocket closing: {:?}", c);

break;
},
};

// Match SendError
if let Err(e) = incoming_sink.send(frame).await {
log::info!("Websocket sink has closed: {:?}.", e);
break;
}
}
Ok(())
}

impl AwcWebSocket {
pub(crate) async fn with_client(
client: &mut awc::Client,
base_url: impl std::borrow::Borrow<Url>,
credentials: Option<&Credentials>,
) -> Result<(Self, <Self as WebSocketService>::Stream), AwcWebSocketError>
{
let mut url =
base_url.borrow().join("/v1/websocket/").expect("valid url");
url.set_scheme("wss").expect("valid https base url");

if let Some(credentials) = credentials {
url.query_pairs_mut()
.append_pair("login", credentials.login())
.append_pair(
"password",
credentials.password.as_ref().expect("a password"),
);
}

log::trace!("Will start websocket at {:?}", url);
let (response, framed) = client.ws(url.as_str()).connect().await?;

log::debug!("WebSocket connected: {:?}", response);

let (incoming_sink, incoming_stream) = channel(1);

let (socket_sink, socket_stream) = framed.split();
let processing_task = process(socket_stream, incoming_sink);

// When the processing_task stops, the consuming stream and sink also
// terminate.
Arbiter::spawn(processing_task.map(|v| match v {
Ok(()) => (),
Err(e) => {
log::warn!("Processing task terminated with error: {:?}", e)
},
}));

Ok((
Self {
socket_sink: Box::new(socket_sink),
},
incoming_stream,
))
}
}

#[async_trait::async_trait(?Send)]
impl WebSocketService for AwcWebSocket {
type Stream = Receiver<Bytes>;

async fn send_message(&mut self, msg: Bytes) -> Result<(), ServiceError> {
self.socket_sink
.send(ws::Message::Binary(msg))
.await
.map_err(AwcWebSocketError::from)?;
Ok(())
}
}
14 changes: 12 additions & 2 deletions libsignal-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,23 @@ libsignal-protocol = { git = "https://github.com/Michael-F-Bryan/libsignal-proto
failure = "0.1.5"
async-trait = "0.1.30"
url = "2.1.1"
base64 = "0.12"
bytes = "0.5"
futures = "0.3"
pin-project = "0.4"
thiserror = "1.0"
serde = {version = "1.0", features=["derive"]}
serde_json = "1.0"
prost = "0.6"
http = "0.2.1"
log = "0.4.8"

sha2 = "0.9.0"
hmac = "0.8.0"
aes = "0.4.0"
block-modes = "0.5.0"

[dev-dependencies]
structopt = "0.2.17"
structopt = "0.3.0"
tokio = { version = "0.2", features=["macros"] }

[build-dependencies]
Expand Down
22 changes: 18 additions & 4 deletions libsignal-service/examples/registering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,19 @@ async fn main() -> Result<(), Error> {
let password = args.get_password()?;

let config = ServiceConfiguration::default();
let credentials = StaticCredentialsProvider {
uuid: String::new(),

let mut signaling_key = [0u8; 52];
base64::decode_config_slice(
args.signaling_key,
base64::STANDARD,
&mut signaling_key,
)
.unwrap();
let credentials = Credentials {
uuid: None,
e164: args.username,
password,
password: Some(password),
signaling_key,
};

let service = PanicingPushService::new(
Expand Down Expand Up @@ -77,9 +86,14 @@ pub struct Args {
#[structopt(
long = "user-agent",
help = "The user agent to use when contacting servers",
raw(default_value = "libsignal_service::USER_AGENT")
default_value = "libsignal_service::USER_AGENT"
)]
pub user_agent: String,
#[structopt(
long = "signaling-key",
help = "The key used to encrypt and authenticate messages in transit, base64 encoded."
)]
pub signaling_key: String,
}

impl Args {
Expand Down
Loading

0 comments on commit 44dda63

Please sign in to comment.