diff --git a/spec/unit/models/thread.spec.ts b/spec/unit/models/thread.spec.ts index 99f090a53b7..4dd5a681b6e 100644 --- a/spec/unit/models/thread.spec.ts +++ b/spec/unit/models/thread.spec.ts @@ -20,7 +20,7 @@ import { Thread, THREAD_RELATION_TYPE, ThreadEvent } from "../../../src/models/t import { mkThread } from "../../test-utils/thread"; import { TestClient } from "../../TestClient"; import { emitPromise, mkMessage, mock } from "../../test-utils/test-utils"; -import { EventStatus, MatrixEvent } from "../../../src"; +import { Direction, EventStatus, MatrixEvent } from "../../../src"; import { ReceiptType } from "../../../src/@types/read_receipts"; import { getMockClientWithEventEmitter, mockClientMethodsUser } from "../../test-utils/client"; import { ReEmitter } from "../../../src/ReEmitter"; @@ -283,4 +283,143 @@ describe("Thread", () => { expect(thread2.getEventReadUpTo(myUserId)).toBe(null); }); }); + + describe("resetLiveTimeline", () => { + // ResetLiveTimeline is used when we have missing messages between the current live timeline's end and newly + // received messages. In that case, we want to replace the existing live timeline. To ensure pagination + // continues working correctly, new pagination tokens need to be set on both the old live timeline (which is + // now a regular timeline) and the new live timeline. + it("replaces the live timeline and correctly sets pagination tokens", async () => { + const myUserId = "@bob:example.org"; + const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, { + timelineSupport: false, + }); + const client = testClient.client; + const room = new Room("123", client, myUserId, { + pendingEventOrdering: PendingEventOrdering.Detached, + }); + + jest.spyOn(client, "getRoom").mockReturnValue(room); + + const { thread } = mkThread({ + room, + client, + authorId: myUserId, + participantUserIds: ["@alice:example.org"], + length: 3, + }); + await emitPromise(thread, ThreadEvent.Update); + expect(thread.length).toBe(2); + + jest.spyOn(client, "createMessagesRequest").mockImplementation((_, token) => + Promise.resolve({ + chunk: [], + start: `${token}-new`, + end: `${token}-new`, + }), + ); + + function timelines(): [string | null, string | null][] { + return thread.timelineSet + .getTimelines() + .map((it) => [it.getPaginationToken(Direction.Backward), it.getPaginationToken(Direction.Forward)]); + } + + expect(timelines()).toEqual([[null, null]]); + const promise = thread.resetLiveTimeline("b1", "f1"); + expect(timelines()).toEqual([ + [null, "f1"], + ["b1", null], + ]); + await promise; + expect(timelines()).toEqual([ + [null, "f1-new"], + ["b1-new", null], + ]); + }); + + // As the pagination tokens cannot be used right now, resetLiveTimeline needs to replace them before they can + // be used. But if in the future the bug in synapse is fixed, and they can actually be used, we can get into a + // state where the client has paginated (and changed the tokens) while resetLiveTimeline tries to set the + // corrected tokens. To prevent such a race condition, we make sure that resetLiveTimeline respects any + // changes done to the pagination tokens. + it("replaces the live timeline but does not replace changed pagination tokens", async () => { + const myUserId = "@bob:example.org"; + const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, { + timelineSupport: false, + }); + const client = testClient.client; + const room = new Room("123", client, myUserId, { + pendingEventOrdering: PendingEventOrdering.Detached, + }); + + jest.spyOn(client, "getRoom").mockReturnValue(room); + + const { thread } = mkThread({ + room, + client, + authorId: myUserId, + participantUserIds: ["@alice:example.org"], + length: 3, + }); + await emitPromise(thread, ThreadEvent.Update); + expect(thread.length).toBe(2); + + jest.spyOn(client, "createMessagesRequest").mockImplementation((_, token) => + Promise.resolve({ + chunk: [], + start: `${token}-new`, + end: `${token}-new`, + }), + ); + + function timelines(): [string | null, string | null][] { + return thread.timelineSet + .getTimelines() + .map((it) => [it.getPaginationToken(Direction.Backward), it.getPaginationToken(Direction.Forward)]); + } + + expect(timelines()).toEqual([[null, null]]); + const promise = thread.resetLiveTimeline("b1", "f1"); + expect(timelines()).toEqual([ + [null, "f1"], + ["b1", null], + ]); + thread.timelineSet.getTimelines()[0].setPaginationToken("f2", Direction.Forward); + thread.timelineSet.getTimelines()[1].setPaginationToken("b2", Direction.Backward); + await promise; + expect(timelines()).toEqual([ + [null, "f2"], + ["b2", null], + ]); + }); + + it("is correctly called by the room", async () => { + const myUserId = "@bob:example.org"; + const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, { + timelineSupport: false, + }); + const client = testClient.client; + const room = new Room("123", client, myUserId, { + pendingEventOrdering: PendingEventOrdering.Detached, + }); + + jest.spyOn(client, "getRoom").mockReturnValue(room); + + const { thread } = mkThread({ + room, + client, + authorId: myUserId, + participantUserIds: ["@alice:example.org"], + length: 3, + }); + await emitPromise(thread, ThreadEvent.Update); + expect(thread.length).toBe(2); + const mock = jest.spyOn(thread, "resetLiveTimeline"); + mock.mockReturnValue(Promise.resolve()); + + room.resetLiveTimeline("b1", "f1"); + expect(mock).toHaveBeenCalledWith("b1", "f1"); + }); + }); }); diff --git a/src/models/room.ts b/src/models/room.ts index a363ef0dfa3..e1202c523d1 100644 --- a/src/models/room.ts +++ b/src/models/room.ts @@ -1114,6 +1114,9 @@ export class Room extends ReadReceipt { for (const timelineSet of this.timelineSets) { timelineSet.resetLiveTimeline(backPaginationToken ?? undefined, forwardPaginationToken ?? undefined); } + for (const thread of this.threads.values()) { + thread.resetLiveTimeline(backPaginationToken, forwardPaginationToken); + } this.fixUpLegacyTimelineFields(); } @@ -1223,7 +1226,7 @@ export class Room extends ReadReceipt { const event = this.findEventById(eventId); const thread = this.findThreadForEvent(event); if (thread) { - return thread.timelineSet.getLiveTimeline(); + return thread.timelineSet.getTimelineForEvent(eventId); } else { return this.getUnfilteredTimelineSet().getTimelineForEvent(eventId); } diff --git a/src/models/thread.ts b/src/models/thread.ts index 2ffee8038c2..31587bbafb7 100644 --- a/src/models/thread.ts +++ b/src/models/thread.ts @@ -256,7 +256,7 @@ export class Thread extends ReadReceipt { this.setEventMetadata(event); const lastReply = this.lastReply(); - const isNewestReply = !lastReply || event.localTimestamp > lastReply!.localTimestamp; + const isNewestReply = !lastReply || event.localTimestamp >= lastReply!.localTimestamp; // Add all incoming events to the thread's timeline set when there's no server support if (!Thread.hasServerSideSupport) { @@ -358,6 +358,63 @@ export class Thread extends ReadReceipt { this.pendingReplyCount = pendingEvents.length; } + /** + * Reset the live timeline of all timelineSets, and start new ones. + * + *

