From 708348acf637f1e95b5078da7cebe433b3bf3a3f Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Thu, 18 Aug 2022 15:29:04 +0100 Subject: [PATCH] Wait for received events using /sync in faster room joins tests (#441) https://github.com/matrix-org/synapse/pull/13477 unblocked lazy-loading `/sync`s while a room has partial state, which allows us to wait for received events using `/sync`. "Resync completes even when events arrive before their prev_events" is now the only test that waits for events using `/event`, since the outliers do not appear in the `/sync` timeline. --- ...federation_room_join_partial_state_test.go | 66 +++++++++++++------ 1 file changed, 46 insertions(+), 20 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 0bbf57c3..6b7cb0c9 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -65,6 +65,17 @@ func TestPartialStateJoin(t *testing.T) { return serverRoom } + // getSyncToken gets the latest sync token + getSyncToken := func(t *testing.T, alice *client.CSAPI) string { + _, syncToken := alice.MustSync(t, + client.SyncReq{ + Filter: buildLazyLoadingSyncFilter(nil), + TimeoutMillis: "0", + }, + ) + return syncToken + } + // test that a regular /sync request made during a partial-state /send_join // request blocks until the state is correctly synced. t.Run("SyncBlocksDuringPartialStateJoin", func(t *testing.T) { @@ -182,6 +193,7 @@ func TestPartialStateJoin(t *testing.T) { deployment := Deploy(t, b.BlueprintAlice) defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") + syncToken := getSyncToken(t, alice) server := createTestServer(t, deployment) cancel := server.Listen() @@ -197,7 +209,7 @@ func TestPartialStateJoin(t *testing.T) { t.Logf("Derek created event with ID %s", event.EventID()) // derek sends an event in the room - testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, event) + testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, event, syncToken) }) // we should be able to receive events with a missing prev event over federation during the resync @@ -205,6 +217,7 @@ func TestPartialStateJoin(t *testing.T) { deployment := Deploy(t, b.BlueprintAlice) defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") + syncToken := getSyncToken(t, alice) server := createTestServer(t, deployment) cancel := server.Listen() @@ -236,7 +249,7 @@ func TestPartialStateJoin(t *testing.T) { []string{eventB.EventID()}, []*gomatrixserverlib.Event{eventA}) // send event B to hs1 - testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB) + testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB, syncToken) }) // we should be able to receive events with partially missing prev events over federation during the resync @@ -244,6 +257,7 @@ func TestPartialStateJoin(t *testing.T) { deployment := Deploy(t, b.BlueprintAlice) defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") + syncToken := getSyncToken(t, alice) server := createTestServer(t, deployment) cancel := server.Listen() @@ -277,7 +291,7 @@ func TestPartialStateJoin(t *testing.T) { []string{eventB.EventID()}, []*gomatrixserverlib.Event{eventA}) // send event B to hs1 - testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB) + testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB, syncToken) }) // we should be able to receive events with a missing prev event, with half missing prev events, @@ -286,6 +300,7 @@ func TestPartialStateJoin(t *testing.T) { deployment := Deploy(t, b.BlueprintAlice) defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") + syncToken := getSyncToken(t, alice) server := createTestServer(t, deployment) cancel := server.Listen() @@ -323,7 +338,7 @@ func TestPartialStateJoin(t *testing.T) { handleStateRequests(t, server, serverRoom, eventA.EventID(), serverRoom.AllCurrentState(), nil, nil) // send event C to hs1 - testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventC) + testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventC, syncToken) }) // a request to (client-side) /members?at= should block until the (federation) /state request completes @@ -635,6 +650,7 @@ func TestPartialStateJoin(t *testing.T) { deployment := Deploy(t, b.BlueprintAlice) defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") + syncToken := getSyncToken(t, alice) server := createTestServer(t, deployment) cancel := server.Listen() @@ -692,7 +708,7 @@ func TestPartialStateJoin(t *testing.T) { t.Logf("Charlie sent timeline event 2") // wait for it to become visible, which implies that all the outliers have been pulled in. - awaitEventArrival(t, time.Second, alice, serverRoom.RoomID, timelineEvent2.EventID()) + awaitEventViaSync(t, alice, serverRoom.RoomID, timelineEvent2.EventID(), syncToken) // now we send over all the other events in the gap. server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{lateEvent.JSON()}, nil) @@ -741,6 +757,7 @@ func TestPartialStateJoin(t *testing.T) { deployment := Deploy(t, b.BlueprintAlice) defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") + syncToken := getSyncToken(t, alice) server := createTestServer(t, deployment) cancel := server.Listen() @@ -777,7 +794,7 @@ func TestPartialStateJoin(t *testing.T) { []json.RawMessage{badStateEvent.JSON(), sentinelEvent.JSON()}, nil) // wait for the sentinel event to be visible - awaitEventArrival(t, time.Second, alice, serverRoom.RoomID, sentinelEvent.EventID()) + syncToken = awaitEventViaSync(t, alice, serverRoom.RoomID, sentinelEvent.EventID(), syncToken) // ... and check that the bad state event is *not* visible must.MatchResponse(t, @@ -793,7 +810,7 @@ func TestPartialStateJoin(t *testing.T) { // one more (non-state) event, for testReceiveEventDuringPartialStateJoin event := psjResult.CreateMessageEvent(t, "charlie", nil) t.Logf("charlie created regular timeline event %s", event.EventID()) - testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, event) + testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, event, syncToken) // check that the bad state event is *still* not visible must.MatchResponse(t, @@ -1214,14 +1231,14 @@ func TestPartialStateJoin(t *testing.T) { // test reception of an event over federation during a resync // sends the given event to the homeserver under test, checks that a client can see it and checks -// the state at the event +// the state at the event. returns the new sync token after the event. func testReceiveEventDuringPartialStateJoin( - t *testing.T, deployment *docker.Deployment, alice *client.CSAPI, psjResult partialStateJoinResult, event *gomatrixserverlib.Event, -) { + t *testing.T, deployment *docker.Deployment, alice *client.CSAPI, psjResult partialStateJoinResult, event *gomatrixserverlib.Event, syncToken string, +) string { // send the event to the homeserver psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil) - awaitEventArrival(t, time.Second, alice, psjResult.ServerRoom.RoomID, event.EventID()) + syncToken = awaitEventViaSync(t, alice, psjResult.ServerRoom.RoomID, event.EventID(), syncToken) // fire off a /state_ids request for the last event. // it must either: @@ -1274,7 +1291,7 @@ func testReceiveEventDuringPartialStateJoin( ) if err := psjResult.Server.SendFederationRequest(context.Background(), deployment, stateReq, &respStateIDs); err != nil { t.Errorf("/state_ids request returned non-200: %s", err) - return + return syncToken } var gotState, expectedState []interface{} for _, ev := range respStateIDs.StateEventIDs { @@ -1284,21 +1301,30 @@ func testReceiveEventDuringPartialStateJoin( expectedState = append(expectedState, ev.EventID()) } must.CheckOffAll(t, gotState, expectedState) + + return syncToken } -// awaitEventArrival waits for alice to be able to see a given event -func awaitEventArrival(t *testing.T, timeout time.Duration, alice *client.CSAPI, roomID string, eventID string) { - /* TODO: check that a lazy-loading sync can see the event. Currently this doesn't work, because /sync blocks. - * https://github.com/matrix-org/synapse/issues/13146 - alice.MustSyncUntil(t, +// awaitEventViaSync waits for alice to be able to see a given event via an incremental lazy-loading +// /sync and returns the new sync token after +func awaitEventViaSync(t *testing.T, alice *client.CSAPI, roomID string, eventID string, syncToken string) string { + // check that a lazy-loading sync can see the event + syncToken = alice.MustSyncUntil(t, client.SyncReq{ + Since: syncToken, Filter: buildLazyLoadingSyncFilter(nil), }, client.SyncTimelineHasEventID(roomID, eventID), ) - */ - // still, Alice should be able to see the event with an /event request. We might have to try it a few times. + t.Logf("Alice successfully received event %s via /sync", eventID) + + return syncToken +} + +// awaitEventArrival waits for alice to be able to see a given event via /event +func awaitEventArrival(t *testing.T, timeout time.Duration, alice *client.CSAPI, roomID string, eventID string) { + // Alice should be able to see the event with an /event request. We might have to try it a few times. alice.DoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "event", eventID}, client.WithRetryUntil(timeout, func(res *http.Response) bool { if res.StatusCode == 200 { @@ -1312,7 +1338,7 @@ func awaitEventArrival(t *testing.T, timeout time.Duration, alice *client.CSAPI, return false }), ) - t.Logf("Alice successfully received event %s", eventID) + t.Logf("Alice successfully observed event %s via /event", eventID) } // buildLazyLoadingSyncFilter constructs a json-marshalled filter suitable the 'Filter' field of a client.SyncReq