Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for large string columns to Parquet reader and writer #15632

Merged
merged 22 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ add_library(
src/io/text/multibyte_split.cu
src/io/utilities/arrow_io_source.cpp
src/io/utilities/column_buffer.cpp
src/io/utilities/column_buffer_strings.cu
src/io/utilities/config_utils.cpp
src/io/utilities/data_casting.cu
src/io/utilities/data_sink.cpp
Expand Down
32 changes: 18 additions & 14 deletions cpp/src/io/parquet/page_delta_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -579,15 +579,17 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
__syncthreads();
}

// now turn array of lengths into offsets
int value_count = nesting_info_base[leaf_level_index].value_count;
// now turn array of lengths into offsets, but skip if this is a large string column
bdice marked this conversation as resolved.
Show resolved Hide resolved
if (not s->col.is_large_string_col) {
int value_count = nesting_info_base[leaf_level_index].value_count;

// if no repetition we haven't calculated start/end bounds and instead just skipped
// values until we reach first_row. account for that here.
if (!has_repetition) { value_count -= s->first_row; }
// if no repetition we haven't calculated start/end bounds and instead just skipped
// values until we reach first_row. account for that here.
if (!has_repetition) { value_count -= s->first_row; }

auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
}

if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}
Expand Down Expand Up @@ -738,15 +740,17 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
__syncthreads();
}

// now turn array of lengths into offsets
int value_count = nesting_info_base[leaf_level_index].value_count;
// now turn array of lengths into offsets, but skip if this is a large string column
if (not s->col.is_large_string_col) {
int value_count = nesting_info_base[leaf_level_index].value_count;

// if no repetition we haven't calculated start/end bounds and instead just skipped
// values until we reach first_row. account for that here.
if (!has_repetition) { value_count -= s->first_row; }
// if no repetition we haven't calculated start/end bounds and instead just skipped
// values until we reach first_row. account for that here.
if (!has_repetition) { value_count -= s->first_row; }

auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
}

// finally, copy the string data into place
auto const dst = nesting_info_base[leaf_level_index].string_out;
Expand Down
26 changes: 14 additions & 12 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
{
using cudf::detail::warp_size;
__shared__ __align__(16) page_state_s state_g;
__shared__ __align__(4) size_type last_offset;
__shared__ __align__(8) size_t last_offset;
__shared__ __align__(16)
page_state_buffers_s<rolling_buf_size, rolling_buf_size, rolling_buf_size>
state_buffers;
Expand Down Expand Up @@ -1054,9 +1054,9 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
? gpuGetStringData(s, sb, src_pos + skipped_leaf_values + i)
: cuda::std::pair<char const*, size_t>{nullptr, 0};

__shared__ cub::WarpScan<size_type>::TempStorage temp_storage;
size_type offset, warp_total;
cub::WarpScan<size_type>(temp_storage).ExclusiveSum(len, offset, warp_total);
__shared__ cub::WarpScan<size_t>::TempStorage temp_storage;
size_t offset, warp_total;
cub::WarpScan<size_t>(temp_storage).ExclusiveSum(len, offset, warp_total);
offset += last_offset;

// choose a character parallel string copy when the average string is longer than a warp
Expand All @@ -1076,7 +1076,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
__syncwarp();
} else if (use_char_ll) {
__shared__ __align__(8) uint8_t const* pointers[warp_size];
__shared__ __align__(4) size_type offsets[warp_size];
__shared__ __align__(4) size_t offsets[warp_size];
etseidl marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
__shared__ __align__(4) size_t offsets[warp_size];
__shared__ __align__(8) size_t offsets[warp_size];

Are these align declarators needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tbh, I started using the __align__ as a monkey-see-monkey-do kind of thing 😅. I don't know if they're actually necessary at this point.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect this one is the only one that may have purpose
https://github.com/rapidsai/cudf/pull/15632/files#diff-52e09ddca44181e11af56d8526360207906f5f25ba888cf51efbd2c1b15d775cR957

__shared__ __align__(16) page_state_s state_g;

Only because page_state_s is a structure but it should probably have been declared with alignas(16)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll remove them since they're unnecessary.

__shared__ __align__(4) int dsts[warp_size];
__shared__ __align__(4) int lengths[warp_size];

Expand Down Expand Up @@ -1119,15 +1119,17 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
__syncthreads();
}

