Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for recursive CTEs #462

Closed
Dandandan opened this issue May 31, 2021 · 18 comments
Closed

Add support for recursive CTEs #462

Dandandan opened this issue May 31, 2021 · 18 comments
Labels
enhancement New feature or request help wanted Extra attention is needed

Comments

@Dandandan
Copy link
Contributor

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Recursive CTEs are interesting to support more complex algorithms like graph processing

Describe the solution you'd like
Implement recursive CTEs. Some research needs to be done to do this in an efficient way in DataFusion

Describe alternatives you've considered

Additional context

@Dandandan Dandandan added the enhancement New feature or request label May 31, 2021
@jimexist
Copy link
Member

jimexist commented Jun 1, 2021

this is interesting to handle. i imaging it's easier than window functions. but i guess i shall finish that first.

@houqp houqp added the help wanted Extra attention is needed label Oct 18, 2021
@isidentical
Copy link
Contributor

isidentical commented Oct 2, 2022

@Dandandan I've been experimenting with whether support for recursive CTEs could be done without too much overhead (like holding onto all the results from all the iterations until the recursion is over) and without using any real 'tables' (like the "working table" concept in postgres, a table where the previous iteration's results are materialized). It seems like there might be a solution, but before going in too much deeper and spending a lot of time on the details I wanted to see if it makes sense in general (or whether there are any obvious obstacles that I might have missed) and whether I should submit it as a PR to Datafusion? (it is probably too large to be sent as a single PR, so I also might start a task list and send it as smaller / localized parts)

Implementation (very wordy)

The branch (the source code also has a decent-ish amount of documentation left-over from my experiments, in case anyone is interested).

WITH RECURSIVE nodes(node_1) AS (
    SELECT 1 as node_1
  UNION ALL
    SELECT node_1+1 as node_1 FROM nodes WHERE node_1 < 100
)
SELECT node_1 FROM nodes;

I've tried to localize the number of changes needed to compile the recursive queries in the SQL planner, and it seems like we could get away with no changes outside of the CTE compilation part (and a new logical op). The main workflow which we use when compiling a query like the one above is the following:

  • We split the CTE into two terms (the part before the UNION is called a static term and the one after is called a recursive term).
  • The static term is compiled to a logical plan as is.
  • For the recursive term, compiling it without any special operations is impossible since it includes a back-reference to the table it is originating from (FROM nodes). For getting around this:
    • We create a new 'named relation' (a relation object, like EmptyRelation, that has a handle and a schema). The schema comes from the static term's plan.
    • The 'named relation' is registered as a CTE with the name nodes. So when the SQL planner is resolving that relation, it will take the 'named relation' plan we created and use it.
    • We compile the recursive term and de-register the CTE (since the real nodes, is actually a different plan).
  • We combine both the static term and the recursive term in a new logical operation called RecursiveQuery (also with the handle name that we have used to create the named relation) and register it as a CTE.

Here is the plan for the query above:

Projection: #nodes.node_1                                       
  RecursiveQuery: name=nodes, is_distinct=false                              <--------
    Projection: Int64(1) AS node_1, alias=nodes                 
      EmptyRelation                                             
    Projection: #nodes.node_1 + Int64(1) AS node_1, alias=nodes 
      Filter: #nodes.node_1 < Int64(100)                        
        NamedRelation: nodes                                                 <--------

In terms of the physical plan, for each RecursiveQuery we create a RecursiveQueryExec (very similar in terms of fields). And for each NamedRelation, we create a ContinuationStream (more details below). Once these are translated, the final representation looks something like this:

ProjectionExec: expr=[node_1@0 as node_1]                       
  RecursiveQueryExec: name=nodes, is_distinct=false                         
    CoalescePartitionsExec                                      
      ProjectionExec: expr=[1 as node_1]                        
        RepartitionExec: partitioning=RoundRobinBatch(16)       
          EmptyExec: produce_one_row=true                       
    CoalescePartitionsExec                                      
      ProjectionExec: expr=[node_1@0 + 1 as node_1]             
        CoalesceBatchesExec: target_batch_size=4096             
          FilterExec: node_1@0 < 100                            
            RepartitionExec: partitioning=RoundRobinBatch(16)   
              ContinuanceExec: name=nodes  

image

