From 38ff1b6377e1bd0cf84e7321f016243b714c6f3b Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Tue, 27 Jul 2021 14:56:43 +0530 Subject: [PATCH 01/24] Use returned references from emplace_back because c++17 --- cpp/src/io/parquet/reader_impl.cu | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 9f9bdfd4755..ca8c978e930 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -596,9 +596,9 @@ class aggregate_metadata { auto const dtype = col_type == type_id::DECIMAL32 || col_type == type_id::DECIMAL64 ? data_type{col_type, numeric::scale_type{-schema.decimal_scale}} : data_type{col_type}; - output_columns.emplace_back(dtype, schema.repetition_type == OPTIONAL ? true : false); - column_buffer& output_col = output_columns.back(); - output_col.name = schema.name; + column_buffer& output_col = + output_columns.emplace_back(dtype, schema.repetition_type == OPTIONAL ? true : false); + output_col.name = schema.name; // build each child for (int idx = 0; idx < schema.num_children; idx++) { @@ -614,8 +614,8 @@ class aggregate_metadata { // if I have no children, we're at a leaf and I'm an input column (that is, one with actual // data stored) so add me to the list. if (schema.num_children == 0) { - input_columns.emplace_back(input_column_info{start_schema_idx, schema.name}); - input_column_info& input_col = input_columns.back(); + input_column_info& input_col = + input_columns.emplace_back(input_column_info{start_schema_idx, schema.name}); std::copy(nesting.begin(), nesting.end(), std::back_inserter(input_col.nesting)); } @@ -1581,18 +1581,16 @@ table_with_metadata reader::impl::read(size_type skip_rows, // create the final output cudf columns for (size_t i = 0; i < _output_columns.size(); ++i) { - out_metadata.schema_info.push_back(column_name_info{""}); - out_columns.emplace_back( - make_column(_output_columns[i], &out_metadata.schema_info.back(), stream, _mr)); + column_name_info& col_name = out_metadata.schema_info.emplace_back(""); + out_columns.emplace_back(make_column(_output_columns[i], &col_name, stream, _mr)); } } } // Create empty columns as needed (this can happen if we've ended up with no actual data to read) for (size_t i = out_columns.size(); i < _output_columns.size(); ++i) { - out_metadata.schema_info.push_back(column_name_info{""}); - out_columns.emplace_back(cudf::io::detail::empty_like( - _output_columns[i], &out_metadata.schema_info.back(), stream, _mr)); + column_name_info& col_name = out_metadata.schema_info.emplace_back(""); + out_columns.emplace_back(io::detail::empty_like(_output_columns[i], &col_name, stream, _mr)); } // Return column names (must match order of returned columns) From 45067a46e4403d78c98cf84a9d4da5544e2ef908 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Tue, 27 Jul 2021 16:31:23 +0530 Subject: [PATCH 02/24] Merge input path in schema --- cpp/src/io/parquet/parquet.cpp | 1 + cpp/src/io/parquet/parquet.hpp | 1 + cpp/src/io/parquet/reader_impl.cu | 54 +++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+) diff --git a/cpp/src/io/parquet/parquet.cpp b/cpp/src/io/parquet/parquet.cpp index 6c658788fa1..c8c54e9933f 100644 --- a/cpp/src/io/parquet/parquet.cpp +++ b/cpp/src/io/parquet/parquet.cpp @@ -347,6 +347,7 @@ int CompactProtocolReader::WalkSchema( ++idx; if (e->num_children > 0) { for (int i = 0; i < e->num_children; i++) { + e->children_idx.push_back(idx); int idx_old = idx; idx = WalkSchema(md, idx, parent_idx, max_def_level, max_rep_level); if (idx <= idx_old) break; // Error diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index 2232017409d..4390d1c788f 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -165,6 +165,7 @@ struct SchemaElement { int max_definition_level = 0; int max_repetition_level = 0; int parent_idx = 0; + std::vector children_idx; bool operator==(SchemaElement const& other) const { diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index ca8c978e930..517d237e34c 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -622,6 +622,60 @@ class aggregate_metadata { nesting.pop_back(); } + auto select_columns2(std::vector> const& use_names, + bool include_index, + bool strings_to_categorical, + type_id timestamp_type_id, + bool strict_decimal_types) const + { + // Merge the vector use_names into a set of hierarchical column_name_info objects + /* This is because if we have columns like this: + * col1 + * / \ + * s3 f4 + * / \ + * f5 f6 + * + * there may be common paths in use_names like: + * {"col1", "s3", "f5"}, {"col1", "f4"} + * which means we want the output to contain + * col1 + * / \ + * s3 f4 + * / + * f5 + * + * rather than + * col1 col1 + * | | + * s3 f4 + * | + * f5 + */ + std::vector selected_columns; + for (auto const& path : use_names) { + auto array_to_find_in = &selected_columns; + for (size_t depth = 0; depth < path.size(); ++depth) { + // Check if the path exists in our selected_columns and if not, add it. + auto const& name_to_find = path[depth]; + auto found_col = std::find_if( + array_to_find_in->begin(), + array_to_find_in->end(), + [&name_to_find](column_name_info const& col) { return col.name == name_to_find; }); + if (found_col == array_to_find_in->end()) { + auto& col = array_to_find_in->emplace_back(name_to_find); + array_to_find_in = &col.children; + } else { + // Path exists. go down further. + array_to_find_in = &found_col->children; + } + } + } + + std::cout << "here" << std::endl; + + } + /** * @brief Filters and reduces down to a selection of columns * From 0a3e55cdcd080862a1551961df165aaa0c9d8546 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 28 Jul 2021 13:31:43 +0530 Subject: [PATCH 03/24] Working deep select_columns --- cpp/src/io/parquet/parquet.cpp | 1 + cpp/src/io/parquet/parquet.hpp | 1 + cpp/src/io/parquet/reader_impl.cu | 84 +++++++++++++++++++++++++++++++ 3 files changed, 86 insertions(+) diff --git a/cpp/src/io/parquet/parquet.cpp b/cpp/src/io/parquet/parquet.cpp index c8c54e9933f..16e7471d8f1 100644 --- a/cpp/src/io/parquet/parquet.cpp +++ b/cpp/src/io/parquet/parquet.cpp @@ -342,6 +342,7 @@ int CompactProtocolReader::WalkSchema( e->max_definition_level = max_def_level; e->max_repetition_level = max_rep_level; e->parent_idx = parent_idx; + e->self_idx = idx; parent_idx = idx; ++idx; diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index 4390d1c788f..67c5ada1296 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -165,6 +165,7 @@ struct SchemaElement { int max_definition_level = 0; int max_repetition_level = 0; int parent_idx = 0; + int self_idx = 0; std::vector children_idx; bool operator==(SchemaElement const& other) const diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 517d237e34c..a2dd48fd637 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -672,8 +672,92 @@ class aggregate_metadata { } } + // TODO: Wherever we use this, should use get_schema() + auto const& schema = per_file_metadata[0].schema; + + std::cout << "here" << std::endl; + + auto find_schema_child = [&schema](SchemaElement const& schema_elem, + std::string const& name) -> SchemaElement const& { + auto const& col_schema_idx = + std::find_if(schema_elem.children_idx.begin(), + schema_elem.children_idx.end(), + [&](size_t col_schema_idx) { return schema[col_schema_idx].name == name; }); + CUDF_EXPECTS(col_schema_idx != schema_elem.children_idx.end(), "Child not found"); + return schema[*col_schema_idx]; + }; + + std::vector output_columns; + std::vector input_columns; + std::deque nesting; + + std::function&)> + build_column = [&](column_name_info const* col_name_info, + SchemaElement const& schema_elem, + std::vector& out_col_array) { + // if I am a stub, continue on + if (schema_elem.is_stub()) { + // is this legit? + CUDF_EXPECTS(schema_elem.num_children == 1, "Unexpected number of children for stub"); + build_column((col_name_info) ? &col_name_info->children[0] : nullptr, + schema[schema_elem.children_idx[0]], + out_col_array); + return; + } + + // if we're at the root, this is a new output column + // TODO: This should be done after out_col_array.emplace_back to signify that we're storing + // the idx of the newly created outcol in nesting + nesting.push_back(static_cast(out_col_array.size())); + auto const col_type = + to_type_id(schema_elem, strings_to_categorical, timestamp_type_id, strict_decimal_types); + auto const dtype = col_type == type_id::DECIMAL32 || col_type == type_id::DECIMAL64 + ? data_type{col_type, numeric::scale_type{-schema_elem.decimal_scale}} + : data_type{col_type}; + + column_buffer& output_col = + out_col_array.emplace_back(dtype, schema_elem.repetition_type == OPTIONAL ? true : false); + output_col.name = schema_elem.name; + + // build each child + if (col_name_info == nullptr or col_name_info->children.empty()) { + // add all children of schema_elem. + // At this point, we can no longer pass a col_name_info to build_column + for (int idx = 0; idx < schema_elem.num_children; idx++) { + build_column(nullptr, schema[schema_elem.children_idx[idx]], output_col.children); + } + } else { + for (size_t idx = 0; idx < col_name_info->children.size(); idx++) { + build_column(&col_name_info->children[idx], + find_schema_child(schema_elem, col_name_info->children[idx].name), + output_col.children); + } + } + + // if I have no children, we're at a leaf and I'm an input column (that is, one with actual + // data stored) so add me to the list. + if (schema_elem.num_children == 0) { + input_column_info& input_col = + input_columns.emplace_back(input_column_info{schema_elem.self_idx, schema_elem.name}); + std::copy(nesting.begin(), nesting.end(), std::back_inserter(input_col.nesting)); + } + + nesting.pop_back(); + }; + + std::vector output_column_schemas; + + for (auto& col : selected_columns) { + auto const& root = schema.front(); + auto const& top_level_col_schem_idx = find_schema_child(root, col.name); + build_column(&col, top_level_col_schem_idx, output_columns); + output_column_schemas.push_back(top_level_col_schem_idx.self_idx); + } + std::cout << "here" << std::endl; + return std::make_tuple( + std::move(input_columns), std::move(output_columns), std::move(output_column_schemas)); } /** From 920b1942811eff73f768e966e749bebdc58bf1fd Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 28 Jul 2021 16:36:34 +0530 Subject: [PATCH 04/24] Change external API and switch over to new select_column --- cpp/include/cudf/io/parquet.hpp | 14 +++++++++----- cpp/src/io/parquet/reader_impl.cu | 12 ++++++------ python/cudf/cudf/_lib/cpp/io/parquet.pxd | 6 +++--- python/cudf/cudf/_lib/parquet.pyx | 7 +++++-- python/cudf/cudf/io/parquet.py | 5 +++++ 5 files changed, 28 insertions(+), 16 deletions(-) diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index ecd9607a87e..843131e379e 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -50,8 +50,9 @@ class parquet_reader_options_builder; class parquet_reader_options { source_info _source; - // Names of column to read; empty is all - std::vector _columns; + // Path in schema of column to read; empty is all + // TODO: A more descriptive doc that mentions how things work when using path to field of nested + std::vector> _columns; // List of individual row groups to read (ignored if empty) std::vector> _row_groups; @@ -125,7 +126,7 @@ class parquet_reader_options { /** * @brief Returns names of column to be read. */ - std::vector const& get_columns() const { return _columns; } + std::vector> const& get_columns() const { return _columns; } /** * @brief Returns list of individual row groups to be read. @@ -148,7 +149,10 @@ class parquet_reader_options { * * @param col_names Vector of column names. */ - void set_columns(std::vector col_names) { _columns = std::move(col_names); } + void set_columns(std::vector> col_names) + { + _columns = std::move(col_names); + } /** * @brief Sets vector of individual row groups to read. @@ -246,7 +250,7 @@ class parquet_reader_options_builder { * @param col_names Vector of column names. * @return this for chaining. */ - parquet_reader_options_builder& columns(std::vector col_names) + parquet_reader_options_builder& columns(std::vector> col_names) { options._columns = std::move(col_names); return *this; diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index a2dd48fd637..b85379a02f1 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -796,7 +796,7 @@ class aggregate_metadata { // process to produce the final cudf "name" column. // std::vector output_column_schemas; - if (use_names.empty()) { + if (use_names.empty()) { // Will be `not use_names.has_value()` // walk the schema and choose all top level columns for (size_t schema_idx = 1; schema_idx < pfm.schema.size(); schema_idx++) { auto const& schema = pfm.schema[schema_idx]; @@ -1541,11 +1541,11 @@ reader::impl::impl(std::vector>&& sources, // Select only columns required by the options std::tie(_input_columns, _output_columns, _output_column_schemas) = - _metadata->select_columns(options.get_columns(), - options.is_enabled_use_pandas_metadata(), - _strings_to_categorical, - _timestamp_type.id(), - _strict_decimal_types); + _metadata->select_columns2(options.get_columns(), + options.is_enabled_use_pandas_metadata(), + _strings_to_categorical, + _timestamp_type.id(), + _strict_decimal_types); } table_with_metadata reader::impl::read(size_type skip_rows, diff --git a/python/cudf/cudf/_lib/cpp/io/parquet.pxd b/python/cudf/cudf/_lib/cpp/io/parquet.pxd index e2053f8ce4f..12d32b9c6bd 100644 --- a/python/cudf/cudf/_lib/cpp/io/parquet.pxd +++ b/python/cudf/cudf/_lib/cpp/io/parquet.pxd @@ -16,7 +16,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: cdef cppclass parquet_reader_options: parquet_reader_options() except + cudf_io_types.source_info get_source_info() except + - vector[string] get_columns() except + + vector[vector[string]] get_columns() except + vector[vector[size_type]] get_row_groups() except + data_type get_timestamp_type() except + bool is_enabled_convert_strings_to_categories() except + @@ -26,7 +26,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: # setter - void set_columns(vector[string] col_names) except + + void set_columns(vector[vector[string]] col_names) except + void set_row_groups(vector[vector[size_type]] row_grp) except + void enable_convert_strings_to_categories(bool val) except + void enable_use_pandas_metadata(bool val) except + @@ -45,7 +45,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: cudf_io_types.source_info src ) except + parquet_reader_options_builder& columns( - vector[string] col_names + vector[vector[string]] col_names ) except + parquet_reader_options_builder& row_groups( vector[vector[size_type]] row_grp diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 52f3aada00b..103106d2ae7 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -120,7 +120,7 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, cdef cudf_io_types.source_info source = make_source_info( filepaths_or_buffers) - cdef vector[string] cpp_columns + cdef vector[vector[string]] cpp_columns cdef bool cpp_strings_to_categorical = strings_to_categorical cdef bool cpp_use_pandas_metadata = use_pandas_metadata cdef size_type cpp_skiprows = skiprows if skiprows is not None else 0 @@ -133,7 +133,10 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, if columns is not None: cpp_columns.reserve(len(columns)) for col in columns or []: - cpp_columns.push_back(str(col).encode()) + cpp_columns.push_back([]) + for path in col: + cpp_columns.back().push_back(str(path).encode()) + if row_groups is not None: cpp_row_groups = row_groups diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index a18486cff3c..bb1349d9391 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -210,6 +210,11 @@ def read_parquet( else: filepaths_or_buffers.append(tmp_source) + if columns is not None: + if not is_list_like(columns): + raise ValueError("Expected list like for columns") + columns = [c.split(".") for c in columns] + if filters is not None: # Convert filters to ds.Expression filters = pq._filters_to_expression(filters) From faa07ecca12dc07b458aaecab43e5e8ed85ac4ca Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 28 Jul 2021 23:10:02 +0530 Subject: [PATCH 05/24] Read all when columns=[] --- cpp/src/io/parquet/reader_impl.cu | 87 +++++++++++++++++-------------- 1 file changed, 47 insertions(+), 40 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index b85379a02f1..2034c2fc41a 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -628,50 +628,57 @@ class aggregate_metadata { type_id timestamp_type_id, bool strict_decimal_types) const { - // Merge the vector use_names into a set of hierarchical column_name_info objects - /* This is because if we have columns like this: - * col1 - * / \ - * s3 f4 - * / \ - * f5 f6 - * - * there may be common paths in use_names like: - * {"col1", "s3", "f5"}, {"col1", "f4"} - * which means we want the output to contain - * col1 - * / \ - * s3 f4 - * / - * f5 - * - * rather than - * col1 col1 - * | | - * s3 f4 - * | - * f5 - */ std::vector selected_columns; - for (auto const& path : use_names) { - auto array_to_find_in = &selected_columns; - for (size_t depth = 0; depth < path.size(); ++depth) { - // Check if the path exists in our selected_columns and if not, add it. - auto const& name_to_find = path[depth]; - auto found_col = std::find_if( - array_to_find_in->begin(), - array_to_find_in->end(), - [&name_to_find](column_name_info const& col) { return col.name == name_to_find; }); - if (found_col == array_to_find_in->end()) { - auto& col = array_to_find_in->emplace_back(name_to_find); - array_to_find_in = &col.children; - } else { - // Path exists. go down further. - array_to_find_in = &found_col->children; + if (use_names.empty()) { + // select all columns + auto const& root = get_schema(0); + for (auto const& col_idx : root.children_idx) { + selected_columns.emplace_back(get_schema(col_idx).name); + } + } else { + // Merge the vector use_names into a set of hierarchical column_name_info objects + /* This is because if we have columns like this: + * col1 + * / \ + * s3 f4 + * / \ + * f5 f6 + * + * there may be common paths in use_names like: + * {"col1", "s3", "f5"}, {"col1", "f4"} + * which means we want the output to contain + * col1 + * / \ + * s3 f4 + * / + * f5 + * + * rather than + * col1 col1 + * | | + * s3 f4 + * | + * f5 + */ + for (auto const& path : use_names) { + auto array_to_find_in = &selected_columns; + for (size_t depth = 0; depth < path.size(); ++depth) { + // Check if the path exists in our selected_columns and if not, add it. + auto const& name_to_find = path[depth]; + auto found_col = std::find_if( + array_to_find_in->begin(), + array_to_find_in->end(), + [&name_to_find](column_name_info const& col) { return col.name == name_to_find; }); + if (found_col == array_to_find_in->end()) { + auto& col = array_to_find_in->emplace_back(name_to_find); + array_to_find_in = &col.children; + } else { + // Path exists. go down further. + array_to_find_in = &found_col->children; + } } } } - // TODO: Wherever we use this, should use get_schema() auto const& schema = per_file_metadata[0].schema; From d634f3e06d13e753b052c91c58276344d264cd4c Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 28 Jul 2021 23:29:46 +0530 Subject: [PATCH 06/24] Use new API in gtests --- cpp/tests/io/parquet_test.cpp | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index 1ad844d6706..3133c102999 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -827,7 +827,7 @@ TEST_F(ParquetWriterTest, MultiIndex) cudf_io::parquet_reader_options in_opts = cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}) .use_pandas_metadata(true) - .columns({"int8s", "int16s", "int32s"}); + .columns({{"int8s"}, {"int16s"}, {"int32s"}}); auto result = cudf_io::read_parquet(in_opts); CUDF_TEST_EXPECT_TABLES_EQUAL(expected->view(), result.tbl->view()); @@ -2410,7 +2410,8 @@ TEST_F(ParquetReaderTest, ReorderedColumns) // read them out of order cudf_io::parquet_reader_options read_opts = - cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}).columns({"b", "a"}); + cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}) + .columns({{"b"}, {"a"}}); auto result = cudf_io::read_parquet(read_opts); cudf::test::expect_columns_equal(result.tbl->view().column(0), b); @@ -2432,7 +2433,8 @@ TEST_F(ParquetReaderTest, ReorderedColumns) // read them out of order cudf_io::parquet_reader_options read_opts = - cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}).columns({"b", "a"}); + cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}) + .columns({{"b"}, {"a"}}); auto result = cudf_io::read_parquet(read_opts); cudf::test::expect_columns_equal(result.tbl->view().column(0), b); @@ -2461,7 +2463,7 @@ TEST_F(ParquetReaderTest, ReorderedColumns) // read them out of order cudf_io::parquet_reader_options read_opts = cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}) - .columns({"d", "a", "b", "c"}); + .columns({{"d"}, {"a"}, {"b"}, {"c"}}); auto result = cudf_io::read_parquet(read_opts); cudf::test::expect_columns_equal(result.tbl->view().column(0), d); @@ -2474,7 +2476,7 @@ TEST_F(ParquetReaderTest, ReorderedColumns) // read them out of order cudf_io::parquet_reader_options read_opts = cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}) - .columns({"c", "d", "a", "b"}); + .columns({{"c"}, {"d"}, {"a"}, {"b"}}); auto result = cudf_io::read_parquet(read_opts); cudf::test::expect_columns_equal(result.tbl->view().column(0), c); @@ -2487,7 +2489,7 @@ TEST_F(ParquetReaderTest, ReorderedColumns) // read them out of order cudf_io::parquet_reader_options read_opts = cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}) - .columns({"d", "c", "b", "a"}); + .columns({{"d"}, {"c"}, {"b"}, {"a"}}); auto result = cudf_io::read_parquet(read_opts); cudf::test::expect_columns_equal(result.tbl->view().column(0), d); @@ -2715,7 +2717,7 @@ TEST_F(ParquetReaderTest, DecimalRead) cudf_io::parquet_reader_options read_strict_opts = read_opts; read_strict_opts.set_strict_decimal_types(true); - read_strict_opts.set_columns({"dec7p4", "dec14p5"}); + read_strict_opts.set_columns({{"dec7p4"}, {"dec14p5"}}); EXPECT_NO_THROW(cudf_io::read_parquet(read_strict_opts)); } { @@ -2813,7 +2815,7 @@ TEST_F(ParquetReaderTest, DecimalRead) cudf_io::parquet_reader_options::builder(cudf_io::source_info{ reinterpret_cast(fixed_len_bytes_decimal_parquet), parquet_len}); read_opts.set_strict_decimal_types(true); - read_opts.set_columns({"dec7p3", "dec12p11"}); + read_opts.set_columns({{"dec7p3"}, {"dec12p11"}}); auto result = cudf_io::read_parquet(read_opts); EXPECT_EQ(result.tbl->view().num_columns(), 2); @@ -2858,7 +2860,7 @@ TEST_F(ParquetReaderTest, DecimalRead) std::begin(col1_data), std::end(col1_data), validity_c1, numeric::scale_type{-11}); cudf::test::expect_columns_equal(result.tbl->view().column(1), col1); - read_opts.set_columns({"dec20p1"}); + read_opts.set_columns({{"dec20p1"}}); EXPECT_THROW(cudf_io::read_parquet(read_opts), cudf::logic_error); } } From 24e71661b47d38edbafb9536e6d4a46594a4909b Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Thu, 29 Jul 2021 01:19:18 +0530 Subject: [PATCH 07/24] Add pandas index --- cpp/src/io/parquet/reader_impl.cu | 11 ++++++++++- cpp/tests/io/parquet_test.cpp | 4 ++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 2034c2fc41a..76ecbf2d363 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -464,8 +464,9 @@ class aggregate_metadata { * * @param names List of column names to load, where index column name(s) will be added */ - void add_pandas_index_names(std::vector& names) const + std::vector get_pandas_index_names() const { + std::vector names; auto str = get_pandas_index(); if (str.length() != 0) { std::regex index_name_expr{R"(\"((?:\\.|[^\"])*)\")"}; @@ -480,6 +481,7 @@ class aggregate_metadata { str = sm.suffix(); } } + return names; } struct row_group_info { @@ -636,6 +638,13 @@ class aggregate_metadata { selected_columns.emplace_back(get_schema(col_idx).name); } } else { + if (include_index) { + std::vector index_names = get_pandas_index_names(); + std::transform(index_names.begin(), + index_names.end(), + std::back_inserter(selected_columns), + [](std::string name) { return column_name_info(name); }); + } // Merge the vector use_names into a set of hierarchical column_name_info objects /* This is because if we have columns like this: * col1 diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index 3133c102999..bab6c17d3c4 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -816,7 +816,7 @@ TEST_F(ParquetWriterTest, MultiIndex) expected_metadata.column_metadata[3].set_name("floats"); expected_metadata.column_metadata[4].set_name("doubles"); expected_metadata.user_data.insert( - {"pandas", "\"index_columns\": [\"floats\", \"doubles\"], \"column1\": [\"int8s\"]"}); + {"pandas", "\"index_columns\": [\"int8s\", \"int16s\"], \"column1\": [\"int32s\"]"}); auto filepath = temp_env->get_temp_filepath("MultiIndex.parquet"); cudf_io::parquet_writer_options out_opts = @@ -827,7 +827,7 @@ TEST_F(ParquetWriterTest, MultiIndex) cudf_io::parquet_reader_options in_opts = cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}) .use_pandas_metadata(true) - .columns({{"int8s"}, {"int16s"}, {"int32s"}}); + .columns({{"int32s"}, {"floats"}, {"doubles"}}); auto result = cudf_io::read_parquet(in_opts); CUDF_TEST_EXPECT_TABLES_EQUAL(expected->view(), result.tbl->view()); From 0d0baedd6f9f3f57a6823eb5fdb985461ca8036e Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Thu, 29 Jul 2021 22:22:45 +0530 Subject: [PATCH 08/24] Gtests --- cpp/src/io/parquet/reader_impl.cu | 4 - cpp/tests/io/parquet_test.cpp | 129 +++++++++++++++++++++++++++++- 2 files changed, 127 insertions(+), 6 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 76ecbf2d363..8070d72d47b 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -691,8 +691,6 @@ class aggregate_metadata { // TODO: Wherever we use this, should use get_schema() auto const& schema = per_file_metadata[0].schema; - std::cout << "here" << std::endl; - auto find_schema_child = [&schema](SchemaElement const& schema_elem, std::string const& name) -> SchemaElement const& { auto const& col_schema_idx = @@ -770,8 +768,6 @@ class aggregate_metadata { output_column_schemas.push_back(top_level_col_schem_idx.self_idx); } - std::cout << "here" << std::endl; - return std::make_tuple( std::move(input_columns), std::move(output_columns), std::move(output_column_schemas)); } diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index bab6c17d3c4..62d2bfe96e4 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -967,8 +967,6 @@ TEST_F(ParquetWriterTest, StructOfList) auto struct_2 = cudf::test::structs_column_wrapper{{is_human_col, struct_1}, {0, 1, 1, 1, 1, 1}}.release(); - // cudf::test::print(struct_2->child(1).child(2)); - auto expected = table_view({*struct_2}); cudf_io::table_input_metadata expected_metadata(expected); @@ -2499,6 +2497,133 @@ TEST_F(ParquetReaderTest, ReorderedColumns) } } +TEST_F(ParquetReaderTest, SelectNestedColumn) +{ + // Struct>, + // flats:List> + // > + // > + + auto weights_col = cudf::test::fixed_width_column_wrapper{1.1, 2.4, 5.3, 8.0, 9.6, 6.9}; + + auto ages_col = + cudf::test::fixed_width_column_wrapper{{48, 27, 25, 31, 351, 351}, {1, 1, 1, 1, 1, 0}}; + + auto struct_1 = cudf::test::structs_column_wrapper{{weights_col, ages_col}, {1, 1, 1, 1, 0, 1}}; + + auto is_human_col = cudf::test::fixed_width_column_wrapper{ + {true, true, false, false, false, false}, {1, 1, 0, 1, 1, 0}}; + + auto struct_2 = + cudf::test::structs_column_wrapper{{is_human_col, struct_1}, {0, 1, 1, 1, 1, 1}}.release(); + + auto input = table_view({*struct_2}); + + cudf_io::table_input_metadata input_metadata(input); + input_metadata.column_metadata[0].set_name("being"); + input_metadata.column_metadata[0].child(0).set_name("human?"); + input_metadata.column_metadata[0].child(1).set_name("particulars"); + input_metadata.column_metadata[0].child(1).child(0).set_name("weight"); + input_metadata.column_metadata[0].child(1).child(1).set_name("age"); + + auto filepath = temp_env->get_temp_filepath("SelectNestedColumn.parquet"); + cudf_io::parquet_writer_options args = + cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, input) + .metadata(&input_metadata); + cudf_io::write_parquet(args); + + { // Test selecting a single leaf from the table + cudf_io::parquet_reader_options read_args = + cudf_io::parquet_reader_options::builder(cudf_io::source_info(filepath)) + .columns({{"being", "particulars", "age"}}); + const auto result = cudf_io::read_parquet(read_args); + + auto expect_ages_col = cudf::test::fixed_width_column_wrapper{ + {48, 27, 25, 31, 351, 351}, {1, 1, 1, 1, 1, 0}}; + auto expect_s_1 = cudf::test::structs_column_wrapper{{expect_ages_col}, {1, 1, 1, 1, 0, 1}}; + auto expect_s_2 = + cudf::test::structs_column_wrapper{{expect_s_1}, {0, 1, 1, 1, 1, 1}}.release(); + auto expected = table_view({*expect_s_2}); + + cudf_io::table_input_metadata expected_metadata(expected); + expected_metadata.column_metadata[0].set_name("being"); + expected_metadata.column_metadata[0].child(0).set_name("particulars"); + expected_metadata.column_metadata[0].child(0).child(0).set_name("age"); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); + compare_metadata_equality(expected_metadata, result.metadata); + } + + { // Test selecting a non-leaf and expecting all hierarchy from that node onwards + cudf_io::parquet_reader_options read_args = + cudf_io::parquet_reader_options::builder(cudf_io::source_info(filepath)) + .columns({{"being", "particulars"}}); + const auto result = cudf_io::read_parquet(read_args); + + auto expected_weights_col = + cudf::test::fixed_width_column_wrapper{1.1, 2.4, 5.3, 8.0, 9.6, 6.9}; + + auto expected_ages_col = cudf::test::fixed_width_column_wrapper{ + {48, 27, 25, 31, 351, 351}, {1, 1, 1, 1, 1, 0}}; + + auto expected_s_1 = cudf::test::structs_column_wrapper{ + {expected_weights_col, expected_ages_col}, {1, 1, 1, 1, 0, 1}}; + + auto expect_s_2 = + cudf::test::structs_column_wrapper{{expected_s_1}, {0, 1, 1, 1, 1, 1}}.release(); + auto expected = table_view({*expect_s_2}); + + cudf_io::table_input_metadata expected_metadata(expected); + expected_metadata.column_metadata[0].set_name("being"); + expected_metadata.column_metadata[0].child(0).set_name("particulars"); + expected_metadata.column_metadata[0].child(0).child(0).set_name("weight"); + expected_metadata.column_metadata[0].child(0).child(1).set_name("age"); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); + compare_metadata_equality(expected_metadata, result.metadata); + } + + { // Test selecting struct children out of order + cudf_io::parquet_reader_options read_args = + cudf_io::parquet_reader_options::builder(cudf_io::source_info(filepath)) + .columns({{"being", "particulars", "age"}, + {"being", "particulars", "weight"}, + {"being", "human?"}}); + const auto result = cudf_io::read_parquet(read_args); + + auto expected_weights_col = + cudf::test::fixed_width_column_wrapper{1.1, 2.4, 5.3, 8.0, 9.6, 6.9}; + + auto expected_ages_col = cudf::test::fixed_width_column_wrapper{ + {48, 27, 25, 31, 351, 351}, {1, 1, 1, 1, 1, 0}}; + + auto expected_is_human_col = cudf::test::fixed_width_column_wrapper{ + {true, true, false, false, false, false}, {1, 1, 0, 1, 1, 0}}; + + auto expect_s_1 = cudf::test::structs_column_wrapper{{expected_ages_col, expected_weights_col}, + {1, 1, 1, 1, 0, 1}}; + + auto expect_s_2 = + cudf::test::structs_column_wrapper{{expect_s_1, expected_is_human_col}, {0, 1, 1, 1, 1, 1}} + .release(); + + auto expected = table_view({*expect_s_2}); + + cudf_io::table_input_metadata expected_metadata(expected); + expected_metadata.column_metadata[0].set_name("being"); + expected_metadata.column_metadata[0].child(0).set_name("particulars"); + expected_metadata.column_metadata[0].child(0).child(0).set_name("age"); + expected_metadata.column_metadata[0].child(0).child(1).set_name("weight"); + expected_metadata.column_metadata[0].child(1).set_name("human?"); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); + compare_metadata_equality(expected_metadata, result.metadata); + } +} + TEST_F(ParquetReaderTest, DecimalRead) { { From 6afae817176c307eea393096579c0448c81a48d3 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Mon, 2 Aug 2021 15:00:17 +0530 Subject: [PATCH 09/24] Enable pytest that depended on #7561 --- python/cudf/cudf/tests/test_parquet.py | 32 ++++++++++---------------- 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 21dc8315e32..34609ee32f0 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1860,26 +1860,18 @@ def test_parquet_writer_list_statistics(tmpdir): ] }, # List of Structs - pytest.param( - { - "family": [ - [ - None, - {"human?": True, "deets": {"weight": 2.4, "age": 27}}, - ], - [ - {"human?": None, "deets": {"weight": 5.3, "age": 25}}, - {"human?": False, "deets": {"weight": 8.0, "age": 31}}, - {"human?": False, "deets": None}, - ], - [], - [{"human?": None, "deets": {"weight": 6.9, "age": None}}], - ] - }, - marks=pytest.mark.xfail( - reason="https://github.com/rapidsai/cudf/issues/7561" - ), - ), + { + "family": [ + [None, {"human?": True, "deets": {"weight": 2.4, "age": 27}},], + [ + {"human?": None, "deets": {"weight": 5.3, "age": 25}}, + {"human?": False, "deets": {"weight": 8.0, "age": 31}}, + {"human?": False, "deets": None}, + ], + [], + [{"human?": None, "deets": {"weight": 6.9, "age": None}}], + ] + }, # Struct of Lists pytest.param( { From 2b9d65943d11641866ff4bfd8886e6ba6b211ff4 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Mon, 2 Aug 2021 15:01:40 +0530 Subject: [PATCH 10/24] Fix test breakage in pyarrow engine columns should be a list of list only for cudf engine --- python/cudf/cudf/_lib/parquet.pyx | 1 + python/cudf/cudf/io/parquet.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 103106d2ae7..5c0fa5f62bd 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -131,6 +131,7 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, ) if columns is not None: + columns = [c.split(".") for c in columns] cpp_columns.reserve(len(columns)) for col in columns or []: cpp_columns.push_back([]) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index bb1349d9391..fa748761695 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -213,7 +213,6 @@ def read_parquet( if columns is not None: if not is_list_like(columns): raise ValueError("Expected list like for columns") - columns = [c.split(".") for c in columns] if filters is not None: # Convert filters to ds.Expression From cbdf5379a6ae2e54b447439c42f3e1b9ff298cff Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Mon, 2 Aug 2021 15:04:05 +0530 Subject: [PATCH 11/24] verify selected col name for stub (list) schema element --- cpp/src/io/parquet/reader_impl.cu | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 8070d72d47b..820131c8133 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -697,7 +697,8 @@ class aggregate_metadata { std::find_if(schema_elem.children_idx.begin(), schema_elem.children_idx.end(), [&](size_t col_schema_idx) { return schema[col_schema_idx].name == name; }); - CUDF_EXPECTS(col_schema_idx != schema_elem.children_idx.end(), "Child not found"); + CUDF_EXPECTS(col_schema_idx != schema_elem.children_idx.end(), + "Child \"" + name + "\" not found in \"" + schema_elem.name + "\""); return schema[*col_schema_idx]; }; @@ -713,9 +714,14 @@ class aggregate_metadata { if (schema_elem.is_stub()) { // is this legit? CUDF_EXPECTS(schema_elem.num_children == 1, "Unexpected number of children for stub"); - build_column((col_name_info) ? &col_name_info->children[0] : nullptr, - schema[schema_elem.children_idx[0]], - out_col_array); + auto child_col_name_info = (col_name_info) ? &col_name_info->children[0] : nullptr; + + // if we still have a specified col_name_info at this level then verify if name matches + // with child + auto& child_schema = (col_name_info) + ? find_schema_child(schema_elem, col_name_info->children[0].name) + : schema[schema_elem.children_idx[0]]; + build_column(child_col_name_info, child_schema, out_col_array); return; } From fd30a0030163ff06ac883de790167b48f6f53888 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Mon, 2 Aug 2021 15:20:35 +0530 Subject: [PATCH 12/24] Fix perf regression for large number of cols Large number of columns (n) would require a top level search of n in a vector of n children of root schema. Making this an O(n^2) operation. For 5000 columns with no selection, this took 52ms, more than the actual reading time. --- cpp/src/io/parquet/reader_impl.cu | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 820131c8133..154bdef9520 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -767,11 +767,18 @@ class aggregate_metadata { std::vector output_column_schemas; - for (auto& col : selected_columns) { - auto const& root = schema.front(); - auto const& top_level_col_schem_idx = find_schema_child(root, col.name); - build_column(&col, top_level_col_schem_idx, output_columns); - output_column_schemas.push_back(top_level_col_schem_idx.self_idx); + auto const& root = schema.front(); + if (use_names.empty()) { + for (auto const& schema_idx : root.children_idx) { + build_column(nullptr, get_schema(schema_idx), output_columns); + output_column_schemas.push_back(schema_idx); + } + } else { + for (auto& col : selected_columns) { + auto const& top_level_col_schem_idx = find_schema_child(root, col.name); + build_column(&col, top_level_col_schem_idx, output_columns); + output_column_schemas.push_back(top_level_col_schem_idx.self_idx); + } } return std::make_tuple( @@ -823,7 +830,7 @@ class aggregate_metadata { } else { // Load subset of columns; include PANDAS index unless excluded std::vector local_use_names = use_names; - if (include_index) { add_pandas_index_names(local_use_names); } + // if (include_index) { add_pandas_index_names(local_use_names); } for (const auto& use_name : local_use_names) { for (size_t schema_idx = 1; schema_idx < pfm.schema.size(); schema_idx++) { auto const& schema = pfm.schema[schema_idx]; From 4965097fe9e400d0bd30e1c0bc4c9c5e2c36c10f Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Mon, 2 Aug 2021 15:56:50 +0530 Subject: [PATCH 13/24] Cleanups: - Remove old code - Replace schema with get_schema --- cpp/src/io/parquet/reader_impl.cu | 312 +++++++++--------------------- 1 file changed, 90 insertions(+), 222 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 154bdef9520..cc4caf67b72 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -552,154 +552,32 @@ class aggregate_metadata { } /** - * @brief Build input and output column structures based on schema input. Recursive. + * @brief Filters and reduces down to a selection of columns * - * @param[in,out] schema_idx Schema index to build information for. This value gets - * incremented as the function recurses. - * @param[out] input_columns Input column information (source data in the file) - * @param[out] output_columns Output column structure (resulting cudf columns) - * @param[in,out] nesting A stack keeping track of child column indices so we can - * reproduce the linear list of output columns that correspond to an input column. - * @param[in] strings_to_categorical Type conversion parameter - * @param[in] timestamp_type_id Type conversion parameter - * @param[in] strict_decimal_types True if it is an error to load an unsupported decimal type + * @param use_names List of paths of column names to select + * @param include_index Whether to always include the PANDAS index column(s) + * @param strings_to_categorical Type conversion parameter + * @param timestamp_type_id Type conversion parameter + * @param strict_decimal_types Type conversion parameter * + * @return input column information, output column information, list of output column schema + * indices */ - void build_column_info(int& schema_idx, - std::vector& input_columns, - std::vector& output_columns, - std::deque& nesting, - bool strings_to_categorical, - type_id timestamp_type_id, - bool strict_decimal_types) const - { - int start_schema_idx = schema_idx; - auto const& schema = get_schema(schema_idx); - schema_idx++; - - // if I am a stub, continue on - if (schema.is_stub()) { - // is this legit? - CUDF_EXPECTS(schema.num_children == 1, "Unexpected number of children for stub"); - build_column_info(schema_idx, - input_columns, - output_columns, - nesting, - strings_to_categorical, - timestamp_type_id, - strict_decimal_types); - return; - } - - // if we're at the root, this is a new output column - nesting.push_back(static_cast(output_columns.size())); - auto const col_type = - to_type_id(schema, strings_to_categorical, timestamp_type_id, strict_decimal_types); - auto const dtype = col_type == type_id::DECIMAL32 || col_type == type_id::DECIMAL64 - ? data_type{col_type, numeric::scale_type{-schema.decimal_scale}} - : data_type{col_type}; - column_buffer& output_col = - output_columns.emplace_back(dtype, schema.repetition_type == OPTIONAL ? true : false); - output_col.name = schema.name; - - // build each child - for (int idx = 0; idx < schema.num_children; idx++) { - build_column_info(schema_idx, - input_columns, - output_col.children, - nesting, - strings_to_categorical, - timestamp_type_id, - strict_decimal_types); - } - - // if I have no children, we're at a leaf and I'm an input column (that is, one with actual - // data stored) so add me to the list. - if (schema.num_children == 0) { - input_column_info& input_col = - input_columns.emplace_back(input_column_info{start_schema_idx, schema.name}); - std::copy(nesting.begin(), nesting.end(), std::back_inserter(input_col.nesting)); - } - - nesting.pop_back(); - } - - auto select_columns2(std::vector> const& use_names, - bool include_index, - bool strings_to_categorical, - type_id timestamp_type_id, - bool strict_decimal_types) const + auto select_columns(std::vector> const& use_names, + bool include_index, + bool strings_to_categorical, + type_id timestamp_type_id, + bool strict_decimal_types) const { - std::vector selected_columns; - if (use_names.empty()) { - // select all columns - auto const& root = get_schema(0); - for (auto const& col_idx : root.children_idx) { - selected_columns.emplace_back(get_schema(col_idx).name); - } - } else { - if (include_index) { - std::vector index_names = get_pandas_index_names(); - std::transform(index_names.begin(), - index_names.end(), - std::back_inserter(selected_columns), - [](std::string name) { return column_name_info(name); }); - } - // Merge the vector use_names into a set of hierarchical column_name_info objects - /* This is because if we have columns like this: - * col1 - * / \ - * s3 f4 - * / \ - * f5 f6 - * - * there may be common paths in use_names like: - * {"col1", "s3", "f5"}, {"col1", "f4"} - * which means we want the output to contain - * col1 - * / \ - * s3 f4 - * / - * f5 - * - * rather than - * col1 col1 - * | | - * s3 f4 - * | - * f5 - */ - for (auto const& path : use_names) { - auto array_to_find_in = &selected_columns; - for (size_t depth = 0; depth < path.size(); ++depth) { - // Check if the path exists in our selected_columns and if not, add it. - auto const& name_to_find = path[depth]; - auto found_col = std::find_if( - array_to_find_in->begin(), - array_to_find_in->end(), - [&name_to_find](column_name_info const& col) { return col.name == name_to_find; }); - if (found_col == array_to_find_in->end()) { - auto& col = array_to_find_in->emplace_back(name_to_find); - array_to_find_in = &col.children; - } else { - // Path exists. go down further. - array_to_find_in = &found_col->children; - } - } - } - } - // TODO: Wherever we use this, should use get_schema() - auto const& schema = per_file_metadata[0].schema; - - auto find_schema_child = [&schema](SchemaElement const& schema_elem, - std::string const& name) -> SchemaElement const& { - auto const& col_schema_idx = - std::find_if(schema_elem.children_idx.begin(), - schema_elem.children_idx.end(), - [&](size_t col_schema_idx) { return schema[col_schema_idx].name == name; }); + auto find_schema_child = [&](SchemaElement const& schema_elem, + std::string const& name) -> SchemaElement const& { + auto const& col_schema_idx = std::find_if( + schema_elem.children_idx.begin(), + schema_elem.children_idx.end(), + [&](size_t col_schema_idx) { return get_schema(col_schema_idx).name == name; }); CUDF_EXPECTS(col_schema_idx != schema_elem.children_idx.end(), "Child \"" + name + "\" not found in \"" + schema_elem.name + "\""); - return schema[*col_schema_idx]; + return get_schema(*col_schema_idx); }; std::vector output_columns; @@ -720,15 +598,12 @@ class aggregate_metadata { // with child auto& child_schema = (col_name_info) ? find_schema_child(schema_elem, col_name_info->children[0].name) - : schema[schema_elem.children_idx[0]]; + : get_schema(schema_elem.children_idx[0]); build_column(child_col_name_info, child_schema, out_col_array); return; } // if we're at the root, this is a new output column - // TODO: This should be done after out_col_array.emplace_back to signify that we're storing - // the idx of the newly created outcol in nesting - nesting.push_back(static_cast(out_col_array.size())); auto const col_type = to_type_id(schema_elem, strings_to_categorical, timestamp_type_id, strict_decimal_types); auto const dtype = col_type == type_id::DECIMAL32 || col_type == type_id::DECIMAL64 @@ -737,6 +612,8 @@ class aggregate_metadata { column_buffer& output_col = out_col_array.emplace_back(dtype, schema_elem.repetition_type == OPTIONAL ? true : false); + // store the index of this newly inserted element + nesting.push_back(static_cast(out_col_array.size()) - 1); output_col.name = schema_elem.name; // build each child @@ -744,7 +621,7 @@ class aggregate_metadata { // add all children of schema_elem. // At this point, we can no longer pass a col_name_info to build_column for (int idx = 0; idx < schema_elem.num_children; idx++) { - build_column(nullptr, schema[schema_elem.children_idx[idx]], output_col.children); + build_column(nullptr, get_schema(schema_elem.children_idx[idx]), output_col.children); } } else { for (size_t idx = 0; idx < col_name_info->children.size(); idx++) { @@ -767,44 +644,6 @@ class aggregate_metadata { std::vector output_column_schemas; - auto const& root = schema.front(); - if (use_names.empty()) { - for (auto const& schema_idx : root.children_idx) { - build_column(nullptr, get_schema(schema_idx), output_columns); - output_column_schemas.push_back(schema_idx); - } - } else { - for (auto& col : selected_columns) { - auto const& top_level_col_schem_idx = find_schema_child(root, col.name); - build_column(&col, top_level_col_schem_idx, output_columns); - output_column_schemas.push_back(top_level_col_schem_idx.self_idx); - } - } - - return std::make_tuple( - std::move(input_columns), std::move(output_columns), std::move(output_column_schemas)); - } - - /** - * @brief Filters and reduces down to a selection of columns - * - * @param use_names List of column names to select - * @param include_index Whether to always include the PANDAS index column(s) - * @param strings_to_categorical Type conversion parameter - * @param timestamp_type_id Type conversion parameter - * - * @return input column information, output column information, list of output column schema - * indices - */ - auto select_columns(std::vector const& use_names, - bool include_index, - bool strings_to_categorical, - type_id timestamp_type_id, - bool strict_decimal_types) const - { - auto const& pfm = per_file_metadata[0]; - - // determine the list of output columns // // there is not necessarily a 1:1 mapping between input columns and output columns. // For example, parquet does not explicitly store a ColumnChunkDesc for struct columns. @@ -820,43 +659,72 @@ class aggregate_metadata { // "firstname", "middlename" and "lastname" represent the input columns in the file that we // process to produce the final cudf "name" column. // - std::vector output_column_schemas; - if (use_names.empty()) { // Will be `not use_names.has_value()` - // walk the schema and choose all top level columns - for (size_t schema_idx = 1; schema_idx < pfm.schema.size(); schema_idx++) { - auto const& schema = pfm.schema[schema_idx]; - if (schema.parent_idx == 0) { output_column_schemas.push_back(schema_idx); } + // A user can ask for a single field out of the struct e.g. firstname. + // In this case they'll pass a fully qualified name to the schema element like + // ["name", "firstname"] + // + auto const& root = get_schema(0); + if (use_names.empty()) { + for (auto const& schema_idx : root.children_idx) { + build_column(nullptr, get_schema(schema_idx), output_columns); + output_column_schemas.push_back(schema_idx); } } else { - // Load subset of columns; include PANDAS index unless excluded - std::vector local_use_names = use_names; - // if (include_index) { add_pandas_index_names(local_use_names); } - for (const auto& use_name : local_use_names) { - for (size_t schema_idx = 1; schema_idx < pfm.schema.size(); schema_idx++) { - auto const& schema = pfm.schema[schema_idx]; - // We select only top level columns by name. Selecting nested columns by name is not - // supported. Top level columns are identified by their parent being the root (idx == 0) - if (use_name == schema.name and schema.parent_idx == 0) { - output_column_schemas.push_back(schema_idx); + std::vector selected_columns; + if (include_index) { + std::vector index_names = get_pandas_index_names(); + std::transform(index_names.begin(), + index_names.end(), + std::back_inserter(selected_columns), + [](std::string name) { return column_name_info(name); }); + } + // Merge the vector use_names into a set of hierarchical column_name_info objects + /* This is because if we have columns like this: + * col1 + * / \ + * s3 f4 + * / \ + * f5 f6 + * + * there may be common paths in use_names like: + * {"col1", "s3", "f5"}, {"col1", "f4"} + * which means we want the output to contain + * col1 + * / \ + * s3 f4 + * / + * f5 + * + * rather than + * col1 col1 + * | | + * s3 f4 + * | + * f5 + */ + for (auto const& path : use_names) { + auto array_to_find_in = &selected_columns; + for (size_t depth = 0; depth < path.size(); ++depth) { + // Check if the path exists in our selected_columns and if not, add it. + auto const& name_to_find = path[depth]; + auto found_col = std::find_if( + array_to_find_in->begin(), + array_to_find_in->end(), + [&name_to_find](column_name_info const& col) { return col.name == name_to_find; }); + if (found_col == array_to_find_in->end()) { + auto& col = array_to_find_in->emplace_back(name_to_find); + array_to_find_in = &col.children; + } else { + // Path exists. go down further. + array_to_find_in = &found_col->children; } } } - } - - // construct input and output output column info - std::vector output_columns; - output_columns.reserve(output_column_schemas.size()); - std::vector input_columns; - std::deque nesting; - for (size_t idx = 0; idx < output_column_schemas.size(); idx++) { - int schema_index = output_column_schemas[idx]; - build_column_info(schema_index, - input_columns, - output_columns, - nesting, - strings_to_categorical, - timestamp_type_id, - strict_decimal_types); + for (auto& col : selected_columns) { + auto const& top_level_col_schema = find_schema_child(root, col.name); + build_column(&col, top_level_col_schema, output_columns); + output_column_schemas.push_back(top_level_col_schema.self_idx); + } } return std::make_tuple( @@ -1566,11 +1434,11 @@ reader::impl::impl(std::vector>&& sources, // Select only columns required by the options std::tie(_input_columns, _output_columns, _output_column_schemas) = - _metadata->select_columns2(options.get_columns(), - options.is_enabled_use_pandas_metadata(), - _strings_to_categorical, - _timestamp_type.id(), - _strict_decimal_types); + _metadata->select_columns(options.get_columns(), + options.is_enabled_use_pandas_metadata(), + _strings_to_categorical, + _timestamp_type.id(), + _strict_decimal_types); } table_with_metadata reader::impl::read(size_type skip_rows, From 75f318d29140112d3061a7d34c811af9df97510c Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Mon, 2 Aug 2021 23:56:45 +0530 Subject: [PATCH 14/24] Skip verifying list's child's name Because parquet spec is flexible about it --- cpp/src/io/parquet/reader_impl.cu | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index cc4caf67b72..9356ce9650e 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -575,6 +575,8 @@ class aggregate_metadata { schema_elem.children_idx.begin(), schema_elem.children_idx.end(), [&](size_t col_schema_idx) { return get_schema(col_schema_idx).name == name; }); + // fut: Maybe it'd be better if we could return the full path in the exception rather than + // just the parent CUDF_EXPECTS(col_schema_idx != schema_elem.children_idx.end(), "Child \"" + name + "\" not found in \"" + schema_elem.name + "\""); return get_schema(*col_schema_idx); @@ -593,13 +595,7 @@ class aggregate_metadata { // is this legit? CUDF_EXPECTS(schema_elem.num_children == 1, "Unexpected number of children for stub"); auto child_col_name_info = (col_name_info) ? &col_name_info->children[0] : nullptr; - - // if we still have a specified col_name_info at this level then verify if name matches - // with child - auto& child_schema = (col_name_info) - ? find_schema_child(schema_elem, col_name_info->children[0].name) - : get_schema(schema_elem.children_idx[0]); - build_column(child_col_name_info, child_schema, out_col_array); + build_column(child_col_name_info, get_schema(schema_elem.children_idx[0]), out_col_array); return; } From 4acd894a857dbd95dc5ec3f10d5325aa021b6edf Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Tue, 3 Aug 2021 14:49:21 +0530 Subject: [PATCH 15/24] Add pytests for select columns --- python/cudf/cudf/tests/test_parquet.py | 68 +++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 34609ee32f0..8ab0ffa4739 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1140,6 +1140,72 @@ def test_parquet_reader_struct_basic(tmpdir, data): assert expect.equals(got.to_arrow()) +def select_columns_params(): + dfs = [ + # struct + ( + [ + {"a": 1, "b": 2}, + {"a": 10, "b": 20}, + {"a": None, "b": 22}, + {"a": None, "b": None}, + {"a": 15, "b": None}, + ], + [["struct"], ["struct.a"], ["struct.b"]], + ), + # struct-of-list + ( + [ + {"a": 1, "b": 2, "c": [1, 2, 3]}, + {"a": 10, "b": 20, "c": [4, 5]}, + {"a": None, "b": 22, "c": [6]}, + {"a": None, "b": None, "c": None}, + {"a": 15, "b": None, "c": [-1, -2]}, + None, + {"a": 100, "b": 200, "c": [-10, None, -20]}, + ], + [ + ["struct"], + ["struct.c"], + ["struct.c.list"], + ["struct.c.list.item"], + ["struct.b", "struct.c"], + ], + ), + # list-of-struct + ( + [ + [{"a": 1, "b": 2}, {"a": 2, "b": 3}, {"a": 4, "b": 5}], + None, + [{"a": 10, "b": 20}], + [{"a": 100, "b": 200}, {"a": None, "b": 300}, None], + ], + [ + ["struct"], + ["struct.list"], + ["struct.list.item"], + ["struct.list.item.a", "struct.list.item.b"], + ], + ), + ] + for df_col_pair in dfs: + for cols in df_col_pair[1]: + yield df_col_pair[0], cols + + +@pytest.mark.parametrize("data, columns", select_columns_params()) +def test_parquet_reader_struct_select_columns1(tmpdir, data, columns): + table = pa.Table.from_pydict({"struct": data}) + fname = tmpdir.join("test_parquet_reader_struct_basic.parquet") + + pa.parquet.write_table(table, fname) + assert os.path.exists(fname) + + expect = pq.ParquetFile(fname).read(columns=columns) + got = cudf.read_parquet(fname, columns=columns) + assert expect.equals(got.to_arrow()) + + def test_parquet_reader_struct_los_large(tmpdir): num_rows = 256 list_size = 64 @@ -1862,7 +1928,7 @@ def test_parquet_writer_list_statistics(tmpdir): # List of Structs { "family": [ - [None, {"human?": True, "deets": {"weight": 2.4, "age": 27}},], + [None, {"human?": True, "deets": {"weight": 2.4, "age": 27}}], [ {"human?": None, "deets": {"weight": 5.3, "age": 25}}, {"human?": False, "deets": {"weight": 8.0, "age": 31}}, From 245f2cc77945cc98eae28fc77f9a62da1afa26c1 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Tue, 3 Aug 2021 21:58:48 +0530 Subject: [PATCH 16/24] Fix arrow source test with new API --- cpp/tests/io/arrow_io_source_test.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/tests/io/arrow_io_source_test.cpp b/cpp/tests/io/arrow_io_source_test.cpp index 24964db5f8c..5d0df496dcf 100644 --- a/cpp/tests/io/arrow_io_source_test.cpp +++ b/cpp/tests/io/arrow_io_source_test.cpp @@ -71,8 +71,8 @@ TEST_F(ArrowIOTest, S3FileSystem) // Populate the Parquet Reader Options cudf::io::source_info src(datasource.get()); - std::vector single_column; - single_column.insert(single_column.begin(), "total_bill"); + std::vector> single_column; + single_column.insert(single_column.begin(), {"total_bill"}); cudf::io::parquet_reader_options_builder builder(src); cudf::io::parquet_reader_options options = builder.columns(single_column).build(); From 2b7c1811aeae27c78a8268c2430e56fd8babd1a8 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Thu, 5 Aug 2021 02:01:36 +0530 Subject: [PATCH 17/24] Use schema idx instead of SchemaElement& --- cpp/src/io/parquet/parquet.cpp | 1 - cpp/src/io/parquet/parquet.hpp | 1 - cpp/src/io/parquet/reader_impl.cu | 31 +++++++++++++++++-------------- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/cpp/src/io/parquet/parquet.cpp b/cpp/src/io/parquet/parquet.cpp index 16e7471d8f1..c8c54e9933f 100644 --- a/cpp/src/io/parquet/parquet.cpp +++ b/cpp/src/io/parquet/parquet.cpp @@ -342,7 +342,6 @@ int CompactProtocolReader::WalkSchema( e->max_definition_level = max_def_level; e->max_repetition_level = max_rep_level; e->parent_idx = parent_idx; - e->self_idx = idx; parent_idx = idx; ++idx; diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index 67c5ada1296..4390d1c788f 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -165,7 +165,6 @@ struct SchemaElement { int max_definition_level = 0; int max_repetition_level = 0; int parent_idx = 0; - int self_idx = 0; std::vector children_idx; bool operator==(SchemaElement const& other) const diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 9356ce9650e..71e971c67de 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -569,8 +569,7 @@ class aggregate_metadata { type_id timestamp_type_id, bool strict_decimal_types) const { - auto find_schema_child = [&](SchemaElement const& schema_elem, - std::string const& name) -> SchemaElement const& { + auto find_schema_child = [&](SchemaElement const& schema_elem, std::string const& name) { auto const& col_schema_idx = std::find_if( schema_elem.children_idx.begin(), schema_elem.children_idx.end(), @@ -579,23 +578,27 @@ class aggregate_metadata { // just the parent CUDF_EXPECTS(col_schema_idx != schema_elem.children_idx.end(), "Child \"" + name + "\" not found in \"" + schema_elem.name + "\""); - return get_schema(*col_schema_idx); + return (col_schema_idx != schema_elem.children_idx.end()) ? static_cast(*col_schema_idx) + : -1; }; std::vector output_columns; std::vector input_columns; std::deque nesting; - std::function&)> - build_column = [&](column_name_info const* col_name_info, - SchemaElement const& schema_elem, - std::vector& out_col_array) { + std::function&)> build_column = + [&](column_name_info const* col_name_info, + int schema_idx, + std::vector& out_col_array) { + if (schema_idx < 0) { return; } + auto const& schema_elem = get_schema(schema_idx); + // if I am a stub, continue on if (schema_elem.is_stub()) { // is this legit? CUDF_EXPECTS(schema_elem.num_children == 1, "Unexpected number of children for stub"); auto child_col_name_info = (col_name_info) ? &col_name_info->children[0] : nullptr; - build_column(child_col_name_info, get_schema(schema_elem.children_idx[0]), out_col_array); + build_column(child_col_name_info, schema_elem.children_idx[0], out_col_array); return; } @@ -617,7 +620,7 @@ class aggregate_metadata { // add all children of schema_elem. // At this point, we can no longer pass a col_name_info to build_column for (int idx = 0; idx < schema_elem.num_children; idx++) { - build_column(nullptr, get_schema(schema_elem.children_idx[idx]), output_col.children); + build_column(nullptr, schema_elem.children_idx[idx], output_col.children); } } else { for (size_t idx = 0; idx < col_name_info->children.size(); idx++) { @@ -631,7 +634,7 @@ class aggregate_metadata { // data stored) so add me to the list. if (schema_elem.num_children == 0) { input_column_info& input_col = - input_columns.emplace_back(input_column_info{schema_elem.self_idx, schema_elem.name}); + input_columns.emplace_back(input_column_info{schema_idx, schema_elem.name}); std::copy(nesting.begin(), nesting.end(), std::back_inserter(input_col.nesting)); } @@ -662,7 +665,7 @@ class aggregate_metadata { auto const& root = get_schema(0); if (use_names.empty()) { for (auto const& schema_idx : root.children_idx) { - build_column(nullptr, get_schema(schema_idx), output_columns); + build_column(nullptr, schema_idx, output_columns); output_column_schemas.push_back(schema_idx); } } else { @@ -717,9 +720,9 @@ class aggregate_metadata { } } for (auto& col : selected_columns) { - auto const& top_level_col_schema = find_schema_child(root, col.name); - build_column(&col, top_level_col_schema, output_columns); - output_column_schemas.push_back(top_level_col_schema.self_idx); + auto const& top_level_col_schema_idx = find_schema_child(root, col.name); + build_column(&col, top_level_col_schema_idx, output_columns); + output_column_schemas.push_back(top_level_col_schema_idx); } } From ba6def8cd1af67d35dc9504c2808ec80e60fe53d Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Thu, 5 Aug 2021 17:27:51 +0530 Subject: [PATCH 18/24] Allow invalid paths to be passed --- cpp/src/io/parquet/reader_impl.cu | 41 +++++++++++++++----------- python/cudf/cudf/_lib/parquet.pyx | 2 +- python/cudf/cudf/tests/test_parquet.py | 4 ++- 3 files changed, 27 insertions(+), 20 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 71e971c67de..b959a08ae9e 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -574,10 +574,7 @@ class aggregate_metadata { schema_elem.children_idx.begin(), schema_elem.children_idx.end(), [&](size_t col_schema_idx) { return get_schema(col_schema_idx).name == name; }); - // fut: Maybe it'd be better if we could return the full path in the exception rather than - // just the parent - CUDF_EXPECTS(col_schema_idx != schema_elem.children_idx.end(), - "Child \"" + name + "\" not found in \"" + schema_elem.name + "\""); + return (col_schema_idx != schema_elem.children_idx.end()) ? static_cast(*col_schema_idx) : -1; }; @@ -586,11 +583,14 @@ class aggregate_metadata { std::vector input_columns; std::deque nesting; - std::function&)> build_column = + // Return true if column path is valid. e.g. if the path is {"struct1", "child1"}, then it is + // valid if "struct1.child1" exists in this file's schema. If "struct1" exists but "child1" is + // not a child of "struct1" then the function will return false for "struct1" + std::function&)> build_column = [&](column_name_info const* col_name_info, int schema_idx, std::vector& out_col_array) { - if (schema_idx < 0) { return; } + if (schema_idx < 0) { return false; } auto const& schema_elem = get_schema(schema_idx); // if I am a stub, continue on @@ -598,8 +598,7 @@ class aggregate_metadata { // is this legit? CUDF_EXPECTS(schema_elem.num_children == 1, "Unexpected number of children for stub"); auto child_col_name_info = (col_name_info) ? &col_name_info->children[0] : nullptr; - build_column(child_col_name_info, schema_elem.children_idx[0], out_col_array); - return; + return build_column(child_col_name_info, schema_elem.children_idx[0], out_col_array); } // if we're at the root, this is a new output column @@ -609,24 +608,26 @@ class aggregate_metadata { ? data_type{col_type, numeric::scale_type{-schema_elem.decimal_scale}} : data_type{col_type}; - column_buffer& output_col = - out_col_array.emplace_back(dtype, schema_elem.repetition_type == OPTIONAL ? true : false); - // store the index of this newly inserted element - nesting.push_back(static_cast(out_col_array.size()) - 1); + column_buffer output_col(dtype, schema_elem.repetition_type == OPTIONAL ? true : false); + // store the index of this element if inserted in out_col_array + nesting.push_back(static_cast(out_col_array.size())); output_col.name = schema_elem.name; // build each child + bool path_is_valid = false; if (col_name_info == nullptr or col_name_info->children.empty()) { // add all children of schema_elem. // At this point, we can no longer pass a col_name_info to build_column for (int idx = 0; idx < schema_elem.num_children; idx++) { - build_column(nullptr, schema_elem.children_idx[idx], output_col.children); + path_is_valid |= + build_column(nullptr, schema_elem.children_idx[idx], output_col.children); } } else { for (size_t idx = 0; idx < col_name_info->children.size(); idx++) { - build_column(&col_name_info->children[idx], - find_schema_child(schema_elem, col_name_info->children[idx].name), - output_col.children); + path_is_valid |= + build_column(&col_name_info->children[idx], + find_schema_child(schema_elem, col_name_info->children[idx].name), + output_col.children); } } @@ -636,9 +637,13 @@ class aggregate_metadata { input_column_info& input_col = input_columns.emplace_back(input_column_info{schema_idx, schema_elem.name}); std::copy(nesting.begin(), nesting.end(), std::back_inserter(input_col.nesting)); + path_is_valid = true; // If we're able to reach leaf then path is valid } + if (path_is_valid) { out_col_array.push_back(std::move(output_col)); } + nesting.pop_back(); + return path_is_valid; }; std::vector output_column_schemas; @@ -721,8 +726,8 @@ class aggregate_metadata { } for (auto& col : selected_columns) { auto const& top_level_col_schema_idx = find_schema_child(root, col.name); - build_column(&col, top_level_col_schema_idx, output_columns); - output_column_schemas.push_back(top_level_col_schema_idx); + bool valid_column = build_column(&col, top_level_col_schema_idx, output_columns); + if (valid_column) output_column_schemas.push_back(top_level_col_schema_idx); } } diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 5c0fa5f62bd..cee334fd6e7 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -131,7 +131,7 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, ) if columns is not None: - columns = [c.split(".") for c in columns] + columns = [c.split(".") if c else [] for c in columns] cpp_columns.reserve(len(columns)) for col in columns or []: cpp_columns.push_back([]) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 8ab0ffa4739..5e3ca45bb79 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1151,7 +1151,7 @@ def select_columns_params(): {"a": None, "b": None}, {"a": 15, "b": None}, ], - [["struct"], ["struct.a"], ["struct.b"]], + [["struct"], ["struct.a"], ["struct.b"], ["c"]], ), # struct-of-list ( @@ -1170,6 +1170,7 @@ def select_columns_params(): ["struct.c.list"], ["struct.c.list.item"], ["struct.b", "struct.c"], + ["struct.b", "struct.d", "struct.c"], ], ), # list-of-struct @@ -1185,6 +1186,7 @@ def select_columns_params(): ["struct.list"], ["struct.list.item"], ["struct.list.item.a", "struct.list.item.b"], + ["struct.list.item.c"], ], ), ] From 0627343ff23ab5b4e46e322a4509a5370e1fc965 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Thu, 12 Aug 2021 03:21:51 +0530 Subject: [PATCH 19/24] Review fix for pytest --- python/cudf/cudf/tests/test_parquet.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 5e3ca45bb79..123cc652e1f 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1198,13 +1198,12 @@ def select_columns_params(): @pytest.mark.parametrize("data, columns", select_columns_params()) def test_parquet_reader_struct_select_columns1(tmpdir, data, columns): table = pa.Table.from_pydict({"struct": data}) - fname = tmpdir.join("test_parquet_reader_struct_basic.parquet") + buff = BytesIO() - pa.parquet.write_table(table, fname) - assert os.path.exists(fname) + pa.parquet.write_table(table, buff) - expect = pq.ParquetFile(fname).read(columns=columns) - got = cudf.read_parquet(fname, columns=columns) + expect = pq.ParquetFile(buff).read(columns=columns) + got = cudf.read_parquet(buff, columns=columns) assert expect.equals(got.to_arrow()) From ef0b5754a3ee60f23d45ecee94d28dfdc783f31b Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Fri, 13 Aug 2021 00:08:24 +0530 Subject: [PATCH 20/24] Review cpp fixes --- cpp/src/io/parquet/reader_impl.cu | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index b959a08ae9e..dd45ed0a55b 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -571,8 +571,8 @@ class aggregate_metadata { { auto find_schema_child = [&](SchemaElement const& schema_elem, std::string const& name) { auto const& col_schema_idx = std::find_if( - schema_elem.children_idx.begin(), - schema_elem.children_idx.end(), + schema_elem.children_idx.cbegin(), + schema_elem.children_idx.cend(), [&](size_t col_schema_idx) { return get_schema(col_schema_idx).name == name; }); return (col_schema_idx != schema_elem.children_idx.end()) ? static_cast(*col_schema_idx) @@ -608,7 +608,7 @@ class aggregate_metadata { ? data_type{col_type, numeric::scale_type{-schema_elem.decimal_scale}} : data_type{col_type}; - column_buffer output_col(dtype, schema_elem.repetition_type == OPTIONAL ? true : false); + column_buffer output_col(dtype, schema_elem.repetition_type == OPTIONAL); // store the index of this element if inserted in out_col_array nesting.push_back(static_cast(out_col_array.size())); output_col.name = schema_elem.name; @@ -636,7 +636,7 @@ class aggregate_metadata { if (schema_elem.num_children == 0) { input_column_info& input_col = input_columns.emplace_back(input_column_info{schema_idx, schema_elem.name}); - std::copy(nesting.begin(), nesting.end(), std::back_inserter(input_col.nesting)); + std::copy(nesting.cbegin(), nesting.cend(), std::back_inserter(input_col.nesting)); path_is_valid = true; // If we're able to reach leaf then path is valid } @@ -677,10 +677,10 @@ class aggregate_metadata { std::vector selected_columns; if (include_index) { std::vector index_names = get_pandas_index_names(); - std::transform(index_names.begin(), - index_names.end(), + std::transform(index_names.cbegin(), + index_names.cend(), std::back_inserter(selected_columns), - [](std::string name) { return column_name_info(name); }); + [](std::string const& name) { return column_name_info(name); }); } // Merge the vector use_names into a set of hierarchical column_name_info objects /* This is because if we have columns like this: From 9c5c5a76f4be3229c67878fcf81198f6022c5776 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 18 Aug 2021 00:03:25 +0530 Subject: [PATCH 21/24] Change the logic so that . can be in col name --- cpp/src/io/parquet/reader_impl.cu | 58 ++++++++++++++++++++++++++ python/cudf/cudf/tests/test_parquet.py | 13 +++++- 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index dd45ed0a55b..d4e9315f1d3 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -674,6 +674,64 @@ class aggregate_metadata { output_column_schemas.push_back(schema_idx); } } else { + // Convert schema into a vector of every possible path + std::vector> all_paths; + std::function add_path = [&](std::string path_till_now, + int schema_idx) { + auto const& schema_elem = get_schema(schema_idx); + std::string curr_path = path_till_now + schema_elem.name; + all_paths.emplace_back(curr_path, schema_idx); + for (auto const& child_idx : schema_elem.children_idx) { + add_path(curr_path + ".", child_idx); + } + }; + for (auto const& child_idx : get_schema(0).children_idx) { + add_path("", child_idx); + } + + // Find which of the selected paths are valid and get their schema index + + // TODO: converting vec into . separated str. Change API to take str + std::vector use_names2; + std::transform(use_names.begin(), + use_names.end(), + std::back_inserter(use_names2), + [](std::vector vec_path) -> std::string { + return std::accumulate(std::next(vec_path.begin()), + vec_path.end(), + vec_path[0], + [](std::string cumulate, std::string node) { + return cumulate + "." + node; + }); + }); + + std::vector> valid_selected_paths; + for (auto const& selected_path : use_names2) { + auto found_path = std::find_if( + all_paths.begin(), all_paths.end(), [&](std::pair& valid_path) { + return valid_path.first == selected_path; + }); + if (found_path != all_paths.end()) { + valid_selected_paths.emplace_back(selected_path, found_path->second); + } + } + + // Now construct paths as vector of strings for further consumption + std::vector> use_names3; + std::transform(valid_selected_paths.begin(), + valid_selected_paths.end(), + std::back_inserter(use_names3), + [&](std::pair const& valid_path) { + auto schema_idx = valid_path.second; + std::vector result_path; + do { + SchemaElement const& elem = get_schema(schema_idx); + result_path.push_back(elem.name); + schema_idx = elem.parent_idx; + } while (schema_idx > 0); + return std::vector(result_path.rbegin(), result_path.rend()); + }); + std::vector selected_columns; if (include_index) { std::vector index_names = get_pandas_index_names(); diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 123cc652e1f..e4a61a2a37e 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1189,6 +1189,17 @@ def select_columns_params(): ["struct.list.item.c"], ], ), + # struct with "." in field names + ( + [ + {"a.b": 1, "b.a": 2}, + {"a.b": 10, "b.a": 20}, + {"a.b": None, "b.a": 22}, + {"a.b": None, "b.a": None}, + {"a.b": 15, "b.a": None}, + ], + [["struct"], ["struct.a"], ["struct.b.a"]], + ), ] for df_col_pair in dfs: for cols in df_col_pair[1]: @@ -1196,7 +1207,7 @@ def select_columns_params(): @pytest.mark.parametrize("data, columns", select_columns_params()) -def test_parquet_reader_struct_select_columns1(tmpdir, data, columns): +def test_parquet_reader_struct_select_columns(tmpdir, data, columns): table = pa.Table.from_pydict({"struct": data}) buff = BytesIO() From 1e4427e72bfeeff12d41ade83754179409d4ac55 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 18 Aug 2021 01:31:58 +0530 Subject: [PATCH 22/24] Revert select columns API to single str --- cpp/include/cudf/io/parquet.hpp | 12 ++++-------- cpp/src/io/parquet/reader_impl.cu | 21 +++----------------- cpp/tests/io/arrow_io_source_test.cpp | 4 ++-- cpp/tests/io/parquet_test.cpp | 28 ++++++++++++--------------- 4 files changed, 21 insertions(+), 44 deletions(-) diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index 843131e379e..031228ae6de 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -51,8 +51,7 @@ class parquet_reader_options { source_info _source; // Path in schema of column to read; empty is all - // TODO: A more descriptive doc that mentions how things work when using path to field of nested - std::vector> _columns; + std::vector _columns; // List of individual row groups to read (ignored if empty) std::vector> _row_groups; @@ -126,7 +125,7 @@ class parquet_reader_options { /** * @brief Returns names of column to be read. */ - std::vector> const& get_columns() const { return _columns; } + std::vector const& get_columns() const { return _columns; } /** * @brief Returns list of individual row groups to be read. @@ -149,10 +148,7 @@ class parquet_reader_options { * * @param col_names Vector of column names. */ - void set_columns(std::vector> col_names) - { - _columns = std::move(col_names); - } + void set_columns(std::vector col_names) { _columns = std::move(col_names); } /** * @brief Sets vector of individual row groups to read. @@ -250,7 +246,7 @@ class parquet_reader_options_builder { * @param col_names Vector of column names. * @return this for chaining. */ - parquet_reader_options_builder& columns(std::vector> col_names) + parquet_reader_options_builder& columns(std::vector col_names) { options._columns = std::move(col_names); return *this; diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index d4e9315f1d3..f264064fe61 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -563,7 +563,7 @@ class aggregate_metadata { * @return input column information, output column information, list of output column schema * indices */ - auto select_columns(std::vector> const& use_names, + auto select_columns(std::vector const& use_names, bool include_index, bool strings_to_categorical, type_id timestamp_type_id, @@ -690,23 +690,8 @@ class aggregate_metadata { } // Find which of the selected paths are valid and get their schema index - - // TODO: converting vec into . separated str. Change API to take str - std::vector use_names2; - std::transform(use_names.begin(), - use_names.end(), - std::back_inserter(use_names2), - [](std::vector vec_path) -> std::string { - return std::accumulate(std::next(vec_path.begin()), - vec_path.end(), - vec_path[0], - [](std::string cumulate, std::string node) { - return cumulate + "." + node; - }); - }); - std::vector> valid_selected_paths; - for (auto const& selected_path : use_names2) { + for (auto const& selected_path : use_names) { auto found_path = std::find_if( all_paths.begin(), all_paths.end(), [&](std::pair& valid_path) { return valid_path.first == selected_path; @@ -764,7 +749,7 @@ class aggregate_metadata { * | * f5 */ - for (auto const& path : use_names) { + for (auto const& path : use_names3) { auto array_to_find_in = &selected_columns; for (size_t depth = 0; depth < path.size(); ++depth) { // Check if the path exists in our selected_columns and if not, add it. diff --git a/cpp/tests/io/arrow_io_source_test.cpp b/cpp/tests/io/arrow_io_source_test.cpp index 5d0df496dcf..24964db5f8c 100644 --- a/cpp/tests/io/arrow_io_source_test.cpp +++ b/cpp/tests/io/arrow_io_source_test.cpp @@ -71,8 +71,8 @@ TEST_F(ArrowIOTest, S3FileSystem) // Populate the Parquet Reader Options cudf::io::source_info src(datasource.get()); - std::vector> single_column; - single_column.insert(single_column.begin(), {"total_bill"}); + std::vector single_column; + single_column.insert(single_column.begin(), "total_bill"); cudf::io::parquet_reader_options_builder builder(src); cudf::io::parquet_reader_options options = builder.columns(single_column).build(); diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index 48b4a383b9d..3c323c9bcde 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -827,7 +827,7 @@ TEST_F(ParquetWriterTest, MultiIndex) cudf_io::parquet_reader_options in_opts = cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}) .use_pandas_metadata(true) - .columns({{"int32s"}, {"floats"}, {"doubles"}}); + .columns({"int32s", "floats", "doubles"}); auto result = cudf_io::read_parquet(in_opts); CUDF_TEST_EXPECT_TABLES_EQUAL(expected->view(), result.tbl->view()); @@ -2408,8 +2408,7 @@ TEST_F(ParquetReaderTest, ReorderedColumns) // read them out of order cudf_io::parquet_reader_options read_opts = - cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}) - .columns({{"b"}, {"a"}}); + cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}).columns({"b", "a"}); auto result = cudf_io::read_parquet(read_opts); cudf::test::expect_columns_equal(result.tbl->view().column(0), b); @@ -2431,8 +2430,7 @@ TEST_F(ParquetReaderTest, ReorderedColumns) // read them out of order cudf_io::parquet_reader_options read_opts = - cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}) - .columns({{"b"}, {"a"}}); + cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}).columns({"b", "a"}); auto result = cudf_io::read_parquet(read_opts); cudf::test::expect_columns_equal(result.tbl->view().column(0), b); @@ -2461,7 +2459,7 @@ TEST_F(ParquetReaderTest, ReorderedColumns) // read them out of order cudf_io::parquet_reader_options read_opts = cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}) - .columns({{"d"}, {"a"}, {"b"}, {"c"}}); + .columns({"d", "a", "b", "c"}); auto result = cudf_io::read_parquet(read_opts); cudf::test::expect_columns_equal(result.tbl->view().column(0), d); @@ -2474,7 +2472,7 @@ TEST_F(ParquetReaderTest, ReorderedColumns) // read them out of order cudf_io::parquet_reader_options read_opts = cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}) - .columns({{"c"}, {"d"}, {"a"}, {"b"}}); + .columns({"c", "d", "a", "b"}); auto result = cudf_io::read_parquet(read_opts); cudf::test::expect_columns_equal(result.tbl->view().column(0), c); @@ -2487,7 +2485,7 @@ TEST_F(ParquetReaderTest, ReorderedColumns) // read them out of order cudf_io::parquet_reader_options read_opts = cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}) - .columns({{"d"}, {"c"}, {"b"}, {"a"}}); + .columns({"d", "c", "b", "a"}); auto result = cudf_io::read_parquet(read_opts); cudf::test::expect_columns_equal(result.tbl->view().column(0), d); @@ -2538,7 +2536,7 @@ TEST_F(ParquetReaderTest, SelectNestedColumn) { // Test selecting a single leaf from the table cudf_io::parquet_reader_options read_args = cudf_io::parquet_reader_options::builder(cudf_io::source_info(filepath)) - .columns({{"being", "particulars", "age"}}); + .columns({"being.particulars.age"}); const auto result = cudf_io::read_parquet(read_args); auto expect_ages_col = cudf::test::fixed_width_column_wrapper{ @@ -2560,7 +2558,7 @@ TEST_F(ParquetReaderTest, SelectNestedColumn) { // Test selecting a non-leaf and expecting all hierarchy from that node onwards cudf_io::parquet_reader_options read_args = cudf_io::parquet_reader_options::builder(cudf_io::source_info(filepath)) - .columns({{"being", "particulars"}}); + .columns({"being.particulars"}); const auto result = cudf_io::read_parquet(read_args); auto expected_weights_col = @@ -2589,9 +2587,7 @@ TEST_F(ParquetReaderTest, SelectNestedColumn) { // Test selecting struct children out of order cudf_io::parquet_reader_options read_args = cudf_io::parquet_reader_options::builder(cudf_io::source_info(filepath)) - .columns({{"being", "particulars", "age"}, - {"being", "particulars", "weight"}, - {"being", "human?"}}); + .columns({"being.particulars.age", "being.particulars.weight", "being.human?"}); const auto result = cudf_io::read_parquet(read_args); auto expected_weights_col = @@ -2842,7 +2838,7 @@ TEST_F(ParquetReaderTest, DecimalRead) cudf_io::parquet_reader_options read_strict_opts = read_opts; read_strict_opts.set_strict_decimal_types(true); - read_strict_opts.set_columns({{"dec7p4"}, {"dec14p5"}}); + read_strict_opts.set_columns({"dec7p4", "dec14p5"}); EXPECT_NO_THROW(cudf_io::read_parquet(read_strict_opts)); } { @@ -2940,7 +2936,7 @@ TEST_F(ParquetReaderTest, DecimalRead) cudf_io::parquet_reader_options::builder(cudf_io::source_info{ reinterpret_cast(fixed_len_bytes_decimal_parquet), parquet_len}); read_opts.set_strict_decimal_types(true); - read_opts.set_columns({{"dec7p3"}, {"dec12p11"}}); + read_opts.set_columns({"dec7p3", "dec12p11"}); auto result = cudf_io::read_parquet(read_opts); EXPECT_EQ(result.tbl->view().num_columns(), 2); @@ -2985,7 +2981,7 @@ TEST_F(ParquetReaderTest, DecimalRead) std::begin(col1_data), std::end(col1_data), validity_c1, numeric::scale_type{-11}); cudf::test::expect_columns_equal(result.tbl->view().column(1), col1); - read_opts.set_columns({{"dec20p1"}}); + read_opts.set_columns({"dec20p1"}); EXPECT_THROW(cudf_io::read_parquet(read_opts), cudf::logic_error); } } From fa58f2c001b5a594fd71b8d9d8f33443560656f2 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 18 Aug 2021 01:37:44 +0530 Subject: [PATCH 23/24] Revert API in python --- python/cudf/cudf/_lib/cpp/io/parquet.pxd | 6 +++--- python/cudf/cudf/_lib/parquet.pyx | 8 ++------ 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/python/cudf/cudf/_lib/cpp/io/parquet.pxd b/python/cudf/cudf/_lib/cpp/io/parquet.pxd index 12d32b9c6bd..e2053f8ce4f 100644 --- a/python/cudf/cudf/_lib/cpp/io/parquet.pxd +++ b/python/cudf/cudf/_lib/cpp/io/parquet.pxd @@ -16,7 +16,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: cdef cppclass parquet_reader_options: parquet_reader_options() except + cudf_io_types.source_info get_source_info() except + - vector[vector[string]] get_columns() except + + vector[string] get_columns() except + vector[vector[size_type]] get_row_groups() except + data_type get_timestamp_type() except + bool is_enabled_convert_strings_to_categories() except + @@ -26,7 +26,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: # setter - void set_columns(vector[vector[string]] col_names) except + + void set_columns(vector[string] col_names) except + void set_row_groups(vector[vector[size_type]] row_grp) except + void enable_convert_strings_to_categories(bool val) except + void enable_use_pandas_metadata(bool val) except + @@ -45,7 +45,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: cudf_io_types.source_info src ) except + parquet_reader_options_builder& columns( - vector[vector[string]] col_names + vector[string] col_names ) except + parquet_reader_options_builder& row_groups( vector[vector[size_type]] row_grp diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index d50221fbbf2..471aa3107d9 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -120,7 +120,7 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, cdef cudf_io_types.source_info source = make_source_info( filepaths_or_buffers) - cdef vector[vector[string]] cpp_columns + cdef vector[string] cpp_columns cdef bool cpp_strings_to_categorical = strings_to_categorical cdef bool cpp_use_pandas_metadata = use_pandas_metadata cdef size_type cpp_skiprows = skiprows if skiprows is not None else 0 @@ -131,13 +131,9 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, ) if columns is not None: - columns = [c.split(".") if c else [] for c in columns] cpp_columns.reserve(len(columns)) for col in columns or []: - cpp_columns.push_back([]) - for path in col: - cpp_columns.back().push_back(str(path).encode()) - + cpp_columns.push_back(str(col).encode()) if row_groups is not None: cpp_row_groups = row_groups From 1c9ab5b0a6d53bc6b2e1050a189f3c3e81c5b262 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Thu, 19 Aug 2021 00:10:30 +0530 Subject: [PATCH 24/24] Review fixes --- cpp/src/io/parquet/reader_impl.cu | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index f264064fe61..caf11b66206 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -581,7 +581,7 @@ class aggregate_metadata { std::vector output_columns; std::vector input_columns; - std::deque nesting; + std::vector nesting; // Return true if column path is valid. e.g. if the path is {"struct1", "child1"}, then it is // valid if "struct1.child1" exists in this file's schema. If "struct1" exists but "child1" is @@ -593,7 +593,8 @@ class aggregate_metadata { if (schema_idx < 0) { return false; } auto const& schema_elem = get_schema(schema_idx); - // if I am a stub, continue on + // if schema_elem is a stub then it does not exist in the column_name_info and column_buffer + // hierarchy. So continue on if (schema_elem.is_stub()) { // is this legit? CUDF_EXPECTS(schema_elem.num_children == 1, "Unexpected number of children for stub"); @@ -674,13 +675,18 @@ class aggregate_metadata { output_column_schemas.push_back(schema_idx); } } else { + struct path_info { + std::string full_path; + int schema_idx; + }; + // Convert schema into a vector of every possible path - std::vector> all_paths; + std::vector all_paths; std::function add_path = [&](std::string path_till_now, int schema_idx) { auto const& schema_elem = get_schema(schema_idx); std::string curr_path = path_till_now + schema_elem.name; - all_paths.emplace_back(curr_path, schema_idx); + all_paths.push_back({curr_path, schema_idx}); for (auto const& child_idx : schema_elem.children_idx) { add_path(curr_path + ".", child_idx); } @@ -690,14 +696,14 @@ class aggregate_metadata { } // Find which of the selected paths are valid and get their schema index - std::vector> valid_selected_paths; + std::vector valid_selected_paths; for (auto const& selected_path : use_names) { - auto found_path = std::find_if( - all_paths.begin(), all_paths.end(), [&](std::pair& valid_path) { - return valid_path.first == selected_path; + auto found_path = + std::find_if(all_paths.begin(), all_paths.end(), [&](path_info& valid_path) { + return valid_path.full_path == selected_path; }); if (found_path != all_paths.end()) { - valid_selected_paths.emplace_back(selected_path, found_path->second); + valid_selected_paths.push_back({selected_path, found_path->schema_idx}); } } @@ -706,8 +712,8 @@ class aggregate_metadata { std::transform(valid_selected_paths.begin(), valid_selected_paths.end(), std::back_inserter(use_names3), - [&](std::pair const& valid_path) { - auto schema_idx = valid_path.second; + [&](path_info const& valid_path) { + auto schema_idx = valid_path.schema_idx; std::vector result_path; do { SchemaElement const& elem = get_schema(schema_idx);