Skip to content

Commit

Permalink
fix a few usages of register_parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
jychen7 committed Mar 27, 2022
1 parent 8f0a588 commit 9492d17
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 32 deletions.
6 changes: 5 additions & 1 deletion ballista/rust/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,11 @@ mod tests {

let testdata = datafusion::test_util::parquet_test_data();
context
.register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata))
.register_parquet(
"single_nan",
&format!("{}/single_nan.parquet", testdata),
ParquetReadOptions::default(),
)
.await
.unwrap();

Expand Down
7 changes: 5 additions & 2 deletions benchmarks/src/bin/nyctaxi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use datafusion::error::Result;
use datafusion::execution::context::{SessionConfig, SessionContext};

use datafusion::physical_plan::collect;
use datafusion::prelude::CsvReadOptions;
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
use structopt::StructOpt;

#[cfg(feature = "snmalloc")]
Expand Down Expand Up @@ -82,7 +82,10 @@ async fn main() -> Result<()> {
let options = CsvReadOptions::new().schema(&schema).has_header(true);
ctx.register_csv("tripdata", path, options).await?
}
"parquet" => ctx.register_parquet("tripdata", path).await?,
"parquet" => {
ctx.register_parquet("tripdata", path, ParquetReadOptions::default())
.await?
}
other => {
println!("Invalid file format '{}'", other);
process::exit(-1);
Expand Down
1 change: 1 addition & 0 deletions datafusion-examples/examples/flight_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ impl FlightService for FlightServiceImpl {
ctx.register_parquet(
"alltypes_plain",
&format!("{}/alltypes_plain.parquet", testdata),
ParquetReadOptions::default(),
)
.await
.map_err(to_tonic_err)?;
Expand Down
1 change: 1 addition & 0 deletions datafusion-examples/examples/parquet_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ async fn main() -> Result<()> {
ctx.register_parquet(
"alltypes_plain",
&format!("{}/alltypes_plain.parquet", testdata),
ParquetReadOptions::default(),
)
.await?;

Expand Down
8 changes: 6 additions & 2 deletions datafusion/benches/parquet_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,12 @@ fn criterion_benchmark(c: &mut Criterion) {
let mut context = SessionContext::new();

let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
rt.block_on(context.register_parquet("t", file_path.as_str()))
.unwrap();
rt.block_on(context.register_parquet(
"t",
file_path.as_str(),
ParquetReadOptions::default(),
))
.unwrap();

// We read the queries from a file so they can be changed without recompiling the benchmark
let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap();
Expand Down
22 changes: 8 additions & 14 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,20 +558,14 @@ impl SessionContext {

/// Registers a Parquet data source so that it can be referenced from SQL statements
/// executed against this context.
pub async fn register_parquet(&mut self, name: &str, uri: &str) -> Result<()> {
let (target_partitions, enable_pruning) = {
let conf = self.copied_config();
(conf.target_partitions, conf.parquet_pruning)
};
let file_format = ParquetFormat::default().with_enable_pruning(enable_pruning);

let listing_options = ListingOptions {
format: Arc::new(file_format),
collect_stat: true,
file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
target_partitions,
table_partition_cols: vec![],
};
pub async fn register_parquet(
&mut self,
name: &str,
uri: &str,
options: ParquetReadOptions<'_>,
) -> Result<()> {
let listing_options =
options.to_listing_options(self.copied_config().target_partitions);

self.register_listing_table(name, uri, listing_options, None)
.await?;
Expand Down
35 changes: 26 additions & 9 deletions datafusion/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ mod tests {

use super::*;
use crate::execution::options::CsvReadOptions;
use crate::prelude::{SessionConfig, SessionContext};
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use arrow::array::Float32Array;
use arrow::{
array::{Int64Array, Int8Array, StringArray},
Expand Down Expand Up @@ -1329,15 +1329,32 @@ mod tests {
let mut ctx = SessionContext::new();

// register each partition as well as the top level dir
ctx.register_parquet("part0", &format!("{}/part-0.parquet", out_dir))
.await?;
ctx.register_parquet("part1", &format!("{}/part-1.parquet", out_dir))
.await?;
ctx.register_parquet("part2", &format!("{}/part-2.parquet", out_dir))
.await?;
ctx.register_parquet("part3", &format!("{}/part-3.parquet", out_dir))
ctx.register_parquet(
"part0",
&format!("{}/part-0.parquet", out_dir),
ParquetReadOptions::default(),
)
.await?;
ctx.register_parquet(
"part1",
&format!("{}/part-1.parquet", out_dir),
ParquetReadOptions::default(),
)
.await?;
ctx.register_parquet(
"part2",
&format!("{}/part-2.parquet", out_dir),
ParquetReadOptions::default(),
)
.await?;
ctx.register_parquet(
"part3",
&format!("{}/part-3.parquet", out_dir),
ParquetReadOptions::default(),
)
.await?;
ctx.register_parquet("allparts", &out_dir, ParquetReadOptions::default())
.await?;
ctx.register_parquet("allparts", &out_dir).await?;

let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?;
let allparts = ctx
Expand Down
4 changes: 3 additions & 1 deletion datafusion/tests/parquet_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,9 @@ impl ContextWithParquet {
// now, setup a the file as a data source and run a query against it
let mut ctx = SessionContext::with_config(config);

ctx.register_parquet("t", &parquet_path).await.unwrap();
ctx.register_parquet("t", &parquet_path, ParquetReadOptions::default())
.await
.unwrap();
let provider = ctx.deregister_table("t").unwrap().unwrap();
ctx.register_table("t", provider.clone()).unwrap();

Expand Down
1 change: 1 addition & 0 deletions datafusion/tests/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ async fn register_alltypes_parquet(ctx: &mut SessionContext) {
ctx.register_parquet(
"alltypes_plain",
&format!("{}/alltypes_plain.parquet", testdata),
ParquetReadOptions::default(),
)
.await
.unwrap();
Expand Down
11 changes: 8 additions & 3 deletions datafusion/tests/sql/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,13 @@ async fn parquet_query() {
async fn parquet_single_nan_schema() {
let mut ctx = SessionContext::new();
let testdata = datafusion::test_util::parquet_test_data();
ctx.register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata))
.await
.unwrap();
ctx.register_parquet(
"single_nan",
&format!("{}/single_nan.parquet", testdata),
ParquetReadOptions::default(),
)
.await
.unwrap();
let sql = "SELECT mycol FROM single_nan";
let plan = ctx.create_logical_plan(sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
Expand All @@ -75,6 +79,7 @@ async fn parquet_list_columns() {
ctx.register_parquet(
"list_columns",
&format!("{}/list_columns.parquet", testdata),
ParquetReadOptions::default(),
)
.await
.unwrap();
Expand Down

0 comments on commit 9492d17

Please sign in to comment.