Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add groups to sync stream
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Jul 20, 2017
1 parent b3bf6a1 commit c544188
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 8 deletions.
64 changes: 63 additions & 1 deletion synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,17 @@ def __nonzero__(self):
return True


class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
"join",
"invite",
"leave",
])):
__slots__ = []

def __nonzero__(self):
return self.join or self.invite or self.leave


class SyncResult(collections.namedtuple("SyncResult", [
"next_batch", # Token for the next sync
"presence", # List of presence events for the user.
Expand All @@ -119,6 +130,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
"device_lists", # List of user_ids whose devices have chanegd
"device_one_time_keys_count", # Dict of algorithm to count for one time keys
# for this device
"groups",
])):
__slots__ = []

Expand All @@ -134,7 +146,8 @@ def __nonzero__(self):
self.archived or
self.account_data or
self.to_device or
self.device_lists
self.device_lists or
self.groups
)


Expand Down Expand Up @@ -560,6 +573,8 @@ def generate_sync_result(self, sync_config, since_token=None, full_state=False):
user_id, device_id
)

yield self._generate_sync_entry_for_groups(sync_result_builder)

defer.returnValue(SyncResult(
presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data,
Expand All @@ -568,10 +583,56 @@ def generate_sync_result(self, sync_config, since_token=None, full_state=False):
archived=sync_result_builder.archived,
to_device=sync_result_builder.to_device,
device_lists=device_lists,
groups=sync_result_builder.groups,
device_one_time_keys_count=one_time_key_counts,
next_batch=sync_result_builder.now_token,
))

@measure_func("_generate_sync_entry_for_groups")
@defer.inlineCallbacks
def _generate_sync_entry_for_groups(self, sync_result_builder):
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token

if since_token and since_token.groups_key:
results = yield self.store.get_groups_changes_for_user(
user_id, since_token.groups_key, now_token.groups_key,
)
else:
results = yield self.store.get_all_groups_for_user(
user_id, now_token.groups_key,
)

invited = {}
joined = {}
left = {}
for result in results:
membership = result["membership"]
group_id = result["group_id"]
gtype = result["type"]
content = result["content"]

if membership == "join":
if gtype == "membership":
content.pop("membership", None)
invited[group_id] = content["content"]
else:
joined.setdefault(group_id, {})[gtype] = content
elif membership == "invite":
if gtype == "membership":
content.pop("membership", None)
invited[group_id] = content["content"]
else:
if gtype == "membership":
left[group_id] = content["content"]

sync_result_builder.groups = GroupsSyncResult(
join=joined,
invite=invited,
leave=left,
)

@measure_func("_generate_sync_entry_for_device_list")
@defer.inlineCallbacks
def _generate_sync_entry_for_device_list(self, sync_result_builder):
Expand Down Expand Up @@ -1260,6 +1321,7 @@ def __init__(self, sync_config, full_state, since_token, now_token):
self.invited = []
self.archived = []
self.device = []
self.groups = None


class RoomSyncResultBuilder(object):
Expand Down
5 changes: 5 additions & 0 deletions synapse/rest/client/v2_alpha/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ def encode_response(time_now, sync_result, access_token_id, filter):
"invite": invited,
"leave": archived,
},
"groups": {
"join": sync_result.groups.join,
"invite": sync_result.groups.invite,
"leave": sync_result.groups.leave,
},
"device_one_time_keys_count": sync_result.device_one_time_keys_count,
"next_batch": sync_result.next_batch.to_string(),
}
Expand Down
15 changes: 15 additions & 0 deletions synapse/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ def __init__(self, db_conn, hs):
db_conn, "pushers", "id",
extra_tables=[("deleted_pushers", "stream_id")],
)
self._group_updates_id_gen = StreamIdGenerator(
db_conn, "local_group_updates", "stream_id",
)

if isinstance(self.database_engine, PostgresEngine):
self._cache_id_gen = StreamIdGenerator(
Expand Down Expand Up @@ -236,6 +239,18 @@ def __init__(self, db_conn, hs):
prefilled_cache=curr_state_delta_prefill,
)

_group_updates_prefill, min_group_updates_id = self._get_cache_dict(
db_conn, "local_group_updates",
entity_column="user_id",
stream_column="stream_id",
max_value=self._group_updates_id_gen.get_current_token(),
limit=1000,
)
self._group_updates_stream_cache = StreamChangeCache(
"_group_updates_stream_cache", min_group_updates_id,
prefilled_cache=_group_updates_prefill,
)

