
[dagster-dask] Support execution on dask_yarn #2273

Closed
natekupp opened this issue Mar 15, 2020 · 4 comments
Labels: area: integrations (Related to general integrations, including requests for a new integration)

@natekupp
Contributor

Right now we just pass config along to the dask.distributed.Client: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-dask/dagster_dask/engine.py#L83

That would need to change to support dask_yarn, which requires passing a YARN cluster object to the client.
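To make the distinction concrete, here is a minimal stdlib-only sketch (not the real dask API; the classes are stand-ins) of the two call shapes involved: today the engine forwards raw config kwargs, while dask_yarn requires building a cluster object first and handing that object to the client.

```python
class FakeYarnCluster:
    """Stand-in for dask_yarn.YarnCluster."""
    def __init__(self, **config):
        self.config = config
        self.scheduler_address = "tcp://fake-scheduler:8786"

class FakeClient:
    """Stand-in for dask.distributed.Client: accepts either a scheduler
    address string or a cluster object as its first argument."""
    def __init__(self, address=None, **kwargs):
        if hasattr(address, "scheduler_address"):
            address = address.scheduler_address
        self.address = address
        self.kwargs = kwargs

# Current engine behavior: config kwargs passed straight through.
client_a = FakeClient("tcp://10.0.0.1:8786", timeout=30)

# What dask_yarn support needs: build the cluster, then pass the object.
cluster = FakeYarnCluster(worker_vcores=2, worker_memory="4GiB")
client_b = FakeClient(cluster)
```

The client resolves the cluster object to its scheduler address internally, which is why the engine cannot simply keep forwarding kwargs.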

@mgasner mgasner added the area: integrations Related to general integrations, including requests for a new integration label Mar 18, 2020
@natekupp
Contributor Author

Hmm, I wonder if we should use something like https://jcristharif.com/introducing-skein.html and just support YARN directly.

@DavidKatz-il
Contributor

I have an idea for how to address this issue: when configuring the dask Client, we can pass in a specific type of cluster.

We can amend the execute function in the DaskEngine class to include the following (similar to the dask_yarn example):

from dask.distributed import Client, LocalCluster, SSHCluster
from dask_yarn import YarnCluster
from dask_jobqueue import PBSCluster
from dask_kubernetes import KubeCluster

client_type = pipeline_context.client_type  # coming from environment_dict
cluster_config = dask_config.build_dict(pipeline_name)
if client_type == 'local':
    cluster = LocalCluster(**cluster_config)
elif client_type == 'yarn':
    cluster = YarnCluster(**cluster_config)
elif client_type == 'ssh':
    cluster = SSHCluster(**cluster_config)
elif client_type == 'pbs':
    cluster = PBSCluster(**cluster_config)
elif client_type == 'kube':
    cluster = KubeCluster(**cluster_config)
else:
    raise ValueError(
        f"'client_type' must be one of ('local', 'yarn', 'ssh', 'pbs', 'kube'), not {client_type}"
    )

with Client(cluster) as client:
    ...
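The if/elif chain above could also be condensed into a dispatch mapping, which keeps the supported types and the error message in one place. A stdlib-only sketch of the pattern, using hypothetical stand-in classes in place of the real cluster classes:

```python
class LocalStandIn:
    """Hypothetical stand-in for dask.distributed.LocalCluster."""
    def __init__(self, **config):
        self.config = config

class YarnStandIn:
    """Hypothetical stand-in for dask_yarn.YarnCluster."""
    def __init__(self, **config):
        self.config = config

# One entry per supported cluster type; the real mapping would also
# include 'ssh', 'pbs', and 'kube'.
CLUSTER_TYPES = {
    "local": LocalStandIn,
    "yarn": YarnStandIn,
}

def build_cluster(client_type, **config):
    try:
        cluster_cls = CLUSTER_TYPES[client_type]
    except KeyError:
        raise ValueError(
            f"'client_type' must be one of {sorted(CLUSTER_TYPES)}, not {client_type!r}"
        )
    return cluster_cls(**config)

cluster = build_cluster("yarn", worker_memory="4GiB")
```

Adding a new cluster backend then only touches the mapping, not the control flow.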

If it makes sense, I will be happy to submit a PR.

@natekupp
Contributor Author

@DavidKatz-il yes, something along those lines could work. One additional concern I should flag here is that to properly support these clusters, we should expose a way to configure them via the executor config schema (currently: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-dask/dagster_dask/executor.py#L9-L36)

We could either explicitly create a config schema field for each cluster configuration parameter, or use Permissive() to let the end user pass these fields through. While the latter would bypass config schema validation, it has the advantage that we won't need to worry about drift between our APIs and the underlying Dask configs, so I think that would be preferable.
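To illustrate the trade-off, here is a stdlib-only sketch (not dagster's actual config system) of the two behaviors: a strict schema rejects any field it does not know, while a permissive schema type-checks the fields it knows and passes unknown fields through untouched. The field names are hypothetical.

```python
# Fields the executor schema explicitly declares, with expected types.
KNOWN_FIELDS = {"timeout": int, "address": str}

def validate_strict(config):
    """Reject any field not declared in the schema."""
    for key, value in config.items():
        if key not in KNOWN_FIELDS:
            raise ValueError(f"unknown field: {key}")
        if not isinstance(value, KNOWN_FIELDS[key]):
            raise TypeError(f"bad type for field: {key}")
    return dict(config)

def validate_permissive(config):
    """Type-check known fields; let unknown fields flow through."""
    for key, value in config.items():
        if key in KNOWN_FIELDS and not isinstance(value, KNOWN_FIELDS[key]):
            raise TypeError(f"bad type for field: {key}")
    return dict(config)  # unknown keys reach Dask untouched

# 'worker_vcores' is not in the schema: strict rejects it,
# permissive forwards it to the underlying cluster constructor.
cfg = {"timeout": 30, "worker_vcores": 2}
```

The permissive variant is what keeps the executor schema from drifting as Dask adds or renames cluster parameters, at the cost of catching typos only when Dask itself complains.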

Please ping me on the Dagster Slack if you decide to jump in, I'm happy to help you get a PR together!

@natekupp
Contributor Author

Fixed by #2498
