Allow scans with the number of partitions exceeding the limit #14225
Conversation
Force-pushed from 67f4ba4 to 28bcba0
An attempt to remove the limitation on the number of partitions per scan. This change doesn't remove the setting defining the limit; instead, when the number of partitions is higher than the limit, the system tries to avoid loading partitions eagerly. Certain optimizations (such as partition pruning) are not possible when partitions cannot be loaded in memory. This change also reworks how HiveSplitSource works to minimize the memory footprint when the number of partitions is high.

@electrum @findepi @Praveen2112 Could you please take a look and let me know what you think?
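The lazy-loading idea described above can be sketched roughly as follows. This is an illustrative sketch, not Trino's actual API: `BatchLoader`, `loadBatch`, and the batch size are made-up names standing in for the metastore calls.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Illustrative sketch: stream partitions in small batches instead of
// materializing the full list up front, keeping coordinator memory flat
// even when a table has millions of partitions.
public class LazyPartitions {
    // Hypothetical loader: returns the next batch of partition names,
    // or an empty list when exhausted.
    interface BatchLoader {
        List<String> loadBatch(int offset, int batchSize);
    }

    static Iterator<String> lazyIterator(BatchLoader loader, int batchSize) {
        return new Iterator<>() {
            private int offset;
            private Iterator<String> current = List.<String>of().iterator();
            private boolean exhausted;

            @Override
            public boolean hasNext() {
                // Fetch the next batch only when the current one is drained
                while (!current.hasNext() && !exhausted) {
                    List<String> batch = loader.loadBatch(offset, batchSize);
                    offset += batch.size();
                    exhausted = batch.isEmpty();
                    current = batch.iterator();
                }
                return current.hasNext();
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return current.next();
            }
        };
    }

    public static void main(String[] args) {
        List<String> all = List.of("p=1", "p=2", "p=3", "p=4", "p=5");
        BatchLoader loader = (offset, size) ->
                all.subList(Math.min(offset, all.size()), Math.min(offset + size, all.size()));
        List<String> seen = new ArrayList<>();
        lazyIterator(loader, 2).forEachRemaining(seen::add);
        System.out.println(seen); // prints [p=1, p=2, p=3, p=4, p=5]
    }
}
```

Only one batch is ever held in memory, which is the property the PR relies on when the partition count exceeds the limit.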
One practical way this config is used is to prevent people from running queries which scan too many partitions (to control costs and to force users to add predicates on partition columns to get efficient queries). For cost there's a better option now. I don't think someone would still actually want to enforce limits based on the number of partitions, since that's very arbitrary, but I agree with @raunaqmorarka that we should not re-purpose existing configs. Ideally we can deprecate the existing config.
Looking at configs like
Not like this, yes
Do we have any migration path? Do we need any?
I don't think users take into account the intent of the original author when using a config. In this case there is no documentation or description available to show that this had anything to do with memory, so anyone already using it can't possibly know that it was not meant to be used as a way to block queries touching a large number of partitions. I think we should look into deprecating and removing it.

I'm also wondering whether this trade-off of reducing coordinator memory usage while giving up on partition pruning makes sense. If someone wanted to run queries on a large number of partitions and coordinator memory was the limiting factor, why wouldn't they get a bigger coordinator instead of incurring the higher cost of running queries without partition pruning? It seems cheaper to upgrade one node than to consume a lot more resources on the workers. Is it possible that we can still prune splits on the workers using the predicate on partitioned columns? E.g. for dynamic partition pruning we have…
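The worker-side pruning the reviewer suggests could look roughly like this. The names (`Split`, `pruneSplits`, the string-valued partition map) are hypothetical, not Trino's actual split APIs; the point is only that a predicate on partition columns can still discard splits even when partitions were never enumerated on the coordinator.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Illustrative sketch: apply a predicate on partition-column values per
// split, instead of pruning the partition list up front on the coordinator.
public class SplitPruning {
    // A split carries the partition values it was generated from
    record Split(Map<String, String> partitionValues) {}

    static List<Split> pruneSplits(List<Split> splits, String column, Predicate<String> allowed) {
        return splits.stream()
                .filter(s -> allowed.test(s.partitionValues().get(column)))
                .toList();
    }

    public static void main(String[] args) {
        List<Split> splits = List.of(
                new Split(Map.of("ds", "2022-09-01")),
                new Split(Map.of("ds", "2022-09-02")));
        // Keep only splits for ds = '2022-09-02'
        System.out.println(pruneSplits(splits, "ds", "2022-09-02"::equals).size()); // prints 1
    }
}
```

This trades coordinator-side pruning (splits never created) for worker/enumeration-side filtering (splits created, then dropped), which is exactly the cost the reviewer is questioning.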
Force-pushed from 28bcba0 to 3148f77
Thanks everybody for the feedback. I updated the PR, preserving the existing property. Instead, I introduced a new property. This PR also makes it possible to scan tables that exceed the configured limit. Please take another look.
@@ -2722,7 +2722,9 @@ public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle
         metastore.truncateUnpartitionedTable(session, handle.getSchemaName(), handle.getTableName());
     }
     else {
-        for (HivePartition hivePartition : partitionManager.getOrLoadPartitions(metastore, handle)) {
+        Iterator<HivePartition> partitions = partitionManager.getPartitions(metastore, handle);
Do we skip the partition check during DELETE?
Yeah, I don't think there's a real reason to limit the number of partitions for DELETE. Also, the property says "scan".
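The reason an iterator suffices for DELETE can be illustrated as follows: partitions can be dropped in bounded batches without ever holding the full list, so no partition-count limit is needed. The names (`dropInBatches`, the batch drop callback) are made up for illustration, not the actual metastore API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch: consume a partition iterator in fixed-size batches,
// invoking one metastore drop call per batch, so DELETE keeps at most
// batchSize partition names in memory at any time.
public class BatchedDelete {
    static int dropInBatches(Iterator<String> partitions, int batchSize, Consumer<List<String>> dropBatch) {
        int dropped = 0;
        List<String> batch = new ArrayList<>(batchSize);
        while (partitions.hasNext()) {
            batch.add(partitions.next());
            if (batch.size() == batchSize) {
                dropBatch.accept(List.copyOf(batch));
                dropped += batch.size();
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            // Flush the final partial batch
            dropBatch.accept(List.copyOf(batch));
            dropped += batch.size();
        }
        return dropped;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        int n = dropInBatches(List.of("p1", "p2", "p3").iterator(), 2, b -> calls.incrementAndGet());
        System.out.println(n + " partitions in " + calls.get() + " batches"); // prints 3 partitions in 2 batches
    }
}
```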
@@ -221,9 +227,6 @@ public ConnectorSplitSource getSplits(
         // validate bucket bucketed execution
         Optional<HiveBucketHandle> bucketHandle = hiveTable.getBucketHandle();

-        // sort partitions
Don't we have to sort them?
        addLoaderIfNecessary();
    }
private void addLoaderIfNecessary() |
nit: Can we extract it as a preparatory commit?
What specifically? The commit is to "Avoid loading partitions eagerly in HiveSplitManager", and the changes to the BackgroundHiveSplitLoader are necessary to achieve that.
@@ -70,7 +70,7 @@
    private boolean singleStatementWritesOnly;

    private DataSize maxSplitSize = DataSize.of(64, MEGABYTE);
-   private int maxPartitionsPerScan = 100_000;
+   private int maxPartitionsPerScan = 1_000_000;
The "Avoid loading partitions eagerly in HiveSplitManager" commit improves split enumeration to avoid parsing/loading partitions eagerly. Memory should no longer be a concern. The property is now mostly for system administrators to prevent unnecessarily large scans.
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            executor.execute(() -> checkState(!lock.isHeldByCurrentThread(), "executor is a direct executor"));
Do we actually need the lock? This seems to depend on the fact that the exception is thrown in the calling thread, so we could simply do:
executor.execute(() -> {
throw new IllegalArgumentException("executor is a direct executor");
});
I thought about that. I'm worried that if an executor is configured to handle uncaught exceptions (for example, log them), it will create unnecessary noise in the output.
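The trick being discussed can be shown standalone. A direct executor runs the task on the calling thread, so a ReentrantLock held by the caller is also held inside the task; no exception needs to escape into the executor. This simplified sketch (the method name and boolean-return shape are mine, not Trino's) reliably detects only the synchronous, direct-executor case.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: detect a direct executor via lock reentrancy, avoiding the
// uncaught-exception noise a thrown-exception probe could produce.
public class DirectExecutorCheck {
    static boolean isDirectExecutor(Executor executor) {
        ReentrantLock lock = new ReentrantLock();
        boolean[] direct = new boolean[1];
        lock.lock();
        try {
            // With a direct executor this runs here, on the calling thread,
            // while the lock is still held; a pooled executor runs it on
            // another thread, where isHeldByCurrentThread() is false.
            executor.execute(() -> direct[0] = lock.isHeldByCurrentThread());
        }
        finally {
            lock.unlock();
        }
        return direct[0];
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run;
        System.out.println(isDirectExecutor(direct)); // prints true
    }
}
```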
Force-pushed from 3148f77 to 1161609
Updated to have the hive.max-partitions-per-scan property only as a limit.
Force-pushed from 1161609 to f22436d
Thanks for the very interesting feature. We were expecting this kind of feature to limit the maximum number of partitions. Before this, we were using ByteBuddy to hook the HivePartitionManager class to prevent users from submitting abusive queries and slowing the cluster, by limiting the maximum number of partitions they can query in a single query. We simply check and return an error if the number of partitions read is greater than the configured value, for specific tables only (tables which we are sure have a HUGE amount of data per partition).
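The guard this commenter describes can be approximated without bytecode hooks as a plain check. The table names, limits, and exception type below are made up for illustration; this is not the commenter's actual ByteBuddy interceptor.

```java
import java.util.Map;

// Illustrative per-table partition-count guard, similar in spirit to the
// ByteBuddy hook described above: reject a scan when it would read more
// partitions than the per-table limit configured for known-huge tables.
public class PartitionGuard {
    // Hypothetical per-table limits; tables not listed are unrestricted
    private final Map<String, Integer> perTableLimits;

    PartitionGuard(Map<String, Integer> perTableLimits) {
        this.perTableLimits = perTableLimits;
    }

    void checkScan(String table, int partitionsRead) {
        Integer limit = perTableLimits.get(table);
        if (limit != null && partitionsRead > limit) {
            throw new IllegalStateException(
                    "Query over " + table + " reads " + partitionsRead
                            + " partitions, exceeding the limit of " + limit);
        }
    }

    public static void main(String[] args) {
        PartitionGuard guard = new PartitionGuard(Map.of("huge_events", 100));
        guard.checkScan("small_dim", 5_000);  // unrestricted table: ok
        guard.checkScan("huge_events", 50);   // under the limit: ok
        try {
            guard.checkScan("huge_events", 500);
        }
        catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```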
Description
Treat such scans as scans over non-partitioned tables, without applying partition pruning. This is needed to avoid storing too much information in HiveTableHandle.
Non-technical explanation
Allow scanning more partitions than the limit per scan
Release notes
( ) This is not user-visible or docs only and no release notes are required.
(x) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text: