Always return ws.address from _remove_from_processing #6884

Merged
9 changes: 4 additions & 5 deletions distributed/scheduler.py
@@ -2259,7 +2259,7 @@ def transition_processing_released(self, key: str, stimulus_id: str):
             assert ts.state == "processing"
 
         w = _remove_from_processing(self, ts)
-        if w:
+        if w in self.workers:
Collaborator

I'm a little concerned about this. It feels like another instance of #6392.

In #6614 (comment) I have a drive-by refactor that fixes a possible stale WorkerState in _remove_from_processing.

If we go ahead and return the address even if we know it's stale, then transition_processing_released here has no ability to verify whether w is stale or not, because it just has an address, not a WorkerState instance. It's possible that a worker left, then reconnected with the same address and port. If that's the case, we'd then be sending a free-keys to the wrong worker instance.
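For illustration, a minimal self-contained sketch of that failure mode (toy classes, not the real distributed.scheduler types): the address-based membership check passes even though the WorkerState we recorded is stale, while an identity check catches it.

    # Toy model: a worker leaves and a new one reconnects under the same
    # address/port, so a membership check by address cannot detect staleness.
    class WorkerState:
        def __init__(self, address: str) -> None:
            self.address = address

    workers: dict[str, WorkerState] = {}

    stale = WorkerState("tcp://10.0.0.1:40000")
    workers[stale.address] = stale       # the task starts processing on this worker

    del workers[stale.address]           # the worker leaves ...
    workers["tcp://10.0.0.1:40000"] = WorkerState("tcp://10.0.0.1:40000")  # ... and another reconnects

    w = stale.address
    print(w in workers)                  # True  -- the address check passes
    print(workers.get(w) is stale)       # False -- but the recorded WorkerState is stale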

I'm wondering whether, instead of the approach in this PR, we should go back to the old behavior and explicitly handle the case in transition_processing_erred where both w and worker are None (i.e. the KilledWorker case)?

@hendrikmakait (Member Author), Aug 23, 2022

Two thoughts:

  1. EDIT: See Withhold root tasks [no co assignment] #6614 (comment) & Always return ws.address from _remove_from_processing #6884 (comment).
    IIUC, since we're not awaiting anywhere between
        w = _remove_from_processing(self, ts)
    and
        if w in self.workers:
    it should never happen that
        if ws.address not in state.workers:  # may have been removed
    is True while
        if w in self.workers:
    is False. Thus, this piece of code should behave the same way as before. Am I missing something here?
  2. Instead of handling the None case, should we maybe return the entire WorkerState object to ensure that users of _remove_from_processing can check for stale worker states as you described? From your description, it sounds like returning a worker address might generally not be all that helpful (see the sketch below).
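A rough sketch of option 2, assuming distributed is installed and that SchedulerState, TaskState and WorkerState can be imported from distributed.scheduler; the bookkeeping details are elided and this is illustrative only, not the actual change:

    from __future__ import annotations

    from distributed.scheduler import SchedulerState, TaskState, WorkerState

    def _remove_from_processing(state: SchedulerState, ts: TaskState) -> WorkerState | None:
        """Remove *ts* from processing and return the WorkerState it was
        processing on, which may be stale; None if it wasn't processing."""
        ws = ts.processing_on
        ts.processing_on = None
        if ws is None:
            return None
        if state.workers.get(ws.address) is ws:
            # Only touch the bookkeeping if this is still the live WorkerState.
            ws.processing.pop(ts)
            ws.long_running.discard(ts)
            ...  # remaining worker bookkeeping elided
        return ws

    # Caller side (sketch), e.g. in transition_processing_released:
    #     ws = _remove_from_processing(self, ts)
    #     if ws is not None and self.workers.get(ws.address) is ws:
    #         worker_msgs[ws.address] = [
    #             {"op": "free-keys", "keys": [key], "stimulus_id": stimulus_id}
    #         ]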

@hendrikmakait (Member Author), Aug 23, 2022

Note: I wrote the previous response without looking at the linked comment. That drive-by fix makes sense to me. To solve the issue at hand, I'd return whatever

ws = ts.processing_on
spits out. This should allow downstream users to differentiate between stale and current worker states.

Collaborator

I guess I don't like downstream users having to differentiate themselves (more points to mess that up). Returning None felt pretty sensible to me in that case. In the rare case where callers need to know the worker we thought it was processing on, even if that worker might be stale—I believe transition_processing_erred is the only case where this applies?—why not retrieve that explicitly first?
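A rough sketch of that alternative (simplified, hypothetical signature; the real transition_processing_erred takes more parameters): fetch the possibly stale WorkerState before the helper clears it, so _remove_from_processing can keep returning None for workers that are gone.

    def transition_processing_erred(self, key: str, stimulus_id: str, **kwargs):
        ts = self.tasks[key]

        # Remember where the task was processing before the helper clears it;
        # this WorkerState may be stale if the worker already left
        # (e.g. on the KilledWorker path).
        ws = ts.processing_on
        failing_address = ws.address if ws is not None else None

        w = _remove_from_processing(self, ts)  # old behavior: None if the worker is gone

        if failing_address is not None:
            ts.erred_on.add(failing_address)
        ...  # exception bookkeeping and recommendations elided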

             worker_msgs[w] = [
                 {
                     "op": "free-keys",
@@ -2304,7 +2304,6 @@ def transition_processing_erred(
         traceback=None,
         exception_text: str | None = None,
         traceback_text: str | None = None,
-        worker: str | None = None,
         **kwargs,
     ):
         ws: WorkerState
@@ -2329,7 +2328,7 @@

         w = _remove_from_processing(self, ts)
 
-        ts.erred_on.add(w or worker)  # type: ignore
+        ts.erred_on.add(w)
Member

IIUC this is what motivated your change. However, if _remove_from_processing returns None, we'd use worker instead. Is worker also None in this scenario? If so, is the kwarg even still in use?

Member Author

Since we encountered a None in ts.erred_on in #6874, there needs to be some scenario where both w and worker are None. I dropped the worker kwarg since it's not being used anywhere; it will just get collected in **kwargs.
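For illustration, a tiny self-contained sketch of that mechanism (hypothetical, simplified signature): once the explicit parameter is gone, a worker=... keyword from the caller simply lands in **kwargs and is not used.

    def transition_processing_erred(key, stimulus_id, **kwargs):
        # With the explicit `worker` parameter dropped, a worker=... keyword
        # passed by callers is collected here and simply ignored.
        return sorted(kwargs)

    print(transition_processing_erred("x", "s-1", worker="tcp://10.0.0.1:40000"))
    # ['worker']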

@fjetter (Member), Aug 17, 2022

I'm a bit worried here about consistency.

The only way I am aware of to trigger this transition is by receiving a task-erred message from a worker

"task-erred": self.handle_task_erred,

which is handled by handle_worker, which will always add a worker extra kwarg

await self.handle_stream(comm=comm, extra={"worker": worker})

handler(**merge(extra, msg))

i.e. calling handle_task_erred will always have worker as part of msg (proper signatures + mypy would help here)

def handle_task_erred(self, key: str, stimulus_id: str, **msg) -> None:
    r: tuple = self.stimulus_task_erred(key=key, stimulus_id=stimulus_id, **msg)

which leads us to the actual stimulus handler, which I think should verify that the task-erred message actually originated from the worker the task is currently processing on (it currently doesn't):

def stimulus_task_erred(
    self,
    key=None,
    worker=None,
    exception=None,
    stimulus_id=None,
    traceback=None,
    **kwargs,
):
    """Mark that a task has erred on a particular worker"""
    logger.debug("Stimulus task erred %s, %s", key, worker)
    ts: TaskState = self.tasks.get(key)
    if ts is None or ts.state != "processing":
        return {}, {}, {}
    if ts.retries > 0:
        ts.retries -= 1
        return self._transition(key, "waiting", stimulus_id)
    else:
        return self._transition(
            key,
            "erred",
            stimulus_id,
            cause=key,
            exception=exception,
            traceback=traceback,
            worker=worker,
            **kwargs,
        )
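A minimal sketch of the missing check, to slot in right after the ts lookup above (assuming ts.processing_on is a WorkerState with an .address attribute):

    # Sketch only: ignore a task-erred message that did not come from the
    # worker this task is currently processing on.
    if ts.processing_on is None or ts.processing_on.address != worker:
        return {}, {}, {}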

The whole point of this is that if I read all of this correctly, w == worker is always True unless something went wrong earlier.

How worker=None can happen is something I do not understand

cc @crusaderky I believe you've been working on something similar recently?

Member

Ok, the worker=None is coming from

if ts.suspicious > self.allowed_failures:
    del recommendations[k]
    e = pickle.dumps(
        KilledWorker(task=k, last_worker=ws.clean()), protocol=4
    )
    r = self.transition(
        k, "erred", exception=e, cause=k, stimulus_id=stimulus_id
    )

therefore, the scenario of an empty ts.erred_on comes from tasks that have been transitioned to error because of a KilledWorker exception.

Member Author

> therefore, the scenario of an empty ts.erred_on comes from tasks that have been transitioned to error because of a KilledWorker exception.

That makes sense, #6874 includes a bunch of KilledWorker exceptions.

         if exception is not None:
             ts.exception = exception
             ts.exception_text = exception_text  # type: ignore
@@ -7314,7 +7313,7 @@ def request_remove_replicas(
)


-def _remove_from_processing(state: SchedulerState, ts: TaskState) -> str | None:
+def _remove_from_processing(state: SchedulerState, ts: TaskState) -> str:
"""Remove *ts* from the set of processing tasks.

See also
@@ -7326,7 +7325,7 @@ def _remove_from_processing(state: SchedulerState, ts: TaskState) -> str | None:
     ts.processing_on = None
 
     if ws.address not in state.workers:  # may have been removed
-        return None
+        return ws.address

     duration = ws.processing.pop(ts)
     ws.long_running.discard(ts)
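Assembled from the hunks and the discussion above, the helper after this PR reads roughly as follows (a sketch, with the remaining bookkeeping elided; see distributed/scheduler.py for the real implementation):

    def _remove_from_processing(state: SchedulerState, ts: TaskState) -> str:
        """Remove *ts* from the set of processing tasks."""
        ws = ts.processing_on
        ts.processing_on = None

        if ws.address not in state.workers:  # may have been removed
            return ws.address  # now returned even when the worker is already gone

        duration = ws.processing.pop(ts)
        ws.long_running.discard(ts)
        ...  # remaining worker bookkeeping elided
        return ws.address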