Cache collected file statistics
mateuszkj committed Sep 28, 2022
1 parent 06a4f79 commit 9bee83b
Showing 1 changed file with 88 additions and 5 deletions.
datafusion/core/src/datasource/listing/table.rs
@@ -17,11 +17,15 @@

//! The table implementation.
use ahash::HashMap;
use std::{any::Any, sync::Arc};

use arrow::datatypes::{Field, Schema, SchemaRef};
use async_trait::async_trait;
use futures::{future, stream, StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::ObjectMeta;
use parking_lot::RwLock;

use crate::datasource::{
    file_format::{
@@ -237,6 +241,36 @@ impl ListingOptions {
    }
}

/// Collected statistics for files
/// Cache is invalidated when the file size or last modification time has changed
#[derive(Default)]
struct StatisticsCache {
    statistics: RwLock<HashMap<Path, (ObjectMeta, Statistics)>>,
}

impl StatisticsCache {
    /// Get `Statistics` for a file location. Returns `None` if the file has changed or is not found.
    fn get(&self, meta: &ObjectMeta) -> Option<Statistics> {
        let map = self.statistics.read();
        let (saved_meta, statistics) = map.get(&meta.location)?;

        if saved_meta.size != meta.size || saved_meta.last_modified != meta.last_modified
        {
            // file has changed
            return None;
        }

        Some(statistics.clone())
    }

    /// Save collected file statistics
    fn save(&self, meta: ObjectMeta, statistics: Statistics) {
        self.statistics
            .write()
            .insert(meta.location.clone(), (meta, statistics));
    }
}
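For illustration only (not part of this diff), a minimal sketch of the get-or-compute flow the cache enables; the file metadata below is fabricated and `Statistics::default()` stands in for statistics inferred from a real file:

    use chrono::Utc;

    let cache = StatisticsCache::default();
    let meta = ObjectMeta {
        location: Path::from("data/part-0.parquet"), // hypothetical path
        last_modified: Utc::now(),
        size: 1024,
    };

    // Miss: compute statistics (a placeholder here) and store them
    // together with the metadata used for later invalidation checks.
    let _stats = cache.get(&meta).unwrap_or_else(|| {
        let stats = Statistics::default();
        cache.save(meta.clone(), stats.clone());
        stats
    });

    // Hit: the same, unchanged file is now served from the cache.
    assert!(cache.get(&meta).is_some());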

/// An implementation of `TableProvider` that uses the object store
/// or file system listing capability to get the list of files.
pub struct ListingTable {
@@ -247,6 +281,7 @@ pub struct ListingTable {
    table_schema: SchemaRef,
    options: ListingOptions,
    definition: Option<String>,
    collected_statistics: StatisticsCache,
}

impl ListingTable {
@@ -282,6 +317,7 @@ impl ListingTable {
            table_schema: Arc::new(Schema::new(table_fields)),
            options,
            definition: None,
            collected_statistics: Default::default(),
        };

        Ok(table)
@@ -400,14 +436,26 @@ impl ListingTable {
        let file_list = stream::iter(file_list).flatten();

        // collect the statistics if required by the config
        // TODO: Collect statistics and schema in a single pass
        let files = file_list.then(|part_file| async {
            let part_file = part_file?;
            let statistics = if self.options.collect_stat {
                match self.collected_statistics.get(&part_file.object_meta) {
                    Some(statistics) => statistics,
                    None => {
                        let statistics = self
                            .options
                            .format
                            .infer_stats(
                                &store,
                                self.file_schema.clone(),
                                &part_file.object_meta,
                            )
                            .await?;
                        self.collected_statistics
                            .save(part_file.object_meta.clone(), statistics.clone());
                        statistics
                    }
                }
            } else {
                Statistics::default()
            };
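An observation on the hunk above, not part of the change itself: the cache's read lock is released when `get` returns, so no guard is held across the `infer_stats` `.await`. Two concurrent scans that both miss on the same file may therefore infer its statistics twice; the later `save` simply overwrites the earlier entry with an equivalent value, so the duplicated work is benign.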
@@ -434,6 +482,7 @@ mod tests {
        test::{columns, object_store::register_test_store},
    };
    use arrow::datatypes::DataType;
    use chrono::DateTime;

    use super::*;

@@ -752,4 +801,38 @@ mod tests {

        Ok(())
    }

    #[test]
    fn test_statistics_cache() {
        let meta = ObjectMeta {
            location: Path::from("test"),
            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
                .unwrap()
                .into(),
            size: 1024,
        };

        let cache = StatisticsCache::default();
        assert!(cache.get(&meta).is_none());

        cache.save(meta.clone(), Statistics::default());
        assert!(cache.get(&meta).is_some());

        // file size changed
        let mut meta2 = meta.clone();
        meta2.size = 2048;
        assert!(cache.get(&meta2).is_none());

        // file last_modified changed
        let mut meta2 = meta.clone();
        meta2.last_modified = DateTime::parse_from_rfc3339("2022-09-27T22:40:00+02:00")
            .unwrap()
            .into();
        assert!(cache.get(&meta2).is_none());

        // different file
        let mut meta2 = meta.clone();
        meta2.location = Path::from("test2");
        assert!(cache.get(&meta2).is_none());
    }
}
