From e0d9db2cc26eaeace07d434383b119d7dbee3f8a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 5 Jul 2024 18:23:28 +0100 Subject: [PATCH 1/3] crypto: Factor out helper method in CryptoStoreWrapper Let's stop duplicating this code --- .../src/store/crypto_store_wrapper.rs | 56 ++++++++----------- 1 file changed, 23 insertions(+), 33 deletions(-) diff --git a/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs b/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs index 9510d4595c4..8e4b8bf90f7 100644 --- a/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs +++ b/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs @@ -1,4 +1,4 @@ -use std::{ops::Deref, sync::Arc}; +use std::{future, ops::Deref, sync::Arc}; use futures_core::Stream; use futures_util::StreamExt; @@ -131,38 +131,14 @@ impl CryptoStoreWrapper { /// logged and items will be dropped. pub fn room_keys_received_stream(&self) -> impl Stream> { let stream = BroadcastStream::new(self.room_keys_received_sender.subscribe()); - - // the raw BroadcastStream gives us Results which can fail with - // BroadcastStreamRecvError if the reader falls behind. That's annoying to work - // with, so here we just drop the errors. - stream.filter_map(|result| async move { - match result { - Ok(r) => Some(r), - Err(BroadcastStreamRecvError::Lagged(lag)) => { - warn!("room_keys_received_stream missed {lag} updates"); - None - } - } - }) + Self::filter_errors_out_of_stream(stream, "room_keys_received_stream") } /// Receive notifications of gossipped secrets being received and stored in /// the secret inbox as a [`Stream`]. pub fn secrets_stream(&self) -> impl Stream { let stream = BroadcastStream::new(self.secrets_broadcaster.subscribe()); - - // the raw BroadcastStream gives us Results which can fail with - // BroadcastStreamRecvError if the reader falls behind. That's annoying to work - // with, so here we just drop the errors. - stream.filter_map(|result| async move { - match result { - Ok(r) => Some(r), - Err(BroadcastStreamRecvError::Lagged(lag)) => { - warn!("secrets_stream missed {lag} updates"); - None - } - } - }) + Self::filter_errors_out_of_stream(stream, "secrets_stream") } /// Returns a stream of newly created or updated cryptographic identities. @@ -173,17 +149,31 @@ impl CryptoStoreWrapper { &self, ) -> impl Stream, IdentityChanges, DeviceChanges)> { let stream = BroadcastStream::new(self.identities_broadcaster.subscribe()); + Self::filter_errors_out_of_stream(stream, "identities_stream") + } - // See the comment in the [`Store::room_keys_received_stream()`] on why we're - // ignoring the lagged error. - stream.filter_map(|result| async move { - match result { + /// Helper for *_stream functions: filters errors out of the stream, + /// creating a new Stream. + /// + /// `BroadcastStream`s gives us `Result`s which can fail with + /// `BroadcastStreamRecvError` if the reader falls behind. That's annoying + /// to work with, so here we just emit a warning and drop the errors. + fn filter_errors_out_of_stream( + stream: BroadcastStream, + stream_name: &str, + ) -> impl Stream + where + ItemType: 'static + Clone + Send, + { + let stream_name = stream_name.to_owned(); + stream.filter_map(move |result| { + future::ready(match result { Ok(r) => Some(r), Err(BroadcastStreamRecvError::Lagged(lag)) => { - warn!("devices_stream missed {lag} updates"); + warn!("{stream_name} missed {lag} updates"); None } - } + }) }) } From e900d6480e778c9e5d793f53ac7c7fe92ab28062 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 5 Jul 2024 18:24:12 +0100 Subject: [PATCH 2/3] crypto: Expose new stream about room_key withheld messages --- crates/matrix-sdk-crypto/CHANGELOG.md | 4 +++ crates/matrix-sdk-crypto/src/machine.rs | 22 ++++++++++++- .../src/store/crypto_store_wrapper.rs | 33 +++++++++++++++++++ crates/matrix-sdk-crypto/src/store/mod.rs | 14 ++++++++ 4 files changed, 72 insertions(+), 1 deletion(-) diff --git a/crates/matrix-sdk-crypto/CHANGELOG.md b/crates/matrix-sdk-crypto/CHANGELOG.md index b79a354c4e3..7ced4886c82 100644 --- a/crates/matrix-sdk-crypto/CHANGELOG.md +++ b/crates/matrix-sdk-crypto/CHANGELOG.md @@ -60,6 +60,10 @@ Deprecations: Additions: +- Expose new method `OlmMachine::room_keys_withheld_received_stream`, to allow + applications to receive notifications about received `m.room_key.withheld` + events. + ([#3660](https://github.com/matrix-org/matrix-rust-sdk/pull/3660)) - Expose new method `OlmMachine::clear_crypto_cache()`, with FFI bindings ([#3462](https://github.com/matrix-org/matrix-rust-sdk/pull/3462)) diff --git a/crates/matrix-sdk-crypto/src/machine.rs b/crates/matrix-sdk-crypto/src/machine.rs index 95822e77454..6ce8d91425a 100644 --- a/crates/matrix-sdk-crypto/src/machine.rs +++ b/crates/matrix-sdk-crypto/src/machine.rs @@ -2428,7 +2428,9 @@ pub(crate) mod tests { types::{ events::{ room::encrypted::{EncryptedToDeviceEvent, ToDeviceEncryptedEventContent}, - room_key_withheld::{RoomKeyWithheldContent, WithheldCode}, + room_key_withheld::{ + MegolmV1AesSha2WithheldContent, RoomKeyWithheldContent, WithheldCode, + }, ToDeviceEvent, }, CrossSigningKey, DeviceKeys, EventEncryptionAlgorithm, SignedKey, SigningKeys, @@ -3241,6 +3243,9 @@ pub(crate) mod tests { get_machine_pair_with_setup_sessions_test_helper(alice_id(), user_id(), false).await; let room_id = room_id!("!test:example.org"); + let mut room_keys_withheld_received_stream = + Box::pin(bob.store().room_keys_withheld_received_stream()); + let encryption_settings = EncryptionSettings::default(); let encryption_settings = EncryptionSettings { sharing_strategy: CollectStrategy::new_device_based(true), @@ -3280,6 +3285,21 @@ pub(crate) mod tests { .await .unwrap(); + // We should receive a notification on the room_keys_withheld_received_stream + let withheld_received = room_keys_withheld_received_stream + .next() + .now_or_never() + .flatten() + .expect("We should have received a notification of room key being withheld"); + assert_eq!(withheld_received.len(), 1); + assert_matches!( + &withheld_received[0].content, + RoomKeyWithheldContent::MegolmV1AesSha2(MegolmV1AesSha2WithheldContent::Unverified( + unverified_withheld_content + )) + ); + assert_eq!(unverified_withheld_content.room_id, room_id); + let plaintext = "You shouldn't be able to decrypt that message"; let content = RoomMessageEventContent::text_plain(plaintext); diff --git a/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs b/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs index 8e4b8bf90f7..5ac800ddd13 100644 --- a/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs +++ b/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs @@ -13,6 +13,7 @@ use crate::{ olm::InboundGroupSession, store, store::{Changes, DynCryptoStore, IntoCryptoStore, RoomKeyInfo}, + types::events::room_key_withheld::RoomKeyWithheldEvent, GossippedSecret, ReadOnlyOwnUserIdentity, }; @@ -29,6 +30,10 @@ pub(crate) struct CryptoStoreWrapper { /// an update to an inbound group session. room_keys_received_sender: broadcast::Sender>, + /// The sender side of a broadcast stream that is notified whenever we + /// receive an `m.room_key.withheld` message. + room_keys_withheld_received_sender: broadcast::Sender>, + /// The sender side of a broadcast channel which sends out secrets we /// received as a `m.secret.send` event. secrets_broadcaster: broadcast::Sender, @@ -42,6 +47,7 @@ pub(crate) struct CryptoStoreWrapper { impl CryptoStoreWrapper { pub(crate) fn new(user_id: &UserId, store: impl IntoCryptoStore) -> Self { let room_keys_received_sender = broadcast::Sender::new(10); + let room_keys_withheld_received_sender = broadcast::Sender::new(10); let secrets_broadcaster = broadcast::Sender::new(10); // The identities broadcaster is responsible for user identities as well as // devices, that's why we increase the capacity here. @@ -51,6 +57,7 @@ impl CryptoStoreWrapper { user_id: user_id.to_owned(), store: store.into_crypto_store(), room_keys_received_sender, + room_keys_withheld_received_sender, secrets_broadcaster, identities_broadcaster, } @@ -68,6 +75,13 @@ impl CryptoStoreWrapper { let room_key_updates: Vec<_> = changes.inbound_group_sessions.iter().map(RoomKeyInfo::from).collect(); + let withheld_session_updates: Vec<_> = changes + .withheld_session_info + .values() + .flat_map(|session_map| session_map.values()) + .map(Clone::clone) + .collect(); + let secrets = changes.secrets.to_owned(); let devices = changes.devices.to_owned(); let identities = changes.identities.to_owned(); @@ -79,6 +93,10 @@ impl CryptoStoreWrapper { let _ = self.room_keys_received_sender.send(room_key_updates); } + if !withheld_session_updates.is_empty() { + let _ = self.room_keys_withheld_received_sender.send(withheld_session_updates); + } + for secret in secrets { let _ = self.secrets_broadcaster.send(secret); } @@ -134,6 +152,21 @@ impl CryptoStoreWrapper { Self::filter_errors_out_of_stream(stream, "room_keys_received_stream") } + /// Receive notifications of received `m.room_key.withheld` messages. + /// + /// Each time an `m.room_key.withheld` is received and stored, an update + /// will be sent to the stream. Updates that happen at the same time are + /// batched into a [`Vec`]. + /// + /// If the reader of the stream lags too far behind, a warning will be + /// logged and items will be dropped. + pub fn room_keys_withheld_received_stream( + &self, + ) -> impl Stream> { + let stream = BroadcastStream::new(self.room_keys_withheld_received_sender.subscribe()); + Self::filter_errors_out_of_stream(stream, "room_keys_withheld_received_stream") + } + /// Receive notifications of gossipped secrets being received and stored in /// the secret inbox as a [`Stream`]. pub fn secrets_stream(&self) -> impl Stream { diff --git a/crates/matrix-sdk-crypto/src/store/mod.rs b/crates/matrix-sdk-crypto/src/store/mod.rs index 60a7c0e3694..4ed122a132e 100644 --- a/crates/matrix-sdk-crypto/src/store/mod.rs +++ b/crates/matrix-sdk-crypto/src/store/mod.rs @@ -1463,6 +1463,20 @@ impl Store { self.inner.store.room_keys_received_stream() } + /// Receive notifications of received `m.room_key.withheld` messages. + /// + /// Each time an `m.room_key.withheld` is received and stored, an update + /// will be sent to the stream. Updates that happen at the same time are + /// batched into a [`Vec`]. + /// + /// If the reader of the stream lags too far behind, a warning will be + /// logged and items will be dropped. + pub fn room_keys_withheld_received_stream( + &self, + ) -> impl Stream> { + self.inner.store.room_keys_withheld_received_stream() + } + /// Returns a stream of user identity updates, allowing users to listen for /// notifications about new or changed user identities. /// From 6b38e9906e3ed687a7f7c7309ad97fb8db295496 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 9 Jul 2024 14:59:49 +0100 Subject: [PATCH 3/3] Address review comments --- crates/matrix-sdk-crypto/src/machine.rs | 6 +++--- crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs | 3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/crates/matrix-sdk-crypto/src/machine.rs b/crates/matrix-sdk-crypto/src/machine.rs index 6ce8d91425a..10dafe93c86 100644 --- a/crates/matrix-sdk-crypto/src/machine.rs +++ b/crates/matrix-sdk-crypto/src/machine.rs @@ -2372,7 +2372,7 @@ pub(crate) mod tests { }; use assert_matches2::{assert_let, assert_matches}; - use futures_util::{FutureExt, StreamExt}; + use futures_util::{pin_mut, FutureExt, StreamExt}; use itertools::Itertools; use matrix_sdk_common::deserialized_responses::{ DeviceLinkProblem, ShieldState, UnableToDecryptInfo, UnsignedDecryptionResult, @@ -3243,8 +3243,8 @@ pub(crate) mod tests { get_machine_pair_with_setup_sessions_test_helper(alice_id(), user_id(), false).await; let room_id = room_id!("!test:example.org"); - let mut room_keys_withheld_received_stream = - Box::pin(bob.store().room_keys_withheld_received_stream()); + let room_keys_withheld_received_stream = bob.store().room_keys_withheld_received_stream(); + pin_mut!(room_keys_withheld_received_stream); let encryption_settings = EncryptionSettings::default(); let encryption_settings = EncryptionSettings { diff --git a/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs b/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs index 5ac800ddd13..da1884cd4de 100644 --- a/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs +++ b/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs @@ -78,8 +78,7 @@ impl CryptoStoreWrapper { let withheld_session_updates: Vec<_> = changes .withheld_session_info .values() - .flat_map(|session_map| session_map.values()) - .map(Clone::clone) + .flat_map(|session_map| session_map.values().cloned()) .collect(); let secrets = changes.secrets.to_owned();