From a0e482a594422abb9ebe4c11c0e740102de5596d Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Thu, 18 Jun 2020 16:31:57 +0200 Subject: [PATCH 01/23] Scaffold for Envelope and EnvelopeEntity --- libsignal-service/src/envelope.rs | 29 +++++++++++++++++++++++++++++ libsignal-service/src/lib.rs | 1 + libsignal-service/src/receiver.rs | 10 +++++++++- 3 files changed, 39 insertions(+), 1 deletion(-) create mode 100644 libsignal-service/src/envelope.rs diff --git a/libsignal-service/src/envelope.rs b/libsignal-service/src/envelope.rs new file mode 100644 index 000000000..41ac3d62a --- /dev/null +++ b/libsignal-service/src/envelope.rs @@ -0,0 +1,29 @@ +pub struct Envelope { + inner: crate::proto::Envelope, +} + +#[derive(serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct EnvelopeEntity { + pub r#type: i32, + pub relay: String, + pub timestamp: i64, + pub source: String, + pub source_uuid: String, + pub source_device: i32, + pub message: Vec, + pub content: Vec, + pub server_timestamp: i64, + pub guid: String, +} + +const SUPPORTED_VERSION: usize = 1; +const CIPHER_KEY_SIZE: usize = 32; +const MAC_KEY_SIZE: usize = 20; +const MAC_SIZE: usize = 10; + +const VERSION_OFFSET: usize = 0; +const VERSION_LENGTH: usize = 1; +const IV_OFFSET: usize = VERSION_OFFSET + VERSION_LENGTH; +const IV_LENGTH: usize = 16; +const CIPHERTEXT_OFFSET: usize = IV_OFFSET + IV_LENGTH; diff --git a/libsignal-service/src/lib.rs b/libsignal-service/src/lib.rs index 0e584f0a7..669659ef2 100644 --- a/libsignal-service/src/lib.rs +++ b/libsignal-service/src/lib.rs @@ -1,5 +1,6 @@ mod account_manager; pub mod configuration; +pub mod envelope; pub mod models; pub mod push_service; pub mod receiver; diff --git a/libsignal-service/src/receiver.rs b/libsignal-service/src/receiver.rs index b7332b633..87e01b267 100644 --- a/libsignal-service/src/receiver.rs +++ b/libsignal-service/src/receiver.rs @@ -1,4 +1,4 @@ -use crate::{configuration::*, push_service::PushService}; +use crate::{configuration::*, envelope::Envelope, push_service::PushService}; use libsignal_protocol::StoreContext; @@ -12,4 +12,12 @@ impl MessageReceiver { pub fn new(service: Service, context: StoreContext) -> Self { MessageReceiver { service, context } } + + /// One-off method to receive all pending messages. + /// + /// For streaming messages, use a `MessagePipe` through + /// [`MessageReceiver::create_message_pipe()`]. + pub async fn receive_messages(&mut self) -> Vec { vec![] } + + pub async fn create_message_pipe(&self) -> () { unimplemented!() } } From f7dfa74fbc0d5ac336e734b0685d967d5f5b12d2 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Sat, 20 Jun 2020 08:55:03 +0200 Subject: [PATCH 02/23] CredentialsProvider trait is non-Rusty --- libsignal-service-actix/src/push_service.rs | 4 ++-- libsignal-service/examples/registering.rs | 2 +- libsignal-service/src/configuration.rs | 18 +----------------- libsignal-service/src/push_service.rs | 6 +++--- 4 files changed, 7 insertions(+), 23 deletions(-) diff --git a/libsignal-service-actix/src/push_service.rs b/libsignal-service-actix/src/push_service.rs index 7dc8c12ba..13a76bb2b 100644 --- a/libsignal-service-actix/src/push_service.rs +++ b/libsignal-service-actix/src/push_service.rs @@ -10,9 +10,9 @@ impl PushService for AwcPushService { } impl AwcPushService { - pub fn new( + pub fn new( _cfg: ServiceConfiguration, - _credentials: T, + _credentials: Credentials, user_agent: &str, ) -> Self { Self { diff --git a/libsignal-service/examples/registering.rs b/libsignal-service/examples/registering.rs index e6bd26ab3..9add25c69 100644 --- a/libsignal-service/examples/registering.rs +++ b/libsignal-service/examples/registering.rs @@ -34,7 +34,7 @@ async fn main() -> Result<(), Error> { let password = args.get_password()?; let config = ServiceConfiguration::default(); - let credentials = StaticCredentialsProvider { + let credentials = Credentials { uuid: String::new(), e164: args.username, password, diff --git a/libsignal-service/src/configuration.rs b/libsignal-service/src/configuration.rs index ccead381f..dd0d7cd2e 100644 --- a/libsignal-service/src/configuration.rs +++ b/libsignal-service/src/configuration.rs @@ -5,24 +5,8 @@ pub struct ServiceConfiguration { pub contact_discovery_url: Vec, } -pub trait CredentialsProvider { - fn get_uuid(&self) -> String; - - fn get_e164(&self) -> String; - - fn get_password(&self) -> String; -} - -pub struct StaticCredentialsProvider { +pub struct Credentials { pub uuid: String, pub e164: String, pub password: String, } - -impl CredentialsProvider for StaticCredentialsProvider { - fn get_uuid(&self) -> String { self.uuid.clone() } - - fn get_e164(&self) -> String { self.e164.clone() } - - fn get_password(&self) -> String { self.password.clone() } -} diff --git a/libsignal-service/src/push_service.rs b/libsignal-service/src/push_service.rs index fdf2f2f7e..07d96a78e 100644 --- a/libsignal-service/src/push_service.rs +++ b/libsignal-service/src/push_service.rs @@ -1,4 +1,4 @@ -use crate::configuration::{CredentialsProvider, ServiceConfiguration}; +use crate::configuration::{Credentials, ServiceConfiguration}; pub const CREATE_ACCOUNT_SMS_PATH: &str = "/v1/accounts/sms/code/%s?client=%s"; pub const CREATE_ACCOUNT_VOICE_PATH: &str = "/v1/accounts/voice/code/%s"; @@ -66,9 +66,9 @@ pub struct PanicingPushService; impl PanicingPushService { /// A PushService implementation typically takes a ServiceConfiguration, /// credentials and a user agent. - pub fn new( + pub fn new( _cfg: ServiceConfiguration, - _credentials: T, + _credentials: Credentials, _user_agent: &str, ) -> Self { Self From 0ea9c20e36a7468907e2cc098a317a2b62cc4982 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Sat, 20 Jun 2020 12:29:47 +0200 Subject: [PATCH 03/23] Complete interface for fetching messages. Halfy verified working with Whisperfish. --- libsignal-service-actix/Cargo.toml | 3 + libsignal-service-actix/src/lib.rs | 4 ++ libsignal-service-actix/src/push_service.rs | 77 +++++++++++++++++++-- libsignal-service/Cargo.toml | 2 +- libsignal-service/examples/registering.rs | 4 +- libsignal-service/src/configuration.rs | 18 ++++- libsignal-service/src/envelope.rs | 2 +- libsignal-service/src/lib.rs | 7 ++ libsignal-service/src/push_service.rs | 59 ++++++++++++++-- libsignal-service/src/receiver.rs | 17 ++++- 10 files changed, 174 insertions(+), 19 deletions(-) diff --git a/libsignal-service-actix/Cargo.toml b/libsignal-service-actix/Cargo.toml index a42fdb8ba..ca751cedb 100644 --- a/libsignal-service-actix/Cargo.toml +++ b/libsignal-service-actix/Cargo.toml @@ -13,6 +13,9 @@ libsignal-protocol = { git = "https://github.com/Michael-F-Bryan/libsignal-proto awc = { version = "2.0.0-alpha.2", features=["rustls"] } actix-rt = "1.1" rustls = "0.17" +url = "2.1" +serde = "1.0" +log = "0.4.8" failure = "0.1.5" thiserror = "1.0" diff --git a/libsignal-service-actix/src/lib.rs b/libsignal-service-actix/src/lib.rs index 028a9bfe4..b2fa815e5 100644 --- a/libsignal-service-actix/src/lib.rs +++ b/libsignal-service-actix/src/lib.rs @@ -1 +1,5 @@ pub mod push_service; + +pub mod prelude { + pub use crate::push_service::*; +} diff --git a/libsignal-service-actix/src/push_service.rs b/libsignal-service-actix/src/push_service.rs index 13a76bb2b..899bb6cee 100644 --- a/libsignal-service-actix/src/push_service.rs +++ b/libsignal-service-actix/src/push_service.rs @@ -1,24 +1,89 @@ +use std::{sync::Arc, time::Duration}; + +use awc::Connector; use libsignal_service::{configuration::*, push_service::*}; +use serde::Deserialize; +use url::Url; pub struct AwcPushService { + cfg: ServiceConfiguration, + base_url: Url, client: awc::Client, } #[async_trait::async_trait(?Send)] impl PushService for AwcPushService { - async fn get(&mut self, _path: &str) -> Result<(), ServiceError> { Ok(()) } + async fn get(&mut self, path: &str) -> Result + where + for<'de> T: Deserialize<'de>, + { + // In principle, we should be using http::uri::Uri, + // but that doesn't seem like an owned type where we can do this kind of + // constructions on. + // https://docs.rs/http/0.2.1/http/uri/struct.Uri.html + let url = self.base_url.join(path).expect("valid url"); + + log::debug!("AwcPushService::get({:?})", url); + let mut response = + self.client.get(url.as_str()).send().await.map_err(|e| { + ServiceError::SendError { + reason: e.to_string(), + } + })?; + + log::debug!("AwcPushService::get response: {:?}", response); + + ServiceError::from_status(response.status())?; + + response + .json() + .await + .map_err(|e| ServiceError::JsonDecodeError { + reason: e.to_string(), + }) + } } impl AwcPushService { + /// Creates a new AwcPushService + /// + /// Panics on invalid service url. pub fn new( - _cfg: ServiceConfiguration, - _credentials: Credentials, + cfg: ServiceConfiguration, + credentials: Credentials, user_agent: &str, + root_ca: &str, ) -> Self { + let base_url = + Url::parse(&cfg.service_urls[0]).expect("valid service url"); + + // SSL setup + let mut ssl_config = rustls::ClientConfig::new(); + ssl_config.alpn_protocols = vec![b"http/1.1".to_vec()]; + ssl_config + .root_store + .add_pem_file(&mut std::io::Cursor::new(root_ca)) + .unwrap(); + let connector = Connector::new() + .rustls(Arc::new(ssl_config)) + .timeout(Duration::from_secs(10)) // https://github.com/actix/actix-web/issues/1047 + .finish(); + let client = awc::ClientBuilder::new() + .connector(connector) + .header("X-Signal-Agent", user_agent) + .timeout(Duration::from_secs(65)); // as in Signal-Android + + let client = if let Some((ident, pass)) = credentials.authorization() { + client.basic_auth(ident, Some(pass)) + } else { + client + }; + let client = client.finish(); + Self { - client: awc::ClientBuilder::new() - .header("X-Signal-Agent", user_agent) - .finish(), + cfg, + base_url, + client, } } } diff --git a/libsignal-service/Cargo.toml b/libsignal-service/Cargo.toml index ade0ece0b..33e5dab63 100644 --- a/libsignal-service/Cargo.toml +++ b/libsignal-service/Cargo.toml @@ -13,8 +13,8 @@ async-trait = "0.1.30" url = "2.1.1" thiserror = "1.0" serde = {version = "1.0", features=["derive"]} -serde_json = "1.0" prost = "0.6" +http = "0.2.1" [dev-dependencies] structopt = "0.2.17" diff --git a/libsignal-service/examples/registering.rs b/libsignal-service/examples/registering.rs index 9add25c69..24414d1d0 100644 --- a/libsignal-service/examples/registering.rs +++ b/libsignal-service/examples/registering.rs @@ -35,9 +35,9 @@ async fn main() -> Result<(), Error> { let config = ServiceConfiguration::default(); let credentials = Credentials { - uuid: String::new(), + uuid: None, e164: args.username, - password, + password: Some(password), }; let service = PanicingPushService::new( diff --git a/libsignal-service/src/configuration.rs b/libsignal-service/src/configuration.rs index dd0d7cd2e..c0a4652f8 100644 --- a/libsignal-service/src/configuration.rs +++ b/libsignal-service/src/configuration.rs @@ -6,7 +6,21 @@ pub struct ServiceConfiguration { } pub struct Credentials { - pub uuid: String, + pub uuid: Option, pub e164: String, - pub password: String, + pub password: Option, +} + +impl Credentials { + /// Kind-of equivalent with `PushServiceSocket::getAuthorizationHeader` + /// + /// None when `self.password == None` + pub fn authorization(&self) -> Option<(&str, &str)> { + let identifier: &str = if let Some(uuid) = self.uuid.as_ref() { + uuid + } else { + &self.e164 + }; + Some((identifier, self.password.as_ref()?)) + } } diff --git a/libsignal-service/src/envelope.rs b/libsignal-service/src/envelope.rs index 41ac3d62a..b3ab3f50e 100644 --- a/libsignal-service/src/envelope.rs +++ b/libsignal-service/src/envelope.rs @@ -4,7 +4,7 @@ pub struct Envelope { #[derive(serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] -pub(crate) struct EnvelopeEntity { +pub struct EnvelopeEntity { pub r#type: i32, pub relay: String, pub timestamp: i64, diff --git a/libsignal-service/src/lib.rs b/libsignal-service/src/lib.rs index 669659ef2..70b43e37b 100644 --- a/libsignal-service/src/lib.rs +++ b/libsignal-service/src/lib.rs @@ -13,3 +13,10 @@ pub const USER_AGENT: &'static str = concat!(env!("CARGO_PKG_NAME"), "-rs-", env!("CARGO_PKG_VERSION")); pub struct TrustStore; + +pub mod prelude { + pub use crate::{ + configuration::{Credentials, ServiceConfiguration}, + receiver::MessageReceiver, + }; +} diff --git a/libsignal-service/src/push_service.rs b/libsignal-service/src/push_service.rs index 07d96a78e..4545c4a92 100644 --- a/libsignal-service/src/push_service.rs +++ b/libsignal-service/src/push_service.rs @@ -1,4 +1,10 @@ -use crate::configuration::{Credentials, ServiceConfiguration}; +use crate::{ + configuration::{Credentials, ServiceConfiguration}, + envelope::*, +}; + +use http::StatusCode; +use serde::Deserialize; pub const CREATE_ACCOUNT_SMS_PATH: &str = "/v1/accounts/sms/code/%s?client=%s"; pub const CREATE_ACCOUNT_VOICE_PATH: &str = "/v1/accounts/voice/code/%s"; @@ -23,7 +29,7 @@ pub const DIRECTORY_TOKENS_PATH: &str = "/v1/directory/tokens"; pub const DIRECTORY_VERIFY_PATH: &str = "/v1/directory/%s"; pub const DIRECTORY_AUTH_PATH: &str = "/v1/directory/auth"; pub const DIRECTORY_FEEDBACK_PATH: &str = "/v1/directory/feedback-v3/%s"; -pub const MESSAGE_PATH: &str = "/v1/messages/%s"; +pub const MESSAGE_PATH: &str = "/v1/messages/"; // optionally with destination appended pub const SENDER_ACK_MESSAGE_PATH: &str = "/v1/messages/%s/%d"; pub const UUID_ACK_MESSAGE_PATH: &str = "/v1/messages/uuid/%s"; pub const ATTACHMENT_PATH: &str = "/v2/attachments/form/upload"; @@ -46,11 +52,45 @@ pub enum SmsVerificationCodeResponse { } #[derive(thiserror::Error, Debug)] -pub enum ServiceError {} +pub enum ServiceError { + #[error("Error sending request: {reason}")] + SendError { reason: String }, + #[error("Error decoding JSON response: {reason}")] + JsonDecodeError { reason: String }, + + #[error("Rate limit exceeded")] + RateLimitExceeded, + #[error("Authorization failed")] + Unauthorized, + #[error("Unexpected response: HTTP {http_code}")] + UnhandledResponseCode { http_code: u16 }, +} + +impl ServiceError { + pub fn from_status(code: http::StatusCode) -> Result<(), Self> { + match code { + StatusCode::OK => Ok(()), + StatusCode::NO_CONTENT => Ok(()), + StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => { + Err(ServiceError::Unauthorized) + }, + StatusCode::PAYLOAD_TOO_LARGE => { + // This is 413 and means rate limit exceeded for Signal. + Err(ServiceError::RateLimitExceeded) + }, + // XXX: fill in rest from PushServiceSocket + _ => Err(ServiceError::UnhandledResponseCode { + http_code: code.as_u16(), + }), + } + } +} #[async_trait::async_trait(?Send)] pub trait PushService { - async fn get(&mut self, path: &str) -> Result<(), ServiceError>; + async fn get(&mut self, path: &str) -> Result + where + for<'de> T: Deserialize<'de>; async fn request_sms_verification_code( &mut self, @@ -58,6 +98,12 @@ pub trait PushService { self.get(CREATE_ACCOUNT_SMS_PATH).await?; Ok(SmsVerificationCodeResponse::SmsSent) } + + async fn get_messages( + &mut self, + ) -> Result, ServiceError> { + Ok(self.get(MESSAGE_PATH).await?) + } } /// PushService that panics on every request, mainly for example code. @@ -77,7 +123,10 @@ impl PanicingPushService { #[async_trait::async_trait(?Send)] impl PushService for PanicingPushService { - async fn get(&mut self, path: &str) -> Result<(), ServiceError> { + async fn get(&mut self, _path: &str) -> Result + where + for<'de> T: Deserialize<'de>, + { unimplemented!() } } diff --git a/libsignal-service/src/receiver.rs b/libsignal-service/src/receiver.rs index 87e01b267..52ed833ae 100644 --- a/libsignal-service/src/receiver.rs +++ b/libsignal-service/src/receiver.rs @@ -1,4 +1,4 @@ -use crate::{configuration::*, envelope::Envelope, push_service::PushService}; +use crate::{configuration::*, envelope::Envelope, push_service::*}; use libsignal_protocol::StoreContext; @@ -8,6 +8,12 @@ pub struct MessageReceiver { context: StoreContext, } +#[derive(thiserror::Error, Debug)] +pub enum MessageReceiverError { + #[error("ServiceError")] + ServiceError(#[from] ServiceError), +} + impl MessageReceiver { pub fn new(service: Service, context: StoreContext) -> Self { MessageReceiver { service, context } @@ -15,9 +21,16 @@ impl MessageReceiver { /// One-off method to receive all pending messages. /// + /// Equivalent with Java's `SignalServiceMessageReceiver::retrieveMessages`. + /// /// For streaming messages, use a `MessagePipe` through /// [`MessageReceiver::create_message_pipe()`]. - pub async fn receive_messages(&mut self) -> Vec { vec![] } + pub async fn retrieve_messages( + &mut self, + ) -> Result, MessageReceiverError> { + let _entities = self.service.get_messages().await?; + Ok(vec![]) + } pub async fn create_message_pipe(&self) -> () { unimplemented!() } } From 65cd1827a7660fcff9ae714c772a0255efe5a809 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Sat, 20 Jun 2020 20:15:41 +0200 Subject: [PATCH 04/23] Allow debugging the passing through JSON data. --- libsignal-service-actix/Cargo.toml | 1 + libsignal-service-actix/src/push_service.rs | 30 +++++++++++++++++---- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/libsignal-service-actix/Cargo.toml b/libsignal-service-actix/Cargo.toml index ca751cedb..32ddfa56a 100644 --- a/libsignal-service-actix/Cargo.toml +++ b/libsignal-service-actix/Cargo.toml @@ -12,6 +12,7 @@ libsignal-protocol = { git = "https://github.com/Michael-F-Bryan/libsignal-proto awc = { version = "2.0.0-alpha.2", features=["rustls"] } actix-rt = "1.1" +serde_json = "1.0" rustls = "0.17" url = "2.1" serde = "1.0" diff --git a/libsignal-service-actix/src/push_service.rs b/libsignal-service-actix/src/push_service.rs index 899bb6cee..43bb3043c 100644 --- a/libsignal-service-actix/src/push_service.rs +++ b/libsignal-service-actix/src/push_service.rs @@ -35,12 +35,32 @@ impl PushService for AwcPushService { ServiceError::from_status(response.status())?; - response - .json() - .await - .map_err(|e| ServiceError::JsonDecodeError { - reason: e.to_string(), + // In order to debug the output, we collect the whole response. + // The actix-web api is meant to used as a streaming deserializer, + // so we have this little awkward switch. + // + // This is also the reason we depend directly on serde_json, however + // actix already imports that anyway. + if log::log_enabled!(log::Level::Debug) { + let text = response.body().await.map_err(|e| { + ServiceError::JsonDecodeError { + reason: e.to_string(), + } + })?; + log::debug!("GET response: {:?}", String::from_utf8_lossy(&text)); + serde_json::from_slice(&text).map_err(|e| { + ServiceError::JsonDecodeError { + reason: e.to_string(), + } }) + } else { + response + .json() + .await + .map_err(|e| ServiceError::JsonDecodeError { + reason: e.to_string(), + }) + } } } From ca43a2c8be97b9a54de3c9478293c80305ca0624 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Sat, 20 Jun 2020 20:16:35 +0200 Subject: [PATCH 05/23] EnvelopeEntityList wrapping type. --- libsignal-service/Cargo.toml | 1 + libsignal-service/src/envelope.rs | 9 +++++++++ libsignal-service/src/lib.rs | 2 ++ libsignal-service/src/push_service.rs | 3 ++- libsignal-service/src/utils.rs | 22 ++++++++++++++++++++++ 5 files changed, 36 insertions(+), 1 deletion(-) create mode 100644 libsignal-service/src/utils.rs diff --git a/libsignal-service/Cargo.toml b/libsignal-service/Cargo.toml index 33e5dab63..8caacf8f9 100644 --- a/libsignal-service/Cargo.toml +++ b/libsignal-service/Cargo.toml @@ -11,6 +11,7 @@ libsignal-protocol = { git = "https://github.com/Michael-F-Bryan/libsignal-proto failure = "0.1.5" async-trait = "0.1.30" url = "2.1.1" +base64 = "0.12" thiserror = "1.0" serde = {version = "1.0", features=["derive"]} prost = "0.6" diff --git a/libsignal-service/src/envelope.rs b/libsignal-service/src/envelope.rs index b3ab3f50e..17280562e 100644 --- a/libsignal-service/src/envelope.rs +++ b/libsignal-service/src/envelope.rs @@ -1,3 +1,5 @@ +use crate::utils::serde_base64; + pub struct Envelope { inner: crate::proto::Envelope, } @@ -11,12 +13,19 @@ pub struct EnvelopeEntity { pub source: String, pub source_uuid: String, pub source_device: i32, + #[serde(with = "serde_base64")] pub message: Vec, + #[serde(with = "serde_base64")] pub content: Vec, pub server_timestamp: i64, pub guid: String, } +#[derive(serde::Serialize, serde::Deserialize)] +pub(crate) struct EnvelopeEntityList { + pub messages: Vec, +} + const SUPPORTED_VERSION: usize = 1; const CIPHER_KEY_SIZE: usize = 32; const MAC_KEY_SIZE: usize = 20; diff --git a/libsignal-service/src/lib.rs b/libsignal-service/src/lib.rs index 70b43e37b..a76f806fa 100644 --- a/libsignal-service/src/lib.rs +++ b/libsignal-service/src/lib.rs @@ -7,6 +7,8 @@ pub mod receiver; mod proto; +mod utils; + pub use crate::account_manager::AccountManager; pub const USER_AGENT: &'static str = diff --git a/libsignal-service/src/push_service.rs b/libsignal-service/src/push_service.rs index 4545c4a92..4c2140189 100644 --- a/libsignal-service/src/push_service.rs +++ b/libsignal-service/src/push_service.rs @@ -102,7 +102,8 @@ pub trait PushService { async fn get_messages( &mut self, ) -> Result, ServiceError> { - Ok(self.get(MESSAGE_PATH).await?) + let entity_list: EnvelopeEntityList = self.get(MESSAGE_PATH).await?; + Ok(entity_list.messages) } } diff --git a/libsignal-service/src/utils.rs b/libsignal-service/src/utils.rs new file mode 100644 index 000000000..140d54e22 --- /dev/null +++ b/libsignal-service/src/utils.rs @@ -0,0 +1,22 @@ +pub mod serde_base64 { + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(bytes: &T, serializer: S) -> Result + where + T: AsRef<[u8]>, + S: Serializer, + { + serializer.serialize_str(&base64::encode(bytes.as_ref())) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + use serde::de::Error; + String::deserialize(deserializer).and_then(|string| { + base64::decode(&string) + .map_err(|err| Error::custom(err.to_string())) + }) + } +} From 9275a54dfe1665d640879d0dc0904a147fa55129 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Sat, 20 Jun 2020 21:35:06 +0200 Subject: [PATCH 06/23] Allow optional envelope binary contents. --- libsignal-service/src/envelope.rs | 10 ++++----- libsignal-service/src/utils.rs | 35 +++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/libsignal-service/src/envelope.rs b/libsignal-service/src/envelope.rs index 17280562e..05b343d5b 100644 --- a/libsignal-service/src/envelope.rs +++ b/libsignal-service/src/envelope.rs @@ -1,4 +1,4 @@ -use crate::utils::serde_base64; +use crate::utils::{serde_base64, serde_optional_base64}; pub struct Envelope { inner: crate::proto::Envelope, @@ -13,10 +13,10 @@ pub struct EnvelopeEntity { pub source: String, pub source_uuid: String, pub source_device: i32, - #[serde(with = "serde_base64")] - pub message: Vec, - #[serde(with = "serde_base64")] - pub content: Vec, + #[serde(with = "serde_optional_base64")] + pub message: Option>, + #[serde(with = "serde_optional_base64")] + pub content: Option>, pub server_timestamp: i64, pub guid: String, } diff --git a/libsignal-service/src/utils.rs b/libsignal-service/src/utils.rs index 140d54e22..2813b6d46 100644 --- a/libsignal-service/src/utils.rs +++ b/libsignal-service/src/utils.rs @@ -20,3 +20,38 @@ pub mod serde_base64 { }) } } + +pub mod serde_optional_base64 { + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize( + bytes: &Option, + serializer: S, + ) -> Result + where + T: AsRef<[u8]>, + S: Serializer, + { + match bytes { + Some(bytes) => { + serializer.serialize_str(&base64::encode(bytes.as_ref())) + }, + None => serializer.serialize_none(), + } + } + + pub fn deserialize<'de, D>( + deserializer: D, + ) -> Result>, D::Error> + where + D: Deserializer<'de>, + { + use serde::de::Error; + match Option::::deserialize(deserializer)? { + Some(s) => base64::decode(&s) + .map_err(|err| Error::custom(err.to_string())) + .map(Some), + None => Ok(None), + } + } +} From 78958f97e02d7bd4a12e2cce7ed93cdb7696df61 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Mon, 22 Jun 2020 10:29:46 +0200 Subject: [PATCH 07/23] Receiver does not take any cryptographic context --- libsignal-service/src/receiver.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/libsignal-service/src/receiver.rs b/libsignal-service/src/receiver.rs index 52ed833ae..79d42f909 100644 --- a/libsignal-service/src/receiver.rs +++ b/libsignal-service/src/receiver.rs @@ -1,11 +1,8 @@ use crate::{configuration::*, envelope::Envelope, push_service::*}; -use libsignal_protocol::StoreContext; - /// Equivalent of Java's `SignalServiceMessageReceiver`. pub struct MessageReceiver { service: Service, - context: StoreContext, } #[derive(thiserror::Error, Debug)] @@ -15,9 +12,7 @@ pub enum MessageReceiverError { } impl MessageReceiver { - pub fn new(service: Service, context: StoreContext) -> Self { - MessageReceiver { service, context } - } + pub fn new(service: Service) -> Self { MessageReceiver { service } } /// One-off method to receive all pending messages. /// @@ -28,8 +23,9 @@ impl MessageReceiver { pub async fn retrieve_messages( &mut self, ) -> Result, MessageReceiverError> { - let _entities = self.service.get_messages().await?; - Ok(vec![]) + let entities = self.service.get_messages().await?; + let entities = entities.into_iter().map(Envelope::from).collect(); + Ok(entities) } pub async fn create_message_pipe(&self) -> () { unimplemented!() } From 43481c0bc7147e6b0d7c41b86be7cf78c69e56b7 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Mon, 22 Jun 2020 10:30:12 +0200 Subject: [PATCH 08/23] EnvelopeEntity to proto::Envelope --- libsignal-service/src/envelope.rs | 64 ++++++++++++++++++++++++++++--- libsignal-service/src/lib.rs | 7 ++++ 2 files changed, 65 insertions(+), 6 deletions(-) diff --git a/libsignal-service/src/envelope.rs b/libsignal-service/src/envelope.rs index 05b343d5b..a6ef09bb3 100644 --- a/libsignal-service/src/envelope.rs +++ b/libsignal-service/src/envelope.rs @@ -1,23 +1,75 @@ -use crate::utils::{serde_base64, serde_optional_base64}; +use crate::{ + utils::{serde_base64, serde_optional_base64}, + ServiceAddress, +}; pub struct Envelope { inner: crate::proto::Envelope, } +impl From for Envelope { + fn from(entity: EnvelopeEntity) -> Envelope { + // XXX: Java also checks whether .source and .source_uuid are + // not null. + if entity.source.is_some() && entity.source_device > 0 { + let address = ServiceAddress { + uuid: entity.source_uuid.clone(), + e164: entity.source.clone().unwrap(), + relay: None, + }; + Envelope::new_with_source(entity, address) + } else { + Envelope::new_from_entity(entity) + } + } +} + +impl Envelope { + fn new_from_entity(entity: EnvelopeEntity) -> Self { + Envelope { + inner: crate::proto::Envelope { + r#type: Some(entity.r#type), + timestamp: Some(entity.timestamp), + server_timestamp: Some(entity.server_timestamp), + server_guid: entity.source_uuid, + legacy_message: entity.message, + content: entity.content, + ..Default::default() + }, + } + } + + fn new_with_source(entity: EnvelopeEntity, source: ServiceAddress) -> Self { + Envelope { + inner: crate::proto::Envelope { + r#type: Some(entity.r#type), + source_device: Some(entity.source_device), + timestamp: Some(entity.timestamp), + server_timestamp: Some(entity.server_timestamp), + source_e164: Some(source.e164), + source_uuid: source.uuid, + legacy_message: entity.message, + content: entity.content, + ..Default::default() + }, + } + } +} + #[derive(serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct EnvelopeEntity { pub r#type: i32, pub relay: String, - pub timestamp: i64, - pub source: String, - pub source_uuid: String, - pub source_device: i32, + pub timestamp: u64, + pub source: Option, + pub source_uuid: Option, + pub source_device: u32, #[serde(with = "serde_optional_base64")] pub message: Option>, #[serde(with = "serde_optional_base64")] pub content: Option>, - pub server_timestamp: i64, + pub server_timestamp: u64, pub guid: String, } diff --git a/libsignal-service/src/lib.rs b/libsignal-service/src/lib.rs index a76f806fa..4df84da9a 100644 --- a/libsignal-service/src/lib.rs +++ b/libsignal-service/src/lib.rs @@ -16,6 +16,13 @@ pub const USER_AGENT: &'static str = pub struct TrustStore; +pub struct ServiceAddress { + pub uuid: Option, + // In principe, this is also Option if you follow the Java code. + pub e164: String, + pub relay: Option, +} + pub mod prelude { pub use crate::{ configuration::{Credentials, ServiceConfiguration}, From 2587da55348713643d7bbae1914887536d225dc7 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Wed, 24 Jun 2020 10:03:27 +0200 Subject: [PATCH 09/23] Import cleanup --- libsignal-service/src/receiver.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libsignal-service/src/receiver.rs b/libsignal-service/src/receiver.rs index 79d42f909..23fb95b5b 100644 --- a/libsignal-service/src/receiver.rs +++ b/libsignal-service/src/receiver.rs @@ -1,4 +1,4 @@ -use crate::{configuration::*, envelope::Envelope, push_service::*}; +use crate::{envelope::Envelope, push_service::*}; /// Equivalent of Java's `SignalServiceMessageReceiver`. pub struct MessageReceiver { From e24620cdda41ffd5136360b1889c8ed17e1d8da9 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Sat, 27 Jun 2020 13:34:05 +0200 Subject: [PATCH 10/23] Scaffolding for web socket --- libsignal-service-actix/Cargo.toml | 2 + libsignal-service-actix/src/lib.rs | 1 + libsignal-service-actix/src/push_service.rs | 8 +++ libsignal-service-actix/src/websocket.rs | 72 +++++++++++++++++++++ libsignal-service/src/lib.rs | 1 + libsignal-service/src/messagepipe.rs | 7 ++ libsignal-service/src/push_service.rs | 10 +++ libsignal-service/src/receiver.rs | 8 ++- 8 files changed, 107 insertions(+), 2 deletions(-) create mode 100644 libsignal-service-actix/src/websocket.rs create mode 100644 libsignal-service/src/messagepipe.rs diff --git a/libsignal-service-actix/Cargo.toml b/libsignal-service-actix/Cargo.toml index 32ddfa56a..479de305b 100644 --- a/libsignal-service-actix/Cargo.toml +++ b/libsignal-service-actix/Cargo.toml @@ -11,8 +11,10 @@ libsignal-service = { path = "../libsignal-service" } libsignal-protocol = { git = "https://github.com/Michael-F-Bryan/libsignal-protocol-rs" } awc = { version = "2.0.0-alpha.2", features=["rustls"] } +actix = "0.10.0-alpha.3" actix-rt = "1.1" serde_json = "1.0" +futures = "0.3" rustls = "0.17" url = "2.1" serde = "1.0" diff --git a/libsignal-service-actix/src/lib.rs b/libsignal-service-actix/src/lib.rs index b2fa815e5..73a1ea704 100644 --- a/libsignal-service-actix/src/lib.rs +++ b/libsignal-service-actix/src/lib.rs @@ -1,4 +1,5 @@ pub mod push_service; +pub mod websocket; pub mod prelude { pub use crate::push_service::*; diff --git a/libsignal-service-actix/src/push_service.rs b/libsignal-service-actix/src/push_service.rs index 43bb3043c..6304071a0 100644 --- a/libsignal-service-actix/src/push_service.rs +++ b/libsignal-service-actix/src/push_service.rs @@ -5,6 +5,8 @@ use libsignal_service::{configuration::*, push_service::*}; use serde::Deserialize; use url::Url; +use crate::websocket::AwcWebSocket; + pub struct AwcPushService { cfg: ServiceConfiguration, base_url: Url, @@ -13,6 +15,8 @@ pub struct AwcPushService { #[async_trait::async_trait(?Send)] impl PushService for AwcPushService { + type WebSocket = AwcWebSocket; + async fn get(&mut self, path: &str) -> Result where for<'de> T: Deserialize<'de>, @@ -62,6 +66,10 @@ impl PushService for AwcPushService { }) } } + + async fn ws(&mut self) -> Result { + Ok(AwcWebSocket::with_client(&mut self.client, &self.base_url).await?) + } } impl AwcPushService { diff --git a/libsignal-service-actix/src/websocket.rs b/libsignal-service-actix/src/websocket.rs new file mode 100644 index 000000000..2cae92bf0 --- /dev/null +++ b/libsignal-service-actix/src/websocket.rs @@ -0,0 +1,72 @@ +use actix::prelude::*; +use awc::{error::WsProtocolError, ws}; +use futures::{channel::mpsc::*, prelude::*}; +use url::Url; + +use libsignal_service::push_service::ServiceError; + +pub struct AwcWebSocket { + actor: Addr, + messagestream: Receiver<()>, +} + +#[derive(thiserror::Error, Debug)] +pub enum AwcWebSocketError { + #[error("Could not connect to the Signal Server")] + ConnectionError(#[from] awc::error::WsClientError), +} + +impl From for ServiceError { + fn from(e: AwcWebSocketError) -> ServiceError { + todo!("error conversion {:?}", e) + } +} + +impl AwcWebSocket { + pub(crate) async fn with_client( + client: &mut awc::Client, + base_url: impl std::borrow::Borrow, + ) -> Result { + let url = base_url.borrow().join("/v1/websocket").expect("valid url"); + let (_response, framed) = client.ws(url.as_str()).connect().await?; + + log::debug!("WebSocket connected: {:?}", _response); + + let (sink, stream) = framed.split(); + + let (messagesink, messagestream) = channel(1); + let actor = AwcWebSocketActor::create(move |ctx| { + ctx.add_stream(stream); + + AwcWebSocketActor { + sink: Box::new(sink), + messagesink, + } + }); + + Ok(Self { + actor, + messagestream, + }) + } +} + +struct AwcWebSocketActor { + // XXX: in principle, this type is completely known... + sink: Box>, + messagesink: Sender<()>, +} + +impl Actor for AwcWebSocketActor { + type Context = Context; +} + +impl StreamHandler> for AwcWebSocketActor { + fn handle( + &mut self, + _: Result, + _ctx: &mut Self::Context, + ) { + log::trace!("Message on the WS"); + } +} diff --git a/libsignal-service/src/lib.rs b/libsignal-service/src/lib.rs index 4df84da9a..70b285778 100644 --- a/libsignal-service/src/lib.rs +++ b/libsignal-service/src/lib.rs @@ -1,6 +1,7 @@ mod account_manager; pub mod configuration; pub mod envelope; +pub mod messagepipe; pub mod models; pub mod push_service; pub mod receiver; diff --git a/libsignal-service/src/messagepipe.rs b/libsignal-service/src/messagepipe.rs new file mode 100644 index 000000000..9bbe67f03 --- /dev/null +++ b/libsignal-service/src/messagepipe.rs @@ -0,0 +1,7 @@ +pub struct MessagePipe { + ws: WS, +} + +impl MessagePipe { + pub fn from_socket(ws: WS) -> Self { MessagePipe { ws } } +} diff --git a/libsignal-service/src/push_service.rs b/libsignal-service/src/push_service.rs index 4c2140189..3fa5aa8cb 100644 --- a/libsignal-service/src/push_service.rs +++ b/libsignal-service/src/push_service.rs @@ -88,6 +88,8 @@ impl ServiceError { #[async_trait::async_trait(?Send)] pub trait PushService { + type WebSocket; + async fn get(&mut self, path: &str) -> Result where for<'de> T: Deserialize<'de>; @@ -105,6 +107,8 @@ pub trait PushService { let entity_list: EnvelopeEntityList = self.get(MESSAGE_PATH).await?; Ok(entity_list.messages) } + + async fn ws(&mut self) -> Result; } /// PushService that panics on every request, mainly for example code. @@ -124,10 +128,16 @@ impl PanicingPushService { #[async_trait::async_trait(?Send)] impl PushService for PanicingPushService { + type WebSocket = (); + async fn get(&mut self, _path: &str) -> Result where for<'de> T: Deserialize<'de>, { unimplemented!() } + + async fn ws(&mut self) -> Result { + unimplemented!() + } } diff --git a/libsignal-service/src/receiver.rs b/libsignal-service/src/receiver.rs index 23fb95b5b..a453b94b4 100644 --- a/libsignal-service/src/receiver.rs +++ b/libsignal-service/src/receiver.rs @@ -1,4 +1,4 @@ -use crate::{envelope::Envelope, push_service::*}; +use crate::{envelope::Envelope, messagepipe::MessagePipe, push_service::*}; /// Equivalent of Java's `SignalServiceMessageReceiver`. pub struct MessageReceiver { @@ -28,5 +28,9 @@ impl MessageReceiver { Ok(entities) } - pub async fn create_message_pipe(&self) -> () { unimplemented!() } + pub async fn create_message_pipe( + &mut self, + ) -> Result, MessageReceiverError> { + Ok(MessagePipe::from_socket(self.service.ws().await?)) + } } From fb93b9c52a6e2b2e5d344432279642c54bbbe7ac Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Sun, 28 Jun 2020 12:38:44 +0200 Subject: [PATCH 11/23] Unnecessary wrapping type. --- libsignal-service/src/envelope.rs | 40 +++++++++++++------------------ 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/libsignal-service/src/envelope.rs b/libsignal-service/src/envelope.rs index a6ef09bb3..e5ed17bd3 100644 --- a/libsignal-service/src/envelope.rs +++ b/libsignal-service/src/envelope.rs @@ -3,9 +3,7 @@ use crate::{ ServiceAddress, }; -pub struct Envelope { - inner: crate::proto::Envelope, -} +pub use crate::proto::Envelope; impl From for Envelope { fn from(entity: EnvelopeEntity) -> Envelope { @@ -27,31 +25,27 @@ impl From for Envelope { impl Envelope { fn new_from_entity(entity: EnvelopeEntity) -> Self { Envelope { - inner: crate::proto::Envelope { - r#type: Some(entity.r#type), - timestamp: Some(entity.timestamp), - server_timestamp: Some(entity.server_timestamp), - server_guid: entity.source_uuid, - legacy_message: entity.message, - content: entity.content, - ..Default::default() - }, + r#type: Some(entity.r#type), + timestamp: Some(entity.timestamp), + server_timestamp: Some(entity.server_timestamp), + server_guid: entity.source_uuid, + legacy_message: entity.message, + content: entity.content, + ..Default::default() } } fn new_with_source(entity: EnvelopeEntity, source: ServiceAddress) -> Self { Envelope { - inner: crate::proto::Envelope { - r#type: Some(entity.r#type), - source_device: Some(entity.source_device), - timestamp: Some(entity.timestamp), - server_timestamp: Some(entity.server_timestamp), - source_e164: Some(source.e164), - source_uuid: source.uuid, - legacy_message: entity.message, - content: entity.content, - ..Default::default() - }, + r#type: Some(entity.r#type), + source_device: Some(entity.source_device), + timestamp: Some(entity.timestamp), + server_timestamp: Some(entity.server_timestamp), + source_e164: Some(source.e164), + source_uuid: source.uuid, + legacy_message: entity.message, + content: entity.content, + ..Default::default() } } } From 4089bd362b012632b461d762b8a44e54b707f534 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Sun, 28 Jun 2020 12:39:00 +0200 Subject: [PATCH 12/23] Unused import. --- libsignal-service/src/envelope.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/libsignal-service/src/envelope.rs b/libsignal-service/src/envelope.rs index e5ed17bd3..bc317fab3 100644 --- a/libsignal-service/src/envelope.rs +++ b/libsignal-service/src/envelope.rs @@ -1,7 +1,4 @@ -use crate::{ - utils::{serde_base64, serde_optional_base64}, - ServiceAddress, -}; +use crate::{utils::serde_optional_base64, ServiceAddress}; pub use crate::proto::Envelope; From 98ba246e3e9ce4b1f5925637b9b056ae0b40ab01 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Thu, 9 Jul 2020 17:44:09 +0200 Subject: [PATCH 13/23] Add Credentials::login method --- libsignal-service/src/configuration.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/libsignal-service/src/configuration.rs b/libsignal-service/src/configuration.rs index c0a4652f8..209e163f6 100644 --- a/libsignal-service/src/configuration.rs +++ b/libsignal-service/src/configuration.rs @@ -16,11 +16,15 @@ impl Credentials { /// /// None when `self.password == None` pub fn authorization(&self) -> Option<(&str, &str)> { - let identifier: &str = if let Some(uuid) = self.uuid.as_ref() { + let identifier = self.login(); + Some((identifier, self.password.as_ref()?)) + } + + pub fn login(&self) -> &str { + if let Some(uuid) = self.uuid.as_ref() { uuid } else { &self.e164 - }; - Some((identifier, self.password.as_ref()?)) + } } } From 6cc366885a7c96804378601da227c8bf9b68a23d Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Thu, 9 Jul 2020 17:44:49 +0200 Subject: [PATCH 14/23] Implement Clone on credentials. --- libsignal-service/src/configuration.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/libsignal-service/src/configuration.rs b/libsignal-service/src/configuration.rs index 209e163f6..f51012038 100644 --- a/libsignal-service/src/configuration.rs +++ b/libsignal-service/src/configuration.rs @@ -5,6 +5,7 @@ pub struct ServiceConfiguration { pub contact_discovery_url: Vec, } +#[derive(Clone)] pub struct Credentials { pub uuid: Option, pub e164: String, From b21ee0d62242eac509ca320a6fb3a1dac1e03488 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Thu, 9 Jul 2020 18:10:10 +0200 Subject: [PATCH 15/23] Get rid of some warnings. --- libsignal-service/src/envelope.rs | 2 ++ libsignal-service/src/utils.rs | 1 + 2 files changed, 3 insertions(+) diff --git a/libsignal-service/src/envelope.rs b/libsignal-service/src/envelope.rs index bc317fab3..76fb9f4f4 100644 --- a/libsignal-service/src/envelope.rs +++ b/libsignal-service/src/envelope.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] // XXX: remove when all constants on bottom are used. + use crate::{utils::serde_optional_base64, ServiceAddress}; pub use crate::proto::Envelope; diff --git a/libsignal-service/src/utils.rs b/libsignal-service/src/utils.rs index 2813b6d46..bf5a6e75c 100644 --- a/libsignal-service/src/utils.rs +++ b/libsignal-service/src/utils.rs @@ -1,3 +1,4 @@ +#[allow(dead_code)] pub mod serde_base64 { use serde::{Deserialize, Deserializer, Serializer}; From 7901d9a0b1b5ef561bcc4941241c858b1364c6c5 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Thu, 9 Jul 2020 18:14:12 +0200 Subject: [PATCH 16/23] Further implementation of WebSocket --- libsignal-service-actix/Cargo.toml | 1 + libsignal-service-actix/src/push_service.rs | 24 +++- libsignal-service-actix/src/websocket.rs | 147 +++++++++++++++----- libsignal-service/Cargo.toml | 4 + libsignal-service/src/lib.rs | 2 + libsignal-service/src/messagepipe.rs | 124 ++++++++++++++++- libsignal-service/src/push_service.rs | 36 ++++- libsignal-service/src/receiver.rs | 3 +- 8 files changed, 292 insertions(+), 49 deletions(-) diff --git a/libsignal-service-actix/Cargo.toml b/libsignal-service-actix/Cargo.toml index 479de305b..f4005e93f 100644 --- a/libsignal-service-actix/Cargo.toml +++ b/libsignal-service-actix/Cargo.toml @@ -15,6 +15,7 @@ actix = "0.10.0-alpha.3" actix-rt = "1.1" serde_json = "1.0" futures = "0.3" +bytes = "0.5" rustls = "0.17" url = "2.1" serde = "1.0" diff --git a/libsignal-service-actix/src/push_service.rs b/libsignal-service-actix/src/push_service.rs index 6304071a0..d2dc6d659 100644 --- a/libsignal-service-actix/src/push_service.rs +++ b/libsignal-service-actix/src/push_service.rs @@ -1,16 +1,20 @@ use std::{sync::Arc, time::Duration}; use awc::Connector; -use libsignal_service::{configuration::*, push_service::*}; +use libsignal_service::{ + configuration::*, messagepipe::WebSocketService, push_service::*, +}; use serde::Deserialize; use url::Url; use crate::websocket::AwcWebSocket; +#[derive(Clone)] pub struct AwcPushService { cfg: ServiceConfiguration, base_url: Url, client: awc::Client, + credentials: Credentials, } #[async_trait::async_trait(?Send)] @@ -67,8 +71,21 @@ impl PushService for AwcPushService { } } - async fn ws(&mut self) -> Result { - Ok(AwcWebSocket::with_client(&mut self.client, &self.base_url).await?) + async fn ws( + &mut self, + ) -> Result< + ( + Self::WebSocket, + ::Stream, + ), + ServiceError, + > { + Ok(AwcWebSocket::with_client( + &mut self.client, + &self.base_url, + Some(&self.credentials), + ) + .await?) } } @@ -112,6 +129,7 @@ impl AwcPushService { cfg, base_url, client, + credentials, } } } diff --git a/libsignal-service-actix/src/websocket.rs b/libsignal-service-actix/src/websocket.rs index 2cae92bf0..41d236974 100644 --- a/libsignal-service-actix/src/websocket.rs +++ b/libsignal-service-actix/src/websocket.rs @@ -1,13 +1,16 @@ -use actix::prelude::*; -use awc::{error::WsProtocolError, ws}; +use actix::Arbiter; + +use awc::{error::WsProtocolError, ws, ws::Frame}; +use bytes::Bytes; use futures::{channel::mpsc::*, prelude::*}; use url::Url; -use libsignal_service::push_service::ServiceError; +use libsignal_service::{ + configuration::Credentials, messagepipe::*, push_service::ServiceError, +}; pub struct AwcWebSocket { - actor: Addr, - messagestream: Receiver<()>, + socket_sink: Box + Unpin>, } #[derive(thiserror::Error, Debug)] @@ -22,51 +25,119 @@ impl From for ServiceError { } } +impl From for AwcWebSocketError { + fn from(e: WsProtocolError) -> AwcWebSocketError { + todo!("error conversion {:?}", e) + // return Some(Err(ServiceError::WsError { + // reason: e.to_string(), + // })); + } +} + +/// Process the WebSocket, until it times out. +async fn process( + mut socket_stream: S, + mut incoming_sink: Sender, +) -> Result<(), AwcWebSocketError> +where + S: Unpin, + S: Stream>, +{ + while let Some(frame) = socket_stream.next().await { + let frame = match frame? { + Frame::Binary(s) => s, + + Frame::Continuation(_c) => todo!(), + Frame::Ping(msg) => { + log::warn!("Received Ping({:?})", msg); + // XXX: send pong and make the above log::debug + continue; + }, + Frame::Pong(msg) => { + log::trace!("Received Pong({:?})", msg); + + continue; + }, + Frame::Text(frame) => { + log::warn!("Frame::Text {:?}", frame); + + // this is a protocol violation, maybe break; is better? + continue; + }, + + Frame::Close(c) => { + log::warn!("Websocket closing: {:?}", c); + + break; + }, + }; + + // Match SendError + if let Err(e) = incoming_sink.send(frame).await { + log::info!("Websocket sink has closed: {:?}.", e); + break; + } + } + Ok(()) +} + impl AwcWebSocket { pub(crate) async fn with_client( client: &mut awc::Client, base_url: impl std::borrow::Borrow, - ) -> Result { - let url = base_url.borrow().join("/v1/websocket").expect("valid url"); - let (_response, framed) = client.ws(url.as_str()).connect().await?; + credentials: Option<&Credentials>, + ) -> Result<(Self, ::Stream), AwcWebSocketError> + { + let mut url = + base_url.borrow().join("/v1/websocket/").expect("valid url"); + url.set_scheme("wss").expect("valid https base url"); - log::debug!("WebSocket connected: {:?}", _response); + if let Some(credentials) = credentials { + url.query_pairs_mut() + .append_pair("login", credentials.login()) + .append_pair( + "password", + credentials.password.as_ref().expect("a password"), + ); + } - let (sink, stream) = framed.split(); + log::trace!("Will start websocket at {:?}", url); + let (response, framed) = client.ws(url.as_str()).connect().await?; - let (messagesink, messagestream) = channel(1); - let actor = AwcWebSocketActor::create(move |ctx| { - ctx.add_stream(stream); + log::debug!("WebSocket connected: {:?}", response); - AwcWebSocketActor { - sink: Box::new(sink), - messagesink, - } - }); + let (incoming_sink, incoming_stream) = channel(1); - Ok(Self { - actor, - messagestream, - }) - } -} + let (socket_sink, socket_stream) = framed.split(); + let processing_task = process(socket_stream, incoming_sink); -struct AwcWebSocketActor { - // XXX: in principle, this type is completely known... - sink: Box>, - messagesink: Sender<()>, -} + // When the processing_task stops, the consuming stream and sink also + // terminate. + Arbiter::spawn(processing_task.map(|v| match v { + Ok(()) => (), + Err(e) => { + log::warn!("Processing task terminated with error: {:?}", e) + }, + })); -impl Actor for AwcWebSocketActor { - type Context = Context; + Ok(( + Self { + socket_sink: Box::new(socket_sink), + }, + incoming_stream, + )) + } } -impl StreamHandler> for AwcWebSocketActor { - fn handle( - &mut self, - _: Result, - _ctx: &mut Self::Context, - ) { - log::trace!("Message on the WS"); +#[async_trait::async_trait(?Send)] +impl WebSocketService for AwcWebSocket { + type Stream = Receiver; + + async fn send_message(&mut self, msg: Bytes) -> Result<(), ServiceError> { + self.socket_sink + .send(ws::Message::Binary(msg)) + .await + .map_err(AwcWebSocketError::from)?; + Ok(()) } } diff --git a/libsignal-service/Cargo.toml b/libsignal-service/Cargo.toml index 8caacf8f9..ea44a77ac 100644 --- a/libsignal-service/Cargo.toml +++ b/libsignal-service/Cargo.toml @@ -12,10 +12,14 @@ failure = "0.1.5" async-trait = "0.1.30" url = "2.1.1" base64 = "0.12" +bytes = "0.5" +futures = "0.3" +pin-project = "0.4" thiserror = "1.0" serde = {version = "1.0", features=["derive"]} prost = "0.6" http = "0.2.1" +log = "0.4.8" [dev-dependencies] structopt = "0.2.17" diff --git a/libsignal-service/src/lib.rs b/libsignal-service/src/lib.rs index 70b285778..3216a4fc4 100644 --- a/libsignal-service/src/lib.rs +++ b/libsignal-service/src/lib.rs @@ -27,6 +27,8 @@ pub struct ServiceAddress { pub mod prelude { pub use crate::{ configuration::{Credentials, ServiceConfiguration}, + envelope::Envelope, + push_service::ServiceError, receiver::MessageReceiver, }; } diff --git a/libsignal-service/src/messagepipe.rs b/libsignal-service/src/messagepipe.rs index 9bbe67f03..0cc308352 100644 --- a/libsignal-service/src/messagepipe.rs +++ b/libsignal-service/src/messagepipe.rs @@ -1,7 +1,125 @@ -pub struct MessagePipe { +use bytes::{Bytes, BytesMut}; +use futures::{ + channel::mpsc::{self, Sender}, + prelude::*, +}; +use pin_project::pin_project; +use prost::Message; + +pub use crate::{ + proto::{ + web_socket_message, WebSocketMessage, WebSocketRequestMessage, + WebSocketResponseMessage, + }, + push_service::ServiceError, +}; + +#[async_trait::async_trait(?Send)] +pub trait WebSocketService { + type Stream: Stream + Unpin; + + async fn send_message(&mut self, msg: Bytes) -> Result<(), ServiceError>; +} + +#[pin_project] +pub struct MessagePipe { ws: WS, + #[pin] + stream: WS::Stream, +} + +impl MessagePipe { + pub fn from_socket(ws: WS, stream: WS::Stream) -> Self { + MessagePipe { ws, stream } + } + + async fn send_response( + &mut self, + r: WebSocketResponseMessage, + ) -> Result<(), ServiceError> { + let msg = WebSocketMessage { + r#type: Some(web_socket_message::Type::Response.into()), + response: Some(r), + ..Default::default() + }; + let mut buffer = BytesMut::with_capacity(msg.encoded_len()); + msg.encode(&mut buffer).unwrap(); + self.ws.send_message(buffer.into()).await + } + + /// Worker task that + async fn run( + mut self, + mut sink: Sender>, + ) -> Result<(), mpsc::SendError> { + while let Some(frame) = self.stream.next().await { + // WebsocketConnection::onMessage(ByteString) + let msg = match WebSocketMessage::decode(frame) { + Ok(msg) => msg, + Err(e) => { + sink.send(Err(e.into())).await?; + continue; + }, + }; + + log::trace!("Decoded {:?}", msg); + + use web_socket_message::Type; + match (msg.r#type(), msg.request) { + (Type::Unknown, _) => { + sink.send(Err(ServiceError::InvalidFrameError { + reason: "Unknown frame type".into(), + })) + .await?; + }, + (Type::Request, Some(request)) => { + // Java: MessagePipe::read + let response = + WebSocketResponseMessage::from_request(&request); + if let Err(e) = self.send_response(response).await { + sink.send(Err(e)).await?; + } + }, + (Type::Request, None) => { + sink.send(Err(ServiceError::InvalidFrameError { + reason: + "Type was request, but does not contain request." + .into(), + })) + .await?; + }, + (Type::Response, _) => {}, + } + } + Ok(()) + } + + /// Returns the stream of `Envelope`s + pub fn stream( + self, + ) -> impl Stream> + { + let (sink, stream) = mpsc::channel(1); + + let stream = stream.map(Some); + let runner = self.run(sink).map(|_| { + log::info!("Sink was closed."); + None + }); + + let combined = futures::stream::select(stream, runner.into_stream()); + combined.filter_map(|x| async { x }) + } } -impl MessagePipe { - pub fn from_socket(ws: WS) -> Self { MessagePipe { ws } } +/// WebSocketService that panics on every request, mainly for example code. +pub struct PanicingWebSocketService; + +#[async_trait::async_trait(?Send)] +impl WebSocketService for PanicingWebSocketService { + type Stream = futures::channel::mpsc::Receiver; + + async fn send_message(&mut self, _msg: Bytes) -> Result<(), ServiceError> { + unimplemented!(); + } } diff --git a/libsignal-service/src/push_service.rs b/libsignal-service/src/push_service.rs index 3fa5aa8cb..61417fab3 100644 --- a/libsignal-service/src/push_service.rs +++ b/libsignal-service/src/push_service.rs @@ -1,6 +1,7 @@ use crate::{ configuration::{Credentials, ServiceConfiguration}, envelope::*, + messagepipe::WebSocketService, }; use http::StatusCode; @@ -64,6 +65,17 @@ pub enum ServiceError { Unauthorized, #[error("Unexpected response: HTTP {http_code}")] UnhandledResponseCode { http_code: u16 }, + + #[error("Websocket error: {reason}")] + WsError { reason: String }, + #[error("Websocket closing: {reason}")] + WsClosing { reason: String }, + + #[error("Undecodable frame")] + DecodeError(#[from] prost::DecodeError), + + #[error("Invalid frame: {reason}")] + InvalidFrameError { reason: String }, } impl ServiceError { @@ -88,7 +100,7 @@ impl ServiceError { #[async_trait::async_trait(?Send)] pub trait PushService { - type WebSocket; + type WebSocket: WebSocketService; async fn get(&mut self, path: &str) -> Result where @@ -108,7 +120,15 @@ pub trait PushService { Ok(entity_list.messages) } - async fn ws(&mut self) -> Result; + async fn ws( + &mut self, + ) -> Result< + ( + Self::WebSocket, + ::Stream, + ), + ServiceError, + >; } /// PushService that panics on every request, mainly for example code. @@ -128,7 +148,7 @@ impl PanicingPushService { #[async_trait::async_trait(?Send)] impl PushService for PanicingPushService { - type WebSocket = (); + type WebSocket = crate::messagepipe::PanicingWebSocketService; async fn get(&mut self, _path: &str) -> Result where @@ -137,7 +157,15 @@ impl PushService for PanicingPushService { unimplemented!() } - async fn ws(&mut self) -> Result { + async fn ws( + &mut self, + ) -> Result< + ( + Self::WebSocket, + ::Stream, + ), + ServiceError, + > { unimplemented!() } } diff --git a/libsignal-service/src/receiver.rs b/libsignal-service/src/receiver.rs index a453b94b4..af9b35b0b 100644 --- a/libsignal-service/src/receiver.rs +++ b/libsignal-service/src/receiver.rs @@ -31,6 +31,7 @@ impl MessageReceiver { pub async fn create_message_pipe( &mut self, ) -> Result, MessageReceiverError> { - Ok(MessagePipe::from_socket(self.service.ws().await?)) + let (ws, stream) = self.service.ws().await?; + Ok(MessagePipe::from_socket(ws, stream)) } } From 75fbcaa29e89e3bb31afa608bf5c1249d5e07856 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Thu, 9 Jul 2020 18:56:16 +0200 Subject: [PATCH 17/23] Add is_signal_key_encrypted, is_signal_service_envelope. --- libsignal-service/src/proto.rs | 39 ++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/libsignal-service/src/proto.rs b/libsignal-service/src/proto.rs index af7dd6d2f..6a612fc17 100644 --- a/libsignal-service/src/proto.rs +++ b/libsignal-service/src/proto.rs @@ -1 +1,40 @@ include!(concat!(env!("OUT_DIR"), "/signalservice.rs")); + +use std::ops::Deref; + +impl WebSocketRequestMessage { + /// Equivalent of + /// `SignalServiceMessagePipe::isSignalServiceEnvelope(WebSocketMessage)`. + pub fn is_signal_service_envelope(&self) -> bool { + self.verb.as_ref().map(Deref::deref) == Some("PUT") + && self.path.as_ref().map(Deref::deref) == Some("/api/v1/message") + } + + /// Equivalent of + /// `SignalServiceMessagePipe::isSignalKeyEncrypted(WebSocketMessage)`. + pub fn is_signal_key_encrypted(&self) -> bool { + if self.headers.len() == 0 { + return true; + } + + for header in &self.headers { + let parts: Vec<_> = header.split(':').collect(); + if parts.len() != 2 { + log::warn!( + "Got a weird header: {:?}, split in {:?}", + header, + parts + ); + continue; + } + + if parts[0].trim().eq_ignore_ascii_case("X-Signal-Key") { + if parts[1].trim().eq_ignore_ascii_case("false") { + return false; + } + } + } + + false + } +} From f1854f8e172ee748234bfcc7b9af13a00a8ede40 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Thu, 9 Jul 2020 19:25:00 +0200 Subject: [PATCH 18/23] Add ResponseMessage::from_request. --- libsignal-service/src/proto.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/libsignal-service/src/proto.rs b/libsignal-service/src/proto.rs index 6a612fc17..410bdabad 100644 --- a/libsignal-service/src/proto.rs +++ b/libsignal-service/src/proto.rs @@ -38,3 +38,25 @@ impl WebSocketRequestMessage { false } } + +impl WebSocketResponseMessage { + /// Equivalent of + /// `SignalServiceMessagePipe::isSignalServiceEnvelope(WebSocketMessage)`. + pub fn from_request(msg: &WebSocketRequestMessage) -> Self { + if msg.is_signal_service_envelope() { + WebSocketResponseMessage { + id: msg.id, + status: Some(200), + message: Some("OK".to_string()), + ..Default::default() + } + } else { + WebSocketResponseMessage { + id: msg.id, + status: Some(400), + message: Some("Unknown".to_string()), + ..Default::default() + } + } + } +} From da384c5a56c10cc4337b6fc942443427582ce406 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Fri, 10 Jul 2020 17:43:34 +0200 Subject: [PATCH 19/23] Accept Credentials in ws() call --- libsignal-service-actix/src/push_service.rs | 5 ++--- libsignal-service/src/messagepipe.rs | 14 ++++++++++++-- libsignal-service/src/push_service.rs | 2 ++ 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/libsignal-service-actix/src/push_service.rs b/libsignal-service-actix/src/push_service.rs index d2dc6d659..cfabb9b75 100644 --- a/libsignal-service-actix/src/push_service.rs +++ b/libsignal-service-actix/src/push_service.rs @@ -14,7 +14,6 @@ pub struct AwcPushService { cfg: ServiceConfiguration, base_url: Url, client: awc::Client, - credentials: Credentials, } #[async_trait::async_trait(?Send)] @@ -73,6 +72,7 @@ impl PushService for AwcPushService { async fn ws( &mut self, + credentials: Credentials, ) -> Result< ( Self::WebSocket, @@ -83,7 +83,7 @@ impl PushService for AwcPushService { Ok(AwcWebSocket::with_client( &mut self.client, &self.base_url, - Some(&self.credentials), + Some(&credentials), ) .await?) } @@ -129,7 +129,6 @@ impl AwcPushService { cfg, base_url, client, - credentials, } } } diff --git a/libsignal-service/src/messagepipe.rs b/libsignal-service/src/messagepipe.rs index 0cc308352..56911dce5 100644 --- a/libsignal-service/src/messagepipe.rs +++ b/libsignal-service/src/messagepipe.rs @@ -7,6 +7,7 @@ use pin_project::pin_project; use prost::Message; pub use crate::{ + configuration::Credentials, proto::{ web_socket_message, WebSocketMessage, WebSocketRequestMessage, WebSocketResponseMessage, @@ -26,11 +27,20 @@ pub struct MessagePipe { ws: WS, #[pin] stream: WS::Stream, + credentials: Credentials, } impl MessagePipe { - pub fn from_socket(ws: WS, stream: WS::Stream) -> Self { - MessagePipe { ws, stream } + pub fn from_socket( + ws: WS, + stream: WS::Stream, + credentials: Credentials, + ) -> Self { + MessagePipe { + ws, + stream, + credentials, + } } async fn send_response( diff --git a/libsignal-service/src/push_service.rs b/libsignal-service/src/push_service.rs index 61417fab3..21704d02b 100644 --- a/libsignal-service/src/push_service.rs +++ b/libsignal-service/src/push_service.rs @@ -122,6 +122,7 @@ pub trait PushService { async fn ws( &mut self, + credentials: Credentials, ) -> Result< ( Self::WebSocket, @@ -159,6 +160,7 @@ impl PushService for PanicingPushService { async fn ws( &mut self, + _credentials: Credentials, ) -> Result< ( Self::WebSocket, From 9167ee630081de31fd9297f22a81b655b4f13761 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Fri, 10 Jul 2020 23:31:47 +0200 Subject: [PATCH 20/23] Implement decryption of websocket data with signaling key. --- libsignal-service/Cargo.toml | 5 ++ libsignal-service/examples/registering.rs | 14 +++++ libsignal-service/src/configuration.rs | 4 ++ libsignal-service/src/envelope.rs | 76 ++++++++++++++++++++--- libsignal-service/src/messagepipe.rs | 30 ++++++--- libsignal-service/src/push_service.rs | 3 + libsignal-service/src/receiver.rs | 10 ++- 7 files changed, 122 insertions(+), 20 deletions(-) diff --git a/libsignal-service/Cargo.toml b/libsignal-service/Cargo.toml index ea44a77ac..300010229 100644 --- a/libsignal-service/Cargo.toml +++ b/libsignal-service/Cargo.toml @@ -21,6 +21,11 @@ prost = "0.6" http = "0.2.1" log = "0.4.8" +sha2 = "0.9.0" +hmac = "0.8.0" +aes = "0.4.0" +block-modes = "0.5.0" + [dev-dependencies] structopt = "0.2.17" tokio = { version = "0.2", features=["macros"] } diff --git a/libsignal-service/examples/registering.rs b/libsignal-service/examples/registering.rs index 24414d1d0..92f4c6b37 100644 --- a/libsignal-service/examples/registering.rs +++ b/libsignal-service/examples/registering.rs @@ -34,10 +34,19 @@ async fn main() -> Result<(), Error> { let password = args.get_password()?; let config = ServiceConfiguration::default(); + + let mut signaling_key = [0u8; 52]; + base64::decode_config_slice( + args.signaling_key, + base64::STANDARD, + &mut signaling_key, + ) + .unwrap(); let credentials = Credentials { uuid: None, e164: args.username, password: Some(password), + signaling_key, }; let service = PanicingPushService::new( @@ -80,6 +89,11 @@ pub struct Args { raw(default_value = "libsignal_service::USER_AGENT") )] pub user_agent: String, + #[structopt( + long = "signaling-key", + help = "The key used to encrypt and authenticate messages in transit, base64 encoded." + )] + pub signaling_key: String, } impl Args { diff --git a/libsignal-service/src/configuration.rs b/libsignal-service/src/configuration.rs index f51012038..52814a56a 100644 --- a/libsignal-service/src/configuration.rs +++ b/libsignal-service/src/configuration.rs @@ -1,3 +1,5 @@ +use crate::envelope::{CIPHER_KEY_SIZE, MAC_KEY_SIZE}; + #[derive(Clone, Default)] pub struct ServiceConfiguration { pub service_urls: Vec, @@ -10,6 +12,8 @@ pub struct Credentials { pub uuid: Option, pub e164: String, pub password: Option, + + pub signaling_key: [u8; CIPHER_KEY_SIZE + MAC_KEY_SIZE], } impl Credentials { diff --git a/libsignal-service/src/envelope.rs b/libsignal-service/src/envelope.rs index 76fb9f4f4..f2a8fb4ad 100644 --- a/libsignal-service/src/envelope.rs +++ b/libsignal-service/src/envelope.rs @@ -1,6 +1,10 @@ #![allow(dead_code)] // XXX: remove when all constants on bottom are used. -use crate::{utils::serde_optional_base64, ServiceAddress}; +use prost::Message; + +use crate::{ + push_service::ServiceError, utils::serde_optional_base64, ServiceAddress, +}; pub use crate::proto::Envelope; @@ -22,6 +26,58 @@ impl From for Envelope { } impl Envelope { + pub fn decrypt( + input: &[u8], + signaling_key: &[u8; CIPHER_KEY_SIZE + MAC_KEY_SIZE], + is_signaling_key_encrypted: bool, + ) -> Result { + if !is_signaling_key_encrypted { + Ok(Envelope::decode(input)?) + } else { + if input.len() < VERSION_LENGTH + || input[VERSION_OFFSET] != SUPPORTED_VERSION + { + return Err(ServiceError::InvalidFrameError { + reason: "Unsupported signaling cryptogram version".into(), + }); + } + + let aes_key = &signaling_key[..CIPHER_KEY_SIZE]; + let mac_key = &signaling_key[CIPHER_KEY_SIZE..]; + let mac = &input[(input.len() - MAC_SIZE)..]; + let input_for_mac = &input[..(input.len() - MAC_SIZE)]; + let iv = &input[IV_OFFSET..(IV_OFFSET + IV_LENGTH)]; + debug_assert_eq!(mac_key.len(), MAC_KEY_SIZE); + debug_assert_eq!(aes_key.len(), CIPHER_KEY_SIZE); + debug_assert_eq!(iv.len(), IV_LENGTH); + + // Verify MAC + use hmac::{Hmac, Mac, NewMac}; + use sha2::Sha256; + let mut verifier = Hmac::::new_varkey(mac_key) + .expect("Hmac can take any size key"); + verifier.update(input_for_mac); + // XXX: possible timing attack, but we need the bytes for a + // truncated view... + let our_mac = verifier.finalize().into_bytes(); + if &our_mac[..MAC_SIZE] != mac { + return Err(ServiceError::MacError); + } + + use aes::Aes256; + // libsignal-service-java uses Pkcs5, + // but that should not matter. + // https://crypto.stackexchange.com/questions/9043/what-is-the-difference-between-pkcs5-padding-and-pkcs7-padding + use block_modes::{block_padding::Pkcs7, BlockMode, Cbc}; + let cipher = Cbc::::new_var(&aes_key, iv) + .expect("initalization of CBC/AES/PKCS7"); + let input = &input[CIPHERTEXT_OFFSET..(input.len() - MAC_SIZE)]; + let input = cipher.decrypt_vec(input).expect("decryption"); + + Ok(Envelope::decode(&input as &[u8])?) + } + } + fn new_from_entity(entity: EnvelopeEntity) -> Self { Envelope { r#type: Some(entity.r#type), @@ -71,13 +127,13 @@ pub(crate) struct EnvelopeEntityList { pub messages: Vec, } -const SUPPORTED_VERSION: usize = 1; -const CIPHER_KEY_SIZE: usize = 32; -const MAC_KEY_SIZE: usize = 20; -const MAC_SIZE: usize = 10; +pub(crate) const SUPPORTED_VERSION: u8 = 1; +pub(crate) const CIPHER_KEY_SIZE: usize = 32; +pub(crate) const MAC_KEY_SIZE: usize = 20; +pub(crate) const MAC_SIZE: usize = 10; -const VERSION_OFFSET: usize = 0; -const VERSION_LENGTH: usize = 1; -const IV_OFFSET: usize = VERSION_OFFSET + VERSION_LENGTH; -const IV_LENGTH: usize = 16; -const CIPHERTEXT_OFFSET: usize = IV_OFFSET + IV_LENGTH; +pub(crate) const VERSION_OFFSET: usize = 0; +pub(crate) const VERSION_LENGTH: usize = 1; +pub(crate) const IV_OFFSET: usize = VERSION_OFFSET + VERSION_LENGTH; +pub(crate) const IV_LENGTH: usize = 16; +pub(crate) const CIPHERTEXT_OFFSET: usize = IV_OFFSET + IV_LENGTH; diff --git a/libsignal-service/src/messagepipe.rs b/libsignal-service/src/messagepipe.rs index 56911dce5..a81dfb7bd 100644 --- a/libsignal-service/src/messagepipe.rs +++ b/libsignal-service/src/messagepipe.rs @@ -9,8 +9,8 @@ use prost::Message; pub use crate::{ configuration::Credentials, proto::{ - web_socket_message, WebSocketMessage, WebSocketRequestMessage, - WebSocketResponseMessage, + web_socket_message, Envelope, WebSocketMessage, + WebSocketRequestMessage, WebSocketResponseMessage, }, push_service::ServiceError, }; @@ -60,7 +60,7 @@ impl MessagePipe { /// Worker task that async fn run( mut self, - mut sink: Sender>, + mut sink: Sender>, ) -> Result<(), mpsc::SendError> { while let Some(frame) = self.stream.next().await { // WebsocketConnection::onMessage(ByteString) @@ -86,6 +86,25 @@ impl MessagePipe { // Java: MessagePipe::read let response = WebSocketResponseMessage::from_request(&request); + + if request.is_signal_service_envelope() { + let body = if let Some(body) = request.body.as_ref() { + body + } else { + sink.send(Err(ServiceError::InvalidFrameError { + reason: "Request without body.".into(), + })) + .await?; + continue; + }; + let envelope = Envelope::decrypt( + body, + &self.credentials.signaling_key, + request.is_signal_key_encrypted(), + ); + sink.send(envelope.map_err(Into::into)).await?; + } + if let Err(e) = self.send_response(response).await { sink.send(Err(e)).await?; } @@ -105,10 +124,7 @@ impl MessagePipe { } /// Returns the stream of `Envelope`s - pub fn stream( - self, - ) -> impl Stream> - { + pub fn stream(self) -> impl Stream> { let (sink, stream) = mpsc::channel(1); let stream = stream.map(Some); diff --git a/libsignal-service/src/push_service.rs b/libsignal-service/src/push_service.rs index 21704d02b..acee856c0 100644 --- a/libsignal-service/src/push_service.rs +++ b/libsignal-service/src/push_service.rs @@ -76,6 +76,9 @@ pub enum ServiceError { #[error("Invalid frame: {reason}")] InvalidFrameError { reason: String }, + + #[error("MAC error")] + MacError, } impl ServiceError { diff --git a/libsignal-service/src/receiver.rs b/libsignal-service/src/receiver.rs index af9b35b0b..7c31bc9db 100644 --- a/libsignal-service/src/receiver.rs +++ b/libsignal-service/src/receiver.rs @@ -1,4 +1,7 @@ -use crate::{envelope::Envelope, messagepipe::MessagePipe, push_service::*}; +use crate::{ + configuration::Credentials, envelope::Envelope, messagepipe::MessagePipe, + push_service::*, +}; /// Equivalent of Java's `SignalServiceMessageReceiver`. pub struct MessageReceiver { @@ -30,8 +33,9 @@ impl MessageReceiver { pub async fn create_message_pipe( &mut self, + credentials: Credentials, ) -> Result, MessageReceiverError> { - let (ws, stream) = self.service.ws().await?; - Ok(MessagePipe::from_socket(ws, stream)) + let (ws, stream) = self.service.ws(credentials.clone()).await?; + Ok(MessagePipe::from_socket(ws, stream, credentials)) } } From 941bb7f9c30bbcfa145fd34873460bafcae7986d Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Mon, 13 Jul 2020 12:48:12 +0200 Subject: [PATCH 21/23] Add decryption test for Envelope::decrypt. --- libsignal-service/src/envelope.rs | 35 +++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/libsignal-service/src/envelope.rs b/libsignal-service/src/envelope.rs index f2a8fb4ad..3efb7ba6b 100644 --- a/libsignal-service/src/envelope.rs +++ b/libsignal-service/src/envelope.rs @@ -137,3 +137,38 @@ pub(crate) const VERSION_LENGTH: usize = 1; pub(crate) const IV_OFFSET: usize = VERSION_OFFSET + VERSION_LENGTH; pub(crate) const IV_LENGTH: usize = 16; pub(crate) const CIPHERTEXT_OFFSET: usize = IV_OFFSET + IV_LENGTH; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn decrypt_envelope() { + // This is a real message, reencrypted with the zero-key. + let body = [ + 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 79, 32, 12, 100, + 26, 157, 130, 210, 254, 174, 87, 45, 238, 126, 68, 39, 188, 171, + 156, 16, 10, 138, 233, 73, 202, 52, 125, 102, 121, 182, 71, 148, 8, + 3, 134, 149, 154, 67, 116, 40, 146, 253, 242, 196, 139, 203, 14, + 174, 254, 78, 27, 47, 108, 60, 202, 60, 42, 210, 242, 58, 13, 185, + 67, 147, 166, 191, 71, 164, 128, 81, 177, 199, 147, 252, 162, 229, + 143, 98, 141, 222, 46, 83, 109, 82, 196, 109, 161, 40, 108, 207, + 82, 53, 162, 205, 171, 33, 140, 5, 74, 76, 150, 22, 122, 176, 189, + 228, 176, 234, 176, 13, 118, 181, 134, 35, 133, 164, 160, 205, 176, + 32, 188, 185, 166, 73, 24, 164, 20, 187, 2, 226, 186, 238, 98, 57, + 51, 76, 156, 83, 113, 72, 184, 50, 220, 49, 138, 46, 36, 4, 49, + 215, 66, 173, 58, 139, 187, 6, 252, 97, 191, 69, 246, 82, 48, 177, + 11, 149, 168, 93, 15, 170, 125, 131, 101, 103, 253, 177, 165, 71, + 85, 219, 207, 106, 12, 58, 47, 159, 33, 243, 107, 6, 117, 141, 209, + 115, 207, 19, 236, 137, 195, 230, 167, 225, 172, 99, 204, 113, 125, + 69, 125, 97, 252, 90, 248, 198, 175, 240, 187, 246, 164, 220, 102, + 7, 224, 124, 28, 170, 6, 4, 137, 155, 233, 85, 125, 93, 119, 97, + 183, 114, 193, 10, 184, 191, 202, 109, 97, 116, 194, 152, 40, 46, + 202, 49, 195, 138, 14, 2, 255, 44, 107, 160, 45, 150, 6, 78, 145, + 99, + ]; + + let signaling_key = [0u8; 52]; + let _ = Envelope::decrypt(&body, &signaling_key, true).unwrap(); + } +} From 69671c433866e65e35769eb2e6ad938d62199f0f Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Mon, 13 Jul 2020 13:29:25 +0200 Subject: [PATCH 22/23] Doc comment about acknowledged messages. --- libsignal-service/src/messagepipe.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/libsignal-service/src/messagepipe.rs b/libsignal-service/src/messagepipe.rs index a81dfb7bd..46303ce2e 100644 --- a/libsignal-service/src/messagepipe.rs +++ b/libsignal-service/src/messagepipe.rs @@ -124,6 +124,8 @@ impl MessagePipe { } /// Returns the stream of `Envelope`s + /// + /// Envelopes yielded are acknowledged. pub fn stream(self) -> impl Stream> { let (sink, stream) = mpsc::channel(1); From 523b0cad146e08896f569144d0cd556e47ae9c9e Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Mon, 13 Jul 2020 13:44:12 +0200 Subject: [PATCH 23/23] Push structop version --- libsignal-service/Cargo.toml | 2 +- libsignal-service/examples/registering.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libsignal-service/Cargo.toml b/libsignal-service/Cargo.toml index 300010229..9bd7e04e7 100644 --- a/libsignal-service/Cargo.toml +++ b/libsignal-service/Cargo.toml @@ -27,7 +27,7 @@ aes = "0.4.0" block-modes = "0.5.0" [dev-dependencies] -structopt = "0.2.17" +structopt = "0.3.0" tokio = { version = "0.2", features=["macros"] } [build-dependencies] diff --git a/libsignal-service/examples/registering.rs b/libsignal-service/examples/registering.rs index 92f4c6b37..e52d8137c 100644 --- a/libsignal-service/examples/registering.rs +++ b/libsignal-service/examples/registering.rs @@ -86,7 +86,7 @@ pub struct Args { #[structopt( long = "user-agent", help = "The user agent to use when contacting servers", - raw(default_value = "libsignal_service::USER_AGENT") + default_value = "libsignal_service::USER_AGENT" )] pub user_agent: String, #[structopt(