-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support unparsing LogicalPlan::Window nodes #10767
Conversation
self.scalar_to_sql(val).map(Box::new).ok(), | ||
) | ||
Ok(ast::WindowFrameBound::Preceding({ | ||
let val = self.scalar_to_sql(val)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a subtle difference in how datafusion plans a window frame bound that is None
vs ScalarValue::Null.
The former yields PRECEDING UNBOUNDED
and the latter yields PRECEDING NULL
.
Datafusion's planner accepts the former, but rejects the latter.
@@ -1148,7 +1158,7 @@ mod tests { | |||
window_frame: WindowFrame::new(None), | |||
null_treatment: None, | |||
}), | |||
r#"ROW_NUMBER(col) OVER (ROWS BETWEEN NULL PRECEDING AND NULL FOLLOWING)"#, | |||
r#"ROW_NUMBER(col) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)"#, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comment on L520 for explanation of this test change.
|
||
/// Recursively searches children of [LogicalPlan] to find an Aggregate node if one exists | ||
/// One of the possible aggregation plans which can be found within a single select query. | ||
pub(crate) enum AggVariant<'a> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This assumes a SELECT query can exclusively have only a window function or an aggregate function but not both. A LogicalPlan can certainly have both, but I could not find an example of a single SELECT query without any nesting/derived table factors that is allowed to have both.
@@ -127,7 +127,10 @@ fn roundtrip_statement() -> Result<()> { | |||
UNION ALL | |||
SELECT j2_string as string FROM j2 | |||
ORDER BY string DESC | |||
LIMIT 10"# | |||
LIMIT 10"#, | |||
"SELECT id, count(*) over (PARTITION BY first_name ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The roundtrip test will fail if ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
is not explicitly included. E.g. the datafusion planner generates a non identical plan for the following two SQL queries:
SELECT id,
count(*) over (PARTITION BY first_name ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING),
last_name,
sum(id) over (PARTITION BY first_name ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING),
first_name from person
vs
SELECT id,
count(*) over (PARTITION BY first_name),
last_name,
sum(id) over (PARTITION BY first_name),
first_name from person
While the two plans are not identical p1!=p2
, I believe the difference is trivial and will actually result in the same computations taking place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While the two plans are not identical p1!=p2, I believe the difference is trivial and will actually result in the same computations taking place.
That is my understanding as well
We could plausibly simply the resulting plan of the window bounds are the default
Can we also add some tests that have aggregate and window functions? Something like
SELECT id, count(distinct id), sum(id) OVER (PARTITION BY first_name) from person
SELECT id, sum(id) OVER (PARTITION BY first_name ROWS 5 PRECEDING ROWS 2 FOLLOWING) from person
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It appears that the datafusion planner does not support mixing aggregate and window functions. It does allow mixing window functions with different WindowSpecs, including some over all rows (which is almost the same thing as an aggregate function). I think this behavior makes sense as an aggregate function is strict in how many tuples it will return (one per group) while a window function can return multiple tuples per group as needed.
DataFusion CLI v38.0.0
> create table person (id int, name varchar);
0 row(s) fetched.
Elapsed 0.001 seconds.
> insert into person values (1, 'a'), (2, 'b'), (3, 'c');
+-------+
| count |
+-------+
| 3 |
+-------+
1 row(s) fetched.
Elapsed 0.001 seconds.
> select count(distinct id), sum(id) over (partition by name) from person;
Error during planning: Projection references non-aggregate values: Expression person.id could not be resolved from available columns: COUNT(DISTINCT person.id)
> select count(distinct id) over (), sum(id) over (partition by name) from person;
+---------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------+
| COUNT(person.id) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING | SUM(person.id) PARTITION BY [person.name] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING |
+---------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------+
| 3 | 2 |
| 3 | 1 |
| 3 | 3 |
+---------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------+
3 row(s) fetched.
Elapsed 0.002 seconds.
I added some tests and made a few changes to correctly support unparsing a SELECT query with multiple different WindowSpecs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tests look great now -- thanks @devinjdangelo
datafusion/sql/src/unparser/plan.rs
Outdated
.map(|expr| self.expr_to_sql(expr)) | ||
.collect::<Result<Vec<_>>>()?, | ||
)); | ||
if let Some(aggvariant) = find_agg_node_within_select(plan, true) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The select_to_sql_recursively
method has grown deeply nested/complex and is due for a refactor or at least breaking up into more helper methods to improve readability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree this would be a good follow on PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @devinjdangelo -- this is looking pretty neat
datafusion/sql/src/unparser/plan.rs
Outdated
.map(|expr| self.expr_to_sql(expr)) | ||
.collect::<Result<Vec<_>>>()?, | ||
)); | ||
if let Some(aggvariant) = find_agg_node_within_select(plan, true) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree this would be a good follow on PR.
@@ -127,7 +127,10 @@ fn roundtrip_statement() -> Result<()> { | |||
UNION ALL | |||
SELECT j2_string as string FROM j2 | |||
ORDER BY string DESC | |||
LIMIT 10"# | |||
LIMIT 10"#, | |||
"SELECT id, count(*) over (PARTITION BY first_name ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While the two plans are not identical p1!=p2, I believe the difference is trivial and will actually result in the same computations taking place.
That is my understanding as well
We could plausibly simply the resulting plan of the window bounds are the default
Can we also add some tests that have aggregate and window functions? Something like
SELECT id, count(distinct id), sum(id) OVER (PARTITION BY first_name) from person
SELECT id, sum(id) OVER (PARTITION BY first_name ROWS 5 PRECEDING ROWS 2 FOLLOWING) from person
datafusion/sql/src/unparser/utils.rs
Outdated
/// | ||
/// For example, if expr contains the column expr "COUNT(*) PARTITION BY id" it will be transformed | ||
/// into an actual window expression as identified in the window node. | ||
pub(crate) fn unproject_window_exprs(expr: &Expr, window: &Window) -> Result<Expr> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to implement unproject_window_exprs
before by using the same method in unproject_agg_exprs
, that's finding the index_of_column
but it's not correct. Could you tell me why there's a difference? 😊
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The difference I found is that the Aggregate structure maintains all of the relevant Exprs that a Projection node may be referencing in its group by and aggr expr vecs. So, you can look up any column in the schema of the aggregate plan to find the correct expression to sub into the Projection node.
Window on the other hand only keeps the list of window functions whereas the schema has all of the fields. So its no longer possible to rely on the index of a column within the schema to find the correct expression.
Instead, I am using the auto generated column name for the expression to identify the correct expression. This is definitely a more brittle way to do it and could break if there are unexpected changes to how names are generated or propagated during planning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It turns out another difference is there can be multiple Window nodes representing different WindowSpecs within the same select query. I had to make a few additional tweaks to handle this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks great -- thank you @devinjdangelo
@@ -127,7 +127,10 @@ fn roundtrip_statement() -> Result<()> { | |||
UNION ALL | |||
SELECT j2_string as string FROM j2 | |||
ORDER BY string DESC | |||
LIMIT 10"# | |||
LIMIT 10"#, | |||
"SELECT id, count(*) over (PARTITION BY first_name ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tests look great now -- thanks @devinjdangelo
* unparse window plans * new tests + fixes * fmt
Which issue does this PR close?
closes #10664
Rationale for this change
Queries involving window functions are common and should be supported for unparsing a plan into SQL.
What changes are included in this PR?
Are these changes tested?
Yes, with a new round trip plan_to_sql test
Are there any user-facing changes?
Additional queries can be unparsed