Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Non-deprecated support for planning SQL without DDL, deprecate some more SessionContext methods #4721

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,18 +324,17 @@ async fn execute_query(
debug: bool,
enable_scheduler: bool,
) -> Result<Vec<RecordBatch>> {
let plan = ctx.sql(sql).await?;
let plan = plan.into_unoptimized_plan();
let plan = ctx.sql(sql).await?.into_unoptimized_plan();

if debug {
println!("=== Logical plan ===\n{:?}\n", plan);
}

let plan = ctx.optimize(&plan)?;
let plan = ctx.dataframe(plan).await?.into_optimized_plan()?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the new pattern to get an optimized plan (rather than calling ctx.optimize directly

if debug {
println!("=== Optimized logical plan ===\n{:?}\n", plan);
}
let physical_plan = ctx.create_physical_plan(&plan).await?;
let physical_plan = ctx.dataframe(plan).await?.create_physical_plan().await?;
if debug {
println!(
"=== Physical plan ===\n{}\n",
Expand Down
90 changes: 47 additions & 43 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,26 +245,31 @@ impl SessionContext {
self.state.read().config.clone()
}

/// Creates a [`DataFrame`] that will execute a SQL query.
/// Creates a [`DataFrame`] that executes a SQL query supported by
/// DataFusion, including DDL (such as `CREATE TABLE`).
///
/// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
/// might require the schema to be inferred.
/// You can use [`Self::plan_sql`] and
/// [`DataFrame::create_physical_plan`] directly if you need read only
/// query support (no way to create external tables, for example)
///
/// This method is `async` because queries of type `CREATE
/// EXTERNAL TABLE` might require the schema to be inferred.
pub async fn sql(&self, sql: &str) -> Result<DataFrame> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the core SessionContext::sql API does not change

let mut statements = DFParser::parse_sql(sql)?;
if statements.len() != 1 {
return Err(DataFusionError::NotImplemented(
"The context currently only supports a single SQL statement".to_string(),
));
}

// create a query planner
let plan = {
// TODO: Move catalog off SessionState onto SessionContext
let state = self.state.read();
let query_planner = SqlToRel::new(&*state);
query_planner.statement_to_plan(statements.pop_front().unwrap())?
};
let plan = self.plan_sql(sql)?;
self.dataframe(plan).await
}

/// Creates a [`DataFrame`] that will execute the specified
/// LogicalPlan, including DDL (such as `CREATE TABLE`).
/// Use [`Self::dataframe_without_ddl`] if you do not want
/// to support DDL statements.
///
/// Any DDL statements are executed during this function (not when
/// the [`DataFrame`] is evaluated)
///
/// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
/// might require the schema to be inferred by performing I/O.
pub async fn dataframe(&self, plan: LogicalPlan) -> Result<DataFrame> {
match plan {
LogicalPlan::CreateExternalTable(cmd) => {
self.create_external_table(&cmd).await
Expand Down Expand Up @@ -492,6 +497,15 @@ impl SessionContext {
}
}

/// Creates a [`DataFrame`] that will execute the specified
/// LogicalPlan, but will error if the plans represent DDL such as
/// `CREATE TABLE`
///
/// Use [`Self::dataframe`] to run plans with DDL
pub fn dataframe_without_ddl(&self, plan: LogicalPlan) -> Result<DataFrame> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the API to support #4720

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appears to just be DataFrame new, do we really need this?

Ok(DataFrame::new(self.state(), plan))
}

// return an empty dataframe
fn return_empty_dataframe(&self) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::empty(false).build()?;
Expand Down Expand Up @@ -559,11 +573,9 @@ impl SessionContext {
}
Ok(false)
}
/// Creates a logical plan.
///
/// This function is intended for internal use and should not be called directly.
#[deprecated(note = "Use SessionContext::sql which snapshots the SessionState")]
pub fn create_logical_plan(&self, sql: &str) -> Result<LogicalPlan> {

/// Creates a [`LogicalPlan`] from a SQL query.
pub fn plan_sql(&self, sql: &str) -> Result<LogicalPlan> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again this appears to just call DFParser followed by the query planner

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is also problematic for the same reason that create_logical_plan is problematic - it returns a LogicalPlan without any mechanism to optimise/execute against the same state

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again this appears to just call DFParser followed by the query planner

Yes that is exactly what it does.

There needs to be some way for users to create a LogicalPlan and get datafusion to optimize and run it properly it (e.g. if the user makes the LogicalPlan directly from their own query language such as influxrpc or VegaFusion)

let mut statements = DFParser::parse_sql(sql)?;

if statements.len() != 1 {
Expand Down Expand Up @@ -1000,32 +1012,24 @@ impl SessionContext {
}

/// Optimizes the logical plan by applying optimizer rules.
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
self.state.read().optimize(plan)
#[deprecated(
note = "Use `SessionContext::dataframe_without_ddl` and `DataFrame::into_optimized_plan`"
)]
pub fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
self.dataframe_without_ddl(plan)?.into_optimized_plan()
}

/// Creates a physical plan from a logical plan.
/// Creates a physical [`ExecutionPlan`] from a [`LogicalPlan`].
#[deprecated(
note = "Use `SessionContext::::dataframe_without_ddl` and `DataFrame::create_physical_plan`"
)]
pub async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
logical_plan: LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
let state_cloned = {
let mut state = self.state.write();
state.execution_props.start_execution();

// We need to clone `state` to release the lock that is not `Send`. We could
// make the lock `Send` by using `tokio::sync::Mutex`, but that would require to
// propagate async even to the `LogicalPlan` building methods.
// Cloning `state` here is fine as we then pass it as immutable `&state`, which
// means that we avoid write consistency issues as the cloned version will not
// be written to. As for eventual modifications that would be applied to the
// original state after it has been cloned, they will not be picked up by the
// clone but that is okay, as it is equivalent to postponing the state update
// by keeping the lock until the end of the function scope.
state.clone()
};

