Skip to content

Commit

Permalink
Add Parquet encoding statistics to column chunk metadata (#15452)
Browse files Browse the repository at this point in the history
Closes #15313

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Nghia Truong (https://github.com/ttnghia)

Approvers:
  - Nghia Truong (https://github.com/ttnghia)
  - Muhammad Haseeb (https://github.com/mhaseeb123)

URL: #15452
  • Loading branch information
etseidl authored Apr 26, 2024
1 parent 79cd473 commit d91a4ad
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 12 deletions.
13 changes: 13 additions & 0 deletions cpp/src/io/parquet/compact_protocol_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "compact_protocol_reader.hpp"

#include "parquet.hpp"
#include "parquet_common.hpp"

#include <cudf/utilities/error.hpp>

Expand Down Expand Up @@ -652,6 +653,9 @@ void CompactProtocolReader::read(ColumnChunkMetaData* c)
{
using optional_size_statistics =
parquet_field_optional<SizeStatistics, parquet_field_struct<SizeStatistics>>;
using optional_list_enc_stats =
parquet_field_optional<std::vector<PageEncodingStats>,
parquet_field_struct_list<PageEncodingStats>>;
auto op = std::make_tuple(parquet_field_enum<Type>(1, c->type),
parquet_field_enum_list(2, c->encodings),
parquet_field_string_list(3, c->path_in_schema),
Expand All @@ -663,6 +667,7 @@ void CompactProtocolReader::read(ColumnChunkMetaData* c)
parquet_field_int64(10, c->index_page_offset),
parquet_field_int64(11, c->dictionary_page_offset),
parquet_field_struct(12, c->statistics),
optional_list_enc_stats(13, c->encoding_stats),
optional_size_statistics(16, c->size_statistics));
function_builder(this, op);
}
Expand Down Expand Up @@ -774,6 +779,14 @@ void CompactProtocolReader::read(ColumnOrder* c)
function_builder(this, op);
}

void CompactProtocolReader::read(PageEncodingStats* s)
{
auto op = std::make_tuple(parquet_field_enum<PageType>(1, s->page_type),
parquet_field_enum<Encoding>(2, s->encoding),
parquet_field_int32(3, s->count));
function_builder(this, op);
}

void CompactProtocolReader::read(SortingColumn* s)
{
auto op = std::make_tuple(parquet_field_int32(1, s->column_idx),
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/compact_protocol_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class CompactProtocolReader {
void read(ColumnIndex* c);
void read(Statistics* s);
void read(ColumnOrder* c);
void read(PageEncodingStats* s);
void read(SortingColumn* s);

public:
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/io/parquet/compact_protocol_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ size_t CompactProtocolWriter::write(ColumnChunkMetaData const& s)
if (s.index_page_offset != 0) { c.field_int(10, s.index_page_offset); }
if (s.dictionary_page_offset != 0) { c.field_int(11, s.dictionary_page_offset); }
c.field_struct(12, s.statistics);
if (s.encoding_stats.has_value()) { c.field_struct_list(13, s.encoding_stats.value()); }
if (s.size_statistics.has_value()) { c.field_struct(16, s.size_statistics.value()); }
return c.value();
}
Expand Down Expand Up @@ -248,6 +249,15 @@ size_t CompactProtocolWriter::write(ColumnOrder const& co)
return c.value();
}

size_t CompactProtocolWriter::write(PageEncodingStats const& enc)
{
CompactProtocolFieldWriter c(*this);
c.field_int(1, static_cast<int32_t>(enc.page_type));
c.field_int(2, static_cast<int32_t>(enc.encoding));
c.field_int(3, enc.count);
return c.value();
}

size_t CompactProtocolWriter::write(SortingColumn const& sc)
{
CompactProtocolFieldWriter c(*this);
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/compact_protocol_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class CompactProtocolWriter {
size_t write(OffsetIndex const&);
size_t write(SizeStatistics const&);
size_t write(ColumnOrder const&);
size_t write(PageEncodingStats const&);
size_t write(SortingColumn const&);

protected:
Expand Down
46 changes: 35 additions & 11 deletions cpp/src/io/parquet/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,15 @@ struct ColumnIndex {
thrust::optional<std::vector<int64_t>> definition_level_histogram;
};

/**
* @brief Thrift-derived struct describing page encoding statistics
*/
struct PageEncodingStats {
PageType page_type; // The page type (data/dic/...)
Encoding encoding; // Encoding of the page
int32_t count; // Number of pages of this type with this encoding
};

/**
* @brief Thrift-derived struct describing column sort order
*/
Expand All @@ -335,21 +344,36 @@ struct SortingColumn {
* @brief Thrift-derived struct describing a column chunk
*/
struct ColumnChunkMetaData {
// Type of this column
Type type = BOOLEAN;
// Set of all encodings used for this column. The purpose is to validate
// whether we can decode those pages.
std::vector<Encoding> encodings;
// Path in schema
std::vector<std::string> path_in_schema;
Compression codec = UNCOMPRESSED;
// Compression codec
Compression codec = UNCOMPRESSED;
// Number of values in this column
int64_t num_values = 0;
int64_t total_uncompressed_size =
0; // total byte size of all uncompressed pages in this column chunk (including the headers)
int64_t total_compressed_size =
0; // total byte size of all compressed pages in this column chunk (including the headers)
int64_t data_page_offset = 0; // Byte offset from beginning of file to first data page
int64_t index_page_offset = 0; // Byte offset from beginning of file to root index page
int64_t dictionary_page_offset =
0; // Byte offset from the beginning of file to first (only) dictionary page
Statistics statistics; // Encoded chunk-level statistics
thrust::optional<SizeStatistics> size_statistics; // Size statistics for the chunk
// Total byte size of all uncompressed pages in this column chunk (including the headers)
int64_t total_uncompressed_size = 0;
// Total byte size of all compressed pages in this column chunk (including the headers)
int64_t total_compressed_size = 0;
// Byte offset from beginning of file to first data page
int64_t data_page_offset = 0;
// Byte offset from beginning of file to root index page
int64_t index_page_offset = 0;
// Byte offset from the beginning of file to first (only) dictionary page
int64_t dictionary_page_offset = 0;
// Optional statistics for this column chunk
Statistics statistics;
// Set of all encodings used for pages in this column chunk. This information can be used to
// determine if all data pages are dictionary encoded for example.
thrust::optional<std::vector<PageEncodingStats>> encoding_stats;
// Optional statistics to help estimate total memory when converted to in-memory representations.
// The histograms contained in these statistics can also be useful in some cases for more
// fine-grained nullability/list length filter pushdown.
thrust::optional<SizeStatistics> size_statistics;
};

/**
Expand Down
50 changes: 50 additions & 0 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include "compact_protocol_reader.hpp"
#include "compact_protocol_writer.hpp"
#include "io/comp/nvcomp_adapter.hpp"
#include "io/parquet/parquet.hpp"
#include "io/parquet/parquet_gpu.hpp"
#include "io/statistics/column_statistics.cuh"
#include "io/utilities/column_utils.cuh"
#include "io/utilities/config_utils.hpp"
Expand Down Expand Up @@ -214,6 +216,53 @@ void update_chunk_encodings(std::vector<Encoding>& encodings, uint32_t enc_mask)
}
}

/**
* @brief Update the encoding_stats field in the column chunk metadata.
*
* @param chunk_meta The `ColumnChunkMetaData` struct for the column chunk
* @param ck The column chunk to summarize stats for
* @param is_v2 True if V2 page headers are used
*/
void update_chunk_encoding_stats(ColumnChunkMetaData& chunk_meta,
EncColumnChunk const& ck,
bool is_v2)
{
// don't set encoding stats if there are no pages
if (ck.num_pages == 0) { return; }

// NOTE: since cudf doesn't use mixed encodings for a chunk, we really only need to account
// for the dictionary page (if there is one), and the encoding used for the data pages. We can
// examine the chunk's encodings field to figure out the encodings without having to examine
// the page data.
auto const num_data_pages = static_cast<int32_t>(ck.num_data_pages());
auto const data_page_type = is_v2 ? PageType::DATA_PAGE_V2 : PageType::DATA_PAGE;

std::vector<PageEncodingStats> result;
if (ck.use_dictionary) {
// For dictionary encoding, if V1 then both data and dictionary use PLAIN_DICTIONARY. For V2
// the dictionary uses PLAIN and the data RLE_DICTIONARY.
auto const dict_enc = is_v2 ? Encoding::PLAIN : Encoding::PLAIN_DICTIONARY;
auto const data_enc = is_v2 ? Encoding::RLE_DICTIONARY : Encoding::PLAIN_DICTIONARY;
result.push_back({PageType::DICTIONARY_PAGE, dict_enc, 1});
if (num_data_pages > 0) { result.push_back({data_page_type, data_enc, num_data_pages}); }
} else {
// No dictionary page, the pages are encoded with something other than RLE (unless it's a
// boolean column).
for (auto const enc : chunk_meta.encodings) {
if (enc != Encoding::RLE) {
result.push_back({data_page_type, enc, num_data_pages});
break;
}
}
// if result is empty and we're using V2 headers, then assume the data is RLE as well
if (result.empty() and is_v2 and (ck.encodings & encoding_to_mask(Encoding::RLE)) != 0) {
result.push_back({data_page_type, Encoding::RLE, num_data_pages});
}
}

if (not result.empty()) { chunk_meta.encoding_stats = std::move(result); }
}

/**
* @brief Compute size (in bytes) of the data stored in the given column.
*
Expand Down Expand Up @@ -2144,6 +2193,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
max_write_size = std::max(max_write_size, ck.compressed_size);

update_chunk_encodings(column_chunk_meta.encodings, ck.encodings);
update_chunk_encoding_stats(column_chunk_meta, ck, write_v2_headers);

if (ck.ck_stat_size != 0) {
std::vector<uint8_t> const stats_blob = cudf::detail::make_std_vector_sync(
Expand Down
13 changes: 12 additions & 1 deletion cpp/tests/io/parquet_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1674,7 +1674,18 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings)
// no nulls and no repetition, so the only encoding used should be for the data.
// since we're writing v1, both dict and data pages should use PLAIN_DICTIONARY.
auto const expect_enc = [&fmd](int idx, cudf::io::parquet::detail::Encoding enc) {
EXPECT_EQ(fmd.row_groups[0].columns[idx].meta_data.encodings[0], enc);
auto const& col_meta = fmd.row_groups[0].columns[idx].meta_data;
EXPECT_EQ(col_meta.encodings[0], enc);

// also check encoding stats are written properly
ASSERT_TRUE(col_meta.encoding_stats.has_value());
auto const& enc_stats = col_meta.encoding_stats.value();
for (auto const& ec : enc_stats) {
if (ec.page_type == cudf::io::parquet::detail::PageType::DATA_PAGE) {
EXPECT_EQ(ec.encoding, enc);
EXPECT_EQ(ec.count, 1);
}
}
};

// requested plain
Expand Down

0 comments on commit d91a4ad

Please sign in to comment.