[REVIEW] Optimize dask.uniform_neighbor_sample #2887

Merged
103 changes: 61 additions & 42 deletions python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py
@@ -17,13 +17,48 @@

import dask_cudf
import cudf
import cupy as cp

from pylibcugraph import ResourceHandle

from pylibcugraph import uniform_neighbor_sample as pylibcugraph_uniform_neighbor_sample

from cugraph.dask.comms import comms as Comms
from cugraph.dask.common.input_utils import get_distributed_data

src_n = "sources"
dst_n = "destinations"
indices_n = "indices"


def create_iterable_args(
session_id, input_graph, start_list, fanout_vals, with_replacement, weight_t
):
npartitions = input_graph._npartitions
session_id_it = [session_id] * npartitions
graph_it = input_graph._plc_graph.values()
start_list_it = cp.array_split(start_list.values, npartitions)
fanout_vals_it = [fanout_vals] * npartitions
with_replacement_it = [with_replacement] * npartitions
weight_t_it = [weight_t] * npartitions
return [
session_id_it,
graph_it,
start_list_it,
fanout_vals_it,
with_replacement_it,
weight_t_it,
]
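The helper above packages every argument as a per-partition list so that a single client.map call can fan one task out per graph partition: scalar arguments are replicated npartitions times, while the seed vertices are split with cp.array_split. A minimal CPU-only sketch of that calling pattern, with toy stand-ins for the graph and session objects (all names here are illustrative, not part of the PR):

from dask.distributed import Client, LocalCluster

def _work(tag, part, fanout):
    # One task per partition; scalar arguments were replicated per partition.
    return (tag, sum(part), fanout)

if __name__ == "__main__":
    client = Client(LocalCluster(n_workers=2))
    npartitions = 2
    parts = [[1, 2], [3, 4]]            # like cp.array_split(start_list.values, n)
    tags = ["session"] * npartitions    # like session_id_it
    fanouts = [[10, 25]] * npartitions  # like fanout_vals_it
    futures = client.map(_work, tags, parts, fanouts)
    print(client.gather(futures))       # [("session", 3, [10, 25]), ("session", 7, [10, 25])]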


def create_empty_df(indices_t, weight_t):
df = cudf.DataFrame(
{
src_n: numpy.empty(shape=0, dtype=indices_t),
dst_n: numpy.empty(shape=0, dtype=indices_t),
indices_n: numpy.empty(shape=0, dtype=weight_t),
}
)
return df
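create_empty_df builds a zero-row frame carrying only the final column names and dtypes; passed as meta to dask_cudf.from_delayed later in the diff, it lets Dask skip materializing a partition just to infer the output schema. A hedged CPU analog of the same trick using pandas and dask.dataframe (no GPU required; names are illustrative):

import numpy
import pandas as pd
import dask
import dask.dataframe as dd

@dask.delayed
def make_part(n):
    # Stand-in for one worker's converted sample output.
    return pd.DataFrame({
        "sources": numpy.arange(n, dtype="int32"),
        "destinations": numpy.arange(n, dtype="int32"),
        "indices": numpy.ones(n, dtype="float32"),
    })

# Zero-row frame that carries only the schema, like create_empty_df.
meta = pd.DataFrame({
    "sources": numpy.empty(0, dtype="int32"),
    "destinations": numpy.empty(0, dtype="int32"),
    "indices": numpy.empty(0, dtype="float32"),
})

ddf = dd.from_delayed([make_part(3), make_part(2)], meta=meta, verify_meta=False)
print(ddf.dtypes)  # schema is known up front; no partition was computed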


def convert_to_cudf(cp_arrays, weight_t):
@@ -33,9 +33,9 @@ def convert_to_cudf(cp_arrays, weight_t):
cupy_sources, cupy_destinations, cupy_indices = cp_arrays

df = cudf.DataFrame()
df["sources"] = cupy_sources
df["destinations"] = cupy_destinations
df["indices"] = cupy_indices
df[src_n] = cupy_sources
df[dst_n] = cupy_destinations
df[indices_n] = cupy_indices

if weight_t == "int32":
df.indices = df.indices.astype("int32")
@@ -46,17 +46,16 @@ def convert_to_cudf(cp_arrays, weight_t):


def _call_plc_uniform_neighbor_sample(
-    sID, mg_graph_x, st_x, fanout_vals, with_replacement
+    sID, mg_graph_x, st_x, fanout_vals, with_replacement, weight_t
):
-    return pylibcugraph_uniform_neighbor_sample(
+    cp_arrays = pylibcugraph_uniform_neighbor_sample(
resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()),
input_graph=mg_graph_x,
start_list=st_x,
h_fan_out=fanout_vals,
with_replacement=with_replacement,
# FIXME: should we add this parameter as an option?
do_expensive_check=True,
)
+    return convert_to_cudf(cp_arrays, weight_t)
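The reworked wrapper also folds the cudf conversion into the worker task itself: rather than returning raw cupy arrays to the client and launching a second round of convert_to_cudf tasks (the pattern removed further down), each future now resolves directly to a DataFrame partition. A toy sketch of the fusion (stand-in functions, not the real signatures):

def sample(x):
    # Stand-in for the pylibcugraph sampling call returning raw arrays.
    return (x, x + 1, 1.0)

def sample_and_convert(x):
    # Fused task, like _call_plc_uniform_neighbor_sample above: convert on
    # the worker so the client never handles the intermediate arrays.
    s, d, w = sample(x)
    return {"sources": s, "destinations": d, "indices": w}

# Old flow: futures = client.map(sample, xs); wait(futures);
#           dfs = client.map(convert, futures)
# New flow: dfs = client.map(sample_and_convert, xs)  # one round, one wait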


def uniform_neighbor_sample(
@@ -123,47 +123,32 @@ def uniform_neighbor_sample(
else:
weight_t = "float32"

-    # start_list uses "external" vertex IDs, but if the graph has been
-    # renumbered, the start vertex IDs must also be renumbered.
+    if "_SRC_" in input_graph.edgelist.edgelist_df:
+        indices_t = input_graph.edgelist.edgelist_df["_SRC_"].dtype
+    elif src_n in input_graph.edgelist.edgelist_df:
+        indices_t = input_graph.edgelist.edgelist_df[src_n].dtype
+    else:
+        indices_t = numpy.int32

if input_graph.renumbered:
start_list = input_graph.lookup_internal_vertex_id(start_list).compute()

-    start_list = dask_cudf.from_cudf(
-        start_list, npartitions=min(input_graph._npartitions, len(start_list))
-    )
-    start_list = get_distributed_data(start_list)
-    wait(start_list)
-    start_list = start_list.worker_to_parts

client = input_graph._client

-    result = [
-        client.submit(
-            _call_plc_uniform_neighbor_sample,
-            Comms.get_session_id(),
-            input_graph._plc_graph[w],
-            start_list[w][0],
-            fanout_vals,
-            with_replacement,
-            workers=[w],
-            allow_other_workers=False,
-        )
-        for w in Comms.get_workers()
-    ]
-
-    wait(result)
+    session_id = Comms.get_session_id()

-    cudf_result = [
-        client.submit(convert_to_cudf, cp_arrays, weight_t) for cp_arrays in result
-    ]
-
-    wait(cudf_result)
-
-    ddf = dask_cudf.from_delayed(cudf_result).persist()
+    # Send tasks all at once
alexbarghi-nv (Member) commented on Nov 9, 2022:

    Just one comment; it would be nice to have a helper function for this so we can apply it to more algorithms. But I suggest we leave that to a future PR given how much work we have left before burndown.

    To clarify, this was referencing lines 174-180.
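A rough sketch of the kind of reusable helper being suggested (hypothetical name and signature, not code from this PR):

def mg_map_per_partition(client, session_id, input_graph, plc_func, *scalar_args):
    # Hypothetical generalization: submit one plc_func task per graph
    # partition in a single client.map call, replicating scalar arguments.
    npartitions = input_graph._npartitions
    iterables = [
        [session_id] * npartitions,
        input_graph._plc_graph.values(),
    ]
    iterables += [[arg] * npartitions for arg in scalar_args]
    return client.map(plc_func, *iterables)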

+    result = client.map(
+        _call_plc_uniform_neighbor_sample,
+        *create_iterable_args(
+            session_id, input_graph, start_list, fanout_vals, with_replacement, weight_t
+        ),
+    )
+    ddf = dask_cudf.from_delayed(
+        result, meta=create_empty_df(indices_t, weight_t), verify_meta=False
+    ).persist()
wait(ddf)

# Wait until the inactive futures are released
-    wait([(r.release(), c_r.release()) for r, c_r in zip(result, cudf_result)])
+    wait([r.release() for r in result])
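One detail worth noting here: once from_delayed(...).persist() owns the results, the original futures are redundant client-side references, and releasing them promptly lets the scheduler free each partition's sample output rather than pinning it until garbage collection. A small sketch of the release semantics (toy computation, assuming an in-process client):

from dask.distributed import Client

client = Client(processes=False)  # in-process cluster, for illustration
fut = client.submit(sum, [1, 2, 3])
print(fut.result())  # 6
fut.release()        # drop this client's reference so the result can be freed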

if input_graph.renumbered:
ddf = input_graph.unrenumber(ddf, "sources", preserve_order=True)
@@ -22,10 +22,23 @@
from cugraph.testing import utils
from cugraph.dask import uniform_neighbor_sample

# If the rapids-pytest-benchmark plugin is installed, the "gpubenchmark"
# fixture will be available automatically. Check that this fixture is available
# by trying to import rapids_pytest_benchmark, and if that fails, set
# "gpubenchmark" to the standard "benchmark" fixture provided by
# pytest-benchmark.
try:
import rapids_pytest_benchmark # noqa: F401
except ImportError:
import pytest_benchmark

gpubenchmark = pytest_benchmark.plugin.benchmark

# =============================================================================
# Pytest Setup / Teardown - called for each test function
# =============================================================================


def setup_function():
gc.collect()

@@ -300,3 +313,42 @@ def test_mg_uniform_neighbor_sample_ensure_no_duplicates(dask_client):
)

assert len(output_df.compute()) == 3


# =============================================================================
# Benchmarks
# =============================================================================


@pytest.mark.slow
@pytest.mark.parametrize("n_samples", [1_000, 5_000, 10_000])
def bench_uniform_neigbour_sample_email_eu_core(gpubenchmark, dask_client, n_samples):
input_data_path = utils.RAPIDS_DATASET_ROOT_DIR_PATH / "email-Eu-core.csv"
chunksize = dcg.get_chunksize(input_data_path)

ddf = dask_cudf.read_csv(
input_data_path,
chunksize=chunksize,
delimiter=" ",
names=["src", "dst", "value"],
dtype=["int32", "int32", "int32"],
)

dg = cugraph.Graph(directed=False)
dg.from_dask_cudf_edgelist(
ddf,
source="src",
destination="dst",
edge_attr="value",
store_transposed=False,
legacy_renum_only=True,
)
    # Use the first n_samples source vertices as the seed list
    srcs = dg.input_df["src"]
    start_list = srcs[:n_samples].compute()

def func():
_ = cugraph.dask.uniform_neighbor_sample(dg, start_list, [10])
del _

gpubenchmark(func)