Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-49836][SQL][SS] Fix possibly broken query when window is provided to window/session_window fn #48309

Closed
wants to merge 3 commits into from

Conversation

HeartSaVioR
Copy link
Contributor

@HeartSaVioR HeartSaVioR commented Oct 1, 2024

What changes were proposed in this pull request?

This PR fixes the correctness issue about losing operators during analysis - it happens when window is provided to window()/session_window() function.

The rule TimeWindowing and SessionWindowing are responsible to resolve the time window functions. When the window function has window as parameter (time column) (in other words, building time window from time window), the rule wraps window with WindowTime function so that the rule ResolveWindowTime will further resolve this. (And TimeWindowing/SessionWindowing will resolve this again against the result of ResolveWindowTime.)

The issue is that the rule uses "return" for the above, which intends to have "early return" as the other branch is too long compared to this branch. This unfortunately does not work as intended - the intention is just to go out of current local scope (mostly end of curly brace), but it seems to break the loop of execution in "outer" side.
(I haven't debugged further but it's simply clear that it doesn't work as intended.)

Quoting from Scala doc:

Nonlocal returns are implemented by throwing and catching scala.runtime.NonLocalReturnException-s.

It's not super clear where NonLocalReturnException is caught in the call stack; it might exit the execution for much broader scope (context) than expected. And it's finally deprecated in Scala 3.2 and likely be removed in future.

https://dotty.epfl.ch/docs/reference/dropped-features/nonlocal-returns.html

Interestingly it does not break every query for chained time window aggregations. Spark already has several tests with DataFrame API and they haven't failed. The reproducer in community report is using SQL statement - where each aggregation is considered as subquery.

This PR fixes the rule to NOT use early return and instead have a huge if else.

Why are the changes needed?

Described in above.

Does this PR introduce any user-facing change?

Yes, this fixes the possible query breakage. The impacted workloads may not be very huge as chained time window aggregations is an advanced usage, and it does not break every query for the usage.

How was this patch tested?

New UTs.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Oct 1, 2024
@HeartSaVioR
Copy link
Contributor Author

https://dotty.epfl.ch/docs/reference/dropped-features/nonlocal-returns.html

Going further, this is deprecated in newer Scala version (though it's like 3.2 which Spark wouldn't catch up for years) and will be removed. This clearly indicates that programmers have problems with this and they want to make this be much more explicit rather than just the same with local return.

@HeartSaVioR
Copy link
Contributor Author

cc. @cloud-fan Would you mind take a look at this? Thanks!

@@ -87,85 +87,86 @@ object TimeWindowing extends Rule[LogicalPlan] {

val window = windowExpressions.head

// time window is provided as time column of window function, replace it with WindowTime
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewer: except this code comment, the only change is to remove return and replace it with a huge if-else statement.

@@ -210,74 +211,74 @@ object SessionWindowing extends Rule[LogicalPlan] {
val session = sessionExpressions.head

if (StructType.acceptsType(session.timeColumn.dataType)) {
return p transformExpressions {
p transformExpressions {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewer: the only change is to remove return and replace it with a huge if-else statement.

@HeartSaVioR
Copy link
Contributor Author

cc. @viirya

@HeartSaVioR HeartSaVioR changed the title [SPARK-49836][SS] Fix possibly broken query when window is provided to window/session_window fn [SPARK-49836][SQL][SS] Fix possibly broken query when window is provided to window/session_window fn Oct 2, 2024
@HeartSaVioR
Copy link
Contributor Author

Thanks! Merging to master/3.5/3.4.

HeartSaVioR added a commit that referenced this pull request Oct 4, 2024
…ded to window/session_window fn

This PR fixes the correctness issue about losing operators during analysis - it happens when window is provided to window()/session_window() function.

The rule `TimeWindowing` and `SessionWindowing` are responsible to resolve the time window functions. When the window function has `window` as parameter (time column) (in other words, building time window from time window), the rule wraps window with WindowTime function so that the rule ResolveWindowTime will further resolve this. (And TimeWindowing/SessionWindowing will resolve this again against the result of ResolveWindowTime.)

The issue is that the rule uses "return" for the above, which intends to have "early return" as the other branch is too long compared to this branch. This unfortunately does not work as intended - the intention is just to go out of current local scope (mostly end of curly brace), but it seems to break the loop of execution in "outer" side.
(I haven't debugged further but it's simply clear that it doesn't work as intended.)

Quoting from Scala doc:

> Nonlocal returns are implemented by throwing and catching scala.runtime.NonLocalReturnException-s.

It's not super clear where NonLocalReturnException is caught in the call stack; it might exit the execution for much broader scope (context) than expected. And it's finally deprecated in Scala 3.2 and likely be removed in future.

https://dotty.epfl.ch/docs/reference/dropped-features/nonlocal-returns.html

Interestingly it does not break every query for chained time window aggregations. Spark already has several tests with DataFrame API and they haven't failed. The reproducer in community report is using SQL statement - where each aggregation is considered as subquery.

This PR fixes the rule to NOT use early return and instead have a huge if else.

Described in above.

Yes, this fixes the possible query breakage. The impacted workloads may not be very huge as chained time window aggregations is an advanced usage, and it does not break every query for the usage.

New UTs.

No.

Closes #48309 from HeartSaVioR/SPARK-49836.

Lead-authored-by: Jungtaek Lim <[email protected]>
Co-authored-by: Andrzej Zera <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit d8c04cf)
Signed-off-by: Jungtaek Lim <[email protected]>
HeartSaVioR added a commit that referenced this pull request Oct 5, 2024
…ded to window/session_window fn

This PR fixes the correctness issue about losing operators during analysis - it happens when window is provided to window()/session_window() function.

The rule `TimeWindowing` and `SessionWindowing` are responsible to resolve the time window functions. When the window function has `window` as parameter (time column) (in other words, building time window from time window), the rule wraps window with WindowTime function so that the rule ResolveWindowTime will further resolve this. (And TimeWindowing/SessionWindowing will resolve this again against the result of ResolveWindowTime.)

The issue is that the rule uses "return" for the above, which intends to have "early return" as the other branch is too long compared to this branch. This unfortunately does not work as intended - the intention is just to go out of current local scope (mostly end of curly brace), but it seems to break the loop of execution in "outer" side.
(I haven't debugged further but it's simply clear that it doesn't work as intended.)

Quoting from Scala doc:

> Nonlocal returns are implemented by throwing and catching scala.runtime.NonLocalReturnException-s.

It's not super clear where NonLocalReturnException is caught in the call stack; it might exit the execution for much broader scope (context) than expected. And it's finally deprecated in Scala 3.2 and likely be removed in future.

https://dotty.epfl.ch/docs/reference/dropped-features/nonlocal-returns.html

Interestingly it does not break every query for chained time window aggregations. Spark already has several tests with DataFrame API and they haven't failed. The reproducer in community report is using SQL statement - where each aggregation is considered as subquery.

This PR fixes the rule to NOT use early return and instead have a huge if else.

Described in above.

Yes, this fixes the possible query breakage. The impacted workloads may not be very huge as chained time window aggregations is an advanced usage, and it does not break every query for the usage.

New UTs.

No.

Closes #48309 from HeartSaVioR/SPARK-49836.

Lead-authored-by: Jungtaek Lim <[email protected]>
Co-authored-by: Andrzej Zera <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit d8c04cf)
Signed-off-by: Jungtaek Lim <[email protected]>
himadripal pushed a commit to himadripal/spark that referenced this pull request Oct 19, 2024
…ded to window/session_window fn

### What changes were proposed in this pull request?

This PR fixes the correctness issue about losing operators during analysis - it happens when window is provided to window()/session_window() function.

The rule `TimeWindowing` and `SessionWindowing` are responsible to resolve the time window functions. When the window function has `window` as parameter (time column) (in other words, building time window from time window), the rule wraps window with WindowTime function so that the rule ResolveWindowTime will further resolve this. (And TimeWindowing/SessionWindowing will resolve this again against the result of ResolveWindowTime.)

The issue is that the rule uses "return" for the above, which intends to have "early return" as the other branch is too long compared to this branch. This unfortunately does not work as intended - the intention is just to go out of current local scope (mostly end of curly brace), but it seems to break the loop of execution in "outer" side.
(I haven't debugged further but it's simply clear that it doesn't work as intended.)

Quoting from Scala doc:

> Nonlocal returns are implemented by throwing and catching scala.runtime.NonLocalReturnException-s.

It's not super clear where NonLocalReturnException is caught in the call stack; it might exit the execution for much broader scope (context) than expected. And it's finally deprecated in Scala 3.2 and likely be removed in future.

https://dotty.epfl.ch/docs/reference/dropped-features/nonlocal-returns.html

Interestingly it does not break every query for chained time window aggregations. Spark already has several tests with DataFrame API and they haven't failed. The reproducer in community report is using SQL statement - where each aggregation is considered as subquery.

This PR fixes the rule to NOT use early return and instead have a huge if else.

### Why are the changes needed?

Described in above.

### Does this PR introduce _any_ user-facing change?

Yes, this fixes the possible query breakage. The impacted workloads may not be very huge as chained time window aggregations is an advanced usage, and it does not break every query for the usage.

### How was this patch tested?

New UTs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48309 from HeartSaVioR/SPARK-49836.

Lead-authored-by: Jungtaek Lim <[email protected]>
Co-authored-by: Andrzej Zera <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants