This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add get_dask_client #33

Merged
merged 24 commits into from
Oct 3, 2022

Conversation

@ahuang11 (Contributor) commented Sep 27, 2022

Summary

Adds get_dask_client.

This is intended to be called within tasks that run on workers, and is useful for operating on dask collections, such as a dask.dataframe.DataFrame.

Without invoking this, a task running on a worker does not automatically get a client connected to the full cluster. The work is therefore attempted serially within that worker, potentially overwhelming it.

Internally, this context manager is a simple wrapper around distributed.worker_client with separate_thread=False fixed.
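
A minimal sketch of what such a wrapper could look like, based only on the description above. The name get_dask_client_sketch and its timeout parameter are assumptions for illustration, not the code merged in this PR; distributed.worker_client and its separate_thread keyword are the real Dask API.

```python
from contextlib import contextmanager


@contextmanager
def get_dask_client_sketch(timeout=None):
    """Yield a Dask client from inside a task that runs on a Dask worker.

    Hypothetical sketch: the function name and the `timeout` parameter are
    assumptions, not the implementation merged in this PR.
    """
    # Imported lazily so this module can be imported where distributed is absent.
    from distributed import worker_client

    # separate_thread=False is fixed so callers never have to know about it.
    with worker_client(timeout=timeout, separate_thread=False) as client:
        yield client
```

The body only runs when the context manager is entered, so it must be entered from code that is executing on a Dask worker.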

With it:
[image]

Without it (a single worker takes on the entire job):
[image]

Why a separate util function, from Michael:
"""
I think we should document our required pattern for accessing the client. If we want, we can add a utility to the library that, as you noted, just sets the required keyword argument. The utility function is nice because:

  • We can maintain it over time without our users having to change their invocations
  • It is discoverable without reading documentation — otherwise I'd never guess that I need to pass separate_thread
  • We can account for both sync and async invocations
    """

Relevant Issue(s)

Closes #26

Checklist

@ahuang11 (Contributor, Author) commented:

I was able to get it to connect to an existing client, but I couldn't get the asynchronous case working: if I make this async, users need to call async with get_dask_client.

Also, I couldn't get sync_compatible working on it.

Thoughts?
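
To illustrate the trade-off described above, here is a small standard-library sketch. FakeClient and both helper functions are hypothetical stand-ins, not Dask or prefect-dask code. It shows that a synchronous context manager can still be used with a plain `with` inside a coroutine, while an async one forces callers to write `async with`.

```python
import asyncio
from contextlib import asynccontextmanager, contextmanager


class FakeClient:
    """Hypothetical stand-in for a Dask client; not part of any library."""

    def __init__(self):
        self.closed = False


@contextmanager
def get_client_sync():
    client = FakeClient()
    try:
        yield client
    finally:
        client.closed = True


@asynccontextmanager
async def get_client_async():
    client = FakeClient()
    try:
        yield client
    finally:
        client.closed = True


async def main():
    # A plain `with` works inside a coroutine for the sync version.
    with get_client_sync() as client:
        sync_client = client

    # The async version must be entered with `async with`.
    async with get_client_async() as client:
        async_client = client

    # Using plain `with` on the async version fails; the exception type
    # depends on the Python version, so both are caught here.
    try:
        with get_client_async():
            pass
    except (TypeError, AttributeError) as exc:
        error_name = type(exc).__name__

    return sync_client.closed, async_client.closed, error_name


result = asyncio.run(main())
```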

@scharlottej13 left a comment

Thanks again for your work on this @ahuang11!

@ahuang11 (Contributor, Author) commented:

I think this is almost ready; just missing the @sync_compatible piece:

import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

@task
async def compute_task():
    with get_dask_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = await client.compute(df.describe())
    return summary_df

@flow(task_runner=DaskTaskRunner())
async def dask_flow():
    prefect_future = await compute_task.submit()
    return prefect_future.result()

await dask_flow()

results in:

/Users/andrew/mambaforge/envs/prefect-dask/lib/python3.9/site-packages/distributed/client.py:1405: RuntimeWarning: coroutine 'wait_for' was never awaited
  self.close()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
/Users/andrew/mambaforge/envs/prefect-dask/lib/python3.9/site-packages/distributed/client.py:1405: RuntimeWarning: coroutine 'Client._close' was never awaited
  self.close()
...
TypeError: cannot pickle 'coroutine' object
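
The final TypeError is the generic error Python raises whenever an un-awaited coroutine object reaches pickle. The sketch below reproduces only that last step with the standard library. It does not involve Dask or Prefect, and it is not a diagnosis of where the coroutine leaks in the example above.

```python
import pickle


async def compute():
    return 42


# Calling an async function without awaiting it returns a coroutine object.
coro = compute()

try:
    pickle.dumps(coro)
    message = None
except TypeError as exc:
    message = str(exc)
finally:
    # Close the coroutine so Python does not warn that it was never awaited.
    coro.close()
```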

@ahuang11 requested review from scharlottej13 and zanieb and removed request for scharlottej13 September 29, 2022 22:25
@scharlottej13 previously approved these changes Sep 29, 2022
@zanieb (Contributor) left a comment

This is looking good! I have some minor questions and comments.

README.md (outdated), comment on lines 180 to 182:
However, you must `await client.compute` before exiting the context manager.

Running `await dask_collection.compute()` will result in an error: `TypeError: 'coroutine' object is not iterable`.
Contributor:

This seems contrary, can you clarify? Is the second bit a Dask bug?

Contributor, Author:

Will have to try with dask alone.

Contributor, Author:

import dask
from dask.distributed import Client

async with Client(asynchronous=True) as client:
    df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
    print(type(df))
    print(type(df.describe()))
    print(type(df.describe().compute())) # errors on this line here
    summary_df = df.describe().compute()

Contributor, Author:

I guess this works

import dask
from dask.distributed import Client, wait

async with Client(asynchronous=True) as client:
    df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
    summary_df = await df.describe().compute(sync=False)[0].result()

@ahuang11 requested a review from zanieb October 3, 2022 15:59
@zanieb previously approved these changes Oct 3, 2022
@zanieb (Contributor) left a comment

Thanks for dealing with all the comments!

Successfully merging this pull request may close these issues.

How to best use Dask objects in a Prefect task
4 participants