From 155a02f7fccbc771d815b980168e093cc50a536b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 8 Feb 2023 06:13:28 -0700 Subject: [PATCH] Minor refactor to reduce duplicate code --- ballista/scheduler/src/planner.rs | 53 ++++++++++--------------------- 1 file changed, 17 insertions(+), 36 deletions(-) diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs index 87aaad775..4b7809edd 100644 --- a/ballista/scheduler/src/planner.rs +++ b/ballista/scheduler/src/planner.rs @@ -16,8 +16,6 @@ // under the License. //! Distributed query execution -//! -//! This code is EXPERIMENTAL and still under development use std::collections::HashMap; use std::sync::Arc; @@ -84,7 +82,6 @@ impl DistributedPlanner { job_id: &'a str, execution_plan: Arc, ) -> Result { - // async move { // recurse down and replace children if execution_plan.children().is_empty() { return Ok((execution_plan, vec![])); @@ -109,17 +106,7 @@ impl DistributedPlanner { children[0].clone(), None, )?; - let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new( - shuffle_writer.stage_id(), - shuffle_writer.schema(), - shuffle_writer.output_partitioning().partition_count(), - shuffle_writer - .shuffle_output_partitioning() - .map(|p| p.partition_count()) - .unwrap_or_else(|| { - shuffle_writer.output_partitioning().partition_count() - }), - )); + let unresolved_shuffle = create_unresolved_shuffle(&shuffle_writer); stages.push(shuffle_writer); Ok(( with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?, @@ -135,17 +122,7 @@ impl DistributedPlanner { children[0].clone(), None, )?; - let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new( - shuffle_writer.stage_id(), - shuffle_writer.schema(), - shuffle_writer.output_partitioning().partition_count(), - shuffle_writer - .shuffle_output_partitioning() - .map(|p| p.partition_count()) - .unwrap_or_else(|| { - shuffle_writer.output_partitioning().partition_count() - }), - )); + let unresolved_shuffle = create_unresolved_shuffle(&shuffle_writer); stages.push(shuffle_writer); Ok(( with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?, @@ -162,17 +139,7 @@ impl DistributedPlanner { children[0].clone(), Some(repart.partitioning().to_owned()), )?; - let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new( - shuffle_writer.stage_id(), - shuffle_writer.schema(), - shuffle_writer.output_partitioning().partition_count(), - shuffle_writer - .shuffle_output_partitioning() - .map(|p| p.partition_count()) - .unwrap_or_else(|| { - shuffle_writer.output_partitioning().partition_count() - }), - )); + let unresolved_shuffle = create_unresolved_shuffle(&shuffle_writer); stages.push(shuffle_writer); Ok((unresolved_shuffle, stages)) } @@ -202,6 +169,20 @@ impl DistributedPlanner { } } +fn create_unresolved_shuffle( + shuffle_writer: &ShuffleWriterExec, +) -> Arc { + Arc::new(UnresolvedShuffleExec::new( + shuffle_writer.stage_id(), + shuffle_writer.schema(), + shuffle_writer.output_partitioning().partition_count(), + shuffle_writer + .shuffle_output_partitioning() + .map(|p| p.partition_count()) + .unwrap_or_else(|| shuffle_writer.output_partitioning().partition_count()), + )) +} + /// Returns the unresolved shuffles in the execution plan pub fn find_unresolved_shuffles( plan: &Arc,