Skip to content

Commit

Permalink
Create ListingTableConfig which includes file format and schema inference (#1715)
Browse files Browse the repository at this point in the history

* Starting work on ListingTableConfig

* v1 of config

* Refactor for cleaner api

* Update references in rest of code base

* Docs

* Clippy

* Remove async from ListingTable::try_new

* Ballista

* Benchmark fix

* Benches

* Clippy

* Clippy

* Fix empty table bug

* Fix table path

* Get file from directory

* Fix read empty table test

* Remove unnecessary async

* Starting work on ListingTableConfig

* v1 of config

* Refactor for cleaner api

* Update references in rest of code base

* Docs

* Clippy

* Remove async from ListingTable::try_new

* Ballista

* Benchmark fix

* Benches

* Clippy

* Clippy

* Fix empty table bug

* Fix table path

* Get file from directory

* Fix read empty table test

* Remove unnecessary async

* Rebase

* Improve log

* Rebase fix
  • Loading branch information
matthewmturner authored Feb 10, 2022
1 parent d35adc9 commit e5f6969
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 80 deletions.
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion::datasource::file_format::avro::AvroFormat;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable};
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
use datafusion::datasource::object_store::local::LocalFileSystem;
use datafusion::datasource::object_store::{FileMeta, SizedFile};
use datafusion::logical_plan::window_frames::{
Expand Down
13 changes: 6 additions & 7 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion::datasource::file_format::avro::AvroFormat;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable};
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
use datafusion::datasource::object_store::local::LocalFileSystem;
use datafusion::logical_plan::plan::Extension;
use datafusion::logical_plan::plan::{
Expand Down Expand Up @@ -241,12 +241,11 @@ impl AsLogicalPlan for LogicalPlanNode {
scan.path.as_str()
);

let provider = ListingTable::new(
object_store,
scan.path.clone(),
Arc::new(schema),
options,
);
let config = ListingTableConfig::new(object_store, scan.path.as_str())
.with_listing_options(options)
.with_schema(Arc::new(schema));

let provider = ListingTable::try_new(config)?;

LogicalPlanBuilder::scan_with_filters(
&scan.table_name,
Expand Down
24 changes: 12 additions & 12 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use datafusion::{
use datafusion::{
arrow::util::pretty,
datasource::{
listing::{ListingOptions, ListingTable},
listing::{ListingOptions, ListingTable, ListingTableConfig},
object_store::local::LocalFileSystem,
},
};
Expand Down Expand Up @@ -724,12 +724,11 @@ fn get_table(
table_partition_cols: vec![],
};

Ok(Arc::new(ListingTable::new(
Arc::new(LocalFileSystem {}),
path,
schema,
options,
)))
let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), path)
.with_listing_options(options)
.with_schema(schema);

Ok(Arc::new(ListingTable::try_new(config)?))
}

fn get_schema(table: &str) -> Schema {
Expand Down Expand Up @@ -1389,12 +1388,13 @@ mod tests {
.has_header(false)
.file_extension(".tbl");
let listing_options = options.to_listing_options(1);
let provider = ListingTable::new(
let config = ListingTableConfig::new(
Arc::new(LocalFileSystem {}),
format!("{}/{}.tbl", tpch_data_path, table),
Arc::new(schema),
listing_options,
);
tpch_data_path.clone(),
)
.with_listing_options(listing_options)
.with_schema(Arc::new(schema));
let provider = ListingTable::try_new(config)?;
ctx.register_table(table, Arc::new(provider))?;
}

Expand Down
18 changes: 10 additions & 8 deletions datafusion/benches/sort_limit_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
extern crate criterion;
use criterion::Criterion;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable};
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
use datafusion::datasource::object_store::local::LocalFileSystem;

use parking_lot::Mutex;
Expand Down Expand Up @@ -63,14 +63,16 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {

let testdata = datafusion::test_util::arrow_test_data();

let path = format!("{}/csv/aggregate_test_100.csv", testdata);

// create CSV data source
let listing_options = ListingOptions::new(Arc::new(CsvFormat::default()));
let csv = ListingTable::new(
Arc::new(LocalFileSystem {}),
format!("{}/csv/aggregate_test_100.csv", testdata),
schema,
listing_options,
);

let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), &path)
.with_listing_options(listing_options)
.with_schema(schema);

let csv = async { ListingTable::try_new(config).unwrap() };

let rt = Runtime::new().unwrap();

Expand All @@ -85,7 +87,7 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
ctx.state.lock().config.target_partitions = 1;
let runtime = ctx.state.lock().runtime_env.clone();

let mem_table = MemTable::load(Arc::new(csv), Some(partitions), runtime)
let mem_table = MemTable::load(Arc::new(csv.await), Some(partitions), runtime)
.await
.unwrap();
ctx.register_table("aggregate_test_100", Arc::new(mem_table))
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/datasource/listing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@
mod helpers;
mod table;

pub use table::{ListingOptions, ListingTable};
pub use table::{ListingOptions, ListingTable, ListingTableConfig};
Loading

0 comments on commit e5f6969

Please sign in to comment.