Skip to content

Commit

Permalink
feat(spells): store last trigger by trigger key in KV [NET-511] (#1728
Browse files Browse the repository at this point in the history
)
  • Loading branch information
justprosh authored Jul 31, 2023
1 parent ba2c9d1 commit 7272040
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 69 deletions.
56 changes: 16 additions & 40 deletions crates/nox-tests/tests/spells.rs
Original file line number Diff line number Diff line change
Expand Up @@ -901,12 +901,9 @@ async fn spell_timer_trigger_mailbox_test() {
(seq
(seq
(call %init_peer_id% ("getDataSrv" "spell_id") [] spell_id)
(call %init_peer_id% (spell_id "pop_mailbox") [] result)
)
(seq
(call %init_peer_id% ("json" "parse") [result.$.message.[0].message] obj)
(call "{}" ("return" "") [obj])
(call %init_peer_id% ("getDataSrv" "trigger") [] trigger)
)
(call "{}" ("return" "") [trigger])
)
"#,
client.peer_id
Expand All @@ -931,7 +928,7 @@ async fn spell_timer_trigger_mailbox_test() {
}

#[tokio::test]
async fn spell_connection_pool_trigger_mailbox_test() {
async fn spell_connection_pool_trigger_test() {
let swarms = make_swarms(1).await;
let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
.await
Expand All @@ -945,14 +942,11 @@ async fn spell_connection_pool_trigger_mailbox_test() {
(seq
(call %init_peer_id% ("getDataSrv" "spell_id") [] spell_id)
(seq
(call %init_peer_id% (spell_id "pop_mailbox") [] trigger)
(call %init_peer_id% ("run-console" "print") ["pop mailbox, trigger:" trigger])
(call %init_peer_id% ("getDataSrv" "trigger") [] trigger)
(call %init_peer_id% ("run-console" "print") ["getDataSrv, trigger:" trigger])
)
)
(seq
(call %init_peer_id% ("json" "parse") [trigger.$.message.[0].message] obj)
(call "{}" ("return" "") [obj])
)
(call "{}" ("return" "") [trigger])
)
(call %init_peer_id% ("run-console" "print") ["herror" %last_error%])
)
Expand Down Expand Up @@ -1102,9 +1096,9 @@ async fn spell_update_config() {
r#"(seq
(seq
(call %init_peer_id% ("getDataSrv" "spell_id") [] spell_id)
(call %init_peer_id% (spell_id "pop_mailbox") [] result)
(call %init_peer_id% ("getDataSrv" "trigger") [] trigger)
)
(call "{}" ("return" "") [result])
(call "{}" ("return" "") [trigger])
)"#,
client.peer_id
);
Expand All @@ -1115,20 +1109,14 @@ async fn spell_update_config() {
.await
.unwrap();

if let [JValue::Object(x)] = client
if let [trigger] = client
.receive_args()
.await
.wrap_err("receive")
.unwrap()
.as_slice()
{
assert_eq!(x["absent"], JValue::Bool(false), "spell must be triggered");
let message = x["message"].as_array().unwrap()[0]
.as_object()
.cloned()
.unwrap();
let info: TriggerInfoAqua =
serde_json::from_str(message["message"].as_str().unwrap()).unwrap();
let info: TriggerInfoAqua = serde_json::from_str(&trigger.to_string()).unwrap();
let info: TriggerInfo = info.into();
assert_matches!(info, TriggerInfo::Peer(p) if p.connected, "spell must be triggered by the `connected` event");
} else {
Expand Down Expand Up @@ -1171,20 +1159,14 @@ async fn spell_update_config() {

drop(connected);

if let [JValue::Object(x)] = client
if let [trigger] = client
.receive_args()
.await
.wrap_err("receive")
.unwrap()
.as_slice()
{
assert_eq!(x["absent"], JValue::Bool(false), "spell must be triggered");
let message = x["message"].as_array().unwrap()[0]
.as_object()
.cloned()
.unwrap();
let info: TriggerInfoAqua =
serde_json::from_str(message["message"].as_str().unwrap()).unwrap();
let info: TriggerInfoAqua = serde_json::from_str(&trigger.to_string()).unwrap();
let info: TriggerInfo = info.into();
assert_matches!(info, TriggerInfo::Peer(p) if !p.connected, "spell must be triggered by the `disconnected` event");
} else {
Expand All @@ -1204,9 +1186,9 @@ async fn spell_update_config_stopped_spell() {
r#"(seq
(seq
(call %init_peer_id% ("getDataSrv" "spell_id") [] spell_id)
(call %init_peer_id% (spell_id "pop_mailbox") [] result)
(call %init_peer_id% ("getDataSrv" "trigger") [] trigger)
)
(call "{}" ("return" "") [result])
(call "{}" ("return" "") [trigger])
)"#,
client.peer_id
);
Expand Down Expand Up @@ -1248,20 +1230,14 @@ async fn spell_update_config_stopped_spell() {
};
assert_eq!(result, "done", "spell must be updated");

if let [JValue::Object(x)] = client
if let [trigger] = client
.receive_args()
.await
.wrap_err("receive")
.unwrap()
.as_slice()
{
assert_eq!(x["absent"], JValue::Bool(false), "spell must be triggered");
let message = x["message"].as_array().unwrap()[0]
.as_object()
.cloned()
.unwrap();
let info: TriggerInfoAqua =
serde_json::from_str(message["message"].as_str().unwrap()).unwrap();
let info: TriggerInfoAqua = serde_json::from_str(&trigger.to_string()).unwrap();
let info: TriggerInfo = info.into();
assert_matches!(
info,
Expand Down
42 changes: 13 additions & 29 deletions sorcerer/src/script_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,38 +127,22 @@ impl Sorcerer {
event: TriggerEvent,
worker_id: PeerId,
) -> Result<(), JError> {
log::info!("storing trigger {:?}", event);
let serialized_event = serde_json::to_string(&TriggerInfoAqua::from(event.info))?;

let func_outcome = self.services.call_function(
worker_id,
process_func_outcome::<UnitValue>(
self.services.call_function(
worker_id,
&event.spell_id,
"set_string",
vec![json!("trigger"), json!(serialized_event)],
None,
worker_id,
self.spell_script_particle_ttl,
),
&event.spell_id,
"push_mailbox",
vec![json!(serialized_event)],
None,
worker_id,
self.spell_script_particle_ttl,
);

match process_func_outcome::<UnitValue>(func_outcome, &event.spell_id, "push_mailbox") {
Ok(_) => Ok(()),
Err(err) => {
log::warn!("Error on push_mailbox for spell {}: {}. Trying a fallback with list_push_string", event.spell_id, err);

// fallback for older spell versions
let func_outcome = self.services.call_function(
worker_id,
&event.spell_id,
"list_push_string",
vec![json!("trigger_mailbox"), json!(serialized_event)],
None,
worker_id,
self.spell_script_particle_ttl,
);
process_func_outcome::<UnitValue>(func_outcome, &event.spell_id, "list_push_string")
.map(drop)
}
}
"set_string",
)
.map(drop)
}

pub async fn execute_script(&self, event: TriggerEvent) {
Expand Down

0 comments on commit 7272040

Please sign in to comment.