Skip to content

Commit

Permalink
fix: recursive cte hangs on joins (apache#9687)
Browse files Browse the repository at this point in the history
* fix: recursive cte hangs on joins

* Use ExecutionPlan::with_new_children

* Naming
  • Loading branch information
jonahgao authored Mar 20, 2024
1 parent 14972e6 commit b72d25c
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 9 deletions.
26 changes: 22 additions & 4 deletions datafusion/physical-plan/src/recursive_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,9 @@ impl RecursiveQueryStream {
// Downstream plans should not expect any partitioning.
let partition = 0;

self.recursive_stream = Some(
self.recursive_term
.execute(partition, self.task_context.clone())?,
);
let recursive_plan = reset_plan_states(self.recursive_term.clone())?;
self.recursive_stream =
Some(recursive_plan.execute(partition, self.task_context.clone())?);
self.poll_next(cx)
}
}
Expand Down Expand Up @@ -343,6 +342,25 @@ fn assign_work_table(
.data()
}

/// Forks `plan` into a copy whose operators are back in their initial state, so that it
/// can be executed again.
///
/// Executing some plans mutates state stored inside the plan itself, after which the same
/// instance can no longer be executed correctly. Rebuilding every node bottom-up through
/// `ExecutionPlan::with_new_children` yields a new plan without that leftover state.
///
/// `CrossJoinExec` is one such operator: it loads its left input into memory and keeps it in
/// the plan. When that left input is derived from the work table, the cached data goes stale
/// as soon as the work table changes, so it has to be discarded before the next iteration
/// executes the plan again.
///
/// # Errors
///
/// Propagates any error returned by `with_new_children` while a node is being rebuilt.
fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    plan.transform_up(&|node| {
        // The state of a WorkTableExec has already been updated correctly; keep it untouched.
        if node.as_any().is::<WorkTableExec>() {
            return Ok(Transformed::no(node));
        }

        // Every other node is re-created over its current children to obtain a fresh instance.
        let children = node.children();
        let rebuilt = Arc::clone(&node).with_new_children(children)?;
        Ok(Transformed::yes(rebuilt))
    })
    .data()
}

impl Stream for RecursiveQueryStream {
type Item = Result<RecordBatch>;

Expand Down
73 changes: 68 additions & 5 deletions datafusion/sqllogictest/test_files/cte.slt
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ ProjectionExec: expr=[1 as a, 2 as b, 3 as c]
--PlaceholderRowExec



# enable recursive CTEs
statement ok
set datafusion.execution.enable_recursive_ctes = true;

# trivial recursive CTE works
query I rowsort
WITH RECURSIVE nodes AS (
Expand Down Expand Up @@ -651,3 +646,71 @@ WITH RECURSIVE my_cte AS (
WHERE my_cte.a<5
)
SELECT a FROM my_cte;


# Test issue: https://github.com/apache/arrow-datafusion/issues/9680
query I
WITH RECURSIVE recursive_cte AS (
SELECT 1 as val
UNION ALL
(
WITH sub_cte AS (
SELECT 2 as val
)
SELECT
2 as val
FROM recursive_cte
CROSS JOIN sub_cte
WHERE recursive_cte.val < 2
)
)
SELECT * FROM recursive_cte;
----
1
2

# Test issue: https://github.com/apache/arrow-datafusion/issues/9680
# 'recursive_cte' must be on the left side of the cross join, since that is the case the query above is meant to test.
query TT
explain WITH RECURSIVE recursive_cte AS (
SELECT 1 as val
UNION ALL
(
WITH sub_cte AS (
SELECT 2 as val
)
SELECT
2 as val
FROM recursive_cte
CROSS JOIN sub_cte
WHERE recursive_cte.val < 2
)
)
SELECT * FROM recursive_cte;
----
logical_plan
Projection: recursive_cte.val
--SubqueryAlias: recursive_cte
----RecursiveQuery: is_distinct=false
------Projection: Int64(1) AS val
--------EmptyRelation
------Projection: Int64(2) AS val
--------CrossJoin:
----------Filter: recursive_cte.val < Int64(2)
------------TableScan: recursive_cte
----------SubqueryAlias: sub_cte
------------Projection: Int64(2) AS val
--------------EmptyRelation
physical_plan
RecursiveQueryExec: name=recursive_cte, is_distinct=false
--ProjectionExec: expr=[1 as val]
----PlaceholderRowExec
--ProjectionExec: expr=[2 as val]
----CrossJoinExec
------CoalescePartitionsExec
--------CoalesceBatchesExec: target_batch_size=8182
----------FilterExec: val@0 < 2
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------WorkTableExec: name=recursive_cte
------ProjectionExec: expr=[2 as val]
--------PlaceholderRowExec

0 comments on commit b72d25c

Please sign in to comment.