From e947612ad959c93edf29fc27453202ab789a6f9c Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 11 Jul 2023 19:08:32 +0100 Subject: [PATCH] Fix #192: ignore unseen old events --- state/accumulator.go | 125 ++++++++++++++++++++------ state/accumulator_test.go | 81 ----------------- tests-integration/regressions_test.go | 112 +++++++++++++++++++++++ 3 files changed, 210 insertions(+), 108 deletions(-) create mode 100644 tests-integration/regressions_test.go diff --git a/state/accumulator.go b/state/accumulator.go index 0105786f..acab4555 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -293,34 +293,20 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia // - Else it creates a new room state snapshot if the timeline contains state events (as this now represents the current state) // - It adds entries to the membership log for membership events. func (a *Accumulator) Accumulate(txn *sqlx.Tx, roomID string, prevBatch string, timeline []json.RawMessage) (numNew int, timelineNIDs []int64, err error) { - // Insert the events. Check for duplicates which can happen in the real world when joining - // Matrix HQ on Synapse. - dedupedEvents := make([]Event, 0, len(timeline)) - seenEvents := make(map[string]struct{}) - for i := range timeline { - e := Event{ - JSON: timeline[i], - RoomID: roomID, - } - if err := e.ensureFieldsSetOnEvent(); err != nil { - return 0, nil, fmt.Errorf("event malformed: %s", err) - } - if _, ok := seenEvents[e.ID]; ok { - logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Msg( - "Accumulator.Accumulate: seen the same event ID twice, ignoring", - ) - continue - } - if i == 0 && prevBatch != "" { - // tag the first timeline event with the prev batch token - e.PrevBatch = sql.NullString{ - String: prevBatch, - Valid: true, - } - } - dedupedEvents = append(dedupedEvents, e) - seenEvents[e.ID] = struct{}{} + // The first stage of accumulating events is mostly around validation around what the upstream HS sends us. For accumulation to work correctly + // we expect: + // - there to be no duplicate events + // - if there are new events, they are always new. + // Both of these assumptions can be false for different reasons + dedupedEvents, err := a.filterAndParseTimelineEvents(txn, roomID, timeline, prevBatch) + if err != nil { + err = fmt.Errorf("filterTimelineEvents: %w", err) + return } + if len(dedupedEvents) == 0 { + return 0, nil, err // nothing to do + } + eventIDToNID, err := a.eventsTable.Insert(txn, dedupedEvents, false) if err != nil { return 0, nil, err @@ -413,6 +399,91 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, roomID string, prevBatch string, return numNew, timelineNIDs, nil } +// filterAndParseTimelineEvents takes a raw timeline array from sync v2 and applies sanity to it: +// - removes duplicate events: this is just a bug which has been seen on Synapse on matrix.org +// - removes old events: this is an edge case when joining rooms over federation, see https://github.com/matrix-org/sliding-sync/issues/192 +// - parses it and returns Event structs. +// - check which events are unknown. If all events are known, filter them all out. +func (a *Accumulator) filterAndParseTimelineEvents(txn *sqlx.Tx, roomID string, timeline []json.RawMessage, prevBatch string) ([]Event, error) { + // Check for duplicates which can happen in the real world when joining + // Matrix HQ on Synapse, as well as when you join rooms for the first time over federation. + dedupedEvents := make([]Event, 0, len(timeline)) + seenEvents := make(map[string]struct{}) + for i := range timeline { + e := Event{ + JSON: timeline[i], + RoomID: roomID, + } + if err := e.ensureFieldsSetOnEvent(); err != nil { + return nil, fmt.Errorf("event malformed: %s", err) + } + if _, ok := seenEvents[e.ID]; ok { + logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Msg( + "Accumulator.filterAndParseTimelineEvents: seen the same event ID twice, ignoring", + ) + continue + } + if i == 0 && prevBatch != "" { + // tag the first timeline event with the prev batch token + e.PrevBatch = sql.NullString{ + String: prevBatch, + Valid: true, + } + } + dedupedEvents = append(dedupedEvents, e) + seenEvents[e.ID] = struct{}{} + } + + // if we only have a single timeline event we cannot determine if it is old or not, as we rely on already seen events + // being after (higher index) than it. + if len(dedupedEvents) <= 1 { + return dedupedEvents, nil + } + + // Figure out which of these events are unseen and hence brand new live events. + // In some cases, we may have unseen OLD events - see https://github.com/matrix-org/sliding-sync/issues/192 + // in which case we need to drop those events. + dedupedEventIDs := make([]string, 0, len(seenEvents)) + for evID := range seenEvents { + dedupedEventIDs = append(dedupedEventIDs, evID) + } + unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, dedupedEventIDs) + if err != nil { + return nil, fmt.Errorf("filterAndParseTimelineEvents: failed to SelectUnknownEventIDs: %w", err) + } + + if len(unknownEventIDs) == 0 { + // every event has been seen already, no work to do + return nil, nil + } + + // In the happy case, we expect to see timeline arrays like this: (SEEN=S, UNSEEN=U) + // [S,S,U,U] -> want last 2 + // [U,U,U] -> want all + // In the backfill edge case, we might see: + // [U,S,S,S] -> want none + // [U,S,S,U] -> want last 1 + // We should never see scenarios like: + // [U,S,S,U,S,S] <- we should only see 1 contiguous block of seen events. + // If we do, we'll just ignore all unseen events less than the highest seen event. + + // The algorithm starts at the end and just looks for the first S event, returning the subslice after that S event (which may be []) + seenIndex := -1 + for i := len(dedupedEvents) - 1; i >= 0; i-- { + _, unseen := unknownEventIDs[dedupedEvents[i].ID] + if !unseen { + seenIndex = i + break + } + } + // seenIndex can be -1 if all are unseen, or len-1 if all are seen, either way if we +1 this slices correctly: + // no seen events s[A,B,C] => s[-1+1:] => [A,B,C] + // C is seen event s[A,B,C] => s[2+1:] => [] + // B is seen event s[A,B,C] => s[1+1:] => [C] + // A is seen event s[A,B,C] => s[0+1:] => [B,C] + return dedupedEvents[seenIndex+1:], nil +} + // Delta returns a list of events of at most `limit` for the room not including `lastEventNID`. // Returns the latest NID of the last event (most recent) func (a *Accumulator) Delta(roomID string, lastEventNID int64, limit int) (eventsJSON []json.RawMessage, latest int64, err error) { diff --git a/state/accumulator_test.go b/state/accumulator_test.go index 250854e8..64ee6c86 100644 --- a/state/accumulator_test.go +++ b/state/accumulator_test.go @@ -11,7 +11,6 @@ import ( "github.com/jmoiron/sqlx" "github.com/matrix-org/sliding-sync/sqlutil" "github.com/matrix-org/sliding-sync/sync2" - "github.com/matrix-org/sliding-sync/testutils" "github.com/tidwall/gjson" ) @@ -417,86 +416,6 @@ func TestAccumulatorDupeEvents(t *testing.T) { } } -// Regression test for corrupt state snapshots. -// This seems to have happened in the wild, whereby the snapshot exhibited 2 things: -// - A message event having a event_replaces_nid. This should be impossible as messages are not state. -// - Duplicate events in the state snapshot. -// -// We can reproduce a message event having a event_replaces_nid by doing the following: -// - Create a room with initial state A,C -// - Accumulate events D, A, B(msg). This should be impossible because we already got A initially but whatever, roll with it, blame state resets or something. -// - This leads to A,B being processed and D ignored if you just take the newest results. -// -// This can then be tested by: -// - Query the current room snapshot. This will include B(msg) when it shouldn't. -func TestAccumulatorMisorderedGraceful(t *testing.T) { - alice := "@alice:localhost" - bob := "@bob:localhost" - - eventA := testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "join"}) - eventC := testutils.NewStateEvent(t, "m.room.create", "", alice, map[string]interface{}{}) - eventD := testutils.NewStateEvent( - t, "m.room.member", bob, "join", map[string]interface{}{"membership": "join"}, - ) - eventBMsg := testutils.NewEvent( - t, "m.room.message", bob, map[string]interface{}{"body": "hello"}, - ) - t.Logf("A=member-alice, B=msg, C=create, D=member-bob") - - db, close := connectToDB(t) - defer close() - accumulator := NewAccumulator(db) - roomID := "!TestAccumulatorStateReset:localhost" - // Create a room with initial state A,C - _, err := accumulator.Initialise(roomID, []json.RawMessage{ - eventA, eventC, - }) - if err != nil { - t.Fatalf("failed to Initialise accumulator: %s", err) - } - - // Accumulate events D, A, B(msg). - err = sqlutil.WithTransaction(accumulator.db, func(txn *sqlx.Tx) error { - _, _, err = accumulator.Accumulate(txn, roomID, "", []json.RawMessage{eventD, eventA, eventBMsg}) - return err - }) - if err != nil { - t.Fatalf("failed to Accumulate: %s", err) - } - - eventIDs := []string{ - gjson.GetBytes(eventA, "event_id").Str, - gjson.GetBytes(eventBMsg, "event_id").Str, - gjson.GetBytes(eventC, "event_id").Str, - gjson.GetBytes(eventD, "event_id").Str, - } - t.Logf("Events A,B,C,D: %v", eventIDs) - txn := accumulator.db.MustBeginTx(context.Background(), nil) - idsToNIDs, err := accumulator.eventsTable.SelectNIDsByIDs(txn, eventIDs) - if err != nil { - t.Fatalf("Failed to SelectNIDsByIDs: %s", err) - } - if len(idsToNIDs) != len(eventIDs) { - t.Errorf("SelectNIDsByIDs: asked for %v got %v", eventIDs, idsToNIDs) - } - t.Logf("Events: %v", idsToNIDs) - - wantEventNIDs := []int64{ - idsToNIDs[eventIDs[0]], idsToNIDs[eventIDs[2]], idsToNIDs[eventIDs[3]], - } - sort.Slice(wantEventNIDs, func(i, j int) bool { - return wantEventNIDs[i] < wantEventNIDs[j] - }) - // Query the current room snapshot - gotSnapshotEvents := currentSnapshotNIDs(t, accumulator.snapshotTable, roomID) - if len(gotSnapshotEvents) != len(wantEventNIDs) { // events A,C,D - t.Errorf("corrupt snapshot, got %v want %v", gotSnapshotEvents, wantEventNIDs) - } - if !reflect.DeepEqual(wantEventNIDs, gotSnapshotEvents) { - t.Errorf("got %v want %v", gotSnapshotEvents, wantEventNIDs) - } -} - // Regression test for corrupt state snapshots. // This seems to have happened in the wild, whereby the snapshot exhibited 2 things: // - A message event having a event_replaces_nid. This should be impossible as messages are not state. diff --git a/tests-integration/regressions_test.go b/tests-integration/regressions_test.go new file mode 100644 index 00000000..d24869ce --- /dev/null +++ b/tests-integration/regressions_test.go @@ -0,0 +1,112 @@ +package syncv3 + +import ( + "encoding/json" + "testing" + "time" + + "github.com/matrix-org/sliding-sync/sync2" + "github.com/matrix-org/sliding-sync/sync3" + "github.com/matrix-org/sliding-sync/testutils" + "github.com/matrix-org/sliding-sync/testutils/m" +) + +// catch all file for any kind of regression test which doesn't fall into a unique category + +// Regression test for https://github.com/matrix-org/sliding-sync/issues/192 +// - Bob on his server invites Alice to a room. +// - Alice joins the room first over federation. Proxy does the right thing and sets her membership to join. There is no timeline though due to not having backfilled. +// - Alice's client backfills in the room which pulls in the invite event, but the SS proxy doesn't see it as it's backfill, not /sync. +// - Charlie joins the same room via SS, which makes the SS proxy see 50 timeline events, which includes the invite. +// As the proxy has never seen this invite event before, it assumes it is newer than the join event and inserts it, corrupting state. +// +// Manually confirmed this can happen with 3x Element clients. We need to make sure we drop those earlier events. +// The first join over federation presents itself as a single join event in the timeline, with the create event, etc in state. +func TestBackfillInviteDoesntCorruptState(t *testing.T) { + pqString := testutils.PrepareDBConnectionString() + // setup code + v2 := runTestV2Server(t) + v3 := runTestServer(t, v2, pqString) + defer v2.close() + defer v3.close() + + fedBob := "@bob:over_federation" + charlie := "@charlie:localhost" + charlieToken := "CHARLIE_TOKEN" + joinEvent := testutils.NewJoinEvent(t, alice) + + room := roomEvents{ + roomID: "!TestBackfillInviteDoesntCorruptState:localhost", + events: []json.RawMessage{ + joinEvent, + }, + state: createRoomState(t, fedBob, time.Now()), + } + v2.addAccount(t, alice, aliceToken) + v2.queueResponse(alice, sync2.SyncResponse{ + Rooms: sync2.SyncRoomsResponse{ + Join: v2JoinTimeline(room), + }, + }) + + // alice syncs and should see the room. + aliceRes := v3.mustDoV3Request(t, aliceToken, sync3.Request{ + Lists: map[string]sync3.RequestList{ + "a": { + Ranges: sync3.SliceRanges{{0, 20}}, + RoomSubscription: sync3.RoomSubscription{ + TimelineLimit: 5, + }, + }, + }, + }) + m.MatchResponse(t, aliceRes, m.MatchList("a", m.MatchV3Count(1), m.MatchV3Ops(m.MatchV3SyncOp(0, 0, []string{room.roomID})))) + + // Alice's client "backfills" new data in, meaning the next user who joins is going to see a different set of timeline events + dummyMsg := testutils.NewMessageEvent(t, fedBob, "you didn't see this before joining") + charlieJoinEvent := testutils.NewJoinEvent(t, charlie) + backfilledTimelineEvents := append( + room.state, []json.RawMessage{ + dummyMsg, + testutils.NewStateEvent(t, "m.room.member", alice, fedBob, map[string]interface{}{ + "membership": "invite", + }), + joinEvent, + charlieJoinEvent, + }..., + ) + + // now charlie also joins the room, causing a different response from /sync v2 + v2.addAccount(t, charlie, charlieToken) + v2.queueResponse(charlie, sync2.SyncResponse{ + Rooms: sync2.SyncRoomsResponse{ + Join: v2JoinTimeline(roomEvents{ + roomID: room.roomID, + events: backfilledTimelineEvents, + }), + }, + }) + + // and now charlie hits SS, which might corrupt membership state for alice. + charlieRes := v3.mustDoV3Request(t, charlieToken, sync3.Request{ + Lists: map[string]sync3.RequestList{ + "a": { + Ranges: sync3.SliceRanges{{0, 20}}, + }, + }, + }) + m.MatchResponse(t, charlieRes, m.MatchList("a", m.MatchV3Count(1), m.MatchV3Ops(m.MatchV3SyncOp(0, 0, []string{room.roomID})))) + + // alice should not see dummyMsg or the invite + aliceRes = v3.mustDoV3RequestWithPos(t, aliceToken, aliceRes.Pos, sync3.Request{}) + m.MatchResponse(t, aliceRes, m.MatchNoV3Ops(), m.LogResponse(t), m.MatchRoomSubscriptionsStrict( + map[string][]m.RoomMatcher{ + room.roomID: { + m.MatchJoinCount(3), // alice, bob, charlie, + m.MatchNoInviteCount(), + m.MatchNumLive(1), + m.MatchRoomTimeline([]json.RawMessage{charlieJoinEvent}), + }, + }, + )) +}