Skip to content

Commit

Permalink
Reorganize ORC reader into multiple files and perform some small fixes to cuIO code (#14665)
Browse files Browse the repository at this point in the history

This refactors the ORC reader, moving ORC code around to facilitate the upcoming support for chunked reading of the input files.

No new functionality or implementation is added in this PR. Existing code is only moved around, except that a few small issues in the related ORC/cuIO code are also fixed.

Authors:
  - Nghia Truong (https://github.com/ttnghia)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Robert Maynard (https://github.com/robertmaynard)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Bradley Dice (https://github.com/bdice)

URL: #14665
  • Loading branch information
ttnghia authored Jan 17, 2024
1 parent c811987 commit 42e946f
Show file tree
Hide file tree
Showing 10 changed files with 1,514 additions and 1,317 deletions.
2 changes: 2 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@ add_library(
src/io/orc/dict_enc.cu
src/io/orc/orc.cpp
src/io/orc/reader_impl.cu
src/io/orc/reader_impl_helpers.cpp
src/io/orc/reader_impl_preprocess.cu
src/io/orc/stats_enc.cu
src/io/orc/stripe_data.cu
src/io/orc/stripe_enc.cu
Expand Down
45 changes: 21 additions & 24 deletions cpp/src/io/orc/aggregate_orc_metadata.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,7 +20,6 @@

#include <algorithm>
#include <numeric>
#include <optional>

namespace cudf::io::orc::detail {

Expand Down Expand Up @@ -220,27 +219,25 @@ aggregate_orc_metadata::select_stripes(
}

// Read each stripe's stripefooter metadata
if (not selected_stripes_mapping.empty()) {
for (auto& mapping : selected_stripes_mapping) {
// Resize to all stripe_info for the source level
per_file_metadata[mapping.source_idx].stripefooters.resize(mapping.stripe_info.size());

for (size_t i = 0; i < mapping.stripe_info.size(); i++) {
auto const stripe = mapping.stripe_info[i].first;
auto const sf_comp_offset = stripe->offset + stripe->indexLength + stripe->dataLength;
auto const sf_comp_length = stripe->footerLength;
CUDF_EXPECTS(
sf_comp_offset + sf_comp_length < per_file_metadata[mapping.source_idx].source->size(),
"Invalid stripe information");
auto const buffer =
per_file_metadata[mapping.source_idx].source->host_read(sf_comp_offset, sf_comp_length);
auto sf_data = per_file_metadata[mapping.source_idx].decompressor->decompress_blocks(
{buffer->data(), buffer->size()}, stream);
ProtobufReader(sf_data.data(), sf_data.size())
.read(per_file_metadata[mapping.source_idx].stripefooters[i]);
mapping.stripe_info[i].second = &per_file_metadata[mapping.source_idx].stripefooters[i];
if (stripe->indexLength == 0) { row_grp_idx_present = false; }
}
for (auto& mapping : selected_stripes_mapping) {
// Resize to all stripe_info for the source level
per_file_metadata[mapping.source_idx].stripefooters.resize(mapping.stripe_info.size());

for (size_t i = 0; i < mapping.stripe_info.size(); i++) {
auto const stripe = mapping.stripe_info[i].first;
auto const sf_comp_offset = stripe->offset + stripe->indexLength + stripe->dataLength;
auto const sf_comp_length = stripe->footerLength;
CUDF_EXPECTS(
sf_comp_offset + sf_comp_length < per_file_metadata[mapping.source_idx].source->size(),
"Invalid stripe information");
auto const buffer =
per_file_metadata[mapping.source_idx].source->host_read(sf_comp_offset, sf_comp_length);
auto sf_data = per_file_metadata[mapping.source_idx].decompressor->decompress_blocks(
{buffer->data(), buffer->size()}, stream);
ProtobufReader(sf_data.data(), sf_data.size())
.read(per_file_metadata[mapping.source_idx].stripefooters[i]);
mapping.stripe_info[i].second = &per_file_metadata[mapping.source_idx].stripefooters[i];
if (stripe->indexLength == 0) { row_grp_idx_present = false; }
}
}

Expand Down Expand Up @@ -270,7 +267,7 @@ column_hierarchy aggregate_orc_metadata::select_columns(
CUDF_EXPECTS(name_found, "Unknown column name: " + std::string(path));
}
}
return {std::move(selected_columns)};
return column_hierarchy{std::move(selected_columns)};
}

} // namespace cudf::io::orc::detail
38 changes: 18 additions & 20 deletions cpp/src/io/orc/aggregate_orc_metadata.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,6 +14,8 @@
* limitations under the License.
*/

#pragma once

#include "orc.hpp"

#include <map>
Expand All @@ -33,8 +35,8 @@ struct column_hierarchy {
// Each element contains column at the given nesting level
std::vector<std::vector<orc_column_meta>> levels;

column_hierarchy(nesting_map child_map);
auto num_levels() const { return levels.size(); }
explicit column_hierarchy(nesting_map child_map);
[[nodiscard]] auto num_levels() const { return levels.size(); }
};

/**
Expand All @@ -50,11 +52,6 @@ class aggregate_orc_metadata {
*/
[[nodiscard]] int64_t calc_num_rows() const;

/**
* @brief Number of columns in a ORC file.
*/
[[nodiscard]] size_type calc_num_cols() const;

/**
* @brief Sums up the number of stripes of each source
*/
Expand All @@ -69,22 +66,23 @@ class aggregate_orc_metadata {
aggregate_orc_metadata(std::vector<std::unique_ptr<datasource>> const& sources,
rmm::cuda_stream_view stream);

[[nodiscard]] auto const& get_schema(int schema_idx) const
[[nodiscard]] auto get_col_type(int col_idx) const
{
return per_file_metadata[0].ff.types[schema_idx];
return per_file_metadata[0].ff.types[col_idx];
}

auto get_col_type(int col_idx) const { return per_file_metadata[0].ff.types[col_idx]; }

[[nodiscard]] auto get_num_rows() const { return num_rows; }

auto get_num_cols() const { return per_file_metadata[0].get_num_columns(); }
[[nodiscard]] auto get_num_cols() const { return per_file_metadata[0].get_num_columns(); }

[[nodiscard]] auto get_num_stripes() const { return num_stripes; }

[[nodiscard]] auto const& get_types() const { return per_file_metadata[0].ff.types; }

[[nodiscard]] int get_row_index_stride() const { return per_file_metadata[0].ff.rowIndexStride; }
[[nodiscard]] int get_row_index_stride() const
{
return static_cast<int>(per_file_metadata[0].ff.rowIndexStride);
}

[[nodiscard]] auto is_row_grp_idx_present() const { return row_grp_idx_present; }

Expand Down Expand Up @@ -115,11 +113,11 @@ class aggregate_orc_metadata {
*
* Stripes are potentially selected from multiple files.
*/
std::tuple<int64_t, size_type, std::vector<metadata::stripe_source_mapping>> select_stripes(
std::vector<std::vector<size_type>> const& user_specified_stripes,
uint64_t skip_rows,
std::optional<size_type> const& num_rows,
rmm::cuda_stream_view stream);
[[nodiscard]] std::tuple<int64_t, size_type, std::vector<metadata::stripe_source_mapping>>
select_stripes(std::vector<std::vector<size_type>> const& user_specified_stripes,
uint64_t skip_rows,
std::optional<size_type> const& num_rows,
rmm::cuda_stream_view stream);

/**
* @brief Filters ORC file to a selection of columns, based on their paths in the file.
Expand All @@ -131,7 +129,7 @@ class aggregate_orc_metadata {
* `nullopt` if user did not select columns to read
* @return Columns hierarchy - lists of children columns and sorted columns in each nesting level
*/
column_hierarchy select_columns(
[[nodiscard]] column_hierarchy select_columns(
std::optional<std::vector<std::string>> const& column_paths) const;
};

Expand Down
Loading

0 comments on commit 42e946f

Please sign in to comment.