Skip to content

Commit

Permalink
Dont consider struct fields for filtering in parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
manoj-inukolunu committed Jan 13, 2024
1 parent d9a1d42 commit 0d8d91a
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 2 deletions.
66 changes: 65 additions & 1 deletion datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ mod tests {
array::{Int64Array, Int8Array, StringArray},
datatypes::{DataType, Field, SchemaBuilder},
};
use arrow_array::Date64Array;
use arrow_array::{Date64Array, GenericStringArray, StructArray};
use chrono::{TimeZone, Utc};
use datafusion_common::{assert_contains, ToDFSchema};
use datafusion_common::{FileType, GetExt, ScalarValue};
Expand All @@ -795,6 +795,9 @@ mod tests {
use object_store::ObjectMeta;
use std::fs::{self, File};
use std::io::Write;
use arrow_array::cast::AsArray;
use arrow_schema::Fields;
use parquet::arrow::ArrowWriter;
use tempfile::TempDir;
use url::Url;

Expand Down Expand Up @@ -2136,4 +2139,65 @@ mod tests {
let execution_props = ExecutionProps::new();
create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
}


#[tokio::test]
async fn test_struct_filter_parquet() -> Result<()> {
let tmp_dir = TempDir::new()?;
let path =tmp_dir.path().to_str().unwrap().to_string()+"/test.parquet";
write_file(&path);
let ctx = SessionContext::new();
let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
ctx.register_listing_table("base_table", path, opt, None, None)
.await
.unwrap();
let sql = "select * from base_table where name='test02'";
let batch = ctx.sql(sql).await.unwrap().collect().await.unwrap();
assert_eq!(batch.len(),1);
let expected = ["+---------------------+----+--------+",
"| struct | id | name |",
"+---------------------+----+--------+",
"| {id: 4, name: aaa2} | 2 | test02 |",
"+---------------------+----+--------+"];
crate::assert_batches_eq!(expected, &batch);
Ok(())
}

fn write_file(file: &String) {
let struct_fields = Fields::from(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, false),
]);
let schema = Schema::new(vec![
Field::new("struct", DataType::Struct(struct_fields.clone()), false),
Field::new("id", DataType::Int64, true),
Field::new("name", DataType::Utf8, false),
]);
let id_array = Int64Array::from(vec![Some(1), Some(2)]);
let columns = vec![
Arc::new(Int64Array::from(vec![3, 4])) as _,
Arc::new(StringArray::from(vec!["aaa1", "aaa2"])) as _,
];
let struct_array = StructArray::new(struct_fields, columns, None);

let name_array = StringArray::from(vec![Some("test01"), Some("test02")]);
let schema = Arc::new(schema);

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(struct_array),
Arc::new(id_array),
Arc::new(name_array),
],
)
.unwrap();
let file = File::create(file).unwrap();
let w_opt = WriterProperties::builder().build();
let mut writer = ArrowWriter::try_new(file, schema, Some(w_opt)).unwrap();
writer.write(&batch).unwrap();
writer.flush().unwrap();
writer.close().unwrap();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ fn find_column_index(
.columns()
.iter()
.enumerate()
.find(|(_idx, c)| c.column_descr().name() == column.name())
.find(|(_idx, c)| c.column_descr().name() == column.name() && c.column_descr().path().parts().len() == 0)
.map(|(idx, _c)| idx);

if col_idx.is_none() {
Expand Down

0 comments on commit 0d8d91a

Please sign in to comment.