diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu
index 531733a7df7..a5f6d737637 100644
--- a/cpp/src/io/parquet/page_data.cu
+++ b/cpp/src/io/parquet/page_data.cu
@@ -1175,7 +1175,8 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu
                                                              int t)
 {
   // max nesting depth of the column
-  int const max_depth = s->col.max_nesting_depth;
+  int const max_depth       = s->col.max_nesting_depth;
+  bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0;
   // how many (input) values we've processed in the page so far
   int input_value_count = s->input_value_count;
   // how many rows we've processed in the page so far
@@ -1235,7 +1236,7 @@
     uint32_t const warp_valid_mask =
       // for flat schemas, a simple ballot_sync gives us the correct count and bit positions
       // because every value in the input matches to a value in the output
-      max_depth == 1
+      !has_repetition
         ? ballot(is_valid)
         :
       // for nested schemas, it's more complicated. This warp will visit 32 incoming values,
@@ -1284,11 +1285,12 @@
     // the correct position to start reading. since we are about to write the validity vector here
     // we need to adjust our computed mask to take into account the write row bounds.
     int const in_write_row_bounds =
-      max_depth == 1
+      !has_repetition
         ? thread_row_index >= s->first_row && thread_row_index < (s->first_row + s->num_rows)
         : in_row_bounds;
     int const first_thread_in_write_range =
-      max_depth == 1 ? __ffs(ballot(in_write_row_bounds)) - 1 : 0;
+      !has_repetition ? __ffs(ballot(in_write_row_bounds)) - 1 : 0;
+
     // # of bits of the validity mask to write out
     int const warp_valid_mask_bit_count =
       first_thread_in_write_range < 0 ? 0 : warp_value_count - first_thread_in_write_range;
@@ -1384,7 +1386,6 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s,
 {
   // max nesting depth of the column
   int max_depth = s->col.max_nesting_depth;
-  // bool has_repetition = s->col.max_level[level_type::REPETITION] > 0 ? true : false;
   // how many input level values we've processed in the page so far
   int input_value_count = s->input_value_count;
   // how many leaf values we've processed in the page so far
@@ -1479,6 +1480,11 @@ __global__ void __launch_bounds__(block_size)
   int t        = threadIdx.x;
   PageInfo* pp = &pages[page_idx];
 
+  // we only need to preprocess hierarchies with repetition in them (i.e., hierarchies
+  // containing lists anywhere within).
+  bool const has_repetition = chunks[pp->chunk_idx].max_level[level_type::REPETITION] > 0;
+  if (!has_repetition) { return; }
+
   if (!setupLocalPageInfo(s, pp, chunks, trim_pass ? min_row : 0, trim_pass ? num_rows : INT_MAX)) {
     return;
   }
@@ -1504,8 +1510,6 @@ __global__ void __launch_bounds__(block_size)
   }
   __syncthreads();
 
-  bool has_repetition = s->col.max_level[level_type::REPETITION] > 0;
-
   // optimization : it might be useful to have a version of gpuDecodeStream that could go wider than
   // 1 warp. Currently it only uses 1 warp so that it can overlap work with the value decoding step
   // when in the actual value decoding kernel. However, during this preprocess step we have no such
@@ -1516,16 +1520,13 @@
   while (!s->error && s->input_value_count < s->num_input_values) {
     // decode repetition and definition levels. these will attempt to decode at
     // least up to the target, but may decode a few more.
-    if (has_repetition) {
-      gpuDecodeStream(s->rep, s, target_input_count, t, level_type::REPETITION);
-    }
+    gpuDecodeStream(s->rep, s, target_input_count, t, level_type::REPETITION);
     gpuDecodeStream(s->def, s, target_input_count, t, level_type::DEFINITION);
     __syncwarp();
 
     // we may have decoded different amounts from each stream, so only process what both have decoded
-    int actual_input_count = has_repetition ? min(s->lvl_count[level_type::REPETITION],
-                                                  s->lvl_count[level_type::DEFINITION])
-                                            : s->lvl_count[level_type::DEFINITION];
+    int actual_input_count =
+      min(s->lvl_count[level_type::REPETITION], s->lvl_count[level_type::DEFINITION]);
 
     // process what we got back
     gpuUpdatePageSizes(s, actual_input_count, t, trim_pass);
@@ -1573,6 +1574,8 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData(
       ((s->col.data_type & 7) == BOOLEAN || (s->col.data_type & 7) == BYTE_ARRAY) ? 64 : 32;
   }
 
+  bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0;
+
   // skipped_leaf_values will always be 0 for flat hierarchies.
   uint32_t skipped_leaf_values = s->page.skipped_leaf_values;
   while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
@@ -1625,7 +1628,7 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData(
     //  - so we will end up ignoring the first two input rows, and input rows 2..n will
     //    get written to the output starting at position 0.
     //
-    if (s->col.max_nesting_depth == 1) { dst_pos -= s->first_row; }
+    if (!has_repetition) { dst_pos -= s->first_row; }
 
     // target_pos will always be properly bounded by num_rows, but dst_pos may be negative (values
     // before first_row) in the flat hierarchy case.
@@ -1765,6 +1768,8 @@ void PreprocessColumnData(hostdevice_vector<PageInfo>& pages,
 
   // computes:
   // PageInfo::chunk_row for all pages
+  // Note: this is doing some redundant work for pages in flat hierarchies. chunk_row has already
+  // been computed during header decoding. the overall amount of work here is very small though.
   auto key_input = thrust::make_transform_iterator(
     pages.device_ptr(), [] __device__(PageInfo const& page) { return page.chunk_idx; });
   auto page_input = thrust::make_transform_iterator(
@@ -1840,26 +1845,14 @@
       return page.nesting[l_idx].size;
     });
 
-    // compute column size.
+    // if this buffer is part of a list hierarchy, we need to determine its
+    // final size and allocate it here.
+    //
     // for struct columns, higher levels of the output columns are shared between input
     // columns. so don't compute any given level more than once.
-    if (out_buf.size == 0) {
+    if ((out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && out_buf.size == 0) {
       int size = thrust::reduce(rmm::exec_policy(stream), size_input, size_input + pages.size());
 
-      // Handle a specific corner case. It is possible to construct a parquet file such that
-      // a column within a row group contains more rows than the row group itself. This may be
-      // invalid, but we have seen instances of this in the wild, including how they were created
-      // using the apache parquet tools. Normally, the trim pass would handle this case quietly,
-      // but if we are not running the trim pass (which is most of the time) we need to cap the
-      // number of rows we will allocate/read from the file with the amount specified in the
-      // associated row group. This only applies to columns that are not children of lists as
-      // those may have an arbitrary number of rows in them.
-      if (!uses_custom_row_bounds &&
-          !(out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) &&
-          size > static_cast<size_type>(num_rows)) {
-        size = static_cast<size_type>(num_rows);
-      }
-
       // if this is a list column add 1 for non-leaf levels for the terminating offset
       if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; }
 
@@ -1867,16 +1860,21 @@ void PreprocessColumnData(hostdevice_vector<PageInfo>& pages,
       out_buf.create(size, stream, mr);
     }
 
-    // compute per-page start offset
-    thrust::exclusive_scan_by_key(rmm::exec_policy(stream),
-                                  page_keys.begin(),
-                                  page_keys.end(),
-                                  size_input,
-                                  start_offset_output_iterator{pages.device_ptr(),
-                                                               page_index.begin(),
-                                                               0,
-                                                               static_cast<int>(src_col_schema),
-                                                               static_cast<int>(l_idx)});
+    // for nested hierarchies, compute per-page start offset.
+    // it would be better/safer to be checking (schema.max_repetition_level > 0) here, but there's
+    // no easy way to get at that info here. we'd have to move this function into reader_impl.cu
+    if ((out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) ||
+        out_buf.type.id() == type_id::LIST) {
+      thrust::exclusive_scan_by_key(rmm::exec_policy(stream),
+                                    page_keys.begin(),
+                                    page_keys.end(),
+                                    size_input,
+                                    start_offset_output_iterator{pages.device_ptr(),
+                                                                 page_index.begin(),
+                                                                 0,
+                                                                 static_cast<int>(src_col_schema),
+                                                                 static_cast<int>(l_idx)});
+    }
   }
 }
diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu
index 59bef6f5600..07869189089 100644
--- a/cpp/src/io/parquet/reader_impl.cu
+++ b/cpp/src/io/parquet/reader_impl.cu
@@ -1353,26 +1353,39 @@ void reader::impl::preprocess_columns(hostdevice_vector<gpu::ColumnChunkDesc>& c
                                       hostdevice_vector<gpu::PageInfo>& pages,
                                       size_t min_row,
                                       size_t total_rows,
-                                      bool uses_custom_row_bounds,
-                                      bool has_lists)
+                                      bool uses_custom_row_bounds)
 {
-  // TODO : we should be selectively preprocessing only columns that have
-  // lists in them instead of doing them all if even one contains lists.
-
-  // if there are no lists, simply allocate every allocate every output
-  // column to be of size num_rows
-  if (!has_lists) {
-    std::function<void(std::vector<column_buffer>&)> create_columns =
-      [&](std::vector<column_buffer>& cols) {
-        for (size_t idx = 0; idx < cols.size(); idx++) {
-          auto& col = cols[idx];
-          col.create(total_rows, _stream, _mr);
-          create_columns(col.children);
-        }
-      };
-    create_columns(_output_columns);
-  } else {
-    // preprocess per-nesting level sizes by page
+  // iterate over all input columns and allocate any associated output
+  // buffers if they are not part of a list hierarchy. mark down
+  // if we have any list columns that need further processing.
+  bool has_lists = false;
+  for (size_t idx = 0; idx < _input_columns.size(); idx++) {
+    auto const& input_col  = _input_columns[idx];
+    size_t const max_depth = input_col.nesting_depth();
+
+    auto* cols = &_output_columns;
+    for (size_t l_idx = 0; l_idx < max_depth; l_idx++) {
+      auto& out_buf = (*cols)[input_col.nesting[l_idx]];
+      cols          = &out_buf.children;
+
+      // if this has a list parent, we will have to do further work in gpu::PreprocessColumnData
+      // to know how big this buffer actually is.
+      if (out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) {
+        has_lists = true;
+      }
+      // if we haven't already processed this column because it is part of a struct hierarchy
+      else if (out_buf.size == 0) {
+        // add 1 for the offset if this is a list column
+        out_buf.create(
+          out_buf.type.id() == type_id::LIST && l_idx < max_depth ? total_rows + 1 : total_rows,
+          _stream,
+          _mr);
+      }
+    }
+  }
+
+  // if we have columns containing lists, further preprocessing is necessary.
+  if (has_lists) {
     gpu::PreprocessColumnData(pages,
                               chunks,
                               _input_columns,
@@ -1636,9 +1649,6 @@ table_with_metadata reader::impl::read(size_type skip_rows,
   // Keep track of column chunk file offsets
   std::vector<size_t> column_chunk_offsets(num_chunks);
 
-  // if there are lists present, we need to preprocess
-  bool has_lists = false;
-
   // Initialize column chunk information
   size_t total_decompressed_size = 0;
   auto remaining_rows            = num_rows;
@@ -1657,9 +1667,6 @@ table_with_metadata reader::impl::read(size_type skip_rows,
       auto& col_meta = _metadata->get_column_metadata(rg.index, rg.source_index, col.schema_idx);
       auto& schema   = _metadata->get_schema(col.schema_idx);
 
-      // this column contains repetition levels and will require a preprocess
-      if (schema.max_repetition_level > 0) { has_lists = true; }
-
       auto [type_width, clock_rate, converted_type] =
         conversion_info(to_type_id(schema, _strings_to_categorical, _timestamp_type.id()),
                         _timestamp_type.id(),
@@ -1755,7 +1762,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
   //
   // - for nested schemas, output buffer offset values per-page, per nesting-level for the
   //   purposes of decoding.
-  preprocess_columns(chunks, pages, skip_rows, num_rows, uses_custom_row_bounds, has_lists);
+  preprocess_columns(chunks, pages, skip_rows, num_rows, uses_custom_row_bounds);
 
   // decoding of column data itself
   decode_page_data(chunks, pages, page_nesting_info, skip_rows, num_rows);
diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp
index e1f275bb8e8..6c3e05b4264 100644
--- a/cpp/src/io/parquet/reader_impl.hpp
+++ b/cpp/src/io/parquet/reader_impl.hpp
@@ -148,7 +148,7 @@ class reader::impl {
                            hostdevice_vector<gpu::PageNestingInfo>& page_nesting_info);
 
   /**
-   * @brief Preprocess column information for nested schemas.
+   * @brief Preprocess column information and allocate output buffers.
    *
    * There are several pieces of information we can't compute directly from row counts in
    * the parquet headers when dealing with nested schemas.
@@ -163,15 +163,13 @@ class reader::impl {
    * @param total_rows Maximum number of rows to read
    * @param uses_custom_row_bounds Whether or not num_rows and min_rows represent user-specific
    * bounds
-   * @param has_lists Whether or not this data contains lists and requires
-   * a preprocess.
    */
   void preprocess_columns(hostdevice_vector<gpu::ColumnChunkDesc>& chunks,
                           hostdevice_vector<gpu::PageInfo>& pages,
                           size_t min_row,
                           size_t total_rows,
-                          bool uses_custom_row_bounds,
-                          bool has_lists);
+                          bool uses_custom_row_bounds);
 
   /**
    * @brief Converts the page data and outputs to columns.
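The reader-side rework above compresses to a simple rule: size flat output buffers immediately, and defer anything under a list to gpu::PreprocessColumnData, so the common flat-schema case skips that pass entirely. A minimal compilable sketch of the rule follows; buffer_sketch, FLAG_HAS_LIST_PARENT, and allocate_flat_and_detect_lists are illustrative stand-ins (not names from this patch) for column_buffer, PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT, and the loop added to preprocess_columns():

  #include <cstddef>
  #include <cstdint>
  #include <vector>

  // stand-in for cudf's column_buffer; only the fields this sketch needs
  struct buffer_sketch {
    uint32_t user_data = 0;  // would carry the has-list-parent flag
    std::size_t size   = 0;  // 0 means "not yet allocated"
  };

  constexpr uint32_t FLAG_HAS_LIST_PARENT = 1u << 0;  // hypothetical flag value

  // flat buffers are sized to total_rows up front; buffers inside a list
  // hierarchy are deferred, since only the preprocess pass can learn their size
  bool allocate_flat_and_detect_lists(std::vector<buffer_sketch>& buffers, std::size_t total_rows)
  {
    bool has_lists = false;
    for (auto& buf : buffers) {
      if (buf.user_data & FLAG_HAS_LIST_PARENT) {
        has_lists = true;  // defer to the (expensive) preprocess
      } else if (buf.size == 0) {
        buf.size = total_rows;  // allocate directly; no preprocess needed
      }
    }
    return has_lists;  // caller runs the preprocess only when this is true
  }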
diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp
index 8a98efabcb5..134eff54144 100644
--- a/cpp/tests/io/parquet_test.cpp
+++ b/cpp/tests/io/parquet_test.cpp
@@ -112,7 +112,7 @@ std::unique_ptr<cudf::table> create_compressible_fixed_table(cudf::size_type num
 // this function replicates the "list_gen" function in
 // python/cudf/cudf/tests/test_parquet.py
 template <typename T>
-std::unique_ptr<cudf::column> make_parquet_list_col(
+std::unique_ptr<cudf::column> make_parquet_list_list_col(
   int skip_rows, int num_rows, int lists_per_row, int list_size, bool include_validity)
 {
   auto valids =
@@ -2212,8 +2212,8 @@ TYPED_TEST(ParquetChunkedWriterNumericTypeTest, UnalignedSize)
   bool mask[] = {false, true, true, true, true, true, true, true, true, true, true, true,
                  true,  true, true, true, true, true, true, true, true, true,
-                 true, true, true, true, true, true, true, true, true};
+                 true,  true, true, true, true, true, true, true, true};
 
   T c1a[num_els];
   std::fill(c1a, c1a + num_els, static_cast<T>(5));
   T c1b[num_els];
@@ -2589,6 +2589,92 @@ TEST_F(ParquetReaderTest, UserBoundsWithNulls)
   }
 }
 
+TEST_F(ParquetReaderTest, UserBoundsWithNullsMixedTypes)
+{
+  constexpr int num_rows = 32 * 1024;
+
+  std::mt19937 gen(6542);
+  std::bernoulli_distribution bn(0.7f);
+  auto valids =
+    cudf::detail::make_counting_transform_iterator(0, [&](int index) { return bn(gen); });
+  auto values = thrust::make_counting_iterator(0);
+
+  // int64
+  cudf::test::fixed_width_column_wrapper<int64_t> c0(values, values + num_rows, valids);
+
+  // list<float>
+  constexpr int floats_per_row = 4;
+  auto c1_offset_iter          = cudf::detail::make_counting_transform_iterator(
+    0, [floats_per_row](cudf::size_type idx) { return idx * floats_per_row; });
+  cudf::test::fixed_width_column_wrapper<cudf::size_type> c1_offsets(
+    c1_offset_iter, c1_offset_iter + num_rows + 1);
+  cudf::test::fixed_width_column_wrapper<float> c1_floats(
+    values, values + (num_rows * floats_per_row), valids);
+  auto _c1 = cudf::make_lists_column(num_rows,
+                                     c1_offsets.release(),
+                                     c1_floats.release(),
+                                     cudf::UNKNOWN_NULL_COUNT,
+                                     cudf::test::detail::make_null_mask(valids, valids + num_rows));
+  auto c1 = cudf::purge_nonempty_nulls(static_cast<cudf::lists_column_view>(*_c1));
+
+  // list<list<int>>
+  auto c2 = make_parquet_list_list_col<int>(0, num_rows, 5, 8, true);
+
+  // struct<list<string>, int, float>
+  std::vector<std::string> strings{
+    "abc", "x", "bananas", "gpu", "minty", "backspace", "", "cayenne", "turbine", "soft"};
+  std::uniform_int_distribution<int> uni(0, strings.size() - 1);
+  auto string_iter = cudf::detail::make_counting_transform_iterator(
+    0, [&](cudf::size_type idx) { return strings[uni(gen)]; });
+  constexpr int string_per_row  = 3;
+  constexpr int num_string_rows = num_rows * string_per_row;
+  cudf::test::strings_column_wrapper string_col{string_iter, string_iter + num_string_rows};
+  auto offset_iter = cudf::detail::make_counting_transform_iterator(
+    0, [string_per_row](cudf::size_type idx) { return idx * string_per_row; });
+  cudf::test::fixed_width_column_wrapper<cudf::size_type> offsets(offset_iter,
+                                                                  offset_iter + num_rows + 1);
+  auto _c3_list =
+    cudf::make_lists_column(num_rows,
+                            offsets.release(),
+                            string_col.release(),
+                            cudf::UNKNOWN_NULL_COUNT,
+                            cudf::test::detail::make_null_mask(valids, valids + num_rows));
+  auto c3_list = cudf::purge_nonempty_nulls(static_cast<cudf::lists_column_view>(*_c3_list));
+  cudf::test::fixed_width_column_wrapper<int> c3_ints(values, values + num_rows, valids);
+  cudf::test::fixed_width_column_wrapper<float> c3_floats(values, values + num_rows, valids);
+  std::vector<std::unique_ptr<cudf::column>> c3_children;
+  c3_children.push_back(std::move(c3_list));
+  c3_children.push_back(c3_ints.release());
+  c3_children.push_back(c3_floats.release());
+  cudf::test::structs_column_wrapper _c3(std::move(c3_children));
+  auto c3 = cudf::purge_nonempty_nulls(static_cast<cudf::structs_column_view>(_c3));
+
+  // write it out
+  cudf::table_view tbl({c0, *c1, *c2, *c3});
+  auto filepath = temp_env->get_temp_filepath("UserBoundsWithNullsMixedTypes.parquet");
+  cudf::io::parquet_writer_options out_args =
+    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tbl);
+  cudf::io::write_parquet(out_args);
+
+  // read it back
+  std::vector<std::pair<int, int>> params{
+    {-1, -1}, {0, num_rows}, {1, num_rows - 1}, {num_rows - 1, 1}, {517, 22000}};
+  for (auto p : params) {
+    cudf::io::parquet_reader_options read_args =
+      cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
+    if (p.first >= 0) { read_args.set_skip_rows(p.first); }
+    if (p.second >= 0) { read_args.set_num_rows(p.second); }
+    auto result = cudf::io::read_parquet(read_args);
+
+    p.first  = p.first < 0 ? 0 : p.first;
+    p.second = p.second < 0 ? num_rows - p.first : p.second;
+    std::vector<cudf::size_type> slice_indices{p.first, p.first + p.second};
+    auto expected = cudf::slice(tbl, slice_indices);
+
+    CUDF_TEST_EXPECT_TABLES_EQUAL(*result.tbl, expected[0]);
+  }
+}
+
 TEST_F(ParquetReaderTest, UserBoundsWithNullsLarge)
 {
   constexpr int num_rows = 30 * 1000000;
@@ -2636,7 +2722,7 @@ TEST_F(ParquetReaderTest, UserBoundsWithNullsLarge)
 TEST_F(ParquetReaderTest, ListUserBoundsWithNullsLarge)
 {
   constexpr int num_rows = 5 * 1000000;
-  auto colp             = make_parquet_list_col<int>(0, num_rows, 5, 8, true);
+  auto colp             = make_parquet_list_list_col<int>(0, num_rows, 5, 8, true);
   cudf::column_view col = *colp;
 
   // this file will have row groups of 1,000,000 each
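The new tests exercise the optimized path through ordinary bounded reads. A minimal standalone usage sketch, mirroring the option calls used in UserBoundsWithNullsMixedTypes above (the filename and row numbers are illustrative; the file must already exist and hold at least 150 rows):

  #include <cudf/io/parquet.hpp>

  int main()
  {
    // request rows [100, 150); flat columns in this range are now allocated
    // directly, and only columns with repetition go through the preprocess pass
    cudf::io::parquet_reader_options args =
      cudf::io::parquet_reader_options::builder(cudf::io::source_info{"example.parquet"});
    args.set_skip_rows(100);
    args.set_num_rows(50);
    auto result = cudf::io::read_parquet(args);
    return result.tbl->num_rows() == 50 ? 0 : 1;
  }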