The namings on the sketch might be a bit cryptic, so here is a brief explanation:

  • T'0 corresponds to the static term, where it will be executed without any access to the previous data.
  • For each batch T'0 produces, we'll directly mirror it to the parent operation (e.g. a filter, like F), so it will not suffer from buffering.
  • For the batches we are mirroring, we are also going to save a copy of them to an in-memory store (L).
  • Once T'0 ends, L will decide on whether to have another iteration based on the rows it collected (the previous term must produce at least 1 new [sometimes unique depending on the case] item). If it does, then it will setup an MPSC channel and start streaming the buffered batches. Once that is done it will execute the recursive term as T'1.
  • T'1 will start by reading the data from the newly created MPSC channel (this is what continuation streams does), repeat these steps, and if it can successfully produce new rows, then a new stream T'2 will be spawned (and all the buffered data from T'1 will be transferred to it).
  • And once T'2 fails to produce any more rows (e.g. a WHERE n <= 100 clause with all ns are >100), it will finish operating.

The parent operation receives its input no matter what happens internally (in terms of buffering). The buffering currently is only for the previous iteration's records, but we might also want to keep a hashset of the all the rows we have seen once we add support for UNION syntax with recursive (it is pretty useful and prevents most of the infinite recursion cases).

@isidentical
Copy link
Contributor

I was thinking about how we can split, and an initial plan might look like this if there are no objections on separating ContinuanceStream as a single patch (if it sounds better, also can combine first two steps).

Possible roadmap?

  • Add continuance streams (a "working table" operation for DataFusion that actually uses streams under the hood).

The implementation is self-contained enough that I think it could be split (with tests), and it would include the push_relation_handler/pop_relation_handler piece in task contexts, as well as the implementation of the physical operation. The only question would be whether it is fine to add a new physical operation that doesn't have immediate usage?

  • Implement recursive queries (as a both physical and a logical operation).

This would be a sizable change that can actually implement the initial piece of logic (without distinct) where we could execute queries up until a certain condition has been met. It would also include new logical operations (RecursiveQuery and NamedRelation) and also the actual usage of the continuance streams.

  • Enable SQL planning

The implementation in terms of SQL is completely decoupled from the actual logical/physical representation, and I think it can be added last, the algorithm is basically using a temporary CTE and then replacing it with the original form, more details in the main PR.

  • Start supporting UNION

This would require us to actually record what sort of values we have actually collected (probably not direct references, but hashes) and it would be a bit less efficient than the UNION ALL solution.

@Dandandan
Copy link
Contributor Author

Thank you for the write up @isidentical!
I hope to have some time soon to give some feedback on the approach

@matthewgapp
Copy link
Contributor

matthewgapp commented Sep 16, 2023

Huge plus one for support here @isidentical and @Dandandan :) I need this for a use case. Would help bring it up to feature parity with DuckDB.

@matthewgapp
Copy link
Contributor

I thought I'd take a stab at refreshing your PR @isidentical. Here's the refreshed PR. Quickly threw together and it compiles but it fails at runtime when using your recursive example. I haven't taken a step back to figure out why (yet). Thought I'd share in case you know next steps.

thread 'main' panicked at 'partition not used yet', /Users/matthewgapp/.cargo/git/checkouts/arrow-datafusion-152bda9bc50d03e7/9479b2b/datafusion/physical-plan/src/repartition/mod.rs:562:14

@isidentical
Copy link
Contributor

@matthewgapp thanks for trying to revive it (and if you are interested, you have my full blessing on taking over it), but unfortunately I might not be able to give a lot of help on it (am unavailable due to projects that require my attention). I am hoping that the written material in this issue and the comments on the PR might guide you if you choose to take over that implementation as is (although it was my first attempt, and I haven't poured too much time on it).

@matthewgapp
Copy link
Contributor

matthewgapp commented Sep 17, 2023

@isidentical thanks for the response and encouragement! I'll try to take a stab at getting it working. Seems like RepartitionExec#execute doesn't expect to be called more than once with the same partition number (RecursiveQueryExec expects to call its recursive term over and over). At the moment, I'm not sure of good/robust workaround (first time diving into DF's implementation). And not sure if that invariant is assumed across other ExecutionPlans

Here are the logical and physical plans for reference

+---------------+-------------------------------------------------------------------------------+
| plan_type     | plan                                                                          |
+---------------+-------------------------------------------------------------------------------+
| logical_plan  | RecursiveQuery: is_distinct=false                                             |
|               |   Projection: Int64(1) AS node_1                                              |
|               |     EmptyRelation                                                             |
|               |   Projection: node_1 + Int64(1) AS node_1                                     |
|               |     Filter: node_1 < Int64(100)                                               |
|               |       NamedRelation: nodes                                                    |
| physical_plan | RecursiveQueryExec: is_distinct=false                                         |
|               |   ProjectionExec: expr=[1 as node_1]                                          |
|               |     EmptyExec: produce_one_row=true                                           |
|               |   ProjectionExec: expr=[node_1@0 + 1 as node_1]                               |
|               |     CoalesceBatchesExec: target_batch_size=8192                               |
|               |       FilterExec: node_1@0 < 100                                              |
|               |         RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1 |
|               |           ContinuanceExec: name=nodes                                         |
|               |                                                                               |
+---------------+-------------------------------------------------------------------------------+

