From c7d1d4a048aa7898b628d0b20dffce40213c4ea5 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 11 Aug 2021 06:39:26 -0400
Subject: [PATCH] Add support for EXPLAIN ANALYZE

---
 ballista/rust/core/proto/ballista.proto       |   6 +
 .../core/src/serde/logical_plan/from_proto.rs |   9 +-
 .../rust/core/src/serde/logical_plan/mod.rs   |  41 +++-
 .../core/src/serde/logical_plan/to_proto.rs   |  11 +
 datafusion/src/dataframe.rs                   |   4 +-
 datafusion/src/execution/context.rs           |   2 +-
 datafusion/src/execution/dataframe_impl.rs    |   6 +-
 datafusion/src/logical_plan/builder.rs        |  36 +++-
 datafusion/src/logical_plan/plan.rs           |  16 ++
 datafusion/src/optimizer/constant_folding.rs  |   1 +
 datafusion/src/optimizer/filter_push_down.rs  |   1 +
 .../src/optimizer/hash_build_probe_order.rs   |   6 +
 .../src/optimizer/projection_push_down.rs     |  25 +++
 datafusion/src/optimizer/utils.rs             |   3 +-
 datafusion/src/physical_plan/analyze.rs       | 201 ++++++++++++++++++
 datafusion/src/physical_plan/mod.rs           |   2 +
 datafusion/src/physical_plan/parquet.rs       |  34 +--
 datafusion/src/physical_plan/planner.rs       |  12 +-
 datafusion/src/physical_plan/stream.rs        |  64 ++++++
 datafusion/src/sql/planner.rs                 |  33 +--
 datafusion/src/test/exec.rs                   |  47 +---
 datafusion/tests/sql.rs                       |  49 +++++
 22 files changed, 505 insertions(+), 104 deletions(-)
 create mode 100644 datafusion/src/physical_plan/analyze.rs
 create mode 100644 datafusion/src/physical_plan/stream.rs

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 2538a10ceda3b..a1608c652dbad 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -249,6 +249,7 @@ message LogicalPlanNode {
     RepartitionNode repartition = 9;
     EmptyRelationNode empty_relation = 10;
     CreateExternalTableNode create_external_table = 11;
+    AnalyzeNode analyze = 14;
     ExplainNode explain = 12;
     WindowNode window = 13;
   }
@@ -323,6 +324,11 @@ enum FileType{
   CSV = 2;
 }
 
+message AnalyzeNode {
+  LogicalPlanNode input = 1;
+  bool verbose = 2;
+}
+
 message ExplainNode{
   LogicalPlanNode input = 1;
   bool verbose = 2;
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 31b8b6d3bcbcd..f9761a2015414 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -231,10 +231,17 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                     has_header: create_extern_table.has_header,
                 })
             }
+            LogicalPlanType::Analyze(analyze) => {
+                let input: LogicalPlan = convert_box_required!(analyze.input)?;
+                LogicalPlanBuilder::from(input)
+                    .explain(analyze.verbose, true)?
+                    .build()
+                    .map_err(|e| e.into())
+            }
             LogicalPlanType::Explain(explain) => {
                 let input: LogicalPlan = convert_box_required!(explain.input)?;
                 LogicalPlanBuilder::from(input)
-                    .explain(explain.verbose)?
+                    .explain(explain.verbose, false)?
                    .build()
                    .map_err(|e| e.into())
            }
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index e4e438335efda..dbaac1de7b574 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -661,6 +661,43 @@ mod roundtrip_tests {
         Ok(())
     }
 
+    #[test]
+    fn roundtrip_analyze() -> Result<()> {
+        let schema = Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("first_name", DataType::Utf8, false),
+            Field::new("last_name", DataType::Utf8, false),
+            Field::new("state", DataType::Utf8, false),
+            Field::new("salary", DataType::Int32, false),
+        ]);
+
+        let verbose_plan = LogicalPlanBuilder::scan_csv(
+            "employee.csv",
+            CsvReadOptions::new().schema(&schema).has_header(true),
+            Some(vec![3, 4]),
+        )
+        .and_then(|plan| plan.sort(vec![col("salary")]))
+        .and_then(|plan| plan.explain(true, true))
+        .and_then(|plan| plan.build())
+        .map_err(BallistaError::DataFusionError)?;
+
+        let plan = LogicalPlanBuilder::scan_csv(
+            "employee.csv",
+            CsvReadOptions::new().schema(&schema).has_header(true),
+            Some(vec![3, 4]),
+        )
+        .and_then(|plan| plan.sort(vec![col("salary")]))
+        .and_then(|plan| plan.explain(false, true))
+        .and_then(|plan| plan.build())
+        .map_err(BallistaError::DataFusionError)?;
+
+        roundtrip_test!(plan);
+
+        roundtrip_test!(verbose_plan);
+
+        Ok(())
+    }
+
     #[test]
     fn roundtrip_explain() -> Result<()> {
         let schema = Schema::new(vec![
@@ -677,7 +714,7 @@ mod roundtrip_tests {
             Some(vec![3, 4]),
         )
         .and_then(|plan| plan.sort(vec![col("salary")]))
-        .and_then(|plan| plan.explain(true))
+        .and_then(|plan| plan.explain(true, false))
         .and_then(|plan| plan.build())
         .map_err(BallistaError::DataFusionError)?;
 
@@ -687,7 +724,7 @@ mod roundtrip_tests {
             Some(vec![3, 4]),
         )
         .and_then(|plan| plan.sort(vec![col("salary")]))
-        .and_then(|plan| plan.explain(false))
+        .and_then(|plan| plan.explain(false, false))
         .and_then(|plan| plan.build())
         .map_err(BallistaError::DataFusionError)?;
 
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 1a3834af59d9c..e1c7f53cf9cf8 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -931,6 +931,17 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                    )),
                })
            }
+            LogicalPlan::Analyze { verbose, input, .. } => {
+                let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?;
+                Ok(protobuf::LogicalPlanNode {
+                    logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
+                        protobuf::AnalyzeNode {
+                            input: Some(Box::new(input)),
+                            verbose: *verbose,
+                        },
+                    ))),
+                })
+            }
            LogicalPlan::Explain { verbose, plan, .. } => {
                let input: protobuf::LogicalPlanNode = plan.as_ref().try_into()?;
                Ok(protobuf::LogicalPlanNode {
diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index 1d4cffdf89d49..71935468cecfa 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -289,6 +289,8 @@ pub trait DataFrame: Send + Sync {
     /// Return a DataFrame with the explanation of its plan so far.
     ///
+    /// If `analyze` is specified, runs the plan and reports metrics
+    ///
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
     /// # #[tokio::main]
     /// # async fn main() -> Result<()> {
     /// # let mut ctx = ExecutionContext::new();
     /// # let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
     /// let batches = df.limit(100)?.explain(false, false)?.collect().await?;
     /// # Ok(())
     /// # }
     /// ```
-    fn explain(&self, verbose: bool) -> Result<Arc<dyn DataFrame>>;
+    fn explain(&self, verbose: bool, analyze: bool) -> Result<Arc<dyn DataFrame>>;
 
     /// Return a `FunctionRegistry` used to plan udf's calls
     ///
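For orientation (not part of the patch): a minimal sketch of the new flag from the DataFrame API side, mirroring the doc-comment example above. With `analyze = true` the collected batch carries the metric-annotated plan rather than the query results; `tests/example.csv` is the same hypothetical file the doc example assumes, and the synchronous `ExecutionContext` calls match this era of the API.

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;

    // verbose = false, analyze = true: runs the plan to completion and
    // returns a single two-column batch describing the annotated plan
    let batches = df.limit(100)?.explain(false, true)?.collect().await?;
    arrow::util::pretty::print_batches(&batches)?;
    Ok(())
}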
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 0cf8b3b6c2765..7a54beac6d866 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -986,7 +986,7 @@ mod tests {
 
         let plan = LogicalPlanBuilder::scan_empty(Some("employee"), &schema, None)
             .unwrap()
-            .explain(true)
+            .explain(true, false)
             .unwrap()
             .build()
             .unwrap();
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index 1c0094b711d6b..ddaa04e5c909a 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -183,9 +183,9 @@ impl DataFrame for DataFrameImpl {
         self.plan.schema()
     }
 
-    fn explain(&self, verbose: bool) -> Result<Arc<dyn DataFrame>> {
+    fn explain(&self, verbose: bool, analyze: bool) -> Result<Arc<dyn DataFrame>> {
         let plan = LogicalPlanBuilder::from(self.to_logical_plan())
-            .explain(verbose)?
+            .explain(verbose, analyze)?
             .build()?;
         Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
     }
@@ -318,7 +318,7 @@ mod tests {
         let df = df
             .select_columns(&["c1", "c2", "c11"])?
             .limit(10)?
-            .explain(false)?;
+            .explain(false, false)?;
         let plan = df.to_logical_plan();
 
         // build query using SQL
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 0dfc1e7aa0480..d9afe2e01f389 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -455,18 +455,32 @@ impl LogicalPlanBuilder {
     }
 
     /// Create an expression to represent the explanation of the plan
-    pub fn explain(&self, verbose: bool) -> Result<Self> {
-        let stringified_plans =
-            vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];
-
+    ///
+    /// If `analyze` is true, runs the actual plan and produces
+    /// information about the metrics gathered during execution.
+    ///
+    /// If `verbose` is true, prints out additional details.
+    pub fn explain(&self, verbose: bool, analyze: bool) -> Result<Self> {
         let schema = LogicalPlan::explain_schema();
-
-        Ok(Self::from(LogicalPlan::Explain {
-            verbose,
-            plan: Arc::new(self.plan.clone()),
-            stringified_plans,
-            schema: schema.to_dfschema_ref()?,
-        }))
+        let schema = schema.to_dfschema_ref()?;
+
+        if analyze {
+            Ok(Self::from(LogicalPlan::Analyze {
+                verbose,
+                input: Arc::new(self.plan.clone()),
+                schema,
+            }))
+        } else {
+            let stringified_plans =
+                vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];
+
+            Ok(Self::from(LogicalPlan::Explain {
+                verbose,
+                plan: Arc::new(self.plan.clone()),
+                stringified_plans,
+                schema,
+            }))
+        }
     }
 
     /// Build the plan
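A sketch of the branch the builder now takes (illustrative only; `LogicalPlanBuilder::empty` and the `Debug` impl for `LogicalPlan` are assumed from the surrounding codebase): `analyze = true` produces a `LogicalPlan::Analyze` root instead of `LogicalPlan::Explain`.

use datafusion::error::Result;
use datafusion::logical_plan::LogicalPlanBuilder;

fn analyze_root() -> Result<()> {
    // an empty relation is enough to show which root node comes out
    let plan = LogicalPlanBuilder::empty(true)
        .explain(/* verbose = */ false, /* analyze = */ true)?
        .build()?;
    // prints an "Analyze" root node via the plan display added below
    println!("{:?}", plan);
    Ok(())
}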
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index 28405fb6dfba0..cb81b8d852fb5 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -213,6 +213,16 @@ pub enum LogicalPlan {
         /// The output schema of the explain (2 columns of text)
         schema: DFSchemaRef,
     },
+    /// Runs the actual plan, and then prints the physical plan
+    /// with execution metrics.
+    Analyze {
+        /// Should extra detail be included?
+        verbose: bool,
+        /// The logical plan that is being EXPLAIN ANALYZE'd
+        input: Arc<LogicalPlan>,
+        /// The output schema of the explain (2 columns of text)
+        schema: DFSchemaRef,
+    },
     /// Extension operator defined outside of DataFusion
     Extension {
         /// The runtime extension operator
@@ -239,6 +249,7 @@ impl LogicalPlan {
             LogicalPlan::Limit { input, .. } => input.schema(),
             LogicalPlan::CreateExternalTable { schema, .. } => schema,
             LogicalPlan::Explain { schema, .. } => schema,
+            LogicalPlan::Analyze { schema, .. } => schema,
             LogicalPlan::Extension { node } => node.schema(),
             LogicalPlan::Union { schema, .. } => schema,
         }
@@ -278,6 +289,7 @@ impl LogicalPlan {
             }
             LogicalPlan::Extension { node } => vec![node.schema()],
             LogicalPlan::Explain { schema, .. }
+            | LogicalPlan::Analyze { schema, .. }
             | LogicalPlan::EmptyRelation { schema, .. }
             | LogicalPlan::CreateExternalTable { schema, .. } => vec![schema],
             LogicalPlan::Limit { input, .. }
@@ -327,6 +339,7 @@ impl LogicalPlan {
             | LogicalPlan::Limit { .. }
             | LogicalPlan::CreateExternalTable { .. }
             | LogicalPlan::CrossJoin { .. }
+            | LogicalPlan::Analyze { .. }
             | LogicalPlan::Explain { .. }
             | LogicalPlan::Union { .. } => {
                 vec![]
@@ -350,6 +363,7 @@ impl LogicalPlan {
             LogicalPlan::Extension { node } => node.inputs(),
             LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
             LogicalPlan::Explain { plan, .. } => vec![plan],
+            LogicalPlan::Analyze { input: plan, .. } => vec![plan],
             // plans without inputs
             LogicalPlan::TableScan { .. }
             | LogicalPlan::EmptyRelation { .. }
@@ -495,6 +509,7 @@ impl LogicalPlan {
                 true
             }
             LogicalPlan::Explain { plan, .. } => plan.accept(visitor)?,
+            LogicalPlan::Analyze { input: plan, .. } => plan.accept(visitor)?,
             // plans without inputs
             LogicalPlan::TableScan { .. }
             | LogicalPlan::EmptyRelation { .. }
@@ -790,6 +805,7 @@ impl LogicalPlan {
                     write!(f, "CreateExternalTable: {:?}", name)
                 }
                 LogicalPlan::Explain { .. } => write!(f, "Explain"),
+                LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
                 LogicalPlan::Union { .. } => write!(f, "Union"),
                 LogicalPlan::Extension { ref node } => node.fmt_for_explain(f),
             }
diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs
index b4c4a96de4b5b..31b0b7f9e1997 100644
--- a/datafusion/src/optimizer/constant_folding.rs
+++ b/datafusion/src/optimizer/constant_folding.rs
@@ -80,6 +80,7 @@ impl OptimizerRule for ConstantFolding {
             | LogicalPlan::Extension { .. }
             | LogicalPlan::Sort { .. }
             | LogicalPlan::Explain { .. }
+            | LogicalPlan::Analyze { .. }
             | LogicalPlan::Limit { .. }
             | LogicalPlan::Union { .. }
             | LogicalPlan::Join { .. }
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index 039e92d1c1285..d0990de38dca9 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -284,6 +284,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
             // push the optimization to the plan of this explain
             push_down(&state, plan)
         }
+        LogicalPlan::Analyze { .. } => push_down(&state, plan),
        LogicalPlan::Filter { input, predicate } => {
            let mut predicates = vec![];
            split_members(predicate, &mut predicates);
diff --git a/datafusion/src/optimizer/hash_build_probe_order.rs b/datafusion/src/optimizer/hash_build_probe_order.rs
index ecb3b40e32032..209faf49bbe19 100644
--- a/datafusion/src/optimizer/hash_build_probe_order.rs
+++ b/datafusion/src/optimizer/hash_build_probe_order.rs
@@ -80,6 +80,11 @@ fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
             // we cannot predict how rows will be repartitioned
             None
         }
+        LogicalPlan::Analyze { .. } => {
+            // Analyze produces one row (verbose mode produces more),
+            // but it should never be used as an input to a Join anyway
+            None
+        }
         // the following operators are special cases and not querying data
         LogicalPlan::CreateExternalTable { .. } => None,
         LogicalPlan::Explain { .. } => None,
@@ -201,6 +206,7 @@ impl OptimizerRule for HashBuildProbeOrder {
             | LogicalPlan::Sort { .. }
             | LogicalPlan::CreateExternalTable { .. }
             | LogicalPlan::Explain { .. }
+            | LogicalPlan::Analyze { .. }
             | LogicalPlan::Union { .. }
             | LogicalPlan::Extension { .. } => {
                 let expr = plan.expressions();
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index 96c5094711ba9..7dddbffa26437 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -356,6 +356,31 @@ fn optimize_plan(
         LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
             "Unsupported logical plan: Explain must be root of the plan".to_string(),
         )),
+        LogicalPlan::Analyze {
+            input,
+            verbose,
+            schema,
+        } => {
+            // make sure we keep all the columns from the input plan
+            let required_columns = input
+                .schema()
+                .fields()
+                .iter()
+                .map(|f| f.qualified_column())
+                .collect::<HashSet<Column>>();
+
+            Ok(LogicalPlan::Analyze {
+                input: Arc::new(optimize_plan(
+                    optimizer,
+                    input,
+                    &required_columns,
+                    false,
+                    execution_props,
+                )?),
+                verbose: *verbose,
+                schema: schema.clone(),
+            })
+        }
         LogicalPlan::Union {
             inputs,
             schema,
diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs
index 615f0ccfceaf5..8ce6fe5b557b7 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -198,7 +198,8 @@ pub fn from_plan(
         LogicalPlan::EmptyRelation { .. }
         | LogicalPlan::TableScan { .. }
         | LogicalPlan::CreateExternalTable { .. }
-        | LogicalPlan::Explain { .. } => Ok(plan.clone()),
+        | LogicalPlan::Explain { .. }
+        | LogicalPlan::Analyze { .. } => Ok(plan.clone()),
     }
 }
 
diff --git a/datafusion/src/physical_plan/analyze.rs b/datafusion/src/physical_plan/analyze.rs
new file mode 100644
index 0000000000000..36726ad3a7b48
--- /dev/null
+++ b/datafusion/src/physical_plan/analyze.rs
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the ANALYZE operator
+
+use std::sync::Arc;
+use std::{any::Any, time::Instant};
+
+use crate::{
+    error::{DataFusionError, Result},
+    physical_plan::{display::DisplayableExecutionPlan, Partitioning},
+    physical_plan::{DisplayFormatType, ExecutionPlan},
+};
+use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
+use futures::StreamExt;
+
+use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream};
+use async_trait::async_trait;
+
+/// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
+/// discards the results, and then prints out an annotated plan with metrics
+#[derive(Debug, Clone)]
+pub struct AnalyzeExec {
+    /// control how much extra to print
+    verbose: bool,
+    /// The input plan (the plan being analyzed)
+    input: Arc<dyn ExecutionPlan>,
+    /// The output schema for RecordBatches of this exec node
+    schema: SchemaRef,
+}
+
+impl AnalyzeExec {
+    /// Create a new AnalyzeExec
+    pub fn new(verbose: bool, input: Arc<dyn ExecutionPlan>, schema: SchemaRef) -> Self {
+        AnalyzeExec {
+            verbose,
+            input,
+            schema,
+        }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for AnalyzeExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    /// Specifies we want the input as a single stream
+    fn required_child_distribution(&self) -> Distribution {
+        Distribution::SinglePartition
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn with_new_children(
+        &self,
+        mut children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if children.len() == 1 {
+            Ok(Arc::new(Self::new(
+                self.verbose,
+                children.pop().unwrap(),
+                self.schema.clone(),
+            )))
+        } else {
+            Err(DataFusionError::Internal(format!(
+                "Invalid child count for AnalyzeExec. Expected 1 got {}",
+                children.len()
+            )))
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        if 0 != partition {
+            return Err(DataFusionError::Internal(format!(
+                "AnalyzeExec invalid partition. Expected 0, got {}",
+                partition
+            )));
+        }
+
+        // should be ensured by `SinglePartition` above
+        let input_partitions = self.input.output_partitioning().partition_count();
+        if input_partitions != 1 {
+            return Err(DataFusionError::Internal(format!(
+                "AnalyzeExec invalid number of input partitions. Expected 1, got {}",
+                input_partitions
+            )));
+        }
+
+        let (tx, rx) = tokio::sync::mpsc::channel(input_partitions);
+
+        let captured_input = self.input.clone();
+        let mut input_stream = captured_input.execute(0).await?;
+        let captured_schema = self.schema.clone();
+        let verbose = self.verbose;
+
+        // The task reads batches from the input and, when complete,
+        // produces a RecordBatch with a report that is written to `tx`
+        tokio::task::spawn(async move {
+            let start = Instant::now();
+            let mut total_rows = 0;
+
+            // Note the code below ignores errors sending on tx. An
+            // error sending means the plan is being torn down and
+            // nothing is left that will handle the error (aka no one
+            // will hear us scream)
+            while let Some(b) = input_stream.next().await {
+                match b {
+                    Ok(batch) => {
+                        total_rows += batch.num_rows();
+                    }
+                    b @ Err(_) => {
+                        // try to pass on errors from the input
+                        if tx.send(b).await.is_err() {
+                            // receiver hung up, stop executing (no
+                            // one will look at any further results we
+                            // send)
+                            return;
+                        }
+                    }
+                }
+            }
+            let end = Instant::now();
+
+            let mut type_builder = StringBuilder::new(1);
+            let mut plan_builder = StringBuilder::new(1);
+
+            // TODO use some sort of enum rather than strings?
+            type_builder.append_value("Plan with Metrics").unwrap();
+
+            let annotated_plan =
+                DisplayableExecutionPlan::with_metrics(captured_input.as_ref())
+                    .indent()
+                    .to_string();
+            plan_builder.append_value(annotated_plan).unwrap();
+
+            // Verbose output
+            // TODO make this more sophisticated
+            if verbose {
+                type_builder.append_value("Output Rows").unwrap();
+                plan_builder.append_value(total_rows.to_string()).unwrap();
+
+                type_builder.append_value("Duration").unwrap();
+                plan_builder
+                    .append_value(format!("{:?}", end - start))
+                    .unwrap();
+            }
+
+            let maybe_batch = RecordBatch::try_new(
+                captured_schema,
+                vec![
+                    Arc::new(type_builder.finish()),
+                    Arc::new(plan_builder.finish()),
+                ],
+            );
+            // again ignore error
+            tx.send(maybe_batch).await.ok();
+        });
+
+        Ok(RecordBatchReceiverStream::create(&self.schema, rx))
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "AnalyzeExec verbose={}", self.verbose)
+            }
+        }
+    }
+}
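End to end, the operator above is what an `EXPLAIN ANALYZE` statement plans to. A hedged sketch of driving it through SQL (assumes some table `t` has been registered; `sql` is synchronous in this era of the API):

use datafusion::error::Result;
use datafusion::prelude::*;

async fn explain_analyze(ctx: &mut ExecutionContext) -> Result<()> {
    // plans to an AnalyzeExec root: runs the query, discards its results,
    // and returns the metric-annotated plan as a two-column batch
    let df = ctx.sql("EXPLAIN ANALYZE SELECT count(*) FROM t")?;
    let batches = df.collect().await?;
    println!("{}", arrow::util::pretty::pretty_format_batches(&batches)?);
    Ok(())
}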
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 0df6e6038e67d..8f7db72484c96 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -639,6 +639,7 @@ pub trait Accumulator: Send + Sync + Debug {
 }
 
 pub mod aggregates;
+pub mod analyze;
 pub mod array_expressions;
 pub mod coalesce_batches;
 pub mod coalesce_partitions;
@@ -671,6 +672,7 @@ pub mod repartition;
 pub mod sort;
 pub mod sort_preserving_merge;
 pub mod source;
+pub mod stream;
 pub mod string_expressions;
 pub mod type_coercion;
 pub mod udaf;
diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs
index ec5611f962922..ff8bb5b32678d 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -20,7 +20,6 @@
 use std::fmt;
 use std::fs::File;
 use std::sync::Arc;
-use std::task::{Context, Poll};
 use std::{any::Any, convert::TryInto};
 
 use crate::{
@@ -28,8 +27,7 @@ use crate::{
     logical_plan::{Column, Expr},
     physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
     physical_plan::{
-        common, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
-        SendableRecordBatchStream,
+        common, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
     },
     scalar::ScalarValue,
 };
@@ -55,12 +53,11 @@ use tokio::{
     sync::mpsc::{channel, Receiver, Sender},
     task,
 };
-use tokio_stream::wrappers::ReceiverStream;
 
 use crate::datasource::datasource::{ColumnStatistics, Statistics};
 use async_trait::async_trait;
-use futures::stream::{Stream, StreamExt};
 
+use super::stream::RecordBatchReceiverStream;
 use super::SQLMetric;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
 use crate::physical_plan::Accumulator;
@@ -688,10 +685,7 @@ impl ExecutionPlan for ParquetExec {
             }
         });
-        Ok(Box::pin(ParquetStream {
-            schema: self.schema.clone(),
-            inner: ReceiverStream::new(response_rx),
-        }))
+        Ok(RecordBatchReceiverStream::create(&self.schema, response_rx))
     }
 
     fn fmt_as(
@@ -938,28 +932,6 @@ fn split_files(filenames: &[String], n: usize) -> Vec<&[String]> {
     filenames.chunks(chunk_size).collect()
 }
 
-struct ParquetStream {
-    schema: SchemaRef,
-    inner: ReceiverStream<ArrowResult<RecordBatch>>,
-}
-
-impl Stream for ParquetStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        self.inner.poll_next_unpin(cx)
-    }
-}
-
-impl RecordBatchStream for ParquetStream {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index e662821e4539f..256a43b205e5f 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -17,6 +17,7 @@
 
 //! Physical query planner
 
+use super::analyze::AnalyzeExec;
 use super::{
     aggregates, cross_join::CrossJoinExec, empty::EmptyExec, expressions::binary,
     functions, hash_join::PartitionMode, udaf, union::UnionExec, windows,
@@ -741,6 +742,15 @@ impl DefaultPhysicalPlanner {
             LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
                 "Unsupported logical plan: Explain must be root of the plan".to_string(),
             )),
+            LogicalPlan::Analyze {
+                verbose,
+                input,
+                schema,
+            } => {
+                let input = self.create_initial_plan(input, ctx_state)?;
+                let schema = SchemaRef::new(schema.as_ref().to_owned().into());
+                Ok(Arc::new(AnalyzeExec::new(*verbose, input, schema)))
+            }
             LogicalPlan::Extension { node } => {
                 let physical_inputs = node
                     .inputs()
@@ -1651,7 +1661,7 @@ mod tests {
         let logical_plan =
             LogicalPlanBuilder::scan_empty(Some("employee"), &schema, None)
                 .unwrap()
-                .explain(true)
+                .explain(true, false)
                 .unwrap()
                 .build()
                 .unwrap();
diff --git a/datafusion/src/physical_plan/stream.rs b/datafusion/src/physical_plan/stream.rs
new file mode 100644
index 0000000000000..0c29f871b998a
--- /dev/null
+++ b/datafusion/src/physical_plan/stream.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream wrappers for physical operators
+
+use arrow::{
+    datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch,
+};
+use futures::{Stream, StreamExt};
+use tokio_stream::wrappers::ReceiverStream;
+
+use super::{RecordBatchStream, SendableRecordBatchStream};
+
+/// Adapter for a tokio [`ReceiverStream`] that implements the
+/// [`SendableRecordBatchStream`] interface
+pub struct RecordBatchReceiverStream {
+    schema: SchemaRef,
+    inner: ReceiverStream<ArrowResult<RecordBatch>>,
+}
+
+impl RecordBatchReceiverStream {
+    /// Construct a new [`RecordBatchReceiverStream`] which will send
+    /// batches of the specified schema from `inner`
+    pub fn create(
+        schema: &SchemaRef,
+        rx: tokio::sync::mpsc::Receiver<ArrowResult<RecordBatch>>,
+    ) -> SendableRecordBatchStream {
+        let schema = schema.clone();
+        let inner = ReceiverStream::new(rx);
+        Box::pin(Self { schema, inner })
+    }
+}
+
+impl Stream for RecordBatchReceiverStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        self.inner.poll_next_unpin(cx)
+    }
+}
+
+impl RecordBatchStream for RecordBatchReceiverStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
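The pattern this adapter enables, sketched with a hypothetical `batches` producer (not part of the patch): an operator's `execute` spawns a task that feeds a channel and hands the receiver to `RecordBatchReceiverStream::create`, exactly as `ParquetExec` above and the test executors below now do. A tokio runtime is assumed to be running.

use arrow::{
    datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch,
};
use datafusion::physical_plan::{
    stream::RecordBatchReceiverStream, SendableRecordBatchStream,
};

fn batches_to_stream(
    schema: SchemaRef,
    batches: Vec<ArrowResult<RecordBatch>>,
) -> SendableRecordBatchStream {
    let (tx, rx) = tokio::sync::mpsc::channel(2);
    // producer task: errors travel through the same channel as data
    tokio::task::spawn(async move {
        for batch in batches {
            if tx.send(batch).await.is_err() {
                return; // receiver hung up; stop producing
            }
        }
    });
    RecordBatchReceiverStream::create(&schema, rx)
}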
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index ef2b63464969b..29204f4a6dedf 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -101,8 +101,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             Statement::Explain {
                 verbose,
                 statement,
-                analyze: _,
-            } => self.explain_statement_to_plan(*verbose, statement),
+                analyze,
+            } => self.explain_statement_to_plan(*verbose, *analyze, statement),
             Statement::Query(query) => self.query_to_plan(query),
             Statement::ShowVariable { variable } => self.show_variable_to_plan(variable),
             Statement::ShowColumns {
@@ -230,21 +230,30 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
     pub fn explain_statement_to_plan(
         &self,
         verbose: bool,
+        analyze: bool,
         statement: &Statement,
     ) -> Result<LogicalPlan> {
         let plan = self.sql_statement_to_plan(statement)?;
-
-        let stringified_plans = vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
-
-        let schema = LogicalPlan::explain_schema();
         let plan = Arc::new(plan);
+        let schema = LogicalPlan::explain_schema();
+        let schema = schema.to_dfschema_ref()?;
 
-        Ok(LogicalPlan::Explain {
-            verbose,
-            plan,
-            stringified_plans,
-            schema: schema.to_dfschema_ref()?,
-        })
+        if analyze {
+            Ok(LogicalPlan::Analyze {
+                verbose,
+                input: plan,
+                schema,
+            })
+        } else {
+            let stringified_plans =
+                vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
+            Ok(LogicalPlan::Explain {
+                verbose,
+                plan,
+                stringified_plans,
+                schema,
+            })
+        }
     }
 
     fn build_schema(&self, columns: &[SQLColumnDef]) -> Result<Schema> {
diff --git a/datafusion/src/test/exec.rs b/datafusion/src/test/exec.rs
index 3971db3adf823..247dab1fb5bd8 100644
--- a/datafusion/src/test/exec.rs
+++ b/datafusion/src/test/exec.rs
@@ -30,13 +30,15 @@ use arrow::{
     error::{ArrowError, Result as ArrowResult},
     record_batch::RecordBatch,
 };
-use futures::{Stream, StreamExt};
-use tokio_stream::wrappers::ReceiverStream;
+use futures::Stream;
 
-use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{
     ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
 };
+use crate::{
+    error::{DataFusionError, Result},
+    physical_plan::stream::RecordBatchReceiverStream,
+};
 
 /// Index into the data that has been returned so far
 #[derive(Debug, Default, Clone)]
@@ -161,8 +163,6 @@ impl ExecutionPlan for MockExec {
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
         assert_eq!(partition, 0);
 
-        let schema = self.schema();
-
         // Result doesn't implement clone, so do it ourself
         let data: Vec<_> = self
             .data
@@ -188,11 +188,7 @@ impl ExecutionPlan for MockExec {
         });
 
         // returned stream simply reads off the rx stream
-        let stream = DelayedStream {
-            schema,
-            inner: ReceiverStream::new(rx),
-        };
-        Ok(Box::pin(stream))
+        Ok(RecordBatchReceiverStream::create(&self.schema, rx))
     }
 }
 
@@ -204,29 +200,6 @@ fn clone_error(e: &ArrowError) -> ArrowError {
     }
 }
 
-#[derive(Debug)]
-pub struct DelayedStream {
-    schema: SchemaRef,
-    inner: ReceiverStream<ArrowResult<RecordBatch>>,
-}
-
-impl Stream for DelayedStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        self.inner.poll_next_unpin(cx)
-    }
-}
-
-impl RecordBatchStream for DelayedStream {
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
-    }
-}
-
 /// A Mock ExecutionPlan that does not start producing input until a
 /// barrier is called
 ///
@@ -289,8 +262,6 @@ impl ExecutionPlan for BarrierExec {
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
         assert!(partition < self.data.len());
 
-        let schema = self.schema();
-
         let (tx, rx) = tokio::sync::mpsc::channel(2);
 
         // task simply sends data in order after barrier is reached
@@ -308,11 +279,7 @@ impl ExecutionPlan for BarrierExec {
         });
 
         // returned stream simply reads off the rx stream
-        let stream = DelayedStream {
-            schema,
-            inner: ReceiverStream::new(rx),
-        };
-        Ok(Box::pin(stream))
+        Ok(RecordBatchReceiverStream::create(&self.schema, rx))
     }
 }
 
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 0c33bd4772668..eadda9345d102 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -2140,6 +2140,55 @@ async fn csv_explain() {
     assert_eq!(expected, actual);
 }
 
+#[tokio::test]
+async fn csv_explain_analyze() {
+    // This test uses the execute function to run an actual plan under EXPLAIN ANALYZE
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_csv_by_sql(&mut ctx).await;
+    let sql = "EXPLAIN ANALYZE SELECT count(*), c1 FROM aggregate_test_100 group by c1";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let formatted = arrow::util::pretty::pretty_format_batches(&actual).unwrap();
+    let formatted = normalize_for_explain(&formatted);
+
+    // Only test basic plumbing and try to avoid having to change too
+    // many things
+    let needle =
+        "RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), metrics=[fetchTime";
+    assert!(
+        formatted.contains(needle),
+        "did not find '{}' in\n{}",
+        needle,
+        formatted
+    );
+    let verbose_needle = "Output Rows | 5";
+    assert!(
+        !formatted.contains(verbose_needle),
+        "found unexpected '{}' in\n{}",
+        verbose_needle,
+        formatted
+    );
+}
+
+#[tokio::test]
+async fn csv_explain_analyze_verbose() {
+    // This test uses the execute function to run an actual plan under EXPLAIN ANALYZE VERBOSE
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_csv_by_sql(&mut ctx).await;
+    let sql =
+        "EXPLAIN ANALYZE VERBOSE SELECT count(*), c1 FROM aggregate_test_100 group by c1";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let formatted = arrow::util::pretty::pretty_format_batches(&actual).unwrap();
+    let formatted = normalize_for_explain(&formatted);
+
+    let verbose_needle = "Output Rows | 5";
+    assert!(
+        formatted.contains(verbose_needle),
+        "did not find '{}' in\n{}",
+        verbose_needle,
+        formatted
+    );
+}
+
 #[tokio::test]
 async fn csv_explain_plans() {
     // This test verifies the look of each plan in its full cycle plan creation