From 5b6672c66de693c390091c402f2dbb4a0f467aaf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 28 Dec 2016 22:49:31 +0000 Subject: [PATCH 1/5] Wrap connections in an N minute timeout to ensure they get reaped correctly --- synapse/http/endpoint.py | 58 ++++++++++++++++++++++++-- synapse/http/matrixfederationclient.py | 8 +++- 2 files changed, 61 insertions(+), 5 deletions(-) diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 442696d39388..16ad09a4812e 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -14,7 +14,7 @@ # limitations under the License. from twisted.internet.endpoints import SSL4ClientEndpoint, TCP4ClientEndpoint -from twisted.internet import defer +from twisted.internet import defer, reactor, task from twisted.internet.error import ConnectError from twisted.names import client, dns from twisted.names.error import DNSNameError, DomainError @@ -66,13 +66,63 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None, default_port = 8448 if port is None: - return SRVClientEndpoint( + return _WrappingEndointFac(SRVClientEndpoint( reactor, "matrix", domain, protocol="tcp", default_port=default_port, endpoint=transport_endpoint, endpoint_kw_args=endpoint_kw_args - ) + )) else: - return transport_endpoint(reactor, domain, port, **endpoint_kw_args) + return _WrappingEndointFac(transport_endpoint(reactor, domain, port, **endpoint_kw_args)) + + +class _WrappingEndointFac(object): + def __init__(self, endpoint_fac): + self.endpoint_fac = endpoint_fac + + @defer.inlineCallbacks + def connect(self, protocolFactory): + conn = yield self.endpoint_fac.connect(protocolFactory) + conn = _WrappedConnection(conn) + defer.returnValue(conn) + + +class _WrappedConnection(object): + """Wraps a connection and calls abort on it if it hasn't seen any actio + for 5 minutes + """ + __slots__ = ["conn", "last_request"] + + def __init__(self, conn): + object.__setattr__(self, "conn", conn) + 
object.__setattr__(self, "last_request", time.time()) + + def __getattr__(self, name): + return getattr(self.conn, name) + + def __setattr__(self, name, value): + setattr(self.conn, name, value) + + def _time_things_out_maybe(self): + if time.time() - self.last_request >= 2 * 60: + self.abort() + + def request(self, request): + self.last_request = time.time() + + # Time this connection out if we haven't send a request in the last + # N minutes + reactor.callLater(3 * 60, self._time_things_out_maybe) + + d = self.conn.request(request) + + def update_request_time(res): + self.last_request = time.time() + reactor.callLater(3 * 60, self._time_things_out_maybe) + return res + + d.addCallback(update_request_time) + + return d class SpiderEndpoint(object): diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index d5970c05a802..da98d2d6665c 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -61,6 +61,11 @@ MAX_SHORT_RETRIES = 3 +def test(conn): + conn.loseConnection() + return conn + + class MatrixFederationEndpointFactory(object): def __init__(self, hs): self.tls_server_context_factory = hs.tls_server_context_factory @@ -88,7 +93,8 @@ def __init__(self, hs): self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname pool = HTTPConnectionPool(reactor) - pool.maxPersistentPerHost = 10 + pool.maxPersistentPerHost = 5 + pool.cachedConnectionTimeout = 2 * 60 self.agent = Agent.usingEndpointFactory( reactor, MatrixFederationEndpointFactory(hs), pool=pool ) From b7336ff32d4f9883c06c538901f6566b7ccb1ebe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Dec 2016 00:09:33 +0000 Subject: [PATCH 2/5] Clean up --- synapse/http/endpoint.py | 6 ++++-- synapse/http/matrixfederationclient.py | 5 ----- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 16ad09a4812e..4d361bbe73f6 100644 --- 
a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -14,7 +14,7 @@ # limitations under the License. from twisted.internet.endpoints import SSL4ClientEndpoint, TCP4ClientEndpoint -from twisted.internet import defer, reactor, task +from twisted.internet import defer, reactor from twisted.internet.error import ConnectError from twisted.names import client, dns from twisted.names.error import DNSNameError, DomainError @@ -72,7 +72,9 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None, endpoint_kw_args=endpoint_kw_args )) else: - return _WrappingEndointFac(transport_endpoint(reactor, domain, port, **endpoint_kw_args)) + return _WrappingEndointFac(transport_endpoint( + reactor, domain, port, **endpoint_kw_args + )) class _WrappingEndointFac(object): diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index da98d2d6665c..4d40219fcccb 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -61,11 +61,6 @@ MAX_SHORT_RETRIES = 3 -def test(conn): - conn.loseConnection() - return conn - - class MatrixFederationEndpointFactory(object): def __init__(self, hs): self.tls_server_context_factory = hs.tls_server_context_factory From 68030fd37bcae8e1dd3deea971b7f13461e0ef72 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Dec 2016 00:10:49 +0000 Subject: [PATCH 3/5] Spelling and comments --- synapse/http/endpoint.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 4d361bbe73f6..95424481ca0a 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -66,18 +66,18 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None, default_port = 8448 if port is None: - return _WrappingEndointFac(SRVClientEndpoint( + return _WrappingEndpointFac(SRVClientEndpoint( reactor, "matrix", domain, protocol="tcp", default_port=default_port, 
endpoint=transport_endpoint, endpoint_kw_args=endpoint_kw_args )) else: - return _WrappingEndointFac(transport_endpoint( + return _WrappingEndpointFac(transport_endpoint( reactor, domain, port, **endpoint_kw_args )) -class _WrappingEndointFac(object): +class _WrappingEndpointFac(object): def __init__(self, endpoint_fac): self.endpoint_fac = endpoint_fac @@ -105,7 +105,9 @@ def __setattr__(self, name, value): setattr(self.conn, name, value) def _time_things_out_maybe(self): - if time.time() - self.last_request >= 2 * 60: + # We use a slightly shorter timeout here just in case the callLater is + # triggered early. Paranoia ftw. + if time.time() - self.last_request >= 2.5 * 60: self.abort() def request(self, request): From b4bc6fef5b2624f9f1a7319d266827027e260bec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Dec 2016 00:58:34 +0000 Subject: [PATCH 4/5] Respect long_retries param and default to off --- synapse/http/matrixfederationclient.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 4d40219fcccb..78b92cef369c 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -300,7 +300,7 @@ def body_callback(method, url_bytes, headers_dict): defer.returnValue(json.loads(body)) @defer.inlineCallbacks - def post_json(self, destination, path, data={}, long_retries=True, + def post_json(self, destination, path, data={}, long_retries=False, timeout=None): """ Sends the specifed json data using POST @@ -333,7 +333,7 @@ def body_callback(method, url_bytes, headers_dict): path.encode("ascii"), body_callback=body_callback, headers_dict={"Content-Type": ["application/json"]}, - long_retries=True, + long_retries=long_retries, timeout=timeout, ) From 97ffc5690b713b64556dc4d0993cf2a96f4477e8 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Dec 2016 15:51:04 +0000 Subject: [PATCH 5/5] Manually abort the underlying TLS 
connection. The abort() method calls loseConnection() which tries to shut down the TLS connection cleanly. We now call abortConnection() directly, which should promptly close both the TLS connection and the underlying TCP connection. I also added some TODO markers to consider cancelling the previous timeout rather than checking time.time(). But given how urgently we want to get this code released I'd rather leave the existing code with the duplicate timeouts and the time.time() check. --- synapse/http/endpoint.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 95424481ca0a..8c64339a7cf6 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -89,8 +89,8 @@ def connect(self, protocolFactory): class _WrappedConnection(object): - """Wraps a connection and calls abort on it if it hasn't seen any actio - for 5 minutes + """Wraps a connection and calls abort on it if it hasn't seen any action + for 2.5-3 minutes. """ __slots__ = ["conn", "last_request"] @@ -107,20 +107,28 @@ def __setattr__(self, name, value): def _time_things_out_maybe(self): # We use a slightly shorter timeout here just in case the callLater is # triggered early. Paranoia ftw. + # TODO: Cancel the previous callLater rather than comparing time.time()? if time.time() - self.last_request >= 2.5 * 60: self.abort() + # Abort the underlying TLS connection. The abort() method calls + # loseConnection() on the underlying TLS connection which tries to + # shutdown the connection cleanly. We call abortConnection() + # since that will promptly close the underlying TCP connection. + self.transport.abortConnection() def request(self, request): self.last_request = time.time() # Time this connection out if we haven't send a request in the last # N minutes + # TODO: Cancel the previous callLater? 
reactor.callLater(3 * 60, self._time_things_out_maybe) d = self.conn.request(request) def update_request_time(res): self.last_request = time.time() + # TODO: Cancel the previous callLater? reactor.callLater(3 * 60, self._time_things_out_maybe) return res