Skip to content

Commit

Permalink
Complete interface for fetching messages.
Browse files Browse the repository at this point in the history
Halfy verified working with Whisperfish.
  • Loading branch information
rubdos committed Jun 20, 2020
1 parent 32852b1 commit 4436476
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 19 deletions.
3 changes: 3 additions & 0 deletions libsignal-service-actix/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ libsignal-protocol = { git = "https://github.com/Michael-F-Bryan/libsignal-proto
awc = { version = "2.0.0-alpha.2", features=["rustls"] }
actix-rt = "1.1"
rustls = "0.17"
url = "2.1"
serde = "1.0"
log = "0.4.8"

failure = "0.1.5"
thiserror = "1.0"
Expand Down
4 changes: 4 additions & 0 deletions libsignal-service-actix/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
pub mod push_service;

pub mod prelude {
pub use crate::push_service::*;
}
77 changes: 71 additions & 6 deletions libsignal-service-actix/src/push_service.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,89 @@
use std::{sync::Arc, time::Duration};

use awc::Connector;
use libsignal_service::{configuration::*, push_service::*};
use serde::Deserialize;
use url::Url;

pub struct AwcPushService {
cfg: ServiceConfiguration,
base_url: Url,
client: awc::Client,
}

#[async_trait::async_trait(?Send)]
impl PushService for AwcPushService {
async fn get(&mut self, _path: &str) -> Result<(), ServiceError> { Ok(()) }
async fn get<T>(&mut self, path: &str) -> Result<T, ServiceError>
where
for<'de> T: Deserialize<'de>,
{
// In principle, we should be using http::uri::Uri,
// but that doesn't seem like an owned type where we can do this kind of
// constructions on.
// https://docs.rs/http/0.2.1/http/uri/struct.Uri.html
let url = self.base_url.join(path).expect("valid url");

log::debug!("AwcPushService::get({:?})", url);
let mut response =
self.client.get(url.as_str()).send().await.map_err(|e| {
ServiceError::SendError {
reason: e.to_string(),
}
})?;

log::debug!("AwcPushService::get response: {:?}", response);

ServiceError::from_status(response.status())?;

response
.json()
.await
.map_err(|e| ServiceError::JsonDecodeError {
reason: e.to_string(),
})
}
}

impl AwcPushService {
/// Creates a new AwcPushService
///
/// Panics on invalid service url.
pub fn new(
_cfg: ServiceConfiguration,
_credentials: Credentials,
cfg: ServiceConfiguration,
credentials: Credentials,
user_agent: &str,
root_ca: &str,
) -> Self {
let base_url =
Url::parse(&cfg.service_urls[0]).expect("valid service url");

// SSL setup
let mut ssl_config = rustls::ClientConfig::new();
ssl_config.alpn_protocols = vec![b"http/1.1".to_vec()];
ssl_config
.root_store
.add_pem_file(&mut std::io::Cursor::new(root_ca))
.unwrap();
let connector = Connector::new()
.rustls(Arc::new(ssl_config))
.timeout(Duration::from_secs(10)) // https://github.com/actix/actix-web/issues/1047
.finish();
let client = awc::ClientBuilder::new()
.connector(connector)
.header("X-Signal-Agent", user_agent)
.timeout(Duration::from_secs(65)); // as in Signal-Android

let client = if let Some((ident, pass)) = credentials.authorization() {
client.basic_auth(ident, Some(pass))
} else {
client
};
let client = client.finish();

Self {
client: awc::ClientBuilder::new()
.header("X-Signal-Agent", user_agent)
.finish(),
cfg,
base_url,
client,
}
}
}
2 changes: 1 addition & 1 deletion libsignal-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ async-trait = "0.1.30"
url = "2.1.1"
thiserror = "1.0"
serde = {version = "1.0", features=["derive"]}
serde_json = "1.0"
prost = "0.6"
http = "0.2.1"

[dev-dependencies]
structopt = "0.2.17"
Expand Down
4 changes: 2 additions & 2 deletions libsignal-service/examples/registering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ async fn main() -> Result<(), Error> {

let config = ServiceConfiguration::default();
let credentials = Credentials {
uuid: String::new(),
uuid: None,
e164: args.username,
password,
password: Some(password),
};

let service = PanicingPushService::new(
Expand Down
18 changes: 16 additions & 2 deletions libsignal-service/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,21 @@ pub struct ServiceConfiguration {
}

pub struct Credentials {
pub uuid: String,
pub uuid: Option<String>,
pub e164: String,
pub password: String,
pub password: Option<String>,
}

impl Credentials {
/// Kind-of equivalent with `PushServiceSocket::getAuthorizationHeader`
///
/// None when `self.password == None`
pub fn authorization(&self) -> Option<(&str, &str)> {
let identifier: &str = if let Some(uuid) = self.uuid.as_ref() {
uuid
} else {
&self.e164
};
Some((identifier, self.password.as_ref()?))
}
}
2 changes: 1 addition & 1 deletion libsignal-service/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub struct Envelope {

#[derive(serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct EnvelopeEntity {
pub struct EnvelopeEntity {
pub r#type: i32,
pub relay: String,
pub timestamp: i64,
Expand Down
7 changes: 7 additions & 0 deletions libsignal-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,10 @@ pub const USER_AGENT: &'static str =
concat!(env!("CARGO_PKG_NAME"), "-rs-", env!("CARGO_PKG_VERSION"));

pub struct TrustStore;

pub mod prelude {
pub use crate::{
configuration::{Credentials, ServiceConfiguration},
receiver::MessageReceiver,
};
}
59 changes: 54 additions & 5 deletions libsignal-service/src/push_service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use crate::configuration::{Credentials, ServiceConfiguration};
use crate::{
configuration::{Credentials, ServiceConfiguration},
envelope::*,
};

use http::StatusCode;
use serde::Deserialize;

pub const CREATE_ACCOUNT_SMS_PATH: &str = "/v1/accounts/sms/code/%s?client=%s";
pub const CREATE_ACCOUNT_VOICE_PATH: &str = "/v1/accounts/voice/code/%s";
Expand All @@ -23,7 +29,7 @@ pub const DIRECTORY_TOKENS_PATH: &str = "/v1/directory/tokens";
pub const DIRECTORY_VERIFY_PATH: &str = "/v1/directory/%s";
pub const DIRECTORY_AUTH_PATH: &str = "/v1/directory/auth";
pub const DIRECTORY_FEEDBACK_PATH: &str = "/v1/directory/feedback-v3/%s";
pub const MESSAGE_PATH: &str = "/v1/messages/%s";
pub const MESSAGE_PATH: &str = "/v1/messages/"; // optionally with destination appended
pub const SENDER_ACK_MESSAGE_PATH: &str = "/v1/messages/%s/%d";
pub const UUID_ACK_MESSAGE_PATH: &str = "/v1/messages/uuid/%s";
pub const ATTACHMENT_PATH: &str = "/v2/attachments/form/upload";
Expand All @@ -46,18 +52,58 @@ pub enum SmsVerificationCodeResponse {
}

#[derive(thiserror::Error, Debug)]
pub enum ServiceError {}
pub enum ServiceError {
#[error("Error sending request: {reason}")]
SendError { reason: String },
#[error("Error decoding JSON response: {reason}")]
JsonDecodeError { reason: String },

#[error("Rate limit exceeded")]
RateLimitExceeded,
#[error("Authorization failed")]
Unauthorized,
#[error("Unexpected response: HTTP {http_code}")]
UnhandledResponseCode { http_code: u16 },
}

impl ServiceError {
pub fn from_status(code: http::StatusCode) -> Result<(), Self> {
match code {
StatusCode::OK => Ok(()),
StatusCode::NO_CONTENT => Ok(()),
StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
Err(ServiceError::Unauthorized)
},
StatusCode::PAYLOAD_TOO_LARGE => {
// This is 413 and means rate limit exceeded for Signal.
Err(ServiceError::RateLimitExceeded)
},
// XXX: fill in rest from PushServiceSocket
_ => Err(ServiceError::UnhandledResponseCode {
http_code: code.as_u16(),
}),
}
}
}

#[async_trait::async_trait(?Send)]
pub trait PushService {
async fn get(&mut self, path: &str) -> Result<(), ServiceError>;
async fn get<T>(&mut self, path: &str) -> Result<T, ServiceError>
where
for<'de> T: Deserialize<'de>;

async fn request_sms_verification_code(
&mut self,
) -> Result<SmsVerificationCodeResponse, ServiceError> {
self.get(CREATE_ACCOUNT_SMS_PATH).await?;
Ok(SmsVerificationCodeResponse::SmsSent)
}

async fn get_messages(
&mut self,
) -> Result<Vec<EnvelopeEntity>, ServiceError> {
Ok(self.get(MESSAGE_PATH).await?)
}
}

/// PushService that panics on every request, mainly for example code.
Expand All @@ -77,7 +123,10 @@ impl PanicingPushService {

#[async_trait::async_trait(?Send)]
impl PushService for PanicingPushService {
async fn get(&mut self, path: &str) -> Result<(), ServiceError> {
async fn get<T>(&mut self, _path: &str) -> Result<T, ServiceError>
where
for<'de> T: Deserialize<'de>,
{
unimplemented!()
}
}
17 changes: 15 additions & 2 deletions libsignal-service/src/receiver.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{configuration::*, envelope::Envelope, push_service::PushService};
use crate::{configuration::*, envelope::Envelope, push_service::*};

use libsignal_protocol::StoreContext;

Expand All @@ -8,16 +8,29 @@ pub struct MessageReceiver<Service> {
context: StoreContext,
}

#[derive(thiserror::Error, Debug)]
pub enum MessageReceiverError {
#[error("ServiceError")]
ServiceError(#[from] ServiceError),
}

impl<Service: PushService> MessageReceiver<Service> {
pub fn new(service: Service, context: StoreContext) -> Self {
MessageReceiver { service, context }
}

/// One-off method to receive all pending messages.
///
/// Equivalent with Java's `SignalServiceMessageReceiver::retrieveMessages`.
///
/// For streaming messages, use a `MessagePipe` through
/// [`MessageReceiver::create_message_pipe()`].
pub async fn receive_messages(&mut self) -> Vec<Envelope> { vec![] }
pub async fn retrieve_messages(
&mut self,
) -> Result<Vec<Envelope>, MessageReceiverError> {
let _entities = self.service.get_messages().await?;
Ok(vec![])
}

pub async fn create_message_pipe(&self) -> () { unimplemented!() }
}

0 comments on commit 4436476

Please sign in to comment.