From 4084a774a89f0d02406eebda8279c2b8aab89812 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Sep 2018 09:59:15 +0100 Subject: [PATCH 1/2] Timeout reading body for outbound HTTP requests --- synapse/http/matrixfederationclient.py | 52 ++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 7 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 6a1fc8ca553f..f9a1fbf95d76 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -280,7 +280,10 @@ def _request(self, destination, method, path, # :'( # Update transactions table? with logcontext.PreserveLoggingContext(): - body = yield treq.content(response) + body = yield self._timeout_deferred( + treq.content(response), + timeout, + ) raise HttpResponseException( response.code, response.phrase, body ) @@ -394,7 +397,10 @@ def put_json(self, destination, path, args={}, data={}, check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield treq.json_content(response) + body = yield self._timeout_deferred( + treq.json_content(response), + timeout, + ) defer.returnValue(body) @defer.inlineCallbacks @@ -444,7 +450,10 @@ def post_json(self, destination, path, data={}, long_retries=False, check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield treq.json_content(response) + body = yield self._timeout_deferred( + treq.json_content(response), + timeout, + ) defer.returnValue(body) @@ -496,7 +505,10 @@ def get_json(self, destination, path, args=None, retry_on_dns_fail=True, check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield treq.json_content(response) + body = yield self._timeout_deferred( + treq.json_content(response), + timeout, + ) defer.returnValue(body) @@ -543,7 +555,10 @@ def delete_json(self, destination, path, long_retries=False, check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield treq.json_content(response) + body = yield self._timeout_deferred( + treq.json_content(response), + timeout, + ) defer.returnValue(body) @@ -585,8 +600,10 @@ def get_file(self, destination, path, output_stream, args={}, try: with logcontext.PreserveLoggingContext(): - length = yield _readBodyToFile( - response, output_stream, max_size + length = yield self._timeout_deferred( + _readBodyToFile( + response, output_stream, max_size + ), ) except Exception: logger.exception("Failed to download body") @@ -594,6 +611,27 @@ def get_file(self, destination, path, output_stream, args={}, defer.returnValue((length, headers)) + def _timeout_deferred(self, deferred, timeout_ms=None): + """Times the deferred out after `timeout_ms` ms + + Args: + deferred (Deferred) + timeout_ms (int|None): Timeout in milliseconds. If None defaults + to 60 seconds. + + Returns: + Deferred + """ + + add_timeout_to_deferred( + deferred, + timeout_ms / 1000. if timeout_ms else 60, + self.hs.get_reactor(), + cancelled_to_request_timed_out_error, + ) + + return deferred + class _ReadBodyToFileProtocol(protocol.Protocol): def __init__(self, stream, deferred, max_size): From 649c647955dee037e9f7d0d0d81341cac020a901 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Sep 2018 10:10:32 +0100 Subject: [PATCH 2/2] Newsfile --- changelog.d/3845.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3845.bugfix diff --git a/changelog.d/3845.bugfix b/changelog.d/3845.bugfix new file mode 100644 index 000000000000..5b7e8f193498 --- /dev/null +++ b/changelog.d/3845.bugfix @@ -0,0 +1 @@ +Fix outbound requests occasionally wedging, which can result in federation breaking between servers.