-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Don't add rejections to the state_group, persist all rejections #948
Changes from all commits
2623cec
8f7f4cb
1b3c3e6
efeb617
a6f06ce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -397,6 +397,12 @@ def _persist_event_txn(self, txn, event, context, current_state, backfilled=Fals | |
|
||
@log_function | ||
def _persist_events_txn(self, txn, events_and_contexts, backfilled): | ||
"""Insert some number of room events into the necessary database tables. | ||
|
||
Rejected events are only inserted into the events table, the events_json table, | ||
and the rejections table. Things reading from those table will need to check | ||
whether the event was rejected. | ||
""" | ||
depth_updates = {} | ||
for event, context in events_and_contexts: | ||
# Remove the any existing cache entries for the event_ids | ||
|
@@ -407,21 +413,11 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled): | |
event.room_id, event.internal_metadata.stream_ordering, | ||
) | ||
|
||
if not event.internal_metadata.is_outlier(): | ||
if not event.internal_metadata.is_outlier() and not context.rejected: | ||
depth_updates[event.room_id] = max( | ||
event.depth, depth_updates.get(event.room_id, event.depth) | ||
) | ||
|
||
if context.push_actions: | ||
self._set_push_actions_for_event_and_users_txn( | ||
txn, event, context.push_actions | ||
) | ||
|
||
if event.type == EventTypes.Redaction and event.redacts is not None: | ||
self._remove_push_actions_for_event_id_txn( | ||
txn, event.room_id, event.redacts | ||
) | ||
|
||
for room_id, depth in depth_updates.items(): | ||
self._update_min_depth_for_room_txn(txn, room_id, depth) | ||
|
||
|
@@ -431,14 +427,24 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled): | |
), | ||
[event.event_id for event, _ in events_and_contexts] | ||
) | ||
|
||
have_persisted = { | ||
event_id: outlier | ||
for event_id, outlier in txn.fetchall() | ||
} | ||
|
||
# Remove the events that we've seen before. | ||
event_map = {} | ||
to_remove = set() | ||
for event, context in events_and_contexts: | ||
if context.rejected: | ||
# If the event is rejected then we don't care if the event | ||
# was an outlier or not. | ||
if event.event_id in have_persisted: | ||
# If we have already seen the event then ignore it. | ||
to_remove.add(event) | ||
continue | ||
|
||
# Handle the case of the list including the same event multiple | ||
# times. The tricky thing here is when they differ by whether | ||
# they are an outlier. | ||
|
@@ -463,6 +469,12 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled): | |
|
||
outlier_persisted = have_persisted[event.event_id] | ||
if not event.internal_metadata.is_outlier() and outlier_persisted: | ||
# We received a copy of an event that we had already stored as | ||
# an outlier in the database. We now have some state at that | ||
# so we need to update the state_groups table with that state. | ||
|
||
# insert into the state_group, state_groups_state and | ||
# event_to_state_groups tables. | ||
self._store_mult_state_groups_txn(txn, ((event, context),)) | ||
|
||
metadata_json = encode_json( | ||
|
@@ -478,6 +490,8 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled): | |
(metadata_json, event.event_id,) | ||
) | ||
|
||
# Add an entry to the ex_outlier_stream table to replicate the | ||
# change in outlier status to our workers. | ||
stream_order = event.internal_metadata.stream_ordering | ||
state_group_id = context.state_group or context.new_state_group_id | ||
self._simple_insert_txn( | ||
|
@@ -499,45 +513,21 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled): | |
(False, event.event_id,) | ||
) | ||
|
||
# Update the event_backward_extremities table now that this | ||
# event isn't an outlier any more. | ||
self._update_extremeties(txn, [event]) | ||
|
||
events_and_contexts = [ | ||
ec for ec in events_and_contexts if ec[0] not in to_remove | ||
] | ||
|
||
if not events_and_contexts: | ||
# Make sure we don't pass an empty list to functions that expect to | ||
# be storing at least one element. | ||
return | ||
|
||
self._store_mult_state_groups_txn(txn, events_and_contexts) | ||
|
||
self._handle_mult_prev_events( | ||
txn, | ||
events=[event for event, _ in events_and_contexts], | ||
) | ||
|
||
for event, _ in events_and_contexts: | ||
if event.type == EventTypes.Name: | ||
self._store_room_name_txn(txn, event) | ||
elif event.type == EventTypes.Topic: | ||
self._store_room_topic_txn(txn, event) | ||
elif event.type == EventTypes.Message: | ||
self._store_room_message_txn(txn, event) | ||
elif event.type == EventTypes.Redaction: | ||
self._store_redaction(txn, event) | ||
elif event.type == EventTypes.RoomHistoryVisibility: | ||
self._store_history_visibility_txn(txn, event) | ||
elif event.type == EventTypes.GuestAccess: | ||
self._store_guest_access_txn(txn, event) | ||
|
||
self._store_room_members_txn( | ||
txn, | ||
[ | ||
event | ||
for event, _ in events_and_contexts | ||
if event.type == EventTypes.Member | ||
], | ||
backfilled=backfilled, | ||
) | ||
# From this point onwards the events are only events that we haven't | ||
# seen before. | ||
|
||
def event_dict(event): | ||
return { | ||
|
@@ -591,10 +581,41 @@ def event_dict(event): | |
], | ||
) | ||
|
||
if context.rejected: | ||
self._store_rejections_txn( | ||
txn, event.event_id, context.rejected | ||
) | ||
# Remove the rejected events from the list now that we've added them | ||
# to the events table and the events_json table. | ||
to_remove = set() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok I see it's deliberate. reusing the same variable is bad karma imho. But basically it stems from this method being too big. Factor some of this shit out. |
||
for event, context in events_and_contexts: | ||
if context.rejected: | ||
# Insert the event_id into the rejections table | ||
self._store_rejections_txn( | ||
txn, event.event_id, context.rejected | ||
) | ||
to_remove.add(event) | ||
|
||
events_and_contexts = [ | ||
ec for ec in events_and_contexts if ec[0] not in to_remove | ||
] | ||
|
||
if not events_and_contexts: | ||
# Make sure we don't pass an empty list to functions that expect to | ||
# be storing at least one element. | ||
return | ||
|
||
# From this point onwards the events are only ones that weren't rejected. | ||
|
||
for event, context in events_and_contexts: | ||
# Insert all the push actions into the event_push_actions table. | ||
if context.push_actions: | ||
self._set_push_actions_for_event_and_users_txn( | ||
txn, event, context.push_actions | ||
) | ||
|
||
if event.type == EventTypes.Redaction and event.redacts is not None: | ||
# Remove the entries in the event_push_actions table for the | ||
# redacted event. | ||
self._remove_push_actions_for_event_id_txn( | ||
txn, event.room_id, event.redacts | ||
) | ||
|
||
self._simple_insert_many_txn( | ||
txn, | ||
|
@@ -610,6 +631,49 @@ def event_dict(event): | |
], | ||
) | ||
|
||
# Insert into the state_groups, state_groups_state, and | ||
# event_to_state_groups tables. | ||
self._store_mult_state_groups_txn(txn, events_and_contexts) | ||
|
||
# Update the event_forward_extremities, event_backward_extremities and | ||
# event_edges tables. | ||
self._handle_mult_prev_events( | ||
txn, | ||
events=[event for event, _ in events_and_contexts], | ||
) | ||
|
||
for event, _ in events_and_contexts: | ||
if event.type == EventTypes.Name: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. factor this fucker out: you've already gone to the effort of moving it. Move it somewhere sensible instead. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure I can reasonably factor it out since it is such a mess in it's current state. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. surely you can make a |
||
# Insert into the room_names and event_search tables. | ||
self._store_room_name_txn(txn, event) | ||
elif event.type == EventTypes.Topic: | ||
# Insert into the topics table and event_search table. | ||
self._store_room_topic_txn(txn, event) | ||
elif event.type == EventTypes.Message: | ||
# Insert into the event_search table. | ||
self._store_room_message_txn(txn, event) | ||
elif event.type == EventTypes.Redaction: | ||
# Insert into the redactions table. | ||
self._store_redaction(txn, event) | ||
elif event.type == EventTypes.RoomHistoryVisibility: | ||
# Insert into the event_search table. | ||
self._store_history_visibility_txn(txn, event) | ||
elif event.type == EventTypes.GuestAccess: | ||
# Insert into the event_search table. | ||
self._store_guest_access_txn(txn, event) | ||
|
||
# Insert into the room_memberships table. | ||
self._store_room_members_txn( | ||
txn, | ||
[ | ||
event | ||
for event, _ in events_and_contexts | ||
if event.type == EventTypes.Member | ||
], | ||
backfilled=backfilled, | ||
) | ||
|
||
# Insert event_reference_hashes table. | ||
self._store_event_reference_hashes_txn( | ||
txn, [event for event, _ in events_and_contexts] | ||
) | ||
|
@@ -654,6 +718,7 @@ def event_dict(event): | |
], | ||
) | ||
|
||
# Prefill the event cache | ||
self._add_to_cache(txn, events_and_contexts) | ||
|
||
if backfilled: | ||
|
@@ -666,11 +731,6 @@ def event_dict(event): | |
# Outlier events shouldn't clobber the current state. | ||
continue | ||
|
||
if context.rejected: | ||
# If the event failed it's auth checks then it shouldn't | ||
# clobbler the current state. | ||
continue | ||
|
||
txn.call_after( | ||
self._get_current_state_for_key.invalidate, | ||
(event.room_id, event.type, event.state_key,) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this code appears to have landed between a comment and the code it refers to, thus making the comment confusing. Because this method wasn't confusing enough before, I suppose.