Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix timing function cross-platform bug #14919

Merged
merged 5 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,42 @@

import csv
from pathlib import Path
import platform

from functools import wraps, partial
from time import perf_counter_ns
import os
from typing import Callable, TypeVar, cast


from functools import wraps
from time import perf_counter_ns, clock_gettime_ns, CLOCK_REALTIME
from typing import Callable, TypeVar
from typing_extensions import ParamSpec
from collections import deque
from performance_metrics.datashapes import (
RawContextData,
RobotContextState,
)
from performance_metrics.datashapes import RawContextData, RobotContextState

P = ParamSpec("P")
R = TypeVar("R")


def _get_timing_function() -> Callable[[], int]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's refactor this so that it only does the logic check once at import time rather than every time it's called

"""Returns a timing function for the current platform."""
time_function: Callable[[], int]
if platform.system() == "Linux":
from time import clock_gettime_ns, CLOCK_REALTIME

time_function = cast(
Callable[[], int], partial(clock_gettime_ns, CLOCK_REALTIME)
)
else:
from time import time_ns

time_function = time_ns

return time_function


timing_function = _get_timing_function()


class RobotContextTracker:
"""Tracks and stores robot context and execution duration for different operations."""

Expand All @@ -43,7 +63,7 @@ def inner_decorator(func: Callable[P, R]) -> Callable[P, R]:

@wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
function_start_time = clock_gettime_ns(CLOCK_REALTIME)
function_start_time = timing_function()
duration_start_time = perf_counter_ns()
try:
result = func(*args, **kwargs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import pytest
from performance_metrics.robot_context_tracker import RobotContextTracker
from performance_metrics.datashapes import RobotContextState
from time import sleep
from time import sleep, time_ns
from unittest.mock import patch

# Corrected times in seconds
STARTING_TIME = 0.001
Expand Down Expand Up @@ -140,6 +141,24 @@ async def async_analyzing_operation() -> None:
), "State should be ANALYZING_PROTOCOL."


def test_sync_operation_timing_accuracy(
robot_context_tracker: RobotContextTracker,
) -> None:
"""Tests the timing accuracy of a synchronous operation tracking."""

@robot_context_tracker.track(state=RobotContextState.RUNNING_PROTOCOL)
def running_operation() -> None:
sleep(RUNNING_TIME)

running_operation()

duration_data = robot_context_tracker._storage[0]
measured_duration = duration_data.duration_end - duration_data.duration_start
assert (
abs(measured_duration - RUNNING_TIME * 1e9) < 1e7
), "Measured duration for sync operation should closely match the expected duration."


@pytest.mark.asyncio
async def test_async_operation_timing_accuracy(
robot_context_tracker: RobotContextTracker,
Expand Down Expand Up @@ -249,3 +268,39 @@ def analyzing_protocol() -> None:
assert (
len(lines) == 4
), "All stored data + header should be written to the file."


@patch(
"performance_metrics.robot_context_tracker._get_timing_function",
return_value=time_ns,
)
def test_using_non_linux_time_functions(tmp_path: Path) -> None:
"""Tests tracking operations using non-Linux time functions."""
file_path = tmp_path / "test_file.csv"
robot_context_tracker = RobotContextTracker(file_path, should_track=True)

@robot_context_tracker.track(state=RobotContextState.STARTING_UP)
def starting_robot() -> None:
sleep(STARTING_TIME)

@robot_context_tracker.track(state=RobotContextState.CALIBRATING)
def calibrating_robot() -> None:
sleep(CALIBRATING_TIME)

starting_robot()
calibrating_robot()

storage = robot_context_tracker._storage
assert all(
data.func_start > 0 for data in storage
), "All function start times should be greater than 0."
assert all(
data.duration_start > 0 for data in storage
), "All duration start times should be greater than 0."
assert all(
data.duration_end > 0 for data in storage
), "All duration end times should be greater than 0."
assert all(
data.duration_end > data.duration_start for data in storage
), "Duration end times should be greater than duration start times."
assert len(storage) == 2, "Both operations should be tracked."