Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add nested column selection to parquet reader #8933

Merged
merged 26 commits into from
Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
38ff1b6
Use returned references from emplace_back because c++17
devavret Jul 27, 2021
45067a4
Merge input path in schema
devavret Jul 27, 2021
0a3e55c
Working deep select_columns
devavret Jul 28, 2021
920b194
Change external API and switch over to new select_column
devavret Jul 28, 2021
faa07ec
Read all when columns=[]
devavret Jul 28, 2021
d634f3e
Use new API in gtests
devavret Jul 28, 2021
24e7166
Add pandas index
devavret Jul 28, 2021
0d0baed
Gtests
devavret Jul 29, 2021
11334e5
Merge branch 'branch-21.10' into parq-read-select-columns
devavret Jul 30, 2021
6afae81
Enable pytest that depended on #7561
devavret Aug 2, 2021
2b9d659
Fix test breakage in pyarrow engine
devavret Aug 2, 2021
cbdf537
verify selected col name for stub (list) schema element
devavret Aug 2, 2021
fd30a00
Fix perf regression for large number of cols
devavret Aug 2, 2021
4965097
Cleanups:
devavret Aug 2, 2021
75f318d
Skip verifying list's child's name
devavret Aug 2, 2021
4acd894
Add pytests for select columns
devavret Aug 3, 2021
245f2cc
Fix arrow source test with new API
devavret Aug 3, 2021
2b7c181
Use schema idx instead of SchemaElement&
devavret Aug 4, 2021
ba6def8
Allow invalid paths to be passed
devavret Aug 5, 2021
9a62d1f
Merge branch 'branch-21.10' into parq-read-select-columns
devavret Aug 11, 2021
0627343
Review fix for pytest
devavret Aug 11, 2021
ef0b575
Review cpp fixes
devavret Aug 12, 2021
9c5c5a7
Change the logic so that . can be in col name
devavret Aug 17, 2021
1e4427e
Revert select columns API to single str
devavret Aug 17, 2021
fa58f2c
Revert API in python
devavret Aug 17, 2021
1c9ab5b
Review fixes
devavret Aug 18, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class parquet_reader_options_builder;
class parquet_reader_options {
source_info _source;

// Names of column to read; empty is all
// Path in schema of column to read; empty is all
std::vector<std::string> _columns;

// List of individual row groups to read (ignored if empty)
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/parquet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ int CompactProtocolReader::WalkSchema(
++idx;
if (e->num_children > 0) {
for (int i = 0; i < e->num_children; i++) {
e->children_idx.push_back(idx);
int idx_old = idx;
idx = WalkSchema(md, idx, parent_idx, max_def_level, max_rep_level);
if (idx <= idx_old) break; // Error
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ struct SchemaElement {
int max_definition_level = 0;
int max_repetition_level = 0;
int parent_idx = 0;
std::vector<size_t> children_idx;

bool operator==(SchemaElement const& other) const
{
Expand Down
304 changes: 190 additions & 114 deletions cpp/src/io/parquet/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,9 @@ class aggregate_metadata {
*
* @param names List of column names to load, where index column name(s) will be added
*/
void add_pandas_index_names(std::vector<std::string>& names) const
std::vector<std::string> get_pandas_index_names() const
cwharris marked this conversation as resolved.
Show resolved Hide resolved
{
std::vector<std::string> names;
auto str = get_pandas_index();
if (str.length() != 0) {
std::regex index_name_expr{R"(\"((?:\\.|[^\"])*)\")"};
Expand All @@ -480,6 +481,7 @@ class aggregate_metadata {
str = sm.suffix();
}
}
return names;
}

struct row_group_info {
Expand Down Expand Up @@ -549,86 +551,14 @@ class aggregate_metadata {
return selection;
}

/**
* @brief Build input and output column structures based on schema input. Recursive.
*
* @param[in,out] schema_idx Schema index to build information for. This value gets
* incremented as the function recurses.
* @param[out] input_columns Input column information (source data in the file)
* @param[out] output_columns Output column structure (resulting cudf columns)
* @param[in,out] nesting A stack keeping track of child column indices so we can
* reproduce the linear list of output columns that correspond to an input column.
* @param[in] strings_to_categorical Type conversion parameter
* @param[in] timestamp_type_id Type conversion parameter
* @param[in] strict_decimal_types True if it is an error to load an unsupported decimal type
*
*/
void build_column_info(int& schema_idx,
std::vector<input_column_info>& input_columns,
std::vector<column_buffer>& output_columns,
std::deque<int>& nesting,
bool strings_to_categorical,
type_id timestamp_type_id,
bool strict_decimal_types) const
{
int start_schema_idx = schema_idx;
auto const& schema = get_schema(schema_idx);
schema_idx++;

// if I am a stub, continue on
if (schema.is_stub()) {
// is this legit?
CUDF_EXPECTS(schema.num_children == 1, "Unexpected number of children for stub");
build_column_info(schema_idx,
input_columns,
output_columns,
nesting,
strings_to_categorical,
timestamp_type_id,
strict_decimal_types);
return;
}

// if we're at the root, this is a new output column
nesting.push_back(static_cast<int>(output_columns.size()));
auto const col_type =
to_type_id(schema, strings_to_categorical, timestamp_type_id, strict_decimal_types);
auto const dtype = col_type == type_id::DECIMAL32 || col_type == type_id::DECIMAL64
? data_type{col_type, numeric::scale_type{-schema.decimal_scale}}
: data_type{col_type};
output_columns.emplace_back(dtype, schema.repetition_type == OPTIONAL ? true : false);
column_buffer& output_col = output_columns.back();
output_col.name = schema.name;

// build each child
for (int idx = 0; idx < schema.num_children; idx++) {
build_column_info(schema_idx,
input_columns,
output_col.children,
nesting,
strings_to_categorical,
timestamp_type_id,
strict_decimal_types);
}

// if I have no children, we're at a leaf and I'm an input column (that is, one with actual
// data stored) so add me to the list.
if (schema.num_children == 0) {
input_columns.emplace_back(input_column_info{start_schema_idx, schema.name});
input_column_info& input_col = input_columns.back();
std::copy(nesting.begin(), nesting.end(), std::back_inserter(input_col.nesting));
}

nesting.pop_back();
}

/**
* @brief Filters and reduces down to a selection of columns
*
* @param use_names List of column names to select
* @param use_names List of paths of column names to select
* @param include_index Whether to always include the PANDAS index column(s)
* @param strings_to_categorical Type conversion parameter
* @param timestamp_type_id Type conversion parameter
* @param strict_decimal_types Type conversion parameter
*
* @return input column information, output column information, list of output column schema
* indices
Expand All @@ -639,9 +569,85 @@ class aggregate_metadata {
type_id timestamp_type_id,
bool strict_decimal_types) const
{
auto const& pfm = per_file_metadata[0];
auto find_schema_child = [&](SchemaElement const& schema_elem, std::string const& name) {
auto const& col_schema_idx = std::find_if(
schema_elem.children_idx.cbegin(),
schema_elem.children_idx.cend(),
[&](size_t col_schema_idx) { return get_schema(col_schema_idx).name == name; });

return (col_schema_idx != schema_elem.children_idx.end()) ? static_cast<int>(*col_schema_idx)
: -1;
};

std::vector<column_buffer> output_columns;
std::vector<input_column_info> input_columns;
std::deque<int> nesting;
devavret marked this conversation as resolved.
Show resolved Hide resolved

// Return true if column path is valid. e.g. if the path is {"struct1", "child1"}, then it is
// valid if "struct1.child1" exists in this file's schema. If "struct1" exists but "child1" is
// not a child of "struct1" then the function will return false for "struct1"
std::function<bool(column_name_info const*, int, std::vector<column_buffer>&)> build_column =
cwharris marked this conversation as resolved.
Show resolved Hide resolved
[&](column_name_info const* col_name_info,
int schema_idx,
std::vector<column_buffer>& out_col_array) {
if (schema_idx < 0) { return false; }
auto const& schema_elem = get_schema(schema_idx);

// if I am a stub, continue on
devavret marked this conversation as resolved.
Show resolved Hide resolved
if (schema_elem.is_stub()) {
// is this legit?
CUDF_EXPECTS(schema_elem.num_children == 1, "Unexpected number of children for stub");
auto child_col_name_info = (col_name_info) ? &col_name_info->children[0] : nullptr;
return build_column(child_col_name_info, schema_elem.children_idx[0], out_col_array);
}

// if we're at the root, this is a new output column
auto const col_type =
to_type_id(schema_elem, strings_to_categorical, timestamp_type_id, strict_decimal_types);
auto const dtype = col_type == type_id::DECIMAL32 || col_type == type_id::DECIMAL64
? data_type{col_type, numeric::scale_type{-schema_elem.decimal_scale}}
: data_type{col_type};

column_buffer output_col(dtype, schema_elem.repetition_type == OPTIONAL);
// store the index of this element if inserted in out_col_array
nesting.push_back(static_cast<int>(out_col_array.size()));
output_col.name = schema_elem.name;

// build each child
bool path_is_valid = false;
if (col_name_info == nullptr or col_name_info->children.empty()) {
// add all children of schema_elem.
// At this point, we can no longer pass a col_name_info to build_column
for (int idx = 0; idx < schema_elem.num_children; idx++) {
path_is_valid |=
build_column(nullptr, schema_elem.children_idx[idx], output_col.children);
}
} else {
for (size_t idx = 0; idx < col_name_info->children.size(); idx++) {
path_is_valid |=
build_column(&col_name_info->children[idx],
find_schema_child(schema_elem, col_name_info->children[idx].name),
output_col.children);
}
vuule marked this conversation as resolved.
Show resolved Hide resolved
}

// if I have no children, we're at a leaf and I'm an input column (that is, one with actual
// data stored) so add me to the list.
if (schema_elem.num_children == 0) {
input_column_info& input_col =
input_columns.emplace_back(input_column_info{schema_idx, schema_elem.name});
std::copy(nesting.cbegin(), nesting.cend(), std::back_inserter(input_col.nesting));
path_is_valid = true; // If we're able to reach leaf then path is valid
}

if (path_is_valid) { out_col_array.push_back(std::move(output_col)); }

nesting.pop_back();
return path_is_valid;
};

std::vector<int> output_column_schemas;

// determine the list of output columns
//
// there is not necessarily a 1:1 mapping between input columns and output columns.
// For example, parquet does not explicitly store a ColumnChunkDesc for struct columns.
Expand All @@ -657,43 +663,115 @@ class aggregate_metadata {
// "firstname", "middlename" and "lastname" represent the input columns in the file that we
// process to produce the final cudf "name" column.
//
std::vector<int> output_column_schemas;
// A user can ask for a single field out of the struct e.g. firstname.
// In this case they'll pass a fully qualified name to the schema element like
// ["name", "firstname"]
//
auto const& root = get_schema(0);
if (use_names.empty()) {
// walk the schema and choose all top level columns
for (size_t schema_idx = 1; schema_idx < pfm.schema.size(); schema_idx++) {
auto const& schema = pfm.schema[schema_idx];
if (schema.parent_idx == 0) { output_column_schemas.push_back(schema_idx); }
for (auto const& schema_idx : root.children_idx) {
build_column(nullptr, schema_idx, output_columns);
output_column_schemas.push_back(schema_idx);
}
} else {
// Load subset of columns; include PANDAS index unless excluded
std::vector<std::string> local_use_names = use_names;
if (include_index) { add_pandas_index_names(local_use_names); }
for (const auto& use_name : local_use_names) {
for (size_t schema_idx = 1; schema_idx < pfm.schema.size(); schema_idx++) {
auto const& schema = pfm.schema[schema_idx];
// We select only top level columns by name. Selecting nested columns by name is not
// supported. Top level columns are identified by their parent being the root (idx == 0)
if (use_name == schema.name and schema.parent_idx == 0) {
output_column_schemas.push_back(schema_idx);
}
// Convert schema into a vector of every possible path
std::vector<std::pair<std::string, size_t>> all_paths;
devavret marked this conversation as resolved.
Show resolved Hide resolved
std::function<void(std::string, int)> add_path = [&](std::string path_till_now,
int schema_idx) {
auto const& schema_elem = get_schema(schema_idx);
std::string curr_path = path_till_now + schema_elem.name;
all_paths.emplace_back(curr_path, schema_idx);
for (auto const& child_idx : schema_elem.children_idx) {
add_path(curr_path + ".", child_idx);
}
};
for (auto const& child_idx : get_schema(0).children_idx) {
add_path("", child_idx);
}
}

// construct input and output output column info
std::vector<column_buffer> output_columns;
output_columns.reserve(output_column_schemas.size());
std::vector<input_column_info> input_columns;
std::deque<int> nesting;
for (size_t idx = 0; idx < output_column_schemas.size(); idx++) {
int schema_index = output_column_schemas[idx];
build_column_info(schema_index,
input_columns,
output_columns,
nesting,
strings_to_categorical,
timestamp_type_id,
strict_decimal_types);
// Find which of the selected paths are valid and get their schema index
std::vector<std::pair<std::string, size_t>> valid_selected_paths;
devavret marked this conversation as resolved.
Show resolved Hide resolved
for (auto const& selected_path : use_names) {
auto found_path = std::find_if(
all_paths.begin(), all_paths.end(), [&](std::pair<std::string, size_t>& valid_path) {
return valid_path.first == selected_path;
});
if (found_path != all_paths.end()) {
valid_selected_paths.emplace_back(selected_path, found_path->second);
}
}

// Now construct paths as vector of strings for further consumption
std::vector<std::vector<std::string>> use_names3;
std::transform(valid_selected_paths.begin(),
valid_selected_paths.end(),
std::back_inserter(use_names3),
[&](std::pair<std::string, size_t> const& valid_path) {
auto schema_idx = valid_path.second;
std::vector<std::string> result_path;
do {
SchemaElement const& elem = get_schema(schema_idx);
result_path.push_back(elem.name);
schema_idx = elem.parent_idx;
} while (schema_idx > 0);
return std::vector<std::string>(result_path.rbegin(), result_path.rend());
});

std::vector<column_name_info> selected_columns;
if (include_index) {
std::vector<std::string> index_names = get_pandas_index_names();
std::transform(index_names.cbegin(),
index_names.cend(),
std::back_inserter(selected_columns),
[](std::string const& name) { return column_name_info(name); });
}
// Merge the vector use_names into a set of hierarchical column_name_info objects
/* This is because if we have columns like this:
* col1
* / \
* s3 f4
* / \
* f5 f6
*
* there may be common paths in use_names like:
* {"col1", "s3", "f5"}, {"col1", "f4"}
* which means we want the output to contain
* col1
* / \
* s3 f4
* /
* f5
*
* rather than
* col1 col1
* | |
* s3 f4
* |
* f5
*/
for (auto const& path : use_names3) {
auto array_to_find_in = &selected_columns;
for (size_t depth = 0; depth < path.size(); ++depth) {
// Check if the path exists in our selected_columns and if not, add it.
auto const& name_to_find = path[depth];
auto found_col = std::find_if(
array_to_find_in->begin(),
array_to_find_in->end(),
[&name_to_find](column_name_info const& col) { return col.name == name_to_find; });
if (found_col == array_to_find_in->end()) {
auto& col = array_to_find_in->emplace_back(name_to_find);
array_to_find_in = &col.children;
} else {
// Path exists. go down further.
array_to_find_in = &found_col->children;
}
}
}
for (auto& col : selected_columns) {
auto const& top_level_col_schema_idx = find_schema_child(root, col.name);
bool valid_column = build_column(&col, top_level_col_schema_idx, output_columns);
if (valid_column) output_column_schemas.push_back(top_level_col_schema_idx);
}
}

return std::make_tuple(
Expand Down Expand Up @@ -1581,18 +1659,16 @@ table_with_metadata reader::impl::read(size_type skip_rows,

// create the final output cudf columns
for (size_t i = 0; i < _output_columns.size(); ++i) {
out_metadata.schema_info.push_back(column_name_info{""});
out_columns.emplace_back(
make_column(_output_columns[i], &out_metadata.schema_info.back(), stream, _mr));
column_name_info& col_name = out_metadata.schema_info.emplace_back("");
out_columns.emplace_back(make_column(_output_columns[i], &col_name, stream, _mr));
}
}
}

// Create empty columns as needed (this can happen if we've ended up with no actual data to read)
for (size_t i = out_columns.size(); i < _output_columns.size(); ++i) {
out_metadata.schema_info.push_back(column_name_info{""});
out_columns.emplace_back(cudf::io::detail::empty_like(
_output_columns[i], &out_metadata.schema_info.back(), stream, _mr));
column_name_info& col_name = out_metadata.schema_info.emplace_back("");
out_columns.emplace_back(io::detail::empty_like(_output_columns[i], &col_name, stream, _mr));
}

// Return column names (must match order of returned columns)
Expand Down
Loading