Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Use stable identifiers for faster joins (#14832)
Browse files Browse the repository at this point in the history
* Use new query param when requesting a partial join

* Read new query param when serving partial join

* Provide new field names when serving partial joins

* Read new field names from partial join response

* Changelog
  • Loading branch information
David Robertson authored Jan 13, 2023
1 parent 73ff493 commit 52ae80d
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 24 deletions.
1 change: 1 addition & 0 deletions changelog.d/14832.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Faster joins: use stable identifiers from [MSC3706](https://github.com/matrix-org/matrix-spec-proposals/pull/3706).
2 changes: 2 additions & 0 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -725,10 +725,12 @@ async def on_send_join_request(
"state": [p.get_pdu_json(time_now) for p in state_events],
"auth_chain": [p.get_pdu_json(time_now) for p in auth_chain_events],
"org.matrix.msc3706.partial_state": caller_supports_partial_state,
"members_omitted": caller_supports_partial_state,
}

if servers_in_room is not None:
resp["org.matrix.msc3706.servers_in_room"] = list(servers_in_room)
resp["servers_in_room"] = list(servers_in_room)

return resp

Expand Down
18 changes: 18 additions & 0 deletions synapse/federation/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ async def send_join_v2(
if self._faster_joins_enabled:
# lazy-load state on join
query_params["org.matrix.msc3706.partial_state"] = "true"
query_params["omit_members"] = "true"

return await self.client.put_json(
destination=destination,
Expand Down Expand Up @@ -909,6 +910,14 @@ def __init__(self, room_version: RoomVersion, v1_api: bool):
use_float="True",
)
)
# The stable field name comes last, so it "wins" if the fields disagree
self._coros.append(
ijson.items_coro(
_partial_state_parser(self._response),
"members_omitted",
use_float="True",
)
)

self._coros.append(
ijson.items_coro(
Expand All @@ -918,6 +927,15 @@ def __init__(self, room_version: RoomVersion, v1_api: bool):
)
)

# Again, stable field name comes last
self._coros.append(
ijson.items_coro(
_servers_in_room_parser(self._response),
"servers_in_room",
use_float="True",
)
)

def write(self, data: bytes) -> int:
for c in self._coros:
c.send(data)
Expand Down
13 changes: 10 additions & 3 deletions synapse/federation/transport/server/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,16 @@ async def on_PUT(

partial_state = False
if self._msc3706_enabled:
partial_state = parse_boolean_from_args(
query, "org.matrix.msc3706.partial_state", default=False
)
# The stable query parameter wins, if it disagrees with the unstable
# parameter for some reason.
stable_param = parse_boolean_from_args(query, "omit_members", default=None)
if stable_param is not None:
partial_state = stable_param
else:
partial_state = parse_boolean_from_args(
query, "org.matrix.msc3706.partial_state", default=False
)

result = await self.handler.on_send_join_request(
origin, content, room_id, caller_supports_partial_state=partial_state
)
Expand Down
2 changes: 1 addition & 1 deletion tests/federation/test_federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def test_send_join_partial_state(self) -> None:
)
channel = self.make_signed_federation_request(
"PUT",
f"/_matrix/federation/v2/send_join/{self._room_id}/x?org.matrix.msc3706.partial_state=true",
f"/_matrix/federation/v2/send_join/{self._room_id}/x?omit_members=true",
content=join_event_dict,
)
self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
Expand Down
77 changes: 57 additions & 20 deletions tests/federation/transport/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
# limitations under the License.

import json
from typing import List, Optional
from unittest.mock import Mock

import ijson.common

from synapse.api.room_versions import RoomVersions
from synapse.federation.transport.client import SendJoinParser
from synapse.types import JsonDict
from synapse.util import ExceptionBundle

from tests.unittest import TestCase
Expand Down Expand Up @@ -71,33 +73,68 @@ def test_two_writes(self) -> None:

def test_partial_state(self) -> None:
"""Check that the partial_state flag is correctly parsed"""
parser = SendJoinParser(RoomVersions.V1, False)
response = {
"org.matrix.msc3706.partial_state": True,
}

serialised_response = json.dumps(response).encode()
def parse(response: JsonDict) -> bool:
parser = SendJoinParser(RoomVersions.V1, False)
serialised_response = json.dumps(response).encode()

# Send data to the parser
parser.write(serialised_response)
# Send data to the parser
parser.write(serialised_response)

# Retrieve and check the parsed SendJoinResponse
parsed_response = parser.finish()
self.assertTrue(parsed_response.partial_state)
# Retrieve and check the parsed SendJoinResponse
parsed_response = parser.finish()
return parsed_response.partial_state

def test_servers_in_room(self) -> None:
"""Check that the servers_in_room field is correctly parsed"""
parser = SendJoinParser(RoomVersions.V1, False)
response = {"org.matrix.msc3706.servers_in_room": ["hs1", "hs2"]}
self.assertTrue(parse({"members_omitted": True}))
self.assertTrue(parse({"org.matrix.msc3706.partial_state": True}))

serialised_response = json.dumps(response).encode()
self.assertFalse(parse({"members_omitted": False}))
self.assertFalse(parse({"org.matrix.msc3706.partial_state": False}))

# Send data to the parser
parser.write(serialised_response)
# If there's a conflict, the stable field wins.
self.assertTrue(
parse({"members_omitted": True, "org.matrix.msc3706.partial_state": False})
)
self.assertFalse(
parse({"members_omitted": False, "org.matrix.msc3706.partial_state": True})
)

# Retrieve and check the parsed SendJoinResponse
parsed_response = parser.finish()
self.assertEqual(parsed_response.servers_in_room, ["hs1", "hs2"])
def test_servers_in_room(self) -> None:
"""Check that the servers_in_room field is correctly parsed"""

def parse(response: JsonDict) -> Optional[List[str]]:
parser = SendJoinParser(RoomVersions.V1, False)
serialised_response = json.dumps(response).encode()

# Send data to the parser
parser.write(serialised_response)

# Retrieve and check the parsed SendJoinResponse
parsed_response = parser.finish()
return parsed_response.servers_in_room

self.assertEqual(
parse({"org.matrix.msc3706.servers_in_room": ["hs1", "hs2"]}),
["hs1", "hs2"],
)
self.assertEqual(parse({"servers_in_room": ["example.com"]}), ["example.com"])

# If both are provided, the stable identifier should win
self.assertEqual(
parse(
{
"org.matrix.msc3706.servers_in_room": ["old"],
"servers_in_room": ["new"],
}
),
["new"],
)

# And lastly, we should be able to tell if neither field was present.
self.assertEqual(
parse({}),
None,
)

def test_errors_closing_coroutines(self) -> None:
"""Check we close all coroutines, even if closing the first raises an Exception.
Expand Down

0 comments on commit 52ae80d

Please sign in to comment.