Add Parquet encoding statistics to column chunk metadata #15452

Merged: 20 commits, Apr 26, 2024
15 changes: 15 additions & 0 deletions cpp/src/io/parquet/compact_protocol_reader.cpp
@@ -16,6 +16,9 @@

#include "compact_protocol_reader.hpp"

#include "parquet.hpp"
Contributor: Please remove the empty line above 😄

Contributor Author: sorry, but clang-format insists on that line being there 😅

Contributor: It's weird. Definitely clang-format bug 😓

Contributor Author: Yeah. Maybe a special group for header with the same root as the compilation unit 🤷

Contributor: @harrism Would you mind looking into this please?
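A quick illustration of the behavior being discussed (a sketch assuming clang-format's standard main-header rule with regrouped include blocks; this is not cudf's actual `.clang-format`): the header whose name matches the compilation unit is assigned its own implicit top-priority group, and groups are separated by blank lines, so the empty line keeps coming back.

```cpp
// How clang-format lays out the includes of compact_protocol_reader.cpp under
// IncludeBlocks: Regroup -- the main header forms its own group, hence the
// blank line that cannot be removed.
#include "compact_protocol_reader.hpp"  // main header: implicit top group

#include "parquet.hpp"                  // remaining quoted project headers
#include "parquet_common.hpp"
```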

#include "parquet_common.hpp"

#include <cudf/utilities/error.hpp>

#include <algorithm>
@@ -640,6 +643,9 @@ void CompactProtocolReader::read(ColumnChunkMetaData* c)
{
using optional_size_statistics =
parquet_field_optional<SizeStatistics, parquet_field_struct<SizeStatistics>>;
using optional_list_enc_stats =
parquet_field_optional<std::vector<PageEncodingStats>,
parquet_field_struct_list<PageEncodingStats>>;
auto op = std::make_tuple(parquet_field_enum<Type>(1, c->type),
parquet_field_enum_list(2, c->encodings),
parquet_field_string_list(3, c->path_in_schema),
@@ -651,6 +657,7 @@
parquet_field_int64(10, c->index_page_offset),
parquet_field_int64(11, c->dictionary_page_offset),
parquet_field_struct(12, c->statistics),
optional_list_enc_stats(13, c->encoding_stats),
optional_size_statistics(16, c->size_statistics));
function_builder(this, op);
}
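The `optional_list_enc_stats` alias above composes two field helpers so that the `thrust::optional` target is engaged only when field 13 is actually present in the file. A self-contained sketch of that wrapper pattern (illustrative only: the functor protocol, constructor shape, and use of `std::optional` here are assumptions, not cudf's actual templates):

```cpp
#include <optional>
#include <utility>

// Sketch: adapt an underlying field-reader functor so the destination optional
// is set only on a successful parse. Absent optional fields are handled for
// free, because the reader never invokes the functor for fields it never sees.
// Assumed convention: operator() returns true on error.
template <typename T, typename FieldReader>
struct optional_field {
  int field_id;
  std::optional<T>& target;

  template <typename Reader>
  bool operator()(Reader& reader, int field_type)
  {
    T value{};
    FieldReader inner{field_id, value};
    if (inner(reader, field_type)) { return true; }  // propagate parse error
    target = std::move(value);                       // commit only on success
    return false;
  }
};
```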
@@ -762,6 +769,14 @@ void CompactProtocolReader::read(ColumnOrder* c)
function_builder(this, op);
}

void CompactProtocolReader::read(PageEncodingStats* s)
{
auto op = std::make_tuple(parquet_field_enum<PageType>(1, s->page_type),
parquet_field_enum<Encoding>(2, s->encoding),
parquet_field_int32(3, s->count));
function_builder(this, op);
}

/**
* @brief Constructs the schema from the file-level metadata
*
3 changes: 2 additions & 1 deletion cpp/src/io/parquet/compact_protocol_reader.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION.
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -120,6 +120,7 @@ class CompactProtocolReader {
void read(ColumnIndex* c);
void read(Statistics* s);
void read(ColumnOrder* c);
void read(PageEncodingStats* s);

public:
static int NumRequiredBits(uint32_t max_level) noexcept
12 changes: 12 additions & 0 deletions cpp/src/io/parquet/compact_protocol_writer.cpp
@@ -16,6 +16,8 @@

#include "compact_protocol_writer.hpp"

#include "parquet.hpp"
ttnghia marked this conversation as resolved.

#include <cudf/utilities/error.hpp>

namespace cudf::io::parquet::detail {
@@ -182,6 +184,7 @@ size_t CompactProtocolWriter::write(ColumnChunkMetaData const& s)
if (s.index_page_offset != 0) { c.field_int(10, s.index_page_offset); }
if (s.dictionary_page_offset != 0) { c.field_int(11, s.dictionary_page_offset); }
c.field_struct(12, s.statistics);
if (s.encoding_stats.has_value()) { c.field_struct_list(13, s.encoding_stats.value()); }
if (s.size_statistics.has_value()) { c.field_struct(16, s.size_statistics.value()); }
return c.value();
}
@@ -242,6 +245,15 @@ size_t CompactProtocolWriter::write(ColumnOrder const& co)
return c.value();
}

size_t CompactProtocolWriter::write(PageEncodingStats const& enc)
{
CompactProtocolFieldWriter c(*this);
c.field_int(1, static_cast<int32_t>(enc.page_type));
c.field_int(2, static_cast<int32_t>(enc.encoding));
c.field_int(3, enc.count);
return c.value();
}
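The three `field_int` calls above correspond to the required fields (ids 1–3) of `PageEncodingStats`; Thrift transmits enums as I32. For intuition, a standalone sketch of what the compact protocol emits for consecutive small fields — a short-form header byte carrying the field-id delta (high nibble) and type code (low nibble, 5 = I32), followed by a zigzag varint — assuming standard compact-protocol rules, not the internals of `CompactProtocolFieldWriter`:

```cpp
#include <cstdint>
#include <vector>

// Zigzag-encode a signed 32-bit value, then emit it as a ULEB128 varint.
void put_zigzag_varint(std::vector<uint8_t>& buf, int32_t v)
{
  uint32_t u = (static_cast<uint32_t>(v) << 1) ^ static_cast<uint32_t>(v >> 31);
  while (u > 0x7f) {
    buf.push_back(0x80 | (u & 0x7f));
    u >>= 7;
  }
  buf.push_back(static_cast<uint8_t>(u));
}

// Emit a compact-protocol I32 field with a short-form header byte:
// high nibble = delta from the previous field id, low nibble = 0x5 (I32).
void put_i32_field(std::vector<uint8_t>& buf, int id_delta, int32_t value)
{
  buf.push_back(static_cast<uint8_t>((id_delta << 4) | 0x5));
  put_zigzag_varint(buf, value);
}

// Body of {page_type = DATA_PAGE (0), encoding = PLAIN (0), count = 4}:
//   put_i32_field(buf, 1, 0);  // field 1, delta 1
//   put_i32_field(buf, 1, 0);  // field 2, delta 1
//   put_i32_field(buf, 1, 4);  // field 3, delta 1
//   buf.push_back(0x00);       // struct stop byte
```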

void CompactProtocolFieldWriter::put_byte(uint8_t v) { writer.m_buf.push_back(v); }

void CompactProtocolFieldWriter::put_byte(uint8_t const* raw, uint32_t len)
1 change: 1 addition & 0 deletions cpp/src/io/parquet/compact_protocol_writer.hpp
@@ -53,6 +53,7 @@ class CompactProtocolWriter {
size_t write(OffsetIndex const&);
size_t write(SizeStatistics const&);
size_t write(ColumnOrder const&);
size_t write(PageEncodingStats const&);

protected:
std::vector<uint8_t>& m_buf;
48 changes: 36 additions & 12 deletions cpp/src/io/parquet/parquet.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION.
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -322,25 +322,49 @@ struct ColumnIndex {
thrust::optional<std::vector<int64_t>> definition_level_histogram;
};

/**
* @brief Thrift-derived struct describing page encoding statistics
*/
struct PageEncodingStats {
PageType page_type; // The page type (data/dict/...)
Encoding encoding; // Encoding of the page
int32_t count; // Number of pages of this type with this encoding
};
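For cross-reference, this transcribes `PageEncodingStats` from parquet-format's `parquet.thrift`; the field ids drive the compact-protocol reader and writer changes elsewhere in this PR (rendered here as a comment for reference):

```cpp
// struct PageEncodingStats {         // parquet.thrift (parquet-format)
//   1: required PageType page_type;  // the page type (data/dict/...)
//   2: required Encoding encoding;   // encoding of the page
//   3: required i32 count;           // number of pages of this type with this encoding
// }
```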

/**
* @brief Thrift-derived struct describing a column chunk
*/
struct ColumnChunkMetaData {
// Type of this column
Type type = BOOLEAN;
// Set of all encodings used for this column. The purpose is to validate
// whether we can decode those pages.
std::vector<Encoding> encodings;
// Path in schema
std::vector<std::string> path_in_schema;
Compression codec = UNCOMPRESSED;
// Compression codec
Compression codec = UNCOMPRESSED;
// Number of values in this column
int64_t num_values = 0;
int64_t total_uncompressed_size =
0; // total byte size of all uncompressed pages in this column chunk (including the headers)
int64_t total_compressed_size =
0; // total byte size of all compressed pages in this column chunk (including the headers)
int64_t data_page_offset = 0; // Byte offset from beginning of file to first data page
int64_t index_page_offset = 0; // Byte offset from beginning of file to root index page
int64_t dictionary_page_offset =
0; // Byte offset from the beginning of file to first (only) dictionary page
Statistics statistics; // Encoded chunk-level statistics
thrust::optional<SizeStatistics> size_statistics; // Size statistics for the chunk
// Total byte size of all uncompressed pages in this column chunk (including the headers)
int64_t total_uncompressed_size = 0;
// Total byte size of all compressed pages in this column chunk (including the headers)
int64_t total_compressed_size = 0;
// Byte offset from beginning of file to first data page
int64_t data_page_offset = 0;
// Byte offset from beginning of file to root index page
int64_t index_page_offset = 0;
// Byte offset from the beginning of file to first (only) dictionary page
int64_t dictionary_page_offset = 0;
// Optional statistics for this column chunk
Statistics statistics;
// Set of all encodings used for pages in this column chunk. This information can be used to
// determine if all data pages are dictionary encoded for example.
thrust::optional<std::vector<PageEncodingStats>> encoding_stats;
// Optional statistics to help estimate total memory when converted to in-memory representations.
// The histograms contained in these statistics can also be useful in some cases for more
// fine-grained nullability/list length filter pushdown.
thrust::optional<SizeStatistics> size_statistics;
};
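As the `encoding_stats` comment suggests, the main consumer-side payoff is answering dictionary-encoding questions without touching page headers. A hedged sketch of that check (assumes only the definitions in this diff plus `<algorithm>`; not code from this PR):

```cpp
#include <algorithm>

// True when every data page in the chunk is dictionary-encoded, judged purely
// from the chunk-level encoding stats.
bool all_data_pages_dictionary_encoded(ColumnChunkMetaData const& meta)
{
  if (not meta.encoding_stats.has_value()) { return false; }
  auto const& stats = meta.encoding_stats.value();
  return std::all_of(stats.begin(), stats.end(), [](PageEncodingStats const& s) {
    // The dictionary page itself does not disqualify the chunk.
    if (s.page_type == PageType::DICTIONARY_PAGE) { return true; }
    return s.encoding == Encoding::PLAIN_DICTIONARY or s.encoding == Encoding::RLE_DICTIONARY;
  });
}
```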

/**
50 changes: 50 additions & 0 deletions cpp/src/io/parquet/writer_impl.cu
@@ -22,6 +22,8 @@
#include "compact_protocol_reader.hpp"
#include "compact_protocol_writer.hpp"
#include "io/comp/nvcomp_adapter.hpp"
#include "io/parquet/parquet.hpp"
#include "io/parquet/parquet_gpu.hpp"
#include "io/statistics/column_statistics.cuh"
#include "io/utilities/column_utils.cuh"
#include "io/utilities/config_utils.hpp"
@@ -213,6 +215,53 @@ void update_chunk_encodings(std::vector<Encoding>& encodings, uint32_t enc_mask)
}
}

/**
* @brief Update the encoding_stats field in the column chunk metadata.
*
* @param chunk_meta The `ColumnChunkMetaData` struct for the column chunk
* @param ck The column chunk to summarize stats for
* @param is_v2 True if V2 page headers are used
*/
void update_chunk_encoding_stats(ColumnChunkMetaData& chunk_meta,
EncColumnChunk const& ck,
bool is_v2)
{
// don't set encoding stats if there are no pages
if (ck.num_pages == 0) { return; }

// NOTE: since cudf doesn't use mixed encodings for a chunk, we really only need to account
// for the dictionary page (if there is one), and the encoding used for the data pages. We can
// examine the chunk's encodings field to figure out the encodings without having to examine
// the page data.
auto const num_data_pages = static_cast<int32_t>(ck.num_data_pages());
auto const data_page_type = is_v2 ? PageType::DATA_PAGE_V2 : PageType::DATA_PAGE;

std::vector<PageEncodingStats> result;
if (ck.use_dictionary) {
// For dictionary encoding, if V1 then both data and dictionary use PLAIN_DICTIONARY. For V2
// the dictionary uses PLAIN and the data RLE_DICTIONARY.
auto const dict_enc = is_v2 ? Encoding::PLAIN : Encoding::PLAIN_DICTIONARY;
auto const data_enc = is_v2 ? Encoding::RLE_DICTIONARY : Encoding::PLAIN_DICTIONARY;
result.push_back({PageType::DICTIONARY_PAGE, dict_enc, 1});
if (num_data_pages > 0) { result.push_back({data_page_type, data_enc, num_data_pages}); }
} else {
// No dictionary page, the pages are encoded with something other than RLE (unless it's a
// boolean column).
for (auto const enc : chunk_meta.encodings) {
if (enc != Encoding::RLE) {
result.push_back({data_page_type, enc, num_data_pages});
break;
}
}
// if result is empty and we're using V2 headers, then assume the data is RLE as well
if (result.empty() and is_v2 and (ck.encodings & encoding_to_mask(Encoding::RLE)) != 0) {
result.push_back({data_page_type, Encoding::RLE, num_data_pages});
}
}

if (not result.empty()) { chunk_meta.encoding_stats = std::move(result); }
}
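To make the branches above concrete, here are the stats a chunk would carry under each header version (illustrative values for a chunk with one dictionary page and three data pages; derived from the logic above, not from a real run):

```cpp
// V1 + dictionary: both page kinds report PLAIN_DICTIONARY
//   {DICTIONARY_PAGE, PLAIN_DICTIONARY, 1}, {DATA_PAGE, PLAIN_DICTIONARY, 3}
// V2 + dictionary: PLAIN for the dictionary page, RLE_DICTIONARY for data
//   {DICTIONARY_PAGE, PLAIN, 1}, {DATA_PAGE_V2, RLE_DICTIONARY, 3}
// No dictionary: one entry carrying the data pages' encoding, e.g.
//   {DATA_PAGE, PLAIN, 3}
```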

/**
* @brief Compute size (in bytes) of the data stored in the given column.
*
@@ -2129,6 +2178,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
max_write_size = std::max(max_write_size, ck.compressed_size);

update_chunk_encodings(column_chunk_meta.encodings, ck.encodings);
update_chunk_encoding_stats(column_chunk_meta, ck, write_v2_headers);

if (ck.ck_stat_size != 0) {
std::vector<uint8_t> const stats_blob = cudf::detail::make_std_vector_sync(
13 changes: 12 additions & 1 deletion cpp/tests/io/parquet_writer_test.cpp
@@ -1635,7 +1635,18 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings)
// no nulls and no repetition, so the only encoding used should be for the data.
// since we're writing v1, both dict and data pages should use PLAIN_DICTIONARY.
auto const expect_enc = [&fmd](int idx, cudf::io::parquet::detail::Encoding enc) {
EXPECT_EQ(fmd.row_groups[0].columns[idx].meta_data.encodings[0], enc);
auto const& col_meta = fmd.row_groups[0].columns[idx].meta_data;
EXPECT_EQ(col_meta.encodings[0], enc);

// also check encoding stats are written properly
ASSERT_TRUE(col_meta.encoding_stats.has_value());
auto const& enc_stats = col_meta.encoding_stats.value();
for (auto const& ec : enc_stats) {
if (ec.page_type == cudf::io::parquet::detail::PageType::DATA_PAGE) {
EXPECT_EQ(ec.encoding, enc);
EXPECT_EQ(ec.count, 1);
}
}
};
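A natural extension of the lambda above (a sketch, not part of this PR's test): when a dictionary encoding is expected, the stats should also report exactly one dictionary page, since the writer emits at most one per chunk.

```cpp
for (auto const& ec : enc_stats) {
  if (ec.page_type == cudf::io::parquet::detail::PageType::DICTIONARY_PAGE) {
    EXPECT_EQ(ec.count, 1);  // single dictionary page per column chunk
  }
}
```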

// requested plain