Separate cpu-bound (query-execution) and IO-bound (heartbeat) to … #1883

Merged · 6 commits · Mar 1, 2022

4 changes: 4 additions & 0 deletions ballista/rust/executor/Cargo.toml
@@ -55,3 +55,7 @@ configure_me_codegen = "0.4.0"

[package.metadata.configure_me.bin]
executor = "executor_config_spec.toml"

# use libc on unix like platforms to set worker priority in DedicatedExecutor
[target."cfg(unix)".dependencies.libc]
version = "0.2"
376 changes: 376 additions & 0 deletions ballista/rust/executor/src/cpu_bound_executor.rs
@@ -0,0 +1,376 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Inspired by https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/

//! This module contains a dedicated thread pool for running "CPU
//! intensive" workloads such as query plans

use log::warn;
use parking_lot::Mutex;
use std::{pin::Pin, sync::Arc};
use tokio::sync::oneshot::Receiver;

use futures::Future;

/// The type of thing that the dedicated executor runs
type Task = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Runs futures (and any `tasks` that are `tokio::task::spawn`ed by
/// them) on a separate tokio runtime, separating CPU-bound tasks (such as
/// executing a DataFusion plan) from IO-bound tasks (such as heartbeats).
/// See the blog post linked above for more detail.
#[derive(Clone)]
pub struct DedicatedExecutor {
state: Arc<Mutex<State>>,
}

/// Shared state: the request channel and worker thread backing a
/// `DedicatedExecutor` and all of its clones
struct State {
/// The number of threads in this pool
num_threads: usize,

/// The name of the threads for this executor
thread_name: String,

/// Channel for requests -- the dedicated executor takes requests
/// from here and runs them.
requests: Option<std::sync::mpsc::Sender<Task>>,

/// The thread that is doing the work
thread: Option<std::thread::JoinHandle<()>>,
}

/// The default worker priority (value passed to `libc::setpriority`).
const WORKER_PRIORITY: i32 = 10;

impl std::fmt::Debug for DedicatedExecutor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let state = self.state.lock();

let mut d = f.debug_struct("DedicatedExecutor");

d.field("num_threads", &state.num_threads)
.field("thread_name", &state.thread_name);

if state.requests.is_some() {
d.field("requests", &"Some(...)")
} else {
d.field("requests", &"None")
};

if state.thread.is_some() {
d.field("thread", &"Some(...)")
} else {
d.field("thread", &"None")
};

d.finish()
}
}

impl DedicatedExecutor {
/// Creates a new `DedicatedExecutor` with a dedicated tokio
/// runtime that is separate from the `[tokio::main]` threadpool
/// (see https://stackoverflow.com/questions/62536566).
///
/// The worker thread priority is set to low so that such tasks do
/// not starve other more important tasks (such as answering health checks)
///
pub fn new(thread_name: impl Into<String>, num_threads: usize) -> Self {
Contributor:

FWIW I think @Darksonn and others have noted that a tokio Handle might also be able to be used here https://docs.rs/tokio/1.17.0/tokio/runtime/struct.Handle.html
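For illustration, a minimal sketch of that Handle-based alternative (not part of this PR; `spawn_dedicated_runtime` is a hypothetical helper): build the runtime on its own thread, send a `Handle` back, and let callers spawn onto it directly instead of pushing boxed futures through a channel.

use std::future;
use tokio::runtime::Handle;

/// Hypothetical: builds a runtime on a dedicated thread and returns a
/// Handle that can be used to spawn tasks onto it from anywhere.
fn spawn_dedicated_runtime(
    thread_name: &str,
    num_threads: usize,
) -> (Handle, std::thread::JoinHandle<()>) {
    let name = thread_name.to_string();
    let (tx, rx) = std::sync::mpsc::channel();
    let thread = std::thread::spawn(move || {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .thread_name(name)
            .worker_threads(num_threads)
            .build()
            .expect("Creating tokio runtime");
        // hand a Handle back to the creating thread, then park this
        // thread on a never-ready future so the runtime stays alive
        tx.send(runtime.handle().clone()).expect("receiver alive");
        runtime.block_on(future::pending::<()>());
    });
    let handle = rx.recv().expect("runtime thread started");
    (handle, thread)
}

With this shape, `handle.spawn(task)` returns tokio's own `JoinHandle` instead of a oneshot `Receiver`, at the cost of needing a separate mechanism to shut the runtime down.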

let thread_name = thread_name.into();
let name_copy = thread_name.to_string();

let (tx, rx) = std::sync::mpsc::channel();

// Cannot create a separate tokio runtime from within another tokio runtime,
// so use std::thread to spawn a dedicated thread for it
let thread = std::thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name(&name_copy)
.worker_threads(num_threads)
.on_thread_start(move || set_current_thread_priority(WORKER_PRIORITY))
.build()
.expect("Creating tokio runtime");

// By entering the context, all calls to `tokio::spawn` go
// to this executor
let _guard = runtime.enter();

while let Ok(request) = rx.recv() {
// TODO: feed request status back to the caller
tokio::task::spawn(request);
}
});

let state = State {
num_threads,
thread_name,
requests: Some(tx),
thread: Some(thread),
};

Self {
state: Arc::new(Mutex::new(state)),
}
}

/// Runs the specified Future (and any tasks it spawns) on the
/// `DedicatedExecutor`.
///
/// Currently all tasks are added to the tokio executor
/// immediately and compete for the threadpool's resources.
pub fn spawn<T>(&self, task: T) -> Receiver<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (tx, rx) = tokio::sync::oneshot::channel();

// wrap the task so its output is sent back over the oneshot channel
let job = Box::pin(async move {
let task_output = task.await;
if tx.send(task_output).is_err() {
warn!("Spawned task output ignored: receiver dropped");
}
});

let mut state = self.state.lock();

if let Some(requests) = &mut state.requests {
// would fail if someone has started shutdown
requests.send(job).ok();
} else {
warn!("tried to schedule task on an executor that was shutdown");
}

rx
}

/// Signals shutdown of this executor and any of its clones.
#[allow(dead_code)]
Contributor:

I wonder if this #allow is really necessary?

Member Author:

This is to fix a clippy warning. In the future, once closing the executor is actually implemented, this will be deleted.

pub fn shutdown(&self) {
// hang up the channel which will cause the dedicated thread
// to quit
let mut state = self.state.lock();
// any jobs already submitted will still run to completion
state.requests = None;
}

/// Stops all subsequent task executions, and waits for the worker
/// thread to complete. Note this will shutdown all clones of this
/// `DedicatedExecutor` as well.
///
/// Only the first one to `join` will actually wait for the
/// executing thread to complete. All other calls to join will
/// complete immediately.
#[allow(dead_code)]
pub fn join(&self) {
self.shutdown();

// take the thread out when mutex is held
let thread = {
let mut state = self.state.lock();
state.thread.take()
};

// wait for completion while not holding the mutex to avoid
// deadlocks
if let Some(thread) = thread {
thread.join().ok();
}
}
}

#[cfg(unix)]
fn set_current_thread_priority(prio: i32) {
unsafe { libc::setpriority(0, 0, prio) };
}

#[cfg(not(unix))]
fn set_current_thread_priority(_prio: i32) {
warn!("Setting worker thread priority is not supported on this platform");
}

#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Barrier};

#[cfg(unix)]
fn get_current_thread_priority() -> i32 {
// on Linux, getpriority/setpriority with zero arguments operate on
// the current thread (as opposed to the current process).
unsafe { libc::getpriority(0, 0) }
}

#[cfg(not(unix))]
fn get_current_thread_priority() -> i32 {
WORKER_PRIORITY
}

#[tokio::test]
async fn basic_test_in_diff_thread() {
let barrier = Arc::new(Barrier::new(2));

let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
let dedicated_task = exec.spawn(do_work(42, Arc::clone(&barrier)));

// Note the dedicated task would never complete if it ran on the main
// tokio thread: #[tokio::test] creates a single-threaded runtime, so if
// do_work were spawned with tokio::spawn, its barrier.wait() would
// block the only thread and never finish.
barrier.wait();

// should be able to get the result
assert_eq!(dedicated_task.await.unwrap(), 42);
}

#[tokio::test]
async fn basic_clone() {
let barrier = Arc::new(Barrier::new(2));
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
let dedicated_task = exec.clone().spawn(do_work(42, Arc::clone(&barrier)));
barrier.wait();
assert_eq!(dedicated_task.await.unwrap(), 42);
}

#[tokio::test]
async fn multi_task() {
let barrier = Arc::new(Barrier::new(3));

// make an executor with two threads
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 2);
let dedicated_task1 = exec.spawn(do_work(11, Arc::clone(&barrier)));
let dedicated_task2 = exec.spawn(do_work(42, Arc::clone(&barrier)));

// block the main thread until the other two tasks reach the barrier
barrier.wait();

// should be able to get the result
assert_eq!(dedicated_task1.await.unwrap(), 11);
assert_eq!(dedicated_task2.await.unwrap(), 42);

exec.join();
}

#[tokio::test]
async fn worker_priority() {
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 2);

let dedicated_task = exec.spawn(async move { get_current_thread_priority() });

assert_eq!(dedicated_task.await.unwrap(), WORKER_PRIORITY);
}

#[tokio::test]
async fn tokio_spawn() {
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 2);

// spawn a task that spawns to other tasks and ensure they run on the dedicated
// executor
let dedicated_task = exec.spawn(async move {
// spawn separate tasks
let t1 = tokio::task::spawn(async {
assert_eq!(
std::thread::current().name(),
Some("Test DedicatedExecutor")
);
25usize
});
t1.await.unwrap()
});

assert_eq!(dedicated_task.await.unwrap(), 25);
}

#[tokio::test]
async fn panic_on_executor() {
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
let dedicated_task = exec.spawn(async move {
panic!("At the disco, on the dedicated task scheduler");
});

// should not be able to get the result
dedicated_task.await.unwrap_err();
}

#[tokio::test]
async fn executor_shutdown_while_task_running() {
let barrier = Arc::new(Barrier::new(2));

let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
let dedicated_task = exec.spawn(do_work(42, Arc::clone(&barrier)));

exec.shutdown();
// block main thread until completion of the outstanding task
barrier.wait();

// task should complete successfully
assert_eq!(dedicated_task.await.unwrap(), 42);
}

#[tokio::test]
async fn executor_submit_task_after_shutdown() {
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);

// Simulate trying to submit tasks once executor has shutdown
exec.shutdown();
let dedicated_task = exec.spawn(async { 11 });

// task should complete, but return an error
dedicated_task.await.unwrap_err();
}

#[tokio::test]
async fn executor_submit_task_after_clone_shutdown() {
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);

// shutdown the clone (but not the exec)
exec.clone().join();

// Simulate trying to submit tasks once executor has shutdown
let dedicated_task = exec.spawn(async { 11 });

// task should complete, but return an error
dedicated_task.await.unwrap_err();
}

#[tokio::test]
async fn executor_join() {
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
// test it doesn't hang
exec.join()
}

#[tokio::test]
#[allow(clippy::redundant_clone)]
async fn executor_clone_join() {
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
// test it doesn't hang
exec.clone().join();
exec.clone().join();
exec.join();
}

/// Wait for the barrier and then return `result`
async fn do_work(result: usize, barrier: Arc<Barrier>) -> usize {
barrier.wait();
result
}
}
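For context, a minimal usage sketch of the pattern this PR enables (a hypothetical call site, not part of this diff): CPU-bound work goes to the dedicated pool while the caller's runtime stays free for IO-bound work such as heartbeats.

// hypothetical call site: the summation is a stand-in for executing a
// DataFusion plan
async fn run_cpu_bound_work() -> u64 {
    let cpu_pool = DedicatedExecutor::new("query-execution", 4);

    let dedicated_task = cpu_pool.spawn(async move {
        // CPU-intensive work runs on the low-priority dedicated pool
        (0..10_000_000u64).sum::<u64>()
    });

    // only the lightweight oneshot receive runs on the caller's runtime,
    // so heartbeats and other IO-bound tasks are never starved
    dedicated_task
        .await
        .expect("task panicked or executor shut down")
}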