Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #2900 from matrix-org/erikj/split_event_push_actions
Browse files Browse the repository at this point in the history
Split out EventPushActionWorkerStore
  • Loading branch information
erikjohnston authored Feb 23, 2018
2 parents 50e8657 + a90c609 commit 2ec4982
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 91 deletions.
23 changes: 3 additions & 20 deletions synapse/replication/slave/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from synapse.api.constants import EventTypes
from synapse.storage import DataStore
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.event_push_actions import EventPushActionsStore
from synapse.storage.event_push_actions import EventPushActionsWorkerStore
from synapse.storage.events_worker import EventsWorkerStore
from synapse.storage.roommember import RoomMemberStore
from synapse.storage.state import StateGroupWorkerStore
Expand All @@ -40,7 +40,8 @@
# the method descriptor on the DataStore and chuck them into our class.


class SlavedEventStore(EventsWorkerStore, StateGroupWorkerStore, BaseSlavedStore):
class SlavedEventStore(EventPushActionsWorkerStore, EventsWorkerStore,
StateGroupWorkerStore, BaseSlavedStore):

def __init__(self, db_conn, hs):
super(SlavedEventStore, self).__init__(db_conn, hs)
Expand Down Expand Up @@ -82,30 +83,12 @@ def __init__(self, db_conn, hs):
get_invited_rooms_for_user = RoomMemberStore.__dict__[
"get_invited_rooms_for_user"
]
get_unread_event_push_actions_by_room_for_user = (
EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
)
_get_unread_counts_by_receipt_txn = (
DataStore._get_unread_counts_by_receipt_txn.__func__
)
_get_unread_counts_by_pos_txn = (
DataStore._get_unread_counts_by_pos_txn.__func__
)
get_recent_event_ids_for_room = (
StreamStore.__dict__["get_recent_event_ids_for_room"]
)
_get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"]
has_room_changed_since = DataStore.has_room_changed_since.__func__

get_unread_push_actions_for_user_in_range_for_http = (
DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
)
get_unread_push_actions_for_user_in_range_for_email = (
DataStore.get_unread_push_actions_for_user_in_range_for_email.__func__
)
get_push_action_users_in_range = (
DataStore.get_push_action_users_in_range.__func__
)
get_rooms_for_user_where_membership_is = (
DataStore.get_rooms_for_user_where_membership_is.__func__
)
Expand Down
145 changes: 74 additions & 71 deletions synapse/storage/event_push_actions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -62,77 +63,7 @@ def _deserialize_action(actions, is_highlight):
return DEFAULT_NOTIF_ACTION


class EventPushActionsStore(SQLBaseStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"

def __init__(self, db_conn, hs):
super(EventPushActionsStore, self).__init__(db_conn, hs)

self.register_background_index_update(
self.EPA_HIGHLIGHT_INDEX,
index_name="event_push_actions_u_highlight",
table="event_push_actions",
columns=["user_id", "stream_ordering"],
)

self.register_background_index_update(
"event_push_actions_highlights_index",
index_name="event_push_actions_highlights_index",
table="event_push_actions",
columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
where_clause="highlight=1"
)

self._doing_notif_rotation = False
self._rotate_notif_loop = self._clock.looping_call(
self._rotate_notifs, 30 * 60 * 1000
)

def _set_push_actions_for_event_and_users_txn(self, txn, event):
"""
Args:
event: the event set actions for
tuples: list of tuples of (user_id, actions)
"""

sql = """
INSERT INTO event_push_actions (
room_id, event_id, user_id, actions, stream_ordering,
topological_ordering, notif, highlight
)
SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
FROM event_push_actions_staging
WHERE event_id = ?
"""

txn.execute(sql, (
event.room_id, event.internal_metadata.stream_ordering,
event.depth, event.event_id,
))

user_ids = self._simple_select_onecol_txn(
txn,
table="event_push_actions_staging",
keyvalues={
"event_id": event.event_id,
},
retcol="user_id",
)

self._simple_delete_txn(
txn,
table="event_push_actions_staging",
keyvalues={
"event_id": event.event_id,
},
)

for uid in user_ids:
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(event.room_id, uid,)
)

class EventPushActionsWorkerStore(SQLBaseStore):
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id
Expand Down Expand Up @@ -449,6 +380,78 @@ def get_no_receipt(txn):
# Now return the first `limit`
defer.returnValue(notifs[:limit])


class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"

def __init__(self, db_conn, hs):
super(EventPushActionsStore, self).__init__(db_conn, hs)

self.register_background_index_update(
self.EPA_HIGHLIGHT_INDEX,
index_name="event_push_actions_u_highlight",
table="event_push_actions",
columns=["user_id", "stream_ordering"],
)

self.register_background_index_update(
"event_push_actions_highlights_index",
index_name="event_push_actions_highlights_index",
table="event_push_actions",
columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
where_clause="highlight=1"
)

self._doing_notif_rotation = False
self._rotate_notif_loop = self._clock.looping_call(
self._rotate_notifs, 30 * 60 * 1000
)

def _set_push_actions_for_event_and_users_txn(self, txn, event):
"""
Args:
event: the event set actions for
tuples: list of tuples of (user_id, actions)
"""

sql = """
INSERT INTO event_push_actions (
room_id, event_id, user_id, actions, stream_ordering,
topological_ordering, notif, highlight
)
SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
FROM event_push_actions_staging
WHERE event_id = ?
"""

txn.execute(sql, (
event.room_id, event.internal_metadata.stream_ordering,
event.depth, event.event_id,
))

user_ids = self._simple_select_onecol_txn(
txn,
table="event_push_actions_staging",
keyvalues={
"event_id": event.event_id,
},
retcol="user_id",
)

self._simple_delete_txn(
txn,
table="event_push_actions_staging",
keyvalues={
"event_id": event.event_id,
},
)

for uid in user_ids:
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(event.room_id, uid,)
)

@defer.inlineCallbacks
def get_push_actions_for_user(self, user_id, before=None, limit=50,
only_highlight=False):
Expand Down

0 comments on commit 2ec4982

Please sign in to comment.