Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(runtime)!: use async marine [fixes VM-365,NET-758,NET-757] #2206

Merged
merged 33 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
342 changes: 97 additions & 245 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,14 @@ fluence-spell-dtos = "=0.7.5"
fluence-spell-distro = "=0.7.5"

# marine
fluence-app-service = "0.35.1"
fluence-app-service = { version = "=0.35.2", git = "https://github.com/fluencelabs/marine", branch = "feat/async-call", features = ["wasmtime"] }
marine-utils = "0.5.1"
marine-it-parser = "0.16.0"
marine-module-info-parser = "0.15.0"
marine-it-parser = { version = "0.16.0", git = "https://github.com/fluencelabs/marine", branch = "feat/async-call" }
marine-module-info-parser = { version = "0.15.0", git = "https://github.com/fluencelabs/marine", branch = "feat/async-call" }
marine-wasmtime-backend = { version = "0.6.0", git = "https://github.com/fluencelabs/marine", branch = "feat/async-call" }

# avm
avm-server = "=0.37.0"
avm-server = { git = "https://github.com/fluencelabs/aquavm", version = "=0.37.0", branch = "feat/use-async-marine" }
air-interpreter-wasm = "=0.62.0"

# libp2p
Expand Down
2 changes: 2 additions & 0 deletions aquamarine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ workers = { workspace = true }
types = { workspace = true }

avm-server = { workspace = true }
marine-wasmtime-backend = { workspace = true }
fluence-app-service = { workspace = true }
libp2p = { workspace = true }
fluence-keypair = { workspace = true }

Expand Down
50 changes: 35 additions & 15 deletions aquamarine/src/aqua_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

use async_trait::async_trait;
use std::str::FromStr;
use std::{error::Error, task::Waker};

Expand All @@ -23,12 +24,14 @@ use avm_server::{
};
use fluence_keypair::KeyPair;
use libp2p::PeerId;
use marine_wasmtime_backend::{WasmtimeConfig, WasmtimeWasmBackend};
use tracing::Level;

use crate::config::VmConfig;
use crate::error::{ExecutionError, FieldError};
use crate::particle_effects::ParticleEffects;

#[async_trait]
pub trait AquaRuntime: Sized + Send + 'static {
type Config: Clone + Send + 'static;
type Error: Error + Send + Sync + 'static;
Expand All @@ -41,11 +44,11 @@ pub trait AquaRuntime: Sized + Send + 'static {
particle_id: String,
) -> ParticleEffects;

