Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3221 from matrix-org/erikj/purge_token
Browse files Browse the repository at this point in the history
Make purge_history operate on tokens
  • Loading branch information
erikjohnston authored May 18, 2018
2 parents f8a1e76 + 680530c commit fa30ac3
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 37 deletions.
12 changes: 6 additions & 6 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ def __init__(self, hs):
# map from purge id to PurgeStatus
self._purges_by_id = {}

def start_purge_history(self, room_id, topological_ordering,
def start_purge_history(self, room_id, token,
delete_local_events=False):
"""Start off a history purge on a room.
Args:
room_id (str): The room to purge from
topological_ordering (int): minimum topo ordering to preserve
token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as
remote ones
Expand All @@ -115,19 +115,19 @@ def start_purge_history(self, room_id, topological_ordering,
self._purges_by_id[purge_id] = PurgeStatus()
run_in_background(
self._purge_history,
purge_id, room_id, topological_ordering, delete_local_events,
purge_id, room_id, token, delete_local_events,
)
return purge_id

@defer.inlineCallbacks
def _purge_history(self, purge_id, room_id, topological_ordering,
def _purge_history(self, purge_id, room_id, token,
delete_local_events):
"""Carry out a history purge on a room.
Args:
purge_id (str): The id for this purge
room_id (str): The room to purge from
topological_ordering (int): minimum topo ordering to preserve
token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as
remote ones
Expand All @@ -138,7 +138,7 @@ def _purge_history(self, purge_id, room_id, topological_ordering,
try:
with (yield self.pagination_lock.write(room_id)):
yield self.store.purge_history(
room_id, topological_ordering, delete_local_events,
room_id, token, delete_local_events,
)
logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
Expand Down
17 changes: 10 additions & 7 deletions synapse/rest/client/v1/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,11 @@ def on_POST(self, request, room_id, event_id):
if event.room_id != room_id:
raise SynapseError(400, "Event is for wrong room.")

depth = event.depth
token = yield self.store.get_topological_token_for_event(event_id)

logger.info(
"[purge] purging up to depth %i (event_id %s)",
depth, event_id,
"[purge] purging up to token %s (event_id %s)",
token, event_id,
)
elif 'purge_up_to_ts' in body:
ts = body['purge_up_to_ts']
Expand All @@ -174,7 +175,9 @@ def on_POST(self, request, room_id, event_id):
)
)
if room_event_after_stream_ordering:
(_, depth, _) = room_event_after_stream_ordering
token = yield self.store.get_topological_token_for_event(
room_event_after_stream_ordering,
)
else:
logger.warn(
"[purge] purging events not possible: No event found "
Expand All @@ -187,9 +190,9 @@ def on_POST(self, request, room_id, event_id):
errcode=Codes.NOT_FOUND,
)
logger.info(
"[purge] purging up to depth %i (received_ts %i => "
"[purge] purging up to token %d (received_ts %i => "
"stream_ordering %i)",
depth, ts, stream_ordering,
token, ts, stream_ordering,
)
else:
raise SynapseError(
Expand All @@ -199,7 +202,7 @@ def on_POST(self, request, room_id, event_id):
)

purge_id = yield self.handlers.message_handler.start_purge_history(
room_id, depth,
room_id, token,
delete_local_events=delete_local_events,
)

Expand Down
77 changes: 53 additions & 24 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.types import get_domain_from_id
from synapse.types import get_domain_from_id, RoomStreamToken
import synapse.metrics

# these are only included to make the type annotations work
Expand Down Expand Up @@ -1803,15 +1803,14 @@ def get_all_new_events_txn(txn):
return self.runInteraction("get_all_new_events", get_all_new_events_txn)

