-
-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ensure steal requests from same-IP but distinct workers are rejected #6585
Conversation
Unit Test ResultsSee test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 15 files ±0 15 suites ±0 7h 29m 35s ⏱️ + 12m 45s For more details on these failures and errors, see this check. Results for commit b20ada6. ± Comparison against base commit e6cc40a. ♻️ This comment has been updated with latest results. |
I missed the comment about the worker closing again in #6263 (comment) |
distributed/stealing.py
Outdated
def remove_worker(self, scheduler: Scheduler, worker: str): | ||
del self.stealable[worker] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW I thought about modifying / tracking removed workers and match up with in_flight. That led me towards a "remember cancellation" mechanism which was way too complicated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about something like (with appropriate KeyError handling)
levels = self.stealable.pop(worker)
for level_i, level_tasks in enumerate(levels):
for ts in level_tasks:
self.stealable_all[level_i].remove(ts)
self.key_stealable.pop(ts, None)
self.in_flight.pop(ts) # I don't know??
# Maybe??:
self.in_flight_occupancy.pop(worker) # this may cause KeyErrors in other places, need to handle
if not self.in_flight:
self.in_flight_occupancy.clear()
self._in_flight_event.set()
I'm just copying from
distributed/distributed/stealing.py
Lines 193 to 206 in cb88e3b
def remove_key_from_stealable(self, ts): | |
result = self.key_stealable.pop(ts, None) | |
if result is None: | |
return | |
worker, level = result | |
try: | |
self.stealable[worker][level].remove(ts) | |
except KeyError: | |
pass | |
try: | |
self.stealable_all[level].remove(ts) | |
except KeyError: | |
pass |
and mixing in
distributed/distributed/stealing.py
Lines 166 to 177 in cb88e3b
elif start == "processing": | |
ts = self.scheduler.tasks[key] | |
self.remove_key_from_stealable(ts) | |
d = self.in_flight.pop(ts, None) | |
if d: | |
thief = d["thief"] | |
victim = d["victim"] | |
self.in_flight_occupancy[thief] -= d["thief_duration"] | |
self.in_flight_occupancy[victim] += d["victim_duration"] | |
if not self.in_flight: | |
self.in_flight_occupancy.clear() | |
self._in_flight_event.set() |
(but I'm not sure if the second part is necessary, because it would happen anyway when
remove_worker
transitions any tasks processing on the worker to released
.)
I don't know if this is necessary or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried something very similar and ended up deleting it again because of complexity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay. I'm a little hesitant about not clearing out the state properly, but I guess it's ok.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See also #6600 I would like to clarify the discussion on that ticket before we implement any more complex logic for work stealing
There are failing tests |
I wonder why the FutureWarning didn't fail for me locally. Is there anything I need to enable for this to cause a failure? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably less important, but could the metrics get thrown off here by stale workers?
distributed/distributed/stealing.py
Lines 170 to 177 in cb88e3b
if d: | |
thief = d["thief"] | |
victim = d["victim"] | |
self.in_flight_occupancy[thief] -= d["thief_duration"] | |
self.in_flight_occupancy[victim] += d["victim_duration"] | |
if not self.in_flight: | |
self.in_flight_occupancy.clear() | |
self._in_flight_event.set() |
distributed/stealing.py
Outdated
def remove_worker(self, scheduler: Scheduler, worker: str): | ||
del self.stealable[worker] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about something like (with appropriate KeyError handling)
levels = self.stealable.pop(worker)
for level_i, level_tasks in enumerate(levels):
for ts in level_tasks:
self.stealable_all[level_i].remove(ts)
self.key_stealable.pop(ts, None)
self.in_flight.pop(ts) # I don't know??
# Maybe??:
self.in_flight_occupancy.pop(worker) # this may cause KeyErrors in other places, need to handle
if not self.in_flight:
self.in_flight_occupancy.clear()
self._in_flight_event.set()
I'm just copying from
distributed/distributed/stealing.py
Lines 193 to 206 in cb88e3b
def remove_key_from_stealable(self, ts): | |
result = self.key_stealable.pop(ts, None) | |
if result is None: | |
return | |
worker, level = result | |
try: | |
self.stealable[worker][level].remove(ts) | |
except KeyError: | |
pass | |
try: | |
self.stealable_all[level].remove(ts) | |
except KeyError: | |
pass |
and mixing in
distributed/distributed/stealing.py
Lines 166 to 177 in cb88e3b
elif start == "processing": | |
ts = self.scheduler.tasks[key] | |
self.remove_key_from_stealable(ts) | |
d = self.in_flight.pop(ts, None) | |
if d: | |
thief = d["thief"] | |
victim = d["victim"] | |
self.in_flight_occupancy[thief] -= d["thief_duration"] | |
self.in_flight_occupancy[victim] += d["victim_duration"] | |
if not self.in_flight: | |
self.in_flight_occupancy.clear() | |
self._in_flight_event.set() |
(but I'm not sure if the second part is necessary, because it would happen anyway when
remove_worker
transitions any tasks processing on the worker to released
.)
I don't know if this is necessary or not.
distributed/tests/test_steal.py
Outdated
while len_before == len(s.events["stealing"]): | ||
await asyncio.sleep(0.1) | ||
|
||
assert victim_ts.processing_on != wsB |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What should it be processing on? Because wsA confirmed the steal request, so wsA has released the task. Stealing called Scheduler.reschedule
which transitioned it to released:
distributed/distributed/scheduler.py
Lines 6561 to 6579 in b59a322
def reschedule(self, key=None, worker=None): | |
"""Reschedule a task | |
Things may have shifted and this task may now be better suited to run | |
elsewhere | |
""" | |
try: | |
ts = self.tasks[key] | |
except KeyError: | |
logger.warning( | |
"Attempting to reschedule task {}, which was not " | |
"found on the scheduler. Aborting reschedule.".format(key) | |
) | |
return | |
if ts.state != "processing": | |
return | |
if worker and ts.processing_on.address != worker: | |
return | |
self.transitions({key: "released"}, f"reschedule-{time()}") |
So I think I'd expect something like assert victim_ts.state == "released"
. So if reschedule
goes through a full transitions cycle, you'd actually expect it to go processing->released->waiting->processing
and be assigned to a worker (probably wsB2
since it's idle).
I guess I'd expect something like assert victim_ts.processing_on in (wsA, wsB2)
and assert victim_ts.state == "processing"
. We want to be sure we don't forget about this task, since that would cause a deadlock too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm asserting that it's not the stale WorkerState
. Everything else is beyond the scope of this unit test. We'll reschedule the task and the scheduler can make a decision. I don't care as long as I get my result, i.e. the futures complete.
I prefer not asserting too much here. From a high level, I don't even care at this point that we're going through full rescheduling.
As long as |
I removed |
If any other cosmetic changes pop up, feel free to push on this branch so we can merge this asap |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have permission to push to your branch. Could you add
diff --git a/distributed/stealing.py b/distributed/stealing.py
index 2c78704c..30b99175 100644
--- a/distributed/stealing.py
+++ b/distributed/stealing.py
@@ -332,12 +332,12 @@ class WorkStealing(SchedulerPlugin):
state in _WORKER_STATE_UNDEFINED
# If our steal information is somehow stale we need to reschedule
or state in _WORKER_STATE_CONFIRM
- and thief != self.scheduler.workers.get(thief.address)
+ and thief is not self.scheduler.workers.get(thief.address)
):
self.log(
(
"reschedule",
- thief.address not in self.scheduler.workers,
+ thief is not self.scheduler.workers.get(thief.address),
*_log_msg,
)
)
distributed/stealing.py
Outdated
def remove_worker(self, scheduler: Scheduler, worker: str): | ||
del self.stealable[worker] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay. I'm a little hesitant about not clearing out the state properly, but I guess it's ok.
It seems you have some tests that are actually failing, |
if not isinstance(other, WorkerState): | ||
return False | ||
return hash(self) == hash(other) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#6593 instead makes it explicit, and explains why the decision. I think we should agree on one or the other and be consistent.
def __hash__(self) -> int:
# TODO eplain
return id(self)
def __eq__(self, other: object) -> bool:
# TODO explain
return other is self
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#6593 adds to worker_state_machine.WorkerState.validate_state
a check that you can never have two instances of a TaskState with the same key in any of its sets. I think scheduler.SchedulerState.validate_state
should have the same logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not entirely convinced that this is necessary. There are many collections and this particular problem was not even a collection on the scheduler itself but rather an extension.
Specifically, without https://github.com/dask/distributed/pull/6585/files#r899011537 this condition would not even be true and I don't think we should implement the full remove_worker cleanup on the stealing extension
The failing test_feed is actually interesting since is performs a cloudpickle roundtrip and asserts that WorkerStates are rountrip-able, i.e. they compare equal after the roundtrip. If we go for a compare by python ID approach, this is clearly not working any longer. I would even go as far as to say that if we compare by python ID we should not allow any serialization of the object. By extension, the same would be true for TaskState objects. Right now, we're not actually relying on this being serializable. The only place where we're serializing this right now is when we're raising a |
5ba8296
to
ba31256
Compare
I opted for a different approach and am using the Worker.id / Server.id attribute now to uniquely identify the WorkerState object. This is based on a type 4 UUID such that I have sufficient confidence in it's uniqueness. The scheduler is verifying that addresses are not overwritten so the |
I kinda dislike the UUID. There simply never should be multiple diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 82d40fd6..75bcd297 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -423,14 +423,14 @@ async def test_scheduler_init_pulls_blocked_handlers_from_config(s):
@gen_cluster()
async def test_feed(s, a, b):
def func(scheduler):
- return dumps(dict(scheduler.workers))
+ return dumps(list(scheduler.workers))
comm = await connect(s.address)
await comm.write({"op": "feed", "function": dumps(func), "interval": 0.01})
for i in range(5):
response = await comm.read()
- expected = dict(s.workers)
+ expected = list(s.workers)
assert cloudpickle.loads(response) == expected
await comm.close() |
818d207
to
fa090b6
Compare
I consider the UUID useful beyond this deadlock fix. It is the only entity that can uniquely identify a server across the entire cluster. It can be logged, it can be shared, it is meaningful outside of the scheduler process. This is also how we're identifying clients inside and outside of the scheduler. Adding the I would like us to focus on fixing this issue now and not escalate into whether or not certain objects are serializable or roundtrip-able or not. This question popped up a couple of times recently and there doesn't appear to be proper consensus, e.g.
I don't want this conversation blocking the fix of this issue. We can still come back and remove |
I dislike the UUID for TaskStates. It's a fairly expensive function (3us on my beefed-up host), which would make it unsavory for I think I would be happy to just state "thou cannot round-trip a TaskState from scheduler to worker and back". On the other hand, I think that a full pickle dump of the |
The current code is prone to hash collisions. def __eq__(self, other: object) -> bool:
if not isinstance(other, WorkerState):
return False
return hash(self) == hash(other) to def __eq__(self, other: object) -> bool:
return isinstance(other, WorkerState) and other.server_id == self.server_id If you put this one change in, I'm happy to sign off the PR. |
Yes. This is a hard No for TaskStates. I'm simply using an already generated UUID in this case. I would like to keep this ID part of the WorkerState even if we decide to forbid any kind of serialization and we start comparing by IDs
Done. While I'm not overly concerned about hash collisions, it is indeed not a great idea to reuse hash for equal since afaik dicts are using equal to resolve hash collisions... |
Closes #6356
Closes #6198
Closes #6263
#6356 describes a situation where a worker may die and a new worker with the same IP would connect to the scheduler. This could mess up our stealing logic since the WorkerState objects we're referencing there would reference the wrong worker, i.e. state between the scheduler and stealing extension would drift.
With the test in this draft PR I could prove that this is indeed the case. However, so far nothing bad happens. Upon task completion, the scheduler would issue a
Unexpected worker completed task
message and send acancel-compute
event to the worker. The worker would ignore this event since the task is in state memory.Before I fix the state drift I want to poke at this further since this is supposedly responsible for a couple of deadlocks. I would like to confirm that this is the only trigger for these deadlocks and there is nothing else going on.
This could definitely explain how a dead worker is shown on the dashboard #6198 even if the deadlock was unrelated
FWIW I was always suspicious why this
Unexpected worker completed task
was necessary and struggled to reproduce it. This finally sheds some light onto it and I actually hope that we can get rid of this message and therefore thecancel-compute
event entirely.