Skip to content

Commit

Permalink
fix(async): move builtins call to blocking threadpool (#1621)
Browse files Browse the repository at this point in the history
* fix(async): move builtins call to blocking threadpool

* fix(async): remove single_thread tokio runtime
  • Loading branch information
folex authored May 22, 2023
1 parent bdcd492 commit 7e18801
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 60 deletions.
45 changes: 24 additions & 21 deletions aquamarine/src/particle_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use futures::{FutureExt, StreamExt};
use humantime::format_duration as pretty;
use serde_json::json;
use serde_json::Value as JValue;
use tokio::runtime::Handle;

use particle_args::{Args, JError};
use particle_execution::{
Expand Down Expand Up @@ -159,27 +160,29 @@ impl<F: ParticleFunctionStatic> Functions<F> {
"Call function {}:{}",
args.service_id, args.function_name
))
.spawn(async move {
let outcome = builtins.call(args, params).await;
// record whether call was handled by builtin or not. needed for stats.
let mut call_kind = FunctionKind::Service;
let outcome = match outcome {
// If particle_function isn't set, just return what we have
outcome if particle_function.is_none() => outcome,
// If builtins weren't defined over these args, try particle_function
FunctionOutcome::NotDefined { args, params } => {
let func = particle_function.unwrap();
// TODO: Actors would allow to get rid of Mutex
// i.e., wrap each callback with a queue & channel
let func = func.lock().await;
let outcome = func.call(args, params).await;
call_kind = FunctionKind::ParticleFunction;
outcome
}
// Builtins were called, return their outcome
outcome => outcome,
};
(outcome, call_kind)
.spawn_blocking(|| {
Handle::current().block_on(async move {
let outcome = builtins.call(args, params).await;
// record whether call was handled by builtin or not. needed for stats.
let mut call_kind = FunctionKind::Service;
let outcome = match outcome {
// If particle_function isn't set, just return what we have
outcome if particle_function.is_none() => outcome,
// If builtins weren't defined over these args, try particle_function
FunctionOutcome::NotDefined { args, params } => {
let func = particle_function.unwrap();
// TODO: Actors would allow to get rid of Mutex
// i.e., wrap each callback with a queue & channel
let func = func.lock().await;
let outcome = func.call(args, params).await;
call_kind = FunctionKind::ParticleFunction;
outcome
}
// Builtins were called, return their outcome
outcome => outcome,
};
(outcome, call_kind)
})
})
.expect("Could not spawn task");

Expand Down
72 changes: 33 additions & 39 deletions particle-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ trait Stoppable {
#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

#[tokio::main(flavor = "current_thread")]
async fn main() -> eyre::Result<()> {
fn main() -> eyre::Result<()> {
#[cfg(feature = "dhat-heap")]
let _profiler = dhat::Profiler::new_heap();

Expand Down Expand Up @@ -87,45 +86,40 @@ async fn main() -> eyre::Result<()> {
)
}
}
tracing_subscriber::registry()
.with(log_layer(&config.log))
.with(tokio_console_layer(&config.console)?)
.with(tracing_layer(&config.tracing)?)
.init();

if let Some(true) = config.print_config {
log::info!("Loaded config: {:#?}", config);
}

let config = config.resolve()?;

let interpreter_path = to_abs_path(config.dir_config.air_interpreter_path.clone());
write_default_air_interpreter(&interpreter_path)?;
log::info!("AIR interpreter: {:?}", interpreter_path);

//TODO: add thread count configuration based on config
tokio::task::spawn_blocking(|| {
let result: eyre::Result<()> = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("tokio")
.build()
.expect("Could not make tokio runtime")
.block_on(async {
let fluence = start_fluence(config).await?;
log::info!("Fluence has been successfully started.");
log::info!("Waiting for Ctrl-C to exit...");

signal::ctrl_c().await.expect("Failed to listen for event");
log::info!("Shutting down...");

fluence.stop();
Ok(())
});
result
})
.await??;

Ok(())
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("tokio")
.build()
.expect("Could not make tokio runtime")
.block_on(async {
tracing_subscriber::registry()
.with(log_layer(&config.log))
.with(tokio_console_layer(&config.console)?)
.with(tracing_layer(&config.tracing)?)
.init();

if let Some(true) = config.print_config {
log::info!("Loaded config: {:#?}", config);
}

let config = config.resolve()?;

let interpreter_path = to_abs_path(config.dir_config.air_interpreter_path.clone());
write_default_air_interpreter(&interpreter_path)?;
log::info!("AIR interpreter: {:?}", interpreter_path);

let fluence = start_fluence(config).await?;
log::info!("Fluence has been successfully started.");
log::info!("Waiting for Ctrl-C to exit...");

signal::ctrl_c().await.expect("Failed to listen for event");
log::info!("Shutting down...");

fluence.stop();
Ok(())
})
}

// NOTE: to stop Fluence just call Stoppable::stop()
Expand Down

0 comments on commit 7e18801

Please sign in to comment.