Can not get client when not on worker thread #7763

Open
mrocklin opened this issue Apr 7, 2023 · 6 comments · May be fixed by #7912

Comments

@mrocklin
Member

mrocklin commented Apr 7, 2023

from dask.distributed import Client, get_client
client = Client()
def f():
    get_client()
    return 1
client.submit(f).result() # works fine
client.run(f)  # fails
File ~/mambaforge/lib/python3.9/site-packages/distributed/client.py:2965, in Client.run(self, function, workers, wait, nanny, on_error, *args, **kwargs)
   2882 def run(
   2883     self,
   2884     function,
   (...)
   2890     **kwargs,
   2891 ):
   2892     """
   2893     Run a function on all workers outside of task scheduling system
   2894
   (...)
   2963     >>> c.run(print_state, wait=False)  # doctest: +SKIP
   2964     """
-> 2965     return self.sync(
   2966         self._run,
   2967         function,
   2968         *args,
   2969         workers=workers,
   2970         wait=wait,
   2971         nanny=nanny,
   2972         on_error=on_error,
   2973         **kwargs,
   2974     )

File ~/mambaforge/lib/python3.9/site-packages/distributed/utils.py:349, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    347     return future
    348 else:
--> 349     return sync(
    350         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    351     )

File ~/mambaforge/lib/python3.9/site-packages/distributed/utils.py:416, in sync(loop, func, callback_timeout, *args, **kwargs)
    414 if error:
    415     typ, exc, tb = error
--> 416     raise exc.with_traceback(tb)
    417 else:
    418     return result

File ~/mambaforge/lib/python3.9/site-packages/distributed/utils.py:389, in sync.<locals>.f()
    387         future = wait_for(future, callback_timeout)
    388     future = asyncio.ensure_future(future)
--> 389     result = yield future
    390 except Exception:
    391     error = sys.exc_info()

File ~/mambaforge/lib/python3.9/site-packages/tornado/gen.py:762, in Runner.run(self)
    759 exc_info = None
    761 try:
--> 762     value = future.result()
    763 except Exception:
    764     exc_info = sys.exc_info()

File ~/mambaforge/lib/python3.9/site-packages/distributed/client.py:2870, in Client._run(self, function, nanny, workers, wait, on_error, *args, **kwargs)
   2867     continue
   2869 if on_error == "raise":
-> 2870     raise exc
   2871 elif on_error == "return":
   2872     results[key] = exc

Input In [5], in f()
      1 def f():
----> 2     get_client()
      3     return 1

File ~/mambaforge/lib/python3.9/site-packages/distributed/worker.py:2782, in get_client()
   2780     return Client(address, timeout=timeout)
   2781 else:
-> 2782     raise ValueError("No global client found and no address provided")

ValueError: No global client found and no address provided
@mrocklin
Member Author

mrocklin commented Apr 7, 2023

Not a huge deal here. I have a workaround for my immediate situation. Still, it's a little strange.
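One possible workaround (a sketch, not necessarily the one referenced above) is to lean on Client.run's documented behavior of injecting the worker into functions that accept a parameter named dask_worker, and build a client from the scheduler address explicitly; this assumes the Worker exposes that address as dask_worker.scheduler.address:

```python
def f(dask_worker):
    # Client.run passes the Worker instance to functions that accept a
    # parameter named ``dask_worker``; from it we can reach the scheduler
    # address without relying on get_client()'s thread-local lookup.
    from distributed import Client  # imported lazily: only runs on a worker
    with Client(dask_worker.scheduler.address) as client:
        return client.submit(lambda x: x + 1, 1).result()

# client.run(f)  # would be invoked from the driver process
```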

@fjetter
Member

fjetter commented Apr 27, 2023

That was actually an intentional change in #7580. The semantics around get_worker, which is used internally by get_client, were a bit messed up, and I corrected this.

Can you elaborate on how you were using this? I find this usage of Client.run a bit strange (the exception could be better, though).

Edit: just saw #7762

@fjetter
Member

fjetter commented Apr 27, 2023

This is related to #7696

@zanieb
Contributor

zanieb commented Apr 27, 2023

Hey @fjetter this also breaks Prefect — we provide a utility that uses get_client to retrieve a client on a worker for users within a task e.g.

from distributed import get_client
from prefect import flow, task

from prefect_dask import DaskTaskRunner


@task
def test_task():
    # normally this would use our utility, but for simplicity we call directly
    client = get_client()
    # ValueError: No global client found and no address provided


@flow(task_runner=DaskTaskRunner())
def test_flow():
    test_task.submit()


if __name__ == "__main__":
    test_flow()

I believe the issue here is that we submit a wrapper for tasks to Dask: we orchestrate the task in an async function on a Dask worker, but we actually run the task in a thread we manage. In that thread, the client cannot be retrieved.
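That failure mode is easy to reproduce without Dask at all: anything stored in a threading.local (the usual way a "current worker" is tracked per thread) is invisible from a separately managed thread. A minimal stdlib illustration:

```python
import threading

_state = threading.local()
_state.worker = "dask-worker-1"   # set in the thread the framework controls

def lookup():
    # Mirrors a thread-local "current worker" lookup (simplified).
    return getattr(_state, "worker", None)

seen = {}
t = threading.Thread(target=lambda: seen.update(other=lookup()))
t.start()
t.join()

print(lookup())        # "dask-worker-1": visible in the owning thread
print(seen["other"])   # None: a user-managed thread sees no worker
```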

Perhaps related to #7339 as well although installing that branch does not fix the error.

@jrbourbeau
Member

> I believe the issue here is that we submit a wrapper for tasks to Dask so we orchestrate the task in an async function on a Dask worker but actually run the task in a thread we manage. In that thread, the client cannot be retrieved.

@madkinsz I just tried out the latest main branch of distributed (which includes #7844), and the example you posted now passes. Could you confirm things are working as expected again on your end?

@zanieb
Contributor

zanieb commented Jun 1, 2023

Thanks @jrbourbeau! With a less minimal example, it looks like things are working for us:

from prefect import flow, task

from prefect_dask import DaskTaskRunner, get_async_dask_client, get_dask_client


@task
def test_task():
    with get_dask_client() as client:
        client.submit(lambda x: x, 1).result()


@task
async def atest_task():
    async with get_async_dask_client() as client:
        await client.submit(lambda x: x, 1)


@flow(task_runner=DaskTaskRunner())
def test_flow():
    test_task.submit()
    atest_task.submit()


if __name__ == "__main__":
    test_flow()

I'm also running our CI against distributed@main at PrefectHQ/prefect-dask#103

graingert added a commit to graingert/distributed that referenced this issue Jun 14, 2023
@graingert graingert linked a pull request Jun 14, 2023 that will close this issue
graingert added a commit to graingert/distributed that referenced this issue Jun 15, 2023
graingert added a commit to graingert/distributed that referenced this issue Jun 28, 2023