Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More fine-grained ML metrics #572

Merged
merged 1 commit into from
Sep 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,4 @@ Rally stores the following metrics:
* ``final_index_size_bytes``: Final resulting index size on the file system after all nodes have been shutdown at the end of the benchmark. It includes all files in the nodes' data directories (actual index files and translog).
* ``store_size_in_bytes``: The size in bytes of the index (excluding the translog) as reported by the indices stats API.
* ``translog_size_in_bytes``: The size in bytes of the translog as reported by the indices stats API.
* ``ml_processing_time``: A structure containing the minimum, mean, median and maximum bucket processing time in milliseconds per machine learning job. These metrics are only available if a machine learning job has been created in the respective benchmark.
6 changes: 6 additions & 0 deletions docs/summary_report.rst
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ Where ``X`` is one of:
* **Definition**: Different merge times as reported by Lucene. Only available if Lucene index writer trace logging is enabled (use ``--car-params="verbose_iw_logging_enabled:true"`` for that).
* **Corresponding metrics keys**: ``merge_parts_total_time_*``

ML processing time
------------------

* **Definition**: Minimum, mean, median and maximum time in milliseconds that a machine learning job has spent processing a single bucket.
* **Corresponding metrics key**: ``ml_processing_time``


Median CPU usage
----------------
Expand Down
36 changes: 28 additions & 8 deletions esrally/mechanic/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1133,25 +1133,45 @@ def detach_from_cluster(self, cluster):
"query": {
"bool": {
"must": [
{"term": {"result_type": "bucket"}},
# TODO: We could restrict this by job id if we need to measure multiple jobs...
# {"term": {"job_id": "job_id"}}
{"term": {"result_type": "bucket"}}
]
}
},
"aggs": {
"max_bucket_processing_time": {
"max": {"field": "processing_time_ms"}
"jobs": {
"terms": {
"field": "job_id"
},
"aggs": {
"min_pt": {
"min": {"field": "processing_time_ms"}
},
"max_pt": {
"max": {"field": "processing_time_ms"}
},
"mean_pt": {
"avg": {"field": "processing_time_ms"}
},
"median_pt": {
"percentiles": {"field": "processing_time_ms", "percents": [50]}
}
}
}
}
})
except elasticsearch.TransportError:
self.logger.exception("Could not retrieve ML bucket processing time.")
return
try:
value = results["aggregations"]["max_bucket_processing_time"]["value"]
if value:
self.metrics_store.put_value_cluster_level("ml_max_processing_time_millis", value, "ms")
for job in results["aggregations"]["jobs"]["buckets"]:
ml_job_stats = collections.OrderedDict()
ml_job_stats["name"] = "ml_processing_time"
ml_job_stats["job_name"] = job["key"]
ml_job_stats["min_millis"] = job["min_pt"]["value"]
ml_job_stats["mean_millis"] = job["mean_pt"]["value"]
ml_job_stats["median_millis"] = job["median_pt"]["values"]["50.0"]
ml_job_stats["max_millis"] = job["max_pt"]["value"]
self.metrics_store.put_doc(doc=dict(ml_job_stats), level=MetaInfoScope.cluster)
except KeyError:
# no ML running
pass
Expand Down
43 changes: 35 additions & 8 deletions esrally/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ def __call__(self):
result.merge_part_time_vectors = self.sum("merge_parts_total_time_vectors")
result.merge_part_time_points = self.sum("merge_parts_total_time_points")

self.logger.debug("Gathering ML max processing time.")
result.ml_max_processing_time = self.one("ml_max_processing_time_millis")
self.logger.debug("Gathering ML max processing times.")
result.ml_processing_time = self.all("ml_processing_time")

self.logger.debug("Gathering CPU usage metrics.")
result.median_cpu_usage = self.median("cpu_utilization_1s", sample_type=metrics.SampleType.Normal)
Expand Down Expand Up @@ -195,6 +195,9 @@ def sum(self, metric_name):
def one(self, metric_name):
    """Fetch the single recorded value for *metric_name*, scoped to the current lap."""
    value = self.store.get_one(metric_name, lap=self.lap)
    return value

