[feat] Introduce cacheManager in session ctx and make StatisticsCache share in session #7570
@@ -1092,6 +1092,95 @@ mod tests {
        Ok(())
    }

    #[tokio::test]
    async fn load_table_stats_with_session_level_cache() -> Result<()> {
        let testdata = crate::test_util::parquet_test_data();
Added a test to check that the cache is shared at the session level.
Nice -- thank you. Since this is an end-to-end test, I recommend moving it to somewhere in core_integration: datafusion/core/tests/core_integration.rs perhaps.
Thank you @Ted-Jiang -- this is looking really nice. I left various cleanup comments, but I think this PR is well commented and structured 🏆
// The cache accessor, users usually working on this interface while manipulating caches
pub trait CacheAccessor<K, V>: Send + Sync {
    // Extra info but not part of the cache key or cache value.
    type Extra: Clone;
Can you explain what the use case for Extra is? Specifically, I wonder why such information could not be added as a field to the Value.
For example, in the default FileStatisticsCache, the get function needs to check last_modified from ObjectMeta. ObjectMeta does not implement Hash, so it cannot be part of the key, and we need to put this information in Extra.
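A minimal sketch of this use of Extra, using only the standard library. `FileMeta` and `Stats` are hypothetical stand-ins for `ObjectMeta` and `Statistics`, the trait is simplified to two methods, and `StatsCache` is an illustrative name; none of this is the code in the PR.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical stand-in for `ObjectMeta`. It does not implement `Hash`,
/// so it cannot be used as (part of) the cache key.
#[derive(Clone, Debug, PartialEq)]
pub struct FileMeta {
    pub size: u64,
    pub last_modified: u64, // simplified to seconds since the epoch
}

/// Hypothetical stand-in for `Statistics`.
#[derive(Clone, Debug, PartialEq)]
pub struct Stats {
    pub num_rows: usize,
}

/// Simplified accessor trait: `Extra` carries validation data that is
/// neither the key nor the value handed back to the caller.
pub trait CacheAccessor<K, V>: Send + Sync {
    type Extra: Clone;
    fn get_with_extra(&self, k: &K, e: &Self::Extra) -> Option<V>;
    fn put_with_extra(&self, k: &K, v: V, e: &Self::Extra) -> Option<V>;
}

#[derive(Default)]
pub struct StatsCache {
    // The metadata seen at insertion time is stored next to the value.
    entries: Mutex<HashMap<String, (FileMeta, Stats)>>,
}

impl CacheAccessor<String, Stats> for StatsCache {
    type Extra = FileMeta;

    /// Returns the cached value only if the file metadata is unchanged.
    fn get_with_extra(&self, k: &String, e: &FileMeta) -> Option<Stats> {
        let entries = self.entries.lock().unwrap();
        entries.get(k).and_then(|(meta, stats)| {
            if meta == e {
                Some(stats.clone())
            } else {
                None // the file changed, so treat the entry as stale
            }
        })
    }

    /// Stores the value together with the metadata; returns the old value.
    fn put_with_extra(&self, k: &String, v: Stats, e: &FileMeta) -> Option<Stats> {
        let mut entries = self.entries.lock().unwrap();
        entries.insert(k.clone(), (e.clone(), v)).map(|(_, old)| old)
    }
}
```

In this sketch a lookup with a different `last_modified` returns `None`, so a caller would recompute the statistics and store them again with the new metadata.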
I see -- that makes sense -- it might help to document the rationale in statistics
    }

    /// Get `Statistics` for file location. Returns None if file has changed or not found.
    fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option<Statistics> {
As written, this is going to copy the statistics (though I realize that is what this PR did previously) -- maybe we could use something like Arc<Statistics> to store the statistics.
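A rough sketch of the Arc idea, assuming a hypothetical `Stats` struct in place of `Statistics` and an illustrative `SharedStatsCache` type. The point it shows is that a cache hit clones a pointer rather than the statistics themselves.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `Statistics`; imagine many per-column entries.
#[derive(Debug, PartialEq)]
pub struct Stats {
    pub num_rows: usize,
    pub column_null_counts: Vec<usize>,
}

#[derive(Default)]
pub struct SharedStatsCache {
    entries: Mutex<HashMap<String, Arc<Stats>>>,
}

impl SharedStatsCache {
    /// Stores shared statistics; returns the previous entry, if any.
    pub fn put(&self, path: &str, stats: Arc<Stats>) -> Option<Arc<Stats>> {
        self.entries.lock().unwrap().insert(path.to_string(), stats)
    }

    /// Cloning the `Arc` bumps a reference count; the `Stats` is not copied.
    pub fn get(&self, path: &str) -> Option<Arc<Stats>> {
        self.entries.lock().unwrap().get(path).cloned()
    }
}
```

Every caller that hits the cache then holds a handle to the same allocation, which `Arc::ptr_eq` can confirm.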
Yes, thanks for pointing this out.
}

impl CacheManagerConfig {
    pub fn enable_table_files_statistics_cache(mut self, cache: FileStaticCache) -> Self {
Can we possibly have the field names match -- here it is called table_files_statistics_cache but on the CacheManager it is called file_statistics_cache -- I think they should be the same in both places (I like file_statistics_cache best as it matches the type name).
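One way the matching names could look, as a sketch only. The type alias, the `with_file_statistics_cache` builder, and the `new` and accessor methods below are assumptions for illustration and are not taken from the PR.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical cache type: file path -> cached row count.
pub type FileStatisticsCache = Arc<Mutex<HashMap<String, usize>>>;

#[derive(Default, Clone)]
pub struct CacheManagerConfig {
    /// Same name as the field on `CacheManager` and as the type.
    pub file_statistics_cache: Option<FileStatisticsCache>,
}

impl CacheManagerConfig {
    /// Hypothetical builder method named after the field it sets.
    pub fn with_file_statistics_cache(mut self, cache: Option<FileStatisticsCache>) -> Self {
        self.file_statistics_cache = cache;
        self
    }
}

pub struct CacheManager {
    file_statistics_cache: Option<FileStatisticsCache>,
}

impl CacheManager {
    pub fn new(config: &CacheManagerConfig) -> Self {
        CacheManager {
            file_statistics_cache: config.file_statistics_cache.clone(),
        }
    }

    /// Returns a handle to the configured cache, if one was set.
    pub fn file_statistics_cache(&self) -> Option<FileStatisticsCache> {
        self.file_statistics_cache.clone()
    }
}
```

With one name used for the config field, the manager field, and the accessor, a reader can follow the cache from configuration to use without translating between names.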
Oops, this makes sense.
pub mod cache_unit;

// The cache accessor, users usually working on this interface while manipulating caches
pub trait CacheAccessor<K, V>: Send + Sync {
Another classic API to add here would be "clear()" to clear all the values
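A small sketch of what `clear()` could look like on such a trait. `ClearableCache` and `MapCache` are hypothetical names, and the trait is reduced to three methods for illustration.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Simplified, hypothetical accessor trait that includes `clear()`.
pub trait ClearableCache<K, V>: Send + Sync {
    fn put(&self, k: K, v: V) -> Option<V>;
    fn len(&self) -> usize;
    /// Remove every entry from the cache.
    fn clear(&self);
}

#[derive(Default)]
pub struct MapCache {
    entries: RwLock<HashMap<String, u64>>,
}

impl ClearableCache<String, u64> for MapCache {
    fn put(&self, k: String, v: u64) -> Option<u64> {
        self.entries.write().unwrap().insert(k, v)
    }

    fn len(&self) -> usize {
        self.entries.read().unwrap().len()
    }

    fn clear(&self) {
        // Takes `&self`, relying on the lock for interior mutability.
        self.entries.write().unwrap().clear();
    }
}
```

Because `clear` takes `&self`, it can be called on a cache shared behind an `Arc`, which fits a cache that lives at the session level.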
Agreed, we need this.
@@ -229,8 +229,14 @@ impl TableProviderFactory for ListingTableFactory {
        let config = ListingTableConfig::new(table_path)
            .with_listing_options(options)
            .with_schema(resolved_schema);
        let table =
            ListingTable::try_new(config)?.with_definition(cmd.definition.clone());
        let provider;
If you implement the with_cache API, this can look like:
let provider = ListingTable::try_new(config)?
    .with_cache(state.runtime_env().cache_manager.get_file_statistic_cache())
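A self-contained sketch of how such a `with_cache` builder might behave. `Table`, `FileStatsCache`, and this `CacheManager` are hypothetical stand-ins; the `Option` parameter and the fallback to a private cache are assumptions, not the signature from the PR.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical statistics cache: file path -> cached row count.
#[derive(Default)]
pub struct FileStatsCache {
    rows_by_path: Mutex<HashMap<String, usize>>,
}

/// Hypothetical stand-in for the session's cache manager.
pub struct CacheManager {
    file_statistics_cache: Option<Arc<FileStatsCache>>,
}

impl CacheManager {
    pub fn new(cache: Option<Arc<FileStatsCache>>) -> Self {
        CacheManager { file_statistics_cache: cache }
    }

    pub fn get_file_statistic_cache(&self) -> Option<Arc<FileStatsCache>> {
        self.file_statistics_cache.clone()
    }
}

/// Hypothetical stand-in for `ListingTable`.
pub struct Table {
    path: String,
    cache: Arc<FileStatsCache>,
}

impl Table {
    /// Starts with a private cache so the table works without a session cache.
    pub fn new(path: &str) -> Self {
        Table {
            path: path.to_string(),
            cache: Arc::new(FileStatsCache::default()),
        }
    }

    /// Swaps in the session cache if one is given; `None` keeps the private cache.
    pub fn with_cache(mut self, cache: Option<Arc<FileStatsCache>>) -> Self {
        if let Some(shared) = cache {
            self.cache = shared;
        }
        self
    }

    pub fn record_rows(&self, rows: usize) {
        self.cache.rows_by_path.lock().unwrap().insert(self.path.clone(), rows);
    }

    pub fn cached_rows(&self) -> Option<usize> {
        self.cache.rows_by_path.lock().unwrap().get(&self.path).copied()
    }
}
```

Two tables built from the same manager then see each other's entries, while a table built with `with_cache(None)` stays isolated. Taking an `Option` lets the call site stay a single chained expression instead of a branch around `let provider;`.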
That is a clearer way! 👍
Co-authored-by: Andrew Lamb <[email protected]>
@alamb PTAL
Thanks @Ted-Jiang -- I think this looks great -- I had a suggestion to improve the comments but I think we can do that as a follow on PR as well. Nice work!
@@ -40,6 +40,7 @@ use std::sync::Arc;
use tempfile::NamedTempFile;

mod custom_reader;
mod file_statistics;
👍
cc @Dandandan / @thinkharderdev @liukun4515 and @mateuszkj who appears to have added this feature originally in 85c11c1 / #3649
/// locking via internal mutability. It can be accessed via multiple concurrent queries
/// during planning and execution.

pub trait CacheAccessor<K, V>: Send + Sync {
Nice abstraction/trait.
LGTM for the file statistics trait API.
Co-authored-by: Andrew Lamb <[email protected]>
@alamb @liukun4515 Thanks for the review.
Which issue does this PR close?
Closes #7556.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?