Skip to content

Commit

Permalink
Fix TestDeviceListUpdates flakes (#593)
Browse files Browse the repository at this point in the history
When a remote user has a device list update, an entry is expected to appear in
`device_lists.changed` in the `/sync` response. The local homeserver may then
query the remote homeserver for the update. Some homeservers (Synapse) may emit
extra `device_lists.changed` updates in `/sync` responses after querying keys.

Previously, we attempted to skip over the spurious `device_lists.changed`
updates by doing another sync. This wasn't sufficient, so we make another
remote user send a device list update, and wait for that to appear.

Fixes #580.

Signed-off-by: Sean Quah <[email protected]>
  • Loading branch information
squahtx authored Feb 1, 2023
1 parent 8df7168 commit 02c5e37
Showing 1 changed file with 52 additions and 10 deletions.
62 changes: 52 additions & 10 deletions tests/csapi/device_lists_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,51 @@ func TestDeviceListUpdates(t *testing.T) {
}
}

// makeBarrier returns a function which tries to act as a barrier for `device_lists.changed`
// updates.
//
// When a remote user has a device list update, an entry is expected to appear in
// `device_lists.changed` in the `/sync` response. The local homeserver may then query the
// remote homeserver for the update. Some homeservers (Synapse) may emit extra
// `device_lists.changed` updates in `/sync` responses after querying keys.
//
// The barrier tries to ensure that `device_lists.changed` entries resulting from device list
// updates and queries before the barrier do not appear in `/sync` responses after the barrier.
makeBarrier := func(
t *testing.T,
deployment *docker.Deployment,
observingUser *client.CSAPI,
otherHSName string,
) func(t *testing.T, nextBatch string) string {
t.Helper()

barry := deployment.RegisterUser(t, otherHSName, generateLocalpart("barry"), "password", false)

// The observing user must share a room with the dummy barrier user.
roomID := barry.CreateRoom(t, map[string]interface{}{"preset": "public_chat"})
observingUser.JoinRoom(t, roomID, []string{otherHSName})
observingUser.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(observingUser.UserID, roomID))

return func(t *testing.T, nextBatch string) string {
// Publish a device list update from the barrier user and wait until the observing user
// sees it.
uploadNewKeys(t, barry)
return observingUser.MustSyncUntil(
t,
client.SyncReq{Since: nextBatch},
syncDeviceListsHas("changed", barry.UserID),
)
}
}

// In all of these test scenarios, there are two users: Alice and Bob.
// We only care about what Alice sees.

