-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add new user doc to translate logical plan to physical plan #12026
Changes from 10 commits
e631793
f226b63
f3c144c
522a68b
7c7fd85
54bfc0c
380717d
a3d799e
098439e
dfcae07
48e3a85
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,3 +33,4 @@ workspace = true | |
|
||
[dependencies] | ||
datafusion = { workspace = true } | ||
tokio = { workspace = true } |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,44 +31,52 @@ explained in more detail in the [Query Planning and Execution Overview] section | |
DataFusion's [LogicalPlan] is an enum containing variants representing all the supported operators, and also | ||
contains an `Extension` variant that allows projects building on DataFusion to add custom logical operators. | ||
|
||
It is possible to create logical plans by directly creating instances of the [LogicalPlan] enum as follows, but is is | ||
It is possible to create logical plans by directly creating instances of the [LogicalPlan] enum as shown, but it is | ||
much easier to use the [LogicalPlanBuilder], which is described in the next section. | ||
|
||
Here is an example of building a logical plan directly: | ||
|
||
<!-- source for this example is in datafusion_docs::library_logical_plan::plan_1 --> | ||
|
||
```rust | ||
// create a logical table source | ||
let schema = Schema::new(vec![ | ||
Field::new("id", DataType::Int32, true), | ||
Field::new("name", DataType::Utf8, true), | ||
]); | ||
let table_source = LogicalTableSource::new(SchemaRef::new(schema)); | ||
|
||
// create a TableScan plan | ||
let projection = None; // optional projection | ||
let filters = vec![]; // optional filters to push down | ||
let fetch = None; // optional LIMIT | ||
let table_scan = LogicalPlan::TableScan(TableScan::try_new( | ||
"person", | ||
Arc::new(table_source), | ||
projection, | ||
filters, | ||
fetch, | ||
)?); | ||
|
||
// create a Filter plan that evaluates `id > 500` that wraps the TableScan | ||
let filter_expr = col("id").gt(lit(500)); | ||
let plan = LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(table_scan))?); | ||
|
||
// print the plan | ||
println!("{}", plan.display_indent_schema()); | ||
use datafusion::common::DataFusionError; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I updated these examples so they compile as standalone examples |
||
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; | ||
use datafusion::logical_expr::{Filter, LogicalPlan, TableScan, LogicalTableSource}; | ||
use datafusion::prelude::*; | ||
use std::sync::Arc; | ||
|
||
fn main() -> Result<(), DataFusionError> { | ||
// create a logical table source | ||
let schema = Schema::new(vec![ | ||
Field::new("id", DataType::Int32, true), | ||
Field::new("name", DataType::Utf8, true), | ||
]); | ||
let table_source = LogicalTableSource::new(SchemaRef::new(schema)); | ||
|
||
// create a TableScan plan | ||
let projection = None; // optional projection | ||
let filters = vec![]; // optional filters to push down | ||
let fetch = None; // optional LIMIT | ||
let table_scan = LogicalPlan::TableScan(TableScan::try_new( | ||
"person", | ||
Arc::new(table_source), | ||
projection, | ||
filters, | ||
fetch, | ||
)? | ||
); | ||
|
||
// create a Filter plan that evaluates `id > 500` that wraps the TableScan | ||
let filter_expr = col("id").gt(lit(500)); | ||
let plan = LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(table_scan)) ? ); | ||
|
||
// print the plan | ||
println!("{}", plan.display_indent_schema()); | ||
Ok(()) | ||
} | ||
``` | ||
|
||
This example produces the following plan: | ||
|
||
``` | ||
```text | ||
Filter: person.id > Int32(500) [id:Int32;N, name:Utf8;N] | ||
TableScan: person [id:Int32;N, name:Utf8;N] | ||
``` | ||
|
@@ -78,7 +86,7 @@ Filter: person.id > Int32(500) [id:Int32;N, name:Utf8;N] | |
DataFusion logical plans can be created using the [LogicalPlanBuilder] struct. There is also a [DataFrame] API which is | ||
a higher-level API that delegates to [LogicalPlanBuilder]. | ||
|
||
The following associated functions can be used to create a new builder: | ||
There are several functions that can be used to create a new builder, such as: | ||
|
||
- `empty` - create an empty plan with no fields | ||
- `values` - create a plan from a set of literal values | ||
|
@@ -102,41 +110,107 @@ The following example demonstrates building the same simple query plan as the pr | |
<!-- source for this example is in datafusion_docs::library_logical_plan::plan_builder_1 --> | ||
|
||
```rust | ||
// create a logical table source | ||
let schema = Schema::new(vec![ | ||
Field::new("id", DataType::Int32, true), | ||
Field::new("name", DataType::Utf8, true), | ||
]); | ||
let table_source = LogicalTableSource::new(SchemaRef::new(schema)); | ||
|
||
// optional projection | ||
let projection = None; | ||
|
||
// create a LogicalPlanBuilder for a table scan | ||
let builder = LogicalPlanBuilder::scan("person", Arc::new(table_source), projection)?; | ||
|
||
// perform a filter operation and build the plan | ||
let plan = builder | ||
.filter(col("id").gt(lit(500)))? // WHERE id > 500 | ||
.build()?; | ||
|
||
// print the plan | ||
println!("{}", plan.display_indent_schema()); | ||
use datafusion::common::DataFusionError; | ||
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; | ||
use datafusion::logical_expr::{LogicalPlanBuilder, LogicalTableSource}; | ||
use datafusion::prelude::*; | ||
use std::sync::Arc; | ||
|
||
fn main() -> Result<(), DataFusionError> { | ||
// create a logical table source | ||
let schema = Schema::new(vec![ | ||
Field::new("id", DataType::Int32, true), | ||
Field::new("name", DataType::Utf8, true), | ||
]); | ||
let table_source = LogicalTableSource::new(SchemaRef::new(schema)); | ||
|
||
// optional projection | ||
let projection = None; | ||
|
||
// create a LogicalPlanBuilder for a table scan | ||
let builder = LogicalPlanBuilder::scan("person", Arc::new(table_source), projection)?; | ||
|
||
// perform a filter operation and build the plan | ||
let plan = builder | ||
.filter(col("id").gt(lit(500)))? // WHERE id > 500 | ||
.build()?; | ||
|
||
// print the plan | ||
println!("{}", plan.display_indent_schema()); | ||
Ok(()) | ||
} | ||
``` | ||
|
||
This example produces the following plan: | ||
|
||
``` | ||
```text | ||
Filter: person.id > Int32(500) [id:Int32;N, name:Utf8;N] | ||
TableScan: person [id:Int32;N, name:Utf8;N] | ||
``` | ||
|
||
## Translating Logical Plan to Physical Plan | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks @jc4x4 -- I have some ideas on improving this example -- I will push some suggestions to this branch |
||
|
||
Logical plans cannot be directly executed. They must be "compiled" into an | ||
[`ExecutionPlan`], which is often referred to as a "physical plan". | ||
|
||
Compared to `LogicalPlan`s, `ExecutionPlan`s have many more details, such as | ||
specific algorithms and detailed optimizations. Given a | ||
`LogicalPlan`, the easiest way to create an `ExecutionPlan` is using | ||
[`SessionState::create_physical_plan`] as shown below: | ||
|
||
```rust | ||
use datafusion::datasource::{provider_as_source, MemTable}; | ||
use datafusion::common::DataFusionError; | ||
use datafusion::physical_plan::display::DisplayableExecutionPlan; | ||
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; | ||
use datafusion::logical_expr::{LogicalPlanBuilder, LogicalTableSource}; | ||
use datafusion::prelude::*; | ||
use std::sync::Arc; | ||
|
||
// Creating physical plans may access remote catalogs and data sources | ||
// thus it must be run with an async runtime. | ||
#[tokio::main] | ||
async fn main() -> Result<(), DataFusionError> { | ||
|
||
// create a default table source | ||
let schema = Schema::new(vec![ | ||
Field::new("id", DataType::Int32, true), | ||
Field::new("name", DataType::Utf8, true), | ||
]); | ||
// To create an ExecutionPlan we must provide an actual | ||
// TableProvider. For this example, we don't provide any data, | ||
// but in production code this would contain `RecordBatch`es of | ||
// in-memory data. | ||
let table_provider = Arc::new(MemTable::try_new(Arc::new(schema), vec![])?); | ||
// Use the provider_as_source function to convert the TableProvider to a table source | ||
let table_source = provider_as_source(table_provider); | ||
|
||
// create a LogicalPlanBuilder for a table scan without projection or filters | ||
let logical_plan = LogicalPlanBuilder::scan("person", table_source, None)?.build()?; | ||
|
||
// Now create the physical plan by calling `create_physical_plan` | ||
let ctx = SessionContext::new(); | ||
let physical_plan = ctx.state().create_physical_plan(&logical_plan).await?; | ||
|
||
// print the plan | ||
println!("{}", DisplayableExecutionPlan::new(physical_plan.as_ref()).indent(true)); | ||
Ok(()) | ||
} | ||
``` | ||
|
||
This example produces the following physical plan: | ||
|
||
```text | ||
MemoryExec: partitions=0, partition_sizes=[] | ||
``` | ||
|
||
## Table Sources | ||
|
||
The previous example used a [LogicalTableSource], which is used for tests and documentation in DataFusion, and is also | ||
suitable if you are using DataFusion to build logical plans but do not use DataFusion's physical planner. However, if you | ||
want to use a [TableSource] that can be executed in DataFusion then you will need to use [DefaultTableSource], which is a | ||
wrapper for a [TableProvider]. | ||
The previous examples use a [LogicalTableSource], which is used for tests and documentation in DataFusion, and is also | ||
suitable if you are using DataFusion to build logical plans but do not use DataFusion's physical planner. | ||
|
||
However, it is more common to use a [TableProvider]. To get a [TableSource] from a | ||
[TableProvider], use [provider_as_source] or [DefaultTableSource]. | ||
|
||
[query planning and execution overview]: https://docs.rs/datafusion/latest/datafusion/index.html#query-planning-and-execution-overview | ||
[architecture guide]: https://docs.rs/datafusion/latest/datafusion/index.html#architecture | ||
|
@@ -145,5 +219,8 @@ wrapper for a [TableProvider]. | |
[dataframe]: using-the-dataframe-api.md | ||
[logicaltablesource]: https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalTableSource.html | ||
[defaulttablesource]: https://docs.rs/datafusion/latest/datafusion/datasource/default_table_source/struct.DefaultTableSource.html | ||
[provider_as_source]: https://docs.rs/datafusion/latest/datafusion/datasource/default_table_source/fn.provider_as_source.html | ||
[tableprovider]: https://docs.rs/datafusion/latest/datafusion/datasource/provider/trait.TableProvider.html | ||
[tablesource]: https://docs.rs/datafusion-expr/latest/datafusion_expr/trait.TableSource.html | ||
[`executionplan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html | ||
[`sessionstate::create_physical_plan`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.create_physical_plan |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,9 +16,12 @@ | |
// under the License. | ||
|
||
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; | ||
use datafusion::datasource::{DefaultTableSource, MemTable}; | ||
use datafusion::error::Result; | ||
use datafusion::logical_expr::builder::LogicalTableSource; | ||
use datafusion::logical_expr::{Filter, LogicalPlan, LogicalPlanBuilder, TableScan}; | ||
use datafusion::physical_plan::display::DisplayableExecutionPlan; | ||
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; | ||
use datafusion::prelude::*; | ||
use std::sync::Arc; | ||
|
||
|
@@ -76,3 +79,32 @@ fn plan_builder_1() -> Result<()> { | |
|
||
Ok(()) | ||
} | ||
|
||
#[tokio::test] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This file is confusing -- i don't think it is used anymore --- I added #12081 to remove it |
||
async fn translate_logical_to_physical() -> Result<()> { | ||
// create a default table source | ||
let schema = Schema::new(vec![ | ||
Field::new("id", DataType::Int32, true), | ||
Field::new("name", DataType::Utf8, true), | ||
]); | ||
let table_provider = Arc::new(MemTable::try_new(Arc::new(schema), vec![])?); | ||
let table_source = Arc::new(DefaultTableSource::new(table_provider)); | ||
|
||
// create a LogicalPlanBuilder for a table scan without projection or filters | ||
let logical_plan = LogicalPlanBuilder::scan("person", table_source, None)?.build()?; | ||
|
||
// create a physical plan using the default physical planner | ||
let ctx = SessionContext::new(); | ||
let planner = DefaultPhysicalPlanner::default(); | ||
let physical_plan = planner | ||
.create_physical_plan(&logical_plan, &ctx.state()) | ||
.await?; | ||
|
||
// print the plan | ||
println!( | ||
"{}", | ||
DisplayableExecutionPlan::new(physical_plan.as_ref()).indent(true) | ||
); | ||
|
||
Ok(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added this block which means the examples in the library guide are now run as part of
cargo test --doc