Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

When purging, don't de-delta state groups we're about to delete #2214

Merged
merged 3 commits into from
May 11, 2017
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 26 additions & 11 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2033,6 +2033,8 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
for event_id, state_key in event_rows:
txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))

logger.debug("[purge] Finding new backward extremities")

# We calculate the new entries for the backward extremities by finding
# all events that point to events that are to be purged
txn.execute(
Expand All @@ -2045,6 +2047,8 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
)
new_backwards_extrems = txn.fetchall()

logger.debug("[purge] replacing backward extremities: %r", new_backwards_extrems)

txn.execute(
"DELETE FROM event_backward_extremities WHERE room_id = ?",
(room_id,)
Expand All @@ -2059,6 +2063,8 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
]
)

logger.debug("[purge] finding redundant state groups")

# Get all state groups that are only referenced by events that are
# to be deleted.
txn.execute(
Expand All @@ -2074,15 +2080,14 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
)

state_rows = txn.fetchall()
state_groups_to_delete = [sg for sg, in state_rows]
state_groups_to_delete = set([sg for sg, in state_rows])

# Now we get all the state groups that rely on these state groups
logger.debug("[purge] finding state groups which depend on redundant"
" state groups")
new_state_edges = []
chunks = [
state_groups_to_delete[i:i + 100]
for i in xrange(0, len(state_groups_to_delete), 100)
]
for chunk in chunks:
for i in xrange(0, len(state_rows), 100):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems odd to dedupe state_groups_to_delete and then use the non-deduped list here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the set is for easy lookup at line 2101 rather than deduping. We can't easily divide a set into chunks, hence the use of the list here. There shouldn't be any dups because the query has a GROUP BY.

chunk = [sg for sg, in state_rows[i:i + 100]]
rows = self._simple_select_many_txn(
txn,
table="state_group_edges",
Expand All @@ -2091,11 +2096,16 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
retcols=["state_group"],
keyvalues={},
)
new_state_edges.extend(row["state_group"] for row in rows)
new_state_edges.extend(
row["state_group"] for row in rows
if row["state_group"] not in state_groups_to_delete
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you comment this please? It took me a few moments to realise what was going on

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup will do more comments

)

# Now we turn the state groups that reference to-be-deleted state groups
# into non-delta versions.
for new_state_edge in new_state_edges:
logger.debug("[purge] de-delta-ing remaining state group %s",
new_state_edge)
curr_state = self._get_state_groups_from_groups_txn(
txn, [new_state_edge], types=None
)
Expand Down Expand Up @@ -2132,6 +2142,7 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
],
)

logger.debug("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
state_rows
Expand All @@ -2140,12 +2151,15 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
"DELETE FROM state_groups WHERE id = ?",
state_rows
)

# Delete all non-state
logger.debug("[purge] removing events from event_to_state_groups")
txn.executemany(
"DELETE FROM event_to_state_groups WHERE event_id = ?",
[(event_id,) for event_id, _ in event_rows]
)

logger.debug("[purge] updating room_depth")
txn.execute(
"UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
(topological_ordering, room_id,)
Expand All @@ -2171,16 +2185,15 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
"event_signatures",
"rejections",
):
logger.debug("[purge] removing non-state events from %s", table)

txn.executemany(
"DELETE FROM %s WHERE event_id = ?" % (table,),
to_delete
)

txn.executemany(
"DELETE FROM events WHERE event_id = ?",
to_delete
)
# Mark all state and own events as outliers
logger.debug("[purge] marking events as outliers")
txn.executemany(
"UPDATE events SET outlier = ?"
" WHERE event_id = ?",
Expand All @@ -2190,6 +2203,8 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
]
)

logger.debug("[purge] done")

@defer.inlineCallbacks
def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream
Expand Down