cur = LoggingTransaction(
db_conn.cursor(),
name="_find_stream_orderings_for_times_txn",
Expand Down
68 changes: 63 additions & 5 deletions synapse/storage/group_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ def register_user_group_membership(self, group_id, user_id, membership,
remote_attestation (dict): If remote group then store the remote
attestation from the group, else None.
"""
def _register_user_group_membership_txn(txn):
def _register_user_group_membership_txn(txn, next_id):
# TODO: Upsert?
self._simple_delete_txn(
txn,
Expand All @@ -798,6 +798,19 @@ def _register_user_group_membership_txn(txn):
},
)

self._simple_insert_txn(
txn,
table="local_group_updates",
values={
"stream_id": next_id,
"group_id": group_id,
"user_id": user_id,
"type": "membership",
"content": json.dumps({"membership": membership, "content": content}),
}
)
self._group_updates_stream_cache.entity_has_changed(user_id, next_id)

# TODO: Insert profile to ensure it comes down stream if its a join.

if membership == "join":
Expand Down Expand Up @@ -840,10 +853,11 @@ def _register_user_group_membership_txn(txn):
},
)

yield self.runInteraction(
"register_user_group_membership",
_register_user_group_membership_txn,
)
with self._group_updates_id_gen.get_next() as next_id:
yield self.runInteraction(
"register_user_group_membership",
_register_user_group_membership_txn, next_id,
)

@defer.inlineCallbacks
def create_group(self, group_id, user_id, name, avatar_url, short_description,
Expand Down Expand Up @@ -937,3 +951,47 @@ def get_joined_groups(self, user_id):
retcol="group_id",
desc="get_joined_groups",
)

def get_all_groups_for_user(self, user_id, now_token):
def _get_all_groups_for_user_txn(txn):
sql = """
SELECT group_id, type, membership, u.content
FROM local_group_updates AS u
INNER JOIN local_group_membership USING (group_id, user_id)
WHERE user_id = ? AND membership != 'leave'
AND stream_id <= ?
"""
txn.execute(sql, (user_id, now_token,))
return self.cursor_to_dict(txn)
return self.runInteraction(
"get_all_groups_for_user", _get_all_groups_for_user_txn,
)

def get_groups_changes_for_user(self, user_id, from_token, to_token):
from_token = int(from_token)
has_changed = self._group_updates_stream_cache.has_entity_changed(
user_id, from_token,
)
if not has_changed:
return []

def _get_groups_changes_for_user_txn(txn):
sql = """
SELECT group_id, membership, type, u.content
FROM local_group_updates AS u
INNER JOIN local_group_membership USING (group_id, user_id)
WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
"""
txn.execute(sql, (user_id, from_token, to_token,))
return [{
"group_id": group_id,
"membership": membership,
"type": gtype,
"content": json.loads(content_json),
} for group_id, membership, gtype, content_json in txn]
return self.runInteraction(
"get_groups_changes_for_user", _get_groups_changes_for_user_txn,
)

def get_group_stream_token(self):
return self._group_updates_id_gen.get_current_token()
9 changes: 9 additions & 0 deletions synapse/storage/schema/delta/43/group_server.sql
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,12 @@ CREATE TABLE local_group_membership (

CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id);
CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id);


CREATE TABLE local_group_updates (
stream_id BIGINT NOT NULL,
group_id TEXT NOT NULL,
user_id TEXT NOT NULL,
type TEXT NOT NULL,
content TEXT NOT NULL
);
2 changes: 2 additions & 0 deletions synapse/streams/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def get_current_token(self):
push_rules_key, _ = self.store.get_push_rules_stream_token()
to_device_key = self.store.get_to_device_stream_token()
device_list_key = self.store.get_device_stream_token()
groups_key = self.store.get_group_stream_token()

token = StreamToken(
room_key=(
Expand All @@ -65,6 +66,7 @@ def get_current_token(self):
push_rules_key=push_rules_key,
to_device_key=to_device_key,
device_list_key=device_list_key,
groups_key=groups_key,
)
defer.returnValue(token)

Expand Down
2 changes: 2 additions & 0 deletions synapse/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ class StreamToken(
"push_rules_key",
"to_device_key",
"device_list_key",
"groups_key",
))
):
_SEPARATOR = "_"
Expand Down Expand Up @@ -209,6 +210,7 @@ def is_after(self, other):
or (int(other.push_rules_key) < int(self.push_rules_key))
or (int(other.to_device_key) < int(self.to_device_key))
or (int(other.device_list_key) < int(self.device_list_key))
or (int(other.groups_key) < int(self.groups_key))
)

def copy_and_advance(self, key, new_value):
Expand Down
4 changes: 2 additions & 2 deletions tests/rest/client/v1/test_rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,7 @@ def _insert_client_ip(*args, **kwargs):

@defer.inlineCallbacks
def test_topo_token_is_accepted(self):
token = "t1-0_0_0_0_0_0_0_0"
token = "t1-0_0_0_0_0_0_0_0_0"
(code, response) = yield self.mock_resource.trigger_get(
"/rooms/%s/messages?access_token=x&from=%s" %
(self.room_id, token))
Expand All @@ -1044,7 +1044,7 @@ def test_topo_token_is_accepted(self):

@defer.inlineCallbacks
def test_stream_token_is_accepted_for_fwd_pagianation(self):
token = "s0_0_0_0_0_0_0_0"
token = "s0_0_0_0_0_0_0_0_0"
(code, response) = yield self.mock_resource.trigger_get(
"/rooms/%s/messages?access_token=x&from=%s" %
(self.room_id, token))
Expand Down

0 comments on commit c544188

Please sign in to comment.