From 53abf48ca1752d948771965c363f6bce335bc8cb Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 13 Sep 2018 22:48:22 +1000 Subject: [PATCH 1/4] resolving --- synapse/http/matrixfederationclient.py | 28 ++++--- tests/http/test_fedclient.py | 101 +++++++++++++++++++++++++ tests/server.py | 33 +++++++- 3 files changed, 152 insertions(+), 10 deletions(-) create mode 100644 tests/http/test_fedclient.py diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index cf920bc04144..6e4daca75616 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -26,7 +26,7 @@ from prometheus_client import Counter from signedjson.sign import sign_json -from twisted.internet import defer, protocol, reactor +from twisted.internet import defer, protocol from twisted.internet.error import DNSLookupError from twisted.web._newclient import ResponseDone from twisted.web.client import Agent, HTTPConnectionPool @@ -66,13 +66,14 @@ class MatrixFederationEndpointFactory(object): def __init__(self, hs): + self.reactor = hs.get_reactor() self.tls_client_options_factory = hs.tls_client_options_factory def endpointForURI(self, uri): destination = uri.netloc.decode('ascii') return matrix_federation_endpoint( - reactor, destination, timeout=10, + self.reactor, destination, timeout=10, tls_client_options_factory=self.tls_client_options_factory ) @@ -90,6 +91,7 @@ def __init__(self, hs): self.hs = hs self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname + reactor = hs.get_reactor() pool = HTTPConnectionPool(reactor) pool.maxPersistentPerHost = 5 pool.cachedConnectionTimeout = 2 * 60 @@ -143,6 +145,11 @@ def _request(self, destination, method, path, (May also fail with plenty of other Exceptions for things like DNS failures, connection failures, SSL failures.) """ + if timeout: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = 60 + if ( self.hs.config.federation_domain_whitelist is not None and destination not in self.hs.config.federation_domain_whitelist @@ -215,13 +222,9 @@ def _request(self, destination, method, path, headers=Headers(headers_dict), data=data, agent=self.agent, + reactor=self.hs.get_reactor() ) - add_timeout_to_deferred( - request_deferred, - timeout / 1000. if timeout else 60, - self.hs.get_reactor(), - cancelled_to_request_timed_out_error, - ) + request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor()) response = yield make_deferred_yieldable( request_deferred, ) @@ -261,6 +264,13 @@ def _request(self, destination, method, path, delay = min(delay, 2) delay *= random.uniform(0.8, 1.4) + logger.debug( + "{%s} Waiting %s before sending to %s...", + txn_id, + delay, + destination + ) + yield self.clock.sleep(delay) retries_left -= 1 else: @@ -388,7 +398,7 @@ def put_json(self, destination, path, args={}, data={}, long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, - backoff_on_404=backoff_on_404, + backoff_on_404=backoff_on_404 ) if 200 <= response.code < 300: diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py new file mode 100644 index 000000000000..2c5f7e0fb7d9 --- /dev/null +++ b/tests/http/test_fedclient.py @@ -0,0 +1,101 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mock import Mock + +from twisted.internet.error import ConnectingCancelledError, DNSLookupError +from twisted.web.error import ResponseNeverReceived + +from synapse.http.matrixfederationclient import MatrixFederationHttpClient + +from tests.unittest import HomeserverTestCase + + +class FederationClientTests(HomeserverTestCase): + + def make_homeserver(self, reactor, clock): + + hs = self.setup_test_homeserver(reactor=reactor, clock=clock) + hs.tls_client_options_factory = None + return hs + + def prepare(self, reactor, clock, homeserver): + + self.cl = MatrixFederationHttpClient(self.hs) + self.reactor.lookups["testserv"] = "1.2.3.4" + + + def test_dns_error(self): + """ + If the DNS raising returns an error, it will bubble up. + """ + d = self.cl.put_json("testserv2:8008", "foo/bar", timeout=10000) + self.pump() + + f = self.failureResultOf(d) + self.assertIsInstance(f.value, DNSLookupError) + + def test_client_never_connect(self): + + d = self.cl.put_json("testserv:8008", "foo/bar", timeout=10000) + + self.pump() + + # Nothing happened yet + self.assertFalse(d.called) + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + self.assertEqual(clients[0][0], '1.2.3.4') + self.assertEqual(clients[0][1], 8008) + + # Deferred is still without a result + self.assertFalse(d.called) + + # Push by enough to time it out + self.reactor.advance(10.5) + f = self.failureResultOf(d) + + self.assertIsInstance(f.value, ConnectingCancelledError) + + def test_client_connect_no_response(self): + + d = self.cl.put_json("testserv:8008", "foo/bar", timeout=10000) + + self.pump() + + # Nothing happened yet + self.assertFalse(d.called) + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + self.assertEqual(clients[0][0], '1.2.3.4') + self.assertEqual(clients[0][1], 8008) + + conn = Mock() + + client = clients[0][2].buildProtocol(None) + client.makeConnection(conn) + + # Deferred is still without a result + self.assertFalse(d.called) + + # Push by enough to time it out + self.reactor.advance(10.5) + f = self.failureResultOf(d) + + self.assertIsInstance(f.value, ResponseNeverReceived) diff --git a/tests/server.py b/tests/server.py index a2c3ca61f66d..5dbce0746972 100644 --- a/tests/server.py +++ b/tests/server.py @@ -4,9 +4,14 @@ from six import text_type import attr +from zope.interface import implementer -from twisted.internet import address, threads +from twisted.internet import address, defer, threads, udp +from twisted.internet._resolver import HostResolution +from twisted.internet.address import IPv4Address from twisted.internet.defer import Deferred +from twisted.internet.error import DNSLookupError +from twisted.internet.interfaces import IReactorPluggableNameResolver from twisted.python.failure import Failure from twisted.test.proto_helpers import MemoryReactorClock @@ -154,10 +159,36 @@ def render(request, resource, clock): wait_until_result(clock, request) +@implementer(IReactorPluggableNameResolver) class ThreadedMemoryReactorClock(MemoryReactorClock): """ A MemoryReactorClock that supports callFromThread. """ + def __init__(self): + self._udp = [] + self.lookups = {} + + class Resolver(object): + def resolveHostName(_self, resolutionReceiver, hostName, portNumber=0, addressTypes=None, transportSemantics='TCP'): + + resolution = HostResolution(hostName) + resolutionReceiver.resolutionBegan(resolution) + if hostName not in self.lookups: + raise DNSLookupError("OH NO") + + resolutionReceiver.addressResolved(IPv4Address('TCP', self.lookups[hostName], portNumber)) + resolutionReceiver.resolutionComplete() + return resolution + + self.nameResolver = Resolver() + super(ThreadedMemoryReactorClock, self).__init__() + + def listenUDP(self, port, protocol, interface='', maxPacketSize=8196): + print(port) + p = udp.Port(port, protocol, interface, maxPacketSize, self) + p.startListening() + self._udp.append(p) + return p def callFromThread(self, callback, *args, **kwargs): """ From 2c709ef46fbfb98d5a935f823574d14066ec23ad Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 13 Sep 2018 23:07:47 +1000 Subject: [PATCH 2/4] some fixes? --- synapse/http/matrixfederationclient.py | 73 +++++++++----------------- tests/http/test_fedclient.py | 70 ++++++++++++++++++++---- tests/server.py | 14 ++++- 3 files changed, 98 insertions(+), 59 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 6e4daca75616..7c10aa54864a 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -43,7 +43,6 @@ from synapse.http import cancelled_to_request_timed_out_error from synapse.http.endpoint import matrix_federation_endpoint from synapse.util import logcontext -from synapse.util.async_helpers import add_timeout_to_deferred from synapse.util.logcontext import make_deferred_yieldable logger = logging.getLogger(__name__) @@ -102,6 +101,7 @@ def __init__(self, hs): self._store = hs.get_datastore() self.version_string = hs.version_string.encode('ascii') self._next_id = 1 + self.default_timeout = 60 def _create_url(self, destination, path_bytes, param_bytes, query_bytes): return urllib.parse.urlunparse( @@ -148,7 +148,7 @@ def _request(self, destination, method, path, if timeout: _sec_timeout = timeout / 1000 else: - _sec_timeout = 60 + _sec_timeout = self.default_timeout if ( self.hs.config.federation_domain_whitelist is not None and @@ -289,10 +289,9 @@ def _request(self, destination, method, path, # :'( # Update transactions table? with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.content(response), - timeout, - ) + d = treq.content(response) + d.addTimeout(_sec_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) raise HttpResponseException( response.code, response.phrase, body ) @@ -406,10 +405,9 @@ def put_json(self, destination, path, args={}, data={}, check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.json_content(response), - timeout, - ) + d = treq.json_content(response) + d.addTimeout(self.default_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) defer.returnValue(body) @defer.inlineCallbacks @@ -459,10 +457,14 @@ def post_json(self, destination, path, data={}, long_retries=False, check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.json_content(response), - timeout, - ) + d = treq.json_content(response) + if timeout: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + + d.addTimeout(_sec_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) defer.returnValue(body) @@ -514,10 +516,9 @@ def get_json(self, destination, path, args=None, retry_on_dns_fail=True, check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.json_content(response), - timeout, - ) + d = treq.json_content(response) + d.addTimeout(self.default_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) defer.returnValue(body) @@ -564,10 +565,9 @@ def delete_json(self, destination, path, long_retries=False, check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.json_content(response), - timeout, - ) + d = treq.json_content(response) + d.addTimeout(self.default_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) defer.returnValue(body) @@ -609,38 +609,15 @@ def get_file(self, destination, path, output_stream, args={}, try: with logcontext.PreserveLoggingContext(): - length = yield self._timeout_deferred( - _readBodyToFile( - response, output_stream, max_size - ), - ) + d = _readBodyToFile(response, output_stream, max_size) + d.addTimeout(self.default_timeout, self.hs.get_reactor()) + length = yield make_deferred_yieldable(d) except Exception: logger.exception("Failed to download body") raise defer.returnValue((length, headers)) - def _timeout_deferred(self, deferred, timeout_ms=None): - """Times the deferred out after `timeout_ms` ms - - Args: - deferred (Deferred) - timeout_ms (int|None): Timeout in milliseconds. If None defaults - to 60 seconds. - - Returns: - Deferred - """ - - add_timeout_to_deferred( - deferred, - timeout_ms / 1000. if timeout_ms else 60, - self.hs.get_reactor(), - cancelled_to_request_timed_out_error, - ) - - return deferred - class _ReadBodyToFileProtocol(protocol.Protocol): def __init__(self, stream, deferred, max_size): diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py index 2c5f7e0fb7d9..c48df3f6f217 100644 --- a/tests/http/test_fedclient.py +++ b/tests/http/test_fedclient.py @@ -15,8 +15,9 @@ from mock import Mock +from twisted.internet.defer import TimeoutError from twisted.internet.error import ConnectingCancelledError, DNSLookupError -from twisted.web.error import ResponseNeverReceived +from twisted.web.client import ResponseNeverReceived from synapse.http.matrixfederationclient import MatrixFederationHttpClient @@ -24,7 +25,6 @@ class FederationClientTests(HomeserverTestCase): - def make_homeserver(self, reactor, clock): hs = self.setup_test_homeserver(reactor=reactor, clock=clock) @@ -36,20 +36,21 @@ def prepare(self, reactor, clock, homeserver): self.cl = MatrixFederationHttpClient(self.hs) self.reactor.lookups["testserv"] = "1.2.3.4" - def test_dns_error(self): """ If the DNS raising returns an error, it will bubble up. """ - d = self.cl.put_json("testserv2:8008", "foo/bar", timeout=10000) + d = self.cl._request("testserv2:8008", "GET", "foo/bar", timeout=10000) self.pump() f = self.failureResultOf(d) self.assertIsInstance(f.value, DNSLookupError) def test_client_never_connect(self): - - d = self.cl.put_json("testserv:8008", "foo/bar", timeout=10000) + """ + If the HTTP request is not connected and is timed out, it'll give a ConnectingCancelledError. + """ + d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) self.pump() @@ -72,8 +73,10 @@ def test_client_never_connect(self): self.assertIsInstance(f.value, ConnectingCancelledError) def test_client_connect_no_response(self): - - d = self.cl.put_json("testserv:8008", "foo/bar", timeout=10000) + """ + If the HTTP request is connected, but gets no response before being timed out, it'll give a ResponseNeverReceived. + """ + d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) self.pump() @@ -87,7 +90,6 @@ def test_client_connect_no_response(self): self.assertEqual(clients[0][1], 8008) conn = Mock() - client = clients[0][2].buildProtocol(None) client.makeConnection(conn) @@ -99,3 +101,53 @@ def test_client_connect_no_response(self): f = self.failureResultOf(d) self.assertIsInstance(f.value, ResponseNeverReceived) + + def test_client_gets_headers(self): + """ + Once the client gets the headers, _request returns successfully. + """ + d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) + + self.pump() + + conn = Mock() + clients = self.reactor.tcpClients + client = clients[0][2].buildProtocol(None) + client.makeConnection(conn) + + # Deferred does not have a result + self.assertFalse(d.called) + + # Send it the HTTP response + client.dataReceived(b"HTTP/1.1 200 OK\r\nServer: Fake\r\n\r\n") + + # We should get a successful response + r = self.successResultOf(d) + self.assertEqual(r.code, 200) + + def test_client_headers_no_body(self): + """ + If the HTTP request is connected, but gets no response before being timed out, it'll give a ResponseNeverReceived. + """ + d = self.cl.post_json("testserv:8008", "foo/bar", timeout=10000) + + self.pump() + + conn = Mock() + clients = self.reactor.tcpClients + client = clients[0][2].buildProtocol(None) + client.makeConnection(conn) + + # Deferred does not have a result + self.assertFalse(d.called) + + # Send it the HTTP response + client.dataReceived( + b"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nServer: Fake\r\n\r\n" + ) + + # Push by enough to time it out + self.reactor.advance(10.5) + f = self.failureResultOf(d) + + self.assertIsInstance(f.value, TimeoutError) diff --git a/tests/server.py b/tests/server.py index 5dbce0746972..575c873cedb1 100644 --- a/tests/server.py +++ b/tests/server.py @@ -164,19 +164,29 @@ class ThreadedMemoryReactorClock(MemoryReactorClock): """ A MemoryReactorClock that supports callFromThread. """ + def __init__(self): self._udp = [] self.lookups = {} class Resolver(object): - def resolveHostName(_self, resolutionReceiver, hostName, portNumber=0, addressTypes=None, transportSemantics='TCP'): + def resolveHostName( + _self, + resolutionReceiver, + hostName, + portNumber=0, + addressTypes=None, + transportSemantics='TCP', + ): resolution = HostResolution(hostName) resolutionReceiver.resolutionBegan(resolution) if hostName not in self.lookups: raise DNSLookupError("OH NO") - resolutionReceiver.addressResolved(IPv4Address('TCP', self.lookups[hostName], portNumber)) + resolutionReceiver.addressResolved( + IPv4Address('TCP', self.lookups[hostName], portNumber) + ) resolutionReceiver.resolutionComplete() return resolution From c7ef6c1ec28e3d18c31a16275229d270933913f7 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 13 Sep 2018 23:15:02 +1000 Subject: [PATCH 3/4] cleanups --- synapse/http/matrixfederationclient.py | 2 +- tests/http/test_fedclient.py | 12 ++++++++---- tests/server.py | 1 - 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 7c10aa54864a..e52b0563e83e 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -397,7 +397,7 @@ def put_json(self, destination, path, args={}, data={}, long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, - backoff_on_404=backoff_on_404 + backoff_on_404=backoff_on_404, ) if 200 <= response.code < 300: diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py index c48df3f6f217..1c46c9cfeb9e 100644 --- a/tests/http/test_fedclient.py +++ b/tests/http/test_fedclient.py @@ -48,7 +48,8 @@ def test_dns_error(self): def test_client_never_connect(self): """ - If the HTTP request is not connected and is timed out, it'll give a ConnectingCancelledError. + If the HTTP request is not connected and is timed out, it'll give a + ConnectingCancelledError. """ d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) @@ -74,7 +75,8 @@ def test_client_never_connect(self): def test_client_connect_no_response(self): """ - If the HTTP request is connected, but gets no response before being timed out, it'll give a ResponseNeverReceived. + If the HTTP request is connected, but gets no response before being + timed out, it'll give a ResponseNeverReceived. """ d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) @@ -127,7 +129,8 @@ def test_client_gets_headers(self): def test_client_headers_no_body(self): """ - If the HTTP request is connected, but gets no response before being timed out, it'll give a ResponseNeverReceived. + If the HTTP request is connected, but gets no response before being + timed out, it'll give a ResponseNeverReceived. """ d = self.cl.post_json("testserv:8008", "foo/bar", timeout=10000) @@ -143,7 +146,8 @@ def test_client_headers_no_body(self): # Send it the HTTP response client.dataReceived( - b"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nServer: Fake\r\n\r\n" + (b"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n" + b"Server: Fake\r\n\r\n") ) # Push by enough to time it out diff --git a/tests/server.py b/tests/server.py index 575c873cedb1..d15753513bcf 100644 --- a/tests/server.py +++ b/tests/server.py @@ -194,7 +194,6 @@ def resolveHostName( super(ThreadedMemoryReactorClock, self).__init__() def listenUDP(self, port, protocol, interface='', maxPacketSize=8196): - print(port) p = udp.Port(port, protocol, interface, maxPacketSize, self) p.startListening() self._udp.append(p) From d4c61a43f25442247033cff0e5dd1a7362b5955c Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 14 Sep 2018 00:00:24 +1000 Subject: [PATCH 4/4] cleanups --- changelog.d/3857.misc | 1 + synapse/http/matrixfederationclient.py | 1 - tests/server.py | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) create mode 100644 changelog.d/3857.misc diff --git a/changelog.d/3857.misc b/changelog.d/3857.misc new file mode 100644 index 000000000000..e128d193d9a3 --- /dev/null +++ b/changelog.d/3857.misc @@ -0,0 +1 @@ +Refactor some HTTP timeout code. \ No newline at end of file diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index e52b0563e83e..c49dbacd9371 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -40,7 +40,6 @@ HttpResponseException, SynapseError, ) -from synapse.http import cancelled_to_request_timed_out_error from synapse.http.endpoint import matrix_federation_endpoint from synapse.util import logcontext from synapse.util.logcontext import make_deferred_yieldable diff --git a/tests/server.py b/tests/server.py index d15753513bcf..420ec4e08859 100644 --- a/tests/server.py +++ b/tests/server.py @@ -6,7 +6,7 @@ import attr from zope.interface import implementer -from twisted.internet import address, defer, threads, udp +from twisted.internet import address, threads, udp from twisted.internet._resolver import HostResolution from twisted.internet.address import IPv4Address from twisted.internet.defer import Deferred