Skip to content

Commit

Permalink
feat(workers): allow to deploy decider to root scope, fix aliasing (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
justprosh authored Feb 24, 2023
1 parent b03da8b commit 193a6e7
Show file tree
Hide file tree
Showing 13 changed files with 149 additions and 194 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aquamarine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ key-manager = { workspace = true}

avm-server = { workspace = true }
libp2p = { workspace = true }
fluence-keypair = { workspace = true }

futures = { workspace = true }
log = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion aquamarine/src/plumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ mod tests {
use std::{sync::Arc, task::Context};

use avm_server::{AVMMemoryStats, AVMOutcome, CallResults, ParticleParameters};
use fluence_keypair::KeyPair;
use fluence_libp2p::RandomPeerId;
use futures::future::BoxFuture;
use futures::task::noop_waker_ref;
Expand Down Expand Up @@ -377,7 +378,7 @@ mod tests {
let builtin_mock = Arc::new(MockF);
let key_manager = KeyManager::new(
"keypair".into(),
RandomPeerId::random(),
KeyPair::generate_ed25519(),
RandomPeerId::random(),
RandomPeerId::random(),
);
Expand Down
20 changes: 13 additions & 7 deletions crates/key-manager/src/key_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ pub struct KeyManager {
host_peer_id: PeerId,
// temporary public, will refactor
pub insecure_keypair: KeyPair,
pub root_keypair: KeyPair,
management_peer_id: PeerId,
builtins_management_peer_id: PeerId,
}

impl KeyManager {
pub fn new(
keypairs_dir: PathBuf,
host_peer_id: PeerId,
root_keypair: KeyPair,
management_peer_id: PeerId,
builtins_management_peer_id: PeerId,
) -> Self {
Expand All @@ -59,12 +60,13 @@ impl KeyManager {
worker_ids: Arc::new(Default::default()),
worker_creators: Arc::new(Default::default()),
keypairs_dir,
host_peer_id,
host_peer_id: root_keypair.get_peer_id(),
insecure_keypair: KeyPair::from_secret_key(
INSECURE_KEYPAIR_SEED.collect(),
KeyFormat::Ed25519,
)
.expect("error creating insecure keypair"),
root_keypair,
management_peer_id,
builtins_management_peer_id,
};
Expand Down Expand Up @@ -149,11 +151,15 @@ impl KeyManager {
}

pub fn get_worker_keypair(&self, worker_id: PeerId) -> Result<KeyPair, KeyManagerError> {
self.worker_keypairs
.read()
.get(&worker_id)
.cloned()
.ok_or(KeyManagerError::KeypairNotFound(worker_id))
if self.is_host(worker_id) {
Ok(self.root_keypair.clone())
} else {
self.worker_keypairs
.read()
.get(&worker_id)
.cloned()
.ok_or(KeyManagerError::KeypairNotFound(worker_id))
}
}

pub fn get_worker_creator(&self, worker_id: PeerId) -> Result<PeerId, KeyManagerError> {
Expand Down
5 changes: 4 additions & 1 deletion crates/particle-node-tests/tests/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,10 @@ fn sign_verify() {
(call relay ("registry" "get_record_bytes") ["key_id" "" [] [] 1 []] data)
(seq
(call relay ("sig" "sign") [data] sig_result)
(call relay ("sig" "verify") [sig_result.$.signature.[0]! data] result)
(xor
(call relay ("sig" "verify") [sig_result.$.signature.[0]! data] result)
(call %init_peer_id% ("op" "return") [sig_result.$.error])
)
)
)
(call %init_peer_id% ("op" "return") [data sig_result result])
Expand Down
33 changes: 16 additions & 17 deletions crates/particle-node-tests/tests/spells.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use serde_json::{json, Value as JValue};
use connected_client::ConnectedClient;
use created_swarm::{make_swarms, make_swarms_with_builtins};
use fluence_spell_dtos::trigger_config::TriggerConfig;
use log_utils::enable_logs;
use service_modules::load_module;
use spell_event_bus::api::{TriggerInfo, TriggerInfoAqua, MAX_PERIOD_SEC};
use test_utils::create_service;
Expand Down Expand Up @@ -446,8 +445,6 @@ fn spell_install_fail_end_sec_past() {
// In this case we don't schedule a spell and return error.
#[test]
fn spell_install_fail_end_sec_before_start() {
enable_logs();

let swarms = make_swarms(1);
let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
.wrap_err("connect client")
Expand Down Expand Up @@ -603,7 +600,6 @@ fn spell_remove() {

#[test]
fn spell_remove_by_alias() {
enable_logs();
let swarms = make_swarms(1);

let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
Expand Down Expand Up @@ -695,9 +691,12 @@ fn spell_remove_spell_as_service() {
#[test]
fn spell_remove_service_as_spell() {
let swarms = make_swarms(1);
let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
.wrap_err("connect client")
.unwrap();
let mut client = ConnectedClient::connect_with_keypair(
swarms[0].multiaddr.clone(),
Some(swarms[0].management_keypair.clone()),
)
.wrap_err("connect client")
.unwrap();

let service = create_service(
&mut client,
Expand All @@ -714,10 +713,7 @@ fn spell_remove_service_as_spell() {
client.send_particle(
r#"
(xor
(seq
(call relay ("worker" "create") [] worker)
(call worker ("spell" "remove") [service_id])
)
(call relay ("spell" "remove") [service_id])
(call client ("return" "") [%last_error%.$.message])
)
"#,
Expand Down Expand Up @@ -1393,9 +1389,12 @@ fn spell_create_worker_twice() {
fn spell_install_root_scope() {
let swarms = make_swarms(1);

let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
.wrap_err("connect client")
.unwrap();
let mut client = ConnectedClient::connect_with_keypair(
swarms[0].multiaddr.clone(),
Some(swarms[0].management_keypair.clone()),
)
.wrap_err("connect client")
.unwrap();

let script = r#"(call %init_peer_id% ("getDataSrv" "spell_id") [] spell_id)"#;

Expand All @@ -1415,9 +1414,9 @@ fn spell_install_root_scope() {
(seq
(seq
(call relay ("spell" "install") [script data config] spell_id)
(call relay ("worker" "get_peer_id") [] worker_peer_id)
(call relay ("srv" "info") [spell_id] info)
)
(call client ("return" "") [spell_id worker_peer_id])
(call client ("return" "") [spell_id info.$.worker_id])
)"#,
data.clone(),
);
Expand All @@ -1426,7 +1425,7 @@ fn spell_install_root_scope() {
let spell_id = response[0].as_str().unwrap().to_string();
assert_ne!(spell_id.len(), 0);
let worker_id = response[1].as_str().unwrap().to_string();
assert_ne!(worker_id, client.node.to_base58());
assert_eq!(worker_id, client.node.to_base58());
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion crates/peer-metrics/src/services_metrics/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl Encode for ServiceType {
let label = match self {
ServiceType::Builtin => "builtin",
ServiceType::Service(Some(x)) => x,
ServiceType::Service(_) => "service",
ServiceType::Service(_) => "non-aliased-services",
};

writer.write_all(label.as_bytes())?;
Expand Down
43 changes: 12 additions & 31 deletions particle-builtins/src/builtins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::str::FromStr;
use std::time::{Duration, Instant};

use derivative::Derivative;
use fluence_keypair::{KeyPair, Signature};
use fluence_keypair::Signature;
use humantime_serde::re::humantime::format_duration as pretty;
use libp2p::{core::Multiaddr, kad::kbucket::Key, kad::K_VALUE, PeerId};
use multihash::{Code, MultihashDigest, MultihashGeneric};
Expand Down Expand Up @@ -72,8 +72,6 @@ pub struct Builtins<C> {
pub management_peer_id: PeerId,
pub builtins_management_peer_id: PeerId,
pub local_peer_id: PeerId,
#[derivative(Debug = "ignore")]
pub root_keypair: KeyPair,

pub modules: ModuleRepository,
pub services: ParticleAppServices,
Expand All @@ -98,7 +96,6 @@ where
node_info: NodeInfo,
config: ServicesConfig,
services_metrics: ServicesMetrics,
root_keypair: KeyPair,
key_manager: KeyManager,
) -> Self {
let modules_dir = &config.modules_dir;
Expand All @@ -123,7 +120,6 @@ where
management_peer_id,
builtins_management_peer_id,
local_peer_id,
root_keypair,
modules,
services,
node_info,
Expand Down Expand Up @@ -188,7 +184,7 @@ where
("kad", "neigh_with_addrs") => wrap(self.neighborhood_with_addresses(args).await),
("kad", "merge") => wrap(self.kad_merge(args.function_args)),

("srv", "list") => ok(self.list_services()),
("srv", "list") => ok(self.list_services(particle)),
("srv", "create") => wrap(self.create_service(args, particle)),
("srv", "get_interface") => wrap(self.get_interface(args, particle)),
("srv", "resolve_alias") => wrap(self.resolve_alias(args, particle)),
Expand Down Expand Up @@ -828,8 +824,8 @@ where
Ok(())
}

fn list_services(&self) -> JValue {
JValue::Array(self.services.list_services())
fn list_services(&self, params: ParticleParams) -> JValue {
JValue::Array(self.services.list_services(params.host_id))
}

fn call_service(&self, function_args: Args, particle: ParticleParams) -> FunctionOutcome {
Expand Down Expand Up @@ -951,13 +947,8 @@ where
return Err(JError::new(format!("expected tetraplet for a scalar argument, got tetraplet for an array: {tetraplet:?}, tetraplets")));
}

if params.host_id == self.local_peer_id {
json!(self.root_keypair.sign(&data)?.to_vec())
} else {
// if this call is initiated by the worker on this worker as host_id and init_peer_id
let keypair = self.key_manager.get_worker_keypair(params.init_peer_id)?;
json!(keypair.sign(&data)?.to_vec())
}
let keypair = self.key_manager.get_worker_keypair(params.host_id)?;
json!(keypair.sign(&data)?.to_vec())
};

match result {
Expand All @@ -979,23 +970,13 @@ where
let mut args = args.function_args.into_iter();
let signature: Vec<u8> = Args::next("signature", &mut args)?;
let data: Vec<u8> = Args::next("data", &mut args)?;
let signature =
Signature::from_bytes(self.root_keypair.public().get_key_format(), signature);
let pk = self
.key_manager
.get_worker_keypair(params.host_id)?
.public();
let signature = Signature::from_bytes(pk.get_key_format(), signature);

// TODO: move root_keypair to key_manager and unify verification
if params.host_id == self.local_peer_id {
Ok(JValue::Bool(
self.root_keypair.public().verify(&data, &signature).is_ok(),
))
} else {
Ok(JValue::Bool(
self.key_manager
.get_worker_keypair(params.host_id)?
.public()
.verify(&data, &signature)
.is_ok(),
))
}
Ok(JValue::Bool(pk.verify(&data, &signature).is_ok()))
}

fn get_peer_id(&self, params: ParticleParams) -> Result<JValue, JError> {
Expand Down
6 changes: 1 addition & 5 deletions particle-node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::{io, net::SocketAddr};

use async_std::task;
use eyre::WrapErr;
use fluence_keypair::KeyPair;
use futures::{
channel::{mpsc::unbounded, oneshot},
select,
Expand Down Expand Up @@ -111,7 +110,7 @@ impl<RT: AquaRuntime> Node<RT> {

let key_manager = KeyManager::new(
config.dir_config.keypairs_base_dir.clone(),
to_peer_id(&key_pair),
key_pair.clone().try_into()?,
config.management_peer_id,
builtins_peer_id,
);
Expand Down Expand Up @@ -203,7 +202,6 @@ impl<RT: AquaRuntime> Node<RT> {
services_config,
script_storage_api,
services_metrics,
config.node_config.root_key_pair.clone(),
key_manager.clone(),
));

Expand Down Expand Up @@ -319,7 +317,6 @@ impl<RT: AquaRuntime> Node<RT> {
services_config: ServicesConfig,
script_storage_api: ScriptStorageApi,
services_metrics: ServicesMetrics,
root_keypair: KeyPair,
key_manager: KeyManager,
) -> Builtins<Connectivity> {
let node_info = NodeInfo {
Expand All @@ -334,7 +331,6 @@ impl<RT: AquaRuntime> Node<RT> {
node_info,
services_config,
services_metrics,
root_keypair,
key_manager,
)
}
Expand Down
11 changes: 2 additions & 9 deletions particle-services/src/app_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ pub fn create_app_service(
blueprint_id: String,
service_id: String,
aliases: Vec<String>,
root_aliases: Vec<String>,
owner_id: PeerId,
worker_id: PeerId,
metrics: Option<&ServicesMetrics>,
Expand Down Expand Up @@ -65,14 +64,8 @@ pub fn create_app_service(
.map_err(ServiceError::Engine)?;

// Save created service to disk, so it is recreated on restart
let persisted = PersistedService::new(
service_id,
blueprint_id,
aliases,
root_aliases,
owner_id,
worker_id,
);
let persisted =
PersistedService::new(service_id, blueprint_id, aliases, owner_id, worker_id);
persist_service(&config.services_dir, persisted)?;

service
Expand Down
Loading

0 comments on commit 193a6e7

Please sign in to comment.