Add support for assertions (#1157)
With this commit we add a new `assertions` property for operations. When
this property is present and assertions are enabled with `--enable-assertions`,
Rally checks all assertions and raises an error if the check fails.
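
As a rough illustration of the gating described above (a simplified sketch, not the code in this commit), the check can be pictured as a wrapper around a runner that only evaluates assertions when a global switch is on and the operation's parameters contain `assertions`. The names `AssertingWrapper`, `TaskAssertionError`, `fake_search` and `run_once` are hypothetical, and only the `>` condition is handled to keep the example short.

```python
import asyncio


class TaskAssertionError(Exception):
    """Hypothetical stand-in for the error raised on a failed assertion."""


class AssertingWrapper:
    # Class-level switch: flipping it affects every wrapped runner at once.
    assertions_enabled = False

    def __init__(self, delegate):
        self.delegate = delegate

    async def __call__(self, es, params):
        result = await self.delegate(es, params)
        # Check only when globally enabled AND the operation declares assertions.
        if AssertingWrapper.assertions_enabled and "assertions" in params:
            for assertion in params["assertions"]:
                actual = result[assertion["property"]]
                # This sketch handles only ">" to stay short.
                if assertion["condition"] == ">" and not actual > assertion["value"]:
                    raise TaskAssertionError(
                        f"Expected [{assertion['property']}] to be > "
                        f"[{assertion['value']}] but was [{actual}]."
                    )
        return result


async def fake_search(es, params):
    # Hypothetical runner that always reports zero hits.
    return {"hits": 0}


def run_once(enabled):
    # Toggle the switch, then run the wrapped runner once with one assertion.
    AssertingWrapper.assertions_enabled = enabled
    runner = AssertingWrapper(fake_search)
    params = {"assertions": [{"property": "hits", "condition": ">", "value": 0}]}
    return asyncio.run(runner(None, params))
```

With the switch off, `run_once(False)` returns the runner's result unchanged; with it on, `run_once(True)` raises `TaskAssertionError` because the fake runner reports zero hits.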
danielmitterdorfer authored Jan 28, 2021
1 parent bbb5ca1 commit b92decc
Showing 13 changed files with 532 additions and 51 deletions.
5 changes: 5 additions & 0 deletions docs/command_line_reference.rst
@@ -444,6 +444,11 @@ Selects the :doc:`pipeline </pipelines>` that Rally should run.

Rally can autodetect the pipeline in most cases. If you specify ``--distribution-version`` it will auto-select the pipeline ``from-distribution`` otherwise it will use ``from-sources``.

``enable-assertions``
~~~~~~~~~~~~~~~~~~~~~

This option enables assertions on tasks. If an assertion fails, the race is aborted with a message indicating which assertion has failed.

.. _clr_enable_driver_profiling:

``enable-driver-profiling``
33 changes: 32 additions & 1 deletion docs/recipes.rst
@@ -218,7 +218,7 @@ This behavior can also be changed, by invoking Rally with the :ref:`--on-error <
Errors can also be investigated if you have configured a :doc:`dedicated Elasticsearch metrics store </configuration>`.

Checking Queries and Responses
--------------------------------------------------------------
------------------------------

As described above, errors can lead to misleading benchmarking results. Some issues, however, are more subtle and the result of queries not behaving and matching as intended.

@@ -227,6 +227,7 @@ Consider the following simple Rally operation::
{
"name": "geo_distance",
"operation-type": "search",
"detailed-results": true,
"index": "logs-*",
"body": {
"query": {
@@ -296,3 +297,33 @@ The number of hits from queries can also be investigated if you have configured
"operation-type" : "Search"
}

Finally, it is also possible to add assertions to an operation::

{
"name": "geo_distance",
"operation-type": "search",
"detailed-results": true,
"index": "logs-*",
"assertions": [
{
"property": "hits",
"condition": ">",
"value": 0
}
],
"body": {
"query": {
"term": {
"http.request.method": {
"value": "GET"
}
}
}
}
}

When a benchmark is executed with ``--enable-assertions`` and this query returns no hits, the benchmark is aborted with a message::

[ERROR] Cannot race. Error in load generator [0]
Cannot run task [geo_distance]: Expected [hits] to be > [0] but was [0].

55 changes: 54 additions & 1 deletion docs/track.rst
@@ -566,10 +566,54 @@ Each operation consists of the following properties:
* ``name`` (mandatory): The name of this operation. You can choose this name freely. It is only needed to reference the operation when defining schedules.
* ``operation-type`` (mandatory): Type of this operation. See below for the operation types that are supported out of the box in Rally. You can also add arbitrary operations by defining :doc:`custom runners </adding_tracks>`.
* ``include-in-reporting`` (optional, defaults to ``true`` for normal operations and to ``false`` for administrative operations): Whether or not this operation should be included in the command line report. For example you might want Rally to create an index for you but you are not interested in detailed metrics about it. Note that Rally will still record all metrics in the metrics store.
* ``assertions`` (optional, defaults to ``None``): A list of assertions that should be checked. See below for more details.
* ``request-timeout`` (optional, defaults to ``None``): The client-side timeout for this operation. Represented as a floating-point number in seconds, e.g. ``1.5``.
* ``headers`` (optional, defaults to ``None``): A dictionary of key-value pairs to pass as headers in the operation.
* ``opaque-id`` (optional, defaults to ``None`` [unused]): A special ID set as the value of ``x-opaque-id`` in the client headers of the operation. Overrides existing ``x-opaque-id`` entries in ``headers`` (case-insensitive).

**Assertions**

Use assertions for sanity checks, e.g. to ensure a query returns results. Assertions need to be defined with the following properties. All of them are mandatory:

* ``property``: A dot-delimited path to the meta-data field to be checked. Only meta-data fields that are returned by an operation are supported.
* ``condition``: The following conditions are supported: ``<``, ``<=``, ``==``, ``>=``, ``>``.
* ``value``: The expected value.

Assertions are disabled by default and can be enabled with the command line flag ``--enable-assertions``. A failing assertion aborts the benchmark.

Example::

{
"name": "term",
"operation-type": "search",
"detailed-results": true,
"assertions": [
{
"property": "hits",
"condition": ">",
"value": 0
}
],
"body": {
"query": {
"term": {
"country_code.raw": "AT"
}
}
}
}

.. note::

This requires ``detailed-results`` to be set to ``true`` so that the search operation gathers additional meta-data, such as the number of hits.

If assertions are enabled with ``--enable-assertions`` and this assertion fails, Rally exits with the following error message::

[ERROR] Cannot race. Error in load generator [0]
Cannot run task [term]: Expected [hits] to be > [0] but was [0].
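
The evaluation of a single assertion can be sketched in a few lines of Python. This is a simplified illustration under stated assumptions, not Rally's implementation: the names ``PREDICATES``, ``resolve`` and ``check`` are hypothetical, and the meta-data is assumed to be a (possibly nested) dictionary.

```python
import operator

# The five supported conditions; each maps to a function taking (actual, expected).
PREDICATES = {
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
    "==": operator.eq,
}


def resolve(path, meta_data):
    """Walk a dot-delimited path through nested dictionaries."""
    value = meta_data
    for key in path.split("."):
        value = value[key]
    return value


def check(assertion, meta_data):
    """Return None on success, otherwise a message describing the failure."""
    actual = resolve(assertion["property"], meta_data)
    predicate = PREDICATES[assertion["condition"]]
    if predicate(actual, assertion["value"]):
        return None
    return (f"Expected [{assertion['property']}] to be {assertion['condition']} "
            f"[{assertion['value']}] but was [{actual}].")
```

For example, ``check({"property": "hits", "condition": ">", "value": 0}, {"hits": 0})`` returns the message shown in the error output above, whereas the same assertion against ``{"hits": 3}`` returns ``None``.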

**Retries**

Some of the operations below are also retryable (marked accordingly below). Retryable operations expose the following properties:

* ``retries`` (optional, defaults to 0): The number of times the operation is retried.
@@ -1619,9 +1663,18 @@ composite

With the operation ``composite`` you can specify complex operations consisting of multiple requests to Elasticsearch. This can be used to simulate more complex application behavior, like populating a search page with custom filters. It supports the following parameters:

* ``requests`` (mandatory): A list that specifies the request streams to execute. Each stream consists of operations of either type ``raw-request`` to send requests to Elasticsearch or ``sleep`` to simulate client processing time. Streams execute concurrently, operations within a stream sequentially. It is possible to nest streams. See below for specific examples.
* ``requests`` (mandatory): A list that specifies the request streams to execute. Streams execute concurrently, operations within a stream sequentially. It is possible to nest streams. See below for specific examples.
* ``max-connections`` (optional: defaults to unbounded): The maximum number of concurrent connections per client executing this composite operation. By default, the operation itself does not restrict the number of connections but is bound to Rally's network connection limit. Therefore raise the number of available network connections appropriately (see :ref:`command line reference <clr_client_options>`).

The ``composite`` operation only supports the following operation-types:

* ``raw-request``
* ``sleep``
* ``search``
* ``submit-async-search``
* ``get-async-search``
* ``delete-async-search``

**Examples**

Here we execute two requests concurrently in two streams. The ``composite`` operation will return when both concurrent requests have finished::
8 changes: 6 additions & 2 deletions esrally/driver/driver.py
@@ -1207,6 +1207,7 @@ def __init__(self, cfg, track, task_allocations, sampler, cancel, complete, abor
self.complete = complete
self.abort_on_error = abort_on_error
self.profiling_enabled = self.cfg.opts("driver", "profiling")
self.assertions_enabled = self.cfg.opts("driver", "assertions")
self.debug_event_loop = self.cfg.opts("system", "async.debug", mandatory=False, default_value=False)
self.logger = logging.getLogger(__name__)

@@ -1242,6 +1243,9 @@ def es_clients(all_hosts, all_client_options):
es = es_clients(self.cfg.opts("client", "hosts").all_hosts,
self.cfg.opts("client", "options").with_max_connections(client_count))

self.logger.info("Task assertions enabled: %s", str(self.assertions_enabled))
runner.enable_assertions(self.assertions_enabled)

aws = []
# A parameter source should only be created once per task - it is partitioned later on per client.
params_per_task = {}
@@ -1401,9 +1405,9 @@ async def __call__(self, *args, **kwargs):
if completed:
self.logger.info("Task [%s] is considered completed due to external event.", self.task)
break
except BaseException:
except BaseException as e:
self.logger.exception("Could not execute schedule")
raise
raise exceptions.RallyError(f"Cannot run task [{self.task}]: {e}") from None
finally:
# Actively set it if this task completes its parent
if task_completes_parent:
124 changes: 109 additions & 15 deletions esrally/driver/runner.py
@@ -95,6 +95,15 @@ def runner_for(operation_type):
raise exceptions.RallyError("No runner available for operation type [%s]" % operation_type)


def enable_assertions(enabled):
"""
Changes whether assertions are enabled. The status changes for all tasks that are executed after this call.
:param enabled: ``True`` to enable assertions, ``False`` to disable them.
"""
AssertingRunner.assertions_enabled = enabled


def register_runner(operation_type, runner, **kwargs):
logger = logging.getLogger(__name__)
async_runner = kwargs.get("async_runner", False)
@@ -109,24 +118,26 @@ def register_runner(operation_type, runner, **kwargs):
if "__aenter__" in dir(runner) and "__aexit__" in dir(runner):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Registering runner object [%s] for [%s].", str(runner), str(operation_type))
__RUNNERS[operation_type] = _multi_cluster_runner(runner, str(runner), context_manager_enabled=True)
cluster_aware_runner = _multi_cluster_runner(runner, str(runner), context_manager_enabled=True)
else:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Registering context-manager capable runner object [%s] for [%s].", str(runner), str(operation_type))
__RUNNERS[operation_type] = _multi_cluster_runner(runner, str(runner))
cluster_aware_runner = _multi_cluster_runner(runner, str(runner))
# we'd rather use callable() but this will erroneously also classify a class as callable...
elif isinstance(runner, types.FunctionType):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Registering runner function [%s] for [%s].", str(runner), str(operation_type))
__RUNNERS[operation_type] = _single_cluster_runner(runner, runner.__name__)
cluster_aware_runner = _single_cluster_runner(runner, runner.__name__)
elif "__aenter__" in dir(runner) and "__aexit__" in dir(runner):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Registering context-manager capable runner object [%s] for [%s].", str(runner), str(operation_type))
__RUNNERS[operation_type] = _single_cluster_runner(runner, str(runner), context_manager_enabled=True)
cluster_aware_runner = _single_cluster_runner(runner, str(runner), context_manager_enabled=True)
else:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Registering runner object [%s] for [%s].", str(runner), str(operation_type))
__RUNNERS[operation_type] = _single_cluster_runner(runner, str(runner))
cluster_aware_runner = _single_cluster_runner(runner, str(runner))

__RUNNERS[operation_type] = _with_completion(_with_assertions(cluster_aware_runner))


# Only intended for unit-testing!
@@ -212,14 +223,16 @@ def unwrap(runner):

def _single_cluster_runner(runnable, name, context_manager_enabled=False):
# only pass the default ES client
delegate = MultiClientRunner(runnable, name, lambda es: es["default"], context_manager_enabled)
return _with_completion(delegate)
return MultiClientRunner(runnable, name, lambda es: es["default"], context_manager_enabled)


def _multi_cluster_runner(runnable, name, context_manager_enabled=False):
# pass all ES clients
delegate = MultiClientRunner(runnable, name, lambda es: es, context_manager_enabled)
return _with_completion(delegate)
return MultiClientRunner(runnable, name, lambda es: es, context_manager_enabled)


def _with_assertions(delegate):
return AssertingRunner(delegate)


def _with_completion(delegate):
@@ -311,6 +324,69 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
return False


class AssertingRunner(Runner, Delegator):
assertions_enabled = False

def __init__(self, delegate):
super().__init__(delegate=delegate)
self.predicates = {
">": self.greater_than,
">=": self.greater_than_or_equal,
"<": self.smaller_than,
"<=": self.smaller_than_or_equal,
"==": self.equal,
}

def greater_than(self, expected, actual):
return actual > expected

def greater_than_or_equal(self, expected, actual):
return actual >= expected

def smaller_than(self, expected, actual):
return actual < expected

def smaller_than_or_equal(self, expected, actual):
return actual <= expected

def equal(self, expected, actual):
return actual == expected

def check_assertion(self, assertion, properties):
path = assertion["property"]
predicate_name = assertion["condition"]
expected_value = assertion["value"]
actual_value = properties
for k in path.split("."):
actual_value = actual_value[k]
predicate = self.predicates[predicate_name]
success = predicate(expected_value, actual_value)
if not success:
raise exceptions.RallyTaskAssertionError(
f"Expected [{path}] to be {predicate_name} [{expected_value}] but was [{actual_value}].")

async def __call__(self, *args):
params = args[1]
return_value = await self.delegate(*args)
if AssertingRunner.assertions_enabled and "assertions" in params:
if isinstance(return_value, dict):
for assertion in params["assertions"]:
self.check_assertion(assertion, return_value)
else:
self.logger.debug("Skipping assertion check as [%s] does not return a dict.", repr(self.delegate))
return return_value

def __repr__(self, *args, **kwargs):
return repr(self.delegate)

async def __aenter__(self):
await self.delegate.__aenter__()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
return await self.delegate.__aexit__(exc_type, exc_val, exc_tb)


def mandatory(params, key, op):
try:
return params[key]
@@ -853,7 +929,10 @@ async def __call__(self, es, params):

async def request_body_query(self, es, params):
request_params, headers = self._transport_request_params(params)
index = params.get("index", "_all")
# Mandatory to ensure it is always provided. This is especially important when this runner is used in a
# composite context where there is no actual parameter source and the entire request structure must be provided
# by the composite's parameter source.
index = mandatory(params, "index", self)
body = mandatory(params, "body", self)
doc_type = params.get("type")
detailed_results = params.get("detailed-results", False)
@@ -921,7 +1000,10 @@ async def scroll_query(self, es, params):
try:
for page in range(total_pages):
if page == 0:
index = params.get("index", "_all")
# Mandatory to ensure it is always provided. This is especially important when this runner is used
# in a composite context where there is no actual parameter source and the entire request structure
# must be provided by the composite's parameter source.
index = mandatory(params, "index", self)
body = mandatory(params, "body", self)
sort = "_doc"
scroll = "10s"
@@ -2042,15 +2124,26 @@ async def __call__(self, es, params):
success = True
searches = mandatory(params, "retrieve-results-for", self)
request_params = params.get("request-params", {})
for search_id, _ in async_search_ids(searches):
stats = {}
for search_id, search in async_search_ids(searches):
response = await es.async_search.get(id=search_id,
params=request_params)
success = success and not response["is_running"]
is_running = response["is_running"]
success = success and not is_running
if not is_running:
stats[search] = {
"hits": response["response"]["hits"]["total"]["value"],
"hits_relation": response["response"]["hits"]["total"]["relation"],
"timed_out": response["response"]["timed_out"],
"took": response["response"]["took"]
}

return {
"weight": len(searches),
# only count completed searches - there is one key per search id in `stats`
"weight": len(stats),
"unit": "ops",
"success": success
"success": success,
"stats": stats
}

def __repr__(self, *args, **kwargs):
@@ -2119,6 +2212,7 @@ def __init__(self, *args, **kwargs):
self.supported_op_types = [
"raw-request",
"sleep",
"search",
"submit-async-search",
"get-async-search",
"delete-async-search"
9 changes: 9 additions & 0 deletions esrally/exceptions.py
@@ -29,6 +29,9 @@ def __init__(self, message, cause=None):
def __repr__(self):
return self.message

def __str__(self):
return self.message


class LaunchError(RallyError):
"""
@@ -48,6 +51,12 @@ class RallyAssertionError(RallyError):
"""


class RallyTaskAssertionError(RallyAssertionError):
"""
Thrown when an assertion on a task has been violated.
"""


class ConfigError(RallyError):
pass
