From d076399b3719c4127803aa08c2aa0d35d597a53a Mon Sep 17 00:00:00 2001
From: "Corey J. Nolet"
Date: Wed, 24 Mar 2021 01:34:50 -0400
Subject: [PATCH] Fixing RAFT CI & a few small updates for SLHC Python wrapper
 (#178)

Authors:
  - Corey J. Nolet (@cjnolet)

Approvers:
  - Victor Lafargue (@viclafargue)
  - Alex Fender (@afender)

URL: https://github.com/rapidsai/raft/pull/178
---
 cpp/cmake/Dependencies.cmake                  |   2 +-
 cpp/include/raft/sparse/hierarchy/common.h    |   7 +-
 .../sparse/hierarchy/detail/agglomerative.cuh |   7 +-
 .../hierarchy/detail/connectivities.cuh       |   2 +
 .../raft/sparse/hierarchy/detail/mst.cuh      |  20 +-
 .../raft/sparse/mst/detail/mst_solver_inl.cuh |   2 -
 cpp/include/raft/sparse/op/sort.h             |   2 +
 .../sparse/selection/connect_components.cuh   | 193 ++++++++++--------
 cpp/test/sparse/connect_components.cu         |   2 -
 cpp/test/sparse/linkage.cu                    | 116 ++++++++++-
 10 files changed, 246 insertions(+), 107 deletions(-)

diff --git a/cpp/cmake/Dependencies.cmake b/cpp/cmake/Dependencies.cmake
index 080efb5b1f..e1e21a9cd3 100644
--- a/cpp/cmake/Dependencies.cmake
+++ b/cpp/cmake/Dependencies.cmake
@@ -23,7 +23,7 @@ if(NOT CUB_IS_PART_OF_CTK)
   set(CUB_DIR ${CMAKE_CURRENT_BINARY_DIR}/cub CACHE STRING "Path to cub repo")
   ExternalProject_Add(cub
     GIT_REPOSITORY    https://github.com/thrust/cub.git
-    GIT_TAG           1.8.0
+    GIT_TAG           1.12.0
     PREFIX            ${CUB_DIR}
     CONFIGURE_COMMAND ""
     BUILD_COMMAND     ""
diff --git a/cpp/include/raft/sparse/hierarchy/common.h b/cpp/include/raft/sparse/hierarchy/common.h
index 48326bd347..29f541498b 100644
--- a/cpp/include/raft/sparse/hierarchy/common.h
+++ b/cpp/include/raft/sparse/hierarchy/common.h
@@ -29,7 +29,8 @@ enum LinkageDistance { PAIRWISE = 0, KNN_GRAPH = 1 };
  * @tparam value_t
  */
 template <typename value_idx, typename value_t>
-struct linkage_output {
+class linkage_output {
+ public:
   value_idx m;
   value_idx n_clusters;
@@ -41,8 +42,8 @@
   value_idx *children;  // size: (m-1, 2)
 };
 
-struct linkage_output_int_float : public linkage_output<int, float> {};
-struct linkage_output__int64_float : public linkage_output<int64_t, float> {};
+class linkage_output_int_float : public linkage_output<int, float> {};
+class linkage_output__int64_float : public linkage_output<int64_t, float> {};
 
 };  // namespace hierarchy
 };  // namespace raft
\ No newline at end of file
diff --git a/cpp/include/raft/sparse/hierarchy/detail/agglomerative.cuh b/cpp/include/raft/sparse/hierarchy/detail/agglomerative.cuh
index aa2cfc8fe9..e94fdfb970 100644
--- a/cpp/include/raft/sparse/hierarchy/detail/agglomerative.cuh
+++ b/cpp/include/raft/sparse/hierarchy/detail/agglomerative.cuh
@@ -103,6 +103,8 @@ void build_dendrogram_host(const handle_t &handle, const value_idx *rows,
   std::vector<value_idx> mst_dst_h(n_edges);
   std::vector<value_t> mst_weights_h(n_edges);
 
+  printf("n_edges: %d\n", n_edges);
+
   update_host(mst_src_h.data(), rows, n_edges, stream);
   update_host(mst_dst_h.data(), cols, n_edges, stream);
   update_host(mst_weights_h.data(), data, n_edges, stream);
@@ -132,6 +134,8 @@
   out_size.resize(n_edges, stream);
 
+  printf("Finished dendrogram\n");
+
   raft::update_device(children, children_h.data(), n_edges * 2, stream);
   raft::update_device(out_size.data(), out_size_h.data(), n_edges, stream);
 }
@@ -319,9 +323,6 @@ void extract_flattened_clusters(const raft::handle_t &handle, value_idx *labels,
   raft::copy_async(label_roots.data(), children + children_cpy_start,
                    child_size, stream);
 
-  // thrust::device_ptr<value_idx> t_label_roots =
-  //   thrust::device_pointer_cast(label_roots.data());
-  // thrust::sort(thrust_policy, label_roots.data(), label_roots.data() + (child_size), thrust::greater<value_idx>());
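
The CUB pin above moves from 1.8.0 to 1.12.0; the patch leans on the cub::BlockReduce pattern in kernels added further down (count_components_by_color_kernel, computeTheNumerator). A minimal self-contained sketch of that pattern, for reference only (row_sum_kernel and its data layout are illustrative, not part of this patch):

    #include <cub/block/block_reduce.cuh>

    // One block per row: each thread accumulates a strided partial sum, a
    // block-wide CUB reduction combines the partials, and thread 0 writes
    // the row total. Launch as: row_sum_kernel<256><<<n_rows, 256>>>(...).
    template <int TPB>
    __global__ void row_sum_kernel(const int* in, int* out, int row_len) {
      typedef cub::BlockReduce<int, TPB> BlockReduce;
      __shared__ typename BlockReduce::TempStorage temp_storage;

      int v = 0;
      for (int i = threadIdx.x; i < row_len; i += blockDim.x) {
        v += in[blockIdx.x * row_len + i];
      }

      int agg = BlockReduce(temp_storage).Sum(v);
      if (threadIdx.x == 0) out[blockIdx.x] = agg;
    }
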
diff --git a/cpp/include/raft/sparse/hierarchy/detail/connectivities.cuh b/cpp/include/raft/sparse/hierarchy/detail/connectivities.cuh
index 229f2034b0..2fe53ad204 100644
--- a/cpp/include/raft/sparse/hierarchy/detail/connectivities.cuh
+++ b/cpp/include/raft/sparse/hierarchy/detail/connectivities.cuh
@@ -68,6 +68,8 @@ struct distance_graph_impl
diff --git a/cpp/include/raft/sparse/hierarchy/detail/mst.cuh b/cpp/include/raft/sparse/hierarchy/detail/mst.cuh
--- a/cpp/include/raft/sparse/hierarchy/detail/mst.cuh
+++ b/cpp/include/raft/sparse/hierarchy/detail/mst.cuh
@@ ... @@ connect_knn_graph(
   raft::copy_async(msf.weights.data() + msf.n_edges, connected_edges.vals(),
                    connected_edges.nnz, stream);
 
+  printf("connected components nnz: %d\n", final_nnz);
+
   raft::sparse::COO<value_t, value_idx> final_coo(d_alloc, stream);
   raft::sparse::linalg::symmetrize(handle, msf.src.data(), msf.dst.data(),
                                    msf.weights.data(), m, n, final_nnz,
@@ -162,16 +163,25 @@ void build_sorted_mst(const raft::handle_t &handle, const value_t *X,
     handle, indptr, indices, pw_dists, (value_idx)m, nnz, color.data(), stream,
     false);
 
-  if (linkage::get_n_components(color.data(), m, stream) > 1) {
+  int iters = 1;
+  int n_components = linkage::get_n_components(color.data(), m, stream);
+
+  while (n_components > 1 && iters < 100) {
+    printf("Found %d components. Going to try connecting graph\n",
+           n_components);
     mst_coo = connect_knn_graph(handle, X, mst_coo, m, n, color.data());
 
-    printf("Edges: %d\n", mst_coo.n_edges);
+    iters++;
 
-    RAFT_EXPECTS(
-      mst_coo.n_edges == m - 1,
-      "MST was not able to connect knn graph in a single iteration.");
+    n_components = linkage::get_n_components(color.data(), m, stream);
+
+    //    printf("Connecting knn graph!\n");
+    //
+    //    RAFT_EXPECTS(
+    //      mst_coo.n_edges == m - 1,
+    //      "MST was not able to connect knn graph in a single iteration.");
   }
 
+  printf("Found %d components.\n", n_components);
+
   sort_coo_by_data(mst_coo.src.data(), mst_coo.dst.data(),
                    mst_coo.weights.data(), mst_coo.n_edges, stream);
diff --git a/cpp/include/raft/sparse/mst/detail/mst_solver_inl.cuh b/cpp/include/raft/sparse/mst/detail/mst_solver_inl.cuh
index 0235994feb..4e9876b3ba 100644
--- a/cpp/include/raft/sparse/mst/detail/mst_solver_inl.cuh
+++ b/cpp/include/raft/sparse/mst/detail/mst_solver_inl.cuh
@@ -190,8 +190,6 @@ MST_solver<vertex_t, edge_t, weight_t>::solve() {
   mst_result.dst.resize(mst_result.n_edges, stream);
   mst_result.weights.resize(mst_result.n_edges, stream);
 
-  // raft::print_device_vector("Colors before sending: ", color_index, 7,
-  //                           std::cout);
-
   return mst_result;
 }
diff --git a/cpp/include/raft/sparse/op/sort.h b/cpp/include/raft/sparse/op/sort.h
index b039e52517..642ff2b8be 100644
--- a/cpp/include/raft/sparse/op/sort.h
+++ b/cpp/include/raft/sparse/op/sort.h
@@ -71,6 +71,8 @@ void coo_sort(int m, int n, int nnz, int *rows, int *cols, T *vals,
   CUSPARSE_CHECK(cusparseCreateIdentityPermutation(handle, nnz, d_P.data()));
 
+  printf("nnz: %d\n", nnz);
+
   CUSPARSE_CHECK(cusparseXcoosortByRow(handle, m, n, nnz, rows, cols,
                                        d_P.data(), pBuffer.data()));
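
The behavioral change in build_sorted_mst above replaces a one-shot RAFT_EXPECTS with a bounded retry loop: keep augmenting the knn graph until the MST covers a single component, capped at 100 iterations. A minimal sketch of that control flow, with get_n_components and connect_graph as hypothetical stand-ins for linkage::get_n_components and connect_knn_graph:

    #include <cstdio>

    int get_n_components();  // stand-in: count components in current coloring
    void connect_graph();    // stand-in: add min-distance cross-component edges

    // Keep augmenting the graph until a single connected component remains,
    // bounded so a pathological input cannot loop forever.
    void connect_until_single_component(int max_iters = 100) {
      int iters = 1;
      int n_components = get_n_components();
      while (n_components > 1 && iters < max_iters) {
        std::printf("Found %d components. Going to try connecting graph\n",
                    n_components);
        connect_graph();
        ++iters;
        n_components = get_n_components();
      }
    }
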
diff --git a/cpp/include/raft/sparse/selection/connect_components.cuh b/cpp/include/raft/sparse/selection/connect_components.cuh
index a1944fc0b3..8f06a85ddc 100644
--- a/cpp/include/raft/sparse/selection/connect_components.cuh
+++ b/cpp/include/raft/sparse/selection/connect_components.cuh
@@ -105,6 +105,22 @@ struct FixConnectivitiesRedOp {
   }
 };
 
+struct TupleComp {
+  template <typename one, typename two>
+  __host__ __device__ bool operator()(const one &t1, const two &t2) {
+    // sort first by each sample's color,
+    if (thrust::get<0>(t1) < thrust::get<0>(t2)) return true;
+    if (thrust::get<0>(t1) > thrust::get<0>(t2)) return false;
+
+    // then by the color of each sample's closest neighbor,
+    if (thrust::get<1>(t1) < thrust::get<1>(t2)) return true;
+    if (thrust::get<1>(t1) > thrust::get<1>(t2)) return false;
+
+    // then sort by value in descending order
+    return thrust::get<2>(t1).value > thrust::get<2>(t2).value;
+  }
+};
+
 /**
  * Count the unique vertices adjacent to each component.
  * This is essentially a count_unique_by_key.
@@ -117,26 +133,75 @@ __global__ void count_components_by_color_kernel(value_idx *out_indptr,
   value_idx tid = threadIdx.x;
   value_idx row = blockIdx.x;
 
-  __shared__ extern value_idx count_smem[];
-
   value_idx start_offset = colors_indptr[row];
   value_idx stop_offset = colors_indptr[row + 1];
 
-  for (value_idx i = tid; i < n_colors; i += blockDim.x) {
-    count_smem[i] = 0;
-  }
-
-  __syncthreads();
-
-  for (value_idx i = tid; i < (stop_offset - start_offset); i += blockDim.x) {
-    count_smem[colors_nn[start_offset + i]] = 1;
-  }
-
-  __syncthreads();
-
-  for (value_idx i = tid; i < n_colors; i += blockDim.x) {
-    // TODO: Warp-level reduction
-    atomicAdd(out_indptr + row, count_smem[i] > 0);
-  }
-}
+  value_idx agg = 0;
+
+  typedef cub::BlockReduce<value_idx, tpb> BlockReduce;
+  __shared__ typename BlockReduce::TempStorage temp_storage;
+
+  value_idx v = 0;
+  for (value_idx i = tid; i < (stop_offset - start_offset) - 1;
+       i += blockDim.x) {
+    // Columns are sorted by row so only columns that are != next column
+    // should get counted
+    v += (colors_nn[start_offset + i + 1] != colors_nn[start_offset + i])
+         // last thread should always write something
+         || i == (stop_offset - start_offset - 2);
+  }
+
+  agg = BlockReduce(temp_storage).Reduce(v, cub::Sum());
+
+  if (threadIdx.x == 0) out_indptr[row] = agg;
+}
+
+template <typename value_idx, typename value_t>
+__global__ void min_components_by_color_kernel(
+  value_idx *out_cols, value_t *out_vals, value_idx *out_rows,
+  const value_idx *out_indptr, const value_idx *colors_indptr,
+  const value_idx *colors_nn, const value_idx *indices,
+  const cub::KeyValuePair<value_idx, value_t> *kvp, value_idx n_colors) {
+  __shared__ value_idx smem[2];
+
+  value_idx *cur_offset = smem;
+  value_idx *mutex = smem + 1;
+
+  if (threadIdx.x == 0) {
+    mutex[0] = 0;
+    cur_offset[0] = 0;
+  }
+
+  __syncthreads();
+
+  value_idx tid = threadIdx.x;
+  value_idx row = blockIdx.x;
+
+  value_idx out_offset = out_indptr[blockIdx.x];
+
+  value_idx start_offset = colors_indptr[row];
+  value_idx stop_offset = colors_indptr[row + 1];
+
+  for (value_idx i = tid; i < (stop_offset - start_offset) - 1;
+       i += blockDim.x) {
+    // Columns are sorted by row so only columns that are != next column
+    // should get counted
+    value_idx cur_color = colors_nn[start_offset + i];
+    if (colors_nn[start_offset + i + 1] != cur_color
+        // last thread should always write something
+        || (i == stop_offset - start_offset - 2)) {
+      while (atomicCAS(mutex, 0, 1) == 1)
+        ;
+      __threadfence();
+
+      value_idx local_offset = cur_offset[0];
+      out_rows[out_offset + local_offset] = indices[start_offset + i];
+      out_cols[out_offset + local_offset] = kvp[start_offset + i].key;
+      out_vals[out_offset + local_offset] = kvp[start_offset + i].value;
+      cur_offset[0] += 1;
+
+      __threadfence();
+      atomicCAS(mutex, 1, 0);
+    }
+  }
+}
@@ -155,79 +220,23 @@ void count_components_by_color(value_idx *out_indptr,
                                const value_idx *colors_indptr,
                                const value_idx *colors_nn, value_idx n_colors,
                                cudaStream_t stream) {
-  count_components_by_color_kernel<<<n_colors, tpb,
-                                     n_colors * sizeof(value_idx), stream>>>(
-    out_indptr, colors_indptr, colors_nn, n_colors);
+  count_components_by_color_kernel<<<n_colors, tpb, 0, stream>>>(
+    out_indptr, colors_indptr, colors_nn, n_colors);
 }
 
-/**
- * colors_nn is not assumed to be sorted wrt colors_indptr
- * so we need to perform atomic reductions in each thread.
- */
-template <typename value_idx, typename value_t>
-__global__ void min_components_by_color_kernel(
-  value_idx *out_cols, value_t *out_vals, value_idx *out_rows,
-  const value_idx *out_indptr, const value_idx *colors_indptr,
-  const value_idx *colors_nn, const value_idx *indices,
-  const cub::KeyValuePair<value_idx, value_t> *kvp, value_idx n_colors) {
-  __shared__ extern char min_smem[];
-
-  int *mutex = (int *)min_smem;
-
-  cub::KeyValuePair<value_idx, value_t> *min =
-    (cub::KeyValuePair<value_idx, value_t> *)(mutex + n_colors);
-  value_idx *src_inds = (value_idx *)(min + n_colors);
-
-  value_idx start_offset = colors_indptr[blockIdx.x];
-  value_idx stop_offset = colors_indptr[blockIdx.x + 1];
-
-  // initialize
-  for (value_idx i = threadIdx.x; i < n_colors; i += blockDim.x) {
-    mutex[i] = 0;
-    auto skvp = min + i;
-    skvp->key = -1;
-    skvp->value = std::numeric_limits<value_t>::max();
-  }
-
-  __syncthreads();
-
-  for (value_idx i = threadIdx.x; i < (stop_offset - start_offset);
-       i += blockDim.x) {
-    value_idx new_color = colors_nn[start_offset + i];
-    while (atomicCAS(mutex + new_color, 0, 1) == 1)
-      ;
-    __threadfence();
-    auto cur_kvp = kvp[start_offset + i];
-    if (cur_kvp.value < min[new_color].value) {
-      src_inds[new_color] = indices[start_offset + i];
-      min[new_color].key = cur_kvp.key;
-      min[new_color].value = cur_kvp.value;
-    }
-    __threadfence();
-    atomicCAS(mutex + new_color, 1, 0);
-  }
-
-  __syncthreads();
-
-  // printf("block %d thread %d did final sync\n", blockIdx.x, threadIdx.x);
-
-  value_idx out_offset = out_indptr[blockIdx.x];
-
-  // TODO: Do this across threads, using an atomic counter for each color
-  if (threadIdx.x == 0) {
-    value_idx cur_offset = 0;
-
-    for (value_idx i = 0; i < n_colors; i++) {
-      auto min_color = min[i];
-      if (min_color.key > -1) {
-        out_rows[out_offset + cur_offset] = src_inds[i];
-        out_cols[out_offset + cur_offset] = min_color.key;
-        out_vals[out_offset + cur_offset] = min_color.value;
-        cur_offset += 1;
-      }
-    }
-  }
-}
+template <typename LabelT, typename DataT>
+struct CubKVPMinReduce {
+  typedef cub::KeyValuePair<LabelT, DataT> KVP;
+
+  DI KVP operator()(LabelT rit, const KVP &a, const KVP &b) {
+    return b.value < a.value ? b : a;
+  }
+
+  DI KVP operator()(const KVP &a, const KVP &b) {
+    return b.value < a.value ? b : a;
+  }
+
+};  // KVPMinReduce
 
 /**
  * Computes the min set of unique components that neighbor the
@@ -251,10 +260,12 @@ void min_components_by_color(raft::sparse::COO<value_t, value_idx> &coo,
                              const value_idx *indices,
                              const cub::KeyValuePair<value_idx, value_t> *kvp,
                              value_idx n_colors, cudaStream_t stream) {
-  int smem_bytes = (n_colors * sizeof(int)) + (n_colors * sizeof(kvp)) +
-                   ((n_colors + 1) * sizeof(value_idx));
-
-  min_components_by_color_kernel<<<n_colors, tpb, smem_bytes, stream>>>(
+  /**
+   * Arrays should be ordered by: colors_indptr->colors_nn->kvp.value
+   * so the last element of each column in the input CSR should be
+   * the min.
+   */
+  min_components_by_color_kernel<<<n_colors, tpb, 0, stream>>>(
     coo.cols(), coo.vals(), coo.rows(), out_indptr, colors_indptr, colors_nn,
     indices, kvp, n_colors);
 }
@@ -385,13 +396,13 @@ void sort_by_color(value_idx *colors, value_idx *nn_colors,
   thrust::copy(thrust::cuda::par.on(stream), arg_sort_iter,
                arg_sort_iter + n_rows, src_indices);
 
-  auto keys = thrust::make_zip_iterator(thrust::make_tuple(colors));
-  auto vals = thrust::make_zip_iterator(
-    thrust::make_tuple((raft::linkage::KeyValuePair<value_idx, value_t> *)kvp,
-                       src_indices, nn_colors));
+  auto keys = thrust::make_zip_iterator(thrust::make_tuple(
+    colors, nn_colors, (raft::linkage::KeyValuePair<value_idx, value_t> *)kvp));
+  auto vals = thrust::make_zip_iterator(thrust::make_tuple(src_indices));
 
   // get all the colors in contiguous locations so we can map them to warps.
-  thrust::sort_by_key(thrust::cuda::par.on(stream), keys, keys + n_rows, vals);
+  thrust::sort_by_key(thrust::cuda::par.on(stream), keys, keys + n_rows, vals,
+                      TupleComp());
 }
 
@@ -433,9 +444,15 @@ void connect_components(const raft::handle_t &handle,
   rmm::device_uvector<value_idx> color_neigh_degrees(n_components + 1, stream);
   rmm::device_uvector<value_idx> colors_indptr(n_components + 1, stream);
 
+  printf("Performing 1nn\n");
+
   perform_1nn(temp_inds_dists.data(), nn_colors.data(), colors, X, n_rows,
               n_cols, d_alloc, stream);
 
+  CUDA_CHECK(cudaStreamSynchronize(stream));
+
+  printf("Sorting by color\n");
+
   /**
    * Sort data points by color (neighbors are not sorted)
    */
@@ -448,23 +465,31 @@ void connect_components(const raft::handle_t &handle,
   raft::sparse::convert::sorted_coo_to_csr(colors, n_rows, colors_indptr.data(),
                                            n_components + 1, d_alloc, stream);
 
+  printf("Building output colors indptr\n");
   // create output degree array for closest components per row
   build_output_colors_indptr(color_neigh_degrees.data(), colors_indptr.data(),
                              nn_colors.data(), n_components, stream);
+  CUDA_CHECK(cudaStreamSynchronize(stream));
 
   value_idx nnz;
   raft::update_host(&nnz, color_neigh_degrees.data() + n_components, 1, stream);
   CUDA_CHECK(cudaStreamSynchronize(stream));
 
+  printf("Min components by color\n");
+
   raft::sparse::COO<value_t, value_idx> min_edges(d_alloc, stream, nnz);
 
   min_components_by_color(min_edges, color_neigh_degrees.data(),
                           colors_indptr.data(), nn_colors.data(),
                           src_indices.data(), temp_inds_dists.data(),
                           n_components, stream);
+  CUDA_CHECK(cudaStreamSynchronize(stream));
 
   // symmetrize
+
+  printf("Symmetrize\n");
+
   raft::sparse::linalg::symmetrize(handle, min_edges.rows(), min_edges.cols(),
                                    min_edges.vals(), n_rows, n_rows, nnz, out);
+
+  CUDA_CHECK(cudaStreamSynchronize(stream));
 }
 
 };  // end namespace linkage
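
The rewritten connect_components path relies on the ordering TupleComp establishes: rows grouped by (color, nn_color) with distance descending, so each group's minimum-distance edge lands at the end of its run, which is what the count/min kernels above exploit. A standalone sketch of that ordering using a plain float distance in place of cub::KeyValuePair (sort_like_tuplecomp and TupleCompSketch are illustrative, not the RAFT code):

    #include <thrust/device_vector.h>
    #include <thrust/iterator/zip_iterator.h>
    #include <thrust/sort.h>
    #include <thrust/tuple.h>

    // Orders rows by (color, nn_color) ascending, then distance descending,
    // mirroring TupleComp; the last row of each (color, nn_color) run is
    // then that run's minimum-distance edge.
    struct TupleCompSketch {
      __host__ __device__ bool operator()(
        const thrust::tuple<int, int, float>& a,
        const thrust::tuple<int, int, float>& b) const {
        if (thrust::get<0>(a) != thrust::get<0>(b))
          return thrust::get<0>(a) < thrust::get<0>(b);
        if (thrust::get<1>(a) != thrust::get<1>(b))
          return thrust::get<1>(a) < thrust::get<1>(b);
        return thrust::get<2>(a) > thrust::get<2>(b);  // descending distance
      }
    };

    void sort_like_tuplecomp(thrust::device_vector<int>& colors,
                             thrust::device_vector<int>& nn_colors,
                             thrust::device_vector<float>& dists,
                             thrust::device_vector<int>& src_indices) {
      auto keys = thrust::make_zip_iterator(
        thrust::make_tuple(colors.begin(), nn_colors.begin(), dists.begin()));
      thrust::sort_by_key(keys, keys + colors.size(), src_indices.begin(),
                          TupleCompSketch());
    }
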
diff --git a/cpp/test/sparse/connect_components.cu b/cpp/test/sparse/connect_components.cu
index 965dc20330..241062f54f 100644
--- a/cpp/test/sparse/connect_components.cu
+++ b/cpp/test/sparse/connect_components.cu
@@ -104,8 +104,6 @@ class ConnectComponentsTest : public ::testing::TestWithParam<
       handle, indptr.data(), knn_graph_coo.cols(), knn_graph_coo.vals(),
       params.n_row, knn_graph_coo.nnz, colors.data(), stream, false);
 
-    CUDA_CHECK(cudaStreamSynchronize(stream));
-    printf("Got here.\n");
 
     raft::print_device_vector("colors", colors.data(), params.n_row, std::cout);
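
The test above feeds connect_components a "colors" array labeling each point's weakly-connected component, computed by weak-CC over the knn graph. As a reference for what such a coloring means, a small host-side union-find sketch (component_colors is hypothetical, not RAFT's weak_cc implementation):

    #include <numeric>
    #include <utility>
    #include <vector>

    // Label each vertex with a component id ("color") by unioning the
    // endpoints of every edge, then compressing each vertex to its root.
    std::vector<int> component_colors(
      int n_vertices, const std::vector<std::pair<int, int>>& edges) {
      std::vector<int> parent(n_vertices);
      std::iota(parent.begin(), parent.end(), 0);
      // iterative find with path halving
      auto find = [&](int x) {
        while (parent[x] != x) x = parent[x] = parent[parent[x]];
        return x;
      };
      for (const auto& e : edges) parent[find(e.first)] = find(e.second);
      std::vector<int> colors(n_vertices);
      for (int v = 0; v < n_vertices; ++v) colors[v] = find(v);
      return colors;
    }
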
diff --git a/cpp/test/sparse/linkage.cu b/cpp/test/sparse/linkage.cu
index fd1aca92ca..ce567e4298 100644
--- a/cpp/test/sparse/linkage.cu
+++ b/cpp/test/sparse/linkage.cu
@@ -46,9 +46,110 @@ struct LinkageInputs {
   int c;
 };
 
+/**
+ * @brief kernel to calculate the values of a and b
+ * @param firstClusterArray: the array of classes of type T
+ * @param secondClusterArray: the array of classes of type T
+ * @param size: the number of data points
+ * @param a: number of pairs of points that both clusterings classified the same
+ * @param b: number of pairs of points that both clusterings classified differently
+ */
+template <typename T, int BLOCK_DIM_X, int BLOCK_DIM_Y>
+__global__ void computeTheNumerator(const T* firstClusterArray,
+                                    const T* secondClusterArray, uint64_t size,
+                                    uint64_t* a, uint64_t* b) {
+  // calculating the indices of pairs of datapoints compared by the current thread
+  uint64_t j = threadIdx.x + blockIdx.x * blockDim.x;
+  uint64_t i = threadIdx.y + blockIdx.y * blockDim.y;
+
+  // thread-local variables to count a and b
+  uint64_t myA = 0, myB = 0;
+
+  if (i < size && j < size && j < i) {
+    // checking if the pair has been classified the same by both clusterings
+    if (firstClusterArray[i] == firstClusterArray[j] &&
+        secondClusterArray[i] == secondClusterArray[j]) {
+      ++myA;
+    }
+
+    // checking if the pair has been classified differently by both clusterings
+    else if (firstClusterArray[i] != firstClusterArray[j] &&
+             secondClusterArray[i] != secondClusterArray[j]) {
+      ++myB;
+    }
+  }
+
+  // specialize blockReduce for a 2D block of BLOCK_DIM_X x BLOCK_DIM_Y threads of type uint64_t
+  typedef cub::BlockReduce<uint64_t, BLOCK_DIM_X,
+                           cub::BLOCK_REDUCE_WARP_REDUCTIONS, BLOCK_DIM_Y>
+    BlockReduce;
+
+  // Allocate shared memory for blockReduce
+  __shared__ typename BlockReduce::TempStorage temp_storage;
+
+  // summing up thread-local counts specific to a block
+  myA = BlockReduce(temp_storage).Sum(myA);
+  __syncthreads();
+  myB = BlockReduce(temp_storage).Sum(myB);
+  __syncthreads();
+
+  // executed once per block
+  if (threadIdx.x == 0 && threadIdx.y == 0) {
+    raft::myAtomicAdd((unsigned long long int*)a, myA);
+    raft::myAtomicAdd((unsigned long long int*)b, myB);
+  }
+}
+
+/**
+* @brief Function to calculate RandIndex
+* (see https://en.wikipedia.org/wiki/Rand_index for more info)
+* @param firstClusterArray: the array of classes of type T
+* @param secondClusterArray: the array of classes of type T
+* @param size: the number of data points of type uint64_t
+* @param allocator: object that takes care of temporary device memory allocation
+* @param stream: the cudaStream object
+*/
+template <typename T>
+double compute_rand_index(
+  T* firstClusterArray, T* secondClusterArray, uint64_t size,
+  std::shared_ptr<raft::mr::device::allocator> allocator,
+  cudaStream_t stream) {
+  // rand index for size less than 2 is not defined
+  ASSERT(size >= 2, "Rand Index for size less than 2 not defined!");
+
+  // allocating and initializing memory for a and b in the GPU
+  raft::mr::device::buffer<uint64_t> arr_buf(allocator, stream, 2);
+  CUDA_CHECK(cudaMemsetAsync(arr_buf.data(), 0, 2 * sizeof(uint64_t), stream));
+
+  // kernel configuration
+  static const int BLOCK_DIM_Y = 16, BLOCK_DIM_X = 16;
+  dim3 numThreadsPerBlock(BLOCK_DIM_X, BLOCK_DIM_Y);
+  dim3 numBlocks(raft::ceildiv<uint64_t>(size, numThreadsPerBlock.x),
+                 raft::ceildiv<uint64_t>(size, numThreadsPerBlock.y));
+
+  // calling the kernel
+  computeTheNumerator<T, BLOCK_DIM_X, BLOCK_DIM_Y>
+    <<<numBlocks, numThreadsPerBlock, 0, stream>>>(
+      firstClusterArray, secondClusterArray, size, arr_buf.data(),
+      arr_buf.data() + 1);
+
+  // synchronizing and updating the calculated values of a and b from device to host
+  uint64_t ab_host[2] = {0};
+  raft::update_host(ab_host, arr_buf.data(), 2, stream);
+  CUDA_CHECK(cudaStreamSynchronize(stream));
+
+  // error handling
+  CUDA_CHECK(cudaGetLastError());
+
+  // denominator
+  uint64_t nChooseTwo = size * (size - 1) / 2;
+
+  // calculating the rand_index
+  return (double)(((double)(ab_host[0] + ab_host[1])) / (double)nChooseTwo);
+}
+
 template <typename T, typename IdxT>
-::std::ostream &operator<<(::std::ostream &os,
-                           const LinkageInputs<T, IdxT> &dims) {
+::std::ostream& operator<<(::std::ostream& os,
+                           const LinkageInputs<T, IdxT>& dims) {
   return os;
 }
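
A quick sanity check of the formula implemented above: for size = 4 with first = [0, 0, 1, 1] and second = [1, 1, 0, 0], the C(4,2) = 6 pairs split into a = 2 (pairs (0,1) and (2,3) grouped together in both labelings) and b = 4 (the remaining pairs apart in both), so rand index = (2 + 4) / 6 = 1.0. Label permutations therefore score perfectly, which is why the test below compares the score against 1.0 instead of matching labels exactly. A host reference mirroring the kernel's O(n^2) pair scan (a sketch for verification, not part of the patch):

    #include <cstdint>
    #include <vector>

    // Host reference for compute_rand_index: scan all pairs, counting pairs
    // grouped together in both labelings (a) and apart in both (b).
    template <typename T>
    double rand_index_host(const std::vector<T>& first,
                           const std::vector<T>& second) {
      std::uint64_t a = 0, b = 0, n = first.size();
      for (std::uint64_t i = 0; i < n; ++i) {
        for (std::uint64_t j = 0; j < i; ++j) {
          bool same1 = first[i] == first[j], same2 = second[i] == second[j];
          a += same1 && same2;
          b += !same1 && !same2;
        }
      }
      return double(a + b) / (double(n) * double(n - 1) / 2.0);
    }
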
@@ -83,10 +184,14 @@ class LinkageTest : public ::testing::TestWithParam<LinkageInputs<T, IdxT>> {
     raft::hierarchy::single_linkage<
       IdxT, T, raft::hierarchy::LinkageDistance::KNN_GRAPH>(
       handle, data.data(), params.n_row, params.n_col,
-      raft::distance::DistanceType::L2Unexpanded, &out_arrs, params.c,
+      raft::distance::DistanceType::L2SqrtExpanded, &out_arrs, params.c,
       params.n_clusters);
 
     CUDA_CHECK(cudaStreamSynchronize(handle.get_stream()));
+
+    score =
+      compute_rand_index(labels, labels_ref, params.n_row,
+                         handle.get_device_allocator(), handle.get_stream());
   }
 
   void SetUp() override { basicTest(); }
@@ -491,10 +596,7 @@ const std::vector<LinkageInputs<float, int>> linkage_inputsf2 = {
   -4}};
 
 typedef LinkageTest<float, int> LinkageTestF_Int;
-TEST_P(LinkageTestF_Int, Result) {
-  EXPECT_TRUE(
-    raft::devArrMatch(labels, labels_ref, params.n_row, raft::Compare<int>()));
-}
+TEST_P(LinkageTestF_Int, Result) { EXPECT_TRUE(score == 1.0); }
 
 INSTANTIATE_TEST_CASE_P(LinkageTest, LinkageTestF_Int,
                         ::testing::ValuesIn(linkage_inputsf2));
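
For context, the API exercised by LinkageTestF_Int boils down to the call below, condensed from basicTest above (run_single_linkage is an illustrative wrapper and the include path is assumed, not confirmed by this patch; buffer setup and scoring are elided):

    #include <raft/sparse/hierarchy/single_linkage.hpp>  // include path assumed

    // Cluster n_row points of dimension n_col into n_clusters via
    // KNN-graph single-linkage, writing labels/children into *out.
    void run_single_linkage(const raft::handle_t& handle, const float* X,
                            int n_row, int n_col, int c, int n_clusters,
                            raft::hierarchy::linkage_output<int, float>* out) {
      raft::hierarchy::single_linkage<
        int, float, raft::hierarchy::LinkageDistance::KNN_GRAPH>(
        handle, X, n_row, n_col, raft::distance::DistanceType::L2SqrtExpanded,
        out, c, n_clusters);
    }
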