Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Refactor backfilled behavior into specific function parameters
Browse files Browse the repository at this point in the history
Part of #11300
  • Loading branch information
MadLittleMods committed Nov 19, 2021
1 parent 7ffddd8 commit d203d22
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 24 deletions.
12 changes: 7 additions & 5 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -1821,7 +1821,8 @@ async def persist_events_and_notify(
self,
room_id: str,
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
backfilled: bool = False,
*,
inhibit_push_notifications: bool = False,
) -> int:
"""Persists events and tells the notifier/pushers about them, if
necessary.
Expand All @@ -1831,8 +1832,9 @@ async def persist_events_and_notify(
event_and_contexts: Sequence of events with their associated
context that should be persisted. All events must belong to
the same room.
backfilled: Whether these events are a result of
backfilling or not
inhibit_push_notifications: Whether to stop the notifiers/pushers
from knowing about the event. Usually this is done for any backfilled
event.
Returns:
The stream ID after which all events have been persisted.
Expand All @@ -1850,7 +1852,7 @@ async def persist_events_and_notify(
store=self._store,
room_id=room_id,
event_and_contexts=batch,
backfilled=backfilled,
inhibit_push_notifications=inhibit_push_notifications,
)
return result["max_stream_id"]
else:
Expand All @@ -1867,7 +1869,7 @@ async def persist_events_and_notify(
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)

if not backfilled: # Never notify for backfilled events
if not inhibit_push_notifications: # Never notify for backfilled events
for event in events:
await self._notify_persisted_event(event, max_stream_token)

Expand Down
17 changes: 10 additions & 7 deletions synapse/replication/http/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
"rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. },
}],
"backfilled": false
"inhibit_push_notifications": false
}
200 OK
Expand All @@ -69,14 +69,17 @@ def __init__(self, hs: "HomeServer"):
self.federation_event_handler = hs.get_federation_event_handler()

@staticmethod
async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
async def _serialize_payload(
store, room_id, event_and_contexts, inhibit_push_notifications: bool = False
):
"""
Args:
store
room_id (str)
event_and_contexts (list[tuple[FrozenEvent, EventContext]])
backfilled (bool): Whether or not the events are the result of
backfilling
inhibit_push_notifications (bool): Whether to stop the notifiers/pushers
from knowing about the event. Usually this is done for any backfilled
event.
"""
event_payloads = []
for event, context in event_and_contexts:
Expand All @@ -96,7 +99,7 @@ async def _serialize_payload(store, room_id, event_and_contexts, backfilled):

payload = {
"events": event_payloads,
"backfilled": backfilled,
"inhibit_push_notifications": inhibit_push_notifications,
"room_id": room_id,
}

Expand All @@ -107,7 +110,7 @@ async def _handle_request(self, request):
content = parse_json_object_from_request(request)

room_id = content["room_id"]
backfilled = content["backfilled"]
inhibit_push_notifications = content["inhibit_push_notifications"]

event_payloads = content["events"]

Expand All @@ -132,7 +135,7 @@ async def _handle_request(self, request):
logger.info("Got %d events from federation", len(event_and_contexts))

max_stream_id = await self.federation_event_handler.persist_events_and_notify(
room_id, event_and_contexts, backfilled
room_id, event_and_contexts, inhibit_push_notifications=inhibit_push_notifications
)

return 200, {"max_stream_id": max_stream_id}
Expand Down
39 changes: 29 additions & 10 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,11 @@ def __init__(
async def _persist_events_and_state_updates(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
*,
current_state_for_room: Dict[str, StateMap[str]],
state_delta_for_room: Dict[str, DeltaState],
new_forward_extremeties: Dict[str, List[str]],
backfilled: bool = False,
use_negative_stream_ordering: bool = False,
) -> None:
"""Persist a set of events alongside updates to the current state and
forward extremities tables.
Expand All @@ -137,7 +138,9 @@ async def _persist_events_and_state_updates(
room state
new_forward_extremeties: Map from room_id to list of event IDs
that are the new forward extremities of the room.
backfilled
use_negative_stream_ordering: Whether to start stream_ordering on
the negative side and decrement. Usually this is done for any
backfilled event.
Returns:
Resolves when the events have been persisted
Expand All @@ -159,7 +162,7 @@ async def _persist_events_and_state_updates(
#
# Note: Multiple instances of this function cannot be in flight at
# the same time for the same room.
if backfilled:
if use_negative_stream_ordering:
stream_ordering_manager = self._backfill_id_gen.get_next_mult(
len(events_and_contexts)
)
Expand All @@ -182,7 +185,8 @@ async def _persist_events_and_state_updates(
)
persist_event_counter.inc(len(events_and_contexts))

if not backfilled:
# TODO: test that this actually works
if stream < 0:
# backfilled events have negative stream orderings, so we don't
# want to set the event_persisted_position to that.
synapse.metrics.event_persisted_position.set(
Expand Down Expand Up @@ -1200,21 +1204,25 @@ def _update_room_depths_txn(
self,
txn,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool,
*,
update_room_forward_stream_ordering: bool = True,
):
"""Update min_depth for each room
Args:
txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]): events
we are persisting
backfilled (bool): True if the events were backfilled
update_room_forward_stream_ordering (bool): Whether to update the
stream_ordering position to mark the latest event as the front
of the room. This should only be set as false for backfilled
events.
"""
depth_updates: Dict[str, int] = {}
for event, context in events_and_contexts:
# Remove any existing cache entries for the event_ids
txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
if not backfilled:
if update_room_forward_stream_ordering:
txn.call_after(
self.store._events_stream_cache.entity_has_changed,
event.room_id,
Expand Down Expand Up @@ -1638,8 +1646,19 @@ def _store_event_reference_hashes_txn(self, txn, events):
txn, table="event_reference_hashes", values=vals
)

def _store_room_members_txn(self, txn, events, backfilled):
"""Store a room member in the database."""
def _store_room_members_txn(
self, txn, events, *, inhibit_local_membership_updates: bool = False
):
"""
Store a room member in the database.
Args:
txn: The transaction to use.
events: List of events to store.
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. Usually this is done for
backfilled events.
"""

def non_null_str_or_none(val: Any) -> Optional[str]:
return val if isinstance(val, str) and "\u0000" not in val else None
Expand Down Expand Up @@ -1682,7 +1701,7 @@ def non_null_str_or_none(val: Any) -> Optional[str]:
# band membership", like a remote invite or a rejection of a remote invite.
if (
self.is_mine_id(event.state_key)
and not backfilled
and not inhibit_local_membership_updates
and event.internal_metadata.is_outlier()
and event.internal_metadata.is_out_of_band_membership()
):
Expand Down
10 changes: 8 additions & 2 deletions synapse/storage/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,13 +379,19 @@ async def persist_event(
async def _persist_event_batch(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool = False,
should_calculate_state_and_forward_extrems: bool = True,
) -> Dict[str, str]:
"""Callback for the _event_persist_queue
Calculates the change to current state and forward extremities, and
persists the given events and with those updates.
Args:
events_and_contexts:
should_calculate_state_and_forward_extrems: Determines whether we
need to calculate the state and new forward extremities for the
room. This should be set to false for backfilled events.
Returns:
A dictionary of event ID to event ID we didn't persist as we already
had another event persisted with the same TXN ID.
Expand Down Expand Up @@ -448,7 +454,7 @@ async def _persist_event_batch(
# device lists as stale.
potentially_left_users: Set[str] = set()

if not backfilled:
if should_calculate_state_and_forward_extrems:
with Measure(self._clock, "_calculate_state_and_extrem"):
# Work out the new "current state" for each room.
# We do this by working out what the new extremities are and then
Expand Down

0 comments on commit d203d22

Please sign in to comment.