Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use deferred.addTimeout instead of time_bound_deferred #3127

Merged
merged 4 commits into from
Apr 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions synapse/http/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -12,3 +13,24 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet.defer import CancelledError
from twisted.python import failure

from synapse.api.errors import SynapseError


class RequestTimedOutError(SynapseError):
"""Exception representing timeout of an outbound request"""
def __init__(self):
super(RequestTimedOutError, self).__init__(504, "Timed out")


def cancelled_to_request_timed_out_error(value):
"""Turns CancelledErrors into RequestTimedOutErrors.

For use with async.add_timeout_to_deferred
"""
if isinstance(value, failure.Failure):
value.trap(CancelledError)
raise RequestTimedOutError()
return value
20 changes: 9 additions & 11 deletions synapse/http/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -18,9 +19,10 @@
from synapse.api.errors import (
CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
)
from synapse.http import cancelled_to_request_timed_out_error
from synapse.util.async import add_timeout_to_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util import logcontext
import synapse.metrics
from synapse.http.endpoint import SpiderEndpoint

Expand Down Expand Up @@ -95,21 +97,17 @@ def request(self, method, uri, *args, **kwargs):
# counters to it
outgoing_requests_counter.inc(method)

def send_request():
logger.info("Sending request %s %s", method, uri)

try:
request_deferred = self.agent.request(
method, uri, *args, **kwargs
)

