diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index 90469e6715a6a..2696f74775cf3 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -55,6 +55,7 @@ cargo run --example dataframe
 - [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization
 - [`csv_sql_streaming.rs`](examples/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file
 - [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
+- [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
 - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
 - [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame against a local parquet file
 - [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index 956e9f7246a36..f5805bc069825 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -641,5 +641,11 @@ doc_comment::doctest!(
 #[cfg(doctest)]
 doc_comment::doctest!(
     "../../../docs/source/library-user-guide/using-the-sql-api.md",
-    library_user_guide_example_usage
+    library_user_guide_sql_api
+);
+
+#[cfg(doctest)]
+doc_comment::doctest!(
+    "../../../docs/source/library-user-guide/using-the-dataframe-api.md",
+    library_user_guide_dataframe_api
 );
diff --git a/docs/source/library-user-guide/using-the-dataframe-api.md b/docs/source/library-user-guide/using-the-dataframe-api.md
index c4f4ecd4f1370..9e7774cbb944c 100644
--- a/docs/source/library-user-guide/using-the-dataframe-api.md
+++ b/docs/source/library-user-guide/using-the-dataframe-api.md
@@ -19,129 +19,267 @@
 # Using the DataFrame API
 
-## What is a DataFrame
+The [Users Guide] introduces the [`DataFrame`] API and this section describes
+that API in more depth.
 
-`DataFrame` in `DataFrame` is modeled after the Pandas DataFrame interface, and is a thin wrapper over LogicalPlan that adds functionality for building and executing those plans.
+## What is a DataFrame?
 
-```rust
-pub struct DataFrame {
-    session_state: SessionState,
-    plan: LogicalPlan,
-}
-```
+As described in the [Users Guide], DataFusion [`DataFrame`]s are modeled after
+the [Pandas DataFrame] interface, and are implemented as a thin wrapper over a
+[`LogicalPlan`] that adds functionality for building and executing those plans.
 
-You can build up `DataFrame`s using its methods, similarly to building `LogicalPlan`s using `LogicalPlanBuilder`:
-
-```rust
-let df = ctx.table("users").await?;
-
-// Create a new DataFrame sorted by `id`, `bank_account`
-let new_df = df.select(vec![col("id"), col("bank_account")])?
-    .sort(vec![col("id")])?;
-
-// Build the same plan using the LogicalPlanBuilder
-let plan = LogicalPlanBuilder::from(&df.to_logical_plan())
-    .project(vec![col("id"), col("bank_account")])?
-    .sort(vec![col("id")])?
-    .build()?;
-```
-
-You can use `collect` or `execute_stream` to execute the query.
+The simplest possible `DataFrame` is one that scans a table, and that table can
+be backed by a file or by data in memory.
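+
+For example, here is a minimal sketch that scans a file-backed table (the
+`tests/data/example.csv` file is the same sample file used by the examples
+below):
+
+```rust
+use datafusion::prelude::*;
+use datafusion::error::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+    // scan a file-backed table: read example.csv into a DataFrame
+    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
+    // execute the scan and print the results
+    df.show().await?;
+    Ok(())
+}
+```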
 
 ## How to generate a DataFrame
 
-You can directly use the `DataFrame` API or generate a `DataFrame` from a SQL query.
-
-For example, to use `sql` to construct `DataFrame`:
+You can construct [`DataFrame`]s programmatically using the API, similarly to
+other DataFrame APIs. For example, you can read an in-memory `RecordBatch` into
+a `DataFrame`:
 
 ```rust
-let ctx = SessionContext::new();
-// Register the in-memory table containing the data
-ctx.register_table("users", Arc::new(create_memtable()?))?;
-let dataframe = ctx.sql("SELECT * FROM users;").await?;
+use std::sync::Arc;
+use datafusion::prelude::*;
+use datafusion::arrow::array::{ArrayRef, Int32Array};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+    // Create an in-memory RecordBatch containing the following data
+    // id | bank_account
+    // ---|-------------
+    // 1  | 9000
+    // 2  | 8000
+    // 3  | 7000
+    let data = RecordBatch::try_from_iter(vec![
+        ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
+        ("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))),
+    ])?;
+    // Create a DataFrame that scans the data, finds all users with a
+    // bank account of at least 8000, and sorts the results by bank
+    // account in descending order
+    let dataframe = ctx
+        .read_batch(data)?
+        .filter(col("bank_account").gt_eq(lit(8000)))? // bank_account >= 8000
+        .sort(vec![col("bank_account").sort(false, true)])?; // ORDER BY bank_account DESC
+
+    Ok(())
+}
 ```
 
-To construct `DataFrame` using the API:
+You can _also_ generate a `DataFrame` from a SQL query and use the DataFrame's
+APIs to manipulate the output of the query.
 
 ```rust
-let ctx = SessionContext::new();
-// Register the in-memory table containing the data
-ctx.register_table("users", Arc::new(create_memtable()?))?;
-let dataframe = ctx
-    .table("users")
-    .filter(col("a").lt_eq(col("b")))?
-    .sort(vec![col("a").sort(true, true), col("b").sort(false, false)])?;
+use std::sync::Arc;
+use datafusion::prelude::*;
+use datafusion::assert_batches_eq;
+use datafusion::arrow::array::{ArrayRef, Int32Array};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+    // Register the same in-memory table as the previous example
+    let data = RecordBatch::try_from_iter(vec![
+        ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
+        ("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))),
+    ])?;
+    ctx.register_batch("users", data)?;
+    // Create a DataFrame using SQL
+    let dataframe = ctx.sql("SELECT * FROM users;")
+        .await?
+        // Note we can filter the output of the query using the DataFrame API
+        .filter(col("bank_account").gt_eq(lit(8000)))?; // bank_account >= 8000
+
+    let results = &dataframe.collect().await?;
+
+    // use the `assert_batches_eq` macro to show the output
+    assert_batches_eq!(
+        vec![
+            "+----+--------------+",
+            "| id | bank_account |",
+            "+----+--------------+",
+            "| 1  | 9000         |",
+            "| 2  | 8000         |",
+            "+----+--------------+",
+        ],
+        &results
+    );
+    Ok(())
+}
 ```
 
 ## Collect / Streaming Exec
 
-DataFusion `DataFrame`s are "lazy", meaning they do not do any processing until they are executed, which allows for additional optimizations.
+DataFusion [`DataFrame`]s are "lazy", meaning they do no processing until
+they are executed, which allows for additional optimizations.
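+
+Because execution is deferred, you can inspect the plan DataFusion will run
+before computing any results. A small sketch using `DataFrame::explain`
+(`verbose` and `analyze` are both disabled here) to print the optimized
+logical and physical plans:
+
+```rust
+use datafusion::prelude::*;
+use datafusion::error::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
+    // building the plan does no work; `explain` shows the optimized
+    // logical and physical plans without computing the query results
+    df.filter(col("a").gt(lit(1)))?
+        .explain(false, false)? // verbose = false, analyze = false
+        .show()
+        .await?;
+    Ok(())
+}
+```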
 
-When you have a `DataFrame`, you can run it in one of three ways:
+You can run a `DataFrame` in one of three ways:
 
-1. `collect` which executes the query and buffers all the output into a `Vec<RecordBatch>`
-2. `streaming_exec`, which begins executions and returns a `SendableRecordBatchStream` which incrementally computes output on each call to `next()`
-3. `cache` which executes the query and buffers the output into a new in memory DataFrame.
+1. `collect`: executes the query and buffers all the output into a `Vec<RecordBatch>`
+2. `execute_stream`: begins execution and returns a `SendableRecordBatchStream` which incrementally computes output on each call to `next()`
+3. `cache`: executes the query and buffers the output into a new in-memory `DataFrame` (see the example after `execute_stream` below)
 
-You can just collect all outputs once like:
+To collect all outputs into a memory buffer, use the `collect` method:
 
 ```rust
-let ctx = SessionContext::new();
-let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
-let batches = df.collect().await?;
+use datafusion::prelude::*;
+use datafusion::error::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+    // read the contents of a CSV file into a DataFrame
+    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
+    // execute the query and collect the results as a Vec<RecordBatch>
+    let batches = df.collect().await?;
+    for record_batch in batches {
+        println!("{record_batch:?}");
+    }
+    Ok(())
+}
 ```
 
-You can also use stream output to incrementally generate output one `RecordBatch` at a time
+Use `execute_stream` to incrementally generate output one `RecordBatch` at a time:
 
 ```rust
-let ctx = SessionContext::new();
-let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
-let mut stream = df.execute_stream().await?;
-while let Some(rb) = stream.next().await {
-    println!("{rb:?}");
+use datafusion::prelude::*;
+use datafusion::error::Result;
+use futures::stream::StreamExt;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+    // read example.csv file into a DataFrame
+    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
+    // begin execution (returns quickly, does not compute results)
+    let mut stream = df.execute_stream().await?;
+    // results are returned incrementally as they are computed
+    while let Some(record_batch) = stream.next().await {
+        println!("{record_batch:?}");
+    }
+    Ok(())
 }
 ```
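+
+Finally, `cache` executes the query like `collect` but stores the results in a
+new in-memory `DataFrame`, which is useful when several downstream queries
+share the same intermediate results. A minimal sketch:
+
+```rust
+use datafusion::prelude::*;
+use datafusion::error::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
+    // execute the plan and buffer the output in a new in-memory DataFrame
+    let cached = df.cache().await?;
+    // queries against `cached` reuse the buffered results rather than
+    // re-reading the CSV file
+    let batches = cached.collect().await?;
+    println!("cached {} record batches", batches.len());
+    Ok(())
+}
+```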
 
 # Write DataFrame to Files
 
-You can also serialize `DataFrame` to a file. For now, `Datafusion` supports write `DataFrame` to `csv`, `json` and `parquet`.
-
-When writing a file, DataFusion will execute the DataFrame and stream the results to a file.
+You can also write the contents of a `DataFrame` to a file. When writing a file,
+DataFusion executes the `DataFrame` and streams the results to the output.
+DataFusion comes with support for writing `csv`, `json`, `arrow`, `avro`, and
+`parquet` files, and supports writing custom file formats via the API (see
+[`custom_file_format.rs`] for an example).
 
-For example, to write a csv_file
+For example, to read a CSV file and write it to a parquet file, use the
+[`DataFrame::write_parquet`] method:
 
 ```rust
-let ctx = SessionContext::new();
-// Register the in-memory table containing the data
-ctx.register_table("users", Arc::new(mem_table))?;
-let dataframe = ctx.sql("SELECT * FROM users;").await?;
-
-dataframe
-    .write_csv("user_dataframe.csv", DataFrameWriteOptions::default(), None)
-    .await;
+use datafusion::prelude::*;
+use datafusion::error::Result;
+use datafusion::dataframe::DataFrameWriteOptions;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+    // read example.csv file into a DataFrame
+    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
+    // stream the contents of the DataFrame to the `example.parquet` file
+    df.write_parquet(
+        "example.parquet",
+        DataFrameWriteOptions::new(),
+        None, // writer_options
+    ).await?;
+    Ok(())
+}
 ```
 
-and the file will look like (Example Output):
+[`custom_file_format.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/custom_file_format.rs
 
-```
-id,bank_account
-1,9000
+The output file will look like this (example output):
+
+```sql
+> select * from '../datafusion/core/example.parquet';
++---+---+---+
+| a | b | c |
++---+---+---+
+| 1 | 2 | 3 |
++---+---+---+
 ```
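+
+The other built-in formats follow the same pattern. For example, here is a
+sketch of writing the same `DataFrame` to a CSV file with `DataFrame::write_csv`
+(`example_out.csv` is an arbitrary output name, and passing `None` uses the
+default writer options):
+
+```rust
+use datafusion::prelude::*;
+use datafusion::error::Result;
+use datafusion::dataframe::DataFrameWriteOptions;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+    // read example.csv file into a DataFrame
+    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
+    // stream the contents of the DataFrame to the `example_out.csv` file
+    df.write_csv(
+        "example_out.csv", // arbitrary output name for this sketch
+        DataFrameWriteOptions::new(),
+        None, // writer_options: use the CSV defaults
+    ).await?;
+    Ok(())
+}
+```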
 
-## Transform between LogicalPlan and DataFrame
+## Relationship between `LogicalPlan`s and `DataFrame`s
 
-As shown above, `DataFrame` is just a very thin wrapper of `LogicalPlan`, so you can easily go back and forth between them.
+The `DataFrame` struct is defined like this:
 
 ```rust
-// Just combine LogicalPlan with SessionContext and you get a DataFrame
-let ctx = SessionContext::new();
-// Register the in-memory table containing the data
-ctx.register_table("users", Arc::new(mem_table))?;
-let dataframe = ctx.sql("SELECT * FROM users;").await?;
+use datafusion::execution::session_state::SessionState;
+use datafusion::logical_expr::LogicalPlan;
+pub struct DataFrame {
+    // state required to execute a LogicalPlan
+    session_state: Box<SessionState>,
+    // LogicalPlan that describes the computation to perform
+    plan: LogicalPlan,
+}
+```
 
-// get LogicalPlan in dataframe
-let plan = dataframe.logical_plan().clone();
+As shown above, `DataFrame` is a thin wrapper over `LogicalPlan`, so you can
+easily go back and forth between them.
 
-// construct a DataFrame with LogicalPlan
-let new_df = DataFrame::new(ctx.state(), plan);
+```rust
+use datafusion::prelude::*;
+use datafusion::error::Result;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+    // read example.csv file into a DataFrame
+    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
+    // You can easily get the LogicalPlan from the DataFrame
+    let (_state, plan) = df.into_parts();
+    // Just combine the LogicalPlan with a SessionState and you get a DataFrame
+    let new_df = DataFrame::new(ctx.state(), plan);
+    Ok(())
+}
 ```
 
+In fact, using [`DataFrame`] methods you can create the same
+[`LogicalPlan`]s as when using [`LogicalPlanBuilder`]:
+
+```rust
+use datafusion::prelude::*;
+use datafusion::error::Result;
+use datafusion::logical_expr::LogicalPlanBuilder;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+    // read example.csv file into a DataFrame
+    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
+    // Create a new DataFrame that selects columns `a` and `b`, sorted by `a`
+    let new_df = df.select(vec![col("a"), col("b")])?
+        .sort(vec![col("a")])?;
+    // Build the same plan using the LogicalPlanBuilder
+    // Similar to `SELECT a, b FROM example.csv ORDER BY a`
+    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
+    let (_state, plan) = df.into_parts(); // get the DataFrame's LogicalPlan
+    let plan = LogicalPlanBuilder::from(plan)
+        .project(vec![col("a"), col("b")])?
+        .sort(vec![col("a")])?
+        .build()?;
+    // prove they are the same
+    assert_eq!(new_df.logical_plan(), &plan);
+    Ok(())
+}
+```
+
+[users guide]: ../user-guide/dataframe.md
+[pandas dataframe]: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
+[`dataframe`]: https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html
+[`logicalplan`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html
+[`logicalplanbuilder`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.LogicalPlanBuilder.html
+[`dataframe::write_parquet`]: https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.write_parquet
diff --git a/docs/source/library-user-guide/using-the-sql-api.md b/docs/source/library-user-guide/using-the-sql-api.md
index 1a25f078cc2e2..9c32004db4359 100644
--- a/docs/source/library-user-guide/using-the-sql-api.md
+++ b/docs/source/library-user-guide/using-the-sql-api.md
@@ -29,16 +29,15 @@
 using the [`SessionContext::sql`] method. For lower level control such as
 preventing DDL, you can use [`SessionContext::sql_with_options`] or the
 [`SessionState`] APIs
 
-[`sessioncontext`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html
-[`sessioncontext::sql`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql
-[`sessioncontext::sql_with_options`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql_with_options
-[`sessionstate`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html
-
 ## Registering Data Sources using `SessionContext::register*`
 
 The `SessionContext::register*` methods tell DataFusion
 the name of the source and how to read data.
Once registered, you can execute SQL queries -using the `SessionContext::sql` method referring to your data source as a table. +using the [`SessionContext::sql`] method referring to your data source as a table. + +The [`SessionContext::sql`] method returns a `DataFrame` for ease of +use. See the ["Using the DataFrame API"] section for more information on how to +work with DataFrames. ### Read a CSV File @@ -215,3 +214,9 @@ async fn main() -> Result<()> { Ok(()) } ``` + +[`sessioncontext`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html +[`sessioncontext::sql`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql +[`sessioncontext::sql_with_options`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql_with_options +[`sessionstate`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html +["using the dataframe api"]: ../library-user-guide/using-the-dataframe-api.md