first pass at implementing predicate pushdown, seems to work (#16)
* first pass at implementing predicate pushdown, seems to work

* made predicate pushdowns inexact and removed row filtering logic
maximedion2 authored Apr 29, 2024
1 parent 43a5edb commit 4fd4131
Showing 12 changed files with 568 additions and 158 deletions.
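A note on the second bullet: "inexact" matches DataFusion's own pushdown vocabulary. A provider that only prunes at chunk granularity reports its filters as TableProviderFilterPushDown::Inexact, which keeps a FilterExec above the scan to re-check whatever rows survive, so the reader itself no longer needs per-row mask logic. A minimal sketch of that declaration (the provider type here is hypothetical and only the pushdown hook is shown, not a full TableProvider impl):

    use datafusion_common::Result;
    use datafusion_expr::{Expr, TableProviderFilterPushDown};

    // Hypothetical provider type, for illustration only.
    struct ZarrTableProvider;

    impl ZarrTableProvider {
        // On a real `impl TableProvider` this hook is `supports_filters_pushdown`.
        // Answering Inexact for every filter means: push the predicates down so
        // whole chunks can be skipped, but let DataFusion re-check rows above the
        // scan, since surviving chunks may still hold non-matching values.
        fn supports_filters_pushdown(
            &self,
            filters: &[&Expr],
        ) -> Result<Vec<TableProviderFilterPushDown>> {
            Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
        }
    }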
10 changes: 9 additions & 1 deletion Cargo.toml
@@ -30,9 +30,17 @@ arrow-cast = { version = "50.0.0" }
 arrow-schema = { version = "50.0.0" }
 arrow-data = { version = "50.0.0" }
 datafusion = { version = "36.0", optional = true }
+datafusion-expr = { version = "36.0", optional = true }
+datafusion-common = { version = "36.0", optional = true }
+datafusion-physical-expr = { version = "36.0", optional = true }
 
 [features]
-datafusion = ["dep:datafusion"]
+datafusion = [
+    "dep:datafusion",
+    "dep:datafusion-physical-expr",
+    "dep:datafusion-expr",
+    "dep:datafusion-common",
+]
 all = ["datafusion"]
 
 [dev-dependencies]
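All three new crates are optional, so none of them compile unless the datafusion feature is enabled; inside the crate the integration is presumably gated the usual way (a sketch, with the module name assumed from the file paths below):

    // lib.rs (sketch): the DataFusion integration only builds
    // when compiled with `--features datafusion`.
    #[cfg(feature = "datafusion")]
    pub mod datafusion;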
44 changes: 32 additions & 12 deletions src/async_reader/mod.rs
@@ -238,7 +238,6 @@ pub struct ZarrRecordBatchStream<T: ZarrStream> {
     meta: ZarrStoreMetadata,
     filter: Option<ZarrChunkFilter>,
     state: ZarrStreamState<T>,
-    mask: Option<BooleanArray>,
 
     // an option so that we can "take" the wrapper and bundle it
     // in a future when polling the stream.
@@ -266,7 +265,6 @@ impl<T: ZarrStream> ZarrRecordBatchStream<T> {
             predicate_store_wrapper,
             store_wrapper: Some(ZarrStoreWrapper::new(zarr_store)),
             state: ZarrStreamState::Init,
-            mask: None,
         }
     }
 }
@@ -355,7 +353,6 @@ where
                     .skip_next_chunk();
                 self.state = ZarrStreamState::Init;
             } else {
-                self.mask = Some(mask);
                 let wrapper = self.store_wrapper.take().expect(LOST_STORE_ERR);
                 let fut = wrapper.get_next().boxed();
                 self.state = ZarrStreamState::Reading(fut);
@@ -373,16 +370,13 @@
 
                 let chunk = chunk?;
                 let container = ZarrInMemoryChunkContainer::new(chunk);
-                let mut zarr_reader = ZarrRecordBatchReader::new(
+                let zarr_reader = ZarrRecordBatchReader::new(
                     self.meta.clone(),
                     Some(container),
                     None,
                     None,
                 );
 
-                if self.mask.is_some() {
-                    zarr_reader = zarr_reader.with_row_mask(self.mask.take().unwrap());
-                }
                 self.state = ZarrStreamState::Decoding(zarr_reader);
             } else {
                 // if store returns none, it's the end and it's time to return
@@ -469,7 +463,7 @@ impl<T: for<'a> ZarrReadAsync<'a> + Clone + Unpin + Send + 'static>
 
         let mut predicate_stream: Option<ZarrStoreAsync<T>> = None;
         if let Some(filter) = &self.filter {
-            let predicate_proj = filter.get_all_projections();
+            let predicate_proj = filter.get_all_projections()?;
             predicate_stream = Some(
                 ZarrStoreAsync::new(
                     self.zarr_reader_async.clone(),
@@ -840,11 +834,37 @@ mod zarr_async_reader_tests {
             ("float_data".to_string(), DataType::Float64),
         ]);
 
-        let rec = &records[1];
+        // check the values in a chunk. the predicate pushdown only takes care of
+        // skipping whole chunks, so there is no guarantee that the values in the
+        // record batch fully satisfy the predicate, here we are only checking that
+        // the first chunk that was read is the first one with some values that
+        // satisfy the predicate.
+        let rec = &records[0];
         validate_names_and_types(&target_types, rec);
-        validate_primitive_column::<Float64Type, f64>("lat", rec, &[38.8, 38.9, 39.0]);
-        validate_primitive_column::<Float64Type, f64>("lon", rec, &[-109.7, -109.7, -109.7]);
-        validate_primitive_column::<Float64Type, f64>("float_data", rec, &[1042.0, 1043.0, 1044.0]);
+        validate_primitive_column::<Float64Type, f64>(
+            "lat",
+            rec,
+            &[
+                38.4, 38.5, 38.6, 38.7, 38.4, 38.5, 38.6, 38.7, 38.4, 38.5, 38.6, 38.7, 38.4, 38.5,
+                38.6, 38.7,
+            ],
+        );
+        validate_primitive_column::<Float64Type, f64>(
+            "lon",
+            rec,
+            &[
+                -110.0, -110.0, -110.0, -110.0, -109.9, -109.9, -109.9, -109.9, -109.8, -109.8,
+                -109.8, -109.8, -109.7, -109.7, -109.7, -109.7,
+            ],
+        );
+        validate_primitive_column::<Float64Type, f64>(
+            "float_data",
+            rec,
+            &[
+                1005.0, 1006.0, 1007.0, 1008.0, 1016.0, 1017.0, 1018.0, 1019.0, 1027.0, 1028.0,
+                1029.0, 1030.0, 1038.0, 1039.0, 1040.0, 1041.0,
+            ],
+        );
     }
 
     #[tokio::test]
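Because the pushdown is now inexact, batches coming out of the stream can still contain rows that fail the predicate, and any consumer needing exact results must re-apply the filter itself. A consumer-side sketch using arrow's compute kernels (the column name matches the test above; `batch` is assumed to be one record batch taken from the stream):

    use arrow::array::Float64Array;
    use arrow::compute::{filter_record_batch, kernels::cmp::gt};
    use arrow::error::ArrowError;
    use arrow::record_batch::RecordBatch;

    // Re-apply e.g. `lat > 38.7` exactly on a batch returned by the reader.
    fn apply_exact_filter(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
        let lat = batch
            .column_by_name("lat")
            .expect("lat is part of the projection")
            .as_any()
            .downcast_ref::<Float64Array>()
            .expect("lat is float64")
            .clone();
        // boolean mask, one entry per row, then keep only the matching rows
        let mask = gt(&lat, &Float64Array::new_scalar(38.7))?;
        filter_record_batch(batch, &mask)
    }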
32 changes: 24 additions & 8 deletions src/datafusion/file_opener.rs
@@ -17,22 +17,26 @@
 
 use arrow_schema::ArrowError;
 use datafusion::{datasource::physical_plan::FileOpener, error::DataFusionError};
+use datafusion_physical_expr::PhysicalExpr;
 use futures::{StreamExt, TryStreamExt};
 use std::sync::Arc;
 
 use crate::{
-    async_reader::{ZarrPath, ZarrRecordBatchStreamBuilder},
+    async_reader::{ZarrPath, ZarrReadAsync, ZarrRecordBatchStreamBuilder},
     reader::ZarrProjection,
 };
 
 use super::config::ZarrConfig;
+use super::helpers::build_row_filter;
 
 pub struct ZarrFileOpener {
     config: ZarrConfig,
+    filters: Option<Arc<dyn PhysicalExpr>>,
 }
 
 impl ZarrFileOpener {
-    pub fn new(config: ZarrConfig) -> Self {
-        Self { config }
+    pub fn new(config: ZarrConfig, filters: Option<Arc<dyn PhysicalExpr>>) -> Self {
+        Self { config, filters }
     }
 }
@@ -43,15 +47,27 @@ impl FileOpener for ZarrFileOpener {
     ) -> datafusion::error::Result<datafusion::datasource::physical_plan::FileOpenFuture> {
         let config = self.config.clone();
 
+        let filters_to_pushdown = self.filters.clone();
         Ok(Box::pin(async move {
             let zarr_path = ZarrPath::new(config.object_store, file_meta.object_meta.location);
 
             let rng = file_meta.range.map(|r| (r.start as usize, r.end as usize));
 
             let projection = ZarrProjection::from(config.projection.as_ref());
 
-            let batch_reader = ZarrRecordBatchStreamBuilder::new(zarr_path)
-                .with_projection(projection)
+            let mut batch_reader_builder =
+                ZarrRecordBatchStreamBuilder::new(zarr_path.clone()).with_projection(projection);
+            if let Some(filters) = filters_to_pushdown {
+                let schema = zarr_path
+                    .get_zarr_metadata()
+                    .await
+                    .map_err(|e| DataFusionError::External(Box::new(e)))?
+                    .arrow_schema()
+                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
+                let filters = build_row_filter(&filters, &schema)?;
+                if let Some(filters) = filters {
+                    batch_reader_builder = batch_reader_builder.with_filter(filters);
+                }
+            }
+            let batch_reader = batch_reader_builder
                 .build_partial_reader(rng)
                 .await
                 .map_err(|e| DataFusionError::External(Box::new(e)))?;
@@ -81,7 +97,7 @@ mod tests {
         let test_data = get_test_v2_data_path("lat_lon_example.zarr".to_string());
 
         let config = ZarrConfig::new(Arc::new(local_fs));
-        let opener = ZarrFileOpener::new(config);
+        let opener = ZarrFileOpener::new(config, None);
 
         let file_meta = FileMeta {
             object_meta: ObjectMeta {
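build_row_filter lives in super::helpers and its body is not part of the diff shown here, but whatever its exact shape, the decision it enables boils down to evaluating the pushed-down PhysicalExpr against a chunk's columns and keeping the chunk only if some row might match. A rough, hypothetical sketch of that check (not the crate's actual code):

    use std::sync::Arc;
    use arrow::array::BooleanArray;
    use arrow::record_batch::RecordBatch;
    use datafusion_common::{DataFusionError, Result};
    use datafusion_physical_expr::PhysicalExpr;

    // Hypothetical helper: `true` means "decode this chunk, some row may satisfy
    // the predicate"; `false` means the whole chunk can be skipped. Since the
    // pushdown is inexact, a kept chunk may still contain non-matching rows.
    fn chunk_may_match(expr: &Arc<dyn PhysicalExpr>, chunk: &RecordBatch) -> Result<bool> {
        let mask = expr.evaluate(chunk)?.into_array(chunk.num_rows())?;
        let mask = mask
            .as_any()
            .downcast_ref::<BooleanArray>()
            .ok_or_else(|| DataFusionError::Internal("predicate must evaluate to booleans".into()))?;
        Ok(mask.true_count() > 0)
    }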