Move Dask DataFrame read/to options under read/to keys #2821

Merged: 25 commits into dagster-io:master from dask-df-read-to, Sep 4, 2020

Conversation

@kinghuang (Contributor) commented Aug 16, 2020

This PR replaces #2809.

This PR modifies the loader and materializer options for the Dask DataFrame DagsterType to place the options for reading and writing DataFrames under keys named read and to, respectively. Moving the keys down one level opens up room at the root of the type config for other options.

Before:

solids:
  example:
    inputs:
      dataframe:
        parquet:
          path: s3://some_bucket/some_path

After:

solids:
  example:
    inputs:
      dataframe:
        read:
          parquet:
            path: s3://some_bucket/some_path

read and to were chosen as the key names to match the underlying method names for reading and writing DataFrames (e.g., read_parquet, to_json).
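
For outputs, the materializer options nest under to in the same way. A hypothetical sketch (the exact shape of the outputs config is an assumption and may differ by Dagster version):

solids:
  example:
    outputs:
      - result:
          to:
            parquet:
              path: s3://some_bucket/some_path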

Several typos in option names are also fixed.

  • read_hdf: Key → key
  • read_json: storage_option → storage_options

@kinghuang kinghuang marked this pull request as draft August 16, 2020 19:09
@kinghuang kinghuang force-pushed the dask-df-read-to branch 2 times, most recently from 594e426 to f255529 on August 21, 2020 20:54
@kinghuang kinghuang marked this pull request as ready for review August 24, 2020 22:44
@kinghuang (Contributor, Author)

@alangenfeld, this is ready for a review.

To address the breaking change you noted in #2809 (review), the existing config keys are allowed, but will generate deprecation warnings on use.
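
A minimal sketch of how such a shim might look (illustrative only; canonicalize_loader_config and the warning text are assumptions, not the actual dagster-dask code):

import warnings

def canonicalize_loader_config(config):
    # Hypothetical helper: accept the deprecated root-level form,
    # but steer users toward the new nested "read" key.
    if "read" in config:
        return config["read"]
    warnings.warn(
        "Specifying the read type at the root of the loader config is "
        "deprecated; nest it under the 'read' key instead (see #2872).",
        DeprecationWarning,
    )
    return config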

This PR has been narrowed down to just the key change. I'll submit the rest of #2809 in a separate PR after this one clears.

@alangenfeld (Member) left a comment

File an issue for removing the deprecated options, and add it as a comment in the places where the cleanup would occur.

@kinghuang (Contributor, Author)

Created #2872 about the deprecated options and added comments for future cleanup.

@kinghuang kinghuang mentioned this pull request Sep 1, 2020
@alangenfeld (Member)

black and pylint failures are real - worth rebasing too since some of the flaky test failures are fixed in master

The DataFrame loader configs currently specify where to load data from.
Place these under a key named “read” to make room for additional loader
options.
The DataFrame materializer configs currently specify where to write
data to. Place these under a key named “to” to make room for additional
materializer options.
Modify the DataFrame materializer to allow materialization in multiple
formats.
Separate the data for the DataFrame loader into a dict and create a
function that generates the loader config using it. This reduces
repetition and will be used to provide both the original form of the
loader config and the new form with the read key.

The descriptions of the read options are also cut down to the core
sentence instead of being a copy of the full Dask documentation.
Replace the long if/else list for each read function with a lookup to
DataFrameReadTypes for metadata about the read type, which includes the
function and whether it is path based. Use this to prepare the
arguments to the specified read function, then call it and return the
resulting Dask dataframe. (A sketch of this pattern appears after the
commit list below.)
Temporarily add support for specifying a read type (e.g., parquet) at
the root of a loader config. This form is deprecated in favour of the
read key.
Separate the data for the DataFrame materializer into a dict and create
a function that generates the materializer config using it. This
reduces repetition and will be used to provide both the original form
of the materializer config and the new form with the to key.

The descriptions of the to options are also cut down to the core
sentence instead of being a copy of the full Dask documentation.
Replace the long if/else list for each to function with a lookup to
DataFrameToTypes for metadata about the to type, which includes the
function and whether it is path based. Use this to prepare the
arguments to the specified to function, then call it.

Also, limit the AssetMaterialization yield to just file-based
assets for this commit.
Accept to specifications on the root of the type config in addition to
those under the to key. This is temporary for compatibility purposes,
and should be considered deprecated.
Remove the dict_without_keys function, which is not used.
Restore a field that was accidentally lost in the conversion.
Order the tests from input to output.
Allow extra fields to be specified when loading or materializing
dataframes. The extra options are passed to the underlying Pandas
implementations.
Dataframe read/to configs should be placed under read/to keys, not at
the root of the type config.
Change the root level config for both loader and materializer from
Shape to Selector. This will change back to Shape when additional
options are introduced later on.
See dagster-io#2872 for information
about deprecated loader and materializer config options.
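
As a rough illustration of the lookup-table refactor described in the commits above (DataFrameReadTypes is named in the commit messages; the metadata fields and the load_dataframe helper are assumptions):

import dask.dataframe as dd

# Map each read type to its dask.dataframe function and whether it is
# path based; path-based readers take the location as the first argument.
DataFrameReadTypes = {
    "csv": {"function": dd.read_csv, "is_path_based": True},
    "parquet": {"function": dd.read_parquet, "is_path_based": True},
    "json": {"function": dd.read_json, "is_path_based": True},
    "sql_table": {"function": dd.read_sql_table, "is_path_based": False},
}

def load_dataframe(read_type, config):
    # One dictionary lookup replaces the long if/else chain over read functions.
    meta = DataFrameReadTypes[read_type]
    kwargs = dict(config)
    if meta["is_path_based"]:
        path = kwargs.pop("path")
        return meta["function"](path, **kwargs)
    return meta["function"](**kwargs)
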
@kinghuang (Contributor, Author)

Fixed failures and rebased.

@alangenfeld alangenfeld merged commit 03efa3e into dagster-io:master Sep 4, 2020
@kinghuang (Contributor, Author)

Thanks! I'll rebase and prep #2811, next.

@kinghuang kinghuang deleted the dask-df-read-to branch September 7, 2020 17:19
alangenfeld pushed a commit that referenced this pull request Sep 18, 2020
This PR creates a Dask resource, which manages a Dask client and optional cluster. The resource is to be used by solids and the Dask DataFrame type's materializer to compute Dask graphs.

The resource can be configured to connect to a pre-existing cluster by its scheduler address, or create a cluster on demand.

Here are example resource configs for typical use cases.

Connect to an existing Dask cluster via its scheduler:

resources:
  dask:
    config:
      client:
        name: my-dagster-pipeline
        address: tcp://dask-scheduler-here:8786

Create a local cluster with 4 workers and 1 thread per worker:

The resource will create a Cluster object, and pass it as the address option to the client.

resources:
  dask:
    config:
      client:
        name: my-dagster-pipeline
      cluster:
        local:
          n_workers: 4
          threads_per_worker: 1

The DataFrame type does not require a Dask resource. But if a resource is provided under the dask key, the type's materializer (if configured) will run with the resource's client as the current client. Otherwise, the global client and scheduler apply.

Related to the above, the compute field is removed from all the materialization options. Providing a way to not compute a Dask DataFrame materialization does not make sense, as the future is never returned in the pipeline and cannot be computed at a later time. Materializations must be computed if specified.

Solids using the Dask resource for computation should consider making the resource's client the current client by using the as_current() context manager.

@solid(…)
def some_solid(context):
  with context.resources.dask.client.as_current():
    …

This PR carries the commits from #2821, and is meant to be merged after it.
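
A minimal sketch of what such a resource might look like (illustrative only; it assumes dagster's @resource decorator and dask.distributed, and the actual dagster-dask implementation differs):

from dagster import Field, Permissive, resource
from dask.distributed import Client, LocalCluster

@resource(
    config_schema={
        "client": Field(Permissive(), is_required=False),
        "cluster": Field(Permissive(), is_required=False),
    }
)
def dask_resource(init_context):
    cfg = init_context.resource_config
    client_kwargs = dict(cfg.get("client", {}))
    local_cfg = cfg.get("cluster", {}).get("local")
    if local_cfg is not None:
        # Create the cluster on demand and pass it as the client's address.
        client_kwargs["address"] = LocalCluster(**local_cfg)
    return Client(**client_kwargs)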