diff --git a/aquamarine/src/particle_data_store.rs b/aquamarine/src/particle_data_store.rs index 8bb03e9544..6a1f5915b8 100644 --- a/aquamarine/src/particle_data_store.rs +++ b/aquamarine/src/particle_data_store.rs @@ -126,15 +126,14 @@ impl ParticleDataStore { let futures: FuturesUnordered<_> = cleanup_keys .into_iter() .map(|(particle_id, peer_id, signature)| async move { - let peer_id = peer_id.to_string(); tracing::debug!( target: "particle_reap", - particle_id = particle_id, worker_id = peer_id, + particle_id = particle_id, worker_id = peer_id.to_base58(), "Reaping particle's actor" ); if let Err(err) = self - .cleanup_data(particle_id.as_str(), peer_id.as_str(), &signature) + .cleanup_data(particle_id.as_str(), peer_id, &signature) .await { tracing::warn!( @@ -151,11 +150,11 @@ impl ParticleDataStore { async fn cleanup_data( &self, particle_id: &str, - current_peer_id: &str, + current_peer_id: PeerId, signature: &[u8], ) -> Result<()> { tracing::debug!(target: "particle_reap", particle_id = particle_id, "Cleaning up particle data for particle"); - let path = self.data_file(particle_id, current_peer_id, signature); + let path = self.data_file(particle_id, ¤t_peer_id.to_base58(), signature); match tokio::fs::remove_file(&path).await { Ok(_) => Ok(()), // ignore NotFound @@ -163,7 +162,7 @@ impl ParticleDataStore { Err(err) => Err(DataStoreError::CleanupData(err)), }?; - self.vault.cleanup(particle_id).await?; + self.vault.cleanup(current_peer_id, particle_id).await?; Ok(()) } diff --git a/particle-builtins/src/builtins.rs b/particle-builtins/src/builtins.rs index a8aaa91c6d..bb1bfd655e 100644 --- a/particle-builtins/src/builtins.rs +++ b/particle-builtins/src/builtins.rs @@ -621,6 +621,7 @@ where let module_hash = self.modules.add_module_from_vault( &self.services.vault, + self.scopes.to_peer_id(params.peer_scope), module_path, config, params, @@ -649,6 +650,7 @@ where let config = ModuleRepository::load_module_config_from_vault( &self.services.vault, + self.scopes.to_peer_id(params.peer_scope), config_path, params, )?; @@ -694,10 +696,11 @@ where let mut args = args.function_args.into_iter(); let blueprint_path: String = Args::next("blueprint_path", &mut args)?; - let data = self - .services - .vault - .cat_slice(¶ms, Path::new(&blueprint_path))?; + let current_peer_id = self.scopes.to_peer_id(params.peer_scope); + let data = + self.services + .vault + .cat_slice(current_peer_id, ¶ms, Path::new(&blueprint_path))?; let blueprint = AddBlueprint::decode(&data).map_err(|err| { JError::new(format!( "Error parsing blueprint from vault {blueprint_path:?}: {err}" @@ -986,7 +989,11 @@ where let mut args = args.function_args.into_iter(); let data: String = Args::next("data", &mut args)?; let name = uuid(); - let virtual_path = self.services.vault.put(¶ms, name, &data)?; + let current_peer_id = self.scopes.to_peer_id(params.peer_scope); + let virtual_path = self + .services + .vault + .put(current_peer_id, ¶ms, name, &data)?; Ok(JValue::String(virtual_path.display().to_string())) } @@ -994,9 +1001,10 @@ where fn vault_cat(&self, args: Args, params: ParticleParams) -> Result { let mut args = args.function_args.into_iter(); let path: String = Args::next("path", &mut args)?; + let current_peer_id = self.scopes.to_peer_id(params.peer_scope); self.services .vault - .cat(¶ms, Path::new(&path)) + .cat(current_peer_id, ¶ms, Path::new(&path)) .map(JValue::String) .map_err(|_| JError::new(format!("Error reading vault file `{path}`"))) } diff --git a/particle-execution/src/lib.rs b/particle-execution/src/lib.rs index b95d2e7859..4ad18a516d 100644 --- a/particle-execution/src/lib.rs +++ b/particle-execution/src/lib.rs @@ -7,7 +7,7 @@ pub use particle_function::{ ParticleFunctionStatic, ServiceFunction, ServiceFunctionImmut, ServiceFunctionMut, }; pub use particle_params::ParticleParams; -pub use particle_vault::{ParticleVault, VaultError, VIRTUAL_PARTICLE_HOST_VAULT_PREFIX}; +pub use particle_vault::{ParticleVault, VaultError, VIRTUAL_PARTICLE_VAULT_PREFIX}; mod function_outcome; mod particle_function; diff --git a/particle-execution/src/particle_vault.rs b/particle-execution/src/particle_vault.rs index f0a59758d1..88958726f1 100644 --- a/particle-execution/src/particle_vault.rs +++ b/particle-execution/src/particle_vault.rs @@ -19,18 +19,16 @@ use std::io::ErrorKind; use std::path; use std::path::{Path, PathBuf}; +use fluence_libp2p::PeerId; use thiserror::Error; use fs_utils::create_dir; -use types::peer_scope::PeerScope; use crate::ParticleParams; use crate::VaultError::WrongVault; use VaultError::{CleanupVault, CreateVault, InitializeVault}; -// TODO: how to make read-only for workers? -pub const VIRTUAL_PARTICLE_HOST_VAULT_PREFIX: &str = "/tmp/vault"; -pub const VIRTUAL_PARTICLE_WORKER_VAULT_PREFIX: &str = "/tmp/worker_vault"; +pub const VIRTUAL_PARTICLE_VAULT_PREFIX: &str = "/tmp/vault"; #[derive(Debug, Clone)] pub struct ParticleVault { @@ -42,14 +40,18 @@ impl ParticleVault { Self { vault_dir } } + pub fn real_worker_particle_vault(&self, peer_id: PeerId) -> PathBuf { + self.vault_dir.join(peer_id.to_base58()) + } + /// Returns Particle File Vault path on Nox's filesystem - pub fn particle_vault(&self, particle_id: &str) -> PathBuf { - self.vault_dir.join(particle_id) + pub fn real_particle_vault(&self, peer_id: PeerId, particle_id: &str) -> PathBuf { + self.real_worker_particle_vault(peer_id).join(particle_id) } /// Returns Particle File Vault path on Marine's filesystem (ie how it would look like inside service) pub fn virtual_particle_vault(&self, particle_id: &str) -> PathBuf { - Path::new(VIRTUAL_PARTICLE_HOST_VAULT_PREFIX).join(particle_id) + Path::new(VIRTUAL_PARTICLE_VAULT_PREFIX).join(particle_id) } pub async fn initialize(&self) -> Result<(), VaultError> { @@ -60,31 +62,21 @@ impl ParticleVault { Ok(()) } - pub async fn initialize_scoped(&self, peer_scope: &PeerScope) -> Result<(), VaultError> { - match peer_scope { - PeerScope::Host => self.initialize().await, - PeerScope::WorkerId(_worker_id) => {} - } - } - - pub fn create(&self, particle: &ParticleParams) -> Result<(), VaultError> { - let path = self.particle_vault(&particle.id); + pub fn create(&self, current_peer_id: PeerId, particle_id: &str) -> Result<(), VaultError> { + let path = self.real_particle_vault(current_peer_id, particle_id); create_dir(path).map_err(CreateVault)?; Ok(()) } - pub fn exists(&self, particle: &ParticleParams) -> bool { - self.particle_vault(&particle.id).exists() - } - pub fn put( &self, + current_peer_id: PeerId, particle: &ParticleParams, filename: String, payload: &str, ) -> Result { - let vault_dir = self.particle_vault(&particle.id); + let vault_dir = self.real_particle_vault(current_peer_id, &particle.id); // Note that we can't use `to_real_path` here since the target file cannot exist yet, // but `to_real_path` do path normalization which requires existence of the file to resolve // symlinks. @@ -96,15 +88,16 @@ impl ParticleVault { std::fs::write(real_path.clone(), payload.as_bytes()) .map_err(|e| VaultError::WriteVault(e, filename))?; - self.to_virtual_path(&real_path, &particle.id) + self.to_virtual_path(current_peer_id, &particle, &real_path) } pub fn cat( &self, + current_peer_id: PeerId, particle: &ParticleParams, virtual_path: &Path, ) -> Result { - let real_path = self.to_real_path(virtual_path, &particle.id)?; + let real_path = self.to_real_path(current_peer_id, particle, virtual_path)?; let contents = std::fs::read_to_string(real_path) .map_err(|e| VaultError::ReadVault(e, virtual_path.to_path_buf()))?; @@ -114,15 +107,16 @@ impl ParticleVault { pub fn cat_slice( &self, + current_peer_id: PeerId, particle: &ParticleParams, virtual_path: &Path, ) -> Result, VaultError> { - let real_path = self.to_real_path(virtual_path, &particle.id)?; + let real_path = self.to_real_path(current_peer_id, particle, virtual_path)?; std::fs::read(real_path).map_err(|e| VaultError::ReadVault(e, virtual_path.to_path_buf())) } - pub async fn cleanup(&self, particle_id: &str) -> Result<(), VaultError> { - let path = self.particle_vault(particle_id); + pub async fn cleanup(&self, peer_id: PeerId, particle_id: &str) -> Result<(), VaultError> { + let path = self.real_particle_vault(peer_id, particle_id); match tokio::fs::remove_dir_all(&path).await { Ok(_) => Ok(()), // ignore NotFound @@ -135,9 +129,14 @@ impl ParticleVault { /// Converts real path in `vault_dir` to virtual path with `VIRTUAL_PARTICLE_VAULT_PREFIX`. /// Virtual path looks like `/tmp/vault//`. - fn to_virtual_path(&self, path: &Path, particle_id: &str) -> Result { - let virtual_prefix = self.virtual_particle_vault(particle_id); - let real_prefix = self.particle_vault(particle_id); + fn to_virtual_path( + &self, + current_peer_id: PeerId, + particle: &ParticleParams, + path: &Path, + ) -> Result { + let virtual_prefix = self.virtual_particle_vault(&particle.id); + let real_prefix = self.real_particle_vault(current_peer_id, &particle.id); let rest = path .strip_prefix(&real_prefix) .map_err(|e| WrongVault(Some(e), path.to_path_buf(), real_prefix))?; @@ -149,21 +148,27 @@ impl ParticleVault { /// Support full paths to the file in the vault starting this the prefix as well as relative paths /// inside the vault. /// For example, `some/file.txt` is valid and will be resolved to `REAL_PARTICLE_VAULT_PREFIX/some/file.txt`. - fn to_real_path(&self, path: &Path, particle_id: &str) -> Result { - let rest = if path.has_root() { + fn to_real_path( + &self, + current_peer_id: PeerId, + particle: &ParticleParams, + virtual_path: &Path, + ) -> Result { + let rest = if virtual_path.has_root() { // If path starts with the `/` then we consider it a full path containing the virtual vault prefix - let virtual_prefix = self.virtual_particle_vault(particle_id); - path.strip_prefix(&virtual_prefix) - .map_err(|e| WrongVault(Some(e), path.to_path_buf(), virtual_prefix.clone()))? + let virtual_prefix = self.virtual_particle_vault(&particle.id); + virtual_path.strip_prefix(&virtual_prefix).map_err(|e| { + WrongVault(Some(e), virtual_path.to_path_buf(), virtual_prefix.clone()) + })? } else { // Otherwise we consider it a relative path inside the vault - path + virtual_path }; - let real_prefix = self.particle_vault(particle_id); + let real_prefix = self.real_particle_vault(current_peer_id, &particle.id); let real_path = real_prefix.join(rest); let resolved_path = real_path .canonicalize() - .map_err(|e| VaultError::NotFound(e, path.to_path_buf()))?; + .map_err(|e| VaultError::NotFound(e, virtual_path.to_path_buf()))?; // Check again after normalization that the path leads to the real particle vault if resolved_path.starts_with(&real_prefix) { Ok(resolved_path) @@ -172,9 +177,9 @@ impl ParticleVault { } } - /// Map `vault_dir` to `/tmp/vault` inside the service. + /// Map `vault_dir/$current-peer-id` to `/tmp/vault` inside the service. /// Particle File Vaults will be available as `/tmp/vault/$particle_id` - pub fn inject_vault(&self, peer_scope: &PeerScope, module: &mut ModuleDescriptor) { + pub fn inject_vault(&self, current_peer_id: PeerId, module: &mut ModuleDescriptor) { let wasi = &mut module.config.wasi; if wasi.is_none() { *wasi = Some(MarineWASIConfig::default()); @@ -182,20 +187,11 @@ impl ParticleVault { // SAFETY: set wasi to Some in the code above let wasi = wasi.as_mut().unwrap(); - // TODO: host path - let vault_dir = self.vault_dir.to_path_buf(); + let vault_dir = self.real_worker_particle_vault(current_peer_id); wasi.preopened_files.insert(vault_dir.clone()); wasi.mapped_dirs - .insert(VIRTUAL_PARTICLE_HOST_VAULT_PREFIX.into(), vault_dir); - if let PeerScope::WorkerId(_worker_id) = peer_scope { - // TODO: worker path - let worker_vault_dir = self.vault_dir.to_path_buf(); - wasi.mapped_dirs.insert( - VIRTUAL_PARTICLE_WORKER_VAULT_PREFIX.into(), - worker_vault_dir, - ); - } + .insert(VIRTUAL_PARTICLE_VAULT_PREFIX.into(), vault_dir); } } diff --git a/particle-modules/Cargo.toml b/particle-modules/Cargo.toml index 2da70d493b..69529cc64a 100644 --- a/particle-modules/Cargo.toml +++ b/particle-modules/Cargo.toml @@ -11,6 +11,7 @@ particle-args = { workspace = true } json-utils = { workspace = true } fs-utils = { workspace = true } service-modules = { workspace = true } +fluence-libp2p = { workspace = true } marine-it-parser = { workspace = true } fluence-app-service = { workspace = true } diff --git a/particle-modules/src/modules.rs b/particle-modules/src/modules.rs index ed0dbb445b..a11608497f 100644 --- a/particle-modules/src/modules.rs +++ b/particle-modules/src/modules.rs @@ -29,6 +29,7 @@ use marine_it_parser::module_interface; use parking_lot::RwLock; use serde_json::{json, Value as JValue}; +use fluence_libp2p::PeerId; use particle_args::JError; use particle_execution::{ParticleParams, ParticleVault}; use service_modules::{ @@ -98,10 +99,12 @@ impl ModuleRepository { pub fn load_module_config_from_vault( vault: &ParticleVault, + // TODO: refactor this out of this crate + current_peer_id: PeerId, config_path: String, particle: ParticleParams, ) -> Result { - let config = vault.cat_slice(&particle, Path::new(&config_path))?; + let config = vault.cat_slice(current_peer_id, &particle, Path::new(&config_path))?; serde_json::from_slice(&config) .map_err(|err| IncorrectVaultModuleConfig { config_path, err }) } @@ -121,11 +124,13 @@ impl ModuleRepository { pub fn add_module_from_vault( &self, vault: &ParticleVault, + // TODO: refactor this out of this crate + current_peer_id: PeerId, module_path: String, config: TomlMarineNamedModuleConfig, particle: ParticleParams, ) -> Result { - let module = vault.cat_slice(&particle, Path::new(&module_path))?; + let module = vault.cat_slice(current_peer_id, &particle, Path::new(&module_path))?; // copy module & config to module_dir let hash = self.add_module(module, config)?; diff --git a/particle-services/src/app_services.rs b/particle-services/src/app_services.rs index 0ed7941448..ef47105334 100644 --- a/particle-services/src/app_services.rs +++ b/particle-services/src/app_services.rs @@ -431,7 +431,8 @@ impl ParticleAppServices { // TODO: move particle vault creation to aquamarine::particle_functions if create_vault { - self.vault.create(&particle)?; + self.vault + .create(self.scopes.to_peer_id(particle.peer_scope), &particle.id)?; } let call_parameters_worker_id = self.scopes.to_peer_id(peer_scope); @@ -931,7 +932,11 @@ impl ParticleAppServices { ) -> Result>, ServiceError> { let creation_start_time = Instant::now(); let service = self - .create_app_service(&peer_scope, blueprint_id.clone(), service_id.clone()) + .create_app_service( + self.scopes.to_peer_id(peer_scope), + blueprint_id.clone(), + service_id.clone(), + ) .inspect_err(|_| { if let Some(metrics) = self.metrics.as_ref() { metrics.observe_created_failed(); @@ -991,14 +996,14 @@ impl ParticleAppServices { fn create_app_service( &self, - peer_scope: &PeerScope, + current_peer_id: PeerId, blueprint_id: String, service_id: String, ) -> Result { let mut modules_config = self.modules.resolve_blueprint(&blueprint_id)?; modules_config .iter_mut() - .for_each(|module| self.vault.inject_vault(peer_scope, module)); + .for_each(|module| self.vault.inject_vault(current_peer_id, module)); let app_config = AppServiceConfig { service_working_dir: self.config.workdir.join(&service_id),