Skip to content

Commit

Permalink
Non-deprecated support for planning SQL without DDL
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Dec 23, 2022
1 parent 2f5b25d commit b9ff071
Show file tree
Hide file tree
Showing 14 changed files with 159 additions and 117 deletions.
7 changes: 3 additions & 4 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,18 +324,17 @@ async fn execute_query(
debug: bool,
enable_scheduler: bool,
) -> Result<Vec<RecordBatch>> {
let plan = ctx.sql(sql).await?;
let plan = plan.into_unoptimized_plan();
let plan = ctx.sql(sql).await?.into_unoptimized_plan();

if debug {
println!("=== Logical plan ===\n{:?}\n", plan);
}

let plan = ctx.optimize(&plan)?;
let plan = ctx.dataframe(plan).await?.into_optimized_plan()?;
if debug {
println!("=== Optimized logical plan ===\n{:?}\n", plan);
}
let physical_plan = ctx.create_physical_plan(&plan).await?;
let physical_plan = ctx.dataframe(plan).await?.create_physical_plan().await?;
if debug {
println!(
"=== Physical plan ===\n{}\n",
Expand Down
90 changes: 47 additions & 43 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,26 +245,31 @@ impl SessionContext {
self.state.read().config.clone()
}

/// Creates a [`DataFrame`] that will execute a SQL query.
/// Creates a [`DataFrame`] that executes a SQL query supported by
/// DataFusion, including DDL (such as `CREATE TABLE`).
///
/// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
/// might require the schema to be inferred.
/// You can use [`Self::plan_sql`], [`Self::dataframe_without_ddl`] and
/// [`DataFrame::create_physical_plan`] directly if you only need read-only
/// query support (with no way to create external tables, for example).
///
/// This method is `async` because queries of type `CREATE
/// EXTERNAL TABLE` might require the schema to be inferred.
pub async fn sql(&self, sql: &str) -> Result<DataFrame> {
let mut statements = DFParser::parse_sql(sql)?;
if statements.len() != 1 {
return Err(DataFusionError::NotImplemented(
"The context currently only supports a single SQL statement".to_string(),
));
}

// create a query planner
let plan = {
// TODO: Move catalog off SessionState onto SessionContext
let state = self.state.read();
let query_planner = SqlToRel::new(&*state);
query_planner.statement_to_plan(statements.pop_front().unwrap())?
};
let plan = self.plan_sql(sql)?;
self.dataframe(plan).await
}

/// Creates a [`DataFrame`] that will execute the specified
/// LogicalPlan, including DDL (such as `CREATE TABLE`).
/// Use [`Self::dataframe_without_ddl`] if you do not want
/// to support DDL statements.
///
/// Any DDL statements are executed during this function (not when
/// the [`DataFrame`] is evaluated)
///
/// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
/// might require the schema to be inferred by performing I/O.
pub async fn dataframe(&self, plan: LogicalPlan) -> Result<DataFrame> {
match plan {
LogicalPlan::CreateExternalTable(cmd) => {
self.create_external_table(&cmd).await
Expand Down Expand Up @@ -492,6 +497,15 @@ impl SessionContext {
}
}

/// Creates a [`DataFrame`] that will execute the specified
/// [`LogicalPlan`], without any support for DDL such as `CREATE TABLE`.
///
/// No DDL is executed here and the plan is not inspected: this call
/// simply wraps `plan` in a [`DataFrame`] and returns `Ok`. A plan that
/// represents DDL is rejected later, when a physical plan is created
/// from the returned [`DataFrame`] (for example via
/// [`DataFrame::create_physical_plan`]).
///
/// Use [`Self::dataframe`] to run plans with DDL
pub fn dataframe_without_ddl(&self, plan: LogicalPlan) -> Result<DataFrame> {
Ok(DataFrame::new(self.state(), plan))
}

// return an empty dataframe
fn return_empty_dataframe(&self) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::empty(false).build()?;
Expand Down Expand Up @@ -559,11 +573,9 @@ impl SessionContext {
}
Ok(false)
}
/// Creates a logical plan.
///
/// This function is intended for internal use and should not be called directly.
#[deprecated(note = "Use SessionContext::sql which snapshots the SessionState")]
pub fn create_logical_plan(&self, sql: &str) -> Result<LogicalPlan> {

/// Creates a [`LogicalPlan`] from a SQL query.
pub fn plan_sql(&self, sql: &str) -> Result<LogicalPlan> {
let mut statements = DFParser::parse_sql(sql)?;

if statements.len() != 1 {
Expand Down Expand Up @@ -1000,32 +1012,24 @@ impl SessionContext {
}

/// Optimizes the logical plan by applying optimizer rules.
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
self.state.read().optimize(plan)
#[deprecated(
note = "Use `SessionContext::dataframe_without_ddl` and `DataFrame::into_optimized_plan`"
)]
pub fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
self.dataframe_without_ddl(plan)?.into_optimized_plan()
}

/// Creates a physical plan from a logical plan.
/// Creates a physical [`ExecutionPlan`] from a [`LogicalPlan`].
#[deprecated(
note = "Use `SessionContext::dataframe_without_ddl` and `DataFrame::create_physical_plan`"
)]
pub async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
logical_plan: LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
let state_cloned = {
let mut state = self.state.write();
state.execution_props.start_execution();

// We need to clone `state` to release the lock that is not `Send`. We could
// make the lock `Send` by using `tokio::sync::Mutex`, but that would require to
// propagate async even to the `LogicalPlan` building methods.
// Cloning `state` here is fine as we then pass it as immutable `&state`, which
// means that we avoid write consistency issues as the cloned version will not
// be written to. As for eventual modifications that would be applied to the
// original state after it has been cloned, they will not be picked up by the
// clone but that is okay, as it is equivalent to postponing the state update
// by keeping the lock until the end of the function scope.
state.clone()
};

state_cloned.create_physical_plan(logical_plan).await
self.dataframe_without_ddl(logical_plan)?
.create_physical_plan()
.await
}

/// Executes a query and writes the results to a partitioned CSV file.
Expand Down
20 changes: 10 additions & 10 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1097,15 +1097,15 @@ impl DefaultPhysicalPlanner {
// TABLE" -- it must be handled at a higher level (so
// that the appropriate table can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: CreateExternalTable".to_string(),
))
}
LogicalPlan::Prepare(_) => {
// There is no default plan for "PREPARE" -- it must be
// handled at a higher level (so that the appropriate
// statement can be prepared)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: Prepare".to_string(),
))
}
Expand All @@ -1114,7 +1114,7 @@ impl DefaultPhysicalPlanner {
// It must be handled at a higher level (so
// that the schema can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: CreateCatalogSchema".to_string(),
))
}
Expand All @@ -1123,7 +1123,7 @@ impl DefaultPhysicalPlanner {
// It must be handled at a higher level (so
// that the schema can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: CreateCatalog".to_string(),
))
}
Expand All @@ -1132,7 +1132,7 @@ impl DefaultPhysicalPlanner {
// It must be handled at a higher level (so
// that the schema can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: CreateMemoryTable".to_string(),
))
}
Expand All @@ -1141,7 +1141,7 @@ impl DefaultPhysicalPlanner {
// It must be handled at a higher level (so
// that the schema can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: DropTable".to_string(),
))
}
Expand All @@ -1150,7 +1150,7 @@ impl DefaultPhysicalPlanner {
// It must be handled at a higher level (so
// that the schema can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: DropView".to_string(),
))
}
Expand All @@ -1159,16 +1159,16 @@ impl DefaultPhysicalPlanner {
// It must be handled at a higher level (so
// that the schema can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: CreateView".to_string(),
))
}
LogicalPlan::SetVariable(_) => {
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: SetVariable must be root of the plan".to_string(),
))
}
LogicalPlan::Explain(_) => Err(DataFusionError::Internal(
LogicalPlan::Explain(_) => Err(DataFusionError::Plan(
"Unsupported logical plan: Explain must be root of the plan".to_string(),
)),
LogicalPlan::Analyze(a) => {
Expand Down
14 changes: 8 additions & 6 deletions datafusion/core/tests/custom_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ async fn custom_source_dataframe() -> Result<()> {
.project(vec![col("c2")])?
.build()?;

let optimized_plan = ctx.optimize(&logical_plan)?;
let optimized_plan = ctx.dataframe(logical_plan).await?.into_optimized_plan()?;

match &optimized_plan {
LogicalPlan::Projection(Projection { input, .. }) => match &**input {
LogicalPlan::TableScan(TableScan {
Expand All @@ -235,7 +236,11 @@ async fn custom_source_dataframe() -> Result<()> {
);
assert_eq!(format!("{:?}", optimized_plan), expected);

let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
let physical_plan = ctx
.dataframe(optimized_plan)
.await?
.create_physical_plan()
.await?;

assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
Expand All @@ -261,10 +266,7 @@ async fn optimizers_catch_all_statistics() {
.await
.unwrap();

let physical_plan = ctx
.create_physical_plan(&df.into_optimized_plan().unwrap())
.await
.unwrap();
let physical_plan = df.create_physical_plan().await.unwrap();

// when the optimization kicks in, the source is replaced by an EmptyExec
assert!(
Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,12 @@ impl ContextWithParquet {
.expect("getting input");
let pretty_input = pretty_format_batches(&input).unwrap().to_string();

let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan");

let physical_plan = self
.ctx
.create_physical_plan(&logical_plan)
.dataframe(logical_plan)
.await
.expect("planning")
.create_physical_plan()
.await
.expect("creating physical plan");

Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/tests/sql/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1120,9 +1120,8 @@ async fn aggregate_with_alias() -> Result<()> {
.project(vec![col("c1"), sum(col("c2")).alias("total_salary")])?
.build()?;

let plan = ctx.optimize(&plan)?;
let physical_plan = ctx.dataframe(plan).await?.create_physical_plan().await?;

let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?;
assert_eq!("c1", physical_plan.schema().field(0).name().as_str());
assert_eq!(
"total_salary",
Expand Down
45 changes: 26 additions & 19 deletions datafusion/core/tests/sql/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,39 +135,46 @@ async fn invalid_qualified_table_references() -> Result<()> {
}

#[tokio::test]
#[allow(deprecated)] // TODO: Remove this test once create_logical_plan removed
/// This test demonstrates it is possible to run SQL queries in DataFusion without
/// any DDL support (such as CREATE TABLE / CREATE VIEW, etc.).
async fn unsupported_sql_returns_error() -> Result<()> {
let ctx = SessionContext::new();
register_aggregate_csv(&ctx).await?;
// create view
let sql = "create view test_view as select * from aggregate_test_100";
let plan = ctx.create_logical_plan(sql);
let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await;
assert!(physical_plan.is_err());
let plan = ctx.plan_sql(sql)?;
let err = ctx
.dataframe_without_ddl(plan)?
.create_physical_plan()
.await
.unwrap_err();
assert_eq!(
format!("{}", physical_plan.unwrap_err()),
"Internal error: Unsupported logical plan: CreateView. \
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker"
err.to_string(),
"Error during planning: Unsupported logical plan: CreateView"
);
// drop view
let sql = "drop view test_view";
let plan = ctx.create_logical_plan(sql);
let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await;
assert!(physical_plan.is_err());
let plan = ctx.plan_sql(sql)?;
let err = ctx
.dataframe_without_ddl(plan)?
.create_physical_plan()
.await
.unwrap_err();
assert_eq!(
format!("{}", physical_plan.unwrap_err()),
"Internal error: Unsupported logical plan: DropView. \
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker"
err.to_string(),
"Error during planning: Unsupported logical plan: DropView"
);
// drop table
let sql = "drop table aggregate_test_100";
let plan = ctx.create_logical_plan(sql);
let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await;
assert!(physical_plan.is_err());
let plan = ctx.plan_sql(sql)?;
let err = ctx
.dataframe_without_ddl(plan)?
.create_physical_plan()
.await
.unwrap_err();
assert_eq!(
format!("{}", physical_plan.unwrap_err()),
"Internal error: Unsupported logical plan: DropTable. \
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker"
err.to_string(),
"Error during planning: Unsupported logical plan: DropTable"
);
Ok(())
}
6 changes: 5 additions & 1 deletion datafusion/core/tests/sql/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ fn optimize_explain() {
}

// now optimize the plan and expect to see more plans
let optimized_plan = SessionContext::new().optimize(&plan).unwrap();
let optimized_plan = SessionContext::new()
.dataframe_without_ddl(plan.clone())
.unwrap()
.into_optimized_plan()
.unwrap();
if let LogicalPlan::Explain(e) = &optimized_plan {
// should have more than one plan
assert!(
Expand Down
Loading

0 comments on commit b9ff071

Please sign in to comment.