From b9ff07117e8323fb406312f7f0538510165d0abf Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 23 Dec 2022 13:27:09 -0600 Subject: [PATCH] Non-deprecated support for planning SQL without DDL --- benchmarks/src/bin/tpch.rs | 7 +- datafusion/core/src/execution/context.rs | 90 ++++++++++---------- datafusion/core/src/physical_plan/planner.rs | 20 ++--- datafusion/core/tests/custom_sources.rs | 14 +-- datafusion/core/tests/parquet/mod.rs | 7 +- datafusion/core/tests/sql/aggregates.rs | 3 +- datafusion/core/tests/sql/errors.rs | 45 +++++----- datafusion/core/tests/sql/explain.rs | 6 +- datafusion/core/tests/sql/explain_analyze.rs | 24 ++++-- datafusion/core/tests/sql/mod.rs | 6 +- datafusion/core/tests/sql/projection.rs | 29 +++++-- datafusion/core/tests/sql/udf.rs | 5 +- datafusion/core/tests/tpcds_planning.rs | 2 +- datafusion/proto/README.md | 18 ++-- 14 files changed, 159 insertions(+), 117 deletions(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 8a78c357b951..3bcd18871a2e 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -324,18 +324,17 @@ async fn execute_query( debug: bool, enable_scheduler: bool, ) -> Result> { - let plan = ctx.sql(sql).await?; - let plan = plan.into_unoptimized_plan(); + let plan = ctx.sql(sql).await?.into_unoptimized_plan(); if debug { println!("=== Logical plan ===\n{:?}\n", plan); } - let plan = ctx.optimize(&plan)?; + let plan = ctx.dataframe(plan).await?.into_optimized_plan()?; if debug { println!("=== Optimized logical plan ===\n{:?}\n", plan); } - let physical_plan = ctx.create_physical_plan(&plan).await?; + let physical_plan = ctx.dataframe(plan).await?.create_physical_plan().await?; if debug { println!( "=== Physical plan ===\n{}\n", diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 1f61039cd9ea..702414822d31 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -245,26 +245,31 @@ impl SessionContext { self.state.read().config.clone() } - /// Creates a [`DataFrame`] that will execute a SQL query. + /// Creates a [`DataFrame`] that executes a SQL query supported by + /// DataFusion, including DDL (such as `CREATE TABLE`). /// - /// This method is `async` because queries of type `CREATE EXTERNAL TABLE` - /// might require the schema to be inferred. + /// You can use [`Self::plan_sql`] and + /// [`DataFrame::create_physical_plan`] directly if you need read only + /// query support (no way to create external tables, for example) + /// + /// This method is `async` because queries of type `CREATE + /// EXTERNAL TABLE` might require the schema to be inferred. pub async fn sql(&self, sql: &str) -> Result { - let mut statements = DFParser::parse_sql(sql)?; - if statements.len() != 1 { - return Err(DataFusionError::NotImplemented( - "The context currently only supports a single SQL statement".to_string(), - )); - } - - // create a query planner - let plan = { - // TODO: Move catalog off SessionState onto SessionContext - let state = self.state.read(); - let query_planner = SqlToRel::new(&*state); - query_planner.statement_to_plan(statements.pop_front().unwrap())? - }; + let plan = self.plan_sql(sql)?; + self.dataframe(plan).await + } + /// Creates a [`DataFrame`] that will execute the specified + /// LogicalPlan, including DDL (such as `CREATE TABLE`). + /// Use [`Self::dataframe_without_ddl`] if you do not want + /// to support DDL statements. + /// + /// Any DDL statements are executed during this function (not when + /// the [`DataFrame`] is evaluated) + /// + /// This method is `async` because queries of type `CREATE EXTERNAL TABLE` + /// might require the schema to be inferred by performing I/O. + pub async fn dataframe(&self, plan: LogicalPlan) -> Result { match plan { LogicalPlan::CreateExternalTable(cmd) => { self.create_external_table(&cmd).await @@ -492,6 +497,15 @@ impl SessionContext { } } + /// Creates a [`DataFrame`] that will execute the specified + /// LogicalPlan, but will error if the plans represent DDL such as + /// `CREATE TABLE` + /// + /// Use [`Self::dataframe`] to run plans with DDL + pub fn dataframe_without_ddl(&self, plan: LogicalPlan) -> Result { + Ok(DataFrame::new(self.state(), plan)) + } + // return an empty dataframe fn return_empty_dataframe(&self) -> Result { let plan = LogicalPlanBuilder::empty(false).build()?; @@ -559,11 +573,9 @@ impl SessionContext { } Ok(false) } - /// Creates a logical plan. - /// - /// This function is intended for internal use and should not be called directly. - #[deprecated(note = "Use SessionContext::sql which snapshots the SessionState")] - pub fn create_logical_plan(&self, sql: &str) -> Result { + + /// Creates a [`LogicalPlan`] from a SQL query. + pub fn plan_sql(&self, sql: &str) -> Result { let mut statements = DFParser::parse_sql(sql)?; if statements.len() != 1 { @@ -1000,32 +1012,24 @@ impl SessionContext { } /// Optimizes the logical plan by applying optimizer rules. - pub fn optimize(&self, plan: &LogicalPlan) -> Result { - self.state.read().optimize(plan) + #[deprecated( + note = "Use `SessionContext::dataframe_without_ddl` and `DataFrame::into_optimized_plan`" + )] + pub fn optimize(&self, plan: LogicalPlan) -> Result { + self.dataframe_without_ddl(plan)?.into_optimized_plan() } - /// Creates a physical plan from a logical plan. + /// Creates a physical [`ExecutionPlan`] from a [`LogicalPlan`]. + #[deprecated( + note = "Use `SessionContext::::dataframe_without_ddl` and `DataFrame::create_physical_plan`" + )] pub async fn create_physical_plan( &self, - logical_plan: &LogicalPlan, + logical_plan: LogicalPlan, ) -> Result> { - let state_cloned = { - let mut state = self.state.write(); - state.execution_props.start_execution(); - - // We need to clone `state` to release the lock that is not `Send`. We could - // make the lock `Send` by using `tokio::sync::Mutex`, but that would require to - // propagate async even to the `LogicalPlan` building methods. - // Cloning `state` here is fine as we then pass it as immutable `&state`, which - // means that we avoid write consistency issues as the cloned version will not - // be written to. As for eventual modifications that would be applied to the - // original state after it has been cloned, they will not be picked up by the - // clone but that is okay, as it is equivalent to postponing the state update - // by keeping the lock until the end of the function scope. - state.clone() - }; - - state_cloned.create_physical_plan(logical_plan).await + self.dataframe_without_ddl(logical_plan)? + .create_physical_plan() + .await } /// Executes a query and writes the results to a partitioned CSV file. diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 0a598be873cd..4256e29579e3 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -1097,7 +1097,7 @@ impl DefaultPhysicalPlanner { // TABLE" -- it must be handled at a higher level (so // that the appropriate table can be registered with // the context) - Err(DataFusionError::Internal( + Err(DataFusionError::Plan( "Unsupported logical plan: CreateExternalTable".to_string(), )) } @@ -1105,7 +1105,7 @@ impl DefaultPhysicalPlanner { // There is no default plan for "PREPARE" -- it must be // handled at a higher level (so that the appropriate // statement can be prepared) - Err(DataFusionError::Internal( + Err(DataFusionError::Plan( "Unsupported logical plan: Prepare".to_string(), )) } @@ -1114,7 +1114,7 @@ impl DefaultPhysicalPlanner { // It must be handled at a higher level (so // that the schema can be registered with // the context) - Err(DataFusionError::Internal( + Err(DataFusionError::Plan( "Unsupported logical plan: CreateCatalogSchema".to_string(), )) } @@ -1123,7 +1123,7 @@ impl DefaultPhysicalPlanner { // It must be handled at a higher level (so // that the schema can be registered with // the context) - Err(DataFusionError::Internal( + Err(DataFusionError::Plan( "Unsupported logical plan: CreateCatalog".to_string(), )) } @@ -1132,7 +1132,7 @@ impl DefaultPhysicalPlanner { // It must be handled at a higher level (so // that the schema can be registered with // the context) - Err(DataFusionError::Internal( + Err(DataFusionError::Plan( "Unsupported logical plan: CreateMemoryTable".to_string(), )) } @@ -1141,7 +1141,7 @@ impl DefaultPhysicalPlanner { // It must be handled at a higher level (so // that the schema can be registered with // the context) - Err(DataFusionError::Internal( + Err(DataFusionError::Plan( "Unsupported logical plan: DropTable".to_string(), )) } @@ -1150,7 +1150,7 @@ impl DefaultPhysicalPlanner { // It must be handled at a higher level (so // that the schema can be registered with // the context) - Err(DataFusionError::Internal( + Err(DataFusionError::Plan( "Unsupported logical plan: DropView".to_string(), )) } @@ -1159,16 +1159,16 @@ impl DefaultPhysicalPlanner { // It must be handled at a higher level (so // that the schema can be registered with // the context) - Err(DataFusionError::Internal( + Err(DataFusionError::Plan( "Unsupported logical plan: CreateView".to_string(), )) } LogicalPlan::SetVariable(_) => { - Err(DataFusionError::Internal( + Err(DataFusionError::Plan( "Unsupported logical plan: SetVariable must be root of the plan".to_string(), )) } - LogicalPlan::Explain(_) => Err(DataFusionError::Internal( + LogicalPlan::Explain(_) => Err(DataFusionError::Plan( "Unsupported logical plan: Explain must be root of the plan".to_string(), )), LogicalPlan::Analyze(a) => { diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs index f9d0c555532c..9b53a45b7717 100644 --- a/datafusion/core/tests/custom_sources.rs +++ b/datafusion/core/tests/custom_sources.rs @@ -212,7 +212,8 @@ async fn custom_source_dataframe() -> Result<()> { .project(vec![col("c2")])? .build()?; - let optimized_plan = ctx.optimize(&logical_plan)?; + let optimized_plan = ctx.dataframe(logical_plan).await?.into_optimized_plan()?; + match &optimized_plan { LogicalPlan::Projection(Projection { input, .. }) => match &**input { LogicalPlan::TableScan(TableScan { @@ -235,7 +236,11 @@ async fn custom_source_dataframe() -> Result<()> { ); assert_eq!(format!("{:?}", optimized_plan), expected); - let physical_plan = ctx.create_physical_plan(&optimized_plan).await?; + let physical_plan = ctx + .dataframe(optimized_plan) + .await? + .create_physical_plan() + .await?; assert_eq!(1, physical_plan.schema().fields().len()); assert_eq!("c2", physical_plan.schema().field(0).name().as_str()); @@ -261,10 +266,7 @@ async fn optimizers_catch_all_statistics() { .await .unwrap(); - let physical_plan = ctx - .create_physical_plan(&df.into_optimized_plan().unwrap()) - .await - .unwrap(); + let physical_plan = df.create_physical_plan().await.unwrap(); // when the optimization kicks in, the source is replaced by an EmptyExec assert!( diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 58f22716bbf1..83f2471e2468 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -210,11 +210,12 @@ impl ContextWithParquet { .expect("getting input"); let pretty_input = pretty_format_batches(&input).unwrap().to_string(); - let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan"); - let physical_plan = self .ctx - .create_physical_plan(&logical_plan) + .dataframe(logical_plan) + .await + .expect("planning") + .create_physical_plan() .await .expect("creating physical plan"); diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index 97f043d9e022..47b0d2b0a324 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -1120,9 +1120,8 @@ async fn aggregate_with_alias() -> Result<()> { .project(vec![col("c1"), sum(col("c2")).alias("total_salary")])? .build()?; - let plan = ctx.optimize(&plan)?; + let physical_plan = ctx.dataframe(plan).await?.create_physical_plan().await?; - let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?; assert_eq!("c1", physical_plan.schema().field(0).name().as_str()); assert_eq!( "total_salary", diff --git a/datafusion/core/tests/sql/errors.rs b/datafusion/core/tests/sql/errors.rs index 1a02c3442a1c..1695bef7f6a3 100644 --- a/datafusion/core/tests/sql/errors.rs +++ b/datafusion/core/tests/sql/errors.rs @@ -135,39 +135,46 @@ async fn invalid_qualified_table_references() -> Result<()> { } #[tokio::test] -#[allow(deprecated)] // TODO: Remove this test once create_logical_plan removed +/// This test demonstrates it is posible to run SQL queries in DataFusion without +/// any DDL Support (such as CREATE TABLE / CREATE VIEW, ETC). async fn unsupported_sql_returns_error() -> Result<()> { let ctx = SessionContext::new(); register_aggregate_csv(&ctx).await?; // create view let sql = "create view test_view as select * from aggregate_test_100"; - let plan = ctx.create_logical_plan(sql); - let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await; - assert!(physical_plan.is_err()); + let plan = ctx.plan_sql(sql)?; + let err = ctx + .dataframe_without_ddl(plan)? + .create_physical_plan() + .await + .unwrap_err(); assert_eq!( - format!("{}", physical_plan.unwrap_err()), - "Internal error: Unsupported logical plan: CreateView. \ - This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker" + err.to_string(), + "Error during planning: Unsupported logical plan: CreateView" ); // // drop view let sql = "drop view test_view"; - let plan = ctx.create_logical_plan(sql); - let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await; - assert!(physical_plan.is_err()); + let plan = ctx.plan_sql(sql)?; + let err = ctx + .dataframe_without_ddl(plan)? + .create_physical_plan() + .await + .unwrap_err(); assert_eq!( - format!("{}", physical_plan.unwrap_err()), - "Internal error: Unsupported logical plan: DropView. \ - This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker" + err.to_string(), + "Error during planning: Unsupported logical plan: DropView" ); // // drop table let sql = "drop table aggregate_test_100"; - let plan = ctx.create_logical_plan(sql); - let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await; - assert!(physical_plan.is_err()); + let plan = ctx.plan_sql(sql)?; + let err = ctx + .dataframe_without_ddl(plan)? + .create_physical_plan() + .await + .unwrap_err(); assert_eq!( - format!("{}", physical_plan.unwrap_err()), - "Internal error: Unsupported logical plan: DropTable. \ - This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker" + err.to_string(), + "Error during planning: Unsupported logical plan: DropTable" ); Ok(()) } diff --git a/datafusion/core/tests/sql/explain.rs b/datafusion/core/tests/sql/explain.rs index 2dd4d4f64f3a..80b7278ffe51 100644 --- a/datafusion/core/tests/sql/explain.rs +++ b/datafusion/core/tests/sql/explain.rs @@ -38,7 +38,11 @@ fn optimize_explain() { } // now optimize the plan and expect to see more plans - let optimized_plan = SessionContext::new().optimize(&plan).unwrap(); + let optimized_plan = SessionContext::new() + .dataframe_without_ddl(plan.clone()) + .unwrap() + .into_optimized_plan() + .unwrap(); if let LogicalPlan::Explain(e) = &optimized_plan { // should have more than one plan assert!( diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 62aeb02557a7..fbde168fb276 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -188,7 +188,7 @@ async fn csv_explain_plans() { // Create plan let msg = format!("Creating logical plan for '{}'", sql); let dataframe = ctx.sql(sql).await.expect(&msg); - let logical_schema = dataframe.schema(); + let logical_schema = dataframe.schema().clone(); let plan = dataframe.logical_plan(); // @@ -264,10 +264,10 @@ async fn csv_explain_plans() { // Optimized logical plan // let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan); - let plan = ctx.optimize(plan).expect(&msg); + let plan = dataframe.into_optimized_plan().expect(&msg); let optimized_logical_schema = plan.schema(); // Both schema has to be the same - assert_eq!(logical_schema, optimized_logical_schema.as_ref()); + assert_eq!(&logical_schema, optimized_logical_schema.as_ref()); // // Verify schema let expected = vec![ @@ -339,7 +339,14 @@ async fn csv_explain_plans() { // Physical plan // Create plan let msg = format!("Creating physical plan for '{}': {:?}", sql, plan); - let plan = ctx.create_physical_plan(&plan).await.expect(&msg); + let plan = ctx + .dataframe(plan) + .await + .expect(&msg) + .create_physical_plan() + .await + .unwrap(); + // // Execute plan let msg = format!("Executing physical plan for '{}': {:?}", sql, plan); @@ -563,7 +570,14 @@ async fn csv_explain_verbose_plans() { // Physical plan // Create plan let msg = format!("Creating physical plan for '{}': {:?}", sql, plan); - let plan = ctx.create_physical_plan(&plan).await.expect(&msg); + let plan = ctx + .dataframe(plan) + .await + .expect(&msg) + .create_physical_plan() + .await + .unwrap(); + // // Execute plan let msg = format!("Executing physical plan for '{}': {:?}", sql, plan); diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 41ccdb9a24bb..04f2f0384743 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -1023,9 +1023,9 @@ async fn try_execute_to_batches( let dataframe = ctx.sql(sql).await?; let logical_schema = dataframe.schema().clone(); - let optimized = ctx.optimize(dataframe.logical_plan())?; - let optimized_logical_schema = optimized.schema(); - let results = dataframe.collect().await?; + let optimized = dataframe.into_optimized_plan()?; + let optimized_logical_schema = optimized.schema().clone(); + let results = ctx.dataframe(optimized).await?.collect().await?; assert_eq!(&logical_schema, optimized_logical_schema.as_ref()); Ok(results) diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs index 524317dd6d74..5e4b7a9c14c9 100644 --- a/datafusion/core/tests/sql/projection.rs +++ b/datafusion/core/tests/sql/projection.rs @@ -172,7 +172,7 @@ async fn projection_on_table_scan() -> Result<()> { .project(vec![col("c2")])? .build()?; - let optimized_plan = ctx.optimize(&logical_plan)?; + let optimized_plan = ctx.dataframe(logical_plan).await?.into_optimized_plan()?; match &optimized_plan { LogicalPlan::Projection(Projection { input, .. }) => match &**input { LogicalPlan::TableScan(TableScan { @@ -192,7 +192,11 @@ async fn projection_on_table_scan() -> Result<()> { \n TableScan: test projection=[c2]"; assert_eq!(format!("{:?}", optimized_plan), expected); - let physical_plan = ctx.create_physical_plan(&optimized_plan).await?; + let physical_plan = ctx + .dataframe(optimized_plan) + .await? + .create_physical_plan() + .await?; assert_eq!(1, physical_plan.schema().fields().len()); assert_eq!("c2", physical_plan.schema().field(0).name().as_str()); @@ -215,8 +219,8 @@ async fn preserve_nullability_on_projection() -> Result<()> { .project(vec![col("c1")])? .build()?; - let plan = ctx.optimize(&plan)?; - let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?; + let physical_plan = ctx.dataframe(plan).await?.create_physical_plan().await?; + assert!(!physical_plan.schema().field_with_name("c1")?.is_nullable()); Ok(()) } @@ -248,7 +252,14 @@ async fn project_cast_dictionary() { let logical_plan = builder.project(vec![expr]).unwrap().build().unwrap(); - let physical_plan = ctx.create_physical_plan(&logical_plan).await.unwrap(); + let physical_plan = ctx + .dataframe(logical_plan) + .await + .unwrap() + .create_physical_plan() + .await + .unwrap(); + let actual = collect(physical_plan, ctx.task_ctx()).await.unwrap(); let expected = vec![ @@ -289,7 +300,7 @@ async fn projection_on_memory_scan() -> Result<()> { assert_fields_eq(&plan, vec!["b"]); let ctx = SessionContext::new(); - let optimized_plan = ctx.optimize(&plan)?; + let optimized_plan = ctx.dataframe(plan).await?.into_optimized_plan()?; match &optimized_plan { LogicalPlan::Projection(Projection { input, .. }) => match &**input { LogicalPlan::TableScan(TableScan { @@ -312,7 +323,11 @@ async fn projection_on_memory_scan() -> Result<()> { ); assert_eq!(format!("{:?}", optimized_plan), expected); - let physical_plan = ctx.create_physical_plan(&optimized_plan).await?; + let physical_plan = ctx + .dataframe(optimized_plan) + .await? + .create_physical_plan() + .await?; assert_eq!(1, physical_plan.schema().fields().len()); assert_eq!("b", physical_plan.schema().field(0).name().as_str()); diff --git a/datafusion/core/tests/sql/udf.rs b/datafusion/core/tests/sql/udf.rs index eb416901a3e2..64bff7640710 100644 --- a/datafusion/core/tests/sql/udf.rs +++ b/datafusion/core/tests/sql/udf.rs @@ -90,10 +90,7 @@ async fn scalar_udf() -> Result<()> { "Projection: t.a, t.b, my_add(t.a, t.b)\n TableScan: t projection=[a, b]" ); - let plan = ctx.optimize(&plan)?; - let plan = ctx.create_physical_plan(&plan).await?; - let task_ctx = ctx.task_ctx(); - let result = collect(plan, task_ctx).await?; + let result = ctx.dataframe(plan).await?.collect().await?; let expected = vec![ "+-----+-----+-----------------+", diff --git a/datafusion/core/tests/tpcds_planning.rs b/datafusion/core/tests/tpcds_planning.rs index 6ecd6dcf410d..d731c0f21a8f 100644 --- a/datafusion/core/tests/tpcds_planning.rs +++ b/datafusion/core/tests/tpcds_planning.rs @@ -1069,7 +1069,7 @@ async fn regression_test(query_no: u8, create_physical: bool) -> Result<()> { let df = ctx.sql(sql).await?; let plan = df.into_optimized_plan()?; if create_physical { - let _ = ctx.create_physical_plan(&plan).await?; + let _ = ctx.dataframe(plan).await?.create_physical_plan().await?; } } diff --git a/datafusion/proto/README.md b/datafusion/proto/README.md index e99efab68cc4..6927b18690d0 100644 --- a/datafusion/proto/README.md +++ b/datafusion/proto/README.md @@ -58,10 +58,10 @@ use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes}; #[tokio::main] async fn main() -> Result<()> { let ctx = SessionContext::new(); - ctx.register_csv("t1", "testdata/test.csv", CsvReadOptions::default()) - .await - ?; - let plan = ctx.table("t1")?.to_logical_plan()?; + let plan = ctx + .read_csv("testdata/test.csv", CsvReadOptions::default()) + .await? + .into_optimized_plan()?; let bytes = logical_plan_to_bytes(&plan)?; let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); @@ -81,11 +81,11 @@ use datafusion_proto::bytes::{physical_plan_from_bytes,physical_plan_to_bytes}; #[tokio::main] async fn main() -> Result<()> { let ctx = SessionContext::new(); - ctx.register_csv("t1", "testdata/test.csv", CsvReadOptions::default()) - .await - ?; - let logical_plan = ctx.table("t1")?.to_logical_plan()?; - let physical_plan = ctx.create_physical_plan(&logical_plan).await?; + let physical_plan = ctx + .read_csv("testdata/test.csv", CsvReadOptions::default()) + .await? + .create_physical_plan() + .await?; let bytes = physical_plan_to_bytes(physical_plan.clone())?; let physical_round_trip = physical_plan_from_bytes(&bytes, &ctx)?; assert_eq!(format!("{:?}", physical_plan), format!("{:?}", physical_round_trip));