Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create ListingTableConfig which includes file format and schema inference #1715

Merged
merged 38 commits into from
Feb 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
b5078ce
Starting work on ListingTableConfig
matthewmturner Jan 31, 2022
f5eeff8
v1 of config
matthewmturner Feb 5, 2022
716cf69
Refactor for cleaner api
matthewmturner Feb 6, 2022
4ffdd77
Update references in rest of code base
matthewmturner Feb 7, 2022
a603c3c
Docs
matthewmturner Feb 7, 2022
12daa27
Clippy
matthewmturner Feb 7, 2022
98e7478
Remove async from ListingTable::try_new
matthewmturner Feb 8, 2022
8e6e510
Ballista
matthewmturner Feb 8, 2022
fb7455d
Benchmark fix
matthewmturner Feb 8, 2022
0bc76b6
benchs
matthewmturner Feb 8, 2022
2d90462
Clippy
matthewmturner Feb 8, 2022
69e35e7
Clippy
matthewmturner Feb 8, 2022
daf817c
Fix empty table bug
matthewmturner Feb 8, 2022
1479ccf
Fix table path
matthewmturner Feb 8, 2022
d643cc8
Get file from directory
matthewmturner Feb 9, 2022
2fc3413
Fix read empty table test
matthewmturner Feb 9, 2022
33aebb6
Remove unnecessary async
matthewmturner Feb 9, 2022
684a310
Starting work on ListingTableConfig
matthewmturner Jan 31, 2022
b807ea4
v1 of config
matthewmturner Feb 5, 2022
b208c41
Refactor for cleaner api
matthewmturner Feb 6, 2022
32f0aad
Update references in rest of code base
matthewmturner Feb 7, 2022
2256f69
Docs
matthewmturner Feb 7, 2022
c4556b5
Clippy
matthewmturner Feb 7, 2022
48cb37b
Remove async from ListingTable::try_new
matthewmturner Feb 8, 2022
4641078
Ballista
matthewmturner Feb 8, 2022
97a2c72
Benchmark fix
matthewmturner Feb 8, 2022
536bb10
benchs
matthewmturner Feb 8, 2022
cfc2390
Clippy
matthewmturner Feb 8, 2022
17991c2
Clippy
matthewmturner Feb 8, 2022
d84ff25
Fix empty table bug
matthewmturner Feb 8, 2022
34aeacb
Fix table path
matthewmturner Feb 8, 2022
9d042d1
Get file from directory
matthewmturner Feb 9, 2022
41eb0bf
Fix read empty table test
matthewmturner Feb 9, 2022
1a27836
Remove unnecessary async
matthewmturner Feb 9, 2022
f7360d6
Rebase
matthewmturner Feb 9, 2022
195956b
Merge branch 'add_listing_config' of github.com:matthewmturner/arrow-…
matthewmturner Feb 9, 2022
3788855
Improve log
matthewmturner Feb 9, 2022
301d4d6
Rebase fix
matthewmturner Feb 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion::datasource::file_format::avro::AvroFormat;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable};
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
use datafusion::datasource::object_store::local::LocalFileSystem;
use datafusion::datasource::object_store::{FileMeta, SizedFile};
use datafusion::logical_plan::window_frames::{
Expand Down
13 changes: 6 additions & 7 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion::datasource::file_format::avro::AvroFormat;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable};
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
use datafusion::datasource::object_store::local::LocalFileSystem;
use datafusion::logical_plan::plan::Extension;
use datafusion::logical_plan::plan::{
Expand Down Expand Up @@ -241,12 +241,11 @@ impl AsLogicalPlan for LogicalPlanNode {
scan.path.as_str()
);

let provider = ListingTable::new(
object_store,
scan.path.clone(),
Arc::new(schema),
options,
);
let config = ListingTableConfig::new(object_store, scan.path.as_str())
.with_listing_options(options)
.with_schema(Arc::new(schema));

let provider = ListingTable::try_new(config)?;

LogicalPlanBuilder::scan_with_filters(
&scan.table_name,
Expand Down
24 changes: 12 additions & 12 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use datafusion::{
use datafusion::{
arrow::util::pretty,
datasource::{
listing::{ListingOptions, ListingTable},
listing::{ListingOptions, ListingTable, ListingTableConfig},
object_store::local::LocalFileSystem,
},
};
Expand Down Expand Up @@ -724,12 +724,11 @@ fn get_table(
table_partition_cols: vec![],
};

Ok(Arc::new(ListingTable::new(
Arc::new(LocalFileSystem {}),
path,
schema,
options,
)))
let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), path)
.with_listing_options(options)
.with_schema(schema);

Ok(Arc::new(ListingTable::try_new(config)?))
}

fn get_schema(table: &str) -> Schema {
Expand Down Expand Up @@ -1389,12 +1388,13 @@ mod tests {
.has_header(false)
.file_extension(".tbl");
let listing_options = options.to_listing_options(1);
let provider = ListingTable::new(
let config = ListingTableConfig::new(
Arc::new(LocalFileSystem {}),
format!("{}/{}.tbl", tpch_data_path, table),
Arc::new(schema),
listing_options,
);
tpch_data_path.clone(),
)
.with_listing_options(listing_options)
.with_schema(Arc::new(schema));
let provider = ListingTable::try_new(config)?;
ctx.register_table(table, Arc::new(provider))?;
}

Expand Down
18 changes: 10 additions & 8 deletions datafusion/benches/sort_limit_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
extern crate criterion;
use criterion::Criterion;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable};
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
use datafusion::datasource::object_store::local::LocalFileSystem;

use parking_lot::Mutex;
Expand Down Expand Up @@ -63,14 +63,16 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {

let testdata = datafusion::test_util::arrow_test_data();

let path = format!("{}/csv/aggregate_test_100.csv", testdata);

// create CSV data source
let listing_options = ListingOptions::new(Arc::new(CsvFormat::default()));
let csv = ListingTable::new(
Arc::new(LocalFileSystem {}),
format!("{}/csv/aggregate_test_100.csv", testdata),
schema,
listing_options,
);

let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), &path)
.with_listing_options(listing_options)
.with_schema(schema);

let csv = async { ListingTable::try_new(config).unwrap() };

let rt = Runtime::new().unwrap();

Expand All @@ -85,7 +87,7 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
ctx.state.lock().config.target_partitions = 1;
let runtime = ctx.state.lock().runtime_env.clone();

let mem_table = MemTable::load(Arc::new(csv), Some(partitions), runtime)
let mem_table = MemTable::load(Arc::new(csv.await), Some(partitions), runtime)
.await
.unwrap();
ctx.register_table("aggregate_test_100", Arc::new(mem_table))
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/datasource/listing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@
mod helpers;
mod table;

pub use table::{ListingOptions, ListingTable};
pub use table::{ListingOptions, ListingTable, ListingTableConfig};
Loading