Deprecate SessionContext::create_physical_plan (apache#4617)
tustvold committed Dec 20, 2022
1 parent 975ff15 commit 9c9444b
Showing 31 changed files with 305 additions and 532 deletions.
10 changes: 3 additions & 7 deletions benchmarks/src/bin/nyctaxi.rs
@@ -28,7 +28,6 @@ use datafusion::arrow::util::pretty;
use datafusion::error::Result;
use datafusion::execution::context::{SessionConfig, SessionContext};

use datafusion::physical_plan::collect;
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
use structopt::StructOpt;

@@ -119,14 +118,11 @@ async fn datafusion_sql_benchmarks(
}

async fn execute_sql(ctx: &SessionContext, sql: &str, debug: bool) -> Result<()> {
let plan = ctx.create_logical_plan(sql)?;
let plan = ctx.optimize(&plan)?;
let dataframe = ctx.sql(sql).await?;
if debug {
println!("Optimized logical plan:\n{:?}", plan);
println!("Optimized logical plan:\n{:?}", dataframe.logical_plan());
}
let physical_plan = ctx.create_physical_plan(&plan).await?;
let task_ctx = ctx.task_ctx();
let result = collect(physical_plan, task_ctx).await?;
let result = dataframe.collect().await?;
if debug {
pretty::print_batches(&result)?;
}
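For context, a minimal sketch of the migration shown above, assuming a CSV registered under the illustrative name `example` (the table name and path are placeholders, not part of the benchmark):

```rust
use datafusion::arrow::util::pretty;
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

async fn execute_sql_new(sql: &str, debug: bool) -> Result<()> {
    let ctx = SessionContext::new();
    // Placeholder registration; the benchmark registers the NYC taxi data instead.
    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())
        .await?;

    // One call replaces create_logical_plan + optimize + create_physical_plan + collect.
    let dataframe = ctx.sql(sql).await?;
    if debug {
        println!("Logical plan:\n{:?}", dataframe.logical_plan());
    }
    let result = dataframe.collect().await?;
    if debug {
        pretty::print_batches(&result)?;
    }
    Ok(())
}
```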
2 changes: 1 addition & 1 deletion benchmarks/src/bin/tpch.rs
@@ -643,7 +643,7 @@ mod tests {
let sql = get_query_sql(query)?;
for sql in &sql {
let df = ctx.sql(sql.as_str()).await?;
let plan = df.to_logical_plan()?;
let plan = df.to_optimized_plan()?;
if !actual.is_empty() {
actual += "\n";
}
3 changes: 1 addition & 2 deletions benchmarks/src/tpch.rs
@@ -353,8 +353,7 @@ pub async fn convert_tbl(
}

// create the physical plan
let csv = csv.to_logical_plan()?;
let csv = ctx.create_physical_plan(&csv).await?;
let csv = csv.create_physical_plan().await?;

let output_path = output_root_path.join(table);
let output_path = output_path.to_str().unwrap().to_owned();
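A hedged sketch of the same pattern used in `convert_tbl` above: the physical plan is now built directly from the `DataFrame` returned by `read_csv`, with no intermediate `LogicalPlan` round-trip (the path and delimiter here are illustrative):

```rust
use std::sync::Arc;
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::{CsvReadOptions, SessionContext};

async fn plan_table_scan(ctx: &SessionContext) -> Result<Arc<dyn ExecutionPlan>> {
    // Illustrative input; convert_tbl derives the path and options from its arguments.
    let csv = ctx
        .read_csv("data/lineitem.csv", CsvReadOptions::new().delimiter(b'|'))
        .await?;
    // Previously: csv.to_logical_plan()? followed by ctx.create_physical_plan(&plan).
    csv.create_physical_plan().await
}
```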
48 changes: 38 additions & 10 deletions datafusion/core/src/dataframe.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use parquet::file::properties::WriterProperties;

use datafusion_common::{Column, DFSchema};
use datafusion_common::{Column, DFSchema, ScalarValue};
use datafusion_expr::TableProviderFilterPushDown;

use crate::arrow::datatypes::Schema;
@@ -505,17 +505,40 @@ impl DataFrame {
self.plan.schema()
}

/// Return the unoptimized logical plan represented by this DataFrame.
/// Return the unoptimized logical plan
pub fn logical_plan(&self) -> &LogicalPlan {
&self.plan
}

/// Return the logical plan represented by this DataFrame without running the optimizers
///
/// Note: This method should not be used outside testing, as it loses the snapshot
/// of the [`SessionState`] attached to this [`DataFrame`] and consequently subsequent
/// operations may take place against a different state
pub fn to_unoptimized_plan(self) -> LogicalPlan {
self.plan
}

/// Return the optimized logical plan represented by this DataFrame.
pub fn to_logical_plan(self) -> Result<LogicalPlan> {
///
/// Note: This method should not be used outside testing, as it loses the snapshot
/// of the [`SessionState`] attached to this [`DataFrame`] and consequently subsequent
/// operations may take place against a different state
pub fn to_optimized_plan(self) -> Result<LogicalPlan> {
// Optimize the plan first for better UX
self.session_state.optimize(&self.plan)
}

/// Return the optimized logical plan represented by this DataFrame.
///
/// Note: This method should not be used outside testing, as it loses the snapshot
/// of the [`SessionState`] attached to this [`DataFrame`] and consequently subsequent
/// operations may take place against a different state
#[deprecated(note = "Use DataFrame::to_optimized_plan")]
pub fn to_logical_plan(self) -> Result<LogicalPlan> {
self.to_optimized_plan()
}

/// Return a DataFrame with the explanation of its plan so far.
///
/// if `analyze` is specified, runs the plan and reports metrics
@@ -714,6 +737,12 @@ impl DataFrame {
}
}

/// Convert a prepared logical plan into its inner logical plan, with all parameters replaced by their corresponding values
pub fn with_param_values(self, param_values: Vec<ScalarValue>) -> Result<Self> {
let plan = self.plan.with_param_values(param_values)?;
Ok(Self::new(self.session_state, plan))
}

/// Cache DataFrame as a memory table.
///
/// ```
@@ -1014,11 +1043,10 @@ mod tests {
let df = df.select(vec![expr])?;

// build query using SQL
let sql_plan =
ctx.create_logical_plan("SELECT my_fn(c12) FROM aggregate_test_100")?;
let sql_plan = ctx.sql("SELECT my_fn(c12) FROM aggregate_test_100").await?;

// the two plans should be identical
assert_same_plan(&df.plan, &sql_plan);
assert_same_plan(&df.plan, &sql_plan.plan);

Ok(())
}
@@ -1128,7 +1156,7 @@ mod tests {
async fn create_plan(sql: &str) -> Result<LogicalPlan> {
let mut ctx = SessionContext::new();
register_aggregate_csv(&mut ctx, "aggregate_test_100").await?;
ctx.create_logical_plan(sql)
Ok(ctx.sql(sql).await?.to_unoptimized_plan())
}

async fn test_table_with_name(name: &str) -> Result<DataFrame> {
@@ -1308,7 +1336,7 @@ mod tests {
\n Inner Join: t1.c1 = t2.c1\
\n TableScan: t1\
\n TableScan: t2",
format!("{:?}", df_renamed.clone().to_unoptimized_plan())
format!("{:?}", df_renamed.logical_plan())
);

assert_eq!("\
@@ -1322,7 +1350,7 @@
\n SubqueryAlias: t2\
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]",
format!("{:?}", df_renamed.clone().to_logical_plan()?)
format!("{:?}", df_renamed.clone().to_optimized_plan()?)
);

let df_results = df_renamed.collect().await?;
@@ -1469,7 +1497,7 @@ mod tests {

assert_eq!(
"TableScan: ?table? projection=[c2, c3, sum]",
format!("{:?}", cached_df.clone().to_logical_plan()?)
format!("{:?}", cached_df.clone().to_optimized_plan()?)
);

let df_results = df.collect().await?;
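A short sketch of the new plan accessors and the added `with_param_values`, assuming a registered table `t` with columns `a` and `b`; the `PREPARE` statement shape used here to produce placeholders is an assumption, not something this diff demonstrates:

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use datafusion::scalar::ScalarValue;

async fn plan_accessors(ctx: &SessionContext) -> Result<()> {
    let df = ctx.sql("SELECT a, b FROM t WHERE a > 1").await?;

    // Borrow the unoptimized plan without consuming the DataFrame.
    println!("unoptimized:\n{:?}", df.logical_plan());

    // Consume the DataFrame for an owned, optimizer-rewritten plan
    // (to_logical_plan is now a deprecated alias for this).
    let optimized = df.to_optimized_plan()?;
    println!("optimized:\n{}", optimized.display_indent());

    // Assumed usage of with_param_values: bind the parameters of a prepared
    // plan and get back a DataFrame over the inner, parameter-free plan.
    let prepared = ctx
        .sql("PREPARE q(BIGINT) AS SELECT a FROM t WHERE b = $1")
        .await?;
    let bound = prepared.with_param_values(vec![ScalarValue::Int64(Some(42))])?;
    println!("bound:\n{:?}", bound.logical_plan());
    Ok(())
}
```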
24 changes: 9 additions & 15 deletions datafusion/core/src/datasource/view.rs
@@ -493,12 +493,10 @@ mod tests {
let view_sql = "CREATE VIEW xyz AS SELECT * FROM abc";
session_ctx.sql(view_sql).await?.collect().await?;

let plan = session_ctx
let dataframe = session_ctx
.sql("EXPLAIN CREATE VIEW xyz AS SELECT * FROM abc")
.await?
.to_logical_plan()
.unwrap();
let plan = session_ctx.optimize(&plan).unwrap();
.await?;
let plan = dataframe.to_optimized_plan()?;
let actual = format!("{}", plan.display_indent());
let expected = "\
Explain\
Expand All @@ -507,12 +505,10 @@ mod tests {
\n TableScan: abc projection=[column1, column2, column3]";
assert_eq!(expected, actual);

let plan = session_ctx
let dataframe = session_ctx
.sql("EXPLAIN CREATE VIEW xyz AS SELECT * FROM abc WHERE column2 = 5")
.await?
.to_logical_plan()
.unwrap();
let plan = session_ctx.optimize(&plan).unwrap();
.await?;
let plan = dataframe.to_optimized_plan()?;
let actual = format!("{}", plan.display_indent());
let expected = "\
Explain\
@@ -522,12 +518,10 @@
\n TableScan: abc projection=[column1, column2, column3]";
assert_eq!(expected, actual);

let plan = session_ctx
let dataframe = session_ctx
.sql("EXPLAIN CREATE VIEW xyz AS SELECT column1, column2 FROM abc WHERE column2 = 5")
.await?
.to_logical_plan()
.unwrap();
let plan = session_ctx.optimize(&plan).unwrap();
.await?;
let plan = dataframe.to_optimized_plan()?;
let actual = format!("{}", plan.display_indent());
let expected = "\
Explain\
43 changes: 28 additions & 15 deletions datafusion/core/src/execution/context.rs
@@ -254,7 +254,21 @@ impl SessionContext {
/// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
/// might require the schema to be inferred.
pub async fn sql(&self, sql: &str) -> Result<DataFrame> {
let plan = self.create_logical_plan(sql)?;
let mut statements = DFParser::parse_sql(sql)?;
if statements.len() != 1 {
return Err(DataFusionError::NotImplemented(
"The context currently only supports a single SQL statement".to_string(),
));
}

// create a query planner
let plan = {
// TODO: Move catalog off SessionState onto SessionContext
let state = self.state.read();
let query_planner = SqlToRel::new(&*state);
query_planner.statement_to_plan(statements.pop_front().unwrap())?
};

match plan {
LogicalPlan::CreateExternalTable(cmd) => {
self.create_external_table(&cmd).await
@@ -553,6 +567,7 @@ impl SessionContext {
/// Creates a logical plan.
///
/// This function is intended for internal use and should not be called directly.
#[deprecated(note = "Use SessionContext::sql which snapshots the SessionState")]
pub fn create_logical_plan(&self, sql: &str) -> Result<LogicalPlan> {
let mut statements = DFParser::parse_sql(sql)?;

@@ -1791,9 +1806,10 @@ impl SessionState {
&self,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
let planner = self.query_planner.clone();
let logical_plan = self.optimize(logical_plan)?;
planner.create_physical_plan(&logical_plan, self).await
self.query_planner
.create_physical_plan(&logical_plan, self)
.await
}

/// return the configuration options
@@ -2046,8 +2062,7 @@ mod tests {
use std::fs::File;
use std::path::PathBuf;
use std::sync::Weak;
use std::thread::{self, JoinHandle};
use std::{env, io::prelude::*, sync::Mutex};
use std::{env, io::prelude::*};
use tempfile::TempDir;

#[tokio::test]
@@ -2267,23 +2282,21 @@ mod tests {
// environment. Use case is for concurrent planning.
let tmp_dir = TempDir::new()?;
let partition_count = 4;
let ctx = Arc::new(Mutex::new(create_ctx(&tmp_dir, partition_count).await?));
let ctx = Arc::new(create_ctx(&tmp_dir, partition_count).await?);

let threads: Vec<JoinHandle<Result<_>>> = (0..2)
let threads: Vec<_> = (0..2)
.map(|_| ctx.clone())
.map(|ctx_clone| {
thread::spawn(move || {
let ctx = ctx_clone.lock().expect("Locked context");
.map(|ctx| {
tokio::spawn(async move {
// Ensure we can create logical plan code on a separate thread.
ctx.create_logical_plan(
"SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3",
)
ctx.sql("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")
.await
})
})
.collect();

for thread in threads {
thread.join().expect("Failed to join thread")?;
for handle in threads {
handle.await.unwrap().unwrap();
}
Ok(())
}
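A sketch of the concurrency pattern the rewritten test relies on: because `ctx.sql` snapshots the `SessionState` internally, an `Arc<SessionContext>` can be shared across tokio tasks without a `Mutex` (the `test` table is assumed to be registered, as in the test above):

```rust
use std::sync::Arc;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn plan_on_multiple_tasks(ctx: Arc<SessionContext>) -> Result<()> {
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let ctx = ctx.clone();
            // Each task plans against the shared context; no locking required.
            tokio::spawn(async move {
                ctx.sql("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3").await
            })
        })
        .collect();

    for handle in handles {
        // The expect surfaces task panics; `?` propagates planning errors.
        let _df = handle.await.expect("join failed")?;
    }
    Ok(())
}
```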
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -347,8 +347,8 @@ mod tests {
let partition = create_vec_batches(&schema, 10);
let table = MemTable::try_new(schema, vec![partition])?;
ctx.register_table("a", Arc::new(table))?;
let plan = ctx.create_logical_plan("SELECT * FROM a WHERE c0 < 1")?;
ctx.create_physical_plan(&plan).await
let dataframe = ctx.sql("SELECT * FROM a WHERE c0 < 1").await?;
dataframe.create_physical_plan().await
}

#[tokio::test(flavor = "multi_thread")]
7 changes: 2 additions & 5 deletions datafusion/core/src/physical_plan/mod.rs
@@ -317,11 +317,8 @@ pub fn with_new_children_if_necessary(
/// ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).await.unwrap();
///
/// // create a plan to run a SQL query
/// let plan = ctx
/// .create_logical_plan("SELECT a FROM example WHERE a < 5")
/// .unwrap();
/// let plan = ctx.optimize(&plan).unwrap();
/// let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
/// let dataframe = ctx.sql("SELECT a FROM example WHERE a < 5").await.unwrap();
/// let physical_plan = dataframe.create_physical_plan().await.unwrap();
///
/// // Format using display string
/// let displayable_plan = displayable(physical_plan.as_ref());
13 changes: 7 additions & 6 deletions datafusion/core/src/physical_plan/planner.rs
@@ -2224,10 +2224,11 @@ mod tests {
let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
let ctx = SessionContext::new();

let logical_plan =
LogicalPlanBuilder::from(ctx.read_table(Arc::new(table))?.to_logical_plan()?)
.aggregate(vec![col("d1")], vec![sum(col("d2"))])?
.build()?;
let logical_plan = LogicalPlanBuilder::from(
ctx.read_table(Arc::new(table))?.to_optimized_plan()?,
)
.aggregate(vec![col("d1")], vec![sum(col("d2"))])?
.build()?;

let execution_plan = plan(&logical_plan).await?;
let formatted = format!("{:?}", execution_plan);
@@ -2435,7 +2436,7 @@ mod tests {
let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);
let logical_plan = match ctx.read_csv(path, options).await?.to_logical_plan()? {
let logical_plan = match ctx.read_csv(path, options).await?.to_optimized_plan()? {
LogicalPlan::TableScan(ref scan) => {
let mut scan = scan.clone();
scan.table_name = name.to_string();
@@ -2458,7 +2459,7 @@
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);
Ok(LogicalPlanBuilder::from(
ctx.read_csv(path, options).await?.to_logical_plan()?,
ctx.read_csv(path, options).await?.to_optimized_plan()?,
))
}
}
4 changes: 2 additions & 2 deletions datafusion/core/tests/custom_sources.rs
@@ -208,7 +208,7 @@ async fn custom_source_dataframe() -> Result<()> {
let ctx = SessionContext::new();

let table = ctx.read_table(Arc::new(CustomTableProvider))?;
let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan()?)
let logical_plan = LogicalPlanBuilder::from(table.to_optimized_plan()?)
.project(vec![col("c2")])?
.build()?;

@@ -262,7 +262,7 @@ async fn optimizers_catch_all_statistics() {
.unwrap();

let physical_plan = ctx
.create_physical_plan(&df.to_logical_plan().unwrap())
.create_physical_plan(&df.to_optimized_plan().unwrap())
.await
.unwrap();

13 changes: 4 additions & 9 deletions datafusion/core/tests/path_partition.rs
@@ -448,9 +448,8 @@ async fn parquet_statistics() -> Result<()> {
.await;

//// NO PROJECTION ////
let logical_plan = ctx.sql("SELECT * FROM t").await?.to_logical_plan()?;

let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
let dataframe = ctx.sql("SELECT * FROM t").await?;
let physical_plan = dataframe.create_physical_plan().await?;
assert_eq!(physical_plan.schema().fields().len(), 4);

let stat_cols = physical_plan
@@ -466,12 +465,8 @@
assert_eq!(stat_cols[3], ColumnStatistics::default());

//// WITH PROJECTION ////
let logical_plan = ctx
.sql("SELECT mycol, day FROM t WHERE day='28'")
.await?
.to_logical_plan()?;

let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
let dataframe = ctx.sql("SELECT mycol, day FROM t WHERE day='28'").await?;
let physical_plan = dataframe.create_physical_plan().await?;
assert_eq!(physical_plan.schema().fields().len(), 2);

let stat_cols = physical_plan
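A hedged sketch of the updated test's shape: the physical plan comes straight from the `DataFrame`, and its projected schema and column statistics can be inspected (the partitioned table `t` and its columns are assumed to be registered as in the test):

```rust
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;

async fn check_projected_plan(ctx: &SessionContext) -> Result<()> {
    let dataframe = ctx.sql("SELECT mycol, day FROM t WHERE day = '28'").await?;
    let physical_plan = dataframe.create_physical_plan().await?;

    // Only the two projected columns survive into the physical schema.
    assert_eq!(physical_plan.schema().fields().len(), 2);

    // Column statistics are present when the source (here Parquet) supplies them.
    if let Some(cols) = physical_plan.statistics().column_statistics {
        println!("column statistics: {:?}", cols);
    }
    Ok(())
}
```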