Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing parquet list of struct interpretation #13715

Merged
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
779857b
Fixing parquet list of struct interpretation
hyperbolic2346 Jul 18, 2023
7a68d43
Merge remote-tracking branch 'upstream/branch-23.08' into mwilson/par…
hyperbolic2346 Jul 18, 2023
7a893af
Merge branch 'branch-23.08' into mwilson/parquet_list_of_struct
vuule Jul 19, 2023
8eaea09
changing to massage schema before decoding data
hyperbolic2346 Sep 21, 2023
8f9d989
Merge branch 'mwilson/parquet_list_of_struct' of github.com:hyperboli…
hyperbolic2346 Sep 21, 2023
f2662f3
Merge branch 'branch-23.10' into mwilson/parquet_list_of_struct
hyperbolic2346 Sep 21, 2023
bd0e4c6
Fixing definition level issue
hyperbolic2346 Sep 26, 2023
0382d87
Merge remote-tracking branch 'upstream/branch-23.10' into mwilson/par…
hyperbolic2346 Sep 26, 2023
115a115
Merge branch 'mwilson/parquet_list_of_struct' of github.com:hyperboli…
hyperbolic2346 Sep 26, 2023
987fb9d
Merge remote-tracking branch 'upstream/branch-23.10' into mwilson/par…
hyperbolic2346 Sep 27, 2023
939497b
Fixing misinterpretation of maps
hyperbolic2346 Oct 3, 2023
630e8f8
removing stub parameter from calling locations
hyperbolic2346 Oct 3, 2023
cfad781
Merge remote-tracking branch 'upstream/branch-23.12' into mwilson/par…
hyperbolic2346 Oct 3, 2023
5b42ada
Merge branch 'branch-23.12' into mwilson/parquet_list_of_struct
harrism Oct 3, 2023
01327d2
Merge branch 'branch-23.12' into mwilson/parquet_list_of_struct
hyperbolic2346 Oct 4, 2023
11dd94e
Merge branch 'branch-23.12' into mwilson/parquet_list_of_struct
hyperbolic2346 Oct 5, 2023
ef9dcdf
removing redundant comment of redundancy
hyperbolic2346 Oct 6, 2023
5b1cabc
Merge branch 'mwilson/parquet_list_of_struct' of github.com:hyperboli…
hyperbolic2346 Oct 6, 2023
3179e99
Merge branch 'branch-23.12' into mwilson/parquet_list_of_struct
hyperbolic2346 Oct 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_input_value
// for nested schemas, it's more complicated. This warp will visit 32 incoming values,
// however not all of them will necessarily represent a value at this nesting level. so
// the validity bit for thread t might actually represent output value t-6. the correct
// position for thread t's bit is cur_value_count. for cuda 11 we could use
// position for thread t's bit is thread_value_count. for cuda 11 we could use
// __reduce_or_sync(), but until then we have to do a warp reduce.
WarpReduceOr32(is_valid << thread_value_count);

Expand Down
3 changes: 2 additions & 1 deletion cpp/src/io/parquet/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,10 @@ struct SchemaElement {
// can be a struct or a list
[[nodiscard]] bool is_struct() const
{
// this assumption might be a little weak.
return type == UNDEFINED_TYPE &&
// this assumption might be a little weak.
hyperbolic2346 marked this conversation as resolved.
Show resolved Hide resolved
((repetition_type != REPEATED) || (repetition_type == REPEATED && num_children == 2));
((repetition_type != REPEATED) || (repetition_type == REPEATED && num_children > 1));
}
};

Expand Down
86 changes: 81 additions & 5 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,81 @@ type_id to_type_id(SchemaElement const& schema,
return type_id::EMPTY;
}

void metadata::sanitize_schema()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

neat approach, localizes the madness 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could ultimately fix things like single-level list as well. I like this approach too.

