Skip to content

Commit

Permalink
feat(services,avm): use marine and avm-server with memory limits (#1957)
Browse files Browse the repository at this point in the history
* initial marine + avm-server update attempt

* update system services

* try fix tests

* fix create service test

* update system services

* uodate everything except aqua-ipfs distro

* use aqua-ipfs snapshot

* fix: update cargo.lock

* chore: update cargo.lock

* fix: remove max_mem metrics

* fix: module config test

* update cargo.lock

* Update avm-server version

* use aquavm snapshot

* point e2e to fixed decider

* point e2e to fixed decider 2

* point e2e to fixed decider 3

* update marine to super latest

* update lockfile

* use snapshot with older aquavm

* use decider from master

* use decider from master

* Revert "use decider from master"

This reverts commit b951392.

* Revert "use decider from master"

This reverts commit 547ab4c.

* update lockfile

* use decider from master

* use decider from master

---------

Co-authored-by: folex <[email protected]>
  • Loading branch information
ValeryAntopol and folex authored Jan 23, 2024
1 parent 9a6823f commit 66edf0f
Show file tree
Hide file tree
Showing 18 changed files with 588 additions and 756 deletions.
1,088 changes: 540 additions & 548 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ fluence-spell-dtos = "=0.6.9"
fluence-spell-distro = "=0.6.9"

# marine
fluence-app-service = { version = "0.29.0" }
fluence-app-service = { version = "0.31.1" }
marine-utils = "0.5.1"
marine-it-parser = "0.15.1"

# avm
avm-server = "0.33.4"
air-interpreter-wasm = "0.55.0"
avm-server = { version = "=0.55.1-feat-intermediate-marine-update-f3e4695-4194-1.0", registry = "fluence" }
air-interpreter-wasm = { version = "=0.55.1-feat-intermediate-marine-update-f3e4695-4194-1.0", registry = "fluence" }

# libp2p
libp2p = { version = "0.53.2", features = ["noise", "tcp", "dns", "websocket", "yamux", "tokio", "kad", "ping", "identify", "macros"] }
Expand Down
3 changes: 2 additions & 1 deletion aquamarine/src/plumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,8 @@ mod tests {
fn memory_stats(&self) -> AVMMemoryStats {
AVMMemoryStats {
memory_size: 0,
max_memory_size: None,
total_memory_limit: None,
allocation_rejects: None,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/nox-tests/tests/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ async fn test_system_service_override() {
// It's unnecessary to allow not static links or even vectors since real life we need only this
let module = include_bytes!("./tetraplets/artifacts/tetraplets.wasm");
let config = json!({
"total_memory_limit": "Infinity",
"module": [
{
"name": "tetraplets",
Expand Down
5 changes: 4 additions & 1 deletion crates/peer-metrics/src/services_metrics/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ impl ServicesMetricsBuiltin {
}

pub fn get_used_memory(stats: &MemoryStats) -> u64 {
stats.0.iter().fold(0, |acc, x| acc + x.memory_size as u64)
stats
.modules
.iter()
.fold(0, |acc, x| acc + x.memory_size as u64)
}
}
30 changes: 0 additions & 30 deletions crates/peer-metrics/src/services_metrics/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use prometheus_client::metrics::histogram::{linear_buckets, Histogram};
use prometheus_client::registry::Registry;
use std::fmt::Write;

use fluence_app_service::ModuleDescriptor;
use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue, LabelValueEncoder};
use prometheus_client::metrics::family::Family;

Expand Down Expand Up @@ -41,8 +40,6 @@ pub struct ServiceTypeLabel {

#[derive(Clone)]
pub struct ServicesMemoryMetrics {
/// Maximum memory set in module config
pub mem_max_bytes: Histogram,
/// Actual memory used by a module
pub mem_max_per_module_bytes: Histogram,
/// Actual memory used by a service
Expand All @@ -53,19 +50,6 @@ pub struct ServicesMemoryMetrics {
pub mem_used_total_bytes: Family<ServiceTypeLabel, Gauge>,
}

impl ServicesMemoryMetrics {
/// Collect the service and the service's modules max available memory.
pub fn observe_service_max_mem(&self, default_max: u64, modules_config: &[ModuleDescriptor]) {
let mut max_service_size = 0;
for module_config in modules_config {
let module_max = module_config.config.max_heap_size.unwrap_or(default_max);
self.mem_max_per_module_bytes.observe(module_max as f64);
max_service_size += module_max;
}
self.mem_max_bytes.observe(max_service_size as f64);
}
}

#[derive(Clone)]
pub struct ServicesMetricsExternal {
/// Number of currently running services
Expand Down Expand Up @@ -134,13 +118,6 @@ impl ServicesMetricsExternal {
"number of srv remove calls",
);

let mem_max_bytes = register(
sub_registry,
Histogram::new(mem_buckets_8gib()),
"mem_max_bytes",
"maximum memory set in module config per service",
);

let mem_max_per_module_bytes = register(
sub_registry,
Histogram::new(mem_buckets_4gib()),
Expand Down Expand Up @@ -198,7 +175,6 @@ impl ServicesMetricsExternal {
);

let memory_metrics = ServicesMemoryMetrics {
mem_max_bytes,
mem_max_per_module_bytes,
mem_used_bytes,
mem_used_per_module_bytes,
Expand Down Expand Up @@ -233,12 +209,6 @@ impl ServicesMetricsExternal {
}
}

/// Collect the service and the service's modules max available memory.
pub fn observe_service_max_mem(&self, default_max: u64, modules_config: &[ModuleDescriptor]) {
self.memory_metrics
.observe_service_max_mem(default_max, modules_config);
}

/// Collect all metrics that are relevant on service removal.
pub fn observe_removed(&self, service_type: ServiceType, removal_time: f64) {
let label = ServiceTypeLabel { service_type };
Expand Down
2 changes: 1 addition & 1 deletion crates/peer-metrics/src/services_metrics/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl ServiceMemoryStat {
pub fn new(stats: &MemoryStats) -> ServiceMemoryStat {
let mut modules_stats = HashMap::new();
let mut used_mem: MemorySize = 0;
for stat in &stats.0 {
for stat in &stats.modules {
modules_stats.insert(stat.name.to_string(), stat.memory_size as MemorySize);
used_mem += stat.memory_size as MemorySize;
}
Expand Down
7 changes: 0 additions & 7 deletions crates/peer-metrics/src/services_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::services_metrics::external::ServiceTypeLabel;
pub use crate::services_metrics::external::ServicesMetricsExternal;
pub use crate::services_metrics::message::{ServiceCallStats, ServiceMemoryStat};
use crate::ServiceCallStats::Success;
use fluence_app_service::ModuleDescriptor;
use prometheus_client::registry::Registry;
use tokio::sync::mpsc;
use tokio::sync::mpsc::unbounded_channel;
Expand Down Expand Up @@ -177,12 +176,6 @@ impl ServicesMetrics {
});
}

pub fn observe_service_config(&self, max_heap_size: u64, modules_config: &[ModuleDescriptor]) {
self.observe_external(|external| {
external.observe_service_max_mem(max_heap_size, modules_config);
});
}

fn observe_external<F>(&self, callback: F)
where
F: FnOnce(&ServicesMetricsExternal),
Expand Down
2 changes: 1 addition & 1 deletion crates/server-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ pub fn default_key_format() -> String {
"ed25519".to_string()
}

pub fn default_module_max_heap_size() -> bytesize::ByteSize {
pub fn default_service_memory_limit() -> bytesize::ByteSize {
bytesize::ByteSize::b(bytesize::gib(4_u64) - 1)
}

Expand Down
23 changes: 7 additions & 16 deletions crates/server-config/src/node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,10 @@ pub struct UnresolvedNodeConfig {
#[serde(default)]
pub aquavm_max_heap_size: Option<bytesize::ByteSize>,

/// Maximum heap size in bytes available for a WASM module.
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "default_module_max_heap_size")]
pub module_max_heap_size: bytesize::ByteSize,

/// Default heap size in bytes available for a WASM module unless otherwise specified.
/// Default heap size in bytes available for a WASM service unless otherwise specified.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub module_default_heap_size: Option<bytesize::ByteSize>,
pub default_service_memory_limit: Option<bytesize::ByteSize>,

#[serde(default)]
pub kademlia: KademliaConfig,
Expand Down Expand Up @@ -170,9 +165,8 @@ impl UnresolvedNodeConfig {
services_envs: self.services_envs,
protocol_config: self.protocol_config,
aquavm_pool_size: self.aquavm_pool_size,
aquavm_max_heap_size: self.aquavm_max_heap_size,
module_max_heap_size: self.module_max_heap_size,
module_default_heap_size: self.module_default_heap_size,
aquavm_heap_size_limit: self.aquavm_max_heap_size,
default_service_memory_limit: self.default_service_memory_limit,
kademlia: self.kademlia,
particle_queue_buffer: self.particle_queue_buffer,
effects_queue_buffer: self.effects_queue_buffer,
Expand Down Expand Up @@ -325,13 +319,10 @@ pub struct NodeConfig {
pub aquavm_pool_size: usize,

/// Maximum heap size in bytes available for an interpreter instance.
pub aquavm_max_heap_size: Option<bytesize::ByteSize>,

/// Maximum heap size in bytes available for a WASM module.
pub module_max_heap_size: bytesize::ByteSize,
pub aquavm_heap_size_limit: Option<bytesize::ByteSize>,

/// Default heap size in bytes available for a WASM module unless otherwise specified.
pub module_default_heap_size: Option<bytesize::ByteSize>,
/// Default heap size in bytes available for a WASM service unless otherwise specified.
pub default_service_memory_limit: Option<bytesize::ByteSize>,

pub kademlia: KademliaConfig,

Expand Down
10 changes: 3 additions & 7 deletions crates/server-config/src/services_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ pub struct ServicesConfig {
pub management_peer_id: PeerId,
/// key to manage builtins services initialization
pub builtins_management_peer_id: PeerId,
/// Maximum heap size in bytes available for the module.
pub max_heap_size: ByteSize,
/// Default heap size in bytes available for the module unless otherwise specified.
pub default_heap_size: Option<ByteSize>,
pub default_service_memory_limit: Option<ByteSize>,
/// List of allowed binaries paths
pub allowed_binaries: HashSet<PathBuf>,
}
Expand All @@ -60,8 +58,7 @@ impl ServicesConfig {
envs: HashMap<String, String>,
management_peer_id: PeerId,
builtins_management_peer_id: PeerId,
max_heap_size: ByteSize,
default_heap_size: Option<ByteSize>,
default_service_memory_limit: Option<ByteSize>,
allowed_binaries: Vec<String>,
) -> Result<Self, std::io::Error> {
let base_dir = to_abs_path(base_dir);
Expand Down Expand Up @@ -89,8 +86,7 @@ impl ServicesConfig {
envs,
management_peer_id,
builtins_management_peer_id,
max_heap_size,
default_heap_size,
default_service_memory_limit,
allowed_binaries,
};

Expand Down
7 changes: 2 additions & 5 deletions crates/spell-service-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,16 +324,15 @@ mod tests {

let workers = Arc::new(workers);

let max_heap_size = server_config::default_module_max_heap_size();
let service_memory_limit = server_config::default_service_memory_limit();
let config = ServicesConfig::new(
local_pid,
base_dir,
vault_dir,
HashMap::new(),
management_pid,
root_key_pair.get_peer_id(),
max_heap_size,
None,
Some(service_memory_limit),
Default::default(),
)
.unwrap();
Expand All @@ -342,8 +341,6 @@ mod tests {
&config.modules_dir,
&config.blueprint_dir,
&config.particles_vault_dir,
max_heap_size,
None,
Default::default(),
);

Expand Down
3 changes: 2 additions & 1 deletion crates/toy-vms/src/easy_vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ impl AquaRuntime for EasyVM {
fn memory_stats(&self) -> AVMMemoryStats {
AVMMemoryStats {
memory_size: 0,
max_memory_size: None,
total_memory_limit: None,
allocation_rejects: None,
}
}
}
2 changes: 1 addition & 1 deletion nox/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ fn vm_config(config: &ResolvedConfig) -> VmConfig {
config.dir_config.air_interpreter_path.clone(),
config
.node_config
.aquavm_max_heap_size
.aquavm_heap_size_limit
.map(|byte_size| byte_size.as_u64()),
)
}
3 changes: 1 addition & 2 deletions nox/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ impl<RT: AquaRuntime> Node<RT> {
config.services_envs.clone(),
config.management_peer_id,
builtins_peer_id,
config.node_config.module_max_heap_size,
config.node_config.module_default_heap_size,
config.node_config.default_service_memory_limit,
config.node_config.allowed_binaries.clone(),
)
.expect("create services config");
Expand Down
27 changes: 7 additions & 20 deletions particle-builtins/src/builtins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ where
modules_dir,
blueprint_dir,
vault_dir,
config.max_heap_size,
config.default_heap_size,
config.allowed_binaries.clone(),
);
let particles_vault_dir = vault_dir.to_path_buf();
Expand Down Expand Up @@ -972,16 +970,10 @@ fn make_module_config(args: Args) -> Result<JValue, JError> {
let mut args = args.function_args.into_iter();

let name = Args::next("name", &mut args)?;
let mem_pages_count = Args::next_opt("mem_pages_count", &mut args)?;
let max_heap_size: Option<String> = Args::next_opt("max_heap_size", &mut args)?;
let max_heap_size = match max_heap_size {
Some(s) => Some(bytesize::ByteSize::from_str(&s).map_err(|err| {
JError::new(format!(
"error parsing max_heap_size from String to ByteSize: {err}"
))
})?),
None => None,
};
// These are not used anymore, keep them for backward compatibility, because args are positional
let _mem_pages_count: Option<u32> = Args::next_opt("mem_pages_count", &mut args)?;
let _max_heap_size: Option<String> = Args::next_opt("max_heap_size", &mut args)?;

let logger_enabled = Args::next_opt("logger_enabled", &mut args)?;
let preopened_files = Args::next_opt("preopened_files", &mut args)?;
let envs = Args::next_opt("envs", &mut args)?.map(table);
Expand All @@ -994,8 +986,6 @@ fn make_module_config(args: Args) -> Result<JValue, JError> {
load_from: None,
file_name: None,
config: ModuleConfig {
mem_pages_count,
max_heap_size,
logger_enabled,
wasi: Some(WASIConfig {
preopened_files,
Expand Down Expand Up @@ -1059,8 +1049,6 @@ where

#[cfg(test)]
mod prop_tests {
use std::str::FromStr;

use prop::collection::vec;
use proptest::arbitrary::StrategyFor;
use proptest::collection::{SizeRange, VecStrategy};
Expand Down Expand Up @@ -1117,7 +1105,7 @@ mod prop_tests {
let args = vec![
json!(name), // required: name
json!(mem_pages), // mem_pages_count = optional: None
json!(heap), // optional: max_heap_size
json!(heap), // optional: max_heap_size
json!(logger_enabled), // optional: logger_enabled
json!(preopened_files), // optional: preopened_files
json!(envs), // optional: envs
Expand All @@ -1133,9 +1121,8 @@ mod prop_tests {
};

let config = make_module_config(args).expect("parse config via make_module_config");
let prop_heap = heap.get(0).map(|h| bytesize::ByteSize::from_str(h).unwrap().to_string());
let config_heap = config.get("max_heap_size").map(|h| bytesize::ByteSize::from_str(h.as_str().unwrap()).unwrap().to_string());
prop_assert_eq!(prop_heap, config_heap);
let config_name = config.get("name").and_then(|n| n.as_str()).expect("'name' field in the config");
prop_assert_eq!(config_name, name);
}
}
}
Loading

0 comments on commit 66edf0f

Please sign in to comment.