Deprecate SessionContext::create_logical_plan (#4617) #4679

Merged 6 commits on Dec 21, 2022
Changes from 1 commit
10 changes: 3 additions & 7 deletions benchmarks/src/bin/nyctaxi.rs
@@ -28,7 +28,6 @@ use datafusion::arrow::util::pretty;
use datafusion::error::Result;
use datafusion::execution::context::{SessionConfig, SessionContext};

use datafusion::physical_plan::collect;
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
use structopt::StructOpt;

@@ -119,14 +118,11 @@ async fn datafusion_sql_benchmarks(
}

async fn execute_sql(ctx: &SessionContext, sql: &str, debug: bool) -> Result<()> {
let plan = ctx.create_logical_plan(sql)?;
let plan = ctx.optimize(&plan)?;
let dataframe = ctx.sql(sql).await?;
if debug {
println!("Optimized logical plan:\n{:?}", plan);
println!("Optimized logical plan:\n{:?}", dataframe.logical_plan());
}
let physical_plan = ctx.create_physical_plan(&plan).await?;
let task_ctx = ctx.task_ctx();
let result = collect(physical_plan, task_ctx).await?;
Comment on lines -122 to -129
Contributor Author (tustvold):
As ctx is interior mutable, this would effectively plan, optimize, and execute potentially against at least 3 different states 😱
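
To make the hazard concrete, here is a minimal sketch (using the DataFusion APIs that appear in this diff; the helper function itself is illustrative and not part of the change) contrasting the old three-step flow with the single snapshot taken by `ctx.sql`:

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn execute(ctx: &SessionContext, sql: &str) -> Result<()> {
    // Old flow: every call re-read the shared, interior-mutable state, so a
    // concurrent change (e.g. a table being registered or dropped) between the
    // calls could make planning, optimization, and execution disagree:
    //
    //   let plan = ctx.create_logical_plan(sql)?;          // state A
    //   let plan = ctx.optimize(&plan)?;                   // state B
    //   let exec = ctx.create_physical_plan(&plan).await?; // state C
    //
    // New flow: `sql` snapshots the SessionState once, and the returned
    // DataFrame plans, optimizes, and executes against that single snapshot.
    let dataframe = ctx.sql(sql).await?;
    let _batches = dataframe.collect().await?;
    Ok(())
}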

let result = dataframe.collect().await?;
if debug {
pretty::print_batches(&result)?;
}
2 changes: 1 addition & 1 deletion benchmarks/src/bin/tpch.rs
@@ -643,7 +643,7 @@ mod tests {
let sql = get_query_sql(query)?;
for sql in &sql {
let df = ctx.sql(sql.as_str()).await?;
let plan = df.to_logical_plan()?;
let plan = df.to_optimized_plan()?;
if !actual.is_empty() {
actual += "\n";
}
3 changes: 1 addition & 2 deletions benchmarks/src/tpch.rs
@@ -353,8 +353,7 @@ pub async fn convert_tbl(
}

// create the physical plan
let csv = csv.to_logical_plan()?;
let csv = ctx.create_physical_plan(&csv).await?;
let csv = csv.create_physical_plan().await?;

let output_path = output_root_path.join(table);
let output_path = output_path.to_str().unwrap().to_owned();
48 changes: 38 additions & 10 deletions datafusion/core/src/dataframe.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use parquet::file::properties::WriterProperties;

use datafusion_common::{Column, DFSchema};
use datafusion_common::{Column, DFSchema, ScalarValue};
use datafusion_expr::TableProviderFilterPushDown;

use crate::arrow::datatypes::Schema;
@@ -505,17 +505,40 @@ impl DataFrame {
self.plan.schema()
}

/// Return the unoptimized logical plan represented by this DataFrame.
/// Return the unoptimized logical plan
pub fn logical_plan(&self) -> &LogicalPlan {
&self.plan
}

/// Return the logical plan represented by this DataFrame without running the optimizers
Contributor:
👍
I wonder if we should call it into_unoptimized_plan for consistency 🤔

Contributor Author (tustvold):
Done

///
/// Note: This method should not be used outside testing, as it loses the snapshot
/// of the [`SessionState`] attached to this [`DataFrame`] and consequently subsequent
/// operations may take place against a different state
pub fn to_unoptimized_plan(self) -> LogicalPlan {
self.plan
}

/// Return the optimized logical plan represented by this DataFrame.
pub fn to_logical_plan(self) -> Result<LogicalPlan> {
///
/// Note: This method should not be used outside testing, as it loses the snapshot
/// of the [`SessionState`] attached to this [`DataFrame`] and consequently subsequent
/// operations may take place against a different state
pub fn to_optimized_plan(self) -> Result<LogicalPlan> {
// Optimize the plan first for better UX
self.session_state.optimize(&self.plan)
}

/// Return the optimized logical plan represented by this DataFrame.
///
/// Note: This method should not be used outside testing, as it loses the snapshot
/// of the [`SessionState`] attached to this [`DataFrame`] and consequently subsequent
/// operations may take place against a different state
#[deprecated(note = "Use DataFrame::to_optimized_plan")]
Contributor Author (@tustvold, Dec 20, 2022):
I opted to deprecate this method as it was a touch confusing, having a method that actually performed a potentially mutating optimization pass without explicitly calling it out
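
For callers the migration is mechanical; a small sketch of the accessors as they stand after this change (the query text is arbitrary):

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn inspect_plans(ctx: &SessionContext) -> Result<()> {
    let df = ctx.sql("SELECT 1").await?;

    // Borrow the plan as written, without consuming the DataFrame.
    println!("unoptimized:\n{:?}", df.logical_plan());

    // Consume the DataFrame and run the optimizers; the rename makes the
    // optimization pass explicit in the method name.
    // let plan = df.to_logical_plan()?;   // deprecated spelling
    let plan = df.to_optimized_plan()?;
    println!("optimized:\n{:?}", plan);
    Ok(())
}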

pub fn to_logical_plan(self) -> Result<LogicalPlan> {
self.to_optimized_plan()
}

/// Return a DataFrame with the explanation of its plan so far.
///
/// if `analyze` is specified, runs the plan and reports metrics
Expand Down Expand Up @@ -714,6 +737,12 @@ impl DataFrame {
}
}

/// Convert a prepared logical plan into its inner logical plan with all params replaced with their corresponding values
pub fn with_param_values(self, param_values: Vec<ScalarValue>) -> Result<Self> {
let plan = self.plan.with_param_values(param_values)?;
Contributor Author (tustvold):
I plan to move this off LogicalPlan and onto LogicalPlanBuilder in a subsequent PR, to be consistent with the rest of the system

Ok(Self::new(self.session_state, plan))
}
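
A usage sketch for the new method; the table name is hypothetical, and it is assumed here that the SQL frontend plans `$1` as a placeholder expression at this point in the codebase:

use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use datafusion::scalar::ScalarValue;

async fn run_with_params(ctx: &SessionContext) -> Result<()> {
    // `example` is a hypothetical registered table; `$1` is assumed to plan
    // as a placeholder. Bind a concrete value before executing the query.
    let df = ctx
        .sql("SELECT c1 FROM example WHERE c2 = $1")
        .await?
        .with_param_values(vec![ScalarValue::Int64(Some(5))])?;
    let _batches = df.collect().await?;
    Ok(())
}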

/// Cache DataFrame as a memory table.
///
/// ```
@@ -1014,11 +1043,10 @@ mod tests {
let df = df.select(vec![expr])?;

// build query using SQL
let sql_plan =
ctx.create_logical_plan("SELECT my_fn(c12) FROM aggregate_test_100")?;
let sql_plan = ctx.sql("SELECT my_fn(c12) FROM aggregate_test_100").await?;

// the two plans should be identical
assert_same_plan(&df.plan, &sql_plan);
assert_same_plan(&df.plan, &sql_plan.plan);

Ok(())
}
@@ -1128,7 +1156,7 @@
async fn create_plan(sql: &str) -> Result<LogicalPlan> {
let mut ctx = SessionContext::new();
register_aggregate_csv(&mut ctx, "aggregate_test_100").await?;
ctx.create_logical_plan(sql)
Ok(ctx.sql(sql).await?.to_unoptimized_plan())
}

async fn test_table_with_name(name: &str) -> Result<DataFrame> {
@@ -1308,7 +1336,7 @@
\n Inner Join: t1.c1 = t2.c1\
\n TableScan: t1\
\n TableScan: t2",
format!("{:?}", df_renamed.clone().to_unoptimized_plan())
format!("{:?}", df_renamed.logical_plan())
);

assert_eq!("\
@@ -1322,7 +1350,7 @@
\n SubqueryAlias: t2\
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]",
format!("{:?}", df_renamed.clone().to_logical_plan()?)
format!("{:?}", df_renamed.clone().to_optimized_plan()?)
);

let df_results = df_renamed.collect().await?;
@@ -1469,7 +1497,7 @@ mod tests {

assert_eq!(
"TableScan: ?table? projection=[c2, c3, sum]",
format!("{:?}", cached_df.clone().to_logical_plan()?)
format!("{:?}", cached_df.clone().to_optimized_plan()?)
);

let df_results = df.collect().await?;
24 changes: 9 additions & 15 deletions datafusion/core/src/datasource/view.rs
@@ -493,12 +493,10 @@ mod tests {
let view_sql = "CREATE VIEW xyz AS SELECT * FROM abc";
session_ctx.sql(view_sql).await?.collect().await?;

let plan = session_ctx
let dataframe = session_ctx
.sql("EXPLAIN CREATE VIEW xyz AS SELECT * FROM abc")
.await?
.to_logical_plan()
.unwrap();
let plan = session_ctx.optimize(&plan).unwrap();
.await?;
let plan = dataframe.to_optimized_plan()?;
let actual = format!("{}", plan.display_indent());
let expected = "\
Explain\
@@ -507,12 +505,10 @@
\n TableScan: abc projection=[column1, column2, column3]";
assert_eq!(expected, actual);

let plan = session_ctx
let dataframe = session_ctx
.sql("EXPLAIN CREATE VIEW xyz AS SELECT * FROM abc WHERE column2 = 5")
.await?
.to_logical_plan()
.unwrap();
let plan = session_ctx.optimize(&plan).unwrap();
.await?;
let plan = dataframe.to_optimized_plan()?;
let actual = format!("{}", plan.display_indent());
let expected = "\
Explain\
@@ -522,12 +518,10 @@
\n TableScan: abc projection=[column1, column2, column3]";
assert_eq!(expected, actual);

let plan = session_ctx
let dataframe = session_ctx
.sql("EXPLAIN CREATE VIEW xyz AS SELECT column1, column2 FROM abc WHERE column2 = 5")
.await?
.to_logical_plan()
.unwrap();
let plan = session_ctx.optimize(&plan).unwrap();
.await?;
let plan = dataframe.to_optimized_plan()?;
let actual = format!("{}", plan.display_indent());
let expected = "\
Explain\
43 changes: 28 additions & 15 deletions datafusion/core/src/execution/context.rs
@@ -254,7 +254,21 @@ impl SessionContext {
/// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
/// might require the schema to be inferred.
pub async fn sql(&self, sql: &str) -> Result<DataFrame> {
let plan = self.create_logical_plan(sql)?;
let mut statements = DFParser::parse_sql(sql)?;
if statements.len() != 1 {
return Err(DataFusionError::NotImplemented(
"The context currently only supports a single SQL statement".to_string(),
));
}

// create a query planner
let plan = {
// TODO: Move catalog off SessionState onto SessionContext
Contributor Author (tustvold):
This somewhat relates to #4607 but also serves to avoid issues due to the interior mutability of CatalogList

let state = self.state.read();
let query_planner = SqlToRel::new(&*state);
query_planner.statement_to_plan(statements.pop_front().unwrap())?
};

match plan {
LogicalPlan::CreateExternalTable(cmd) => {
self.create_external_table(&cmd).await
@@ -553,6 +567,7 @@ impl SessionContext {
/// Creates a logical plan.
///
/// This function is intended for internal use and should not be called directly.
#[deprecated(note = "Use SessionContext::sql which snapshots the SessionState")]
pub fn create_logical_plan(&self, sql: &str) -> Result<LogicalPlan> {
let mut statements = DFParser::parse_sql(sql)?;

@@ -1791,9 +1806,10 @@ impl SessionState {
&self,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
let planner = self.query_planner.clone();
let logical_plan = self.optimize(logical_plan)?;
planner.create_physical_plan(&logical_plan, self).await
self.query_planner
.create_physical_plan(&logical_plan, self)
.await
}

/// return the configuration options
@@ -2046,8 +2062,7 @@ mod tests {
use std::fs::File;
use std::path::PathBuf;
use std::sync::Weak;
use std::thread::{self, JoinHandle};
use std::{env, io::prelude::*, sync::Mutex};
use std::{env, io::prelude::*};
use tempfile::TempDir;

#[tokio::test]
@@ -2267,23 +2282,21 @@ mod tests {
// environment. Use case is for concurrent planning.
let tmp_dir = TempDir::new()?;
let partition_count = 4;
let ctx = Arc::new(Mutex::new(create_ctx(&tmp_dir, partition_count).await?));
let ctx = Arc::new(create_ctx(&tmp_dir, partition_count).await?);

let threads: Vec<JoinHandle<Result<_>>> = (0..2)
let threads: Vec<_> = (0..2)
.map(|_| ctx.clone())
.map(|ctx_clone| {
thread::spawn(move || {
let ctx = ctx_clone.lock().expect("Locked context");
.map(|ctx| {
tokio::spawn(async move {
// Ensure we can create logical plan code on a separate thread.
ctx.create_logical_plan(
"SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3",
)
ctx.sql("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")
.await
})
})
.collect();

for thread in threads {
thread.join().expect("Failed to join thread")?;
for handle in threads {
handle.await.unwrap().unwrap();
}
Ok(())
}
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -347,8 +347,8 @@ mod tests {
let partition = create_vec_batches(&schema, 10);
let table = MemTable::try_new(schema, vec![partition])?;
ctx.register_table("a", Arc::new(table))?;
let plan = ctx.create_logical_plan("SELECT * FROM a WHERE c0 < 1")?;
ctx.create_physical_plan(&plan).await
let dataframe = ctx.sql("SELECT * FROM a WHERE c0 < 1").await?;
dataframe.create_physical_plan().await
}

#[tokio::test(flavor = "multi_thread")]
7 changes: 2 additions & 5 deletions datafusion/core/src/physical_plan/mod.rs
@@ -317,11 +317,8 @@ pub fn with_new_children_if_necessary(
/// ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).await.unwrap();
///
/// // create a plan to run a SQL query
/// let plan = ctx
/// .create_logical_plan("SELECT a FROM example WHERE a < 5")
/// .unwrap();
/// let plan = ctx.optimize(&plan).unwrap();
/// let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
/// let dataframe = ctx.sql("SELECT a FROM example WHERE a < 5").await.unwrap();
/// let physical_plan = dataframe.create_physical_plan().await.unwrap();
///
/// // Format using display string
/// let displayable_plan = displayable(physical_plan.as_ref());
13 changes: 7 additions & 6 deletions datafusion/core/src/physical_plan/planner.rs
@@ -2224,10 +2224,11 @@ mod tests {
let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
let ctx = SessionContext::new();

let logical_plan =
LogicalPlanBuilder::from(ctx.read_table(Arc::new(table))?.to_logical_plan()?)
.aggregate(vec![col("d1")], vec![sum(col("d2"))])?
.build()?;
let logical_plan = LogicalPlanBuilder::from(
ctx.read_table(Arc::new(table))?.to_optimized_plan()?,
)
.aggregate(vec![col("d1")], vec![sum(col("d2"))])?
.build()?;

let execution_plan = plan(&logical_plan).await?;
let formatted = format!("{:?}", execution_plan);
@@ -2435,7 +2436,7 @@ mod tests {
let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);
let logical_plan = match ctx.read_csv(path, options).await?.to_logical_plan()? {
let logical_plan = match ctx.read_csv(path, options).await?.to_optimized_plan()? {
LogicalPlan::TableScan(ref scan) => {
let mut scan = scan.clone();
scan.table_name = name.to_string();
@@ -2458,7 +2459,7 @@
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);
Ok(LogicalPlanBuilder::from(
ctx.read_csv(path, options).await?.to_logical_plan()?,
ctx.read_csv(path, options).await?.to_optimized_plan()?,
))
}
}
4 changes: 2 additions & 2 deletions datafusion/core/tests/custom_sources.rs
@@ -208,7 +208,7 @@ async fn custom_source_dataframe() -> Result<()> {
let ctx = SessionContext::new();

let table = ctx.read_table(Arc::new(CustomTableProvider))?;
let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan()?)
let logical_plan = LogicalPlanBuilder::from(table.to_optimized_plan()?)
.project(vec![col("c2")])?
.build()?;

@@ -262,7 +262,7 @@ async fn optimizers_catch_all_statistics() {
.unwrap();

let physical_plan = ctx
.create_physical_plan(&df.to_logical_plan().unwrap())
.create_physical_plan(&df.to_optimized_plan().unwrap())
.await
.unwrap();

13 changes: 4 additions & 9 deletions datafusion/core/tests/path_partition.rs
@@ -448,9 +448,8 @@ async fn parquet_statistics() -> Result<()> {
.await;

//// NO PROJECTION ////
let logical_plan = ctx.sql("SELECT * FROM t").await?.to_logical_plan()?;

let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
let dataframe = ctx.sql("SELECT * FROM t").await?;
let physical_plan = dataframe.create_physical_plan().await?;
assert_eq!(physical_plan.schema().fields().len(), 4);

let stat_cols = physical_plan
@@ -466,12 +465,8 @@
assert_eq!(stat_cols[3], ColumnStatistics::default());

//// WITH PROJECTION ////
let logical_plan = ctx
.sql("SELECT mycol, day FROM t WHERE day='28'")
.await?
.to_logical_plan()?;

let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
let dataframe = ctx.sql("SELECT mycol, day FROM t WHERE day='28'").await?;
let physical_plan = dataframe.create_physical_plan().await?;
assert_eq!(physical_plan.schema().fields().len(), 2);

let stat_cols = physical_plan