From 5bc2793acb7490be99f1b0326deffa0fbfbae2e0 Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Mon, 6 Aug 2018 11:55:08 +0200 Subject: [PATCH 1/2] Enable kernel message filtering --- jupyter_server/services/kernels/handlers.py | 11 +++++-- .../services/kernels/kernelmanager.py | 30 ++++++++++++------- 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/jupyter_server/services/kernels/handlers.py b/jupyter_server/services/kernels/handlers.py index 067420fed9..a3f52fb214 100644 --- a/jupyter_server/services/kernels/handlers.py +++ b/jupyter_server/services/kernels/handlers.py @@ -224,7 +224,7 @@ def pre_get(self): kernel = self.kernel_manager.get_kernel(self.kernel_id) self.session.key = kernel.session.key future = self.request_kernel_info() - + def give_up(): """Don't wait forever for the kernel to reply""" if future.done(): @@ -307,8 +307,13 @@ def on_message(self, msg): if channel not in self.channels: self.log.warning("No such channel: %r", channel) return - stream = self.channels[channel] - self.session.send(stream, msg) + am = self.kernel_manager.allowed_message_types + mt = msg['header']['msg_type'] + if am and mt not in am: + self.log.warning('Received message of type "%s", which is not allowed. Ignoring.' % mt) + else: + stream = self.channels[channel] + self.session.send(stream, msg) def _on_zmq_reply(self, stream, msg_list): idents, fed_msg_list = self.session.feed_identities(msg_list) diff --git a/jupyter_server/services/kernels/kernelmanager.py b/jupyter_server/services/kernels/kernelmanager.py index 28a0527fef..8356b50697 100644 --- a/jupyter_server/services/kernels/kernelmanager.py +++ b/jupyter_server/services/kernels/kernelmanager.py @@ -28,7 +28,11 @@ class MappingKernelManager(MultiKernelManager): - """A KernelManager that handles file mapping and HTTP error handling""" + """A KernelManager that handles + - File mapping + - HTTP error handling + - Kernel message filtering + """ @default('kernel_manager_class') def _default_kernel_manager_class(self): @@ -93,15 +97,15 @@ def _update_root_dir(self, proposal): no frontends are connected. """ ) - + kernel_info_timeout = Float(60, config=True, help="""Timeout for giving up on a kernel (in seconds). On starting and restarting kernels, we check whether the kernel is running and responsive by sending kernel_info_requests. This sets the timeout in seconds for how long the kernel can take - before being presumed dead. - This affects the MappingKernelManager (which handles kernel restarts) + before being presumed dead. + This affects the MappingKernelManager (which handles kernel restarts) and the ZMQChannelsHandler (which handles the startup). """ ) @@ -118,6 +122,12 @@ def __init__(self, **kwargs): super(MappingKernelManager, self).__init__(**kwargs) self.last_kernel_activity = utcnow() + allowed_message_types = List(trait=Unicode(), config=True, + help="""White list of allowed kernel message types. + When the list is empty, all message types are allowed. + """ + ) + #------------------------------------------------------------------------- # Methods for managing kernels and sessions #------------------------------------------------------------------------- @@ -287,32 +297,32 @@ def restart_kernel(self, kernel_id): # return a Future that will resolve when the kernel has successfully restarted channel = kernel.connect_shell() future = Future() - + def finish(): """Common cleanup when restart finishes/fails for any reason.""" if not channel.closed(): channel.close() loop.remove_timeout(timeout) kernel.remove_restart_callback(on_restart_failed, 'dead') - + def on_reply(msg): self.log.debug("Kernel info reply received: %s", kernel_id) finish() if not future.done(): future.set_result(msg) - + def on_timeout(): self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id) finish() if not future.done(): future.set_exception(gen.TimeoutError("Timeout waiting for restart")) - + def on_restart_failed(): self.log.warning("Restarting kernel failed: %s", kernel_id) finish() if not future.done(): future.set_exception(RuntimeError("Restart failed")) - + kernel.add_restart_callback(on_restart_failed, 'dead') kernel.session.send(channel, "kernel_info_request") channel.on_recv(on_reply) @@ -366,7 +376,7 @@ def _check_kernel_id(self, kernel_id): def start_watching_activity(self, kernel_id): """Start watching IOPub messages on a kernel for activity. - + - update last_activity on every message - record execution_state from status messages """ From 8898a19b1856b8b32b383bf952fa289c7d86866b Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Mon, 6 Aug 2018 12:57:48 +0200 Subject: [PATCH 2/2] Add test for kernel filtering --- jupyter_server/services/api/tests/test_api.py | 2 +- .../contents/tests/test_contents_api.py | 4 ++-- .../kernels/tests/test_kernels_api.py | 19 +++++++++++++++++++ jupyter_server/tests/launchserver.py | 19 +++++++++---------- jupyter_server/tests/test_files.py | 4 ++-- 5 files changed, 33 insertions(+), 15 deletions(-) diff --git a/jupyter_server/services/api/tests/test_api.py b/jupyter_server/services/api/tests/test_api.py index cc04564988..76e089d3c6 100644 --- a/jupyter_server/services/api/tests/test_api.py +++ b/jupyter_server/services/api/tests/test_api.py @@ -29,4 +29,4 @@ def test_get_status(self): assert data['kernels'] == 0 assert data['last_activity'].endswith('Z') assert data['started'].endswith('Z') - assert data['started'] == isoformat(self.notebook.web_app.settings['started']) + assert data['started'] == isoformat(self.server.web_app.settings['started']) diff --git a/jupyter_server/services/contents/tests/test_contents_api.py b/jupyter_server/services/contents/tests/test_contents_api.py index 931c8693b7..88c348df87 100644 --- a/jupyter_server/services/contents/tests/test_contents_api.py +++ b/jupyter_server/services/contents/tests/test_contents_api.py @@ -681,7 +681,7 @@ def patch_cp_root(self, dirname): """ Temporarily patch the root dir of our checkpoint manager. """ - cpm = self.notebook.contents_manager.checkpoints + cpm = self.server.contents_manager.checkpoints old_dirname = cpm.root_dir cpm.root_dir = dirname try: @@ -717,7 +717,7 @@ class GenericFileCheckpointsAPITest(APITest): def test_config_did_something(self): self.assertIsInstance( - self.notebook.contents_manager.checkpoints, + self.server.contents_manager.checkpoints, GenericFileCheckpoints, ) diff --git a/jupyter_server/services/kernels/tests/test_kernels_api.py b/jupyter_server/services/kernels/tests/test_kernels_api.py index c61bd0b085..2199a41ebe 100644 --- a/jupyter_server/services/kernels/tests/test_kernels_api.py +++ b/jupyter_server/services/kernels/tests/test_kernels_api.py @@ -3,6 +3,8 @@ import json import time +from traitlets.config import Config + from tornado.httpclient import HTTPRequest from tornado.ioloop import IOLoop from tornado.websocket import websocket_connect @@ -184,3 +186,20 @@ def test_connections(self): break model = self.kern_api.get(kid).json() self.assertEqual(model['connections'], 0) + + +class KernelFilterTest(ServerTestBase): + # A special install of ServerTestBase where only `kernel_info_request` + # messages are allowed. + + config = Config({ + 'ServerApp': { + 'MappingKernelManager': { + 'allowed_message_types': ['kernel_info_request'] + } + } + }) + + # Sanity check verifying that the configurable was properly set. + def test_config(self): + self.assertEqual(self.server.kernel_manager.allowed_message_types, ['kernel_info_request']) diff --git a/jupyter_server/tests/launchserver.py b/jupyter_server/tests/launchserver.py index 6ac796f294..72c8b25628 100644 --- a/jupyter_server/tests/launchserver.py +++ b/jupyter_server/tests/launchserver.py @@ -59,7 +59,7 @@ def wait_until_alive(cls): try: requests.get(url) except Exception as e: - if not cls.notebook_thread.is_alive(): + if not cls.server_thread.is_alive(): raise RuntimeError("The Jupyter server failed to start") time.sleep(POLL_INTERVAL) else: @@ -70,8 +70,8 @@ def wait_until_alive(cls): @classmethod def wait_until_dead(cls): """Wait for the server process to terminate after shutdown""" - cls.notebook_thread.join(timeout=MAX_WAITTIME) - if cls.notebook_thread.is_alive(): + cls.server_thread.join(timeout=MAX_WAITTIME) + if cls.server_thread.is_alive(): raise TimeoutError("Undead Jupyter server") @classmethod @@ -110,11 +110,10 @@ def tmp(*parts): data_dir = cls.data_dir = tmp('data') config_dir = cls.config_dir = tmp('config') runtime_dir = cls.runtime_dir = tmp('runtime') - cls.root_dir = tmp('notebooks') + cls.root_dir = tmp('root_dir') cls.env_patch = patch.dict('os.environ', { 'HOME': cls.home_dir, 'PYTHONPATH': os.pathsep.join(sys.path), - 'IPYTHONDIR': pjoin(cls.home_dir, '.ipython'), 'JUPYTER_NO_CONFIG': '1', # needed in the future 'JUPYTER_CONFIG_DIR' : config_dir, 'JUPYTER_DATA_DIR' : data_dir, @@ -140,7 +139,7 @@ def start_thread(): if 'asyncio' in sys.modules: import asyncio asyncio.set_event_loop(asyncio.new_event_loop()) - app = cls.notebook = ServerApp( + app = cls.server = ServerApp( port=cls.port, port_retries=0, open_browser=False, @@ -170,15 +169,15 @@ def start_thread(): # set the event, so failure to start doesn't cause a hang started.set() app.session_manager.close() - cls.notebook_thread = Thread(target=start_thread) - cls.notebook_thread.daemon = True - cls.notebook_thread.start() + cls.server_thread = Thread(target=start_thread) + cls.server_thread.daemon = True + cls.server_thread.start() started.wait() cls.wait_until_alive() @classmethod def teardown_class(cls): - cls.notebook.stop() + cls.server.stop() cls.wait_until_dead() cls.env_patch.stop() cls.path_patch.stop() diff --git a/jupyter_server/tests/test_files.py b/jupyter_server/tests/test_files.py index 36d0e2e5b2..c8220fd1c5 100644 --- a/jupyter_server/tests/test_files.py +++ b/jupyter_server/tests/test_files.py @@ -57,7 +57,7 @@ def test_hidden_files(self): r = self.request('GET', url_path_join('files', d, foo)) self.assertEqual(r.status_code, 404) - self.notebook.contents_manager.allow_hidden = True + self.server.contents_manager.allow_hidden = True try: for d in not_hidden: path = pjoin(rootdir, d.replace('/', os.sep)) @@ -75,7 +75,7 @@ def test_hidden_files(self): r.raise_for_status() self.assertEqual(r.text, foo) finally: - self.notebook.contents_manager.allow_hidden = False + self.server.contents_manager.allow_hidden = False def test_contents_manager(self): "make sure ContentsManager returns right files (ipynb, bin, txt)."