Skip to content

Commit

Permalink
Wait for received events using /sync in faster room joins tests (#441)
Browse files Browse the repository at this point in the history
matrix-org/synapse#13477 unblocked lazy-loading
`/sync`s while a room has partial state, which allows us to wait for
received events using `/sync`.

"Resync completes even when events arrive before their prev_events" is
now the only test that waits for events using `/event`, since the
outliers do not appear in the `/sync` timeline.
  • Loading branch information
squahtx authored Aug 18, 2022
1 parent 33b97e7 commit 708348a
Showing 1 changed file with 46 additions and 20 deletions.
66 changes: 46 additions & 20 deletions tests/federation_room_join_partial_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ func TestPartialStateJoin(t *testing.T) {
return serverRoom
}

// getSyncToken gets the latest sync token
getSyncToken := func(t *testing.T, alice *client.CSAPI) string {
_, syncToken := alice.MustSync(t,
client.SyncReq{
Filter: buildLazyLoadingSyncFilter(nil),
TimeoutMillis: "0",
},
)
return syncToken
}

// test that a regular /sync request made during a partial-state /send_join
// request blocks until the state is correctly synced.
t.Run("SyncBlocksDuringPartialStateJoin", func(t *testing.T) {
Expand Down Expand Up @@ -182,6 +193,7 @@ func TestPartialStateJoin(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")
syncToken := getSyncToken(t, alice)

server := createTestServer(t, deployment)
cancel := server.Listen()
Expand All @@ -197,14 +209,15 @@ func TestPartialStateJoin(t *testing.T) {
t.Logf("Derek created event with ID %s", event.EventID())

// derek sends an event in the room
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, event)
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, event, syncToken)
})

// we should be able to receive events with a missing prev event over federation during the resync
t.Run("CanReceiveEventsWithMissingParentsDuringPartialStateJoin", func(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")
syncToken := getSyncToken(t, alice)

server := createTestServer(t, deployment)
cancel := server.Listen()
Expand Down Expand Up @@ -236,14 +249,15 @@ func TestPartialStateJoin(t *testing.T) {
[]string{eventB.EventID()}, []*gomatrixserverlib.Event{eventA})

// send event B to hs1
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB)
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB, syncToken)
})

// we should be able to receive events with partially missing prev events over federation during the resync
t.Run("CanReceiveEventsWithHalfMissingParentsDuringPartialStateJoin", func(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")
syncToken := getSyncToken(t, alice)

server := createTestServer(t, deployment)
cancel := server.Listen()
Expand Down Expand Up @@ -277,7 +291,7 @@ func TestPartialStateJoin(t *testing.T) {
[]string{eventB.EventID()}, []*gomatrixserverlib.Event{eventA})

// send event B to hs1
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB)
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB, syncToken)
})

// we should be able to receive events with a missing prev event, with half missing prev events,
Expand All @@ -286,6 +300,7 @@ func TestPartialStateJoin(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")
syncToken := getSyncToken(t, alice)

server := createTestServer(t, deployment)
cancel := server.Listen()
Expand Down Expand Up @@ -323,7 +338,7 @@ func TestPartialStateJoin(t *testing.T) {
handleStateRequests(t, server, serverRoom, eventA.EventID(), serverRoom.AllCurrentState(), nil, nil)

// send event C to hs1
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventC)
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventC, syncToken)
})

