-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extract Listing URI logic into ListingTableUri structure #2578
Changes from all commits
4314a3b
7dc7eef
18ceed5
9c8b546
230e42b
deef0b6
6c311ba
9c9b4b2
63df81c
938e916
c1cf2da
c32e27e
61d67cf
a504fe8
0ec7433
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,7 +67,9 @@ datafusion-physical-expr = { path = "../physical-expr", version = "8.0.0" } | |
datafusion-row = { path = "../row", version = "8.0.0" } | ||
datafusion-sql = { path = "../sql", version = "8.0.0" } | ||
futures = "0.3" | ||
glob = "0.3.0" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is moved from data_access |
||
hashbrown = { version = "0.12", features = ["raw"] } | ||
itertools = "0.10" | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is technically a new dependency of this crate, but it is so ubiquitous and is a dependency of prost, etc., so this is probably ok. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. But datafusion doesn't depend on prost, right? If we could avoid another dependency that would be good in my opinion. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Well once we bring in the new object_store we will depend on it... Ultimately it is such a fundamental crate, like bytes, parking_lot, etc... I'm inclined to think it isn't really a problem... There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ok, I still cringe a little, but I vote |
||
lazy_static = { version = "^1.4.0" } | ||
log = "^0.4" | ||
num-traits = { version = "0.2", optional = true } | ||
|
@@ -85,6 +87,7 @@ sqlparser = "0.17" | |
tempfile = "3" | ||
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] } | ||
tokio-stream = "0.1" | ||
url = "2.2" | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is a new dependency; it is brought in by hyper and friends, so again it is pretty ubiquitous |
||
uuid = { version = "1.0", features = ["v4"] } | ||
|
||
[dev-dependencies] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,8 +23,8 @@ use std::any::Any; | |
use std::collections::HashMap; | ||
use std::sync::Arc; | ||
|
||
use crate::datasource::listing::{ListingTable, ListingTableConfig}; | ||
use crate::datasource::object_store_registry::ObjectStoreRegistry; | ||
use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl}; | ||
use crate::datasource::object_store::ObjectStoreRegistry; | ||
use crate::datasource::TableProvider; | ||
use crate::error::{DataFusionError, Result}; | ||
use datafusion_data_access::object_store::ObjectStore; | ||
|
@@ -156,31 +156,33 @@ impl ObjectStoreSchemaProvider { | |
.register_store(scheme.into(), object_store) | ||
} | ||
|
||
/// Retrieves a `ObjectStore` instance by scheme | ||
pub fn object_store<'a>( | ||
/// Retrieves a `ObjectStore` instance for a given Url | ||
pub fn object_store( | ||
&self, | ||
uri: &'a str, | ||
) -> Result<(Arc<dyn ObjectStore>, &'a str)> { | ||
url: impl AsRef<url::Url>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this seems like a reasonable change so that object store paths are always identified by url 👍 |
||
) -> Result<Arc<dyn ObjectStore>> { | ||
self.object_store_registry | ||
.lock() | ||
.get_by_uri(uri) | ||
.get_by_url(url) | ||
.map_err(DataFusionError::from) | ||
} | ||
|
||
/// If supported by the implementation, adds a new table to this schema by creating a | ||
/// `ListingTable` from the provided `uri` and a previously registered `ObjectStore`. | ||
/// `ListingTable` from the provided `url` and a previously registered `ObjectStore`. | ||
/// If a table of the same name existed before, it returns "Table already exists" error. | ||
pub async fn register_listing_table( | ||
&self, | ||
name: &str, | ||
uri: &str, | ||
table_path: ListingTableUrl, | ||
config: Option<ListingTableConfig>, | ||
) -> Result<()> { | ||
let config = match config { | ||
Some(cfg) => cfg, | ||
None => { | ||
let (object_store, _path) = self.object_store(uri)?; | ||
ListingTableConfig::new(object_store, uri).infer().await? | ||
let object_store = self.object_store(&table_path)?; | ||
ListingTableConfig::new(object_store, table_path) | ||
.infer() | ||
.await? | ||
} | ||
}; | ||
|
||
|
@@ -255,6 +257,7 @@ mod tests { | |
use crate::datasource::empty::EmptyTable; | ||
use crate::execution::context::SessionContext; | ||
|
||
use crate::datasource::listing::ListingTableUrl; | ||
use futures::StreamExt; | ||
|
||
#[tokio::test] | ||
|
@@ -280,12 +283,13 @@ mod tests { | |
async fn test_schema_register_listing_table() { | ||
let testdata = crate::test_util::parquet_test_data(); | ||
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet"); | ||
let table_path = ListingTableUrl::parse(filename).unwrap(); | ||
|
||
let schema = ObjectStoreSchemaProvider::new(); | ||
let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {})); | ||
|
||
schema | ||
.register_listing_table("alltypes_plain", &filename, None) | ||
.register_listing_table("alltypes_plain", table_path, None) | ||
.await | ||
.unwrap(); | ||
|
||
|
@@ -338,8 +342,9 @@ mod tests { | |
|| file == OsStr::new("alltypes_plain.parquet") | ||
{ | ||
let name = path.file_stem().unwrap().to_str().unwrap(); | ||
let path = ListingTableUrl::parse(&sized_file.path).unwrap(); | ||
schema | ||
.register_listing_table(name, &sized_file.path, None) | ||
.register_listing_table(name, path, None) | ||
.await | ||
.unwrap(); | ||
} | ||
|
@@ -360,17 +365,18 @@ mod tests { | |
async fn test_schema_register_same_listing_table() { | ||
let testdata = crate::test_util::parquet_test_data(); | ||
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet"); | ||
let table_path = ListingTableUrl::parse(filename).unwrap(); | ||
|
||
let schema = ObjectStoreSchemaProvider::new(); | ||
let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {})); | ||
|
||
schema | ||
.register_listing_table("alltypes_plain", &filename, None) | ||
.register_listing_table("alltypes_plain", table_path.clone(), None) | ||
.await | ||
.unwrap(); | ||
|
||
schema | ||
.register_listing_table("alltypes_plain", &filename, None) | ||
.register_listing_table("alltypes_plain", table_path, None) | ||
.await | ||
.unwrap(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if it is time to move `datasource` to its own crate, if possible 🤔