diff --git a/datafusion/core/src/datasource/empty.rs b/datafusion/core/src/datasource/empty.rs index 77160aa5d1c0c..5100987520ee1 100644 --- a/datafusion/core/src/datasource/empty.rs +++ b/datafusion/core/src/datasource/empty.rs @@ -77,7 +77,7 @@ impl TableProvider for EmptyTable { // even though there is no data, projections apply let projected_schema = project_schema(&self.schema, projection)?; Ok(Arc::new( - EmptyExec::new(false, projected_schema).with_partitions(self.partitions), + EmptyExec::new(projected_schema).with_partitions(self.partitions), )) } } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index a3be57db3a83e..f341de2988271 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -673,7 +673,7 @@ impl TableProvider for ListingTable { if partitioned_file_lists.is_empty() { let schema = self.schema(); let projected_schema = project_schema(&schema, projection)?; - return Ok(Arc::new(EmptyExec::new(false, projected_schema))); + return Ok(Arc::new(EmptyExec::new(projected_schema))); } // extract types of partition columns @@ -701,7 +701,7 @@ impl TableProvider for ListingTable { let object_store_url = if let Some(url) = self.table_paths.get(0) { url.object_store() } else { - return Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty())))); + return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))); }; // create the execution plan self.options diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs index 4265e3ff80d0e..230a5d965fa4b 100644 --- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs @@ -22,7 +22,7 @@ use super::optimizer::PhysicalOptimizerRule; use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_plan::aggregates::AggregateExec; -use crate::physical_plan::empty::EmptyExec; +use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::{expressions, AggregateExpr, ExecutionPlan, Statistics}; use crate::scalar::ScalarValue; @@ -82,7 +82,7 @@ impl PhysicalOptimizerRule for AggregateStatistics { // input can be entirely removed Ok(Arc::new(ProjectionExec::try_new( projections, - Arc::new(EmptyExec::new(true, plan.schema())), + Arc::new(MemoryExec::try_new_with_dummy_row(plan.schema(), 1)?), )?)) } else { plan.map_children(|child| self.optimize(child, _config)) diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index 0c3ac2d24529d..6b2fe24acf005 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -1623,12 +1623,12 @@ mod hash_join_tests { let children = vec![ PipelineStatePropagator { - plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))), + plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))), unbounded: left_unbounded, children: vec![], }, PipelineStatePropagator { - plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))), + plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))), unbounded: right_unbounded, children: vec![], }, diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 0e96b126b9674..3401f98747ff1 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1203,12 +1203,17 @@ impl DefaultPhysicalPlanner { } LogicalPlan::Subquery(_) => todo!(), LogicalPlan::EmptyRelation(EmptyRelation { - produce_one_row, + produce_one_row: false, schema, }) => Ok(Arc::new(EmptyExec::new( - *produce_one_row, SchemaRef::new(schema.as_ref().to_owned().into()), ))), + LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: true, + schema, + }) => Ok(Arc::new( + MemoryExec::try_new_with_dummy_row(SchemaRef::new(schema.as_ref().to_owned().into()), 1)? + )), LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => { self.create_initial_plan(input, session_state).await } diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index a3e1fb79edb59..a9e403755f740 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -24,8 +24,7 @@ use super::expressions::PhysicalSortExpr; use super::{common, DisplayAs, SendableRecordBatchStream, Statistics}; use crate::{memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning}; -use arrow::array::{ArrayRef, NullArray}; -use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; +use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::{internal_err, DataFusionError, Result}; use datafusion_execution::TaskContext; @@ -35,8 +34,6 @@ use log::trace; /// Execution plan for empty relation (produces no rows) #[derive(Debug)] pub struct EmptyExec { - /// Specifies whether this exec produces a row or not - produce_one_row: bool, /// The schema for the produced row schema: SchemaRef, /// Number of partitions @@ -45,9 +42,8 @@ pub struct EmptyExec { impl EmptyExec { /// Create a new EmptyExec - pub fn new(produce_one_row: bool, schema: SchemaRef) -> Self { + pub fn new(schema: SchemaRef) -> Self { EmptyExec { - produce_one_row, schema, partitions: 1, } @@ -59,36 +55,8 @@ impl EmptyExec { self } - /// Specifies whether this exec produces a row or not - pub fn produce_one_row(&self) -> bool { - self.produce_one_row - } - fn data(&self) -> Result> { - let batch = if self.produce_one_row { - let n_field = self.schema.fields.len(); - // hack for https://github.com/apache/arrow-datafusion/pull/3242 - let n_field = if n_field == 0 { 1 } else { n_field }; - vec![RecordBatch::try_new( - Arc::new(Schema::new( - (0..n_field) - .map(|i| { - Field::new(format!("placeholder_{i}"), DataType::Null, true) - }) - .collect::(), - )), - (0..n_field) - .map(|_i| { - let ret: ArrayRef = Arc::new(NullArray::new(1)); - ret - }) - .collect(), - )?] - } else { - vec![] - }; - - Ok(batch) + Ok(vec![]) } } @@ -100,7 +68,7 @@ impl DisplayAs for EmptyExec { ) -> std::fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "EmptyExec: produce_one_row={}", self.produce_one_row) + write!(f, "EmptyExec") } } } @@ -133,10 +101,7 @@ impl ExecutionPlan for EmptyExec { self: Arc, _: Vec>, ) -> Result> { - Ok(Arc::new(EmptyExec::new( - self.produce_one_row, - self.schema.clone(), - ))) + Ok(Arc::new(EmptyExec::new(self.schema.clone()))) } fn execute( @@ -184,7 +149,7 @@ mod tests { let task_ctx = Arc::new(TaskContext::default()); let schema = test::aggr_test_schema(); - let empty = EmptyExec::new(false, schema.clone()); + let empty = EmptyExec::new(schema.clone()); assert_eq!(empty.schema(), schema); // we should have no results @@ -198,16 +163,11 @@ mod tests { #[test] fn with_new_children() -> Result<()> { let schema = test::aggr_test_schema(); - let empty = Arc::new(EmptyExec::new(false, schema.clone())); - let empty_with_row = Arc::new(EmptyExec::new(true, schema)); + let empty = Arc::new(EmptyExec::new(schema.clone())); let empty2 = with_new_children_if_necessary(empty.clone(), vec![])?.into(); assert_eq!(empty.schema(), empty2.schema()); - let empty_with_row_2 = - with_new_children_if_necessary(empty_with_row.clone(), vec![])?.into(); - assert_eq!(empty_with_row.schema(), empty_with_row_2.schema()); - let too_many_kids = vec![empty2]; assert!( with_new_children_if_necessary(empty, too_many_kids).is_err(), @@ -220,44 +180,11 @@ mod tests { async fn invalid_execute() -> Result<()> { let task_ctx = Arc::new(TaskContext::default()); let schema = test::aggr_test_schema(); - let empty = EmptyExec::new(false, schema); + let empty = EmptyExec::new(schema); // ask for the wrong partition assert!(empty.execute(1, task_ctx.clone()).is_err()); assert!(empty.execute(20, task_ctx).is_err()); Ok(()) } - - #[tokio::test] - async fn produce_one_row() -> Result<()> { - let task_ctx = Arc::new(TaskContext::default()); - let schema = test::aggr_test_schema(); - let empty = EmptyExec::new(true, schema); - - let iter = empty.execute(0, task_ctx)?; - let batches = common::collect(iter).await?; - - // should have one item - assert_eq!(batches.len(), 1); - - Ok(()) - } - - #[tokio::test] - async fn produce_one_row_multiple_partition() -> Result<()> { - let task_ctx = Arc::new(TaskContext::default()); - let schema = test::aggr_test_schema(); - let partitions = 3; - let empty = EmptyExec::new(true, schema).with_partitions(partitions); - - for n in 0..partitions { - let iter = empty.execute(n, task_ctx.clone())?; - let batches = common::collect(iter).await?; - - // should have one item - assert_eq!(batches.len(), 1); - } - - Ok(()) - } } diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 39cd47452eff9..d17b9983b82fe 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -28,7 +28,9 @@ use super::{ SendableRecordBatchStream, Statistics, }; +use arrow::array::{ArrayRef, NullArray}; use arrow::datatypes::SchemaRef; +use arrow::datatypes::{DataType, Field, Fields, Schema}; use arrow::record_batch::RecordBatch; use datafusion_common::{internal_err, project_schema, DataFusionError, Result}; use datafusion_execution::TaskContext; @@ -177,6 +179,26 @@ impl MemoryExec { }) } + pub fn try_new_with_dummy_row(schema: SchemaRef, partitions: usize) -> Result { + let n_field = schema.fields.len(); + // hack for https://github.com/apache/arrow-datafusion/pull/3242 + let n_field = if n_field == 0 { 1 } else { n_field }; + let part = vec![RecordBatch::try_new( + Arc::new(Schema::new( + (0..n_field) + .map(|i| Field::new(format!("placeholder_{i}"), DataType::Null, true)) + .collect::(), + )), + (0..n_field) + .map(|_i| { + let ret: ArrayRef = Arc::new(NullArray::new(1)); + ret + }) + .collect(), + )?]; + Self::try_new(&vec![part; partitions], schema, None) + } + pub fn partitions(&self) -> &[Vec] { &self.partitions } @@ -280,11 +302,14 @@ mod tests { use crate::memory::MemoryExec; use crate::ExecutionPlan; + use crate::{common, test}; use arrow_schema::{DataType, Field, Schema, SortOptions}; use datafusion_physical_expr::expressions::col; use datafusion_physical_expr::PhysicalSortExpr; + use datafusion_execution::TaskContext; + #[test] fn test_memory_order_eq() -> datafusion_common::Result<()> { let schema = Arc::new(Schema::new(vec![ @@ -316,4 +341,37 @@ mod tests { assert!(eq_properties.oeq_class().contains(&expected_order_eq)); Ok(()) } + + #[tokio::test] + async fn dummy_row() -> datafusion_common::Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let schema = test::aggr_test_schema(); + let dummy: MemoryExec = MemoryExec::try_new_with_dummy_row(schema, 1)?; + + let iter = dummy.execute(0, task_ctx)?; + let batches = common::collect(iter).await?; + + // should have one item + assert_eq!(batches.len(), 1); + + Ok(()) + } + + #[tokio::test] + async fn dummy_row_multiple_partition() -> datafusion_common::Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let schema = test::aggr_test_schema(); + let partitions = 3; + let dummy: MemoryExec = MemoryExec::try_new_with_dummy_row(schema, partitions)?; + + for n in 0..partitions { + let iter = dummy.execute(n, task_ctx.clone())?; + let batches = common::collect(iter).await?; + + // should have one item + assert_eq!(batches.len(), 1); + } + + Ok(()) + } } diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 8c2fd5369e33e..e3da5bfb5f643 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1484,7 +1484,6 @@ message JoinOn { } message EmptyExecNode { - bool produce_one_row = 1; Schema schema = 2; } diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index b8c5f6a4aae85..a42db804fe333 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -6392,14 +6392,11 @@ impl<'de> serde::Deserialize<'de> for EmptyExecNode { D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ - "produce_one_row", - "produceOneRow", "schema", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { - ProduceOneRow, Schema, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -6422,7 +6419,6 @@ impl<'de> serde::Deserialize<'de> for EmptyExecNode { E: serde::de::Error, { match value { - "produceOneRow" | "produce_one_row" => Ok(GeneratedField::ProduceOneRow), "schema" => Ok(GeneratedField::Schema), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } @@ -6443,16 +6439,9 @@ impl<'de> serde::Deserialize<'de> for EmptyExecNode { where V: serde::de::MapAccess<'de>, { - let mut produce_one_row__ = None; let mut schema__ = None; while let Some(k) = map_.next_key()? { match k { - GeneratedField::ProduceOneRow => { - if produce_one_row__.is_some() { - return Err(serde::de::Error::duplicate_field("produceOneRow")); - } - produce_one_row__ = Some(map_.next_value()?); - } GeneratedField::Schema => { if schema__.is_some() { return Err(serde::de::Error::duplicate_field("schema")); @@ -6462,7 +6451,6 @@ impl<'de> serde::Deserialize<'de> for EmptyExecNode { } } Ok(EmptyExecNode { - produce_one_row: produce_one_row__.unwrap_or_default(), schema: schema__, }) } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index c31bc4ab59481..5478df270eabf 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2084,8 +2084,6 @@ pub struct JoinOn { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct EmptyExecNode { - #[prost(bool, tag = "1")] - pub produce_one_row: bool, #[prost(message, optional, tag = "2")] pub schema: ::core::option::Option, } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 6714c35dc6150..27e654d3cd60d 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -704,7 +704,7 @@ impl AsExecutionPlan for PhysicalPlanNode { } PhysicalPlanType::Empty(empty) => { let schema = Arc::new(convert_required!(empty.schema)?); - Ok(Arc::new(EmptyExec::new(empty.produce_one_row, schema))) + Ok(Arc::new(EmptyExec::new(schema))) } PhysicalPlanType::Sort(sort) => { let input: Arc = @@ -1289,7 +1289,6 @@ impl AsExecutionPlan for PhysicalPlanNode { return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Empty( protobuf::EmptyExecNode { - produce_one_row: empty.produce_one_row(), schema: Some(schema), }, )), diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index d7d762d470d76..9117183365312 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -20,7 +20,9 @@ use std::sync::Arc; use datafusion::arrow::array::ArrayRef; use datafusion::arrow::compute::kernels::sort::SortOptions; -use datafusion::arrow::datatypes::{DataType, Field, Fields, IntervalUnit, Schema}; +use datafusion::arrow::datatypes::{ + DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, +}; use datafusion::datasource::file_format::json::JsonSink; use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile}; use datafusion::datasource::object_store::ObjectStoreUrl; @@ -49,6 +51,7 @@ use datafusion::physical_plan::joins::{ HashJoinExec, NestedLoopJoinExec, PartitionMode, StreamJoinPartitionMode, }; use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; +use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::windows::{ @@ -102,7 +105,7 @@ fn roundtrip_test_with_context( #[test] fn roundtrip_empty() -> Result<()> { - roundtrip_test(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty())))) + roundtrip_test(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))) } #[test] @@ -115,7 +118,7 @@ fn roundtrip_date_time_interval() -> Result<()> { false, ), ]); - let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone()))); + let input = Arc::new(EmptyExec::new(Arc::new(schema.clone()))); let date_expr = col("some_date", &schema)?; let literal_expr = col("some_interval", &schema)?; let date_time_interval_expr = @@ -130,7 +133,7 @@ fn roundtrip_date_time_interval() -> Result<()> { #[test] fn roundtrip_local_limit() -> Result<()> { roundtrip_test(Arc::new(LocalLimitExec::new( - Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))), + Arc::new(EmptyExec::new(Arc::new(Schema::empty()))), 25, ))) } @@ -138,7 +141,7 @@ fn roundtrip_local_limit() -> Result<()> { #[test] fn roundtrip_global_limit() -> Result<()> { roundtrip_test(Arc::new(GlobalLimitExec::new( - Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))), + Arc::new(EmptyExec::new(Arc::new(Schema::empty()))), 0, Some(25), ))) @@ -147,7 +150,7 @@ fn roundtrip_global_limit() -> Result<()> { #[test] fn roundtrip_global_skip_no_limit() -> Result<()> { roundtrip_test(Arc::new(GlobalLimitExec::new( - Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))), + Arc::new(EmptyExec::new(Arc::new(Schema::empty()))), 10, None, // no limit ))) @@ -177,8 +180,8 @@ fn roundtrip_hash_join() -> Result<()> { ] { for partition_mode in &[PartitionMode::Partitioned, PartitionMode::CollectLeft] { roundtrip_test(Arc::new(HashJoinExec::try_new( - Arc::new(EmptyExec::new(false, schema_left.clone())), - Arc::new(EmptyExec::new(false, schema_right.clone())), + Arc::new(EmptyExec::new(schema_left.clone())), + Arc::new(EmptyExec::new(schema_right.clone())), on.clone(), None, join_type, @@ -209,8 +212,8 @@ fn roundtrip_nested_loop_join() -> Result<()> { JoinType::RightSemi, ] { roundtrip_test(Arc::new(NestedLoopJoinExec::try_new( - Arc::new(EmptyExec::new(false, schema_left.clone())), - Arc::new(EmptyExec::new(false, schema_right.clone())), + Arc::new(EmptyExec::new(schema_left.clone())), + Arc::new(EmptyExec::new(schema_right.clone())), None, join_type, )?))?; @@ -275,7 +278,7 @@ fn roundtrip_window() -> Result<()> { Arc::new(window_frame), )); - let input = Arc::new(EmptyExec::new(false, schema.clone())); + let input = Arc::new(EmptyExec::new(schema.clone())); roundtrip_test(Arc::new(WindowAggExec::try_new( vec![ @@ -309,7 +312,7 @@ fn rountrip_aggregate() -> Result<()> { aggregates.clone(), vec![None], vec![None], - Arc::new(EmptyExec::new(false, schema.clone())), + Arc::new(EmptyExec::new(schema.clone())), schema, )?)) } @@ -377,7 +380,7 @@ fn roundtrip_aggregate_udaf() -> Result<()> { aggregates.clone(), vec![None], vec![None], - Arc::new(EmptyExec::new(false, schema.clone())), + Arc::new(EmptyExec::new(schema.clone())), schema, )?), ctx, @@ -403,7 +406,7 @@ fn roundtrip_filter_with_not_and_in_list() -> Result<()> { let and = binary(not, Operator::And, in_list, &schema)?; roundtrip_test(Arc::new(FilterExec::try_new( and, - Arc::new(EmptyExec::new(false, schema.clone())), + Arc::new(EmptyExec::new(schema.clone())), )?)) } @@ -430,7 +433,7 @@ fn roundtrip_sort() -> Result<()> { ]; roundtrip_test(Arc::new(SortExec::new( sort_exprs, - Arc::new(EmptyExec::new(false, schema)), + Arc::new(EmptyExec::new(schema)), ))) } @@ -458,11 +461,11 @@ fn roundtrip_sort_preserve_partitioning() -> Result<()> { roundtrip_test(Arc::new(SortExec::new( sort_exprs.clone(), - Arc::new(EmptyExec::new(false, schema.clone())), + Arc::new(EmptyExec::new(schema.clone())), )))?; roundtrip_test(Arc::new( - SortExec::new(sort_exprs, Arc::new(EmptyExec::new(false, schema))) + SortExec::new(sort_exprs, Arc::new(EmptyExec::new(schema))) .with_preserve_partitioning(true), )) } @@ -512,7 +515,7 @@ fn roundtrip_builtin_scalar_function() -> Result<()> { let field_b = Field::new("b", DataType::Int64, false); let schema = Arc::new(Schema::new(vec![field_a, field_b])); - let input = Arc::new(EmptyExec::new(false, schema.clone())); + let input = Arc::new(EmptyExec::new(schema.clone())); let execution_props = ExecutionProps::new(); @@ -539,7 +542,7 @@ fn roundtrip_scalar_udf() -> Result<()> { let field_b = Field::new("b", DataType::Int64, false); let schema = Arc::new(Schema::new(vec![field_a, field_b])); - let input = Arc::new(EmptyExec::new(false, schema.clone())); + let input = Arc::new(EmptyExec::new(schema.clone())); let fn_impl = |args: &[ArrayRef]| Ok(Arc::new(args[0].clone()) as ArrayRef); @@ -592,7 +595,7 @@ fn roundtrip_distinct_count() -> Result<()> { aggregates.clone(), vec![None], vec![None], - Arc::new(EmptyExec::new(false, schema.clone())), + Arc::new(EmptyExec::new(schema.clone())), schema, )?)) } @@ -603,7 +606,7 @@ fn roundtrip_like() -> Result<()> { Field::new("a", DataType::Utf8, false), Field::new("b", DataType::Utf8, false), ]); - let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone()))); + let input = Arc::new(EmptyExec::new(Arc::new(schema.clone()))); let like_expr = like( false, false, @@ -630,7 +633,7 @@ fn roundtrip_get_indexed_field_named_struct_field() -> Result<()> { ]; let schema = Schema::new(fields); - let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone()))); + let input = Arc::new(EmptyExec::new(Arc::new(schema.clone()))); let col_arg = col("arg", &schema)?; let get_indexed_field_expr = Arc::new(GetIndexedFieldExpr::new( @@ -657,7 +660,10 @@ fn roundtrip_get_indexed_field_list_index() -> Result<()> { ]; let schema = Schema::new(fields); - let input = Arc::new(EmptyExec::new(true, Arc::new(schema.clone()))); + let input = Arc::new(MemoryExec::try_new_with_dummy_row( + SchemaRef::new(schema.clone()), + 1, + )?); let col_arg = col("arg", &schema)?; let col_key = col("key", &schema)?; @@ -684,7 +690,7 @@ fn roundtrip_get_indexed_field_list_range() -> Result<()> { ]; let schema = Schema::new(fields); - let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone()))); + let input = Arc::new(EmptyExec::new(Arc::new(schema.clone()))); let col_arg = col("arg", &schema)?; let col_start = col("start", &schema)?; @@ -710,7 +716,10 @@ fn roundtrip_analyze() -> Result<()> { let field_a = Field::new("plan_type", DataType::Utf8, false); let field_b = Field::new("plan", DataType::Utf8, false); let schema = Schema::new(vec![field_a, field_b]); - let input = Arc::new(EmptyExec::new(true, Arc::new(schema.clone()))); + let input = Arc::new(MemoryExec::try_new_with_dummy_row( + SchemaRef::new(schema.clone()), + 1, + )?); roundtrip_test(Arc::new(AnalyzeExec::new( false, @@ -725,7 +734,7 @@ fn roundtrip_json_sink() -> Result<()> { let field_a = Field::new("plan_type", DataType::Utf8, false); let field_b = Field::new("plan", DataType::Utf8, false); let schema = Arc::new(Schema::new(vec![field_a, field_b])); - let input = Arc::new(EmptyExec::new(true, schema.clone())); + let input = Arc::new(MemoryExec::try_new_with_dummy_row(schema.clone(), 1)?); let file_sink_config = FileSinkConfig { object_store_url: ObjectStoreUrl::local_filesystem(), @@ -785,8 +794,8 @@ fn roundtrip_sym_hash_join() -> Result<()> { ] { roundtrip_test(Arc::new( datafusion::physical_plan::joins::SymmetricHashJoinExec::try_new( - Arc::new(EmptyExec::new(false, schema_left.clone())), - Arc::new(EmptyExec::new(false, schema_right.clone())), + Arc::new(EmptyExec::new(schema_left.clone())), + Arc::new(EmptyExec::new(schema_right.clone())), on.clone(), None, join_type,