
Add ability to request Parquet encodings on a per-column basis #15081

Merged — merged 30 commits into rapidsai:branch-24.04 from select_encodings, Mar 6, 2024

Changes from 15 commits

Commits (30)
94d6401
add ability to request encodings per column
etseidl Feb 16, 2024
c77ecb1
remove out of date fixme
etseidl Feb 17, 2024
d5c2fdd
fix delta_byte_array case
etseidl Feb 17, 2024
dfe853b
clean up definition of is_use_delta
etseidl Feb 19, 2024
5b4c368
Merge branch 'branch-24.04' into select_encodings
etseidl Feb 20, 2024
db821e2
Merge branch 'rapidsai:branch-24.04' into select_encodings
etseidl Feb 21, 2024
650376b
Merge remote-tracking branch 'origin/branch-24.04' into select_encodings
etseidl Feb 22, 2024
684ca61
Merge branch 'branch-24.04' into select_encodings
etseidl Feb 24, 2024
1bf5c98
Merge branch 'branch-24.04' into select_encodings
etseidl Feb 26, 2024
3445f59
refactor to use enum rather than strings for setting encodings
etseidl Feb 27, 2024
64e3234
clean up leftover cruft
etseidl Feb 27, 2024
67e85de
and a little more cruft
etseidl Feb 27, 2024
3989641
clean up some boolean logic
etseidl Feb 27, 2024
c0f769f
Merge branch 'branch-24.04' into select_encodings
etseidl Feb 27, 2024
d5b451e
warn on DELTA_BYTE_ARRAY
etseidl Feb 27, 2024
45a9edc
suggested changes to warnings
etseidl Feb 27, 2024
4a3d434
Merge branch 'branch-24.04' into select_encodings
etseidl Feb 27, 2024
fc38013
Merge branch 'branch-24.04' into select_encodings
etseidl Feb 28, 2024
1620728
Merge branch 'branch-24.04' into select_encodings
etseidl Feb 29, 2024
0e3e8f5
Merge branch 'branch-24.04' into select_encodings
etseidl Mar 1, 2024
72ff916
Merge branch 'branch-24.04' into select_encodings
etseidl Mar 4, 2024
44473b0
catch corner case where dict encoding is requested, but cannot be used
etseidl Mar 4, 2024
7201a9b
Merge branch 'branch-24.04' into select_encodings
etseidl Mar 4, 2024
5405ddf
add test and refactor UserRequestedEncodings test
etseidl Mar 5, 2024
bf3ee39
Merge branch 'branch-24.04' into select_encodings
etseidl Mar 5, 2024
eb8631d
Merge branch 'branch-24.04' into select_encodings
vuule Mar 5, 2024
9882133
change enum name per review comment
etseidl Mar 5, 2024
810d951
Merge remote-tracking branch 'origin/branch-24.04' into select_encodings
etseidl Mar 5, 2024
ea63ec2
implement suggestion from review
etseidl Mar 5, 2024
26794f0
Merge branch 'branch-24.04' into select_encodings
etseidl Mar 5, 2024
44 changes: 44 additions & 0 deletions cpp/include/cudf/io/types.hpp
@@ -99,6 +99,26 @@ enum statistics_freq {
STATISTICS_COLUMN = 3, ///< Full column and offset indices. Implies STATISTICS_ROWGROUP
};

/**
* @brief Valid encodings for use with `column_in_metadata::set_encoding()`
*/
enum class column_encoding {
// common encodings
NOT_SET = -1, ///< No encoding has been requested
DICTIONARY, ///< Use dictionary encoding
// parquet encodings
PLAIN, ///< Use plain encoding
DELTA_BINARY_PACKED, ///< Use DELTA_BINARY_PACKED encoding (only valid for integer columns)
DELTA_LENGTH_BYTE_ARRAY, ///< Use DELTA_LENGTH_BYTE_ARRAY encoding (only
///< valid for BYTE_ARRAY columns)
DELTA_BYTE_ARRAY, ///< Use DELTA_BYTE_ARRAY encoding (only valid for
///< BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY columns)
// orc encodings
DIRECT, ///< Use DIRECT encoding
DIRECT_V2, ///< Use DIRECT_V2 encoding
DICTIONARY_V2, ///< Use DICTIONARY_V2 encoding
};

/**
* @brief Statistics about compression performed by a writer.
*/
@@ -585,6 +605,7 @@ class column_in_metadata {
std::optional<uint8_t> _decimal_precision;
std::optional<int32_t> _parquet_field_id;
std::vector<column_in_metadata> children;
column_encoding _encoding = column_encoding::NOT_SET;
Review comment (Contributor) — suggested change:

-  column_encoding _encoding = column_encoding::NOT_SET;
+  column_encoding _encoding{column_encoding::NOT_SET};

Reply (Contributor Author, etseidl):

Nothing else in the struct (or AFAICT in this file) uses an initializer list... maybe clean this up separately?

public:
column_in_metadata() = default;
@@ -701,6 +722,22 @@ class column_in_metadata {
return *this;
}

/**
* @brief Sets the encoding to use for this column.
*
* This is just a request, and the encoder may still choose to use a different encoding
* depending on resource constraints. Use the values defined in the `column_encoding` enum.
*
* @param encoding The encoding to use
* @return this for chaining
*/
column_in_metadata& set_encoding(column_encoding encoding) noexcept
{
_encoding = encoding;
return *this;
}

/**
* @brief Get reference to a child of this column
*
@@ -806,6 +843,13 @@ class column_in_metadata {
* @return Boolean indicating whether to encode this column as binary data
*/
[[nodiscard]] bool is_enabled_output_as_binary() const noexcept { return _output_as_binary; }

/**
* @brief Get the encoding that was set for this column.
*
* @return The encoding that was set for this column
*/
[[nodiscard]] column_encoding get_encoding() const { return _encoding; }
};

/**
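For context, a minimal sketch (not part of the diff) of how a caller could request per-column encodings with the new API. It mirrors the pattern used by the test added later in this PR; the file name, column layout, and column names ("ints", "strings") are illustrative assumptions.

#include <cudf/io/parquet.hpp>
#include <cudf/io/types.hpp>
#include <cudf/table/table_view.hpp>

// Assumes `table` has an INT32 column at index 0 and a strings column at index 1.
void write_with_requested_encodings(cudf::table_view const& table)
{
  cudf::io::table_input_metadata metadata(table);

  // Encoding requests are hints: the writer may still fall back to another
  // encoding if the request is invalid for the column type or cannot be honored.
  metadata.column_metadata[0]
    .set_name("ints")
    .set_encoding(cudf::io::column_encoding::DELTA_BINARY_PACKED);
  metadata.column_metadata[1]
    .set_name("strings")
    .set_encoding(cudf::io::column_encoding::DICTIONARY);

  cudf::io::parquet_writer_options opts =
    cudf::io::parquet_writer_options::builder(
      cudf::io::sink_info{"requested_encodings.parquet"}, table)
      .metadata(metadata);
  cudf::io::write_parquet(opts);
}
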
23 changes: 20 additions & 3 deletions cpp/src/io/parquet/page_enc.cu
@@ -573,9 +573,13 @@ CUDF_KERNEL void __launch_bounds__(128)
// at the worst case number of bytes needed to encode.
auto const physical_type = col_g.physical_type;
auto const type_id = col_g.leaf_column->type().id();
auto const is_use_delta =
write_v2_headers && !ck_g.use_dictionary &&
auto const is_requested_delta =
col_g.requested_encoding == column_encoding::DELTA_BINARY_PACKED ||
col_g.requested_encoding == column_encoding::DELTA_LENGTH_BYTE_ARRAY;
auto const is_fallback_to_delta =
!ck_g.use_dictionary && write_v2_headers &&
(physical_type == INT32 || physical_type == INT64 || physical_type == BYTE_ARRAY);
auto const is_use_delta = is_requested_delta || is_fallback_to_delta;

if (t < 32) {
uint32_t fragments_in_chunk = 0;
@@ -786,7 +790,20 @@
if (t == 0) {
if (not pages.empty()) {
// set encoding
if (is_use_delta) {
if (col_g.requested_encoding != column_encoding::NOT_SET) {
switch (col_g.requested_encoding) {
case column_encoding::PLAIN: page_g.kernel_mask = encode_kernel_mask::PLAIN; break;
case column_encoding::DICTIONARY:
page_g.kernel_mask = encode_kernel_mask::DICTIONARY;
break;
case column_encoding::DELTA_BINARY_PACKED:
page_g.kernel_mask = encode_kernel_mask::DELTA_BINARY;
break;
case column_encoding::DELTA_LENGTH_BYTE_ARRAY:
page_g.kernel_mask = encode_kernel_mask::DELTA_LENGTH_BA;
break;
}
} else if (is_use_delta) {
// TODO(ets): at some point make a more intelligent decision on this. DELTA_LENGTH_BA
// should always be preferred over PLAIN, but DELTA_BINARY is a different matter.
// If the delta encoding size is going to be close to 32 bits anyway, then plain
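Restated for clarity — this is an illustrative, simplified host-side restatement of the device logic above, not code from the PR: an explicit (already validated) user request takes precedence, then the existing V2-header delta heuristic, then the dictionary/plain default. The enums below are stand-ins for the real types.

enum class column_encoding { NOT_SET = -1, DICTIONARY, PLAIN, DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY };
enum class encode_kernel_mask { PLAIN, DICTIONARY, DELTA_BINARY, DELTA_LENGTH_BA };

encode_kernel_mask select_page_encoding(column_encoding requested,
                                        bool use_dictionary,
                                        bool is_use_delta,
                                        bool is_byte_array)
{
  // 1. Honor an explicit, validated user request first.
  switch (requested) {
    case column_encoding::PLAIN: return encode_kernel_mask::PLAIN;
    case column_encoding::DICTIONARY: return encode_kernel_mask::DICTIONARY;
    case column_encoding::DELTA_BINARY_PACKED: return encode_kernel_mask::DELTA_BINARY;
    case column_encoding::DELTA_LENGTH_BYTE_ARRAY: return encode_kernel_mask::DELTA_LENGTH_BA;
    default: break;  // NOT_SET, or a request already cleared during schema validation
  }
  // 2. Otherwise fall back to the pre-existing delta heuristic for V2 headers.
  if (is_use_delta) {
    return is_byte_array ? encode_kernel_mask::DELTA_LENGTH_BA : encode_kernel_mask::DELTA_BINARY;
  }
  // 3. Otherwise use dictionary if one was built, plain by default.
  return use_dictionary ? encode_kernel_mask::DICTIONARY : encode_kernel_mask::PLAIN;
}
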
1 change: 1 addition & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -462,6 +462,7 @@ struct parquet_column_device_view : stats_column_desc {
//!< nullability of parent_column. May be different from
//!< col.nullable() in case of chunked writing.
bool output_as_byte_array; //!< Indicates this list column is being written as a byte array
column_encoding requested_encoding; //!< User specified encoding for this column.
};

struct EncColumnChunk;
60 changes: 53 additions & 7 deletions cpp/src/io/parquet/writer_impl.cu
@@ -267,11 +267,13 @@ bool is_col_fixed_width(column_view const& column)
* 2. stats_dtype: datatype for statistics calculation required for the data stream of a leaf node.
* 3. ts_scale: scale to multiply or divide timestamp by in order to convert timestamp to parquet
* supported types
* 4. requested_encoding: A user-provided encoding to use for the column.
*/
struct schema_tree_node : public SchemaElement {
cudf::detail::LinkedColPtr leaf_column;
statistics_dtype stats_dtype;
int32_t ts_scale;
column_encoding requested_encoding;

// TODO(fut): Think about making schema a class that holds a vector of schema_tree_nodes. The
// function construct_schema_tree could be its constructor. It can have method to get the per
@@ -588,7 +590,7 @@ std::vector<schema_tree_node> construct_schema_tree(

std::function<void(cudf::detail::LinkedColPtr const&, column_in_metadata&, size_t)> add_schema =
[&](cudf::detail::LinkedColPtr const& col, column_in_metadata& col_meta, size_t parent_idx) {
bool col_nullable = is_col_nullable(col, col_meta, write_mode);
bool const col_nullable = is_col_nullable(col, col_meta, write_mode);

auto set_field_id = [&schema, parent_idx](schema_tree_node& s,
column_in_metadata const& col_meta) {
@@ -604,6 +606,41 @@
return child_col_type == type_id::UINT8;
};

// only call this after col_schema.type has been set
auto set_encoding = [&schema, parent_idx](schema_tree_node& s,
column_in_metadata const& col_meta) {
s.requested_encoding = column_encoding::NOT_SET;

if (schema[parent_idx].name != "list" and
col_meta.get_encoding() != column_encoding::NOT_SET) {
// do some validation
switch (col_meta.get_encoding()) {
case column_encoding::DELTA_BINARY_PACKED:
if (s.type != Type::INT32 && s.type != Type::INT64) { return; }
break;

case column_encoding::DELTA_LENGTH_BYTE_ARRAY:
if (s.type != Type::BYTE_ARRAY) { return; }
break;

// supported parquet encodings
case column_encoding::PLAIN:
case column_encoding::DICTIONARY: break;

// not yet supported for write (soon...)
case column_encoding::DELTA_BYTE_ARRAY: [[fallthrough]];
// all others
default:
CUDF_LOG_WARN("Unsupported page encoding requested: {}",
static_cast<int>(col_meta.get_encoding()));
return;
}

// requested encoding seems to be ok, set it
s.requested_encoding = col_meta.get_encoding();
}
};

// There is a special case for a list<int8> column with one byte column child. This column can
// have a special flag that indicates we write this out as binary instead of a list. This is a
// more efficient storage mechanism for a single-depth list of bytes, but is a departure from
@@ -626,6 +663,7 @@
col_schema.parent_idx = parent_idx;
col_schema.leaf_column = col;
set_field_id(col_schema, col_meta);
set_encoding(col_schema, col_meta);
col_schema.output_as_byte_array = col_meta.is_enabled_output_as_binary();
schema.push_back(col_schema);
} else if (col->type().id() == type_id::STRUCT) {
@@ -761,6 +799,7 @@ std::vector<schema_tree_node> construct_schema_tree(
col_schema.parent_idx = parent_idx;
col_schema.leaf_column = col;
set_field_id(col_schema, col_meta);
set_encoding(col_schema, col_meta);
schema.push_back(col_schema);
}
};
@@ -947,9 +986,10 @@ parquet_column_device_view parquet_column_view::get_device_view(rmm::cuda_stream

desc.level_bits = CompactProtocolReader::NumRequiredBits(max_rep_level()) << 4 |
CompactProtocolReader::NumRequiredBits(max_def_level());
desc.nullability = _d_nullability.data();
desc.max_def_level = _max_def_level;
desc.max_rep_level = _max_rep_level;
desc.nullability = _d_nullability.data();
desc.max_def_level = _max_def_level;
desc.max_rep_level = _max_rep_level;
desc.requested_encoding = schema_node.requested_encoding;
return desc;
}

@@ -1169,9 +1209,15 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
std::vector<rmm::device_uvector<slot_type>> hash_maps_storage;
hash_maps_storage.reserve(h_chunks.size());
for (auto& chunk : h_chunks) {
if (col_desc[chunk.col_desc_id].physical_type == Type::BOOLEAN ||
(col_desc[chunk.col_desc_id].output_as_byte_array &&
col_desc[chunk.col_desc_id].physical_type == Type::BYTE_ARRAY)) {
auto const& chunk_col_desc = col_desc[chunk.col_desc_id];
auto const is_requested_non_dict =
chunk_col_desc.requested_encoding != column_encoding::NOT_SET &&
chunk_col_desc.requested_encoding != column_encoding::DICTIONARY;
auto const is_type_non_dict =
chunk_col_desc.physical_type == Type::BOOLEAN ||
(chunk_col_desc.output_as_byte_array && chunk_col_desc.physical_type == Type::BYTE_ARRAY);

if (is_type_non_dict || is_requested_non_dict) {
chunk.use_dictionary = false;
} else {
chunk.use_dictionary = true;
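A usage note on the validation above (illustrative sketch, not code from the PR): a request that is invalid for the column's physical type — for example DELTA_BINARY_PACKED on a string column — is dropped during schema construction, so the write still succeeds and the column gets the writer's default (typically dictionary) encoding. Column and file names below are assumptions.

#include <cudf/io/parquet.hpp>
#include <cudf/table/table_view.hpp>

// Assumes `table` has a strings column at index 0.
void request_invalid_encoding(cudf::table_view const& table)
{
  cudf::io::table_input_metadata metadata(table);
  // DELTA_BINARY_PACKED is only valid for INT32/INT64 columns, so this request
  // is expected to be ignored and the string column written with the default encoding.
  metadata.column_metadata[0]
    .set_name("strings")
    .set_encoding(cudf::io::column_encoding::DELTA_BINARY_PACKED);

  cudf::io::parquet_writer_options opts =
    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{"fallback.parquet"}, table)
      .metadata(metadata);
  cudf::io::write_parquet(opts);  // succeeds; the ignored request does not raise an error
}
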
88 changes: 88 additions & 0 deletions cpp/tests/io/parquet_writer_test.cpp
@@ -1426,6 +1426,94 @@ TEST_F(ParquetWriterTest, RowGroupMetadata)
static_cast<int64_t>(num_rows * sizeof(column_type)));
}

TEST_F(ParquetWriterTest, UserRequestedEncodings)
{
constexpr int num_rows = 500;

auto const ones = thrust::make_constant_iterator(1);
auto const col =
cudf::test::fixed_width_column_wrapper<int32_t>{ones, ones + num_rows, no_nulls()};

auto const strings = thrust::make_constant_iterator("string");
auto const string_col =
cudf::test::strings_column_wrapper(strings, strings + num_rows, no_nulls());

auto const table = table_view(
{col, col, col, col, col, string_col, string_col, string_col, string_col, string_col});

cudf::io::table_input_metadata table_metadata(table);
table_metadata.column_metadata[0].set_name("int_plain");
table_metadata.column_metadata[0].set_encoding(cudf::io::column_encoding::PLAIN);
table_metadata.column_metadata[1].set_name("int_dict");
table_metadata.column_metadata[1].set_encoding(cudf::io::column_encoding::DICTIONARY);
table_metadata.column_metadata[2].set_name("int_delta_binary_packed");
table_metadata.column_metadata[2].set_encoding(cudf::io::column_encoding::DELTA_BINARY_PACKED);
table_metadata.column_metadata[3].set_name("int_delta_length_byte_array");
table_metadata.column_metadata[3].set_encoding(
cudf::io::column_encoding::DELTA_LENGTH_BYTE_ARRAY);
table_metadata.column_metadata[4].set_name("int_none");
table_metadata.column_metadata[5].set_name("string_plain");
table_metadata.column_metadata[5].set_encoding(cudf::io::column_encoding::PLAIN);
table_metadata.column_metadata[6].set_name("string_dict");
table_metadata.column_metadata[6].set_encoding(cudf::io::column_encoding::DICTIONARY);
table_metadata.column_metadata[7].set_name("string_delta_length_byte_array");
table_metadata.column_metadata[7].set_encoding(
cudf::io::column_encoding::DELTA_LENGTH_BYTE_ARRAY);
table_metadata.column_metadata[8].set_name("string_delta_binary_packed");
table_metadata.column_metadata[8].set_encoding(cudf::io::column_encoding::DELTA_BINARY_PACKED);
table_metadata.column_metadata[9].set_name("string_none");

for (auto& col_meta : table_metadata.column_metadata) {
col_meta.set_nullability(false);
}

auto const filepath = temp_env->get_temp_filepath("UserRequestedEncodings.parquet");
cudf::io::parquet_writer_options opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, table)
.metadata(table_metadata)
.stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN)
.compression(cudf::io::compression_type::ZSTD);
cudf::io::write_parquet(opts);

// check page headers to make sure each column is encoded with the appropriate encoder
auto const source = cudf::io::datasource::create(filepath);
cudf::io::parquet::detail::FileMetaData fmd;
read_footer(source, &fmd);

// no nulls and no repetition, so the only encoding used should be for the data.
// since we're writing v1, both dict and data pages should use PLAIN_DICTIONARY.
// requested plain
EXPECT_EQ(fmd.row_groups[0].columns[0].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::PLAIN);
// requested dictionary
EXPECT_EQ(fmd.row_groups[0].columns[1].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::PLAIN_DICTIONARY);
// requested delta_binary_packed
EXPECT_EQ(fmd.row_groups[0].columns[2].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::DELTA_BINARY_PACKED);
// requested delta_length_byte_array, which is invalid for an integer column, so should fall back to dictionary
EXPECT_EQ(fmd.row_groups[0].columns[3].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::PLAIN_DICTIONARY);
// no request, should fall back to dictionary
EXPECT_EQ(fmd.row_groups[0].columns[4].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::PLAIN_DICTIONARY);
// requested plain
EXPECT_EQ(fmd.row_groups[0].columns[5].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::PLAIN);
// requested dictionary
EXPECT_EQ(fmd.row_groups[0].columns[6].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::PLAIN_DICTIONARY);
// requested delta_length_byte_array
EXPECT_EQ(fmd.row_groups[0].columns[7].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::DELTA_LENGTH_BYTE_ARRAY);
// requested delta_binary_packed, which is invalid for a string column, so should fall back to dictionary
EXPECT_EQ(fmd.row_groups[0].columns[8].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::PLAIN_DICTIONARY);
// no request, should fall back to dictionary
EXPECT_EQ(fmd.row_groups[0].columns[9].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::PLAIN_DICTIONARY);
}

TEST_F(ParquetWriterTest, DeltaBinaryStartsWithNulls)
{
// test that the DELTA_BINARY_PACKED writer can properly encode a column that begins with