[SPARK-50489][SQL][PYTHON][FOLLOW-UP] Add applyInArrow in `DeduplicateRelations#collectConflictPlans`

### What changes were proposed in this pull request?
Add applyInArrow in `DeduplicateRelations#collectConflictPlans`

### Why are the changes needed?
In #49056, I forgot to add `applyInArrow` to `DeduplicateRelations#collectConflictPlans`.
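
For illustration only, here is a minimal sketch of the pattern this rule applies, written without Spark on the classpath. It assumes toy stand-ins: `ToyDedup`, `Attr`, `ToyFlatMapGroupsInArrow`, and `ToyLeaf` are hypothetical names, not Catalyst classes, and tag copying (`copyTagsFrom`) is left out. The idea shown is that a node whose declared output overlaps the conflicting attributes is paired with a copy whose output attributes keep their names but get fresh ids.

```scala
import java.util.concurrent.atomic.AtomicLong

// Toy model only: these types are hypothetical and are not Spark's API.
object ToyDedup {
  private val nextId = new AtomicLong(0)

  // An attribute is identified by its exprId, not by its name.
  final case class Attr(name: String, exprId: Long) {
    // Same name, fresh identity (plays the role of newInstance() in the diff).
    def newInstance(): Attr = copy(exprId = nextId.getAndIncrement())
  }

  object Attr {
    def fresh(name: String): Attr = Attr(name, nextId.getAndIncrement())
  }

  sealed trait Plan { def output: Seq[Attr] }

  // Hypothetical operator that declares its own output attributes,
  // as the Python UDF operators in the diff do.
  final case class ToyFlatMapGroupsInArrow(func: String, output: Seq[Attr]) extends Plan

  // Hypothetical operator that this sketch does not rewrite.
  final case class ToyLeaf(output: Seq[Attr]) extends Plan

  // Returns (old, new) pairs for nodes whose output overlaps the conflicting ids.
  def collectConflictPlans(plan: Plan, conflicting: Set[Long]): Seq[(Plan, Plan)] =
    plan match {
      case oldVersion @ ToyFlatMapGroupsInArrow(_, output)
          if output.exists(a => conflicting.contains(a.exprId)) =>
        val newVersion = oldVersion.copy(output = output.map(_.newInstance()))
        Seq((oldVersion, newVersion))
      case _ =>
        Seq.empty
    }
}
```

With this sketch, calling `ToyDedup.collectConflictPlans(node, node.output.map(_.exprId).toSet)` on a `ToyFlatMapGroupsInArrow` node yields one pair whose second element has the same column names and no shared ids. An operator without a matching case falls through to `Seq.empty`, which is the kind of gap this follow-up closes for `applyInArrow`.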

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
tests added in #49056

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #49069 from zhengruifeng/apply_in_arrow_rule.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
zhengruifeng authored and HyukjinKwon committed Dec 6, 2024
1 parent fc69194 commit a435b2e
Showing 1 changed file with 12 additions and 0 deletions.
@@ -392,12 +392,24 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
        newVersion.copyTagsFrom(oldVersion)
        Seq((oldVersion, newVersion))

      case oldVersion @ FlatMapGroupsInArrow(_, _, output, _)
          if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
        val newVersion = oldVersion.copy(output = output.map(_.newInstance()))
        newVersion.copyTagsFrom(oldVersion)
        Seq((oldVersion, newVersion))

      case oldVersion @ FlatMapCoGroupsInPandas(_, _, _, output, _, _)
          if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
        val newVersion = oldVersion.copy(output = output.map(_.newInstance()))
        newVersion.copyTagsFrom(oldVersion)
        Seq((oldVersion, newVersion))

      case oldVersion @ FlatMapCoGroupsInArrow(_, _, _, output, _, _)
          if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
        val newVersion = oldVersion.copy(output = output.map(_.newInstance()))
        newVersion.copyTagsFrom(oldVersion)
        Seq((oldVersion, newVersion))

      case oldVersion @ MapInPandas(_, output, _, _, _)
          if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
        val newVersion = oldVersion.copy(output = output.map(_.newInstance()))