From f5c6cf71cf1085cc41c5970711f8f8b171e75873 Mon Sep 17 00:00:00 2001 From: Mohammad Razeghi Date: Mon, 4 Dec 2023 12:23:58 +0100 Subject: [PATCH 1/4] move EmptyExec produce_one_row to be part of MemoryExec --- datafusion/core/src/datasource/empty.rs | 2 +- .../core/src/datasource/listing/table.rs | 4 +- .../aggregate_statistics.rs | 4 +- .../src/physical_optimizer/join_selection.rs | 4 +- datafusion/core/src/physical_planner.rs | 9 +- datafusion/physical-plan/src/empty.rs | 89 ++----------------- datafusion/physical-plan/src/memory.rs | 58 ++++++++++++ datafusion/proto/proto/datafusion.proto | 1 - datafusion/proto/src/generated/pbjson.rs | 18 ---- datafusion/proto/src/generated/prost.rs | 2 - datafusion/proto/src/physical_plan/mod.rs | 3 +- .../tests/cases/roundtrip_physical_plan.rs | 65 ++++++++------ 12 files changed, 118 insertions(+), 141 deletions(-) diff --git a/datafusion/core/src/datasource/empty.rs b/datafusion/core/src/datasource/empty.rs index 77160aa5d1c0..5100987520ee 100644 --- a/datafusion/core/src/datasource/empty.rs +++ b/datafusion/core/src/datasource/empty.rs @@ -77,7 +77,7 @@ impl TableProvider for EmptyTable { // even though there is no data, projections apply let projected_schema = project_schema(&self.schema, projection)?; Ok(Arc::new( - EmptyExec::new(false, projected_schema).with_partitions(self.partitions), + EmptyExec::new(projected_schema).with_partitions(self.partitions), )) } } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index a3be57db3a83..f341de298827 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -673,7 +673,7 @@ impl TableProvider for ListingTable { if partitioned_file_lists.is_empty() { let schema = self.schema(); let projected_schema = project_schema(&schema, projection)?; - return Ok(Arc::new(EmptyExec::new(false, projected_schema))); + return Ok(Arc::new(EmptyExec::new(projected_schema))); } // extract types of partition columns @@ -701,7 +701,7 @@ impl TableProvider for ListingTable { let object_store_url = if let Some(url) = self.table_paths.get(0) { url.object_store() } else { - return Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty())))); + return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))); }; // create the execution plan self.options diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs index 4265e3ff80d0..230a5d965fa4 100644 --- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs @@ -22,7 +22,7 @@ use super::optimizer::PhysicalOptimizerRule; use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_plan::aggregates::AggregateExec; -use crate::physical_plan::empty::EmptyExec; +use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::{expressions, AggregateExpr, ExecutionPlan, Statistics}; use crate::scalar::ScalarValue; @@ -82,7 +82,7 @@ impl PhysicalOptimizerRule for AggregateStatistics { // input can be entirely removed Ok(Arc::new(ProjectionExec::try_new( projections, - Arc::new(EmptyExec::new(true, plan.schema())), + Arc::new(MemoryExec::try_new_with_dummy_row(plan.schema(), 1)?), )?)) } else { plan.map_children(|child| self.optimize(child, _config)) diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs 
b/datafusion/core/src/physical_optimizer/join_selection.rs index 0c3ac2d24529..6b2fe24acf00 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -1623,12 +1623,12 @@ mod hash_join_tests { let children = vec![ PipelineStatePropagator { - plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))), + plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))), unbounded: left_unbounded, children: vec![], }, PipelineStatePropagator { - plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))), + plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))), unbounded: right_unbounded, children: vec![], }, diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 0e96b126b967..3401f98747ff 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1203,12 +1203,17 @@ impl DefaultPhysicalPlanner { } LogicalPlan::Subquery(_) => todo!(), LogicalPlan::EmptyRelation(EmptyRelation { - produce_one_row, + produce_one_row: false, schema, }) => Ok(Arc::new(EmptyExec::new( - *produce_one_row, SchemaRef::new(schema.as_ref().to_owned().into()), ))), + LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: true, + schema, + }) => Ok(Arc::new( + MemoryExec::try_new_with_dummy_row(SchemaRef::new(schema.as_ref().to_owned().into()), 1)? + )), LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => { self.create_initial_plan(input, session_state).await } diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index a3e1fb79edb5..a9e403755f74 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -24,8 +24,7 @@ use super::expressions::PhysicalSortExpr; use super::{common, DisplayAs, SendableRecordBatchStream, Statistics}; use crate::{memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning}; -use arrow::array::{ArrayRef, NullArray}; -use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; +use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::{internal_err, DataFusionError, Result}; use datafusion_execution::TaskContext; @@ -35,8 +34,6 @@ use log::trace; /// Execution plan for empty relation (produces no rows) #[derive(Debug)] pub struct EmptyExec { - /// Specifies whether this exec produces a row or not - produce_one_row: bool, /// The schema for the produced row schema: SchemaRef, /// Number of partitions @@ -45,9 +42,8 @@ pub struct EmptyExec { impl EmptyExec { /// Create a new EmptyExec - pub fn new(produce_one_row: bool, schema: SchemaRef) -> Self { + pub fn new(schema: SchemaRef) -> Self { EmptyExec { - produce_one_row, schema, partitions: 1, } @@ -59,36 +55,8 @@ impl EmptyExec { self } - /// Specifies whether this exec produces a row or not - pub fn produce_one_row(&self) -> bool { - self.produce_one_row - } - fn data(&self) -> Result> { - let batch = if self.produce_one_row { - let n_field = self.schema.fields.len(); - // hack for https://github.com/apache/arrow-datafusion/pull/3242 - let n_field = if n_field == 0 { 1 } else { n_field }; - vec![RecordBatch::try_new( - Arc::new(Schema::new( - (0..n_field) - .map(|i| { - Field::new(format!("placeholder_{i}"), DataType::Null, true) - }) - .collect::(), - )), - (0..n_field) - .map(|_i| { - let ret: ArrayRef = Arc::new(NullArray::new(1)); - ret - }) - .collect(), - )?] 
- } else { - vec![] - }; - - Ok(batch) + Ok(vec![]) } } @@ -100,7 +68,7 @@ impl DisplayAs for EmptyExec { ) -> std::fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "EmptyExec: produce_one_row={}", self.produce_one_row) + write!(f, "EmptyExec") } } } @@ -133,10 +101,7 @@ impl ExecutionPlan for EmptyExec { self: Arc, _: Vec>, ) -> Result> { - Ok(Arc::new(EmptyExec::new( - self.produce_one_row, - self.schema.clone(), - ))) + Ok(Arc::new(EmptyExec::new(self.schema.clone()))) } fn execute( @@ -184,7 +149,7 @@ mod tests { let task_ctx = Arc::new(TaskContext::default()); let schema = test::aggr_test_schema(); - let empty = EmptyExec::new(false, schema.clone()); + let empty = EmptyExec::new(schema.clone()); assert_eq!(empty.schema(), schema); // we should have no results @@ -198,16 +163,11 @@ mod tests { #[test] fn with_new_children() -> Result<()> { let schema = test::aggr_test_schema(); - let empty = Arc::new(EmptyExec::new(false, schema.clone())); - let empty_with_row = Arc::new(EmptyExec::new(true, schema)); + let empty = Arc::new(EmptyExec::new(schema.clone())); let empty2 = with_new_children_if_necessary(empty.clone(), vec![])?.into(); assert_eq!(empty.schema(), empty2.schema()); - let empty_with_row_2 = - with_new_children_if_necessary(empty_with_row.clone(), vec![])?.into(); - assert_eq!(empty_with_row.schema(), empty_with_row_2.schema()); - let too_many_kids = vec![empty2]; assert!( with_new_children_if_necessary(empty, too_many_kids).is_err(), @@ -220,44 +180,11 @@ mod tests { async fn invalid_execute() -> Result<()> { let task_ctx = Arc::new(TaskContext::default()); let schema = test::aggr_test_schema(); - let empty = EmptyExec::new(false, schema); + let empty = EmptyExec::new(schema); // ask for the wrong partition assert!(empty.execute(1, task_ctx.clone()).is_err()); assert!(empty.execute(20, task_ctx).is_err()); Ok(()) } - - #[tokio::test] - async fn produce_one_row() -> Result<()> { - let task_ctx = Arc::new(TaskContext::default()); - let schema = test::aggr_test_schema(); - let empty = EmptyExec::new(true, schema); - - let iter = empty.execute(0, task_ctx)?; - let batches = common::collect(iter).await?; - - // should have one item - assert_eq!(batches.len(), 1); - - Ok(()) - } - - #[tokio::test] - async fn produce_one_row_multiple_partition() -> Result<()> { - let task_ctx = Arc::new(TaskContext::default()); - let schema = test::aggr_test_schema(); - let partitions = 3; - let empty = EmptyExec::new(true, schema).with_partitions(partitions); - - for n in 0..partitions { - let iter = empty.execute(n, task_ctx.clone())?; - let batches = common::collect(iter).await?; - - // should have one item - assert_eq!(batches.len(), 1); - } - - Ok(()) - } } diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 39cd47452eff..d17b9983b82f 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -28,7 +28,9 @@ use super::{ SendableRecordBatchStream, Statistics, }; +use arrow::array::{ArrayRef, NullArray}; use arrow::datatypes::SchemaRef; +use arrow::datatypes::{DataType, Field, Fields, Schema}; use arrow::record_batch::RecordBatch; use datafusion_common::{internal_err, project_schema, DataFusionError, Result}; use datafusion_execution::TaskContext; @@ -177,6 +179,26 @@ impl MemoryExec { }) } + pub fn try_new_with_dummy_row(schema: SchemaRef, partitions: usize) -> Result { + let n_field = schema.fields.len(); + // hack for 
https://github.com/apache/arrow-datafusion/pull/3242 + let n_field = if n_field == 0 { 1 } else { n_field }; + let part = vec![RecordBatch::try_new( + Arc::new(Schema::new( + (0..n_field) + .map(|i| Field::new(format!("placeholder_{i}"), DataType::Null, true)) + .collect::(), + )), + (0..n_field) + .map(|_i| { + let ret: ArrayRef = Arc::new(NullArray::new(1)); + ret + }) + .collect(), + )?]; + Self::try_new(&vec![part; partitions], schema, None) + } + pub fn partitions(&self) -> &[Vec] { &self.partitions } @@ -280,11 +302,14 @@ mod tests { use crate::memory::MemoryExec; use crate::ExecutionPlan; + use crate::{common, test}; use arrow_schema::{DataType, Field, Schema, SortOptions}; use datafusion_physical_expr::expressions::col; use datafusion_physical_expr::PhysicalSortExpr; + use datafusion_execution::TaskContext; + #[test] fn test_memory_order_eq() -> datafusion_common::Result<()> { let schema = Arc::new(Schema::new(vec![ @@ -316,4 +341,37 @@ mod tests { assert!(eq_properties.oeq_class().contains(&expected_order_eq)); Ok(()) } + + #[tokio::test] + async fn dummy_row() -> datafusion_common::Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let schema = test::aggr_test_schema(); + let dummy: MemoryExec = MemoryExec::try_new_with_dummy_row(schema, 1)?; + + let iter = dummy.execute(0, task_ctx)?; + let batches = common::collect(iter).await?; + + // should have one item + assert_eq!(batches.len(), 1); + + Ok(()) + } + + #[tokio::test] + async fn dummy_row_multiple_partition() -> datafusion_common::Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let schema = test::aggr_test_schema(); + let partitions = 3; + let dummy: MemoryExec = MemoryExec::try_new_with_dummy_row(schema, partitions)?; + + for n in 0..partitions { + let iter = dummy.execute(n, task_ctx.clone())?; + let batches = common::collect(iter).await?; + + // should have one item + assert_eq!(batches.len(), 1); + } + + Ok(()) + } } diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 8c2fd5369e33..e3da5bfb5f64 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1484,7 +1484,6 @@ message JoinOn { } message EmptyExecNode { - bool produce_one_row = 1; Schema schema = 2; } diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index b8c5f6a4aae8..b2a6b4ac2bb2 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -6369,16 +6369,10 @@ impl serde::Serialize for EmptyExecNode { { use serde::ser::SerializeStruct; let mut len = 0; - if self.produce_one_row { - len += 1; - } if self.schema.is_some() { len += 1; } let mut struct_ser = serializer.serialize_struct("datafusion.EmptyExecNode", len)?; - if self.produce_one_row { - struct_ser.serialize_field("produceOneRow", &self.produce_one_row)?; - } if let Some(v) = self.schema.as_ref() { struct_ser.serialize_field("schema", v)?; } @@ -6392,14 +6386,11 @@ impl<'de> serde::Deserialize<'de> for EmptyExecNode { D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ - "produce_one_row", - "produceOneRow", "schema", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { - ProduceOneRow, Schema, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -6422,7 +6413,6 @@ impl<'de> serde::Deserialize<'de> for EmptyExecNode { E: serde::de::Error, { match value { - "produceOneRow" | "produce_one_row" => Ok(GeneratedField::ProduceOneRow), "schema" => 
Ok(GeneratedField::Schema), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } @@ -6443,16 +6433,9 @@ impl<'de> serde::Deserialize<'de> for EmptyExecNode { where V: serde::de::MapAccess<'de>, { - let mut produce_one_row__ = None; let mut schema__ = None; while let Some(k) = map_.next_key()? { match k { - GeneratedField::ProduceOneRow => { - if produce_one_row__.is_some() { - return Err(serde::de::Error::duplicate_field("produceOneRow")); - } - produce_one_row__ = Some(map_.next_value()?); - } GeneratedField::Schema => { if schema__.is_some() { return Err(serde::de::Error::duplicate_field("schema")); @@ -6462,7 +6445,6 @@ impl<'de> serde::Deserialize<'de> for EmptyExecNode { } } Ok(EmptyExecNode { - produce_one_row: produce_one_row__.unwrap_or_default(), schema: schema__, }) } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index c31bc4ab5948..5478df270eab 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2084,8 +2084,6 @@ pub struct JoinOn { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct EmptyExecNode { - #[prost(bool, tag = "1")] - pub produce_one_row: bool, #[prost(message, optional, tag = "2")] pub schema: ::core::option::Option, } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 6714c35dc615..27e654d3cd60 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -704,7 +704,7 @@ impl AsExecutionPlan for PhysicalPlanNode { } PhysicalPlanType::Empty(empty) => { let schema = Arc::new(convert_required!(empty.schema)?); - Ok(Arc::new(EmptyExec::new(empty.produce_one_row, schema))) + Ok(Arc::new(EmptyExec::new(schema))) } PhysicalPlanType::Sort(sort) => { let input: Arc = @@ -1289,7 +1289,6 @@ impl AsExecutionPlan for PhysicalPlanNode { return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Empty( protobuf::EmptyExecNode { - produce_one_row: empty.produce_one_row(), schema: Some(schema), }, )), diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index d7d762d470d7..911718336531 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -20,7 +20,9 @@ use std::sync::Arc; use datafusion::arrow::array::ArrayRef; use datafusion::arrow::compute::kernels::sort::SortOptions; -use datafusion::arrow::datatypes::{DataType, Field, Fields, IntervalUnit, Schema}; +use datafusion::arrow::datatypes::{ + DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, +}; use datafusion::datasource::file_format::json::JsonSink; use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile}; use datafusion::datasource::object_store::ObjectStoreUrl; @@ -49,6 +51,7 @@ use datafusion::physical_plan::joins::{ HashJoinExec, NestedLoopJoinExec, PartitionMode, StreamJoinPartitionMode, }; use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; +use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::windows::{ @@ -102,7 +105,7 @@ fn roundtrip_test_with_context( #[test] fn roundtrip_empty() -> Result<()> { - roundtrip_test(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty())))) + 
roundtrip_test(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))) } #[test] @@ -115,7 +118,7 @@ fn roundtrip_date_time_interval() -> Result<()> { false, ), ]); - let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone()))); + let input = Arc::new(EmptyExec::new(Arc::new(schema.clone()))); let date_expr = col("some_date", &schema)?; let literal_expr = col("some_interval", &schema)?; let date_time_interval_expr = @@ -130,7 +133,7 @@ fn roundtrip_date_time_interval() -> Result<()> { #[test] fn roundtrip_local_limit() -> Result<()> { roundtrip_test(Arc::new(LocalLimitExec::new( - Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))), + Arc::new(EmptyExec::new(Arc::new(Schema::empty()))), 25, ))) } @@ -138,7 +141,7 @@ fn roundtrip_local_limit() -> Result<()> { #[test] fn roundtrip_global_limit() -> Result<()> { roundtrip_test(Arc::new(GlobalLimitExec::new( - Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))), + Arc::new(EmptyExec::new(Arc::new(Schema::empty()))), 0, Some(25), ))) @@ -147,7 +150,7 @@ fn roundtrip_global_limit() -> Result<()> { #[test] fn roundtrip_global_skip_no_limit() -> Result<()> { roundtrip_test(Arc::new(GlobalLimitExec::new( - Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))), + Arc::new(EmptyExec::new(Arc::new(Schema::empty()))), 10, None, // no limit ))) @@ -177,8 +180,8 @@ fn roundtrip_hash_join() -> Result<()> { ] { for partition_mode in &[PartitionMode::Partitioned, PartitionMode::CollectLeft] { roundtrip_test(Arc::new(HashJoinExec::try_new( - Arc::new(EmptyExec::new(false, schema_left.clone())), - Arc::new(EmptyExec::new(false, schema_right.clone())), + Arc::new(EmptyExec::new(schema_left.clone())), + Arc::new(EmptyExec::new(schema_right.clone())), on.clone(), None, join_type, @@ -209,8 +212,8 @@ fn roundtrip_nested_loop_join() -> Result<()> { JoinType::RightSemi, ] { roundtrip_test(Arc::new(NestedLoopJoinExec::try_new( - Arc::new(EmptyExec::new(false, schema_left.clone())), - Arc::new(EmptyExec::new(false, schema_right.clone())), + Arc::new(EmptyExec::new(schema_left.clone())), + Arc::new(EmptyExec::new(schema_right.clone())), None, join_type, )?))?; @@ -275,7 +278,7 @@ fn roundtrip_window() -> Result<()> { Arc::new(window_frame), )); - let input = Arc::new(EmptyExec::new(false, schema.clone())); + let input = Arc::new(EmptyExec::new(schema.clone())); roundtrip_test(Arc::new(WindowAggExec::try_new( vec![ @@ -309,7 +312,7 @@ fn rountrip_aggregate() -> Result<()> { aggregates.clone(), vec![None], vec![None], - Arc::new(EmptyExec::new(false, schema.clone())), + Arc::new(EmptyExec::new(schema.clone())), schema, )?)) } @@ -377,7 +380,7 @@ fn roundtrip_aggregate_udaf() -> Result<()> { aggregates.clone(), vec![None], vec![None], - Arc::new(EmptyExec::new(false, schema.clone())), + Arc::new(EmptyExec::new(schema.clone())), schema, )?), ctx, @@ -403,7 +406,7 @@ fn roundtrip_filter_with_not_and_in_list() -> Result<()> { let and = binary(not, Operator::And, in_list, &schema)?; roundtrip_test(Arc::new(FilterExec::try_new( and, - Arc::new(EmptyExec::new(false, schema.clone())), + Arc::new(EmptyExec::new(schema.clone())), )?)) } @@ -430,7 +433,7 @@ fn roundtrip_sort() -> Result<()> { ]; roundtrip_test(Arc::new(SortExec::new( sort_exprs, - Arc::new(EmptyExec::new(false, schema)), + Arc::new(EmptyExec::new(schema)), ))) } @@ -458,11 +461,11 @@ fn roundtrip_sort_preserve_partitioning() -> Result<()> { roundtrip_test(Arc::new(SortExec::new( sort_exprs.clone(), - Arc::new(EmptyExec::new(false, schema.clone())), + 
Arc::new(EmptyExec::new(schema.clone())), )))?; roundtrip_test(Arc::new( - SortExec::new(sort_exprs, Arc::new(EmptyExec::new(false, schema))) + SortExec::new(sort_exprs, Arc::new(EmptyExec::new(schema))) .with_preserve_partitioning(true), )) } @@ -512,7 +515,7 @@ fn roundtrip_builtin_scalar_function() -> Result<()> { let field_b = Field::new("b", DataType::Int64, false); let schema = Arc::new(Schema::new(vec![field_a, field_b])); - let input = Arc::new(EmptyExec::new(false, schema.clone())); + let input = Arc::new(EmptyExec::new(schema.clone())); let execution_props = ExecutionProps::new(); @@ -539,7 +542,7 @@ fn roundtrip_scalar_udf() -> Result<()> { let field_b = Field::new("b", DataType::Int64, false); let schema = Arc::new(Schema::new(vec![field_a, field_b])); - let input = Arc::new(EmptyExec::new(false, schema.clone())); + let input = Arc::new(EmptyExec::new(schema.clone())); let fn_impl = |args: &[ArrayRef]| Ok(Arc::new(args[0].clone()) as ArrayRef); @@ -592,7 +595,7 @@ fn roundtrip_distinct_count() -> Result<()> { aggregates.clone(), vec![None], vec![None], - Arc::new(EmptyExec::new(false, schema.clone())), + Arc::new(EmptyExec::new(schema.clone())), schema, )?)) } @@ -603,7 +606,7 @@ fn roundtrip_like() -> Result<()> { Field::new("a", DataType::Utf8, false), Field::new("b", DataType::Utf8, false), ]); - let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone()))); + let input = Arc::new(EmptyExec::new(Arc::new(schema.clone()))); let like_expr = like( false, false, @@ -630,7 +633,7 @@ fn roundtrip_get_indexed_field_named_struct_field() -> Result<()> { ]; let schema = Schema::new(fields); - let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone()))); + let input = Arc::new(EmptyExec::new(Arc::new(schema.clone()))); let col_arg = col("arg", &schema)?; let get_indexed_field_expr = Arc::new(GetIndexedFieldExpr::new( @@ -657,7 +660,10 @@ fn roundtrip_get_indexed_field_list_index() -> Result<()> { ]; let schema = Schema::new(fields); - let input = Arc::new(EmptyExec::new(true, Arc::new(schema.clone()))); + let input = Arc::new(MemoryExec::try_new_with_dummy_row( + SchemaRef::new(schema.clone()), + 1, + )?); let col_arg = col("arg", &schema)?; let col_key = col("key", &schema)?; @@ -684,7 +690,7 @@ fn roundtrip_get_indexed_field_list_range() -> Result<()> { ]; let schema = Schema::new(fields); - let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone()))); + let input = Arc::new(EmptyExec::new(Arc::new(schema.clone()))); let col_arg = col("arg", &schema)?; let col_start = col("start", &schema)?; @@ -710,7 +716,10 @@ fn roundtrip_analyze() -> Result<()> { let field_a = Field::new("plan_type", DataType::Utf8, false); let field_b = Field::new("plan", DataType::Utf8, false); let schema = Schema::new(vec![field_a, field_b]); - let input = Arc::new(EmptyExec::new(true, Arc::new(schema.clone()))); + let input = Arc::new(MemoryExec::try_new_with_dummy_row( + SchemaRef::new(schema.clone()), + 1, + )?); roundtrip_test(Arc::new(AnalyzeExec::new( false, @@ -725,7 +734,7 @@ fn roundtrip_json_sink() -> Result<()> { let field_a = Field::new("plan_type", DataType::Utf8, false); let field_b = Field::new("plan", DataType::Utf8, false); let schema = Arc::new(Schema::new(vec![field_a, field_b])); - let input = Arc::new(EmptyExec::new(true, schema.clone())); + let input = Arc::new(MemoryExec::try_new_with_dummy_row(schema.clone(), 1)?); let file_sink_config = FileSinkConfig { object_store_url: ObjectStoreUrl::local_filesystem(), @@ -785,8 +794,8 @@ fn roundtrip_sym_hash_join() -> 
Result<()> { ] { roundtrip_test(Arc::new( datafusion::physical_plan::joins::SymmetricHashJoinExec::try_new( - Arc::new(EmptyExec::new(false, schema_left.clone())), - Arc::new(EmptyExec::new(false, schema_right.clone())), + Arc::new(EmptyExec::new(schema_left.clone())), + Arc::new(EmptyExec::new(schema_right.clone())), on.clone(), None, join_type, From 2312e1fbe8f181130609b90e302c989b5c7072d0 Mon Sep 17 00:00:00 2001 From: Mohammad Razeghi Date: Mon, 4 Dec 2023 19:31:06 +0100 Subject: [PATCH 2/4] reflect the change to test strings, slt and md files --- datafusion/core/src/physical_planner.rs | 2 +- datafusion/core/tests/sql/explain_analyze.rs | 4 ++-- datafusion/optimizer/README.md | 6 +++--- datafusion/physical-plan/src/display.rs | 2 +- datafusion/sqllogictest/test_files/explain.slt | 6 +++--- datafusion/sqllogictest/test_files/join.slt | 2 +- datafusion/sqllogictest/test_files/limit.slt | 6 +++--- datafusion/sqllogictest/test_files/union.slt | 10 +++++----- datafusion/sqllogictest/test_files/window.slt | 16 ++++++++-------- 9 files changed, 27 insertions(+), 27 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 3401f98747ff..ab9a1a8318a6 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -2779,7 +2779,7 @@ mod tests { digraph { 1[shape=box label="ProjectionExec: expr=[id@0 + 2 as employee.id + Int32(2)]", tooltip=""] - 2[shape=box label="EmptyExec: produce_one_row=false", tooltip=""] + 2[shape=box label="EmptyExec", tooltip=""] 1 -> 2 [arrowhead=none, arrowtail=normal, dir=back] } // End DataFusion GraphViz Plan diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index ecb5766a3bb5..38603698f184 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -575,7 +575,7 @@ async fn explain_analyze_runs_optimizers() { // This happens as an optimization pass where count(*) can be // answered using statistics only. 
- let expected = "EmptyExec: produce_one_row=true"; + let expected = "MemoryExec: partitions=1, partition_sizes=[1]"; let sql = "EXPLAIN SELECT count(*) from alltypes_plain"; let actual = execute_to_batches(&ctx, sql).await; @@ -806,7 +806,7 @@ async fn explain_physical_plan_only() { let expected = vec![vec![ "physical_plan", "ProjectionExec: expr=[2 as COUNT(*)]\ - \n EmptyExec: produce_one_row=true\ + \n MemoryExec: partitions=1, partition_sizes=[1]\ \n", ]]; assert_eq!(expected, actual); diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md index b8e5b93e6692..87c33be011d6 100644 --- a/datafusion/optimizer/README.md +++ b/datafusion/optimizer/README.md @@ -153,7 +153,7 @@ Looking at the `EXPLAIN` output we can see that the optimizer has effectively re | logical_plan | Projection: Int64(3) AS Int64(1) + Int64(2) | | | EmptyRelation | | physical_plan | ProjectionExec: expr=[3 as Int64(1) + Int64(2)] | -| | EmptyExec: produce_one_row=true | +| | MemoryExec: partitions=1, partition_sizes=[1] | | | | +---------------+-------------------------------------------------+ ``` @@ -318,7 +318,7 @@ In the following example, the `type_coercion` and `simplify_expressions` passes | logical_plan | Projection: Utf8("3.2") AS foo | | | EmptyRelation | | initial_physical_plan | ProjectionExec: expr=[3.2 as foo] | -| | EmptyExec: produce_one_row=true | +| | MemoryExec: partitions=1, partition_sizes=[1] | | | | | physical_plan after aggregate_statistics | SAME TEXT AS ABOVE | | physical_plan after join_selection | SAME TEXT AS ABOVE | @@ -326,7 +326,7 @@ In the following example, the `type_coercion` and `simplify_expressions` passes | physical_plan after repartition | SAME TEXT AS ABOVE | | physical_plan after add_merge_exec | SAME TEXT AS ABOVE | | physical_plan | ProjectionExec: expr=[3.2 as foo] | -| | EmptyExec: produce_one_row=true | +| | MemoryExec: partitions=1, partition_sizes=[1] | | | | +------------------------------------------------------------+---------------------------------------------------------------------------+ ``` diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index aa368251ebf3..612e164be0e2 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -132,7 +132,7 @@ impl<'a> DisplayableExecutionPlan<'a> { /// ```dot /// strict digraph dot_plan { // 0[label="ProjectionExec: expr=[id@0 + 2 as employee.id + Int32(2)]",tooltip=""] - // 1[label="EmptyExec: produce_one_row=false",tooltip=""] + // 1[label="EmptyExec",tooltip=""] // 0 -> 1 // } /// ``` diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 18792735ffed..e06ce421eb55 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -94,7 +94,7 @@ EXPLAIN select count(*) from (values ('a', 1, 100), ('a', 2, 150)) as t (c1,c2,c ---- physical_plan ProjectionExec: expr=[2 as COUNT(*)] ---EmptyExec: produce_one_row=true +--MemoryExec: partitions=1, partition_sizes=[1] statement ok set datafusion.explain.physical_plan_only = false @@ -368,7 +368,7 @@ Projection: List([[1, 2, 3], [4, 5, 6]]) AS make_array(make_array(Int64(1),Int64 --EmptyRelation physical_plan ProjectionExec: expr=[[[1, 2, 3], [4, 5, 6]] as make_array(make_array(Int64(1),Int64(2),Int64(3)),make_array(Int64(4),Int64(5),Int64(6)))] ---EmptyExec: produce_one_row=true +--MemoryExec: partitions=1, partition_sizes=[1] query TT explain select [[1, 2, 3], 
[4, 5, 6]]; @@ -378,4 +378,4 @@ Projection: List([[1, 2, 3], [4, 5, 6]]) AS make_array(make_array(Int64(1),Int64 --EmptyRelation physical_plan ProjectionExec: expr=[[[1, 2, 3], [4, 5, 6]] as make_array(make_array(Int64(1),Int64(2),Int64(3)),make_array(Int64(4),Int64(5),Int64(6)))] ---EmptyExec: produce_one_row=true +--MemoryExec: partitions=1, partition_sizes=[1] diff --git a/datafusion/sqllogictest/test_files/join.slt b/datafusion/sqllogictest/test_files/join.slt index 874d849e9a29..386ffe766b19 100644 --- a/datafusion/sqllogictest/test_files/join.slt +++ b/datafusion/sqllogictest/test_files/join.slt @@ -556,7 +556,7 @@ query TT explain select * from t1 join t2 on false; ---- logical_plan EmptyRelation -physical_plan EmptyExec: produce_one_row=false +physical_plan EmptyExec # Make batch size smaller than table row number. to introduce parallelism to the plan. statement ok diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 182195112e87..f2aa9fa673bd 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -312,7 +312,7 @@ Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]] ----TableScan: t1 projection=[], fetch=14 physical_plan ProjectionExec: expr=[0 as COUNT(*)] ---EmptyExec: produce_one_row=true +--MemoryExec: partitions=1, partition_sizes=[1] query I SELECT COUNT(*) FROM (SELECT a FROM t1 LIMIT 3 OFFSET 11); @@ -330,7 +330,7 @@ Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]] ----TableScan: t1 projection=[], fetch=11 physical_plan ProjectionExec: expr=[2 as COUNT(*)] ---EmptyExec: produce_one_row=true +--MemoryExec: partitions=1, partition_sizes=[1] query I SELECT COUNT(*) FROM (SELECT a FROM t1 LIMIT 3 OFFSET 8); @@ -348,7 +348,7 @@ Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]] ----TableScan: t1 projection=[] physical_plan ProjectionExec: expr=[2 as COUNT(*)] ---EmptyExec: produce_one_row=true +--MemoryExec: partitions=1, partition_sizes=[1] query I SELECT COUNT(*) FROM (SELECT a FROM t1 LIMIT 3 OFFSET 8); diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index 0f255cdb9fb9..8b6e90e73bf2 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -546,11 +546,11 @@ UnionExec ------CoalesceBatchesExec: target_batch_size=2 --------RepartitionExec: partitioning=Hash([Int64(1)@0], 4), input_partitions=1 ----------AggregateExec: mode=Partial, gby=[1 as Int64(1)], aggr=[] -------------EmptyExec: produce_one_row=true +------------MemoryExec: partitions=1, partition_sizes=[1] --ProjectionExec: expr=[2 as a] -----EmptyExec: produce_one_row=true +----MemoryExec: partitions=1, partition_sizes=[1] --ProjectionExec: expr=[3 as a] -----EmptyExec: produce_one_row=true +----MemoryExec: partitions=1, partition_sizes=[1] # test UNION ALL aliases correctly with aliased subquery query TT @@ -578,7 +578,7 @@ UnionExec --------RepartitionExec: partitioning=Hash([n@0], 4), input_partitions=1 ----------AggregateExec: mode=Partial, gby=[n@0 as n], aggr=[COUNT(*)] ------------ProjectionExec: expr=[5 as n] ---------------EmptyExec: produce_one_row=true +--------------MemoryExec: partitions=1, partition_sizes=[1] --ProjectionExec: expr=[1 as count, MAX(Int64(10))@0 as n] ----AggregateExec: mode=Single, gby=[], aggr=[MAX(Int64(10))] -------EmptyExec: produce_one_row=true +------MemoryExec: partitions=1, partition_sizes=[1] diff --git 
a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index bb6ca119480d..6011649d64c3 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -279,13 +279,13 @@ SortPreservingMergeExec: [b@0 ASC NULLS LAST] ------------AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[MAX(d.a)] --------------UnionExec ----------------ProjectionExec: expr=[1 as a, aa as b] -------------------EmptyExec: produce_one_row=true +------------------MemoryExec: partitions=1, partition_sizes=[1] ----------------ProjectionExec: expr=[3 as a, aa as b] -------------------EmptyExec: produce_one_row=true +------------------MemoryExec: partitions=1, partition_sizes=[1] ----------------ProjectionExec: expr=[5 as a, bb as b] -------------------EmptyExec: produce_one_row=true +------------------MemoryExec: partitions=1, partition_sizes=[1] ----------------ProjectionExec: expr=[7 as a, bb as b] -------------------EmptyExec: produce_one_row=true +------------------MemoryExec: partitions=1, partition_sizes=[1] # Check actual result: query TI @@ -365,13 +365,13 @@ SortPreservingMergeExec: [b@0 ASC NULLS LAST] --------------RepartitionExec: partitioning=Hash([b@1], 4), input_partitions=4 ----------------UnionExec ------------------ProjectionExec: expr=[1 as a, aa as b] ---------------------EmptyExec: produce_one_row=true +--------------------MemoryExec ------------------ProjectionExec: expr=[3 as a, aa as b] ---------------------EmptyExec: produce_one_row=true +--------------------MemoryExec ------------------ProjectionExec: expr=[5 as a, bb as b] ---------------------EmptyExec: produce_one_row=true +--------------------MemoryExec ------------------ProjectionExec: expr=[7 as a, bb as b] ---------------------EmptyExec: produce_one_row=true +--------------------MemoryExec # check actual result From faa521e7ce9cd4f71f707abd60b8fd8f8835ac4e Mon Sep 17 00:00:00 2001 From: Mohammad Razeghi Date: Mon, 4 Dec 2023 20:05:59 +0100 Subject: [PATCH 3/4] fix optimizers_catch_all_statistics expecting EmptyExec --- datafusion/core/tests/custom_sources.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs index daf1ef41a297..a1bd4e972c03 100644 --- a/datafusion/core/tests/custom_sources.rs +++ b/datafusion/core/tests/custom_sources.rs @@ -42,6 +42,7 @@ use datafusion_common::project_schema; use datafusion_common::stats::Precision; use async_trait::async_trait; +use datafusion_physical_plan::memory::MemoryExec; use futures::stream::Stream; /// Also run all tests that are found in the `custom_sources_cases` directory @@ -258,7 +259,7 @@ async fn optimizers_catch_all_statistics() { // when the optimization kicks in, the source is replaced by an EmptyExec assert!( - contains_empty_exec(Arc::clone(&physical_plan)), + contains_memory_exec(Arc::clone(&physical_plan)), "Expected aggregate_statistics optimizations missing: {physical_plan:?}" ); @@ -283,12 +284,12 @@ async fn optimizers_catch_all_statistics() { assert_eq!(format!("{:?}", actual[0]), format!("{expected:?}")); } -fn contains_empty_exec(plan: Arc) -> bool { - if plan.as_any().is::() { +fn contains_memory_exec(plan: Arc) -> bool { + if plan.as_any().is::() { true } else if plan.children().len() != 1 { false } else { - contains_empty_exec(Arc::clone(&plan.children()[0])) + contains_memory_exec(Arc::clone(&plan.children()[0])) } } From 45a5ef0a18665a48c6f567a513debcbdddca3a5b Mon Sep 17 
00:00:00 2001 From: Mohammad Razeghi Date: Thu, 7 Dec 2023 01:46:47 +0100 Subject: [PATCH 4/4] implementing serialization for MemoryExec (unfinished) --- datafusion/core/tests/custom_sources.rs | 1 - datafusion/physical-plan/src/memory.rs | 4 + datafusion/proto/proto/datafusion.proto | 25 +- datafusion/proto/src/generated/pbjson.rs | 489 ++++++++++++++++++ datafusion/proto/src/generated/prost.rs | 42 +- datafusion/proto/src/physical_plan/mod.rs | 108 +++- .../tests/cases/roundtrip_physical_plan.rs | 6 +- 7 files changed, 666 insertions(+), 9 deletions(-) diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs index a1bd4e972c03..c65eeb52a9b1 100644 --- a/datafusion/core/tests/custom_sources.rs +++ b/datafusion/core/tests/custom_sources.rs @@ -30,7 +30,6 @@ use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; use datafusion::logical_expr::{ col, Expr, LogicalPlan, LogicalPlanBuilder, TableScan, UNNAMED_TABLE, }; -use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::{ collect, ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index d17b9983b82f..8ec322bf8462 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -231,6 +231,10 @@ impl MemoryExec { pub fn original_schema(&self) -> SchemaRef { self.schema.clone() } + + pub fn sort_information(&self) -> Vec { + self.sort_information.clone() + } } /// Iterator over batches diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index e3da5bfb5f64..8cc31281fac4 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -840,6 +840,16 @@ message Field { map metadata = 5; } +message RecordBatch { + Schema schema = 1; + repeated string columns = 2; + int32 row_count = 3; +} + +message RecordBatchList { + repeated RecordBatch record_batchs = 1; +} + message FixedSizeBinary{ int32 length = 1; } @@ -1159,6 +1169,7 @@ message PhysicalPlanNode { AnalyzeExecNode analyze = 23; JsonSinkExecNode json_sink = 24; SymmetricHashJoinExecNode symmetric_hash_join = 25; + MemoryExecNode memory = 26; } } @@ -1256,6 +1267,10 @@ message PhysicalExprNode { } } +message PhysicalExprNodeList { + repeated PhysicalExprNode expr_nodes = 1; +} + message PhysicalScalarUdfNode { string name = 1; repeated PhysicalExprNode args = 2; @@ -1484,7 +1499,7 @@ message JoinOn { } message EmptyExecNode { - Schema schema = 2; + Schema schema = 1; } message ProjectionExecNode { @@ -1679,3 +1694,11 @@ message PhysicalGetIndexedFieldExprNode { ListRangeExpr list_range_expr = 4; } } + +message MemoryExecNode { + repeated RecordBatchList partitions = 1; + Schema schema = 2; + Schema projected_schema = 3; + repeated uint32 projection = 4; + repeated PhysicalExprNodeList sort_information = 5; +} \ No newline at end of file diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index b2a6b4ac2bb2..ad41ccc5774f 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -13846,6 +13846,170 @@ impl<'de> serde::Deserialize<'de> for MaybePhysicalSortExprs { deserializer.deserialize_struct("datafusion.MaybePhysicalSortExprs", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for MemoryExecNode { + #[allow(deprecated)] 
+ fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.partitions.is_empty() { + len += 1; + } + if self.schema.is_some() { + len += 1; + } + if self.projected_schema.is_some() { + len += 1; + } + if !self.projection.is_empty() { + len += 1; + } + if !self.sort_information.is_empty() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.MemoryExecNode", len)?; + if !self.partitions.is_empty() { + struct_ser.serialize_field("partitions", &self.partitions)?; + } + if let Some(v) = self.schema.as_ref() { + struct_ser.serialize_field("schema", v)?; + } + if let Some(v) = self.projected_schema.as_ref() { + struct_ser.serialize_field("projectedSchema", v)?; + } + if !self.projection.is_empty() { + struct_ser.serialize_field("projection", &self.projection)?; + } + if !self.sort_information.is_empty() { + struct_ser.serialize_field("sortInformation", &self.sort_information)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for MemoryExecNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "partitions", + "schema", + "projected_schema", + "projectedSchema", + "projection", + "sort_information", + "sortInformation", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Partitions, + Schema, + ProjectedSchema, + Projection, + SortInformation, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "partitions" => Ok(GeneratedField::Partitions), + "schema" => Ok(GeneratedField::Schema), + "projectedSchema" | "projected_schema" => Ok(GeneratedField::ProjectedSchema), + "projection" => Ok(GeneratedField::Projection), + "sortInformation" | "sort_information" => Ok(GeneratedField::SortInformation), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = MemoryExecNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.MemoryExecNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut partitions__ = None; + let mut schema__ = None; + let mut projected_schema__ = None; + let mut projection__ = None; + let mut sort_information__ = None; + while let Some(k) = map_.next_key()? 
{ + match k { + GeneratedField::Partitions => { + if partitions__.is_some() { + return Err(serde::de::Error::duplicate_field("partitions")); + } + partitions__ = Some(map_.next_value()?); + } + GeneratedField::Schema => { + if schema__.is_some() { + return Err(serde::de::Error::duplicate_field("schema")); + } + schema__ = map_.next_value()?; + } + GeneratedField::ProjectedSchema => { + if projected_schema__.is_some() { + return Err(serde::de::Error::duplicate_field("projectedSchema")); + } + projected_schema__ = map_.next_value()?; + } + GeneratedField::Projection => { + if projection__.is_some() { + return Err(serde::de::Error::duplicate_field("projection")); + } + projection__ = + Some(map_.next_value::>>()? + .into_iter().map(|x| x.0).collect()) + ; + } + GeneratedField::SortInformation => { + if sort_information__.is_some() { + return Err(serde::de::Error::duplicate_field("sortInformation")); + } + sort_information__ = Some(map_.next_value()?); + } + } + } + Ok(MemoryExecNode { + partitions: partitions__.unwrap_or_default(), + schema: schema__, + projected_schema: projected_schema__, + projection: projection__.unwrap_or_default(), + sort_information: sort_information__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.MemoryExecNode", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for NamedStructField { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -16745,6 +16909,98 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { deserializer.deserialize_struct("datafusion.PhysicalExprNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PhysicalExprNodeList { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.expr_nodes.is_empty() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalExprNodeList", len)?; + if !self.expr_nodes.is_empty() { + struct_ser.serialize_field("exprNodes", &self.expr_nodes)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalExprNodeList { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "expr_nodes", + "exprNodes", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + ExprNodes, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "exprNodes" | "expr_nodes" => Ok(GeneratedField::ExprNodes), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalExprNodeList; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalExprNodeList") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + 
V: serde::de::MapAccess<'de>, + { + let mut expr_nodes__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::ExprNodes => { + if expr_nodes__.is_some() { + return Err(serde::de::Error::duplicate_field("exprNodes")); + } + expr_nodes__ = Some(map_.next_value()?); + } + } + } + Ok(PhysicalExprNodeList { + expr_nodes: expr_nodes__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalExprNodeList", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PhysicalExtensionNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -17829,6 +18085,9 @@ impl serde::Serialize for PhysicalPlanNode { physical_plan_node::PhysicalPlanType::SymmetricHashJoin(v) => { struct_ser.serialize_field("symmetricHashJoin", v)?; } + physical_plan_node::PhysicalPlanType::Memory(v) => { + struct_ser.serialize_field("memory", v)?; + } } } struct_ser.end() @@ -17877,6 +18136,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { "jsonSink", "symmetric_hash_join", "symmetricHashJoin", + "memory", ]; #[allow(clippy::enum_variant_names)] @@ -17905,6 +18165,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { Analyze, JsonSink, SymmetricHashJoin, + Memory, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -17950,6 +18211,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { "analyze" => Ok(GeneratedField::Analyze), "jsonSink" | "json_sink" => Ok(GeneratedField::JsonSink), "symmetricHashJoin" | "symmetric_hash_join" => Ok(GeneratedField::SymmetricHashJoin), + "memory" => Ok(GeneratedField::Memory), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -18138,6 +18400,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { return Err(serde::de::Error::duplicate_field("symmetricHashJoin")); } physical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::SymmetricHashJoin) +; + } + GeneratedField::Memory => { + if physical_plan_type__.is_some() { + return Err(serde::de::Error::duplicate_field("memory")); + } + physical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::Memory) ; } } @@ -20132,6 +20401,226 @@ impl<'de> serde::Deserialize<'de> for ProjectionNode { deserializer.deserialize_struct("datafusion.ProjectionNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for RecordBatch { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.schema.is_some() { + len += 1; + } + if !self.columns.is_empty() { + len += 1; + } + if self.row_count != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.RecordBatch", len)?; + if let Some(v) = self.schema.as_ref() { + struct_ser.serialize_field("schema", v)?; + } + if !self.columns.is_empty() { + struct_ser.serialize_field("columns", &self.columns)?; + } + if self.row_count != 0 { + struct_ser.serialize_field("rowCount", &self.row_count)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for RecordBatch { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "schema", + "columns", + "row_count", + "rowCount", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Schema, + Columns, + RowCount, + 
} + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "schema" => Ok(GeneratedField::Schema), + "columns" => Ok(GeneratedField::Columns), + "rowCount" | "row_count" => Ok(GeneratedField::RowCount), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = RecordBatch; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.RecordBatch") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut schema__ = None; + let mut columns__ = None; + let mut row_count__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Schema => { + if schema__.is_some() { + return Err(serde::de::Error::duplicate_field("schema")); + } + schema__ = map_.next_value()?; + } + GeneratedField::Columns => { + if columns__.is_some() { + return Err(serde::de::Error::duplicate_field("columns")); + } + columns__ = Some(map_.next_value()?); + } + GeneratedField::RowCount => { + if row_count__.is_some() { + return Err(serde::de::Error::duplicate_field("rowCount")); + } + row_count__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + } + } + Ok(RecordBatch { + schema: schema__, + columns: columns__.unwrap_or_default(), + row_count: row_count__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.RecordBatch", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for RecordBatchList { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.record_batchs.is_empty() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.RecordBatchList", len)?; + if !self.record_batchs.is_empty() { + struct_ser.serialize_field("recordBatchs", &self.record_batchs)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for RecordBatchList { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "record_batchs", + "recordBatchs", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + RecordBatchs, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "recordBatchs" | 
"record_batchs" => Ok(GeneratedField::RecordBatchs), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = RecordBatchList; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.RecordBatchList") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut record_batchs__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::RecordBatchs => { + if record_batchs__.is_some() { + return Err(serde::de::Error::duplicate_field("recordBatchs")); + } + record_batchs__ = Some(map_.next_value()?); + } + } + } + Ok(RecordBatchList { + record_batchs: record_batchs__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.RecordBatchList", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for RepartitionExecNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 5478df270eab..025ddedc5770 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1025,6 +1025,22 @@ pub struct Field { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct RecordBatch { + #[prost(message, optional, tag = "1")] + pub schema: ::core::option::Option, + #[prost(string, repeated, tag = "2")] + pub columns: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(int32, tag = "3")] + pub row_count: i32, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RecordBatchList { + #[prost(message, repeated, tag = "1")] + pub record_batchs: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct FixedSizeBinary { #[prost(int32, tag = "1")] pub length: i32, @@ -1516,7 +1532,7 @@ pub mod owned_table_reference { pub struct PhysicalPlanNode { #[prost( oneof = "physical_plan_node::PhysicalPlanType", - tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25" + tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26" )] pub physical_plan_type: ::core::option::Option, } @@ -1575,6 +1591,8 @@ pub mod physical_plan_node { JsonSink(::prost::alloc::boxed::Box), #[prost(message, tag = "25")] SymmetricHashJoin(::prost::alloc::boxed::Box), + #[prost(message, tag = "26")] + Memory(super::MemoryExecNode), } } #[allow(clippy::derive_partial_eq_without_eq)] @@ -1716,6 +1734,12 @@ pub mod physical_expr_node { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalExprNodeList { + #[prost(message, repeated, tag = "1")] + pub expr_nodes: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalScalarUdfNode { #[prost(string, tag = "1")] pub name: ::prost::alloc::string::String, @@ -2084,7 +2108,7 @@ pub struct JoinOn { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct EmptyExecNode { - #[prost(message, optional, tag = "2")] + #[prost(message, optional, 
tag = "1")] pub schema: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] @@ -2393,6 +2417,20 @@ pub mod physical_get_indexed_field_expr_node { ListRangeExpr(::prost::alloc::boxed::Box), } } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct MemoryExecNode { + #[prost(message, repeated, tag = "1")] + pub partitions: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "2")] + pub schema: ::core::option::Option, + #[prost(message, optional, tag = "3")] + pub projected_schema: ::core::option::Option, + #[prost(uint32, repeated, tag = "4")] + pub projection: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "5")] + pub sort_information: ::prost::alloc::vec::Vec, +} #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum JoinType { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 27e654d3cd60..d0ac5bb072fa 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -19,6 +19,7 @@ use std::convert::TryInto; use std::fmt::Debug; use std::sync::Arc; +use arrow::record_batch::RecordBatch; use datafusion::arrow::compute::SortOptions; use datafusion::arrow::datatypes::SchemaRef; use datafusion::datasource::file_format::file_compression_type::FileCompressionType; @@ -44,6 +45,7 @@ use datafusion::physical_plan::joins::{ }; use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode}; use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; +use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::sorts::sort::SortExec; @@ -55,6 +57,7 @@ use datafusion::physical_plan::windows::{ use datafusion::physical_plan::{ udaf, AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, WindowExpr, }; +use datafusion::sql::sqlparser::dialect::dialect_from_str; use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result}; use prost::bytes::BufMut; use prost::Message; @@ -69,7 +72,7 @@ use crate::protobuf::physical_expr_node::ExprType; use crate::protobuf::physical_plan_node::PhysicalPlanType; use crate::protobuf::repartition_exec_node::PartitionMethod; use crate::protobuf::{ - self, window_agg_exec_node, PhysicalPlanNode, PhysicalSortExprNodeCollection, + self, window_agg_exec_node, PhysicalPlanNode, PhysicalSortExprNodeCollection, Schema, }; use crate::{convert_required, into_required}; @@ -706,6 +709,40 @@ impl AsExecutionPlan for PhysicalPlanNode { let schema = Arc::new(convert_required!(empty.schema)?); Ok(Arc::new(EmptyExec::new(schema))) } + PhysicalPlanType::Memory(memory) => { + println!("{:?}", memory); + todo!("not implemented") + // let partitions = memory.partitions.iter().map( + // |record_batchs| { + // record_batchs.record_batchs.iter().map( + // |record| { + // let schema: Result = record.schema + // .ok_or_else(|| DataFusionError::Internal("No Schema".to_string())) + // .and_then(|s| s.try_into().map_err(|_| DataFusionError::Internal("Schema conversion error".to_string()))); + + // let cols = record.columns.iter().map( + // |col| { + // todo!("what") + // } + // ).collect(); + + // schema.and_then(|s| { + // RecordBatch::try_new(s,cols).map_err(DataFusionError::Internal("Can't Instantiate RecordBatch".to_string())) + // }) + // } + // ).collect::,_>>() + // } + // 
+                            let cols = record
+                                .columns()
+                                .iter()
+                                .map(|col| format!("{:?}", col))
+                                .collect();
+                            protobuf::RecordBatch {
+                                schema,
+                                columns: cols,
+                                row_count: record.num_rows() as i32,
+                            }
+                        })
+                        .collect::<Vec<_>>();
+                    protobuf::RecordBatchList {
+                        record_batchs: records,
+                    }
+                })
+                .collect::<Vec<_>>();
+
+            let projection = memory.projection().as_ref().map_or_else(
+                Vec::new,
+                |projs| {
+                    projs
+                        .iter()
+                        .map(|proj| {
+                            u32::try_from(*proj).expect("projection index is too large for u32")
+                        })
+                        .collect()
+                },
+            );
+
+            return Ok(protobuf::PhysicalPlanNode {
+                physical_plan_type: Some(PhysicalPlanType::Memory(
+                    protobuf::MemoryExecNode {
+                        partitions,
+                        schema: Some(original_schema),
+                        projected_schema: Some(projected_schema),
+                        projection,
+                        sort_information,
+                    },
+                )),
+            });
+        }
+
         if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
             let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
                 coalesce_batches.input().to_owned(),
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 911718336531..4f9305422bbd 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -653,13 +653,11 @@ fn roundtrip_get_indexed_field_named_struct_field() -> Result<()> {
 
 #[test]
 fn roundtrip_get_indexed_field_list_index() -> Result<()> {
-    let fields = vec![
+    let schema = Schema::new(vec![
         Field::new("id", DataType::Int64, true),
         Field::new_list("arg", Field::new("item", DataType::Float64, true), true),
         Field::new("key", DataType::Int64, true),
-    ];
-
-    let schema = Schema::new(fields);
+    ]);
     let input = Arc::new(MemoryExec::try_new_with_dummy_row(
         SchemaRef::new(schema.clone()),
         1,