Skip to content

Commit

Permalink
support inter leave node
Browse files Browse the repository at this point in the history
  • Loading branch information
liukun4515 committed Dec 7, 2023
1 parent 33fc110 commit 5b95fa8
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 26 deletions.
5 changes: 5 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1161,6 +1161,7 @@ message PhysicalPlanNode {
AnalyzeExecNode analyze = 23;
JsonSinkExecNode json_sink = 24;
SymmetricHashJoinExecNode symmetric_hash_join = 25;
InterleaveExecNode interleave = 26;
}
}

Expand Down Expand Up @@ -1454,6 +1455,10 @@ message SymmetricHashJoinExecNode {
JoinFilter filter = 8;
}

message InterleaveExecNode {
repeated PhysicalPlanNode inputs = 1;
}

message UnionExecNode {
repeated PhysicalPlanNode inputs = 1;
}
Expand Down
104 changes: 104 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 35 additions & 9 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::union::UnionExec;
use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use datafusion::physical_plan::{
udaf, AggregateExpr, ExecutionPlan, InputOrderMode, Partitioning, PhysicalExpr,
Expand Down Expand Up @@ -545,7 +545,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
f.expression.as_ref().ok_or_else(|| {
proto_error("Unexpected empty filter expression")
})?,
registry, &schema
registry, &schema,
)?;
let column_indices = f.column_indices
.iter()
Expand All @@ -556,7 +556,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
i.side))
)?;

Ok(ColumnIndex{
Ok(ColumnIndex {
index: i.index as usize,
side: side.into(),
})
Expand Down Expand Up @@ -634,7 +634,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
f.expression.as_ref().ok_or_else(|| {
proto_error("Unexpected empty filter expression")
})?,
registry, &schema
registry, &schema,
)?;
let column_indices = f.column_indices
.iter()
Expand All @@ -645,7 +645,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
i.side))
)?;

Ok(ColumnIndex{
Ok(ColumnIndex {
index: i.index as usize,
side: side.into(),
})
Expand Down Expand Up @@ -693,6 +693,17 @@ impl AsExecutionPlan for PhysicalPlanNode {
}
Ok(Arc::new(UnionExec::new(inputs)))
}
PhysicalPlanType::Interleave(interleave) => {
let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
for input in &interleave.inputs {
inputs.push(input.try_into_physical_plan(
registry,
runtime,
extension_codec,
)?);
}
Ok(Arc::new(InterleaveExec::try_new(inputs)?))
}
PhysicalPlanType::CrossJoin(crossjoin) => {
let left: Arc<dyn ExecutionPlan> = into_physical_plan(
&crossjoin.left,
Expand Down Expand Up @@ -735,7 +746,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
})?
.as_ref();
Ok(PhysicalSortExpr {
expr: parse_physical_expr(expr,registry, input.schema().as_ref())?,
expr: parse_physical_expr(expr, registry, input.schema().as_ref())?,
options: SortOptions {
descending: !sort_expr.asc,
nulls_first: sort_expr.nulls_first,
Expand Down Expand Up @@ -782,7 +793,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
})?
.as_ref();
Ok(PhysicalSortExpr {
expr: parse_physical_expr(expr,registry, input.schema().as_ref())?,
expr: parse_physical_expr(expr, registry, input.schema().as_ref())?,
options: SortOptions {
descending: !sort_expr.asc,
nulls_first: sort_expr.nulls_first,
Expand Down Expand Up @@ -845,7 +856,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
f.expression.as_ref().ok_or_else(|| {
proto_error("Unexpected empty filter expression")
})?,
registry, &schema
registry, &schema,
)?;
let column_indices = f.column_indices
.iter()
Expand All @@ -856,7 +867,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
i.side))
)?;

Ok(ColumnIndex{
Ok(ColumnIndex {
index: i.index as usize,
side: side.into(),
})
Expand Down Expand Up @@ -1463,6 +1474,21 @@ impl AsExecutionPlan for PhysicalPlanNode {
});
}

if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
let mut inputs: Vec<PhysicalPlanNode> = vec![];
for input in interleave.inputs() {
inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
input.to_owned(),
extension_codec,
)?);
}
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Interleave(
protobuf::InterleaveExecNode { inputs },
)),
});
}

if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
Expand Down
Loading

0 comments on commit 5b95fa8

Please sign in to comment.