Change ObjectStoreRegistry from struct to trait to provide polymorphism #5543
…vide polymorphism for get_by_url
Hi @alamb, @tustvold, @thinkharderdev and @andygrove, an initial prototype of the data source cache layer for Ballista is almost complete: https://github.com/yahoNanJing/arrow-ballista/tree/dev-20230302. From now on, I will keep reorganizing the code and contributing it back to the community. To achieve this, we need to apply a few changes to datafusion and arrow-rs. Could you help review this PR first?
This seems to have non-trivial overlap with ObjectStoreProvider, I wonder if we could combine them?
On a somewhat related note, I'm a little confused why this is needed for a cache system, couldn't that just be kept as a detail of whatever is creating the ObjectStore? I.e. the stores are registered already wrapped in a CachingStore (or whatever it gets called). I think this would be significantly simpler and easier to understand, and would not require any changes outside of Ballista?
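The alternative raised here, registering stores that are already wrapped in a caching decorator, can be sketched with the standard library alone. This is only an illustration under stated assumptions: `Store`, `RemoteStore`, and `CachingStore` are hypothetical names, and `Store` is a tiny synchronous stand-in for the real (async, much larger) `ObjectStore` trait.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical, minimal stand-in for `ObjectStore`.
trait Store: Send + Sync {
    fn get(&self, path: &str) -> Option<Vec<u8>>;
}

/// Hypothetical backing store that counts how often it is actually read.
struct RemoteStore {
    reads: AtomicUsize,
}

impl Store for RemoteStore {
    fn get(&self, path: &str) -> Option<Vec<u8>> {
        self.reads.fetch_add(1, Ordering::SeqCst);
        // Pretend the object's content is simply its path.
        Some(path.as_bytes().to_vec())
    }
}

/// Decorator: serves repeated reads from memory, otherwise delegates to `inner`.
struct CachingStore {
    inner: Arc<dyn Store>,
    cache: Mutex<HashMap<String, Vec<u8>>>,
}

impl CachingStore {
    fn new(inner: Arc<dyn Store>) -> Self {
        Self { inner, cache: Mutex::new(HashMap::new()) }
    }
}

impl Store for CachingStore {
    fn get(&self, path: &str) -> Option<Vec<u8>> {
        let mut cache = self.cache.lock().unwrap();
        if let Some(hit) = cache.get(path) {
            return Some(hit.clone());
        }
        // Cache miss: read through to the wrapped store and remember the result.
        let bytes = self.inner.get(path)?;
        cache.insert(path.to_string(), bytes.clone());
        Some(bytes)
    }
}
```

With this shape, whoever registers the store decides whether it is wrapped, and the registry itself needs no knowledge of caching.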
An example usage is linked on the issue description. https://github.com/yahoNanJing/arrow-ballista/blob/dev-20230302/ballista/core/src/cache_layer/object_store/file.rs
impl ObjectStoreManager for DefaultObjectStoreManager {
    fn register_store(
        &self,
        scheme: &str,
        host: &str,
        store: Arc<dyn ObjectStore>,
    ) -> Option<Arc<dyn ObjectStore>> {
        let s = format!("{scheme}://{host}");
        self.object_stores.insert(s, store)
    }

    /// Get a suitable store for the provided URL. For example:
    ///
    /// - URL with scheme `file:///` or no schema will return the default LocalFS store
    /// - URL with scheme `s3://bucket/` will return the S3 store if it's registered
    /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store if it's registered
    fn get_by_url(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
        let s = &url[url::Position::BeforeScheme..url::Position::BeforePath];
        self.object_stores
            .get(s)
            .map(|o| o.value().clone())
            .ok_or_else(|| {
                DataFusionError::Internal(format!(
                    "No suitable object store found for {url}"
                ))
            })
    }
}
I'm confused by this. This is the same interface as ObjectStoreRegistry. What is the functional difference that requires wrapping this in another layer of indirection?
The reason for this is to make the cache transparent for the caller. An example usage is here,
yahoNanJing/arrow-ballista@48c703b
In Ballista, we can create an ObjectStoreRegistry with a CacheBasedObjectStoreManager. Then every ObjectStore obtained from the ObjectStoreRegistry will be wrapped with a cache layer.
Why can't whatever is creating the inner |
The reason is to make the cache layer transparent for the caller. To achieve this, when invoking |
Who is the caller in this case? I had understood Ballista to have a config system that handled object store registration, in order to allow it to propagate to the workers correctly?
This is already covered by ObjectStoreProvider, no? You can just have an ObjectStoreProvider that returns a decorated ObjectStore?
When you have already cached the |
Is that not an advantage? For example, if registering an InMemory or LocalFileSystem, I would not want them to be automatically wrapped in a caching decorator. Perhaps I am missing something here, but I had understood the manual registration API being for advanced use-cases where you want fine-grained control of the |
/// - URL with scheme `file:///` or no schema will return the default LocalFS store
/// - URL with scheme `s3://bucket/` will return the S3 store
/// - URL with scheme `hdfs://hostname:port/` will return the hdfs store
fn get_by_url(&self, url: &Url) -> Result<Arc<dyn ObjectStore>>;
I am sorry I am missing something -- how is this different than
?
While ObjectStoreManager focuses on the polymorphism of how to get an ObjectStore by get_by_url.
It seems like the only function of ObjectStoreProvider is a polymorphic get_by_url 🤔
Thanks @alamb for reviewing. Actually, the main concern is that ObjectStoreProvider only covers get_by_url. It does not cover manual registration. Therefore, in this PR, I introduced ObjectStoreManager to cover both. As for whether to combine ObjectStoreManager and ObjectStoreProvider, I have no preference. Here, the main reason for not combining them is backward compatibility with existing usage of ObjectStoreProvider.
Got it -- thank you for the explanation @yahoNanJing , I missed the register_store
I have no preference. Here, the main reason of not combining them is for the backward compatibility of existing usage of ObjectStoreProvider.
I think we can add a new method to ObjectStoreProvider that has a default implementation and maintains backwards compatibility.
For example, what if we added something like this (not tested), which would require no changes to existing ObjectStoreProviders?
/// Registers the specified object store for urls with scheme/host
/// returning the previously registered store if any.
fn register_store(
    &self,
    scheme: &str,
    host: &str,
    store: Arc<dyn ObjectStore>,
) -> Result<Option<Arc<dyn ObjectStore>>> {
    // `NotImplemented` carries a `String`, so the literal must be converted.
    Err(DataFusionError::NotImplemented(
        "register_store is not supported by this provider".to_string(),
    ))
}
If we could extend ObjectStoreProvider that would be my preference as I think it keeps the overall code simpler
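Why a default method preserves backwards compatibility can be shown with a reduced, standard-library-only sketch. The names `Store`, `Provider`, `Mem`, and `LegacyProvider` are hypothetical, and errors are plain `String`s rather than `DataFusionError`; the point is only that an implementation written before `register_store` existed still compiles unchanged.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `ObjectStore`.
trait Store: Send + Sync {}

/// Simplified stand-in for the provider trait under discussion.
trait Provider {
    /// Required method: every existing implementation already has this.
    fn get_by_url(&self, url: &str) -> Result<Arc<dyn Store>, String>;

    /// New method with a default body, so existing implementations keep compiling.
    fn register_store(
        &self,
        _scheme: &str,
        _host: &str,
        _store: Arc<dyn Store>,
    ) -> Result<Option<Arc<dyn Store>>, String> {
        Err("register_store is not supported by this provider".to_string())
    }
}

struct Mem;
impl Store for Mem {}

/// An "existing" provider that only knows about `get_by_url`.
struct LegacyProvider;

impl Provider for LegacyProvider {
    fn get_by_url(&self, _url: &str) -> Result<Arc<dyn Store>, String> {
        Ok(Arc::new(Mem))
    }
}
```

Callers that try to register against `LegacyProvider` get a runtime error instead of a compile error, which is the trade-off of this approach.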
register_store already exists on ObjectStoreRegistry, so this would essentially make ObjectStoreRegistry and ObjectStoreProvider have the same interface, which I think is confusing. As I understand it, the role of ObjectStoreProvider is just to allow lazy construction. Manual registration can already be done through the registry. So it just seems to me like we could serve the same use case by holding an Arc<dyn ObjectStoreRegistry> in whatever is managing the cache layer.
I tried to improve the docs in #5577
So it just seems to me like we could serve the same use case by holding an Arc<dyn ObjectStoreRegistry> in whatever is managing the cache layer.
Thanks @thinkharderdev. Since ObjectStoreRegistry is currently a struct, I introduced another trait, ObjectStoreManager, to achieve this with fewer code changes. However, we can simplify this by changing ObjectStoreRegistry to a trait and providing a default implementation, which needs only a few code changes.
I don't think it's an advantage. How to register an If you don't want to automatically wrapped in a caching decorator, it's doable to create a specific |
It depends. For Ballista client, it can manually register stores. And we should not have the assumption that users don't do the registration before using the auto-configured stores registered by |
Hi @thinkharderdev, @alamb, according to previous discussion, I remove the trait |
Thanks @yahoNanJing -- I think the structure of this PR now looks very nice 👌 I like how the traits have been combined. It would be good I think to put back the docstrings with examples prior to merge.
I think others should have a chance to weigh in prior to merging, but I really like the result of community discussion
Nice! I always thought this was a bit convoluted. I think this makes the API more straightforward.
Like the switch to a trait object, more flexible and significantly easier to follow 👍
Only question concerns the overlap of register_store and put_with_url. The original design intentionally did not allow registering arbitrary URLs to make it clearer what bits of a URL identified an object store.
I don't feel especially strongly about this, but feel we probably shouldn't have overlapping methods, and definitely should document that things like the path should not influence the returned ObjectStore (as this would have very funky semantics)
let store =
    ObjectStoreScheme::from_str(url.scheme()).map(
        |scheme| match scheme {
            ObjectStoreScheme::S3 => build_s3_object_store(url),
            ObjectStoreScheme::GCS => build_gcs_object_store(url),
        },
    )??;
let store = match ObjectStoreScheme::from_str(url.scheme())? {
    ObjectStoreScheme::S3 => build_s3_object_store(url)?,
    ObjectStoreScheme::GCS => build_gcs_object_store(url)?,
};
Or something similar
/// Insert a [`ObjectStore`] with the key of a given url got by [`get_url_key()`]
///
/// If a store with the same url key, it is replaced and returned
fn put_with_url(
Do we need both a put_with_url and register_store?
The motivation for not passing the entire URL was to avoid confusion over what bits of the URL "counted", e.g. should credentials influence this? I don't feel strongly, but feel we probably shouldn't have two methods that do basically the same thing
Thanks @tustvold for your suggestion. put_with_url and register_store use different strategies for registration. Perhaps it's better to just use a key and a value as the parameters for the interface. I'll refine this in the next commit.
@yahoNanJing do you plan to merge this PR and refine it in a follow-on one, or shall we hold off merging this PR until you have made more changes?
What is the plan for this PR?
Hi @alamb, sorry for the delay. I'm going to add a commit to this PR for the review comments today.
Hi @alamb, @thinkharderdev, @tustvold, I just added a commit addressing your comments. Could you take another look?
datafusion-cli/src/object_storage.rs
Outdated
impl ObjectStoreRegistry for DatafusionCliObjectStoreRegistry {
    fn register_store(
        &self,
        key: &str,
What is the use-case for registering stores not based on scheme and host? I mainly ask as it is unclear how this method interacts with get_by_url? Could we just keep the host and scheme signature instead?
Do we need to consider the port? Sometimes it is also needed to identify a URL. I don't know whether it's too strict to register with only host and scheme. In the previous implementation, for a given URL, the key for registration is &url[url::Position::BeforeScheme..url::Position::BeforePath], which may not match your expectation, although I agree with you that it should not include the credential info.
the key for registration is
That's the key for lookup, and so it was harmless for it to contain more, it would just not be found.
port
Including an optional port seems like a sensible addition
If we register with only scheme and host, how can we support registering by URL, which may be invoked in get_with_url for lazy registration with the key &url[url::Position::BeforeScheme..url::Position::BeforePath]?
That would be a bug then, yes. We shouldn't be registering with that, well spotted.
It should instead call register_store with just the relevant parts of the URL
I'm OK with either a URL or a (scheme, host, port) tuple.
Let's go with a URL, it is more flexible, avoids a potentially breaking change even if it was unintentional, and also matches the get method
OK. I'll refine it
Thank you, apologies for the back and forth 😄
Np. We are making things better and better. Thanks for your suggestions, which keep the interface simple and meaningful.
let key =
    &url[url::Position::BeforeScheme..url::Position::BeforePath];
self.object_stores.insert(key.to_owned(), store.clone());
As pointed out in https://github.com/apache/arrow-datafusion/pull/5543/files#r1143108925 this has a bug, in that it registers using the entire URL authority, including things like credentials, etc... This should instead be calling register_store as "normal".
Mostly just some minor nits, looks good to me, thank you
/// If no [`ObjectStore`] found for the `url`, ad-hoc discovery may be executed depending on
/// the `url`. An [`ObjectStore`] may be lazily created and registered.
/// If no [`ObjectStore`] found for the `url`, ad-hoc discovery may be executed depending on
/// the `url` and [`ObjectStoreRegistry`] implementation
@@ -132,18 +115,37 @@ pub trait ObjectStoreProvider: Send + Sync + 'static {
/// buckets using [`ObjectStoreRegistry::register_store`]
///
/// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can create [`ObjectStore`]
/// lazily, on-demand using [`ObjectStoreProvider`]
/// lazily
/// lazily by providing a custom implementation of [`ObjectStoreRegistry`]
let port_info = url
    .port()
    .map(|port| format!(":{port}"))
    .unwrap_or(String::new());
format!(
    "{}://{}{}/",
    &url[url::Position::BeforeScheme..url::Position::AfterScheme],
    &url[url::Position::BeforeHost..url::Position::AfterHost],
    port_info
)
format!(
    "{}://{}",
    url.scheme(),
    &url[url::Position::BeforeHost..url::Position::AfterPort],
)
Note: this also removes the trailing /, which I think isn't correct
My bad. The trailing / can be removed.
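The key format the thread converges on (scheme, host, optional port, no trailing `/`) can be sketched as a small pure function. This is a simplified illustration: `url_key` is a hypothetical helper that takes already-parsed components, whereas the PR derives them from a `url::Url`.

```rust
/// Builds a registry key from already-parsed URL components.
/// Credentials and the path are never passed in, so they cannot influence the key.
fn url_key(scheme: &str, host: &str, port: Option<u16>) -> String {
    match port {
        // Only include the `:port` suffix when a port is explicitly present.
        Some(port) => format!("{scheme}://{host}:{port}"),
        None => format!("{scheme}://{host}"),
    }
}
```

Because the function never sees credentials or a path, two URLs that differ only in those parts map to the same store.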
let url = Url::parse("file://").unwrap();
let key = get_url_key(&url);
object_stores.insert(key, Arc::new(LocalFileSystem::new()));
object_stores.insert("file://".to_string(), Arc::new(LocalFileSystem::new()));
See below comment about removing trailing /
/// Get a suitable store for the provided URL. For example:
///
/// - URL with scheme `file:///` or no schema will return the default LocalFS store
/// - URL with scheme `s3://bucket/` will return the S3 store
/// - URL with scheme `hdfs://hostname:port/` will return the hdfs store
/// Get a suitable store for the provided URL
These are outdated now
}

/// The default [`ObjectStoreRegistry`]
pub struct DefaultObjectStoreRegistry {
///
/// Stores are registered based on the scheme, host and port of the provided URL
/// with a [`LocalFileSystem::new`] automatically registered for `file://`
///
/// For example:
///
/// - `file:///my_path` will return the default LocalFS store
/// - `s3://bucket/path` will return a store registered with `s3://bucket` if any
/// - `hdfs://host:port/path` will return a store registered with `hdfs://host:port` if any
pub struct DefaultObjectStoreRegistry {
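The lookup behaviour described in this docstring can be approximated without DataFusion. The following is a rough, standard-library-only sketch: `Store`, `Named`, `DefaultRegistry`, and `key_of` are hypothetical names, errors are plain `String`s, and `key_of` is a hand-rolled stand-in for real URL parsing that only handles the simple `scheme://[user@]host[:port]/path` shape.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for `ObjectStore`.
trait Store: Send + Sync {
    fn name(&self) -> &'static str;
}

struct Named(&'static str);

impl Store for Named {
    fn name(&self) -> &'static str {
        self.0
    }
}

/// Simplified key extraction: scheme plus host[:port], ignoring path and credentials.
fn key_of(url: &str) -> Option<String> {
    let (scheme, rest) = url.split_once("://")?;
    let authority = rest.split('/').next().unwrap_or("");
    // Drop any `user:password@` prefix so credentials never affect the key.
    let host_port = authority.rsplit('@').next().unwrap_or(authority);
    Some(format!("{scheme}://{host_port}"))
}

struct DefaultRegistry {
    stores: RwLock<HashMap<String, Arc<dyn Store>>>,
}

impl DefaultRegistry {
    /// A local store is pre-registered for `file://`.
    fn new() -> Self {
        let mut stores: HashMap<String, Arc<dyn Store>> = HashMap::new();
        stores.insert("file://".to_string(), Arc::new(Named("local")));
        Self { stores: RwLock::new(stores) }
    }

    /// Registers `store` for the URL's key, returning any previously registered store.
    fn register_store(
        &self,
        url: &str,
        store: Arc<dyn Store>,
    ) -> Result<Option<Arc<dyn Store>>, String> {
        let key = key_of(url).ok_or_else(|| format!("not a URL: {url}"))?;
        Ok(self.stores.write().unwrap().insert(key, store))
    }

    /// Pure lookup: no ad-hoc discovery and no lazy registration.
    fn get_store(&self, url: &str) -> Result<Arc<dyn Store>, String> {
        let key = key_of(url).ok_or_else(|| format!("not a URL: {url}"))?;
        self.stores
            .read()
            .unwrap()
            .get(&key)
            .cloned()
            .ok_or_else(|| format!("No suitable object store found for {url}"))
    }
}
```

Registration and lookup go through the same key function, which is what prevents the path or credentials from selecting a different store.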
datafusion-cli/src/object_storage.rs
Outdated
    self.inner.register_store(url, store)
}

fn get_or_lazy_register_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
fn get_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
The lazy creation is an implementation detail of certain ObjectStoreRegistry implementations, so I'm not sure it needs to be encoded in the name
Agree.
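The naming argument can be illustrated with two registries behind one `get_store` method, one of which happens to create stores lazily. This is a sketch under stated assumptions: `Store`, `S3Like`, `Registry`, `StrictRegistry`, `LazyRegistry`, and `kind_of` are hypothetical, keys are plain strings, and only the `s3://` prefix triggers lazy creation.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `ObjectStore`.
trait Store: Send + Sync {
    fn kind(&self) -> &'static str;
}

struct S3Like;

impl Store for S3Like {
    fn kind(&self) -> &'static str {
        "s3"
    }
}

/// Callers only ever see `get_store`; whether it creates stores is up to the impl.
trait Registry {
    fn get_store(&self, key: &str) -> Result<Arc<dyn Store>, String>;
}

/// Looks up registered stores only.
#[derive(Default)]
struct StrictRegistry {
    stores: Mutex<HashMap<String, Arc<dyn Store>>>,
}

impl Registry for StrictRegistry {
    fn get_store(&self, key: &str) -> Result<Arc<dyn Store>, String> {
        self.stores
            .lock()
            .unwrap()
            .get(key)
            .cloned()
            .ok_or_else(|| format!("no store registered for {key}"))
    }
}

/// Creates and remembers a store on first use for keys it knows how to build.
#[derive(Default)]
struct LazyRegistry {
    stores: Mutex<HashMap<String, Arc<dyn Store>>>,
    created: AtomicUsize,
}

impl Registry for LazyRegistry {
    fn get_store(&self, key: &str) -> Result<Arc<dyn Store>, String> {
        let mut stores = self.stores.lock().unwrap();
        if let Some(store) = stores.get(key) {
            return Ok(Arc::clone(store));
        }
        if !key.starts_with("s3://") {
            return Err(format!("no store registered for {key}"));
        }
        let store: Arc<dyn Store> = Arc::new(S3Like);
        stores.insert(key.to_string(), Arc::clone(&store));
        self.created.fetch_add(1, Ordering::SeqCst);
        Ok(store)
    }
}

/// Caller code is identical for both registries.
fn kind_of(registry: &dyn Registry, key: &str) -> Result<&'static str, String> {
    Ok(registry.get_store(key)?.kind())
}
```

Since `kind_of` cannot tell the two implementations apart, a name such as `get_or_lazy_register_store` would describe only one of them.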
/// The [`DefaultObjectStoreRegistry`] will only depend on the inner object store cache
/// to decide whether it's able to find an [`ObjectStore`] for a url. No ad-hoc discovery
/// and lazy registration will be executed.
These docs aren't rendered by docs.rs, I moved this onto the type definition and reworded it slightly
@@ -43,28 +46,52 @@ impl FromStr for ObjectStoreScheme {
    }
}

#[derive(Debug)]
pub struct DatafusionCliObjectStoreProvider {}
#[derive(Debug, Default)]
/// An [`ObjectStoreRegistry`] that can automatically create S3 and GCS stores for a given URL
#[derive(Debug, Default)]
Thanks for sticking with this @yahoNanJing and @tustvold
Which issue does this PR close?
Closes #5541.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?