Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove redundant WrappedConnection #4409

Merged
merged 3 commits into from
Jan 18, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/4409.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove redundant federation connection wrapping code
75 changes: 4 additions & 71 deletions synapse/http/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,82 +140,15 @@ def transport_endpoint(reactor, host, port, timeout):
default_port = 8448

if port is None:
return _WrappingEndpointFac(SRVClientEndpoint(
return SRVClientEndpoint(
reactor, "matrix", domain, protocol="tcp",
default_port=default_port, endpoint=transport_endpoint,
endpoint_kw_args=endpoint_kw_args
), reactor)
)
else:
return _WrappingEndpointFac(transport_endpoint(
return transport_endpoint(
reactor, domain, port, **endpoint_kw_args
), reactor)


class _WrappingEndpointFac(object):
def __init__(self, endpoint_fac, reactor):
self.endpoint_fac = endpoint_fac
self.reactor = reactor

@defer.inlineCallbacks
def connect(self, protocolFactory):
conn = yield self.endpoint_fac.connect(protocolFactory)
conn = _WrappedConnection(conn, self.reactor)
defer.returnValue(conn)


class _WrappedConnection(object):
"""Wraps a connection and calls abort on it if it hasn't seen any action
for 2.5-3 minutes.
"""
__slots__ = ["conn", "last_request"]

def __init__(self, conn, reactor):
object.__setattr__(self, "conn", conn)
object.__setattr__(self, "last_request", time.time())
self._reactor = reactor

def __getattr__(self, name):
return getattr(self.conn, name)

def __setattr__(self, name, value):
setattr(self.conn, name, value)

def _time_things_out_maybe(self):
# We use a slightly shorter timeout here just in case the callLater is
# triggered early. Paranoia ftw.
# TODO: Cancel the previous callLater rather than comparing time.time()?
if time.time() - self.last_request >= 2.5 * 60:
self.abort()
# Abort the underlying TLS connection. The abort() method calls
# loseConnection() on the TLS connection which tries to
# shutdown the connection cleanly. We call abortConnection()
# since that will promptly close the TLS connection.
#
# In Twisted >18.4; the TLS connection will be None if it has closed
# which will make abortConnection() throw. Check that the TLS connection
# is not None before trying to close it.
if self.transport.getHandle() is not None:
self.transport.abortConnection()

def request(self, request):
self.last_request = time.time()

# Time this connection out if we haven't send a request in the last
# N minutes
# TODO: Cancel the previous callLater?
self._reactor.callLater(3 * 60, self._time_things_out_maybe)

d = self.conn.request(request)

def update_request_time(res):
self.last_request = time.time()
# TODO: Cancel the previous callLater?
self._reactor.callLater(3 * 60, self._time_things_out_maybe)
return res

d.addCallback(update_request_time)

return d
)


class SRVClientEndpoint(object):
Expand Down
30 changes: 15 additions & 15 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,23 +320,23 @@ def _send_request(
url_str,
)

# we don't want all the fancy cookie and redirect handling that
# treq.request gives: just use the raw Agent.
request_deferred = self.agent.request(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for some reason, the change makes this now throw some exceptions synchronously, hence moving it inside the try/catch. It should have been there anyway imho.

method_bytes,
url_bytes,
headers=Headers(headers_dict),
bodyProducer=producer,
)

request_deferred = timeout_deferred(
request_deferred,
timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)

try:
with Measure(self.clock, "outbound_request"):
# we don't want all the fancy cookie and redirect handling
# that treq.request gives: just use the raw Agent.
request_deferred = self.agent.request(
method_bytes,
url_bytes,
headers=Headers(headers_dict),
bodyProducer=producer,
)

request_deferred = timeout_deferred(
request_deferred,
timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)

response = yield make_deferred_yieldable(
request_deferred,
)
Expand Down
76 changes: 69 additions & 7 deletions tests/http/test_fedclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@

from mock import Mock

from zope.interface import implementer

