Pass around the reactor explicitly (#3385)
hawkowl authored Jun 22, 2018
1 parent c2eff93 commit 77ac14b
Showing 25 changed files with 141 additions and 93 deletions.
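
The common thread in these diffs is that components now take the Twisted reactor (or a clock built on it) from the HomeServer instead of importing the global reactor, which lets tests drive time and threads deterministically. A minimal sketch of the pattern, with illustrative names rather than Synapse code:

from twisted.internet import task


class Heartbeat(object):
    """Illustrative class that takes its reactor as a constructor argument."""

    def __init__(self, reactor, interval, fn):
        self._reactor = reactor     # anything with callLater(), real or fake
        self._interval = interval
        self._fn = fn

    def start(self):
        self._tick()

    def _tick(self):
        self._fn()
        self._reactor.callLater(self._interval, self._tick)


# Because the reactor is injected, a test can use twisted.internet.task.Clock
# and advance time by hand instead of sleeping.
clock = task.Clock()
beats = []
Heartbeat(clock, 5, lambda: beats.append(1)).start()
clock.advance(5)
assert beats == [1, 1]
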
30 changes: 20 additions & 10 deletions synapse/handlers/auth.py
@@ -33,6 +33,7 @@
import bcrypt
import pymacaroons
import simplejson
import attr

import synapse.util.stringutils as stringutils

@@ -854,7 +855,11 @@ def _do_hash():
return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
bcrypt.gensalt(self.bcrypt_rounds))

return make_deferred_yieldable(threads.deferToThread(_do_hash))
return make_deferred_yieldable(
threads.deferToThreadPool(
self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
),
)

def validate_hash(self, password, stored_hash):
"""Validates that self.hash(password) == stored_hash.
@@ -874,16 +879,21 @@ def _do_validate_hash():
)

if stored_hash:
return make_deferred_yieldable(threads.deferToThread(_do_validate_hash))
return make_deferred_yieldable(
threads.deferToThreadPool(
self.hs.get_reactor(),
self.hs.get_reactor().getThreadPool(),
_do_validate_hash,
),
)
else:
return defer.succeed(False)


class MacaroonGeneartor(object):
def __init__(self, hs):
self.clock = hs.get_clock()
self.server_name = hs.config.server_name
self.macaroon_secret_key = hs.config.macaroon_secret_key
@attr.s
class MacaroonGenerator(object):

hs = attr.ib()

def generate_access_token(self, user_id, extra_caveats=None):
extra_caveats = extra_caveats or []
@@ -901,7 +911,7 @@ def generate_access_token(self, user_id, extra_caveats=None):
def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
macaroon = self._generate_base_macaroon(user_id)
macaroon.add_first_party_caveat("type = login")
now = self.clock.time_msec()
now = self.hs.get_clock().time_msec()
expiry = now + duration_in_ms
macaroon.add_first_party_caveat("time < %d" % (expiry,))
return macaroon.serialize()
@@ -913,9 +923,9 @@ def generate_delete_pusher_token(self, user_id):

def _generate_base_macaroon(self, user_id):
macaroon = pymacaroons.Macaroon(
location=self.server_name,
location=self.hs.config.server_name,
identifier="key",
key=self.macaroon_secret_key)
key=self.hs.config.macaroon_secret_key)
macaroon.add_first_party_caveat("gen = 1")
macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
return macaroon
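
The auth.py changes combine two moves: MacaroonGenerator becomes an attrs class that stores only the homeserver, and the bcrypt work is pushed onto the reactor's own thread pool rather than the global one. A hedged sketch of that shape (PasswordHasher and _expensive_hash are illustrative, not Synapse code):

import attr
from twisted.internet import threads


def _expensive_hash(password):
    # stand-in for the bcrypt call that must run off the main thread
    return password[::-1]


@attr.s
class PasswordHasher(object):
    # like the rewritten MacaroonGenerator, the only stored state is `hs`
    hs = attr.ib()

    def hash(self, password):
        reactor = self.hs.get_reactor()
        # deferToThreadPool(reactor, threadpool, fn, *args) is the Twisted API
        # the commit switches to; unlike deferToThread it never touches the
        # module-level reactor.
        return threads.deferToThreadPool(
            reactor, reactor.getThreadPool(), _expensive_hash, password
        )
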
1 change: 1 addition & 0 deletions synapse/handlers/message.py
@@ -806,6 +806,7 @@ def handle_new_client_event(
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
yield send_event_to_master(
self.hs.get_clock(),
self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
9 changes: 4 additions & 5 deletions synapse/handlers/user_directory.py
@@ -19,7 +19,6 @@
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.storage.roommember import ProfileInfo
from synapse.util.metrics import Measure
from synapse.util.async import sleep
from synapse.types import get_localpart_from_id

from six import iteritems
@@ -174,7 +173,7 @@ def _do_initial_spam(self):
logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
yield self._handle_initial_room(room_id)
num_processed_rooms += 1
yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)

logger.info("Processed all rooms.")

@@ -188,7 +187,7 @@ def _do_initial_spam(self):
logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
yield self._handle_local_user(user_id)
num_processed_users += 1
yield sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.)

logger.info("Processed all users")

@@ -236,7 +235,7 @@ def _handle_initial_room(self, room_id):
count = 0
for user_id in user_ids:
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)

if not self.is_mine_id(user_id):
count += 1
@@ -251,7 +250,7 @@ def _handle_initial_room(self, room_id):
continue

if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
count += 1

user_set = (user_id, other_user_id)
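
The user_directory handler now pauses via self.clock.sleep, so the wait is scheduled on the homeserver's reactor rather than through the module-level synapse.util.async.sleep. A simplified sketch of such a reactor-backed sleep (not the real synapse.util.Clock implementation):

from twisted.internet import defer


def sleep_on(reactor, seconds):
    """Return a Deferred that fires after `seconds` on the given reactor."""
    d = defer.Deferred()
    reactor.callLater(seconds, d.callback, None)
    return d
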
6 changes: 3 additions & 3 deletions synapse/http/client.py
@@ -98,8 +98,8 @@ def request(self, method, uri, *args, **kwargs):
method, uri, *args, **kwargs
)
add_timeout_to_deferred(
request_deferred,
60, cancelled_to_request_timed_out_error,
request_deferred, 60, self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(request_deferred)

@@ -115,7 +115,7 @@ def request(self, method, uri, *args, **kwargs):
"Error sending request to %s %s: %s %s",
method, redact_uri(uri), type(e).__name__, e.message
)
raise e
raise

@defer.inlineCallbacks
def post_urlencoded_get_json(self, uri, args={}, headers=None):
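
add_timeout_to_deferred now also receives the reactor that should drive the timeout. Twisted's built-in Deferred.addTimeout(timeout, clock) takes the same pair of arguments, which is enough to sketch the idea (with_timeout is illustrative, not a Synapse helper):

from twisted.internet import defer, task


def with_timeout(d, seconds, reactor):
    # Deferred.addTimeout(timeout, clock) cancels `d` after `seconds`,
    # using whichever reactor or test clock the caller passes in.
    d.addTimeout(seconds, reactor)
    return d


# With a fake clock the timeout can be exercised without real waiting.
clock = task.Clock()
d = with_timeout(defer.Deferred(), 60, clock)
d.addErrback(lambda failure: failure.trap(defer.TimeoutError))
clock.advance(60)
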
5 changes: 3 additions & 2 deletions synapse/http/matrixfederationclient.py
@@ -22,7 +22,7 @@
from synapse.http import cancelled_to_request_timed_out_error
from synapse.http.endpoint import matrix_federation_endpoint
import synapse.metrics
from synapse.util.async import sleep, add_timeout_to_deferred
from synapse.util.async import add_timeout_to_deferred
from synapse.util import logcontext
from synapse.util.logcontext import make_deferred_yieldable
import synapse.util.retryutils
@@ -193,6 +193,7 @@ def _request(self, destination, method, path,
add_timeout_to_deferred(
request_deferred,
timeout / 1000. if timeout else 60,
self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(
@@ -234,7 +235,7 @@ def _request(self, destination, method, path,
delay = min(delay, 2)
delay *= random.uniform(0.8, 1.4)

yield sleep(delay)
yield self.clock.sleep(delay)
retries_left -= 1
else:
raise
3 changes: 3 additions & 0 deletions synapse/notifier.py
@@ -161,6 +161,7 @@ def __init__(self, hs):
self.user_to_user_stream = {}
self.room_to_user_streams = {}

self.hs = hs
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastore()
self.pending_new_room_events = []
@@ -340,6 +341,7 @@ def wait_for_events(self, user_id, timeout, callback, room_ids=None,
add_timeout_to_deferred(
listener.deferred,
(end_time - now) / 1000.,
self.hs.get_reactor(),
)
with PreserveLoggingContext():
yield listener.deferred
@@ -561,6 +563,7 @@ def wait_for_replication(self, callback, timeout):
add_timeout_to_deferred(
listener.deferred.addTimeout,
(end_time - now) / 1000.,
self.hs.get_reactor(),
)
try:
with PreserveLoggingContext():
6 changes: 3 additions & 3 deletions synapse/replication/http/send_event.py
@@ -21,7 +21,6 @@
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.util.async import sleep
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.metrics import Measure
from synapse.types import Requester, UserID
@@ -33,11 +32,12 @@


@defer.inlineCallbacks
def send_event_to_master(client, host, port, requester, event, context,
def send_event_to_master(clock, client, host, port, requester, event, context,
ratelimit, extra_users):
"""Send event to be handled on the master
Args:
clock (synapse.util.Clock)
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
@@ -77,7 +77,7 @@ def send_event_to_master(client, host, port, requester, event, context,

# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
yield sleep(1)
yield clock.sleep(1)
except MatrixCodeMessageException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
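
send_event_to_master now takes the clock as its first argument (message.py above passes hs.get_clock()) instead of importing a module-level sleep. A hedged sketch of the same shape, assuming only that the clock has a sleep() returning a Deferred (post_with_retries and do_post are illustrative, not Synapse code):

from twisted.internet import defer


@defer.inlineCallbacks
def post_with_retries(clock, do_post, attempts=5):
    """Retry do_post(), pausing via the caller-supplied clock between tries."""
    for _ in range(attempts):
        try:
            result = yield do_post()
        except IOError:
            # back off briefly on the injected clock before the next attempt
            yield clock.sleep(1)
        else:
            defer.returnValue(result)
    raise IOError("gave up after %d attempts" % attempts)
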
3 changes: 2 additions & 1 deletion synapse/rest/media/v1/media_repository.py
@@ -58,6 +58,7 @@

class MediaRepository(object):
def __init__(self, hs):
self.hs = hs
self.auth = hs.get_auth()
self.client = MatrixFederationHttpClient(hs)
self.clock = hs.get_clock()
@@ -94,7 +95,7 @@ def __init__(self, hs):
storage_providers.append(provider)

self.media_storage = MediaStorage(
self.primary_base_path, self.filepaths, storage_providers,
self.hs, self.primary_base_path, self.filepaths, storage_providers,
)

self.clock.looping_call(
7 changes: 5 additions & 2 deletions synapse/rest/media/v1/media_storage.py
@@ -37,13 +37,15 @@ class MediaStorage(object):
"""Responsible for storing/fetching files from local sources.
Args:
hs (synapse.server.Homeserver)
local_media_directory (str): Base path where we store media on disk
filepaths (MediaFilePaths)
storage_providers ([StorageProvider]): List of StorageProvider that are
used to fetch and store files.
"""

def __init__(self, local_media_directory, filepaths, storage_providers):
def __init__(self, hs, local_media_directory, filepaths, storage_providers):
self.hs = hs
self.local_media_directory = local_media_directory
self.filepaths = filepaths
self.storage_providers = storage_providers
@@ -175,7 +177,8 @@ def ensure_media_is_in_local_cache(self, file_info):
res = yield provider.fetch(path, file_info)
if res:
with res:
consumer = BackgroundFileConsumer(open(local_path, "w"))
consumer = BackgroundFileConsumer(
open(local_path, "w"), self.hs.get_reactor())
yield res.write_to_consumer(consumer)
yield consumer.wait()
defer.returnValue(local_path)
19 changes: 15 additions & 4 deletions synapse/server.py
@@ -40,7 +40,7 @@
from synapse.federation.transaction_queue import TransactionQueue
from synapse.handlers import Handlers
from synapse.handlers.appservice import ApplicationServicesHandler
from synapse.handlers.auth import AuthHandler, MacaroonGeneartor
from synapse.handlers.auth import AuthHandler, MacaroonGenerator
from synapse.handlers.deactivate_account import DeactivateAccountHandler
from synapse.handlers.devicemessage import DeviceMessageHandler
from synapse.handlers.device import DeviceHandler
@@ -165,15 +165,19 @@ def build_DEPENDENCY(self)
'server_notices_sender',
]

def __init__(self, hostname, **kwargs):
def __init__(self, hostname, reactor=None, **kwargs):
"""
Args:
hostname : The hostname for the server.
"""
if not reactor:
from twisted.internet import reactor

self._reactor = reactor
self.hostname = hostname
self._building = {}

self.clock = Clock()
self.clock = Clock(reactor)
self.distributor = Distributor()
self.ratelimiter = Ratelimiter()

@@ -186,6 +190,12 @@ def setup(self):
self.datastore = DataStore(self.get_db_conn(), self)
logger.info("Finished setting up.")

def get_reactor(self):
"""
Fetch the Twisted reactor in use by this HomeServer.
"""
return self._reactor

def get_ip_from_request(self, request):
# X-Forwarded-For is handled by our custom request type.
return request.getClientIP()
@@ -261,7 +271,7 @@ def build_auth_handler(self):
return AuthHandler(self)

def build_macaroon_generator(self):
return MacaroonGeneartor(self)
return MacaroonGenerator(self)

def build_device_handler(self):
return DeviceHandler(self)
@@ -328,6 +338,7 @@ def build_db_pool(self):

return adbapi.ConnectionPool(
name,
cp_reactor=self.get_reactor(),
**self.db_config.get("args", {})
)

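
The HomeServer constructor change is the hinge for the rest of the commit: it accepts an optional reactor, falls back to the global one only when nothing is injected, and exposes it through get_reactor(). A condensed sketch of the pattern (Service is illustrative, not the real HomeServer); in tests one would pass something like twisted.internet.task.Clock or proto_helpers.MemoryReactorClock:

class Service(object):
    """Illustrative stand-in for the HomeServer constructor pattern."""

    def __init__(self, hostname, reactor=None):
        if reactor is None:
            # fall back to the global reactor only when nothing was injected
            from twisted.internet import reactor
        self._reactor = reactor
        self.hostname = hostname

    def get_reactor(self):
        return self._reactor
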
3 changes: 1 addition & 2 deletions synapse/storage/background_updates.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import synapse.util.async

from ._base import SQLBaseStore
from . import engines
@@ -92,7 +91,7 @@ def start_doing_background_updates(self):
logger.info("Starting background schema updates")

while True:
yield synapse.util.async.sleep(
yield self.hs.get_clock().sleep(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)

try:
6 changes: 4 additions & 2 deletions synapse/storage/client_ips.py
@@ -15,7 +15,7 @@

import logging

from twisted.internet import defer, reactor
from twisted.internet import defer

from ._base import Cache
from . import background_updates
@@ -70,7 +70,9 @@ def __init__(self, db_conn, hs):
self._client_ip_looper = self._clock.looping_call(
self._update_client_ips_batch, 5 * 1000
)
reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch)
self.hs.get_reactor().addSystemEventTrigger(
"before", "shutdown", self._update_client_ips_batch
)

def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,
now=None):
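
client_ips.py now registers its flush-on-shutdown trigger on the injected reactor, which makes the hook easy to verify with a stub instead of the real reactor. A hedged sketch (StubReactor and BatchFlusher are illustrative only):

class StubReactor(object):
    def __init__(self):
        self.shutdown_hooks = []

    def addSystemEventTrigger(self, phase, event_type, fn, *args, **kwargs):
        # same signature as the real reactor method used in the diff above
        self.shutdown_hooks.append((phase, event_type, fn))


class BatchFlusher(object):
    """Illustrative stand-in for the client-IP store's batching behaviour."""

    def __init__(self, reactor):
        self._batch = []
        reactor.addSystemEventTrigger("before", "shutdown", self._flush)

    def _flush(self):
        self._batch = []


reactor = StubReactor()
flusher = BatchFlusher(reactor)
assert reactor.shutdown_hooks[0][:2] == ("before", "shutdown")
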
3 changes: 1 addition & 2 deletions synapse/storage/event_push_actions.py
@@ -16,7 +16,6 @@

from synapse.storage._base import SQLBaseStore, LoggingTransaction
from twisted.internet import defer
from synapse.util.async import sleep
from synapse.util.caches.descriptors import cachedInlineCallbacks

import logging
@@ -800,7 +799,7 @@ def _rotate_notifs(self):
)
if caught_up:
break
yield sleep(5)
yield self.hs.get_clock().sleep(5)
finally:
self._doing_notif_rotation = False

(12 more changed files not shown)
