Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #192: ignore unseen old events #198

Merged
merged 1 commit into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 98 additions & 27 deletions state/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,34 +293,20 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia
// - Else it creates a new room state snapshot if the timeline contains state events (as this now represents the current state)
// - It adds entries to the membership log for membership events.
func (a *Accumulator) Accumulate(txn *sqlx.Tx, roomID string, prevBatch string, timeline []json.RawMessage) (numNew int, timelineNIDs []int64, err error) {
// Insert the events. Check for duplicates which can happen in the real world when joining
// Matrix HQ on Synapse.
dedupedEvents := make([]Event, 0, len(timeline))
seenEvents := make(map[string]struct{})
for i := range timeline {
e := Event{
JSON: timeline[i],
RoomID: roomID,
}
if err := e.ensureFieldsSetOnEvent(); err != nil {
return 0, nil, fmt.Errorf("event malformed: %s", err)
}
if _, ok := seenEvents[e.ID]; ok {
logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Msg(
"Accumulator.Accumulate: seen the same event ID twice, ignoring",
)
continue
}
if i == 0 && prevBatch != "" {
// tag the first timeline event with the prev batch token
e.PrevBatch = sql.NullString{
String: prevBatch,
Valid: true,
}
}
dedupedEvents = append(dedupedEvents, e)
seenEvents[e.ID] = struct{}{}
// The first stage of accumulating events is mostly around validation around what the upstream HS sends us. For accumulation to work correctly
// we expect:
// - there to be no duplicate events
// - if there are new events, they are always new.
// Both of these assumptions can be false for different reasons
dedupedEvents, err := a.filterAndParseTimelineEvents(txn, roomID, timeline, prevBatch)
if err != nil {
err = fmt.Errorf("filterTimelineEvents: %w", err)
return
}
if len(dedupedEvents) == 0 {
return 0, nil, err // nothing to do
}

eventIDToNID, err := a.eventsTable.Insert(txn, dedupedEvents, false)
if err != nil {
return 0, nil, err
Expand Down Expand Up @@ -413,6 +399,91 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, roomID string, prevBatch string,
return numNew, timelineNIDs, nil
}

// filterAndParseTimelineEvents takes a raw timeline array from sync v2 and applies sanity to it:
// - removes duplicate events: this is just a bug which has been seen on Synapse on matrix.org
// - removes old events: this is an edge case when joining rooms over federation, see https://github.com/matrix-org/sliding-sync/issues/192
// - parses it and returns Event structs.
// - check which events are unknown. If all events are known, filter them all out.
func (a *Accumulator) filterAndParseTimelineEvents(txn *sqlx.Tx, roomID string, timeline []json.RawMessage, prevBatch string) ([]Event, error) {
// Check for duplicates which can happen in the real world when joining
// Matrix HQ on Synapse, as well as when you join rooms for the first time over federation.
dedupedEvents := make([]Event, 0, len(timeline))
seenEvents := make(map[string]struct{})
for i := range timeline {
e := Event{
JSON: timeline[i],
RoomID: roomID,
}
if err := e.ensureFieldsSetOnEvent(); err != nil {
return nil, fmt.Errorf("event malformed: %s", err)
}
if _, ok := seenEvents[e.ID]; ok {
logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Msg(
"Accumulator.filterAndParseTimelineEvents: seen the same event ID twice, ignoring",
)
continue
}
if i == 0 && prevBatch != "" {
// tag the first timeline event with the prev batch token
e.PrevBatch = sql.NullString{
String: prevBatch,
Valid: true,
}
}
dedupedEvents = append(dedupedEvents, e)
seenEvents[e.ID] = struct{}{}
}

// if we only have a single timeline event we cannot determine if it is old or not, as we rely on already seen events
// being after (higher index) than it.
if len(dedupedEvents) <= 1 {
return dedupedEvents, nil
}

// Figure out which of these events are unseen and hence brand new live events.
// In some cases, we may have unseen OLD events - see https://github.com/matrix-org/sliding-sync/issues/192
// in which case we need to drop those events.
dedupedEventIDs := make([]string, 0, len(seenEvents))
for evID := range seenEvents {
dedupedEventIDs = append(dedupedEventIDs, evID)
}
unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, dedupedEventIDs)
if err != nil {
return nil, fmt.Errorf("filterAndParseTimelineEvents: failed to SelectUnknownEventIDs: %w", err)
}

if len(unknownEventIDs) == 0 {
// every event has been seen already, no work to do
return nil, nil
}

// In the happy case, we expect to see timeline arrays like this: (SEEN=S, UNSEEN=U)
// [S,S,U,U] -> want last 2
// [U,U,U] -> want all
// In the backfill edge case, we might see:
// [U,S,S,S] -> want none
// [U,S,S,U] -> want last 1
// We should never see scenarios like:
// [U,S,S,U,S,S] <- we should only see 1 contiguous block of seen events.
// If we do, we'll just ignore all unseen events less than the highest seen event.

// The algorithm starts at the end and just looks for the first S event, returning the subslice after that S event (which may be [])
seenIndex := -1
for i := len(dedupedEvents) - 1; i >= 0; i-- {
_, unseen := unknownEventIDs[dedupedEvents[i].ID]
if !unseen {
seenIndex = i
break
}
}
// seenIndex can be -1 if all are unseen, or len-1 if all are seen, either way if we +1 this slices correctly:
// no seen events s[A,B,C] => s[-1+1:] => [A,B,C]
// C is seen event s[A,B,C] => s[2+1:] => []
// B is seen event s[A,B,C] => s[1+1:] => [C]
// A is seen event s[A,B,C] => s[0+1:] => [B,C]
return dedupedEvents[seenIndex+1:], nil
}

// Delta returns a list of events of at most `limit` for the room not including `lastEventNID`.
// Returns the latest NID of the last event (most recent)
func (a *Accumulator) Delta(roomID string, lastEventNID int64, limit int) (eventsJSON []json.RawMessage, latest int64, err error) {
Expand Down
81 changes: 0 additions & 81 deletions state/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/jmoiron/sqlx"
"github.com/matrix-org/sliding-sync/sqlutil"
"github.com/matrix-org/sliding-sync/sync2"
"github.com/matrix-org/sliding-sync/testutils"
"github.com/tidwall/gjson"
)

Expand Down Expand Up @@ -417,86 +416,6 @@ func TestAccumulatorDupeEvents(t *testing.T) {
}
}

