From bf28e9bed2d872abcfb2f54092f342e3d950bf41 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Fri, 10 Jan 2025 05:58:29 +0800
Subject: [PATCH 1/4] Re-export TypeSignatureClass from the datafusion-expr package (#14051)

---
 datafusion/expr/src/lib.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs
index c5016a18d443..a57fd80c48e1 100644
--- a/datafusion/expr/src/lib.rs
+++ b/datafusion/expr/src/lib.rs
@@ -71,7 +71,8 @@ pub use datafusion_expr_common::columnar_value::ColumnarValue;
 pub use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
 pub use datafusion_expr_common::operator::Operator;
 pub use datafusion_expr_common::signature::{
-    ArrayFunctionSignature, Signature, TypeSignature, Volatility, TIMEZONE_WILDCARD,
+    ArrayFunctionSignature, Signature, TypeSignature, TypeSignatureClass, Volatility,
+    TIMEZONE_WILDCARD,
 };
 pub use datafusion_expr_common::type_coercion::binary;
 pub use expr::{

From 300c2afb076caa118c734f0d998595958f360ac3 Mon Sep 17 00:00:00 2001
From: Jonah Gao
Date: Fri, 10 Jan 2025 16:33:20 +0800
Subject: [PATCH 2/4] Fix clippy for Rust 1.84 (#14065)

---
 datafusion/common/src/pyarrow.rs                     | 3 +++
 datafusion/core/src/datasource/listing/table.rs      | 4 ++--
 datafusion/core/src/execution/context/mod.rs         | 9 +++------
 datafusion/physical-expr-common/src/physical_expr.rs | 4 +---
 datafusion/physical-expr-common/src/sort_expr.rs     | 6 +++---
 5 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/datafusion/common/src/pyarrow.rs b/datafusion/common/src/pyarrow.rs
index bdcf831c7884..29869c8da561 100644
--- a/datafusion/common/src/pyarrow.rs
+++ b/datafusion/common/src/pyarrow.rs
@@ -138,6 +138,9 @@ mod tests {
     fn test_py_scalar() {
         init_python();
 
+        // TODO: remove this attribute when bumping pyo3 to v0.23.0
+        // See:
+        #[allow(unexpected_cfgs)]
         Python::with_gil(|py| {
             let scalar_float = ScalarValue::Float64(Some(12.34));
             let py_float = scalar_float.into_py(py).call_method0(py, "as_py").unwrap();
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 7470597ef72c..06b94f804268 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1148,8 +1148,8 @@ impl ListingTable {
     /// This method first checks if the statistics for the given file are already cached.
     /// If they are, it returns the cached statistics.
     /// If they are not, it infers the statistics from the file and stores them in the cache.
-    async fn do_collect_statistics<'a>(
-        &'a self,
+    async fn do_collect_statistics(
+        &self,
         ctx: &SessionState,
         store: &Arc<dyn ObjectStore>,
         part_file: &PartitionedFile,
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index f20475df150b..e5da49ad7b8b 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -1047,7 +1047,7 @@ impl SessionContext {
         Ok(table)
     }
 
-    async fn find_and_deregister<'a>(
+    async fn find_and_deregister(
         &self,
         table_ref: impl Into<TableReference>,
         table_type: TableType,
@@ -1481,10 +1481,7 @@ impl SessionContext {
     /// provided reference.
     ///
     /// [`register_table`]: SessionContext::register_table
-    pub async fn table<'a>(
-        &self,
-        table_ref: impl Into<TableReference>,
-    ) -> Result<DataFrame> {
+    pub async fn table(&self, table_ref: impl Into<TableReference>) -> Result<DataFrame> {
         let table_ref: TableReference = table_ref.into();
         let provider = self.table_provider(table_ref.clone()).await?;
         let plan = LogicalPlanBuilder::scan(
@@ -1511,7 +1508,7 @@ impl SessionContext {
     }
 
     /// Return a [`TableProvider`] for the specified table.
-    pub async fn table_provider<'a>(
+    pub async fn table_provider(
         &self,
         table_ref: impl Into<TableReference>,
     ) -> Result<Arc<dyn TableProvider>> {
diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs
index c2e892d63da0..e90f9c32ee87 100644
--- a/datafusion/physical-expr-common/src/physical_expr.rs
+++ b/datafusion/physical-expr-common/src/physical_expr.rs
@@ -159,9 +159,7 @@ pub trait DynEq {
 
 impl<T: Eq + Any> DynEq for T {
     fn dyn_eq(&self, other: &dyn Any) -> bool {
-        other
-            .downcast_ref::<Self>()
-            .map_or(false, |other| other == self)
+        other.downcast_ref::<Self>() == Some(self)
     }
 }
 
diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs
index 0d7501610662..8395d3e5263d 100644
--- a/datafusion/physical-expr-common/src/sort_expr.rs
+++ b/datafusion/physical-expr-common/src/sort_expr.rs
@@ -291,9 +291,9 @@ impl PhysicalSortRequirement {
     /// Returns whether this requirement is equal or more specific than `other`.
     pub fn compatible(&self, other: &PhysicalSortRequirement) -> bool {
         self.expr.eq(&other.expr)
-            && other.options.map_or(true, |other_opts| {
-                self.options.map_or(false, |opts| opts == other_opts)
-            })
+            && other
+                .options
+                .map_or(true, |other_opts| self.options == Some(other_opts))
     }
 
     #[deprecated(since = "43.0.0", note = "use LexRequirement::from_lex_ordering")]

From 268df42a600e85ab80c6934d4ffb54062649dac2 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Fri, 10 Jan 2025 17:14:16 +0800
Subject: [PATCH 3/4] fix: incorrect error message of function_length_check (#14056)

* minor fix

* add ut

* remove check for 0 arg
---
 .../expr/src/type_coercion/functions.rs | 33 ++++++++++++++-----
 1 file changed, 25 insertions(+), 8 deletions(-)

diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs
index 96bb5c4b2d8f..5294cc526d38 100644
--- a/datafusion/expr/src/type_coercion/functions.rs
+++ b/datafusion/expr/src/type_coercion/functions.rs
@@ -438,18 +438,11 @@ fn get_valid_types(
 }
 
 fn function_length_check(length: usize, expected_length: usize) -> Result<()> {
-    if length < 1 {
-        return plan_err!(
-            "The signature expected at least one argument but received {expected_length}"
-        );
-    }
-
     if length != expected_length {
         return plan_err!(
-            "The signature expected {length} arguments but received {expected_length}"
+            "The signature expected {expected_length} arguments but received {length}"
         );
     }
-
     Ok(())
 }
 
@@ -939,6 +932,7 @@ mod tests {
 
     use super::*;
     use arrow::datatypes::Field;
+    use datafusion_common::assert_contains;
 
     #[test]
     fn test_string_conversion() {
@@ -1027,6 +1021,29 @@
         Ok(())
     }
 
+    #[test]
+    fn test_get_valid_types_length_check() -> Result<()> {
+        let signature = TypeSignature::Numeric(1);
+
+        let err = get_valid_types(&signature, &[]).unwrap_err();
+        assert_contains!(
+            err.to_string(),
+            "The signature expected 1 arguments but received 0"
+        );
+
+        let err = get_valid_types(
+            &signature,
+            &[DataType::Int32, DataType::Int32, DataType::Int32],
+        )
+        .unwrap_err();
+        assert_contains!(
+            err.to_string(),
+            "The signature 
expected 1 arguments but received 3" + ); + + Ok(()) + } + #[test] fn test_fixed_list_wildcard_coerce() -> Result<()> { let inner = Arc::new(Field::new_list_field(DataType::Int32, false)); From 295ffb41f562f2c58b22ce36928df3f85f7fe09a Mon Sep 17 00:00:00 2001 From: Aleksey Kirilishin <54231417+avkirilishin@users.noreply.github.com> Date: Fri, 10 Jan 2025 13:19:23 +0300 Subject: [PATCH 4/4] test: Add plan execution during tests for bounded source (#14013) --- .../replace_with_order_preserving_variants.rs | 251 +++++++++++------- 1 file changed, 151 insertions(+), 100 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 96b2454fa330..9f5afc7abc2e 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -285,9 +285,7 @@ pub(crate) fn replace_with_order_preserving_variants( mod tests { use super::*; - use crate::datasource::file_format::file_compression_type::FileCompressionType; - use crate::datasource::listing::PartitionedFile; - use crate::datasource::physical_plan::{CsvExec, FileScanConfig}; + use crate::execution::TaskContext; use crate::physical_optimizer::test_utils::check_integrity; use crate::physical_plan::coalesce_batches::CoalesceBatchesExec; use crate::physical_plan::filter::FilterExec; @@ -296,18 +294,24 @@ mod tests { use crate::physical_plan::{ displayable, get_plan_string, ExecutionPlan, Partitioning, }; - use crate::prelude::SessionConfig; + use crate::prelude::{SessionConfig, SessionContext}; use crate::test::TestStreamPartition; + use arrow::array::{ArrayRef, Int32Array}; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use arrow::record_batch::RecordBatch; use datafusion_common::tree_node::{TransformedResult, TreeNode}; use datafusion_common::Result; - use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::{JoinType, Operator}; use datafusion_physical_expr::expressions::{self, col, Column}; use datafusion_physical_expr::PhysicalSortExpr; + use datafusion_physical_plan::collect; + use datafusion_physical_plan::memory::MemoryExec; use datafusion_physical_plan::streaming::StreamingTableExec; + use object_store::memory::InMemory; + use object_store::ObjectStore; + use url::Url; use rstest::rstest; @@ -328,20 +332,24 @@ mod tests { /// * `$PLAN`: The plan to optimize. /// * `$SOURCE_UNBOUNDED`: Whether the given plan contains an unbounded source. macro_rules! 
assert_optimized_in_all_boundedness_situations { - ($EXPECTED_UNBOUNDED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PLAN_LINES: expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $SOURCE_UNBOUNDED: expr) => { + ($EXPECTED_UNBOUNDED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PLAN_LINES: expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $SOURCE_UNBOUNDED: expr, $PREFER_EXISTING_SORT: expr) => { if $SOURCE_UNBOUNDED { assert_optimized_prefer_sort_on_off!( $EXPECTED_UNBOUNDED_PLAN_LINES, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES, - $PLAN + $PLAN, + $PREFER_EXISTING_SORT, + $SOURCE_UNBOUNDED ); } else { assert_optimized_prefer_sort_on_off!( $EXPECTED_BOUNDED_PLAN_LINES, $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES, $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES, - $PLAN + $PLAN, + $PREFER_EXISTING_SORT, + $SOURCE_UNBOUNDED ); } }; @@ -359,19 +367,24 @@ mod tests { /// the flag `prefer_existing_sort` is `true`. /// * `$PLAN`: The plan to optimize. macro_rules! assert_optimized_prefer_sort_on_off { - ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr) => { - assert_optimized!( - $EXPECTED_PLAN_LINES, - $EXPECTED_OPTIMIZED_PLAN_LINES, - $PLAN.clone(), - false - ); - assert_optimized!( - $EXPECTED_PLAN_LINES, - $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES, - $PLAN, - true - ); + ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $PREFER_EXISTING_SORT: expr, $SOURCE_UNBOUNDED: expr) => { + if $PREFER_EXISTING_SORT { + assert_optimized!( + $EXPECTED_PLAN_LINES, + $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES, + $PLAN, + $PREFER_EXISTING_SORT, + $SOURCE_UNBOUNDED + ); + } else { + assert_optimized!( + $EXPECTED_PLAN_LINES, + $EXPECTED_OPTIMIZED_PLAN_LINES, + $PLAN, + $PREFER_EXISTING_SORT, + $SOURCE_UNBOUNDED + ); + } }; } @@ -385,7 +398,7 @@ mod tests { /// * `$PLAN`: The plan to optimize. /// * `$PREFER_EXISTING_SORT`: Value of the `prefer_existing_sort` flag. macro_rules! 
assert_optimized { - ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $PREFER_EXISTING_SORT: expr) => { + ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $PREFER_EXISTING_SORT: expr, $SOURCE_UNBOUNDED: expr) => { let physical_plan = $PLAN; let formatted = displayable(physical_plan.as_ref()).indent(true).to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); @@ -412,6 +425,19 @@ mod tests { expected_optimized_lines, actual, "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n" ); + + if !$SOURCE_UNBOUNDED { + let ctx = SessionContext::new(); + let object_store = InMemory::new(); + object_store.put(&object_store::path::Path::from("file_path"), bytes::Bytes::from("").into()).await?; + ctx.register_object_store(&Url::parse("test://").unwrap(), Arc::new(object_store)); + let task_ctx = Arc::new(TaskContext::from(&ctx)); + let res = collect(optimized_physical_plan, task_ctx).await; + assert!( + res.is_ok(), + "Some errors occurred while executing the optimized physical plan: {:?}", res.unwrap_err() + ); + } }; } @@ -420,13 +446,14 @@ mod tests { // Searches for a simple sort and a repartition just after it, the second repartition with 1 input partition should not be affected async fn test_replace_multiple_input_repartition_1( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition = repartition_exec_hash(repartition_exec_round_robin(source)); let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true); @@ -447,7 +474,7 @@ mod tests { " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -464,13 +491,13 @@ mod tests { " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -479,7 +506,8 @@ mod tests { expected_optimized_bounded, 
expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -488,13 +516,14 @@ mod tests { #[tokio::test] async fn test_with_inter_children_change_only( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr_default("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -538,7 +567,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC", ]; // Expected unbounded result (same for with and without flag) @@ -564,7 +593,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC]", @@ -574,7 +603,7 @@ mod tests { " SortPreservingMergeExec: [a@0 ASC]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -583,7 +612,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -592,13 +622,14 @@ mod tests { #[tokio::test] async fn test_replace_multiple_input_repartition_2( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let filter = filter_exec(repartition_rr); @@ -623,7 +654,7 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " FilterExec: c@1 > 3", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -642,14 +673,14 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " 
FilterExec: c@1 > 3", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " FilterExec: c@1 > 3", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -658,7 +689,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -667,13 +699,14 @@ mod tests { #[tokio::test] async fn test_replace_multiple_input_repartition_with_extra_steps( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -701,7 +734,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -722,7 +755,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", @@ -730,7 +763,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -739,7 +772,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -748,13 +782,14 @@ mod tests { #[tokio::test] async fn 
test_replace_multiple_input_repartition_with_extra_steps_2( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr); @@ -786,7 +821,7 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -809,7 +844,7 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", @@ -818,7 +853,7 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -827,7 +862,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -836,13 +872,14 @@ mod tests { #[tokio::test] async fn test_not_replacing_when_no_need_to_preserve_sorting( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -867,7 +904,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -887,7 +924,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; @@ -898,7 +935,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -907,13 +945,14 @@ mod tests { #[tokio::test] async fn test_with_multiple_replacable_repartitions( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -944,7 +983,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -967,7 +1006,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", @@ -976,7 +1015,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -985,7 +1024,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -994,13 +1034,14 @@ mod tests { #[tokio::test] async fn test_not_replace_with_different_orderings( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -1028,7 +1069,7 @@ mod tests { " SortExec: 
expr=[c@1 ASC], preserve_partitioning=[true]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -1046,7 +1087,7 @@ mod tests { " SortExec: expr=[c@1 ASC], preserve_partitioning=[true]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; @@ -1057,7 +1098,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -1066,13 +1108,14 @@ mod tests { #[tokio::test] async fn test_with_lost_ordering( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -1093,7 +1136,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -1110,13 +1153,13 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -1125,7 +1168,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -1134,13 +1178,14 @@ mod tests { #[tokio::test] async fn test_with_lost_and_kept_ordering( 
#[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -1184,7 +1229,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -1211,7 +1256,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [c@1 ASC]", @@ -1222,7 +1267,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -1231,7 +1276,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -1240,6 +1286,7 @@ mod tests { #[tokio::test] async fn test_with_multiple_child_trees( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; @@ -1247,7 +1294,7 @@ mod tests { let left_source = if source_unbounded { stream_exec_ordered(&schema, left_sort_exprs) } else { - csv_exec_sorted(&schema, left_sort_exprs) + memory_exec_sorted(&schema, left_sort_exprs) }; let left_repartition_rr = repartition_exec_round_robin(left_source); let left_repartition_hash = repartition_exec_hash(left_repartition_rr); @@ -1258,7 +1305,7 @@ mod tests { let right_source = if source_unbounded { stream_exec_ordered(&schema, right_sort_exprs) } else { - csv_exec_sorted(&schema, right_sort_exprs) + memory_exec_sorted(&schema, right_sort_exprs) }; let right_repartition_rr = repartition_exec_round_robin(right_source); let right_repartition_hash = repartition_exec_hash(right_repartition_rr); @@ -1299,11 +1346,11 @@ mod tests { " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, 
partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -1330,11 +1377,11 @@ mod tests { " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; @@ -1345,7 +1392,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -1492,33 +1540,36 @@ mod tests { ) } - // creates a csv exec source for the test purposes - // projection and has_header parameters are given static due to testing needs - fn csv_exec_sorted( + // creates a memory exec source for the test purposes + // projection parameter is given static due to testing needs + fn memory_exec_sorted( schema: &SchemaRef, sort_exprs: impl IntoIterator, ) -> Arc { - let sort_exprs = sort_exprs.into_iter().collect(); - let projection: Vec = vec![0, 2, 3]; + pub fn make_partition(schema: &SchemaRef, sz: i32) -> RecordBatch { + let values = (0..sz).collect::>(); + let arr = Arc::new(Int32Array::from(values)); + let arr = arr as ArrayRef; - Arc::new( - CsvExec::builder( - FileScanConfig::new( - ObjectStoreUrl::parse("test:///").unwrap(), - schema.clone(), - ) - .with_file(PartitionedFile::new("file_path".to_string(), 100)) - .with_projection(Some(projection)) - .with_output_ordering(vec![sort_exprs]), + RecordBatch::try_new( + schema.clone(), + vec![arr.clone(), arr.clone(), arr.clone(), arr], ) - .with_has_header(true) - .with_delimeter(0) - .with_quote(b'"') - .with_escape(None) - .with_comment(None) - .with_newlines_in_values(false) - .with_file_compression_type(FileCompressionType::UNCOMPRESSED) - .build(), - ) + .unwrap() + } + + let rows = 5; + let partitions = 1; + let sort_exprs = sort_exprs.into_iter().collect(); + Arc::new({ + let data: Vec> = (0..partitions) + .map(|_| vec![make_partition(schema, rows)]) + .collect(); + let projection: Vec = vec![0, 2, 3]; + MemoryExec::try_new(&data, schema.clone(), Some(projection)) + .unwrap() + .try_with_sort_information(vec![sort_exprs]) + .unwrap() + }) } }
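
For reference, the clippy fixes in patch 2 boil down to replacing `Option::map_or(false, |x| x == y)` with a direct comparison against `Some(y)`. A minimal standalone sketch of that rewrite follows; the `Version` type and `same_version` function are illustrative only and are not taken from the DataFusion codebase:

    // Illustrative only: `Version` stands in for any type implementing PartialEq.
    #[derive(PartialEq)]
    struct Version(u32);

    fn same_version(current: &Version, candidate: Option<&Version>) -> bool {
        // Before (flagged by clippy shipped with Rust 1.84):
        //     candidate.map_or(false, |v| v == current)
        // After: compare the Option against Some(..) directly.
        candidate == Some(current)
    }

    fn main() {
        assert!(same_version(&Version(1), Some(&Version(1))));
        assert!(!same_version(&Version(1), None));
    }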