Skip to content

Commit

Permalink
Fix bugs with the first reply to a thread (#4104)
Browse files Browse the repository at this point in the history
* WIP fix for bugs first-thread-reply bugs

* Add re-emitter before we start adding events, as per comment

* Add test for notification bug

* Test for the bug that caused the dot to be the wrong colour

* Add comment

* elaborate

* Fix outdated comment

* Also fix this comment

* Fix another comment

* Fix typo

Co-authored-by: Richard van der Hoff <[email protected]>

* Clarify comment

* More comment

* so much comment

also reformat (the bit that's actually added is s/it/this.addEvents/)

* The comments

* Maybe make comment clearer.

* Add comment about potential race

---------

Co-authored-by: Richard van der Hoff <[email protected]>
  • Loading branch information
dbkr and richvdh authored Mar 20, 2024
1 parent 7884c22 commit afc3c62
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 47 deletions.
48 changes: 41 additions & 7 deletions spec/unit/room.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2831,11 +2831,41 @@ describe("Room", function () {
// XXX: If we add the relation to the thread response before the thread finishes fetching via /relations
// then the test will fail
await emitPromise(room, ThreadEvent.Update);
await emitPromise(room, ThreadEvent.Update);
await Promise.all([emitPromise(room, ThreadEvent.Update), room.addLiveEvents([threadResponseEdit])]);
expect(thread.replyToEvent!.getContent().body).toBe(threadResponseEdit.getContent()["m.new_content"].body);
});

it("emits event for the first event added to a thread", async () => {
room.client.supportsThreads = () => true;
Thread.setServerSideSupport(FeatureSupport.Stable);

const threadRoot = mkMessage();
const threadResponse1 = mkThreadResponse(threadRoot);

await room.addLiveEvents([threadRoot]);

const onEvent = jest.fn();
room.on(RoomEvent.Timeline, onEvent);

await room.addLiveEvents([threadResponse1]);

expect(onEvent).toHaveBeenCalled();
});

it("contains the events added as soon as it's created", async () => {
room.client.supportsThreads = () => true;
Thread.setServerSideSupport(FeatureSupport.Stable);

const threadRoot = mkMessage();
const threadResponse1 = mkThreadResponse(threadRoot);

const newThreadEventPromise = emitPromise(room, ThreadEvent.New);
await room.addLiveEvents([threadRoot, threadResponse1]);
const thread = await newThreadEventPromise;

expect(thread.timeline).toContain(threadResponse1);
});

it("Redactions to thread responses decrement the length", async () => {
room.client.supportsThreads = () => true;
Thread.setServerSideSupport(FeatureSupport.Stable);
Expand Down Expand Up @@ -2864,7 +2894,6 @@ describe("Room", function () {
let prom = emitPromise(room, ThreadEvent.New);
await room.addLiveEvents([threadRoot, threadResponse1, threadResponse2]);
const thread = await prom;
await emitPromise(room, ThreadEvent.Update);

expect(thread).toHaveLength(2);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());
Expand Down Expand Up @@ -2929,6 +2958,10 @@ describe("Room", function () {
},
});

room.client.fetchRelations = jest.fn().mockResolvedValue({
chunk: [threadResponse2Reaction.event, threadResponse2.event, threadResponse1.event],
});

const prom = emitPromise(room, ThreadEvent.New);
await room.addLiveEvents([threadRoot, threadResponse1, threadResponse2, threadResponse2Reaction]);
const thread = await prom;
Expand Down Expand Up @@ -2969,18 +3002,20 @@ describe("Room", function () {
},
});

let prom = emitPromise(room, ThreadEvent.New);
const prom = emitPromise(room, ThreadEvent.New);
await room.addLiveEvents([threadRoot, threadResponse1, threadResponse2, threadResponse2Reaction]);
const thread = await prom;
await emitPromise(room, ThreadEvent.Update);

expect(thread).toHaveLength(2);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());

prom = emitPromise(room, ThreadEvent.Update);
const threadRootRedaction = mkRedaction(threadRoot);
await room.addLiveEvents([threadRootRedaction]);
await prom;

// We can't wait for a thread update here because there shouldn't be one (which is
// what we're asserting). Flush any promises to try to get more certainty that an
// update is not happening some time after the event is added.
await flushPromises();
expect(thread).toHaveLength(2);
});

Expand Down Expand Up @@ -3058,7 +3093,6 @@ describe("Room", function () {

await emitPromise(room, ThreadEvent.Update);
const threadResponse2Redaction = mkRedaction(threadResponse2);
await emitPromise(room, ThreadEvent.Update);
await room.addLiveEvents([threadResponse2Redaction]);
expect(thread).toHaveLength(1);
expect(thread.replyToEvent!.getId()).toBe(threadResponse1.getId());
Expand Down
16 changes: 9 additions & 7 deletions src/models/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2345,6 +2345,15 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
receipts: this.cachedThreadReadReceipts.get(threadId) ?? [],
});

// Add the re-emitter before we start adding events to the thread so we don't miss events
this.reEmitter.reEmit(thread, [
ThreadEvent.Delete,
ThreadEvent.Update,
ThreadEvent.NewReply,
RoomEvent.Timeline,
RoomEvent.TimelineReset,
]);

// All read receipts should now come down from sync, we do not need to keep
// a reference to the cached receipts anymore.
this.cachedThreadReadReceipts.delete(threadId);
Expand All @@ -2360,13 +2369,6 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
// and pass the event through this.
thread.addEvents(events, false);

