diff --git a/spec/integ/crypto/crypto.spec.ts b/spec/integ/crypto/crypto.spec.ts index 0cd01000b73..09cf2ebfe52 100644 --- a/spec/integ/crypto/crypto.spec.ts +++ b/spec/integ/crypto/crypto.spec.ts @@ -692,7 +692,13 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, }); it("prepareToEncrypt", async () => { - expectAliceKeyQuery({ device_keys: { "@alice:localhost": {} }, failures: {} }); + const homeserverUrl = aliceClient.getHomeserverUrl(); + keyResponder = new E2EKeyResponder(homeserverUrl); + keyResponder.addKeyReceiver("@alice:localhost", keyReceiver); + + const testDeviceKeys = getTestOlmAccountKeys(testOlmAccount, "@bob:xyz", "DEVICE_ID"); + keyResponder.addDeviceKeys(testDeviceKeys); + await startClientAndAwaitFirstSync(); aliceClient.setGlobalErrorOnUnknownDevices(false); @@ -700,10 +706,7 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, syncResponder.sendOrQueueSyncResponse(getSyncResponse(["@bob:xyz"])); await syncPromise(aliceClient); - // we expect alice first to query bob's keys... - expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz")); - - // ... and then claim one of his OTKs + // Alice should claim one of Bob's OTKs expectAliceKeyClaim(getTestKeysClaimResponse("@bob:xyz")); // fire off the prepare request @@ -720,18 +723,20 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, it("Alice sends a megolm message with GlobalErrorOnUnknownDevices=false", async () => { aliceClient.setGlobalErrorOnUnknownDevices(false); - expectAliceKeyQuery({ device_keys: { "@alice:localhost": {} }, failures: {} }); + const homeserverUrl = aliceClient.getHomeserverUrl(); + keyResponder = new E2EKeyResponder(homeserverUrl); + keyResponder.addKeyReceiver("@alice:localhost", keyReceiver); + + const testDeviceKeys = getTestOlmAccountKeys(testOlmAccount, "@bob:xyz", "DEVICE_ID"); + keyResponder.addDeviceKeys(testDeviceKeys); + await startClientAndAwaitFirstSync(); // Alice shares a room with Bob syncResponder.sendOrQueueSyncResponse(getSyncResponse(["@bob:xyz"])); await syncPromise(aliceClient); - // Once we send the message, Alice will check Bob's device list (twice, because reasons) ... - expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz")); - expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz")); - - // ... and claim one of his OTKs ... + // ... and claim one of Bob's OTKs ... expectAliceKeyClaim(getTestKeysClaimResponse("@bob:xyz")); // ... and send an m.room_key message @@ -746,18 +751,20 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, it("We should start a new megolm session after forceDiscardSession", async () => { aliceClient.setGlobalErrorOnUnknownDevices(false); - expectAliceKeyQuery({ device_keys: { "@alice:localhost": {} }, failures: {} }); + const homeserverUrl = aliceClient.getHomeserverUrl(); + keyResponder = new E2EKeyResponder(homeserverUrl); + keyResponder.addKeyReceiver("@alice:localhost", keyReceiver); + + const testDeviceKeys = getTestOlmAccountKeys(testOlmAccount, "@bob:xyz", "DEVICE_ID"); + keyResponder.addDeviceKeys(testDeviceKeys); + await startClientAndAwaitFirstSync(); // Alice shares a room with Bob syncResponder.sendOrQueueSyncResponse(getSyncResponse(["@bob:xyz"])); await syncPromise(aliceClient); - // Once we send the message, Alice will check Bob's device list (twice, because reasons) ... - expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz")); - expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz")); - - // ... and claim one of his OTKs ... + // ... and claim one of Bob's OTKs ... expectAliceKeyClaim(getTestKeysClaimResponse("@bob:xyz")); // ... and send an m.room_key message @@ -2052,13 +2059,17 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, }); } - oldBackendOnly("Sending an event initiates a member list sync", async () => { + it("Sending an event initiates a member list sync", async () => { + const homeserverUrl = aliceClient.getHomeserverUrl(); + keyResponder = new E2EKeyResponder(homeserverUrl); + keyResponder.addKeyReceiver("@alice:localhost", keyReceiver); + + const testDeviceKeys = getTestOlmAccountKeys(testOlmAccount, "@bob:xyz", "DEVICE_ID"); + keyResponder.addDeviceKeys(testDeviceKeys); + // we expect a call to the /members list... const memberListPromise = expectMembershipRequest(ROOM_ID, ["@bob:xyz"]); - // then a request for bob's devices... - expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz")); - // then a to-device with the room_key const inboundGroupSessionPromise = expectSendRoomKey("@bob:xyz", testOlmAccount, p2pSession); @@ -2071,13 +2082,17 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string, await Promise.all([sendPromise, megolmMessagePromise, memberListPromise]); }); - oldBackendOnly("loading the membership list inhibits a later load", async () => { + it("loading the membership list inhibits a later load", async () => { + const homeserverUrl = aliceClient.getHomeserverUrl(); + keyResponder = new E2EKeyResponder(homeserverUrl); + keyResponder.addKeyReceiver("@alice:localhost", keyReceiver); + + const testDeviceKeys = getTestOlmAccountKeys(testOlmAccount, "@bob:xyz", "DEVICE_ID"); + keyResponder.addDeviceKeys(testDeviceKeys); + const room = aliceClient.getRoom(ROOM_ID)!; await Promise.all([room.loadMembersIfNeeded(), expectMembershipRequest(ROOM_ID, ["@bob:xyz"])]); - // expect a request for bob's devices... - expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz")); - // then a to-device with the room_key const inboundGroupSessionPromise = expectSendRoomKey("@bob:xyz", testOlmAccount, p2pSession); diff --git a/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts b/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts new file mode 100644 index 00000000000..9c59c2f416b --- /dev/null +++ b/spec/unit/rust-crypto/OutgoingRequestsManager.spec.ts @@ -0,0 +1,237 @@ +/* +Copyright 2023 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { Mocked } from "jest-mock"; +import * as RustSdkCryptoJs from "@matrix-org/matrix-sdk-crypto-wasm"; + +import { OutgoingRequest, OutgoingRequestProcessor } from "../../../src/rust-crypto/OutgoingRequestProcessor"; +import { OutgoingRequestsManager } from "../../../src/rust-crypto/OutgoingRequestsManager"; +import { defer, IDeferred } from "../../../src/utils"; +import { logger } from "../../../src/logger"; + +describe("OutgoingRequestsManager", () => { + /** the OutgoingRequestsManager implementation under test */ + let manager: OutgoingRequestsManager; + + /** a mock OutgoingRequestProcessor */ + let processor: Mocked; + + /** a mocked-up OlmMachine which manager is connected to */ + let olmMachine: Mocked; + + beforeEach(async () => { + olmMachine = { + outgoingRequests: jest.fn(), + } as unknown as Mocked; + + processor = { + makeOutgoingRequest: jest.fn(), + } as unknown as Mocked; + + manager = new OutgoingRequestsManager(logger, olmMachine, processor); + }); + + describe("Call doProcessOutgoingRequests", () => { + it("The call triggers handling of the machine outgoing requests", async () => { + const request1 = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}"); + const request2 = new RustSdkCryptoJs.KeysUploadRequest("foo2", "{}"); + olmMachine.outgoingRequests.mockImplementationOnce(async () => { + return [request1, request2]; + }); + + processor.makeOutgoingRequest.mockImplementationOnce(async () => { + return; + }); + + await manager.doProcessOutgoingRequests(); + + expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(1); + expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(2); + expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request1); + expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request2); + }); + + it("Stack and batch calls to doProcessOutgoingRequests while one is already running", async () => { + const request1 = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}"); + const request2 = new RustSdkCryptoJs.KeysUploadRequest("foo2", "{}"); + const request3 = new RustSdkCryptoJs.KeysBackupRequest("foo3", "{}", "1"); + + const firstOutgoingRequestDefer = defer(); + + olmMachine.outgoingRequests + .mockImplementationOnce(async (): Promise => { + return firstOutgoingRequestDefer.promise; + }) + .mockImplementationOnce(async () => { + return [request3]; + }); + + const firstRequest = manager.doProcessOutgoingRequests(); + + // stack 2 additional requests while the first one is still running + const secondRequest = manager.doProcessOutgoingRequests(); + const thirdRequest = manager.doProcessOutgoingRequests(); + + // let the first request complete + firstOutgoingRequestDefer.resolve([request1, request2]); + + await firstRequest; + await secondRequest; + await thirdRequest; + + // outgoingRequests should be called twice in total, as the second and third requests are + // processed in the same loop. + expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(2); + + expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(3); + expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request1); + expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request2); + expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request3); + }); + + it("Process 3 consecutive calls to doProcessOutgoingRequests while not blocking previous ones", async () => { + const request1 = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}"); + const request2 = new RustSdkCryptoJs.KeysUploadRequest("foo2", "{}"); + const request3 = new RustSdkCryptoJs.KeysBackupRequest("foo3", "{}", "1"); + + // promises which will resolve when OlmMachine.outgoingRequests is called + const outgoingRequestCalledPromises: Promise[] = []; + + // deferreds which will provide the results of OlmMachine.outgoingRequests + const outgoingRequestResultDeferreds: IDeferred[] = []; + + for (let i = 0; i < 3; i++) { + const resultDeferred = defer(); + const calledPromise = new Promise((resolve) => { + olmMachine.outgoingRequests.mockImplementationOnce(() => { + resolve(); + return resultDeferred.promise; + }); + }); + outgoingRequestCalledPromises.push(calledPromise); + outgoingRequestResultDeferreds.push(resultDeferred); + } + + const call1 = manager.doProcessOutgoingRequests(); + + // First call will start an iteration and for now is awaiting on outgoingRequests + expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(1); + + // Make a new call now: this will request a new iteration + const call2 = manager.doProcessOutgoingRequests(); + + // let the first iteration complete + outgoingRequestResultDeferreds[0].resolve([request1]); + + // The first call should now complete + await call1; + expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(1); + expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request1); + + // Wait for the second iteration to fire and be waiting on `outgoingRequests` + await outgoingRequestCalledPromises[1]; + expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(2); + + // Stack a new call that should be processed in an additional iteration. + const call3 = manager.doProcessOutgoingRequests(); + + outgoingRequestResultDeferreds[1].resolve([request2]); + await call2; + expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(2); + expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request2); + + // Wait for the third iteration to fire and be waiting on `outgoingRequests` + await outgoingRequestCalledPromises[2]; + expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(3); + outgoingRequestResultDeferreds[2].resolve([request3]); + await call3; + + expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(3); + expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request3); + + // ensure that no other iteration is going on + expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(3); + }); + + it("Should not bubble exceptions if server request is rejected", async () => { + const request = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}"); + olmMachine.outgoingRequests.mockImplementationOnce(async () => { + return [request]; + }); + + processor.makeOutgoingRequest.mockImplementationOnce(async () => { + throw new Error("Some network error"); + }); + + await manager.doProcessOutgoingRequests(); + + expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(1); + }); + }); + + describe("Calling stop on the manager should stop ongoing work", () => { + it("When the manager is stopped after outgoingRequests() call, do not make sever requests", async () => { + const request1 = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}"); + + const firstOutgoingRequestDefer = defer(); + + olmMachine.outgoingRequests.mockImplementationOnce(async (): Promise => { + return firstOutgoingRequestDefer.promise; + }); + + const firstRequest = manager.doProcessOutgoingRequests(); + + // stop + manager.stop(); + + // let the first request complete + firstOutgoingRequestDefer.resolve([request1]); + + await firstRequest; + + expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(0); + }); + + it("When the manager is stopped while doing server calls, it should stop before the next sever call", async () => { + const request1 = new RustSdkCryptoJs.KeysQueryRequest("11", "{}"); + const request2 = new RustSdkCryptoJs.KeysUploadRequest("12", "{}"); + + const firstRequestDefer = defer(); + + olmMachine.outgoingRequests.mockImplementationOnce(async (): Promise => { + return [request1, request2]; + }); + + processor.makeOutgoingRequest + .mockImplementationOnce(async () => { + manager.stop(); + return firstRequestDefer.promise; + }) + .mockImplementationOnce(async () => { + return; + }); + + const firstRequest = manager.doProcessOutgoingRequests(); + + firstRequestDefer.resolve(); + + await firstRequest; + + // should have been called once but not twice + expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(1); + }); + }); +}); diff --git a/spec/unit/rust-crypto/rust-crypto.spec.ts b/spec/unit/rust-crypto/rust-crypto.spec.ts index c450331187e..1044c71a0c0 100644 --- a/spec/unit/rust-crypto/rust-crypto.spec.ts +++ b/spec/unit/rust-crypto/rust-crypto.spec.ts @@ -50,6 +50,7 @@ import { import * as testData from "../../test-utils/test-data"; import { defer } from "../../../src/utils"; import { logger } from "../../../src/logger"; +import { OutgoingRequestsManager } from "../../../src/rust-crypto/OutgoingRequestsManager"; const TEST_USER = "@alice:example.com"; const TEST_DEVICE_ID = "TEST_DEVICE"; @@ -347,6 +348,8 @@ describe("RustCrypto", () => { makeOutgoingRequest: jest.fn(), } as unknown as Mocked; + const outgoingRequestsManager = new OutgoingRequestsManager(logger, olmMachine, outgoingRequestProcessor); + rustCrypto = new RustCrypto( logger, olmMachine, @@ -357,6 +360,7 @@ describe("RustCrypto", () => { {} as CryptoCallbacks, ); rustCrypto["outgoingRequestProcessor"] = outgoingRequestProcessor; + rustCrypto["outgoingRequestsManager"] = outgoingRequestsManager; }); it("should poll for outgoing messages and send them", async () => { @@ -395,50 +399,6 @@ describe("RustCrypto", () => { await awaitCallToMakeOutgoingRequest(); expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(2); }); - - it("stops looping when stop() is called", async () => { - for (let i = 0; i < 5; i++) { - outgoingRequestQueue.push([new KeysQueryRequest("1234", "{}")]); - } - - let makeRequestPromise = awaitCallToMakeOutgoingRequest(); - - rustCrypto.onSyncCompleted({}); - - expect(rustCrypto["outgoingRequestLoopRunning"]).toBeTruthy(); - - // go a couple of times round the loop - let resolveMakeRequest = await makeRequestPromise; - makeRequestPromise = awaitCallToMakeOutgoingRequest(); - resolveMakeRequest(); - - resolveMakeRequest = await makeRequestPromise; - makeRequestPromise = awaitCallToMakeOutgoingRequest(); - resolveMakeRequest(); - - // a second sync while this is going on shouldn't make any difference - rustCrypto.onSyncCompleted({}); - - resolveMakeRequest = await makeRequestPromise; - outgoingRequestProcessor.makeOutgoingRequest.mockReset(); - resolveMakeRequest(); - - // now stop... - rustCrypto.stop(); - - // which should (eventually) cause the loop to stop with no further calls to outgoingRequests - olmMachine.outgoingRequests.mockReset(); - - await new Promise((resolve) => { - setTimeout(resolve, 100); - }); - expect(rustCrypto["outgoingRequestLoopRunning"]).toBeFalsy(); - expect(outgoingRequestProcessor.makeOutgoingRequest).not.toHaveBeenCalled(); - expect(olmMachine.outgoingRequests).not.toHaveBeenCalled(); - - // we sent three, so there should be 2 left - expect(outgoingRequestQueue.length).toEqual(2); - }); }); describe(".getEventEncryptionInfo", () => { diff --git a/src/rust-crypto/OutgoingRequestsManager.ts b/src/rust-crypto/OutgoingRequestsManager.ts new file mode 100644 index 00000000000..e3ca9066d0d --- /dev/null +++ b/src/rust-crypto/OutgoingRequestsManager.ts @@ -0,0 +1,141 @@ +/* +Copyright 2023 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { OlmMachine } from "@matrix-org/matrix-sdk-crypto-wasm"; + +import { OutgoingRequest, OutgoingRequestProcessor } from "./OutgoingRequestProcessor"; +import { Logger } from "../logger"; +import { defer, IDeferred } from "../utils"; + +/** + * OutgoingRequestsManager: responsible for processing outgoing requests from the OlmMachine. + * Ensure that only one loop is going on at once, and that the requests are processed in order. + */ +export class OutgoingRequestsManager { + /** whether {@link stop} has been called */ + private stopped = false; + + /** whether {@link outgoingRequestLoop} is currently running */ + private outgoingRequestLoopRunning = false; + + /** + * If there are additional calls to doProcessOutgoingRequests() while there is a current call running + * we need to remember in order to call `doProcessOutgoingRequests` again (as there could be new requests). + * + * If this is defined, it is an indication that we need to do another iteration; in this case the deferred + * will resolve once that next iteration completes. If it is undefined, there have been no new calls + * to `doProcessOutgoingRequests` since the current iteration started. + */ + private nextLoopDeferred?: IDeferred; + + public constructor( + private readonly logger: Logger, + private readonly olmMachine: OlmMachine, + public readonly outgoingRequestProcessor: OutgoingRequestProcessor, + ) {} + + /** + * Shut down as soon as possible the current loop of outgoing requests processing. + */ + public stop(): void { + this.stopped = true; + } + + /** + * Process the OutgoingRequests from the OlmMachine. + * + * This should be called at the end of each sync, to process any OlmMachine OutgoingRequests created by the rust sdk. + * In some cases if OutgoingRequests need to be sent immediately, this can be called directly. + * + * Calls to doProcessOutgoingRequests() are processed synchronously, one after the other, in order. + * If doProcessOutgoingRequests() is called while another call is still being processed, it will be queued. + * Multiple calls to doProcessOutgoingRequests() when a call is already processing will be batched together. + */ + public doProcessOutgoingRequests(): Promise { + // Flag that we need at least one more iteration of the loop. + // + // It is important that we do this even if the loop is currently running. There is potential for a race whereby + // a request is added to the queue *after* `OlmMachine.outgoingRequests` checks the queue, but *before* it + // returns. In such a case, the item could sit there unnoticed for some time. + // + // In order to circumvent the race, we set a flag which tells the loop to go round once again even if the + // queue appears to be empty. + if (!this.nextLoopDeferred) { + this.nextLoopDeferred = defer(); + } + + // ... and wait for it to complete. + const result = this.nextLoopDeferred.promise; + + // set the loop going if it is not already. + if (!this.outgoingRequestLoopRunning) { + this.outgoingRequestLoop().catch((e) => { + // this should not happen; outgoingRequestLoop should return any errors via `nextLoopDeferred`. + /* istanbul ignore next */ + this.logger.error("Uncaught error in outgoing request loop", e); + }); + } + return result; + } + + private async outgoingRequestLoop(): Promise { + /* istanbul ignore if */ + if (this.outgoingRequestLoopRunning) { + throw new Error("Cannot run two outgoing request loops"); + } + this.outgoingRequestLoopRunning = true; + try { + while (!this.stopped && this.nextLoopDeferred) { + const deferred = this.nextLoopDeferred; + + // reset `nextLoopDeferred` so that any future calls to `doProcessOutgoingRequests` are queued + // for another additional iteration. + this.nextLoopDeferred = undefined; + + // make the requests and feed the results back to the `nextLoopDeferred` + await this.processOutgoingRequests().then(deferred.resolve, deferred.reject); + } + } finally { + this.outgoingRequestLoopRunning = false; + } + + if (this.nextLoopDeferred) { + // the loop was stopped, but there was a call to `doProcessOutgoingRequests`. Make sure that + // we reject the promise in case anything is waiting for it. + this.nextLoopDeferred.reject(new Error("OutgoingRequestsManager was stopped")); + } + } + + /** + * Make a single request to `olmMachine.outgoingRequests` and do the corresponding requests. + */ + private async processOutgoingRequests(): Promise { + if (this.stopped) return; + + const outgoingRequests: OutgoingRequest[] = await this.olmMachine.outgoingRequests(); + + for (const request of outgoingRequests) { + if (this.stopped) return; + try { + await this.outgoingRequestProcessor.makeOutgoingRequest(request); + } catch (e) { + // as part of the loop we silently ignore errors, but log them. + // The rust sdk will retry the request later as it won't have been marked as sent. + this.logger.error(`Failed to process outgoing request ${request.type}: ${e}`); + } + } + } +} diff --git a/src/rust-crypto/RoomEncryptor.ts b/src/rust-crypto/RoomEncryptor.ts index c384a933586..4e0dc95b32c 100644 --- a/src/rust-crypto/RoomEncryptor.ts +++ b/src/rust-crypto/RoomEncryptor.ts @@ -23,6 +23,7 @@ import { HistoryVisibility as RustHistoryVisibility, ToDeviceRequest, } from "@matrix-org/matrix-sdk-crypto-wasm"; +import * as RustSdkCryptoJs from "@matrix-org/matrix-sdk-crypto-wasm"; import { EventType } from "../@types/event"; import { IContent, MatrixEvent } from "../models/event"; @@ -30,8 +31,8 @@ import { Room } from "../models/room"; import { Logger, logger } from "../logger"; import { KeyClaimManager } from "./KeyClaimManager"; import { RoomMember } from "../models/room-member"; -import { OutgoingRequestProcessor } from "./OutgoingRequestProcessor"; import { HistoryVisibility } from "../@types/partials"; +import { OutgoingRequestsManager } from "./OutgoingRequestsManager"; /** * RoomEncryptor: responsible for encrypting messages to a given room @@ -41,21 +42,34 @@ import { HistoryVisibility } from "../@types/partials"; export class RoomEncryptor { private readonly prefixedLogger: Logger; + /** whether the room members have been loaded and tracked for the first time */ + private lazyLoadedMembersResolved = false; + /** * @param olmMachine - The rust-sdk's OlmMachine * @param keyClaimManager - Our KeyClaimManager, which manages the queue of one-time-key claim requests - * @param outgoingRequestProcessor - The OutgoingRequestProcessor, which sends outgoing requests + * @param outgoingRequestManager - The OutgoingRequestManager, which manages the queue of outgoing requests. * @param room - The room we want to encrypt for * @param encryptionSettings - body of the m.room.encryption event currently in force in this room */ public constructor( private readonly olmMachine: OlmMachine, private readonly keyClaimManager: KeyClaimManager, - private readonly outgoingRequestProcessor: OutgoingRequestProcessor, + private readonly outgoingRequestManager: OutgoingRequestsManager, private readonly room: Room, private encryptionSettings: IContent, ) { this.prefixedLogger = logger.getChild(`[${room.roomId} encryption]`); + + // start tracking devices for any users already known to be in this room. + // Do not load members here, would defeat lazy loading. + const members = room.getJoinedMembers(); + // At this point just mark the known members as tracked, it might not be the full list of members + // because of lazy loading. This is fine, because we will get a member list update when sending a message for + // the first time, see `RoomEncryptor#ensureEncryptionSession` + this.olmMachine.updateTrackedUsers(members.map((u) => new RustSdkCryptoJs.UserId(u.userId))).then(() => { + this.prefixedLogger.debug(`Updated tracked users for room ${room.roomId}`); + }); } /** @@ -104,6 +118,28 @@ export class RoomEncryptor { } const members = await this.room.getEncryptionTargetMembers(); + + // If this is the first time we are sending a message to the room, we may not yet have seen all the members + // (so the Crypto SDK might not have a device list for them). So, if this is the first time we are encrypting + // for this room, give the SDK the full list of members, to be on the safe side. + // + // This could end up being racy (if two calls to ensureEncryptionSession happen at the same time), but that's + // not a particular problem, since `OlmMachine.updateTrackedUsers` just adds any users that weren't already tracked. + if (!this.lazyLoadedMembersResolved) { + await this.olmMachine.updateTrackedUsers(members.map((u) => new RustSdkCryptoJs.UserId(u.userId))); + this.lazyLoadedMembersResolved = true; + this.prefixedLogger.debug(`Updated tracked users for room ${this.room.roomId}`); + } + + // Query keys in case we don't have them for newly tracked members. + // This must be done before ensuring sessions. If not the devices of these users are not + // known yet and will not get the room key. + // We don't have API to only get the keys queries related to this member list, so we just + // process the pending requests from the olmMachine. (usually these are processed + // at the end of the sync, but we can't wait for that). + // XXX future improvement process only KeysQueryRequests for the tracked users. + await this.outgoingRequestManager.doProcessOutgoingRequests(); + this.prefixedLogger.debug( `Encrypting for users (shouldEncryptForInvitedMembers: ${this.room.shouldEncryptForInvitedMembers()}):`, members.map((u) => `${u.userId} (${u.membership})`), @@ -143,7 +179,7 @@ export class RoomEncryptor { ); if (shareMessages) { for (const m of shareMessages) { - await this.outgoingRequestProcessor.makeOutgoingRequest(m); + await this.outgoingRequestManager.outgoingRequestProcessor.makeOutgoingRequest(m); } } } diff --git a/src/rust-crypto/rust-crypto.ts b/src/rust-crypto/rust-crypto.ts index 4d986480e35..33eb1988b5d 100644 --- a/src/rust-crypto/rust-crypto.ts +++ b/src/rust-crypto/rust-crypto.ts @@ -27,7 +27,7 @@ import { BackupDecryptor, CryptoBackend, OnSyncCompletedData } from "../common-c import { Logger } from "../logger"; import { ClientPrefix, IHttpOpts, MatrixHttpApi, Method } from "../http-api"; import { RoomEncryptor } from "./RoomEncryptor"; -import { OutgoingRequest, OutgoingRequestProcessor } from "./OutgoingRequestProcessor"; +import { OutgoingRequestProcessor } from "./OutgoingRequestProcessor"; import { KeyClaimManager } from "./KeyClaimManager"; import { encodeUri, MapWithDefault } from "../utils"; import { @@ -72,6 +72,7 @@ import { ClientStoppedError } from "../errors"; import { ISignatures } from "../@types/signed"; import { encodeBase64 } from "../base64"; import { DecryptionError } from "../crypto/algorithms"; +import { OutgoingRequestsManager } from "./OutgoingRequestsManager"; const ALL_VERIFICATION_METHODS = ["m.sas.v1", "m.qr_code.scan.v1", "m.qr_code.show.v1", "m.reciprocate.v1"]; @@ -93,16 +94,6 @@ export class RustCrypto extends TypedEventEmitter = {}; @@ -111,6 +102,7 @@ export class RustCrypto extends TypedEventEmitter = {}; // When did we last try to check the server for a given session id? @@ -143,6 +135,12 @@ export class RustCrypto extends TypedEventEmitter new RustSdkCryptoJs.UserId(u.userId))); } /** called by the sync loop after processing each sync. @@ -1290,7 +1285,9 @@ export class RustCrypto extends TypedEventEmitter { + this.logger.warn("onSyncCompleted: Error processing outgoing requests", e); + }); } /** @@ -1540,68 +1537,10 @@ export class RustCrypto extends TypedEventEmitter { - this.logger.error("Error processing outgoing-message requests from rust crypto-sdk", e); + this.outgoingRequestsManager.doProcessOutgoingRequests().catch((e) => { + this.logger.warn("onKeyVerificationRequest: Error processing outgoing requests", e); }); } - - private async outgoingRequestLoopInner(): Promise { - /* istanbul ignore if */ - if (this.outgoingRequestLoopRunning) { - throw new Error("Cannot run two outgoing request loops"); - } - this.outgoingRequestLoopRunning = true; - try { - while (!this.stopped) { - // we clear the "one more loop" flag just before calling `OlmMachine.outgoingRequests()`, so we can tell - // if `this.outgoingRequestLoop()` was called while `OlmMachine.outgoingRequests()` was running. - this.outgoingRequestLoopOneMoreLoop = false; - - const outgoingRequests: Object[] = await this.olmMachine.outgoingRequests(); - - if (this.stopped) { - // we've been told to stop while `outgoingRequests` was running: exit the loop without processing - // any of the returned requests (anything important will happen next time the client starts.) - return; - } - - if (outgoingRequests.length === 0 && !this.outgoingRequestLoopOneMoreLoop) { - // `OlmMachine.outgoingRequests` returned no messages, and there was no call to - // `this.outgoingRequestLoop()` while it was running. We can stop the loop for a while. - return; - } - - for (const msg of outgoingRequests) { - await this.outgoingRequestProcessor.makeOutgoingRequest(msg as OutgoingRequest); - } - } - } finally { - this.outgoingRequestLoopRunning = false; - } - } } class EventDecryptor {