Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add the Notifications API #1028

Merged
merged 8 commits into from
Aug 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions synapse/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
account_data,
report_event,
openid,
notifications,
devices,
thirdparty,
)
Expand Down Expand Up @@ -92,5 +93,6 @@ def register_servlets(client_resource, hs):
account_data.register_servlets(hs, client_resource)
report_event.register_servlets(hs, client_resource)
openid.register_servlets(hs, client_resource)
notifications.register_servlets(hs, client_resource)
devices.register_servlets(hs, client_resource)
thirdparty.register_servlets(hs, client_resource)
99 changes: 99 additions & 0 deletions synapse/rest/client/v2_alpha/notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from twisted.internet import defer

from synapse.http.servlet import (
RestServlet, parse_string, parse_integer
)
from synapse.events.utils import (
serialize_event, format_event_for_client_v2_without_room_id,
)

from ._base import client_v2_patterns

import logging

logger = logging.getLogger(__name__)


class NotificationsServlet(RestServlet):
PATTERNS = client_v2_patterns("/notifications$", releases=())

def __init__(self, hs):
super(NotificationsServlet, self).__init__()
self.store = hs.get_datastore()
self.auth = hs.get_auth()
self.clock = hs.get_clock()

@defer.inlineCallbacks
def on_GET(self, request):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()

from_token = parse_string(request, "from", required=False)
limit = parse_integer(request, "limit", default=50)

limit = min(limit, 500)

push_actions = yield self.store.get_push_actions_for_user(
user_id, from_token, limit
)

receipts_by_room = yield self.store.get_receipts_for_user_with_orderings(
user_id, 'm.read'
)

notif_event_ids = [pa["event_id"] for pa in push_actions]
notif_events = yield self.store.get_events(notif_event_ids)

returned_push_actions = []

next_token = None

for pa in push_actions:
returned_pa = {
"room_id": pa["room_id"],
"profile_tag": pa["profile_tag"],
"actions": pa["actions"],
"ts": pa["received_ts"],
"event": serialize_event(
notif_events[pa["event_id"]],
self.clock.time_msec(),
event_format=format_event_for_client_v2_without_room_id,
),
}

if pa["room_id"] not in receipts_by_room:
returned_pa["read"] = False
else:
receipt = receipts_by_room[pa["room_id"]]

returned_pa["read"] = (
receipt["topological_ordering"], receipt["stream_ordering"]
) >= (
pa["topological_ordering"], pa["stream_ordering"]
)
returned_push_actions.append(returned_pa)
next_token = pa["stream_ordering"]

defer.returnValue((200, {
"notifications": returned_push_actions,
"next_token": next_token,
}))


def register_servlets(hs, http_server):
NotificationsServlet(hs).register(http_server)
30 changes: 30 additions & 0 deletions synapse/storage/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,36 @@ def get_no_receipt(txn):
# Now return the first `limit`
defer.returnValue(notifs[:limit])

@defer.inlineCallbacks
def get_push_actions_for_user(self, user_id, before=None, limit=50):
def f(txn):
before_clause = ""
if before:
before_clause = "AND stream_ordering < ?"
args = [user_id, before, limit]
else:
args = [user_id, limit]
sql = (
"SELECT epa.event_id, epa.room_id,"
" epa.stream_ordering, epa.topological_ordering,"
" epa.actions, epa.profile_tag, e.received_ts"
" FROM event_push_actions epa, events e"
" WHERE epa.room_id = e.room_id AND epa.event_id = e.event_id"
" AND epa.user_id = ? %s"
" ORDER BY epa.stream_ordering DESC"
" LIMIT ?"
% (before_clause,)
)
txn.execute(sql, args)
return self.cursor_to_dict(txn)

push_actions = yield self.runInteraction(
"get_push_actions_for_user", f
)
for pa in push_actions:
pa["actions"] = json.loads(pa["actions"])
defer.returnValue(push_actions)

@defer.inlineCallbacks
def get_time_of_last_push_action_before(self, stream_ordering):
def f(txn):
Expand Down
25 changes: 25 additions & 0 deletions synapse/storage/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,31 @@ def get_receipts_for_user(self, user_id, receipt_type):

defer.returnValue({row["room_id"]: row["event_id"] for row in rows})

@defer.inlineCallbacks
def get_receipts_for_user_with_orderings(self, user_id, receipt_type):
def f(txn):
sql = (
"SELECT rl.room_id, rl.event_id,"
" e.topological_ordering, e.stream_ordering"
" FROM receipts_linearized AS rl"
" INNER JOIN events AS e USING (room_id, event_id)"
" WHERE rl.room_id = e.room_id"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally I prefer to use explicit joins, as it makes it easier to see what the actual conditions are. i.e.:

SELECT rl.room_id, rl.event_id, e.topological_ordering, e.stream_ordering FROM receipts_linearized AS rl
INNER JOIN events AS e USING (room_id, event_id)
WHERE user_id = ?

" AND rl.event_id = e.event_id"
" AND user_id = ?"
)
txn.execute(sql, (user_id,))
return txn.fetchall()
rows = yield self.runInteraction(
"get_receipts_for_user_with_orderings", f
)
defer.returnValue({
row[0]: {
"event_id": row[1],
"topological_ordering": row[2],
"stream_ordering": row[3],
} for row in rows
})

@defer.inlineCallbacks
def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
"""Get receipts for multiple rooms for sending to clients.
Expand Down