-
-
Notifications
You must be signed in to change notification settings - Fork 718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor ensure_communicating #6165
Conversation
distributed/worker.py
Outdated
@@ -3077,7 +3123,7 @@ async def gather_dep( | |||
for k in self.in_flight_workers[worker]: | |||
ts = self.tasks[k] | |||
recommendations[ts] = tuple(msg.values()) | |||
raise | |||
return GatherDepDoneEvent(stimulus_id=stimulus_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The exception is now shielded from @log_errors
. Previously it was double reported as it is already logged by logger.exceptions(e)
on line 3117.
distributed/worker.py
Outdated
@@ -3152,7 +3196,6 @@ async def find_missing(self) -> None: | |||
self.periodic_callbacks[ | |||
"find-missing" | |||
].callback_time = self.periodic_callbacks["heartbeat"].callback_time | |||
self.ensure_communicating() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is no longer necessary as _ensure_communicating is now in all transitions to fetch.
@fjetter you may give it a look now or wait until after I've fixed the regressions |
Current status: 8 tests failing. Pending investigation. FAILED distributed/dashboard/tests/test_worker_bokeh.py::test_basicThis is because FAILED distributed/diagnostics/tests/test_eventstream.py::test_eventstreamFAILED distributed/tests/test_scheduler.py::test_decide_worker_coschedule_order_neighbors[nthreads1-1]FAILED distributed/tests/test_worker.py::test_gather_many_smallThis looks like a genuine regression in _select_keys_for_gather - concurrent fetches are being limited to 1 key per worker FAILED distributed/tests/test_worker.py::test_acquire_replicas_already_in_flightFAILED distributed/tests/test_worker.py::test_missing_released_zombie_tasks_2FAILED distributed/tests/test_worker.py::test_gather_dep_cancelled_rescheduledFAILED distributed/tests/test_worker.py::test_gather_dep_do_not_handle_response_of_not_requested_tasks |
df6f189
to
738f7c6
Compare
361b734
to
7569dd8
Compare
distributed/worker.py
Outdated
# compute-task or acquire-replicas command from the scheduler, it allows | ||
# clustering the transfers into less GatherDep instructions; see | ||
# _select_keys_for_gather(). | ||
return {}, [EnsureCommunicatingLater(stimulus_id=stimulus_id)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The alternative to this was to delete _select_keys_for_gather
and either
- Add logic to
_handle_instructions
to squash individual GatherDep instructions on the same worker - Implement no grouping whatsoever and just rely on rpc pooling (needs performance testing)
Both systems would remove the out-of-priority fetch from workers and imply revisiting the limits for concurrent connections.
Either way it would be a major functional change so I opted for this somewhat dirty hack instead which is functionally identical to main.
@@ -2855,13 +2889,16 @@ def stimulus_story( | |||
keys = {e.key if isinstance(e, TaskState) else e for e in keys_or_tasks} | |||
return [ev for ev in self.stimulus_log if getattr(ev, "key", None) in keys] | |||
|
|||
def ensure_communicating(self) -> None: | |||
def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fetching the stimulus_id from outside means that now gather_dep commands will have the stimulus_id of the event that triggered them, e.g.
- compute-task
- acquire-replicas
- find_missing
- GatherDepDoneEvent
|
||
recommendations: Recs = {} | ||
response = {} | ||
to_gather_keys: set[str] = set() | ||
cancelled_keys: set[str] = set() | ||
|
||
def done_event(): | ||
return GatherDepDoneEvent(stimulus_id=f"gather-dep-done-{time()}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Temp hack, to be removed when refactoring gather_dep
self.ensure_communicating() | ||
self.handle_stimulus( | ||
GatherDepDoneEvent(stimulus_id=f"readd-busy-worker-{time()}") | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll change this method in a later PR to an async instruction
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pedantic, but it feels weird to issue a GatherDepDoneEvent
when that isn't actually what happened. Something like
class BusyWorkerReAddedEvent(GatherDepDoneEvent):
pass
might make it clearer that they're different things, just happen to be handled in the same way (for now).
But if GatherDepDoneEvent
is itself a temporary hack and will be removed soon, then this isn't important.
All tests pass! This is ready for review and merge 🥳 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some naming and design questions, but overall this seems good.
distributed/worker.py
Outdated
stimulus_id=inst.stimulus_id | ||
) | ||
self.transitions(recs, stimulus_id=inst.stimulus_id) | ||
self._handle_instructions(instructions) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we recurse into _handle_instructions
here, instead of adding the new instructions onto the end of the current instructions
list (in a safe way)? I'm wondering why the new instructions are treated as "higher priority" than the current ones.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I overhauled the method, please have another look
distributed/worker_state_machine.py
Outdated
@@ -292,6 +287,12 @@ def to_dict(self) -> dict[str, Any]: | |||
return d | |||
|
|||
|
|||
@dataclass | |||
class EnsureCommunicatingLater(Instruction): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find the "later" part of EnsureCommunicatingLater
a little confusing. EnsureCommunicatingOnce
? EnsureCommunicatingIdempotent
?
AFAIU the point of doing this as an instruction (instead of calling _ensure_communicating
directly in many places) is to allow batching of multiple EnsureCommunicating instructions into one, via special logic in _handle_instructions
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's not just a matter of doing it once; it must happen after all recommendations to transition to fetch have been enacted.
Renamed to EnsureCommunicatingAfterTransitions.
distributed/worker.py
Outdated
stimulus_id=stimulus_id, | ||
# Note: given n tasks that must be fetched from the same worker, this method | ||
# may generate anywhere between 1 and n GatherDep instructions, as multiple | ||
# tasks may be clustered in the same instruction by _select_keys_for_gather |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# tasks may be clustered in the same instruction by _select_keys_for_gather | |
# tasks may be clustered in the same instruction by _select_keys_for_gather. | |
# The number will be greater than 1 when the tasks don't all fit in `target_message_size`. |
This just took me a few reads to make sense of
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was incorrect to begin with - you'll never have more than one GatherDep from the same worker within the same iteration of ensure_communicating. I rewrote it.
self.ensure_communicating() | ||
self.handle_stimulus( | ||
GatherDepDoneEvent(stimulus_id=f"readd-busy-worker-{time()}") | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pedantic, but it feels weird to issue a GatherDepDoneEvent
when that isn't actually what happened. Something like
class BusyWorkerReAddedEvent(GatherDepDoneEvent):
pass
might make it clearer that they're different things, just happen to be handled in the same way (for now).
But if GatherDepDoneEvent
is itself a temporary hack and will be removed soon, then this isn't important.
distributed/worker_state_machine.py
Outdated
@@ -292,6 +287,12 @@ def to_dict(self) -> dict[str, Any]: | |||
return d | |||
|
|||
|
|||
@dataclass | |||
class EnsureCommunicatingLater(Instruction): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also find it a little odd that unlike other instructions, EnsureCommunicatingLater
doesn't contain any data. It's relying on the side effect of _add_to_data_needed
having already mutated data_needed
and data_needed_per_worker
, but the instruction itself is pointless without that side effect having occurred. I can't think of a cleaner design that avoids this and still has batching though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, but the whole instruction is a hack to begin with
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The renaming and _handle_instructions
refactor helped, thank you. Just some comment-rewording and type annotations for clarity.
to_gather, total_nbytes = self._select_keys_for_gather( | ||
worker, ts.key, all_keys_to_gather | ||
) | ||
all_keys_to_gather |= to_gather | ||
|
||
self.log.append( | ||
("gather-dependencies", worker, to_gather, stimulus_id, time()) | ||
) | ||
|
||
self.comm_nbytes += total_nbytes | ||
self.in_flight_workers[worker] = to_gather |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.in_flight_workers[worker] = to_gather | |
assert worker not in self.in_flight_workers, self.in_flight_workers[worker] | |
self.in_flight_workers[worker] = to_gather |
Are we guaranteed that in_flight_workers[worker]
is not already set? Because we'd be overwriting it if it is.
EDIT: I think we are because of the if w not in self.in_flight_workers
above. Still might be nice to validate though? If this was not the case, it could probably cause a deadlock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, it's impossible to be there already due to the line you mentioned just above. I think validation should be overkill since it's directly above
# 1. there are many fetches queued because all workers are in flight | ||
# 2. a single compute-task or acquire-replicas command just sent | ||
# many dependencies to fetch at once. | ||
ensure_communicating = inst |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it even necessary to store the instruction right now (since it's just a sentinel), or could this just be a bool?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need the stimulus_id
Co-authored-by: Gabe Joseph <[email protected]>
Co-authored-by: Gabe Joseph <[email protected]>
Co-authored-by: Gabe Joseph <[email protected]>
Co-authored-by: Gabe Joseph <[email protected]>
Partially closes #5896
In scope for this PR
ensure_communicating() -> None
to_ensure_communicating() -> RecsInstr
self.loop.add_callback(self.gather_dep, ...)
ensure_communicating
is no longer called periodically "just in case" - neither fromevery_cycle
nor fromfind_missing
Out of scope for this PR, but in scope for #5896
gather_dep
GatherDepDoneEvent
(introduced in this PR)_readd_busy_worker
as an async instructionOut of scope for #5896
find_missing
EnsureCommunicatingLater
(introduced in this PR) and_select_keys_for_gather