Skip to content

Commit

Permalink
Only collect DiskIO data for the disk used for benchmarking
Browse files Browse the repository at this point in the history
  • Loading branch information
JackKelly committed Oct 18, 2023
1 parent 4d8253f commit 36f9620
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 14 deletions.
1 change: 1 addition & 0 deletions src/perfcapture/metrics.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pathlib
from dataclasses import dataclass


Expand Down
67 changes: 54 additions & 13 deletions src/perfcapture/performance_counters.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import abc
import pathlib
from collections import namedtuple
from dataclasses import dataclass, field
from dataclasses import InitVar, dataclass, field
from datetime import datetime

import numpy as np
Expand All @@ -18,13 +19,15 @@ class _PerfCounterABC(abc.ABC):
Usage:
1. Init PerfCounter subclass when we start benchmarking a specific
combination of <Workload> and <Dataset>.
2. Call `start_timing_iteration()` at the start of each iteration.
3. Call `stop_timing_iteration()` at the end of each iteration.
4. Call `get_results()` at the end of the `run`, to get a `pd.DataFrame` of results.
2. Set `PerfCounter.dataset_path` if this perf counter needs to know the dataset path.
3. Call `start_timing_iteration()` at the start of each iteration.
4. Call `stop_timing_iteration()` at the end of each iteration.
5. Call `get_results()` at the end of the `run`, to get a `pd.DataFrame` of results.
"""
def __init__(self) -> None:
self._data_per_run = pd.DataFrame(columns=[self.name])
self._data_per_run.index.name = "run_ID"
self._dataset_path: pathlib.Path | None = None

def start_timing_run(self) -> None:
pass
Expand All @@ -41,16 +44,26 @@ def get_results(self) -> pd.DataFrame:
def name(self) -> str:
return self.__class__.__name__

@property
def dataset_path(self) -> pathlib.Path:
return self._dataset_path

@dataset_path.setter
def dataset_path(self, dataset_path: pathlib.Path) -> None:
self._dataset_path = dataset_path

@dataclass
class PerfCounterManager:
"""Simple manager for multiple performance counters."""
dataset_path: InitVar[pathlib.Path]
counters: list[_PerfCounterABC] = field(
default_factory=lambda: [Runtime(), BandwidthToNumpy(), DiskIO()]
)

def __post_init__(self) -> None:
def __post_init__(self, dataset_path: pathlib.Path) -> None:
self._run_id: int = 0
for counter in self.counters:
counter.dataset_path = dataset_path

def start_timing_run(self) -> None:
self._run_id += 1
Expand Down Expand Up @@ -100,9 +113,9 @@ def name(self) -> str:
class DiskIO(_PerfCounterABC):
"""Record performance of disks.
Note that this records performance of all disks on this machine,
used by all processes. So, if other processes are using any disk
(even a different disk to the disk that you're benchmarking) then
Note that this records performance of the specific disk used
to store the benchmark datasets, but we record the activity of
all processes. So, if other processes are using this disk then
you'll get misleading results!
For more information on the fields recorded, please see:
Expand All @@ -125,7 +138,17 @@ def __init__(self) -> None:
("read_bytes", "write_bytes", "read_time", "write_time", "busy_time"))
self._data_per_run = pd.DataFrame(dtype=np.int64, columns=columns)
self._data_per_run.index.name = "run_ID"


@property
def dataset_path(self) -> pathlib.Path:
return super().dataset_path()

@dataset_path.setter
def dataset_path(self, dataset_path: pathlib.Path) -> None:
self._dataset_path = dataset_path
self._dataset_partition_name = _get_partition_name_from_path(dataset_path)
print("dataset_partition_name =", self._dataset_partition_name)

def start_timing_run(self) -> None:
self._disk_counters_at_start_of_run = self._get_disk_io_counters_as_series()

Expand Down Expand Up @@ -162,9 +185,9 @@ def stop_timing_run(self, metrics_for_run: MetricsForRun) -> None:

self._data_per_run.loc[metrics_for_run.run_id] = count_diff

@classmethod
def _get_disk_io_counters_as_series(cls) -> pd.Series:
counters: namedtuple = psutil.disk_io_counters()
def _get_disk_io_counters_as_series(self) -> pd.Series:
counters: dict[str, namedtuple] = psutil.disk_io_counters(perdisk=True)
counters: namedtuple = counters[self._dataset_partition_name]
return pd.Series(counters._asdict())

@property
Expand All @@ -181,4 +204,22 @@ def __init__(self) -> None:

def total_secs_elapsed(self) -> float:
duration = datetime.now() - self._time_at_start
return duration.total_seconds()
return duration.total_seconds()


def _get_partition_name_from_path(dataset_path: pathlib.Path) -> str:
dataset_mount_point = _get_mount_point_from_path(dataset_path)
partitions: list[psutil._common.sdiskpart] = psutil.disk_partitions()
for partition in partitions:
if pathlib.Path(partition.mountpoint) == dataset_mount_point:
return pathlib.Path(partition.device).parts[-1]
raise RuntimeError(f"Could not find partition for {dataset_path}")


def _get_mount_point_from_path(p: pathlib.Path) -> pathlib.Path:
if len(p.parts) == 0:
raise RuntimeError(f"Path '{p}' should not be empty!")
elif p.is_mount():
return p
else:
return _get_mount_point_from_path(p.parent)
2 changes: 1 addition & 1 deletion src/perfcapture/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def run_workloads(
for workload in workloads:
for dataset in workload.datasets:
print(f"Running {workload.name} {workload.n_runs} times on {dataset.name}!")
perf_counter = PerfCounterManager()
perf_counter = PerfCounterManager(dataset.path)
for i in range(workload.n_runs):
print(f"Run {i+1} of {workload.n_runs}...")
if not keep_cache:
Expand Down

0 comments on commit 36f9620

Please sign in to comment.