Skip to content

Commit

Permalink
Merge pull request #22 from SylvainCorlay/filter-message
Browse files Browse the repository at this point in the history
WIP - Enable kernel message filtering
  • Loading branch information
SylvainCorlay authored Aug 6, 2018
2 parents bdf256c + 8898a19 commit e0f3c08
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 28 deletions.
2 changes: 1 addition & 1 deletion jupyter_server/services/api/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ def test_get_status(self):
assert data['kernels'] == 0
assert data['last_activity'].endswith('Z')
assert data['started'].endswith('Z')
assert data['started'] == isoformat(self.notebook.web_app.settings['started'])
assert data['started'] == isoformat(self.server.web_app.settings['started'])
4 changes: 2 additions & 2 deletions jupyter_server/services/contents/tests/test_contents_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ def patch_cp_root(self, dirname):
"""
Temporarily patch the root dir of our checkpoint manager.
"""
cpm = self.notebook.contents_manager.checkpoints
cpm = self.server.contents_manager.checkpoints
old_dirname = cpm.root_dir
cpm.root_dir = dirname
try:
Expand Down Expand Up @@ -717,7 +717,7 @@ class GenericFileCheckpointsAPITest(APITest):
def test_config_did_something(self):

self.assertIsInstance(
self.notebook.contents_manager.checkpoints,
self.server.contents_manager.checkpoints,
GenericFileCheckpoints,
)

Expand Down
11 changes: 8 additions & 3 deletions jupyter_server/services/kernels/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def pre_get(self):
kernel = self.kernel_manager.get_kernel(self.kernel_id)
self.session.key = kernel.session.key
future = self.request_kernel_info()

def give_up():
"""Don't wait forever for the kernel to reply"""
if future.done():
Expand Down Expand Up @@ -307,8 +307,13 @@ def on_message(self, msg):
if channel not in self.channels:
self.log.warning("No such channel: %r", channel)
return
stream = self.channels[channel]
self.session.send(stream, msg)
am = self.kernel_manager.allowed_message_types
mt = msg['header']['msg_type']
if am and mt not in am:
self.log.warning('Received message of type "%s", which is not allowed. Ignoring.' % mt)
else:
stream = self.channels[channel]
self.session.send(stream, msg)

def _on_zmq_reply(self, stream, msg_list):
idents, fed_msg_list = self.session.feed_identities(msg_list)
Expand Down
30 changes: 20 additions & 10 deletions jupyter_server/services/kernels/kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@


class MappingKernelManager(MultiKernelManager):
"""A KernelManager that handles file mapping and HTTP error handling"""
"""A KernelManager that handles
- File mapping
- HTTP error handling
- Kernel message filtering
"""

