From d6540dbf9f21298255f869f1c4b724216af86bd4 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 5 Jul 2024 14:06:49 -0400
Subject: [PATCH] Improve and test dataframe API examples in docs

---
 datafusion-examples/README.md                |   1 +
 datafusion/core/src/lib.rs                   |   8 +-
 .../using-the-dataframe-api.md               | 271 ++++++++++++------
 .../library-user-guide/using-the-sql-api.md  |  17 +-
 4 files changed, 208 insertions(+), 89 deletions(-)

diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index f868a5310cbe..7ee88ebf3923 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -52,6 +52,7 @@ cargo run --example dataframe
 - [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization
 - [`csv_sql_streaming.rs`](examples/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file
 - [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
+- [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
 - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
 - [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame against a local parquet file
 - [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index fb7abcd795e8..6a41c441e602 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -635,5 +635,11 @@ doc_comment::doctest!(
 #[cfg(doctest)]
 doc_comment::doctest!(
     "../../../docs/source/library-user-guide/using-the-sql-api.md",
-    library_user_guide_example_usage
+    library_user_guide_sql_api
+);
+
+#[cfg(doctest)]
+doc_comment::doctest!(
+    "../../../docs/source/library-user-guide/using-the-dataframe-api.md",
+    library_user_guide_dataframe_api
 );
diff --git a/docs/source/library-user-guide/using-the-dataframe-api.md b/docs/source/library-user-guide/using-the-dataframe-api.md
index c4f4ecd4f137..4aeb3bc24845 100644
--- a/docs/source/library-user-guide/using-the-dataframe-api.md
+++ b/docs/source/library-user-guide/using-the-dataframe-api.md
@@ -19,129 +19,236 @@
 # Using the DataFrame API

-## What is a DataFrame
+## What is a DataFrame?

-`DataFrame` in `DataFrame` is modeled after the Pandas DataFrame interface, and is a thin wrapper over LogicalPlan that adds functionality for building and executing those plans.
-
-```rust
-pub struct DataFrame {
-    session_state: SessionState,
-    plan: LogicalPlan,
-}
-```
-
-You can build up `DataFrame`s using its methods, similarly to building `LogicalPlan`s using `LogicalPlanBuilder`:
-
-```rust
-let df = ctx.table("users").await?;
-
-// Create a new DataFrame sorted by `id`, `bank_account`
-let new_df = df.select(vec![col("id"), col("bank_account")])?
-    .sort(vec![col("id")])?;
-
-// Build the same plan using the LogicalPlanBuilder
-let plan = LogicalPlanBuilder::from(&df.to_logical_plan())
-    .project(vec![col("id"), col("bank_account")])?
-    .sort(vec![col("id")])?
-    .build()?;
-```
-
-You can use `collect` or `execute_stream` to execute the query.
+DataFusion [`DataFrame`]s are modeled after the [Pandas DataFrame] interface,
+and are implemented as thin wrappers over [`LogicalPlan`]s that add
+functionality for building and executing those plans.
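+
+For example, here is a minimal sketch (using the same `tests/data/example.csv`
+file as the later examples in this guide) of how a `DataFrame` lazily builds up
+a plan and only executes it when the results are requested:
+
+```rust
+use datafusion::prelude::*;
+use datafusion::error::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+    // Building up the DataFrame only assembles a LogicalPlan; nothing runs yet
+    let df = ctx
+        .read_csv("tests/data/example.csv", CsvReadOptions::new())
+        .await?
+        .filter(col("a").lt_eq(col("b")))?;
+    // Execution happens here, when the results are requested
+    let results = df.collect().await?;
+    println!("{results:?}");
+    Ok(())
+}
+```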

 ## How to generate a DataFrame

-You can directly use the `DataFrame` API or generate a `DataFrame` from a SQL query.
-
-For example, to use `sql` to construct `DataFrame`:
+You can directly use the `DataFrame` API or generate a `DataFrame` from a SQL
+query. For example, to use `sql` to construct a `DataFrame`:

 ```rust
-let ctx = SessionContext::new();
-// Register the in-memory table containing the data
-ctx.register_table("users", Arc::new(create_memtable()?))?;
-let dataframe = ctx.sql("SELECT * FROM users;").await?;
+use std::sync::Arc;
+use datafusion::prelude::*;
+use datafusion::arrow::array::{ArrayRef, Int32Array};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+    // Register an in-memory table containing the following data
+    // id | bank_account
+    // ---|-------------
+    // 1  | 9000
+    // 2  | 8000
+    // 3  | 7000
+    let data = RecordBatch::try_from_iter(vec![
+        ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
+        ("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))),
+    ])?;
+    ctx.register_batch("users", data)?;
+    // Create a DataFrame using SQL
+    let dataframe = ctx.sql("SELECT * FROM users;").await?;
+    Ok(())
+}
 ```

-To construct `DataFrame` using the API:
+You can also construct [`DataFrame`]s programmatically using the API:

 ```rust
-let ctx = SessionContext::new();
-// Register the in-memory table containing the data
-ctx.register_table("users", Arc::new(create_memtable()?))?;
-let dataframe = ctx
-    .table("users")
-    .filter(col("a").lt_eq(col("b")))?
-    .sort(vec![col("a").sort(true, true), col("b").sort(false, false)])?;
+use std::sync::Arc;
+use datafusion::prelude::*;
+use datafusion::arrow::array::{ArrayRef, Int32Array};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+    // Register the same in-memory table as the previous example
+    let data = RecordBatch::try_from_iter(vec![
+        ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
+        ("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))),
+    ])?;
+    ctx.register_batch("users", data)?;
+    // Create a DataFrame that scans the users table, and finds
+    // all users with a bank account of at least 8000,
+    // and sorts the results by bank account in descending order
+    let dataframe = ctx
+        .table("users")
+        .await?
+        .filter(col("bank_account").gt_eq(lit(8000)))? // bank_account >= 8000
+        .sort(vec![col("bank_account").sort(false, true)])?; // ORDER BY bank_account DESC
+
+    Ok(())
+}
 ```

 ## Collect / Streaming Exec

-DataFusion `DataFrame`s are "lazy", meaning they do not do any processing until they are executed, which allows for additional optimizations.
+DataFusion [`DataFrame`]s are "lazy", meaning they do no processing until
+they are executed, which allows for additional optimizations.

 When you have a `DataFrame`, you can run it in one of three ways:

-1. `collect` which executes the query and buffers all the output into a `Vec`
-2. `streaming_exec`, which begins executions and returns a `SendableRecordBatchStream` which incrementally computes output on each call to `next()`
-3. `cache` which executes the query and buffers the output into a new in memory DataFrame.
+1. `collect`: executes the query and buffers all the output into a `Vec<RecordBatch>`
+2. `execute_stream`: begins execution and returns a `SendableRecordBatchStream` which incrementally computes output on each call to `next()`
+3. `cache`: executes the query and buffers the output into a new in memory `DataFrame`.

-You can just collect all outputs once like:
+To collect all outputs:

 ```rust
-let ctx = SessionContext::new();
-let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
-let batches = df.collect().await?;
+use datafusion::prelude::*;
+use datafusion::error::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+    // read the contents of a CSV file into a DataFrame
+    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
+    // execute the query and collect the results as a Vec<RecordBatch>
+    let batches = df.collect().await?;
+    Ok(())
+}
 ```

-You can also use stream output to incrementally generate output one `RecordBatch` at a time
+You can also use `execute_stream` to incrementally generate output one `RecordBatch` at a time:

 ```rust
-let ctx = SessionContext::new();
-let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
-let mut stream = df.execute_stream().await?;
-while let Some(rb) = stream.next().await {
-    println!("{rb:?}");
+use datafusion::prelude::*;
+use datafusion::error::Result;
+use futures::stream::StreamExt;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
+    // begin execution (returns quickly, does not compute results)
+    let mut stream = df.execute_stream().await?;
+    // results are returned incrementally as they are computed
+    while let Some(record_batch) = stream.next().await {
+        println!("{record_batch:?}");
+    }
+    Ok(())
 }
 ```

 # Write DataFrame to Files

-You can also serialize `DataFrame` to a file. For now, `Datafusion` supports write `DataFrame` to `csv`, `json` and `parquet`.
+You can also write the contents of a `DataFrame` to a file. When writing a file, DataFusion
+executes the `DataFrame` and streams the results. DataFusion comes with support for writing
+`csv`, `json`, `arrow`, `avro`, and `parquet` files, and supports writing custom
+file formats via the API (see [`custom_file_format.rs`] for an example).

-When writing a file, DataFusion will execute the DataFrame and stream the results to a file.
-
-For example, to write a csv_file
+For example, to read a CSV file and write it to a parquet file, use the
+[`DataFrame::write_parquet`] method

 ```rust
-let ctx = SessionContext::new();
-// Register the in-memory table containing the data
-ctx.register_table("users", Arc::new(mem_table))?;
-let dataframe = ctx.sql("SELECT * FROM users;").await?;
-
-dataframe
-    .write_csv("user_dataframe.csv", DataFrameWriteOptions::default(), None)
-    .await;
+use datafusion::prelude::*;
+use datafusion::error::Result;
+use datafusion::dataframe::DataFrameWriteOptions;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+    // read the contents of a CSV file into a DataFrame
+    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
+    // execute the query and write it to a parquet file
+    df.write_parquet(
+        "example.parquet",
+        DataFrameWriteOptions::new(),
+        None, // writer_options
+    )
+    .await?;
+    Ok(())
+}
 ```

+[`custom_file_format.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/custom_file_format.rs
+
 and the file will look like (Example Output):

-```
-id,bank_account
-1,9000
+```sql
+> select * from '../datafusion/core/example.parquet';
+---+---+---+
| a | b | c |
+---+---+---+
| 1 | 2 | 3 |
+---+---+---+
 ```

-## Transform between LogicalPlan and DataFrame
+## `LogicalPlan`s and `DataFrame`s

-As shown above, `DataFrame` is just a very thin wrapper of `LogicalPlan`, so you can easily go back and forth between them.
+The `DataFrame` struct is defined like this:

 ```rust
-// Just combine LogicalPlan with SessionContext and you get a DataFrame
-let ctx = SessionContext::new();
-// Register the in-memory table containing the data
-ctx.register_table("users", Arc::new(mem_table))?;
-let dataframe = ctx.sql("SELECT * FROM users;").await?;
-
-// get LogicalPlan in dataframe
-let plan = dataframe.logical_plan().clone();
-
-// construct a DataFrame with LogicalPlan
-let new_df = DataFrame::new(ctx.state(), plan);
+use datafusion::execution::session_state::SessionState;
+use datafusion::logical_expr::LogicalPlan;
+pub struct DataFrame {
+    // state required to execute a LogicalPlan
+    session_state: Box<SessionState>,
+    // LogicalPlan that describes the computation to perform
+    plan: LogicalPlan,
+}
 ```

+As shown above, `DataFrame` is just a very thin wrapper of `LogicalPlan`, so you
+can easily go back and forth between them.
+
-// construct a DataFrame with LogicalPlan -let new_df = DataFrame::new(ctx.state(), plan); +```rust +use datafusion::prelude::*; +use datafusion::error::Result; +use datafusion::logical_expr::LogicalPlanBuilder; + +#[tokio::main] +async fn main() -> Result<()>{ + let ctx = SessionContext::new(); + // A DataFrame to scan the "example" table + let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; + // You can easily get the LogicalPlan from the DataFrame + let (_state, plan) = df.into_parts(); + // Just combine LogicalPlan with SessionContext and you get a DataFrame + // get LogicalPlan in dataframe + let new_df = DataFrame::new(ctx.state(), plan); + Ok(()) +} +``` + +Note that can build up [`DataFrame`]s using its methods, similarly to building [`LogicalPlan`]s using [`LogicalPlanBuilder`]: + +```rust +use datafusion::prelude::*; +use datafusion::error::Result; +use datafusion::logical_expr::LogicalPlanBuilder; + +#[tokio::main] +async fn main() -> Result<()>{ + let ctx = SessionContext::new(); + // Create a DataFrame to scan the "example" table + let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; + // Create a new DataFrame sorted by `id`, `bank_account` + let new_df = df.select(vec![col("a"), col("b")])? + .sort(vec![col("a")])?; + // Build the same plan using the LogicalPlanBuilder + let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; + let (_state, plan) = df.into_parts(); // get the DataFrame's LogicalPlan + let plan = LogicalPlanBuilder::from(plan) + .project(vec![col("a"), col("b")])? + .sort(vec![col("a")])? + .build()?; + // prove they are the same + assert_eq!(new_df.logical_plan(), &plan); + Ok(()) +} ``` + +[pandas dataframe]: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html +[`dataframe`]: https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html +[`logicalplan`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html +[`logicalplanbuilder`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.LogicalPlanBuilder.html + +[`DataFrame::write_parquet`] https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.write_parquet diff --git a/docs/source/library-user-guide/using-the-sql-api.md b/docs/source/library-user-guide/using-the-sql-api.md index 1a25f078cc2e..9c32004db435 100644 --- a/docs/source/library-user-guide/using-the-sql-api.md +++ b/docs/source/library-user-guide/using-the-sql-api.md @@ -29,16 +29,15 @@ using the [`SessionContext::sql`] method. For lower level control such as preventing DDL, you can use [`SessionContext::sql_with_options`] or the [`SessionState`] APIs -[`sessioncontext`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html -[`sessioncontext::sql`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql -[`sessioncontext::sql_with_options`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql_with_options -[`sessionstate`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html - ## Registering Data Sources using `SessionContext::register*` The `SessionContext::register*` methods tell DataFusion the name of the source and how to read data. Once registered, you can execute SQL queries -using the `SessionContext::sql` method referring to your data source as a table. 
+using the [`SessionContext::sql`] method referring to your data source as a table. + +The [`SessionContext::sql`] method returns a `DataFrame` for ease of +use. See the ["Using the DataFrame API"] section for more information on how to +work with DataFrames. ### Read a CSV File @@ -215,3 +214,9 @@ async fn main() -> Result<()> { Ok(()) } ``` + +[`sessioncontext`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html +[`sessioncontext::sql`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql +[`sessioncontext::sql_with_options`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql_with_options +[`sessionstate`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html +["using the dataframe api"]: ../library-user-guide/using-the-dataframe-api.md