Introduce ObjectStoreProvider to create an object store based on the url #2906

Merged · 4 commits · Jul 18, 2022
12 changes: 11 additions & 1 deletion datafusion/core/src/catalog/schema.rs
@@ -162,7 +162,17 @@ mod tests {
#[tokio::test]
async fn test_schema_register_listing_table() {
let testdata = crate::test_util::parquet_test_data();
let filename = format!("file:///{}/{}", testdata, "alltypes_plain.parquet");
let testdir = if testdata.starts_with('/') {
Contributor

I wonder if this logic would be better handled in get_data_dir (to remove any trailing /)? As it stands, I'm surprised that just making this change here is sufficient.

Contributor Author

The detailed parsing logic differs across operating systems. The logic here seems too simplistic and should be refined.

format!("file://{}", testdata)
} else {
format!("file:///{}", testdata)
};
let filename = if testdir.ends_with('/') {
format!("{}{}", testdir, "alltypes_plain.parquet")
} else {
format!("{}/{}", testdir, "alltypes_plain.parquet")
};

let table_path = ListingTableUrl::parse(filename).unwrap();

let catalog = MemoryCatalogProvider::new();
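As a rough illustration of the reviewer's suggestion above, normalizing the test-data path in one place instead of at each call site could look like the sketch below. The helper name and signature are hypothetical, not DataFusion's actual get_data_dir; only the file:// prefixing mirrors the test change in this hunk.

// Hypothetical helper (not DataFusion's get_data_dir): normalize a test-data
// directory so callers can join file names without worrying about a trailing
// '/' or how the file:// scheme is spelled.
fn test_data_url(testdata: &str) -> String {
    // Drop any trailing '/' so "{dir}/{file}" joins cleanly.
    let dir = testdata.trim_end_matches('/');
    // Absolute paths already carry a leading '/', so "file://" + path keeps
    // exactly three slashes; relative paths need "file:///" spelled out.
    if dir.starts_with('/') {
        format!("file://{}", dir)
    } else {
        format!("file:///{}", dir)
    }
}

fn main() {
    assert_eq!(test_data_url("/data/parquet/"), "file:///data/parquet");
    assert_eq!(test_data_url("testing/data"), "file:///testing/data");
    println!("{}/{}", test_data_url("/data/parquet"), "alltypes_plain.parquet");
}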
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/helpers.rs
@@ -162,7 +162,7 @@ pub fn split_files(
pub async fn pruned_partition_list<'a>(
store: &'a dyn ObjectStore,
table_path: &'a ListingTableUrl,
filters: &[Expr],
filters: &'a [Expr],
Contributor

Why this change?

Contributor Author

Without this, compilation fails with a lifetime error.

Contributor

I think it is a Rust compiler bug; the original code should not compile.

Contributor

I also see this error occasionally (but not always) locally

Contributor Author

It does seem to be a Rust compiler bug. From my understanding of the lifetimes, it's necessary for us to add 'a.

file_extension: &'a str,
table_partition_cols: &'a [String],
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
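To make the lifetime discussion above concrete, here is a minimal standalone sketch of an async fn that returns a stream borrowing from its arguments. It assumes the futures and tokio crates; none of the names are DataFusion APIs. Tying every borrowed parameter to the same 'a as the returned BoxStream is what the change to filters: &'a [Expr] does.

use futures::stream::{self, BoxStream, StreamExt};

// Illustrative only: the returned stream borrows both `names` and `suffix`,
// so both borrows are tied to the stream's lifetime 'a.
async fn filtered_names<'a>(
    names: &'a [String],
    suffix: &'a str,
) -> BoxStream<'a, String> {
    stream::iter(names.iter())
        .filter(move |n| futures::future::ready(n.ends_with(suffix)))
        .map(|n| n.clone())
        .boxed()
}

#[tokio::main]
async fn main() {
    let names = vec!["a.parquet".to_string(), "b.csv".to_string()];
    let mut files = filtered_names(&names, ".parquet").await;
    while let Some(f) = files.next().await {
        println!("{}", f);
    }
}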
73 changes: 60 additions & 13 deletions datafusion/core/src/datasource/object_store.rs
@@ -81,10 +81,19 @@ impl std::fmt::Display for ObjectStoreUrl {
}
}

/// Object store provider can detect an object store based on the url
pub trait ObjectStoreProvider: Send + Sync + 'static {
/// Detect a suitable object store based on its url if possible
/// Return the object store if one is found
fn get_by_url(&self, url: &Url) -> Option<Arc<dyn ObjectStore>>;
}

/// Object store registry
#[derive(Clone)]
Contributor

Why is this needed, afaict all locations use Arc<ObjectStoreRegistry>?

pub struct ObjectStoreRegistry {
/// A map from scheme to object store that serve list / read operations for the store
object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
object_stores: Arc<RwLock<HashMap<String, Arc<dyn ObjectStore>>>>,
provider: Option<Arc<dyn ObjectStoreProvider>>,
}

impl std::fmt::Debug for ObjectStoreRegistry {
@@ -105,13 +114,19 @@ impl Default for ObjectStoreRegistry {
}

impl ObjectStoreRegistry {
/// By default the provider is None
pub fn new() -> Self {
ObjectStoreRegistry::new_with_provider(None)
}

/// Create the registry that object stores can be registered into.
/// A [`LocalFileSystem`] store is registered by default to support reading local files natively.
pub fn new() -> Self {
pub fn new_with_provider(provider: Option<Arc<dyn ObjectStoreProvider>>) -> Self {
let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
map.insert("file://".to_string(), Arc::new(LocalFileSystem::new()));
Self {
object_stores: RwLock::new(map),
object_stores: Arc::new(RwLock::new(map)),
provider,
}
}

@@ -132,19 +147,43 @@ impl ObjectStoreRegistry {
///
/// - URL with scheme `file:///` or no scheme will return the default LocalFS store
/// - URL with scheme `s3://bucket/` will return the S3 store if it's registered
/// - URL with scheme `hdfs://hostname:port/` will return the hdfs store if it's registered
///
pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
let url = url.as_ref();
let s = &url[url::Position::BeforeScheme..url::Position::AfterHost];
let stores = self.object_stores.read();
let store = stores.get(s).ok_or_else(|| {
DataFusionError::Internal(format!(
"No suitable object store found for {}",
url
))
})?;

Ok(store.clone())
// First check whether the object store is already registered
let store = {
let stores = self.object_stores.read();
let s = &url[url::Position::BeforeScheme..url::Position::BeforePath];
stores.get(s).cloned()
};

// If not, try to detect one based on its url.
let store = store
Contributor

This currently has a thread race, it needs to:

  • Acquire the write lock
  • Verify that a value hasn't been inserted in the intervening time
  • Call the detector
  • Add the result
  • Drop the write lock

To be honest, this could probably switch to just using a Mutex to keep things simple. There is unlikely to be significant contention to warrant the RWLock

Contributor Author

I don't think so. The RwLock is better than just using a Mutex here: reads will happen frequently, while writes only happen a few times.

Contributor (@tustvold, Jul 18, 2022)

In the uncontended case, which is extremely likely given how short the critical section is, they will perform exactly the same - if anything the Mutex might be marginally faster. It was more an observation that the complexity of using a RWLock is probably not actually yielding any return.

A simple Mutex would allow you to use get_or_insert_with, and avoid what is currently a thread race

.or_else(|| {
if let Some(provider) = &self.provider {
// If detected, register it
if let Some(store) = provider.get_by_url(url) {
let mut stores = self.object_stores.write();
let key =
&url[url::Position::BeforeScheme..url::Position::BeforePath];
stores.insert(key.to_owned(), store.clone());
Some(store)
} else {
None
}
} else {
None
}
})
.ok_or_else(|| {
DataFusionError::Internal(format!(
"No suitable object store found for {}",
url
))
})?;

Ok(store)
}
}
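On the locking question above: the race the reviewer describes can be closed by re-checking the map after taking the write lock and before calling the provider. A rough sketch of that shape follows, with stand-in types for the provider and the stores; it assumes parking_lot's RwLock, which matches the unwrap()-free read()/write() calls in this diff.

use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::RwLock;

// Stand-ins for ObjectStoreProvider / ObjectStore to keep the sketch small.
trait Detector: Send + Sync {
    fn detect(&self, key: &str) -> Option<Arc<String>>;
}

struct Registry {
    stores: RwLock<HashMap<String, Arc<String>>>,
    provider: Option<Arc<dyn Detector>>,
}

impl Registry {
    fn get_or_detect(&self, key: &str) -> Option<Arc<String>> {
        // Fast path: read lock only.
        {
            let stores = self.stores.read();
            if let Some(store) = stores.get(key) {
                return Some(store.clone());
            }
        } // read guard dropped before the write lock is taken
        // Slow path: take the write lock, then re-check so that a store
        // inserted by another thread in the meantime is not clobbered.
        let mut stores = self.stores.write();
        if let Some(store) = stores.get(key) {
            return Some(store.clone());
        }
        let store = self.provider.as_ref()?.detect(key)?;
        stores.insert(key.to_owned(), store.clone());
        Some(store)
    }
}

fn main() {
    struct FileOnly;
    impl Detector for FileOnly {
        fn detect(&self, key: &str) -> Option<Arc<String>> {
            if key.starts_with("file://") {
                Some(Arc::new(key.to_owned()))
            } else {
                None
            }
        }
    }

    let registry = Registry {
        stores: RwLock::new(HashMap::new()),
        provider: Some(Arc::new(FileOnly)),
    };
    assert!(registry.get_or_detect("file://").is_some());
    assert!(registry.get_or_detect("hdfs://localhost:8020").is_none());
}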

@@ -190,6 +229,14 @@ mod tests {
assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo");
}

#[test]
fn test_get_by_url_hdfs() {
let sut = ObjectStoreRegistry::default();
sut.register_store("hdfs", "localhost:8020", Arc::new(LocalFileSystem::new()));
let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
sut.get_by_url(&url).unwrap();
}

#[test]
fn test_get_by_url_s3() {
let sut = ObjectStoreRegistry::default();
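Putting the new pieces together, a hedged usage sketch of the trait follows. Only ObjectStoreProvider, ObjectStoreRegistry::new_with_provider, and get_by_url come from this PR; the provider type, the use of LocalFileSystem as a stand-in backend, and the exact import paths are assumptions made for illustration.

use std::sync::Arc;

use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::object_store::{ObjectStoreProvider, ObjectStoreRegistry};
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
use url::Url;

// Illustrative provider: answer every hdfs:// url with a LocalFileSystem
// store. A real provider would construct an HDFS-backed ObjectStore here.
struct HdfsLikeProvider;

impl ObjectStoreProvider for HdfsLikeProvider {
    fn get_by_url(&self, url: &Url) -> Option<Arc<dyn ObjectStore>> {
        if url.scheme() == "hdfs" {
            Some(Arc::new(LocalFileSystem::new()))
        } else {
            None
        }
    }
}

fn main() -> datafusion::error::Result<()> {
    let registry =
        ObjectStoreRegistry::new_with_provider(Some(Arc::new(HdfsLikeProvider)));
    let url = ListingTableUrl::parse("hdfs://localhost:8020/key")?;
    // The first lookup misses the registry, falls through to the provider,
    // and caches the detected store under its scheme and authority.
    let _store = registry.get_by_url(&url)?;
    Ok(())
}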
11 changes: 4 additions & 7 deletions datafusion/core/src/execution/context.rs
@@ -85,7 +85,7 @@ use crate::config::{
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES,
};
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::execution::runtime_env::RuntimeEnv;
use crate::logical_plan::plan::Explain;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::planner::DefaultPhysicalPlanner;
@@ -180,7 +180,7 @@ impl SessionContext {

/// Creates a new session context using the provided session configuration.
pub fn with_config(config: SessionConfig) -> Self {
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
let runtime = Arc::new(RuntimeEnv::default());
Self::with_config_rt(config, runtime)
}

@@ -1211,10 +1211,7 @@ impl Debug for SessionState {

/// Default session builder using the provided configuration
pub fn default_session_builder(config: SessionConfig) -> SessionState {
SessionState::with_config_rt(
config,
Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()),
)
SessionState::with_config_rt(config, Arc::new(RuntimeEnv::default()))
}

impl SessionState {
@@ -1902,7 +1899,7 @@ mod tests {

#[tokio::test]
async fn custom_query_planner() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
let runtime = Arc::new(RuntimeEnv::default());
let session_state = SessionState::with_config_rt(SessionConfig::new(), runtime)
.with_query_planner(Arc::new(MyQueryPlanner {}));
let ctx = SessionContext::with_state(session_state);
14 changes: 13 additions & 1 deletion datafusion/core/src/execution/runtime_env.rs
@@ -57,12 +57,13 @@ impl RuntimeEnv {
let RuntimeConfig {
memory_manager,
disk_manager,
object_store_registry,
} = config;

Ok(Self {
memory_manager: MemoryManager::new(memory_manager),
disk_manager: DiskManager::try_new(disk_manager)?,
object_store_registry: Arc::new(ObjectStoreRegistry::new()),
object_store_registry: Arc::new(object_store_registry),
})
}

Expand Down Expand Up @@ -121,6 +122,8 @@ pub struct RuntimeConfig {
pub disk_manager: DiskManagerConfig,
/// MemoryManager to limit access to memory
pub memory_manager: MemoryManagerConfig,
/// ObjectStoreRegistry to get an object store based on its url
pub object_store_registry: ObjectStoreRegistry,
Contributor

This should possibly be Arc<ObjectStoreRegistry> for consistency

Contributor Author

Why? The other properties are not wrapped in Arc, and for one env there should be only one runtime_env and one set of related properties.

Contributor (@tustvold, Jul 18, 2022)

Two reasons:

  • We use Arc<ObjectStoreRegistry> elsewhere and so using it here is more consistent
  • It provides a hint that this is shared state, e.g. we use Arc<DiskManager> instead of just DiskManager, Arc<MemoryManager> instead of MemoryManager etc...

Contributor Author

I see. I agree that it's better to use Arc here.


}

impl RuntimeConfig {
@@ -141,6 +144,15 @@ impl RuntimeConfig {
self
}

/// Customize object store registry
pub fn with_object_store_registry(
mut self,
object_store_registry: ObjectStoreRegistry,
) -> Self {
self.object_store_registry = object_store_registry;
self
}

/// Specify the total memory to use while running the DataFusion
/// plan to `max_memory * memory_fraction` in bytes.
///
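Downstream, with_object_store_registry is the knob that lets an engine hand its own registry (and provider) to DataFusion. A hedged sketch of the wiring follows; the builder calls are the ones visible in this diff, while the import paths and the default registry used here are illustrative.

use std::sync::Arc;

use datafusion::datasource::object_store::ObjectStoreRegistry;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() -> datafusion::error::Result<()> {
    // A registry built with new_with_provider(Some(...)) would plug in a
    // custom provider; the default one only knows about file:// urls.
    let registry = ObjectStoreRegistry::new_with_provider(None);

    // RuntimeConfig carries the registry, RuntimeEnv owns it, and the
    // SessionContext is created on top of that environment.
    let config = RuntimeConfig::default().with_object_store_registry(registry);
    let runtime = Arc::new(RuntimeEnv::new(config)?);
    let ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime);

    let _ = ctx; // register tables and run queries as usual from here
    Ok(())
}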
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/planner.rs
@@ -1582,7 +1582,7 @@ mod tests {
use crate::assert_contains;
use crate::execution::context::TaskContext;
use crate::execution::options::CsvReadOptions;
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::execution::runtime_env::RuntimeEnv;
use crate::logical_plan::plan::Extension;
use crate::physical_plan::{
expressions, DisplayFormatType, Partitioning, Statistics,
@@ -1604,7 +1604,7 @@ use std::{any::Any, fmt};
use std::{any::Any, fmt};

fn make_session_state() -> SessionState {
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
let runtime = Arc::new(RuntimeEnv::default());
SessionState::with_config_rt(SessionConfig::new(), runtime)
}

4 changes: 2 additions & 2 deletions datafusion/core/tests/user_defined_plan.rs
@@ -87,7 +87,7 @@ use std::{any::Any, collections::BTreeMap, fmt, sync::Arc};

use async_trait::async_trait;
use datafusion::execution::context::TaskContext;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::logical_plan::plan::{Extension, Sort};
use datafusion::logical_plan::{DFSchemaRef, Limit};
use datafusion::optimizer::optimizer::OptimizerConfig;
@@ -247,7 +247,7 @@ async fn topk_plan() -> Result<()> {

fn make_topk_context() -> SessionContext {
let config = SessionConfig::new().with_target_partitions(48);
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
let runtime = Arc::new(RuntimeEnv::default());
let state = SessionState::with_config_rt(config, runtime)
.with_query_planner(Arc::new(TopKQueryPlanner {}))
.add_optimizer_rule(Arc::new(TopKOptimizerRule {}));