Always return `ws.address` from `_remove_from_processing` #6884
Conversation
Unit Test Results (see the test report for an extended history of previous test failures; useful for diagnosing flaky tests): 15 files ±0, 15 suites ±0, 6h 17m 38s ⏱️ -48m 17s. Results for commit 3ffcf71; comparison against base commit 1d0701b.
```diff
@@ -2329,7 +2329,7 @@ def transition_processing_erred(
         w = _remove_from_processing(self, ts)

-        ts.erred_on.add(w or worker)  # type: ignore
+        ts.erred_on.add(w)
```
IIUC this is what motivated your change. However, if `_remove_from_processing` returns `None`, we'd use `worker` instead. Is `worker` also `None` in this scenario? If so, is the kwarg even still in use?
Since we encountered a `None` in `ts.erred_on` in #6874, there needs to be some scenario where both `w` and `worker` are `None`. I dropped the `worker` kwarg since it's not being used anywhere and will get collected in `**kwargs`.
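As a quick standalone illustration of that last point (plain Python, not scheduler code): a keyword argument that is no longer named explicitly in a signature is silently collected into `**kwargs` instead of raising a `TypeError`, so call sites that still pass `worker=` keep working while the value goes unused.

```python
# Minimal sketch, not scheduler code: dropping an explicit kwarg from a
# signature that also accepts **kwargs just moves the value into kwargs.
def transition(key, stimulus_id, **kwargs):
    return kwargs

assert transition("x", "s-1", worker=None) == {"worker": None}
```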
I'm a bit worried here about consistency.

The only way I am aware of to trigger this transition is by receiving a `task-erred` message from a worker:

distributed/distributed/scheduler.py, line 3093 in 1d0701b:

```python
"task-erred": self.handle_task_erred,
```

which is handled by `handle_worker`, which will always have a `worker` extra kwarg:

distributed/distributed/scheduler.py, line 4888 in 1d0701b:

```python
await self.handle_stream(comm=comm, extra={"worker": worker})
```

distributed/distributed/core.py, line 842 in 1d0701b:

```python
handler(**merge(extra, msg))
```

i.e. calling `handle_task_erred` will always have `worker` as part of `msg` (proper signatures + mypy would help here):

distributed/distributed/scheduler.py, lines 4756 to 4757 in 1d0701b:

```python
def handle_task_erred(self, key: str, stimulus_id: str, **msg) -> None:
    r: tuple = self.stimulus_task_erred(key=key, stimulus_id=stimulus_id, **msg)
```
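A hedged sketch of what that "proper signatures + mypy" remark could look like; the explicit fields below are assumptions based on the message shape discussed in this thread, not code from the PR:

```python
from __future__ import annotations

# Illustrative only: naming the expected message fields instead of taking **msg
# would let mypy flag a call site that forgets to pass `worker`.
def handle_task_erred(
    self,
    key: str,
    stimulus_id: str,
    *,
    worker: str,
    exception: bytes,
    traceback: bytes | None = None,
) -> None:
    ...
```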
This leads us to the actual stimulus handler, which I think should verify that the `task-erred` message actually originated from the worker the task is currently processing on (it doesn't):
distributed/distributed/scheduler.py, lines 4161 to 4190 in 1d0701b:

```python
def stimulus_task_erred(
    self,
    key=None,
    worker=None,
    exception=None,
    stimulus_id=None,
    traceback=None,
    **kwargs,
):
    """Mark that a task has erred on a particular worker"""
    logger.debug("Stimulus task erred %s, %s", key, worker)
    ts: TaskState = self.tasks.get(key)
    if ts is None or ts.state != "processing":
        return {}, {}, {}
    if ts.retries > 0:
        ts.retries -= 1
        return self._transition(key, "waiting", stimulus_id)
    else:
        return self._transition(
            key,
            "erred",
            stimulus_id,
            cause=key,
            exception=exception,
            traceback=traceback,
            worker=worker,
            **kwargs,
        )
```
The whole point of this is that, if I read all of this correctly, `w == worker` is always True unless something went wrong earlier. How `worker=None` can happen is something I do not understand.

cc @crusaderky I believe you've been working on something similar recently?
Ok, the `worker=None` is coming from

distributed/distributed/scheduler.py, lines 4308 to 4315 in 1d0701b:

```python
if ts.suspicious > self.allowed_failures:
    del recommendations[k]
    e = pickle.dumps(
        KilledWorker(task=k, last_worker=ws.clean()), protocol=4
    )
    r = self.transition(
        k, "erred", exception=e, cause=k, stimulus_id=stimulus_id
    )
```
Therefore, the scenario of an empty `ts.erred_on` comes from tasks that have been transitioned to error because of a `KilledWorker` exception.
> therefore, the scenario of an empty ts.erred_on comes from tasks that have been transitioned to error because of a KilledWorker exception.

That makes sense, #6874 includes a bunch of `KilledWorker` exceptions.
I think there are a couple of things we should verify here. The check in `stimulus_task_erred` should really be

```python
if ts is None or ts.state != "processing" or ts.processing_on != worker:
    return {}, {}, {}
```

and it's very likely this is not tested.
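A hedged sketch of the kind of regression test that could cover this. It assumes the extra `ts.processing_on` guard above is in place; the test name, the use of `Event` to keep the task in `processing`, and the direct call to `handle_task_erred` are my assumptions, not code from this PR.

```python
import asyncio
import pickle

from distributed import Event
from distributed.utils_test import gen_cluster


@gen_cluster(client=True)
async def test_task_erred_from_unexpected_worker(c, s, a, b):
    ev = Event()
    fut = c.submit(lambda ev: ev.wait(), ev, key="x", workers=[a.address])
    while "x" not in s.tasks or s.tasks["x"].state != "processing":
        await asyncio.sleep(0.01)

    # Report the error from the worker that is *not* processing the task
    s.handle_task_erred(
        key="x",
        worker=b.address,
        exception=pickle.dumps(RuntimeError("boom")),
        traceback=None,
        stimulus_id="test",
    )
    # With the guard in place, the message from the wrong worker is ignored
    assert s.tasks["x"].state == "processing"

    await ev.set()
    assert await fut
```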
Force-pushed from d410146 to 3ffcf71 (compare).
Done.

I have added the assertion step for now, better safe than sorry. It's odd enough we ran into the …
```diff
@@ -2259,7 +2259,7 @@ def transition_processing_released(self, key: str, stimulus_id: str):
             assert ts.state == "processing"

         w = _remove_from_processing(self, ts)
-        if w:
+        if w in self.workers:
```
I'm a little concerned about this. It feels like another instance of #6392.

In #6614 (comment) I have a driveby refactor fixing a possible stale WorkerState in `_remove_from_processing`.

If we go ahead and return the address even if we know it's stale, then `transition_processing_released` here has no ability to verify whether `w` is stale or not, because it just has an address, not a WorkerState instance. It's possible that a worker left, then reconnected with the same address and port. If that's the case, we'd then be sending a `free-keys` to the wrong worker instance.

I'm wondering if, instead of the approach in this PR, we should go back to the old behavior and instead explicitly handle the case in `transition_processing_erred` where both `w` and `worker` are None (i.e. the KilledWorker case)?
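To illustrate the staleness concern in isolation (the `WorkerState` dataclass below is a stand-in for illustration, not distributed's actual class): an address comparison cannot distinguish a departed worker from a new worker that later reconnected on the same host and port, whereas an identity check on the state object can.

```python
from dataclasses import dataclass


@dataclass(eq=False)  # identity-based equality, like a scheduler state object
class WorkerState:
    address: str


old = WorkerState("tcp://10.0.0.1:1234")   # recorded when the task started processing
new = WorkerState("tcp://10.0.0.1:1234")   # same address:port after a reconnect
workers = {new.address: new}               # the scheduler's current registry

assert old.address in workers              # address check: looks valid, but stale
assert workers[old.address] is not old     # identity check: exposes the staleness
```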
Two thoughts:

1. EDIT: Look at Withhold root tasks [no co assignment] #6614 (comment) & Always return `ws.address` from `_remove_from_processing` #6884 (comment).
   IIUC, since we're not awaiting anywhere in

   distributed/distributed/scheduler.py, lines 2261 to 2262 in c15a10e:

   ```python
   w = _remove_from_processing(self, ts)
   if w in self.workers:
   ```

   distributed/distributed/scheduler.py, line 7337 in c15a10e:

   ```python
   if ws.address not in state.workers:  # may have been removed
   ```

   distributed/distributed/scheduler.py, line 2262 in c15a10e:

   ```python
   if w in self.workers:
   ```

2. Instead of handling the `None` case, should we maybe return the entire `WorkerState` object to ensure that users of `_remove_from_processing` can check for stale worker states as you described (see the sketch after this list)? From your description, it sounds like returning a worker address might generally not be all too helpful.
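A hedged sketch of that second thought (heavily simplified, with the helper's bookkeeping elided; not the PR's actual code): have `_remove_from_processing` hand back the `WorkerState` it detached the task from, so callers can do an identity check against the current registry instead of trusting a bare address.

```python
def _remove_from_processing(state, ts):
    """Sketch: detach *ts* from the worker it is processing on and return that
    (possibly stale) WorkerState, or None if it was not processing anywhere."""
    ws = ts.processing_on
    if ws is None:
        return None
    ts.processing_on = None
    # ... occupancy / resource bookkeeping elided ...
    return ws


# A caller such as transition_processing_released could then validate staleness:
#     ws = _remove_from_processing(self, ts)
#     if ws is not None and self.workers.get(ws.address) is ws:
#         worker_msgs[ws.address] = [...]
```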
Note: I wrote the previous response without looking at the linked comment. That drive-by fix makes sense to me. To solve the issue at hand, I'd return whatever

distributed/distributed/scheduler.py, line 7333 in c15a10e:

```python
ws = ts.processing_on
```

holds.
I guess I don't like downstream users having to differentiate themselves (more points to mess that up). Returning `None` felt pretty sensible to me in that case. In the rare case where callers need to know the worker we thought the task was processing on, even if that worker might be stale (I believe `transition_processing_erred` is the only case where this applies?), why not retrieve that explicitly first? See the sketch below.

Did this happen? I'm not seeing it in the diff?

I also disagree that it should be a no-op. We currently support an unexpected worker successfully completing a task (distributed/distributed/scheduler.py, lines 1987 to 1993 in c15a10e), so we should also support an unexpected worker encountering an error with a task. (I do think both cases probably indicate something is going wrong and should eventually be removed, I just don't see why we'd be inconsistent about the …
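A hedged sketch of "retrieve that explicitly first" (an illustrative fragment written against the scheduler's helpers; a hypothetical wrapper, not the PR's code): capture `ts.processing_on` before `_remove_from_processing` clears it, rather than leaning on the helper's return value.

```python
def record_erred_on(self, ts, worker=None):
    # Hypothetical helper for illustration: remember which worker we thought
    # the task was processing on before _remove_from_processing clears it.
    ws = ts.processing_on            # may be None (e.g. the KilledWorker path)
    _remove_from_processing(self, ts)
    if ws is not None:
        ts.erred_on.add(ws.address)
    elif worker is not None:
        ts.erred_on.add(worker)
```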
It looks like I forgot to push that commit; it's on my local branch.

I think being inconsistent between …
Closes #6874
- `pre-commit run --all-files`