Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Implement purge_history by timestamp #2946

Merged
merged 1 commit into from
Mar 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions docs/admin_api/purge_history_api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ Depending on the amount of history being purged a call to the API may take
several minutes or longer. During this period users will not be able to
paginate further back in the room from the point being purged from.

The API is simply:
The API is:

``POST /_matrix/client/r0/admin/purge_history/<room_id>/<event_id>``
``POST /_matrix/client/r0/admin/purge_history/<room_id>[/<event_id>]``

including an ``access_token`` of a server admin.

Expand All @@ -25,3 +25,10 @@ To delete local events as well, set ``delete_local_events`` in the body:
{
"delete_local_events": true
}

The caller must specify the point in the room to purge up to. This can be
specified by including an event_id in the URI, or by setting a
``purge_up_to_event_id`` or ``purge_up_to_ts`` in the request body. If an event
id is given, that event (and others at the same graph depth) will be retained.
If ``purge_up_to_ts`` is given, it should be a timestamp since the unix epoch,
in milliseconds.
14 changes: 5 additions & 9 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,12 @@ def __init__(self, hs):
self.pagination_lock = ReadWriteLock()

@defer.inlineCallbacks
def purge_history(self, room_id, event_id, delete_local_events=False):
event = yield self.store.get_event(event_id)

if event.room_id != room_id:
raise SynapseError(400, "Event is for wrong room.")

depth = event.depth

def purge_history(self, room_id, topological_ordering,
delete_local_events=False):
with (yield self.pagination_lock.write(room_id)):
yield self.store.purge_history(room_id, depth, delete_local_events)
yield self.store.purge_history(
room_id, topological_ordering, delete_local_events,
)

@defer.inlineCallbacks
def get_messages(self, requester, room_id=None, pagin_config=None,
Expand Down
58 changes: 55 additions & 3 deletions synapse/rest/client/v1/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from twisted.internet import defer

from synapse.api.constants import Membership
from synapse.api.errors import AuthError, SynapseError
from synapse.api.errors import AuthError, SynapseError, Codes
from synapse.types import UserID, create_requester
from synapse.http.servlet import parse_json_object_from_request

Expand Down Expand Up @@ -114,12 +114,18 @@ def on_POST(self, request):

class PurgeHistoryRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns(
"/admin/purge_history/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
"/admin/purge_history/(?P<room_id>[^/]*)(/(?P<event_id>[^/]+))?"
)

def __init__(self, hs):
"""

Args:
hs (synapse.server.HomeServer)
"""
super(PurgeHistoryRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers()
self.store = hs.get_datastore()

@defer.inlineCallbacks
def on_POST(self, request, room_id, event_id):
Expand All @@ -133,8 +139,54 @@ def on_POST(self, request, room_id, event_id):

delete_local_events = bool(body.get("delete_local_events", False))

# establish the topological ordering we should keep events from. The
# user can provide an event_id in the URL or the request body, or can
# provide a timestamp in the request body.
if event_id is None:
event_id = body.get('purge_up_to_event_id')

if event_id is not None:
event = yield self.store.get_event(event_id)

if event.room_id != room_id:
raise SynapseError(400, "Event is for wrong room.")

depth = event.depth
logger.info(
"[purge] purging up to depth %i (event_id %s)",
depth, event_id,
)
elif 'purge_up_to_ts' in body:
ts = body['purge_up_to_ts']
if not isinstance(ts, int):
raise SynapseError(
400, "purge_up_to_ts must be an int",
errcode=Codes.BAD_JSON,
)

stream_ordering = (
yield self.store.find_first_stream_ordering_after_ts(ts)
)

(_, depth, _) = (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(The parentheses on the left are redundant)

yield self.store.get_room_event_after_stream_ordering(
room_id, stream_ordering,
)
)
logger.info(
"[purge] purging up to depth %i (received_ts %i => "
"stream_ordering %i)",
depth, ts, stream_ordering,
)
else:
raise SynapseError(
400,
"must specify purge_up_to_event_id or purge_up_to_ts",
errcode=Codes.BAD_JSON,
)

yield self.handlers.message_handler.purge_history(
room_id, event_id,
room_id, depth,
delete_local_events=delete_local_events,
)

Expand Down
27 changes: 27 additions & 0 deletions synapse/storage/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,33 @@ def get_recent_events_for_room_txn(txn):
"get_recent_events_for_room", get_recent_events_for_room_txn
)

def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
"""Gets details of the first event in a room at or after a stream ordering

Args:
room_id (str):
stream_ordering (int):

Returns:
Deferred[(int, int, str)]:
(stream ordering, topological ordering, event_id)
"""
def _f(txn):
sql = (
"SELECT stream_ordering, topological_ordering, event_id"
" FROM events"
" WHERE room_id = ? AND stream_ordering >= ?"
" AND NOT outlier"
" ORDER BY stream_ordering"
" LIMIT 1"
)
txn.execute(sql, (room_id, stream_ordering, ))
return txn.fetchone()

return self.runInteraction(
"get_room_event_after_stream_ordering", _f,
)

@defer.inlineCallbacks
def get_room_events_max_id(self, room_id=None):
"""Returns the current token for rooms stream.
Expand Down