// Regression test for corrupt state snapshots.
// This seems to have happened in the wild, whereby the snapshot exhibited 2 things:
// - A message event having a event_replaces_nid. This should be impossible as messages are not state.
// - Duplicate events in the state snapshot.
//
// We can reproduce a message event having a event_replaces_nid by doing the following:
// - Create a room with initial state A,C
// - Accumulate events D, A, B(msg). This should be impossible because we already got A initially but whatever, roll with it, blame state resets or something.
// - This leads to A,B being processed and D ignored if you just take the newest results.
//
// This can then be tested by:
// - Query the current room snapshot. This will include B(msg) when it shouldn't.
func TestAccumulatorMisorderedGraceful(t *testing.T) {
kegsay marked this conversation as resolved.
Show resolved Hide resolved
alice := "@alice:localhost"
bob := "@bob:localhost"

eventA := testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "join"})
eventC := testutils.NewStateEvent(t, "m.room.create", "", alice, map[string]interface{}{})
eventD := testutils.NewStateEvent(
t, "m.room.member", bob, "join", map[string]interface{}{"membership": "join"},
)
eventBMsg := testutils.NewEvent(
t, "m.room.message", bob, map[string]interface{}{"body": "hello"},
)
t.Logf("A=member-alice, B=msg, C=create, D=member-bob")

db, close := connectToDB(t)
defer close()
accumulator := NewAccumulator(db)
roomID := "!TestAccumulatorStateReset:localhost"
// Create a room with initial state A,C
_, err := accumulator.Initialise(roomID, []json.RawMessage{
eventA, eventC,
})
if err != nil {
t.Fatalf("failed to Initialise accumulator: %s", err)
}

// Accumulate events D, A, B(msg).
err = sqlutil.WithTransaction(accumulator.db, func(txn *sqlx.Tx) error {
_, _, err = accumulator.Accumulate(txn, roomID, "", []json.RawMessage{eventD, eventA, eventBMsg})
return err
})
if err != nil {
t.Fatalf("failed to Accumulate: %s", err)
}

eventIDs := []string{
gjson.GetBytes(eventA, "event_id").Str,
gjson.GetBytes(eventBMsg, "event_id").Str,
gjson.GetBytes(eventC, "event_id").Str,
gjson.GetBytes(eventD, "event_id").Str,
}
t.Logf("Events A,B,C,D: %v", eventIDs)
txn := accumulator.db.MustBeginTx(context.Background(), nil)
idsToNIDs, err := accumulator.eventsTable.SelectNIDsByIDs(txn, eventIDs)
if err != nil {
t.Fatalf("Failed to SelectNIDsByIDs: %s", err)
}
if len(idsToNIDs) != len(eventIDs) {
t.Errorf("SelectNIDsByIDs: asked for %v got %v", eventIDs, idsToNIDs)
}
t.Logf("Events: %v", idsToNIDs)

