From 9bee83b490788787f49b7630563aa51fba46e911 Mon Sep 17 00:00:00 2001 From: Mateusz Kondej Date: Tue, 27 Sep 2022 22:48:53 +0200 Subject: [PATCH] Cache collected file statistics --- .../core/src/datasource/listing/table.rs | 93 ++++++++++++++++++- 1 file changed, 88 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index e77773a822cd..40086a0a3bd5 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -17,11 +17,15 @@ //! The table implementation. +use ahash::HashMap; use std::{any::Any, sync::Arc}; use arrow::datatypes::{Field, Schema, SchemaRef}; use async_trait::async_trait; use futures::{future, stream, StreamExt, TryStreamExt}; +use object_store::path::Path; +use object_store::ObjectMeta; +use parking_lot::RwLock; use crate::datasource::{ file_format::{ @@ -237,6 +241,36 @@ impl ListingOptions { } } +/// Collected statistics for files +/// Cache is invalided when file size or last modification has changed +#[derive(Default)] +struct StatisticsCache { + statistics: RwLock>, +} + +impl StatisticsCache { + /// Get `Statistics` for file location. Returns None if file has changed or not found. + fn get(&self, meta: &ObjectMeta) -> Option { + let map = self.statistics.read(); + let (saved_meta, statistics) = map.get(&meta.location)?; + + if saved_meta.size != meta.size || saved_meta.last_modified != meta.last_modified + { + // file has changed + return None; + } + + Some(statistics.clone()) + } + + /// Save collected file statistics + fn save(&self, meta: ObjectMeta, statistics: Statistics) { + self.statistics + .write() + .insert(meta.location.clone(), (meta, statistics)); + } +} + /// An implementation of `TableProvider` that uses the object store /// or file system listing capability to get the list of files. pub struct ListingTable { @@ -247,6 +281,7 @@ pub struct ListingTable { table_schema: SchemaRef, options: ListingOptions, definition: Option, + collected_statistics: StatisticsCache, } impl ListingTable { @@ -282,6 +317,7 @@ impl ListingTable { table_schema: Arc::new(Schema::new(table_fields)), options, definition: None, + collected_statistics: Default::default(), }; Ok(table) @@ -400,14 +436,26 @@ impl ListingTable { let file_list = stream::iter(file_list).flatten(); // collect the statistics if required by the config - // TODO: Collect statistics and schema in single-pass let files = file_list.then(|part_file| async { let part_file = part_file?; let statistics = if self.options.collect_stat { - self.options - .format - .infer_stats(&store, self.file_schema.clone(), &part_file.object_meta) - .await? + match self.collected_statistics.get(&part_file.object_meta) { + Some(statistics) => statistics, + None => { + let statistics = self + .options + .format + .infer_stats( + &store, + self.file_schema.clone(), + &part_file.object_meta, + ) + .await?; + self.collected_statistics + .save(part_file.object_meta.clone(), statistics.clone()); + statistics + } + } } else { Statistics::default() }; @@ -434,6 +482,7 @@ mod tests { test::{columns, object_store::register_test_store}, }; use arrow::datatypes::DataType; + use chrono::DateTime; use super::*; @@ -752,4 +801,38 @@ mod tests { Ok(()) } + + #[test] + fn test_statistics_cache() { + let meta = ObjectMeta { + location: Path::from("test"), + last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00") + .unwrap() + .into(), + size: 1024, + }; + + let cache = StatisticsCache::default(); + assert!(cache.get(&meta).is_none()); + + cache.save(meta.clone(), Statistics::default()); + assert!(cache.get(&meta).is_some()); + + // file size changed + let mut meta2 = meta.clone(); + meta2.size = 2048; + assert!(cache.get(&meta2).is_none()); + + // file last_modified changed + let mut meta2 = meta.clone(); + meta2.last_modified = DateTime::parse_from_rfc3339("2022-09-27T22:40:00+02:00") + .unwrap() + .into(); + assert!(cache.get(&meta2).is_none()); + + // different file + let mut meta2 = meta.clone(); + meta2.location = Path::from("test2"); + assert!(cache.get(&meta2).is_none()); + } }