Use strings concatenate to support large strings in CSV writer (#16148)
Changes the CSV writer logic to use `cudf::strings::concatenate` instead of `cudf::strings::join_strings` when the output size exceeds the 2GB `join_strings` limit.

Closes #16137
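
For context, a minimal caller-level sketch (not part of this commit) of the kind of write that runs into the 2GB limit described above. It assumes the public `cudf::io::write_csv` API; the table, function name, and output path are hypothetical.

#include <cudf/io/csv.hpp>
#include <cudf/table/table_view.hpp>

// `very_large_table` is assumed to produce more than 2GB of CSV text once all
// rows are joined; before this change the writer's join_strings step was the
// limiting factor, and with this change the writer switches to
// strings::concatenate for such outputs.
void write_large_table_as_csv(cudf::table_view const& very_large_table)
{
  auto const sink = cudf::io::sink_info{"large_output.csv"};  // hypothetical output path
  auto const opts = cudf::io::csv_writer_options::builder(sink, very_large_table).build();
  cudf::io::write_csv(opts);
}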

Authors:
  - David Wendt (https://github.com/davidwendt)

Approvers:
  - Nghia Truong (https://github.com/ttnghia)
  - Yunsong Wang (https://github.com/PointKernel)

URL: #16148
davidwendt authored Jul 5, 2024
1 parent ae42218 commit 37defc6
Showing 1 changed file with 29 additions and 9 deletions.
cpp/src/io/csv/writer_impl.cu (+29, -9)
@@ -25,6 +25,7 @@
 
 #include <cudf/column/column_device_view.cuh>
 #include <cudf/detail/copy.hpp>
+#include <cudf/detail/fill.hpp>
 #include <cudf/detail/null_mask.hpp>
 #include <cudf/io/data_sink.hpp>
 #include <cudf/io/detail/csv.hpp>
@@ -372,15 +373,33 @@ void write_chunked(data_sink* out_sink,
   CUDF_EXPECTS(str_column_view.size() > 0, "Unexpected empty strings column.");
 
   cudf::string_scalar newline{options.get_line_terminator(), true, stream};
-  auto p_str_col_w_nl = cudf::strings::detail::join_strings(str_column_view,
-                                                            newline,
-                                                            string_scalar{"", false, stream},
-                                                            stream,
-                                                            rmm::mr::get_current_device_resource());
-  strings_column_view strings_column{p_str_col_w_nl->view()};
-
-  auto total_num_bytes = strings_column.chars_size(stream);
-  char const* ptr_all_bytes = strings_column.chars_begin(stream);
+  // use strings concatenate to build the final CSV output in device memory
+  auto contents_w_nl = [&] {
+    auto const total_size =
+      str_column_view.chars_size(stream) + (newline.size() * str_column_view.size());
+    auto const empty_str = string_scalar("", true, stream);
+    // use join_strings when the output will be less than 2GB
+    if (total_size < static_cast<int64_t>(std::numeric_limits<size_type>::max())) {
+      return cudf::strings::detail::join_strings(str_column_view, newline, empty_str, stream, mr)
+        ->release();
+    }
+    auto nl_col = cudf::make_column_from_scalar(newline, str_column_view.size(), stream);
+    // convert the last element into an empty string by resetting the last offset value
+    auto& offsets = nl_col->child(strings_column_view::offsets_column_index);
+    auto offsets_view = offsets.mutable_view();
+    cudf::fill_in_place(offsets_view,
+                        offsets.size() - 1,  // set the last element with
+                        offsets.size(),      // the value from 2nd to last element
+                        *cudf::detail::get_element(offsets.view(), offsets.size() - 2, stream, mr),
+                        stream);
+    auto const nl_tbl = cudf::table_view({str_column_view.parent(), nl_col->view()});
+    return cudf::strings::detail::concatenate(
+             nl_tbl, empty_str, empty_str, strings::separator_on_nulls::NO, stream, mr)
+      ->release();
+  }();
+  auto const total_num_bytes = contents_w_nl.data->size();
+  auto const ptr_all_bytes   = static_cast<char const*>(contents_w_nl.data->data());
 
   if (out_sink->is_device_write_preferred(total_num_bytes)) {
     // Direct write from device memory
@@ -491,7 +510,8 @@ void write_csv(data_sink* out_sink,
           str_table_view.column(0), options_narep, stream, rmm::mr::get_current_device_resource());
       }();
 
-      write_chunked(out_sink, str_concat_col->view(), options, stream, mr);
+      write_chunked(
+        out_sink, str_concat_col->view(), options, stream, rmm::mr::get_current_device_resource());
     }
   }
 }
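
To illustrate the replacement technique at a small scale (not part of the diff above): the writer pairs the data column with a second strings column that holds the line terminator for every row except the last, then concatenates the two columns row-wise with an empty separator. Below is a hedged sketch using the public `cudf::strings::concatenate` API and the `cudf_test` column-wrapper helpers; the row values and function name are illustrative only.

#include <cudf/column/column.hpp>
#include <cudf/scalar/scalar.hpp>
#include <cudf/strings/combine.hpp>
#include <cudf/table/table_view.hpp>

#include <cudf_test/column_wrapper.hpp>

#include <memory>

// Rows ["a", "bb", "c"] paired with ["\n", "\n", ""] and an empty separator
// produce ["a\n", "bb\n", "c"]: the same text join_strings would emit, but kept
// as one string per row rather than a single size_type-limited string.
std::unique_ptr<cudf::column> concatenate_with_newlines_example()
{
  auto const rows     = cudf::test::strings_column_wrapper({"a", "bb", "c"});
  auto const newlines = cudf::test::strings_column_wrapper({"\n", "\n", ""});
  return cudf::strings::concatenate(cudf::table_view({rows, newlines}),
                                    cudf::string_scalar(""),  // separator
                                    cudf::string_scalar(""),  // null replacement (unused here)
                                    cudf::strings::separator_on_nulls::NO);
}

In the diff itself, the last-row-empty separator column is built with `make_column_from_scalar` and a `fill_in_place` on the final offset, which turns the last row into a zero-length string.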