Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Jan 1, 2024
1 parent 7f2c2bf commit fac0546
Showing 1 changed file with 2 additions and 232 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,171 +36,11 @@ use datafusion_physical_plan::unbounded_output;
// /// descendants to decide whether it is beneficial to replace order-losing (but
// /// somewhat faster) variants of certain operators with their order-preserving
// /// (but somewhat slower) cousins.
// #[derive(Debug, Clone)]
// pub(crate) struct OrderPreservationContext {
// pub(crate) plan: Arc<dyn ExecutionPlan>,
// ordering_connection: bool,
// children_nodes: Vec<Self>,
// }
//
// impl OrderPreservationContext {
// /// Creates an empty context tree. Each node has `false` connections.
// pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
// let children = plan.children();
// Self {
// plan,
// ordering_connection: false,
// children_nodes: children.into_iter().map(Self::new).collect(),
// }
// }
//
// /// Creates a new order-preservation context from those of children nodes.
// pub fn update_children(mut self) -> Result<Self> {
// for node in self.children_nodes.iter_mut() {
// let plan = node.plan.clone();
// let children = plan.children();
// let maintains_input_order = plan.maintains_input_order();
// let inspect_child = |idx| {
// maintains_input_order[idx]
// || is_coalesce_partitions(&plan)
// || is_repartition(&plan)
// };
//
// // We cut the path towards nodes that do not maintain ordering.
// for (idx, c) in node.children_nodes.iter_mut().enumerate() {
// c.ordering_connection &= inspect_child(idx);
// }
//
// node.ordering_connection = if children.is_empty() {
// false
// } else if !node.children_nodes[0].ordering_connection
// && ((is_repartition(&plan) && !maintains_input_order[0])
// || (is_coalesce_partitions(&plan)
// && children[0].output_ordering().is_some()))
// {
// // We either have a RepartitionExec or a CoalescePartitionsExec
// // and they lose their input ordering, so initiate connection:
// true
// } else {
// // Maintain connection if there is a child with a connection,
// // and operator can possibly maintain that connection (either
// // in its current form or when we replace it with the corresponding
// // order preserving operator).
// node.children_nodes
// .iter()
// .enumerate()
// .any(|(idx, c)| c.ordering_connection && inspect_child(idx))
// }
// }
//
// self.plan = with_new_children_if_necessary(
// self.plan,
// self.children_nodes.iter().map(|c| c.plan.clone()).collect(),
// )?
// .into();
// self.ordering_connection = false;
// Ok(self)
// }
// }
//
// impl TreeNode for OrderPreservationContext {
// fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
// where
// F: FnMut(&Self) -> Result<VisitRecursion>,
// {
// for child in &self.children_nodes {
// match op(child)? {
// VisitRecursion::Continue => {}
// VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
// VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
// }
// }
// Ok(VisitRecursion::Continue)
// }
//
// fn map_children<F>(mut self, transform: F) -> Result<Self>
// where
// F: FnMut(Self) -> Result<Self>,
// {
// if !self.children_nodes.is_empty() {
// self.children_nodes = self
// .children_nodes
// .into_iter()
// .map(transform)
// .collect::<Result<_>>()?;
// self.plan = with_new_children_if_necessary(
// self.plan,
// self.children_nodes.iter().map(|c| c.plan.clone()).collect(),
// )?
// .into();
// }
// Ok(self)
// }
// }

// /// Calculates the updated plan by replacing operators that lose ordering
// /// inside `sort_input` with their order-preserving variants. This will
// /// generate an alternative plan, which will be accepted or rejected later on
// /// depending on whether it helps us remove a `SortExec`.
// fn get_updated_plan(
// mut sort_input: OrderPreservationContext,
// // Flag indicating that it is desirable to replace `RepartitionExec`s with
// // `SortPreservingRepartitionExec`s:
// is_spr_better: bool,
// // Flag indicating that it is desirable to replace `CoalescePartitionsExec`s
// // with `SortPreservingMergeExec`s:
// is_spm_better: bool,
// ) -> Result<OrderPreservationContext> {
// let updated_children = sort_input
// .children_nodes
// .clone()
// .into_iter()
// .map(|item| {
// // Update children and their descendants in the given tree if the connection is open:
// if item.ordering_connection {
// get_updated_plan(item, is_spr_better, is_spm_better)
// } else {
// Ok(item)
// }
// })
// .collect::<Result<Vec<_>>>()?;
//
// sort_input.plan = sort_input
// .plan
// .with_new_children(updated_children.iter().map(|c| c.plan.clone()).collect())?;
// sort_input.ordering_connection = false;
// sort_input.children_nodes = updated_children;
//
// // When a `RepartitionExec` doesn't preserve ordering, replace it with
// // a sort-preserving variant if appropriate:
// if is_repartition(&sort_input.plan)
// && !sort_input.plan.maintains_input_order()[0]
// && is_spr_better
// {
// let child = sort_input.plan.children().swap_remove(0);
// let repartition =
// RepartitionExec::try_new(child, sort_input.plan.output_partitioning())?
// .with_preserve_order();
// sort_input.plan = Arc::new(repartition) as _;
// sort_input.children_nodes[0].ordering_connection = true;
// } else if is_coalesce_partitions(&sort_input.plan) && is_spm_better {
// // When the input of a `CoalescePartitionsExec` has an ordering, replace it
// // with a `SortPreservingMergeExec` if appropriate:
// if let Some(ordering) = sort_input.children_nodes[0]
// .plan
// .output_ordering()
// .map(|o| o.to_vec())
// {
// // Now we can mutate `new_node.children_nodes` safely
// let child = sort_input.children_nodes.clone().swap_remove(0);
// sort_input.plan =
// Arc::new(SortPreservingMergeExec::new(ordering, child.plan)) as _;
// sort_input.children_nodes[0].ordering_connection = true;
// }
// }
//
// Ok(sort_input)
// }

