From 27f55516572d8b822db9eac7b27add3ecc0f094b Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 19 Nov 2024 11:18:01 -0800 Subject: [PATCH] Avoid materializing a column when converting strings Signed-off-by: Nghia Truong --- src/main/cpp/src/from_json_to_structs.cu | 106 ++++++++++------------- src/main/cpp/src/json_utils.cu | 5 +- 2 files changed, 49 insertions(+), 62 deletions(-) diff --git a/src/main/cpp/src/from_json_to_structs.cu b/src/main/cpp/src/from_json_to_structs.cu index 02b2f61c2f..f3fa68e81b 100644 --- a/src/main/cpp/src/from_json_to_structs.cu +++ b/src/main/cpp/src/from_json_to_structs.cu @@ -207,73 +207,59 @@ std::unique_ptr cast_strings_to_integers(cudf::column_view const& auto const input_sv = cudf::strings_column_view{input}; auto const input_offsets_it = cudf::detail::offsetalator_factory::make_input_iterator(input_sv.offsets()); - auto const d_input_ptr = cudf::column_device_view::create(input, stream); - auto const is_valid_it = cudf::detail::make_validity_iterator(*d_input_ptr); + auto const d_input_ptr = cudf::column_device_view::create(input, stream); + auto const valid_input_it = cudf::detail::make_validity_iterator(*d_input_ptr); // We need to nullify the invalid string rows. - // Technically, we should just mask out these rows as invalid and ignore them. - // However, `spark_rapids_jni::string_to_integer` cannot handle these non-empty null rows, - // thus we have to materialzie the valid strings into a new strings column. - auto string_pairs = rmm::device_uvector(string_count, stream); + // Technically, we should just mask out these rows as nulls through the nullmask. + // These masked out non-empty nulls will be handled in the conversion API. + auto valids = rmm::device_uvector(string_count, stream); // Since the strings store integer numbers, they should be very short. - // As such, using one thread per string should be good. + // As such, using one thread per string should be fine. thrust::tabulate(rmm::exec_policy_nosync(stream), - string_pairs.begin(), - string_pairs.end(), - [chars = input_sv.chars_begin(stream), - offsets = input_offsets_it, - is_valid = is_valid_it] __device__(cudf::size_type idx) -> string_index_pair { - if (!is_valid[idx]) { return {nullptr, 0}; } - - auto const start_offset = offsets[idx]; - auto const end_offset = offsets[idx + 1]; - - auto in_ptr = chars + start_offset; - auto in_end = chars + end_offset; + valids.begin(), + valids.end(), + [chars = input_sv.chars_begin(stream), + offsets = input_offsets_it, + valid_input = valid_input_it] __device__(cudf::size_type idx) -> bool { + if (!valid_input[idx]) { return false; } + + auto in_ptr = chars + offsets[idx]; + auto const in_end = chars + offsets[idx + 1]; while (in_ptr != in_end) { - if (*in_ptr == '.' || *in_ptr == 'e' || *in_ptr == 'E') { - return {nullptr, 0}; - } + if (*in_ptr == '.' || *in_ptr == 'e' || *in_ptr == 'E') { return false; } ++in_ptr; } - return {chars + start_offset, end_offset - start_offset}; + return true; }); - auto const size_it = cudf::detail::make_counting_transform_iterator( - 0, - cuda::proclaim_return_type( - [string_pairs = string_pairs.begin()] __device__(cudf::size_type idx) -> cudf::size_type { - return string_pairs[idx].second; - })); - auto [offsets_column, bytes] = - cudf::strings::detail::make_offsets_child_column(size_it, size_it + string_count, stream, mr); - - // If the output strings column does not change in its total bytes, we can use the input directly. - if (bytes == input_sv.chars_size(stream)) { - return spark_rapids_jni::string_to_integer(output_type, - input_sv, - /*ansi_mode*/ false, - /*strip*/ false, - stream, - mr); - } - - // Build a new strings column, removing the invalid rows. - auto chars_data = cudf::strings::detail::make_chars_buffer( - offsets_column->view(), bytes, string_pairs.begin(), string_count, stream, mr); - - // Don't care about the null mask, as nulls imply empty strings, which will also result in nulls. - auto const sanitized_input = - cudf::make_strings_column(string_count, std::move(offsets_column), chars_data.release(), 0, {}); - - return spark_rapids_jni::string_to_integer(output_type, - cudf::strings_column_view{sanitized_input->view()}, - /*ansi_mode*/ false, - /*strip*/ false, - stream, - mr); + auto const [null_mask, null_count] = + cudf::detail::valid_if(valids.begin(), + valids.end(), + thrust::identity{}, + stream, + cudf::get_current_device_resource_ref()); + // If the null count doesn't change, just use the input column for conversion. + auto const input_applied_null = + null_count == input.null_count() + ? cudf::column_view{} + : cudf::column_view{cudf::data_type{cudf::type_id::STRING}, + input_sv.size(), + input_sv.chars_begin(stream), + reinterpret_cast(null_mask.data()), + null_count, + input_sv.offset(), + std::vector{input_sv.offsets()}}; + + return spark_rapids_jni::string_to_integer( + output_type, + null_count == input.null_count() ? input_sv : cudf::strings_column_view{input_applied_null}, + /*ansi_mode*/ false, + /*strip*/ false, + stream, + mr); } std::pair, bool> try_remove_quotes_for_floats( @@ -459,9 +445,11 @@ std::unique_ptr cast_strings_to_decimals(cudf::column_view const& quote_counts = quote_counts.begin(), remove_counts = remove_counts.begin()] __device__(auto idx) { auto const input_size = offsets[idx + 1] - offsets[idx]; - // If the current row is a non-quoted string, just return the original string. + // If the current row is non-quoted, just return the original string. + // As such, non-quoted string containing `,` character will not be preprocessed. if (quote_counts[idx] == 0) { return static_cast(input_size); } - // Otherwise, we will modify the string, removing characters '"' and ','. + + // For quoted strings, we will modify them, removing characters '"' and ','. return static_cast(input_size - remove_counts[idx]); })); auto [offsets_column, bytes] = cudf::strings::detail::make_offsets_child_column( @@ -483,7 +471,7 @@ std::unique_ptr cast_strings_to_decimals(cudf::column_view const& auto chars_data = rmm::device_uvector(bytes, stream, mr); // Since the strings store decimal numbers, they should not be very long. - // As such, using one thread per string should be good. + // As such, using one thread per string should be fine. thrust::for_each(rmm::exec_policy_nosync(stream), thrust::make_counting_iterator(0), thrust::make_counting_iterator(string_count), diff --git a/src/main/cpp/src/json_utils.cu b/src/main/cpp/src/json_utils.cu index 3305bec9ae..ef3d0db0f8 100644 --- a/src/main/cpp/src/json_utils.cu +++ b/src/main/cpp/src/json_utils.cu @@ -190,8 +190,7 @@ std::tuple, char, std::unique_ptr, char, std::unique_ptr(null_mask.data()), null_count, - 0, + input.offset(), std::vector{input.offsets()}}; auto concat_strings = cudf::strings::detail::join_strings(