Skip to content

Commit

Permalink
Test multiple local cluster configurations
Browse files Browse the repository at this point in the history
  • Loading branch information
kinghuang committed Sep 1, 2020
1 parent 70e2867 commit aa75183
Showing 1 changed file with 26 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def scheduler_info_pipeline():
return scheduler_info_solid()


def test_custom_local_cluster():
def test_single_local_cluster():
cluster_config = {
"n_workers": 2,
"threads_per_worker": 3,
Expand All @@ -49,9 +49,32 @@ def test_custom_local_cluster():

run_config = {"resources": {"dask": {"config": {"cluster": {"local": cluster_config}}}}}
result = execute_pipeline(scheduler_info_pipeline, run_config=run_config, instance=DagsterInstance.local_temp())
_assert_scheduler_info_result(result, cluster_config)


def test_multiple_local_cluster():
cluster_configs = [
{
"n_workers": 1,
"threads_per_worker": 2,
"dashboard_address": None,
},
{
"n_workers": 2,
"threads_per_worker": 3,
"dashboard_address": None,
},
]

for cluster_config in cluster_configs:
run_config = {"resources": {"dask": {"config": {"cluster": {"local": cluster_config}}}}}
result = execute_pipeline(scheduler_info_pipeline, run_config=run_config, instance=DagsterInstance.local_temp())
_assert_scheduler_info_result(result, cluster_config)


def _assert_scheduler_info_result(result, config):
scheduler_info = result.result_for_solid("scheduler_info_solid").output_value("scheduler_info")
assert len(scheduler_info["workers"]) == cluster_config["n_workers"]
assert len(scheduler_info["workers"]) == config["n_workers"]

nthreads = result.result_for_solid("scheduler_info_solid").output_value("nthreads")
assert all(v == cluster_config["threads_per_worker"] for v in nthreads.values())
assert all(v == config["threads_per_worker"] for v in nthreads.values())

0 comments on commit aa75183

Please sign in to comment.