From 295ffb41f562f2c58b22ce36928df3f85f7fe09a Mon Sep 17 00:00:00 2001 From: Aleksey Kirilishin <54231417+avkirilishin@users.noreply.github.com> Date: Fri, 10 Jan 2025 13:19:23 +0300 Subject: [PATCH] test: Add plan execution during tests for bounded source (#14013) --- .../replace_with_order_preserving_variants.rs | 251 +++++++++++------- 1 file changed, 151 insertions(+), 100 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 96b2454fa330..9f5afc7abc2e 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -285,9 +285,7 @@ pub(crate) fn replace_with_order_preserving_variants( mod tests { use super::*; - use crate::datasource::file_format::file_compression_type::FileCompressionType; - use crate::datasource::listing::PartitionedFile; - use crate::datasource::physical_plan::{CsvExec, FileScanConfig}; + use crate::execution::TaskContext; use crate::physical_optimizer::test_utils::check_integrity; use crate::physical_plan::coalesce_batches::CoalesceBatchesExec; use crate::physical_plan::filter::FilterExec; @@ -296,18 +294,24 @@ mod tests { use crate::physical_plan::{ displayable, get_plan_string, ExecutionPlan, Partitioning, }; - use crate::prelude::SessionConfig; + use crate::prelude::{SessionConfig, SessionContext}; use crate::test::TestStreamPartition; + use arrow::array::{ArrayRef, Int32Array}; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use arrow::record_batch::RecordBatch; use datafusion_common::tree_node::{TransformedResult, TreeNode}; use datafusion_common::Result; - use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::{JoinType, Operator}; use datafusion_physical_expr::expressions::{self, col, Column}; use datafusion_physical_expr::PhysicalSortExpr; + use datafusion_physical_plan::collect; + use datafusion_physical_plan::memory::MemoryExec; use datafusion_physical_plan::streaming::StreamingTableExec; + use object_store::memory::InMemory; + use object_store::ObjectStore; + use url::Url; use rstest::rstest; @@ -328,20 +332,24 @@ mod tests { /// * `$PLAN`: The plan to optimize. /// * `$SOURCE_UNBOUNDED`: Whether the given plan contains an unbounded source. macro_rules! assert_optimized_in_all_boundedness_situations { - ($EXPECTED_UNBOUNDED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PLAN_LINES: expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $SOURCE_UNBOUNDED: expr) => { + ($EXPECTED_UNBOUNDED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PLAN_LINES: expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $SOURCE_UNBOUNDED: expr, $PREFER_EXISTING_SORT: expr) => { if $SOURCE_UNBOUNDED { assert_optimized_prefer_sort_on_off!( $EXPECTED_UNBOUNDED_PLAN_LINES, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES, - $PLAN + $PLAN, + $PREFER_EXISTING_SORT, + $SOURCE_UNBOUNDED ); } else { assert_optimized_prefer_sort_on_off!( $EXPECTED_BOUNDED_PLAN_LINES, $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES, $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES, - $PLAN + $PLAN, + $PREFER_EXISTING_SORT, + $SOURCE_UNBOUNDED ); } }; @@ -359,19 +367,24 @@ mod tests { /// the flag `prefer_existing_sort` is `true`. /// * `$PLAN`: The plan to optimize. macro_rules! assert_optimized_prefer_sort_on_off { - ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr) => { - assert_optimized!( - $EXPECTED_PLAN_LINES, - $EXPECTED_OPTIMIZED_PLAN_LINES, - $PLAN.clone(), - false - ); - assert_optimized!( - $EXPECTED_PLAN_LINES, - $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES, - $PLAN, - true - ); + ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $PREFER_EXISTING_SORT: expr, $SOURCE_UNBOUNDED: expr) => { + if $PREFER_EXISTING_SORT { + assert_optimized!( + $EXPECTED_PLAN_LINES, + $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES, + $PLAN, + $PREFER_EXISTING_SORT, + $SOURCE_UNBOUNDED + ); + } else { + assert_optimized!( + $EXPECTED_PLAN_LINES, + $EXPECTED_OPTIMIZED_PLAN_LINES, + $PLAN, + $PREFER_EXISTING_SORT, + $SOURCE_UNBOUNDED + ); + } }; } @@ -385,7 +398,7 @@ mod tests { /// * `$PLAN`: The plan to optimize. /// * `$PREFER_EXISTING_SORT`: Value of the `prefer_existing_sort` flag. macro_rules! assert_optimized { - ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $PREFER_EXISTING_SORT: expr) => { + ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $PREFER_EXISTING_SORT: expr, $SOURCE_UNBOUNDED: expr) => { let physical_plan = $PLAN; let formatted = displayable(physical_plan.as_ref()).indent(true).to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); @@ -412,6 +425,19 @@ mod tests { expected_optimized_lines, actual, "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n" ); + + if !$SOURCE_UNBOUNDED { + let ctx = SessionContext::new(); + let object_store = InMemory::new(); + object_store.put(&object_store::path::Path::from("file_path"), bytes::Bytes::from("").into()).await?; + ctx.register_object_store(&Url::parse("test://").unwrap(), Arc::new(object_store)); + let task_ctx = Arc::new(TaskContext::from(&ctx)); + let res = collect(optimized_physical_plan, task_ctx).await; + assert!( + res.is_ok(), + "Some errors occurred while executing the optimized physical plan: {:?}", res.unwrap_err() + ); + } }; } @@ -420,13 +446,14 @@ mod tests { // Searches for a simple sort and a repartition just after it, the second repartition with 1 input partition should not be affected async fn test_replace_multiple_input_repartition_1( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition = repartition_exec_hash(repartition_exec_round_robin(source)); let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true); @@ -447,7 +474,7 @@ mod tests { " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -464,13 +491,13 @@ mod tests { " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -479,7 +506,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -488,13 +516,14 @@ mod tests { #[tokio::test] async fn test_with_inter_children_change_only( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr_default("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -538,7 +567,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC", ]; // Expected unbounded result (same for with and without flag) @@ -564,7 +593,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC]", @@ -574,7 +603,7 @@ mod tests { " SortPreservingMergeExec: [a@0 ASC]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -583,7 +612,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -592,13 +622,14 @@ mod tests { #[tokio::test] async fn test_replace_multiple_input_repartition_2( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let filter = filter_exec(repartition_rr); @@ -623,7 +654,7 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " FilterExec: c@1 > 3", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -642,14 +673,14 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " FilterExec: c@1 > 3", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " FilterExec: c@1 > 3", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -658,7 +689,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -667,13 +699,14 @@ mod tests { #[tokio::test] async fn test_replace_multiple_input_repartition_with_extra_steps( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -701,7 +734,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -722,7 +755,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", @@ -730,7 +763,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -739,7 +772,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -748,13 +782,14 @@ mod tests { #[tokio::test] async fn test_replace_multiple_input_repartition_with_extra_steps_2( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr); @@ -786,7 +821,7 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -809,7 +844,7 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", @@ -818,7 +853,7 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -827,7 +862,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -836,13 +872,14 @@ mod tests { #[tokio::test] async fn test_not_replacing_when_no_need_to_preserve_sorting( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -867,7 +904,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -887,7 +924,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; @@ -898,7 +935,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -907,13 +945,14 @@ mod tests { #[tokio::test] async fn test_with_multiple_replacable_repartitions( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -944,7 +983,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -967,7 +1006,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", @@ -976,7 +1015,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -985,7 +1024,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -994,13 +1034,14 @@ mod tests { #[tokio::test] async fn test_not_replace_with_different_orderings( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -1028,7 +1069,7 @@ mod tests { " SortExec: expr=[c@1 ASC], preserve_partitioning=[true]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -1046,7 +1087,7 @@ mod tests { " SortExec: expr=[c@1 ASC], preserve_partitioning=[true]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; @@ -1057,7 +1098,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -1066,13 +1108,14 @@ mod tests { #[tokio::test] async fn test_with_lost_ordering( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -1093,7 +1136,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -1110,13 +1153,13 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -1125,7 +1168,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -1134,13 +1178,14 @@ mod tests { #[tokio::test] async fn test_with_lost_and_kept_ordering( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -1184,7 +1229,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -1211,7 +1256,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [c@1 ASC]", @@ -1222,7 +1267,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -1231,7 +1276,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -1240,6 +1286,7 @@ mod tests { #[tokio::test] async fn test_with_multiple_child_trees( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; @@ -1247,7 +1294,7 @@ mod tests { let left_source = if source_unbounded { stream_exec_ordered(&schema, left_sort_exprs) } else { - csv_exec_sorted(&schema, left_sort_exprs) + memory_exec_sorted(&schema, left_sort_exprs) }; let left_repartition_rr = repartition_exec_round_robin(left_source); let left_repartition_hash = repartition_exec_hash(left_repartition_rr); @@ -1258,7 +1305,7 @@ mod tests { let right_source = if source_unbounded { stream_exec_ordered(&schema, right_sort_exprs) } else { - csv_exec_sorted(&schema, right_sort_exprs) + memory_exec_sorted(&schema, right_sort_exprs) }; let right_repartition_rr = repartition_exec_round_robin(right_source); let right_repartition_hash = repartition_exec_hash(right_repartition_rr); @@ -1299,11 +1346,11 @@ mod tests { " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -1330,11 +1377,11 @@ mod tests { " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; @@ -1345,7 +1392,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -1492,33 +1540,36 @@ mod tests { ) } - // creates a csv exec source for the test purposes - // projection and has_header parameters are given static due to testing needs - fn csv_exec_sorted( + // creates a memory exec source for the test purposes + // projection parameter is given static due to testing needs + fn memory_exec_sorted( schema: &SchemaRef, sort_exprs: impl IntoIterator, ) -> Arc { - let sort_exprs = sort_exprs.into_iter().collect(); - let projection: Vec = vec![0, 2, 3]; + pub fn make_partition(schema: &SchemaRef, sz: i32) -> RecordBatch { + let values = (0..sz).collect::>(); + let arr = Arc::new(Int32Array::from(values)); + let arr = arr as ArrayRef; - Arc::new( - CsvExec::builder( - FileScanConfig::new( - ObjectStoreUrl::parse("test:///").unwrap(), - schema.clone(), - ) - .with_file(PartitionedFile::new("file_path".to_string(), 100)) - .with_projection(Some(projection)) - .with_output_ordering(vec![sort_exprs]), + RecordBatch::try_new( + schema.clone(), + vec![arr.clone(), arr.clone(), arr.clone(), arr], ) - .with_has_header(true) - .with_delimeter(0) - .with_quote(b'"') - .with_escape(None) - .with_comment(None) - .with_newlines_in_values(false) - .with_file_compression_type(FileCompressionType::UNCOMPRESSED) - .build(), - ) + .unwrap() + } + + let rows = 5; + let partitions = 1; + let sort_exprs = sort_exprs.into_iter().collect(); + Arc::new({ + let data: Vec> = (0..partitions) + .map(|_| vec![make_partition(schema, rows)]) + .collect(); + let projection: Vec = vec![0, 2, 3]; + MemoryExec::try_new(&data, schema.clone(), Some(projection)) + .unwrap() + .try_with_sort_information(vec![sort_exprs]) + .unwrap() + }) } }