Skip to content

Commit

Permalink
Add ExecutionEngine abstraction (#687)
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove authored Mar 2, 2023
1 parent fe6e2f5 commit 85031c4
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 70 deletions.
1 change: 1 addition & 0 deletions ballista/executor/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ async fn main() -> Result<()> {
print_thread_info: opt.print_thread_info,
job_data_ttl_seconds: opt.job_data_ttl_seconds,
job_data_clean_up_interval_seconds: opt.job_data_clean_up_interval_seconds,
execution_engine: None,
};

start_executor_process(config).await
Expand Down
114 changes: 114 additions & 0 deletions ballista/executor/src/execution_engine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use async_trait::async_trait;
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::protobuf::ShuffleWritePartition;
use ballista_core::utils;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use std::fmt::Debug;
use std::sync::Arc;

/// Execution engine extension point.
///
/// An implementation turns the physical plan of a single query stage into a
/// [`QueryStageExecutor`], which is then used to run that stage's partitions.
pub trait ExecutionEngine: Sync + Send {
/// Creates the executor for stage `stage_id` of job `job_id`.
///
/// `plan` is the physical plan for the stage. `work_dir` is a working
/// directory handed to the engine (presumably where stage output is
/// written; confirm against the implementation in use).
///
/// Returns an error if the engine cannot build an executor for `plan`.
fn create_query_stage_exec(
&self,
job_id: String,
stage_id: usize,
plan: Arc<dyn ExecutionPlan>,
work_dir: &str,
) -> Result<Arc<dyn QueryStageExecutor>>;
}

/// QueryStageExecutor executes a section of a query plan that has consistent partitioning and
/// can be executed as one unit with each partition being executed in parallel. The output of each
/// partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
/// will use the ShuffleReaderExec to read these results.
#[async_trait]
pub trait QueryStageExecutor: Sync + Send + Debug {
/// Executes partition `input_partition` of this query stage using the
/// supplied task `context`.
///
/// On success, returns the [`ShuffleWritePartition`] entries produced for
/// that input partition.
async fn execute_query_stage(
&self,
input_partition: usize,
context: Arc<TaskContext>,
) -> Result<Vec<ShuffleWritePartition>>;

/// Returns the metrics sets gathered for this stage's plan.
fn collect_plan_metrics(&self) -> Vec<MetricsSet>;
}

/// Default [`ExecutionEngine`] implementation.
///
/// It is stateless: every query stage is executed by wrapping the stage's
/// `ShuffleWriterExec` in a `DefaultQueryStageExec`.
#[derive(Debug, Default)]
pub struct DefaultExecutionEngine {}

impl ExecutionEngine for DefaultExecutionEngine {
    /// Builds a [`DefaultQueryStageExec`] for the given stage plan.
    ///
    /// The root of `plan` must be a [`ShuffleWriterExec`]. A new shuffle
    /// writer is created over that writer's first child with `job_id`,
    /// `stage_id` and `work_dir`, keeping the original writer's shuffle
    /// output partitioning.
    ///
    /// # Errors
    ///
    /// Returns [`DataFusionError::Internal`] when the root of `plan` is not a
    /// `ShuffleWriterExec`, and propagates any error returned by
    /// `ShuffleWriterExec::try_new`.
    fn create_query_stage_exec(
        &self,
        job_id: String,
        stage_id: usize,
        plan: Arc<dyn ExecutionPlan>,
        work_dir: &str,
    ) -> Result<Arc<dyn QueryStageExecutor>> {
        // the query plan created by the scheduler always starts with a ShuffleWriterExec
        let shuffle_writer = plan
            .as_any()
            .downcast_ref::<ShuffleWriterExec>()
            .ok_or_else(|| {
                DataFusionError::Internal(
                    "Plan passed to create_query_stage_exec is not a ShuffleWriterExec"
                        .to_string(),
                )
            })?;

        // recreate the shuffle writer with the correct working directory
        let exec = ShuffleWriterExec::try_new(
            job_id,
            stage_id,
            plan.children()[0].clone(),
            work_dir.to_string(),
            shuffle_writer.shuffle_output_partitioning().cloned(),
        )?;

        Ok(Arc::new(DefaultQueryStageExec::new(exec)))
    }
}

/// Default [`QueryStageExecutor`], backed by a [`ShuffleWriterExec`].
#[derive(Debug)]
pub struct DefaultQueryStageExec {
/// Shuffle writer that performs the work of the query stage.
shuffle_writer: ShuffleWriterExec,
}

impl DefaultQueryStageExec {
pub fn new(shuffle_writer: ShuffleWriterExec) -> Self {
Self { shuffle_writer }
}
}

#[async_trait]
impl QueryStageExecutor for DefaultQueryStageExec {
    /// Runs the shuffle write for `input_partition` with the given task
    /// `context` and returns the partitions reported by the shuffle writer.
    async fn execute_query_stage(
        &self,
        input_partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<Vec<ShuffleWritePartition>> {
        let writer = &self.shuffle_writer;
        writer.execute_shuffle_write(input_partition, context).await
    }

    /// Collects metrics starting from the shuffle writer's first child plan.
    fn collect_plan_metrics(&self) -> Vec<MetricsSet> {
        let children = self.shuffle_writer.children();
        let stage_input = children[0].as_ref();
        utils::collect_plan_metrics(stage_input)
    }
}
15 changes: 9 additions & 6 deletions ballista/executor/src/execution_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use crate::{as_task_status, TaskExecutionTimes};
use ballista_core::error::BallistaError;
use ballista_core::serde::scheduler::{ExecutorSpecification, PartitionId};
use ballista_core::serde::BallistaCodec;
use ballista_core::utils::collect_plan_metrics;
use datafusion::execution::context::TaskContext;
use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::from_proto::parse_protobuf_hash_partitioning;
Expand Down Expand Up @@ -209,8 +208,12 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
plan.schema().as_ref(),
)?;

let shuffle_writer_plan =
executor.new_shuffle_writer(job_id.clone(), stage_id as usize, plan)?;
let query_stage_exec = executor.execution_engine.create_query_stage_exec(
job_id.clone(),
stage_id as usize,
plan,
&executor.work_dir,
)?;
dedicated_executor.spawn(async move {
use std::panic::AssertUnwindSafe;
let part = PartitionId {
Expand All @@ -219,10 +222,10 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
partition_id: partition_id as usize,
};

let execution_result = match AssertUnwindSafe(executor.execute_shuffle_write(
let execution_result = match AssertUnwindSafe(executor.execute_query_stage(
task_id as usize,
part.clone(),
shuffle_writer_plan.clone(),
query_stage_exec.clone(),
task_context,
shuffle_output_partitioning,
))
Expand All @@ -240,7 +243,7 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
info!("Done with task {}", task_identity);
debug!("Statistics: {:?}", execution_result);

let plan_metrics = collect_plan_metrics(shuffle_writer_plan.as_ref());
let plan_metrics = query_stage_exec.collect_plan_metrics();
let operator_metrics = plan_metrics
.into_iter()
.map(|m| m.try_into())
Expand Down
74 changes: 28 additions & 46 deletions ballista/executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,26 @@

//! Ballista executor logic

use dashmap::DashMap;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::execution_engine::DefaultExecutionEngine;
use crate::execution_engine::ExecutionEngine;
use crate::execution_engine::QueryStageExecutor;
use crate::metrics::ExecutorMetricsCollector;
use ballista_core::error::BallistaError;
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::protobuf;
use ballista_core::serde::protobuf::ExecutorRegistration;
use datafusion::error::DataFusionError;
use ballista_core::serde::scheduler::PartitionId;
use dashmap::DashMap;
use datafusion::execution::context::TaskContext;
use datafusion::execution::runtime_env::RuntimeEnv;

use datafusion::physical_plan::udaf::AggregateUDF;
use datafusion::physical_plan::udf::ScalarUDF;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
use datafusion::physical_plan::Partitioning;
use futures::future::AbortHandle;

use ballista_core::serde::scheduler::PartitionId;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

pub struct TasksDrainedFuture(pub Arc<Executor>);

Expand Down Expand Up @@ -82,6 +80,10 @@ pub struct Executor {

/// Handles to abort executing tasks
abort_handles: AbortHandles,

/// Execution engine that the executor will delegate to
/// for executing query stages
pub(crate) execution_engine: Arc<dyn ExecutionEngine>,
}

impl Executor {
Expand All @@ -92,6 +94,7 @@ impl Executor {
runtime: Arc<RuntimeEnv>,
metrics_collector: Arc<dyn ExecutorMetricsCollector>,
concurrent_tasks: usize,
execution_engine: Option<Arc<dyn ExecutionEngine>>,
) -> Self {
Self {
metadata,
Expand All @@ -103,6 +106,8 @@ impl Executor {
metrics_collector,
concurrent_tasks,
abort_handles: Default::default(),
execution_engine: execution_engine
.unwrap_or_else(|| Arc::new(DefaultExecutionEngine {})),
}
}
}
Expand All @@ -111,16 +116,16 @@ impl Executor {
/// Execute one partition of a query stage and persist the result to disk in IPC format. On
/// success, return the shuffle write partitions describing the results, including path
/// and statistics.
pub async fn execute_shuffle_write(
pub async fn execute_query_stage(
&self,
task_id: usize,
partition: PartitionId,
shuffle_writer: Arc<ShuffleWriterExec>,
query_stage_exec: Arc<dyn QueryStageExecutor>,
task_ctx: Arc<TaskContext>,
_shuffle_output_partitioning: Option<Partitioning>,
) -> Result<Vec<protobuf::ShuffleWritePartition>, BallistaError> {
let (task, abort_handle) = futures::future::abortable(
shuffle_writer.execute_shuffle_write(partition.partition_id, task_ctx),
query_stage_exec.execute_query_stage(partition.partition_id, task_ctx),
);

self.abort_handles
Expand All @@ -134,39 +139,12 @@ impl Executor {
&partition.job_id,
partition.stage_id,
partition.partition_id,
shuffle_writer,
query_stage_exec,
);

Ok(partitions)
}

/// Recreate the shuffle writer with the correct working directory.
pub fn new_shuffle_writer(
&self,
job_id: String,
stage_id: usize,
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<ShuffleWriterExec>, BallistaError> {
let exec = if let Some(shuffle_writer) =
plan.as_any().downcast_ref::<ShuffleWriterExec>()
{
// recreate the shuffle writer with the correct working directory
ShuffleWriterExec::try_new(
job_id,
stage_id,
plan.children()[0].clone(),
self.work_dir.clone(),
shuffle_writer.shuffle_output_partitioning().cloned(),
)
} else {
Err(DataFusionError::Internal(
"Plan passed to execute_shuffle_write is not a ShuffleWriterExec"
.to_string(),
))
}?;
Ok(Arc::new(exec))
}

pub async fn cancel_task(
&self,
task_id: usize,
Expand Down Expand Up @@ -208,6 +186,7 @@ mod test {
use ballista_core::serde::protobuf::ExecutorRegistration;
use datafusion::execution::context::TaskContext;

use crate::execution_engine::DefaultQueryStageExec;
use ballista_core::serde::scheduler::PartitionId;
use datafusion::error::DataFusionError;
use datafusion::physical_expr::PhysicalSortExpr;
Expand Down Expand Up @@ -307,6 +286,8 @@ mod test {
)
.expect("creating shuffle writer");

let query_stage_exec = DefaultQueryStageExec::new(shuffle_write);

let executor_registration = ExecutorRegistration {
id: "executor".to_string(),
port: 0,
Expand All @@ -323,6 +304,7 @@ mod test {
ctx.runtime_env(),
Arc::new(LoggingMetricsCollector {}),
2,
None,
);

let (sender, receiver) = tokio::sync::oneshot::channel();
Expand All @@ -336,10 +318,10 @@ mod test {
partition_id: 0,
};
let task_result = executor_clone
.execute_shuffle_write(
.execute_query_stage(
1,
part,
Arc::new(shuffle_write),
Arc::new(query_stage_exec),
ctx.task_ctx(),
None,
)
Expand Down
5 changes: 5 additions & 0 deletions ballista/executor/src/executor_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use ballista_core::utils::{
};
use ballista_core::BALLISTA_VERSION;

use crate::execution_engine::ExecutionEngine;
use crate::executor::{Executor, TasksDrainedFuture};
use crate::executor_server::TERMINATING;
use crate::flight_service::BallistaFlightService;
Expand Down Expand Up @@ -82,6 +83,9 @@ pub struct ExecutorProcessConfig {
pub log_rotation_policy: LogRotationPolicy,
pub job_data_ttl_seconds: u64,
pub job_data_clean_up_interval_seconds: u64,
/// Optional execution engine used to execute physical plans. Defaults to
/// DataFusion if none is provided.
pub execution_engine: Option<Arc<dyn ExecutionEngine>>,
}

pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
Expand Down Expand Up @@ -181,6 +185,7 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
runtime,
metrics_collector,
concurrent_tasks,
opt.execution_engine,
));

let connect_timeout = opt.scheduler_connect_timeout_seconds as u64;
Expand Down
Loading

0 comments on commit 85031c4

Please sign in to comment.