Refactor aggregate (opensearch-project#708)
Signed-off-by: Michael Oviedo <[email protected]>
OVI3D0 committed Dec 30, 2024
1 parent 34d0409 commit d0aa6d2
Showing 2 changed files with 80 additions and 65 deletions.
142 changes: 78 additions & 64 deletions osbenchmark/aggregator.py
@@ -4,11 +4,12 @@
 import uuid
 
-from osbenchmark.metrics import FileTestRunStore
+from osbenchmark.metrics import FileTestExecutionStore, TestExecution
 from osbenchmark import metrics, workload, config
 from osbenchmark.utils import io as rio
 
 class Aggregator:
-    def __init__(self, cfg, test_runs_dict, args):
+    def __init__(self, cfg, test_runs_dict, args) -> None:
         self.config = cfg
         self.args = args
         self.test_runs = test_runs_dict
@@ -21,69 +22,72 @@ def __init__(self, cfg, test_runs_dict, args):
         self.test_procedure_name = None
         self.loaded_workload = None
 
-    def count_iterations_for_each_op(self, test_run) -> None:
-        matching_test_procedure = next((tp for tp in self.loaded_workload.test_procedures if tp.name == self.test_procedure_name), None)
-        workload_params = test_run.workload_params if test_run.workload_params else {}
-
-        test_run_id = test_run.test_run_id
-        self.accumulated_iterations[test_run_id] = {}
-
-        if matching_test_procedure:
-            for task in matching_test_procedure.schedule:
-                task_name = task.name
-                task_name_iterations = f"{task_name}_iterations"
-                if task_name_iterations in workload_params:
-                    iterations = int(workload_params[task_name_iterations])
-                else:
-                    iterations = task.iterations or 1
-                self.accumulated_iterations[test_run_id][task_name] = iterations
-        else:
-            raise ValueError(f"Test procedure '{self.test_procedure_name}' not found in the loaded workload.")
-
-    def accumulate_results(self, test_run: Any) -> None:
-        for item in test_run.results.get("op_metrics", []):
-            task = item.get("task", "")
+    def count_iterations_for_each_op(self, test_execution: TestExecution) -> None:
+        """Count iterations for each operation in the test execution"""
+        workload_params = test_execution.workload_params if test_execution.workload_params else {}
+        test_execution_id = test_execution.test_execution_id
+        self.accumulated_iterations[test_execution_id] = {}
+
+        for task in self.loaded_workload.find_test_procedure_or_default(self.test_procedure_name).schedule:
+            task_name = task.name
+            task_name_iterations = f"{task_name}_iterations"
+            iterations = int(workload_params.get(task_name_iterations, task.iterations or 1))
+            self.accumulated_iterations[test_execution_id][task_name] = iterations
+
+    def accumulate_results(self, test_execution: TestExecution) -> None:
+        """Accumulate results from a single test execution"""
+        for operation_metric in test_execution.results.get("op_metrics", []):
+            task = operation_metric.get("task", "")
             self.accumulated_results.setdefault(task, {})
             for metric in self.metrics:
                 self.accumulated_results[task].setdefault(metric, [])
-                self.accumulated_results[task][metric].append(item.get(metric))
+                self.accumulated_results[task][metric].append(operation_metric.get(metric))
 
     def aggregate_json_by_key(self, key_path: Union[str, List[str]]) -> Any:
-        all_jsons = [self.test_store.find_by_test_run_id(id).results for id in self.test_runs.keys()]
-
-        # retrieve nested value from a dictionary given a key path
-        def get_nested_value(obj: Dict[str, Any], path: List[str]) -> Any:
+        """
+        Aggregates JSON results across multiple test executions using a specified key path.
+        Handles nested dictionary structures and calculates averages for numeric values
+        """
+        all_json_results = [self.test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()]
+
+        def get_nested_value(json_data: Dict[str, Any], path: List[str]) -> Any:
+            """
+            Retrieves a value from a nested dictionary structure using a path of keys.
+            """
             for key in path:
-                if isinstance(obj, dict):
-                    obj = obj.get(key, {})
-                elif isinstance(obj, list) and key.isdigit():
-                    obj = obj[int(key)] if int(key) < len(obj) else {}
+                if isinstance(json_data, dict):
+                    json_data = json_data.get(key, {})
+                elif isinstance(json_data, list) and key.isdigit():
+                    json_data = json_data[int(key)] if int(key) < len(json_data) else {}
                 else:
                     return None
-            return obj
+            return json_data
 
-        def aggregate_helper(objects: List[Any]) -> Any:
-            if not objects:
+        def aggregate_json_elements(json_elements: List[Any]) -> Any:
+            if not json_elements:
                 return None
-            if all(isinstance(obj, (int, float)) for obj in objects):
-                avg = sum(objects) / len(objects)
-                return avg
-            if all(isinstance(obj, dict) for obj in objects):
-                keys = set().union(*objects)
-                return {key: aggregate_helper([obj.get(key) for obj in objects]) for key in keys}
-            if all(isinstance(obj, list) for obj in objects):
-                max_length = max(len(obj) for obj in objects)
-                return [aggregate_helper([obj[i] if i < len(obj) else None for obj in objects]) for i in range(max_length)]
-            return next((obj for obj in objects if obj is not None), None)
+            # If all elements are numbers, calculate the average
+            if all(isinstance(obj, (int, float)) for obj in json_elements):
+                return sum(json_elements) / len(json_elements)
+            # If all elements are dictionaries, recursively aggregate their values
+            if all(isinstance(obj, dict) for obj in json_elements):
+                keys = set().union(*json_elements)
+                return {key: aggregate_json_elements([obj.get(key) for obj in json_elements]) for key in keys}
+            # If all elements are lists, recursively aggregate corresponding elements
+            if all(isinstance(obj, list) for obj in json_elements):
+                max_length = max(len(obj) for obj in json_elements)
+                return [aggregate_json_elements([obj[i] if i < len(obj) else None for obj in json_elements]) for i in range(max_length)]
+            # If elements are of mixed types, return the first non-None value
+            return next((obj for obj in json_elements if obj is not None), None)
 
         if isinstance(key_path, str):
             key_path = key_path.split('.')
 
-        values = [get_nested_value(json, key_path) for json in all_jsons]
-        return aggregate_helper(values)
+        nested_values = [get_nested_value(json_result, key_path) for json_result in all_json_results]
+        return aggregate_json_elements(nested_values)
 
-    def build_aggregated_results(self):
-        test_run = self.test_store.find_by_test_run_id(list(self.test_runs.keys())[0])
+    def build_aggregated_results_dict(self) -> Dict[str, Any]:
+        """Builds a dictionary of aggregated metrics from all test executions"""
         aggregated_results = {
             "op_metrics": [],
             "correctness_metrics": self.aggregate_json_by_key("correctness_metrics"),
@@ -147,8 +151,30 @@ def build_aggregated_results(self):

             aggregated_results["op_metrics"].append(op_metric)
 
-        # extract the necessary data from the first test run, since the configurations should be identical for all test runs
+        return aggregated_results
+
+    def update_config_object(self, test_execution: TestExecution) -> None:
+        """
+        Updates the configuration object with values from a test execution.
+        Uses the first test execution as reference since configurations should be identical
+        """
+        current_timestamp = self.config.opts("system", "time.start")
+        self.config.add(config.Scope.applicationOverride, "builder",
+                        "provision_config_instance.names", test_execution.provision_config_instance)
+        self.config.add(config.Scope.applicationOverride, "system",
+                        "env.name", test_execution.environment_name)
+        self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp)
+        self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", test_execution.pipeline)
+        self.config.add(config.Scope.applicationOverride, "workload", "params", test_execution.workload_params)
+        self.config.add(config.Scope.applicationOverride, "builder",
+                        "provision_config_instance.params", test_execution.provision_config_instance_params)
+        self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_execution.plugin_params)
+        self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_execution.latency_percentiles)
+        self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_execution.throughput_percentiles)
+
+    def build_aggregated_results(self) -> TestExecution:
+        test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
+        aggregated_results = self.build_aggregated_results_dict()
 
         if hasattr(self.args, 'results_file') and self.args.results_file != "":
             normalized_results_file = rio.normalize_path(self.args.results_file, self.cwd)
@@ -165,19 +191,7 @@ def build_aggregated_results(self):

print("Aggregate test run ID: ", test_run_id)

# add values to the configuration object
self.config.add(config.Scope.applicationOverride, "builder",
"provision_config_instance.names", test_run.provision_config_instance)
self.config.add(config.Scope.applicationOverride, "system",
"env.name", test_run.environment_name)
self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp)
self.config.add(config.Scope.applicationOverride, "test_run", "pipeline", test_run.pipeline)
self.config.add(config.Scope.applicationOverride, "workload", "params", test_run.workload_params)
self.config.add(config.Scope.applicationOverride, "builder",
"provision_config_instance.params", test_run.provision_config_instance_params)
self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_run.plugin_params)
self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_run.latency_percentiles)
self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_run.throughput_percentiles)
self.update_config_object(test_exe)

loaded_workload = workload.load_workload(self.config)
test_procedure_object = loaded_workload.find_test_procedure_or_default(self.test_procedure_name)
@@ -223,7 +237,7 @@ def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], task_na

         return weighted_metrics
 
-    def calculate_rsd(self, values: List[Union[int, float]], metric_name: str):
+    def calculate_rsd(self, values: List[Union[int, float]], metric_name: str) -> Union[float, str]:
         if not values:
             raise ValueError(f"Cannot calculate RSD for metric '{metric_name}': empty list of values")
         if len(values) == 1:
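The body of calculate_rsd is cut off above. For orientation: relative standard deviation is the standard deviation expressed as a percentage of the mean. A minimal sketch of what the full method plausibly computes (the "NA" return for a single sample is an assumption inferred from the new Union[float, str] annotation, not confirmed by the visible hunk):

import statistics
from typing import List, Union

def calculate_rsd(values: List[Union[int, float]], metric_name: str) -> Union[float, str]:
    # Guard clauses mirror the visible diff lines; the rest is a sketch.
    if not values:
        raise ValueError(f"Cannot calculate RSD for metric '{metric_name}': empty list of values")
    if len(values) == 1:
        return "NA"  # assumption: a single sample has no spread to report
    mean = statistics.mean(values)
    return (statistics.stdev(values) / mean) * 100 if mean != 0 else "NA"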
3 changes: 2 additions & 1 deletion tests/aggregator_test.py
@@ -50,7 +50,8 @@ def test_count_iterations_for_each_op(aggregator):
     mock_test_procedure.schedule = mock_schedule
     mock_workload.test_procedures = [mock_test_procedure]
 
-    mock_test_run = Mock(test_run_id="test1", workload_params={})
+    mock_workload.find_test_procedure_or_default = Mock(return_value=mock_test_procedure)
+    mock_test_execution = Mock(test_execution_id="test1", workload_params={})
 
     aggregator.loaded_workload = mock_workload
     aggregator.test_procedure_name = "test_procedure_name"
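A condensed, self-contained sketch of the interaction this test now exercises: the new find_test_procedure_or_default mock feeds count_iterations_for_each_op, which resolves per-task iteration counts from workload_params. Names mirror the diff; the task name and override value are illustrative:

from unittest.mock import Mock

mock_task = Mock(iterations=None)
mock_task.name = "index-append"  # Mock(name=...) would set the mock's repr, so assign .name explicitly
mock_test_procedure = Mock(schedule=[mock_task])
mock_workload = Mock()
mock_workload.find_test_procedure_or_default = Mock(return_value=mock_test_procedure)

workload_params = {"index-append_iterations": "3"}
task = mock_workload.find_test_procedure_or_default("test_procedure_name").schedule[0]
iterations = int(workload_params.get(f"{task.name}_iterations", task.iterations or 1))
assert iterations == 3  # the workload param overrides the schedule default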
