From e95f322ca8634e93736dc9e66fc7a646f6a6335b Mon Sep 17 00:00:00 2001 From: yangjiang Date: Fri, 25 Feb 2022 15:37:27 +0800 Subject: [PATCH 1/6] Separate cpu-bound (query-execution) and IO-bound(heartbeat) work in diff tokio runtime in ballista_executor --- ballista/rust/executor/Cargo.toml | 4 + .../rust/executor/src/cpu_bound_executor.rs | 377 ++++++++++++++++++ ballista/rust/executor/src/executor_server.rs | 5 +- ballista/rust/executor/src/lib.rs | 2 + 4 files changed, 387 insertions(+), 1 deletion(-) create mode 100644 ballista/rust/executor/src/cpu_bound_executor.rs diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index 8f0d29e208d3..b19d472c8ff3 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -55,3 +55,7 @@ configure_me_codegen = "0.4.0" [package.metadata.configure_me.bin] executor = "executor_config_spec.toml" + +# use libc on unix like platforms to set worker priority in DedicatedExecutor +[target."cfg(unix)".dependencies.libc] +version = "0.2" \ No newline at end of file diff --git a/ballista/rust/executor/src/cpu_bound_executor.rs b/ballista/rust/executor/src/cpu_bound_executor.rs new file mode 100644 index 000000000000..b3fa271839b6 --- /dev/null +++ b/ballista/rust/executor/src/cpu_bound_executor.rs @@ -0,0 +1,377 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//Inspire by https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/ +//! This module contains a dedicated thread pool for running "cpu +//! intensive" workloads such as DataFusion plans + +//! This module contains a dedicated thread pool for running "cpu +//! intensive" workloads such as DataFusion plans + +use log::warn; +use parking_lot::Mutex; +use std::{pin::Pin, sync::Arc}; +use tokio::sync::oneshot::Receiver; + +use futures::Future; + +/// The type of thing that the dedicated executor runs +type Task = Pin + Send>>; + +/// Runs futures (and any `tasks` that are `tokio::task::spawned` by +/// them) on a separate tokio runtime +#[derive(Clone)] +pub struct DedicatedExecutor { + state: Arc>, +} + +/// Runs futures (and any `tasks` that are `tokio::task::spawned` by +/// them) on a separate tokio Executor +struct State { + /// The number of threads in this pool + num_threads: usize, + + /// The name of the threads for this executor + thread_name: String, + + /// Channel for requests -- the dedicated executor takes requests + /// from here and runs them. + requests: Option>, + + /// The thread that is doing the work + thread: Option>, +} + +/// The default worker priority (value passed to `libc::setpriority`); +const WORKER_PRIORITY: i32 = 10; + +impl std::fmt::Debug for DedicatedExecutor { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let state = self.state.lock(); + + let mut d = f.debug_struct("DedicatedExecutor"); + + d.field("num_threads", &state.num_threads) + .field("thread_name", &state.thread_name); + + if state.requests.is_some() { + d.field("requests", &"Some(...)") + } else { + d.field("requests", &"None") + }; + + if state.thread.is_some() { + d.field("thread", &"Some(...)") + } else { + d.field("thread", &"None") + }; + + d.finish() + } +} + +impl DedicatedExecutor { + /// https://stackoverflow.com/questions/62536566 + /// Creates a new `DedicatedExecutor` with a dedicated tokio + /// runtime that is separate from the `[tokio::main]` threadpool. + /// + /// The worker thread priority is set to low so that such tasks do + /// not starve other more important tasks (such as answering health checks) + /// + pub fn new(thread_name: impl Into, num_threads: usize) -> Self { + let thread_name = thread_name.into(); + let name_copy = thread_name.to_string(); + + let (tx, rx) = std::sync::mpsc::channel(); + + //Cannot create a seperated tokio runtime in another tokio runtime, + //So use std::thread to spawn a thread + let thread = std::thread::spawn(move || { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name(&name_copy) + .worker_threads(num_threads) + .on_thread_start(move || set_current_thread_priority(WORKER_PRIORITY)) + .build() + .expect("Creating tokio runtime"); + + // By entering the context, all calls to `tokio::spawn` go + // to this executor + let _guard = runtime.enter(); + + while let Ok(request) = rx.recv() { + // TODO feedback request status + tokio::task::spawn(request); + } + }); + + let state = State { + num_threads, + thread_name, + requests: Some(tx), + thread: Some(thread), + }; + + Self { + state: Arc::new(Mutex::new(state)), + } + } + + /// Runs the specified Future (and any tasks it spawns) on the + /// `DedicatedExecutor`. + /// + /// Currently all tasks are added to the tokio executor + /// immediately and compete for the threadpool's resources. + pub fn spawn(&self, task: T) -> Receiver + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + let (tx, rx) = tokio::sync::oneshot::channel(); + + // create a execution plan to spawn + let job = Box::pin(async move { + let task_output = task.await; + if tx.send(task_output).is_err() { + warn!("Spawned task output ignored: receiver dropped"); + } + }); + + let mut state = self.state.lock(); + + if let Some(requests) = &mut state.requests { + // would fail if someone has started shutdown + requests.send(job).ok(); + } else { + warn!("tried to schedule task on an executor that was shutdown"); + } + + rx + } + + /// signals shutdown of this executor and any Clones + #[allow(dead_code)] + pub fn shutdown(&self) { + // hang up the channel which will cause the dedicated thread + // to quit + let mut state = self.state.lock(); + // remaining job will still running + state.requests = None; + } + + /// Stops all subsequent task executions, and waits for the worker + /// thread to complete. Note this will shutdown all clones of this + /// `DedicatedExecutor` as well. + /// + /// Only the first one to `join` will actually wait for the + /// executing thread to complete. All other calls to join will + /// complete immediately. + #[allow(dead_code)] + pub fn join(&self) { + self.shutdown(); + + // take the thread out when mutex is held + let thread = { + let mut state = self.state.lock(); + state.thread.take() + }; + + // wait for completion while not holding the mutex to avoid + // deadlocks + if let Some(thread) = thread { + thread.join().ok(); + } + } +} + +#[cfg(unix)] +fn set_current_thread_priority(prio: i32) { + unsafe { libc::setpriority(0, 0, prio) }; +} + +#[cfg(not(unix))] +fn set_current_thread_priority(prio: i32) { + warn!("Setting worker thread priority not supported on this platform"); +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Arc, Barrier}; + + #[cfg(unix)] + fn get_current_thread_priority() -> i32 { + // on linux setpriority sets the current thread's priority + // (as opposed to the current process). + unsafe { libc::getpriority(0, 0) } + } + + #[cfg(not(unix))] + fn get_current_thread_priority() -> i32 { + WORKER_PRIORITY + } + + #[tokio::test] + async fn basic_test_in_diff_thread() { + let barrier = Arc::new(Barrier::new(2)); + + let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let dedicated_task = exec.spawn(do_work(42, Arc::clone(&barrier))); + + // Note the dedicated task will never complete if it runs on + // the main tokio thread + //#[tokio::test] will only create one thread, if we running use tokio spwan + // after call do_work with barrier.wait() the only thread will be blocked and never finished + barrier.wait(); + + // should be able to get the result + assert_eq!(dedicated_task.await.unwrap(), 42); + } + + #[tokio::test] + async fn basic_clone() { + let barrier = Arc::new(Barrier::new(2)); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let dedicated_task = exec.clone().spawn(do_work(42, Arc::clone(&barrier))); + barrier.wait(); + assert_eq!(dedicated_task.await.unwrap(), 42); + } + + #[tokio::test] + async fn multi_task() { + let barrier = Arc::new(Barrier::new(3)); + + // make an executor with two threads + let exec = DedicatedExecutor::new("Test DedicatedExecutor", 2); + let dedicated_task1 = exec.spawn(do_work(11, Arc::clone(&barrier))); + let dedicated_task2 = exec.spawn(do_work(42, Arc::clone(&barrier))); + + // block main thread until completion of other two tasks + barrier.wait(); + + // should be able to get the result + assert_eq!(dedicated_task1.await.unwrap(), 11); + assert_eq!(dedicated_task2.await.unwrap(), 42); + + exec.join(); + } + + #[tokio::test] + async fn worker_priority() { + let exec = DedicatedExecutor::new("Test DedicatedExecutor", 2); + + let dedicated_task = exec.spawn(async move { get_current_thread_priority() }); + + assert_eq!(dedicated_task.await.unwrap(), WORKER_PRIORITY); + } + + #[tokio::test] + async fn tokio_spawn() { + let exec = DedicatedExecutor::new("Test DedicatedExecutor", 2); + + // spawn a task that spawns to other tasks and ensure they run on the dedicated + // executor + let dedicated_task = exec.spawn(async move { + // spawn separate tasks + let t1 = tokio::task::spawn(async { + assert_eq!( + std::thread::current().name(), + Some("Test DedicatedExecutor") + ); + 25usize + }); + t1.await.unwrap() + }); + + assert_eq!(dedicated_task.await.unwrap(), 25); + } + + #[tokio::test] + async fn panic_on_executor() { + let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let dedicated_task = exec.spawn(async move { + panic!("At the disco, on the dedicated task scheduler"); + }); + + // should not be able to get the result + dedicated_task.await.unwrap_err(); + } + + #[tokio::test] + async fn executor_shutdown_while_task_running() { + let barrier = Arc::new(Barrier::new(2)); + + let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let dedicated_task = exec.spawn(do_work(42, Arc::clone(&barrier))); + + exec.shutdown(); + // block main thread until completion of the outstanding task + barrier.wait(); + + // task should complete successfully + assert_eq!(dedicated_task.await.unwrap(), 42); + } + + #[tokio::test] + async fn executor_submit_task_after_shutdown() { + let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + + // Simulate trying to submit tasks once executor has shutdown + exec.shutdown(); + let dedicated_task = exec.spawn(async { 11 }); + + // task should complete, but return an error + dedicated_task.await.unwrap_err(); + } + + #[tokio::test] + async fn executor_submit_task_after_clone_shutdown() { + let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + + // shutdown the clone (but not the exec) + exec.clone().join(); + + // Simulate trying to submit tasks once executor has shutdown + let dedicated_task = exec.spawn(async { 11 }); + + // task should complete, but return an error + dedicated_task.await.unwrap_err(); + } + + #[tokio::test] + async fn executor_join() { + let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + // test it doesn't hang + exec.join() + } + + #[tokio::test] + #[allow(clippy::redundant_clone)] + async fn executor_clone_join() { + let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + // test not hang + exec.clone().join(); + exec.clone().join(); + exec.join(); + } + + /// Wait for the barrier and then return `result` + async fn do_work(result: usize, barrier: Arc) -> usize { + barrier.wait(); + result + } +} diff --git a/ballista/rust/executor/src/executor_server.rs b/ballista/rust/executor/src/executor_server.rs index 0c065c3da688..3a865f9075e1 100644 --- a/ballista/rust/executor/src/executor_server.rs +++ b/ballista/rust/executor/src/executor_server.rs @@ -39,6 +39,7 @@ use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec}; use datafusion::physical_plan::ExecutionPlan; use crate::as_task_status; +use crate::cpu_bound_executor::DedicatedExecutor; use crate::executor::Executor; pub async fn startup( @@ -261,6 +262,8 @@ impl TaskRunnerPool TaskRunnerPool Date: Fri, 25 Feb 2022 16:15:14 +0800 Subject: [PATCH 2/6] fix fmt --- ballista/rust/executor/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballista/rust/executor/src/lib.rs b/ballista/rust/executor/src/lib.rs index c87894d61f52..bb6dfa69bbe2 100644 --- a/ballista/rust/executor/src/lib.rs +++ b/ballista/rust/executor/src/lib.rs @@ -23,8 +23,8 @@ pub mod executor; pub mod executor_server; pub mod flight_service; -mod standalone; mod cpu_bound_executor; +mod standalone; pub use standalone::new_standalone_executor; From fd2cb8feec9184d15e3d07add70242572f992622 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Fri, 25 Feb 2022 17:38:43 +0800 Subject: [PATCH 3/6] add line --- ballista/rust/executor/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index b19d472c8ff3..8ecd7a1d2fc9 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -58,4 +58,4 @@ executor = "executor_config_spec.toml" # use libc on unix like platforms to set worker priority in DedicatedExecutor [target."cfg(unix)".dependencies.libc] -version = "0.2" \ No newline at end of file +version = "0.2" From c65089da3f59f6d8f77f12bf67f7df8489a8b252 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Mon, 28 Feb 2022 10:27:53 +0800 Subject: [PATCH 4/6] add annotations. --- ballista/rust/executor/src/cpu_bound_executor.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ballista/rust/executor/src/cpu_bound_executor.rs b/ballista/rust/executor/src/cpu_bound_executor.rs index b3fa271839b6..d476d50fec21 100644 --- a/ballista/rust/executor/src/cpu_bound_executor.rs +++ b/ballista/rust/executor/src/cpu_bound_executor.rs @@ -33,7 +33,8 @@ use futures::Future; type Task = Pin + Send>>; /// Runs futures (and any `tasks` that are `tokio::task::spawned` by -/// them) on a separate tokio runtime +/// them) on a separate tokio runtime, like separate CPU-bound (execute a datafusion plan) tasks +/// from IO-bound tasks(heartbeats). Get more from the above blog. #[derive(Clone)] pub struct DedicatedExecutor { state: Arc>, From 189690bcdb88ec775d130bf4f815b82c3c18f42f Mon Sep 17 00:00:00 2001 From: yangjiang Date: Mon, 28 Feb 2022 10:30:39 +0800 Subject: [PATCH 5/6] fix --- ballista/rust/executor/src/cpu_bound_executor.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ballista/rust/executor/src/cpu_bound_executor.rs b/ballista/rust/executor/src/cpu_bound_executor.rs index d476d50fec21..243d5fe69bd6 100644 --- a/ballista/rust/executor/src/cpu_bound_executor.rs +++ b/ballista/rust/executor/src/cpu_bound_executor.rs @@ -16,11 +16,9 @@ // under the License. //Inspire by https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/ -//! This module contains a dedicated thread pool for running "cpu -//! intensive" workloads such as DataFusion plans //! This module contains a dedicated thread pool for running "cpu -//! intensive" workloads such as DataFusion plans +//! intensive" workloads as query plans use log::warn; use parking_lot::Mutex; From b80d2419f61bcb0258510037edfd61c03282ee69 Mon Sep 17 00:00:00 2001 From: Yang Jiang <37145547+Ted-Jiang@users.noreply.github.com> Date: Tue, 1 Mar 2022 10:15:52 +0800 Subject: [PATCH 6/6] add annotation Co-authored-by: Andrew Lamb --- ballista/rust/executor/src/executor_server.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ballista/rust/executor/src/executor_server.rs b/ballista/rust/executor/src/executor_server.rs index 3a865f9075e1..ad34634265bb 100644 --- a/ballista/rust/executor/src/executor_server.rs +++ b/ballista/rust/executor/src/executor_server.rs @@ -262,6 +262,8 @@ impl TaskRunnerPool