diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 84b312520161..b42107384d97 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -782,7 +782,7 @@ mod tests {
         array::{Int64Array, Int8Array, StringArray},
         datatypes::{DataType, Field, SchemaBuilder},
     };
-    use arrow_array::Date64Array;
+    use arrow_array::{Date64Array, GenericStringArray, StructArray};
     use chrono::{TimeZone, Utc};
     use datafusion_common::{assert_contains, ToDFSchema};
     use datafusion_common::{FileType, GetExt, ScalarValue};
@@ -795,6 +795,9 @@ mod tests {
     use object_store::ObjectMeta;
     use std::fs::{self, File};
     use std::io::Write;
+    use arrow_array::cast::AsArray;
+    use arrow_schema::Fields;
+    use parquet::arrow::ArrowWriter;
     use tempfile::TempDir;
     use url::Url;
 
@@ -2136,4 +2139,76 @@ mod tests {
         let execution_props = ExecutionProps::new();
         create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
     }
+
+    /// Regression test: a predicate on a top-level column must not be evaluated
+    /// against the page index of a same-named leaf nested inside a struct column.
+    #[tokio::test]
+    async fn test_struct_filter_parquet() -> Result<()> {
+        let tmp_dir = TempDir::new()?;
+        let path = tmp_dir.path().join("test.parquet");
+        let path = path.to_str().expect("temp dir path is valid UTF-8");
+        write_file(path);
+
+        let ctx = SessionContext::new();
+        let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
+        ctx.register_listing_table("base_table", path, opt, None, None).await?;
+
+        // The file also contains `struct.name`, whose values ("aaa1", "aaa2")
+        // never equal 'test02'; the filter must apply to the top-level `name`.
+        let sql = "select * from base_table where name='test02'";
+        let batch = ctx.sql(sql).await?.collect().await?;
+        assert_eq!(batch.len(), 1);
+        let expected = [
+            "+---------------------+----+--------+",
+            "| struct              | id | name   |",
+            "+---------------------+----+--------+",
+            "| {id: 4, name: aaa2} | 2  | test02 |",
+            "+---------------------+----+--------+",
+        ];
+        crate::assert_batches_eq!(expected, &batch);
+        Ok(())
+    }
+
+    /// Writes a two-row Parquet file to `file` with a `struct` column whose
+    /// children (`id`, `name`) share their names with the top-level `id` and
+    /// `name` columns.
+    fn write_file(file: &str) {
+        let struct_fields = Fields::from(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8, false),
+        ]);
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("struct", DataType::Struct(struct_fields.clone()), false),
+            Field::new("id", DataType::Int64, true),
+            Field::new("name", DataType::Utf8, false),
+        ]));
+
+        let struct_array = StructArray::new(
+            struct_fields,
+            vec![
+                Arc::new(Int64Array::from(vec![3, 4])) as _,
+                Arc::new(StringArray::from(vec!["aaa1", "aaa2"])) as _,
+            ],
+            None,
+        );
+        let id_array = Int64Array::from(vec![Some(1), Some(2)]);
+        let name_array = StringArray::from(vec![Some("test01"), Some("test02")]);
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(struct_array),
+                Arc::new(id_array),
+                Arc::new(name_array),
+            ],
+        )
+        .unwrap();
+
+        let file = File::create(file).unwrap();
+        let w_opt = WriterProperties::builder().build();
+        let mut writer = ArrowWriter::try_new(file, schema, Some(w_opt)).unwrap();
+        writer.write(&batch).unwrap();
+        // `close` flushes any buffered rows before writing the footer.
+        writer.close().unwrap();
+    }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
index a0637f379610..190e733b42c3 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -280,7 +280,13 @@ fn find_column_index(
         .columns()
         .iter()
         .enumerate()
-        .find(|(_idx, c)| c.column_descr().name() == column.name())
+        .find(|(_idx, c)| {
+            // `name()` is only the leaf name, so a nested leaf such as
+            // `struct.id` would also match column `id`. A top-level column's
+            // path has exactly one part (its own name); require that.
+            let descr = c.column_descr();
+            descr.name() == column.name() && descr.path().parts().len() == 1
+        })
         .map(|(idx, _c)| idx);
 
     if col_idx.is_none() {