From 9c9444bed85b0ae59b5bd50d52a81b38ef6be683 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 20 Dec 2022 12:40:39 +0000 Subject: [PATCH] Deprecate SessionContext::create_logical_plan (#4617) --- benchmarks/src/bin/nyctaxi.rs | 10 +- benchmarks/src/bin/tpch.rs | 2 +- benchmarks/src/tpch.rs | 3 +- datafusion/core/src/dataframe.rs | 48 ++++- datafusion/core/src/datasource/view.rs | 24 +-- datafusion/core/src/execution/context.rs | 43 ++-- .../src/physical_plan/coalesce_batches.rs | 4 +- datafusion/core/src/physical_plan/mod.rs | 7 +- datafusion/core/src/physical_plan/planner.rs | 13 +- datafusion/core/tests/custom_sources.rs | 4 +- datafusion/core/tests/path_partition.rs | 13 +- datafusion/core/tests/sql/aggregates.rs | 7 +- datafusion/core/tests/sql/avro.rs | 7 +- datafusion/core/tests/sql/errors.rs | 52 ++--- datafusion/core/tests/sql/explain_analyze.rs | 61 +++--- datafusion/core/tests/sql/joins.rs | 189 ++++++------------ datafusion/core/tests/sql/json.rs | 7 +- datafusion/core/tests/sql/mod.rs | 66 ++---- datafusion/core/tests/sql/parquet.rs | 21 +- datafusion/core/tests/sql/predicates.rs | 11 +- datafusion/core/tests/sql/projection.rs | 2 +- datafusion/core/tests/sql/references.rs | 2 +- datafusion/core/tests/sql/select.rs | 80 ++------ datafusion/core/tests/sql/subqueries.rs | 59 ++---- datafusion/core/tests/sql/udf.rs | 8 +- datafusion/core/tests/sql/wildcard.rs | 10 +- datafusion/core/tests/sql/window.rs | 37 ++-- datafusion/core/tests/statistics.rs | 26 +-- .../proto/examples/logical_plan_serde.rs | 2 +- .../proto/examples/physical_plan_serde.rs | 4 +- datafusion/proto/src/logical_plan/mod.rs | 15 +- 31 files changed, 305 insertions(+), 532 deletions(-) diff --git a/benchmarks/src/bin/nyctaxi.rs b/benchmarks/src/bin/nyctaxi.rs index e22c71e5e9e3..57024460d28a 100644 --- a/benchmarks/src/bin/nyctaxi.rs +++ b/benchmarks/src/bin/nyctaxi.rs @@ -28,7 +28,6 @@ use datafusion::arrow::util::pretty; use datafusion::error::Result; use datafusion::execution::context::{SessionConfig, SessionContext}; -use datafusion::physical_plan::collect; use datafusion::prelude::{CsvReadOptions, ParquetReadOptions}; use structopt::StructOpt; @@ -119,14 +118,11 @@ async fn datafusion_sql_benchmarks( } async fn execute_sql(ctx: &SessionContext, sql: &str, debug: bool) -> Result<()> { - let plan = ctx.create_logical_plan(sql)?; - let plan = ctx.optimize(&plan)?; + let dataframe = ctx.sql(sql).await?; if debug { - println!("Optimized logical plan:\n{:?}", plan); + println!("Optimized logical plan:\n{:?}", dataframe.logical_plan()); } - let physical_plan = ctx.create_physical_plan(&plan).await?; - let task_ctx = ctx.task_ctx(); - let result = collect(physical_plan, task_ctx).await?; + let result = dataframe.collect().await?; if debug { pretty::print_batches(&result)?; } diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 699be423862b..6372c0f78e85 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -643,7 +643,7 @@ mod tests { let sql = get_query_sql(query)?; for sql in &sql { let df = ctx.sql(sql.as_str()).await?; - let plan = df.to_logical_plan()?; + let plan = df.to_optimized_plan()?; if !actual.is_empty() { actual += "\n"; } diff --git a/benchmarks/src/tpch.rs b/benchmarks/src/tpch.rs index cb11f30faa5e..4ebaf5ef5762 100644 --- a/benchmarks/src/tpch.rs +++ b/benchmarks/src/tpch.rs @@ -353,8 +353,7 @@ pub async fn convert_tbl( } // create the physical plan - let csv = csv.to_logical_plan()?; - let csv = ctx.create_physical_plan(&csv).await?; + 
let csv = csv.create_physical_plan().await?; let output_path = output_root_path.join(table); let output_path = output_path.to_str().unwrap().to_owned(); diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 77f7e76159b8..b6dfcfb249c4 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use async_trait::async_trait; use parquet::file::properties::WriterProperties; -use datafusion_common::{Column, DFSchema}; +use datafusion_common::{Column, DFSchema, ScalarValue}; use datafusion_expr::TableProviderFilterPushDown; use crate::arrow::datatypes::Schema; @@ -505,17 +505,40 @@ impl DataFrame { self.plan.schema() } - /// Return the unoptimized logical plan represented by this DataFrame. + /// Return the unoptimized logical plan + pub fn logical_plan(&self) -> &LogicalPlan { + &self.plan + } + + /// Return the logical plan represented by this DataFrame without running the optimizers + /// + /// Note: This method should not be used outside testing, as it loses the snapshot + /// of the [`SessionState`] attached to this [`DataFrame`] and consequently subsequent + /// operations may take place against a different state pub fn to_unoptimized_plan(self) -> LogicalPlan { self.plan } /// Return the optimized logical plan represented by this DataFrame. - pub fn to_logical_plan(self) -> Result { + /// + /// Note: This method should not be used outside testing, as it loses the snapshot + /// of the [`SessionState`] attached to this [`DataFrame`] and consequently subsequent + /// operations may take place against a different state + pub fn to_optimized_plan(self) -> Result { // Optimize the plan first for better UX self.session_state.optimize(&self.plan) } + /// Return the optimized logical plan represented by this DataFrame. + /// + /// Note: This method should not be used outside testing, as it loses the snapshot + /// of the [`SessionState`] attached to this [`DataFrame`] and consequently subsequent + /// operations may take place against a different state + #[deprecated(note = "Use DataFrame::to_optimized_plan")] + pub fn to_logical_plan(self) -> Result { + self.to_optimized_plan() + } + /// Return a DataFrame with the explanation of its plan so far. /// /// if `analyze` is specified, runs the plan and reports metrics @@ -714,6 +737,12 @@ impl DataFrame { } } + /// Convert a prepare logical plan into its inner logical plan with all params replaced with their corresponding values + pub fn with_param_values(self, param_values: Vec) -> Result { + let plan = self.plan.with_param_values(param_values)?; + Ok(Self::new(self.session_state, plan)) + } + /// Cache DataFrame as a memory table. 
/// /// ``` @@ -1014,11 +1043,10 @@ mod tests { let df = df.select(vec![expr])?; // build query using SQL - let sql_plan = - ctx.create_logical_plan("SELECT my_fn(c12) FROM aggregate_test_100")?; + let sql_plan = ctx.sql("SELECT my_fn(c12) FROM aggregate_test_100").await?; // the two plans should be identical - assert_same_plan(&df.plan, &sql_plan); + assert_same_plan(&df.plan, &sql_plan.plan); Ok(()) } @@ -1128,7 +1156,7 @@ mod tests { async fn create_plan(sql: &str) -> Result { let mut ctx = SessionContext::new(); register_aggregate_csv(&mut ctx, "aggregate_test_100").await?; - ctx.create_logical_plan(sql) + Ok(ctx.sql(sql).await?.to_unoptimized_plan()) } async fn test_table_with_name(name: &str) -> Result { @@ -1308,7 +1336,7 @@ mod tests { \n Inner Join: t1.c1 = t2.c1\ \n TableScan: t1\ \n TableScan: t2", - format!("{:?}", df_renamed.clone().to_unoptimized_plan()) + format!("{:?}", df_renamed.logical_plan()) ); assert_eq!("\ @@ -1322,7 +1350,7 @@ mod tests { \n SubqueryAlias: t2\ \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\ \n TableScan: aggregate_test_100 projection=[c1, c2, c3]", - format!("{:?}", df_renamed.clone().to_logical_plan()?) + format!("{:?}", df_renamed.clone().to_optimized_plan()?) ); let df_results = df_renamed.collect().await?; @@ -1469,7 +1497,7 @@ mod tests { assert_eq!( "TableScan: ?table? projection=[c2, c3, sum]", - format!("{:?}", cached_df.clone().to_logical_plan()?) + format!("{:?}", cached_df.clone().to_optimized_plan()?) ); let df_results = df.collect().await?; diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index c50bc61783ee..0502c091ae95 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -493,12 +493,10 @@ mod tests { let view_sql = "CREATE VIEW xyz AS SELECT * FROM abc"; session_ctx.sql(view_sql).await?.collect().await?; - let plan = session_ctx + let dataframe = session_ctx .sql("EXPLAIN CREATE VIEW xyz AS SELECT * FROM abc") - .await? - .to_logical_plan() - .unwrap(); - let plan = session_ctx.optimize(&plan).unwrap(); + .await?; + let plan = dataframe.to_optimized_plan()?; let actual = format!("{}", plan.display_indent()); let expected = "\ Explain\ @@ -507,12 +505,10 @@ mod tests { \n TableScan: abc projection=[column1, column2, column3]"; assert_eq!(expected, actual); - let plan = session_ctx + let dataframe = session_ctx .sql("EXPLAIN CREATE VIEW xyz AS SELECT * FROM abc WHERE column2 = 5") - .await? - .to_logical_plan() - .unwrap(); - let plan = session_ctx.optimize(&plan).unwrap(); + .await?; + let plan = dataframe.to_optimized_plan()?; let actual = format!("{}", plan.display_indent()); let expected = "\ Explain\ @@ -522,12 +518,10 @@ mod tests { \n TableScan: abc projection=[column1, column2, column3]"; assert_eq!(expected, actual); - let plan = session_ctx + let dataframe = session_ctx .sql("EXPLAIN CREATE VIEW xyz AS SELECT column1, column2 FROM abc WHERE column2 = 5") - .await? 
- .to_logical_plan() - .unwrap(); - let plan = session_ctx.optimize(&plan).unwrap(); + .await?; + let plan = dataframe.to_optimized_plan()?; let actual = format!("{}", plan.display_indent()); let expected = "\ Explain\ diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index d6eb5240ae2e..c4f08e5b1ef7 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -254,7 +254,21 @@ impl SessionContext { /// This method is `async` because queries of type `CREATE EXTERNAL TABLE` /// might require the schema to be inferred. pub async fn sql(&self, sql: &str) -> Result { - let plan = self.create_logical_plan(sql)?; + let mut statements = DFParser::parse_sql(sql)?; + if statements.len() != 1 { + return Err(DataFusionError::NotImplemented( + "The context currently only supports a single SQL statement".to_string(), + )); + } + + // create a query planner + let plan = { + // TODO: Move catalog off SessionState onto SessionContext + let state = self.state.read(); + let query_planner = SqlToRel::new(&*state); + query_planner.statement_to_plan(statements.pop_front().unwrap())? + }; + match plan { LogicalPlan::CreateExternalTable(cmd) => { self.create_external_table(&cmd).await @@ -553,6 +567,7 @@ impl SessionContext { /// Creates a logical plan. /// /// This function is intended for internal use and should not be called directly. + #[deprecated(note = "Use SessionContext::sql which snapshots the SessionState")] pub fn create_logical_plan(&self, sql: &str) -> Result { let mut statements = DFParser::parse_sql(sql)?; @@ -1791,9 +1806,10 @@ impl SessionState { &self, logical_plan: &LogicalPlan, ) -> Result> { - let planner = self.query_planner.clone(); let logical_plan = self.optimize(logical_plan)?; - planner.create_physical_plan(&logical_plan, self).await + self.query_planner + .create_physical_plan(&logical_plan, self) + .await } /// return the configuration options @@ -2046,8 +2062,7 @@ mod tests { use std::fs::File; use std::path::PathBuf; use std::sync::Weak; - use std::thread::{self, JoinHandle}; - use std::{env, io::prelude::*, sync::Mutex}; + use std::{env, io::prelude::*}; use tempfile::TempDir; #[tokio::test] @@ -2267,23 +2282,21 @@ mod tests { // environment. Usecase is for concurrent planing. let tmp_dir = TempDir::new()?; let partition_count = 4; - let ctx = Arc::new(Mutex::new(create_ctx(&tmp_dir, partition_count).await?)); + let ctx = Arc::new(create_ctx(&tmp_dir, partition_count).await?); - let threads: Vec>> = (0..2) + let threads: Vec<_> = (0..2) .map(|_| ctx.clone()) - .map(|ctx_clone| { - thread::spawn(move || { - let ctx = ctx_clone.lock().expect("Locked context"); + .map(|ctx| { + tokio::spawn(async move { // Ensure we can create logical plan code on a separate thread. 
- ctx.create_logical_plan( - "SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3", - ) + ctx.sql("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3") + .await }) }) .collect(); - for thread in threads { - thread.join().expect("Failed to join thread")?; + for handle in threads { + handle.await.unwrap().unwrap(); } Ok(()) } diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs index 4bec6d73db4b..afa2a85abd25 100644 --- a/datafusion/core/src/physical_plan/coalesce_batches.rs +++ b/datafusion/core/src/physical_plan/coalesce_batches.rs @@ -347,8 +347,8 @@ mod tests { let partition = create_vec_batches(&schema, 10); let table = MemTable::try_new(schema, vec![partition])?; ctx.register_table("a", Arc::new(table))?; - let plan = ctx.create_logical_plan("SELECT * FROM a WHERE c0 < 1")?; - ctx.create_physical_plan(&plan).await + let dataframe = ctx.sql("SELECT * FROM a WHERE c0 < 1").await?; + dataframe.create_physical_plan().await } #[tokio::test(flavor = "multi_thread")] diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 5a247e16761f..07b4a6abb906 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -317,11 +317,8 @@ pub fn with_new_children_if_necessary( /// ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).await.unwrap(); /// /// // create a plan to run a SQL query -/// let plan = ctx -/// .create_logical_plan("SELECT a FROM example WHERE a < 5") -/// .unwrap(); -/// let plan = ctx.optimize(&plan).unwrap(); -/// let physical_plan = ctx.create_physical_plan(&plan).await.unwrap(); +/// let dataframe = ctx.sql("SELECT a FROM example WHERE a < 5").await.unwrap(); +/// let physical_plan = dataframe.create_physical_plan().await.unwrap(); /// /// // Format using display string /// let displayable_plan = displayable(physical_plan.as_ref()); diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 548a1dd36bd3..7feab12433cc 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -2224,10 +2224,11 @@ mod tests { let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?; let ctx = SessionContext::new(); - let logical_plan = - LogicalPlanBuilder::from(ctx.read_table(Arc::new(table))?.to_logical_plan()?) - .aggregate(vec![col("d1")], vec![sum(col("d2"))])? - .build()?; + let logical_plan = LogicalPlanBuilder::from( + ctx.read_table(Arc::new(table))?.to_optimized_plan()?, + ) + .aggregate(vec![col("d1")], vec![sum(col("d2"))])? + .build()?; let execution_plan = plan(&logical_plan).await?; let formatted = format!("{:?}", execution_plan); @@ -2435,7 +2436,7 @@ mod tests { let testdata = crate::test_util::arrow_test_data(); let path = format!("{}/csv/aggregate_test_100.csv", testdata); let options = CsvReadOptions::new().schema_infer_max_records(100); - let logical_plan = match ctx.read_csv(path, options).await?.to_logical_plan()? { + let logical_plan = match ctx.read_csv(path, options).await?.to_optimized_plan()? 
{ LogicalPlan::TableScan(ref scan) => { let mut scan = scan.clone(); scan.table_name = name.to_string(); @@ -2458,7 +2459,7 @@ mod tests { let path = format!("{}/csv/aggregate_test_100.csv", testdata); let options = CsvReadOptions::new().schema_infer_max_records(100); Ok(LogicalPlanBuilder::from( - ctx.read_csv(path, options).await?.to_logical_plan()?, + ctx.read_csv(path, options).await?.to_optimized_plan()?, )) } } diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs index 260b991f5307..922195572eba 100644 --- a/datafusion/core/tests/custom_sources.rs +++ b/datafusion/core/tests/custom_sources.rs @@ -208,7 +208,7 @@ async fn custom_source_dataframe() -> Result<()> { let ctx = SessionContext::new(); let table = ctx.read_table(Arc::new(CustomTableProvider))?; - let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan()?) + let logical_plan = LogicalPlanBuilder::from(table.to_optimized_plan()?) .project(vec![col("c2")])? .build()?; @@ -262,7 +262,7 @@ async fn optimizers_catch_all_statistics() { .unwrap(); let physical_plan = ctx - .create_physical_plan(&df.to_logical_plan().unwrap()) + .create_physical_plan(&df.to_optimized_plan().unwrap()) .await .unwrap(); diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index 40009bad634b..aeba5ee3c0dd 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -448,9 +448,8 @@ async fn parquet_statistics() -> Result<()> { .await; //// NO PROJECTION //// - let logical_plan = ctx.sql("SELECT * FROM t").await?.to_logical_plan()?; - - let physical_plan = ctx.create_physical_plan(&logical_plan).await?; + let dataframe = ctx.sql("SELECT * FROM t").await?; + let physical_plan = dataframe.create_physical_plan().await?; assert_eq!(physical_plan.schema().fields().len(), 4); let stat_cols = physical_plan @@ -466,12 +465,8 @@ async fn parquet_statistics() -> Result<()> { assert_eq!(stat_cols[3], ColumnStatistics::default()); //// WITH PROJECTION //// - let logical_plan = ctx - .sql("SELECT mycol, day FROM t WHERE day='28'") - .await? 
- .to_logical_plan()?; - - let physical_plan = ctx.create_physical_plan(&logical_plan).await?; + let dataframe = ctx.sql("SELECT mycol, day FROM t WHERE day='28'").await?; + let physical_plan = dataframe.create_physical_plan().await?; assert_eq!(physical_plan.schema().fields().len(), 2); let stat_cols = physical_plan diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index 2dd3a8dec8d3..ec44d28c6994 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -25,11 +25,8 @@ async fn csv_query_avg_multi_batch() -> Result<()> { let ctx = SessionContext::new(); register_aggregate_csv(&ctx).await?; let sql = "SELECT avg(c12) FROM aggregate_test_100"; - let plan = ctx.create_logical_plan(sql).unwrap(); - let plan = ctx.optimize(&plan).unwrap(); - let plan = ctx.create_physical_plan(&plan).await.unwrap(); - let task_ctx = ctx.task_ctx(); - let results = collect(plan, task_ctx).await.unwrap(); + let dataframe = ctx.sql(sql).await.unwrap(); + let results = dataframe.collect().await.unwrap(); let batch = &results[0]; let column = batch.column(0); let array = as_float64_array(column)?; diff --git a/datafusion/core/tests/sql/avro.rs b/datafusion/core/tests/sql/avro.rs index 38f4ae7dd808..202a175b2230 100644 --- a/datafusion/core/tests/sql/avro.rs +++ b/datafusion/core/tests/sql/avro.rs @@ -121,11 +121,8 @@ async fn avro_single_nan_schema() { .await .unwrap(); let sql = "SELECT mycol FROM single_nan"; - let plan = ctx.create_logical_plan(sql).unwrap(); - let plan = ctx.optimize(&plan).unwrap(); - let plan = ctx.create_physical_plan(&plan).await.unwrap(); - let runtime = ctx.task_ctx(); - let results = collect(plan, runtime).await.unwrap(); + let dataframe = ctx.sql(sql).await.unwrap(); + let results = dataframe.collect().await.unwrap(); for batch in results { assert_eq!(1, batch.num_rows()); assert_eq!(1, batch.num_columns()); diff --git a/datafusion/core/tests/sql/errors.rs b/datafusion/core/tests/sql/errors.rs index f3761320bf60..05d290a183eb 100644 --- a/datafusion/core/tests/sql/errors.rs +++ b/datafusion/core/tests/sql/errors.rs @@ -23,8 +23,8 @@ async fn csv_query_error() -> Result<()> { let ctx = create_ctx(); register_aggregate_csv(&ctx).await?; let sql = "SELECT sin(c1) FROM aggregate_test_100"; - let plan = ctx.create_logical_plan(sql); - assert!(plan.is_err()); + let result = ctx.sql(sql).await; + assert!(result.is_err()); Ok(()) } @@ -34,11 +34,8 @@ async fn test_cast_expressions_error() -> Result<()> { let ctx = create_ctx(); register_aggregate_csv(&ctx).await?; let sql = "SELECT CAST(c1 AS INT) FROM aggregate_test_100"; - let plan = ctx.create_logical_plan(sql).unwrap(); - let plan = ctx.optimize(&plan).unwrap(); - let plan = ctx.create_physical_plan(&plan).await.unwrap(); - let task_ctx = ctx.task_ctx(); - let result = collect(plan, task_ctx).await; + let dataframe = ctx.sql(sql).await.unwrap(); + let result = dataframe.collect().await; match result { Ok(_) => panic!("expected error"), @@ -58,10 +55,9 @@ async fn test_aggregation_with_bad_arguments() -> Result<()> { let ctx = SessionContext::new(); register_aggregate_csv(&ctx).await?; let sql = "SELECT COUNT(DISTINCT) FROM aggregate_test_100"; - let logical_plan = ctx.create_logical_plan(sql); - let err = logical_plan.unwrap_err(); + let err = ctx.sql(sql).await.unwrap_err().to_string(); assert_eq!( - err.to_string(), + err, DataFusionError::Plan( "The function Count expects 1 arguments, but 0 were provided".to_string() ) @@ 
-76,28 +72,25 @@ async fn query_cte_incorrect() -> Result<()> { // self reference let sql = "WITH t AS (SELECT * FROM t) SELECT * from u"; - let plan = ctx.create_logical_plan(sql); - assert!(plan.is_err()); + let err = ctx.sql(sql).await.unwrap_err().to_string(); assert_eq!( - format!("{}", plan.unwrap_err()), + err, "Error during planning: table 'datafusion.public.t' not found" ); // forward referencing let sql = "WITH t AS (SELECT * FROM u), u AS (SELECT 1) SELECT * from u"; - let plan = ctx.create_logical_plan(sql); - assert!(plan.is_err()); + let err = ctx.sql(sql).await.unwrap_err().to_string(); assert_eq!( - format!("{}", plan.unwrap_err()), + err, "Error during planning: table 'datafusion.public.u' not found" ); // wrapping should hide u let sql = "WITH t AS (WITH u as (SELECT 1) SELECT 1) SELECT * from u"; - let plan = ctx.create_logical_plan(sql); - assert!(plan.is_err()); + let err = ctx.sql(sql).await.unwrap_err().to_string(); assert_eq!( - format!("{}", plan.unwrap_err()), + err, "Error during planning: table 'datafusion.public.u' not found" ); @@ -148,31 +141,28 @@ async fn unsupported_sql_returns_error() -> Result<()> { register_aggregate_csv(&ctx).await?; // create view let sql = "create view test_view as select * from aggregate_test_100"; - let plan = ctx.create_logical_plan(sql); - let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await; - assert!(physical_plan.is_err()); + let dataframe = ctx.sql(sql).await.unwrap(); + let err = dataframe.create_physical_plan().await.unwrap_err(); assert_eq!( - format!("{}", physical_plan.unwrap_err()), + err.to_string(), "Internal error: Unsupported logical plan: CreateView. \ This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker" ); // // drop view let sql = "drop view test_view"; - let plan = ctx.create_logical_plan(sql); - let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await; - assert!(physical_plan.is_err()); + let dataframe = ctx.sql(sql).await.unwrap(); + let err = dataframe.create_physical_plan().await.unwrap_err(); assert_eq!( - format!("{}", physical_plan.unwrap_err()), + err.to_string(), "Internal error: Unsupported logical plan: DropView. \ This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker" ); // // drop table let sql = "drop table aggregate_test_100"; - let plan = ctx.create_logical_plan(sql); - let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await; - assert!(physical_plan.is_err()); + let dataframe = ctx.sql(sql).await.unwrap(); + let err = dataframe.create_physical_plan().await.unwrap_err(); assert_eq!( - format!("{}", physical_plan.unwrap_err()), + err.to_string(), "Internal error: Unsupported logical plan: DropTable. 
\ This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker" ); diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index b6bbb3cff16a..761b38e48db1 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -42,9 +42,8 @@ async fn explain_analyze_baseline_metrics() { SELECT lead(c1, 1) OVER () as cnt FROM (select 1 as c1) AS b \ LIMIT 3"; println!("running query: {}", sql); - let plan = ctx.create_logical_plan(sql).unwrap(); - let plan = ctx.optimize(&plan).unwrap(); - let physical_plan = ctx.create_physical_plan(&plan).await.unwrap(); + let dataframe = ctx.sql(sql).await.unwrap(); + let physical_plan = dataframe.create_physical_plan().await.unwrap(); let task_ctx = ctx.task_ctx(); let results = collect(physical_plan.clone(), task_ctx).await.unwrap(); let formatted = arrow::util::pretty::pretty_format_batches(&results) @@ -188,8 +187,10 @@ async fn csv_explain_plans() { // Logical plan // Create plan let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); - let logical_schema = plan.schema(); + let dataframe = ctx.sql(sql).await.expect(&msg); + let logical_schema = dataframe.schema(); + let plan = dataframe.logical_plan(); + // println!("SQL: {}", sql); // @@ -263,10 +264,10 @@ async fn csv_explain_plans() { // Optimized logical plan // let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan); - let plan = ctx.optimize(&plan).expect(&msg); + let plan = ctx.optimize(plan).expect(&msg); let optimized_logical_schema = plan.schema(); // Both schema has to be the same - assert_eq!(logical_schema.as_ref(), optimized_logical_schema.as_ref()); + assert_eq!(logical_schema, optimized_logical_schema.as_ref()); // // Verify schema let expected = vec![ @@ -411,8 +412,8 @@ async fn csv_explain_verbose_plans() { // Logical plan // Create plan let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); - let logical_schema = plan.schema(); + let dataframe = ctx.sql(sql).await.expect(&msg); + let logical_schema = dataframe.schema().clone(); // println!("SQL: {}", sql); @@ -424,7 +425,7 @@ async fn csv_explain_verbose_plans() { " Filter: aggregate_test_100.c2 > Int64(10) [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]", " TableScan: aggregate_test_100 [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]", ]; - let formatted = plan.display_indent_schema().to_string(); + let formatted = dataframe.logical_plan().display_indent_schema().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); assert_eq!( expected, actual, @@ -439,7 +440,7 @@ async fn csv_explain_verbose_plans() { " Filter: aggregate_test_100.c2 > Int64(10)", " TableScan: aggregate_test_100", ]; - let formatted = plan.display_indent().to_string(); + let formatted = dataframe.logical_plan().display_indent().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); assert_eq!( expected, actual, @@ -476,7 +477,7 @@ async fn csv_explain_verbose_plans() { "}", "// End DataFusion GraphViz Plan", ]; - let formatted = plan.display_graphviz().to_string(); + let formatted = dataframe.logical_plan().display_graphviz().to_string(); let actual: Vec<&str> = 
formatted.trim().lines().collect(); assert_eq!( expected, actual, @@ -486,11 +487,11 @@ async fn csv_explain_verbose_plans() { // Optimized logical plan // - let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan); - let plan = ctx.optimize(&plan).expect(&msg); + let msg = format!("Optimizing logical plan for '{}': {:?}", sql, dataframe); + let plan = dataframe.to_optimized_plan().expect(&msg); let optimized_logical_schema = plan.schema(); // Both schema has to be the same - assert_eq!(logical_schema.as_ref(), optimized_logical_schema.as_ref()); + assert_eq!(&logical_schema, optimized_logical_schema.as_ref()); // // Verify schema let expected = vec![ @@ -648,8 +649,8 @@ group by order by revenue desc;"; - let mut plan = ctx.create_logical_plan(sql); - plan = ctx.optimize(&plan.unwrap()); + let dataframe = ctx.sql(sql).await.unwrap(); + let plan = dataframe.to_optimized_plan().unwrap(); let expected = "\ Sort: revenue DESC NULLS FIRST\ @@ -664,7 +665,7 @@ order by \n Filter: lineitem.l_returnflag = Utf8(\"R\")\ \n TableScan: lineitem projection=[l_orderkey, l_extendedprice, l_discount, l_returnflag], partial_filters=[lineitem.l_returnflag = Utf8(\"R\")]\ \n TableScan: nation projection=[n_nationkey, n_name]"; - assert_eq!(expected, format!("{:?}", plan.unwrap())); + assert_eq!(expected, format!("{:?}", plan)); Ok(()) } @@ -681,10 +682,8 @@ async fn test_physical_plan_display_indent() { GROUP BY c1 \ ORDER BY the_min DESC \ LIMIT 10"; - let plan = ctx.create_logical_plan(sql).unwrap(); - let plan = ctx.optimize(&plan).unwrap(); - - let physical_plan = ctx.create_physical_plan(&plan).await.unwrap(); + let dataframe = ctx.sql(sql).await.unwrap(); + let physical_plan = dataframe.create_physical_plan().await.unwrap(); let expected = vec![ "GlobalLimitExec: skip=0, fetch=10", " SortPreservingMergeExec: [the_min@2 DESC]", @@ -728,10 +727,8 @@ async fn test_physical_plan_display_indent_multi_children() { ON c1=c2\ "; - let plan = ctx.create_logical_plan(sql).unwrap(); - let plan = ctx.optimize(&plan).unwrap(); - - let physical_plan = ctx.create_physical_plan(&plan).await.unwrap(); + let dataframe = ctx.sql(sql).await.unwrap(); + let physical_plan = dataframe.create_physical_plan().await.unwrap(); let expected = vec![ "ProjectionExec: expr=[c1@0 as c1]", " CoalesceBatchesExec: target_batch_size=4096", @@ -915,15 +912,9 @@ async fn explain_nested() { .set_bool(OPT_EXPLAIN_PHYSICAL_PLAN_ONLY, explain_phy_plan_flag); let ctx = SessionContext::with_config(config); let sql = "EXPLAIN explain select 1"; - let plan = ctx.create_logical_plan(sql).unwrap(); - - let plan = ctx.create_physical_plan(&plan).await; - - assert!(plan - .err() - .unwrap() - .to_string() - .contains("Explain must be root of the plan")); + let dataframe = ctx.sql(sql).await.unwrap(); + let err = dataframe.create_physical_plan().await.unwrap_err(); + assert!(err.to_string().contains("Explain must be root of the plan")); } test_nested_explain(true).await; diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index e3f92cbb3705..8f1b7134cf15 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -119,9 +119,8 @@ async fn equijoin_left_and_condition_from_right() -> Result<()> { let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?; let sql = "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t2_name >= 'y' ORDER BY t1_id"; - let res = ctx.create_logical_plan(sql); - assert!(res.is_ok()); - let actual = 
execute_to_batches(&ctx, sql).await; + let dataframe = ctx.sql(sql).await.unwrap(); + let actual = dataframe.collect().await.unwrap(); let expected = vec![ "+-------+---------+---------+", "| t1_id | t1_name | t2_name |", @@ -145,9 +144,8 @@ async fn equijoin_left_and_not_null_condition_from_right() -> Result<()> { let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?; let sql = "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t2_name is not null ORDER BY t1_id"; - let res = ctx.create_logical_plan(sql); - assert!(res.is_ok()); - let actual = execute_to_batches(&ctx, sql).await; + let dataframe = ctx.sql(sql).await.unwrap(); + let actual = dataframe.collect().await.unwrap(); let expected = vec![ "+-------+---------+---------+", "| t1_id | t1_name | t2_name |", @@ -172,9 +170,8 @@ async fn full_join_sub_query() -> Result<()> { let sql = " SELECT t1_id, t1_name, t2_name FROM (SELECT * from (t1) AS t1) FULL JOIN (SELECT * from (t2) AS t2) ON t1_id = t2_id AND t2_name >= 'y' ORDER BY t1_id, t2_name"; - let res = ctx.create_logical_plan(sql); - assert!(res.is_ok()); - let actual = execute_to_batches(&ctx, sql).await; + let dataframe = ctx.sql(sql).await.unwrap(); + let actual = dataframe.collect().await.unwrap(); let expected = vec![ "+-------+---------+---------+", "| t1_id | t1_name | t2_name |", @@ -200,9 +197,8 @@ async fn equijoin_right_and_condition_from_left() -> Result<()> { let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?; let sql = "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id AND t1_id >= 22 ORDER BY t2_name"; - let res = ctx.create_logical_plan(sql); - assert!(res.is_ok()); - let actual = execute_to_batches(&ctx, sql).await; + let dataframe = ctx.sql(sql).await.unwrap(); + let actual = dataframe.collect().await.unwrap(); let expected = vec![ "+-------+---------+---------+", "| t1_id | t1_name | t2_name |", @@ -272,9 +268,8 @@ async fn equijoin_right_and_condition_from_right() -> Result<()> { let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?; let sql = "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id AND t2_id >= 22 ORDER BY t2_name"; - let res = ctx.create_logical_plan(sql); - assert!(res.is_ok()); - let actual = execute_to_batches(&ctx, sql).await; + let dataframe = ctx.sql(sql).await.unwrap(); + let actual = dataframe.collect().await.unwrap(); let expected = vec![ "+-------+---------+---------+", "| t1_id | t1_name | t2_name |", @@ -297,9 +292,8 @@ async fn equijoin_right_and_condition_from_both() -> Result<()> { let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?; let sql = "SELECT t1_int, t2_int, t2_id FROM t1 RIGHT JOIN t2 ON t1_id = t2_id AND t2_int <= t1_int ORDER BY t2_id"; - let res = ctx.create_logical_plan(sql); - assert!(res.is_ok()); - let actual = execute_to_batches(&ctx, sql).await; + let dataframe = ctx.sql(sql).await.unwrap(); + let actual = dataframe.collect().await.unwrap(); let expected = vec![ "+--------+--------+-------+", "| t1_int | t2_int | t2_id |", @@ -1356,11 +1350,8 @@ async fn hash_join_with_date32() -> Result<()> { // inner join on hash supported data type (Date32) let sql = "select * from t1 join t2 on t1.c1 = t2.c1"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx - .create_logical_plan(&("explain ".to_owned() + sql)) - .expect(&msg); - let state = ctx.state(); - let plan = state.optimize(&plan)?; + let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); + let plan = 
dataframe.to_optimized_plan()?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", " Projection: t1.c1, t1.c2, t1.c3, t1.c4, t2.c1, t2.c2, t2.c3, t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", @@ -1398,11 +1389,8 @@ async fn hash_join_with_date64() -> Result<()> { // left join on hash supported data type (Date64) let sql = "select * from t1 left join t2 on t1.c2 = t2.c2"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx - .create_logical_plan(&("explain ".to_owned() + sql)) - .expect(&msg); - let state = ctx.state(); - let plan = state.optimize(&plan)?; + let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); + let plan = dataframe.to_optimized_plan()?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", " Projection: t1.c1, t1.c2, t1.c3, t1.c4, t2.c1, t2.c2, t2.c3, t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", @@ -1442,11 +1430,8 @@ async fn hash_join_with_decimal() -> Result<()> { // right join on hash supported data type (Decimal) let sql = "select * from t1 right join t2 on t1.c3 = t2.c3"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx - .create_logical_plan(&("explain ".to_owned() + sql)) - .expect(&msg); - let state = ctx.state(); - let plan = state.optimize(&plan)?; + let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); + let plan = dataframe.to_optimized_plan()?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", " Projection: t1.c1, t1.c2, t1.c3, t1.c4, t2.c1, t2.c2, t2.c3, t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", @@ -1486,11 +1471,8 @@ async fn hash_join_with_dictionary() -> Result<()> { // inner join on hash supported data type (Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))) let sql = "select * from t1 join t2 on t1.c4 = t2.c4"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx - .create_logical_plan(&("explain ".to_owned() + sql)) - .expect(&msg); - let state = ctx.state(); - let plan = state.optimize(&plan)?; + let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); + let plan = dataframe.to_optimized_plan()?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", " Projection: t1.c1, t1.c2, t1.c3, t1.c4, t2.c1, t2.c2, t2.c3, t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", @@ -1530,11 +1512,8 @@ async fn reduce_left_join_1() -> Result<()> { let sql = "select * from t1 left join t2 on t1.t1_id = t2.t2_id where t2.t2_id < 100"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx - .create_logical_plan(&("explain ".to_owned() + sql)) - .expect(&msg); - let state = ctx.state(); - let plan = state.optimize(&plan)?; + let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); + let plan = dataframe.to_optimized_plan()?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", " Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", @@ -1577,11 +1556,8 
@@ async fn reduce_left_join_2() -> Result<()> { // reduce to inner join let sql = "select * from t1 left join t2 on t1.t1_id = t2.t2_id where t2.t2_int < 10 or (t1.t1_int > 2 and t2.t2_name != 'w')"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx - .create_logical_plan(&("explain ".to_owned() + sql)) - .expect(&msg); - let state = ctx.state(); - let plan = state.optimize(&plan)?; + let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); + let plan = dataframe.to_optimized_plan()?; // filter expr: `t2.t2_int < 10 or (t1.t1_int > 2 and t2.t2_name != 'w')` // could be write to: `(t1.t1_int > 2 or t2.t2_int < 10) and (t2.t2_name != 'w' or t2.t2_int < 10)` @@ -1629,11 +1605,8 @@ async fn reduce_left_join_3() -> Result<()> { // reduce subquery to inner join let sql = "select * from (select t1.* from t1 left join t2 on t1.t1_id = t2.t2_id where t2.t2_int < 3) t3 left join t2 on t3.t1_int = t2.t2_int where t3.t1_id < 100"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx - .create_logical_plan(&("explain ".to_owned() + sql)) - .expect(&msg); - let state = ctx.state(); - let plan = state.optimize(&plan)?; + let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); + let plan = dataframe.to_optimized_plan()?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", " Projection: t3.t1_id, t3.t1_name, t3.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", @@ -1678,11 +1651,8 @@ async fn reduce_right_join_1() -> Result<()> { // reduce to inner join let sql = "select * from t1 right join t2 on t1.t1_id = t2.t2_id where t1.t1_int is not null"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx - .create_logical_plan(&("explain ".to_owned() + sql)) - .expect(&msg); - let state = ctx.state(); - let plan = state.optimize(&plan)?; + let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); + let plan = dataframe.to_optimized_plan()?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", " Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", @@ -1724,11 +1694,8 @@ async fn reduce_right_join_2() -> Result<()> { // reduce to inner join let sql = "select * from t1 right join t2 on t1.t1_id = t2.t2_id where not(t1.t1_int = t2.t2_int)"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx - .create_logical_plan(&("explain ".to_owned() + sql)) - .expect(&msg); - let state = ctx.state(); - let plan = state.optimize(&plan)?; + let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); + let plan = dataframe.to_optimized_plan()?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", " Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", @@ -1770,11 +1737,8 @@ async fn reduce_full_join_to_right_join() -> Result<()> { // reduce to right join let sql = "select * from t1 full join t2 on t1.t1_id = t2.t2_id where t2.t2_name is not null"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx - .create_logical_plan(&("explain ".to_owned() + sql)) - .expect(&msg); - let state = ctx.state(); - let plan = state.optimize(&plan)?; + let dataframe = ctx.sql(&("explain ".to_owned() + 
sql)).await.expect(&msg); + let plan = dataframe.to_optimized_plan()?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", " Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", @@ -1818,11 +1782,8 @@ async fn reduce_full_join_to_left_join() -> Result<()> { let sql = "select * from t1 full join t2 on t1.t1_id = t2.t2_id where t1.t1_name != 'b'"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx - .create_logical_plan(&("explain ".to_owned() + sql)) - .expect(&msg); - let state = ctx.state(); - let plan = state.optimize(&plan)?; + let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); + let plan = dataframe.to_optimized_plan()?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", " Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", @@ -1863,11 +1824,8 @@ async fn reduce_full_join_to_inner_join() -> Result<()> { // reduce to inner join let sql = "select * from t1 full join t2 on t1.t1_id = t2.t2_id where t1.t1_name != 'b' and t2.t2_name = 'x'"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx - .create_logical_plan(&("explain ".to_owned() + sql)) - .expect(&msg); - let state = ctx.state(); - let plan = state.optimize(&plan)?; + let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); + let plan = dataframe.to_optimized_plan()?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", " Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", @@ -1930,10 +1888,8 @@ async fn sort_merge_join_on_date32() -> Result<()> { // inner sort merge join on data type (Date32) let sql = "select * from t1 join t2 on t1.c1 = t2.c1"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); - let state = ctx.state(); - let logical_plan = state.optimize(&plan)?; - let physical_plan = state.create_physical_plan(&logical_plan).await?; + let dataframe = ctx.sql(sql).await.expect(&msg); + let physical_plan = dataframe.create_physical_plan().await?; let expected = vec![ "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, c1@4 as c1, c2@5 as c2, c3@6 as c3, c4@7 as c4]", " SortMergeJoin: join_type=Inner, on=[(Column { name: \"c1\", index: 0 }, Column { name: \"c1\", index: 0 })]", @@ -1978,10 +1934,8 @@ async fn sort_merge_join_on_decimal() -> Result<()> { // right join on data type (Decimal) let sql = "select * from t1 right join t2 on t1.c3 = t2.c3"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); - let state = ctx.state(); - let logical_plan = state.optimize(&plan)?; - let physical_plan = state.create_physical_plan(&logical_plan).await?; + let dataframe = ctx.sql(sql).await.expect(&msg); + let physical_plan = dataframe.create_physical_plan().await?; let expected = vec![ "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, c1@4 as c1, c2@5 as c2, c3@6 as c3, c4@7 as c4]", " SortMergeJoin: join_type=Right, on=[(Column { name: \"c3\", index: 2 }, Column { name: \"c3\", index: 2 })]", @@ -2034,10 +1988,8 @@ async fn left_semi_join() -> Result<()> { let sql = "SELECT t1_id, t1_name FROM 
t1 WHERE t1_id IN (SELECT t2_id FROM t2) ORDER BY t1_id"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); - let state = ctx.state(); - let logical_plan = state.optimize(&plan)?; - let physical_plan = state.create_physical_plan(&logical_plan).await?; + let dataframe = ctx.sql(sql).await.expect(&msg); + let physical_plan = dataframe.create_physical_plan().await?; let expected = if repartition_joins { vec![ "SortExec: [t1_id@0 ASC NULLS LAST]", @@ -2225,10 +2177,8 @@ async fn right_semi_join() -> Result<()> { let sql = "SELECT t1_id, t1_name, t1_int FROM t1 WHERE EXISTS (SELECT * FROM t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); - let state = ctx.state(); - let logical_plan = state.optimize(&plan)?; - let physical_plan = state.create_physical_plan(&logical_plan).await?; + let dataframe = ctx.sql(sql).await.expect(&msg); + let physical_plan = dataframe.create_physical_plan().await?; let expected = if repartition_joins { vec![ "SortExec: [t1_id@0 ASC NULLS LAST]", " CoalescePartitionsExec", @@ -2314,11 +2264,8 @@ async fn reduce_cross_join_with_expr_join_key_all() -> Result<()> { // reduce to inner join let sql = "select * from t1 cross join t2 where t1.t1_id + 12 = t2.t2_id + 1"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx - .create_logical_plan(&("explain ".to_owned() + sql)) - .expect(&msg); - let state = ctx.state(); - let plan = state.optimize(&plan)?; + let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); + let plan = dataframe.to_optimized_plan()?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", " Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", @@ -2360,11 +2307,8 @@ async fn reduce_cross_join_with_cast_expr_join_key() -> Result<()> { let sql = "select t1.t1_id, t2.t2_id, t1.t1_name from t1 cross join t2 where t1.t1_id + 11 = cast(t2.t2_id as BIGINT)"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx - .create_logical_plan(&("explain ".to_owned() + sql)) - .expect(&msg); - let state = ctx.state(); - let plan = state.optimize(&plan)?; + let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); + let plan = dataframe.to_optimized_plan()?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", " Projection: t1.t1_id, t2.t2_id, t1.t1_name [t1_id:UInt32;N, t2_id:UInt32;N, t1_name:Utf8;N]", @@ -2407,11 +2351,8 @@ async fn reduce_cross_join_with_wildcard_and_expr() -> Result<()> { // assert logical plan let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx - .create_logical_plan(&("explain ".to_owned() + sql)) - .expect(&msg); - let state = ctx.state(); - let plan = state.optimize(&plan)?; + let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); + let plan = dataframe.to_optimized_plan()?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", @@ -2431,10 +2372,8 @@ async fn reduce_cross_join_with_wildcard_and_expr() -> Result<()> { // assert physical plan let msg = format!("Creating physical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); - let state = ctx.state(); - let logical_plan = state.optimize(&plan)?; - let physical_plan = 
state.create_physical_plan(&logical_plan).await?; + let dataframe = ctx.sql(sql).await.expect(&msg); + let physical_plan = dataframe.create_physical_plan().await?; let expected = if repartition_joins { vec![ "ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int, CAST(t1_id@0 AS Int64) + 11 as t1.t1_id + Int64(11)]", @@ -2505,10 +2444,8 @@ async fn both_side_expr_key_inner_join() -> Result<()> { ON t1.t1_id + cast(12 as INT UNSIGNED) = t2.t2_id + cast(1 as INT UNSIGNED)"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); - let state = ctx.state(); - let logical_plan = state.optimize(&plan)?; - let physical_plan = state.create_physical_plan(&logical_plan).await?; + let dataframe = ctx.sql(sql).await.expect(&msg); + let physical_plan = dataframe.create_physical_plan().await?; let expected = if repartition_joins { vec![ @@ -2579,10 +2516,8 @@ async fn left_side_expr_key_inner_join() -> Result<()> { ON t1.t1_id + cast(11 as INT UNSIGNED) = t2.t2_id"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); - let state = ctx.state(); - let logical_plan = state.optimize(&plan)?; - let physical_plan = state.create_physical_plan(&logical_plan).await?; + let dataframe = ctx.sql(sql).await.expect(&msg); + let physical_plan = dataframe.create_physical_plan().await?; let expected = if repartition_joins { vec![ @@ -2651,10 +2586,8 @@ async fn right_side_expr_key_inner_join() -> Result<()> { ON t1.t1_id = t2.t2_id - cast(11 as INT UNSIGNED)"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); - let state = ctx.state(); - let logical_plan = state.optimize(&plan)?; - let physical_plan = state.create_physical_plan(&logical_plan).await?; + let dataframe = ctx.sql(sql).await.expect(&msg); + let physical_plan = dataframe.create_physical_plan().await?; let expected = if repartition_joins { vec![ @@ -2721,10 +2654,8 @@ async fn select_wildcard_with_expr_key_inner_join() -> Result<()> { ON t1.t1_id = t2.t2_id - cast(11 as INT UNSIGNED)"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); - let state = ctx.state(); - let logical_plan = state.optimize(&plan)?; - let physical_plan = state.create_physical_plan(&logical_plan).await?; + let dataframe = ctx.sql(sql).await.expect(&msg); + let physical_plan = dataframe.create_physical_plan().await?; let expected = if repartition_joins { vec![ diff --git a/datafusion/core/tests/sql/json.rs b/datafusion/core/tests/sql/json.rs index e66fcb65561a..b33c0aa46af7 100644 --- a/datafusion/core/tests/sql/json.rs +++ b/datafusion/core/tests/sql/json.rs @@ -60,11 +60,8 @@ async fn json_single_nan_schema() { .await .unwrap(); let sql = "SELECT mycol FROM single_nan"; - let plan = ctx.create_logical_plan(sql).unwrap(); - let plan = ctx.optimize(&plan).unwrap(); - let plan = ctx.create_physical_plan(&plan).await.unwrap(); - let task_ctx = ctx.task_ctx(); - let results = collect(plan, task_ctx).await.unwrap(); + let dataframe = ctx.sql(sql).await.unwrap(); + let results = dataframe.collect().await.unwrap(); for batch in results { assert_eq!(1, batch.num_rows()); assert_eq!(1, batch.num_columns()); diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 
d97eb04bd7fe..21117b2f8d23 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -1020,61 +1020,20 @@ async fn try_execute_to_batches(
     ctx: &SessionContext,
     sql: &str,
 ) -> Result<Vec<RecordBatch>> {
-    let plan = ctx.create_logical_plan(sql)?;
-    let logical_schema = plan.schema();
+    let dataframe = ctx.sql(sql).await?;
+    let logical_schema = dataframe.schema().clone();
 
-    let plan = ctx.optimize(&plan)?;
-    let optimized_logical_schema = plan.schema();
+    let optimized = ctx.optimize(dataframe.logical_plan())?;
+    let optimized_logical_schema = optimized.schema();
+    let results = dataframe.collect().await?;
 
-    let plan = ctx.create_physical_plan(&plan).await?;
-
-    let task_ctx = ctx.task_ctx();
-    let results = collect(plan, task_ctx).await?;
-
-    assert_eq!(logical_schema.as_ref(), optimized_logical_schema.as_ref());
+    assert_eq!(&logical_schema, optimized_logical_schema.as_ref());
     Ok(results)
 }
 
 /// Execute query and return results as a Vec of RecordBatches
 async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> Vec<RecordBatch> {
-    let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx
-        .create_logical_plan(sql)
-        .map_err(|e| format!("{:?} at {}", e, msg))
-        .unwrap();
-    let logical_schema = plan.schema();
-
-    // We are not really interested in the direct output of optimized_logical_plan
-    // since the physical plan construction already optimizes the given logical plan
-    // and we want to avoid double-optimization as a consequence. So we just construct
-    // it here to make sure that it doesn't fail at this step and get the optimized
-    // schema (to assert later that the logical and optimized schemas are the same).
-    let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan);
-    let optimized_logical_plan = ctx
-        .optimize(&plan)
-        .map_err(|e| format!("{:?} at {}", e, msg))
-        .unwrap();
-    let optimized_logical_schema = optimized_logical_plan.schema();
-
-    let msg = format!(
-        "Creating physical plan for '{}': {:?}",
-        sql, optimized_logical_plan
-    );
-    let plan = ctx
-        .create_physical_plan(&plan)
-        .await
-        .map_err(|e| format!("{:?} at {}", e, msg))
-        .unwrap();
-
-    let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
-    let task_ctx = ctx.task_ctx();
-    let results = collect(plan, task_ctx)
-        .await
-        .map_err(|e| format!("{:?} at {}", e, msg))
-        .unwrap();
-
-    assert_eq!(logical_schema.as_ref(), optimized_logical_schema.as_ref());
-    results
+    ctx.sql(sql).await.unwrap().collect().await.unwrap()
 }
 
 /// Execute query and return result set as 2-d table of Vecs
@@ -1566,12 +1525,13 @@ async fn nyc() -> Result<()> {
     )
     .await?;
 
-    let logical_plan = ctx.create_logical_plan(
-        "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount) \
+    let dataframe = ctx
+        .sql(
+            "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount) \
          FROM tripdata GROUP BY passenger_count",
-    )?;
-
-    let optimized_plan = ctx.optimize(&logical_plan)?;
+        )
+        .await?;
+    let optimized_plan = dataframe.to_optimized_plan().unwrap();
 
     match &optimized_plan {
         LogicalPlan::Projection(Projection { input, .. }) => match input.as_ref() {
diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs
index e2a33c7c29a1..a7a5dad76bfd 100644
--- a/datafusion/core/tests/sql/parquet.rs
+++ b/datafusion/core/tests/sql/parquet.rs
@@ -157,11 +157,8 @@ async fn fixed_size_binary_columns() {
         .await
         .unwrap();
     let sql = "SELECT ids FROM t0 ORDER BY ids";
-    let plan = ctx.create_logical_plan(sql).unwrap();
-    let plan = ctx.optimize(&plan).unwrap();
-    let plan = ctx.create_physical_plan(&plan).await.unwrap();
-    let task_ctx = ctx.task_ctx();
-    let results = collect(plan, task_ctx).await.unwrap();
+    let dataframe = ctx.sql(sql).await.unwrap();
+    let results = dataframe.collect().await.unwrap();
     for batch in results {
         assert_eq!(466, batch.num_rows());
         assert_eq!(1, batch.num_columns());
@@ -180,11 +177,8 @@ async fn parquet_single_nan_schema() {
         .await
         .unwrap();
     let sql = "SELECT mycol FROM single_nan";
-    let plan = ctx.create_logical_plan(sql).unwrap();
-    let plan = ctx.optimize(&plan).unwrap();
-    let plan = ctx.create_physical_plan(&plan).await.unwrap();
-    let task_ctx = ctx.task_ctx();
-    let results = collect(plan, task_ctx).await.unwrap();
+    let dataframe = ctx.sql(sql).await.unwrap();
+    let results = dataframe.collect().await.unwrap();
     for batch in results {
         assert_eq!(1, batch.num_rows());
         assert_eq!(1, batch.num_columns());
@@ -218,11 +212,8 @@ async fn parquet_list_columns() {
     ]));
 
     let sql = "SELECT int64_list, utf8_list FROM list_columns";
-    let plan = ctx.create_logical_plan(sql).unwrap();
-    let plan = ctx.optimize(&plan).unwrap();
-    let plan = ctx.create_physical_plan(&plan).await.unwrap();
-    let task_ctx = ctx.task_ctx();
-    let results = collect(plan, task_ctx).await.unwrap();
+    let dataframe = ctx.sql(sql).await.unwrap();
+    let results = dataframe.collect().await.unwrap();
 
     //   int64_list         utf8_list
     // 0  [1, 2, 3]         [abc, efg, hij]
diff --git a/datafusion/core/tests/sql/predicates.rs b/datafusion/core/tests/sql/predicates.rs
index 0510b12637b2..aa52044ba0b8 100644
--- a/datafusion/core/tests/sql/predicates.rs
+++ b/datafusion/core/tests/sql/predicates.rs
@@ -581,9 +581,8 @@ async fn multiple_or_predicates() -> Result<()> {
         and p_size between 1 and 15
         )";
     let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx.create_logical_plan(sql).expect(&msg);
-    let state = ctx.state();
-    let plan = state.optimize(&plan)?;
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let plan = dataframe.to_optimized_plan().unwrap();
     // Note that we expect `part.p_partkey = lineitem.l_partkey` to have been
     // factored out and appear only once in the following plan
     let expected = vec![
@@ -644,10 +643,8 @@ where
     group by p_partkey
     ;"#;
 
-    let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx.create_logical_plan(sql).expect(&msg);
-    let state = ctx.state();
-    let plan = state.optimize(&plan)?;
+    let dataframe = ctx.sql(sql).await.unwrap();
+    let plan = dataframe.to_optimized_plan().unwrap();
     let formatted = plan.display_indent_schema().to_string();
     let actual: Vec<&str> = formatted.trim().lines().collect();
     let expected = vec![
diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs
index 3163482e8b4e..5326aaf5d11f 100644
--- a/datafusion/core/tests/sql/projection.rs
+++ b/datafusion/core/tests/sql/projection.rs
@@ -168,7 +168,7 @@ async fn projection_on_table_scan() -> Result<()> {
     let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
 
     let table = ctx.table("test")?;
-    let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan()?)
+    let logical_plan = LogicalPlanBuilder::from(table.to_optimized_plan()?)
         .project(vec![col("c2")])?
         .build()?;
 
diff --git a/datafusion/core/tests/sql/references.rs b/datafusion/core/tests/sql/references.rs
index 52a82f071ff2..1a7866917f35 100644
--- a/datafusion/core/tests/sql/references.rs
+++ b/datafusion/core/tests/sql/references.rs
@@ -64,7 +64,7 @@ async fn qualified_table_references_and_fields() -> Result<()> {
 
     // referring to the unquoted column is an error
     let sql = r#"SELECT f1.c1 from test"#;
-    let error = ctx.create_logical_plan(sql).unwrap_err();
+    let error = ctx.sql(sql).await.unwrap_err();
     assert_contains!(
         error.to_string(),
         "No field named 'f1'.'c1'. Valid fields are 'test'.'f.c1', 'test'.'test.c2'"
diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs
index e7d6beedf464..135550d174df 100644
--- a/datafusion/core/tests/sql/select.rs
+++ b/datafusion/core/tests/sql/select.rs
@@ -16,10 +16,7 @@
 // under the License.
 
 use super::*;
-use datafusion::{
-    datasource::empty::EmptyTable, from_slice::FromSlice,
-    physical_plan::collect_partitioned,
-};
+use datafusion::{datasource::empty::EmptyTable, from_slice::FromSlice};
 use datafusion_common::ScalarValue;
 use tempfile::TempDir;
 
@@ -28,38 +25,31 @@ async fn select_values_list() -> Result<()> {
     let ctx = SessionContext::new();
     {
         let sql = "VALUES";
-        let plan = ctx.create_logical_plan(sql);
-        assert!(plan.is_err());
+        ctx.sql(sql).await.unwrap_err();
     }
     {
         let sql = "VALUES ()";
-        let plan = ctx.create_logical_plan(sql);
-        assert!(plan.is_err());
+        ctx.sql(sql).await.unwrap_err();
     }
     {
         let sql = "VALUES (1),()";
-        let plan = ctx.create_logical_plan(sql);
-        assert!(plan.is_err());
+        ctx.sql(sql).await.unwrap_err();
     }
     {
         let sql = "VALUES (1),(1,2)";
-        let plan = ctx.create_logical_plan(sql);
-        assert!(plan.is_err());
+        ctx.sql(sql).await.unwrap_err();
    }
     {
         let sql = "VALUES (1),('2')";
-        let plan = ctx.create_logical_plan(sql);
-        assert!(plan.is_err());
+        ctx.sql(sql).await.unwrap_err();
     }
     {
         let sql = "VALUES (1),(2.0)";
-        let plan = ctx.create_logical_plan(sql);
-        assert!(plan.is_err());
+        ctx.sql(sql).await.unwrap_err();
     }
     {
         let sql = "VALUES (1,2), (1,'2')";
-        let plan = ctx.create_logical_plan(sql);
-        assert!(plan.is_err());
+        ctx.sql(sql).await.unwrap_err();
     }
     {
         let sql = "VALUES (1,'a'),(NULL,'b'),(3,'c')";
@@ -1275,32 +1265,14 @@ async fn test_prepare_statement() -> Result<()> {
 
     // sql to statement then to prepare logical plan with parameters
    // c1 defined as UINT32, c2 defined as UInt64 but the params are Int32 and Float64
-    let logical_plan =
-        ctx.create_logical_plan("PREPARE my_plan(INT, DOUBLE) AS SELECT c1, c2 FROM test WHERE c1 > $2 AND c1 < $1")?;
+    let dataframe =
+        ctx.sql("PREPARE my_plan(INT, DOUBLE) AS SELECT c1, c2 FROM test WHERE c1 > $2 AND c1 < $1").await?;
 
     // prepare logical plan to logical plan without parameters
     let param_values = vec![ScalarValue::Int32(Some(3)), ScalarValue::Float64(Some(0.0))];
-    let logical_plan = logical_plan.with_param_values(param_values)?;
+    let dataframe = dataframe.with_param_values(param_values)?;
+    let results = dataframe.collect().await?;
 
-    // logical plan to optimized logical plan
-    let logical_plan = ctx.optimize(&logical_plan)?;
-
-    // optimized logical plan to physical plan
-    let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
-
-    let task_ctx = ctx.task_ctx();
-    let results = collect_partitioned(physical_plan, task_ctx).await?;
-
-    // note that the order of partitions is not deterministic
-    let mut num_rows = 0;
-    for partition in &results {
-        for batch in partition {
-            num_rows += batch.num_rows();
-        }
-    }
-    assert_eq!(20, num_rows);
-
-    let results: Vec<RecordBatch> = results.into_iter().flatten().collect();
     let expected = vec![
         "+----+----+",
         "| c1 | c2 |",
@@ -1338,25 +1310,10 @@ async fn parallel_query_with_filter() -> Result<()> {
     let partition_count = 4;
     let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
 
-    let logical_plan =
-        ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")?;
-    let logical_plan = ctx.optimize(&logical_plan)?;
-
-    let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
-
-    let task_ctx = ctx.task_ctx();
-    let results = collect_partitioned(physical_plan, task_ctx).await?;
-
-    // note that the order of partitions is not deterministic
-    let mut num_rows = 0;
-    for partition in &results {
-        for batch in partition {
-            num_rows += batch.num_rows();
-        }
-    }
-    assert_eq!(20, num_rows);
-
-    let results: Vec<RecordBatch> = results.into_iter().flatten().collect();
+    let dataframe = ctx
+        .sql("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")
+        .await?;
+    let results = dataframe.collect().await.unwrap();
     let expected = vec![
         "+----+----+",
         "| c1 | c2 |",
@@ -1472,7 +1429,7 @@ async fn unprojected_filter() {
         .select(vec![col("i") + col("i")])
         .unwrap();
 
-    let plan = df.clone().to_logical_plan().unwrap();
+    let plan = df.clone().to_optimized_plan().unwrap();
     println!("{}", plan.display_indent());
 
     let results = df.collect().await.unwrap();
@@ -1499,8 +1456,7 @@ async fn case_sensitive_in_default_dialect() {
 
     {
         let sql = "select \"int32\" from t";
-        let plan = ctx.create_logical_plan(sql);
-        assert!(plan.is_err());
+        ctx.sql(sql).await.unwrap_err();
     }
 
     {
diff --git a/datafusion/core/tests/sql/subqueries.rs b/datafusion/core/tests/sql/subqueries.rs
index e6c98edf5986..c81b9d52f609 100644
--- a/datafusion/core/tests/sql/subqueries.rs
+++ b/datafusion/core/tests/sql/subqueries.rs
@@ -45,10 +45,10 @@ where c_acctbal < (
     ) order by c_custkey;"#;
 
     // assert plan
-    let plan = ctx.create_logical_plan(sql).unwrap();
-    debug!("input:\n{}", plan.display_indent());
+    let dataframe = ctx.sql(sql).await.unwrap();
+    debug!("input:\n{}", dataframe.logical_plan().display_indent());
 
-    let plan = ctx.optimize(&plan).unwrap();
+    let plan = dataframe.to_optimized_plan().unwrap();
     let actual = format!("{}", plan.display_indent());
     let expected = "Sort: customer.c_custkey ASC NULLS LAST\
     \n  Projection: customer.c_custkey\
@@ -91,8 +91,8 @@ where o_orderstatus in (
     );"#;
 
     // assert plan
-    let plan = ctx.create_logical_plan(sql).unwrap();
-    let plan = ctx.optimize(&plan).unwrap();
+    let dataframe = ctx.sql(sql).await.unwrap();
+    let plan = dataframe.to_optimized_plan().unwrap();
     let actual = format!("{}", plan.display_indent());
     let expected = "Projection: orders.o_orderkey\
     \n  LeftSemi Join: orders.o_orderstatus = __sq_1.l_linestatus, orders.o_orderkey = __sq_1.l_orderkey\
@@ -137,8 +137,8 @@ where p_partkey = ps_partkey and s_suppkey = ps_suppkey and p_size = 15 and p_ty
     order by s_acctbal desc, n_name, s_name, p_partkey;"#;
 
     // assert plan
-    let plan = ctx.create_logical_plan(sql).unwrap();
-    let plan = ctx.optimize(&plan).unwrap();
+    let dataframe = ctx.sql(sql).await.unwrap();
+    let plan = dataframe.to_optimized_plan().unwrap();
     let actual = format!("{}", plan.display_indent());
     let expected = "Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST\
     \n  Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment\
@@ -201,8 +201,8 @@ async fn tpch_q4_correlated() -> Result<()> {
     "#;
 
     // assert plan
-    let plan = ctx.create_logical_plan(sql).unwrap();
-    let plan = ctx.optimize(&plan).unwrap();
+    let dataframe = ctx.sql(sql).await.unwrap();
+    let plan = dataframe.to_optimized_plan().unwrap();
     let actual = format!("{}", plan.display_indent());
     let expected = r#"Sort: orders.o_orderpriority ASC NULLS LAST
   Projection: orders.o_orderpriority, COUNT(UInt8(1)) AS order_count
@@ -252,15 +252,8 @@ async fn tpch_q17_correlated() -> Result<()> {
     );"#;
 
     // assert plan
-    let plan = ctx
-        .create_logical_plan(sql)
-        .map_err(|e| format!("{:?} at {}", e, "error"))
-        .unwrap();
-    println!("before:\n{}", plan.display_indent());
-    let plan = ctx
-        .optimize(&plan)
-        .map_err(|e| format!("{:?} at {}", e, "error"))
-        .unwrap();
+    let dataframe = ctx.sql(sql).await.unwrap();
+    let plan = dataframe.to_optimized_plan().unwrap();
     let actual = format!("{}", plan.display_indent());
     let expected = r#"Projection: CAST(SUM(lineitem.l_extendedprice) AS Decimal128(38, 33)) / CAST(Float64(7) AS Decimal128(38, 33)) AS avg_yearly
   Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]]
@@ -313,14 +306,8 @@ order by s_name;
     "#;
 
     // assert plan
-    let plan = ctx
-        .create_logical_plan(sql)
-        .map_err(|e| format!("{:?} at {}", e, "error"))
-        .unwrap();
-    let plan = ctx
-        .optimize(&plan)
-        .map_err(|e| format!("{:?} at {}", e, "error"))
-        .unwrap();
+    let dataframe = ctx.sql(sql).await.unwrap();
+    let plan = dataframe.to_optimized_plan().unwrap();
     let actual = format!("{}", plan.display_indent());
     let expected = "Sort: supplier.s_name ASC NULLS LAST\
     \n  Projection: supplier.s_name, supplier.s_address\
@@ -374,14 +361,8 @@ group by cntrycode
     order by cntrycode;"#;
 
     // assert plan
-    let plan = ctx
-        .create_logical_plan(sql)
-        .map_err(|e| format!("{:?} at {}", e, "error"))
-        .unwrap();
-    let plan = ctx
-        .optimize(&plan)
-        .map_err(|e| format!("{:?} at {}", e, "error"))
-        .unwrap();
+    let dataframe = ctx.sql(sql).await.unwrap();
+    let plan = dataframe.to_optimized_plan().unwrap();
     let actual = format!("{}", plan.display_indent());
     let expected = "Sort: custsale.cntrycode ASC NULLS LAST\
     \n  Projection: custsale.cntrycode, COUNT(UInt8(1)) AS numcust, SUM(custsale.c_acctbal) AS totacctbal\
@@ -436,14 +417,8 @@ order by value desc;
     "#;
 
     // assert plan
-    let plan = ctx
-        .create_logical_plan(sql)
-        .map_err(|e| format!("{:?} at {}", e, "error"))
-        .unwrap();
-    let plan = ctx
-        .optimize(&plan)
-        .map_err(|e| format!("{:?} at {}", e, "error"))
-        .unwrap();
+    let dataframe = ctx.sql(sql).await.unwrap();
+    let plan = dataframe.to_optimized_plan().unwrap();
     let actual = format!("{}", plan.display_indent());
     let expected = "Sort: value DESC NULLS FIRST\
     \n  Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value\
diff --git a/datafusion/core/tests/sql/udf.rs b/datafusion/core/tests/sql/udf.rs
index 3a150dc7c85f..6918ff69042b 100644
--- a/datafusion/core/tests/sql/udf.rs
+++ b/datafusion/core/tests/sql/udf.rs
@@ -77,7 +77,7 @@ async fn scalar_udf() -> Result<()> {
 
     let t = ctx.table("t")?;
 
-    let plan = LogicalPlanBuilder::from(t.to_logical_plan()?)
+    let plan = LogicalPlanBuilder::from(t.to_optimized_plan()?)
         .project(vec![
             col("a"),
             col("b"),
@@ -169,8 +169,8 @@ async fn simple_udaf() -> Result<()> {
     Ok(())
 }
 
-#[test]
-fn udaf_as_window_func() -> Result<()> {
+#[tokio::test]
+async fn udaf_as_window_func() -> Result<()> {
     #[derive(Debug)]
     struct MyAccumulator;
 
@@ -222,7 +222,7 @@ fn udaf_as_window_func() -> Result<()> {
   WindowAggr: windowExpr=[[AggregateUDF { name: "my_acc", signature: Signature { type_signature: Exact([Int32]), volatility: Immutable }, fun: "<FUNC>" }(my_table.b) PARTITION BY [my_table.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
     TableScan: my_table"#;
 
-    let plan = context.create_logical_plan(sql)?;
+    let plan = context.sql(sql).await.unwrap();
     assert_eq!(format!("{:?}", plan), expected);
     Ok(())
 }
diff --git a/datafusion/core/tests/sql/wildcard.rs b/datafusion/core/tests/sql/wildcard.rs
index ddf50dfa813f..a55ccb80f282 100644
--- a/datafusion/core/tests/sql/wildcard.rs
+++ b/datafusion/core/tests/sql/wildcard.rs
@@ -136,14 +136,8 @@ async fn select_wrong_qualified_wildcard() -> Result<()> {
     register_aggregate_simple_csv(&ctx).await?;
 
     let sql = "SELECT agg.* FROM aggregate_simple order by c1";
-    let result = ctx.create_logical_plan(sql);
-    match result {
-        Ok(_) => panic!("unexpected OK"),
-        Err(err) => assert_eq!(
-            err.to_string(),
-            "Error during planning: Invalid qualifier agg"
-        ),
-    };
+    let err = ctx.sql(sql).await.unwrap_err().to_string();
+    assert_eq!(err, "Error during planning: Invalid qualifier agg");
 
     Ok(())
 }
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index 72af7ae8a7c5..3d2948e17b50 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -325,11 +325,8 @@ async fn window_expr_eliminate() -> Result<()> {
         ORDER BY d.b;";
 
     let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx
-        .create_logical_plan(&("explain ".to_owned() + sql))
-        .expect(&msg);
-    let state = ctx.state();
-    let plan = state.optimize(&plan)?;
+    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
+    let plan = dataframe.to_optimized_plan().unwrap();
     let expected = vec![
         "Explain [plan_type:Utf8, plan:Utf8]",
         "  Sort: d.b ASC NULLS LAST [b:Utf8, max_a:Int64;N]",
@@ -391,10 +388,8 @@ async fn window_expr_eliminate() -> Result<()> {
         GROUP BY d.b
         ORDER BY d.b;";
 
-    let plan = ctx
-        .create_logical_plan(&("explain ".to_owned() + sql))
-        .expect(&msg);
-    let plan = state.optimize(&plan)?;
+    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
+    let plan = dataframe.to_optimized_plan().unwrap();
     let expected = vec![
         "Explain [plan_type:Utf8, plan:Utf8]",
         "  Sort: d.b ASC NULLS LAST [b:Utf8, max_a:Int64;N, MAX(d.seq):UInt64;N]",
@@ -1621,10 +1616,8 @@ async fn test_window_agg_sort() -> Result<()> {
       FROM aggregate_test_100";
 
     let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx.create_logical_plan(sql).expect(&msg);
-    let state = ctx.state();
-    let logical_plan = state.optimize(&plan)?;
-    let physical_plan = state.create_physical_plan(&logical_plan).await?;
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
     let formatted = displayable(physical_plan.as_ref()).indent().to_string();
     // Only 1 SortExec was added
     let expected = {
@@ -1655,10 +1648,8 @@ async fn over_order_by_sort_keys_sorting_prefix_compacting() -> Result<()> {
     let sql = "SELECT c2, MAX(c9) OVER (ORDER BY c2), SUM(c9) OVER (), MIN(c9) OVER (ORDER BY c2, c9) from aggregate_test_100";
 
     let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx.create_logical_plan(sql).expect(&msg);
-    let state = ctx.state();
-    let logical_plan = state.optimize(&plan)?;
-    let physical_plan = state.create_physical_plan(&logical_plan).await?;
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
     let formatted = displayable(physical_plan.as_ref()).indent().to_string();
     // Only 1 SortExec was added
     let expected = {
@@ -1690,10 +1681,8 @@ async fn over_order_by_sort_keys_sorting_global_order_compacting() -> Result<()>
     let sql = "SELECT c2, MAX(c9) OVER (ORDER BY c9, c2), SUM(c9) OVER (), MIN(c9) OVER (ORDER BY c2, c9) from aggregate_test_100 ORDER BY c2";
 
     let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx.create_logical_plan(sql).expect(&msg);
-    let state = ctx.state();
-    let logical_plan = state.optimize(&plan)?;
-    let physical_plan = state.create_physical_plan(&logical_plan).await?;
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
     let formatted = displayable(physical_plan.as_ref()).indent().to_string();
     // 3 SortExec are added
     let expected = {
@@ -1732,10 +1721,8 @@ async fn test_window_partition_by_order_by() -> Result<()> {
       FROM aggregate_test_100";
 
     let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx.create_logical_plan(sql).expect(&msg);
-    let state = ctx.state();
-    let logical_plan = state.optimize(&plan)?;
-    let physical_plan = state.create_physical_plan(&logical_plan).await?;
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await.unwrap();
     let formatted = displayable(physical_plan.as_ref()).indent().to_string();
     // Only 1 SortExec was added
     let expected = {
diff --git a/datafusion/core/tests/statistics.rs b/datafusion/core/tests/statistics.rs
index bb43c5c4dbbb..1e40117cdc85 100644
--- a/datafusion/core/tests/statistics.rs
+++ b/datafusion/core/tests/statistics.rs
@@ -211,11 +211,7 @@ async fn sql_basic() -> Result<()> {
     let ctx = init_ctx(stats.clone(), schema)?;
 
     let df = ctx.sql("SELECT * from stats_table").await.unwrap();
-
-    let physical_plan = ctx
-        .create_physical_plan(&df.to_logical_plan()?)
-        .await
-        .unwrap();
+    let physical_plan = df.create_physical_plan().await.unwrap();
 
     // the statistics should be those of the source
     assert_eq!(stats, physical_plan.statistics());
@@ -233,10 +229,7 @@ async fn sql_filter() -> Result<()> {
         .await
         .unwrap();
 
-    let physical_plan = ctx
-        .create_physical_plan(&df.to_logical_plan()?)
-        .await
-        .unwrap();
+    let physical_plan = df.create_physical_plan().await.unwrap();
 
     let stats = physical_plan.statistics();
     assert!(!stats.is_exact);
@@ -251,10 +244,7 @@ async fn sql_limit() -> Result<()> {
     let ctx = init_ctx(stats.clone(), schema)?;
 
     let df = ctx.sql("SELECT * FROM stats_table LIMIT 5").await.unwrap();
-    let physical_plan = ctx
-        .create_physical_plan(&df.to_logical_plan()?)
-        .await
-        .unwrap();
+    let physical_plan = df.create_physical_plan().await.unwrap();
     // when the limit is smaller than the original number of lines
     // we loose all statistics except the for number of rows which becomes the limit
     assert_eq!(
@@ -270,10 +260,7 @@ async fn sql_limit() -> Result<()> {
         .sql("SELECT * FROM stats_table LIMIT 100")
         .await
         .unwrap();
-    let physical_plan = ctx
-        .create_physical_plan(&df.to_logical_plan()?)
-        .await
-        .unwrap();
+    let physical_plan = df.create_physical_plan().await.unwrap();
     // when the limit is larger than the original number of lines, statistics remain unchanged
     assert_eq!(stats, physical_plan.statistics());
 
@@ -290,10 +277,7 @@ async fn sql_window() -> Result<()> {
         .await
         .unwrap();
 
-    let physical_plan = ctx
-        .create_physical_plan(&df.to_logical_plan()?)
-        .await
-        .unwrap();
+    let physical_plan = df.create_physical_plan().await.unwrap();
 
     let result = physical_plan.statistics();
 
diff --git a/datafusion/proto/examples/logical_plan_serde.rs b/datafusion/proto/examples/logical_plan_serde.rs
index d98d88b2a46a..1ecb7daf6e37 100644
--- a/datafusion/proto/examples/logical_plan_serde.rs
+++ b/datafusion/proto/examples/logical_plan_serde.rs
@@ -24,7 +24,7 @@ async fn main() -> Result<()> {
     let ctx = SessionContext::new();
     ctx.register_csv("t1", "testdata/test.csv", CsvReadOptions::default())
         .await?;
-    let plan = ctx.table("t1")?.to_logical_plan()?;
+    let plan = ctx.table("t1")?.to_optimized_plan()?;
     let bytes = logical_plan_to_bytes(&plan)?;
     let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
     assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
diff --git a/datafusion/proto/examples/physical_plan_serde.rs b/datafusion/proto/examples/physical_plan_serde.rs
index 3fdcff2f0121..decdad55bc5c 100644
--- a/datafusion/proto/examples/physical_plan_serde.rs
+++ b/datafusion/proto/examples/physical_plan_serde.rs
@@ -24,8 +24,8 @@ async fn main() -> Result<()> {
     let ctx = SessionContext::new();
     ctx.register_csv("t1", "testdata/test.csv", CsvReadOptions::default())
         .await?;
-    let logical_plan = ctx.table("t1")?.to_logical_plan()?;
-    let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
+    let dataframe = ctx.table("t1")?;
+    let physical_plan = dataframe.create_physical_plan().await?;
     let bytes = physical_plan_to_bytes(physical_plan.clone())?;
     let physical_round_trip = physical_plan_from_bytes(&bytes, &ctx)?;
     assert_eq!(
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index d099f4022abb..b25c5ee0bc37 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -1443,7 +1443,7 @@ mod roundtrip_tests {
         let ctx = SessionContext::new();
         ctx.register_csv("t1", "testdata/test.csv", CsvReadOptions::default())
             .await?;
-        let scan = ctx.table("t1")?.to_logical_plan()?;
+        let scan = ctx.table("t1")?.to_optimized_plan()?;
         let topk_plan = LogicalPlan::Extension(Extension {
             node: Arc::new(TopKPlanNode::new(3, scan, col("revenue"))),
         });
@@ -1540,7 +1540,7 @@ mod roundtrip_tests {
         ctx.sql(sql).await.unwrap();
 
         let codec = TestTableProviderCodec {};
-        let scan = ctx.table("t")?.to_logical_plan()?;
+        let scan = ctx.table("t")?.to_optimized_plan()?;
         let bytes = logical_plan_to_bytes_with_extension_codec(&scan, &codec)?;
         let logical_round_trip =
             logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec)?;
@@ -1566,7 +1566,7 @@ mod roundtrip_tests {
 
         let query = "SELECT a, SUM(b + 1) as b_sum FROM t1 GROUP BY a ORDER BY b_sum DESC";
-        let plan = ctx.sql(query).await?.to_logical_plan()?;
+        let plan = ctx.sql(query).await?.to_optimized_plan()?;
 
         let bytes = logical_plan_to_bytes(&plan)?;
         let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
@@ -1592,7 +1592,7 @@ mod roundtrip_tests {
             .await?;
 
         let query = "SELECT a, COUNT(DISTINCT b) as b_cd FROM t1 GROUP BY a";
-        let plan = ctx.sql(query).await?.to_logical_plan()?;
+        let plan = ctx.sql(query).await?.to_optimized_plan()?;
 
         let bytes = logical_plan_to_bytes(&plan)?;
         let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
@@ -1606,7 +1606,7 @@ mod roundtrip_tests {
         let ctx = SessionContext::new();
         ctx.register_csv("t1", "testdata/test.csv", CsvReadOptions::default())
             .await?;
-        let plan = ctx.table("t1")?.to_logical_plan()?;
+        let plan = ctx.table("t1")?.to_optimized_plan()?;
         let bytes = logical_plan_to_bytes(&plan)?;
         let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
         assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
@@ -1620,7 +1620,10 @@ mod roundtrip_tests {
             .await?;
         ctx.sql("CREATE VIEW view_t1(a, b) AS SELECT a, b FROM t1")
            .await?;
-        let plan = ctx.sql("SELECT * FROM view_t1").await?.to_logical_plan()?;
+        let plan = ctx
+            .sql("SELECT * FROM view_t1")
+            .await?
+            .to_optimized_plan()?;
         let bytes = logical_plan_to_bytes(&plan)?;
         let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
         assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));