Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix request timeout handling #943

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions esrally/async_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,10 @@ def perform_request(self, method, url, params=None, body=None, timeout=None, ign
start = self.loop.time()
response = None
try:
with async_timeout.timeout(timeout or self.timeout.total, loop=self.loop):
response = yield from self.session.request(method, url, data=body, headers=headers)
request_timeout = timeout or self.timeout.total
with async_timeout.timeout(request_timeout, loop=self.loop):
# override the default session timeout explicitly
response = yield from self.session.request(method, url, data=body, headers=headers, timeout=request_timeout)
raw_data = yield from response.text()
duration = self.loop.time() - start

Expand Down
2 changes: 2 additions & 0 deletions esrally/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ async def on_request_end(session, trace_config_ctx, params):
trace_config = aiohttp.TraceConfig()
trace_config.on_request_start.append(on_request_start)
trace_config.on_request_end.append(on_request_end)
# ensure that we also stop the timer when a request "ends" with an exception (e.g. a timeout)
trace_config.on_request_exception.append(on_request_end)

# needs patching as https://github.com/elastic/elasticsearch-py-async/pull/68 is not merged yet
class RallyAsyncTransport(elasticsearch_async.transport.AsyncTransport):
Expand Down
7 changes: 3 additions & 4 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,12 +605,11 @@ async def __call__(self, es, params):
except elasticsearch.TransportError as e:
# this is caused by older versions of Elasticsearch (< 2.1), fall back to optimize
if e.status_code == 400:
params = {"request_timeout": request_timeout}
if max_num_segments:
await es.transport.perform_request("POST", "/_optimize?max_num_segments={}".format(max_num_segments),
params=params)
await es.transport.perform_request("POST", f"/_optimize?max_num_segments={max_num_segments}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And so it begins (f-strings in the codebase) :)

timeout=request_timeout)
else:
await es.transport.perform_request("POST", "/_optimize", params=params)
await es.transport.perform_request("POST", "/_optimize", timeout=request_timeout)
else:
raise e

Expand Down
7 changes: 3 additions & 4 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,7 @@ async def test_force_merge_override_request_timeout(self, es):
es.indices.forcemerge.return_value = as_future()

force_merge = runner.ForceMerge()
await force_merge(es, params={"index" : "_all", "request-timeout": 50000})
await force_merge(es, params={"index": "_all", "request-timeout": 50000})

es.indices.forcemerge.assert_called_once_with(index="_all", request_timeout=50000)

Expand All @@ -934,7 +934,7 @@ async def test_optimize_with_defaults(self, es):
force_merge = runner.ForceMerge()
await force_merge(es, params={})

es.transport.perform_request.assert_called_once_with("POST", "/_optimize", params={"request_timeout": None})
es.transport.perform_request.assert_called_once_with("POST", "/_optimize", timeout=None)

@mock.patch("elasticsearch.Elasticsearch")
@run_async
Expand All @@ -944,8 +944,7 @@ async def test_optimize_with_params(self, es):
force_merge = runner.ForceMerge()
await force_merge(es, params={"max-num-segments": 3, "request-timeout": 17000})

es.transport.perform_request.assert_called_once_with("POST", "/_optimize?max_num_segments=3",
params={"request_timeout": 17000})
es.transport.perform_request.assert_called_once_with("POST", "/_optimize?max_num_segments=3", timeout=17000)


class IndicesStatsRunnerTests(TestCase):
Expand Down