return self.clock.time_bound_deferred(
add_timeout_to_deferred(
request_deferred,
time_out=60,
60, cancelled_to_request_timed_out_error,
)

logger.info("Sending request %s %s", method, uri)

try:
with logcontext.PreserveLoggingContext():
response = yield send_request()
response = yield make_deferred_yieldable(request_deferred)

incoming_responses_counter.inc(method, response.code)
logger.info(
Expand Down
38 changes: 20 additions & 18 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -12,17 +13,19 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import synapse.util.retryutils
from twisted.internet import defer, reactor, protocol
from twisted.internet.error import DNSLookupError
from twisted.web.client import readBody, HTTPConnectionPool, Agent
from twisted.web.http_headers import Headers
from twisted.web._newclient import ResponseDone

from synapse.http import cancelled_to_request_timed_out_error
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util.async import sleep
from synapse.util import logcontext
import synapse.metrics
from synapse.util.async import sleep, add_timeout_to_deferred
from synapse.util import logcontext
from synapse.util.logcontext import make_deferred_yieldable
import synapse.util.retryutils

from canonicaljson import encode_canonical_json

Expand Down Expand Up @@ -184,21 +187,20 @@ def _request(self, destination, method, path,
producer = body_callback(method, http_url_bytes, headers_dict)

try:
def send_request():
request_deferred = self.agent.request(
method,
url_bytes,
Headers(headers_dict),
producer
)

return self.clock.time_bound_deferred(
request_deferred,
time_out=timeout / 1000. if timeout else 60,
)

with logcontext.PreserveLoggingContext():
response = yield send_request()
request_deferred = self.agent.request(
method,
url_bytes,
Headers(headers_dict),
producer
)
add_timeout_to_deferred(
request_deferred,
timeout / 1000. if timeout else 60,
cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(
request_deferred,
)

log_result = "%d %s" % (response.code, response.phrase,)
break
Expand Down
29 changes: 17 additions & 12 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
# limitations under the License.

from twisted.internet import defer

from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state

from synapse.util import DeferredTimedOutError
from synapse.util.logutils import log_function
from synapse.util.async import ObservableDeferred
from synapse.util.async import (
ObservableDeferred, add_timeout_to_deferred,
DeferredTimeoutError,
)
from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.metrics import Measure
from synapse.types import StreamToken
Expand Down Expand Up @@ -336,11 +339,12 @@ def wait_for_events(self, user_id, timeout, callback, room_ids=None,
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
add_timeout_to_deferred(
listener.deferred,
(end_time - now) / 1000.,
)
with PreserveLoggingContext():
yield self.clock.time_bound_deferred(
listener.deferred,
time_out=(end_time - now) / 1000.
)
yield listener.deferred

current_token = user_stream.current_token

Expand All @@ -351,7 +355,7 @@ def wait_for_events(self, user_id, timeout, callback, room_ids=None,
# Update the prev_token to the current_token since nothing
# has happened between the old prev_token and the current_token
prev_token = current_token
except DeferredTimedOutError:
except DeferredTimeoutError:
break
except defer.CancelledError:
break
Expand Down Expand Up @@ -556,13 +560,14 @@ def wait_for_replication(self, callback, timeout):
if end_time <= now:
break

add_timeout_to_deferred(
listener.deferred.addTimeout,
(end_time - now) / 1000.,
)
try:
with PreserveLoggingContext():
yield self.clock.time_bound_deferred(
listener.deferred,
time_out=(end_time - now) / 1000.
)
except DeferredTimedOutError:
yield listener.deferred
except DeferredTimeoutError:
break
except defer.CancelledError:
break
Expand Down
56 changes: 0 additions & 56 deletions synapse/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.api.errors import SynapseError
from synapse.util.logcontext import PreserveLoggingContext

from twisted.internet import defer, reactor, task
Expand All @@ -24,11 +23,6 @@
logger = logging.getLogger(__name__)


class DeferredTimedOutError(SynapseError):
def __init__(self):
super(DeferredTimedOutError, self).__init__(504, "Timed out")


def unwrapFirstError(failure):
# defer.gatherResults and DeferredLists wrap failures.
failure.trap(defer.FirstError)
Expand Down Expand Up @@ -85,53 +79,3 @@ def cancel_call_later(self, timer, ignore_errs=False):
except Exception:
if not ignore_errs:
raise

def time_bound_deferred(self, given_deferred, time_out):
if given_deferred.called:
return given_deferred

ret_deferred = defer.Deferred()

def timed_out_fn():
e = DeferredTimedOutError()

try:
ret_deferred.errback(e)
except Exception:
pass

try:
given_deferred.cancel()
except Exception:
pass

timer = None

def cancel(res):
try:
self.cancel_call_later(timer)
except Exception:
pass
return res

ret_deferred.addBoth(cancel)

def success(res):
try:
ret_deferred.callback(res)
except Exception:
pass

return res

def err(res):
try:
ret_deferred.errback(res)
except Exception:
pass

given_deferred.addCallbacks(callback=success, errback=err)

timer = self.call_later(time_out, timed_out_fn)

return ret_deferred
67 changes: 67 additions & 0 deletions synapse/util/async.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@


from twisted.internet import defer, reactor
from twisted.internet.defer import CancelledError
from twisted.python import failure

from .logcontext import (
PreserveLoggingContext, make_deferred_yieldable, preserve_fn
Expand Down Expand Up @@ -392,3 +394,68 @@ def _ctx_manager():
self.key_to_current_writer.pop(key)

defer.returnValue(_ctx_manager())


class DeferredTimeoutError(Exception):
"""
This error is raised by default when a L{Deferred} times out.
"""


def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
"""
Add a timeout to a deferred by scheduling it to be cancelled after
timeout seconds.

This is essentially a backport of deferred.addTimeout, which was introduced
in twisted 16.5.

If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
unless a cancelable function was passed to its initialization or unless
a different on_timeout_cancel callable is provided.

Args:
deferred (defer.Deferred): deferred to be timed out
timeout (Number): seconds to time out after

on_timeout_cancel (callable): A callable which is called immediately
after the deferred times out, and not if this deferred is
otherwise cancelled before the timeout.

It takes an arbitrary value, which is the value of the deferred at
that exact point in time (probably a CancelledError Failure), and
the timeout.

The default callable (if none is provided) will translate a
CancelledError Failure into a DeferredTimeoutError.
"""
timed_out = [False]

def time_it_out():
timed_out[0] = True
deferred.cancel()

delayed_call = reactor.callLater(timeout, time_it_out)

def convert_cancelled(value):
if timed_out[0]:
to_call = on_timeout_cancel or _cancelled_to_timed_out_error
return to_call(value, timeout)
return value

deferred.addBoth(convert_cancelled)

def cancel_timeout(result):
# stop the pending call to cancel the deferred if it's been fired
if delayed_call.active():
delayed_call.cancel()
return result

deferred.addBoth(cancel_timeout)


def _cancelled_to_timed_out_error(value, timeout):
if isinstance(value, failure.Failure):
value.trap(CancelledError)
raise DeferredTimeoutError(timeout, "Deferred")
return value
33 changes: 0 additions & 33 deletions tests/util/test_clock.py

This file was deleted.