diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index 23cf8830e36d..a3f100cbcb53 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -58,7 +58,7 @@ cargo run --example dataframe
 - [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
 - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
 - [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data, including multiple subqueries. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file.
-- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
+- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results (Arrow ArrayRefs) into Rust structs
 - [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
 - [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
 - [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
diff --git a/datafusion-examples/examples/deserialize_to_struct.rs b/datafusion-examples/examples/deserialize_to_struct.rs
index 985cab703a5c..5ac3ee6187d1 100644
--- a/datafusion-examples/examples/deserialize_to_struct.rs
+++ b/datafusion-examples/examples/deserialize_to_struct.rs
@@ -15,62 +15,136 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::AsArray;
+use arrow::array::{AsArray, PrimitiveArray};
 use arrow::datatypes::{Float64Type, Int32Type};
 use datafusion::error::Result;
 use datafusion::prelude::*;
+use datafusion_common::assert_batches_eq;
 use futures::StreamExt;
 
-/// This example shows that it is possible to convert query results into Rust structs .
+/// This example shows how to convert query results into Rust structs by using
+/// the Arrow APIs to read the results back as Rust native types.
+///
+/// This is a bit tricky initially as the results are returned as columns stored
+/// as [ArrayRef].
+///
+/// [ArrayRef]: arrow::array::ArrayRef
 #[tokio::main]
 async fn main() -> Result<()> {
-    let data_list = Data::new().await?;
-    println!("{data_list:#?}");
-    Ok(())
-}
+    // Run a query that returns two columns of data
+    let ctx = SessionContext::new();
+    let testdata = datafusion::test_util::parquet_test_data();
+    ctx.register_parquet(
+        "alltypes_plain",
+        &format!("{testdata}/alltypes_plain.parquet"),
+        ParquetReadOptions::default(),
+    )
+    .await?;
+    let df = ctx
+        .sql("SELECT int_col, double_col FROM alltypes_plain")
+        .await?;
 
-#[derive(Debug)]
-struct Data {
-    #[allow(dead_code)]
-    int_col: i32,
-    #[allow(dead_code)]
-    double_col: f64,
-}
+    // print out the results showing we have an int32 and a float64 column
+    let results = df.clone().collect().await?;
+    assert_batches_eq!(
+        [
+            "+---------+------------+",
+            "| int_col | double_col |",
+            "+---------+------------+",
+            "| 0       | 0.0        |",
+            "| 1       | 10.1       |",
+            "| 0       | 0.0        |",
+            "| 1       | 10.1       |",
+            "| 0       | 0.0        |",
+            "| 1       | 10.1       |",
+            "| 0       | 0.0        |",
+            "| 1       | 10.1       |",
+            "+---------+------------+",
+        ],
+        &results
+    );
 
-impl Data {
-    pub async fn new() -> Result<Vec<Self>> {
-        // this group is almost the same as the one you find it in parquet_sql.rs
-        let ctx = SessionContext::new();
+    // We will now convert the query results into a Rust struct
+    let mut stream = df.execute_stream().await?;
+    let mut list = vec![];
 
-        let testdata = datafusion::test_util::parquet_test_data();
+    // DataFusion produces data in chunks called `RecordBatch`es which are
+    // typically 8000 rows each. This loop processes each `RecordBatch` as it is
+    // produced by the query plan and adds it to the list
+    while let Some(b) = stream.next().await.transpose()? {
+        // Each `RecordBatch` has one or more columns. Each column is stored as
+        // an `ArrayRef`. To interact with data using Rust native types we need to
+        // convert these `ArrayRef`s into concrete array types using APIs from
+        // the arrow crate.
 
-        ctx.register_parquet(
-            "alltypes_plain",
-            &format!("{testdata}/alltypes_plain.parquet"),
-            ParquetReadOptions::default(),
-        )
-        .await?;
+        // In this case, we know that each batch has two columns of the Arrow
+        // types Int32 and Float64, so first we cast the two columns to the
+        // appropriate Arrow PrimitiveArray (this is a fast / zero-copy cast):
+        let int_col: &PrimitiveArray<Int32Type> = b.column(0).as_primitive();
+        let float_col: &PrimitiveArray<Float64Type> = b.column(1).as_primitive();
 
-        let df = ctx
-            .sql("SELECT int_col, double_col FROM alltypes_plain")
-            .await?;
+        // With PrimitiveArrays, we can access the values as the native Rust
+        // types i32 and f64, and form the desired `Data` structs
+        for (i, f) in int_col.values().iter().zip(float_col.values()) {
+            list.push(Data {
+                int_col: *i,
+                double_col: *f,
+            })
+        }
+    }
 
-        df.clone().show().await?;
+    // Finally, we have the results in the list of Rust structs
+    let res = format!("{list:#?}");
+    assert_eq!(
+        res,
+        r#"[
+    Data {
+        int_col: 0,
+        double_col: 0.0,
+    },
+    Data {
+        int_col: 1,
+        double_col: 10.1,
+    },
+    Data {
+        int_col: 0,
+        double_col: 0.0,
+    },
+    Data {
+        int_col: 1,
+        double_col: 10.1,
+    },
+    Data {
+        int_col: 0,
+        double_col: 0.0,
+    },
+    Data {
+        int_col: 1,
+        double_col: 10.1,
+    },
+    Data {
+        int_col: 0,
+        double_col: 0.0,
+    },
+    Data {
+        int_col: 1,
+        double_col: 10.1,
+    },
+]"#
+    );
 
-        let mut stream = df.execute_stream().await?;
-        let mut list = vec![];
-        while let Some(b) = stream.next().await.transpose()? {
-            let int_col = b.column(0).as_primitive::<Int32Type>();
-            let float_col = b.column(1).as_primitive::<Float64Type>();
+    // Use the fields in the struct to avoid clippy complaints
+    let int_sum = list.iter().fold(0, |acc, x| acc + x.int_col);
+    let double_sum = list.iter().fold(0.0, |acc, x| acc + x.double_col);
+    assert_eq!(int_sum, 4);
+    assert_eq!(double_sum, 40.4);
 
-            for (i, f) in int_col.values().iter().zip(float_col.values()) {
-                list.push(Data {
-                    int_col: *i,
-                    double_col: *f,
-                })
-            }
-        }
+    Ok(())
+}
 
-        Ok(list)
-    }
+/// This is the target struct into which we want the query results.
+#[derive(Debug)]
+struct Data {
+    int_col: i32,
+    double_col: f64,
 }
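
Reviewer note: the core pattern this example teaches — downcasting a type-erased `ArrayRef` to a typed `PrimitiveArray` and reading its native values — can be exercised without any DataFusion setup. Here is a minimal, self-contained sketch (not part of the patch) using only the `arrow` crate; the input values are made up for illustration, and like the example it reads the raw values buffer and therefore ignores null bitmaps:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, AsArray, Float64Array, Int32Array};
use arrow::datatypes::{Float64Type, Int32Type};

/// Mirrors the `Data` struct from the example
#[derive(Debug)]
struct Data {
    int_col: i32,
    double_col: f64,
}

fn main() {
    // Columns as they would arrive inside a `RecordBatch`: type-erased `ArrayRef`s
    let int_col: ArrayRef = Arc::new(Int32Array::from(vec![0, 1]));
    let double_col: ArrayRef = Arc::new(Float64Array::from(vec![0.0, 10.1]));

    // Zero-copy downcast to the concrete array types. `as_primitive` panics
    // if the runtime type does not match, so check `data_type()` first when
    // the schema is not known in advance.
    let ints = int_col.as_primitive::<Int32Type>();
    let doubles = double_col.as_primitive::<Float64Type>();

    // Zip the two columns' native i32 / f64 values into row structs
    let rows: Vec<Data> = ints
        .values()
        .iter()
        .zip(doubles.values())
        .map(|(i, f)| Data {
            int_col: *i,
            double_col: *f,
        })
        .collect();

    println!("{rows:#?}");
}
```

Because `as_primitive` panics on a type mismatch, code that does not control the schema should verify `Array::data_type()` first (or use the fallible `as_primitive_opt` variant, where the arrow version in use provides it).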