def all(self, metric_name):
    """Return every raw sample recorded for *metric_name* in the current lap."""
    raw_samples = self.store.get_raw(metric_name, lap=self.lap)
    return raw_samples

def summary_stats(self, metric_name, task_name):
median = self.store.get_median(metric_name, task=task_name, sample_type=metrics.SampleType.Normal, lap=self.lap)
unit = self.store.get_unit(metric_name, task=task_name)
Expand Down Expand Up @@ -269,7 +272,7 @@ def __init__(self, d=None):
self.flush_time_per_shard = self.v(d, "flush_time_per_shard", default={})
self.merge_throttle_time = self.v(d, "merge_throttle_time")
self.merge_throttle_time_per_shard = self.v(d, "merge_throttle_time_per_shard", default={})
self.ml_max_processing_time = self.v(d, "ml_max_processing_time")
self.ml_processing_time = self.v(d, "ml_processing_time")

self.merge_part_time_postings = self.v(d, "merge_part_time_postings")
self.merge_part_time_stored_fields = self.v(d, "merge_part_time_stored_fields")
Expand Down Expand Up @@ -420,7 +423,7 @@ def report(self):
metrics_table = []
metrics_table.extend(self.report_total_times(stats))
metrics_table.extend(self.report_merge_part_times(stats))
metrics_table.extend(self.report_ml_max_processing_time(stats))
metrics_table.extend(self.report_ml_processing_times(stats))

metrics_table.extend(self.report_cpu_usage(stats))
metrics_table.extend(self.report_gc_times(stats))
Expand Down Expand Up @@ -519,10 +522,15 @@ def report_merge_part_times(self, stats):
self.line("Merge time (points)", "", stats.merge_part_time_points, unit, convert.ms_to_minutes)
)

def report_ml_max_processing_time(self, stats):
return self.join(
self.line("Max Processing Time (ML)", "", convert.ms_to_seconds(stats.ml_max_processing_time), "s")
)
def report_ml_processing_times(self, stats):
    """Produce one report line per statistic (min/mean/median/max bucket
    processing time) for each machine learning job in ``stats``.

    Values are stored in milliseconds and rendered in seconds via
    ``convert.ms_to_seconds``. Returns an empty list when no ML job ran.
    """
    lines = []
    for processing_time in stats.ml_processing_time:
        job_name = processing_time["job_name"]
        # Note: no trailing commas after append() - the originals created
        # and discarded throwaway 1-tuples on three of these statements.
        lines.append(self.line("Min ML processing time", job_name, processing_time["min_millis"], "s", convert.ms_to_seconds))
        lines.append(self.line("Mean ML processing time", job_name, processing_time["mean_millis"], "s", convert.ms_to_seconds))
        lines.append(self.line("Median ML processing time", job_name, processing_time["median_millis"], "s", convert.ms_to_seconds))
        lines.append(self.line("Max ML processing time", job_name, processing_time["max_millis"], "s", convert.ms_to_seconds))
    return lines

def report_cpu_usage(self, stats):
return self.join(
Expand Down Expand Up @@ -621,6 +629,8 @@ def metrics_table(self, baseline_stats, contender_stats, plain):
metrics_table = []
metrics_table.extend(self.report_total_times(baseline_stats, contender_stats))
metrics_table.extend(self.report_merge_part_times(baseline_stats, contender_stats))
metrics_table.extend(self.report_merge_part_times(baseline_stats, contender_stats))
metrics_table.extend(self.report_ml_processing_times(baseline_stats, contender_stats))
metrics_table.extend(self.report_gc_times(baseline_stats, contender_stats))
metrics_table.extend(self.report_disk_usage(baseline_stats, contender_stats))
metrics_table.extend(self.report_segment_memory(baseline_stats, contender_stats))
Expand Down Expand Up @@ -701,6 +711,23 @@ def report_merge_part_times(self, baseline_stats, contender_stats):
"", "min", treat_increase_as_improvement=False, formatter=convert.ms_to_minutes)
)