This is used when /sync returns a 'limited' timeline. 'Limited' means that there's a gap between the messages + * /sync returned, and the last known message in our timeline. In such a case, our live timeline isn't live anymore + * and has to be replaced by a new one. To make sure we can continue paginating our timelines correctly, we have to + * set new pagination tokens on the old and the new timeline. + * + * @param backPaginationToken - token for back-paginating the new timeline + * @param forwardPaginationToken - token for forward-paginating the old live timeline, + * if absent or null, all timelines are reset, removing old ones (including the previous live + * timeline which would otherwise be unable to paginate forwards without this token). + * Removing just the old live timeline whilst preserving previous ones is not supported. + */ + public async resetLiveTimeline( + backPaginationToken?: string | null, + forwardPaginationToken?: string | null, + ): Promise { + const oldLive = this.liveTimeline; + this.timelineSet.resetLiveTimeline(backPaginationToken ?? undefined, forwardPaginationToken ?? undefined); + const newLive = this.liveTimeline; + + // FIXME: Remove the following as soon as https://github.com/matrix-org/synapse/issues/14830 is resolved. + // + // The pagination API for thread timelines currently can't handle the type of pagination tokens returned by sync + // + // To make this work anyway, we'll have to transform them into one of the types that the API can handle. + // One option is passing the tokens to /messages, which can handle sync tokens, and returns the right format. + // /messages does not return new tokens on requests with a limit of 0. + // This means our timelines might overlap a slight bit, but that's not an issue, as we deduplicate messages + // anyway. + + let newBackward: string | undefined; + let oldForward: string | undefined; + if (backPaginationToken) { + const res = await this.client.createMessagesRequest(this.roomId, backPaginationToken, 1, Direction.Forward); + newBackward = res.end; + } + if (forwardPaginationToken) { + const res = await this.client.createMessagesRequest( + this.roomId, + forwardPaginationToken, + 1, + Direction.Backward, + ); + oldForward = res.start; + } + // Only replace the token if we don't have paginated away from this position already. This situation doesn't + // occur today, but if the above issue is resolved, we'd have to go down this path. + if (forwardPaginationToken && oldLive.getPaginationToken(Direction.Forward) === forwardPaginationToken) { + oldLive.setPaginationToken(oldForward ?? null, Direction.Forward); + } + if (backPaginationToken && newLive.getPaginationToken(Direction.Backward) === backPaginationToken) { + newLive.setPaginationToken(newBackward ?? null, Direction.Backward); + } + } + private async updateThreadMetadata(): Promise { this.updatePendingReplyCount();