Skip to content

Commit

Permalink
Extract EmptyRelation, Limit, Values in LogicalPlan (#1325)
Browse files Browse the repository at this point in the history
  • Loading branch information
liukun4515 authored Nov 19, 2021
1 parent 3625a1d commit feb9ec5
Show file tree
Hide file tree
Showing 13 changed files with 102 additions and 86 deletions.
13 changes: 7 additions & 6 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ use datafusion::datasource::TableProvider;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingTable;
use datafusion::logical_plan::plan::EmptyRelation;
use datafusion::logical_plan::{
exprlist_to_fields,
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
Column, CreateExternalTable, CrossJoin, Expr, JoinConstraint, JoinType, LogicalPlan,
Repartition, TableScanPlan,
Column, CreateExternalTable, CrossJoin, Expr, JoinConstraint, JoinType, Limit,
LogicalPlan, Repartition, TableScanPlan, Values,
};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::functions::BuiltinScalarFunction;
Expand Down Expand Up @@ -676,7 +677,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
fn try_into(self) -> Result<protobuf::LogicalPlanNode, Self::Error> {
use protobuf::logical_plan_node::LogicalPlanType;
match self {
LogicalPlan::Values { values, .. } => {
LogicalPlan::Values(Values { values, .. }) => {
let n_cols = if values.is_empty() {
0
} else {
Expand Down Expand Up @@ -875,7 +876,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
))),
})
}
LogicalPlan::Limit { input, n } => {
LogicalPlan::Limit(Limit { input, n }) => {
let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
Expand Down Expand Up @@ -936,9 +937,9 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
))),
})
}
LogicalPlan::EmptyRelation {
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row, ..
} => Ok(protobuf::LogicalPlanNode {
}) => Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::EmptyRelation(
protobuf::EmptyRelationNode {
produce_one_row: *produce_one_row,
Expand Down
14 changes: 7 additions & 7 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::datasource::{
};
use crate::error::{DataFusionError, Result};
use crate::logical_plan::plan::{
AnalyzePlan, ExplainPlan, TableScanPlan, ToStringifiedPlan, Union,
AnalyzePlan, EmptyRelation, ExplainPlan, TableScanPlan, ToStringifiedPlan, Union,
};
use crate::prelude::*;
use crate::scalar::ScalarValue;
Expand All @@ -45,7 +45,7 @@ use super::dfschema::ToDFSchema;
use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
use crate::logical_plan::{
columnize_expr, normalize_col, normalize_cols, Column, CrossJoin, DFField, DFSchema,
DFSchemaRef, Partitioning, Repartition,
DFSchemaRef, Limit, Partitioning, Repartition, Values,
};
use crate::sql::utils::group_window_expr_by_sort_keys;

Expand Down Expand Up @@ -109,10 +109,10 @@ impl LogicalPlanBuilder {
///
/// `produce_one_row` set to true means this empty node needs to produce a placeholder row.
pub fn empty(produce_one_row: bool) -> Self {
Self::from(LogicalPlan::EmptyRelation {
Self::from(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row,
schema: DFSchemaRef::new(DFSchema::empty()),
})
}))
}

/// Create a values list based relation, and the schema is inferred from data, consuming
Expand Down Expand Up @@ -186,7 +186,7 @@ impl LogicalPlanBuilder {
values[i][j] = Expr::Literal(ScalarValue::try_from(fields[j].data_type())?);
}
let schema = DFSchemaRef::new(DFSchema::new(fields)?);
Ok(Self::from(LogicalPlan::Values { schema, values }))
Ok(Self::from(LogicalPlan::Values(Values { schema, values })))
}

