diff --git a/Cargo.lock b/Cargo.lock index c829ffaf99e..0096903f563 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -94,6 +94,18 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c" +[[package]] +name = "app_dirs2" +version = "2.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47a8d2d8dbda5fca0a522259fb88e4f55d2b10ad39f5f03adeebf85031eba501" +dependencies = [ + "jni", + "ndk-context", + "winapi", + "xdg", +] + [[package]] name = "arc-swap" version = "1.5.1" @@ -554,6 +566,12 @@ version = "1.0.73" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" +[[package]] +name = "cesu8" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" + [[package]] name = "cfg-if" version = "1.0.0" @@ -742,6 +760,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d7b894f5411737b7867f4827955924d7c254fc9f4d91a6aad6b097804b1018b" +[[package]] +name = "combine" +version = "4.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" +dependencies = [ + "bytes", + "memchr", +] + [[package]] name = "concurrent-queue" version = "1.2.4" @@ -751,6 +779,20 @@ dependencies = [ "cache-padded", ] +[[package]] +name = "console" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89eab4d20ce20cea182308bca13088fecea9c05f6776cf287205d41a0ed3c847" +dependencies = [ + "encode_unicode", + "libc", + "once_cell", + "terminal_size", + "unicode-width", + "winapi", +] + [[package]] name = "console_error_panic_hook" version = "0.1.7" @@ -1161,6 +1203,17 @@ dependencies = [ "syn", ] +[[package]] +name = "dialoguer" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92e7e37ecef6857fdc0c0c5d42fd5b0938e46590c2183cc92dd310a6d078eb1" +dependencies = [ + "console", + "tempfile", + "zeroize", +] + [[package]] name = "digest" version = "0.9.0" @@ -1270,6 +1323,12 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" +[[package]] +name = "encode_unicode" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" + [[package]] name = "encoding_rs" version = "0.8.31" @@ -2164,12 +2223,17 @@ checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754" name = "jack-in" version = "0.2.0" dependencies = [ + "app_dirs2", + "dialoguer", "eyre", "futures", "futures-signals", "log4rs", "matrix-sdk", "matrix-sdk-common", + "matrix-sdk-sled", + "sanitize-filename-reader-friendly", + "serde_json", "structopt", "tokio", "tracing", @@ -2179,6 +2243,26 @@ dependencies = [ "tuirealm", ] +[[package]] +name = "jni" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6df18c2e3db7e453d3c6ac5b3e9d5182664d28788126d39b91f2d1e22b017ec" +dependencies = [ + "cesu8", + "combine", + "jni-sys", + "log", + "thiserror", + "walkdir", +] + +[[package]] +name = "jni-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" + [[package]] name = "jpeg-decoder" version = "0.1.22" @@ -2904,6 +2988,12 @@ dependencies = [ "tempfile", ] +[[package]] +name = "ndk-context" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27b02d87554356db9e9a873add8782d4ea6e3e58ea071a9adb9a2e8ddb884a8b" + [[package]] name = "nibble_vec" version = "0.1.0" @@ -4440,6 +4530,16 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "terminal_size" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "633c1a546cee861a1a6d0dc69ebeca693bf4296661ba7852b9d21d159e0506df" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "textwrap" version = "0.11.0" @@ -5407,6 +5507,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "xdg" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c4583db5cbd4c4c0303df2d15af80f0539db703fa1c68802d4cbbd2dd0f88f6" +dependencies = [ + "dirs", +] + [[package]] name = "xshell" version = "0.1.17" diff --git a/bindings/matrix-sdk-crypto-ffi/src/machine.rs b/bindings/matrix-sdk-crypto-ffi/src/machine.rs index fa078d7d33b..a859b432e96 100644 --- a/bindings/matrix-sdk-crypto-ffi/src/machine.rs +++ b/bindings/matrix-sdk-crypto-ffi/src/machine.rs @@ -419,7 +419,7 @@ impl OlmMachine { key_counts: HashMap, unused_fallback_keys: Option>, ) -> Result { - let events: ToDevice = serde_json::from_str(events)?; + let to_device: ToDevice = serde_json::from_str(events)?; let device_changes: RumaDeviceLists = device_changes.into(); let key_counts: BTreeMap = key_counts .into_iter() @@ -437,7 +437,7 @@ impl OlmMachine { unused_fallback_keys.map(|u| u.into_iter().map(DeviceKeyAlgorithm::from).collect()); let events = self.runtime.block_on(self.inner.receive_sync_changes( - events, + to_device.events, &device_changes, &key_counts, unused_fallback_keys.as_deref(), diff --git a/bindings/matrix-sdk-crypto-ffi/src/responses.rs b/bindings/matrix-sdk-crypto-ffi/src/responses.rs index feb6b89c909..0a2512804a8 100644 --- a/bindings/matrix-sdk-crypto-ffi/src/responses.rs +++ b/bindings/matrix-sdk-crypto-ffi/src/responses.rs @@ -19,7 +19,7 @@ use ruma::{ }, }, message::send_message_event::v3::Response as RoomMessageResponse, - sync::sync_events::v3::DeviceLists as RumaDeviceLists, + sync::sync_events::DeviceLists as RumaDeviceLists, to_device::send_event_to_device::v3::Response as ToDeviceResponse, }, assign, diff --git a/bindings/matrix-sdk-crypto-js/src/sync_events.rs b/bindings/matrix-sdk-crypto-js/src/sync_events.rs index 8e04c632ccd..a06815f9bd6 100644 --- a/bindings/matrix-sdk-crypto-js/src/sync_events.rs +++ b/bindings/matrix-sdk-crypto-js/src/sync_events.rs @@ -9,7 +9,7 @@ use crate::{identifiers, js::downcast}; #[wasm_bindgen] #[derive(Debug)] pub struct DeviceLists { - pub(crate) inner: ruma::api::client::sync::sync_events::v3::DeviceLists, + pub(crate) inner: ruma::api::client::sync::sync_events::DeviceLists, } #[wasm_bindgen] @@ -19,7 +19,7 @@ impl DeviceLists { /// `changed` and `left` must be an array of `UserId`. #[wasm_bindgen(constructor)] pub fn new(changed: Option, left: Option) -> Result { - let mut inner = ruma::api::client::sync::sync_events::v3::DeviceLists::default(); + let mut inner = ruma::api::client::sync::sync_events::DeviceLists::default(); inner.changed = changed .unwrap_or_default() diff --git a/bindings/matrix-sdk-crypto-js/tests/device.test.js b/bindings/matrix-sdk-crypto-js/tests/device.test.js index 532624e2655..ec01a664e09 100644 --- a/bindings/matrix-sdk-crypto-js/tests/device.test.js +++ b/bindings/matrix-sdk-crypto-js/tests/device.test.js @@ -176,13 +176,11 @@ describe('Key Verification', () => { outgoingVerificationRequest = JSON.parse(outgoingVerificationRequest.body); expect(outgoingVerificationRequest.event_type).toStrictEqual('m.key.verification.request'); - const toDeviceEvents = { - events: [{ - sender: userId1.toString(), - type: outgoingVerificationRequest.event_type, - content: outgoingVerificationRequest.messages[userId2.toString()][deviceId2.toString()], - }] - }; + const toDeviceEvents = [{ + sender: userId1.toString(), + type: outgoingVerificationRequest.event_type, + content: outgoingVerificationRequest.messages[userId2.toString()][deviceId2.toString()], + }]; // Let's send the verification request to `m2`. await m2.receiveSyncChanges(JSON.stringify(toDeviceEvents), new DeviceLists(), new Map(), new Set()); @@ -230,13 +228,11 @@ describe('Key Verification', () => { outgoingVerificationRequest = JSON.parse(outgoingVerificationRequest.body); expect(outgoingVerificationRequest.event_type).toStrictEqual('m.key.verification.ready'); - const toDeviceEvents = { - events: [{ - sender: userId2.toString(), - type: outgoingVerificationRequest.event_type, - content: outgoingVerificationRequest.messages[userId1.toString()][deviceId1.toString()], - }], - }; + const toDeviceEvents = [{ + sender: userId2.toString(), + type: outgoingVerificationRequest.event_type, + content: outgoingVerificationRequest.messages[userId1.toString()][deviceId1.toString()], + }]; // Let's send the verification ready to `m1`. await m1.receiveSyncChanges(JSON.stringify(toDeviceEvents), new DeviceLists(), new Map(), new Set()); @@ -287,13 +283,11 @@ describe('Key Verification', () => { outgoingVerificationRequest = JSON.parse(outgoingVerificationRequest.body); expect(outgoingVerificationRequest.event_type).toStrictEqual('m.key.verification.start'); - const toDeviceEvents = { - events: [{ - sender: userId2.toString(), - type: outgoingVerificationRequest.event_type, - content: outgoingVerificationRequest.messages[userId1.toString()][deviceId1.toString()], - }], - }; + const toDeviceEvents = [{ + sender: userId2.toString(), + type: outgoingVerificationRequest.event_type, + content: outgoingVerificationRequest.messages[userId1.toString()][deviceId1.toString()], + }]; // Let's send the SAS start to `m1`. await m1.receiveSyncChanges(JSON.stringify(toDeviceEvents), new DeviceLists(), new Map(), new Set()); @@ -335,13 +329,11 @@ describe('Key Verification', () => { outgoingVerificationRequest = JSON.parse(outgoingVerificationRequest.body); expect(outgoingVerificationRequest.event_type).toStrictEqual('m.key.verification.accept'); - const toDeviceEvents = { - events: [{ - sender: userId1.toString(), - type: outgoingVerificationRequest.event_type, - content: outgoingVerificationRequest.messages[userId2.toString()][deviceId2.toString()], - }], - }; + const toDeviceEvents = [{ + sender: userId1.toString(), + type: outgoingVerificationRequest.event_type, + content: outgoingVerificationRequest.messages[userId2.toString()][deviceId2.toString()], + }]; // Let's send the SAS accept to `m2`. await m2.receiveSyncChanges(JSON.stringify(toDeviceEvents), new DeviceLists(), new Map(), new Set()); @@ -364,13 +356,11 @@ describe('Key Verification', () => { toDeviceRequest = JSON.parse(toDeviceRequest.body); expect(toDeviceRequest.event_type).toStrictEqual('m.key.verification.key'); - const toDeviceEvents = { - events: [{ - sender: userId2.toString(), - type: toDeviceRequest.event_type, - content: toDeviceRequest.messages[userId1.toString()][deviceId1.toString()], - }], - }; + const toDeviceEvents = [{ + sender: userId2.toString(), + type: toDeviceRequest.event_type, + content: toDeviceRequest.messages[userId1.toString()][deviceId1.toString()], + }]; // Let's send te SAS key to `m1`. await m1.receiveSyncChanges(JSON.stringify(toDeviceEvents), new DeviceLists(), new Map(), new Set()); @@ -390,13 +380,11 @@ describe('Key Verification', () => { toDeviceRequest = JSON.parse(toDeviceRequest.body); expect(toDeviceRequest.event_type).toStrictEqual('m.key.verification.key'); - const toDeviceEvents = { - events: [{ - sender: userId1.toString(), - type: toDeviceRequest.event_type, - content: toDeviceRequest.messages[userId2.toString()][deviceId2.toString()], - }], - }; + const toDeviceEvents = [{ + sender: userId1.toString(), + type: toDeviceRequest.event_type, + content: toDeviceRequest.messages[userId2.toString()][deviceId2.toString()], + }]; // Let's send te SAS key to `m2`. await m2.receiveSyncChanges(JSON.stringify(toDeviceEvents), new DeviceLists(), new Map(), new Set()); @@ -463,13 +451,11 @@ describe('Key Verification', () => { outgoingVerificationRequest = JSON.parse(outgoingVerificationRequest.body); expect(outgoingVerificationRequest.event_type).toStrictEqual('m.key.verification.mac'); - const toDeviceEvents = { - events: [{ - sender: userId1.toString(), - type: outgoingVerificationRequest.event_type, - content: outgoingVerificationRequest.messages[userId2.toString()][deviceId2.toString()], - }], - }; + const toDeviceEvents = [{ + sender: userId1.toString(), + type: outgoingVerificationRequest.event_type, + content: outgoingVerificationRequest.messages[userId2.toString()][deviceId2.toString()], + }]; // Let's send te SAS confirmation to `m2`. await m2.receiveSyncChanges(JSON.stringify(toDeviceEvents), new DeviceLists(), new Map(), new Set()); @@ -491,13 +477,11 @@ describe('Key Verification', () => { outgoingVerificationRequest = JSON.parse(outgoingVerificationRequest.body); expect(outgoingVerificationRequest.event_type).toStrictEqual('m.key.verification.mac'); - const toDeviceEvents = { - events: [{ - sender: userId2.toString(), - type: outgoingVerificationRequest.event_type, - content: outgoingVerificationRequest.messages[userId1.toString()][deviceId1.toString()], - }], - }; + const toDeviceEvents = [{ + sender: userId2.toString(), + type: outgoingVerificationRequest.event_type, + content: outgoingVerificationRequest.messages[userId1.toString()][deviceId1.toString()], + }]; // Let's send te SAS confirmation to `m1`. await m1.receiveSyncChanges(JSON.stringify(toDeviceEvents), new DeviceLists(), new Map(), new Set()); @@ -512,13 +496,11 @@ describe('Key Verification', () => { outgoingVerificationRequest = JSON.parse(outgoingVerificationRequest.body); expect(outgoingVerificationRequest.event_type).toStrictEqual('m.key.verification.done'); - const toDeviceEvents = { - events: [{ - sender: userId2.toString(), - type: outgoingVerificationRequest.event_type, - content: outgoingVerificationRequest.messages[userId1.toString()][deviceId1.toString()], - }], - }; + const toDeviceEvents = [{ + sender: userId2.toString(), + type: outgoingVerificationRequest.event_type, + content: outgoingVerificationRequest.messages[userId1.toString()][deviceId1.toString()], + }]; // Let's send te SAS done to `m1`. await m1.receiveSyncChanges(JSON.stringify(toDeviceEvents), new DeviceLists(), new Map(), new Set()); @@ -538,13 +520,11 @@ describe('Key Verification', () => { toDeviceRequest = JSON.parse(toDeviceRequest.body); expect(toDeviceRequest.event_type).toStrictEqual('m.key.verification.done'); - const toDeviceEvents = { - events: [{ - sender: userId1.toString(), - type: toDeviceRequest.event_type, - content: toDeviceRequest.messages[userId2.toString()][deviceId2.toString()], - }], - }; + const toDeviceEvents = [{ + sender: userId1.toString(), + type: toDeviceRequest.event_type, + content: toDeviceRequest.messages[userId2.toString()][deviceId2.toString()], + }]; // Let's send te SAS key to `m2`. await m2.receiveSyncChanges(JSON.stringify(toDeviceEvents), new DeviceLists(), new Map(), new Set()); @@ -628,13 +608,11 @@ describe('Key Verification', () => { outgoingVerificationRequest = JSON.parse(outgoingVerificationRequest.body); expect(outgoingVerificationRequest.event_type).toStrictEqual('m.key.verification.request'); - const toDeviceEvents = { - events: [{ - sender: userId1.toString(), - type: outgoingVerificationRequest.event_type, - content: outgoingVerificationRequest.messages[userId2.toString()][deviceId2.toString()], - }] - }; + const toDeviceEvents = [{ + sender: userId1.toString(), + type: outgoingVerificationRequest.event_type, + content: outgoingVerificationRequest.messages[userId2.toString()][deviceId2.toString()], + }]; // Let's send the verification request to `m2`. await m2.receiveSyncChanges(JSON.stringify(toDeviceEvents), new DeviceLists(), new Map(), new Set()); @@ -685,13 +663,11 @@ describe('Key Verification', () => { outgoingVerificationRequest = JSON.parse(outgoingVerificationRequest.body); expect(outgoingVerificationRequest.event_type).toStrictEqual('m.key.verification.ready'); - const toDeviceEvents = { - events: [{ - sender: userId2.toString(), - type: outgoingVerificationRequest.event_type, - content: outgoingVerificationRequest.messages[userId1.toString()][deviceId1.toString()], - }], - }; + const toDeviceEvents = [{ + sender: userId2.toString(), + type: outgoingVerificationRequest.event_type, + content: outgoingVerificationRequest.messages[userId1.toString()][deviceId1.toString()], + }]; // Let's send the verification ready to `m1`. await m1.receiveSyncChanges(JSON.stringify(toDeviceEvents), new DeviceLists(), new Map(), new Set()); @@ -848,13 +824,11 @@ describe('Key Verification', () => { outgoingVerificationRequest = JSON.parse(outgoingVerificationRequest.body); expect(outgoingVerificationRequest.event_type).toStrictEqual('m.key.verification.start'); - const toDeviceEvents = { - events: [{ - sender: userId1.toString(), - type: outgoingVerificationRequest.event_type, - content: outgoingVerificationRequest.messages[userId2.toString()][deviceId2.toString()], - }] - }; + const toDeviceEvents = [{ + sender: userId1.toString(), + type: outgoingVerificationRequest.event_type, + content: outgoingVerificationRequest.messages[userId2.toString()][deviceId2.toString()], + }]; // Let's send the verification request to `m2`. await m2.receiveSyncChanges(JSON.stringify(toDeviceEvents), new DeviceLists(), new Map(), new Set()); @@ -872,13 +846,11 @@ describe('Key Verification', () => { outgoingVerificationRequest = JSON.parse(outgoingVerificationRequest.body); expect(outgoingVerificationRequest.event_type).toStrictEqual('m.key.verification.done'); - const toDeviceEvents = { - events: [{ - sender: userId2.toString(), - type: outgoingVerificationRequest.event_type, - content: outgoingVerificationRequest.messages[userId1.toString()][deviceId1.toString()], - }] - }; + const toDeviceEvents = [{ + sender: userId2.toString(), + type: outgoingVerificationRequest.event_type, + content: outgoingVerificationRequest.messages[userId1.toString()][deviceId1.toString()], + }]; // Let's send the verification request to `m2`. await m2.receiveSyncChanges(JSON.stringify(toDeviceEvents), new DeviceLists(), new Map(), new Set()); diff --git a/bindings/matrix-sdk-crypto-js/tests/helper.js b/bindings/matrix-sdk-crypto-js/tests/helper.js index 1ced03d1a46..29f21b61ac3 100644 --- a/bindings/matrix-sdk-crypto-js/tests/helper.js +++ b/bindings/matrix-sdk-crypto-js/tests/helper.js @@ -11,14 +11,14 @@ function* zip(...arrays) { // Add a machine to another machine, i.e. be sure a machine knows // another exists. async function addMachineToMachine(machineToAdd, machine) { - const toDeviceEvents = JSON.stringify({}); + const toDeviceEvents = JSON.stringify([]); const changedDevices = new DeviceLists(); const oneTimeKeyCounts = new Map(); const unusedFallbackKeys = new Set(); const receiveSyncChanges = JSON.parse(await machineToAdd.receiveSyncChanges(toDeviceEvents, changedDevices, oneTimeKeyCounts, unusedFallbackKeys)); - expect(receiveSyncChanges).toEqual({}); + expect(receiveSyncChanges).toEqual([]); const outgoingRequests = await machineToAdd.outgoingRequests(); diff --git a/bindings/matrix-sdk-crypto-js/tests/machine.test.js b/bindings/matrix-sdk-crypto-js/tests/machine.test.js index 7eda1903c3c..c375015ddbf 100644 --- a/bindings/matrix-sdk-crypto-js/tests/machine.test.js +++ b/bindings/matrix-sdk-crypto-js/tests/machine.test.js @@ -128,26 +128,26 @@ describe(OlmMachine.name, () => { test('can receive sync changes', async () => { const m = await machine(); - const toDeviceEvents = JSON.stringify({}); + const toDeviceEvents = JSON.stringify([]); const changedDevices = new DeviceLists(); const oneTimeKeyCounts = new Map(); const unusedFallbackKeys = new Set(); const receiveSyncChanges = JSON.parse(await m.receiveSyncChanges(toDeviceEvents, changedDevices, oneTimeKeyCounts, unusedFallbackKeys)); - expect(receiveSyncChanges).toEqual({}); + expect(receiveSyncChanges).toEqual([]); }); test('can get the outgoing requests that need to be send out', async () => { const m = await machine(); - const toDeviceEvents = JSON.stringify({}); + const toDeviceEvents = JSON.stringify([]); const changedDevices = new DeviceLists(); const oneTimeKeyCounts = new Map(); const unusedFallbackKeys = new Set(); const receiveSyncChanges = JSON.parse(await m.receiveSyncChanges(toDeviceEvents, changedDevices, oneTimeKeyCounts, unusedFallbackKeys)); - expect(receiveSyncChanges).toEqual({}); + expect(receiveSyncChanges).toEqual([]); const outgoingRequests = await m.outgoingRequests(); @@ -182,7 +182,7 @@ describe(OlmMachine.name, () => { beforeAll(async () => { m = await machine(new UserId('@alice:example.org'), new DeviceId('DEVICEID')); - const toDeviceEvents = JSON.stringify({}); + const toDeviceEvents = JSON.stringify([]); const changedDevices = new DeviceLists(); const oneTimeKeyCounts = new Map(); const unusedFallbackKeys = new Set(); diff --git a/bindings/matrix-sdk-crypto-nodejs/src/sync_events.rs b/bindings/matrix-sdk-crypto-nodejs/src/sync_events.rs index 63e017021ee..4122bba2d9d 100644 --- a/bindings/matrix-sdk-crypto-nodejs/src/sync_events.rs +++ b/bindings/matrix-sdk-crypto-nodejs/src/sync_events.rs @@ -7,7 +7,7 @@ use crate::identifiers; /// Information on E2E device updates. #[napi] pub struct DeviceLists { - pub(crate) inner: ruma::api::client::sync::sync_events::v3::DeviceLists, + pub(crate) inner: ruma::api::client::sync::sync_events::DeviceLists, } #[napi] @@ -18,7 +18,7 @@ impl DeviceLists { changed: Option>, left: Option>, ) -> Self { - let mut inner = ruma::api::client::sync::sync_events::v3::DeviceLists::default(); + let mut inner = ruma::api::client::sync::sync_events::DeviceLists::default(); inner.changed = changed.into_iter().flatten().map(|user| user.inner.clone()).collect(); inner.left = left.into_iter().flatten().map(|user| user.inner.clone()).collect(); diff --git a/bindings/matrix-sdk-crypto-nodejs/tests/machine.test.js b/bindings/matrix-sdk-crypto-nodejs/tests/machine.test.js index 020486bd2aa..a3b424683c5 100644 --- a/bindings/matrix-sdk-crypto-nodejs/tests/machine.test.js +++ b/bindings/matrix-sdk-crypto-nodejs/tests/machine.test.js @@ -51,26 +51,26 @@ describe(OlmMachine.name, () => { test('can receive sync changes', async () => { const m = await machine(); - const toDeviceEvents = JSON.stringify({}); + const toDeviceEvents = JSON.stringify([]); const changedDevices = new DeviceLists(); const oneTimeKeyCounts = {}; const unusedFallbackKeys = []; const receiveSyncChanges = JSON.parse(await m.receiveSyncChanges(toDeviceEvents, changedDevices, oneTimeKeyCounts, unusedFallbackKeys)); - expect(receiveSyncChanges).toEqual({}); + expect(receiveSyncChanges).toEqual([]); }); test('can get the outgoing requests that need to be send out', async () => { const m = await machine(); - const toDeviceEvents = JSON.stringify({}); + const toDeviceEvents = JSON.stringify([]); const changedDevices = new DeviceLists(); const oneTimeKeyCounts = {}; const unusedFallbackKeys = []; const receiveSyncChanges = JSON.parse(await m.receiveSyncChanges(toDeviceEvents, changedDevices, oneTimeKeyCounts, unusedFallbackKeys)); - expect(receiveSyncChanges).toEqual({}); + expect(receiveSyncChanges).toEqual([]); const outgoingRequests = await m.outgoingRequests(); @@ -105,12 +105,12 @@ describe(OlmMachine.name, () => { beforeAll(async () => { m = await machine(new UserId('@alice:example.org'), new DeviceId('DEVICEID')); - const toDeviceEvents = JSON.stringify({}); + const toDeviceEvents = JSON.stringify([]); const changedDevices = new DeviceLists(); const oneTimeKeyCounts = {}; const unusedFallbackKeys = []; - const receiveSyncChanges = await m.receiveSyncChanges(toDeviceEvents, changedDevices, oneTimeKeyCounts, unusedFallbackKeys); + await m.receiveSyncChanges(toDeviceEvents, changedDevices, oneTimeKeyCounts, unusedFallbackKeys); outgoingRequests = await m.outgoingRequests(); expect(outgoingRequests).toHaveLength(2); diff --git a/bindings/matrix-sdk-ffi/src/api.udl b/bindings/matrix-sdk-ffi/src/api.udl index b92e5eba603..61b7da5ee84 100644 --- a/bindings/matrix-sdk-ffi/src/api.udl +++ b/bindings/matrix-sdk-ffi/src/api.udl @@ -175,6 +175,9 @@ interface SlidingSyncBuilder { [Self=ByArc] SlidingSyncBuilder add_view(SlidingSyncView view); + [Self=ByArc] + SlidingSyncBuilder with_common_extensions(); + [Throws=ClientError, Self=ByArc] SlidingSync build(); }; diff --git a/bindings/matrix-sdk-ffi/src/client.rs b/bindings/matrix-sdk-ffi/src/client.rs index 1b89bbe99f0..28f2e961cd4 100644 --- a/bindings/matrix-sdk-ffi/src/client.rs +++ b/bindings/matrix-sdk-ffi/src/client.rs @@ -357,7 +357,7 @@ impl Client { &*session_verification_controller.read().await { session_verification_controller - .process_to_device_messages(sync_response.to_device) + .process_to_device_messages(sync_response.to_device_events) .await; } diff --git a/bindings/matrix-sdk-ffi/src/session_verification.rs b/bindings/matrix-sdk-ffi/src/session_verification.rs index bc9b3d59e2f..9db8f5c4c90 100644 --- a/bindings/matrix-sdk-ffi/src/session_verification.rs +++ b/bindings/matrix-sdk-ffi/src/session_verification.rs @@ -6,8 +6,8 @@ use matrix_sdk::{ verification::{SasVerification, VerificationRequest}, }, ruma::{ - api::client::sync::sync_events::v3::ToDevice, events::{key::verification::VerificationMethod, AnyToDeviceEvent}, + serde::Raw, }, }; @@ -106,10 +106,10 @@ impl SessionVerificationController { }) } - pub async fn process_to_device_messages(&self, to_device: ToDevice) { + pub async fn process_to_device_messages(&self, to_device_events: Vec>) { let sas_verification = self.sas_verification.clone(); - for event in to_device.events.into_iter().filter_map(|e| e.deserialize().ok()) { + for event in to_device_events.into_iter().filter_map(|e| e.deserialize().ok()) { match event { AnyToDeviceEvent::KeyVerificationReady(event) => { if !self.is_transaction_id_valid(event.content.transaction_id.to_string()) { diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index 9bfe47a1a49..97eda7b9ea5 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -137,9 +137,10 @@ impl SlidingSyncRoom { for ev in lock.iter().rev() { if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage( SyncRoomMessageEvent::Original(o), - ))) = ev.deserialize() + ))) = ev.event.deserialize() { - let inner = matrix_sdk::room::timeline::EventTimelineItem::_new(o, ev.clone()); + let inner = + matrix_sdk::room::timeline::EventTimelineItem::_new(o, ev.event.clone()); return Some(Arc::new(EventTimelineItem(inner))); } } @@ -584,6 +585,12 @@ impl SlidingSyncBuilder { Arc::new(builder) } + pub fn with_common_extensions(self: Arc) -> Arc { + let mut builder = unwrap_or_clone_arc(self); + builder.inner = builder.inner.with_common_extensions(); + Arc::new(builder) + } + pub fn build(self: Arc) -> anyhow::Result> { let builder = unwrap_or_clone_arc(self); Ok(Arc::new(SlidingSync::new(builder.inner.build()?, builder.client))) diff --git a/crates/matrix-sdk-base/src/client.rs b/crates/matrix-sdk-base/src/client.rs index 9f5605d52c5..0daff8a9618 100644 --- a/crates/matrix-sdk-base/src/client.rs +++ b/crates/matrix-sdk-base/src/client.rs @@ -252,10 +252,12 @@ impl BaseClient { } #[allow(clippy::too_many_arguments)] - async fn handle_timeline( + pub(crate) async fn handle_timeline( &self, room: &Room, - ruma_timeline: api::sync::sync_events::v3::Timeline, + limited: bool, + events: Vec>, + prev_batch: Option, push_rules: &Ruleset, user_ids: &mut BTreeSet, room_info: &mut RoomInfo, @@ -264,10 +266,10 @@ impl BaseClient { ) -> Result { let room_id = room.room_id(); let user_id = room.own_user_id(); - let mut timeline = Timeline::new(ruma_timeline.limited, ruma_timeline.prev_batch.clone()); + let mut timeline = Timeline::new(limited, prev_batch); let mut push_context = self.get_push_room_context(room, room_info, changes).await?; - for event in ruma_timeline.events { + for event in events { #[allow(unused_mut)] let mut event: SyncTimelineEvent = event.into(); @@ -496,7 +498,7 @@ impl BaseClient { Ok(user_ids) } - async fn handle_room_account_data( + pub(crate) async fn handle_room_account_data( &self, room_id: &RoomId, events: &[Raw], @@ -509,7 +511,7 @@ impl BaseClient { } } - async fn handle_account_data( + pub(crate) async fn handle_account_data( &self, events: &[Raw], changes: &mut StateChanges, @@ -552,6 +554,31 @@ impl BaseClient { changes.account_data = account_data; } + #[cfg(feature = "e2e-encryption")] + pub(crate) async fn preprocess_to_device_events( + &self, + to_device_events: Vec>, + changed_devices: &api::sync::sync_events::DeviceLists, + one_time_keys_counts: &BTreeMap, + unused_fallback_keys: Option<&[ruma::DeviceKeyAlgorithm]>, + ) -> Result>> { + if let Some(o) = self.olm_machine() { + // Let the crypto machine handle the sync response, this + // decrypts to-device events, but leaves room events alone. + // This makes sure that we have the decryption keys for the room + // events at hand. + Ok(o.receive_sync_changes( + to_device_events, + changed_devices, + one_time_keys_counts, + unused_fallback_keys, + ) + .await?) + } else { + Ok(to_device_events) + } + } + /// Receive a response from a sync call. /// /// # Arguments @@ -582,25 +609,17 @@ impl BaseClient { } let now = Instant::now(); + let to_device_events = to_device.events; #[cfg(feature = "e2e-encryption")] - let to_device = { - if let Some(o) = self.olm_machine() { - // Let the crypto machine handle the sync response, this - // decrypts to-device events, but leaves room events alone. - // This makes sure that we have the decryption keys for the room - // events at hand. - o.receive_sync_changes( - to_device, - &device_lists, - &device_one_time_keys_count, - device_unused_fallback_key_types.as_deref(), - ) - .await? - } else { - to_device - } - }; + let to_device_events = self + .preprocess_to_device_events( + to_device_events, + &device_lists, + &device_one_time_keys_count, + device_unused_fallback_key_types.as_deref(), + ) + .await?; let mut changes = StateChanges::new(next_batch.clone()); let mut ambiguity_cache = AmbiguityCache::new(self.store.inner.clone()); @@ -644,7 +663,9 @@ impl BaseClient { let timeline = self .handle_timeline( &room, - new_info.timeline, + new_info.timeline.limited, + new_info.timeline.events, + new_info.timeline.prev_batch, &push_rules, &mut user_ids, &mut room_info, @@ -683,7 +704,7 @@ impl BaseClient { JoinedRoom::new( timeline, new_info.state, - new_info.account_data, + new_info.account_data.events, new_info.ephemeral, notification_count, ), @@ -709,7 +730,9 @@ impl BaseClient { let timeline = self .handle_timeline( &room, - new_info.timeline, + new_info.timeline.limited, + new_info.timeline.events, + new_info.timeline.prev_batch, &push_rules, &mut user_ids, &mut room_info, @@ -771,8 +794,8 @@ impl BaseClient { next_batch, rooms: new_rooms, presence, - account_data, - to_device, + account_data: account_data.events, + to_device_events, device_lists, device_one_time_keys_count: device_one_time_keys_count .into_iter() diff --git a/crates/matrix-sdk-base/src/sliding_sync.rs b/crates/matrix-sdk-base/src/sliding_sync.rs index 92c062864d4..c18c0293ab2 100644 --- a/crates/matrix-sdk-base/src/sliding_sync.rs +++ b/crates/matrix-sdk-base/src/sliding_sync.rs @@ -30,60 +30,68 @@ impl BaseClient { // next_batch, rooms, lists, + extensions, // FIXME: missing compared to v3::Response //presence, - //account_data, - //to_device, - //device_lists, - //device_one_time_keys_count, - //device_unused_fallback_key_types, .. } = response; - // FIXME not yet supported by sliding sync. see - // https://github.com/matrix-org/matrix-rust-sdk/issues/1014 - // #[cfg(feature = "encryption")] - // let to_device = { - // if let Some(o) = self.olm_machine().await { - // // Let the crypto machine handle the sync response, this - // // decrypts to-device events, but leaves room events alone. - // // This makes sure that we have the decryption keys for the room - // // events at hand. - // o.receive_sync_changes( - // to_device, - // &device_lists, - // &device_one_time_keys_count, - // device_unused_fallback_key_types.as_deref(), - // ) - // .await? - // } else { - // to_device - // } - // }; - - if rooms.is_empty() { - // nothing for us to handle here + if rooms.is_empty() && extensions.is_empty() { + // we received a room reshuffling event only, there won't be anything for us to + // process. stop early return Ok(SyncResponse::default()); }; + let v4::Extensions { to_device, e2ee, account_data, .. } = extensions; + + let to_device_events = to_device.map(|v4| v4.events).unwrap_or_default(); + + #[cfg(feature = "e2e-encryption")] + let to_device_events = { + if let Some(e2ee) = &e2ee { + self.preprocess_to_device_events( + to_device_events, + &e2ee.device_lists, + &e2ee.device_one_time_keys_count, + e2ee.device_unused_fallback_key_types.as_deref(), + ) + .await? + } else { + to_device_events + } + }; + + let (device_lists, device_one_time_keys_count) = e2ee + .map(|e2ee| { + ( + e2ee.device_lists, + e2ee.device_one_time_keys_count + .into_iter() + .map(|(k, v)| (k, v.into())) + .collect(), + ) + }) + .unwrap_or_default(); + let store = self.store.clone(); let mut changes = StateChanges::default(); let mut ambiguity_cache = AmbiguityCache::new(store.inner.clone()); - // FIXME not yet supported by sliding sync. - // self.handle_account_data(&account_data.events, &mut changes).await; + if let Some(global_data) = account_data.as_ref().map(|a| &a.global) { + self.handle_account_data(global_data, &mut changes).await; + } - let _push_rules = self.get_push_rules(&changes).await?; + let push_rules = self.get_push_rules(&changes).await?; let mut new_rooms = Rooms::default(); - for (room_id, room_data) in &rooms { + for (room_id, room_data) in rooms.into_iter() { if !room_data.invite_state.is_empty() { let invite_states = &room_data.invite_state; - let room = store.get_or_create_stripped_room(room_id).await; + let room = store.get_or_create_stripped_room(&room_id).await; let mut room_info = room.clone_info(); - if let Some(r) = store.get_room(room_id) { + if let Some(r) = store.get_room(&room_id) { let mut room_info = r.clone_info(); room_info.mark_as_invited(); // FIXME: this might not be accurate changes.add_room(room_info); @@ -96,7 +104,7 @@ impl BaseClient { v3::InvitedRoom::from(v3::InviteState::from(invite_states.clone())), ); } else { - let room = store.get_or_create_room(room_id, RoomType::Joined).await; + let room = store.get_or_create_room(&room_id, RoomType::Joined).await; let mut room_info = room.clone_info(); room_info.mark_as_joined(); // FIXME: this might not be accurate @@ -105,18 +113,16 @@ impl BaseClient { room_info.set_prev_batch(room_data.prev_batch.as_deref()); - let user_ids = if room_data.required_state.is_empty() { - None - } else { - Some( - self.handle_state( - &room_data.required_state, - &mut room_info, - &mut changes, - &mut ambiguity_cache, - ) - .await?, + let mut user_ids = if !room_data.required_state.is_empty() { + self.handle_state( + &room_data.required_state, + &mut room_info, + &mut changes, + &mut ambiguity_cache, ) + .await? + } else { + Default::default() }; // FIXME not yet supported by sliding sync. see @@ -130,36 +136,34 @@ impl BaseClient { // changes.add_receipts(&room_id, event); // } - // FIXME not yet supported by sliding sync. - // self.handle_room_account_data(&room_id, &room_data.account_data.events, &mut - // changes) .await; + let room_account_data = if let Some(inner_account_data) = &account_data { + if let Some(events) = inner_account_data.rooms.get(&room_id) { + self.handle_room_account_data(&room_id, events, &mut changes).await; + Some(events.to_vec()) + } else { + None + } + } else { + None + }; - // FIXME not yet supported by sliding sync. - // if room_data.timeline.limited { - // room_info.mark_members_missing(); - // } + if room_data.limited { + room_info.mark_members_missing(); + } - // let timeline = self - // .handle_timeline( - // &room, - // room_data.timeline, - // &push_rules, - // &mut room_info, - // &mut changes, - // &mut ambiguity_cache, - // &mut user_ids, - // ) - // .await?; - - // let timeline_slice = TimelineSlice::new( - // timeline.events.clone(), - // next_batch.clone(), - // timeline.prev_batch.clone(), - // timeline.limited, - // true, - // ); - - // changes.add_timeline(&room_id, timeline_slice); + let timeline = self + .handle_timeline( + &room, + room_data.limited, + room_data.timeline, + room_data.prev_batch, + &push_rules, + &mut user_ids, + &mut room_info, + &mut changes, + &mut ambiguity_cache, + ) + .await?; #[cfg(feature = "e2e-encryption")] if room_info.is_encrypted() { @@ -168,15 +172,15 @@ impl BaseClient { // The room turned on encryption in this sync, we need // to also get all the existing users and mark them for // tracking. - let joined = store.get_joined_user_ids(room_id).await?; - let invited = store.get_invited_user_ids(room_id).await?; + let joined = store.get_joined_user_ids(&room_id).await?; + let invited = store.get_invited_user_ids(&room_id).await?; let user_ids: Vec<&UserId> = joined.iter().chain(&invited).map(Deref::deref).collect(); o.update_tracked_users(user_ids).await } - if let Some(user_ids) = user_ids { + if !user_ids.is_empty() { o.update_tracked_users(user_ids.iter().map(Deref::deref)).await; } } @@ -187,9 +191,9 @@ impl BaseClient { new_rooms.join.insert( room_id.clone(), JoinedRoom::new( - Default::default(), //timeline, + timeline, v3::State::with_events(room_data.required_state.clone()), - Default::default(), // room_info.account_data, + room_account_data.unwrap_or_default(), Default::default(), // room_info.ephemeral, notification_count, ), @@ -199,9 +203,13 @@ impl BaseClient { } } - // FIXME not yet supported by sliding sync. see - // https://github.com/matrix-org/matrix-rust-sdk/issues/1014 - // self.handle_account_data(&account_data.events, &mut changes).await; + // TODO remove this, we're processing account data events here again + // because we want to have the push rules in place before we process + // rooms and their events, but we want to create the rooms before we + // process the `m.direct` account data event. + if let Some(global_data) = account_data.as_ref().map(|a| &a.global) { + self.handle_account_data(global_data, &mut changes).await; + } // FIXME not yet supported by sliding sync. // changes.presence = presence @@ -228,10 +236,10 @@ impl BaseClient { notifications: changes.notifications, // FIXME not yet supported by sliding sync. presence: Default::default(), - account_data: Default::default(), - to_device: Default::default(), - device_lists: Default::default(), - device_one_time_keys_count: Default::default(), + account_data: account_data.map(|a| a.global).unwrap_or_default(), + to_device_events, + device_lists, + device_one_time_keys_count, }) } } diff --git a/crates/matrix-sdk-common/src/deserialized_responses.rs b/crates/matrix-sdk-common/src/deserialized_responses.rs index ffe6e06d871..e573b59dd36 100644 --- a/crates/matrix-sdk-common/src/deserialized_responses.rs +++ b/crates/matrix-sdk-common/src/deserialized_responses.rs @@ -4,11 +4,8 @@ use ruma::{ api::client::{ push::get_notifications::v3::Notification, sync::sync_events::{ - v3::{ - DeviceLists, Ephemeral, GlobalAccountData, InvitedRoom, Presence, RoomAccountData, - State, ToDevice, - }, - UnreadNotificationsCount as RumaUnreadNotificationsCount, + v3::{Ephemeral, InvitedRoom, Presence, RoomAccountData, State}, + DeviceLists, UnreadNotificationsCount as RumaUnreadNotificationsCount, }, }, events::{ @@ -16,7 +13,8 @@ use ruma::{ MembershipState, RoomMemberEvent, RoomMemberEventContent, StrippedRoomMemberEvent, SyncRoomMemberEvent, }, - AnySyncTimelineEvent, AnyTimelineEvent, + AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncTimelineEvent, AnyTimelineEvent, + AnyToDeviceEvent, }, serde::Raw, DeviceKeyAlgorithm, EventId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedEventId, @@ -133,9 +131,9 @@ pub struct SyncResponse { /// Updates to the presence status of other users. pub presence: Presence, /// The global private data created by this user. - pub account_data: GlobalAccountData, + pub account_data: Vec>, /// Messages sent directly between devices. - pub to_device: ToDevice, + pub to_device_events: Vec>, /// Information on E2E device updates. /// /// Only present on an incremental sync. @@ -187,7 +185,7 @@ pub struct JoinedRoom { /// true). pub state: State, /// The private data that this user has attached to this room. - pub account_data: RoomAccountData, + pub account_data: Vec>, /// The ephemeral events in the room that aren't recorded in the timeline or /// state of the room. e.g. typing. pub ephemeral: Ephemeral, @@ -197,7 +195,7 @@ impl JoinedRoom { pub fn new( timeline: Timeline, state: State, - account_data: RoomAccountData, + account_data: Vec>, ephemeral: Ephemeral, unread_notifications: UnreadNotificationsCount, ) -> Self { diff --git a/crates/matrix-sdk-crypto/README.md b/crates/matrix-sdk-crypto/README.md index b8dc9ad80b4..a7a7f7a696b 100644 --- a/crates/matrix-sdk-crypto/README.md +++ b/crates/matrix-sdk-crypto/README.md @@ -30,14 +30,13 @@ async fn main() -> Result<(), OlmError> { let alice = user_id!("@alice:example.org"); let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await; - let to_device_events = ToDevice::default(); let changed_devices = DeviceLists::default(); let one_time_key_counts = BTreeMap::default(); let unused_fallback_keys = Some(Vec::new()); // Push changes that the server sent to us in a sync response. let decrypted_to_device = machine.receive_sync_changes( - to_device_events, + vec![], &changed_devices, &one_time_key_counts, unused_fallback_keys.as_deref(), diff --git a/crates/matrix-sdk-crypto/src/machine.rs b/crates/matrix-sdk-crypto/src/machine.rs index fbdec2bf2eb..382727794b2 100644 --- a/crates/matrix-sdk-crypto/src/machine.rs +++ b/crates/matrix-sdk-crypto/src/machine.rs @@ -31,11 +31,12 @@ use ruma::{ upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest, }, - sync::sync_events::v3::{DeviceLists, ToDevice}, + sync::sync_events::DeviceLists, }, assign, events::{ - secret::request::SecretName, AnyMessageLikeEvent, AnyTimelineEvent, MessageLikeEventContent, + secret::request::SecretName, AnyMessageLikeEvent, AnyTimelineEvent, AnyToDeviceEvent, + MessageLikeEventContent, }, serde::Raw, DeviceId, DeviceKeyAlgorithm, OwnedDeviceId, OwnedDeviceKeyId, OwnedTransactionId, OwnedUserId, @@ -891,11 +892,11 @@ impl OlmMachine { /// [`decrypt_room_event`]: #method.decrypt_room_event pub async fn receive_sync_changes( &self, - to_device_events: ToDevice, + to_device_events: Vec>, changed_devices: &DeviceLists, one_time_keys_counts: &BTreeMap, unused_fallback_keys: Option<&[DeviceKeyAlgorithm]>, - ) -> OlmResult { + ) -> OlmResult>> { // Remove verification objects that have expired or are done. let mut events = self.verification_machine.garbage_collect(); @@ -912,7 +913,7 @@ impl OlmMachine { } } - for mut raw_event in to_device_events.events { + for mut raw_event in to_device_events { let event: ToDeviceEvents = match raw_event.deserialize_as() { Ok(e) => e, Err(e) => { @@ -1002,10 +1003,7 @@ impl OlmMachine { self.store.save_changes(changes).await?; - let mut to_device = ToDevice::new(); - to_device.events = events; - - Ok(to_device) + Ok(events) } /// Request a room key from our devices. @@ -1586,7 +1584,7 @@ pub(crate) mod tests { api::{ client::{ keys::{claim_keys, get_keys, upload_keys}, - sync::sync_events::v3::{DeviceLists, ToDevice}, + sync::sync_events::v3::DeviceLists, }, IncomingResponse, }, @@ -2005,15 +2003,12 @@ pub(crate) mod tests { let alice_session = alice.group_session_manager.get_outbound_group_session(room_id).unwrap(); - let mut to_device = ToDevice::new(); - to_device.events.push(event); - let decrypted = bob - .receive_sync_changes(to_device, &Default::default(), &Default::default(), None) + .receive_sync_changes(vec![event], &Default::default(), &Default::default(), None) .await .unwrap(); - let event = decrypted.events[0].deserialize().unwrap(); + let event = decrypted[0].deserialize().unwrap(); if let AnyToDeviceEvent::RoomKey(event) = event { assert_eq!(&event.sender, alice.user_id()); @@ -2342,13 +2337,13 @@ pub(crate) mod tests { other: Default::default(), }; let event = json_convert(&event).unwrap(); - let mut to_device = ToDevice::new(); - to_device.events.push(event); let changed_devices = DeviceLists::new(); let key_counts = Default::default(); - let _ = - bob.receive_sync_changes(to_device, &changed_devices, &key_counts, None).await.unwrap(); + let _ = bob + .receive_sync_changes(vec![event], &changed_devices, &key_counts, None) + .await + .unwrap(); let group_session = GroupSession::new(SessionConfig::version_1()); let session_key = group_session.session_key(); @@ -2378,10 +2373,8 @@ pub(crate) mod tests { ); let event: Raw = json_convert(&event).unwrap(); - let mut to_device = ToDevice::new(); - to_device.events.push(event.clone()); - bob.receive_sync_changes(to_device, &changed_devices, &key_counts, None).await.unwrap(); + bob.receive_sync_changes(vec![event], &changed_devices, &key_counts, None).await.unwrap(); let session = bob.store.get_inbound_group_session(room_id, &session_id).await; diff --git a/crates/matrix-sdk/src/sliding_sync.rs b/crates/matrix-sdk/src/sliding_sync.rs index ea2fc19cf4c..4ba3afe123d 100644 --- a/crates/matrix-sdk/src/sliding_sync.rs +++ b/crates/matrix-sdk/src/sliding_sync.rs @@ -17,12 +17,14 @@ use std::{fmt::Debug, sync::Arc}; use anyhow::{bail, Context}; use futures_core::stream::Stream; -use matrix_sdk_base::deserialized_responses::SyncResponse; +use futures_signals::signal::Mutable; +use matrix_sdk_base::deserialized_responses::{SyncResponse, SyncTimelineEvent}; use ruma::{ - api::client::sync::sync_events::v4, + api::client::sync::sync_events::v4::{ + self, AccountDataConfig, E2EEConfig, ExtensionsConfig, ToDeviceConfig, + }, assign, - events::{AnySyncTimelineEvent, RoomEventType}, - serde::Raw, + events::RoomEventType, OwnedRoomId, RoomId, UInt, }; use url::Url; @@ -89,28 +91,30 @@ impl RoomListEntry { } } -pub type AliveRoomTimeline = - Arc>>; +pub type AliveRoomTimeline = Arc>; /// Room info as giving by the SlidingSync Feature. #[derive(Debug, Clone)] pub struct SlidingSyncRoom { room_id: OwnedRoomId, inner: v4::SlidingSyncRoom, - is_loading_more: futures_signals::signal::Mutable, - prev_batch: futures_signals::signal::Mutable>, + is_loading_more: Mutable, + prev_batch: Mutable>, timeline: AliveRoomTimeline, } impl SlidingSyncRoom { - fn from(room_id: OwnedRoomId, mut inner: v4::SlidingSyncRoom) -> Self { - let v4::SlidingSyncRoom { timeline, .. } = inner; + fn from( + room_id: OwnedRoomId, + mut inner: v4::SlidingSyncRoom, + timeline: Vec, + ) -> Self { // we overwrite to only keep one copy inner.timeline = vec![]; Self { room_id, - is_loading_more: futures_signals::signal::Mutable::new(false), - prev_batch: futures_signals::signal::Mutable::new(inner.prev_batch.clone()), + is_loading_more: Mutable::new(false), + prev_batch: Mutable::new(inner.prev_batch.clone()), timeline: Arc::new(futures_signals::signal_vec::MutableVec::new_with_values(timeline)), inner, } @@ -141,7 +145,7 @@ impl SlidingSyncRoom { self.inner.name.as_deref() } - fn update(&mut self, room_data: &v4::SlidingSyncRoom) { + fn update(&mut self, room_data: &v4::SlidingSyncRoom, timeline: Vec) { let v4::SlidingSyncRoom { name, initial, @@ -150,7 +154,6 @@ impl SlidingSyncRoom { unread_notifications, required_state, prev_batch, - timeline, .. } = room_data; @@ -178,8 +181,8 @@ impl SlidingSyncRoom { if !timeline.is_empty() { let mut ref_timeline = self.timeline.lock_mut(); - for e in timeline { - ref_timeline.push_cloned(e.clone()); + for e in timeline.into_iter() { + ref_timeline.push_cloned(e); } } } @@ -192,11 +195,11 @@ impl std::ops::Deref for SlidingSyncRoom { } } -type ViewState = futures_signals::signal::Mutable; -type SyncMode = futures_signals::signal::Mutable; -type PosState = futures_signals::signal::Mutable>; -type RangeState = futures_signals::signal::Mutable>; -type RoomsCount = futures_signals::signal::Mutable>; +type ViewState = Mutable; +type SyncMode = Mutable; +type PosState = Mutable>; +type RangeState = Mutable>; +type RoomsCount = Mutable>; type RoomsList = Arc>; type RoomsMap = Arc>; type RoomsSubscriptions = @@ -243,6 +246,9 @@ pub struct SlidingSync { /// The rooms details #[builder(private, default)] rooms: RoomsMap, + + #[builder(private, default)] + extensions: Mutable>, } impl SlidingSyncBuilder { @@ -275,6 +281,91 @@ impl SlidingSyncBuilder { self.views = Some(views); self } + + /// Activate e2ee, to-device-message and account data extensions if not yet + /// configured. + /// + /// Will leave any extension configuration found untouched, so the order + /// does not matter. + pub fn with_common_extensions(mut self) -> Self { + { + let mut lock = self.extensions.get_or_insert_with(Default::default).lock_mut(); + let mut cfg = lock.get_or_insert_with(Default::default); + if cfg.to_device.is_none() { + cfg.to_device = Some(assign!(ToDeviceConfig::default(), {enabled : Some(true)})); + } + + if cfg.e2ee.is_none() { + cfg.e2ee = Some(assign!(E2EEConfig::default(), {enabled : Some(true)})); + } + + if cfg.account_data.is_none() { + cfg.account_data = + Some(assign!(AccountDataConfig::default(), {enabled : Some(true)})); + } + } + self + } + + /// Set the E2EE extension configuration. + pub fn with_e2ee_extension(mut self, e2ee: E2EEConfig) -> Self { + self.extensions + .get_or_insert_with(Default::default) + .lock_mut() + .get_or_insert_with(Default::default) + .e2ee = Some(e2ee); + self + } + + /// Unset the E2EE extension configuration. + pub fn without_e2ee_extension(mut self) -> Self { + self.extensions + .get_or_insert_with(Default::default) + .lock_mut() + .get_or_insert_with(Default::default) + .e2ee = None; + self + } + + /// Set the ToDevice extension configuration. + pub fn with_to_device_extension(mut self, to_device: ToDeviceConfig) -> Self { + self.extensions + .get_or_insert_with(Default::default) + .lock_mut() + .get_or_insert_with(Default::default) + .to_device = Some(to_device); + self + } + + /// Unset the ToDevice extension configuration. + pub fn without_to_device_extension(mut self) -> Self { + self.extensions + .get_or_insert_with(Default::default) + .lock_mut() + .get_or_insert_with(Default::default) + .to_device = None; + self + } + + /// Set the account data extension configuration. + pub fn with_account_data_extension(mut self, account_data: AccountDataConfig) -> Self { + self.extensions + .get_or_insert_with(Default::default) + .lock_mut() + .get_or_insert_with(Default::default) + .account_data = Some(account_data); + self + } + + /// Unset the account data extension configuration. + pub fn without_account_data_extension(mut self) -> Self { + self.extensions + .get_or_insert_with(Default::default) + .lock_mut() + .get_or_insert_with(Default::default) + .account_data = None; + self + } } impl SlidingSync { @@ -329,6 +420,15 @@ impl SlidingSync { self.rooms.lock_ref().get(&room_id).cloned() } + fn update_to_device_since(&self, since: String) { + self.extensions + .lock_mut() + .get_or_insert_with(Default::default) + .to_device + .get_or_insert_with(Default::default) + .since = Some(since); + } + /// Lookup a set of rooms pub fn get_rooms>( &self, @@ -343,7 +443,7 @@ impl SlidingSync { resp: v4::Response, views: &[SlidingSyncView], ) -> anyhow::Result { - self.client.process_sliding_sync(resp.clone()).await?; + let mut processed = self.client.process_sliding_sync(resp.clone()).await?; tracing::info!("main client processed."); self.pos.replace(Some(resp.pos)); let mut updated_views = Vec::new(); @@ -362,20 +462,33 @@ impl SlidingSync { let mut rooms = Vec::new(); let mut rooms_map = self.rooms.lock_mut(); - for (id, room_data) in resp.rooms.iter() { - if let Some(mut r) = rooms_map.remove(id) { - r.update(room_data); + for (id, mut room_data) in resp.rooms.into_iter() { + let timeline = if let Some(joined_room) = processed.rooms.join.remove(&id) { + joined_room.timeline.events + } else { + let events = room_data.timeline.into_iter().map(Into::into).collect(); + room_data.timeline = vec![]; + events + }; + + if let Some(mut r) = rooms_map.remove(&id) { + r.update(&room_data, timeline); rooms_map.insert_cloned(id.clone(), r); rooms.push(id.clone()); } else { rooms_map.insert_cloned( id.clone(), - SlidingSyncRoom::from(id.clone(), room_data.clone()), + SlidingSyncRoom::from(id.clone(), room_data, timeline), ); - rooms.push(id.clone()); + rooms.push(id); } } + // Update the `to-device` next-batch if found. + if let Some(to_device_since) = resp.extensions.to_device.map(|t| t.next_batch) { + self.update_to_device_since(to_device_since) + } + Ok(UpdateSummary { views: updated_views, rooms }) } @@ -386,9 +499,7 @@ impl SlidingSync { &self, ) -> anyhow::Result> + '_> { let views = self.views.lock_ref().to_vec(); - let _pos = self.pos.clone(); - - // FIXME: hack for while the sliding sync server is on a proxy + let extensions = self.extensions.clone(); let client = self.client.clone(); Ok(async_stream::try_stream! { @@ -398,6 +509,11 @@ impl SlidingSync { .map(SlidingSyncView::request_generator) .collect(); loop { + #[cfg(feature = "e2e-encryption")] + if let Err(e) = client.send_outgoing_requests().await { + tracing::error!(error = ?e, "Error while sending outgoing E2EE requests"); + } + let mut requests = Vec::new(); let mut new_remaining_generators = Vec::new(); let mut new_remaining_views = Vec::new(); @@ -431,6 +547,7 @@ impl SlidingSync { pos: pos.as_deref(), room_subscriptions, unsubscribe_rooms: &unsubscribe_rooms, + extensions: extensions.lock_mut().take().unwrap_or_default(), // extensions are sticky, we pop them here once }); tracing::debug!("requesting"); let resp = client diff --git a/crates/matrix-sdk/src/sync.rs b/crates/matrix-sdk/src/sync.rs index 1145dd971d0..48491d5e92f 100644 --- a/crates/matrix-sdk/src/sync.rs +++ b/crates/matrix-sdk/src/sync.rs @@ -30,17 +30,16 @@ impl Client { rooms, presence, account_data, - to_device, + to_device_events, device_lists: _, device_one_time_keys_count: _, ambiguity_changes: _, notifications, } = &response; - self.handle_sync_events(HandlerKind::GlobalAccountData, &None, &account_data.events) - .await?; + self.handle_sync_events(HandlerKind::GlobalAccountData, &None, account_data).await?; self.handle_sync_events(HandlerKind::Presence, &None, &presence.events).await?; - self.handle_sync_events(HandlerKind::ToDevice, &None, &to_device.events).await?; + self.handle_sync_events(HandlerKind::ToDevice, &None, to_device_events).await?; for (room_id, room_info) in &rooms.join { let room = self.get_room(room_id); @@ -54,8 +53,7 @@ impl Client { self.handle_sync_events(HandlerKind::EphemeralRoomData, &room, &ephemeral.events) .await?; - self.handle_sync_events(HandlerKind::RoomAccountData, &room, &account_data.events) - .await?; + self.handle_sync_events(HandlerKind::RoomAccountData, &room, account_data).await?; self.handle_sync_state_events(&room, &state.events).await?; self.handle_sync_timeline_events(&room, &timeline.events).await?; } diff --git a/labs/jack-in/Cargo.toml b/labs/jack-in/Cargo.toml index d9ad0e3f076..47a5acb98fc 100644 --- a/labs/jack-in/Cargo.toml +++ b/labs/jack-in/Cargo.toml @@ -9,18 +9,22 @@ edition = "2021" file-logging = ["dep:log4rs"] [dependencies] -tuirealm = "~1.7.1" -matrix-sdk = { path = "../../crates/matrix-sdk", default-features = false, features = ["e2e-encryption", "anyhow", "native-tls", "sled", "sliding-sync"] , version = "0.6.0" } -matrix-sdk-common = { path = "../../crates/matrix-sdk-common" , version = "0.6.0" } -structopt = "0.3" -tokio = { version = "1", features = ["rt-multi-thread", "sync", "macros"] } +app_dirs2 = "2" +dialoguer = "0.10.2" +eyre = "0.6" futures = { version = "0.3.1" } futures-signals = "0.3.24" +matrix-sdk = { path = "../../crates/matrix-sdk", default-features = false, features = ["e2e-encryption", "anyhow", "native-tls", "sled", "sliding-sync"], version = "0.6.0" } +matrix-sdk-common = { path = "../../crates/matrix-sdk-common", version = "0.6.0" } +matrix-sdk-sled = { path = "../../crates/matrix-sdk-sled", features = ["state-store", "crypto-store"], version = "0.2.0" } +sanitize-filename-reader-friendly = "2.2.1" +serde_json = "1.0.85" +structopt = "0.3" +tokio = { version = "1", features = ["rt-multi-thread", "sync", "macros"] } tracing-flame = "0.2" tracing-subscriber = "0.3.15" -eyre = "0.6" - tui-logger = "0.8.0" +tuirealm = "~1.7.1" # file-logging specials tracing = { version = "0.1.35", features = ["log"] } diff --git a/labs/jack-in/src/client/mod.rs b/labs/jack-in/src/client/mod.rs index d9e0156f553..8181d84655d 100644 --- a/labs/jack-in/src/client/mod.rs +++ b/labs/jack-in/src/client/mod.rs @@ -1,7 +1,7 @@ use eyre::{Result, WrapErr}; use futures::{pin_mut, StreamExt}; use tokio::sync::mpsc; -use tracing::{error, warn}; +use tracing::{error, info, warn}; pub mod state; @@ -12,13 +12,14 @@ pub async fn run_client( sliding_sync_proxy: String, tx: mpsc::Sender, ) -> Result<()> { - warn!("Starting sliding sync now"); + info!("Starting sliding sync now"); let builder = client.sliding_sync().await; let full_sync_view = SlidingSyncViewBuilder::default_with_fullsync().timeline_limit(10u32).build()?; let syncer = builder .homeserver(sliding_sync_proxy.parse().wrap_err("can't parse sync proxy")?) .add_view(full_sync_view) + .with_common_extensions() .build()?; let stream = syncer.stream().await.expect("we can build the stream"); let view = syncer.views.lock_ref().first().expect("we have the full syncer there").clone(); @@ -26,8 +27,13 @@ pub async fn run_client( let mut ssync_state = state::SlidingSyncState::new(view); tx.send(ssync_state.clone()).await?; + info!("starting polling"); + pin_mut!(stream); - let _first_poll = stream.next().await; + if let Some(Err(e)) = stream.next().await { + error!("Initial Query on sliding sync failed: {:#?}", e); + return Ok(()); + } let view_state = state.read_only().get_cloned(); if view_state != SlidingSyncState::CatchingUp { warn!("Sliding Query failed: {:#?}", view_state); @@ -38,25 +44,26 @@ pub async fn run_client( ssync_state.set_first_render_now(); tx.send(ssync_state.clone()).await?; } - warn!("Done initial sliding sync"); + info!("Done initial sliding sync"); loop { match stream.next().await { Some(Ok(_)) => { // we are switching into live updates mode next. ignoring + let state = state.read_only().get_cloned(); - if state.read_only().get_cloned() == SlidingSyncState::Live { - warn!("Reached live sync"); + if state == SlidingSyncState::Live { + info!("Reached live sync"); break; } let _ = tx.send(ssync_state.clone()).await; } Some(Err(e)) => { - warn!("Error: {:}", e); + error!("Error: {:}", e); break; } None => { - warn!("Never reached live state"); + error!("Never reached live state"); break; } } @@ -88,7 +95,7 @@ pub async fn run_client( } match update { Ok(update) => { - warn!("Live update received: {:?}", update); + info!("Live update received: {:?}", update); tx.send(ssync_state.clone()).await?; err_counter = 0; } diff --git a/labs/jack-in/src/components/details.rs b/labs/jack-in/src/components/details.rs index 2e2e71fe4eb..3610eed684d 100644 --- a/labs/jack-in/src/components/details.rs +++ b/labs/jack-in/src/components/details.rs @@ -79,7 +79,7 @@ impl Details { .timeline() .lock_ref() .iter() - .filter_map(|d| d.deserialize().ok()) + .filter_map(|d| d.event.deserialize().ok()) .map(|e| e.into_full_event(room_id.clone())) .collect(); timeline.reverse(); diff --git a/labs/jack-in/src/main.rs b/labs/jack-in/src/main.rs index 1a33c26684c..52c11bab76a 100644 --- a/labs/jack-in/src/main.rs +++ b/labs/jack-in/src/main.rs @@ -4,16 +4,22 @@ use std::path::{Path, PathBuf}; +use app_dirs2::{app_root, AppDataType, AppInfo}; +use dialoguer::{theme::ColorfulTheme, Password}; use eyre::{eyre, Result}; use matrix_sdk::{ - ruma::{OwnedDeviceId, OwnedRoomId, OwnedUserId}, - Client, Session, + ruma::{OwnedRoomId, OwnedUserId}, + Client, }; -use tracing::{log::LevelFilter, warn}; +use matrix_sdk_sled::make_store_config; +use sanitize_filename_reader_friendly::sanitize; +use tracing::{log, warn}; use tracing_flame::FlameLayer; use tracing_subscriber::prelude::*; use tuirealm::{application::PollStrategy, Event, Update}; +const APP_INFO: AppInfo = AppInfo { name: "jack-in", author: "Matrix-Rust-SDK Core Team" }; + // -- internal mod app; mod client; @@ -75,14 +81,27 @@ struct Opt { #[structopt(short, long, default_value = "http://localhost:8008", env = "JACKIN_SYNC_PROXY")] sliding_sync_proxy: String, - /// Your access token to connect via the - #[structopt(short, long, env = "JACKIN_TOKEN")] - token: String, + /// The password of your account. If not given and no database found, it + /// will prompt you for it + #[structopt(short, long, env = "JACKIN_PASSWORD")] + password: Option, + + /// Create a fresh database, drop all existing cache + #[structopt(long)] + fresh: bool, + + /// RUST_LOG log-levels + #[structopt(short, long, env = "JACKIN_LOG", default_value = "jack_in=info,warn")] + log: String, - /// The userID associated with this access token + /// The userID to log in with #[structopt(short, long, env = "JACKIN_USER")] user: String, + /// The password to encrypt the store with + #[structopt(long, env = "JACKIN_STORE_PASSWORD")] + store_pass: Option, + #[structopt(long)] /// Activate tracing and write the flamegraph to the specified file flames: Option, @@ -112,7 +131,6 @@ async fn main() -> Result<()> { let opt = Opt::from_args(); let user_id: OwnedUserId = opt.user.clone().parse()?; - let device_id: OwnedDeviceId = "XdftAsd".into(); if let Some(ref p) = opt.flames { setup_flames(p.as_path()); @@ -138,41 +156,104 @@ async fn main() -> Result<()> { .logger( Logger::builder() .appender("file") - .build("matrix_sdk::sliding_sync", LevelFilter::Trace), + .build("matrix_sdk::sliding_sync", log::LevelFilter::Trace), ) .logger( Logger::builder() .appender("file") - .build("matrix_sdk::http_client", LevelFilter::Debug), + .build("matrix_sdk::http_client", log::LevelFilter::Debug), ) .logger( Logger::builder() .appender("file") - .build("matrix_sdk_base::sliding_sync", LevelFilter::Debug), + .build("matrix_sdk_base::sliding_sync", log::LevelFilter::Debug), ) - .logger(Logger::builder().appender("file").build("reqwest", LevelFilter::Trace)) - .logger(Logger::builder().appender("file").build("matrix_sdk", LevelFilter::Warn)) - .build(Root::builder().build(LevelFilter::Error)) + .logger( + Logger::builder().appender("file").build("reqwest", log::LevelFilter::Trace), + ) + .logger( + Logger::builder().appender("file").build("matrix_sdk", log::LevelFilter::Warn), + ) + .build(Root::builder().build(log::LevelFilter::Error)) .unwrap(); log4rs::init_config(config).expect("Logging with log4rs failed to initialize"); } #[cfg(not(feature = "file-logging"))] { - tui_logger::init_logger(LevelFilter::Trace).expect("Could not set up logging"); - tui_logger::set_default_level(LevelFilter::Warn); - tui_logger::set_level_for_target("matrix_sdk", LevelFilter::Warn); + tui_logger::init_logger(log::LevelFilter::Trace).unwrap(); + // Set default level for unknown targets to Trace + tui_logger::set_default_level(log::LevelFilter::Warn); + + for pair in opt.log.split(',') { + if let Some((name, lvl)) = pair.split_once('=') { + let level = match lvl.to_lowercase().as_str() { + "trace" => log::LevelFilter::Trace, + "debug" => log::LevelFilter::Debug, + "info" => log::LevelFilter::Info, + "warn" => log::LevelFilter::Warn, + "error" => log::LevelFilter::Error, + // nothing means error + _ => continue, + }; + tui_logger::set_level_for_target(name, level); + } else { + let level = match pair.to_lowercase().as_str() { + "trace" => log::LevelFilter::Trace, + "debug" => log::LevelFilter::Debug, + "info" => log::LevelFilter::Info, + "warn" => log::LevelFilter::Warn, + "error" => log::LevelFilter::Error, + // nothing means error + _ => continue, + }; + tui_logger::set_default_level(level); + } + } } } - let client = Client::builder().server_name(user_id.server_name()).build().await?; - let session = Session { - access_token: opt.token.clone(), - refresh_token: None, - user_id: user_id.clone(), - device_id, - }; - client.restore_login(session).await?; + let data_path = app_root(AppDataType::UserData, &APP_INFO)?.join(sanitize(user_id.as_str())); + if opt.fresh { + // drop the database first; + std::fs::remove_dir_all(&data_path)?; + } + std::fs::create_dir_all(&data_path)?; + let store_config = make_store_config(&data_path, opt.store_pass.as_deref()).await?; + + let client = Client::builder() + .user_agent("jack-in") + .server_name(user_id.server_name()) + .store_config(store_config) + .build() + .await?; + + let session_key = b"jackin::session_token"; + + if let Some(session) = client + .store() + .get_custom_value(session_key) + .await? + .map(|v| serde_json::from_slice(&v)) + .transpose()? + { + tracing::info!("Restoring session from store"); + client.restore_login(session).await?; + } else { + let theme = ColorfulTheme::default(); + let password = match opt.password { + Some(ref pw) => pw.clone(), + _ => Password::with_theme(&theme) + .with_prompt(format!("Password for {user_id:} :")) + .interact()?, + }; + client.login_username(&user_id, &password).send().await?; + } + + if let Some(session) = client.session() { + client.store().set_custom_value(session_key, serde_json::to_vec(&session)?).await?; + } + let sliding_client = client.clone(); let proxy = opt.sliding_sync_proxy.clone();