Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make purge_history operate on tokens #3221

Merged
merged 5 commits into from
May 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ def __init__(self, hs):
# map from purge id to PurgeStatus
self._purges_by_id = {}

def start_purge_history(self, room_id, topological_ordering,
def start_purge_history(self, room_id, token,
delete_local_events=False):
"""Start off a history purge on a room.
Args:
room_id (str): The room to purge from
topological_ordering (int): minimum topo ordering to preserve
token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as
remote ones
Expand All @@ -115,19 +115,19 @@ def start_purge_history(self, room_id, topological_ordering,
self._purges_by_id[purge_id] = PurgeStatus()
run_in_background(
self._purge_history,
purge_id, room_id, topological_ordering, delete_local_events,
purge_id, room_id, token, delete_local_events,
)
return purge_id

@defer.inlineCallbacks
def _purge_history(self, purge_id, room_id, topological_ordering,
def _purge_history(self, purge_id, room_id, token,
delete_local_events):
"""Carry out a history purge on a room.
Args:
purge_id (str): The id for this purge
room_id (str): The room to purge from
topological_ordering (int): minimum topo ordering to preserve
token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as
remote ones
Expand All @@ -138,7 +138,7 @@ def _purge_history(self, purge_id, room_id, topological_ordering,
try:
with (yield self.pagination_lock.write(room_id)):
yield self.store.purge_history(
room_id, topological_ordering, delete_local_events,
room_id, token, delete_local_events,
)
logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
Expand Down
17 changes: 10 additions & 7 deletions synapse/rest/client/v1/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,11 @@ def on_POST(self, request, room_id, event_id):
if event.room_id != room_id:
raise SynapseError(400, "Event is for wrong room.")

depth = event.depth
token = yield self.store.get_topological_token_for_event(event_id)

logger.info(
"[purge] purging up to depth %i (event_id %s)",
depth, event_id,
"[purge] purging up to token %s (event_id %s)",
token, event_id,
)
elif 'purge_up_to_ts' in body:
ts = body['purge_up_to_ts']
Expand All @@ -174,7 +175,9 @@ def on_POST(self, request, room_id, event_id):
)
)
if room_event_after_stream_ordering:
(_, depth, _) = room_event_after_stream_ordering
token = yield self.store.get_topological_token_for_event(
room_event_after_stream_ordering,
)
else:
logger.warn(
"[purge] purging events not possible: No event found "
Expand All @@ -187,9 +190,9 @@ def on_POST(self, request, room_id, event_id):
errcode=Codes.NOT_FOUND,
)
logger.info(
"[purge] purging up to depth %i (received_ts %i => "
"[purge] purging up to token %d (received_ts %i => "
"stream_ordering %i)",
depth, ts, stream_ordering,
token, ts, stream_ordering,
)
else:
raise SynapseError(
Expand All @@ -199,7 +202,7 @@ def on_POST(self, request, room_id, event_id):
)

purge_id = yield self.handlers.message_handler.start_purge_history(
room_id, depth,
room_id, token,
delete_local_events=delete_local_events,
)

Expand Down
77 changes: 53 additions & 24 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.types import get_domain_from_id
from synapse.types import get_domain_from_id, RoomStreamToken
import synapse.metrics

# these are only included to make the type annotations work
Expand Down Expand Up @@ -1803,15 +1803,14 @@ def get_all_new_events_txn(txn):
return self.runInteraction("get_all_new_events", get_all_new_events_txn)

