diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 365d8e9fd9a42..9f77e5b290773 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -565,12 +565,17 @@ message CsvScanExecNode { repeated string filename = 8; } +enum PartitionMode { + COLLECT_LEFT = 0; + PARTITIONED = 1; +} + message HashJoinExecNode { PhysicalPlanNode left = 1; PhysicalPlanNode right = 2; repeated JoinOn on = 3; JoinType join_type = 4; - + PartitionMode partition_mode = 6; } message PhysicalColumn { diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs index 47d330b8f8486..719e0d99901a4 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs @@ -42,7 +42,9 @@ use datafusion::arrow::ipc::writer::FileWriter; use datafusion::arrow::record_batch::RecordBatch; use datafusion::error::{DataFusionError, Result}; use datafusion::physical_plan::hash_join::create_hashes; -use datafusion::physical_plan::{ExecutionPlan, Partitioning, RecordBatchStream}; +use datafusion::physical_plan::{ + displayable, ExecutionPlan, Partitioning, RecordBatchStream, +}; use futures::StreamExt; use log::info; use std::fs::File; @@ -133,6 +135,13 @@ impl ExecutionPlan for ShuffleWriterExec { ) -> Result>> { let now = Instant::now(); + let displayable_plan = displayable(self.plan.as_ref()); + info!( + "ShuffleWriterExec executing partition {}\n{}", + partition, + displayable_plan.indent() + ); + let mut stream = self.plan.execute(partition).await?; let mut path = PathBuf::from(&self.work_dir); diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index 4049622b83dc5..f6245c31121bf 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -30,6 +30,7 @@ use datafusion::logical_plan::{ }; use datafusion::physical_plan::aggregates::AggregateFunction; use datafusion::physical_plan::functions::BuiltinScalarFunction; +use datafusion::physical_plan::hash_join::PartitionMode; use datafusion::physical_plan::window_functions::{ BuiltInWindowFunction, WindowFunction, }; diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 9c70603d17278..7ca1758fe30f0 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -356,12 +356,24 @@ impl TryInto> for &protobuf::PhysicalPlanNode { protobuf::JoinType::Semi => JoinType::Semi, protobuf::JoinType::Anti => JoinType::Anti, }; + let partition_mode = + protobuf::PartitionMode::from_i32(hashjoin.partition_mode) + .ok_or_else(|| { + proto_error(format!( + "Received a HashJoinNode message with unknown PartitionMode {}", + hashjoin.partition_mode + )) + })?; + let partition_mode = match partition_mode { + protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft, + protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned, + }; Ok(Arc::new(HashJoinExec::try_new( left, right, on, &join_type, - PartitionMode::CollectLeft, + partition_mode, )?)) } PhysicalPlanType::ShuffleReader(shuffle_reader) => { diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs index c0fe81f0ffb91..3703fdd39acfa 100644 --- a/ballista/rust/core/src/serde/physical_plan/mod.rs +++ b/ballista/rust/core/src/serde/physical_plan/mod.rs @@ -88,13 +88,29 @@ mod roundtrip_tests { Column::new("col", schema_right.index_of("col")?), )]; - roundtrip_test(Arc::new(HashJoinExec::try_new( - Arc::new(EmptyExec::new(false, Arc::new(schema_left))), - Arc::new(EmptyExec::new(false, Arc::new(schema_right))), - on, - &JoinType::Inner, - PartitionMode::CollectLeft, - )?)) + let schema_left = Arc::new(schema_left); + let schema_right = Arc::new(schema_right); + for join_type in vec![ + JoinType::Inner, + JoinType::Left, + JoinType::Right, + JoinType::Full, + JoinType::Anti, + JoinType::Semi, + ] { + for partition_mode in + vec![PartitionMode::Partitioned, PartitionMode::CollectLeft] + { + roundtrip_test(Arc::new(HashJoinExec::try_new( + Arc::new(EmptyExec::new(false, schema_left.clone())), + Arc::new(EmptyExec::new(false, schema_right.clone())), + on.clone(), + &join_type, + partition_mode, + )?))?; + } + } + Ok(()) } #[test] diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs index 7d310da639164..e8a0d24eedda7 100644 --- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs @@ -34,7 +34,7 @@ use datafusion::physical_plan::expressions::{ use datafusion::physical_plan::expressions::{CastExpr, TryCastExpr}; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::hash_aggregate::AggregateMode; -use datafusion::physical_plan::hash_join::HashJoinExec; +use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode}; use datafusion::physical_plan::hash_utils::JoinType; use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion::physical_plan::parquet::ParquetExec; @@ -143,6 +143,10 @@ impl TryInto for Arc { JoinType::Semi => protobuf::JoinType::Semi, JoinType::Anti => protobuf::JoinType::Anti, }; + let partition_mode = match exec.partition_mode() { + PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft, + PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned, + }; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new( protobuf::HashJoinExecNode { @@ -150,6 +154,7 @@ impl TryInto for Arc { right: Some(Box::new(right)), on, join_type: join_type.into(), + partition_mode: partition_mode.into(), }, ))), }) diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index 00ebd1e98cf93..c0307056b1dbe 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -177,7 +177,11 @@ fn build_exec_plan_diagram( .is_some() { "CoalesceBatchesExec" - } else if plan.as_any().downcast_ref::().is_some() { + } else if plan + .as_any() + .downcast_ref::() + .is_some() + { "MergeExec" } else { println!("Unknown: {:?}", plan); diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs index 8d8fa67c5fa85..d0c93c773dada 100644 --- a/ballista/rust/scheduler/src/planner.rs +++ b/ballista/rust/scheduler/src/planner.rs @@ -67,7 +67,12 @@ impl DistributedPlanner { info!("planning query stages"); let (new_plan, mut stages) = self.plan_query_stages_internal(job_id, execution_plan)?; - stages.push(create_shuffle_writer(job_id, self.next_stage_id(), new_plan, None)?); + stages.push(create_shuffle_writer( + job_id, + self.next_stage_id(), + new_plan, + None, + )?); Ok(stages) } @@ -96,12 +101,15 @@ impl DistributedPlanner { if let Some(adapter) = execution_plan.as_any().downcast_ref::() { let ctx = ExecutionContext::new(); Ok((ctx.create_physical_plan(&adapter.logical_plan)?, stages)) - } else if let Some(coalesce) = execution_plan.as_any().downcast_ref::() { + } else if let Some(coalesce) = execution_plan + .as_any() + .downcast_ref::() + { let query_stage = create_shuffle_writer( job_id, self.next_stage_id(), coalesce.children()[0].clone(), - None + None, )?; let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new( vec![query_stage.stage_id()], @@ -109,7 +117,10 @@ impl DistributedPlanner { query_stage.output_partitioning().partition_count(), )); stages.push(query_stage); - Ok((coalesce.with_new_children(vec![unresolved_shuffle])?, stages)) + Ok(( + coalesce.with_new_children(vec![unresolved_shuffle])?, + stages, + )) } else if let Some(repart) = execution_plan.as_any().downcast_ref::() { @@ -117,7 +128,7 @@ impl DistributedPlanner { job_id, self.next_stage_id(), repart.children()[0].clone(), - Some(repart.partitioning().to_owned()) + Some(repart.partitioning().to_owned()), )?; let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new( vec![query_stage.stage_id()], @@ -207,7 +218,9 @@ mod test { use datafusion::physical_plan::hash_aggregate::HashAggregateExec; use datafusion::physical_plan::sort::SortExec; use datafusion::physical_plan::{displayable, ExecutionPlan}; - use datafusion::physical_plan::{merge::CoalescePartitionsExec, projection::ProjectionExec}; + use datafusion::physical_plan::{ + merge::CoalescePartitionsExec, projection::ProjectionExec, + }; use std::convert::TryInto; use std::sync::Arc; use uuid::Uuid; diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index a8a6f31f9dae2..3afd31d28b946 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -177,6 +177,11 @@ impl HashJoinExec { &self.join_type } + /// The partitioning mode of this hash join + pub fn partition_mode(&self) -> &PartitionMode { + &self.mode + } + /// Calculates column indices and left/right placement on input / output schemas and jointype fn column_indices_from_schema(&self) -> ArrowResult> { let (primary_is_left, primary_schema, secondary_schema) = match self.join_type { diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs index 91c042fcc4bb1..8a585ed505287 100644 --- a/datafusion/src/physical_plan/limit.rs +++ b/datafusion/src/physical_plan/limit.rs @@ -319,7 +319,8 @@ mod tests { // input should have 4 partitions assert_eq!(csv.output_partitioning().partition_count(), num_partitions); - let limit = GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), 7); + let limit = + GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), 7); // the result should contain 4 batches (one per input partition) let iter = limit.execute(0).await?;