Skip to content

Commit

Permalink
Avoid materializing a column when converting strings
Browse files Browse the repository at this point in the history
Signed-off-by: Nghia Truong <[email protected]>
  • Loading branch information
ttnghia committed Nov 19, 2024
1 parent d2b6fb5 commit 27f5551
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 62 deletions.
106 changes: 47 additions & 59 deletions src/main/cpp/src/from_json_to_structs.cu
Original file line number Diff line number Diff line change
Expand Up @@ -207,73 +207,59 @@ std::unique_ptr<cudf::column> cast_strings_to_integers(cudf::column_view const&
auto const input_sv = cudf::strings_column_view{input};
auto const input_offsets_it =
cudf::detail::offsetalator_factory::make_input_iterator(input_sv.offsets());
auto const d_input_ptr = cudf::column_device_view::create(input, stream);
auto const is_valid_it = cudf::detail::make_validity_iterator<true>(*d_input_ptr);
auto const d_input_ptr = cudf::column_device_view::create(input, stream);
auto const valid_input_it = cudf::detail::make_validity_iterator<true>(*d_input_ptr);

// We need to nullify the invalid string rows.
// Technically, we should just mask out these rows as invalid and ignore them.
// However, `spark_rapids_jni::string_to_integer` cannot handle these non-empty null rows,
// thus we have to materialzie the valid strings into a new strings column.
auto string_pairs = rmm::device_uvector<string_index_pair>(string_count, stream);
// Technically, we should just mask out these rows as nulls through the nullmask.
// These masked out non-empty nulls will be handled in the conversion API.
auto valids = rmm::device_uvector<bool>(string_count, stream);

// Since the strings store integer numbers, they should be very short.
// As such, using one thread per string should be good.
// As such, using one thread per string should be fine.
thrust::tabulate(rmm::exec_policy_nosync(stream),
string_pairs.begin(),
string_pairs.end(),
[chars = input_sv.chars_begin(stream),
offsets = input_offsets_it,
is_valid = is_valid_it] __device__(cudf::size_type idx) -> string_index_pair {
if (!is_valid[idx]) { return {nullptr, 0}; }

auto const start_offset = offsets[idx];
auto const end_offset = offsets[idx + 1];

auto in_ptr = chars + start_offset;
auto in_end = chars + end_offset;
valids.begin(),
valids.end(),
[chars = input_sv.chars_begin(stream),
offsets = input_offsets_it,
valid_input = valid_input_it] __device__(cudf::size_type idx) -> bool {
if (!valid_input[idx]) { return false; }

auto in_ptr = chars + offsets[idx];
auto const in_end = chars + offsets[idx + 1];
while (in_ptr != in_end) {
if (*in_ptr == '.' || *in_ptr == 'e' || *in_ptr == 'E') {
return {nullptr, 0};
}
if (*in_ptr == '.' || *in_ptr == 'e' || *in_ptr == 'E') { return false; }
++in_ptr;
}

return {chars + start_offset, end_offset - start_offset};
return true;
});

auto const size_it = cudf::detail::make_counting_transform_iterator(
0,
cuda::proclaim_return_type<cudf::size_type>(
[string_pairs = string_pairs.begin()] __device__(cudf::size_type idx) -> cudf::size_type {
return string_pairs[idx].second;
}));
auto [offsets_column, bytes] =
cudf::strings::detail::make_offsets_child_column(size_it, size_it + string_count, stream, mr);

// If the output strings column does not change in its total bytes, we can use the input directly.
if (bytes == input_sv.chars_size(stream)) {
return spark_rapids_jni::string_to_integer(output_type,
input_sv,
/*ansi_mode*/ false,
/*strip*/ false,
stream,
mr);
}

// Build a new strings column, removing the invalid rows.
auto chars_data = cudf::strings::detail::make_chars_buffer(
offsets_column->view(), bytes, string_pairs.begin(), string_count, stream, mr);

// Don't care about the null mask, as nulls imply empty strings, which will also result in nulls.
auto const sanitized_input =
cudf::make_strings_column(string_count, std::move(offsets_column), chars_data.release(), 0, {});

return spark_rapids_jni::string_to_integer(output_type,
cudf::strings_column_view{sanitized_input->view()},
/*ansi_mode*/ false,
/*strip*/ false,
stream,
mr);
auto const [null_mask, null_count] =
cudf::detail::valid_if(valids.begin(),
valids.end(),
thrust::identity{},
stream,
cudf::get_current_device_resource_ref());
// If the null count doesn't change, just use the input column for conversion.
auto const input_applied_null =
null_count == input.null_count()
? cudf::column_view{}
: cudf::column_view{cudf::data_type{cudf::type_id::STRING},
input_sv.size(),
input_sv.chars_begin(stream),
reinterpret_cast<cudf::bitmask_type const*>(null_mask.data()),
null_count,
input_sv.offset(),
std::vector<cudf::column_view>{input_sv.offsets()}};

return spark_rapids_jni::string_to_integer(
output_type,
null_count == input.null_count() ? input_sv : cudf::strings_column_view{input_applied_null},
/*ansi_mode*/ false,
/*strip*/ false,
stream,
mr);
}

std::pair<std::unique_ptr<cudf::column>, bool> try_remove_quotes_for_floats(
Expand Down Expand Up @@ -459,9 +445,11 @@ std::unique_ptr<cudf::column> cast_strings_to_decimals(cudf::column_view const&
quote_counts = quote_counts.begin(),
remove_counts = remove_counts.begin()] __device__(auto idx) {
auto const input_size = offsets[idx + 1] - offsets[idx];
// If the current row is a non-quoted string, just return the original string.
// If the current row is non-quoted, just return the original string.
// As such, non-quoted string containing `,` character will not be preprocessed.
if (quote_counts[idx] == 0) { return static_cast<cudf::size_type>(input_size); }
// Otherwise, we will modify the string, removing characters '"' and ','.

// For quoted strings, we will modify them, removing characters '"' and ','.
return static_cast<cudf::size_type>(input_size - remove_counts[idx]);
}));
auto [offsets_column, bytes] = cudf::strings::detail::make_offsets_child_column(
Expand All @@ -483,7 +471,7 @@ std::unique_ptr<cudf::column> cast_strings_to_decimals(cudf::column_view const&
auto chars_data = rmm::device_uvector<char>(bytes, stream, mr);

// Since the strings store decimal numbers, they should not be very long.
// As such, using one thread per string should be good.
// As such, using one thread per string should be fine.
thrust::for_each(rmm::exec_policy_nosync(stream),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(string_count),
Expand Down
5 changes: 2 additions & 3 deletions src/main/cpp/src/json_utils.cu
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,7 @@ std::tuple<std::unique_ptr<rmm::device_buffer>, char, std::unique_ptr<cudf::colu

auto [null_mask, null_count] = cudf::detail::valid_if(
is_valid_input.begin(), is_valid_input.end(), thrust::identity{}, stream, default_mr);
// If the null count doesn't change, that mean we do not have any rows containing `null` string
// literal or empty rows. In such cases, just use the input column for concatenation.
// If the null count doesn't change, just use the input column for concatenation.
auto const input_applied_null =
null_count == input.null_count()
? cudf::column_view{}
Expand All @@ -200,7 +199,7 @@ std::tuple<std::unique_ptr<rmm::device_buffer>, char, std::unique_ptr<cudf::colu
input.chars_begin(stream),
reinterpret_cast<cudf::bitmask_type const*>(null_mask.data()),
null_count,
0,
input.offset(),
std::vector<cudf::column_view>{input.offsets()}};

auto concat_strings = cudf::strings::detail::join_strings(
Expand Down

0 comments on commit 27f5551

Please sign in to comment.