Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] Introduce cacheManager in session ctx and make StatisticsCache share in session #7570

Merged
merged 15 commits into from
Sep 18, 2023
291 changes: 171 additions & 120 deletions datafusion/core/src/datasource/listing/table.rs

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,14 @@ impl TableProviderFactory for ListingTableFactory {
let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
.with_schema(resolved_schema);
let table =
ListingTable::try_new(config)?.with_definition(cmd.definition.clone());
let provider;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you implement the with_cache API, this can look like

let provider = ListingTable::try_new(config)?
  .with_cache(state.runtime_env().cache_manager.get_file_statistic_cache())

Copy link
Member Author

@Ted-Jiang Ted-Jiang Sep 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More clear way ! 👍

if let Some(cache) = state.runtime_env().cache_manager.get_file_statistic_cache()
{
provider = ListingTable::try_new_with_cache(config, cache)?;
} else {
provider = ListingTable::try_new(config)?;
}
let table = provider.with_definition(cmd.definition.clone());
Ok(Arc::new(table))
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ parking_lot = "0.12"
rand = "0.8"
tempfile = "3"
url = "2.2"
chrono = { version = "0.4", default-features = false }
58 changes: 58 additions & 0 deletions datafusion/execution/src/cache/cache_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::cache::CacheAccessor;
use datafusion_common::{Result, Statistics};
use object_store::path::Path;
use object_store::ObjectMeta;
use std::sync::Arc;

pub type FileStaticCache = Arc<dyn CacheAccessor<Path, Statistics, Extra = ObjectMeta>>;
Ted-Jiang marked this conversation as resolved.
Show resolved Hide resolved
Ted-Jiang marked this conversation as resolved.
Show resolved Hide resolved

#[derive(Default)]
pub struct CacheManager {
file_statistic_cache: Option<FileStaticCache>,
}

impl CacheManager {
pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
let mut manager = CacheManager::default();
if let Some(cc) = &config.table_files_statistics_cache {
manager.file_statistic_cache = Some(cc.clone())
}
Ok(Arc::new(manager))
}

pub fn get_file_statistic_cache(&self) -> Option<FileStaticCache> {
Ted-Jiang marked this conversation as resolved.
Show resolved Hide resolved
self.file_statistic_cache.clone()
}
}

#[derive(Clone, Default)]
pub struct CacheManagerConfig {
/// Enable cache of files statistics when listing files.
/// Avoid get same file statistics repeatedly in same datafusion session.
/// Default is disable. Fow now only supports Parquet files.
pub table_files_statistics_cache: Option<FileStaticCache>,
}

impl CacheManagerConfig {
pub fn enable_table_files_statistics_cache(mut self, cache: FileStaticCache) -> Self {
Ted-Jiang marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we possibly have the field names match -- here it is called table_files_statistics_cache but on the CacheManager it is called file_statistics_cache -- I think they should be the same in both places (I like file_statistics_cache best as it matches the type name)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ops, this make sense

self.table_files_statistics_cache = Some(cache);
self
}
}
132 changes: 132 additions & 0 deletions datafusion/execution/src/cache/cache_unit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::cache::CacheAccessor;
use dashmap::DashMap;
use datafusion_common::Statistics;
use object_store::path::Path;
use object_store::ObjectMeta;

/// Collected statistics for files
/// Cache is invalided when file size or last modification has changed
#[derive(Default)]
pub struct FileStatisticsCache {
statistics: DashMap<Path, (ObjectMeta, Statistics)>,
}

impl CacheAccessor<Path, Statistics> for FileStatisticsCache {
type Extra = ObjectMeta;

/// Get `Statistics` for file location.
fn get(&self, k: &Path) -> Option<Statistics> {
self.statistics
.get(k)
.map(|s| Some(s.value().1.clone()))
.unwrap_or(None)
}

/// Get `Statistics` for file location. Returns None if file has changed or not found.
fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option<Statistics> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as written this is going to copy the statistics (though I realize that is what this PR did previously) -- maybe we could use something like Arc<Statistics> to store the statistics.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes , thanks for point this out

self.statistics
.get(k)
.map(|s| {
let (saved_meta, statistics) = s.value();
if saved_meta.size != e.size
|| saved_meta.last_modified != e.last_modified
{
// file has changed
None
} else {
Some(statistics.clone())
}
})
.unwrap_or(None)
}

/// Save collected file statistics
fn put(&self, _key: &Path, _value: Statistics) -> Option<Statistics> {
panic!("Put cache in FileStatisticsCache without Extra not supported.")
}

fn put_with_extra(
&self,
key: &Path,
value: Statistics,
e: &Self::Extra,
) -> Option<Statistics> {
self.statistics
.insert(key.clone(), (e.clone(), value))
.map(|x| x.1)
}

fn evict(&self, k: &Path) -> bool {
self.statistics.remove(k).is_some()
}

fn contains_key(&self, k: &Path) -> bool {
self.statistics.contains_key(k)
}

fn len(&self) -> usize {
self.statistics.len()
}
}

