Skip to content

Commit

Permalink
fix(spells): Allow parallel execution of immut service functions [NET…
Browse files Browse the repository at this point in the history
…-331] (#1430)

* Allow parallel execution of immutable service functions
  • Loading branch information
kmd-fl authored Jan 26, 2023
1 parent 2b018de commit e9a05d6
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 63 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions aquamarine/src/particle_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ impl<F: ParticleFunctionStatic> Functions<F> {
let func = particle_function.unwrap();
// TODO: Actors would allow to get rid of Mutex
// i.e., wrap each callback with a queue & channel
let mut func = func.lock();
let outcome = func(args, params).await;
let func = func.lock();
let outcome = func.call(args, params).await;
call_kind = FunctionKind::ParticleFunction;
outcome
}
Expand Down
7 changes: 6 additions & 1 deletion crates/builtins-deployer/src/builtins_deployer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use futures::executor::block_on;
use futures::FutureExt;
use humantime::format_duration as pretty;
use maplit::hashmap;
use parking_lot::Mutex;
use serde_json::{json, Value as JValue};

use aquamarine::AquamarineApi;
Expand All @@ -33,6 +34,7 @@ use fs_utils::list_files;
use fs_utils::{file_name, to_abs_path};
use local_vm::{client_functions, wrap_script};
use now_millis::now_ms;
use particle_execution::ServiceFunction;
use particle_protocol::Particle;
use uuid_utils::uuid;

Expand Down Expand Up @@ -118,7 +120,10 @@ impl BuiltinsDeployer {
let future = async move {
try {
aquamarine
.execute(particle, Some(Box::new(closure)))
.execute(
particle,
Some(ServiceFunction::Mut(Mutex::new(Box::new(closure)))),
)
.await?;

let result = inlet.await;
Expand Down
4 changes: 2 additions & 2 deletions crates/created-swarm/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use serde_json::json;

pub fn add_print<'a>(swarms: impl Iterator<Item = &'a mut CreatedSwarm>) {
let print = |peer_id: PeerId| -> Box<
dyn FnMut(_, _) -> BoxFuture<'static, FunctionOutcome> + 'static + Send + Sync,
dyn Fn(_, _) -> BoxFuture<'static, FunctionOutcome> + 'static + Send + Sync,
> {
Box::new(move |args: particle_args::Args, _| {
async move {
Expand All @@ -24,7 +24,7 @@ pub fn add_print<'a>(swarms: impl Iterator<Item = &'a mut CreatedSwarm>) {
task::block_on(s.aquamarine_api.clone().add_service(
"test".into(),
hashmap! {
"print".to_string() => print(s.peer_id)
"print".to_string() => print(s.peer_id).into(),
},
))
.expect("add service");
Expand Down
33 changes: 16 additions & 17 deletions crates/particle-node-tests/tests/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,19 @@ fn call_custom_service() {
// pub type ServiceFunction =
// Box<dyn FnMut(Args, ParticleParams) -> Output<'static> + 'static + Send + Sync>;

let closure: Box<
dyn FnMut(_, _) -> BoxFuture<'static, FunctionOutcome> + 'static + Send + Sync,
> = Box::new(move |args, params| {
async move {
println!("got call!!! {args:?} {params:?}");
FunctionOutcome::Ok(json!("hello!"))
}
.boxed()
});
let closure: Box<dyn Fn(_, _) -> BoxFuture<'static, FunctionOutcome> + 'static + Send + Sync> =
Box::new(move |args, params| {
async move {
println!("got call!!! {args:?} {params:?}");
FunctionOutcome::Ok(json!("hello!"))
}
.boxed()
});

let add_first_f = swarms[0]
.aquamarine_api
.clone()
.add_service("hello".into(), hashmap! { "world".to_string() => closure });
let add_first_f = swarms[0].aquamarine_api.clone().add_service(
"hello".into(),
hashmap! { "world".to_string() => closure.into() },
);

let (outlet, inlet) = channel();
let mut outlet = Some(outlet);
Expand All @@ -70,10 +69,10 @@ fn call_custom_service() {
.boxed()
});

let add_second_f = swarms[1]
.aquamarine_api
.clone()
.add_service("op".into(), hashmap! { "return".to_string() => closure });
let add_second_f = swarms[1].aquamarine_api.clone().add_service(
"op".into(),
hashmap! { "return".to_string() => closure.into() },
);

let script = format!(
r#"
Expand Down
9 changes: 4 additions & 5 deletions particle-builtins/src/builtins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use futures::StreamExt;
use humantime_serde::re::humantime::format_duration as pretty;
use libp2p::{core::Multiaddr, kad::kbucket::Key, kad::K_VALUE, PeerId};
use multihash::{Code, MultihashDigest, MultihashGeneric};
use parking_lot::{Mutex, RwLock};
use parking_lot::RwLock;
use serde::Deserialize;
use serde_json::{json, Value as JValue};
use JValue::Array;
Expand Down Expand Up @@ -58,9 +58,9 @@ use crate::{json, math};

pub struct CustomService {
/// (function_name -> service function)
pub functions: HashMap<String, Mutex<ServiceFunction>>,
pub functions: HashMap<String, ServiceFunction>,
/// if set, all `function_name` mismatches with `custom_service.functions` will be routed to `unhandled`
pub unhandled: Option<Mutex<ServiceFunction>>,
pub unhandled: Option<ServiceFunction>,
}

#[derive(Derivative)]
Expand Down Expand Up @@ -156,8 +156,7 @@ where
.or(fs.unhandled.as_ref())
})
{
let mut function = function.lock();
async_std::task::block_on(function(args, particle))
async_std::task::block_on(function.call(args, particle))
} else {
FunctionOutcome::NotDefined {
args,
Expand Down
21 changes: 6 additions & 15 deletions particle-builtins/src/particle_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/

use futures::FutureExt;
use parking_lot::Mutex;
use std::collections::HashMap;

use connection_pool::ConnectionPoolApi;
Expand Down Expand Up @@ -45,11 +44,8 @@ where
self.custom_services.write().insert(
service,
CustomService {
functions: functions
.into_iter()
.map(|(k, v)| (k, Mutex::new(v)))
.collect(),
unhandled: unhandled.map(Mutex::new),
functions,
unhandled,
},
);
}
Expand All @@ -58,14 +54,9 @@ where
&self,
service: &str,
) -> Option<(HashMap<String, ServiceFunction>, Option<ServiceFunction>)> {
self.custom_services.write().remove(service).map(|hm| {
(
hm.functions
.into_iter()
.map(|(k, v)| (k, v.into_inner()))
.collect(),
hm.unhandled.map(Mutex::into_inner),
)
})
self.custom_services
.write()
.remove(service)
.map(|hm| (hm.functions, hm.unhandled))
}
}
1 change: 1 addition & 0 deletions particle-execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ thiserror = { workspace = true }
futures = { workspace = true }
serde_json = { workspace = true }
async-std = { workspace = true }
parking_lot = { workspace = true }
2 changes: 1 addition & 1 deletion particle-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
pub use function_outcome::FunctionOutcome;
pub use particle_function::{
Output as ParticleFunctionOutput, ParticleFunction, ParticleFunctionMut,
ParticleFunctionStatic, ServiceFunction,
ParticleFunctionStatic, ServiceFunction, ServiceFunctionImmut, ServiceFunctionMut,
};
pub use particle_params::ParticleParams;
pub use particle_vault::{ParticleVault, VaultError};
Expand Down
33 changes: 32 additions & 1 deletion particle-execution/src/particle_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,39 @@ use crate::{FunctionOutcome, ParticleParams};

pub type Output<'a> = BoxFuture<'a, FunctionOutcome>;

pub type ServiceFunction =
pub type ServiceFunctionMut =
Box<dyn FnMut(Args, ParticleParams) -> Output<'static> + 'static + Send + Sync>;
pub type ServiceFunctionImmut =
Box<dyn Fn(Args, ParticleParams) -> Output<'static> + 'static + Send + Sync>;

pub enum ServiceFunction {
Mut(parking_lot::Mutex<ServiceFunctionMut>),
Immut(ServiceFunctionImmut),
}

impl ServiceFunction {
pub fn call(&self, args: Args, particle: ParticleParams) -> Output<'static> {
match self {
ServiceFunction::Mut(f) => {
let mut func = f.lock();
func(args, particle)
}
ServiceFunction::Immut(f) => f(args, particle),
}
}
}

impl From<ServiceFunctionImmut> for ServiceFunction {
fn from(f: ServiceFunctionImmut) -> Self {
ServiceFunction::Immut(f)
}
}

impl From<ServiceFunctionMut> for ServiceFunction {
fn from(f: ServiceFunctionMut) -> Self {
ServiceFunction::Mut(parking_lot::Mutex::new(f))
}
}

pub trait ParticleFunction: 'static + Send + Sync {
fn call(&self, args: Args, particle: ParticleParams) -> Output<'_>;
Expand Down
38 changes: 19 additions & 19 deletions sorcerer/src/sorcerer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use fluence_libp2p::types::Inlet;
use key_manager::KeyManager;
use particle_args::JError;
use particle_builtins::{wrap, wrap_unit};
use particle_execution::ServiceFunction;
use particle_execution::{ServiceFunction, ServiceFunctionImmut};
use particle_modules::ModuleRepository;
use particle_services::ParticleAppServices;
use server_config::ResolvedConfig;
Expand Down Expand Up @@ -153,36 +153,36 @@ impl Sorcerer {
let mut spell_builtins: Vec<SpellBuiltin> = vec![];

let mut spell_service = SpellBuiltin::new("spell");
spell_service.append("install", self.make_spell_install_closure());
spell_service.append("remove", self.make_spell_remove_closure());
spell_service.append("list", self.make_spell_list_closure());
spell_service.append("install", self.make_spell_install_closure().into());
spell_service.append("remove", self.make_spell_remove_closure().into());
spell_service.append("list", self.make_spell_list_closure().into());
spell_service.append(
"update_trigger_config",
self.make_spell_update_config_closure(),
self.make_spell_update_config_closure().into(),
);
spell_builtins.push(spell_service);

let mut get_data_srv = SpellBuiltin::new("getDataSrv");
get_data_srv.append("spell_id", self.make_get_spell_id_closure());
get_data_srv.set_unhandled(self.make_get_spell_arg_closure());
get_data_srv.append("spell_id", self.make_get_spell_id_closure().into());
get_data_srv.set_unhandled(self.make_get_spell_arg_closure().into());
spell_builtins.push(get_data_srv);

let mut error_handler_srv = SpellBuiltin::new("errorHandlingSrv");
error_handler_srv.append("error", self.make_error_handler_closure());
error_handler_srv.append("error", self.make_error_handler_closure().into());
spell_builtins.push(error_handler_srv);

let mut callback_srv = SpellBuiltin::new("callbackSrv");
callback_srv.append("response", self.make_response_handler_closure());
callback_srv.append("response", self.make_response_handler_closure().into());
spell_builtins.push(callback_srv);

let mut scope_srv = SpellBuiltin::new("scope");
scope_srv.append("get_peer_id", self.make_get_scope_peer_id_closure());
scope_srv.append("get_peer_id", self.make_get_scope_peer_id_closure().into());
spell_builtins.push(scope_srv);

spell_builtins
}

fn make_spell_install_closure(&self) -> ServiceFunction {
fn make_spell_install_closure(&self) -> ServiceFunctionImmut {
let services = self.services.clone();
let storage = self.spell_storage.clone();
let spell_event_bus = self.spell_event_bus_api.clone();
Expand All @@ -209,7 +209,7 @@ impl Sorcerer {
})
}

fn make_spell_remove_closure(&self) -> ServiceFunction {
fn make_spell_remove_closure(&self) -> ServiceFunctionImmut {
let services = self.services.clone();
let storage = self.spell_storage.clone();
let spell_event_bus_api = self.spell_event_bus_api.clone();
Expand All @@ -227,15 +227,15 @@ impl Sorcerer {
})
}

fn make_spell_list_closure(&self) -> ServiceFunction {
fn make_spell_list_closure(&self) -> ServiceFunctionImmut {
let storage = self.spell_storage.clone();
Box::new(move |_, _| {
let storage = storage.clone();
async move { wrap(spell_list(storage)) }.boxed()
})
}

fn make_spell_update_config_closure(&self) -> ServiceFunction {
fn make_spell_update_config_closure(&self) -> ServiceFunctionImmut {
let api = self.spell_event_bus_api.clone();
let services = self.services.clone();
let key_manager = self.key_manager.clone();
Expand All @@ -247,35 +247,35 @@ impl Sorcerer {
})
}

fn make_get_spell_id_closure(&self) -> ServiceFunction {
fn make_get_spell_id_closure(&self) -> ServiceFunctionImmut {
Box::new(move |_, params| async move { wrap(get_spell_id(params)) }.boxed())
}

fn make_get_spell_arg_closure(&self) -> ServiceFunction {
fn make_get_spell_arg_closure(&self) -> ServiceFunctionImmut {
let services = self.services.clone();
Box::new(move |args, params| {
let services = services.clone();
async move { wrap(get_spell_arg(args, params, services)) }.boxed()
})
}

fn make_error_handler_closure(&self) -> ServiceFunction {
fn make_error_handler_closure(&self) -> ServiceFunctionImmut {
let services = self.services.clone();
Box::new(move |args, params| {
let services = services.clone();
async move { wrap_unit(store_error(args, params, services)) }.boxed()
})
}

fn make_response_handler_closure(&self) -> ServiceFunction {
fn make_response_handler_closure(&self) -> ServiceFunctionImmut {
let services = self.services.clone();
Box::new(move |args, params| {
let services = services.clone();
async move { wrap_unit(store_response(args, params, services)) }.boxed()
})
}

fn make_get_scope_peer_id_closure(&self) -> ServiceFunction {
fn make_get_scope_peer_id_closure(&self) -> ServiceFunctionImmut {
let key_manager = self.key_manager.clone();
Box::new(move |_, params| {
let key_manager = key_manager.clone();
Expand Down

0 comments on commit e9a05d6

Please sign in to comment.