Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix parquet schema interpretation issue #13277

Merged
7 changes: 5 additions & 2 deletions cpp/src/io/parquet/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,14 @@ struct SchemaElement {
// https://github.com/apache/parquet-cpp/blob/642da05/src/parquet/schema.h#L49-L50
// One-level LIST encoding: Only allows required lists with required cells:
// repeated value_type name
[[nodiscard]] bool is_one_level_list() const
[[nodiscard]] bool is_one_level_list(SchemaElement const& parent) const
{
return repetition_type == REPEATED and num_children == 0;
return repetition_type == REPEATED and num_children == 0 and not parent.is_list();
}

// returns true if the element is a list
[[nodiscard]] bool is_list() const { return converted_type == LIST; }

// in parquet terms, a group is a level of nesting in the schema. a group
// can be a struct or a list
[[nodiscard]] bool is_struct() const
Expand Down
8 changes: 5 additions & 3 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ aggregate_reader_metadata::select_columns(std::optional<std::vector<std::string>
}

// if we're at the root, this is a new output column
auto const col_type = schema_elem.is_one_level_list()
auto const col_type = schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx))
? type_id::LIST
: to_type_id(schema_elem, strings_to_categorical, timestamp_type_id);
auto const dtype = to_data_type(col_type, schema_elem);
Expand Down Expand Up @@ -465,7 +465,7 @@ aggregate_reader_metadata::select_columns(std::optional<std::vector<std::string>
input_column_info{schema_idx, schema_elem.name, schema_elem.max_repetition_level > 0});

// set up child output column for one-level encoding list
if (schema_elem.is_one_level_list()) {
if (schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx))) {
// determine the element data type
auto const element_type =
to_type_id(schema_elem, strings_to_categorical, timestamp_type_id);
Expand All @@ -486,7 +486,9 @@ aggregate_reader_metadata::select_columns(std::optional<std::vector<std::string>
std::copy(nesting.cbegin(), nesting.cend(), std::back_inserter(input_col.nesting));

// pop off the extra nesting element.
if (schema_elem.is_one_level_list()) { nesting.pop_back(); }
if (schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx))) {
nesting.pop_back();
}