@default('kernel_manager_class')
def _default_kernel_manager_class(self):
Expand Down Expand Up @@ -93,15 +97,15 @@ def _update_root_dir(self, proposal):
no frontends are connected.
"""
)

kernel_info_timeout = Float(60, config=True,
help="""Timeout for giving up on a kernel (in seconds).
On starting and restarting kernels, we check whether the
kernel is running and responsive by sending kernel_info_requests.
This sets the timeout in seconds for how long the kernel can take
before being presumed dead.
This affects the MappingKernelManager (which handles kernel restarts)
before being presumed dead.
This affects the MappingKernelManager (which handles kernel restarts)
and the ZMQChannelsHandler (which handles the startup).
"""
)
Expand All @@ -118,6 +122,12 @@ def __init__(self, **kwargs):
super(MappingKernelManager, self).__init__(**kwargs)
self.last_kernel_activity = utcnow()

allowed_message_types = List(trait=Unicode(), config=True,
help="""White list of allowed kernel message types.
When the list is empty, all message types are allowed.
"""
)

#-------------------------------------------------------------------------
# Methods for managing kernels and sessions
#-------------------------------------------------------------------------
Expand Down Expand Up @@ -287,32 +297,32 @@ def restart_kernel(self, kernel_id):
# return a Future that will resolve when the kernel has successfully restarted
channel = kernel.connect_shell()
future = Future()

def finish():
"""Common cleanup when restart finishes/fails for any reason."""
if not channel.closed():
channel.close()
loop.remove_timeout(timeout)
kernel.remove_restart_callback(on_restart_failed, 'dead')

def on_reply(msg):
self.log.debug("Kernel info reply received: %s", kernel_id)
finish()
if not future.done():
future.set_result(msg)

def on_timeout():
self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id)
finish()
if not future.done():
future.set_exception(gen.TimeoutError("Timeout waiting for restart"))

def on_restart_failed():
self.log.warning("Restarting kernel failed: %s", kernel_id)
finish()
if not future.done():
future.set_exception(RuntimeError("Restart failed"))

kernel.add_restart_callback(on_restart_failed, 'dead')
kernel.session.send(channel, "kernel_info_request")
channel.on_recv(on_reply)
Expand Down Expand Up @@ -366,7 +376,7 @@ def _check_kernel_id(self, kernel_id):

def start_watching_activity(self, kernel_id):
"""Start watching IOPub messages on a kernel for activity.
- update last_activity on every message
- record execution_state from status messages
"""
Expand Down
19 changes: 19 additions & 0 deletions jupyter_server/services/kernels/tests/test_kernels_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import json
import time

from traitlets.config import Config

from tornado.httpclient import HTTPRequest
from tornado.ioloop import IOLoop
from tornado.websocket import websocket_connect
Expand Down Expand Up @@ -184,3 +186,20 @@ def test_connections(self):
break
model = self.kern_api.get(kid).json()
self.assertEqual(model['connections'], 0)


class KernelFilterTest(ServerTestBase):
# A special install of ServerTestBase where only `kernel_info_request`
# messages are allowed.

config = Config({
'ServerApp': {
'MappingKernelManager': {
'allowed_message_types': ['kernel_info_request']
}
}
})

# Sanity check verifying that the configurable was properly set.
def test_config(self):
self.assertEqual(self.server.kernel_manager.allowed_message_types, ['kernel_info_request'])
19 changes: 9 additions & 10 deletions jupyter_server/tests/launchserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def wait_until_alive(cls):
try:
requests.get(url)
except Exception as e:
if not cls.notebook_thread.is_alive():
if not cls.server_thread.is_alive():
raise RuntimeError("The Jupyter server failed to start")
time.sleep(POLL_INTERVAL)
else:
Expand All @@ -70,8 +70,8 @@ def wait_until_alive(cls):
@classmethod
def wait_until_dead(cls):
"""Wait for the server process to terminate after shutdown"""
cls.notebook_thread.join(timeout=MAX_WAITTIME)
if cls.notebook_thread.is_alive():
cls.server_thread.join(timeout=MAX_WAITTIME)
if cls.server_thread.is_alive():
raise TimeoutError("Undead Jupyter server")

@classmethod
Expand Down Expand Up @@ -110,11 +110,10 @@ def tmp(*parts):
data_dir = cls.data_dir = tmp('data')
config_dir = cls.config_dir = tmp('config')
runtime_dir = cls.runtime_dir = tmp('runtime')
cls.root_dir = tmp('notebooks')
cls.root_dir = tmp('root_dir')
cls.env_patch = patch.dict('os.environ', {
'HOME': cls.home_dir,
'PYTHONPATH': os.pathsep.join(sys.path),
'IPYTHONDIR': pjoin(cls.home_dir, '.ipython'),
'JUPYTER_NO_CONFIG': '1', # needed in the future
'JUPYTER_CONFIG_DIR' : config_dir,
'JUPYTER_DATA_DIR' : data_dir,
Expand All @@ -140,7 +139,7 @@ def start_thread():
if 'asyncio' in sys.modules:
import asyncio
asyncio.set_event_loop(asyncio.new_event_loop())
app = cls.notebook = ServerApp(
app = cls.server = ServerApp(
port=cls.port,
port_retries=0,
open_browser=False,
Expand Down Expand Up @@ -170,15 +169,15 @@ def start_thread():
# set the event, so failure to start doesn't cause a hang
started.set()
app.session_manager.close()
cls.notebook_thread = Thread(target=start_thread)
cls.notebook_thread.daemon = True
cls.notebook_thread.start()
cls.server_thread = Thread(target=start_thread)
cls.server_thread.daemon = True
cls.server_thread.start()
started.wait()
cls.wait_until_alive()

@classmethod
def teardown_class(cls):
cls.notebook.stop()
cls.server.stop()
cls.wait_until_dead()
cls.env_patch.stop()
cls.path_patch.stop()
Expand Down
4 changes: 2 additions & 2 deletions jupyter_server/tests/test_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def test_hidden_files(self):
r = self.request('GET', url_path_join('files', d, foo))
self.assertEqual(r.status_code, 404)

self.notebook.contents_manager.allow_hidden = True
self.server.contents_manager.allow_hidden = True
try:
for d in not_hidden:
path = pjoin(rootdir, d.replace('/', os.sep))
Expand All @@ -75,7 +75,7 @@ def test_hidden_files(self):
r.raise_for_status()
self.assertEqual(r.text, foo)
finally:
self.notebook.contents_manager.allow_hidden = False
self.server.contents_manager.allow_hidden = False

def test_contents_manager(self):
"make sure ContentsManager returns right files (ipynb, bin, txt)."
Expand Down

0 comments on commit e0f3c08

Please sign in to comment.