From 128c983b33362a870b9ff0dac8fec1eeda99a651 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 2 Dec 2020 10:13:35 -0800 Subject: [PATCH 01/14] Raise plugin exceptions on Worker.start() --- distributed/worker.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/distributed/worker.py b/distributed/worker.py index 83e63e4a737..2764a0d1fc9 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1139,9 +1139,12 @@ async def start(self): setproctitle("dask-worker [%s]" % self.address) - await asyncio.gather( + plugins_msgs = await asyncio.gather( *[self.plugin_add(plugin=plugin) for plugin in self._pending_plugins] ) + for msg in plugins_msgs: + if msg["status"] != "OK": + raise msg["exception"].data self._pending_plugins = () await self._register_with_scheduler() From 3224d42d5a102f185c6f2d951e631a718f9c07ee Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 2 Dec 2020 10:35:37 -0800 Subject: [PATCH 02/14] Add test for Worker plugin exceptions --- distributed/tests/test_worker.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index b41e174bf5f..f4e2213ae31 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -407,6 +407,24 @@ def __str__(self): assert "Bar" in str(e.__cause__) +@pytest.mark.asyncio +async def test_plugin_exception(): + class MyException(Exception): + def __init__(self, msg): + self.msg = msg + + def __str__(self): + return "MyException(%s)" % self.msg + + class MyPlugin: + def setup(self, worker=None): + raise MyException("Foo") + + s = await Scheduler(port=8007) + with pytest.raises(MyException): + await Worker(s.address, plugins={MyPlugin(),}) + + @gen_cluster() async def test_gather(s, a, b): b.data["x"] = 1 From 3eae880aff92491af439d2d16ee310ca6e54b56b Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 2 Dec 2020 10:58:33 -0800 Subject: [PATCH 03/14] 
Fix black formatting --- distributed/tests/test_worker.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index f4e2213ae31..efb2c80ae20 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -422,7 +422,12 @@ def setup(self, worker=None): s = await Scheduler(port=8007) with pytest.raises(MyException): - await Worker(s.address, plugins={MyPlugin(),}) + await Worker( + s.address, + plugins={ + MyPlugin(), + }, + ) @gen_cluster() From fa81d3d6f0861f1720dd7023034176885b05e464 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 2 Dec 2020 14:48:18 -0800 Subject: [PATCH 04/14] Apply review suggestions to test_plugin_exception --- distributed/tests/test_worker.py | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index efb2c80ae20..bc256ae10e8 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -408,26 +408,20 @@ def __str__(self): @pytest.mark.asyncio -async def test_plugin_exception(): - class MyException(Exception): - def __init__(self, msg): - self.msg = msg - - def __str__(self): - return "MyException(%s)" % self.msg - +async def test_plugin_exception(cleanup): class MyPlugin: def setup(self, worker=None): - raise MyException("Foo") + raise ValueError("Setup failed") - s = await Scheduler(port=8007) - with pytest.raises(MyException): - await Worker( - s.address, - plugins={ - MyPlugin(), - }, - ) + async with Scheduler(port=0) as s: + with pytest.raises(ValueError, match="Setup failed"): + async with Worker( + s.address, + plugins={ + MyPlugin(), + }, + ) as w: + pass @gen_cluster() From cc3bec5bcf9d5575acf27c2b0252cb9f6224bc33 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 3 Dec 2020 07:52:52 -0800 Subject: [PATCH 05/14] Raise exceptions in plugin_add rather than 
serializing them --- distributed/worker.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 2764a0d1fc9..048137d2498 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1139,12 +1139,9 @@ async def start(self): setproctitle("dask-worker [%s]" % self.address) - plugins_msgs = await asyncio.gather( + await asyncio.gather( *[self.plugin_add(plugin=plugin) for plugin in self._pending_plugins] ) - for msg in plugins_msgs: - if msg["status"] != "OK": - raise msg["exception"].data self._pending_plugins = () await self._register_with_scheduler() @@ -2392,13 +2389,9 @@ async def plugin_add(self, comm=None, plugin=None, name=None): logger.info("Starting Worker plugin %s" % name) if hasattr(plugin, "setup"): - try: - result = plugin.setup(worker=self) - if isawaitable(result): - result = await result - except Exception as e: - msg = error_message(e) - return msg + result = plugin.setup(worker=self) + if isawaitable(result): + result = await result return {"status": "OK"} From f85475fbb673e99c9ee45a5296401938d475d870 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 3 Dec 2020 09:41:29 -0800 Subject: [PATCH 06/14] Revert "Raise exceptions in plugin_add rather than serializing them" This reverts commit cc3bec5bcf9d5575acf27c2b0252cb9f6224bc33. 
--- distributed/worker.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 048137d2498..2764a0d1fc9 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1139,9 +1139,12 @@ async def start(self): setproctitle("dask-worker [%s]" % self.address) - await asyncio.gather( + plugins_msgs = await asyncio.gather( *[self.plugin_add(plugin=plugin) for plugin in self._pending_plugins] ) + for msg in plugins_msgs: + if msg["status"] != "OK": + raise msg["exception"].data self._pending_plugins = () await self._register_with_scheduler() @@ -2389,9 +2392,13 @@ async def plugin_add(self, comm=None, plugin=None, name=None): logger.info("Starting Worker plugin %s" % name) if hasattr(plugin, "setup"): - result = plugin.setup(worker=self) - if isawaitable(result): - result = await result + try: + result = plugin.setup(worker=self) + if isawaitable(result): + result = await result + except Exception as e: + msg = error_message(e) + return msg return {"status": "OK"} From dde7a0976a8de6372b9dcfb044d78cb0893ee8a8 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 3 Dec 2020 09:43:06 -0800 Subject: [PATCH 07/14] Add test for exceptions in register_worker_plugin --- distributed/tests/test_client.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 5712c6a9315..6da6e3ea8f0 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -6270,6 +6270,16 @@ async def test_get_task_metadata_multiple(c, s, a, b): assert metadata2[f2.key] == s.tasks.get(f2.key).metadata +@gen_cluster(client=True) +async def test_register_worker_plugin_exception(c, s, a, b): + class MyPlugin(): + def setup(self, worker=None): + raise ValueError("Setup failed") + + with pytest.raises(ValueError, match="Setup failed"): + await c.register_worker_plugin(MyPlugin()) + + @gen_cluster(client=True) 
async def test_log_event(c, s, a, b): From df4a6cb3db9b9c404d50c75d548f6b050ec43964 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 3 Dec 2020 13:31:42 -0800 Subject: [PATCH 08/14] Fix test_register_worker_plugin_exception --- distributed/tests/test_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 6da6e3ea8f0..4a66ee8d39c 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -6272,7 +6272,7 @@ async def test_get_task_metadata_multiple(c, s, a, b): @gen_cluster(client=True) async def test_register_worker_plugin_exception(c, s, a, b): - class MyPlugin(): + class MyPlugin: def setup(self, worker=None): raise ValueError("Setup failed") From 5604af52c2ed94ae77cb694de5583affdeaa1eed Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 23 Nov 2021 13:12:18 -0800 Subject: [PATCH 09/14] Unpickle plugin exceptions --- distributed/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/worker.py b/distributed/worker.py index c5be0bacb77..014eaa3a7e4 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1499,7 +1499,7 @@ async def start(self): ) for msg in plugins_msgs: if msg["status"] != "OK": - raise msg["exception"].data + raise pickle.loads(msg["exception"].data) self._pending_plugins = () await self._register_with_scheduler() From d3239133d1b0adc7c4732b5b60fd8fb60231927c Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Fri, 3 Dec 2021 06:09:01 -0800 Subject: [PATCH 10/14] Add test for internal plugin_add exceptions --- distributed/tests/test_worker.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 79f16e6613d..bcc3a7f342f 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -38,6 +38,7 @@ from distributed.diagnostics import nvml 
from distributed.diagnostics.plugin import PipInstall from distributed.metrics import time +from distributed.protocol import pickle from distributed.scheduler import Scheduler from distributed.utils import TimeoutError from distributed.utils_test import ( @@ -395,6 +396,19 @@ def setup(self, worker=None): pass +@pytest.mark.asyncio +async def test_plugin_internal_exception(cleanup): + async with Scheduler(port=0) as s: + with pytest.raises(UnicodeDecodeError, match="codec can't decode"): + async with Worker( + s.address, + plugins={ + b"corrupting pickle" + pickle.dumps(lambda: None, protocol=4), + }, + ) as w: + pass + + @gen_cluster(client=True) async def test_gather(c, s, a, b): x, y = await c.scatter(["x", "y"], workers=[b.address]) From 7552423149be6a123c35d6f642a969f976d0fbbc Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Fri, 3 Dec 2021 07:14:15 -0800 Subject: [PATCH 11/14] Log all plugin exceptions, raise first --- distributed/tests/test_worker.py | 28 ++++++++++++++++++++++++++++ distributed/worker.py | 13 ++++++++++++- 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index bcc3a7f342f..3cd293f4349 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -396,6 +396,34 @@ def setup(self, worker=None): pass +@pytest.mark.asyncio +async def test_plugin_multiple_exceptions(cleanup): + class MyPlugin1: + def setup(self, worker=None): + raise ValueError("MyPlugin1 Error") + + class MyPlugin2: + def setup(self, worker=None): + raise RuntimeError("MyPlugin2 Error") + + async with Scheduler(port=0) as s: + # There's no guarantee on the order of which exception is raised first + with pytest.raises((ValueError, RuntimeError), match="MyPlugin.* Error"): + with captured_logger("distributed.worker") as logger: + async with Worker( + s.address, + plugins={ + MyPlugin1(), + MyPlugin2(), + }, + ) as w: + pass + + text = logger.getvalue() + assert 
"MyPlugin1 Error" in text + assert "MyPlugin2 Error" in text + + @pytest.mark.asyncio async def test_plugin_internal_exception(cleanup): async with Scheduler(port=0) as s: diff --git a/distributed/worker.py b/distributed/worker.py index 014eaa3a7e4..6e2fc56b14f 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1497,9 +1497,20 @@ async def start(self): plugins_msgs = await asyncio.gather( *(self.plugin_add(plugin=plugin) for plugin in self._pending_plugins) ) + plugins_exceptions = [] for msg in plugins_msgs: if msg["status"] != "OK": - raise pickle.loads(msg["exception"].data) + exc = pickle.loads(msg["exception"].data) + plugins_exceptions.append(pickle.loads(msg["exception"].data)) + if len(plugins_exceptions) >= 1: + if len(plugins_exceptions) > 1: + logger.error( + "Multiple plugin exceptions raised. All exceptions will be logged, the first is raised." + ) + for exc in plugins_exceptions: + logger.error(repr(exc)) + raise plugins_exceptions[0] + self._pending_plugins = () await self._register_with_scheduler() From 560bb55ea9c918faa495024af46f5c5ba47f6c78 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 13 Jan 2022 13:33:07 -0800 Subject: [PATCH 12/14] Raise plugin_add exceptions rather than pickling them --- distributed/worker.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index e045b8dd6f1..a1c18747e7a 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1504,14 +1504,13 @@ async def start(self): setproctitle("dask-worker [%s]" % self.address) - plugins_msgs = await asyncio.gather( - *(self.plugin_add(plugin=plugin) for plugin in self._pending_plugins) + plugins_exceptions = await asyncio.gather( + *( + self.plugin_add(plugin=plugin, catch_errors=False) + for plugin in self._pending_plugins + ), + return_exceptions=True, ) - plugins_exceptions = [] - for msg in plugins_msgs: - if msg["status"] != "OK": - exc = 
pickle.loads(msg["exception"].data) - plugins_exceptions.append(pickle.loads(msg["exception"].data)) if len(plugins_exceptions) >= 1: if len(plugins_exceptions) > 1: logger.error( @@ -3262,7 +3261,7 @@ def run(self, comm, function, args=(), wait=True, kwargs=None): def run_coroutine(self, comm, function, args=(), kwargs=None, wait=True): return run(self, comm, function=function, args=args, kwargs=kwargs, wait=wait) - async def plugin_add(self, comm=None, plugin=None, name=None): + async def plugin_add(self, comm=None, plugin=None, name=None, catch_errors=True): with log_errors(pdb=False): if isinstance(plugin, bytes): plugin = pickle.loads(plugin) @@ -3284,6 +3283,8 @@ async def plugin_add(self, comm=None, plugin=None, name=None): if isawaitable(result): result = await result except Exception as e: + if not catch_errors: + raise msg = error_message(e) return msg From dc3aaeb6932103f4b29df3ee0b6ad956b98eb48f Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Fri, 14 Jan 2022 02:19:24 -0800 Subject: [PATCH 13/14] Raise only on plugin_add exceptions --- distributed/worker.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/distributed/worker.py b/distributed/worker.py index a1c18747e7a..a4f7dcef245 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1504,13 +1504,17 @@ async def start(self): setproctitle("dask-worker [%s]" % self.address) - plugins_exceptions = await asyncio.gather( + plugins_msgs = await asyncio.gather( *( self.plugin_add(plugin=plugin, catch_errors=False) for plugin in self._pending_plugins ), return_exceptions=True, ) + plugins_exceptions = [] + for msg in plugins_msgs: + if isinstance(msg, Exception): + plugins_exceptions.append(msg) if len(plugins_exceptions) >= 1: if len(plugins_exceptions) > 1: logger.error( From d44edb45d10574f17773ad1d8fa143d3047ad788 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Fri, 14 Jan 2022 19:17:21 +0100 Subject: [PATCH 14/14] Use list comprehension to 
generate plugins_exceptions Co-authored-by: Gabe Joseph --- distributed/worker.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index a4f7dcef245..25148958f9d 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1511,10 +1511,7 @@ async def start(self): ), return_exceptions=True, ) - plugins_exceptions = [] - for msg in plugins_msgs: - if isinstance(msg, Exception): - plugins_exceptions.append(msg) + plugins_exceptions = [msg for msg in plugins_msgs if isinstance(msg, Exception)] if len(plugins_exceptions) >= 1: if len(plugins_exceptions) > 1: logger.error(