diff --git a/esrally/async_connection.py b/esrally/async_connection.py index bd7e302a3..f5293d073 100644 --- a/esrally/async_connection.py +++ b/esrally/async_connection.py @@ -117,8 +117,10 @@ def perform_request(self, method, url, params=None, body=None, timeout=None, ign start = self.loop.time() response = None try: - with async_timeout.timeout(timeout or self.timeout.total, loop=self.loop): - response = yield from self.session.request(method, url, data=body, headers=headers) + request_timeout = timeout or self.timeout.total + with async_timeout.timeout(request_timeout, loop=self.loop): + # override the default session timeout explicitly + response = yield from self.session.request(method, url, data=body, headers=headers, timeout=request_timeout) raw_data = yield from response.text() duration = self.loop.time() - start diff --git a/esrally/client.py b/esrally/client.py index e2df92d47..161d65013 100644 --- a/esrally/client.py +++ b/esrally/client.py @@ -161,6 +161,8 @@ async def on_request_end(session, trace_config_ctx, params): trace_config = aiohttp.TraceConfig() trace_config.on_request_start.append(on_request_start) trace_config.on_request_end.append(on_request_end) + # ensure that we also stop the timer when a request "ends" with an exception (e.g. a timeout) + trace_config.on_request_exception.append(on_request_end) # needs patching as https://github.com/elastic/elasticsearch-py-async/pull/68 is not merged yet class RallyAsyncTransport(elasticsearch_async.transport.AsyncTransport): diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index e832c0602..68a62d87e 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -605,12 +605,11 @@ async def __call__(self, es, params): except elasticsearch.TransportError as e: # this is caused by older versions of Elasticsearch (< 2.1), fall back to optimize if e.status_code == 400: - params = {"request_timeout": request_timeout} if max_num_segments: - await es.transport.perform_request("POST", "/_optimize?max_num_segments={}".format(max_num_segments), - params=params) + await es.transport.perform_request("POST", f"/_optimize?max_num_segments={max_num_segments}", + timeout=request_timeout) else: - await es.transport.perform_request("POST", "/_optimize", params=params) + await es.transport.perform_request("POST", "/_optimize", timeout=request_timeout) else: raise e diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 8370edc8a..840dc4eba 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -911,7 +911,7 @@ async def test_force_merge_override_request_timeout(self, es): es.indices.forcemerge.return_value = as_future() force_merge = runner.ForceMerge() - await force_merge(es, params={"index" : "_all", "request-timeout": 50000}) + await force_merge(es, params={"index": "_all", "request-timeout": 50000}) es.indices.forcemerge.assert_called_once_with(index="_all", request_timeout=50000) @@ -934,7 +934,7 @@ async def test_optimize_with_defaults(self, es): force_merge = runner.ForceMerge() await force_merge(es, params={}) - es.transport.perform_request.assert_called_once_with("POST", "/_optimize", params={"request_timeout": None}) + es.transport.perform_request.assert_called_once_with("POST", "/_optimize", timeout=None) @mock.patch("elasticsearch.Elasticsearch") @run_async @@ -944,8 +944,7 @@ async def test_optimize_with_params(self, es): force_merge = runner.ForceMerge() await force_merge(es, params={"max-num-segments": 3, "request-timeout": 17000}) - es.transport.perform_request.assert_called_once_with("POST", "/_optimize?max_num_segments=3", - params={"request_timeout": 17000}) + es.transport.perform_request.assert_called_once_with("POST", "/_optimize?max_num_segments=3", timeout=17000) class IndicesStatsRunnerTests(TestCase):