def purge_history(
self, room_id, topological_ordering, delete_local_events,
self, room_id, token, delete_local_events,
):
"""Deletes room history before a certain point
Args:
room_id (str):
topological_ordering (int):
minimum topo ordering to preserve
token (str): A topological token to delete events before
delete_local_events (bool):
if True, we will delete local events as well as remote ones
Expand All @@ -1821,13 +1820,15 @@ def purge_history(

return self.runInteraction(
"purge_history",
self._purge_history_txn, room_id, topological_ordering,
self._purge_history_txn, room_id, token,
delete_local_events,
)

def _purge_history_txn(
self, txn, room_id, topological_ordering, delete_local_events,
self, txn, room_id, token_str, delete_local_events,
):
token = RoomStreamToken.parse(token_str)

# Tables that should be pruned:
# event_auth
# event_backward_extremities
Expand Down Expand Up @@ -1872,6 +1873,13 @@ def _purge_history_txn(
" ON events_to_purge(should_delete)",
)

# We do joins against events_to_purge for e.g. calculating state
# groups to purge, etc., so lets make an index.
txn.execute(
"CREATE INDEX events_to_purge_id"
" ON events_to_purge(event_id)",
)

# First ensure that we're not about to delete all the forward extremeties
txn.execute(
"SELECT e.event_id, e.depth FROM events as e "
Expand All @@ -1884,7 +1892,7 @@ def _purge_history_txn(
rows = txn.fetchall()
max_depth = max(row[0] for row in rows)

if max_depth <= topological_ordering:
if max_depth <= token.topological:
# We need to ensure we don't delete all the events from the datanase
# otherwise we wouldn't be able to send any events (due to not
# having any backwards extremeties)
Expand All @@ -1900,7 +1908,7 @@ def _purge_history_txn(
should_delete_expr += " AND event_id NOT LIKE ?"
should_delete_params += ("%:" + self.hs.hostname, )

should_delete_params += (room_id, topological_ordering)
should_delete_params += (room_id, token.topological)

txn.execute(
"INSERT INTO events_to_purge"
Expand All @@ -1923,13 +1931,13 @@ def _purge_history_txn(
logger.info("[purge] Finding new backward extremities")

# We calculate the new entries for the backward extremeties by finding
# all events that point to events that are to be purged
# events to be purged that are pointed to by events we're not going to
# purge.
txn.execute(
"SELECT DISTINCT e.event_id FROM events_to_purge AS e"
" INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id"
" INNER JOIN events AS e2 ON e2.event_id = ed.event_id"
" WHERE e2.topological_ordering >= ?",
(topological_ordering, )
" LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id"
" WHERE ep2.event_id IS NULL",
)
new_backwards_extrems = txn.fetchall()

Expand All @@ -1953,16 +1961,22 @@ def _purge_history_txn(

# Get all state groups that are only referenced by events that are
# to be deleted.
txn.execute(
"SELECT state_group FROM event_to_state_groups"
" INNER JOIN events USING (event_id)"
" WHERE state_group IN ("
" SELECT DISTINCT state_group FROM events_to_purge"
" INNER JOIN event_to_state_groups USING (event_id)"
" )"
" GROUP BY state_group HAVING MAX(topological_ordering) < ?",
(topological_ordering, )
)
# This works by first getting state groups that we may want to delete,
# joining against event_to_state_groups to get events that use that
# state group, then left joining against events_to_purge again. Any
# state group where the left join produce *no nulls* are referenced
# only by events that are going to be purged.
txn.execute("""
SELECT state_group FROM
(
SELECT DISTINCT state_group FROM events_to_purge
INNER JOIN event_to_state_groups USING (event_id)
) AS sp
INNER JOIN event_to_state_groups USING (state_group)
LEFT JOIN events_to_purge AS ep USING (event_id)
GROUP BY state_group
HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0
""")

state_rows = txn.fetchall()
logger.info("[purge] found %i redundant state groups", len(state_rows))
Expand Down Expand Up @@ -2109,10 +2123,25 @@ def _purge_history_txn(
#
# So, let's stick it at the end so that we don't block event
# persistence.
logger.info("[purge] updating room_depth")
#
# We do this by calculating the minimum depth of the backwards
# extremities. However, the events in event_backward_extremities
# are ones we don't have yet so we need to look at the events that
# point to it via event_edges table.
txn.execute("""
SELECT COALESCE(MIN(depth), 0)
FROM event_backward_extremities AS eb
INNER JOIN event_edges AS eg ON eg.prev_event_id = eb.event_id
INNER JOIN events AS e ON e.event_id = eg.event_id
WHERE eb.room_id = ?
""", (room_id,))
min_depth, = txn.fetchone()

logger.info("[purge] updating room_depth to %d", min_depth)

txn.execute(
"UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
(topological_ordering, room_id,)
(min_depth, room_id,)
)

# finally, drop the temp table. this will commit the txn in sqlite,
Expand Down

0 comments on commit fa30ac3

Please sign in to comment.