#[cfg(test)]
mod tests {
use crate::cache::cache_unit::FileStatisticsCache;
use crate::cache::CacheAccessor;
use chrono::DateTime;
use datafusion_common::Statistics;
use object_store::path::Path;
use object_store::ObjectMeta;
Ted-Jiang marked this conversation as resolved.
Show resolved Hide resolved

#[test]
fn test_statistics_cache() {
let meta = ObjectMeta {
location: Path::from("test"),
last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
.unwrap()
.into(),
size: 1024,
e_tag: None,
};

let cache = FileStatisticsCache::default();
assert!(cache.get_with_extra(&meta.location, &meta).is_none());

cache.put_with_extra(&meta.location, Statistics::default(), &meta);
assert!(cache.get_with_extra(&meta.location, &meta).is_some());

// file size changed
let mut meta2 = meta.clone();
meta2.size = 2048;
assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());

// file last_modified changed
let mut meta2 = meta.clone();
meta2.last_modified = DateTime::parse_from_rfc3339("2022-09-27T22:40:00+02:00")
.unwrap()
.into();
assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());

// different file
let mut meta2 = meta;
meta2.location = Path::from("test2");
assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());
}
}
44 changes: 44 additions & 0 deletions datafusion/execution/src/cache/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

pub mod cache_manager;
pub mod cache_unit;

// The cache accessor, users usually working on this interface while manipulating caches
Ted-Jiang marked this conversation as resolved.
Show resolved Hide resolved
pub trait CacheAccessor<K, V>: Send + Sync {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another classic API to add here would be "clear()" to clear all the values

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice abstract/trait

// Extra info but not part of the cache key or cache value.
type Extra: Clone;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain what the usecase for Extra is? Specifically I wonder why such information could not be added as a field to the Value

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like in default FileStatisticsCache, get func need check last_modified from ObjectMeta which not impl Hash so can not be part of the key, we need put this info in Extra.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see -- that makes sense -- it might help to document the rationale in statistics


/// Get value from cache.
fn get(&self, k: &K) -> Option<V>;
/// Get value from cache.
fn get_with_extra(&self, k: &K, e: &Self::Extra) -> Option<V>;
/// Put value into cache. Returns the old value associated with the key if there was one.
fn put(&self, key: &K, value: V) -> Option<V>;
/// Put value into cache. Returns the old value associated with the key if there was one.
fn put_with_extra(&self, key: &K, value: V, e: &Self::Extra) -> Option<V>;
/// Remove an entry from the cache, returning `true` if they existed in the cache.
fn evict(&self, k: &K) -> bool;
Ted-Jiang marked this conversation as resolved.
Show resolved Hide resolved
/// Check if the cache contains a specific key.
fn contains_key(&self, k: &K) -> bool;
/// Fetch the total number of cache entries.
fn len(&self) -> usize;
/// Check if the Cache collection is empty or not.
fn is_empty(&self) -> bool {
self.len() == 0
}
}
1 change: 1 addition & 0 deletions datafusion/execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! DataFusion execution configuration and runtime structures

pub mod cache;
pub mod config;
pub mod disk_manager;
pub mod memory_pool;
Expand Down
14 changes: 14 additions & 0 deletions datafusion/execution/src/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
};

use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
use datafusion_common::{DataFusionError, Result};
use object_store::ObjectStore;
use std::fmt::{Debug, Formatter};
Expand All @@ -46,6 +47,8 @@ pub struct RuntimeEnv {
pub memory_pool: Arc<dyn MemoryPool>,
/// Manage temporary files during query execution
pub disk_manager: Arc<DiskManager>,
/// Manage temporary cache during query execution
pub cache_manager: Arc<CacheManager>,
/// Object Store Registry
pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
}
Expand All @@ -62,6 +65,7 @@ impl RuntimeEnv {
let RuntimeConfig {
memory_pool,
disk_manager,
cache_manager,
object_store_registry,
} = config;

Expand All @@ -71,6 +75,7 @@ impl RuntimeEnv {
Ok(Self {
memory_pool,
disk_manager: DiskManager::try_new(disk_manager)?,
cache_manager: CacheManager::try_new(&cache_manager)?,
object_store_registry,
})
}
Expand Down Expand Up @@ -116,6 +121,8 @@ pub struct RuntimeConfig {
///
/// Defaults to using an [`UnboundedMemoryPool`] if `None`
pub memory_pool: Option<Arc<dyn MemoryPool>>,
/// CacheManager to manage cache data
pub cache_manager: CacheManagerConfig,
/// ObjectStoreRegistry to get object store based on url
pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
}
Expand All @@ -132,6 +139,7 @@ impl RuntimeConfig {
Self {
disk_manager: Default::default(),
memory_pool: Default::default(),
cache_manager: Default::default(),
object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()),
}
}
Expand All @@ -148,6 +156,12 @@ impl RuntimeConfig {
self
}

/// Customize cache policy
pub fn with_cache_manager(mut self, cache_manager: CacheManagerConfig) -> Self {
self.cache_manager = cache_manager;
self
}

/// Customize object store registry
pub fn with_object_store_registry(
mut self,
Expand Down
12 changes: 11 additions & 1 deletion datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,17 @@ impl AsLogicalPlan for LogicalPlanNode {
.with_listing_options(options)
.with_schema(Arc::new(schema));

let provider = ListingTable::try_new(config)?;
let provider;
if let Some(cache) = ctx
.state()
.runtime_env()
.cache_manager
.get_file_statistic_cache()
{
provider = ListingTable::try_new_with_cache(config, cache)?;
} else {
provider = ListingTable::try_new(config)?;
}

let table_name = from_owned_table_reference(
scan.table_name.as_ref(),
Expand Down