Skip to content

Commit

Permalink
feat(anomaly): new anomaly criteria [fixes vm-426] (#2037)
Browse files Browse the repository at this point in the history
* feat(anomaly): new anomaly criteria [fixes vm-426]

* WIP review changes

---------

Co-authored-by: folex <[email protected]>
Co-authored-by: Nick <[email protected]>
  • Loading branch information
3 people authored Jan 31, 2024
1 parent f56db61 commit d59b885
Show file tree
Hide file tree
Showing 16 changed files with 315 additions and 5 deletions.
120 changes: 116 additions & 4 deletions aquamarine/src/particle_data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::path::{Path, PathBuf};
use std::time::Duration;

use avm_server::avm_runner::RawAVMOutcome;
use avm_server::{AnomalyData, CallResults, ParticleParameters};
use avm_server::{AnomalyData, CallResults, CallServiceResult, ParticleParameters};
use fluence_libp2p::PeerId;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
Expand All @@ -29,6 +29,7 @@ use tracing::instrument;

use now_millis::now_ms;
use particle_execution::{ParticleVault, VaultError};
use serde_json::Value as JValue;

type Result<T> = std::result::Result<T, DataStoreError>;

Expand Down Expand Up @@ -72,6 +73,9 @@ impl ParticleDataStore {

const EXECUTION_TIME_THRESHOLD: Duration = Duration::from_millis(500);
const MEMORY_DELTA_BYTES_THRESHOLD: usize = 10 * bytesize::MB as usize;
const AIR_SCRIPT_SIZE_THRESHOLD: usize = 8 * bytesize::MB as usize;
const PARTICLE_DATA_SIZE_THRESHOLD: usize = 64 * bytesize::MB as usize;
const CALL_SERVICE_RESULT_SIZE_THRESHOLD: usize = 8 * bytesize::MB as usize;

impl ParticleDataStore {
pub async fn initialize(&self) -> Result<()> {
Expand Down Expand Up @@ -148,15 +152,30 @@ impl ParticleDataStore {
Ok(())
}

fn detect_mem_limits_anomaly(
&self,
memory_delta: usize,
outcome: &RawAVMOutcome,
call_results: &CallResults,
air_script: &str,
) -> bool {
memory_delta > MEMORY_DELTA_BYTES_THRESHOLD
|| outcome.data.len() > PARTICLE_DATA_SIZE_THRESHOLD
|| air_script.len() > AIR_SCRIPT_SIZE_THRESHOLD
|| call_results.values().any(call_result_size_limit_check)
}

pub fn detect_anomaly(
&self,
execution_time: Duration,
memory_delta: usize,
outcome: &RawAVMOutcome,
call_results: &CallResults,
air_script: &str,
) -> bool {
execution_time > EXECUTION_TIME_THRESHOLD
|| memory_delta > MEMORY_DELTA_BYTES_THRESHOLD
|| outcome.ret_code != 0
|| self.detect_mem_limits_anomaly(memory_delta, outcome, call_results, air_script)
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -225,6 +244,21 @@ impl ParticleDataStore {
}
}

fn get_jvalue_size(value: &JValue) -> usize {
match value {
JValue::Null => 0,
JValue::Bool(_) => 1,
JValue::Number(_) => 8,
JValue::String(s) => s.len(),
JValue::Array(arr) => arr.iter().map(get_jvalue_size).sum(),
JValue::Object(obj) => obj.iter().map(|(k, v)| k.len() + get_jvalue_size(v)).sum(),
}
}

fn call_result_size_limit_check(service_result: &CallServiceResult) -> bool {
get_jvalue_size(&service_result.result) > CALL_SERVICE_RESULT_SIZE_THRESHOLD
}

#[derive(Debug, Error)]
pub enum DataStoreError {
#[error("error creating particle_data_store")]
Expand All @@ -251,9 +285,12 @@ fn store_key_from_components(particle_id: &str, current_peer_id: &str) -> String

#[cfg(test)]
mod tests {
use crate::particle_data_store::{
AIR_SCRIPT_SIZE_THRESHOLD, CALL_SERVICE_RESULT_SIZE_THRESHOLD, PARTICLE_DATA_SIZE_THRESHOLD,
};
use crate::ParticleDataStore;
use avm_server::avm_runner::RawAVMOutcome;
use avm_server::CallRequests;
use avm_server::{CallRequests, CallResults, CallServiceResult};
use std::path::PathBuf;
use std::time::Duration;

Expand Down Expand Up @@ -306,6 +343,9 @@ mod tests {

#[tokio::test]
async fn test_detect_anomaly() {
use AIR_SCRIPT_SIZE_THRESHOLD;
use PARTICLE_DATA_SIZE_THRESHOLD;

let particle_data_store = ParticleDataStore::new(
PathBuf::from("dummy"),
PathBuf::from("dummy"),
Expand Down Expand Up @@ -335,15 +375,87 @@ mod tests {
execution_time_below_threshold,
memory_delta_below_threshold,
&outcome_success,
&CallResults::new(),
"(null)",
);

assert!(!anomaly_below_threshold);

let anomaly_above_threshold = particle_data_store.detect_anomaly(
execution_time_above_threshold,
memory_delta_above_threshold,
&outcome_failure,
&CallResults::new(),
"(null)",
);

assert!(!anomaly_below_threshold);
assert!(anomaly_above_threshold);

let anomaly_below_air_size_limit = particle_data_store.detect_anomaly(
execution_time_below_threshold,
memory_delta_below_threshold,
&outcome_success,
&CallResults::new(),
"(null)",
);

assert!(!anomaly_below_air_size_limit);

let air = "a".repeat(AIR_SCRIPT_SIZE_THRESHOLD + 1);
let anomaly_above_air_size_limit = particle_data_store.detect_anomaly(
execution_time_below_threshold,
memory_delta_below_threshold,
&outcome_success,
&CallResults::new(),
&air,
);

assert!(anomaly_above_air_size_limit);

let anomaly_below_particle_size_limit = particle_data_store.detect_anomaly(
execution_time_below_threshold,
memory_delta_below_threshold,
&outcome_success,
&CallResults::new(),
"(null)",
);

assert!(!anomaly_below_particle_size_limit);

let outcome = RawAVMOutcome {
ret_code: 0,
error_message: "".to_string(),
data: vec![0; PARTICLE_DATA_SIZE_THRESHOLD + 1],
call_requests: CallRequests::new(),
next_peer_pks: vec![],
};
let anomaly_above_particle_size_limit = particle_data_store.detect_anomaly(
execution_time_below_threshold,
memory_delta_below_threshold,
&outcome,
&CallResults::new(),
"(null)",
);

assert!(anomaly_above_particle_size_limit);

let call_result = CallServiceResult {
ret_code: 0,
result: serde_json::json!({ "data": vec![0; CALL_SERVICE_RESULT_SIZE_THRESHOLD + 1] }),
};
let mut call_results = CallResults::new();
call_results.insert(42, call_result);
let anomaly_below_call_result_size_limit = particle_data_store.detect_anomaly(
execution_time_below_threshold,
memory_delta_below_threshold,
&outcome_success,
&call_results,
"(null)",
);

assert!(anomaly_below_call_result_size_limit);

// let anomaly_call_result_size =
}

#[tokio::test]
Expand Down
8 changes: 7 additions & 1 deletion aquamarine/src/particle_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,13 @@ where
humantime::format_duration(stats.interpretation_time), prev_data_len, len
);

if data_store.detect_anomaly(stats.interpretation_time, stats.memory_delta, outcome) {
if data_store.detect_anomaly(
stats.interpretation_time,
stats.memory_delta,
outcome,
&avm_result.call_results,
avm_result.particle.script.as_str(),
) {
let anomaly_result = data_store
.save_anomaly_data(
avm_result.particle.script.as_str(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"name": "registry",
"dependencies": [
"name:sqlite3",
"name:registry"
]
}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"name": "registry",
"max_heap_size": "8 MiB",
"preopened_files": ["/tmp"],
"mapped_dirs": {
"tmp": "./tmp"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
; This file is auto-generated. Do not edit manually: changes may be erased.
; Generated by Aqua compiler: https://github.com/fluencelabs/aqua/.
; If you find any bugs, please write an issue on GitHub: https://github.com/fluencelabs/aqua/issues
; Aqua version: 0.7.3-319

(xor
(xor
(seq
(call %init_peer_id% ("peer" "timestamp_sec") [] t)
(call %init_peer_id% ("registry" "clear_expired") [t])
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1])
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2])
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
; This file is auto-generated. Do not edit manually: changes may be erased.
; Generated by Aqua compiler: https://github.com/fluencelabs/aqua/.
; If you find any bugs, please write an issue on GitHub: https://github.com/fluencelabs/aqua/issues
; Aqua version: 0.7.3-319

(xor
(xor
(seq
(seq
(call %init_peer_id% ("peer" "timestamp_sec") [] t)
(call %init_peer_id% ("registry" "evict_stale") [t] res)
)
(par
(fold res.$.results! r-0
(par
(seq
(seq
(call %init_peer_id% ("op" "string_to_b58") [r-0.$.key.id!] k)
(call %init_peer_id% ("kad" "neighborhood") [k [] []] nodes)
)
(par
(fold nodes n-0
(par
(new $records_weights
(xor
(seq
(seq
(seq
(seq
(call n-0 ("peer" "timestamp_sec") [] tt)
(call n-0 ("trust-graph" "get_weight") [r-0.$.key.owner_peer_id! tt] key_weight)
)
(call n-0 ("registry" "republish_key") [r-0.$.key! key_weight tt])
)
(fold r-0.$.records! record-0
(seq
(call n-0 ("trust-graph" "get_weight") [record-0.$.peer_id! tt] $records_weights)
(next record-0)
)
)
)
(call n-0 ("registry" "republish_records") [r-0.$.records! $records_weights tt])
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1])
)
)
(next n-0)
)
)
(null)
)
)
(next r-0)
)
)
(null)
)
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2])
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3])
)
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"name": "sqlite3",
"max_heap_size": "64 MiB"
}
7 changes: 7 additions & 0 deletions particle-node/tests/builtins/services/registry/blueprint.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"name": "registry",
"dependencies": [
"name:sqlite3",
"name:registry"
]
}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"name": "registry",
"max_heap_size": "8 MiB",
"preopened_files": ["/tmp"],
"mapped_dirs": {
"tmp": "./tmp"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
; This file is auto-generated. Do not edit manually: changes may be erased.
; Generated by Aqua compiler: https://github.com/fluencelabs/aqua/.
; If you find any bugs, please write an issue on GitHub: https://github.com/fluencelabs/aqua/issues
; Aqua version: 0.7.3-319

(xor
(xor
(seq
(call %init_peer_id% ("peer" "timestamp_sec") [] t)
(call %init_peer_id% ("registry" "clear_expired") [t])
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1])
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2])
)
Loading

0 comments on commit d59b885

Please sign in to comment.