diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f6a5924c3e..81c7b68a2bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -54,6 +54,7 @@ - PR #992 Fix unrenumber of predecessor - PR #1008 Fix for cudf updates disabling iteration of Series/Columns/Index - PR #1012 Fix Local build script README +- PR #1017 Fix more mg bugs - PR #1022 Fix support for using a cudf.DataFrame with a MG graph # cuGraph 0.14.0 (03 Jun 2020) diff --git a/python/cugraph/comms/comms.py b/python/cugraph/comms/comms.py index 28ce2a3fc1e..642d99440e0 100644 --- a/python/cugraph/comms/comms.py +++ b/python/cugraph/comms/comms.py @@ -9,16 +9,33 @@ # Intialize Comms. If explicit Comms not provided as arg, # default Comms are initialized as per client information. -def initialize(arg=None): +def initialize(comms=None, p2p=False): + """ + Initializes a communicator for multi-node multi-gpu communications. + It is expected to be called right after client initialization for running + mnmg algorithms. It wraps raft comms that manages underlying NCCL and UCX + comms handles across the workers of a Dask cluster. + It is recommended to also call `destroy()` when the comms are no longer + needed so the underlying resources can be cleaned up. + + Parameters + ---------- + comms : raft Comms + A pre-initialized raft communicator. If provided, this is used for mnmg + communications. + p2p : bool + Initialize UCX endpoints + """ + global __instance if __instance is None: global __default_handle __default_handle = None - if arg is None: - __instance = raftComms() + if comms is None: + __instance = raftComms(comms_p2p=p2p) __instance.init() else: - __instance = arg + __instance = comms else: raise Exception("Communicator is already initialized") @@ -47,6 +64,9 @@ def get_session_id(): # Destroy Comms def destroy(): + """ + Shuts down initialized comms and cleans up resources. 
+ """ global __instance if is_initialized(): __instance.destroy() diff --git a/python/cugraph/dask/common/input_utils.py b/python/cugraph/dask/common/input_utils.py index f26e1a8f4d1..ab038507b78 100644 --- a/python/cugraph/dask/common/input_utils.py +++ b/python/cugraph/dask/common/input_utils.py @@ -151,8 +151,13 @@ def calculate_local_data(self, comms, by): for rank in range(len(_local_data_dict)): data = _local_data_dict[rank] local_data_dict['edges'].append(data[0]) - local_data_dict['offsets'].append(data[1]) - local_data_dict['verts'].append(data[2]) + if rank == 0: + local_offset = 0 + else: + prev_data = _local_data_dict[rank-1] + local_offset = prev_data[1] + 1 + local_data_dict['offsets'].append(local_offset) + local_data_dict['verts'].append(data[1] - local_offset + 1) import numpy as np local_data_dict['edges'] = np.array(local_data_dict['edges'], @@ -191,13 +196,8 @@ def get_obj(x): return x[0] if multiple else x def _get_local_data(df, by): df = df[0] num_local_edges = len(df) - if num_local_edges == 0: - local_offset = 0 - num_local_verts = 0 - else: - local_offset = df[by].min() - num_local_verts = df[by].max() - local_offset + 1 - return num_local_edges, local_offset, num_local_verts + local_max = df[by].iloc[-1] + return num_local_edges, local_max def get_local_data(input_graph, by, load_balance=True): diff --git a/python/cugraph/dask/mg_pagerank/pagerank.py b/python/cugraph/dask/mg_pagerank/pagerank.py index c4f0d8bf675..cf1b8c3bc90 100644 --- a/python/cugraph/dask/mg_pagerank/pagerank.py +++ b/python/cugraph/dask/mg_pagerank/pagerank.py @@ -100,7 +100,8 @@ def pagerank(input_graph, names=['src', 'dst', 'value'], dtype=['int32', 'int32', 'float32']) >>> dg = cugraph.DiGraph() - >>> dg.from_dask_cudf_edgelist(ddf) + >>> dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst', + edge_attr='value') >>> pr = dcg.pagerank(dg) """ from cugraph.structure.graph import null_check @@ -121,7 +122,7 @@ def pagerank(input_graph, if 
input_graph.renumbered is True: personalization = input_graph.add_internal_vertex_id( personalization, "vertex", "vertex" - ) + ).compute() result = dict([(data.worker_info[wf[0]]["rank"], client.submit( diff --git a/python/cugraph/dask/pagerank/__init__.py b/python/cugraph/dask/pagerank/__init__.py deleted file mode 100644 index 01d75d2a3d1..00000000000 --- a/python/cugraph/dask/pagerank/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .pagerank import pagerank, get_chunksize, read_split_csv, drop_duplicates diff --git a/python/cugraph/dask/pagerank/pagerank.py b/python/cugraph/dask/pagerank/pagerank.py deleted file mode 100644 index 2e5a636683c..00000000000 --- a/python/cugraph/dask/pagerank/pagerank.py +++ /dev/null @@ -1,45 +0,0 @@ -def pagerank(edge_list, alpha=0.85, max_iter=30): - """ - Find the PageRank values for each vertex in a graph using multiple GPUs. - cuGraph computes an approximation of the Pagerank using the power method. - The input edge list should be provided in dask-cudf dataframe - with one partition per GPU. - - Parameters - ---------- - edge_list : dask_cudf.DataFrame - Contain the connectivity information as an edge list. - Source 'src' and destination 'dst' columns must be of type 'int32'. - Edge weights are not used for this algorithm. - Indices must be in the range [0, V-1], where V is the global number - of vertices. - alpha : float - The damping factor alpha represents the probability to follow an - outgoing edge, standard value is 0.85. - Thus, 1.0-alpha is the probability to “teleport” to a random vertex. - Alpha should be greater than 0.0 and strictly lower than 1.0. - max_iter : int - The maximum number of iterations before an answer is returned. - If this value is lower or equal to 0 cuGraph will use the default - value, which is 30. - - Returns - ------- - PageRank : dask_cudf.DataFrame - Dask GPU DataFrame containing two columns of size V: the vertex - identifiers and the corresponding PageRank values. 
- - Examples - -------- - >>> import dask_cugraph.pagerank as dcg - >>> chunksize = dcg.get_chunksize(edge_list.csv) - >>> ddf_edge_list = dask_cudf.read_csv(edge_list.csv, - >>> chunksize = chunksize, - >>> delimiter='\t', - >>> names=['src', 'dst'], - >>> dtype=['int32', 'int32']) - >>> pr = dcg.pagerank(ddf_edge_list, alpha=0.85, max_iter=50) - """ - - raise Exception("mg_pagerank currently disabled... " - "new MG version coming soon") diff --git a/python/cugraph/structure/graph.py b/python/cugraph/structure/graph.py index a10c1f29f63..a0a62ee3ca3 100644 --- a/python/cugraph/structure/graph.py +++ b/python/cugraph/structure/graph.py @@ -330,7 +330,9 @@ def add_edge_list(self, source, destination, value=None): else: self.from_cudf_edgelist(input_df) - def from_dask_cudf_edgelist(self, input_ddf, renumber=True): + def from_dask_cudf_edgelist(self, input_ddf, source='source', + destination='destination', + edge_attr=None, renumber=True): """ Initializes the distributed graph from the dask_cudf.DataFrame edgelist. Undirected Graphs are not currently supported. @@ -345,9 +347,12 @@ def from_dask_cudf_edgelist(self, input_ddf, renumber=True): ---------- input_ddf : dask_cudf.DataFrame The edgelist as a dask_cudf.DataFrame - Source vertices are in a column named 'src', Destination - vertices are in a column named 'dst' - + source : str + source argument is source column name + destination : str + destination argument is destination column name. + edge_attr : str + edge_attr argument is the weights column name. renumber : bool If source and destination indices are not in range 0 to V where V is number of vertices, renumber argument should be True. 
@@ -359,6 +364,10 @@ def from_dask_cudf_edgelist(self, input_ddf, renumber=True): if isinstance(input_ddf, dask_cudf.DataFrame): self.distributed = True self.local_data = None + rename_map = {source: 'src', destination: 'dst'} + if edge_attr is not None: + rename_map[edge_attr] = 'weights' + input_ddf = input_ddf.rename(columns=rename_map) if renumber: renumbered_ddf, number_map = NumberMap.renumber( input_ddf, "src", "dst" diff --git a/python/cugraph/tests/dask/mg_utility_testing.py b/python/cugraph/tests/dask/mg_utility_testing.py index aef81fb53f5..2819d3b3253 100644 --- a/python/cugraph/tests/dask/mg_utility_testing.py +++ b/python/cugraph/tests/dask/mg_utility_testing.py @@ -20,7 +20,8 @@ def test_compute_local_data(): dtype=['int32', 'int32', 'float32']) dg = cugraph.DiGraph() - dg.from_dask_cudf_edgelist(ddf) + dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst', + edge_attr='value') # Compute_local_data dg.compute_local_data(by='dst') diff --git a/python/cugraph/tests/dask/test_mg_comms.py b/python/cugraph/tests/dask/test_mg_comms.py index f9ab39ee35d..2b9df6f9efd 100644 --- a/python/cugraph/tests/dask/test_mg_comms.py +++ b/python/cugraph/tests/dask/test_mg_comms.py @@ -53,7 +53,7 @@ def test_dask_pagerank(client_connection): dtype=['int32', 'int32', 'float32']) dg1 = cugraph.DiGraph() - dg1.from_dask_cudf_edgelist(ddf1) + dg1.from_dask_cudf_edgelist(ddf1, 'src', 'dst') result_pr1 = dcg.pagerank(dg1) ddf2 = dask_cudf.read_csv(input_data_path2, chunksize=chunksize2, @@ -62,7 +62,7 @@ def test_dask_pagerank(client_connection): dtype=['int32', 'int32', 'float32']) dg2 = cugraph.DiGraph() - dg2.from_dask_cudf_edgelist(ddf2, renumber=False) + dg2.from_dask_cudf_edgelist(ddf2, 'src', 'dst') result_pr2 = dcg.pagerank(dg2) # Calculate single GPU pagerank for verification of results diff --git a/python/cugraph/tests/dask/test_mg_degree.py b/python/cugraph/tests/dask/test_mg_degree.py index dc2915060f6..ba5c8cef054 100644 --- 
a/python/cugraph/tests/dask/test_mg_degree.py +++ b/python/cugraph/tests/dask/test_mg_degree.py @@ -41,7 +41,7 @@ def test_dask_mg_degree(client_connection): dtype=['int32', 'int32', 'float32']) dg = cugraph.DiGraph() - dg.from_dask_cudf_edgelist(ddf) + dg.from_dask_cudf_edgelist(ddf, 'src', 'dst') g = cugraph.DiGraph() g.from_cudf_edgelist(df, 'src', 'dst') diff --git a/python/cugraph/tests/dask/test_mg_pagerank.py b/python/cugraph/tests/dask/test_mg_pagerank.py index 42a721c55e9..be6c9ab45b4 100644 --- a/python/cugraph/tests/dask/test_mg_pagerank.py +++ b/python/cugraph/tests/dask/test_mg_pagerank.py @@ -81,10 +81,10 @@ def test_dask_pagerank(client_connection, personalization_perc): dtype=['int32', 'int32', 'float32']) g = cugraph.DiGraph() - g.from_cudf_edgelist(df, 'src', 'dst', renumber=False) + g.from_cudf_edgelist(df, 'src', 'dst') dg = cugraph.DiGraph() - dg.from_dask_cudf_edgelist(ddf, renumber=False) + dg.from_dask_cudf_edgelist(ddf) # Pre compute local data and personalize personalization = None