Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert replication code to async/await. #7987

Merged
merged 2 commits into from
Aug 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7987.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
2 changes: 1 addition & 1 deletion synapse/handlers/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ def register_with_store(
address (str|None): the IP address used to perform the registration.
Returns:
Deferred
Awaitable
"""
if self.hs.config.worker_app:
return self._register_client(
Expand Down
18 changes: 7 additions & 11 deletions synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
from inspect import signature
from typing import Dict, List, Tuple

from twisted.internet import defer

from synapse.api.errors import (
CodeMessageException,
HttpResponseException,
Expand Down Expand Up @@ -101,7 +99,7 @@ def __init__(self, hs):
assert self.METHOD in ("PUT", "POST", "GET")

@abc.abstractmethod
def _serialize_payload(**kwargs):
async def _serialize_payload(**kwargs):
"""Static method that is called when creating a request.

Concrete implementations should have explicit parameters (rather than
Expand All @@ -110,9 +108,8 @@ def _serialize_payload(**kwargs):
argument list.

Returns:
Deferred[dict]|dict: If POST/PUT request then dictionary must be
JSON serialisable, otherwise must be appropriate for adding as
query args.
dict: If POST/PUT request then dictionary must be JSON serialisable,
otherwise must be appropriate for adding as query args.
"""
return {}

Expand Down Expand Up @@ -144,8 +141,7 @@ def make_client(cls, hs):
instance_map = hs.config.worker.instance_map

@trace(opname="outgoing_replication_request")
@defer.inlineCallbacks
def send_request(instance_name="master", **kwargs):
async def send_request(instance_name="master", **kwargs):
if instance_name == local_instance_name:
raise Exception("Trying to send HTTP request to self")
if instance_name == "master":
Expand All @@ -159,7 +155,7 @@ def send_request(instance_name="master", **kwargs):
"Instance %r not in 'instance_map' config" % (instance_name,)
)

data = yield cls._serialize_payload(**kwargs)
data = await cls._serialize_payload(**kwargs)

url_args = [
urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS
Expand Down Expand Up @@ -197,7 +193,7 @@ def send_request(instance_name="master", **kwargs):
headers = {} # type: Dict[bytes, List[bytes]]
inject_active_span_byte_dict(headers, None, check_destination=False)
try:
result = yield request_func(uri, data, headers=headers)
result = await request_func(uri, data, headers=headers)
break
except CodeMessageException as e:
if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
Expand All @@ -207,7 +203,7 @@ def send_request(instance_name="master", **kwargs):

# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
yield clock.sleep(1)
await clock.sleep(1)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/http/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __init__(self, hs):
self.clock = hs.get_clock()

@staticmethod
def _serialize_payload(user_id):
async def _serialize_payload(user_id):
return {}

async def _handle_request(self, request, user_id):
Expand Down
17 changes: 6 additions & 11 deletions synapse/replication/http/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import logging

from twisted.internet import defer

from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import make_event_from_dict
from synapse.events.snapshot import EventContext
Expand Down Expand Up @@ -67,8 +65,7 @@ def __init__(self, hs):
self.federation_handler = hs.get_handlers().federation_handler

@staticmethod
@defer.inlineCallbacks
def _serialize_payload(store, event_and_contexts, backfilled):
async def _serialize_payload(store, event_and_contexts, backfilled):
"""
Args:
store
Expand All @@ -78,9 +75,7 @@ def _serialize_payload(store, event_and_contexts, backfilled):
"""
event_payloads = []
for event, context in event_and_contexts:
serialized_context = yield defer.ensureDeferred(
context.serialize(event, store)
)
serialized_context = await context.serialize(event, store)

event_payloads.append(
{
Expand Down Expand Up @@ -156,7 +151,7 @@ def __init__(self, hs):
self.registry = hs.get_federation_registry()

@staticmethod
def _serialize_payload(edu_type, origin, content):
async def _serialize_payload(edu_type, origin, content):
return {"origin": origin, "content": content}

async def _handle_request(self, request, edu_type):
Expand Down Expand Up @@ -199,7 +194,7 @@ def __init__(self, hs):
self.registry = hs.get_federation_registry()

@staticmethod
def _serialize_payload(query_type, args):
async def _serialize_payload(query_type, args):
"""
Args:
query_type (str)
Expand Down Expand Up @@ -240,7 +235,7 @@ def __init__(self, hs):
self.store = hs.get_datastore()

@staticmethod
def _serialize_payload(room_id, args):
async def _serialize_payload(room_id, args):
"""
Args:
room_id (str)
Expand Down Expand Up @@ -275,7 +270,7 @@ def __init__(self, hs):
self.store = hs.get_datastore()

@staticmethod
def _serialize_payload(room_id, room_version):
async def _serialize_payload(room_id, room_version):
return {"room_version": room_version.identifier}

async def _handle_request(self, request, room_id):
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/http/login.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self, hs):
self.registration_handler = hs.get_registration_handler()

@staticmethod
def _serialize_payload(user_id, device_id, initial_display_name, is_guest):
async def _serialize_payload(user_id, device_id, initial_display_name, is_guest):
"""
Args:
device_id (str|None): Device ID to use, if None a new one is
Expand Down
8 changes: 5 additions & 3 deletions synapse/replication/http/membership.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ def __init__(self, hs):
self.clock = hs.get_clock()

@staticmethod
def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content):
async def _serialize_payload(
requester, room_id, user_id, remote_room_hosts, content
):
"""
Args:
requester(Requester)
Expand Down Expand Up @@ -112,7 +114,7 @@ def __init__(self, hs: "HomeServer"):
self.member_handler = hs.get_room_member_handler()

@staticmethod
def _serialize_payload( # type: ignore
async def _serialize_payload( # type: ignore
invite_event_id: str,
txn_id: Optional[str],
requester: Requester,
Expand Down Expand Up @@ -174,7 +176,7 @@ def __init__(self, hs):
self.distributor = hs.get_distributor()

@staticmethod
def _serialize_payload(room_id, user_id, change):
async def _serialize_payload(room_id, user_id, change):
"""
Args:
room_id (str)
Expand Down
4 changes: 2 additions & 2 deletions synapse/replication/http/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(self, hs: "HomeServer"):
self._presence_handler = hs.get_presence_handler()

@staticmethod
def _serialize_payload(user_id):
async def _serialize_payload(user_id):
return {}

async def _handle_request(self, request, user_id):
Expand Down Expand Up @@ -92,7 +92,7 @@ def __init__(self, hs: "HomeServer"):
self._presence_handler = hs.get_presence_handler()

@staticmethod
def _serialize_payload(user_id, state, ignore_status_msg=False):
async def _serialize_payload(user_id, state, ignore_status_msg=False):
return {
"state": state,
"ignore_status_msg": ignore_status_msg,
Expand Down
4 changes: 2 additions & 2 deletions synapse/replication/http/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(self, hs):
self.registration_handler = hs.get_registration_handler()

@staticmethod
def _serialize_payload(
async def _serialize_payload(
user_id,
password_hash,
was_guest,
Expand Down Expand Up @@ -105,7 +105,7 @@ def __init__(self, hs):
self.registration_handler = hs.get_registration_handler()

@staticmethod
def _serialize_payload(user_id, auth_result, access_token):
async def _serialize_payload(user_id, auth_result, access_token):
"""
Args:
user_id (str): The user ID that consented
Expand Down
7 changes: 2 additions & 5 deletions synapse/replication/http/send_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import logging

from twisted.internet import defer

from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import make_event_from_dict
from synapse.events.snapshot import EventContext
Expand Down Expand Up @@ -62,8 +60,7 @@ def __init__(self, hs):
self.clock = hs.get_clock()

@staticmethod
@defer.inlineCallbacks
def _serialize_payload(
async def _serialize_payload(
event_id, store, event, context, requester, ratelimit, extra_users
):
"""
Expand All @@ -77,7 +74,7 @@ def _serialize_payload(
extra_users (list(UserID)): Any extra users to notify about event
"""

serialized_context = yield defer.ensureDeferred(context.serialize(event, store))
serialized_context = await context.serialize(event, store)

payload = {
"event": event.get_pdu_json(),
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/http/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(self, hs):
self.streams = hs.get_replication_streams()

@staticmethod
def _serialize_payload(stream_name, from_token, upto_token):
async def _serialize_payload(stream_name, from_token, upto_token):
return {"from_token": from_token, "upto_token": upto_token}

async def _handle_request(self, request, stream_name):
Expand Down