diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 8d64f85fb2..1a01d0c6ec 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -1764,8 +1764,11 @@ impl From for DeltaColumn { #[cfg(test)] mod tests { - use arrow_array::StructArray; - use arrow_schema::Schema; + use crate::operations::create::CreateBuilder; + use crate::operations::write::SchemaMode; + use crate::writer::test_utils::get_delta_schema; + use arrow::array::StructArray; + use arrow::datatypes::{Field, Schema}; use chrono::{TimeZone, Utc}; use datafusion::assert_batches_sorted_eq; use datafusion::datasource::physical_plan::ParquetExec; @@ -1774,13 +1777,12 @@ mod tests { use datafusion_expr::lit; use datafusion_proto::physical_plan::AsExecutionPlan; use datafusion_proto::protobuf; + use delta_kernel::schema::StructField; use object_store::path::Path; use serde_json::json; use std::ops::Deref; use super::*; - use crate::operations::write::SchemaMode; - use crate::writer::test_utils::get_delta_schema; // test deserialization of serialized partition values. 
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
@@ -2566,4 +2568,68 @@ mod tests {
             Ok(true)
         }
     }
+
+    /// Regression test: a filtered, ordered scan over a table that received two
+    /// separate appends must plan and execute without error, both through the
+    /// in-memory table handle and through a freshly re-opened one.
+    ///
+    /// Both appended batches hold the single value `"s"`, so the predicate
+    /// `a > 's'` matches nothing and each query must yield zero rows.
+    #[tokio::test]
+    async fn parent_distribution_requirements_bug() {
+        /// Runs the filter + sort query against `table_name` and checks that it
+        /// succeeds and returns no rows.
+        async fn assert_query_returns_no_rows(ctx: &SessionContext, table_name: &str) {
+            let sql = format!("SELECT * FROM `{table_name}` WHERE `a` > 's' ORDER BY `a` ASC");
+            let batches = ctx.sql(&sql).await.unwrap().collect().await.unwrap();
+            let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
+            assert_eq!(row_count, 0, "unexpected rows returned from `{table_name}`");
+        }
+
+        // Use a per-process directory rather than a fixed `/tmp/table`, so reruns
+        // and concurrent test processes do not collide on an existing table.
+        let table_dir = std::env::temp_dir().join(format!(
+            "delta_parent_distribution_requirements_{}",
+            std::process::id()
+        ));
+        // Best effort: clear leftovers from an earlier process that reused this pid.
+        let _ = std::fs::remove_dir_all(&table_dir);
+        let path = table_dir.to_str().expect("temp dir path is valid UTF-8");
+
+        let arr: arrow::array::ArrayRef = Arc::new(arrow::array::StringArray::from(vec!["s"]));
+        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
+
+        let mut table = CreateBuilder::new()
+            .with_location(path)
+            .with_columns([StructField {
+                name: "a".to_string(),
+                data_type: delta_kernel::schema::DataType::STRING,
+                nullable: false,
+                metadata: HashMap::new(),
+            }])
+            .await
+            .unwrap();
+
+        // Append the same batch twice, as two separate write operations.
+        for _ in 0..2 {
+            table = crate::DeltaOps(table)
+                .write(vec![batch.clone()])
+                .with_save_mode(crate::protocol::SaveMode::Append)
+                .await
+                .unwrap();
+        }
+
+        let ctx = SessionContext::new_with_config(SessionConfig::default());
+
+        ctx.register_table("table", Arc::new(table)).unwrap();
+        assert_query_returns_no_rows(&ctx, "table").await;
+
+        let re_opened_table = open_table(path).await.unwrap();
+        ctx.register_table("re_opened_table", Arc::new(re_opened_table))
+            .unwrap();
+        assert_query_returns_no_rows(&ctx, "re_opened_table").await;
+
+        // Best-effort cleanup; a failure here must not fail the test.
+        let _ = std::fs::remove_dir_all(&table_dir);
+    }
 }