fn call(
async fn call(
&mut self,
air: impl Into<String>,
prev_data: impl Into<Vec<u8>>,
current_data: impl Into<Vec<u8>>,
air: impl Into<String> + Send,
prev_data: impl Into<Vec<u8>> + Send,
current_data: impl Into<Vec<u8>> + Send,
particle_params: ParticleParameters<'_>,
call_results: CallResults,
key_pair: &KeyPair,
Expand All @@ -55,24 +58,40 @@ pub trait AquaRuntime: Sized + Send + 'static {
fn memory_stats(&self) -> AVMMemoryStats;
}

impl AquaRuntime for AVMRunner {
#[async_trait]
impl AquaRuntime for AVMRunner<WasmtimeWasmBackend> {
type Config = VmConfig;
type Error = RunnerError;

/// Creates `AVM` in background (on blocking threadpool)
fn create_runtime(config: Self::Config, waker: Waker) -> Result<Self, Self::Error> {
let mut wasmtime_config = WasmtimeConfig::default();
// TODO async-marine: impl proper configuration
// TODO async-marine: move to the right place
// TODO async-marine: maybe use the same as for ParticleAppServices
wasmtime_config
.debug_info(true)
.wasm_backtrace(true)
.epoch_interruption(true)
.async_wasm_stack(2 * 1024 * 1024)
.max_wasm_stack(2 * 1024 * 1024);
let backend = WasmtimeWasmBackend::new(wasmtime_config).map_err(|e| {
Self::Error::MarineError(fluence_app_service::MarineError::EngineError(e.into()))
})?;
let avm_runtime_limits = AVMRuntimeLimits::new(
config.air_size_limit,
config.particle_size_limit,
config.call_result_size_limit,
config.hard_limit_enabled,
);
let vm: AVMRunner = AVMRunner::new(
config.air_interpreter,
config.max_heap_size,
avm_runtime_limits,
i32::MAX,
)?;
let vm: AVMRunner<WasmtimeWasmBackend> =
tokio::runtime::Handle::current().block_on(AVMRunner::new(
config.air_interpreter,
config.max_heap_size,
avm_runtime_limits,
i32::MAX,
backend,
))?;
waker.wake();
Ok(vm)
}
Expand Down Expand Up @@ -122,11 +141,11 @@ impl AquaRuntime for AVMRunner {
}

#[inline]
fn call(
async fn call(
&mut self,
air: impl Into<String>,
prev_data: impl Into<Vec<u8>>,
current_data: impl Into<Vec<u8>>,
air: impl Into<String> + Send,
prev_data: impl Into<Vec<u8>> + Send,
current_data: impl Into<Vec<u8>> + Send,
particle_params: ParticleParameters<'_>,
call_results: CallResults,
key_pair: &KeyPair,
Expand All @@ -144,6 +163,7 @@ impl AquaRuntime for AVMRunner {
key_pair,
particle_params.particle_id.to_string(),
)
.await
}

fn memory_stats(&self) -> AVMMemoryStats {
Expand Down
4 changes: 3 additions & 1 deletion aquamarine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ mod particle_effects;
mod health;
mod vm_pool;

use marine_wasmtime_backend::WasmtimeWasmBackend;

pub use crate::aqua_runtime::AquaRuntime;
pub use crate::aquamarine::{AquamarineApi, AquamarineBackend};
pub use crate::config::{DataStoreConfig, VmConfig, VmPoolConfig};
pub use crate::particle_effects::{InterpretationStats, ParticleEffects, RemoteRoutingEffects};
pub use avm_server::avm_runner::AVMRunner;
pub type AVMRunner = avm_server::avm_runner::AVMRunner<WasmtimeWasmBackend>;
pub use error::AquamarineApiError;
pub use particle_data_store::{DataStoreError, ParticleDataStore};
pub use plumber::Plumber;
20 changes: 11 additions & 9 deletions aquamarine/src/particle_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ async fn avm_call<'a, RT: AquaRuntime>(
prev_data: Vec<u8>,
) -> Result<AVMCallResult<'a, RT>, JoinError> {
spawner
.spawn_avm_call(move || {
.spawn_avm_call(async move {
let particle_id = particle.id.clone();
let now = Instant::now();
let memory_size_before = vm.memory_stats().memory_size;
Expand All @@ -268,14 +268,16 @@ async fn avm_call<'a, RT: AquaRuntime>(
ttl: particle.ttl,
};
let current_data = &particle.data[..];
let avm_outcome = vm.call(
&particle.script,
prev_data,
current_data,
particle_params.clone(),
call_results.clone(),
&key_pair,
);
let avm_outcome = vm
.call(
&particle.script,
prev_data,
current_data,
particle_params.clone(),
call_results.clone(),
&key_pair,
)
.await;
let memory_size_after = vm.memory_stats().memory_size;

let interpretation_time = now.elapsed();
Expand Down
9 changes: 5 additions & 4 deletions aquamarine/src/plumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ mod tests {

struct VMMock;

#[async_trait]
impl AquaRuntime for VMMock {
type Config = ();
type Error = Infallible;
Expand All @@ -710,11 +711,11 @@ mod tests {
}
}

fn call(
async fn call(
&mut self,
_air: impl Into<String>,
_prev_data: impl Into<Vec<u8>>,
_current_data: impl Into<Vec<u8>>,
_air: impl Into<String> + Send,
_prev_data: impl Into<Vec<u8>> + Send,
_current_data: impl Into<Vec<u8>> + Send,
_particle_params: ParticleParameters<'_>,
_call_results: CallResults,
_key_pair: &KeyPair,
Expand Down
24 changes: 13 additions & 11 deletions aquamarine/src/spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ pub(crate) trait SpawnFunctions {
///
/// - `F`: The type of the closure.
/// - `R`: The type of the result returned by the closure.
fn spawn_avm_call<F, R>(&self, func: F) -> JoinHandle<F::Output>
fn spawn_avm_call<F>(&self, fut: F) -> JoinHandle<F::Output>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static;
F: Future + Send + 'static,
F::Output: Send + 'static;

/// Shift execution to the specific pool
///
Expand Down Expand Up @@ -122,12 +122,13 @@ impl SpawnFunctions for RootSpawner {
.expect("Failed to spawn a task")
}

fn spawn_avm_call<F, R>(&self, func: F) -> JoinHandle<F::Output>
fn spawn_avm_call<F>(&self, fut: F) -> JoinHandle<F::Output>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.runtime_handle.spawn_blocking(func)
self.runtime_handle
.spawn_blocking(|| Handle::current().block_on(fut))
}

fn wrap<F>(&self, fut: F) -> TokioContext<F>
Expand Down Expand Up @@ -178,12 +179,13 @@ impl SpawnFunctions for WorkerSpawner {
.expect("Failed to spawn a task")
}

fn spawn_avm_call<F, R>(&self, func: F) -> JoinHandle<F::Output>
fn spawn_avm_call<F>(&self, fut: F) -> JoinHandle<F::Output>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.runtime_handle.spawn(async { func() })
let fut = async { fut.await };
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
self.runtime_handle.spawn(fut)
}

fn wrap<F>(&self, fut: F) -> TokioContext<F>
Expand Down
1 change: 1 addition & 0 deletions crates/connected-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ particle-protocol = { workspace = true }
fluence-libp2p = { workspace = true }
test-constants = { workspace = true }
local-vm = { workspace = true }
marine-wasmtime-backend = { workspace = true }

fluence-keypair = { workspace = true }
libp2p = { workspace = true, features = ["identify"] }
Expand Down
5 changes: 3 additions & 2 deletions crates/connected-client/src/connected_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use fluence_keypair::KeyPair;
use fluence_libp2p::Transport;
use libp2p::{core::Multiaddr, PeerId};
use local_vm::{make_particle, make_vm, read_args, ParticleDataStore};
use marine_wasmtime_backend::WasmtimeWasmBackend;
use particle_protocol::Particle;
use serde_json::{Value as JValue, Value};
use tempfile::TempDir;
Expand All @@ -35,7 +36,7 @@ use crate::client::Client;
use crate::event::ClientEvent;

#[allow(clippy::upper_case_acronyms)]
type AVM = local_vm::AVMRunner;
type AVM = local_vm::AVMRunner<WasmtimeWasmBackend>;

pub struct ConnectedClient {
pub client: Client,
Expand Down Expand Up @@ -158,7 +159,7 @@ impl ConnectedClient {
self.local_vm
.get_or_init(|| async {
let dir = self.tmp_dir.path();
tokio::sync::Mutex::new(make_vm(dir))
tokio::sync::Mutex::new(make_vm(dir).await)
})
.await
}
Expand Down
1 change: 1 addition & 0 deletions crates/local-vm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ air-interpreter-fs = { workspace = true }
fluence-keypair = { workspace = true }
air-interpreter-wasm = { workspace = true }
avm-server = { workspace = true }
marine-wasmtime-backend = { workspace = true }
libp2p = { workspace = true }
fstrings = { workspace = true }
serde_json = { workspace = true }
Expand Down
42 changes: 32 additions & 10 deletions crates/local-vm/src/local_vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use avm_server::avm_runner::{AVMRunner, RawAVMOutcome};
use avm_server::{CallResults, CallServiceResult};
use fstrings::f;
use libp2p::PeerId;
use marine_wasmtime_backend::{WasmtimeConfig, WasmtimeWasmBackend};
use serde_json::{json, Value as JValue};

use air_interpreter_fs::{air_interpreter_path, write_default_air_interpreter};
Expand Down Expand Up @@ -170,19 +171,38 @@ pub fn host_call(data: &HashMap<String, JValue>, args: Args) -> (CallServiceResu
(outcome, result.returned)
}

pub fn make_vm(tmp_dir_path: &Path) -> AVMRunner {
pub fn make_wasm_backend() -> WasmtimeWasmBackend {
let mut wasmtime_config = WasmtimeConfig::default();
// TODO async-marine: impl proper configuration
wasmtime_config
.debug_info(true)
.wasm_backtrace(true)
.epoch_interruption(true)
.async_wasm_stack(2 * 1024 * 1024)
.max_wasm_stack(2 * 1024 * 1024);
WasmtimeWasmBackend::new(wasmtime_config).expect("Cannot create WasmtimeWasmBackend")
}

pub async fn make_vm(tmp_dir_path: &Path) -> AVMRunner<WasmtimeWasmBackend> {
let interpreter = air_interpreter_path(tmp_dir_path);
write_default_air_interpreter(&interpreter).expect("write air interpreter");

AVMRunner::new(interpreter, None, <_>::default(), i32::MAX)
.map_err(|err| {
log::error!("\n\n\nFailed to create local AVM: {:#?}\n\n\n", err);
AVMRunner::new(
interpreter,
None,
<_>::default(),
i32::MAX,
make_wasm_backend(),
)
.await
.map_err(|err| {
log::error!("\n\n\nFailed to create local AVM: {:#?}\n\n\n", err);

println!("\n\n\nFailed to create local AVM: {err:#?}\n\n\n");
println!("\n\n\nFailed to create local AVM: {err:#?}\n\n\n");

err
})
.expect("vm should be created")
err
})
.expect("vm should be created")
}

pub fn wrap_script(
Expand Down Expand Up @@ -236,7 +256,7 @@ pub async fn make_particle(
service_in: &HashMap<String, JValue>,
script: String,
relay: impl Into<Option<PeerId>>,
local_vm: &mut AVMRunner,
local_vm: &mut AVMRunner<WasmtimeWasmBackend>,
data_store: Arc<ParticleDataStore>,
generated: bool,
particle_ttl: Duration,
Expand Down Expand Up @@ -289,6 +309,7 @@ pub async fn make_particle(
key_pair,
id.clone(),
)
.await
.expect("execute & make particle");

data_store
Expand Down Expand Up @@ -330,7 +351,7 @@ pub async fn make_particle(
pub async fn read_args(
particle: Particle,
peer_id: PeerId,
local_vm: &mut AVMRunner,
local_vm: &mut AVMRunner<WasmtimeWasmBackend>,
data_store: Arc<ParticleDataStore>,
key_pair: &KeyPair,
) -> Option<Result<Vec<JValue>, Vec<JValue>>> {
Expand Down Expand Up @@ -363,6 +384,7 @@ pub async fn read_args(
key_pair,
particle.id.clone(),
)
.await
.expect("execute & make particle");

data_store
Expand Down
Loading
Loading