diff --git a/cpp/include/cugraph/prims/property_op_utils.cuh b/cpp/include/cugraph/prims/property_op_utils.cuh index f61df807cdd..1168617ae63 100644 --- a/cpp/include/cugraph/prims/property_op_utils.cuh +++ b/cpp/include/cugraph/prims/property_op_utils.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,8 @@ namespace cugraph { +namespace detail { + template struct is_valid_edge_op { static constexpr bool value = false; @@ -43,6 +45,55 @@ struct is_valid_edge_op< static constexpr bool valid = true; }; +template +struct edge_op_result_type; + +template +struct edge_op_result_type< + key_t, + vertex_t, + weight_t, + row_value_t, + col_value_t, + EdgeOp, + std::enable_if_t>:: + valid>> { + using type = + typename std::invoke_result::type; +}; + +template +struct edge_op_result_type< + key_t, + vertex_t, + weight_t, + row_value_t, + col_value_t, + EdgeOp, + std::enable_if_t>::valid>> { + using type = typename std::invoke_result::type; +}; + +} // namespace detail + template :: + type; template __device__ - std::enable_if_t>::valid, + std::enable_if_t>::valid, typename std::invoke_result::type> compute(K r, V c, W w, R rv, C cv, E e) { @@ -74,9 +128,10 @@ struct evaluate_edge_op { typename R = row_value_type, typename C = col_value_type, typename E = EdgeOp> - __device__ std::enable_if_t>::valid, - typename std::invoke_result::type> - compute(K r, V c, W w, R rv, C cv, E e) + __device__ + std::enable_if_t>::valid, + typename std::invoke_result::type> + compute(K r, V c, W w, R rv, C cv, E e) { return e(r, c, rv, cv); } @@ -104,7 +159,8 @@ struct cast_edge_op_bool_to_integer { typename C = col_value_type, typename E = EdgeOp> __device__ - std::enable_if_t>::valid, T> + std::enable_if_t>::valid, + T> operator()(K r, V c, W w, R rv, C cv) { return e_op(r, c, w, rv, cv) ? T{1} : T{0}; @@ -116,7 +172,7 @@ struct cast_edge_op_bool_to_integer { typename C = col_value_type, typename E = EdgeOp> __device__ - std::enable_if_t>::valid, T> + std::enable_if_t>::valid, T> operator()(K r, V c, R rv, C cv) { return e_op(r, c, rv, cv) ? T{1} : T{0}; diff --git a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh index f88237e0a67..f2cad3166e8 100644 --- a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh +++ b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh @@ -180,61 +180,32 @@ struct check_invalid_bucket_idx_t { } }; -template -__device__ void push_if_buffer_element( - matrix_partition_device_view_t& matrix_partition, - typename std::iterator_traits::value_type key, - typename GraphViewType::vertex_type row_offset, - typename GraphViewType::vertex_type col, - typename GraphViewType::weight_type weight, - AdjMatrixRowValueInputWrapper adj_matrix_row_value_input, - AdjMatrixColValueInputWrapper adj_matrix_col_value_input, - BufferKeyOutputIterator buffer_key_output_first, - BufferPayloadOutputIterator buffer_payload_output_first, - size_t* buffer_idx_ptr, - EdgeOp e_op) + typename BufferPayloadOutputIterator> +__device__ void push_buffer_element(vertex_t col, + e_op_result_t e_op_result, + BufferKeyOutputIterator buffer_key_output_first, + BufferPayloadOutputIterator buffer_payload_output_first, + size_t buffer_idx) { - using vertex_t = typename GraphViewType::vertex_type; - using key_t = typename std::iterator_traits::value_type; + using key_t = typename std::iterator_traits::value_type; using payload_t = typename optional_payload_buffer_value_type_t::value; - auto col_offset = matrix_partition.get_minor_offset_from_minor_nocheck(col); - auto e_op_result = evaluate_edge_op() - .compute(key, - col, - weight, - adj_matrix_row_value_input.get(row_offset), - adj_matrix_col_value_input.get(col_offset), - e_op); - if (e_op_result) { - static_assert(sizeof(unsigned long long int) == sizeof(size_t)); - auto buffer_idx = atomicAdd(reinterpret_cast(buffer_idx_ptr), - static_cast(1)); - if constexpr (std::is_same_v && std::is_same_v) { - *(buffer_key_output_first + buffer_idx) = col; - } else if constexpr (std::is_same_v && !std::is_same_v) { - *(buffer_key_output_first + buffer_idx) = col; - *(buffer_payload_output_first + buffer_idx) = *e_op_result; - } else if constexpr (!std::is_same_v && std::is_same_v) { - *(buffer_key_output_first + buffer_idx) = thrust::make_tuple(col, *e_op_result); - } else { - *(buffer_key_output_first + buffer_idx) = - thrust::make_tuple(col, thrust::get<0>(*e_op_result)); - *(buffer_payload_output_first + buffer_idx) = thrust::get<1>(*e_op_result); - } + assert(e_op_result.has_value()); + + if constexpr (std::is_same_v && std::is_same_v) { + *(buffer_key_output_first + buffer_idx) = col; + } else if constexpr (std::is_same_v && !std::is_same_v) { + *(buffer_key_output_first + buffer_idx) = col; + *(buffer_payload_output_first + buffer_idx) = *e_op_result; + } else if constexpr (!std::is_same_v && std::is_same_v) { + *(buffer_key_output_first + buffer_idx) = thrust::make_tuple(col, *e_op_result); + } else { + *(buffer_key_output_first + buffer_idx) = thrust::make_tuple(col, thrust::get<0>(*e_op_result)); + *(buffer_payload_output_first + buffer_idx) = thrust::get<1>(*e_op_result); } } @@ -268,46 +239,140 @@ __global__ void for_all_frontier_row_for_all_nbr_hypersparse( std::is_same_v::value_type>); using payload_t = typename optional_payload_buffer_value_type_t::value; + using e_op_result_t = typename evaluate_edge_op::result_type; static_assert(!GraphViewType::is_adj_matrix_transposed, "GraphViewType should support the push model."); - auto const tid = threadIdx.x + blockIdx.x * blockDim.x; + auto const tid = threadIdx.x + blockIdx.x * blockDim.x; + auto const warp_id = threadIdx.x / raft::warp_size(); + auto const lane_id = tid % raft::warp_size(); auto row_start_offset = static_cast(major_hypersparse_first - matrix_partition.get_major_first()); auto idx = static_cast(tid); - auto dcs_nzd_vertices = *(matrix_partition.get_dcs_nzd_vertices()); - auto dcs_nzd_vertex_count = *(matrix_partition.get_dcs_nzd_vertex_count()); - - while (idx < static_cast(thrust::distance(key_first, key_last))) { - auto key = *(key_first + idx); - vertex_t row{}; - if constexpr (std::is_same_v) { - row = key; - } else { - row = thrust::get<0>(key); + __shared__ edge_t + warp_local_degree_inclusive_sums[update_frontier_v_push_if_out_nbr_for_all_block_size]; + __shared__ edge_t + warp_key_local_edge_offsets[update_frontier_v_push_if_out_nbr_for_all_block_size]; + + using WarpScan = cub::WarpScan; + __shared__ typename WarpScan::TempStorage temp_storage; + + __shared__ size_t buffer_warp_start_indices[update_frontier_v_push_if_out_nbr_for_all_block_size / + raft::warp_size()]; + + auto indices = matrix_partition.get_indices(); + auto weights = matrix_partition.get_weights(); + + vertex_t num_keys = static_cast(thrust::distance(key_first, key_last)); + auto rounded_up_num_keys = + ((static_cast(num_keys) + (raft::warp_size() - 1)) / raft::warp_size()) * + raft::warp_size(); + while (idx < rounded_up_num_keys) { + auto min_key_idx = static_cast(idx - (idx % raft::warp_size())); // inclusive + auto max_key_idx = + static_cast(std::min(static_cast(min_key_idx) + raft::warp_size(), + static_cast(num_keys))); // exclusive + + // update warp_local_degree_inclusive_sums & warp_key_local_edge_offsets + + edge_t local_degree{0}; + if (lane_id < static_cast(max_key_idx - min_key_idx)) { + auto key = *(key_first + idx); + vertex_t row{}; + if constexpr (std::is_same_v) { + row = key; + } else { + row = thrust::get<0>(key); + } + auto row_hypersparse_idx = matrix_partition.get_major_hypersparse_idx_from_major_nocheck(row); + if (row_hypersparse_idx) { + auto row_idx = row_start_offset + *row_hypersparse_idx; + local_degree = matrix_partition.get_local_degree(row_idx); + warp_key_local_edge_offsets[threadIdx.x] = matrix_partition.get_local_offset(row_idx); + } else { + local_degree = edge_t{0}; + warp_key_local_edge_offsets[threadIdx.x] = edge_t{0}; // dummy + } } - auto row_hypersparse_idx = matrix_partition.get_major_hypersparse_idx_from_major_nocheck(row); - if (row_hypersparse_idx) { - auto row_offset = matrix_partition.get_major_offset_from_major_nocheck(row); - auto row_idx = row_start_offset + *row_hypersparse_idx; - vertex_t const* indices{nullptr}; - thrust::optional weights{thrust::nullopt}; - edge_t local_out_degree{}; - thrust::tie(indices, weights, local_out_degree) = matrix_partition.get_local_edges(row_idx); - for (edge_t i = 0; i < local_out_degree; ++i) { - push_if_buffer_element(matrix_partition, - key, - row_offset, - indices[i], - weights ? (*weights)[i] : weight_t{1.0}, - adj_matrix_row_value_input, - adj_matrix_col_value_input, - buffer_key_output_first, - buffer_payload_output_first, - buffer_idx_ptr, - e_op); + WarpScan(temp_storage) + .InclusiveSum(local_degree, warp_local_degree_inclusive_sums[threadIdx.x]); + __syncwarp(); + + // process local edges for the keys in [key_first + min_key_idx, key_first + max_key_idx) + + auto num_edges_this_warp = warp_local_degree_inclusive_sums[warp_id * raft::warp_size() + + (max_key_idx - min_key_idx) - 1]; + auto rounded_up_num_edges_this_warp = + ((static_cast(num_edges_this_warp) + (raft::warp_size() - 1)) / raft::warp_size()) * + raft::warp_size(); + + for (size_t i = lane_id; i < rounded_up_num_edges_this_warp; i += raft::warp_size()) { + e_op_result_t e_op_result{}; + vertex_t col{}; + + if (i < static_cast(num_edges_this_warp)) { + auto key_idx_this_warp = static_cast(thrust::distance( + warp_local_degree_inclusive_sums + warp_id * raft::warp_size(), + thrust::upper_bound(thrust::seq, + warp_local_degree_inclusive_sums + warp_id * raft::warp_size(), + warp_local_degree_inclusive_sums + warp_id * raft::warp_size() + + (max_key_idx - min_key_idx), + i))); + auto local_edge_offset = + warp_key_local_edge_offsets[warp_id * raft::warp_size() + key_idx_this_warp] + + static_cast(i - + ((key_idx_this_warp == 0) + ? edge_t{0} + : warp_local_degree_inclusive_sums[warp_id * raft::warp_size() + + key_idx_this_warp - 1])); + auto key = *(key_first + (min_key_idx + key_idx_this_warp)); + vertex_t row{}; + if constexpr (std::is_same_v) { + row = key; + } else { + row = thrust::get<0>(key); + } + col = indices[local_edge_offset]; + auto row_offset = matrix_partition.get_major_offset_from_major_nocheck(row); + auto col_offset = matrix_partition.get_minor_offset_from_minor_nocheck(col); + e_op_result = evaluate_edge_op() + .compute(key, + col, + weights ? (*weights)[local_edge_offset] : weight_t{1.0}, + adj_matrix_row_value_input.get(row_offset), + adj_matrix_col_value_input.get(col_offset), + e_op); + } + auto ballot_e_op = + __ballot_sync(uint32_t{0xffffffff}, e_op_result ? uint32_t{1} : uint32_t{0}); + if (ballot_e_op) { + if (lane_id == 0) { + auto increment = __popc(ballot_e_op); + static_assert(sizeof(unsigned long long int) == sizeof(size_t)); + buffer_warp_start_indices[warp_id] = + static_cast(atomicAdd(reinterpret_cast(buffer_idx_ptr), + static_cast(increment))); + } + __syncwarp(); + if (e_op_result) { + auto buffer_warp_offset = + static_cast(__popc(ballot_e_op & ~(uint32_t{0xffffffff} << lane_id))); + push_buffer_element(col, + e_op_result, + buffer_key_output_first, + buffer_payload_output_first, + buffer_warp_start_indices[warp_id] + buffer_warp_offset); + } } } idx += gridDim.x * blockDim.x; @@ -343,39 +408,133 @@ __global__ void for_all_frontier_row_for_all_nbr_low_degree( std::is_same_v::value_type>); using payload_t = typename optional_payload_buffer_value_type_t::value; + using e_op_result_t = typename evaluate_edge_op::result_type; static_assert(!GraphViewType::is_adj_matrix_transposed, "GraphViewType should support the push model."); - auto const tid = threadIdx.x + blockIdx.x * blockDim.x; - auto idx = static_cast(tid); - - while (idx < static_cast(thrust::distance(key_first, key_last))) { - auto key = *(key_first + idx); - vertex_t row{}; - if constexpr (std::is_same_v) { - row = key; - } else { - row = thrust::get<0>(key); + auto const tid = threadIdx.x + blockIdx.x * blockDim.x; + auto const warp_id = threadIdx.x / raft::warp_size(); + auto const lane_id = tid % raft::warp_size(); + auto idx = static_cast(tid); + + __shared__ edge_t + warp_local_degree_inclusive_sums[update_frontier_v_push_if_out_nbr_for_all_block_size]; + __shared__ edge_t + warp_key_local_edge_offsets[update_frontier_v_push_if_out_nbr_for_all_block_size]; + + using WarpScan = cub::WarpScan; + __shared__ typename WarpScan::TempStorage temp_storage; + + __shared__ size_t buffer_warp_start_indices[update_frontier_v_push_if_out_nbr_for_all_block_size / + raft::warp_size()]; + + auto indices = matrix_partition.get_indices(); + auto weights = matrix_partition.get_weights(); + + vertex_t num_keys = static_cast(thrust::distance(key_first, key_last)); + auto rounded_up_num_keys = + ((static_cast(num_keys) + (raft::warp_size() - 1)) / raft::warp_size()) * + raft::warp_size(); + while (idx < rounded_up_num_keys) { + auto min_key_idx = static_cast(idx - (idx % raft::warp_size())); // inclusive + auto max_key_idx = + static_cast(std::min(static_cast(min_key_idx) + raft::warp_size(), + static_cast(num_keys))); // exclusive + + // update warp_local_degree_inclusive_sums & warp_key_local_edge_offsets + + edge_t local_degree{0}; + if (lane_id < static_cast(max_key_idx - min_key_idx)) { + auto key = *(key_first + idx); + vertex_t row{}; + if constexpr (std::is_same_v) { + row = key; + } else { + row = thrust::get<0>(key); + } + auto row_offset = matrix_partition.get_major_offset_from_major_nocheck(row); + local_degree = matrix_partition.get_local_degree(row_offset); + warp_key_local_edge_offsets[threadIdx.x] = matrix_partition.get_local_offset(row_offset); } - auto row_offset = matrix_partition.get_major_offset_from_major_nocheck(row); - vertex_t const* indices{nullptr}; - thrust::optional weights{thrust::nullopt}; - edge_t local_out_degree{}; - thrust::tie(indices, weights, local_out_degree) = matrix_partition.get_local_edges(row_offset); - for (edge_t i = 0; i < local_out_degree; ++i) { - push_if_buffer_element(matrix_partition, - key, - row_offset, - indices[i], - weights ? (*weights)[i] : weight_t{1.0}, - adj_matrix_row_value_input, - adj_matrix_col_value_input, - buffer_key_output_first, - buffer_payload_output_first, - buffer_idx_ptr, - e_op); + WarpScan(temp_storage) + .InclusiveSum(local_degree, warp_local_degree_inclusive_sums[threadIdx.x]); + __syncwarp(); + + // processes local edges for the keys in [key_first + min_key_idx, key_first + max_key_idx) + + auto num_edges_this_warp = warp_local_degree_inclusive_sums[warp_id * raft::warp_size() + + (max_key_idx - min_key_idx) - 1]; + auto rounded_up_num_edges_this_warp = + ((static_cast(num_edges_this_warp) + (raft::warp_size() - 1)) / raft::warp_size()) * + raft::warp_size(); + for (size_t i = lane_id; i < rounded_up_num_edges_this_warp; i += raft::warp_size()) { + e_op_result_t e_op_result{}; + vertex_t col{}; + + if (i < static_cast(num_edges_this_warp)) { + auto key_idx_this_warp = static_cast(thrust::distance( + warp_local_degree_inclusive_sums + warp_id * raft::warp_size(), + thrust::upper_bound(thrust::seq, + warp_local_degree_inclusive_sums + warp_id * raft::warp_size(), + warp_local_degree_inclusive_sums + warp_id * raft::warp_size() + + (max_key_idx - min_key_idx), + i))); + auto local_edge_offset = + warp_key_local_edge_offsets[warp_id * raft::warp_size() + key_idx_this_warp] + + static_cast(i - + ((key_idx_this_warp == 0) + ? edge_t{0} + : warp_local_degree_inclusive_sums[warp_id * raft::warp_size() + + key_idx_this_warp - 1])); + auto key = *(key_first + (min_key_idx + key_idx_this_warp)); + vertex_t row{}; + if constexpr (std::is_same_v) { + row = key; + } else { + row = thrust::get<0>(key); + } + col = indices[local_edge_offset]; + auto row_offset = matrix_partition.get_major_offset_from_major_nocheck(row); + auto col_offset = matrix_partition.get_minor_offset_from_minor_nocheck(col); + e_op_result = evaluate_edge_op() + .compute(key, + col, + weights ? (*weights)[local_edge_offset] : weight_t{1.0}, + adj_matrix_row_value_input.get(row_offset), + adj_matrix_col_value_input.get(col_offset), + e_op); + } + auto ballot = __ballot_sync(uint32_t{0xffffffff}, e_op_result ? uint32_t{1} : uint32_t{0}); + if (ballot > 0) { + if (lane_id == 0) { + auto increment = __popc(ballot); + static_assert(sizeof(unsigned long long int) == sizeof(size_t)); + buffer_warp_start_indices[warp_id] = + static_cast(atomicAdd(reinterpret_cast(buffer_idx_ptr), + static_cast(increment))); + } + __syncwarp(); + if (e_op_result) { + auto buffer_warp_offset = + static_cast(__popc(ballot & ~(uint32_t{0xffffffff} << lane_id))); + push_buffer_element(col, + e_op_result, + buffer_key_output_first, + buffer_payload_output_first, + buffer_warp_start_indices[warp_id] + buffer_warp_offset); + } + } } + idx += gridDim.x * blockDim.x; } } @@ -409,15 +568,23 @@ __global__ void for_all_frontier_row_for_all_nbr_mid_degree( std::is_same_v::value_type>); using payload_t = typename optional_payload_buffer_value_type_t::value; + using e_op_result_t = typename evaluate_edge_op::result_type; static_assert(!GraphViewType::is_adj_matrix_transposed, "GraphViewType should support the push model."); auto const tid = threadIdx.x + blockIdx.x * blockDim.x; static_assert(update_frontier_v_push_if_out_nbr_for_all_block_size % raft::warp_size() == 0); + auto const warp_id = threadIdx.x / raft::warp_size(); auto const lane_id = tid % raft::warp_size(); auto idx = static_cast(tid / raft::warp_size()); + __shared__ size_t buffer_warp_start_indices[update_frontier_v_push_if_out_nbr_for_all_block_size / + raft::warp_size()]; while (idx < static_cast(thrust::distance(key_first, key_last))) { auto key = *(key_first + idx); vertex_t row{}; @@ -431,18 +598,48 @@ __global__ void for_all_frontier_row_for_all_nbr_mid_degree( thrust::optional weights{thrust::nullopt}; edge_t local_out_degree{}; thrust::tie(indices, weights, local_out_degree) = matrix_partition.get_local_edges(row_offset); - for (edge_t i = lane_id; i < local_out_degree; i += raft::warp_size()) { - push_if_buffer_element(matrix_partition, - key, - row_offset, - indices[i], - weights ? (*weights)[i] : weight_t{1.0}, - adj_matrix_row_value_input, - adj_matrix_col_value_input, - buffer_key_output_first, - buffer_payload_output_first, - buffer_idx_ptr, - e_op); + auto rounded_up_local_out_degree = + ((static_cast(local_out_degree) + (raft::warp_size() - 1)) / raft::warp_size()) * + raft::warp_size(); + for (size_t i = lane_id; i < rounded_up_local_out_degree; i += raft::warp_size()) { + e_op_result_t e_op_result{}; + vertex_t col{}; + + if (i < static_cast(local_out_degree)) { + col = indices[i]; + auto col_offset = matrix_partition.get_minor_offset_from_minor_nocheck(col); + e_op_result = evaluate_edge_op() + .compute(key, + col, + weights ? (*weights)[i] : weight_t{1.0}, + adj_matrix_row_value_input.get(row_offset), + adj_matrix_col_value_input.get(col_offset), + e_op); + } + auto ballot = __ballot_sync(uint32_t{0xffffffff}, e_op_result ? uint32_t{1} : uint32_t{0}); + if (ballot > 0) { + if (lane_id == 0) { + auto increment = __popc(ballot); + static_assert(sizeof(unsigned long long int) == sizeof(size_t)); + buffer_warp_start_indices[warp_id] = + static_cast(atomicAdd(reinterpret_cast(buffer_idx_ptr), + static_cast(increment))); + } + __syncwarp(); + if (e_op_result) { + auto buffer_warp_offset = + static_cast(__popc(ballot & ~(uint32_t{0xffffffff} << lane_id))); + push_buffer_element(col, + e_op_result, + buffer_key_output_first, + buffer_payload_output_first, + buffer_warp_start_indices[warp_id] + buffer_warp_offset); + } + } } idx += gridDim.x * (blockDim.x / raft::warp_size()); @@ -478,12 +675,21 @@ __global__ void for_all_frontier_row_for_all_nbr_high_degree( std::is_same_v::value_type>); using payload_t = typename optional_payload_buffer_value_type_t::value; + using e_op_result_t = typename evaluate_edge_op::result_type; static_assert(!GraphViewType::is_adj_matrix_transposed, "GraphViewType should support the push model."); auto idx = static_cast(blockIdx.x); + using BlockScan = cub::BlockScan; + __shared__ typename BlockScan::TempStorage temp_storage; + __shared__ size_t buffer_block_start_idx; + while (idx < static_cast(thrust::distance(key_first, key_last))) { auto key = *(key_first + idx); vertex_t row{}; @@ -497,18 +703,50 @@ __global__ void for_all_frontier_row_for_all_nbr_high_degree( thrust::optional weights{thrust::nullopt}; edge_t local_out_degree{}; thrust::tie(indices, weights, local_out_degree) = matrix_partition.get_local_edges(row_offset); - for (edge_t i = threadIdx.x; i < local_out_degree; i += blockDim.x) { - push_if_buffer_element(matrix_partition, - key, - row_offset, - indices[i], - weights ? (*weights)[i] : weight_t{1.0}, - adj_matrix_row_value_input, - adj_matrix_col_value_input, - buffer_key_output_first, - buffer_payload_output_first, - buffer_idx_ptr, - e_op); + auto rounded_up_local_out_degree = + ((static_cast(local_out_degree) + + (update_frontier_v_push_if_out_nbr_for_all_block_size - 1)) / + update_frontier_v_push_if_out_nbr_for_all_block_size) * + update_frontier_v_push_if_out_nbr_for_all_block_size; + for (size_t i = threadIdx.x; i < rounded_up_local_out_degree; i += blockDim.x) { + e_op_result_t e_op_result{}; + vertex_t col{}; + edge_t buffer_block_offset{0}; + + if (i < static_cast(local_out_degree)) { + col = indices[i]; + auto col_offset = matrix_partition.get_minor_offset_from_minor_nocheck(col); + e_op_result = evaluate_edge_op() + .compute(key, + col, + weights ? (*weights)[i] : weight_t{1.0}, + adj_matrix_row_value_input.get(row_offset), + adj_matrix_col_value_input.get(col_offset), + e_op); + } + BlockScan(temp_storage) + .ExclusiveSum(e_op_result ? edge_t{1} : edge_t{0}, buffer_block_offset); + if (threadIdx.x == (blockDim.x - 1)) { + auto increment = buffer_block_offset + (e_op_result ? edge_t{1} : edge_t{0}); + static_assert(sizeof(unsigned long long int) == sizeof(size_t)); + buffer_block_start_idx = increment > 0 + ? static_cast(atomicAdd( + reinterpret_cast(buffer_idx_ptr), + static_cast(increment))) + : size_t{0} /* dummy */; + } + __syncthreads(); + if (e_op_result) { + push_buffer_element(col, + e_op_result, + buffer_key_output_first, + buffer_payload_output_first, + buffer_block_start_idx + buffer_block_offset); + } } idx += gridDim.x; @@ -544,8 +782,6 @@ size_t sort_and_reduce_buffer_elements(raft::handle_t const& handle, num_reduced_buffer_elements = static_cast(thrust::distance(buffer_key_output_first, it)); } else if constexpr (std::is_same>::value) { - // FIXME: if ReducOp is any, we may have a cheaper alternative than sort & uique (i.e. discard - // non-first elements) auto it = thrust::unique_by_key(execution_policy, buffer_key_output_first, buffer_key_output_first + num_buffer_elements, @@ -553,15 +789,6 @@ size_t sort_and_reduce_buffer_elements(raft::handle_t const& handle, num_reduced_buffer_elements = static_cast(thrust::distance(buffer_key_output_first, thrust::get<0>(it))); } else { - // FIXME: better avoid temporary buffer or at least limit the maximum buffer size (if we adopt - // CUDA cooperative group https://devblogs.nvidia.com/cooperative-groups and global sync(), we - // can use aggregate shared memory as a temporary buffer, or we can limit the buffer size, and - // split one thrust::reduce_by_key call to multiple thrust::reduce_by_key calls if the - // temporary buffer size exceeds the maximum buffer size (may be definied as percentage of the - // system HBM size or a function of the maximum number of threads in the system)) - // FIXME: actually, we can find how many unique keys are here by now. - // FIXME: if GraphViewType::is_multi_gpu is true, this should be executed on the GPU holding - // the vertex unless reduce_op is a pure function. rmm::device_uvector keys(num_buffer_elements, handle.get_stream()); auto value_buffer = allocate_dataframe_buffer(num_buffer_elements, handle.get_stream()); @@ -913,20 +1140,6 @@ void update_frontier_v_push_if_out_nbr( edge_t{0}, thrust::plus()); - // FIXME: This is highly pessimistic for single GPU (and multi-GPU as well if we maintain - // additional per column data for filtering in e_op). If we can pause & resume execution if - // buffer needs to be increased (and if we reserve address space to avoid expensive - // reallocation; - // https://devblogs.nvidia.com/introducing-low-level-gpu-virtual-memory-management/), we can - // start with a smaller buffer size (especially when the frontier size is large). - // for special cases when we can assure that there is no more than one push per destination - // (e.g. if cugraph::reduce_op::any is used), we can limit the buffer size to - // std::min(max_pushes, matrix_partition.get_minor_size()). - // For Volta+, we can limit the buffer size to std::min(max_pushes, - // matrix_partition.get_minor_size()) if the reduction operation is a pure function if we use - // locking. - // FIXME: if i != 0, this will require costly reallocation if we don't use the new CUDA feature - // to reserve address space. auto new_buffer_size = buffer_idx.value(handle.get_stream()) + max_pushes; resize_dataframe_buffer(key_buffer, new_buffer_size, handle.get_stream()); if constexpr (!std::is_same_v) { diff --git a/cpp/src/traversal/bfs_impl.cuh b/cpp/src/traversal/bfs_impl.cuh index 5e3d01e7b3c..b61289dbaf8 100644 --- a/cpp/src/traversal/bfs_impl.cuh +++ b/cpp/src/traversal/bfs_impl.cuh @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -41,6 +42,46 @@ #include namespace cugraph { + +namespace { + +template +struct e_op_t { + std:: + conditional_t, uint32_t*> + visited_flags{nullptr}; + uint32_t const* prev_visited_flags{ + nullptr}; // relevant only if multi_gpu is false (this affects only local-computing with 0 + // impact in communication volume, so this may improve performance in small-scale but + // will eat-up more memory with no benefit in performance in large-scale). + vertex_t dst_first{}; // relevant only if multi_gpu is true + + __device__ thrust::optional operator()(vertex_t src, + vertex_t dst, + thrust::nullopt_t, + thrust::nullopt_t) const + { + thrust::optional ret{}; + if constexpr (multi_gpu) { + auto dst_offset = dst - dst_first; + auto old = atomicOr(visited_flags.get_iter(dst_offset), uint8_t{1}); + ret = old == uint8_t{0} ? thrust::optional{src} : thrust::nullopt; + } else { + auto mask = uint32_t{1} << (dst % (sizeof(uint32_t) * 8)); + if (*(prev_visited_flags + (dst / (sizeof(uint32_t) * 8))) & + mask) { // check if unvisited in previous iterations + ret = thrust::nullopt; + } else { // check if unvisited in this iteration as well + auto old = atomicOr(visited_flags + (dst / (sizeof(uint32_t) * 8)), mask); + ret = (old & mask) == 0 ? thrust::optional{src} : thrust::nullopt; + } + } + return ret; + } +}; + +} // namespace + namespace detail { template @@ -132,6 +173,22 @@ void bfs(raft::handle_t const& handle, vertex_frontier(handle); vertex_frontier.get_bucket(static_cast(Bucket::cur)).insert(sources, sources + n_sources); + rmm::device_uvector visited_flags( + (push_graph_view.get_number_of_local_vertices() + (sizeof(uint32_t) * 8 - 1)) / + (sizeof(uint32_t) * 8), + handle.get_stream()); + thrust::fill(handle.get_thrust_policy(), visited_flags.begin(), visited_flags.end(), uint32_t{0}); + rmm::device_uvector prev_visited_flags( + GraphViewType::is_multi_gpu ? size_t{0} : visited_flags.size(), + handle.get_stream()); // relevant only if GraphViewType::is_multi_gpu is false + auto dst_visited_flags = + GraphViewType::is_multi_gpu + ? col_properties_t(handle, push_graph_view) + : col_properties_t(); // relevant only if GraphViewType::is_multi_gpu is true + if constexpr (GraphViewType::is_multi_gpu) { + dst_visited_flags.fill(uint8_t{0}, handle.get_stream()); + } // 4. BFS iteration vertex_t depth{0}; @@ -139,8 +196,28 @@ void bfs(raft::handle_t const& handle, if (direction_optimizing) { CUGRAPH_FAIL("unimplemented."); } else { - auto vertex_partition = vertex_partition_device_view_t( - push_graph_view.get_vertex_partition_view()); + if (GraphViewType::is_multi_gpu) { + copy_to_adj_matrix_col(handle, + push_graph_view, + vertex_frontier.get_bucket(static_cast(Bucket::cur)).begin(), + vertex_frontier.get_bucket(static_cast(Bucket::cur)).end(), + thrust::make_constant_iterator(uint8_t{1}), + dst_visited_flags); + } else { + thrust::copy(handle.get_thrust_policy(), + visited_flags.begin(), + visited_flags.end(), + prev_visited_flags.begin()); + } + + e_op_t e_op{}; + if constexpr (GraphViewType::is_multi_gpu) { + e_op.visited_flags = dst_visited_flags.mutable_device_view(); + e_op.dst_first = push_graph_view.get_local_adj_matrix_partition_col_first(); + } else { + e_op.visited_flags = visited_flags.data(); + e_op.prev_visited_flags = prev_visited_flags.data(); + } update_frontier_v_push_if_out_nbr( handle, @@ -150,6 +227,12 @@ void bfs(raft::handle_t const& handle, std::vector{static_cast(Bucket::next)}, dummy_properties_t{}.device_view(), dummy_properties_t{}.device_view(), +#if 1 + e_op, +#else + // FIXME: need to test more about the performance trade-offs between additional + // communication in updating dst_visited_flags (+ using atomics) vs reduced number of pushes + // (leading to both less computation & communication in reduction) [vertex_partition, distances] __device__( vertex_t src, vertex_t dst, auto src_val, auto dst_val) { auto push = true; @@ -160,6 +243,7 @@ void bfs(raft::handle_t const& handle, } return push ? thrust::optional{src} : thrust::nullopt; }, +#endif reduce_op::any(), distances, thrust::make_zip_iterator(thrust::make_tuple(distances, predecessor_first)), diff --git a/cpp/tests/traversal/bfs_test.cpp b/cpp/tests/traversal/bfs_test.cpp index 57a2304f26e..5b75329ba46 100644 --- a/cpp/tests/traversal/bfs_test.cpp +++ b/cpp/tests/traversal/bfs_test.cpp @@ -110,7 +110,7 @@ class Tests_BFS : public ::testing::TestWithParam( - handle, input_usecase, true, renumber); + handle, input_usecase, false, renumber); if (cugraph::test::g_perf) { RAFT_CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement @@ -157,7 +157,7 @@ class Tests_BFS : public ::testing::TestWithParam( - handle, input_usecase, true, false); + handle, input_usecase, false, false); } auto unrenumbered_graph_view = renumber ? unrenumbered_graph.view() : graph_view;