From 892e70ec8404865b4b1e6cf4eef7a3ada7114149 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 28 Oct 2015 16:06:57 +0000 Subject: [PATCH 01/10] Add APIs for adding and removing tags from rooms --- synapse/rest/client/v2_alpha/__init__.py | 2 + synapse/rest/client/v2_alpha/tags.py | 89 +++++++++++ synapse/storage/__init__.py | 2 + synapse/storage/schema/delta/25/tags.sql | 37 +++++ synapse/storage/tags.py | 190 +++++++++++++++++++++++ 5 files changed, 320 insertions(+) create mode 100644 synapse/rest/client/v2_alpha/tags.py create mode 100644 synapse/storage/schema/delta/25/tags.sql create mode 100644 synapse/storage/tags.py diff --git a/synapse/rest/client/v2_alpha/__init__.py b/synapse/rest/client/v2_alpha/__init__.py index 5831ff0e623f..a10813234682 100644 --- a/synapse/rest/client/v2_alpha/__init__.py +++ b/synapse/rest/client/v2_alpha/__init__.py @@ -22,6 +22,7 @@ receipts, keys, tokenrefresh, + tags, ) from synapse.http.server import JsonResource @@ -44,3 +45,4 @@ def register_servlets(client_resource, hs): receipts.register_servlets(hs, client_resource) keys.register_servlets(hs, client_resource) tokenrefresh.register_servlets(hs, client_resource) + tags.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py new file mode 100644 index 000000000000..c4a670c5a7c7 --- /dev/null +++ b/synapse/rest/client/v2_alpha/tags.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import client_v2_pattern + +from synapse.http.servlet import RestServlet + +from twisted.internet import defer + +import logging + +logger = logging.getLogger(__name__) + + +class TagListServlet(RestServlet): + """ + GET /user/{user_id}/rooms/{room_id}/tags HTTP/1.1 + """ + PATTERN = client_v2_pattern( + "/user/(?P[^/]*)/rooms/(?P[^/]*)/tags" + ) + + def __init__(self, hs): + super(TagListServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def on_GET(self, request, user_id, room_id): + auth_user, _ = yield self.auth.get_user_by_req(request) + if user_id != auth_user.to_string(): + raise AuthError(403, "Cannot get tags for other users.") + + tags = yield self.store.get_tags_for_room(user_id, room_id) + + defer.returnValue((200, {"tags": tags})) + + +class TagServlet(RestServlet): + """ + PUT /user/{user_id}/rooms/{room_id}/tags/{tag} HTTP/1.1 + DELETE /user/{user_id}/rooms/{room_id}/tags/{tag} HTTP/1.1 + """ + PATTERN = client_v2_pattern( + "/user/(?P[^/]*)/rooms/(?P[^/]*)/tags/(?P[^/]*)" + ) + def __init__(self, hs): + super(TagServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def on_PUT(self, request, user_id, room_id, tag): + auth_user, _ = yield self.auth.get_user_by_req(request) + if user_id != auth_user.to_string(): + raise AuthError(403, "Cannot add tags for other users.") + + yield self.store.add_tag_to_room(user_id, room_id, tag) + + # TODO: poke the notifier. + defer.returnValue((200, {})) + + @defer.inlineCallbacks + def on_DELETE(self, request, user_id, room_id, tag): + auth_user, _ = yield self.auth.get_user_by_req(request) + if user_id != auth_user.to_string(): + raise AuthError(403, "Cannot add tags for other users.") + + yield self.store.remove_tag_from_room(user_id, room_id, tag) + + # TODO: poke the notifier. + defer.returnValue((200, {})) + + +def register_servlets(hs, http_server): + TagListServlet(hs).register(http_server) + TagServlet(hs).register(http_server) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a1bd9c4ce900..e7443f283854 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -41,6 +41,7 @@ from .receipts import ReceiptsStore from .search import SearchStore +from .tags import TagsStore import logging @@ -71,6 +72,7 @@ class DataStore(RoomMemberStore, RoomStore, ReceiptsStore, EndToEndKeyStore, SearchStore, + TagsStore, ): def __init__(self, hs): diff --git a/synapse/storage/schema/delta/25/tags.sql b/synapse/storage/schema/delta/25/tags.sql new file mode 100644 index 000000000000..168766dcf396 --- /dev/null +++ b/synapse/storage/schema/delta/25/tags.sql @@ -0,0 +1,37 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE IF NOT EXISTS room_tags( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + tag TEXT NOT NULL, -- The name of the tag. + CONSTRAINT room_tag_uniqueness UNIQUE (user_id, room_id, tag) +); + +CREATE TABLE IF NOT EXISTS room_tags_revisions ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + stream_id BIGINT NOT NULL, -- The current version of the room tags. + CONSTRAINT room_tag_revisions_uniqueness UNIQUE (user_id, room_id) +); + +CREATE TABLE IF NOT EXISTS private_user_data_max_stream_id( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + stream_id BIGINT NOT NULL, + CHECK (Lock='X') +); + +INSERT INTO private_user_data_max_stream_id (stream_id) VALUES (0); diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py new file mode 100644 index 000000000000..507e60596f2f --- /dev/null +++ b/synapse/storage/tags.py @@ -0,0 +1,190 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached +from twisted.internet import defer +from .util.id_generators import StreamIdGenerator + +import logging + +logger = logging.getLogger(__name__) + + +class TagsStore(SQLBaseStore): + def __init__(self, hs): + super(TagsStore, self).__init__(hs) + + self._private_user_data_id_gen = StreamIdGenerator( + "private_user_data_max_stream_id", "stream_id" + ) + + @cached() + def get_tags_for_user(self, user_id): + """Get all the tags for a user. + + + Args: + user_id(str): The user to get the tags for. + Returns: + A deferred dict mapping from room_id strings to lists of tag + strings. + """ + + deferred = self._simple_select_list( + "room_tags", {"user_id": user_id}, ["room_id", "tag"] + ) + + @deferred.addCallback + def tags_by_room(rows): + tags_by_room = {} + for row in rows: + tags_by_room.setdefault(row["room_id"], []).append(row["tag"]) + return tags_by_room + + return deferred + + @defer.inlineCallbacks + def get_updated_tags(self, user_id, stream_id): + """Get all the tags for the rooms where the tags have changed since the + given version + + Args: + user_id(str): The user to get the tags for. + stream_id(int): The earliest update to get for the user. + Returns: + A deferred dict mapping from room_id strings to lists of tag + strings for all the rooms that changed since the stream_id token. + """ + def get_updated_tags_txn(txn): + sql = ( + "SELECT room_id from room_tags_revisions" + " WHERE user_id = ? AND stream_id > ?" + ) + txn.execute(sql, (user_id, stream_id)) + room_ids = [row[0] for row in txn.fetchall()] + return room_ids + + room_ids = yield self.runInteraction( + "get_updated_tags", get_updated_tags_txn + ) + + results = {} + if room_ids: + tags_by_room = yield self.get_tags_for_user(self, user_id) + for room_id in rooms_ids: + results[room_id] = tags_by_room[room_id] + + defer.returnValue(results) + + def get_tags_for_room(self, user_id, room_id): + """Get all the tags for the given room + Args: + user_id(str): The user to get tags for + room_id(str): The room to get tags for + Returns: + A deferred list of string tags. + """ + return self._simple_select_onecol( + table="room_tags", + keyvalues={"user_id": user_id, "room_id": room_id}, + retcol="tag", + desc="get_tags_for_room", + ) + + @defer.inlineCallbacks + def add_tag_to_room(self, user_id, room_id, tag): + """Add a tag to a room for a user. + Returns: + A deferred that completes once the tag has been added. + """ + def add_tag_txn(txn, next_id): + sql = ( + "INSERT INTO room_tags (user_id, room_id, tag)" + " VALUES (?, ?, ?)" + ) + try: + txn.execute(sql, (user_id, room_id, tag)) + except database_engine.module.IntegrityError as e: + # Return early if the row is already in the table + # and we don't need to bump the revision number of the + # private_user_data. + return + self._update_revision_txn(txn, user_id, room_id, next_id) + + with (yield self._private_user_data_id_gen.get_next(self)) as next_id: + yield self.runInteraction("add_tag", add_tag_txn, next_id) + + self.get_tags_for_user.invalidate((user_id,)) + + @defer.inlineCallbacks + def remove_tag_from_room(self, user_id, room_id, tag): + """Remove a tag from a room for a user. + Returns: + A deffered that completes once the tag has been removed + """ + def remove_tag_txn(txn, next_id): + sql = ( + "DELETE FROM room_tags " + " WHERE user_id = ? AND room_id = ? AND tag = ?" + ) + txn.execute(sql, (user_id, room_id, tag)) + self._update_revision_txn(txn, user_id, room_id, next_id) + + with (yield self._private_user_data_id_gen.get_next(self)) as next_id: + yield self.runInteraction("remove_tag", remove_tag_txn, next_id) + + self.get_tags_for_user.invalidate((user_id,)) + + def _update_revision_txn(self, txn, user_id, room_id, next_id): + """Update the latest revision of the tags for the given user and room. + + Args: + txn: The database cursor + user_id(str): The ID of the user. + room_id(str): The ID of the room. + next_id(int): The the revision to advance to. + """ + + update_max_id_sql = ( + "UPDATE private_user_data_max_stream_id" + " SET stream_id = ?" + " WHERE stream_id < ?" + ) + txn.execute(update_max_id_sql, (next_id, next_id)) + + update_sql = ( + "UPDATE room_tags_revisions" + " SET stream_id = ?" + " WHERE user_id = ?" + " AND room_id = ?" + ) + txn.execute(update_sql, (next_id, user_id, room_id)) + + if txn.rowcount == 0: + insert_sql = ( + "INSERT INTO room_tags_revisions (user_id, room_id, stream_id)" + " VALUES (?, ?, ?)" + ) + try: + txn.execute(insert_sql, (user_id, room_id, next_id)) + except database_engine.module.IntegrityError as e: + # Ignore insertion errors. It doesn't matter if the row wasn't + # inserted because if two updates happend concurrently the one + # with the higher stream_id will not be reported to a client + # unless the previous update has completed. It doesn't matter + # which stream_id ends up in the table, as long as it is higher + # than the id that the client has. + pass From a89b86dc473f5dc41a17207a17165b27d8f599ea Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 28 Oct 2015 16:45:57 +0000 Subject: [PATCH 02/10] Fix pyflakes errors --- synapse/rest/client/v2_alpha/tags.py | 2 ++ synapse/storage/tags.py | 6 +++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py index c4a670c5a7c7..15c9347fc18a 100644 --- a/synapse/rest/client/v2_alpha/tags.py +++ b/synapse/rest/client/v2_alpha/tags.py @@ -16,6 +16,7 @@ from ._base import client_v2_pattern from synapse.http.servlet import RestServlet +from synapse.api.errors import AuthError from twisted.internet import defer @@ -56,6 +57,7 @@ class TagServlet(RestServlet): PATTERN = client_v2_pattern( "/user/(?P[^/]*)/rooms/(?P[^/]*)/tags/(?P[^/]*)" ) + def __init__(self, hs): super(TagServlet, self).__init__() self.auth = hs.get_auth() diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 507e60596f2f..64c65fc32128 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -84,7 +84,7 @@ def get_updated_tags_txn(txn): results = {} if room_ids: tags_by_room = yield self.get_tags_for_user(self, user_id) - for room_id in rooms_ids: + for room_id in room_ids: results[room_id] = tags_by_room[room_id] defer.returnValue(results) @@ -117,7 +117,7 @@ def add_tag_txn(txn, next_id): ) try: txn.execute(sql, (user_id, room_id, tag)) - except database_engine.module.IntegrityError as e: + except self.database_engine.module.IntegrityError: # Return early if the row is already in the table # and we don't need to bump the revision number of the # private_user_data. @@ -180,7 +180,7 @@ def _update_revision_txn(self, txn, user_id, room_id, next_id): ) try: txn.execute(insert_sql, (user_id, room_id, next_id)) - except database_engine.module.IntegrityError as e: + except self.database_engine.module.IntegrityError: # Ignore insertion errors. It doesn't matter if the row wasn't # inserted because if two updates happend concurrently the one # with the higher stream_id will not be reported to a client From f40b0ed5e190a78ed6633505c4f437b6fddc41ee Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Oct 2015 15:20:52 +0000 Subject: [PATCH 03/10] Inform the client of new room tags using v1 /events --- synapse/handlers/private_user_data.py | 46 +++++++++++++++++++++++++++ synapse/notifier.py | 2 +- synapse/rest/client/v2_alpha/tags.py | 14 ++++++-- synapse/storage/tags.py | 16 +++++++++- synapse/streams/events.py | 5 +++ synapse/types.py | 22 +++++++------ 6 files changed, 91 insertions(+), 14 deletions(-) create mode 100644 synapse/handlers/private_user_data.py diff --git a/synapse/handlers/private_user_data.py b/synapse/handlers/private_user_data.py new file mode 100644 index 000000000000..1778c713250c --- /dev/null +++ b/synapse/handlers/private_user_data.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + + +class PrivateUserDataEventSource(object): + def __init__(self, hs): + self.store = hs.get_datastore() + + def get_current_key(self, direction='f'): + return self.store.get_max_private_user_data_stream_id() + + @defer.inlineCallbacks + def get_new_events_for_user(self, user, from_key, limit): + user_id = user.to_string() + last_stream_id = from_key + + current_stream_id = yield self.store.get_max_private_user_data_stream_id() + tags = yield self.store.get_updated_tags(user_id, last_stream_id) + + results = [] + for room_id, room_tags in tags.items(): + results.append({ + "type": "m.tag", + "content": {"tags": room_tags}, + "room_id": room_id, + }) + + defer.returnValue((results, current_stream_id)) + + @defer.inlineCallbacks + def get_pagination_rows(self, user, config, key): + defer.returnValue(([], config.to_id)) diff --git a/synapse/notifier.py b/synapse/notifier.py index f998fc83bf11..a78ee3c1e744 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -270,7 +270,7 @@ def on_new_event(self, stream_key, new_token, users=[], rooms=[], @defer.inlineCallbacks def wait_for_events(self, user, rooms, timeout, callback, - from_token=StreamToken("s0", "0", "0", "0")): + from_token=StreamToken("s0", "0", "0", "0", "0")): """Wait until the callback returns a non empty response or the timeout fires. """ diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py index 15c9347fc18a..486add9909a8 100644 --- a/synapse/rest/client/v2_alpha/tags.py +++ b/synapse/rest/client/v2_alpha/tags.py @@ -62,6 +62,7 @@ def __init__(self, hs): super(TagServlet, self).__init__() self.auth = hs.get_auth() self.store = hs.get_datastore() + self.notifier = hs.get_notifier() @defer.inlineCallbacks def on_PUT(self, request, user_id, room_id, tag): @@ -69,9 +70,12 @@ def on_PUT(self, request, user_id, room_id, tag): if user_id != auth_user.to_string(): raise AuthError(403, "Cannot add tags for other users.") - yield self.store.add_tag_to_room(user_id, room_id, tag) + max_id = yield self.store.add_tag_to_room(user_id, room_id, tag) + + yield self.notifier.on_new_event( + "private_user_data_key", max_id, users=[user_id] + ) - # TODO: poke the notifier. defer.returnValue((200, {})) @defer.inlineCallbacks @@ -80,7 +84,11 @@ def on_DELETE(self, request, user_id, room_id, tag): if user_id != auth_user.to_string(): raise AuthError(403, "Cannot add tags for other users.") - yield self.store.remove_tag_from_room(user_id, room_id, tag) + max_id = yield self.store.remove_tag_from_room(user_id, room_id, tag) + + yield self.notifier.on_new_event( + "private_user_data_key", max_id, users=[user_id] + ) # TODO: poke the notifier. defer.returnValue((200, {})) diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 64c65fc32128..2d5c49144a9d 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -31,6 +31,14 @@ def __init__(self, hs): "private_user_data_max_stream_id", "stream_id" ) + def get_max_private_user_data_stream_id(self): + """Get the current max stream id for the private user data stream + + Returns: + A deferred int. + """ + return self._private_user_data_id_gen.get_max_token(self) + @cached() def get_tags_for_user(self, user_id): """Get all the tags for a user. @@ -83,7 +91,7 @@ def get_updated_tags_txn(txn): results = {} if room_ids: - tags_by_room = yield self.get_tags_for_user(self, user_id) + tags_by_room = yield self.get_tags_for_user(user_id) for room_id in room_ids: results[room_id] = tags_by_room[room_id] @@ -129,6 +137,9 @@ def add_tag_txn(txn, next_id): self.get_tags_for_user.invalidate((user_id,)) + result = yield self._private_user_data_id_gen.get_max_token(self) + defer.returnValue(result) + @defer.inlineCallbacks def remove_tag_from_room(self, user_id, room_id, tag): """Remove a tag from a room for a user. @@ -148,6 +159,9 @@ def remove_tag_txn(txn, next_id): self.get_tags_for_user.invalidate((user_id,)) + result = yield self._private_user_data_id_gen.get_max_token(self) + defer.returnValue(result) + def _update_revision_txn(self, txn, user_id, room_id, next_id): """Update the latest revision of the tags for the given user and room. diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 699083ae1249..f0d68b5bf200 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -21,6 +21,7 @@ from synapse.handlers.room import RoomEventSource from synapse.handlers.typing import TypingNotificationEventSource from synapse.handlers.receipts import ReceiptEventSource +from synapse.handlers.private_user_data import PrivateUserDataEventSource class EventSources(object): @@ -29,6 +30,7 @@ class EventSources(object): "presence": PresenceEventSource, "typing": TypingNotificationEventSource, "receipt": ReceiptEventSource, + "private_user_data": PrivateUserDataEventSource, } def __init__(self, hs): @@ -52,5 +54,8 @@ def get_current_token(self, direction='f'): receipt_key=( yield self.sources["receipt"].get_current_key() ), + private_user_data_key=( + yield self.sources["private_user_data"].get_current_key() + ), ) defer.returnValue(token) diff --git a/synapse/types.py b/synapse/types.py index 9cffc33d27d1..8d3a8d88cc71 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -98,10 +98,13 @@ class EventID(DomainSpecificString): class StreamToken( - namedtuple( - "Token", - ("room_key", "presence_key", "typing_key", "receipt_key") - ) + namedtuple("Token", ( + "room_key", + "presence_key", + "typing_key", + "receipt_key", + "private_user_data_key", + )) ): _SEPARATOR = "_" @@ -128,13 +131,14 @@ def room_stream_id(self): else: return int(self.room_key[1:].split("-")[-1]) - def is_after(self, other_token): + def is_after(self, other): """Does this token contain events that the other doesn't?""" return ( - (other_token.room_stream_id < self.room_stream_id) - or (int(other_token.presence_key) < int(self.presence_key)) - or (int(other_token.typing_key) < int(self.typing_key)) - or (int(other_token.receipt_key) < int(self.receipt_key)) + (other.room_stream_id < self.room_stream_id) + or (int(other.presence_key) < int(self.presence_key)) + or (int(other.typing_key) < int(self.typing_key)) + or (int(other.receipt_key) < int(self.receipt_key)) + or (int(other.private_user_data_key) < int(self.private_user_data_key)) ) def copy_and_advance(self, key, new_value): From fdf73c6855f2b043f1af451e77e2413049a21ab2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 30 Oct 2015 16:22:32 +0000 Subject: [PATCH 04/10] Include room tags v1 /initialSync --- synapse/handlers/message.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 024474d5fe63..c5dce3008c7b 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -322,6 +322,8 @@ def snapshot_all_rooms(self, user_id=None, pagin_config=None, user, pagination_config.get_source_config("receipt"), None ) + tags_by_room = yield self.store.get_tags_for_user(user_id) + public_room_ids = yield self.store.get_public_room_ids() limit = pagin_config.limit @@ -398,6 +400,16 @@ def handle_room(event): serialize_event(c, time_now, as_client_event) for c in current_state.values() ] + + private_user_data = [] + tags = tags_by_room.get(event.room_id) + if tags: + private_user_data.append({ + "room_id": event.room_id, + "type": "m.tag", + "content": {"tags": tags}, + }) + d["private_user_data"] = private_user_data except: logger.exception("Failed to get snapshot") From 79b65f387538d1386369fcec142770ea91fdf8a2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 30 Oct 2015 16:28:19 +0000 Subject: [PATCH 05/10] Include tags in v1 room initial sync --- synapse/handlers/message.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c5dce3008c7b..8f156e5c841b 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -459,6 +459,17 @@ def room_initial_sync(self, user_id, room_id, pagin_config=None): result = yield self._room_initial_sync_parted( user_id, room_id, pagin_config, member_event ) + + private_user_data = [] + tags = yield self.store.get_tags_for_room(user_id, room_id) + if tags: + private_user_data.append({ + "type": "m.tag", + "content": {"tags": tags}, + "room_id": room_id, + }) + result["private_user_data"] = private_user_data + defer.returnValue(result) @defer.inlineCallbacks From fb46937413cc0ccbf12063a5743ddf914cd8170a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 30 Oct 2015 16:38:35 +0000 Subject: [PATCH 06/10] Support clients supplying older tokens, fix short poll test --- synapse/types.py | 2 +- tests/rest/client/v1/test_presence.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/types.py b/synapse/types.py index 8d3a8d88cc71..84631d177dc8 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -112,7 +112,7 @@ class StreamToken( def from_string(cls, string): try: keys = string.split(cls._SEPARATOR) - if len(keys) == len(cls._fields) - 1: + while len(keys) < len(cls._fields): # i.e. old token from before receipt_key keys.append("0") return cls(*keys) diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 29d9bbaad4b4..0e3b92224651 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -369,7 +369,7 @@ def test_shortpoll(self): # all be ours # I'll already get my own presence state change - self.assertEquals({"start": "0_1_0_0", "end": "0_1_0_0", "chunk": []}, + self.assertEquals({"start": "0_1_0_0_0", "end": "0_1_0_0_0", "chunk": []}, response ) @@ -388,7 +388,7 @@ def test_shortpoll(self): "/events?from=s0_1_0&timeout=0", None) self.assertEquals(200, code) - self.assertEquals({"start": "s0_1_0_0", "end": "s0_2_0_0", "chunk": [ + self.assertEquals({"start": "s0_1_0_0_0", "end": "s0_2_0_0_0", "chunk": [ {"type": "m.presence", "content": { "user_id": "@banana:test", From ddd8566f415ab3a6092aa4947e5d2aebf67109fc Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 2 Nov 2015 15:11:31 +0000 Subject: [PATCH 07/10] Store room tag content and return the content in the m.tag event --- synapse/handlers/message.py | 6 ++-- synapse/rest/client/v2_alpha/tags.py | 12 +++++-- synapse/storage/schema/delta/25/tags.sql | 1 + synapse/storage/tags.py | 44 +++++++++++++++--------- 4 files changed, 41 insertions(+), 22 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 8f156e5c841b..0f947993d1c9 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -405,7 +405,6 @@ def handle_room(event): tags = tags_by_room.get(event.room_id) if tags: private_user_data.append({ - "room_id": event.room_id, "type": "m.tag", "content": {"tags": tags}, }) @@ -466,7 +465,6 @@ def room_initial_sync(self, user_id, room_id, pagin_config=None): private_user_data.append({ "type": "m.tag", "content": {"tags": tags}, - "room_id": room_id, }) result["private_user_data"] = private_user_data @@ -499,8 +497,8 @@ def _room_initial_sync_parted(self, user_id, room_id, pagin_config, user_id, messages ) - start_token = StreamToken(token[0], 0, 0, 0) - end_token = StreamToken(token[1], 0, 0, 0) + start_token = StreamToken(token[0], 0, 0, 0, 0) + end_token = StreamToken(token[1], 0, 0, 0, 0) time_now = self.clock.time_msec() diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py index 486add9909a8..4e3f917fc5eb 100644 --- a/synapse/rest/client/v2_alpha/tags.py +++ b/synapse/rest/client/v2_alpha/tags.py @@ -16,12 +16,14 @@ from ._base import client_v2_pattern from synapse.http.servlet import RestServlet -from synapse.api.errors import AuthError +from synapse.api.errors import AuthError, SynapseError from twisted.internet import defer import logging +import simplejson as json + logger = logging.getLogger(__name__) @@ -70,7 +72,13 @@ def on_PUT(self, request, user_id, room_id, tag): if user_id != auth_user.to_string(): raise AuthError(403, "Cannot add tags for other users.") - max_id = yield self.store.add_tag_to_room(user_id, room_id, tag) + try: + content_bytes = request.content.read() + body = json.loads(content_bytes) + except: + raise SynapseError(400, "Invalid tag JSON") + + max_id = yield self.store.add_tag_to_room(user_id, room_id, tag, body) yield self.notifier.on_new_event( "private_user_data_key", max_id, users=[user_id] diff --git a/synapse/storage/schema/delta/25/tags.sql b/synapse/storage/schema/delta/25/tags.sql index 168766dcf396..527424c99808 100644 --- a/synapse/storage/schema/delta/25/tags.sql +++ b/synapse/storage/schema/delta/25/tags.sql @@ -18,6 +18,7 @@ CREATE TABLE IF NOT EXISTS room_tags( user_id TEXT NOT NULL, room_id TEXT NOT NULL, tag TEXT NOT NULL, -- The name of the tag. + content TEXT NOT NULL, -- The JSON content of the tag. CONSTRAINT room_tag_uniqueness UNIQUE (user_id, room_id, tag) ); diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 2d5c49144a9d..34aa38c06afe 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -18,6 +18,7 @@ from twisted.internet import defer from .util.id_generators import StreamIdGenerator +import ujson as json import logging logger = logging.getLogger(__name__) @@ -52,14 +53,15 @@ def get_tags_for_user(self, user_id): """ deferred = self._simple_select_list( - "room_tags", {"user_id": user_id}, ["room_id", "tag"] + "room_tags", {"user_id": user_id}, ["room_id", "tag", "content"] ) @deferred.addCallback def tags_by_room(rows): tags_by_room = {} for row in rows: - tags_by_room.setdefault(row["room_id"], []).append(row["tag"]) + room_tags = tags_by_room.setdefault(row["room_id"], {}) + room_tags[row["tag"]] = json.loads(row["content"]) return tags_by_room return deferred @@ -105,31 +107,41 @@ def get_tags_for_room(self, user_id, room_id): Returns: A deferred list of string tags. """ - return self._simple_select_onecol( + return self._simple_select_list( table="room_tags", keyvalues={"user_id": user_id, "room_id": room_id}, - retcol="tag", + retcols=("tag", "content"), desc="get_tags_for_room", - ) + ).addCallback(lambda rows: { + row["tag"]: json.loads(row["content"]) for row in rows + }) @defer.inlineCallbacks - def add_tag_to_room(self, user_id, room_id, tag): + def add_tag_to_room(self, user_id, room_id, tag, content): """Add a tag to a room for a user. + Args: + user_id(str): The user to add a tag for. + room_id(str): The room to add a tag for. + tag(str): The tag name to add. + content(dict): A json object to associate with the tag. Returns: A deferred that completes once the tag has been added. """ + content_json = json.dumps(content) + def add_tag_txn(txn, next_id): - sql = ( - "INSERT INTO room_tags (user_id, room_id, tag)" - " VALUES (?, ?, ?)" + self._simple_upsert_txn( + txn, + table="room_tags", + keyvalues={ + "user_id": user_id, + "room_id": room_id, + "tag": tag, + }, + values={ + "content": content_json, + } ) - try: - txn.execute(sql, (user_id, room_id, tag)) - except self.database_engine.module.IntegrityError: - # Return early if the row is already in the table - # and we don't need to bump the revision number of the - # private_user_data. - return self._update_revision_txn(txn, user_id, room_id, next_id) with (yield self._private_user_data_id_gen.get_next(self)) as next_id: From 57be722c461f7727153d9563f20620f5a0549f5b Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 2 Nov 2015 16:23:15 +0000 Subject: [PATCH 08/10] Include room tags in v2 /sync --- synapse/api/filtering.py | 7 +++ synapse/handlers/sync.py | 69 +++++++++++++++++++++++----- synapse/rest/client/v2_alpha/sync.py | 5 ++ 3 files changed, 70 insertions(+), 11 deletions(-) diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index eb15d8c54a3e..e4e3d1c59d3c 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -147,6 +147,10 @@ def __init__(self, filter_json): self.filter_json.get("room", {}).get("ephemeral", {}) ) + self.room_private_user_data = Filter( + self.filter_json.get("room", {}).get("private_user_data", {}) + ) + self.presence_filter = Filter( self.filter_json.get("presence", {}) ) @@ -172,6 +176,9 @@ def filter_room_timeline(self, events): def filter_room_ephemeral(self, events): return self.room_ephemeral_filter.filter(events) + def filter_room_private_user_data(self, events): + return self.room_private_user_data.filter(events) + class Filter(object): def __init__(self, filter_json): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4c5a2353b2be..ea524fb67358 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -51,6 +51,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ "timeline", "state", "ephemeral", + "private_user_data", ])): __slots__ = [] @@ -58,13 +59,19 @@ def __nonzero__(self): """Make the result appear empty if there are no updates. This is used to tell if room needs to be part of the sync result. """ - return bool(self.timeline or self.state or self.ephemeral) + return bool( + self.timeline + or self.state + or self.ephemeral + or self.private_user_data + ) class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ "room_id", "timeline", "state", + "private_user_data", ])): __slots__ = [] @@ -72,7 +79,11 @@ def __nonzero__(self): """Make the result appear empty if there are no updates. This is used to tell if room needs to be part of the sync result. """ - return bool(self.timeline or self.state) + return bool( + self.timeline + or self.state + or self.private_user_data + ) class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ @@ -197,6 +208,10 @@ def full_state_sync(self, sync_config, timeline_since_token): ) ) + tags_by_room = yield self.store.get_tags_for_user( + sync_config.user.to_string() + ) + joined = [] invited = [] archived = [] @@ -207,7 +222,8 @@ def full_state_sync(self, sync_config, timeline_since_token): sync_config=sync_config, now_token=now_token, timeline_since_token=timeline_since_token, - typing_by_room=typing_by_room + typing_by_room=typing_by_room, + tags_by_room=tags_by_room, ) joined.append(room_sync) elif event.membership == Membership.INVITE: @@ -226,6 +242,7 @@ def full_state_sync(self, sync_config, timeline_since_token): leave_event_id=event.event_id, leave_token=leave_token, timeline_since_token=timeline_since_token, + tags_by_room=tags_by_room, ) archived.append(room_sync) @@ -240,7 +257,7 @@ def full_state_sync(self, sync_config, timeline_since_token): @defer.inlineCallbacks def full_state_sync_for_joined_room(self, room_id, sync_config, now_token, timeline_since_token, - typing_by_room): + typing_by_room, tags_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. @@ -260,8 +277,21 @@ def full_state_sync_for_joined_room(self, room_id, sync_config, timeline=batch, state=current_state_events, ephemeral=typing_by_room.get(room_id, []), + private_user_data=self.private_user_data_for_room( + room_id, tags_by_room + ), )) + def private_user_data_for_room(self, room_id, tags_by_room): + private_user_data = [] + tags = tags_by_room.get(room_id) + if tags: + private_user_data.append({ + "type": "m.tag", + "content": {"tags": tags}, + }) + return private_user_data + @defer.inlineCallbacks def typing_by_room(self, sync_config, now_token, since_token=None): """Get the typing events for each room the user is in @@ -296,7 +326,7 @@ def typing_by_room(self, sync_config, now_token, since_token=None): @defer.inlineCallbacks def full_state_sync_for_archived_room(self, room_id, sync_config, leave_event_id, leave_token, - timeline_since_token): + timeline_since_token, tags_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. @@ -314,6 +344,9 @@ def full_state_sync_for_archived_room(self, room_id, sync_config, room_id=room_id, timeline=batch, state=leave_state[leave_event_id].values(), + private_user_data=self.private_user_data_for_room( + room_id, tags_by_room + ), )) @defer.inlineCallbacks @@ -359,6 +392,11 @@ def incremental_sync_with_gap(self, sync_config, since_token): limit=timeline_limit + 1, ) + tags_by_room = yield self.store.get_updated_tags( + sync_config.user.to_string(), + since_token.private_user_data_key, + ) + joined = [] archived = [] if len(room_events) <= timeline_limit: @@ -399,7 +437,10 @@ def incremental_sync_with_gap(self, sync_config, since_token): limited=limited, ), state=state, - ephemeral=typing_by_room.get(room_id, []) + ephemeral=typing_by_room.get(room_id, []), + private_user_data=self.private_user_data_for_room( + room_id, tags_by_room + ), ) if room_sync: joined.append(room_sync) @@ -416,14 +457,14 @@ def incremental_sync_with_gap(self, sync_config, since_token): for room_id in joined_room_ids: room_sync = yield self.incremental_sync_with_gap_for_room( room_id, sync_config, since_token, now_token, - typing_by_room + typing_by_room, tags_by_room ) if room_sync: joined.append(room_sync) for leave_event in leave_events: room_sync = yield self.incremental_sync_for_archived_room( - sync_config, leave_event, since_token + sync_config, leave_event, since_token, tags_by_room ) archived.append(room_sync) @@ -487,7 +528,7 @@ def load_filtered_recents(self, room_id, sync_config, now_token, @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, since_token, now_token, - typing_by_room): + typing_by_room, tags_by_room): """ Get the incremental delta needed to bring the client up to date for the room. Gives the client the most recent events and the changes to state. @@ -528,7 +569,10 @@ def incremental_sync_with_gap_for_room(self, room_id, sync_config, room_id=room_id, timeline=batch, state=state_events_delta, - ephemeral=typing_by_room.get(room_id, []) + ephemeral=typing_by_room.get(room_id, []), + private_user_data=self.private_user_data_for_room( + room_id, tags_by_room + ), ) logging.debug("Room sync: %r", room_sync) @@ -537,7 +581,7 @@ def incremental_sync_with_gap_for_room(self, room_id, sync_config, @defer.inlineCallbacks def incremental_sync_for_archived_room(self, sync_config, leave_event, - since_token): + since_token, tags_by_room): """ Get the incremental delta needed to bring the client up to date for the archived room. Returns: @@ -578,6 +622,9 @@ def incremental_sync_for_archived_room(self, sync_config, leave_event, room_id=leave_event.room_id, timeline=batch, state=state_events_delta, + private_user_data=self.private_user_data_for_room( + leave_event.room_id, tags_by_room + ), ) logging.debug("Room sync: %r", room_sync) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 1840eef7752e..32a1087c913b 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -220,6 +220,10 @@ def encode_room(room, filter, time_now, token_id, joined=True): ) timeline_event_ids.append(event.event_id) + private_user_data = filter.filter_room_private_user_data( + room.private_user_data + ) + result = { "event_map": event_map, "timeline": { @@ -228,6 +232,7 @@ def encode_room(room, filter, time_now, token_id, joined=True): "limited": room.timeline.limited, }, "state": {"events": state_event_ids}, + "private_user_data": {"events": private_user_data}, } if joined: From 5897e773fd50733fa448b36275130e5441d36f31 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 3 Nov 2015 14:27:35 +0000 Subject: [PATCH 09/10] Spell "deferred" more correctly --- synapse/storage/tags.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 34aa38c06afe..641ea250f05b 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -156,7 +156,7 @@ def add_tag_txn(txn, next_id): def remove_tag_from_room(self, user_id, room_id, tag): """Remove a tag from a room for a user. Returns: - A deffered that completes once the tag has been removed + A deferred that completes once the tag has been removed """ def remove_tag_txn(txn, next_id): sql = ( From 06986e46a3e5187d339aa64d061742cdac0e33a8 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 3 Nov 2015 14:28:51 +0000 Subject: [PATCH 10/10] That TODO was done --- synapse/rest/client/v2_alpha/tags.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py index 4e3f917fc5eb..dcfe6bd20ebf 100644 --- a/synapse/rest/client/v2_alpha/tags.py +++ b/synapse/rest/client/v2_alpha/tags.py @@ -98,7 +98,6 @@ def on_DELETE(self, request, user_id, room_id, tag): "private_user_data_key", max_id, users=[user_id] ) - # TODO: poke the notifier. defer.returnValue((200, {}))