diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs index 7634328f323e..b886966bf6db 100644 --- a/datafusion/core/src/catalog/schema.rs +++ b/datafusion/core/src/catalog/schema.rs @@ -162,7 +162,17 @@ mod tests { #[tokio::test] async fn test_schema_register_listing_table() { let testdata = crate::test_util::parquet_test_data(); - let filename = format!("file:///{}/{}", testdata, "alltypes_plain.parquet"); + let testdir = if testdata.starts_with('/') { + format!("file://{}", testdata) + } else { + format!("file:///{}", testdata) + }; + let filename = if testdir.ends_with('/') { + format!("{}{}", testdir, "alltypes_plain.parquet") + } else { + format!("{}/{}", testdir, "alltypes_plain.parquet") + }; + let table_path = ListingTableUrl::parse(filename).unwrap(); let catalog = MemoryCatalogProvider::new(); diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 873d005b4baf..b4b93d5fc17e 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -162,7 +162,7 @@ pub fn split_files( pub async fn pruned_partition_list<'a>( store: &'a dyn ObjectStore, table_path: &'a ListingTableUrl, - filters: &[Expr], + filters: &'a [Expr], file_extension: &'a str, table_partition_cols: &'a [String], ) -> Result>> { diff --git a/datafusion/core/src/datasource/object_store.rs b/datafusion/core/src/datasource/object_store.rs index aca5b0ca412e..65f3900091e6 100644 --- a/datafusion/core/src/datasource/object_store.rs +++ b/datafusion/core/src/datasource/object_store.rs @@ -81,10 +81,19 @@ impl std::fmt::Display for ObjectStoreUrl { } } +/// Object store provider can detect an object store based on the url +pub trait ObjectStoreProvider: Send + Sync + 'static { + /// Detect a suitable object store based on its url if possible + /// Return the key and object store + fn get_by_url(&self, url: &Url) -> Option>; +} + /// Object 
store registry +#[derive(Clone)] pub struct ObjectStoreRegistry { /// A map from scheme to object store that serve list / read operations for the store - object_stores: RwLock>>, + object_stores: Arc>>>, + provider: Option>, } impl std::fmt::Debug for ObjectStoreRegistry { @@ -105,13 +114,19 @@ impl Default for ObjectStoreRegistry { } impl ObjectStoreRegistry { + /// By default the provider is None + pub fn new() -> Self { + ObjectStoreRegistry::new_with_provider(None) + } + /// Create the registry that object stores can registered into. /// ['LocalFileSystem'] store is registered in by default to support read local files natively. - pub fn new() -> Self { + pub fn new_with_provider(provider: Option>) -> Self { let mut map: HashMap> = HashMap::new(); map.insert("file://".to_string(), Arc::new(LocalFileSystem::new())); Self { - object_stores: RwLock::new(map), + object_stores: Arc::new(RwLock::new(map)), + provider, } } @@ -132,19 +147,43 @@ impl ObjectStoreRegistry { /// /// - URL with scheme `file:///` or no schema will return the default LocalFS store /// - URL with scheme `s3://bucket/` will return the S3 store if it's registered + /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store if it's registered /// pub fn get_by_url(&self, url: impl AsRef) -> Result> { let url = url.as_ref(); - let s = &url[url::Position::BeforeScheme..url::Position::AfterHost]; - let stores = self.object_stores.read(); - let store = stores.get(s).ok_or_else(|| { - DataFusionError::Internal(format!( - "No suitable object store found for {}", - url - )) - })?; - - Ok(store.clone()) + // First check whether we can get the object store from the registry + let store = { + let stores = self.object_stores.read(); + let s = &url[url::Position::BeforeScheme..url::Position::BeforePath]; + stores.get(s).cloned() + }; + + // If not, then try to detect one based on its url. 
+ let store = store + .or_else(|| { + if let Some(provider) = &self.provider { + // If detected, register it + if let Some(store) = provider.get_by_url(url) { + let mut stores = self.object_stores.write(); + let key = + &url[url::Position::BeforeScheme..url::Position::BeforePath]; + stores.insert(key.to_owned(), store.clone()); + Some(store) + } else { + None + } + } else { + None + } + }) + .ok_or_else(|| { + DataFusionError::Internal(format!( + "No suitable object store found for {}", + url + )) + })?; + + Ok(store) } } @@ -190,6 +229,14 @@ mod tests { assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo"); } + #[test] + fn test_get_by_url_hdfs() { + let sut = ObjectStoreRegistry::default(); + sut.register_store("hdfs", "localhost:8020", Arc::new(LocalFileSystem::new())); + let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap(); + sut.get_by_url(&url).unwrap(); + } + #[test] fn test_get_by_url_s3() { let sut = ObjectStoreRegistry::default(); diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 5cb45be2f065..e186e829abf3 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -85,7 +85,7 @@ use crate::config::{ ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE, OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES, }; -use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use crate::execution::runtime_env::RuntimeEnv; use crate::logical_plan::plan::Explain; use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet}; use crate::physical_plan::planner::DefaultPhysicalPlanner; @@ -180,7 +180,7 @@ impl SessionContext { /// Creates a new session context using the provided session configuration. 
pub fn with_config(config: SessionConfig) -> Self { - let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()); + let runtime = Arc::new(RuntimeEnv::default()); Self::with_config_rt(config, runtime) } @@ -1211,10 +1211,7 @@ impl Debug for SessionState { /// Default session builder using the provided configuration pub fn default_session_builder(config: SessionConfig) -> SessionState { - SessionState::with_config_rt( - config, - Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()), - ) + SessionState::with_config_rt(config, Arc::new(RuntimeEnv::default())) } impl SessionState { @@ -1902,7 +1899,7 @@ mod tests { #[tokio::test] async fn custom_query_planner() -> Result<()> { - let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()); + let runtime = Arc::new(RuntimeEnv::default()); let session_state = SessionState::with_config_rt(SessionConfig::new(), runtime) .with_query_planner(Arc::new(MyQueryPlanner {})); let ctx = SessionContext::with_state(session_state); diff --git a/datafusion/core/src/execution/runtime_env.rs b/datafusion/core/src/execution/runtime_env.rs index d810c882fb09..36159db8e802 100644 --- a/datafusion/core/src/execution/runtime_env.rs +++ b/datafusion/core/src/execution/runtime_env.rs @@ -57,12 +57,13 @@ impl RuntimeEnv { let RuntimeConfig { memory_manager, disk_manager, + object_store_registry, } = config; Ok(Self { memory_manager: MemoryManager::new(memory_manager), disk_manager: DiskManager::try_new(disk_manager)?, - object_store_registry: Arc::new(ObjectStoreRegistry::new()), + object_store_registry: Arc::new(object_store_registry), }) } @@ -121,6 +122,8 @@ pub struct RuntimeConfig { pub disk_manager: DiskManagerConfig, /// MemoryManager to limit access to memory pub memory_manager: MemoryManagerConfig, + /// ObjectStoreRegistry to get object store based on url + pub object_store_registry: ObjectStoreRegistry, } impl RuntimeConfig { @@ -141,6 +144,15 @@ impl RuntimeConfig { self } + /// Customize object 
store registry + pub fn with_object_store_registry( + mut self, + object_store_registry: ObjectStoreRegistry, + ) -> Self { + self.object_store_registry = object_store_registry; + self + } + /// Specify the total memory to use while running the DataFusion /// plan to `max_memory * memory_fraction` in bytes. /// diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 90ff7a448f5d..46e09ff27445 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -1582,7 +1582,7 @@ mod tests { use crate::assert_contains; use crate::execution::context::TaskContext; use crate::execution::options::CsvReadOptions; - use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; + use crate::execution::runtime_env::RuntimeEnv; use crate::logical_plan::plan::Extension; use crate::physical_plan::{ expressions, DisplayFormatType, Partitioning, Statistics, @@ -1604,7 +1604,7 @@ mod tests { use std::{any::Any, fmt}; fn make_session_state() -> SessionState { - let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()); + let runtime = Arc::new(RuntimeEnv::default()); SessionState::with_config_rt(SessionConfig::new(), runtime) } diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs index 7e0a7a600d0d..13ddb1eb8da1 100644 --- a/datafusion/core/tests/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined_plan.rs @@ -87,7 +87,7 @@ use std::{any::Any, collections::BTreeMap, fmt, sync::Arc}; use async_trait::async_trait; use datafusion::execution::context::TaskContext; -use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::logical_plan::plan::{Extension, Sort}; use datafusion::logical_plan::{DFSchemaRef, Limit}; use datafusion::optimizer::optimizer::OptimizerConfig; @@ -247,7 +247,7 @@ async fn topk_plan() -> Result<()> { fn 
make_topk_context() -> SessionContext { let config = SessionConfig::new().with_target_partitions(48); - let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()); + let runtime = Arc::new(RuntimeEnv::default()); let state = SessionState::with_config_rt(config, runtime) .with_query_planner(Arc::new(TopKQueryPlanner {})) .add_optimizer_rule(Arc::new(TopKOptimizerRule {}));