diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu
index d066b454840..0af561be8da 100644
--- a/cpp/src/io/parquet/page_enc.cu
+++ b/cpp/src/io/parquet/page_enc.cu
@@ -229,6 +229,16 @@ Encoding __device__ determine_encoding(PageType page_type,
   }
 }
 
+// operator to use with warp_reduce. stolen from cub::Sum
+struct BitwiseOr {
+  /// Binary OR operator, returns a | b
+  template <typename T>
+  __host__ __device__ __forceinline__ T operator()(T const& a, T const& b) const
+  {
+    return a | b;
+  }
+};
+
 }  // anonymous namespace
 
 // blockDim {512,1,1}
@@ -1445,6 +1455,7 @@ __global__ void __launch_bounds__(decide_compression_block_size)
 
   uint32_t uncompressed_data_size = 0;
   uint32_t compressed_data_size   = 0;
+  uint32_t encodings              = 0;
   auto const num_pages = ck_g[warp_id].num_pages;
   for (auto page_id = lane_id; page_id < num_pages; page_id += cudf::detail::warp_size) {
     auto const& curr_page = ck_g[warp_id].pages[page_id];
@@ -1457,10 +1468,14 @@ __global__ void __launch_bounds__(decide_compression_block_size)
         atomicOr(&compression_error[warp_id], 1);
       }
     }
+    // collect encoding info for the chunk metadata
+    encodings |= encoding_to_mask(curr_page.encoding);
   }
 
   uncompressed_data_size = warp_reduce(temp_storage[warp_id][0]).Sum(uncompressed_data_size);
   compressed_data_size   = warp_reduce(temp_storage[warp_id][1]).Sum(compressed_data_size);
   __syncwarp();
+  encodings = warp_reduce(temp_storage[warp_id][0]).Reduce(encodings, BitwiseOr{});
+  __syncwarp();
 
   if (lane_id == 0) {
     auto const write_compressed = compressed_data_size != 0 and compression_error[warp_id] == 0 and
@@ -1469,6 +1484,12 @@ __global__ void __launch_bounds__(decide_compression_block_size)
     chunks[chunk_id].bfr_size        = uncompressed_data_size;
     chunks[chunk_id].compressed_size =
       write_compressed ? compressed_data_size : uncompressed_data_size;
+
+    // if there is repetition or definition level data add RLE encoding
+    auto const rle_bits =
+      ck_g[warp_id].col_desc->num_def_level_bits() + ck_g[warp_id].col_desc->num_rep_level_bits();
+    if (rle_bits > 0) { encodings |= encoding_to_mask(Encoding::RLE); }
+    chunks[chunk_id].encodings = encodings;
   }
 }
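
For reference, the OR-combine above can be exercised in isolation. Below is a minimal standalone sketch (not part of the patch; the kernel and buffer names are hypothetical) of reducing one encoding mask per lane with cub::WarpReduce and the BitwiseOr functor, the same pattern the decide_compression kernel uses:

// demo_bitwise_or_reduce.cu -- illustrative only
#include <cub/cub.cuh>
#include <cstdint>

struct BitwiseOr {
  template <typename T>
  __host__ __device__ __forceinline__ T operator()(T const& a, T const& b) const
  {
    return a | b;
  }
};

// one warp ORs together each lane's encoding mask; Reduce() returns the
// valid aggregate only on lane 0, mirroring the lane_id == 0 block above
__global__ void or_reduce_demo(uint32_t const* lane_masks, uint32_t* chunk_mask)
{
  using warp_reduce = cub::WarpReduce<uint32_t>;
  __shared__ typename warp_reduce::TempStorage temp_storage;
  auto const mask = lane_masks[threadIdx.x];
  auto const agg  = warp_reduce(temp_storage).Reduce(mask, BitwiseOr{});
  if (threadIdx.x == 0) { *chunk_mask = agg; }
}
// launch as: or_reduce_demo<<<1, 32>>>(d_masks, d_result);
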
diff --git a/cpp/src/io/parquet/parquet_common.hpp b/cpp/src/io/parquet/parquet_common.hpp
index ab6290c4ed6..5f8f1617cb9 100644
--- a/cpp/src/io/parquet/parquet_common.hpp
+++ b/cpp/src/io/parquet/parquet_common.hpp
@@ -92,6 +92,7 @@ enum class Encoding : uint8_t {
   DELTA_BYTE_ARRAY  = 7,
   RLE_DICTIONARY    = 8,
   BYTE_STREAM_SPLIT = 9,
+  NUM_ENCODINGS     = 10,
 };
 
 /**
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index 0a8640aef26..e82b6abc13d 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -345,8 +345,8 @@ struct parquet_column_device_view : stats_column_desc {
   ConvertedType converted_type;  //!< logical data type
   uint8_t level_bits;  //!< bits to encode max definition (lower nibble) & repetition (upper nibble)
                        //!< levels
-  constexpr uint8_t num_def_level_bits() { return level_bits & 0xf; }
-  constexpr uint8_t num_rep_level_bits() { return level_bits >> 4; }
+  constexpr uint8_t num_def_level_bits() const { return level_bits & 0xf; }
+  constexpr uint8_t num_rep_level_bits() const { return level_bits >> 4; }
   size_type const* const* nesting_offsets;  //!< If column is a nested type, contains offset array
                                             //!< of each nesting level
@@ -384,6 +384,12 @@ constexpr size_t kDictScratchSize = (1 << kDictHashBits) * sizeof(uint32_t);
 struct EncPage;
 struct slot_type;
 
+// convert Encoding to a mask value
+constexpr uint32_t encoding_to_mask(Encoding encoding)
+{
+  return 1 << static_cast<uint32_t>(encoding);
+}
+
 /**
  * @brief Struct describing an encoder column chunk
  */
@@ -420,6 +426,7 @@ struct EncColumnChunk {
   bool use_dictionary;         //!< True if the chunk uses dictionary encoding
   uint8_t* column_index_blob;  //!< Binary blob containing encoded column index for this chunk
   uint32_t column_index_size;  //!< Size of column index blob
+  uint32_t encodings;          //!< Mask representing the set of encodings used for this chunk
 };
 
 /**
@@ -748,6 +755,8 @@ void EncodePages(device_span<EncPage> pages,
 /**
  * @brief Launches kernel to make the compressed vs uncompressed chunk-level decision
  *
+ * Also calculates the set of page encodings used for each chunk.
+ *
  * @param[in,out] chunks Column chunks (updated with actual compressed/uncompressed sizes)
  * @param[in] stream CUDA stream to use
  */
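
Two small pieces of this header lend themselves to a host-side illustration (not part of the patch). The sketch below uses a stripped-down stand-in for parquet_column_device_view and an abbreviated Encoding enum whose values follow the Parquet spec; it shows the level_bits nibble packing and why the encoding set fits in the low ten bits of a uint32_t:

// demo_masks.cpp -- illustrative only
#include <cstdint>

// definition-level bits live in the low nibble, repetition-level bits in the high nibble
struct level_bits_view {
  uint8_t level_bits;
  constexpr uint8_t num_def_level_bits() const { return level_bits & 0xf; }
  constexpr uint8_t num_rep_level_bits() const { return level_bits >> 4; }
};
static_assert(level_bits_view{0x21}.num_def_level_bits() == 1);  // low nibble
static_assert(level_bits_view{0x21}.num_rep_level_bits() == 2);  // high nibble

// with ten Encoding values (0..9), one bit per encoding suffices, and masks
// from different pages combine with bitwise OR
enum class Encoding : uint8_t { PLAIN = 0, RLE = 3, RLE_DICTIONARY = 8, NUM_ENCODINGS = 10 };
constexpr uint32_t encoding_to_mask(Encoding encoding)
{
  return 1 << static_cast<uint32_t>(encoding);
}
static_assert(encoding_to_mask(Encoding::PLAIN) == 0x001);
static_assert(encoding_to_mask(Encoding::RLE) == 0x008);
static_assert((encoding_to_mask(Encoding::PLAIN) | encoding_to_mask(Encoding::RLE)) == 0x009);
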
diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index c5fc852d20b..d2976a3f5d9 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -193,6 +193,20 @@ parquet::Compression to_parquet_compression(compression_type compression)
   }
 }
 
+/**
+ * @brief Convert a mask of encodings to a vector.
+ *
+ * @param encodings Vector of `Encoding`s to populate
+ * @param enc_mask Mask of encodings used
+ */
+void update_chunk_encodings(std::vector<Encoding>& encodings, uint32_t enc_mask)
+{
+  for (uint8_t enc = 0; enc < static_cast<uint8_t>(Encoding::NUM_ENCODINGS); enc++) {
+    auto const enc_enum = static_cast<Encoding>(enc);
+    if ((enc_mask & gpu::encoding_to_mask(enc_enum)) != 0) { encodings.push_back(enc_enum); }
+  }
+}
+
 /**
  * @brief Compute size (in bytes) of the data stored in the given column.
  *
@@ -1671,6 +1685,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
         ck.start_row         = start_row;
         ck.num_rows          = (uint32_t)row_group.num_rows;
         ck.first_fragment    = c * num_fragments + f;
+        ck.encodings         = 0;
         auto chunk_fragments = row_group_fragments[c].subspan(f, fragments_in_chunk);
         // In fragment struct, add a pointer to the chunk it belongs to
         // In each fragment in chunk_fragments, update the chunk pointer here.
@@ -1687,7 +1702,6 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
           });
         auto& column_chunk_meta          = row_group.columns[c].meta_data;
         column_chunk_meta.type           = parquet_columns[c].physical_type();
-        column_chunk_meta.encodings      = {Encoding::PLAIN, Encoding::RLE};
         column_chunk_meta.path_in_schema = parquet_columns[c].get_path_in_schema();
         column_chunk_meta.codec          = UNCOMPRESSED;
         column_chunk_meta.num_values     = ck.num_values;
@@ -1703,17 +1717,6 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
   row_group_fragments.host_to_device_async(stream);
   [[maybe_unused]] auto dict_info_owner = build_chunk_dictionaries(
     chunks, col_desc, row_group_fragments, compression, dict_policy, max_dictionary_size, stream);
-  for (size_t p = 0; p < partitions.size(); p++) {
-    for (int rg = 0; rg < num_rg_in_part[p]; rg++) {
-      size_t global_rg = global_rowgroup_base[p] + rg;
-      for (int col = 0; col < num_columns; col++) {
-        if (chunks.host_view()[rg][col].use_dictionary) {
-          agg_meta->file(p).row_groups[global_rg].columns[col].meta_data.encodings.push_back(
-            Encoding::PLAIN_DICTIONARY);
-        }
-      }
-    }
-  }
 
   // The code preceding this used a uniform fragment size for all columns. Now recompute
   // fragments with a (potentially) varying number of fragments per column.
@@ -1949,6 +1952,8 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
       }
       max_write_size = std::max(max_write_size, ck.compressed_size);
 
+      update_chunk_encodings(column_chunk_meta.encodings, ck.encodings);
+
      if (ck.ck_stat_size != 0) {
         std::vector<uint8_t> const stats_blob = cudf::detail::make_std_vector_sync(
           device_span<uint8_t const>(dev_bfr, ck.ck_stat_size), stream);
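
A standalone round-trip of the helper above (not part of the patch; abbreviated enum, no gpu namespace) shows how the device-side mask becomes the footer's encoding list, in enum order:

// demo_update_encodings.cpp -- illustrative only
#include <cassert>
#include <cstdint>
#include <vector>

enum class Encoding : uint8_t { PLAIN = 0, RLE = 3, RLE_DICTIONARY = 8, NUM_ENCODINGS = 10 };
constexpr uint32_t encoding_to_mask(Encoding e) { return 1 << static_cast<uint32_t>(e); }

void update_chunk_encodings(std::vector<Encoding>& encodings, uint32_t enc_mask)
{
  for (uint8_t enc = 0; enc < static_cast<uint8_t>(Encoding::NUM_ENCODINGS); enc++) {
    auto const enc_enum = static_cast<Encoding>(enc);
    if ((enc_mask & encoding_to_mask(enc_enum)) != 0) { encodings.push_back(enc_enum); }
  }
}

int main()
{
  // a V2 dictionary-encoded chunk: PLAIN dictionary page, RLE levels,
  // RLE_DICTIONARY data pages
  auto const mask = encoding_to_mask(Encoding::PLAIN) | encoding_to_mask(Encoding::RLE) |
                    encoding_to_mask(Encoding::RLE_DICTIONARY);
  std::vector<Encoding> encodings;
  update_chunk_encodings(encodings, mask);
  assert((encodings ==
          std::vector<Encoding>{Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY}));
  return 0;
}
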
"CheckEncodingsV2.parquet" : "CheckEncodingsV1.parquet"; + auto filepath = temp_env->get_temp_filepath(filename); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .max_page_size_rows(num_rows) + .write_v2_headers(is_v2); + cudf::io::write_parquet(out_opts); + + // make sure the expected encodings are present + auto contains = [](auto const& vec, auto const& enc) { + return std::find(vec.begin(), vec.end(), enc) != vec.end(); + }; + + auto const source = cudf::io::datasource::create(filepath); + cudf::io::parquet::FileMetaData fmd; + + read_footer(source, &fmd); + auto const& chunk0_enc = fmd.row_groups[0].columns[0].meta_data.encodings; + auto const& chunk1_enc = fmd.row_groups[0].columns[1].meta_data.encodings; + auto const& chunk2_enc = fmd.row_groups[0].columns[2].meta_data.encodings; + if (is_v2) { + // col0 should have RLE for rep/def and data + EXPECT_TRUE(chunk0_enc.size() == 1); + EXPECT_TRUE(contains(chunk0_enc, Encoding::RLE)); + // col1 should have RLE for rep/def and PLAIN for data + EXPECT_TRUE(chunk1_enc.size() == 2); + EXPECT_TRUE(contains(chunk1_enc, Encoding::RLE)); + EXPECT_TRUE(contains(chunk1_enc, Encoding::PLAIN)); + // col2 should have RLE for rep/def, PLAIN for dict, and RLE_DICTIONARY for data + EXPECT_TRUE(chunk2_enc.size() == 3); + EXPECT_TRUE(contains(chunk2_enc, Encoding::RLE)); + EXPECT_TRUE(contains(chunk2_enc, Encoding::PLAIN)); + EXPECT_TRUE(contains(chunk2_enc, Encoding::RLE_DICTIONARY)); + } else { + // col0 should have RLE for rep/def and PLAIN for data + EXPECT_TRUE(chunk0_enc.size() == 2); + EXPECT_TRUE(contains(chunk0_enc, Encoding::RLE)); + EXPECT_TRUE(contains(chunk0_enc, Encoding::PLAIN)); + // col1 should have RLE for rep/def and PLAIN for data + EXPECT_TRUE(chunk1_enc.size() == 2); + EXPECT_TRUE(contains(chunk1_enc, Encoding::RLE)); + EXPECT_TRUE(contains(chunk1_enc, Encoding::PLAIN)); + // col2 should have RLE for rep/def and PLAIN_DICTIONARY for data and dict + EXPECT_TRUE(chunk2_enc.size() == 2); + EXPECT_TRUE(contains(chunk2_enc, Encoding::RLE)); + EXPECT_TRUE(contains(chunk2_enc, Encoding::PLAIN_DICTIONARY)); + } +} + CUDF_TEST_PROGRAM_MAIN()