Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add APIs for adding and removing tags from rooms #335

Merged
merged 11 commits into from
Nov 3, 2015
7 changes: 7 additions & 0 deletions synapse/api/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ def __init__(self, filter_json):
self.filter_json.get("room", {}).get("ephemeral", {})
)

self.room_private_user_data = Filter(
self.filter_json.get("room", {}).get("private_user_data", {})
)

self.presence_filter = Filter(
self.filter_json.get("presence", {})
)
Expand All @@ -172,6 +176,9 @@ def filter_room_timeline(self, events):
def filter_room_ephemeral(self, events):
return self.room_ephemeral_filter.filter(events)

def filter_room_private_user_data(self, events):
return self.room_private_user_data.filter(events)


class Filter(object):
def __init__(self, filter_json):
Expand Down
25 changes: 23 additions & 2 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ def snapshot_all_rooms(self, user_id=None, pagin_config=None,
user, pagination_config.get_source_config("receipt"), None
)

tags_by_room = yield self.store.get_tags_for_user(user_id)

public_room_ids = yield self.store.get_public_room_ids()

limit = pagin_config.limit
Expand Down Expand Up @@ -398,6 +400,15 @@ def handle_room(event):
serialize_event(c, time_now, as_client_event)
for c in current_state.values()
]

private_user_data = []
tags = tags_by_room.get(event.room_id)
if tags:
private_user_data.append({
"type": "m.tag",
"content": {"tags": tags},
})
d["private_user_data"] = private_user_data
except:
logger.exception("Failed to get snapshot")

Expand Down Expand Up @@ -447,6 +458,16 @@ def room_initial_sync(self, user_id, room_id, pagin_config=None):
result = yield self._room_initial_sync_parted(
user_id, room_id, pagin_config, member_event
)

private_user_data = []
tags = yield self.store.get_tags_for_room(user_id, room_id)
if tags:
private_user_data.append({
"type": "m.tag",
"content": {"tags": tags},
})
result["private_user_data"] = private_user_data

defer.returnValue(result)

@defer.inlineCallbacks
Expand Down Expand Up @@ -476,8 +497,8 @@ def _room_initial_sync_parted(self, user_id, room_id, pagin_config,
user_id, messages
)

start_token = StreamToken(token[0], 0, 0, 0)
end_token = StreamToken(token[1], 0, 0, 0)
start_token = StreamToken(token[0], 0, 0, 0, 0)
end_token = StreamToken(token[1], 0, 0, 0, 0)

time_now = self.clock.time_msec()

Expand Down
46 changes: 46 additions & 0 deletions synapse/handlers/private_user_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from twisted.internet import defer


class PrivateUserDataEventSource(object):
def __init__(self, hs):
self.store = hs.get_datastore()

def get_current_key(self, direction='f'):
return self.store.get_max_private_user_data_stream_id()

@defer.inlineCallbacks
def get_new_events_for_user(self, user, from_key, limit):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI I'm changing this method signature on a separate branch; I imagine you'll get this in first, though

user_id = user.to_string()
last_stream_id = from_key

current_stream_id = yield self.store.get_max_private_user_data_stream_id()
tags = yield self.store.get_updated_tags(user_id, last_stream_id)

results = []
for room_id, room_tags in tags.items():
results.append({
"type": "m.tag",
"content": {"tags": room_tags},
"room_id": room_id,
})

defer.returnValue((results, current_stream_id))

@defer.inlineCallbacks
def get_pagination_rows(self, user, config, key):
defer.returnValue(([], config.to_id))
69 changes: 58 additions & 11 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,39 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
"timeline",
"state",
"ephemeral",
"private_user_data",
])):
__slots__ = []

def __nonzero__(self):
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
return bool(self.timeline or self.state or self.ephemeral)
return bool(
self.timeline
or self.state
or self.ephemeral
or self.private_user_data
)


class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [
"room_id",
"timeline",
"state",
"private_user_data",
])):
__slots__ = []

def __nonzero__(self):
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
return bool(self.timeline or self.state)
return bool(
self.timeline
or self.state
or self.private_user_data
)


class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
Expand Down Expand Up @@ -197,6 +208,10 @@ def full_state_sync(self, sync_config, timeline_since_token):
)
)

tags_by_room = yield self.store.get_tags_for_user(
sync_config.user.to_string()
)

joined = []
invited = []
archived = []
Expand All @@ -207,7 +222,8 @@ def full_state_sync(self, sync_config, timeline_since_token):
sync_config=sync_config,
now_token=now_token,
timeline_since_token=timeline_since_token,
typing_by_room=typing_by_room
typing_by_room=typing_by_room,
tags_by_room=tags_by_room,
)
joined.append(room_sync)
elif event.membership == Membership.INVITE:
Expand All @@ -226,6 +242,7 @@ def full_state_sync(self, sync_config, timeline_since_token):
leave_event_id=event.event_id,
leave_token=leave_token,
timeline_since_token=timeline_since_token,
tags_by_room=tags_by_room,
)
archived.append(room_sync)

