From 9fdbad025f9c12e1c388d4102f95b35524a040bc Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 23 Feb 2022 13:56:00 +0000 Subject: [PATCH 1/3] Minor typing fixes for `synapse/storage/persist_events.py` Signed-off-by: Sean Quah --- changelog.d/12069.misc | 1 + synapse/storage/persist_events.py | 9 ++++----- 2 files changed, 5 insertions(+), 5 deletions(-) create mode 100644 changelog.d/12069.misc diff --git a/changelog.d/12069.misc b/changelog.d/12069.misc new file mode 100644 index 000000000000..8374a63220d1 --- /dev/null +++ b/changelog.d/12069.misc @@ -0,0 +1 @@ +Minor typing fixes. diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 428d66a617b1..f68ec3331658 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -460,14 +460,13 @@ async def _persist_event_batch( ) for room_id, ev_ctx_rm in events_by_room.items(): - latest_event_ids = ( + latest_event_ids = set( await self.main_store.get_latest_event_ids_in_room(room_id) ) new_latest_event_ids = await self._calculate_new_extremities( room_id, ev_ctx_rm, latest_event_ids ) - latest_event_ids = set(latest_event_ids) if new_latest_event_ids == latest_event_ids: # No change in extremities, so no change in state continue @@ -567,7 +566,7 @@ async def _persist_event_batch( ) if not is_still_joined: logger.info("Server no longer in room %s", room_id) - latest_event_ids = [] + latest_event_ids = set() current_state = {} delta.no_longer_in_room = True @@ -906,9 +905,9 @@ async def _prune_extremities( # Ideally we'd figure out a way of still being able to drop old # dummy events that reference local events, but this is good enough # as a first cut. - events_to_check = [event] + events_to_check: Collection[EventBase] = [event] while events_to_check: - new_events = set() + new_events: Set[str] = set() for event_to_check in events_to_check: if self.is_mine_id(event_to_check.sender): if event_to_check.type != EventTypes.Dummy: From 45c8cd4da5b83eac2e8e34e5e8c377838775c72e Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 24 Feb 2022 11:26:55 +0000 Subject: [PATCH 2/3] Add and fix some more type hints --- synapse/storage/databases/main/events.py | 21 ++++++++++++--------- synapse/storage/persist_events.py | 16 ++++++++-------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index a1d7a9b41300..d04ba39e5bc7 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -130,7 +130,7 @@ async def _persist_events_and_state_updates( *, current_state_for_room: Dict[str, StateMap[str]], state_delta_for_room: Dict[str, DeltaState], - new_forward_extremeties: Dict[str, List[str]], + new_forward_extremities: Dict[str, Set[str]], use_negative_stream_ordering: bool = False, inhibit_local_membership_updates: bool = False, ) -> None: @@ -143,7 +143,7 @@ async def _persist_events_and_state_updates( the room based on forward extremities state_delta_for_room: Map from room_id to the delta to apply to room state - new_forward_extremities: Map from room_id to list of event IDs + new_forward_extremities: Map from room_id to set of event IDs that are the new forward extremities of the room. use_negative_stream_ordering: Whether to start stream_ordering on the negative side and decrement. This should be set as True @@ -193,7 +193,7 @@ async def _persist_events_and_state_updates( events_and_contexts=events_and_contexts, inhibit_local_membership_updates=inhibit_local_membership_updates, state_delta_for_room=state_delta_for_room, - new_forward_extremeties=new_forward_extremeties, + new_forward_extremities=new_forward_extremities, ) persist_event_counter.inc(len(events_and_contexts)) @@ -220,7 +220,7 @@ async def _persist_events_and_state_updates( for room_id, new_state in current_state_for_room.items(): self.store.get_current_state_ids.prefill((room_id,), new_state) - for room_id, latest_event_ids in new_forward_extremeties.items(): + for room_id, latest_event_ids in new_forward_extremities.items(): self.store.get_latest_event_ids_in_room.prefill( (room_id,), list(latest_event_ids) ) @@ -334,7 +334,7 @@ def _persist_events_txn( events_and_contexts: List[Tuple[EventBase, EventContext]], inhibit_local_membership_updates: bool = False, state_delta_for_room: Optional[Dict[str, DeltaState]] = None, - new_forward_extremeties: Optional[Dict[str, List[str]]] = None, + new_forward_extremities: Optional[Dict[str, Set[str]]] = None, ): """Insert some number of room events into the necessary database tables. @@ -353,13 +353,13 @@ def _persist_events_txn( from the database. This is useful when retrying due to IntegrityError. state_delta_for_room: The current-state delta for each room. - new_forward_extremetie: The new forward extremities for each room. + new_forward_extremities: The new forward extremities for each room. For each room, a list of the event ids which are the forward extremities. """ state_delta_for_room = state_delta_for_room or {} - new_forward_extremeties = new_forward_extremeties or {} + new_forward_extremities = new_forward_extremities or {} all_events_and_contexts = events_and_contexts @@ -372,7 +372,7 @@ def _persist_events_txn( self._update_forward_extremities_txn( txn, - new_forward_extremities=new_forward_extremeties, + new_forward_extremities=new_forward_extremities, max_stream_order=max_stream_order, ) @@ -1158,7 +1158,10 @@ def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str): ) def _update_forward_extremities_txn( - self, txn, new_forward_extremities, max_stream_order + self, + txn: LoggingTransaction, + new_forward_extremities: Dict[str, Set[str]], + max_stream_order: int, ): for room_id in new_forward_extremities.keys(): self.db_pool.simple_delete_txn( diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index f68ec3331658..7d543fdbe08a 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -427,21 +427,21 @@ async def _persist_event_batch( # NB: Assumes that we are only persisting events for one room # at a time. - # map room_id->list[event_ids] giving the new forward + # map room_id->set[event_ids] giving the new forward # extremities in each room - new_forward_extremeties = {} + new_forward_extremities: Dict[str, Set[str]] = {} # map room_id->(type,state_key)->event_id tracking the full # state in each room after adding these events. # This is simply used to prefill the get_current_state_ids # cache - current_state_for_room = {} + current_state_for_room: Dict[str, StateMap[str]] = {} # map room_id->(to_delete, to_insert) where to_delete is a list # of type/state keys to remove from current state, and to_insert # is a map (type,key)->event_id giving the state delta in each # room - state_delta_for_room = {} + state_delta_for_room: Dict[str, DeltaState] = {} # Set of remote users which were in rooms the server has left. We # should check if we still share any rooms and if not we mark their @@ -477,7 +477,7 @@ async def _persist_event_batch( # extremities, so we'll `continue` above and skip this bit.) assert new_latest_event_ids, "No forward extremities left!" - new_forward_extremeties[room_id] = new_latest_event_ids + new_forward_extremities[room_id] = new_latest_event_ids len_1 = ( len(latest_event_ids) == 1 @@ -532,7 +532,7 @@ async def _persist_event_batch( # extremities, so we'll `continue` above and skip this bit.) assert new_latest_event_ids, "No forward extremities left!" - new_forward_extremeties[room_id] = new_latest_event_ids + new_forward_extremities[room_id] = new_latest_event_ids # If either are not None then there has been a change, # and we need to work out the delta (or use that @@ -581,7 +581,7 @@ async def _persist_event_batch( chunk, current_state_for_room=current_state_for_room, state_delta_for_room=state_delta_for_room, - new_forward_extremeties=new_forward_extremeties, + new_forward_extremities=new_forward_extremities, use_negative_stream_ordering=backfilled, inhibit_local_membership_updates=backfilled, ) @@ -595,7 +595,7 @@ async def _calculate_new_extremities( room_id: str, event_contexts: List[Tuple[EventBase, EventContext]], latest_event_ids: Collection[str], - ): + ) -> Set[str]: """Calculates the new forward extremities for a room given events to persist. From a5ee447ba62f68944af6af90dadb7bdea59c0863 Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Thu, 24 Feb 2022 19:45:31 +0000 Subject: [PATCH 3/3] Update synapse/storage/databases/main/events.py Co-authored-by: Patrick Cloke --- synapse/storage/databases/main/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index d04ba39e5bc7..ae113f758d61 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -335,7 +335,7 @@ def _persist_events_txn( inhibit_local_membership_updates: bool = False, state_delta_for_room: Optional[Dict[str, DeltaState]] = None, new_forward_extremities: Optional[Dict[str, Set[str]]] = None, - ): + ) -> None: """Insert some number of room events into the necessary database tables. Rejected events are only inserted into the events table, the events_json table,