Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Encapsulate Worker.batched_stream.send() #6475

Merged
merged 3 commits into from
Jun 2, 2022

Conversation

crusaderky
Copy link
Collaborator

@crusaderky crusaderky commented May 30, 2022

Encapsulate calls to self.batched_stream.send in a way that

  1. tweaks and harmonizes the many repeated tests for running comms
  2. will be easier to abstract with the WorkerBase class (see (Worker) State Machine determinism and replayability #5736)

and not self.batched_stream.comm.closed()
):
self.batched_stream.send(msg)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method will become an @abc.abstractmethod in the WorkerBase class

len(instructions),
)
else:
self._handle_instructions(instructions)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is old cruft from when the only possible instructions were messages

@github-actions
Copy link
Contributor

github-actions bot commented May 30, 2022

Unit Test Results

       15 files  +       3         15 suites  +3   6h 38m 51s ⏱️ + 1h 57m 38s
  2 829 tests ±       0    2 714 ✔️  -      20    81 💤  -   13  30 +29  4 🔥 +4 
20 965 runs  +4 017  19 949 ✔️ +3 837  946 💤 +111  64 +63  6 🔥 +6 

For more details on these failures and errors, see this check.

Results for commit c4ff4c3. ± Comparison against base commit a341432.

♻️ This comment has been updated with latest results.

@crusaderky crusaderky self-assigned this May 30, 2022
@crusaderky
Copy link
Collaborator Author

Failure in test_transition_counter_max_worker is fixed by #6474

@crusaderky crusaderky linked an issue May 30, 2022 that may be closed by this pull request
@mrocklin
Copy link
Member

cc @gjoseph92 for review if you have time

},
)
elif self._status != Status.closed:
self.loop.call_later(0.05, self._send_worker_status_change, stimulus_id)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This retry will now be lost. I imagine this case occurs if the worker's status changes before it's connected to the scheduler. I'm not sure if this is important.

As part of #6389, we could pretty easily make BatchedSend support queuing messages before it's been started.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This used to be necessary when I implemented it - the WorkerState on the scheduler side would never emerge out of initialising state. Now it is not needed anymore.

distributed/worker.py Show resolved Hide resolved
@crusaderky crusaderky force-pushed the WSMR/batched_send branch from cf50bd0 to a258f11 Compare June 1, 2022 14:34
@crusaderky crusaderky marked this pull request as ready for review June 1, 2022 21:56
@crusaderky
Copy link
Collaborator Author

Ready for final review and merge

@crusaderky crusaderky merged commit 69b798d into dask:main Jun 2, 2022
@crusaderky crusaderky deleted the WSMR/batched_send branch June 2, 2022 17:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Yank state machine out of Worker class
3 participants