Skip to content

Commit

Permalink
Add OlmMachine.registerRoomKeysWithheldCallback (#136)
Browse files Browse the repository at this point in the history
Expose the new stream added in matrix-org/matrix-rust-sdk#3660 and matrix-org/matrix-rust-sdk#3674 via a new callback.
  • Loading branch information
richvdh authored Jul 11, 2024
1 parent 410a072 commit b0a4ac5
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 91 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@
- `EncryptionSettings.onlyAllowTrustedDevices` has been replaced with
`EncryptionSettings.sharingStrategy`, which adds the ability to share only
with cross-signed devices.
([#134](https://github.com/matrix-org/matrix-rust-sdk-crypto-wasm/pull/134))

**Other changes**

- Update matrix-rust-sdk to `11cbf849c`, which includes:
- Add `OlmMachine.registerRoomKeysWithheldCallback` to notify when we are
told that room keys have been withheld.
([#136](https://github.com/matrix-org/matrix-rust-sdk-crypto-wasm/pull/136))

- Update matrix-rust-sdk to `8d54bd92d`, which includes:

- refactor(sdk-crypto): Room key sharing, introduce extensible strategy
([#3605](https://github.com/matrix-org/matrix-rust-sdk/pull/3605))
Expand Down
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

185 changes: 101 additions & 84 deletions src/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::{
collections::{BTreeMap, HashSet},
iter,
ops::Deref,
pin::{pin, Pin},
time::Duration,
Expand Down Expand Up @@ -36,7 +37,7 @@ use crate::{
requests::{outgoing_request_to_js_value, CrossSigningBootstrapRequests, ToDeviceRequest},
responses::{self, response_from_string},
store,
store::{RoomKeyInfo, StoreHandle},
store::{RoomKeyInfo, RoomKeyWithheldInfo, StoreHandle},
sync_events,
types::{self, RoomKeyImportResult, RoomSettings, SignatureVerification},
verification, vodozemac,
Expand Down Expand Up @@ -1305,14 +1306,41 @@ impl OlmMachine {
pub async fn register_room_key_updated_callback(&self, callback: Function) {
let stream = self.inner.store().room_keys_received_stream();

// fire up a promise chain which will call `cb` on each result from the stream
spawn_local(async move {
// take a reference to `callback` (which we then pass into the closure), to stop
// the callback being moved into the closure (which would mean we could only
// call the closure once)
let callback_ref = &callback;
stream.for_each(move |item| send_room_key_info_to_callback(callback_ref, item)).await;
});
copy_stream_to_callback(
stream,
|input| {
iter::once(
input.into_iter().map(RoomKeyInfo::from).map(JsValue::from).collect::<Array>(),
)
},
callback,
"room-key-received",
);
}

/// Register a callback which will be called whenever we receive a
/// notification that some room keys have been withheld.
///
/// `callback` should be a function that takes a single argument (an array
/// of {@link RoomKeyWithheldInfo}) and returns a Promise.
#[wasm_bindgen(js_name = "registerRoomKeysWithheldCallback")]
pub async fn register_room_keys_withheld_callback(&self, callback: Function) {
let stream = self.inner.store().room_keys_withheld_received_stream();

copy_stream_to_callback(
stream,
|input| {
iter::once(
input
.into_iter()
.map(RoomKeyWithheldInfo::from)
.map(JsValue::from)
.collect::<Array>(),
)
},
callback,
"room-key-withheld",
);
}

/// Register a callback which will be called whenever there is an update to
Expand All @@ -1324,14 +1352,18 @@ impl OlmMachine {
pub async fn register_user_identity_updated_callback(&self, callback: Function) {
let stream = self.inner.store().identities_stream_raw();

// fire up a promise chain which will call `cb` on each result from the stream
spawn_local(async move {
// take a reference to `callback` (which we then pass into the closure), to stop
// the callback being moved into the closure (which would mean we could only
// call the closure once)
let callback_ref = &callback;
stream.for_each(move |item| send_user_identities_to_callback(callback_ref, item)).await;
});
copy_stream_to_callback(
stream,
|(identity_updates, _)| {
identity_updates
.new
.into_iter()
.chain(identity_updates.changed.into_iter())
.map(|update| identifiers::UserId::from(update.user_id().to_owned()))
},
callback,
"user-identity-updated",
);
}

/// Register a callback which will be called whenever there is an update to
Expand All @@ -1343,15 +1375,25 @@ impl OlmMachine {
pub async fn register_devices_updated_callback(&self, callback: Function) {
let stream = self.inner.store().identities_stream_raw();

// fire up a promise chain which will call `callback` on each result from the
// stream
spawn_local(async move {
// take a reference to `callback` (which we then pass into the closure), to stop
// the callback being moved into the closure (which would mean we could only
// call the closure once)
let callback_ref = &callback;
stream.for_each(move |item| send_device_updates_to_callback(callback_ref, item)).await;
});
fn mapper(changes: (IdentityChanges, DeviceChanges)) -> iter::Once<Array> {
let (_, device_updates) = changes;

// get the user IDs of all the devices that have changed
let updated_chain = device_updates
.new
.into_iter()
.chain(device_updates.changed.into_iter())
.chain(device_updates.deleted.into_iter());

// put them in a set to make them unique
let updated_users: HashSet<String> =
HashSet::from_iter(updated_chain.map(|device| device.user_id().to_string()));

// ... and collect to a JS Array
iter::once(updated_users.into_iter().map(JsValue::from).collect())
}

copy_stream_to_callback(stream, mapper, callback, "device-updated");
}

/// Register a callback which will be called whenever a secret
Expand Down Expand Up @@ -1538,66 +1580,41 @@ impl OlmMachine {
}
}

// helper for register_room_key_received_callback: wraps the key info
// into our own RoomKeyInfo struct, and passes it into the javascript
// function
async fn send_room_key_info_to_callback(
callback: &Function,
room_key_info: Vec<matrix_sdk_crypto::store::RoomKeyInfo>,
) {
let rki: Array = room_key_info.into_iter().map(RoomKeyInfo::from).map(JsValue::from).collect();
match promise_result_to_future(callback.call1(&JsValue::NULL, &rki)).await {
Ok(_) => (),
Err(e) => {
warn!("Error calling room-key-received callback: {:?}", e);
}
}
}

// helper for register_user_identity_updated_callback: passes the user ID into
// the javascript function
async fn send_user_identities_to_callback(
callback: &Function,
(identity_updates, _): (IdentityChanges, DeviceChanges),
) {
let update_chain = identity_updates.new.into_iter().chain(identity_updates.changed.into_iter());
for update in update_chain {
let user_id = identifiers::UserId::from(update.user_id().to_owned());
match promise_result_to_future(callback.call1(&JsValue::NULL, &(user_id.into()))).await {
Ok(_) => (),
Err(e) => {
warn!("Error calling user-identity-updated callback: {:?}", e);
/// Helper for `register_*_callback` methods: fires off a background job (or
/// rather, a chain of JS promises) which will copy items from the stream to the
/// callback.
///
/// # Arguments
///
/// * `stream`: the stream to copy items from.
/// * `mapper`: a function which takes items from the stream, and converts them
/// to an iterator of values to send to the callback. Each entry in the
/// iterator will result in a call to the callback.
/// * `callback`: the javascript callback function.
/// * `callback_name`: a name for this type of callback, for error reporting.
fn copy_stream_to_callback<Item, MappedTypeIterator, MappedType>(
stream: impl Stream<Item = Item> + 'static,
mapper: impl Fn(Item) -> MappedTypeIterator + 'static,
callback: Function,
callback_name: &'static str,
) where
MappedTypeIterator: Iterator<Item = MappedType>,
MappedType: Into<JsValue>,
{
spawn_local(async move {
pin_mut!(stream);

while let Some(item) = stream.next().await {
for val in mapper(item) {
match promise_result_to_future(callback.call1(&JsValue::NULL, &val.into())).await {
Ok(_) => (),
Err(e) => {
warn!("Error calling {} callback: {:?}", callback_name, e);
}
}
}
}
}
}

// helper for register_device_updated_callback: passes the user IDs into
// the javascript function
async fn send_device_updates_to_callback(
callback: &Function,
(_, device_updates): (IdentityChanges, DeviceChanges),
) {
// get the user IDs of all the devices that have changed
let updated_chain = device_updates
.new
.into_iter()
.chain(device_updates.changed.into_iter())
.chain(device_updates.deleted.into_iter());
// put them in a set to make them unique
let updated_users: HashSet<String> =
HashSet::from_iter(updated_chain.map(|device| device.user_id().to_string()));
let updated_users_vec = Vec::from_iter(updated_users.iter());
match promise_result_to_future(
callback.call1(&JsValue::NULL, &serde_wasm_bindgen::to_value(&updated_users_vec).unwrap()),
)
.await
{
Ok(_) => (),
Err(e) => {
warn!("Error calling device-updated callback: {:?}", e);
}
}
});
}

// helper for register_secret_receive_callback: passes the secret name and value
Expand Down
47 changes: 46 additions & 1 deletion src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use wasm_bindgen::prelude::*;
use zeroize::{Zeroize, Zeroizing};

use crate::{
encryption::EncryptionAlgorithm, identifiers::RoomId, impl_from_to_inner,
encryption::EncryptionAlgorithm,
identifiers::{RoomId, UserId},
impl_from_to_inner,
vodozemac::Curve25519PublicKey,
};

Expand Down Expand Up @@ -205,6 +207,49 @@ impl RoomKeyInfo {
}
}

/// Information on a received `m.room_key.withheld` event.
#[wasm_bindgen]
#[derive(Debug)]
pub struct RoomKeyWithheldInfo {
pub(crate) inner: matrix_sdk_crypto::store::RoomKeyWithheldInfo,
}

impl_from_to_inner!(matrix_sdk_crypto::store::RoomKeyWithheldInfo => RoomKeyWithheldInfo);

#[wasm_bindgen]
impl RoomKeyWithheldInfo {
/// The User ID of the user that sent us the `m.room_key.withheld` message.
#[wasm_bindgen(getter)]
pub fn sender(&self) -> UserId {
self.inner.withheld_event.sender.to_owned().into()
}

/// The encryption algorithm of the session that is being withheld.
#[wasm_bindgen(getter)]
pub fn algorithm(&self) -> EncryptionAlgorithm {
self.inner.withheld_event.content.algorithm().into()
}

/// The `code` from the `m.room_key.withheld` message, such as
/// `m.unverified`.
#[wasm_bindgen(getter, js_name = "withheldCode")]
pub fn withheld_code(&self) -> String {
self.inner.withheld_event.content.withheld_code().as_str().to_owned()
}

/// The room ID of the session that is being withheld.
#[wasm_bindgen(getter, js_name = "roomId")]
pub fn room_id(&self) -> RoomId {
self.inner.room_id.to_owned().into()
}

/// The session ID of the session that is being withheld.
#[wasm_bindgen(getter, js_name = "sessionId")]
pub fn session_id(&self) -> String {
self.inner.session_id.to_owned()
}
}

/// Struct containing the bundle of secrets to fully activate a new device for
/// end-to-end encryption.
#[derive(Debug)]
Expand Down
Loading

0 comments on commit b0a4ac5

Please sign in to comment.