diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp
index 64d627483e6..65d4a4417f0 100644
--- a/cpp/include/cudf/io/types.hpp
+++ b/cpp/include/cudf/io/types.hpp
@@ -602,6 +602,7 @@ class column_in_metadata {
   bool _list_column_is_map  = false;
   bool _use_int96_timestamp = false;
   bool _output_as_binary    = false;
+  bool _skip_compression    = false;
   std::optional<uint8_t> _decimal_precision;
   std::optional<int32_t> _parquet_field_id;
   std::vector<column_in_metadata> children;
@@ -722,6 +723,19 @@ class column_in_metadata {
     return *this;
   }
 
+  /**
+   * @brief Specifies whether this column should not be compressed regardless of the compression
+   * codec specified for the file.
+   *
+   * @param skip If `true` do not compress this column
+   * @return this for chaining
+   */
+  column_in_metadata& set_skip_compression(bool skip) noexcept
+  {
+    _skip_compression = skip;
+    return *this;
+  }
+
   /**
    * @brief Sets the encoding to use for this column.
    *
@@ -844,6 +858,13 @@ class column_in_metadata {
    */
   [[nodiscard]] bool is_enabled_output_as_binary() const noexcept { return _output_as_binary; }
 
+  /**
+   * @brief Get whether to skip compressing this column
+   *
+   * @return Boolean indicating whether to skip compression of this column
+   */
+  [[nodiscard]] bool is_enabled_skip_compression() const noexcept { return _skip_compression; }
+
   /**
    * @brief Get the encoding that was set for this column.
    *
diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu
index 114e47b325b..2db6dc4270d 100644
--- a/cpp/src/io/parquet/page_enc.cu
+++ b/cpp/src/io/parquet/page_enc.cu
@@ -1591,7 +1591,9 @@ __device__ void finish_page_encode(state_buf* s,
   }
   pages[blockIdx.x] = s->page;
   if (not comp_results.empty()) {
-    comp_results[blockIdx.x]   = {0, compression_status::FAILURE};
+    auto const status =
+      s->col.skip_compression ? compression_status::SKIPPED : compression_status::FAILURE;
+    comp_results[blockIdx.x]   = {0, status};
     pages[blockIdx.x].comp_res = &comp_results[blockIdx.x];
   }
 }
@@ -2495,6 +2497,7 @@ CUDF_KERNEL void __launch_bounds__(decide_compression_block_size)
     if (auto comp_res = curr_page.comp_res; comp_res != nullptr) {
       auto const lvl_bytes = curr_page.is_v2() ? curr_page.level_bytes() : 0;
       compressed_data_size += comp_res->bytes_written + lvl_bytes;
+      // TODO: would this be better as a ballot?
       if (comp_res->status != compression_status::SUCCESS) {
         atomicOr(&compression_error[warp_id], 1);
       }
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index 200a8ec9ddb..b165c60b2cf 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -475,6 +475,7 @@ struct parquet_column_device_view : stats_column_desc {
                               //!< nullability of parent_column. May be different from
                               //!< col.nullable() in case of chunked writing.
   bool output_as_byte_array;  //!< Indicates this list column is being written as a byte array
+  bool skip_compression;      //!< Skip compression for this column
   column_encoding requested_encoding;  //!< User specified encoding for this column.
 };
 
diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index fd8d4f8bd7f..823a08084ee 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -274,6 +274,7 @@ struct schema_tree_node : public SchemaElement {
   statistics_dtype stats_dtype;
   int32_t ts_scale;
   column_encoding requested_encoding;
+  bool skip_compression;
 
   // TODO(fut): Think about making schema a class that holds a vector of schema_tree_nodes. The
   // function construct_schema_tree could be its constructor. It can have method to get the per
@@ -698,6 +699,7 @@ std::vector<schema_tree_node> construct_schema_tree(
       set_field_id(col_schema, col_meta);
       set_encoding(col_schema, col_meta);
       col_schema.output_as_byte_array = col_meta.is_enabled_output_as_binary();
+      col_schema.skip_compression     = col_meta.is_enabled_skip_compression();
       schema.push_back(col_schema);
     } else if (col->type().id() == type_id::STRUCT) {
       // if struct, add current and recursively call for all children
@@ -833,6 +835,7 @@ std::vector<schema_tree_node> construct_schema_tree(
       col_schema.leaf_column = col;
       set_field_id(col_schema, col_meta);
       set_encoding(col_schema, col_meta);
+      col_schema.skip_compression = col_meta.is_enabled_skip_compression();
       schema.push_back(col_schema);
     }
   };
@@ -1023,6 +1026,7 @@ parquet_column_device_view parquet_column_view::get_device_view(rmm::cuda_stream
   desc.max_def_level      = _max_def_level;
   desc.max_rep_level      = _max_rep_level;
   desc.requested_encoding = schema_node.requested_encoding;
+  desc.skip_compression   = schema_node.skip_compression;
   return desc;
 }
 
diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp
index ffa672fb564..caddfee9f02 100644
--- a/cpp/tests/io/parquet_writer_test.cpp
+++ b/cpp/tests/io/parquet_writer_test.cpp
@@ -24,8 +24,11 @@
 #include
 #include
+#include
 #include
+#include
+
 #include
 
 using cudf::test::iterators::no_nulls;
@@ -1321,6 +1324,45 @@ TEST_F(ParquetWriterTest, CompStatsEmptyTable)
   expect_compression_stats_empty(stats);
 }
 
+TEST_F(ParquetWriterTest, SkipCompression)
+{
+  constexpr auto page_rows      = 1000;
+  constexpr auto row_group_rows = 2 * page_rows;
+  constexpr auto num_rows       = 2 * row_group_rows;
+
+  auto sequence = thrust::make_counting_iterator(0);
+  column_wrapper col(sequence, sequence + num_rows, no_nulls());
+
+  auto expected          = table_view{{col, col}};
+  auto expected_metadata = cudf::io::table_input_metadata{expected};
+  expected_metadata.column_metadata[0].set_skip_compression(true);
+
+  auto const filepath = temp_env->get_temp_filepath("SkipCompression.parquet");
+  cudf::io::parquet_writer_options out_opts =
+    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
+      .compression(cudf::io::compression_type::ZSTD)
+      .max_page_size_rows(page_rows)
+      .row_group_size_rows(row_group_rows)
+      .max_page_fragment_size(page_rows)
+      .metadata(std::move(expected_metadata));
+
+  cudf::io::write_parquet(out_opts);
+
+  cudf::io::parquet_reader_options read_opts =
+    cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
+  auto result = cudf::io::read_parquet(read_opts);
+
+  CUDF_TEST_EXPECT_TABLES_EQUAL(*result.tbl, expected);
+
+  // check metadata to make sure column 0 is not compressed and column 1 is
+  auto const source = cudf::io::datasource::create(filepath);
+  cudf::io::parquet::detail::FileMetaData fmd;
+  read_footer(source, &fmd);
+
+  EXPECT_EQ(fmd.row_groups[0].columns[0].meta_data.codec, cudf::io::parquet::detail::UNCOMPRESSED);
+  EXPECT_EQ(fmd.row_groups[0].columns[1].meta_data.codec, cudf::io::parquet::detail::ZSTD);
+}
+
 TEST_F(ParquetWriterTest, NoNullsAsNonNullable)
 {
   column_wrapper col{{1, 2, 3}, no_nulls()};
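For reference, below is a minimal caller-side sketch of the new option. It is not part of the diff: the function name write_with_skipped_column and the output path are illustrative only, while the cudf calls mirror the SkipCompression test above (mark a column through table_input_metadata, then write with a file-level codec that the marked column skips).

// Minimal usage sketch (not part of the diff). The function name and output path are
// illustrative; the cudf API calls mirror the SkipCompression test above.
#include <cudf/io/parquet.hpp>
#include <cudf/table/table_view.hpp>

#include <utility>

void write_with_skipped_column(cudf::table_view const& tbl)
{
  // Build per-column metadata and mark column 0 so the writer leaves it uncompressed.
  auto metadata = cudf::io::table_input_metadata{tbl};
  metadata.column_metadata[0].set_skip_compression(true);

  // The file-level codec (ZSTD here) still applies to every other column.
  cudf::io::parquet_writer_options opts =
    cudf::io::parquet_writer_options::builder(
      cudf::io::sink_info{"skip_compression_example.parquet"}, tbl)
      .compression(cudf::io::compression_type::ZSTD)
      .metadata(std::move(metadata));
  cudf::io::write_parquet(opts);
}

A likely motivation (my reading, not stated in the diff) is columns that hold already-compressed or incompressible binary data, where running the file codec over the pages costs encode time for little size benefit.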