Skip to content

Commit

Permalink
Refactor aggregate_metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
devavret committed Dec 10, 2021
1 parent 2e1c359 commit f44a50b
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 47 deletions.
117 changes: 72 additions & 45 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,43 @@ parquet::Compression to_parquet_compression(compression_type compression)

} // namespace

struct per_file_metadata {
per_file_metadata(int num_files) : files(num_files) {}
struct aggregate_metadata {
aggregate_metadata(std::vector<partition_info> const& partitions,
size_type num_columns,
std::vector<SchemaElement> schema,

statistics_freq stats_granularity,
std::vector<std::map<std::string, std::string>> const& kv_md)
: version(1), schema(std::move(schema)), files(partitions.size())
{
for (size_t i = 0; i < partitions.size(); ++i) {
this->files[i].num_rows = partitions[i].num_rows;
}
this->column_order_listsize =
(stats_granularity != statistics_freq::STATISTICS_NONE) ? num_columns : 0;

for (size_t p = 0; p < kv_md.size(); ++p) {
std::transform(kv_md[p].begin(),
kv_md[p].end(),
std::back_inserter(this->files[p].key_value_metadata),
[](auto const& kv) {
return KeyValue{kv.first, kv.second};
});
}
}

void update_files(std::vector<partition_info> const& partitions)
{
CUDF_EXPECTS(partitions.size() == this->files.size(),
"New partitions must be same size as previously passed number of partitions");
for (size_t i = 0; i < partitions.size(); ++i) {
this->files[i].num_rows += partitions[i].num_rows;
}
}

FileMetaData get_metadata(int part)
FileMetaData get_metadata(size_t part)
{
CUDF_EXPECTS(part < files.size(), "Invalid part index queried");
FileMetaData meta{};
meta.version = this->version;
meta.schema = this->schema;
Expand All @@ -95,14 +127,44 @@ struct per_file_metadata {
return meta;
}

void set_file_paths(std::vector<std::string> const& column_chunks_file_path)
{
for (size_t p = 0; p < this->files.size(); ++p) {
auto& file = this->files[p];
auto const& file_path = column_chunks_file_path[p];
for (auto& rowgroup : file.row_groups) {
for (auto& col : rowgroup.columns) {
col.file_path = file_path;
}
}
}
}

/**
 * @brief Combines the metadata of all files into a single `FileMetaData`.
 *
 * The result starts from `get_metadata(0)`; for every later file its row groups are
 * appended and its `num_rows` is added to the total. The row groups of those later files
 * are moved out of this object, leaving their `row_groups` in a moved-from state.
 *
 * @return The merged metadata, or a default-constructed `FileMetaData` when there are no
 * files
 */
FileMetaData get_merged_metadata()
{
  FileMetaData combined;
  if (this->files.empty()) { return combined; }

  // The first file supplies the base metadata that the remaining files are folded into.
  combined = this->get_metadata(0);

  for (size_t idx = 1; idx < this->files.size(); ++idx) {
    auto& extra = this->files[idx];
    combined.row_groups.insert(combined.row_groups.end(),
                               std::make_move_iterator(extra.row_groups.begin()),
                               std::make_move_iterator(extra.row_groups.end()));
    combined.num_rows += extra.num_rows;
  }
  return combined;
}

int32_t version = 0;
std::vector<SchemaElement> schema;
struct per_file_members {
struct per_file_metadata {
int64_t num_rows = 0;
std::vector<RowGroup> row_groups;
std::vector<KeyValue> key_value_metadata;
};
std::vector<per_file_members> files;
std::vector<per_file_metadata> files;
std::string created_by = "";
uint32_t column_order_listsize = 0;
};
Expand Down Expand Up @@ -1148,32 +1210,14 @@ void writer::impl::write(table_view const& table, std::vector<partition_info> pa
std::vector<SchemaElement> this_table_schema(schema_tree.begin(), schema_tree.end());

if (!md) {
md = std::make_unique<per_file_metadata>(partitions.size());
md->version = 1;
for (size_t i = 0; i < partitions.size(); ++i) {
md->files[i].num_rows = partitions[i].num_rows;
}
md->column_order_listsize =
(stats_granularity_ != statistics_freq::STATISTICS_NONE) ? num_columns : 0;

for (size_t p = 0; p < kv_md.size(); ++p) {
std::transform(kv_md[p].begin(),
kv_md[p].end(),
std::back_inserter(md->files[p].key_value_metadata),
[](auto const& kv) {
return KeyValue{kv.first, kv.second};
});
}
md->schema = this_table_schema;
md = std::make_unique<aggregate_metadata>(
partitions, num_columns, std::move(this_table_schema), stats_granularity_, kv_md);
} else {
// verify the user isn't passing mismatched tables
CUDF_EXPECTS(md->schema == this_table_schema,
"Mismatch in schema between multiple calls to write_chunk");

// increment num rows
for (size_t i = 0; i < partitions.size(); ++i) {
md->files[i].num_rows += partitions[i].num_rows;
}
md->update_files(partitions);
}
// Create table_device_view so that corresponding column_device_view data
// can be written into col_desc members
Expand Down Expand Up @@ -1586,33 +1630,16 @@ std::unique_ptr<std::vector<uint8_t>> writer::impl::close(
if (column_chunks_file_path.size() > 0) {
CUDF_EXPECTS(column_chunks_file_path.size() == md->files.size(),
"Expected one column chunk path per output file");
md->set_file_paths(column_chunks_file_path);
file_header_s fhdr = {parquet_magic};
std::vector<uint8_t> buffer;
CompactProtocolWriter cpw(&buffer);
buffer.insert(buffer.end(),
reinterpret_cast<const uint8_t*>(&fhdr),
reinterpret_cast<const uint8_t*>(&fhdr) + sizeof(fhdr));
FileMetaData merged_md;
for (size_t p = 0; p < md->files.size(); ++p) {
auto& file = md->files[p];
auto const& file_path = column_chunks_file_path[p];
for (auto& rowgroup : file.row_groups) {
for (auto& col : rowgroup.columns) {
col.file_path = file_path;
}
}
if (p == 0) {
merged_md = md->get_metadata(0);
} else {
merged_md.row_groups.insert(merged_md.row_groups.end(),
std::make_move_iterator(file.row_groups.begin()),
std::make_move_iterator(file.row_groups.end()));
merged_md.num_rows += file.num_rows;
}
}
file_ender_s fendr;
fendr.magic = parquet_magic;
fendr.footer_len = static_cast<uint32_t>(cpw.write(merged_md));
fendr.footer_len = static_cast<uint32_t>(cpw.write(md->get_merged_metadata()));
buffer.insert(buffer.end(),
reinterpret_cast<const uint8_t*>(&fendr),
reinterpret_cast<const uint8_t*>(&fendr) + sizeof(fendr));
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/writer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace detail {
namespace parquet {
// Forward internal classes
struct parquet_column_view;
struct per_file_metadata;
struct aggregate_metadata;

using namespace cudf::io::parquet;
using namespace cudf::io;
Expand Down Expand Up @@ -212,7 +212,7 @@ class writer::impl {
statistics_freq stats_granularity_ = statistics_freq::STATISTICS_NONE;
bool int96_timestamps = false;
// Overall file metadata. Filled in during the process and written during write_chunked_end()
std::unique_ptr<per_file_metadata> md;
std::unique_ptr<aggregate_metadata> md;
// File footer key-value metadata. Written during write_chunked_end()
std::vector<std::map<std::string, std::string>> kv_md;
// optional user metadata
Expand Down

0 comments on commit f44a50b

Please sign in to comment.