From 9492d177dd4cda847dbb6c0880235c4622fb46c9 Mon Sep 17 00:00:00 2001 From: jychen7 Date: Sun, 27 Mar 2022 11:31:18 -0400 Subject: [PATCH] fix a few usage of register_parquet --- ballista/rust/client/src/context.rs | 6 +++- benchmarks/src/bin/nyctaxi.rs | 7 ++-- datafusion-examples/examples/flight_server.rs | 1 + datafusion-examples/examples/parquet_sql.rs | 1 + datafusion/benches/parquet_query_sql.rs | 8 +++-- datafusion/src/execution/context.rs | 22 +++++------- .../src/physical_plan/file_format/parquet.rs | 35 ++++++++++++++----- datafusion/tests/parquet_pruning.rs | 4 ++- datafusion/tests/sql/mod.rs | 1 + datafusion/tests/sql/parquet.rs | 11 ++++-- 10 files changed, 64 insertions(+), 32 deletions(-) diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index bd132da1516e..c0bdbca6c34b 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -516,7 +516,11 @@ mod tests { let testdata = datafusion::test_util::parquet_test_data(); context - .register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata)) + .register_parquet( + "single_nan", + &format!("{}/single_nan.parquet", testdata), + ParquetReadOptions::default(), + ) .await .unwrap(); diff --git a/benchmarks/src/bin/nyctaxi.rs b/benchmarks/src/bin/nyctaxi.rs index a0cdb748a31e..e22c71e5e9e3 100644 --- a/benchmarks/src/bin/nyctaxi.rs +++ b/benchmarks/src/bin/nyctaxi.rs @@ -29,7 +29,7 @@ use datafusion::error::Result; use datafusion::execution::context::{SessionConfig, SessionContext}; use datafusion::physical_plan::collect; -use datafusion::prelude::CsvReadOptions; +use datafusion::prelude::{CsvReadOptions, ParquetReadOptions}; use structopt::StructOpt; #[cfg(feature = "snmalloc")] @@ -82,7 +82,10 @@ async fn main() -> Result<()> { let options = CsvReadOptions::new().schema(&schema).has_header(true); ctx.register_csv("tripdata", path, options).await? } - "parquet" => ctx.register_parquet("tripdata", path).await?, + "parquet" => { + ctx.register_parquet("tripdata", path, ParquetReadOptions::default()) + .await? + } other => { println!("Invalid file format '{}'", other); process::exit(-1); diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs index d8b0405f19b9..697b56117e04 100644 --- a/datafusion-examples/examples/flight_server.rs +++ b/datafusion-examples/examples/flight_server.rs @@ -98,6 +98,7 @@ impl FlightService for FlightServiceImpl { ctx.register_parquet( "alltypes_plain", &format!("{}/alltypes_plain.parquet", testdata), + ParquetReadOptions::default(), ) .await .map_err(to_tonic_err)?; diff --git a/datafusion-examples/examples/parquet_sql.rs b/datafusion-examples/examples/parquet_sql.rs index 6ab24ff41ab6..22923c5239af 100644 --- a/datafusion-examples/examples/parquet_sql.rs +++ b/datafusion-examples/examples/parquet_sql.rs @@ -31,6 +31,7 @@ async fn main() -> Result<()> { ctx.register_parquet( "alltypes_plain", &format!("{}/alltypes_plain.parquet", testdata), + ParquetReadOptions::default(), ) .await?; diff --git a/datafusion/benches/parquet_query_sql.rs b/datafusion/benches/parquet_query_sql.rs index 183f64739069..35fef901867a 100644 --- a/datafusion/benches/parquet_query_sql.rs +++ b/datafusion/benches/parquet_query_sql.rs @@ -196,8 +196,12 @@ fn criterion_benchmark(c: &mut Criterion) { let mut context = SessionContext::new(); let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap(); - rt.block_on(context.register_parquet("t", file_path.as_str())) - .unwrap(); + rt.block_on(context.register_parquet( + "t", + file_path.as_str(), + ParquetReadOptions::default(), + )) + .unwrap(); // We read the queries from a file so they can be changed without recompiling the benchmark let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap(); diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index dac4fab24558..9b608da28b6e 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -558,20 +558,14 @@ impl SessionContext { /// Registers a Parquet data source so that it can be referenced from SQL statements /// executed against this context. - pub async fn register_parquet(&mut self, name: &str, uri: &str) -> Result<()> { - let (target_partitions, enable_pruning) = { - let conf = self.copied_config(); - (conf.target_partitions, conf.parquet_pruning) - }; - let file_format = ParquetFormat::default().with_enable_pruning(enable_pruning); - - let listing_options = ListingOptions { - format: Arc::new(file_format), - collect_stat: true, - file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(), - target_partitions, - table_partition_cols: vec![], - }; + pub async fn register_parquet( + &mut self, + name: &str, + uri: &str, + options: ParquetReadOptions<'_>, + ) -> Result<()> { + let listing_options = + options.to_listing_options(self.copied_config().target_partitions); self.register_listing_table(name, uri, listing_options, None) .await?; diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index c77e7d8a27c0..97dde31c3d72 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -593,7 +593,7 @@ mod tests { use super::*; use crate::execution::options::CsvReadOptions; - use crate::prelude::{SessionConfig, SessionContext}; + use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; use arrow::array::Float32Array; use arrow::{ array::{Int64Array, Int8Array, StringArray}, @@ -1329,15 +1329,32 @@ mod tests { let mut ctx = SessionContext::new(); // register each partition as well as the top level dir - ctx.register_parquet("part0", &format!("{}/part-0.parquet", out_dir)) - .await?; - ctx.register_parquet("part1", &format!("{}/part-1.parquet", out_dir)) - .await?; - ctx.register_parquet("part2", &format!("{}/part-2.parquet", out_dir)) - .await?; - ctx.register_parquet("part3", &format!("{}/part-3.parquet", out_dir)) + ctx.register_parquet( + "part0", + &format!("{}/part-0.parquet", out_dir), + ParquetReadOptions::default(), + ) + .await?; + ctx.register_parquet( + "part1", + &format!("{}/part-1.parquet", out_dir), + ParquetReadOptions::default(), + ) + .await?; + ctx.register_parquet( + "part2", + &format!("{}/part-2.parquet", out_dir), + ParquetReadOptions::default(), + ) + .await?; + ctx.register_parquet( + "part3", + &format!("{}/part-3.parquet", out_dir), + ParquetReadOptions::default(), + ) + .await?; + ctx.register_parquet("allparts", &out_dir, ParquetReadOptions::default()) .await?; - ctx.register_parquet("allparts", &out_dir).await?; let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?; let allparts = ctx diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs index 83d51e071aaa..d3b86ab28eed 100644 --- a/datafusion/tests/parquet_pruning.rs +++ b/datafusion/tests/parquet_pruning.rs @@ -482,7 +482,9 @@ impl ContextWithParquet { // now, setup a the file as a data source and run a query against it let mut ctx = SessionContext::with_config(config); - ctx.register_parquet("t", &parquet_path).await.unwrap(); + ctx.register_parquet("t", &parquet_path, ParquetReadOptions::default()) + .await + .unwrap(); let provider = ctx.deregister_table("t").unwrap().unwrap(); ctx.register_table("t", provider.clone()).unwrap(); diff --git a/datafusion/tests/sql/mod.rs b/datafusion/tests/sql/mod.rs index 1e8938a06f83..ddceab2c371c 100644 --- a/datafusion/tests/sql/mod.rs +++ b/datafusion/tests/sql/mod.rs @@ -627,6 +627,7 @@ async fn register_alltypes_parquet(ctx: &mut SessionContext) { ctx.register_parquet( "alltypes_plain", &format!("{}/alltypes_plain.parquet", testdata), + ParquetReadOptions::default(), ) .await .unwrap(); diff --git a/datafusion/tests/sql/parquet.rs b/datafusion/tests/sql/parquet.rs index 92ad35cc5942..bed507a37262 100644 --- a/datafusion/tests/sql/parquet.rs +++ b/datafusion/tests/sql/parquet.rs @@ -52,9 +52,13 @@ async fn parquet_query() { async fn parquet_single_nan_schema() { let mut ctx = SessionContext::new(); let testdata = datafusion::test_util::parquet_test_data(); - ctx.register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata)) - .await - .unwrap(); + ctx.register_parquet( + "single_nan", + &format!("{}/single_nan.parquet", testdata), + ParquetReadOptions::default(), + ) + .await + .unwrap(); let sql = "SELECT mycol FROM single_nan"; let plan = ctx.create_logical_plan(sql).unwrap(); let plan = ctx.optimize(&plan).unwrap(); @@ -75,6 +79,7 @@ async fn parquet_list_columns() { ctx.register_parquet( "list_columns", &format!("{}/list_columns.parquet", testdata), + ParquetReadOptions::default(), ) .await .unwrap();