From 836f800e61acafa0fa6b3c7d9826904f0ba2ad06 Mon Sep 17 00:00:00 2001 From: Conor Hoekstra <36027403+codereport@users.noreply.github.com> Date: Wed, 1 Dec 2021 16:46:14 -0500 Subject: [PATCH 01/25] Use CTAD with Thrust function objects (#9768) While reviewing another PR, I noticed unnecessary usage of explicit template parameters with Thrust function objects and decided to open a small PR to clean this up (CTAD showed up in C++17). CI depends on https://github.com/rapidsai/cudf/pull/9766 Authors: - Conor Hoekstra (https://github.com/codereport) Approvers: - Ram (Ramakrishna Prabhu) (https://github.com/rgsl888prabhu) - Mike Wilson (https://github.com/hyperbolic2346) - Mark Harris (https://github.com/harrism) URL: https://github.com/rapidsai/cudf/pull/9768 --- cpp/include/cudf/strings/detail/gather.cuh | 2 +- cpp/include/cudf_test/column_wrapper.hpp | 7 ++----- cpp/src/copying/concatenate.cu | 2 +- cpp/src/groupby/sort/group_merge_m2.cu | 4 ++-- cpp/src/groupby/sort/group_rank_scan.cu | 2 +- cpp/src/groupby/sort/group_scan_util.cuh | 6 +++--- .../sort/group_single_pass_reduction_util.cuh | 16 ++++++++-------- cpp/src/groupby/sort/group_tdigest.cu | 10 +++++----- cpp/src/join/hash_join.cu | 2 +- cpp/src/join/join_utils.cu | 2 +- .../lists/combine/concatenate_list_elements.cu | 2 +- cpp/src/lists/contains.cu | 7 ++----- cpp/src/lists/interleave_columns.cu | 8 ++++---- cpp/src/quantiles/tdigest/tdigest.cu | 7 ++----- cpp/src/reductions/scan/scan_inclusive.cu | 9 ++++----- cpp/src/rolling/grouped_rolling.cu | 6 +++--- cpp/src/rolling/rolling_collect_list.cu | 2 +- cpp/src/sort/rank.cu | 10 +++++----- cpp/src/strings/copying/concatenate.cu | 2 +- cpp/src/strings/findall.cu | 7 ++----- cpp/src/strings/repeat_strings.cu | 2 +- cpp/src/strings/split/split.cu | 14 ++++---------- cpp/tests/iterator/iterator_tests.cuh | 11 +++-------- .../apply_boolean_mask_tests.cpp | 4 ++-- cpp/tests/strings/fixed_point_tests.cpp | 2 +- cpp/tests/transform/row_bit_count_test.cu | 6 ++---- 26 files changed, 63 insertions(+), 89 deletions(-) diff --git a/cpp/include/cudf/strings/detail/gather.cuh b/cpp/include/cudf/strings/detail/gather.cuh index ec4a88a0e46..eb7258830ce 100644 --- a/cpp/include/cudf/strings/detail/gather.cuh +++ b/cpp/include/cudf/strings/detail/gather.cuh @@ -315,7 +315,7 @@ std::unique_ptr gather( d_out_offsets + output_count, [] __device__(auto size) { return static_cast(size); }, size_t{0}, - thrust::plus{}); + thrust::plus{}); CUDF_EXPECTS(total_bytes < static_cast(std::numeric_limits::max()), "total size of output strings is too large for a cudf column"); diff --git a/cpp/include/cudf_test/column_wrapper.hpp b/cpp/include/cudf_test/column_wrapper.hpp index cd2ac9f3ec1..ccfdde2270c 100644 --- a/cpp/include/cudf_test/column_wrapper.hpp +++ b/cpp/include/cudf_test/column_wrapper.hpp @@ -1502,11 +1502,8 @@ class lists_column_wrapper : public detail::column_wrapper { // concatenate them together, skipping children that are null. std::vector children; - thrust::copy_if(std::cbegin(cols), - std::cend(cols), - valids, // stencil - std::back_inserter(children), - thrust::identity{}); + thrust::copy_if( + std::cbegin(cols), std::cend(cols), valids, std::back_inserter(children), thrust::identity{}); auto data = children.empty() ? 
cudf::empty_like(expected_hierarchy) : concatenate(children); diff --git a/cpp/src/copying/concatenate.cu b/cpp/src/copying/concatenate.cu index f4b6a8bf5fd..34c0cea683e 100644 --- a/cpp/src/copying/concatenate.cu +++ b/cpp/src/copying/concatenate.cu @@ -79,7 +79,7 @@ auto create_device_views(host_span views, rmm::cuda_stream_vi device_views.cend(), std::next(offsets.begin()), [](auto const& col) { return col.size(); }, - thrust::plus{}); + thrust::plus{}); auto d_offsets = make_device_uvector_async(offsets, stream); auto const output_size = offsets.back(); diff --git a/cpp/src/groupby/sort/group_merge_m2.cu b/cpp/src/groupby/sort/group_merge_m2.cu index 4e2a5b68abc..bde7c985df1 100644 --- a/cpp/src/groupby/sort/group_merge_m2.cu +++ b/cpp/src/groupby/sort/group_merge_m2.cu @@ -173,8 +173,8 @@ std::unique_ptr group_merge_m2(column_view const& values, // Generate bitmask for the output. // Only mean and M2 values can be nullable. Count column must be non-nullable. - auto [null_mask, null_count] = cudf::detail::valid_if( - validities.begin(), validities.end(), thrust::identity{}, stream, mr); + auto [null_mask, null_count] = + cudf::detail::valid_if(validities.begin(), validities.end(), thrust::identity{}, stream, mr); if (null_count > 0) { result_means->set_null_mask(null_mask, null_count); // copy null_mask result_M2s->set_null_mask(std::move(null_mask), null_count); // take over null_mask diff --git a/cpp/src/groupby/sort/group_rank_scan.cu b/cpp/src/groupby/sort/group_rank_scan.cu index 935ef9554a9..f36bdc0a660 100644 --- a/cpp/src/groupby/sort/group_rank_scan.cu +++ b/cpp/src/groupby/sort/group_rank_scan.cu @@ -79,7 +79,7 @@ std::unique_ptr rank_generator(column_view const& order_by, group_labels.end(), mutable_ranks.begin(), mutable_ranks.begin(), - thrust::equal_to{}, + thrust::equal_to{}, scan_op); return ranks; diff --git a/cpp/src/groupby/sort/group_scan_util.cuh b/cpp/src/groupby/sort/group_scan_util.cuh index ae3e3232e06..e25fdd6fc27 100644 --- a/cpp/src/groupby/sort/group_scan_util.cuh +++ b/cpp/src/groupby/sort/group_scan_util.cuh @@ -115,7 +115,7 @@ struct group_scan_functor() group_labels.end(), inp_iter, out_iter, - thrust::equal_to{}, + thrust::equal_to{}, binop); }; @@ -160,7 +160,7 @@ struct group_scan_functor{}, + thrust::equal_to{}, binop); }; @@ -214,7 +214,7 @@ struct group_scan_functor{}, + thrust::equal_to{}, binop); }; diff --git a/cpp/src/groupby/sort/group_single_pass_reduction_util.cuh b/cpp/src/groupby/sort/group_single_pass_reduction_util.cuh index decb127b264..95a36f40e57 100644 --- a/cpp/src/groupby/sort/group_single_pass_reduction_util.cuh +++ b/cpp/src/groupby/sort/group_single_pass_reduction_util.cuh @@ -191,7 +191,7 @@ struct group_reduction_functor{}, + thrust::equal_to{}, binop); }; @@ -215,10 +215,10 @@ struct group_reduction_functor validity(num_groups, stream); do_reduction(cudf::detail::make_validity_iterator(*d_values_ptr), validity.begin(), - thrust::logical_or{}); + thrust::logical_or{}); - auto [null_mask, null_count] = cudf::detail::valid_if( - validity.begin(), validity.end(), thrust::identity{}, stream, mr); + auto [null_mask, null_count] = + cudf::detail::valid_if(validity.begin(), validity.end(), thrust::identity{}, stream, mr); result->set_null_mask(std::move(null_mask), null_count); } return result; @@ -264,7 +264,7 @@ struct group_reduction_functor< inp_iter, thrust::make_discard_iterator(), out_iter, - thrust::equal_to{}, + thrust::equal_to{}, binop); }; @@ -283,10 +283,10 @@ struct group_reduction_functor< auto validity = 
rmm::device_uvector(num_groups, stream); do_reduction(cudf::detail::make_validity_iterator(*d_values_ptr), validity.begin(), - thrust::logical_or{}); + thrust::logical_or{}); - auto [null_mask, null_count] = cudf::detail::valid_if( - validity.begin(), validity.end(), thrust::identity{}, stream, mr); + auto [null_mask, null_count] = + cudf::detail::valid_if(validity.begin(), validity.end(), thrust::identity{}, stream, mr); result->set_null_mask(std::move(null_mask), null_count); } else { auto const binop = diff --git a/cpp/src/groupby/sort/group_tdigest.cu b/cpp/src/groupby/sort/group_tdigest.cu index 146a6a8c31c..551eb128231 100644 --- a/cpp/src/groupby/sort/group_tdigest.cu +++ b/cpp/src/groupby/sort/group_tdigest.cu @@ -625,7 +625,7 @@ std::unique_ptr compute_tdigests(int delta, centroids_begin, // values thrust::make_discard_iterator(), // key output output, // output - thrust::equal_to{}, // key equality check + thrust::equal_to{}, // key equality check merge_centroids{}); // create final tdigest column @@ -850,8 +850,8 @@ std::unique_ptr group_merge_tdigest(column_view const& input, min_iter, thrust::make_discard_iterator(), merged_min_col->mutable_view().begin(), - thrust::equal_to{}, // key equality check - thrust::minimum{}); + thrust::equal_to{}, // key equality check + thrust::minimum{}); auto merged_max_col = cudf::make_numeric_column( data_type{type_id::FLOAT64}, num_groups, mask_state::UNALLOCATED, stream, mr); @@ -864,8 +864,8 @@ std::unique_ptr group_merge_tdigest(column_view const& input, max_iter, thrust::make_discard_iterator(), merged_max_col->mutable_view().begin(), - thrust::equal_to{}, // key equality check - thrust::maximum{}); + thrust::equal_to{}, // key equality check + thrust::maximum{}); // for any empty groups, set the min and max to be 0. not technically necessary but it makes // testing simpler. 
diff --git a/cpp/src/join/hash_join.cu b/cpp/src/join/hash_join.cu index e4bd1938ecc..c5b680f129e 100644 --- a/cpp/src/join/hash_join.cu +++ b/cpp/src/join/hash_join.cu @@ -266,7 +266,7 @@ std::size_t get_full_join_size(cudf::table_device_view build_table, left_join_complement_size = thrust::count_if(rmm::exec_policy(stream), invalid_index_map->begin(), invalid_index_map->end(), - thrust::identity()); + thrust::identity()); } return join_size + left_join_complement_size; } diff --git a/cpp/src/join/join_utils.cu b/cpp/src/join/join_utils.cu index 4aca4b4a9cf..9e98f87e7f0 100644 --- a/cpp/src/join/join_utils.cu +++ b/cpp/src/join/join_utils.cu @@ -136,7 +136,7 @@ get_left_join_indices_complement(std::unique_ptr> thrust::make_counting_iterator(end_counter), invalid_index_map->begin(), right_indices_complement->begin(), - thrust::identity()) - + thrust::identity{}) - right_indices_complement->begin(); right_indices_complement->resize(indices_count, stream); } diff --git a/cpp/src/lists/combine/concatenate_list_elements.cu b/cpp/src/lists/combine/concatenate_list_elements.cu index 4bef312b396..2ddede97ce4 100644 --- a/cpp/src/lists/combine/concatenate_list_elements.cu +++ b/cpp/src/lists/combine/concatenate_list_elements.cu @@ -225,7 +225,7 @@ std::unique_ptr concatenate_lists_nullifying_rows(column_view const& inp auto list_entries = gather_list_entries(input, offsets_view, num_rows, num_output_entries, stream, mr); auto [null_mask, null_count] = cudf::detail::valid_if( - list_validities.begin(), list_validities.end(), thrust::identity{}, stream, mr); + list_validities.begin(), list_validities.end(), thrust::identity{}, stream, mr); return make_lists_column(num_rows, std::move(list_offsets), diff --git a/cpp/src/lists/contains.cu b/cpp/src/lists/contains.cu index bdbc9ae013c..b48982d205a 100644 --- a/cpp/src/lists/contains.cu +++ b/cpp/src/lists/contains.cu @@ -74,11 +74,8 @@ struct lookup_functor { if (!search_keys_have_nulls && !input_lists.has_nulls() && !input_lists.child().has_nulls()) { return {rmm::device_buffer{0, stream, mr}, size_type{0}}; } else { - return cudf::detail::valid_if(result_validity.begin(), - result_validity.end(), - thrust::identity{}, - stream, - mr); + return cudf::detail::valid_if( + result_validity.begin(), result_validity.end(), thrust::identity{}, stream, mr); } } diff --git a/cpp/src/lists/interleave_columns.cu b/cpp/src/lists/interleave_columns.cu index b9b73d98ed2..220cb25a942 100644 --- a/cpp/src/lists/interleave_columns.cu +++ b/cpp/src/lists/interleave_columns.cu @@ -228,8 +228,8 @@ struct interleave_list_entries_impl{}, stream, mr); + auto [null_mask, null_count] = + cudf::detail::valid_if(validities.begin(), validities.end(), thrust::identity{}, stream, mr); return make_strings_column(num_output_entries, std::move(offsets_column), @@ -306,7 +306,7 @@ struct interleave_list_entries_impl( if (data_has_null_mask) { auto [null_mask, null_count] = cudf::detail::valid_if( - validities.begin(), validities.end(), thrust::identity{}, stream, mr); + validities.begin(), validities.end(), thrust::identity{}, stream, mr); if (null_count > 0) { output->set_null_mask(null_mask, null_count); } } @@ -405,7 +405,7 @@ std::unique_ptr interleave_columns(table_view const& input, } auto [null_mask, null_count] = cudf::detail::valid_if( - list_validities.begin(), list_validities.end(), thrust::identity{}, stream, mr); + list_validities.begin(), list_validities.end(), thrust::identity{}, stream, mr); return make_lists_column(num_output_lists, std::move(list_offsets), 
std::move(list_entries), diff --git a/cpp/src/quantiles/tdigest/tdigest.cu b/cpp/src/quantiles/tdigest/tdigest.cu index 57c221b15ed..18e7d02d086 100644 --- a/cpp/src/quantiles/tdigest/tdigest.cu +++ b/cpp/src/quantiles/tdigest/tdigest.cu @@ -348,11 +348,8 @@ std::unique_ptr percentile_approx(tdigest_column_view const& input, if (null_count == 0) { return std::pair{rmm::device_buffer{}, null_count}; } - return cudf::detail::valid_if(tdigest_is_empty, - tdigest_is_empty + tdv.size(), - thrust::logical_not{}, - stream, - mr); + return cudf::detail::valid_if( + tdigest_is_empty, tdigest_is_empty + tdv.size(), thrust::logical_not{}, stream, mr); }(); return cudf::make_lists_column( diff --git a/cpp/src/reductions/scan/scan_inclusive.cu b/cpp/src/reductions/scan/scan_inclusive.cu index 70f5ca90539..b0e761c4c3b 100644 --- a/cpp/src/reductions/scan/scan_inclusive.cu +++ b/cpp/src/reductions/scan/scan_inclusive.cu @@ -50,11 +50,10 @@ rmm::device_buffer mask_scan(column_view const& input_view, auto valid_itr = detail::make_validity_iterator(*d_input); auto first_null_position = [&] { - size_type const first_null = thrust::find_if_not(rmm::exec_policy(stream), - valid_itr, - valid_itr + input_view.size(), - thrust::identity{}) - - valid_itr; + size_type const first_null = + thrust::find_if_not( + rmm::exec_policy(stream), valid_itr, valid_itr + input_view.size(), thrust::identity{}) - + valid_itr; size_type const exclusive_offset = (inclusive == scan_type::EXCLUSIVE) ? 1 : 0; return std::min(input_view.size(), first_null + exclusive_offset); }(); diff --git a/cpp/src/rolling/grouped_rolling.cu b/cpp/src/rolling/grouped_rolling.cu index 509f67bb5c6..5a7f15148d8 100644 --- a/cpp/src/rolling/grouped_rolling.cu +++ b/cpp/src/rolling/grouped_rolling.cu @@ -142,8 +142,8 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, preceding_window] __device__(size_type idx) { auto group_label = d_group_labels[idx]; auto group_start = d_group_offsets[group_label]; - return thrust::minimum{}(preceding_window, - idx - group_start + 1); // Preceding includes current row. + return thrust::minimum{}(preceding_window, + idx - group_start + 1); // Preceding includes current row. }; auto following_calculator = [d_group_offsets = group_offsets.data(), @@ -152,7 +152,7 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, auto group_label = d_group_labels[idx]; auto group_end = d_group_offsets[group_label + 1]; // Cannot fall off the end, since offsets // is capped with `input.size()`. 
- return thrust::minimum{}(following_window, (group_end - 1) - idx); + return thrust::minimum{}(following_window, (group_end - 1) - idx); }; if (aggr.kind == aggregation::CUDA || aggr.kind == aggregation::PTX) { diff --git a/cpp/src/rolling/rolling_collect_list.cu b/cpp/src/rolling/rolling_collect_list.cu index ecef90dc8e1..30c39bde7d2 100644 --- a/cpp/src/rolling/rolling_collect_list.cu +++ b/cpp/src/rolling/rolling_collect_list.cu @@ -75,7 +75,7 @@ std::unique_ptr get_list_child_to_list_row_mapping(cudf::column_view con per_row_mapping_begin, per_row_mapping_begin + num_child_rows, per_row_mapping_begin, - thrust::maximum{}); + thrust::maximum{}); return per_row_mapping; } diff --git a/cpp/src/sort/rank.cu b/cpp/src/sort/rank.cu index c8a908e44cd..e9589e6c4b3 100644 --- a/cpp/src/sort/rank.cu +++ b/cpp/src/sort/rank.cu @@ -117,7 +117,7 @@ void tie_break_ranks_transform(cudf::device_span dense_rank_sor tie_iter, thrust::make_discard_iterator(), tie_sorted.begin(), - thrust::equal_to{}, + thrust::equal_to{}, tie_breaker); auto sorted_tied_rank = thrust::make_transform_iterator( dense_rank_sorted.begin(), @@ -171,8 +171,8 @@ void rank_min(cudf::device_span group_keys, thrust::make_counting_iterator(1), sorted_order_view, rank_mutable_view.begin(), - thrust::minimum{}, - thrust::identity{}, + thrust::minimum{}, + thrust::identity{}, stream); } @@ -189,8 +189,8 @@ void rank_max(cudf::device_span group_keys, thrust::make_counting_iterator(1), sorted_order_view, rank_mutable_view.begin(), - thrust::maximum{}, - thrust::identity{}, + thrust::maximum{}, + thrust::identity{}, stream); } diff --git a/cpp/src/strings/copying/concatenate.cu b/cpp/src/strings/copying/concatenate.cu index db8b37a9592..3822fa8bf5a 100644 --- a/cpp/src/strings/copying/concatenate.cu +++ b/cpp/src/strings/copying/concatenate.cu @@ -96,7 +96,7 @@ auto create_strings_device_views(host_span views, rmm::cuda_s device_views_ptr + views.size(), std::next(d_partition_offsets.begin()), chars_size_transform{}, - thrust::plus{}); + thrust::plus{}); auto const output_chars_size = d_partition_offsets.back_element(stream); stream.synchronize(); // ensure copy of output_chars_size is complete before returning diff --git a/cpp/src/strings/findall.cu b/cpp/src/strings/findall.cu index 3ab5b55020c..8d96f0de415 100644 --- a/cpp/src/strings/findall.cu +++ b/cpp/src/strings/findall.cu @@ -153,11 +153,8 @@ std::unique_ptr findall_re( std::vector> results; - size_type const columns = thrust::reduce(rmm::exec_policy(stream), - find_counts.begin(), - find_counts.end(), - 0, - thrust::maximum{}); + size_type const columns = thrust::reduce( + rmm::exec_policy(stream), find_counts.begin(), find_counts.end(), 0, thrust::maximum{}); // boundary case: if no columns, return all nulls column (issue #119) if (columns == 0) results.emplace_back(std::make_unique( diff --git a/cpp/src/strings/repeat_strings.cu b/cpp/src/strings/repeat_strings.cu index 458f3ed885c..7820e0064a6 100644 --- a/cpp/src/strings/repeat_strings.cu +++ b/cpp/src/strings/repeat_strings.cu @@ -369,7 +369,7 @@ std::pair, int64_t> repeat_strings_output_sizes( thrust::make_counting_iterator(strings_count), fn, int64_t{0}, - thrust::plus{}); + thrust::plus{}); return std::make_pair(std::move(output_sizes), total_bytes); } diff --git a/cpp/src/strings/split/split.cu b/cpp/src/strings/split/split.cu index 5113b418501..c6e52a79059 100644 --- a/cpp/src/strings/split/split.cu +++ b/cpp/src/strings/split/split.cu @@ -490,11 +490,8 @@ std::unique_ptr
split_fn(strings_column_view const& strings_column, }); // the columns_count is the maximum number of tokens for any string - auto const columns_count = thrust::reduce(rmm::exec_policy(stream), - token_counts.begin(), - token_counts.end(), - 0, - thrust::maximum{}); + auto const columns_count = thrust::reduce( + rmm::exec_policy(stream), token_counts.begin(), token_counts.end(), 0, thrust::maximum{}); // boundary case: if no columns, return one null column (custrings issue #119) if (columns_count == 0) { results.push_back(std::make_unique( @@ -748,11 +745,8 @@ std::unique_ptr
whitespace_split_fn(size_type strings_count, [tokenizer] __device__(size_type idx) { return tokenizer.count_tokens(idx); }); // column count is the maximum number of tokens for any string - size_type const columns_count = thrust::reduce(rmm::exec_policy(stream), - token_counts.begin(), - token_counts.end(), - 0, - thrust::maximum{}); + size_type const columns_count = thrust::reduce( + rmm::exec_policy(stream), token_counts.begin(), token_counts.end(), 0, thrust::maximum{}); std::vector> results; // boundary case: if no columns, return one null column (issue #119) diff --git a/cpp/tests/iterator/iterator_tests.cuh b/cpp/tests/iterator/iterator_tests.cuh index 07eb595449c..d93c1275122 100644 --- a/cpp/tests/iterator/iterator_tests.cuh +++ b/cpp/tests/iterator/iterator_tests.cuh @@ -51,13 +51,8 @@ struct IteratorTest : public cudf::test::BaseFixture { // Get temporary storage size size_t temp_storage_bytes = 0; - cub::DeviceReduce::Reduce(nullptr, - temp_storage_bytes, - d_in, - dev_result.begin(), - num_items, - thrust::minimum{}, - init); + cub::DeviceReduce::Reduce( + nullptr, temp_storage_bytes, d_in, dev_result.begin(), num_items, thrust::minimum{}, init); // Allocate temporary storage rmm::device_buffer d_temp_storage(temp_storage_bytes, rmm::cuda_stream_default); @@ -68,7 +63,7 @@ struct IteratorTest : public cudf::test::BaseFixture { d_in, dev_result.begin(), num_items, - thrust::minimum{}, + thrust::minimum{}, init); evaluate(expected, dev_result, "cub test"); diff --git a/cpp/tests/stream_compaction/apply_boolean_mask_tests.cpp b/cpp/tests/stream_compaction/apply_boolean_mask_tests.cpp index 813cceb0861..c80a8fba55c 100644 --- a/cpp/tests/stream_compaction/apply_boolean_mask_tests.cpp +++ b/cpp/tests/stream_compaction/apply_boolean_mask_tests.cpp @@ -204,13 +204,13 @@ TEST_F(ApplyBooleanMask, FixedPointLargeColumnTest) dec32_data.cend(), mask_data.cbegin(), std::back_inserter(expect_dec32_data), - thrust::identity()); + thrust::identity{}); thrust::copy_if(thrust::seq, dec64_data.cbegin(), dec64_data.cend(), mask_data.cbegin(), std::back_inserter(expect_dec64_data), - thrust::identity()); + thrust::identity{}); decimal32_wrapper expect_col32( expect_dec32_data.begin(), expect_dec32_data.end(), numeric::scale_type{-3}); diff --git a/cpp/tests/strings/fixed_point_tests.cpp b/cpp/tests/strings/fixed_point_tests.cpp index ce4280e0733..5872a9e5bb7 100644 --- a/cpp/tests/strings/fixed_point_tests.cpp +++ b/cpp/tests/strings/fixed_point_tests.cpp @@ -329,4 +329,4 @@ TEST_F(StringsConvertTest, DISABLED_FixedPointStringConversionOperator) auto const c = numeric::decimal128{numeric::scaled_integer{max, numeric::scale_type{-38}}}; EXPECT_EQ(static_cast(c), "1.70141183460469231731687303715884105727"); -} \ No newline at end of file +} diff --git a/cpp/tests/transform/row_bit_count_test.cu b/cpp/tests/transform/row_bit_count_test.cu index 7fb7326f221..43d63c9fd22 100644 --- a/cpp/tests/transform/row_bit_count_test.cu +++ b/cpp/tests/transform/row_bit_count_test.cu @@ -239,10 +239,8 @@ TEST_F(RowBitCount, StructsWithLists_RowsExceedingASingleBlock) // List child column = {0, 1, 2, 3, 4, ..., 2*num_rows}; auto ints = make_numeric_column(data_type{type_id::INT32}, num_rows * 2); auto ints_view = ints->mutable_view(); - thrust::tabulate(thrust::device, - ints_view.begin(), - ints_view.end(), - thrust::identity()); + thrust::tabulate( + thrust::device, ints_view.begin(), ints_view.end(), thrust::identity{}); // List offsets = {0, 2, 4, 6, 8, ..., num_rows*2}; auto list_offsets = 
make_numeric_column(data_type{type_id::INT32}, num_rows + 1); From 677e63236a81ea3c402df993845a1fdc98072c9e Mon Sep 17 00:00:00 2001 From: Conor Hoekstra <36027403+codereport@users.noreply.github.com> Date: Wed, 1 Dec 2021 16:46:25 -0500 Subject: [PATCH 02/25] Avoid overflow for `fixed_point` `cudf::cast` and performance optimization (#9772) This resolves https://github.com/rapidsai/cudf/issues/9000. When using `cudf::cast` for a wider decimal type to a narrower decimal type, you can overflow. This PR modifies the code path for this specific use case so that the "rescale" happens for the type cast. A small perf improvement was added when you have identical scales to avoid rescaling. CI depends on https://github.com/rapidsai/cudf/pull/9766 Authors: - Conor Hoekstra (https://github.com/codereport) Approvers: - Nghia Truong (https://github.com/ttnghia) - Mike Wilson (https://github.com/hyperbolic2346) - Vukasin Milovanovic (https://github.com/vuule) URL: https://github.com/rapidsai/cudf/pull/9772 --- cpp/src/unary/cast_ops.cu | 49 +++++++++++++++++++++------------- cpp/tests/unary/cast_tests.cpp | 13 +++++++++ 2 files changed, 43 insertions(+), 19 deletions(-) diff --git a/cpp/src/unary/cast_ops.cu b/cpp/src/unary/cast_ops.cu index e852b00796a..131fde11cf8 100644 --- a/cpp/src/unary/cast_ops.cu +++ b/cpp/src/unary/cast_ops.cu @@ -305,28 +305,39 @@ struct dispatch_unary_cast_to { rmm::mr::device_memory_resource* mr) { using namespace numeric; - - auto const size = input.size(); - auto temporary = - std::make_unique(cudf::data_type{type.id(), input.type().scale()}, - size, - rmm::device_buffer{size * cudf::size_of(type), stream}, - copy_bitmask(input, stream), - input.null_count()); - using SourceDeviceT = device_storage_type_t; using TargetDeviceT = device_storage_type_t; - mutable_column_view output_mutable = *temporary; - - thrust::transform(rmm::exec_policy(stream), - input.begin(), - input.end(), - output_mutable.begin(), - device_cast{}); - - // clearly there is a more efficient way to do this, can optimize in the future - return rescale(*temporary, numeric::scale_type{type.scale()}, stream, mr); + auto casted = [&]() { + auto const size = input.size(); + auto output = std::make_unique(cudf::data_type{type.id(), input.type().scale()}, + size, + rmm::device_buffer{size * cudf::size_of(type), stream}, + copy_bitmask(input, stream), + input.null_count()); + + mutable_column_view output_mutable = *output; + + thrust::transform(rmm::exec_policy(stream), + input.begin(), + input.end(), + output_mutable.begin(), + device_cast{}); + + return output; + }; + + if (input.type().scale() == type.scale()) return casted(); + + if constexpr (sizeof(SourceDeviceT) < sizeof(TargetDeviceT)) { + // device_cast BEFORE rescale when SourceDeviceT is < TargetDeviceT + auto temporary = casted(); + return detail::rescale(*temporary, scale_type{type.scale()}, stream, mr); + } else { + // device_cast AFTER rescale when SourceDeviceT is > TargetDeviceT to avoid overflow + auto temporary = detail::rescale(input, scale_type{type.scale()}, stream, mr); + return detail::cast(*temporary, type, stream, mr); + } } template view()); } + +TEST_F(FixedPointTestSingleType, Int32ToInt64Convert) +{ + using namespace numeric; + using fp_wrapperA = cudf::test::fixed_point_column_wrapper; + using fp_wrapperB = cudf::test::fixed_point_column_wrapper; + + auto const input = fp_wrapperB{{141230900000L}, scale_type{-10}}; + auto const expected = fp_wrapperA{{14123}, 
scale_type{-3}}; + auto const result = cudf::cast(input, make_fixed_point_data_type(-3)); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, result->view()); +} From 7d8a8e53f495279ae129fa46948c07230d6e77b4 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Wed, 1 Dec 2021 13:53:05 -0800 Subject: [PATCH 03/25] Allow cast decimal128 to string and add tests (#9756) Small PR that enables Decimal128 cast Authors: - Raza Jafri (https://github.com/razajafri) Approvers: - Jason Lowe (https://github.com/jlowe) URL: https://github.com/rapidsai/cudf/pull/9756 --- java/src/main/native/src/ColumnViewJni.cpp | 3 ++- .../java/ai/rapids/cudf/ColumnVectorTest.java | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/java/src/main/native/src/ColumnViewJni.cpp b/java/src/main/native/src/ColumnViewJni.cpp index 4efac307627..02d5dc4569c 100644 --- a/java/src/main/native/src/ColumnViewJni.cpp +++ b/java/src/main/native/src/ColumnViewJni.cpp @@ -916,7 +916,8 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ColumnView_castTo(JNIEnv *env, jclas case cudf::type_id::INT64: case cudf::type_id::UINT64: result = cudf::strings::from_integers(*column); break; case cudf::type_id::DECIMAL32: - case cudf::type_id::DECIMAL64: result = cudf::strings::from_fixed_point(*column); break; + case cudf::type_id::DECIMAL64: + case cudf::type_id::DECIMAL128: result = cudf::strings::from_fixed_point(*column); break; default: JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "Invalid data type", 0); } } else if (column->type().id() == cudf::type_id::STRING) { diff --git a/java/src/test/java/ai/rapids/cudf/ColumnVectorTest.java b/java/src/test/java/ai/rapids/cudf/ColumnVectorTest.java index fa9052029cc..31a52eb2ec0 100644 --- a/java/src/test/java/ai/rapids/cudf/ColumnVectorTest.java +++ b/java/src/test/java/ai/rapids/cudf/ColumnVectorTest.java @@ -3372,6 +3372,22 @@ void testFixedWidthCast() { } } + @Test + void testCastBigDecimalToString() { + BigDecimal[] bigValues = {new BigDecimal("923121331938210123.321"), + new BigDecimal("9223372036854775808.191"), + new BigDecimal("9328323982309091029831.002") + }; + + try (ColumnVector cv = ColumnVector.fromDecimals(bigValues); + ColumnVector values = cv.castTo(DType.STRING); + ColumnVector expected = ColumnVector.fromStrings("923121331938210123.321", + "9223372036854775808.191", + "9328323982309091029831.002")) { + assertColumnsAreEqual(expected, values); + } + } + @Test void testCastStringToBigDecimal() { String[] bigValues = {"923121331938210123.321", From 5491cc789bbfbaad7099124dcfe004719e7f013c Mon Sep 17 00:00:00 2001 From: Karthikeyan <6488848+karthikeyann@users.noreply.github.com> Date: Thu, 2 Dec 2021 03:30:50 +0530 Subject: [PATCH 04/25] Fix memory error due to lambda return type deduction limitation (#9778) Fixes #9703 replace device lambda with device functor with return type. (due to [14. 
extended-lambda-restrictions](https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#extended-lambda-restrictions) ) ~add `__host__` to lambda for nvcc return type deduction to work properly.~ ~replaced `auto` (generic lambda) with `size_type`.~ fixes shared memory write error caused in #9703 Authors: - Karthikeyan (https://github.com/karthikeyann) Approvers: - Vyas Ramasubramani (https://github.com/vyasr) - David Wendt (https://github.com/davidwendt) - Jake Hemstad (https://github.com/jrhemstad) URL: https://github.com/rapidsai/cudf/pull/9778 --- cpp/src/sort/rank.cu | 13 +++++++++---- cpp/tests/sort/rank_test.cpp | 14 ++++++++++++++ 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/cpp/src/sort/rank.cu b/cpp/src/sort/rank.cu index e9589e6c4b3..de0a44e3234 100644 --- a/cpp/src/sort/rank.cu +++ b/cpp/src/sort/rank.cu @@ -194,6 +194,12 @@ void rank_max(cudf::device_span group_keys, stream); } +// Returns index, count +template +struct index_counter { + __device__ T operator()(size_type i) { return T{i, 1}; } +}; + void rank_average(cudf::device_span group_keys, column_view sorted_order_view, mutable_column_view rank_mutable_view, @@ -208,10 +214,9 @@ void rank_average(cudf::device_span group_keys, using MinCount = thrust::pair; tie_break_ranks_transform( group_keys, - cudf::detail::make_counting_transform_iterator(1, - [] __device__(auto i) { - return MinCount{i, 1}; - }), + // Use device functor with return type. Cannot use device lambda due to limitation. + // https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#extended-lambda-restrictions + cudf::detail::make_counting_transform_iterator(1, index_counter{}), sorted_order_view, rank_mutable_view.begin(), [] __device__(auto rank_count1, auto rank_count2) { diff --git a/cpp/tests/sort/rank_test.cpp b/cpp/tests/sort/rank_test.cpp index 94e389fc7ce..926ad1e203e 100644 --- a/cpp/tests/sort/rank_test.cpp +++ b/cpp/tests/sort/rank_test.cpp @@ -410,5 +410,19 @@ TYPED_TEST(Rank, min_desc_bottom_pct) this->run_all_tests(rank_method::MIN, desc_bottom, col1_rank, col2_rank, col3_rank, true); } +struct RankLarge : public BaseFixture { +}; + +TEST_F(RankLarge, average_large) +{ + // testcase of https://github.com/rapidsai/cudf/issues/9703 + auto iter = thrust::counting_iterator(0); + fixed_width_column_wrapper col1(iter, iter + 10558); + auto result = + cudf::rank(col1, rank_method::AVERAGE, {}, null_policy::EXCLUDE, null_order::AFTER, false); + fixed_width_column_wrapper expected(iter + 1, iter + 10559); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result->view(), expected); +} + } // namespace test } // namespace cudf From c10966cc3847ca9837ddc7ce5df9c4d9b7c743d8 Mon Sep 17 00:00:00 2001 From: Alfred Xu Date: Thu, 2 Dec 2021 18:48:03 +0800 Subject: [PATCH 05/25] Fix make_empty_scalar_like on list_type (#9759) Fixes #9758 In `make_empty_scalar_like`, we create list scalar with the list column itself, which is wrong. The correct way is with the child of list column. 
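For illustration, a minimal sketch of the corrected construction (the wrapper function below is hypothetical; `empty_like`, `lists_column_view` and `make_list_scalar` are the existing libcudf APIs the patch relies on, and the actual fix additionally marks the resulting scalar as invalid):

```cpp
#include <cudf/copying.hpp>
#include <cudf/lists/lists_column_view.hpp>
#include <cudf/scalar/scalar_factories.hpp>

// An empty scalar for a LIST column must wrap an empty copy of the child
// (element) column, because a list_scalar's underlying column holds the
// elements of a single list row -- not a list column itself.
std::unique_ptr<cudf::scalar> empty_list_scalar_sketch(cudf::column_view const& col)
{
  auto const empty_child = cudf::empty_like(cudf::lists_column_view(col).child());
  return cudf::make_list_scalar(empty_child->view());
}
```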
Authors: - Alfred Xu (https://github.com/sperlingxx) Approvers: - Nghia Truong (https://github.com/ttnghia) - Devavret Makkar (https://github.com/devavret) URL: https://github.com/rapidsai/cudf/pull/9759 --- cpp/src/scalar/scalar_factories.cpp | 7 +++++-- cpp/tests/reductions/reduction_tests.cpp | 8 ++++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/cpp/src/scalar/scalar_factories.cpp b/cpp/src/scalar/scalar_factories.cpp index d2876435780..c18b57d220f 100644 --- a/cpp/src/scalar/scalar_factories.cpp +++ b/cpp/src/scalar/scalar_factories.cpp @@ -21,6 +21,7 @@ #include #include +#include #include namespace cudf { @@ -184,10 +185,12 @@ std::unique_ptr make_empty_scalar_like(column_view const& column, { std::unique_ptr result; switch (column.type().id()) { - case type_id::LIST: - result = make_list_scalar(empty_like(column)->view(), stream, mr); + case type_id::LIST: { + auto const empty_child = empty_like(lists_column_view(column).child()); + result = make_list_scalar(empty_child->view(), stream, mr); result->set_valid_async(false, stream); break; + } case type_id::STRUCT: // The input column must have at least 1 row to extract a scalar (row) from it. result = detail::get_element(column, 0, stream, mr); diff --git a/cpp/tests/reductions/reduction_tests.cpp b/cpp/tests/reductions/reduction_tests.cpp index d8ee8f9d08d..e138cd6f68e 100644 --- a/cpp/tests/reductions/reduction_tests.cpp +++ b/cpp/tests/reductions/reduction_tests.cpp @@ -1961,7 +1961,11 @@ struct ListReductionTest : public cudf::test::BaseFixture { cudf::reduce(input_data, agg, cudf::data_type(cudf::type_id::LIST)); auto list_result = dynamic_cast(result.get()); EXPECT_EQ(is_valid, list_result->is_valid()); - if (is_valid) { CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_value, list_result->view()); } + if (is_valid) { + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_value, list_result->view()); + } else { + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_value, list_result->view()); + } }; if (succeeded_condition) { @@ -2047,7 +2051,7 @@ TEST_F(ListReductionTest, NonValidListReductionNthElement) // test against empty input this->reduction_test(LCW{}, - ElementCol{{0}, {0}}, // expected_value, + ElementCol{}, // expected_value, true, false, cudf::make_nth_element_aggregation(0, cudf::null_policy::INCLUDE)); From 582cc6e466c7d941e1b34893fd56fbd42fe90d68 Mon Sep 17 00:00:00 2001 From: Chong Gao Date: Thu, 2 Dec 2021 21:12:01 +0800 Subject: [PATCH 06/25] Add sample JNI API (#9728) Add sample JNI Signed-off-by: Chong Gao Authors: - Chong Gao (https://github.com/res-life) Approvers: - Robert (Bobby) Evans (https://github.com/revans2) URL: https://github.com/rapidsai/cudf/pull/9728 --- java/src/main/java/ai/rapids/cudf/Table.java | 30 +++++++++++++++++++ java/src/main/native/src/TableJni.cpp | 15 ++++++++++ .../test/java/ai/rapids/cudf/TableTest.java | 21 +++++++++++++ 3 files changed, 66 insertions(+) diff --git a/java/src/main/java/ai/rapids/cudf/Table.java b/java/src/main/java/ai/rapids/cudf/Table.java index b0791fb440f..b11808ed023 100644 --- a/java/src/main/java/ai/rapids/cudf/Table.java +++ b/java/src/main/java/ai/rapids/cudf/Table.java @@ -678,6 +678,8 @@ private static native ContiguousTable[] contiguousSplitGroups(long inputTable, boolean[] keysDescending, boolean[] keysNullSmallest); + private static native long[] sample(long tableHandle, long n, boolean replacement, long seed); + ///////////////////////////////////////////////////////////////////////////// // TABLE 
CREATION APIs ///////////////////////////////////////////////////////////////////////////// @@ -2801,6 +2803,34 @@ public static Table fromPackedTable(ByteBuffer metadata, DeviceMemoryBuffer data return result; } + + /** + * Gather `n` samples from table randomly + * Note: does not preserve the ordering + * Example: + * input: {col1: {1, 2, 3, 4, 5}, col2: {6, 7, 8, 9, 10}} + * n: 3 + * replacement: false + * + * output: {col1: {3, 1, 4}, col2: {8, 6, 9}} + * + * replacement: true + * + * output: {col1: {3, 1, 1}, col2: {8, 6, 6}} + * + * throws "logic_error" if `n` > table rows and `replacement` == FALSE. + * throws "logic_error" if `n` < 0. + * + * @param n non-negative number of samples expected from table + * @param replacement Allow or disallow sampling of the same row more than once. + * @param seed Seed value to initiate random number generator. + * + * @return Table containing samples + */ + public Table sample(long n, boolean replacement, long seed) { + return new Table(sample(nativeHandle, n, replacement, seed)); + } + ///////////////////////////////////////////////////////////////////////////// // HELPER CLASSES ///////////////////////////////////////////////////////////////////////////// diff --git a/java/src/main/native/src/TableJni.cpp b/java/src/main/native/src/TableJni.cpp index a78d40a58f7..f3377bb002d 100644 --- a/java/src/main/native/src/TableJni.cpp +++ b/java/src/main/native/src/TableJni.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -3147,4 +3148,18 @@ JNIEXPORT jobjectArray JNICALL Java_ai_rapids_cudf_Table_contiguousSplitGroups( CATCH_STD(env, NULL); } +JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_sample(JNIEnv *env, jclass, jlong j_input, + jlong n, jboolean replacement, + jlong seed) { + JNI_NULL_CHECK(env, j_input, "input table is null", 0); + try { + cudf::jni::auto_set_device(env); + cudf::table_view *input = reinterpret_cast(j_input); + auto sample_with_replacement = + replacement ? 
cudf::sample_with_replacement::TRUE : cudf::sample_with_replacement::FALSE; + std::unique_ptr result = cudf::sample(*input, n, sample_with_replacement, seed); + return cudf::jni::convert_table_for_return(env, result); + } + CATCH_STD(env, 0); +} } // extern "C" diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java index fa221e19387..0b2f56895e9 100644 --- a/java/src/test/java/ai/rapids/cudf/TableTest.java +++ b/java/src/test/java/ai/rapids/cudf/TableTest.java @@ -7584,4 +7584,25 @@ void testExplodeOuterPosition() { } } } + + @Test + void testSample() { + try (Table t = new Table.TestBuilder().column("s1", "s2", "s3", "s4", "s5").build()) { + try (Table ret = t.sample(3, false, 0); + Table expected = new Table.TestBuilder().column("s3", "s4", "s5").build()) { + assertTablesAreEqual(expected, ret); + } + + try (Table ret = t.sample(5, false, 0); + Table expected = new Table.TestBuilder().column("s3", "s4", "s5", "s2", "s1").build()) { + assertTablesAreEqual(expected, ret); + } + + try (Table ret = t.sample(8, true, 0); + Table expected = new Table.TestBuilder() + .column("s1", "s1", "s4", "s5", "s5", "s1", "s3", "s2").build()) { + assertTablesAreEqual(expected, ret); + } + } + } } From 1077daeaad8ff710de6f4fbb99f2e7371b4af8de Mon Sep 17 00:00:00 2001 From: brandon-b-miller <53796099+brandon-b-miller@users.noreply.github.com> Date: Thu, 2 Dec 2021 15:51:04 -0600 Subject: [PATCH 07/25] Fix caching in `Series.applymap` (#9821) The cache key we were generating for these functions didn't take into account the constants that could be different in the bytecode. Hence certain functions were causing cache hits when they actually differ by a constant value somewhere in the logic. Authors: - https://github.com/brandon-b-miller Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) - Bradley Dice (https://github.com/bdice) - Ashwin Srinath (https://github.com/shwina) URL: https://github.com/rapidsai/cudf/pull/9821 --- python/cudf/cudf/tests/test_udf_masked_ops.py | 19 +++++++++++++++++++ python/cudf/cudf/utils/cudautils.py | 4 +++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/python/cudf/cudf/tests/test_udf_masked_ops.py b/python/cudf/cudf/tests/test_udf_masked_ops.py index dc126546f15..c9c2c440632 100644 --- a/python/cudf/cudf/tests/test_udf_masked_ops.py +++ b/python/cudf/cudf/tests/test_udf_masked_ops.py @@ -593,3 +593,22 @@ def func(row, c, k): return y run_masked_udf_test(func, data, args=(1, 2), check_dtype=False) + + +def test_masked_udf_caching(): + # Make sure similar functions that differ + # by simple things like constants actually + # recompile + + data = cudf.Series([1, 2, 3]) + expect = data ** 2 + got = data.applymap(lambda x: x ** 2) + + assert_eq(expect, got, check_dtype=False) + + # update the constant value being used and make sure + # it does not result in a cache hit + + expect = data ** 3 + got = data.applymap(lambda x: x ** 3) + assert_eq(expect, got, check_dtype=False) diff --git a/python/cudf/cudf/utils/cudautils.py b/python/cudf/cudf/utils/cudautils.py index 5fa091a0081..f0533dcaa72 100755 --- a/python/cudf/cudf/utils/cudautils.py +++ b/python/cudf/cudf/utils/cudautils.py @@ -216,12 +216,14 @@ def make_cache_key(udf, sig): recompiling the same function for the same set of types """ codebytes = udf.__code__.co_code + constants = udf.__code__.co_consts if udf.__closure__ is not None: cvars = tuple([x.cell_contents for x in udf.__closure__]) cvarbytes = 
dumps(cvars) else: cvarbytes = b"" - return codebytes, cvarbytes, sig + + return constants, codebytes, cvarbytes, sig def compile_udf(udf, type_signature): From 50acf076d4a35bc57dc00a416f0d9507b1992c0f Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 2 Dec 2021 14:07:31 -0800 Subject: [PATCH 08/25] Fix stream usage in `segmented_gather()` (#9679) `detail::segmented_gather()` inadvertently uses `cuda_default_stream` in some parts of its implementation, while using the user-specified stream in others. This applies to the calls to `copy_range_in_place()`, `allocate_like()`, and `make_lists_column()`. ~This might produce race conditions, which might explain NVIDIA/spark-rapids/issues/4060. It's a rare failure that's quite hard to reproduce.~ This might lead to over-synchronization, though bad output is unlikely. The commit here should sort this out, by switching to the `detail` APIs corresponding to the calls above. Authors: - MithunR (https://github.com/mythrocks) Approvers: - Mike Wilson (https://github.com/hyperbolic2346) - Nghia Truong (https://github.com/ttnghia) - Karthikeyan (https://github.com/karthikeyann) URL: https://github.com/rapidsai/cudf/pull/9679 --- cpp/src/lists/copying/segmented_gather.cu | 21 ++++++++++++--------- cpp/src/lists/extract.cu | 2 +- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/cpp/src/lists/copying/segmented_gather.cu b/cpp/src/lists/copying/segmented_gather.cu index 8cbcddc1c58..41187b96cdb 100644 --- a/cpp/src/lists/copying/segmented_gather.cu +++ b/cpp/src/lists/copying/segmented_gather.cu @@ -13,8 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include #include -#include #include #include #include @@ -88,14 +88,15 @@ std::unique_ptr segmented_gather(lists_column_view const& value_column, auto child = std::move(child_table->release().front()); // Create list offsets from gather_map. - auto output_offset = cudf::allocate_like( - gather_map.offsets(), gather_map.size() + 1, mask_allocation_policy::RETAIN, mr); + auto output_offset = cudf::detail::allocate_like( + gather_map.offsets(), gather_map.size() + 1, mask_allocation_policy::RETAIN, stream, mr); auto output_offset_view = output_offset->mutable_view(); - cudf::copy_range_in_place(gather_map.offsets(), - output_offset_view, - gather_map.offset(), - gather_map.offset() + output_offset_view.size(), - 0); + cudf::detail::copy_range_in_place(gather_map.offsets(), + output_offset_view, + gather_map.offset(), + gather_map.offset() + output_offset_view.size(), + 0, + stream); // Assemble list column & return auto null_mask = cudf::detail::copy_bitmask(value_column.parent(), stream, mr); size_type null_count = value_column.null_count(); @@ -103,7 +104,9 @@ std::unique_ptr segmented_gather(lists_column_view const& value_column, std::move(output_offset), std::move(child), null_count, - std::move(null_mask)); + std::move(null_mask), + stream, + mr); } } // namespace detail diff --git a/cpp/src/lists/extract.cu b/cpp/src/lists/extract.cu index 381864e1a68..7c6c612eb25 100644 --- a/cpp/src/lists/extract.cu +++ b/cpp/src/lists/extract.cu @@ -53,7 +53,7 @@ std::unique_ptr make_index_child(column_view const& indices, // `segmented_gather()` on a null index should produce a null row. 
if (not indices.nullable()) { return std::make_unique(indices, stream); } - auto const d_indices = column_device_view::create(indices); + auto const d_indices = column_device_view::create(indices, stream); // Replace null indices with MAX_SIZE_TYPE, so that gather() returns null for them. auto const null_replaced_iter_begin = cudf::detail::make_null_replacement_iterator(*d_indices, std::numeric_limits::max()); From b848dd5c9cfef7e3523810d67296e037f31945c1 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Thu, 2 Dec 2021 14:40:57 -0800 Subject: [PATCH 09/25] Fix ORC writer crash with empty input columns (#9808) Fixes https://github.com/rapidsai/cudf/issues/9783 Skip some parts of writing when the input table was zero rows. Add is_empty to `hostdevice_2dvector`. Add Python test with empty columns. Authors: - Vukasin Milovanovic (https://github.com/vuule) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) - Vyas Ramasubramani (https://github.com/vyasr) - Devavret Makkar (https://github.com/devavret) - Conor Hoekstra (https://github.com/codereport) URL: https://github.com/rapidsai/cudf/pull/9808 --- cpp/src/io/orc/writer_impl.cu | 338 +++++++++++---------- cpp/src/io/utilities/hostdevice_vector.hpp | 1 + python/cudf/cudf/tests/test_orc.py | 15 + 3 files changed, 188 insertions(+), 166 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index e53fb3589bc..db02125ce77 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -579,12 +579,15 @@ orc_streams writer::impl::create_streams(host_span columns, } auto const direct_data_size = - std::accumulate(segmentation.stripes.front().cbegin(), - segmentation.stripes.back().cend(), - size_t{0}, - [&](auto data_size, auto rg_idx) { - return data_size + column.host_dict_chunk(rg_idx)->string_char_count; - }); + segmentation.num_stripes() == 0 + ? 
0 + : std::accumulate(segmentation.stripes.front().cbegin(), + segmentation.stripes.back().cend(), + size_t{0}, + [&](auto data_size, auto rg_idx) { + return data_size + + column.host_dict_chunk(rg_idx)->string_char_count; + }); if (enable_dict) { uint32_t dict_bits = 0; for (dict_bits = 1; dict_bits < 32; dict_bits <<= 1) { @@ -988,17 +991,19 @@ encoded_data encode_columns(orc_table_view const& orc_table, } chunk_streams.host_to_device(stream); - if (orc_table.num_string_columns() != 0) { - auto d_stripe_dict = orc_table.string_column(0).device_stripe_dict(); - gpu::EncodeStripeDictionaries(d_stripe_dict, - chunks, - orc_table.num_string_columns(), - segmentation.num_stripes(), - chunk_streams, - stream); - } + if (orc_table.num_rows() > 0) { + if (orc_table.num_string_columns() != 0) { + auto d_stripe_dict = orc_table.string_column(0).device_stripe_dict(); + gpu::EncodeStripeDictionaries(d_stripe_dict, + chunks, + orc_table.num_string_columns(), + segmentation.num_stripes(), + chunk_streams, + stream); + } - gpu::EncodeOrcColumnData(chunks, chunk_streams, stream); + gpu::EncodeOrcColumnData(chunks, chunk_streams, stream); + } dictionaries.data.clear(); dictionaries.index.clear(); stream.synchronize(); @@ -1803,7 +1808,7 @@ void writer::impl::write(table_view const& table) auto dictionaries = allocate_dictionaries(orc_table, rowgroup_bounds, stream); hostdevice_2dvector dict( rowgroup_bounds.size().first, orc_table.num_string_columns(), stream); - if (orc_table.num_string_columns() != 0) { + if (not dict.is_empty()) { init_dictionaries(orc_table, rowgroup_bounds, dictionaries.d_data_view, @@ -1819,7 +1824,7 @@ void writer::impl::write(table_view const& table) // Build stripe-level dictionaries hostdevice_2dvector stripe_dict( segmentation.num_stripes(), orc_table.num_string_columns(), stream); - if (orc_table.num_string_columns() != 0) { + if (not stripe_dict.is_empty()) { build_dictionaries(orc_table, segmentation.stripes, dict, @@ -1842,165 +1847,166 @@ void writer::impl::write(table_view const& table) segmentation.num_stripes(), num_data_streams, stream); auto stripes = gather_stripes(num_index_streams, segmentation, &enc_data.streams, &strm_descs); - // Gather column statistics - std::vector column_stats; - if (enable_statistics_ && table.num_columns() > 0 && num_rows > 0) { - column_stats = gather_statistic_blobs(orc_table, segmentation); - } + if (num_rows > 0) { + // Gather column statistics + auto const column_stats = enable_statistics_ && table.num_columns() > 0 + ? 
gather_statistic_blobs(orc_table, segmentation) + : std::vector{}; - // Allocate intermediate output stream buffer - size_t compressed_bfr_size = 0; - size_t num_compressed_blocks = 0; - size_t max_compressed_block_size = 0; - if (compression_kind_ != NONE) { - nvcompBatchedSnappyCompressGetMaxOutputChunkSize( - compression_blocksize_, nvcompBatchedSnappyDefaultOpts, &max_compressed_block_size); - } - auto stream_output = [&]() { - size_t max_stream_size = 0; - bool all_device_write = true; + // Allocate intermediate output stream buffer + size_t compressed_bfr_size = 0; + size_t num_compressed_blocks = 0; + size_t max_compressed_block_size = 0; + if (compression_kind_ != NONE) { + nvcompBatchedSnappyCompressGetMaxOutputChunkSize( + compression_blocksize_, nvcompBatchedSnappyDefaultOpts, &max_compressed_block_size); + } + auto stream_output = [&]() { + size_t max_stream_size = 0; + bool all_device_write = true; + + for (auto& ss : strm_descs.host_view().flat_view()) { + if (!out_sink_->is_device_write_preferred(ss.stream_size)) { all_device_write = false; } + size_t stream_size = ss.stream_size; + if (compression_kind_ != NONE) { + ss.first_block = num_compressed_blocks; + ss.bfr_offset = compressed_bfr_size; + + auto num_blocks = std::max( + (stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1); + stream_size += num_blocks * BLOCK_HEADER_SIZE; + num_compressed_blocks += num_blocks; + compressed_bfr_size += (max_compressed_block_size + BLOCK_HEADER_SIZE) * num_blocks; + } + max_stream_size = std::max(max_stream_size, stream_size); + } - for (auto& ss : strm_descs.host_view().flat_view()) { - if (!out_sink_->is_device_write_preferred(ss.stream_size)) { all_device_write = false; } - size_t stream_size = ss.stream_size; - if (compression_kind_ != NONE) { - ss.first_block = num_compressed_blocks; - ss.bfr_offset = compressed_bfr_size; - - auto num_blocks = std::max( - (stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1); - stream_size += num_blocks * BLOCK_HEADER_SIZE; - num_compressed_blocks += num_blocks; - compressed_bfr_size += (max_compressed_block_size + BLOCK_HEADER_SIZE) * num_blocks; + if (all_device_write) { + return pinned_buffer{nullptr, cudaFreeHost}; + } else { + return pinned_buffer{[](size_t size) { + uint8_t* ptr = nullptr; + CUDA_TRY(cudaMallocHost(&ptr, size)); + return ptr; + }(max_stream_size), + cudaFreeHost}; } - max_stream_size = std::max(max_stream_size, stream_size); - } + }(); - if (all_device_write) { - return pinned_buffer{nullptr, cudaFreeHost}; - } else { - return pinned_buffer{[](size_t size) { - uint8_t* ptr = nullptr; - CUDA_TRY(cudaMallocHost(&ptr, size)); - return ptr; - }(max_stream_size), - cudaFreeHost}; + // Compress the data streams + rmm::device_buffer compressed_data(compressed_bfr_size, stream); + hostdevice_vector comp_out(num_compressed_blocks, stream); + hostdevice_vector comp_in(num_compressed_blocks, stream); + if (compression_kind_ != NONE) { + strm_descs.host_to_device(stream); + gpu::CompressOrcDataStreams(static_cast(compressed_data.data()), + num_compressed_blocks, + compression_kind_, + compression_blocksize_, + max_compressed_block_size, + strm_descs, + enc_data.streams, + comp_in, + comp_out, + stream); + strm_descs.device_to_host(stream); + comp_out.device_to_host(stream, true); } - }(); - - // Compress the data streams - rmm::device_buffer compressed_data(compressed_bfr_size, stream); - hostdevice_vector comp_out(num_compressed_blocks, stream); - hostdevice_vector comp_in(num_compressed_blocks, 
stream); - if (compression_kind_ != NONE) { - strm_descs.host_to_device(stream); - gpu::CompressOrcDataStreams(static_cast(compressed_data.data()), - num_compressed_blocks, - compression_kind_, - compression_blocksize_, - max_compressed_block_size, - strm_descs, - enc_data.streams, - comp_in, - comp_out, - stream); - strm_descs.device_to_host(stream); - comp_out.device_to_host(stream, true); - } - ProtobufWriter pbw_(&buffer_); - - // Write stripes - std::vector> write_tasks; - for (size_t stripe_id = 0; stripe_id < stripes.size(); ++stripe_id) { - auto const& rowgroups_range = segmentation.stripes[stripe_id]; - auto& stripe = stripes[stripe_id]; - - stripe.offset = out_sink_->bytes_written(); - - // Column (skippable) index streams appear at the start of the stripe - for (size_type stream_id = 0; stream_id < num_index_streams; ++stream_id) { - write_index_stream(stripe_id, - stream_id, - orc_table.columns, - rowgroups_range, - enc_data.streams, - strm_descs, - comp_out, - &stripe, - &streams, - &pbw_); - } + ProtobufWriter pbw_(&buffer_); + + // Write stripes + std::vector> write_tasks; + for (size_t stripe_id = 0; stripe_id < stripes.size(); ++stripe_id) { + auto const& rowgroups_range = segmentation.stripes[stripe_id]; + auto& stripe = stripes[stripe_id]; + + stripe.offset = out_sink_->bytes_written(); + + // Column (skippable) index streams appear at the start of the stripe + for (size_type stream_id = 0; stream_id < num_index_streams; ++stream_id) { + write_index_stream(stripe_id, + stream_id, + orc_table.columns, + rowgroups_range, + enc_data.streams, + strm_descs, + comp_out, + &stripe, + &streams, + &pbw_); + } - // Column data consisting one or more separate streams - for (auto const& strm_desc : strm_descs[stripe_id]) { - write_tasks.push_back( - write_data_stream(strm_desc, - enc_data.streams[strm_desc.column_id][rowgroups_range.first], - static_cast(compressed_data.data()), - stream_output.get(), - &stripe, - &streams)); - } + // Column data consisting one or more separate streams + for (auto const& strm_desc : strm_descs[stripe_id]) { + write_tasks.push_back( + write_data_stream(strm_desc, + enc_data.streams[strm_desc.column_id][rowgroups_range.first], + static_cast(compressed_data.data()), + stream_output.get(), + &stripe, + &streams)); + } - // Write stripefooter consisting of stream information - StripeFooter sf; - sf.streams = streams; - sf.columns.resize(orc_table.num_columns() + 1); - sf.columns[0].kind = DIRECT; - for (size_t i = 1; i < sf.columns.size(); ++i) { - sf.columns[i].kind = orc_table.column(i - 1).orc_encoding(); - sf.columns[i].dictionarySize = - (sf.columns[i].kind == DICTIONARY_V2) - ? orc_table.column(i - 1).host_stripe_dict(stripe_id)->num_strings - : 0; - if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; } + // Write stripefooter consisting of stream information + StripeFooter sf; + sf.streams = streams; + sf.columns.resize(orc_table.num_columns() + 1); + sf.columns[0].kind = DIRECT; + for (size_t i = 1; i < sf.columns.size(); ++i) { + sf.columns[i].kind = orc_table.column(i - 1).orc_encoding(); + sf.columns[i].dictionarySize = + (sf.columns[i].kind == DICTIONARY_V2) + ? orc_table.column(i - 1).host_stripe_dict(stripe_id)->num_strings + : 0; + if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; } + } + buffer_.resize((compression_kind_ != NONE) ? 
3 : 0); + pbw_.write(sf); + stripe.footerLength = buffer_.size(); + if (compression_kind_ != NONE) { + uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1; + buffer_[0] = static_cast(uncomp_sf_len >> 0); + buffer_[1] = static_cast(uncomp_sf_len >> 8); + buffer_[2] = static_cast(uncomp_sf_len >> 16); + } + out_sink_->host_write(buffer_.data(), buffer_.size()); } - buffer_.resize((compression_kind_ != NONE) ? 3 : 0); - pbw_.write(sf); - stripe.footerLength = buffer_.size(); - if (compression_kind_ != NONE) { - uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1; - buffer_[0] = static_cast(uncomp_sf_len >> 0); - buffer_[1] = static_cast(uncomp_sf_len >> 8); - buffer_[2] = static_cast(uncomp_sf_len >> 16); + for (auto const& task : write_tasks) { + task.wait(); } - out_sink_->host_write(buffer_.data(), buffer_.size()); - } - for (auto const& task : write_tasks) { - task.wait(); - } - if (column_stats.size() != 0) { - // File-level statistics - // NOTE: Excluded from chunked write mode to avoid the need for merging stats across calls - if (single_write_mode) { - // First entry contains total number of rows - buffer_.resize(0); - pbw_.putb(1 * 8 + PB_TYPE_VARINT); - pbw_.put_uint(num_rows); - ff.statistics.reserve(1 + orc_table.num_columns()); - ff.statistics.emplace_back(std::move(buffer_)); - // Add file stats, stored after stripe stats in `column_stats` - ff.statistics.insert( - ff.statistics.end(), - std::make_move_iterator(column_stats.begin()) + stripes.size() * orc_table.num_columns(), - std::make_move_iterator(column_stats.end())); - } - // Stripe-level statistics - size_t first_stripe = md.stripeStats.size(); - md.stripeStats.resize(first_stripe + stripes.size()); - for (size_t stripe_id = 0; stripe_id < stripes.size(); stripe_id++) { - md.stripeStats[first_stripe + stripe_id].colStats.resize(1 + orc_table.num_columns()); - buffer_.resize(0); - pbw_.putb(1 * 8 + PB_TYPE_VARINT); - pbw_.put_uint(stripes[stripe_id].numberOfRows); - md.stripeStats[first_stripe + stripe_id].colStats[0] = std::move(buffer_); - for (size_t col_idx = 0; col_idx < orc_table.num_columns(); col_idx++) { - size_t idx = stripes.size() * col_idx + stripe_id; - if (idx < column_stats.size()) { - md.stripeStats[first_stripe + stripe_id].colStats[1 + col_idx] = - std::move(column_stats[idx]); + if (not column_stats.empty()) { + // File-level statistics + // NOTE: Excluded from chunked write mode to avoid the need for merging stats across calls + if (single_write_mode) { + // First entry contains total number of rows + buffer_.resize(0); + pbw_.putb(1 * 8 + PB_TYPE_VARINT); + pbw_.put_uint(num_rows); + ff.statistics.reserve(1 + orc_table.num_columns()); + ff.statistics.emplace_back(std::move(buffer_)); + // Add file stats, stored after stripe stats in `column_stats` + ff.statistics.insert( + ff.statistics.end(), + std::make_move_iterator(column_stats.begin()) + stripes.size() * orc_table.num_columns(), + std::make_move_iterator(column_stats.end())); + } + // Stripe-level statistics + size_t first_stripe = md.stripeStats.size(); + md.stripeStats.resize(first_stripe + stripes.size()); + for (size_t stripe_id = 0; stripe_id < stripes.size(); stripe_id++) { + md.stripeStats[first_stripe + stripe_id].colStats.resize(1 + orc_table.num_columns()); + buffer_.resize(0); + pbw_.putb(1 * 8 + PB_TYPE_VARINT); + pbw_.put_uint(stripes[stripe_id].numberOfRows); + md.stripeStats[first_stripe + stripe_id].colStats[0] = std::move(buffer_); + for (size_t col_idx = 0; col_idx < orc_table.num_columns(); col_idx++) { + size_t 
idx = stripes.size() * col_idx + stripe_id; + if (idx < column_stats.size()) { + md.stripeStats[first_stripe + stripe_id].colStats[1 + col_idx] = + std::move(column_stats[idx]); + } } } } diff --git a/cpp/src/io/utilities/hostdevice_vector.hpp b/cpp/src/io/utilities/hostdevice_vector.hpp index 283715478a0..a7f9aec7bb4 100644 --- a/cpp/src/io/utilities/hostdevice_vector.hpp +++ b/cpp/src/io/utilities/hostdevice_vector.hpp @@ -179,6 +179,7 @@ class hostdevice_2dvector { auto size() const noexcept { return _size; } auto count() const noexcept { return _size.first * _size.second; } + auto is_empty() const noexcept { return count() == 0; } T* base_host_ptr(size_t offset = 0) { return _data.host_ptr(offset); } T* base_device_ptr(size_t offset = 0) { return _data.device_ptr(offset); } diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index 6b02874146e..dc176992434 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -1526,3 +1526,18 @@ def test_orc_writer_rle_stream_size(datadir, tmpdir): # Segfaults when RLE stream sizes don't account for varint length pa_out = pa.orc.ORCFile(reencoded).read() assert_eq(df.to_pandas(), pa_out) + + +def test_empty_columns(): + buffer = BytesIO() + # string and decimal columns have additional steps that need to be skipped + expected = cudf.DataFrame( + { + "string": cudf.Series([], dtype="str"), + "decimal": cudf.Series([], dtype=cudf.Decimal64Dtype(10, 1)), + } + ) + expected.to_orc(buffer, compression="snappy") + + got_df = cudf.read_orc(buffer) + assert_eq(expected, got_df) From 0c08543955a01470baa4fbdbab927298dcf6afd9 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Fri, 3 Dec 2021 04:53:37 +0530 Subject: [PATCH 10/25] Update cmake and conda to 22.02 (#9746) Changes related to update to 22.02 in one conda environment recipe (only 11.5) was missed. This adds that. Also makes project version changes in cmake related to update from 21.12 to 22.02. 
Authors: - Devavret Makkar (https://github.com/devavret) Approvers: - Robert Maynard (https://github.com/robertmaynard) - Ray Douglass (https://github.com/raydouglass) - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/9746 --- ci/release/update-version.sh | 6 +++--- cpp/CMakeLists.txt | 2 +- cpp/libcudf_kafka/CMakeLists.txt | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ci/release/update-version.sh b/ci/release/update-version.sh index eeb76a15fcc..86432a92128 100755 --- a/ci/release/update-version.sh +++ b/ci/release/update-version.sh @@ -30,13 +30,13 @@ function sed_runner() { } # cpp update -sed_runner 's/'"CUDF VERSION .* LANGUAGES"'/'"CUDF VERSION ${NEXT_FULL_TAG} LANGUAGES"'/g' cpp/CMakeLists.txt +sed_runner 's/'"VERSION ${CURRENT_SHORT_TAG}.*"'/'"VERSION ${NEXT_FULL_TAG}"'/g' cpp/CMakeLists.txt # cpp libcudf_kafka update -sed_runner 's/'"CUDA_KAFKA VERSION .* LANGUAGES"'/'"CUDA_KAFKA VERSION ${NEXT_FULL_TAG} LANGUAGES"'/g' cpp/libcudf_kafka/CMakeLists.txt +sed_runner 's/'"VERSION ${CURRENT_SHORT_TAG}.*"'/'"VERSION ${NEXT_FULL_TAG}"'/g' cpp/libcudf_kafka/CMakeLists.txt # cpp cudf_jni update -sed_runner 's/'"CUDF_JNI VERSION .* LANGUAGES"'/'"CUDF_JNI VERSION ${NEXT_FULL_TAG} LANGUAGES"'/g' java/src/main/native/CMakeLists.txt +sed_runner 's/'"VERSION ${CURRENT_SHORT_TAG}.*"'/'"VERSION ${NEXT_FULL_TAG}"'/g' java/src/main/native/CMakeLists.txt # rapids-cmake version sed_runner 's/'"branch-.*\/RAPIDS.cmake"'/'"branch-${NEXT_SHORT_TAG}\/RAPIDS.cmake"'/g' fetch_rapids.cmake diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 50bdc30b292..e2b317f2e03 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -25,7 +25,7 @@ rapids_cuda_init_architectures(CUDF) project( CUDF - VERSION 21.12.00 + VERSION 22.02.00 LANGUAGES C CXX CUDA ) diff --git a/cpp/libcudf_kafka/CMakeLists.txt b/cpp/libcudf_kafka/CMakeLists.txt index 435ff3b5987..d0874b57c2d 100644 --- a/cpp/libcudf_kafka/CMakeLists.txt +++ b/cpp/libcudf_kafka/CMakeLists.txt @@ -22,7 +22,7 @@ include(rapids-find) project( CUDA_KAFKA - VERSION 21.12.00 + VERSION 22.02.00 LANGUAGES CXX ) From ce64e53264d21c6e59fe98548796a7b6bae24c07 Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Thu, 2 Dec 2021 20:19:12 -0600 Subject: [PATCH 11/25] Add directory-partitioned data support to cudf.read_parquet (#9720) Closes #9684 Closes #9690 This PR refactors path handling in `cudf.read_parquet` and uses `pyarrow.dataset` to add support for directory-partitioned datasets (with full filtering support at row-group granularity). Since it is my understanding that some users may wish for directory-partitioned columns to be represented as a raw dtype (rather than always becoming categorical), I also added an optional `categorical_partitions` argument (open to suggestions on a better name). 
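A minimal usage sketch (illustrative only: the dataset path, column names, and data below are placeholders, while the `to_parquet`/`read_parquet` arguments mirror the tests added in this PR):

```python
import numpy as np
import cudf

# NOTE: "dataset_dir" and the column names are placeholders for illustration.
# Write a small directory-partitioned dataset: one subdirectory per unique
# value of the partition column (e.g. dataset_dir/b=a/..., dataset_dir/b=b/...).
df = cudf.DataFrame(
    {
        "a": np.arange(100, dtype="int64"),
        "b": np.random.choice(list("abcd"), size=100),
    }
)
df.to_parquet("dataset_dir", partition_cols=["b"])

# Read the directory back. The partition column "b" is reconstructed from the
# directory names and returned as a categorical column by default, and
# `filters` can reference it directly.
got = cudf.read_parquet("dataset_dir", filters=[("b", "==", "a")])

# Opt out of the categorical reconstruction to get the raw dtype instead.
raw = cudf.read_parquet("dataset_dir", categorical_partitions=False)
```

With `categorical_partitions=False` the partition keys come back with their raw dtype (`object` for the string column here), which is the behaviour exercised in `test_read_parquet_partitioned_filtered` below.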
Authors: - Richard (Rick) Zamora (https://github.com/rjzamora) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) - Benjamin Zaitlen (https://github.com/quasiben) URL: https://github.com/rapidsai/cudf/pull/9720 --- python/cudf/cudf/io/json.py | 2 +- python/cudf/cudf/io/orc.py | 2 +- python/cudf/cudf/io/parquet.py | 286 +++++++++++++++++++---- python/cudf/cudf/tests/test_parquet.py | 94 +++++++- python/cudf/cudf/tests/test_s3.py | 9 +- python/cudf/cudf/utils/ioutils.py | 26 ++- python/dask_cudf/dask_cudf/io/parquet.py | 7 +- 7 files changed, 355 insertions(+), 71 deletions(-) diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py index a48cfd07d3f..1f876214b16 100644 --- a/python/cudf/cudf/io/json.py +++ b/python/cudf/cudf/io/json.py @@ -37,7 +37,7 @@ def read_json( for source in path_or_buf: if ioutils.is_directory(source, **kwargs): fs = ioutils._ensure_filesystem( - passed_filesystem=None, path=source + passed_filesystem=None, path=source, **kwargs ) source = ioutils.stringify_pathlike(source) source = fs.sep.join([source, "*.json"]) diff --git a/python/cudf/cudf/io/orc.py b/python/cudf/cudf/io/orc.py index ecb1b0cd185..c1cce3f996f 100644 --- a/python/cudf/cudf/io/orc.py +++ b/python/cudf/cudf/io/orc.py @@ -316,7 +316,7 @@ def read_orc( for source in filepath_or_buffer: if ioutils.is_directory(source, **kwargs): fs = ioutils._ensure_filesystem( - passed_filesystem=None, path=source + passed_filesystem=None, path=source, **kwargs, ) source = stringify_path(source) source = fs.sep.join([source, "*.orc"]) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 9d665d9a0a5..04d64969a16 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -12,6 +12,7 @@ import cudf from cudf._lib import parquet as libparquet from cudf.api.types import is_list_like +from cudf.core.column import as_column, build_categorical_column from cudf.utils import ioutils @@ -80,7 +81,7 @@ def write_to_dataset( kwargs for to_parquet function. """ - fs = ioutils._ensure_filesystem(fs, root_path) + fs = ioutils._ensure_filesystem(fs, root_path, **kwargs) fs.mkdirs(root_path, exist_ok=True) metadata = [] @@ -163,11 +164,19 @@ def read_parquet_metadata(path): return num_rows, num_row_groups, col_names -def _process_row_groups(paths, fs, filters=None, row_groups=None): +def _process_dataset( + paths, fs, filters=None, row_groups=None, categorical_partitions=True, +): + # Returns: + # file_list - Expanded/filtered list of paths + # row_groups - Filtered list of row-group selections + # partition_keys - list of partition keys for each file + # partition_categories - Categories for each partition # The general purpose of this function is to (1) expand # directory input into a list of paths (using the pyarrow - # dataset API), and (2) to apply row-group filters. 
+ # dataset API), (2) to apply row-group filters, and (3) + # to discover directory-partitioning information # Deal with case that the user passed in a directory name file_list = paths @@ -186,28 +195,107 @@ def _process_row_groups(paths, fs, filters=None, row_groups=None): if len(file_list) == 0: raise FileNotFoundError(f"{paths} could not be resolved to any files") - if filters is not None: - # Load IDs of filtered row groups for each file in dataset - filtered_rg_ids = defaultdict(list) - for fragment in dataset.get_fragments(filter=filters): - for rg_fragment in fragment.split_by_row_group(filters): - for rg_info in rg_fragment.row_groups: - filtered_rg_ids[rg_fragment.path].append(rg_info.id) - - # Initialize row_groups to be selected - if row_groups is None: - row_groups = [None for _ in dataset.files] - - # Store IDs of selected row groups for each file - for i, file in enumerate(dataset.files): - if row_groups[i] is None: - row_groups[i] = filtered_rg_ids[file] - else: - row_groups[i] = filter( - lambda id: id in row_groups[i], filtered_rg_ids[file] + # Deal with directory partitioning + # Get all partition keys (without filters) + partition_categories = defaultdict(list) + file_fragment = None + for file_fragment in dataset.get_fragments(): + keys = ds._get_partition_keys(file_fragment.partition_expression) + if not (keys or partition_categories): + # Bail - This is not a directory-partitioned dataset + break + for k, v in keys.items(): + if v not in partition_categories[k]: + partition_categories[k].append(v) + if not categorical_partitions: + # Bail - We don't need to discover all categories. + # We only need to save the partition keys from this + # first `file_fragment` + break + + if partition_categories and file_fragment is not None: + # Check/correct order of `categories` using last file_frag, + # because `_get_partition_keys` does NOT preserve the + # partition-hierarchy order of the keys. + cat_keys = [ + part.split("=")[0] + for part in file_fragment.path.split(fs.sep) + if "=" in part + ] + if set(partition_categories) == set(cat_keys): + partition_categories = { + k: partition_categories[k] + for k in cat_keys + if k in partition_categories + } + + # If we do not have partitioned data and + # are not filtering, we can return here + if filters is None and not partition_categories: + return file_list, row_groups, [], {} + + # Record initial row_groups input + row_groups_map = {} + if row_groups is not None: + # Make sure paths and row_groups map 1:1 + # and save the initial mapping + if len(paths) != len(file_list): + raise ValueError( + "Cannot specify a row_group selection for a directory path." 
+ ) + row_groups_map = {path: rgs for path, rgs in zip(paths, row_groups)} + + # Apply filters and discover partition columns + partition_keys = [] + if partition_categories or filters is not None: + file_list = [] + if filters is not None: + row_groups = [] + for file_fragment in dataset.get_fragments(filter=filters): + path = file_fragment.path + + # Extract hive-partition keys, and make sure they + # are orederd the same as they are in `partition_categories` + if partition_categories: + raw_keys = ds._get_partition_keys( + file_fragment.partition_expression + ) + partition_keys.append( + [ + (name, raw_keys[name]) + for name in partition_categories.keys() + ] ) - return file_list, row_groups + # Apply row-group filtering + selection = row_groups_map.get(path, None) + if selection is not None or filters is not None: + filtered_row_groups = [ + rg_info.id + for rg_fragment in file_fragment.split_by_row_group( + filters, schema=dataset.schema, + ) + for rg_info in rg_fragment.row_groups + ] + file_list.append(path) + if filters is not None: + if selection is None: + row_groups.append(filtered_row_groups) + else: + row_groups.append( + [ + rg_id + for rg_id in filtered_row_groups + if rg_id in selection + ] + ) + + return ( + file_list, + row_groups, + partition_keys, + partition_categories if categorical_partitions else {}, + ) def _get_byte_ranges(file_list, row_groups, columns, fs, **kwargs): @@ -319,6 +407,7 @@ def read_parquet( strings_to_categorical=False, use_pandas_metadata=True, use_python_file_object=False, + categorical_partitions=True, *args, **kwargs, ): @@ -345,17 +434,29 @@ def read_parquet( # Start by trying construct a filesystem object, so we # can apply filters on remote file-systems fs, paths = ioutils._get_filesystem_and_paths(filepath_or_buffer, **kwargs) - filepath_or_buffer = paths if paths else filepath_or_buffer - if fs is None and filters is not None: - raise ValueError("cudf cannot apply filters to open file objects.") - # Apply filters now (before converting non-local paths to buffers). - # Note that `_process_row_groups` will also expand `filepath_or_buffer` - # into a full list of files if it is a directory. - if fs is not None: - filepath_or_buffer, row_groups = _process_row_groups( - filepath_or_buffer, fs, filters=filters, row_groups=row_groups, + # Use pyarrow dataset to detect/process directory-partitioned + # data and apply filters. Note that we can only support partitioned + # data and filtering if the input is a single directory or list of + # paths. + partition_keys = [] + partition_categories = {} + if fs and paths: + ( + paths, + row_groups, + partition_keys, + partition_categories, + ) = _process_dataset( + paths, + fs, + filters=filters, + row_groups=row_groups, + categorical_partitions=categorical_partitions, ) + elif filters is not None: + raise ValueError("cudf cannot apply filters to open file objects.") + filepath_or_buffer = paths if paths else filepath_or_buffer # Check if we should calculate the specific byte-ranges # needed for each parquet file. 
We always do this when we @@ -380,15 +481,6 @@ def read_parquet( filepaths_or_buffers = [] for i, source in enumerate(filepath_or_buffer): - if ioutils.is_directory(source, **kwargs): - # Note: For now, we know `fs` is an fsspec filesystem - # object, but it may be an arrow object in the future - fsspec_fs = ioutils._ensure_filesystem( - passed_filesystem=fs, path=source - ) - source = ioutils.stringify_pathlike(source) - source = fsspec_fs.sep.join([source, "*.parquet"]) - tmp_source, compression = ioutils.get_filepath_or_buffer( path_or_data=source, compression=None, @@ -410,6 +502,117 @@ def read_parquet( else: filepaths_or_buffers.append(tmp_source) + # Warn user if they are not using cudf for IO + # (There is a good chance this was not the intention) + if engine != "cudf": + warnings.warn( + "Using CPU via PyArrow to read Parquet dataset." + "This option is both inefficient and unstable!" + ) + if filters is not None: + warnings.warn( + "Parquet row-group filtering is only supported with " + "'engine=cudf'. Use pandas or pyarrow API directly " + "for full CPU-based filtering functionality." + ) + + return _parquet_to_frame( + filepaths_or_buffers, + engine, + *args, + columns=columns, + row_groups=row_groups, + skiprows=skiprows, + num_rows=num_rows, + strings_to_categorical=strings_to_categorical, + use_pandas_metadata=use_pandas_metadata, + partition_keys=partition_keys, + partition_categories=partition_categories, + **kwargs, + ) + + +def _parquet_to_frame( + paths_or_buffers, + *args, + row_groups=None, + partition_keys=None, + partition_categories=None, + **kwargs, +): + + # If this is not a partitioned read, only need + # one call to `_read_parquet` + if not partition_keys: + return _read_parquet( + paths_or_buffers, *args, row_groups=row_groups, **kwargs, + ) + + # For partitioned data, we need a distinct read for each + # unique set of partition keys. Therefore, we start by + # aggregating all paths with matching keys using a dict + plan = {} + for i, (keys, path) in enumerate(zip(partition_keys, paths_or_buffers)): + rgs = row_groups[i] if row_groups else None + tkeys = tuple(keys) + if tkeys in plan: + plan[tkeys][0].append(path) + if rgs is not None: + plan[tkeys][1].append(rgs) + else: + plan[tkeys] = ([path], None if rgs is None else [rgs]) + + dfs = [] + for part_key, (key_paths, key_row_groups) in plan.items(): + # Add new DataFrame to our list + dfs.append( + _read_parquet( + key_paths, *args, row_groups=key_row_groups, **kwargs, + ) + ) + # Add partition columns to the last DataFrame + for (name, value) in part_key: + if partition_categories and name in partition_categories: + # Build the categorical column from `codes` + codes = as_column( + partition_categories[name].index(value), + length=len(dfs[-1]), + ) + dfs[-1][name] = build_categorical_column( + categories=partition_categories[name], + codes=codes, + size=codes.size, + offset=codes.offset, + ordered=False, + ) + else: + # Not building categorical columns, so + # `value` is already what we want + dfs[-1][name] = as_column(value, length=len(dfs[-1])) + + # Concatenate dfs and return. + # Assume we can ignore the index if it has no name. 
+ return ( + cudf.concat(dfs, ignore_index=dfs[-1].index.name is None) + if len(dfs) > 1 + else dfs[0] + ) + + +def _read_parquet( + filepaths_or_buffers, + engine, + columns=None, + row_groups=None, + skiprows=None, + num_rows=None, + strings_to_categorical=None, + use_pandas_metadata=None, + *args, + **kwargs, +): + # Simple helper function to dispatch between + # cudf and pyarrow to read parquet data if engine == "cudf": return libparquet.read_parquet( filepaths_or_buffers, @@ -421,7 +624,6 @@ def read_parquet( use_pandas_metadata=use_pandas_metadata, ) else: - warnings.warn("Using CPU via PyArrow to read Parquet dataset.") return cudf.DataFrame.from_arrow( pq.ParquetDataset(filepaths_or_buffers).read_pandas( columns=columns, *args, **kwargs diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index b6595be9566..516ee0d17d3 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1578,7 +1578,7 @@ def test_parquet_writer_bytes_io(simple_gdf): @pytest.mark.parametrize("filename", ["myfile.parquet", None]) @pytest.mark.parametrize("cols", [["b"], ["c", "b"]]) -def test_parquet_write_partitioned(tmpdir_factory, cols, filename): +def test_parquet_partitioned(tmpdir_factory, cols, filename): # Checks that write_to_dataset is wrapping to_parquet # as expected gdf_dir = str(tmpdir_factory.mktemp("gdf_dir")) @@ -1597,10 +1597,14 @@ def test_parquet_write_partitioned(tmpdir_factory, cols, filename): gdf_dir, index=False, partition_cols=cols, partition_file_name=filename ) - # Use pandas since dataset may be partitioned - expect = pd.read_parquet(pdf_dir) - got = pd.read_parquet(gdf_dir) - assert_eq(expect, got) + # Read back with pandas to compare + expect_pd = pd.read_parquet(pdf_dir) + got_pd = pd.read_parquet(gdf_dir) + assert_eq(expect_pd, got_pd) + + # Check that cudf and pd return the same read + got_cudf = cudf.read_parquet(gdf_dir) + assert_eq(got_pd, got_cudf) # If filename is specified, check that it is correct if filename: @@ -1629,9 +1633,9 @@ def test_parquet_write_to_dataset(tmpdir_factory, cols): gdf.to_parquet(dir1, partition_cols=cols) cudf.io.write_to_dataset(gdf, dir2, partition_cols=cols) - # cudf read_parquet cannot handle partitioned dataset - expect = pd.read_parquet(dir1) - got = pd.read_parquet(dir2) + # Read back with cudf + expect = cudf.read_parquet(dir1) + got = cudf.read_parquet(dir2) assert_eq(expect, got) gdf = cudf.DataFrame( @@ -1645,6 +1649,80 @@ def test_parquet_write_to_dataset(tmpdir_factory, cols): gdf.to_parquet(dir1, partition_cols=cols) +@pytest.mark.parametrize( + "pfilters", [[("b", "==", "b")], [("b", "==", "a"), ("c", "==", 1)]], +) +@pytest.mark.parametrize("selection", ["directory", "files", "row-groups"]) +@pytest.mark.parametrize("use_cat", [True, False]) +def test_read_parquet_partitioned_filtered( + tmpdir, pfilters, selection, use_cat +): + path = str(tmpdir) + size = 100 + df = cudf.DataFrame( + { + "a": np.arange(0, stop=size, dtype="int64"), + "b": np.random.choice(list("abcd"), size=size), + "c": np.random.choice(np.arange(4), size=size), + } + ) + df.to_parquet(path, partition_cols=["c", "b"]) + + if selection == "files": + # Pass in a list of paths + fs = get_fs_token_paths(path)[0] + read_path = fs.find(path) + row_groups = None + elif selection == "row-groups": + # Pass in a list of paths AND row-group ids + fs = get_fs_token_paths(path)[0] + read_path = fs.find(path) + row_groups = [[0] for p in read_path] + else: + # Pass in a directory path + # 
(row-group selection not allowed in this case) + read_path = path + row_groups = None + + # Filter on partitioned columns + expect = pd.read_parquet(read_path, filters=pfilters) + got = cudf.read_parquet( + read_path, + filters=pfilters, + row_groups=row_groups, + categorical_partitions=use_cat, + ) + if use_cat: + assert got.dtypes["b"] == "category" + assert got.dtypes["c"] == "category" + else: + # Check that we didn't get categorical + # columns, but convert back to categorical + # for comparison with pandas + assert got.dtypes["b"] == "object" + assert got.dtypes["c"] == "int" + got["b"] = pd.Categorical( + got["b"].to_pandas(), categories=list("abcd") + ) + got["c"] = pd.Categorical( + got["c"].to_pandas(), categories=np.arange(4) + ) + assert_eq(expect, got) + + # Filter on non-partitioned column. + # Cannot compare to pandas, since the pyarrow + # backend will filter by row (and cudf can + # only filter by column, for now) + filters = [("a", "==", 10)] + got = cudf.read_parquet(read_path, filters=filters, row_groups=row_groups,) + assert len(got) < len(df) and 10 in got["a"] + + # Filter on both kinds of columns + filters = [[("a", "==", 10)], [("c", "==", 1)]] + got = cudf.read_parquet(read_path, filters=filters, row_groups=row_groups,) + assert len(got) < len(df) and (1 in got["c"] and 10 in got["a"]) + + def test_parquet_writer_chunked_metadata(tmpdir, simple_pdf, simple_gdf): gdf_fname = tmpdir.join("gdf.parquet") test_path = "test/path" diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index dea876891f8..5738e1f0d00 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -346,12 +346,17 @@ def test_read_parquet_filters(s3_base, s3so, pdf, python_file): assert_eq(pdf.iloc[:0], got.reset_index(drop=True)) -def test_write_parquet(s3_base, s3so, pdf): +@pytest.mark.parametrize("partition_cols", [None, ["String"]]) +def test_write_parquet(s3_base, s3so, pdf, partition_cols): fname = "test_parquet_writer.parquet" bname = "parquet" gdf = cudf.from_pandas(pdf) with s3_context(s3_base=s3_base, bucket=bname) as s3fs: - gdf.to_parquet("s3://{}/{}".format(bname, fname), storage_options=s3so) + gdf.to_parquet( + "s3://{}/{}".format(bname, fname), + partition_cols=partition_cols, + storage_options=s3so, + ) assert s3fs.exists("s3://{}/{}".format(bname, fname)) got = pd.read_parquet(s3fs.open("s3://{}/{}".format(bname, fname))) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 0f9d9d53b23..e6c031acac7 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -154,6 +154,9 @@ strings_to_categorical : boolean, default False If True, return string columns as GDF_CATEGORY dtype; if False, return a as GDF_STRING dtype. +categorical_partitions : boolean, default True + Whether directory-partitioned columns should be interpreted as categorical + or raw dtypes. use_pandas_metadata : boolean, default True If True and dataset has custom PANDAS schema metadata, ensure that index columns are also loaded. 
@@ -1129,7 +1132,7 @@ def ensure_single_filepath_or_buffer(path_or_data, **kwargs): storage_options = kwargs.get("storage_options") path_or_data = os.path.expanduser(path_or_data) try: - fs, _, paths = fsspec.get_fs_token_paths( + fs, _, paths = get_fs_token_paths( path_or_data, mode="rb", storage_options=storage_options ) except ValueError as e: @@ -1153,9 +1156,9 @@ def is_directory(path_or_data, **kwargs): storage_options = kwargs.get("storage_options") path_or_data = os.path.expanduser(path_or_data) try: - fs, _, paths = fsspec.get_fs_token_paths( + fs = get_fs_token_paths( path_or_data, mode="rb", storage_options=storage_options - ) + )[0] except ValueError as e: if str(e).startswith("Protocol not known"): return False @@ -1189,10 +1192,8 @@ def _get_filesystem_and_paths(path_or_data, **kwargs): else: path_or_data = [path_or_data] - # Pyarrow did not support the protocol or storage options. - # Fall back to fsspec try: - fs, _, fs_paths = fsspec.get_fs_token_paths( + fs, _, fs_paths = get_fs_token_paths( path_or_data, mode="rb", storage_options=storage_options ) return_paths = fs_paths @@ -1322,9 +1323,9 @@ def get_writer_filepath_or_buffer(path_or_data, mode, **kwargs): if isinstance(path_or_data, str): storage_options = kwargs.get("storage_options", {}) path_or_data = os.path.expanduser(path_or_data) - fs, _, _ = fsspec.get_fs_token_paths( + fs = get_fs_token_paths( path_or_data, mode=mode or "w", storage_options=storage_options - ) + )[0] if not _is_local_filesystem(fs): filepath_or_buffer = fsspec.open( @@ -1513,11 +1514,12 @@ def _prepare_filters(filters): return filters -def _ensure_filesystem(passed_filesystem, path): +def _ensure_filesystem(passed_filesystem, path, **kwargs): if passed_filesystem is None: - return get_fs_token_paths(path[0] if isinstance(path, list) else path)[ - 0 - ] + return get_fs_token_paths( + path[0] if isinstance(path, list) else path, + storage_options=kwargs.get("storage_options", {}), + )[0] return passed_filesystem diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index b47a5e78095..a49d73493ec 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -126,11 +126,8 @@ def _read_paths( # Build the column from `codes` directly # (since the category is often a larger dtype) - codes = ( - as_column(partitions[i].keys.index(index2)) - .as_frame() - .repeat(len(df)) - ._data[None] + codes = as_column( + partitions[i].keys.index(index2), length=len(df), ) df[name] = build_categorical_column( categories=partitions[i].keys, From e82cc62e2ea61211c64ba4784cb131d5b535644c Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Fri, 3 Dec 2021 04:46:25 -0800 Subject: [PATCH 12/25] Fix join of MultiIndex to Index with one column and overlapping name. 
(#9830) This PR resolves #9823 Authors: - Vyas Ramasubramani (https://github.com/vyasr) - Ashwin Srinath (https://github.com/shwina) Approvers: - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/9830 --- python/cudf/cudf/core/_base_index.py | 4 ++-- python/cudf/cudf/tests/test_joining.py | 13 +++++++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/core/_base_index.py b/python/cudf/cudf/core/_base_index.py index d688b75ed14..2fcc976d8e1 100644 --- a/python/cudf/cudf/core/_base_index.py +++ b/python/cudf/cudf/core/_base_index.py @@ -1147,14 +1147,14 @@ def join( if isinstance(lhs, cudf.MultiIndex): if level is not None and isinstance(level, int): on = lhs._data.select_by_index(level).names[0] - right_names = (on,) or right_names + right_names = (on,) if on is not None else right_names on = right_names[0] if how == "outer": how = "left" elif how == "right": how = "inner" else: - # Both are nomal indices + # Both are normal indices right_names = left_names on = right_names[0] diff --git a/python/cudf/cudf/tests/test_joining.py b/python/cudf/cudf/tests/test_joining.py index 0518cc2c9b9..d25c6130bfb 100644 --- a/python/cudf/cudf/tests/test_joining.py +++ b/python/cudf/cudf/tests/test_joining.py @@ -2150,3 +2150,16 @@ def test_join_redundant_params(): lhs.merge(rhs, right_on="a", left_index=True, right_index=True) with pytest.raises(ValueError): lhs.merge(rhs, left_on="c", right_on="b") + + +def test_join_multiindex_index(): + # test joining a MultiIndex with an Index with overlapping name + lhs = ( + cudf.DataFrame({"a": [2, 3, 1], "b": [3, 4, 2]}) + .set_index(["a", "b"]) + .index + ) + rhs = cudf.DataFrame({"a": [1, 4, 3]}).set_index("a").index + expect = lhs.to_pandas().join(rhs.to_pandas(), how="inner") + got = lhs.join(rhs, how="inner") + assert_join_results_equal(expect, got, how="inner") From 69e6dbbf447a951e4b08f15c737eedcbaf3291da Mon Sep 17 00:00:00 2001 From: Robert Maynard Date: Fri, 3 Dec 2021 10:18:04 -0500 Subject: [PATCH 13/25] Move the binary_ops common dispatcher logic to be executed on the CPU (#9816) * move NullEquals to separate file * To improve runtime performance move more binary_ops dispatch to host * make sure to forceinline the operator_dispatcher * Correct style issues found by ci * Expand the binary-op compiled benchmark suite * Ensure forceinline is on binary ops device dispatch functions * Correct style issues found by ci Co-authored-by: Karthikeyan Natarajan Co-authored-by: Karthikeyan <6488848+karthikeyann@users.noreply.github.com> --- cpp/CMakeLists.txt | 1 + .../binaryop/compiled_binaryop_benchmark.cpp | 66 ++++++++++--------- .../cudf/utilities/type_dispatcher.hpp | 14 ++-- cpp/src/binaryop/compiled/NullEquals.cu | 26 ++++++++ cpp/src/binaryop/compiled/binary_ops.cu | 2 +- cpp/src/binaryop/compiled/binary_ops.cuh | 63 ++++++++++++------ cpp/src/binaryop/compiled/equality_ops.cu | 41 ++++++++---- 7 files changed, 141 insertions(+), 72 deletions(-) create mode 100644 cpp/src/binaryop/compiled/NullEquals.cu diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 59dc3c74af2..37f93f1868b 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -185,6 +185,7 @@ add_library( src/binaryop/compiled/LogicalOr.cu src/binaryop/compiled/Mod.cu src/binaryop/compiled/Mul.cu + src/binaryop/compiled/NullEquals.cu src/binaryop/compiled/NullMax.cu src/binaryop/compiled/NullMin.cu src/binaryop/compiled/PMod.cu diff --git 
a/cpp/benchmarks/binaryop/compiled_binaryop_benchmark.cpp b/cpp/benchmarks/binaryop/compiled_binaryop_benchmark.cpp index bc0818ace4b..8d04f8bdcb2 100644 --- a/cpp/benchmarks/binaryop/compiled_binaryop_benchmark.cpp +++ b/cpp/benchmarks/binaryop/compiled_binaryop_benchmark.cpp @@ -50,14 +50,14 @@ void BM_compiled_binaryop(benchmark::State& state, cudf::binary_operator binop) } // TODO tparam boolean for null. -#define BINARYOP_BENCHMARK_DEFINE(TypeLhs, TypeRhs, binop, TypeOut) \ +#define BINARYOP_BENCHMARK_DEFINE(name, TypeLhs, TypeRhs, binop, TypeOut) \ BENCHMARK_TEMPLATE_DEFINE_F( \ - COMPILED_BINARYOP, binop, TypeLhs, TypeRhs, TypeOut, cudf::binary_operator::binop) \ + COMPILED_BINARYOP, name, TypeLhs, TypeRhs, TypeOut, cudf::binary_operator::binop) \ (::benchmark::State & st) \ { \ BM_compiled_binaryop(st, cudf::binary_operator::binop); \ } \ - BENCHMARK_REGISTER_F(COMPILED_BINARYOP, binop) \ + BENCHMARK_REGISTER_F(COMPILED_BINARYOP, name) \ ->Unit(benchmark::kMicrosecond) \ ->UseManualTime() \ ->Arg(10000) /* 10k */ \ @@ -70,30 +70,36 @@ using namespace cudf; using namespace numeric; // clang-format off -BINARYOP_BENCHMARK_DEFINE(float, int64_t, ADD, int32_t); -BINARYOP_BENCHMARK_DEFINE(duration_s, duration_D, SUB, duration_ms); -BINARYOP_BENCHMARK_DEFINE(float, float, MUL, int64_t); -BINARYOP_BENCHMARK_DEFINE(int64_t, int64_t, DIV, int64_t); -BINARYOP_BENCHMARK_DEFINE(int64_t, int64_t, TRUE_DIV, int64_t); -BINARYOP_BENCHMARK_DEFINE(int64_t, int64_t, FLOOR_DIV, int64_t); -BINARYOP_BENCHMARK_DEFINE(double, double, MOD, double); -BINARYOP_BENCHMARK_DEFINE(int32_t, int64_t, PMOD, double); -BINARYOP_BENCHMARK_DEFINE(int32_t, uint8_t, PYMOD, int64_t); -BINARYOP_BENCHMARK_DEFINE(int64_t, int64_t, POW, double); -BINARYOP_BENCHMARK_DEFINE(float, double, LOG_BASE, double); -BINARYOP_BENCHMARK_DEFINE(float, double, ATAN2, double); -BINARYOP_BENCHMARK_DEFINE(int, int, SHIFT_LEFT, int); -BINARYOP_BENCHMARK_DEFINE(int16_t, int64_t, SHIFT_RIGHT, int); -BINARYOP_BENCHMARK_DEFINE(int64_t, int32_t, SHIFT_RIGHT_UNSIGNED, int64_t); -BINARYOP_BENCHMARK_DEFINE(int64_t, int32_t, BITWISE_AND, int16_t); -BINARYOP_BENCHMARK_DEFINE(int16_t, int32_t, BITWISE_OR, int64_t); -BINARYOP_BENCHMARK_DEFINE(int16_t, int64_t, BITWISE_XOR, int32_t); -BINARYOP_BENCHMARK_DEFINE(double, int8_t, LOGICAL_AND, bool); -BINARYOP_BENCHMARK_DEFINE(int16_t, int64_t, LOGICAL_OR, bool); -BINARYOP_BENCHMARK_DEFINE(duration_ms, duration_ns, EQUAL, bool); -BINARYOP_BENCHMARK_DEFINE(decimal32, decimal32, NOT_EQUAL, bool); -BINARYOP_BENCHMARK_DEFINE(timestamp_s, timestamp_s, LESS, bool); -BINARYOP_BENCHMARK_DEFINE(timestamp_ms, timestamp_s, GREATER, bool); -BINARYOP_BENCHMARK_DEFINE(duration_ms, duration_ns, NULL_EQUALS, bool); -BINARYOP_BENCHMARK_DEFINE(decimal32, decimal32, NULL_MAX, decimal32); -BINARYOP_BENCHMARK_DEFINE(timestamp_D, timestamp_s, NULL_MIN, timestamp_s); +BINARYOP_BENCHMARK_DEFINE(ADD_1, float, float, ADD, float); +BINARYOP_BENCHMARK_DEFINE(ADD_2, timestamp_s, duration_s, ADD, timestamp_s); +BINARYOP_BENCHMARK_DEFINE(SUB_1, duration_s, duration_D, SUB, duration_ms); +BINARYOP_BENCHMARK_DEFINE(SUB_2, int64_t, int64_t, SUB, int64_t); +BINARYOP_BENCHMARK_DEFINE(MUL_1, float, float, MUL, int64_t); +BINARYOP_BENCHMARK_DEFINE(MUL_2, duration_s, int64_t, MUL, duration_s); +BINARYOP_BENCHMARK_DEFINE(DIV_1, int64_t, int64_t, DIV, int64_t); +BINARYOP_BENCHMARK_DEFINE(DIV_2, duration_ms, int32_t, DIV, duration_ms); +BINARYOP_BENCHMARK_DEFINE(TRUE_DIV, int64_t, int64_t, TRUE_DIV, int64_t); +BINARYOP_BENCHMARK_DEFINE(FLOOR_DIV, 
int64_t, int64_t, FLOOR_DIV, int64_t); +BINARYOP_BENCHMARK_DEFINE(MOD_1, double, double, MOD, double); +BINARYOP_BENCHMARK_DEFINE(MOD_2, duration_ms, int64_t, MOD, duration_ms); +BINARYOP_BENCHMARK_DEFINE(PMOD, int32_t, int64_t, PMOD, double); +BINARYOP_BENCHMARK_DEFINE(PYMOD, int32_t, uint8_t, PYMOD, int64_t); +BINARYOP_BENCHMARK_DEFINE(POW, int64_t, int64_t, POW, double); +BINARYOP_BENCHMARK_DEFINE(LOG_BASE, float, double, LOG_BASE, double); +BINARYOP_BENCHMARK_DEFINE(ATAN2, float, double, ATAN2, double); +BINARYOP_BENCHMARK_DEFINE(SHIFT_LEFT, int, int, SHIFT_LEFT, int); +BINARYOP_BENCHMARK_DEFINE(SHIFT_RIGHT, int16_t, int64_t, SHIFT_RIGHT, int); +BINARYOP_BENCHMARK_DEFINE(USHIFT_RIGHT, int64_t, int32_t, SHIFT_RIGHT_UNSIGNED, int64_t); +BINARYOP_BENCHMARK_DEFINE(BITWISE_AND, int64_t, int32_t, BITWISE_AND, int16_t); +BINARYOP_BENCHMARK_DEFINE(BITWISE_OR, int16_t, int32_t, BITWISE_OR, int64_t); +BINARYOP_BENCHMARK_DEFINE(BITWISE_XOR, int16_t, int64_t, BITWISE_XOR, int32_t); +BINARYOP_BENCHMARK_DEFINE(LOGICAL_AND, double, int8_t, LOGICAL_AND, bool); +BINARYOP_BENCHMARK_DEFINE(LOGICAL_OR, int16_t, int64_t, LOGICAL_OR, bool); +BINARYOP_BENCHMARK_DEFINE(EQUAL_1, int32_t, int64_t, EQUAL, bool); +BINARYOP_BENCHMARK_DEFINE(EQUAL_2, duration_ms, duration_ns, EQUAL, bool); +BINARYOP_BENCHMARK_DEFINE(NOT_EQUAL, decimal32, decimal32, NOT_EQUAL, bool); +BINARYOP_BENCHMARK_DEFINE(LESS, timestamp_s, timestamp_s, LESS, bool); +BINARYOP_BENCHMARK_DEFINE(GREATER, timestamp_ms, timestamp_s, GREATER, bool); +BINARYOP_BENCHMARK_DEFINE(NULL_EQUALS, duration_ms, duration_ns, NULL_EQUALS, bool); +BINARYOP_BENCHMARK_DEFINE(NULL_MAX, decimal32, decimal32, NULL_MAX, decimal32); +BINARYOP_BENCHMARK_DEFINE(NULL_MIN, timestamp_D, timestamp_s, NULL_MIN, timestamp_s); diff --git a/cpp/include/cudf/utilities/type_dispatcher.hpp b/cpp/include/cudf/utilities/type_dispatcher.hpp index a04b8309142..d7d38aba4f3 100644 --- a/cpp/include/cudf/utilities/type_dispatcher.hpp +++ b/cpp/include/cudf/utilities/type_dispatcher.hpp @@ -531,7 +531,7 @@ template struct double_type_dispatcher_second_type { #pragma nv_exec_check_disable template - CUDA_HOST_DEVICE_CALLABLE decltype(auto) operator()(F&& f, Ts&&... args) const + CUDF_HDFI decltype(auto) operator()(F&& f, Ts&&... args) const { return f.template operator()(std::forward(args)...); } @@ -541,9 +541,7 @@ template