Extract Listing URI logic into ListingTableUri structure #2578
&self,
uri: &'a str,
) -> Result<(Arc<dyn ObjectStore>, &'a str)> {
pub fn object_store(&self, uri: &ListingTableUrl) -> Result<Arc<dyn ObjectStore>> {
The scheme stripping logic is now part of ListingTableUrl
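For readers following along, here is a minimal sketch of what "scheme stripping" means in this context, using only the standard library. `split_scheme` and `is_scheme` are hypothetical helpers made up for illustration; they are not DataFusion's API, and the PR's actual `ListingTableUrl` may well do this differently.

```rust
/// Splits `uri` into an optional scheme and the remainder after `://`.
/// Hypothetical helper for illustration only, not part of DataFusion.
fn split_scheme(uri: &str) -> (Option<&str>, &str) {
    match uri.split_once("://") {
        // Only treat the prefix as a scheme if it looks like one.
        Some((scheme, rest)) if is_scheme(scheme) => (Some(scheme), rest),
        _ => (None, uri),
    }
}

/// A scheme is a letter followed by letters, digits, `+`, `-` or `.`.
fn is_scheme(s: &str) -> bool {
    let mut chars = s.chars();
    let starts_with_letter = matches!(chars.next(), Some(c) if c.is_ascii_alphabetic());
    starts_with_letter
        && chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '.'))
}
```

With this sketch, `split_scheme("s3://bucket/key")` yields the scheme `s3` and the remainder `bucket/key`, while a plain path such as `/tmp/data.csv` comes back unchanged with no scheme.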
/// and lacks a leading `/`
///
/// TODO: Handle paths consistently (#2489)
fn prefix(&self) -> &str {
As part of #2489 I hope to bring some consistency to what the paths passed to the ObjectStore actually are. It's currently a bit confused.
}

/// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
pub(crate) fn list_all_files<'a>(
This combines the various ObjectStore listing methods into a single method, which is clearer and more consistent.
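As a rough illustration of the idea of a single listing entry point, the sketch below filters a flat list of keys by prefix and file extension. It is a synchronous stand-in written for this note: `list_matching_files` is a hypothetical name, and the real method presumably returns an async stream from the object store rather than iterating over a slice.

```rust
/// Returns the entries of `all_files` that live under `prefix` and end with
/// `file_extension`. Hypothetical, synchronous stand-in for an object store
/// listing; not the method added in this PR.
fn list_matching_files<'a>(
    all_files: &'a [&'a str],
    prefix: &'a str,
    file_extension: &'a str,
) -> impl Iterator<Item = &'a str> + 'a {
    all_files
        .iter()
        .copied()
        // Keep only keys under the prefix that carry the requested extension.
        .filter(move |path| path.starts_with(prefix) && path.ends_with(file_extension))
}
```

Keeping the prefix and extension checks in one place is what removes the call-site differences that the separate listing methods allowed.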
@@ -504,7 +504,7 @@ mod tests {
 "bucket/key-prefix/file3",
 "bucket/key-prefix/file4",
 ],
-"bucket/key-prefix/",
+"file:///bucket/key-prefix/",
These files don't actually exist, so we need to fully qualify the URLs
//! "| 1 | 2 |",
//! "+---+--------------------------+"
//! "+---+----------------+",
//! "| a | MIN(?table?.b) |",
I'm not entirely sure what change caused this... I wonder if we aren't running doctests in CI 🤔
Edit: it is because of https://github.com/apache/arrow-datafusion/pull/2578/files#diff-e9c2d69b5793675f76c48aee263e29006065f1b28c50f056ccc6efe7d062aa77L611. This makes CSV consistent with other formats, which is probably a good thing.
It seems ok to me. I am not sure whether the "use the filename as table name" behavior of CSV was intentional (maybe it is some sort of dataframe compatibility)?
cc @andygrove
}

#[tokio::test]
async fn test_find_longest_search_path_without_glob_pattern() -> Result<()> {
This test is reformulated and moved to ListingTableUrl
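For context, "longest search path without glob pattern" can be sketched as follows: find the first glob metacharacter, then cut at the preceding `/` so the listing prefix ends on a segment boundary. `split_glob` is a hypothetical helper and the set of metacharacters (`*`, `?`, `[`) is an assumption; the logic moved into `ListingTableUrl` may differ in detail.

```rust
/// Splits `path` into the longest leading run of `/`-separated segments that
/// contain no glob metacharacters, and the remaining glob pattern (if any).
/// Hypothetical helper for illustration only.
fn split_glob(path: &str) -> (&str, Option<&str>) {
    // Assumed set of glob metacharacters.
    const GLOB_CHARS: &[char] = &['*', '?', '['];
    match path.find(GLOB_CHARS) {
        // No glob at all: the whole path is the search prefix.
        None => (path, None),
        Some(first_glob) => {
            // Cut just after the last `/` that precedes the first metacharacter.
            let cut = path[..first_glob].rfind('/').map_or(0, |i| i + 1);
            (&path[..cut], Some(&path[cut..]))
        }
    }
}
```

For example, `bucket/key-prefix/*.csv` splits into the prefix `bucket/key-prefix/` and the pattern `*.csv`, so only that prefix needs to be listed before the pattern is applied.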
@@ -78,54 +74,6 @@ pub trait ObjectStore: Sync + Send + Debug {
/// Returns all the files in path `prefix`
async fn list_file(&self, prefix: &str) -> Result<FileMetaStream>;

/// Calls `list_file` with a suffix filter
This logic is now encapsulated in ListingTableUrl
Aside from the serialization issue we discussed above, I think this looks great.
Looks good. (I am under the impression that support for globbing (almost as an edge case) does not belong in here but should be implemented on a more outward/user-facing API instead, where it is a proper part of the spec.)
I don't disagree with this, but aside from not supporting globbing, I couldn't see another way to support it whilst also treating the argument as a URI, which is critical for object store support. Alternative suggestions welcome...
@@ -642,7 +642,7 @@ order by
 #[tokio::test]
 async fn test_physical_plan_display_indent() {
 // Hard code target_partitions as it appears in the RepartitionExec output
-let config = SessionConfig::new().with_target_partitions(3);
+let config = SessionConfig::new().with_target_partitions(9000);
This was causing an amusing issue: as the Mac runner happened to have 3 CPUs, the 3 would be replaced with NUM_CPUS 🤦
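To make the failure mode concrete, here is a small sketch of a normalizer that rewrites the machine's CPU count into a placeholder. `normalize_cpus` and the exact replacement pattern are assumptions made for illustration; the test suite's real normalizer is not shown in this conversation and may work differently.

```rust
/// Replaces a parenthesised CPU count in `plan` with a placeholder so that
/// expected output does not depend on the machine running the test.
/// Hypothetical helper; the pattern being replaced is an assumption.
fn normalize_cpus(plan: &str, num_cpus: usize) -> String {
    let needle = format!("({})", num_cpus);
    plan.replace(needle.as_str(), "(NUM_CPUS)")
}
```

If a hard-coded value such as 3 happens to equal the runner's CPU count, it gets rewritten along with the genuine CPU-dependent values and the expected output no longer matches. A value like 9000 avoids the collision on any realistic runner.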
I'm working on fixing the Windows issues and will then do the Ballista dance. I think this should be ready for review though.
@@ -67,7 +67,9 @@ datafusion-physical-expr = { path = "../physical-expr", version = "8.0.0" }
datafusion-row = { path = "../row", version = "8.0.0" }
datafusion-sql = { path = "../sql", version = "8.0.0" }
futures = "0.3"
glob = "0.3.0"
This is moved from data_access
hashbrown = { version = "0.12", features = ["raw"] }
itertools = "0.10"
This is technically a new dependency of this crate, but it is so ubiquitous (it is a dependency of prost, etc.) that this is probably ok.
But datafusion doesn't depend on prost, right? If we could avoid another dependency, that would be good in my opinion.
Well once we bring in the new object_store we will depend on it...
Ultimately it is such a fundamental crate, like bytes, parking_lot, etc... I'm inclined to think it isn't really a problem...
ok, I still cringe a little, but I vote
@@ -85,6 +87,7 @@ sqlparser = "0.17"
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
tokio-stream = "0.1"
url = "2.2"
This is a new dependency. It is brought in by hyper and friends, so again it is pretty ubiquitous.
#2639 seems to have introduced logical conflicts... Fixing...
@@ -68,6 +70,8 @@ lazy_static! {
pub struct FileScanConfig {
/// Store from which the `files` should be fetched
pub object_store: Arc<dyn ObjectStore>,
/// Object store URL
pub object_store_url: ObjectStoreUrl,
This was the solution I came up with to allow ballista to round-trip the physical plan: store a canonical ObjectStoreUrl instead of relying on the files to contain the scheme.
I think perhaps in a subsequent PR I'll try to remove the object_store member, which is now redundant.
FYI @thinkharderdev
worth a tracking ticket?
}

/// Object store registry
pub struct ObjectStoreRegistry {
This is moved from object_store_registry
use std::sync::Arc;

/// Object store registry
pub struct ObjectStoreRegistry {
Moved to object_store module
I went over this PR quite carefully. I think it is a significant step forward. Nice work @tustvold
Other than the windows test removal in datafusion/core/src/datasource/listing/helpers.rs it all looked pretty great to me 🏆
@@ -50,6 +50,7 @@ use datafusion::{

use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use datafusion::datasource::listing::ListingTableUrl;
I wonder if it is time to move datasource to its own crate, if possible 🤔
@@ -0,0 +1,304 @@
// Licensed to the Apache Software Foundation (ASF) under one
minor comment: maybe this file should be called "url.rs" for consistency
Heh, this was my trick to avoid a name collision with the url crate 😆
///
/// # Paths without a Scheme
///
/// If no scheme is provided, or the string is an absolute filesystem path
👍 this is great. Thank you for this writeup. I think it makes sense to me
@yjshen is this behavior ok with you?
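The doc comment quoted above is cut off, so the following is only a guess at the shape of the decision it describes: treat the input as a filesystem path when it is absolute or carries no scheme, and as a URL otherwise. `Interpretation` and `classify` are hypothetical names, and checking for `://` is a simplification; what happens after the decision is not covered here.

```rust
use std::path::Path;

/// How a user-provided string might be interpreted. Hypothetical type.
#[derive(Debug, PartialEq)]
enum Interpretation {
    Url,
    FilesystemPath,
}

/// Assumed rule: absolute filesystem paths and strings without a `://`
/// separator are treated as paths; everything else is treated as a URL.
fn classify(s: &str) -> Interpretation {
    if Path::new(s).is_absolute() || !s.contains("://") {
        Interpretation::FilesystemPath
    } else {
        Interpretation::Url
    }
}
```

Under this sketch `s3://bucket/key` and `file:///bucket/key-prefix/` are treated as URLs, while `/tmp/data.csv` and `data/file.csv` are treated as filesystem paths.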
}

/// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
fn parse_path(s: &str) -> Result<Self> {
Is there any use case for this being pub? Like to allow users to force the parsing to treat it like a path
I think I'd rather keep it private until such a use-case comes along. There's enough funky here that I'm loath to introduce more potential for strangeness 😅
// to ARROW_TEST_DATA/csv/aggregate_test_100.csv
let data_path = datafusion::test_util::arrow_test_data();
let s = s.replace(&data_path, "ARROW_TEST_DATA");
pub struct ExplainNormalizer {
👍
* Update to apache/datafusion#2578
* Fix standalone build
* Update datafusion pin
Which issue does this PR close?
Closes #2562
Part of #2489
Rationale for this change
See tickets. The previous logic was inconsistent and varied based on call site.
What changes are included in this PR?
This extracts a ListingTableUrl that handles all the logic of translating from the user-provided string, to a canonical representation that can be used by the rest of DataFusion.
I'm having some difficulty with some of the ballista tests, likely related to apache/datafusion-ballista#481, and would appreciate some help with this. Perhaps @thinkharderdev?
Are there any user-facing changes?
Yes, this makes changes both to the public API, and also the interpretation of certain strings. In particular:
FYI @timvw