Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Pass around the reactor explicitly #3385

Merged
merged 16 commits into from
Jun 22, 2018
Prev Previous commit
Next Next commit
clock reunification
  • Loading branch information
hawkowl committed Jun 12, 2018
commit a6e44020da4bbff6705f80592a5fe9a57904dfc6
1 change: 1 addition & 0 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,7 @@ def handle_new_client_event(
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
yield send_event_to_master(
self.hs.get_clock(),
self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
Expand Down
9 changes: 4 additions & 5 deletions synapse/handlers/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.storage.roommember import ProfileInfo
from synapse.util.metrics import Measure
from synapse.util.async import sleep
from synapse.types import get_localpart_from_id

from six import iteritems
Expand Down Expand Up @@ -174,7 +173,7 @@ def _do_initial_spam(self):
logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
yield self._handle_initial_room(room_id)
num_processed_rooms += 1
yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)

logger.info("Processed all rooms.")

Expand All @@ -188,7 +187,7 @@ def _do_initial_spam(self):
logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
yield self._handle_local_user(user_id)
num_processed_users += 1
yield sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.)

logger.info("Processed all users")

Expand Down Expand Up @@ -236,7 +235,7 @@ def _handle_initial_room(self, room_id):
count = 0
for user_id in user_ids:
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)

if not self.is_mine_id(user_id):
count += 1
Expand All @@ -251,7 +250,7 @@ def _handle_initial_room(self, room_id):
continue

if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
count += 1

user_set = (user_id, other_user_id)
Expand Down
4 changes: 2 additions & 2 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from synapse.http import cancelled_to_request_timed_out_error
from synapse.http.endpoint import matrix_federation_endpoint
import synapse.metrics
from synapse.util.async import sleep, add_timeout_to_deferred
from synapse.util.async import add_timeout_to_deferred
from synapse.util import logcontext
from synapse.util.logcontext import make_deferred_yieldable
import synapse.util.retryutils
Expand Down Expand Up @@ -235,7 +235,7 @@ def _request(self, destination, method, path,
delay = min(delay, 2)
delay *= random.uniform(0.8, 1.4)

yield sleep(delay)
yield self.clock.sleep(delay)
retries_left -= 1
else:
raise
Expand Down
6 changes: 3 additions & 3 deletions synapse/replication/http/send_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.util.async import sleep
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.metrics import Measure
from synapse.types import Requester, UserID
Expand All @@ -33,11 +32,12 @@


@defer.inlineCallbacks
def send_event_to_master(client, host, port, requester, event, context,
def send_event_to_master(clock, client, host, port, requester, event, context,
ratelimit, extra_users):
"""Send event to be handled on the master

Args:
clock (Clock)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be nice to try and get the types for these right, to help editors implement clickthrough etc. I believe this is a synapse.util.Clock ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
Expand Down Expand Up @@ -77,7 +77,7 @@ def send_event_to_master(client, host, port, requester, event, context,

# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
yield sleep(1)
yield clock.sleep(1)
except MatrixCodeMessageException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
Expand Down
3 changes: 1 addition & 2 deletions synapse/storage/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

from synapse.storage._base import SQLBaseStore, LoggingTransaction
from twisted.internet import defer
from synapse.util.async import sleep
from synapse.util.caches.descriptors import cachedInlineCallbacks

import logging
Expand Down Expand Up @@ -800,7 +799,7 @@ def _rotate_notifs(self):
)
if caught_up:
break
yield sleep(5)
yield self.hs.get_clock().sleep(5)
finally:
self._doing_notif_rotation = False

Expand Down
1 change: 0 additions & 1 deletion synapse/util/ratelimitutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

from synapse.api.errors import LimitExceededError

from synapse.util.async import sleep
from synapse.util.logcontext import (
run_in_background, make_deferred_yieldable,
PreserveLoggingContext,
Expand Down
6 changes: 3 additions & 3 deletions tests/util/test_file_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class FileConsumerTests(unittest.TestCase):
@defer.inlineCallbacks
def test_pull_consumer(self):
string_file = StringIO()
consumer = BackgroundFileConsumer(string_file)
consumer = BackgroundFileConsumer(string_file, reactor=reactor)

try:
producer = DummyPullProducer()
Expand All @@ -54,7 +54,7 @@ def test_pull_consumer(self):
@defer.inlineCallbacks
def test_push_consumer(self):
string_file = BlockingStringWrite()
consumer = BackgroundFileConsumer(string_file)
consumer = BackgroundFileConsumer(string_file, reactor=reactor)

try:
producer = NonCallableMock(spec_set=[])
Expand All @@ -80,7 +80,7 @@ def test_push_consumer(self):
@defer.inlineCallbacks
def test_push_producer_feedback(self):
string_file = BlockingStringWrite()
consumer = BackgroundFileConsumer(string_file)
consumer = BackgroundFileConsumer(string_file, reactor=reactor)

try:
producer = NonCallableMock(spec_set=["pauseProducing", "resumeProducing"])
Expand Down
7 changes: 4 additions & 3 deletions tests/util/test_linearizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.util import async, logcontext

from synapse.util import logcontext, Clock
from tests import unittest

from twisted.internet import defer
from twisted.internet import defer, reactor

from synapse.util.async import Linearizer
from six.moves import range
Expand Down Expand Up @@ -53,7 +54,7 @@ def func(i, sleep=False):
self.assertEqual(
logcontext.LoggingContext.current_context(), lc)
if sleep:
yield async.sleep(0)
yield Clock(reactor).sleep(0)

self.assertEqual(
logcontext.LoggingContext.current_context(), lc)
Expand Down
11 changes: 6 additions & 5 deletions tests/util/test_logcontext.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
from twisted.internet import reactor
from .. import unittest

from synapse.util.async import sleep
from synapse.util import logcontext
from synapse.util import logcontext, Clock
from synapse.util.logcontext import LoggingContext


Expand All @@ -22,18 +21,20 @@ def test_with_context(self):

@defer.inlineCallbacks
def test_sleep(self):
clock = Clock(reactor)

@defer.inlineCallbacks
def competing_callback():
with LoggingContext() as competing_context:
competing_context.request = "competing"
yield sleep(0)
yield clock.sleep(0)
self._check_test_key("competing")

reactor.callLater(0, competing_callback)

with LoggingContext() as context_one:
context_one.request = "one"
yield sleep(0)
yield clock.sleep(0)
self._check_test_key("one")

def _test_run_in_background(self, function):
Expand Down Expand Up @@ -87,7 +88,7 @@ def check_logcontext():
def test_run_in_background_with_blocking_fn(self):
@defer.inlineCallbacks
def blocking_function():
yield sleep(0)
yield Clock(reactor).sleep(0)

return self._test_run_in_background(blocking_function)

Expand Down