diff --git a/cpp/include/raft/sparse/distance/coo_spmv.cuh b/cpp/include/raft/sparse/distance/coo_spmv.cuh index 4f979aa6f1..24be171900 100644 --- a/cpp/include/raft/sparse/distance/coo_spmv.cuh +++ b/cpp/include/raft/sparse/distance/coo_spmv.cuh @@ -34,8 +34,6 @@ #include -#include - namespace raft { namespace sparse { namespace distance { diff --git a/cpp/include/raft/sparse/distance/coo_spmv_strategies/base_strategy.cuh b/cpp/include/raft/sparse/distance/coo_spmv_strategies/base_strategy.cuh index 5ace978a23..194799aed0 100644 --- a/cpp/include/raft/sparse/distance/coo_spmv_strategies/base_strategy.cuh +++ b/cpp/include/raft/sparse/distance/coo_spmv_strategies/base_strategy.cuh @@ -23,7 +23,6 @@ #include #include -#include namespace raft { namespace sparse { diff --git a/cpp/include/raft/sparse/distance/coo_spmv_strategies/coo_mask_row_iterators.cuh b/cpp/include/raft/sparse/distance/coo_spmv_strategies/coo_mask_row_iterators.cuh index 44c3833f96..74eb37bc2b 100644 --- a/cpp/include/raft/sparse/distance/coo_spmv_strategies/coo_mask_row_iterators.cuh +++ b/cpp/include/raft/sparse/distance/coo_spmv_strategies/coo_mask_row_iterators.cuh @@ -20,7 +20,6 @@ #include "../utils.cuh" #include -#include namespace raft { namespace sparse { diff --git a/cpp/include/raft/sparse/distance/coo_spmv_strategies/hash_strategy.cuh b/cpp/include/raft/sparse/distance/coo_spmv_strategies/hash_strategy.cuh index 1295d24103..a95c6ff85b 100644 --- a/cpp/include/raft/sparse/distance/coo_spmv_strategies/hash_strategy.cuh +++ b/cpp/include/raft/sparse/distance/coo_spmv_strategies/hash_strategy.cuh @@ -55,7 +55,7 @@ class hash_strategy : public coo_spmv_strategy { rmm::device_uvector &mask_indptr, std::tuple &n_rows_divided, cudaStream_t stream) { - auto policy = rmm::exec_policy(stream); + auto policy = this->config.handle.get_thrust_policy(); auto less = thrust::copy_if( policy, thrust::make_counting_iterator(value_idx(0)), diff --git a/cpp/include/raft/sparse/distance/l2_distance.cuh b/cpp/include/raft/sparse/distance/l2_distance.cuh index 5f89101082..534280191b 100644 --- a/cpp/include/raft/sparse/distance/l2_distance.cuh +++ b/cpp/include/raft/sparse/distance/l2_distance.cuh @@ -245,9 +245,8 @@ class hellinger_expanded_distances_t : public distances_t { : config_(&config), workspace(0, config.handle.get_stream()) {} void compute(value_t *out_dists) { - raft::mr::device::buffer coo_rows( - config_->handle.get_device_allocator(), config_->handle.get_stream(), - max(config_->b_nnz, config_->a_nnz)); + rmm::device_uvector coo_rows(max(config_->b_nnz, config_->a_nnz), + config_->handle.get_stream()); raft::sparse::convert::csr_to_coo(config_->b_indptr, config_->b_nrows, coo_rows.data(), config_->b_nnz, diff --git a/cpp/include/raft/sparse/hierarchy/detail/agglomerative.cuh b/cpp/include/raft/sparse/hierarchy/detail/agglomerative.cuh index 187985627f..2a7a8b3e4e 100644 --- a/cpp/include/raft/sparse/hierarchy/detail/agglomerative.cuh +++ b/cpp/include/raft/sparse/hierarchy/detail/agglomerative.cuh @@ -21,7 +21,6 @@ #include #include #include -#include #include #include @@ -224,7 +223,7 @@ void extract_flattened_clusters(const raft::handle_t &handle, value_idx *labels, const value_idx *children, size_t n_clusters, size_t n_leaves) { auto stream = handle.get_stream(); - auto thrust_policy = rmm::exec_policy(rmm::cuda_stream_view{stream}); + auto thrust_policy = handle.get_thrust_policy(); // Handle special case where n_clusters == 1 if (n_clusters == 1) { diff --git a/cpp/include/raft/sparse/hierarchy/detail/connectivities.cuh b/cpp/include/raft/sparse/hierarchy/detail/connectivities.cuh index b6ec190a98..31e4a0f263 100644 --- a/cpp/include/raft/sparse/hierarchy/detail/connectivities.cuh +++ b/cpp/include/raft/sparse/hierarchy/detail/connectivities.cuh @@ -22,7 +22,6 @@ #include #include -#include #include #include @@ -61,7 +60,7 @@ struct distance_graph_impl &indices, rmm::device_uvector &data, int c) { auto stream = handle.get_stream(); - auto exec_policy = rmm::exec_policy(rmm::cuda_stream_view{stream}); + auto thrust_policy = handle.get_thrust_policy(); // Need to symmetrize knn into undirected graph raft::sparse::COO knn_graph_coo(stream); @@ -77,7 +76,7 @@ struct distance_graph_impl &tup) { bool self_loop = thrust::get<0>(tup) == thrust::get<1>(tup); diff --git a/cpp/include/raft/sparse/hierarchy/detail/mst.cuh b/cpp/include/raft/sparse/hierarchy/detail/mst.cuh index 033d5881d5..6ef6f9879b 100644 --- a/cpp/include/raft/sparse/hierarchy/detail/mst.cuh +++ b/cpp/include/raft/sparse/hierarchy/detail/mst.cuh @@ -25,12 +25,9 @@ #include #include -#include - #include #include #include -#include namespace raft { namespace hierarchy { diff --git a/cpp/include/raft/sparse/linalg/spectral.cuh b/cpp/include/raft/sparse/linalg/spectral.cuh index 28b9190c53..d15c2cdf23 100644 --- a/cpp/include/raft/sparse/linalg/spectral.cuh +++ b/cpp/include/raft/sparse/linalg/spectral.cuh @@ -63,7 +63,7 @@ void fit_embedding(const raft::handle_t &handle, int *rows, int *cols, T *vals, index_type maxiter = 4000; //default reset value (when set to 0); value_type tol = 0.01; index_type restart_iter = 15 + neigvs; //what cugraph is using - auto t_exe_p = thrust::cuda::par.on(stream); + auto t_exe_p = handle.get_thrust_policy(); using thrust_exe_policy_t = decltype(t_exe_p); raft::eigen_solver_config_t cfg{neigvs, maxiter, @@ -83,8 +83,7 @@ void fit_embedding(const raft::handle_t &handle, int *rows, int *cols, T *vals, using value_type_t = value_type; std::pair solve( - handle_t const &handle, thrust_exe_policy_t t_exe_policy, - size_type_t n_obs_vecs, size_type_t dim, + handle_t const &handle, size_type_t n_obs_vecs, size_type_t dim, value_type_t const *__restrict__ obs, index_type_t *__restrict__ codes) const { return std::make_pair(0, 0); diff --git a/cpp/include/raft/sparse/linalg/symmetrize.cuh b/cpp/include/raft/sparse/linalg/symmetrize.cuh index 5fcd336551..614c9d830e 100644 --- a/cpp/include/raft/sparse/linalg/symmetrize.cuh +++ b/cpp/include/raft/sparse/linalg/symmetrize.cuh @@ -30,7 +30,6 @@ #include #include -#include #include #include diff --git a/cpp/include/raft/sparse/mst/detail/mst_solver_inl.cuh b/cpp/include/raft/sparse/mst/detail/mst_solver_inl.cuh index c5ba4fcb4f..029b76a945 100644 --- a/cpp/include/raft/sparse/mst/detail/mst_solver_inl.cuh +++ b/cpp/include/raft/sparse/mst/detail/mst_solver_inl.cuh @@ -24,7 +24,6 @@ #include #include -#include #include #include @@ -38,7 +37,6 @@ #include #include -#include namespace raft { namespace mst { @@ -88,7 +86,7 @@ MST_solver::MST_solver( sm_count = handle_.get_device_properties().multiProcessorCount; //Initially, color holds the vertex id as color - auto policy = rmm::exec_policy(rmm::cuda_stream_view{stream}); + auto policy = handle.get_thrust_policy(); if (initialize_colors_) { thrust::sequence(policy, color.begin(), color.end(), 0); thrust::sequence(policy, color_index, color_index + v, 0); @@ -227,7 +225,7 @@ template alteration_t MST_solver::alteration_max() { - auto policy = rmm::exec_policy(rmm::cuda_stream_view{stream}); + auto policy = handle.get_thrust_policy(); rmm::device_vector tmp(e); thrust::device_ptr weights_ptr(weights); thrust::copy(policy, weights_ptr, weights_ptr + e, tmp.begin()); @@ -327,7 +325,7 @@ template void MST_solver::min_edge_per_vertex() { - auto policy = rmm::exec_policy(rmm::cuda_stream_view{stream}); + auto policy = handle.get_thrust_policy(); thrust::fill(policy, min_edge_color.begin(), min_edge_color.end(), std::numeric_limits::max()); thrust::fill(policy, new_mst_edge.begin(), new_mst_edge.end(), @@ -354,7 +352,7 @@ void MST_solver::max()); @@ -411,7 +409,7 @@ template void MST_solver::append_src_dst_pair( vertex_t* mst_src, vertex_t* mst_dst, weight_t* mst_weights) { - auto policy = rmm::exec_policy(rmm::cuda_stream_view{stream}); + auto policy = handle.get_thrust_policy(); auto curr_mst_edge_count = prev_mst_edge_count[0]; diff --git a/cpp/include/raft/sparse/op/reduce.cuh b/cpp/include/raft/sparse/op/reduce.cuh index 2708f0491e..09a35720fb 100644 --- a/cpp/include/raft/sparse/op/reduce.cuh +++ b/cpp/include/raft/sparse/op/reduce.cuh @@ -31,7 +31,6 @@ #include #include #include -#include #include #include @@ -126,16 +125,15 @@ void max_duplicates(const raft::handle_t &handle, const value_idx *rows, const value_idx *cols, const value_t *vals, size_t nnz, size_t m, size_t n) { auto stream = handle.get_stream(); - - auto exec_policy = rmm::exec_policy(rmm::cuda_stream_view{stream}); + auto thrust_policy = handle.get_thrust_policy(); // compute diffs & take exclusive scan rmm::device_uvector diff(nnz + 1, stream); compute_duplicates_mask(diff.data(), rows, cols, nnz, stream); - thrust::exclusive_scan(thrust::cuda::par.on(stream), diff.data(), - diff.data() + diff.size(), diff.data()); + thrust::exclusive_scan(thrust_policy, diff.data(), diff.data() + diff.size(), + diff.data()); // compute final size value_idx size = 0; diff --git a/cpp/include/raft/sparse/selection/connect_components.cuh b/cpp/include/raft/sparse/selection/connect_components.cuh index 9b02ae67e6..390522c9bc 100644 --- a/cpp/include/raft/sparse/selection/connect_components.cuh +++ b/cpp/include/raft/sparse/selection/connect_components.cuh @@ -30,7 +30,6 @@ #include #include #include -#include #include @@ -361,7 +360,7 @@ void connect_components(const raft::handle_t &handle, raft::sparse::op::compute_duplicates_mask(out_index.data(), colors.data(), nn_colors.data(), n_rows, stream); - thrust::exclusive_scan(thrust::cuda::par.on(stream), out_index.data(), + thrust::exclusive_scan(handle.get_thrust_policy(), out_index.data(), out_index.data() + out_index.size(), out_index.data()); // compute final size diff --git a/cpp/include/raft/spectral/cluster_solvers.hpp b/cpp/include/raft/spectral/cluster_solvers.hpp index 922ae7cfab..6f507331d9 100644 --- a/cpp/include/raft/spectral/cluster_solvers.hpp +++ b/cpp/include/raft/spectral/cluster_solvers.hpp @@ -42,19 +42,16 @@ struct kmeans_solver_t { size_type_t> const& config) : config_(config) {} - template std::pair solve( - handle_t const& handle, thrust_exe_policy_t t_exe_policy, - size_type_t n_obs_vecs, size_type_t dim, + handle_t const& handle, size_type_t n_obs_vecs, size_type_t dim, value_type_t const* __restrict__ obs, index_type_t* __restrict__ codes) const { RAFT_EXPECTS(obs != nullptr, "Null obs buffer."); RAFT_EXPECTS(codes != nullptr, "Null codes buffer."); value_type_t residual{}; index_type_t iters{}; - kmeans(handle, t_exe_policy, n_obs_vecs, dim, config_.n_clusters, - config_.tol, config_.maxIter, obs, codes, residual, iters, - config_.seed); + kmeans(handle, n_obs_vecs, dim, config_.n_clusters, config_.tol, + config_.maxIter, obs, codes, residual, iters, config_.seed); return std::make_pair(residual, iters); } diff --git a/cpp/include/raft/spectral/kmeans.hpp b/cpp/include/raft/spectral/kmeans.hpp index fb05bff3e2..5928c727c6 100644 --- a/cpp/include/raft/spectral/kmeans.hpp +++ b/cpp/include/raft/spectral/kmeans.hpp @@ -325,7 +325,6 @@ static __global__ void divideCentroids( * Centroid is randomly chosen with k-means++ algorithm. * @tparam index_type_t the type of data used for indexing. * @tparam value_type_t the type of data used for weights, distances. - * @tparam thrust_exe_pol_t the type of thrust execution policy. * @param handle the raft handle. * @param n Number of observation vectors. * @param d Dimension of observation vectors. @@ -341,12 +340,9 @@ static __global__ void divideCentroids( * coordinates. * @return Zero if successful. Otherwise non-zero. */ -template -static int chooseNewCentroid(handle_t const& handle, - thrust_exe_pol_t thrust_exec_policy, - index_type_t n, index_type_t d, index_type_t k, - value_type_t rand, +template +static int chooseNewCentroid(handle_t const& handle, index_type_t n, + index_type_t d, index_type_t k, value_type_t rand, const value_type_t* __restrict__ obs, value_type_t* __restrict__ dists, value_type_t* __restrict__ centroid) { @@ -357,8 +353,9 @@ static int chooseNewCentroid(handle_t const& handle, // Observation vector that is chosen as new centroid index_type_t obsIndex; - auto cublas_h = handle.get_cublas_handle(); auto stream = handle.get_stream(); + auto cublas_h = handle.get_cublas_handle(); + auto thrust_exec_policy = handle.get_thrust_policy(); // Compute cumulative sum of distances thrust::inclusive_scan(thrust_exec_policy, thrust::device_pointer_cast(dists), @@ -417,10 +414,7 @@ static int chooseNewCentroid(handle_t const& handle, * Centroids are randomly chosen with k-means++ algorithm * @tparam index_type_t the type of data used for indexing. * @tparam value_type_t the type of data used for weights, distances. - * @tparam thrust_exe_pol_t the type of thrust execution policy. * @param handle the raft handle. - * @param thrust_exec_policy thrust execution policy - * (assumed to have same stream as handle.stream). * @param n Number of observation vectors. * @param d Dimension of observation vectors. * @param k Number of clusters. @@ -439,14 +433,12 @@ static int chooseNewCentroid(handle_t const& handle, * distance between observation vectors and the closest centroid. * @return Zero if successful. Otherwise non-zero. */ -template +template static int initializeCentroids( - handle_t const& handle, thrust_exe_pol_t thrust_exec_policy, index_type_t n, - index_type_t d, index_type_t k, const value_type_t* __restrict__ obs, - value_type_t* __restrict__ centroids, index_type_t* __restrict__ codes, - index_type_t* __restrict__ clusterSizes, value_type_t* __restrict__ dists, - unsigned long long seed) { + handle_t const& handle, index_type_t n, index_type_t d, index_type_t k, + const value_type_t* __restrict__ obs, value_type_t* __restrict__ centroids, + index_type_t* __restrict__ codes, index_type_t* __restrict__ clusterSizes, + value_type_t* __restrict__ dists, unsigned long long seed) { // ------------------------------------------------------- // Variable declarations // ------------------------------------------------------- @@ -458,8 +450,9 @@ static int initializeCentroids( thrust::default_random_engine rng(seed); thrust::uniform_real_distribution uniformDist(0, 1); - auto cublas_h = handle.get_cublas_handle(); auto stream = handle.get_stream(); + auto cublas_h = handle.get_cublas_handle(); + auto thrust_exec_policy = handle.get_thrust_policy(); constexpr index_type_t grid_lower_bound{65535}; @@ -486,8 +479,8 @@ static int initializeCentroids( thrust::fill(thrust_exec_policy, thrust::device_pointer_cast(dists), thrust::device_pointer_cast(dists + n), 1); CHECK_CUDA(stream); - if (chooseNewCentroid(handle, thrust_exec_policy, n, d, k, uniformDist(rng), - obs, dists, centroids)) + if (chooseNewCentroid(handle, n, d, k, uniformDist(rng), obs, dists, + centroids)) WARNING("error in k-means++ (could not pick centroid)"); // Compute distances from first centroid @@ -499,8 +492,8 @@ static int initializeCentroids( // Choose remaining centroids for (i = 1; i < k; ++i) { // Choose ith centroid - if (chooseNewCentroid(handle, thrust_exec_policy, n, d, k, uniformDist(rng), - obs, dists, centroids + IDX(0, i, d))) + if (chooseNewCentroid(handle, n, d, k, uniformDist(rng), obs, dists, + centroids + IDX(0, i, d))) WARNING("error in k-means++ (could not pick centroid)"); // Compute distances from ith centroid @@ -529,10 +522,7 @@ static int initializeCentroids( * Distance is measured with Euclidean norm. * @tparam index_type_t the type of data used for indexing. * @tparam value_type_t the type of data used for weights, distances. - * @tparam thrust_exe_pol_t the type of thrust execution policy. * @param handle the raft handle. - * @param thrust_exec_policy thrust execution policy - * (assumed to have same stream as handle.stream). * @param n Number of observation vectors. * @param d Dimension of observation vectors. * @param k Number of clusters. @@ -553,16 +543,18 @@ static int initializeCentroids( * of squares of assignment. * @return Zero if successful. Otherwise non-zero. */ -template -static int assignCentroids( - handle_t const& handle, thrust_exe_pol_t thrust_exec_policy, index_type_t n, - index_type_t d, index_type_t k, const value_type_t* __restrict__ obs, - const value_type_t* __restrict__ centroids, value_type_t* __restrict__ dists, - index_type_t* __restrict__ codes, index_type_t* __restrict__ clusterSizes, - value_type_t* residual_host) { - auto cublas_h = handle.get_cublas_handle(); +template +static int assignCentroids(handle_t const& handle, index_type_t n, + index_type_t d, index_type_t k, + const value_type_t* __restrict__ obs, + const value_type_t* __restrict__ centroids, + value_type_t* __restrict__ dists, + index_type_t* __restrict__ codes, + index_type_t* __restrict__ clusterSizes, + value_type_t* residual_host) { auto stream = handle.get_stream(); + auto cublas_h = handle.get_cublas_handle(); + auto thrust_exec_policy = handle.get_thrust_policy(); // Compute distance between centroids and observation vectors CUDA_TRY(cudaMemsetAsync(dists, 0, n * k * sizeof(value_type_t), stream)); @@ -606,10 +598,7 @@ static int assignCentroids( * All clusters are assumed to be non-empty. * @tparam index_type_t the type of data used for indexing. * @tparam value_type_t the type of data used for weights, distances. - * @tparam thrust_exe_pol_t the type of thrust execution policy. * @param handle the raft handle. - * @param thrust_exec_policy thrust execution policy - * (assumed to have same stream as handle.stream). * @param n Number of observation vectors. * @param d Dimension of observation vectors. * @param k Number of clusters. @@ -628,10 +617,8 @@ static int assignCentroids( * Workspace. * @return Zero if successful. Otherwise non-zero. */ -template -static int updateCentroids(handle_t const& handle, - thrust_exe_pol_t thrust_exec_policy, index_type_t n, +template +static int updateCentroids(handle_t const& handle, index_type_t n, index_type_t d, index_type_t k, const value_type_t* __restrict__ obs, const index_type_t* __restrict__ codes, @@ -649,8 +636,9 @@ static int updateCentroids(handle_t const& handle, constexpr index_type_t grid_lower_bound{65535}; - auto cublas_h = handle.get_cublas_handle(); auto stream = handle.get_stream(); + auto cublas_h = handle.get_cublas_handle(); + auto thrust_exec_policy = handle.get_thrust_policy(); // Device memory thrust::device_ptr obs_copy(work); @@ -722,10 +710,7 @@ namespace raft { * k-means++ algorithm. * @tparam index_type_t the type of data used for indexing. * @tparam value_type_t the type of data used for weights, distances. - * @tparam thrust_exe_pol_t the type of thrust execution policy. * @param handle the raft handle. - * @param thrust_exec_policy thrust execution policy - * (assumed to have same stream as handle.stream). * @param n Number of observation vectors. * @param d Dimension of observation vectors. * @param k Number of clusters. @@ -754,11 +739,10 @@ namespace raft { * @param seed random seed to be used. * @return error flag. */ -template -int kmeans(handle_t const& handle, thrust_exe_pol_t thrust_exec_policy, - index_type_t n, index_type_t d, index_type_t k, value_type_t tol, - index_type_t maxiter, const value_type_t* __restrict__ obs, +template +int kmeans(handle_t const& handle, index_type_t n, index_type_t d, + index_type_t k, value_type_t tol, index_type_t maxiter, + const value_type_t* __restrict__ obs, index_type_t* __restrict__ codes, index_type_t* __restrict__ clusterSizes, value_type_t* __restrict__ centroids, @@ -785,16 +769,17 @@ int kmeans(handle_t const& handle, thrust_exe_pol_t thrust_exec_policy, // Initialization // ------------------------------------------------------- - auto cublas_h = handle.get_cublas_handle(); auto stream = handle.get_stream(); + auto cublas_h = handle.get_cublas_handle(); + auto thrust_exec_policy = handle.get_thrust_policy(); // Trivial cases if (k == 1) { CUDA_TRY(cudaMemsetAsync(codes, 0, n * sizeof(index_type_t), stream)); CUDA_TRY(cudaMemcpyAsync(clusterSizes, &n, sizeof(index_type_t), cudaMemcpyHostToDevice, stream)); - if (updateCentroids(handle, thrust_exec_policy, n, d, k, obs, codes, - clusterSizes, centroids, work, work_int)) + if (updateCentroids(handle, n, d, k, obs, codes, clusterSizes, centroids, + work, work_int)) WARNING("could not compute k-means centroids"); dim3 blockDim{WARP_SIZE, 1, BLOCK_SIZE / WARP_SIZE}; @@ -840,21 +825,21 @@ int kmeans(handle_t const& handle, thrust_exe_pol_t thrust_exec_policy, // ------------------------------------------------------- // Choose initial cluster centroids - if (initializeCentroids(handle, thrust_exec_policy, n, d, k, obs, centroids, - codes, clusterSizes, work, seed)) + if (initializeCentroids(handle, n, d, k, obs, centroids, codes, clusterSizes, + work, seed)) WARNING("could not initialize k-means centroids"); // Apply k-means iteration until convergence for (iter = 0; iter < maxiter; ++iter) { // Update cluster centroids - if (updateCentroids(handle, thrust_exec_policy, n, d, k, obs, codes, - clusterSizes, centroids, work, work_int)) + if (updateCentroids(handle, n, d, k, obs, codes, clusterSizes, centroids, + work, work_int)) WARNING("could not update k-means centroids"); // Determine centroid closest to each observation residualPrev = *residual_host; - if (assignCentroids(handle, thrust_exec_policy, n, d, k, obs, centroids, - work, codes, clusterSizes, residual_host)) + if (assignCentroids(handle, n, d, k, obs, centroids, work, codes, + clusterSizes, residual_host)) WARNING("could not assign observation vectors to k-means clusters"); // Reinitialize empty clusters with new centroids @@ -868,12 +853,11 @@ int kmeans(handle_t const& handle, thrust_exe_pol_t thrust_exec_policy, // conditions, such as if obs is corrupt (as seen as a result of a // DataFrame column of NULL edge vals used to create the Graph) while (emptyCentroid < k) { - if (chooseNewCentroid(handle, thrust_exec_policy, n, d, k, - uniformDist(rng), obs, work, + if (chooseNewCentroid(handle, n, d, k, uniformDist(rng), obs, work, centroids + IDX(0, emptyCentroid, d))) WARNING("could not replace empty centroid"); - if (assignCentroids(handle, thrust_exec_policy, n, d, k, obs, centroids, - work, codes, clusterSizes, residual_host)) + if (assignCentroids(handle, n, d, k, obs, centroids, work, codes, + clusterSizes, residual_host)) WARNING("could not assign observation vectors to k-means clusters"); emptyCentroid = (thrust::find(thrust_exec_policy, @@ -905,10 +889,7 @@ int kmeans(handle_t const& handle, thrust_exe_pol_t thrust_exec_policy, * k-means++ algorithm. * @tparam index_type_t the type of data used for indexing. * @tparam value_type_t the type of data used for weights, distances. - * @tparam thrust_exe_pol_t the type of thrust execution policy. * @param handle the raft handle. - * @param thrust_exec_policy thrust execution policy - * (assumed to have same stream as handle.stream). * @param n Number of observation vectors. * @param d Dimension of observation vectors. * @param k Number of clusters. @@ -926,11 +907,10 @@ int kmeans(handle_t const& handle, thrust_exe_pol_t thrust_exec_policy, * @param seed random seed to be used. * @return error flag */ -template -int kmeans(handle_t const& handle, thrust_exe_pol_t thrust_exec_policy, - index_type_t n, index_type_t d, index_type_t k, value_type_t tol, - index_type_t maxiter, const value_type_t* __restrict__ obs, +template +int kmeans(handle_t const& handle, index_type_t n, index_type_t d, + index_type_t k, value_type_t tol, index_type_t maxiter, + const value_type_t* __restrict__ obs, index_type_t* __restrict__ codes, value_type_t& residual, index_type_t& iters, unsigned long long seed = 123456) { using namespace matrix; @@ -950,9 +930,8 @@ int kmeans(handle_t const& handle, thrust_exe_pol_t thrust_exec_policy, // Perform k-means return kmeans( - handle, thrust_exec_policy, n, d, k, tol, maxiter, obs, codes, - clusterSizes.raw(), centroids.raw(), work.raw(), work_int.raw(), &residual, - &iters, seed); + handle, n, d, k, tol, maxiter, obs, codes, clusterSizes.raw(), + centroids.raw(), work.raw(), work_int.raw(), &residual, &iters, seed); } } // namespace raft diff --git a/cpp/include/raft/spectral/matrix_wrappers.hpp b/cpp/include/raft/spectral/matrix_wrappers.hpp index efa98313b6..42fc621a1a 100644 --- a/cpp/include/raft/spectral/matrix_wrappers.hpp +++ b/cpp/include/raft/spectral/matrix_wrappers.hpp @@ -85,7 +85,8 @@ template class vector_t { public: vector_t(handle_t const& raft_handle, size_type sz) - : buffer_(sz, raft_handle.get_stream()) {} + : buffer_(sz, raft_handle.get_stream()), + thrust_policy(raft_handle.get_thrust_policy()) {} size_type size(void) const { return buffer_.size(); } @@ -93,9 +94,8 @@ class vector_t { value_type const* raw(void) const { return buffer_.data(); } - template - value_type nrm1(ThrustExecPolicy t_exe_pol) const { - return thrust::reduce(t_exe_pol, buffer_.data(), + value_type nrm1() const { + return thrust::reduce(thrust_policy, buffer_.data(), buffer_.data() + buffer_.size(), value_type{0}, [] __device__(auto left, auto right) { auto abs_left = left > 0 ? left : -left; @@ -104,13 +104,15 @@ class vector_t { }); } - template - void fill(ThrustExecPolicy t_exe_pol, value_type value) { - thrust::fill_n(t_exe_pol, buffer_.data(), buffer_.size(), value); + void fill(value_type value) { + thrust::fill_n(thrust_policy, buffer_.data(), buffer_.size(), value); } private: + using thrust_exec_policy_t = thrust::detail::execute_with_allocator< + rmm::mr::thrust_allocator, thrust::cuda_cub::execute_on_stream_base>; rmm::device_uvector buffer_; + const thrust_exec_policy_t thrust_policy; }; template @@ -262,31 +264,26 @@ struct sparse_matrix_t { template struct laplacian_matrix_t : sparse_matrix_t { - template - laplacian_matrix_t(handle_t const& raft_handle, - ThrustExePolicy thrust_exec_policy, - index_type const* row_offsets, + laplacian_matrix_t(handle_t const& raft_handle, index_type const* row_offsets, index_type const* col_indices, value_type const* values, index_type const nrows, index_type const nnz) : sparse_matrix_t(raft_handle, row_offsets, col_indices, values, nrows, nnz), diagonal_(raft_handle, nrows) { vector_t ones{raft_handle, nrows}; - ones.fill(thrust_exec_policy, 1.0); + ones.fill(1.0); sparse_matrix_t::mv(1, ones.raw(), 0, diagonal_.raw()); } - template laplacian_matrix_t(handle_t const& raft_handle, - ThrustExePolicy thrust_exec_policy, sparse_matrix_t const& csr_m) : sparse_matrix_t(raft_handle, csr_m.row_offsets_, csr_m.col_indices_, csr_m.values_, csr_m.nrows_, csr_m.nnz_), diagonal_(raft_handle, csr_m.nrows_) { vector_t ones{raft_handle, csr_m.nrows_}; - ones.fill(thrust_exec_policy, 1.0); + ones.fill(1.0); sparse_matrix_t::mv(1, ones.raw(), 0, diagonal_.raw()); } @@ -333,27 +330,19 @@ struct laplacian_matrix_t : sparse_matrix_t { template struct modularity_matrix_t : laplacian_matrix_t { - template modularity_matrix_t(handle_t const& raft_handle, - ThrustExePolicy thrust_exec_policy, index_type const* row_offsets, index_type const* col_indices, value_type const* values, index_type const nrows, index_type const nnz) : laplacian_matrix_t( - raft_handle, thrust_exec_policy, row_offsets, col_indices, values, - nrows, nnz) { - edge_sum_ = laplacian_matrix_t::diagonal_.nrm1( - thrust_exec_policy); + raft_handle, row_offsets, col_indices, values, nrows, nnz) { + edge_sum_ = laplacian_matrix_t::diagonal_.nrm1(); } - template modularity_matrix_t(handle_t const& raft_handle, - ThrustExePolicy thrust_exec_policy, sparse_matrix_t const& csr_m) - : laplacian_matrix_t(raft_handle, - thrust_exec_policy, csr_m) { - edge_sum_ = laplacian_matrix_t::diagonal_.nrm1( - thrust_exec_policy); + : laplacian_matrix_t(raft_handle, csr_m) { + edge_sum_ = laplacian_matrix_t::diagonal_.nrm1(); } // y = alpha*A*x + beta*y diff --git a/cpp/include/raft/spectral/modularity_maximization.hpp b/cpp/include/raft/spectral/modularity_maximization.hpp index f8dfe5daa3..1fe7819a7e 100644 --- a/cpp/include/raft/spectral/modularity_maximization.hpp +++ b/cpp/include/raft/spectral/modularity_maximization.hpp @@ -79,19 +79,18 @@ using namespace linalg; * performed. * @return error flag. */ -template +template std::tuple modularity_maximization( - handle_t const &handle, ThrustExePolicy thrust_exec_policy, - sparse_matrix_t const &csr_m, + handle_t const &handle, sparse_matrix_t const &csr_m, EigenSolver const &eigen_solver, ClusterSolver const &cluster_solver, vertex_t *__restrict__ clusters, weight_t *eigVals, weight_t *eigVecs) { RAFT_EXPECTS(clusters != nullptr, "Null clusters buffer."); RAFT_EXPECTS(eigVals != nullptr, "Null eigVals buffer."); RAFT_EXPECTS(eigVecs != nullptr, "Null eigVecs buffer."); - auto cublas_h = handle.get_cublas_handle(); auto stream = handle.get_stream(); + auto cublas_h = handle.get_cublas_handle(); std::tuple stats; // # iters eigen solver, cluster solver residual, # iters cluster solver @@ -101,7 +100,7 @@ std::tuple modularity_maximization( // Compute eigenvectors of Modularity Matrix // Initialize Modularity Matrix - modularity_matrix_t B{handle, thrust_exec_policy, csr_m}; + modularity_matrix_t B{handle, csr_m}; auto eigen_config = eigen_solver.get_config(); auto nEigVecs = eigen_config.n_eigVecs; @@ -111,7 +110,7 @@ std::tuple modularity_maximization( eigen_solver.solve_largest_eigenvectors(handle, B, eigVals, eigVecs); // Whiten eigenvector matrix - transform_eigen_matrix(handle, thrust_exec_policy, n, nEigVecs, eigVecs); + transform_eigen_matrix(handle, n, nEigVecs, eigVecs); // notice that at this point the matrix has already been transposed, so we are scaling // columns @@ -119,8 +118,8 @@ std::tuple modularity_maximization( CHECK_CUDA(stream); // Find partition clustering - auto pair_cluster = cluster_solver.solve(handle, thrust_exec_policy, n, - nEigVecs, eigVecs, clusters); + auto pair_cluster = + cluster_solver.solve(handle, n, nEigVecs, eigVecs, clusters); std::get<1>(stats) = pair_cluster.first; std::get<2>(stats) = pair_cluster.second; @@ -138,9 +137,8 @@ std::tuple modularity_maximization( * @param clusters (Input, device memory, n entries) Cluster assignments. * @param modularity On exit, modularity */ -template +template void analyzeModularity(handle_t const &handle, - ThrustExePolicy thrust_exec_policy, sparse_matrix_t const &csr_m, vertex_t nClusters, vertex_t const *__restrict__ clusters, @@ -163,15 +161,15 @@ void analyzeModularity(handle_t const &handle, cublassetpointermode(cublas_h, CUBLAS_POINTER_MODE_HOST, stream)); // Initialize Modularity - modularity_matrix_t B{handle, thrust_exec_policy, csr_m}; + modularity_matrix_t B{handle, csr_m}; // Initialize output modularity = 0; // Iterate through partitions for (i = 0; i < nClusters; ++i) { - if (!construct_indicator(handle, thrust_exec_policy, i, n, clustersize, - partModularity, clusters, part_i, Bx, B)) { + if (!construct_indicator(handle, i, n, clustersize, partModularity, + clusters, part_i, Bx, B)) { WARNING("empty partition"); continue; } @@ -180,7 +178,7 @@ void analyzeModularity(handle_t const &handle, modularity += partModularity; } - modularity = modularity / B.diagonal_.nrm1(thrust_exec_policy); + modularity = modularity / B.diagonal_.nrm1(); } } // namespace spectral diff --git a/cpp/include/raft/spectral/partition.hpp b/cpp/include/raft/spectral/partition.hpp index 841fca04d9..a994895886 100644 --- a/cpp/include/raft/spectral/partition.hpp +++ b/cpp/include/raft/spectral/partition.hpp @@ -62,19 +62,18 @@ using namespace linalg; * performed. * @return statistics: number of eigensolver iterations, . */ -template +template std::tuple partition( - handle_t const &handle, ThrustExePolicy thrust_exec_policy, - sparse_matrix_t const &csr_m, + handle_t const &handle, sparse_matrix_t const &csr_m, EigenSolver const &eigen_solver, ClusterSolver const &cluster_solver, vertex_t *__restrict__ clusters, weight_t *eigVals, weight_t *eigVecs) { RAFT_EXPECTS(clusters != nullptr, "Null clusters buffer."); RAFT_EXPECTS(eigVals != nullptr, "Null eigVals buffer."); RAFT_EXPECTS(eigVecs != nullptr, "Null eigVecs buffer."); - auto cublas_h = handle.get_cublas_handle(); auto stream = handle.get_stream(); + auto cublas_h = handle.get_cublas_handle(); std::tuple stats; //{iters_eig_solver,residual_cluster,iters_cluster_solver} // # iters eigen solver, cluster solver residual, # iters cluster solver @@ -89,7 +88,7 @@ std::tuple partition( // Initialize Laplacian ///sparse_matrix_t A{handle, graph}; - laplacian_matrix_t L{handle, thrust_exec_policy, csr_m}; + laplacian_matrix_t L{handle, csr_m}; auto eigen_config = eigen_solver.get_config(); auto nEigVecs = eigen_config.n_eigVecs; @@ -99,11 +98,11 @@ std::tuple partition( eigen_solver.solve_smallest_eigenvectors(handle, L, eigVals, eigVecs); // Whiten eigenvector matrix - transform_eigen_matrix(handle, thrust_exec_policy, n, nEigVecs, eigVecs); + transform_eigen_matrix(handle, n, nEigVecs, eigVecs); // Find partition clustering - auto pair_cluster = cluster_solver.solve(handle, thrust_exec_policy, n, - nEigVecs, eigVecs, clusters); + auto pair_cluster = + cluster_solver.solve(handle, n, nEigVecs, eigVecs, clusters); std::get<1>(stats) = pair_cluster.first; std::get<2>(stats) = pair_cluster.second; @@ -129,9 +128,8 @@ std::tuple partition( * @param cost On exit, partition cost function. * @return error flag. */ -template +template void analyzePartition(handle_t const &handle, - ThrustExePolicy thrust_exec_policy, sparse_matrix_t const &csr_m, vertex_t nClusters, const vertex_t *__restrict__ clusters, weight_t &edgeCut, weight_t &cost) { @@ -140,8 +138,8 @@ void analyzePartition(handle_t const &handle, vertex_t i; vertex_t n = csr_m.nrows_; - auto cublas_h = handle.get_cublas_handle(); auto stream = handle.get_stream(); + auto cublas_h = handle.get_cublas_handle(); weight_t partEdgesCut, clustersize; @@ -155,7 +153,7 @@ void analyzePartition(handle_t const &handle, // Initialize Laplacian ///sparse_matrix_t A{handle, graph}; - laplacian_matrix_t L{handle, thrust_exec_policy, csr_m}; + laplacian_matrix_t L{handle, csr_m}; // Initialize output cost = 0; @@ -164,8 +162,8 @@ void analyzePartition(handle_t const &handle, // Iterate through partitions for (i = 0; i < nClusters; ++i) { // Construct indicator vector for ith partition - if (!construct_indicator(handle, thrust_exec_policy, i, n, clustersize, - partEdgesCut, clusters, part_i, Lx, L)) { + if (!construct_indicator(handle, i, n, clustersize, partEdgesCut, clusters, + part_i, Lx, L)) { WARNING("empty partition"); continue; } diff --git a/cpp/include/raft/spectral/spectral_util.hpp b/cpp/include/raft/spectral/spectral_util.hpp index 40dde30a74..de9ff1917f 100644 --- a/cpp/include/raft/spectral/spectral_util.hpp +++ b/cpp/include/raft/spectral/spectral_util.hpp @@ -108,13 +108,12 @@ cudaError_t scale_obs(index_type_t m, index_type_t n, value_type_t* obs) { return cudaSuccess; } -template -void transform_eigen_matrix(handle_t const& handle, - ThrustExePolicy thrust_exec_policy, edge_t n, - vertex_t nEigVecs, weight_t* eigVecs) { - auto cublas_h = handle.get_cublas_handle(); +template +void transform_eigen_matrix(handle_t const& handle, edge_t n, vertex_t nEigVecs, + weight_t* eigVecs) { auto stream = handle.get_stream(); + auto cublas_h = handle.get_cublas_handle(); + auto thrust_exec_policy = handle.get_thrust_policy(); const weight_t zero{0.0}; const weight_t one{1.0}; @@ -187,16 +186,15 @@ struct equal_to_i_op { // Construct indicator vector for ith partition // -template -bool construct_indicator(handle_t const& handle, - ThrustExePolicy thrust_exec_policy, edge_t index, - edge_t n, weight_t& clustersize, weight_t& partStats, +template +bool construct_indicator(handle_t const& handle, edge_t index, edge_t n, + weight_t& clustersize, weight_t& partStats, vertex_t const* __restrict__ clusters, vector_t& part_i, vector_t& Bx, laplacian_matrix_t const& B) { - auto cublas_h = handle.get_cublas_handle(); auto stream = handle.get_stream(); + auto cublas_h = handle.get_cublas_handle(); + auto thrust_exec_policy = handle.get_thrust_policy(); thrust::for_each(thrust_exec_policy, thrust::make_zip_iterator(thrust::make_tuple( diff --git a/cpp/test/cluster_solvers.cu b/cpp/test/cluster_solvers.cu index 4ff6cdf5fa..d280b3e95c 100644 --- a/cpp/test/cluster_solvers.cu +++ b/cpp/test/cluster_solvers.cu @@ -49,8 +49,7 @@ TEST(Raft, ClusterSolvers) { kmeans_solver_t cluster_solver{cfg}; - EXPECT_ANY_THROW(cluster_solver.solve(h, thrust::cuda::par.on(stream), n, d, - eigvecs, codes)); + EXPECT_ANY_THROW(cluster_solver.solve(h, n, d, eigvecs, codes)); } TEST(Raft, ModularitySolvers) { @@ -89,14 +88,12 @@ TEST(Raft, ModularitySolvers) { auto stream = h.get_stream(); sparse_matrix_t sm{h, nullptr, nullptr, nullptr, 0, 0}; - auto t_exe_p = thrust::cuda::par.on(stream); EXPECT_ANY_THROW(spectral::modularity_maximization( - h, t_exe_p, sm, eig_solver, cluster_solver, clusters, eigvals, eigvecs)); + h, sm, eig_solver, cluster_solver, clusters, eigvals, eigvecs)); value_type modularity{0}; - EXPECT_ANY_THROW( - spectral::analyzeModularity(h, t_exe_p, sm, k, clusters, modularity)); + EXPECT_ANY_THROW(spectral::analyzeModularity(h, sm, k, clusters, modularity)); } } // namespace raft diff --git a/cpp/test/eigen_solvers.cu b/cpp/test/eigen_solvers.cu index e6ee09262e..8025d8dcd6 100644 --- a/cpp/test/eigen_solvers.cu +++ b/cpp/test/eigen_solvers.cu @@ -102,16 +102,15 @@ TEST(Raft, SpectralSolvers) { auto stream = h.get_stream(); - auto t_exe_p = thrust::cuda::par.on(stream); sparse_matrix_t sm{h, nullptr, nullptr, nullptr, 0, 0}; - EXPECT_ANY_THROW(spectral::partition( - h, t_exe_p, sm, eig_solver, cluster_solver, clusters, eigvals, eigvecs)); + EXPECT_ANY_THROW(spectral::partition(h, sm, eig_solver, cluster_solver, + clusters, eigvals, eigvecs)); value_type edgeCut{0}; value_type cost{0}; EXPECT_ANY_THROW( - spectral::analyzePartition(h, t_exe_p, sm, k, clusters, edgeCut, cost)); + spectral::analyzePartition(h, sm, k, clusters, edgeCut, cost)); } } // namespace raft diff --git a/cpp/test/spectral_matrix.cu b/cpp/test/spectral_matrix.cu index e5c2d52764..b85d35e3f8 100644 --- a/cpp/test/spectral_matrix.cu +++ b/cpp/test/spectral_matrix.cu @@ -57,27 +57,24 @@ TEST(Raft, SpectralMatrices) { ASSERT_EQ(nullptr, sm2.row_offsets_); auto stream = h.get_stream(); - auto t_exe_pol = thrust::cuda::par.on(stream); - auto cnstr_lm1 = [&h, t_exe_pol, ro, ci, vs, nrows, nnz](void) { - laplacian_matrix_t lm1{h, t_exe_pol, ro, ci, - vs, nrows, nnz}; + auto cnstr_lm1 = [&h, ro, ci, vs, nrows, nnz](void) { + laplacian_matrix_t lm1{h, ro, ci, vs, nrows, nnz}; }; EXPECT_ANY_THROW(cnstr_lm1()); // because of nullptr ptr args - auto cnstr_lm2 = [&h, t_exe_pol, &sm2](void) { - laplacian_matrix_t lm2{h, t_exe_pol, sm2}; + auto cnstr_lm2 = [&h, &sm2](void) { + laplacian_matrix_t lm2{h, sm2}; }; EXPECT_ANY_THROW(cnstr_lm2()); // because of nullptr ptr args - auto cnstr_mm1 = [&h, t_exe_pol, ro, ci, vs, nrows, nnz](void) { - modularity_matrix_t mm1{h, t_exe_pol, ro, ci, - vs, nrows, nnz}; + auto cnstr_mm1 = [&h, ro, ci, vs, nrows, nnz](void) { + modularity_matrix_t mm1{h, ro, ci, vs, nrows, nnz}; }; EXPECT_ANY_THROW(cnstr_mm1()); // because of nullptr ptr args - auto cnstr_mm2 = [&h, t_exe_pol, &sm2](void) { - modularity_matrix_t mm2{h, t_exe_pol, sm2}; + auto cnstr_mm2 = [&h, &sm2](void) { + modularity_matrix_t mm2{h, sm2}; }; EXPECT_ANY_THROW(cnstr_mm2()); // because of nullptr ptr args }