Support Parquet TupleDomain using ColumnDescriptor #6892
Conversation
@nezihyigitbasi @dain this is a first step to support nested predicate pushdown for Parquet in Presto. This PR makes the Parquet TupleDomain based on Parquet's ColumnDescriptor, so that nested predicates can be pushed down and handled easily.
(force-pushed from 0dba438 to eef5773)
@dain @nezihyigitbasi kindly ping
(force-pushed from eef5773 to fb3cf7a)
(force-pushed from 2dfa193 to 470f87a)
(force-pushed from b3aec2e to b8e275f)
@dain @nezihyigitbasi kindly ping
(force-pushed from b8e275f to 71e17e6)
@dain @nezihyigitbasi could you please review when you are free?
@nezihyigitbasi @dain kindly ping
In an offline discussion we decided that @zhenxiao was going to investigate using synthetic virtual-columns in the connector to enable this push-down feature.
Hi @dain: Could you please review this first?
Ah ok. Will do.
@dain @nezihyigitbasi kindly ping
@zhenxiao I will take a look at this shortly.
@zhenxiao I did a first pass, back to you.
return index;
}

public static Type getPrestoType(RichColumnDescriptor descriptor) |
thank you, got it resolved
@@ -348,8 +350,9 @@ public void close()
long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
if (firstDataPage >= start && firstDataPage < start + length) {
if (predicatePushdownEnabled) {
ParquetPredicate parquetPredicate = buildParquetPredicate(columns, effectivePredicate, fileMetaData.getSchema(), typeManager); |
now that the typeManager is not used anymore in this method we can remove it.
got it, resolved
}

return new TupleDomainParquetPredicate<>(effectivePredicate, columnReferences.build());
ImmutableMap.Builder<ColumnDescriptor, Domain> predicate = ImmutableMap.builder();
for (Map.Entry<HiveColumnHandle, Domain> entry : effectivePredicate.getDomains().get().entrySet()) { |
static import Map.Entry
yep, updated
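
To make the shape of the change concrete, here is a minimal sketch of the remapping above: the HiveColumnHandle-keyed predicate becomes a ColumnDescriptor-keyed one. getDescriptor and the types are the PR's own; the wrapper method and the singletonList path for a top-level column are illustrative assumptions (a nested predicate would supply a multi-element path). Presto/Parquet-internal imports are omitted for brevity.

import static com.facebook.presto.spi.predicate.TupleDomain.withColumnDomains;
import static java.util.Collections.singletonList;

import java.util.Map.Entry;
import java.util.Optional;

private static TupleDomain<ColumnDescriptor> toParquetTupleDomain(
        TupleDomain<HiveColumnHandle> effectivePredicate,
        MessageType fileSchema,
        MessageType requestedSchema)
{
    ImmutableMap.Builder<ColumnDescriptor, Domain> predicate = ImmutableMap.builder();
    for (Entry<HiveColumnHandle, Domain> entry : effectivePredicate.getDomains().get().entrySet()) {
        // resolve the handle's name to a Parquet column descriptor
        Optional<RichColumnDescriptor> descriptor =
                getDescriptor(fileSchema, requestedSchema, singletonList(entry.getKey().getName()));
        descriptor.ifPresent(column -> predicate.put(column, entry.getValue()));
    }
    return withColumnDomains(predicate.build());
}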
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

public class ParquetColumnReference |
This class already has access to the column descriptor, so we shouldn't need to pass the Presto type as well, since the Presto type can be derived from that (like you do in getPrestoType()). So I guess we can move the getPrestoType() logic to this class.
Got it, this class is not needed at all. Will delete it and use RichColumnDescriptor directly.
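
To make the suggestion concrete, a hedged sketch of a getPrestoType() that derives the Presto type from the descriptor alone. The exact cases in the PR are not shown in this thread, so the mapping below is an assumption (it also uses the PrestoException default suggested later in this review; Presto type imports omitted for brevity):

import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;

public static Type getPrestoType(RichColumnDescriptor descriptor)
{
    switch (descriptor.getType()) {
        case BOOLEAN:
            return BooleanType.BOOLEAN;
        case INT32:
            return IntegerType.INTEGER;
        case INT64:
            return BigintType.BIGINT;
        case FLOAT:
            return RealType.REAL;
        case DOUBLE:
            return DoubleType.DOUBLE;
        case BINARY:
            return VarcharType.VARCHAR;
        default:
            // per the review suggestion below for unsupported types
            throw new PrestoException(NOT_SUPPORTED, "Unsupported parquet type: " + descriptor.getType());
    }
}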
@@ -39,16 +40,14 @@ public boolean matches(Map<Integer, ParquetDictionaryDescriptor> dictionariesByC
*
* @param numberOfRows the number of rows in the segment; this can be used with
* Statistics to determine if a column is only null
* @param statisticsByColumnIndex statistics for column by ordinal position
* in the file; this will match the field order from the hive metastore
* @param statistics statistics for column by Parquet Column Descriptor |
@param statistics column statistics
resolved
if (columnStatistics != null) {
statistics.put(ordinal, columnStatistics);
String[] paths = columnMetaData.getPath().toArray(); |
I don't think you need the paths variable, you can just pass Arrays.asList(columnMetaData.getPath().toArray()) below.
Got it. Updated.
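
In other words, the temporary variable collapses into the call (identifiers as in the diff above):

// before
String[] paths = columnMetaData.getPath().toArray();
Optional<RichColumnDescriptor> descriptor = getDescriptor(fileSchema, requestedSchema, Arrays.asList(paths));

// after
Optional<RichColumnDescriptor> descriptor =
        getDescriptor(fileSchema, requestedSchema, Arrays.asList(columnMetaData.getPath().toArray()));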
dictionaries.put(ordinal, new ParquetDictionaryDescriptor(columnDescriptor, dictionaryPage));
dataSource.readFully(columnMetaData.getStartingPos(), buffer);
Optional<ParquetDictionaryPage> dictionaryPage = readDictionaryPage(buffer, columnMetaData.getCodec());
dictionaries.put(columnDescriptor, new ParquetDictionaryDescriptor(columnDescriptor, dictionaryPage));
}
catch (IOException ignored) { |
Although not entirely related to this patch, why are we ignoring the IOException here? If we get an exception we will silently skip populating dictionaries for that particular column.
If we hit any problem when trying to read the dictionary page, we just silently skip it and will not use that dictionary to try to skip reading row groups.
OK, as long as we are not messing anything up while we read.
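
For clarity, the behavior under discussion sketched as a small helper. The Presto-internal types are the ones from the diff above and are assumed to be on the classpath; the buffer sizing via getTotalSize() and the method wrapper are illustrative assumptions:

import static java.lang.Math.toIntExact;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;

private void tryReadDictionaryPage(
        ParquetDataSource dataSource,
        ColumnChunkMetaData columnMetaData,
        ColumnDescriptor columnDescriptor,
        Map<ColumnDescriptor, ParquetDictionaryDescriptor> dictionaries)
{
    try {
        byte[] buffer = new byte[toIntExact(columnMetaData.getTotalSize())]; // assumed sizing
        dataSource.readFully(columnMetaData.getStartingPos(), buffer);
        Optional<ParquetDictionaryPage> dictionaryPage = readDictionaryPage(buffer, columnMetaData.getCodec());
        dictionaries.put(columnDescriptor, new ParquetDictionaryDescriptor(columnDescriptor, dictionaryPage));
    }
    catch (IOException ignored) {
        // deliberately ignored: without a dictionary we cannot prune this row group,
        // but the data is still read and filtered, so results stay correct
    }
}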
for (ColumnChunkMetaData columnMetaData : blockMetadata.getColumns()) {
String[] paths = columnMetaData.getPath().toArray();
Optional<RichColumnDescriptor> descriptor = getDescriptor(fileSchema, requestedSchema, Arrays.asList(paths));
if (descriptor.isPresent()) { |
What will happen if the descriptor is not present and we return an empty map of dictionaries?
We only build dictionaries when the descriptor exists; otherwise the dictionaries map is empty, the file will be scanned, and the dictionary predicate will not apply.
return effectivePredicate.getDomains().get().keySet().stream()
.map(HiveColumnHandle::getName)
.anyMatch(columnName::equals);
return parquetTupleDomain.getDomains().get().keySet().contains(columnDescriptor); |
getDomains() returns an Optional, are we sure it's non-empty? If yes, we should assert that (com.google.common.base.Verify::verify).
throw new ParquetDecodingException("Parquet type FIXED_LEN_BYTE_ARRAY supported as DECIMAL; got " + descriptor.getPrimitiveType().getOriginalType());
}
default:
throw new ParquetDecodingException("Presto does not have type for : " + descriptor.getType()); |
throw new PrestoException(NOT_SUPPORTED, "Unsupported parquet type: " + descriptor.getType());
yep, updated
(force-pushed from 71e17e6 to bbed115)
thank you @nezihyigitbasi
LGTM except some minor comments. BTW do we need similar changes in the new reader for nested predicate pushdown support?
if (type.getName().equalsIgnoreCase(columnName)) {
return type;
}
}

return null;
}
now that we have this method in ParquetTypeUtils we can update ParquetColumnReader to also call this one instead of maintaining two copies.
yep, got it updated
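
The consolidated helper might look like the fragment above moved into ParquetTypeUtils. The case-insensitive fallback mirrors the loop shown earlier in this review; the containsField() fast path is an assumption, and package names may differ by Presto version:

// Parquet schemas are case-sensitive while the Hive metastore lower-cases names,
// hence the case-insensitive fallback scan.
public static parquet.schema.Type getParquetType(HiveColumnHandle column, MessageType messageType)
{
    if (messageType.containsField(column.getName())) {
        return messageType.getType(column.getName());
    }
    for (parquet.schema.Type type : messageType.getFields()) {
        if (type.getName().equalsIgnoreCase(column.getName())) {
            return type;
        }
    }
    return null;
}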
return effectivePredicate.getDomains().get().keySet().stream()
.map(HiveColumnHandle::getName)
.anyMatch(columnName::equals);
verify(parquetTupleDomain.getDomains().isPresent(), "Parquet TupleDomain should not be empty"); |
"parquetTupleDomain is empty"
(force-pushed from bbed115 to 5eda135)
thank you @nezihyigitbasi, comments addressed
@dain I think this looks good.
Just one minor comment. Otherwise, Nezih, merge whenever you want.
ColumnIO[] fields = columnIO.getPath();
List<PrimitiveColumnIO> columns = getColumns(fileSchema, requestedSchema);
int index = -1;
for (int i = 0; i < columns.size(); i++) { |
Can we rename i and j to something more descriptive? Maybe columnIndex and level (and rename level to maxLevel)?
(force-pushed from 5eda135 to ff5cd08)
thank you @dain @nezihyigitbasi
thanks @zhenxiao I will merge this once the tests all pass.
Hi @zhenxiao,
Hi @shurvitz, thanks for reaching out.
Got it. Thank you, @shurvitz @pettyjamesm
Currently the Parquet TupleDomain is constructed based on HiveColumnHandle. This would not work if nested predicates are pushed down, e.g. a predicate on a field nested inside a struct column.
This patch constructs the Parquet TupleDomain with Parquet's ColumnDescriptor, so that it works with nested predicate pushdown.
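
As a closing sketch of what the new keying enables: a domain can now address a nested field through its Parquet path, which a flat HiveColumnHandle name cannot express. The example schema path s.field, the repetition/definition levels, and the package imports below are all illustrative assumptions, not taken from the PR:

import static com.facebook.presto.spi.type.BigintType.BIGINT;

import com.facebook.presto.spi.predicate.Domain;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.google.common.collect.ImmutableMap;
import parquet.column.ColumnDescriptor;
import parquet.schema.PrimitiveType.PrimitiveTypeName;

public final class NestedPredicateExample
{
    private NestedPredicateExample() {}

    public static TupleDomain<ColumnDescriptor> nestedPredicate()
    {
        // a descriptor addressing the nested field s.field
        ColumnDescriptor nested = new ColumnDescriptor(
                new String[] {"s", "field"},  // full path into the nested structure
                PrimitiveTypeName.INT64,
                0,                            // max repetition level
                1);                           // max definition level
        // the predicate s.field = 42, keyed directly by the Parquet descriptor
        return TupleDomain.withColumnDomains(
                ImmutableMap.of(nested, Domain.singleValue(BIGINT, 42L)));
    }
}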