diff --git a/cpp/src/io/json/json_tree.cu b/cpp/src/io/json/json_tree.cu index dbf026c351e..cf041b02a20 100644 --- a/cpp/src/io/json/json_tree.cu +++ b/cpp/src/io/json/json_tree.cu @@ -29,6 +29,8 @@ #include +#include + #include #include #include @@ -39,6 +41,7 @@ #include #include #include +#include #include #include #include @@ -125,6 +128,75 @@ struct node_ranges { } }; +/** + * @brief Returns stable sorted keys and their sorted order + * + * Uses cub stable radix sort. The order is internally generated, hence it saves a copy and memory. + * Since the key and order are returned, using double buffer helps to avoid extra copy to user + * provided output iterator. + * + * @tparam IndexType sorted order type + * @tparam KeyType key type + * @param keys keys to sort + * @param stream CUDA stream used for device memory operations and kernel launches. + * @return Sorted keys and indices producing that sorted order + */ +template +std::pair, rmm::device_uvector> stable_sorted_key_order( + cudf::device_span keys, rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + + // Determine temporary device storage requirements + rmm::device_uvector keys_buffer1(keys.size(), stream); + rmm::device_uvector keys_buffer2(keys.size(), stream); + rmm::device_uvector order_buffer1(keys.size(), stream); + rmm::device_uvector order_buffer2(keys.size(), stream); + cub::DoubleBuffer order_buffer(order_buffer1.data(), order_buffer2.data()); + cub::DoubleBuffer keys_buffer(keys_buffer1.data(), keys_buffer2.data()); + size_t temp_storage_bytes = 0; + cub::DeviceRadixSort::SortPairs( + nullptr, temp_storage_bytes, keys_buffer, order_buffer, keys.size()); + rmm::device_buffer d_temp_storage(temp_storage_bytes, stream); + + thrust::copy(rmm::exec_policy(stream), keys.begin(), keys.end(), keys_buffer1.begin()); + thrust::sequence(rmm::exec_policy(stream), order_buffer1.begin(), order_buffer1.end()); + + cub::DeviceRadixSort::SortPairs( + d_temp_storage.data(), temp_storage_bytes, keys_buffer, 
order_buffer, keys.size()); + + return std::pair{keys_buffer.Current() == keys_buffer1.data() ? std::move(keys_buffer1) + : std::move(keys_buffer2), + order_buffer.Current() == order_buffer1.data() ? std::move(order_buffer1) + : std::move(order_buffer2)}; +} + +/** + * @brief Propagate parent node to siblings from first sibling. + * + * @param node_levels Node levels of each node + * @param parent_node_ids parent node ids initialized for first child of each push node, + * and other siblings are initialized to -1. + * @param stream CUDA stream used for device memory operations and kernel launches. + */ +void propagate_parent_to_siblings(cudf::device_span node_levels, + cudf::device_span parent_node_ids, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + auto [sorted_node_levels, sorted_order] = stable_sorted_key_order(node_levels, stream); + // instead of gather, using permutation_iterator, which is ~17% faster + + thrust::inclusive_scan_by_key( + rmm::exec_policy(stream), + sorted_node_levels.begin(), + sorted_node_levels.end(), + thrust::make_permutation_iterator(parent_node_ids.begin(), sorted_order.begin()), + thrust::make_permutation_iterator(parent_node_ids.begin(), sorted_order.begin()), + thrust::equal_to{}, + thrust::maximum{}); +} + // Generates a tree representation of the given tokens, token_indices. tree_meta_t get_tree_representation(device_span tokens, device_span token_indices, @@ -166,12 +238,86 @@ tree_meta_t get_tree_representation(device_span tokens, }; auto num_tokens = tokens.size(); - auto is_node_it = thrust::make_transform_iterator( - tokens.begin(), - [is_node] __device__(auto t) -> size_type { return static_cast(is_node(t)); }); - auto num_nodes = thrust::count_if( + auto num_nodes = thrust::count_if( rmm::exec_policy(stream), tokens.begin(), tokens.begin() + num_tokens, is_node); + // Node levels: transform_exclusive_scan, copy_if. 
+ rmm::device_uvector node_levels(num_nodes, stream, mr); + { + rmm::device_uvector token_levels(num_tokens, stream); + auto push_pop_it = thrust::make_transform_iterator( + tokens.begin(), [does_push, does_pop] __device__(PdaTokenT const token) -> size_type { + return does_push(token) - does_pop(token); + }); + thrust::exclusive_scan( + rmm::exec_policy(stream), push_pop_it, push_pop_it + num_tokens, token_levels.begin()); + + auto node_levels_end = thrust::copy_if(rmm::exec_policy(stream), + token_levels.begin(), + token_levels.begin() + num_tokens, + tokens.begin(), + node_levels.begin(), + is_node); + CUDF_EXPECTS(thrust::distance(node_levels.begin(), node_levels_end) == num_nodes, + "node level count mismatch"); + } + + // Node parent ids: + // previous push node_id transform, stable sort by level, segmented scan with Max, reorder. + rmm::device_uvector parent_node_ids(num_nodes, stream, mr); + // This block of code is a generalized logical stack algorithm. TODO: make this a separate function. + { + rmm::device_uvector node_token_ids(num_nodes, stream); + thrust::copy_if(rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(0) + num_tokens, + tokens.begin(), + node_token_ids.begin(), + is_node); + + // previous push node_id + // if previous node is a push, then i-1 + // if previous node is FE, then i-2 (returns FB's index) + // if previous node is SMB and its previous node is a push, then i-2 + // e.g. `{ SMB FB FE VB VE SME` -> `{` index as FB's parent. 
+ // else -1 + auto first_childs_parent_token_id = [tokens_gpu = + tokens.begin()] __device__(auto i) -> NodeIndexT { + if (i <= 0) { return -1; } + if (tokens_gpu[i - 1] == token_t::StructBegin or tokens_gpu[i - 1] == token_t::ListBegin) { + return i - 1; + } else if (tokens_gpu[i - 1] == token_t::FieldNameEnd) { + return i - 2; + } else if (tokens_gpu[i - 1] == token_t::StructMemberBegin and + (tokens_gpu[i - 2] == token_t::StructBegin || + tokens_gpu[i - 2] == token_t::ListBegin)) { + return i - 2; + } else { + return -1; + } + }; + + thrust::transform( + rmm::exec_policy(stream), + node_token_ids.begin(), + node_token_ids.end(), + parent_node_ids.begin(), + [node_ids_gpu = node_token_ids.begin(), num_nodes, first_childs_parent_token_id] __device__( + NodeIndexT const tid) -> NodeIndexT { + auto pid = first_childs_parent_token_id(tid); + return pid < 0 + ? parent_node_sentinel + : thrust::lower_bound(thrust::seq, node_ids_gpu, node_ids_gpu + num_nodes, pid) - + node_ids_gpu; + // parent_node_sentinel is -1, useful for segmented max operation below + }); + } + // Propagate parent node to siblings from first sibling - inplace. + propagate_parent_to_siblings( + cudf::device_span{node_levels.data(), node_levels.size()}, + parent_node_ids, + stream); + // Node categories: copy_if with transform. rmm::device_uvector node_categories(num_nodes, stream, mr); auto node_categories_it = @@ -184,24 +330,6 @@ tree_meta_t get_tree_representation(device_span tokens, CUDF_EXPECTS(node_categories_end - node_categories_it == num_nodes, "node category count mismatch"); - // Node levels: transform_exclusive_scan, copy_if. 
- rmm::device_uvector token_levels(num_tokens, stream); - auto push_pop_it = thrust::make_transform_iterator( - tokens.begin(), [does_push, does_pop] __device__(PdaTokenT const token) -> size_type { - return does_push(token) - does_pop(token); - }); - thrust::exclusive_scan( - rmm::exec_policy(stream), push_pop_it, push_pop_it + num_tokens, token_levels.begin()); - - rmm::device_uvector node_levels(num_nodes, stream, mr); - auto node_levels_end = thrust::copy_if(rmm::exec_policy(stream), - token_levels.begin(), - token_levels.begin() + num_tokens, - tokens.begin(), - node_levels.begin(), - is_node); - CUDF_EXPECTS(node_levels_end - node_levels.begin() == num_nodes, "node level count mismatch"); - // Node ranges: copy_if with transform. rmm::device_uvector node_range_begin(num_nodes, stream, mr); rmm::device_uvector node_range_end(num_nodes, stream, mr); @@ -223,69 +351,6 @@ tree_meta_t get_tree_representation(device_span tokens, }); CUDF_EXPECTS(node_range_out_end - node_range_out_it == num_nodes, "node range count mismatch"); - // Node parent ids: previous push token_id transform, stable sort, segmented scan with Max, - // reorder, copy_if. This one is sort of logical stack. But more generalized. - // TODO: make it own function. - rmm::device_uvector parent_token_ids(num_tokens, stream); - rmm::device_uvector initial_order(num_tokens, stream); - // TODO re-write the algorithm to work only on nodes, not tokens. - - thrust::sequence(rmm::exec_policy(stream), initial_order.begin(), initial_order.end()); - thrust::tabulate(rmm::exec_policy(stream), - parent_token_ids.begin(), - parent_token_ids.end(), - [does_push, tokens_gpu = tokens.begin()] __device__(auto i) -> size_type { - return (i > 0) && does_push(tokens_gpu[i - 1]) ? i - 1 : -1; - // -1, not sentinel used here because of max operation below - }); - - auto out_pid = thrust::make_zip_iterator(parent_token_ids.data(), initial_order.data()); - // Uses radix sort for builtin types. 
- thrust::stable_sort_by_key(rmm::exec_policy(stream), - token_levels.data(), - token_levels.data() + token_levels.size(), - out_pid); - - // SegmentedScan Max. - thrust::inclusive_scan_by_key(rmm::exec_policy(stream), - token_levels.data(), - token_levels.data() + token_levels.size(), - parent_token_ids.data(), - parent_token_ids.data(), - thrust::equal_to{}, - thrust::maximum{}); - - // scatter to restore the original order. - { - rmm::device_uvector temp_storage(num_tokens, stream); - thrust::scatter(rmm::exec_policy(stream), - parent_token_ids.begin(), - parent_token_ids.end(), - initial_order.begin(), - temp_storage.begin()); - thrust::copy( - rmm::exec_policy(stream), temp_storage.begin(), temp_storage.end(), parent_token_ids.begin()); - } - - rmm::device_uvector node_ids_gpu(num_tokens, stream); - thrust::exclusive_scan( - rmm::exec_policy(stream), is_node_it, is_node_it + num_tokens, node_ids_gpu.begin()); - - rmm::device_uvector parent_node_ids(num_nodes, stream, mr); - auto parent_node_ids_it = thrust::make_transform_iterator( - parent_token_ids.begin(), - [node_ids_gpu = node_ids_gpu.begin()] __device__(size_type const pid) -> NodeIndexT { - return pid < 0 ? parent_node_sentinel : node_ids_gpu[pid]; - }); - auto parent_node_ids_end = thrust::copy_if(rmm::exec_policy(stream), - parent_node_ids_it, - parent_node_ids_it + parent_token_ids.size(), - tokens.begin(), - parent_node_ids.begin(), - is_node); - CUDF_EXPECTS(parent_node_ids_end - parent_node_ids.begin() == num_nodes, - "parent node id gather mismatch"); - return {std::move(node_categories), std::move(parent_node_ids), std::move(node_levels),