wantEventNIDs := []int64{
idsToNIDs[eventIDs[0]], idsToNIDs[eventIDs[2]], idsToNIDs[eventIDs[3]],
}
sort.Slice(wantEventNIDs, func(i, j int) bool {
return wantEventNIDs[i] < wantEventNIDs[j]
})
// Query the current room snapshot
gotSnapshotEvents := currentSnapshotNIDs(t, accumulator.snapshotTable, roomID)
if len(gotSnapshotEvents) != len(wantEventNIDs) { // events A,C,D
t.Errorf("corrupt snapshot, got %v want %v", gotSnapshotEvents, wantEventNIDs)
}
if !reflect.DeepEqual(wantEventNIDs, gotSnapshotEvents) {
t.Errorf("got %v want %v", gotSnapshotEvents, wantEventNIDs)
}
}

// Regression test for corrupt state snapshots.
// This seems to have happened in the wild, whereby the snapshot exhibited 2 things:
// - A message event having a event_replaces_nid. This should be impossible as messages are not state.
Expand Down
112 changes: 112 additions & 0 deletions tests-integration/regressions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package syncv3

import (
"encoding/json"
"testing"
"time"

"github.com/matrix-org/sliding-sync/sync2"
"github.com/matrix-org/sliding-sync/sync3"
"github.com/matrix-org/sliding-sync/testutils"
"github.com/matrix-org/sliding-sync/testutils/m"
)

// catch all file for any kind of regression test which doesn't fall into a unique category

// Regression test for https://github.com/matrix-org/sliding-sync/issues/192
// - Bob on his server invites Alice to a room.
// - Alice joins the room first over federation. Proxy does the right thing and sets her membership to join. There is no timeline though due to not having backfilled.
// - Alice's client backfills in the room which pulls in the invite event, but the SS proxy doesn't see it as it's backfill, not /sync.
// - Charlie joins the same room via SS, which makes the SS proxy see 50 timeline events, which includes the invite.
// As the proxy has never seen this invite event before, it assumes it is newer than the join event and inserts it, corrupting state.
//
// Manually confirmed this can happen with 3x Element clients. We need to make sure we drop those earlier events.
// The first join over federation presents itself as a single join event in the timeline, with the create event, etc in state.
func TestBackfillInviteDoesntCorruptState(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
// setup code
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2, pqString)
defer v2.close()
defer v3.close()

fedBob := "@bob:over_federation"
charlie := "@charlie:localhost"
charlieToken := "CHARLIE_TOKEN"
joinEvent := testutils.NewJoinEvent(t, alice)

room := roomEvents{
roomID: "!TestBackfillInviteDoesntCorruptState:localhost",
events: []json.RawMessage{
joinEvent,
},
state: createRoomState(t, fedBob, time.Now()),
}
v2.addAccount(t, alice, aliceToken)
v2.queueResponse(alice, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(room),
},
})

// alice syncs and should see the room.
aliceRes := v3.mustDoV3Request(t, aliceToken, sync3.Request{
Lists: map[string]sync3.RequestList{
"a": {
Ranges: sync3.SliceRanges{{0, 20}},
RoomSubscription: sync3.RoomSubscription{
TimelineLimit: 5,
},
},
},
})
m.MatchResponse(t, aliceRes, m.MatchList("a", m.MatchV3Count(1), m.MatchV3Ops(m.MatchV3SyncOp(0, 0, []string{room.roomID}))))

// Alice's client "backfills" new data in, meaning the next user who joins is going to see a different set of timeline events
dummyMsg := testutils.NewMessageEvent(t, fedBob, "you didn't see this before joining")
charlieJoinEvent := testutils.NewJoinEvent(t, charlie)
backfilledTimelineEvents := append(
room.state, []json.RawMessage{
dummyMsg,
testutils.NewStateEvent(t, "m.room.member", alice, fedBob, map[string]interface{}{
"membership": "invite",
}),
joinEvent,
charlieJoinEvent,
}...,
)

// now charlie also joins the room, causing a different response from /sync v2
v2.addAccount(t, charlie, charlieToken)
v2.queueResponse(charlie, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: room.roomID,
events: backfilledTimelineEvents,
}),
},
})

// and now charlie hits SS, which might corrupt membership state for alice.
charlieRes := v3.mustDoV3Request(t, charlieToken, sync3.Request{
Lists: map[string]sync3.RequestList{
"a": {
Ranges: sync3.SliceRanges{{0, 20}},
},
},
})
m.MatchResponse(t, charlieRes, m.MatchList("a", m.MatchV3Count(1), m.MatchV3Ops(m.MatchV3SyncOp(0, 0, []string{room.roomID}))))

// alice should not see dummyMsg or the invite
aliceRes = v3.mustDoV3RequestWithPos(t, aliceToken, aliceRes.Pos, sync3.Request{})
m.MatchResponse(t, aliceRes, m.MatchNoV3Ops(), m.LogResponse(t), m.MatchRoomSubscriptionsStrict(
map[string][]m.RoomMatcher{
room.roomID: {
m.MatchJoinCount(3), // alice, bob, charlie,
m.MatchNoInviteCount(),
m.MatchNumLive(1),
m.MatchRoomTimeline([]json.RawMessage{charlieJoinEvent}),
},
},
))
}