Skip to content

Commit

Permalink
chore(api): fix typing of task_queue workers (#14755)
Browse files Browse the repository at this point in the history
task_queue has these cleanup and run functions that you pass in with
some arguments to bind. None of this was typesafe because it predated
ParamSpec. Now that we have ParamSpec, we can make these typesafe.

## changelog
typing only in `task_queue`. `set_run_func` and `set_cleanup_func` get
paramspecs. `set_run_func` is trivial; `set_cleanup_func` has a
requirement to have an `error` argument (which has to be the first
argument, for "paramspec still isn't good enough" reasons, but that was
already an implicit requirement).

also, we do a manual closure instead of `partial()` because I'm pretty
sure `partial()` isn't actually typesafe.
  • Loading branch information
sfoster1 authored Mar 29, 2024
1 parent 75d8795 commit 0fbb4c7
Showing 1 changed file with 24 additions and 30 deletions.
54 changes: 24 additions & 30 deletions api/src/opentrons/protocol_runner/task_queue.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,12 @@
"""Asynchronous task queue to accomplish a protocol run."""
import asyncio
import logging
from functools import partial
from typing import Any, Awaitable, Callable, Optional
from typing_extensions import Protocol as Callback
from typing import Any, Awaitable, Callable, Optional, ParamSpec, Concatenate

log = logging.getLogger(__name__)


class CleanupFunc(Callback):
"""Expected cleanup function signature."""

def __call__(
self,
error: Optional[Exception],
) -> Any:
"""Cleanup, optionally taking an error thrown.
Return value will not be used.
"""
...
CleanupFuncInput = ParamSpec("CleanupFuncInput")
RunFuncInput = ParamSpec("RunFuncInput")


class TaskQueue:
Expand All @@ -32,41 +19,48 @@ def __init__(
self,
# cleanup_func: CleanupFunc,
) -> None:
"""Initialize the TaskQueue.
Args:
cleanup_func: A function to call at run function completion
with any error raised by the run function.
"""
"""Initialize the TaskQueue."""
self._cleanup_func: Optional[
Callable[[Optional[Exception]], Any]
] = None # CleanupFunc = cleanup_func
Callable[[Optional[Exception]], Awaitable[Any]]
] = None

self._run_func: Optional[Callable[[], Any]] = None
self._run_task: Optional["asyncio.Task[None]"] = None
self._ok_to_join_event: asyncio.Event = asyncio.Event()

def set_cleanup_func(
self,
func: Callable[..., Awaitable[Any]],
**kwargs: Any,
func: Callable[
Concatenate[Optional[Exception], CleanupFuncInput], Awaitable[Any]
],
*args: CleanupFuncInput.args,
**kwargs: CleanupFuncInput.kwargs,
) -> None:
"""Add the protocol cleanup task to the queue.
The "cleanup" task will be run after the "run" task.
"""
self._cleanup_func = partial(func, **kwargs)

async def _do_cleanup(error: Optional[Exception]) -> None:
await func(error, *args, **kwargs)

self._cleanup_func = _do_cleanup

def set_run_func(
self,
func: Callable[..., Awaitable[Any]],
**kwargs: Any,
func: Callable[RunFuncInput, Awaitable[Any]],
*args: RunFuncInput.args,
**kwargs: RunFuncInput.kwargs,
) -> None:
"""Add the protocol run task to the queue.
The "run" task will be run first, before the "cleanup" task.
"""
self._run_func = partial(func, **kwargs)

async def _do_run() -> None:
await func(*args, **kwargs)

self._run_func = _do_run

def start(self) -> None:
"""Start running tasks in the queue."""
Expand Down

0 comments on commit 0fbb4c7

Please sign in to comment.