@delayed on dask distributed and intake.to_dask don't seem to work together #5756

Closed · JoranDox opened this issue Feb 4, 2022 · 6 comments
Labels: needs info (Needs further information from the user)

JoranDox commented Feb 4, 2022

What happened:
We tried using a delayed function together with an intake catalog to parallelise reading multiple days of data (for unrelated reasons, we can't read multiple days at once directly in intake at the moment). On a dask cluster, the delayed functions themselves are split nicely over the workers (about 4-5 tasks per worker), but the new tasks they generate (over 3000 at some point) all get scheduled on worker #0, which doesn't make a lot of sense. Is this a bug in how dask(/distributed) works from inside a delayed function, or are we doing something fundamentally wrong?
Notice how one worker (in the worker view, worker #0) is doing almost all the tasks, and also has lots of data put to disk.

[dashboard screenshot: worker view showing worker #0 running almost all tasks and spilling data to disk]

What you expected to happen:

Work distributed over the workers

Minimal Complete Verifiable Example:

Not quite minimal or verifiable, but I wanted to give some code as an example of what is happening (I generalised some variable/catalog names but mostly left it as-is):

  • cat is predefined as a catalog
  • guids is a list of guids (as a test we published it as a dataset, but I don't think that part is relevant)
import pandas as pd

import distributed
from dask import delayed
from distributed import wait

@delayed
def get_data(date):
    # compute year, month, day from date object
    year, month, day = str(date.year), str(date.month).zfill(2), str(date.day).zfill(2)
    # get dask dataframe for one day from catalog
    df = cat.subcat().weekly_df(year=year, month=month, day=day).to_dask()
    df_history = cat.subcat().other_df(year=year, month=month, day=day).to_dask()
    df = df.join(df_history, how='outer')
    df.habit_score = df.habit_score.astype(float)

    df['day'] = date
    df = df.compute()
    guids = distributed.get_client().get_dataset('guids')
    return df[df.index.isin(guids)]

dfs = [get_data(day).persist() for day in pd.date_range(d1, d2) if day.weekday() == 6]
wait(dfs)

It looks like the join between the two dataframes is scheduled from each worker, but the resulting tasks all end up on worker 0. Is this something obvious we're missing, or a default dask setting that may be broken in our setup?

Anything else we need to know?:

Environment:

  • Dask version: 2021.12.0
  • Python version: Python 3.9.7
  • Operating System: Linux (Docker on Kubernetes; the image is Ubuntu-based and built from a JupyterHub image, I believe)
  • Install method (conda, pip, source): packaged with the JupyterHub base image (conda inside the image, I think)
JoranDox commented Feb 8, 2022

Update: we can force the work to split over the workers using this snippet:


@delayed
def get_data(...):
    ...
    ip = distributed.utils.get_ip()
    df = df.compute(workers=ip)
    ...

Now the work is correctly split over the workers that initially received the get_data tasks, but this reinforces my suspicion that something is off when calling compute() from within a @delayed function. Maybe the code path without workers=... is doing something weird?

ian-r-rose (Collaborator) commented

Hi @JoranDox, usually you don't want to create and compute dask collections (in this case your Dask dataframe) from within Delayeds. Instead, you typically create them on the client and then submit them to your scheduler directly. In some cases delayed tasks can produce and submit work to the cluster, but those are typically pretty advanced workflows, and I'd recommend avoiding that unless you need it.

So, in your case, I'd try reworking things into something like this (pseudocode):

import dask
import pandas as pd

def get_dask_dataframe(date):
    # compute year, month, day from date object
    year, month, day = str(date.year), str(date.month).zfill(2), str(date.day).zfill(2)
    # get dask dataframe for one day from catalog
    df = cat.subcat().weekly_df(year=year, month=month, day=day).to_dask()
    df_history = cat.subcat().other_df(year=year, month=month, day=day).to_dask()
    df = df.join(df_history, how='outer')
    df.habit_score = df.habit_score.astype(float)

    df['day'] = date
    return df

dfs = [get_dask_dataframe(day) for day in pd.date_range(d1, d2) if day.weekday() == 6]
dask.compute(dfs)

ian-r-rose added the needs info label on Feb 16, 2022
ian-r-rose (Collaborator) commented

Closing due to lack of response, but feel free to ping here or at the Dask Discourse if you want to continue the conversation.

JoranDox (Author) commented

Huh, weird, I'm sure I typed a response, but maybe I forgot to send it?

Anyway, the TL;DR was that it takes a non-negligible amount of time to fetch the file metadata, but we're looking into alternative ways to parallelise that (e.g. Python's multiprocessing) without doing dask-in-dask.
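
A rough sketch of that idea (hypothetical and untested; a thread pool is shown instead of multiprocessing since the catalog object may not pickle cleanly, and params_list is just a placeholder for the per-day keyword dicts):

from concurrent.futures import ThreadPoolExecutor

def read_one_day(params):
    # metadata lookup + dask dataframe construction for a single day
    return cat.subcat().weekly_df(**params).to_dask()

# fetch the per-day metadata concurrently on the client, outside of any dask task
with ThreadPoolExecutor(max_workers=8) as pool:
    daily_ddfs = list(pool.map(read_one_day, params_list))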

ian-r-rose (Collaborator) commented

You might be interested in following (or weighing in on) this conversation, which is about improving the ergonomics of launching tasks from other tasks: #5671. It is possible to do this today, it's just not particularly easy to get right: http://distributed.dask.org/en/stable/task-launch.html
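
For reference, the documented pattern looks roughly like this (a sketch only, reusing the cat catalog and the published guids dataset from the snippets above; untested against this setup):

from distributed import worker_client

# get_data is meant to be submitted as a task, e.g. client.submit(get_data, date)
def get_data(date):
    # inside a task, worker_client() returns a Client connected to the same
    # scheduler; the worker secedes from its thread pool while the block runs,
    # so waiting on sub-tasks does not block a worker slot
    with worker_client() as client:
        guids = client.get_dataset('guids')
        ddf = cat.subcat().weekly_df(
            year=str(date.year),
            month=str(date.month).zfill(2),
            day=str(date.day).zfill(2),
        ).to_dask()
        df = client.compute(ddf).result()  # sub-tasks are scheduled cluster-wide
    return df[df.index.isin(guids)]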

JoranDox commented Mar 1, 2022

@ian-r-rose in the end I believe we fixed it by using the more idiomatic dask_client.map, though I think we're technically still running tasks from within tasks. It seems to have solved the issue of all the tasks running on one worker (and feels cleaner too).

import dask.dataframe

# dask_client is a distributed.Client connected to the cluster
params = [
# list of param dicts, generated or hardcoded
]

def get_data(params):
    return cat.subcat().weekly_df(**params).to_dask()

ddf = dask.dataframe.multi.concat(
    dask_client.gather(
        dask_client.map(get_data, params)
    )
)
