revision for aggregate PR
Signed-off-by: Michael Oviedo <[email protected]>
OVI3D0 committed Sep 17, 2024
1 parent 9526f06 commit b5da7f2
Showing 4 changed files with 122 additions and 74 deletions.
116 changes: 68 additions & 48 deletions osbenchmark/aggregator.py
@@ -1,39 +1,41 @@
import os
from typing import Any, Dict, List, Union
import uuid

from osbenchmark.metrics import FileTestExecutionStore
from osbenchmark import metrics, workload, config
from osbenchmark.utils import io as rio

class Aggregator:
def __init__(self, cfg, test_executions_dict):
def __init__(self, cfg, test_executions_dict, args):
self.config = cfg
self.args = args
self.test_executions = test_executions_dict
self.accumulated_results: Dict[str, Dict[str, List[Any]]] = {}
self.accumulated_iterations: Dict[str, int] = {}
self.statistics = ["throughput", "latency", "service_time", "client_processing_time", "processing_time", "error_rate", "duration"]
self.test_store = metrics.test_execution_store(self.config)
self.cwd = cfg.opts("node", "benchmark.cwd")

# count iterations for each operation in the workload
def iterations(self) -> None:
def count_iterations_for_each_op(self) -> None:
loaded_workload = workload.load_workload(self.config)
for task in loaded_workload.test_procedures:
for operation in task.schedule:
operation_name = operation.name
iterations = operation.iterations or 1
self.accumulated_iterations.setdefault(operation_name, 0)
self.accumulated_iterations[operation_name] += iterations

# accumulate metrics for each task from test execution results
def results(self, test_execution: Any) -> None:
for test_procedure in loaded_workload.test_procedures:
for task in test_procedure.schedule:
task_name = task.name
iterations = task.iterations or 1
self.accumulated_iterations.setdefault(task_name, 0)
self.accumulated_iterations[task_name] += iterations

def accumulate_results(self, test_execution: Any) -> None:
for item in test_execution.results.get("op_metrics", []):
task = item.get("task", "")
self.accumulated_results.setdefault(task, {})
for metric in ["throughput", "latency", "service_time", "client_processing_time", "processing_time", "error_rate", "duration"]:
for metric in self.statistics:
self.accumulated_results[task].setdefault(metric, [])
self.accumulated_results[task][metric].append(item.get(metric))

# aggregate values from multiple test execution result JSON objects by a specified key path
def aggregate_json_by_key(self, key_path: Union[str, List[str]]) -> Any:
test_store = metrics.test_execution_store(self.config)
all_jsons = [test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()]
all_jsons = [self.test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()]

# retrieve nested value from a dictionary given a key path
def get_nested_value(obj: Dict[str, Any], path: List[str]) -> Any:
@@ -46,7 +48,6 @@ def get_nested_value(obj: Dict[str, Any], path: List[str]) -> Any:
return None
return obj

# recursively aggregate values, handling different data types
def aggregate_helper(objects: List[Any]) -> Any:
if not objects:
return None
@@ -67,9 +68,8 @@ def aggregate_helper(objects: List[Any]) -> Any:
values = [get_nested_value(json, key_path) for json in all_jsons]
return aggregate_helper(values)

# construct aggregated results dict
def build_aggregated_results(self, test_store):
test_exe = test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
def build_aggregated_results(self):
test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
aggregated_results = {
"op-metrics": [],
"correctness_metrics": self.aggregate_json_by_key("correctness_metrics"),
@@ -125,39 +125,49 @@ def build_aggregated_results(self, test_store):
aggregated_results["op-metrics"].append(op_metric)

# extract the necessary data from the first test execution, since the configurations should be identical for all test executions
test_exe_store = metrics.test_execution_store(self.config)
first_test_execution = test_exe_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
current_timestamp = self.config.opts("system", "time.start")

if hasattr(self.args, 'results_file') and self.args.results_file != "":
normalized_results_file = rio.normalize_path(self.args.results_file, self.cwd)
# ensure that the parent folder already exists when we try to write the file...
rio.ensure_dir(rio.dirname(normalized_results_file))
test_execution_id = os.path.basename(normalized_results_file)
self.config.add(config.Scope.applicationOverride, "system", "test_execution.id", normalized_results_file)
elif hasattr(self.args, 'test_execution_id') and self.args.test_execution_id:
test_execution_id = f"aggregate_results_{test_exe.workload}_{self.args.test_execution_id}"
self.config.add(config.Scope.applicationOverride, "system", "test_execution.id", test_execution_id)
else:
test_execution_id = f"aggregate_results_{test_exe.workload}_{str(uuid.uuid4())}"
self.config.add(config.Scope.applicationOverride, "system", "test_execution.id", test_execution_id)

print("Aggregate test execution ID: ", test_execution_id)

# add values to the configuration object
self.config.add(config.Scope.applicationOverride, "builder",
"provision_config_instance.names", first_test_execution.provision_config_instance)
"provision_config_instance.names", test_exe.provision_config_instance)
self.config.add(config.Scope.applicationOverride, "system",
"env.name", first_test_execution.environment_name)
self.config.add(config.Scope.applicationOverride, "system", "test_execution.id",
f"aggregate_results_{first_test_execution.workload}_{str(uuid.uuid4())}")
"env.name", test_exe.environment_name)
self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp)
self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", first_test_execution.pipeline)
self.config.add(config.Scope.applicationOverride, "workload", "params", first_test_execution.workload_params)
self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", test_exe.pipeline)
self.config.add(config.Scope.applicationOverride, "workload", "params", test_exe.workload_params)
self.config.add(config.Scope.applicationOverride, "builder",
"provision_config_instance.params", first_test_execution.provision_config_instance_params)
self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", first_test_execution.plugin_params)
self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", first_test_execution.latency_percentiles)
self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", first_test_execution.throughput_percentiles)
"provision_config_instance.params", test_exe.provision_config_instance_params)
self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_exe.plugin_params)
self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_exe.latency_percentiles)
self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_exe.throughput_percentiles)

loaded_workload = workload.load_workload(self.config)
test_procedure = loaded_workload.find_test_procedure_or_default(first_test_execution.test_procedure)
test_procedure = loaded_workload.find_test_procedure_or_default(test_exe.test_procedure)

test_execution = metrics.create_test_execution(self.config, loaded_workload, test_procedure, first_test_execution.workload_revision)
test_execution.add_results(aggregated_results)
test_execution = metrics.create_test_execution(self.config, loaded_workload, test_procedure, test_exe.workload_revision)
test_execution.add_results(AggregatedResults(aggregated_results))
test_execution.distribution_version = test_exe.distribution_version
test_execution.revision = test_exe.revision
test_execution.distribution_flavor = test_exe.distribution_flavor
test_execution.provision_config_revision = test_exe.provision_config_revision

return test_execution

# calculate weighted averages for task metrics
def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], iterations: int) -> Dict[str, Any]:
weighted_metrics = {}

@@ -184,33 +194,43 @@ def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], iterati
weighted_metrics[metric] = sum(values) / len(values)
return weighted_metrics

# verify that all test executions have the same workload
def compatibility_check(self, test_store) -> None:
first_test_execution = test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
def test_execution_compatibility_check(self) -> None:
first_test_execution = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
workload = first_test_execution.workload
test_procedure = first_test_execution.test_procedure
for id in self.test_executions.keys():
test_execution = test_store.find_by_test_execution_id(id)
test_execution = self.test_store.find_by_test_execution_id(id)
if test_execution:
if test_execution.workload != workload:
raise ValueError(f"Incompatible workload: test {id} has workload '{test_execution.workload}' instead of '{workload}'")
if test_execution.test_procedure != test_procedure:
raise ValueError(f"Incompatible test procedure: test {id} has test procedure
'{test_execution.test_procedure}' instead of '{test_procedure}'")
else:
raise ValueError("Test execution not found: ", id)

self.config.add(config.Scope.applicationOverride, "workload", "test_procedure.name", first_test_execution.test_procedure)
return True

# driver code
def aggregate(self) -> None:
test_execution_store = metrics.test_execution_store(self.config)
if self.compatibility_check(test_execution_store):
if self.test_execution_compatibility_check():
for id in self.test_executions.keys():
test_execution = test_execution_store.find_by_test_execution_id(id)
test_execution = self.test_store.find_by_test_execution_id(id)
if test_execution:
self.config.add(config.Scope.applicationOverride, "workload", "repository.name", "default")
self.config.add(config.Scope.applicationOverride, "workload", "repository.name", self.args.workload_repository)
self.config.add(config.Scope.applicationOverride, "workload", "workload.name", test_execution.workload)
self.iterations()
self.results(test_execution)
self.count_iterations_for_each_op()
self.accumulate_results(test_execution)

aggregated_results = self.build_aggregated_results(test_execution_store)
aggregated_results = self.build_aggregated_results()
file_test_exe_store = FileTestExecutionStore(self.config)
file_test_exe_store.store_test_execution(aggregated_results)
else:
raise ValueError("Incompatible test execution results")

class AggregatedResults:
def __init__(self, results):
self.results = results

def as_dict(self):
return self.results
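
For context, a minimal standalone sketch of the nested-key aggregation that aggregate_json_by_key performs. The bodies of get_nested_value and aggregate_helper are collapsed in the hunk above, so the averaging behaviour below is inferred from tests/aggregator_test.py (where "key1.nested" over 10 and 20 yields 15); it is an illustration, not the committed code.

# Sketch only: inferred behaviour, not the committed implementation.
from typing import Any, Dict, List, Union

def get_nested_value(obj: Dict[str, Any], path: List[str]) -> Any:
    # Walk the dict along the key path; give up with None if any key is missing.
    for key in path:
        if not isinstance(obj, dict) or key not in obj:
            return None
        obj = obj[key]
    return obj

def aggregate_json_by_key(all_jsons: List[Dict[str, Any]], key_path: Union[str, List[str]]) -> Any:
    # Accept "a.b.c" or ["a", "b", "c"] and average the numeric values found at that path.
    if isinstance(key_path, str):
        key_path = key_path.split(".")
    values = [get_nested_value(doc, key_path) for doc in all_jsons]
    numeric = [v for v in values if isinstance(v, (int, float))]
    return sum(numeric) / len(numeric) if numeric else None

print(aggregate_json_by_key([{"key1": {"nested": 10}}, {"key1": {"nested": 20}}], "key1.nested"))  # 15.0
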
23 changes: 18 additions & 5 deletions osbenchmark/benchmark.py
@@ -229,6 +229,18 @@ def add_workload_source(subparser):
type=non_empty_list,
required=True,
help="Comma-separated list of TestExecution IDs to aggregate")
aggregate_parser.add_argument(
"--test-execution-id",
help="Define a unique id for this aggregated test_execution.",
default="")
aggregate_parser.add_argument(
"--results-file",
help="Write the aggregated results to the provided file.",
default="")
aggregate_parser.add_argument(
"--workload-repository",
help="Define the repository from where OSB will load workloads (default: default).",
default="default")

download_parser = subparsers.add_parser("download", help="Downloads an artifact")
download_parser.add_argument(
@@ -841,10 +853,11 @@ def configure_results_publishing_params(args, cfg):
cfg.add(config.Scope.applicationOverride, "results_publishing", "output.path", args.results_file)
cfg.add(config.Scope.applicationOverride, "results_publishing", "numbers.align", args.results_numbers_align)

def prepare_test_executions_dict(test_executions_arg):
def prepare_test_executions_dict(args, cfg):
cfg.add(config.Scope.applicationOverride, "results_publishing", "output.path", args.results_file)
test_executions_dict = {}
if test_executions_arg:
for execution in test_executions_arg:
if args.test_executions:
for execution in args.test_executions:
execution = execution.strip()
if execution:
test_executions_dict[execution] = None
@@ -865,8 +878,8 @@ def dispatch_sub_command(arg_parser, args, cfg):
cfg.add(config.Scope.applicationOverride, "results_publishing", "percentiles", args.percentiles)
results_publisher.compare(cfg, args.baseline, args.contender)
elif sub_command == "aggregate":
test_executions_dict = prepare_test_executions_dict(args.test_executions)
aggregator_instance = aggregator.Aggregator(cfg, test_executions_dict)
test_executions_dict = prepare_test_executions_dict(args, cfg)
aggregator_instance = aggregator.Aggregator(cfg, test_executions_dict, args)
aggregator_instance.aggregate()
elif sub_command == "list":
cfg.add(config.Scope.applicationOverride, "system", "list.config.option", args.configuration)
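
To see how the three new flags travel, here is a simplified stand-in for the benchmark.py parser. Only the flag names and defaults are taken from this diff; the rest (the lambda-based list type in place of the real non_empty_list, the example IDs) is illustrative. It also mirrors what prepare_test_executions_dict does with --test-executions: strip each ID and seed a dict keyed by ID.

# Simplified stand-in for the real parser in benchmark.py; illustration only.
import argparse

parser = argparse.ArgumentParser(prog="opensearch-benchmark aggregate")
parser.add_argument("--test-executions", type=lambda s: s.split(","), required=True)
parser.add_argument("--test-execution-id", default="")
parser.add_argument("--results-file", default="")
parser.add_argument("--workload-repository", default="default")

args = parser.parse_args(["--test-executions=id-1, id-2", "--test-execution-id=nightly-run"])

# Mirrors prepare_test_executions_dict: one entry per non-empty, stripped ID.
test_executions_dict = {e.strip(): None for e in args.test_executions if e.strip()}
print(test_executions_dict)  # {'id-1': None, 'id-2': None}
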
2 changes: 1 addition & 1 deletion osbenchmark/metrics.py
@@ -1428,7 +1428,7 @@ def as_dict(self):
}
}
if self.results:
d["results"] = self.results if isinstance(self.results, dict) else self.results.as_dict()
d["results"] = self.results.as_dict()
if self.workload_revision:
d["workload-revision"] = self.workload_revision
if not self.test_procedure.auto_generated:
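
This one-line change assumes that every results object handed to a test execution exposes as_dict(), which is why aggregator.py above now wraps its plain dict in the new AggregatedResults class before calling add_results. A minimal sketch of that contract follows; TestExecutionStub is a made-up stand-in for the real TestExecution, shown only to illustrate the requirement.

# Sketch of the as_dict() contract; TestExecutionStub is illustrative only.
from typing import Any, Dict

class AggregatedResults:
    def __init__(self, results: Dict[str, Any]):
        self.results = results

    def as_dict(self) -> Dict[str, Any]:
        return self.results

class TestExecutionStub:
    def __init__(self) -> None:
        self.results = None

    def add_results(self, results: AggregatedResults) -> None:
        self.results = results

    def as_dict(self) -> Dict[str, Any]:
        # metrics.py no longer special-cases plain dicts, so a bare dict passed
        # to add_results() would fail here with AttributeError.
        return {"results": self.results.as_dict()} if self.results else {}

exe = TestExecutionStub()
exe.add_results(AggregatedResults({"op-metrics": []}))
print(exe.as_dict())  # {'results': {'op-metrics': []}}
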
55 changes: 35 additions & 20 deletions tests/aggregator_test.py
@@ -6,7 +6,9 @@

@pytest.fixture
def mock_config():
return Mock(spec=config.Config)
mock_cfg = Mock(spec=config.Config)
mock_cfg.opts.side_effect = lambda *args: "/path/to/root" if args == ("node", "root.dir") else None
return mock_cfg

@pytest.fixture
def mock_test_executions():
@@ -16,8 +18,27 @@ def mock_test_executions():
}

@pytest.fixture
def aggregator(mock_config, mock_test_executions):
return Aggregator(mock_config, mock_test_executions)
def mock_args():
return Mock(
results_file="",
test_execution_id="",
workload_repository="default"
)

@pytest.fixture
def mock_test_store():
mock_store = Mock()
mock_store.find_by_test_execution_id.side_effect = [
Mock(results={"key1": {"nested": 10}}),
Mock(results={"key1": {"nested": 20}})
]
return mock_store

@pytest.fixture
def aggregator(mock_config, mock_test_executions, mock_args, mock_test_store):
aggregator = Aggregator(mock_config, mock_test_executions, mock_args)
aggregator.test_store = mock_test_store
return aggregator

def test_iterations(aggregator):
mock_workload = Mock()
@@ -27,7 +48,7 @@ def test_iterations(aggregator):
mock_workload.test_procedures = [mock_task]

with patch('osbenchmark.workload.load_workload', return_value=mock_workload):
aggregator.iterations()
aggregator.count_iterations_for_each_op()

assert aggregator.accumulated_iterations == {mock_operation.name: 5}

@@ -48,23 +69,15 @@ def test_results(aggregator):
]
}

aggregator.results(mock_test_execution)
aggregator.accumulate_results(mock_test_execution)

assert "task1" in aggregator.accumulated_results
assert all(metric in aggregator.accumulated_results["task1"] for metric in
["throughput", "latency", "service_time", "client_processing_time",
"processing_time", "error_rate", "duration"])

def test_aggregate_json_by_key(aggregator):
mock_test_store = Mock()
mock_test_store.find_by_test_execution_id.side_effect = [
Mock(results={"key1": {"nested": 10}}),
Mock(results={"key1": {"nested": 20}})
]

with patch('osbenchmark.metrics.test_execution_store', return_value=mock_test_store):
result = aggregator.aggregate_json_by_key("key1.nested")

result = aggregator.aggregate_json_by_key("key1.nested")
assert result == 15

def test_calculate_weighted_average(aggregator):
@@ -81,14 +94,16 @@ def test_calculate_weighted_average(aggregator):
assert result["latency"]["unit"] == "ms"

def test_compatibility_check(aggregator):
mock_test_procedure = Mock(name="test_procedure")
mock_test_store = Mock()
mock_test_store.find_by_test_execution_id.side_effect = [
Mock(workload="workload1"),
Mock(workload="workload1"),
Mock(workload="workload1")
Mock(workload="workload1", test_procedure=mock_test_procedure),
Mock(workload="workload1", test_procedure=mock_test_procedure),
Mock(workload="workload1", test_procedure=mock_test_procedure)
]
aggregator.test_store = mock_test_store
assert aggregator.test_execution_compatibility_check()

assert aggregator.compatibility_check(mock_test_store)

def test_compatibility_check_incompatible(aggregator):
mock_test_store = Mock()
Expand All @@ -97,6 +112,6 @@ def test_compatibility_check_incompatible(aggregator):
Mock(workload="workload2"),
Mock(workload="workload1")
]

aggregator.test_store = mock_test_store
with pytest.raises(ValueError):
aggregator.compatibility_check(mock_test_store)
aggregator.test_execution_compatibility_check()
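
The body of calculate_weighted_average, exercised by test_calculate_weighted_average above, is largely collapsed in this diff, so the exact weighting is not visible here. The sketch below is one plausible reading (numeric fields averaged with each test execution's iteration count as the weight, non-numeric fields such as "unit" carried through) and should be treated as an assumption rather than the committed implementation.

# Assumption-level sketch; not the committed calculate_weighted_average.
from typing import Any, Dict, List

def weighted_average(per_execution: List[Dict[str, Any]], iterations: List[int]) -> Dict[str, Any]:
    # Combine one metric (e.g. latency) from several test executions.
    total = sum(iterations)
    combined: Dict[str, Any] = {}
    for key in per_execution[0]:
        values = [entry[key] for entry in per_execution]
        if all(isinstance(v, (int, float)) for v in values):
            combined[key] = sum(v * w for v, w in zip(values, iterations)) / total
        else:
            combined[key] = values[0]  # e.g. keep "unit": "ms" untouched
    return combined

latency = weighted_average([{"mean": 100.0, "unit": "ms"}, {"mean": 200.0, "unit": "ms"}], [10, 30])
print(latency)  # {'mean': 175.0, 'unit': 'ms'}
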