{
// Parquet isn't very strict about incoming metadata. Lots of things can and should be inferred.
// There are also a lot of rules that simply aren't followed and are expected to be worked around.
// This step sanitizes the metadata to something that isn't ambiguous.
//
// Take, for example, the following schema:
//
// required group field_id=-1 user {
// required int32 field_id=-1 id;
// optional group field_id=-1 phoneNumbers {
// repeated group field_id=-1 phone {
// required int64 field_id=-1 number;
// optional binary field_id=-1 kind (String);
// }
// }
// }
//
// This real-world example has no annotations telling us what is a list or a struct. On the
// surface this looks like a column of id's and a column of list<struct<int64, string>>, but this
// actually should be interpreted as a struct<list<struct<int64, string>>>. The phoneNumbers field
// has to be a struct because it is a group with no repeated tag and we have no annotation. The
// repeated group is actually BOTH a struct due to the multiple children and a list due to
// repeated.
//
// This code attempts to make this less messy for the code that follows.

std::function<void(size_t)> process = [&](size_t schema_idx) -> void {
if (schema_idx < 0) { return; }
auto& schema_elem = schema[schema_idx];
if (schema_idx != 0 && schema_elem.type == UNDEFINED_TYPE) {
auto const parent_type = schema[schema_elem.parent_idx].converted_type;
if (schema_elem.repetition_type == REPEATED && schema_elem.num_children > 1 &&
parent_type != LIST && parent_type != MAP) {
// This is a list of structs, so we need to mark this as a list, but also
// add a struct child and move this element's children to the struct
schema_elem.converted_type = LIST;
schema_elem.repetition_type = OPTIONAL;
auto const struct_node_idx = schema.size();

SchemaElement struct_elem;
struct_elem.name = "struct_node";
struct_elem.repetition_type = REQUIRED;
struct_elem.num_children = schema_elem.num_children;
struct_elem.type = UNDEFINED_TYPE;
struct_elem.converted_type = UNKNOWN;

// swap children
struct_elem.children_idx = std::move(schema_elem.children_idx);
schema_elem.children_idx = {struct_node_idx};
schema_elem.num_children = 1;
vuule marked this conversation as resolved.
Show resolved Hide resolved

struct_elem.max_definition_level = schema_elem.max_definition_level;
struct_elem.max_repetition_level = schema_elem.max_repetition_level;
schema_elem.max_definition_level--;
schema_elem.max_repetition_level = schema[schema_elem.parent_idx].max_repetition_level;

// change parent index on new node and on children
struct_elem.parent_idx = schema_idx;
for (auto& child_idx : struct_elem.children_idx) {
schema[child_idx].parent_idx = struct_node_idx;
}
// add our struct
schema.push_back(struct_elem);
}
}

for (auto& child_idx : schema_elem.children_idx) {
process(child_idx);
}
};

process(0);
}

metadata::metadata(datasource* source)
{
constexpr auto header_len = sizeof(file_header_s);
Expand All @@ -195,6 +270,7 @@ metadata::metadata(datasource* source)
CompactProtocolReader cp(buffer->data(), ender->footer_len);
CUDF_EXPECTS(cp.read(this), "Cannot parse metadata");
CUDF_EXPECTS(cp.InitSchema(this), "Cannot initialize schema");
sanitize_schema();
}

std::vector<metadata> aggregate_reader_metadata::metadatas_from_sources(
Expand Down Expand Up @@ -445,8 +521,10 @@ aggregate_reader_metadata::select_columns(std::optional<std::vector<std::string>
child_col_name_info, schema_elem.children_idx[0], out_col_array, has_list_parent);
}

auto const one_level_list = schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx));

// if we're at the root, this is a new output column
auto const col_type = schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx))
auto const col_type = one_level_list
? type_id::LIST
: to_type_id(schema_elem, strings_to_categorical, timestamp_type_id);
auto const dtype = to_data_type(col_type, schema_elem);
Expand Down Expand Up @@ -485,7 +563,7 @@ aggregate_reader_metadata::select_columns(std::optional<std::vector<std::string>
input_column_info{schema_idx, schema_elem.name, schema_elem.max_repetition_level > 0});

// set up child output column for one-level encoding list
if (schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx))) {
if (one_level_list) {
// determine the element data type
auto const element_type =
to_type_id(schema_elem, strings_to_categorical, timestamp_type_id);
Expand All @@ -506,9 +584,7 @@ aggregate_reader_metadata::select_columns(std::optional<std::vector<std::string>
std::copy(nesting.cbegin(), nesting.cend(), std::back_inserter(input_col.nesting));

// pop off the extra nesting element.
if (schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx))) {
nesting.pop_back();
}
if (one_level_list) { nesting.pop_back(); }

path_is_valid = true; // If we're able to reach leaf then path is valid
}
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/reader_impl_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ using namespace cudf::io::parquet;
*/
struct metadata : public FileMetaData {
explicit metadata(datasource* source);
void sanitize_schema();
};

class aggregate_reader_metadata {
Expand Down
78 changes: 78 additions & 0 deletions cpp/tests/io/parquet_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6732,4 +6732,82 @@ TEST_P(ParquetV2Test, CheckEncodings)
}
}

