Better Grouping / aggregation pushdown #8699

Open
alamb opened this issue Dec 31, 2023 · 7 comments
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented Dec 31, 2023

Is your feature request related to a problem or challenge?

@devinjdangelo asked on slack
https://the-asf.slack.com/archives/C04RJ0C85UZ/p1703891500973189?thread_ts=1703891392.037839&cid=C04RJ0C85UZ

Related, is there any mechanism to push partial aggregations down to a table provider? E.g. “select count(*) from custom_provider” would push the aggregate expression down to the TableProvider rather than doing a scan.

Pushing grouping down in a plan is a classic analytic database optimization.

Among other things, it would allow doing fast data exploration like looking at counts / mins / maxes in different columns from only metadata.

Describe the solution you'd like

I would like someone to propose

  1. A change to TableProvider that would allow pushing aggregation down into the TableProvider
  2. A basic optimizer pass that pushes aggregation down when possible

We shouldn't innovate on API here, but should look at what other engines support in this area (e.g. Trino / Spark / Postgres) and follow them unless there is a good reason to do something different.
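To make the two requested pieces concrete, here is a minimal, self-contained sketch. Everything in it is hypothetical: `AggregateCall`, `AggregatePushdown`, `try_aggregate`, the toy `Plan` enum, and `MetadataProvider` are illustration-only names, not DataFusion's API and not a proposal for the final interface, which should follow what other engines do.

```rust
// Hypothetical sketch: none of these names are part of DataFusion's API.

/// Aggregates the optimizer may offer to a provider.
#[derive(Clone, Debug, PartialEq)]
pub enum AggregateCall {
    CountStar,
    Min(String), // column name
    Max(String), // column name
}

/// Hypothetical extension point (piece 1): the provider either answers
/// every requested aggregate, or declines by returning `None`.
pub trait AggregatePushdown {
    fn try_aggregate(&self, aggs: &[AggregateCall]) -> Option<Vec<i64>>;
}

/// A toy logical plan with just enough nodes to show the rewrite.
#[derive(Debug, PartialEq)]
pub enum Plan {
    Scan { table: String },
    Aggregate { aggs: Vec<AggregateCall>, input: Box<Plan> },
    /// A single row of precomputed values; no scan is needed.
    Values(Vec<i64>),
}

/// Optimizer pass (piece 2): replace an ungrouped `Aggregate` sitting directly
/// on a `Scan` with `Values` when the provider accepts the pushdown.
/// Simplification: one provider serves every table in the plan.
pub fn push_down_aggregates(plan: Plan, provider: &dyn AggregatePushdown) -> Plan {
    match plan {
        Plan::Aggregate { aggs, input } => {
            if matches!(*input, Plan::Scan { .. }) {
                if let Some(row) = provider.try_aggregate(&aggs) {
                    return Plan::Values(row);
                }
            }
            // Not pushable here: keep the node and keep looking below it.
            let input = Box::new(push_down_aggregates(*input, provider));
            Plan::Aggregate { aggs, input }
        }
        other => other,
    }
}

/// Example provider that knows its row count and the bounds of one column.
pub struct MetadataProvider {
    pub column: String,
    pub rows: i64,
    pub min: i64,
    pub max: i64,
}

impl AggregatePushdown for MetadataProvider {
    fn try_aggregate(&self, aggs: &[AggregateCall]) -> Option<Vec<i64>> {
        aggs.iter()
            .map(|agg| match agg {
                AggregateCall::CountStar => Some(self.rows),
                AggregateCall::Min(c) if *c == self.column => Some(self.min),
                AggregateCall::Max(c) if *c == self.column => Some(self.max),
                _ => None, // unknown column: decline the whole pushdown
            })
            .collect()
    }
}
```

The all-or-nothing `Option` return keeps the pass simple: if the provider declines, the plan is left untouched and the query runs as a normal scan.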

A good initial proof of concept would be to push COUNT(*), MIN(col), MAX(col) type queries down into parquet or other table providers that can provide such values from statistics or metadata with minimal work / data fetch

For example

datafusion-cli -c "select count(*), min(bigint_col), max(bigint_col) from './parquet-testing/data/alltypes_tiny_pages.parquet';"
DataFusion CLI v34.0.0
+----------+--------------------------------------------------------------------+--------------------------------------------------------------------+
| COUNT(*) | MIN(./parquet-testing/data/alltypes_tiny_pages.parquet.bigint_col) | MAX(./parquet-testing/data/alltypes_tiny_pages.parquet.bigint_col) |
+----------+--------------------------------------------------------------------+--------------------------------------------------------------------+
| 7300     | 0                                                                  | 90                                                                 |
+----------+--------------------------------------------------------------------+--------------------------------------------------------------------+
1 row in set. Query took 0.007 seconds.
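As a rough illustration of how a query like the one above could be answered from metadata alone, the sketch below folds per-row-group statistics into a single `COUNT(*)` / `MIN` / `MAX` result. `RowGroupStats`, `Summary`, and `summarize` are hypothetical names, only loosely modelled on what a Parquet footer records, and the numbers used with it are made up rather than taken from the file in the example.

```rust
/// Hypothetical per-row-group metadata for one integer column.
#[derive(Clone, Copy, Debug)]
pub struct RowGroupStats {
    pub num_rows: u64,
    pub null_count: u64,
    /// `None` when the writer did not record the statistic.
    pub min: Option<i64>,
    pub max: Option<i64>,
}

/// Result of `SELECT COUNT(*), MIN(col), MAX(col)`.
/// `min` / `max` are `None` when the column holds no non-null value (SQL NULL).
#[derive(Debug, PartialEq)]
pub struct Summary {
    pub count: u64,
    pub min: Option<i64>,
    pub max: Option<i64>,
}

/// Combine row-group statistics without touching any data pages.
/// Returns `None` when some row group lacks the statistics needed,
/// in which case the caller has to fall back to a real scan.
pub fn summarize(groups: &[RowGroupStats]) -> Option<Summary> {
    let mut summary = Summary { count: 0, min: None, max: None };
    for g in groups {
        summary.count += g.num_rows;
        if g.null_count == g.num_rows {
            continue; // empty or all-NULL group: contributes no min / max
        }
        // A group with real values but missing bounds cannot be answered.
        let (lo, hi) = (g.min?, g.max?);
        summary.min = Some(summary.min.map_or(lo, |m| m.min(lo)));
        summary.max = Some(summary.max.map_or(hi, |m| m.max(hi)));
    }
    Some(summary)
}
```

The fallback path matters: statistics are optional in many file formats, so a rewrite of this kind can only fire when every row group supplies what the query needs.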

The optimizer framework should eventually extend to pushing more sophisticated groupings (like pushing grouping down below joins) but many

Describe alternatives you've considered

I believe "Eager Aggregation and Lazy Aggregation" is the classic paper on the optimizer portion.

Additional context

@devinjdangelo
Contributor

Thank you for filing, @alamb! I was just reading related historical discussion in #970. Trino also has some docs that helped me understand this: https://trino.io/docs/current/optimizer/pushdown.html#aggregation-pushdown

I'm interested in helping out here where I can.

@alamb changed the title from "Grouping / aggregation pushdown" to "Better Grouping / aggregation pushdown" on Jan 1, 2024
@alamb
Contributor Author

alamb commented Jan 1, 2024

I actually found / remembered that we already have https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/src/physical_optimizer/aggregate_statistics.rs, which tries to handle the use case of using statistics to rewrite queries.

Perhaps we need to improve documentation, or maybe make the existing rule more sophisticated 🤔

@devinjdangelo
Contributor

devinjdangelo commented Jan 1, 2024

I took a look at the Statistics that a TableProvider can currently return, as @andygrove suggested on Slack. The interface feels specifically designed for (and coupled to) Parquet, as the only column-level stats that can be returned are:

/// Statistics for a column within a relation
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct ColumnStatistics {
    /// Number of null values on column
    pub null_count: Precision<usize>,
    /// Maximum value of column
    pub max_value: Precision<ScalarValue>,
    /// Minimum value of column
    pub min_value: Precision<ScalarValue>,
    /// Number of distinct values
    pub distinct_count: Precision<usize>,
}

Further, the TableProvider::statistics method does not pass any information about the query under planning, which implies that the statistics are assumed to be static and precomputed outside of the context of a specific query.

/// Get statistics for this table, if available
fn statistics(&self) -> Option<Statistics> {
    None
}

If you think about a TableProvider backed by a fully fledged execution engine, we could push down much more than this. E.g. we could compute the mean of a column considering filter pushdown at the same time (if they are all "exact" which they likely are in this case). I also think calling the feature "Statistics" is confusing outside of the parquet context. Parquet statistics are precomputed and stored in the file, but other TableProviders could calculate arbitrary "statistics" on the fly, which I think more commonly would be called "aggregations" in this context.

Perhaps we could have a more general "AggregationPushdown feature" and "Statistics" could be a special case implementation for parquet backed tables to support push down of some Aggregations.
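One way to picture "Statistics as a special case of aggregation pushdown" is a single trait with two implementations: one backed by precomputed statistics, one backed by an engine that can evaluate aggregates and filters on the fly. This is only a sketch under stated assumptions: `AggregateSource`, `Agg`, `AtLeast`, `PrecomputedStats`, and `EngineBacked` are hypothetical, and the local `Precision` enum is a stand-in for the wrapper type seen in the struct quoted above, with assumed variants.

```rust
/// Local stand-in for a precision wrapper; the variants are an assumption.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Precision<T> {
    Exact(T),
    Inexact(T),
    Absent,
}

pub enum Agg {
    Min,
    Max,
    Mean,
}

/// Hypothetical pushed-down filter: keep values `>= .0`.
pub struct AtLeast(pub i64);

/// Hypothetical general interface. `None` means "not answered here",
/// so the caller falls back to scanning and aggregating itself.
pub trait AggregateSource {
    fn aggregate(&self, agg: Agg, filter: Option<&AtLeast>) -> Option<f64>;
}

/// Special case: statistics computed ahead of time, with no query context.
pub struct PrecomputedStats {
    pub min: Precision<i64>,
    pub max: Precision<i64>,
}

impl AggregateSource for PrecomputedStats {
    fn aggregate(&self, agg: Agg, filter: Option<&AtLeast>) -> Option<f64> {
        if filter.is_some() {
            return None; // the stored values ignore the filter
        }
        let stat = match agg {
            Agg::Min => self.min,
            Agg::Max => self.max,
            Agg::Mean => return None, // never precomputed in this sketch
        };
        match stat {
            Precision::Exact(v) => Some(v as f64),
            _ => None, // an estimate must not replace a real aggregate
        }
    }
}

/// General case: a source that can evaluate filter and aggregate together.
pub struct EngineBacked {
    pub values: Vec<i64>,
}

impl AggregateSource for EngineBacked {
    fn aggregate(&self, agg: Agg, filter: Option<&AtLeast>) -> Option<f64> {
        let kept: Vec<i64> = self
            .values
            .iter()
            .copied()
            .filter(|v| filter.map_or(true, |f| *v >= f.0))
            .collect();
        if kept.is_empty() {
            return None; // simplification: let the caller handle empty input
        }
        match agg {
            Agg::Min => kept.iter().min().map(|v| *v as f64),
            Agg::Max => kept.iter().max().map(|v| *v as f64),
            Agg::Mean => Some(kept.iter().sum::<i64>() as f64 / kept.len() as f64),
        }
    }
}
```

Passing the filter alongside the aggregate is the key difference from a context-free `statistics()` call: the statistics-backed source has to decline as soon as a filter is present, while the engine-backed one can still answer.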

@alamb
Contributor Author

alamb commented Jan 1, 2024

> If you think about a TableProvider backed by a fully fledged execution engine, we could push down much more than this.

Someone also recently brought up "sort pushdown". It gets tricky for sorting, though, because we would need some way to communicate whether pushing a sort down is better than doing it in DataFusion, which is different from communicating / using a pre-existing sort order.

@backkem
Contributor

backkem commented Jan 10, 2024

I had made a suggestion here that turns the current approach around: instead of a TableProvider expressing what it can push down, let the TableProvider run its own optimizer pass on (part of) the plan to determine for itself what it can push down, replacing its federated part of the plan with a virtual TableScan. In the comment I linked, I provided some references to the Presto codebase, which takes this approach.
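A minimal sketch of that inverted approach, on a toy plan type: find the largest subtrees whose scans all belong to one provider and replace each with a virtual scan that the provider executes itself. The `Plan` enum, `sole_provider`, and `federate` are hypothetical names, and the sketch assumes a provider can execute every node type it is handed, which a real pass would have to check.

```rust
/// Toy logical plan; not DataFusion's `LogicalPlan`.
#[derive(Clone, Debug, PartialEq)]
pub enum Plan {
    Scan { provider: String, table: String },
    Filter { predicate: String, input: Box<Plan> },
    Aggregate { aggs: Vec<String>, input: Box<Plan> },
    Join { left: Box<Plan>, right: Box<Plan> },
    /// Placeholder for a subtree the named provider will execute itself.
    VirtualScan { provider: String, pushed: Box<Plan> },
}

/// Returns the one provider that owns every scan under `plan`, if any.
fn sole_provider(plan: &Plan) -> Option<&str> {
    match plan {
        Plan::Scan { provider, .. } | Plan::VirtualScan { provider, .. } => {
            Some(provider.as_str())
        }
        Plan::Filter { input, .. } | Plan::Aggregate { input, .. } => sole_provider(input),
        Plan::Join { left, right } => {
            let l = sole_provider(left)?;
            (sole_provider(right)? == l).then_some(l)
        }
    }
}

/// Replace each maximal single-provider subtree with a `VirtualScan`.
/// Assumption: the provider can run any node in this toy plan.
pub fn federate(plan: Plan) -> Plan {
    // A bare scan gains nothing from being wrapped.
    let is_leaf = matches!(plan, Plan::Scan { .. } | Plan::VirtualScan { .. });
    let owner = sole_provider(&plan).map(str::to_string);
    if !is_leaf {
        if let Some(provider) = owner {
            return Plan::VirtualScan { provider, pushed: Box::new(plan) };
        }
    }
    // Mixed providers (or a leaf): recurse into the children.
    match plan {
        Plan::Filter { predicate, input } => Plan::Filter {
            predicate,
            input: Box::new(federate(*input)),
        },
        Plan::Aggregate { aggs, input } => Plan::Aggregate {
            aggs,
            input: Box::new(federate(*input)),
        },
        Plan::Join { left, right } => Plan::Join {
            left: Box::new(federate(*left)),
            right: Box::new(federate(*right)),
        },
        leaf => leaf,
    }
}
```

With this shape, aggregation, filter, and even join pushdown fall out of the same mechanism, at the cost of each provider needing to understand plan fragments rather than a fixed list of capabilities.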

@devinjdangelo
Contributor

If anyone comes across this issue and is interested in helping develop or test this type of capability, I think consolidating effort in https://github.com/datafusion-contrib/datafusion-federation is a good plan. @backkem wired up a nice framework for this, and I think if we flesh it out it could eventually be a great addition to DataFusion proper.

@Lordworms
Contributor

Interested in this one!