/// Scan a memory data source
Expand Down Expand Up @@ -459,10 +459,10 @@ impl LogicalPlanBuilder {

/// Apply a limit
pub fn limit(&self, n: usize) -> Result<Self> {
Ok(Self::from(LogicalPlan::Limit {
Ok(Self::from(LogicalPlan::Limit(Limit {
n,
input: Arc::new(self.plan.clone()),
}))
})))
}

/// Apply a sort
Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ pub use expr::{
pub use extension::UserDefinedLogicalNode;
pub use operators::Operator;
pub use plan::{
CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable, JoinConstraint,
JoinType, LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition,
TableScanPlan, Union,
CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable, EmptyRelation,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor,
Repartition, TableScanPlan, Union, Values,
};
pub(crate) use plan::{StringifiedPlan, ToStringifiedPlan};
pub use registry::FunctionRegistry;
82 changes: 48 additions & 34 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,35 @@ pub struct ExtensionPlan {
pub node: Arc<dyn UserDefinedLogicalNode + Send + Sync>,
}

/// Produces no rows: An empty relation with an empty schema
#[derive(Clone)]
pub struct EmptyRelation {
/// Whether to produce a placeholder row
pub produce_one_row: bool,
/// The schema description of the output
pub schema: DFSchemaRef,
}

/// Produces the first `n` tuples from its input and discards the rest.
#[derive(Clone)]
pub struct Limit {
/// The limit
pub n: usize,
/// The logical plan
pub input: Arc<LogicalPlan>,
}

/// Values expression. See
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
#[derive(Clone)]
pub struct Values {
/// The table schema
pub schema: DFSchemaRef,
/// Values
pub values: Vec<Vec<Expr>>,
}

/// A LogicalPlan represents the different types of relational
/// operators (such as Projection, Filter, etc) and can be created by
/// the SQL query planner and the DataFrame API.
Expand Down Expand Up @@ -263,19 +292,9 @@ pub enum LogicalPlan {
/// Produces rows from a table provider by reference or from the context
TableScan(TableScanPlan),
/// Produces no rows: An empty relation with an empty schema
EmptyRelation {
/// Whether to produce a placeholder row
produce_one_row: bool,
/// The schema description of the output
schema: DFSchemaRef,
},
EmptyRelation(EmptyRelation),
/// Produces the first `n` tuples from its input and discards the rest.
Limit {
/// The limit
n: usize,
/// The logical plan
input: Arc<LogicalPlan>,
},
Limit(Limit),
/// Creates an external table.
CreateExternalTable(CreateExternalTable),
/// Creates an in memory table.
Expand All @@ -285,12 +304,7 @@ pub enum LogicalPlan {
/// Values expression. See
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
Values {
/// The table schema
schema: DFSchemaRef,
/// Values
values: Vec<Vec<Expr>>,
},
Values(Values),
/// Produces a relation with string representations of
/// various parts of the plan
Explain(ExplainPlan),
Expand All @@ -305,8 +319,8 @@ impl LogicalPlan {
/// Get a reference to the logical plan's schema
pub fn schema(&self) -> &DFSchemaRef {
match self {
LogicalPlan::EmptyRelation { schema, .. } => schema,
LogicalPlan::Values { schema, .. } => schema,
LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
LogicalPlan::Values(Values { schema, .. }) => schema,
LogicalPlan::TableScan(TableScanPlan {
projected_schema, ..
}) => projected_schema,
Expand All @@ -318,7 +332,7 @@ impl LogicalPlan {
LogicalPlan::Join { schema, .. } => schema,
LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
LogicalPlan::Limit { input, .. } => input.schema(),
LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
schema
}
Expand All @@ -339,7 +353,7 @@ impl LogicalPlan {
LogicalPlan::TableScan(TableScanPlan {
projected_schema, ..
}) => vec![projected_schema],
LogicalPlan::Values { schema, .. } => vec![schema],
LogicalPlan::Values(Values { schema, .. }) => vec![schema],
LogicalPlan::Window { input, schema, .. }
| LogicalPlan::Aggregate { input, schema, .. }
| LogicalPlan::Projection { input, schema, .. } => {
Expand Down Expand Up @@ -369,11 +383,11 @@ impl LogicalPlan {
LogicalPlan::Extension(extension) => vec![extension.node.schema()],
LogicalPlan::Explain(ExplainPlan { schema, .. })
| LogicalPlan::Analyze(AnalyzePlan { schema, .. })
| LogicalPlan::EmptyRelation { schema, .. }
| LogicalPlan::EmptyRelation(EmptyRelation { schema, .. })
| LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
vec![schema]
}
LogicalPlan::Limit { input, .. }
LogicalPlan::Limit(Limit { input, .. })
| LogicalPlan::Repartition(Repartition { input, .. })
| LogicalPlan::Sort { input, .. }
| LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
Expand All @@ -396,7 +410,7 @@ impl LogicalPlan {
pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
match self {
LogicalPlan::Projection { expr, .. } => expr.clone(),
LogicalPlan::Values { values, .. } => {
LogicalPlan::Values(Values { values, .. }) => {
values.iter().flatten().cloned().collect()
}
LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()],
Expand All @@ -421,8 +435,8 @@ impl LogicalPlan {
LogicalPlan::Extension(extension) => extension.node.expressions(),
// plans without expressions
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::EmptyRelation(_)
| LogicalPlan::Limit(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::DropTable(_)
Expand All @@ -447,7 +461,7 @@ impl LogicalPlan {
LogicalPlan::Sort { input, .. } => vec![input],
LogicalPlan::Join { left, right, .. } => vec![left, right],
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right],
LogicalPlan::Limit { input, .. } => vec![input],
LogicalPlan::Limit(Limit { input, .. }) => vec![input],
LogicalPlan::Extension(extension) => extension.node.inputs(),
LogicalPlan::Union(Union { inputs, .. }) => inputs.iter().collect(),
LogicalPlan::Explain(explain) => vec![&explain.plan],
Expand Down Expand Up @@ -594,7 +608,7 @@ impl LogicalPlan {
}
true
}
LogicalPlan::Limit { input, .. } => input.accept(visitor)?,
LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?,
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
input.accept(visitor)?
}
Expand All @@ -610,8 +624,8 @@ impl LogicalPlan {
LogicalPlan::Analyze(analyze) => analyze.input.accept(visitor)?,
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::EmptyRelation(_)
| LogicalPlan::Values(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::DropTable(_) => true,
};
Expand Down Expand Up @@ -799,8 +813,8 @@ impl LogicalPlan {
impl<'a> fmt::Display for Wrapper<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match &*self.0 {
LogicalPlan::EmptyRelation { .. } => write!(f, "EmptyRelation"),
LogicalPlan::Values { ref values, .. } => {
LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"),
LogicalPlan::Values(Values { ref values, .. }) => {
let str_values: Vec<_> = values
.iter()
// limit to only 5 values to avoid horrible display
Expand Down Expand Up @@ -924,7 +938,7 @@ impl LogicalPlan {
)
}
},
LogicalPlan::Limit { ref n, .. } => write!(f, "Limit: {}", n),
LogicalPlan::Limit(Limit { ref n, .. }) => write!(f, "Limit: {}", n),
LogicalPlan::CreateExternalTable(CreateExternalTable {
ref name,
..
Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/optimizer/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
| LogicalPlan::Repartition(_)
| LogicalPlan::Union(_)
| LogicalPlan::TableScan { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::Values(_)
| LogicalPlan::EmptyRelation(_)
| LogicalPlan::Limit(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::Explain { .. }
| LogicalPlan::Analyze { .. }
Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/optimizer/constant_folding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ impl OptimizerRule for ConstantFolding {
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::Values { .. }
| LogicalPlan::Values(_)
| LogicalPlan::Extension { .. }
| LogicalPlan::Sort { .. }
| LogicalPlan::Explain { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::Limit(_)
| LogicalPlan::Union(_)
| LogicalPlan::Join { .. }
| LogicalPlan::CrossJoin(_) => {
Expand Down Expand Up @@ -107,7 +107,7 @@ impl OptimizerRule for ConstantFolding {

utils::from_plan(plan, &expr, &new_inputs)
}
LogicalPlan::TableScan { .. } | LogicalPlan::EmptyRelation { .. } => {
LogicalPlan::TableScan { .. } | LogicalPlan::EmptyRelation(_) => {
Ok(plan.clone())
}
}
Expand Down
8 changes: 4 additions & 4 deletions datafusion/src/optimizer/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Optimizer rule to replace `LIMIT 0` on a plan with an empty relation.
//! This saves time in planning and executing the query.
use crate::error::Result;
use crate::logical_plan::LogicalPlan;
use crate::logical_plan::{EmptyRelation, Limit, LogicalPlan};
use crate::optimizer::optimizer::OptimizerRule;

use super::utils;
Expand All @@ -41,11 +41,11 @@ impl OptimizerRule for EliminateLimit {
execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Limit { n, input } if *n == 0 => {
Ok(LogicalPlan::EmptyRelation {
LogicalPlan::Limit(Limit { n, input }) if *n == 0 => {
Ok(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: input.schema().clone(),
})
}))
}
// Rest: recurse and find possible LIMIT 0 nodes
_ => {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/optimizer/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use crate::datasource::datasource::TableProviderFilterPushDown;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::{
and, replace_col, Column, CrossJoin, LogicalPlan, TableScanPlan,
and, replace_col, Column, CrossJoin, Limit, LogicalPlan, TableScanPlan,
};
use crate::logical_plan::{DFSchema, Expr};
use crate::optimizer::optimizer::OptimizerRule;
Expand Down Expand Up @@ -380,7 +380,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
// union all is filter-commutable
push_down(&state, plan)
}
LogicalPlan::Limit { input, .. } => {
LogicalPlan::Limit(Limit { input, .. }) => {
// limit is _not_ filter-commutable => collect all columns from its input
let used_columns = input
.schema()
Expand Down
Loading

0 comments on commit feb9ec5

Please sign in to comment.