Change min/max to overall_min/overall_max + update comparison results publisher #692

Merged: 3 commits, Nov 15, 2024
osbenchmark/aggregator.py (13 additions, 10 deletions)

@@ -193,24 +193,27 @@ def build_aggregated_results(self):

     def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], iterations: int) -> Dict[str, Any]:
         weighted_metrics = {}
+        num_executions = len(next(iter(task_metrics.values())))
+        total_iterations = iterations * num_executions

    [Inline review thread on the lines above]
    Collaborator: To clarify, iterations is the number of iterations that the user inputted in the workload params, or the default number of times a task is run in a workload? Not the number of times we executed the same test?
    Member (author): That's correct!

         for metric, values in task_metrics.items():
             if isinstance(values[0], dict):
                 weighted_metrics[metric] = {}
-                for item_key in values[0].keys():
-                    if item_key == 'unit':
-                        weighted_metrics[metric][item_key] = values[0][item_key]
+                for metric_field in values[0].keys():
+                    if metric_field == 'unit':
+                        weighted_metrics[metric][metric_field] = values[0][metric_field]
+                    elif metric_field == 'min':
+                        weighted_metrics[metric]['overall_min'] = min(value.get(metric_field, 0) for value in values)
+                    elif metric_field == 'max':
+                        weighted_metrics[metric]['overall_max'] = max(value.get(metric_field, 0) for value in values)
                     else:
-                        item_values = [value.get(item_key, 0) for value in values]
+                        # for items like median or containing percentile values
+                        item_values = [value.get(metric_field, 0) for value in values]
                         weighted_sum = sum(value * iterations for value in item_values)
-                        total_iterations = iterations * len(item_values)
-                        weighted_avg = weighted_sum / total_iterations
-                        weighted_metrics[metric][item_key] = weighted_avg
+                        weighted_metrics[metric][metric_field] = weighted_sum / total_iterations
             else:
                 weighted_sum = sum(value * iterations for value in values)
-                total_iterations = iterations * len(values)
-                weighted_avg = weighted_sum / total_iterations
-                weighted_metrics[metric] = weighted_avg
+                weighted_metrics[metric] = weighted_sum / total_iterations

         return weighted_metrics

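To make the distinction from the review thread concrete: iterations is the per-task iteration count that comes from the workload, while the number of executions is how many aggregated test runs contributed a value. The following is a minimal standalone sketch of such a weighted average, not the project's code; the numbers are made up and only mirror the shapes asserted in the tests below.

    # Sketch only: `values` holds one number per test execution of the same task,
    # and `iterations` is the workload's per-task iteration count (not the run count).
    def weighted_average(values, iterations):
        num_executions = len(values)
        total_iterations = iterations * num_executions
        weighted_sum = sum(v * iterations for v in values)
        return weighted_sum / total_iterations

    # Two executions of a task configured for 5 iterations each:
    print(weighted_average([10, 20], iterations=5))  # 15.0, the "avg" asserted in the tests below

    # min and max are no longer averaged; the aggregate keeps the overall extremes:
    print(min([100, 95]), max([200, 205]))           # 95 205, i.e. overall_min and overall_max

In this sketch, when every execution uses the same iteration count the weighted average reduces to a plain mean; the weighting only matters if executions contribute different iteration counts.
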
osbenchmark/results_publisher.py (4 additions, 4 deletions)

@@ -464,16 +464,16 @@ def _write_results(self, metrics_table, metrics_table_console):
                     data_plain=metrics_table, data_rich=metrics_table_console)

     def _publish_throughput(self, baseline_stats, contender_stats, task):
-        b_min = baseline_stats.metrics(task)["throughput"]["min"]
+        b_min = baseline_stats.metrics(task)["throughput"].get("overall_min") or baseline_stats.metrics(task)["throughput"]["min"]
         b_mean = baseline_stats.metrics(task)["throughput"]["mean"]
         b_median = baseline_stats.metrics(task)["throughput"]["median"]
-        b_max = baseline_stats.metrics(task)["throughput"]["max"]
+        b_max = baseline_stats.metrics(task)["throughput"].get("overall_max") or baseline_stats.metrics(task)["throughput"]["max"]
         b_unit = baseline_stats.metrics(task)["throughput"]["unit"]

-        c_min = contender_stats.metrics(task)["throughput"]["min"]
+        c_min = contender_stats.metrics(task)["throughput"].get("overall_min") or contender_stats.metrics(task)["throughput"]["min"]
         c_mean = contender_stats.metrics(task)["throughput"]["mean"]
         c_median = contender_stats.metrics(task)["throughput"]["median"]
-        c_max = contender_stats.metrics(task)["throughput"]["max"]
+        c_max = contender_stats.metrics(task)["throughput"].get("overall_max") or contender_stats.metrics(task)["throughput"]["max"]

         return self._join(
             self._line("Min Throughput", b_min, c_min, task, b_unit, treat_increase_as_improvement=True),
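
The .get("overall_min") or ... pattern above means the comparison publisher prefers the aggregated overall values when a stats dictionary carries them and falls back to the per-run min/max otherwise. Below is a minimal sketch of that lookup, with made-up numbers that only mirror the dictionary shape used in the tests further down:

    # Sketch only: throughput stats as the publisher would see them.
    regular = {"min": 100, "max": 200, "mean": 150, "median": 160, "unit": "ops/s"}
    aggregated = {"overall_min": 95, "overall_max": 205, "min": 100, "max": 200,
                  "mean": 150, "median": 160, "unit": "ops/s"}

    def pick_min(throughput):
        # Prefer the aggregated overall_min; fall back to the single-run min.
        return throughput.get("overall_min") or throughput["min"]

    print(pick_min(regular))     # 100 (no overall_min key present)
    print(pick_min(aggregated))  # 95  (aggregated value wins)
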
tests/aggregator_test.py (78 additions, 35 deletions)

@@ -1,13 +1,12 @@
-from unittest.mock import patch, Mock
+from unittest.mock import Mock, patch, mock_open
 import pytest

 from osbenchmark import config
-from osbenchmark.aggregator import Aggregator
+from osbenchmark.aggregator import Aggregator, AggregatedResults

 @pytest.fixture
 def mock_config():
     mock_cfg = Mock(spec=config.Config)
-    mock_cfg.opts.side_effect = lambda *args: "/path/to/root" if args == ("node", "root.dir") else None
+    mock_cfg.opts.side_effect = lambda *args: "test_procedure_name" if args == ("workload", "test_procedure.name") else "/path/to/root"
     return mock_cfg

 @pytest.fixture
@@ -29,8 +28,8 @@ def mock_args():
 def mock_test_store():
     mock_store = Mock()
     mock_store.find_by_test_execution_id.side_effect = [
-        Mock(results={"key1": {"nested": 10}}),
-        Mock(results={"key1": {"nested": 20}})
+        Mock(results={"key1": {"nested": 10}}, workload="workload1", test_procedure="test_proc1"),
+        Mock(results={"key1": {"nested": 20}}, workload="workload1", test_procedure="test_proc1")
     ]
     return mock_store

@@ -40,28 +39,35 @@ def aggregator(mock_config, mock_test_executions, mock_args, mock_test_store):
     aggregator.test_store = mock_test_store
     return aggregator

-def test_iterations(aggregator, mock_args):
+def test_count_iterations_for_each_op(aggregator):
     mock_workload = Mock()
-    mock_schedule = [Mock(name="op1", iterations=5)]
-    mock_task = Mock(name="task1", schedule=mock_schedule)
-    mock_workload.test_procedures = [mock_task]
-
-    # Mock the config.opts call to return the same test_procedure.name
-    aggregator.config.opts.side_effect = lambda *args: mock_task.name if args == ("workload", "test_procedure.name") else None
-
+    mock_task = Mock(spec=['name', 'iterations'])
+    mock_task.name = "op1"
+    mock_task.iterations = 5
+    mock_schedule = [mock_task]
+    mock_test_procedure = Mock(spec=['name', 'schedule'])
+    mock_test_procedure.name = "test_procedure_name"
+    mock_test_procedure.schedule = mock_schedule
+    mock_workload.test_procedures = [mock_test_procedure]
+
+    # Update the config mock to return the correct test_procedure_name
+    aggregator.config.opts.side_effect = lambda *args: \
+        mock_test_procedure.name if args == ("workload", "test_procedure.name") else "/path/to/root"
     with patch('osbenchmark.workload.load_workload', return_value=mock_workload):
         aggregator.count_iterations_for_each_op()

-    assert list(aggregator.accumulated_iterations.values())[0] == 5
+    print(f"accumulated_iterations: {aggregator.accumulated_iterations}")  # Debug print
+    assert "op1" in aggregator.accumulated_iterations, "op1 not found in accumulated_iterations"
+    assert aggregator.accumulated_iterations["op1"] == 5

-def test_results(aggregator):
+def test_accumulate_results(aggregator):
     mock_test_execution = Mock()
     mock_test_execution.results = {
         "op_metrics": [
             {
                 "task": "task1",
                 "throughput": 100,
-                "latency": 10,
+                "latency": {"avg": 10, "unit": "ms"},
                 "service_time": 5,
                 "client_processing_time": 2,
                 "processing_time": 3,
@@ -74,9 +80,19 @@ def test_results(aggregator):
     aggregator.accumulate_results(mock_test_execution)

     assert "task1" in aggregator.accumulated_results
-    assert all(metric in aggregator.accumulated_results["task1"] for metric in
-               ["throughput", "latency", "service_time", "client_processing_time",
-                "processing_time", "error_rate", "duration"])
+    assert all(metric in aggregator.accumulated_results["task1"] for metric in aggregator.metrics)
+
+def test_test_execution_compatibility_check(aggregator):
+    mock_test_store = Mock()
+    mock_test_store.find_by_test_execution_id.side_effect = [
+        Mock(workload="workload1", test_procedure="test_proc1"),
+        Mock(workload="workload1", test_procedure="test_proc1"),
+        Mock(workload="workload1", test_procedure="test_proc1"),  # Add one more mock response
+    ]
+    aggregator.test_store = mock_test_store
+    aggregator.test_executions = {"test1": Mock(), "test2": Mock()}
+
+    assert aggregator.test_execution_compatibility_check()

 def test_aggregate_json_by_key(aggregator):
     result = aggregator.aggregate_json_by_key("key1.nested")
@@ -95,25 +111,52 @@ def test_calculate_weighted_average(aggregator):
assert result["latency"]["avg"] == 15
assert result["latency"]["unit"] == "ms"

def test_compatibility_check(aggregator):
mock_test_procedure = Mock(name="test_procedure")
mock_test_store = Mock()
mock_test_store.find_by_test_execution_id.side_effect = [
Mock(workload="workload1", test_procedure=mock_test_procedure),
Mock(workload="workload1", test_procedure=mock_test_procedure),
Mock(workload="workload1", test_procedure=mock_test_procedure)
]
aggregator.test_store = mock_test_store
assert aggregator.test_execution_compatibility_check()

def test_calculate_rsd(aggregator):
values = [1, 2, 3, 4, 5]
rsd = aggregator.calculate_rsd(values, "test_metric")
assert isinstance(rsd, float)

def test_compatibility_check_incompatible(aggregator):
def test_test_execution_compatibility_check_incompatible(aggregator):
mock_test_store = Mock()
mock_test_store.find_by_test_execution_id.side_effect = [
Mock(workload="workload1"),
Mock(workload="workload2"),
Mock(workload="workload1")
Mock(workload="workload1", test_procedure="test_proc1"),
Mock(workload="workload2", test_procedure="test_proc1"),
]
aggregator.test_store = mock_test_store
aggregator.test_executions = {"test1": Mock(), "test2": Mock()}
with pytest.raises(ValueError):
aggregator.test_execution_compatibility_check()

def test_aggregate(aggregator):
mock_aggregated_results = Mock(test_execution_id="mock_id", as_dict=lambda: {})

with patch.object(aggregator, 'test_execution_compatibility_check', return_value=True), \
patch.object(aggregator, 'count_iterations_for_each_op'), \
patch.object(aggregator, 'accumulate_results'), \
patch.object(aggregator, 'build_aggregated_results', return_value=mock_aggregated_results) as mock_build, \
patch('osbenchmark.aggregator.FileTestExecutionStore') as mock_store_class, \
patch('osbenchmark.utils.io.ensure_dir') as mock_ensure_dir, \
patch('builtins.open', mock_open()) as mock_file:

mock_store = mock_store_class.return_value
mock_store.store_aggregated_execution.side_effect = lambda x: print(f"Storing aggregated execution: {x}")

aggregator.aggregate()

print(f"mock_build called: {mock_build.called}")
print(f"mock_store.store_aggregated_execution called: {mock_store.store_aggregated_execution.called}")

assert mock_build.called, "build_aggregated_results was not called"
mock_store.store_aggregated_execution.assert_called_once_with(mock_aggregated_results)

print(f"ensure_dir called: {mock_ensure_dir.called}")
print(f"ensure_dir call args: {mock_ensure_dir.call_args_list}")
print(f"open called: {mock_file.called}")
print(f"open call args: {mock_file.call_args_list}")

assert mock_store.store_aggregated_execution.called, "store_aggregated_execution was not called"

def test_aggregated_results():
results = {"key": "value"}
agg_results = AggregatedResults(results)
assert agg_results.as_dict() == results
tests/results_publisher_test.py (57 additions, 1 deletion)

@@ -23,10 +23,11 @@
 # under the License.

 from unittest import TestCase
+from unittest.mock import Mock, patch

 from osbenchmark import results_publisher

-
+# pylint: disable=protected-access
 class FormatterTests(TestCase):
     def setUp(self):
         self.empty_header = ["Header"]
@@ -57,3 +58,58 @@ def test_formats_as_csv(self):
         formatted = results_publisher.format_as_csv(self.metrics_header, self.metrics_data)
         # 1 header line, no separation line + 3 data lines
         self.assertEqual(1 + 3, len(formatted.splitlines()))
+
+    @patch('osbenchmark.results_publisher.convert.to_bool')
+    def test_publish_throughput_handles_different_metrics(self, mock_to_bool):
+        config = Mock()
+
+        # Configure mock to return appropriate values for different calls
+        def config_opts_side_effect(*args, **kwargs):
+            if args[0] == "results_publishing":
+                if args[1] == "output.processingtime":
+                    return False
+                elif args[1] == "percentiles":
+                    return None
+            return Mock()
+
+        config.opts.side_effect = config_opts_side_effect
+
+        publisher = results_publisher.ComparisonResultsPublisher(config)
+
+        # Mock for regular test execution
+        regular_stats = Mock()
+        regular_stats.metrics.return_value = {
+            "throughput": {
+                "min": 100,
+                "max": 200,
+                "mean": 150,
+                "median": 160,
+                "unit": "ops/s"
+            }
+        }
+
+        # Mock for aggregated test execution
+        aggregated_stats = Mock()
+        aggregated_stats.metrics.return_value = {
+            "throughput": {
+                "overall_min": 95,
+                "overall_max": 205,
+                "min": 100,
+                "max": 200,
+                "mean": 150,
+                "median": 160,
+                "unit": "ops/s"
+            }
+        }
+
+        # Test with regular stats
+        result_regular = publisher._publish_throughput(regular_stats, regular_stats, "test_task")
+        self.assertEqual(len(result_regular), 4)
+        self.assertEqual(result_regular[0][2], 100)  # baseline min
+        self.assertEqual(result_regular[3][3], 200)  # contender max
+
+        # Test with aggregated stats
+        result_aggregated = publisher._publish_throughput(aggregated_stats, aggregated_stats, "test_task")
+        self.assertEqual(len(result_aggregated), 4)
+        self.assertEqual(result_aggregated[0][2], 95)  # baseline overall_min
+        self.assertEqual(result_aggregated[3][3], 205)  # contender overall_max