def purge_history(
self, room_id, topological_ordering, delete_local_events,
self, room_id, token, delete_local_events,
):
"""Deletes room history before a certain point
Args:
room_id (str):
topological_ordering (int):
minimum topo ordering to preserve
token (str): A topological token to delete events before
delete_local_events (bool):
if True, we will delete local events as well as remote ones
Expand All @@ -1821,13 +1820,15 @@ def purge_history(

return self.runInteraction(
"purge_history",
self._purge_history_txn, room_id, topological_ordering,
self._purge_history_txn, room_id, token,
delete_local_events,
)

def _purge_history_txn(
self, txn, room_id, topological_ordering, delete_local_events,
self, txn, room_id, token_str, delete_local_events,
):
token = RoomStreamToken.parse(token_str)

# Tables that should be pruned:
# event_auth
# event_backward_extremities
Expand Down Expand Up @@ -1872,6 +1873,13 @@ def _purge_history_txn(
" ON events_to_purge(should_delete)",
)

# We do joins against events_to_purge for e.g. calculating state
# groups to purge, etc., so lets make an index.
txn.execute(
"CREATE INDEX events_to_purge_id"
" ON events_to_purge(event_id)",
)

# First ensure that we're not about to delete all the forward extremeties
txn.execute(
"SELECT e.event_id, e.depth FROM events as e "
Expand All @@ -1884,7 +1892,7 @@ def _purge_history_txn(
rows = txn.fetchall()
max_depth = max(row[0] for row in rows)

if max_depth <= topological_ordering:
if max_depth <= token.topological:
# We need to ensure we don't delete all the events from the datanase
# otherwise we wouldn't be able to send any events (due to not
# having any backwards extremeties)
Expand All @@ -1900,7 +1908,7 @@ def _purge_history_txn(
should_delete_expr += " AND event_id NOT LIKE ?"
should_delete_params += ("%:" + self.hs.hostname, )

should_delete_params += (room_id, topological_ordering)
should_delete_params += (room_id, token.topological)

txn.execute(
"INSERT INTO events_to_purge"
Expand All @@ -1923,13 +1931,13 @@ def _purge_history_txn(
logger.info("[purge] Finding new backward extremities")

# We calculate the new entries for the backward extremeties by finding
# all events that point to events that are to be purged
# events to be purged that are pointed to by events we're not going to
# purge.
txn.execute(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the comment on this doesn't really seem to be true any more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How so?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# We calculate the new entries for the backward extremeties by finding
# all events that point to events that are to be purged

we now look for events we are about to purge, which point to events we are not going to purge. it's completely different.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, its different in terms that we're not returning the event that points to purged events but the purged events that we get pointed at, yes. (And this isn't anything to do with the change in the sql, and was wrong before)

"SELECT DISTINCT e.event_id FROM events_to_purge AS e"
" INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id"
" INNER JOIN events AS e2 ON e2.event_id = ed.event_id"
" WHERE e2.topological_ordering >= ?",
(topological_ordering, )
" LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id"
" WHERE ep2.event_id IS NULL",
)
new_backwards_extrems = txn.fetchall()

Expand All @@ -1953,16 +1961,22 @@ def _purge_history_txn(

# Get all state groups that are only referenced by events that are
# to be deleted.
txn.execute(
"SELECT state_group FROM event_to_state_groups"
" INNER JOIN events USING (event_id)"
" WHERE state_group IN ("
" SELECT DISTINCT state_group FROM events_to_purge"
" INNER JOIN event_to_state_groups USING (event_id)"
" )"
" GROUP BY state_group HAVING MAX(topological_ordering) < ?",
(topological_ordering, )
)
# This works by first getting state groups that we may want to delete,
# joining against event_to_state_groups to get events that use that
# state group, then left joining against events_to_purge again. Any
# state group where the left join produce *no nulls* are referenced
# only by events that are going to be purged.
txn.execute("""
SELECT state_group FROM
(
SELECT DISTINCT state_group FROM events_to_purge
INNER JOIN event_to_state_groups USING (event_id)
) AS sp
INNER JOIN event_to_state_groups USING (state_group)
LEFT JOIN events_to_purge AS ep USING (event_id)
GROUP BY state_group
HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0
""")

state_rows = txn.fetchall()
logger.info("[purge] found %i redundant state groups", len(state_rows))
Expand Down Expand Up @@ -2109,10 +2123,25 @@ def _purge_history_txn(
#
# So, let's stick it at the end so that we don't block event
# persistence.
logger.info("[purge] updating room_depth")
#
# We do this by calculating the minimum depth of the backwards
# extremities. However, the events in event_backward_extremities
# are ones we don't have yet so we need to look at the events that
# point to it via event_edges table.
txn.execute("""
SELECT COALESCE(MIN(depth), 0)
FROM event_backward_extremities AS eb
INNER JOIN event_edges AS eg ON eg.prev_event_id = eb.event_id
INNER JOIN events AS e ON e.event_id = eg.event_id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this looking for the prev_event before the extremity? which we have likely just purged?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking at the events that point to the extremity, as we don't actually have the extremity events.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(And I think the check is the right way round?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh right. ehm. I'm not convinced this is going to calculate the right thing in the presence of disconnected bits of dag.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each disconnected part of the DAG will have their backwards extremities in the event_backward_extremities table, so the event with the minimum depth must be one of them, no?

WHERE eb.room_id = ?
""", (room_id,))
min_depth, = txn.fetchone()

logger.info("[purge] updating room_depth to %d", min_depth)

txn.execute(
"UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
(topological_ordering, room_id,)
(min_depth, room_id,)
)

# finally, drop the temp table. this will commit the txn in sqlite,
Expand Down