Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Push badge counts #507

Merged
merged 10 commits into from
Jan 21, 2016
73 changes: 50 additions & 23 deletions synapse/push/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from synapse.streams.config import PaginationConfig
from synapse.types import StreamToken
from synapse.api.constants import Membership

import synapse.util.async
import push_rule_evaluator as push_rule_evaluator
Expand Down Expand Up @@ -55,6 +56,7 @@ def __init__(self, _hs, profile_tag, user_id, app_id,
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.failing_since = failing_since
self.alive = True
self.badge = None

# The last value of last_active_time that we saw
self.last_last_active_time = 0
Expand Down Expand Up @@ -92,8 +94,7 @@ def start(self):
# we fail to dispatch the push)
config = PaginationConfig(from_token=None, limit='1')
chunk = yield self.evStreamHandler.get_stream(
self.user_id, config, timeout=0, affect_presence=False,
only_room_events=True
self.user_id, config, timeout=0, affect_presence=False
)
self.last_token = chunk['end']
self.store.update_pusher_last_token(
Expand Down Expand Up @@ -124,9 +125,11 @@ def get_and_dispatch(self):
from_tok = StreamToken.from_string(self.last_token)
config = PaginationConfig(from_token=from_tok, limit='1')
timeout = (300 + random.randint(-60, 60)) * 1000
# note that we need to get read receipts down the stream as we need to
# wake up when one arrives. we don't need to explicitly look for
# them though.
chunk = yield self.evStreamHandler.get_stream(
self.user_id, config, timeout=timeout, affect_presence=False,
only_room_events=True
self.user_id, config, timeout=timeout, affect_presence=False
)

# limiting to 1 may get 1 event plus 1 presence event, so
Expand All @@ -135,10 +138,10 @@ def get_and_dispatch(self):
for c in chunk['chunk']:
if 'event_id' in c: # Hmmm...
single_event = c
break

if not single_event:
yield self.update_badge()
self.last_token = chunk['end']
logger.debug("Event stream timeout for pushkey %s", self.pushkey)
yield self.store.update_pusher_last_token(
self.app_id,
self.pushkey,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do this even if there's no event? That way we always update the last_token to the latest.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And it won't resend badge counts.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually good point, and this would make this a lot simpler.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were you going to do this?

Expand All @@ -161,7 +164,8 @@ def get_and_dispatch(self):
tweaks = rule_evaluator.tweaks_for_actions(actions)

if 'notify' in actions:
rejected = yield self.dispatch_push(single_event, tweaks)
self.badge = yield self._get_badge_count()
rejected = yield self.dispatch_push(single_event, tweaks, self.badge)
self.has_unread = True
if isinstance(rejected, list) or isinstance(rejected, tuple):
processed = True
Expand All @@ -181,7 +185,6 @@ def get_and_dispatch(self):
yield self.hs.get_pusherpool().remove_pusher(
self.app_id, pk, self.user_id
)
else:
processed = True

if not self.alive:
Expand Down Expand Up @@ -254,7 +257,7 @@ def get_and_dispatch(self):
def stop(self):
self.alive = False

def dispatch_push(self, p, tweaks):
def dispatch_push(self, p, tweaks, badge):
"""
Overridden by implementing classes to actually deliver the notification
Args:
Expand All @@ -266,23 +269,47 @@ def dispatch_push(self, p, tweaks):
"""
pass

def reset_badge_count(self):
pass
@defer.inlineCallbacks
def update_badge(self):
    """Recompute this user's unread badge count and, if it differs from
    the last value we pushed (self.badge), send the new count out.
    """
    current = yield self._get_badge_count()
    if current == self.badge:
        # Unchanged since last send: nothing to do.
        return
    self.badge = current
    yield self.send_badge(current)

def presence_changed(self, state):
def send_badge(self, badge):
"""
We clear badge counts whenever a user's last_active time is bumped
This is by no means perfect but I think it's the best we can do
without read receipts.
Overridden by implementing classes to send an updated badge count
"""
if 'last_active' in state.state:
last_active = state.state['last_active']
if last_active > self.last_last_active_time:
self.last_last_active_time = last_active
if self.has_unread:
logger.info("Resetting badge count for %s", self.user_id)
self.reset_badge_count()
self.has_unread = False
pass

@defer.inlineCallbacks
def _get_badge_count(self):
room_list = yield self.store.get_rooms_for_user_where_membership_is(
user_id=self.user_id,
membership_list=(Membership.INVITE, Membership.JOIN)
)

my_receipts_by_room = yield self.store.get_receipts_for_user(
self.user_id,
"m.read",
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than abusing SyncConfig it'd probably be cleaner to add a storage function get_receipts_for_room, e.g.:

    @cached(num_args=3)
    def get_receipts_for_room(self, room_id, user_id, receipt_type):
        return self._simple_select_list(
            table="receipts_linearized",
            keyvalues={
                "room_id": room_id,
                "receipt_type": receipt_type,
                "user_id": user_id,
            },
            retcols=("event_id"),
            desc="get_receipts_for_room",
        )

or possibly using _simple_select_one_onecol


badge = 0

for r in room_list:
if r.membership == Membership.INVITE:
badge += 1
else:
if r.room_id in my_receipts_by_room:
last_unread_event_id = my_receipts_by_room[r.room_id]

notifs = yield (
self.store.get_unread_event_push_actions_by_room_for_user(
r.room_id, self.user_id, last_unread_event_id
)
)
badge += len(notifs)
defer.returnValue(badge)


class PusherConfigException(Exception):
Expand Down
14 changes: 7 additions & 7 deletions synapse/push/httppusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __init__(self, _hs, profile_tag, user_id, app_id,
del self.data_minus_url['url']

@defer.inlineCallbacks
def _build_notification_dict(self, event, tweaks):
def _build_notification_dict(self, event, tweaks, badge):
# we probably do not want to push for every presence update
# (we may want to be able to set up notifications when specific
# people sign in, but we'd want to only deliver the pertinent ones)
Expand All @@ -71,7 +71,7 @@ def _build_notification_dict(self, event, tweaks):
'counts': { # -- we don't mark messages as read yet so
# we have no way of knowing
# Just set the badge to 1 until we have read receipts
'unread': 1,
'unread': badge,
# 'missed_calls': 2
},
'devices': [
Expand Down Expand Up @@ -101,8 +101,8 @@ def _build_notification_dict(self, event, tweaks):
defer.returnValue(d)

@defer.inlineCallbacks
def dispatch_push(self, event, tweaks):
notification_dict = yield self._build_notification_dict(event, tweaks)
def dispatch_push(self, event, tweaks, badge):
notification_dict = yield self._build_notification_dict(event, tweaks, badge)
if not notification_dict:
defer.returnValue([])
try:
Expand All @@ -116,15 +116,15 @@ def dispatch_push(self, event, tweaks):
defer.returnValue(rejected)

@defer.inlineCallbacks
def reset_badge_count(self):
def send_badge(self, badge):
logger.info("Sending updated badge count %d to %r", badge, self.user_id)
d = {
'notification': {
'id': '',
'type': None,
'sender': '',
'counts': {
'unread': 0,
'missed_calls': 0
'unread': badge
},
'devices': [
{
Expand Down
15 changes: 0 additions & 15 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,6 @@ def __init__(self, _hs):
self.pushers = {}
self.last_pusher_started = -1

distributor = self.hs.get_distributor()
distributor.observe(
"user_presence_changed", self.user_presence_changed
)

@defer.inlineCallbacks
def user_presence_changed(self, user, state):
user_id = user.to_string()

# until we have read receipts, pushers use this to reset a user's
# badge counters to zero
for p in self.pushers.values():
if p.user_id == user_id:
yield p.presence_changed(state)

@defer.inlineCallbacks
def start(self):
pushers = yield self.store.get_all_pushers()
Expand Down
54 changes: 33 additions & 21 deletions synapse/storage/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,21 @@ def get_receipts_for_room(self, room_id, receipt_type):
desc="get_receipts_for_room",
)

@cachedInlineCallbacks(num_args=2)
def get_receipts_for_user(self, user_id, receipt_type):
    """Fetch all of a user's receipts of the given type.

    Returns (via Deferred) a dict mapping room_id -> event_id for each
    room in which the user has a receipt of `receipt_type`.
    Results are cached per (user_id, receipt_type).
    """
    def fetch(txn):
        # One (room_id, event_id) row per room the user has receipted.
        sql = (
            "SELECT room_id,event_id "
            "FROM receipts_linearized "
            "WHERE user_id = ? AND receipt_type = ? "
        )
        txn.execute(sql, (user_id, receipt_type))
        return txn.fetchall()

    rows = yield self.runInteraction("get_receipts_for_user", fetch)
    # NOTE(review): assumes at most one linearized receipt per room per
    # user/type — later rows would silently clobber earlier ones in the
    # dict. Verify against the insert path.
    defer.returnValue(dict(rows))

@defer.inlineCallbacks
def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
"""Get receipts for multiple rooms for sending to clients.
Expand Down Expand Up @@ -194,29 +209,16 @@ def f(txn):
def get_max_receipt_stream_id(self):
return self._receipts_id_gen.get_max_token(self)

@cachedInlineCallbacks()
def get_graph_receipts_for_room(self, room_id):
"""Get receipts for sending to remote servers.
"""
rows = yield self._simple_select_list(
table="receipts_graph",
keyvalues={"room_id": room_id},
retcols=["receipt_type", "user_id", "event_id"],
desc="get_linearized_receipts_for_room",
)

result = {}
for row in rows:
result.setdefault(
row["user_id"], {}
).setdefault(
row["receipt_type"], []
).append(row["event_id"])

defer.returnValue(result)

def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
user_id, event_id, data, stream_id):
txn.call_after(
self.get_receipts_for_room.invalidate, (room_id, receipt_type)
)
txn.call_after(
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)

# We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts
Expand Down Expand Up @@ -324,6 +326,7 @@ def graph_to_linear(txn):
)

max_persisted_id = yield self._stream_id_gen.get_max_token(self)

defer.returnValue((stream_id, max_persisted_id))

def insert_graph_receipt(self, room_id, receipt_type, user_id, event_ids,
Expand All @@ -336,6 +339,15 @@ def insert_graph_receipt(self, room_id, receipt_type, user_id, event_ids,

def insert_graph_receipt_txn(self, txn, room_id, receipt_type,
user_id, event_ids, data):
txn.call_after(
self.get_receipts_for_room.invalidate, (room_id, receipt_type)
)
txn.call_after(
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)

self._simple_delete_txn(
txn,
table="receipts_graph",
Expand Down
18 changes: 18 additions & 0 deletions synapse/storage/schema/delta/28/receipts_user_id_index.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/* Copyright 2015, 2016 OpenMarket Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

-- Index for per-user receipt lookups (e.g. get_receipts_for_user, which
-- filters receipts_linearized on user_id when computing badge counts).
CREATE INDEX receipts_linearized_user ON receipts_linearized(
    user_id
);