Add option to Parquet writer to skip compressing individual columns #15411

Merged Apr 18, 2024 (17 commits; changes shown from 5 commits)
21 changes: 21 additions & 0 deletions cpp/include/cudf/io/types.hpp
@@ -602,6 +602,7 @@ class column_in_metadata {
bool _list_column_is_map = false;
bool _use_int96_timestamp = false;
bool _output_as_binary = false;
bool _skip_compression = false;
std::optional<uint8_t> _decimal_precision;
std::optional<int32_t> _parquet_field_id;
std::vector<column_in_metadata> children;
@@ -722,6 +723,19 @@ class column_in_metadata {
return *this;
}

/**
* @brief Specifies whether this column should not be compressed regardless of the compression
* codec specified for the file.
*
* @param skip If `true` do not compress this column
* @return this for chaining
*/
column_in_metadata& set_skip_compression(bool skip) noexcept
{
_skip_compression = skip;
return *this;
}

/**
* @brief Sets the encoding to use for this column.
*
@@ -844,6 +858,13 @@ class column_in_metadata {
*/
[[nodiscard]] bool is_enabled_output_as_binary() const noexcept { return _output_as_binary; }

/**
* @brief Get whether to skip compressing this column
*
* @return Boolean indicating whether to skip compression of this column
*/
[[nodiscard]] bool is_enabled_skip_compression() const noexcept { return _skip_compression; }

/**
* @brief Get the encoding that was set for this column.
*
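For illustration, a minimal usage sketch of the new setter (the sink path and column choice are hypothetical; the pattern mirrors the test added later in this PR):

// Hypothetical sketch: write column 0 uncompressed while the rest of the
// file uses the ZSTD codec selected at the writer level.
auto metadata = cudf::io::table_input_metadata{table};
metadata.column_metadata[0].set_skip_compression(true);

auto options =
  cudf::io::parquet_writer_options::builder(cudf::io::sink_info{"example.parquet"}, table)
    .compression(cudf::io::compression_type::ZSTD)
    .metadata(std::move(metadata));
cudf::io::write_parquet(options);
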
5 changes: 4 additions & 1 deletion cpp/src/io/parquet/page_enc.cu
@@ -1591,7 +1591,9 @@ __device__ void finish_page_encode(state_buf* s,
}
pages[blockIdx.x] = s->page;
if (not comp_results.empty()) {
-    comp_results[blockIdx.x] = {0, compression_status::FAILURE};
+    auto const status =
+      s->col.skip_compression ? compression_status::SKIPPED : compression_status::FAILURE;
+    comp_results[blockIdx.x] = {0, status};
pages[blockIdx.x].comp_res = &comp_results[blockIdx.x];
}
}
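The hunk above changes what a page reports when its buffer is handed to the compression stage unchanged: columns that opted out are pre-marked SKIPPED rather than FAILURE. A rough sketch of the distinction a downstream consumer can now draw (illustrative, not the actual cuDF code):

// Sketch: branching on the page's pre-set compression status.
switch (comp_res->status) {
  case compression_status::SUCCESS: /* use the compressed bytes */ break;
  case compression_status::SKIPPED: /* intentionally uncompressed; not an error */ break;
  case compression_status::FAILURE: /* a real compression failure */ break;
}
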
@@ -2494,6 +2496,7 @@ CUDF_KERNEL void __launch_bounds__(decide_compression_block_size)
if (auto comp_res = curr_page.comp_res; comp_res != nullptr) {
auto const lvl_bytes = curr_page.is_v2() ? curr_page.level_bytes() : 0;
compressed_data_size += comp_res->bytes_written + lvl_bytes;
// TODO: would this be better as a ballot?
if (comp_res->status != compression_status::SUCCESS) {
atomicOr(&compression_error[warp_id], 1);
}
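Regarding the TODO in the hunk above: a warp-ballot version is sketched below, under the assumption that all 32 lanes of the warp reach the vote together (which the surrounding loop would have to guarantee). This illustrates the idea; it is not code from the PR.

// Each lane votes on whether its page failed; lane 0 records the warp-wide
// result, so no atomic is needed (one writer per compression_error slot).
bool const failed  = comp_res != nullptr && comp_res->status != compression_status::SUCCESS;
uint32_t const bad = __ballot_sync(0xffffffffu, failed);
if (bad != 0 && threadIdx.x % 32 == 0) { compression_error[warp_id] = 1; }
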
1 change: 1 addition & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -475,6 +475,7 @@ struct parquet_column_device_view : stats_column_desc {
//!< nullability of parent_column. May be different from
//!< col.nullable() in case of chunked writing.
bool output_as_byte_array; //!< Indicates this list column is being written as a byte array
bool skip_compression; //!< Skip compression for this column
column_encoding requested_encoding; //!< User specified encoding for this column.
};

4 changes: 4 additions & 0 deletions cpp/src/io/parquet/writer_impl.cu
@@ -274,6 +274,7 @@ struct schema_tree_node : public SchemaElement {
statistics_dtype stats_dtype;
int32_t ts_scale;
column_encoding requested_encoding;
bool skip_compression;

// TODO(fut): Think about making schema a class that holds a vector of schema_tree_nodes. The
// function construct_schema_tree could be its constructor. It can have method to get the per
@@ -698,6 +699,7 @@ std::vector<schema_tree_node> construct_schema_tree(
set_field_id(col_schema, col_meta);
set_encoding(col_schema, col_meta);
col_schema.output_as_byte_array = col_meta.is_enabled_output_as_binary();
col_schema.skip_compression = col_meta.is_enabled_skip_compression();
schema.push_back(col_schema);
} else if (col->type().id() == type_id::STRUCT) {
// if struct, add current and recursively call for all children
@@ -833,6 +835,7 @@ std::vector<schema_tree_node> construct_schema_tree(
col_schema.leaf_column = col;
set_field_id(col_schema, col_meta);
set_encoding(col_schema, col_meta);
col_schema.skip_compression = col_meta.is_enabled_skip_compression();
schema.push_back(col_schema);
}
};
@@ -1023,6 +1026,7 @@ parquet_column_device_view parquet_column_view::get_device_view(rmm::cuda_stream
desc.max_def_level = _max_def_level;
desc.max_rep_level = _max_rep_level;
desc.requested_encoding = schema_node.requested_encoding;
desc.skip_compression = schema_node.skip_compression;
return desc;
}

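Read together, the writer hunks thread the new flag end to end; a comment-only summary of the path, using only names that appear in this diff:

// column_in_metadata::set_skip_compression(true)        (user API, types.hpp)
//   -> schema_tree_node::skip_compression               (construct_schema_tree, writer_impl.cu)
//   -> parquet_column_device_view::skip_compression     (get_device_view, writer_impl.cu)
//   -> finish_page_encode() pre-marks the page SKIPPED  (page_enc.cu)
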
42 changes: 42 additions & 0 deletions cpp/tests/io/parquet_writer_test.cpp
@@ -24,8 +24,11 @@

#include <cudf/io/data_sink.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/io/types.hpp>
#include <cudf/unary.hpp>

#include <src/io/parquet/parquet_common.hpp>

#include <fstream>

using cudf::test::iterators::no_nulls;
@@ -1321,6 +1324,45 @@ TEST_F(ParquetWriterTest, CompStatsEmptyTable)
expect_compression_stats_empty(stats);
}

TEST_F(ParquetWriterTest, SkipCompression)
{
constexpr auto page_rows = 1000;
constexpr auto row_group_rows = 2 * page_rows;
constexpr auto num_rows = 2 * row_group_rows;

auto sequence = thrust::make_counting_iterator(0);
column_wrapper<int> col(sequence, sequence + num_rows, no_nulls());

auto expected = table_view{{col, col}};
auto expected_metadata = cudf::io::table_input_metadata{expected};
expected_metadata.column_metadata[0].set_skip_compression(true);

auto const filepath = temp_env->get_temp_filepath("SkipCompression.parquet");
cudf::io::parquet_writer_options out_opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
.compression(cudf::io::compression_type::ZSTD)
.max_page_size_rows(page_rows)
.row_group_size_rows(row_group_rows)
.max_page_fragment_size(page_rows)
.metadata(std::move(expected_metadata));

cudf::io::write_parquet(out_opts);

cudf::io::parquet_reader_options read_opts =
cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
auto result = cudf::io::read_parquet(read_opts);

CUDF_TEST_EXPECT_TABLES_EQUAL(*result.tbl, expected);

// check metadata to make sure column 0 is not compressed and column 1 is
auto const source = cudf::io::datasource::create(filepath);
cudf::io::parquet::detail::FileMetaData fmd;
read_footer(source, &fmd);

EXPECT_EQ(fmd.row_groups[0].columns[0].meta_data.codec, cudf::io::parquet::detail::UNCOMPRESSED);

Review thread on the two codec assertions:

Contributor: this can fail if compressed size is, by chance, larger than uncompressed?

Contributor Author: hmm...not that line, but the line below could 😟

Contributor: Right, the other one. Still, it shouldn't change randomly. The way the values are encoded and the way the compression works should be very stable.

EXPECT_EQ(fmd.row_groups[0].columns[1].meta_data.codec, cudf::io::parquet::detail::ZSTD);
}

TEST_F(ParquetWriterTest, NoNullsAsNonNullable)
{
column_wrapper<int32_t> col{{1, 2, 3}, no_nulls()};