Skip to content

Commit

Permalink
Non-deprecated support for planning SQL without DDL
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Dec 23, 2022
1 parent 720bdb0 commit beef643
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 43 deletions.
44 changes: 21 additions & 23 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,26 +245,26 @@ impl SessionContext {
self.state.read().config.clone()
}

/// Creates a [`DataFrame`] that will execute a SQL query.
/// Creates a [`DataFrame`] that executes a SQL query supported by
/// DataFusion, including DDL such as (such as `CREATE TABLE`).
///
/// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
/// might require the schema to be inferred.
/// You can use [`Self::plan_sql`] and
/// [`Self::create_physical_plan`] directly if you need read only
/// query support (no way to create external tables, for example)
///
/// This method is `async` because queries of type `CREATE
/// EXTERNAL TABLE` might require the schema to be inferred.
pub async fn sql(&self, sql: &str) -> Result<DataFrame> {
let mut statements = DFParser::parse_sql(sql)?;
if statements.len() != 1 {
return Err(DataFusionError::NotImplemented(
"The context currently only supports a single SQL statement".to_string(),
));
}

// create a query planner
let plan = {
// TODO: Move catalog off SessionState onto SessionContext
let state = self.state.read();
let query_planner = SqlToRel::new(&*state);
query_planner.statement_to_plan(statements.pop_front().unwrap())?
};
let plan = self.plan_sql(sql)?;
self.plan_to_dataframe(plan).await
}

/// Creates a [`DataFrame`] that will execute the specified
/// LogicalPlan, including DDL such as (such as `CREATE TABLE`)
///
/// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
/// might require the schema to be inferred by performing I/O.
pub async fn plan_to_dataframe(&self, plan: LogicalPlan) -> Result<DataFrame> {
match plan {
LogicalPlan::CreateExternalTable(cmd) => {
self.create_external_table(&cmd).await
Expand Down Expand Up @@ -559,11 +559,9 @@ impl SessionContext {
}
Ok(false)
}
/// Creates a logical plan.
///
/// This function is intended for internal use and should not be called directly.
#[deprecated(note = "Use SessionContext::sql which snapshots the SessionState")]
pub fn create_logical_plan(&self, sql: &str) -> Result<LogicalPlan> {

/// Creates a [`LogicalPlan`] from a SQL query.
pub fn plan_sql(&self, sql: &str) -> Result<LogicalPlan> {
let mut statements = DFParser::parse_sql(sql)?;

if statements.len() != 1 {
Expand Down Expand Up @@ -1004,7 +1002,7 @@ impl SessionContext {
self.state.read().optimize(plan)
}

/// Creates a physical plan from a logical plan.
/// Creates a [`ExecutionPlan`] physical plan from a [`LogicalPlan`].
pub async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
Expand Down
20 changes: 10 additions & 10 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1097,15 +1097,15 @@ impl DefaultPhysicalPlanner {
// TABLE" -- it must be handled at a higher level (so
// that the appropriate table can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: CreateExternalTable".to_string(),
))
}
LogicalPlan::Prepare(_) => {
// There is no default plan for "PREPARE" -- it must be
// handled at a higher level (so that the appropriate
// statement can be prepared)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: Prepare".to_string(),
))
}
Expand All @@ -1114,7 +1114,7 @@ impl DefaultPhysicalPlanner {
// It must be handled at a higher level (so
// that the schema can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: CreateCatalogSchema".to_string(),
))
}
Expand All @@ -1123,7 +1123,7 @@ impl DefaultPhysicalPlanner {
// It must be handled at a higher level (so
// that the schema can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: CreateCatalog".to_string(),
))
}
Expand All @@ -1132,7 +1132,7 @@ impl DefaultPhysicalPlanner {
// It must be handled at a higher level (so
// that the schema can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: CreateMemoryTable".to_string(),
))
}
Expand All @@ -1141,7 +1141,7 @@ impl DefaultPhysicalPlanner {
// It must be handled at a higher level (so
// that the schema can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: DropTable".to_string(),
))
}
Expand All @@ -1150,7 +1150,7 @@ impl DefaultPhysicalPlanner {
// It must be handled at a higher level (so
// that the schema can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: DropView".to_string(),
))
}
Expand All @@ -1159,16 +1159,16 @@ impl DefaultPhysicalPlanner {
// It must be handled at a higher level (so
// that the schema can be registered with
// the context)
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: CreateView".to_string(),
))
}
LogicalPlan::SetVariable(_) => {
Err(DataFusionError::Internal(
Err(DataFusionError::Plan(
"Unsupported logical plan: SetVariable must be root of the plan".to_string(),
))
}
LogicalPlan::Explain(_) => Err(DataFusionError::Internal(
LogicalPlan::Explain(_) => Err(DataFusionError::Plan(
"Unsupported logical plan: Explain must be root of the plan".to_string(),
)),
LogicalPlan::Analyze(a) => {
Expand Down
18 changes: 8 additions & 10 deletions datafusion/core/tests/sql/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,39 +135,37 @@ async fn invalid_qualified_table_references() -> Result<()> {
}

#[tokio::test]
#[allow(deprecated)] // TODO: Remove this test once create_logical_plan removed
/// This test demonstrates it is posible to run SQL queries in DataFusion without
/// any DDL Support (such as CREATE TABLE / CREATE VIEW, ETC).
async fn unsupported_sql_returns_error() -> Result<()> {
let ctx = SessionContext::new();
register_aggregate_csv(&ctx).await?;
// create view
let sql = "create view test_view as select * from aggregate_test_100";
let plan = ctx.create_logical_plan(sql);
let plan = ctx.plan_sql(sql);
let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await;
assert!(physical_plan.is_err());
assert_eq!(
format!("{}", physical_plan.unwrap_err()),
"Internal error: Unsupported logical plan: CreateView. \
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker"
"Error during planning: Unsupported logical plan: CreateView"
);
// // drop view
let sql = "drop view test_view";
let plan = ctx.create_logical_plan(sql);
let plan = ctx.plan_sql(sql);
let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await;
assert!(physical_plan.is_err());
assert_eq!(
format!("{}", physical_plan.unwrap_err()),
"Internal error: Unsupported logical plan: DropView. \
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker"
"Error during planning: Unsupported logical plan: DropView"
);
// // drop table
let sql = "drop table aggregate_test_100";
let plan = ctx.create_logical_plan(sql);
let plan = ctx.plan_sql(sql);
let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await;
assert!(physical_plan.is_err());
assert_eq!(
format!("{}", physical_plan.unwrap_err()),
"Internal error: Unsupported logical plan: DropTable. \
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker"
"Error during planning: Unsupported logical plan: DropTable"
);
Ok(())
}

0 comments on commit beef643

Please sign in to comment.