diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go index 5028244e..4b2ddc77 100644 --- a/tests/msc2716_test.go +++ b/tests/msc2716_test.go @@ -46,12 +46,18 @@ var ( markerInsertionContentField = "org.matrix.msc2716.marker.insertion" ) -var createRoomOpts = map[string]interface{}{ +var createPublicRoomOpts = map[string]interface{}{ "preset": "public_chat", "name": "the hangout spot", "room_version": "org.matrix.msc2716", } +var createPrivateRoomOpts = map[string]interface{}{ + "preset": "private_chat", + "name": "the hangout spot", + "room_version": "org.matrix.msc2716", +} + func TestBackfillingHistory(t *testing.T) { deployment := Deploy(t, b.BlueprintHSWithApplicationService) defer deployment.Destroy(t) @@ -82,7 +88,7 @@ func TestBackfillingHistory(t *testing.T) { t.Run("Backfilled historical events resolve with proper state in correct order", func(t *testing.T) { t.Parallel() - roomID := as.CreateRoom(t, createRoomOpts) + roomID := as.CreateRoom(t, createPublicRoomOpts) alice.JoinRoom(t, roomID, nil) // Create some normal messages in the timeline. We're creating them in @@ -106,15 +112,15 @@ func TestBackfillingHistory(t *testing.T) { eventIDsAfter := createMessagesInRoom(t, alice, roomID, 2) // Insert the most recent chunk of backfilled history + insertTime1 := timeAfterEventBefore.Add(timeBetweenMessages * 3) batchSendRes := batchSendHistoricalMessages( t, as, - []string{virtualUserID}, roomID, eventIdBefore, - timeAfterEventBefore.Add(timeBetweenMessages*3), "", - 3, + createJoinStateEventsForBackfillRequest([]string{virtualUserID}, insertTime1), + createMessageEventsForBackfillRequest([]string{virtualUserID}, insertTime1, 3), // Status 200, ) @@ -124,15 +130,15 @@ func TestBackfillingHistory(t *testing.T) { // Insert another older chunk of backfilled history from the same user. // Make sure the meta data and joins still work on the subsequent chunk + insertTime2 := timeAfterEventBefore batchSendRes2 := batchSendHistoricalMessages( t, as, - []string{virtualUserID}, roomID, eventIdBefore, - timeAfterEventBefore, nextChunkID, - 3, + createJoinStateEventsForBackfillRequest([]string{virtualUserID}, insertTime2), + createMessageEventsForBackfillRequest([]string{virtualUserID}, insertTime2, 3), // Status 200, ) @@ -193,7 +199,7 @@ func TestBackfillingHistory(t *testing.T) { t.Run("Backfilled historical events from multiple users in the same chunk", func(t *testing.T) { t.Parallel() - roomID := as.CreateRoom(t, createRoomOpts) + roomID := as.CreateRoom(t, createPublicRoomOpts) alice.JoinRoom(t, roomID, nil) // Create the "live" event we are going to insert our backfilled events next to @@ -207,16 +213,17 @@ func TestBackfillingHistory(t *testing.T) { virtualUserID3 := "@carol:hs1" ensureVirtualUserRegistered(t, as, "carol") + virtualUserList := []string{virtualUserID, virtualUserID2, virtualUserID3} + // Insert a backfilled event batchSendRes := batchSendHistoricalMessages( t, as, - []string{virtualUserID, virtualUserID2, virtualUserID3}, roomID, eventIdBefore, - timeAfterEventBefore, "", - 3, + createJoinStateEventsForBackfillRequest(virtualUserList, timeAfterEventBefore), + createMessageEventsForBackfillRequest(virtualUserList, timeAfterEventBefore, 3), // Status 200, ) @@ -237,10 +244,10 @@ func TestBackfillingHistory(t *testing.T) { }) }) - t.Run("Backfilled historical events with m.historical do not come down in an incremental sync", func(t *testing.T) { + t.Run("Backfilled historical events do not come down in an incremental sync", func(t *testing.T) { t.Parallel() - roomID := as.CreateRoom(t, createRoomOpts) + roomID := as.CreateRoom(t, createPublicRoomOpts) alice.JoinRoom(t, roomID, nil) // Create the "live" event we are going to insert our backfilled events next to @@ -255,12 +262,11 @@ func TestBackfillingHistory(t *testing.T) { batchSendRes := batchSendHistoricalMessages( t, as, - []string{virtualUserID}, roomID, eventIdBefore, - timeAfterEventBefore, "", - 1, + createJoinStateEventsForBackfillRequest([]string{virtualUserID}, timeAfterEventBefore), + createMessageEventsForBackfillRequest([]string{virtualUserID}, timeAfterEventBefore, 1), // Status 200, ) @@ -284,20 +290,53 @@ func TestBackfillingHistory(t *testing.T) { }) }) + t.Run("Batch send endpoint only returns state events that we passed in via state_events_at_start", func(t *testing.T) { + t.Parallel() + + roomID := as.CreateRoom(t, createPublicRoomOpts) + alice.JoinRoom(t, roomID, nil) + + // Create the "live" event we are going to insert our backfilled events next to + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIdBefore := eventIDsBefore[0] + timeAfterEventBefore := time.Now() + + // Insert a backfilled event + batchSendRes := batchSendHistoricalMessages( + t, + as, + roomID, + eventIdBefore, + "", + createJoinStateEventsForBackfillRequest([]string{virtualUserID}, timeAfterEventBefore), + createMessageEventsForBackfillRequest([]string{virtualUserID}, timeAfterEventBefore, 1), + // Status + 200, + ) + batchSendResBody := client.ParseJSON(t, batchSendRes) + stateEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "state_events") + + // We only expect 1 state event to be returned because we only passed in 1 + // event into `?state_events_at_start` + if len(stateEventIDs) != 1 { + t.Fatalf("Expected only 1 state event to be returned but received %d: %v", len(stateEventIDs), stateEventIDs) + } + }) + t.Run("Unrecognised prev_event ID will throw an error", func(t *testing.T) { t.Parallel() - roomID := as.CreateRoom(t, createRoomOpts) + roomID := as.CreateRoom(t, createPublicRoomOpts) + insertTime := time.Now() batchSendHistoricalMessages( t, as, - []string{virtualUserID}, roomID, "$some-non-existant-event-id", - time.Now(), "", - 1, + createJoinStateEventsForBackfillRequest([]string{virtualUserID}, insertTime), + createMessageEventsForBackfillRequest([]string{virtualUserID}, insertTime, 1), // Status // TODO: Seems like this makes more sense as a 404 // But the current Synapse code around unknown prev events will throw -> @@ -309,7 +348,7 @@ func TestBackfillingHistory(t *testing.T) { t.Run("Normal users aren't allowed to backfill messages", func(t *testing.T) { t.Parallel() - roomID := as.CreateRoom(t, createRoomOpts) + roomID := as.CreateRoom(t, createPublicRoomOpts) alice.JoinRoom(t, roomID, nil) eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) @@ -319,28 +358,80 @@ func TestBackfillingHistory(t *testing.T) { batchSendHistoricalMessages( t, alice, - []string{virtualUserID}, roomID, eventIdBefore, - timeAfterEventBefore, "", - 1, + createJoinStateEventsForBackfillRequest([]string{virtualUserID}, timeAfterEventBefore), + createMessageEventsForBackfillRequest([]string{virtualUserID}, timeAfterEventBefore, 1), // Status // Normal user alice should not be able to backfill messages 403, ) }) + t.Run("TODO: Trying to send insertion event with same `next_chunk_id` will reject", func(t *testing.T) { + t.Skip("Skipping until implemented") + // (room_id, next_chunk_id) should be unique + }) + + t.Run("Should be able to backfill into private room", func(t *testing.T) { + t.Parallel() + + roomID := as.CreateRoom(t, createPrivateRoomOpts) + as.InviteRoom(t, roomID, alice.UserID) + alice.JoinRoom(t, roomID, nil) + + // Create the "live" event we are going to insert our backfilled events next to + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIdBefore := eventIDsBefore[0] + timeAfterEventBefore := time.Now() + + var stateEvents []map[string]interface{} + stateEvents = append(stateEvents, createInviteStateEventsForBackfillRequest(as.UserID, []string{virtualUserID}, timeAfterEventBefore)...) + stateEvents = append(stateEvents, createJoinStateEventsForBackfillRequest([]string{virtualUserID}, timeAfterEventBefore)...) + + // Insert a backfilled event + batchSendRes := batchSendHistoricalMessages( + t, + as, + roomID, + eventIdBefore, + "", + stateEvents, + createMessageEventsForBackfillRequest([]string{virtualUserID}, timeAfterEventBefore, 3), + // Status + 200, + ) + batchSendResBody := client.ParseJSON(t, batchSendRes) + historicalEventIDs := getEventsFromBatchSendResponseBody(t, batchSendResBody) + + messagesRes := alice.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + })) + + must.MatchResponse(t, messagesRes, match.HTTPResponse{ + JSON: []match.JSON{ + match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} { + return r.Get("event_id").Str + }, nil), + }, + }) + }) + t.Run("TODO: Test if historical avatar/display name set back in time are picked up on historical messages", func(t *testing.T) { t.Skip("Skipping until implemented") // TODO: Try adding avatar and displayName and see if historical messages get this info }) + t.Run("TODO: What happens when you point multiple chunks at the same insertion event?", func(t *testing.T) { + t.Skip("Skipping until implemented") + }) + t.Run("Historical messages are visible when joining on federated server - auto-generated base insertion event", func(t *testing.T) { - t.Skip("Skipping until federation is implemented") t.Parallel() - roomID := as.CreateRoom(t, createRoomOpts) + roomID := as.CreateRoom(t, createPublicRoomOpts) alice.JoinRoom(t, roomID, nil) eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) @@ -353,12 +444,11 @@ func TestBackfillingHistory(t *testing.T) { batchSendRes := batchSendHistoricalMessages( t, as, - []string{virtualUserID}, roomID, eventIdBefore, - timeAfterEventBefore, "", - 2, + createJoinStateEventsForBackfillRequest([]string{virtualUserID}, timeAfterEventBefore), + createMessageEventsForBackfillRequest([]string{virtualUserID}, timeAfterEventBefore, 2), // Status 200, ) @@ -392,10 +482,9 @@ func TestBackfillingHistory(t *testing.T) { }) t.Run("Historical messages are visible when joining on federated server - pre-made insertion event", func(t *testing.T) { - t.Skip("Skipping until federation is implemented") t.Parallel() - roomID := as.CreateRoom(t, createRoomOpts) + roomID := as.CreateRoom(t, createPublicRoomOpts) alice.JoinRoom(t, roomID, nil) eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) @@ -412,7 +501,7 @@ func TestBackfillingHistory(t *testing.T) { }, } // We can't use as.SendEventSynced(...) because application services can't use the /sync API - insertionSendRes := as.MustDoFunc(t, "PUT", []string{"_matrix", "client", "r0", "rooms", roomID, "send", insertionEvent.Type, "txn-m123"}, client.WithJSONBody(t, insertionEvent.Content)) + insertionSendRes := as.MustDoFunc(t, "PUT", []string{"_matrix", "client", "r0", "rooms", roomID, "send", insertionEvent.Type, "txn-i123"}, client.WithJSONBody(t, insertionEvent.Content)) insertionSendBody := client.ParseJSON(t, insertionSendRes) insertionEventID := client.GetJSONFieldStr(t, insertionSendBody, "event_id") // Make sure the insertion event has reached the homeserver @@ -426,12 +515,11 @@ func TestBackfillingHistory(t *testing.T) { batchSendRes := batchSendHistoricalMessages( t, as, - []string{virtualUserID}, roomID, eventIdBefore, - timeAfterEventBefore, chunkId, - 2, + createJoinStateEventsForBackfillRequest([]string{virtualUserID}, timeAfterEventBefore), + createMessageEventsForBackfillRequest([]string{virtualUserID}, timeAfterEventBefore, 2), // Status 200, ) @@ -465,10 +553,9 @@ func TestBackfillingHistory(t *testing.T) { }) t.Run("Historical messages are visible when already joined on federated server", func(t *testing.T) { - t.Skip("Skipping until federation is implemented") t.Parallel() - roomID := as.CreateRoom(t, createRoomOpts) + roomID := as.CreateRoom(t, createPublicRoomOpts) alice.JoinRoom(t, roomID, nil) // Join the room from a remote homeserver before any backfilled messages are sent @@ -484,19 +571,18 @@ func TestBackfillingHistory(t *testing.T) { // Mimic scrollback just through the latest messages remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ "dir": []string{"b"}, - // Limited so we can only see a few of the latest messages + // Limited so we can only see a portion of the latest messages "limit": []string{"5"}, })) batchSendRes := batchSendHistoricalMessages( t, as, - []string{virtualUserID}, roomID, eventIdBefore, - timeAfterEventBefore, "", - 2, + createJoinStateEventsForBackfillRequest([]string{virtualUserID}, timeAfterEventBefore), + createMessageEventsForBackfillRequest([]string{virtualUserID}, timeAfterEventBefore, 2), // Status 200, ) @@ -506,7 +592,7 @@ func TestBackfillingHistory(t *testing.T) { // [1 insertion event + 2 historical events + 1 chunk event + 1 insertion event] if len(historicalEventIDs) != 5 { - t.Fatalf("Expected eventID list should be length 15 but saw %d: %s", len(historicalEventIDs), historicalEventIDs) + t.Fatalf("Expected eventID list should be length 5 but saw %d: %v", len(historicalEventIDs), historicalEventIDs) } beforeMarkerMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ @@ -536,30 +622,16 @@ func TestBackfillingHistory(t *testing.T) { }, }) - // Send a marker event to let all of the homeservers know about the - // insertion point where all of the historical messages are at - markerEvent := b.Event{ - Type: markerEventType, - Content: map[string]interface{}{ - markerInsertionContentField: baseInsertionEventID, - }, - } - // We can't use as.SendEventSynced(...) because application services can't use the /sync API - markerSendRes := as.MustDoFunc(t, "PUT", []string{"_matrix", "client", "r0", "rooms", roomID, "send", markerEvent.Type, "txn-m123"}, client.WithJSONBody(t, markerEvent.Content)) - markerSendBody := client.ParseJSON(t, markerSendRes) - markerEventID := client.GetJSONFieldStr(t, markerSendBody, "event_id") - - // Make sure the marker event has reached the remote homeserver - remoteCharlie.SyncUntilTimelineHas(t, roomID, func(ev gjson.Result) bool { - return ev.Get("event_id").Str == markerEventID - }) + // Send the marker event + sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID) - messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + remoteMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ "dir": []string{"b"}, "limit": []string{"100"}, })) - must.MatchResponse(t, messagesRes, match.HTTPResponse{ + // Make sure all of the historical messages are visible when we scrollback again + must.MatchResponse(t, remoteMessagesRes, match.HTTPResponse{ JSON: []match.JSON{ match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} { return r.Get("event_id").Str @@ -569,10 +641,9 @@ func TestBackfillingHistory(t *testing.T) { }) t.Run("When messages have already been scrolled back through, new historical messages are visible in next scroll back on federated server", func(t *testing.T) { - t.Skip("Skipping until federation is implemented") t.Parallel() - roomID := as.CreateRoom(t, createRoomOpts) + roomID := as.CreateRoom(t, createPublicRoomOpts) alice.JoinRoom(t, roomID, nil) // Join the room from a remote homeserver before any backfilled messages are sent @@ -596,26 +667,58 @@ func TestBackfillingHistory(t *testing.T) { batchSendRes := batchSendHistoricalMessages( t, as, - []string{virtualUserID}, roomID, eventIdBefore, - timeAfterEventBefore, "", - 2, + createJoinStateEventsForBackfillRequest([]string{virtualUserID}, timeAfterEventBefore), + createMessageEventsForBackfillRequest([]string{virtualUserID}, timeAfterEventBefore, 2), // Status 200, ) batchSendResBody := client.ParseJSON(t, batchSendRes) historicalEventIDs := getEventsFromBatchSendResponseBody(t, batchSendResBody) + baseInsertionEventID := historicalEventIDs[len(historicalEventIDs)-1] - // TODO: Send marker event + // [1 insertion event + 2 historical events + 1 chunk event + 1 insertion event] + if len(historicalEventIDs) != 5 { + t.Fatalf("Expected eventID list should be length 5 but saw %d: %s", len(historicalEventIDs), historicalEventIDs) + } - messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + beforeMarkerMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ "dir": []string{"b"}, "limit": []string{"100"}, })) + beforeMarkerMesssageResBody := client.ParseJSON(t, beforeMarkerMessagesRes) + eventDebugStringsFromBeforeMarkerResponse := getRelevantEventDebugStringsFromMessagesResponse(t, beforeMarkerMesssageResBody) + // Since the original body can only be read once, create a new one from the body bytes we just read + beforeMarkerMessagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(beforeMarkerMesssageResBody)) + // Make sure the history isn't visible before we expect it to be there. + // This is to avoid some bug in the homeserver using some unknown + // mechanism to distribute the historical messages to other homeservers. + must.MatchResponse(t, beforeMarkerMessagesRes, match.HTTPResponse{ + JSON: []match.JSON{ + match.JSONArrayEach("chunk", func(r gjson.Result) error { + // Throw if we find one of the historical events in the message response + for _, historicalEventID := range historicalEventIDs { + if r.Get("event_id").Str == historicalEventID { + return fmt.Errorf("Historical event (%s) found on remote homeserver before marker event was sent out\nmessage response (%d): %v\nhistoricalEventIDs (%d): %v", historicalEventID, len(eventDebugStringsFromBeforeMarkerResponse), eventDebugStringsFromBeforeMarkerResponse, len(historicalEventIDs), historicalEventIDs) + } + } + return nil + }), + }, + }) - must.MatchResponse(t, messagesRes, match.HTTPResponse{ + // Send the marker event + sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID) + + remoteMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + })) + + // Make sure all of the historical messages are visible when we scrollback again + must.MatchResponse(t, remoteMessagesRes, match.HTTPResponse{ JSON: []match.JSON{ match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} { return r.Get("event_id").Str @@ -649,7 +752,7 @@ func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string, checkCounter := 0 for { if time.Since(start) > c.SyncUntilTimeout { - t.Fatalf("fetchMessagesUntilResponseHas timed out. Called check function %d times", checkCounter) + t.Fatalf("fetchUntilMessagesResponseHas timed out. Called check function %d times", checkCounter) } messagesRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ @@ -734,6 +837,38 @@ func ensureVirtualUserRegistered(t *testing.T, c *client.CSAPI, virtualUserLocal } } +func sendMarkerAndEnsureBackfilled(t *testing.T, as *client.CSAPI, c *client.CSAPI, roomID, insertionEventID string) { + t.Helper() + + // Send a marker event to let all of the homeservers know about the + // insertion point where all of the historical messages are at + markerEvent := b.Event{ + Type: markerEventType, + Content: map[string]interface{}{ + markerInsertionContentField: insertionEventID, + }, + } + // We can't use as.SendEventSynced(...) because application services can't use the /sync API + markerSendRes := as.MustDoFunc(t, "PUT", []string{"_matrix", "client", "r0", "rooms", roomID, "send", markerEvent.Type, "txn-m123"}, client.WithJSONBody(t, markerEvent.Content)) + markerSendBody := client.ParseJSON(t, markerSendRes) + markerEventID := client.GetJSONFieldStr(t, markerSendBody, "event_id") + + // Make sure the marker event has reached the remote homeserver + c.SyncUntilTimelineHas(t, roomID, func(ev gjson.Result) bool { + return ev.Get("event_id").Str == markerEventID + }) + + // Make sure all of the base insertion event has been backfilled + // after the marker was received + fetchUntilMessagesResponseHas(t, c, roomID, func(ev gjson.Result) bool { + if ev.Get("event_id").Str == insertionEventID { + return true + } + + return false + }) +} + func createMessagesInRoom(t *testing.T, c *client.CSAPI, roomID string, count int) (eventIDs []string) { eventIDs = make([]string, count) for i := 0; i < len(eventIDs); i++ { @@ -751,22 +886,64 @@ func createMessagesInRoom(t *testing.T, c *client.CSAPI, roomID string, count in return eventIDs } -var chunkCount int64 = 0 +func createInviteStateEventsForBackfillRequest( + invitedByUserID string, + virtualUserIDs []string, + insertTime time.Time, +) []map[string]interface{} { + // Timestamp in milliseconds + insertOriginServerTs := uint64(insertTime.UnixNano() / int64(time.Millisecond)) -func batchSendHistoricalMessages( - t *testing.T, - c *client.CSAPI, + stateEvents := make([]map[string]interface{}, len(virtualUserIDs)) + for i, virtualUserID := range virtualUserIDs { + inviteEvent := map[string]interface{}{ + "type": "m.room.member", + "sender": invitedByUserID, + "origin_server_ts": insertOriginServerTs, + "content": map[string]interface{}{ + "membership": "invite", + }, + "state_key": virtualUserID, + } + + stateEvents[i] = inviteEvent + } + + return stateEvents +} + +func createJoinStateEventsForBackfillRequest( virtualUserIDs []string, - roomID string, - insertAfterEventId string, insertTime time.Time, - chunkID string, - count int, - expectedStatus int, -) (res *http.Response) { +) []map[string]interface{} { // Timestamp in milliseconds insertOriginServerTs := uint64(insertTime.UnixNano() / int64(time.Millisecond)) + stateEvents := make([]map[string]interface{}, len(virtualUserIDs)) + for i, virtualUserID := range virtualUserIDs { + joinEvent := map[string]interface{}{ + "type": "m.room.member", + "sender": virtualUserID, + "origin_server_ts": insertOriginServerTs, + "content": map[string]interface{}{ + "membership": "join", + }, + "state_key": virtualUserID, + } + + stateEvents[i] = joinEvent + } + + return stateEvents +} + +func createMessageEventsForBackfillRequest( + virtualUserIDs []string, + insertTime time.Time, + count int, +) []map[string]interface{} { + // Timestamp in milliseconds + insertOriginServerTs := uint64(insertTime.UnixNano() / int64(time.Millisecond)) timeBetweenMessagesMS := uint64(timeBetweenMessages / time.Millisecond) evs := make([]map[string]interface{}, count) @@ -787,20 +964,22 @@ func batchSendHistoricalMessages( evs[i] = newEvent } - state_evs := make([]map[string]interface{}, len(virtualUserIDs)) - for i, virtualUserID := range virtualUserIDs { - joinEvent := map[string]interface{}{ - "type": "m.room.member", - "sender": virtualUserID, - "origin_server_ts": insertOriginServerTs, - "content": map[string]interface{}{ - "membership": "join", - }, - "state_key": virtualUserID, - } + return evs +} - state_evs[i] = joinEvent - } +var chunkCount int64 = 0 + +func batchSendHistoricalMessages( + t *testing.T, + c *client.CSAPI, + roomID string, + insertAfterEventId string, + chunkID string, + stateEventsAtStart []map[string]interface{}, + events []map[string]interface{}, + expectedStatus int, +) (res *http.Response) { + t.Helper() query := make(url.Values, 2) query.Add("prev_event", insertAfterEventId) @@ -814,8 +993,8 @@ func batchSendHistoricalMessages( "POST", []string{"_matrix", "client", "unstable", "org.matrix.msc2716", "rooms", roomID, "batch_send"}, client.WithJSONBody(t, map[string]interface{}{ - "events": evs, - "state_events_at_start": state_evs, + "events": events, + "state_events_at_start": stateEventsAtStart, }), client.WithContentType("application/json"), client.WithQueries(query),