From 94e555cebc3d73cc0f568600a806e919b44ad3b0 Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Mon, 5 Aug 2024 16:12:45 -0700 Subject: [PATCH 1/3] Multiple search clients for automatic scaling Signed-off-by: Finn Roblin --- osbenchmark/results_publisher.py | 54 +++++++++++++++++++++++++++----- osbenchmark/workload/loader.py | 19 ++++++++--- tests/workload/loader_test.py | 46 +++++++++++++++++++++++++++ 3 files changed, 107 insertions(+), 12 deletions(-) diff --git a/osbenchmark/results_publisher.py b/osbenchmark/results_publisher.py index 32b2e4ecb..04814d621 100644 --- a/osbenchmark/results_publisher.py +++ b/osbenchmark/results_publisher.py @@ -27,6 +27,8 @@ import io import logging import sys +import re +from enum import Enum import tabulate @@ -43,6 +45,11 @@ ------------------------------------------------------ """ +class Throughput(Enum): + MEAN = "mean" + MAX = "max" + MIN = "min" + MEDIAN = "median" def summarize(results, cfg): SummaryResultsPublisher(results, cfg).publish() @@ -127,6 +134,16 @@ def __init__(self, results, config): "latency": comma_separated_string_to_number_list(config.opts("workload", "latency.percentiles", mandatory=False)) } + def publish_operational_statistics(self, metrics_table: list, warnings: list, record, task): + metrics_table.extend(self._publish_throughput(record, task)) + metrics_table.extend(self._publish_latency(record, task)) + metrics_table.extend(self._publish_service_time(record, task)) + # this is mostly needed for debugging purposes but not so relevant to end users + if self.show_processing_time: + metrics_table.extend(self._publish_processing_time(record, task)) + metrics_table.extend(self._publish_error_rate(record, task)) + self.add_warnings(warnings, record, task) + def publish(self): print_header(FINAL_SCORE) @@ -145,16 +162,33 @@ def publish(self): metrics_table.extend(self._publish_transform_stats(stats)) + max_throughput = -1 + record_with_best_throughput = None + + throughput_pattern = r"_(\d+)_clients$" + + for record in stats.op_metrics: task = record["task"] - metrics_table.extend(self._publish_throughput(record, task)) - metrics_table.extend(self._publish_latency(record, task)) - metrics_table.extend(self._publish_service_time(record, task)) - # this is mostly needed for debugging purposes but not so relevant to end users - if self.show_processing_time: - metrics_table.extend(self._publish_processing_time(record, task)) - metrics_table.extend(self._publish_error_rate(record, task)) - self.add_warnings(warnings, record, task) + maybe_match_task_is_part_of_throughput_testing = re.search(throughput_pattern, task) + if maybe_match_task_is_part_of_throughput_testing: + + # assumption: all units are the same and only maximizing throughput over one operation (i.e. not both ingest and search). + # To maximize throughput over multiple operations, would need a list/dictionary of maximum throughputs. 
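+                # Tasks generated from a clients_list get names ending in "_<N>_clients"; the mean
+                # throughput (Throughput.MEAN) of each such task is the value compared below.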
+ task_throughput = record["throughput"][Throughput.MEAN.value] + logger = logging.getLogger(__name__) + logger.info("Task %s has throughput %s", task, task_throughput) + if task_throughput > max_throughput: + max_throughput = task_throughput + record_with_best_throughput = record + + else: + self.publish_operational_statistics(metrics_table=metrics_table, warnings=warnings, record=record, task=task) + + if max_throughput != -1 and record_with_best_throughput is not None: + self.publish_operational_statistics(metrics_table=metrics_table, warnings=warnings, record=record_with_best_throughput, + task=record_with_best_throughput["task"]) + metrics_table.extend(self._publish_best_client_settings(record_with_best_throughput, record_with_best_throughput["task"])) for record in stats.correctness_metrics: task = record["task"] @@ -217,6 +251,10 @@ def _publish_recall(self, values, task): self._line("Mean recall@1", task, recall_1_mean, "", lambda v: "%.2f" % v) ) + def _publish_best_client_settings(self, record, task): + num_clients = re.search(r"_(\d+)_clients$", task).group(1) + return self._join(self._line("Num clients that achieved maximium throughput", "", num_clients, "")) + def _publish_percentiles(self, name, task, value, unit="ms"): lines = [] percentiles = self.display_percentiles.get(name, metrics.GlobalStatsCalculator.OTHER_PERCENTILES) diff --git a/osbenchmark/workload/loader.py b/osbenchmark/workload/loader.py index a84c07559..e55581e23 100644 --- a/osbenchmark/workload/loader.py +++ b/osbenchmark/workload/loader.py @@ -1596,11 +1596,22 @@ def _create_test_procedures(self, workload_spec): schedule = [] for op in self._r(test_procedure_spec, "schedule", error_ctx=name): - if "parallel" in op: - task = self.parse_parallel(op["parallel"], ops, name) + if "clients_list" in op: + self.logger.info("Clients list specified, running multiple search tasks with %s clients.", op["clients_list"]) + for client in op["clients_list"]: + op["clients"] = client + + new_name = name + "_" + str(client) + "_clients" + new_task = self.parse_task(op, ops, new_name) + new_task.name = new_name + schedule.append(new_task) else: - task = self.parse_task(op, ops, name) - schedule.append(task) + if "parallel" in op: + task = self.parse_parallel(op["parallel"], ops, name) + else: + task = self.parse_task(op, ops, name) + + schedule.append(task) # verify we don't have any duplicate task names (which can be confusing / misleading in results_publishing). 
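+            # The "<name>_<N>_clients" tasks generated above from a clients_list go through this check as well.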
known_task_names = set() diff --git a/tests/workload/loader_test.py b/tests/workload/loader_test.py index eeccc14ec..a9e3a338f 100644 --- a/tests/workload/loader_test.py +++ b/tests/workload/loader_test.py @@ -2477,6 +2477,52 @@ def test_parse_unique_task_names(self): self.assertEqual("search-two-clients", schedule[1].name) self.assertEqual("search", schedule[1].operation.name) + def test_parse_clients_list(self): + workload_specification = { + "description": "description for unit test", + "operations": [ + { + "name": "search", + "operation-type": "search", + "index": "_all" + } + ], + "test_procedure": { + "name": "default-test_procedure", + "schedule": [ + { + "name": "search-one-client", + "operation": "search", + "clients": 1, + "clients_list": [1,2,3] + }, + { + "name": "search-two-clients", + "operation": "search", + "clients": 2 + } + ] + } + } + + reader = loader.WorkloadSpecificationReader(selected_test_procedure="default-test_procedure") + resulting_workload = reader("unittest", workload_specification, "/mappings") + self.assertEqual("unittest", resulting_workload.name) + test_procedure = resulting_workload.test_procedures[0] + self.assertTrue(test_procedure.selected) + schedule = test_procedure.schedule + self.assertEqual(4, len(schedule)) + + self.assertEqual("default-test_procedure_1_clients", schedule[0].name) + self.assertEqual("search", schedule[0].operation.name) + self.assertEqual("default-test_procedure_2_clients", schedule[1].name) + self.assertEqual("search", schedule[1].operation.name) + self.assertEqual("default-test_procedure_3_clients", schedule[2].name) + self.assertEqual("search", schedule[2].operation.name) + + self.assertEqual("search-two-clients", schedule[3].name) + self.assertEqual("search", schedule[3].operation.name) + def test_parse_indices_valid_workload_specification(self): workload_specification = { "description": "description for unit test", From c94f1c195850e34b0cc54024216809703a4cef9f Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Wed, 14 Aug 2024 16:16:51 -0700 Subject: [PATCH 2/3] Address Vijay offline feedback Signed-off-by: Finn Roblin --- osbenchmark/results_publisher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osbenchmark/results_publisher.py b/osbenchmark/results_publisher.py index 04814d621..1e9aff61f 100644 --- a/osbenchmark/results_publisher.py +++ b/osbenchmark/results_publisher.py @@ -253,7 +253,7 @@ def _publish_recall(self, values, task): def _publish_best_client_settings(self, record, task): num_clients = re.search(r"_(\d+)_clients$", task).group(1) - return self._join(self._line("Num clients that achieved maximium throughput", "", num_clients, "")) + return self._join(self._line("Num clients to reach max throughput", "", num_clients, "")) def _publish_percentiles(self, name, task, value, unit="ms"): lines = [] From 38c1241493ac5fd8e8dcb71b95923e621e0aff3e Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Thu, 22 Aug 2024 17:34:29 -0700 Subject: [PATCH 3/3] Address Ian's feedback Signed-off-by: Finn Roblin --- osbenchmark/results_publisher.py | 13 +++++++------ osbenchmark/workload/loader.py | 23 +++++++++++++++++++---- tests/workload/loader_test.py | 28 +++++++++++++++++++++++----- 3 files changed, 49 insertions(+), 15 deletions(-) diff --git a/osbenchmark/results_publisher.py b/osbenchmark/results_publisher.py index 1e9aff61f..4b3ebebd6 100644 --- a/osbenchmark/results_publisher.py +++ b/osbenchmark/results_publisher.py @@ -133,6 +133,7 @@ def __init__(self, results, config): 
"throughput":comma_separated_string_to_number_list(config.opts("workload", "throughput.percentiles", mandatory=False)), "latency": comma_separated_string_to_number_list(config.opts("workload", "latency.percentiles", mandatory=False)) } + self.logger = logging.getLogger(__name__) def publish_operational_statistics(self, metrics_table: list, warnings: list, record, task): metrics_table.extend(self._publish_throughput(record, task)) @@ -162,6 +163,7 @@ def publish(self): metrics_table.extend(self._publish_transform_stats(stats)) + # These variables are used with the clients_list parameter in test_procedures to find the max throughput. max_throughput = -1 record_with_best_throughput = None @@ -170,14 +172,12 @@ def publish(self): for record in stats.op_metrics: task = record["task"] - maybe_match_task_is_part_of_throughput_testing = re.search(throughput_pattern, task) - if maybe_match_task_is_part_of_throughput_testing: - + is_task_part_of_throughput_testing = re.search(throughput_pattern, task) + if is_task_part_of_throughput_testing: # assumption: all units are the same and only maximizing throughput over one operation (i.e. not both ingest and search). # To maximize throughput over multiple operations, would need a list/dictionary of maximum throughputs. task_throughput = record["throughput"][Throughput.MEAN.value] - logger = logging.getLogger(__name__) - logger.info("Task %s has throughput %s", task, task_throughput) + self.logger.info("Task %s has throughput %s", task, task_throughput) if task_throughput > max_throughput: max_throughput = task_throughput record_with_best_throughput = record @@ -185,6 +185,7 @@ def publish(self): else: self.publish_operational_statistics(metrics_table=metrics_table, warnings=warnings, record=record, task=task) + # The following code is run when the clients_list parameter is specified and publishes the max throughput. if max_throughput != -1 and record_with_best_throughput is not None: self.publish_operational_statistics(metrics_table=metrics_table, warnings=warnings, record=record_with_best_throughput, task=record_with_best_throughput["task"]) @@ -253,7 +254,7 @@ def _publish_recall(self, values, task): def _publish_best_client_settings(self, record, task): num_clients = re.search(r"_(\d+)_clients$", task).group(1) - return self._join(self._line("Num clients to reach max throughput", "", num_clients, "")) + return self._join(self._line("Number of clients that achieved max throughput", "", num_clients, "")) def _publish_percentiles(self, name, task, value, unit="ms"): lines = [] diff --git a/osbenchmark/workload/loader.py b/osbenchmark/workload/loader.py index e55581e23..0d8114234 100644 --- a/osbenchmark/workload/loader.py +++ b/osbenchmark/workload/loader.py @@ -1597,11 +1597,14 @@ def _create_test_procedures(self, workload_spec): for op in self._r(test_procedure_spec, "schedule", error_ctx=name): if "clients_list" in op: - self.logger.info("Clients list specified, running multiple search tasks with %s clients.", op["clients_list"]) - for client in op["clients_list"]: - op["clients"] = client + self.logger.info("Clients list specified: %s. 
Running multiple search tasks, "\ + "each scheduled with the corresponding number of clients from the list.", op["clients_list"]) + for num_clients in op["clients_list"]: + op["clients"] = num_clients - new_name = name + "_" + str(client) + "_clients" + new_name = self._rename_task_based_on_num_clients(name, num_clients) + + new_name = name + "_" + str(num_clients) + "_clients" new_task = self.parse_task(op, ops, new_name) new_task.name = new_name schedule.append(new_task) @@ -1646,6 +1649,18 @@ def _create_test_procedures(self, workload_spec): % ", ".join([c.name for c in test_procedures])) return test_procedures + def _rename_task_based_on_num_clients(self, name: str, num_clients: int) -> str: + has_underscore = "_" in name + has_hyphen = "-" in name + if has_underscore and has_hyphen: + self.logger.warning("The test procedure name %s contains a mix of _ and -. "\ + "Consider changing the name to avoid frustrating bugs in the future.", name) + return name + "_" + str(num_clients) + "_clients" + elif has_hyphen: + return name + "-" + str(num_clients) + "-clients" + else: + return name + "_" + str(num_clients) + "_clients" + def _get_test_procedure_specs(self, workload_spec): schedule = self._r(workload_spec, "schedule", mandatory=False) test_procedure = self._r(workload_spec, "test_procedure", mandatory=False) diff --git a/tests/workload/loader_test.py b/tests/workload/loader_test.py index a9e3a338f..285e52772 100644 --- a/tests/workload/loader_test.py +++ b/tests/workload/loader_test.py @@ -2488,7 +2488,7 @@ def test_parse_clients_list(self): } ], "test_procedure": { - "name": "default-test_procedure", + "name": "default-test-procedure", "schedule": [ { "name": "search-one-client", @@ -2505,7 +2505,7 @@ def test_parse_clients_list(self): } } - reader = loader.WorkloadSpecificationReader(selected_test_procedure="default-test_procedure") + reader = loader.WorkloadSpecificationReader(selected_test_procedure="default-test-procedure") resulting_workload = reader("unittest", workload_specification, "/mappings") self.assertEqual("unittest", resulting_workload.name) test_procedure = resulting_workload.test_procedures[0] @@ -2513,15 +2513,33 @@ def test_parse_clients_list(self): schedule = test_procedure.schedule self.assertEqual(4, len(schedule)) - self.assertEqual("default-test_procedure_1_clients", schedule[0].name) + self.assertEqual("default-test-procedure_1_clients", schedule[0].name) self.assertEqual("search", schedule[0].operation.name) - self.assertEqual("default-test_procedure_2_clients", schedule[1].name) + self.assertEqual("default-test-procedure_2_clients", schedule[1].name) self.assertEqual("search", schedule[1].operation.name) - self.assertEqual("default-test_procedure_3_clients", schedule[2].name) + self.assertEqual("default-test-procedure_3_clients", schedule[2].name) self.assertEqual("search", schedule[2].operation.name) self.assertEqual("search-two-clients", schedule[3].name) self.assertEqual("search", schedule[3].operation.name) + # pylint: disable=W0212 + def test_naming_with_clients_list(self): + reader = loader.WorkloadSpecificationReader(selected_test_procedure="default-test_procedure") + # Test case 1: name contains both "_" and "-" + result = reader._rename_task_based_on_num_clients("test_name-task", 5) + self.assertEqual(result, "test_name-task_5_clients") + + # Test case 2: name contains only "-" + result = reader._rename_task_based_on_num_clients("test-name", 3) + self.assertEqual(result, "test-name-3-clients") + + # Test case 3: name contains only "_" + result = 
reader._rename_task_based_on_num_clients("test_name", 2) + self.assertEqual(result, "test_name_2_clients") + + # Test case 4: name contains neither "_" nor "-" + result = reader._rename_task_based_on_num_clients("testname", 1) + self.assertEqual(result, "testname_1_clients") def test_parse_indices_valid_workload_specification(self): workload_specification = {