Skip to content

Commit

Permalink
Optionally write version 2 page headers in Parquet writer (#13751)
Browse files Browse the repository at this point in the history
Part of #13501. This adds the ability to write V2 page headers to the Parquet writer.

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Nghia Truong (https://github.com/ttnghia)

URL: #13751
  • Loading branch information
etseidl authored Aug 15, 2023
1 parent 1d58d5f commit da2560f
Show file tree
Hide file tree
Showing 11 changed files with 388 additions and 61 deletions.
48 changes: 48 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,8 @@ class parquet_writer_options {
std::optional<size_type> _max_page_fragment_size;
// Optional compression statistics
std::shared_ptr<writer_compression_statistics> _compression_stats;
// write V2 page headers?
bool _v2_page_headers = false;

/**
* @brief Constructor from sink and table.
Expand Down Expand Up @@ -712,6 +714,13 @@ class parquet_writer_options {
return _compression_stats;
}

/**
* @brief Returns `true` if V2 page headers should be written.
*
* @return `true` if V2 page headers should be written.
*/
[[nodiscard]] auto is_enabled_write_v2_headers() const { return _v2_page_headers; }

/**
* @brief Sets partitions.
*
Expand Down Expand Up @@ -829,6 +838,13 @@ class parquet_writer_options {
{
_compression_stats = std::move(comp_stats);
}

/**
* @brief Sets preference for V2 page headers. Write V2 page headers if set to `true`.
*
* @param val Boolean value to enable/disable writing of V2 page headers.
*/
void enable_write_v2_headers(bool val) { _v2_page_headers = val; }
};

/**
Expand Down Expand Up @@ -1060,6 +1076,14 @@ class parquet_writer_options_builder {
return *this;
}

/**
* @brief Set to true if V2 page headers are to be written.
*
* @param enabled Boolean value to enable/disable writing of V2 page headers.
* @return this for chaining
*/
parquet_writer_options_builder& write_v2_headers(bool enabled);

/**
* @brief move parquet_writer_options member once it's built.
*/
Expand Down Expand Up @@ -1141,6 +1165,8 @@ class chunked_parquet_writer_options {
std::optional<size_type> _max_page_fragment_size;
// Optional compression statistics
std::shared_ptr<writer_compression_statistics> _compression_stats;
// write V2 page headers?
bool _v2_page_headers = false;

/**
* @brief Constructor from sink.
Expand Down Expand Up @@ -1281,6 +1307,13 @@ class chunked_parquet_writer_options {
return _compression_stats;
}

/**
* @brief Returns `true` if V2 page headers should be written.
*
* @return `true` if V2 page headers should be written.
*/
[[nodiscard]] auto is_enabled_write_v2_headers() const { return _v2_page_headers; }

/**
* @brief Sets metadata.
*
Expand Down Expand Up @@ -1384,6 +1417,13 @@ class chunked_parquet_writer_options {
_compression_stats = std::move(comp_stats);
}

/**
* @brief Sets preference for V2 page headers. Write V2 page headers if set to `true`.
*
* @param val Boolean value to enable/disable writing of V2 page headers.
*/
void enable_write_v2_headers(bool val) { _v2_page_headers = val; }

/**
* @brief creates builder to build chunked_parquet_writer_options.
*
Expand Down Expand Up @@ -1475,6 +1515,14 @@ class chunked_parquet_writer_options_builder {
return *this;
}

/**
* @brief Set to true if V2 page headers are to be written.
*
* @param enabled Boolean value to enable/disable writing of V2 page headers.
* @return this for chaining
*/
chunked_parquet_writer_options_builder& write_v2_headers(bool enabled);

/**
* @brief Sets the maximum row group size, in bytes.
*
Expand Down
21 changes: 21 additions & 0 deletions cpp/include/cudf_test/base_fixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,27 @@ class BaseFixture : public ::testing::Test {
rmm::mr::device_memory_resource* mr() { return _mr; }
};

/**
* @brief Base test fixture that takes a parameter.
*
* Example:
* ```
* class MyIntTestFixture : public cudf::test::BaseFixtureWithParam<int> {};
* ```
*/
template <typename T>
class BaseFixtureWithParam : public ::testing::TestWithParam<T> {
rmm::mr::device_memory_resource* _mr{rmm::mr::get_current_device_resource()};

public:
/**
* @brief Returns pointer to `device_memory_resource` that should be used for
* all tests inheriting from this fixture
* @return pointer to memory resource
*/
rmm::mr::device_memory_resource* mr() const { return _mr; }
};

template <typename T, typename Enable = void>
struct uniform_distribution_impl {};
template <typename T>
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,12 @@ parquet_writer_options_builder& parquet_writer_options_builder::max_page_fragmen
return *this;
}

parquet_writer_options_builder& parquet_writer_options_builder::write_v2_headers(bool enabled)
{
options.enable_write_v2_headers(enabled);
return *this;
}

void chunked_parquet_writer_options::set_key_value_metadata(
std::vector<std::map<std::string, std::string>> metadata)
{
Expand Down Expand Up @@ -831,6 +837,13 @@ chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::
return *this;
}

chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::write_v2_headers(
bool enabled)
{
options.enable_write_v2_headers(enabled);
return *this;
}

chunked_parquet_writer_options_builder&
chunked_parquet_writer_options_builder::max_page_fragment_size(size_type val)
{
Expand Down
15 changes: 14 additions & 1 deletion cpp/src/io/parquet/compact_protocol_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ bool CompactProtocolReader::read(PageHeader* p)
ParquetFieldInt32(2, p->uncompressed_page_size),
ParquetFieldInt32(3, p->compressed_page_size),
ParquetFieldStruct(5, p->data_page_header),
ParquetFieldStruct(7, p->dictionary_page_header));
ParquetFieldStruct(7, p->dictionary_page_header),
ParquetFieldStruct(8, p->data_page_header_v2));
return function_builder(this, op);
}

Expand All @@ -275,6 +276,18 @@ bool CompactProtocolReader::read(DictionaryPageHeader* d)
return function_builder(this, op);
}

bool CompactProtocolReader::read(DataPageHeaderV2* d)
{
auto op = std::make_tuple(ParquetFieldInt32(1, d->num_values),
ParquetFieldInt32(2, d->num_nulls),
ParquetFieldInt32(3, d->num_rows),
ParquetFieldEnum<Encoding>(4, d->encoding),
ParquetFieldInt32(5, d->definition_levels_byte_length),
ParquetFieldInt32(6, d->repetition_levels_byte_length),
ParquetFieldBool(7, d->is_compressed));
return function_builder(this, op);
}

bool CompactProtocolReader::read(KeyValue* k)
{
auto op = std::make_tuple(ParquetFieldString(1, k->key), ParquetFieldString(2, k->value));
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/compact_protocol_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class CompactProtocolReader {
bool read(PageHeader* p);
bool read(DataPageHeader* d);
bool read(DictionaryPageHeader* d);
bool read(DataPageHeaderV2* d);
bool read(KeyValue* k);
bool read(PageLocation* p);
bool read(OffsetIndex* o);
Expand Down
Loading

0 comments on commit da2560f

Please sign in to comment.