Skip to content

Commit

Permalink
Deprecate nest-asyncio using greenlet to reentry
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Feb 5, 2025
1 parent 1fa8a09 commit 0da8fd5
Show file tree
Hide file tree
Showing 8 changed files with 518 additions and 28 deletions.
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ keywords = ['workflow', 'multithreaded', 'rabbitmq']
requires-python = '>=3.9'
dependencies = [
'kiwipy[rmq]~=0.8.5',
'nest_asyncio~=1.5,>=1.5.1',
'greenlet~=3.1',
'pyyaml~=6.0',
]

Expand Down Expand Up @@ -132,7 +132,6 @@ module = [
'aiocontextvars.*',
'frozendict.*',
'kiwipy.*',
'nest_asyncio.*',
'tblib.*',
]
ignore_missing_imports = true
Expand Down
9 changes: 0 additions & 9 deletions src/plumpy/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,8 @@ class PlumpyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):

def get_event_loop(self) -> asyncio.AbstractEventLoop:
"""Return the patched event loop."""
import nest_asyncio

if self._loop is None:
self._loop = super().get_event_loop()
nest_asyncio.apply(self._loop)

return self._loop

Expand All @@ -55,12 +52,6 @@ def set_event_loop_policy() -> None:

def reset_event_loop_policy() -> None:
"""Reset the event loop policy to the default."""
loop = get_event_loop()

cls = loop.__class__

del cls._check_running # type: ignore
del cls._nest_patched # type: ignore

asyncio.set_event_loop_policy(None)

Expand Down
33 changes: 21 additions & 12 deletions src/plumpy/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1307,10 +1307,29 @@ def execute(self) -> Optional[Dict[str, Any]]:
:return: None if not terminated, otherwise `self.outputs`
"""
from plumpy.reentry import get_runner

if not self.has_terminated():
self.loop.run_until_complete(self.step_until_terminated())
coro = self.step_until_terminated()
with get_runner(loop=self.loop) as runner:
print('I got runner: ', runner)
result = runner.run(coro)

return result

else:
self.result()

return self.future().result()
async def step_until_terminated(self) -> Any:
"""If the process has not terminated,
run the current step and wait until the step finished.
This is the function run by the event loop (not ``step``).
"""
while not self.has_terminated():
await self.step()

return await self.future()

@ensure_not_closed
async def step(self) -> None:
Expand Down Expand Up @@ -1361,16 +1380,6 @@ async def step(self) -> None:
self._stepping = False
self._set_interrupt_action(None)

async def step_until_terminated(self) -> None:
"""If the process has not terminated,
run the current step and wait until the step finished.
This is the function run by the event loop (not ``step``).
"""
while not self.has_terminated():
await self.step()

# endregion

@ensure_not_closed
Expand Down
Loading

0 comments on commit 0da8fd5

Please sign in to comment.