
refactor(performance-metrics): separate storage logic from tracker #14982

Merged 9 commits on Apr 26, 2024
refactor(performance-metrics): pull out storage logic into class
DerekMaggio committed Apr 24, 2024
commit 8e5aa72baea29932f1c162f2523089dae240f406
38 changes: 34 additions & 4 deletions api/tests/opentrons/cli/test_cli.py
@@ -5,12 +5,15 @@
import tempfile
import textwrap

-from dataclasses import dataclass
+from dataclasses import dataclass, replace
from typing import Any, Dict, Iterator, List, Optional
from pathlib import Path

import pytest
from click.testing import CliRunner
from opentrons_shared_data.performance.dev_types import (
    RobotContextState,
)
from opentrons.util.performance_helpers import _get_robot_context_tracker


@@ -24,6 +27,18 @@
from opentrons.cli.analyze import analyze # noqa: E402


@pytest.fixture
def override_data_store(tmp_path: Path) -> Iterator[None]:
    """Override the data store metadata for the RobotContextTracker."""
    old_store = context_tracker._store  # type: ignore[attr-defined]
    old_metadata = old_store.metadata
    new_metadata = replace(old_metadata, storage_dir=tmp_path)
    context_tracker._store = old_store.__class__(metadata=new_metadata)  # type: ignore[attr-defined]
    context_tracker._store.setup()  # type: ignore[attr-defined]
    yield
    context_tracker._store = old_store  # type: ignore[attr-defined]


def _list_fixtures(version: int) -> Iterator[Path]:
    return Path(__file__).parent.glob(
        f"../../../../shared-data/protocol/fixtures/{version}/*.json"
@@ -255,6 +270,7 @@ def test_python_error_line_numbers(
assert error["detail"] == expected_detail


@pytest.mark.usefixtures("override_data_store")
def test_track_analysis(tmp_path: Path) -> None:
    """Test that the RobotContextTracker tracks analysis."""
    protocol_source = textwrap.dedent(
@@ -265,12 +281,26 @@ def run(protocol):
            pass
        """
    )

    protocol_source_file = tmp_path / "protocol.py"
    protocol_source_file.write_text(protocol_source, encoding="utf-8")
    store = context_tracker._store  # type: ignore[attr-defined]

-    before_analysis = len(context_tracker._storage)  # type: ignore[attr-defined]
+    num_storage_entities_before_analysis = len(store.data)

    _get_analysis_result([protocol_source_file])

-    assert len(context_tracker._storage) == before_analysis + 1  # type: ignore[attr-defined]
+    assert len(store.data) == num_storage_entities_before_analysis + 1

    with open(store.metadata.data_file_location, "r") as f:
        stored_data = f.readlines()
    assert len(stored_data) == 0

    context_tracker.store()

    with open(store.metadata.data_file_location, "r") as f:
        stored_data = f.readlines()
    assert len(stored_data) == 1
    state_id, start_time, duration = stored_data[0].strip().split(",")
    assert state_id == str(RobotContextState.ANALYZING_PROTOCOL.state_id)
    assert start_time.isdigit()
    assert duration.isdigit()
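
Note on the fixture above: it relies on dataclasses.replace to clone the frozen metadata with only storage_dir changed, then swaps a fresh store in and restores the old one on teardown. A minimal, standalone sketch of that clone-and-swap pattern, using a pared-down stand-in for the real StorageMetadata (which is added later in this diff):

# Sketch of the fixture's clone-and-swap pattern; this StorageMetadata is a
# simplified stand-in, not the full dataclass defined in dev_types.py below.
from dataclasses import dataclass, replace
from pathlib import Path


@dataclass(frozen=True)
class StorageMetadata:
    name: str
    storage_dir: Path


old = StorageMetadata(name="robot_context_data", storage_dir=Path("/data"))
new = replace(old, storage_dir=Path("/tmp/test-run"))  # only storage_dir changes

assert new.name == old.name
assert new.storage_dir == Path("/tmp/test-run")
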
33 changes: 33 additions & 0 deletions performance-metrics/src/performance_metrics/data_store.py
@@ -0,0 +1,33 @@
"""Interface for storing performance metrics data to a CSV file."""

import csv
import typing
from opentrons_shared_data.performance.dev_types import StorageMetadata
from performance_metrics.datashapes import SupportsCSVStorage

T = typing.TypeVar("T", bound=SupportsCSVStorage)


class MetricsStore(typing.Generic[T]):
"""Dataclass to store data for tracking robot context."""

def __init__(self, metadata: StorageMetadata) -> None:
"""Initialize the storage engine."""
self.metadata = metadata
self.data: typing.List[T] = []

def setup(self) -> None:
"""Set up the data store."""
self.metadata.storage_dir.mkdir(parents=True, exist_ok=True)
self.metadata.data_file_location.touch(exist_ok=True)
self.metadata.headers_file_location.touch(exist_ok=True)
self.metadata.headers_file_location.write_text(",".join(self.metadata.headers))

def store(self) -> None:
"""Clear the stored data and write it to the storage file."""
stored_data = self.data.copy()
self.data.clear()
rows_to_write = [context_data.csv_row() for context_data in stored_data]
with open(self.metadata.data_file_location, "a") as storage_file:
writer = csv.writer(storage_file)
writer.writerows(rows_to_write)
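
As a usage sketch: MetricsStore needs only a StorageMetadata plus items implementing SupportsCSVStorage; setup() creates the files and writes the headers sidecar, store() drains data and appends CSV rows. The temporary directory below is illustrative:

# Hypothetical usage of MetricsStore; the /tmp path is illustrative only.
from pathlib import Path

from opentrons_shared_data.performance.dev_types import (
    RobotContextState,
    StorageMetadata,
)
from performance_metrics.data_store import MetricsStore
from performance_metrics.datashapes import RawContextData

store = MetricsStore[RawContextData](
    StorageMetadata(
        name="robot_context_data",
        storage_dir=Path("/tmp/performance-metrics"),
        storage_format="csv",
        headers=RawContextData.headers(),
    )
)
store.setup()  # creates storage_dir, the data file, and the *_headers file

store.data.append(
    RawContextData(state=RobotContextState.ANALYZING_PROTOCOL, func_start=1, duration=2)
)
store.store()  # appends one CSV row, then clears the in-memory buffer
assert store.data == []
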
42 changes: 35 additions & 7 deletions performance-metrics/src/performance_metrics/datashapes.py
@@ -1,12 +1,32 @@
"""Defines data classes and enums used in the performance metrics module."""
"""Defines the shape of stored data."""

import dataclasses
-from typing import Tuple
+from typing import Sequence, Tuple, Protocol, Union
from opentrons_shared_data.performance.dev_types import RobotContextState

StorableData = Union[int, float, str]


class SupportsCSVStorage(Protocol):
    """A protocol for classes that support CSV storage."""

    @classmethod
    def headers(cls) -> Tuple[str, ...]:
        """Returns the headers for the CSV data."""
        ...

    def csv_row(self) -> Tuple[StorableData, ...]:
        """Returns the object as a CSV row."""
        ...

    @classmethod
    def from_csv_row(cls, row: Tuple[StorableData, ...]) -> "SupportsCSVStorage":
        """Returns an object from a CSV row."""
        ...


@dataclasses.dataclass(frozen=True)
-class RawContextData:
+class RawContextData(SupportsCSVStorage):
    """Represents raw duration data with context state information.

    Attributes:
@@ -16,10 +36,9 @@ class RawContextData:
    - state (RobotContextStates): The current state of the context.
    """

-    func_start: int
-    duration_start: int
-    duration_end: int
    state: RobotContextState
+    func_start: int
+    duration: int

    @classmethod
    def headers(cls) -> Tuple[str, str, str]:
@@ -31,5 +50,14 @@ def csv_row(self) -> Tuple[int, int, int]:
        return (
            self.state.state_id,
            self.func_start,
-            self.duration_end - self.duration_start,
+            self.duration,
        )

    @classmethod
    def from_csv_row(cls, row: Sequence[StorableData]) -> SupportsCSVStorage:
        """Returns a RawContextData object from a CSV row."""
        return cls(
            state=RobotContextState.from_id(int(row[0])),
            func_start=int(row[1]),
            duration=int(row[2]),
        )
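
Because csv_row() emits (state_id, func_start, duration) and from_csv_row() parses the same positions back through RobotContextState.from_id, a row should round-trip; a quick sketch of that property:

# Round-trip sketch: csv_row() and from_csv_row() are inverses for
# RawContextData (a frozen dataclass, so == compares field values).
from opentrons_shared_data.performance.dev_types import RobotContextState
from performance_metrics.datashapes import RawContextData

original = RawContextData(
    state=RobotContextState.ANALYZING_PROTOCOL,
    func_start=1_000_000_000,  # perf_counter_ns-style timestamp
    duration=42_000,
)

row = original.csv_row()  # (state_id, func_start, duration)
restored = RawContextData.from_csv_row(row)

assert restored == original
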
performance-metrics/src/performance_metrics/robot_context_tracker.py
@@ -1,23 +1,22 @@
"""Module for tracking robot context and execution duration for different operations."""

import csv
from pathlib import Path
import platform

from functools import wraps, partial
from time import perf_counter_ns
import os
from typing import Callable, TypeVar, cast


from typing_extensions import ParamSpec
from collections import deque
from performance_metrics.datashapes import (
RawContextData,
)
from performance_metrics.data_store import MetricsStore
from opentrons_shared_data.performance.dev_types import (
RobotContextState,
SupportsTracking,
StorageMetadata,
)

P = ParamSpec("P")
@@ -47,14 +46,23 @@ def _get_timing_function() -> Callable[[], int]:
class RobotContextTracker(SupportsTracking):
    """Tracks and stores robot context and execution duration for different operations."""

-    FILE_NAME = "context_data.csv"
+    METADATA_NAME = "robot_context_data"

    def __init__(self, storage_location: Path, should_track: bool = False) -> None:
        """Initializes the RobotContextTracker with an empty storage list."""
-        self._storage: deque[RawContextData] = deque()
-        self._storage_file_path = storage_location / self.FILE_NAME
+        self._store = MetricsStore[RawContextData](
+            StorageMetadata(
+                name=self.METADATA_NAME,
+                storage_dir=storage_location,
+                storage_format="csv",
+                headers=RawContextData.headers(),
+            )
+        )
        self._should_track = should_track

+        if self._should_track:
+            self._store.setup()

    def track(self, state: RobotContextState) -> Callable:  # type: ignore
        """Decorator factory for tracking the execution duration and state of robot operations.
@@ -77,14 +85,14 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
                result = func(*args, **kwargs)
            finally:
                duration_end_time = perf_counter_ns()
-                self._storage.append(
+                self._store.data.append(
                    RawContextData(
-                        function_start_time,
-                        duration_start_time,
-                        duration_end_time,
-                        state,
+                        func_start=function_start_time,
+                        duration=duration_end_time - duration_start_time,
+                        state=state,
                    )
                )

            return result

        return wrapper
@@ -93,11 +101,6 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:

    def store(self) -> None:
        """Store the tracked data, if tracking is enabled."""
-        stored_data = self._storage.copy()
-        self._storage.clear()
-        rows_to_write = [context_data.csv_row() for context_data in stored_data]
-        os.makedirs(self._storage_file_path.parent, exist_ok=True)
-        with open(self._storage_file_path, "a") as storage_file:
-            writer = csv.writer(storage_file)
-            writer.writerow(RawContextData.headers())
-            writer.writerows(rows_to_write)
+        if not self._should_track:
+            return
+        self._store.store()
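
End to end, the tracker times the wrapped call with perf_counter_ns, appends a RawContextData to self._store.data, and store() persists only when tracking is enabled. A hedged usage sketch, mirroring the tests below; the /tmp path is illustrative:

# Hypothetical end-to-end use of the refactored tracker.
from pathlib import Path

from opentrons_shared_data.performance.dev_types import RobotContextState
from performance_metrics.robot_context_tracker import RobotContextTracker

tracker = RobotContextTracker(Path("/tmp/performance-metrics"), should_track=True)


@tracker.track(RobotContextState.RUNNING_PROTOCOL)
def run_protocol() -> None:
    pass  # the wrapper records func_start and duration even if this raises


run_protocol()
assert len(tracker._store.data) == 1  # buffered in memory until store()

tracker.store()  # no-op when should_track=False; otherwise appends CSV rows
assert len(tracker._store.data) == 0
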
performance-metrics/tests/performance_metrics/test_robot_context_tracker.py
@@ -2,6 +2,7 @@

import asyncio
from pathlib import Path
+from performance_metrics.datashapes import RawContextData
import pytest
from performance_metrics.robot_context_tracker import RobotContextTracker
from opentrons_shared_data.performance.dev_types import RobotContextState
@@ -47,7 +48,7 @@ def shutting_down_robot() -> None:

    # Ensure storage is initially empty
    assert (
-        len(robot_context_tracker._storage) == 0
+        len(robot_context_tracker._store.data) == 0
    ), "Storage should be initially empty."

starting_robot()
@@ -57,7 +58,7 @@ def shutting_down_robot() -> None:
shutting_down_robot()

    # Verify that all states were tracked
-    assert len(robot_context_tracker._storage) == 5, "All states should be tracked."
+    assert len(robot_context_tracker._store.data) == 5, "All states should be tracked."

# Validate the sequence and accuracy of tracked states
expected_states = [
@@ -69,7 +70,9 @@ def shutting_down_robot() -> None:
]
    for i, state in enumerate(expected_states):
        assert (
-            RobotContextState.from_id(robot_context_tracker._storage[i].state.state_id)
+            RobotContextState.from_id(
+                robot_context_tracker._store.data[i].state.state_id
+            )
            == state
        ), f"State at index {i} should be {state}."

@@ -91,11 +94,11 @@ def second_operation() -> None:
second_operation()

    assert (
-        len(robot_context_tracker._storage) == 2
+        len(robot_context_tracker._store.data) == 2
    ), "Both operations should be tracked."
    assert (
-        robot_context_tracker._storage[0].state
-        == robot_context_tracker._storage[1].state
+        robot_context_tracker._store.data[0].state
+        == robot_context_tracker._store.data[1].state
        == RobotContextState.RUNNING_PROTOCOL
    ), "Both operations should have the same state."

@@ -114,10 +117,10 @@ def error_prone_operation() -> None:
error_prone_operation()

    assert (
-        len(robot_context_tracker._storage) == 1
+        len(robot_context_tracker._store.data) == 1
    ), "Failed operation should still be tracked."
    assert (
-        robot_context_tracker._storage[0].state == RobotContextState.SHUTTING_DOWN
+        robot_context_tracker._store.data[0].state == RobotContextState.SHUTTING_DOWN
    ), "State should be correctly logged despite the exception."


@@ -134,10 +137,11 @@ async def async_analyzing_operation() -> None:
await async_analyzing_operation()

    assert (
-        len(robot_context_tracker._storage) == 1
+        len(robot_context_tracker._store.data) == 1
    ), "Async operation should be tracked."
    assert (
-        robot_context_tracker._storage[0].state == RobotContextState.ANALYZING_PROTOCOL
+        robot_context_tracker._store.data[0].state
+        == RobotContextState.ANALYZING_PROTOCOL
    ), "State should be ANALYZING_PROTOCOL."


@@ -152,10 +156,9 @@ def running_operation() -> None:

running_operation()

-    duration_data = robot_context_tracker._storage[0]
-    measured_duration = duration_data.duration_end - duration_data.duration_start
+    duration_data = robot_context_tracker._store.data[0]
    assert (
-        abs(measured_duration - RUNNING_TIME * 1e9) < 1e7
+        abs(duration_data.duration - RUNNING_TIME * 1e9) < 1e7
    ), "Measured duration for sync operation should closely match the expected duration."


@@ -171,10 +174,9 @@ async def async_running_operation() -> None:

await async_running_operation()

-    duration_data = robot_context_tracker._storage[0]
-    measured_duration = duration_data.duration_end - duration_data.duration_start
+    duration_data = robot_context_tracker._store.data[0]
    assert (
-        abs(measured_duration - RUNNING_TIME * 1e9) < 1e7
+        abs(duration_data.duration - RUNNING_TIME * 1e9) < 1e7
    ), "Measured duration for async operation should closely match the expected duration."


@@ -193,10 +195,10 @@ async def async_error_prone_operation() -> None:
await async_error_prone_operation()

    assert (
-        len(robot_context_tracker._storage) == 1
+        len(robot_context_tracker._store.data) == 1
    ), "Failed async operation should still be tracked."
    assert (
-        robot_context_tracker._storage[0].state == RobotContextState.SHUTTING_DOWN
+        robot_context_tracker._store.data[0].state == RobotContextState.SHUTTING_DOWN
    ), "State should be SHUTTING_DOWN despite the exception."


@@ -217,11 +219,11 @@ async def second_async_calibrating() -> None:
await asyncio.gather(first_async_calibrating(), second_async_calibrating())

    assert (
-        len(robot_context_tracker._storage) == 2
+        len(robot_context_tracker._store.data) == 2
    ), "Both concurrent async operations should be tracked."
    assert all(
        data.state == RobotContextState.CALIBRATING
-        for data in robot_context_tracker._storage
+        for data in robot_context_tracker._store.data
    ), "All tracked operations should be in CALIBRATING state."


@@ -236,7 +238,7 @@ def operation_without_tracking() -> None:
operation_without_tracking()

    assert (
-        len(robot_context_tracker._storage) == 0
+        len(robot_context_tracker._store.data) == 0
    ), "Operation should not be tracked when tracking is disabled."


@@ -262,11 +264,19 @@ def analyzing_protocol() -> None:

    robot_context_tracker.store()

-    with open(robot_context_tracker._storage_file_path, "r") as file:
+    with open(robot_context_tracker._store.metadata.data_file_location, "r") as file:
        lines = file.readlines()
-    assert (
-        len(lines) == 4
-    ), "All stored data + header should be written to the file."
+    assert len(lines) == 3, "All stored data should be written to the file."
+
+    split_lines: list[list[str]] = [line.strip().split(",") for line in lines]
+    assert all(
+        RawContextData.from_csv_row(line) for line in split_lines
+    ), "All lines should be valid RawContextData instances."
+
+    with open(robot_context_tracker._store.metadata.headers_file_location, "r") as file:
+        headers = file.readlines()
+    assert len(headers) == 1, "Header should be written to the headers file."
+    assert tuple(headers[0].strip().split(",")) == RawContextData.headers()


@patch(
@@ -289,17 +299,11 @@ def calibrating_robot() -> None:
    starting_robot()
    calibrating_robot()

-    storage = robot_context_tracker._storage
+    storage = robot_context_tracker._store.data
    assert all(
        data.func_start > 0 for data in storage
    ), "All function start times should be greater than 0."
    assert all(
-        data.duration_start > 0 for data in storage
-    ), "All duration start times should be greater than 0."
-    assert all(
-        data.duration_end > 0 for data in storage
-    ), "All duration end times should be greater than 0."
-    assert all(
-        data.duration_end > data.duration_start for data in storage
-    ), "Duration end times should be greater than duration start times."
+        data.duration > 0 for data in storage
+    ), "All duration times should be greater than 0."
    assert len(storage) == 2, "Both operations should be tracked."
shared-data/python/opentrons_shared_data/performance/dev_types.py
@@ -1,6 +1,6 @@
"""Type definitions for performance tracking."""

from typing import Protocol, TypeVar, Callable, Any
from dataclasses import dataclass
from typing import Literal, Protocol, Tuple, TypeVar, Callable, Any
from pathlib import Path
from enum import Enum

@@ -54,3 +54,25 @@ def from_id(cls, state_id: int) -> "RobotContextState":
            if state.state_id == state_id:
                return state
        raise ValueError(f"Invalid state id: {state_id}")


@dataclass(frozen=True)
class StorageMetadata:
    """Dataclass to store information about the storage."""

    name: str
    storage_dir: Path
    storage_format: Literal["csv"]
    headers: Tuple[str, ...]

    @property
    def data_file_location(self) -> Path:
        """The location of the data file."""
        return self.storage_dir / self.name

    @property
    def headers_file_location(self) -> Path:
        """The location of the header file."""
        return self.data_file_location.with_stem(
            self.data_file_location.stem + "_headers"
        )
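
For the two derived paths: data_file_location is just storage_dir / name, and headers_file_location appends "_headers" to the stem via Path.with_stem (available since Python 3.9). A concrete sketch; the directory and the header names are illustrative, since the actual headers() values are not shown in this diff:

# Sketch of the derived file locations; the storage_dir and headers tuple
# below are illustrative assumptions, not values taken from the PR.
from pathlib import Path

from opentrons_shared_data.performance.dev_types import StorageMetadata

meta = StorageMetadata(
    name="robot_context_data",
    storage_dir=Path("/data/performance_metrics_data"),
    storage_format="csv",
    headers=("state_id", "func_start", "duration"),
)

assert meta.data_file_location == Path(
    "/data/performance_metrics_data/robot_context_data"
)
assert meta.headers_file_location == Path(
    "/data/performance_metrics_data/robot_context_data_headers"
)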