Skip to content

Commit

Permalink
chore(api, performance-metrics): clean up performance-metrics tracking (
Browse files Browse the repository at this point in the history
  • Loading branch information
DerekMaggio authored Jun 5, 2024
1 parent 42633bd commit dfbde9f
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 141 deletions.
2 changes: 0 additions & 2 deletions api/src/opentrons/cli/analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
)

from opentrons_shared_data.robot.dev_types import RobotType
from opentrons.util.performance_helpers import track_analysis

OutputKind = Literal["json", "human-json"]

Expand Down Expand Up @@ -198,7 +197,6 @@ def _get_return_code(analysis: RunResult) -> int:
return 0


@track_analysis
async def _do_analyze(protocol_source: ProtocolSource) -> RunResult:

runner = await create_simulating_runner(
Expand Down
52 changes: 38 additions & 14 deletions api/src/opentrons/util/performance_helpers.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
"""Performance helpers for tracking robot context."""

import functools
from pathlib import Path
from opentrons_shared_data.performance.dev_types import (
SupportsTracking,
F,
UnderlyingFunction,
UnderlyingFunctionParameters,
UnderlyingFunctionReturn,
RobotContextState,
)
from opentrons_shared_data.robot.dev_types import RobotTypeEnum
from typing import Callable, Type
import typing
from opentrons.config import (
feature_flags as ff,
get_performance_metrics_data_dir,
robot_configs,
feature_flags as ff,
)


Expand All @@ -27,11 +30,14 @@ def __init__(self, storage_location: Path, should_track: bool) -> None:
"""Initialize the stubbed tracker."""
pass

def track(self, state: RobotContextState) -> Callable[[F], F]:
"""Return the function unchanged."""
def track(
self,
state: "RobotContextState",
) -> typing.Callable[[UnderlyingFunction], UnderlyingFunction]:
"""Return the original function."""

def inner_decorator(func: F) -> F:
"""Return the function unchanged."""
def inner_decorator(func: UnderlyingFunction) -> UnderlyingFunction:
"""Return the original function."""
return func

return inner_decorator
Expand All @@ -41,7 +47,7 @@ def store(self) -> None:
pass


def _handle_package_import() -> Type[SupportsTracking]:
def _handle_package_import() -> typing.Type[SupportsTracking]:
"""Handle the import of the performance_metrics package.
If the package is not available, return a stubbed tracker.
Expand All @@ -58,19 +64,37 @@ def _handle_package_import() -> Type[SupportsTracking]:
_robot_context_tracker: SupportsTracking | None = None


# TODO: derek maggio (06-03-2024): investigate if _should_track should be
# reevaluated each time _get_robot_context_tracker is called. I think this
# might get stuck in a state where after the first call, _should_track is
# always considered the initial value. It might miss changes to the feature
# flag. The easiest way to test this is on a robot when that is working.


def _get_robot_context_tracker() -> SupportsTracking:
"""Singleton for the robot context tracker."""
global _robot_context_tracker
if _robot_context_tracker is None:
# TODO: replace with path lookup and should_store lookup
_robot_context_tracker = package_to_use(
get_performance_metrics_data_dir(), _should_track
)
return _robot_context_tracker


def track_analysis(func: F) -> F:
"""Track the analysis of a protocol."""
return _get_robot_context_tracker().track(RobotContextState.ANALYZING_PROTOCOL)(
func
)
def track_analysis(func: UnderlyingFunction) -> UnderlyingFunction:
"""Track the analysis of a protocol and store each run."""
# TODO: derek maggio (06-03-2024): generalize creating wrapper functions for tracking different states
tracker: SupportsTracking = _get_robot_context_tracker()
wrapped = tracker.track(state=RobotContextState.ANALYZING_PROTOCOL)(func)

@functools.wraps(func)
def wrapper(
*args: UnderlyingFunctionParameters.args,
**kwargs: UnderlyingFunctionParameters.kwargs
) -> UnderlyingFunctionReturn:
try:
return wrapped(*args, **kwargs)
finally:
tracker.store()

return wrapper
67 changes: 2 additions & 65 deletions api/tests/opentrons/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,13 @@
import tempfile
import textwrap

from dataclasses import dataclass, replace
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List, Optional
from pathlib import Path

import pytest
from click.testing import CliRunner
from opentrons_shared_data.performance.dev_types import (
RobotContextState,
)
from opentrons.util.performance_helpers import _get_robot_context_tracker


# Enable tracking for the RobotContextTracker
# This must come before the import of the analyze CLI
context_tracker = _get_robot_context_tracker()

# Ignore the type error for the next line, as we're setting a private attribute for testing purposes
context_tracker._should_track = True # type: ignore[attr-defined]

from opentrons.cli.analyze import analyze # noqa: E402


@pytest.fixture
def override_data_store(tmp_path: Path) -> Iterator[None]:
"""Override the data store metadata for the RobotContextTracker."""
old_store = context_tracker._store # type: ignore[attr-defined]
old_metadata = old_store.metadata
new_metadata = replace(old_metadata, storage_dir=tmp_path)
context_tracker._store = old_store.__class__(metadata=new_metadata) # type: ignore[attr-defined]
context_tracker._store.setup() # type: ignore[attr-defined]
yield
context_tracker._store = old_store # type: ignore[attr-defined]
from opentrons.cli.analyze import analyze


def _list_fixtures(version: int) -> Iterator[Path]:
Expand Down Expand Up @@ -285,41 +260,3 @@ def test_python_error_line_numbers(
assert result.json_output is not None
[error] = result.json_output["errors"]
assert error["detail"] == expected_detail


@pytest.mark.usefixtures("override_data_store")
@pytest.mark.parametrize("output", ["--json-output", "--human-json-output"])
def test_track_analysis(tmp_path: Path, output: str) -> None:
"""Test that the RobotContextTracker tracks analysis."""
protocol_source = textwrap.dedent(
"""
requirements = {"apiLevel": "2.15"}
def run(protocol):
pass
"""
)
protocol_source_file = tmp_path / "protocol.py"
protocol_source_file.write_text(protocol_source, encoding="utf-8")
store = context_tracker._store # type: ignore[attr-defined]

num_storage_entities_before_analysis = len(store._data)

_get_analysis_result([protocol_source_file], output)

assert len(store._data) == num_storage_entities_before_analysis + 1

with open(store.metadata.data_file_location, "r") as f:
stored_data = f.readlines()
assert len(stored_data) == 0

context_tracker.store()

with open(store.metadata.data_file_location, "r") as f:
stored_data = f.readlines()
stored_data = [line.strip() for line in stored_data if line.strip()]
assert len(stored_data) == 1
state_id, start_time, duration = stored_data[0].strip().split(",")
assert state_id == str(RobotContextState.ANALYZING_PROTOCOL.state_id)
assert start_time.isdigit()
assert duration.isdigit()
120 changes: 84 additions & 36 deletions performance-metrics/src/performance_metrics/robot_context_tracker.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
"""Module for tracking robot context and execution duration for different operations."""

import inspect
from pathlib import Path
import platform

from functools import wraps, partial
from functools import partial, wraps
from time import perf_counter_ns
from typing import Callable, TypeVar, cast, Literal, Final
import typing


from typing_extensions import ParamSpec
from performance_metrics.datashapes import (
RawContextData,
)
Expand All @@ -17,20 +16,20 @@
RobotContextState,
SupportsTracking,
MetricsMetadata,
UnderlyingFunction,
UnderlyingFunctionReturn,
UnderlyingFunctionParameters,
)

P = ParamSpec("P")
R = TypeVar("R")


def _get_timing_function() -> Callable[[], int]:
def _get_timing_function() -> typing.Callable[[], int]:
"""Returns a timing function for the current platform."""
time_function: Callable[[], int]
time_function: typing.Callable[[], int]
if platform.system() == "Linux":
from time import clock_gettime_ns, CLOCK_REALTIME

time_function = cast(
Callable[[], int], partial(clock_gettime_ns, CLOCK_REALTIME)
time_function = typing.cast(
typing.Callable[[], int], partial(clock_gettime_ns, CLOCK_REALTIME)
)
else:
from time import time_ns
Expand All @@ -46,9 +45,11 @@ def _get_timing_function() -> Callable[[], int]:
class RobotContextTracker(SupportsTracking):
"""Tracks and stores robot context and execution duration for different operations."""

METADATA_NAME: Final[Literal["robot_context_data"]] = "robot_context_data"
METADATA_NAME: typing.Final[
typing.Literal["robot_context_data"]
] = "robot_context_data"

def __init__(self, storage_location: Path, should_track: bool = False) -> None:
def __init__(self, storage_location: Path, should_track: bool) -> None:
"""Initializes the RobotContextTracker with an empty storage list."""
self._store = MetricsStore[RawContextData](
MetricsMetadata(
Expand All @@ -62,39 +63,86 @@ def __init__(self, storage_location: Path, should_track: bool = False) -> None:
if self._should_track:
self._store.setup()

def track(self, state: RobotContextState) -> Callable: # type: ignore
"""Decorator factory for tracking the execution duration and state of robot operations.
def track(
self,
state: RobotContextState,
) -> typing.Callable[[UnderlyingFunction], UnderlyingFunction]:
"""Tracks the given function and its execution duration.
If tracking is disabled, the function is called immediately and its result is returned.
Args:
state: The state to track for the decorated function.
func_to_track: The function to track.
state: The state of the robot context during the function execution.
*args: The arguments to pass to the function.
**kwargs: The keyword arguments to pass to the function.
Returns:
Callable: A decorator that wraps a function to track its execution duration and state.
If the function executes successfully, its return value is returned.
If the function raises an exception, the exception the function raised is raised.
"""

def inner_decorator(func: Callable[P, R]) -> Callable[P, R]:
def inner_decorator(func_to_track: UnderlyingFunction) -> UnderlyingFunction:
if not self._should_track:
return func

@wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
function_start_time = timing_function()
duration_start_time = perf_counter_ns()
try:
result = func(*args, **kwargs)
finally:
duration_end_time = perf_counter_ns()
self._store.add(
RawContextData(
func_start=function_start_time,
duration=duration_end_time - duration_start_time,
state=state,
return func_to_track

if inspect.iscoroutinefunction(func_to_track):

@wraps(func_to_track)
async def async_wrapper(
*args: UnderlyingFunctionParameters.args,
**kwargs: UnderlyingFunctionParameters.kwargs
) -> UnderlyingFunctionReturn:
function_start_time = timing_function()
duration_start_time = perf_counter_ns()

try:
result = typing.cast(
UnderlyingFunctionReturn,
await func_to_track(*args, **kwargs),
)
finally:
duration_end_time = perf_counter_ns()

self._store.add(
RawContextData(
func_start=function_start_time,
duration=duration_end_time - duration_start_time,
state=state,
)
)

return result

return async_wrapper
else:

@wraps(func_to_track)
def wrapper(
*args: UnderlyingFunctionParameters.args,
**kwargs: UnderlyingFunctionParameters.kwargs
) -> UnderlyingFunctionReturn:
function_start_time = timing_function()
duration_start_time = perf_counter_ns()

try:
result = typing.cast(
UnderlyingFunctionReturn, func_to_track(*args, **kwargs)
)
finally:
duration_end_time = perf_counter_ns()

self._store.add(
RawContextData(
func_start=function_start_time,
duration=duration_end_time - duration_start_time,
state=state,
)
)
)

return result
return result

return wrapper
return wrapper

return inner_decorator

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@ async def test_storing_to_file(tmp_path: Path) -> None:
"""Tests storing the tracked data to a file."""
robot_context_tracker = RobotContextTracker(tmp_path, should_track=True)

@robot_context_tracker.track(state=RobotContextState.STARTING_UP)
@robot_context_tracker.track(RobotContextState.STARTING_UP)
def starting_robot() -> None:
sleep(STARTING_TIME)

@robot_context_tracker.track(state=RobotContextState.CALIBRATING)
def calibrating_robot() -> None:
@robot_context_tracker.track(RobotContextState.CALIBRATING)
async def calibrating_robot() -> None:
sleep(CALIBRATING_TIME)

@robot_context_tracker.track(state=RobotContextState.ANALYZING_PROTOCOL)
@robot_context_tracker.track(RobotContextState.ANALYZING_PROTOCOL)
def analyzing_protocol() -> None:
sleep(ANALYZING_TIME)

starting_robot()
calibrating_robot()
await calibrating_robot()
analyzing_protocol()

robot_context_tracker.store()
Expand Down
Loading

0 comments on commit dfbde9f

Please sign in to comment.