Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trouble using get_dask_client with a Coiled cluster #12971

Open
scharlottej13 opened this issue Mar 16, 2023 · 7 comments
Open

Trouble using get_dask_client with a Coiled cluster #12971

scharlottej13 opened this issue Mar 16, 2023 · 7 comments

Comments

@scharlottej13
Copy link
Contributor

Hi! I noticed recently that get_dask_client (added in PrefectHQ/prefect-dask#33) is not working with a Coiled cluster. I'm using Prefect Cloud.

Problem

The following snippet does not work using a Coiled cluster (but does work with dask.distributed.LocalCluster):

import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client


coiled_runner = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "name": "get-dask-client",
    },
)

@task
def compute_task():
    with get_dask_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = df.describe().compute()
    return summary_df


@flow(task_runner=coiled_runner)
def dask_flow():
    prefect_future = compute_task.submit()
    return prefect_future.result()


if __name__ == "__main__":
    dask_flow()

When I run the above snippet I get a TypeError: TLS expects a ssl_context argument of type ssl.SSLContext (perhaps check your TLS configuration?) Instead got None.

Full traceback
Traceback (most recent call last):
  File "/Users/sarahj/Downloads/prefect-dask-client.py", line 31, in <module>
    dask_flow()
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/flows.py", line 468, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/engine.py", line 175, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/engine.py", line 256, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/engine.py", line 665, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "/Users/sarahj/Downloads/prefect-dask-client.py", line 27, in dask_flow
    return prefect_future.result()
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/futures.py", line 226, in result
    return sync(
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 267, in sync
    return run_async_from_worker_thread(__async_fn, *args, **kwargs)
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/opt/coiled/env/lib/python3.9/site-packages/prefect/engine.py", line 1533, in orchestrate_task_run
  File "/opt/coiled/env/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
  File "/opt/coiled/env/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
  File "/opt/coiled/env/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
  File "/opt/coiled/env/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
  File "/Users/sarahj/Downloads/prefect-dask-client.py", line 18, in compute_task
    with get_dask_client() as client:  # noqa
  File "/opt/coiled/env/lib/python3.9/contextlib.py", line 119, in __enter__
    return next(self.gen)
  File "/opt/coiled/env/lib/python3.9/site-packages/prefect_dask/utils.py", line 103, in get_dask_client
  File "/opt/coiled/env/lib/python3.9/site-packages/distributed/client.py", line 988, in __init__
  File "/opt/coiled/env/lib/python3.9/site-packages/distributed/client.py", line 1185, in start
  File "/opt/coiled/env/lib/python3.9/site-packages/distributed/utils.py", line 405, in sync
  File "/opt/coiled/env/lib/python3.9/site-packages/distributed/utils.py", line 378, in f
  File "/opt/coiled/env/lib/python3.9/site-packages/tornado/gen.py", line 769, in run
  File "/opt/coiled/env/lib/python3.9/site-packages/distributed/client.py", line 1265, in _start
  File "/opt/coiled/env/lib/python3.9/site-packages/distributed/client.py", line 1328, in _ensure_connected
  File "/opt/coiled/env/lib/python3.9/site-packages/distributed/comm/core.py", line 291, in connect
  File "/opt/coiled/env/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
  File "/opt/coiled/env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 488, in connect
  File "/opt/coiled/env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 543, in _get_connect_args
  File "/opt/coiled/env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 401, in _expect_tls_context
TypeError: TLS expects a `ssl_context` argument of type ssl.SSLContext (perhaps check your TLS configuration?) Instead got None

Looking at the full traceback, it seems _expect_tls_context in https://github.com/dask/distributed/blob/main/distributed/comm/tcp.py expects TLS arguments, but is not getting the information needed.

Since get_dask_client is a utility around worker_client, I checked to see if using worker_client works with a Coiled cluster, and it does:

import dask
from dask.distributed import worker_client
from prefect import flow, task
from prefect_dask import DaskTaskRunner

coiled_runner = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "name": "prefect-worker-client",
    },
)


