From 0bcb6e083218e13d7f346c60573e82c1b8bed7cb Mon Sep 17 00:00:00 2001
From: Joseph Nke <76006812+jnke2016@users.noreply.github.com>
Date: Tue, 21 Jun 2022 09:44:36 -0500
Subject: [PATCH] Update the list of algos to benchmark (#2337)

This PR
1. Updates the way `uniform_neighbor_sample` is imported (it has been
   removed from experimental)
2. Pins `libraft-headers` and `pyraft` to 22.08
3. Adds `Triangle count` to the list of algos to benchmark

Authors:
  - Joseph Nke (https://github.com/jnke2016)

Approvers:
  - Rick Ratzel (https://github.com/rlratzel)

URL: https://github.com/rapidsai/cugraph/pull/2337
---
 benchmarks/python_e2e/benchmark.py            |  67 ++--
 benchmarks/python_e2e/cugraph_dask_funcs.py   |  36 ++-
 benchmarks/python_e2e/cugraph_funcs.py        |  23 +-
 benchmarks/python_e2e/main.py                 |   4 +-
 benchmarks/python_e2e/reporting.py            |   8 +-
 .../cugraph/centrality/katz_centrality.py     |  18 +-
 .../dask/centrality/eigenvector_centrality.py |   3 +-
 .../dask/centrality/katz_centrality.py        |  19 +-
 .../cugraph/cugraph/dask/common/part_utils.py |  14 +-
 python/cugraph/cugraph/dask/traversal/bfs.py  |  92 +++---
 python/cugraph/cugraph/dask/traversal/sssp.py |  58 ++--
 .../simpleDistributedGraph.py                 | 305 +++++++++++++-----
 .../graph_implementation/simpleGraph.py       |  16 +-
 .../cugraph/cugraph/structure/number_map.py   |   5 +-
 .../cugraph/tests/mg/test_mg_degree.py        |  26 +-
 .../cugraph/cugraph/tests/mg/test_mg_graph.py | 119 +++++++
 .../tests/mg/test_mg_katz_centrality.py       |  15 +-
 .../tests/mg/test_mg_neighborhood_sampling.py |  58 ++--
 .../tests/mg/test_mg_property_graph.py        |   7 +-
 .../cugraph/tests/test_katz_centrality.py     |  12 +-
 .../tests/test_neighborhood_sampling.py       |  58 ++--
 21 files changed, 659 insertions(+), 304 deletions(-)
 create mode 100644 python/cugraph/cugraph/tests/mg/test_mg_graph.py

diff --git a/benchmarks/python_e2e/benchmark.py b/benchmarks/python_e2e/benchmark.py
index 0ce7597aa8d..4cb9576611a 100644
--- a/benchmarks/python_e2e/benchmark.py
+++ b/benchmarks/python_e2e/benchmark.py
@@ -86,15 +86,16 @@ def __init__(self,
         # FIXME: need to accept and save individual algo args
         self.construct_graph = benchmark(construct_graph_func)

-        #add starting node to algos: BFS and SSSP
+        # add starting node to algos: BFS and SSSP
+        # FIXME: Refactor BenchmarkRun __init__ because all the work
+        # done below should be done elsewhere
         for i, algo in enumerate (algo_func_param_list):
-            if benchmark(algo).name in ["bfs", "sssp", "neighborhood_sampling"]:
+            if benchmark(algo).name in ["bfs", "sssp", "uniform_neighbor_sample"]:
                 param={}
                 param["start"]=self.input_dataframe['src'].head()[0]
-                if benchmark(algo).name in ["neighborhood_sampling"]:
+                if benchmark(algo).name in ["uniform_neighbor_sample"]:
                     start = [param.pop("start")]
-                    labels = [0]
-                    param["start_info_list"] = (start, labels)
+                    param["start_list"] = start
                     param["fanout_vals"] = [1]
                 algo_func_param_list[i]=(algo,)+(param,)
@@ -128,32 +129,44 @@ def run(self):
             self.__log("done.")
             G = result.retval
             self.results.append(result)
-
-        #algos with transposed=True : PageRank, Katz
-        #algos with transposed=False: BFS, SSSP, Louvain, HITS, Neighborhood_sampling
-        #algos supporting the legacy_renum_only: HITS, Neighborhood_sampling
+        #
+        # Algos with transposed=True : PageRank, Katz.
+        # Algos with transposed=False: BFS, SSSP, Louvain, HITS,
+        # Neighborhood_sampling.
+ # Algos supporting the legacy_renum_only: HITS, Neighborhood_sampling + # for i in range(len(self.algos)): - if self.algos[i][0].name in ["pagerank", "katz"]: #set transpose=True when renumbering - if self.algos[i][0].name == "katz" and self.construct_graph.name == "from_dask_cudf_edgelist": - largest_out_degree = G.out_degree().compute().\ - nlargest(n=1, columns="degree") #compute outdegree before renumbering because outdegree has transpose=False - largest_out_degree = largest_out_degree["degree"].iloc[0] - katz_alpha = 1 / (largest_out_degree + 1) - self.algos[i][1]["alpha"] = katz_alpha - elif self.algos[i][0].name == "katz" and self.construct_graph.name == "from_cudf_edgelist": - largest_out_degree = G.out_degree().nlargest(n=1, columns="degree") - largest_out_degree = largest_out_degree["degree"].iloc[0] - katz_alpha = 1 / (largest_out_degree + 1) - self.algos[i][1]["alpha"] = katz_alpha - if hasattr(G, "compute_renumber_edge_list"): - G.compute_renumber_edge_list(transposed=True) - elif self.algos[i][0].name in ["neighborhood_sampling", "hits"]: - if hasattr(G, "compute_renumber_edge_list"): - G.compute_renumber_edge_list(transposed=False, legacy_renum_only=True) + # set transpose=True when renumbering + if self.algos[i][0].name in ["pagerank", "katz"]: + if self.algos[i][0].name == "katz": + if self.construct_graph.name == "from_dask_cudf_edgelist": + # compute out_degree before renumbering because out_degree + # has transpose=False + degree_max = G.degree()['degree'].max().compute() + katz_alpha = 1 / (degree_max) + self.algos[i][1]["alpha"] = katz_alpha + elif self.construct_graph.name == "from_cudf_edgelist": + degree_max = G.degree()['degree'].max() + katz_alpha = 1 / (degree_max) + self.algos[i][1]["alpha"] = katz_alpha + if hasattr(G, "compute_renumber_edge_list"): + G.compute_renumber_edge_list( + transposed=True, legacy_renum_only=True) + else: + # FIXME: Pagerank still follows the old path. Update this once it + # follows the pylibcugraph/C path + if hasattr(G, "compute_renumber_edge_list"): + G.compute_renumber_edge_list(transposed=True) else: #set transpose=False when renumbering self.__log("running compute_renumber_edge_list...", end="") if hasattr(G, "compute_renumber_edge_list"): - G.compute_renumber_edge_list(transposed=False) + if self.algos[i][0].name in ["wcc", "louvain"]: + # FIXME: Pagerank and Louvain still follow the old path. + # Update this once it follows the pylibcugraph/C path + G.compute_renumber_edge_list(transposed=False) + else: + G.compute_renumber_edge_list( + transposed=False, legacy_renum_only=True) self.__log("done.") # FIXME: need to handle individual algo args for ((algo, params), validator) in zip(self.algos, self.validators): diff --git a/benchmarks/python_e2e/cugraph_dask_funcs.py b/benchmarks/python_e2e/cugraph_dask_funcs.py index 98ca6feab20..b9d993795ae 100644 --- a/benchmarks/python_e2e/cugraph_dask_funcs.py +++ b/benchmarks/python_e2e/cugraph_dask_funcs.py @@ -18,12 +18,10 @@ from cugraph.structure.symmetrize import symmetrize_ddf from cugraph.dask.common.mg_utils import get_visible_devices from dask_cuda.initialize import initialize -from cugraph.experimental.dask import uniform_neighborhood_sampling import cudf import cugraph from cugraph.dask.comms import comms as Comms -from cugraph.dask.common.mg_utils import get_visible_devices from cugraph.generators import rmat import tempfile @@ -109,10 +107,15 @@ def construct_graph(dask_dataframe, symmetric=False): object must be symmetrized and have self loops removed. 
""" - G = cugraph.DiGraph() + if symmetric: + G = cugraph.Graph(directed=False) + else: + G = cugraph.Graph(directed=True) + if len(dask_dataframe.columns) > 2: if symmetric: #symmetrize dask dataframe - dask_dataframe = symmetrize_ddf(dask_dataframe, 'src', 'dst', 'weight') + dask_dataframe = symmetrize_ddf( + dask_dataframe, 'src', 'dst', 'weight') G.from_dask_cudf_edgelist( dask_dataframe, source="src", destination="dst", edge_attr="weight") @@ -130,11 +133,12 @@ def construct_graph(dask_dataframe, symmetric=False): def bfs(G, start): - return cugraph.dask.bfs(G, start=start, return_distances=True) + return cugraph.dask.bfs( + G, start=start, return_distances=True, check_start=False) def sssp(G, start): - return cugraph.dask.sssp(G, source=start) + return cugraph.dask.sssp(G, source=start, check_start=False) def wcc(G): @@ -156,15 +160,19 @@ def katz(G, alpha=None): def hits(G): return cugraph.dask.hits(G) -def neighborhood_sampling(G, start_info_list=None, fanout_vals=None): +def uniform_neighbor_sample(G, start_list=None, fanout_vals=None): # convert list to cudf.Series - start_info_list = ( - cudf.Series(start_info_list[0], dtype="int32"), - cudf.Series(start_info_list[1], dtype="int32"), - ) - - return uniform_neighborhood_sampling( - G, start_info_list=start_info_list, fanout_vals=fanout_vals) + start_list = cudf.Series(start_list, dtype="int32") + return cugraph.dask.uniform_neighbor_sample( + G, start_list=start_list, fanout_vals=fanout_vals) + +def triangle_count(G): + # FIXME: Update this calls once triangle_count is promoted + return cugraph.dask.triangle_count(G) + +def eigenvector_centrality(G): + # FIXME: Update this calls once triangle_count is promoted + return cugraph.dask.eigenvector_centrality(G) ################################################################################ # Session-wide setup and teardown diff --git a/benchmarks/python_e2e/cugraph_funcs.py b/benchmarks/python_e2e/cugraph_funcs.py index 09fcd45abed..cd05cde8200 100644 --- a/benchmarks/python_e2e/cugraph_funcs.py +++ b/benchmarks/python_e2e/cugraph_funcs.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. +# Copyright (c) 2021-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -15,6 +15,7 @@ import cugraph from cugraph.generators import rmat +import cudf def generate_edgelist(scale, @@ -96,9 +97,9 @@ def construct_graph(dataframe, symmetric=False): symmetrized and have self loops removed. 
""" if symmetric: - G = cugraph.Graph() + G = cugraph.Graph(directed=False) else: - G = cugraph.DiGraph() + G = cugraph.Graph(directed=True) if len(dataframe.columns) > 2: G.from_cudf_edgelist( @@ -137,6 +138,22 @@ def pagerank(G): def katz(G, alpha=None): return cugraph.katz_centrality(G, alpha) +def hits(G): + return cugraph.hits(G) + +def uniform_neighbor_sample(G, start_list=None, fanout_vals=None): + # convert list to cudf.Series + start_list = cudf.Series(start_list, dtype="int32") + return cugraph.uniform_neighbor_sample( + G, start_list=start_list, fanout_vals=fanout_vals) + +def triangle_count(G): + # FIXME: Update this calls once triangle_count is promoted + return cugraph.experimental.triangle_count(G) + +def eigenvector_centrality(G): + # FIXME: Update this calls once triangle_count is promoted + return cugraph.eigenvector_centrality(G) ################################################################################ # Session-wide setup and teardown diff --git a/benchmarks/python_e2e/main.py b/benchmarks/python_e2e/main.py index bae0ce39b23..206be6e9362 100644 --- a/benchmarks/python_e2e/main.py +++ b/benchmarks/python_e2e/main.py @@ -88,7 +88,9 @@ def run(algos, "katz": funcs.katz, "wcc": funcs.wcc, "hits": funcs.hits, - "neighborhood_sampling": funcs.neighborhood_sampling, + "uniform_neighbor_sample": funcs.uniform_neighbor_sample, + "triangle_count": funcs.triangle_count, + "eigenvector_centrality": funcs.eigenvector_centrality, } if algos: diff --git a/benchmarks/python_e2e/reporting.py b/benchmarks/python_e2e/reporting.py index f13a7b28ce3..afc04241384 100644 --- a/benchmarks/python_e2e/reporting.py +++ b/benchmarks/python_e2e/reporting.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. +# Copyright (c) 2021-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -43,15 +43,15 @@ def generate_console_report(benchmark_result_list): # the graph_create run, then a run of each algo. 
r = benchmark_result_list[0] name = f"{r.name}({__namify_dict(r.params)})" - space = " " * (30 - len(name)) + space = " " * (70 - len(name)) retstring += f"{name}{space}{r.runtime:.6}\n" remaining_results = benchmark_result_list[1:] for r in remaining_results: - retstring += f"{'-'*60}\n" + retstring += f"{'-'*80}\n" name = f"{r.name}({__namify_dict(r.params)})" - space = " " * (30 - len(name)) + space = " " * (70 - len(name)) retstring += f"{name}{space}{r.runtime:.6}\n" return retstring diff --git a/python/cugraph/cugraph/centrality/katz_centrality.py b/python/cugraph/cugraph/centrality/katz_centrality.py index 5c46fefe01b..5aff9f2dd2f 100644 --- a/python/cugraph/cugraph/centrality/katz_centrality.py +++ b/python/cugraph/cugraph/centrality/katz_centrality.py @@ -23,7 +23,7 @@ def katz_centrality( - G, alpha=None, beta=None, max_iter=100, tol=1.0e-6, + G, alpha=None, beta=1.0, max_iter=100, tol=1.0e-6, nstart=None, normalized=True ): """ @@ -114,11 +114,16 @@ def katz_centrality( >>> kc = cugraph.katz_centrality(G) """ + G, isNx = ensure_cugraph_obj_for_nx(G) + + if alpha is None: + degree_max = G.degree()['degree'].max() + alpha = 1 / (degree_max) + if (alpha is not None) and (alpha <= 0.0): raise ValueError(f"'alpha' must be a positive float or None, " f"got: {alpha}") - if beta is None: - beta = 1.0 + elif (not isinstance(beta, float)) or (beta <= 0.0): raise ValueError(f"'beta' must be a positive float or None, " f"got: {beta}") @@ -128,8 +133,6 @@ def katz_centrality( if (not isinstance(tol, float)) or (tol <= 0.0): raise ValueError(f"'tol' must be a positive float, got: {tol}") - G, isNx = ensure_cugraph_obj_for_nx(G) - srcs = G.edgelist.edgelist_df['src'] dsts = G.edgelist.edgelist_df['dst'] if 'weights' in G.edgelist.edgelist_df.columns: @@ -139,11 +142,6 @@ def katz_centrality( # with type hardcoded to float32 is passed into wrapper weights = cudf.Series((srcs + 1) / (srcs + 1), dtype="float32") - if alpha is None: - largest_out_degree = G.degrees().nlargest(n=1, columns="out_degree") - largest_out_degree = largest_out_degree["out_degree"].iloc[0] - alpha = 1 / (largest_out_degree + 1) - if nstart is not None: if G.renumbered is True: if len(G.renumber_map.implementation.col_names) > 1: diff --git a/python/cugraph/cugraph/dask/centrality/eigenvector_centrality.py b/python/cugraph/cugraph/dask/centrality/eigenvector_centrality.py index 7b251482d47..02ee00ebdd8 100644 --- a/python/cugraph/cugraph/dask/centrality/eigenvector_centrality.py +++ b/python/cugraph/cugraph/dask/centrality/eigenvector_centrality.py @@ -141,7 +141,8 @@ def eigenvector_centrality( """ client = default_client() # Calling renumbering results in data that is sorted by degree - input_graph.compute_renumber_edge_list(transposed=False) + input_graph.compute_renumber_edge_list( + transposed=False, legacy_renum_only=True) graph_properties = GraphProperties( is_multigraph=False) diff --git a/python/cugraph/cugraph/dask/centrality/katz_centrality.py b/python/cugraph/cugraph/dask/centrality/katz_centrality.py index dca8c6637f5..e3549685ba2 100644 --- a/python/cugraph/cugraph/dask/centrality/katz_centrality.py +++ b/python/cugraph/cugraph/dask/centrality/katz_centrality.py @@ -174,6 +174,21 @@ def katz_centrality( """ client = default_client() + if alpha is None: + degree_max = input_graph.degree()['degree'].max().compute() + alpha = 1 / (degree_max) + + if (alpha is not None) and (alpha <= 0.0): + raise ValueError(f"'alpha' must be a positive float or None, " + f"got: {alpha}") + + # FIXME: 'legacy_renum_only' will not 
trigger the C++ renumbering + # In the future, once all the algos follow the C/Pylibcugraph path, + # compute_renumber_edge_list will only be used for multicolumn and + # string vertices since the renumbering will be done in pylibcugraph + input_graph.compute_renumber_edge_list(transposed=True, + legacy_renum_only=False) + graph_properties = GraphProperties( is_multigraph=False) @@ -188,10 +203,6 @@ def katz_centrality( num_edges = len(ddf) data = get_distributed_data(ddf) - # FIXME: Incorporate legacy_renum_only=True to only trigger the python - # renumbering when more support is added in the C/C++ API - input_graph.compute_renumber_edge_list(transposed=True, - legacy_renum_only=False) vertex_partition_offsets = get_vertex_partition_offsets(input_graph) num_verts = vertex_partition_offsets.iloc[-1] diff --git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py index 30d252aebe6..b394ac138ec 100644 --- a/python/cugraph/cugraph/dask/common/part_utils.py +++ b/python/cugraph/cugraph/dask/common/part_utils.py @@ -93,14 +93,18 @@ async def _extract_partitions(dask_obj, client=None, batch_enabled=False): persisted = [client.persist( dask_obj.get_partition(p), workers=w) for p, w in enumerate( worker_list[:dask_obj.npartitions])] - # Persist empty dataframe with the remaining workers if there are - # less partitions than workers + # Persist empty dataframe/series with the remaining workers if + # there are less partitions than workers if dask_obj.npartitions < len(worker_list): # The empty df should have the same column names and dtypes as # dask_obj - empty_df = cudf.DataFrame(columns=list(dask_obj.columns)) - empty_df = empty_df.astype(dict(zip( - dask_obj.columns, dask_obj.dtypes))) + if isinstance(dask_obj, dask_cudf.DataFrame): + empty_df = cudf.DataFrame(columns=list(dask_obj.columns)) + empty_df = empty_df.astype(dict(zip( + dask_obj.columns, dask_obj.dtypes))) + else: + empty_df = cudf.Series(dtype=dask_obj.dtype) + for p, w in enumerate(worker_list[dask_obj.npartitions:]): empty_ddf = dask_cudf.from_cudf(empty_df, npartitions=1) persisted.append(client.persist(empty_ddf, workers=w)) diff --git a/python/cugraph/cugraph/dask/traversal/bfs.py b/python/cugraph/cugraph/dask/traversal/bfs.py index f03b511e316..472627717aa 100644 --- a/python/cugraph/cugraph/dask/traversal/bfs.py +++ b/python/cugraph/cugraph/dask/traversal/bfs.py @@ -40,7 +40,7 @@ def _call_plc_mg_bfs( return_predecessors=True): comms_handle = Comms.get_handle(sID) resource_handle = ResourceHandle(comms_handle.getHandle()) - + sources = sources[0] srcs = cudf.Series(data[0][src_col_name], dtype='int32') dsts = cudf.Series(data[0][dst_col_name], dtype='int32') weights = cudf.Series(data[0]['value'], dtype='float32') \ @@ -87,7 +87,8 @@ def convert_to_cudf(cp_arrays): def bfs(input_graph, start, depth_limit=None, - return_distances=True): + return_distances=True, + check_start=True): """ Find the distances and predecessors for a breadth first traversal of a graph. @@ -101,10 +102,9 @@ def bfs(input_graph, as dask cudf edge list dataframe (edge weights are not used for this algorithm). - start : Integer or list - The id of the graph vertex from which the traversal begins, or - if a list, the vertex from which the traversal begins in each - component of the graph. Only one vertex per connected + start : Integer or list or cudf object or dask_cudf object + The id(s) of the graph vertex from which the traversal begins + in each component of the graph. 
Only one vertex per connected component of the graph is allowed. depth_limit : Integer or None, optional (default=None) @@ -113,6 +113,10 @@ def bfs(input_graph, return_distances : bool, optional (default=True) Indicates if distances should be returned + check_start : bool, optional (default=True) + If True, performs more extensive tests on the start vertices + to ensure validitity, at the expense of increased run time. + Returns ------- df : dask_cudf.DataFrame @@ -145,7 +149,8 @@ def bfs(input_graph, client = default_client() - input_graph.compute_renumber_edge_list(transposed=False) + input_graph.compute_renumber_edge_list( + transposed=False, legacy_renum_only=True) ddf = input_graph.edgelist.edgelist_df graph_properties = GraphProperties( @@ -156,54 +161,40 @@ def bfs(input_graph, src_col_name = input_graph.renumber_map.renumbered_src_col_name dst_col_name = input_graph.renumber_map.renumbered_dst_col_name - renumber_ddf = input_graph.renumber_map.implementation.ddf - col_names = input_graph.renumber_map.implementation.col_names - - if isinstance(start, dask_cudf.DataFrame) \ - or isinstance(start, cudf.DataFrame): - tmp_df = start - tmp_col_names = start.columns - else: - tmp_df = cudf.DataFrame() - tmp_df["0"] = cudf.Series(start) - tmp_col_names = ["0"] - - original_start_len = len(tmp_df) - - tmp_ddf = tmp_df[tmp_col_names].rename( - columns=dict(zip(tmp_col_names, col_names))) - for name in col_names: - tmp_ddf[name] = tmp_ddf[name].astype(renumber_ddf[name].dtype) - renumber_data = get_distributed_data(renumber_ddf) - - def df_merge(df, tmp_df, tmp_col_names): - x = df[0].merge(tmp_df, on=tmp_col_names, how='inner') - return x['global_id'] - - start = [ - client.submit( - df_merge, - wf[1], - tmp_ddf, - col_names, - workers=[wf[0]] - ) - for idx, wf in enumerate(renumber_data.worker_to_parts.items()) - ] - def count_src(df): - return len(df) + if not isinstance(start, (dask_cudf.DataFrame, dask_cudf.Series)): + if not isinstance(start, (cudf.DataFrame, cudf.Series)): + start = cudf.Series(start) + if isinstance(start, (cudf.DataFrame, cudf.Series)): + # convert into a dask_cudf + start = dask_cudf.from_cudf(start, ddf.npartitions) + + def check_valid_vertex(G, start): + is_valid_vertex = G.has_node(start) + if not is_valid_vertex: + raise ValueError( + 'At least one start vertex provided was invalid') + + if check_start: + check_valid_vertex(input_graph, start) - count_src_results = client.map(count_src, start) - cg = client.gather(count_src_results) - if sum(cg) < original_start_len: - raise ValueError('At least one start vertex provided was invalid') + if input_graph.renumbered: + if isinstance(start, dask_cudf.DataFrame): + tmp_col_names = start.columns + + elif isinstance(start, dask_cudf.Series): + tmp_col_names = None + + start = input_graph.lookup_internal_vertex_id( + start, tmp_col_names) + + data_start = get_distributed_data(start) cupy_result = [client.submit( _call_plc_mg_bfs, Comms.get_session_id(), wf[1], - start[idx], + wf_start[1], depth_limit, src_col_name, dst_col_name, @@ -213,7 +204,10 @@ def count_src(df): True, return_distances, workers=[wf[0]]) - for idx, wf in enumerate(data.worker_to_parts.items())] + for idx, (wf, wf_start) in enumerate( + zip(data.worker_to_parts.items(), + data_start.worker_to_parts.items()))] + wait(cupy_result) cudf_result = [client.submit(convert_to_cudf, diff --git a/python/cugraph/cugraph/dask/traversal/sssp.py b/python/cugraph/cugraph/dask/traversal/sssp.py index bcee10fa377..ed7300a1223 100644 --- 
a/python/cugraph/cugraph/dask/traversal/sssp.py +++ b/python/cugraph/cugraph/dask/traversal/sssp.py @@ -13,7 +13,6 @@ # limitations under the License. # -from collections.abc import Iterable from dask.distributed import wait, default_client from cugraph.dask.common.input_utils import get_distributed_data @@ -76,7 +75,7 @@ def _call_plc_sssp( }) -def sssp(input_graph, source, cutoff=None): +def sssp(input_graph, source, cutoff=None, check_source=True): """ Compute the distance and predecessors for shortest paths from the specified source to all the vertices in the input_graph. The distances column will @@ -99,6 +98,10 @@ def sssp(input_graph, source, cutoff=None): cutoff : double, optional (default = None) Maximum edge weight sum considered by the algorithm + check_source : bool, optional (default=True) + If True, performs more extensive tests on the start vertices + to ensure validitity, at the expense of increased run time. + Returns ------- df : dask_cudf.DataFrame @@ -130,48 +133,35 @@ def sssp(input_graph, source, cutoff=None): """ client = default_client() - - input_graph.compute_renumber_edge_list(transposed=False) + # FIXME: 'legacy_renum_only' will not trigger the C++ renumbering + # In the future, once all the algos follow the C/Pylibcugraph path, + # compute_renumber_edge_list will only be used for multicolumn and + # string vertices since the renumbering will be done in pylibcugraph + input_graph.compute_renumber_edge_list( + transposed=False, legacy_renum_only=True) ddf = input_graph.edgelist.edgelist_df num_edges = len(ddf) data = get_distributed_data(ddf) - if input_graph.renumbered: - src_col_name = input_graph.renumber_map.renumbered_src_col_name - dst_col_name = input_graph.renumber_map.renumbered_dst_col_name - - source = input_graph.lookup_internal_vertex_id( - cudf.Series([source])).fillna(-1).compute() - source = source.iloc[0] + src_col_name = input_graph.renumber_map.renumbered_src_col_name + dst_col_name = input_graph.renumber_map.renumbered_dst_col_name - if source < 0: + def check_valid_vertex(G, source): + is_valid_vertex = G.has_node(source) + if not is_valid_vertex: raise ValueError('Invalid source vertex') - else: - # If the input graph was created with renumbering disabled (Graph(..., - # renumber=False), the above compute_renumber_edge_list() call will not - # perform a renumber step and the renumber_map will not have src/dst - # col names. In that case, the src/dst values specified when reading - # the edgelist dataframe are to be used, but only if they were single - # string values (ie. not a list representing multi-columns). - if isinstance(input_graph.source_columns, Iterable): - raise RuntimeError("input_graph was not renumbered but has a " - "non-string source column name (got: " - f"{input_graph.source_columns}). Re-create " - "input_graph with either renumbering enabled " - "or a source column specified as a string.") - if isinstance(input_graph.destination_columns, Iterable): - raise RuntimeError("input_graph was not renumbered but has a " - "non-string destination column name (got: " - f"{input_graph.destination_columns}). 
" - "Re-create input_graph with either renumbering " - "enabled or a destination column specified as " - "a string.") - src_col_name = input_graph.source_columns - dst_col_name = input_graph.destination_columns + + if check_source: + check_valid_vertex(input_graph, source) if cutoff is None: cutoff = cupy.inf + if input_graph.renumbered: + source = input_graph.lookup_internal_vertex_id( + cudf.Series([source])).fillna(-1).compute() + source = source.iloc[0] + result = [client.submit( _call_plc_sssp, Comms.get_session_id(), diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py index afed5ad97f8..90d4e9da549 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py @@ -36,7 +36,7 @@ class Properties: def __init__(self, properties): self.multi_edge = getattr(properties, 'multi_edge', False) self.directed = properties.directed - self.renumbered = False + self.renumber = False self.store_transposed = False self.self_loop = None self.isolated_vertices = None @@ -125,7 +125,7 @@ def __from_edgelist( # FIXME: Edge Attribute not handled # FIXME: the parameter below is no longer used for unrenumbering - self.properties.renumbered = renumber + self.properties.renumber = renumber self.source_columns = source self.destination_columns = destination @@ -135,11 +135,12 @@ def renumbered(self): # by checking the column name. Only the renumbered dataframes will have # their column names renamed to 'renumbered_src' and 'renumbered_dst' renumbered_vertex_col_names = ["renumbered_src", "renumbered_dst"] - if self.edgelist.edgelist_df is not None and not ( - set(renumbered_vertex_col_names).issubset( - set(self.edgelist.edgelist_df.columns))): - return False - return True + if self.edgelist is not None: + if self.edgelist.edgelist_df is not None and ( + set(renumbered_vertex_col_names).issubset( + set(self.edgelist.edgelist_df.columns))): + return True + return False def view_edge_list(self): """ @@ -161,13 +162,13 @@ def view_edge_list(self): Returns ------- - df : cudf.DataFrame - This cudf.DataFrame wraps source, destination and weight - df[src] : cudf.Series + df : dask_cudf.DataFrame + This dask_cudf.DataFrame wraps source, destination and weight + df[src] : dask_cudf.Series contains the source index for each edge - df[dst] : cudf.Series + df[dst] : dask_cudf.Series contains the destination index for each edge - df[weight] : cusd.Series + df[weight] : dask_cudf.Series Column is only present for weighted Graph, then containing the weight value for each edge """ @@ -224,32 +225,80 @@ def in_degree(self, vertex_subset=None): Parameters ---------- - vertex_subset : cudf.Series or iterable container, opt. (default=None) + vertex_subset : cudf or dask_cudf object, iterable container, + opt. (default=None) A container of vertices for displaying corresponding in-degree. If not set, degrees are computed for the entire set of vertices. Returns ------- - df : cudf.DataFrame - GPU DataFrame of size N (the default) or the size of the given - vertices (vertex_subset) containing the in_degree. The ordering is - relative to the adjacency list, or that given by the specified - vertex_subset. - df[vertex] : cudf.Series + df : dask_cudf.DataFrame + Distributed GPU DataFrame of size N (the default) or the size of + the given vertices (vertex_subset) containing the in_degree. 
+ The ordering is relative to the adjacency list, or that given by + the specified vertex_subset. + df[vertex] : dask_cudf.Series The vertex IDs (will be identical to vertex_subset if specified). - df[degree] : cudf.Series + df[degree] : dask_cudf.Series The computed in-degree of the corresponding vertex. Examples -------- - >>> M = cudf.read_csv(datasets_path / 'karate.csv', delimiter=' ', + >>> M = dask_cudf.read_csv(datasets_path / 'karate.csv', delimiter=' ', ... dtype=['int32', 'int32', 'float32'], header=None) >>> G = cugraph.Graph() - >>> G.from_cudf_edgelist(M, '0', '1') + >>> G.from_dask_cudf_edgelist(M, '0', '1') >>> df = G.in_degree([0,9,12]) """ - return self._degree(vertex_subset, direction=Direction.IN) + src_col_name = self.source_columns + dst_col_name = self.destination_columns + + # select only the vertex columns + if not isinstance(src_col_name, list) and \ + not isinstance(dst_col_name, list): + vertex_col_names = [src_col_name] + [dst_col_name] + + df = self.input_df[vertex_col_names] + df = df.drop(columns=src_col_name) + + nodes = self.nodes() + if isinstance(nodes, dask_cudf.Series): + nodes = nodes.to_frame() + + if not isinstance(dst_col_name, list): + df = df.rename(columns={dst_col_name: "vertex"}) + dst_col_name = "vertex" + + vertex_col_names = df.columns + nodes.columns = vertex_col_names + + df["degree"] = 1 + in_degree = df.groupby(dst_col_name).degree.count().reset_index() + + # Add vertices with zero in_degree + in_degree = nodes.merge(in_degree, how='outer').fillna(0) + + # Convert vertex_subset to dataframe. + if vertex_subset is not None: + if not isinstance(vertex_subset, ( + dask_cudf.DataFrame, cudf.DataFrame)): + if isinstance(vertex_subset, dask_cudf.Series): + vertex_subset = vertex_subset.to_frame() + else: + df = cudf.DataFrame() + if isinstance(vertex_subset, (cudf.Series, list)): + df["vertex"] = vertex_subset + vertex_subset = df + if isinstance(vertex_subset, ( + dask_cudf.DataFrame, cudf.DataFrame)): + vertex_subset.columns = vertex_col_names + in_degree = in_degree.merge(vertex_subset, how='inner') + else: + raise TypeError(f"Expected type are: cudf, dask_cudf objects, " + f"iterable container, got " + f"{type(vertex_subset)}") + return in_degree def out_degree(self, vertex_subset=None): """ @@ -261,32 +310,82 @@ def out_degree(self, vertex_subset=None): Parameters ---------- - vertex_subset : cudf.Series or iterable container, opt. (default=None) + vertex_subset : cudf or dask_cudf object, iterable container, + opt. (default=None) A container of vertices for displaying corresponding out-degree. If not set, degrees are computed for the entire set of vertices. Returns ------- - df : cudf.DataFrame - GPU DataFrame of size N (the default) or the size of the given - vertices (vertex_subset) containing the out_degree. The ordering is - relative to the adjacency list, or that given by the specified - vertex_subset. - df[vertex] : cudf.Series + df : dask_cudf.DataFrame + Distributed GPU DataFrame of size N (the default) or the size of + the given vertices (vertex_subset) containing the out_degree. + The ordering is relative to the adjacency list, or that given by + the specified vertex_subset. + df[vertex] : dask_cudf.Series The vertex IDs (will be identical to vertex_subset if specified). - df[degree] : cudf.Series + df[degree] : dask_cudf.Series The computed out-degree of the corresponding vertex. 
Examples -------- - >>> M = cudf.read_csv(datasets_path / 'karate.csv', delimiter=' ', + >>> M = dask_cudf.read_csv(datasets_path / 'karate.csv', delimiter=' ', ... dtype=['int32', 'int32', 'float32'], header=None) >>> G = cugraph.Graph() - >>> G.from_cudf_edgelist(M, '0', '1') + >>> G.from_dask_cudf_edgelist(M, '0', '1') >>> df = G.out_degree([0,9,12]) """ - return self._degree(vertex_subset, direction=Direction.OUT) + src_col_name = self.source_columns + dst_col_name = self.destination_columns + + # select only the vertex columns + if not isinstance(src_col_name, list) and \ + not isinstance(dst_col_name, list): + vertex_col_names = [src_col_name] + [dst_col_name] + + df = self.input_df[vertex_col_names] + df = df.drop(columns=dst_col_name) + + nodes = self.nodes() + if isinstance(nodes, dask_cudf.Series): + nodes = nodes.to_frame() + + if not isinstance(src_col_name, list): + df = df.rename(columns={src_col_name: "vertex"}) + src_col_name = "vertex" + + vertex_col_names = df.columns + + nodes.columns = vertex_col_names + + df["degree"] = 1 + out_degree = df.groupby(src_col_name).degree.count().reset_index() + + # Add vertices with zero out_degree + out_degree = nodes.merge(out_degree, how='outer').fillna(0) + + # Convert vertex_subset to dataframe. + if vertex_subset is not None: + if not isinstance(vertex_subset, ( + dask_cudf.DataFrame, cudf.DataFrame)): + if isinstance(vertex_subset, dask_cudf.Series): + vertex_subset = vertex_subset.to_frame() + else: + df = cudf.DataFrame() + if isinstance(vertex_subset, (cudf.Series, list)): + df["vertex"] = vertex_subset + vertex_subset = df + if isinstance(vertex_subset, ( + dask_cudf.DataFrame, cudf.DataFrame)): + vertex_subset.columns = vertex_col_names + out_degree = out_degree.merge(vertex_subset, how='inner') + else: + raise TypeError(f"Expected type are: cudf, dask_cudf objects, " + f"iterable container, got " + f"{type(vertex_subset)}") + + return out_degree def degree(self, vertex_subset=None): """ @@ -295,34 +394,42 @@ def degree(self, vertex_subset=None): degrees for the entire set of vertices. If vertex_subset is provided, then this method optionally filters out all but those listed in vertex_subset. + Parameters ---------- - vertex_subset : cudf.Series or iterable container, optional + vertex_subset : cudf or dask_cudf object, iterable container, + opt. (default=None) a container of vertices for displaying corresponding degree. If not set, degrees are computed for the entire set of vertices. Returns ------- - df : cudf.DataFrame - GPU DataFrame of size N (the default) or the size of the given - vertices (vertex_subset) containing the degree. The ordering is - relative to the adjacency list, or that given by the specified - vertex_subset. - df['vertex'] : cudf.Series + df : dask_cudf.DataFrame + Distributed GPU DataFrame of size N (the default) or the size of + the given vertices (vertex_subset) containing the degree. + The ordering is relative to the adjacency list, or that given by + the specified vertex_subset. + df['vertex'] : dask_cudf.Series The vertex IDs (will be identical to vertex_subset if specified). - df['degree'] : cudf.Series + df['degree'] : dask_cudf.Series The computed degree of the corresponding vertex. Examples -------- - >>> M = cudf.read_csv(datasets_path / 'karate.csv', delimiter=' ', + >>> M = dask_cudf.read_csv(datasets_path / 'karate.csv', delimiter=' ', ... 
dtype=['int32', 'int32', 'float32'], header=None) >>> G = cugraph.Graph() - >>> G.from_cudf_edgelist(M, '0', '1') + >>> G.from_dask_cudf_edgelist(M, '0', '1') >>> all_df = G.degree() >>> subset_df = G.degree([0,9,12]) """ - raise NotImplementedError("Not supported for distributed graph") + + vertex_in_degree = self.in_degree(vertex_subset) + vertex_out_degree = self.out_degree(vertex_subset) + vertex_degree = dask_cudf.concat([vertex_in_degree, vertex_out_degree]) + vertex_degree = vertex_degree.groupby(['vertex'], as_index=False).sum() + + return vertex_degree # FIXME: vertex_subset could be a DataFrame for multi-column vertices def degrees(self, vertex_subset=None): @@ -340,25 +447,25 @@ def degrees(self, vertex_subset=None): Returns ------- - df : cudf.DataFrame - GPU DataFrame of size N (the default) or the size of the given - vertices (vertex_subset) containing the degrees. The ordering is - relative to the adjacency list, or that given by the specified - vertex_subset. - df['vertex'] : cudf.Series + df : dask_cudf.DataFrame + Distributed GPU DataFrame of size N (the default) or the size of + the given vertices (vertex_subset) containing the degrees. + The ordering is relative to the adjacency list, or that given by + the specified vertex_subset. + df['vertex'] : dask_cudf.Series The vertex IDs (will be identical to vertex_subset if specified). - df['in_degree'] : cudf.Series + df['in_degree'] : dask_cudf.Series The in-degree of the vertex. - df['out_degree'] : cudf.Series + df['out_degree'] : dask_cudf.Series The out-degree of the vertex. Examples -------- - >>> M = cudf.read_csv(datasets_path / 'karate.csv', delimiter=' ', + >>> M = dask_cudf.read_csv(datasets_path / 'karate.csv', delimiter=' ', ... dtype=['int32', 'int32', 'float32'], header=None) >>> G = cugraph.Graph() - >>> G.from_cudf_edgelist(M, '0', '1') + >>> G.from_dask_cudf_edgelist(M, '0', '1') >>> df = G.degrees([0,9,12]) """ @@ -371,7 +478,7 @@ def _degree(self, vertex_subset, direction=Direction.ALL): df["vertex"] = vertex_col df["degree"] = degree_col - if self.properties.renumbered is True: + if self.renumbered is True: df = self.renumber_map.unrenumber(df, "vertex") if vertex_subset is not None: @@ -393,10 +500,10 @@ def to_directed(self, DiG): Examples -------- - >>> M = cudf.read_csv(datasets_path / 'karate.csv', delimiter=' ', + >>> M = dask_cudf.read_csv(datasets_path / 'karate.csv', delimiter=' ', ... dtype=['int32', 'int32', 'float32'], header=None) >>> G = cugraph.Graph() - >>> G.from_cudf_edgelist(M, '0', '1') + >>> G.from_dask_cudf_edgelist(M, '0', '1') >>> DiG = G.to_directed() """ @@ -415,10 +522,10 @@ def to_undirected(self, G): Examples -------- - >>> M = cudf.read_csv(datasets_path / 'karate.csv', delimiter=' ', + >>> M = dask_cudf.read_csv(datasets_path / 'karate.csv', delimiter=' ', ... dtype=['int32', 'int32', 'float32'], header=None) >>> DiG = cugraph.Graph(directed=True) - >>> DiG.from_cudf_edgelist(M, '0', '1') + >>> DiG.dask_from_cudf_edgelist(M, '0', '1') >>> G = DiG.to_undirected() """ @@ -427,20 +534,51 @@ def to_undirected(self, G): def has_node(self, n): """ - Returns True if the graph contains the node n. - """ - if self.edgelist is None: - raise RuntimeError("Graph has no Edgelist.") - # FIXME: Check renumber map - ddf = self.edgelist.edgelist_df[["src", "dst"]] - return (ddf == n).any().any().compute() + + Returns True if the graph contains the node(s) n. + Examples + -------- + >>> M = dask_cudf.read_csv(datasets_path / 'karate.csv', delimiter=' ', + ... 
dtype=['int32', 'int32', 'float32'], header=None) + >>> G = cugraph.Graph(directed=True) + >>> G.from_dask_cudf_edgelist(M, '0', '1') + >>> valid_source = cudf.Series([5]) + >>> invalid_source = cudf.Series([55]) + >>> is_valid_vertex = G.has_node(valid_source) + >>> assert is_valid_vertex is True + >>> is_valid_vertex = G.has_node(invalid_source) + >>> assert is_valid_vertex is False + """ + + # Convert input to dataframes so that it can be compared through merge + if not isinstance(n, (dask_cudf.DataFrame, cudf.DataFrame)): + if isinstance(n, dask_cudf.Series): + n = n.to_frame() + else: + df = cudf.DataFrame() + if not isinstance(n, (cudf.DataFrame, cudf.Series)): + n = [n] + if isinstance(n, (cudf.Series, list)): + df["vertex"] = n + n = df + + if isinstance(n, (dask_cudf.DataFrame, cudf.DataFrame)): + nodes = self.nodes() + if not isinstance(self.nodes(), ( + dask_cudf.DataFrame, cudf.DataFrame)): + nodes = nodes.to_frame() + + nodes.columns = n.columns + + valid_vertex = nodes.merge(n, how="inner") + return len(valid_vertex) == len(n) def has_edge(self, u, v): """ Returns True if the graph contains the edge (u,v). """ # TODO: Verify Correctness - if self.properties.renumbered: + if self.renumbered: src_col_name = self.renumber_map.renumbered_src_col_name tmp = cudf.DataFrame({src_col_name: [u, v]}) @@ -465,10 +603,31 @@ def edges(self): def nodes(self): """ - Returns all the nodes in the graph as a cudf.Series + Returns all nodes in the graph as a dask_cudf.Series. + If multi columns vertices, return a dask_cudf.DataFrame. + + If the edgelist was renumbered, this call returns the internal + nodes in the graph. To get the original nodes, convert the result to + a dataframe and do 'renumber_map.unrenumber' or 'G.unrenumber' """ - # FIXME: Return renumber map nodes - raise NotImplementedError("Not supported for distributed graph") + + if self.renumbered: + # FIXME: This relies on current implementation + # of NumberMap, should not really expose + # this, perhaps add a method to NumberMap + + df = self.renumber_map.implementation.ddf.drop(columns="global_id") + + if len(df.columns) > 1: + return df + else: + return df[df.columns[0]] + + else: + df = self.input_df + return dask_cudf.concat( + [df[self.source_columns], + df[self.destination_columns]]).drop_duplicates() def neighbors(self, n): if self.edgelist is None: @@ -507,7 +666,7 @@ def compute_renumber_edge_list(self, This parameter is added for new algos following the C/Pylibcugraph path """ - if not self.properties.renumbered: + if not self.properties.renumber: self.edgelist = self.EdgeList(self.input_df) self.renumber_map = None else: @@ -534,7 +693,7 @@ def compute_renumber_edge_list(self, self.properties.store_transposed = transposed def vertex_column_size(self): - if self.properties.renumbered: + if self.renumbered: return self.renumber_map.vertex_column_size() else: return 1 diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py index 54048307eb3..5929eb75030 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py @@ -160,6 +160,11 @@ def __from_edgelist( else: if type(source) is list and type(destination) is list: raise ValueError("set renumber to True for multi column ids") + elif (elist[source].dtype not in [np.int32, np.int64] or + elist[destination].dtype not in [np.int32, np.int64]): + raise ValueError( + "set renumber to 
True for non integer columns ids" + ) # The dataframe will be symmetrized iff the graph is undirected # otherwise the inital dataframe will be returned. Duplicated edges @@ -831,17 +836,20 @@ def edges(self): def nodes(self): """ - Returns all the nodes in the graph as a cudf.Series + Returns all the nodes in the graph as a cudf.Series. + If multi columns vertices, return a cudf.DataFrame. """ if self.edgelist is not None: df = self.edgelist.edgelist_df if self.properties.renumbered: - # FIXME: If vertices are multicolumn - # this needs to return a dataframe # FIXME: This relies on current implementation # of NumberMap, should not really expose # this, perhaps add a method to NumberMap - return self.renumber_map.implementation.df["0"] + df = self.renumber_map.implementation.df.drop(columns="id") + if len(df.columns) > 1: + return df + else: + return df[df.columns[0]] else: return cudf.concat([df["src"], df["dst"]]).unique() if self.adjlist is not None: diff --git a/python/cugraph/cugraph/structure/number_map.py b/python/cugraph/cugraph/structure/number_map.py index 4f6edf5fcf2..0d51d806c58 100644 --- a/python/cugraph/cugraph/structure/number_map.py +++ b/python/cugraph/cugraph/structure/number_map.py @@ -355,9 +355,8 @@ def to_internal_vertex_id(self, df, col_names=None): tmp_df["0"] = df tmp_col_names = ["0"] elif type(df) is dask_cudf.Series: - tmp_df = dask_cudf.DataFrame() - tmp_df["0"] = df - tmp_col_names = ["0"] + tmp_df = df.to_frame() + tmp_col_names = tmp_df.columns else: tmp_df = df tmp_col_names = col_names diff --git a/python/cugraph/cugraph/tests/mg/test_mg_degree.py b/python/cugraph/cugraph/tests/mg/test_mg_degree.py index ec3979630ed..d76f78b65bd 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_degree.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_degree.py @@ -62,23 +62,37 @@ def test_dask_mg_degree(dask_client, directed): dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist(ddf, "src", "dst") + dg.compute_renumber_edge_list() g = cugraph.Graph(directed=directed) g.from_cudf_edgelist(df, "src", "dst") - merge_df_in = ( + merge_df_in_degree = ( dg.in_degree() .merge(g.in_degree(), on="vertex", suffixes=["_dg", "_g"]) .compute() ) - merge_df_out = ( + merge_df_out_degree = ( dg.out_degree() .merge(g.out_degree(), on="vertex", suffixes=["_dg", "_g"]) .compute() ) - assert_series_equal(merge_df_in["degree_dg"], merge_df_in["degree_g"], - check_names=False) - assert_series_equal(merge_df_out["degree_dg"], merge_df_out["degree_g"], - check_names=False) + merge_df_degree = ( + dg.degree() + .merge(g.degree(), on="vertex", suffixes=["_dg", "_g"]) + .compute() + ) + + assert_series_equal( + merge_df_in_degree["degree_dg"], merge_df_in_degree["degree_g"], + check_names=False, check_dtype=False) + + assert_series_equal( + merge_df_out_degree["degree_dg"], merge_df_out_degree["degree_g"], + check_names=False, check_dtype=False) + + assert_series_equal( + merge_df_degree["degree_dg"], merge_df_degree["degree_g"], + check_names=False, check_dtype=False) diff --git a/python/cugraph/cugraph/tests/mg/test_mg_graph.py b/python/cugraph/cugraph/tests/mg/test_mg_graph.py new file mode 100644 index 00000000000..b5d3529926e --- /dev/null +++ b/python/cugraph/cugraph/tests/mg/test_mg_graph.py @@ -0,0 +1,119 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import gc
+
+import pytest
+import cugraph.dask as dcg
+import dask_cudf
+from cugraph.testing import utils
+import cugraph
+import random
+
+
+# =============================================================================
+# Pytest Setup / Teardown - called for each test function
+# =============================================================================
+def setup_function():
+    gc.collect()
+
+
+# =============================================================================
+# Pytest fixtures
+# =============================================================================
+IS_DIRECTED = [True, False]
+
+datasets = utils.DATASETS_UNDIRECTED + utils.DATASETS_UNRENUMBERED
+
+fixture_params = utils.genFixtureParamsProduct(
+    (datasets, "graph_file"),
+    (IS_DIRECTED, "directed"),
+    ([True, False], "legacy_renum_only")
+    )
+
+
+@pytest.fixture(scope="module", params=fixture_params)
+def input_combo(request):
+    """
+    Simply return the current combination of params as a dictionary for use in
+    tests or other parameterized fixtures.
+    """
+    parameters = dict(zip(("graph_file",
+                           "directed",
+                           "legacy_renum_only"), request.param))
+
+    input_data_path = parameters["graph_file"]
+    directed = parameters["directed"]
+    legacy_renum_only = parameters["legacy_renum_only"]
+
+    chunksize = dcg.get_chunksize(input_data_path)
+    ddf = dask_cudf.read_csv(
+        input_data_path,
+        chunksize=chunksize,
+        delimiter=" ",
+        names=["src", "dst", "value"],
+        dtype=["int32", "int32", "float32"],
+    )
+    parameters["input_df"] = ddf
+
+    dg = cugraph.Graph(directed=directed)
+    dg.from_dask_cudf_edgelist(
+        ddf, source='src', destination='dst', edge_attr='value')
+
+    dg.compute_renumber_edge_list(legacy_renum_only=legacy_renum_only)
+
+    parameters["MGGraph"] = dg
+
+    return parameters
+
+
+def test_nodes_functionality(dask_client, input_combo):
+    G = input_combo["MGGraph"]
+    ddf = input_combo["input_df"]
+
+    # Series has no attribute sort_values, so convert the Series
+    # to a DataFrame
+    nodes = G.nodes().to_frame()
+    col_name = nodes.columns[0]
+    nodes = nodes.rename(columns={col_name: "result_nodes"})
+
+    result_nodes = nodes.compute().sort_values(
+        "result_nodes").reset_index(drop=True)
+
+    expected_nodes = dask_cudf.concat(
+        [ddf["src"], ddf["dst"]]).drop_duplicates().to_frame().sort_values(0)
+
+    expected_nodes = expected_nodes.compute().reset_index(drop=True)
+
+    result_nodes["expected_nodes"] = expected_nodes[0]
+
+    compare = result_nodes.query('result_nodes != expected_nodes')
+
+    assert len(compare) == 0
+
+
+def test_has_node_functionality(dask_client, input_combo):
+
+    G = input_combo["MGGraph"]
+
+    valid_nodes = G.nodes().compute()
+
+    # randomly sample k nodes from the graph
+    k = random.randint(1, 20)
+    n = valid_nodes.sample(k).reset_index(drop=True)
+    print("nodes are \n", n)
+
+    assert G.has_node(n)
+
+    invalid_node = valid_nodes.max() + 1
+
+    assert G.has_node(invalid_node) is False
diff --git a/python/cugraph/cugraph/tests/mg/test_mg_katz_centrality.py b/python/cugraph/cugraph/tests/mg/test_mg_katz_centrality.py
index f258cca7e54..9160be73fcc 100644
--- 
a/python/cugraph/cugraph/tests/mg/test_mg_katz_centrality.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_katz_centrality.py @@ -56,10 +56,8 @@ def test_dask_katz_centrality(dask_client, directed): dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst") - largest_out_degree = dg.out_degree().compute().\ - nlargest(n=1, columns="degree") - largest_out_degree = largest_out_degree["degree"].iloc[0] - katz_alpha = 1 / (largest_out_degree + 1) + degree_max = dg.degree()['degree'].max().compute() + katz_alpha = 1 / (degree_max) mg_res = dcg.katz_centrality(dg, alpha=katz_alpha, tol=1e-6) mg_res = mg_res.compute() @@ -117,12 +115,7 @@ def test_dask_katz_centrality_nstart(dask_client, directed): dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst") - largest_out_degree = dg.out_degree().compute().\ - nlargest(n=1, columns="degree") - largest_out_degree = largest_out_degree["degree"].iloc[0] - katz_alpha = 1 / (largest_out_degree + 1) - - mg_res = dcg.katz_centrality(dg, alpha=katz_alpha, max_iter=50, tol=1e-6) + mg_res = dcg.katz_centrality(dg, max_iter=50, tol=1e-6) mg_res = mg_res.compute() estimate = mg_res.copy() @@ -130,7 +123,7 @@ def test_dask_katz_centrality_nstart(dask_client, directed): "katz_centrality": "values"}) estimate["values"] = 0.5 - mg_estimate_res = dcg.katz_centrality(dg, alpha=katz_alpha, + mg_estimate_res = dcg.katz_centrality(dg, nstart=estimate, max_iter=50, tol=1e-6) mg_estimate_res = mg_estimate_res.compute() diff --git a/python/cugraph/cugraph/tests/mg/test_mg_neighborhood_sampling.py b/python/cugraph/cugraph/tests/mg/test_mg_neighborhood_sampling.py index ebaf6a8880f..13122b2bfe4 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_neighborhood_sampling.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_neighborhood_sampling.py @@ -10,7 +10,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-import pandas as pd import gc import pytest import cugraph.dask as dcg @@ -33,9 +32,9 @@ def setup_function(): # Pytest fixtures # ============================================================================= IS_DIRECTED = [True, False] -# FIXME: Do more testing for this datasets -# [utils.RAPIDS_DATASET_ROOT_DIR_PATH/"email-Eu-core.csv"] -datasets = utils.DATASETS_UNDIRECTED + +datasets = utils.DATASETS_UNDIRECTED + \ + [utils.RAPIDS_DATASET_ROOT_DIR_PATH/"email-Eu-core.csv"] fixture_params = utils.genFixtureParamsProduct( (datasets, "graph_file"), @@ -82,14 +81,14 @@ def input_combo(request): dsts = dg.input_df["dst"] vertices = dask_cudf.concat([srcs, dsts]).drop_duplicates().compute() - start_list = vertices.sample(k) + start_list = vertices.sample(k).astype("int32") - # Generate a random fanout_vals list of length k - fanout_vals = [random.randint(1, k) for _ in range(k)] + # Generate a random fanout_vals list of length random(1, k) + fanout_vals = [random.randint(1, k) for _ in range(random.randint(1, k))] # These prints are for debugging purposes since the vertices and the # fanout_vals are randomly sampled/chosen - print("start_list: \n", start_list) + print("\nstart_list: \n", start_list) print("fanout_vals: ", fanout_vals) parameters["start_list"] = start_list @@ -118,32 +117,45 @@ def test_mg_neighborhood_sampling_simple(dask_client, input_combo): join = result_nbr.merge( input_df, left_on=[*result_nbr.columns[:2]], right_on=[*input_df.columns[:2]]) + if len(result_nbr) != len(join): join2 = input_df.merge( - result_nbr, how='left', left_on=[*input_df.columns], + result_nbr, how='right', left_on=[*input_df.columns], right_on=[*result_nbr.columns]) - pd.set_option('display.max_rows', 500) - print('df1 = \n', input_df.sort_values([*input_df.columns])) - print('df2 = \n', result_nbr.sort_values( - [*result_nbr.columns]).compute()) - print('join2 = \n', join2.sort_values( - [*input_df.columns]).compute().to_pandas().query( - 'sources.isnull()', engine='python')) + # The left part of the datasets shows which edge is missing from the + # right part where the left and right part are respectively the + # uniform-neighbor-sample results and the input dataframe. + difference = join2.sort_values([*result_nbr.columns]) \ + .compute().to_pandas().query( + 'src.isnull()', engine='python') + + invalid_edge = difference[difference.columns[:3]] + raise Exception(f"\nThe edges below from uniform-neighbor-sample " + f"are invalid\n {invalid_edge}") - assert len(join) == len(result_nbr) # Ensure the right indices type is returned assert result_nbr['indices'].dtype == input_combo["indices_type"] - start_list = input_combo["start_list"].to_pandas() - result_nbr_vertices = dask_cudf.concat( + sampled_vertex_result = dask_cudf.concat( [result_nbr["sources"], result_nbr["destinations"]]). 
\ drop_duplicates().compute().reset_index(drop=True) - result_nbr_vertices = result_nbr_vertices.to_pandas() + sampled_vertex_result = sampled_vertex_result.to_pandas() + start_list = input_combo["start_list"].to_pandas() - # The vertices in start_list must be a subsets of the vertices - # in the result - assert set(start_list).issubset(set(result_nbr_vertices)) + if not set(start_list).issubset(set(sampled_vertex_result)): + missing_vertex = set(start_list) - set(sampled_vertex_result) + missing_vertex = list(missing_vertex) + # compute the out-degree of the missing vertices + out_degree = dg.out_degree(missing_vertex) + + out_degree = out_degree[out_degree.degree != 0] + # If the missing vertices have outgoing edges, return an error + if len(out_degree) != 0: + missing_vertex = out_degree["vertex"].compute(). \ + to_pandas().to_list() + raise Exception(f"vertex {missing_vertex} is missing from " + f"uniform neighbor sampling results") @pytest.mark.parametrize("directed", IS_DIRECTED) diff --git a/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py b/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py index f8606c45715..f1ff0a3184e 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py @@ -142,7 +142,7 @@ def net_PropertyGraph(request): from cugraph.experimental import PropertyGraph dataframe_type = request.param[0] - netscience_csv = utils.RAPIDS_DATASET_ROOT_DIR_PATH/"netscience_test.csv" + netscience_csv = utils.RAPIDS_DATASET_ROOT_DIR_PATH/"netscience.csv" source_col_name = "src" dest_col_name = "dst" @@ -296,7 +296,7 @@ def net_MGPropertyGraph(dask_client): """ from cugraph.experimental import MGPropertyGraph input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / - "netscience_test.csv").as_posix() + "netscience.csv").as_posix() print(f"dataset={input_data_path}") chunksize = dcg.get_chunksize(input_data_path) ddf = dask_cudf.read_csv( @@ -312,6 +312,7 @@ def net_MGPropertyGraph(dask_client): return dpG +@pytest.mark.skip(reason="Skipping tests because it is a work in progress") def test_extract_subgraph_no_query(net_MGPropertyGraph, net_PropertyGraph): """ Call extract with no args, should result in the entire property graph. 
@@ -339,6 +340,7 @@ def test_extract_subgraph_no_query(net_MGPropertyGraph, net_PropertyGraph): .equals(mg_subgraph_df[['_SRC_', '_DST_']])) +@pytest.mark.skip(reason="Skipping tests because it is a work in progress") def test_adding_fixture(dataset1_PropertyGraph, dataset1_MGPropertyGraph): sgpG = dataset1_PropertyGraph mgPG = dataset1_MGPropertyGraph @@ -355,6 +357,7 @@ def test_adding_fixture(dataset1_PropertyGraph, dataset1_MGPropertyGraph): .equals(mg_subgraph_df[['_SRC_', '_DST_']])) +@pytest.mark.skip(reason="Skipping tests because it is a work in progress") def test_frame_data(dataset1_PropertyGraph, dataset1_MGPropertyGraph): sgpG = dataset1_PropertyGraph mgpG = dataset1_MGPropertyGraph diff --git a/python/cugraph/cugraph/tests/test_katz_centrality.py b/python/cugraph/cugraph/tests/test_katz_centrality.py index c790973d38e..4a8f259eeb8 100644 --- a/python/cugraph/cugraph/tests/test_katz_centrality.py +++ b/python/cugraph/cugraph/tests/test_katz_centrality.py @@ -52,9 +52,8 @@ def calc_katz(graph_file): G = cugraph.Graph(directed=True) G.from_cudf_edgelist(cu_M, source="0", destination="1") - largest_out_degree = G.degrees().nlargest(n=1, columns="out_degree") - largest_out_degree = largest_out_degree["out_degree"].iloc[0] - katz_alpha = 1 / (largest_out_degree + 1) + degree_max = G.degree()['degree'].max() + katz_alpha = 1 / (degree_max) k_df = cugraph.katz_centrality(G, alpha=None, max_iter=1000) k_df = k_df.sort_values("vertex").reset_index(drop=True) @@ -71,7 +70,7 @@ def calc_katz(graph_file): @pytest.mark.parametrize("graph_file", utils.DATASETS) -def test_katz_centrality(graph_file): +def test_katz_centrality_1(graph_file): katz_scores = calc_katz(graph_file) topKNX = topKVertices(katz_scores, "nx_katz", 10) @@ -89,9 +88,8 @@ def test_katz_centrality_nx(graph_file): ) G = cugraph.utilities.convert_from_nx(Gnx) - largest_out_degree = G.degrees().nlargest(n=1, columns="out_degree") - largest_out_degree = largest_out_degree["out_degree"].iloc[0] - katz_alpha = 1 / (largest_out_degree + 1) + degree_max = G.degree()['degree'].max() + katz_alpha = 1 / (degree_max) nk = nx.katz_centrality(Gnx, alpha=katz_alpha) ck = cugraph.katz_centrality(Gnx, alpha=None, max_iter=1000) diff --git a/python/cugraph/cugraph/tests/test_neighborhood_sampling.py b/python/cugraph/cugraph/tests/test_neighborhood_sampling.py index 9ca6f088a4a..8eacbb6a14c 100644 --- a/python/cugraph/cugraph/tests/test_neighborhood_sampling.py +++ b/python/cugraph/cugraph/tests/test_neighborhood_sampling.py @@ -10,7 +10,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-import pandas as pd import gc import pytest import cugraph @@ -31,9 +30,9 @@ def setup_function(): # Pytest fixtures # ============================================================================= IS_DIRECTED = [True, False] -# FIXME: Do more testing for this datasets -# [utils.RAPIDS_DATASET_ROOT_DIR_PATH/"email-Eu-core.csv"] -datasets = utils.DATASETS_UNDIRECTED + +datasets = utils.DATASETS_UNDIRECTED + \ + [utils.RAPIDS_DATASET_ROOT_DIR_PATH/"email-Eu-core.csv"] fixture_params = utils.genFixtureParamsProduct( (datasets, "graph_file"), @@ -80,13 +79,14 @@ def input_combo(request): vertices = cudf.concat([srcs, dsts]).drop_duplicates() - start_list = vertices.sample(k) - # Generate a random fanout_vals list of length k - fanout_vals = [random.randint(1, k) for _ in range(k)] + start_list = vertices.sample(k).astype("int32") + + # Generate a random fanout_vals list of length random(1, k) + fanout_vals = [random.randint(1, k) for _ in range(random.randint(1, k))] # These prints are for debugging purposes since the vertices and # the fanout_vals are randomly sampled/chosen - print("start_list: \n", start_list) + print("\nstart_list: \n", start_list) print("fanout_vals: ", fanout_vals) parameters["start_list"] = start_list @@ -131,28 +131,40 @@ def test_neighborhood_sampling_simple(input_combo): if len(result_nbr) != len(join): join2 = input_df.merge( - result_nbr, how='left', left_on=[*input_df.columns], + result_nbr, how='right', left_on=[*input_df.columns], right_on=[*result_nbr.columns]) + # The left part of the datasets shows which edge is missing from the + # right part where the left and right part are respectively the + # uniform-neighbor-sample results and the input dataframe. + difference = join2.sort_values([*result_nbr.columns]) \ + .to_pandas().query( + 'src.isnull()', engine='python') - pd.set_option('display.max_rows', 500) - print('df1 = \n', input_df.sort_values([*input_df.columns])) - print('df2 = \n', result_nbr.sort_values([*result_nbr.columns])) - print('join2 = \n', join2.sort_values([*input_df.columns]) - .to_pandas().query('sources.isnull()', engine='python')) + invalid_edge = difference[difference.columns[:3]] + raise Exception(f"\nThe edges below from uniform-neighbor-sample " + f"are invalid\n {invalid_edge}") - assert len(join) == len(result_nbr) # Ensure the right indices type is returned assert result_nbr['indices'].dtype == input_combo["indices_type"] - start_list = input_combo["start_list"] - - result_nbr_vertices = cudf.concat( - [result_nbr["sources"], result_nbr["destinations"]]) \ - .drop_duplicates().reset_index(drop=True) + sampled_vertex_result = cudf.concat( + [result_nbr["sources"], result_nbr["destinations"]]). 
\ + drop_duplicates().reset_index(drop=True) - assert set( - start_list.to_pandas()).issubset( - set(result_nbr_vertices.to_pandas())) + sampled_vertex_result = sampled_vertex_result.to_pandas() + start_list = input_combo["start_list"].to_pandas() + + if not set(start_list).issubset(set(sampled_vertex_result)): + missing_vertex = set(start_list) - set(sampled_vertex_result) + missing_vertex = list(missing_vertex) + # compute the out-degree of the missing vertices + out_degree = G.out_degree(missing_vertex) + out_degree = out_degree[out_degree.degree != 0] + # If the missing vertices have outgoing edges, return an error + if len(out_degree) != 0: + missing_vertex = out_degree["vertex"].to_pandas().to_list() + raise Exception(f"vertex {missing_vertex} is missing from " + f"uniform neighbor sampling results") @pytest.mark.parametrize("directed", IS_DIRECTED)
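
For reference, below is a minimal single-GPU sketch of the promoted
`uniform_neighbor_sample` call path that this patch migrates the benchmarks
and tests to. The toy edge list, start vertex, and fanout value are
illustrative only and are not taken from the patch:

    import cudf
    import cugraph

    # cugraph.DiGraph is replaced by the directed= flag on the Graph
    # constructor (see construct_graph above)
    edgelist = cudf.DataFrame({
        "src": [0, 1, 2, 2, 3],
        "dst": [1, 2, 0, 3, 0],
        "weight": [1.0, 1.0, 1.0, 1.0, 1.0],
    })
    G = cugraph.Graph(directed=True)
    G.from_cudf_edgelist(edgelist, source="src", destination="dst",
                         edge_attr="weight")

    # The old experimental API took start_info_list=(start, labels); the
    # promoted API takes start_list and fanout_vals directly
    start_list = cudf.Series([0], dtype="int32")
    result = cugraph.uniform_neighbor_sample(G, start_list=start_list,
                                             fanout_vals=[2])

    # result is a cudf.DataFrame with sources, destinations, and
    # indices columns, matching the tests above
    print(result)

The multi-GPU path is analogous: build the graph with
from_dask_cudf_edgelist and call cugraph.dask.uniform_neighbor_sample, as
cugraph_dask_funcs.py does above.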