// now turn array of lengths into offsets
int value_count = nesting_info_base[leaf_level_index].value_count;
// now turn array of lengths into offsets, but skip if this is a large string column
if (not s->col.is_large_string_col) {
int value_count = nesting_info_base[leaf_level_index].value_count;

// if no repetition we haven't calculated start/end bounds and instead just skipped
// values until we reach first_row. account for that here.
if (!has_repetition) { value_count -= s->first_row; }
// if no repetition we haven't calculated start/end bounds and instead just skipped
// values until we reach first_row. account for that here.
if (!has_repetition) { value_count -= s->first_row; }

auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
}

if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}
Expand Down
8 changes: 5 additions & 3 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,8 @@ struct PageInfo {
int32_t skipped_leaf_values;
// for string columns only, the size of all the chars in the string for
// this page. only valid/computed during the base preprocess pass
size_t str_offset; // offset into string data for this page
int32_t str_bytes;
int32_t str_offset; // offset into string data for this page
bool has_page_index; // true if str_bytes, num_valids, etc are derivable from page indexes

// nesting information (input/output) for each page. this array contains
Expand Down Expand Up @@ -420,7 +420,8 @@ struct ColumnChunkDesc {
src_col_schema(src_col_schema_),
h_chunk_info(chunk_info_),
list_bytes_per_row_est(list_bytes_per_row_est_),
is_strings_to_cat(strings_to_categorical_)
is_strings_to_cat(strings_to_categorical_),
is_large_string_col(false)
{
}

Expand Down Expand Up @@ -454,7 +455,8 @@ struct ColumnChunkDesc {

float list_bytes_per_row_est{}; // for LIST columns, an estimate on number of bytes per row

bool is_strings_to_cat{}; // convert strings to hashes
bool is_strings_to_cat{}; // convert strings to hashes
bool is_large_string_col{}; // `true` if string data exceeds 2GB limit
etseidl marked this conversation as resolved.
Show resolved Hide resolved
};

/**
Expand Down
29 changes: 21 additions & 8 deletions cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <cudf/detail/transform.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/strings/detail/utilities.hpp>

#include <rmm/resource_ref.hpp>

Expand Down Expand Up @@ -99,11 +100,21 @@ void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_row
col_string_sizes = calculate_page_string_offsets();

// check for overflow
if (std::any_of(col_string_sizes.cbegin(), col_string_sizes.cend(), [](std::size_t sz) {
return sz > std::numeric_limits<size_type>::max();
})) {
auto const threshold = static_cast<size_t>(strings::detail::get_offset64_threshold());
auto const has_large_strings = std::any_of(col_string_sizes.cbegin(),
col_string_sizes.cend(),
[=](std::size_t sz) { return sz > threshold; });
if (has_large_strings and not strings::detail::is_large_strings_enabled()) {
CUDF_FAIL("String column exceeds the column size limit", std::overflow_error);
}

// mark any chunks that are large string columns
if (has_large_strings) {
for (auto& chunk : pass.chunks) {
auto const idx = chunk.src_col_index;
if (col_string_sizes[idx] > threshold) { chunk.is_large_string_col = true; }
}
}
}

// In order to reduce the number of allocations of hostdevice_vector, we allocate a single vector
Expand Down Expand Up @@ -348,11 +359,13 @@ void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_row
} else if (out_buf.type.id() == type_id::STRING) {
// need to cap off the string offsets column
auto const sz = static_cast<size_type>(col_string_sizes[idx]);
CUDF_CUDA_TRY(cudaMemcpyAsync(static_cast<size_type*>(out_buf.data()) + out_buf.size,
&sz,
sizeof(size_type),
cudaMemcpyDefault,
_stream.value()));
if (sz <= strings::detail::get_offset64_threshold()) {
CUDF_CUDA_TRY(cudaMemcpyAsync(static_cast<size_type*>(out_buf.data()) + out_buf.size,
&sz,
sizeof(size_type),
cudaMemcpyDefault,
_stream.value()));
}
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1169,10 +1169,10 @@ struct page_to_string_size {
struct page_offset_output_iter {
PageInfo* p;

using value_type = size_type;
using difference_type = size_type;
using pointer = size_type*;
using reference = size_type&;
using value_type = size_t;
using difference_type = size_t;
using pointer = size_t*;
using reference = size_t&;
using iterator_category = thrust::output_device_iterator_tag;

__host__ __device__ page_offset_output_iter operator+(int i) { return {p + i}; }
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/lists/detail/dremel.hpp>
#include <cudf/lists/lists_column_view.hpp>
#include <cudf/strings/detail/utilities.hpp>
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/structs/structs_column_view.hpp>
#include <cudf/table/table_device_view.cuh>
Expand Down Expand Up @@ -278,8 +279,8 @@ size_t column_size(column_view const& column, rmm::cuda_stream_view stream)
return size_of(column.type()) * column.size();
} else if (column.type().id() == type_id::STRING) {
auto const scol = strings_column_view(column);
return cudf::detail::get_value<size_type>(scol.offsets(), column.size(), stream) -
cudf::detail::get_value<size_type>(scol.offsets(), 0, stream);
return cudf::strings::detail::get_offset_value(scol.offsets(), column.size(), stream) -
cudf::strings::detail::get_offset_value(scol.offsets(), 0, stream);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that the input column could have been sliced?
If so, then this would be more correct.

Suggested change
return cudf::strings::detail::get_offset_value(scol.offsets(), column.size(), stream) -
cudf::strings::detail::get_offset_value(scol.offsets(), 0, stream);
return cudf::strings::detail::get_offset_value(scol.offsets(), column.size() + column.offset(), stream) -
cudf::strings::detail::get_offset_value(scol.offsets(), column.offset(), stream);

Note that if the column has not been sliced then column.offset()==0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, IIRC it was originally written that way due to a concern @vuule had about sliced columns.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't get_offset_value already adjust for the column offset? AFAICT, it's using get_value, which uses data(), which is implemented as head<T>() + _offset.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. The get_offset_value() takes an offsets column which does not include it's parent's sliced values offset/size.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, thank you for clearing that up.

} else if (column.type().id() == type_id::STRUCT) {
auto const scol = structs_column_view(column);
size_t ret = 0;
Expand Down
10 changes: 0 additions & 10 deletions cpp/src/io/utilities/column_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,6 @@ void cudf::io::detail::inline_column_buffer::create_string_data(size_t num_bytes
_string_data = rmm::device_buffer(num_bytes, stream, _mr);
}

std::unique_ptr<column> cudf::io::detail::inline_column_buffer::make_string_column_impl(
rmm::cuda_stream_view stream)
{
// no need for copies, just transfer ownership of the data_buffers to the columns
auto offsets_col = std::make_unique<column>(
data_type{type_to_id<size_type>()}, size + 1, std::move(_data), rmm::device_buffer{}, 0);
return make_strings_column(
size, std::move(offsets_col), std::move(_string_data), null_count(), std::move(_null_mask));
}

namespace {

/**
Expand Down
53 changes: 53 additions & 0 deletions cpp/src/io/utilities/column_buffer_strings.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "column_buffer.hpp"

#include <cudf/strings/detail/strings_children.cuh>
#include <cudf/strings/detail/utilities.hpp>
#include <cudf/utilities/error.hpp>

namespace cudf::io::detail {

std::unique_ptr<column> cudf::io::detail::inline_column_buffer::make_string_column_impl(
  rmm::cuda_stream_view stream)
{
  // Below the 64-bit-offset threshold, `_data` already holds int32 offsets, so both
  // buffers can be handed straight to the strings column without any copies.
  // Above it, `_data` holds per-row sizes instead and must be scanned into int64 offsets.
  auto const max_small_size = static_cast<size_t>(strings::detail::get_offset64_threshold());
  if (_string_data.size() <= max_small_size) {
    // transfer ownership of the existing buffers directly to the output column
    auto small_offsets = std::make_unique<column>(
      data_type{type_to_id<size_type>()}, size + 1, std::move(_data), rmm::device_buffer{}, 0);
    return make_strings_column(
      size, std::move(small_offsets), std::move(_string_data), null_count(), std::move(_null_mask));
  }

  // large-strings path is only valid when the feature is enabled
  if (not strings::detail::is_large_strings_enabled()) {
    CUDF_FAIL("String column exceeds the column size limit", std::overflow_error);
  }
  // build a fresh INT64 offsets column from the sizes in `_data`;
  // reading size + 1 entries is safe because `_data` is sized that large
  auto const sizes_begin = static_cast<size_type*>(_data.data());
  auto wide_offsets =
    make_numeric_column(data_type{type_id::INT64}, size + 1, mask_state::UNALLOCATED, stream, _mr);
  auto d_offsets64 = wide_offsets->mutable_view().template data<int64_t>();
  cudf::detail::sizes_to_offsets(sizes_begin, sizes_begin + size + 1, d_offsets64, stream);
  return make_strings_column(
    size, std::move(wide_offsets), std::move(_string_data), null_count(), std::move(_null_mask));
}

} // namespace cudf::io::detail
2 changes: 1 addition & 1 deletion cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ ConfigureTest(
# * large strings test ----------------------------------------------------------------------------
ConfigureTest(
LARGE_STRINGS_TEST large_strings/large_strings_fixture.cpp large_strings/merge_tests.cpp
large_strings/concatenate_tests.cpp
large_strings/concatenate_tests.cpp large_strings/parquet_tests.cpp
GPUS 1
PERCENT 100
)
Expand Down
73 changes: 73 additions & 0 deletions cpp/tests/large_strings/parquet_tests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "large_strings_fixture.hpp"

#include <cudf_test/column_utilities.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/table_utilities.hpp>

#include <cudf/io/parquet.hpp>
#include <cudf/io/types.hpp>
#include <cudf/table/table_view.hpp>

namespace {

// Global temporary-directory test environment; registered with gtest, which
// owns the instance and tears the directory down after the test run.
cudf::test::TempDirTestEnvironment* const g_temp_env =
  static_cast<cudf::test::TempDirTestEnvironment*>(
    ::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment));

}  // namespace

// Fixture for Parquet round-trip tests on string columns larger than the large-strings threshold.
struct ParquetStringsTest : public cudf::test::StringsLargeTest {};

// Round-trip a table of large string columns through the Parquet writer and reader.
// The large-strings threshold is lowered via LIBCUDF_LARGE_STRINGS_THRESHOLD so the
// test exercises the 64-bit-offset path without writing multi-gigabyte files, and
// three encodings (PLAIN, DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY) are covered.
TEST_F(ParquetStringsTest, ReadLargeStrings)
{
  // need to create a string column larger than `threshold`
  constexpr int string_width   = 1'024;
  constexpr size_t column_size = 512 * 1'024 * 1'024U;
  constexpr size_t threshold   = column_size - 1;
  constexpr int nrows          = column_size / string_width;

  // keep the fill value alive in a named variable: constructing the
  // constant_iterator's string_view from a temporary std::string would leave
  // the view dangling once the temporary is destroyed
  auto const fill_value = std::string(string_width, 'a');
  auto elements         = thrust::constant_iterator<std::string_view>(fill_value);
  auto const col0       = cudf::test::strings_column_wrapper(elements, elements + nrows);
  auto const expected   = cudf::table_view{{col0, col0, col0}};

  // request different encodings for columns 1 and 2 (column 0 keeps the default)
  auto expected_metadata = cudf::io::table_input_metadata{expected};
  expected_metadata.column_metadata[1].set_encoding(
    cudf::io::column_encoding::DELTA_LENGTH_BYTE_ARRAY);
  expected_metadata.column_metadata[2].set_encoding(cudf::io::column_encoding::DELTA_BYTE_ARRAY);

  // set smaller threshold to reduce file size and execution time
  setenv("LIBCUDF_LARGE_STRINGS_THRESHOLD", std::to_string(threshold).c_str(), 1);

  auto const filepath = g_temp_env->get_temp_filepath("ReadLargeStrings.parquet");
  cudf::io::parquet_writer_options out_opts =
    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
      .compression(cudf::io::compression_type::ZSTD)
      .stats_level(cudf::io::STATISTICS_NONE)
      .metadata(expected_metadata);
  cudf::io::write_parquet(out_opts);

  cudf::io::parquet_reader_options default_in_opts =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
  auto const result = cudf::io::read_parquet(default_in_opts);

  CUDF_TEST_EXPECT_TABLES_EQUAL(result.tbl->view(), expected);

  // go back to normal threshold
  unsetenv("LIBCUDF_LARGE_STRINGS_THRESHOLD");
}
Loading