
[WIP] Refactor communication out of transition functions #4343

Closed · wants to merge 12 commits

Conversation

jakirkham (Member):

Currently transition functions alternate between working on the task graph a bit and communicating out to workers and clients. As written, these two concerns are tightly interwoven, sometimes switching back and forth between the two modes of processing multiple times in the same function. This makes it tricky to optimize the transition functions further, as the communication is handled in high-level Python and doesn't improve much through Cythonization. It also makes it tricky to pull the transition functions and task graph state apart from the Scheduler, as they are a bit too reliant on these communication steps.

To remedy this situation, this PR tries to move all of the communication out of the transition functions. It does this by returning all messages to be communicated and handling them in `transition` after each transition function completes. While transition functions and communication still effectively interleave at a higher level, this should make it easier to separate out the transition functions, along with the relevant state, for further optimization.
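A minimal sketch of the approach described above, with all names and message shapes purely illustrative (the real Scheduler code is far more involved): the transition function returns the messages it would have sent, and the caller performs all communication afterwards.

```python
def transition_released_waiting(tasks, key):
    # Pure state-machine step: mutate task state and *collect* messages
    # instead of sending them (hypothetical message format).
    ts = tasks[key]
    ts["state"] = "waiting"
    worker_msgs = {}  # worker address -> list of messages
    worker_msgs["tcp://worker-1"] = [{"op": "compute-task", "key": key}]
    return "waiting", worker_msgs

def transition(tasks, key, sent):
    # Run the transition function first...
    state, worker_msgs = transition_released_waiting(tasks, key)
    # ...then do all communication in one place afterwards.
    for addr, msgs in worker_msgs.items():
        for msg in msgs:
            sent.append((addr, msg))
    return state
```

The point is the separation of concerns: everything above the return in the transition function can be compiled or optimized independently of how messages are delivered.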

Review thread on distributed/scheduler.py (outdated):
@jakirkham (Member Author):

This still has some warts. That said, it would be good to get a sense of whether this change seems reasonable and what issues we might expect going with this approach (which may in turn inform how the warts are addressed 😉).


```python
def transition_released_waiting(self, key):
    try:
        ts: TaskState = self.tasks[key]
        dts: TaskState
        worker_msgs: dict = {}
        report_msg: dict = {}
```
Member:
Should these be None for the seemingly common case of no messages to report?

jakirkham (Member Author):

Well, since we have for-loops back in `transition` that iterate over these, we would need extra logic to check for `None`. At the moment returning empty dicts seems pretty straightforward, even in the cases where one of them carries nothing.
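A small illustration of the trade-off being discussed (variable names are made up for the example): iterating over an empty dict is a harmless no-op, whereas a `None` sentinel would force a guard at every consumer.

```python
# Empty dict: the consuming loop simply does nothing.
worker_msgs = {}
delivered = []
for addr, msgs in worker_msgs.items():
    delivered.extend(msgs)

# None sentinel: every consumer needs an explicit guard first.
worker_msgs = None
if worker_msgs is not None:
    for addr, msgs in worker_msgs.items():
        delivered.extend(msgs)
```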

jakirkham (Member Author):

Should add that ultimately my hope is this becomes a message object, which we pass between the transition functions and the communication side of the Scheduler. For now it is a somewhat arbitrary collection of things we have found need to be passed around. Now that we have identified what those things are, this should be easier to handle.
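One possible shape for the message object mentioned above, purely hypothetical (the PR itself still passes plain dicts around): a small container that bundles the per-worker and per-client message dicts and knows how to merge results from successive transitions.

```python
from dataclasses import dataclass, field

@dataclass
class TransitionMessages:
    # worker address -> list of messages for that worker
    worker_msgs: dict = field(default_factory=dict)
    # client id -> list of messages for that client
    client_msgs: dict = field(default_factory=dict)

    def merge(self, other):
        # Fold another transition's messages into this one,
        # preserving per-destination ordering.
        for addr, msgs in other.worker_msgs.items():
            self.worker_msgs.setdefault(addr, []).extend(msgs)
        for client, msgs in other.client_msgs.items():
            self.client_msgs.setdefault(client, []).extend(msgs)
```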

@mrocklin (Member):

At first glance this seems sensible to me. I would prefer that we not merge it until we get further along down this path. It seems like a decent path to go down though. (I'm thinking that we merge once we have a Python class that manages the scheduler state machine).

@jakirkham (Member Author):

Thanks Matt! That makes sense. Wanted to share my thought process before going too far down this path.

At the moment I am seeing some hangs in the test suite; not sure why yet. I'm curious: does it matter what state the Scheduler's task graph is in before it communicates with workers? Also, should we be concerned about message ordering at all?

@jakirkham (Member Author):

> Atm am seeing some hangs in the test suite. Not sure why that is yet. Am curious does it matter what state the Scheduler's task graph is in before it communicates with workers? Also should we be concerned about message ordering at all?

Figured it out. I was overlooking another call to `_add_to_memory`, so the report calls needed there were not being handled. Fixing that fixed the issue.

@jakirkham (Member Author):

Here's the call graph from the Scheduler that I see with these changes. This is best compared with the recent benchmark results here ( quasiben/dask-scheduler-performance#51 ).

From this we see that transition_memory_released also drops out (it only appeared in the call graph at this point because of communication). The only remaining relevant transitions are transition_processing_memory and transition_waiting_processing. The latter also takes significantly less time, since part of its work was communication as well (though not all of it).

Additionally, we now see communication stemming from transition, which is expected. It also takes basically the same amount of time, which is again expected: we've only shifted where the communication happens, not how much of it occurs.

(attached call graph image: prof_68726 pstat dot)

@jakirkham force-pushed the ref_trans_comm branch 3 times, most recently from 58d1175 to 5ea450f, on December 11, 2020 19:32
@jakirkham (Member Author) commented Dec 11, 2020:

Have refactored things away from passing TaskState objects to things like report_on_key after they (and the Scheduler graph as a whole) have been modified. Instead, we now construct messages for each Client the same way we do for each worker, as part of the transition. As a result, transitions now return dicts mapping addresses to their messages: one for the clients and one for the workers.
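A hedged sketch of the change described above, with illustrative names throughout (`build_client_msgs` and `who_wants` are stand-ins, not the PR's actual helpers): rather than handing a TaskState to a post-hoc reporting step, the transition itself fans one report message out into a per-client dict.

```python
def build_client_msgs(key, state, who_wants):
    # Build one report message for this key's new state, then fan it
    # out to every interested client, yielding the dict-of-addresses
    # shape that transitions now return (message ops are hypothetical).
    if state == "memory":
        report_msg = {"op": "key-in-memory", "key": key}
    else:
        report_msg = {"op": "task-state", "key": key, "state": state}
    return {client: [report_msg] for client in who_wants}
```

With this shape, the delivery side only needs an address and a message list; it never has to consult the TaskState or ClientState objects the message came from.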

@jakirkham (Member Author):

Is there anything special we should be doing with the "fire-and-forget" Client? Does it receive messages? IIUC this Client really lives only on the Scheduler (and may not even be a full Client in the same sense as a user-created one). Though I want to make sure there's nothing I'm missing here.

Review thread on distributed/scheduler.py (outdated, resolved).
Commit messages:

- Provides a way for callers to simply construct the message if they are not wanting to send it yet.
- This provides us a way to effectively call `client_releases_keys` from other transitions without starting a new transition of its own.
- Separates out the code needed to build a message for `report` based on the `TaskState` in question from the actual call to `self.report`.
- This converts a `TaskState` into a `dict` of messages with the keys being the Clients to notify and the message being the report message.
- Allows us to think of messages simply in terms of the message and where it needs to be delivered, without needing to know anything about the `TaskState` it came from or the `ClientState`s involved.
- Instead of collecting a message to pass to `report` and letting the relevant Clients be collected from the `TaskState` information later, go ahead and collect that immediately while handling that `TaskState`. These Clients then form the keys of `client_msgs`, where the message contains what was in `report_msg`. This allows us to keep all the `TaskState` work contained to where it is relevant and can be handled efficiently. Then the messaging out to Clients only needs to be concerned with the messages and where they go, without worrying about what they pertain to.
@jakirkham (Member Author):

Closing and continuing in PR ( #4365 ).

@jakirkham jakirkham closed this Dec 15, 2020
@jakirkham jakirkham deleted the ref_trans_comm branch December 15, 2020 15:35
2 participants