From dbf0fd8ccb7f1019cc20c8974e3b8347cbcbde01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Tue, 15 Oct 2024 18:31:12 +0200 Subject: [PATCH] Re-flatten crate structure and get rid of PushService trait This is a first step to refactor PushService and split the push_service.rs file in many smaller parts. --- .github/workflows/ci.yaml | 16 +- Cargo.toml | 65 +- libsignal-service/build.rs => build.rs | 0 .../certs => certs}/production-root-ca.pem | 0 .../certs => certs}/staging-root-ca.pem | 0 .../examples => examples}/storage.rs | 0 libsignal-service-actix/Cargo.toml | 42 - .../examples/registering.rs | 222 ------ libsignal-service-actix/src/lib.rs | 9 - libsignal-service-actix/src/push_service.rs | 735 ------------------ libsignal-service-actix/src/websocket.rs | 212 ----- libsignal-service-hyper/Cargo.toml | 44 -- .../examples/registering.rs | 196 ----- libsignal-service-hyper/src/lib.rs | 9 - libsignal-service-hyper/src/push_service.rs | 670 ---------------- libsignal-service/.gitignore | 1 - libsignal-service/Cargo.toml | 48 -- .../protobuf => protobuf}/DeviceName.proto | 0 .../protobuf => protobuf}/Groups.proto | 0 .../protobuf => protobuf}/Provisioning.proto | 0 .../protobuf => protobuf}/SignalService.proto | 0 .../StickerResources.proto | 0 .../UnidentifiedDelivery.proto | 0 .../WebSocketResources.proto | 0 .../protobuf => protobuf}/update-protos.sh | 0 .../src => src}/account_manager.rs | 10 +- .../src => src}/attachment_cipher.rs | 0 {libsignal-service/src => src}/cipher.rs | 0 .../src => src}/configuration.rs | 0 {libsignal-service/src => src}/content.rs | 0 .../src => src}/content/data_message.rs | 0 .../src => src}/content/story_message.rs | 0 .../src => src}/digeststream.rs | 0 {libsignal-service/src => src}/envelope.rs | 0 .../src => src}/groups_v2/manager.rs | 8 +- .../src => src}/groups_v2/mod.rs | 0 .../src => src}/groups_v2/model.rs | 0 .../src => src}/groups_v2/operations.rs | 0 .../src => src}/groups_v2/utils.rs | 0 {libsignal-service/src => src}/kat.bin.rs | 0 {libsignal-service/src => src}/lib.rs | 19 - {libsignal-service/src => src}/master_key.rs | 0 {libsignal-service/src => src}/messagepipe.rs | 6 +- {libsignal-service/src => src}/models.rs | 0 {libsignal-service/src => src}/pre_keys.rs | 0 .../src => src}/profile_cipher.rs | 0 .../src => src}/profile_name.rs | 0 .../src => src}/profile_service.rs | 0 {libsignal-service/src => src}/proto.rs | 0 .../src => src}/provisioning/cipher.rs | 0 .../src => src}/provisioning/mod.rs | 3 +- .../src => src}/provisioning/pipe.rs | 0 .../src => src}/push_service.rs | 692 ++++++++++++++--- {libsignal-service/src => src}/receiver.rs | 8 +- {libsignal-service/src => src}/sender.rs | 9 +- .../src => src}/service_address.rs | 0 .../src => src}/session_store.rs | 0 .../src => src}/sticker_cipher.rs | 0 {libsignal-service/src => src}/timestamp.rs | 0 .../src => src}/unidentified_access.rs | 0 {libsignal-service/src => src}/utils.rs | 0 .../websocket/attachment_service.rs | 0 .../src/websocket.rs => src/websocket/mod.rs | 1 + .../src => src}/websocket/sender.rs | 0 .../websocket/tungstenite.rs | 45 +- 65 files changed, 686 insertions(+), 2384 deletions(-) rename libsignal-service/build.rs => build.rs (100%) rename {libsignal-service/certs => certs}/production-root-ca.pem (100%) rename {libsignal-service/certs => certs}/staging-root-ca.pem (100%) rename {libsignal-service/examples => examples}/storage.rs (100%) delete mode 100644 libsignal-service-actix/Cargo.toml delete mode 100644 libsignal-service-actix/examples/registering.rs delete mode 100644 libsignal-service-actix/src/lib.rs delete mode 100644 libsignal-service-actix/src/push_service.rs delete mode 100644 libsignal-service-actix/src/websocket.rs delete mode 100644 libsignal-service-hyper/Cargo.toml delete mode 100644 libsignal-service-hyper/examples/registering.rs delete mode 100644 libsignal-service-hyper/src/lib.rs delete mode 100644 libsignal-service-hyper/src/push_service.rs delete mode 100644 libsignal-service/.gitignore delete mode 100644 libsignal-service/Cargo.toml rename {libsignal-service/protobuf => protobuf}/DeviceName.proto (100%) rename {libsignal-service/protobuf => protobuf}/Groups.proto (100%) rename {libsignal-service/protobuf => protobuf}/Provisioning.proto (100%) rename {libsignal-service/protobuf => protobuf}/SignalService.proto (100%) rename {libsignal-service/protobuf => protobuf}/StickerResources.proto (100%) rename {libsignal-service/protobuf => protobuf}/UnidentifiedDelivery.proto (100%) rename {libsignal-service/protobuf => protobuf}/WebSocketResources.proto (100%) rename {libsignal-service/protobuf => protobuf}/update-protos.sh (100%) rename {libsignal-service/src => src}/account_manager.rs (99%) rename {libsignal-service/src => src}/attachment_cipher.rs (100%) rename {libsignal-service/src => src}/cipher.rs (100%) rename {libsignal-service/src => src}/configuration.rs (100%) rename {libsignal-service/src => src}/content.rs (100%) rename {libsignal-service/src => src}/content/data_message.rs (100%) rename {libsignal-service/src => src}/content/story_message.rs (100%) rename {libsignal-service/src => src}/digeststream.rs (100%) rename {libsignal-service/src => src}/envelope.rs (100%) rename {libsignal-service/src => src}/groups_v2/manager.rs (98%) rename {libsignal-service/src => src}/groups_v2/mod.rs (100%) rename {libsignal-service/src => src}/groups_v2/model.rs (100%) rename {libsignal-service/src => src}/groups_v2/operations.rs (100%) rename {libsignal-service/src => src}/groups_v2/utils.rs (100%) rename {libsignal-service/src => src}/kat.bin.rs (100%) rename {libsignal-service/src => src}/lib.rs (75%) rename {libsignal-service/src => src}/master_key.rs (100%) rename {libsignal-service/src => src}/messagepipe.rs (93%) rename {libsignal-service/src => src}/models.rs (100%) rename {libsignal-service/src => src}/pre_keys.rs (100%) rename {libsignal-service/src => src}/profile_cipher.rs (100%) rename {libsignal-service/src => src}/profile_name.rs (100%) rename {libsignal-service/src => src}/profile_service.rs (100%) rename {libsignal-service/src => src}/proto.rs (100%) rename {libsignal-service/src => src}/provisioning/cipher.rs (100%) rename {libsignal-service/src => src}/provisioning/mod.rs (99%) rename {libsignal-service/src => src}/provisioning/pipe.rs (100%) rename {libsignal-service/src => src}/push_service.rs (66%) rename {libsignal-service/src => src}/receiver.rs (96%) rename {libsignal-service/src => src}/sender.rs (99%) rename {libsignal-service/src => src}/service_address.rs (100%) rename {libsignal-service/src => src}/session_store.rs (100%) rename {libsignal-service/src => src}/sticker_cipher.rs (100%) rename {libsignal-service/src => src}/timestamp.rs (100%) rename {libsignal-service/src => src}/unidentified_access.rs (100%) rename {libsignal-service/src => src}/utils.rs (100%) rename {libsignal-service/src => src}/websocket/attachment_service.rs (100%) rename libsignal-service/src/websocket.rs => src/websocket/mod.rs (99%) rename {libsignal-service/src => src}/websocket/sender.rs (100%) rename libsignal-service-hyper/src/websocket.rs => src/websocket/tungstenite.rs (80%) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 6544b851f..fc15a0d24 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -22,12 +22,7 @@ jobs: strategy: fail-fast: false matrix: - project: ["libsignal-service-actix", "libsignal-service-hyper", "libsignal-service"] - features: ["", "unsend-futures"] - exclude: - # -actix always has unsend futures, so we don't have that feature flag - - project: "libsignal-service-actix" - features: "unsend-futures" + project: ["libsignal-service"] steps: - uses: actions/checkout@v3 - name: Install protobuf @@ -45,7 +40,7 @@ jobs: strategy: fail-fast: false matrix: - project: ["libsignal-service-actix", "libsignal-service-hyper", "libsignal-service"] + project: ["libsignal-service"] toolchain: ["stable", "beta", "nightly"] coverage: [false, true] features: ["", "unsend-futures"] @@ -60,17 +55,10 @@ jobs: # Feature flag related excludes # Actix like above - - project: "libsignal-service-actix" - features: "unsend-futures" - # We don't need to spawn this many jobs to see that unsend-futures works - features: "unsend-futures" toolchain: "beta" - features: "unsend-futures" toolchain: "nightly" - include: - - project: "libsignal-service-actix" - toolchain: "1.75" - coverage: false steps: - uses: actions/checkout@v3 - name: Install protobuf diff --git a/Cargo.toml b/Cargo.toml index ebd59bfb8..fc5f3e8de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,65 @@ -[workspace] -members = ["libsignal-service", "libsignal-service-actix", "libsignal-service-hyper"] -default-members = ["libsignal-service", "libsignal-service-hyper"] +[package] +name = "libsignal-service" +version = "0.1.0" +authors = ["Ruben De Smet ", "Gabriel Féron ", "Michael Bryan ", "Shady Khalifa "] +edition = "2021" +license = "AGPL-3.0" +readme = "../README.md" -resolver = "2" +[dependencies] +libsignal-protocol = { git = "https://github.com/signalapp/libsignal", tag = "v0.56.1" } +zkgroup = { git = "https://github.com/signalapp/libsignal", tag = "v0.56.1" } + +aes = "0.8" +aes-gcm = "0.10" +cbc = "0.1" +ctr = "0.9" +async-trait = "0.1" +base64 = "0.22" +bincode = "1.3" +bytes = "1" +chrono = { version = "0.4", features = ["serde", "clock"], default-features = false } +derivative = "2.2" +futures = "0.3" +hex = "0.4" +hkdf = "0.12" +hmac = "0.12" +phonenumber = "0.3" +prost = "0.13" +rand = "0.8" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0.85" +sha2 = "0.10" +thiserror = "1.0" +url = { version = "2.1", features = ["serde"] } +uuid = { version = "1", features = ["serde"] } + +# http +hyper = "1.0" +hyper-util = { version = "0.1", features = ["client", "client-legacy"] } +hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "ring", "logging"] } +hyper-timeout = "0.5" +headers = "0.4" +http-body-util = "0.1" +mpart-async = "0.7" +async-tungstenite = { version = "0.27", features = ["tokio-rustls-native-certs", "url"] } +tokio = { version = "1.0", features = ["macros"] } +tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] } + +rustls-pemfile = "2.0" + +tracing = { version = "0.1", features = ["log"] } +tracing-futures = "0.2" + +[build-dependencies] +prost-build = "0.13" + +[dev-dependencies] +anyhow = "1.0" +tokio = { version = "1.0", features = ["macros", "rt"] } + +[features] +unsend-futures = [] [patch.crates-io] curve25519-dalek = { git = 'https://github.com/signalapp/curve25519-dalek', tag = 'signal-curve25519-4.1.3' } diff --git a/libsignal-service/build.rs b/build.rs similarity index 100% rename from libsignal-service/build.rs rename to build.rs diff --git a/libsignal-service/certs/production-root-ca.pem b/certs/production-root-ca.pem similarity index 100% rename from libsignal-service/certs/production-root-ca.pem rename to certs/production-root-ca.pem diff --git a/libsignal-service/certs/staging-root-ca.pem b/certs/staging-root-ca.pem similarity index 100% rename from libsignal-service/certs/staging-root-ca.pem rename to certs/staging-root-ca.pem diff --git a/libsignal-service/examples/storage.rs b/examples/storage.rs similarity index 100% rename from libsignal-service/examples/storage.rs rename to examples/storage.rs diff --git a/libsignal-service-actix/Cargo.toml b/libsignal-service-actix/Cargo.toml deleted file mode 100644 index 350db0963..000000000 --- a/libsignal-service-actix/Cargo.toml +++ /dev/null @@ -1,42 +0,0 @@ -[package] -name = "libsignal-service-actix" -version = "0.1.0" -authors = ["Ruben De Smet "] -edition = "2021" -license = "AGPL-3.0" -rust-version = "1.70.0" - -[dependencies] -# Contrary to hyper, actix does not have Send compatible futures, which means -# the Send requirement in libsignal-service needs to be lifted by enabling `unsend-futures`. -libsignal-service = { path = "../libsignal-service", features = ["unsend-futures"] } - -awc = { version = "3.2.0", features = ["rustls-0_21"] } -actix = "0.13" -actix-http = "3.2.0" -actix-rt = "2.4" -mpart-async = "0.6" -serde_json = "1.0" -futures = "0.3" -tracing = "0.1" -tracing-futures = "0.2" -bytes = "1" -rustls = "0.21" -rustls-pemfile = "0.3" -url = "2.1" -serde = "1.0" -rand = "0.8" - -thiserror = "1.0" -async-trait = "0.1" - -phonenumber = "0.3" - -[dev-dependencies] -chrono = "0.4" -image = { version = "0.23", default-features = false, features = ["png"] } -opener = "0.5" -qrcode = "0.12" -structopt = "0.3" -tokio = { version = "1", features = ["macros"] } -anyhow = "1.0" diff --git a/libsignal-service-actix/examples/registering.rs b/libsignal-service-actix/examples/registering.rs deleted file mode 100644 index c9ce6076d..000000000 --- a/libsignal-service-actix/examples/registering.rs +++ /dev/null @@ -1,222 +0,0 @@ -//! At install time, clients need to register with the Signal server. -//! -//! ```java -//! private final String URL = "https://my.signal.server.com"; -//! private final TrustStore TRUST_STORE = new MyTrustStoreImpl(); -//! private final String USERNAME = "+14151231234"; -//! private final String PASSWORD = generateRandomPassword(); -//! private final String USER_AGENT = "[FILL_IN]"; -//! -//! SignalServiceAccountManager accountManager = new SignalServiceAccountManager(URL, TRUST_STORE, -//! USERNAME, PASSWORD, USER_AGENT); -//! -//! accountManager.requestSmsVerificationCode(); -//! accountManager.verifyAccountWithCode(receivedSmsVerificationCode, generateRandomSignalingKey(), -//! generateRandomInstallId(), false); -//! accountManager.setGcmId(Optional.of(GoogleCloudMessaging.getInstance(this).register(REGISTRATION_ID))); -//! accountManager.setPreKeys(identityKey.getPublicKey(), lastResortKey, signedPreKeyRecord, oneTimePreKeys); -//! ``` - -use anyhow::Error; -use libsignal_service::configuration::SignalServers; -use libsignal_service::prelude::{ProfileKey, ServiceCredentials}; -use libsignal_service::provisioning::generate_registration_id; -use libsignal_service::push_service::{ - AccountAttributes, DeviceCapabilities, PushService, RegistrationMethod, - VerificationTransport, -}; -use libsignal_service::{AccountManager, USER_AGENT}; -use libsignal_service_actix::prelude::AwcPushService; -use rand::RngCore; -use structopt::StructOpt; - -#[path = "../../libsignal-service/examples/storage.rs"] -mod storage; - -#[actix_rt::main] -async fn main() -> Result<(), Error> { - let client = "libsignal-service-hyper-example"; - let use_voice = false; - - let Args { - servers, - phonenumber, - password, - captcha, - } = Args::from_args(); - - let push_token = None; - // Mobile country code and mobile network code can in theory be extracted from the phone - // number, but it's not necessary for the API to function correctly. - // XXX: We could internalize this if statement to create_verification_session - let (mcc, mnc) = if let Some(carrier) = phonenumber.carrier() { - (Some(&carrier[0..3]), Some(&carrier[3..])) - } else { - (None, None) - }; - - // Only used with MessageSender and MessageReceiver - // let password = args.get_password()?; - - let mut push_service = AwcPushService::new( - servers, - Some(ServiceCredentials { - aci: None, - pni: None, - phonenumber: phonenumber.clone(), - password, - signaling_key: None, - device_id: None, - }), - USER_AGENT.into(), - ); - - let mut session = push_service - .create_verification_session( - &phonenumber.to_string(), - push_token, - mcc, - mnc, - ) - .await - .expect("create a registration verification session"); - println!("Sending registration request..."); - - if session.captcha_required() { - session = push_service - .patch_verification_session( - &session.id, - None, - None, - None, - captcha.as_deref(), - None, - ) - .await - .expect("submit captcha"); - } - - if session.push_challenge_required() { - anyhow::bail!("Push challenge required, but not implemented."); - } - - if !session.allowed_to_request_code { - anyhow::bail!( - "Not allowed to request verification code, reason unknown: {session:?}", - ); - } - - session = push_service - .request_verification_code( - &session.id, - client, - if use_voice { - VerificationTransport::Voice - } else { - VerificationTransport::Sms - }, - ) - .await - .expect("request verification code"); - - let confirmation_code = let_user_enter_confirmation_code(); - - println!("Submitting confirmation code..."); - - session = push_service - .submit_verification_code(&session.id, confirmation_code) - .await - .expect("Sending confirmation code failed."); - - if !session.verified { - anyhow::bail!("Session is not verified"); - } - - let registration_id = generate_registration_id(&mut rand::thread_rng()); - let pni_registration_id = generate_registration_id(&mut rand::thread_rng()); - let signaling_key = generate_signaling_key(); - let mut profile_key = [0u8; 32]; - rand::thread_rng().fill_bytes(&mut profile_key); - let profile_key = ProfileKey::create(profile_key); - let skip_device_transfer = false; - - // Create the prekeys storage - let mut aci_store = storage::ExampleStore::new(); - let mut pni_store = storage::ExampleStore::new(); - - let mut account_manager = AccountManager::new(push_service, None); - let _registration_data = account_manager - .register_account( - &mut rand::thread_rng(), - RegistrationMethod::SessionId(&session.id), - AccountAttributes { - signaling_key: Some(signaling_key.to_vec()), - registration_id, - pni_registration_id, - voice: false, - video: false, - fetches_messages: true, - pin: None, - registration_lock: None, - unidentified_access_key: Some( - profile_key.derive_access_key().to_vec(), - ), - unrestricted_unidentified_access: false, // TODO: make this configurable? - discoverable_by_phone_number: true, - name: Some("libsignal-service-hyper test".into()), - capabilities: DeviceCapabilities::default(), - }, - &mut aci_store, - &mut pni_store, - skip_device_transfer, - ) - .await; - - // You would want to store the registration data - - println!("Registration completed!"); - - Ok(()) -} - -fn let_user_enter_confirmation_code() -> &'static str { - "12345" -} - -fn generate_signaling_key() -> [u8; 52] { - // Signaling key that decrypts the incoming Signal messages - let mut rng = rand::thread_rng(); - let mut signaling_key = [0u8; 52]; - rng.fill_bytes(&mut signaling_key); - signaling_key -} - -#[derive(Debug, Clone, PartialEq, Eq, StructOpt)] -pub struct Args { - #[structopt( - short = "s", - long = "servers", - help = "The servers to connect to", - default_value = "staging" - )] - pub servers: SignalServers, - #[structopt( - short = "u", - long = "username", - help = "Your username or other identifier", - default_value = "+14151231234" - )] - pub phonenumber: phonenumber::PhoneNumber, - #[structopt( - short = "p", - long = "password", - help = "The password to use. Read from stdin if not provided" - )] - pub password: Option, - #[structopt( - short = "c", - long = "captcha", - help = "Captcha for registration" - )] - pub captcha: Option, -} diff --git a/libsignal-service-actix/src/lib.rs b/libsignal-service-actix/src/lib.rs deleted file mode 100644 index 84388618e..000000000 --- a/libsignal-service-actix/src/lib.rs +++ /dev/null @@ -1,9 +0,0 @@ -#![recursion_limit = "256"] -#![allow(clippy::uninlined_format_args)] - -pub mod push_service; -pub mod websocket; - -pub mod prelude { - pub use crate::push_service::*; -} diff --git a/libsignal-service-actix/src/push_service.rs b/libsignal-service-actix/src/push_service.rs deleted file mode 100644 index 47f26dd84..000000000 --- a/libsignal-service-actix/src/push_service.rs +++ /dev/null @@ -1,735 +0,0 @@ -use std::{sync::Arc, time::Duration}; - -use awc::{ - error::{ConnectError, PayloadError, SendRequestError}, - http::StatusCode, - http::{header::HeaderValue, Method}, - Client, ClientRequest, ClientResponse, Connector, -}; -use bytes::Bytes; -use futures::prelude::*; -use libsignal_service::{ - configuration::*, prelude::ProtobufMessage, push_service::*, - websocket::SignalWebSocket, -}; -use serde::{Deserialize, Serialize}; -use tracing_futures::Instrument; - -use crate::websocket::AwcWebSocket; - -#[derive(Clone)] -pub struct AwcPushService { - cfg: ServiceConfiguration, - credentials: Option, - client: awc::Client, -} - -impl AwcPushService { - pub fn new( - cfg: impl Into, - credentials: Option, - user_agent: String, - ) -> Self { - let cfg = cfg.into(); - let client = get_client(&cfg, user_agent); - Self { - cfg, - credentials: credentials.and_then(|c| c.authorization()), - client, - } - } - - fn request( - &self, - method: Method, - endpoint: Endpoint, - path: impl AsRef, - additional_headers: &[(&str, &str)], - credentials_override: HttpAuthOverride, - ) -> Result { - let url = self.cfg.base_url(endpoint).join(path.as_ref())?; - tracing::debug!(%url, %method, "HTTP request"); - let mut builder = self.client.request(method, url.as_str()); - for &header in additional_headers { - builder = builder.insert_header(header); - } - builder = match credentials_override { - HttpAuthOverride::NoOverride => { - if let Some(credentials) = self.credentials.as_ref() { - builder.basic_auth( - &credentials.username, - &credentials.password, - ) - } else { - builder - } - }, - HttpAuthOverride::Identified(HttpAuth { username, password }) => { - builder.basic_auth(username, password) - }, - HttpAuthOverride::Unidentified => builder, - }; - Ok(builder) - } - - fn json(text: &[u8]) -> Result - where - for<'de> T: Deserialize<'de>, - { - let value = if text.is_empty() { - serde_json::from_value(serde_json::Value::Null) - } else { - serde_json::from_slice(text) - }; - value.map_err(|e| ServiceError::JsonDecodeError { - reason: e.to_string(), - }) - } - - #[tracing::instrument(name = "extracting error", skip(response))] - async fn from_response( - response: &mut ClientResponse, - ) -> Result<(), ServiceError> - where - S: Stream> + Unpin, - { - match response.status() { - StatusCode::OK => Ok(()), - StatusCode::NO_CONTENT => Ok(()), - StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => { - Err(ServiceError::Unauthorized) - }, - StatusCode::NOT_FOUND => { - // This is 404 and means that e.g. recipient is not registered - Err(ServiceError::NotFoundError) - }, - StatusCode::PAYLOAD_TOO_LARGE => { - // This is 413 and means rate limit exceeded for Signal. - Err(ServiceError::RateLimitExceeded) - }, - StatusCode::CONFLICT => { - let mismatched_devices = - response.json().await.map_err(|e| { - tracing::error!( - ?response, - "Failed to decode HTTP 409 response: {}", - e - ); - ServiceError::UnhandledResponseCode { - http_code: StatusCode::CONFLICT.as_u16(), - } - })?; - Err(ServiceError::MismatchedDevicesException( - mismatched_devices, - )) - }, - StatusCode::GONE => { - let stale_devices = response.json().await.map_err(|e| { - tracing::error!( - ?response, - "Failed to decode HTTP 410 response: {}", - e - ); - ServiceError::UnhandledResponseCode { - http_code: StatusCode::GONE.as_u16(), - } - })?; - Err(ServiceError::StaleDevices(stale_devices)) - }, - StatusCode::LOCKED => { - let locked = response.json().await.map_err(|e| { - tracing::error!( - ?response, - "Failed to decode HTTP 423 response: {}", - e - ); - ServiceError::UnhandledResponseCode { - http_code: StatusCode::LOCKED.as_u16(), - } - })?; - Err(ServiceError::Locked(locked)) - }, - StatusCode::PRECONDITION_REQUIRED => { - let proof_required = response.json().await.map_err(|e| { - tracing::error!( - ?response, - "Failed to decode HTTP 428 response: {}", - e - ); - ServiceError::UnhandledResponseCode { - http_code: StatusCode::PRECONDITION_REQUIRED.as_u16(), - } - })?; - Err(ServiceError::ProofRequiredError(proof_required)) - }, - // XXX: fill in rest from PushServiceSocket - code => { - let contents = response.body().await; - tracing::trace!( - ?response, - "Unhandled response {} with body: {:?}", - code.as_u16(), - contents, - ); - Err(ServiceError::UnhandledResponseCode { - http_code: code.as_u16(), - }) - }, - } - } -} - -#[async_trait::async_trait(?Send)] -impl PushService for AwcPushService { - // This is in principle known at compile time, but long to write out. - type ByteStream = Box; - - async fn get_json( - &mut self, - endpoint: Endpoint, - path: &str, - additional_headers: &[(&str, &str)], - credentials_override: HttpAuthOverride, - ) -> Result - where - for<'de> T: Deserialize<'de>, - { - use awc::error::{ConnectError, SendRequestError}; - let mut response = self - .request( - Method::GET, - endpoint, - path, - additional_headers, - credentials_override, - )? - .send() - .await - .map_err(|e| match e { - SendRequestError::Connect(ConnectError::Timeout) => { - ServiceError::Timeout { - reason: e.to_string(), - } - }, - _ => ServiceError::SendError { - reason: e.to_string(), - }, - })?; - - let _span = tracing::debug_span!("processing response", ?response); - - Self::from_response(&mut response).await?; - - // In order to catch the zero-length output, we have to collect - // the whole response. The actix-web api is meant to used as a - // streaming deserializer, so we have this little awkward match. - // - // This is also the reason we depend directly on serde_json, however - // actix already imports that anyway. - let text = match response.body().await { - Ok(text) => { - tracing::debug!( - "GET response: {:?}", - String::from_utf8_lossy(&text) - ); - text - }, - Err(e) => { - return Err(ServiceError::ResponseError { - reason: e.to_string(), - }) - }, - }; - Self::json(&text) - } - - /// Deletes a resource through the HTTP DELETE verb. - async fn delete_json( - &mut self, - endpoint: Endpoint, - path: &str, - additional_headers: &[(&str, &str)], - ) -> Result - where - for<'de> T: Deserialize<'de>, - { - let mut response = self - .request( - Method::DELETE, - endpoint, - path, - additional_headers, - HttpAuthOverride::NoOverride, - )? - .send() - .await - .map_err(|e| match e { - SendRequestError::Connect(ConnectError::Timeout) => { - ServiceError::Timeout { - reason: e.to_string(), - } - }, - _ => ServiceError::SendError { - reason: e.to_string(), - }, - })?; - - let _span = - tracing::debug_span!("processing response", ?response).entered(); - - Self::from_response(&mut response).await?; - - // In order to catch the zero-length output, we have to collect - // the whole response. The actix-web api is meant to used as a - // streaming deserializer, so we have this little awkward match. - // - // This is also the reason we depend directly on serde_json, however - // actix already imports that anyway. - let text = match response.body().await { - Ok(text) => { - tracing::debug!( - "GET response: {:?}", - String::from_utf8_lossy(&text) - ); - text - }, - Err(e) => { - return Err(ServiceError::ResponseError { - reason: e.to_string(), - }) - }, - }; - - Self::json(&text) - } - - async fn put_json( - &mut self, - endpoint: Endpoint, - path: &str, - additional_headers: &[(&str, &str)], - credentials_override: HttpAuthOverride, - value: S, - ) -> Result - where - for<'de> D: Deserialize<'de>, - S: Serialize, - { - let mut response = self - .request( - Method::PUT, - endpoint, - path, - additional_headers, - credentials_override, - )? - .send_json(&value) - .await - .map_err(|e| ServiceError::SendError { - reason: e.to_string(), - })?; - - let _span = - tracing::debug_span!("processing response", ?response).entered(); - - Self::from_response(&mut response).await?; - - // In order to catch the zero-length output, we have to collect - // the whole response. The actix-web api is meant to used as a - // streaming deserializer, so we have this little awkward match. - // - // This is also the reason we depend directly on serde_json, however - // actix already imports that anyway. - let text = match response.body().await { - Ok(text) => { - tracing::debug!( - "GET response: {:?}", - String::from_utf8_lossy(&text) - ); - text - }, - Err(e) => { - return Err(ServiceError::ResponseError { - reason: e.to_string(), - }) - }, - }; - Self::json(&text) - } - - async fn patch_json( - &mut self, - endpoint: Endpoint, - path: &str, - additional_headers: &[(&str, &str)], - credentials_override: HttpAuthOverride, - value: S, - ) -> Result - where - for<'de> D: Deserialize<'de>, - S: Serialize, - { - let mut response = self - .request( - Method::PATCH, - endpoint, - path, - additional_headers, - credentials_override, - )? - .send_json(&value) - .await - .map_err(|e| ServiceError::SendError { - reason: e.to_string(), - })?; - - let _span = - tracing::debug_span!("processing response", ?response).entered(); - - Self::from_response(&mut response).await?; - - // In order to catch the zero-length output, we have to collect - // the whole response. The actix-web api is meant to used as a - // streaming deserializer, so we have this little awkward match. - // - // This is also the reason we depend directly on serde_json, however - // actix already imports that anyway. - let text = match response.body().await { - Ok(text) => { - tracing::debug!( - "PATCH response: {:?}", - String::from_utf8_lossy(&text) - ); - text - }, - Err(e) => { - return Err(ServiceError::ResponseError { - reason: e.to_string(), - }) - }, - }; - Self::json(&text) - } - - async fn post_json( - &mut self, - endpoint: Endpoint, - path: &str, - additional_headers: &[(&str, &str)], - credentials_override: HttpAuthOverride, - value: S, - ) -> Result - where - for<'de> D: Deserialize<'de>, - S: Serialize, - { - let mut response = self - .request( - Method::POST, - endpoint, - path, - additional_headers, - credentials_override, - )? - .send_json(&value) - .await - .map_err(|e| ServiceError::SendError { - reason: e.to_string(), - })?; - - let _span = - tracing::debug_span!("processing response", ?response).entered(); - - Self::from_response(&mut response).await?; - - // In order to catch the zero-length output, we have to collect - // the whole response. The actix-web api is meant to used as a - // streaming deserializer, so we have this little awkward match. - // - // This is also the reason we depend directly on serde_json, however - // actix already imports that anyway. - let text = match response.body().await { - Ok(text) => { - tracing::debug!( - "GET response: {:?}", - String::from_utf8_lossy(&text) - ); - text - }, - Err(e) => { - return Err(ServiceError::ResponseError { - reason: e.to_string(), - }) - }, - }; - Self::json(&text) - } - - async fn get_protobuf( - &mut self, - endpoint: Endpoint, - path: &str, - additional_headers: &[(&str, &str)], - credentials_override: HttpAuthOverride, - ) -> Result - where - T: Default + ProtobufMessage, - { - let mut response = self - .request( - Method::GET, - endpoint, - path, - additional_headers, - credentials_override, - )? - .send() - .await - .map_err(|e| ServiceError::SendError { - reason: e.to_string(), - })?; - - let text = - response - .body() - .await - .map_err(|e| ServiceError::ResponseError { - reason: e.to_string(), - })?; - Ok(T::decode(text)?) - } - - async fn put_protobuf( - &mut self, - endpoint: Endpoint, - path: &str, - additional_headers: &[(&str, &str)], - value: S, - ) -> Result - where - D: Default + ProtobufMessage, - S: Sized + ProtobufMessage, - { - let buf = value.encode_to_vec(); - - let mut response = self - .request( - Method::PUT, - endpoint, - path, - additional_headers, - HttpAuthOverride::NoOverride, - )? - .content_type(HeaderValue::from_static("application/x-protobuf")) - .send_body(buf) - .await - .map_err(|e| ServiceError::SendError { - reason: e.to_string(), - })?; - - let text = - response - .body() - .await - .map_err(|e| ServiceError::ResponseError { - reason: e.to_string(), - })?; - Ok(D::decode(text)?) - } - - async fn get_from_cdn( - &mut self, - cdn_id: u32, - path: &str, - ) -> Result { - let mut response = self - .request( - Method::GET, - Endpoint::Cdn(cdn_id), - path, - &[], - HttpAuthOverride::Unidentified, - )? - .send() - .await - .map_err(|e| ServiceError::SendError { - reason: e.to_string(), - })?; - - let _span = - tracing::debug_span!("processing response", ?response).entered(); - - Self::from_response(&mut response).await?; - - Ok(Box::new( - response - .map_err(|e| { - use awc::error::PayloadError; - match e { - PayloadError::Io(e) => e, - other => std::io::Error::new( - std::io::ErrorKind::Other, - other, - ), - } - }) - .into_async_read(), - )) - } - - async fn post_to_cdn0<'s, C: std::io::Read + Send + 's>( - &mut self, - path: &str, - value: &[(&str, &str)], - file: Option<(&str, &'s mut C)>, - ) -> Result<(), ServiceError> { - let request = self.request( - Method::POST, - Endpoint::Cdn(0), - path, - &[], - HttpAuthOverride::NoOverride, - )?; - - let mut form = mpart_async::client::MultipartRequest::default(); - - // mpart-async has a peculiar ordering of the form items, - // and Amazon S3 expects them in a very specific order (i.e., the file contents should - // go last. - // - // mpart-async uses a VecDeque internally for ordering the fields in the order given. - // - // https://github.com/cetra3/mpart-async/issues/16 - - for &(k, v) in value { - form.add_field(k, v); - } - - if let Some((filename, file)) = file { - // XXX Actix doesn't cope with none-'static lifetimes - // https://docs.rs/actix-web/3.2.0/actix_web/body/enum.Body.html - let mut buf = Vec::new(); - file.read_to_end(&mut buf) - .expect("infallible Read instance"); - form.add_stream( - "file", - filename, - "application/octet-stream", - futures::future::ok::<_, ()>(Bytes::from(buf)).into_stream(), - ); - } - - let content_type = - format!("multipart/form-data; boundary={}", form.get_boundary()); - - // XXX Amazon S3 needs the Content-Length, but we don't know it without depleting the whole - // stream. Sadly, Content-Length != contents.len(), but should include the whole form. - let mut body_contents = vec![]; - use futures::stream::StreamExt; - while let Some(b) = form.next().await { - // Unwrap, because no error type was used above - body_contents.extend(b.unwrap()); - } - tracing::trace!( - "Sending PUT with Content-Type={} and length {}", - content_type, - body_contents.len() - ); - - let mut response = request - .content_type(&content_type) - .content_length(body_contents.len() as u64) - .send_body(body_contents) - .await - .map_err(|e| ServiceError::SendError { - reason: e.to_string(), - })?; - - let _span = - tracing::debug_span!("processing response", ?response).entered(); - - Self::from_response(&mut response).await?; - - Ok(()) - } - - async fn ws( - &mut self, - path: &str, - keep_alive_path: &str, - additional_headers: &[(&str, &str)], - credentials: Option, - ) -> Result { - let span = tracing::debug_span!("websocket"); - let (ws, stream) = AwcWebSocket::with_client( - &mut self.client, - self.cfg.base_url(Endpoint::Service), - path, - additional_headers, - credentials.as_ref(), - ) - .instrument(span.clone()) - .await?; - let (ws, task) = SignalWebSocket::from_socket( - ws, - stream, - keep_alive_path.to_owned(), - ); - actix_rt::spawn(task.instrument(span)); - Ok(ws) - } -} - -/// Creates a `awc::Client` with usable default settings: -/// Creates a default `awc::Client`. -/// -/// Creates a `awc::Client` with usable default settings: -/// * certificate authority from the `ServiceConfiguration` -/// * 10s timeout on TCP connection -/// * 65s timeout on HTTP request -/// * provided user-agent -fn get_client(cfg: &ServiceConfiguration, user_agent: String) -> Client { - let mut cert_bytes = std::io::Cursor::new(&cfg.certificate_authority); - let roots = - rustls_pemfile::certs(&mut cert_bytes).expect("parseable PEM files"); - let roots = roots.iter().map(|v| rustls::Certificate(v.clone())); - - let mut root_certs = rustls::RootCertStore::empty(); - for root in roots { - root_certs.add(&root).unwrap(); - } - - let mut ssl_config = rustls::ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(root_certs) - .with_no_client_auth(); - ssl_config.alpn_protocols = vec![b"http/1.1".to_vec()]; - - let connector = Connector::new() - .rustls_021(Arc::new(ssl_config)) - .timeout(Duration::from_secs(10)); // https://github.com/actix/actix-web/issues/1047 - let client = awc::ClientBuilder::new() - .connector(connector) - .add_default_header(("X-Signal-Agent", user_agent.clone())) - .add_default_header(("User-Agent", user_agent)) - .timeout(Duration::from_secs(65)); // as in Signal-Android - - client.finish() -} - -#[cfg(test)] -mod tests { - use libsignal_service::configuration::SignalServers; - - #[test] - fn create_clients() { - let configs = &[SignalServers::Staging, SignalServers::Production]; - - for cfg in configs { - let _ = super::get_client( - &cfg.into(), - "libsignal-service test".to_string(), - ); - } - } -} diff --git a/libsignal-service-actix/src/websocket.rs b/libsignal-service-actix/src/websocket.rs deleted file mode 100644 index 0019afc6d..000000000 --- a/libsignal-service-actix/src/websocket.rs +++ /dev/null @@ -1,212 +0,0 @@ -use awc::{ - error::{WsClientError, WsProtocolError}, - http::StatusCode, - ws, - ws::Frame, -}; -use bytes::Bytes; -use futures::{channel::mpsc::*, prelude::*}; -use url::Url; - -use libsignal_service::{ - configuration::ServiceCredentials, - messagepipe::*, - push_service::{self, ServiceError}, -}; - -pub struct AwcWebSocket { - socket_sink: Box + Unpin>, -} - -#[derive(thiserror::Error, Debug)] -pub enum AwcWebSocketError { - #[error("Could not connect to the Signal Server")] - ConnectionError(#[from] awc::error::WsClientError), - #[error("Error during Websocket connection")] - ProtocolError(#[from] WsProtocolError), -} - -impl From for ServiceError { - fn from(e: AwcWebSocketError) -> ServiceError { - match e { - AwcWebSocketError::ConnectionError(e) => match e { - WsClientError::InvalidResponseStatus(s) => match s { - StatusCode::FORBIDDEN => ServiceError::Unauthorized, - s => ServiceError::WsError { - reason: format!("HTTP status {}", s), - }, - }, - e => ServiceError::WsError { - reason: e.to_string(), - }, - }, - AwcWebSocketError::ProtocolError(e) => match e { - WsProtocolError::Io(e) => match e.kind() { - std::io::ErrorKind::UnexpectedEof => { - ServiceError::WsClosing { - reason: format!( - "WebSocket closing due to unexpected EOF: {}", - e - ), - } - }, - _ => ServiceError::WsError { - reason: format!( - "IO error during WebSocket connection: {}", - e - ), - }, - }, - e => ServiceError::WsError { - reason: e.to_string(), - }, - }, - } - } -} - -/// Process the WebSocket, until it times out. -async fn process( - socket_stream: S, - mut incoming_sink: Sender, -) -> Result<(), AwcWebSocketError> -where - S: Unpin, - S: Stream>, -{ - let mut socket_stream = socket_stream.fuse(); - - let mut ka_interval = actix::clock::interval_at( - actix::clock::Instant::now(), - push_service::KEEPALIVE_TIMEOUT_SECONDS, - ); - - loop { - let tick = ka_interval.tick().fuse(); - futures::pin_mut!(tick); - futures::select! { - _ = tick => { - tracing::trace!("Triggering keep-alive"); - if let Err(e) = incoming_sink.send(WebSocketStreamItem::KeepAliveRequest).await { - tracing::info!("Websocket sink has closed: {:?}.", e); - break; - }; - }, - frame = socket_stream.next() => { - let frame = if let Some(frame) = frame { - frame - } else { - tracing::info!("process: Socket stream ended"); - break; - }; - - let frame = match frame? { - Frame::Binary(s) => s, - - Frame::Continuation(_c) => todo!(), - Frame::Ping(msg) => { - tracing::warn!(?msg, "received Ping"); - - continue; - }, - Frame::Pong(msg) => { - tracing::trace!(?msg, "received Pong"); - - continue; - }, - Frame::Text(frame) => { - tracing::warn!(?frame, "frame::Text",); - - // this is a protocol violation, maybe break; is better? - continue; - }, - - Frame::Close(c) => { - tracing::warn!(?c, "Websocket closing"); - - break; - }, - }; - - // Match SendError - if let Err(e) = incoming_sink.send(WebSocketStreamItem::Message(frame)).await { - tracing::info!("Websocket sink has closed: {:?}.", e); - break; - } - }, - } - } - Ok(()) -} - -impl AwcWebSocket { - pub(crate) async fn with_client( - client: &mut awc::Client, - base_url: impl std::borrow::Borrow, - path: &str, - additional_headers: &[(&str, &str)], - credentials: Option<&ServiceCredentials>, - ) -> Result<(Self, ::Stream), AwcWebSocketError> - { - let mut url = base_url.borrow().join(path).expect("valid url"); - url.set_scheme("wss").expect("valid https base url"); - - if let Some(credentials) = credentials { - url.query_pairs_mut() - .append_pair("login", credentials.login().as_ref()) - .append_pair( - "password", - credentials.password.as_ref().expect("a password"), - ); - } - - tracing::trace!( - url.scheme = url.scheme(), - url.host = ?url.host(), - url.path = url.path(), - url.has_query = ?url.query().is_some(), - "starting websocket", - ); - let mut ws = client.ws(url.as_str()); - for (key, value) in additional_headers { - ws = ws.header(*key, *value); - } - let (response, framed) = ws.connect().await?; - - tracing::debug!(?response, "WebSocket connected"); - - let (incoming_sink, incoming_stream) = channel(5); - - let (socket_sink, socket_stream) = framed.split(); - let processing_task = process(socket_stream, incoming_sink); - - // When the processing_task stops, the consuming stream and sink also - // terminate. - actix_rt::spawn(processing_task.map(|v| match v { - Ok(()) => (), - Err(e) => { - tracing::warn!("Processing task terminated with error: {:?}", e) - }, - })); - - Ok(( - Self { - socket_sink: Box::new(socket_sink), - }, - incoming_stream, - )) - } -} - -#[async_trait::async_trait(?Send)] -impl WebSocketService for AwcWebSocket { - type Stream = Receiver; - - async fn send_message(&mut self, msg: Bytes) -> Result<(), ServiceError> { - self.socket_sink - .send(ws::Message::Binary(msg)) - .await - .map_err(AwcWebSocketError::from)?; - Ok(()) - } -} diff --git a/libsignal-service-hyper/Cargo.toml b/libsignal-service-hyper/Cargo.toml deleted file mode 100644 index df43c566e..000000000 --- a/libsignal-service-hyper/Cargo.toml +++ /dev/null @@ -1,44 +0,0 @@ -[package] -name = "libsignal-service-hyper" -version = "0.1.0" -authors = ["Gabriel Féron "] -edition = "2021" -license = "AGPL-3.0" -rust-version = "1.70.0" - -[dependencies] -libsignal-service = { path = "../libsignal-service" } - -async-trait = "0.1" -bytes = "1.0" -futures = "0.3" -tracing = "0.1" -tracing-futures = "0.2" -mpart-async = "0.7" -serde = "1.0" -serde_json = "1.0" -thiserror = "1.0" -url = "2.1" - -hyper = "1.0" -hyper-util = { version = "0.1", features = ["client", "client-legacy"] } -hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "ring", "logging"] } -hyper-timeout = "0.5" -headers = "0.4" -http-body-util = "0.1" - -# for websocket support -async-tungstenite = { version = "0.27", features = ["tokio-rustls-native-certs", "url"] } - -tokio = { version = "1.0", features = ["macros"] } -tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] } - -rustls-pemfile = "2.0" - -[dev-dependencies] -chrono = "0.4" -rand = "0.8" -tokio = { version = "1.0", features = ["rt-multi-thread"] } - -[features] -unsend-futures = ["libsignal-service/unsend-futures"] diff --git a/libsignal-service-hyper/examples/registering.rs b/libsignal-service-hyper/examples/registering.rs deleted file mode 100644 index 7e8355a29..000000000 --- a/libsignal-service-hyper/examples/registering.rs +++ /dev/null @@ -1,196 +0,0 @@ -use std::str::FromStr; - -use libsignal_service::configuration::{ServiceCredentials, SignalServers}; -use libsignal_service::prelude::phonenumber::PhoneNumber; -use libsignal_service::prelude::ProfileKey; -use libsignal_service::provisioning::generate_registration_id; -use libsignal_service::push_service::{ - AccountAttributes, DeviceCapabilities, PushService, RegistrationMethod, - VerificationTransport, -}; -use libsignal_service::{AccountManager, USER_AGENT}; - -use libsignal_service_hyper::prelude::HyperPushService; - -use rand::RngCore; - -#[path = "../../libsignal-service/examples/storage.rs"] -mod storage; - -#[tokio::main] -async fn main() { - let client = "libsignal-service-hyper-example"; - let phonenumber = let_user_enter_phone_number(); - let password = let_user_enter_password(); - let use_voice = does_user_want_voice_confirmation(); - let captcha = let_user_solve_captcha(); - let push_token = None; - // Mobile country code and mobile network code can in theory be extracted from the phone - // number, but it's not necessary for the API to function correctly. - // XXX: We could internalize this if statement to create_verification_session - let (mcc, mnc) = if let Some(carrier) = phonenumber.carrier() { - (Some(&carrier[0..3]), Some(&carrier[3..])) - } else { - (None, None) - }; - - let mut push_service = - create_push_service(phonenumber.clone(), password.clone()); - let mut session = push_service - .create_verification_session( - &phonenumber.to_string(), - push_token, - mcc, - mnc, - ) - .await - .expect("create a registration verification session"); - println!("Sending registration request..."); - - if session.captcha_required() { - session = push_service - .patch_verification_session( - &session.id, - None, - None, - None, - Some(&captcha), - None, - ) - .await - .expect("submit captcha"); - } - - if session.push_challenge_required() { - eprintln!("Push challenge required, but not implemented."); - return; - } - - if !session.allowed_to_request_code { - eprintln!( - "Not allowed to request verification code, reason unknown: {session:?}", - ); - return; - } - - session = push_service - .request_verification_code( - &session.id, - client, - if use_voice { - VerificationTransport::Voice - } else { - VerificationTransport::Sms - }, - ) - .await - .expect("request verification code"); - - let confirmation_code = let_user_enter_confirmation_code(); - - println!("Submitting confirmation code..."); - - session = push_service - .submit_verification_code(&session.id, confirmation_code) - .await - .expect("Sending confirmation code failed."); - - if !session.verified { - eprintln!("Session is not verified"); - return; - } - - let registration_id = generate_registration_id(&mut rand::thread_rng()); - let pni_registration_id = generate_registration_id(&mut rand::thread_rng()); - let signaling_key = generate_signaling_key(); - let mut profile_key = [0u8; 32]; - rand::thread_rng().fill_bytes(&mut profile_key); - let profile_key = ProfileKey::create(profile_key); - let skip_device_transfer = false; - - // Create the prekeys storage - let mut aci_store = storage::ExampleStore::new(); - let mut pni_store = storage::ExampleStore::new(); - - let mut account_manager = AccountManager::new(push_service, None); - let _registration_data = account_manager - .register_account( - &mut rand::thread_rng(), - RegistrationMethod::SessionId(&session.id), - AccountAttributes { - signaling_key: Some(signaling_key.to_vec()), - registration_id, - pni_registration_id, - voice: false, - video: false, - fetches_messages: true, - pin: None, - registration_lock: None, - unidentified_access_key: Some( - profile_key.derive_access_key().to_vec(), - ), - unrestricted_unidentified_access: false, // TODO: make this configurable? - discoverable_by_phone_number: true, - name: Some("libsignal-service-hyper test".into()), - capabilities: DeviceCapabilities::default(), - }, - &mut aci_store, - &mut pni_store, - skip_device_transfer, - ) - .await; - - // You would want to store the registration data - - println!("Registration completed!"); -} - -fn generate_signaling_key() -> [u8; 52] { - // Signaling key that decrypts the incoming Signal messages - let mut rng = rand::thread_rng(); - let mut signaling_key = [0u8; 52]; - rng.fill_bytes(&mut signaling_key); - signaling_key -} - -fn create_push_service( - phonenumber: PhoneNumber, - password: String, -) -> HyperPushService { - HyperPushService::new( - SignalServers::Staging, // You might want to switch to Production servers - Some(ServiceCredentials { - aci: None, - pni: None, - phonenumber, - password: Some(password), - signaling_key: None, - device_id: None, - }), - USER_AGENT.into(), - ) -} - -// ------------------------------------ -// Here come the user interaction mocks - -fn let_user_solve_captcha() -> String { - // Here you want to let the user solve a captcha on https://signalcaptchas.org/registration/generate.html - "EnterCaptchaHere".to_string() -} - -fn let_user_enter_confirmation_code() -> &'static str { - "12345" -} - -fn does_user_want_voice_confirmation() -> bool { - false -} - -fn let_user_enter_phone_number() -> PhoneNumber { - PhoneNumber::from_str("+49301234567").expect("Not a valid phone number") -} - -fn let_user_enter_password() -> String { - "EnterPasswordHere".to_string() -} diff --git a/libsignal-service-hyper/src/lib.rs b/libsignal-service-hyper/src/lib.rs deleted file mode 100644 index 84388618e..000000000 --- a/libsignal-service-hyper/src/lib.rs +++ /dev/null @@ -1,9 +0,0 @@ -#![recursion_limit = "256"] -#![allow(clippy::uninlined_format_args)] - -pub mod push_service; -pub mod websocket; - -pub mod prelude { - pub use crate::push_service::*; -} diff --git a/libsignal-service-hyper/src/push_service.rs b/libsignal-service-hyper/src/push_service.rs deleted file mode 100644 index 6a294825e..000000000 --- a/libsignal-service-hyper/src/push_service.rs +++ /dev/null @@ -1,670 +0,0 @@ -use std::io; -use std::time::Duration; - -use bytes::{Buf, Bytes}; -use futures::{FutureExt, StreamExt, TryStreamExt}; -use headers::{Authorization, HeaderMapExt}; -use http_body_util::{BodyExt, Full}; -use hyper::{ - body::Incoming, - header::{CONTENT_LENGTH, CONTENT_TYPE, USER_AGENT}, - Method, Request, Response, StatusCode, -}; -use hyper_rustls::HttpsConnector; -use hyper_timeout::TimeoutConnector; -use hyper_util::{ - client::legacy::{connect::HttpConnector, Client}, - rt::TokioExecutor, -}; -use libsignal_service::{ - configuration::*, prelude::ProtobufMessage, push_service::*, - websocket::SignalWebSocket, MaybeSend, -}; -use serde::{Deserialize, Serialize}; -use tokio_rustls::rustls::{self, ClientConfig}; -use tracing::{debug, debug_span}; -use tracing_futures::Instrument; - -use crate::websocket::TungsteniteWebSocket; - -#[derive(Clone)] -pub struct HyperPushService { - cfg: ServiceConfiguration, - user_agent: String, - credentials: Option, - client: - Client>, Full>, -} - -#[derive(Debug)] -struct RequestBody { - contents: Vec, - content_type: String, -} - -impl HyperPushService { - pub fn new( - cfg: impl Into, - credentials: Option, - user_agent: String, - ) -> Self { - let cfg = cfg.into(); - let tls_config = Self::tls_config(&cfg); - - let https = hyper_rustls::HttpsConnectorBuilder::new() - .with_tls_config(tls_config) - .https_only() - .enable_http1() - .build(); - - // as in Signal-Android - let mut timeout_connector = TimeoutConnector::new(https); - timeout_connector.set_connect_timeout(Some(Duration::from_secs(10))); - timeout_connector.set_read_timeout(Some(Duration::from_secs(65))); - timeout_connector.set_write_timeout(Some(Duration::from_secs(65))); - - let client: Client<_, Full> = - Client::builder(TokioExecutor::new()).build(timeout_connector); - - Self { - cfg, - credentials: credentials.and_then(|c| c.authorization()), - client, - user_agent, - } - } - - fn tls_config(cfg: &ServiceConfiguration) -> ClientConfig { - let mut cert_bytes = io::Cursor::new(&cfg.certificate_authority); - let roots = rustls_pemfile::certs(&mut cert_bytes); - - let mut root_certs = rustls::RootCertStore::empty(); - root_certs.add_parsable_certificates( - roots.map(|c| c.expect("parsable PEM files")), - ); - - rustls::ClientConfig::builder() - .with_root_certificates(root_certs) - .with_no_client_auth() - } - - #[tracing::instrument(skip(self, path, body), fields(path = %path.as_ref()))] - async fn request( - &self, - method: Method, - endpoint: Endpoint, - path: impl AsRef, - additional_headers: &[(&str, &str)], - credentials_override: HttpAuthOverride, - body: Option, - ) -> Result, ServiceError> { - let url = self.cfg.base_url(endpoint).join(path.as_ref())?; - let mut builder = Request::builder() - .method(method) - .uri(url.as_str()) - .header(USER_AGENT, &self.user_agent); - - for (header, value) in additional_headers { - builder = builder.header(*header, *value); - } - - match credentials_override { - HttpAuthOverride::NoOverride => { - if let Some(HttpAuth { username, password }) = - self.credentials.as_ref() - { - builder - .headers_mut() - .unwrap() - .typed_insert(Authorization::basic(username, password)); - } - }, - HttpAuthOverride::Identified(HttpAuth { username, password }) => { - builder - .headers_mut() - .unwrap() - .typed_insert(Authorization::basic(&username, &password)); - }, - HttpAuthOverride::Unidentified => (), - }; - - let request = if let Some(RequestBody { - contents, - content_type, - }) = body - { - builder - .header(CONTENT_LENGTH, contents.len() as u64) - .header(CONTENT_TYPE, content_type) - .body(Full::new(Bytes::from(contents))) - .unwrap() - } else { - builder.body(Full::default()).unwrap() - }; - - let mut response = self.client.request(request).await.map_err(|e| { - ServiceError::SendError { - reason: e.to_string(), - } - })?; - - match response.status() { - StatusCode::OK => Ok(response), - StatusCode::NO_CONTENT => Ok(response), - StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => { - Err(ServiceError::Unauthorized) - }, - StatusCode::NOT_FOUND => { - // This is 404 and means that e.g. recipient is not registered - Err(ServiceError::NotFoundError) - }, - StatusCode::PAYLOAD_TOO_LARGE => { - // This is 413 and means rate limit exceeded for Signal. - Err(ServiceError::RateLimitExceeded) - }, - StatusCode::CONFLICT => { - let mismatched_devices = - Self::json(&mut response).await.map_err(|e| { - tracing::error!( - "Failed to decode HTTP 409 response: {}", - e - ); - ServiceError::UnhandledResponseCode { - http_code: StatusCode::CONFLICT.as_u16(), - } - })?; - Err(ServiceError::MismatchedDevicesException( - mismatched_devices, - )) - }, - StatusCode::GONE => { - let stale_devices = - Self::json(&mut response).await.map_err(|e| { - tracing::error!( - "Failed to decode HTTP 410 response: {}", - e - ); - ServiceError::UnhandledResponseCode { - http_code: StatusCode::GONE.as_u16(), - } - })?; - Err(ServiceError::StaleDevices(stale_devices)) - }, - StatusCode::LOCKED => { - let locked = Self::json(&mut response).await.map_err(|e| { - tracing::error!( - ?response, - "Failed to decode HTTP 423 response: {}", - e - ); - ServiceError::UnhandledResponseCode { - http_code: StatusCode::LOCKED.as_u16(), - } - })?; - Err(ServiceError::Locked(locked)) - }, - StatusCode::PRECONDITION_REQUIRED => { - let proof_required = - Self::json(&mut response).await.map_err(|e| { - tracing::error!( - "Failed to decode HTTP 428 response: {}", - e - ); - ServiceError::UnhandledResponseCode { - http_code: StatusCode::PRECONDITION_REQUIRED - .as_u16(), - } - })?; - Err(ServiceError::ProofRequiredError(proof_required)) - }, - // XXX: fill in rest from PushServiceSocket - code => { - tracing::trace!( - "Unhandled response {} with body: {}", - code.as_u16(), - Self::text(&mut response).await?, - ); - Err(ServiceError::UnhandledResponseCode { - http_code: code.as_u16(), - }) - }, - } - } - - async fn body( - response: &mut Response, - ) -> Result { - Ok(response - .collect() - .await - .map_err(|e| ServiceError::ResponseError { - reason: format!("failed to aggregate HTTP response body: {e}"), - })? - .aggregate()) - } - - #[tracing::instrument(skip(response), fields(status = %response.status()))] - async fn json( - response: &mut Response, - ) -> Result - where - for<'de> T: Deserialize<'de>, - { - let body = Self::body(response).await?; - - if body.has_remaining() { - serde_json::from_reader(body.reader()) - } else { - serde_json::from_value(serde_json::Value::Null) - } - .map_err(|e| ServiceError::JsonDecodeError { - reason: e.to_string(), - }) - } - - #[tracing::instrument(skip(response), fields(status = %response.status()))] - async fn protobuf( - response: &mut Response, - ) -> Result - where - M: ProtobufMessage + Default, - { - let body = Self::body(response).await?; - M::decode(body).map_err(ServiceError::ProtobufDecodeError) - } - - #[tracing::instrument(skip(response), fields(status = %response.status()))] - async fn text( - response: &mut Response, - ) -> Result { - let body = Self::body(response).await?; - io::read_to_string(body.reader()).map_err(|e| { - ServiceError::ResponseError { - reason: format!("failed to read HTTP response body: {e}"), - } - }) - } -} - -#[cfg_attr(feature = "unsend-futures", async_trait::async_trait(?Send))] -#[cfg_attr(not(feature = "unsend-futures"), async_trait::async_trait)] -impl PushService for HyperPushService { - // This is in principle known at compile time, but long to write out. - type ByteStream = Box; - - #[tracing::instrument(skip(self))] - async fn get_json( - &mut self, - service: Endpoint, - path: &str, - additional_headers: &[(&str, &str)], - credentials_override: HttpAuthOverride, - ) -> Result - where - for<'de> T: Deserialize<'de>, - { - let mut response = self - .request( - Method::GET, - service, - path, - additional_headers, - credentials_override, - None, - ) - .await?; - - Self::json(&mut response).await - } - - #[tracing::instrument(skip(self))] - async fn delete_json( - &mut self, - service: Endpoint, - path: &str, - additional_headers: &[(&str, &str)], - ) -> Result - where - for<'de> T: Deserialize<'de>, - { - let mut response = self - .request( - Method::DELETE, - service, - path, - additional_headers, - HttpAuthOverride::NoOverride, - None, - ) - .await?; - - Self::json(&mut response).await - } - - #[tracing::instrument(skip(self, value))] - async fn put_json( - &mut self, - service: Endpoint, - path: &str, - additional_headers: &[(&str, &str)], - credentials_override: HttpAuthOverride, - value: S, - ) -> Result - where - for<'de> D: Deserialize<'de>, - S: MaybeSend + Serialize, - { - let json = serde_json::to_vec(&value).map_err(|e| { - ServiceError::JsonDecodeError { - reason: e.to_string(), - } - })?; - - let mut response = self - .request( - Method::PUT, - service, - path, - additional_headers, - credentials_override, - Some(RequestBody { - contents: json, - content_type: "application/json".into(), - }), - ) - .await?; - - Self::json(&mut response).await - } - - #[tracing::instrument(skip(self, value))] - async fn patch_json( - &mut self, - service: Endpoint, - path: &str, - additional_headers: &[(&str, &str)], - credentials_override: HttpAuthOverride, - value: S, - ) -> Result - where - for<'de> D: Deserialize<'de>, - S: MaybeSend + Serialize, - { - let json = serde_json::to_vec(&value).map_err(|e| { - ServiceError::JsonDecodeError { - reason: e.to_string(), - } - })?; - - let mut response = self - .request( - Method::PATCH, - service, - path, - additional_headers, - credentials_override, - Some(RequestBody { - contents: json, - content_type: "application/json".into(), - }), - ) - .await?; - - Self::json(&mut response).await - } - - #[tracing::instrument(skip(self, value))] - async fn post_json( - &mut self, - service: Endpoint, - path: &str, - additional_headers: &[(&str, &str)], - credentials_override: HttpAuthOverride, - value: S, - ) -> Result - where - for<'de> D: Deserialize<'de>, - S: MaybeSend + Serialize, - { - let json = serde_json::to_vec(&value).map_err(|e| { - ServiceError::JsonDecodeError { - reason: e.to_string(), - } - })?; - - let mut response = self - .request( - Method::POST, - service, - path, - additional_headers, - credentials_override, - Some(RequestBody { - contents: json, - content_type: "application/json".into(), - }), - ) - .await?; - - Self::json(&mut response).await - } - - #[tracing::instrument(skip(self))] - async fn get_protobuf( - &mut self, - service: Endpoint, - path: &str, - additional_headers: &[(&str, &str)], - credentials_override: HttpAuthOverride, - ) -> Result - where - T: Default + libsignal_service::prelude::ProtobufMessage, - { - let mut response = self - .request( - Method::GET, - service, - path, - additional_headers, - credentials_override, - None, - ) - .await?; - - Self::protobuf(&mut response).await - } - - #[tracing::instrument(skip(self, value))] - async fn put_protobuf( - &mut self, - service: Endpoint, - path: &str, - additional_headers: &[(&str, &str)], - value: S, - ) -> Result - where - D: Default + libsignal_service::prelude::ProtobufMessage, - S: Sized + libsignal_service::prelude::ProtobufMessage, - { - let protobuf = value.encode_to_vec(); - - let mut response = self - .request( - Method::PUT, - service, - path, - additional_headers, - HttpAuthOverride::NoOverride, - Some(RequestBody { - contents: protobuf, - content_type: "application/x-protobuf".into(), - }), - ) - .await?; - - Self::protobuf(&mut response).await - } - - #[tracing::instrument(skip(self))] - async fn get_from_cdn( - &mut self, - cdn_id: u32, - path: &str, - ) -> Result { - let response = self - .request( - Method::GET, - Endpoint::Cdn(cdn_id), - path, - &[], - HttpAuthOverride::Unidentified, // CDN requests are always without authentication - None, - ) - .await?; - - Ok(Box::new( - response - .into_body() - .into_data_stream() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) - .into_async_read(), - )) - } - - #[tracing::instrument(skip(self, value, file), fields(file = file.as_ref().map(|_| "")))] - async fn post_to_cdn0<'s, C>( - &mut self, - path: &str, - value: &[(&str, &str)], - file: Option<(&str, &'s mut C)>, - ) -> Result<(), ServiceError> - where - C: io::Read + Send + 's, - { - let mut form = mpart_async::client::MultipartRequest::default(); - - // mpart-async has a peculiar ordering of the form items, - // and Amazon S3 expects them in a very specific order (i.e., the file contents should - // go last. - // - // mpart-async uses a VecDeque internally for ordering the fields in the order given. - // - // https://github.com/cetra3/mpart-async/issues/16 - - for &(k, v) in value { - form.add_field(k, v); - } - - if let Some((filename, file)) = file { - // XXX Actix doesn't cope with none-'static lifetimes - // https://docs.rs/actix-web/3.2.0/actix_web/body/enum.Body.html - let mut buf = Vec::new(); - file.read_to_end(&mut buf) - .expect("infallible Read instance"); - form.add_stream( - "file", - filename, - "application/octet-stream", - futures::future::ok::<_, ()>(Bytes::from(buf)).into_stream(), - ); - } - - let content_type = - format!("multipart/form-data; boundary={}", form.get_boundary()); - - // XXX Amazon S3 needs the Content-Length, but we don't know it without depleting the whole - // stream. Sadly, Content-Length != contents.len(), but should include the whole form. - let mut body_contents = vec![]; - while let Some(b) = form.next().await { - // Unwrap, because no error type was used above - body_contents.extend(b.unwrap()); - } - tracing::trace!( - "Sending PUT with Content-Type={} and length {}", - content_type, - body_contents.len() - ); - - let response = self - .request( - Method::POST, - Endpoint::Cdn(0), - path, - &[], - HttpAuthOverride::NoOverride, - Some(RequestBody { - contents: body_contents, - content_type, - }), - ) - .await?; - - debug!("HyperPushService::PUT response: {:?}", response); - - Ok(()) - } - - async fn ws( - &mut self, - path: &str, - keepalive_path: &str, - additional_headers: &[(&str, &str)], - credentials: Option, - ) -> Result { - let span = debug_span!("websocket"); - let (ws, stream) = TungsteniteWebSocket::with_tls_config( - Self::tls_config(&self.cfg), - self.cfg.base_url(Endpoint::Service), - path, - additional_headers, - credentials.as_ref(), - ) - .instrument(span.clone()) - .await?; - let (ws, task) = - SignalWebSocket::from_socket(ws, stream, keepalive_path.to_owned()); - let task = task.instrument(span); - #[cfg(feature = "unsend-futures")] - tokio::task::spawn_local(task); - #[cfg(not(feature = "unsend-futures"))] - tokio::task::spawn(task); - Ok(ws) - } -} - -#[cfg(test)] -mod tests { - use bytes::{Buf, Bytes}; - use libsignal_service::configuration::SignalServers; - - #[test] - fn create_clients() { - let configs = &[SignalServers::Staging, SignalServers::Production]; - - for cfg in configs { - let _ = super::HyperPushService::new( - cfg, - None, - "libsignal-service test".to_string(), - ); - } - } - - #[test] - fn serde_json_from_empty_reader() { - // This fails, so we have handle empty response body separately in HyperPushService::json() - let bytes: Bytes = "".into(); - assert!( - serde_json::from_reader::, String>( - bytes.reader() - ) - .is_err() - ); - } - - #[test] - fn serde_json_form_empty_vec() { - // If we're trying to send and empty payload, serde_json must be able to make a Vec out of it - assert!(serde_json::to_vec(b"").is_ok()); - } -} diff --git a/libsignal-service/.gitignore b/libsignal-service/.gitignore deleted file mode 100644 index 5116d3fb3..000000000 --- a/libsignal-service/.gitignore +++ /dev/null @@ -1 +0,0 @@ -src/proto/*.rs diff --git a/libsignal-service/Cargo.toml b/libsignal-service/Cargo.toml deleted file mode 100644 index 59d84e1f5..000000000 --- a/libsignal-service/Cargo.toml +++ /dev/null @@ -1,48 +0,0 @@ -[package] -name = "libsignal-service" -version = "0.1.0" -authors = ["Ruben De Smet ", "Gabriel Féron ", "Michael Bryan ", "Shady Khalifa "] -edition = "2021" -license = "AGPL-3.0" -readme = "../README.md" - -[dependencies] -libsignal-protocol = { git = "https://github.com/signalapp/libsignal", tag = "v0.56.1" } -zkgroup = { git = "https://github.com/signalapp/libsignal", tag = "v0.56.1" } - -aes = "0.8" -aes-gcm = "0.10" -cbc = "0.1" -ctr = "0.9" -async-trait = "0.1" -base64 = "0.22" -bincode = "1.3" -bytes = "1" -chrono = { version = "0.4", features = ["serde", "clock"], default-features = false } -derivative = "2.2" -futures = "0.3" -hex = "0.4" -hkdf = "0.12" -hmac = "0.12" -phonenumber = "0.3" -prost = "0.13" -rand = "0.8" -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0.85" -sha2 = "0.10" -thiserror = "1.0" -url = { version = "2.1", features = ["serde"] } -uuid = { version = "1", features = ["serde"] } - -tracing = { version = "0.1", features = ["log"] } -tracing-futures = "0.2" - -[build-dependencies] -prost-build = "0.13" - -[dev-dependencies] -anyhow = "1.0" -tokio = { version = "1.0", features = ["macros", "rt"] } - -[features] -unsend-futures = [] diff --git a/libsignal-service/protobuf/DeviceName.proto b/protobuf/DeviceName.proto similarity index 100% rename from libsignal-service/protobuf/DeviceName.proto rename to protobuf/DeviceName.proto diff --git a/libsignal-service/protobuf/Groups.proto b/protobuf/Groups.proto similarity index 100% rename from libsignal-service/protobuf/Groups.proto rename to protobuf/Groups.proto diff --git a/libsignal-service/protobuf/Provisioning.proto b/protobuf/Provisioning.proto similarity index 100% rename from libsignal-service/protobuf/Provisioning.proto rename to protobuf/Provisioning.proto diff --git a/libsignal-service/protobuf/SignalService.proto b/protobuf/SignalService.proto similarity index 100% rename from libsignal-service/protobuf/SignalService.proto rename to protobuf/SignalService.proto diff --git a/libsignal-service/protobuf/StickerResources.proto b/protobuf/StickerResources.proto similarity index 100% rename from libsignal-service/protobuf/StickerResources.proto rename to protobuf/StickerResources.proto diff --git a/libsignal-service/protobuf/UnidentifiedDelivery.proto b/protobuf/UnidentifiedDelivery.proto similarity index 100% rename from libsignal-service/protobuf/UnidentifiedDelivery.proto rename to protobuf/UnidentifiedDelivery.proto diff --git a/libsignal-service/protobuf/WebSocketResources.proto b/protobuf/WebSocketResources.proto similarity index 100% rename from libsignal-service/protobuf/WebSocketResources.proto rename to protobuf/WebSocketResources.proto diff --git a/libsignal-service/protobuf/update-protos.sh b/protobuf/update-protos.sh similarity index 100% rename from libsignal-service/protobuf/update-protos.sh rename to protobuf/update-protos.sh diff --git a/libsignal-service/src/account_manager.rs b/src/account_manager.rs similarity index 99% rename from libsignal-service/src/account_manager.rs rename to src/account_manager.rs index d86af6c67..d8251c92b 100644 --- a/libsignal-service/src/account_manager.rs +++ b/src/account_manager.rs @@ -52,8 +52,8 @@ use crate::{ type Aes256Ctr128BE = ctr::Ctr128BE; -pub struct AccountManager { - service: Service, +pub struct AccountManager { + service: PushService, profile_key: Option, } @@ -73,8 +73,8 @@ pub struct Profile { pub avatar: Option, } -impl AccountManager { - pub fn new(service: Service, profile_key: Option) -> Self { +impl AccountManager { + pub fn new(service: PushService, profile_key: Option) -> Self { Self { service, profile_key, @@ -639,7 +639,7 @@ impl AccountManager { &mut self, aci_protocol_store: &mut Aci, pni_protocol_store: &mut Pni, - mut sender: MessageSender, + mut sender: MessageSender, local_aci: ServiceAddress, e164: PhoneNumber, csprng: &mut R, diff --git a/libsignal-service/src/attachment_cipher.rs b/src/attachment_cipher.rs similarity index 100% rename from libsignal-service/src/attachment_cipher.rs rename to src/attachment_cipher.rs diff --git a/libsignal-service/src/cipher.rs b/src/cipher.rs similarity index 100% rename from libsignal-service/src/cipher.rs rename to src/cipher.rs diff --git a/libsignal-service/src/configuration.rs b/src/configuration.rs similarity index 100% rename from libsignal-service/src/configuration.rs rename to src/configuration.rs diff --git a/libsignal-service/src/content.rs b/src/content.rs similarity index 100% rename from libsignal-service/src/content.rs rename to src/content.rs diff --git a/libsignal-service/src/content/data_message.rs b/src/content/data_message.rs similarity index 100% rename from libsignal-service/src/content/data_message.rs rename to src/content/data_message.rs diff --git a/libsignal-service/src/content/story_message.rs b/src/content/story_message.rs similarity index 100% rename from libsignal-service/src/content/story_message.rs rename to src/content/story_message.rs diff --git a/libsignal-service/src/digeststream.rs b/src/digeststream.rs similarity index 100% rename from libsignal-service/src/digeststream.rs rename to src/digeststream.rs diff --git a/libsignal-service/src/envelope.rs b/src/envelope.rs similarity index 100% rename from libsignal-service/src/envelope.rs rename to src/envelope.rs diff --git a/libsignal-service/src/groups_v2/manager.rs b/src/groups_v2/manager.rs similarity index 98% rename from libsignal-service/src/groups_v2/manager.rs rename to src/groups_v2/manager.rs index ff7d76f97..d5e77dffd 100644 --- a/libsignal-service/src/groups_v2/manager.rs +++ b/src/groups_v2/manager.rs @@ -127,17 +127,17 @@ impl CredentialsCache for &mut T { } } -pub struct GroupsManager { +pub struct GroupsManager { service_ids: ServiceIds, - push_service: S, + push_service: PushService, credentials_cache: C, server_public_params: ServerPublicParams, } -impl GroupsManager { +impl GroupsManager { pub fn new( service_ids: ServiceIds, - push_service: S, + push_service: PushService, credentials_cache: C, server_public_params: ServerPublicParams, ) -> Self { diff --git a/libsignal-service/src/groups_v2/mod.rs b/src/groups_v2/mod.rs similarity index 100% rename from libsignal-service/src/groups_v2/mod.rs rename to src/groups_v2/mod.rs diff --git a/libsignal-service/src/groups_v2/model.rs b/src/groups_v2/model.rs similarity index 100% rename from libsignal-service/src/groups_v2/model.rs rename to src/groups_v2/model.rs diff --git a/libsignal-service/src/groups_v2/operations.rs b/src/groups_v2/operations.rs similarity index 100% rename from libsignal-service/src/groups_v2/operations.rs rename to src/groups_v2/operations.rs diff --git a/libsignal-service/src/groups_v2/utils.rs b/src/groups_v2/utils.rs similarity index 100% rename from libsignal-service/src/groups_v2/utils.rs rename to src/groups_v2/utils.rs diff --git a/libsignal-service/src/kat.bin.rs b/src/kat.bin.rs similarity index 100% rename from libsignal-service/src/kat.bin.rs rename to src/kat.bin.rs diff --git a/libsignal-service/src/lib.rs b/src/lib.rs similarity index 75% rename from libsignal-service/src/lib.rs rename to src/lib.rs index 177325328..2a881ca8d 100644 --- a/libsignal-service/src/lib.rs +++ b/src/lib.rs @@ -48,25 +48,6 @@ pub const GROUP_UPDATE_FLAG: u32 = 1; /// GROUP_LEAVE_FLAG signals that this message is a group leave message. pub const GROUP_LEAVE_FLAG: u32 = 2; -/// This trait allows for the conditional support of Send compatible futures -/// depending on whether or not the `unsend-futures` feature flag is enabled. -/// As this feature is disabled by default, Send is supported by default. -/// -/// This is necessary as actix does not support Send, which means unconditionally -/// imposing this requirement would break libsignal-service-actix. -/// -/// Conversely, hyper does support Send, which is why libsignal-service-hyper -/// does not enable the `unsend-futures` feature flag. -#[cfg(not(feature = "unsend-futures"))] -pub trait MaybeSend: Send {} -#[cfg(not(feature = "unsend-futures"))] -impl MaybeSend for T where T: Send {} - -#[cfg(feature = "unsend-futures")] -pub trait MaybeSend {} -#[cfg(feature = "unsend-futures")] -impl MaybeSend for T {} - pub mod prelude { pub use super::ServiceAddress; pub use crate::{ diff --git a/libsignal-service/src/master_key.rs b/src/master_key.rs similarity index 100% rename from libsignal-service/src/master_key.rs rename to src/master_key.rs diff --git a/libsignal-service/src/messagepipe.rs b/src/messagepipe.rs similarity index 93% rename from libsignal-service/src/messagepipe.rs rename to src/messagepipe.rs index 9da098599..cdb77b7a8 100644 --- a/libsignal-service/src/messagepipe.rs +++ b/src/messagepipe.rs @@ -29,8 +29,7 @@ pub enum Incoming { QueueEmpty, } -#[cfg_attr(feature = "unsend-futures", async_trait::async_trait(?Send))] -#[cfg_attr(not(feature = "unsend-futures"), async_trait::async_trait)] +#[async_trait::async_trait] pub trait WebSocketService { type Stream: FusedStream + Unpin; @@ -139,8 +138,7 @@ impl MessagePipe { pub struct PanicingWebSocketService; #[allow(clippy::diverging_sub_expression)] -#[cfg_attr(feature = "unsend-futures", async_trait::async_trait(?Send))] -#[cfg_attr(not(feature = "unsend-futures"), async_trait::async_trait)] +#[async_trait::async_trait] impl WebSocketService for PanicingWebSocketService { type Stream = futures::channel::mpsc::Receiver; diff --git a/libsignal-service/src/models.rs b/src/models.rs similarity index 100% rename from libsignal-service/src/models.rs rename to src/models.rs diff --git a/libsignal-service/src/pre_keys.rs b/src/pre_keys.rs similarity index 100% rename from libsignal-service/src/pre_keys.rs rename to src/pre_keys.rs diff --git a/libsignal-service/src/profile_cipher.rs b/src/profile_cipher.rs similarity index 100% rename from libsignal-service/src/profile_cipher.rs rename to src/profile_cipher.rs diff --git a/libsignal-service/src/profile_name.rs b/src/profile_name.rs similarity index 100% rename from libsignal-service/src/profile_name.rs rename to src/profile_name.rs diff --git a/libsignal-service/src/profile_service.rs b/src/profile_service.rs similarity index 100% rename from libsignal-service/src/profile_service.rs rename to src/profile_service.rs diff --git a/libsignal-service/src/proto.rs b/src/proto.rs similarity index 100% rename from libsignal-service/src/proto.rs rename to src/proto.rs diff --git a/libsignal-service/src/provisioning/cipher.rs b/src/provisioning/cipher.rs similarity index 100% rename from libsignal-service/src/provisioning/cipher.rs rename to src/provisioning/cipher.rs diff --git a/libsignal-service/src/provisioning/mod.rs b/src/provisioning/mod.rs similarity index 99% rename from libsignal-service/src/provisioning/mod.rs rename to src/provisioning/mod.rs index 39d058cc9..2455409a1 100644 --- a/libsignal-service/src/provisioning/mod.rs +++ b/src/provisioning/mod.rs @@ -136,12 +136,11 @@ pub async fn link_device< R: rand::Rng + rand::CryptoRng, Aci: PreKeysStore, Pni: PreKeysStore, - P: PushService + Clone, >( aci_store: &mut Aci, pni_store: &mut Pni, csprng: &mut R, - mut push_service: P, + mut push_service: PushService, password: &str, device_name: &str, mut tx: Sender, diff --git a/libsignal-service/src/provisioning/pipe.rs b/src/provisioning/pipe.rs similarity index 100% rename from libsignal-service/src/provisioning/pipe.rs rename to src/provisioning/pipe.rs diff --git a/libsignal-service/src/push_service.rs b/src/push_service.rs similarity index 66% rename from libsignal-service/src/push_service.rs rename to src/push_service.rs index 4724ce81f..709cdcfac 100644 --- a/libsignal-service/src/push_service.rs +++ b/src/push_service.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, fmt, time::Duration}; +use std::{collections::HashMap, fmt, io, time::Duration}; use crate::{ configuration::{Endpoint, ServiceCredentials}, @@ -7,16 +7,32 @@ use crate::{ pre_keys::{ KyberPreKeyEntity, PreKeyEntity, PreKeyState, SignedPreKeyEntity, }, + prelude::ServiceConfiguration, profile_cipher::ProfileCipherError, proto::{attachment_pointer::AttachmentIdentifier, AttachmentPointer}, sender::{OutgoingPushMessage, OutgoingPushMessages, SendMessageResponse}, utils::{serde_base64, serde_optional_base64, serde_phone_number}, - websocket::SignalWebSocket, - MaybeSend, ParseServiceAddressError, Profile, ServiceAddress, + websocket::{tungstenite::TungsteniteWebSocket, SignalWebSocket}, + ParseServiceAddressError, Profile, ServiceAddress, }; +use bytes::{Buf, Bytes}; use chrono::prelude::*; use derivative::Derivative; +use futures::{FutureExt, StreamExt, TryStreamExt}; +use headers::{Authorization, HeaderMapExt}; +use http_body_util::{BodyExt, Full}; +use hyper::{ + body::Incoming, + header::{CONTENT_LENGTH, CONTENT_TYPE, USER_AGENT}, + Method, Request, Response, StatusCode, +}; +use hyper_rustls::HttpsConnector; +use hyper_timeout::TimeoutConnector; +use hyper_util::{ + client::legacy::{connect::HttpConnector, Client}, + rt::TokioExecutor, +}; use libsignal_protocol::{ error::SignalProtocolError, kem::{Key, Public}, @@ -25,53 +41,14 @@ use libsignal_protocol::{ use phonenumber::PhoneNumber; use prost::Message as ProtobufMessage; use serde::{Deserialize, Serialize}; +use tokio_rustls::rustls; +use tracing::{debug, debug_span, Instrument}; use uuid::Uuid; use zkgroup::{ profiles::{ProfileKeyCommitment, ProfileKeyVersion}, ZkGroupDeserializationFailure, }; -/** -Since we can't use format!() with constants, the URLs here are just for reference purposes -pub const REGISTER_GCM_PATH: &str = "/v1/accounts/gcm/"; -pub const TURN_SERVER_INFO: &str = "/v1/accounts/turn"; -pub const SET_ACCOUNT_ATTRIBUTES: &str = "/v1/accounts/attributes/"; -pub const PIN_PATH: &str = "/v1/accounts/pin/"; -pub const REQUEST_PUSH_CHALLENGE: &str = "/v1/accounts/fcm/preauth/%s/%s"; -pub const WHO_AM_I: &str = "/v1/accounts/whoami"; - -pub const PREKEY_PATH: &str = "/v2/keys/%s"; -pub const PREKEY_DEVICE_PATH: &str = "/v2/keys/%s/%s"; -pub const SIGNED_PREKEY_PATH: &str = "/v2/keys/signed"; - -pub const PROVISIONING_CODE_PATH: &str = "/v1/devices/provisioning/code"; -pub const PROVISIONING_MESSAGE_PATH: &str = "/v1/provisioning/%s"; - -pub const DIRECTORY_TOKENS_PATH: &str = "/v1/directory/tokens"; -pub const DIRECTORY_VERIFY_PATH: &str = "/v1/directory/%s"; -pub const DIRECTORY_AUTH_PATH: &str = "/v1/directory/auth"; -pub const DIRECTORY_FEEDBACK_PATH: &str = "/v1/directory/feedback-v3/%s"; -pub const SENDER_ACK_MESSAGE_PATH: &str = "/v1/messages/%s/%d"; -pub const UUID_ACK_MESSAGE_PATH: &str = "/v1/messages/uuid/%s"; -pub const ATTACHMENT_PATH: &str = "/v2/attachments/form/upload"; - -pub const PROFILE_PATH: &str = "/v1/profile/"; - -pub const SENDER_CERTIFICATE_LEGACY_PATH: &str = "/v1/certificate/delivery"; -pub const SENDER_CERTIFICATE_PATH: &str = - "/v1/certificate/delivery?includeUuid=true"; - -pub const VERIFICATION_SESSION_PATH: &str = "/v1/verification/session"; -pub const VERIFICATION_CODE_PATH: &str = "/v1/verification/session/%s/code"; - -pub const REGISTRATION_PATH: &str = "/v1/registration"; - -pub const ATTACHMENT_DOWNLOAD_PATH: &str = "attachments/%d"; - -pub const STICKER_MANIFEST_PATH: &str = "stickers/%s/manifest.proto"; -pub const STICKER_PATH: &str = "stickers/%s/full/%d"; -**/ - pub const KEEPALIVE_TIMEOUT_SECONDS: Duration = Duration::from_secs(55); pub const DEFAULT_DEVICE_ID: u32 = 1; @@ -609,12 +586,268 @@ pub enum ServiceError { InvalidDeviceName, } -#[cfg_attr(feature = "unsend-futures", async_trait::async_trait(?Send))] -#[cfg_attr(not(feature = "unsend-futures"), async_trait::async_trait)] -pub trait PushService: MaybeSend { - type ByteStream: futures::io::AsyncRead + MaybeSend + Unpin; +#[derive(Debug)] +struct RequestBody { + contents: Vec, + content_type: String, +} + +#[derive(Clone)] +pub struct PushService { + cfg: ServiceConfiguration, + user_agent: String, + credentials: Option, + client: + Client>, Full>, +} - async fn get_json( +impl PushService { + pub fn new( + cfg: impl Into, + credentials: Option, + user_agent: String, + ) -> Self { + let cfg = cfg.into(); + let tls_config = Self::tls_config(&cfg); + + let https = hyper_rustls::HttpsConnectorBuilder::new() + .with_tls_config(tls_config) + .https_only() + .enable_http1() + .build(); + + // as in Signal-Android + let mut timeout_connector = TimeoutConnector::new(https); + timeout_connector.set_connect_timeout(Some(Duration::from_secs(10))); + timeout_connector.set_read_timeout(Some(Duration::from_secs(65))); + timeout_connector.set_write_timeout(Some(Duration::from_secs(65))); + + let client: Client<_, Full> = + Client::builder(TokioExecutor::new()).build(timeout_connector); + + Self { + cfg, + credentials: credentials.and_then(|c| c.authorization()), + client, + user_agent, + } + } + + fn tls_config(cfg: &ServiceConfiguration) -> rustls::ClientConfig { + let mut cert_bytes = io::Cursor::new(&cfg.certificate_authority); + let roots = rustls_pemfile::certs(&mut cert_bytes); + + let mut root_certs = rustls::RootCertStore::empty(); + root_certs.add_parsable_certificates( + roots.map(|c| c.expect("parsable PEM files")), + ); + + rustls::ClientConfig::builder() + .with_root_certificates(root_certs) + .with_no_client_auth() + } + + #[tracing::instrument(skip(self, path, body), fields(path = %path.as_ref()))] + async fn request( + &self, + method: Method, + endpoint: Endpoint, + path: impl AsRef, + additional_headers: &[(&str, &str)], + credentials_override: HttpAuthOverride, + body: Option, + ) -> Result, ServiceError> { + let url = self.cfg.base_url(endpoint).join(path.as_ref())?; + let mut builder = Request::builder() + .method(method) + .uri(url.as_str()) + .header(USER_AGENT, &self.user_agent); + + for (header, value) in additional_headers { + builder = builder.header(*header, *value); + } + + match credentials_override { + HttpAuthOverride::NoOverride => { + if let Some(HttpAuth { username, password }) = + self.credentials.as_ref() + { + builder + .headers_mut() + .unwrap() + .typed_insert(Authorization::basic(username, password)); + } + }, + HttpAuthOverride::Identified(HttpAuth { username, password }) => { + builder + .headers_mut() + .unwrap() + .typed_insert(Authorization::basic(&username, &password)); + }, + HttpAuthOverride::Unidentified => (), + }; + + let request = if let Some(RequestBody { + contents, + content_type, + }) = body + { + builder + .header(CONTENT_LENGTH, contents.len() as u64) + .header(CONTENT_TYPE, content_type) + .body(Full::new(Bytes::from(contents))) + .unwrap() + } else { + builder.body(Full::default()).unwrap() + }; + + let mut response = self.client.request(request).await.map_err(|e| { + ServiceError::SendError { + reason: e.to_string(), + } + })?; + + match response.status() { + StatusCode::OK => Ok(response), + StatusCode::NO_CONTENT => Ok(response), + StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => { + Err(ServiceError::Unauthorized) + }, + StatusCode::NOT_FOUND => { + // This is 404 and means that e.g. recipient is not registered + Err(ServiceError::NotFoundError) + }, + StatusCode::PAYLOAD_TOO_LARGE => { + // This is 413 and means rate limit exceeded for Signal. + Err(ServiceError::RateLimitExceeded) + }, + StatusCode::CONFLICT => { + let mismatched_devices = + Self::json(&mut response).await.map_err(|e| { + tracing::error!( + "Failed to decode HTTP 409 response: {}", + e + ); + ServiceError::UnhandledResponseCode { + http_code: StatusCode::CONFLICT.as_u16(), + } + })?; + Err(ServiceError::MismatchedDevicesException( + mismatched_devices, + )) + }, + StatusCode::GONE => { + let stale_devices = + Self::json(&mut response).await.map_err(|e| { + tracing::error!( + "Failed to decode HTTP 410 response: {}", + e + ); + ServiceError::UnhandledResponseCode { + http_code: StatusCode::GONE.as_u16(), + } + })?; + Err(ServiceError::StaleDevices(stale_devices)) + }, + StatusCode::LOCKED => { + let locked = Self::json(&mut response).await.map_err(|e| { + tracing::error!( + ?response, + "Failed to decode HTTP 423 response: {}", + e + ); + ServiceError::UnhandledResponseCode { + http_code: StatusCode::LOCKED.as_u16(), + } + })?; + Err(ServiceError::Locked(locked)) + }, + StatusCode::PRECONDITION_REQUIRED => { + let proof_required = + Self::json(&mut response).await.map_err(|e| { + tracing::error!( + "Failed to decode HTTP 428 response: {}", + e + ); + ServiceError::UnhandledResponseCode { + http_code: StatusCode::PRECONDITION_REQUIRED + .as_u16(), + } + })?; + Err(ServiceError::ProofRequiredError(proof_required)) + }, + // XXX: fill in rest from PushServiceSocket + code => { + tracing::trace!( + "Unhandled response {} with body: {}", + code.as_u16(), + Self::text(&mut response).await?, + ); + Err(ServiceError::UnhandledResponseCode { + http_code: code.as_u16(), + }) + }, + } + } + + async fn body( + response: &mut Response, + ) -> Result { + Ok(response + .collect() + .await + .map_err(|e| ServiceError::ResponseError { + reason: format!("failed to aggregate HTTP response body: {e}"), + })? + .aggregate()) + } + + #[tracing::instrument(skip(response), fields(status = %response.status()))] + async fn json( + response: &mut Response, + ) -> Result + where + for<'de> T: Deserialize<'de>, + { + let body = Self::body(response).await?; + + if body.has_remaining() { + serde_json::from_reader(body.reader()) + } else { + serde_json::from_value(serde_json::Value::Null) + } + .map_err(|e| ServiceError::JsonDecodeError { + reason: e.to_string(), + }) + } + + #[tracing::instrument(skip(response), fields(status = %response.status()))] + async fn protobuf( + response: &mut Response, + ) -> Result + where + M: ProtobufMessage + Default, + { + let body = Self::body(response).await?; + M::decode(body).map_err(ServiceError::ProtobufDecodeError) + } + + #[tracing::instrument(skip(response), fields(status = %response.status()))] + async fn text( + response: &mut Response, + ) -> Result { + let body = Self::body(response).await?; + io::read_to_string(body.reader()).map_err(|e| { + ServiceError::ResponseError { + reason: format!("failed to read HTTP response body: {e}"), + } + }) + } +} + +impl PushService { + #[tracing::instrument(skip(self))] + pub(crate) async fn get_json( &mut self, service: Endpoint, path: &str, @@ -622,8 +855,23 @@ pub trait PushService: MaybeSend { credentials_override: HttpAuthOverride, ) -> Result where - for<'de> T: Deserialize<'de>; + for<'de> T: Deserialize<'de>, + { + let mut response = self + .request( + Method::GET, + service, + path, + additional_headers, + credentials_override, + None, + ) + .await?; + Self::json(&mut response).await + } + + #[tracing::instrument(skip(self))] async fn delete_json( &mut self, service: Endpoint, @@ -631,9 +879,24 @@ pub trait PushService: MaybeSend { additional_headers: &[(&str, &str)], ) -> Result where - for<'de> T: Deserialize<'de>; + for<'de> T: Deserialize<'de>, + { + let mut response = self + .request( + Method::DELETE, + service, + path, + additional_headers, + HttpAuthOverride::NoOverride, + None, + ) + .await?; + + Self::json(&mut response).await + } - async fn put_json( + #[tracing::instrument(skip(self, value))] + pub async fn put_json( &mut self, service: Endpoint, path: &str, @@ -643,8 +906,32 @@ pub trait PushService: MaybeSend { ) -> Result where for<'de> D: Deserialize<'de>, - S: MaybeSend + Serialize; + S: Send + Serialize, + { + let json = serde_json::to_vec(&value).map_err(|e| { + ServiceError::JsonDecodeError { + reason: e.to_string(), + } + })?; + + let mut response = self + .request( + Method::PUT, + service, + path, + additional_headers, + credentials_override, + Some(RequestBody { + contents: json, + content_type: "application/json".into(), + }), + ) + .await?; + Self::json(&mut response).await + } + + #[tracing::instrument(skip(self, value))] async fn patch_json( &mut self, service: Endpoint, @@ -655,8 +942,32 @@ pub trait PushService: MaybeSend { ) -> Result where for<'de> D: Deserialize<'de>, - S: MaybeSend + Serialize; + S: Send + Serialize, + { + let json = serde_json::to_vec(&value).map_err(|e| { + ServiceError::JsonDecodeError { + reason: e.to_string(), + } + })?; + + let mut response = self + .request( + Method::PATCH, + service, + path, + additional_headers, + credentials_override, + Some(RequestBody { + contents: json, + content_type: "application/json".into(), + }), + ) + .await?; + Self::json(&mut response).await + } + + #[tracing::instrument(skip(self, value))] async fn post_json( &mut self, service: Endpoint, @@ -667,8 +978,32 @@ pub trait PushService: MaybeSend { ) -> Result where for<'de> D: Deserialize<'de>, - S: MaybeSend + Serialize; + S: Send + Serialize, + { + let json = serde_json::to_vec(&value).map_err(|e| { + ServiceError::JsonDecodeError { + reason: e.to_string(), + } + })?; + + let mut response = self + .request( + Method::POST, + service, + path, + additional_headers, + credentials_override, + Some(RequestBody { + contents: json, + content_type: "application/json".into(), + }), + ) + .await?; + Self::json(&mut response).await + } + + #[tracing::instrument(skip(self))] async fn get_protobuf( &mut self, service: Endpoint, @@ -677,8 +1012,23 @@ pub trait PushService: MaybeSend { credentials_override: HttpAuthOverride, ) -> Result where - T: Default + ProtobufMessage; + T: Default + ProtobufMessage, + { + let mut response = self + .request( + Method::GET, + service, + path, + additional_headers, + credentials_override, + None, + ) + .await?; + + Self::protobuf(&mut response).await + } + #[tracing::instrument(skip(self, value))] async fn put_protobuf( &mut self, service: Endpoint, @@ -688,39 +1038,154 @@ pub trait PushService: MaybeSend { ) -> Result where D: Default + ProtobufMessage, - S: Sized + ProtobufMessage; + S: Sized + ProtobufMessage, + { + let protobuf = value.encode_to_vec(); + + let mut response = self + .request( + Method::PUT, + service, + path, + additional_headers, + HttpAuthOverride::NoOverride, + Some(RequestBody { + contents: protobuf, + content_type: "application/x-protobuf".into(), + }), + ) + .await?; + + Self::protobuf(&mut response).await + } - /// Downloads larger files in streaming fashion, e.g. attachments. + #[tracing::instrument(skip(self))] async fn get_from_cdn( &mut self, cdn_id: u32, path: &str, - ) -> Result; + ) -> Result { + let response = self + .request( + Method::GET, + Endpoint::Cdn(cdn_id), + path, + &[], + HttpAuthOverride::Unidentified, // CDN requests are always without authentication + None, + ) + .await?; - /// Upload larger file to CDN0 in legacy fashion, e.g. for attachments. - /// - /// Implementations are allowed to *panic* when the Read instance throws an IO-Error - async fn post_to_cdn0<'s, C>( + Ok(Box::new( + response + .into_body() + .into_data_stream() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + .into_async_read(), + )) + } + + #[tracing::instrument(skip(self, value, file), fields(file = file.as_ref().map(|_| "")))] + pub async fn post_to_cdn0<'s, C>( &mut self, path: &str, value: &[(&str, &str)], file: Option<(&str, &'s mut C)>, ) -> Result<(), ServiceError> where - C: std::io::Read + Send + 's; + C: io::Read + Send + 's, + { + let mut form = mpart_async::client::MultipartRequest::default(); + + // mpart-async has a peculiar ordering of the form items, + // and Amazon S3 expects them in a very specific order (i.e., the file contents should + // go last. + // + // mpart-async uses a VecDeque internally for ordering the fields in the order given. + // + // https://github.com/cetra3/mpart-async/issues/16 + + for &(k, v) in value { + form.add_field(k, v); + } + + if let Some((filename, file)) = file { + // XXX Actix doesn't cope with none-'static lifetimes + // https://docs.rs/actix-web/3.2.0/actix_web/body/enum.Body.html + let mut buf = Vec::new(); + file.read_to_end(&mut buf) + .expect("infallible Read instance"); + form.add_stream( + "file", + filename, + "application/octet-stream", + futures::future::ok::<_, ()>(Bytes::from(buf)).into_stream(), + ); + } + + let content_type = + format!("multipart/form-data; boundary={}", form.get_boundary()); + + // XXX Amazon S3 needs the Content-Length, but we don't know it without depleting the whole + // stream. Sadly, Content-Length != contents.len(), but should include the whole form. + let mut body_contents = vec![]; + while let Some(b) = form.next().await { + // Unwrap, because no error type was used above + body_contents.extend(b.unwrap()); + } + tracing::trace!( + "Sending PUT with Content-Type={} and length {}", + content_type, + body_contents.len() + ); + + let response = self + .request( + Method::POST, + Endpoint::Cdn(0), + path, + &[], + HttpAuthOverride::NoOverride, + Some(RequestBody { + contents: body_contents, + content_type, + }), + ) + .await?; + + debug!("HyperPushService::PUT response: {:?}", response); - async fn ws( + Ok(()) + } + + pub async fn ws( &mut self, path: &str, keepalive_path: &str, additional_headers: &[(&str, &str)], credentials: Option, - ) -> Result; + ) -> Result { + let span = debug_span!("websocket"); + let (ws, stream) = TungsteniteWebSocket::with_tls_config( + Self::tls_config(&self.cfg), + self.cfg.base_url(Endpoint::Service), + path, + additional_headers, + credentials.as_ref(), + ) + .instrument(span.clone()) + .await?; + let (ws, task) = + SignalWebSocket::from_socket(ws, stream, keepalive_path.to_owned()); + let task = task.instrument(span); + tokio::task::spawn(task); + Ok(ws) + } /// Fetches a list of all devices tied to the authenticated account. /// /// This list include the device that sends the request. - async fn devices(&mut self) -> Result, ServiceError> { + pub async fn devices(&mut self) -> Result, ServiceError> { #[derive(serde::Deserialize)] struct DeviceInfoList { devices: Vec, @@ -738,12 +1203,12 @@ pub trait PushService: MaybeSend { Ok(devices.devices) } - async fn unlink_device(&mut self, id: i64) -> Result<(), ServiceError> { + pub async fn unlink_device(&mut self, id: i64) -> Result<(), ServiceError> { self.delete_json(Endpoint::Service, &format!("/v1/devices/{}", id), &[]) .await } - async fn get_pre_key_status( + pub async fn get_pre_key_status( &mut self, service_id_type: ServiceIdType, ) -> Result { @@ -756,7 +1221,7 @@ pub trait PushService: MaybeSend { .await } - async fn register_pre_keys( + pub async fn register_pre_keys( &mut self, service_id_type: ServiceIdType, pre_key_state: PreKeyState, @@ -776,19 +1241,19 @@ pub trait PushService: MaybeSend { } } - async fn get_attachment_by_id( + pub async fn get_attachment_by_id( &mut self, id: &str, cdn_id: u32, - ) -> Result { + ) -> Result { let path = format!("attachments/{}", id); self.get_from_cdn(cdn_id, &path).await } - async fn get_attachment( + pub async fn get_attachment( &mut self, ptr: &AttachmentPointer, - ) -> Result { + ) -> Result { match ptr.attachment_identifier.as_ref().unwrap() { AttachmentIdentifier::CdnId(id) => { // cdn_number did not exist for this part of the protocol. @@ -803,24 +1268,24 @@ pub trait PushService: MaybeSend { } } - async fn get_sticker_pack_manifest( + pub async fn get_sticker_pack_manifest( &mut self, id: &str, - ) -> Result { + ) -> Result { let path = format!("/stickers/{}/manifest.proto", id); self.get_from_cdn(0, &path).await } - async fn get_sticker( + pub async fn get_sticker( &mut self, pack_id: &str, sticker_id: u32, - ) -> Result { + ) -> Result { let path = format!("/stickers/{}/full/{}", pack_id, sticker_id); self.get_from_cdn(0, &path).await } - async fn send_messages( + pub async fn send_messages( &mut self, messages: OutgoingPushMessages, ) -> Result { @@ -853,7 +1318,7 @@ pub trait PushService: MaybeSend { /// Upload attachment to CDN /// /// Returns attachment ID and the attachment digest - async fn upload_attachment<'s, C>( + pub async fn upload_attachment<'s, C>( &mut self, attrs: &AttachmentV2UploadAttributes, content: &'s mut C, @@ -883,7 +1348,7 @@ pub trait PushService: MaybeSend { Ok((attrs.attachment_id, digester.finalize())) } - async fn get_messages( + pub async fn get_messages( &mut self, allow_stories: bool, ) -> Result, ServiceError> { @@ -902,7 +1367,7 @@ pub trait PushService: MaybeSend { } /// Method used to check our own UUID - async fn whoami(&mut self) -> Result { + pub async fn whoami(&mut self) -> Result { self.get_json( Endpoint::Service, "/v1/accounts/whoami", @@ -912,7 +1377,7 @@ pub trait PushService: MaybeSend { .await } - async fn retrieve_profile_by_id( + pub async fn retrieve_profile_by_id( &mut self, address: ServiceAddress, profile_key: Option, @@ -937,21 +1402,21 @@ pub trait PushService: MaybeSend { .await } - async fn retrieve_profile_avatar( + pub async fn retrieve_profile_avatar( &mut self, path: &str, - ) -> Result { + ) -> Result { self.get_from_cdn(0, path).await } - async fn retrieve_groups_v2_profile_avatar( + pub async fn retrieve_groups_v2_profile_avatar( &mut self, path: &str, - ) -> Result { + ) -> Result { self.get_from_cdn(0, path).await } - async fn get_pre_key( + pub async fn get_pre_key( &mut self, destination: &ServiceAddress, device_id: u32, @@ -974,7 +1439,7 @@ pub trait PushService: MaybeSend { Ok(device.into_bundle(identity)?) } - async fn get_pre_keys( + pub(crate) async fn get_pre_keys( &mut self, destination: &ServiceAddress, device_id: u32, @@ -1000,7 +1465,7 @@ pub trait PushService: MaybeSend { Ok(pre_keys) } - async fn get_group( + pub(crate) async fn get_group( &mut self, credentials: HttpAuth, ) -> Result { @@ -1041,7 +1506,7 @@ pub trait PushService: MaybeSend { Ok(SenderCertificate::deserialize(&cert.certificate)?) } - async fn link_device( + pub async fn link_device( &mut self, link_request: &LinkRequest, http_auth: HttpAuth, @@ -1056,7 +1521,7 @@ pub trait PushService: MaybeSend { .await } - async fn set_account_attributes( + pub async fn set_account_attributes( &mut self, attributes: AccountAttributes, ) -> Result<(), ServiceError> { @@ -1086,7 +1551,7 @@ pub trait PushService: MaybeSend { /// See [`AccountManager`][struct@crate::AccountManager] for a convenience method. /// /// Java equivalent: `writeProfile` - async fn write_profile<'s, C, S>( + pub async fn write_profile<'s, C, S>( &mut self, version: &ProfileKeyVersion, name: &[u8], @@ -1309,7 +1774,7 @@ pub trait PushService: MaybeSend { Ok(res) } - async fn submit_registration_request<'a>( + pub async fn submit_registration_request<'a>( &mut self, registration_method: RegistrationMethod<'a>, account_attributes: AccountAttributes, @@ -1363,7 +1828,7 @@ pub trait PushService: MaybeSend { Ok(res) } - async fn distribute_pni_keys( + pub async fn distribute_pni_keys( &mut self, pni_identity_key: &IdentityKey, device_messages: Vec, @@ -1408,3 +1873,40 @@ pub trait PushService: MaybeSend { Ok(res) } } + +#[cfg(test)] +mod tests { + use crate::configuration::SignalServers; + use bytes::{Buf, Bytes}; + + #[test] + fn create_clients() { + let configs = &[SignalServers::Staging, SignalServers::Production]; + + for cfg in configs { + let _ = super::PushService::new( + cfg, + None, + "libsignal-service test".to_string(), + ); + } + } + + #[test] + fn serde_json_from_empty_reader() { + // This fails, so we have handle empty response body separately in HyperPushService::json() + let bytes: Bytes = "".into(); + assert!( + serde_json::from_reader::, String>( + bytes.reader() + ) + .is_err() + ); + } + + #[test] + fn serde_json_form_empty_vec() { + // If we're trying to send and empty payload, serde_json must be able to make a Vec out of it + assert!(serde_json::to_vec(b"").is_ok()); + } +} diff --git a/libsignal-service/src/receiver.rs b/src/receiver.rs similarity index 96% rename from libsignal-service/src/receiver.rs rename to src/receiver.rs index 5cb121256..2e3986976 100644 --- a/libsignal-service/src/receiver.rs +++ b/src/receiver.rs @@ -11,14 +11,14 @@ use crate::{ /// Equivalent of Java's `SignalServiceMessageReceiver`. #[derive(Clone)] -pub struct MessageReceiver { - service: Service, +pub struct MessageReceiver { + service: PushService, } -impl MessageReceiver { +impl MessageReceiver { // TODO: to avoid providing the wrong service/wrong credentials // change it like LinkingManager or ProvisioningManager - pub fn new(service: Service) -> Self { + pub fn new(service: PushService) -> Self { MessageReceiver { service } } diff --git a/libsignal-service/src/sender.rs b/src/sender.rs similarity index 99% rename from libsignal-service/src/sender.rs rename to src/sender.rs index 99e3d1273..81b8b0005 100644 --- a/libsignal-service/src/sender.rs +++ b/src/sender.rs @@ -84,10 +84,10 @@ pub struct AttachmentSpec { /// Equivalent of Java's `SignalServiceMessageSender`. #[derive(Clone)] -pub struct MessageSender { +pub struct MessageSender { identified_ws: SignalWebSocket, unidentified_ws: SignalWebSocket, - service: Service, + service: PushService, cipher: ServiceCipher, csprng: R, protocol_store: S, @@ -137,9 +137,8 @@ pub enum ThreadIdentifier { Group(GroupV2Id), } -impl MessageSender +impl MessageSender where - Service: PushService, S: ProtocolStore + SenderKeyStore + SessionStoreExt + Sync + Clone, R: Rng + CryptoRng, { @@ -147,7 +146,7 @@ where pub fn new( identified_ws: SignalWebSocket, unidentified_ws: SignalWebSocket, - service: Service, + service: PushService, cipher: ServiceCipher, csprng: R, protocol_store: S, diff --git a/libsignal-service/src/service_address.rs b/src/service_address.rs similarity index 100% rename from libsignal-service/src/service_address.rs rename to src/service_address.rs diff --git a/libsignal-service/src/session_store.rs b/src/session_store.rs similarity index 100% rename from libsignal-service/src/session_store.rs rename to src/session_store.rs diff --git a/libsignal-service/src/sticker_cipher.rs b/src/sticker_cipher.rs similarity index 100% rename from libsignal-service/src/sticker_cipher.rs rename to src/sticker_cipher.rs diff --git a/libsignal-service/src/timestamp.rs b/src/timestamp.rs similarity index 100% rename from libsignal-service/src/timestamp.rs rename to src/timestamp.rs diff --git a/libsignal-service/src/unidentified_access.rs b/src/unidentified_access.rs similarity index 100% rename from libsignal-service/src/unidentified_access.rs rename to src/unidentified_access.rs diff --git a/libsignal-service/src/utils.rs b/src/utils.rs similarity index 100% rename from libsignal-service/src/utils.rs rename to src/utils.rs diff --git a/libsignal-service/src/websocket/attachment_service.rs b/src/websocket/attachment_service.rs similarity index 100% rename from libsignal-service/src/websocket/attachment_service.rs rename to src/websocket/attachment_service.rs diff --git a/libsignal-service/src/websocket.rs b/src/websocket/mod.rs similarity index 99% rename from libsignal-service/src/websocket.rs rename to src/websocket/mod.rs index a998cfbcd..26f3f4b3e 100644 --- a/libsignal-service/src/websocket.rs +++ b/src/websocket/mod.rs @@ -21,6 +21,7 @@ use crate::push_service::{MismatchedDevices, ServiceError}; mod attachment_service; mod sender; +pub(crate) mod tungstenite; type RequestStreamItem = ( WebSocketRequestMessage, diff --git a/libsignal-service/src/websocket/sender.rs b/src/websocket/sender.rs similarity index 100% rename from libsignal-service/src/websocket/sender.rs rename to src/websocket/sender.rs diff --git a/libsignal-service-hyper/src/websocket.rs b/src/websocket/tungstenite.rs similarity index 80% rename from libsignal-service-hyper/src/websocket.rs rename to src/websocket/tungstenite.rs index 89adf3ff9..ae40caf09 100644 --- a/libsignal-service-hyper/src/websocket.rs +++ b/src/websocket/tungstenite.rs @@ -14,23 +14,16 @@ use tokio::time::Instant; use tokio_rustls::rustls; use url::Url; -use libsignal_service::{ +use crate::{ configuration::ServiceCredentials, - messagepipe::*, push_service::{self, ServiceError}, - MaybeSend, }; -// This weird one-time trait is required because MaybeSend, unlike Send, is not -// an auto trait. Only auto traits can be used as additional traits in a trait object. -trait MaybeSendSink: Sink + MaybeSend {} -impl MaybeSendSink for T where - T: Sink + MaybeSend -{ -} +use crate::messagepipe::{WebSocketService, WebSocketStreamItem}; pub struct TungsteniteWebSocket { - socket_sink: Box, + socket_sink: + Box + Send + Unpin>, } #[derive(thiserror::Error, Debug)] @@ -57,33 +50,6 @@ impl From for ServiceError { } } -// impl From for ServiceError { -// fn from(e: AwcWebSocketError) -> ServiceError { -// match e { -// AwcWebSocketError::ConnectionError(e) => match e { -// WsClientError::InvalidResponseStatus(s) => match s { -// StatusCode::FORBIDDEN => ServiceError::Unauthorized, -// s => ServiceError::WsError { -// reason: format!("HTTP status {}", s), -// }, -// }, -// e => ServiceError::WsError { -// reason: e.to_string(), -// }, -// }, -// } -// } -// } - -// impl From for AwcWebSocketError { -// fn from(e: WsProtocolError) -> AwcWebSocketError { -// todo!("error conversion {:?}", e) -// // return Some(Err(ServiceError::WsError { -// // reason: e.to_string(), -// // })); -// } -// } - // Process the WebSocket, until it times out. async fn process( socket_stream: S, @@ -222,8 +188,7 @@ impl TungsteniteWebSocket { } } -#[cfg_attr(feature = "unsend-futures", async_trait::async_trait(?Send))] -#[cfg_attr(not(feature = "unsend-futures"), async_trait::async_trait)] +#[async_trait::async_trait] impl WebSocketService for TungsteniteWebSocket { type Stream = Receiver;