Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3910 from matrix-org/erikj/update_timeout
Browse files Browse the repository at this point in the history
Update to use new timeout function everywhere.
  • Loading branch information
erikjohnston authored Sep 19, 2018
2 parents 05b9937 + 80d2d50 commit cb016ba
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 87 deletions.
1 change: 1 addition & 0 deletions changelog.d/3910.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug where things occaisonally were not being timed out correctly.
4 changes: 2 additions & 2 deletions synapse/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.http import cancelled_to_request_timed_out_error, redact_uri
from synapse.http.endpoint import SpiderEndpoint
from synapse.util.async_helpers import add_timeout_to_deferred
from synapse.util.async_helpers import timeout_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable

Expand Down Expand Up @@ -99,7 +99,7 @@ def request(self, method, uri, data=b'', headers=None):
request_deferred = treq.request(
method, uri, agent=self.agent, data=data, headers=headers
)
add_timeout_to_deferred(
request_deferred = timeout_deferred(
request_deferred, 60, self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
Expand Down
25 changes: 15 additions & 10 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
SynapseError,
)
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util.async_helpers import timeout_no_seriously
from synapse.util.async_helpers import timeout_deferred
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.metrics import Measure

Expand Down Expand Up @@ -145,8 +145,14 @@ def _handle_json_response(reactor, timeout_sec, request, response):
"""
try:
check_content_type_is_json(response.headers)

d = treq.json_content(response)
d.addTimeout(timeout_sec, reactor)
d = timeout_deferred(
d,
timeout=timeout_sec,
reactor=reactor,
)

body = yield make_deferred_yieldable(d)
except Exception as e:
logger.warn(
Expand Down Expand Up @@ -321,15 +327,10 @@ def _send_request(
reactor=self.hs.get_reactor(),
unbuffered=True
)
request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())

# Sometimes the timeout above doesn't work, so lets hack yet
# another layer of timeouts in in the vain hope that at some
# point the world made sense and this really really really
# should work.
request_deferred = timeout_no_seriously(
request_deferred = timeout_deferred(
request_deferred,
timeout=_sec_timeout * 2,
timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)

Expand Down Expand Up @@ -388,7 +389,11 @@ def _send_request(
# :'(
# Update transactions table?
d = treq.content(response)
d.addTimeout(_sec_timeout, self.hs.get_reactor())
d = timeout_deferred(
d,
timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)
body = yield make_deferred_yieldable(d)
raise HttpResponseException(
response.code, response.phrase, body
Expand Down
21 changes: 9 additions & 12 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import LaterGauge
from synapse.types import StreamToken
from synapse.util.async_helpers import (
DeferredTimeoutError,
ObservableDeferred,
add_timeout_to_deferred,
)
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
Expand Down Expand Up @@ -337,7 +333,7 @@ def wait_for_events(self, user_id, timeout, callback, room_ids=None,
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
add_timeout_to_deferred(
listener.deferred = timeout_deferred(
listener.deferred,
(end_time - now) / 1000.,
self.hs.get_reactor(),
Expand All @@ -354,7 +350,7 @@ def wait_for_events(self, user_id, timeout, callback, room_ids=None,
# Update the prev_token to the current_token since nothing
# has happened between the old prev_token and the current_token
prev_token = current_token
except DeferredTimeoutError:
except defer.TimeoutError:
break
except defer.CancelledError:
break
Expand Down Expand Up @@ -559,15 +555,16 @@ def wait_for_replication(self, callback, timeout):
if end_time <= now:
break

add_timeout_to_deferred(
listener.deferred.addTimeout,
(end_time - now) / 1000.,
self.hs.get_reactor(),
listener.deferred = timeout_deferred(
listener.deferred,
timeout=(end_time - now) / 1000.,
reactor=self.hs.get_reactor(),
)

try:
with PreserveLoggingContext():
yield listener.deferred
except DeferredTimeoutError:
except defer.TimeoutError:
break
except defer.CancelledError:
break
Expand Down
88 changes: 25 additions & 63 deletions synapse/util/async_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,29 +374,25 @@ def _ctx_manager():
defer.returnValue(_ctx_manager())


class DeferredTimeoutError(Exception):
"""
This error is raised by default when a L{Deferred} times out.
"""

def _cancelled_to_timed_out_error(value, timeout):
if isinstance(value, failure.Failure):
value.trap(CancelledError)
raise defer.TimeoutError(timeout, "Deferred")
return value

def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
"""
Add a timeout to a deferred by scheduling it to be cancelled after
timeout seconds.

This is essentially a backport of deferred.addTimeout, which was introduced
in twisted 16.5.
def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
"""The in built twisted `Deferred.addTimeout` fails to time out deferreds
that have a canceller that throws exceptions. This method creates a new
deferred that wraps and times out the given deferred, correctly handling
the case where the given deferred's canceller throws.
If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
unless a cancelable function was passed to its initialization or unless
a different on_timeout_cancel callable is provided.
NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred
Args:
deferred (defer.Deferred): deferred to be timed out
timeout (Number): seconds to time out after
reactor (twisted.internet.reactor): the Twisted reactor to use
deferred (Deferred)
timeout (float): Timeout in seconds
reactor (twisted.internet.reactor): The twisted reactor to use
on_timeout_cancel (callable): A callable which is called immediately
after the deferred times out, and not if this deferred is
otherwise cancelled before the timeout.
Expand All @@ -406,48 +402,10 @@ def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
the timeout.
The default callable (if none is provided) will translate a
CancelledError Failure into a DeferredTimeoutError.
"""
timed_out = [False]

def time_it_out():
timed_out[0] = True
deferred.cancel()

delayed_call = reactor.callLater(timeout, time_it_out)

def convert_cancelled(value):
if timed_out[0]:
to_call = on_timeout_cancel or _cancelled_to_timed_out_error
return to_call(value, timeout)
return value

deferred.addBoth(convert_cancelled)
CancelledError Failure into a defer.TimeoutError.
def cancel_timeout(result):
# stop the pending call to cancel the deferred if it's been fired
if delayed_call.active():
delayed_call.cancel()
return result

deferred.addBoth(cancel_timeout)


def _cancelled_to_timed_out_error(value, timeout):
if isinstance(value, failure.Failure):
value.trap(CancelledError)
raise DeferredTimeoutError(timeout, "Deferred")
return value


def timeout_no_seriously(deferred, timeout, reactor):
"""The in build twisted deferred addTimeout (and the method above)
completely fail to time things out under some unknown circumstances.
Lets try a different way of timing things out and maybe that will make
things work?!
TODO: Kill this with fire.
Returns:
Deferred
"""

new_d = defer.Deferred()
Expand All @@ -457,16 +415,20 @@ def timeout_no_seriously(deferred, timeout, reactor):
def time_it_out():
timed_out[0] = True

if not new_d.called:
new_d.errback(DeferredTimeoutError(timeout, "Deferred"))
try:
deferred.cancel()
except: # noqa: E722, if we throw any exception it'll break time outs
logger.exception("Canceller failed during timeout")

deferred.cancel()
if not new_d.called:
new_d.errback(defer.TimeoutError(timeout, "Deferred"))

delayed_call = reactor.callLater(timeout, time_it_out)

def convert_cancelled(value):
if timed_out[0]:
return _cancelled_to_timed_out_error(value, timeout)
to_call = on_timeout_cancel or _cancelled_to_timed_out_error
return to_call(value, timeout)
return value

deferred.addBoth(convert_cancelled)
Expand Down

0 comments on commit cb016ba

Please sign in to comment.