diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index a870ecdde9fa..9c138101503c 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -30,11 +30,12 @@ use datafusion::datasource::TableProvider; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingTable; +use datafusion::logical_plan::plan::EmptyRelation; use datafusion::logical_plan::{ exprlist_to_fields, window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits}, - Column, CreateExternalTable, CrossJoin, Expr, JoinConstraint, JoinType, LogicalPlan, - Repartition, TableScanPlan, + Column, CreateExternalTable, CrossJoin, Expr, JoinConstraint, JoinType, Limit, + LogicalPlan, Repartition, TableScanPlan, Values, }; use datafusion::physical_plan::aggregates::AggregateFunction; use datafusion::physical_plan::functions::BuiltinScalarFunction; @@ -676,7 +677,7 @@ impl TryInto for &LogicalPlan { fn try_into(self) -> Result { use protobuf::logical_plan_node::LogicalPlanType; match self { - LogicalPlan::Values { values, .. } => { + LogicalPlan::Values(Values { values, .. }) => { let n_cols = if values.is_empty() { 0 } else { @@ -875,7 +876,7 @@ impl TryInto for &LogicalPlan { ))), }) } - LogicalPlan::Limit { input, n } => { + LogicalPlan::Limit(Limit { input, n }) => { let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Limit(Box::new( @@ -936,9 +937,9 @@ impl TryInto for &LogicalPlan { ))), }) } - LogicalPlan::EmptyRelation { + LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row, .. - } => Ok(protobuf::LogicalPlanNode { + }) => Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::EmptyRelation( protobuf::EmptyRelationNode { produce_one_row: *produce_one_row, diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 8c3057926ce9..94e53d63bff8 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -26,7 +26,7 @@ use crate::datasource::{ }; use crate::error::{DataFusionError, Result}; use crate::logical_plan::plan::{ - AnalyzePlan, ExplainPlan, TableScanPlan, ToStringifiedPlan, Union, + AnalyzePlan, EmptyRelation, ExplainPlan, TableScanPlan, ToStringifiedPlan, Union, }; use crate::prelude::*; use crate::scalar::ScalarValue; @@ -45,7 +45,7 @@ use super::dfschema::ToDFSchema; use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType}; use crate::logical_plan::{ columnize_expr, normalize_col, normalize_cols, Column, CrossJoin, DFField, DFSchema, - DFSchemaRef, Partitioning, Repartition, + DFSchemaRef, Limit, Partitioning, Repartition, Values, }; use crate::sql::utils::group_window_expr_by_sort_keys; @@ -109,10 +109,10 @@ impl LogicalPlanBuilder { /// /// `produce_one_row` set to true means this empty node needs to produce a placeholder row. pub fn empty(produce_one_row: bool) -> Self { - Self::from(LogicalPlan::EmptyRelation { + Self::from(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row, schema: DFSchemaRef::new(DFSchema::empty()), - }) + })) } /// Create a values list based relation, and the schema is inferred from data, consuming @@ -186,7 +186,7 @@ impl LogicalPlanBuilder { values[i][j] = Expr::Literal(ScalarValue::try_from(fields[j].data_type())?); } let schema = DFSchemaRef::new(DFSchema::new(fields)?); - Ok(Self::from(LogicalPlan::Values { schema, values })) + Ok(Self::from(LogicalPlan::Values(Values { schema, values }))) } /// Scan a memory data source @@ -459,10 +459,10 @@ impl LogicalPlanBuilder { /// Apply a limit pub fn limit(&self, n: usize) -> Result { - Ok(Self::from(LogicalPlan::Limit { + Ok(Self::from(LogicalPlan::Limit(Limit { n, input: Arc::new(self.plan.clone()), - })) + }))) } /// Apply a sort diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs index bb8954283e8e..73fdcb9b9ee0 100644 --- a/datafusion/src/logical_plan/mod.rs +++ b/datafusion/src/logical_plan/mod.rs @@ -51,9 +51,9 @@ pub use expr::{ pub use extension::UserDefinedLogicalNode; pub use operators::Operator; pub use plan::{ - CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable, JoinConstraint, - JoinType, LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition, - TableScanPlan, Union, + CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable, EmptyRelation, + JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, + Repartition, TableScanPlan, Union, Values, }; pub(crate) use plan::{StringifiedPlan, ToStringifiedPlan}; pub use registry::FunctionRegistry; diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index c12edd98fc01..31de255b3d73 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -173,6 +173,35 @@ pub struct ExtensionPlan { pub node: Arc, } +/// Produces no rows: An empty relation with an empty schema +#[derive(Clone)] +pub struct EmptyRelation { + /// Whether to produce a placeholder row + pub produce_one_row: bool, + /// The schema description of the output + pub schema: DFSchemaRef, +} + +/// Produces the first `n` tuples from its input and discards the rest. +#[derive(Clone)] +pub struct Limit { + /// The limit + pub n: usize, + /// The logical plan + pub input: Arc, +} + +/// Values expression. See +/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html) +/// documentation for more details. +#[derive(Clone)] +pub struct Values { + /// The table schema + pub schema: DFSchemaRef, + /// Values + pub values: Vec>, +} + /// A LogicalPlan represents the different types of relational /// operators (such as Projection, Filter, etc) and can be created by /// the SQL query planner and the DataFrame API. @@ -263,19 +292,9 @@ pub enum LogicalPlan { /// Produces rows from a table provider by reference or from the context TableScan(TableScanPlan), /// Produces no rows: An empty relation with an empty schema - EmptyRelation { - /// Whether to produce a placeholder row - produce_one_row: bool, - /// The schema description of the output - schema: DFSchemaRef, - }, + EmptyRelation(EmptyRelation), /// Produces the first `n` tuples from its input and discards the rest. - Limit { - /// The limit - n: usize, - /// The logical plan - input: Arc, - }, + Limit(Limit), /// Creates an external table. CreateExternalTable(CreateExternalTable), /// Creates an in memory table. @@ -285,12 +304,7 @@ pub enum LogicalPlan { /// Values expression. See /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html) /// documentation for more details. - Values { - /// The table schema - schema: DFSchemaRef, - /// Values - values: Vec>, - }, + Values(Values), /// Produces a relation with string representations of /// various parts of the plan Explain(ExplainPlan), @@ -305,8 +319,8 @@ impl LogicalPlan { /// Get a reference to the logical plan's schema pub fn schema(&self) -> &DFSchemaRef { match self { - LogicalPlan::EmptyRelation { schema, .. } => schema, - LogicalPlan::Values { schema, .. } => schema, + LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema, + LogicalPlan::Values(Values { schema, .. }) => schema, LogicalPlan::TableScan(TableScanPlan { projected_schema, .. }) => projected_schema, @@ -318,7 +332,7 @@ impl LogicalPlan { LogicalPlan::Join { schema, .. } => schema, LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema, LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(), - LogicalPlan::Limit { input, .. } => input.schema(), + LogicalPlan::Limit(Limit { input, .. }) => input.schema(), LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => { schema } @@ -339,7 +353,7 @@ impl LogicalPlan { LogicalPlan::TableScan(TableScanPlan { projected_schema, .. }) => vec![projected_schema], - LogicalPlan::Values { schema, .. } => vec![schema], + LogicalPlan::Values(Values { schema, .. }) => vec![schema], LogicalPlan::Window { input, schema, .. } | LogicalPlan::Aggregate { input, schema, .. } | LogicalPlan::Projection { input, schema, .. } => { @@ -369,11 +383,11 @@ impl LogicalPlan { LogicalPlan::Extension(extension) => vec![extension.node.schema()], LogicalPlan::Explain(ExplainPlan { schema, .. }) | LogicalPlan::Analyze(AnalyzePlan { schema, .. }) - | LogicalPlan::EmptyRelation { schema, .. } + | LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) | LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => { vec![schema] } - LogicalPlan::Limit { input, .. } + LogicalPlan::Limit(Limit { input, .. }) | LogicalPlan::Repartition(Repartition { input, .. }) | LogicalPlan::Sort { input, .. } | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) @@ -396,7 +410,7 @@ impl LogicalPlan { pub fn expressions(self: &LogicalPlan) -> Vec { match self { LogicalPlan::Projection { expr, .. } => expr.clone(), - LogicalPlan::Values { values, .. } => { + LogicalPlan::Values(Values { values, .. }) => { values.iter().flatten().cloned().collect() } LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()], @@ -421,8 +435,8 @@ impl LogicalPlan { LogicalPlan::Extension(extension) => extension.node.expressions(), // plans without expressions LogicalPlan::TableScan { .. } - | LogicalPlan::EmptyRelation { .. } - | LogicalPlan::Limit { .. } + | LogicalPlan::EmptyRelation(_) + | LogicalPlan::Limit(_) | LogicalPlan::CreateExternalTable(_) | LogicalPlan::CreateMemoryTable(_) | LogicalPlan::DropTable(_) @@ -447,7 +461,7 @@ impl LogicalPlan { LogicalPlan::Sort { input, .. } => vec![input], LogicalPlan::Join { left, right, .. } => vec![left, right], LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right], - LogicalPlan::Limit { input, .. } => vec![input], + LogicalPlan::Limit(Limit { input, .. }) => vec![input], LogicalPlan::Extension(extension) => extension.node.inputs(), LogicalPlan::Union(Union { inputs, .. }) => inputs.iter().collect(), LogicalPlan::Explain(explain) => vec![&explain.plan], @@ -594,7 +608,7 @@ impl LogicalPlan { } true } - LogicalPlan::Limit { input, .. } => input.accept(visitor)?, + LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?, LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => { input.accept(visitor)? } @@ -610,8 +624,8 @@ impl LogicalPlan { LogicalPlan::Analyze(analyze) => analyze.input.accept(visitor)?, // plans without inputs LogicalPlan::TableScan { .. } - | LogicalPlan::EmptyRelation { .. } - | LogicalPlan::Values { .. } + | LogicalPlan::EmptyRelation(_) + | LogicalPlan::Values(_) | LogicalPlan::CreateExternalTable(_) | LogicalPlan::DropTable(_) => true, }; @@ -799,8 +813,8 @@ impl LogicalPlan { impl<'a> fmt::Display for Wrapper<'a> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match &*self.0 { - LogicalPlan::EmptyRelation { .. } => write!(f, "EmptyRelation"), - LogicalPlan::Values { ref values, .. } => { + LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"), + LogicalPlan::Values(Values { ref values, .. }) => { let str_values: Vec<_> = values .iter() // limit to only 5 values to avoid horrible display @@ -924,7 +938,7 @@ impl LogicalPlan { ) } }, - LogicalPlan::Limit { ref n, .. } => write!(f, "Limit: {}", n), + LogicalPlan::Limit(Limit { ref n, .. }) => write!(f, "Limit: {}", n), LogicalPlan::CreateExternalTable(CreateExternalTable { ref name, .. diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs index 1ab87e7dd1b6..c8f14040e886 100644 --- a/datafusion/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs @@ -199,9 +199,9 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { @@ -107,7 +107,7 @@ impl OptimizerRule for ConstantFolding { utils::from_plan(plan, &expr, &new_inputs) } - LogicalPlan::TableScan { .. } | LogicalPlan::EmptyRelation { .. } => { + LogicalPlan::TableScan { .. } | LogicalPlan::EmptyRelation(_) => { Ok(plan.clone()) } } diff --git a/datafusion/src/optimizer/eliminate_limit.rs b/datafusion/src/optimizer/eliminate_limit.rs index bf3f2b3be283..1f74ae2ef50f 100644 --- a/datafusion/src/optimizer/eliminate_limit.rs +++ b/datafusion/src/optimizer/eliminate_limit.rs @@ -18,7 +18,7 @@ //! Optimizer rule to replace `LIMIT 0` on a plan with an empty relation. //! This saves time in planning and executing the query. use crate::error::Result; -use crate::logical_plan::LogicalPlan; +use crate::logical_plan::{EmptyRelation, Limit, LogicalPlan}; use crate::optimizer::optimizer::OptimizerRule; use super::utils; @@ -41,11 +41,11 @@ impl OptimizerRule for EliminateLimit { execution_props: &ExecutionProps, ) -> Result { match plan { - LogicalPlan::Limit { n, input } if *n == 0 => { - Ok(LogicalPlan::EmptyRelation { + LogicalPlan::Limit(Limit { n, input }) if *n == 0 => { + Ok(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema: input.schema().clone(), - }) + })) } // Rest: recurse and find possible LIMIT 0 nodes _ => { diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs index cc8f01193048..7e5c2c1f5712 100644 --- a/datafusion/src/optimizer/filter_push_down.rs +++ b/datafusion/src/optimizer/filter_push_down.rs @@ -17,7 +17,7 @@ use crate::datasource::datasource::TableProviderFilterPushDown; use crate::execution::context::ExecutionProps; use crate::logical_plan::{ - and, replace_col, Column, CrossJoin, LogicalPlan, TableScanPlan, + and, replace_col, Column, CrossJoin, Limit, LogicalPlan, TableScanPlan, }; use crate::logical_plan::{DFSchema, Expr}; use crate::optimizer::optimizer::OptimizerRule; @@ -380,7 +380,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { // union all is filter-commutable push_down(&state, plan) } - LogicalPlan::Limit { input, .. } => { + LogicalPlan::Limit(Limit { input, .. }) => { // limit is _not_ filter-commutable => collect all columns from its input let used_columns = input .schema() diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs index ce8b5bcf3418..320e1c769571 100644 --- a/datafusion/src/optimizer/limit_push_down.rs +++ b/datafusion/src/optimizer/limit_push_down.rs @@ -20,7 +20,7 @@ use super::utils; use crate::error::Result; use crate::execution::context::ExecutionProps; -use crate::logical_plan::TableScanPlan; +use crate::logical_plan::{Limit, TableScanPlan}; use crate::logical_plan::{LogicalPlan, Union}; use crate::optimizer::optimizer::OptimizerRule; use std::sync::Arc; @@ -43,9 +43,9 @@ fn limit_push_down( execution_props: &ExecutionProps, ) -> Result { match (plan, upper_limit) { - (LogicalPlan::Limit { n, input }, upper_limit) => { + (LogicalPlan::Limit(Limit { n, input }), upper_limit) => { let smallest = upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n); - Ok(LogicalPlan::Limit { + Ok(LogicalPlan::Limit(Limit { n: smallest, // push down limit to plan (minimum of upper limit and current limit) input: Arc::new(limit_push_down( @@ -54,7 +54,7 @@ fn limit_push_down( input.as_ref(), execution_props, )?), - }) + })) } ( LogicalPlan::TableScan(TableScanPlan { @@ -110,7 +110,7 @@ fn limit_push_down( let new_inputs = inputs .iter() .map(|x| { - Ok(LogicalPlan::Limit { + Ok(LogicalPlan::Limit(Limit { n: upper_limit, input: Arc::new(limit_push_down( optimizer, @@ -118,7 +118,7 @@ fn limit_push_down( x, execution_props, )?), - }) + })) }) .collect::>()?; Ok(LogicalPlan::Union(Union { diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs index b725b1d12411..b6f0ff5e96fe 100644 --- a/datafusion/src/optimizer/projection_push_down.rs +++ b/datafusion/src/optimizer/projection_push_down.rs @@ -427,11 +427,11 @@ fn optimize_plan( } // all other nodes: Add any additional columns used by // expressions in this node to the list of required columns - LogicalPlan::Limit { .. } + LogicalPlan::Limit(_) | LogicalPlan::Filter { .. } | LogicalPlan::Repartition(_) - | LogicalPlan::EmptyRelation { .. } - | LogicalPlan::Values { .. } + | LogicalPlan::EmptyRelation(_) + | LogicalPlan::Values(_) | LogicalPlan::Sort { .. } | LogicalPlan::CreateExternalTable(_) | LogicalPlan::CreateMemoryTable(_) diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index b9a3b996377c..05c362988570 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -26,8 +26,8 @@ use crate::execution::context::{ExecutionContextState, ExecutionProps}; use crate::logical_plan::plan::{AnalyzePlan, ExtensionPlan}; use crate::logical_plan::{ build_join_schema, Column, CreateMemoryTable, DFSchema, DFSchemaRef, Expr, - ExprRewriter, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, Recursion, - Repartition, RewriteRecursion, Union, + ExprRewriter, Limit, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, + Recursion, Repartition, RewriteRecursion, Union, Values, }; use crate::physical_plan::functions::Volatility; use crate::physical_plan::planner::DefaultPhysicalPlanner; @@ -151,13 +151,13 @@ pub fn from_plan( schema: schema.clone(), alias: alias.clone(), }), - LogicalPlan::Values { schema, .. } => Ok(LogicalPlan::Values { + LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values { schema: schema.clone(), values: expr .chunks_exact(schema.fields().len()) .map(|s| s.to_vec()) .collect::>(), - }), + })), LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter { predicate: expr[0].clone(), input: Arc::new(inputs[0].clone()), @@ -222,10 +222,10 @@ pub fn from_plan( let right = &inputs[1]; LogicalPlanBuilder::from(left).cross_join(right)?.build() } - LogicalPlan::Limit { n, .. } => Ok(LogicalPlan::Limit { + LogicalPlan::Limit(Limit { n, .. }) => Ok(LogicalPlan::Limit(Limit { n: *n, input: Arc::new(inputs[0].clone()), - }), + })), LogicalPlan::CreateMemoryTable(CreateMemoryTable { name, .. }) => { Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable { input: Arc::new(inputs[0].clone()), @@ -265,7 +265,7 @@ pub fn from_plan( ); Ok(plan.clone()) } - LogicalPlan::EmptyRelation { .. } + LogicalPlan::EmptyRelation(_) | LogicalPlan::TableScan { .. } | LogicalPlan::CreateExternalTable(_) | LogicalPlan::DropTable(_) => { diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index ad858be84171..44bd4b16bb5c 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -23,12 +23,13 @@ use super::{ hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows, }; use crate::execution::context::ExecutionContextState; -use crate::logical_plan::TableScanPlan; +use crate::logical_plan::plan::EmptyRelation; use crate::logical_plan::{ unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType, Repartition, ToStringifiedPlan, Union, UserDefinedLogicalNode, }; +use crate::logical_plan::{Limit, TableScanPlan, Values}; use crate::physical_optimizer::optimizer::PhysicalOptimizerRule; use crate::physical_plan::cross_join::CrossJoinExec; use crate::physical_plan::explain::ExplainExec; @@ -347,10 +348,10 @@ impl DefaultPhysicalPlanner { let filters = unnormalize_cols(filters.iter().cloned()); source.scan(projection, batch_size, &filters, *limit).await } - LogicalPlan::Values { + LogicalPlan::Values(Values { values, schema, - } => { + }) => { let exec_schema = schema.as_ref().to_owned().into(); let exprs = values.iter() .map(|row| { @@ -781,14 +782,14 @@ impl DefaultPhysicalPlanner { let right = self.create_initial_plan(right, ctx_state).await?; Ok(Arc::new(CrossJoinExec::try_new(left, right)?)) } - LogicalPlan::EmptyRelation { + LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row, schema, - } => Ok(Arc::new(EmptyExec::new( + }) => Ok(Arc::new(EmptyExec::new( *produce_one_row, SchemaRef::new(schema.as_ref().to_owned().into()), ))), - LogicalPlan::Limit { input, n, .. } => { + LogicalPlan::Limit(Limit { input, n, .. }) => { let limit = *n; let input = self.create_initial_plan(input, ctx_state).await?; diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs index 74fbd4eeca2d..a63ef2a3f412 100644 --- a/datafusion/tests/user_defined_plan.rs +++ b/datafusion/tests/user_defined_plan.rs @@ -87,7 +87,7 @@ use std::{any::Any, collections::BTreeMap, fmt, sync::Arc}; use async_trait::async_trait; use datafusion::execution::context::ExecutionProps; use datafusion::logical_plan::plan::ExtensionPlan; -use datafusion::logical_plan::DFSchemaRef; +use datafusion::logical_plan::{DFSchemaRef, Limit}; /// Execute the specified sql and return the resulting record batches /// pretty printed as a String. @@ -288,7 +288,7 @@ impl OptimizerRule for TopKOptimizerRule { // Note: this code simply looks for the pattern of a Limit followed by a // Sort and replaces it by a TopK node. It does not handle many // edge cases (e.g multiple sort columns, sort ASC / DESC), etc. - if let LogicalPlan::Limit { ref n, ref input } = plan { + if let LogicalPlan::Limit(Limit { ref n, ref input }) = plan { if let LogicalPlan::Sort { ref expr, ref input,