Skip to content

Commit

Permalink
Add new user doc to translate logical plan to physical plan (#12026)
Browse files Browse the repository at this point in the history
* Add new user doc to translate logical plan to physical plan

#7306

* prettier

* Run doc examples as part of cargo --doc

* Update first example to run

* Fix next example

* fix last example

* prettier

* clarify table source

* prettier

* Revert changes

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
jc4x4 and alamb authored Aug 21, 2024
1 parent 1c7209b commit 78f58c8
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 56 deletions.
6 changes: 6 additions & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,12 @@ doc_comment::doctest!(
library_user_guide_sql_api
);

#[cfg(doctest)]
doc_comment::doctest!(
"../../../docs/source/library-user-guide/building-logical-plans.md",
library_user_guide_logical_plans
);

#[cfg(doctest)]
doc_comment::doctest!(
"../../../docs/source/library-user-guide/using-the-dataframe-api.md",
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub mod tree_node;

pub use builder::{
build_join_schema, table_scan, union, wrap_projection_for_join_if_necessary,
LogicalPlanBuilder, UNNAMED_TABLE,
LogicalPlanBuilder, LogicalTableSource, UNNAMED_TABLE,
};
pub use ddl::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,
Expand Down
187 changes: 132 additions & 55 deletions docs/source/library-user-guide/building-logical-plans.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,44 +31,52 @@ explained in more detail in the [Query Planning and Execution Overview] section
DataFusion's [LogicalPlan] is an enum containing variants representing all the supported operators, and also
contains an `Extension` variant that allows projects building on DataFusion to add custom logical operators.

It is possible to create logical plans by directly creating instances of the [LogicalPlan] enum as follows, but is is
It is possible to create logical plans by directly creating instances of the [LogicalPlan] enum as shown, but it is
much easier to use the [LogicalPlanBuilder], which is described in the next section.

Here is an example of building a logical plan directly:

<!-- source for this example is in datafusion_docs::library_logical_plan::plan_1 -->

```rust
// create a logical table source
let schema = Schema::new(vec![
Field::new("id", DataType::Int32, true),
Field::new("name", DataType::Utf8, true),
]);
let table_source = LogicalTableSource::new(SchemaRef::new(schema));

// create a TableScan plan
let projection = None; // optional projection
let filters = vec![]; // optional filters to push down
let fetch = None; // optional LIMIT
let table_scan = LogicalPlan::TableScan(TableScan::try_new(
"person",
Arc::new(table_source),
projection,
filters,
fetch,
)?);

// create a Filter plan that evaluates `id > 500` that wraps the TableScan
let filter_expr = col("id").gt(lit(500));
let plan = LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(table_scan))?);

// print the plan
println!("{}", plan.display_indent_schema());
use datafusion::common::DataFusionError;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::logical_expr::{Filter, LogicalPlan, TableScan, LogicalTableSource};
use datafusion::prelude::*;
use std::sync::Arc;

fn main() -> Result<(), DataFusionError> {
// create a logical table source
let schema = Schema::new(vec![
Field::new("id", DataType::Int32, true),
Field::new("name", DataType::Utf8, true),
]);
let table_source = LogicalTableSource::new(SchemaRef::new(schema));

// create a TableScan plan
let projection = None; // optional projection
let filters = vec![]; // optional filters to push down
let fetch = None; // optional LIMIT
let table_scan = LogicalPlan::TableScan(TableScan::try_new(
"person",
Arc::new(table_source),
projection,
filters,
fetch,
)?
);

// create a Filter plan that evaluates `id > 500` that wraps the TableScan
let filter_expr = col("id").gt(lit(500));
let plan = LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(table_scan)) ? );

// print the plan
println!("{}", plan.display_indent_schema());
Ok(())
}
```

This example produces the following plan:

```
```text
Filter: person.id > Int32(500) [id:Int32;N, name:Utf8;N]
TableScan: person [id:Int32;N, name:Utf8;N]
```
Expand All @@ -78,7 +86,7 @@ Filter: person.id > Int32(500) [id:Int32;N, name:Utf8;N]
DataFusion logical plans can be created using the [LogicalPlanBuilder] struct. There is also a [DataFrame] API which is
a higher-level API that delegates to [LogicalPlanBuilder].

The following associated functions can be used to create a new builder:
There are several functions that can can be used to create a new builder, such as

- `empty` - create an empty plan with no fields
- `values` - create a plan from a set of literal values
Expand All @@ -102,41 +110,107 @@ The following example demonstrates building the same simple query plan as the pr
<!-- source for this example is in datafusion_docs::library_logical_plan::plan_builder_1 -->

```rust
// create a logical table source
let schema = Schema::new(vec![
Field::new("id", DataType::Int32, true),
Field::new("name", DataType::Utf8, true),
]);
let table_source = LogicalTableSource::new(SchemaRef::new(schema));

// optional projection
let projection = None;

// create a LogicalPlanBuilder for a table scan
let builder = LogicalPlanBuilder::scan("person", Arc::new(table_source), projection)?;

// perform a filter operation and build the plan
let plan = builder
.filter(col("id").gt(lit(500)))? // WHERE id > 500
.build()?;

// print the plan
println!("{}", plan.display_indent_schema());
use datafusion::common::DataFusionError;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::logical_expr::{LogicalPlanBuilder, LogicalTableSource};
use datafusion::prelude::*;
use std::sync::Arc;

fn main() -> Result<(), DataFusionError> {
// create a logical table source
let schema = Schema::new(vec![
Field::new("id", DataType::Int32, true),
Field::new("name", DataType::Utf8, true),
]);
let table_source = LogicalTableSource::new(SchemaRef::new(schema));

// optional projection
let projection = None;

// create a LogicalPlanBuilder for a table scan
let builder = LogicalPlanBuilder::scan("person", Arc::new(table_source), projection)?;

// perform a filter operation and build the plan
let plan = builder
.filter(col("id").gt(lit(500)))? // WHERE id > 500
.build()?;

// print the plan
println!("{}", plan.display_indent_schema());
Ok(())
}
```

This example produces the following plan:

```
```text
Filter: person.id > Int32(500) [id:Int32;N, name:Utf8;N]
TableScan: person [id:Int32;N, name:Utf8;N]
```

## Translating Logical Plan to Physical Plan

Logical plans can not be directly executed. They must be "compiled" into an
[`ExecutionPlan`], which is often referred to as a "physical plan".

Compared to `LogicalPlan`s `ExecutionPlans` have many more details such as
specific algorithms and detailed optimizations compared to. Given a
`LogicalPlan` the easiest way to create an `ExecutionPlan` is using
[`SessionState::create_physical_plan`] as shown below

```rust
use datafusion::datasource::{provider_as_source, MemTable};
use datafusion::common::DataFusionError;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::logical_expr::{LogicalPlanBuilder, LogicalTableSource};
use datafusion::prelude::*;
use std::sync::Arc;

// Creating physical plans may access remote catalogs and data sources
// thus it must be run with an async runtime.
#[tokio::main]
async fn main() -> Result<(), DataFusionError> {

// create a default table source
let schema = Schema::new(vec![
Field::new("id", DataType::Int32, true),
Field::new("name", DataType::Utf8, true),
]);
// To create an ExecutionPlan we must provide an actual
// TableProvider. For this example, we don't provide any data
// but in production code, this would have `RecordBatch`es with
// in memory data
let table_provider = Arc::new(MemTable::try_new(Arc::new(schema), vec![])?);
// Use the provider_as_source function to convert the TableProvider to a table source
let table_source = provider_as_source(table_provider);

// create a LogicalPlanBuilder for a table scan without projection or filters
let logical_plan = LogicalPlanBuilder::scan("person", table_source, None)?.build()?;

// Now create the physical plan by calling `create_physical_plan`
let ctx = SessionContext::new();
let physical_plan = ctx.state().create_physical_plan(&logical_plan).await?;

// print the plan
println!("{}", DisplayableExecutionPlan::new(physical_plan.as_ref()).indent(true));
Ok(())
}
```

This example produces the following physical plan:

```text
MemoryExec: partitions=0, partition_sizes=[]
```

## Table Sources

The previous example used a [LogicalTableSource], which is used for tests and documentation in DataFusion, and is also
suitable if you are using DataFusion to build logical plans but do not use DataFusion's physical planner. However, if you
want to use a [TableSource] that can be executed in DataFusion then you will need to use [DefaultTableSource], which is a
wrapper for a [TableProvider].
The previous examples use a [LogicalTableSource], which is used for tests and documentation in DataFusion, and is also
suitable if you are using DataFusion to build logical plans but do not use DataFusion's physical planner.

However, it is more common to use a [TableProvider]. To get a [TableSource] from a
[TableProvider], use [provider_as_source] or [DefaultTableSource].

[query planning and execution overview]: https://docs.rs/datafusion/latest/datafusion/index.html#query-planning-and-execution-overview
[architecture guide]: https://docs.rs/datafusion/latest/datafusion/index.html#architecture
Expand All @@ -145,5 +219,8 @@ wrapper for a [TableProvider].
[dataframe]: using-the-dataframe-api.md
[logicaltablesource]: https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalTableSource.html
[defaulttablesource]: https://docs.rs/datafusion/latest/datafusion/datasource/default_table_source/struct.DefaultTableSource.html
[provider_as_source]: https://docs.rs/datafusion/latest/datafusion/datasource/default_table_source/fn.provider_as_source.html
[tableprovider]: https://docs.rs/datafusion/latest/datafusion/datasource/provider/trait.TableProvider.html
[tablesource]: https://docs.rs/datafusion-expr/latest/datafusion_expr/trait.TableSource.html
[`executionplan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html
[`sessionstate::create_physical_plan`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.create_physical_plan

0 comments on commit 78f58c8

Please sign in to comment.