From 72ec2147244b583a18d19f2347a25f0edd82d3ac Mon Sep 17 00:00:00 2001 From: Valere Date: Tue, 24 Oct 2023 15:58:22 +0200 Subject: [PATCH 01/14] fix members loaded on intitial sync --- spec/integ/crypto/crypto.spec.ts | 4 +-- src/rust-crypto/RoomEncryptor.ts | 42 ++++++++++++++++++++++++++++++-- src/rust-crypto/rust-crypto.ts | 4 --- 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/spec/integ/crypto/crypto.spec.ts b/spec/integ/crypto/crypto.spec.ts index 55058bf0789..9eeaa719622 100644 --- a/spec/integ/crypto/crypto.spec.ts +++ b/spec/integ/crypto/crypto.spec.ts @@ -1727,7 +1727,7 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, }); } - oldBackendOnly("Sending an event initiates a member list sync", async () => { + it("Sending an event initiates a member list sync", async () => { // we expect a call to the /members list... const memberListPromise = expectMembershipRequest(ROOM_ID, ["@bob:xyz"]); @@ -1746,7 +1746,7 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, await Promise.all([sendPromise, megolmMessagePromise, memberListPromise]); }); - oldBackendOnly("loading the membership list inhibits a later load", async () => { + it("loading the membership list inhibits a later load", async () => { const room = aliceClient.getRoom(ROOM_ID)!; await Promise.all([room.loadMembersIfNeeded(), expectMembershipRequest(ROOM_ID, ["@bob:xyz"])]); diff --git a/src/rust-crypto/RoomEncryptor.ts b/src/rust-crypto/RoomEncryptor.ts index 5f1af2610ca..d44597d1428 100644 --- a/src/rust-crypto/RoomEncryptor.ts +++ b/src/rust-crypto/RoomEncryptor.ts @@ -14,7 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. */ -import { EncryptionSettings, OlmMachine, RoomId, UserId } from "@matrix-org/matrix-sdk-crypto-wasm"; +import { EncryptionSettings, OlmMachine, RequestType, RoomId, UserId } from "@matrix-org/matrix-sdk-crypto-wasm"; +import * as RustSdkCryptoJs from "@matrix-org/matrix-sdk-crypto-wasm"; import { EventType } from "../@types/event"; import { IContent, MatrixEvent } from "../models/event"; @@ -22,7 +23,7 @@ import { Room } from "../models/room"; import { Logger, logger } from "../logger"; import { KeyClaimManager } from "./KeyClaimManager"; import { RoomMember } from "../models/room-member"; -import { OutgoingRequestProcessor } from "./OutgoingRequestProcessor"; +import { OutgoingRequest, OutgoingRequestProcessor } from "./OutgoingRequestProcessor"; /** * RoomEncryptor: responsible for encrypting messages to a given room @@ -35,6 +36,7 @@ export class RoomEncryptor { /** * @param olmMachine - The rust-sdk's OlmMachine * @param keyClaimManager - Our KeyClaimManager, which manages the queue of one-time-key claim requests + * @param outgoingRequestProcessor - The OutgoingRequestProcessor, which sends outgoing requests to the homeserver. * @param room - The room we want to encrypt for * @param encryptionSettings - body of the m.room.encryption event currently in force in this room */ @@ -46,6 +48,16 @@ export class RoomEncryptor { private encryptionSettings: IContent, ) { this.prefixedLogger = logger.getChild(`[${room.roomId} encryption]`); + + // start tracking devices for any users already known to be in this room. + // Do not load members here, would defeat lazy loading. + const members = room.getJoinedMembers(); + // At this point just mark the known members as tracked, it might not be the full list of members + // because of lazy loading. This is fine, because we will get a member list update when sending a message for + // the first time, see `RoomEncryptor#ensureEncryptionSession` + this.olmMachine.updateTrackedUsers(members.map((u) => new RustSdkCryptoJs.UserId(u.userId))).then(() => { + this.prefixedLogger.debug(`Updated tracked users for room ${room.roomId}`); + }); } /** @@ -91,7 +103,33 @@ export class RoomEncryptor { ); } + // Manually call `loadMembersIfNeeded` here, because we want to know if it's the first + // time the room is loaded (due to lazy loading), so we can update the tracked users. + const fromServer = await this.room.loadMembersIfNeeded(); const members = await this.room.getEncryptionTargetMembers(); + + if (fromServer) { + // It's the first time the room is loaded, so we need to update the tracked users + await this.olmMachine + .updateTrackedUsers(members.map((u) => new RustSdkCryptoJs.UserId(u.userId))) + .then(() => { + this.prefixedLogger.debug(`Updated tracked users for room ${this.room.roomId}`); + }); + } + + // Query keys in case we don't have them for newly tracked members. + // This must be done before ensuring sessions. If not the devices of these users are not + // known yet and will not get the room key. + // We don't have API to only get the queries related to this member list, so we just + // process the pending `KeysQuery` requests from the olmMachine. (usually these are processed + // at the end of the sync, but we can't wait for that). + const request: OutgoingRequest[] = (await this.olmMachine.outgoingRequests()).filter( + (r: OutgoingRequest) => r.type === RequestType.KeysQuery, + ); + for (let i = 0; i < request.length; i++) { + await this.outgoingRequestProcessor.makeOutgoingRequest(request[i]); + } + this.prefixedLogger.debug( `Encrypting for users (shouldEncryptForInvitedMembers: ${this.room.shouldEncryptForInvitedMembers()}):`, members.map((u) => `${u.userId} (${u.membership})`), diff --git a/src/rust-crypto/rust-crypto.ts b/src/rust-crypto/rust-crypto.ts index 378d79b579f..6f98233ea1d 100644 --- a/src/rust-crypto/rust-crypto.ts +++ b/src/rust-crypto/rust-crypto.ts @@ -1232,10 +1232,6 @@ export class RustCrypto extends TypedEventEmitter new RustSdkCryptoJs.UserId(u.userId))); } /** called by the sync loop after processing each sync. From f9802f4edade62a64734eab68ea01c5bbd142c3e Mon Sep 17 00:00:00 2001 From: Valere Date: Wed, 25 Oct 2023 09:21:46 +0200 Subject: [PATCH 02/14] Update test to use KeyResponder --- spec/integ/crypto/crypto.spec.ts | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/spec/integ/crypto/crypto.spec.ts b/spec/integ/crypto/crypto.spec.ts index 9eeaa719622..4893ce78110 100644 --- a/spec/integ/crypto/crypto.spec.ts +++ b/spec/integ/crypto/crypto.spec.ts @@ -670,7 +670,13 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, }); it("prepareToEncrypt", async () => { - expectAliceKeyQuery({ device_keys: { "@alice:localhost": {} }, failures: {} }); + const homeserverUrl = "https://alice-server.com"; + keyResponder = new E2EKeyResponder(homeserverUrl); + keyResponder.addKeyReceiver("@alice:localhost", keyReceiver); + + const testDeviceKeys = getTestOlmAccountKeys(testOlmAccount, "@bob:xyz", "DEVICE_ID"); + keyResponder.addDeviceKeys(testDeviceKeys); + await startClientAndAwaitFirstSync(); aliceClient.setGlobalErrorOnUnknownDevices(false); @@ -678,10 +684,7 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, syncResponder.sendOrQueueSyncResponse(getSyncResponse(["@bob:xyz"])); await syncPromise(aliceClient); - // we expect alice first to query bob's keys... - expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz")); - - // ... and then claim one of his OTKs + // Alice should claim one of his OTKs expectAliceKeyClaim(getTestKeysClaimResponse("@bob:xyz")); // fire off the prepare request From cf98910d6c984645c8c585461811fe90fdd29cfb Mon Sep 17 00:00:00 2001 From: Valere Date: Wed, 25 Oct 2023 09:38:34 +0200 Subject: [PATCH 03/14] Use E2EKeyResponder --- spec/integ/crypto/crypto.spec.ts | 44 ++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/spec/integ/crypto/crypto.spec.ts b/spec/integ/crypto/crypto.spec.ts index 4893ce78110..a4af39eab1a 100644 --- a/spec/integ/crypto/crypto.spec.ts +++ b/spec/integ/crypto/crypto.spec.ts @@ -701,17 +701,19 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, it("Alice sends a megolm message with GlobalErrorOnUnknownDevices=false", async () => { aliceClient.setGlobalErrorOnUnknownDevices(false); - expectAliceKeyQuery({ device_keys: { "@alice:localhost": {} }, failures: {} }); + const homeserverUrl = "https://alice-server.com"; + keyResponder = new E2EKeyResponder(homeserverUrl); + keyResponder.addKeyReceiver("@alice:localhost", keyReceiver); + + const testDeviceKeys = getTestOlmAccountKeys(testOlmAccount, "@bob:xyz", "DEVICE_ID"); + keyResponder.addDeviceKeys(testDeviceKeys); + await startClientAndAwaitFirstSync(); // Alice shares a room with Bob syncResponder.sendOrQueueSyncResponse(getSyncResponse(["@bob:xyz"])); await syncPromise(aliceClient); - // Once we send the message, Alice will check Bob's device list (twice, because reasons) ... - expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz")); - expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz")); - // ... and claim one of his OTKs ... expectAliceKeyClaim(getTestKeysClaimResponse("@bob:xyz")); @@ -727,17 +729,19 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, it("We should start a new megolm session after forceDiscardSession", async () => { aliceClient.setGlobalErrorOnUnknownDevices(false); - expectAliceKeyQuery({ device_keys: { "@alice:localhost": {} }, failures: {} }); + const homeserverUrl = "https://alice-server.com"; + keyResponder = new E2EKeyResponder(homeserverUrl); + keyResponder.addKeyReceiver("@alice:localhost", keyReceiver); + + const testDeviceKeys = getTestOlmAccountKeys(testOlmAccount, "@bob:xyz", "DEVICE_ID"); + keyResponder.addDeviceKeys(testDeviceKeys); + await startClientAndAwaitFirstSync(); // Alice shares a room with Bob syncResponder.sendOrQueueSyncResponse(getSyncResponse(["@bob:xyz"])); await syncPromise(aliceClient); - // Once we send the message, Alice will check Bob's device list (twice, because reasons) ... - expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz")); - expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz")); - // ... and claim one of his OTKs ... expectAliceKeyClaim(getTestKeysClaimResponse("@bob:xyz")); @@ -1731,12 +1735,16 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, } it("Sending an event initiates a member list sync", async () => { + const homeserverUrl = "https://alice-server.com"; + keyResponder = new E2EKeyResponder(homeserverUrl); + keyResponder.addKeyReceiver("@alice:localhost", keyReceiver); + + const testDeviceKeys = getTestOlmAccountKeys(testOlmAccount, "@bob:xyz", "DEVICE_ID"); + keyResponder.addDeviceKeys(testDeviceKeys); + // we expect a call to the /members list... const memberListPromise = expectMembershipRequest(ROOM_ID, ["@bob:xyz"]); - // then a request for bob's devices... - expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz")); - // then a to-device with the room_key const inboundGroupSessionPromise = expectSendRoomKey("@bob:xyz", testOlmAccount, p2pSession); @@ -1750,12 +1758,16 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, }); it("loading the membership list inhibits a later load", async () => { + const homeserverUrl = "https://alice-server.com"; + keyResponder = new E2EKeyResponder(homeserverUrl); + keyResponder.addKeyReceiver("@alice:localhost", keyReceiver); + + const testDeviceKeys = getTestOlmAccountKeys(testOlmAccount, "@bob:xyz", "DEVICE_ID"); + keyResponder.addDeviceKeys(testDeviceKeys); + const room = aliceClient.getRoom(ROOM_ID)!; await Promise.all([room.loadMembersIfNeeded(), expectMembershipRequest(ROOM_ID, ["@bob:xyz"])]); - // expect a request for bob's devices... - expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz")); - // then a to-device with the room_key const inboundGroupSessionPromise = expectSendRoomKey("@bob:xyz", testOlmAccount, p2pSession); From 407110ee5969fb526aa3ab26054fa639133974da Mon Sep 17 00:00:00 2001 From: Valere Date: Wed, 25 Oct 2023 18:06:14 +0200 Subject: [PATCH 04/14] code review --- spec/integ/crypto/crypto.spec.ts | 12 +- .../OutgoingRequestsManager.spec.ts | 196 ++++++++++++++++++ src/rust-crypto/OutgoingRequestsManager.ts | 106 ++++++++++ src/rust-crypto/RoomEncryptor.ts | 35 ++-- src/rust-crypto/rust-crypto.ts | 87 ++------ 5 files changed, 337 insertions(+), 99 deletions(-) create mode 100644 spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts create mode 100644 src/rust-crypto/OutgoingRequestsManager.ts diff --git a/spec/integ/crypto/crypto.spec.ts b/spec/integ/crypto/crypto.spec.ts index a4af39eab1a..07bebf09957 100644 --- a/spec/integ/crypto/crypto.spec.ts +++ b/spec/integ/crypto/crypto.spec.ts @@ -670,7 +670,7 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, }); it("prepareToEncrypt", async () => { - const homeserverUrl = "https://alice-server.com"; + const homeserverUrl = aliceClient.getHomeserverUrl(); keyResponder = new E2EKeyResponder(homeserverUrl); keyResponder.addKeyReceiver("@alice:localhost", keyReceiver); @@ -684,7 +684,7 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, syncResponder.sendOrQueueSyncResponse(getSyncResponse(["@bob:xyz"])); await syncPromise(aliceClient); - // Alice should claim one of his OTKs + // Alice should claim one of Bob's OTKs expectAliceKeyClaim(getTestKeysClaimResponse("@bob:xyz")); // fire off the prepare request @@ -701,7 +701,7 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, it("Alice sends a megolm message with GlobalErrorOnUnknownDevices=false", async () => { aliceClient.setGlobalErrorOnUnknownDevices(false); - const homeserverUrl = "https://alice-server.com"; + const homeserverUrl = aliceClient.getHomeserverUrl(); keyResponder = new E2EKeyResponder(homeserverUrl); keyResponder.addKeyReceiver("@alice:localhost", keyReceiver); @@ -729,7 +729,7 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, it("We should start a new megolm session after forceDiscardSession", async () => { aliceClient.setGlobalErrorOnUnknownDevices(false); - const homeserverUrl = "https://alice-server.com"; + const homeserverUrl = aliceClient.getHomeserverUrl(); keyResponder = new E2EKeyResponder(homeserverUrl); keyResponder.addKeyReceiver("@alice:localhost", keyReceiver); @@ -1735,7 +1735,7 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, } it("Sending an event initiates a member list sync", async () => { - const homeserverUrl = "https://alice-server.com"; + const homeserverUrl = aliceClient.getHomeserverUrl(); keyResponder = new E2EKeyResponder(homeserverUrl); keyResponder.addKeyReceiver("@alice:localhost", keyReceiver); @@ -1758,7 +1758,7 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, }); it("loading the membership list inhibits a later load", async () => { - const homeserverUrl = "https://alice-server.com"; + const homeserverUrl = aliceClient.getHomeserverUrl(); keyResponder = new E2EKeyResponder(homeserverUrl); keyResponder.addKeyReceiver("@alice:localhost", keyReceiver); diff --git a/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts b/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts new file mode 100644 index 00000000000..4813f86e7bb --- /dev/null +++ b/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts @@ -0,0 +1,196 @@ +/* +Copyright 2024 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { Mocked } from "jest-mock"; +import * as RustSdkCryptoJs from "@matrix-org/matrix-sdk-crypto-wasm"; + +import { OutgoingRequestProcessor } from "../../../src/rust-crypto/OutgoingRequestProcessor"; +import { OutgoingRequestsManager } from "../../../src/rust-crypto/OutgoingRequestsManager"; +import { logger } from "../../../lib/logger"; +import { defer } from "../../../src/utils"; +import { OutgoingRequest } from "../../../lib/rust-crypto/OutgoingRequestProcessor"; + +describe("OutgoingRequestsManager", () => { + /** the OutgoingRequestProcessor implementation under test */ + let manager: OutgoingRequestsManager; + + /** a mock OutgoingRequestProcessor */ + let processor: Mocked; + + /** a mocked-up OlmMachine which processor is connected to */ + let olmMachine: Mocked; + + beforeEach(async () => { + olmMachine = { + outgoingRequests: jest.fn(), + } as unknown as Mocked; + + processor = { + makeOutgoingRequest: jest.fn(), + } as unknown as Mocked; + + manager = new OutgoingRequestsManager(logger, olmMachine, processor); + }); + + describe("requestLoop", () => { + it("Requests are processed directly when requested", async () => { + const request1 = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}"); + const request2 = new RustSdkCryptoJs.KeysUploadRequest("foo2", "{}"); + olmMachine.outgoingRequests.mockImplementationOnce(async () => { + return [request1, request2]; + }); + + processor.makeOutgoingRequest.mockImplementationOnce(async () => { + return; + }); + + await manager.requestLoop(); + + expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(1); + expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(2); + expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request1); + expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request2); + }); + + it("Stack requests while one is already running", async () => { + const request1 = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}"); + const request2 = new RustSdkCryptoJs.KeysUploadRequest("foo2", "{}"); + const request3 = new RustSdkCryptoJs.KeysBackupRequest("foo3", "{}", "1"); + + const firstOutgoingRequestDefer = defer(); + + olmMachine.outgoingRequests + .mockImplementationOnce(async (): Promise => { + return firstOutgoingRequestDefer.promise; + }) + .mockImplementationOnce(async () => { + return [request3]; + }); + + const firstRequest = manager.requestLoop(); + + // stack 2 additional requests while the first one is still running + const secondRequest = manager.requestLoop(); + const thirdRequest = manager.requestLoop(); + + // let the first request complete + firstOutgoingRequestDefer.resolve([request1, request2]); + + await firstRequest; + await secondRequest; + await thirdRequest; + + // outgoingRequests should be called twice in total, as the second and third requests are + // processed in the same loop. + expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(2); + + expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(3); + expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request1); + expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request2); + expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request3); + }); + + it("Should not bubble if request is rejected", async () => { + const request = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}"); + olmMachine.outgoingRequests.mockImplementationOnce(async () => { + return [request]; + }); + + processor.makeOutgoingRequest.mockImplementationOnce(async () => { + throw new Error("Some network error"); + }); + + await manager.requestLoop(); + + expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(1); + }); + }); + + describe("Stop", () => { + it("Is stopped properly before making requests", async () => { + const request1 = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}"); + + const firstOutgoingRequestDefer = defer(); + + olmMachine.outgoingRequests.mockImplementationOnce(async (): Promise => { + return firstOutgoingRequestDefer.promise; + }); + + const firstRequest = manager.requestLoop(); + + // stop + manager.stop(); + + // let the first request complete + firstOutgoingRequestDefer.resolve([request1]); + + await firstRequest; + + expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(0); + }); + + it("Is stopped properly after calling outgoing requests", async () => { + const request1 = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}"); + + const firstOutgoingRequestDefer = defer(); + + olmMachine.outgoingRequests.mockImplementationOnce(async (): Promise => { + return firstOutgoingRequestDefer.promise; + }); + + const firstRequest = manager.requestLoop(); + + // stop + manager.stop(); + + // let the first request complete + firstOutgoingRequestDefer.resolve([request1]); + + await firstRequest; + + expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(0); + }); + + it("Is stopped properly in between requests", async () => { + const request1 = new RustSdkCryptoJs.KeysQueryRequest("11", "{}"); + const request2 = new RustSdkCryptoJs.KeysUploadRequest("12", "{}"); + + const firstRequestDefer = defer(); + + olmMachine.outgoingRequests.mockImplementationOnce(async (): Promise => { + return [request1, request2]; + }); + + processor.makeOutgoingRequest + .mockImplementationOnce(async () => { + manager.stop(); + return firstRequestDefer.promise; + }) + .mockImplementationOnce(async () => { + return; + }); + + const firstRequest = manager.requestLoop(); + + firstRequestDefer.resolve(); + + await firstRequest; + + // should have been called once but not twice + expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(1); + }); + }); +}); diff --git a/src/rust-crypto/OutgoingRequestsManager.ts b/src/rust-crypto/OutgoingRequestsManager.ts new file mode 100644 index 00000000000..1891530c53c --- /dev/null +++ b/src/rust-crypto/OutgoingRequestsManager.ts @@ -0,0 +1,106 @@ +/* +Copyright 2024 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { OlmMachine } from "@matrix-org/matrix-sdk-crypto-wasm"; + +import { OutgoingRequest, OutgoingRequestProcessor } from "./OutgoingRequestProcessor"; +import { Logger } from "../logger"; + +/** + * OutgoingRequestsManager: responsible for processing outgoing requests from the OlmMachine. + * Ensure that only one loop is going on at once, and that the requests are processed in order. + */ +export class OutgoingRequestsManager { + /** whether {@link stop} has been called */ + private stopped = false; + + /** whether the loop is currently running */ + private isLoopRunning = false; + + /** queue of requests to be processed once the current loop is finished */ + private requestQueue: (() => void)[] = []; + + public constructor( + private readonly logger: Logger, + private readonly olmMachine: OlmMachine, + public readonly outgoingRequestProcessor: OutgoingRequestProcessor, + ) {} + + /** + * Shut down as soon as possible the current loop of outgoing requests processing. + */ + public stop(): void { + this.stopped = true; + } + + /** + * Process the outgoing requests from the OlmMachine. + * There is only one request running at once, and the others are queued. + * If a request is currently running the queued request will only trigger an additional run. + */ + public async requestLoop(): Promise { + if (this.isLoopRunning) { + // If the task is running, add the request to the queue and wait for completion. + // This ensures that the requests are processed only once at a time. + await new Promise((resolve) => { + this.requestQueue.push(resolve); + }); + } else { + await this.executeLoop(); + } + } + + private async executeLoop(): Promise { + this.isLoopRunning = true; + + await this.processOutgoingRequests(); + + this.isLoopRunning = false; + + if (this.requestQueue.length > 0) { + if (this.stopped) { + this.requestQueue.forEach((resolve) => resolve()); + return; + } + // there are a pending request that need to be executed + const awaitingRequests = this.requestQueue.map((resolve) => resolve); + // reset the queue + this.requestQueue = []; + + // run again and resolve all the pending requests. + await this.executeLoop(); + + awaitingRequests.forEach((resolve) => resolve()); + } + } + + private async processOutgoingRequests(): Promise { + if (this.stopped) return; + + const outgoingRequests: OutgoingRequest[] = await this.olmMachine.outgoingRequests(); + + for (const request of outgoingRequests) { + if (this.stopped) return; + try { + await this.outgoingRequestProcessor.makeOutgoingRequest(request); + } catch (e) { + // as part of the loop we silently ignore errors, but log them. + // The rust sdk will retry the request later as it won't have been marked as sent. + this.logger.error(`Failed to process outgoing request ${request.type}: ${e}`); + } + } + } +} diff --git a/src/rust-crypto/RoomEncryptor.ts b/src/rust-crypto/RoomEncryptor.ts index d44597d1428..ba1ed66cda7 100644 --- a/src/rust-crypto/RoomEncryptor.ts +++ b/src/rust-crypto/RoomEncryptor.ts @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -import { EncryptionSettings, OlmMachine, RequestType, RoomId, UserId } from "@matrix-org/matrix-sdk-crypto-wasm"; +import { EncryptionSettings, OlmMachine, RoomId, UserId } from "@matrix-org/matrix-sdk-crypto-wasm"; import * as RustSdkCryptoJs from "@matrix-org/matrix-sdk-crypto-wasm"; import { EventType } from "../@types/event"; @@ -23,7 +23,7 @@ import { Room } from "../models/room"; import { Logger, logger } from "../logger"; import { KeyClaimManager } from "./KeyClaimManager"; import { RoomMember } from "../models/room-member"; -import { OutgoingRequest, OutgoingRequestProcessor } from "./OutgoingRequestProcessor"; +import { OutgoingRequestsManager } from "./OutgoingRequestsManager"; /** * RoomEncryptor: responsible for encrypting messages to a given room @@ -33,17 +33,20 @@ import { OutgoingRequest, OutgoingRequestProcessor } from "./OutgoingRequestProc export class RoomEncryptor { private readonly prefixedLogger: Logger; + /** whether the room members have been loaded and tracked for the first time */ + private lazyLoadedMembersResolved = false; + /** * @param olmMachine - The rust-sdk's OlmMachine * @param keyClaimManager - Our KeyClaimManager, which manages the queue of one-time-key claim requests - * @param outgoingRequestProcessor - The OutgoingRequestProcessor, which sends outgoing requests to the homeserver. + * @param outgoingRequestManager - The OutgoingRequestManager, which manages the queue of outgoing requests. * @param room - The room we want to encrypt for * @param encryptionSettings - body of the m.room.encryption event currently in force in this room */ public constructor( private readonly olmMachine: OlmMachine, private readonly keyClaimManager: KeyClaimManager, - private readonly outgoingRequestProcessor: OutgoingRequestProcessor, + private readonly outgoingRequestManager: OutgoingRequestsManager, private readonly room: Room, private encryptionSettings: IContent, ) { @@ -108,27 +111,21 @@ export class RoomEncryptor { const fromServer = await this.room.loadMembersIfNeeded(); const members = await this.room.getEncryptionTargetMembers(); - if (fromServer) { + if (fromServer && !this.lazyLoadedMembersResolved) { // It's the first time the room is loaded, so we need to update the tracked users - await this.olmMachine - .updateTrackedUsers(members.map((u) => new RustSdkCryptoJs.UserId(u.userId))) - .then(() => { - this.prefixedLogger.debug(`Updated tracked users for room ${this.room.roomId}`); - }); + await this.olmMachine.updateTrackedUsers(members.map((u) => new RustSdkCryptoJs.UserId(u.userId))); + this.lazyLoadedMembersResolved = true; + this.prefixedLogger.debug(`Updated tracked users for room ${this.room.roomId}`); } // Query keys in case we don't have them for newly tracked members. // This must be done before ensuring sessions. If not the devices of these users are not // known yet and will not get the room key. - // We don't have API to only get the queries related to this member list, so we just - // process the pending `KeysQuery` requests from the olmMachine. (usually these are processed + // We don't have API to only get the keys queries related to this member list, so we just + // process the pending requests from the olmMachine. (usually these are processed // at the end of the sync, but we can't wait for that). - const request: OutgoingRequest[] = (await this.olmMachine.outgoingRequests()).filter( - (r: OutgoingRequest) => r.type === RequestType.KeysQuery, - ); - for (let i = 0; i < request.length; i++) { - await this.outgoingRequestProcessor.makeOutgoingRequest(request[i]); - } + // XXX future improvement process only KeysQueryRequests for the tracked users. + await this.outgoingRequestManager.requestLoop(); this.prefixedLogger.debug( `Encrypting for users (shouldEncryptForInvitedMembers: ${this.room.shouldEncryptForInvitedMembers()}):`, @@ -150,7 +147,7 @@ export class RoomEncryptor { ); if (shareMessages) { for (const m of shareMessages) { - await this.outgoingRequestProcessor.makeOutgoingRequest(m); + await this.outgoingRequestManager.outgoingRequestProcessor.makeOutgoingRequest(m); } } } diff --git a/src/rust-crypto/rust-crypto.ts b/src/rust-crypto/rust-crypto.ts index 6f98233ea1d..2cbed12f886 100644 --- a/src/rust-crypto/rust-crypto.ts +++ b/src/rust-crypto/rust-crypto.ts @@ -27,7 +27,7 @@ import { BackupDecryptor, CryptoBackend, OnSyncCompletedData } from "../common-c import { Logger } from "../logger"; import { ClientPrefix, IHttpOpts, MatrixHttpApi, Method } from "../http-api"; import { RoomEncryptor } from "./RoomEncryptor"; -import { OutgoingRequest, OutgoingRequestProcessor } from "./OutgoingRequestProcessor"; +import { OutgoingRequestProcessor } from "./OutgoingRequestProcessor"; import { KeyClaimManager } from "./KeyClaimManager"; import { MapWithDefault, encodeUri } from "../utils"; import { @@ -72,6 +72,7 @@ import { ClientStoppedError } from "../errors"; import { ISignatures } from "../@types/signed"; import { encodeBase64 } from "../base64"; import { DecryptionError } from "../crypto/algorithms"; +import { OutgoingRequestsManager } from "./OutgoingRequestsManager"; const ALL_VERIFICATION_METHODS = ["m.sas.v1", "m.qr_code.scan.v1", "m.qr_code.show.v1", "m.reciprocate.v1"]; @@ -94,16 +95,6 @@ export class RustCrypto extends TypedEventEmitter = {}; @@ -112,6 +103,7 @@ export class RustCrypto extends TypedEventEmitter = {}; // When did we last try to check the server for a given session id? @@ -144,6 +136,12 @@ export class RustCrypto extends TypedEventEmitter { - this.logger.error("Error processing outgoing-message requests from rust crypto-sdk", e); - }); - } - - private async outgoingRequestLoopInner(): Promise { - /* istanbul ignore if */ - if (this.outgoingRequestLoopRunning) { - throw new Error("Cannot run two outgoing request loops"); - } - this.outgoingRequestLoopRunning = true; - try { - while (!this.stopped) { - // we clear the "one more loop" flag just before calling `OlmMachine.outgoingRequests()`, so we can tell - // if `this.outgoingRequestLoop()` was called while `OlmMachine.outgoingRequests()` was running. - this.outgoingRequestLoopOneMoreLoop = false; - - const outgoingRequests: Object[] = await this.olmMachine.outgoingRequests(); - - if (this.stopped) { - // we've been told to stop while `outgoingRequests` was running: exit the loop without processing - // any of the returned requests (anything important will happen next time the client starts.) - return; - } - - if (outgoingRequests.length === 0 && !this.outgoingRequestLoopOneMoreLoop) { - // `OlmMachine.outgoingRequests` returned no messages, and there was no call to - // `this.outgoingRequestLoop()` while it was running. We can stop the loop for a while. - return; - } - - for (const msg of outgoingRequests) { - await this.outgoingRequestProcessor.makeOutgoingRequest(msg as OutgoingRequest); - } - } - } finally { - this.outgoingRequestLoopRunning = false; - } + this.outgoingRequestsManager.requestLoop(); } } From f42d81386ab2fdbdc71be848afba7e4c82464b7a Mon Sep 17 00:00:00 2001 From: Valere Date: Wed, 25 Oct 2023 18:19:33 +0200 Subject: [PATCH 05/14] better comment --- spec/integ/crypto/crypto.spec.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/integ/crypto/crypto.spec.ts b/spec/integ/crypto/crypto.spec.ts index 07bebf09957..6f0b0c6342c 100644 --- a/spec/integ/crypto/crypto.spec.ts +++ b/spec/integ/crypto/crypto.spec.ts @@ -714,7 +714,7 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, syncResponder.sendOrQueueSyncResponse(getSyncResponse(["@bob:xyz"])); await syncPromise(aliceClient); - // ... and claim one of his OTKs ... + // ... and claim one of Bob's OTKs ... expectAliceKeyClaim(getTestKeysClaimResponse("@bob:xyz")); // ... and send an m.room_key message @@ -742,7 +742,7 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, syncResponder.sendOrQueueSyncResponse(getSyncResponse(["@bob:xyz"])); await syncPromise(aliceClient); - // ... and claim one of his OTKs ... + // ... and claim one of Bob's OTKs ... expectAliceKeyClaim(getTestKeysClaimResponse("@bob:xyz")); // ... and send an m.room_key message From eef6a451fbdd20b2f5229b9b4734a9353c479488 Mon Sep 17 00:00:00 2001 From: Valere Date: Wed, 25 Oct 2023 18:37:24 +0200 Subject: [PATCH 06/14] fix test --- spec/unit/rust-crypto/rust-crypto.spec.ts | 48 ++--------------------- 1 file changed, 4 insertions(+), 44 deletions(-) diff --git a/spec/unit/rust-crypto/rust-crypto.spec.ts b/spec/unit/rust-crypto/rust-crypto.spec.ts index 4d6973084f5..bde6e0b3da0 100644 --- a/spec/unit/rust-crypto/rust-crypto.spec.ts +++ b/spec/unit/rust-crypto/rust-crypto.spec.ts @@ -50,6 +50,7 @@ import { import * as testData from "../../test-utils/test-data"; import { defer } from "../../../src/utils"; import { logger } from "../../../src/logger"; +import { OutgoingRequestsManager } from "../../../src/rust-crypto/OutgoingRequestsManager"; const TEST_USER = "@alice:example.com"; const TEST_DEVICE_ID = "TEST_DEVICE"; @@ -345,6 +346,8 @@ describe("RustCrypto", () => { makeOutgoingRequest: jest.fn(), } as unknown as Mocked; + const outgoingRequestsManager = new OutgoingRequestsManager(logger, olmMachine, outgoingRequestProcessor); + rustCrypto = new RustCrypto( logger, olmMachine, @@ -355,6 +358,7 @@ describe("RustCrypto", () => { {} as CryptoCallbacks, ); rustCrypto["outgoingRequestProcessor"] = outgoingRequestProcessor; + rustCrypto["outgoingRequestsManager"] = outgoingRequestsManager; }); it("should poll for outgoing messages and send them", async () => { @@ -393,50 +397,6 @@ describe("RustCrypto", () => { await awaitCallToMakeOutgoingRequest(); expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(2); }); - - it("stops looping when stop() is called", async () => { - for (let i = 0; i < 5; i++) { - outgoingRequestQueue.push([new KeysQueryRequest("1234", "{}")]); - } - - let makeRequestPromise = awaitCallToMakeOutgoingRequest(); - - rustCrypto.onSyncCompleted({}); - - expect(rustCrypto["outgoingRequestLoopRunning"]).toBeTruthy(); - - // go a couple of times round the loop - let resolveMakeRequest = await makeRequestPromise; - makeRequestPromise = awaitCallToMakeOutgoingRequest(); - resolveMakeRequest(); - - resolveMakeRequest = await makeRequestPromise; - makeRequestPromise = awaitCallToMakeOutgoingRequest(); - resolveMakeRequest(); - - // a second sync while this is going on shouldn't make any difference - rustCrypto.onSyncCompleted({}); - - resolveMakeRequest = await makeRequestPromise; - outgoingRequestProcessor.makeOutgoingRequest.mockReset(); - resolveMakeRequest(); - - // now stop... - rustCrypto.stop(); - - // which should (eventually) cause the loop to stop with no further calls to outgoingRequests - olmMachine.outgoingRequests.mockReset(); - - await new Promise((resolve) => { - setTimeout(resolve, 100); - }); - expect(rustCrypto["outgoingRequestLoopRunning"]).toBeFalsy(); - expect(outgoingRequestProcessor.makeOutgoingRequest).not.toHaveBeenCalled(); - expect(olmMachine.outgoingRequests).not.toHaveBeenCalled(); - - // we sent three, so there should be 2 left - expect(outgoingRequestQueue.length).toEqual(2); - }); }); describe(".getEventEncryptionInfo", () => { From c5c745e74790390c6f20ee0d45eee67fdf9894a3 Mon Sep 17 00:00:00 2001 From: Valere Date: Wed, 25 Oct 2023 18:40:08 +0200 Subject: [PATCH 07/14] post merge fix --- src/rust-crypto/RoomEncryptor.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rust-crypto/RoomEncryptor.ts b/src/rust-crypto/RoomEncryptor.ts index dedc8a93818..3d8cf4dbcd1 100644 --- a/src/rust-crypto/RoomEncryptor.ts +++ b/src/rust-crypto/RoomEncryptor.ts @@ -30,6 +30,7 @@ import { Logger, logger } from "../logger"; import { KeyClaimManager } from "./KeyClaimManager"; import { RoomMember } from "../models/room-member"; import { HistoryVisibility } from "../@types/partials"; +import { OutgoingRequestsManager } from "./OutgoingRequestsManager"; /** * RoomEncryptor: responsible for encrypting messages to a given room From 683a5b9e04a2d089fc00441d8e5c796f097e6340 Mon Sep 17 00:00:00 2001 From: Valere Date: Wed, 25 Oct 2023 18:44:42 +0200 Subject: [PATCH 08/14] fix imports --- spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts b/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts index 4813f86e7bb..4486c13ed22 100644 --- a/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts +++ b/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts @@ -17,11 +17,10 @@ limitations under the License. import { Mocked } from "jest-mock"; import * as RustSdkCryptoJs from "@matrix-org/matrix-sdk-crypto-wasm"; -import { OutgoingRequestProcessor } from "../../../src/rust-crypto/OutgoingRequestProcessor"; +import { OutgoingRequest, OutgoingRequestProcessor } from "../../../src/rust-crypto/OutgoingRequestProcessor"; import { OutgoingRequestsManager } from "../../../src/rust-crypto/OutgoingRequestsManager"; -import { logger } from "../../../lib/logger"; import { defer } from "../../../src/utils"; -import { OutgoingRequest } from "../../../lib/rust-crypto/OutgoingRequestProcessor"; +import { logger } from "../../../src/logger"; describe("OutgoingRequestsManager", () => { /** the OutgoingRequestProcessor implementation under test */ From cef893bf46414d6c25bb1c1e8f3f968c869ee5f2 Mon Sep 17 00:00:00 2001 From: Valere Date: Thu, 26 Oct 2023 08:54:38 +0200 Subject: [PATCH 09/14] refactoring, better names --- .../OutgoingRequestsManager.spec.ts | 16 +++++------ src/rust-crypto/OutgoingRequestsManager.ts | 28 +++++++++++-------- src/rust-crypto/RoomEncryptor.ts | 2 +- src/rust-crypto/rust-crypto.ts | 4 +-- 4 files changed, 28 insertions(+), 22 deletions(-) diff --git a/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts b/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts index 4486c13ed22..7e8fbe91ea6 100644 --- a/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts +++ b/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts @@ -56,7 +56,7 @@ describe("OutgoingRequestsManager", () => { return; }); - await manager.requestLoop(); + await manager.doProcessOutgoingRequests(); expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(1); expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(2); @@ -79,11 +79,11 @@ describe("OutgoingRequestsManager", () => { return [request3]; }); - const firstRequest = manager.requestLoop(); + const firstRequest = manager.doProcessOutgoingRequests(); // stack 2 additional requests while the first one is still running - const secondRequest = manager.requestLoop(); - const thirdRequest = manager.requestLoop(); + const secondRequest = manager.doProcessOutgoingRequests(); + const thirdRequest = manager.doProcessOutgoingRequests(); // let the first request complete firstOutgoingRequestDefer.resolve([request1, request2]); @@ -112,7 +112,7 @@ describe("OutgoingRequestsManager", () => { throw new Error("Some network error"); }); - await manager.requestLoop(); + await manager.doProcessOutgoingRequests(); expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(1); }); @@ -128,7 +128,7 @@ describe("OutgoingRequestsManager", () => { return firstOutgoingRequestDefer.promise; }); - const firstRequest = manager.requestLoop(); + const firstRequest = manager.doProcessOutgoingRequests(); // stop manager.stop(); @@ -150,7 +150,7 @@ describe("OutgoingRequestsManager", () => { return firstOutgoingRequestDefer.promise; }); - const firstRequest = manager.requestLoop(); + const firstRequest = manager.doProcessOutgoingRequests(); // stop manager.stop(); @@ -182,7 +182,7 @@ describe("OutgoingRequestsManager", () => { return; }); - const firstRequest = manager.requestLoop(); + const firstRequest = manager.doProcessOutgoingRequests(); firstRequestDefer.resolve(); diff --git a/src/rust-crypto/OutgoingRequestsManager.ts b/src/rust-crypto/OutgoingRequestsManager.ts index 1891530c53c..2b7b90efda2 100644 --- a/src/rust-crypto/OutgoingRequestsManager.ts +++ b/src/rust-crypto/OutgoingRequestsManager.ts @@ -27,8 +27,8 @@ export class OutgoingRequestsManager { /** whether {@link stop} has been called */ private stopped = false; - /** whether the loop is currently running */ - private isLoopRunning = false; + /** whether a task is currently running */ + private isTaskRunning = false; /** queue of requests to be processed once the current loop is finished */ private requestQueue: (() => void)[] = []; @@ -48,27 +48,33 @@ export class OutgoingRequestsManager { /** * Process the outgoing requests from the OlmMachine. + * + * This should be called at the end of each sync, to process any requests that have been queued. + * In some cases if outgoing requests need to be sent immediately, this can be called directly. + * * There is only one request running at once, and the others are queued. * If a request is currently running the queued request will only trigger an additional run. */ - public async requestLoop(): Promise { - if (this.isLoopRunning) { + public async doProcessOutgoingRequests(): Promise { + if (this.isTaskRunning) { // If the task is running, add the request to the queue and wait for completion. // This ensures that the requests are processed only once at a time. await new Promise((resolve) => { this.requestQueue.push(resolve); }); } else { - await this.executeLoop(); + await this.executeTask(); } } - private async executeLoop(): Promise { - this.isLoopRunning = true; + private async executeTask(): Promise { + this.isTaskRunning = true; - await this.processOutgoingRequests(); - - this.isLoopRunning = false; + try { + await this.processOutgoingRequests(); + } finally { + this.isTaskRunning = false; + } if (this.requestQueue.length > 0) { if (this.stopped) { @@ -81,7 +87,7 @@ export class OutgoingRequestsManager { this.requestQueue = []; // run again and resolve all the pending requests. - await this.executeLoop(); + await this.executeTask(); awaitingRequests.forEach((resolve) => resolve()); } diff --git a/src/rust-crypto/RoomEncryptor.ts b/src/rust-crypto/RoomEncryptor.ts index e8fff51056e..133a383c06c 100644 --- a/src/rust-crypto/RoomEncryptor.ts +++ b/src/rust-crypto/RoomEncryptor.ts @@ -133,7 +133,7 @@ export class RoomEncryptor { // process the pending requests from the olmMachine. (usually these are processed // at the end of the sync, but we can't wait for that). // XXX future improvement process only KeysQueryRequests for the tracked users. - await this.outgoingRequestManager.requestLoop(); + await this.outgoingRequestManager.doProcessOutgoingRequests(); this.prefixedLogger.debug( `Encrypting for users (shouldEncryptForInvitedMembers: ${this.room.shouldEncryptForInvitedMembers()}):`, diff --git a/src/rust-crypto/rust-crypto.ts b/src/rust-crypto/rust-crypto.ts index 9944ce811b8..bef19918880 100644 --- a/src/rust-crypto/rust-crypto.ts +++ b/src/rust-crypto/rust-crypto.ts @@ -1256,7 +1256,7 @@ export class RustCrypto extends TypedEventEmitter Date: Fri, 27 Oct 2023 17:25:04 +0200 Subject: [PATCH 10/14] code review --- .../OutgoingRequestsManager.spec.ts | 74 ++++++++++++++++-- src/rust-crypto/OutgoingRequestsManager.ts | 78 ++++++++++--------- src/rust-crypto/rust-crypto.ts | 8 +- 3 files changed, 117 insertions(+), 43 deletions(-) diff --git a/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts b/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts index 7e8fbe91ea6..17e89308e82 100644 --- a/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts +++ b/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts @@ -23,13 +23,13 @@ import { defer } from "../../../src/utils"; import { logger } from "../../../src/logger"; describe("OutgoingRequestsManager", () => { - /** the OutgoingRequestProcessor implementation under test */ + /** the OutgoingRequestsManager implementation under test */ let manager: OutgoingRequestsManager; - /** a mock OutgoingRequestProcessor */ + /** a mock OutgoingRequestProcessor */ let processor: Mocked; - /** a mocked-up OlmMachine which processor is connected to */ + /** a mocked-up OlmMachine which manager is connected to */ let olmMachine: Mocked; beforeEach(async () => { @@ -64,7 +64,7 @@ describe("OutgoingRequestsManager", () => { expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request2); }); - it("Stack requests while one is already running", async () => { + it("Stack and batch calls to doProcessOutgoingRequests while one is already running", async () => { const request1 = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}"); const request2 = new RustSdkCryptoJs.KeysUploadRequest("foo2", "{}"); const request3 = new RustSdkCryptoJs.KeysBackupRequest("foo3", "{}", "1"); @@ -102,7 +102,69 @@ describe("OutgoingRequestsManager", () => { expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request3); }); - it("Should not bubble if request is rejected", async () => { + it("Process 3 consecutive calls to doProcessOutgoingRequests while not blocking first one", async () => { + const request1 = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}"); + const request2 = new RustSdkCryptoJs.KeysUploadRequest("foo2", "{}"); + const request3 = new RustSdkCryptoJs.KeysBackupRequest("foo3", "{}", "1"); + + // create defer to control if there is a loop going on + const firstOutgoingRequestDefer = defer(); + const secondOutgoingRequestDefer = defer(); + const thirdOutgoingRequestDefer = defer(); + + olmMachine.outgoingRequests + .mockImplementationOnce(async (): Promise => { + return firstOutgoingRequestDefer.promise; + }) + .mockImplementationOnce(async () => { + return secondOutgoingRequestDefer.promise; + }) + .mockImplementationOnce(async () => { + return thirdOutgoingRequestDefer.promise; + }) + .mockImplementationOnce(async () => { + // Another one that should not occur + return []; + }); + + const firstRequest = manager.doProcessOutgoingRequests(); + + // First request will start an iteration and for now is awaiting on firstOutgoingRequestDefer + + // Query a new request now, this would request a new iteration + const secondRequest = manager.doProcessOutgoingRequests(); + + // let the first iteration complete + firstOutgoingRequestDefer.resolve([request1]); + + // The first request should be now complete + await firstRequest; + expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(1); + expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request1); + + // The second request is awaiting on secondOutgoingRequestDefer + // stack a new request that should be processed in an additional iteration + + const thirdRequest = manager.doProcessOutgoingRequests(); + + secondOutgoingRequestDefer.resolve([request2]); + await secondRequest; + expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(2); + expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request2); + + // The third request is awaiting on thirdOutgoingRequestDefer + + thirdOutgoingRequestDefer.resolve([request3]); + await thirdRequest; + + expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(3); + expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request3); + + // ensure that no other iteration is going on + expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(3); + }); + + it("Should not bubble exceptions if server request is rejected", async () => { const request = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}"); olmMachine.outgoingRequests.mockImplementationOnce(async () => { return [request]; @@ -118,7 +180,7 @@ describe("OutgoingRequestsManager", () => { }); }); - describe("Stop", () => { + describe("Calling stop on the manager should stop ongoing work", () => { it("Is stopped properly before making requests", async () => { const request1 = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}"); diff --git a/src/rust-crypto/OutgoingRequestsManager.ts b/src/rust-crypto/OutgoingRequestsManager.ts index 2b7b90efda2..6d2f7c93b88 100644 --- a/src/rust-crypto/OutgoingRequestsManager.ts +++ b/src/rust-crypto/OutgoingRequestsManager.ts @@ -18,6 +18,7 @@ import { OlmMachine } from "@matrix-org/matrix-sdk-crypto-wasm"; import { OutgoingRequest, OutgoingRequestProcessor } from "./OutgoingRequestProcessor"; import { Logger } from "../logger"; +import { defer, IDeferred } from "../utils"; /** * OutgoingRequestsManager: responsible for processing outgoing requests from the OlmMachine. @@ -30,8 +31,11 @@ export class OutgoingRequestsManager { /** whether a task is currently running */ private isTaskRunning = false; - /** queue of requests to be processed once the current loop is finished */ - private requestQueue: (() => void)[] = []; + /** + * If there are additional calls to doProcessOutgoingRequests() while there is a current call running + * we need to remember in order to call process again (as there could be new requests). + */ + private nextLoopDeferred?: IDeferred; public constructor( private readonly logger: Logger, @@ -47,52 +51,56 @@ export class OutgoingRequestsManager { } /** - * Process the outgoing requests from the OlmMachine. + * Process the OutgoingRequests from the OlmMachine. * - * This should be called at the end of each sync, to process any requests that have been queued. - * In some cases if outgoing requests need to be sent immediately, this can be called directly. + * This should be called at the end of each sync, to process any OlmMachine OutgoingRequests created by the rust sdk. + * In some cases if OutgoingRequests need to be sent immediately, this can be called directly. * - * There is only one request running at once, and the others are queued. - * If a request is currently running the queued request will only trigger an additional run. + * Calls to doProcessOutgoingRequests() are processed synchronously, one after the other, in order. + * If doProcessOutgoingRequests() is called while another call is still being processed, it will be queued. + * Multiple calls to doProcessOutgoingRequests() when a call is already processing will be batched together. */ public async doProcessOutgoingRequests(): Promise { if (this.isTaskRunning) { // If the task is running, add the request to the queue and wait for completion. // This ensures that the requests are processed only once at a time. - await new Promise((resolve) => { - this.requestQueue.push(resolve); - }); + if (!this.nextLoopDeferred) { + this.nextLoopDeferred = defer(); + } + return this.nextLoopDeferred.promise; } else { - await this.executeTask(); - } - } - - private async executeTask(): Promise { - this.isTaskRunning = true; - - try { - await this.processOutgoingRequests(); - } finally { - this.isTaskRunning = false; - } - - if (this.requestQueue.length > 0) { - if (this.stopped) { - this.requestQueue.forEach((resolve) => resolve()); - return; + this.isTaskRunning = true; + try { + await this.processOutgoingRequests(); + } finally { + this.isTaskRunning = false; } - // there are a pending request that need to be executed - const awaitingRequests = this.requestQueue.map((resolve) => resolve); - // reset the queue - this.requestQueue = []; - - // run again and resolve all the pending requests. - await this.executeTask(); - awaitingRequests.forEach((resolve) => resolve()); + // If there was some request while this iteration was running, run a second time and resolve the linked promise. + if (this.nextLoopDeferred) { + if (this.stopped) { + this.nextLoopDeferred.resolve(); + return; + } + + // keep the current deferred requests to resolve them after the next iteration. + const deferred = this.nextLoopDeferred; + // reset the nextLoopDeferred so that any future requests are queued for another additional iteration. + this.nextLoopDeferred = undefined; + + // Run again and resolve all the pending requests. + // Notice that we don't await on it, so that the current promise is resolved now. + // The requests that were deferred will be resolved after this new iteration. + this.doProcessOutgoingRequests().then(() => { + deferred.resolve(); + }); + } } } + /** + * Make a single request to `olmMachine.outgoingRequests` and do the corresponding requests. + */ private async processOutgoingRequests(): Promise { if (this.stopped) return; diff --git a/src/rust-crypto/rust-crypto.ts b/src/rust-crypto/rust-crypto.ts index bef19918880..8631c5d3016 100644 --- a/src/rust-crypto/rust-crypto.ts +++ b/src/rust-crypto/rust-crypto.ts @@ -1256,7 +1256,9 @@ export class RustCrypto extends TypedEventEmitter { + this.logger.warn("onSyncCompleted: Error processing outgoing requests", e); + }); } /** @@ -1506,7 +1508,9 @@ export class RustCrypto extends TypedEventEmitter { + this.logger.warn("onKeyVerificationRequest: Error processing outgoing requests", e); + }); } } From 13a7ad35c470d8d83fc618533d5b011aa05832c5 Mon Sep 17 00:00:00 2001 From: Valere Date: Fri, 27 Oct 2023 17:41:37 +0200 Subject: [PATCH 11/14] clean tests --- .../OutgoingRequestsManager.spec.ts | 32 +++---------------- 1 file changed, 5 insertions(+), 27 deletions(-) diff --git a/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts b/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts index 17e89308e82..94d80253d16 100644 --- a/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts +++ b/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts @@ -44,8 +44,8 @@ describe("OutgoingRequestsManager", () => { manager = new OutgoingRequestsManager(logger, olmMachine, processor); }); - describe("requestLoop", () => { - it("Requests are processed directly when requested", async () => { + describe("Call doProcessOutgoingRequests", () => { + it("The call triggers handling of the machine outgoing requests", async () => { const request1 = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}"); const request2 = new RustSdkCryptoJs.KeysUploadRequest("foo2", "{}"); olmMachine.outgoingRequests.mockImplementationOnce(async () => { @@ -102,7 +102,7 @@ describe("OutgoingRequestsManager", () => { expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request3); }); - it("Process 3 consecutive calls to doProcessOutgoingRequests while not blocking first one", async () => { + it("Process 3 consecutive calls to doProcessOutgoingRequests while not blocking previous ones", async () => { const request1 = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}"); const request2 = new RustSdkCryptoJs.KeysUploadRequest("foo2", "{}"); const request3 = new RustSdkCryptoJs.KeysBackupRequest("foo3", "{}", "1"); @@ -181,7 +181,7 @@ describe("OutgoingRequestsManager", () => { }); describe("Calling stop on the manager should stop ongoing work", () => { - it("Is stopped properly before making requests", async () => { + it("When the manager is stopped after outgoingRequests() call, do not make sever requests", async () => { const request1 = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}"); const firstOutgoingRequestDefer = defer(); @@ -203,29 +203,7 @@ describe("OutgoingRequestsManager", () => { expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(0); }); - it("Is stopped properly after calling outgoing requests", async () => { - const request1 = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}"); - - const firstOutgoingRequestDefer = defer(); - - olmMachine.outgoingRequests.mockImplementationOnce(async (): Promise => { - return firstOutgoingRequestDefer.promise; - }); - - const firstRequest = manager.doProcessOutgoingRequests(); - - // stop - manager.stop(); - - // let the first request complete - firstOutgoingRequestDefer.resolve([request1]); - - await firstRequest; - - expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(0); - }); - - it("Is stopped properly in between requests", async () => { + it("When the manager is stopped while doing server calls, it should stop before the next sever call", async () => { const request1 = new RustSdkCryptoJs.KeysQueryRequest("11", "{}"); const request2 = new RustSdkCryptoJs.KeysUploadRequest("12", "{}"); From 1aa5775a0cf7cfa18d413bac4ddb7e2e7a621f20 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 1 Nov 2023 17:26:18 +0000 Subject: [PATCH 12/14] Cleanups per review comments --- src/rust-crypto/OutgoingRequestsManager.ts | 85 ++++++++++++++-------- src/rust-crypto/RoomEncryptor.ts | 12 +-- 2 files changed, 60 insertions(+), 37 deletions(-) diff --git a/src/rust-crypto/OutgoingRequestsManager.ts b/src/rust-crypto/OutgoingRequestsManager.ts index 6d2f7c93b88..7486bccbf5c 100644 --- a/src/rust-crypto/OutgoingRequestsManager.ts +++ b/src/rust-crypto/OutgoingRequestsManager.ts @@ -28,12 +28,16 @@ export class OutgoingRequestsManager { /** whether {@link stop} has been called */ private stopped = false; - /** whether a task is currently running */ - private isTaskRunning = false; + /** whether {@link outgoingRequestLoop} is currently running */ + private outgoingRequestLoopRunning = false; /** * If there are additional calls to doProcessOutgoingRequests() while there is a current call running - * we need to remember in order to call process again (as there could be new requests). + * we need to remember in order to call `doProcessOutgoingRequests` again (as there could be new requests). + * + * If this is defined, it is an indication that we need to do another iteration; in this case the deferred + * will resolve once that next iteration completes. If it is undefined, there have been no new calls + * to `doProcessOutgoingRequests` since the current iteration started. */ private nextLoopDeferred?: IDeferred; @@ -60,41 +64,58 @@ export class OutgoingRequestsManager { * If doProcessOutgoingRequests() is called while another call is still being processed, it will be queued. * Multiple calls to doProcessOutgoingRequests() when a call is already processing will be batched together. */ - public async doProcessOutgoingRequests(): Promise { - if (this.isTaskRunning) { - // If the task is running, add the request to the queue and wait for completion. - // This ensures that the requests are processed only once at a time. - if (!this.nextLoopDeferred) { - this.nextLoopDeferred = defer(); - } - return this.nextLoopDeferred.promise; - } else { - this.isTaskRunning = true; - try { - await this.processOutgoingRequests(); - } finally { - this.isTaskRunning = false; - } + public doProcessOutgoingRequests(): Promise { + // Flag that we need at least one more iteration of the loop. + // + // It is important that we do this even if the loop is currently running. There is potential for a race whereby + // a request is added to the queue *after* `OlmMachine.outgoingRequests` checks the queue, but *before* it + // returns. In such a case, the item could sit there unnoticed for some time. + // + // In order to circumvent the race, we set a flag which tells the loop to go round once again even if the + // queue appears to be empty. + if (!this.nextLoopDeferred) { + this.nextLoopDeferred = defer(); + } + + // ... and wait for it to complete. + const result = this.nextLoopDeferred.promise; - // If there was some request while this iteration was running, run a second time and resolve the linked promise. - if (this.nextLoopDeferred) { - if (this.stopped) { - this.nextLoopDeferred.resolve(); - return; - } + // set the loop going if it is not already. + if (!this.outgoingRequestLoopRunning) { + this.outgoingRequestLoop().catch((e) => { + // this should not happen; outgoingRequestLoop should return any errors via `nextLoopDeferred`. + /* istanbul ignore next */ + this.logger.error("Uncaught error in outgoing request loop", e); + }); + } + return result; + } - // keep the current deferred requests to resolve them after the next iteration. + private async outgoingRequestLoop(): Promise { + /* istanbul ignore if */ + if (this.outgoingRequestLoopRunning) { + throw new Error("Cannot run two outgoing request loops"); + } + this.outgoingRequestLoopRunning = true; + try { + while (!this.stopped && this.nextLoopDeferred) { const deferred = this.nextLoopDeferred; - // reset the nextLoopDeferred so that any future requests are queued for another additional iteration. + + // reset `nextLoopDeferred` so that any future calls to `doProcessOutgoingRequests` are queued + // for another additional iteration. this.nextLoopDeferred = undefined; - // Run again and resolve all the pending requests. - // Notice that we don't await on it, so that the current promise is resolved now. - // The requests that were deferred will be resolved after this new iteration. - this.doProcessOutgoingRequests().then(() => { - deferred.resolve(); - }); + // make the requests and feed the results back to the `nextLoopDeferred` + await this.processOutgoingRequests().then(deferred.resolve, deferred.reject); } + } finally { + this.outgoingRequestLoopRunning = false; + } + + if (this.nextLoopDeferred) { + // the loop was stopped, but there was a call to `doProcessOutgoingRequests`. Make sure that + // we reject the promise in case anything is waiting for it. + this.nextLoopDeferred.reject(new Error("OutgoingRequestsManager was stopped")); } } diff --git a/src/rust-crypto/RoomEncryptor.ts b/src/rust-crypto/RoomEncryptor.ts index 3466d3a2fd0..4e0dc95b32c 100644 --- a/src/rust-crypto/RoomEncryptor.ts +++ b/src/rust-crypto/RoomEncryptor.ts @@ -117,13 +117,15 @@ export class RoomEncryptor { ); } - // Manually call `loadMembersIfNeeded` here, because we want to know if it's the first - // time the room is loaded (due to lazy loading), so we can update the tracked users. - const fromServer = await this.room.loadMembersIfNeeded(); const members = await this.room.getEncryptionTargetMembers(); - if (fromServer && !this.lazyLoadedMembersResolved) { - // It's the first time the room is loaded, so we need to update the tracked users + // If this is the first time we are sending a message to the room, we may not yet have seen all the members + // (so the Crypto SDK might not have a device list for them). So, if this is the first time we are encrypting + // for this room, give the SDK the full list of members, to be on the safe side. + // + // This could end up being racy (if two calls to ensureEncryptionSession happen at the same time), but that's + // not a particular problem, since `OlmMachine.updateTrackedUsers` just adds any users that weren't already tracked. + if (!this.lazyLoadedMembersResolved) { await this.olmMachine.updateTrackedUsers(members.map((u) => new RustSdkCryptoJs.UserId(u.userId))); this.lazyLoadedMembersResolved = true; this.prefixedLogger.debug(`Updated tracked users for room ${this.room.roomId}`); From c4dd02d56c2c27e3e60943c19c2e54e56f811faf Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 2 Nov 2023 17:16:25 +0000 Subject: [PATCH 13/14] fix test --- .../OutgoingRequestsManager.spec.ts | 70 ++++++++++--------- 1 file changed, 36 insertions(+), 34 deletions(-) diff --git a/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts b/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts index 94d80253d16..ba363879931 100644 --- a/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts +++ b/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts @@ -19,7 +19,7 @@ import * as RustSdkCryptoJs from "@matrix-org/matrix-sdk-crypto-wasm"; import { OutgoingRequest, OutgoingRequestProcessor } from "../../../src/rust-crypto/OutgoingRequestProcessor"; import { OutgoingRequestsManager } from "../../../src/rust-crypto/OutgoingRequestsManager"; -import { defer } from "../../../src/utils"; +import { defer, IDeferred } from "../../../src/utils"; import { logger } from "../../../src/logger"; describe("OutgoingRequestsManager", () => { @@ -107,55 +107,57 @@ describe("OutgoingRequestsManager", () => { const request2 = new RustSdkCryptoJs.KeysUploadRequest("foo2", "{}"); const request3 = new RustSdkCryptoJs.KeysBackupRequest("foo3", "{}", "1"); - // create defer to control if there is a loop going on - const firstOutgoingRequestDefer = defer(); - const secondOutgoingRequestDefer = defer(); - const thirdOutgoingRequestDefer = defer(); + // promises which will resolve when OlmMachine.outgoingRequests is called + const outgoingRequestCalledPromises: Promise[] = []; - olmMachine.outgoingRequests - .mockImplementationOnce(async (): Promise => { - return firstOutgoingRequestDefer.promise; - }) - .mockImplementationOnce(async () => { - return secondOutgoingRequestDefer.promise; - }) - .mockImplementationOnce(async () => { - return thirdOutgoingRequestDefer.promise; - }) - .mockImplementationOnce(async () => { - // Another one that should not occur - return []; + // deferreds which will provide the results of OlmMachine.outgoingRequests + const outgoingRequestResultDeferreds: IDeferred[] = []; + + for (let i = 0; i < 3; i++) { + const resultDeferred = defer(); + const calledPromise = new Promise((resolve) => { + olmMachine.outgoingRequests.mockImplementationOnce(() => { + resolve(); + return resultDeferred.promise; + }); }); + outgoingRequestCalledPromises.push(calledPromise); + outgoingRequestResultDeferreds.push(resultDeferred); + } - const firstRequest = manager.doProcessOutgoingRequests(); + const call1 = manager.doProcessOutgoingRequests(); - // First request will start an iteration and for now is awaiting on firstOutgoingRequestDefer + // First call will start an iteration and for now is awaiting on outgoingRequests + expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(1); - // Query a new request now, this would request a new iteration - const secondRequest = manager.doProcessOutgoingRequests(); + // Make a new call now: this will request a new iteration + const call2 = manager.doProcessOutgoingRequests(); // let the first iteration complete - firstOutgoingRequestDefer.resolve([request1]); + outgoingRequestResultDeferreds[0].resolve([request1]); - // The first request should be now complete - await firstRequest; + // The first call should now complete + await call1; expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(1); expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request1); - // The second request is awaiting on secondOutgoingRequestDefer - // stack a new request that should be processed in an additional iteration + // Wait for the second iteration to fire and be waiting on `outgoingRequests` + await outgoingRequestCalledPromises[1]; + expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(2); - const thirdRequest = manager.doProcessOutgoingRequests(); + // Stack a new call that should be processed in an additional iteration. + const call3 = manager.doProcessOutgoingRequests(); - secondOutgoingRequestDefer.resolve([request2]); - await secondRequest; + outgoingRequestResultDeferreds[1].resolve([request2]); + await call2; expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(2); expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request2); - // The third request is awaiting on thirdOutgoingRequestDefer - - thirdOutgoingRequestDefer.resolve([request3]); - await thirdRequest; + // Wait for the third iteration to fire and be waiting on `outgoingRequests` + await outgoingRequestCalledPromises[2]; + expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(3); + outgoingRequestResultDeferreds[2].resolve([request3]); + await call3; expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(3); expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request3); From dadd93f258b379f4c3b40f7bc52d43dd52aaa9a8 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 3 Nov 2023 12:06:35 +0000 Subject: [PATCH 14/14] Apply suggestions from code review --- spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts | 2 +- src/rust-crypto/OutgoingRequestsManager.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts b/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts index ba363879931..9c59c2f416b 100644 --- a/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts +++ b/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts @@ -1,5 +1,5 @@ /* -Copyright 2024 The Matrix.org Foundation C.I.C. +Copyright 2023 The Matrix.org Foundation C.I.C. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/src/rust-crypto/OutgoingRequestsManager.ts b/src/rust-crypto/OutgoingRequestsManager.ts index 7486bccbf5c..e3ca9066d0d 100644 --- a/src/rust-crypto/OutgoingRequestsManager.ts +++ b/src/rust-crypto/OutgoingRequestsManager.ts @@ -1,5 +1,5 @@ /* -Copyright 2024 The Matrix.org Foundation C.I.C. +Copyright 2023 The Matrix.org Foundation C.I.C. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.