
Refine Parquet schema mismatch message #12550

Merged 1 commit into prestodb:master on Aug 30, 2019

Conversation

zhenxiao (Collaborator)
@nezihyigitbasi nezihyigitbasi left a comment


Can you please add a unit test?

return columnReader.readPrimitive(field);
}
catch (UnsupportedOperationException e) {
throw new PrestoException(INVALID_SCHEMA_PROPERTY, format("There is a mismatch between file schema and partition schema. The column %s in file %s is declared as type %s but parquet file declared column type as %s.",
Contributor

How about:

The column %s is declared as type %s, but the Parquet file %s declares the column as type %s

Collaborator Author

sure

try {
return columnReader.readPrimitive(field);
}
catch (UnsupportedOperationException e) {
Contributor

UnsupportedOperationException is a broad exception to catch here. Maybe we should throw a specific Parquet exception and catch it here. What do you think?

Collaborator Author

Sure. The UnsupportedOperationException actually comes from type.writeLong etc., when the lower-level ColumnReader tries to read Parquet values. How about throwing ParquetCorruptionException there and catching it here?
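The pattern being proposed here — throw a dedicated exception deep in the reader and translate it into a user-facing message near the page source — could be sketched roughly as follows. All class and method names below are illustrative stand-ins, not Presto's actual API:

```java
// A dedicated exception for schema/decoding problems, analogous in spirit
// to the ParquetCorruptionException discussed above.
class ParquetMismatchException extends RuntimeException {
    ParquetMismatchException(String message) {
        super(message);
    }
}

class SchemaMismatchExample {
    // Stand-in for the low-level read path (ColumnReader.readPrimitive):
    // it throws the dedicated exception instead of UnsupportedOperationException.
    static long readPrimitive(String declaredType, String fileType) {
        if (!declaredType.equals(fileType)) {
            throw new ParquetMismatchException(
                    String.format("declared type %s does not match file type %s", declaredType, fileType));
        }
        return 42L;
    }

    // Stand-in for the caller that translates the low-level failure into a
    // user-facing schema-mismatch message.
    static String readColumn(String declaredType, String fileType) {
        try {
            return "value=" + readPrimitive(declaredType, fileType);
        }
        catch (ParquetMismatchException e) {
            return "schema mismatch: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(readColumn("bigint", "bigint")); // value=42
        System.out.println(readColumn("bigint", "double")); // schema mismatch: ...
    }
}
```

The advantage of a dedicated exception over catching UnsupportedOperationException is precision: the broad exception can also come from unrelated code paths, so catching it risks mislabeling other bugs as schema mismatches.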

@zhenxiao zhenxiao left a comment

Thank you, @nezihyigitbasi.
I will get the comments addressed.


@zhenxiao (Collaborator Author)

@nezihyigitbasi I got the comments addressed.
Could you please review?


zhenxiao commented Jul 1, 2019

kindly ping @nezihyigitbasi

@nezihyigitbasi (Contributor)

@arhimondr @rschlussel can you please take a look?

@arhimondr arhimondr self-assigned this Aug 8, 2019

arhimondr commented Aug 10, 2019

From what I understand, this PR tries to address the confusing error message when there is a schema mismatch between a file and the partition schema.

@zhenxiao Could you please elaborate a little bit more why it was decided to implement it this way? (by catching the exception in the very last moment).

For example the schema validation can be done early, when getting the Parquet Type:
https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java#L163
https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java#L255
The HiveColumnHandle contains the Hive Schema type, so it is possible to check if types match.

The problem is that the mere fact that a method in Type is implemented (e.g. Type#writeSlice) does not necessarily mean the types are the same or compatible. For example, both Decimal and Varchar use Slice as a value, and Integer and Float both use long (writeLong).

@nezihyigitbasi Do you know how something similar is done in ORC?
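A tiny self-contained illustration of the point above — two logically different types sharing one physical representation, so the mere presence of a write method proves nothing about compatibility. The MiniType interface here is a made-up stand-in for Presto's Type, not the real SPI:

```java
class PhysicalRepresentationExample {
    interface MiniType {
        String name();
        // Both an integer-like and a float-like type accept a long here, so
        // being able to call the long-based writer proves nothing.
        String render(long physicalValue);
    }

    static final MiniType INTEGER = new MiniType() {
        public String name() { return "integer"; }
        public String render(long v) { return Long.toString(v); }
    };

    static final MiniType REAL = new MiniType() {
        public String name() { return "real"; }
        // Floats are carried as their raw IEEE-754 bits inside a long.
        public String render(long v) { return Float.toString(Float.intBitsToFloat((int) v)); }
    };

    public static void main(String[] args) {
        long bits = Float.floatToIntBits(1.5f);
        // The same physical long means completely different logical values.
        System.out.println(INTEGER.render(bits)); // 1069547520
        System.out.println(REAL.render(bits));    // 1.5
    }
}
```

This is why an explicit schema comparison (as this PR adds) is needed rather than relying on which write methods happen to be implemented.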

@zhenxiao zhenxiao force-pushed the parquet-msg branch 3 times, most recently from 58cac73 to 36d2855 Compare August 15, 2019 14:54
@zhenxiao (Collaborator Author)

Thank you, @arhimondr.
I got the comments addressed. How about this approach?

@arhimondr arhimondr left a comment

I like the current approach. Could you please implement it for the complex types (ARRAY, MAP, ROW) as well?

private static boolean schemaMatch(org.apache.parquet.schema.Type parquetType, HiveColumnHandle column)
{
String prestoType = column.getTypeSignature().getBase();
if (prestoType.equals(MAP) || prestoType.equals(ARRAY) || prestoType.equals(ROW)) {
Member

Could you please check the signature for the complex types recursively?

Member

I would suggest passing here the resolved Type object, so the convenient Type#getTypeParameters can be used to recurse into the nested types.

Member

e.g.:

Type type = typeManager.getType(column.getTypeSignature());
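The recursive check being suggested could be sketched like this. TypeNode is a hypothetical stand-in for the resolved Presto Type (mirroring Type#getTypeParameters); the real code would compare against the Parquet GroupType on the other side:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical resolved type with nested parameters, mirroring
// Type#getTypeParameters in shape only.
class TypeNode {
    final String base;
    final List<TypeNode> parameters;

    TypeNode(String base, TypeNode... parameters) {
        this.base = base;
        this.parameters = Arrays.asList(parameters);
    }

    List<TypeNode> getTypeParameters() {
        return parameters;
    }
}

class RecursiveMatchExample {
    // Compares base names, then recurses into each nested type parameter.
    static boolean schemaMatch(TypeNode expected, TypeNode actual) {
        if (!expected.base.equals(actual.base)) {
            return false;
        }
        List<TypeNode> e = expected.getTypeParameters();
        List<TypeNode> a = actual.getTypeParameters();
        if (e.size() != a.size()) {
            return false;
        }
        for (int i = 0; i < e.size(); i++) {
            if (!schemaMatch(e.get(i), a.get(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        TypeNode mapLongLong = new TypeNode("map", new TypeNode("bigint"), new TypeNode("bigint"));
        TypeNode mapLongDouble = new TypeNode("map", new TypeNode("bigint"), new TypeNode("double"));
        System.out.println(schemaMatch(mapLongLong, mapLongLong));   // true
        System.out.println(schemaMatch(mapLongLong, mapLongDouble)); // false
    }
}
```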

}
if (parquetType.isPrimitive()) {
PrimitiveTypeName parquetTypeName = parquetType.asPrimitiveType().getPrimitiveTypeName();
return ((parquetTypeName == PrimitiveTypeName.INT32 && (prestoType.equals(INTEGER) || prestoType.equals(SMALLINT) || prestoType.equals(DATE) || prestoType.equals(DECIMAL) || prestoType.equals(TINYINT))) ||
Member

nit: switch seems to be more readable

switch (parquetTypeName) {
    case INT64:
        return prestoType.equals(BIGINT) || prestoType.equals(DECIMAL);
    case INT32:
        return prestoType.equals(INTEGER) || prestoType.equals(SMALLINT) || prestoType.equals(DATE) || prestoType.equals(DECIMAL) || prestoType.equals(TINYINT);
    case BOOLEAN:
        return prestoType.equals(BOOLEAN);
    case BINARY:
        return prestoType.equals(VARBINARY) || prestoType.equals(VARCHAR) || prestoType.startsWith(CHAR);
    case FLOAT:
        return prestoType.equals(REAL);
    case DOUBLE:
        return prestoType.equals(DOUBLE);
    case INT96:
        return prestoType.equals(TIMESTAMP);
    case FIXED_LEN_BYTE_ARRAY:
        return prestoType.equals(DECIMAL);
    default:
        throw new IllegalArgumentException("Unexpected parquet type name: " + parquetTypeName);
}

@zhenxiao (Collaborator Author)

Thank you, @arhimondr. I got the comments addressed.
I just found that the schema checking should only apply to Parquet index-based access. For Parquet name-based access, the fields inside structs could change order, or primitive columns could change order, so we should not check the schema in that case.
Could you please review when you are free?

@arhimondr arhimondr left a comment

LGTM % question

Also, I'm not quite sure how good the test coverage for Parquet is. Do you have any internal test suites? Have you tried to replay some real workload with this patch applied?

}

// name based access could support schema evolution in Parquet
if (useParquetColumnNames) {
Member

Why don't we check for this case?

Could you please elaborate a little bit more on what schema evolution means here? Is it the table -> partition schema migration (changing the columns of a table with existing partitions)?

What if there's a mismatch? Wouldn't it fail the old way when reading?

Collaborator Author

Hi @arhimondr
Yes, production queries were tested with this patch.
Schema evolution in our case:

  1. The table schema is unchanged.
  2. The Parquet file schema changes (mostly in struct fields), e.g.:
    new fields are added to a struct: s<a, b, c> becomes s<a, b, c, d>, and the partition schema is updated to s<a, b, c, d>. In this case, selecting s.d from old Parquet files returns null.
    fields are reordered: s<a, b, c> becomes s<b, c, a>, and the partition schema is updated to s<b, c, a>. In this case, we can still get the s.a, s.b, and s.c values if use-parquet-column-names is turned on.
  3. Old Parquet files are not changed.
  4. Field renames and type changes are not allowed.

Based on the above, when use-parquet-column-names is enabled, Parquet can use the field name to get the corresponding Parquet type, so there is no need to check. (It is still possible for the partition schema to declare a field as double while the Parquet schema has float; that is a schema mismatch and will fail as before.)

We need this check only for index-based access in Parquet (use-parquet-column-names set to false), so that struct field types and primitive column types can be checked by field/column order before decoding the data.
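The difference between the two access modes can be shown with a minimal sketch (the helpers below are hypothetical illustrations, not Presto's code):

```java
import java.util.Arrays;
import java.util.List;

class ColumnAccessExample {
    // Index-based access: position i of the reader schema is assumed to be
    // position i of the file schema.
    static String byIndex(List<String> fileColumns, int index) {
        return index < fileColumns.size() ? fileColumns.get(index) : null;
    }

    // Name-based access: look the column up by name, tolerating reorders.
    static String byName(List<String> fileColumns, String name) {
        return fileColumns.contains(name) ? name : null;
    }

    public static void main(String[] args) {
        // The file was written with columns (a, b, c); the partition schema
        // was later reordered to (b, c, a).
        List<String> fileColumns = Arrays.asList("a", "b", "c");
        // Index 0 in the reordered partition schema means "b", but positional
        // access reads "a" from the file -- the mismatch the check must catch.
        System.out.println(byIndex(fileColumns, 0)); // prints "a"
        // Name-based access still finds the right column after the reorder.
        System.out.println(byName(fileColumns, "b")); // prints "b"
    }
}
```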

Member

for Parquet name based access, the fields inside structs could change order

From what I understand it is mostly about structs (since there are two modes of accessing fields).

However it seems that structs are always matched in the ordinal way (by index): https://github.com/prestodb/presto/blob/master/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/ParquetReader.java#L176

Does it mean that we can safely check the schema for both access types ("by name" and "by index")?

Collaborator Author

Nice catch. That is because we are running a slightly different version of Presto, with our customized subfield pruning patch, which only scans the necessary fields from a struct instead of scanning the whole struct. Part of that is in #13271.

Let me double-check with name-based access.

Collaborator Author

Hi @arhimondr, you are correct: we can turn the check on for both index-based access and name-based access.
I've made the corresponding changes.

Member

Thanks.

@nezihyigitbasi, I think it is ready to go. Do you have any additional comments?

@nezihyigitbasi nezihyigitbasi left a comment

LGTM % comments. Thanks @zhenxiao and @arhimondr!

String prestoType = type.getTypeSignature().getBase();
if (parquetType instanceof GroupType) {
GroupType groupType = parquetType.asGroupType();
if (prestoType.equals(StandardTypes.ROW)) {
Contributor

Why not use a switch here?

}

if (!schemaMatch(type, prestoType)) {
String parquetTypeName = null;
Contributor

This null assignment is redundant.

}

if (type == null) {
return null;
Contributor

I think in a separate PR it would be good to update this method to return Optional instead of nulls.

return type;
}

private static boolean schemaMatch(org.apache.parquet.schema.Type parquetType, Type type)
Contributor

nit: we may want to rename this method. Some options are checkSchemaMatch, isSchemaMatching, isSchemaCompatible, etc.

String prestoType = type.getTypeSignature().getBase();
if (parquetType instanceof GroupType) {
GroupType groupType = parquetType.asGroupType();
if (prestoType.equals(StandardTypes.ROW)) {
Contributor

You can static import StandardTypes.X here and below.

return type;
}

private static boolean schemaMatch(org.apache.parquet.schema.Type parquetType, Type type)
Contributor

Do we have tests to test this method for different backward compatibility rules defined in this doc?

Collaborator Author

Not yet, we could add tests in another PR

throws Exception
{
TestColumn floatColumn = new TestColumn("column_name", javaFloatObjectInspector, 5.1f, 5.1f);
TestColumn doubleColumn = new TestColumn("column_name", javaDoubleObjectInspector, 5.1, 5.1);
Contributor

Does this cover all the types in ParquetPageSourceFactory::schemaMatch(), e.g., the different int types and fixed-length byte arrays?

Also, this test's coverage for nested complex types is thin. It would be good to have some comprehensive testing for that case, e.g., can we automate testing of nested complex types up to a certain nesting level and have systematic testing instead of a point test like the one here with nestColumn?

Collaborator Author

nice catch, I will add comprehensive tests in another PR.

mapBlockOf(createUnboundedVarcharType(), new ArrayType(RowType.anonymous(ImmutableList.of(INTEGER))),
"test", arrayBlockOf(RowType.anonymous(ImmutableList.of(INTEGER)), rowBlockOf(ImmutableList.of(INTEGER), 1L))));

HiveErrorCode expectedErrorCode = HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH;
Contributor

static import HIVE_PARTITION_SCHEMA_MISMATCH.

.withSession(parquetPageSourceSession)
.isFailingForPageSource(new ParquetPageSourceFactory(TYPE_MANAGER, HDFS_ENVIRONMENT, STATS), expectedErrorCode, expectedMessageMapLongLong);

String expectedMessageMapLongMapDouble = "The column column_name is declared as type map<bigint,bigint>, but the Parquet file declares the column as type optional group column_name (MAP) {\n"
Contributor

nit: The whitespace before optional looks a bit weird.

.withSession(parquetPageSourceSession)
.isFailingForPageSource(new ParquetPageSourceFactory(TYPE_MANAGER, HDFS_ENVIRONMENT, STATS), expectedErrorCode, expectedMessageMapLongMapDouble);

String expectedMessageArrayStringArrayBoolean = "The column column_name is declared as type array<string>, but the Parquet file declares the column as type optional group column_name (LIST) {\n"
Contributor

nit: The whitespace before optional looks a bit weird. I guess that's the indentation used when doing group.writeToStringBuilder(...).

Collaborator Author

yes, let me fix it

@zhenxiao zhenxiao left a comment

Thank you, @nezihyigitbasi.
I got the comments addressed; some need an extra PR to address, as noted inline.


@arhimondr arhimondr merged commit 5d18a1c into prestodb:master Aug 30, 2019
@zhenxiao zhenxiao deleted the parquet-msg branch March 3, 2020 22:15