from twisted.internet.defer import TimeoutError
from twisted.internet.error import ConnectingCancelledError, DNSLookupError
from twisted.internet.interfaces import ITransport
from twisted.web.client import ResponseNeverReceived
from twisted.web.http import HTTPChannel

Expand Down Expand Up @@ -44,7 +47,7 @@ def prepare(self, reactor, clock, homeserver):

def test_dns_error(self):
"""
If the DNS raising returns an error, it will bubble up.
If the DNS lookup returns an error, it will bubble up.
"""
d = self.cl.get_json("testserv2:8008", "foo/bar", timeout=10000)
self.pump()
Expand All @@ -63,7 +66,7 @@ def test_client_never_connect(self):
self.pump()

# Nothing happened yet
self.assertFalse(d.called)
self.assertNoResult(d)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this gives a slightly more helpful error when it fails.


# Make sure treq is trying to connect
clients = self.reactor.tcpClients
Expand All @@ -72,7 +75,7 @@ def test_client_never_connect(self):
self.assertEqual(clients[0][1], 8008)

# Deferred is still without a result
self.assertFalse(d.called)
self.assertNoResult(d)

# Push by enough to time it out
self.reactor.advance(10.5)
Expand All @@ -94,7 +97,7 @@ def test_client_connect_no_response(self):
self.pump()

# Nothing happened yet
self.assertFalse(d.called)
self.assertNoResult(d)

# Make sure treq is trying to connect
clients = self.reactor.tcpClients
Expand All @@ -107,7 +110,7 @@ def test_client_connect_no_response(self):
client.makeConnection(conn)

# Deferred is still without a result
self.assertFalse(d.called)
self.assertNoResult(d)

# Push by enough to time it out
self.reactor.advance(10.5)
Expand Down Expand Up @@ -135,7 +138,7 @@ def test_client_gets_headers(self):
client.makeConnection(conn)

# Deferred does not have a result
self.assertFalse(d.called)
self.assertNoResult(d)

# Send it the HTTP response
client.dataReceived(b"HTTP/1.1 200 OK\r\nServer: Fake\r\n\r\n")
Expand All @@ -159,7 +162,7 @@ def test_client_headers_no_body(self):
client.makeConnection(conn)

# Deferred does not have a result
self.assertFalse(d.called)
self.assertNoResult(d)

# Send it the HTTP response
client.dataReceived(
Expand Down Expand Up @@ -195,3 +198,62 @@ def test_client_sends_body(self):
request = server.requests[0]
content = request.content.read()
self.assertEqual(content, b'{"a":"b"}')

def test_closes_connection(self):
d = self.cl.get_json("testserv:8008", "foo/bar")

self.pump()

# there should have been a call to connectTCP
clients = self.reactor.tcpClients
self.assertEqual(len(clients), 1)
(_host, _port, factory, _timeout, _bindAddress) = clients[0]

# complete the connection and wire it up to a fake transport
client = factory.buildProtocol(None)
conn = MockTransport()
client.makeConnection(conn)

# that should have made it send the request to the connection
self.assertRegex(conn.written, b"^GET /foo/bar")

# Send the HTTP response
client.dataReceived(
b"HTTP/1.1 200 OK\r\n"
b"Content-Type: application/json\r\n"
b"Content-Length: 2\r\n"
b"\r\n"
b"{}"
)

# We should get a successful response
r = self.successResultOf(d)
self.assertEqual(r, {})

self.assertFalse(conn.lostConnection)

# wait for a while
self.pump(120)

self.assertTrue(conn.lostConnection)


@implementer(ITransport)
class MockTransport(object):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe https://twistedmatrix.com/documents/current/api/twisted.test.proto_helpers.StringTransport.html would reduce some duplication here? (twisted.test.proto_helpers is public API)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes :)

def __init__(self):
self.paused = False
self.written = b''
self.lostConnection = False

def pauseProducing(self):
self.paused = True

def resumeProducing(self):
self.paused = False

def writeSequence(self, data):
for d in data:
self.written += d

def loseConnection(self):
self.lostConnection = True