Skip to content

Commit

Permalink
Fix parquet schema interpretation issue (#13277)
Browse files Browse the repository at this point in the history
There is a bug reading parquet files that have a specific encoding. The typical list is 
```
<list-repetition> group <name> (LIST) {
  repeated group list {
    <element-repetition> <element-type> element;
  }
}
```
but it can also be
```
<list-repetition> group <name> (LIST) {
  repeated group list {
  repeated int32 name;
  }
}
```
This second case was failing, and this change fixes that. The issue was that `is_one_level_list()` was returning true for the element, which resulted in an extra nesting level being added. The fix is to return true only when the immediate parent is not a list.

closes #13237
closes #13239

Authors:
  - Mike Wilson (https://github.com/hyperbolic2346)
  - MithunR (https://github.com/mythrocks)

Approvers:
  - Yunsong Wang (https://github.com/PointKernel)
  - https://github.com/nvdbaranec
  - MithunR (https://github.com/mythrocks)
  - Ashwin Srinath (https://github.com/shwina)

URL: #13277
  • Loading branch information
hyperbolic2346 authored May 8, 2023
1 parent fce2f0b commit 0a5065f
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 9 deletions.
7 changes: 5 additions & 2 deletions cpp/src/io/parquet/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,14 @@ struct SchemaElement {
// https://github.com/apache/parquet-cpp/blob/642da05/src/parquet/schema.h#L49-L50
// One-level LIST encoding: Only allows required lists with required cells:
// repeated value_type name
[[nodiscard]] bool is_one_level_list() const
[[nodiscard]] bool is_one_level_list(SchemaElement const& parent) const
{
return repetition_type == REPEATED and num_children == 0;
return repetition_type == REPEATED and num_children == 0 and not parent.is_list();
}

// returns true if the element is a list
[[nodiscard]] bool is_list() const { return converted_type == LIST; }

// in parquet terms, a group is a level of nesting in the schema. a group
// can be a struct or a list
[[nodiscard]] bool is_struct() const
Expand Down
8 changes: 5 additions & 3 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ aggregate_reader_metadata::select_columns(std::optional<std::vector<std::string>
}

// if we're at the root, this is a new output column
auto const col_type = schema_elem.is_one_level_list()
auto const col_type = schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx))
? type_id::LIST
: to_type_id(schema_elem, strings_to_categorical, timestamp_type_id);
auto const dtype = to_data_type(col_type, schema_elem);
Expand Down Expand Up @@ -465,7 +465,7 @@ aggregate_reader_metadata::select_columns(std::optional<std::vector<std::string>
input_column_info{schema_idx, schema_elem.name, schema_elem.max_repetition_level > 0});

// set up child output column for one-level encoding list
if (schema_elem.is_one_level_list()) {
if (schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx))) {
// determine the element data type
auto const element_type =
to_type_id(schema_elem, strings_to_categorical, timestamp_type_id);
Expand All @@ -486,7 +486,9 @@ aggregate_reader_metadata::select_columns(std::optional<std::vector<std::string>
std::copy(nesting.cbegin(), nesting.cend(), std::back_inserter(input_col.nesting));

// pop off the extra nesting element.
if (schema_elem.is_one_level_list()) { nesting.pop_back(); }
if (schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx))) {
nesting.pop_back();
}

path_is_valid = true; // If we're able to reach leaf then path is valid
}
Expand Down
7 changes: 4 additions & 3 deletions cpp/src/io/parquet/reader_impl_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,12 @@ class aggregate_reader_metadata {

// walk upwards, skipping repeated fields
while (schema_index > 0) {
if (!pfm.schema[schema_index].is_stub()) { depth++; }
auto const& elm = pfm.schema[schema_index];
if (!elm.is_stub()) { depth++; }
// schema of one-level encoding list doesn't contain nesting information, so we need to
// manually add an extra nesting level
if (pfm.schema[schema_index].is_one_level_list()) { depth++; }
schema_index = pfm.schema[schema_index].parent_idx;
if (elm.is_one_level_list(pfm.schema[elm.parent_idx])) { depth++; }
schema_index = elm.parent_idx;
}
return depth;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void generate_depth_remappings(std::map<int, std::pair<std::vector<int>, std::ve
shallowest = cur_schema.is_stub() ? cur_depth + 1 : cur_depth;
}
// if it's one-level encoding list
else if (cur_schema.is_one_level_list()) {
else if (cur_schema.is_one_level_list(md.get_schema(cur_schema.parent_idx))) {
shallowest = cur_depth - 1;
}
if (!cur_schema.is_stub()) { cur_depth--; }
Expand Down
60 changes: 60 additions & 0 deletions cpp/tests/io/parquet_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5376,4 +5376,64 @@ TEST_F(ParquetWriterTest, UserNullabilityInvalid)
EXPECT_THROW(cudf::io::write_parquet(write_opts), cudf::logic_error);
}

TEST_F(ParquetReaderTest, SingleLevelLists)
{
  // Raw bytes of a parquet file whose list column uses the one-level
  // ("repeated <type> <name>") encoding, as produced by Spark.
  unsigned char list_bytes[] = {
    0x50, 0x41, 0x52, 0x31, 0x15, 0x00, 0x15, 0x28, 0x15, 0x28, 0x15, 0xa7, 0xce, 0x91, 0x8c, 0x06,
    0x1c, 0x15, 0x04, 0x15, 0x00, 0x15, 0x06, 0x15, 0x06, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03,
    0x02, 0x02, 0x00, 0x00, 0x00, 0x03, 0x03, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x15,
    0x02, 0x19, 0x3c, 0x48, 0x0c, 0x73, 0x70, 0x61, 0x72, 0x6b, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d,
    0x61, 0x15, 0x02, 0x00, 0x35, 0x00, 0x18, 0x01, 0x66, 0x15, 0x02, 0x15, 0x06, 0x4c, 0x3c, 0x00,
    0x00, 0x00, 0x15, 0x02, 0x25, 0x04, 0x18, 0x05, 0x61, 0x72, 0x72, 0x61, 0x79, 0x00, 0x16, 0x02,
    0x19, 0x1c, 0x19, 0x1c, 0x26, 0x08, 0x1c, 0x15, 0x02, 0x19, 0x25, 0x00, 0x06, 0x19, 0x28, 0x01,
    0x66, 0x05, 0x61, 0x72, 0x72, 0x61, 0x79, 0x15, 0x00, 0x16, 0x04, 0x16, 0x56, 0x16, 0x56, 0x26,
    0x08, 0x3c, 0x18, 0x04, 0x01, 0x00, 0x00, 0x00, 0x18, 0x04, 0x00, 0x00, 0x00, 0x00, 0x16, 0x00,
    0x28, 0x04, 0x01, 0x00, 0x00, 0x00, 0x18, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x19, 0x1c, 0x15,
    0x00, 0x15, 0x00, 0x15, 0x02, 0x00, 0x00, 0x00, 0x16, 0x56, 0x16, 0x02, 0x26, 0x08, 0x16, 0x56,
    0x14, 0x00, 0x00, 0x28, 0x13, 0x52, 0x41, 0x50, 0x49, 0x44, 0x53, 0x20, 0x53, 0x70, 0x61, 0x72,
    0x6b, 0x20, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x19, 0x1c, 0x1c, 0x00, 0x00, 0x00, 0x9f, 0x00,
    0x00, 0x00, 0x50, 0x41, 0x52, 0x31};

  // Read the in-memory file and verify the column comes back as
  // list<int32> rather than gaining a spurious nesting level.
  auto const read_opts = cudf::io::parquet_reader_options::builder(
    cudf::io::source_info{reinterpret_cast<const char*>(list_bytes), sizeof(list_bytes)});
  auto const result = cudf::io::read_parquet(read_opts);

  auto const list_col = result.tbl->get_column(0);
  EXPECT_TRUE(list_col.type().id() == cudf::type_id::LIST);

  auto const list_view = cudf::lists_column_view(list_col);
  auto const element   = list_view.child();
  EXPECT_TRUE(element.type().id() == cudf::type_id::INT32);
}

TEST_F(ParquetReaderTest, ChunkedSingleLevelLists)
{
  // Raw bytes of a parquet file whose list column uses the one-level
  // ("repeated <type> <name>") encoding, as produced by Spark.
  unsigned char list_bytes[] = {
    0x50, 0x41, 0x52, 0x31, 0x15, 0x00, 0x15, 0x28, 0x15, 0x28, 0x15, 0xa7, 0xce, 0x91, 0x8c, 0x06,
    0x1c, 0x15, 0x04, 0x15, 0x00, 0x15, 0x06, 0x15, 0x06, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03,
    0x02, 0x02, 0x00, 0x00, 0x00, 0x03, 0x03, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x15,
    0x02, 0x19, 0x3c, 0x48, 0x0c, 0x73, 0x70, 0x61, 0x72, 0x6b, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d,
    0x61, 0x15, 0x02, 0x00, 0x35, 0x00, 0x18, 0x01, 0x66, 0x15, 0x02, 0x15, 0x06, 0x4c, 0x3c, 0x00,
    0x00, 0x00, 0x15, 0x02, 0x25, 0x04, 0x18, 0x05, 0x61, 0x72, 0x72, 0x61, 0x79, 0x00, 0x16, 0x02,
    0x19, 0x1c, 0x19, 0x1c, 0x26, 0x08, 0x1c, 0x15, 0x02, 0x19, 0x25, 0x00, 0x06, 0x19, 0x28, 0x01,
    0x66, 0x05, 0x61, 0x72, 0x72, 0x61, 0x79, 0x15, 0x00, 0x16, 0x04, 0x16, 0x56, 0x16, 0x56, 0x26,
    0x08, 0x3c, 0x18, 0x04, 0x01, 0x00, 0x00, 0x00, 0x18, 0x04, 0x00, 0x00, 0x00, 0x00, 0x16, 0x00,
    0x28, 0x04, 0x01, 0x00, 0x00, 0x00, 0x18, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x19, 0x1c, 0x15,
    0x00, 0x15, 0x00, 0x15, 0x02, 0x00, 0x00, 0x00, 0x16, 0x56, 0x16, 0x02, 0x26, 0x08, 0x16, 0x56,
    0x14, 0x00, 0x00, 0x28, 0x13, 0x52, 0x41, 0x50, 0x49, 0x44, 0x53, 0x20, 0x53, 0x70, 0x61, 0x72,
    0x6b, 0x20, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x19, 0x1c, 0x1c, 0x00, 0x00, 0x00, 0x9f, 0x00,
    0x00, 0x00, 0x50, 0x41, 0x52, 0x31};

  // Chunked read of the same single-level-list file. Cap the number of chunks
  // consumed so a misbehaving reader cannot loop forever.
  auto reader = cudf::io::chunked_parquet_reader(
    1L << 31,
    cudf::io::parquet_reader_options::builder(
      cudf::io::source_info{reinterpret_cast<const char*>(list_bytes), sizeof(list_bytes)}));
  int iterations = 0;
  while (reader.has_next() && iterations < 10) {
    auto chunk = reader.read_chunk();
    // Bug fix: the counter was never incremented, so the loop cap and the
    // EXPECT below were inert and the loop could spin indefinitely.
    ++iterations;
  }
  EXPECT_TRUE(iterations < 10);
}

CUDF_TEST_PROGRAM_MAIN()
Binary file not shown.
14 changes: 14 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2573,6 +2573,20 @@ def postprocess(val):
assert_eq(expect, got, check_dtype=False)


# testing a specific bug-fix/edge case.
# specifically: in a parquet file containing a particular way of representing
# a list column in a schema, the cudf reader was confusing
# nesting information and building a list of list of int instead
# of a list of int
def test_parquet_reader_one_level_list3(datadir):
    # Regression test: for lists stored with the single-level schema
    # encoding, the cudf reader previously added a spurious nesting level,
    # producing list<list<int>> instead of list<int>. pandas is the oracle.
    fname = datadir / "one_level_list3.parquet"

    got = cudf.read_parquet(fname)
    expect = pd.read_parquet(fname)

    assert_eq(expect, got, check_dtype=True)


@pytest.mark.parametrize("size_bytes", [4_000_000, 1_000_000, 600_000])
@pytest.mark.parametrize("size_rows", [1_000_000, 100_000, 10_000])
def test_to_parquet_row_group_size(
Expand Down

0 comments on commit 0a5065f

Please sign in to comment.