Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

crypto: Expose new stream about room_key withheld messages #3660

Merged
merged 3 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/matrix-sdk-crypto/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ Deprecations:

Additions:

- Expose new method `OlmMachine::room_keys_withheld_received_stream`, to allow
applications to receive notifications about received `m.room_key.withheld`
events.
([#3660](https://github.com/matrix-org/matrix-rust-sdk/pull/3660))

- Expose new method `OlmMachine::clear_crypto_cache()`, with FFI bindings
([#3462](https://github.com/matrix-org/matrix-rust-sdk/pull/3462))
Expand Down
24 changes: 22 additions & 2 deletions crates/matrix-sdk-crypto/src/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2372,7 +2372,7 @@ pub(crate) mod tests {
};

use assert_matches2::{assert_let, assert_matches};
use futures_util::{FutureExt, StreamExt};
use futures_util::{pin_mut, FutureExt, StreamExt};
use itertools::Itertools;
use matrix_sdk_common::deserialized_responses::{
DeviceLinkProblem, ShieldState, UnableToDecryptInfo, UnsignedDecryptionResult,
Expand Down Expand Up @@ -2428,7 +2428,9 @@ pub(crate) mod tests {
types::{
events::{
room::encrypted::{EncryptedToDeviceEvent, ToDeviceEncryptedEventContent},
room_key_withheld::{RoomKeyWithheldContent, WithheldCode},
room_key_withheld::{
MegolmV1AesSha2WithheldContent, RoomKeyWithheldContent, WithheldCode,
},
ToDeviceEvent,
},
CrossSigningKey, DeviceKeys, EventEncryptionAlgorithm, SignedKey, SigningKeys,
Expand Down Expand Up @@ -3241,6 +3243,9 @@ pub(crate) mod tests {
get_machine_pair_with_setup_sessions_test_helper(alice_id(), user_id(), false).await;
let room_id = room_id!("!test:example.org");

let room_keys_withheld_received_stream = bob.store().room_keys_withheld_received_stream();
pin_mut!(room_keys_withheld_received_stream);

let encryption_settings = EncryptionSettings::default();
let encryption_settings = EncryptionSettings {
sharing_strategy: CollectStrategy::new_device_based(true),
Expand Down Expand Up @@ -3280,6 +3285,21 @@ pub(crate) mod tests {
.await
.unwrap();

// We should receive a notification on the room_keys_withheld_received_stream
let withheld_received = room_keys_withheld_received_stream
.next()
.now_or_never()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To use now_or_never, you must be sure that the value will be immediately available in the Stream, otherwise the test will be flaky.

Is it a problem to use next().await here instead? The risk is that the test might be pending forever if something changes in the future, but it's still an indication that a regression has been detected. The pros, however, is that we are sure the test can never be flaky.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a problem to use next().await here instead?

I guess not. TBH I pretty much cargo-culted this from test_megolm_encryption above, which afaik has never shown any sign of being flaky. Flipping it around: is there any reason to believe that the value would not already be available in the Stream?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stream is created before we receive the sync changes, the sync changes trigger an update, this means that the now_or_never() should never fail here, unless we introduce a bug.

There are no hidden tasks/threads or requests being sent out in receive_sync_changes() it's all a pretty dumb, for better or worse, state machine.

now_or_never() is the right thing to use, otherwise a regression will produce a hang instead of a loud test failure.

.flatten()
.expect("We should have received a notification of room key being withheld");
assert_eq!(withheld_received.len(), 1);
assert_matches!(
&withheld_received[0].content,
RoomKeyWithheldContent::MegolmV1AesSha2(MegolmV1AesSha2WithheldContent::Unverified(
unverified_withheld_content
))
);
assert_eq!(unverified_withheld_content.room_id, room_id);

let plaintext = "You shouldn't be able to decrypt that message";

let content = RoomMessageEventContent::text_plain(plaintext);
Expand Down
86 changes: 54 additions & 32 deletions crates/matrix-sdk-crypto/src/store/crypto_store_wrapper.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{ops::Deref, sync::Arc};
use std::{future, ops::Deref, sync::Arc};

use futures_core::Stream;
use futures_util::StreamExt;
Expand All @@ -13,6 +13,7 @@ use crate::{
olm::InboundGroupSession,
store,
store::{Changes, DynCryptoStore, IntoCryptoStore, RoomKeyInfo},
types::events::room_key_withheld::RoomKeyWithheldEvent,
GossippedSecret, ReadOnlyOwnUserIdentity,
};

Expand All @@ -29,6 +30,10 @@ pub(crate) struct CryptoStoreWrapper {
/// an update to an inbound group session.
room_keys_received_sender: broadcast::Sender<Vec<RoomKeyInfo>>,

/// The sender side of a broadcast stream that is notified whenever we
/// receive an `m.room_key.withheld` message.
room_keys_withheld_received_sender: broadcast::Sender<Vec<RoomKeyWithheldEvent>>,

/// The sender side of a broadcast channel which sends out secrets we
/// received as a `m.secret.send` event.
secrets_broadcaster: broadcast::Sender<GossippedSecret>,
Expand All @@ -42,6 +47,7 @@ pub(crate) struct CryptoStoreWrapper {
impl CryptoStoreWrapper {
pub(crate) fn new(user_id: &UserId, store: impl IntoCryptoStore) -> Self {
let room_keys_received_sender = broadcast::Sender::new(10);
let room_keys_withheld_received_sender = broadcast::Sender::new(10);
let secrets_broadcaster = broadcast::Sender::new(10);
// The identities broadcaster is responsible for user identities as well as
// devices, that's why we increase the capacity here.
Expand All @@ -51,6 +57,7 @@ impl CryptoStoreWrapper {
user_id: user_id.to_owned(),
store: store.into_crypto_store(),
room_keys_received_sender,
room_keys_withheld_received_sender,
secrets_broadcaster,
identities_broadcaster,
}
Expand All @@ -68,6 +75,12 @@ impl CryptoStoreWrapper {
let room_key_updates: Vec<_> =
changes.inbound_group_sessions.iter().map(RoomKeyInfo::from).collect();

let withheld_session_updates: Vec<_> = changes
.withheld_session_info
.values()
.flat_map(|session_map| session_map.values().cloned())
.collect();

let secrets = changes.secrets.to_owned();
let devices = changes.devices.to_owned();
let identities = changes.identities.to_owned();
Expand All @@ -79,6 +92,10 @@ impl CryptoStoreWrapper {
let _ = self.room_keys_received_sender.send(room_key_updates);
}

if !withheld_session_updates.is_empty() {
let _ = self.room_keys_withheld_received_sender.send(withheld_session_updates);
}

for secret in secrets {
let _ = self.secrets_broadcaster.send(secret);
}
Expand Down Expand Up @@ -131,38 +148,29 @@ impl CryptoStoreWrapper {
/// logged and items will be dropped.
pub fn room_keys_received_stream(&self) -> impl Stream<Item = Vec<RoomKeyInfo>> {
let stream = BroadcastStream::new(self.room_keys_received_sender.subscribe());
Self::filter_errors_out_of_stream(stream, "room_keys_received_stream")
}

// the raw BroadcastStream gives us Results which can fail with
// BroadcastStreamRecvError if the reader falls behind. That's annoying to work
// with, so here we just drop the errors.
stream.filter_map(|result| async move {
match result {
Ok(r) => Some(r),
Err(BroadcastStreamRecvError::Lagged(lag)) => {
warn!("room_keys_received_stream missed {lag} updates");
None
}
}
})
/// Receive notifications of received `m.room_key.withheld` messages.
///
/// Each time an `m.room_key.withheld` is received and stored, an update
/// will be sent to the stream. Updates that happen at the same time are
/// batched into a [`Vec`].
///
/// If the reader of the stream lags too far behind, a warning will be
/// logged and items will be dropped.
pub fn room_keys_withheld_received_stream(
&self,
) -> impl Stream<Item = Vec<RoomKeyWithheldEvent>> {
let stream = BroadcastStream::new(self.room_keys_withheld_received_sender.subscribe());
Self::filter_errors_out_of_stream(stream, "room_keys_withheld_received_stream")
}

/// Receive notifications of gossipped secrets being received and stored in
/// the secret inbox as a [`Stream`].
pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> {
let stream = BroadcastStream::new(self.secrets_broadcaster.subscribe());

// the raw BroadcastStream gives us Results which can fail with
// BroadcastStreamRecvError if the reader falls behind. That's annoying to work
// with, so here we just drop the errors.
stream.filter_map(|result| async move {
match result {
Ok(r) => Some(r),
Err(BroadcastStreamRecvError::Lagged(lag)) => {
warn!("secrets_stream missed {lag} updates");
None
}
}
})
Self::filter_errors_out_of_stream(stream, "secrets_stream")
}

/// Returns a stream of newly created or updated cryptographic identities.
Expand All @@ -173,17 +181,31 @@ impl CryptoStoreWrapper {
&self,
) -> impl Stream<Item = (Option<ReadOnlyOwnUserIdentity>, IdentityChanges, DeviceChanges)> {
let stream = BroadcastStream::new(self.identities_broadcaster.subscribe());
Self::filter_errors_out_of_stream(stream, "identities_stream")
}

// See the comment in the [`Store::room_keys_received_stream()`] on why we're
// ignoring the lagged error.
stream.filter_map(|result| async move {
match result {
/// Helper for *_stream functions: filters errors out of the stream,
/// creating a new Stream.
///
/// `BroadcastStream`s gives us `Result`s which can fail with
/// `BroadcastStreamRecvError` if the reader falls behind. That's annoying
/// to work with, so here we just emit a warning and drop the errors.
fn filter_errors_out_of_stream<ItemType>(
stream: BroadcastStream<ItemType>,
stream_name: &str,
) -> impl Stream<Item = ItemType>
where
ItemType: 'static + Clone + Send,
{
let stream_name = stream_name.to_owned();
stream.filter_map(move |result| {
future::ready(match result {
Ok(r) => Some(r),
Err(BroadcastStreamRecvError::Lagged(lag)) => {
warn!("devices_stream missed {lag} updates");
warn!("{stream_name} missed {lag} updates");
None
}
}
})
})
}

Expand Down
14 changes: 14 additions & 0 deletions crates/matrix-sdk-crypto/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1463,6 +1463,20 @@ impl Store {
self.inner.store.room_keys_received_stream()
}

/// Receive notifications of received `m.room_key.withheld` messages.
///
/// Each time an `m.room_key.withheld` is received and stored, an update
/// will be sent to the stream. Updates that happen at the same time are
/// batched into a [`Vec`].
///
/// If the reader of the stream lags too far behind, a warning will be
/// logged and items will be dropped.
pub fn room_keys_withheld_received_stream(
&self,
) -> impl Stream<Item = Vec<RoomKeyWithheldEvent>> {
self.inner.store.room_keys_withheld_received_stream()
}

/// Returns a stream of user identity updates, allowing users to listen for
/// notifications about new or changed user identities.
///
Expand Down
Loading