Expand All @@ -240,7 +257,7 @@ def full_state_sync(self, sync_config, timeline_since_token):
@defer.inlineCallbacks
def full_state_sync_for_joined_room(self, room_id, sync_config,
now_token, timeline_since_token,
typing_by_room):
typing_by_room, tags_by_room):
"""Sync a room for a client which is starting without any state
Returns:
A Deferred JoinedSyncResult.
Expand All @@ -260,8 +277,21 @@ def full_state_sync_for_joined_room(self, room_id, sync_config,
timeline=batch,
state=current_state_events,
ephemeral=typing_by_room.get(room_id, []),
private_user_data=self.private_user_data_for_room(
room_id, tags_by_room
),
))

def private_user_data_for_room(self, room_id, tags_by_room):
private_user_data = []
tags = tags_by_room.get(room_id)
if tags:
private_user_data.append({
"type": "m.tag",
"content": {"tags": tags},
})
return private_user_data

@defer.inlineCallbacks
def typing_by_room(self, sync_config, now_token, since_token=None):
"""Get the typing events for each room the user is in
Expand Down Expand Up @@ -296,7 +326,7 @@ def typing_by_room(self, sync_config, now_token, since_token=None):
@defer.inlineCallbacks
def full_state_sync_for_archived_room(self, room_id, sync_config,
leave_event_id, leave_token,
timeline_since_token):
timeline_since_token, tags_by_room):
"""Sync a room for a client which is starting without any state
Returns:
A Deferred JoinedSyncResult.
Expand All @@ -314,6 +344,9 @@ def full_state_sync_for_archived_room(self, room_id, sync_config,
room_id=room_id,
timeline=batch,
state=leave_state[leave_event_id].values(),
private_user_data=self.private_user_data_for_room(
room_id, tags_by_room
),
))

@defer.inlineCallbacks
Expand Down Expand Up @@ -359,6 +392,11 @@ def incremental_sync_with_gap(self, sync_config, since_token):
limit=timeline_limit + 1,
)

tags_by_room = yield self.store.get_updated_tags(
sync_config.user.to_string(),
since_token.private_user_data_key,
)

joined = []
archived = []
if len(room_events) <= timeline_limit:
Expand Down Expand Up @@ -399,7 +437,10 @@ def incremental_sync_with_gap(self, sync_config, since_token):
limited=limited,
),
state=state,
ephemeral=typing_by_room.get(room_id, [])
ephemeral=typing_by_room.get(room_id, []),
private_user_data=self.private_user_data_for_room(
room_id, tags_by_room
),
)
if room_sync:
joined.append(room_sync)
Expand All @@ -416,14 +457,14 @@ def incremental_sync_with_gap(self, sync_config, since_token):
for room_id in joined_room_ids:
room_sync = yield self.incremental_sync_with_gap_for_room(
room_id, sync_config, since_token, now_token,
typing_by_room
typing_by_room, tags_by_room
)
if room_sync:
joined.append(room_sync)

for leave_event in leave_events:
room_sync = yield self.incremental_sync_for_archived_room(
sync_config, leave_event, since_token
sync_config, leave_event, since_token, tags_by_room
)
archived.append(room_sync)

Expand Down Expand Up @@ -487,7 +528,7 @@ def load_filtered_recents(self, room_id, sync_config, now_token,
@defer.inlineCallbacks
def incremental_sync_with_gap_for_room(self, room_id, sync_config,
since_token, now_token,
typing_by_room):
typing_by_room, tags_by_room):
""" Get the incremental delta needed to bring the client up to date for
the room. Gives the client the most recent events and the changes to
state.
Expand Down Expand Up @@ -528,7 +569,10 @@ def incremental_sync_with_gap_for_room(self, room_id, sync_config,
room_id=room_id,
timeline=batch,
state=state_events_delta,
ephemeral=typing_by_room.get(room_id, [])
ephemeral=typing_by_room.get(room_id, []),
private_user_data=self.private_user_data_for_room(
room_id, tags_by_room
),
)

logging.debug("Room sync: %r", room_sync)
Expand All @@ -537,7 +581,7 @@ def incremental_sync_with_gap_for_room(self, room_id, sync_config,

@defer.inlineCallbacks
def incremental_sync_for_archived_room(self, sync_config, leave_event,
since_token):
since_token, tags_by_room):
""" Get the incremental delta needed to bring the client up to date for
the archived room.
Returns:
Expand Down Expand Up @@ -578,6 +622,9 @@ def incremental_sync_for_archived_room(self, sync_config, leave_event,
room_id=leave_event.room_id,
timeline=batch,
state=state_events_delta,
private_user_data=self.private_user_data_for_room(
leave_event.room_id, tags_by_room
),
)

logging.debug("Room sync: %r", room_sync)
Expand Down
2 changes: 1 addition & 1 deletion synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ def on_new_event(self, stream_key, new_token, users=[], rooms=[],

@defer.inlineCallbacks
def wait_for_events(self, user, rooms, timeout, callback,
from_token=StreamToken("s0", "0", "0", "0")):
from_token=StreamToken("s0", "0", "0", "0", "0")):
"""Wait until the callback returns a non empty response or the
timeout fires.
"""
Expand Down
2 changes: 2 additions & 0 deletions synapse/rest/client/v2_alpha/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
receipts,
keys,
tokenrefresh,
tags,
)

from synapse.http.server import JsonResource
Expand All @@ -44,3 +45,4 @@ def register_servlets(client_resource, hs):
receipts.register_servlets(hs, client_resource)
keys.register_servlets(hs, client_resource)
tokenrefresh.register_servlets(hs, client_resource)
tags.register_servlets(hs, client_resource)
5 changes: 5 additions & 0 deletions synapse/rest/client/v2_alpha/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ def encode_room(room, filter, time_now, token_id, joined=True):
)
timeline_event_ids.append(event.event_id)

private_user_data = filter.filter_room_private_user_data(
room.private_user_data
)

result = {
"event_map": event_map,
"timeline": {
Expand All @@ -228,6 +232,7 @@ def encode_room(room, filter, time_now, token_id, joined=True):
"limited": room.timeline.limited,
},
"state": {"events": state_event_ids},
"private_user_data": {"events": private_user_data},
}

if joined:
Expand Down
Loading