Skip to content

Commit

Permalink
Add nested column selection to parquet reader (#8933)
Browse files Browse the repository at this point in the history
Closes #8850 

Adds the ability to select specific children of a nested column. The Python API mimics PyArrow, and the format is
```python
cudf.read_parquet("test.parquet", columns=["struct1.child1.grandchild2", "struct1.child2"])
```
The C++ API takes each path as a vector
```c++
cudf::io::parquet_reader_options read_args =
  cudf::io::parquet_reader_options::builder(cudf::io::source_info(filepath))
    .columns({{"struct1", "child1", "grandchild2"},
              {"struct1", "child2"}});
```

Authors:
  - Devavret Makkar (https://github.com/devavret)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Christopher Harris (https://github.com/cwharris)

URL: #8933
  • Loading branch information
devavret authored Aug 19, 2021
1 parent f95b43e commit 80bf8d9
Show file tree
Hide file tree
Showing 7 changed files with 420 additions and 139 deletions.
2 changes: 1 addition & 1 deletion cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class parquet_reader_options_builder;
class parquet_reader_options {
source_info _source;

// Names of column to read; empty is all
// Path in schema of column to read; empty is all
std::vector<std::string> _columns;

// List of individual row groups to read (ignored if empty)
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/parquet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ int CompactProtocolReader::WalkSchema(
++idx;
if (e->num_children > 0) {
for (int i = 0; i < e->num_children; i++) {
e->children_idx.push_back(idx);
int idx_old = idx;
idx = WalkSchema(md, idx, parent_idx, max_def_level, max_rep_level);
if (idx <= idx_old) break; // Error
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ struct SchemaElement {
int max_definition_level = 0;
int max_repetition_level = 0;
int parent_idx = 0;
std::vector<size_t> children_idx;

bool operator==(SchemaElement const& other) const
{
Expand Down
310 changes: 196 additions & 114 deletions cpp/src/io/parquet/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,9 @@ class aggregate_metadata {
*
* @param names List of column names to load, where index column name(s) will be added
*/
void add_pandas_index_names(std::vector<std::string>& names) const
std::vector<std::string> get_pandas_index_names() const
{
std::vector<std::string> names;
auto str = get_pandas_index();
if (str.length() != 0) {
std::regex index_name_expr{R"(\"((?:\\.|[^\"])*)\")"};
Expand All @@ -480,6 +481,7 @@ class aggregate_metadata {
str = sm.suffix();
}
}
return names;
}

struct row_group_info {
Expand Down Expand Up @@ -549,86 +551,14 @@ class aggregate_metadata {
return selection;
}

/**
* @brief Build input and output column structures based on schema input. Recursive.
*
* @param[in,out] schema_idx Schema index to build information for. This value gets
* incremented as the function recurses.
* @param[out] input_columns Input column information (source data in the file)
* @param[out] output_columns Output column structure (resulting cudf columns)
* @param[in,out] nesting A stack keeping track of child column indices so we can
* reproduce the linear list of output columns that correspond to an input column.
* @param[in] strings_to_categorical Type conversion parameter
* @param[in] timestamp_type_id Type conversion parameter
* @param[in] strict_decimal_types True if it is an error to load an unsupported decimal type
*
*/
void build_column_info(int& schema_idx,
std::vector<input_column_info>& input_columns,
std::vector<column_buffer>& output_columns,
std::deque<int>& nesting,
bool strings_to_categorical,
type_id timestamp_type_id,
bool strict_decimal_types) const
{
int start_schema_idx = schema_idx;
auto const& schema = get_schema(schema_idx);
schema_idx++;

// if I am a stub, continue on
if (schema.is_stub()) {
// is this legit?
CUDF_EXPECTS(schema.num_children == 1, "Unexpected number of children for stub");
build_column_info(schema_idx,
input_columns,
output_columns,
nesting,
strings_to_categorical,
timestamp_type_id,
strict_decimal_types);
return;
}

// if we're at the root, this is a new output column
nesting.push_back(static_cast<int>(output_columns.size()));
auto const col_type =
to_type_id(schema, strings_to_categorical, timestamp_type_id, strict_decimal_types);
auto const dtype = col_type == type_id::DECIMAL32 || col_type == type_id::DECIMAL64
? data_type{col_type, numeric::scale_type{-schema.decimal_scale}}
: data_type{col_type};
output_columns.emplace_back(dtype, schema.repetition_type == OPTIONAL ? true : false);
column_buffer& output_col = output_columns.back();
output_col.name = schema.name;

// build each child
for (int idx = 0; idx < schema.num_children; idx++) {
build_column_info(schema_idx,
input_columns,
output_col.children,
nesting,
strings_to_categorical,
timestamp_type_id,
strict_decimal_types);
}

// if I have no children, we're at a leaf and I'm an input column (that is, one with actual
// data stored) so add me to the list.
if (schema.num_children == 0) {
input_columns.emplace_back(input_column_info{start_schema_idx, schema.name});
input_column_info& input_col = input_columns.back();
std::copy(nesting.begin(), nesting.end(), std::back_inserter(input_col.nesting));
}

nesting.pop_back();
}

/**
* @brief Filters and reduces down to a selection of columns
*
* @param use_names List of column names to select
* @param use_names List of paths of column names to select
* @param include_index Whether to always include the PANDAS index column(s)
* @param strings_to_categorical Type conversion parameter
* @param timestamp_type_id Type conversion parameter
* @param strict_decimal_types Type conversion parameter
*
* @return input column information, output column information, list of output column schema
* indices
Expand All @@ -639,9 +569,86 @@ class aggregate_metadata {
type_id timestamp_type_id,
bool strict_decimal_types) const
{
auto const& pfm = per_file_metadata[0];
auto find_schema_child = [&](SchemaElement const& schema_elem, std::string const& name) {
auto const& col_schema_idx = std::find_if(
schema_elem.children_idx.cbegin(),
schema_elem.children_idx.cend(),
[&](size_t col_schema_idx) { return get_schema(col_schema_idx).name == name; });

return (col_schema_idx != schema_elem.children_idx.end()) ? static_cast<int>(*col_schema_idx)
: -1;
};

std::vector<column_buffer> output_columns;
std::vector<input_column_info> input_columns;
std::vector<int> nesting;

// Return true if column path is valid. e.g. if the path is {"struct1", "child1"}, then it is
// valid if "struct1.child1" exists in this file's schema. If "struct1" exists but "child1" is
// not a child of "struct1" then the function will return false for "struct1"
std::function<bool(column_name_info const*, int, std::vector<column_buffer>&)> build_column =
[&](column_name_info const* col_name_info,
int schema_idx,
std::vector<column_buffer>& out_col_array) {
if (schema_idx < 0) { return false; }
auto const& schema_elem = get_schema(schema_idx);

// if schema_elem is a stub then it does not exist in the column_name_info and column_buffer
// hierarchy. So continue on
if (schema_elem.is_stub()) {
// is this legit?
CUDF_EXPECTS(schema_elem.num_children == 1, "Unexpected number of children for stub");
auto child_col_name_info = (col_name_info) ? &col_name_info->children[0] : nullptr;
return build_column(child_col_name_info, schema_elem.children_idx[0], out_col_array);
}

// if we're at the root, this is a new output column
auto const col_type =
to_type_id(schema_elem, strings_to_categorical, timestamp_type_id, strict_decimal_types);
auto const dtype = col_type == type_id::DECIMAL32 || col_type == type_id::DECIMAL64
? data_type{col_type, numeric::scale_type{-schema_elem.decimal_scale}}
: data_type{col_type};

column_buffer output_col(dtype, schema_elem.repetition_type == OPTIONAL);
// store the index of this element if inserted in out_col_array
nesting.push_back(static_cast<int>(out_col_array.size()));
output_col.name = schema_elem.name;

// build each child
bool path_is_valid = false;
if (col_name_info == nullptr or col_name_info->children.empty()) {
// add all children of schema_elem.
// At this point, we can no longer pass a col_name_info to build_column
for (int idx = 0; idx < schema_elem.num_children; idx++) {
path_is_valid |=
build_column(nullptr, schema_elem.children_idx[idx], output_col.children);
}
} else {
for (size_t idx = 0; idx < col_name_info->children.size(); idx++) {
path_is_valid |=
build_column(&col_name_info->children[idx],
find_schema_child(schema_elem, col_name_info->children[idx].name),
output_col.children);
}
}

// if I have no children, we're at a leaf and I'm an input column (that is, one with actual
// data stored) so add me to the list.
if (schema_elem.num_children == 0) {
input_column_info& input_col =
input_columns.emplace_back(input_column_info{schema_idx, schema_elem.name});
std::copy(nesting.cbegin(), nesting.cend(), std::back_inserter(input_col.nesting));
path_is_valid = true; // If we're able to reach leaf then path is valid
}

if (path_is_valid) { out_col_array.push_back(std::move(output_col)); }

nesting.pop_back();
return path_is_valid;
};

std::vector<int> output_column_schemas;

// determine the list of output columns
//
// there is not necessarily a 1:1 mapping between input columns and output columns.
// For example, parquet does not explicitly store a ColumnChunkDesc for struct columns.
Expand All @@ -657,43 +664,120 @@ class aggregate_metadata {
// "firstname", "middlename" and "lastname" represent the input columns in the file that we
// process to produce the final cudf "name" column.
//
std::vector<int> output_column_schemas;
// A user can ask for a single field out of the struct e.g. firstname.
// In this case they'll pass a fully qualified name to the schema element like
// ["name", "firstname"]
//
auto const& root = get_schema(0);
if (use_names.empty()) {
// walk the schema and choose all top level columns
for (size_t schema_idx = 1; schema_idx < pfm.schema.size(); schema_idx++) {
auto const& schema = pfm.schema[schema_idx];
if (schema.parent_idx == 0) { output_column_schemas.push_back(schema_idx); }
for (auto const& schema_idx : root.children_idx) {
build_column(nullptr, schema_idx, output_columns);
output_column_schemas.push_back(schema_idx);
}
} else {
// Load subset of columns; include PANDAS index unless excluded
std::vector<std::string> local_use_names = use_names;
if (include_index) { add_pandas_index_names(local_use_names); }
for (const auto& use_name : local_use_names) {
for (size_t schema_idx = 1; schema_idx < pfm.schema.size(); schema_idx++) {
auto const& schema = pfm.schema[schema_idx];
// We select only top level columns by name. Selecting nested columns by name is not
// supported. Top level columns are identified by their parent being the root (idx == 0)
if (use_name == schema.name and schema.parent_idx == 0) {
output_column_schemas.push_back(schema_idx);
}
struct path_info {
std::string full_path;
int schema_idx;
};

// Convert schema into a vector of every possible path
std::vector<path_info> all_paths;
std::function<void(std::string, int)> add_path = [&](std::string path_till_now,
int schema_idx) {
auto const& schema_elem = get_schema(schema_idx);
std::string curr_path = path_till_now + schema_elem.name;
all_paths.push_back({curr_path, schema_idx});
for (auto const& child_idx : schema_elem.children_idx) {
add_path(curr_path + ".", child_idx);
}
};
for (auto const& child_idx : get_schema(0).children_idx) {
add_path("", child_idx);
}
}

// construct input and output column info
std::vector<column_buffer> output_columns;
output_columns.reserve(output_column_schemas.size());
std::vector<input_column_info> input_columns;
std::deque<int> nesting;
for (size_t idx = 0; idx < output_column_schemas.size(); idx++) {
int schema_index = output_column_schemas[idx];
build_column_info(schema_index,
input_columns,
output_columns,
nesting,
strings_to_categorical,
timestamp_type_id,
strict_decimal_types);
// Find which of the selected paths are valid and get their schema index
std::vector<path_info> valid_selected_paths;
for (auto const& selected_path : use_names) {
auto found_path =
std::find_if(all_paths.begin(), all_paths.end(), [&](path_info& valid_path) {
return valid_path.full_path == selected_path;
});
if (found_path != all_paths.end()) {
valid_selected_paths.push_back({selected_path, found_path->schema_idx});
}
}

// Now construct paths as vector of strings for further consumption
std::vector<std::vector<std::string>> use_names3;
std::transform(valid_selected_paths.begin(),
valid_selected_paths.end(),
std::back_inserter(use_names3),
[&](path_info const& valid_path) {
auto schema_idx = valid_path.schema_idx;
std::vector<std::string> result_path;
do {
SchemaElement const& elem = get_schema(schema_idx);
result_path.push_back(elem.name);
schema_idx = elem.parent_idx;
} while (schema_idx > 0);
return std::vector<std::string>(result_path.rbegin(), result_path.rend());
});

std::vector<column_name_info> selected_columns;
if (include_index) {
std::vector<std::string> index_names = get_pandas_index_names();
std::transform(index_names.cbegin(),
index_names.cend(),
std::back_inserter(selected_columns),
[](std::string const& name) { return column_name_info(name); });
}
// Merge the vector use_names into a set of hierarchical column_name_info objects
/* This is because if we have columns like this:
* col1
* / \
* s3 f4
* / \
* f5 f6
*
* there may be common paths in use_names like:
* {"col1", "s3", "f5"}, {"col1", "f4"}
* which means we want the output to contain
* col1
* / \
* s3 f4
* /
* f5
*
* rather than
* col1 col1
* | |
* s3 f4
* |
* f5
*/
for (auto const& path : use_names3) {
auto array_to_find_in = &selected_columns;
for (size_t depth = 0; depth < path.size(); ++depth) {
// Check if the path exists in our selected_columns and if not, add it.
auto const& name_to_find = path[depth];
auto found_col = std::find_if(
array_to_find_in->begin(),
array_to_find_in->end(),
[&name_to_find](column_name_info const& col) { return col.name == name_to_find; });
if (found_col == array_to_find_in->end()) {
auto& col = array_to_find_in->emplace_back(name_to_find);
array_to_find_in = &col.children;
} else {
// Path exists. go down further.
array_to_find_in = &found_col->children;
}
}
}
for (auto& col : selected_columns) {
auto const& top_level_col_schema_idx = find_schema_child(root, col.name);
bool valid_column = build_column(&col, top_level_col_schema_idx, output_columns);
if (valid_column) output_column_schemas.push_back(top_level_col_schema_idx);
}
}

return std::make_tuple(
Expand Down Expand Up @@ -1581,18 +1665,16 @@ table_with_metadata reader::impl::read(size_type skip_rows,

// create the final output cudf columns
for (size_t i = 0; i < _output_columns.size(); ++i) {
out_metadata.schema_info.push_back(column_name_info{""});
out_columns.emplace_back(
make_column(_output_columns[i], &out_metadata.schema_info.back(), stream, _mr));
column_name_info& col_name = out_metadata.schema_info.emplace_back("");
out_columns.emplace_back(make_column(_output_columns[i], &col_name, stream, _mr));
}
}
}

// Create empty columns as needed (this can happen if we've ended up with no actual data to read)
for (size_t i = out_columns.size(); i < _output_columns.size(); ++i) {
out_metadata.schema_info.push_back(column_name_info{""});
out_columns.emplace_back(cudf::io::detail::empty_like(
_output_columns[i], &out_metadata.schema_info.back(), stream, _mr));
column_name_info& col_name = out_metadata.schema_info.emplace_back("");
out_columns.emplace_back(io::detail::empty_like(_output_columns[i], &col_name, stream, _mr));
}

// Return column names (must match order of returned columns)
Expand Down
Loading

0 comments on commit 80bf8d9

Please sign in to comment.