Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Use the new room_retention table in purge jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
babolivier committed Aug 2, 2019
1 parent d071ba4 commit dfb6fc5
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 59 deletions.
73 changes: 39 additions & 34 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,49 +92,54 @@ def __init__(self, hs):

@defer.inlineCallbacks
def purge_history_for_rooms_in_range(self, min_s, max_s):
    """Start a background purge of expired events in every room whose
    retention period falls within the given range.

    For each matching room, everything older than the room's max_lifetime
    is purged, local events included. Rooms for which no purge point can
    be found, or which already have a purge running, are skipped with a
    warning.

    Args:
        min_s (int): Lower bound of the max_lifetime range, forwarded to
            the store lookup. The server default is tested against it
            exclusively (min_s < default).
        max_s (int): Upper bound of the max_lifetime range, forwarded to
            the store lookup. The server default is tested against it
            inclusively (default <= max_s).
    """
    default_max_lifetime = self.config.retention_max_lifetime

    # If a room lacks a max_lifetime, we consider it equal to one defined in the
    # server's configuration, therefore we include these rooms if the server's
    # config's max_lifetime is in the provided range. If the server has no
    # default configured there is nothing to compare against, so such rooms
    # are left out rather than failing on a comparison with None.
    include_null = (
        default_max_lifetime is not None
        and min_s < default_max_lifetime <= max_s
    )

    rooms = yield self.store.get_rooms_for_retention_period_in_range(
        min_s, max_s, include_null
    )

    for room in rooms:
        room_id = room["room_id"]

        # Rows returned because of include_null carry no max_lifetime of their
        # own; fall back to the server-wide value they were matched against.
        max_lifetime = room["max_lifetime"]
        if max_lifetime is None:
            max_lifetime = default_max_lifetime

        # max_lifetime is scaled by 1000 to match the clock's milliseconds.
        ts = self.clock.time_msec() - (max_lifetime * 1000)

        stream_ordering = (
            yield self.store.find_first_stream_ordering_after_ts(ts)
        )

        r = (
            yield self.store.get_room_event_after_stream_ordering(
                room_id, stream_ordering,
            )
        )
        if not r:
            logger.warning(
                "[purge] purging events not possible: No event found "
                "(ts %i => stream_ordering %i)",
                ts, stream_ordering,
            )
            # Without an event there is no token to purge up to; move on to
            # the next room instead of failing on the unpacking below.
            continue

        (stream, topo, _event_id) = r
        token = "t%d-%d" % (topo, stream)

        # Checked right before starting the purge so that no yield point
        # sits between this check and the call to run_in_background.
        if room_id in self._purges_in_progress_by_room:
            logger.warning(
                "[purge] not purging room %s as there's an ongoing purge running"
                " for this room",
                room_id,
            )
            continue

        purge_id = random_string(16)

        logger.info("Starting purging events in room %s", room_id)

        # We want to purge everything, including local events.
        run_in_background(
            self._purge_history,
            purge_id, room_id, token, True,
        )

def start_purge_history(self, room_id, token,
delete_local_events=False):
Expand Down
43 changes: 18 additions & 25 deletions synapse/storage/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ class RoomStore(RoomWorkerStore, SearchStore):
def __init__(self, db_conn, hs):
super(RoomStore, self).__init__(db_conn, hs)

self.config = hs.config

self.register_background_update_handler(
"users_set_deactivated_flag", self._background_insert_retention,
)
Expand Down Expand Up @@ -735,38 +737,29 @@ def _get_media_mxcs_in_room_txn(self, txn, room_id):
return local_media_mxcs, remote_media_mxcs

@defer.inlineCallbacks
def get_rooms_for_retention_period_in_range(self, min_s, max_s, include_null=False):
    """Look up the rooms whose max_lifetime lies in the range (min_s, max_s].

    Args:
        min_s (int): Exclusive lower bound on max_lifetime.
        max_s (int): Inclusive upper bound on max_lifetime.
        include_null (bool): Whether to also return rooms whose
            max_lifetime is NULL in the room_retention table.

    Returns:
        Deferred: resolves to whatever cursor_to_dict builds from the
        selected room_id, min_lifetime and max_lifetime columns
        (presumably one dict per matching row).
    """

    def select_rooms_txn(txn):
        # The range test is parenthesised so that the optional NULL clause
        # appended below is OR-ed with the whole range, not just its last term.
        query = (
            "SELECT room_id, min_lifetime, max_lifetime FROM room_retention"
            " WHERE ("
            " max_lifetime > ?"
            " AND max_lifetime <= ?"
            " )"
        )

        if include_null:
            query = query + " OR max_lifetime IS NULL"

        txn.execute(query, (min_s, max_s))

        return self.cursor_to_dict(txn)

    matching_rooms = yield self.runInteraction(
        "get_rooms_for_retention_period_in_range",
        select_rooms_txn,
    )

    defer.returnValue(matching_rooms)

0 comments on commit dfb6fc5

Please sign in to comment.