Skip to content

Commit

Permalink
Minor refactorings and commentary in worker state machine (#5563)
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter authored Dec 7, 2021
1 parent ababd21 commit d0b40d3
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 27 deletions.
4 changes: 3 additions & 1 deletion distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2796,7 +2796,9 @@ async def test_acquire_replicas_same_channel(c, s, a, b):
await futC
while fut.key not in b.tasks or not b.tasks[fut.key].state == "memory":
await asyncio.sleep(0.005)
assert len(s.who_has[fut.key]) == 2

while len(s.who_has[fut.key]) != 2:
await asyncio.sleep(0.005)

# Ensure that both the replica and an ordinary dependency pass through the
# same communication channel
Expand Down
105 changes: 79 additions & 26 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
"resumed",
}
READY = {"ready", "constrained"}
FETCH_INTENDED = {"missing", "fetch", "flight", "cancelled", "resumed"}

# Worker.status subsets
RUNNING = {Status.running, Status.paused, Status.closing_gracefully}
Expand Down Expand Up @@ -642,8 +643,8 @@ def __init__(
("resumed", "memory"): self.transition_generic_memory,
("resumed", "error"): self.transition_generic_error,
("resumed", "released"): self.transition_generic_released,
("resumed", "waiting"): self.transition_rescheduled_next,
("resumed", "fetch"): self.transition_rescheduled_next,
("resumed", "waiting"): self.transition_resumed_waiting,
("resumed", "fetch"): self.transition_resumed_fetch,
("constrained", "executing"): self.transition_constrained_executing,
("constrained", "released"): self.transition_generic_released,
("error", "released"): self.transition_generic_released,
Expand Down Expand Up @@ -2198,11 +2199,53 @@ def transition_executing_error(
stimulus_id=stimulus_id,
)

def transition_rescheduled_next(self, ts, *, stimulus_id):
next_state = ts._next
recs, smsgs = self.transition_generic_released(ts, stimulus_id=stimulus_id)
recs[ts] = next_state
return recs, smsgs
def transition_resumed_fetch(self, ts, *, stimulus_id):
"""`resumed` is an intermediate degenerate state which splits further up
into two states depending on what the last signal / next state is
intended to be. There are only two viable choices depending on whether
the task is required to be fetched from another worker `resumed(fetch)`
or the task shall be computed on this worker `resumed(waiting)`.
The only viable state transitions ending up here are
flight -> cancelled -> resumed(waiting)
or
executing -> cancelled -> resumed(fetch)
depending on the origin. Equally, only `fetch`, `waiting` or `released`
are allowed output states.
See also `transition_resumed_waiting`
"""
# if the next state is already intended to be fetch or if the
# coro/thread is still running (ts.done==False), this is a noop
if ts._next == "fetch":
return {}, []
ts._next = "fetch"

if ts.done:
next_state = ts._next
recs, smsgs = self.transition_generic_released(ts, stimulus_id=stimulus_id)
recs[ts] = next_state
return recs, smsgs
return {}, []

def transition_resumed_waiting(self, ts, *, stimulus_id):
"""
See transition_resumed_fetch
"""
if ts._next == "waiting":
return {}, []
ts._next = "waiting"

if ts.done:
next_state = ts._next
recs, smsgs = self.transition_generic_released(ts, stimulus_id=stimulus_id)
recs[ts] = next_state
return recs, smsgs
return {}, []

def transition_cancelled_fetch(self, ts, *, stimulus_id):
if ts.done:
Expand Down Expand Up @@ -2330,16 +2373,17 @@ def transition_ready_executing(self, ts, *, stimulus_id):
return {}, []

def transition_flight_fetch(self, ts, *, stimulus_id):
self._in_flight_tasks.discard(ts)
ts.coming_from = None

for w in ts.who_has:
self.pending_data_per_worker[w].append(ts.key)
ts.state = "fetch"
ts.done = False
heapq.heappush(self.data_needed, (ts.priority, ts.key))

return {}, []
# If this transition is called after the flight coroutine has finished,
# we can reset the task and transition to fetch again. If it is not yet
# finished, this should be a no-op
if ts.done:
recommendations, smsgs = self.transition_generic_released(
ts, stimulus_id=stimulus_id
)
recommendations[ts] = "fetch"
return recommendations, smsgs
else:
return {}, []

def transition_flight_error(
self, ts, exception, traceback, exception_text, traceback_text, *, stimulus_id
Expand Down Expand Up @@ -2769,9 +2813,9 @@ def _filter_deps_for_fetch(
Returns
-------
in_flight_keys:
The subset of keys in to_gather_keys in state `flight`
The subset of keys in to_gather_keys in state `flight` or `resumed`
cancelled_keys:
The subset of tasks in to_gather_keys in state `cancelled`
The subset of tasks in to_gather_keys in state `cancelled` or `memory`
cause:
The task to attach startstops of this transfer to
"""
Expand All @@ -2781,9 +2825,18 @@ def _filter_deps_for_fetch(
ts = self.tasks.get(key)
if ts is None:
continue

# At this point, a task has been transitioned fetch->flight
# flight is only allowed to be transitioned into
# {memory, resumed, cancelled}
# resumed and cancelled will block any further transition until this
# coro has been finished

if ts.state in ("flight", "resumed"):
in_flight_tasks.add(ts)
elif ts.state == "cancelled":
# If the key is already in memory, the fetch should not happen which
# is signalled by the cancelled_keys
elif ts.state in {"cancelled", "memory"}:
cancelled_keys.add(key)
else:
raise RuntimeError(
Expand Down Expand Up @@ -3041,14 +3094,14 @@ def update_who_has(self, who_has, *, stimulus_id):
# Do not mutate the input dict. That's rude
workers = set(workers) - {self.address}
dep_ts = self.tasks[dep]
dep_ts.who_has.update(workers)
if dep_ts.state in FETCH_INTENDED:
dep_ts.who_has.update(workers)

if dep_ts.state == "missing":
recommendations[dep_ts] = "fetch"
if dep_ts.state == "missing":
recommendations[dep_ts] = "fetch"

for worker in workers:
self.has_what[worker].add(dep)
if dep_ts.state in ("fetch", "flight", "missing"):
for worker in workers:
self.has_what[worker].add(dep)
self.pending_data_per_worker[worker].append(dep_ts.key)

self.transitions(recommendations=recommendations, stimulus_id=stimulus_id)
Expand Down

0 comments on commit d0b40d3

Please sign in to comment.