
Raise plugin exceptions on Worker.start() #4298

Merged: 17 commits merged into dask:main from pentschev:raise-worker-plugin-exceptions on Jan 18, 2022

Conversation

pentschev (Member):

This PR raises plugin exceptions in Worker.start() that were so far being silently ignored.

Fixes #4297
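
For context, a minimal sketch of the failure mode being fixed (illustrative only; the plugin class and names below are not from the PR):

from distributed import Worker, WorkerPlugin

class FailingPlugin(WorkerPlugin):
    def setup(self, worker):
        # Any exception raised here used to be swallowed during Worker.start();
        # with this PR it propagates to whoever starts the worker.
        raise RuntimeError("plugin setup failed")

# async with Worker(scheduler_address, plugins={FailingPlugin()}) as w:
#     ...  # with this change, entering the context raises RuntimeError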

Review thread on the new code in Worker.start() (distributed/worker.py):

    *[self.plugin_add(plugin=plugin) for plugin in self._pending_plugins]
)
for msg in plugins_msgs:
    if msg["status"] != "OK":
        raise msg["exception"].data

pentschev (Member, Author):

I'm not sure if this is the right way of raising the exception that's being created by distributed.core.error_message. The object in msg["exception"] is delivered as a distributed.protocol.Serialized, but there seems to be no exposed way to extract its contents other than accessing the data attribute directly.

I'm not sure if this is also being handled correctly or if this is a bug in error_message. Any other suggestions here are appreciated.

Member:

I suspect that if you just remove the try-except and special handling in the plugin_add function that things may just work. I suspect that the standard machinery around rpc will handle the exception comfortably.

Member:

I'm not sure though, this is just a guess. I give it a 50/50 chance.

pentschev (Member, Author):

I think your suggestion would work in Worker.start(), but plugin_add is also used as a handler in https://github.com/dask/distributed/blob/master/distributed/worker.py#L662 and I think that would break it. Of course, I could have two different functions: one that raises exceptions in place for Worker.start() and another to be used as a handler.

There's also a chance that I'm overlooking something and your suggestion would work in all situations; is that the case?

Member:

Functions used as handlers handle exceptions in the same way that this function is doing manually. I recommend trying things, seeing if everything passes, and reporting back.

pentschev (Member, Author):

Sure, I did that now. What I wanted to prevent is breaking some untested behavior: just as this isn't being tested currently, I believe removing the handling may break some other use case out there.

mrocklin (Member):

Ah, perhaps we could add a small test then? I suspect that this would be the same as your current test, but you would use a pre-existing worker (just use gen_cluster rather than creating things manually) and then call the client.register_plugin method (I think that's the name, but I'm not sure).
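
A rough sketch of what such a test could look like (assuming the client-side entry point is register_worker_plugin and that plugin setup errors propagate back to the caller; this is not necessarily the test that ended up in the PR):

import pytest

from distributed import WorkerPlugin
from distributed.utils_test import gen_cluster

class FailingSetupPlugin(WorkerPlugin):
    def setup(self, worker):
        raise ValueError("Setup failed")

@gen_cluster(client=True)
async def test_register_failing_plugin(c, s, a, b):
    # Registering the plugin runs setup() on the pre-existing workers,
    # so the ValueError should surface here.
    with pytest.raises(ValueError, match="Setup failed"):
        await c.register_worker_plugin(FailingSetupPlugin())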

pentschev (Member, Author):

Thanks Matt, I really wasn't aware of how to use that properly. I've now added a test for register_worker_plugin, but I had to revert my previous commit, as the exception type doesn't seem to get propagated without the try/except block you suggested removing. I guess the serialization step is indeed required for handlers.

If this is still too dirty, the only idea I have right now to avoid the raise msg["exception"].data is to have two different functions for plugin_add: one that serializes the exception and another that doesn't.

Review thread on the new test in distributed/tests/test_worker.py:

def setup(self, worker=None):
    raise MyException("Foo")

s = await Scheduler(port=8007)

Member:

I recommend using `async with Scheduler(port=0) as s:` instead.
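
A rough sketch of the suggested pattern (MyException follows the snippet above; MyPlugin is a hypothetical name for the plugin class whose setup raises it; illustrative, not the final test):

import pytest

from distributed import Scheduler, Worker

@pytest.mark.asyncio
async def test_plugin_exception():
    # port=0 lets the OS pick a free port instead of hard-coding 8007
    async with Scheduler(port=0) as s:
        with pytest.raises(MyException, match="Foo"):
            async with Worker(s.address, plugins={MyPlugin()}):
                pass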

jrbourbeau (Member) left a comment:

Thanks for the PR @pentschev!

distributed/tests/test_worker.py (outdated review thread, resolved)
@@ -407,6 +407,29 @@ def __str__(self):
assert "Bar" in str(e.__cause__)


@pytest.mark.asyncio
async def test_plugin_exception():
Member:

Suggested change:
- async def test_plugin_exception():
+ async def test_plugin_exception(cleanup):

distributed/tests/test_worker.py (two further outdated review threads, resolved)

pentschev (Member, Author):

@jrbourbeau @mrocklin you both had pretty much the same comments on the tests at the same time, so I'm responding here. The test was mostly a copy of another test; since my first attempts with async with ... had failed and the previous solution with await worked, I chose the latter. Now, with both of your suggestions, everything works and all test-related changes should be in, but please let me know if I missed something.

pentschev (Member, Author):

Any other suggestions on improving this PR, or are we ok with its current state? The failing test seems unrelated too, but correct me if I'm mistaken.

pentschev (Member, Author):

Friendly ping here, do we have anything left/other suggestions to improve this PR?

mrocklin (Member):

Sorry for the delay @pentschev. I and others have been busy recently. I would like for someone (maybe me) to take a look at why the exception isn't being passed through; I don't want our code to manually pack and unpack exceptions if we don't have to. I'll put this on my holiday TODO list (which starts at the end of next week); hopefully I have time to look at it before then.

pentschev (Member, Author):

No worries, thanks @mrocklin. I'm also happy to work on this; I just want to be sure what the generally "right" way of handling such cases is. Also, this isn't very high priority, so no pressure; I just wanted to make sure it wasn't forgotten. :)

Base automatically changed from master to main March 8, 2021 19:04
pentschev (Member, Author):

@jrbourbeau @mrocklin if there are no further changes needed, could we merge this?

The failing tests are #5494, and the other pytest process seems to have died without reporting final results in https://github.com/dask/distributed/runs/4304808476?check_suite_focus=true, but it seems unrelated to me.

quasiben (Member) commented on Dec 2, 2021:

@florian-jetter-by as you've done some work on the worker recently, would you have any issue with us merging this in today or tomorrow?

mrocklin (Member) commented on Dec 2, 2021:

Pinging me these days probably isn't effective.

Also, cc'ing @fjetter (Florian's more standard username)

Comment on lines 1500 to 1502
for msg in plugins_msgs:
    if msg["status"] != "OK":
        raise pickle.loads(msg["exception"].data)

fjetter (Member):

I'm wondering what happens if multiple plugins raise an exception. Do we at least log all of them? If not, should we iterate over all of them, log them, and only afterwards raise?

FWIW, I think this would be a great application for the exception groups proposed (and accepted) in PEP 654. They're not implemented yet; just a heads-up.
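
For illustration, a rough sketch of the log-all-then-raise-one idea (names follow the snippet above; this is not the code that was eventually merged):

failures = []
for msg in plugins_msgs:
    if msg["status"] != "OK":
        failures.append(pickle.loads(msg["exception"].data))
for exc in failures:
    logger.error("Worker plugin failed during setup: %r", exc)
if failures:
    # With PEP 654, an ExceptionGroup could carry all failures at once.
    raise failures[0]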

pentschev (Member, Author):

Done in 7552423.

Comment on lines 1497 to 1499
plugins_msgs = await asyncio.gather(
    *(self.plugin_add(plugin=plugin) for plugin in self._pending_plugins)
)

Member:

It's also possible for plugin_add to raise other exceptions (e.g. unpickling errors). What would happen in those cases? I assume it fails hard, but I would like to confirm. Do we have tests for this?

pentschev (Member, Author):

Added a test for that in d323913.

fjetter (Member) left a comment:

LGTM. Once green-ish we can merge

pentschev (Member, Author):

Failing gpuCI tests are being addressed by #5540 and #5562.

pentschev (Member, Author):

It seems the Windows failures are unrelated to this. The failures from https://github.com/dask/distributed/runs/4409717678?check_suite_focus=true are #5494, plus test_worker_reconnects_mid_compute/test_worker_reconnects_mid_compute_multiple_states_on_scheduler, which don't rely on worker plugins. The other failure from https://github.com/dask/distributed/runs/4409717724?check_suite_focus=true looks like a general timeout, with most tests not even running. And the gpuCI failures are as per #4298 (comment).

Are we good to merge this @fjetter?

pentschev (Member, Author):

Can we merge this @fjetter @jrbourbeau?

quasiben mentioned this pull request on Jan 13, 2022.

Review thread on the updated code in Worker.start():

plugins_exceptions = []
for msg in plugins_msgs:
    if msg["status"] != "OK":
        exc = pickle.loads(msg["exception"].data)

Collaborator:

The pickle-unpickle here feels a little cumbersome to me. I'm not sure I love this idea, but what if you add a catch_errors=True to plugin_add, and in start just set catch_errors=False?

diff --git a/distributed/worker.py b/distributed/worker.py
index 6e2fc56b..f4e3ed03 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1494,22 +1494,12 @@ class Worker(ServerNode):
 
         setproctitle("dask-worker [%s]" % self.address)
 
-        plugins_msgs = await asyncio.gather(
-            *(self.plugin_add(plugin=plugin) for plugin in self._pending_plugins)
+        await asyncio.gather(
+            *(
+                self.plugin_add(plugin=plugin, catch_errors=False)
+                for plugin in self._pending_plugins
+            )
         )
-        plugins_exceptions = []
-        for msg in plugins_msgs:
-            if msg["status"] != "OK":
-                exc = pickle.loads(msg["exception"].data)
-                plugins_exceptions.append(pickle.loads(msg["exception"].data))
-        if len(plugins_exceptions) >= 1:
-            if len(plugins_exceptions) > 1:
-                logger.error(
-                    "Multiple plugin exceptions raised. All exceptions will be logged, the first is raised."
-                )
-                for exc in plugins_exceptions:
-                    logger.error(repr(exc))
-            raise plugins_exceptions[0]
 
         self._pending_plugins = ()
 
@@ -3182,7 +3172,7 @@ class Worker(ServerNode):
     def run_coroutine(self, comm, function, args=(), kwargs=None, wait=True):
         return run(self, comm, function=function, args=args, kwargs=kwargs, wait=wait)
 
-    async def plugin_add(self, comm=None, plugin=None, name=None):
+    async def plugin_add(self, comm=None, plugin=None, name=None, catch_errors=True):
         with log_errors(pdb=False):
             if isinstance(plugin, bytes):
                 plugin = pickle.loads(plugin)
@@ -3204,6 +3194,8 @@ class Worker(ServerNode):
                     if isawaitable(result):
                         result = await result
                 except Exception as e:
+                    if not catch_errors:
+                        raise
                     msg = error_message(e)
                     return msg

If you wanted to maintain the behavior you have here where multiple plugin exceptions can be handled and logged, then you could pass gather(..., return_exceptions=True) and handle them similarly as here, just without the unpickling. But I'm not sure if this functionality is actually necessary.
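
A rough sketch of that variant, combining catch_errors=False with return_exceptions=True (names follow the diff above; illustrative, not the merged code):

results = await asyncio.gather(
    *(
        self.plugin_add(plugin=plugin, catch_errors=False)
        for plugin in self._pending_plugins
    ),
    return_exceptions=True,
)
failures = [r for r in results if isinstance(r, Exception)]
for exc in failures:
    logger.error("Worker plugin failed during setup: %r", exc)
if failures:
    raise failures[0]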

pentschev (Member, Author):

> The pickle-unpickle here feels a little cumbersome to me. I'm not sure I love this idea, but what if you add a catch_errors=True to plugin_add, and in start just set catch_errors=False?

Thanks for the suggestion, that indeed looks better.

> If you wanted to maintain the behavior you have here where multiple plugin exceptions can be handled and logged, then you could pass gather(..., return_exceptions=True) and handle them similarly as here, just without the unpickling. But I'm not sure if this functionality is actually necessary.

I'm ok with having this or not; it was suggested by @fjetter in #4298 (comment). For now I'm keeping it, but we can remove it if people prefer.

Relevant changes applied in 560bb55.

)
if len(plugins_exceptions) >= 1:

Collaborator:

I think plugins_exceptions is not just the exceptions; it contains all the values returned by plugin_add, including OK messages. I think you'd want plugins_exceptions = [x for x in plugins_results if isinstance(x, Exception)] first.

pentschev (Member, Author):

I don't think so; that was the case with plugins_msgs (the previous pickled version), but now, adding print(f"plugins_exceptions: {plugins_exceptions}") just above this line, I see the following for distributed/tests/test_worker.py::test_plugin_exception:

plugins_exceptions: [ValueError('Setup failed')]

and for distributed/tests/test_worker.py::test_plugin_internal_exception I see:

plugins_exceptions: [UnicodeDecodeError('utf-8', b'orrupting pickle\x80\x04\x95e\x02\x00\x00\x00\x00\x00\x00\x8c\x17cloudpickle.cloudpickle\x94\x8c\r_builtin_type\x94\x93\x94\x8c', 16, 17, 'invalid start byte')]

Collaborator:

Try having the plugin not raise an exception and see what happens.

pentschev (Member, Author):

Yes, you're absolutely right, thanks for pointing it out. This is fixed now by dc3aaeb. Could you take another look when you have a chance?

gjoseph92 (Collaborator) left a comment:

LGTM besides this small nit

distributed/worker.py (outdated review thread, resolved)
pentschev (Member, Author):

A quick look at the failing tests seems to point to timeouts for all failures. Once the release is complete, can we get this merged?

quasiben (Member):

With the 2022.01.0 release completed and the last comments addressed, I'm going to merge this PR.

Thank you @pentschev for the work and @fjetter / @gjoseph92 for the reviews.

quasiben merged commit 4c8ebfd into dask:main on Jan 18, 2022.

pentschev (Member, Author):

Thanks everyone who reviewed and @quasiben for merging!

pentschev deleted the raise-worker-plugin-exceptions branch on January 18, 2022 at 22:42.

Successfully merging this pull request may close these issues: Handling exceptions in Worker plugins

6 participants