Add support for large string columns to Parquet reader and writer #15632

Merged: 22 commits, May 3, 2024
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt

@@ -429,6 +429,7 @@ add_library(
   src/io/text/multibyte_split.cu
   src/io/utilities/arrow_io_source.cpp
   src/io/utilities/column_buffer.cpp
+  src/io/utilities/column_buffer_strings.cu
   src/io/utilities/config_utils.cpp
   src/io/utilities/data_casting.cu
   src/io/utilities/data_sink.cpp
34 changes: 20 additions & 14 deletions cpp/src/io/parquet/page_delta_decode.cu

@@ -579,15 +579,18 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
     __syncthreads();
   }

-  // now turn array of lengths into offsets
-  int value_count = nesting_info_base[leaf_level_index].value_count;
+  // Now turn the array of lengths into offsets, but skip if this is a large string column. In the
+  // latter case, offsets will be computed during string column creation.
+  if (not s->col.is_large_string_col) {
+    int value_count = nesting_info_base[leaf_level_index].value_count;

-  // if no repetition we haven't calculated start/end bounds and instead just skipped
-  // values until we reach first_row. account for that here.
-  if (!has_repetition) { value_count -= s->first_row; }
+    // if no repetition we haven't calculated start/end bounds and instead just skipped
+    // values until we reach first_row. account for that here.
+    if (!has_repetition) { value_count -= s->first_row; }

-  auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
-  block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
+    auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
+    block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
+  }

   if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
 }
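For reference, block_excl_sum above is cuDF's block-wide exclusive scan over the per-string lengths. A minimal host-side sketch of the same transformation, with illustrative names that are not part of the cuDF API:

#include <cstdint>
#include <numeric>
#include <vector>

// Turn per-string lengths into offsets, starting from the page's base offset
// into the column's character data (str_offset). offsets[i] is where string i
// begins; the terminating offset is appended later, when the reader "caps
// off" the offsets column (see reader_impl.cpp below).
std::vector<int32_t> lengths_to_offsets(std::vector<int32_t> const& lengths,
                                        int32_t page_str_offset)
{
  std::vector<int32_t> offsets(lengths.size());
  std::exclusive_scan(lengths.begin(), lengths.end(), offsets.begin(), page_str_offset);
  return offsets;
}

// e.g. lengths {3, 5, 2} with page_str_offset 10 yield offsets {10, 13, 18}.
// The skipped branch above leaves this work to string column creation when
// 64-bit offsets are needed.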
@@ -738,15 +741,18 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
     __syncthreads();
   }

-  // now turn array of lengths into offsets
-  int value_count = nesting_info_base[leaf_level_index].value_count;
+  // Now turn the array of lengths into offsets, but skip if this is a large string column. In the
+  // latter case, offsets will be computed during string column creation.
+  if (not s->col.is_large_string_col) {
+    int value_count = nesting_info_base[leaf_level_index].value_count;

-  // if no repetition we haven't calculated start/end bounds and instead just skipped
-  // values until we reach first_row. account for that here.
-  if (!has_repetition) { value_count -= s->first_row; }
+    // if no repetition we haven't calculated start/end bounds and instead just skipped
+    // values until we reach first_row. account for that here.
+    if (!has_repetition) { value_count -= s->first_row; }

-  auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
-  block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
+    auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
+    block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
+  }

   // finally, copy the string data into place
   auto const dst = nesting_info_base[leaf_level_index].string_out;
33 changes: 18 additions & 15 deletions cpp/src/io/parquet/page_string_decode.cu

@@ -955,7 +955,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
 {
   using cudf::detail::warp_size;
   __shared__ __align__(16) page_state_s state_g;
-  __shared__ __align__(4) size_type last_offset;
+  __shared__ size_t last_offset;
   __shared__ __align__(16)
     page_state_buffers_s<rolling_buf_size, rolling_buf_size, rolling_buf_size>
       state_buffers;

@@ -1054,9 +1054,9 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
         ? gpuGetStringData(s, sb, src_pos + skipped_leaf_values + i)
         : cuda::std::pair<char const*, size_t>{nullptr, 0};

-    __shared__ cub::WarpScan<size_type>::TempStorage temp_storage;
-    size_type offset, warp_total;
-    cub::WarpScan<size_type>(temp_storage).ExclusiveSum(len, offset, warp_total);
+    __shared__ cub::WarpScan<size_t>::TempStorage temp_storage;
+    size_t offset, warp_total;
+    cub::WarpScan<size_t>(temp_storage).ExclusiveSum(len, offset, warp_total);
     offset += last_offset;

     // choose a character parallel string copy when the average string is longer than a warp
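The scan types widen from size_type to size_t so that running character offsets within a page cannot overflow 32 bits. A standalone sketch of the same cub::WarpScan pattern (illustrative kernel, launched with a single 32-thread warp):

#include <cub/cub.cuh>

// Each lane contributes one string length; `offset` becomes that lane's write
// position and `warp_total` the bytes consumed by the whole warp (the running
// base that the real kernel keeps in last_offset).
__global__ void warp_offsets(size_t const* lengths, size_t* offsets, size_t* warp_bytes)
{
  __shared__ cub::WarpScan<size_t>::TempStorage temp_storage;
  size_t offset, warp_total;
  cub::WarpScan<size_t>(temp_storage).ExclusiveSum(lengths[threadIdx.x], offset, warp_total);
  offsets[threadIdx.x] = offset;
  if (threadIdx.x == 0) { *warp_bytes = warp_total; }
}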
@@ -1075,10 +1075,10 @@
       }
       __syncwarp();
     } else if (use_char_ll) {
-      __shared__ __align__(8) uint8_t const* pointers[warp_size];
-      __shared__ __align__(4) size_type offsets[warp_size];
-      __shared__ __align__(4) int dsts[warp_size];
-      __shared__ __align__(4) int lengths[warp_size];
+      __shared__ uint8_t const* pointers[warp_size];
+      __shared__ size_t offsets[warp_size];
+      __shared__ int dsts[warp_size];
+      __shared__ int lengths[warp_size];

       offsets[me]  = offset;
       pointers[me] = reinterpret_cast<uint8_t const*>(ptr);
@@ -1119,15 +1119,18 @@
     __syncthreads();
   }

-  // now turn array of lengths into offsets
-  int value_count = nesting_info_base[leaf_level_index].value_count;
+  // Now turn the array of lengths into offsets, but skip if this is a large string column. In the
+  // latter case, offsets will be computed during string column creation.
+  if (not s->col.is_large_string_col) {
+    int value_count = nesting_info_base[leaf_level_index].value_count;

-  // if no repetition we haven't calculated start/end bounds and instead just skipped
-  // values until we reach first_row. account for that here.
-  if (!has_repetition) { value_count -= s->first_row; }
+    // if no repetition we haven't calculated start/end bounds and instead just skipped
+    // values until we reach first_row. account for that here.
+    if (!has_repetition) { value_count -= s->first_row; }

-  auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
-  block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
+    auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
+    block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
+  }

   if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
 }
8 changes: 5 additions & 3 deletions cpp/src/io/parquet/parquet_gpu.hpp

@@ -326,8 +326,8 @@ struct PageInfo {
   int32_t skipped_leaf_values;
   // for string columns only, the size of all the chars in the string for
   // this page. only valid/computed during the base preprocess pass
+  size_t str_offset;  // offset into string data for this page
   int32_t str_bytes;
-  int32_t str_offset;  // offset into string data for this page
   bool has_page_index;  // true if str_bytes, num_valids, etc are derivable from page indexes

   // nesting information (input/output) for each page. this array contains

@@ -420,7 +420,8 @@ struct ColumnChunkDesc {
     src_col_schema(src_col_schema_),
     h_chunk_info(chunk_info_),
     list_bytes_per_row_est(list_bytes_per_row_est_),
-    is_strings_to_cat(strings_to_categorical_)
+    is_strings_to_cat(strings_to_categorical_),
+    is_large_string_col(false)
   {
   }

@@ -454,7 +455,8 @@

   float list_bytes_per_row_est{};  // for LIST columns, an estimate on number of bytes per row

-  bool is_strings_to_cat{};  // convert strings to hashes
+  bool is_strings_to_cat{};    // convert strings to hashes
+  bool is_large_string_col{};  // `true` if string data uses 64-bit offsets
 };

 /**
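The widened str_offset is a page's starting position within the column's character data: individual pages stay small (str_bytes remains int32_t), but a late page's starting offset inside a multi-gigabyte column can exceed what 32 bits can hold. A one-line illustration of the limit that motivates both this field and is_large_string_col:

#include <cstdint>
#include <limits>

// int32 offsets can address at most ~2 GiB of character data per column.
static_assert(std::numeric_limits<std::int32_t>::max() == 2'147'483'647,
              "32-bit string offsets cap a column's chars at ~2 GiB");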
29 changes: 21 additions & 8 deletions cpp/src/io/parquet/reader_impl.cpp

@@ -22,6 +22,7 @@
 #include <cudf/detail/transform.hpp>
 #include <cudf/detail/utilities/stream_pool.hpp>
 #include <cudf/detail/utilities/vector_factories.hpp>
+#include <cudf/strings/detail/utilities.hpp>

 #include <rmm/resource_ref.hpp>

@@ -99,11 +100,21 @@ void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_row
     col_string_sizes = calculate_page_string_offsets();

     // check for overflow
-    if (std::any_of(col_string_sizes.cbegin(), col_string_sizes.cend(), [](std::size_t sz) {
-          return sz > std::numeric_limits<size_type>::max();
-        })) {
+    auto const threshold         = static_cast<size_t>(strings::detail::get_offset64_threshold());
+    auto const has_large_strings = std::any_of(col_string_sizes.cbegin(),
+                                               col_string_sizes.cend(),
+                                               [=](std::size_t sz) { return sz > threshold; });
+    if (has_large_strings and not strings::detail::is_large_strings_enabled()) {
       CUDF_FAIL("String column exceeds the column size limit", std::overflow_error);
     }
+
+    // mark any chunks that are large string columns
+    if (has_large_strings) {
+      for (auto& chunk : pass.chunks) {
+        auto const idx = chunk.src_col_index;
+        if (col_string_sizes[idx] > threshold) { chunk.is_large_string_col = true; }
+      }
+    }
   }

   // In order to reduce the number of allocations of hostdevice_vector, we allocate a single vector
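The gating logic in this hunk: a column whose total character size exceeds the 64-bit-offset threshold (lowered in the test below via the LIBCUDF_LARGE_STRINGS_THRESHOLD environment variable) either has its chunks flagged as large-string or, when large-string support is disabled, raises an overflow error. A host-side sketch with the cuDF detail calls stubbed out as parameters:

#include <algorithm>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Sketch of the reader's decision; threshold and large_strings_enabled stand
// in for strings::detail::get_offset64_threshold() / is_large_strings_enabled().
bool check_large_strings(std::vector<std::size_t> const& col_string_sizes,
                         std::size_t threshold,
                         bool large_strings_enabled)
{
  bool const has_large_strings =
    std::any_of(col_string_sizes.cbegin(), col_string_sizes.cend(), [=](std::size_t sz) {
      return sz > threshold;
    });
  if (has_large_strings && !large_strings_enabled) {
    throw std::overflow_error("String column exceeds the column size limit");
  }
  // if true, the caller flags each over-threshold chunk (is_large_string_col)
  // so the decode kernels skip the 32-bit offsets pass
  return has_large_strings;
}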
@@ -348,11 +359,13 @@ void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_row
       } else if (out_buf.type.id() == type_id::STRING) {
         // need to cap off the string offsets column
         auto const sz = static_cast<size_type>(col_string_sizes[idx]);
-        CUDF_CUDA_TRY(cudaMemcpyAsync(static_cast<size_type*>(out_buf.data()) + out_buf.size,
-                                      &sz,
-                                      sizeof(size_type),
-                                      cudaMemcpyDefault,
-                                      _stream.value()));
+        if (sz <= strings::detail::get_offset64_threshold()) {
+          CUDF_CUDA_TRY(cudaMemcpyAsync(static_cast<size_type*>(out_buf.data()) + out_buf.size,
+                                        &sz,
+                                        sizeof(size_type),
+                                        cudaMemcpyDefault,
+                                        _stream.value()));
+        }
       }
     }
   }
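Why the new guard: the decode kernels fill offsets[0..n-1], and this copy writes the terminating entry offsets[n] (the column's total character bytes). For a large-string column that final value no longer fits in size_type, and the buffer still holds sizes rather than offsets, so finalization is deferred to make_string_column_impl below. A host-side sketch of the idea, with illustrative names:

#include <cstdint>
#include <cstddef>
#include <vector>

// offsets_buf has num_rows + 1 slots; the kernels wrote the first num_rows.
// Cap off only when the column stays within 32-bit offsets.
void cap_off_offsets(std::vector<int32_t>& offsets_buf,
                     std::size_t num_rows,
                     std::size_t total_bytes,
                     std::size_t threshold)
{
  if (total_bytes <= threshold) {
    offsets_buf[num_rows] = static_cast<int32_t>(total_bytes);
  }
}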
8 changes: 4 additions & 4 deletions cpp/src/io/parquet/reader_impl_preprocess.cu

@@ -1169,10 +1169,10 @@ struct page_to_string_size {
 struct page_offset_output_iter {
   PageInfo* p;

-  using value_type      = size_type;
-  using difference_type = size_type;
-  using pointer         = size_type*;
-  using reference       = size_type&;
+  using value_type      = size_t;
+  using difference_type = size_t;
+  using pointer         = size_t*;
+  using reference       = size_t&;
   using iterator_category = thrust::output_device_iterator_tag;

   __host__ __device__ page_offset_output_iter operator+(int i) { return {p + i}; }
6 changes: 4 additions & 2 deletions cpp/src/io/parquet/writer_impl.cu

@@ -40,6 +40,7 @@
 #include <cudf/detail/utilities/vector_factories.hpp>
 #include <cudf/lists/detail/dremel.hpp>
 #include <cudf/lists/lists_column_view.hpp>
+#include <cudf/strings/detail/utilities.hpp>
 #include <cudf/strings/strings_column_view.hpp>
 #include <cudf/structs/structs_column_view.hpp>
 #include <cudf/table/table_device_view.cuh>

@@ -278,8 +279,9 @@ size_t column_size(column_view const& column, rmm::cuda_stream_view stream)
     return size_of(column.type()) * column.size();
   } else if (column.type().id() == type_id::STRING) {
     auto const scol = strings_column_view(column);
-    return cudf::detail::get_value<size_type>(scol.offsets(), column.size(), stream) -
-           cudf::detail::get_value<size_type>(scol.offsets(), 0, stream);
+    return cudf::strings::detail::get_offset_value(
+             scol.offsets(), column.size() + column.offset(), stream) -
+           cudf::strings::detail::get_offset_value(scol.offsets(), column.offset(), stream);
   } else if (column.type().id() == type_id::STRUCT) {
     auto const scol = structs_column_view(column);
     size_t ret      = 0;
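The writer change does two things: get_offset_value reads either 32-bit or 64-bit offsets (get_value<size_type> assumed 32-bit), and the added column.offset() terms make the arithmetic correct for sliced columns. The underlying arithmetic, as a host sketch:

#include <cstdint>
#include <cstddef>
#include <vector>

// Character bytes spanned by a (possibly sliced) strings column: offsets has
// one entry per row plus one; a view starting at row_offset with num_rows
// rows spans offsets[row_offset + num_rows] - offsets[row_offset] bytes.
int64_t sliced_chars_size(std::vector<int64_t> const& offsets,
                          std::size_t row_offset,
                          std::size_t num_rows)
{
  return offsets[row_offset + num_rows] - offsets[row_offset];
}

// e.g. offsets {0, 3, 8, 10} sliced to the middle row (row_offset = 1,
// num_rows = 1) spans 8 - 3 = 5 bytes, not offsets[1] - offsets[0].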
10 changes: 0 additions & 10 deletions cpp/src/io/utilities/column_buffer.cpp

@@ -69,16 +69,6 @@ void cudf::io::detail::inline_column_buffer::create_string_data(size_t num_bytes
   _string_data = rmm::device_buffer(num_bytes, stream, _mr);
 }

-std::unique_ptr<column> cudf::io::detail::inline_column_buffer::make_string_column_impl(
-  rmm::cuda_stream_view stream)
-{
-  // no need for copies, just transfer ownership of the data_buffers to the columns
-  auto offsets_col = std::make_unique<column>(
-    data_type{type_to_id<size_type>()}, size + 1, std::move(_data), rmm::device_buffer{}, 0);
-  return make_strings_column(
-    size, std::move(offsets_col), std::move(_string_data), null_count(), std::move(_null_mask));
-}
-
 namespace {

 /**
53 changes: 53 additions & 0 deletions cpp/src/io/utilities/column_buffer_strings.cu

@@ -0,0 +1,53 @@
/*
 * Copyright (c) 2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "column_buffer.hpp"

#include <cudf/strings/detail/strings_children.cuh>
#include <cudf/strings/detail/utilities.hpp>
#include <cudf/utilities/error.hpp>

namespace cudf::io::detail {

std::unique_ptr<column> cudf::io::detail::inline_column_buffer::make_string_column_impl(
  rmm::cuda_stream_view stream)
{
  // if the size of _string_data is over the threshold for 64bit size_type, _data will contain
  // sizes rather than offsets. need special handling for that case.
  auto const threshold = static_cast<size_t>(strings::detail::get_offset64_threshold());
  if (_string_data.size() > threshold) {
    if (not strings::detail::is_large_strings_enabled()) {
      CUDF_FAIL("String column exceeds the column size limit", std::overflow_error);
    }
    // create new offsets
    auto const offsets_ptr = static_cast<size_type*>(_data.data());
    auto offsets_col       = make_numeric_column(
      data_type{type_id::INT64}, size + 1, mask_state::UNALLOCATED, stream, _mr);
    auto d_offsets64 = offsets_col->mutable_view().template data<int64_t>();
    // it's safe to call with size + 1 because _data is also sized that large
    cudf::detail::sizes_to_offsets(offsets_ptr, offsets_ptr + size + 1, d_offsets64, stream);
    return make_strings_column(
      size, std::move(offsets_col), std::move(_string_data), null_count(), std::move(_null_mask));
  } else {
    // no need for copies, just transfer ownership of the data_buffers to the columns
    auto offsets_col = std::make_unique<column>(
      data_type{type_to_id<size_type>()}, size + 1, std::move(_data), rmm::device_buffer{}, 0);
    return make_strings_column(
      size, std::move(offsets_col), std::move(_string_data), null_count(), std::move(_null_mask));
  }
}

}  // namespace cudf::io::detail
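cudf::detail::sizes_to_offsets above rewrites, on device, the 32-bit sizes that the decode kernels left in _data into 64-bit offsets. Conceptually it is the exclusive scan below (host sketch; the real implementation is a device scan, and the names here are illustrative):

#include <cstdint>
#include <numeric>
#include <vector>

// The decode kernels stored per-row sizes in the would-be offsets buffer,
// which has size + 1 slots; widening and exclusive-scanning produces the
// final offsets, including the terminating total in the last slot.
std::vector<int64_t> sizes_to_offsets64(std::vector<int32_t> const& sizes)
{
  std::vector<int64_t> offsets(sizes.size());
  std::exclusive_scan(sizes.begin(), sizes.end(), offsets.begin(), int64_t{0});
  return offsets;
}

// e.g. sizes {3, 5, 2, 0} (last slot unused on input) yield offsets
// {0, 3, 8, 10}; the back entry is the total character count, which may
// exceed INT32_MAX.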
2 changes: 1 addition & 1 deletion cpp/tests/CMakeLists.txt

@@ -572,7 +572,7 @@ ConfigureTest(
 # * large strings test ----------------------------------------------------------------------------
 ConfigureTest(
   LARGE_STRINGS_TEST large_strings/large_strings_fixture.cpp large_strings/merge_tests.cpp
-  large_strings/concatenate_tests.cpp
+  large_strings/concatenate_tests.cpp large_strings/parquet_tests.cpp
   GPUS 1
   PERCENT 100
 )
73 changes: 73 additions & 0 deletions cpp/tests/large_strings/parquet_tests.cpp

@@ -0,0 +1,73 @@
/*
 * Copyright (c) 2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "large_strings_fixture.hpp"

#include <cudf_test/column_utilities.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/table_utilities.hpp>

#include <cudf/io/parquet.hpp>
#include <cudf/io/types.hpp>
#include <cudf/table/table_view.hpp>

namespace {

cudf::test::TempDirTestEnvironment* const g_temp_env =
  static_cast<cudf::test::TempDirTestEnvironment*>(
    ::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment));

}  // namespace

struct ParquetStringsTest : public cudf::test::StringsLargeTest {};

TEST_F(ParquetStringsTest, ReadLargeStrings)
{
  // need to create a string column larger than `threshold`
  auto const col0        = this->long_column();
  auto const column_size = cudf::strings_column_view(col0).chars_size(cudf::get_default_stream());
  auto const threshold   = column_size - 1;
  auto const expected    = cudf::table_view{{col0, col0, col0}};

  auto expected_metadata = cudf::io::table_input_metadata{expected};
  expected_metadata.column_metadata[1].set_encoding(
    cudf::io::column_encoding::DELTA_LENGTH_BYTE_ARRAY);
  expected_metadata.column_metadata[2].set_encoding(cudf::io::column_encoding::DELTA_BYTE_ARRAY);

  // set smaller threshold to reduce file size and execution time
  setenv("LIBCUDF_LARGE_STRINGS_THRESHOLD", std::to_string(threshold).c_str(), 1);
Review comment (Contributor Author): candidate for inclusion in StringsLargeTest?

Review comment (Contributor): That is a good idea. I think I'd want it to behave like the CUDF_TEST_ENABLE_LARGE_STRINGS() macro, where it automatically unsets the environment variable at the end of the scope. I can do this in a follow-on PR to keep this one more focused.
  auto const filepath = g_temp_env->get_temp_filepath("ReadLargeStrings.parquet");
  cudf::io::parquet_writer_options out_opts =
    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
      .compression(cudf::io::compression_type::ZSTD)
      .stats_level(cudf::io::STATISTICS_NONE)
      .metadata(expected_metadata);
  cudf::io::write_parquet(out_opts);

  cudf::io::parquet_reader_options default_in_opts =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
  auto const result      = cudf::io::read_parquet(default_in_opts);
  auto const result_view = result.tbl->view();
  for (auto cv : result_view) {
    auto const offsets = cudf::strings_column_view(cv).offsets();
    EXPECT_EQ(offsets.type(), cudf::data_type{cudf::type_id::INT64});
  }
  CUDF_TEST_EXPECT_TABLES_EQUAL(result_view, expected);

  // go back to normal threshold
  unsetenv("LIBCUDF_LARGE_STRINGS_THRESHOLD");
}