
Add ability to request Parquet encodings on a per-column basis #15081

Merged Mar 6, 2024: 30 commits from select_encodings into branch-24.04.

Commits (30). The diff below reflects an early revision (changes from the first 3 commits), before the string constants were refactored into an enum.
94d6401  add ability to request encodings per column (etseidl, Feb 16, 2024)
c77ecb1  remove out of date fixme (etseidl, Feb 17, 2024)
d5c2fdd  fix delta_byte_array case (etseidl, Feb 17, 2024)
dfe853b  clean up definition of is_use_delta (etseidl, Feb 19, 2024)
5b4c368  Merge branch 'branch-24.04' into select_encodings (etseidl, Feb 20, 2024)
db821e2  Merge branch 'rapidsai:branch-24.04' into select_encodings (etseidl, Feb 21, 2024)
650376b  Merge remote-tracking branch 'origin/branch-24.04' into select_encodings (etseidl, Feb 22, 2024)
684ca61  Merge branch 'branch-24.04' into select_encodings (etseidl, Feb 24, 2024)
1bf5c98  Merge branch 'branch-24.04' into select_encodings (etseidl, Feb 26, 2024)
3445f59  refactor to use enum rather than strings for setting encodings (etseidl, Feb 27, 2024)
64e3234  clean up leftover cruft (etseidl, Feb 27, 2024)
67e85de  and a little more cruft (etseidl, Feb 27, 2024)
3989641  clean up some boolean logic (etseidl, Feb 27, 2024)
c0f769f  Merge branch 'branch-24.04' into select_encodings (etseidl, Feb 27, 2024)
d5b451e  warn on DELTA_BYTE_ARRAY (etseidl, Feb 27, 2024)
45a9edc  suggested changes to warnings (etseidl, Feb 27, 2024)
4a3d434  Merge branch 'branch-24.04' into select_encodings (etseidl, Feb 27, 2024)
fc38013  Merge branch 'branch-24.04' into select_encodings (etseidl, Feb 28, 2024)
1620728  Merge branch 'branch-24.04' into select_encodings (etseidl, Feb 29, 2024)
0e3e8f5  Merge branch 'branch-24.04' into select_encodings (etseidl, Mar 1, 2024)
72ff916  Merge branch 'branch-24.04' into select_encodings (etseidl, Mar 4, 2024)
44473b0  catch corner case where dict encoding is requested, but cannot be used (etseidl, Mar 4, 2024)
7201a9b  Merge branch 'branch-24.04' into select_encodings (etseidl, Mar 4, 2024)
5405ddf  add test and refactor UserRequestedEncodings test (etseidl, Mar 5, 2024)
bf3ee39  Merge branch 'branch-24.04' into select_encodings (etseidl, Mar 5, 2024)
eb8631d  Merge branch 'branch-24.04' into select_encodings (vuule, Mar 5, 2024)
9882133  change enum name per review comment (etseidl, Mar 5, 2024)
810d951  Merge remote-tracking branch 'origin/branch-24.04' into select_encodings (etseidl, Mar 5, 2024)
ea63ec2  implement suggestion from review (etseidl, Mar 5, 2024)
26794f0  Merge branch 'branch-24.04' into select_encodings (etseidl, Mar 5, 2024)
47 changes: 47 additions & 0 deletions cpp/include/cudf/io/types.hpp
@@ -99,6 +99,20 @@ enum statistics_freq {
STATISTICS_COLUMN = 3, ///< Full column and offset indices. Implies STATISTICS_ROWGROUP
};

/**
* @brief Valid parquet encodings for use with `column_in_metadata::set_encoding()`
*/
struct parquet_encoding {
[Inline review thread on struct parquet_encoding]

Contributor Author: Note on my thinking here (or lack thereof)... I was thinking that using strings to specify the desired encoding would be better than an enum, since the column_in_metadata is shared between multiple encoders, and it would be more natural to use strings with a CLI or through the Python interface. And if ORC has different encoding names, then we could add another set of constants for that. But a user interface could translate string values to an enum, and the enum could just add as many fields as necessary, some not relevant to one implementation or the other, so maybe this is silly. This acts like a scoped enum already, so I'm not opposed to switching.

Contributor: It does seem like this could be an enum :) I don't think a lot of code would change.

Contributor: Actually, I rushed a bit with this comment. Would this become just encoding and contain a superset of all encoding types?

Contributor Author: Yeah, probably, or maybe page_encoding. But it would have to apply to all encoders that use the metadata (which I assume is just Parquet and ORC for now).

Contributor Author: page_encoding isn't a great name for ORC... how about column_encoding?

Contributor: Can't believe I forgot my boy ORC. Sounds good!

static std::string const PLAIN; ///< Use plain encoding
static std::string const DICTIONARY; ///< Use dictionary encoding
static std::string const
DELTA_BINARY_PACKED; ///< Use DELTA_BINARY_PACKED encoding (only valid for integer columns)
static std::string const DELTA_LENGTH_BYTE_ARRAY; ///< Use DELTA_LENGTH_BYTE_ARRAY encoding (only
///< valid for BYTE_ARRAY columns)
static std::string const DELTA_BYTE_ARRAY; ///< Use DELTA_BYTE_ARRAY encoding (only valid for
///< BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY columns)
};
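A sketch of the enum alternative floated in the thread above. This is hypothetical at this point in the PR (the constants in this revision are still strings), though the later commits "refactor to use enum rather than strings for setting encodings" and "change enum name per review comment" indicate the PR ultimately moved this way:

// Hypothetical sketch, not part of this diff: a writer-agnostic scoped enum
// holding a superset of encodings, with a sentinel meaning "writer decides".
enum class column_encoding {
  USE_DEFAULT,              // let the writer choose
  PLAIN,                    // Parquet and ORC
  DICTIONARY,               // Parquet and ORC
  DELTA_BINARY_PACKED,      // Parquet only; integer columns
  DELTA_LENGTH_BYTE_ARRAY,  // Parquet only; BYTE_ARRAY columns
  DELTA_BYTE_ARRAY,         // Parquet only; (FIXED_LEN_)BYTE_ARRAY columns
};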

/**
* @brief Statistics about compression performed by a writer.
*/
@@ -585,6 +599,7 @@ class column_in_metadata {
std::optional<uint8_t> _decimal_precision;
std::optional<int32_t> _parquet_field_id;
std::vector<column_in_metadata> children;
std::optional<std::string> _encoding;

public:
column_in_metadata() = default;
@@ -701,6 +716,22 @@
return *this;
}

/**
* @brief Sets the encoding to use for this column.
*
* This is just a request, and the encoder may still choose to use a different encoding
* depending on resource constraints. Use the constants defined in the `parquet_encoding`
* struct.
*
* @param encoding The encoding to use
* @return this for chaining
*/
column_in_metadata& set_encoding(std::string const& encoding) noexcept
{
_encoding = encoding;
return *this;
}

/**
* @brief Get reference to a child of this column
*
@@ -806,6 +837,22 @@ class column_in_metadata {
* @return Boolean indicating whether to encode this column as binary data
*/
[[nodiscard]] bool is_enabled_output_as_binary() const noexcept { return _output_as_binary; }

/**
* @brief Get whether the encoding has been set for this column.
*
* @return Boolean indicating whether an encoding has been set for this column
*/
[[nodiscard]] bool is_encoding_set() const noexcept { return _encoding.has_value(); }

/**
* @brief Get the encoding that was set for this column.
*
* @throws std::bad_optional_access If encoding was not set for this
* column. Check using `is_encoding_set()` first.
* @return The encoding that was set for this column
*/
[[nodiscard]] std::string get_encoding() const { return _encoding.value(); }
};

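For context, a minimal usage sketch of the API added in this file. This is illustrative only, not part of the diff; the column names and file path are made up, and it assumes column 0 is an integer column and column 1 a strings column:

// Illustrative usage of column_in_metadata::set_encoding() -- not part of this diff.
#include <cudf/io/parquet.hpp>
#include <cudf/io/types.hpp>
#include <cudf/table/table_view.hpp>

void write_with_requested_encodings(cudf::table_view const& table)
{
  cudf::io::table_input_metadata metadata(table);
  metadata.column_metadata[0].set_name("ints").set_encoding(
    cudf::io::parquet_encoding::DELTA_BINARY_PACKED);
  metadata.column_metadata[1].set_name("strings").set_encoding(
    cudf::io::parquet_encoding::DICTIONARY);

  auto const opts = cudf::io::parquet_writer_options::builder(
                      cudf::io::sink_info{"example.parquet"}, table)
                      .metadata(metadata)
                      .build();

  // Each request is a hint: the writer falls back to its default choice when a
  // requested encoding is unsupported or incompatible with the column type.
  cudf::io::write_parquet(opts);
}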
7 changes: 7 additions & 0 deletions cpp/src/io/functions.cpp
@@ -39,6 +39,13 @@
#include <algorithm>

namespace cudf::io {

std::string const parquet_encoding::PLAIN = "PLAIN";
std::string const parquet_encoding::DICTIONARY = "DICTIONARY";
std::string const parquet_encoding::DELTA_BINARY_PACKED = "DELTA_BINARY_PACKED";
std::string const parquet_encoding::DELTA_LENGTH_BYTE_ARRAY = "DELTA_LENGTH_BYTE_ARRAY";
std::string const parquet_encoding::DELTA_BYTE_ARRAY = "DELTA_BYTE_ARRAY";

// Returns builder for csv_reader_options
csv_reader_options_builder csv_reader_options::builder(source_info src)
{
21 changes: 18 additions & 3 deletions cpp/src/io/parquet/page_enc.cu
@@ -577,8 +577,10 @@ CUDF_KERNEL void __launch_bounds__(128)
auto const physical_type = col_g.physical_type;
auto const type_id = col_g.leaf_column->type().id();
   auto const is_use_delta =
-    write_v2_headers && !ck_g.use_dictionary &&
-    (physical_type == INT32 || physical_type == INT64 || physical_type == BYTE_ARRAY);
+    col_g.requested_encoding == Encoding::DELTA_BINARY_PACKED ||
+    col_g.requested_encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY ||
+    (write_v2_headers && !ck_g.use_dictionary &&
+     (physical_type == INT32 || physical_type == INT64 || physical_type == BYTE_ARRAY));

if (t < 32) {
uint32_t fragments_in_chunk = 0;
@@ -789,7 +791,20 @@
if (t == 0) {
if (not pages.empty()) {
// set encoding
-      if (is_use_delta) {
+      if (col_g.requested_encoding != Encoding::UNDEFINED) {
+        switch (col_g.requested_encoding) {
+          case Encoding::PLAIN: page_g.kernel_mask = encode_kernel_mask::PLAIN; break;
+          case Encoding::RLE_DICTIONARY:
+            page_g.kernel_mask = encode_kernel_mask::DICTIONARY;
+            break;
+          case Encoding::DELTA_BINARY_PACKED:
+            page_g.kernel_mask = encode_kernel_mask::DELTA_BINARY;
+            break;
+          case Encoding::DELTA_LENGTH_BYTE_ARRAY:
+            page_g.kernel_mask = encode_kernel_mask::DELTA_LENGTH_BA;
+            break;
+        }
+      } else if (is_use_delta) {
// TODO(ets): at some point make a more intelligent decision on this. DELTA_LENGTH_BA
// should always be preferred over PLAIN, but DELTA_BINARY is a different matter.
// If the delta encoding size is going to be close to 32 bits anyway, then plain
1 change: 1 addition & 0 deletions cpp/src/io/parquet/parquet_common.hpp
@@ -92,6 +92,7 @@ enum class Encoding : uint8_t {
RLE_DICTIONARY = 8,
BYTE_STREAM_SPLIT = 9,
NUM_ENCODINGS = 10,
UNDEFINED = 255,
};

9 changes: 5 additions & 4 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -460,10 +460,11 @@ struct parquet_column_device_view : stats_column_desc {
size_type const* level_offsets; //!< Offset array for per-row pre-calculated rep/def level values
uint8_t const* rep_values; //!< Pre-calculated repetition level values
uint8_t const* def_values; //!< Pre-calculated definition level values
-  uint8_t const* nullability;  //!< Array of nullability of each nesting level. e.g. nullable[0] is
-                               //!< nullability of parent_column. May be different from
-                               //!< col.nullable() in case of chunked writing.
-  bool output_as_byte_array;   //!< Indicates this list column is being written as a byte array
+  uint8_t const* nullability;   //!< Array of nullability of each nesting level. e.g. nullable[0] is
+                                //!< nullability of parent_column. May be different from
+                                //!< col.nullable() in case of chunked writing.
+  bool output_as_byte_array;    //!< Indicates this list column is being written as a byte array
+  Encoding requested_encoding;  //!< User specified encoding for this column.
};

struct EncColumnChunk;
70 changes: 63 additions & 7 deletions cpp/src/io/parquet/writer_impl.cu
@@ -259,6 +259,24 @@ bool is_col_fixed_width(column_view const& column)
return is_fixed_width(column.type());
}

// Convert encoding passed in by user to the correct enum value. Returns `std::nullopt`
// if passed an unsupported (or mistyped) encoding.
std::optional<Encoding> string_to_encoding(std::string const& encoding)
{
if (encoding == parquet_encoding::PLAIN) {
return Encoding::PLAIN;
} else if (encoding == parquet_encoding::DICTIONARY) {
return Encoding::RLE_DICTIONARY;
} else if (encoding == parquet_encoding::DELTA_BINARY_PACKED) {
return Encoding::DELTA_BINARY_PACKED;
} else if (encoding == parquet_encoding::DELTA_LENGTH_BYTE_ARRAY) {
return Encoding::DELTA_LENGTH_BYTE_ARRAY;
} else if (encoding == parquet_encoding::DELTA_BYTE_ARRAY) {
return Encoding::DELTA_BYTE_ARRAY;
}
return std::nullopt;
}

/**
* @brief Extends SchemaElement to add members required in constructing parquet_column_view
*
@@ -268,11 +286,13 @@ bool is_col_fixed_width(column_view const& column)
* 2. stats_dtype: datatype for statistics calculation required for the data stream of a leaf node.
* 3. ts_scale: scale to multiply or divide timestamp by in order to convert timestamp to parquet
* supported types
* 4. requested_encoding: A user provided encoding to use for the column.
*/
struct schema_tree_node : public SchemaElement {
cudf::detail::LinkedColPtr leaf_column;
statistics_dtype stats_dtype;
int32_t ts_scale;
std::optional<Encoding> requested_encoding;

// TODO(fut): Think about making schema a class that holds a vector of schema_tree_nodes. The
// function construct_schema_tree could be its constructor. It can have method to get the per
@@ -589,7 +609,7 @@ std::vector<schema_tree_node> construct_schema_tree(

std::function<void(cudf::detail::LinkedColPtr const&, column_in_metadata&, size_t)> add_schema =
[&](cudf::detail::LinkedColPtr const& col, column_in_metadata& col_meta, size_t parent_idx) {
-      bool col_nullable = is_col_nullable(col, col_meta, write_mode);
+      bool const col_nullable = is_col_nullable(col, col_meta, write_mode);

auto set_field_id = [&schema, parent_idx](schema_tree_node& s,
column_in_metadata const& col_meta) {
@@ -605,6 +625,37 @@
return child_col_type == type_id::UINT8;
};

// only call this after col_schema.type has been set
auto set_encoding = [&schema, parent_idx](schema_tree_node& s,
column_in_metadata const& col_meta) {
if (schema[parent_idx].name != "list" and col_meta.is_encoding_set()) {
auto enc = string_to_encoding(col_meta.get_encoding());

// do some validation on the requested encoding
// TODO(ets): should we print a warning or error out if the requested encoding is
// invalid? for now just silently fall back to the default encoder.
if (!enc.has_value() || !is_supported_encoding(enc.value())) { return; }

switch (enc.value()) {
case Encoding::DELTA_BINARY_PACKED:
if (s.type != Type::INT32 && s.type != Type::INT64) { return; }
break;

case Encoding::DELTA_LENGTH_BYTE_ARRAY:
if (s.type != Type::BYTE_ARRAY) { return; }
break;

// this is not caught by the check for supported encodings above
case Encoding::DELTA_BYTE_ARRAY: return;

default: break;
}

// requested encoding seems to be ok, set it
s.requested_encoding = enc;
}
};

// There is a special case for a list<int8> column with one byte column child. This column can
// have a special flag that indicates we write this out as binary instead of a list. This is a
// more efficient storage mechanism for a single-depth list of bytes, but is a departure from
@@ -627,6 +678,7 @@
col_schema.parent_idx = parent_idx;
col_schema.leaf_column = col;
set_field_id(col_schema, col_meta);
set_encoding(col_schema, col_meta);
col_schema.output_as_byte_array = col_meta.is_enabled_output_as_binary();
schema.push_back(col_schema);
} else if (col->type().id() == type_id::STRUCT) {
@@ -762,6 +814,7 @@ std::vector<schema_tree_node> construct_schema_tree(
col_schema.parent_idx = parent_idx;
col_schema.leaf_column = col;
set_field_id(col_schema, col_meta);
set_encoding(col_schema, col_meta);
schema.push_back(col_schema);
}
};
@@ -948,9 +1001,10 @@ parquet_column_device_view parquet_column_view::get_device_view(rmm::cuda_stream

desc.level_bits = CompactProtocolReader::NumRequiredBits(max_rep_level()) << 4 |
CompactProtocolReader::NumRequiredBits(max_def_level());
-  desc.nullability = _d_nullability.data();
-  desc.max_def_level = _max_def_level;
-  desc.max_rep_level = _max_rep_level;
+  desc.nullability        = _d_nullability.data();
+  desc.max_def_level      = _max_def_level;
+  desc.max_rep_level      = _max_rep_level;
+  desc.requested_encoding = schema_node.requested_encoding.value_or(Encoding::UNDEFINED);
return desc;
}

@@ -1170,9 +1224,11 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
std::vector<rmm::device_uvector<slot_type>> hash_maps_storage;
hash_maps_storage.reserve(h_chunks.size());
for (auto& chunk : h_chunks) {
-    if (col_desc[chunk.col_desc_id].physical_type == Type::BOOLEAN ||
-        (col_desc[chunk.col_desc_id].output_as_byte_array &&
-         col_desc[chunk.col_desc_id].physical_type == Type::BYTE_ARRAY)) {
+    auto const& chunk_col_desc = col_desc[chunk.col_desc_id];
+    if (chunk_col_desc.physical_type == Type::BOOLEAN ||
+        (chunk_col_desc.output_as_byte_array && chunk_col_desc.physical_type == Type::BYTE_ARRAY) ||
+        (chunk_col_desc.requested_encoding != Encoding::UNDEFINED &&
+         chunk_col_desc.requested_encoding != Encoding::RLE_DICTIONARY)) {
chunk.use_dictionary = false;
} else {
chunk.use_dictionary = true;
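The validation and fallback rules are spread across set_encoding in construct_schema_tree and the dictionary check above. A condensed standalone sketch (a hypothetical helper under assumed names, not cudf code):

// Standalone sketch of this diff's fallback rules -- not cudf code.
// A request is honored only when compatible with the column's physical type;
// otherwise the writer silently keeps its default behavior.
#include <optional>

enum class phys_type { INT32, INT64, BYTE_ARRAY, BOOLEAN, OTHER };
enum class enc { PLAIN, RLE_DICTIONARY, DELTA_BINARY_PACKED,
                 DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY };

std::optional<enc> honored_encoding(enc requested, phys_type t)
{
  switch (requested) {
    case enc::DELTA_BINARY_PACKED:  // integer columns only
      if (t != phys_type::INT32 && t != phys_type::INT64) { return std::nullopt; }
      break;
    case enc::DELTA_LENGTH_BYTE_ARRAY:  // BYTE_ARRAY columns only
      if (t != phys_type::BYTE_ARRAY) { return std::nullopt; }
      break;
    case enc::DELTA_BYTE_ARRAY:  // recognized but rejected in this revision
      return std::nullopt;
    case enc::RLE_DICTIONARY:  // booleans never take the dictionary path
      if (t == phys_type::BOOLEAN) { return std::nullopt; }
      break;
    default: break;
  }
  return requested;
}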
90 changes: 90 additions & 0 deletions cpp/tests/io/parquet_writer_test.cpp
@@ -1426,6 +1426,96 @@ TEST_F(ParquetWriterTest, RowGroupMetadata)
static_cast<int64_t>(num_rows * sizeof(column_type)));
}

TEST_F(ParquetWriterTest, UserRequestedEncodings)
{
constexpr int num_rows = 500;

auto const ones = thrust::make_constant_iterator(1);
auto const col =
cudf::test::fixed_width_column_wrapper<int32_t>{ones, ones + num_rows, no_nulls()};

auto const strings = thrust::make_constant_iterator("string");
auto const string_col =
cudf::test::strings_column_wrapper(strings, strings + num_rows, no_nulls());

auto const table = table_view(
{col, col, col, col, col, string_col, string_col, string_col, string_col, string_col});

cudf::io::table_input_metadata table_metadata(table);
table_metadata.column_metadata[0].set_name("int_plain");
table_metadata.column_metadata[0].set_encoding(cudf::io::parquet_encoding::PLAIN);
table_metadata.column_metadata[1].set_name("int_dict");
table_metadata.column_metadata[1].set_encoding(cudf::io::parquet_encoding::DICTIONARY);
table_metadata.column_metadata[2].set_name("int_delta_binary_packed");
table_metadata.column_metadata[2].set_encoding(cudf::io::parquet_encoding::DELTA_BINARY_PACKED);
table_metadata.column_metadata[3].set_name("int_delta_length_byte_array");
table_metadata.column_metadata[3].set_encoding(
cudf::io::parquet_encoding::DELTA_LENGTH_BYTE_ARRAY);
table_metadata.column_metadata[4].set_name("int_bogus");
table_metadata.column_metadata[4].set_encoding("no such encoding");
table_metadata.column_metadata[5].set_name("string_plain");
table_metadata.column_metadata[5].set_encoding(cudf::io::parquet_encoding::PLAIN);
table_metadata.column_metadata[6].set_name("string_dict");
table_metadata.column_metadata[6].set_encoding(cudf::io::parquet_encoding::DICTIONARY);
table_metadata.column_metadata[7].set_name("string_delta_length_byte_array");
table_metadata.column_metadata[7].set_encoding(
cudf::io::parquet_encoding::DELTA_LENGTH_BYTE_ARRAY);
table_metadata.column_metadata[8].set_name("string_delta_binary_packed");
table_metadata.column_metadata[8].set_encoding(cudf::io::parquet_encoding::DELTA_BINARY_PACKED);
table_metadata.column_metadata[9].set_name("string_bogus");
table_metadata.column_metadata[9].set_encoding("no such encoding");

for (auto& col_meta : table_metadata.column_metadata) {
col_meta.set_nullability(false);
}

auto const filepath = temp_env->get_temp_filepath("UserRequestedEncodings.parquet");
cudf::io::parquet_writer_options opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, table)
.metadata(table_metadata)
.stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN)
.compression(cudf::io::compression_type::ZSTD);
cudf::io::write_parquet(opts);

// check page headers to make sure each column is encoded with the appropriate encoder
auto const source = cudf::io::datasource::create(filepath);
cudf::io::parquet::detail::FileMetaData fmd;
read_footer(source, &fmd);

// no nulls and no repetition, so the only encoding used should be for the data.
// since we're writing v1, both dict and data pages should use PLAIN_DICTIONARY.
// requested plain
EXPECT_EQ(fmd.row_groups[0].columns[0].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::PLAIN);
// requested dictionary
EXPECT_EQ(fmd.row_groups[0].columns[1].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::PLAIN_DICTIONARY);
// requested delta_binary_packed
EXPECT_EQ(fmd.row_groups[0].columns[2].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::DELTA_BINARY_PACKED);
// requested delta_length_byte_array, but should fall back to dictionary
EXPECT_EQ(fmd.row_groups[0].columns[3].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::PLAIN_DICTIONARY);
// requested nonsense, but should fall back to dictionary
EXPECT_EQ(fmd.row_groups[0].columns[4].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::PLAIN_DICTIONARY);
// requested plain
EXPECT_EQ(fmd.row_groups[0].columns[5].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::PLAIN);
// requested dictionary
EXPECT_EQ(fmd.row_groups[0].columns[6].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::PLAIN_DICTIONARY);
// requested delta_length_byte_array
EXPECT_EQ(fmd.row_groups[0].columns[7].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::DELTA_LENGTH_BYTE_ARRAY);
// requested delta_binary_packed, but should fall back to dictionary
EXPECT_EQ(fmd.row_groups[0].columns[8].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::PLAIN_DICTIONARY);
// requested nonsense, but should fall back to dictionary
EXPECT_EQ(fmd.row_groups[0].columns[9].meta_data.encodings[0],
cudf::io::parquet::detail::Encoding::PLAIN_DICTIONARY);
}

/////////////////////////////////////////////////////////////
// custom mem mapped data sink that supports device writes
template <bool supports_device_writes>