Improve and test dataframe API examples in docs
alamb committed Jul 6, 2024
1 parent 13cb65e commit d6540db
Showing 4 changed files with 208 additions and 89 deletions.
1 change: 1 addition & 0 deletions datafusion-examples/README.md
@@ -52,6 +52,7 @@ cargo run --example dataframe
- [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization
- [`csv_sql_streaming.rs`](examples/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file
- [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
- [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and write the results back to s3
- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame against a local parquet file
- [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory
8 changes: 7 additions & 1 deletion datafusion/core/src/lib.rs
@@ -635,5 +635,11 @@ doc_comment::doctest!(
#[cfg(doctest)]
doc_comment::doctest!(
"../../../docs/source/library-user-guide/using-the-sql-api.md",
library_user_guide_example_usage
library_user_guide_sql_api
);

#[cfg(doctest)]
doc_comment::doctest!(
"../../../docs/source/library-user-guide/using-the-dataframe-api.md",
library_user_guide_dataframe_api
);
271 changes: 189 additions & 82 deletions docs/source/library-user-guide/using-the-dataframe-api.md
@@ -19,129 +19,236 @@

# Using the DataFrame API

## What is a DataFrame?

DataFusion [`DataFrame`]s are modeled after the [Pandas DataFrame] interface,
and are implemented as a thin wrapper over a [`LogicalPlan`] that adds
functionality for building and executing those plans.

## How to generate a DataFrame

You can directly use the `DataFrame` API or generate a `DataFrame` from a SQL
query. For example, to use `sql` to construct a `DataFrame`:

```rust
use std::sync::Arc;
use datafusion::prelude::*;
use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Register an in-memory table containing the following data
    // id | bank_account
    // ---|-------------
    // 1  | 9000
    // 2  | 8000
    // 3  | 7000
    let data = RecordBatch::try_from_iter(vec![
        ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
        ("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))),
    ])?;
    ctx.register_batch("users", data)?;
    // Create a DataFrame using SQL
    let dataframe = ctx.sql("SELECT * FROM users;").await?;
    Ok(())
}
```

You can also construct [`DataFrame`]s programmatically using the API:

```rust
use std::sync::Arc;
use datafusion::prelude::*;
use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Register the same in-memory table as in the previous example
    let data = RecordBatch::try_from_iter(vec![
        ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
        ("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))),
    ])?;
    ctx.register_batch("users", data)?;
    // Create a DataFrame that scans the users table, finds all users
    // with a bank account of at least 8000, and sorts the results
    // by bank account in descending order
    let dataframe = ctx
        .table("users")
        .await?
        .filter(col("bank_account").gt_eq(lit(8000)))? // bank_account >= 8000
        .sort(vec![col("bank_account").sort(false, true)])?; // ORDER BY bank_account DESC

    Ok(())
}
```
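
However you created it, you can quickly inspect a `DataFrame`'s results with
the `show` method, which executes the plan and pretty-prints the output to
stdout. A minimal sketch:

```rust
use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // show() executes the query and prints the results to stdout
    df.show().await?;
    Ok(())
}
```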

## Collect / Streaming Exec

DataFusion [`DataFrame`]s are "lazy", meaning they do no processing until
they are executed, which allows for additional optimizations.

When you have a `DataFrame`, you can run it in one of three ways:

1. `collect`: executes the query and buffers all the output into a `Vec<RecordBatch>`
2. `execute_stream`: begins executions and returns a `SendableRecordBatchStream` which incrementally computes output on each call to `next()`
3. `cache`: executes the query and buffers the output into a new in-memory `DataFrame` (see the sketch after the streaming example below)

To collect all outputs:

```rust
use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read the contents of a CSV file into a DataFrame
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // execute the query and collect the results as a Vec<RecordBatch>
    let batches = df.collect().await?;
    Ok(())
}
```
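
Once collected, the buffered batches can be processed directly; for example,
you can render them with arrow's pretty printer, which DataFusion re-exports.
A small sketch using `pretty_format_batches`:

```rust
use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::arrow::util::pretty::pretty_format_batches;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    let batches = df.collect().await?;
    // format the buffered batches as a human-readable table
    println!("{}", pretty_format_batches(&batches)?);
    Ok(())
}
```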

You can also use `execute_stream` to incrementally generate output one `RecordBatch` at a time:

```rust
use datafusion::prelude::*;
use datafusion::error::Result;
use futures::stream::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // begin execution (returns quickly, does not compute results)
    let mut stream = df.execute_stream().await?;
    // results are returned incrementally as they are computed
    while let Some(record_batch) = stream.next().await {
        println!("{record_batch:?}");
    }
    Ok(())
}
```
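
Finally, `cache` executes the query like `collect`, but wraps the buffered
results in a new in-memory `DataFrame`, which is handy when several downstream
queries reuse the same intermediate result. A minimal sketch:

```rust
use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // cache() executes the plan and returns a new DataFrame backed by the
    // buffered results, so later operations re-read memory, not the file
    let cached = df.cache().await?;
    let batches = cached.collect().await?;
    Ok(())
}
```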

## Write DataFrame to Files

You can also write the contents of a `DataFrame` to a file. When writing a file, DataFusion
executes the `DataFrame` and streams the results. DataFusion comes with support for writing
`csv`, `json`, `arrow`, `avro`, and `parquet` files, and supports writing custom
file formats via API (see [`custom_file_format.rs`] for an example).

For example, to read a CSV file and write it to a parquet file, use the
[`DataFrame::write_parquet`] method:

```rust
use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::dataframe::DataFrameWriteOptions;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read the contents of a CSV file into a DataFrame
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // execute the query and write the results to a parquet file
    df.write_parquet(
        "example.parquet",
        DataFrameWriteOptions::new(),
        None, // writer_options
    )
    .await?;
    Ok(())
}
```

[`custom_file_format.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/custom_file_format.rs

and the file will look like (Example Output):

```sql
> select * from '../datafusion/core/example.parquet';
+---+---+---+
| a | b | c |
+---+---+---+
| 1 | 2 | 3 |
+---+---+---+
```
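
The other formats follow the same pattern. As an illustrative sketch, writing
the results as CSV instead might look like the following, assuming `write_csv`
takes the same `(path, options, writer_options)` shape as `write_parquet`
(the output path `example_out.csv` is hypothetical):

```rust
use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::dataframe::DataFrameWriteOptions;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // stream the results out as a CSV file; None uses default writer settings
    df.write_csv(
        "example_out.csv",
        DataFrameWriteOptions::new(),
        None, // writer_options
    )
    .await?;
    Ok(())
}
```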

## `LogicalPlan`s and `DataFrame`s

The `DataFrame` struct is defined like this:

```rust
use datafusion::execution::session_state::SessionState;
use datafusion::logical_expr::LogicalPlan;

pub struct DataFrame {
    // state required to execute a LogicalPlan
    session_state: Box<SessionState>,
    // LogicalPlan that describes the computation to perform
    plan: LogicalPlan,
}
```

As shown above, `DataFrame` is just a very thin wrapper around `LogicalPlan`, so you
can easily go back and forth between them.

```rust
use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Create a DataFrame that scans the example CSV file
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // You can easily get the LogicalPlan from the DataFrame
    let (_state, plan) = df.into_parts();
    // and combine a LogicalPlan with a SessionState to get a DataFrame again
    let new_df = DataFrame::new(ctx.state(), plan);
    Ok(())
}
```

Note that you can build up [`DataFrame`]s using its methods, similar to building [`LogicalPlan`]s using [`LogicalPlanBuilder`]:

```rust
use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlanBuilder;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Create a DataFrame to scan the example CSV file
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // Create a new DataFrame that selects columns `a` and `b`, sorted by `a`
    let new_df = df
        .select(vec![col("a"), col("b")])?
        .sort(vec![col("a")])?;
    // Build the same plan using the LogicalPlanBuilder
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    let (_state, plan) = df.into_parts(); // get the DataFrame's LogicalPlan
    let plan = LogicalPlanBuilder::from(plan)
        .project(vec![col("a"), col("b")])?
        .sort(vec![col("a")])?
        .build()?;
    // prove they are the same
    assert_eq!(new_df.logical_plan(), &plan);
    Ok(())
}
```

[pandas dataframe]: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
[`dataframe`]: https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html
[`logicalplan`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html
[`logicalplanbuilder`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.LogicalPlanBuilder.html

[`DataFrame::write_parquet`]: https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.write_parquet
17 changes: 11 additions & 6 deletions docs/source/library-user-guide/using-the-sql-api.md
@@ -29,16 +29,15 @@ using the [`SessionContext::sql`] method. For lower level control such as
preventing DDL, you can use [`SessionContext::sql_with_options`] or the
[`SessionState`] APIs.


## Registering Data Sources using `SessionContext::register*`

The `SessionContext::register*` methods tell DataFusion the name of
the source and how to read data. Once registered, you can execute SQL queries
using the [`SessionContext::sql`] method referring to your data source as a table.

The [`SessionContext::sql`] method returns a `DataFrame` for ease of
use. See the ["Using the DataFrame API"] section for more information on how to
work with DataFrames.
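
For example, a short sketch of registering a CSV file under the (arbitrary)
table name `example` and querying it with SQL:

```rust
use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // register the CSV file as the table "example"
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
    // now the file can be queried as a table by name
    let df = ctx.sql("SELECT a, b FROM example LIMIT 5").await?;
    df.show().await?;
    Ok(())
}
```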

### Read a CSV File

@@ -215,3 +214,9 @@
    Ok(())
}
```

[`sessioncontext`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html
[`sessioncontext::sql`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql
[`sessioncontext::sql_with_options`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql_with_options
[`sessionstate`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html
["using the dataframe api"]: ../library-user-guide/using-the-dataframe-api.md