/// The `replace_with_order_preserving_variants` optimizer sub-rule tries to
/// remove `SortExec`s from the physical plan by replacing operators that do
Expand Down Expand Up @@ -236,9 +76,6 @@ pub(crate) fn propagate_order_maintaining_connections_down(
plan: Arc<dyn ExecutionPlan>,
mut ordering_connection: bool,
) -> Result<(Transformed<Arc<dyn ExecutionPlan>>, Vec<bool>, bool)> {
println!("propagate_order_maintaining_connections_down:");
println!("plan: {:?}", plan);
println!("ordering_connection: {}", ordering_connection);
let children_ordering_connections = if is_sort(&plan) {
vec![true]
} else {
Expand All @@ -248,11 +85,6 @@ pub(crate) fn propagate_order_maintaining_connections_down(
.map(|mio| ordering_connection && mio)
.collect()
};
println!(
"children_ordering_connections: {:?}",
children_ordering_connections
);
println!("");
Ok((
Transformed::No(plan),
children_ordering_connections,
Expand All @@ -279,23 +111,13 @@ pub(crate) fn replace_with_order_preserving_variants_up(
Transformed<Arc<dyn ExecutionPlan>>,
Option<Arc<dyn ExecutionPlan>>,
)> {
println!("replace_with_order_preserving_variants_up:");
println!("plan: {:?}", plan);
println!("ordering_connection: {}", ordering_connection);
println!("order_preserving_children: {:?}", order_preserving_children);

// For unbounded cases, replace with the order-preserving variant in
// any case, as doing so helps fix the pipeline.
// Also do the replacement if opted-in via config options.
let use_order_preserving_variant =
config.optimizer.prefer_existing_sort || unbounded_output(&plan);

println!(
"use_order_preserving_variant: {:?}",
use_order_preserving_variant
);

let x = if is_sort(&plan) {
if is_sort(&plan) {
if let Some(order_preserving_plan) = order_preserving_children.swap_remove(0) {
// If there is an order preserving alternative available we need to check if
// it satisfies ordering of the original sort operator
Expand All @@ -316,35 +138,23 @@ pub(crate) fn replace_with_order_preserving_variants_up(
&& !plan.maintains_input_order()[0]
&& (is_spr_better || use_order_preserving_variant)
{
println!("is_repartition: true");

let child = order_preserving_children
.swap_remove(0)
.unwrap_or_else(|| plan.children().swap_remove(0));

println!("child: {:?}", child);

let order_preserving_plan: Arc<dyn ExecutionPlan> = Arc::new(
RepartitionExec::try_new(child, plan.output_partitioning())?
.with_preserve_order(),
);

println!("order_preserving_plan: {:?}", order_preserving_plan);
Ok((Transformed::No(plan), Some(order_preserving_plan)))
} else if ordering_connection
&& is_coalesce_partitions(&plan)
&& (is_spm_better || use_order_preserving_variant)
{
println!("is_coalesce_partitions: true");

// When the input of a `CoalescePartitionsExec` has an ordering, replace it
// with a `SortPreservingMergeExec` if appropriate:
let child = order_preserving_children
.swap_remove(0)
.unwrap_or_else(|| plan.children().swap_remove(0));

println!("child: {:?}", child);

let order_preserving_plan: Option<Arc<dyn ExecutionPlan>> =
child.output_ordering().map(|o| {
Arc::new(SortPreservingMergeExec::new(o.to_vec(), child.clone())) as _
Expand All @@ -366,47 +176,7 @@ pub(crate) fn replace_with_order_preserving_variants_up(
None
};
Ok((Transformed::No(plan), order_preserving_plan))
};

println!("");

x

// let mut requirements = requirements.update_children()?;
// if !(is_sort(&requirements.plan)
// && requirements.children_nodes[0].ordering_connection)
// {
// return Ok(Transformed::No(requirements));
// }
//
// // For unbounded cases, replace with the order-preserving variant in
// // any case, as doing so helps fix the pipeline.
// // Also do the replacement if opted-in via config options.
// let use_order_preserving_variant =
// config.optimizer.prefer_existing_sort || unbounded_output(&requirements.plan);
//
// let mut updated_sort_input = get_updated_plan(
// requirements.children_nodes.clone().swap_remove(0),
// is_spr_better || use_order_preserving_variant,
// is_spm_better || use_order_preserving_variant,
// )?;
//
// // If this sort is unnecessary, we should remove it and update the plan:
// if updated_sort_input
// .plan
// .equivalence_properties()
// .ordering_satisfy(requirements.plan.output_ordering().unwrap_or(&[]))
// {
// for child in updated_sort_input.children_nodes.iter_mut() {
// child.ordering_connection = false;
// }
// Ok(Transformed::Yes(updated_sort_input))
// } else {
// for child in requirements.children_nodes.iter_mut() {
// child.ordering_connection = false;
// }
// Ok(Transformed::Yes(requirements))
// }
}
}

#[cfg(test)]
Expand Down

0 comments on commit fac0546

Please sign in to comment.