Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Batch inserts into event_push_actions_staging #2892

Merged
merged 3 commits into from
Feb 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions synapse/push/bulk_push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ def action_for_event_by_user(self, event, context):
Deferred
"""
rules_by_user = yield self._get_rules_for_event(event, context)
actions_by_user = {}

room_members = yield self.store.get_joined_users_from_context(
event, context
Expand Down Expand Up @@ -189,14 +190,17 @@ def action_for_event_by_user(self, event, context):
if matches:
actions = [x for x in rule['actions'] if x != 'dont_notify']
if actions and 'notify' in actions:
# Push rules say we should notify the user of this event,
# so we mark it in the DB in the staging area. (This
# will then get handled when we persist the event)
yield self.store.add_push_actions_to_staging(
event.event_id, uid, actions,
)
# Push rules say we should notify the user of this event
actions_by_user[uid] = actions
break

# Mark in the DB staging area the push actions for users who should be
# notified for this event. (This will then get handled when we persist
# the event)
yield self.store.add_push_actions_to_staging(
event.event_id, actions_by_user,
)


def _condition_checker(evaluator, conditions, uid, display_name, cache):
for cond in conditions:
Expand Down
53 changes: 36 additions & 17 deletions synapse/storage/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,32 +755,51 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
(rotate_to_stream_ordering,)
)

def add_push_actions_to_staging(self, event_id, user_id, actions):
"""Add the push actions for the user and event to the push
action staging area.
def add_push_actions_to_staging(self, event_id, user_id_actions):
"""Add the push actions for the event to the push action staging area.

Args:
event_id (str)
user_id (str)
actions (list[dict|str]): An action can either be a string or
dict.
user_id_actions (dict[str, list[dict|str])]): A dictionary mapping
user_id to list of push actions, where an action can either be
a string or dict.

Returns:
Deferred
"""

is_highlight = 1 if _action_has_highlight(actions) else 0
if not user_id_actions:
return

return self._simple_insert(
table="event_push_actions_staging",
values={
"event_id": event_id,
"user_id": user_id,
"actions": _serialize_action(actions, is_highlight),
"notif": 1,
"highlight": is_highlight,
},
desc="add_push_actions_to_staging",
# This is a helper function for generating the necessary tuple that
# can be used to inert into the `event_push_actions_staging` table.
def _gen_entry(user_id, actions):
is_highlight = 1 if _action_has_highlight(actions) else 0
return (
event_id, # event_id column
user_id, # user_id column
_serialize_action(actions, is_highlight), # actions column
1, # notif column
is_highlight, # highlight column
)

def _add_push_actions_to_staging_txn(txn):
# We don't use _simple_insert_many here to avoid the overhead
# of generating lists of dicts.

sql = """
INSERT INTO event_push_actions_staging
(event_id, user_id, actions, notif, highlight)
VALUES (?, ?, ?, ?, ?)
"""

txn.executemany(sql, (
_gen_entry(user_id, actions)
for user_id, actions in user_id_actions.iteritems()
))

return self.runInteraction(
"add_push_actions_to_staging", _add_push_actions_to_staging_txn
)

def remove_push_actions_from_staging(self, event_id):
Expand Down
10 changes: 6 additions & 4 deletions tests/replication/slave/storage/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,12 @@ def persist(
state_handler = self.hs.get_state_handler()
context = yield state_handler.compute_event_context(event)

for user_id, actions in push_actions:
yield self.master_store.add_push_actions_to_staging(
event.event_id, user_id, actions,
)
yield self.master_store.add_push_actions_to_staging(
event.event_id, {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this the same as just passing push_actions? or are you deliberately copying?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The push actions are lists of tuples, rather than dicts here, I think? We could go and change the function to take the push_actions as a dict.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nobody knows, because it's not documented. But yes, I see what you mean.

user_id: actions
for user_id, actions in push_actions
},
)

ordering = None
if backfill:
Expand Down
2 changes: 1 addition & 1 deletion tests/storage/test_event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def _inject_actions(stream, action):
event.depth = stream

yield self.store.add_push_actions_to_staging(
event.event_id, user_id, action,
event.event_id, {user_id: action},
)
yield self.store.runInteraction(
"", self.store._set_push_actions_for_event_and_users_txn,
Expand Down