// testOtherUserJoin tests another user joining a room Alice is already in.
testOtherUserJoin := func(t *testing.T, deployment *docker.Deployment, hsName string, otherHSName string) {
alice := deployment.RegisterUser(t, hsName, generateLocalpart("alice"), "password", false)
bob := deployment.RegisterUser(t, otherHSName, generateLocalpart("bob"), "password", false)
barrier := makeBarrier(t, deployment, alice, otherHSName)
checkBobKeys := uploadNewKeys(t, bob)

roomID := alice.CreateRoom(t, map[string]interface{}{"preset": "public_chat"})
Expand All @@ -117,14 +155,14 @@ func TestDeviceListUpdates(t *testing.T) {
bob.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(bob.UserID, roomID))

// Check that Alice receives a device list update from Bob
alice.MustSyncUntil(
aliceNextBatch = alice.MustSyncUntil(
t,
client.SyncReq{Since: aliceNextBatch},
syncDeviceListsHas("changed", bob.UserID),
)
mustQueryKeys(t, alice, bob.UserID, checkBobKeys)
// Some homeservers (Synapse) may emit another `changed` update after querying keys.
_, aliceNextBatch = alice.MustSync(t, client.SyncReq{TimeoutMillis: "0"})
aliceNextBatch = barrier(t, aliceNextBatch)

// Both homeservers think Bob has joined now
// Bob then updates their device list
Expand All @@ -145,6 +183,7 @@ func TestDeviceListUpdates(t *testing.T) {
) {
alice := deployment.RegisterUser(t, hsName, generateLocalpart("alice"), "password", false)
bob := deployment.RegisterUser(t, otherHSName, generateLocalpart("bob"), "password", false)
barrier := makeBarrier(t, deployment, alice, otherHSName)
checkBobKeys := uploadNewKeys(t, bob)

roomID := bob.CreateRoom(t, map[string]interface{}{"preset": "public_chat"})
Expand All @@ -157,14 +196,14 @@ func TestDeviceListUpdates(t *testing.T) {
bob.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(alice.UserID, roomID))

// Check that Alice receives a device list update from Bob
alice.MustSyncUntil(
aliceNextBatch = alice.MustSyncUntil(
t,
client.SyncReq{Since: aliceNextBatch},
syncDeviceListsHas("changed", bob.UserID),
)
mustQueryKeys(t, alice, bob.UserID, checkBobKeys)
// Some homeservers (Synapse) may emit another `changed` update after querying keys.
_, aliceNextBatch = alice.MustSync(t, client.SyncReq{TimeoutMillis: "0"})
aliceNextBatch = barrier(t, aliceNextBatch)

// Both homeservers think Alice has joined now
// Bob then updates their device list
Expand All @@ -183,6 +222,7 @@ func TestDeviceListUpdates(t *testing.T) {
testOtherUserLeave := func(t *testing.T, deployment *docker.Deployment, hsName string, otherHSName string) {
alice := deployment.RegisterUser(t, hsName, generateLocalpart("alice"), "password", false)
bob := deployment.RegisterUser(t, otherHSName, generateLocalpart("bob"), "password", false)
barrier := makeBarrier(t, deployment, alice, otherHSName)
checkBobKeys := uploadNewKeys(t, bob)

roomID := alice.CreateRoom(t, map[string]interface{}{"preset": "public_chat"})
Expand All @@ -192,10 +232,10 @@ func TestDeviceListUpdates(t *testing.T) {
bobNextBatch := bob.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(bob.UserID, roomID))

// Alice performs an initial sync
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(bob.UserID, roomID))
aliceNextBatch := alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(bob.UserID, roomID))
mustQueryKeys(t, alice, bob.UserID, checkBobKeys)
// Some homeservers (Synapse) may emit another `changed` update after querying keys.
_, aliceNextBatch := alice.MustSync(t, client.SyncReq{TimeoutMillis: "0"})
aliceNextBatch = barrier(t, aliceNextBatch)

// Bob leaves the room
bob.LeaveRoom(t, roomID)
Expand Down Expand Up @@ -226,6 +266,7 @@ func TestDeviceListUpdates(t *testing.T) {
testLeave := func(t *testing.T, deployment *docker.Deployment, hsName string, otherHSName string) {
alice := deployment.RegisterUser(t, hsName, generateLocalpart("alice"), "password", false)
bob := deployment.RegisterUser(t, otherHSName, generateLocalpart("bob"), "password", false)
barrier := makeBarrier(t, deployment, alice, otherHSName)
checkBobKeys := uploadNewKeys(t, bob)

roomID := bob.CreateRoom(t, map[string]interface{}{"preset": "public_chat"})
Expand All @@ -235,10 +276,10 @@ func TestDeviceListUpdates(t *testing.T) {
bobNextBatch := bob.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(alice.UserID, roomID))

// Alice performs an initial sync
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(alice.UserID, roomID))
aliceNextBatch := alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(alice.UserID, roomID))
mustQueryKeys(t, alice, bob.UserID, checkBobKeys)
// Some homeservers (Synapse) may emit another `changed` update after querying keys.
_, aliceNextBatch := alice.MustSync(t, client.SyncReq{TimeoutMillis: "0"})
aliceNextBatch = barrier(t, aliceNextBatch)

// Alice leaves the room
alice.LeaveRoom(t, roomID)
Expand Down Expand Up @@ -269,6 +310,7 @@ func TestDeviceListUpdates(t *testing.T) {
testOtherUserRejoin := func(t *testing.T, deployment *docker.Deployment, hsName string, otherHSName string) {
alice := deployment.RegisterUser(t, hsName, generateLocalpart("alice"), "password", false)
bob := deployment.RegisterUser(t, otherHSName, generateLocalpart("bob"), "password", false)
barrier := makeBarrier(t, deployment, alice, otherHSName)
checkBobKeys := uploadNewKeys(t, bob)

roomID := alice.CreateRoom(t, map[string]interface{}{"preset": "public_chat"})
Expand All @@ -278,10 +320,10 @@ func TestDeviceListUpdates(t *testing.T) {
bobNextBatch := bob.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(bob.UserID, roomID))

// Alice performs an initial sync
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(bob.UserID, roomID))
aliceNextBatch := alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(bob.UserID, roomID))
mustQueryKeys(t, alice, bob.UserID, checkBobKeys)
// Some homeservers (Synapse) may emit another `changed` update after querying keys.
_, aliceNextBatch := alice.MustSync(t, client.SyncReq{TimeoutMillis: "0"})
aliceNextBatch = barrier(t, aliceNextBatch)

// Both homeservers think Bob has joined now
// Bob leaves the room
Expand Down

0 comments on commit 02c5e37

Please sign in to comment.