
Commit

use worker-id in particle vault name
kmd-fl committed Feb 21, 2024
1 parent 505fd72 commit 65d37d9
Showing 7 changed files with 83 additions and 69 deletions.
11 changes: 5 additions & 6 deletions aquamarine/src/particle_data_store.rs
@@ -126,15 +126,14 @@ impl ParticleDataStore {
let futures: FuturesUnordered<_> = cleanup_keys
.into_iter()
.map(|(particle_id, peer_id, signature)| async move {
let peer_id = peer_id.to_string();
tracing::debug!(
target: "particle_reap",
particle_id = particle_id, worker_id = peer_id,
particle_id = particle_id, worker_id = peer_id.to_base58(),
"Reaping particle's actor"
);

if let Err(err) = self
.cleanup_data(particle_id.as_str(), peer_id.as_str(), &signature)
.cleanup_data(particle_id.as_str(), peer_id, &signature)
.await
{
tracing::warn!(
@@ -151,19 +150,19 @@ impl ParticleDataStore {
async fn cleanup_data(
&self,
particle_id: &str,
current_peer_id: &str,
current_peer_id: PeerId,
signature: &[u8],
) -> Result<()> {
tracing::debug!(target: "particle_reap", particle_id = particle_id, "Cleaning up particle data for particle");
let path = self.data_file(particle_id, current_peer_id, signature);
let path = self.data_file(particle_id, &current_peer_id.to_base58(), signature);
match tokio::fs::remove_file(&path).await {
Ok(_) => Ok(()),
// ignore NotFound
Err(err) if err.kind() == ErrorKind::NotFound => Ok(()),
Err(err) => Err(DataStoreError::CleanupData(err)),
}?;

self.vault.cleanup(particle_id).await?;
self.vault.cleanup(current_peer_id, particle_id).await?;

Ok(())
}
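The cleanup path above keeps the existing "missing file is fine" behaviour while switching the key from a stringified peer id to a typed `PeerId`. A minimal sketch of that NotFound-tolerant removal against plain `tokio::fs` (the `data_file` naming scheme itself is not shown in this diff and is assumed unchanged):

```rust
use std::io::ErrorKind;
use std::path::Path;

/// Remove a particle data file, treating an already-missing file as success —
/// the same pattern `cleanup_data` uses above before clearing the vault.
async fn remove_if_exists(path: &Path) -> std::io::Result<()> {
    match tokio::fs::remove_file(path).await {
        Ok(()) => Ok(()),
        // Ignore NotFound: reaping an already-cleaned particle should not fail.
        Err(err) if err.kind() == ErrorKind::NotFound => Ok(()),
        Err(err) => Err(err),
    }
}
```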
20 changes: 14 additions & 6 deletions particle-builtins/src/builtins.rs
@@ -621,6 +621,7 @@ where

let module_hash = self.modules.add_module_from_vault(
&self.services.vault,
self.scopes.to_peer_id(params.peer_scope),
module_path,
config,
params,
@@ -649,6 +650,7 @@

let config = ModuleRepository::load_module_config_from_vault(
&self.services.vault,
self.scopes.to_peer_id(params.peer_scope),
config_path,
params,
)?;
@@ -694,10 +696,11 @@ where
let mut args = args.function_args.into_iter();
let blueprint_path: String = Args::next("blueprint_path", &mut args)?;

let data = self
.services
.vault
.cat_slice(&params, Path::new(&blueprint_path))?;
let current_peer_id = self.scopes.to_peer_id(params.peer_scope);
let data =
self.services
.vault
.cat_slice(current_peer_id, &params, Path::new(&blueprint_path))?;
let blueprint = AddBlueprint::decode(&data).map_err(|err| {
JError::new(format!(
"Error parsing blueprint from vault {blueprint_path:?}: {err}"
@@ -986,17 +989,22 @@ where
let mut args = args.function_args.into_iter();
let data: String = Args::next("data", &mut args)?;
let name = uuid();
let virtual_path = self.services.vault.put(&params, name, &data)?;
let current_peer_id = self.scopes.to_peer_id(params.peer_scope);
let virtual_path = self
.services
.vault
.put(current_peer_id, &params, name, &data)?;

Ok(JValue::String(virtual_path.display().to_string()))
}

fn vault_cat(&self, args: Args, params: ParticleParams) -> Result<JValue, JError> {
let mut args = args.function_args.into_iter();
let path: String = Args::next("path", &mut args)?;
let current_peer_id = self.scopes.to_peer_id(params.peer_scope);
self.services
.vault
.cat(&params, Path::new(&path))
.cat(current_peer_id, &params, Path::new(&path))
.map(JValue::String)
.map_err(|_| JError::new(format!("Error reading vault file `{path}`")))
}
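Each vault builtin now resolves `params.peer_scope` to a concrete peer id before touching the vault, so host calls and worker calls land in different directories. A rough, self-contained model of that resolution (the real `PeerScopes` and `PeerId` types live in Nox's `types` and `fluence_libp2p` crates; the stand-ins below are hypothetical simplifications):

```rust
/// Hypothetical stand-in for libp2p's PeerId, used only to keep the sketch self-contained.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct PeerId(u64);

/// Simplified model of the scope a particle runs under: the host itself or one of its workers.
#[derive(Clone, Copy, Debug)]
enum PeerScope {
    Host,
    WorkerId(PeerId),
}

struct PeerScopes {
    host_peer_id: PeerId,
}

impl PeerScopes {
    /// Collapse a scope to the peer id that owns the vault directory.
    fn to_peer_id(&self, scope: PeerScope) -> PeerId {
        match scope {
            PeerScope::Host => self.host_peer_id,
            PeerScope::WorkerId(worker_id) => worker_id,
        }
    }
}
```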
2 changes: 1 addition & 1 deletion particle-execution/src/lib.rs
@@ -7,7 +7,7 @@ pub use particle_function::{
ParticleFunctionStatic, ServiceFunction, ServiceFunctionImmut, ServiceFunctionMut,
};
pub use particle_params::ParticleParams;
pub use particle_vault::{ParticleVault, VaultError, VIRTUAL_PARTICLE_HOST_VAULT_PREFIX};
pub use particle_vault::{ParticleVault, VaultError, VIRTUAL_PARTICLE_VAULT_PREFIX};

mod function_outcome;
mod particle_function;
96 changes: 46 additions & 50 deletions particle-execution/src/particle_vault.rs
@@ -19,18 +19,16 @@ use std::io::ErrorKind;
use std::path;
use std::path::{Path, PathBuf};

use fluence_libp2p::PeerId;
use thiserror::Error;

use fs_utils::create_dir;
use types::peer_scope::PeerScope;

use crate::ParticleParams;
use crate::VaultError::WrongVault;
use VaultError::{CleanupVault, CreateVault, InitializeVault};

// TODO: how to make read-only for workers?
pub const VIRTUAL_PARTICLE_HOST_VAULT_PREFIX: &str = "/tmp/vault";
pub const VIRTUAL_PARTICLE_WORKER_VAULT_PREFIX: &str = "/tmp/worker_vault";
pub const VIRTUAL_PARTICLE_VAULT_PREFIX: &str = "/tmp/vault";

#[derive(Debug, Clone)]
pub struct ParticleVault {
@@ -42,14 +40,18 @@ impl ParticleVault {
Self { vault_dir }
}

pub fn real_worker_particle_vault(&self, peer_id: PeerId) -> PathBuf {
self.vault_dir.join(peer_id.to_base58())
}

/// Returns Particle File Vault path on Nox's filesystem
pub fn particle_vault(&self, particle_id: &str) -> PathBuf {
self.vault_dir.join(particle_id)
pub fn real_particle_vault(&self, peer_id: PeerId, particle_id: &str) -> PathBuf {
self.real_worker_particle_vault(peer_id).join(particle_id)
}

/// Returns Particle File Vault path on Marine's filesystem (ie how it would look like inside service)
pub fn virtual_particle_vault(&self, particle_id: &str) -> PathBuf {
Path::new(VIRTUAL_PARTICLE_HOST_VAULT_PREFIX).join(particle_id)
Path::new(VIRTUAL_PARTICLE_VAULT_PREFIX).join(particle_id)
}

pub async fn initialize(&self) -> Result<(), VaultError> {
@@ -60,31 +62,21 @@ impl ParticleVault {
Ok(())
}

pub async fn initialize_scoped(&self, peer_scope: &PeerScope) -> Result<(), VaultError> {
match peer_scope {
PeerScope::Host => self.initialize().await,
PeerScope::WorkerId(_worker_id) => {}
}
}

pub fn create(&self, particle: &ParticleParams) -> Result<(), VaultError> {
let path = self.particle_vault(&particle.id);
pub fn create(&self, current_peer_id: PeerId, particle_id: &str) -> Result<(), VaultError> {
let path = self.real_particle_vault(current_peer_id, particle_id);
create_dir(path).map_err(CreateVault)?;

Ok(())
}

pub fn exists(&self, particle: &ParticleParams) -> bool {
self.particle_vault(&particle.id).exists()
}

pub fn put(
&self,
current_peer_id: PeerId,
particle: &ParticleParams,
filename: String,
payload: &str,
) -> Result<PathBuf, VaultError> {
let vault_dir = self.particle_vault(&particle.id);
let vault_dir = self.real_particle_vault(current_peer_id, &particle.id);
// Note that we can't use `to_real_path` here since the target file cannot exist yet,
// but `to_real_path` do path normalization which requires existence of the file to resolve
// symlinks.
@@ -96,15 +88,16 @@ impl ParticleVault {
std::fs::write(real_path.clone(), payload.as_bytes())
.map_err(|e| VaultError::WriteVault(e, filename))?;

self.to_virtual_path(&real_path, &particle.id)
self.to_virtual_path(current_peer_id, &particle, &real_path)
}

pub fn cat(
&self,
current_peer_id: PeerId,
particle: &ParticleParams,
virtual_path: &Path,
) -> Result<String, VaultError> {
let real_path = self.to_real_path(virtual_path, &particle.id)?;
let real_path = self.to_real_path(current_peer_id, particle, virtual_path)?;

let contents = std::fs::read_to_string(real_path)
.map_err(|e| VaultError::ReadVault(e, virtual_path.to_path_buf()))?;
@@ -114,15 +107,16 @@

pub fn cat_slice(
&self,
current_peer_id: PeerId,
particle: &ParticleParams,
virtual_path: &Path,
) -> Result<Vec<u8>, VaultError> {
let real_path = self.to_real_path(virtual_path, &particle.id)?;
let real_path = self.to_real_path(current_peer_id, particle, virtual_path)?;
std::fs::read(real_path).map_err(|e| VaultError::ReadVault(e, virtual_path.to_path_buf()))
}

pub async fn cleanup(&self, particle_id: &str) -> Result<(), VaultError> {
let path = self.particle_vault(particle_id);
pub async fn cleanup(&self, peer_id: PeerId, particle_id: &str) -> Result<(), VaultError> {
let path = self.real_particle_vault(peer_id, particle_id);
match tokio::fs::remove_dir_all(&path).await {
Ok(_) => Ok(()),
// ignore NotFound
@@ -135,9 +129,14 @@ impl ParticleVault {

/// Converts real path in `vault_dir` to virtual path with `VIRTUAL_PARTICLE_VAULT_PREFIX`.
/// Virtual path looks like `/tmp/vault/<particle_id>/<path>`.
fn to_virtual_path(&self, path: &Path, particle_id: &str) -> Result<PathBuf, VaultError> {
let virtual_prefix = self.virtual_particle_vault(particle_id);
let real_prefix = self.particle_vault(particle_id);
fn to_virtual_path(
&self,
current_peer_id: PeerId,
particle: &ParticleParams,
path: &Path,
) -> Result<PathBuf, VaultError> {
let virtual_prefix = self.virtual_particle_vault(&particle.id);
let real_prefix = self.real_particle_vault(current_peer_id, &particle.id);
let rest = path
.strip_prefix(&real_prefix)
.map_err(|e| WrongVault(Some(e), path.to_path_buf(), real_prefix))?;
@@ -149,21 +148,27 @@ impl ParticleVault {
/// Support full paths to the file in the vault starting this the prefix as well as relative paths
/// inside the vault.
/// For example, `some/file.txt` is valid and will be resolved to `REAL_PARTICLE_VAULT_PREFIX/some/file.txt`.
fn to_real_path(&self, path: &Path, particle_id: &str) -> Result<PathBuf, VaultError> {
let rest = if path.has_root() {
fn to_real_path(
&self,
current_peer_id: PeerId,
particle: &ParticleParams,
virtual_path: &Path,
) -> Result<PathBuf, VaultError> {
let rest = if virtual_path.has_root() {
// If path starts with the `/` then we consider it a full path containing the virtual vault prefix
let virtual_prefix = self.virtual_particle_vault(particle_id);
path.strip_prefix(&virtual_prefix)
.map_err(|e| WrongVault(Some(e), path.to_path_buf(), virtual_prefix.clone()))?
let virtual_prefix = self.virtual_particle_vault(&particle.id);
virtual_path.strip_prefix(&virtual_prefix).map_err(|e| {
WrongVault(Some(e), virtual_path.to_path_buf(), virtual_prefix.clone())
})?
} else {
// Otherwise we consider it a relative path inside the vault
path
virtual_path
};
let real_prefix = self.particle_vault(particle_id);
let real_prefix = self.real_particle_vault(current_peer_id, &particle.id);
let real_path = real_prefix.join(rest);
let resolved_path = real_path
.canonicalize()
.map_err(|e| VaultError::NotFound(e, path.to_path_buf()))?;
.map_err(|e| VaultError::NotFound(e, virtual_path.to_path_buf()))?;
// Check again after normalization that the path leads to the real particle vault
if resolved_path.starts_with(&real_prefix) {
Ok(resolved_path)
@@ -172,30 +177,21 @@ impl ParticleVault {
}
}

/// Map `vault_dir` to `/tmp/vault` inside the service.
/// Map `vault_dir/$current-peer-id` to `/tmp/vault` inside the service.
/// Particle File Vaults will be available as `/tmp/vault/$particle_id`
pub fn inject_vault(&self, peer_scope: &PeerScope, module: &mut ModuleDescriptor) {
pub fn inject_vault(&self, current_peer_id: PeerId, module: &mut ModuleDescriptor) {
let wasi = &mut module.config.wasi;
if wasi.is_none() {
*wasi = Some(MarineWASIConfig::default());
}
// SAFETY: set wasi to Some in the code above
let wasi = wasi.as_mut().unwrap();

// TODO: host path
let vault_dir = self.vault_dir.to_path_buf();
let vault_dir = self.real_worker_particle_vault(current_peer_id);

wasi.preopened_files.insert(vault_dir.clone());
wasi.mapped_dirs
.insert(VIRTUAL_PARTICLE_HOST_VAULT_PREFIX.into(), vault_dir);
if let PeerScope::WorkerId(_worker_id) = peer_scope {
// TODO: worker path
let worker_vault_dir = self.vault_dir.to_path_buf();
wasi.mapped_dirs.insert(
VIRTUAL_PARTICLE_WORKER_VAULT_PREFIX.into(),
worker_vault_dir,
);
}
.insert(VIRTUAL_PARTICLE_VAULT_PREFIX.into(), vault_dir);
}
}

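The net effect of this file is a new on-disk layout: the real vault is now `<vault_dir>/<peer id, base58>/<particle_id>`, while services keep seeing `/tmp/vault/<particle_id>`. A minimal sketch of that mapping using only `std::path` (directory names are illustrative and error handling is reduced to `Option`):

```rust
use std::path::{Path, PathBuf};

const VIRTUAL_PARTICLE_VAULT_PREFIX: &str = "/tmp/vault";

/// Real (host-side) vault path: <vault_dir>/<worker peer id, base58>/<particle_id>.
fn real_particle_vault(vault_dir: &Path, peer_id_b58: &str, particle_id: &str) -> PathBuf {
    vault_dir.join(peer_id_b58).join(particle_id)
}

/// Rewrite a real path into the path the service sees: /tmp/vault/<particle_id>/<rest>.
fn to_virtual_path(
    vault_dir: &Path,
    peer_id_b58: &str,
    particle_id: &str,
    real_path: &Path,
) -> Option<PathBuf> {
    let real_prefix = real_particle_vault(vault_dir, peer_id_b58, particle_id);
    let rest = real_path.strip_prefix(&real_prefix).ok()?;
    Some(Path::new(VIRTUAL_PARTICLE_VAULT_PREFIX).join(particle_id).join(rest))
}

fn main() {
    let vault_dir = Path::new("/var/nox/vault");
    let real = real_particle_vault(vault_dir, "worker_peer_id_b58", "particle_42").join("data.json");
    // Prints Some("/tmp/vault/particle_42/data.json").
    println!("{:?}", to_virtual_path(vault_dir, "worker_peer_id_b58", "particle_42", &real));
}
```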
1 change: 1 addition & 0 deletions particle-modules/Cargo.toml
@@ -11,6 +11,7 @@ particle-args = { workspace = true }
json-utils = { workspace = true }
fs-utils = { workspace = true }
service-modules = { workspace = true }
fluence-libp2p = { workspace = true }

marine-it-parser = { workspace = true }
fluence-app-service = { workspace = true }
9 changes: 7 additions & 2 deletions particle-modules/src/modules.rs
@@ -29,6 +29,7 @@ use marine_it_parser::module_interface;
use parking_lot::RwLock;
use serde_json::{json, Value as JValue};

use fluence_libp2p::PeerId;
use particle_args::JError;
use particle_execution::{ParticleParams, ParticleVault};
use service_modules::{
@@ -98,10 +99,12 @@ impl ModuleRepository {

pub fn load_module_config_from_vault(
vault: &ParticleVault,
// TODO: refactor this out of this crate
current_peer_id: PeerId,
config_path: String,
particle: ParticleParams,
) -> Result<TomlMarineNamedModuleConfig> {
let config = vault.cat_slice(&particle, Path::new(&config_path))?;
let config = vault.cat_slice(current_peer_id, &particle, Path::new(&config_path))?;
serde_json::from_slice(&config)
.map_err(|err| IncorrectVaultModuleConfig { config_path, err })
}
@@ -121,11 +124,13 @@
pub fn add_module_from_vault(
&self,
vault: &ParticleVault,
// TODO: refactor this out of this crate
current_peer_id: PeerId,
module_path: String,
config: TomlMarineNamedModuleConfig,
particle: ParticleParams,
) -> Result<String> {
let module = vault.cat_slice(&particle, Path::new(&module_path))?;
let module = vault.cat_slice(current_peer_id, &particle, Path::new(&module_path))?;
// copy module & config to module_dir
let hash = self.add_module(module, config)?;

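Module and config blobs are still read out of the caller's vault with `cat_slice` and then deserialized; the only change here is that the read is keyed by the resolved peer id. A rough sketch of the load-and-parse step, with `serde_json::Value` standing in for `TomlMarineNamedModuleConfig` to keep it self-contained:

```rust
use std::path::Path;

/// Read a config file from an already-resolved real vault directory and parse it.
/// `real_vault_dir` is assumed to be <vault_dir>/<peer_id>/<particle_id>.
fn load_config_sketch(
    real_vault_dir: &Path,
    relative_path: &str,
) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
    let bytes = std::fs::read(real_vault_dir.join(relative_path))?;
    Ok(serde_json::from_slice(&bytes)?)
}
```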
13 changes: 9 additions & 4 deletions particle-services/src/app_services.rs
@@ -431,7 +431,8 @@ impl ParticleAppServices {

// TODO: move particle vault creation to aquamarine::particle_functions
if create_vault {
self.vault.create(&particle)?;
self.vault
.create(self.scopes.to_peer_id(particle.peer_scope), &particle.id)?;
}

let call_parameters_worker_id = self.scopes.to_peer_id(peer_scope);
@@ -931,7 +932,11 @@ impl ParticleAppServices {
) -> Result<Option<Arc<Service>>, ServiceError> {
let creation_start_time = Instant::now();
let service = self
.create_app_service(&peer_scope, blueprint_id.clone(), service_id.clone())
.create_app_service(
self.scopes.to_peer_id(peer_scope),
blueprint_id.clone(),
service_id.clone(),
)
.inspect_err(|_| {
if let Some(metrics) = self.metrics.as_ref() {
metrics.observe_created_failed();
@@ -991,14 +996,14 @@

fn create_app_service(
&self,
peer_scope: &PeerScope,
current_peer_id: PeerId,
blueprint_id: String,
service_id: String,
) -> Result<AppService, ServiceError> {
let mut modules_config = self.modules.resolve_blueprint(&blueprint_id)?;
modules_config
.iter_mut()
.for_each(|module| self.vault.inject_vault(peer_scope, module));
.for_each(|module| self.vault.inject_vault(current_peer_id, module));

let app_config = AppServiceConfig {
service_working_dir: self.config.workdir.join(&service_id),
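Finally, `inject_vault` now mounts only the caller's per-worker directory into the service instead of the whole `vault_dir`. A simplified model of that WASI wiring (the struct below is a hypothetical stand-in for Marine's `MarineWASIConfig`, reduced to the two fields the diff touches):

```rust
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};

/// Hypothetical, trimmed-down stand-in for Marine's WASI config.
#[derive(Default)]
struct WasiConfigSketch {
    preopened_files: HashSet<PathBuf>,
    mapped_dirs: HashMap<String, PathBuf>,
}

/// Preopen the worker's vault and map it to /tmp/vault inside the service,
/// so each worker only ever sees its own particle vaults.
fn inject_vault_sketch(wasi: &mut WasiConfigSketch, vault_dir: &Path, worker_peer_id_b58: &str) {
    let worker_vault = vault_dir.join(worker_peer_id_b58);
    wasi.preopened_files.insert(worker_vault.clone());
    wasi.mapped_dirs.insert("/tmp/vault".to_string(), worker_vault);
}
```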