def report_ml_processing_times(self, baseline_stats, contender_stats):
    """Produce comparison report lines (min/mean/median/max ML bucket
    processing time, baseline vs. contender) for every ML job that is
    present in both runs. Jobs present in only one run are skipped.
    """
    lines = []
    # Job names originate from a terms aggregation on job_id and are therefore
    # unique per run; an index turns the previous O(n^2) scan into O(n).
    contender_by_job = {c["job_name"]: c for c in contender_stats.ml_processing_time}
    for baseline in baseline_stats.ml_processing_time:
        job_name = baseline["job_name"]
        contender = contender_by_job.get(job_name)
        if contender is None:
            # job only ran in the baseline benchmark - nothing to compare against
            continue
        # Emit the four statistics uniformly instead of four copy-pasted calls.
        for label, key in [("Min ML processing time", "min_millis"),
                           ("Mean ML processing time", "mean_millis"),
                           ("Median ML processing time", "median_millis"),
                           ("Max ML processing time", "max_millis")]:
            lines.append(self.line(label, baseline[key], contender[key], job_name, "s",
                                   treat_increase_as_improvement=False, formatter=convert.ms_to_seconds))
    return lines

def report_total_times(self, baseline_stats, contender_stats):
lines = []
lines.extend(self.report_total_time("indexing time",
Expand Down
110 changes: 109 additions & 1 deletion tests/mechanic/telemetry_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import random
import collections
import unittest.mock as mock
import elasticsearch

from unittest import TestCase
from esrally import config, metrics, exceptions
Expand Down Expand Up @@ -156,7 +157,6 @@ def __init__(self, response=None, force_error=False):

def perform_request(self, *args, **kwargs):
if self._force_error:
import elasticsearch
raise elasticsearch.TransportError
else:
return self._response
Expand Down Expand Up @@ -1955,6 +1955,114 @@ def test_index_stats_are_per_lap(self, metrics_store_cluster_count, metrics_stor
], any_order=True)


class MlBucketProcessingTimeTests(TestCase):
    """Tests for the MlBucketProcessingTime telemetry device, which queries the
    ML results index when the benchmark ends and stores per-job bucket
    processing-time statistics as metrics documents."""

    @mock.patch("esrally.metrics.EsMetricsStore.put_doc")
    @mock.patch("elasticsearch.Elasticsearch")
    def test_error_on_retrieval_does_not_store_metrics(self, es, metrics_store_put_doc):
        # A transport-level failure while querying must be handled gracefully
        # and must not result in any metrics documents being stored.
        es.search.side_effect = elasticsearch.TransportError("unit test error")
        cfg = create_config()
        metrics_store = metrics.EsMetricsStore(cfg)
        device = telemetry.MlBucketProcessingTime(es, metrics_store)
        t = telemetry.Telemetry(cfg, devices=[device])
        # cluster is not used by this device
        t.detach_from_cluster(cluster=None)

        self.assertEqual(0, metrics_store_put_doc.call_count)

    @mock.patch("esrally.metrics.EsMetricsStore.put_doc")
    @mock.patch("elasticsearch.Elasticsearch")
    def test_empty_result_does_not_store_metrics(self, es, metrics_store_put_doc):
        # A well-formed response without any job buckets (i.e. no ML job ran
        # during the benchmark) must not produce metrics documents either.
        es.search.return_value = {
            "aggregations": {
                "jobs": {
                    "buckets": []
                }
            }
        }
        cfg = create_config()
        metrics_store = metrics.EsMetricsStore(cfg)
        device = telemetry.MlBucketProcessingTime(es, metrics_store)
        t = telemetry.Telemetry(cfg, devices=[device])
        # cluster is not used by this device
        t.detach_from_cluster(cluster=None)

        self.assertEqual(0, metrics_store_put_doc.call_count)

    @mock.patch("esrally.metrics.EsMetricsStore.put_doc")
    @mock.patch("elasticsearch.Elasticsearch")
    def test_result_is_stored(self, es, metrics_store_put_doc):
        # Simulated response of the per-job terms aggregation with nested
        # min/mean/median/max sub-aggregations on processing_time_ms.
        es.search.return_value = {
            "aggregations": {
                "jobs": {
                    "buckets": [
                        {
                            "key": "benchmark_ml_job_1",
                            "doc_count": 4775,
                            "max_pt": {
                                "value": 36.0
                            },
                            "mean_pt": {
                                "value": 12.3
                            },
                            "median_pt": {
                                # percentiles aggregation nests its result under
                                # "values", keyed by the requested percentile
                                "values": {
                                    "50.0": 17.2
                                }
                            },
                            "min_pt": {
                                "value": 2.2
                            }
                        },
                        {
                            "key": "benchmark_ml_job_2",
                            "doc_count": 3333,
                            "max_pt": {
                                "value": 226.3
                            },
                            "mean_pt": {
                                "value": 78.3
                            },
                            "median_pt": {
                                "values": {
                                    "50.0": 37.4
                                }
                            },
                            "min_pt": {
                                "value": 32.2
                            }
                        }
                    ]
                }
            }
        }

        cfg = create_config()
        metrics_store = metrics.EsMetricsStore(cfg)
        device = telemetry.MlBucketProcessingTime(es, metrics_store)
        t = telemetry.Telemetry(cfg, devices=[device])
        # cluster is not used by this device
        t.detach_from_cluster(cluster=None)

        # One "ml_processing_time" document per job, flattened from the
        # aggregation response, stored at cluster level.
        metrics_store_put_doc.assert_has_calls([
            mock.call(doc={
                "name": "ml_processing_time",
                "job_name": "benchmark_ml_job_1",
                "min_millis": 2.2,
                "mean_millis": 12.3,
                "median_millis": 17.2,
                "max_millis": 36.0
            }, level=metrics.MetaInfoScope.cluster),
            mock.call(doc={
                "name": "ml_processing_time",
                "job_name": "benchmark_ml_job_2",
                "min_millis": 32.2,
                "mean_millis": 78.3,
                "median_millis": 37.4,
                "max_millis": 226.3
            }, level=metrics.MetaInfoScope.cluster)
        ])


class IndexSizeTests(TestCase):
@mock.patch("esrally.utils.io.get_size")
@mock.patch("esrally.metrics.EsMetricsStore.put_count_node_level")
Expand Down
14 changes: 14 additions & 0 deletions tests/reporter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ def test_calculate_simple_index_stats(self):
meta_data={"success": False})
store.put_value_cluster_level("service_time", 215, unit="ms", task="index #1", operation_type=track.OperationType.Bulk,
meta_data={"success": True})
store.put_doc(doc={
"name": "ml_processing_time",
"job_name": "benchmark_ml_job_1",
"min_millis": 2.2,
"mean_millis": 12.3,
"median_millis": 17.2,
"max_millis": 36.0
}, level=metrics.MetaInfoScope.cluster)
store.put_count_node_level("rally-node-0", "final_index_size_bytes", 2048, unit="bytes")
store.put_count_node_level("rally-node-1", "final_index_size_bytes", 4096, unit="bytes")

Expand All @@ -59,6 +67,12 @@ def test_calculate_simple_index_stats(self):
self.assertAlmostEqual(0.3333333333333333, opm["error_rate"])

self.assertEqual(6144, stats.index_size)
self.assertEqual(1, len(stats.ml_processing_time))
self.assertEqual("benchmark_ml_job_1", stats.ml_processing_time[0]["job_name"])
self.assertEqual(2.2, stats.ml_processing_time[0]["min_millis"])
self.assertEqual(12.3, stats.ml_processing_time[0]["mean_millis"])
self.assertEqual(17.2, stats.ml_processing_time[0]["median_millis"])
self.assertEqual(36.0, stats.ml_processing_time[0]["max_millis"])


def select(l, name, operation=None, node=None):
Expand Down