Introduce ObjectStoreProvider to create an object store based on the url #2906
Hi @alamb, @thinkharderdev, @andygrove, could you help review this PR? Thanks in advance.
cc @tustvold
Thanks @yahoNanJing. I started to look at this PR, and the code seems to do what is described in the description 👍, but I am not quite sure I understand the use case.
Is the primary use case "lazy" object store registration? Namely, that the relevant ObjectStore instance isn't instantiated unless a URL in a plan requires it?
"To avoid to introduce the remote object store dependency for DataFusion core,"
I agree this is not good.
I thought the way Ballista (or other systems) would work would be that:
- The user would provide a configuration file for whatever object stores (e.g. HDFS credentials, or similar) they wanted to connect to
- On process startup, Ballista would create ObjectStore instances based on the configuration and register them with the global ObjectStoreRegistry
- Since all Ballista processes would have the same (or compatible) configuration, serialized URLs created by one would have the correct ObjectStore already instantiated in them
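A minimal sketch of the startup-registration flow described in the list above, using only the standard library. The `ObjectStore` trait here is a stand-in for the real one, and `ConfiguredStore`, `Registry`, and `registry_from_config` are hypothetical names introduced for illustration; they are not DataFusion or Ballista APIs.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Stand-in for the real `ObjectStore` trait (assumption, illustration only).
pub trait ObjectStore: Send + Sync {
    fn name(&self) -> String;
}

/// Hypothetical store built from a single configuration entry.
pub struct ConfiguredStore {
    endpoint: String,
}

impl ObjectStore for ConfiguredStore {
    fn name(&self) -> String {
        format!("store@{}", self.endpoint)
    }
}

/// Simplified registry keyed by URL scheme.
#[derive(Default)]
pub struct Registry {
    stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
}

impl Registry {
    pub fn register(&self, scheme: &str, store: Arc<dyn ObjectStore>) {
        self.stores.write().unwrap().insert(scheme.to_string(), store);
    }

    pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
        self.stores.read().unwrap().get(scheme).cloned()
    }
}

/// Eagerly build every configured store once, at process startup.
/// `config` maps a scheme to an endpoint; both are assumptions of this sketch.
pub fn registry_from_config(config: &[(&str, &str)]) -> Registry {
    let registry = Registry::default();
    for (scheme, endpoint) in config {
        let store = ConfiguredStore { endpoint: endpoint.to_string() };
        registry.register(scheme, Arc::new(store));
    }
    registry
}
```

With this approach every process that loads the same configuration ends up with the same set of stores, but a URL whose store was never configured cannot be resolved later.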
Cargo.toml
Outdated
@@ -19,7 +19,6 @@
 members = [
     "datafusion/common",
     "datafusion/core",
-    "datafusion/data-access",
This code is unused, correct? And this is a cleanup that is not strictly necessary for this PR? (This is fine, I am just trying to make sure I understand.)
Yes. It's a cleanup, and this PR is based on another PR, #2904.
Thanks @alamb for your comments.
Yes. By self-registration, we can achieve the "lazy" registration.
For the Ballista usage, it's necessary for us to introduce the object store extensions as optional features. The exact usage may differ between object stores. For example,
I guess I don't understand why the
If I understand correctly, the issue is that, following the switch to object_store, the stores have to be registered per-host, whereas previously they were registered per-scheme. The desired functionality is to be able to register a generic connector that knows, given any hdfs URL, how to create an appropriate ObjectStore for it.
As such, I wonder if we might instead call this concept ObjectStoreSchemeProvider and store a HashMap on ObjectStoreRegistry? What do people think? This would also allow creating an S3SchemeProvider that knows how to connect to any S3 bucket, etc.
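A rough sketch of the per-scheme provider idea proposed above, assuming a map from URL scheme to a provider that can build a store for any authority under that scheme. Only the name `ObjectStoreSchemeProvider` comes from the comment; the `create` method, `HdfsSchemeProvider`, `SchemeProviders`, and the hand-rolled URL splitting are assumptions of this sketch, and `ObjectStore` is a stand-in trait.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Stand-in for the real `ObjectStore` trait (assumption, illustration only).
pub trait ObjectStore: Send + Sync {
    fn describe(&self) -> String;
}

/// One provider per scheme; it knows how to build a store for any authority.
pub trait ObjectStoreSchemeProvider: Send + Sync {
    fn create(&self, authority: &str) -> Arc<dyn ObjectStore>;
}

struct HdfsStore {
    namenode: String,
}

impl ObjectStore for HdfsStore {
    fn describe(&self) -> String {
        format!("hdfs store for {}", self.namenode)
    }
}

/// Hypothetical provider that can connect to any HDFS namenode.
pub struct HdfsSchemeProvider;

impl ObjectStoreSchemeProvider for HdfsSchemeProvider {
    fn create(&self, authority: &str) -> Arc<dyn ObjectStore> {
        Arc::new(HdfsStore { namenode: authority.to_string() })
    }
}

/// Map from URL scheme to the provider responsible for it.
#[derive(Default)]
pub struct SchemeProviders {
    providers: HashMap<String, Arc<dyn ObjectStoreSchemeProvider>>,
}

impl SchemeProviders {
    pub fn register(&mut self, scheme: &str, provider: Arc<dyn ObjectStoreSchemeProvider>) {
        self.providers.insert(scheme.to_string(), provider);
    }

    /// Split `scheme://authority/...` by hand and dispatch on the scheme.
    pub fn resolve(&self, url: &str) -> Option<Arc<dyn ObjectStore>> {
        let (scheme, rest) = url.split_once("://")?;
        let authority = rest.split('/').next().unwrap_or("");
        self.providers.get(scheme).map(|p| p.create(authority))
    }
}
```

This sketch creates a new store on every call; a real registry would presumably cache the result per authority.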
@@ -162,7 +162,7 @@ pub fn split_files(
 pub async fn pruned_partition_list<'a>(
     store: &'a dyn ObjectStore,
     table_path: &'a ListingTableUrl,
-    filters: &[Expr],
+    filters: &'a [Expr],
Why this change?
Without this change, compilation fails with a lifetime error.
I think it is a Rust compiler bug; the original code should not compile.
I also see this error occasionally (but not always) locally
It seems to be a Rust compiler bug. From my understanding of lifetimes, it's necessary for us to add 'a.
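The thread does not settle whether the original signature should have compiled. As a separate illustration (an assumption, not a claim about `pruned_partition_list` itself), here is a small case where such an annotation is genuinely required: the returned value borrows from `filters`, so `filters` must live at least as long as `'a`. The function `pruned` and its integer arguments are hypothetical.

```rust
/// Returns a lazy iterator that borrows both `items` and `filters`.
/// If `filters` were declared as `&[i32]` (without `'a`), the compiler would
/// reject this function, because the boxed iterator captures `filters` and is
/// required to be valid for `'a`.
pub fn pruned<'a>(
    items: &'a [i32],
    filters: &'a [i32],
) -> Box<dyn Iterator<Item = i32> + 'a> {
    Box::new(
        items
            .iter()
            .copied()
            // Keep only the items that are not listed in `filters`.
            .filter(move |v| !filters.contains(v)),
    )
}
```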
@@ -81,10 +81,19 @@ impl std::fmt::Display for ObjectStoreUrl {
     }
 }

+/// Object store self detector can detector an object store based on the url
+pub trait ObjectStoreSelfDetector: Send + Sync + 'static {
I wonder if we might call this ObjectStoreSchemeProvider, to make it more pluggable?
Actually, the purpose of this trait is to get an object store from a URL.
I think @tustvold 's suggestion is to use a slightly different name that perhaps is more conventional (I think often the term "Provider" is used to describe factory-like things in object oriented programming that instantiate instances of interfaces -- aka what this is doing)
Actually, it's based on the URL rather than the scheme. How about just renaming it to ObjectStoreProvider?
 pub struct ObjectStoreRegistry {
     /// A map from scheme to object store that serve list / read operations for the store
-    object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+    object_stores: Arc<RwLock<HashMap<String, Arc<dyn ObjectStore>>>>,
+    self_detector: Option<Arc<dyn ObjectStoreSelfDetector>>,
See the comment above. I wonder if we should instead make this a HashMap<String, Arc<dyn ObjectStoreSchemeProvider>> keyed by URL scheme.
I think string is OK
// First check whether can get object store from registry
let store = {
    let stores = self.object_stores.read();
    let s = &url[url::Position::BeforeScheme..url::Position::BeforeHost];
I'm not sure about this; the host should not be, and is not, part of the path passed to ObjectStore implementations.
What if we called the detector first and, if that does not return an instance, fell back to self.object_stores? This would preserve the existing behavior and would likely have a simpler implementation.
Something like (untested):
let store = self.self_detector
    .as_ref()
    // try to get the object store from the detector first
    .and_then(|detector| detector.get_by_url(url))
    // fall back to looking up by registered scheme
    .or_else(|| self.object_stores.read().get(s).cloned())
    .ok_or_else(|| {
        DataFusionError::Internal(format!(
            "No suitable object store found for {}",
            url
        ))
    })?;
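For readers who want to try the provider-first, registry-fallback order suggested above, here is a self-contained sketch under simplifying assumptions: URLs are plain strings, errors are `String`s, `std::sync::RwLock` is used, and `ObjectStore`, `NamedStore`, `HdfsOnly`, and `Registry` are stand-ins rather than the DataFusion types.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Stand-in for the real `ObjectStore` trait (assumption, illustration only).
pub trait ObjectStore: Send + Sync {
    fn label(&self) -> &str;
}

pub struct NamedStore(pub &'static str);

impl ObjectStore for NamedStore {
    fn label(&self) -> &str {
        self.0
    }
}

/// Simplified provider: returns a store for URLs it recognizes.
pub trait ObjectStoreProvider: Send + Sync {
    fn get_by_url(&self, url: &str) -> Option<Arc<dyn ObjectStore>>;
}

/// Hypothetical provider that only understands `hdfs://` URLs.
pub struct HdfsOnly;

impl ObjectStoreProvider for HdfsOnly {
    fn get_by_url(&self, url: &str) -> Option<Arc<dyn ObjectStore>> {
        if url.starts_with("hdfs://") {
            let store: Arc<dyn ObjectStore> = Arc::new(NamedStore("from-provider"));
            Some(store)
        } else {
            None
        }
    }
}

pub struct Registry {
    object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
    provider: Option<Arc<dyn ObjectStoreProvider>>,
}

impl Registry {
    pub fn new(provider: Option<Arc<dyn ObjectStoreProvider>>) -> Self {
        Self { object_stores: RwLock::new(HashMap::new()), provider }
    }

    pub fn register(&self, scheme: &str, store: Arc<dyn ObjectStore>) {
        self.object_stores.write().unwrap().insert(scheme.to_string(), store);
    }

    pub fn get_by_url(&self, url: &str) -> Result<Arc<dyn ObjectStore>, String> {
        let scheme = url.split("://").next().unwrap_or("");
        self.provider
            .as_ref()
            // Ask the provider first...
            .and_then(|p| p.get_by_url(url))
            // ...then fall back to the stores registered by scheme.
            .or_else(|| self.object_stores.read().unwrap().get(scheme).cloned())
            .ok_or_else(|| format!("No suitable object store found for {}", url))
    }
}
```

Note that in this order the provider is consulted on every lookup, so any caching of created stores would have to live inside the provider.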
I think it's better to call the detector later so that we can reuse the registered object stores.
What I don't like about the current code is that it is hard to concisely explain how urls are resolved as various parts of the URL are tried in a sequence that I find confusing.
I guess I was thinking that we could keep the logic in DataFusion simple, Ballista or some other system could implement whatever arbitrary logic was needed in its implementation of the detector. If the detector wanted to first try the registered object stores it could do so.
Actually, the reason it becomes complex is that object_store changed the behavior: lookup is no longer based on the scheme. And we have to deal with complex parsing for different object stores.
Self-detection based on the URL is a necessary feature for Ballista. However, manual registration is also a necessary feature for DataFusion, right? We still need to deal with different kinds of object stores in DataFusion.
If we use everything before the path, then the key may include credential info for S3, right? 😭 That's also not good, right?
Why is that not good? I'd expect to be able to register two object stores with the same host but different credentials? I don't follow why this is an issue?
It will easily cause security issues. In general, we should not store credentials in memory, and especially should not depend on them directly.
Could you perhaps expand on the threat model here, and how this would impact it? If the user has opted to use HTTP Basic Auth, I don't see how storing that data in memory as part of the URL is avoidable...
In general, we should not store users' credential info in memory. The security topic touches on many points:
- The expiration mechanism
- The encoding algorithm
- The encryption algorithm
- ...
However, this PR is not meant to deal with that. If anyone is interested, we should open another issue. For HDFS, we rely on Kerberos to handle security. However, I have little knowledge of S3. My intuition tells me that it's not safe to put the credential info in each request, even though it's base64 encoded.
For this PR, it's OK to just use url::Position::BeforeScheme..url::Position::BeforePath to extract the key for one specific object store. I'll submit a commit to fix it.
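To make the key under discussion concrete, here is a standard-library-only sketch that returns everything before the path of a URL string. It is a hand-rolled stand-in for slicing a parsed URL with `url::Position::BeforeScheme..url::Position::BeforePath`; the function name `store_key` is hypothetical, and the sketch does not handle URLs that carry a query or fragment without a path.

```rust
/// Return the `scheme://authority` prefix of `url`, i.e. everything before the
/// path. Returns `None` when the input has no `://` separator.
pub fn store_key(url: &str) -> Option<&str> {
    let scheme_end = url.find("://")?;
    let authority_start = scheme_end + "://".len();
    // The path starts at the first '/' after the authority, if there is one.
    let path_start = url[authority_start..]
        .find('/')
        .map(|i| authority_start + i)
        .unwrap_or(url.len());
    Some(&url[..path_start])
}
```

For `hdfs://10.10.0.1:8020/data/f.parquet` this yields `hdfs://10.10.0.1:8020`, so two namenodes get two distinct keys. For a URL with embedded user info, such as `s3://user:secret@bucket/key`, the user info becomes part of the key, which is the concern raised earlier in this thread.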
Is this correct @yahoNanJing? If so, then it makes sense to me (I didn't realize it was per-host rather than per-scheme). Does that mean it requires an
Correct, we only pass the URL path to
Hi @alamb and @tustvold, as @tustvold mentioned above, the switch to object_store changes the previous behavior, and now we need to create an ObjectStore for each host.
Sounds like a good idea to me -- depending on @yahoNanJing's preference, we could also add the feature to DataFusion and then consolidate when it was added / integrated into
Thanks @yahoNanJing -- I understand the need for this PR and would like to get it in to unblock you if needed
The only thing that this PR needs, in my opinion, is a test (of the ObjectStoreDetector logic)
@tustvold and I had some suggestions on how to improve the code but I think they could be done as a follow on PR as well
@@ -81,10 +81,19 @@ impl std::fmt::Display for ObjectStoreUrl {
     }
 }

+/// Object store self detector can detector an object store based on the url
- /// Object store self detector can detector an object store based on the url
+ /// Object store self detector can detector an object store based on the url
+ ///
+ /// This supports the usecase of providing [`ObjectStore`] instances based on different hosts
+ /// (such as `hdfs://10.10.0.1` and `hdfs://10.10.0.2` which might have different access credentials)?
Correct, we only pass the URL path to ObjectStore, and not any authority (host, port, credentials, etc.).
Actually, this feature is currently only of interest to Ballista, and I'll add a test in the Ballista repo. However, due to the dependency issue among datafusion, ballista and datafusion-objectstore-hdfs, it's necessary for us to merge this PR first. 😢
For anyone who may be interested, an initial implementation integrating datafusion, ballista and objectstore-hdfs is at https://github.com/yahoNanJing/arrow-ballista/tree/dev-202207. All of the tests passed.
Codecov Report
@@ Coverage Diff @@
## master #2906 +/- ##
==========================================
- Coverage 85.32% 85.31% -0.02%
==========================================
Files 273 273
Lines 49343 49377 +34
==========================================
+ Hits 42102 42126 +24
- Misses 7241 7251 +10
Continue to review full report at Codecov.
pub trait ObjectStoreProvider: Send + Sync + 'static {
    /// Detector a suitable object store based on its url if possible
    /// Return the key and object store
    fn get_by_url(&self, url: &Url) -> Option<(String, Arc<dyn ObjectStore>)>;
I'm not sure how returning a String is intended to work, as the store will always be looked up in the hashmap based on &url[url::Position::BeforeScheme..url::Position::BeforePath];
You are right. With the new logic of get_by_url, we can just use the key as the parameter and drop the String key from the returned value.
     /// Create the registry that object stores can registered into.
     /// ['LocalFileSystem'] store is registered in by default to support read local files natively.
-    pub fn new() -> Self {
+    pub fn new_with_detector(
- pub fn new_with_detector(
+ pub fn new_with_provider(
 pub struct ObjectStoreRegistry {
     /// A map from scheme to object store that serve list / read operations for the store
-    object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+    object_stores: Arc<RwLock<HashMap<String, Arc<dyn ObjectStore>>>>,
+    self_detector: Option<Arc<dyn ObjectStoreProvider>>,
- self_detector: Option<Arc<dyn ObjectStoreProvider>>,
+ provider: Option<Arc<dyn ObjectStoreProvider>>,
};

// If not, then try to detector based on its url.
let store = store
This currently has a thread race. It needs to:
- Acquire the write lock
- Verify that a value hasn't been inserted in the intervening time
- Call the detector
- Add the result
- Drop the write lock
To be honest, this could probably switch to just using a Mutex to keep things simple. There is unlikely to be significant contention to warrant the RwLock.
I don't think so. The RwLock is better than just using a Mutex. Here, reads will happen frequently, while writes only happen a few times.
In the uncontended case, which is extremely likely given how short the critical section is, they will perform exactly the same; if anything, the Mutex might be marginally faster. It was more an observation that the complexity of using a RwLock is probably not actually yielding any return.
A simple Mutex would allow you to use get_or_insert_with, and avoid what is currently a thread race.
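A minimal sketch of the Mutex-based alternative, assuming the standard library's `HashMap` entry API (`entry(..).or_insert_with(..)`) as the closest equivalent of the `get_or_insert_with` mentioned above. `LazyRegistry` and `get_or_create` are hypothetical names, and the store type is left generic.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Registry that creates a store at most once per key.
pub struct LazyRegistry<S> {
    stores: Mutex<HashMap<String, Arc<S>>>,
}

impl<S> LazyRegistry<S> {
    pub fn new() -> Self {
        Self { stores: Mutex::new(HashMap::new()) }
    }

    /// Look up `key`, calling `create` only if no store is registered yet.
    /// The check and the insert happen under one lock, so two threads cannot
    /// both observe a missing key and both create a store for it.
    pub fn get_or_create(&self, key: &str, create: impl FnOnce() -> S) -> Arc<S> {
        let mut stores = self.stores.lock().unwrap();
        Arc::clone(
            stores
                .entry(key.to_string())
                .or_insert_with(|| Arc::new(create())),
        )
    }
}
```

The trade-off is that `create` runs while the lock is held, so a slow store constructor would block other lookups for its duration.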
/// Object store registry
#[derive(Clone)]
Why is this needed? AFAICT all locations use Arc<ObjectStoreRegistry>.
@@ -121,6 +122,8 @@ pub struct RuntimeConfig {
     pub disk_manager: DiskManagerConfig,
     /// MemoryManager to limit access to memory
     pub memory_manager: MemoryManagerConfig,
+    /// ObjectStoreRegistry to get object store based on url
+    pub object_store_registry: ObjectStoreRegistry,
This should possibly be Arc<ObjectStoreRegistry> for consistency.
Why? Other properties are not wrapped with Arc. And for one env, there should be only one runtime_env and its related properties.
Two reasons:
- We use Arc<ObjectStoreRegistry> elsewhere and so using it here is more consistent
- It provides a hint that this is shared state, e.g. we use Arc<DiskManager> instead of just DiskManager, Arc<MemoryManager> instead of MemoryManager, etc.
I see. I agree that it's better to use Arc here.
@@ -162,7 +162,17 @@ mod tests {
    #[tokio::test]
    async fn test_schema_register_listing_table() {
        let testdata = crate::test_util::parquet_test_data();
        let filename = format!("file:///{}/{}", testdata, "alltypes_plain.parquet");
        let testdir = if testdata.starts_with('/') {
I wonder if this logic would be better handled in get_data_dir (to remove any trailing /)? As it stands I'm surprised just making this change here is sufficient?
The parsing logic differs between operating systems. The logic here seems too simplistic and should be refined.
I think let's get this in, and we can refine it in follow-up PRs.
Benchmark runs are scheduled for baseline = 6cb695f and contender = 305e265. 305e265 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Which issue does this PR close?
Closes #2905.
Rationale for this change
Currently, an ObjectStore cannot be self-registered by its path URL, which is a very important feature for Ballista to support remote storage such as HDFS. To avoid introducing the remote object store dependency into DataFusion core, it's better to give the ObjectStoreRegistry the ability to self-detect an ObjectStore based on the URL.
What changes are included in this PR?
Introduce a trait property, ObjectStoreSelfDetector, on the ObjectStoreRegistry, giving it the ability to self-detect an ObjectStore based on the URL. If Ballista wants this ability, it needs to set a specific detector on the ObjectStoreRegistry and configure it in the RuntimeEnv.
Are there any user-facing changes?