
Async jupyter client #10

Merged: 30 commits, Feb 26, 2020
Changes from 1 commit (c4be851)

Commits (30, all by davidbrochart):
0d52672  Asynchronous cell execution (Jan 29, 2020)
12420d7  Keep only async versions of execute related methods (Jan 30, 2020)
145a038  Split async_run_cell in 2 tasks (Jan 30, 2020)
69c21a9  Longer timeout in test_async_parallel_notebooks (Jan 30, 2020)
c57f37c  Longer timeout (Jan 30, 2020)
31284ba  No timeout (Jan 30, 2020)
3d690cd  Fix linter (Jan 30, 2020)
1d49ea9  poll_period=0 (Jan 30, 2020)
fd8ea1f  No timeout (Jan 30, 2020)
5be3d95  Add test_many_async_parallel_notebooks (Jan 30, 2020)
c4be851  Fully async with async jupyter_client (Feb 2, 2020)
4cd62ec  Get jupyter_client #506 (Feb 2, 2020)
4139f26  Fix pip install (Feb 2, 2020)
9f233a8  - (Feb 2, 2020)
d6fa25c  - (Feb 2, 2020)
d1ff6b9  - (Feb 2, 2020)
6f498f6  Python 3.8 only (Feb 2, 2020)
aa9bbff  - (Feb 2, 2020)
c0b1157  - (Feb 2, 2020)
81fb8dc  Workaround for AsyncMock so that we support python3.7 (Feb 4, 2020)
323fe5c  Fix linter (Feb 4, 2020)
55f36e9  Import asynccontextmanager from async_generator for Python < 3.7 (Feb 4, 2020)
4aa3466  python 3.5 compatibility (Feb 4, 2020)
16b3f52  Add py35 and py36 tests (Feb 4, 2020)
146cf26  Decrease multiprocessing pool to 2 workers (Feb 4, 2020)
3265d5e  Decrease number of async notebook run to 4 (Feb 4, 2020)
3afc591  Rework polling on shell and iopub channels (Feb 8, 2020)
f24ecba  Merge with master (Feb 12, 2020)
9c33393  Fix linter, rename (Feb 12, 2020)
f513ee7  Pin jupyter_client>=6.0.0 (Feb 24, 2020)

Commit c4be851b2bddcae66b90e1e43df7fbcd89510226: Fully async with async jupyter_client (davidbrochart, Feb 2, 2020)
133 changes: 56 additions & 77 deletions nbclient/execute.py
@@ -1,6 +1,6 @@
import base64
from textwrap import dedent
from contextlib import contextmanager
from contextlib import contextmanager, asynccontextmanager
from time import monotonic
from queue import Empty
import asyncio
@@ -287,9 +287,10 @@ def start_kernel_manager(self):
self.km = self.kernel_manager_class(config=self.config)
else:
self.km = self.kernel_manager_class(kernel_name=self.kernel_name, config=self.config)
self.km.client_class = 'jupyter_client.asynchronous.AsyncKernelClient'
return self.km

def start_new_kernel_client(self, **kwargs):
async def start_new_kernel_client(self, **kwargs):
"""Creates a new kernel client.
Parameters
@@ -316,16 +317,16 @@ def start_new_kernel_client(self, **kwargs):
self.kc = self.km.client()
self.kc.start_channels()
try:
self.kc.wait_for_ready(timeout=self.startup_timeout)
await self.kc.wait_for_ready(timeout=self.startup_timeout)
except RuntimeError:
self.kc.stop_channels()
self.km.shutdown_kernel()
raise
self.kc.allow_stdin = False
return self.kc

@contextmanager
def setup_kernel(self, **kwargs):
@asynccontextmanager
async def setup_kernel(self, **kwargs):
"""
Context manager for setting up the kernel to execute a notebook.
@@ -338,7 +339,7 @@ def setup_kernel(self, **kwargs):
self.start_kernel_manager()

if not self.km.has_kernel:
self.start_new_kernel_client(**kwargs)
await self.start_new_kernel_client(**kwargs)
try:
yield
finally:
@@ -370,11 +371,11 @@ async def async_execute(self, **kwargs):
"""
self.reset_execution_trackers()

with self.setup_kernel(**kwargs):
async with self.setup_kernel(**kwargs):
self.log.info("Executing notebook with kernel: %s" % self.kernel_name)
for index, cell in enumerate(self.nb.cells):
await self.async_execute_cell(cell, index)
info_msg = self._wait_for_reply(self.kc.kernel_info())
info_msg = await self._wait_for_reply(self.kc.kernel_info())
self.nb.metadata['language_info'] = info_msg['content']['language_info']
self.set_widgets_metadata()

@@ -455,16 +456,22 @@ def _update_display_id(self, display_id, msg):
outputs[output_idx]['data'] = out['data']
outputs[output_idx]['metadata'] = out['metadata']

def _poll_for_reply(self, msg_id, cell=None, timeout=None):
try:
# check with timeout if kernel is still alive
msg = self.kc.shell_channel.get_msg(timeout=timeout)
if msg['parent_header'].get('msg_id') == msg_id:
return msg
except Empty:
# received no message, check if kernel is still alive
self._check_alive()
# kernel still alive, wait for a message
async def _poll_for_reply(self, msg_id, cell=None, timeout=None):
if timeout is not None:
deadline = monotonic() + timeout
while True:
try:
# check with timeout if kernel is still alive
msg = await self.kc.shell_channel.get_msg(timeout=timeout)
if msg['parent_header'].get('msg_id') == msg_id:
return msg
else:
if timeout is not None:
timeout = max(0, deadline-monotonic())
except Empty:
# received no message, check if kernel is still alive
self._check_alive()
self._handle_timeout(timeout, cell)

def _get_timeout(self, cell):
if self.timeout_func is not None and cell is not None:
@@ -492,14 +499,14 @@ def _check_alive(self):
self.log.error("Kernel died while waiting for execute reply.")
raise DeadKernelError("Kernel died")

def _wait_for_reply(self, msg_id, cell=None):
async def _wait_for_reply(self, msg_id, cell=None):
# wait for finish, with timeout
timeout = self._get_timeout(cell)
cummulative_time = 0
self.shell_timeout_interval = 5
while True:
try:
msg = self.kc.shell_channel.get_msg(timeout=self.shell_timeout_interval)
msg = await self.kc.shell_channel.get_msg(timeout=self.shell_timeout_interval)
except Empty:
self._check_alive()
cummulative_time += self.shell_timeout_interval
@@ -528,60 +535,32 @@ def run_cell(self, cell, cell_index=0, store_history=False):
loop = get_loop()
return loop.run_until_complete(self.async_run_cell(cell, cell_index, store_history))

async def poll_exec_reply(self, poll_period, parent_msg_id, cell):
exec_timeout = self._get_timeout(cell)
exec_deadline = None
if exec_timeout is not None:
exec_deadline = monotonic() + exec_timeout
while True: # polling for exec reply
exec_reply = self._poll_for_reply(parent_msg_id, cell, 0)
if exec_reply is not None:
# cell executed, stop polling
self._polling_exec_reply = False
break
if self._passed_deadline(exec_deadline):
# cell still not executed after timeout, stop polling
self._handle_timeout(exec_timeout, cell)
self._polling_exec_reply = False
break
await asyncio.sleep(poll_period)
return exec_reply

async def poll_output_msg(self, poll_period, parent_msg_id, cell, cell_index):
iopub_deadline = None
while True: # polling for output message
async def poll_output_msg(self, parent_msg_id, cell, cell_index, timeout=None):
Contributor: We'll need to do some more manual testing to make sure we time out correctly, pass the execution context, and handle errors cleanly. Some of those patterns are tested in the synchronous path under assumptions that might not hold for the async version.

Member Author (davidbrochart): Just to explain a bit more how I implemented the asynchronous functionality: there are now two tasks that run in parallel (see the sketch below):

  • _poll_for_reply: asynchronously awaits a reply on the shell channel (possibly with a timeout).
  • poll_output_msg: asynchronously awaits messages on the IOPub channel for the cell execution. It runs with no timeout as long as no reply has arrived on the shell channel; once the shell reply is received, this task is cancelled and relaunched with a timeout.
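
A minimal, runnable sketch of this two-task pattern (an editorial illustration, not the PR's code: the kernel's shell and IOPub channels are stood in for by asyncio.Queue, and the function names merely mirror the PR's methods):

```python
import asyncio

async def poll_output_msg(iopub: asyncio.Queue, timeout=None):
    # Consume IOPub messages until an "idle" sentinel (stand-in for
    # CellExecutionComplete) arrives, or until the optional timeout expires.
    while True:
        try:
            msg = await asyncio.wait_for(iopub.get(), timeout)
        except asyncio.TimeoutError:
            print("timeout waiting for IOPub output")
            return
        print("iopub:", msg)
        if msg == "idle":
            return

async def poll_for_reply(shell: asyncio.Queue, timeout=None):
    # Await the execute_reply on the (simulated) shell channel.
    return await asyncio.wait_for(shell.get(), timeout)

async def run_cell():
    iopub, shell = asyncio.Queue(), asyncio.Queue()
    for m in ("stream 1", "stream 2"):        # pretend the kernel produced output
        iopub.put_nowait(m)
    shell.put_nowait({"content": {"status": "ok"}})

    # Task 1: open-ended IOPub polling; concurrently await the shell reply.
    task_poll_output_msg = asyncio.ensure_future(poll_output_msg(iopub))
    exec_reply = await poll_for_reply(shell, timeout=5)

    # Reply received: cancel the open-ended task and re-poll with a bounded
    # timeout (the role played by iopub_timeout in the PR) to drain late output.
    if not task_poll_output_msg.done():
        task_poll_output_msg.cancel()
    iopub.put_nowait("idle")                  # late message arriving after the reply
    await poll_output_msg(iopub, timeout=1)
    print("reply:", exec_reply)

asyncio.run(run_cell())
```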

Contributor: Thanks, that helps!

if timeout is not None:
deadline = monotonic() + timeout
while True:
try:
msg = self.kc.iopub_channel.get_msg(timeout=0)
msg = await self.kc.iopub_channel.get_msg(timeout=timeout)
except Empty:
msg = None
if self._polling_exec_reply:
# still waiting for execution to finish so we expect that
# output may not always be produced yet (keep on polling)
pass
# timeout
if self.raise_on_iopub_timeout:
raise CellTimeoutError.error_from_timeout_and_cell(
"Timeout waiting for IOPub output", self.iopub_timeout, cell
)
else:
# cell executed, we should receive remaining messages
# before the deadline
if iopub_deadline is None:
iopub_deadline = monotonic() + self.iopub_timeout
if self._passed_deadline(iopub_deadline):
if self.raise_on_iopub_timeout:
raise CellTimeoutError.error_from_timeout_and_cell(
"Timeout waiting for IOPub output", self.iopub_timeout, cell
)
else:
self.log.warning("Timeout waiting for IOPub output")
break
if msg is not None:
if msg['parent_header'].get('msg_id') != parent_msg_id:
# not an output from our execution
pass
else:
try:
# Will raise CellExecutionComplete when completed
self.process_message(msg, cell, cell_index)
except CellExecutionComplete:
break
await asyncio.sleep(poll_period)
self.log.warning("Timeout waiting for IOPub output")
return
if msg['parent_header'].get('msg_id') != parent_msg_id:
# not an output from our execution
pass
else:
try:
# Will raise CellExecutionComplete when completed
self.process_message(msg, cell, cell_index)
except CellExecutionComplete:
return
if timeout is not None:
timeout = max(0, deadline-monotonic())

async def async_run_cell(self, cell, cell_index=0, store_history=False):
parent_msg_id = self.kc.execute(
@@ -598,12 +577,12 @@ async def async_run_cell(self, cell, cell_index=0, store_history=False):
# after exec_reply was obtained from shell_channel, leading to the
# aforementioned dropped data.

poll_period = 0 # in second
self._polling_exec_reply = True
tasks = []
tasks.append(self.poll_exec_reply(poll_period, parent_msg_id, cell))
tasks.append(self.poll_output_msg(poll_period, parent_msg_id, cell, cell_index))
exec_reply, _ = await asyncio.gather(*tasks)
exec_timeout = self._get_timeout(cell)
task_poll_output_msg = asyncio.ensure_future(self.poll_output_msg(parent_msg_id, cell, cell_index))
exec_reply = await self._poll_for_reply(parent_msg_id, cell, exec_timeout)
if not task_poll_output_msg.done():
task_poll_output_msg.cancel()
Contributor: I never knew about cancel(); can this mean a message can get lost?

Member Author (davidbrochart): I don't think so, because messages are queued in zmq anyway, and we create a new task polling for messages right after that.

Contributor: But can it cancel once a message is already off the zmq queue, say while it is in the JSON parser? How does it cancel, like Ctrl-C?

Member Author (davidbrochart): It can't do anything between two awaits, so it depends on the implementation. For instance, zmq.asyncio.Socket.recv_multipart may await zmq.asyncio.Socket.recv several times, so cancelling in the middle of it can lead to losing a message; you're right.
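
A small runnable illustration of that point (editorial, not from the PR): cancellation is only delivered at an await, never in the middle of ordinary synchronous code:

```python
import asyncio

async def worker():
    # No await here: this computation cannot be interrupted by cancel().
    total = sum(range(1_000_000))
    try:
        await asyncio.sleep(10)            # cancellation is delivered at this await
    except asyncio.CancelledError:
        print("cancelled at the await point; the sum finished uninterrupted:", total)
        raise

async def main():
    task = asyncio.ensure_future(worker())
    await asyncio.sleep(0)                 # let the worker run up to its first await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("task is cancelled")

asyncio.run(main())
```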

Member Author (davidbrochart, Feb 6, 2020): Another approach I tried was to leave the first poll_output_msg task running (with no timeout), launch a second one with a timeout when _poll_for_reply is done (if the first poll_output_msg task is not done yet), and then asyncio.wait(both_tasks, return_when=asyncio.FIRST_COMPLETED), but that doesn't handle exceptions well (there is return_when=asyncio.FIRST_EXCEPTION for that). I can't think of a better solution right now.
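
For reference, a runnable sketch of that asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED) alternative (editorial, with made-up delays), showing why an exception in the still-pending task is easy to miss:

```python
import asyncio

async def fake_poll_for_reply():
    await asyncio.sleep(0.1)
    return "execute_reply"

async def fake_poll_output_msg():
    await asyncio.sleep(0.5)
    raise RuntimeError("error while processing an IOPub message")

async def main():
    tasks = {
        asyncio.ensure_future(fake_poll_for_reply()),
        asyncio.ensure_future(fake_poll_output_msg()),
    }
    # FIRST_COMPLETED returns as soon as any task finishes; an exception raised
    # later by the still-pending task is not propagated here, it only surfaces
    # if that task is awaited (or inspected) afterwards.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print("done:", [t.result() for t in done], "pending:", len(pending))
    for t in pending:
        t.cancel()

asyncio.run(main())
```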

await self.poll_output_msg(parent_msg_id, cell, cell_index, self.iopub_timeout)

# Return cell.outputs still for backwards compatibility
return exec_reply, cell.outputs
8 changes: 4 additions & 4 deletions nbclient/tests/test_execute.py
@@ -26,7 +26,7 @@
from pebble import ProcessPool

from queue import Empty
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, AsyncMock, patch


addr_pat = re.compile(r'0x[0-9a-f]{7,9}')
@@ -109,7 +109,7 @@ def prepare_cell_mocks(*messages, reply_msg=None):
def shell_channel_message_mock():
# Return the message generator for
# self.kc.shell_channel.get_msg => {'parent_header': {'msg_id': parent_id}}
return MagicMock(
return AsyncMock(
return_value=ExecutorTestsBase.merge_dicts(
{'parent_header': {'msg_id': parent_id}}, reply_msg or {}
)
@@ -118,7 +118,7 @@ def shell_channel_message_mock():
def iopub_messages_mock():
# Return the message generator for
# self.kc.iopub_channel.get_msg => messages[i]
return MagicMock(
return AsyncMock(
side_effect=[
# Default the parent_header so mocks don't need to include this
ExecutorTestsBase.merge_dicts({'parent_header': {'msg_id': parent_id}}, msg)
@@ -676,7 +676,7 @@ def message_seq(messages):
while True:
yield Empty()
message_mock.side_effect = message_seq(list(message_mock.side_effect)[:-1])
executor.kc.shell_channel.get_msg = MagicMock(
executor.kc.shell_channel.get_msg = AsyncMock(
return_value={'parent_header': {'msg_id': executor.parent_id}}
)
executor.raise_on_iopub_timeout = True
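
As an aside, unittest.mock.AsyncMock used above only exists on Python 3.8+; the commit list mentions a workaround so that Python 3.7 stays supported. A minimal sketch of what such a fallback could look like (an assumption about its shape, not necessarily the PR's actual workaround):

```python
try:
    from unittest.mock import AsyncMock   # Python >= 3.8
except ImportError:
    from unittest.mock import MagicMock

    class AsyncMock(MagicMock):
        # Hypothetical fallback: delegate to MagicMock.__call__ so that
        # return_value and side_effect behave as usual, while making the
        # call awaitable so `await mock(...)` works on Python < 3.8.
        async def __call__(self, *args, **kwargs):
            return super().__call__(*args, **kwargs)
```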