Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Update to use new timeout function everywhere. #3910

Merged
merged 5 commits into from
Sep 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3910.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug where things occaisonally were not being timed out correctly.
4 changes: 2 additions & 2 deletions synapse/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.http import cancelled_to_request_timed_out_error, redact_uri
from synapse.http.endpoint import SpiderEndpoint
from synapse.util.async_helpers import add_timeout_to_deferred
from synapse.util.async_helpers import timeout_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable

Expand Down Expand Up @@ -99,7 +99,7 @@ def request(self, method, uri, data=b'', headers=None):
request_deferred = treq.request(
method, uri, agent=self.agent, data=data, headers=headers
)
add_timeout_to_deferred(
request_deferred = timeout_deferred(
request_deferred, 60, self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
Expand Down
25 changes: 15 additions & 10 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
SynapseError,
)
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util.async_helpers import timeout_no_seriously
from synapse.util.async_helpers import timeout_deferred
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.metrics import Measure

Expand Down Expand Up @@ -145,8 +145,14 @@ def _handle_json_response(reactor, timeout_sec, request, response):
"""
try:
check_content_type_is_json(response.headers)

d = treq.json_content(response)
d.addTimeout(timeout_sec, reactor)
d = timeout_deferred(
d,
timeout=timeout_sec,
reactor=reactor,
)

body = yield make_deferred_yieldable(d)
except Exception as e:
logger.warn(
Expand Down Expand Up @@ -321,15 +327,10 @@ def _send_request(
reactor=self.hs.get_reactor(),
unbuffered=True
)
request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())

# Sometimes the timeout above doesn't work, so lets hack yet
# another layer of timeouts in in the vain hope that at some
# point the world made sense and this really really really
# should work.
request_deferred = timeout_no_seriously(
request_deferred = timeout_deferred(
request_deferred,
timeout=_sec_timeout * 2,
timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)

Expand Down Expand Up @@ -388,7 +389,11 @@ def _send_request(
# :'(
# Update transactions table?
d = treq.content(response)
d.addTimeout(_sec_timeout, self.hs.get_reactor())
d = timeout_deferred(
d,
timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)
body = yield make_deferred_yieldable(d)
raise HttpResponseException(
response.code, response.phrase, body
Expand Down
21 changes: 9 additions & 12 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import LaterGauge
from synapse.types import StreamToken
from synapse.util.async_helpers import (
DeferredTimeoutError,
ObservableDeferred,
add_timeout_to_deferred,
)
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
Expand Down Expand Up @@ -337,7 +333,7 @@ def wait_for_events(self, user_id, timeout, callback, room_ids=None,
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
add_timeout_to_deferred(
listener.deferred = timeout_deferred(
listener.deferred,
(end_time - now) / 1000.,
self.hs.get_reactor(),
Expand All @@ -354,7 +350,7 @@ def wait_for_events(self, user_id, timeout, callback, room_ids=None,
# Update the prev_token to the current_token since nothing
# has happened between the old prev_token and the current_token
prev_token = current_token
except DeferredTimeoutError:
except defer.TimeoutError:
break
except defer.CancelledError:
break
Expand Down Expand Up @@ -559,15 +555,16 @@ def wait_for_replication(self, callback, timeout):
if end_time <= now:
break

add_timeout_to_deferred(
listener.deferred.addTimeout,
(end_time - now) / 1000.,
self.hs.get_reactor(),
listener.deferred = timeout_deferred(
listener.deferred,
timeout=(end_time - now) / 1000.,
reactor=self.hs.get_reactor(),
)

try:
with PreserveLoggingContext():
yield listener.deferred
except DeferredTimeoutError:
except defer.TimeoutError:
break
except defer.CancelledError:
break
Expand Down
88 changes: 25 additions & 63 deletions synapse/util/async_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,29 +374,25 @@ def _ctx_manager():
defer.returnValue(_ctx_manager())


class DeferredTimeoutError(Exception):
"""
This error is raised by default when a L{Deferred} times out.
"""

def _cancelled_to_timed_out_error(value, timeout):
if isinstance(value, failure.Failure):
value.trap(CancelledError)
raise defer.TimeoutError(timeout, "Deferred")
return value

def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
"""
Add a timeout to a deferred by scheduling it to be cancelled after
timeout seconds.

This is essentially a backport of deferred.addTimeout, which was introduced
in twisted 16.5.
def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
"""The in built twisted `Deferred.addTimeout` fails to time out deferreds
that have a canceller that throws exceptions. This method creates a new
deferred that wraps and times out the given deferred, correctly handling
the case where the given deferred's canceller throws.

If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
unless a cancelable function was passed to its initialization or unless
a different on_timeout_cancel callable is provided.
NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred

Args:
deferred (defer.Deferred): deferred to be timed out
timeout (Number): seconds to time out after
reactor (twisted.internet.reactor): the Twisted reactor to use

deferred (Deferred)
timeout (float): Timeout in seconds
reactor (twisted.internet.reactor): The twisted reactor to use
on_timeout_cancel (callable): A callable which is called immediately
after the deferred times out, and not if this deferred is
otherwise cancelled before the timeout.
Expand All @@ -406,48 +402,10 @@ def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
the timeout.

The default callable (if none is provided) will translate a
CancelledError Failure into a DeferredTimeoutError.
"""
timed_out = [False]

def time_it_out():
timed_out[0] = True
deferred.cancel()

delayed_call = reactor.callLater(timeout, time_it_out)

def convert_cancelled(value):
if timed_out[0]:
to_call = on_timeout_cancel or _cancelled_to_timed_out_error
return to_call(value, timeout)
return value

deferred.addBoth(convert_cancelled)
CancelledError Failure into a defer.TimeoutError.

def cancel_timeout(result):
# stop the pending call to cancel the deferred if it's been fired
if delayed_call.active():
delayed_call.cancel()
return result

deferred.addBoth(cancel_timeout)


def _cancelled_to_timed_out_error(value, timeout):
if isinstance(value, failure.Failure):
value.trap(CancelledError)
raise DeferredTimeoutError(timeout, "Deferred")
return value


def timeout_no_seriously(deferred, timeout, reactor):
"""The in build twisted deferred addTimeout (and the method above)
completely fail to time things out under some unknown circumstances.

Lets try a different way of timing things out and maybe that will make
things work?!

TODO: Kill this with fire.
Returns:
Deferred
"""

new_d = defer.Deferred()
Expand All @@ -457,16 +415,20 @@ def timeout_no_seriously(deferred, timeout, reactor):
def time_it_out():
timed_out[0] = True

if not new_d.called:
new_d.errback(DeferredTimeoutError(timeout, "Deferred"))
try:
deferred.cancel()
except: # noqa: E722, if we throw any exception it'll break time outs
logger.exception("Canceller failed during timeout")

deferred.cancel()
if not new_d.called:
new_d.errback(defer.TimeoutError(timeout, "Deferred"))

delayed_call = reactor.callLater(timeout, time_it_out)

def convert_cancelled(value):
if timed_out[0]:
return _cancelled_to_timed_out_error(value, timeout)
to_call = on_timeout_cancel or _cancelled_to_timed_out_error
return to_call(value, timeout)
return value

deferred.addBoth(convert_cancelled)
Expand Down