@task
def calling_compute_in_a_task():
    with worker_client(separate_thread=False) as client:
        df = dask.datasets.timeseries("2000", "2005", partition_freq="2w")
        summary_df = df.describe()
        client.compute(summary_df)


@flow(task_runner=coiled_runner)
def test_flow():
    calling_compute_in_a_task.submit()


if __name__ == "__main__":
    test_flow()

Potential solution?

If I get the client the way dask.distributed.worker_client does, then the aforementioned failing snippet works. ie changing (lines 103-104 in get_dask_client) from:

with Client(**client_kwargs) as client:
    yield client

to:

client = get_client(timeout=timeout)
yield client

It seems like there are other arguments get_dask_client may need, so not saying this exactly will work, but thought it'd be a helpful start.

@ghislainp
Copy link

the get_dask_client function seems not to work when using an external dask cluster declared with DaskTaskRunner(address=data_cluster_address). I get an error "No global client found and no address provided". Below is one of the examples in README.md slightly adapted to run on an external dask cluster.

I investigated a bit and neither get_worker nor get_client work. It seems that the task 'process_data' is unable to know that it's running in a dask worker. I suspect this is related to other problems with external dask cluster, such as PrefectHQ/prefect-dask#47

I hacked prefect_dask.utils._generate_client_kwargs to use client_kwargs['address'] instead of calling get_client().scheduler.address, and it works now. I can make a PR.
However, this is still not satisfactorily, because I need to secede and rejoin my task as explained here https://distributed.dask.org/en/stable/task-launch.html because I deadlock my cluster very easily.

I think that scheduling task with prefect_dask on a cluster, and running calculations with the same cluster is a useful pattern to deal with memory problems, and to monitor what happens on the cluster with the dask dashboard. Though, I'm open to other options, because I've been stuck on this for months...

import dask.dataframe
import dask.distributed
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

# client = dask.distributed.Client()

@task
def read_data(start: str, end: str) -> dask.dataframe.DataFrame:
    df = dask.datasets.timeseries(start, end, partition_freq="4w")
    return df

@task
def process_data(df: dask.dataframe.DataFrame) -> dask.dataframe.DataFrame:
    with get_dask_client():
        df_yearly_avg = df.groupby(df.index.year).mean()
        return df_yearly_avg.compute()

@flow(task_runner=DaskTaskRunner(address="tcp://127.0.0.1:8786"))
def dask_pipeline():
    df = read_data.submit("1988", "2022")
    df_yearly_average = process_data.submit(df)
    return df_yearly_average

# dask_pipeline()

@fjetter
Copy link

fjetter commented Apr 27, 2023

I briefly peeked into the implementation and noticed that the get_dask_client method is initializing a new client every time.

Dask offers an API that tries to use already existing clients and ensure the task cannot deadlock, see https://distributed.dask.org/en/stable/api.html#distributed.worker_client

and https://distributed.dask.org/en/stable/task-launch.html

It's strongly recommend to use the worker_client contextmanager for this application. I briefly skimmed PrefectHQ/prefect-dask#33 but couldn't find an explanation why this was not used

@zanieb
Copy link
Contributor

zanieb commented Apr 27, 2023

Hey @fjetter thanks for taking a look!

I think we weren't using the worker_client context manager because it did not support retrieval of an asynchronous client which we need for asynchronous tasks.

@dchudz
Copy link

dchudz commented May 15, 2023

FWIW I think there's an issue here for async worker_client: dask/distributed#5513.

@jrbourbeau
Copy link
Contributor

I tried running this example again using worker_client and the main branch of distributed (which contains dask/distributed#7844) and things seemed to work as expected

import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client
from distributed import worker_client


coiled_runner = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "name": "get-dask-client",
    },
)

@task
def compute_task():
    # with get_dask_client() as client:
    with worker_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = df.describe().compute()
    return summary_df


@flow(task_runner=coiled_runner)
def dask_flow():
    prefect_future = compute_task.submit()
    return prefect_future.result()


if __name__ == "__main__":
    dask_flow()

@madkinsz do you see something similar? Is there a reason to still prefer get_dask_client over worker_client?

@zanieb
Copy link
Contributor