state_cloned.create_physical_plan(logical_plan).await
self.dataframe_without_ddl(logical_plan)?
.create_physical_plan()
.await
}

/// Executes a query and writes the results to a partitioned CSV file.
Expand Down
20 changes: 10 additions & 10 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1097,15 +1097,15 @@ impl DefaultPhysicalPlanner {
// TABLE" -- it must be handled at a higher level (so
// that the appropriate table can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are not really "internal errors" as they can be triggered by trying to run a sql query that contains DDL

Copy link
Contributor

@tustvold tustvold Dec 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imo this is a footgun we should aim to remove, I have a plan

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#4721 (comment) are the key usecases -- as long as they are possible / easy / well documented it will be great

"Unsupported logical plan: CreateExternalTable".to_string(),
))
}
LogicalPlan::Prepare(_) => {
// There is no default plan for "PREPARE" -- it must be
// handled at a higher level (so that the appropriate
// statement can be prepared)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: Prepare".to_string(),
))
}
Expand All @@ -1114,7 +1114,7 @@ impl DefaultPhysicalPlanner {
// It must be handled at a higher level (so
// that the schema can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: CreateCatalogSchema".to_string(),
))
}
Expand All @@ -1123,7 +1123,7 @@ impl DefaultPhysicalPlanner {
// It must be handled at a higher level (so
// that the schema can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: CreateCatalog".to_string(),
))
}
Expand All @@ -1132,7 +1132,7 @@ impl DefaultPhysicalPlanner {
// It must be handled at a higher level (so
// that the schema can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: CreateMemoryTable".to_string(),
))
}
Expand All @@ -1141,7 +1141,7 @@ impl DefaultPhysicalPlanner {
// It must be handled at a higher level (so
// that the schema can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: DropTable".to_string(),
))
}
Expand All @@ -1150,7 +1150,7 @@ impl DefaultPhysicalPlanner {
// It must be handled at a higher level (so
// that the schema can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: DropView".to_string(),
))
}
Expand All @@ -1159,16 +1159,16 @@ impl DefaultPhysicalPlanner {
// It must be handled at a higher level (so
// that the schema can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: CreateView".to_string(),
))
}
LogicalPlan::SetVariable(_) => {
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: SetVariable must be root of the plan".to_string(),
))
}
LogicalPlan::Explain(_) => Err(DataFusionError::Internal(
LogicalPlan::Explain(_) => Err(DataFusionError::Plan(
"Unsupported logical plan: Explain must be root of the plan".to_string(),
)),
LogicalPlan::Analyze(a) => {
Expand Down
14 changes: 8 additions & 6 deletions datafusion/core/tests/custom_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ async fn custom_source_dataframe() -> Result<()> {
.project(vec![col("c2")])?
.build()?;

let optimized_plan = ctx.optimize(&logical_plan)?;
let optimized_plan = ctx.dataframe(logical_plan).await?.into_optimized_plan()?;

match &optimized_plan {
LogicalPlan::Projection(Projection { input, .. }) => match &**input {
LogicalPlan::TableScan(TableScan {
Expand All @@ -235,7 +236,11 @@ async fn custom_source_dataframe() -> Result<()> {
);
assert_eq!(format!("{:?}", optimized_plan), expected);

let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
let physical_plan = ctx
.dataframe(optimized_plan)
.await?
.create_physical_plan()
.await?;

assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
Expand All @@ -261,10 +266,7 @@ async fn optimizers_catch_all_statistics() {
.await
.unwrap();

let physical_plan = ctx
.create_physical_plan(&df.into_optimized_plan().unwrap())
.await
.unwrap();
let physical_plan = df.create_physical_plan().await.unwrap();

// when the optimization kicks in, the source is replaced by an EmptyExec
assert!(
Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,12 @@ impl ContextWithParquet {
.expect("getting input");
let pretty_input = pretty_format_batches(&input).unwrap().to_string();

let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan");

let physical_plan = self
.ctx
.create_physical_plan(&logical_plan)
.dataframe(logical_plan)
.await
.expect("planning")
.create_physical_plan()
.await
.expect("creating physical plan");

Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/tests/sql/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1120,9 +1120,8 @@ async fn aggregate_with_alias() -> Result<()> {
.project(vec![col("c1"), sum(col("c2")).alias("total_salary")])?
.build()?;

let plan = ctx.optimize(&plan)?;
let physical_plan = ctx.dataframe(plan).await?.create_physical_plan().await?;

let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?;
assert_eq!("c1", physical_plan.schema().field(0).name().as_str());
assert_eq!(
"total_salary",
Expand Down
45 changes: 26 additions & 19 deletions datafusion/core/tests/sql/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,39 +135,46 @@ async fn invalid_qualified_table_references() -> Result<()> {
}

#[tokio::test]
#[allow(deprecated)] // TODO: Remove this test once create_logical_plan removed
/// This test demonstrates it is posible to run SQL queries in DataFusion without
/// any DDL Support (such as CREATE TABLE / CREATE VIEW, ETC).
async fn unsupported_sql_returns_error() -> Result<()> {
let ctx = SessionContext::new();
register_aggregate_csv(&ctx).await?;
// create view
let sql = "create view test_view as select * from aggregate_test_100";
let plan = ctx.create_logical_plan(sql);
let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await;
assert!(physical_plan.is_err());
let plan = ctx.plan_sql(sql)?;
let err = ctx
.dataframe_without_ddl(plan)?
.create_physical_plan()
.await
.unwrap_err();
assert_eq!(
format!("{}", physical_plan.unwrap_err()),
"Internal error: Unsupported logical plan: CreateView. \
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker"
err.to_string(),
"Error during planning: Unsupported logical plan: CreateView"
);
// // drop view
let sql = "drop view test_view";
let plan = ctx.create_logical_plan(sql);
let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await;
assert!(physical_plan.is_err());
let plan = ctx.plan_sql(sql)?;
let err = ctx
.dataframe_without_ddl(plan)?
.create_physical_plan()
.await
.unwrap_err();
assert_eq!(
format!("{}", physical_plan.unwrap_err()),
"Internal error: Unsupported logical plan: DropView. \
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker"
err.to_string(),
"Error during planning: Unsupported logical plan: DropView"
);
// // drop table
let sql = "drop table aggregate_test_100";
let plan = ctx.create_logical_plan(sql);
let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await;
assert!(physical_plan.is_err());
let plan = ctx.plan_sql(sql)?;
let err = ctx
.dataframe_without_ddl(plan)?
.create_physical_plan()
.await
.unwrap_err();
assert_eq!(
format!("{}", physical_plan.unwrap_err()),
"Internal error: Unsupported logical plan: DropTable. \
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker"
err.to_string(),
"Error during planning: Unsupported logical plan: DropTable"
);
Ok(())
}
6 changes: 5 additions & 1 deletion datafusion/core/tests/sql/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ fn optimize_explain() {
}

// now optimize the plan and expect to see more plans
let optimized_plan = SessionContext::new().optimize(&plan).unwrap();
let optimized_plan = SessionContext::new()
.dataframe_without_ddl(plan.clone())
.unwrap()
.into_optimized_plan()
.unwrap();
if let LogicalPlan::Explain(e) = &optimized_plan {
// should have more than one plan
assert!(
Expand Down
Loading