Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve and test dataframe API examples in docs #11290

Merged
merged 11 commits into from
Jul 9, 2024
1 change: 1 addition & 0 deletions datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ cargo run --example dataframe
- [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization
- [`csv_sql_streaming.rs`](examples/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file
- [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
- [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame against a local parquet file
- [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory
Expand Down
8 changes: 7 additions & 1 deletion datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,5 +641,11 @@ doc_comment::doctest!(
#[cfg(doctest)]
doc_comment::doctest!(
"../../../docs/source/library-user-guide/using-the-sql-api.md",
library_user_guide_example_usage
library_user_guide_sql_api
);

#[cfg(doctest)]
doc_comment::doctest!(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This runs the examples as part of cargo test --doc

"../../../docs/source/library-user-guide/using-the-dataframe-api.md",
library_user_guide_dataframe_api
);
302 changes: 220 additions & 82 deletions docs/source/library-user-guide/using-the-dataframe-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,129 +19,267 @@

# Using the DataFrame API

## What is a DataFrame
The [Users Guide] introduces the [`DataFrame`] API and this section describes
that API in more depth.

`DataFrame` in `DataFrame` is modeled after the Pandas DataFrame interface, and is a thin wrapper over LogicalPlan that adds functionality for building and executing those plans.
## What is a DataFrame?

```rust
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this example to the end as I think it makes more sense once you see the DataFrame in action

pub struct DataFrame {
session_state: SessionState,
plan: LogicalPlan,
}
```

You can build up `DataFrame`s using its methods, similarly to building `LogicalPlan`s using `LogicalPlanBuilder`:

```rust
let df = ctx.table("users").await?;
As described in the [Users Guide], DataFusion [`DataFrame`]s are modeled after
the [Pandas DataFrame] interface, and are implemented as thin wrapper over a
[`LogicalPlan`] that adds functionality for building and executing those plans.

// Create a new DataFrame sorted by `id`, `bank_account`
let new_df = df.select(vec![col("id"), col("bank_account")])?
.sort(vec![col("id")])?;

// Build the same plan using the LogicalPlanBuilder
let plan = LogicalPlanBuilder::from(&df.to_logical_plan())
.project(vec![col("id"), col("bank_account")])?
.sort(vec![col("id")])?
.build()?;
```

You can use `collect` or `execute_stream` to execute the query.
The simplest possible dataframe is one that scans a table and that table can be
in a file or in memory.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok taking advantage of my relative ignorance to share some things that might be confusing to new users like me.

I think it might be better to start with a short section called Reading a Dataframe? Because that's probably the first thing people will want to do. It should show a simple example of reading a csv and displaying it. We just want to establish that when you read from a file the thing you get back is a dataframe! Then I wonder if the next section might be called Generating a New Dataframe? And I think it might be preferable to have the SQL example second rather than first?

Interestingly, the distinction between a "table" and a "dataframe" is hazy to me. There is also some sort of subtle distinction going on here between a dataframe as a thing that contains some data (which is how I think about it mentally when I read from a csv) and a thing which contains an executable plan that performs a transformation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great feedback -- I looked around and the more basic introduction seems like it is in https://datafusion.apache.org/user-guide/dataframe.html (the "user guide"). I'll add some text that points there and rearrange the content (as well as make a PR to clean up that page)

## How to generate a DataFrame

You can directly use the `DataFrame` API or generate a `DataFrame` from a SQL query.

For example, to use `sql` to construct `DataFrame`:
You can construct [`DataFrame`]s programmatically using the API, similarly to
other DataFrame APIs. For example, you can read an in memory `RecordBatch` into
a `DataFrame`:

```rust
let ctx = SessionContext::new();
// Register the in-memory table containing the data
ctx.register_table("users", Arc::new(create_memtable()?))?;
let dataframe = ctx.sql("SELECT * FROM users;").await?;
use std::sync::Arc;
use datafusion::prelude::*;
use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While these examples are now more verbose, they all stand on their own, which I think @efredine suggested would be an improvement

// Register an in-memory table containing the following data
// id | bank_account
// ---|-------------
// 1 | 9000
// 2 | 8000
// 3 | 7000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The in-memory examples are concise and its easy to get the gist of what's going on. But it also throws people in to the deep end of the Arrow format which lacks a gentle introduction IMO. The Arrow-rs documentation gets immediately into the weeds!

It's likely that many users might never even need to know or access the arrow format directly. They will just read and write to csv or parquet.

I don't think this needs to change, but perhaps what's missing is a section on how and when to use the Arrow format? A gentler introduction to Record Batches.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a gentle arrow introduction would be awesome -- here is a ticke tracking such a thing upstream: apache/arrow-rs#4071

I actually think the basic content / structure could be copied from https://jorgecarleitao.github.io/arrow2/main/guide/ with the examples being updated to reflect arrow-rs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also add a small section in the DataFusion docs about record batches as well - filed #11336 to track that idea

let data = RecordBatch::try_from_iter(vec![
("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))),
])?;
// Create a DataFrame that scans the user table, and finds
// all users with a bank account at least 8000
// and sorts the results by bank account in descending order
let dataframe = ctx
.read_batch(data)?
.filter(col("bank_account").gt_eq(lit(8000)))? // bank_account >= 8000
.sort(vec![col("bank_account").sort(false, true)])?; // ORDER BY bank_account DESC

Ok(())
}
```

To construct `DataFrame` using the API:
You can _also_ generate a `DataFrame` from a SQL query and use the DataFrame's APIs
to manipulate the output of the query.

```rust
let ctx = SessionContext::new();
// Register the in-memory table containing the data
ctx.register_table("users", Arc::new(create_memtable()?))?;
let dataframe = ctx
.table("users")
.filter(col("a").lt_eq(col("b")))?
.sort(vec![col("a").sort(true, true), col("b").sort(false, false)])?;
use std::sync::Arc;
use datafusion::prelude::*;
use datafusion::assert_batches_eq;
use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
// Register the same in-memory table as the previous example
let data = RecordBatch::try_from_iter(vec![
("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))),
])?;
ctx.register_batch("users", data)?;
// Create a DataFrame using SQL
let dataframe = ctx.sql("SELECT * FROM users;")
.await?
// Note we can filter the output of the query using the DataFrame API
.filter(col("bank_account").gt_eq(lit(8000)))?; // bank_account >= 8000

let results = &dataframe.collect().await?;

// use the `assert_batches_eq` macro to show the output
assert_batches_eq!(
vec![
"+----+--------------+",
"| id | bank_account |",
"+----+--------------+",
"| 1 | 9000 |",
"| 2 | 8000 |",
"+----+--------------+",
],
&results
);
Ok(())
}
```

## Collect / Streaming Exec

DataFusion `DataFrame`s are "lazy", meaning they do not do any processing until they are executed, which allows for additional optimizations.
DataFusion [`DataFrame`]s are "lazy", meaning they do no processing until
they are executed, which allows for additional optimizations.

When you have a `DataFrame`, you can run it in one of three ways:
You can run a `DataFrame` in one of three ways:

1. `collect` which executes the query and buffers all the output into a `Vec<RecordBatch>`
2. `streaming_exec`, which begins executions and returns a `SendableRecordBatchStream` which incrementally computes output on each call to `next()`
3. `cache` which executes the query and buffers the output into a new in memory DataFrame.
1. `collect`: executes the query and buffers all the output into a `Vec<RecordBatch>`
2. `execute_stream`: begins executions and returns a `SendableRecordBatchStream` which incrementally computes output on each call to `next()`
3. `cache`: executes the query and buffers the output into a new in memory `DataFrame.`

You can just collect all outputs once like:
To collect all outputs into a memory buffer, use the `collect` method:

```rust
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.collect().await?;
use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
// read the contents of a CSV file into a DataFrame
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// execute the query and collect the results as a Vec<RecordBatch>
let batches = df.collect().await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with the next example, it might be worth iterating the batch here as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

for record_batch in batches {
println!("{record_batch:?}");
}
Ok(())
}
```

You can also use stream output to incrementally generate output one `RecordBatch` at a time
Use `execute_stream` to incrementally generate output one `RecordBatch` at a time:

```rust
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let mut stream = df.execute_stream().await?;
while let Some(rb) = stream.next().await {
println!("{rb:?}");
use datafusion::prelude::*;
use datafusion::error::Result;
use futures::stream::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
// read example.csv file into a DataFrame
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// begin execution (returns quickly, does not compute results)
let mut stream = df.execute_stream().await?;
// results are returned incrementally as they are computed
while let Some(record_batch) = stream.next().await {
println!("{record_batch:?}");
}
Ok(())
}
```

# Write DataFrame to Files

You can also serialize `DataFrame` to a file. For now, `Datafusion` supports write `DataFrame` to `csv`, `json` and `parquet`.

When writing a file, DataFusion will execute the DataFrame and stream the results to a file.
You can also write the contents of a `DataFrame` to a file. When writing a file,
DataFusion executes the `DataFrame` and streams the results to the output.
DataFusion comes with support for writing `csv`, `json` `arrow` `avro`, and
`parquet` files, and supports writing custom file formats via API (see
[`custom_file_format.rs`] for an example)

For example, to write a csv_file
For example, to read a CSV file and write it to a parquet file, use the
[`DataFrame::write_parquet`] method

```rust
let ctx = SessionContext::new();
// Register the in-memory table containing the data
ctx.register_table("users", Arc::new(mem_table))?;
let dataframe = ctx.sql("SELECT * FROM users;").await?;

dataframe
.write_csv("user_dataframe.csv", DataFrameWriteOptions::default(), None)
.await;
use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::dataframe::DataFrameWriteOptions;

#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
alamb marked this conversation as resolved.
Show resolved Hide resolved
// read example.csv file into a DataFrame
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// stream the contents of the DataFrame to the `example.parquet` file
df.write_parquet(
"example.parquet",
DataFrameWriteOptions::new(),
None, // writer_options
).await;
Ok(())
}
```

and the file will look like (Example Output):
[`custom_file_format.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/custom_file_format.rs

```
id,bank_account
1,9000
The output file will look like (Example Output):

```sql
> select * from '../datafusion/core/example.parquet';
+---+---+---+
| a | b | c |
+---+---+---+
| 1 | 2 | 3 |
+---+---+---+
```

## Transform between LogicalPlan and DataFrame
## Relationship between `LogicalPlan`s and `DataFrame`s

As shown above, `DataFrame` is just a very thin wrapper of `LogicalPlan`, so you can easily go back and forth between them.
The `DataFrame` struct is defined like this:

```rust
// Just combine LogicalPlan with SessionContext and you get a DataFrame
let ctx = SessionContext::new();
// Register the in-memory table containing the data
ctx.register_table("users", Arc::new(mem_table))?;
let dataframe = ctx.sql("SELECT * FROM users;").await?;
use datafusion::execution::session_state::SessionState;
use datafusion::logical_expr::LogicalPlan;
pub struct DataFrame {
// state required to execute a LogicalPlan
session_state: Box<SessionState>,
// LogicalPlan that describes the computation to perform
plan: LogicalPlan,
}
```

// get LogicalPlan in dataframe
let plan = dataframe.logical_plan().clone();
As shown above, `DataFrame` is a thin wrapper of `LogicalPlan`, so you can
easily go back and forth between them.

// construct a DataFrame with LogicalPlan
let new_df = DataFrame::new(ctx.state(), plan);
```rust
use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlanBuilder;

#[tokio::main]
async fn main() -> Result<()>{
let ctx = SessionContext::new();
// read example.csv file into a DataFrame
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// You can easily get the LogicalPlan from the DataFrame
let (_state, plan) = df.into_parts();
// Just combine LogicalPlan with SessionContext and you get a DataFrame
// get LogicalPlan in dataframe
let new_df = DataFrame::new(ctx.state(), plan);
Ok(())
}
```

In fact, using the [`DataFrame`]s methods you can create the same
[`LogicalPlan`]s as when using [`LogicalPlanBuilder`]:

```rust
use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlanBuilder;

#[tokio::main]
async fn main() -> Result<()>{
let ctx = SessionContext::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok - this comment, really clarified the distinction between a table and a dataframe for me. In essence, the simplest possible dataframe is one that scans a table and that table can be in a file or in memory. I think this might be worth including in the introduction. Maybe worth consistently using scan when reading a file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to clarify and add some additional comments.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This example really drives it home.

And in the near future we'll be able to turn it back into SQL which probably wouldn't belong here but is cool all the same ;-).

// read example.csv file into a DataFrame
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// Create a new DataFrame sorted by `id`, `bank_account`
let new_df = df.select(vec![col("a"), col("b")])?
.sort(vec![col("a")])?;
// Build the same plan using the LogicalPlanBuilder
// Similar to `SELECT a, b FROM example.csv ORDER BY a`
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let (_state, plan) = df.into_parts(); // get the DataFrame's LogicalPlan
let plan = LogicalPlanBuilder::from(plan)
.project(vec![col("a"), col("b")])?
.sort(vec![col("a")])?
.build()?;
// prove they are the same
assert_eq!(new_df.logical_plan(), &plan);
Ok(())
}
```

[users guide]: ../user-guide/dataframe.md
[pandas dataframe]: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
[`dataframe`]: https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html
[`logicalplan`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html
[`logicalplanbuilder`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.LogicalPlanBuilder.html
[`dataframe::write_parquet`]: https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.write_parquet
Loading
Loading