Skip to content

Commit

Permalink
Refactor ORC ProtobufReader to make it more extendable(#7055)
Browse files Browse the repository at this point in the history
Related to #5826

Refactor the `ProtobufReader` API to facilitate expansion to support robust reading of column statistics.
Changes include:

- Move `orc::metadata` from `readder_impl.cu` to `orc.h` so it can be reused for statistics related APIs.
- Removed duplicated code in `read_orc_statistics` - use `orc::metadata` instead.
- Rename `ColumnStatistics` to `ColStatsBlob`, since that's what it currently is.
- Avoid redundant copies in `read_orc_statistics`,
- Replace `get_u32`, `get_i32`, etc. with templated `get`.
- Replace per-type functors (e.g. `FieldUInt64`) with templated `field_reader`s to reduce code repetition.
- The two type-specific parts of `FieldXYZ` functors (field enum and read impl) are now separate to avoid redundant code.
- `field_reader` dispatches based on the value type, so also added `packed_field_reader` and `raw_field_reader` for packed fields and blob reads (respectively).
- Replace return value based error checking in `ProtobufReader` with `CUDF_EXPECTS`.
- Removed `InitSchema` from `ProtobufReader` - schema is only used to determine column names. The names are now lazily calculated in `metadata::get_column_name`

Authors:
  - vuule <[email protected]>
  - Vukasin Milovanovic <[email protected]>

Approvers:
  - Kumar Aatish
  - Conor Hoekstra

URL: #7055
  • Loading branch information
vuule authored Jan 5, 2021
1 parent 91322ba commit 7bf0505
Show file tree
Hide file tree
Showing 6 changed files with 576 additions and 682 deletions.
60 changes: 12 additions & 48 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,65 +211,29 @@ std::vector<std::vector<std::string>> read_orc_statistics(source_info const& src
CUDF_FAIL("Unsupported source type");
}

// Get size of file and size of postscript
const auto len = source->size();
const auto max_ps_size = std::min(len, static_cast<size_t>(256));

// Read uncompressed postscript section (max 255 bytes + 1 byte for length)
auto buffer = source->host_read(len - max_ps_size, max_ps_size);
const size_t ps_length = buffer->data()[max_ps_size - 1];
const uint8_t* ps_data = &buffer->data()[max_ps_size - ps_length - 1];
orc::ProtobufReader pb;
orc::PostScript ps;
pb.init(ps_data, ps_length);
CUDF_EXPECTS(pb.read(ps, ps_length), "Cannot read postscript");
CUDF_EXPECTS(ps.footerLength + ps_length < len, "Invalid footer length");

// If compression is used, all the rest of the metadata is compressed
// If no compressed is used, the decompressor is simply a pass-through
std::unique_ptr<orc::OrcDecompressor> decompressor =
std::make_unique<orc::OrcDecompressor>(ps.compression, ps.compressionBlockSize);

// Read compressed filefooter section
buffer = source->host_read(len - ps_length - 1 - ps.footerLength, ps.footerLength);
size_t ff_length = 0;
auto ff_data = decompressor->Decompress(buffer->data(), ps.footerLength, &ff_length);
orc::FileFooter ff;
pb.init(ff_data, ff_length);
CUDF_EXPECTS(pb.read(ff, ff_length), "Cannot read filefooter");
CUDF_EXPECTS(ff.types.size() > 0, "No columns found");

// Read compressed metadata section
buffer =
source->host_read(len - ps_length - 1 - ps.footerLength - ps.metadataLength, ps.metadataLength);
size_t md_length = 0;
auto md_data = decompressor->Decompress(buffer->data(), ps.metadataLength, &md_length);
orc::Metadata md;
pb.init(md_data, md_length);
CUDF_EXPECTS(pb.read(md, md_length), "Cannot read metadata");
orc::metadata metadata(source.get());

// Initialize statistics to return
std::vector<std::vector<std::string>> statistics_blobs;

// Get column names
std::vector<std::string> column_names;
for (auto i = 0; i < ff.types.size(); i++) { column_names.push_back(ff.GetColumnName(i)); }
statistics_blobs.push_back(column_names);
statistics_blobs.emplace_back();
for (auto i = 0; i < metadata.get_num_columns(); i++) {
statistics_blobs.back().push_back(metadata.get_column_name(i));
}

// Get file-level statistics, statistics of each column of file
std::vector<std::string> file_column_statistics_blobs;
for (orc::ColumnStatistics stats : ff.statistics) {
file_column_statistics_blobs.push_back(std::string(stats.begin(), stats.end()));
statistics_blobs.emplace_back();
for (auto const& stats : metadata.ff.statistics) {
statistics_blobs.back().push_back(std::string(stats.cbegin(), stats.cend()));
}
statistics_blobs.push_back(file_column_statistics_blobs);

// Get stripe-level statistics
for (orc::StripeStatistics stripe_stats : md.stripeStats) {
std::vector<std::string> stripe_column_statistics_blobs;
for (orc::ColumnStatistics stats : stripe_stats.colStats) {
stripe_column_statistics_blobs.push_back(std::string(stats.begin(), stats.end()));
for (auto const& stripe_stats : metadata.md.stripeStats) {
statistics_blobs.emplace_back();
for (auto const& stats : stripe_stats.colStats) {
statistics_blobs.back().push_back(std::string(stats.cbegin(), stats.cend()));
}
statistics_blobs.push_back(stripe_column_statistics_blobs);
}

return statistics_blobs;
Expand Down
Loading

0 comments on commit 7bf0505

Please sign in to comment.