Handle array of map and array of array case accurately in parquet reader #9728

Closed
28 changes: 27 additions & 1 deletion velox/dwio/parquet/reader/ParquetReader.cpp
@@ -300,6 +300,33 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(

if (schemaElement.__isset.converted_type) {
switch (schemaElement.converted_type) {
case thrift::ConvertedType::LIST: {
VELOX_CHECK_EQ(children.size(), 1);
const auto& child = children[0];
isRepeated = true;
// In case the child is a MAP or current element is repeated then
// wrap child around additional ARRAY
if (child->type()->kind() == TypeKind::MAP ||
Collaborator

I still see the check testing whether the child is a MAP here. Shouldn't it check whether the child's repetition type is REPEATED instead?

Collaborator Author

I tried checking whether the child schemaElement is REPEATED, which could have covered both MAP and LIST. It worked for MAP but failed for a different LIST case, so I had to keep the explicit child MAP check along with the REPEATED check on the current element. I think this is because we create a special ARRAY element when it is a leaf node here. In such cases we don't want to create a new ARRAY element at this level, hence the explicit MAP check. The backward-compatibility rules for LIST and MAP are tricky, which makes this piece of code harder to follow. There is scope to refactor this code in the future.

Collaborator

@hitarth Actually, have you tried moving the leaf-node ARRAY construction here?

Collaborator Author

Yes, I tried that. It isn't trivial and would involve changes unrelated to this PR, so I was thinking of addressing it in a future refactor.
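
For readers following this thread, the condition under discussion can be sketched in isolation. This is only an illustrative model, not the Velox implementation: `Kind`, `Repetition`, and `needsExtraArray` are hypothetical stand-ins for `TypeKind`, `thrift::FieldRepetitionType`, and the `if` condition in the LIST case of the diff. It assumes, as described above, that a repeated leaf has already been turned into an ARRAY by the time its parent is examined.

```cpp
#include <cassert>

// Hypothetical, simplified stand-ins for TypeKind and
// thrift::FieldRepetitionType; these are not Velox types.
enum class Kind { kInteger, kVarchar, kArray, kMap, kRow };
enum class Repetition { kRequired, kOptional, kRepeated };

// Models the condition in the LIST case of the diff: wrap the single child
// in an additional ARRAY when the child is a MAP, or when the LIST-annotated
// element itself is REPEATED. Otherwise the code falls through to the
// generic handling shared with MAP.
inline bool needsExtraArray(Kind childKind, Repetition selfRepetition) {
  return childKind == Kind::kMap || selfRepetition == Repetition::kRepeated;
}
```

Under these assumptions, `optional group test (LIST)` with a MAP child gives `needsExtraArray(Kind::kMap, Repetition::kOptional) == true`, and the inner `repeated group array (LIST)` gives `true` through the REPEATED branch. An `optional group value (LIST)` whose repeated leaf child is already an ARRAY gives `false` and takes the fall-through path.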

schemaElement.repetition_type ==
thrift::FieldRepetitionType::REPEATED) {
return std::make_unique<ParquetTypeWithId>(
TypeFactory<TypeKind::ARRAY>::create(child->type()),
std::move(children),
curSchemaIdx,
maxSchemaElementIdx,
ParquetTypeWithId::kNonLeaf,
std::move(name),
std::nullopt,
std::nullopt,
maxRepeat + 1,
maxDefine,
isOptional,
isRepeated);
}
// Only special case list of map and list of list is handled here,
// other generic case is handled with case MAP
[[fallthrough]];
}
case thrift::ConvertedType::MAP_KEY_VALUE:
// If the MAP_KEY_VALUE annotated group's parent is a MAP, it should
// be the repeated key_value group that directly contains the key and
@@ -337,7 +364,6 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
// a MAP-annotated group.
[[fallthrough]];

case thrift::ConvertedType::LIST:
case thrift::ConvertedType::MAP: {
VELOX_CHECK_EQ(children.size(), 1);
const auto& child = children[0];
Binary file not shown.
Binary file not shown.
Binary file not shown.
149 changes: 149 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetReaderTest.cpp
@@ -1235,6 +1235,155 @@ TEST_F(ParquetReaderTest, testV2PageWithZeroMaxDefRep) {
outputRowType, *rowReader, expected, *leafPool_);
}

TEST_F(ParquetReaderTest, arrayOfMapOfIntKeyArrayValue) {
// The Schema is of type
// message hive_schema {
// optional group test (LIST) {
// repeated group array (MAP) {
// repeated group key_value (MAP_KEY_VALUE) {
// required binary key (UTF8);
// optional group value (LIST) {
// repeated int32 array;
// }
// }
// }
// }
// }
const std::string expectedVeloxType =
"ROW<test:ARRAY<MAP<VARCHAR,ARRAY<INTEGER>>>>";
const std::string sample(
getExampleFilePath("array_of_map_of_int_key_array_value.parquet"));
facebook::velox::dwio::common::ReaderOptions readerOptions{leafPool_.get()};
auto reader = createReader(sample, readerOptions);
EXPECT_EQ(reader->rowType()->toString(), expectedVeloxType);
auto numRows = reader->numberOfRows();
auto type = reader->typeWithId();
RowReaderOptions rowReaderOpts;
auto rowType = ROW({"test"}, {ARRAY(MAP(VARCHAR(), ARRAY(INTEGER())))});
rowReaderOpts.setScanSpec(makeScanSpec(rowType));
auto rowReader = reader->createRowReader(rowReaderOpts);
auto result = BaseVector::create(rowType, 10, leafPool_.get());
constexpr int kBatchSize = 1000;
while (rowReader->next(kBatchSize, result)) {
}
}

TEST_F(ParquetReaderTest, arrayOfMapOfIntKeyStructValue) {
// The Schema is of type
// message hive_schema {
// optional group test (LIST) {
// repeated group array (MAP) {
// repeated group key_value (MAP_KEY_VALUE) {
// required int32 key;
// optional group value {
// optional binary stringfield (UTF8);
// optional int64 longfield;
// }
// }
// }
// }
// }
const std::string expectedVeloxType =
"ROW<test:ARRAY<MAP<INTEGER,ROW<stringfield:VARCHAR,longfield:BIGINT>>>>";
const std::string sample(
getExampleFilePath("array_of_map_of_int_key_struct_value.parquet"));
facebook::velox::dwio::common::ReaderOptions readerOptions{leafPool_.get()};
auto reader = createReader(sample, readerOptions);
EXPECT_EQ(reader->rowType()->toString(), expectedVeloxType);
auto numRows = reader->numberOfRows();
auto type = reader->typeWithId();
RowReaderOptions rowReaderOpts;
auto rowType = reader->rowType();
rowReaderOpts.setScanSpec(makeScanSpec(rowType));
auto rowReader = reader->createRowReader(rowReaderOpts);
auto result = BaseVector::create(rowType, 10, leafPool_.get());
constexpr int kBatchSize = 1000;
while (rowReader->next(kBatchSize, result)) {
}
}

TEST_F(ParquetReaderTest, struct_of_array_of_array) {
// The Schema is of type
// message hive_schema {
// optional group test {
// optional group stringarrayfield (LIST) {
// repeated group array (LIST) {
// repeated binary array (UTF8);
// }
// }
// optional group intarrayfield (LIST) {
// repeated group array (LIST) {
// repeated int32 array;
// }
// }
// }
// }
const std::string expectedVeloxType =
"ROW<test:ROW<stringarrayfield:ARRAY<ARRAY<VARCHAR>>,intarrayfield:ARRAY<ARRAY<INTEGER>>>>";
const std::string sample(
getExampleFilePath("struct_of_array_of_array.parquet"));
facebook::velox::dwio::common::ReaderOptions readerOptions{leafPool_.get()};
auto reader = createReader(sample, readerOptions);
auto numRows = reader->numberOfRows();
auto type = reader->typeWithId();
EXPECT_EQ(type->size(), 1ULL);
EXPECT_EQ(reader->rowType()->toString(), expectedVeloxType);

auto test_column = type->childAt(0);
EXPECT_EQ(test_column->type()->kind(), TypeKind::ROW);
EXPECT_EQ(type->childByName("test"), test_column);

// test_column has 2 children
EXPECT_EQ(test_column->size(), 2ULL);
// explore 1st child of test_column
auto stringarrayfield_column = test_column->childAt(0);
EXPECT_EQ(stringarrayfield_column->type()->kind(), TypeKind::ARRAY);

// stringarrayfield_column column has 1 child
EXPECT_EQ(stringarrayfield_column->size(), 1ULL);
// explore 1st child of stringarrayfield_column
auto array_column = stringarrayfield_column->childAt(0);
EXPECT_EQ(array_column->type()->kind(), TypeKind::ARRAY);

// array_column column has 1 child
EXPECT_EQ(array_column->size(), 1ULL);
// explore 1st child of array_column
auto array_leaf_column = array_column->childAt(0);
EXPECT_EQ(array_leaf_column->type()->kind(), TypeKind::VARCHAR);

// explore 2nd child of test_column
auto intarrayfield_column = test_column->childAt(1);
EXPECT_EQ(intarrayfield_column->type()->kind(), TypeKind::ARRAY);
EXPECT_EQ(test_column->childByName("intarrayfield"), intarrayfield_column);

// intarrayfield_column column has 1 child
EXPECT_EQ(intarrayfield_column->size(), 1ULL);
// explore 1st child of intarrayfield_column
auto array_column_for_intarrayfield = intarrayfield_column->childAt(0);
EXPECT_EQ(array_column_for_intarrayfield->type()->kind(), TypeKind::ARRAY);

// array_column_for_intarrayfield column has 1 child
EXPECT_EQ(array_column_for_intarrayfield->size(), 1ULL);
// explore 1st child
auto array_leaf_column_for_intarrayfield =
array_column_for_intarrayfield->childAt(0);
EXPECT_EQ(
array_leaf_column_for_intarrayfield->type()->kind(), TypeKind::INTEGER);

RowReaderOptions rowReaderOpts;
auto rowType =
ROW({"test"},
{ROW(
{"stringarrayfield", "intarrayfield"},
{ARRAY(ARRAY(VARCHAR())), ARRAY(ARRAY(INTEGER()))})});
rowReaderOpts.setScanSpec(makeScanSpec(rowType));
auto rowReader = reader->createRowReader(rowReaderOpts);
auto result = BaseVector::create(rowType, 10, leafPool_.get());
constexpr int kBatchSize = 1000;
while (rowReader->next(kBatchSize, result)) {
}
}

TEST_F(ParquetReaderTest, testLzoDataPage) {
const std::string sample(getExampleFilePath("lzo.parquet"));
