Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Wrap connections in an N minute timeout to ensure they get reaped correctly #1725

Merged
merged 6 commits into from
Dec 29, 2016
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 58 additions & 4 deletions synapse/http/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.

from twisted.internet.endpoints import SSL4ClientEndpoint, TCP4ClientEndpoint
from twisted.internet import defer
from twisted.internet import defer, reactor
from twisted.internet.error import ConnectError
from twisted.names import client, dns
from twisted.names.error import DNSNameError, DomainError
Expand Down Expand Up @@ -66,13 +66,67 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
default_port = 8448

if port is None:
return SRVClientEndpoint(
return _WrappingEndpointFac(SRVClientEndpoint(
reactor, "matrix", domain, protocol="tcp",
default_port=default_port, endpoint=transport_endpoint,
endpoint_kw_args=endpoint_kw_args
)
))
else:
return transport_endpoint(reactor, domain, port, **endpoint_kw_args)
return _WrappingEndpointFac(transport_endpoint(
reactor, domain, port, **endpoint_kw_args
))


class _WrappingEndpointFac(object):
def __init__(self, endpoint_fac):
self.endpoint_fac = endpoint_fac

@defer.inlineCallbacks
def connect(self, protocolFactory):
conn = yield self.endpoint_fac.connect(protocolFactory)
conn = _WrappedConnection(conn)
defer.returnValue(conn)


class _WrappedConnection(object):
"""Wraps a connection and calls abort on it if it hasn't seen any actio
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing "n" in "action". Also, the comment says 5 minutes, but the code says 3 minutes?

for 5 minutes
"""
__slots__ = ["conn", "last_request"]

def __init__(self, conn):
object.__setattr__(self, "conn", conn)
object.__setattr__(self, "last_request", time.time())

def __getattr__(self, name):
return getattr(self.conn, name)

def __setattr__(self, name, value):
setattr(self.conn, name, value)

def _time_things_out_maybe(self):
# We use a slightly shorter timeout here just in case the callLater is
# triggered early. Paranoia ftw.
if time.time() - self.last_request >= 2.5 * 60:
self.abort()

def request(self, request):
self.last_request = time.time()

# Time this connection out if we haven't send a request in the last
# N minutes
reactor.callLater(3 * 60, self._time_things_out_maybe)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is it 3 mins here, but 2 mins in the _time_things_out_maybe above?


d = self.conn.request(request)

def update_request_time(res):
self.last_request = time.time()
reactor.callLater(3 * 60, self._time_things_out_maybe)
return res

d.addCallback(update_request_time)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we do this by both a callback and a reactor.callLater()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We schedule a timeout in 3 minutes both before we send the request, and after we received the response.


return d


class SpiderEndpoint(object):
Expand Down
7 changes: 4 additions & 3 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ def __init__(self, hs):
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 10
pool.maxPersistentPerHost = 5
pool.cachedConnectionTimeout = 2 * 60
self.agent = Agent.usingEndpointFactory(
reactor, MatrixFederationEndpointFactory(hs), pool=pool
)
Expand Down Expand Up @@ -299,7 +300,7 @@ def body_callback(method, url_bytes, headers_dict):
defer.returnValue(json.loads(body))

@defer.inlineCallbacks
def post_json(self, destination, path, data={}, long_retries=True,
def post_json(self, destination, path, data={}, long_retries=False,
timeout=None):
""" Sends the specifed json data using POST

Expand Down Expand Up @@ -332,7 +333,7 @@ def body_callback(method, url_bytes, headers_dict):
path.encode("ascii"),
body_callback=body_callback,
headers_dict={"Content-Type": ["application/json"]},
long_retries=True,
long_retries=long_retries,
timeout=timeout,
)

Expand Down