Skip to content

Commit

Permalink
Fix request timeout handling (#943)
Browse files Browse the repository at this point in the history
* Fix request timeout handling

With this commit we address several issues in request timeout handling:

1. We ensure that the end of the request is properly measured
2. We set an explicit request timeout on the connection to avoid that
the default connection timeout is picked.
3. We use the proper method parameter `timeout` when falling back to the
raw transport API instead of setting the ineffective parameter
`request_timeout`.
  • Loading branch information
danielmitterdorfer authored Apr 1, 2020
1 parent c3caddd commit 6766745
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 10 deletions.
6 changes: 4 additions & 2 deletions esrally/async_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,10 @@ def perform_request(self, method, url, params=None, body=None, timeout=None, ign
start = self.loop.time()
response = None
try:
with async_timeout.timeout(timeout or self.timeout.total, loop=self.loop):
response = yield from self.session.request(method, url, data=body, headers=headers)
request_timeout = timeout or self.timeout.total
with async_timeout.timeout(request_timeout, loop=self.loop):
# override the default session timeout explicitly
response = yield from self.session.request(method, url, data=body, headers=headers, timeout=request_timeout)
raw_data = yield from response.text()
duration = self.loop.time() - start

Expand Down
2 changes: 2 additions & 0 deletions esrally/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ async def on_request_end(session, trace_config_ctx, params):
trace_config = aiohttp.TraceConfig()
trace_config.on_request_start.append(on_request_start)
trace_config.on_request_end.append(on_request_end)
# ensure that we also stop the timer when a request "ends" with an exception (e.g. a timeout)
trace_config.on_request_exception.append(on_request_end)

# needs patching as https://github.com/elastic/elasticsearch-py-async/pull/68 is not merged yet
class RallyAsyncTransport(elasticsearch_async.transport.AsyncTransport):
Expand Down
7 changes: 3 additions & 4 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,12 +605,11 @@ async def __call__(self, es, params):
except elasticsearch.TransportError as e:
# this is caused by older versions of Elasticsearch (< 2.1), fall back to optimize
if e.status_code == 400:
params = {"request_timeout": request_timeout}
if max_num_segments:
await es.transport.perform_request("POST", "/_optimize?max_num_segments={}".format(max_num_segments),
params=params)
await es.transport.perform_request("POST", f"/_optimize?max_num_segments={max_num_segments}",
timeout=request_timeout)
else:
await es.transport.perform_request("POST", "/_optimize", params=params)
await es.transport.perform_request("POST", "/_optimize", timeout=request_timeout)
else:
raise e

Expand Down
7 changes: 3 additions & 4 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,7 @@ async def test_force_merge_override_request_timeout(self, es):
es.indices.forcemerge.return_value = as_future()

force_merge = runner.ForceMerge()
await force_merge(es, params={"index" : "_all", "request-timeout": 50000})
await force_merge(es, params={"index": "_all", "request-timeout": 50000})

es.indices.forcemerge.assert_called_once_with(index="_all", request_timeout=50000)

Expand All @@ -934,7 +934,7 @@ async def test_optimize_with_defaults(self, es):
force_merge = runner.ForceMerge()
await force_merge(es, params={})

es.transport.perform_request.assert_called_once_with("POST", "/_optimize", params={"request_timeout": None})
es.transport.perform_request.assert_called_once_with("POST", "/_optimize", timeout=None)

@mock.patch("elasticsearch.Elasticsearch")
@run_async
Expand All @@ -944,8 +944,7 @@ async def test_optimize_with_params(self, es):
force_merge = runner.ForceMerge()
await force_merge(es, params={"max-num-segments": 3, "request-timeout": 17000})

es.transport.perform_request.assert_called_once_with("POST", "/_optimize?max_num_segments=3",
params={"request_timeout": 17000})
es.transport.perform_request.assert_called_once_with("POST", "/_optimize?max_num_segments=3", timeout=17000)


class IndicesStatsRunnerTests(TestCase):
Expand Down

0 comments on commit 6766745

Please sign in to comment.