
Add support for DATE predicate pushdown with Parquet via min/max and … #10181

Closed

Conversation

ryanrupp
Contributor

…dictionary (plain). Fixes an issue where existing pushdown would throw an exception for a date-to-integer domain comparison.

This fixes an issue where, because Parquet dates are stored as int32 with a logical type of "date", Presto detected the column as an integer. When a query then compared that column against a Date value, the domain comparison threw an exception:

Mismatched Domain types: date vs integer

This would happen on queries such as:

select count(*) from my_table where date_field > DATE '2018-01-01'

Here the left-hand side column was detected as an integer while the right-hand side literal was a date. Resolved by inspecting the Parquet column metadata for INT32 columns to detect whether the original type was DATE.
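
For illustration, here is a minimal sketch of that detection using parquet-mr's schema API; the helper name is hypothetical, not code from this PR:

import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

// Hypothetical helper: true when a Parquet INT32 column carries the DATE
// logical annotation (days since the Unix epoch, matching Presto's DATE type).
static boolean isDateColumn(PrimitiveType parquetType)
{
    return parquetType.getPrimitiveTypeName() == PrimitiveTypeName.INT32
            && parquetType.getOriginalType() == OriginalType.DATE;
}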

@facebook-github-bot
Collaborator

Thank you for your pull request and welcome to our community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. In order for us to review and merge your code, please sign up at https://code.facebook.com/cla. If you are contributing on behalf of someone else (e.g. your employer), the individual CLA may not be sufficient and your employer may need the corporate CLA signed.

If you have received this in error or have any questions, please contact us at [email protected]. Thanks!

@ryanrupp
Contributor Author

ryanrupp commented Mar 16, 2018

This should be the same issue described here - #8243 (comment) - here's an example exception stack when doing a query such as:

select count(*) from my_table where date_field > DATE '2018-01-01'

when the backing data is Parquet with "date_field" being an INT32 logically typed as a DATE.

Also, this relates to #6892, since the types now come from the Parquet data, whereas before I think this was looking at the column type Presto derived from the query. So, for example, the above query worked in 0.172 (with predicate pushdown enabled it just ignored dates) because the call to TupleDomainParquetPredicate.getDomain passed the type as "DATE", whereas after the linked change the type came in as "INTEGER". Some other issues may have bubbled up along these same lines, e.g. I believe this would also happen with timestamp fields, but I just wanted to fix dates to start.

@facebook-github-bot
Collaborator

Thank you for signing our Contributor License Agreement. We can now accept your code for this (and any) Facebook open source project. Thanks!

Contributor

@nezihyigitbasi nezihyigitbasi left a comment


Thanks for fixing this!

Please update your commit message (its title is too long) and make sure it follows the guidelines here.

@ryanrupp ryanrupp force-pushed the fixParquetDatePredicatePushdown branch from 0b43a30 to 669c012 on March 16, 2018 21:39
@ryanrupp
Contributor Author

I reworded the commit message and also pushed changes to support TIMESTAMP here (including dictionary support, which is a bit odd since timestamp cardinality is typically high, but Parquet will use a dictionary when it isn't). The pushdown support I added is currently only for INT64 logically typed as TIMESTAMP_MILLIS; TIMESTAMP_MICROS is also supported in newer versions of Parquet, I believe. I didn't do anything with INT96, but it looks like that may be getting deprecated anyway - https://issues.apache.org/jira/browse/PARQUET-323

@ryanrupp ryanrupp force-pushed the fixParquetDatePredicatePushdown branch from 669c012 to 1168491 on March 16, 2018 21:53
@findepi
Contributor

findepi commented Mar 16, 2018

and pushed changes to support TIMESTAMP as well here

@ryanrupp how does Parquet's TIMESTAMP relate to Presto's TIMESTAMP semantics? Especially given that those semantics are going to be changed to align with the SQL standard? (see #7122)

@ryanrupp
Contributor Author

ryanrupp commented Mar 16, 2018

@findepi So for Parquet, my understanding is that TIMESTAMP_MILLIS is a logical type annotation for an INT64 indicating milliseconds since the epoch - https://github.com/apache/parquet-format/blob/master/LogicalTypes.md - although there's a whole Impala article about the nuances of this, so I may be missing some quirks there - https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_timestamp.html

I may back out the timestamp changes, then, because I was going off of TIMESTAMP in Presto = Instant (UTC), and the linked issue explains that those semantics are incorrect. It seems the corresponding Presto type should instead be TIMESTAMP WITH TIME ZONE with the zone being UTC; however, Hive doesn't support that as a column type yet (https://issues.apache.org/jira/browse/HIVE-14412). And if I create a table in Hive with just a timestamp column backed by Parquet's INT64 TIMESTAMP_MILLIS, Presto won't push down the predicate because of the cast that has to be issued, e.g.:

select count(*) from my_table where timestamp_col > TIMESTAMP '2018-01-01 00:00:00 UTC'

Presto ends up having to wrap timestamp_col with cast(timestamp_col AS timestamp with time zone), and the cast expression doesn't get pushed down in the effective predicate.

@findepi
Contributor

findepi commented Mar 16, 2018

@ryanrupp as part of #7122, the mapping from Hive's TIMESTAMP is going to be changed to Presto's TIMESTAMP WITH TIME ZONE.
Does this clear your doubts?

@ryanrupp
Contributor Author

ryanrupp commented Mar 17, 2018

Yeah, if Hive's TIMESTAMP column then gets mapped internally to Presto's TIMESTAMP WITH TIME ZONE, this should work. I just pushed a change that treats Parquet's INT64 + TIMESTAMP_MILLIS as a TIMESTAMP WITH TIME ZONE in Presto. It takes the statistics/dictionary long values, which are in epoch millis, and UTC-packs them for use in Presto.
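
For illustration, a minimal sketch of that UTC packing using Presto's DateTimeEncoding and TimeZoneKey; the helper name is hypothetical, not the exact code from this PR:

import com.facebook.presto.spi.type.DateTimeEncoding;
import com.facebook.presto.spi.type.TimeZoneKey;

// Hypothetical helper: pack a Parquet TIMESTAMP_MILLIS statistic value
// (epoch millis, implicitly UTC) into Presto's TIMESTAMP WITH TIME ZONE encoding.
static long packUtcMillis(long epochMillis)
{
    return DateTimeEncoding.packDateTimeWithZone(epochMillis, TimeZoneKey.UTC_KEY);
}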

As I mentioned before, though, this won't actually work until the Hive mapping change takes place, because the predicate doesn't get pushed down: Presto ends up casting the Hive timestamp column to timestamp with time zone. It doesn't really break anything, though; the pushdown portion just doesn't trim any data. To test this out, I changed the HiveType to Presto TIMESTAMP WITH TIME ZONE here, and the predicate pushdown portion works correctly, but the scan returns incorrect results (probably just needs the changes from #7122).

@ryanrupp
Contributor Author

@findepi I took a look at the changes in #9385, although I saw it's been superseded by #10193 (which removes the Hive changes for now). It looks like, via TimestampRewriter, the long millisecond values are taken and normalized to UTC using the Hive storageTimeZone offset. Hive timestamp columns still get mapped to timestamp in Presto, though, so to get the predicate pushdown here to work I would want to switch that back to just TIMESTAMP for now.

Guessing this stuff may still be in flux, but just wondering, because I could back out the timestamp changes for now so the date changes could get merged (I don't see an issue with this for dates, at least).

@findepi
Contributor

findepi commented Mar 23, 2018

@ryanrupp thank you for taking the time to look into this. #10193 doesn't ship Hive changes for the very reason that they are not in their final shape yet.

Guessing this stuff may still be in flux, but just wondering, because I could back out the timestamp changes for now so the date changes could get merged (I don't see an issue with this for dates, at least).

That might be the safer way to go. However, please note that I'm not fluent in Parquet file handling; I'd defer to @nezihyigitbasi for the ultimate decision here.

@findepi findepi mentioned this pull request Apr 4, 2018
@nezihyigitbasi
Contributor

The changes to the timestamp types are in progress, and it may take some time to have the issue completely fixed. So I think it makes sense to first get this patch in fixing only the problem with the DATE type (reverting your TIMESTAMP changes), as that has been reported by a few users in the past. We can then add support for the TIMESTAMP types once the new semantics are implemented.

Contributor

@nezihyigitbasi nezihyigitbasi left a comment


I will take another look once you update the PR to have the fix for only the DATE type.

@ryanrupp ryanrupp force-pushed the fixParquetDatePredicatePushdown branch from f90813c to 0ee54b7 on November 12, 2018 03:11
@ryanrupp
Contributor Author

@nezihyigitbasi I backed out the TIMESTAMP changes so this is just DATE now and is cleaned up for review.

As a separate follow-up, I was going to catch up on the current state of the TIMESTAMP semantic changes. This is useful to us because there are cases where structuring data to be partitioned by date creates too much complexity (e.g. making sure partitions are not too granular, and end users writing SQL needing to understand the partitioning scheme when something other than a full date is used, e.g. monthly or yearly partitions), so it's nice to fall back on Parquet statistics in that case to trim the data scanned while still using DATE and TIMESTAMP columns.

@@ -169,7 +171,7 @@ public static Type getPrestoType(TupleDomain<ColumnDescriptor> effectivePredicat
             case DOUBLE:
                 return DoubleType.DOUBLE;
             case INT32:
-                return createDecimalType(descriptor).orElse(IntegerType.INTEGER);
+                return getInt32Type(descriptor);
Contributor


Why are the INT64 and INT32 cases handled differently now?

Contributor Author


This is because Parquet has logical types, where additional metadata in originalType determines how an int32 should be interpreted. In this case, it's checking whether the int32 was configured to be interpreted as a date (number of days since the Unix epoch, which matches the semantics of the Presto DATE type).

Before this change if I had a query like:

select count(*) from my_table where date_col = DATE '2018-01-01'

The query would fail with a type-mismatch exception about comparing a date value to an integer, because the logical type wasn't considered. There's a similar issue related to this here - #11118 - which is more involved, as I was only targeting the date pushdown currently.

I backed out the timestamp changes, but in the future the case statement for int64 would be adjusted in a similar way to consider logical types, e.g. an int64 can be logically typed as a timestamp where the value is in epoch millis, micros, or nanos.
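
For reference, a sketch of what getInt32Type from the diff above likely does, based on this description (assuming RichColumnDescriptor exposes its Parquet PrimitiveType; not necessarily the exact merged code):

import com.facebook.presto.spi.type.DateType;
import com.facebook.presto.spi.type.IntegerType;
import com.facebook.presto.spi.type.Type;
import org.apache.parquet.schema.OriginalType;

// Sketch: map a Parquet INT32 column to a Presto type, honoring the
// DATE logical annotation (days since the Unix epoch).
private static Type getInt32Type(RichColumnDescriptor descriptor)
{
    if (descriptor.getPrimitiveType().getOriginalType() == OriginalType.DATE) {
        return DateType.DATE;
    }
    return createDecimalType(descriptor).orElse(IntegerType.INTEGER);
}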

Contributor Author


Looking at this, #11118 actually solves it differently, by adding the HiveType to RichColumnDescriptor and then determining the mapped Presto type for an int32 based on that here.

The outcome should be the same though.

Contributor Author

@ryanrupp ryanrupp Nov 12, 2018


Added some unit tests in 1f78d8e3c0dfa4ac8e9ad7a8e2b9f53409a55e05 for this in case this changes in the future.

IntStatistics intStatistics = (IntStatistics) statistics;
// ignore corrupted statistics
if (intStatistics.genericGetMin() > intStatistics.genericGetMax()) {
    return Domain.create(ValueSet.all(type), hasNullValue);
}
Contributor


@nezihyigitbasi do you know the reason why we silently ignore corrupted statistics here? I see we already do this for other cases above.

Contributor


I don't exactly remember the history behind that. @zhenxiao do you remember why we are silently ignoring corrupt stats?

Collaborator


As far as I remember, if the statistics are obviously incorrect/corrupted (in this case, min > max), we silently do not leverage them to evaluate predicates.
It should not error out: if stats are incorrect/corrupted, we can still scan the data to complete the query; we just lose a potential speedup.
Maybe add a log message here? @findepi @nezihyigitbasi what do you think?

Contributor


In general I prefer failing fast and loudly for such cases, as that may uncover bugs/issues in whoever has written those files/stats.
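
A sketch of that fail-loud alternative, for illustration only; the exception type here is a generic JDK placeholder, not the actual change that was eventually made:

// Illustrative only: surface corrupted stats instead of silently ignoring them.
if (intStatistics.genericGetMin() > intStatistics.genericGetMax()) {
    throw new IllegalStateException(String.format(
            "Corrupted Parquet statistics: min %s > max %s",
            intStatistics.genericGetMin(), intStatistics.genericGetMax()));
}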

Collaborator


Got it, failing loudly is better. I will file a follow-up task to fix it.

Collaborator


Here it is: #12036

@ryanrupp
Contributor Author

@nezihyigitbasi could this get another review? I'm not sure how to indicate that the open review issues were followed up on; it may be because I force-pushed out the timestamp change, so GitHub is referencing a commit that no longer exists. Thanks

Contributor

@nezihyigitbasi nezihyigitbasi left a comment


LGTM.

@nezihyigitbasi
Contributor

@ryanrupp I think we can squash the commits into a single commit, as I don't think we need a separate commit for the unit tests. Let me know once you address the comments and we can merge this. Thanks!

This additionally fixes an issue where filtering against a DATE column
would result in an exception due to type mismatch. DATE support is for
Parquet INT32 columns logically typed as DATE.
@ryanrupp ryanrupp force-pushed the fixParquetDatePredicatePushdown branch from 1f78d8e to 7626073 on November 28, 2018 04:46
@ryanrupp
Contributor Author

@nezihyigitbasi thanks for the review. I made the follow up changes and squashed the commits.

@nezihyigitbasi
Contributor

Merged, thanks @ryanrupp.

@ryanrupp ryanrupp deleted the fixParquetDatePredicatePushdown branch November 29, 2018 00:11
@sa255304

sa255304 commented Dec 17, 2018

Hi all, I am still facing this issue. Please confirm how to resolve it.

Do I need to change anything in the code, or is there a new jar available with the fix for this?

Error:

Caused by: java.sql.SQLException: Query failed (#20181217_165822_00916_nh8w3): Error opening Hive split /user/test/file_path/part-00000-830904de-7b0e-41c0-88e5-40d3990689e5-c000.snappy.parquet (offset=0, length=7333): Mismatched Domain types: date vs integer
at com.facebook.presto.jdbc.PrestoResultSet.resultsException(PrestoResultSet.java:1840)
at com.facebook.presto.jdbc.PrestoResultSet$ResultsPageIterator.computeNext(PrestoResultSet.java:1820)
at com.facebook.presto.jdbc.PrestoResultSet$ResultsPageIterator.computeNext(PrestoResultSet.java:1759)
at com.facebook.presto.jdbc.internal.guava.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
at com.facebook.presto.jdbc.internal.guava.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
at com.facebook.presto.jdbc.internal.guava.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
at com.facebook.presto.jdbc.internal.guava.collect.Iterators$ConcatenatedIterator.getTopMetaIterator(Iterators.java:1319)
at com.facebook.presto.jdbc.internal.guava.collect.Iterators$ConcatenatedIterator.hasNext(Iterators.java:1335)
at com.facebook.presto.jdbc.PrestoResultSet.next(PrestoResultSet.java:144)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:343)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:329)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

@beatbull

beatbull commented Jan 3, 2019

@sa255304 I recently ran into the same issue using Presto version 0.211. According to the code, the fix was released in version 0.215, and in version 0.215 it works fine. Please upgrade Presto to version 0.215.

@sa255304

sa255304 commented Jan 3, 2019

Thanks for your response. I was using Presto 0.212 at the time I posted my problem. I am reading my Presto table data into PySpark code and trying to do date comparisons with Spark SQL; that's when I hit this issue.

My EMR cluster currently has Presto 0.214. Still, I tried passing the Presto 0.215 jars as an external dependency to my PySpark code, and the issue still exists.

Is there any alternative for this issue, as I have to wait for Amazon to upgrade the Presto version in the latest EMR release?

@nezihyigitbasi
Contributor

You need version 0.215 to get the fix. You should talk to the EMR team for EMR-specific questions.