@matthewgapp
Copy link
Contributor

matthewgapp commented Sep 17, 2023

Ok, I got it working by eliminating RepartitionExec from the physical plan (removed the physical optimizer rules) 💯. Perhaps, because RecursiveQuery always executes the 0th partition of its recursive term, we should disallow physical plan optimizations that introduce repartitions in descendents of RecursiveQuery (upstream in terms of dataflow).

As an aside, this approach seems to be about 4x faster than DuckDB (using the trivial example with 1 million iterations). I have yet to test it thoroughly.

@matthewgapp
Copy link
Contributor

matthewgapp commented Sep 19, 2023

I've continued to clean up the rough edges and test more use cases. I have a couple priorities before requesting additional feedback

  • stop optimizer from inserting RepartitionExec as a decedent of the RecursiveQueryExec
  • implement more robust validation in the Query to LogicalPlan transform step. We should try to catch the obvious cases of infinite recursion, etc.
  • perform more testing and perhaps write some tests that capture this behavior. So far I've manually tested the trivial case, selects from another table in the static term, joins in the static term and joins in the recursive term.
  • outline obvious performance improvements (like [optionally] eliminating metric in the recursive tree since these need to be rebuilt every iteration because recursive queries behave very much unlike regular queries). Other "regular query" optimizations might need to be eliminated (for example, CoalesceBatches seems to slow things down)

@alamb
Copy link
Contributor

alamb commented Sep 20, 2023

stop optimizer from inserting RepartitionExec as a decedent of the RecursiveQueryExec

I think you can control this by setting some combination of https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.required_input_distribution and https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.benefits_from_input_partitioning

perform more testing and perhaps write some tests that capture this behavior. So far I've manually tested the trivial case, selects from another table in the static term, joins in the static term and joins in the recursive term.

Definitely -- at the very least I think it should have some sqllogictest end to end sql test coverage (see https://github.com/apache/arrow-datafusion/tree/main/datafusion/sqllogictest for more details)

Excited to see this work progress. Thank you @matthewgapp

@matthewgapp
Copy link
Contributor

matthewgapp commented Sep 21, 2023

@alamb thanks for the pointer on those "characteristic metadata" methods. I ran across them late last night and it seemed to solve the blocking issue. But the optimizer still slides in RepartitionsExec and CoalesceExec into the physical plan which slows things down by an order of magnitude (~30x). To that end, I've drafted a follow-on PR that omits them (only) when they'd be descendants of a recursive query.

Will work on getting this thing tested (might not be until this weekend). Lmk if any other comments/thoughts in meantime.

@matthewgapp
Copy link
Contributor

matthewgapp commented Jan 12, 2024

Per the conversation in #7581, I'll break down #7581 into, roughly, the following PRs

@alamb
Copy link
Contributor

alamb commented Mar 11, 2024

I filed #9554 to track enabling recursive CTEs by default. Let's track progress there

@l1t1
Copy link

l1t1 commented Apr 25, 2024

which version began to support it?
I tried the latest python version 36.0, it failed

ctx.sql('with recursive t as (select 1 union all select 2 from t)select * from t')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
Exception: DataFusion error: NotImplemented("Recursive CTEs are not enabled")

@alamb
Copy link
Contributor

alamb commented Apr 25, 2024

I believe the initial implementation is in 37 -- https://github.com/apache/datafusion/blob/main/dev/changelog/37.0.0.md

@l1t1
Copy link

l1t1 commented Apr 26, 2024

the speed seems not fast. I used the binary file

C:\Users\LD>d:datafusion-cli
DataFusion CLI v37.0.0
> with recursive t as (select 1 a union all select a+1 from t where a <10000)select count(*) from t;
+----------+
| COUNT(*) |
+----------+
| 10000    |
+----------+
1 row(s) fetched.
Elapsed 5.281 seconds.

C:\Users\LD>d:sqlite\sqlite340
SQLite version 3.40.0 2022-11-16 12:10:08

sqlite> with recursive t as (select 1 a union all select a+1 from t where a <10000000)select count(*) from t;
10000000
Run Time: real 4.903 user 4.898431 sys 0.000000

@alamb
Copy link
Contributor

alamb commented Apr 26, 2024

I am sure there are ways the performance could be improved -- the overhead of running each iteration is non trivial (it starts a new plan). It might be worth filing a new ticket to track potential performance improvements if you are interested in helping out there

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request help wanted Extra attention is needed
Projects
None yet
Development

Successfully merging a pull request may close this issue.

7 participants