diff --git a/executor/engine/job/utils.py b/executor/engine/job/utils.py index 0ec0a92..94a730a 100644 --- a/executor/engine/job/utils.py +++ b/executor/engine/job/utils.py @@ -41,6 +41,7 @@ def __init__(self, job: "Job", valid_status: T.List[JobStatusType]): _T = T.TypeVar("_T") _thread_locals = threading.local() + def _gen_initializer(gen_func, args=tuple(), kwargs={}): # pragma: no cover global _thread_locals if "_thread_locals" not in globals(): @@ -49,7 +50,7 @@ def _gen_initializer(gen_func, args=tuple(), kwargs={}): # pragma: no cover _thread_locals._generator = gen_func(*args, **kwargs) -def _gen_next(fut = None): # pragma: no cover +def _gen_next(fut=None): # pragma: no cover global _thread_locals if fut is None: return next(_thread_locals._generator) @@ -57,7 +58,7 @@ def _gen_next(fut = None): # pragma: no cover return next(fut) -def _gen_anext(fut = None): # pragma: no cover +def _gen_anext(fut=None): # pragma: no cover global _thread_locals if fut is None: return asyncio.run(_thread_locals._generator.__anext__()) @@ -69,6 +70,7 @@ class GeneratorWrapper(T.Generic[_T]): """ wrap a generator in executor pool """ + def __init__(self, job: "Job", fut: T.Optional[Future] = None): self._job = job self._fut = fut