Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1916 from matrix-org/erikj/push_actions_delete
Browse files Browse the repository at this point in the history
Aggregate event push actions
  • Loading branch information
erikjohnston authored Feb 16, 2017
2 parents 7dcbcca + ce3c8df commit 04eca25
Show file tree
Hide file tree
Showing 5 changed files with 340 additions and 57 deletions.
6 changes: 6 additions & 0 deletions synapse/replication/slave/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ def __init__(self, db_conn, hs):
get_unread_event_push_actions_by_room_for_user = (
EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
)
_get_unread_counts_by_receipt_txn = (
DataStore._get_unread_counts_by_receipt_txn.__func__
)
_get_unread_counts_by_pos_txn = (
DataStore._get_unread_counts_by_pos_txn.__func__
)
_get_state_group_for_events = (
StateStore.__dict__["_get_state_group_for_events"]
)
Expand Down
267 changes: 210 additions & 57 deletions synapse/storage/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from ._base import SQLBaseStore
from twisted.internet import defer
from synapse.util.async import sleep
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.types import RoomStreamToken
from .stream import lower_bound
Expand All @@ -29,7 +30,6 @@ class EventPushActionsStore(SQLBaseStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"

def __init__(self, hs):
self.stream_ordering_month_ago = None
super(EventPushActionsStore, self).__init__(hs)

self.register_background_index_update(
Expand All @@ -47,6 +47,9 @@ def __init__(self, hs):
where_clause="highlight=1"
)

self._doing_notif_rotation = False
self._clock.looping_call(self._rotate_notifs, 30 * 60 * 1000)

def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
"""
Args:
Expand Down Expand Up @@ -77,66 +80,89 @@ def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id
):
def _get_unread_event_push_actions_by_room(txn):
sql = (
"SELECT stream_ordering, topological_ordering"
" FROM events"
" WHERE room_id = ? AND event_id = ?"
)
txn.execute(
sql, (room_id, last_read_event_id)
)
results = txn.fetchall()
if len(results) == 0:
return {"notify_count": 0, "highlight_count": 0}

stream_ordering = results[0][0]
topological_ordering = results[0][1]
token = RoomStreamToken(
topological_ordering, stream_ordering
)

# First get number of notifications.
# We don't need to put a notif=1 clause as all rows always have
# notif=1
sql = (
"SELECT count(*)"
" FROM event_push_actions ea"
" WHERE"
" user_id = ?"
" AND room_id = ?"
" AND %s"
) % (lower_bound(token, self.database_engine, inclusive=False),)
ret = yield self.runInteraction(
"get_unread_event_push_actions_by_room",
self._get_unread_counts_by_receipt_txn,
room_id, user_id, last_read_event_id
)
defer.returnValue(ret)

txn.execute(sql, (user_id, room_id))
row = txn.fetchone()
notify_count = row[0] if row else 0
def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id,
last_read_event_id):
sql = (
"SELECT stream_ordering, topological_ordering"
" FROM events"
" WHERE room_id = ? AND event_id = ?"
)
txn.execute(
sql, (room_id, last_read_event_id)
)
results = txn.fetchall()
if len(results) == 0:
return {"notify_count": 0, "highlight_count": 0}

# Now get the number of highlights
sql = (
"SELECT count(*)"
" FROM event_push_actions ea"
" WHERE"
" highlight = 1"
" AND user_id = ?"
" AND room_id = ?"
" AND %s"
) % (lower_bound(token, self.database_engine, inclusive=False),)
stream_ordering = results[0][0]
topological_ordering = results[0][1]

txn.execute(sql, (user_id, room_id))
row = txn.fetchone()
highlight_count = row[0] if row else 0
return self._get_unread_counts_by_pos_txn(
txn, room_id, user_id, topological_ordering, stream_ordering
)

return {
"notify_count": notify_count,
"highlight_count": highlight_count,
}
def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_ordering,
stream_ordering):
token = RoomStreamToken(
topological_ordering, stream_ordering
)

ret = yield self.runInteraction(
"get_unread_event_push_actions_by_room",
_get_unread_event_push_actions_by_room
# First get number of notifications.
# We don't need to put a notif=1 clause as all rows always have
# notif=1
sql = (
"SELECT count(*)"
" FROM event_push_actions ea"
" WHERE"
" user_id = ?"
" AND room_id = ?"
" AND %s"
) % (lower_bound(token, self.database_engine, inclusive=False),)

txn.execute(sql, (user_id, room_id))
row = txn.fetchone()
notify_count = row[0] if row else 0

summary_notif_count = self._simple_select_one_onecol_txn(
txn,
table="event_push_summary",
keyvalues={
"user_id": user_id,
"room_id": room_id,
},
retcol="notif_count",
allow_none=True,
)
defer.returnValue(ret)

if summary_notif_count:
notify_count += summary_notif_count

# Now get the number of highlights
sql = (
"SELECT count(*)"
" FROM event_push_actions ea"
" WHERE"
" highlight = 1"
" AND user_id = ?"
" AND room_id = ?"
" AND %s"
) % (lower_bound(token, self.database_engine, inclusive=False),)

txn.execute(sql, (user_id, room_id))
row = txn.fetchone()
highlight_count = row[0] if row else 0

return {
"notify_count": notify_count,
"highlight_count": highlight_count,
}

@defer.inlineCallbacks
def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
Expand Down Expand Up @@ -448,7 +474,7 @@ def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
)

def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
topological_ordering):
topological_ordering, stream_ordering):
"""
Purges old push actions for a user and room before a given
topological_ordering.
Expand Down Expand Up @@ -479,11 +505,16 @@ def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
txn.execute(
"DELETE FROM event_push_actions "
" WHERE user_id = ? AND room_id = ? AND "
" topological_ordering < ?"
" topological_ordering <= ?"
" AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
(user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
)

txn.execute("""
DELETE FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
""", (room_id, user_id, stream_ordering))

@defer.inlineCallbacks
def _find_stream_orderings_for_times(self):
yield self.runInteraction(
Expand All @@ -500,6 +531,14 @@ def _find_stream_orderings_for_times_txn(self, txn):
"Found stream ordering 1 month ago: it's %d",
self.stream_ordering_month_ago
)
logger.info("Searching for stream ordering 1 day ago")
self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
)
logger.info(
"Found stream ordering 1 day ago: it's %d",
self.stream_ordering_day_ago
)

def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
"""
Expand Down Expand Up @@ -539,6 +578,120 @@ def _find_first_stream_ordering_after_ts_txn(self, txn, ts):

return range_end

@defer.inlineCallbacks
def _rotate_notifs(self):
if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
return
self._doing_notif_rotation = True

try:
while True:
logger.info("Rotating notifications")

caught_up = yield self.runInteraction(
"_rotate_notifs",
self._rotate_notifs_txn
)
if caught_up:
break
yield sleep(5)
finally:
self._doing_notif_rotation = False

def _rotate_notifs_txn(self, txn):
"""Archives older notifications into event_push_summary. Returns whether
the archiving process has caught up or not.
"""

# We want to make sure that we only ever do this one at a time
self.database_engine.lock_table(txn, "event_push_summary")

# We don't to try and rotate millions of rows at once, so we cap the
# maximum stream ordering we'll rotate before.
txn.execute("""
SELECT stream_ordering FROM event_push_actions
ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
""")
stream_row = txn.fetchone()
if stream_row:
offset_stream_ordering, = stream_row
rotate_to_stream_ordering = min(
self.stream_ordering_day_ago, offset_stream_ordering
)
caught_up = offset_stream_ordering >= self.stream_ordering_day_ago
else:
rotate_to_stream_ordering = self.stream_ordering_day_ago
caught_up = True

self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering)

# We have caught up iff we were limited by `stream_ordering_day_ago`
return caught_up

def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
old_rotate_stream_ordering = self._simple_select_one_onecol_txn(
txn,
table="event_push_summary_stream_ordering",
keyvalues={},
retcol="stream_ordering",
)

# Calculate the new counts that should be upserted into event_push_summary
sql = """
SELECT user_id, room_id,
coalesce(old.notif_count, 0) + upd.notif_count,
upd.stream_ordering,
old.user_id
FROM (
SELECT user_id, room_id, count(*) as notif_count,
max(stream_ordering) as stream_ordering
FROM event_push_actions
WHERE ? <= stream_ordering AND stream_ordering < ?
AND highlight = 0
GROUP BY user_id, room_id
) AS upd
LEFT JOIN event_push_summary AS old USING (user_id, room_id)
"""

txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering,))
rows = txn.fetchall()

# If the `old.user_id` above is NULL then we know there isn't already an
# entry in the table, so we simply insert it. Otherwise we update the
# existing table.
self._simple_insert_many_txn(
txn,
table="event_push_summary",
values=[
{
"user_id": row[0],
"room_id": row[1],
"notif_count": row[2],
"stream_ordering": row[3],
}
for row in rows if row[4] is None
]
)

txn.executemany(
"""
UPDATE event_push_summary SET notif_count = ?, stream_ordering = ?
WHERE user_id = ? AND room_id = ?
""",
((row[2], row[3], row[0], row[1],) for row in rows if row[4] is not None)
)

txn.execute(
"DELETE FROM event_push_actions"
" WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0",
(old_rotate_stream_ordering, rotate_to_stream_ordering,)
)

txn.execute(
"UPDATE event_push_summary_stream_ordering SET stream_ordering = ?",
(rotate_to_stream_ordering,)
)


def _action_has_highlight(actions):
for action in actions:
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
room_id=room_id,
user_id=user_id,
topological_ordering=topological_ordering,
stream_ordering=stream_ordering,
)

return True
Expand Down
37 changes: 37 additions & 0 deletions synapse/storage/schema/delta/40/event_push_summary.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/* Copyright 2017 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Aggregate of old notification counts that have been deleted out of the
-- main event_push_actions table. This count does not include those that were
-- highlights, as they remain in the event_push_actions table.
CREATE TABLE event_push_summary (
user_id TEXT NOT NULL,
room_id TEXT NOT NULL,
notif_count BIGINT NOT NULL,
stream_ordering BIGINT NOT NULL
);

CREATE INDEX event_push_summary_user_rm ON event_push_summary(user_id, room_id);


-- The stream ordering up to which we have aggregated the event_push_actions
-- table into event_push_summary
CREATE TABLE event_push_summary_stream_ordering (
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
stream_ordering BIGINT NOT NULL,
CHECK (Lock='X')
);

INSERT INTO event_push_summary_stream_ordering (stream_ordering) VALUES (0);
Loading

0 comments on commit 04eca25

Please sign in to comment.