Skip to content

Commit

Permalink
feat(avm): update to avm 0.39.1 (#1627)
Browse files Browse the repository at this point in the history
* chore(avm): update avm & avm-server to 0.39.1

* feat(particle-execution): pass key pair to AVM

* chore: fix compilation

* chore(avm): update avm version to fix error

* fix(particle-execution): wake after creating a future

* chore: fix clippy warnings

* chore: add TODO

* fix(particle-execution): do not clone particle on every AVM::call (#1628)

* fix(particle-execution): do not clone particle on every AVM::call
  • Loading branch information
folex authored Jun 22, 2023
1 parent f95bbcf commit e6b1afa
Show file tree
Hide file tree
Showing 23 changed files with 379 additions and 193 deletions.
57 changes: 36 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ marine-utils = "0.5.0"
marine-it-parser = "0.12.1"

# avm
avm-server = "0.31.0"
air-interpreter-wasm = "=0.38.0"
avm-server = { version = "=0.39.1-feat-VM-276-aquavm-keypair-775a729-1587-1.0", registry = "fluence" }
air-interpreter-wasm = { version = "=0.39.1-feat-VM-276-aquavm-keypair-775a729-1587-1.0", registry = "fluence" }

# libp2p
libp2p = { version = "0.51.3", features = ["noise", "tcp", "dns", "websocket", "yamux", "mplex", "tokio", "kad", "ping", "identify", "macros"] }
Expand Down
75 changes: 52 additions & 23 deletions aquamarine/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use std::{
task::{Context, Poll, Waker},
};

use avm_server::CallResults;
use fluence_keypair::KeyPair;
use futures::future::BoxFuture;
use futures::FutureExt;

use fluence_libp2p::PeerId;
Expand All @@ -28,14 +29,19 @@ use particle_protocol::Particle;

use crate::deadline::Deadline;
use crate::particle_effects::RoutingEffects;
use crate::particle_executor::{Fut, FutResult, ParticleExecutor};
use crate::particle_executor::{FutResult, ParticleExecutor};
use crate::particle_functions::{Functions, SingleCallStat};
use crate::{AquaRuntime, InterpretationStats};
use crate::{AquaRuntime, InterpretationStats, ParticleEffects};

struct Reusables<RT> {
vm_id: usize,
vm: Option<RT>,
}

pub struct Actor<RT, F> {
/// Particle of that actor is expired after that deadline
deadline: Deadline,
future: Option<(usize, Fut<RT>)>,
future: Option<BoxFuture<'static, (Reusables<RT>, ParticleEffects, InterpretationStats)>>,
mailbox: VecDeque<Particle>,
waker: Option<Waker>,
functions: Functions<F>,
Expand All @@ -46,14 +52,20 @@ pub struct Actor<RT, F> {
/// Particles and call results will be processed in the security scope of this peer id
/// It's either `host_peer_id` or local worker peer id
current_peer_id: PeerId,
key_pair: KeyPair,
}

impl<RT, F> Actor<RT, F>
where
RT: AquaRuntime + ParticleExecutor<Particle = (Particle, CallResults), Future = Fut<RT>>,
RT: AquaRuntime,
F: ParticleFunctionStatic,
{
pub fn new(particle: &Particle, functions: Functions<F>, current_peer_id: PeerId) -> Self {
pub fn new(
particle: &Particle,
functions: Functions<F>,
current_peer_id: PeerId,
key_pair: KeyPair,
) -> Self {
Self {
deadline: Deadline::from(particle),
functions,
Expand All @@ -71,6 +83,7 @@ where
data: vec![],
},
current_peer_id,
key_pair,
}
}

Expand Down Expand Up @@ -110,33 +123,35 @@ where
pub fn poll_completed(
&mut self,
cx: &mut Context<'_>,
) -> Poll<FutResult<(usize, RT), RoutingEffects, InterpretationStats>> {
) -> Poll<FutResult<(usize, Option<RT>), RoutingEffects, InterpretationStats>> {
use Poll::Ready;

self.waker = Some(cx.waker().clone());

self.functions.poll(cx);

// Poll AquaVM future
if let Some((vm_id, Ready(r))) = self.future.as_mut().map(|(i, f)| (*i, f.poll_unpin(cx))) {
if let Some(Ready((reusables, effects, stats))) =
self.future.as_mut().map(|f| f.poll_unpin(cx))
{
self.future.take();

let waker = cx.waker().clone();
// Schedule execution of functions
self.functions.execute(
r.effects.particle.id.clone(),
r.effects.call_requests,
waker,
);
self.functions
.execute(self.particle.id.clone(), effects.call_requests, waker);

let effects = RoutingEffects {
particle: r.effects.particle,
next_peers: r.effects.next_peers,
particle: Particle {
data: effects.new_data,
..self.particle.clone()
},
next_peers: effects.next_peers,
};
return Poll::Ready(FutResult {
vm: r.vm.map(|vm| (vm_id, vm)),
return Ready(FutResult {
runtime: (reusables.vm_id, reusables.vm),
effects,
stats: r.stats,
stats,
});
}

Expand Down Expand Up @@ -175,12 +190,26 @@ where
self.particle.clone()
});
let waker = cx.waker().clone();
// TODO: add timeout for execution https://github.com/fluencelabs/fluence/issues/1212
// Take ownership of vm to process particle
self.future = Some((
vm_id,
vm.execute((particle, calls), waker, self.current_peer_id),
));
let peer_id = self.current_peer_id;
// TODO: get rid of this clone by recovering key_pair after `vm.execute` (not trivial to implement)
let key_pair = self.key_pair.clone();
// TODO: add timeout for execution https://github.com/fluencelabs/fluence/issues/1212
self.future = Some(
async move {
let res = vm
.execute((particle, calls), waker, peer_id, key_pair)
.await;

let reusables = Reusables {
vm_id,
vm: res.runtime,
};
(reusables, res.effects, res.stats)
}
.boxed(),
);
self.wake();

ActorPoll::Executing(stats)
}
Expand Down
Loading

0 comments on commit e6b1afa

Please sign in to comment.