Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move storage functions for push calculations #2913

Merged
merged 1 commit into from
Feb 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions synapse/app/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.storage.engines import create_engine
from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
Expand Down Expand Up @@ -75,10 +74,6 @@ class PusherSlaveStore(
DataStore.get_profile_displayname.__func__
)

who_forgot_in_room = (
RoomMemberStore.__dict__["who_forgot_in_room"]
)


class PusherServer(HomeServer):
def setup(self):
Expand Down
8 changes: 2 additions & 6 deletions synapse/app/synchrotron.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@


class SynchrotronSlavedStore(
SlavedPushRuleStore,
SlavedEventStore,
SlavedReceiptsStore,
SlavedAccountDataStore,
SlavedApplicationServiceStore,
Expand All @@ -73,14 +71,12 @@ class SynchrotronSlavedStore(
SlavedGroupServerStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
SlavedPushRuleStore,
SlavedEventStore,
SlavedClientIpStore,
RoomStore,
BaseSlavedStore,
):
who_forgot_in_room = (
RoomMemberStore.__dict__["who_forgot_in_room"]
)

did_forget = (
RoomMemberStore.__dict__["did_forget"]
)
Expand Down
126 changes: 63 additions & 63 deletions synapse/storage/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,69 @@ def get_no_receipt(txn):
# Now return the first `limit`
defer.returnValue(notifs[:limit])

def add_push_actions_to_staging(self, event_id, user_id_actions):
"""Add the push actions for the event to the push action staging area.

Args:
event_id (str)
user_id_actions (dict[str, list[dict|str])]): A dictionary mapping
user_id to list of push actions, where an action can either be
a string or dict.

Returns:
Deferred
"""

if not user_id_actions:
return

# This is a helper function for generating the necessary tuple that
# can be used to inert into the `event_push_actions_staging` table.
def _gen_entry(user_id, actions):
is_highlight = 1 if _action_has_highlight(actions) else 0
return (
event_id, # event_id column
user_id, # user_id column
_serialize_action(actions, is_highlight), # actions column
1, # notif column
is_highlight, # highlight column
)

def _add_push_actions_to_staging_txn(txn):
# We don't use _simple_insert_many here to avoid the overhead
# of generating lists of dicts.

sql = """
INSERT INTO event_push_actions_staging
(event_id, user_id, actions, notif, highlight)
VALUES (?, ?, ?, ?, ?)
"""

txn.executemany(sql, (
_gen_entry(user_id, actions)
for user_id, actions in user_id_actions.iteritems()
))

return self.runInteraction(
"add_push_actions_to_staging", _add_push_actions_to_staging_txn
)

def remove_push_actions_from_staging(self, event_id):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you going to be using this on the workers? I assumed it would just be for the master.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we use it if we throw an exception between persisting the push rules and persisting the event.

"""Called if we failed to persist the event to ensure that stale push
actions don't build up in the DB

Args:
event_id (str)
"""

return self._simple_delete(
table="event_push_actions_staging",
keyvalues={
"event_id": event_id,
},
desc="remove_push_actions_from_staging",
)


class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
Expand Down Expand Up @@ -775,69 +838,6 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
(rotate_to_stream_ordering,)
)

def add_push_actions_to_staging(self, event_id, user_id_actions):
"""Add the push actions for the event to the push action staging area.

Args:
event_id (str)
user_id_actions (dict[str, list[dict|str])]): A dictionary mapping
user_id to list of push actions, where an action can either be
a string or dict.

Returns:
Deferred
"""

if not user_id_actions:
return

# This is a helper function for generating the necessary tuple that
# can be used to inert into the `event_push_actions_staging` table.
def _gen_entry(user_id, actions):
is_highlight = 1 if _action_has_highlight(actions) else 0
return (
event_id, # event_id column
user_id, # user_id column
_serialize_action(actions, is_highlight), # actions column
1, # notif column
is_highlight, # highlight column
)

def _add_push_actions_to_staging_txn(txn):
# We don't use _simple_insert_many here to avoid the overhead
# of generating lists of dicts.

sql = """
INSERT INTO event_push_actions_staging
(event_id, user_id, actions, notif, highlight)
VALUES (?, ?, ?, ?, ?)
"""

txn.executemany(sql, (
_gen_entry(user_id, actions)
for user_id, actions in user_id_actions.iteritems()
))

return self.runInteraction(
"add_push_actions_to_staging", _add_push_actions_to_staging_txn
)

def remove_push_actions_from_staging(self, event_id):
"""Called if we failed to persist the event to ensure that stale push
actions don't build up in the DB

Args:
event_id (str)
"""

