Fix decide_worker picking a closing worker #8032
Conversation
@@ -8205,6 +8202,7 @@ def decide_worker(
         candidates = set(all_workers)
     else:
         candidates = {wws for dts in ts.dependencies for wws in dts.who_has}
+        candidates &= all_workers
This fixes #8019
FWIW I think this is a situation where an actual decide_worker unit test would be appropriate.
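A hedged sketch of what such a unit test could look like, using plain stand-in classes instead of distributed's real TaskState/WorkerState and only mirroring the patched lines above, so it illustrates the candidate-selection logic rather than exercising the real decide_worker:

class FakeWorker:
    # Stand-in for WorkerState; hashable by identity, which is all we need here.
    def __init__(self, name):
        self.name = name

class FakeDep:
    # Stand-in for a dependency TaskState exposing who_has.
    def __init__(self, who_has):
        self.who_has = who_has

class FakeTask:
    # Stand-in for the TaskState being scheduled.
    def __init__(self, dependencies):
        self.dependencies = dependencies

def who_has_candidates(ts, all_workers):
    # Mirrors the patched else-branch: candidates come from who_has,
    # then are restricted to workers the scheduler still knows about.
    candidates = {wws for dts in ts.dependencies for wws in dts.who_has}
    candidates &= all_workers
    return candidates

def test_closing_worker_is_not_a_candidate():
    a, b = FakeWorker("a"), FakeWorker("b")
    ts = FakeTask({FakeDep({a, b})})
    # b has already been dropped from all_workers (e.g. it is closing),
    # so it must not come out as a candidate.
    assert who_has_candidates(ts, all_workers={a}) == {a}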
Force-pushed from a1cbca6 to 882a546.
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
20 files ±0   20 suites ±0   11h 58m 7s ⏱️ +1h 37m 24s
For more details on these failures, see this check.
Results for commit 80b36c9. ± Comparison against base commit a7f7764.
This pull request removes 5 and adds 8 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results.
in_update_graph = asyncio.Event()

async def update_graph(*args, **kwargs):
    in_update_graph.set()
    await async_poll_for(
        lambda: b_ws.status == Status.closing, timeout=5, period=0
    )
    s.update_graph(*args, **kwargs)
    nonlocal done_update_graph
    done_update_graph = True

s.stream_handlers["update-graph"] = update_graph
await in_update_graph.wait()
Given the problem, I would've expected something like
def block(arg, enter, exit):
    enter.set()
    exit.wait()
    return arg

enter = Event()
exit = Event()
enter2 = Event()
exit2 = Event()

d1 = c.submit(inc, 0, key='d1', workers=["A"])
d2 = c.submit(block, 1, enter=enter, exit=exit, key='d2', workers=["B"])
x = c.submit(sum, [d1, d2], key='x')
block_executor = c.submit(block, None, enter=enter2, exit=exit2, key='x', workers=["B"])

await enter.wait()
await enter2.wait()
await asyncio.gather(
    exit.set(),
    B.close(),
)
I haven't tested the above and it may still need fine-tuning, but I would expect something like this to trigger the condition you're talking about: d2 completes while B is closing, such that when d2 finishes, x is transitioned while B is still closing.
I don't entirely understand why we need update_graph to trigger this condition
Especially considering that update_graph (right now) is still synchronous.
I'm not too happy with my test, either. But I don't think your suggestion works (read below).
What we're trying to test: decide_worker_non_rootish(ts) is called on a task with workers=[b.address], allow_other_workers=True,
- with the dependencies of the task partially on a and partially on b,
- while b is in closing status, and
- with decide_worker(ts, valid_workers=set(), all_workers={a}), which would pick b from ts.dependencies[...].who_has (fewer dependency bytes would need to be transferred to it) but instead picks a because b is not in all_workers.
A rough sketch of this layout follows below.
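For illustration only, a hedged sketch of that layout. This is not the test in this PR, it deliberately ignores the timing problem discussed next, and the gen_cluster fixture and task bodies are assumptions.

from distributed.utils_test import gen_cluster, inc

@gen_cluster(client=True)
async def test_layout_sketch(c, s, a, b):
    # A small dependency on a and a large one on b, so that by bytes alone
    # decide_worker would prefer b.
    small = c.submit(inc, 0, key="small", workers=[a.address])
    big = c.submit(lambda: b"x" * 2**20, key="big", workers=[b.address])
    # The task under test: restricted to b, but with allow_other_workers=True.
    # Per the description above, once b is closing this resolves to
    # decide_worker(ts, valid_workers=set(), all_workers={a}).
    x = c.submit(lambda *args: None, small, big, key="x",
                 workers=[b.address], allow_other_workers=True)
    await x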
The problem with writing the test is that we need to time update_graph to land exactly during the 1-2 event loop cycles while the worker is in closing status.
The worker transitions from closing to being removed when the batched comms collapse, here (distributed/distributed/scheduler.py, lines 5700 to 5705 in f0303aa):
finally:
    if worker in self.stream_comms:
        worker_comm.abort()
        await self.remove_worker(
            worker, stimulus_id=f"handle-worker-cleanup-{time()}"
        )
As an alternative to monkey-patching update-graph, I could have
- monkey-patched Scheduler.remove_worker. In hindsight that's a better idea; I'll have a look at it now.
- synchronously called update_graph directly on the scheduler and updated the state on the client by hand (complicated and brittle).
In your code: I think that the scheduler will never receive the task-finished message for d2, since it would be sent a whole two event loop cycles after Worker.close collapses the batched comms.
I'm not sure why you think the scheduler should receive {op: task-finished, key: d2} deterministically after {op: worker-status-change, worker: <b>, status: closing}, but deterministically before the collapse of the TCP channel?
Also, I can't understand the purpose of block_executor.
"but deterministically before the collapse of the TCP channel?"
I never said it would do so deterministically. I said I would expect something like this to trigger the condition you described. I also said it would require more fine-tuning, like distributing nbytes properly and maybe introducing an event somewhere.
I got 2 failures out of 400 runs; neither seems to be related to this PR.
    await wait_remove_worker.wait()
    return await orig_remove_worker(*args, **kwargs)

monkeypatch.setattr(s, "remove_worker", remove_worker)
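For context, the quoted lines presumably sit inside a setup roughly like the following; everything outside the quoted excerpt is an assumption reconstructed from names visible here and earlier in the thread, not the PR's actual test code.

import asyncio

in_remove_worker = asyncio.Event()
wait_remove_worker = asyncio.Event()
orig_remove_worker = s.remove_worker

async def remove_worker(*args, **kwargs):
    in_remove_worker.set()           # signal: the scheduler reached remove_worker
    await wait_remove_worker.wait()  # hold the worker in "closing" until released
    return await orig_remove_worker(*args, **kwargs)

monkeypatch.setattr(s, "remove_worker", remove_worker)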
I'm OK-ish with using monkeypatch here. However, just for the sake of posterity, there is also a way to use our RPC mechanism more naturally. Essentially you want to intercept the point in time just when a request handler is called. You can make this very explicit:
async def new_remove_worker_handler_with_events(self, *args, **kwargs):
    in_remove_worker.set()
    await wait_remove_worker.wait()
    return await self.remove_worker(*args, **kwargs)

s.handlers['unregister'] = new_remove_worker_handler_with_events
Semantically, this overrides the unregister handler and replaces it with a new one. However, in the end it's the same thing; only the way the patch is installed differs.
We're not arriving here from the unregister handler. We're arriving from distributed/distributed/scheduler.py, lines 5700 to 5705 in f0303aa:
finally:
    if worker in self.stream_comms:
        worker_comm.abort()
        await self.remove_worker(
            worker, stimulus_id=f"handle-worker-cleanup-{time()}"
        )
L = c.map(
    inc,
    range(10),
Do we actually need a map for this? This feels much more difficult to control than if we used single tasks with specific placement.
You're right. Simplified.
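For reference, a hedged illustration of what "single tasks with specific placement" might look like; the identifiers are illustrative, not the PR's final test code.

# Pin a single task to worker a instead of mapping over a range.
x = c.submit(inc, 1, key="x", workers=[a.address])
await x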
if not self.running:
    return None
Is this related? At least the new test doesn't seem to care about this.
It's unreachable because the same condition is already tested on line 2218.
Code review comments have been addressed.
test_submit_after_failed_worker_async (#6311)