-
Notifications
You must be signed in to change notification settings - Fork 211
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sliding sync: various fixes to background update #17636
Changes from all commits
6a78c63
86e50ea
2676c7c
e4cd5b3
fac2fe8
1267888
70c6722
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,7 @@ | |
SlidingSyncMembershipSnapshotSharedInsertValues, | ||
SlidingSyncStateInsertValues, | ||
) | ||
from synapse.storage.databases.main.events_worker import DatabaseCorruptionError | ||
from synapse.storage.databases.main.state_deltas import StateDeltasStore | ||
from synapse.storage.databases.main.stream import StreamWorkerStore | ||
from synapse.storage.types import Cursor | ||
|
@@ -1857,6 +1858,7 @@ async def _sliding_sync_membership_snapshots_bg_update( | |
initial_phase = True | ||
|
||
last_room_id = progress.get("last_room_id", "") | ||
last_user_id = progress.get("last_user_id", "") | ||
last_event_stream_ordering = progress["last_event_stream_ordering"] | ||
|
||
def _find_memberships_to_update_txn( | ||
|
@@ -1887,11 +1889,11 @@ def _find_memberships_to_update_txn( | |
FROM local_current_membership AS c | ||
INNER JOIN events AS e USING (event_id) | ||
LEFT JOIN rooms AS r ON (c.room_id = r.room_id) | ||
WHERE c.room_id > ? | ||
ORDER BY c.room_id ASC | ||
WHERE (c.room_id, c.user_id) > (?, ?) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm guessing this means the background update needed to be restarted to ensure we didn't drop anything along the way before? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or actually, I'm guessing the background update was just stuck on one room because it had more members than the batch size. So it doesn't need to be restarted. |
||
ORDER BY c.room_id ASC, c.user_id ASC | ||
LIMIT ? | ||
""", | ||
(last_room_id, batch_size), | ||
(last_room_id, last_user_id, batch_size), | ||
) | ||
elif last_event_stream_ordering is not None: | ||
# It's important to sort by `event_stream_ordering` *ascending* (oldest to | ||
|
@@ -1993,13 +1995,16 @@ def _find_previous_membership_txn( | |
WHERE | ||
room_id = ? | ||
AND m.user_id = ? | ||
AND (m.membership = ? OR m.membership = ?) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What was the root cause to add this caveat to only check for Perhaps a double-leave event somewhere although Synapse doesn't seem to allow that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, this was a ban + leave I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ahh so ban and unban There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Documented in #17654 |
||
AND e.event_id != ? | ||
ORDER BY e.topological_ordering DESC | ||
LIMIT 1 | ||
""", | ||
( | ||
room_id, | ||
user_id, | ||
Membership.INVITE, | ||
Membership.KNOCK, | ||
event_id, | ||
), | ||
) | ||
|
@@ -2081,9 +2086,17 @@ def _find_previous_membership_txn( | |
# have `current_state_events` and we should have some current state | ||
# for each room | ||
if current_state_ids_map: | ||
fetched_events = await self.get_events( | ||
current_state_ids_map.values() | ||
) | ||
try: | ||
fetched_events = await self.get_events( | ||
current_state_ids_map.values() | ||
) | ||
except DatabaseCorruptionError as e: | ||
logger.warning( | ||
"Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s", | ||
room_id, | ||
e, | ||
) | ||
continue | ||
|
||
current_state_map: StateMap[EventBase] = { | ||
state_key: fetched_events[event_id] | ||
|
@@ -2124,7 +2137,7 @@ def _find_previous_membership_txn( | |
False | ||
) | ||
elif membership in (Membership.INVITE, Membership.KNOCK) or ( | ||
membership == Membership.LEAVE and is_outlier | ||
membership in (Membership.LEAVE, Membership.BAN) and is_outlier | ||
): | ||
invite_or_knock_event_id = membership_event_id | ||
invite_or_knock_membership = membership | ||
|
@@ -2135,7 +2148,7 @@ def _find_previous_membership_txn( | |
# us a consistent view of the room state regardless of your | ||
# membership (i.e. the room shouldn't disappear if your using the | ||
# `is_encrypted` filter and you leave). | ||
if membership == Membership.LEAVE and is_outlier: | ||
if membership in (Membership.LEAVE, Membership.BAN) and is_outlier: | ||
invite_or_knock_event_id, invite_or_knock_membership = ( | ||
await self.db_pool.runInteraction( | ||
"sliding_sync_membership_snapshots_bg_update._find_previous_membership", | ||
|
@@ -2182,7 +2195,15 @@ def _find_previous_membership_txn( | |
await_full_state=False, | ||
) | ||
|
||
fetched_events = await self.get_events(state_ids_map.values()) | ||
try: | ||
fetched_events = await self.get_events(state_ids_map.values()) | ||
except DatabaseCorruptionError as e: | ||
logger.warning( | ||
"Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s", | ||
room_id, | ||
e, | ||
) | ||
continue | ||
|
||
state_map: StateMap[EventBase] = { | ||
state_key: fetched_events[event_id] | ||
|
@@ -2296,7 +2317,7 @@ def _fill_table_txn(txn: LoggingTransaction) -> None: | |
( | ||
room_id, | ||
_room_id_from_rooms_table, | ||
_user_id, | ||
user_id, | ||
_sender, | ||
_membership_event_id, | ||
_membership, | ||
|
@@ -2308,8 +2329,11 @@ def _fill_table_txn(txn: LoggingTransaction) -> None: | |
progress = { | ||
"initial_phase": initial_phase, | ||
"last_room_id": room_id, | ||
"last_event_stream_ordering": membership_event_stream_ordering, | ||
"last_user_id": user_id, | ||
"last_event_stream_ordering": last_event_stream_ordering, | ||
} | ||
if not initial_phase: | ||
progress["last_event_stream_ordering"] = membership_event_stream_ordering | ||
|
||
await self.db_pool.updates._background_update_progress( | ||
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. I assume this became obvious after running the background update on
matrix.org
since this is aNOT NULL
column 😵