zanieb commented Jun 1, 2023

@jrbourbeau I think we're just providing get_dask_client for symmetry with get_async_dask_client which we need to expose for users writing asynchronous tasks e.g.

from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_async_dask_client
import distributed

@task
async def atest_task():

    async with get_async_dask_client() as client:
        await client.submit(lambda x: x, 1)

    with distributed.worker_client() as client:
        await client.submit(lambda x: x, 1)
        # AttributeError: 'int' object has no attribute '__await__'



@flow(task_runner=DaskTaskRunner())
def test_flow():
    atest_task.submit()


if __name__ == "__main__":
    test_flow()

@giorgiobasile
Copy link
Contributor

giorgiobasile commented Aug 4, 2023

Hi, I just wanted to mention that I am hitting a similar problem while trying to instantiate a DaskTaskRunner connecting to a remote cluster preemptively instantiated with the Dask Gateway instance of the Microsoft Planetary Computer.
Unfortunately, my code doesn't add much to what has been shared so far, except that I hit the ssl_context-related exception as soon as the flow is started, instead of waiting for any task submission or get_dask_client() call.

Here is my code:

import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def compute_task():
    with get_dask_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = client.compute(df.describe())
    return summary_df

@flow(task_runner=DaskTaskRunner(address="gateway://pccompute-dask.westeurope.cloudapp.azure.com:80/prod.530b7bd7e4164670bac990661b5fbdc4"))
def dask_flow():
    prefect_future = compute_task.submit()
    return prefect_future.result()


if __name__ == "__main__":
    dask_flow()
    

Output:

16:55:35.269 | INFO    | prefect.engine - Created flow run 'papaya-mouflon' for flow 'dask-flow'
16:55:35.270 | INFO    | prefect.task_runner.dask - Connecting to an existing Dask cluster at gateway://pccompute-dask.westeurope.cloudapp.azure.com:80/prod.530b7bd7e4164670bac990661b5fbdc4
16:55:35.271 | ERROR   | Flow run 'papaya-mouflon' - Crash detected! Execution was interrupted by an unexpected exception: TypeError: Gateway expects a `ssl_context` argument of type ssl.SSLContext, instead got None
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/test_dask_runner.py", line 43, in <module>
    dask_flow()
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/flows.py", line 540, in __call__
    return enter_flow_run_engine_from_flow_call(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/engine.py", line 272, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.11.4/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/engine.py", line 364, in create_then_begin_flow_run
    state = await begin_flow_run(
            ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/engine.py", line 509, in begin_flow_run
    flow_run_context.task_runner = await stack.enter_async_context(
                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.11.4/Frameworks/Python.framework/Versions/3.11/lib/python3.11/contextlib.py", line 638, in enter_async_context
    result = await _enter(cm)
             ^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.11.4/Frameworks/Python.framework/Versions/3.11/lib/python3.11/contextlib.py", line 204, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/task_runners.py", line 185, in start
    await self._start(exit_stack)
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect_dask/task_runners.py", line 331, in _start
    self._client = await exit_stack.enter_async_context(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.11.4/Frameworks/Python.framework/Versions/3.11/lib/python3.11/contextlib.py", line 638, in enter_async_context
    result = await _enter(cm)
             ^^^^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/distributed/client.py", line 1457, in __aenter__
    await self
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/distributed/client.py", line 1268, in _start
    await self._ensure_connected(timeout=timeout)
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/distributed/client.py", line 1331, in _ensure_connected
    comm = await connect(
           ^^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/distributed/comm/core.py", line 292, in connect
    comm = await wait_for(
           ^^^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/distributed/utils.py", line 1807, in wait_for
    return await fut
           ^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/dask_gateway/comm.py", line 39, in connect
    raise TypeError(
TypeError: Gateway expects a `ssl_context` argument of type ssl.SSLContext, instead got None

EDIT:
In my case the error was due to SSL info not being passed to the task runner, as explained here. Setting the security parameter solves the issue.

@desertaxle desertaxle transferred this issue from PrefectHQ/prefect-dask Apr 26, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

7 participants