Skip to content

Commit

Permalink
Fix fallback implementation of ML-related runners (#956)
Browse files Browse the repository at this point in the history
With this commit we fix several issues of the fallback implementation of
ML-related runners. This implementation is only used for older versions
of Elasticsearch which still include the `_xpack` path component and use
the raw transport API of the Elasticsearch client. However, especially
the handling of request parameters did not conform to the API
requirements.

Relates #812
  • Loading branch information
danielmitterdorfer authored Apr 14, 2020
1 parent 40c1dca commit 9393201
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 36 deletions.
62 changes: 45 additions & 17 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,21 @@ def mandatory(params, key, op):
" parameter source." % (str(op), key))


def escape(v):
"""
Escapes values so they can be used as query parameters
:param v: The raw value. May be None.
:return: The escaped value.
"""
if v is None:
return None
elif isinstance(v, bool):
return str(v).lower()
else:
return str(v)


class BulkIndex(Runner):
"""
Bulk indexes the given documents.
Expand Down Expand Up @@ -1166,8 +1181,7 @@ async def __call__(self, es, params):
if e.status_code == 400:
await es.transport.perform_request(
"PUT",
"/_xpack/ml/datafeeds/%s" % datafeed_id,
params=params,
f"/_xpack/ml/datafeeds/{datafeed_id}",
body=body,
)
else:
Expand All @@ -1194,8 +1208,11 @@ async def __call__(self, es, params):
if e.status_code == 400:
await es.transport.perform_request(
"DELETE",
"/_xpack/ml/datafeeds/%s" % datafeed_id,
params=params,
f"/_xpack/ml/datafeeds/{datafeed_id}",
params={
"force": escape(force),
"ignore": 404
},
)
else:
raise e
Expand Down Expand Up @@ -1223,8 +1240,7 @@ async def __call__(self, es, params):
if e.status_code == 400:
await es.transport.perform_request(
"POST",
"/_xpack/ml/datafeeds/%s/_start" % datafeed_id,
params=params,
f"/_xpack/ml/datafeeds/{datafeed_id}/_start",
body=body,
)
else:
Expand All @@ -1249,10 +1265,15 @@ async def __call__(self, es, params):
except elasticsearch.TransportError as e:
# fallback to old path (ES < 7)
if e.status_code == 400:
request_params = {
"force": escape(force),
}
if timeout:
request_params["timeout"] = escape(timeout)
await es.transport.perform_request(
"POST",
"/_xpack/ml/datafeeds/%s/_stop" % datafeed_id,
params=params
f"/_xpack/ml/datafeeds/{datafeed_id}/_stop",
params=request_params
)
else:
raise e
Expand All @@ -1277,8 +1298,7 @@ async def __call__(self, es, params):
if e.status_code == 400:
await es.transport.perform_request(
"PUT",
"/_xpack/ml/anomaly_detectors/%s" % job_id,
params=params,
f"/_xpack/ml/anomaly_detectors/{job_id}",
body=body,
)
else:
Expand All @@ -1303,10 +1323,13 @@ async def __call__(self, es, params):
except elasticsearch.TransportError as e:
# fallback to old path (ES < 7)
if e.status_code == 400:
es.transport.perform_request(
await es.transport.perform_request(
"DELETE",
"/_xpack/ml/anomaly_detectors/%s" % job_id,
params=params,
f"/_xpack/ml/anomaly_detectors/{job_id}",
params={
"force": escape(force),
"ignore": 404
},
)
else:
raise e
Expand All @@ -1330,8 +1353,7 @@ async def __call__(self, es, params):
if e.status_code == 400:
await es.transport.perform_request(
"POST",
"/_xpack/ml/anomaly_detectors/%s/_open" % job_id,
params=params,
f"/_xpack/ml/anomaly_detectors/{job_id}/_open",
)
else:
raise e
Expand All @@ -1355,10 +1377,16 @@ async def __call__(self, es, params):
except elasticsearch.TransportError as e:
# fallback to old path (ES < 7)
if e.status_code == 400:
request_params = {
"force": escape(force),
}
if timeout:
request_params["timeout"] = escape(timeout)

await es.transport.perform_request(
"POST",
"/_xpack/ml/anomaly_detectors/%s/_close" % job_id,
params=params,
f"/_xpack/ml/anomaly_detectors/{job_id}/_close",
params=request_params,
)
else:
raise e
Expand Down
44 changes: 25 additions & 19 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2201,10 +2201,7 @@ async def test_create_ml_datafeed_fallback(self, es):
r = runner.CreateMlDatafeed()
await r(es, params)

es.transport.perform_request.assert_called_once_with("PUT",
"/_xpack/ml/datafeeds/%s" % datafeed_id,
body=body,
params=params)
es.transport.perform_request.assert_called_once_with("PUT", f"/_xpack/ml/datafeeds/{datafeed_id}", body=body)


class DeleteMlDatafeedTests(TestCase):
Expand Down Expand Up @@ -2237,8 +2234,11 @@ async def test_delete_ml_datafeed_fallback(self, es):
await r(es, params)

es.transport.perform_request.assert_called_once_with("DELETE",
"/_xpack/ml/datafeeds/%s" % datafeed_id,
params=params)
f"/_xpack/ml/datafeeds/{datafeed_id}",
params={
"force": "false",
"ignore": 404
})


class StartMlDatafeedTests(TestCase):
Expand Down Expand Up @@ -2279,9 +2279,8 @@ async def test_start_ml_datafeed_with_body_fallback(self, es):
await r(es, params)

es.transport.perform_request.assert_called_once_with("POST",
"/_xpack/ml/datafeeds/%s/_start" % params["datafeed-id"],
body=body,
params=params)
f"/_xpack/ml/datafeeds/{params['datafeed-id']}/_start",
body=body)

@mock.patch("elasticsearch.Elasticsearch")
@run_async
Expand Down Expand Up @@ -2338,8 +2337,11 @@ async def test_stop_ml_datafeed_fallback(self, es):
await r(es, params)

es.transport.perform_request.assert_called_once_with("POST",
"/_xpack/ml/datafeeds/%s/_stop" % params["datafeed-id"],
params=params)
f"/_xpack/ml/datafeeds/{params['datafeed-id']}/_stop",
params={
"force": str(params["force"]).lower(),
"timeout": params["timeout"]
})


class CreateMlJobTests(TestCase):
Expand Down Expand Up @@ -2406,8 +2408,7 @@ async def test_create_ml_job_fallback(self, es):
await r(es, params)

es.transport.perform_request.assert_called_once_with("PUT",
"/_xpack/ml/anomaly_detectors/%s" % params["job-id"],
params=params,
f"/_xpack/ml/anomaly_detectors/{params['job-id']}",
body=body)


Expand Down Expand Up @@ -2442,8 +2443,11 @@ async def test_delete_ml_job_fallback(self, es):
await r(es, params)

es.transport.perform_request.assert_called_once_with("DELETE",
"/_xpack/ml/anomaly_detectors/%s" % params["job-id"],
params=params)
f"/_xpack/ml/anomaly_detectors/{params['job-id']}",
params={
"force": "false",
"ignore": 404
})


class OpenMlJobTests(TestCase):
Expand Down Expand Up @@ -2477,8 +2481,7 @@ async def test_open_ml_job_fallback(self, es):
await r(es, params)

es.transport.perform_request.assert_called_once_with("POST",
"/_xpack/ml/anomaly_detectors/%s/_open" % params["job-id"],
params=params)
f"/_xpack/ml/anomaly_detectors/{params['job-id']}/_open")


class CloseMlJobTests(TestCase):
Expand Down Expand Up @@ -2513,8 +2516,11 @@ async def test_close_ml_job_fallback(self, es):
await r(es, params)

es.transport.perform_request.assert_called_once_with("POST",
"/_xpack/ml/anomaly_detectors/%s/_close" % params["job-id"],
params=params)
f"/_xpack/ml/anomaly_detectors/{params['job-id']}/_close",
params={
"force": str(params["force"]).lower(),
"timeout": params["timeout"]
})


class RawRequestRunnerTests(TestCase):
Expand Down

0 comments on commit 9393201

Please sign in to comment.