this.reEmitter.reEmit(thread, [
ThreadEvent.Delete,
ThreadEvent.Update,
ThreadEvent.NewReply,
RoomEvent.Timeline,
RoomEvent.TimelineReset,
]);
const isNewer =
this.lastThread?.rootEvent &&
rootEvent?.localTimestamp &&
Expand Down
86 changes: 53 additions & 33 deletions src/models/thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ export class Thread extends ReadReceipt<ThreadEmittedEvents, ThreadEventHandlerM
*
* So it looks like this is only really relevant when initialEventsFetched
* is false, because as soon as the initial events have been fetched, we
* should have a timeline (I think).
* should have a proper chunk of timeline from the pagination fetch.
*
* If all replies in this thread are redacted, this is set to the root
* event. I'm not clear what the meaning of this is, since usually after the
Expand Down Expand Up @@ -138,6 +138,7 @@ export class Thread extends ReadReceipt<ThreadEmittedEvents, ThreadEventHandlerM
* that we've already fetched them.
*/
public initialEventsFetched = !Thread.hasServerSideSupport;
private initalEventFetchProm: Promise<boolean> | undefined;

/**
* An array of events to add to the timeline once the thread has been initialised
Expand Down Expand Up @@ -377,20 +378,20 @@ export class Thread extends ReadReceipt<ThreadEmittedEvents, ThreadEventHandlerM
// When there's no server-side support, just add it to the end of the timeline.
this.addEventToTimeline(event, toStartOfTimeline);
this.client.decryptEventIfNeeded(event);
} else if (!toStartOfTimeline && this.initialEventsFetched && isNewestReply) {
// When we've asked for the event to be added to the end, and we're
// not in the initial state, and this event belongs at the end, add it.
this.addEventToTimeline(event, false);
this.fetchEditsWhereNeeded(event);
} else if (event.isRelation(RelationType.Annotation) || event.isRelation(RelationType.Replace)) {
this.addRelatedThreadEvent(event, toStartOfTimeline);
return;
} else if (this.initialEventsFetched) {
// If initial events have not been fetched, we are OK to throw away
// this event, because we are about to fetch all the events for this
// thread from the server.

} else if (!toStartOfTimeline && isNewestReply) {
// When we've asked for the event to be added to the end,
// and this event belongs at the end, add it.
this.addEventToTimeline(event, false);
this.fetchEditsWhereNeeded(event);
} else {
// Otherwise, we should add it, but we suspect it is out of order.
// This may be because we've just created the thread object and are
// still fetching events, in which case add it where we think is sensible
// and it will be removed and replaced with the events from the pagination
// request once that completes.
if (toStartOfTimeline) {
// If we're adding at the start of the timeline, it doesn't
// matter that it's out of order.
Expand Down Expand Up @@ -606,29 +607,48 @@ export class Thread extends ReadReceipt<ThreadEmittedEvents, ThreadEventHandlerM
await this.processRootEventPromise;

if (!this.initialEventsFetched) {
this.initialEventsFetched = true;
// fetch initial event to allow proper pagination
try {
// if the thread has regular events, this will just load the last reply.
// if the thread is newly created, this will load the root event.
if (this.replyCount === 0 && this.rootEvent) {
this.timelineSet.addEventsToTimeline([this.rootEvent], true, this.liveTimeline, null);
this.liveTimeline.setPaginationToken(null, Direction.Backward);
} else {
await this.client.paginateEventTimeline(this.liveTimeline, {
backwards: true,
});
}
for (const event of this.replayEvents!) {
this.addEvent(event, false);
if (this.initalEventFetchProm) {
await this.initalEventFetchProm;
} else {
// fetch initial events to allow proper pagination
try {
// clear out any events that were added before the pagination request
// completed (eg. from sync). They'll be replaced by those from the pagination.
// Really, we should do this after the pagination request completes, but
// paginateEventTimeline does the request and adds the events in one go, so
// this would need a refactor in order to do. It's therefore possible there's
// a remaining race where an event comes in while the pagination request is
// happening.
this.timelineSet.resetLiveTimeline();
// if the thread has regular events, this will just load the last reply.
// if the thread is newly created, this will load the root event.
if (this.replyCount === 0 && this.rootEvent) {
this.timelineSet.addEventsToTimeline([this.rootEvent], true, this.liveTimeline, null);
this.liveTimeline.setPaginationToken(null, Direction.Backward);
} else {
this.initalEventFetchProm = this.client.paginateEventTimeline(this.liveTimeline, {
backwards: true,
});
await this.initalEventFetchProm;
}
// We have now fetched the initial events, so set the flag. We need to do this before
// we actually add the events, so `this.addEvents` knows that it can now safely add
// them rather than buffer them in the pending event list. The main thing is that this
// must remain false while the async fetch happens, so we don't try to add events before
// the pagination has finished. The important thing is that we're not await-ing anything
// else between setting this and adding events, so no races.
this.initialEventsFetched = true;
for (const event of this.replayEvents!) {
this.addEvent(event, false);
}
this.replayEvents = null;
// just to make sure that, if we've created a timeline window for this thread before the thread itself
// existed (e.g. when creating a new thread), we'll make sure the panel is force refreshed correctly.
this.emit(RoomEvent.TimelineReset, this.room, this.timelineSet, true);
} catch (e) {
logger.error("Failed to load start of newly created thread: ", e);
this.initialEventsFetched = false;
}
this.replayEvents = null;
// just to make sure that, if we've created a timeline window for this thread before the thread itself
// existed (e.g. when creating a new thread), we'll make sure the panel is force refreshed correctly.
this.emit(RoomEvent.TimelineReset, this.room, this.timelineSet, true);
} catch (e) {
logger.error("Failed to load start of newly created thread: ", e);
this.initialEventsFetched = false;
}
}

Expand Down

0 comments on commit afc3c62

Please sign in to comment.