From 80bf8d9f2fa2719d20e6d7d4c8d12ea3448324cf Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Thu, 19 Aug 2021 14:42:28 +0530 Subject: [PATCH] Add nested column selection to parquet reader (#8933) Closes #8850 Adds ability to select specific children of a nested column. The python API mimics pyarrow and the format is ```python cudf.read_parquet("test.parquet", columns=["struct1.child1.grandchild2", "struct1.child2"]) ``` The C++ API takes each path as a vector ```c++ cudf::io::parquet_reader_options read_args = cudf::io::parquet_reader_options::builder(cudf::io::source_info(filepath)) .columns({{"struct1", "child1", "grandchild2"}, {"struct1", "child2"}}); ``` Authors: - Devavret Makkar (https://github.com/devavret) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) - Vukasin Milovanovic (https://github.com/vuule) - Christopher Harris (https://github.com/cwharris) URL: https://github.com/rapidsai/cudf/pull/8933 --- cpp/include/cudf/io/parquet.hpp | 2 +- cpp/src/io/parquet/parquet.cpp | 1 + cpp/src/io/parquet/parquet.hpp | 1 + cpp/src/io/parquet/reader_impl.cu | 310 ++++++++++++++++--------- cpp/tests/io/parquet_test.cpp | 131 ++++++++++- python/cudf/cudf/io/parquet.py | 4 + python/cudf/cudf/tests/test_parquet.py | 110 +++++++-- 7 files changed, 420 insertions(+), 139 deletions(-) diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index ecd9607a87e..031228ae6de 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -50,7 +50,7 @@ class parquet_reader_options_builder; class parquet_reader_options { source_info _source; - // Names of column to read; empty is all + // Path in schema of column to read; empty is all std::vector _columns; // List of individual row groups to read (ignored if empty) diff --git a/cpp/src/io/parquet/parquet.cpp b/cpp/src/io/parquet/parquet.cpp index 6c658788fa1..c8c54e9933f 100644 --- a/cpp/src/io/parquet/parquet.cpp +++ b/cpp/src/io/parquet/parquet.cpp @@ -347,6 +347,7 @@ int CompactProtocolReader::WalkSchema( ++idx; if (e->num_children > 0) { for (int i = 0; i < e->num_children; i++) { + e->children_idx.push_back(idx); int idx_old = idx; idx = WalkSchema(md, idx, parent_idx, max_def_level, max_rep_level); if (idx <= idx_old) break; // Error diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index 2232017409d..4390d1c788f 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -165,6 +165,7 @@ struct SchemaElement { int max_definition_level = 0; int max_repetition_level = 0; int parent_idx = 0; + std::vector children_idx; bool operator==(SchemaElement const& other) const { diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 9f9bdfd4755..caf11b66206 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -464,8 +464,9 @@ class aggregate_metadata { * * @param names List of column names to load, where index column name(s) will be added */ - void add_pandas_index_names(std::vector& names) const + std::vector get_pandas_index_names() const { + std::vector names; auto str = get_pandas_index(); if (str.length() != 0) { std::regex index_name_expr{R"(\"((?:\\.|[^\"])*)\")"}; @@ -480,6 +481,7 @@ class aggregate_metadata { str = sm.suffix(); } } + return names; } struct row_group_info { @@ -549,86 +551,14 @@ class aggregate_metadata { return selection; } - /** - * @brief Build input and output column structures based on schema input. Recursive. - * - * @param[in,out] schema_idx Schema index to build information for. This value gets - * incremented as the function recurses. - * @param[out] input_columns Input column information (source data in the file) - * @param[out] output_columns Output column structure (resulting cudf columns) - * @param[in,out] nesting A stack keeping track of child column indices so we can - * reproduce the linear list of output columns that correspond to an input column. - * @param[in] strings_to_categorical Type conversion parameter - * @param[in] timestamp_type_id Type conversion parameter - * @param[in] strict_decimal_types True if it is an error to load an unsupported decimal type - * - */ - void build_column_info(int& schema_idx, - std::vector& input_columns, - std::vector& output_columns, - std::deque& nesting, - bool strings_to_categorical, - type_id timestamp_type_id, - bool strict_decimal_types) const - { - int start_schema_idx = schema_idx; - auto const& schema = get_schema(schema_idx); - schema_idx++; - - // if I am a stub, continue on - if (schema.is_stub()) { - // is this legit? - CUDF_EXPECTS(schema.num_children == 1, "Unexpected number of children for stub"); - build_column_info(schema_idx, - input_columns, - output_columns, - nesting, - strings_to_categorical, - timestamp_type_id, - strict_decimal_types); - return; - } - - // if we're at the root, this is a new output column - nesting.push_back(static_cast(output_columns.size())); - auto const col_type = - to_type_id(schema, strings_to_categorical, timestamp_type_id, strict_decimal_types); - auto const dtype = col_type == type_id::DECIMAL32 || col_type == type_id::DECIMAL64 - ? data_type{col_type, numeric::scale_type{-schema.decimal_scale}} - : data_type{col_type}; - output_columns.emplace_back(dtype, schema.repetition_type == OPTIONAL ? true : false); - column_buffer& output_col = output_columns.back(); - output_col.name = schema.name; - - // build each child - for (int idx = 0; idx < schema.num_children; idx++) { - build_column_info(schema_idx, - input_columns, - output_col.children, - nesting, - strings_to_categorical, - timestamp_type_id, - strict_decimal_types); - } - - // if I have no children, we're at a leaf and I'm an input column (that is, one with actual - // data stored) so add me to the list. - if (schema.num_children == 0) { - input_columns.emplace_back(input_column_info{start_schema_idx, schema.name}); - input_column_info& input_col = input_columns.back(); - std::copy(nesting.begin(), nesting.end(), std::back_inserter(input_col.nesting)); - } - - nesting.pop_back(); - } - /** * @brief Filters and reduces down to a selection of columns * - * @param use_names List of column names to select + * @param use_names List of paths of column names to select * @param include_index Whether to always include the PANDAS index column(s) * @param strings_to_categorical Type conversion parameter * @param timestamp_type_id Type conversion parameter + * @param strict_decimal_types Type conversion parameter * * @return input column information, output column information, list of output column schema * indices @@ -639,9 +569,86 @@ class aggregate_metadata { type_id timestamp_type_id, bool strict_decimal_types) const { - auto const& pfm = per_file_metadata[0]; + auto find_schema_child = [&](SchemaElement const& schema_elem, std::string const& name) { + auto const& col_schema_idx = std::find_if( + schema_elem.children_idx.cbegin(), + schema_elem.children_idx.cend(), + [&](size_t col_schema_idx) { return get_schema(col_schema_idx).name == name; }); + + return (col_schema_idx != schema_elem.children_idx.end()) ? static_cast(*col_schema_idx) + : -1; + }; + + std::vector output_columns; + std::vector input_columns; + std::vector nesting; + + // Return true if column path is valid. e.g. if the path is {"struct1", "child1"}, then it is + // valid if "struct1.child1" exists in this file's schema. If "struct1" exists but "child1" is + // not a child of "struct1" then the function will return false for "struct1" + std::function&)> build_column = + [&](column_name_info const* col_name_info, + int schema_idx, + std::vector& out_col_array) { + if (schema_idx < 0) { return false; } + auto const& schema_elem = get_schema(schema_idx); + + // if schema_elem is a stub then it does not exist in the column_name_info and column_buffer + // hierarchy. So continue on + if (schema_elem.is_stub()) { + // is this legit? + CUDF_EXPECTS(schema_elem.num_children == 1, "Unexpected number of children for stub"); + auto child_col_name_info = (col_name_info) ? &col_name_info->children[0] : nullptr; + return build_column(child_col_name_info, schema_elem.children_idx[0], out_col_array); + } + + // if we're at the root, this is a new output column + auto const col_type = + to_type_id(schema_elem, strings_to_categorical, timestamp_type_id, strict_decimal_types); + auto const dtype = col_type == type_id::DECIMAL32 || col_type == type_id::DECIMAL64 + ? data_type{col_type, numeric::scale_type{-schema_elem.decimal_scale}} + : data_type{col_type}; + + column_buffer output_col(dtype, schema_elem.repetition_type == OPTIONAL); + // store the index of this element if inserted in out_col_array + nesting.push_back(static_cast(out_col_array.size())); + output_col.name = schema_elem.name; + + // build each child + bool path_is_valid = false; + if (col_name_info == nullptr or col_name_info->children.empty()) { + // add all children of schema_elem. + // At this point, we can no longer pass a col_name_info to build_column + for (int idx = 0; idx < schema_elem.num_children; idx++) { + path_is_valid |= + build_column(nullptr, schema_elem.children_idx[idx], output_col.children); + } + } else { + for (size_t idx = 0; idx < col_name_info->children.size(); idx++) { + path_is_valid |= + build_column(&col_name_info->children[idx], + find_schema_child(schema_elem, col_name_info->children[idx].name), + output_col.children); + } + } + + // if I have no children, we're at a leaf and I'm an input column (that is, one with actual + // data stored) so add me to the list. + if (schema_elem.num_children == 0) { + input_column_info& input_col = + input_columns.emplace_back(input_column_info{schema_idx, schema_elem.name}); + std::copy(nesting.cbegin(), nesting.cend(), std::back_inserter(input_col.nesting)); + path_is_valid = true; // If we're able to reach leaf then path is valid + } + + if (path_is_valid) { out_col_array.push_back(std::move(output_col)); } + + nesting.pop_back(); + return path_is_valid; + }; + + std::vector output_column_schemas; - // determine the list of output columns // // there is not necessarily a 1:1 mapping between input columns and output columns. // For example, parquet does not explicitly store a ColumnChunkDesc for struct columns. @@ -657,43 +664,120 @@ class aggregate_metadata { // "firstname", "middlename" and "lastname" represent the input columns in the file that we // process to produce the final cudf "name" column. // - std::vector output_column_schemas; + // A user can ask for a single field out of the struct e.g. firstname. + // In this case they'll pass a fully qualified name to the schema element like + // ["name", "firstname"] + // + auto const& root = get_schema(0); if (use_names.empty()) { - // walk the schema and choose all top level columns - for (size_t schema_idx = 1; schema_idx < pfm.schema.size(); schema_idx++) { - auto const& schema = pfm.schema[schema_idx]; - if (schema.parent_idx == 0) { output_column_schemas.push_back(schema_idx); } + for (auto const& schema_idx : root.children_idx) { + build_column(nullptr, schema_idx, output_columns); + output_column_schemas.push_back(schema_idx); } } else { - // Load subset of columns; include PANDAS index unless excluded - std::vector local_use_names = use_names; - if (include_index) { add_pandas_index_names(local_use_names); } - for (const auto& use_name : local_use_names) { - for (size_t schema_idx = 1; schema_idx < pfm.schema.size(); schema_idx++) { - auto const& schema = pfm.schema[schema_idx]; - // We select only top level columns by name. Selecting nested columns by name is not - // supported. Top level columns are identified by their parent being the root (idx == 0) - if (use_name == schema.name and schema.parent_idx == 0) { - output_column_schemas.push_back(schema_idx); - } + struct path_info { + std::string full_path; + int schema_idx; + }; + + // Convert schema into a vector of every possible path + std::vector all_paths; + std::function add_path = [&](std::string path_till_now, + int schema_idx) { + auto const& schema_elem = get_schema(schema_idx); + std::string curr_path = path_till_now + schema_elem.name; + all_paths.push_back({curr_path, schema_idx}); + for (auto const& child_idx : schema_elem.children_idx) { + add_path(curr_path + ".", child_idx); } + }; + for (auto const& child_idx : get_schema(0).children_idx) { + add_path("", child_idx); } - } - // construct input and output output column info - std::vector output_columns; - output_columns.reserve(output_column_schemas.size()); - std::vector input_columns; - std::deque nesting; - for (size_t idx = 0; idx < output_column_schemas.size(); idx++) { - int schema_index = output_column_schemas[idx]; - build_column_info(schema_index, - input_columns, - output_columns, - nesting, - strings_to_categorical, - timestamp_type_id, - strict_decimal_types); + // Find which of the selected paths are valid and get their schema index + std::vector valid_selected_paths; + for (auto const& selected_path : use_names) { + auto found_path = + std::find_if(all_paths.begin(), all_paths.end(), [&](path_info& valid_path) { + return valid_path.full_path == selected_path; + }); + if (found_path != all_paths.end()) { + valid_selected_paths.push_back({selected_path, found_path->schema_idx}); + } + } + + // Now construct paths as vector of strings for further consumption + std::vector> use_names3; + std::transform(valid_selected_paths.begin(), + valid_selected_paths.end(), + std::back_inserter(use_names3), + [&](path_info const& valid_path) { + auto schema_idx = valid_path.schema_idx; + std::vector result_path; + do { + SchemaElement const& elem = get_schema(schema_idx); + result_path.push_back(elem.name); + schema_idx = elem.parent_idx; + } while (schema_idx > 0); + return std::vector(result_path.rbegin(), result_path.rend()); + }); + + std::vector selected_columns; + if (include_index) { + std::vector index_names = get_pandas_index_names(); + std::transform(index_names.cbegin(), + index_names.cend(), + std::back_inserter(selected_columns), + [](std::string const& name) { return column_name_info(name); }); + } + // Merge the vector use_names into a set of hierarchical column_name_info objects + /* This is because if we have columns like this: + * col1 + * / \ + * s3 f4 + * / \ + * f5 f6 + * + * there may be common paths in use_names like: + * {"col1", "s3", "f5"}, {"col1", "f4"} + * which means we want the output to contain + * col1 + * / \ + * s3 f4 + * / + * f5 + * + * rather than + * col1 col1 + * | | + * s3 f4 + * | + * f5 + */ + for (auto const& path : use_names3) { + auto array_to_find_in = &selected_columns; + for (size_t depth = 0; depth < path.size(); ++depth) { + // Check if the path exists in our selected_columns and if not, add it. + auto const& name_to_find = path[depth]; + auto found_col = std::find_if( + array_to_find_in->begin(), + array_to_find_in->end(), + [&name_to_find](column_name_info const& col) { return col.name == name_to_find; }); + if (found_col == array_to_find_in->end()) { + auto& col = array_to_find_in->emplace_back(name_to_find); + array_to_find_in = &col.children; + } else { + // Path exists. go down further. + array_to_find_in = &found_col->children; + } + } + } + for (auto& col : selected_columns) { + auto const& top_level_col_schema_idx = find_schema_child(root, col.name); + bool valid_column = build_column(&col, top_level_col_schema_idx, output_columns); + if (valid_column) output_column_schemas.push_back(top_level_col_schema_idx); + } } return std::make_tuple( @@ -1581,18 +1665,16 @@ table_with_metadata reader::impl::read(size_type skip_rows, // create the final output cudf columns for (size_t i = 0; i < _output_columns.size(); ++i) { - out_metadata.schema_info.push_back(column_name_info{""}); - out_columns.emplace_back( - make_column(_output_columns[i], &out_metadata.schema_info.back(), stream, _mr)); + column_name_info& col_name = out_metadata.schema_info.emplace_back(""); + out_columns.emplace_back(make_column(_output_columns[i], &col_name, stream, _mr)); } } } // Create empty columns as needed (this can happen if we've ended up with no actual data to read) for (size_t i = out_columns.size(); i < _output_columns.size(); ++i) { - out_metadata.schema_info.push_back(column_name_info{""}); - out_columns.emplace_back(cudf::io::detail::empty_like( - _output_columns[i], &out_metadata.schema_info.back(), stream, _mr)); + column_name_info& col_name = out_metadata.schema_info.emplace_back(""); + out_columns.emplace_back(io::detail::empty_like(_output_columns[i], &col_name, stream, _mr)); } // Return column names (must match order of returned columns) diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index 70b4bd1d873..7260aa9e686 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -816,7 +816,7 @@ TEST_F(ParquetWriterTest, MultiIndex) expected_metadata.column_metadata[3].set_name("floats"); expected_metadata.column_metadata[4].set_name("doubles"); expected_metadata.user_data.insert( - {"pandas", "\"index_columns\": [\"floats\", \"doubles\"], \"column1\": [\"int8s\"]"}); + {"pandas", "\"index_columns\": [\"int8s\", \"int16s\"], \"column1\": [\"int32s\"]"}); auto filepath = temp_env->get_temp_filepath("MultiIndex.parquet"); cudf_io::parquet_writer_options out_opts = @@ -827,7 +827,7 @@ TEST_F(ParquetWriterTest, MultiIndex) cudf_io::parquet_reader_options in_opts = cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}) .use_pandas_metadata(true) - .columns({"int8s", "int16s", "int32s"}); + .columns({"int32s", "floats", "doubles"}); auto result = cudf_io::read_parquet(in_opts); CUDF_TEST_EXPECT_TABLES_EQUAL(expected->view(), result.tbl->view()); @@ -967,8 +967,6 @@ TEST_F(ParquetWriterTest, StructOfList) auto struct_2 = cudf::test::structs_column_wrapper{{is_human_col, struct_1}, {0, 1, 1, 1, 1, 1}}.release(); - // cudf::test::print(struct_2->child(1).child(2)); - auto expected = table_view({*struct_2}); cudf_io::table_input_metadata expected_metadata(expected); @@ -2497,6 +2495,131 @@ TEST_F(ParquetReaderTest, ReorderedColumns) } } +TEST_F(ParquetReaderTest, SelectNestedColumn) +{ + // Struct>, + // flats:List> + // > + // > + + auto weights_col = cudf::test::fixed_width_column_wrapper{1.1, 2.4, 5.3, 8.0, 9.6, 6.9}; + + auto ages_col = + cudf::test::fixed_width_column_wrapper{{48, 27, 25, 31, 351, 351}, {1, 1, 1, 1, 1, 0}}; + + auto struct_1 = cudf::test::structs_column_wrapper{{weights_col, ages_col}, {1, 1, 1, 1, 0, 1}}; + + auto is_human_col = cudf::test::fixed_width_column_wrapper{ + {true, true, false, false, false, false}, {1, 1, 0, 1, 1, 0}}; + + auto struct_2 = + cudf::test::structs_column_wrapper{{is_human_col, struct_1}, {0, 1, 1, 1, 1, 1}}.release(); + + auto input = table_view({*struct_2}); + + cudf_io::table_input_metadata input_metadata(input); + input_metadata.column_metadata[0].set_name("being"); + input_metadata.column_metadata[0].child(0).set_name("human?"); + input_metadata.column_metadata[0].child(1).set_name("particulars"); + input_metadata.column_metadata[0].child(1).child(0).set_name("weight"); + input_metadata.column_metadata[0].child(1).child(1).set_name("age"); + + auto filepath = temp_env->get_temp_filepath("SelectNestedColumn.parquet"); + cudf_io::parquet_writer_options args = + cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, input) + .metadata(&input_metadata); + cudf_io::write_parquet(args); + + { // Test selecting a single leaf from the table + cudf_io::parquet_reader_options read_args = + cudf_io::parquet_reader_options::builder(cudf_io::source_info(filepath)) + .columns({"being.particulars.age"}); + const auto result = cudf_io::read_parquet(read_args); + + auto expect_ages_col = cudf::test::fixed_width_column_wrapper{ + {48, 27, 25, 31, 351, 351}, {1, 1, 1, 1, 1, 0}}; + auto expect_s_1 = cudf::test::structs_column_wrapper{{expect_ages_col}, {1, 1, 1, 1, 0, 1}}; + auto expect_s_2 = + cudf::test::structs_column_wrapper{{expect_s_1}, {0, 1, 1, 1, 1, 1}}.release(); + auto expected = table_view({*expect_s_2}); + + cudf_io::table_input_metadata expected_metadata(expected); + expected_metadata.column_metadata[0].set_name("being"); + expected_metadata.column_metadata[0].child(0).set_name("particulars"); + expected_metadata.column_metadata[0].child(0).child(0).set_name("age"); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); + compare_metadata_equality(expected_metadata, result.metadata); + } + + { // Test selecting a non-leaf and expecting all hierarchy from that node onwards + cudf_io::parquet_reader_options read_args = + cudf_io::parquet_reader_options::builder(cudf_io::source_info(filepath)) + .columns({"being.particulars"}); + const auto result = cudf_io::read_parquet(read_args); + + auto expected_weights_col = + cudf::test::fixed_width_column_wrapper{1.1, 2.4, 5.3, 8.0, 9.6, 6.9}; + + auto expected_ages_col = cudf::test::fixed_width_column_wrapper{ + {48, 27, 25, 31, 351, 351}, {1, 1, 1, 1, 1, 0}}; + + auto expected_s_1 = cudf::test::structs_column_wrapper{ + {expected_weights_col, expected_ages_col}, {1, 1, 1, 1, 0, 1}}; + + auto expect_s_2 = + cudf::test::structs_column_wrapper{{expected_s_1}, {0, 1, 1, 1, 1, 1}}.release(); + auto expected = table_view({*expect_s_2}); + + cudf_io::table_input_metadata expected_metadata(expected); + expected_metadata.column_metadata[0].set_name("being"); + expected_metadata.column_metadata[0].child(0).set_name("particulars"); + expected_metadata.column_metadata[0].child(0).child(0).set_name("weight"); + expected_metadata.column_metadata[0].child(0).child(1).set_name("age"); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); + compare_metadata_equality(expected_metadata, result.metadata); + } + + { // Test selecting struct children out of order + cudf_io::parquet_reader_options read_args = + cudf_io::parquet_reader_options::builder(cudf_io::source_info(filepath)) + .columns({"being.particulars.age", "being.particulars.weight", "being.human?"}); + const auto result = cudf_io::read_parquet(read_args); + + auto expected_weights_col = + cudf::test::fixed_width_column_wrapper{1.1, 2.4, 5.3, 8.0, 9.6, 6.9}; + + auto expected_ages_col = cudf::test::fixed_width_column_wrapper{ + {48, 27, 25, 31, 351, 351}, {1, 1, 1, 1, 1, 0}}; + + auto expected_is_human_col = cudf::test::fixed_width_column_wrapper{ + {true, true, false, false, false, false}, {1, 1, 0, 1, 1, 0}}; + + auto expect_s_1 = cudf::test::structs_column_wrapper{{expected_ages_col, expected_weights_col}, + {1, 1, 1, 1, 0, 1}}; + + auto expect_s_2 = + cudf::test::structs_column_wrapper{{expect_s_1, expected_is_human_col}, {0, 1, 1, 1, 1, 1}} + .release(); + + auto expected = table_view({*expect_s_2}); + + cudf_io::table_input_metadata expected_metadata(expected); + expected_metadata.column_metadata[0].set_name("being"); + expected_metadata.column_metadata[0].child(0).set_name("particulars"); + expected_metadata.column_metadata[0].child(0).child(0).set_name("age"); + expected_metadata.column_metadata[0].child(0).child(1).set_name("weight"); + expected_metadata.column_metadata[0].child(1).set_name("human?"); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); + compare_metadata_equality(expected_metadata, result.metadata); + } +} + TEST_F(ParquetReaderTest, DecimalRead) { { diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index a18486cff3c..fa748761695 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -210,6 +210,10 @@ def read_parquet( else: filepaths_or_buffers.append(tmp_source) + if columns is not None: + if not is_list_like(columns): + raise ValueError("Expected list like for columns") + if filters is not None: # Convert filters to ds.Expression filters = pq._filters_to_expression(filters) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 21dc8315e32..e4a61a2a37e 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1140,6 +1140,84 @@ def test_parquet_reader_struct_basic(tmpdir, data): assert expect.equals(got.to_arrow()) +def select_columns_params(): + dfs = [ + # struct + ( + [ + {"a": 1, "b": 2}, + {"a": 10, "b": 20}, + {"a": None, "b": 22}, + {"a": None, "b": None}, + {"a": 15, "b": None}, + ], + [["struct"], ["struct.a"], ["struct.b"], ["c"]], + ), + # struct-of-list + ( + [ + {"a": 1, "b": 2, "c": [1, 2, 3]}, + {"a": 10, "b": 20, "c": [4, 5]}, + {"a": None, "b": 22, "c": [6]}, + {"a": None, "b": None, "c": None}, + {"a": 15, "b": None, "c": [-1, -2]}, + None, + {"a": 100, "b": 200, "c": [-10, None, -20]}, + ], + [ + ["struct"], + ["struct.c"], + ["struct.c.list"], + ["struct.c.list.item"], + ["struct.b", "struct.c"], + ["struct.b", "struct.d", "struct.c"], + ], + ), + # list-of-struct + ( + [ + [{"a": 1, "b": 2}, {"a": 2, "b": 3}, {"a": 4, "b": 5}], + None, + [{"a": 10, "b": 20}], + [{"a": 100, "b": 200}, {"a": None, "b": 300}, None], + ], + [ + ["struct"], + ["struct.list"], + ["struct.list.item"], + ["struct.list.item.a", "struct.list.item.b"], + ["struct.list.item.c"], + ], + ), + # struct with "." in field names + ( + [ + {"a.b": 1, "b.a": 2}, + {"a.b": 10, "b.a": 20}, + {"a.b": None, "b.a": 22}, + {"a.b": None, "b.a": None}, + {"a.b": 15, "b.a": None}, + ], + [["struct"], ["struct.a"], ["struct.b.a"]], + ), + ] + for df_col_pair in dfs: + for cols in df_col_pair[1]: + yield df_col_pair[0], cols + + +@pytest.mark.parametrize("data, columns", select_columns_params()) +def test_parquet_reader_struct_select_columns(tmpdir, data, columns): + table = pa.Table.from_pydict({"struct": data}) + buff = BytesIO() + + pa.parquet.write_table(table, buff) + + expect = pq.ParquetFile(buff).read(columns=columns) + got = cudf.read_parquet(buff, columns=columns) + assert expect.equals(got.to_arrow()) + + def test_parquet_reader_struct_los_large(tmpdir): num_rows = 256 list_size = 64 @@ -1860,26 +1938,18 @@ def test_parquet_writer_list_statistics(tmpdir): ] }, # List of Structs - pytest.param( - { - "family": [ - [ - None, - {"human?": True, "deets": {"weight": 2.4, "age": 27}}, - ], - [ - {"human?": None, "deets": {"weight": 5.3, "age": 25}}, - {"human?": False, "deets": {"weight": 8.0, "age": 31}}, - {"human?": False, "deets": None}, - ], - [], - [{"human?": None, "deets": {"weight": 6.9, "age": None}}], - ] - }, - marks=pytest.mark.xfail( - reason="https://github.com/rapidsai/cudf/issues/7561" - ), - ), + { + "family": [ + [None, {"human?": True, "deets": {"weight": 2.4, "age": 27}}], + [ + {"human?": None, "deets": {"weight": 5.3, "age": 25}}, + {"human?": False, "deets": {"weight": 8.0, "age": 31}}, + {"human?": False, "deets": None}, + ], + [], + [{"human?": None, "deets": {"weight": 6.9, "age": None}}], + ] + }, # Struct of Lists pytest.param( {