Functional change for get_worker #7696

Open
quasiben opened this issue Mar 22, 2023 · 10 comments

@quasiben
Member

In #7580 we removed the ability for get_worker to return the worker when it is called from a function that is not running as a task. Put concretely, the following is no longer supported:

from dask.distributed import Client, get_worker

def main():
    c = Client()
    def foo():
        worker = get_worker()
        print(worker)

    c.run(foo)

if __name__ == "__main__":
    main()

client.run runs functions on each worker, but those functions do not run as tasks, so get_worker can no longer find the worker and raises ValueError. This is functionality RAPIDS folks have been relying on, for example:

https://github.com/rapidsai/raft/blob/05d899b36b76545d2439dbe47e4659d644ced227/python/raft-dask/raft_dask/common/comms.py#L410-L414

We are fixing this by relying on the dask_worker argument that client.run injects into functions:

If your function takes an input argument named dask_worker then that variable will be populated with the worker itself.

We have a fix for this in Dask-CUDA and are working on one for RAFT. I think we should include a statement like the following to warn legacy users about the change to get_worker:

If you need access to the worker in a function that does not run as a task (e.g. one passed to client.run), please accept a dask_worker argument in the function signature instead (xref: client.run)
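
For reference, here is the example from the top of this issue rewritten to use the injected dask_worker argument. This is only a sketch following the documented client.run behavior; printing the address is purely illustrative:

from dask.distributed import Client

def main():
    c = Client()

    def foo(dask_worker):
        # client.run injects the running Worker instance here
        print(dask_worker.address)

    c.run(foo)

if __name__ == "__main__":
    main()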

@quasiben
Member Author

Another alternative is to add the functionality back in behind an explicit flag:

diff --git a/distributed/worker.py b/distributed/worker.py
index a4d4e053..1427bd8f 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -2684,7 +2684,7 @@ class Worker(BaseWorker, ServerNode):
         return self.transfer_outgoing_count_limit


-def get_worker() -> Worker:
+def get_worker(ignore_tasks=False) -> Worker:
     """Get the worker currently running this task

     Examples
@@ -2705,6 +2705,10 @@ def get_worker() -> Worker:
     try:
         return thread_state.execution_state["worker"]
     except AttributeError:
+        if ignore_tasks:
+            from tlz import first
+            return first(w for w in Worker._instances if w.status in
+                    WORKER_ANY_RUNNING)
         raise ValueError("No worker found") from None
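
With that patch applied, the legacy pattern would keep working behind an explicit opt-in. The following is hypothetical usage only; the ignore_tasks flag exists solely in the diff above, not in released distributed:

from dask.distributed import Client, get_worker

def foo():
    # ignore_tasks is the flag proposed in the diff above; without that patch
    # this raises TypeError because get_worker takes no arguments.
    worker = get_worker(ignore_tasks=True)
    print(worker.address)

client = Client()
client.run(foo)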

I think either approach should work. @fjetter, if you have time can you weigh in?

@fjetter
Member

fjetter commented Mar 22, 2023

If you are using client.run and need the worker, the best way to do so is to include an argument named dask_worker in the function signature and we will inject the worker for you, e.g.

# replace this
def foo():
    return get_worker().address

# with this
def foo(dask_worker):
    return dask_worker.address

@fjetter
Member

fjetter commented Mar 22, 2023

Looks like this is the approach you are taking in rapidsai/raft#1365.
This would be my preferred way of doing it. I'm very open to polishing this injection behavior.

If this is not good enough for you, we can look into making Worker.run context aware. I believe the only reason we haven't is to avoid scope creep: execution_state is currently only set in a worker thread, but that doesn't necessarily need to stay that way. We could also move this to ContextVars, I believe. That would be a bit of work, and I figured the injection mechanism would be sufficient.
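
For illustration only, a minimal sketch of what a ContextVar-based lookup could look like. The _current_worker variable and the set/reset placement are assumptions, not distributed's actual implementation:

from contextvars import ContextVar

# Hypothetical context variable; distributed currently keeps the worker in
# thread_state.execution_state instead.
_current_worker = ContextVar("current_worker")

def get_worker():
    """Return the worker associated with the current context, if any."""
    try:
        return _current_worker.get()
    except LookupError:
        raise ValueError("No worker found") from None

# The worker would set the variable around any user code it runs, e.g.
#   token = _current_worker.set(worker)
#   try:
#       ...  # run the user function or task
#   finally:
#       _current_worker.reset(token)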

@pentschev
Member

I'm ok with that change, but I think this has caused some confusion, and I'm confused myself. For example, I can't run get_worker() from an async function that runs via client.submit, but I can if the function is non-async. Meanwhile, client.run can pass dask_worker to both async and non-async functions, which is how I have worked around it so far: rapidsai/dask-cuda@884a595.
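
A sketch of the kind of workaround being described here (the names are illustrative and this is not the actual dask-cuda change):

from dask.distributed import Client

async def inspect(dask_worker):
    # client.run injects the worker even for async functions, which works
    # where get_worker() inside an async submitted task currently does not.
    return dask_worker.address

client = Client()
print(client.run(inspect))  # dict of {worker address: result}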

Is the above expected? Can we use get_worker() on an async function that is executed via client.submit somehow?

@fjetter
Member

fjetter commented Mar 22, 2023

For example, I can't run get_worker() from an async function that runs via client.submit,

The behavior of async functions is a bit odd nowadays. The intention is to run those in a dedicated thread as well, which would fix this problem for you; see #7339. Nothing is blocking that PR other than me finding time. If this is important for you, we can push it over the finishing line.

@fjetter
Member

fjetter commented Mar 22, 2023

I rebased #7339 and if CI is green-ish we can merge this to fix your async submit problem.

@fjetter
Member

fjetter commented Mar 22, 2023

I looked at rapidsai/dask-cuda@884a595 and would argue this "workaround" is how it is supposed to be done. client.submit is intended to run user functions as tasks (on a thread), while client.run is intended to run a function/coroutine on the worker event loop, in the main thread, outside of our task scheduling. If you are actually inspecting the worker in your test, that's how you should do it.

@pentschev
Member

Thanks for the details @fjetter, that makes sense to me. #7339 also seems like a good improvement.

@pentschev
Member

Just one small request for #7339: it would be great if it could be left to merge after the upcoming release this week. We're getting into code freeze, and we've managed to fix the issues raised by #7580, so I'd prefer to play it safe and avoid any potential breakage in the very short term.

@fjetter
Member

fjetter commented Mar 23, 2023

it would be great if it could be left to merge after the upcoming release this week.

Not a problem. There are still some test failures anyway, and this is not top priority, so that aligns well.
