-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Wrap connections in an N minute timeout to ensure they get reaped correctly #1725
Changes from 2 commits
f5a4001
5b6672c
b7336ff
68030fd
b4bc6fe
97ffc56
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,7 @@ | |
# limitations under the License. | ||
|
||
from twisted.internet.endpoints import SSL4ClientEndpoint, TCP4ClientEndpoint | ||
from twisted.internet import defer | ||
from twisted.internet import defer, reactor, task | ||
from twisted.internet.error import ConnectError | ||
from twisted.names import client, dns | ||
from twisted.names.error import DNSNameError, DomainError | ||
|
@@ -66,13 +66,63 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None, | |
default_port = 8448 | ||
|
||
if port is None: | ||
return SRVClientEndpoint( | ||
return _WrappingEndointFac(SRVClientEndpoint( | ||
reactor, "matrix", domain, protocol="tcp", | ||
default_port=default_port, endpoint=transport_endpoint, | ||
endpoint_kw_args=endpoint_kw_args | ||
) | ||
)) | ||
else: | ||
return transport_endpoint(reactor, domain, port, **endpoint_kw_args) | ||
return _WrappingEndointFac(transport_endpoint(reactor, domain, port, **endpoint_kw_args)) | ||
|
||
|
||
class _WrappingEndointFac(object): | ||
def __init__(self, endpoint_fac): | ||
self.endpoint_fac = endpoint_fac | ||
|
||
@defer.inlineCallbacks | ||
def connect(self, protocolFactory): | ||
conn = yield self.endpoint_fac.connect(protocolFactory) | ||
conn = _WrappedConnection(conn) | ||
defer.returnValue(conn) | ||
|
||
|
||
class _WrappedConnection(object): | ||
"""Wraps a connection and calls abort on it if it hasn't seen any actio | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. missing "n" in "action". Also, the comment says 5 minutes, but the code says 3 minutes? |
||
for 5 minutes | ||
""" | ||
__slots__ = ["conn", "last_request"] | ||
|
||
def __init__(self, conn): | ||
object.__setattr__(self, "conn", conn) | ||
object.__setattr__(self, "last_request", time.time()) | ||
|
||
def __getattr__(self, name): | ||
return getattr(self.conn, name) | ||
|
||
def __setattr__(self, name, value): | ||
setattr(self.conn, name, value) | ||
|
||
def _time_things_out_maybe(self): | ||
if time.time() - self.last_request >= 2 * 60: | ||
self.abort() | ||
|
||
def request(self, request): | ||
self.last_request = time.time() | ||
|
||
# Time this connection out if we haven't send a request in the last | ||
# N minutes | ||
reactor.callLater(3 * 60, self._time_things_out_maybe) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is it 3 mins here, but 2 mins in the _time_things_out_maybe above? |
||
|
||
d = self.conn.request(request) | ||
|
||
def update_request_time(res): | ||
self.last_request = time.time() | ||
reactor.callLater(3 * 60, self._time_things_out_maybe) | ||
return res | ||
|
||
d.addCallback(update_request_time) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we do this by both a callback and a reactor.callLater()? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We schedule a timeout in 3 minutes both before we send the request, and after we received the response. |
||
|
||
return d | ||
|
||
|
||
class SpiderEndpoint(object): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,6 +61,11 @@ | |
MAX_SHORT_RETRIES = 3 | ||
|
||
|
||
def test(conn): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hm, is this meant to be here? |
||
conn.loseConnection() | ||
return conn | ||
|
||
|
||
class MatrixFederationEndpointFactory(object): | ||
def __init__(self, hs): | ||
self.tls_server_context_factory = hs.tls_server_context_factory | ||
|
@@ -88,7 +93,8 @@ def __init__(self, hs): | |
self.signing_key = hs.config.signing_key[0] | ||
self.server_name = hs.hostname | ||
pool = HTTPConnectionPool(reactor) | ||
pool.maxPersistentPerHost = 10 | ||
pool.maxPersistentPerHost = 5 | ||
pool.cachedConnectionTimeout = 2 * 60 | ||
self.agent = Agent.usingEndpointFactory( | ||
reactor, MatrixFederationEndpointFactory(hs), pool=pool | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/Endoint/Endpoint/ probably ;)