Add dask dataframe (#2758)
* add data_frame

* add tests

* fix `description`

* fix tests
DavidKatz-il authored Aug 3, 2020
1 parent befc67e commit 83a8e30
Showing 10 changed files with 1,015 additions and 3 deletions.
6 changes: 5 additions & 1 deletion python_modules/libraries/dagster-dask/dagster_dask/__init__.py
@@ -1,8 +1,12 @@
 from dagster.core.utils import check_dagster_package_version

+from .data_frame import DataFrame
 from .executor import dask_executor
 from .version import __version__

 check_dagster_package_version('dagster-dask', __version__)

-__all__ = ['dask_executor']
+__all__ = [
+    'DataFrame',
+    'dask_executor',
+]
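
For orientation, a minimal sketch of how the newly exported DataFrame type is consumed, modeled directly on the tests added below (the solid name and the 'num.csv' path are illustrative, not part of the commit):

import dask.dataframe as dd

from dagster import InputDefinition, execute_solid, solid
from dagster_dask import DataFrame


@solid(input_defs=[InputDefinition(dagster_type=DataFrame, name='df')])
def total(_, df):
    # The 'csv' input config below is resolved by the DataFrame type's loader,
    # so df arrives here as a dask.dataframe.DataFrame.
    assert isinstance(df, dd.DataFrame)
    return df['num1'].sum().compute()


result = execute_solid(
    total,
    run_config={'solids': {'total': {'inputs': {'df': {'csv': {'path': 'num.csv'}}}}}},
)
assert result.success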
898 changes: 898 additions & 0 deletions python_modules/libraries/dagster-dask/dagster_dask/data_frame.py

Large diffs are not rendered by default.
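
Since the 898-line data_frame.py is not rendered, here is a heavily condensed sketch of the shape such a module takes, assuming dagster's public DagsterType / dagster_type_loader API; the type name, config fields, and csv-only branch are illustrative, not the actual implementation:

import dask.dataframe as dd

from dagster import DagsterType, Field, Selector, String, dagster_type_loader


@dagster_type_loader(Selector({'csv': {'path': Field(String)}}))
def _dataframe_loader(_context, config):
    # Illustrative csv branch only; the real module also handles parquet,
    # json, and many reader/writer options (hence 898 lines).
    file_type, file_options = next(iter(config.items()))
    if file_type == 'csv':
        return dd.read_csv(file_options['path'])
    raise ValueError(f'Unsupported file_type {file_type}')


DataFrame = DagsterType(
    name='DaskDataFrame',  # hypothetical name
    type_check_fn=lambda _, value: isinstance(value, dd.DataFrame),
    loader=_dataframe_loader,
)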

3 changes: 3 additions & 0 deletions python_modules/libraries/dagster-dask/dagster_dask_tests/num.csv
@@ -0,0 +1,3 @@
num1,num2
1,2
3,4
2 changes: 2 additions & 0 deletions python_modules/libraries/dagster-dask/dagster_dask_tests/num.json
@@ -0,0 +1,2 @@
{"num1":1,"num2":2}
{"num1":3,"num2":4}
Binary file added (the num.parquet test fixture); contents not shown.
70 changes: 70 additions & 0 deletions python_modules/libraries/dagster-dask/dagster_dask_tests/test_data_frame.py
@@ -0,0 +1,70 @@
import shutil

import dask.dataframe as dd
import pytest
from dagster_dask import DataFrame
from dask.dataframe.utils import assert_eq

from dagster import InputDefinition, OutputDefinition, execute_solid, file_relative_path, solid
from dagster.utils.test import get_temp_dir


def create_dask_df():
    path = file_relative_path(__file__, 'num.csv')
    return dd.read_csv(path)


@pytest.mark.parametrize(
    'file_type,read,kwargs',
    [
        pytest.param('csv', dd.read_csv, {'index': False}, id='csv'),
        pytest.param('parquet', dd.read_parquet, {'write_index': False}, id='parquet'),
        pytest.param('json', dd.read_json, {}, id='json'),
    ],
)
def test_dataframe_outputs(file_type, read, kwargs):
    df = create_dask_df()

    @solid(output_defs=[OutputDefinition(dagster_type=DataFrame, name='output_df')])
    def return_df(_):
        return df

    with get_temp_dir() as temp_path:
        shutil.rmtree(temp_path)
        result = execute_solid(
            return_df,
            run_config={
                'solids': {
                    'return_df': {
                        'outputs': [{'output_df': {file_type: {'path': temp_path, **kwargs}}}]
                    }
                }
            },
        )
        assert result.success
        actual = read(f"{temp_path}/*")
        assert assert_eq(actual, df)


@pytest.mark.parametrize(
    'file_type',
    [
        pytest.param('csv', id='csv'),
        pytest.param('parquet', id='parquet'),
        pytest.param('json', id='json'),
    ],
)
def test_dataframe_inputs(file_type):
    @solid(input_defs=[InputDefinition(dagster_type=DataFrame, name='input_df')])
    def return_df(_, input_df):
        return input_df

    file_name = file_relative_path(__file__, f"num.{file_type}")
    result = execute_solid(
        return_df,
        run_config={
            'solids': {'return_df': {'inputs': {'input_df': {file_type: {'path': file_name}}}}}
        },
    )
    assert result.success
    assert assert_eq(result.output_value(), create_dask_df())
34 changes: 33 additions & 1 deletion python_modules/libraries/dagster-dask/dagster_dask_tests/test_execute.py
@@ -1,5 +1,5 @@
 import dagster_pandas as dagster_pd
-from dagster_dask import dask_executor
+from dagster_dask import DataFrame, dask_executor

 from dagster import (
     InputDefinition,
@@ -88,3 +88,35 @@ def test_pandas_dask():
     )

     assert result.success
+
+
+@solid(input_defs=[InputDefinition('df', DataFrame)])
+def dask_solid(_, df):  # pylint: disable=unused-argument
+    pass
+
+
+@pipeline(mode_defs=[ModeDefinition(executor_defs=default_executors + [dask_executor])])
+def dask_pipeline():
+    return dask_solid()
+
+
+def test_dask():
+    run_config = {
+        'solids': {
+            'dask_solid': {
+                'inputs': {'df': {'csv': {'path': file_relative_path(__file__, 'ex*.csv')}}}
+            }
+        }
+    }
+
+    result = execute_pipeline(
+        ReconstructablePipeline.for_file(__file__, dask_pipeline.name),
+        run_config={
+            'storage': {'filesystem': {}},
+            'execution': {'dask': {'config': {'cluster': {'local': {'timeout': 30}}}}},
+            **run_config,
+        },
+        instance=DagsterInstance.local_temp(),
+    )
+
+    assert result.success
2 changes: 2 additions & 0 deletions python_modules/libraries/dagster-dask/dev-requirements.txt
@@ -0,0 +1,2 @@
# we need `pyarrow` for testing parquet reads/writes.
pyarrow
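
For context, dask delegates parquet IO to an engine such as pyarrow, which is why this dev requirement is needed; a minimal round-trip of the kind the tests exercise (the /tmp path is illustrative):

import dask.dataframe as dd
import pandas as pd

# dask uses pyarrow (or fastparquet) under the hood for parquet IO.
df = dd.from_pandas(pd.DataFrame({'num1': [1, 3], 'num2': [2, 4]}), npartitions=1)
df.to_parquet('/tmp/num_parquet', write_index=False)
roundtrip = dd.read_parquet('/tmp/num_parquet')
assert df['num1'].sum().compute() == roundtrip['num1'].sum().compute()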
2 changes: 1 addition & 1 deletion python_modules/libraries/dagster-dask/setup.py
@@ -30,7 +30,7 @@ def get_version():
         'bokeh',
         'dagster',
         'dagster_graphql',
-        'dask>=1.2.2',
+        'dask[dataframe]>=1.2.2',
         'distributed>=1.28.1',
     ],
     extras_require={
1 change: 1 addition & 0 deletions python_modules/libraries/dagster-dask/tox.ini
@@ -10,6 +10,7 @@ deps =
   -e ../dagster-aws
   -e ../dagster-pandas
   -e .
+  -r dev-requirements.txt
 extras = yarn,pbs,kube
 usedevelop = true
 whitelist_externals =
