Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidKatz-il committed Jul 28, 2020
1 parent 3531c26 commit b2c7256
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
num1,num2
1,2
3,4
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"num1":1,"num2":2}
{"num1":3,"num2":4}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import shutil

import dask.dataframe as dd
import pytest
from dagster_dask import DataFrame
from dask.dataframe.utils import assert_eq

from dagster import InputDefinition, OutputDefinition, execute_solid, file_relative_path, solid
from dagster.utils.test import get_temp_dir


def create_dask_df():
    """Read the ``num.csv`` file located next to this module with ``dd.read_csv``."""
    return dd.read_csv(file_relative_path(__file__, 'num.csv'))


@pytest.mark.parametrize(
    'file_type,read,kwargs',
    [
        pytest.param('csv', dd.read_csv, {'index': False}, id='csv'),
        pytest.param('parquet', dd.read_parquet, {'write_index': False}, id='parquet'),
        pytest.param('json', dd.read_json, {}, id='json'),
    ],
)
def test_dataframe_outputs(file_type, read, kwargs):
    """Run a solid whose DataFrame output is configured to be written as
    ``file_type``, then read the files back with ``read`` and compare them
    against the frame the solid returned.

    ``kwargs`` are merged into the per-format output config next to ``path``.
    """
    expected = create_dask_df()

    # No docstring on the solid on purpose: only comments are added here so the
    # decorated object is built from exactly the same function attributes.
    @solid(output_defs=[OutputDefinition(dagster_type=DataFrame, name='output_df')])
    def return_df(_):
        return expected

    with get_temp_dir() as temp_path:
        # Drop the directory that get_temp_dir just handed us before executing;
        # presumably so the writer creates the target path itself - TODO confirm.
        shutil.rmtree(temp_path)

        output_config = {file_type: {'path': temp_path, **kwargs}}
        solid_config = {'outputs': [{'output_df': output_config}]}
        result = execute_solid(
            return_df,
            run_config={'solids': {'return_df': solid_config}},
        )
        assert result.success

        # Glob everything that was written under the output path.
        actual = read(f"{temp_path}/*")
        assert assert_eq(actual, expected)


@pytest.mark.parametrize(
    'file_type',
    [
        pytest.param('csv', id='csv'),
        pytest.param('parquet', id='parquet'),
        pytest.param('json', id='json'),
    ],
)
def test_dataframe_inputs(file_type):
    """Feed the ``num.<file_type>`` fixture to a pass-through solid via input
    config and compare the solid's output with ``create_dask_df()``.
    """

    # Pass-through solid: whatever is loaded for ``input_df`` is returned as-is.
    @solid(input_defs=[InputDefinition(dagster_type=DataFrame, name='input_df')])
    def return_df(_, input_df):
        return input_df

    fixture_path = file_relative_path(__file__, f"num.{file_type}")
    input_config = {'input_df': {file_type: {'path': fixture_path}}}
    result = execute_solid(
        return_df,
        run_config={'solids': {'return_df': {'inputs': input_config}}},
    )

    assert result.success
    assert assert_eq(result.output_value(), create_dask_df())
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import dagster_pandas as dagster_pd
from dagster_dask import dask_executor
from dagster_dask import DataFrame, dask_executor

from dagster import (
InputDefinition,
Expand Down Expand Up @@ -88,3 +88,35 @@ def test_pandas_dask():
)

assert result.success


# Solid that declares a single ``df`` input of the dagster_dask ``DataFrame``
# type and does nothing with it.
@solid(input_defs=[InputDefinition('df', DataFrame)])
def dask_solid(_, df):  # pylint: disable=unused-argument
    return None


# Pipeline consisting of ``dask_solid`` alone; its single mode offers the
# default executors plus ``dask_executor``.
@pipeline(
    mode_defs=[
        ModeDefinition(executor_defs=default_executors + [dask_executor]),
    ]
)
def dask_pipeline():
    return dask_solid()


def test_dask():
    """Execute ``dask_pipeline`` with the dask executor configured for a local
    cluster, loading the ``df`` input from the ``ex*.csv`` glob next to this
    file, and assert that the run succeeds.
    """
    csv_glob = file_relative_path(__file__, 'ex*.csv')

    # Key order (storage, execution, solids) mirrors the merged dict the
    # pipeline has always been executed with.
    run_config = {
        'storage': {'filesystem': {}},
        'execution': {'dask': {'config': {'cluster': {'local': {'timeout': 30}}}}},
        'solids': {
            'dask_solid': {'inputs': {'df': {'csv': {'path': csv_glob}}}},
        },
    }

    # Pipeline is handed over in reconstructable form, looked up by name in this file.
    reconstructable = ReconstructablePipeline.for_file(__file__, dask_pipeline.name)
    result = execute_pipeline(
        reconstructable,
        run_config=run_config,
        instance=DagsterInstance.local_temp(),
    )

    assert result.success
2 changes: 2 additions & 0 deletions python_modules/libraries/dagster-dask/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ commands =
!windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster -e dagit'
coverage erase
echo -e "--- \033[0;32m:pytest: Running tox tests\033[0m"
python -m pip install "dask[dataframe]" --upgrade
pip install pyarrow
pytest -vv --junitxml=test_results.xml --cov=dagster_dask --cov-append --cov-report=
coverage report --omit='.tox/*,**/test_*.py' --skip-covered
coverage html --omit='.tox/*,**/test_*.py'
Expand Down

0 comments on commit b2c7256

Please sign in to comment.