diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 3e16b756a01b..d541b87cdb51 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -77,9 +77,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15c4c2c83f81532e5845a733998b6971faca23490340a418e9b72a3ec9de12ea" +checksum = "b84bf0a05bbb2a83e5eb6fa36bb6e87baa08193c35ff52bbf6b38d8af2890e46" [[package]] name = "arrayref" @@ -323,9 +323,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.2" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d495b6dc0184693324491a5ac05f559acc97bf937ab31d7a1c33dd0016be6d2b" +checksum = "bb42b2197bf15ccb092b62c74515dbd8b86d0effd934795f6687c93b6e679a2c" dependencies = [ "bzip2", "flate2", @@ -347,7 +347,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.33", ] [[package]] @@ -674,9 +674,9 @@ dependencies = [ [[package]] name = "base64" -version = "0.21.3" +version = "0.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "414dcefbc63d77c526a76b3afcf6fbb9b5e2791c19c3aa2297733208750c6e53" +checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" [[package]] name = "base64-simd" @@ -766,9 +766,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.13.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" +checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" [[package]] name = "byteorder" @@ -1031,7 +1031,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f34ba9a9bcb8645379e9de8cb3ecfcf4d1c85ba66d90deb3259206fa5aa193b" dependencies = [ "quote", - "syn 2.0.31", + "syn 2.0.33", ] [[package]] @@ -1144,6 +1144,7 @@ name = "datafusion-execution" version = "31.0.0" dependencies = [ "arrow", + "chrono", "dashmap", "datafusion-common", "datafusion-expr", @@ -1502,7 +1503,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.33", ] [[package]] @@ -1937,9 +1938,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.147" +version = "0.2.148" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" [[package]] name = "libm" @@ -1949,9 +1950,9 @@ checksum = "f7012b1bbb0719e1097c47611d3898568c546d597c2e74d66f6087edd5233ff4" [[package]] name = "libmimalloc-sys" -version = "0.1.34" +version = "0.1.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25d058a81af0d1c22d7a1c948576bee6d673f7af3c0f35564abd6c81122f513d" +checksum = "3979b5c37ece694f1f5e51e7ecc871fdb0f517ed04ee45f88d15d6d553cb9664" dependencies = [ "cc", "libc", @@ -1959,9 +1960,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.4.5" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503" +checksum = 
"1a9bad9f94746442c783ca431b22403b519cd7fbeed0533fdd6328b2f2212128" [[package]] name = "lock_api" @@ -2027,9 +2028,9 @@ checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" [[package]] name = "mimalloc" -version = "0.1.38" +version = "0.1.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "972e5f23f6716f62665760b0f4cbf592576a80c7b879ba9beaafc0e558894127" +checksum = "fa01922b5ea280a911e323e4d2fd24b7fe5cc4042e0d2cda3c40775cdc4bdc9c" dependencies = [ "libmimalloc-sys", ] @@ -2388,7 +2389,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.33", ] [[package]] @@ -2478,9 +2479,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.66" +version = "1.0.67" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" +checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328" dependencies = [ "unicode-ident", ] @@ -2701,9 +2702,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.11" +version = "0.38.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0c3dde1fc030af041adc40e79c0e7fbcf431dd24870053d187d7c66e4b87453" +checksum = "d7db8590df6dfcd144d22afd1b83b36c21a18d7cbc1dc4bb5295a8712e9eb662" dependencies = [ "bitflags 2.4.0", "errno", @@ -2759,9 +2760,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.101.4" +version = "0.101.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d93931baf2d282fff8d3a532bbfd7653f734643161b87e3e01e59a04439bf0d" +checksum = "45a27e3b59326c16e23d30aeb7a36a24cc0d29e71d68ff611cdfb4a01d013bed" dependencies = [ "ring", "untrusted", @@ -2888,14 +2889,14 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.33", ] [[package]] name = "serde_json" -version = "1.0.105" +version = "1.0.107" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360" +checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" dependencies = [ "itoa", "ryu", @@ -2986,9 +2987,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.3" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877" +checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" dependencies = [ "libc", "windows-sys", @@ -3077,7 +3078,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.31", + "syn 2.0.33", ] [[package]] @@ -3099,9 +3100,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.31" +version = "2.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "718fa2415bcb8d8bd775917a1bf12a7931b6dfa890753378538118181e0cb398" +checksum = "9caece70c63bfba29ec2fed841a09851b14a235c60010fa4de58089b6c025668" dependencies = [ "proc-macro2", "quote", @@ -3159,7 +3160,7 @@ checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.33", ] [[package]] @@ -3237,7 +3238,7 @@ dependencies = [ 
"num_cpus", "parking_lot", "pin-project-lite", - "socket2 0.5.3", + "socket2 0.5.4", "tokio-macros", "windows-sys", ] @@ -3250,7 +3251,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.33", ] [[package]] @@ -3348,7 +3349,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.33", ] [[package]] @@ -3390,9 +3391,9 @@ checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" [[package]] name = "unicode-ident" -version = "1.0.11" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" [[package]] name = "unicode-normalization" @@ -3520,7 +3521,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.33", "wasm-bindgen-shared", ] @@ -3554,7 +3555,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.33", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index e36252a99566..c84cdde08e3b 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -24,15 +24,12 @@ use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef}; use arrow_schema::Schema; use async_trait::async_trait; -use dashmap::DashMap; use datafusion_common::FileTypeWriterOptions; use datafusion_common::{internal_err, plan_err, project_schema, SchemaExt, ToDFSchema}; use datafusion_expr::expr::Sort; use datafusion_optimizer::utils::conjunction; use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr}; use futures::{future, stream, StreamExt, TryStreamExt}; -use object_store::path::Path; -use object_store::ObjectMeta; use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; use crate::datasource::{ @@ -53,6 +50,8 @@ use crate::{ physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics}, }; use datafusion_common::{FileCompressionType, FileType}; +use datafusion_execution::cache::cache_manager::FileStatisticsCache; +use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache; use super::PartitionedFile; @@ -232,6 +231,7 @@ impl FromStr for ListingTableInsertMode { } } } + /// Options for creating a [`ListingTable`] #[derive(Clone, Debug)] pub struct ListingOptions { @@ -510,39 +510,6 @@ impl ListingOptions { } } -/// Collected statistics for files -/// Cache is invalided when file size or last modification has changed -#[derive(Default)] -struct StatisticsCache { - statistics: DashMap, -} - -impl StatisticsCache { - /// Get `Statistics` for file location. Returns None if file has changed or not found. 
-    fn get(&self, meta: &ObjectMeta) -> Option<Statistics> {
-        self.statistics
-            .get(&meta.location)
-            .map(|s| {
-                let (saved_meta, statistics) = s.value();
-                if saved_meta.size != meta.size
-                    || saved_meta.last_modified != meta.last_modified
-                {
-                    // file has changed
-                    None
-                } else {
-                    Some(statistics.clone())
-                }
-            })
-            .unwrap_or(None)
-    }
-
-    /// Save collected file statistics
-    fn save(&self, meta: ObjectMeta, statistics: Statistics) {
-        self.statistics
-            .insert(meta.location.clone(), (meta, statistics));
-    }
-}
-
 /// Reads data from one or more files via an
 /// [`ObjectStore`](object_store::ObjectStore). For example, from
 /// local files or objects from AWS S3. Implements [`TableProvider`],
@@ -616,7 +583,7 @@ pub struct ListingTable {
 table_schema: SchemaRef,
 options: ListingOptions,
 definition: Option<String>,
-    collected_statistics: StatisticsCache,
+    collected_statistics: FileStatisticsCache,
 infinite_source: bool,
}

@@ -653,13 +620,25 @@ impl ListingTable {
 table_schema: Arc::new(builder.finish()),
 options,
 definition: None,
-            collected_statistics: Default::default(),
+            collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
 infinite_source,
 };

 Ok(table)
 }

+    /// Set the [`FileStatisticsCache`] used to cache parquet file statistics.
+    ///
+    /// Setting a statistics cache on the `SessionContext` can avoid refetching statistics
+    /// multiple times in the same session.
+    ///
+    /// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
+    pub fn with_cache(mut self, cache: Option<FileStatisticsCache>) -> Self {
+        self.collected_statistics =
+            cache.unwrap_or(Arc::new(DefaultFileStatisticsCache::default()));
+        self
+    }
+
 /// Specify the SQL definition for this table, if any
 pub fn with_definition(mut self, definition: Option<String>) -> Self {
 self.definition = definition;
 self
 }
@@ -683,27 +662,26 @@ impl ListingTable {
 for exprs in &self.options.file_sort_order {
 // Construct PhysicalSortExpr objects from Expr objects:
 let sort_exprs = exprs
-            .iter()
-            .map(|expr| {
-                if let Expr::Sort(Sort { expr, asc, nulls_first }) = expr {
-                    if let Expr::Column(col) = expr.as_ref() {
-                        let expr = physical_plan::expressions::col(&col.name, self.table_schema.as_ref())?;
-                        Ok(PhysicalSortExpr {
-                            expr,
-                            options: SortOptions {
-                                descending: !asc,
-                                nulls_first: *nulls_first,
-                            },
-                        })
-                    }
-                    else {
-                        plan_err!("Expected single column references in output_ordering, got {expr}")
+                .iter()
+                .map(|expr| {
+                    if let Expr::Sort(Sort { expr, asc, nulls_first }) = expr {
+                        if let Expr::Column(col) = expr.as_ref() {
+                            let expr = physical_plan::expressions::col(&col.name, self.table_schema.as_ref())?;
+                            Ok(PhysicalSortExpr {
+                                expr,
+                                options: SortOptions {
+                                    descending: !asc,
+                                    nulls_first: *nulls_first,
+                                },
+                            })
+                        } else {
+                            plan_err!("Expected single column references in output_ordering, got {expr}")
+                        }
+                    } else {
+                        plan_err!("Expected Expr::Sort in output_ordering, but got {expr}")
 }
-                } else {
-                    plan_err!("Expected Expr::Sort in output_ordering, but got {expr}")
-                }
-            })
-            .collect::<Result<Vec<_>>>()?;
+                })
+                .collect::<Result<Vec<_>>>()?;
 all_sort_orders.push(sort_exprs);
 }
 Ok(all_sort_orders)
}
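// Illustrative usage sketch (not part of this patch): wiring a session-level
// statistics cache so every `ListingTable` built from that session reuses file
// statistics. `RuntimeEnv::new`, `with_cache_manager` and `with_cache` are the
// APIs introduced/used by this patch; the helper name `session_with_stats_cache`
// is hypothetical.

use std::sync::Arc;
use datafusion::prelude::SessionContext;
use datafusion_execution::cache::cache_manager::CacheManagerConfig;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};

fn session_with_stats_cache() -> datafusion_common::Result<SessionContext> {
    // One shared cache for the whole session; DefaultFileStatisticsCache
    // invalidates entries whose file size or last_modified changed.
    let cache = Arc::new(DefaultFileStatisticsCache::default());
    let cache_config =
        CacheManagerConfig::default().with_files_statistics_cache(Some(cache));
    let runtime = Arc::new(RuntimeEnv::new(
        RuntimeConfig::new().with_cache_manager(cache_config),
    )?);
    // Tables created through this context pick the cache up from
    // `runtime_env().cache_manager`, as wired in listing_table_factory below.
    Ok(SessionContext::with_config_rt(SessionConfig::default(), runtime))
}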
@@ -870,12 +848,11 @@ impl TableProvider for ListingTable {
 .await?;
 let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;
-        let writer_mode;
 //if we are writing a single output_partition to a table backed by a single file
 //we can append to that file. Otherwise, we can write new files into the directory
 //adding new files to the listing table in order to insert to the table.
 let input_partitions = input.output_partitioning().partition_count();
-        match self.options.insert_mode {
+        let writer_mode = match self.options.insert_mode {
 ListingTableInsertMode::AppendToFile => {
 if input_partitions > file_groups.len() {
 return Err(DataFusionError::Plan(format!(
 "Cannot append {input_partitions} partitions to {} files!",
 file_groups.len()
 )));
 }
-                writer_mode =
-                    crate::datasource::file_format::write::FileWriterMode::Append;
+
+                crate::datasource::file_format::write::FileWriterMode::Append
 }
 ListingTableInsertMode::AppendNewFiles => {
-                writer_mode =
-                    crate::datasource::file_format::write::FileWriterMode::PutMultipart
+                crate::datasource::file_format::write::FileWriterMode::PutMultipart
 }
 ListingTableInsertMode::Error => {
 return plan_err!(
 "Invalid plan attempting write to table with TableWriteMode::Error!"
-                )
+                );
 }
-        }
+        };

 let file_format = self.options().format.as_ref();
@@ -960,9 +936,14 @@ impl ListingTable {
 // collect the statistics if required by the config
 let files = file_list.then(|part_file| async {
 let part_file = part_file?;
-            let statistics = if self.options.collect_stat {
-                match self.collected_statistics.get(&part_file.object_meta) {
-                    Some(statistics) => statistics,
+            let mut statistics_result = Statistics::default();
+            if self.options.collect_stat {
+                let statistics_cache = self.collected_statistics.clone();
+                match statistics_cache.get_with_extra(
+                    &part_file.object_meta.location,
+                    &part_file.object_meta,
+                ) {
+                    Some(statistics) => statistics_result = statistics.as_ref().clone(),
 None => {
 let statistics = self
 .options
 .format
 .infer_stats(
 ctx,
 &store,
 self.file_schema.clone(),
 &part_file.object_meta,
 )
 .await?;
-                        self.collected_statistics
-                            .save(part_file.object_meta.clone(), statistics.clone());
-                        statistics
+                        statistics_cache.put_with_extra(
+                            &part_file.object_meta.location,
+                            statistics.clone().into(),
+                            &part_file.object_meta,
+                        );
+                        statistics_result = statistics;
 }
 }
-            } else {
-                Statistics::default()
-            };
-            Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
+            }
+            Ok((part_file, statistics_result)) as Result<(PartitionedFile, Statistics)>
 });
 let (files, statistics) =
@@ -1011,7 +993,6 @@ mod tests {
 };
 use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
-    use chrono::DateTime;
 use datafusion_common::assert_contains;
 use datafusion_common::GetExt;
 use datafusion_expr::LogicalPlanBuilder;
@@ -1155,7 +1136,6 @@ mod tests {
 nulls_first: false,
 },
 }]])
-
 ),
 // ok with two columns, different options
 (
@@ -1179,9 +1159,7 @@ mod tests {
 },
 },
 ]])
-
 ),
-
 ];

 for (file_sort_order, expected_result) in cases {
@@ -1568,41 +1546,6 @@ mod tests {
 Ok(())
 }

-    #[test]
-    fn test_statistics_cache() {
-        let meta = ObjectMeta {
-            location: Path::from("test"),
-            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
-                .unwrap()
-                .into(),
-            size: 1024,
-            e_tag: None,
-        };
-
-        let cache = StatisticsCache::default();
-        assert!(cache.get(&meta).is_none());
-
-        cache.save(meta.clone(), Statistics::default());
-        assert!(cache.get(&meta).is_some());
-
-        // file size changed
-        let mut meta2 = meta.clone();
-        meta2.size = 2048;
-        assert!(cache.get(&meta2).is_none());
-
-        // file last_modified changed
-        let mut meta2 = meta.clone();
-        meta2.last_modified = DateTime::parse_from_rfc3339("2022-09-27T22:40:00+02:00")
-            .unwrap()
-            .into();
-        assert!(cache.get(&meta2).is_none());
-
-        // different file
-        let mut meta2 = meta;
-        meta2.location = Path::from("test2");
-        assert!(cache.get(&meta2).is_none());
-    }
-
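// Condensed sketch of the lookup pattern that replaces the removed cache above
// (not part of this patch): `get_with_extra` validates the cached entry against
// the current `ObjectMeta` (size + last_modified) and `put_with_extra` refreshes
// it on a miss. `stats_for_file` and `infer` are hypothetical names.

use std::sync::Arc;
use datafusion_common::Statistics;
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use object_store::ObjectMeta;

fn stats_for_file(
    cache: &FileStatisticsCache,
    meta: &ObjectMeta,
    infer: impl Fn() -> Statistics,
) -> Statistics {
    match cache.get_with_extra(&meta.location, meta) {
        // Hit: the stored ObjectMeta still matches the file.
        Some(cached) => cached.as_ref().clone(),
        // Miss or stale entry: recompute and overwrite.
        None => {
            let stats = infer();
            cache.put_with_extra(&meta.location, Arc::new(stats.clone()), meta);
            stats
        }
    }
}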
 #[tokio::test]
 async fn test_insert_into_append_to_json_file() -> Result<()> {
 helper_test_insert_into_append_to_existing_files(
@@ -2302,9 +2245,9 @@ mod tests {
 // insert data
 session_ctx.sql("insert into foo values ('foo', 'bar', 1),('foo', 'bar', 2), ('foo', 'bar', 3)")
-        .await?
-        .collect()
-        .await?;
+            .await?
+            .collect()
+            .await?;

 // check count
 let batches = session_ctx
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index b4892119b785..40aeccf233cc 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -229,8 +229,9 @@ impl TableProviderFactory for ListingTableFactory {
 let config = ListingTableConfig::new(table_path)
 .with_listing_options(options)
 .with_schema(resolved_schema);
-        let table =
-            ListingTable::try_new(config)?.with_definition(cmd.definition.clone());
+        let provider = ListingTable::try_new(config)?
+            .with_cache(state.runtime_env().cache_manager.get_file_statistic_cache());
+        let table = provider.with_definition(cmd.definition.clone());
 Ok(Arc::new(table))
 }
}
diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs
new file mode 100644
index 000000000000..90abbe9e2128
--- /dev/null
+++ b/datafusion/core/tests/parquet/file_statistics.rs
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::{
+    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
+use datafusion::datasource::TableProvider;
+use datafusion::execution::context::SessionState;
+use datafusion::prelude::SessionContext;
+use datafusion_execution::cache::cache_manager::CacheManagerConfig;
+use datafusion_execution::cache::cache_unit;
+use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
+use datafusion_execution::config::SessionConfig;
+use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use std::sync::Arc;
+
+#[tokio::test]
+async fn load_table_stats_with_session_level_cache() {
+    let testdata = datafusion::test_util::parquet_test_data();
+    let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
+    let table_path = ListingTableUrl::parse(filename).unwrap();
+
+    let (cache1, state1) = get_cache_runtime_state();
+
+    // Create a separate DefaultFileStatisticsCache
+    let (cache2, state2) = get_cache_runtime_state();
+
+    let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
+
+    let table1 = get_listing_with_cache(&table_path, cache1, &state1, &opt).await;
+    let table2 = get_listing_with_cache(&table_path, cache2, &state2, &opt).await;
+
+    // Session 1 lists the files for the first time
+    assert_eq!(get_cache_size(&state1), 0);
+    let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
+
+    assert_eq!(exec1.statistics().num_rows, Some(8));
+    assert_eq!(exec1.statistics().total_byte_size, Some(671));
+    assert_eq!(get_cache_size(&state1), 1);
+
+    // Session 2 lists the files for the first time;
+    // check that session 1's cached result does not show up in session 2
+    assert_eq!(
+        state2
+            .runtime_env()
+            .cache_manager
+            .get_file_statistic_cache()
+            .unwrap()
+            .len(),
+        0
+    );
+    let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
+    assert_eq!(exec2.statistics().num_rows, Some(8));
+    assert_eq!(exec2.statistics().total_byte_size, Some(671));
+    assert_eq!(get_cache_size(&state2), 1);
+
+    // Session 1 lists the same files a second time;
+    // the statistics now come from the cache
+    assert_eq!(get_cache_size(&state1), 1);
+    let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
+    assert_eq!(exec3.statistics().num_rows, Some(8));
+    assert_eq!(exec3.statistics().total_byte_size, Some(671));
+    // Listing the same file does not grow the cache
+    assert_eq!(get_cache_size(&state1), 1);
+}
+
+async fn get_listing_with_cache(
+    table_path: &ListingTableUrl,
+    cache1: Arc<DefaultFileStatisticsCache>,
+    state1: &SessionState,
+    opt: &ListingOptions,
+) -> ListingTable {
+    let schema = opt.infer_schema(state1, table_path).await.unwrap();
+    let config1 = ListingTableConfig::new(table_path.clone())
+        .with_listing_options(opt.clone())
+        .with_schema(schema);
+    ListingTable::try_new(config1)
+        .unwrap()
+        .with_cache(Some(cache1))
+}
+
+fn get_cache_runtime_state() -> (Arc<DefaultFileStatisticsCache>, SessionState) {
+    let cache_config = CacheManagerConfig::default();
+    let cache1 = Arc::new(cache_unit::DefaultFileStatisticsCache::default());
+    let cache_config = cache_config.with_files_statistics_cache(Some(cache1.clone()));
+    let rt = Arc::new(
+        RuntimeEnv::new(RuntimeConfig::new().with_cache_manager(cache_config)).unwrap(),
+    );
+    let state = SessionContext::with_config_rt(SessionConfig::default(), rt).state();
+
+    (cache1, state)
+}
+
+fn get_cache_size(state1: &SessionState) -> usize {
+    state1
+        .runtime_env()
+        .cache_manager
+        .get_file_statistic_cache()
+        .unwrap()
+        .len()
+}
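// Companion sketch (not part of the patch): the inverse property of the test
// above. Two tables sharing one cache handle inside a single session also share
// entries, so scanning the same file through both leaves one cache entry.
// The test name is hypothetical; the helpers are the ones defined above.

#[tokio::test]
async fn load_table_stats_shared_cache_same_session() {
    let testdata = datafusion::test_util::parquet_test_data();
    let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
    let table_path = ListingTableUrl::parse(filename).unwrap();
    let (cache, state) = get_cache_runtime_state();
    let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));

    let t1 = get_listing_with_cache(&table_path, cache.clone(), &state, &opt).await;
    let t2 = get_listing_with_cache(&table_path, cache, &state, &opt).await;

    t1.scan(&state, None, &[], None).await.unwrap();
    t2.scan(&state, None, &[], None).await.unwrap();
    // Both tables consulted the same cache, so the file was cached once.
    assert_eq!(get_cache_size(&state), 1);
}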
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 6f289e0c064b..29ae81be4d33 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -40,6 +40,7 @@ use std::sync::Arc;
 use tempfile::NamedTempFile;

 mod custom_reader;
+mod file_statistics;
 mod filter_pushdown;
 mod page_pruning;
 mod row_group_pruning;
diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml
index a44a1ff1ef38..cf4eb5ef1f25 100644
--- a/datafusion/execution/Cargo.toml
+++ b/datafusion/execution/Cargo.toml
@@ -18,7 +18,7 @@
 [package]
 name = "datafusion-execution"
 description = "Execution configuration support for DataFusion query engine"
-keywords = [ "arrow", "query", "sql" ]
+keywords = ["arrow", "query", "sql"]
 version = { workspace = true }
 edition = { workspace = true }
 readme = { workspace = true }
@@ -34,6 +34,7 @@ path = "src/lib.rs"

 [dependencies]
 arrow = { workspace = true }
+chrono = { version = "0.4", default-features = false }
 dashmap = "5.4.0"
 datafusion-common = { path = "../common", version = "31.0.0" }
 datafusion-expr = { path = "../expr", version = "31.0.0" }
diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs
new file mode 100644
index 000000000000..987b47bbb8f5
--- /dev/null
+++ b/datafusion/execution/src/cache/cache_manager.rs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::cache::CacheAccessor;
+use datafusion_common::{Result, Statistics};
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+/// The cache of listing files statistics.
+/// If set via [`CacheManagerConfig::with_files_statistics_cache`], it avoids inferring the
+/// same file statistics repeatedly during the session lifetime. This cache is stored in
+/// [`crate::runtime_env::RuntimeEnv`].
+pub type FileStatisticsCache =
+    Arc<dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>>;
+
+impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "Cache name: {} with length: {}", self.name(), self.len())
+    }
+}
+
+#[derive(Default, Debug)]
+pub struct CacheManager {
+    file_statistic_cache: Option<FileStatisticsCache>,
+}
+
+impl CacheManager {
+    pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
+        let mut manager = CacheManager::default();
+        if let Some(cc) = &config.table_files_statistics_cache {
+            manager.file_statistic_cache = Some(cc.clone())
+        }
+        Ok(Arc::new(manager))
+    }
+
+    /// Get the cache of listing files statistics.
+    pub fn get_file_statistic_cache(&self) -> Option<FileStatisticsCache> {
+        self.file_statistic_cache.clone()
+    }
+}
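// Minimal construction sketch (not part of the patch): when no cache is
// configured, `get_file_statistic_cache` returns `None` and `ListingTable`
// falls back to a per-table `DefaultFileStatisticsCache`. The function name
// `build_manager` is hypothetical.
fn build_manager() -> Result<()> {
    use crate::cache::cache_unit::DefaultFileStatisticsCache;

    // No cache configured: the manager hands out `None`.
    let empty = CacheManager::try_new(&CacheManagerConfig::default())?;
    assert!(empty.get_file_statistic_cache().is_none());

    // With a configured cache, the same handle is shared with every caller.
    let config = CacheManagerConfig::default().with_files_statistics_cache(Some(
        Arc::new(DefaultFileStatisticsCache::default()),
    ));
    let manager = CacheManager::try_new(&config)?;
    assert!(manager.get_file_statistic_cache().is_some());
    Ok(())
}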
+
+#[derive(Clone, Default)]
+pub struct CacheManagerConfig {
+    /// Enable caching of file statistics when listing files.
+    /// Avoids fetching the same file statistics repeatedly in the same DataFusion session.
+    /// Disabled by default. For now this only supports Parquet files.
+    pub table_files_statistics_cache: Option<FileStatisticsCache>,
+}
+
+impl CacheManagerConfig {
+    pub fn with_files_statistics_cache(
+        mut self,
+        cache: Option<FileStatisticsCache>,
+    ) -> Self {
+        self.table_files_statistics_cache = cache;
+        self
+    }
+}
diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs
new file mode 100644
index 000000000000..3ef699ac2360
--- /dev/null
+++ b/datafusion/execution/src/cache/cache_unit.rs
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::cache::CacheAccessor;
+use dashmap::DashMap;
+use datafusion_common::Statistics;
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use std::sync::Arc;
+
+/// Collected statistics for files.
+/// The cache is invalidated when the file size or last modification time has changed.
+#[derive(Default)]
+pub struct DefaultFileStatisticsCache {
+    statistics: DashMap<Path, (ObjectMeta, Arc<Statistics>)>,
+}
+
+impl CacheAccessor<Path, Arc<Statistics>> for DefaultFileStatisticsCache {
+    type Extra = ObjectMeta;
+
+    /// Get `Statistics` for file location.
+    fn get(&self, k: &Path) -> Option<Arc<Statistics>> {
+        self.statistics
+            .get(k)
+            .map(|s| Some(s.value().1.clone()))
+            .unwrap_or(None)
+    }
+
+    /// Get `Statistics` for file location. Returns None if file has changed or not found.
+    fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option<Arc<Statistics>> {
+        self.statistics
+            .get(k)
+            .map(|s| {
+                let (saved_meta, statistics) = s.value();
+                if saved_meta.size != e.size
+                    || saved_meta.last_modified != e.last_modified
+                {
+                    // file has changed
+                    None
+                } else {
+                    Some(statistics.clone())
+                }
+            })
+            .unwrap_or(None)
+    }
+
+    /// Save collected file statistics
+    fn put(&self, _key: &Path, _value: Arc<Statistics>) -> Option<Arc<Statistics>> {
+        panic!("Put in DefaultFileStatisticsCache without Extra is not supported.")
+    }
+
+    fn put_with_extra(
+        &self,
+        key: &Path,
+        value: Arc<Statistics>,
+        e: &Self::Extra,
+    ) -> Option<Arc<Statistics>> {
+        self.statistics
+            .insert(key.clone(), (e.clone(), value))
+            .map(|x| x.1)
+    }
+
+    fn remove(&mut self, k: &Path) -> Option<Arc<Statistics>> {
+        self.statistics.remove(k).map(|x| x.1 .1)
+    }
+
+    fn contains_key(&self, k: &Path) -> bool {
+        self.statistics.contains_key(k)
+    }
+
+    fn len(&self) -> usize {
+        self.statistics.len()
+    }
+
+    fn clear(&self) {
+        self.statistics.clear()
+    }
+    fn name(&self) -> String {
+        "DefaultFileStatisticsCache".to_string()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::cache::cache_unit::DefaultFileStatisticsCache;
+    use crate::cache::CacheAccessor;
+    use chrono::DateTime;
+    use datafusion_common::Statistics;
+    use object_store::path::Path;
+    use object_store::ObjectMeta;
+
+    #[test]
+    fn test_statistics_cache() {
+        let meta = ObjectMeta {
+            location: Path::from("test"),
+            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
+                .unwrap()
+                .into(),
+            size: 1024,
+            e_tag: None,
+        };
+
+        let cache = DefaultFileStatisticsCache::default();
+        assert!(cache.get_with_extra(&meta.location, &meta).is_none());
+
+        cache.put_with_extra(&meta.location, Statistics::default().into(), &meta);
+        assert!(cache.get_with_extra(&meta.location, &meta).is_some());
+
+        // file size changed
+        let mut meta2 = meta.clone();
+        meta2.size = 2048;
+        assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());
+
+        // file last_modified changed
+        let mut meta2 = meta.clone();
+        meta2.last_modified = DateTime::parse_from_rfc3339("2022-09-27T22:40:00+02:00")
+            .unwrap()
+            .into();
+        assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());
+
+        // different file
+        let mut meta2 = meta;
+        meta2.location = Path::from("test2");
+        assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());
+    }
+}
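// Caller-side sketch of the invalidation contract exercised by the test above
// (not part of the patch): a changed `ObjectMeta` turns the entry into a miss
// and the caller is expected to re-infer and overwrite. Values are stored as
// `Arc<Statistics>`, so a hit is shared without deep-copying. `refresh_if_stale`
// and `reinfer` are hypothetical names; imports as in cache_unit.rs above.

fn refresh_if_stale(
    cache: &DefaultFileStatisticsCache,
    meta: &ObjectMeta,
    reinfer: impl Fn() -> Statistics,
) {
    // Simulate the file having been rewritten just now.
    let mut newer = meta.clone();
    newer.last_modified = chrono::Utc::now();
    if cache.get_with_extra(&newer.location, &newer).is_none() {
        cache.put_with_extra(&newer.location, Arc::new(reinfer()), &newer);
    }
}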
diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs
new file mode 100644
index 000000000000..da19bff5658a
--- /dev/null
+++ b/datafusion/execution/src/cache/mod.rs
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod cache_manager;
+pub mod cache_unit;
+
+/// The cache accessor; users usually work with this interface while manipulating caches.
+/// This interface does not take `mut` references and thus has to handle its own
+/// locking via internal mutability. It can be accessed by multiple concurrent queries
+/// during planning and execution.
+pub trait CacheAccessor<K, V>: Send + Sync {
+    // Extra info that is not part of the cache key or cache value.
+    type Extra: Clone;
+
+    /// Get value from cache.
+    fn get(&self, k: &K) -> Option<V>;
+    /// Get value from cache.
+    fn get_with_extra(&self, k: &K, e: &Self::Extra) -> Option<V>;
+    /// Put value into cache. Returns the old value associated with the key if there was one.
+    fn put(&self, key: &K, value: V) -> Option<V>;
+    /// Put value into cache. Returns the old value associated with the key if there was one.
+    fn put_with_extra(&self, key: &K, value: V, e: &Self::Extra) -> Option<V>;
+    /// Remove an entry from the cache, returning the value if it existed in the map.
+    fn remove(&mut self, k: &K) -> Option<V>;
+    /// Check if the cache contains a specific key.
+    fn contains_key(&self, k: &K) -> bool;
+    /// Fetch the total number of cache entries.
+    fn len(&self) -> usize;
+    /// Check if the cache collection is empty or not.
+    fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+    /// Remove all entries from the cache.
+    fn clear(&self);
+    /// Return the cache name.
+    fn name(&self) -> String;
+}
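// Extension-point sketch (not part of the patch): any `CacheAccessor` with the
// right K/V types satisfies the `FileStatisticsCache` alias, e.g. a no-op cache
// that opts a session out of statistics reuse. `NoopStatisticsCache` is a
// hypothetical type; only the required methods are shown (`is_empty` has a
// default implementation).

use std::sync::Arc;
use datafusion_common::Statistics;
use object_store::path::Path;
use object_store::ObjectMeta;

#[derive(Default)]
struct NoopStatisticsCache;

impl CacheAccessor<Path, Arc<Statistics>> for NoopStatisticsCache {
    type Extra = ObjectMeta;

    // Every lookup is a miss and every insert is dropped.
    fn get(&self, _k: &Path) -> Option<Arc<Statistics>> {
        None
    }
    fn get_with_extra(&self, _k: &Path, _e: &Self::Extra) -> Option<Arc<Statistics>> {
        None
    }
    fn put(&self, _key: &Path, _value: Arc<Statistics>) -> Option<Arc<Statistics>> {
        None
    }
    fn put_with_extra(
        &self,
        _key: &Path,
        _value: Arc<Statistics>,
        _e: &Self::Extra,
    ) -> Option<Arc<Statistics>> {
        None
    }
    fn remove(&mut self, _k: &Path) -> Option<Arc<Statistics>> {
        None
    }
    fn contains_key(&self, _k: &Path) -> bool {
        false
    }
    fn len(&self) -> usize {
        0
    }
    fn clear(&self) {}
    fn name(&self) -> String {
        "NoopStatisticsCache".to_string()
    }
}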
diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs
index 57d77aa1dde0..a1a1551c2ca6 100644
--- a/datafusion/execution/src/lib.rs
+++ b/datafusion/execution/src/lib.rs
@@ -17,6 +17,7 @@

 //! DataFusion execution configuration and runtime structures

+pub mod cache;
 pub mod config;
 pub mod disk_manager;
 pub mod memory_pool;
diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs
index 8f9c594681d0..e78a9e0de9f0 100644
--- a/datafusion/execution/src/runtime_env.rs
+++ b/datafusion/execution/src/runtime_env.rs
@@ -24,6 +24,7 @@ use crate::{
 object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
};

+use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
 use datafusion_common::{DataFusionError, Result};
 use object_store::ObjectStore;
 use std::fmt::{Debug, Formatter};
@@ -33,19 +34,22 @@ use url::Url;

 #[derive(Clone)]
 /// Execution runtime environment that manages system resources such
-/// as memory, disk and storage.
+/// as memory, disk, cache and storage.
 ///
 /// A [`RuntimeEnv`] is created from a [`RuntimeConfig`] and has the
 /// following resource management functionality:
 ///
 /// * [`MemoryPool`]: Manage memory
 /// * [`DiskManager`]: Manage temporary files on local disk
+/// * [`CacheManager`]: Manage temporary cache data during the session lifetime
 /// * [`ObjectStoreRegistry`]: Manage mapping URLs to object store instances
 pub struct RuntimeEnv {
 /// Runtime memory management
 pub memory_pool: Arc<dyn MemoryPool>,
 /// Manage temporary files during query execution
 pub disk_manager: Arc<DiskManager>,
+    /// Manage temporary cache during query execution
+    pub cache_manager: Arc<CacheManager>,
 /// Object Store Registry
 pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
}
@@ -62,6 +66,7 @@ impl RuntimeEnv {
 let RuntimeConfig {
 memory_pool,
 disk_manager,
+            cache_manager,
 object_store_registry,
 } = config;
@@ -71,6 +76,7 @@ impl RuntimeEnv {
 Ok(Self {
 memory_pool,
 disk_manager: DiskManager::try_new(disk_manager)?,
+            cache_manager: CacheManager::try_new(&cache_manager)?,
 object_store_registry,
 })
 }
@@ -116,6 +122,8 @@ pub struct RuntimeConfig {
 ///
 /// Defaults to using an [`UnboundedMemoryPool`] if `None`
 pub memory_pool: Option<Arc<dyn MemoryPool>>,
+    /// CacheManager to manage cache data
+    pub cache_manager: CacheManagerConfig,
 /// ObjectStoreRegistry to get object store based on url
 pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
}
@@ -132,6 +140,7 @@ impl RuntimeConfig {
 Self {
 disk_manager: Default::default(),
 memory_pool: Default::default(),
+            cache_manager: Default::default(),
 object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()),
 }
 }
@@ -148,6 +157,12 @@ impl RuntimeConfig {
 self
 }

+    /// Customize cache policy
+    pub fn with_cache_manager(mut self, cache_manager: CacheManagerConfig) -> Self {
+        self.cache_manager = cache_manager;
+        self
+    }
+
 /// Customize object store registry
 pub fn with_object_store_registry(
 mut self,
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index e1a107c399a5..edbcddf34d85 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -387,7 +387,12 @@ impl AsLogicalPlan for LogicalPlanNode {
 .with_listing_options(options)
 .with_schema(Arc::new(schema));

-                let provider = ListingTable::try_new(config)?;
+                let provider = ListingTable::try_new(config)?.with_cache(
+                    ctx.state()
+                        .runtime_env()
+                        .cache_manager
+                        .get_file_statistic_cache(),
+                );

 let table_name = from_owned_table_reference(
 scan.table_name.as_ref(),