From 779857b1081989cdf81b705ac897ea3b899b5da5 Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Tue, 18 Jul 2023 01:45:24 +0000 Subject: [PATCH 1/6] Fixing parquet list of struct interpretation --- cpp/src/io/parquet/parquet.hpp | 8 ++- cpp/src/io/parquet/reader_impl_helpers.cpp | 12 +++- cpp/tests/io/parquet_test.cpp | 73 ++++++++++++++++++++++ 3 files changed, 89 insertions(+), 4 deletions(-) diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index a25c7fab712..4618191f4ce 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -182,11 +182,15 @@ struct SchemaElement { // repeated value_type name [[nodiscard]] bool is_one_level_list(SchemaElement const& parent) const { - return repetition_type == REPEATED and num_children == 0 and not parent.is_list(); + return repetition_type == REPEATED and num_children == 0 and not parent.is_list(*this); } // returns true if the element is a list - [[nodiscard]] bool is_list() const { return converted_type == LIST; } + [[nodiscard]] bool is_list(SchemaElement const& child) const + { + return converted_type == LIST || + (type == UNDEFINED_TYPE && num_children == 1 && child.repetition_type == REPEATED); + } // in parquet terms, a group is a level of nesting in the schema. 
a group // can be a struct or a list diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 006b8d69aad..3efad7821a7 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -233,7 +233,13 @@ int64_t aggregate_reader_metadata::calc_num_rows() const { return std::accumulate( per_file_metadata.begin(), per_file_metadata.end(), 0l, [](auto& sum, auto& pfm) { - return sum + pfm.num_rows; + auto const rowgroup_rows = std::accumulate( + pfm.row_groups.begin(), pfm.row_groups.end(), 0l, [](auto& rg_sum, auto& rg) { + return rg_sum + rg.num_rows; + }); + CUDF_EXPECTS(pfm.num_rows == 0 || pfm.num_rows == rowgroup_rows, + "Header and row groups disagree about number of rows in file!"); + return sum + (pfm.num_rows == 0 && rowgroup_rows > 0 ? rowgroup_rows : pfm.num_rows); }); } @@ -428,7 +434,9 @@ aggregate_reader_metadata::select_columns(std::optional } // if we're at the root, this is a new output column - auto const col_type = schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx)) + auto const col_type = schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx)) || + (schema_elem.num_children >= 1 && + schema_elem.is_list(get_schema(schema_elem.children_idx[0]))) ? 
type_id::LIST : to_type_id(schema_elem, strings_to_categorical, timestamp_type_id); auto const dtype = to_data_type(col_type, schema_elem); diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index ff8d308318a..9191d7676ec 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -5618,4 +5618,77 @@ TEST_F(ParquetWriterTest, NoNullsAsNonNullable) EXPECT_NO_THROW(cudf::io::write_parquet(out_opts)); } +TEST_F(ParquetReaderTest, RepeatedNoAnnotations) +{ + unsigned char repeated_bytes[] = { + 0x50, 0x41, 0x52, 0x31, 0x15, 0x04, 0x15, 0x30, 0x15, 0x30, 0x4c, 0x15, 0x0c, 0x15, 0x00, 0x12, + 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x04, 0x00, + 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x06, 0x00, 0x00, 0x00, 0x15, 0x00, 0x15, 0x0a, 0x15, 0x0a, + 0x2c, 0x15, 0x0c, 0x15, 0x10, 0x15, 0x06, 0x15, 0x06, 0x00, 0x00, 0x03, 0x03, 0x88, 0xc6, 0x02, + 0x26, 0x80, 0x01, 0x1c, 0x15, 0x02, 0x19, 0x25, 0x00, 0x10, 0x19, 0x18, 0x02, 0x69, 0x64, 0x15, + 0x00, 0x16, 0x0c, 0x16, 0x78, 0x16, 0x78, 0x26, 0x54, 0x26, 0x08, 0x00, 0x00, 0x15, 0x04, 0x15, + 0x40, 0x15, 0x40, 0x4c, 0x15, 0x08, 0x15, 0x00, 0x12, 0x00, 0x00, 0xe3, 0x0c, 0x23, 0x4b, 0x01, + 0x00, 0x00, 0x00, 0xc7, 0x35, 0x3a, 0x42, 0x00, 0x00, 0x00, 0x00, 0x8e, 0x6b, 0x74, 0x84, 0x00, + 0x00, 0x00, 0x00, 0x55, 0xa1, 0xae, 0xc6, 0x00, 0x00, 0x00, 0x00, 0x15, 0x00, 0x15, 0x22, 0x15, + 0x22, 0x2c, 0x15, 0x10, 0x15, 0x10, 0x15, 0x06, 0x15, 0x06, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, + 0x03, 0xc0, 0x03, 0x00, 0x00, 0x00, 0x03, 0x90, 0xaa, 0x02, 0x03, 0x94, 0x03, 0x26, 0xda, 0x02, + 0x1c, 0x15, 0x04, 0x19, 0x25, 0x00, 0x10, 0x19, 0x38, 0x0c, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x4e, + 0x75, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x05, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x06, 0x6e, 0x75, 0x6d, + 0x62, 0x65, 0x72, 0x15, 0x00, 0x16, 0x10, 0x16, 0xa0, 0x01, 0x16, 0xa0, 0x01, 0x26, 0x96, 0x02, + 0x26, 0xba, 0x01, 0x00, 0x00, 0x15, 0x04, 0x15, 0x24, 0x15, 0x24, 0x4c, 0x15, 0x04, 
0x15, 0x00, + 0x12, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x68, 0x6f, 0x6d, 0x65, 0x06, 0x00, 0x00, 0x00, 0x6d, + 0x6f, 0x62, 0x69, 0x6c, 0x65, 0x15, 0x00, 0x15, 0x20, 0x15, 0x20, 0x2c, 0x15, 0x10, 0x15, 0x10, + 0x15, 0x06, 0x15, 0x06, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0xc0, 0x03, 0x00, 0x00, 0x00, + 0x03, 0x90, 0xef, 0x01, 0x03, 0x04, 0x26, 0xcc, 0x04, 0x1c, 0x15, 0x0c, 0x19, 0x25, 0x00, 0x10, + 0x19, 0x38, 0x0c, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x05, + 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x15, 0x00, 0x16, 0x10, 0x16, 0x82, + 0x01, 0x16, 0x82, 0x01, 0x26, 0x8a, 0x04, 0x26, 0xca, 0x03, 0x00, 0x00, 0x15, 0x02, 0x19, 0x6c, + 0x48, 0x04, 0x75, 0x73, 0x65, 0x72, 0x15, 0x04, 0x00, 0x15, 0x02, 0x25, 0x00, 0x18, 0x02, 0x69, + 0x64, 0x00, 0x35, 0x02, 0x18, 0x0c, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x4e, 0x75, 0x6d, 0x62, 0x65, + 0x72, 0x73, 0x15, 0x02, 0x00, 0x35, 0x04, 0x18, 0x05, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x15, 0x04, + 0x00, 0x15, 0x04, 0x25, 0x00, 0x18, 0x06, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x00, 0x15, 0x0c, + 0x25, 0x02, 0x18, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x25, 0x00, 0x00, 0x16, 0x00, 0x19, 0x1c, 0x19, + 0x3c, 0x26, 0x80, 0x01, 0x1c, 0x15, 0x02, 0x19, 0x25, 0x00, 0x10, 0x19, 0x18, 0x02, 0x69, 0x64, + 0x15, 0x00, 0x16, 0x0c, 0x16, 0x78, 0x16, 0x78, 0x26, 0x54, 0x26, 0x08, 0x00, 0x00, 0x26, 0xda, + 0x02, 0x1c, 0x15, 0x04, 0x19, 0x25, 0x00, 0x10, 0x19, 0x38, 0x0c, 0x70, 0x68, 0x6f, 0x6e, 0x65, + 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x05, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x06, 0x6e, 0x75, + 0x6d, 0x62, 0x65, 0x72, 0x15, 0x00, 0x16, 0x10, 0x16, 0xa0, 0x01, 0x16, 0xa0, 0x01, 0x26, 0x96, + 0x02, 0x26, 0xba, 0x01, 0x00, 0x00, 0x26, 0xcc, 0x04, 0x1c, 0x15, 0x0c, 0x19, 0x25, 0x00, 0x10, + 0x19, 0x38, 0x0c, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x05, + 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x15, 0x00, 0x16, 0x10, 0x16, 0x82, + 0x01, 0x16, 0x82, 0x01, 
0x26, 0x8a, 0x04, 0x26, 0xca, 0x03, 0x00, 0x00, 0x16, 0x9a, 0x03, 0x16, + 0x0c, 0x00, 0x28, 0x49, 0x70, 0x61, 0x72, 0x71, 0x75, 0x65, 0x74, 0x2d, 0x72, 0x73, 0x20, 0x76, + 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x20, 0x30, 0x2e, 0x33, 0x2e, 0x30, 0x20, 0x28, 0x62, 0x75, + 0x69, 0x6c, 0x64, 0x20, 0x62, 0x34, 0x35, 0x63, 0x65, 0x37, 0x63, 0x62, 0x61, 0x32, 0x31, 0x39, + 0x39, 0x66, 0x32, 0x32, 0x64, 0x39, 0x33, 0x32, 0x36, 0x39, 0x63, 0x31, 0x35, 0x30, 0x64, 0x38, + 0x61, 0x38, 0x33, 0x39, 0x31, 0x36, 0x63, 0x36, 0x39, 0x62, 0x35, 0x65, 0x29, 0x00, 0x32, 0x01, + 0x00, 0x00, 0x50, 0x41, 0x52, 0x31}; + + auto read_opts = cudf::io::parquet_reader_options::builder( + cudf::io::source_info{reinterpret_cast(repeated_bytes), sizeof(repeated_bytes)}); + auto result = cudf::io::read_parquet(read_opts); + + EXPECT_EQ(result.tbl->view().column(0).size(), 6); + EXPECT_EQ(result.tbl->view().num_columns(), 2); + + column_wrapper col0{1, 2, 3, 4, 5, 6}; + column_wrapper child0{{5555555555l, 1111111111l, 1111111111l, 2222222222l, 3333333333l}}; + cudf::test::strings_column_wrapper child1{{"-", "home", "home", "-", "mobile"}, {0, 1, 1, 0, 1}}; + auto struct_col = cudf::test::structs_column_wrapper{{child0, child1}}; + + auto list_offsets_column = + cudf::test::fixed_width_column_wrapper{0, 0, 0, 0, 1, 2, 5}.release(); + auto num_list_rows = list_offsets_column->size() - 1; + + auto mask = cudf::create_null_mask(6, cudf::mask_state::ALL_VALID); + cudf::set_null_mask(static_cast(mask.data()), 0, 2, false); + + auto list_col = cudf::make_lists_column( + num_list_rows, std::move(list_offsets_column), struct_col.release(), 2, std::move(mask)); + + table_view expected{{col0, *list_col}}; + + CUDF_TEST_EXPECT_TABLES_EQUAL(result.tbl->view(), expected); +} + CUDF_TEST_PROGRAM_MAIN() From 8eaea09b4801f6d7ebf6db24c4678919490617df Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Thu, 21 Sep 2023 20:51:18 +0000 Subject: [PATCH 2/6] changing to massage schema before decoding data --- 
cpp/src/io/parquet/parquet.hpp | 14 ++- cpp/src/io/parquet/reader_impl_helpers.cpp | 111 +++++++++++++++++-- cpp/src/io/parquet/reader_impl_helpers.hpp | 3 +- cpp/src/io/parquet/reader_impl_preprocess.cu | 16 +-- cpp/src/io/parquet/writer_impl.cu | 2 +- cpp/tests/io/parquet_test.cpp | 11 +- 6 files changed, 134 insertions(+), 23 deletions(-) diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index 4618191f4ce..c8c961dab49 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -175,30 +175,34 @@ struct SchemaElement { // required int32 num; // }; // } - [[nodiscard]] bool is_stub() const { return repetition_type == REPEATED && num_children == 1; } + [[nodiscard]] bool is_stub(SchemaElement const& parent) const + { + return repetition_type == REPEATED && num_children == 1 && parent.converted_type == LIST; + } // https://github.com/apache/parquet-cpp/blob/642da05/src/parquet/schema.h#L49-L50 // One-level LIST encoding: Only allows required lists with required cells: // repeated value_type name [[nodiscard]] bool is_one_level_list(SchemaElement const& parent) const { - return repetition_type == REPEATED and num_children == 0 and not parent.is_list(*this); + return repetition_type == REPEATED and num_children == 0 and not parent.is_list(); } // returns true if the element is a list - [[nodiscard]] bool is_list(SchemaElement const& child) const + [[nodiscard]] bool is_list() const { return converted_type == LIST || - (type == UNDEFINED_TYPE && num_children == 1 && child.repetition_type == REPEATED); + (type == UNDEFINED_TYPE && num_children == 1 && repetition_type == REPEATED); } // in parquet terms, a group is a level of nesting in the schema. a group // can be a struct or a list [[nodiscard]] bool is_struct() const { + // this assumption might be a little weak. return type == UNDEFINED_TYPE && // this assumption might be a little weak. 
- ((repetition_type != REPEATED) || (repetition_type == REPEATED && num_children == 2)); + ((repetition_type != REPEATED) || (repetition_type == REPEATED && num_children > 1)); } }; diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 3efad7821a7..26c0a77cd18 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -175,6 +175,102 @@ type_id to_type_id(SchemaElement const& schema, return type_id::EMPTY; } +void metadata::sanitize_schema() +{ + // Parquet isn't very strict about incoming metadata. Lots of things can and should be inferred. + // There are also a lot of rules that simply aren't followed and are expected to be worked around. + // This step sanitizes the metadata to something that isn't ambiguous. + // + // Take, for example, the following schema: + // + // required group field_id=-1 user { + // required int32 field_id=-1 id; + // optional group field_id=-1 phoneNumbers { + // repeated group field_id=-1 phone { + // required int64 field_id=-1 number; + // optional binary field_id=-1 kind (String); + // } + // } + // } + // + // This real-world example has no annotations telling us what is a list or a struct. On the + // surface this looks like a column of id's and a column of list<struct<int64, string>>, but this + // actually should be interpreted as a struct<list<struct<int64, string>>>. The phoneNumbers field + // has to be a struct because it is a group with no repeated tag and we have no annotation. The + // repeated group is actually BOTH a struct due to the multiple children and a list due to + // repeated. + // + // This code attempts to make this less messy for the code that follows.
+ + std::function print_node = [&](size_t idx, std::string prefix) { + auto& e = schema[idx]; + printf( + "%sschema element %lu(%s) type %d, converted type %d, repetition_type %d, num_children %d " + "parent %u, max def %d, max rep %d\n", + prefix.c_str(), + idx, + e.name.c_str(), + (int)e.type, + (int)e.converted_type, + (int)e.repetition_type, + (int)e.num_children, + e.parent_idx, + e.max_definition_level, + e.max_repetition_level); + for (auto& child_idx : e.children_idx) { + print_node(child_idx, " " + prefix); + } + }; + + std::function process = [&](size_t schema_idx) -> void { + if (schema_idx < 0) { return; } + auto& schema_elem = schema[schema_idx]; + if (schema_idx != 0 && schema_elem.type == UNDEFINED_TYPE) { + if (schema_elem.type == UNDEFINED_TYPE && schema_elem.repetition_type == REPEATED && + schema_elem.num_children > 1) { + // This is a list of structs, so we need to add a need to both mark this as a list, but also + // add a struct child and move this element's children to the struct + schema_elem.converted_type = LIST; + schema_elem.repetition_type = OPTIONAL; + auto const struct_node_idx = schema.size(); + + SchemaElement struct_elem; + struct_elem.name = "struct_node"; + struct_elem.repetition_type = REQUIRED; + struct_elem.num_children = schema_elem.num_children; + struct_elem.type = UNDEFINED_TYPE; + struct_elem.converted_type = UNKNOWN; + + // swap children + struct_elem.children_idx = std::move(schema_elem.children_idx); + schema_elem.children_idx = {struct_node_idx}; + schema_elem.num_children = 1; + + struct_elem.max_definition_level = schema_elem.max_definition_level; + struct_elem.max_repetition_level = schema_elem.max_repetition_level; + schema_elem.max_repetition_level = schema[schema_elem.parent_idx].max_repetition_level; + + // change parent index on new node and on children + struct_elem.parent_idx = schema_idx; + for (auto& child_idx : struct_elem.children_idx) { + schema[child_idx].parent_idx = struct_node_idx; + } + // add our 
struct + schema.push_back(struct_elem); + } + } + + for (auto& child_idx : schema_elem.children_idx) { + process(child_idx); + } + }; + + // printf("initial layout:\n"); + // print_node(0, "initial "); + process(0); + // print_node(0, "final_layout "); +} + metadata::metadata(datasource* source) { constexpr auto header_len = sizeof(file_header_s); @@ -195,6 +291,7 @@ metadata::metadata(datasource* source) CompactProtocolReader cp(buffer->data(), ender->footer_len); CUDF_EXPECTS(cp.read(this), "Cannot parse metadata"); CUDF_EXPECTS(cp.InitSchema(this), "Cannot initialize schema"); + sanitize_schema(); } std::vector aggregate_reader_metadata::metadatas_from_sources( @@ -425,7 +522,7 @@ aggregate_reader_metadata::select_columns(std::optional // if schema_elem is a stub then it does not exist in the column_name_info and column_buffer // hierarchy. So continue on - if (schema_elem.is_stub()) { + if (schema_elem.is_stub(get_schema(schema_elem.parent_idx))) { // is this legit? CUDF_EXPECTS(schema_elem.num_children == 1, "Unexpected number of children for stub"); auto child_col_name_info = (col_name_info) ? &col_name_info->children[0] : nullptr; @@ -433,10 +530,10 @@ aggregate_reader_metadata::select_columns(std::optional child_col_name_info, schema_elem.children_idx[0], out_col_array, has_list_parent); } + auto const one_level_list = schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx)); + // if we're at the root, this is a new output column - auto const col_type = schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx)) || - (schema_elem.num_children >= 1 && - schema_elem.is_list(get_schema(schema_elem.children_idx[0]))) + auto const col_type = one_level_list ? 
type_id::LIST : to_type_id(schema_elem, strings_to_categorical, timestamp_type_id); auto const dtype = to_data_type(col_type, schema_elem); @@ -475,7 +572,7 @@ aggregate_reader_metadata::select_columns(std::optional input_column_info{schema_idx, schema_elem.name, schema_elem.max_repetition_level > 0}); // set up child output column for one-level encoding list - if (schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx))) { + if (one_level_list) { // determine the element data type auto const element_type = to_type_id(schema_elem, strings_to_categorical, timestamp_type_id); @@ -496,9 +593,7 @@ aggregate_reader_metadata::select_columns(std::optional std::copy(nesting.cbegin(), nesting.cend(), std::back_inserter(input_col.nesting)); // pop off the extra nesting element. - if (schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx))) { - nesting.pop_back(); - } + if (one_level_list) { nesting.pop_back(); } path_is_valid = true; // If we're able to reach leaf then path is valid } diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 0192dcd373b..2ac804521bc 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -65,6 +65,7 @@ struct row_group_info { */ struct metadata : public FileMetaData { explicit metadata(datasource* source); + void sanitize_schema(); }; class aggregate_reader_metadata { @@ -130,7 +131,7 @@ class aggregate_reader_metadata { // walk upwards, skipping repeated fields while (schema_index > 0) { auto const& elm = pfm.schema[schema_index]; - if (!elm.is_stub()) { depth++; } + if (!elm.is_stub(pfm.schema[elm.parent_idx])) { depth++; } // schema of one-level encoding list doesn't contain nesting information, so we need to // manually add an extra nesting level if (elm.is_one_level_list(pfm.schema[elm.parent_idx])) { depth++; } diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu 
index 8c3bdabe6b4..76db3bee016 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -116,13 +116,14 @@ void generate_depth_remappings(std::map, std::ve auto cur_schema = md.get_schema(schema_idx); if (cur_schema.max_repetition_level == r) { // if this is a repeated field, map it one level deeper - shallowest = cur_schema.is_stub() ? cur_depth + 1 : cur_depth; + shallowest = + cur_schema.is_stub(md.get_schema(cur_schema.parent_idx)) ? cur_depth + 1 : cur_depth; } // if it's one-level encoding list else if (cur_schema.is_one_level_list(md.get_schema(cur_schema.parent_idx))) { shallowest = cur_depth - 1; } - if (!cur_schema.is_stub()) { cur_depth--; } + if (!cur_schema.is_stub(md.get_schema(cur_schema.parent_idx))) { cur_depth--; } schema_idx = cur_schema.parent_idx; } return shallowest; @@ -140,8 +141,9 @@ void generate_depth_remappings(std::map, std::ve SchemaElement cur_schema = md.get_schema(schema_idx); if (cur_schema.max_definition_level == d) { // if this is a repeated field, map it one level deeper - r1 = cur_schema.is_stub() ? prev_schema.max_repetition_level - : cur_schema.max_repetition_level; + r1 = cur_schema.is_stub(md.get_schema(cur_schema.parent_idx)) + ? prev_schema.max_repetition_level + : cur_schema.max_repetition_level; break; } prev_schema = cur_schema; @@ -156,10 +158,10 @@ void generate_depth_remappings(std::map, std::ve SchemaElement cur_schema = md.get_schema(schema_idx); if (cur_schema.max_repetition_level == r1) { // if this is a repeated field, map it one level deeper - depth = cur_schema.is_stub() ? depth + 1 : depth; + depth = cur_schema.is_stub(md.get_schema(cur_schema.parent_idx)) ? 
depth + 1 : depth; break; } - if (!cur_schema.is_stub()) { depth--; } + if (!cur_schema.is_stub(md.get_schema(cur_schema.parent_idx))) { depth--; } prev_schema = cur_schema; schema_idx = cur_schema.parent_idx; } @@ -648,7 +650,7 @@ void reader::impl::allocate_nesting_info() while (schema_idx > 0) { // stub columns (basically the inner field of a list scheme element) are not real columns. // we can ignore them for the purposes of output nesting info - if (!cur_schema.is_stub()) { + if (!cur_schema.is_stub(_metadata->get_schema(cur_schema.parent_idx))) { // initialize each page within the chunk for (int p_idx = 0; p_idx < chunks[idx].num_data_pages; p_idx++) { gpu::PageNestingInfo* pni = diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 17a0a903a47..337992c2e22 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -846,7 +846,7 @@ parquet_column_view::parquet_column_view(schema_tree_node const& schema_node, std::vector r_nullability; curr_schema_node = schema_node; while (curr_schema_node.parent_idx != -1) { - if (not curr_schema_node.is_stub()) { + if (not curr_schema_node.is_stub(schema_tree[curr_schema_node.parent_idx])) { r_nullability.push_back(curr_schema_node.repetition_type == FieldRepetitionType::OPTIONAL); } curr_schema_node = schema_tree[curr_schema_node.parent_idx]; diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index 9191d7676ec..f4d259e62f9 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -5686,7 +5686,16 @@ TEST_F(ParquetReaderTest, RepeatedNoAnnotations) auto list_col = cudf::make_lists_column( num_list_rows, std::move(list_offsets_column), struct_col.release(), 2, std::move(mask)); - table_view expected{{col0, *list_col}}; + std::vector> struct_children; + struct_children.push_back(std::move(list_col)); + + auto outer_struct = + cudf::test::structs_column_wrapper{{std::move(struct_children)}, {0, 0, 1, 1, 1, 1}}; + 
table_view expected{{col0, outer_struct}}; + + cudf::test::print(expected.column(1)); + printf("vs:\n"); + cudf::test::print(result.tbl->view().column(1)); CUDF_TEST_EXPECT_TABLES_EQUAL(result.tbl->view(), expected); } From bd0e4c6248a9f2a0e2d42c0440a94c65e2922fda Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Tue, 26 Sep 2023 01:24:22 +0000 Subject: [PATCH 3/6] Fixing definition level issue --- cpp/src/io/parquet/page_decode.cuh | 2 +- cpp/src/io/parquet/reader_impl_helpers.cpp | 26 ++-------------------- cpp/tests/io/parquet_test.cpp | 4 ---- 3 files changed, 3 insertions(+), 29 deletions(-) diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index 4469ec59b7a..487662631cb 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -718,7 +718,7 @@ __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_input_value // for nested schemas, it's more complicated. This warp will visit 32 incoming values, // however not all of them will necessarily represent a value at this nesting level. so // the validity bit for thread t might actually represent output value t-6. the correct - // position for thread t's bit is cur_value_count. for cuda 11 we could use + // position for thread t's bit is thread_value_count. for cuda 11 we could use // __reduce_or_sync(), but until then we have to do a warp reduce. WarpReduceOr32(is_valid << thread_value_count); diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 26c0a77cd18..7b8b33fbdf7 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -202,33 +202,13 @@ void metadata::sanitize_schema() // // This code attempts to make this less messy for the code that follows. 
- std::function print_node = [&](size_t idx, std::string prefix) { - auto& e = schema[idx]; - printf( - "%sschema element %lu(%s) type %d, converted type %d, repetition_type %d, num_children %d " - "parent %u, max def %d, max rep %d\n", - prefix.c_str(), - idx, - e.name.c_str(), - (int)e.type, - (int)e.converted_type, - (int)e.repetition_type, - (int)e.num_children, - e.parent_idx, - e.max_definition_level, - e.max_repetition_level); - for (auto& child_idx : e.children_idx) { - print_node(child_idx, " " + prefix); - } - }; - std::function process = [&](size_t schema_idx) -> void { if (schema_idx < 0) { return; } auto& schema_elem = schema[schema_idx]; if (schema_idx != 0 && schema_elem.type == UNDEFINED_TYPE) { if (schema_elem.type == UNDEFINED_TYPE && schema_elem.repetition_type == REPEATED && schema_elem.num_children > 1) { - // This is a list of structs, so we need to add a need to both mark this as a list, but also + // This is a list of structs, so we need to mark this as a list, but also // add a struct child and move this element's children to the struct schema_elem.converted_type = LIST; schema_elem.repetition_type = OPTIONAL; @@ -248,6 +228,7 @@ void metadata::sanitize_schema() struct_elem.max_definition_level = schema_elem.max_definition_level; struct_elem.max_repetition_level = schema_elem.max_repetition_level; + schema_elem.max_definition_level--; schema_elem.max_repetition_level = schema[schema_elem.parent_idx].max_repetition_level; // change parent index on new node and on children @@ -265,10 +246,7 @@ void metadata::sanitize_schema() } }; - // printf("initial layout:\n"); - // print_node(0, "initial "); process(0); - // print_node(0, "final_layout "); } metadata::metadata(datasource* source) diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index f4d259e62f9..2d261e7d20a 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -5693,10 +5693,6 @@ TEST_F(ParquetReaderTest, RepeatedNoAnnotations) 
cudf::test::structs_column_wrapper{{std::move(struct_children)}, {0, 0, 1, 1, 1, 1}}; table_view expected{{col0, outer_struct}}; - cudf::test::print(expected.column(1)); - printf("vs:\n"); - cudf::test::print(result.tbl->view().column(1)); - CUDF_TEST_EXPECT_TABLES_EQUAL(result.tbl->view(), expected); } From 939497b0c5b4226551254caf12fd87694e2e95d4 Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Tue, 3 Oct 2023 02:41:01 +0000 Subject: [PATCH 4/6] Fixing misinterpretation of maps --- cpp/src/io/parquet/parquet.hpp | 11 ++--------- cpp/src/io/parquet/reader_impl_helpers.cpp | 5 +++-- cpp/tests/io/parquet_test.cpp | 2 +- 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index c35345178e8..eb665f85b5e 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -187,10 +187,7 @@ struct SchemaElement { // required int32 num; // }; // } - [[nodiscard]] bool is_stub(SchemaElement const& parent) const - { - return repetition_type == REPEATED && num_children == 1 && parent.converted_type == LIST; - } + [[nodiscard]] bool is_stub() const { return repetition_type == REPEATED && num_children == 1; } // https://github.com/apache/parquet-cpp/blob/642da05/src/parquet/schema.h#L49-L50 // One-level LIST encoding: Only allows required lists with required cells: @@ -201,11 +198,7 @@ struct SchemaElement { } // returns true if the element is a list - [[nodiscard]] bool is_list() const - { - return converted_type == LIST || - (type == UNDEFINED_TYPE && num_children == 1 && repetition_type == REPEATED); - } + [[nodiscard]] bool is_list() const { return converted_type == LIST; } // in parquet terms, a group is a level of nesting in the schema. 
a group // can be a struct or a list diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 8a196eecb21..2bdc8eeb812 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -206,8 +206,9 @@ void metadata::sanitize_schema() if (schema_idx < 0) { return; } auto& schema_elem = schema[schema_idx]; if (schema_idx != 0 && schema_elem.type == UNDEFINED_TYPE) { - if (schema_elem.type == UNDEFINED_TYPE && schema_elem.repetition_type == REPEATED && - schema_elem.num_children > 1) { + auto const parent_type = schema[schema_elem.parent_idx].converted_type; + if (schema_elem.repetition_type == REPEATED && schema_elem.num_children > 1 && + parent_type != LIST && parent_type != MAP) { // This is a list of structs, so we need to mark this as a list, but also // add a struct child and move this element's children to the struct schema_elem.converted_type = LIST; diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index a59fd90b410..73c946a5feb 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -6734,7 +6734,7 @@ TEST_P(ParquetV2Test, CheckEncodings) TEST_F(ParquetReaderTest, RepeatedNoAnnotations) { - unsigned char repeated_bytes[] = { + constexpr unsigned char repeated_bytes[] = { 0x50, 0x41, 0x52, 0x31, 0x15, 0x04, 0x15, 0x30, 0x15, 0x30, 0x4c, 0x15, 0x0c, 0x15, 0x00, 0x12, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x06, 0x00, 0x00, 0x00, 0x15, 0x00, 0x15, 0x0a, 0x15, 0x0a, From 630e8f8542d2e5ad8f9dc12e0fa4e29cd72bfd03 Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Tue, 3 Oct 2023 03:07:02 +0000 Subject: [PATCH 5/6] removing stub parameter from calling locations --- cpp/src/io/parquet/reader_impl_helpers.cpp | 2 +- cpp/src/io/parquet/reader_impl_helpers.hpp | 2 +- cpp/src/io/parquet/reader_impl_preprocess.cu | 16 +++++++--------- 
cpp/src/io/parquet/writer_impl.cu | 2 +- 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 2bdc8eeb812..06370f6e513 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -513,7 +513,7 @@ aggregate_reader_metadata::select_columns(std::optional // if schema_elem is a stub then it does not exist in the column_name_info and column_buffer // hierarchy. So continue on - if (schema_elem.is_stub(get_schema(schema_elem.parent_idx))) { + if (schema_elem.is_stub()) { // is this legit? CUDF_EXPECTS(schema_elem.num_children == 1, "Unexpected number of children for stub"); auto child_col_name_info = (col_name_info) ? &col_name_info->children[0] : nullptr; diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 0e4f0923e02..b8416d65a6a 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -138,7 +138,7 @@ class aggregate_reader_metadata { // walk upwards, skipping repeated fields while (schema_index > 0) { auto const& elm = pfm.schema[schema_index]; - if (!elm.is_stub(pfm.schema[elm.parent_idx])) { depth++; } + if (!elm.is_stub()) { depth++; } // schema of one-level encoding list doesn't contain nesting information, so we need to // manually add an extra nesting level if (elm.is_one_level_list(pfm.schema[elm.parent_idx])) { depth++; } diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 9252f433189..a2db0de26bb 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -116,14 +116,13 @@ void generate_depth_remappings(std::map, std::ve auto cur_schema = md.get_schema(schema_idx); if (cur_schema.max_repetition_level == r) { // if this is a repeated field, map it one level deeper - shallowest = - 
cur_schema.is_stub(md.get_schema(cur_schema.parent_idx)) ? cur_depth + 1 : cur_depth; + shallowest = cur_schema.is_stub() ? cur_depth + 1 : cur_depth; } // if it's one-level encoding list else if (cur_schema.is_one_level_list(md.get_schema(cur_schema.parent_idx))) { shallowest = cur_depth - 1; } - if (!cur_schema.is_stub(md.get_schema(cur_schema.parent_idx))) { cur_depth--; } + if (!cur_schema.is_stub()) { cur_depth--; } schema_idx = cur_schema.parent_idx; } return shallowest; @@ -141,9 +140,8 @@ void generate_depth_remappings(std::map, std::ve SchemaElement cur_schema = md.get_schema(schema_idx); if (cur_schema.max_definition_level == d) { // if this is a repeated field, map it one level deeper - r1 = cur_schema.is_stub(md.get_schema(cur_schema.parent_idx)) - ? prev_schema.max_repetition_level - : cur_schema.max_repetition_level; + r1 = cur_schema.is_stub() ? prev_schema.max_repetition_level + : cur_schema.max_repetition_level; break; } prev_schema = cur_schema; @@ -158,10 +156,10 @@ void generate_depth_remappings(std::map, std::ve SchemaElement cur_schema = md.get_schema(schema_idx); if (cur_schema.max_repetition_level == r1) { // if this is a repeated field, map it one level deeper - depth = cur_schema.is_stub(md.get_schema(cur_schema.parent_idx)) ? depth + 1 : depth; + depth = cur_schema.is_stub() ? depth + 1 : depth; break; } - if (!cur_schema.is_stub(md.get_schema(cur_schema.parent_idx))) { depth--; } + if (!cur_schema.is_stub()) { depth--; } prev_schema = cur_schema; schema_idx = cur_schema.parent_idx; } @@ -652,7 +650,7 @@ void reader::impl::allocate_nesting_info() while (schema_idx > 0) { // stub columns (basically the inner field of a list scheme element) are not real columns. 
// we can ignore them for the purposes of output nesting info - if (!cur_schema.is_stub(_metadata->get_schema(cur_schema.parent_idx))) { + if (!cur_schema.is_stub()) { // initialize each page within the chunk for (int p_idx = 0; p_idx < chunks[idx].num_data_pages; p_idx++) { gpu::PageNestingInfo* pni = diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index f78d0dce232..a124f352ee4 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -863,7 +863,7 @@ parquet_column_view::parquet_column_view(schema_tree_node const& schema_node, std::vector r_nullability; curr_schema_node = schema_node; while (curr_schema_node.parent_idx != -1) { - if (not curr_schema_node.is_stub(schema_tree[curr_schema_node.parent_idx])) { + if (not curr_schema_node.is_stub()) { r_nullability.push_back(curr_schema_node.repetition_type == FieldRepetitionType::OPTIONAL); } curr_schema_node = schema_tree[curr_schema_node.parent_idx]; From ef9dcdf170e3c03a8b2dffeb35b52e061a8a041d Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Fri, 6 Oct 2023 00:08:29 +0000 Subject: [PATCH 6/6] removing redundant comment of redundancy --- cpp/src/io/parquet/parquet.hpp | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index eb665f85b5e..1df49262e87 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -204,7 +204,6 @@ struct SchemaElement { // can be a struct or a list [[nodiscard]] bool is_struct() const { - // this assumption might be a little weak. return type == UNDEFINED_TYPE && // this assumption might be a little weak. ((repetition_type != REPEATED) || (repetition_type == REPEATED && num_children > 1));