// a request to (client-side) /members?at= should block until the (federation) /state request completes
Expand Down Expand Up @@ -635,6 +650,7 @@ func TestPartialStateJoin(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")
syncToken := getSyncToken(t, alice)

server := createTestServer(t, deployment)
cancel := server.Listen()
Expand Down Expand Up @@ -692,7 +708,7 @@ func TestPartialStateJoin(t *testing.T) {

t.Logf("Charlie sent timeline event 2")
// wait for it to become visible, which implies that all the outliers have been pulled in.
awaitEventArrival(t, time.Second, alice, serverRoom.RoomID, timelineEvent2.EventID())
awaitEventViaSync(t, alice, serverRoom.RoomID, timelineEvent2.EventID(), syncToken)

// now we send over all the other events in the gap.
server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{lateEvent.JSON()}, nil)
Expand Down Expand Up @@ -741,6 +757,7 @@ func TestPartialStateJoin(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")
syncToken := getSyncToken(t, alice)

server := createTestServer(t, deployment)
cancel := server.Listen()
Expand Down Expand Up @@ -777,7 +794,7 @@ func TestPartialStateJoin(t *testing.T) {
[]json.RawMessage{badStateEvent.JSON(), sentinelEvent.JSON()}, nil)

// wait for the sentinel event to be visible
awaitEventArrival(t, time.Second, alice, serverRoom.RoomID, sentinelEvent.EventID())
syncToken = awaitEventViaSync(t, alice, serverRoom.RoomID, sentinelEvent.EventID(), syncToken)

// ... and check that the bad state event is *not* visible
must.MatchResponse(t,
Expand All @@ -793,7 +810,7 @@ func TestPartialStateJoin(t *testing.T) {
// one more (non-state) event, for testReceiveEventDuringPartialStateJoin
event := psjResult.CreateMessageEvent(t, "charlie", nil)
t.Logf("charlie created regular timeline event %s", event.EventID())
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, event)
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, event, syncToken)

// check that the bad state event is *still* not visible
must.MatchResponse(t,
Expand Down Expand Up @@ -1214,14 +1231,14 @@ func TestPartialStateJoin(t *testing.T) {

// test reception of an event over federation during a resync
// sends the given event to the homeserver under test, checks that a client can see it and checks
// the state at the event
// the state at the event. returns the new sync token after the event.
func testReceiveEventDuringPartialStateJoin(
t *testing.T, deployment *docker.Deployment, alice *client.CSAPI, psjResult partialStateJoinResult, event *gomatrixserverlib.Event,
) {
t *testing.T, deployment *docker.Deployment, alice *client.CSAPI, psjResult partialStateJoinResult, event *gomatrixserverlib.Event, syncToken string,
) string {
// send the event to the homeserver
psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil)

awaitEventArrival(t, time.Second, alice, psjResult.ServerRoom.RoomID, event.EventID())
syncToken = awaitEventViaSync(t, alice, psjResult.ServerRoom.RoomID, event.EventID(), syncToken)

// fire off a /state_ids request for the last event.
// it must either:
Expand Down Expand Up @@ -1274,7 +1291,7 @@ func testReceiveEventDuringPartialStateJoin(
)
if err := psjResult.Server.SendFederationRequest(context.Background(), deployment, stateReq, &respStateIDs); err != nil {
t.Errorf("/state_ids request returned non-200: %s", err)
return
return syncToken
}
var gotState, expectedState []interface{}
for _, ev := range respStateIDs.StateEventIDs {
Expand All @@ -1284,21 +1301,30 @@ func testReceiveEventDuringPartialStateJoin(
expectedState = append(expectedState, ev.EventID())
}
must.CheckOffAll(t, gotState, expectedState)

return syncToken
}

// awaitEventArrival waits for alice to be able to see a given event
func awaitEventArrival(t *testing.T, timeout time.Duration, alice *client.CSAPI, roomID string, eventID string) {
/* TODO: check that a lazy-loading sync can see the event. Currently this doesn't work, because /sync blocks.
* https://github.com/matrix-org/synapse/issues/13146
alice.MustSyncUntil(t,
// awaitEventViaSync waits for alice to be able to see a given event via an incremental lazy-loading
// /sync and returns the new sync token after
func awaitEventViaSync(t *testing.T, alice *client.CSAPI, roomID string, eventID string, syncToken string) string {
// check that a lazy-loading sync can see the event
syncToken = alice.MustSyncUntil(t,
client.SyncReq{
Since: syncToken,
Filter: buildLazyLoadingSyncFilter(nil),
},
client.SyncTimelineHasEventID(roomID, eventID),
)
*/

// still, Alice should be able to see the event with an /event request. We might have to try it a few times.
t.Logf("Alice successfully received event %s via /sync", eventID)

return syncToken
}

// awaitEventArrival waits for alice to be able to see a given event via /event
func awaitEventArrival(t *testing.T, timeout time.Duration, alice *client.CSAPI, roomID string, eventID string) {
// Alice should be able to see the event with an /event request. We might have to try it a few times.
alice.DoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "event", eventID},
client.WithRetryUntil(timeout, func(res *http.Response) bool {
if res.StatusCode == 200 {
Expand All @@ -1312,7 +1338,7 @@ func awaitEventArrival(t *testing.T, timeout time.Duration, alice *client.CSAPI,
return false
}),
)
t.Logf("Alice successfully received event %s", eventID)
t.Logf("Alice successfully observed event %s via /event", eventID)
}

// buildLazyLoadingSyncFilter constructs a json-marshalled filter suitable the 'Filter' field of a client.SyncReq
Expand Down

0 comments on commit 708348a

Please sign in to comment.