Skip to content

Commit

Permalink
Fix MG PLC algos intermittent hang (#2607)
Browse files Browse the repository at this point in the history
Dask doesn't always release some of the inactive futures fast enough. This can be problematic when running the same `algo` several times with the same `PLC graph`, because those futures can be cached in the next iteration, causing a hang if some get released midway.

This PR manually deletes inactive futures.
closes #2568

Authors:
  - Joseph Nke (https://github.com/jnke2016)

Approvers:
  - Rick Ratzel (https://github.com/rlratzel)
  - AJ Schmidt (https://github.com/ajschmidt8)

URL: #2607
  • Loading branch information
jnke2016 authored Aug 31, 2022
1 parent 1dc77bb commit 821571d
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 5 deletions.
2 changes: 2 additions & 0 deletions conda/recipes/cugraph/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ requirements:
- ucx-proc=*=gpu
- cudatoolkit {{ cuda_version }}.*
- libraft-headers {{ minor_version }}
# FIXME: this pin can be removed once we move to the GitHub Actions build process
- setuptools<=65.2.0
run:
- python x.x
- pylibcugraph={{ version }}
Expand Down
13 changes: 12 additions & 1 deletion python/cugraph/cugraph/dask/cores/core_number.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def core_number(input_graph,
degree_type,
do_expensive_check,
workers=[w],
allow_other_workers=False,
)
for w in Comms.get_workers()
]
Expand All @@ -126,7 +127,17 @@ def core_number(input_graph,

wait(cudf_result)

ddf = dask_cudf.from_delayed(cudf_result)
ddf = dask_cudf.from_delayed(cudf_result).persist()
wait(ddf)

# FIXME: Dask doesn't always release it fast enough.
# For instance if the algo is run several times with
# the same PLC graph, the current iteration might try to cache
# the past iteration's futures and this can cause a hang if some
# of those futures get released midway
del result
del cudf_result

if input_graph.renumbered:
ddf = input_graph.unrenumber(ddf, "vertex")

Expand Down
14 changes: 13 additions & 1 deletion python/cugraph/cugraph/dask/link_analysis/pagerank.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ def pagerank(input_graph,
max_iter,
do_expensive_check,
workers=[w],
allow_other_workers=False,
)
for w, data_personalization in data_prsztn.worker_to_parts.items()
]
Expand All @@ -304,6 +305,7 @@ def pagerank(input_graph,
max_iter,
do_expensive_check,
workers=[w],
allow_other_workers=False,
)
for w in Comms.get_workers()
]
Expand All @@ -316,7 +318,17 @@ def pagerank(input_graph,

wait(cudf_result)

ddf = dask_cudf.from_delayed(cudf_result)
ddf = dask_cudf.from_delayed(cudf_result).persist()
wait(ddf)

# FIXME: Dask doesn't always release it fast enough.
# For instance if the algo is run several times with
# the same PLC graph, the current iteration might try to cache
# the past iteration's futures and this can cause a hang if some
# of those futures get released midway
del result
del cudf_result

if input_graph.renumbered:
ddf = input_graph.unrenumber(ddf, "vertex")

Expand Down
12 changes: 11 additions & 1 deletion python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def uniform_neighbor_sample(input_graph,
fanout_vals,
with_replacement,
workers=[w],
allow_other_workers=False,
)
for w in Comms.get_workers()
]
Expand All @@ -167,7 +168,16 @@ def uniform_neighbor_sample(input_graph,

wait(cudf_result)

ddf = dask_cudf.from_delayed(cudf_result)
ddf = dask_cudf.from_delayed(cudf_result).persist()
wait(ddf)

# FIXME: Dask doesn't always release it fast enough.
# For instance if the algo is run several times with
# the same PLC graph, the current iteration might try to cache
# the past iteration's futures and this can cause a hang if some
# of those futures get released midway
del result
del cudf_result

if input_graph.renumbered:
ddf = input_graph.unrenumber(ddf, "sources", preserve_order=True)
Expand Down
14 changes: 12 additions & 2 deletions python/cugraph/cugraph/dask/traversal/bfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ def bfs(input_graph,
st[0],
depth_limit,
return_distances,
workers=[w]
workers=[w],
allow_other_workers=False,
)
for w, st in data_start.worker_to_parts.items()
]
Expand All @@ -188,7 +189,16 @@ def bfs(input_graph,
for cp_arrays in cupy_result]
wait(cudf_result)

ddf = dask_cudf.from_delayed(cudf_result)
ddf = dask_cudf.from_delayed(cudf_result).persist()
wait(ddf)

# FIXME: Dask doesn't always release it fast enough.
# For instance if the algo is run several times with
# the same PLC graph, the current iteration might try to cache
# the past iteration's futures and this can cause a hang if some
# of those futures get released midway
del cupy_result
del cudf_result

if input_graph.renumbered:
ddf = input_graph.unrenumber(ddf, 'vertex')
Expand Down

0 comments on commit 821571d

Please sign in to comment.