diff --git a/crates/matrix-sdk-crypto/src/machine/tests/megolm_sender_data.rs b/crates/matrix-sdk-crypto/src/machine/tests/megolm_sender_data.rs index 0c91b97471a..95d246b3301 100644 --- a/crates/matrix-sdk-crypto/src/machine/tests/megolm_sender_data.rs +++ b/crates/matrix-sdk-crypto/src/machine/tests/megolm_sender_data.rs @@ -1,18 +1,16 @@ -/* -Copyright 2024 The Matrix.org Foundation C.I.C. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ +// Copyright 2024 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. use std::{fmt::Debug, iter, pin::Pin}; @@ -23,6 +21,7 @@ use matrix_sdk_test::async_test; use ruma::{room_id, user_id, RoomId, TransactionId, UserId}; use serde::Serialize; use serde_json::json; +use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use crate::{ machine::{ @@ -305,13 +304,16 @@ where /// Given the `room_keys_received_stream`, check that there is a pending update, /// and pop it. fn get_room_key_received_update( - room_keys_received_stream: &mut Pin>>>, + room_keys_received_stream: &mut Pin< + Box, BroadcastStreamRecvError>>>, + >, ) -> RoomKeyInfo { room_keys_received_stream .next() .now_or_never() .flatten() .expect("We should have received an update of room key infos") + .unwrap() .pop() .expect("Received an empty room key info update") } diff --git a/crates/matrix-sdk-crypto/src/machine/tests/mod.rs b/crates/matrix-sdk-crypto/src/machine/tests/mod.rs index 86c2526e4b9..09ea5107254 100644 --- a/crates/matrix-sdk-crypto/src/machine/tests/mod.rs +++ b/crates/matrix-sdk-crypto/src/machine/tests/mod.rs @@ -530,7 +530,8 @@ async fn test_megolm_encryption() { .next() .now_or_never() .flatten() - .expect("We should have received an update of room key infos"); + .expect("We should have received an update of room key infos") + .unwrap(); assert_eq!(room_keys.len(), 1); assert_eq!(room_keys[0].session_id, group_session.session_id()); diff --git a/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs b/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs index 609c5fe8262..e7f40800095 100644 --- a/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs +++ b/crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs @@ -4,7 +4,10 @@ use futures_core::Stream; use futures_util::StreamExt; use matrix_sdk_common::store_locks::CrossProcessStoreLock; use ruma::{DeviceId, OwnedDeviceId, OwnedUserId, UserId}; -use tokio::sync::{broadcast, Mutex}; +use tokio::sync::{ + broadcast::{self}, + Mutex, +}; use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; use tracing::{debug, trace, warn}; @@ -292,11 +295,12 @@ impl CryptoStoreWrapper { /// the stream. Updates that happen at the same time are batched into a /// [`Vec`]. /// - /// If the reader of the stream lags too far behind, a warning will be - /// logged and items will be dropped. - pub fn room_keys_received_stream(&self) -> impl Stream> { - let stream = BroadcastStream::new(self.room_keys_received_sender.subscribe()); - Self::filter_errors_out_of_stream(stream, "room_keys_received_stream") + /// If the reader of the stream lags too far behind an error will be sent to + /// the reader. + pub fn room_keys_received_stream( + &self, + ) -> impl Stream, BroadcastStreamRecvError>> { + BroadcastStream::new(self.room_keys_received_sender.subscribe()) } /// Receive notifications of received `m.room_key.withheld` messages. diff --git a/crates/matrix-sdk-crypto/src/store/mod.rs b/crates/matrix-sdk-crypto/src/store/mod.rs index 631e3a33355..662829f8c35 100644 --- a/crates/matrix-sdk-crypto/src/store/mod.rs +++ b/crates/matrix-sdk-crypto/src/store/mod.rs @@ -57,6 +57,7 @@ use ruma::{ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use thiserror::Error; use tokio::sync::{Mutex, MutexGuard, Notify, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock}; +use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use tracing::{info, warn}; use vodozemac::{base64_encode, megolm::SessionOrdering, Curve25519PublicKey}; use zeroize::{Zeroize, ZeroizeOnDrop}; @@ -1593,12 +1594,14 @@ impl Store { /// the stream. Updates that happen at the same time are batched into a /// [`Vec`]. /// - /// If the reader of the stream lags too far behind, a warning will be - /// logged and items will be dropped. + /// If the reader of the stream lags too far behind an error will be sent to + /// the reader. /// /// The stream will terminate once all references to the underlying /// `CryptoStoreWrapper` are dropped. - pub fn room_keys_received_stream(&self) -> impl Stream> { + pub fn room_keys_received_stream( + &self, + ) -> impl Stream, BroadcastStreamRecvError>> { self.inner.store.room_keys_received_stream() } @@ -2043,7 +2046,8 @@ mod tests { .next() .now_or_never() .flatten() - .expect("We should have received an update of room key infos"); + .expect("We should have received an update of room key infos") + .unwrap(); assert_eq!(room_keys.len(), 1); assert_eq!(room_keys[0].room_id, "!room1:localhost"); } diff --git a/crates/matrix-sdk-ui/src/room_list_service/room_list.rs b/crates/matrix-sdk-ui/src/room_list_service/room_list.rs index 8056c14dac8..99d8fa10bb0 100644 --- a/crates/matrix-sdk-ui/src/room_list_service/room_list.rs +++ b/crates/matrix-sdk-ui/src/room_list_service/room_list.rs @@ -254,7 +254,7 @@ fn merge_stream_and_receiver( /// When a [`RoomList`] is displayed to the user, it can be in various states. /// This enum tries to represent those states with a correct level of /// abstraction. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub enum RoomListLoadingState { /// The [`RoomList`] has not been loaded yet, i.e. a sync might run /// or not run at all, there is nothing to show in this `RoomList` yet. diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index b3e7ec8cd01..44bba18431f 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -23,6 +23,7 @@ use matrix_sdk::{ }; use ruma::{events::AnySyncTimelineEvent, RoomVersionId}; use tokio::sync::broadcast::error::RecvError; +use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use tracing::{info, info_span, trace, warn, Instrument, Span}; use super::{ @@ -433,6 +434,47 @@ impl TimelineBuilder { }) }; + // TODO: Technically, this should be the only stream we need to listen to get + // notified when we should retry to decrypt an event. We sadly can't do that, + // since the cross-process support kills the `OlmMachine` which then in + // turn kills this stream. Once this is solved remove all the other ways we + // listen for room keys. + let room_keys_received_join_handle = { + let inner = controller.clone(); + let stream = client.encryption().room_keys_received_stream().await.expect( + "We should be logged in by now, so we should have access to an OlmMachine \ + to be able to listen to this stream", + ); + + spawn(async move { + pin_mut!(stream); + + while let Some(room_keys) = stream.next().await { + let session_ids = match room_keys { + Ok(room_keys) => { + let session_ids: BTreeSet = room_keys + .into_iter() + .filter(|info| info.room_id == inner.room().room_id()) + .map(|info| info.session_id) + .collect(); + + Some(session_ids) + } + Err(BroadcastStreamRecvError::Lagged(missed_updates)) => { + // We lagged, let's retry to decrypt anything we have, maybe something + // was received. + warn!(missed_updates, "The room keys stream has lagged, retrying to decrypt the whole timeline"); + + None + } + }; + + let room = inner.room(); + inner.retry_event_decryption(room, session_ids).await; + } + }) + }; + let timeline = Timeline { controller, event_cache: room_event_cache, @@ -443,6 +485,7 @@ impl TimelineBuilder { pinned_events_join_handle, room_key_from_backups_join_handle, room_key_backup_enabled_join_handle, + room_keys_received_join_handle, local_echo_listener_handle, _event_cache_drop_handle: event_cache_drop, encryption_changes_handle, diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index 9a9623e3f02..927180093cc 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -832,6 +832,7 @@ struct TimelineDropHandle { room_update_join_handle: JoinHandle<()>, pinned_events_join_handle: Option>, room_key_from_backups_join_handle: JoinHandle<()>, + room_keys_received_join_handle: JoinHandle<()>, room_key_backup_enabled_join_handle: JoinHandle<()>, local_echo_listener_handle: JoinHandle<()>, _event_cache_drop_handle: Arc, @@ -852,6 +853,7 @@ impl Drop for TimelineDropHandle { self.room_update_join_handle.abort(); self.room_key_from_backups_join_handle.abort(); self.room_key_backup_enabled_join_handle.abort(); + self.room_keys_received_join_handle.abort(); self.encryption_changes_handle.abort(); } } diff --git a/crates/matrix-sdk/src/encryption/mod.rs b/crates/matrix-sdk/src/encryption/mod.rs index 78376cf99f8..2344fbc7779 100644 --- a/crates/matrix-sdk/src/encryption/mod.rs +++ b/crates/matrix-sdk/src/encryption/mod.rs @@ -31,6 +31,7 @@ use futures_util::{ stream::{self, StreamExt}, }; use matrix_sdk_base::crypto::{ + store::RoomKeyInfo, types::requests::{ OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest, }, @@ -58,6 +59,7 @@ use ruma::{ }; use serde::Deserialize; use tokio::sync::{Mutex, RwLockReadGuard}; +use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use tracing::{debug, error, instrument, trace, warn}; use url::Url; use vodozemac::Curve25519PublicKey; @@ -1444,6 +1446,45 @@ impl Encryption { Ok(ret) } + /// Receive notifications of room keys being received as a [`Stream`]. + /// + /// Each time a room key is updated in any way, an update will be sent to + /// the stream. Updates that happen at the same time are batched into a + /// [`Vec`]. + /// + /// If the reader of the stream lags too far behind, an error is broadcast + /// containing the number of skipped items. + /// + /// # Examples + /// + /// ```no_run + /// # use matrix_sdk::Client; + /// # use url::Url; + /// # async { + /// # let homeserver = Url::parse("http://example.com")?; + /// # let client = Client::new(homeserver).await?; + /// use futures_util::StreamExt; + /// + /// let Some(mut room_keys_stream) = + /// client.encryption().room_keys_received_stream().await + /// else { + /// return Ok(()); + /// }; + /// + /// while let Some(update) = room_keys_stream.next().await { + /// println!("Received room keys {update:?}"); + /// } + /// # anyhow::Ok(()) }; + /// ``` + pub async fn room_keys_received_stream( + &self, + ) -> Option, BroadcastStreamRecvError>>> { + let olm = self.client.olm_machine().await; + let olm = olm.as_ref()?; + + Some(olm.store().room_keys_received_stream()) + } + /// Get the secret storage manager of the client. pub fn secret_storage(&self) -> SecretStorage { SecretStorage { client: self.client.to_owned() } diff --git a/testing/matrix-sdk-integration-testing/src/tests/timeline.rs b/testing/matrix-sdk-integration-testing/src/tests/timeline.rs index 531ca7a1d21..0b63cfb1c26 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/timeline.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/timeline.rs @@ -25,16 +25,20 @@ use matrix_sdk::{ encryption::{backups::BackupState, EncryptionSettings}, room::edit::EditedContent, ruma::{ - api::client::room::create_room::v3::Request as CreateRoomRequest, + api::client::room::create_room::v3::{Request as CreateRoomRequest, RoomPreset}, events::{ room::{encryption::RoomEncryptionEventContent, message::RoomMessageEventContent}, InitialStateEvent, }, MilliSecondsSinceUnixEpoch, }, + RoomState, }; -use matrix_sdk_ui::timeline::{ - EventSendState, ReactionStatus, RoomExt, TimelineItem, TimelineItemContent, +use matrix_sdk_ui::{ + notification_client::NotificationClient, + room_list_service::RoomListLoadingState, + sync_service::SyncService, + timeline::{EventSendState, ReactionStatus, RoomExt, TimelineItem, TimelineItemContent}, }; use similar_asserts::assert_eq; use tokio::{ @@ -357,7 +361,7 @@ async fn test_enabling_backups_retries_decryption() { .create_room(assign!(CreateRoomRequest::new(), { is_direct: true, initial_state, - preset: Some(matrix_sdk::ruma::api::client::room::create_room::v3::RoomPreset::PrivateChat) + preset: Some(RoomPreset::PrivateChat) })) .await .unwrap(); @@ -462,3 +466,165 @@ async fn test_enabling_backups_retries_decryption() { bob_sync.abort(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_room_keys_received_on_notification_client_trigger_redecryption() { + let alice = TestClientBuilder::new("alice").use_sqlite().build().await.unwrap(); + alice.encryption().wait_for_e2ee_initialization_tasks().await; + + // Set up sync for user Alice, and create a room. + let alice_sync = spawn({ + let alice = alice.clone(); + async move { + alice.sync(Default::default()).await.expect("sync failed!"); + } + }); + + debug!("Creating the room…"); + + // The room needs to be encrypted. + let initial_state = + vec![InitialStateEvent::new(RoomEncryptionEventContent::with_recommended_defaults()) + .to_raw_any()]; + + let alice_room = alice + .create_room(assign!(CreateRoomRequest::new(), { + is_direct: true, + initial_state, + preset: Some(RoomPreset::PrivateChat) + })) + .await + .unwrap(); + + assert!(alice_room + .is_encrypted() + .await + .expect("We should be able to check that the room is encrypted")); + + // Now here comes bob. + let bob = TestClientBuilder::new("bob").use_sqlite().build().await.unwrap(); + bob.encryption().wait_for_e2ee_initialization_tasks().await; + + debug!("Inviting Bob."); + + alice_room + .invite_user_by_id(bob.user_id().expect("We should have access to bob's user ID")) + .await + .expect("We should be able to invite Bob to the room"); + + // Sync once to get access to the room. + let sync_service = SyncService::builder(bob.clone()).build().await.expect("Wat"); + sync_service.start().await; + + let bob_rooms = sync_service + .room_list_service() + .all_rooms() + .await + .expect("We should be able to get the room"); + + debug!("Waiting for the room list to load"); + let wait_for_room_list_load = async { + while let Some(state) = bob_rooms.loading_state().next().await { + if let RoomListLoadingState::Loaded { .. } = state { + break; + } + } + }; + + timeout(Duration::from_secs(5), wait_for_room_list_load) + .await + .expect("We should be able to load the room list"); + + // Bob joins the room. + let bob_room = + bob.get_room(alice_room.room_id()).expect("We should have access to the room now"); + bob_room.join().await.expect("Bob should be able to join the room"); + + debug!("Bob joined the room"); + assert_eq!(bob_room.state(), RoomState::Joined); + assert!(bob_room.is_encrypted().await.unwrap()); + + // Let's stop the sync so we don't receive the room key using the usual channel. + sync_service.stop().await.expect("We should be able to stop the sync service"); + + debug!("Alice sends the message"); + let event_id = alice_room + .send(RoomMessageEventContent::text_plain("It's a secret to everybody!")) + .await + .expect("We should be able to send a message to our new room") + .event_id; + + // We don't need Alice anymore. + alice_sync.abort(); + + // Let's get the timeline and backpaginate to load the event. + let mut timeline = + bob_room.timeline().await.expect("We should be able to get a timeline for our room"); + + let mut item = None; + + for _ in 0..10 { + timeline + .paginate_backwards(50) + .await + .expect("We should be able to paginate the timeline to fetch the history"); + + if let Some(timeline_item) = timeline.item_by_event_id(&event_id).await { + item = Some(timeline_item); + break; + } else { + timeline = bob_room.timeline().await.expect("We should be able to reset our timeline"); + sleep(Duration::from_millis(100)).await + } + } + + let item = item.expect("The event should be in the timeline by now"); + + // The event is not decrypted yet. + assert_matches!(item.content(), TimelineItemContent::UnableToDecrypt(_)); + + // Let's subscribe to our timeline so we don't miss the transition from UTD to + // decrypted event. + let (_, mut stream) = timeline + .subscribe_filter_map(|item| { + item.as_event().cloned().filter(|item| item.event_id() == Some(&event_id)) + }) + .await; + + // Now we create a notification client. + let notification_client = bob + .notification_client("BOB_NOTIFICATION_CLIENT".to_owned()) + .await + .expect("We should be able to build a notification client"); + + // This syncs and fetches the room key. + debug!("The notification client syncs"); + let notification_client = NotificationClient::new( + notification_client, + matrix_sdk_ui::notification_client::NotificationProcessSetup::SingleProcess { + sync_service: sync_service.into(), + }, + ) + .await + .expect("We should be able to build a notification client"); + + let _ = notification_client + .get_notification(bob_room.room_id(), &event_id) + .await + .expect("We should be able toe get a notification item for the given event"); + + // Alright, we should now receive an update that the event had been decrypted. + let _vector_diff = timeout(Duration::from_secs(5), stream.next()).await.unwrap().unwrap(); + + // Let's fetch the event again. + let item = + timeline.item_by_event_id(&event_id).await.expect("The event should be in the timeline"); + + // Yup it's decrypted great. + assert_let!( + TimelineItemContent::Message(message) = item.content(), + "The event should have been decrypted now" + ); + + assert_eq!(message.body(), "It's a secret to everybody!"); +}