TEST_F(ParquetReaderTest, RepeatedNoAnnotations)
{
constexpr unsigned char repeated_bytes[] = {
0x50, 0x41, 0x52, 0x31, 0x15, 0x04, 0x15, 0x30, 0x15, 0x30, 0x4c, 0x15, 0x0c, 0x15, 0x00, 0x12,
0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x04, 0x00,
0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x06, 0x00, 0x00, 0x00, 0x15, 0x00, 0x15, 0x0a, 0x15, 0x0a,
0x2c, 0x15, 0x0c, 0x15, 0x10, 0x15, 0x06, 0x15, 0x06, 0x00, 0x00, 0x03, 0x03, 0x88, 0xc6, 0x02,
0x26, 0x80, 0x01, 0x1c, 0x15, 0x02, 0x19, 0x25, 0x00, 0x10, 0x19, 0x18, 0x02, 0x69, 0x64, 0x15,
0x00, 0x16, 0x0c, 0x16, 0x78, 0x16, 0x78, 0x26, 0x54, 0x26, 0x08, 0x00, 0x00, 0x15, 0x04, 0x15,
0x40, 0x15, 0x40, 0x4c, 0x15, 0x08, 0x15, 0x00, 0x12, 0x00, 0x00, 0xe3, 0x0c, 0x23, 0x4b, 0x01,
0x00, 0x00, 0x00, 0xc7, 0x35, 0x3a, 0x42, 0x00, 0x00, 0x00, 0x00, 0x8e, 0x6b, 0x74, 0x84, 0x00,
0x00, 0x00, 0x00, 0x55, 0xa1, 0xae, 0xc6, 0x00, 0x00, 0x00, 0x00, 0x15, 0x00, 0x15, 0x22, 0x15,
0x22, 0x2c, 0x15, 0x10, 0x15, 0x10, 0x15, 0x06, 0x15, 0x06, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
0x03, 0xc0, 0x03, 0x00, 0x00, 0x00, 0x03, 0x90, 0xaa, 0x02, 0x03, 0x94, 0x03, 0x26, 0xda, 0x02,
0x1c, 0x15, 0x04, 0x19, 0x25, 0x00, 0x10, 0x19, 0x38, 0x0c, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x4e,
0x75, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x05, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x06, 0x6e, 0x75, 0x6d,
0x62, 0x65, 0x72, 0x15, 0x00, 0x16, 0x10, 0x16, 0xa0, 0x01, 0x16, 0xa0, 0x01, 0x26, 0x96, 0x02,
0x26, 0xba, 0x01, 0x00, 0x00, 0x15, 0x04, 0x15, 0x24, 0x15, 0x24, 0x4c, 0x15, 0x04, 0x15, 0x00,
0x12, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x68, 0x6f, 0x6d, 0x65, 0x06, 0x00, 0x00, 0x00, 0x6d,
0x6f, 0x62, 0x69, 0x6c, 0x65, 0x15, 0x00, 0x15, 0x20, 0x15, 0x20, 0x2c, 0x15, 0x10, 0x15, 0x10,
0x15, 0x06, 0x15, 0x06, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0xc0, 0x03, 0x00, 0x00, 0x00,
0x03, 0x90, 0xef, 0x01, 0x03, 0x04, 0x26, 0xcc, 0x04, 0x1c, 0x15, 0x0c, 0x19, 0x25, 0x00, 0x10,
0x19, 0x38, 0x0c, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x05,
0x70, 0x68, 0x6f, 0x6e, 0x65, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x15, 0x00, 0x16, 0x10, 0x16, 0x82,
0x01, 0x16, 0x82, 0x01, 0x26, 0x8a, 0x04, 0x26, 0xca, 0x03, 0x00, 0x00, 0x15, 0x02, 0x19, 0x6c,
0x48, 0x04, 0x75, 0x73, 0x65, 0x72, 0x15, 0x04, 0x00, 0x15, 0x02, 0x25, 0x00, 0x18, 0x02, 0x69,
0x64, 0x00, 0x35, 0x02, 0x18, 0x0c, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x4e, 0x75, 0x6d, 0x62, 0x65,
0x72, 0x73, 0x15, 0x02, 0x00, 0x35, 0x04, 0x18, 0x05, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x15, 0x04,
0x00, 0x15, 0x04, 0x25, 0x00, 0x18, 0x06, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x00, 0x15, 0x0c,
0x25, 0x02, 0x18, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x25, 0x00, 0x00, 0x16, 0x00, 0x19, 0x1c, 0x19,
0x3c, 0x26, 0x80, 0x01, 0x1c, 0x15, 0x02, 0x19, 0x25, 0x00, 0x10, 0x19, 0x18, 0x02, 0x69, 0x64,
0x15, 0x00, 0x16, 0x0c, 0x16, 0x78, 0x16, 0x78, 0x26, 0x54, 0x26, 0x08, 0x00, 0x00, 0x26, 0xda,
0x02, 0x1c, 0x15, 0x04, 0x19, 0x25, 0x00, 0x10, 0x19, 0x38, 0x0c, 0x70, 0x68, 0x6f, 0x6e, 0x65,
0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x05, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x06, 0x6e, 0x75,
0x6d, 0x62, 0x65, 0x72, 0x15, 0x00, 0x16, 0x10, 0x16, 0xa0, 0x01, 0x16, 0xa0, 0x01, 0x26, 0x96,
0x02, 0x26, 0xba, 0x01, 0x00, 0x00, 0x26, 0xcc, 0x04, 0x1c, 0x15, 0x0c, 0x19, 0x25, 0x00, 0x10,
0x19, 0x38, 0x0c, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x05,
0x70, 0x68, 0x6f, 0x6e, 0x65, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x15, 0x00, 0x16, 0x10, 0x16, 0x82,
0x01, 0x16, 0x82, 0x01, 0x26, 0x8a, 0x04, 0x26, 0xca, 0x03, 0x00, 0x00, 0x16, 0x9a, 0x03, 0x16,
0x0c, 0x00, 0x28, 0x49, 0x70, 0x61, 0x72, 0x71, 0x75, 0x65, 0x74, 0x2d, 0x72, 0x73, 0x20, 0x76,
0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x20, 0x30, 0x2e, 0x33, 0x2e, 0x30, 0x20, 0x28, 0x62, 0x75,
0x69, 0x6c, 0x64, 0x20, 0x62, 0x34, 0x35, 0x63, 0x65, 0x37, 0x63, 0x62, 0x61, 0x32, 0x31, 0x39,
0x39, 0x66, 0x32, 0x32, 0x64, 0x39, 0x33, 0x32, 0x36, 0x39, 0x63, 0x31, 0x35, 0x30, 0x64, 0x38,
0x61, 0x38, 0x33, 0x39, 0x31, 0x36, 0x63, 0x36, 0x39, 0x62, 0x35, 0x65, 0x29, 0x00, 0x32, 0x01,
0x00, 0x00, 0x50, 0x41, 0x52, 0x31};

auto read_opts = cudf::io::parquet_reader_options::builder(
cudf::io::source_info{reinterpret_cast<char const*>(repeated_bytes), sizeof(repeated_bytes)});
auto result = cudf::io::read_parquet(read_opts);

EXPECT_EQ(result.tbl->view().column(0).size(), 6);
EXPECT_EQ(result.tbl->view().num_columns(), 2);

column_wrapper<int32_t> col0{1, 2, 3, 4, 5, 6};
column_wrapper<int64_t> child0{{5555555555l, 1111111111l, 1111111111l, 2222222222l, 3333333333l}};
cudf::test::strings_column_wrapper child1{{"-", "home", "home", "-", "mobile"}, {0, 1, 1, 0, 1}};
auto struct_col = cudf::test::structs_column_wrapper{{child0, child1}};

auto list_offsets_column =
cudf::test::fixed_width_column_wrapper<cudf::size_type>{0, 0, 0, 0, 1, 2, 5}.release();
auto num_list_rows = list_offsets_column->size() - 1;

auto mask = cudf::create_null_mask(6, cudf::mask_state::ALL_VALID);
cudf::set_null_mask(static_cast<cudf::bitmask_type*>(mask.data()), 0, 2, false);

auto list_col = cudf::make_lists_column(
num_list_rows, std::move(list_offsets_column), struct_col.release(), 2, std::move(mask));

std::vector<std::unique_ptr<cudf::column>> struct_children;
struct_children.push_back(std::move(list_col));

auto outer_struct =
cudf::test::structs_column_wrapper{{std::move(struct_children)}, {0, 0, 1, 1, 1, 1}};
table_view expected{{col0, outer_struct}};

CUDF_TEST_EXPECT_TABLES_EQUAL(result.tbl->view(), expected);
}

CUDF_TEST_PROGRAM_MAIN()
Loading