Extract Listing URI logic into ListingTableUri structure #2578

Merged on May 30, 2022 (15 commits)
4 changes: 3 additions & 1 deletion benchmarks/src/bin/tpch.rs
@@ -50,6 +50,7 @@ use datafusion::{

use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use datafusion::datasource::listing::ListingTableUrl;
Contributor
I wonder if it is time to move datasource to its own crate, if possible 🤔

use serde::Serialize;
use structopt::StructOpt;

@@ -425,7 +426,8 @@ fn get_table(
table_partition_cols: vec![],
};

let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), path)
let uri = ListingTableUrl::parse(path)?;
let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), uri)
.with_listing_options(options)
.with_schema(schema);

5 changes: 3 additions & 2 deletions datafusion-examples/examples/flight_server.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
use arrow_flight::SchemaAsIpc;
use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
use futures::Stream;
use tonic::transport::Server;
use tonic::{Request, Response, Status, Streaming};
@@ -68,9 +68,10 @@ impl FlightService for FlightServiceImpl {
let request = request.into_inner();

let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
let url = ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?;

let schema = listing_options
.infer_schema(Arc::new(LocalFileSystem {}), &request.path[0])
.infer_schema(Arc::new(LocalFileSystem {}), &url)
.await
.unwrap();

3 changes: 3 additions & 0 deletions datafusion/core/Cargo.toml
@@ -67,7 +67,9 @@ datafusion-physical-expr = { path = "../physical-expr", version = "8.0.0" }
datafusion-row = { path = "../row", version = "8.0.0" }
datafusion-sql = { path = "../sql", version = "8.0.0" }
futures = "0.3"
glob = "0.3.0"
Contributor Author
This is moved from data_access

hashbrown = { version = "0.12", features = ["raw"] }
itertools = "0.10"
Contributor Author
This is technically a new dependency of this crate, but it is so ubiquitous (it is a dependency of prost, etc.) that this is probably OK.

Contributor
But datafusion doesn't depend on prost, right? If we could avoid another dependency, that would be good in my opinion.

Contributor Author
Well once we bring in the new object_store we will depend on it...

Ultimately it is such a fundamental crate, like bytes, parking_lot, etc... I'm inclined to think it isn't really a problem...

Contributor
ok, I still cringe a little, but I vote :shipit:

lazy_static = { version = "^1.4.0" }
log = "^0.4"
num-traits = { version = "0.2", optional = true }
@@ -85,6 +87,7 @@ sqlparser = "0.17"
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
tokio-stream = "0.1"
url = "2.2"
Contributor Author
This is a new dependency; it is brought in by hyper and friends, so again it is pretty ubiquitous.

uuid = { version = "1.0", features = ["v4"] }

[dev-dependencies]
7 changes: 5 additions & 2 deletions datafusion/core/benches/sort_limit_query_sql.rs
@@ -20,7 +20,9 @@ extern crate criterion;
use criterion::Criterion;
use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};

use parking_lot::Mutex;
use std::sync::Arc;
@@ -64,11 +66,12 @@ fn create_context() -> Arc<Mutex<SessionContext>> {
let testdata = datafusion::test_util::arrow_test_data();

let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let url = ListingTableUrl::parse(path).unwrap();

// create CSV data source
let listing_options = ListingOptions::new(Arc::new(CsvFormat::default()));

let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), &path)
let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), url)
.with_listing_options(listing_options)
.with_schema(schema);

23 changes: 12 additions & 11 deletions datafusion/core/src/catalog/schema.rs
@@ -23,7 +23,7 @@ use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

use crate::datasource::listing::{ListingTable, ListingTableConfig};
use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
use crate::datasource::object_store_registry::ObjectStoreRegistry;
use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
@@ -157,10 +157,7 @@ impl ObjectStoreSchemaProvider {
}

/// Retrieves a `ObjectStore` instance by scheme
pub fn object_store<'a>(
&self,
uri: &'a str,
) -> Result<(Arc<dyn ObjectStore>, &'a str)> {
pub fn object_store(&self, uri: &ListingTableUrl) -> Result<Arc<dyn ObjectStore>> {
Contributor Author
The scheme stripping logic is now part of ListingTableUrl
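
To illustrate the idea in the comment above, here is a minimal, std-only sketch of what "scheme stripping inside a URL type" can look like. It is not the `ListingTableUrl` implementation from this PR (which builds on the `url` crate); the type `SketchTableUrl`, its fields, and its methods are hypothetical names invented for this example, and treating a bare path as a `file` location is an assumption of the sketch.

```rust
/// Hypothetical stand-in for a table location type.
/// NOT the real `ListingTableUrl`; names and behavior are assumptions.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchTableUrl {
    scheme: String,
    path: String,
}

impl SketchTableUrl {
    /// Split the input into a scheme and the remainder once, up front,
    /// so callers never have to strip the scheme themselves.
    pub fn parse(s: &str) -> Result<Self, String> {
        if s.is_empty() {
            return Err("empty table location".to_string());
        }
        match s.split_once("://") {
            // "s3://bucket/key" -> scheme "s3", path "bucket/key"
            Some((scheme, rest)) if !scheme.is_empty() => Ok(Self {
                scheme: scheme.to_ascii_lowercase(),
                path: rest.to_string(),
            }),
            // "://something" has a separator but no scheme
            Some(_) => Err(format!("missing scheme in {:?}", s)),
            // Assumption of this sketch: no separator means a local path
            None => Ok(Self {
                scheme: "file".to_string(),
                path: s.to_string(),
            }),
        }
    }

    /// Scheme used to pick an object store (for example "file" or "s3").
    pub fn scheme(&self) -> &str {
        &self.scheme
    }

    /// Location with the scheme already removed.
    pub fn path(&self) -> &str {
        &self.path
    }
}
```

With a type like this, a lookup such as `object_store` only needs to return the store for `scheme()`, which mirrors why the signature in this hunk no longer returns a `(store, path)` tuple.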

self.object_store_registry
.lock()
.get_by_uri(uri)
@@ -173,13 +170,13 @@ impl ObjectStoreSchemaProvider {
pub async fn register_listing_table(
&self,
name: &str,
uri: &str,
uri: ListingTableUrl,
config: Option<ListingTableConfig>,
) -> Result<()> {
let config = match config {
Some(cfg) => cfg,
None => {
let (object_store, _path) = self.object_store(uri)?;
let object_store = self.object_store(&uri)?;
ListingTableConfig::new(object_store, uri).infer().await?
}
};
@@ -255,6 +252,7 @@ mod tests {
use crate::datasource::empty::EmptyTable;
use crate::execution::context::SessionContext;

use crate::datasource::listing::ListingTableUrl;
use futures::StreamExt;

#[tokio::test]
@@ -280,12 +278,13 @@ mod tests {
async fn test_schema_register_listing_table() {
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
let uri = ListingTableUrl::parse(filename).unwrap();

let schema = ObjectStoreSchemaProvider::new();
let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {}));

schema
.register_listing_table("alltypes_plain", &filename, None)
.register_listing_table("alltypes_plain", uri, None)
.await
.unwrap();

@@ -338,8 +337,9 @@ mod tests {
|| file == OsStr::new("alltypes_plain.parquet")
{
let name = path.file_stem().unwrap().to_str().unwrap();
let path = ListingTableUrl::parse(&sized_file.path).unwrap();
schema
.register_listing_table(name, &sized_file.path, None)
.register_listing_table(name, path, None)
.await
.unwrap();
}
@@ -360,17 +360,18 @@ mod tests {
async fn test_schema_register_same_listing_table() {
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
let uri = ListingTableUrl::parse(filename).unwrap();

let schema = ObjectStoreSchemaProvider::new();
let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {}));

schema
.register_listing_table("alltypes_plain", &filename, None)
.register_listing_table("alltypes_plain", uri.clone(), None)
.await
.unwrap();

schema
.register_listing_table("alltypes_plain", &filename, None)
.register_listing_table("alltypes_plain", uri, None)
.await
.unwrap();
}