diff --git a/CHANGELOG.md b/CHANGELOG.md
index dd43419154..0743bb06c2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -32,6 +32,7 @@
 - PR #736 cuHornet KTruss integration
 - PR #735 Integration gunrock's betweenness centrality
 - PR #760 cuHornet Weighted KTruss
+- PR #846 OPG dask infrastructure
 
 ## Improvements
 - PR #688 Cleanup datasets after testing on gpuCI
diff --git a/cpp/include/algorithms.hpp b/cpp/include/algorithms.hpp
index ac5600b59e..fd73400702 100644
--- a/cpp/include/algorithms.hpp
+++ b/cpp/include/algorithms.hpp
@@ -410,4 +410,12 @@ void bfs(experimental::GraphCSR<VT, ET, WT> const &graph,
          VT *predecessors,
          const VT start_vertex,
          bool directed = true);
+
+
+/////////////////////////////////////////////////////////
+template <typename VT, typename ET, typename WT>
+void mg_pagerank_temp(experimental::GraphCSC<VT, ET, WT> const &graph,
+                      WT* pagerank);
+//////////////////////////////////////////////////////////
+
 } //namespace cugraph
diff --git a/cpp/src/link_analysis/pagerank.cu b/cpp/src/link_analysis/pagerank.cu
index 075ecf8787..9c3af565ea 100644
--- a/cpp/src/link_analysis/pagerank.cu
+++ b/cpp/src/link_analysis/pagerank.cu
@@ -278,3 +278,24 @@ template void pagerank<int, int, double>(experimental::GraphCSC<int, int, double>
                                          double alpha, double tolerance, int64_t max_iter, bool has_guess);
 
 } //namespace cugraph
+
+
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+namespace cugraph {
+
+template <typename VT, typename ET, typename WT>
+void mg_pagerank_temp(experimental::GraphCSC<VT, ET, WT> const &graph, WT* pagerank){
+
+std::cout<<"\nINSIDE CPP\n";
+std::cout<<graph.number_of_vertices<<"\n";
+
+}
+
+
+template void mg_pagerank_temp<int, int, float>(experimental::GraphCSC<int, int, float> const &graph, float* pagerank);
+template void mg_pagerank_temp<int, int, double>(experimental::GraphCSC<int, int, double> const &graph, double* pagerank);
+} //namespace cugraph
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/python/cugraph/__init__.py b/python/cugraph/__init__.py
index bf67fd3419..b3a47c5ea3 100644
--- a/python/cugraph/__init__.py
+++ b/python/cugraph/__init__.py
@@ -51,6 +51,8 @@ from cugraph.proto.components import strong_connected_component
 from cugraph.proto.structure import find_bicliques
 
+from cugraph.opg.link_analysis.mg_pagerank_wrapper import mg_pagerank
+
 # Versioneer
 from ._version import get_versions
 __version__ = get_versions()['version']
diff --git a/python/cugraph/dask/common/__init__.py b/python/cugraph/dask/common/__init__.py
new file mode 100644
index 0000000000..6a4f38a9ff
--- /dev/null
+++ b/python/cugraph/dask/common/__init__.py
@@ -0,0 +1,33 @@
+#
+# Copyright (c) 2019-2020, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# + +from cugraph.dask.common.comms import CommsContext, worker_state, default_comms + +#from cugraph.dask.common.comms_utils import inject_comms_on_handle, \ +# perform_test_comms_allreduce, perform_test_comms_send_recv, \ +# perform_test_comms_recv_any_rank, \ +# inject_comms_on_handle_coll_only, is_ucx_enabled + +#from cugraph.dask.common.dask_arr_utils import to_sparse_dask_array # NOQA + +#from cugraph.dask.common.dask_df_utils import get_meta # NOQA +#from cugraph.dask.common.dask_df_utils import to_dask_cudf # NOQA +#from cugraph.dask.common.dask_df_utils import to_dask_df # NOQA + +#from cugraph.dask.common.part_utils import * + +#from cuml.dask.common.utils import raise_exception_from_futures # NOQA +#from cuml.dask.common.utils import raise_mg_import_exception # NOQA diff --git a/python/cugraph/dask/common/comms.py b/python/cugraph/dask/common/comms.py new file mode 100644 index 0000000000..60671785d8 --- /dev/null +++ b/python/cugraph/dask/common/comms.py @@ -0,0 +1,405 @@ +# Copyright (c) 2020, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from cugraph.nccl import nccl +#from cugraph.dask.common.ucx import UCX + +import weakref +from .comms_utils import func_build_comms +#from .comms_utils import inject_comms_on_handle, \ +# inject_comms_on_handle_coll_only, is_ucx_enabled +#from .utils import parse_host_port +#from cugraph.common.handle import Handle + +from dask.distributed import get_worker, default_client + +#from cugraph.utils.import_utils import has_ucp +import warnings + +import time +import uuid + + +_global_comms = weakref.WeakValueDictionary() +_global_comms_index = [0] + + +def _set_global_comms(c): + if c is not None: + _global_comms[_global_comms_index[0]] = c + _global_comms_index[0] += 1 + + +def _del_global_comms(c): + for k in list(_global_comms): + try: + if _global_comms[k] is c: + del _global_comms[k] + except KeyError: + pass + + +def worker_state(sessionId=None): + """ + Retrieves cuML comms state on local worker for the given + sessionId, creating a new session if it does not exist. + If no session id is given, returns the state dict for all + sessions. + :param sessionId: + :return: + """ + worker = get_worker() + if not hasattr(worker, "_cugraph_comm_state"): + worker._cugraph_comm_state = {} + if sessionId is not None and sessionId not in worker._cugraph_comm_state: + # Build state for new session and mark session creation time + worker._cugraph_comm_state[sessionId] = {"ts": time.time()} + + if sessionId is not None: + return worker._cugraph_comm_state[sessionId] + return worker._cugraph_comm_state + +''' +def get_ucx(): + """ + A simple convenience wrapper to make sure UCP listener and + endpoints are only ever assigned once per worker. 
+ """ + if "ucx" not in worker_state("ucp"): + worker_state("ucp")["ucx"] = UCX.get() + return worker_state("ucp")["ucx"] +''' + +def _get_global_comms(): + L = sorted(list(_global_comms), reverse=True) + for k in L: + c = _global_comms[k] + if c.nccl_initialized and (not c.comms_p2p or c.ucx_initialized): + return c + else: + del _global_comms[k] + del L + return None + + +def default_comms(comms_p2p=False, client=None): + """ Return a comms instance if one has been initialized. + Otherwise, initialize a new comms instance. + """ + c = _get_global_comms() + if c: + return c + else: + cb = CommsContext(comms_p2p, client) + cb.init() + + _set_global_comms(cb) + + return _get_global_comms() + +''' +def _func_ucp_listener_port(): + return get_ucx().listener_port() +''' + +async def _func_init_all(sessionId, uniqueId, comms_p2p, + worker_info, verbose, streams_per_handle): + + session_state = worker_state(sessionId) + session_state["nccl_uid"] = uniqueId + session_state["wid"] = worker_info[get_worker().address]["rank"] + session_state["nworkers"] = len(worker_info) + + if verbose: + print("Initializing NCCL") + start = time.time() + + _func_init_nccl(sessionId, uniqueId) + + if verbose: + elapsed = time.time() - start + print("NCCL Initialization took: %f seconds." % elapsed) + + '''if comms_p2p: + if verbose: + print("Initializing UCX Endpoints") + + if verbose: + start = time.time() + await _func_ucp_create_endpoints(sessionId, worker_info) + + if verbose: + elapsed = time.time() - start + print("Done initializing UCX endpoints. Took: %f seconds." % + elapsed) + print("Building handle") + + _func_build_handle_p2p(sessionId, streams_per_handle, verbose) + + if verbose: + print("Done building handle.") + + else: + _func_build_handle(sessionId, streams_per_handle, verbose) + ''' + _func_build_comms(sessionId, verbose) + + +def _func_init_nccl(sessionId, uniqueId): + """ + Initialize ncclComm_t on worker + :param workerId: int ID of the current worker running the function + :param nWorkers: int Number of workers in the cluster + :param uniqueId: array[byte] The NCCL unique Id generated from the + client. 
+ """ + + wid = worker_state(sessionId)["wid"] + nWorkers = worker_state(sessionId)["nworkers"] + + try: + n = nccl() + n.init(nWorkers, uniqueId, wid) + worker_state(sessionId)["nccl"] = n + except Exception: + print("An error occurred initializing NCCL!") + +''' +def _func_build_handle_p2p(sessionId, streams_per_handle, verbose): + """ + Builds a cumlHandle on the current worker given the initialized comms + :param nccl_comm: ncclComm_t Initialized NCCL comm + :param eps: size_t initialized endpoints + :param nWorkers: int number of workers in cluster + :param workerId: int Rank of current worker + :return: + """ + ucp_worker = get_ucx().get_worker() + session_state = worker_state(sessionId) + + handle = Handle(streams_per_handle) + nccl_comm = session_state["nccl"] + eps = session_state["ucp_eps"] + nWorkers = session_state["nworkers"] + workerId = session_state["wid"] + + inject_comms_on_handle(handle, nccl_comm, ucp_worker, eps, + nWorkers, workerId, verbose) + + worker_state(sessionId)["handle"] = handle + + +def _func_build_handle(sessionId, streams_per_handle, verbose): + """ + Builds a cumlHandle on the current worker given the initialized comms + :param nccl_comm: ncclComm_t Initialized NCCL comm + :param nWorkers: int number of workers in cluster + :param workerId: int Rank of current worker + :return: + """ + handle = Handle(streams_per_handle) + + session_state = worker_state(sessionId) + + workerId = session_state["wid"] + nWorkers = session_state["nworkers"] + + nccl_comm = session_state["nccl"] + inject_comms_on_handle_coll_only(handle, nccl_comm, nWorkers, + workerId, verbose) + session_state["handle"] = handle + + +def _func_store_initial_state(nworkers, sessionId, uniqueId, wid): + session_state = worker_state(sessionId) + session_state["nccl_uid"] = uniqueId + session_state["wid"] = wid + session_state["nworkers"] = nworkers + + +async def _func_ucp_create_endpoints(sessionId, worker_info): + """ + Runs on each worker to create ucp endpoints to all other workers + :param sessionId: uuid unique id for this instance + :param worker_info: dict Maps worker address to rank & UCX port + :param r: float a random number to stop the function from being cached + """ + dask_worker = get_worker() + local_address = dask_worker.address + + eps = [None] * len(worker_info) + count = 1 + + for k in worker_info: + if str(k) != str(local_address): + + ip, port = parse_host_port(k) + + ep = await get_ucx().get_endpoint(ip, worker_info[k]["port"]) + + eps[worker_info[k]["rank"]] = ep + count += 1 + + worker_state(sessionId)["ucp_eps"] = eps +''' + +async def _func_destroy_all(sessionId, comms_p2p, verbose=False): + worker_state(sessionId)["nccl"].destroy() + del worker_state(sessionId)["nccl"] + #del worker_state(sessionId)["handle"] + +def _func_build_comms(sessionId, verbose): + """ + Builds a cumlHandle on the current worker given the initialized comms + :param nccl_comm: ncclComm_t Initialized NCCL comm + :param nWorkers: int number of workers in cluster + :param workerId: int Rank of current worker + :return: + """ + session_state = worker_state(sessionId) + + workerId = session_state["wid"] + nWorkers = session_state["nworkers"] + + nccl_comm = session_state["nccl"] + + session_state["comm"] = func_build_comms(nccl_comm, nWorkers, workerId) + +''' +def _func_ucp_ports(client, workers): + return client.run(_func_ucp_listener_port, + workers=workers) +''' + +def _func_worker_ranks(workers): + """ + Builds a dictionary of { (worker_address, worker_port) : worker_rank } + """ + return 
dict(list(zip(workers, range(len(workers))))) + + +class CommsContext: + + """ + A base class to initialize and manage underlying NCCL and UCX + comms handles across a Dask cluster. Classes extending CommsContext + are responsible for calling `self.init()` to initialize the comms. + Classes that extend or use the CommsContext are also responsible for + calling `destroy()` to clean up the underlying comms. + + This class is not meant to be thread-safe. + """ + + def __init__(self, comms_p2p=False, client=None, verbose=False, + streams_per_handle=0): + """ + Construct a new CommsContext instance + :param comms_p2p: bool Should p2p comms be initialized? + """ + self.client = client if client is not None else default_client() + self.comms_p2p = comms_p2p + + self.streams_per_handle = streams_per_handle + + self.sessionId = uuid.uuid4().bytes + + self.nccl_initialized = False + self.ucx_initialized = False + + self.verbose = verbose + + '''if comms_p2p and (not is_ucx_enabled() or not has_ucp()): + warnings.warn("ucx-py not found. UCP Integration will " + "be disabled.") + self.comms_p2p = False + ''' + + if verbose: + print("Initializing comms!") + + def __del__(self): + if self.nccl_initialized or self.ucx_initialized: + self.destroy() + + + def worker_info(self, workers): + """ + Builds a dictionary of { (worker_address, worker_port) : + (worker_rank, worker_port ) } + """ + ranks = _func_worker_ranks(workers) + #ports = _func_ucp_ports(self.client, workers) \ + # if self.comms_p2p else None + + output = {} + for k in ranks.keys(): + output[k] = {"rank": ranks[k]} + #if self.comms_p2p: + # output[k]["port"] = ports[k] + return output + + + def init(self, workers=None): + """ + Initializes the underlying comms. NCCL is required but + UCX is only initialized if `comms_p2p == True` + """ + + self.worker_addresses = list(set((self.client.has_what().keys() + if workers is None else workers))) + + if self.nccl_initialized: + warnings.warn("CommsContext has already been initialized.") + return + + worker_info = self.worker_info(self.worker_addresses) + worker_info = {w: worker_info[w] for w in self.worker_addresses} + + self.uniqueId = nccl.get_unique_id() + + self.client.run(_func_init_all, + self.sessionId, + self.uniqueId, + self.comms_p2p, + worker_info, + self.verbose, + self.streams_per_handle, + workers=self.worker_addresses, + wait=True) + + self.nccl_initialized = True + + if self.comms_p2p: + self.ucx_initialized = True + + if self.verbose: + print("Initialization complete.") + + def destroy(self): + """ + Shuts down initialized comms and cleans up resources. + """ + self.client.run(_func_destroy_all, + self.sessionId, + self.comms_p2p, + self.verbose, + wait=True, + workers=self.worker_addresses) + + if self.verbose: + print("Destroying comms.") + + self.nccl_initialized = False + self.ucx_initialized = False diff --git a/python/cugraph/dask/common/comms_utils.pyx b/python/cugraph/dask/common/comms_utils.pyx new file mode 100644 index 0000000000..6177349874 --- /dev/null +++ b/python/cugraph/dask/common/comms_utils.pyx @@ -0,0 +1,146 @@ +# Copyright (c) 2019-2020, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
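A minimal usage sketch of the comms bootstrap defined in comms.py above (not part of the patch): it assumes dask-cuda's LocalCUDACluster is available, mirrors what default_comms() automates, and omits error handling.

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

from cugraph.dask.common.comms import CommsContext, worker_state

cluster = LocalCUDACluster()
client = Client(cluster)

comms = CommsContext(comms_p2p=False, client=client, verbose=True)
comms.init()   # runs _func_init_all on every worker: NCCL init + Comm build


def _report(sessionId):
    # Executed on each worker; the session dict was filled by _func_init_all.
    state = worker_state(sessionId)
    return state["wid"], state["nworkers"]


print(client.run(_report, comms.sessionId))   # {worker_address: (rank, nworkers), ...}

comms.destroy()
client.close()
cluster.close()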
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# cython: profile=False +# distutils: language = c++ +# cython: embedsignature = True +# cython: language_level = 3 + +from libc.stdlib cimport malloc, free +from cython.operator cimport dereference as deref + +from cpython.long cimport PyLong_AsVoidPtr + +from libcpp cimport bool + + +from libc.stdint cimport uintptr_t + +cdef extern from "nccl.h": + + cdef struct ncclComm + ctypedef ncclComm *ncclComm_t + + +cdef extern from "comms_mpi.hpp" namespace "cugraph::experimental": + cdef cppclass Comm: + Comm(ncclComm_t comm, + int size, + int rank) except + + +''' +cdef extern from "comms/cuML_comms_test.hpp" namespace "ML::Comms": + bool test_collective_allreduce(const cumlHandle &h) except + + bool test_pointToPoint_simple_send_recv(const cumlHandle &h, + int numTrials) except + + bool test_pointToPoint_recv_any_rank(const cumlHandle& h, + int numTrials) except + + +def perform_test_comms_allreduce(handle): + """ + Performs an allreduce on the current worker + :param handle: Handle handle containing cumlCommunicator to use + """ + cdef const cumlHandle* h = handle.getHandle() + return test_collective_allreduce(deref(h)) + + +def perform_test_comms_send_recv(handle, n_trials): + """ + Performs a p2p send/recv on the current worker + :param handle: Handle handle containing cumlCommunicator to use + """ + cdef const cumlHandle *h = handle.getHandle() + return test_pointToPoint_simple_send_recv(deref(h), n_trials) + + +def perform_test_comms_recv_any_rank(handle, n_trials): + """ + Performs a p2p send/recv on the current worker + :param handle: Handle handle containing cumlCommunicator to use + """ + cdef const cumlHandle * h = < cumlHandle * > < size_t > handle.getHandle() + return test_pointToPoint_recv_any_rank(deref(h), < int > n_trials) + + +def inject_comms_on_handle_coll_only(handle, nccl_inst, size, rank, verbose): + """ + Given a handle and initialized nccl comm, creates a cumlCommunicator + instance and injects it into the handle. + :param handle: Handle cumlHandle to inject comms into + :param nccl_inst: ncclComm_t initialized nccl comm + :param size: int number of workers in cluster + :param rank: int rank of current worker + """ + + cdef size_t handle_size_t = handle.getHandle() + handle_ = handle_size_t + + cdef size_t nccl_comm_size_t = nccl_inst.get_comm() + nccl_comm_ = nccl_comm_size_t + + inject_comms_py_coll(handle_, + deref(nccl_comm_), + size, + rank) + + +def inject_comms_on_handle(handle, nccl_inst, ucp_worker, eps, size, + rank, verbose): + """ + Given a handle and initialized comms, creates a cumlCommunicator instance + and injects it into the handle. 
+ :param handle: Handle cumlHandle to inject comms into + :param nccl_inst: ncclComm_t initialized nccl comm + :param ucp_worker: size_t initialized ucp_worker_h instance + :param eps: size_t array of initialized ucp_ep_h instances + :param size: int number of workers in cluster + :param rank: int rank of current worker + """ + cdef size_t *ucp_eps = malloc(len(eps)*sizeof(size_t)) + + for i in range(len(eps)): + if eps[i] is not None: + ep_st = eps[i].get_ucp_endpoint() + ucp_eps[i] = ep_st + else: + ucp_eps[i] = 0 + + cdef void* ucp_worker_st = ucp_worker + + cdef size_t handle_size_t = handle.getHandle() + handle_ = handle_size_t + + cdef size_t nccl_comm_size_t = nccl_inst.get_comm() + nccl_comm_ = nccl_comm_size_t + + inject_comms_py(handle_, + deref(nccl_comm_), + ucp_worker_st, + ucp_eps, + size, + rank) + + free(ucp_eps) +''' + + +def func_build_comms(nccl_inst, size, rank): + + cdef size_t nccl_comm_size_t = nccl_inst.get_comm() + nccl_comm_ = nccl_comm_size_t + + communicator = (new Comm(deref(nccl_comm_), size, rank)) + + return communicator diff --git a/python/cugraph/dask/common/dask_df_utils.py b/python/cugraph/dask/common/dask_df_utils.py new file mode 100644 index 0000000000..9bb63e1fb3 --- /dev/null +++ b/python/cugraph/dask/common/dask_df_utils.py @@ -0,0 +1,73 @@ +# Copyright (c) 2019, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from dask.distributed import default_client +import dask.dataframe as dd + + +def get_meta(df): + """ + Return the metadata from a single dataframe + :param df: cudf.dataframe + :return: Row data from the first row of the dataframe + """ + ret = df.iloc[:0] + return ret + + +def to_dask_cudf(futures, client=None, verbose=False): + """ + Convert a list of futures containing cudf Dataframes into a Dask.Dataframe + :param futures: list[cudf.Dataframe] list of futures containing dataframes + :param client: dask.distributed.Client Optional client to use + :return: dask.Dataframe a dask.Dataframe + """ + c = default_client() if client is None else client + # Convert a list of futures containing dfs back into a dask_cudf + dfs = [d for d in futures if d.type != type(None)] # NOQA + if verbose: + print("to_dask_cudf dfs=%s" % str(dfs)) + meta = c.submit(get_meta, dfs[0]) + meta_local = meta.result() + return dd.from_delayed(dfs, meta=meta_local) + + +def to_dask_df(dask_cudf, client=None): + """ + Convert a Dask-cuDF into a Pandas-backed Dask Dataframe. 
+ :param dask_cudf : dask_cudf.DataFrame + :param client: dask.distributed.Client Optional client to use + :return : dask.DataFrame + """ + + def to_pandas(df): + return df.to_pandas() + + c = default_client() if client is None else client + delayed_ddf = dask_cudf.to_delayed() + gpu_futures = c.compute(delayed_ddf) + + dfs = [c.submit( + to_pandas, + f, + pure=False) for idx, f in enumerate(gpu_futures)] + + meta = c.submit(get_meta, dfs[0]) + + # Using new variabe for local result to stop race-condition in scheduler + # Ref: https://github.com/dask/dask/issues/6027 + meta_local = meta.result() + + return dd.from_delayed(dfs, meta=meta_local) diff --git a/python/cugraph/dask/common/input_utils.py b/python/cugraph/dask/common/input_utils.py new file mode 100644 index 0000000000..dd96a0c384 --- /dev/null +++ b/python/cugraph/dask/common/input_utils.py @@ -0,0 +1,257 @@ +# +# Copyright (c) 2020, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +import cudf +import cupy as cp +import dask.array as da + +from collections.abc import Sequence + +#from cuml.utils.memory_utils import with_cupy_rmm + +from collections import OrderedDict +from cudf.core import DataFrame +from dask_cudf.core import DataFrame as dcDataFrame +from dask_cudf.core import Series as daskSeries + +from cugraph.dask.common.utils import get_client +#from cuml.dask.common.dask_df_utils import to_dask_cudf +#from cuml.dask.common.dask_arr_utils import validate_dask_array +from cugraph.dask.common.part_utils import _extract_partitions +from dask.distributed import wait +from dask.distributed import default_client +from toolz import first + +from functools import reduce + +import dask.dataframe as dd + + +class DistributedDataHandler: + """ + Class to centralize distributed data management. Functionalities include: + - Data colocation + - Worker information extraction + - GPU futures extraction, + + Additional functionality can be added as needed. This class **does not** + contain the actual data, just the metadata necessary to handle it, + including common pieces of code that need to be performed to call + Dask functions. + + The constructor is not meant to be used directly, but through the factory + method DistributedDataHandler.create + + """ + + def __init__(self, gpu_futures=None, workers=None, + datatype=None, multiple=False, client=None): + self.client = get_client(client) + self.gpu_futures = gpu_futures + self.worker_to_parts = _workers_to_parts(gpu_futures) + self.workers = workers + self.datatype = datatype + self.multiple = multiple + self.worker_info = None + self.total_rows = None + self.ranks = None + self.parts_to_sizes = None + + @classmethod + def get_client(cls, client=None): + return default_client() if client is None else client + + """ Class methods for initalization """ + + @classmethod + def create(cls, data, client=None): + """ + Creates a distributed data handler instance with the given + distributed data set(s). 
+ + Parameters + ---------- + + data : dask.array, dask.dataframe, or unbounded Sequence of + dask.array or dask.dataframe. + + client : dask.distributedClient + """ + + client = cls.get_client(client) + + multiple = isinstance(data, Sequence) + + if isinstance(first(data) if multiple else data, + (dcDataFrame, daskSeries)): + datatype = 'cudf' + ''' + else: + datatype = 'cupy' + if multiple: + for d in data: + validate_dask_array(d) + else: + validate_dask_array(data) + ''' + + gpu_futures = client.sync(_extract_partitions, data, client) + + workers = tuple(set(map(lambda x: x[0], gpu_futures))) + + return DistributedDataHandler(gpu_futures=gpu_futures, workers=workers, + datatype=datatype, multiple=multiple, + client=client) + + """ Methods to calculate further attributes """ + + def calculate_worker_and_rank_info(self, comms): + + self.worker_info = comms.worker_info(comms.worker_addresses) + self.ranks = dict() + + for w, futures in self.worker_to_parts.items(): + self.ranks[w] = self.worker_info[w]["rank"] + + def calculate_parts_to_sizes(self, comms=None, ranks=None): + + if self.worker_info is None and comms is not None: + self.calculate_worker_and_rank_info(comms) + + self.total_rows = 0 + + self.parts_to_sizes = dict() + + parts = [(wf[0], self.client.submit( + _get_rows, + wf[1], + self.multiple, + workers=[wf[0]], + pure=False)) + for idx, wf in enumerate(self.worker_to_parts.items())] + + sizes = self.client.compute(parts, sync=True) + + for w, sizes_parts in sizes: + sizes, total = sizes_parts + self.parts_to_sizes[self.worker_info[w]["rank"]] = \ + sizes + + self.total_rows += total + +''' +@with_cupy_rmm +def concatenate(objs, axis=0): + if isinstance(objs[0], DataFrame): + if len(objs) == 1: + return objs[0] + else: + return cudf.concat(objs) + + elif isinstance(objs[0], cp.ndarray): + return cp.concatenate(objs, axis=axis) + + +# TODO: This should be delayed. 
+def to_output(futures, type, client=None, verbose=False): + if type == 'cupy': + return to_dask_cupy(futures, client=client) + else: + return to_dask_cudf(futures, client=client) + + +def _get_meta(df): + """ + Return the metadata from a single dataframe + :param df: cudf.dataframe + :return: Row data from the first row of the dataframe + """ + ret = df[0].iloc[:0] + return ret + + +def _to_dask_cudf(futures, client=None, verbose=False): + """ + Convert a list of futures containing cudf Dataframes into a Dask.Dataframe + :param futures: list[cudf.Dataframe] list of futures containing dataframes + :param client: dask.distributed.Client Optional client to use + :return: dask.Dataframe a dask.Dataframe + """ + c = default_client() if client is None else client + # Convert a list of futures containing dfs back into a dask_cudf + dfs = [d for d in futures if d.type != type(None)] # NOQA + if verbose: + print("to_dask_cudf dfs=%s" % str(dfs)) + meta_future = c.submit(_get_meta, dfs[0], pure=False) + meta = meta_future.result() + return dd.from_delayed(dfs, meta=meta) +''' + +""" Internal methods, API subject to change """ + + +def _workers_to_parts(futures): + """ + Builds an ordered dict mapping each worker to their list + of parts + :param futures: list of (worker, part) tuples + :return: + """ + w_to_p_map = OrderedDict() + for w, p in futures: + if w not in w_to_p_map: + w_to_p_map[w] = [] + w_to_p_map[w].append(p) + return w_to_p_map + +''' +def _get_ary_meta(ary): + + if isinstance(ary, cp.ndarray): + return ary.shape, ary.dtype + elif isinstance(ary, cudf.DataFrame): + return ary.shape, first(set(ary.dtypes)) + else: + raise ValueError("Expected dask.Dataframe " + "or dask.Array, received %s" % type(ary)) +''' + +def _get_rows(objs, multiple): + def get_obj(x): return x[0] if multiple else x + total = list(map(lambda x: get_obj(x).shape[0], objs)) + return total, reduce(lambda a, b: a + b, total) + +''' +def to_dask_cupy(futures, dtype=None, shapes=None, client=None): + + wait(futures) + + c = default_client() if client is None else client + meta = [c.submit(_get_ary_meta, future, pure=False) + for future in futures] + + objs = [] + for i in range(len(futures)): + if not isinstance(futures[i].type, type(None)): + met_future = meta[i] + met = met_future.result() + obj = da.from_delayed(futures[i], shape=met[0], + dtype=met[1]) + objs.append(obj) + + return da.concatenate(objs, axis=0) +''' diff --git a/python/cugraph/dask/common/part_utils.py b/python/cugraph/dask/common/part_utils.py new file mode 100644 index 0000000000..bf21e921d3 --- /dev/null +++ b/python/cugraph/dask/common/part_utils.py @@ -0,0 +1,157 @@ +# Copyright (c) 2019, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
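A short sketch (not part of the patch) of how the DistributedDataHandler above is meant to be driven together with CommsContext. The CSV path and column names are placeholders, and a Dask client/cluster is assumed to be running already (see the comms sketch earlier).

import dask_cudf

from cugraph.dask.common.comms import CommsContext
from cugraph.dask.common.input_utils import DistributedDataHandler

# Placeholder edge list; any dask_cudf.DataFrame works.
ddf = dask_cudf.read_csv("edges.csv", delimiter=' ',
                         names=['src', 'dst', 'value'],
                         dtype=['int32', 'int32', 'float32'])

data = DistributedDataHandler.create(data=ddf)   # persists ddf, one future per partition
print(data.workers)            # worker addresses owning at least one partition
print(data.worker_to_parts)    # OrderedDict: worker -> [partition futures]

comms = CommsContext(comms_p2p=False)
comms.init(workers=data.workers)

data.calculate_parts_to_sizes(comms)   # fills parts_to_sizes and total_rows
print(data.parts_to_sizes, data.total_rows)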
+# + +import numpy as np +from collections import OrderedDict + +from functools import reduce +from tornado import gen +from collections import Sequence +from dask.distributed import futures_of, default_client, wait +from toolz import first + +from dask.array.core import Array as daskArray +from dask_cudf.core import DataFrame as daskDataFrame +from dask_cudf.core import Series as daskSeries + +#from cuml.dask.common.utils import parse_host_port + +''' +def hosts_to_parts(futures): + """ + Builds an ordered dict mapping each host to their list + of parts + :param futures: list of (worker, part) tuples + :return: + """ + w_to_p_map = OrderedDict() + for w, p in futures: + host, port = parse_host_port(w) + host_key = (host, port) + if host_key not in w_to_p_map: + w_to_p_map[host_key] = [] + w_to_p_map[host_key].append(p) + return w_to_p_map + + +def workers_to_parts(futures): + """ + Builds an ordered dict mapping each worker to their list + of parts + :param futures: list of (worker, part) tuples + :return: + """ + w_to_p_map = OrderedDict() + for w, p in futures: + if w not in w_to_p_map: + w_to_p_map[w] = [] + w_to_p_map[w].append(p) + return w_to_p_map + + +def _func_get_rows(df): + return df.shape[0] + + +def parts_to_ranks(client, worker_info, part_futures): + """ + Builds a list of (rank, size) tuples of partitions + :param worker_info: dict of {worker, {"rank": rank }}. Note: \ + This usually comes from the underlying communicator + :param part_futures: list of (worker, future) tuples + :return: [(part, size)] in the same order of part_futures + """ + futures = [(worker_info[wf[0]]["rank"], + client.submit(_func_get_rows, + wf[1], + workers=[wf[0]], + pure=False)) + for idx, wf in enumerate(part_futures)] + + sizes = client.compute(list(map(lambda x: x[1], futures)), sync=True) + total = reduce(lambda a, b: a + b, sizes) + + return [(futures[idx][0], size) for idx, size in enumerate(sizes)], total + + +def _default_part_getter(f, idx): return f[idx] + + +def flatten_grouped_results(client, gpu_futures, + worker_results_map, + getter_func=_default_part_getter): + """ + This function is useful when a series of partitions have been grouped by + the worker responsible for the data and the resulting partitions are + stored on each worker as a list. This happens when a communications + implementation is used which does not allow multiple ranks per device, so + the partitions need to be grouped on the ranks to be processed concurrently + using different streams. 
+ + :param client: Dask client + :param gpu_futures: [(future, part)] worker to part list of tuples + :param worker_results_map: { rank: future } where future is a list + of data partitions on a Dask worker + :param getter_func: a function that takes a future and partition index + as arguments and returns the data for a specific partitions + :return: the ordered list of futures holding each partition on the workers + """ + futures = [] + completed_part_map = {} + for rank, part in gpu_futures: + if rank not in completed_part_map: + completed_part_map[rank] = 0 + + f = worker_results_map[rank] + + futures.append(client.submit( + getter_func, f, completed_part_map[rank])) + + completed_part_map[rank] += 1 + + return futures +''' + +@gen.coroutine +def _extract_partitions(dask_obj, client=None): + + client = default_client() if client is None else client + + # dask.dataframe or dask.array + if isinstance(dask_obj, (daskDataFrame, daskArray, daskSeries)): + persisted = client.persist(dask_obj) + parts = futures_of(persisted) + + # iterable of dask collections (need to colocate them) + elif isinstance(dask_obj, Sequence): + # NOTE: We colocate (X, y) here by zipping delayed + # n partitions of them as (X1, y1), (X2, y2)... + # and asking client to compute a single future for + # each tuple in the list + dela = [np.asarray(d.to_delayed()) for d in dask_obj] + + # TODO: ravel() is causing strange behavior w/ delayed Arrays which are + # not yet backed by futures. Need to investigate this behavior. + # ref: https://github.com/rapidsai/cuml/issues/2045 + raveled = [d.flatten() for d in dela] + parts = client.compute([p for p in zip(*raveled)]) + + yield wait(parts) + + key_to_part = [(str(part.key), part) for part in parts] + who_has = yield client.who_has(parts) + + raise gen.Return([(first(who_has[key]), part) + for key, part in key_to_part]) diff --git a/python/cugraph/dask/common/utils.py b/python/cugraph/dask/common/utils.py new file mode 100644 index 0000000000..1f1e09db24 --- /dev/null +++ b/python/cugraph/dask/common/utils.py @@ -0,0 +1,243 @@ +# Copyright (c) 2019, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
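For reference, the persist/futures_of/who_has pattern used by _extract_partitions above can be reproduced synchronously as follows (a sketch only; `ddf` stands for any dask_cudf.DataFrame and a running Dask client is assumed).

from dask.distributed import default_client, futures_of, wait

client = default_client()
persisted = client.persist(ddf)      # ddf: a dask_cudf.DataFrame
parts = futures_of(persisted)        # one future per partition
wait(parts)

who_has = client.who_has(parts)      # key -> tuple of worker addresses
key_to_part = [(str(p.key), p) for p in parts]
placement = [(who_has[key][0], part) for key, part in key_to_part]
print([worker for worker, _ in placement])   # one owning worker per partition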
+# + +import logging +import os +import numba.cuda +import random +import time + +from dask.distributed import default_client + +#from cuml.utils import device_of_gpu_matrix + +from asyncio import InvalidStateError + +from threading import Lock + +''' +def get_visible_devices(): + """ + Return a list of the CUDA_VISIBLE_DEVICES + :return: list[int] visible devices + """ + # TODO: Shouldn't have to split on every call + return os.environ["CUDA_VISIBLE_DEVICES"].split(",") + + +def device_of_devicendarray(devicendarray): + """ + Returns the device that backs memory allocated on the given + deviceNDArray + :param devicendarray: devicendarray array to check + :return: int device id + """ + dev = device_of_gpu_matrix(devicendarray) + return get_visible_devices()[dev] + + +def get_device_id(canonical_name): + """ + Given a local device id, find the actual "global" id + :param canonical_name: the local device name in CUDA_VISIBLE_DEVICES + :return: the global device id for the system + """ + dev_order = get_visible_devices() + idx = 0 + for dev in dev_order: + if dev == canonical_name: + return idx + idx += 1 + + return -1 + + +def select_device(dev, close=True): + """ + Use numbas numba to select the given device, optionally + closing and opening up a new cuda context if it fails. + :param dev: int device to select + :param close: bool close the cuda context and create new one? + """ + if numba.cuda.get_current_device().id != dev: + logging.warn("Selecting device " + str(dev)) + if close: + numba.cuda.close() + numba.cuda.select_device(dev) + if dev != numba.cuda.get_current_device().id: + logging.warn("Current device " + + str(numba.cuda.get_current_device()) + + " does not match expected " + str(dev)) +''' + +def get_client(client=None): + return default_client() if client is None else client + +''' +def parse_host_port(address): + """ + Given a string address with host/port, build a tuple(host, port) + :param address: string address to parse + :return: tuple(host, port) + """ + if '://' in address: + address = address.rsplit('://', 1)[1] + host, port = address.split(':') + port = int(port) + return host, port + + +def build_host_dict(workers): + """ + Builds a dict to map the set of ports running on each host to + the hostname. + :param workers: list(tuple(host, port)) list of worker addresses + :return: dict(host, set(port)) + """ + hosts = set(map(lambda x: parse_host_port(x), workers)) + hosts_dict = {} + for host, port in hosts: + if host not in hosts_dict: + hosts_dict[host] = set([port]) + else: + hosts_dict[host].add(port) + + return hosts_dict + + +def persist_across_workers(client, objects, workers=None): + """ + Calls persist on the 'objects' ensuring they are spread + across the workers on 'workers'. 
+ + Parameters + ---------- + client : dask.distributed.Client + objects : list + Dask distributed objects to be persisted + workers : list or None + List of workers across which to persist objects + If None, then all workers attached to 'client' will be used + """ + if workers is None: + workers = client.has_what().keys() # Default to all workers + return client.persist(objects, workers={o: workers for o in objects}) + + +def raise_exception_from_futures(futures): + """Raises a RuntimeError if any of the futures indicates an exception""" + errs = [f.exception() for f in futures if f.exception()] + if errs: + raise RuntimeError("%d of %d worker jobs failed: %s" % ( + len(errs), len(futures), ", ".join(map(str, errs)) + )) + + +def raise_mg_import_exception(): + raise Exception("cuML has not been built with multiGPU support " + "enabled. Build with the --multigpu flag to" + " enable multiGPU support.") + + +class MultiHolderLock: + """ + A per-process synchronization lock allowing multiple concurrent holders + at any one time. This is used in situations where resources might be + limited and it's important that the number of concurrent users of + the resources are constained. + + This lock is serializable, but relies on a Python threading.Lock + underneath to properly synchronize internal state across threads. + Note that this lock is only intended to be used per-process and + the underlying threading.Lock will not be serialized. + """ + + def __init__(self, n): + """ + Initialize the lock + :param n : integer the maximum number of concurrent holders + """ + self.n = n + self.current_tasks = 0 + self.lock = Lock() + + def _acquire(self, blocking=True, timeout=10): + lock_acquired = False + + inner_lock_acquired = self.lock.acquire(blocking, timeout) + + if inner_lock_acquired and self.current_tasks < self.n - 1: + self.current_tasks += 1 + lock_acquired = True + self.lock.release() + + return lock_acquired + + def acquire(self, blocking=True, timeout=10): + """ + Acquire the lock. + :param blocking : bool will block if True + :param timeout : a timeout (in seconds) to wait for the lock + before failing. + :return : True if lock was acquired successfully, False otherwise + """ + + t = time.time() + + lock_acquired = self._acquire(blocking, timeout) + + while blocking and not lock_acquired: + + if time.time() - t > timeout: + raise TimeoutError() + + lock_acquired = self.acquire(blocking, timeout) + time.sleep(random.uniform(0, 0.01)) + + return lock_acquired + + def __getstate__(self): + d = self.__dict__.copy() + if "lock" in d: + del d["lock"] + return d + + def __setstate__(self, d): + d["lock"] = Lock() + self.__dict__ = d + + def release(self, blocking=True, timeout=10): + """ + Release a hold on the lock to allow another holder. Note that + while Python's threading.Lock does not have options for blocking + or timeout in release(), this lock uses a threading.Lock + internally and so will need to acquire that lock in order + to properly synchronize the underlying state. + :param blocking : bool will bock if True + :param timeout : a timeout (in seconds) to wait for the lock + before failing. + :return : True if lock was released successfully, False otherwise. 
+ """ + + if self.current_tasks == 0: + raise InvalidStateError("Cannot release lock when no " + "concurrent tasks are executing") + + lock_acquired = self.lock.acquire(blocking, timeout) + if lock_acquired: + self.current_tasks -= 1 + self.lock.release() + return lock_acquired +''' diff --git a/python/cugraph/dask/opg_pagerank/__init__.py b/python/cugraph/dask/opg_pagerank/__init__.py new file mode 100644 index 0000000000..d87d20297c --- /dev/null +++ b/python/cugraph/dask/opg_pagerank/__init__.py @@ -0,0 +1,17 @@ +# +# Copyright (c) 2019, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from .opg_pagerank import pagerank diff --git a/python/cugraph/dask/opg_pagerank/pagerank.py b/python/cugraph/dask/opg_pagerank/pagerank.py new file mode 100644 index 0000000000..577ab6a2b0 --- /dev/null +++ b/python/cugraph/dask/opg_pagerank/pagerank.py @@ -0,0 +1,84 @@ +# Copyright (c) 2019-2020, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from cugraph.dask.common.comms import CommsContext +from cugraph.dask.common.input_utils import DistributedDataHandler +from dask.distributed import wait, default_client +from cugraph.dask.common.comms import worker_state +from cugraph.opg.link_analysis import mg_pagerank_wrapper as mg_pagerank + +def common_func(sID, data): + print(data) + sessionstate = worker_state(sID) + print("nworkers: ", sessionstate['nworkers']," id: ", sessionstate['wid']) + mg_pagerank.mg_pagerank(data[0], sessionstate['comm']) + return 1 + +def pagerank(input_graph): + print("INSIDE DASK PAGERANK") + client = default_client() + ddf = input_graph.edgelist.edgelist_df + + data = DistributedDataHandler.create(data=ddf) + + comms = CommsContext(comms_p2p=False) + comms.init(workers=data.workers) + + data.calculate_parts_to_sizes(comms) + #self.ranks = data.ranks + + print("Calling function") + result = dict([(data.worker_info[wf[0]]["rank"], + client.submit( + common_func, + comms.sessionId, + wf[1], + workers=[wf[0]])) + for idx, wf in enumerate(data.worker_to_parts.items())]) + wait(result) + print(result) + + +def get_n_gpus(): + import os + try: + return len(os.environ["CUDA_VISIBLE_DEVICES"].split(",")) + except KeyError: + return len(os.popen("nvidia-smi -L").read().strip().split("\n")) + + +def get_chunksize(input_path): + """ + Calculate the appropriate chunksize for dask_cudf.read_csv + to get a number of partitions equal to the number of GPUs + + Examples + -------- + >>> import dask_cugraph.pagerank as dcg + >>> chunksize = dcg.get_chunksize(edge_list.csv) + """ + + import os + from glob import glob + import math + + input_files = sorted(glob(str(input_path))) + if len(input_files) == 1: + size = os.path.getsize(input_files[0]) + chunksize = math.ceil(size/get_n_gpus()) + else: + size = [os.path.getsize(_file) for _file in input_files] + chunksize = max(size) + return chunksize diff --git a/python/cugraph/opg/__init__.py b/python/cugraph/opg/__init__.py new file mode 100644 index 0000000000..52a69a6408 --- /dev/null +++ b/python/cugraph/opg/__init__.py @@ -0,0 +1,12 @@ +# Copyright (c) 2019, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
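The pagerank() function above currently only prints the per-worker futures returned by client.submit. Assuming common_func is later changed to return the per-partition cudf.DataFrame produced by mg_pagerank (an assumption, not something this patch does), those futures could be stitched back into a distributed result with the to_dask_cudf helper added in dask_df_utils.py; `result` below refers to the dict built inside pagerank().

from dask.distributed import wait

from cugraph.dask.common.dask_df_utils import to_dask_cudf

futures = list(result.values())      # result: {rank: future} from pagerank() above
wait(futures)

# Requires each future to hold a cudf.DataFrame rather than the current `1`.
pagerank_ddf = to_dask_cudf(futures)
print(pagerank_ddf.head())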
diff --git a/python/cugraph/opg/link_analysis/__init__.py b/python/cugraph/opg/link_analysis/__init__.py
new file mode 100644
index 0000000000..f3eea51b0c
--- /dev/null
+++ b/python/cugraph/opg/link_analysis/__init__.py
@@ -0,0 +1 @@
+from cugraph.opg.link_analysis.mg_pagerank_wrapper import mg_pagerank
diff --git a/python/cugraph/opg/link_analysis/mg_pagerank.pxd b/python/cugraph/opg/link_analysis/mg_pagerank.pxd
new file mode 100644
index 0000000000..a20235e50d
--- /dev/null
+++ b/python/cugraph/opg/link_analysis/mg_pagerank.pxd
@@ -0,0 +1,7 @@
+from cugraph.structure.graph_new cimport *
+
+cdef extern from "algorithms.hpp" namespace "cugraph":
+
+    cdef void mg_pagerank_temp[VT,ET,WT](
+        const GraphCSC[VT,ET,WT] &graph,
+        WT *pagerank) except +
diff --git a/python/cugraph/opg/link_analysis/mg_pagerank_wrapper.pyx b/python/cugraph/opg/link_analysis/mg_pagerank_wrapper.pyx
new file mode 100644
index 0000000000..83a3f93e93
--- /dev/null
+++ b/python/cugraph/opg/link_analysis/mg_pagerank_wrapper.pyx
@@ -0,0 +1,52 @@
+from cugraph.structure.utils_wrapper import *
+from cugraph.opg.link_analysis cimport mg_pagerank as c_pagerank
+import cudf
+from cugraph.structure.graph_new cimport *
+from cugraph.structure import graph_new_wrapper
+from libc.stdint cimport uintptr_t
+from cython.operator cimport dereference as deref
+
+def mg_pagerank(input_df, comm):
+    """
+    Call pagerank
+    """
+
+    cdef size_t c_comm_size_t = comm
+    cdef c_pagerank.Comm* c_comm = <c_pagerank.Comm*>c_comm_size_t
+
+    [src, dst] = graph_new_wrapper.datatype_cast([input_df['src'], input_df['dst']], [np.int32])
+    [weights] = graph_new_wrapper.datatype_cast([input_df['value']], [np.float32, np.float64])
+
+    offsets, indices, weights = coo2csr(dst, src, weights)
+
+    num_verts = 34 #FIXME Get global number of vertices
+    num_edges = len(indices)
+
+    df = cudf.DataFrame()
+    df['vertex'] = cudf.Series(np.zeros(num_verts, dtype=np.int32))
+    df['pagerank'] = cudf.Series(np.zeros(num_verts, dtype=np.float32))
+
+    cdef uintptr_t c_identifier = df['vertex'].__cuda_array_interface__['data'][0];
+    cdef uintptr_t c_pagerank_val = df['pagerank'].__cuda_array_interface__['data'][0];
+
+    cdef uintptr_t c_offsets = offsets.__cuda_array_interface__['data'][0]
+    cdef uintptr_t c_indices = indices.__cuda_array_interface__['data'][0]
+    cdef uintptr_t c_weights = <uintptr_t>NULL
+
+    if weights is not None:
+        c_weights = weights.__cuda_array_interface__['data'][0]
+
+    cdef GraphCSC[int,int,float] graph_float
+    cdef GraphCSC[int,int,double] graph_double
+
+    if (df['pagerank'].dtype == np.float32):
+        graph_float = GraphCSC[int,int,float](<int*>c_offsets, <int*>c_indices, <float*>c_weights, num_verts, num_edges)
+        graph_float.set_communicator(deref(c_comm))
+        c_pagerank.mg_pagerank_temp[int,int,float](graph_float, <float*>c_pagerank_val)
+
+    else:
+        graph_double = GraphCSC[int,int,double](<int*>c_offsets, <int*>c_indices, <double*>c_weights, num_verts, num_edges)
+        graph_double.set_communicator(deref(c_comm))
+        c_pagerank.mg_pagerank_temp[int,int,double](graph_double, <double*>c_pagerank_val)
+
+    return df
diff --git a/python/cugraph/structure/graph.py b/python/cugraph/structure/graph.py
index 51854ee142..4e25a30357 100644
--- a/python/cugraph/structure/graph.py
+++ b/python/cugraph/structure/graph.py
@@ -29,8 +29,14 @@ def null_check(col):
 
 class Graph:
     class EdgeList:
-        def __init__(self, source, destination, edge_attr=None,
-                     renumber_map=None):
+        def __init__(self, *args):
+            if len(args) == 1:
+                self.__from_dask_cudf(*args)
+            else:
+                self.__from_cudf(*args)
+
+        def __from_cudf(self, source, destination, edge_attr=None,
renumber_map=None): self.renumber_map = renumber_map self.edgelist_df = cudf.DataFrame() self.edgelist_df['src'] = source @@ -44,6 +50,11 @@ def __init__(self, source, destination, edge_attr=None, else: self.edgelist_df['weights'] = edge_attr + def __from_dask_cudf(self, ddf): + self.renumber_map = None + self.edgelist_df = ddf + self.weights = False + class AdjList: def __init__(self, offsets, indices, value=None): self.offsets = offsets @@ -153,6 +164,7 @@ def from_cudf_edgelist(self, input_df, source='source', if self.edgelist is not None or self.adjlist is not None: raise Exception('Graph already has values') + if self.multi: if type(edge_attr) is not list: raise Exception('edge_attr should be a list of column names') @@ -187,7 +199,7 @@ def from_cudf_edgelist(self, input_df, source='source', else: source_col, dest_col = symmetrize(source_col, dest_col) - self.edgelist = Graph.EdgeList(source_col, dest_col, value_col, + self.edgelist = self.EdgeList(source_col, dest_col, value_col, renumber_map) def add_edge_list(self, source, destination, value=None): @@ -202,6 +214,10 @@ def add_edge_list(self, source, destination, value=None): else: self.from_cudf_edgelist(input_df) + + def from_dask_cudf_edgelist(self, input_ddf): + self.edgelist = self.EdgeList(input_ddf) + def view_edge_list(self): """ Display the edge list. Compute it if needed. @@ -327,7 +343,7 @@ def from_cudf_adjlist(self, offset_col, index_col, value_col=None): """ if self.edgelist is not None or self.adjlist is not None: raise Exception('Graph already has values') - self.adjlist = Graph.AdjList(offset_col, index_col, value_col) + self.adjlist = self.AdjList(offset_col, index_col, value_col) def add_adj_list(self, offset_col, index_col, value_col=None): warnings.warn('add_adj_list will be deprecated in next release.\ diff --git a/python/cugraph/structure/graph_new.pxd b/python/cugraph/structure/graph_new.pxd index 73e5510f73..2be26b5b8c 100644 --- a/python/cugraph/structure/graph_new.pxd +++ b/python/cugraph/structure/graph_new.pxd @@ -18,6 +18,10 @@ from libcpp cimport bool +cdef extern from "comms_mpi.hpp" namespace "cugraph::experimental": + cdef cppclass Comm: + pass + cdef extern from "graph.hpp" namespace "cugraph::experimental": ctypedef enum PropType: @@ -45,7 +49,7 @@ cdef extern from "graph.hpp" namespace "cugraph::experimental": ET number_of_edges void get_vertex_identifiers(VT *) const - + void set_communicator(Comm &comm_) GraphBase(WT*,VT,ET) cdef cppclass GraphCOO[VT,ET,WT](GraphBase[VT,ET,WT]): diff --git a/python/script.py b/python/script.py new file mode 100644 index 0000000000..98ae3a56f4 --- /dev/null +++ b/python/script.py @@ -0,0 +1,37 @@ +import cugraph.dask.opg_pagerank as dcg +from dask.distributed import Client +import gc +import cudf + +import cugraph +import dask_cudf + +## Move to conftest +from dask_cuda import LocalCUDACluster +#cluster = LocalCUDACluster(protocol="tcp", scheduler_port=0) +## + +def test_dask_pagerank(): + + gc.collect() + cluster = LocalCUDACluster(protocol="tcp", scheduler_port=0) + client = Client(cluster) + + input_data_path = r"../datasets/karate.csv" + + chunksize = dcg.get_chunksize(input_data_path) + + ddf = dask_cudf.read_csv(input_data_path, chunksize=chunksize, + delimiter=' ', + names=['src', 'dst', 'value'], + dtype=['int32', 'int32', 'float32']) + + + + g = cugraph.DiGraph() + g.from_dask_cudf_edgelist(ddf) + + dcg.pagerank(g) + + client.close() + cluster.close()
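The "## Move to conftest" note in script.py could be addressed with a fixture along these lines (a sketch, not part of the patch): conftest.py owns the LocalCUDACluster so every OPG test shares one cluster and client, and test_dask_pagerank simply takes the fixture as an argument and drops its own setup/teardown.

# conftest.py (sketch)
import gc

import pytest
from dask.distributed import Client
from dask_cuda import LocalCUDACluster


@pytest.fixture(scope="module")
def dask_client():
    gc.collect()
    cluster = LocalCUDACluster(protocol="tcp", scheduler_port=0)
    client = Client(cluster)
    yield client
    client.close()
    cluster.close()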