This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Aggregate event push actions #1916

Merged · 2 commits · Feb 16, 2017
6 changes: 6 additions & 0 deletions synapse/replication/slave/storage/events.py
@@ -85,6 +85,12 @@ def __init__(self, db_conn, hs):
get_unread_event_push_actions_by_room_for_user = (
EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
)
_get_unread_counts_by_receipt_txn = (
DataStore._get_unread_counts_by_receipt_txn.__func__
)
_get_unread_counts_by_pos_txn = (
DataStore._get_unread_counts_by_pos_txn.__func__
)
_get_state_group_for_events = (
StateStore.__dict__["_get_state_group_for_events"]
)
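Note on the slave-store change above: the replication worker needs to run exactly the same unread-count queries as the master, so it borrows the transaction functions straight off DataStore. On Python 2 (which Synapse targeted at the time), DataStore._get_unread_counts_by_receipt_txn is an unbound method, and .__func__ extracts the underlying plain function so it can be re-bound on the slave class; cached descriptors are instead fetched from __dict__ to avoid triggering the descriptor protocol. A minimal sketch of the pattern, with hypothetical names:

class MasterStore(object):
    def _count_notifs_txn(self, txn, room_id):
        # Runs inside a DB transaction; identical SQL on master and worker.
        txn.execute(
            "SELECT count(*) FROM event_push_actions WHERE room_id = ?",
            (room_id,),
        )
        return txn.fetchone()[0]


class SlavedStore(object):
    # Python 2: MasterStore._count_notifs_txn is an unbound method, so
    # .__func__ recovers the plain function for re-binding here, exactly
    # as the diff above does for the two _get_unread_counts_* helpers.
    _count_notifs_txn = MasterStore._count_notifs_txn.__func__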
267 changes: 210 additions & 57 deletions synapse/storage/event_push_actions.py
@@ -15,6 +15,7 @@

from ._base import SQLBaseStore
from twisted.internet import defer
from synapse.util.async import sleep
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.types import RoomStreamToken
from .stream import lower_bound
@@ -29,7 +30,6 @@ class EventPushActionsStore(SQLBaseStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"

def __init__(self, hs):
-        self.stream_ordering_month_ago = None
super(EventPushActionsStore, self).__init__(hs)

self.register_background_index_update(
@@ -47,6 +47,9 @@ def __init__(self, hs):
where_clause="highlight=1"
)

self._doing_notif_rotation = False
self._clock.looping_call(self._rotate_notifs, 30 * 60 * 1000)

def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
"""
Args:
@@ -77,66 +80,89 @@ def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
    def get_unread_event_push_actions_by_room_for_user(
            self, room_id, user_id, last_read_event_id
    ):
-        def _get_unread_event_push_actions_by_room(txn):
-            sql = (
-                "SELECT stream_ordering, topological_ordering"
-                " FROM events"
-                " WHERE room_id = ? AND event_id = ?"
-            )
-            txn.execute(
-                sql, (room_id, last_read_event_id)
-            )
-            results = txn.fetchall()
-            if len(results) == 0:
-                return {"notify_count": 0, "highlight_count": 0}
-
-            stream_ordering = results[0][0]
-            topological_ordering = results[0][1]
-            token = RoomStreamToken(
-                topological_ordering, stream_ordering
-            )
-
-            # First get number of notifications.
-            # We don't need to put a notif=1 clause as all rows always have
-            # notif=1
-            sql = (
-                "SELECT count(*)"
-                " FROM event_push_actions ea"
-                " WHERE"
-                " user_id = ?"
-                " AND room_id = ?"
-                " AND %s"
-            ) % (lower_bound(token, self.database_engine, inclusive=False),)
-
-            txn.execute(sql, (user_id, room_id))
-            row = txn.fetchone()
-            notify_count = row[0] if row else 0
-
-            # Now get the number of highlights
-            sql = (
-                "SELECT count(*)"
-                " FROM event_push_actions ea"
-                " WHERE"
-                " highlight = 1"
-                " AND user_id = ?"
-                " AND room_id = ?"
-                " AND %s"
-            ) % (lower_bound(token, self.database_engine, inclusive=False),)
-
-            txn.execute(sql, (user_id, room_id))
-            row = txn.fetchone()
-            highlight_count = row[0] if row else 0
-
-            return {
-                "notify_count": notify_count,
-                "highlight_count": highlight_count,
-            }
-
-        ret = yield self.runInteraction(
-            "get_unread_event_push_actions_by_room",
-            _get_unread_event_push_actions_by_room
-        )
-        defer.returnValue(ret)
+        ret = yield self.runInteraction(
+            "get_unread_event_push_actions_by_room",
+            self._get_unread_counts_by_receipt_txn,
+            room_id, user_id, last_read_event_id
+        )
+        defer.returnValue(ret)
+
+    def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id,
+                                          last_read_event_id):
+        sql = (
+            "SELECT stream_ordering, topological_ordering"
+            " FROM events"
+            " WHERE room_id = ? AND event_id = ?"
+        )
+        txn.execute(
+            sql, (room_id, last_read_event_id)
+        )
+        results = txn.fetchall()
+        if len(results) == 0:
+            return {"notify_count": 0, "highlight_count": 0}
+
+        stream_ordering = results[0][0]
+        topological_ordering = results[0][1]
+
+        return self._get_unread_counts_by_pos_txn(
+            txn, room_id, user_id, topological_ordering, stream_ordering
+        )
+
+    def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_ordering,
+                                      stream_ordering):
+        token = RoomStreamToken(
+            topological_ordering, stream_ordering
+        )
+
+        # First get number of notifications.
+        # We don't need to put a notif=1 clause as all rows always have
+        # notif=1
+        sql = (
+            "SELECT count(*)"
+            " FROM event_push_actions ea"
+            " WHERE"
+            " user_id = ?"
+            " AND room_id = ?"
+            " AND %s"
+        ) % (lower_bound(token, self.database_engine, inclusive=False),)
+
+        txn.execute(sql, (user_id, room_id))
+        row = txn.fetchone()
+        notify_count = row[0] if row else 0
+
+        summary_notif_count = self._simple_select_one_onecol_txn(
+            txn,
+            table="event_push_summary",
+            keyvalues={
+                "user_id": user_id,
+                "room_id": room_id,
+            },
+            retcol="notif_count",
+            allow_none=True,
+        )
+
+        if summary_notif_count:
+            notify_count += summary_notif_count
+
+        # Now get the number of highlights
+        sql = (
+            "SELECT count(*)"
+            " FROM event_push_actions ea"
+            " WHERE"
+            " highlight = 1"
+            " AND user_id = ?"
+            " AND room_id = ?"
+            " AND %s"
+        ) % (lower_bound(token, self.database_engine, inclusive=False),)
+
+        txn.execute(sql, (user_id, room_id))
+        row = txn.fetchone()
+        highlight_count = row[0] if row else 0
+
+        return {
+            "notify_count": notify_count,
+            "highlight_count": highlight_count,
+        }

@defer.inlineCallbacks
def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
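The effect of the refactor above: the unread badge is no longer a single count over event_push_actions, but the count of rows that have not yet been rotated away plus whatever notif_count was rolled up into event_push_summary. A simplified sketch of that read path (the helper name and the receipt-bound SQL fragment are illustrative, not Synapse API):

def unread_count(txn, user_id, room_id, after_receipt_sql):
    # Live rows that have not yet been rotated into the summary table.
    txn.execute(
        "SELECT count(*) FROM event_push_actions"
        " WHERE user_id = ? AND room_id = ? AND " + after_receipt_sql,
        (user_id, room_id),
    )
    live = txn.fetchone()[0]

    # Counts already rolled up by the rotation job. Summary rows are
    # deleted when the user's read receipt advances (see the receipts.py
    # change below), so anything still present is genuinely unread.
    txn.execute(
        "SELECT notif_count FROM event_push_summary"
        " WHERE user_id = ? AND room_id = ?",
        (user_id, room_id),
    )
    row = txn.fetchone()
    summary = row[0] if row else 0

    return live + summary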
@@ -448,7 +474,7 @@ def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
)

def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
-                                            topological_ordering):
+                                            topological_ordering, stream_ordering):
"""
Purges old push actions for a user and room before a given
topological_ordering.
@@ -479,11 +505,16 @@ def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
txn.execute(
"DELETE FROM event_push_actions "
" WHERE user_id = ? AND room_id = ? AND "
" topological_ordering < ?"
" topological_ordering <= ?"
" AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
(user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
)

txn.execute("""
DELETE FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
""", (room_id, user_id, stream_ordering))

@defer.inlineCallbacks
def _find_stream_orderings_for_times(self):
yield self.runInteraction(
@@ -500,6 +531,14 @@ def _find_stream_orderings_for_times_txn(self, txn):
"Found stream ordering 1 month ago: it's %d",
self.stream_ordering_month_ago
)
logger.info("Searching for stream ordering 1 day ago")
self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
)
logger.info(
"Found stream ordering 1 day ago: it's %d",
self.stream_ordering_day_ago
)

def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
"""
@@ -539,6 +578,120 @@ def _find_first_stream_ordering_after_ts_txn(self, txn, ts):

return range_end

@defer.inlineCallbacks
def _rotate_notifs(self):
if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
return
self._doing_notif_rotation = True

try:
while True:
logger.info("Rotating notifications")

caught_up = yield self.runInteraction(
"_rotate_notifs",
self._rotate_notifs_txn
)
if caught_up:
break
yield sleep(5)
finally:
self._doing_notif_rotation = False

def _rotate_notifs_txn(self, txn):
"""Archives older notifications into event_push_summary. Returns whether
the archiving process has caught up or not.
"""

# We want to make sure that we only ever do this one at a time
self.database_engine.lock_table(txn, "event_push_summary")

# We don't want to try and rotate millions of rows at once, so we cap the
# maximum stream ordering we'll rotate before.
txn.execute("""
SELECT stream_ordering FROM event_push_actions
ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
""")
stream_row = txn.fetchone()
if stream_row:
offset_stream_ordering, = stream_row
rotate_to_stream_ordering = min(
self.stream_ordering_day_ago, offset_stream_ordering
)
caught_up = offset_stream_ordering >= self.stream_ordering_day_ago
else:
rotate_to_stream_ordering = self.stream_ordering_day_ago
caught_up = True

self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering)

# We have caught up iff we were limited by `stream_ordering_day_ago`
return caught_up

def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
old_rotate_stream_ordering = self._simple_select_one_onecol_txn(
txn,
table="event_push_summary_stream_ordering",
keyvalues={},
retcol="stream_ordering",
)

# Calculate the new counts that should be upserted into event_push_summary
sql = """
SELECT user_id, room_id,
coalesce(old.notif_count, 0) + upd.notif_count,
upd.stream_ordering,
old.user_id
FROM (
SELECT user_id, room_id, count(*) as notif_count,
max(stream_ordering) as stream_ordering
FROM event_push_actions
WHERE ? <= stream_ordering AND stream_ordering < ?
AND highlight = 0
GROUP BY user_id, room_id
) AS upd
LEFT JOIN event_push_summary AS old USING (user_id, room_id)
"""

txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering,))
rows = txn.fetchall()

# If the `old.user_id` above is NULL then we know there isn't already an
# entry in the table, so we simply insert it. Otherwise we update the
# existing table.
self._simple_insert_many_txn(
txn,
table="event_push_summary",
values=[
{
"user_id": row[0],
"room_id": row[1],
"notif_count": row[2],
"stream_ordering": row[3],
}
for row in rows if row[4] is None
]
)

txn.executemany(
"""
UPDATE event_push_summary SET notif_count = ?, stream_ordering = ?
WHERE user_id = ? AND room_id = ?
""",
((row[2], row[3], row[0], row[1],) for row in rows if row[4] is not None)
)

txn.execute(
"DELETE FROM event_push_actions"
" WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0",
(old_rotate_stream_ordering, rotate_to_stream_ordering,)
)

txn.execute(
"UPDATE event_push_summary_stream_ordering SET stream_ordering = ?",
(rotate_to_stream_ordering,)
)


def _action_has_highlight(actions):
for action in actions:
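Taken together, the rotation added above works like a log-compaction pass: every 30 minutes a looping call folds event_push_actions rows older than a day into per-(user, room) rows in event_push_summary, capped at roughly 50,000 rows per transaction, then deletes the source rows and advances the watermark, sleeping 5 seconds between batches until caught up. The batch-cap decision from _rotate_notifs_txn, restated as a standalone sketch (the function name is ours):

def choose_rotation_target(txn, stream_ordering_day_ago):
    # Peek at the stream ordering 50,000 rows in: if such a row exists,
    # there is more than one batch of work pending.
    txn.execute(
        "SELECT stream_ordering FROM event_push_actions"
        " ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000"
    )
    row = txn.fetchone()
    if row:
        offset_stream_ordering = row[0]
        # Never rotate past the day-ago horizon, even if more rows exist.
        rotate_to = min(stream_ordering_day_ago, offset_stream_ordering)
        caught_up = offset_stream_ordering >= stream_ordering_day_ago
    else:
        rotate_to = stream_ordering_day_ago
        caught_up = True
    # Caught up iff the horizon, not the 50k cap, was the limiting factor.
    return rotate_to, caught_up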
1 change: 1 addition & 0 deletions synapse/storage/receipts.py
@@ -351,6 +351,7 @@ def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
room_id=room_id,
user_id=user_id,
topological_ordering=topological_ordering,
stream_ordering=stream_ordering,
)

return True
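This one-line receipts change is what keeps the summary honest: the receipt's stream_ordering is now threaded through to _remove_old_push_actions_before_txn so the rolled-up row can be purged too. A simplified sketch of the cleanup it enables (highlight handling omitted; the hypothetical helper below collapses the real call chain):

def on_read_receipt(txn, room_id, user_id, topological_ordering, stream_ordering):
    # Drop per-event push actions the user has now read...
    txn.execute(
        "DELETE FROM event_push_actions"
        " WHERE user_id = ? AND room_id = ? AND topological_ordering <= ?",
        (user_id, room_id, topological_ordering),
    )
    # ...and any pre-aggregated summary row at or below the receipt.
    txn.execute(
        "DELETE FROM event_push_summary"
        " WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?",
        (room_id, user_id, stream_ordering),
    )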
37 changes: 37 additions & 0 deletions synapse/storage/schema/delta/40/event_push_summary.sql
@@ -0,0 +1,37 @@
/* Copyright 2017 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Aggregate of old notification counts that have been deleted out of the
-- main event_push_actions table. This count does not include those that were
-- highlights, as they remain in the event_push_actions table.
CREATE TABLE event_push_summary (
user_id TEXT NOT NULL,
room_id TEXT NOT NULL,
notif_count BIGINT NOT NULL,
stream_ordering BIGINT NOT NULL
);

CREATE INDEX event_push_summary_user_rm ON event_push_summary(user_id, room_id);


-- The stream ordering up to which we have aggregated the event_push_actions
-- table into event_push_summary
CREATE TABLE event_push_summary_stream_ordering (
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
stream_ordering BIGINT NOT NULL,
CHECK (Lock='X')
);

INSERT INTO event_push_summary_stream_ordering (stream_ordering) VALUES (0);
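The Lock column is a standard single-row-table trick: DEFAULT 'X' plus UNIQUE plus CHECK (Lock='X') means the table can never hold more than one row, so the watermark can be advanced with a bare UPDATE and read without a WHERE clause. A toy demonstration (sqlite3 used purely for illustration; Synapse runs this schema on SQLite or Postgres):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE event_push_summary_stream_ordering (
        Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- only one row can exist
        stream_ordering BIGINT NOT NULL,
        CHECK (Lock='X')
    )
""")
conn.execute(
    "INSERT INTO event_push_summary_stream_ordering (stream_ordering) VALUES (0)"
)

# A second row is impossible: DEFAULT 'X' collides with the UNIQUE column.
try:
    conn.execute(
        "INSERT INTO event_push_summary_stream_ordering (stream_ordering)"
        " VALUES (1)"
    )
except sqlite3.IntegrityError as err:
    print("second row rejected: %s" % err)

# The watermark is advanced with an unqualified UPDATE, as the diff does.
conn.execute(
    "UPDATE event_push_summary_stream_ordering SET stream_ordering = ?", (42,)
)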