path_is_valid = true; // If we're able to reach leaf then path is valid
}
Expand Down
7 changes: 4 additions & 3 deletions cpp/src/io/parquet/reader_impl_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,12 @@ class aggregate_reader_metadata {

// walk upwards, skipping repeated fields
while (schema_index > 0) {
if (!pfm.schema[schema_index].is_stub()) { depth++; }
auto const& elm = pfm.schema[schema_index];
if (!elm.is_stub()) { depth++; }
// schema of one-level encoding list doesn't contain nesting information, so we need to
// manually add an extra nesting level
if (pfm.schema[schema_index].is_one_level_list()) { depth++; }
schema_index = pfm.schema[schema_index].parent_idx;
if (elm.is_one_level_list(pfm.schema[elm.parent_idx])) { depth++; }
schema_index = elm.parent_idx;
}
return depth;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void generate_depth_remappings(std::map<int, std::pair<std::vector<int>, std::ve
shallowest = cur_schema.is_stub() ? cur_depth + 1 : cur_depth;
}
// if it's one-level encoding list
else if (cur_schema.is_one_level_list()) {
else if (cur_schema.is_one_level_list(md.get_schema(cur_schema.parent_idx))) {
shallowest = cur_depth - 1;
}
if (!cur_schema.is_stub()) { cur_depth--; }
Expand Down
60 changes: 60 additions & 0 deletions cpp/tests/io/parquet_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5376,4 +5376,64 @@ TEST_F(ParquetWriterTest, UserNullabilityInvalid)
EXPECT_THROW(cudf::io::write_parquet(write_opts), cudf::logic_error);
}

TEST_F(ParquetReaderTest, SingleLevelLists)
{
  // Raw bytes of a small parquet file (footer created_by reads "RAPIDS Spark
  // Plugin") whose schema encodes a list column with one-level list encoding.
  unsigned char list_bytes[] = {
    0x50, 0x41, 0x52, 0x31, 0x15, 0x00, 0x15, 0x28, 0x15, 0x28, 0x15, 0xa7, 0xce, 0x91, 0x8c, 0x06,
    0x1c, 0x15, 0x04, 0x15, 0x00, 0x15, 0x06, 0x15, 0x06, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03,
    0x02, 0x02, 0x00, 0x00, 0x00, 0x03, 0x03, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x15,
    0x02, 0x19, 0x3c, 0x48, 0x0c, 0x73, 0x70, 0x61, 0x72, 0x6b, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d,
    0x61, 0x15, 0x02, 0x00, 0x35, 0x00, 0x18, 0x01, 0x66, 0x15, 0x02, 0x15, 0x06, 0x4c, 0x3c, 0x00,
    0x00, 0x00, 0x15, 0x02, 0x25, 0x04, 0x18, 0x05, 0x61, 0x72, 0x72, 0x61, 0x79, 0x00, 0x16, 0x02,
    0x19, 0x1c, 0x19, 0x1c, 0x26, 0x08, 0x1c, 0x15, 0x02, 0x19, 0x25, 0x00, 0x06, 0x19, 0x28, 0x01,
    0x66, 0x05, 0x61, 0x72, 0x72, 0x61, 0x79, 0x15, 0x00, 0x16, 0x04, 0x16, 0x56, 0x16, 0x56, 0x26,
    0x08, 0x3c, 0x18, 0x04, 0x01, 0x00, 0x00, 0x00, 0x18, 0x04, 0x00, 0x00, 0x00, 0x00, 0x16, 0x00,
    0x28, 0x04, 0x01, 0x00, 0x00, 0x00, 0x18, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x19, 0x1c, 0x15,
    0x00, 0x15, 0x00, 0x15, 0x02, 0x00, 0x00, 0x00, 0x16, 0x56, 0x16, 0x02, 0x26, 0x08, 0x16, 0x56,
    0x14, 0x00, 0x00, 0x28, 0x13, 0x52, 0x41, 0x50, 0x49, 0x44, 0x53, 0x20, 0x53, 0x70, 0x61, 0x72,
    0x6b, 0x20, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x19, 0x1c, 0x1c, 0x00, 0x00, 0x00, 0x9f, 0x00,
    0x00, 0x00, 0x50, 0x41, 0x52, 0x31};

  // Read the in-memory buffer back through the parquet reader.
  cudf::io::parquet_reader_options const read_opts = cudf::io::parquet_reader_options::builder(
    cudf::io::source_info{reinterpret_cast<char const*>(list_bytes), sizeof(list_bytes)});
  auto const result = cudf::io::read_parquet(read_opts);

  // The column must come back as a list...
  auto const col = result.tbl->get_column(0);
  EXPECT_TRUE(col.type().id() == cudf::type_id::LIST);

  // ...whose element type is a plain INT32 (i.e. list<int32>, not a
  // spuriously deeper nesting such as list<list<int32>>).
  auto const element_col = cudf::lists_column_view(col).child();
  EXPECT_TRUE(element_col.type().id() == cudf::type_id::INT32);
}

TEST_F(ParquetReaderTest, ChunkedSingleLevelLists)
{
  // Raw bytes of a small parquet file containing a one-level-encoded list
  // column; here it is fed through the chunked reader instead.
  unsigned char list_bytes[] = {
    0x50, 0x41, 0x52, 0x31, 0x15, 0x00, 0x15, 0x28, 0x15, 0x28, 0x15, 0xa7, 0xce, 0x91, 0x8c, 0x06,
    0x1c, 0x15, 0x04, 0x15, 0x00, 0x15, 0x06, 0x15, 0x06, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03,
    0x02, 0x02, 0x00, 0x00, 0x00, 0x03, 0x03, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x15,
    0x02, 0x19, 0x3c, 0x48, 0x0c, 0x73, 0x70, 0x61, 0x72, 0x6b, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d,
    0x61, 0x15, 0x02, 0x00, 0x35, 0x00, 0x18, 0x01, 0x66, 0x15, 0x02, 0x15, 0x06, 0x4c, 0x3c, 0x00,
    0x00, 0x00, 0x15, 0x02, 0x25, 0x04, 0x18, 0x05, 0x61, 0x72, 0x72, 0x61, 0x79, 0x00, 0x16, 0x02,
    0x19, 0x1c, 0x19, 0x1c, 0x26, 0x08, 0x1c, 0x15, 0x02, 0x19, 0x25, 0x00, 0x06, 0x19, 0x28, 0x01,
    0x66, 0x05, 0x61, 0x72, 0x72, 0x61, 0x79, 0x15, 0x00, 0x16, 0x04, 0x16, 0x56, 0x16, 0x56, 0x26,
    0x08, 0x3c, 0x18, 0x04, 0x01, 0x00, 0x00, 0x00, 0x18, 0x04, 0x00, 0x00, 0x00, 0x00, 0x16, 0x00,
    0x28, 0x04, 0x01, 0x00, 0x00, 0x00, 0x18, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x19, 0x1c, 0x15,
    0x00, 0x15, 0x00, 0x15, 0x02, 0x00, 0x00, 0x00, 0x16, 0x56, 0x16, 0x02, 0x26, 0x08, 0x16, 0x56,
    0x14, 0x00, 0x00, 0x28, 0x13, 0x52, 0x41, 0x50, 0x49, 0x44, 0x53, 0x20, 0x53, 0x70, 0x61, 0x72,
    0x6b, 0x20, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x19, 0x1c, 0x1c, 0x00, 0x00, 0x00, 0x9f, 0x00,
    0x00, 0x00, 0x50, 0x41, 0x52, 0x31};

  // Chunk size limit of 2 GiB, far larger than the file, so the whole file
  // should come back in a single chunk.
  auto reader = cudf::io::chunked_parquet_reader(
    1L << 31,
    cudf::io::parquet_reader_options::builder(
      cudf::io::source_info{reinterpret_cast<const char*>(list_bytes), sizeof(list_bytes)}));

  // Drain the reader, bailing out after 10 chunks so a reader that never
  // reports completion cannot hang the test.
  int iterations = 0;
  while (reader.has_next() && iterations < 10) {
    [[maybe_unused]] auto chunk = reader.read_chunk();
    // Fix: the counter was never incremented, which made the `iterations < 10`
    // guard (and the EXPECT below) ineffective against a non-terminating reader.
    iterations++;
  }
  // If we hit the cap, the reader kept claiming more chunks were available.
  EXPECT_TRUE(iterations < 10);
}

CUDF_TEST_PROGRAM_MAIN()
Binary file not shown.
14 changes: 14 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2573,6 +2573,20 @@ def postprocess(val):
assert_eq(expect, got, check_dtype=False)


# Regression test for a specific schema-interpretation bug: for one particular
# way of representing a list column in a parquet schema, the cudf reader mixed
# up the nesting information and built a list<list<int>> column instead of a
# list<int> column.
def test_parquet_reader_one_level_list3(datadir):
    path = datadir / "one_level_list3.parquet"

    # pandas is the reference reader; cudf must agree with it exactly,
    # including the dtype of the list column.
    expected = pd.read_parquet(path)
    actual = cudf.read_parquet(path)

    assert_eq(expected, actual, check_dtype=True)


@pytest.mark.parametrize("size_bytes", [4_000_000, 1_000_000, 600_000])
@pytest.mark.parametrize("size_rows", [1_000_000, 100_000, 10_000])
def test_to_parquet_row_group_size(
Expand Down