Skip to content

Commit

Permalink
Make physical_recursive_cte a parallel sink
Browse files Browse the repository at this point in the history
  • Loading branch information
kryonix committed Oct 12, 2023
1 parent 0801192 commit 5be9f78
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/execution/operator/set/physical_recursive_cte.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class RecursiveCTEState : public GlobalSinkState {
unique_ptr<GroupedAggregateHashTable> ht;

bool intermediate_empty = true;
mutex intermediate_table_lock;
ColumnDataCollection intermediate_table;
ColumnDataScanState scan_state;
bool initialized = false;
Expand All @@ -63,6 +64,8 @@ idx_t PhysicalRecursiveCTE::ProbeHT(DataChunk &chunk, RecursiveCTEState &state)

SinkResultType PhysicalRecursiveCTE::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
auto &gstate = input.global_state.Cast<RecursiveCTEState>();

lock_guard<mutex> guard(gstate.intermediate_table_lock);
if (!union_all) {
idx_t match_count = ProbeHT(chunk, gstate);
if (match_count > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ class PhysicalRecursiveCTE : public PhysicalOperator {
return true;
}

bool ParallelSink() const override {
return true;
}

string ParamsToString() const override;

public:
Expand Down

0 comments on commit 5be9f78

Please sign in to comment.