Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raise plugin exceptions on Worker.start() #4298

Merged
merged 17 commits into from
Jan 18, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,24 @@ def __str__(self):
assert "Bar" in str(e.__cause__)


@pytest.mark.asyncio
async def test_plugin_exception():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async def test_plugin_exception():
async def test_plugin_exception(cleanup):

class MyException(Exception):
def __init__(self, msg):
self.msg = msg

def __str__(self):
return "MyException(%s)" % self.msg

class MyPlugin:
def setup(self, worker=None):
raise MyException("Foo")
mrocklin marked this conversation as resolved.
Show resolved Hide resolved

s = await Scheduler(port=8007)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend using `async with Scheduler(port=0) as s:

jrbourbeau marked this conversation as resolved.
Show resolved Hide resolved
with pytest.raises(MyException):
await Worker(s.address, plugins={MyPlugin(),})


@gen_cluster()
async def test_gather(s, a, b):
b.data["x"] = 1
Expand Down
5 changes: 4 additions & 1 deletion distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1139,9 +1139,12 @@ async def start(self):

setproctitle("dask-worker [%s]" % self.address)

await asyncio.gather(
plugins_msgs = await asyncio.gather(
*[self.plugin_add(plugin=plugin) for plugin in self._pending_plugins]
)
for msg in plugins_msgs:
if msg["status"] != "OK":
raise msg["exception"].data
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is the right way of raising the exception that's being created by distributed.core.error_message. The object in msg["exception"] is delivered as a distributed.protocol.Serialized, but there seems to be no way exposed to extract its data other than just addressing the data attributed directly.

I'm not sure if this is also being handled correctly or if this is a bug in error_message. Any other suggestions here are appreciated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect that if you just remove the try-except and special handling in the plugin_add function that things may just work. I suspect that the standard machinery around rpc will handle the exception comfortably.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure though, this is just a guess. I give it a 50/50 chance.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think your suggestion would work in Worker.start(), but plugin_add is used also as a handler in https://github.com/dask/distributed/blob/master/distributed/worker.py#L662 and I think that would break it. Of course, I could have two different functions, one that raises exceptions in place for Worker.start() and another to be used as a handler.

There's also a chance that I'm overlooking something and your suggestion would work in all situations, is that the case?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functions used as handlers handle exceptions in the same way that this function is doing manually. I recommend trying things, seeing if everything passes, and reporting back.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I did that now. What I wanted to prevent is breaking some untested behavior, just like this isn't being tested currently I believe that may break some other use case out there.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, perhaps we could add a small test then? I suspect that this would be the same as your current test, but you would use a pre-existing worker (just use gen_cluster rather than create things manually) and then call the client.register_plugin method (I think that that's the name but am not sure)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Matt, I really wasn't aware how to use that properly. I now added a test for register_worker_plugin, but I had to revert my previous commit, as the exception type doesn't seem to get propagated with the try/except block you suggested removing. I guess the serialization step is indeed required for handlers.

If this is still too dirty, the only idea I have right now to avoid the raise msg["exception"].data is to have two different functions for plugin_add, one that serializes the exception, and another that doesn't.

self._pending_plugins = ()

await self._register_with_scheduler()
Expand Down