Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Implement v2 APIs for send_join and send_leave #6349

Merged
merged 10 commits into from
Dec 11, 2019
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6349.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement v2 APIs for the `send_join` and `send_leave` federation endpoints (as described in [MSC1802](https://github.com/matrix-org/matrix-doc/pull/1802)).
88 changes: 76 additions & 12 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,13 +664,7 @@ def check_authchain_validity(signed_auth_chain):

@defer.inlineCallbacks
def send_request(destination):
time_now = self._clock.time_msec()
_, content = yield self.transport_layer.send_join(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
content = yield self._do_send_join(destination, pdu)

logger.debug("Got content: %s", content)

Expand Down Expand Up @@ -737,6 +731,44 @@ def send_request(destination):

return self._try_destination_list("send_join", destinations, send_request)

@defer.inlineCallbacks
def _do_send_join(self, destination, pdu):
time_now = self._clock.time_msec()

try:
content = yield self.transport_layer.send_join_v2(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)

return content
except HttpResponseException as e:
if e.code in [400, 404]:
err = e.to_synapse_error()

# If we receive an error response that isn't a generic error, or an
# unrecognised endpoint error, we assume that the remote understands
# the v2 invite API and this is a legitimate error.
if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]:
raise err
else:
raise e.to_synapse_error()

logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")

resp = yield self.transport_layer.send_join_v1(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)

# We expect the v1 API to respond with [200, content], so we only return the
# content.
return resp[1]

@defer.inlineCallbacks
def send_invite(self, destination, room_id, event_id, pdu):
room_version = yield self.store.get_room_version(room_id)
Expand Down Expand Up @@ -846,18 +878,50 @@ def send_leave(self, destinations, pdu):

@defer.inlineCallbacks
def send_request(destination):
time_now = self._clock.time_msec()
_, content = yield self.transport_layer.send_leave(
content = yield self._do_send_leave(destination, pdu)

logger.debug("Got content: %s", content)
return None

return self._try_destination_list("send_leave", destinations, send_request)

@defer.inlineCallbacks
def _do_send_leave(self, destination, pdu):
time_now = self._clock.time_msec()

try:
content = yield self.transport_layer.send_leave_v2(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)

logger.debug("Got content: %s", content)
return None
return content
except HttpResponseException as e:
if e.code in [400, 404]:
err = e.to_synapse_error()

return self._try_destination_list("send_leave", destinations, send_request)
# If we receive an error response that isn't a generic error, or an
# unrecognised endpoint error, we assume that the remote understands
# the v2 invite API and this is a legitimate error.
if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]:
raise err
else:
raise e.to_synapse_error()

logger.debug("Couldn't send_leave with the v2 API, falling back to the v1 API")

resp = yield self.transport_layer.send_leave_v1(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)

# We expect the v1 API to respond with [200, content], so we only return the
# content.
return resp[1]

def get_public_rooms(
self,
Expand Down
15 changes: 5 additions & 10 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,15 +376,10 @@ async def on_send_join_request(self, origin, content, room_id):

res_pdus = await self.handler.on_send_join_request(origin, pdu)
time_now = self._clock.time_msec()
return (
200,
{
"state": [p.get_pdu_json(time_now) for p in res_pdus["state"]],
"auth_chain": [
p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]
],
},
)
return {
"state": [p.get_pdu_json(time_now) for p in res_pdus["state"]],
"auth_chain": [p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]],
}

async def on_make_leave_request(self, origin, room_id, user_id):
origin_host, _ = parse_server_name(origin)
Expand All @@ -411,7 +406,7 @@ async def on_send_leave_request(self, origin, content, room_id):
pdu = await self._check_sigs_and_hash(room_version, pdu)

await self.handler.on_send_leave_request(origin, pdu)
return 200, {}
return {}

async def on_event_auth(self, origin, room_id, event_id):
with (await self._server_linearizer.queue((origin, room_id))):
Expand Down
33 changes: 31 additions & 2 deletions synapse/federation/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def make_membership_event(self, destination, room_id, user_id, membership, param

@defer.inlineCallbacks
@log_function
def send_join(self, destination, room_id, event_id, content):
def send_join_v1(self, destination, room_id, event_id, content):
path = _create_v1_path("/send_join/%s/%s", room_id, event_id)

response = yield self.client.put_json(
Expand All @@ -278,7 +278,18 @@ def send_join(self, destination, room_id, event_id, content):

@defer.inlineCallbacks
@log_function
def send_leave(self, destination, room_id, event_id, content):
def send_join_v2(self, destination, room_id, event_id, content):
path = _create_v2_path("/send_join/%s/%s", room_id, event_id)

response = yield self.client.put_json(
destination=destination, path=path, data=content
)

return response

@defer.inlineCallbacks
@log_function
def send_leave_v1(self, destination, room_id, event_id, content):
path = _create_v1_path("/send_leave/%s/%s", room_id, event_id)

response = yield self.client.put_json(
Expand All @@ -294,6 +305,24 @@ def send_leave(self, destination, room_id, event_id, content):

return response

@defer.inlineCallbacks
@log_function
def send_leave_v2(self, destination, room_id, event_id, content):
path = _create_v2_path("/send_leave/%s/%s", room_id, event_id)

response = yield self.client.put_json(
destination=destination,
path=path,
data=content,
# we want to do our best to send this through. The problem is
# that if it fails, we won't retry it later, so if the remote
# server was just having a momentary blip, the room will be out of
# sync.
ignore_backoff=True,
)

return response

@defer.inlineCallbacks
@log_function
def send_invite_v1(self, destination, room_id, event_id, content):
Expand Down
32 changes: 28 additions & 4 deletions synapse/federation/transport/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,9 +506,19 @@ async def on_GET(self, origin, content, query, context, user_id):
return 200, content


class FederationSendLeaveServlet(BaseFederationServlet):
class FederationV1SendLeaveServlet(BaseFederationServlet):
PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"

async def on_PUT(self, origin, content, query, room_id, event_id):
content = await self.handler.on_send_leave_request(origin, content, room_id)
return 200, (200, content)


class FederationV2SendLeaveServlet(BaseFederationServlet):
PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"

PREFIX = FEDERATION_V2_PREFIX

async def on_PUT(self, origin, content, query, room_id, event_id):
content = await self.handler.on_send_leave_request(origin, content, room_id)
return 200, content
Expand All @@ -521,9 +531,21 @@ async def on_GET(self, origin, content, query, context, event_id):
return await self.handler.on_event_auth(origin, context, event_id)


class FederationSendJoinServlet(BaseFederationServlet):
class FederationV1SendJoinServlet(BaseFederationServlet):
PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)"

async def on_PUT(self, origin, content, query, context, event_id):
# TODO(paul): assert that context/event_id parsed from path actually
# match those given in content
content = await self.handler.on_send_join_request(origin, content, context)
return 200, (200, content)


class FederationV2SendJoinServlet(BaseFederationServlet):
PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)"

PREFIX = FEDERATION_V2_PREFIX

async def on_PUT(self, origin, content, query, context, event_id):
# TODO(paul): assert that context/event_id parsed from path actually
# match those given in content
Expand Down Expand Up @@ -1367,8 +1389,10 @@ async def on_GET(self, origin, content, query, room_id):
FederationMakeJoinServlet,
FederationMakeLeaveServlet,
FederationEventServlet,
FederationSendJoinServlet,
FederationSendLeaveServlet,
FederationV1SendJoinServlet,
FederationV2SendJoinServlet,
FederationV1SendLeaveServlet,
FederationV2SendLeaveServlet,
FederationV1InviteServlet,
FederationV2InviteServlet,
FederationQueryAuthServlet,
Expand Down