Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Push badge counts #507

Merged
merged 10 commits into from
Jan 21, 2016
102 changes: 79 additions & 23 deletions synapse/push/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
from twisted.internet import defer

from synapse.streams.config import PaginationConfig
from synapse.types import StreamToken
from synapse.types import StreamToken, UserID
from synapse.api.constants import Membership
from synapse.api.filtering import FilterCollection

import synapse.util.async
import push_rule_evaluator as push_rule_evaluator
Expand Down Expand Up @@ -55,6 +57,7 @@ def __init__(self, _hs, profile_tag, user_id, app_id,
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.failing_since = failing_since
self.alive = True
self.badge = None

# The last value of last_active_time that we saw
self.last_last_active_time = 0
Expand Down Expand Up @@ -92,8 +95,7 @@ def start(self):
# we fail to dispatch the push)
config = PaginationConfig(from_token=None, limit='1')
chunk = yield self.evStreamHandler.get_stream(
self.user_id, config, timeout=0, affect_presence=False,
only_room_events=True
self.user_id, config, timeout=0, affect_presence=False
)
self.last_token = chunk['end']
self.store.update_pusher_last_token(
Expand Down Expand Up @@ -125,20 +127,30 @@ def get_and_dispatch(self):
config = PaginationConfig(from_token=from_tok, limit='1')
timeout = (300 + random.randint(-60, 60)) * 1000
chunk = yield self.evStreamHandler.get_stream(
self.user_id, config, timeout=timeout, affect_presence=False,
only_room_events=True
self.user_id, config, timeout=timeout, affect_presence=False
)

# limiting to 1 may get 1 event plus 1 presence event, so
# pick out the actual event
single_event = None
read_receipt = None
for c in chunk['chunk']:
if 'event_id' in c: # Hmmm...
single_event = c
break
elif c['type'] == 'm.receipt':
read_receipt = c

have_updated_badge = False
if read_receipt:
for receipt_part in read_receipt['content'].values():
if 'm.read' in receipt_part:
if self.user_id in receipt_part['m.read'].keys():
have_updated_badge = True

if not single_event:
if have_updated_badge:
yield self.update_badge()
self.last_token = chunk['end']
logger.debug("Event stream timeout for pushkey %s", self.pushkey)
yield self.store.update_pusher_last_token(
self.app_id,
self.pushkey,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do this even if there's no event? That way we always update the last_token to the latest.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And it won't resend badge counts.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually good point, and this would make this a lot simpler.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were you going to do this?

Expand All @@ -161,7 +173,8 @@ def get_and_dispatch(self):
tweaks = rule_evaluator.tweaks_for_actions(actions)

if 'notify' in actions:
rejected = yield self.dispatch_push(single_event, tweaks)
self.badge = yield self._get_badge_count()
rejected = yield self.dispatch_push(single_event, tweaks, self.badge)
self.has_unread = True
if isinstance(rejected, list) or isinstance(rejected, tuple):
processed = True
Expand All @@ -182,6 +195,8 @@ def get_and_dispatch(self):
self.app_id, pk, self.user_id
)
else:
if have_updated_badge:
yield self.update_badge()
processed = True

if not self.alive:
Expand Down Expand Up @@ -254,7 +269,7 @@ def get_and_dispatch(self):
def stop(self):
self.alive = False

def dispatch_push(self, p, tweaks):
def dispatch_push(self, p, tweaks, badge):
"""
Overridden by implementing classes to actually deliver the notification
Args:
Expand All @@ -266,23 +281,64 @@ def dispatch_push(self, p, tweaks):
"""
pass

def reset_badge_count(self):
pass
@defer.inlineCallbacks
def update_badge(self):
new_badge = yield self._get_badge_count()
if self.badge != new_badge:
self.badge = new_badge
yield self.send_badge(self.badge)

def presence_changed(self, state):
def send_badge(self, badge):
"""
We clear badge counts whenever a user's last_active time is bumped
This is by no means perfect but I think it's the best we can do
without read receipts.
Overridden by implementing classes to send an updated badge count
"""
if 'last_active' in state.state:
last_active = state.state['last_active']
if last_active > self.last_last_active_time:
self.last_last_active_time = last_active
if self.has_unread:
logger.info("Resetting badge count for %s", self.user_id)
self.reset_badge_count()
self.has_unread = False
pass

@defer.inlineCallbacks
def _get_badge_count(self):
membership_list = (Membership.INVITE, Membership.JOIN)

room_list = yield self.store.get_rooms_for_user_where_membership_is(
user_id=self.user_id,
membership_list=membership_list
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to simply membership_list=(Membership.INVITE, Membership.JOIN),?

)

user_is_guest = yield self.store.is_guest(self.user_id)

# XXX: importing inside method to break circular dependency.
# should sort out the mess by moving all this logic out of
# push/__init__.py and probably moving the logic we use from the sync
# handler to somewhere more amenable to re-use.
from synapse.handlers.sync import SyncConfig
sync_config = SyncConfig(
user=UserID.from_string(self.user_id),
filter=FilterCollection({}),
is_guest=user_is_guest,
)
now_token = yield self.hs.get_event_sources().get_current_token()
sync_handler = self.hs.get_handlers().sync_handler
_, ephemeral_by_room = yield sync_handler.ephemeral_by_room(
sync_config, now_token
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than abusing SyncConfig it'd probably be cleaner to add a storage function get_receipts_for_room, e.g.:

    @cached(num_args=3)
    def get_receipts_for_room_for_room(self, room_id, user_id, receipt_type):
        return self._simple_select_list(
            table="receipts_linearized",
            keyvalues={
                "room_id": room_id,
                "receipt_type": receipt_type,
                "user_id": user_id,
            },
            retcols=("event_id"),
            desc="get_receipts_for_room",
        )

or possibly using _simple_select_one_onecol


badge = 0

for r in room_list:
if r.membership == Membership.INVITE:
badge += 1
else:
last_unread_event_id = sync_handler.last_read_event_id_for_room_and_user(
r.room_id, self.user_id, ephemeral_by_room
)

if last_unread_event_id:
notifs = yield (
self.store.get_unread_event_push_actions_by_room_for_user(
r.room_id, self.user_id, last_unread_event_id
)
)
badge += len(notifs)
defer.returnValue(badge)


class PusherConfigException(Exception):
Expand Down
14 changes: 7 additions & 7 deletions synapse/push/httppusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __init__(self, _hs, profile_tag, user_id, app_id,
del self.data_minus_url['url']

@defer.inlineCallbacks
def _build_notification_dict(self, event, tweaks):
def _build_notification_dict(self, event, tweaks, badge):
# we probably do not want to push for every presence update
# (we may want to be able to set up notifications when specific
# people sign in, but we'd want to only deliver the pertinent ones)
Expand All @@ -71,7 +71,7 @@ def _build_notification_dict(self, event, tweaks):
'counts': { # -- we don't mark messages as read yet so
# we have no way of knowing
# Just set the badge to 1 until we have read receipts
'unread': 1,
'unread': badge,
# 'missed_calls': 2
},
'devices': [
Expand Down Expand Up @@ -101,8 +101,8 @@ def _build_notification_dict(self, event, tweaks):
defer.returnValue(d)

@defer.inlineCallbacks
def dispatch_push(self, event, tweaks):
notification_dict = yield self._build_notification_dict(event, tweaks)
def dispatch_push(self, event, tweaks, badge):
notification_dict = yield self._build_notification_dict(event, tweaks, badge)
if not notification_dict:
defer.returnValue([])
try:
Expand All @@ -116,15 +116,15 @@ def dispatch_push(self, event, tweaks):
defer.returnValue(rejected)

@defer.inlineCallbacks
def reset_badge_count(self):
def send_badge(self, badge):
logger.info("Sending updated badge count %d to %r", badge, self.user_id)
d = {
'notification': {
'id': '',
'type': None,
'sender': '',
'counts': {
'unread': 0,
'missed_calls': 0
'unread': badge
},
'devices': [
{
Expand Down
15 changes: 0 additions & 15 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,6 @@ def __init__(self, _hs):
self.pushers = {}
self.last_pusher_started = -1

distributor = self.hs.get_distributor()
distributor.observe(
"user_presence_changed", self.user_presence_changed
)

@defer.inlineCallbacks
def user_presence_changed(self, user, state):
user_id = user.to_string()

# until we have read receipts, pushers use this to reset a user's
# badge counters to zero
for p in self.pushers.values():
if p.user_id == user_id:
yield p.presence_changed(state)

@defer.inlineCallbacks
def start(self):
pushers = yield self.store.get_all_pushers()
Expand Down