From a72730e51b8b60df7c26069b609962fcc2cde2c6 Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Mon, 17 Sep 2018 15:23:21 +0200 Subject: [PATCH] More fine-grained ML metrics With this commit we retrieve and store ML-related metrics per machine learning job (instead of all jobs globally). We also store not only the maximum bucket processing time but in addition the minimum, median and mean. --- docs/metrics.rst | 1 + docs/summary_report.rst | 6 ++ esrally/mechanic/telemetry.py | 36 +++++++--- esrally/reporter.py | 43 +++++++++--- tests/mechanic/telemetry_test.py | 110 ++++++++++++++++++++++++++++++- tests/reporter_test.py | 14 ++++ 6 files changed, 193 insertions(+), 17 deletions(-) diff --git a/docs/metrics.rst b/docs/metrics.rst index bf24fb91e..a7dd0ca61 100644 --- a/docs/metrics.rst +++ b/docs/metrics.rst @@ -152,3 +152,4 @@ Rally stores the following metrics: * ``final_index_size_bytes``: Final resulting index size on the file system after all nodes have been shutdown at the end of the benchmark. It includes all files in the nodes' data directories (actual index files and translog). * ``store_size_in_bytes``: The size in bytes of the index (excluding the translog) as reported by the indices stats API. * ``translog_size_in_bytes``: The size in bytes of the translog as reported by the indices stats API. +* ``ml_processing_time``: A structure containing the minimum, mean, median and maximum bucket processing time in milliseconds per machine learning job. These metrics are only available if a machine learning job has been created in the respective benchmark. diff --git a/docs/summary_report.rst b/docs/summary_report.rst index 112fdb3dc..010948dcb 100644 --- a/docs/summary_report.rst +++ b/docs/summary_report.rst @@ -92,6 +92,12 @@ Where ``X`` is one of: * **Definition**: Different merge times as reported by Lucene. Only available if Lucene index writer trace logging is enabled (use ``--car-params="verbose_iw_logging_enabled:true"`` for that). 
* **Corresponding metrics keys**: ``merge_parts_total_time_*`` +ML processing time +------------------ + +* **Definition**: Minimum, mean, median and maximum time in milliseconds that a machine learning job has spent processing a single bucket. +* **Corresponding metrics key**: ``ml_processing_time`` + Median CPU usage ---------------- diff --git a/esrally/mechanic/telemetry.py b/esrally/mechanic/telemetry.py index d9cd9b0ca..9eaba9c72 100644 --- a/esrally/mechanic/telemetry.py +++ b/esrally/mechanic/telemetry.py @@ -1133,15 +1133,29 @@ def detach_from_cluster(self, cluster): "query": { "bool": { "must": [ - {"term": {"result_type": "bucket"}}, - # TODO: We could restrict this by job id if we need to measure multiple jobs... - # {"term": {"job_id": "job_id"}} + {"term": {"result_type": "bucket"}} ] } }, "aggs": { - "max_bucket_processing_time": { - "max": {"field": "processing_time_ms"} + "jobs": { + "terms": { + "field": "job_id" + }, + "aggs": { + "min_pt": { + "min": {"field": "processing_time_ms"} + }, + "max_pt": { + "max": {"field": "processing_time_ms"} + }, + "mean_pt": { + "avg": {"field": "processing_time_ms"} + }, + "median_pt": { + "percentiles": {"field": "processing_time_ms", "percents": [50]} + } + } } } }) @@ -1149,9 +1163,15 @@ def detach_from_cluster(self, cluster): self.logger.exception("Could not retrieve ML bucket processing time.") return try: - value = results["aggregations"]["max_bucket_processing_time"]["value"] - if value: - self.metrics_store.put_value_cluster_level("ml_max_processing_time_millis", value, "ms") + for job in results["aggregations"]["jobs"]["buckets"]: + ml_job_stats = collections.OrderedDict() + ml_job_stats["name"] = "ml_processing_time" + ml_job_stats["job_name"] = job["key"] + ml_job_stats["min_millis"] = job["min_pt"]["value"] + ml_job_stats["mean_millis"] = job["mean_pt"]["value"] + ml_job_stats["median_millis"] = job["median_pt"]["values"]["50.0"] + ml_job_stats["max_millis"] = job["max_pt"]["value"] + 
self.metrics_store.put_doc(doc=dict(ml_job_stats), level=MetaInfoScope.cluster) except KeyError: # no ML running pass diff --git a/esrally/reporter.py b/esrally/reporter.py index 95c644591..3ea2c31d0 100644 --- a/esrally/reporter.py +++ b/esrally/reporter.py @@ -153,8 +153,8 @@ def __call__(self): result.merge_part_time_vectors = self.sum("merge_parts_total_time_vectors") result.merge_part_time_points = self.sum("merge_parts_total_time_points") - self.logger.debug("Gathering ML max processing time.") - result.ml_max_processing_time = self.one("ml_max_processing_time_millis") + self.logger.debug("Gathering ML max processing times.") + result.ml_processing_time = self.all("ml_processing_time") self.logger.debug("Gathering CPU usage metrics.") result.median_cpu_usage = self.median("cpu_utilization_1s", sample_type=metrics.SampleType.Normal) @@ -195,6 +195,9 @@ def sum(self, metric_name): def one(self, metric_name): return self.store.get_one(metric_name, lap=self.lap) + def all(self, metric_name): + return self.store.get_raw(metric_name, lap=self.lap) + def summary_stats(self, metric_name, task_name): median = self.store.get_median(metric_name, task=task_name, sample_type=metrics.SampleType.Normal, lap=self.lap) unit = self.store.get_unit(metric_name, task=task_name) @@ -269,7 +272,7 @@ def __init__(self, d=None): self.flush_time_per_shard = self.v(d, "flush_time_per_shard", default={}) self.merge_throttle_time = self.v(d, "merge_throttle_time") self.merge_throttle_time_per_shard = self.v(d, "merge_throttle_time_per_shard", default={}) - self.ml_max_processing_time = self.v(d, "ml_max_processing_time") + self.ml_processing_time = self.v(d, "ml_processing_time") self.merge_part_time_postings = self.v(d, "merge_part_time_postings") self.merge_part_time_stored_fields = self.v(d, "merge_part_time_stored_fields") @@ -420,7 +423,7 @@ def report(self): metrics_table = [] metrics_table.extend(self.report_total_times(stats)) 
         metrics_table.extend(self.report_merge_part_times(stats))
-        metrics_table.extend(self.report_ml_max_processing_time(stats))
+        metrics_table.extend(self.report_ml_processing_times(stats))
         metrics_table.extend(self.report_cpu_usage(stats))
         metrics_table.extend(self.report_gc_times(stats))
@@ -519,10 +522,15 @@ def report_merge_part_times(self, stats):
             self.line("Merge time (points)", "", stats.merge_part_time_points, unit, convert.ms_to_minutes)
         )
 
-    def report_ml_max_processing_time(self, stats):
-        return self.join(
-            self.line("Max Processing Time (ML)", "", convert.ms_to_seconds(stats.ml_max_processing_time), "s")
-        )
+    def report_ml_processing_times(self, stats):
+        lines = []
+        for processing_time in stats.ml_processing_time:
+            job_name = processing_time["job_name"]
+            lines.append(self.line("Min ML processing time", job_name, processing_time["min_millis"], "s", convert.ms_to_seconds))
+            lines.append(self.line("Mean ML processing time", job_name, processing_time["mean_millis"], "s", convert.ms_to_seconds))
+            lines.append(self.line("Median ML processing time", job_name, processing_time["median_millis"], "s", convert.ms_to_seconds))
+            lines.append(self.line("Max ML processing time", job_name, processing_time["max_millis"], "s", convert.ms_to_seconds))
+        return lines
 
     def report_cpu_usage(self, stats):
         return self.join(
@@ -621,6 +629,7 @@ def metrics_table(self, baseline_stats, contender_stats, plain):
         metrics_table = []
         metrics_table.extend(self.report_total_times(baseline_stats, contender_stats))
         metrics_table.extend(self.report_merge_part_times(baseline_stats, contender_stats))
+        metrics_table.extend(self.report_ml_processing_times(baseline_stats, contender_stats))
         metrics_table.extend(self.report_gc_times(baseline_stats, contender_stats))
         metrics_table.extend(self.report_disk_usage(baseline_stats, contender_stats))
 
metrics_table.extend(self.report_segment_memory(baseline_stats, contender_stats)) @@ -701,6 +711,23 @@ def report_merge_part_times(self, baseline_stats, contender_stats): "", "min", treat_increase_as_improvement=False, formatter=convert.ms_to_minutes) ) + def report_ml_processing_times(self, baseline_stats, contender_stats): + lines = [] + for baseline in baseline_stats.ml_processing_time: + job_name = baseline["job_name"] + # O(n^2) but we assume here only a *very* limited number of jobs (usually just one) + for contender in contender_stats.ml_processing_time: + if contender["job_name"] == job_name: + lines.append(self.line("Min ML processing time", baseline["min_millis"], contender["min_millis"], + job_name, "s", treat_increase_as_improvement=False, formatter=convert.ms_to_seconds)) + lines.append(self.line("Mean ML processing time", baseline["mean_millis"], contender["mean_millis"], + job_name, "s", treat_increase_as_improvement=False, formatter=convert.ms_to_seconds)) + lines.append(self.line("Median ML processing time", baseline["median_millis"], contender["median_millis"], + job_name, "s", treat_increase_as_improvement=False, formatter=convert.ms_to_seconds)) + lines.append(self.line("Max ML processing time", baseline["max_millis"], contender["max_millis"], + job_name, "s", treat_increase_as_improvement=False, formatter=convert.ms_to_seconds)) + return lines + def report_total_times(self, baseline_stats, contender_stats): lines = [] lines.extend(self.report_total_time("indexing time", diff --git a/tests/mechanic/telemetry_test.py b/tests/mechanic/telemetry_test.py index 8cc9344f6..ba1f58138 100644 --- a/tests/mechanic/telemetry_test.py +++ b/tests/mechanic/telemetry_test.py @@ -1,6 +1,7 @@ import random import collections import unittest.mock as mock +import elasticsearch from unittest import TestCase from esrally import config, metrics, exceptions @@ -156,7 +157,6 @@ def __init__(self, response=None, force_error=False): def perform_request(self, *args, 
**kwargs): if self._force_error: - import elasticsearch raise elasticsearch.TransportError else: return self._response @@ -1955,6 +1955,114 @@ def test_index_stats_are_per_lap(self, metrics_store_cluster_count, metrics_stor ], any_order=True) +class MlBucketProcessingTimeTests(TestCase): + @mock.patch("esrally.metrics.EsMetricsStore.put_doc") + @mock.patch("elasticsearch.Elasticsearch") + def test_error_on_retrieval_does_not_store_metrics(self, es, metrics_store_put_doc): + es.search.side_effect = elasticsearch.TransportError("unit test error") + cfg = create_config() + metrics_store = metrics.EsMetricsStore(cfg) + device = telemetry.MlBucketProcessingTime(es, metrics_store) + t = telemetry.Telemetry(cfg, devices=[device]) + # cluster is not used by this device + t.detach_from_cluster(cluster=None) + + self.assertEqual(0, metrics_store_put_doc.call_count) + + @mock.patch("esrally.metrics.EsMetricsStore.put_doc") + @mock.patch("elasticsearch.Elasticsearch") + def test_empty_result_does_not_store_metrics(self, es, metrics_store_put_doc): + es.search.return_value = { + "aggregations": { + "jobs": { + "buckets": [] + } + } + } + cfg = create_config() + metrics_store = metrics.EsMetricsStore(cfg) + device = telemetry.MlBucketProcessingTime(es, metrics_store) + t = telemetry.Telemetry(cfg, devices=[device]) + # cluster is not used by this device + t.detach_from_cluster(cluster=None) + + self.assertEqual(0, metrics_store_put_doc.call_count) + + @mock.patch("esrally.metrics.EsMetricsStore.put_doc") + @mock.patch("elasticsearch.Elasticsearch") + def test_result_is_stored(self, es, metrics_store_put_doc): + es.search.return_value = { + "aggregations": { + "jobs": { + "buckets": [ + { + "key": "benchmark_ml_job_1", + "doc_count": 4775, + "max_pt": { + "value": 36.0 + }, + "mean_pt": { + "value": 12.3 + }, + "median_pt": { + "values": { + "50.0": 17.2 + } + }, + "min_pt": { + "value": 2.2 + } + }, + { + "key": "benchmark_ml_job_2", + "doc_count": 3333, + "max_pt": { + "value": 
226.3 + }, + "mean_pt": { + "value": 78.3 + }, + "median_pt": { + "values": { + "50.0": 37.4 + } + }, + "min_pt": { + "value": 32.2 + } + } + ] + } + } + } + + cfg = create_config() + metrics_store = metrics.EsMetricsStore(cfg) + device = telemetry.MlBucketProcessingTime(es, metrics_store) + t = telemetry.Telemetry(cfg, devices=[device]) + # cluster is not used by this device + t.detach_from_cluster(cluster=None) + + metrics_store_put_doc.assert_has_calls([ + mock.call(doc={ + "name": "ml_processing_time", + "job_name": "benchmark_ml_job_1", + "min_millis": 2.2, + "mean_millis": 12.3, + "median_millis": 17.2, + "max_millis": 36.0 + }, level=metrics.MetaInfoScope.cluster), + mock.call(doc={ + "name": "ml_processing_time", + "job_name": "benchmark_ml_job_2", + "min_millis": 32.2, + "mean_millis": 78.3, + "median_millis": 37.4, + "max_millis": 226.3 + }, level=metrics.MetaInfoScope.cluster) + ]) + + class IndexSizeTests(TestCase): @mock.patch("esrally.utils.io.get_size") @mock.patch("esrally.metrics.EsMetricsStore.put_count_node_level") diff --git a/tests/reporter_test.py b/tests/reporter_test.py index e11bdcafe..fc22ece36 100644 --- a/tests/reporter_test.py +++ b/tests/reporter_test.py @@ -45,6 +45,14 @@ def test_calculate_simple_index_stats(self): meta_data={"success": False}) store.put_value_cluster_level("service_time", 215, unit="ms", task="index #1", operation_type=track.OperationType.Bulk, meta_data={"success": True}) + store.put_doc(doc={ + "name": "ml_processing_time", + "job_name": "benchmark_ml_job_1", + "min_millis": 2.2, + "mean_millis": 12.3, + "median_millis": 17.2, + "max_millis": 36.0 + }, level=metrics.MetaInfoScope.cluster) store.put_count_node_level("rally-node-0", "final_index_size_bytes", 2048, unit="bytes") store.put_count_node_level("rally-node-1", "final_index_size_bytes", 4096, unit="bytes") @@ -59,6 +67,12 @@ def test_calculate_simple_index_stats(self): self.assertAlmostEqual(0.3333333333333333, opm["error_rate"]) self.assertEqual(6144, 
stats.index_size) + self.assertEqual(1, len(stats.ml_processing_time)) + self.assertEqual("benchmark_ml_job_1", stats.ml_processing_time[0]["job_name"]) + self.assertEqual(2.2, stats.ml_processing_time[0]["min_millis"]) + self.assertEqual(12.3, stats.ml_processing_time[0]["mean_millis"]) + self.assertEqual(17.2, stats.ml_processing_time[0]["median_millis"]) + self.assertEqual(36.0, stats.ml_processing_time[0]["max_millis"]) def select(l, name, operation=None, node=None):