diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 71cc93cbed2c..5c0e571d77a8 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -175,13 +175,13 @@ async def get_state_events( state_filter = state_filter or StateFilter.all() if at_token: - last_events, _ = await self.store.get_recent_events_for_room( - room_id, end_token=at_token.room_key, limit=1 + last_event = await self.store.get_last_event_in_room_before_stream_ordering( + room_id, + end_token=at_token.room_key, ) - if not last_events: + if not last_event: raise NotFoundError("Can't find event for token %s" % (at_token,)) - last_event = last_events[0] # check whether the user is in the room at that time to determine # whether they should be treated as peeking. @@ -200,7 +200,7 @@ async def get_state_events( visible_events = await filter_events_for_client( self.storage, user_id, - last_events, + [last_event], filter_send_to_client=False, is_peeking=is_peeking, ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 69f0e2bd2e2d..f4048149d278 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -661,12 +661,12 @@ async def get_state_at( stream_position: point at which to get state state_filter: The state filter used to fetch state from the database. """ - last_events, _ = await self.store.get_recent_events_for_room( - room_id, end_token=stream_position.room_key, limit=1 + last_event = await self.store.get_last_event_in_room_before_stream_ordering( + room_id, + end_token=stream_position.room_key, ) - if last_events: - last_event = last_events[-1] + if last_event: state = await self.get_state_after_event( last_event, state_filter=state_filter or StateFilter.all() ) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 51316af3a502..2a3ea49c8c40 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -748,6 +748,32 @@ def _f(txn): "get_room_event_before_stream_ordering", _f ) + async def get_last_event_in_room_before_stream_ordering( + self, + room_id: str, + end_token: RoomStreamToken, + ) -> Optional[EventBase]: + """Returns the last event in a room at or before a stream ordering + + Args: + room_id + end_token: The token used to stream from + + Returns: + The most recent event. + """ + + last_row = await self.get_room_event_before_stream_ordering( + room_id=room_id, + stream_ordering=end_token.stream, + ) + if last_row: + _, _, event_id = last_row + event = await self.get_event(event_id, get_prev_content=True) + return event + + return None + async def get_current_room_stream_token_for_room_id( self, room_id: Optional[str] = None ) -> RoomStreamToken: