Specify multiple search clients for easier benchmarking #614

Merged
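In brief: a schedule entry in a test procedure can now specify a clients_list parameter instead of a single clients value. The loader expands such an entry into one task per client count (clients_list takes precedence over clients), and the results publisher then reports full operational statistics only for the client count that achieved the best mean throughput. A minimal sketch of such an entry, copied from the unit test at the bottom of this diff (treat the surrounding workload JSON as illustrative):

    {
        "name": "search-one-client",
        "operation": "search",
        "clients": 1,
        "clients_list": [1, 2, 3]
    }

Under a test procedure named default-test-procedure, this expands into the tasks default-test-procedure_1_clients, default-test-procedure_2_clients, and default-test-procedure_3_clients.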
osbenchmark/results_publisher.py (55 changes: 47 additions & 8 deletions)
@@ -27,6 +27,8 @@
 import io
 import logging
 import sys
+import re
+from enum import Enum

 import tabulate

@@ -43,6 +45,11 @@
 ------------------------------------------------------
 """

+class Throughput(Enum):
+    MEAN = "mean"
+    MAX = "max"
+    MIN = "min"
+    MEDIAN = "median"

 def summarize(results, cfg):
     SummaryResultsPublisher(results, cfg).publish()
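For orientation: the enum values above are the keys used to index a task record's aggregated throughput statistics in publish() further down. A minimal sketch with an invented record:

    record = {"task": "search_2_clients",
              "throughput": {"mean": 118.3, "median": 117.8, "min": 90.2, "max": 130.1}}
    task_throughput = record["throughput"][Throughput.MEAN.value]  # 118.3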
@@ -126,6 +133,17 @@ def __init__(self, results, config):
             "throughput": comma_separated_string_to_number_list(config.opts("workload", "throughput.percentiles", mandatory=False)),
             "latency": comma_separated_string_to_number_list(config.opts("workload", "latency.percentiles", mandatory=False))
         }
+        self.logger = logging.getLogger(__name__)
+
+    def publish_operational_statistics(self, metrics_table: list, warnings: list, record, task):
+        metrics_table.extend(self._publish_throughput(record, task))
+        metrics_table.extend(self._publish_latency(record, task))
+        metrics_table.extend(self._publish_service_time(record, task))
+        # this is mostly needed for debugging purposes but not so relevant to end users
+        if self.show_processing_time:
+            metrics_table.extend(self._publish_processing_time(record, task))
+        metrics_table.extend(self._publish_error_rate(record, task))
+        self.add_warnings(warnings, record, task)

@@ -145,16 +163,33 @@ def publish(self):
         print_header(FINAL_SCORE)

         metrics_table.extend(self._publish_transform_stats(stats))

+        # These variables are used with the clients_list parameter in test_procedures to find the max throughput.
+        max_throughput = -1
Review comment (Collaborator): Nit: I would add a comment above this to specify that the following three variables relate to the clients_list parameter in test_procedures. Newcomers might not quickly understand that this is only relevant to throughput-testing clients.

+        record_with_best_throughput = None
+        throughput_pattern = r"_(\d+)_clients$"
+
         for record in stats.op_metrics:
             task = record["task"]
-            metrics_table.extend(self._publish_throughput(record, task))
-            metrics_table.extend(self._publish_latency(record, task))
-            metrics_table.extend(self._publish_service_time(record, task))
-            # this is mostly needed for debugging purposes but not so relevant to end users
-            if self.show_processing_time:
-                metrics_table.extend(self._publish_processing_time(record, task))
-            metrics_table.extend(self._publish_error_rate(record, task))
-            self.add_warnings(warnings, record, task)
+            is_task_part_of_throughput_testing = re.search(throughput_pattern, task)
+            if is_task_part_of_throughput_testing:
+                # Assumption: all units are the same and we only maximize throughput over one operation (i.e. not both ingest and search).
Review comment (Contributor, author): Putting emphasis on this comment. Right now, search clients are the main use case for vector search (users want to see search throughput from an inference perspective). I think adding support for multiple clients in any operation can be deferred to a later PR. Any thoughts?

+                # To maximize throughput over multiple operations, we would need a list/dictionary of maximum throughputs.
+                task_throughput = record["throughput"][Throughput.MEAN.value]
+                self.logger.info("Task %s has throughput %s", task, task_throughput)
+                if task_throughput > max_throughput:
+                    max_throughput = task_throughput
+                    record_with_best_throughput = record
+            else:
+                self.publish_operational_statistics(metrics_table=metrics_table, warnings=warnings, record=record, task=task)

Review comment (Collaborator): Nit: It would be nice to have another comment above this if statement stating that the following block relates to throughput testing, i.e. the case where multiple clients are specified for the operation.

+        # The following code is run when the clients_list parameter is specified and publishes the max throughput.
+        if max_throughput != -1 and record_with_best_throughput is not None:
+            self.publish_operational_statistics(metrics_table=metrics_table, warnings=warnings, record=record_with_best_throughput,
+                                                task=record_with_best_throughput["task"])
+            metrics_table.extend(self._publish_best_client_settings(record_with_best_throughput, record_with_best_throughput["task"]))

         for record in stats.correctness_metrics:
             task = record["task"]
@@ -217,6 +252,10 @@ def _publish_recall(self, values, task):
             self._line("Mean recall@1", task, recall_1_mean, "", lambda v: "%.2f" % v)
         )

+    def _publish_best_client_settings(self, record, task):
+        num_clients = re.search(r"_(\d+)_clients$", task).group(1)
+        return self._join(self._line("Number of clients that achieved max throughput", "", num_clients, ""))
+
     def _publish_percentiles(self, name, task, value, unit="ms"):
         lines = []
         percentiles = self.display_percentiles.get(name, metrics.GlobalStatsCalculator.OTHER_PERCENTILES)
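Aside: the best-throughput selection in publish() above, condensed into a standalone sketch. Task names and numbers are invented for illustration, and the real code additionally publishes the winning record's full operational statistics:

    import re

    throughput_pattern = r"_(\d+)_clients$"
    records = [
        {"task": "index-append", "throughput": {"mean": 5000.0}},  # no _N_clients suffix: published normally
        {"task": "default-test-procedure_1_clients", "throughput": {"mean": 95.0}},
        {"task": "default-test-procedure_2_clients", "throughput": {"mean": 120.0}},
    ]

    max_throughput, best_record = -1, None
    for record in records:
        if re.search(throughput_pattern, record["task"]):
            task_throughput = record["throughput"]["mean"]
            if task_throughput > max_throughput:
                max_throughput, best_record = task_throughput, record

    num_clients = re.search(throughput_pattern, best_record["task"]).group(1)
    print(num_clients)  # "2", published as "Number of clients that achieved max throughput"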
osbenchmark/workload/loader.py (34 changes: 30 additions & 4 deletions)
@@ -1596,11 +1596,25 @@ def _create_test_procedures(self, workload_spec):
             schedule = []

             for op in self._r(test_procedure_spec, "schedule", error_ctx=name):
-                if "parallel" in op:
-                    task = self.parse_parallel(op["parallel"], ops, name)
+                if "clients_list" in op:
+                    self.logger.info("Clients list specified: %s. Running multiple search tasks, "
+                                     "each scheduled with the corresponding number of clients from the list.", op["clients_list"])
+                    for num_clients in op["clients_list"]:
+                        op["clients"] = num_clients
+                        new_name = name + "_" + str(num_clients) + "_clients"
+                        new_task = self.parse_task(op, ops, new_name)
+                        new_task.name = new_name
+                        schedule.append(new_task)
                 else:
-                    task = self.parse_task(op, ops, name)
-                schedule.append(task)
+                    if "parallel" in op:
+                        task = self.parse_parallel(op["parallel"], ops, name)
+                    else:
+                        task = self.parse_task(op, ops, name)
+
+                    schedule.append(task)

             # verify we don't have any duplicate task names (which can be confusing / misleading in results_publishing).
             known_task_names = set()
@@ -1635,6 +1649,18 @@ def _create_test_procedures(self, workload_spec):
                                  % ", ".join([c.name for c in test_procedures]))
         return test_procedures

+    def _rename_task_based_on_num_clients(self, name: str, num_clients: int) -> str:
+        has_underscore = "_" in name
+        has_hyphen = "-" in name
+        if has_underscore and has_hyphen:
+            self.logger.warning("The test procedure name %s contains a mix of _ and -. "
+                                "Consider changing the name to avoid frustrating bugs in the future.", name)
+            return name + "_" + str(num_clients) + "_clients"
+        elif has_hyphen:
+            return name + "-" + str(num_clients) + "-clients"
+        else:
+            return name + "_" + str(num_clients) + "_clients"
+
     def _get_test_procedure_specs(self, workload_spec):
         schedule = self._r(workload_spec, "schedule", mandatory=False)
         test_procedure = self._r(workload_spec, "test_procedure", mandatory=False)
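Aside: the naming rule implemented by _rename_task_based_on_num_clients, restated as a standalone function (behavior copied from the method above; the assertions mirror the unit tests below):

    def rename_task(name: str, num_clients: int) -> str:
        # Hyphen-only names get a hyphen-separated suffix; underscore-only,
        # mixed, and separator-free names all get the underscore form.
        if "-" in name and "_" not in name:
            return f"{name}-{num_clients}-clients"
        return f"{name}_{num_clients}_clients"

    assert rename_task("test-name", 3) == "test-name-3-clients"
    assert rename_task("test_name", 2) == "test_name_2_clients"
    assert rename_task("test_name-task", 5) == "test_name-task_5_clients"
    assert rename_task("testname", 1) == "testname_1_clients"

Worth noting: the publisher's throughput_pattern, r"_(\d+)_clients$", matches only the underscore form, so a hyphen-suffixed task name would not participate in the best-throughput selection; the expansion path in _create_test_procedures therefore appends the underscore suffix directly.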
tests/workload/loader_test.py (64 changes: 64 additions & 0 deletions)
@@ -2477,6 +2477,70 @@ def test_parse_unique_task_names(self):
         self.assertEqual("search-two-clients", schedule[1].name)
         self.assertEqual("search", schedule[1].operation.name)

+    def test_parse_clients_list(self):
+        workload_specification = {
+            "description": "description for unit test",
+            "operations": [
+                {
+                    "name": "search",
+                    "operation-type": "search",
+                    "index": "_all"
+                }
+            ],
+            "test_procedure": {
+                "name": "default-test-procedure",
+                "schedule": [
+                    {
+                        "name": "search-one-client",
+                        "operation": "search",
+                        "clients": 1,
+                        "clients_list": [1, 2, 3]
+                    },
+                    {
+                        "name": "search-two-clients",
+                        "operation": "search",
+                        "clients": 2
+                    }
+                ]
+            }
+        }
+
+        reader = loader.WorkloadSpecificationReader(selected_test_procedure="default-test-procedure")
+        resulting_workload = reader("unittest", workload_specification, "/mappings")
+        self.assertEqual("unittest", resulting_workload.name)
+        test_procedure = resulting_workload.test_procedures[0]
+        self.assertTrue(test_procedure.selected)
+        schedule = test_procedure.schedule
+        self.assertEqual(4, len(schedule))
+
+        self.assertEqual("default-test-procedure_1_clients", schedule[0].name)
+        self.assertEqual("search", schedule[0].operation.name)
+        self.assertEqual("default-test-procedure_2_clients", schedule[1].name)
+        self.assertEqual("search", schedule[1].operation.name)
+        self.assertEqual("default-test-procedure_3_clients", schedule[2].name)
+        self.assertEqual("search", schedule[2].operation.name)
+
+        self.assertEqual("search-two-clients", schedule[3].name)
+        self.assertEqual("search", schedule[3].operation.name)
+
+    # pylint: disable=W0212
+    def test_naming_with_clients_list(self):
+        reader = loader.WorkloadSpecificationReader(selected_test_procedure="default-test_procedure")
+        # Test case 1: name contains both "_" and "-"
+        result = reader._rename_task_based_on_num_clients("test_name-task", 5)
+        self.assertEqual(result, "test_name-task_5_clients")
+
+        # Test case 2: name contains only "-"
+        result = reader._rename_task_based_on_num_clients("test-name", 3)
+        self.assertEqual(result, "test-name-3-clients")
+
+        # Test case 3: name contains only "_"
+        result = reader._rename_task_based_on_num_clients("test_name", 2)
+        self.assertEqual(result, "test_name_2_clients")
+
+        # Test case 4: name contains neither "_" nor "-"
+        result = reader._rename_task_based_on_num_clients("testname", 1)
+        self.assertEqual(result, "testname_1_clients")
+
     def test_parse_indices_valid_workload_specification(self):
         workload_specification = {
             "description": "description for unit test",