Skip to content

Commit

Permalink
feat: use particle signature as id for actors [NET-540] (#1877)
Browse files Browse the repository at this point in the history
  • Loading branch information
justprosh authored Nov 7, 2023
1 parent e0de88c commit da84a39
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ cid = "0.10.1"
libipld = "0.16.0"
axum = "0.6.20"
hyper = "0.14.27"
faster-hex = "0.8.1"
once_cell = "1.18.0"

# Enable a small amount of optimization in debug mode
Expand Down
22 changes: 14 additions & 8 deletions aquamarine/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,21 @@ where
self.future.is_some()
}

pub fn cleanup(
&self,
particle_id: &str,
current_peer_id: &str,
vm: &mut RT,
) -> eyre::Result<()> {
pub fn cleanup(&self, vm: &mut RT) {
tracing::debug!(
target: "particle_reap",
particle_id = self.particle.id, worker_id = self.current_peer_id.to_string(),
"Reaping particle's actor"
);
// TODO: remove dirs without using vm https://github.com/fluencelabs/fluence/issues/1216
vm.cleanup(particle_id, current_peer_id)?;
Ok(())
// TODO: ??? we don't have this issue anymore
if let Err(err) = vm.cleanup(&self.particle.id, &self.current_peer_id.to_string()) {
tracing::warn!(
particle_id = self.particle.id,
"Error cleaning up after particle {:?}",
err
);
}
}

pub fn mailbox_size(&self) -> usize {
Expand Down
1 change: 1 addition & 0 deletions aquamarine/src/particle_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl<RT: AquaRuntime> ParticleExecutor for RT {
let particle_params = ParticleParameters {
current_peer_id: Cow::Owned(current_peer_id.to_string()),
init_peer_id: Cow::Owned(particle.init_peer_id.to_string()),
// we use signature hex as particle id to prevent compromising of particle data store
particle_id: Cow::Borrowed(&particle.id),
timestamp: particle.timestamp,
ttl: particle.ttl,
Expand Down
22 changes: 7 additions & 15 deletions aquamarine/src/plumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ use crate::particle_effects::RoutingEffects;
use crate::particle_functions::Functions;
use crate::vm_pool::VmPool;

type ParticleId = String;
/// particle signature is used as a particle id
#[derive(PartialEq, Hash, Eq)]
struct ParticleId(Vec<u8>);

pub struct Plumber<RT: AquaRuntime, F> {
events: VecDeque<Result<RoutingEffects, AquamarineApiError>>,
Expand Down Expand Up @@ -103,7 +105,7 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
}

let builtins = &self.builtins;
let key = (particle.id.clone(), worker_id);
let key = (ParticleId(particle.signature.clone()), worker_id);
let entry = self.actors.entry(key);

let actor = match entry {
Expand Down Expand Up @@ -224,27 +226,17 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
if let Some((vm_id, mut vm)) = self.vm_pool.get_vm() {
let now = now_ms();

self.actors.retain(|(particle_id, worker_id), actor| {
self.actors.retain(|_, actor| {
// if actor hasn't yet expired or is still executing, keep it
// TODO: if actor is expired, cancel execution and return VM back to pool
// https://github.com/fluencelabs/fluence/issues/1212
if !actor.is_expired(now) || actor.is_executing() {
return true; // keep actor
}
tracing::debug!(
target: "particle_reap",
particle_id = particle_id, worker_id = worker_id.to_string(),
"Reaping particle's actor"
);

// cleanup files and dirs after particle processing (vault & prev_data)
// TODO: do not pass vm https://github.com/fluencelabs/fluence/issues/1216
if let Err(err) = actor.cleanup(particle_id, &worker_id.to_string(), &mut vm) {
tracing::warn!(
particle_id = particle_id,
"Error cleaning up after particle {:?}",
err
)
}
actor.cleanup(&mut vm);
false // remove actor
});

Expand Down
2 changes: 1 addition & 1 deletion crates/air-interpreter-fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ eyre = { workspace = true }
serde = { version = "1.0.190", features = ["derive"] }
serde_json = { workspace = true }
blake3 = "1.5.0"
faster-hex = "0.8.1"
faster-hex = { workspace = true }
2 changes: 1 addition & 1 deletion crates/service-modules/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ cid-utils = { workspace = true }

eyre = { workspace = true }
serde = { version = "1.0.190", features = ["derive"] }
faster-hex = "0.8.1"
faster-hex = { workspace = true }
maplit = { workspace = true }
serde_json = { workspace = true }
libipld = { workspace = true }
1 change: 1 addition & 0 deletions particle-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ log = { workspace = true }
derivative = { workspace = true }
base64 = { workspace = true }
tokio = { workspace = true }
faster-hex = { workspace = true }

[dev-dependencies]
rand = "0.8.5"
Expand Down

0 comments on commit da84a39

Please sign in to comment.