From 7e188014e9bafbe5d6d9b15c6e62ea7b4b879b27 Mon Sep 17 00:00:00 2001 From: folex <0xdxdy@gmail.com> Date: Mon, 22 May 2023 16:15:48 +0200 Subject: [PATCH] fix(async): move builtins call to blocking threadpool (#1621) * fix(async): move builtins call to blocking threadpool * fix(async): remove single_thread tokio runtime --- aquamarine/src/particle_functions.rs | 45 +++++++++-------- particle-node/src/main.rs | 72 +++++++++++++--------------- 2 files changed, 57 insertions(+), 60 deletions(-) diff --git a/aquamarine/src/particle_functions.rs b/aquamarine/src/particle_functions.rs index 11f7af6e6d..82032c1e76 100644 --- a/aquamarine/src/particle_functions.rs +++ b/aquamarine/src/particle_functions.rs @@ -26,6 +26,7 @@ use futures::{FutureExt, StreamExt}; use humantime::format_duration as pretty; use serde_json::json; use serde_json::Value as JValue; +use tokio::runtime::Handle; use particle_args::{Args, JError}; use particle_execution::{ @@ -159,27 +160,29 @@ impl Functions { "Call function {}:{}", args.service_id, args.function_name )) - .spawn(async move { - let outcome = builtins.call(args, params).await; - // record whether call was handled by builtin or not. needed for stats. - let mut call_kind = FunctionKind::Service; - let outcome = match outcome { - // If particle_function isn't set, just return what we have - outcome if particle_function.is_none() => outcome, - // If builtins weren't defined over these args, try particle_function - FunctionOutcome::NotDefined { args, params } => { - let func = particle_function.unwrap(); - // TODO: Actors would allow to get rid of Mutex - // i.e., wrap each callback with a queue & channel - let func = func.lock().await; - let outcome = func.call(args, params).await; - call_kind = FunctionKind::ParticleFunction; - outcome - } - // Builtins were called, return their outcome - outcome => outcome, - }; - (outcome, call_kind) + .spawn_blocking(|| { + Handle::current().block_on(async move { + let outcome = builtins.call(args, params).await; + // record whether call was handled by builtin or not. needed for stats. + let mut call_kind = FunctionKind::Service; + let outcome = match outcome { + // If particle_function isn't set, just return what we have + outcome if particle_function.is_none() => outcome, + // If builtins weren't defined over these args, try particle_function + FunctionOutcome::NotDefined { args, params } => { + let func = particle_function.unwrap(); + // TODO: Actors would allow to get rid of Mutex + // i.e., wrap each callback with a queue & channel + let func = func.lock().await; + let outcome = func.call(args, params).await; + call_kind = FunctionKind::ParticleFunction; + outcome + } + // Builtins were called, return their outcome + outcome => outcome, + }; + (outcome, call_kind) + }) }) .expect("Could not spawn task"); diff --git a/particle-node/src/main.rs b/particle-node/src/main.rs index 89a1ad2f7d..88405cef9d 100644 --- a/particle-node/src/main.rs +++ b/particle-node/src/main.rs @@ -54,8 +54,7 @@ trait Stoppable { #[global_allocator] static ALLOC: dhat::Alloc = dhat::Alloc; -#[tokio::main(flavor = "current_thread")] -async fn main() -> eyre::Result<()> { +fn main() -> eyre::Result<()> { #[cfg(feature = "dhat-heap")] let _profiler = dhat::Profiler::new_heap(); @@ -87,45 +86,40 @@ async fn main() -> eyre::Result<()> { ) } } - tracing_subscriber::registry() - .with(log_layer(&config.log)) - .with(tokio_console_layer(&config.console)?) - .with(tracing_layer(&config.tracing)?) - .init(); - - if let Some(true) = config.print_config { - log::info!("Loaded config: {:#?}", config); - } - - let config = config.resolve()?; - - let interpreter_path = to_abs_path(config.dir_config.air_interpreter_path.clone()); - write_default_air_interpreter(&interpreter_path)?; - log::info!("AIR interpreter: {:?}", interpreter_path); //TODO: add thread count configuration based on config - tokio::task::spawn_blocking(|| { - let result: eyre::Result<()> = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .thread_name("tokio") - .build() - .expect("Could not make tokio runtime") - .block_on(async { - let fluence = start_fluence(config).await?; - log::info!("Fluence has been successfully started."); - log::info!("Waiting for Ctrl-C to exit..."); - - signal::ctrl_c().await.expect("Failed to listen for event"); - log::info!("Shutting down..."); - - fluence.stop(); - Ok(()) - }); - result - }) - .await??; - - Ok(()) + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name("tokio") + .build() + .expect("Could not make tokio runtime") + .block_on(async { + tracing_subscriber::registry() + .with(log_layer(&config.log)) + .with(tokio_console_layer(&config.console)?) + .with(tracing_layer(&config.tracing)?) + .init(); + + if let Some(true) = config.print_config { + log::info!("Loaded config: {:#?}", config); + } + + let config = config.resolve()?; + + let interpreter_path = to_abs_path(config.dir_config.air_interpreter_path.clone()); + write_default_air_interpreter(&interpreter_path)?; + log::info!("AIR interpreter: {:?}", interpreter_path); + + let fluence = start_fluence(config).await?; + log::info!("Fluence has been successfully started."); + log::info!("Waiting for Ctrl-C to exit..."); + + signal::ctrl_c().await.expect("Failed to listen for event"); + log::info!("Shutting down..."); + + fluence.stop(); + Ok(()) + }) } // NOTE: to stop Fluence just call Stoppable::stop()