From 334d4f15e478584d1ae4d7844ffc10d1f834a34c Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 12 Jun 2018 19:25:53 +1000 Subject: [PATCH 01/14] explicitly pass around a Twisted reactor, don't use the global one --- synapse/handlers/auth.py | 35 ++++++++++++++++-------- synapse/http/matrixfederationclient.py | 1 + synapse/notifier.py | 3 ++ synapse/rest/client/v1/register.py | 2 +- synapse/rest/client/v2_alpha/register.py | 2 +- synapse/rest/media/v1/media_storage.py | 3 +- synapse/server.py | 16 ++++++++--- synapse/storage/client_ips.py | 6 ++-- synapse/storage/events_worker.py | 6 ++-- synapse/util/__init__.py | 13 ++++++--- synapse/util/async.py | 18 +++++++----- synapse/util/file_consumer.py | 15 ++++++---- tests/utils.py | 7 ++++- 13 files changed, 86 insertions(+), 41 deletions(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 912136534df6..22587b5ba791 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -13,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from twisted.internet import defer, threads from ._base import BaseHandler @@ -33,6 +34,7 @@ import bcrypt import pymacaroons import simplejson +import attr import synapse.util.stringutils as stringutils @@ -425,12 +427,12 @@ def _check_msisdn(self, authdict, _): @defer.inlineCallbacks def _check_dummy_auth(self, authdict, _): - yield run_on_reactor() + yield run_on_reactor(self.hs.get_clock()) defer.returnValue(True) @defer.inlineCallbacks def _check_threepid(self, medium, authdict): - yield run_on_reactor() + yield run_on_reactor(self.hs.get_clock()) if 'threepid_creds' not in authdict: raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM) @@ -858,7 +860,11 @@ def _do_hash(): return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper, bcrypt.gensalt(self.bcrypt_rounds)) - return make_deferred_yieldable(threads.deferToThread(_do_hash)) + return make_deferred_yieldable( + threads.deferToThreadPool( + self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash + ) + ) def validate_hash(self, password, stored_hash): """Validates that self.hash(password) == stored_hash. 
@@ -878,16 +884,21 @@ def _do_validate_hash(): ) if stored_hash: - return make_deferred_yieldable(threads.deferToThread(_do_validate_hash)) + return make_deferred_yieldable( + threads.deferToThreadPool( + self.hs.get_reactor(), + self.hs.get_reactor().getThreadPool(), + _do_validate_hash, + ) + ) else: return defer.succeed(False) -class MacaroonGeneartor(object): - def __init__(self, hs): - self.clock = hs.get_clock() - self.server_name = hs.config.server_name - self.macaroon_secret_key = hs.config.macaroon_secret_key +@attr.s +class MacaroonGenerator(object): + + hs = attr.ib() def generate_access_token(self, user_id, extra_caveats=None): extra_caveats = extra_caveats or [] @@ -905,7 +916,7 @@ def generate_access_token(self, user_id, extra_caveats=None): def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)): macaroon = self._generate_base_macaroon(user_id) macaroon.add_first_party_caveat("type = login") - now = self.clock.time_msec() + now = self.hs.get_clock().time_msec() expiry = now + duration_in_ms macaroon.add_first_party_caveat("time < %d" % (expiry,)) return macaroon.serialize() @@ -917,9 +928,9 @@ def generate_delete_pusher_token(self, user_id): def _generate_base_macaroon(self, user_id): macaroon = pymacaroons.Macaroon( - location=self.server_name, + location=self.hs.config.server_name, identifier="key", - key=self.macaroon_secret_key) + key=self.hs.config.macaroon_secret_key) macaroon.add_first_party_caveat("gen = 1") macaroon.add_first_party_caveat("user_id = %s" % (user_id,)) return macaroon diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 993dc06e02b7..50c56ffe6ebc 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -194,6 +194,7 @@ def _request(self, destination, method, path, request_deferred, timeout / 1000. 
if timeout else 60, cancelled_to_request_timed_out_error, + reactor=self.hs.get_reactor() ) response = yield make_deferred_yieldable( request_deferred, diff --git a/synapse/notifier.py b/synapse/notifier.py index 6dce20a284e3..a0a4a8bb7661 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -161,6 +161,7 @@ def __init__(self, hs): self.user_to_user_stream = {} self.room_to_user_streams = {} + self.hs = hs self.event_sources = hs.get_event_sources() self.store = hs.get_datastore() self.pending_new_room_events = [] @@ -340,6 +341,7 @@ def wait_for_events(self, user_id, timeout, callback, room_ids=None, add_timeout_to_deferred( listener.deferred, (end_time - now) / 1000., + reactor=self.hs.get_reactor() ) with PreserveLoggingContext(): yield listener.deferred @@ -561,6 +563,7 @@ def wait_for_replication(self, callback, timeout): add_timeout_to_deferred( listener.deferred.addTimeout, (end_time - now) / 1000., + reactor=self.hs.get_reactor() ) try: with PreserveLoggingContext(): diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index 9b3022e0b01b..ecd059659ddf 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -423,7 +423,7 @@ def on_OPTIONS(self, request): @defer.inlineCallbacks def _do_create(self, requester, user_json): - yield run_on_reactor() + yield run_on_reactor(self.hs.get_clock()) if "localpart" not in user_json: raise SynapseError(400, "Expected 'localpart' key.") diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 5cab00aea959..0520787f37c5 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -191,7 +191,7 @@ def __init__(self, hs): @interactive_auth_handler @defer.inlineCallbacks def on_POST(self, request): - yield run_on_reactor() + yield run_on_reactor(self.hs.get_clock()) body = parse_json_object_from_request(request) diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index d23fe10b074f..eae35ab377af 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -175,7 +175,8 @@ def ensure_media_is_in_local_cache(self, file_info): res = yield provider.fetch(path, file_info) if res: with res: - consumer = BackgroundFileConsumer(open(local_path, "w")) + consumer = BackgroundFileConsumer( + open(local_path, "w"), self.hs.get_reactor()) yield res.write_to_consumer(consumer) yield consumer.wait() defer.returnValue(local_path) diff --git a/synapse/server.py b/synapse/server.py index 58dbf78437a1..5a2ca0f7a5d2 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -40,7 +40,7 @@ from synapse.federation.transaction_queue import TransactionQueue from synapse.handlers import Handlers from synapse.handlers.appservice import ApplicationServicesHandler -from synapse.handlers.auth import AuthHandler, MacaroonGeneartor +from synapse.handlers.auth import AuthHandler, MacaroonGenerator from synapse.handlers.deactivate_account import DeactivateAccountHandler from synapse.handlers.devicemessage import DeviceMessageHandler from synapse.handlers.device import DeviceHandler @@ -165,15 +165,19 @@ def build_DEPENDENCY(self) 'server_notices_sender', ] - def __init__(self, hostname, **kwargs): + def __init__(self, hostname, reactor=None, **kwargs): """ Args: hostname : The hostname for the server. 
""" + if not reactor: + from twisted.internet import reactor + + self._reactor = reactor self.hostname = hostname self._building = {} - self.clock = Clock() + self.clock = Clock(reactor) self.distributor = Distributor() self.ratelimiter = Ratelimiter() @@ -186,6 +190,9 @@ def setup(self): self.datastore = DataStore(self.get_db_conn(), self) logger.info("Finished setting up.") + def get_reactor(self): + return self._reactor + def get_ip_from_request(self, request): # X-Forwarded-For is handled by our custom request type. return request.getClientIP() @@ -261,7 +268,7 @@ def build_auth_handler(self): return AuthHandler(self) def build_macaroon_generator(self): - return MacaroonGeneartor(self) + return MacaroonGenerator(self) def build_device_handler(self): return DeviceHandler(self) @@ -328,6 +335,7 @@ def build_db_pool(self): return adbapi.ConnectionPool( name, + cp_reactor=self.get_reactor(), **self.db_config.get("args", {}) ) diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index ce338514e8a0..968d2fed2231 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -15,7 +15,7 @@ import logging -from twisted.internet import defer, reactor +from twisted.internet import defer from ._base import Cache from . import background_updates @@ -70,7 +70,9 @@ def __init__(self, db_conn, hs): self._client_ip_looper = self._clock.looping_call( self._update_client_ips_batch, 5 * 1000 ) - reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch) + self.hs.get_reactor().addSystemEventTrigger( + "before", "shutdown", self._update_client_ips_batch + ) def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id, now=None): diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 32d9d00ffbd5..38fcf7d444f6 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -14,7 +14,7 @@ # limitations under the License. 
from ._base import SQLBaseStore -from twisted.internet import defer, reactor +from twisted.internet import defer from synapse.events import FrozenEvent from synapse.events.utils import prune_event @@ -265,7 +265,7 @@ def fire(lst, res): except Exception: logger.exception("Failed to callback") with PreserveLoggingContext(): - reactor.callFromThread(fire, event_list, row_dict) + self.hs.get_reactor().callFromThread(fire, event_list, row_dict) except Exception as e: logger.exception("do_fetch") @@ -278,7 +278,7 @@ def fire(evs): if event_list: with PreserveLoggingContext(): - reactor.callFromThread(fire, event_list) + self.hs.get_reactor().callFromThread(fire, event_list) @defer.inlineCallbacks def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index fc11e2662379..9ef7c48ea238 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -15,9 +15,8 @@ from synapse.util.logcontext import PreserveLoggingContext -from twisted.internet import defer, reactor, task +from twisted.internet import defer, task -import time import logging from itertools import islice @@ -38,9 +37,14 @@ class Clock(object): TODO(paul): Also move the sleep() functionality into it """ + def __init__(self, reactor=None): + if not reactor: + from twisted.internet import reactor + self._reactor = reactor + def time(self): """Returns the current system time in seconds since epoch.""" - return time.time() + return self._reactor.seconds() def time_msec(self): """Returns the current system time in miliseconds since epoch.""" @@ -56,6 +60,7 @@ def looping_call(self, f, msec): msec(float): How long to wait between calls in milliseconds. """ call = task.LoopingCall(f) + call.clock = self._reactor call.start(msec / 1000.0, now=False) return call @@ -73,7 +78,7 @@ def wrapped_callback(*args, **kwargs): callback(*args, **kwargs) with PreserveLoggingContext(): - return reactor.callLater(delay, wrapped_callback, *args, **kwargs) + return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) def cancel_call_later(self, timer, ignore_errs=False): try: diff --git a/synapse/util/async.py b/synapse/util/async.py index 9dd4e6b5bca5..3484b21dbc93 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -14,14 +14,14 @@ # limitations under the License. 
-from twisted.internet import defer, reactor +from twisted.internet import defer from twisted.internet.defer import CancelledError from twisted.python import failure from .logcontext import ( PreserveLoggingContext, make_deferred_yieldable, run_in_background ) -from synapse.util import logcontext, unwrapFirstError +from synapse.util import logcontext, unwrapFirstError, Clock from contextlib import contextmanager @@ -33,19 +33,23 @@ @defer.inlineCallbacks -def sleep(seconds): +def sleep(seconds, clock=None): + if not clock: + from twisted.internet import reactor + clock = Clock(reactor) + d = defer.Deferred() with PreserveLoggingContext(): - reactor.callLater(seconds, d.callback, seconds) + clock._reactor.callLater(seconds, d.callback, seconds) res = yield d defer.returnValue(res) -def run_on_reactor(): +def run_on_reactor(clock=None): """ This will cause the rest of the function to be invoked upon the next iteration of the main loop """ - return sleep(0) + return sleep(0, clock=clock) class ObservableDeferred(object): @@ -404,7 +408,7 @@ class DeferredTimeoutError(Exception): """ -def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None): +def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None, reactor=None): """ Add a timeout to a deferred by scheduling it to be cancelled after timeout seconds. diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 3380970e4e3a..e57769978d5c 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import threads, reactor +from twisted.internet import threads from synapse.util.logcontext import make_deferred_yieldable, run_in_background @@ -34,9 +34,11 @@ class BackgroundFileConsumer(object): # And resume once the size of the queue is less than this _RESUME_ON_QUEUE_SIZE = 2 - def __init__(self, file_obj): + def __init__(self, file_obj, reactor): self._file_obj = file_obj + self._reactor = reactor + # Producer we're registered with self._producer = None @@ -71,7 +73,10 @@ def registerProducer(self, producer, streaming): self._producer = producer self.streaming = streaming self._finished_deferred = run_in_background( - threads.deferToThread, self._writer + threads.deferToThreadPool, + self._reactor, + self._reactor.threadpool, + self._writer, ) if not streaming: self._producer.resumeProducing() @@ -109,7 +114,7 @@ def _writer(self): # producer. if self._producer and self._paused_producer: if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE: - reactor.callFromThread(self._resume_paused_producer) + self._runreactor.callFromThread(self._resume_paused_producer) bytes = self._bytes_queue.get() @@ -121,7 +126,7 @@ def _writer(self): # If its a pull producer then we need to explicitly ask for # more stuff. if not self.streaming and self._producer: - reactor.callFromThread(self._producer.resumeProducing) + self._reactor.callFromThread(self._producer.resumeProducing) except Exception as e: self._write_exception = e raise diff --git a/tests/utils.py b/tests/utils.py index 262c4a5714c7..189fd2711cc7 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -37,11 +37,15 @@ @defer.inlineCallbacks -def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): +def setup_test_homeserver(name="test", datastore=None, config=None, reactor=None, + **kargs): """Setup a homeserver suitable for running tests against. 
Keyword arguments are passed to the Homeserver constructor. If no datastore is supplied a datastore backed by an in-memory sqlite db will be given to the HS. """ + if reactor is None: + from twisted.internet import reactor + if config is None: config = Mock() config.signing_key = [MockKey()] @@ -110,6 +114,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): database_engine=db_engine, room_list_handler=object(), tls_server_context_factory=Mock(), + reactor=reactor, **kargs ) db_conn = hs.get_db_conn() From 6f2b56b0b18a20bd95bd8d2fc5e629c5bb762500 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 12 Jun 2018 19:34:58 +1000 Subject: [PATCH 02/14] move sleep into the clock --- synapse/util/__init__.py | 27 +++++++++++++++------------ synapse/util/async.py | 19 +++++-------------- 2 files changed, 20 insertions(+), 26 deletions(-) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 9ef7c48ea238..2a3df7c71de6 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -13,13 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.logcontext import PreserveLoggingContext +import logging +from itertools import islice +import attr from twisted.internet import defer, task -import logging - -from itertools import islice +from synapse.util.logcontext import PreserveLoggingContext logger = logging.getLogger(__name__) @@ -30,17 +30,20 @@ def unwrapFirstError(failure): return failure.value.subFailure +@attr.s class Clock(object): - """A small utility that obtains current time-of-day so that time may be - mocked during unit-tests. - - TODO(paul): Also move the sleep() functionality into it """ + A Clock wraps a Twisted reactor and provides utilities on top of it. 
+ """ + _reactor = attr.ib() - def __init__(self, reactor=None): - if not reactor: - from twisted.internet import reactor - self._reactor = reactor + @defer.inlineCallbacks + def sleep(self, seconds): + d = defer.Deferred() + with PreserveLoggingContext(): + self._reactor.callLater(seconds, d.callback, seconds) + res = yield d + defer.returnValue(res) def time(self): """Returns the current system time in seconds since epoch.""" diff --git a/synapse/util/async.py b/synapse/util/async.py index 3484b21dbc93..23595c2c055f 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -32,24 +32,15 @@ logger = logging.getLogger(__name__) -@defer.inlineCallbacks -def sleep(seconds, clock=None): - if not clock: - from twisted.internet import reactor - clock = Clock(reactor) - - d = defer.Deferred() - with PreserveLoggingContext(): - clock._reactor.callLater(seconds, d.callback, seconds) - res = yield d - defer.returnValue(res) - - def run_on_reactor(clock=None): """ This will cause the rest of the function to be invoked upon the next iteration of the main loop """ - return sleep(0, clock=clock) + if not clock: + from twisted.internet import reactor + clock = Clock(reactor) + + return clock.sleep(0) class ObservableDeferred(object): From a6e44020da4bbff6705f80592a5fe9a57904dfc6 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 12 Jun 2018 20:27:27 +1000 Subject: [PATCH 03/14] clock reunification --- synapse/handlers/message.py | 1 + synapse/handlers/user_directory.py | 9 ++++----- synapse/http/matrixfederationclient.py | 4 ++-- synapse/replication/http/send_event.py | 6 +++--- synapse/storage/event_push_actions.py | 3 +-- synapse/util/ratelimitutils.py | 1 - tests/util/test_file_consumer.py | 6 +++--- tests/util/test_linearizer.py | 7 ++++--- tests/util/test_logcontext.py | 11 ++++++----- 9 files changed, 24 insertions(+), 24 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 1cb81b6cf849..a4a65cb88338 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -806,6 +806,7 @@ def handle_new_client_event( # If we're a worker we need to hit out to the master. if self.config.worker_app: yield send_event_to_master( + self.hs.get_clock(), self.http_client, host=self.config.worker_replication_host, port=self.config.worker_replication_http_port, diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index a39f0f734327..7e4a114d4f48 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -19,7 +19,6 @@ from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.storage.roommember import ProfileInfo from synapse.util.metrics import Measure -from synapse.util.async import sleep from synapse.types import get_localpart_from_id from six import iteritems @@ -174,7 +173,7 @@ def _do_initial_spam(self): logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids)) yield self._handle_initial_room(room_id) num_processed_rooms += 1 - yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) + yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) logger.info("Processed all rooms.") @@ -188,7 +187,7 @@ def _do_initial_spam(self): logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids)) yield self._handle_local_user(user_id) num_processed_users += 1 - yield sleep(self.INITIAL_USER_SLEEP_MS / 1000.) + yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.) 
logger.info("Processed all users") @@ -236,7 +235,7 @@ def _handle_initial_room(self, room_id): count = 0 for user_id in user_ids: if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: - yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) + yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) if not self.is_mine_id(user_id): count += 1 @@ -251,7 +250,7 @@ def _handle_initial_room(self, room_id): continue if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: - yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) + yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) count += 1 user_set = (user_id, other_user_id) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 50c56ffe6ebc..32f2423b5969 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -22,7 +22,7 @@ from synapse.http import cancelled_to_request_timed_out_error from synapse.http.endpoint import matrix_federation_endpoint import synapse.metrics -from synapse.util.async import sleep, add_timeout_to_deferred +from synapse.util.async import add_timeout_to_deferred from synapse.util import logcontext from synapse.util.logcontext import make_deferred_yieldable import synapse.util.retryutils @@ -235,7 +235,7 @@ def _request(self, destination, method, path, delay = min(delay, 2) delay *= random.uniform(0.8, 1.4) - yield sleep(delay) + yield self.clock.sleep(delay) retries_left -= 1 else: raise diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index a9baa2c1c3d9..0535ad4c420b 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -21,7 +21,6 @@ from synapse.events import FrozenEvent from synapse.events.snapshot import EventContext from synapse.http.servlet import RestServlet, parse_json_object_from_request -from synapse.util.async import sleep from synapse.util.caches.response_cache import ResponseCache from synapse.util.metrics import Measure from synapse.types import Requester, UserID @@ -33,11 +32,12 @@ @defer.inlineCallbacks -def send_event_to_master(client, host, port, requester, event, context, +def send_event_to_master(clock, client, host, port, requester, event, context, ratelimit, extra_users): """Send event to be handled on the master Args: + clock (Clock) client (SimpleHttpClient) host (str): host of master port (int): port on master listening for HTTP replication @@ -77,7 +77,7 @@ def send_event_to_master(client, host, port, requester, event, context, # If we timed out we probably don't need to worry about backing # off too much, but lets just wait a little anyway. - yield sleep(1) + yield clock.sleep(1) except MatrixCodeMessageException as e: # We convert to SynapseError as we know that it was a SynapseError # on the master process that we should send to the client. 
(And diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index d0350ee5fe70..c4a0208ce484 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -16,7 +16,6 @@ from synapse.storage._base import SQLBaseStore, LoggingTransaction from twisted.internet import defer -from synapse.util.async import sleep from synapse.util.caches.descriptors import cachedInlineCallbacks import logging @@ -800,7 +799,7 @@ def _rotate_notifs(self): ) if caught_up: break - yield sleep(5) + yield self.hs.get_clock().sleep(5) finally: self._doing_notif_rotation = False diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 0ab63c3d7d1c..5a33082d6b3c 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -17,7 +17,6 @@ from synapse.api.errors import LimitExceededError -from synapse.util.async import sleep from synapse.util.logcontext import ( run_in_background, make_deferred_yieldable, PreserveLoggingContext, diff --git a/tests/util/test_file_consumer.py b/tests/util/test_file_consumer.py index d6e108277901..c2aae8f54c7c 100644 --- a/tests/util/test_file_consumer.py +++ b/tests/util/test_file_consumer.py @@ -30,7 +30,7 @@ class FileConsumerTests(unittest.TestCase): @defer.inlineCallbacks def test_pull_consumer(self): string_file = StringIO() - consumer = BackgroundFileConsumer(string_file) + consumer = BackgroundFileConsumer(string_file, reactor=reactor) try: producer = DummyPullProducer() @@ -54,7 +54,7 @@ def test_pull_consumer(self): @defer.inlineCallbacks def test_push_consumer(self): string_file = BlockingStringWrite() - consumer = BackgroundFileConsumer(string_file) + consumer = BackgroundFileConsumer(string_file, reactor=reactor) try: producer = NonCallableMock(spec_set=[]) @@ -80,7 +80,7 @@ def test_push_consumer(self): @defer.inlineCallbacks def test_push_producer_feedback(self): string_file = BlockingStringWrite() - consumer = BackgroundFileConsumer(string_file) + consumer = BackgroundFileConsumer(string_file, reactor=reactor) try: producer = NonCallableMock(spec_set=["pauseProducing", "resumeProducing"]) diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index 4865eb4bc61f..bf7e3aa8853f 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -12,10 +12,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util import async, logcontext + +from synapse.util import logcontext, Clock from tests import unittest -from twisted.internet import defer +from twisted.internet import defer, reactor from synapse.util.async import Linearizer from six.moves import range @@ -53,7 +54,7 @@ def func(i, sleep=False): self.assertEqual( logcontext.LoggingContext.current_context(), lc) if sleep: - yield async.sleep(0) + yield Clock(reactor).sleep(0) self.assertEqual( logcontext.LoggingContext.current_context(), lc) diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index ad78d884e0f8..9cf90fcfc4ef 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -3,8 +3,7 @@ from twisted.internet import reactor from .. 
import unittest -from synapse.util.async import sleep -from synapse.util import logcontext +from synapse.util import logcontext, Clock from synapse.util.logcontext import LoggingContext @@ -22,18 +21,20 @@ def test_with_context(self): @defer.inlineCallbacks def test_sleep(self): + clock = Clock(reactor) + @defer.inlineCallbacks def competing_callback(): with LoggingContext() as competing_context: competing_context.request = "competing" - yield sleep(0) + yield clock.sleep(0) self._check_test_key("competing") reactor.callLater(0, competing_callback) with LoggingContext() as context_one: context_one.request = "one" - yield sleep(0) + yield clock.sleep(0) self._check_test_key("one") def _test_run_in_background(self, function): @@ -87,7 +88,7 @@ def check_logcontext(): def test_run_in_background_with_blocking_fn(self): @defer.inlineCallbacks def blocking_function(): - yield sleep(0) + yield Clock(reactor).sleep(0) return self._test_run_in_background(blocking_function) From 4797ae4403ed1bf3e453deeecc6f95e335473d98 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 12 Jun 2018 20:41:42 +1000 Subject: [PATCH 04/14] pass around the homeserver when it's needed --- synapse/rest/media/v1/media_repository.py | 3 ++- synapse/rest/media/v1/media_storage.py | 4 +++- synapse/util/file_consumer.py | 4 ++-- tests/crypto/test_keyring.py | 9 +++++---- tests/rest/client/test_transactions.py | 6 +++--- tests/rest/media/v1/test_media_storage.py | 5 +++-- 6 files changed, 18 insertions(+), 13 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 2ac767d2dc70..218ba7a0835e 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -58,6 +58,7 @@ class MediaRepository(object): def __init__(self, hs): + self.hs = hs self.auth = hs.get_auth() self.client = MatrixFederationHttpClient(hs) self.clock = hs.get_clock() @@ -94,7 +95,7 @@ def __init__(self, hs): storage_providers.append(provider) self.media_storage = MediaStorage( - self.primary_base_path, self.filepaths, storage_providers, + self.hs, self.primary_base_path, self.filepaths, storage_providers, ) self.clock.looping_call( diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index eae35ab377af..811fa25793f9 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -37,13 +37,15 @@ class MediaStorage(object): """Responsible for storing/fetching files from local sources. Args: + hs (Homeserver) local_media_directory (str): Base path where we store media on disk filepaths (MediaFilePaths) storage_providers ([StorageProvider]): List of StorageProvider that are used to fetch and store files. """ - def __init__(self, local_media_directory, filepaths, storage_providers): + def __init__(self, hs, local_media_directory, filepaths, storage_providers): + self.hs = hs self.local_media_directory = local_media_directory self.filepaths = filepaths self.storage_providers = storage_providers diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index e57769978d5c..ad46f3ac8bf0 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -75,7 +75,7 @@ def registerProducer(self, producer, streaming): self._finished_deferred = run_in_background( threads.deferToThreadPool, self._reactor, - self._reactor.threadpool, + self._reactor.getThreadPool(), self._writer, ) if not streaming: @@ -114,7 +114,7 @@ def _writer(self): # producer. 
if self._producer and self._paused_producer: if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE: - self._runreactor.callFromThread(self._resume_paused_producer) + self._reactor.callFromThread(self._resume_paused_producer) bytes = self._bytes_queue.get() diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index 149e443022d0..cc1c862ba4aa 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -19,10 +19,10 @@ from mock import Mock from synapse.api.errors import SynapseError from synapse.crypto import keyring -from synapse.util import async, logcontext +from synapse.util import logcontext, Clock from synapse.util.logcontext import LoggingContext from tests import unittest, utils -from twisted.internet import defer +from twisted.internet import defer, reactor class MockPerspectiveServer(object): @@ -118,6 +118,7 @@ def test_wait_for_previous_lookups(self): @defer.inlineCallbacks def test_verify_json_objects_for_server_awaits_previous_requests(self): + clock = Clock(reactor) key1 = signedjson.key.generate_signing_key(1) kr = keyring.Keyring(self.hs) @@ -167,7 +168,7 @@ def get_perspectives(**kwargs): # wait a tick for it to send the request to the perspectives server # (it first tries the datastore) - yield async.sleep(1) # XXX find out why this takes so long! + yield clock.sleep(1) # XXX find out why this takes so long! self.http_client.post_json.assert_called_once() self.assertIs(LoggingContext.current_context(), context_11) @@ -183,7 +184,7 @@ def get_perspectives(**kwargs): res_deferreds_2 = kr.verify_json_objects_for_server( [("server10", json1)], ) - yield async.sleep(1) + yield clock.sleep(1) self.http_client.post_json.assert_not_called() res_deferreds_2[0].addBoth(self.check_context, None) diff --git a/tests/rest/client/test_transactions.py b/tests/rest/client/test_transactions.py index b5bc2fa255d6..6a757289dbda 100644 --- a/tests/rest/client/test_transactions.py +++ b/tests/rest/client/test_transactions.py @@ -1,9 +1,9 @@ from synapse.rest.client.transactions import HttpTransactionCache from synapse.rest.client.transactions import CLEANUP_PERIOD_MS -from twisted.internet import defer +from twisted.internet import defer, reactor from mock import Mock, call -from synapse.util import async +from synapse.util import Clock from synapse.util.logcontext import LoggingContext from tests import unittest from tests.utils import MockClock @@ -46,7 +46,7 @@ def test_deduplicates_based_on_key(self): def test_logcontexts_with_async_result(self): @defer.inlineCallbacks def cb(): - yield async.sleep(0) + yield Clock(reactor).sleep(0) defer.returnValue("yay") @defer.inlineCallbacks diff --git a/tests/rest/media/v1/test_media_storage.py b/tests/rest/media/v1/test_media_storage.py index eef38b67818f..c5e2f5549ad0 100644 --- a/tests/rest/media/v1/test_media_storage.py +++ b/tests/rest/media/v1/test_media_storage.py @@ -14,7 +14,7 @@ # limitations under the License. 
-from twisted.internet import defer +from twisted.internet import defer, reactor from synapse.rest.media.v1._base import FileInfo from synapse.rest.media.v1.media_storage import MediaStorage @@ -38,6 +38,7 @@ def setUp(self): self.secondary_base_path = os.path.join(self.test_dir, "secondary") hs = Mock() + hs.get_reactor = Mock(return_value=reactor) hs.config.media_store_path = self.primary_base_path storage_providers = [FileStorageProviderBackend( @@ -46,7 +47,7 @@ def setUp(self): self.filepaths = MediaFilePaths(self.primary_base_path) self.media_storage = MediaStorage( - self.primary_base_path, self.filepaths, storage_providers, + hs, self.primary_base_path, self.filepaths, storage_providers, ) def tearDown(self): From 8ddb7fc2cf778c4a854fb776e4962b91421e85eb Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 12 Jun 2018 20:46:14 +1000 Subject: [PATCH 05/14] fix missing def --- synapse/util/ratelimitutils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 5a33082d6b3c..c5a45cef7c9a 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -152,7 +152,7 @@ def queue_request(): "Ratelimit [%s]: sleeping req", id(request_id), ) - ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0) + ret_defer = run_in_background(self.clock.sleep, self.sleep_msec / 1000.0) self.sleeping_requests.add(request_id) From 1456d962050ca8ea947bd44598895ec16c26a04b Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 12 Jun 2018 21:05:32 +1000 Subject: [PATCH 06/14] fix --- synapse/http/client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/http/client.py b/synapse/http/client.py index 8064a84c5c04..b4777d04e2a9 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -100,6 +100,7 @@ def request(self, method, uri, *args, **kwargs): add_timeout_to_deferred( request_deferred, 60, cancelled_to_request_timed_out_error, + reactor=self.hs.get_reactor(), ) response = yield make_deferred_yieldable(request_deferred) @@ -115,7 +116,7 @@ def request(self, method, uri, *args, **kwargs): "Error sending request to %s %s: %s %s", method, redact_uri(uri), type(e).__name__, e.message ) - raise e + raise @defer.inlineCallbacks def post_urlencoded_get_json(self, uri, args={}, headers=None): From f1006cd65f991f0f7b55559b52f5752d0d5eed1e Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 13 Jun 2018 23:10:59 +1000 Subject: [PATCH 07/14] remove run_on_reactor --- synapse/federation/transaction_queue.py | 4 ---- synapse/handlers/auth.py | 11 +++-------- synapse/handlers/federation.py | 6 +----- synapse/handlers/identity.py | 8 -------- synapse/handlers/message.py | 3 +-- synapse/handlers/register.py | 5 +---- synapse/push/pusherpool.py | 3 --- synapse/rest/client/v1/register.py | 7 ------- synapse/rest/client/v2_alpha/account.py | 7 ------- synapse/rest/client/v2_alpha/register.py | 3 --- tests/test_distributor.py | 2 -- tests/util/caches/test_descriptors.py | 2 -- 12 files changed, 6 insertions(+), 55 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index f0aeb5a0d38a..bcbce7f6eb36 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -21,7 +21,6 @@ from synapse.api.errors import HttpResponseException, FederationDeniedError from synapse.util import logcontext, PreserveLoggingContext -from synapse.util.async import run_on_reactor from synapse.util.retryutils 
import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func from synapse.handlers.presence import format_user_presence_state, get_interested_remotes @@ -451,9 +450,6 @@ def _transaction_transmission_loop(self, destination): # hence why we throw the result away. yield get_retry_limiter(destination, self.clock, self.store) - # XXX: what's this for? - yield run_on_reactor() - pending_pdus = [] while True: device_message_edus, device_stream_id, dev_list_id = ( diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 22587b5ba791..a131b7f73f9f 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -24,7 +24,6 @@ ) from synapse.module_api import ModuleApi from synapse.types import UserID -from synapse.util.async import run_on_reactor from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logcontext import make_deferred_yieldable @@ -425,15 +424,11 @@ def _check_email_identity(self, authdict, _): def _check_msisdn(self, authdict, _): return self._check_threepid('msisdn', authdict) - @defer.inlineCallbacks def _check_dummy_auth(self, authdict, _): - yield run_on_reactor(self.hs.get_clock()) - defer.returnValue(True) + return defer.succeed(True) @defer.inlineCallbacks def _check_threepid(self, medium, authdict): - yield run_on_reactor(self.hs.get_clock()) - if 'threepid_creds' not in authdict: raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM) @@ -863,7 +858,7 @@ def _do_hash(): return make_deferred_yieldable( threads.deferToThreadPool( self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash - ) + ), ) def validate_hash(self, password, stored_hash): @@ -889,7 +884,7 @@ def _do_validate_hash(): self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_validate_hash, - ) + ), ) else: return defer.succeed(False) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 495ac4c648de..af94bf33bcee 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -39,7 +39,7 @@ from synapse.util import unwrapFirstError, logcontext from synapse.util.metrics import measure_func from synapse.util.logutils import log_function -from synapse.util.async import run_on_reactor, Linearizer +from synapse.util.async import Linearizer from synapse.util.frozenutils import unfreeze from synapse.crypto.event_signing import ( compute_event_signature, add_hashes_and_signatures, @@ -1381,8 +1381,6 @@ def on_send_leave_request(self, origin, pdu): def get_state_for_pdu(self, room_id, event_id): """Returns the state at the event. i.e. not including said event. """ - yield run_on_reactor() - state_groups = yield self.store.get_state_groups( room_id, [event_id] ) @@ -1425,8 +1423,6 @@ def get_state_for_pdu(self, room_id, event_id): def get_state_ids_for_pdu(self, room_id, event_id): """Returns the state at the event. i.e. not including said event. 
""" - yield run_on_reactor() - state_groups = yield self.store.get_state_groups_ids( room_id, [event_id] ) diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 529400955dcd..f00dfe1d3ee6 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -27,7 +27,6 @@ MatrixCodeMessageException, CodeMessageException ) from ._base import BaseHandler -from synapse.util.async import run_on_reactor from synapse.api.errors import SynapseError, Codes logger = logging.getLogger(__name__) @@ -62,8 +61,6 @@ def _should_trust_id_server(self, id_server): @defer.inlineCallbacks def threepid_from_creds(self, creds): - yield run_on_reactor() - if 'id_server' in creds: id_server = creds['id_server'] elif 'idServer' in creds: @@ -106,7 +103,6 @@ def threepid_from_creds(self, creds): @defer.inlineCallbacks def bind_threepid(self, creds, mxid): - yield run_on_reactor() logger.debug("binding threepid %r to %s", creds, mxid) data = None @@ -188,8 +184,6 @@ def unbind_threepid(self, mxid, threepid): @defer.inlineCallbacks def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs): - yield run_on_reactor() - if not self._should_trust_id_server(id_server): raise SynapseError( 400, "Untrusted ID server '%s'" % id_server, @@ -224,8 +218,6 @@ def requestMsisdnToken( self, id_server, country, phone_number, client_secret, send_attempt, **kwargs ): - yield run_on_reactor() - if not self._should_trust_id_server(id_server): raise SynapseError( 400, "Untrusted ID server '%s'" % id_server, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a4a65cb88338..25c47fea3d40 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -36,7 +36,7 @@ from synapse.types import ( UserID, RoomAlias, RoomStreamToken, ) -from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter +from synapse.util.async import ReadWriteLock, Limiter from synapse.util.logcontext import run_in_background from synapse.util.metrics import measure_func from synapse.util.frozenutils import frozendict_json_encoder @@ -962,7 +962,6 @@ def is_inviter_member_event(e): @defer.inlineCallbacks def _notify(): - yield run_on_reactor() try: self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 7e52adda3c6c..e76ef5426daf 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -24,7 +24,7 @@ from synapse.http.client import CaptchaServerHttpClient from synapse import types from synapse.types import UserID, create_requester, RoomID, RoomAlias -from synapse.util.async import run_on_reactor, Linearizer +from synapse.util.async import Linearizer from synapse.util.threepids import check_3pid_allowed from ._base import BaseHandler @@ -139,7 +139,6 @@ def register( Raises: RegistrationError if there was a problem registering. """ - yield run_on_reactor() password_hash = None if password: password_hash = yield self.auth_handler().hash(password) @@ -431,8 +430,6 @@ def get_or_create_user(self, requester, localpart, displayname, Raises: RegistrationError if there was a problem registering. 
""" - yield run_on_reactor() - if localpart is None: raise SynapseError(400, "Request must include user id") diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 750d11ca38dc..36bb5bbc6543 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -19,7 +19,6 @@ from twisted.internet import defer from synapse.push.pusher import PusherFactory -from synapse.util.async import run_on_reactor from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) @@ -125,7 +124,6 @@ def remove_pushers_by_access_token(self, user_id, access_tokens): @defer.inlineCallbacks def on_new_notifications(self, min_stream_id, max_stream_id): - yield run_on_reactor() try: users_affected = yield self.store.get_push_action_users_in_range( min_stream_id, max_stream_id @@ -151,7 +149,6 @@ def on_new_notifications(self, min_stream_id, max_stream_id): @defer.inlineCallbacks def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids): - yield run_on_reactor() try: # Need to subtract 1 from the minimum because the lower bound here # is not inclusive diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index ecd059659ddf..c10320dedf94 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -24,8 +24,6 @@ from synapse.http.servlet import parse_json_object_from_request from synapse.types import create_requester -from synapse.util.async import run_on_reactor - from hashlib import sha1 import hmac import logging @@ -272,7 +270,6 @@ def _do_email_identity(self, request, register_json, session): @defer.inlineCallbacks def _do_password(self, request, register_json, session): - yield run_on_reactor() if (self.hs.config.enable_registration_captcha and not session[LoginType.RECAPTCHA]): # captcha should've been done by this stage! 
@@ -333,8 +330,6 @@ def _do_app_service(self, request, register_json, session): @defer.inlineCallbacks def _do_shared_secret(self, request, register_json, session): - yield run_on_reactor() - if not isinstance(register_json.get("mac", None), string_types): raise SynapseError(400, "Expected mac.") if not isinstance(register_json.get("user", None), string_types): @@ -423,8 +418,6 @@ def on_OPTIONS(self, request): @defer.inlineCallbacks def _do_create(self, requester, user_json): - yield run_on_reactor(self.hs.get_clock()) - if "localpart" not in user_json: raise SynapseError(400, "Expected 'localpart' key.") diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 0291fba9e7c6..e1281cfbb657 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -24,7 +24,6 @@ RestServlet, assert_params_in_request, parse_json_object_from_request, ) -from synapse.util.async import run_on_reactor from synapse.util.msisdn import phone_number_to_msisdn from synapse.util.threepids import check_3pid_allowed from ._base import client_v2_patterns, interactive_auth_handler @@ -300,8 +299,6 @@ def __init__(self, hs): @defer.inlineCallbacks def on_GET(self, request): - yield run_on_reactor() - requester = yield self.auth.get_user_by_req(request) threepids = yield self.datastore.user_get_threepids( @@ -312,8 +309,6 @@ def on_GET(self, request): @defer.inlineCallbacks def on_POST(self, request): - yield run_on_reactor() - body = parse_json_object_from_request(request) threePidCreds = body.get('threePidCreds') @@ -365,8 +360,6 @@ def __init__(self, hs): @defer.inlineCallbacks def on_POST(self, request): - yield run_on_reactor() - body = parse_json_object_from_request(request) required = ['medium', 'address'] diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 0520787f37c5..97e7c0f7c630 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -32,7 +32,6 @@ import logging import hmac from hashlib import sha1 -from synapse.util.async import run_on_reactor from synapse.util.ratelimitutils import FederationRateLimiter from six import string_types @@ -191,8 +190,6 @@ def __init__(self, hs): @interactive_auth_handler @defer.inlineCallbacks def on_POST(self, request): - yield run_on_reactor(self.hs.get_clock()) - body = parse_json_object_from_request(request) kind = "user" diff --git a/tests/test_distributor.py b/tests/test_distributor.py index 010aeaee7eee..c066381698f4 100644 --- a/tests/test_distributor.py +++ b/tests/test_distributor.py @@ -19,7 +19,6 @@ from mock import Mock, patch from synapse.util.distributor import Distributor -from synapse.util.async import run_on_reactor class DistributorTestCase(unittest.TestCase): @@ -95,7 +94,6 @@ class MyException(Exception): @defer.inlineCallbacks def observer(): - yield run_on_reactor() raise MyException("Oopsie") self.dist.observe("whail", observer) diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py index 2516fe40f431..24754591df36 100644 --- a/tests/util/caches/test_descriptors.py +++ b/tests/util/caches/test_descriptors.py @@ -18,7 +18,6 @@ import mock from synapse.api.errors import SynapseError -from synapse.util import async from synapse.util import logcontext from twisted.internet import defer from synapse.util.caches import descriptors @@ -195,7 +194,6 @@ class Cls(object): def fn(self, arg1): @defer.inlineCallbacks def inner_fn(): - yield 
async.run_on_reactor() raise SynapseError(400, "blah") return inner_fn() From dc648a7828b8c5328c3abf30c981dcf457485670 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 13 Jun 2018 23:12:17 +1000 Subject: [PATCH 08/14] remove the last use of the API --- synapse/util/async.py | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/synapse/util/async.py b/synapse/util/async.py index 23595c2c055f..d1c1fad861c1 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -32,17 +32,6 @@ logger = logging.getLogger(__name__) -def run_on_reactor(clock=None): - """ This will cause the rest of the function to be invoked upon the next - iteration of the main loop - """ - if not clock: - from twisted.internet import reactor - clock = Clock(reactor) - - return clock.sleep(0) - - class ObservableDeferred(object): """Wraps a deferred object so that we can add observer deferreds. These observer deferreds do not affect the callback chain of the original @@ -175,13 +164,19 @@ class Linearizer(object): # do some work. """ - def __init__(self, name=None): + def __init__(self, name=None, clock=None): if name is None: self.name = id(self) else: self.name = name self.key_to_defer = {} + if not clock: + from twisted.internet import reactor + clock = Clock(reactor) + self._clock = clock + + @defer.inlineCallbacks def queue(self, key): # If there is already a deferred in the queue, we pull it out so that @@ -222,7 +217,7 @@ def queue(self, key): # the context manager, but it needs to happen while we hold the # lock, and the context manager's exit code must be synchronous, # so actually this is the only sensible place. - yield run_on_reactor() + yield self._clock.sleep(0) else: logger.info("Acquired uncontended linearizer lock %r for key %r", From 360c7a4b149e92efb4ea6ea776c26abe7c037280 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 13 Jun 2018 23:15:08 +1000 Subject: [PATCH 09/14] remove the last sleep --- synapse/storage/background_updates.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 8af325a9f5a7..b7e9c716c8bd 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import synapse.util.async from ._base import SQLBaseStore from . import engines @@ -92,7 +91,7 @@ def start_doing_background_updates(self): logger.info("Starting background schema updates") while True: - yield synapse.util.async.sleep( + yield self.hs.get_clock().sleep( self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.) 
try: From d8febddb25f30b924808fa3d741ccfbe173a92bb Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 13 Jun 2018 23:21:31 +1000 Subject: [PATCH 10/14] docstrings and things --- synapse/replication/http/send_event.py | 2 +- synapse/rest/media/v1/media_storage.py | 2 +- synapse/util/async.py | 3 ++- synapse/util/file_consumer.py | 1 + 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 0535ad4c420b..f080f96cc1a3 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -37,7 +37,7 @@ def send_event_to_master(clock, client, host, port, requester, event, context, """Send event to be handled on the master Args: - clock (Clock) + clock (synapse.util.Clock) client (SimpleHttpClient) host (str): host of master port (int): port on master listening for HTTP replication diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index 811fa25793f9..d6b8ebbedb1a 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -37,7 +37,7 @@ class MediaStorage(object): """Responsible for storing/fetching files from local sources. Args: - hs (Homeserver) + hs (synapse.server.Homeserver) local_media_directory (str): Base path where we store media on disk filepaths (MediaFilePaths) storage_providers ([StorageProvider]): List of StorageProvider that are diff --git a/synapse/util/async.py b/synapse/util/async.py index d1c1fad861c1..b13c64ffc526 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -394,7 +394,7 @@ class DeferredTimeoutError(Exception): """ -def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None, reactor=None): +def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None): """ Add a timeout to a deferred by scheduling it to be cancelled after timeout seconds. @@ -409,6 +409,7 @@ def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None, reactor=N Args: deferred (defer.Deferred): deferred to be timed out timeout (Number): seconds to time out after + reactor (twisted.internet.reactor): the Twisted reactor to use on_timeout_cancel (callable): A callable which is called immediately after the deferred times out, and not if this deferred is diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index ad46f3ac8bf0..c78801015b2f 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -27,6 +27,7 @@ class BackgroundFileConsumer(object): Args: file_obj (file): The file like object to write to. Closed when finished. 
+ reactor (twisted.internet.reactor): the Twisted reactor to use """ # For PushProducers pause if we have this many unwritten slices From 2ca9e8253f698356bb1b2b4b1ed29f2670bd53c6 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 13 Jun 2018 23:24:29 +1000 Subject: [PATCH 11/14] this is no longer an inlinecallback --- synapse/handlers/message.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 25c47fea3d40..7b9946ab910c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -960,7 +960,6 @@ def is_inviter_member_event(e): event_stream_id, max_stream_id ) - @defer.inlineCallbacks def _notify(): try: self.notifier.on_new_room_event( From 744dba9ffde1234b4e8f944d6955d69bdf60ec1a Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 13 Jun 2018 23:29:13 +1000 Subject: [PATCH 12/14] fix things --- synapse/http/client.py | 5 ++--- synapse/http/matrixfederationclient.py | 2 +- synapse/notifier.py | 4 ++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/synapse/http/client.py b/synapse/http/client.py index b4777d04e2a9..46ffb41de1b0 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -98,9 +98,8 @@ def request(self, method, uri, *args, **kwargs): method, uri, *args, **kwargs ) add_timeout_to_deferred( - request_deferred, - 60, cancelled_to_request_timed_out_error, - reactor=self.hs.get_reactor(), + request_deferred, 60, self.hs.get_reactor(), + cancelled_to_request_timed_out_error, ) response = yield make_deferred_yieldable(request_deferred) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 32f2423b5969..4e0399e7629a 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -193,8 +193,8 @@ def _request(self, destination, method, path, add_timeout_to_deferred( request_deferred, timeout / 1000. if timeout else 60, + self.hs.get_reactor(), cancelled_to_request_timed_out_error, - reactor=self.hs.get_reactor() ) response = yield make_deferred_yieldable( request_deferred, diff --git a/synapse/notifier.py b/synapse/notifier.py index a0a4a8bb7661..3c0622a29498 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -341,7 +341,7 @@ def wait_for_events(self, user_id, timeout, callback, room_ids=None, add_timeout_to_deferred( listener.deferred, (end_time - now) / 1000., - reactor=self.hs.get_reactor() + self.hs.get_reactor(), ) with PreserveLoggingContext(): yield listener.deferred @@ -563,7 +563,7 @@ def wait_for_replication(self, callback, timeout): add_timeout_to_deferred( listener.deferred.addTimeout, (end_time - now) / 1000., - reactor=self.hs.get_reactor() + self.hs.get_reactor(), ) try: with PreserveLoggingContext(): From dec693b050cfb77e55668c12d55b7761b5c2a540 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 13 Jun 2018 23:41:14 +1000 Subject: [PATCH 13/14] pep8 --- synapse/util/async.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/synapse/util/async.py b/synapse/util/async.py index b13c64ffc526..1668df4ce6a4 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
- from twisted.internet import defer from twisted.internet.defer import CancelledError from twisted.python import failure @@ -176,7 +175,6 @@ def __init__(self, name=None, clock=None): clock = Clock(reactor) self._clock = clock - @defer.inlineCallbacks def queue(self, key): # If there is already a deferred in the queue, we pull it out so that From 9fc5dc6933287fd20c947459377ba20bba2b71cb Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 21 Jun 2018 10:06:02 +0100 Subject: [PATCH 14/14] docstring --- synapse/server.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/server.py b/synapse/server.py index 5a2ca0f7a5d2..c29c19289af5 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -191,6 +191,9 @@ def setup(self): logger.info("Finished setting up.") def get_reactor(self): + """ + Fetch the Twisted reactor in use by this HomeServer. + """ return self._reactor def get_ip_from_request(self, request):