return self._simple_delete(
table="event_push_actions_staging",
keyvalues={
"event_id": event_id,
},
desc="remove_push_actions_from_staging",
)


def _action_has_highlight(actions):
for action in actions:
Expand Down
14 changes: 11 additions & 3 deletions synapse/storage/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
# limitations under the License.

from ._base import SQLBaseStore
from synapse.storage.appservice import ApplicationServiceWorkerStore
from synapse.storage.pusher import PusherWorkerStore
from synapse.storage.receipts import ReceiptsWorkerStore
from synapse.storage.roommember import RoomMemberWorkerStore
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.push.baserules import list_with_base_rules
Expand Down Expand Up @@ -51,7 +55,11 @@ def _load_rules(rawrules, enabled_map):
return rules


class PushRulesWorkerStore(SQLBaseStore):
class PushRulesWorkerStore(ApplicationServiceWorkerStore,
ReceiptsWorkerStore,
PusherWorkerStore,
RoomMemberWorkerStore,
SQLBaseStore):
"""This is an abstract base class where subclasses must implement
`get_max_push_rules_stream_id` which can be called in the initializer.
"""
Expand Down Expand Up @@ -140,8 +148,6 @@ def have_push_rules_changed_txn(txn):
"have_push_rules_changed", have_push_rules_changed_txn
)


class PushRuleStore(PushRulesWorkerStore):
@cachedList(cached_method_name="get_push_rules_for_user",
list_name="user_ids", num_args=1, inlineCallbacks=True)
def bulk_get_push_rules(self, user_ids):
Expand Down Expand Up @@ -281,6 +287,8 @@ def bulk_get_push_rules_enabled(self, user_ids):
results.setdefault(row['user_name'], {})[row['rule_id']] = enabled
defer.returnValue(results)


class PushRuleStore(PushRulesWorkerStore):
@defer.inlineCallbacks
def add_push_rule(
self, user_id, rule_id, priority_class, conditions, actions,
Expand Down
22 changes: 13 additions & 9 deletions synapse/storage/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,6 @@ def get_all_updated_pushers_rows_txn(txn):
"get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn
)


class PusherStore(PusherWorkerStore):
def get_pushers_stream_token(self):
return self._pushers_id_gen.get_current_token()

@cachedInlineCallbacks(num_args=1, max_entries=15000)
def get_if_user_has_pusher(self, user_id):
# This only exists for the cachedList decorator
Expand All @@ -201,6 +196,11 @@ def get_if_users_have_pushers(self, user_ids):

defer.returnValue(result)


class PusherStore(PusherWorkerStore):
def get_pushers_stream_token(self):
return self._pushers_id_gen.get_current_token()

@defer.inlineCallbacks
def add_pusher(self, user_id, access_token, kind, app_id,
app_display_name, device_display_name,
Expand Down Expand Up @@ -233,14 +233,18 @@ def add_pusher(self, user_id, access_token, kind, app_id,
)

if newly_inserted:
# get_if_user_has_pusher only cares if the user has
# at least *one* pusher.
self.get_if_user_has_pusher.invalidate(user_id,)
self.runInteraction(
"add_pusher",
self._invalidate_cache_and_stream,
self.get_if_user_has_pusher, (user_id,)
)

@defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
def delete_pusher_txn(txn, stream_id):
txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
self._invalidate_cache_and_stream(
txn, self.get_if_user_has_pusher, (user_id,)
)

self._simple_delete_one_txn(
txn,
Expand Down
24 changes: 12 additions & 12 deletions synapse/storage/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,18 @@ def _get_joined_hosts(self, room_id, state_group, current_state_ids, state_entry
def _get_joined_hosts_cache(self, room_id):
return _JoinedHostsCache(self, room_id)

@cached()
def who_forgot_in_room(self, room_id):
return self._simple_select_list(
table="room_memberships",
retcols=("user_id", "event_id"),
keyvalues={
"room_id": room_id,
"forgotten": 1,
},
desc="who_forgot"
)


class RoomMemberStore(RoomMemberWorkerStore):
def __init__(self, db_conn, hs):
Expand Down Expand Up @@ -595,18 +607,6 @@ def f(txn):
forgot = yield self.runInteraction("did_forget_membership_at", f)
defer.returnValue(forgot == 1)

@cached()
def who_forgot_in_room(self, room_id):
return self._simple_select_list(
table="room_memberships",
retcols=("user_id", "event_id"),
keyvalues={
"room_id": room_id,
"forgotten": 1,
},
desc="who_forgot"
)

@defer.inlineCallbacks
def _background_add_membership_profile(self, progress, batch_size):
target_min_stream_id = progress.get(
Expand Down