fix mg_renumber non-deterministic errors (#1523)
* @Iroy30 added a missing dask `persist()` call so the indirection map is in a deterministic state before the renumbering results are merged.
* @rlratzel updated the MG renumbering tests for the latest API changes, removed a redundant test, and updated the test IDs to include the dataset name.

Authors:
  - https://github.com/Iroy30
  - Rick Ratzel (https://github.com/rlratzel)

Approvers:
  - Brad Rees (https://github.com/BradReesWork)
  - Joseph Nke (https://github.com/jnke2016)

URL: #1523
Iroy30 authored Apr 9, 2021
1 parent 63e69fc commit e9d09ee
Showing 13 changed files with 107 additions and 91 deletions.
8 changes: 4 additions & 4 deletions python/cugraph/structure/number_map.py
@@ -263,7 +263,6 @@ def indirection_map(self, ddf, src_col_names, dst_col_names):
to_frame(name=newname)
else:
tmp_df[newname] = tmp[newname].append(tmp_dst[oldname])
print(tmp_df.columns)
else:
for newname in self.col_names:
tmp_df[newname] = tmp[newname]
@@ -273,7 +272,7 @@ def indirection_map(self, ddf, src_col_names, dst_col_names):
tmp_ddf = tmp_ddf.assign(idx=1)
tmp_ddf['global_id'] = tmp_ddf.idx.cumsum() - 1
tmp_ddf = tmp_ddf.drop(columns='idx')

tmp_ddf = tmp_ddf.persist()
self.ddf = tmp_ddf
return tmp_ddf
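
The `persist()` added in this hunk is the heart of the fix: the commit message attributes the non-deterministic errors to the indirection map being re-evaluated before the renumbering results are merged. Below is a minimal sketch of the general idea, using toy data and plain `dask.dataframe` as a stand-in for `dask_cudf` (not the cuGraph internals): a lazily derived id column can be recomputed on every downstream use, while `persist()` materializes one result that all later lookups share.

```python
# Minimal sketch, assuming toy data and plain dask.dataframe as a stand-in
# for dask_cudf: a cumsum-derived id column is lazy, so persist() pins one
# computed mapping instead of recomputing it for every downstream merge.
import pandas as pd
import dask.dataframe as dd

vertices = pd.DataFrame({"vertex": [10, 20, 30, 40]})
ddf = dd.from_pandas(vertices, npartitions=2)

ddf = ddf.assign(idx=1)
ddf["global_id"] = ddf.idx.cumsum() - 1   # lazy; evaluated on demand
ddf = ddf.drop(columns="idx")

ddf = ddf.persist()   # materialize once; later merges reuse these partitions

print(ddf.compute())
```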

@@ -481,8 +480,6 @@ def renumber(df, src_col_names, dst_col_names, preserve_order=False,
renumber_type = 'legacy'
else:
renumber_type = 'experimental'
df = df.rename(columns={src_col_names: "src",
dst_col_names: "dst"})

renumber_map = NumberMap()
if not isinstance(src_col_names, list):
@@ -514,6 +511,9 @@ def renumber(df, src_col_names, dst_col_names, preserve_order=False,
df, "dst", dst_col_names, drop=True,
preserve_order=preserve_order
)
else:
df = df.rename(columns={src_col_names[0]: "src",
dst_col_names[0]: "dst"})

num_edges = len(df)

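
The second pair of hunks in `number_map.py` removes the rename that was keyed on the raw column-name arguments and reintroduces it in an `else` branch, after the names have been normalized to lists, using `src_col_names[0]`/`dst_col_names[0]`. A toy sketch of that normalization pattern (a hypothetical helper, not the `NumberMap` API):

```python
# Hedged sketch: single column names are wrapped in lists up front, and the
# rename to the canonical "src"/"dst" columns happens only on the
# single-column path. pandas stands in for cudf/dask_cudf.
import pandas as pd

def normalize_edges(df, src_col_names, dst_col_names):
    if not isinstance(src_col_names, list):
        src_col_names = [src_col_names]
        dst_col_names = [dst_col_names]

    if len(src_col_names) > 1:
        # multi-column vertices are handled by a separate renumbering path
        raise NotImplementedError("multi-column path not sketched here")
    else:
        df = df.rename(columns={src_col_names[0]: "src",
                                dst_col_names[0]: "dst"})
    return df

edges = pd.DataFrame({"a": [0, 1], "b": [1, 2]})
print(normalize_edges(edges, "a", "b").columns.tolist())  # ['src', 'dst']
```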
@@ -51,7 +51,8 @@
@pytest.mark.skipif(
is_single_gpu(), reason="skipping MG testing on Single GPU system"
)
@pytest.mark.parametrize("graph_file", DATASETS)
@pytest.mark.parametrize("graph_file", DATASETS,
ids=[f"dataset={d.as_posix()}" for d in DATASETS])
@pytest.mark.parametrize("directed", DIRECTED_GRAPH_OPTIONS)
@pytest.mark.parametrize("subset_size", SUBSET_SIZE_OPTIONS)
@pytest.mark.parametrize("normalized", NORMALIZED_OPTIONS)
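
The test changes in these MG test modules add an `ids=` argument so the dataset path appears in each generated test id instead of an opaque `graph_file0` placeholder. A self-contained sketch of the pattern, with a hypothetical `DATASETS` list and test body:

```python
# Hedged sketch of the ids= pattern (hypothetical dataset list and test).
from pathlib import Path
import pytest

DATASETS = [Path("datasets/karate.csv"), Path("datasets/dolphins.csv")]

@pytest.mark.parametrize("graph_file", DATASETS,
                         ids=[f"dataset={d.as_posix()}" for d in DATASETS])
@pytest.mark.parametrize("directed", [True, False])
def test_bc(graph_file, directed):
    # test ids read e.g. test_bc[True-dataset=datasets/karate.csv]
    assert graph_file.suffix == ".csv"
```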
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2020, NVIDIA CORPORATION.
# Copyright (c) 2019-2021, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -48,7 +48,8 @@
@pytest.mark.skipif(
is_single_gpu(), reason="skipping MG testing on Single GPU system"
)
@pytest.mark.parametrize("graph_file", DATASETS)
@pytest.mark.parametrize("graph_file", DATASETS,
ids=[f"dataset={d}" for d in DATASETS])
@pytest.mark.parametrize("directed", DIRECTED_GRAPH_OPTIONS)
@pytest.mark.parametrize("subset_size", SUBSET_SIZE_OPTIONS)
@pytest.mark.parametrize("normalized", NORMALIZED_OPTIONS)
5 changes: 4 additions & 1 deletion python/cugraph/tests/dask/test_mg_bfs.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -35,7 +35,10 @@ def client_connection():
def test_dask_bfs(client_connection):
gc.collect()

# FIXME: update this to allow dataset to be parameterized and have dataset
# part of test param id (see other tests)
input_data_path = r"../datasets/netscience.csv"
print(f"dataset={input_data_path}")
chunksize = dcg.get_chunksize(input_data_path)

ddf = dask_cudf.read_csv(
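
The BFS test keeps its hard-coded dataset (hence the FIXME) and now at least prints it; the comms, degree, Katz, and PageRank tests below get the same treatment. For context, a hedged sketch of the input pattern these MG tests share; the delimiter, column names, and dtypes are assumptions based on the space-delimited, headerless edge lists in the cuGraph test datasets, and `chunksize` was the `dask_cudf.read_csv` parameter name at the time of this commit:

```python
# Hedged sketch of the MG test input pattern (dataset path and column
# layout are assumptions, not taken from this diff).
import cugraph.dask as dcg
import dask_cudf

input_data_path = r"../datasets/netscience.csv"
chunksize = dcg.get_chunksize(input_data_path)  # partition size in bytes

ddf = dask_cudf.read_csv(
    input_data_path,
    chunksize=chunksize,          # renamed in newer dask_cudf releases
    delimiter=" ",
    names=["src", "dst", "value"],
    dtype=["int32", "int32", "float32"],
)
```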
6 changes: 5 additions & 1 deletion python/cugraph/tests/dask/test_mg_comms.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -38,10 +38,14 @@ def test_dask_pagerank(client_connection):
# Initialize and run pagerank on two distributed graphs
# with same communicator

# FIXME: update this to allow dataset to be parameterized and have dataset
# part of test param id (see other tests)
input_data_path1 = r"../datasets/karate.csv"
print(f"dataset1={input_data_path1}")
chunksize1 = dcg.get_chunksize(input_data_path1)

input_data_path2 = r"../datasets/dolphins.csv"
print(f"dataset2={input_data_path2}")
chunksize2 = dcg.get_chunksize(input_data_path2)

ddf1 = dask_cudf.read_csv(
5 changes: 4 additions & 1 deletion python/cugraph/tests/dask/test_mg_degree.py
@@ -1,4 +1,4 @@
# Copyright (c) 2018-2020, NVIDIA CORPORATION.
# Copyright (c) 2018-2021, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -34,7 +34,10 @@ def client_connection():
def test_dask_mg_degree(client_connection):
gc.collect()

# FIXME: update this to allow dataset to be parameterized and have dataset
# part of test param id (see other tests)
input_data_path = r"../datasets/karate.csv"
print(f"dataset={input_data_path}")

chunksize = cugraph.dask.get_chunksize(input_data_path)

5 changes: 4 additions & 1 deletion python/cugraph/tests/dask/test_mg_katz_centrality.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -36,7 +36,10 @@ def client_connection():
def test_dask_katz_centrality(client_connection):
gc.collect()

# FIXME: update this to allow dataset to be parameterized and have dataset
# part of test param id (see other tests)
input_data_path = r"../datasets/karate.csv"
print(f"dataset={input_data_path}")
chunksize = dcg.get_chunksize(input_data_path)

ddf = dask_cudf.read_csv(
7 changes: 5 additions & 2 deletions python/cugraph/tests/dask/test_mg_louvain.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -52,7 +52,10 @@ def client_connection():
@pytest.mark.skipif(
is_single_gpu(), reason="skipping MG testing on Single GPU system"
)
@pytest.fixture(scope="module", params=utils.DATASETS_UNDIRECTED)
@pytest.fixture(scope="module",
params=utils.DATASETS_UNDIRECTED,
ids=[f"dataset={d.as_posix()}"
for d in utils.DATASETS_UNDIRECTED])
def daskGraphFromDataset(request, client_connection):
"""
Returns a new dask dataframe created from the dataset file param.
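
The Louvain test parametrizes through a module-scoped fixture rather than a `parametrize` marker, so the ids are attached to the fixture. A toy sketch of that mechanism (hypothetical dataset list; the real fixture also builds a dask graph from the file and depends on `client_connection`):

```python
# Toy sketch of fixture-level parametrization with readable ids.
from pathlib import Path
import pytest

DATASETS_UNDIRECTED = [Path("datasets/karate.csv"),
                       Path("datasets/dolphins.csv")]

@pytest.fixture(scope="module",
                params=DATASETS_UNDIRECTED,
                ids=[f"dataset={d.as_posix()}"
                     for d in DATASETS_UNDIRECTED])
def daskGraphFromDataset(request):
    # request.param is the dataset Path selected for the current id
    return request.param

def test_uses_fixture(daskGraphFromDataset):
    assert daskGraphFromDataset.suffix == ".csv"
```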
5 changes: 4 additions & 1 deletion python/cugraph/tests/dask/test_mg_pagerank.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -65,7 +65,10 @@ def client_connection():
def test_dask_pagerank(client_connection, personalization_perc):
gc.collect()

# FIXME: update this to allow dataset to be parameterized and have dataset
# part of test param id (see other tests)
input_data_path = r"../datasets/karate.csv"
print(f"dataset={input_data_path}")
chunksize = dcg.get_chunksize(input_data_path)

ddf = dask_cudf.read_csv(
93 changes: 30 additions & 63 deletions python/cugraph/tests/dask/test_mg_renumber.py
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2020, NVIDIA CORPORATION.
# Copyright (c) 2019-2021, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -38,11 +38,12 @@ def client_connection():
teardown_local_dask_cluster(cluster, client)


# Test all combinations of default/managed and pooled/non-pooled allocation
@pytest.mark.skipif(
is_single_gpu(), reason="skipping MG testing on Single GPU system"
)
@pytest.mark.parametrize("graph_file", utils.DATASETS_UNRENUMBERED)
@pytest.mark.parametrize("graph_file", utils.DATASETS_UNRENUMBERED,
ids=[f"dataset={d.as_posix()}"
for d in utils.DATASETS_UNRENUMBERED])
def test_mg_renumber(graph_file, client_connection):
gc.collect()

@@ -60,71 +61,37 @@ def test_mg_renumber(graph_file, client_connection):

ddf = dask.dataframe.from_pandas(gdf, npartitions=2)

numbering = NumberMap()
numbering.from_dataframe(ddf, ["src", "src_old"], ["dst", "dst_old"])
renumbered_df = numbering.add_internal_vertex_id(
numbering.add_internal_vertex_id(ddf, "src_id", ["src", "src_old"]),
"dst_id",
["dst", "dst_old"],
)

check_src = numbering.from_internal_vertex_id(
renumbered_df, "src_id"
).compute()
check_dst = numbering.from_internal_vertex_id(
renumbered_df, "dst_id"
).compute()

assert check_src["0"].to_pandas().equals(check_src["src"].to_pandas())
assert check_src["1"].to_pandas().equals(check_src["src_old"].to_pandas())
assert check_dst["0"].to_pandas().equals(check_dst["dst"].to_pandas())
assert check_dst["1"].to_pandas().equals(check_dst["dst_old"].to_pandas())


# Test all combinations of default/managed and pooled/non-pooled allocation
@pytest.mark.skipif(
is_single_gpu(), reason="skipping MG testing on Single GPU system"
)
@pytest.mark.parametrize("graph_file", utils.DATASETS_UNRENUMBERED)
def test_mg_renumber2(graph_file, client_connection):
gc.collect()

M = utils.read_csv_for_nx(graph_file)
sources = cudf.Series(M["0"])
destinations = cudf.Series(M["1"])

translate = 1000

gdf = cudf.DataFrame()
gdf["src_old"] = sources
gdf["dst_old"] = destinations
gdf["src"] = sources + translate
gdf["dst"] = destinations + translate
gdf["weight"] = gdf.index.astype(np.float)

ddf = dask.dataframe.from_pandas(gdf, npartitions=2)

ren2, num2 = NumberMap.renumber(
ddf, ["src", "src_old"], ["dst", "dst_old"]
)

check_src = num2.from_internal_vertex_id(ren2, "src").compute()
check_src = check_src.sort_values("weight").reset_index(drop=True)
check_dst = num2.from_internal_vertex_id(ren2, "dst").compute()
check_dst = check_dst.sort_values("weight").reset_index(drop=True)

assert check_src["0"].to_pandas().equals(gdf["src"].to_pandas())
assert check_src["1"].to_pandas().equals(gdf["src_old"].to_pandas())
assert check_dst["0"].to_pandas().equals(gdf["dst"].to_pandas())
assert check_dst["1"].to_pandas().equals(gdf["dst_old"].to_pandas())
# preserve_order is not supported for MG
renumbered_df, renumber_map = NumberMap.renumber(ddf,
["src", "src_old"],
["dst", "dst_old"],
preserve_order=False)
unrenumbered_df = renumber_map.unrenumber(renumbered_df, "src",
preserve_order=False)
unrenumbered_df = renumber_map.unrenumber(unrenumbered_df, "dst",
preserve_order=False)

# sort needed only for comparisons, since preserve_order is False
gdf = gdf.sort_values(by=["src", "src_old", "dst", "dst_old"])
gdf = gdf.reset_index()
unrenumbered_df = unrenumbered_df.compute()
unrenumbered_df = unrenumbered_df.sort_values(by=["0_src", "1_src",
"0_dst", "1_dst"])
unrenumbered_df = unrenumbered_df.reset_index()

assert gdf["src"].equals(unrenumbered_df["0_src"])
assert gdf["src_old"].equals(unrenumbered_df["1_src"])
assert gdf["dst"].equals(unrenumbered_df["0_dst"])
assert gdf["dst_old"].equals(unrenumbered_df["1_dst"])


# Test all combinations of default/managed and pooled/non-pooled allocation
@pytest.mark.skipif(
is_single_gpu(), reason="skipping MG testing on Single GPU system"
)
@pytest.mark.parametrize("graph_file", utils.DATASETS_UNRENUMBERED)
def test_mg_renumber3(graph_file, client_connection):
@pytest.mark.parametrize("graph_file", utils.DATASETS_UNRENUMBERED,
ids=[f"dataset={d.as_posix()}"
for d in utils.DATASETS_UNRENUMBERED])
def test_mg_renumber_add_internal_vertex_id(graph_file, client_connection):
gc.collect()

M = utils.read_csv_for_nx(graph_file)
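
The rewritten `test_mg_renumber` now exercises the public `NumberMap.renumber`/`unrenumber` round trip, and since `preserve_order=False` both sides are sorted on the key columns before comparison. A small sketch of that comparison idiom, with pandas standing in for cudf/dask_cudf and made-up values; the `0_src`/`1_src` names follow the test's convention for unrenumbered multi-column vertices:

```python
# Hedged sketch of the comparison idiom: when preserve_order=False, row
# order is not guaranteed, so both frames are sorted on the key columns and
# re-indexed before column-wise equality checks.
import pandas as pd

expected = pd.DataFrame({"src": [1001, 1002], "src_old": [1, 2],
                         "dst": [1002, 1000], "dst_old": [2, 0]})
result = pd.DataFrame({"0_src": [1002, 1001], "1_src": [2, 1],
                       "0_dst": [1000, 1002], "1_dst": [0, 2]})

expected = expected.sort_values(by=["src", "src_old", "dst", "dst_old"]) \
                   .reset_index(drop=True)
result = result.sort_values(by=["0_src", "1_src", "0_dst", "1_dst"]) \
               .reset_index(drop=True)

assert expected["src"].equals(result["0_src"])
assert expected["src_old"].equals(result["1_src"])
assert expected["dst"].equals(result["0_dst"])
assert expected["dst_old"].equals(result["1_dst"])
```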
