Skip to content

Commit

Permalink
fixing datafusion tests (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
yjshen authored Sep 25, 2021
1 parent fde82cf commit b585f3b
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 162 deletions.
9 changes: 7 additions & 2 deletions datafusion/src/physical_plan/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
};

use arrow::array::BooleanArray;
use arrow::array::{BooleanArray, Array};
use arrow::compute::filter::filter_record_batch;
use arrow::datatypes::{DataType, SchemaRef};
use arrow::error::Result as ArrowResult;
Expand All @@ -39,6 +39,7 @@ use arrow::record_batch::RecordBatch;
use async_trait::async_trait;

use futures::stream::{Stream, StreamExt};
use arrow::compute::boolean::{and, is_not_null};

/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
/// include in its output batches.
Expand Down Expand Up @@ -184,7 +185,11 @@ fn batch_filter(
.into_arrow_external_error()
})
// apply filter array to record batch
.and_then(|filter_array| filter_record_batch(batch, filter_array))
.and_then(|filter_array| {
let is_not_null = is_not_null(filter_array as &dyn Array);
let and_filter = and(&is_not_null, filter_array)?;
filter_record_batch(batch, &and_filter)
})
})
}

Expand Down
1 change: 0 additions & 1 deletion datafusion/tests/parquet_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,6 @@ async fn make_test_file(scenario: Scenario) -> NamedTempFile {
};

let schema = batches[0].schema();
eprintln!("----------- schema {:?}", schema);

let options = WriteOptions {
compression: Compression::Uncompressed,
Expand Down
Loading

0 comments on commit b585f3b

Please sign in to comment.