Skip to content
This repository has been archived by the owner on Oct 17, 2022. It is now read-only.

Commit

Permalink
Use singleton cache & update reconfiguration
Browse files Browse the repository at this point in the history
  • Loading branch information
arun-koshy committed Aug 16, 2022
1 parent 6035eef commit 1af5300
Show file tree
Hide file tree
Showing 41 changed files with 448 additions and 258 deletions.
2 changes: 1 addition & 1 deletion Docker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ from the database and log data. This is useful to preserve the state between mul


```
# arguments for script are {num_primary} & {num_worker} in that order
# arguments for script are {num_primary} & {num_worker_per_primary} in that order
./gen.validators.sh 6 1
Expand Down
38 changes: 20 additions & 18 deletions config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crypto::{traits::EncodeDecodeBase64, PublicKey};
use multiaddr::Multiaddr;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{
collections::{BTreeMap, HashMap, HashSet},
collections::{BTreeMap, HashSet},
fs::{self, OpenOptions},
io::{BufWriter, Write as _},
ops::Deref,
Expand All @@ -35,6 +35,9 @@ pub enum ConfigError {
#[error("Node {0} is not in the committee")]
NotInCommittee(String),

#[error("Node {0} is not in the worker cache")]
NotInWorkerCache(String),

#[error("Unknown worker id {0}")]
UnknownWorker(WorkerId),

Expand Down Expand Up @@ -304,7 +307,7 @@ pub struct WorkerInfo {
pub type SharedWorkerCache = Arc<ArcSwap<WorkerCache>>;

#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct WorkerIndex(pub HashMap<WorkerId, WorkerInfo>);
pub struct WorkerIndex(pub BTreeMap<WorkerId, WorkerInfo>);

#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct WorkerCache {
Expand Down Expand Up @@ -354,13 +357,13 @@ impl WorkerCache {
.find(|(name, _)| *name == to)
.map(|(_, authority)| authority)
.ok_or_else(|| {
ConfigError::NotInCommittee(ToString::to_string(&(*to).encode_base64()))
ConfigError::NotInWorkerCache(ToString::to_string(&(*to).encode_base64()))
})?
.0
.iter()
.find(|(worker_id, _)| worker_id == &id)
.map(|(_, worker)| worker.clone())
.ok_or_else(|| ConfigError::NotInCommittee((*to).encode_base64()))
.ok_or_else(|| ConfigError::NotInWorkerCache((*to).encode_base64()))
}

/// Returns the addresses of all our workers.
Expand All @@ -370,7 +373,7 @@ impl WorkerCache {
.iter()
.find(|(name, _)| *name == myself)
.map(|(_, authority)| authority)
.ok_or_else(|| ConfigError::NotInCommittee((*myself).encode_base64()))?
.ok_or_else(|| ConfigError::NotInWorkerCache((*myself).encode_base64()))?
.0
.values()
.cloned()
Expand Down Expand Up @@ -398,11 +401,14 @@ impl WorkerCache {
.collect()
}

/// Return all the network addresses in the worker cache.
fn get_all_network_addresses(&self) -> HashSet<&Multiaddr> {
/// Return the network addresses that are present in the current worker cache
/// but belong to primary keys that are no longer in the committee. The current
/// committee keys are provided as an argument.
pub fn network_diff(&self, keys: Vec<&PublicKey>) -> HashSet<&Multiaddr> {
self.workers
.values()
.flat_map(|authority| {
.iter()
.filter(|(name, _)| !keys.contains(name))
.flat_map(|(_, authority)| {
authority
.0
.values()
Expand All @@ -422,15 +428,6 @@ impl WorkerCache {
})
.collect()
}

/// Return the network addresses that are present in the current worker cache
/// but that are absent from the new worker cache (provided as argument).
pub fn network_diff<'a>(&'a self, other: &'a Self) -> HashSet<&Multiaddr> {
self.get_all_network_addresses()
.difference(&other.get_all_network_addresses())
.cloned()
.collect()
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
Expand Down Expand Up @@ -479,6 +476,11 @@ impl Committee {
self.epoch
}

/// Returns the public keys of all authorities in the committee.
pub fn keys(&self) -> Vec<&PublicKey> {
    // `BTreeMap::keys()` already yields `&PublicKey`; collecting directly
    // avoids the previous no-op `.clone()` of the `Keys` iterator, which
    // cloned the iterator itself (not the keys) and added nothing.
    self.authorities.keys().collect()
}

/// Returns the number of authorities.
pub fn size(&self) -> usize {
self.authorities.len()
Expand Down
34 changes: 32 additions & 2 deletions config/tests/config_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ use std::collections::BTreeMap;

use config::{
Committee, ConsensusAPIGrpcParameters, Epoch, Import, Parameters, PrimaryAddresses,
PrometheusMetricsParameters, Stake,
PrometheusMetricsParameters, Stake, WorkerCache,
};
use crypto::{traits::KeyPair as _, PublicKey};
use insta::assert_json_snapshot;
use rand::seq::SliceRandom;
use std::{fs::File, io::Write};
use tempfile::tempdir;
use test_utils::make_authority_with_port_getter;
use test_utils::{initialize_worker_index_with_port_getter, make_authority_with_port_getter};

#[test]
fn update_primary_network_info_test() {
Expand Down Expand Up @@ -208,3 +208,33 @@ fn commmittee_snapshot_matches() {
settings.set_sort_maps(true);
settings.bind(|| assert_json_snapshot!("committee", committee));
}

#[test]
fn workers_snapshot_matches() {
    // The shape of this configuration is load-bearing in the NW benchmarks,
    // and in Sui (prod)
    let keys = test_utils::keys(None);

    // Build one WorkerIndex per authority key. Each authority gets its own
    // port counter starting at 0, so every index is populated with the same
    // deterministic port sequence (matching the committed snapshot).
    let workers = keys
        .iter()
        .map(|kp| {
            let mut next_port = 0;
            let port_getter = || {
                next_port += 1;
                next_port
            };
            let index = initialize_worker_index_with_port_getter(port_getter);
            (kp.public().clone(), index)
        })
        .collect();

    let worker_cache = WorkerCache {
        epoch: Epoch::default(),
        workers,
    };

    // we need authorities to be serialized in order
    let mut settings = insta::Settings::clone_current();
    settings.set_sort_maps(true);
    settings.bind(|| assert_json_snapshot!("worker_cache", worker_cache));
}
97 changes: 97 additions & 0 deletions config/tests/snapshots/config_tests__worker_cache.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
---
source: config/tests/config_tests.rs
expression: worker_cache
---
{
"workers": {
"ildi4hrBzbOHBELHe0w69Yx87bh3nQJw5tTx4vc2fXQ=": {
"0": {
"transactions": "/ip4/127.0.0.1/tcp/2/http",
"worker_to_worker": "/ip4/127.0.0.1/tcp/3/http",
"primary_to_worker": "/ip4/127.0.0.1/tcp/1/http"
},
"1": {
"transactions": "/ip4/127.0.0.1/tcp/5/http",
"worker_to_worker": "/ip4/127.0.0.1/tcp/6/http",
"primary_to_worker": "/ip4/127.0.0.1/tcp/4/http"
},
"2": {
"transactions": "/ip4/127.0.0.1/tcp/8/http",
"worker_to_worker": "/ip4/127.0.0.1/tcp/9/http",
"primary_to_worker": "/ip4/127.0.0.1/tcp/7/http"
},
"3": {
"transactions": "/ip4/127.0.0.1/tcp/11/http",
"worker_to_worker": "/ip4/127.0.0.1/tcp/12/http",
"primary_to_worker": "/ip4/127.0.0.1/tcp/10/http"
}
},
"rvP0pLjsod/DQzYb+OQ2vULeklnAS4MU644gVN1ugqs=": {
"0": {
"transactions": "/ip4/127.0.0.1/tcp/2/http",
"worker_to_worker": "/ip4/127.0.0.1/tcp/3/http",
"primary_to_worker": "/ip4/127.0.0.1/tcp/1/http"
},
"1": {
"transactions": "/ip4/127.0.0.1/tcp/5/http",
"worker_to_worker": "/ip4/127.0.0.1/tcp/6/http",
"primary_to_worker": "/ip4/127.0.0.1/tcp/4/http"
},
"2": {
"transactions": "/ip4/127.0.0.1/tcp/8/http",
"worker_to_worker": "/ip4/127.0.0.1/tcp/9/http",
"primary_to_worker": "/ip4/127.0.0.1/tcp/7/http"
},
"3": {
"transactions": "/ip4/127.0.0.1/tcp/11/http",
"worker_to_worker": "/ip4/127.0.0.1/tcp/12/http",
"primary_to_worker": "/ip4/127.0.0.1/tcp/10/http"
}
},
"ucbuFjDvPnERRKZI2wa7sihPcnTPvuU//O5QPMGkkgA=": {
"0": {
"transactions": "/ip4/127.0.0.1/tcp/2/http",
"worker_to_worker": "/ip4/127.0.0.1/tcp/3/http",
"primary_to_worker": "/ip4/127.0.0.1/tcp/1/http"
},
"1": {
"transactions": "/ip4/127.0.0.1/tcp/5/http",
"worker_to_worker": "/ip4/127.0.0.1/tcp/6/http",
"primary_to_worker": "/ip4/127.0.0.1/tcp/4/http"
},
"2": {
"transactions": "/ip4/127.0.0.1/tcp/8/http",
"worker_to_worker": "/ip4/127.0.0.1/tcp/9/http",
"primary_to_worker": "/ip4/127.0.0.1/tcp/7/http"
},
"3": {
"transactions": "/ip4/127.0.0.1/tcp/11/http",
"worker_to_worker": "/ip4/127.0.0.1/tcp/12/http",
"primary_to_worker": "/ip4/127.0.0.1/tcp/10/http"
}
},
"zGIzLjS7LVzWn2Dvuyo2y5FsfrRYMB6jZjbE27ASvYg=": {
"0": {
"transactions": "/ip4/127.0.0.1/tcp/2/http",
"worker_to_worker": "/ip4/127.0.0.1/tcp/3/http",
"primary_to_worker": "/ip4/127.0.0.1/tcp/1/http"
},
"1": {
"transactions": "/ip4/127.0.0.1/tcp/5/http",
"worker_to_worker": "/ip4/127.0.0.1/tcp/6/http",
"primary_to_worker": "/ip4/127.0.0.1/tcp/4/http"
},
"2": {
"transactions": "/ip4/127.0.0.1/tcp/8/http",
"worker_to_worker": "/ip4/127.0.0.1/tcp/9/http",
"primary_to_worker": "/ip4/127.0.0.1/tcp/7/http"
},
"3": {
"transactions": "/ip4/127.0.0.1/tcp/11/http",
"worker_to_worker": "/ip4/127.0.0.1/tcp/12/http",
"primary_to_worker": "/ip4/127.0.0.1/tcp/10/http"
}
}
},
"epoch": 0
}
1 change: 1 addition & 0 deletions network/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ impl PrimaryToWorkerNetwork {
where
I: IntoIterator<Item = &'a Multiaddr>,
{
// TODO(ajkoshy): Add protection for primary owned worker addresses.
for address in to_remove {
self.clients.remove(address);
}
Expand Down
1 change: 1 addition & 0 deletions network/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl WorkerNetwork {
where
I: IntoIterator<Item = &'a Multiaddr>,
{
// TODO(ajkoshy): Add protection for primary owned worker addresses.
for address in to_remove {
self.clients.remove(address);
}
Expand Down
45 changes: 16 additions & 29 deletions node/src/generate_format.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use config::{Authority, Committee, Epoch, PrimaryAddresses, WorkerCache, WorkerIndex, WorkerInfo};
use config::{Authority, Committee, Epoch, PrimaryAddresses, WorkerIndex, WorkerInfo};
use crypto::{
traits::{KeyPair as _, Signer},
Digest, Hash, KeyPair,
Expand Down Expand Up @@ -80,34 +80,21 @@ fn get_registry() -> Result<Registry> {
tracer.trace_value(&mut samples, &header)?;
tracer.trace_value(&mut samples, &certificate)?;

let worker_cache = WorkerCache {
epoch: Epoch::default(),
workers: keys
.iter()
.enumerate()
.map(|(i, kp)| {
let workers = vec![(
0,
WorkerInfo {
primary_to_worker: format!("/ip4/127.0.0.1/tcp/{}/http", 300 + i)
.parse()
.unwrap(),
transactions: format!("/ip4/127.0.0.1/tcp/{}/http", 400 + i)
.parse()
.unwrap(),
worker_to_worker: format!("/ip4/127.0.0.1/tcp/{}/http", 500 + i)
.parse()
.unwrap(),
},
)]
.into_iter()
.collect();
(kp.public().clone(), WorkerIndex(workers))
})
.collect(),
};

tracer.trace_value(&mut samples, &worker_cache)?;
// WorkerIndex & WorkerInfo will be present in a protocol message once dynamic
// worker integration is complete.
let worker_index = WorkerIndex(
vec![(
0,
WorkerInfo {
primary_to_worker: "/ip4/127.0.0.1/tcp/300/http".to_string().parse().unwrap(),
transactions: "/ip4/127.0.0.1/tcp/400/http".to_string().parse().unwrap(),
worker_to_worker: "/ip4/127.0.0.1/tcp/500/http".to_string().parse().unwrap(),
},
)]
.into_iter()
.collect(),
);
tracer.trace_value(&mut samples, &worker_index)?;

let cleanup = PrimaryWorkerMessage::Cleanup(1u64);
let request_batch = PrimaryWorkerMessage::RequestBatch(BatchDigest([0u8; 32]));
Expand Down
10 changes: 5 additions & 5 deletions node/src/restarter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::{Node, NodeStorage};
use arc_swap::ArcSwap;
use config::{Committee, Parameters, WorkerCache};
use config::{Committee, Parameters, SharedWorkerCache};
use crypto::{traits::KeyPair as _, KeyPair};
use executor::{ExecutionState, ExecutorOutput};
use futures::future::join_all;
Expand All @@ -20,7 +20,7 @@ impl NodeRestarter {
pub async fn watch<State>(
keypair: KeyPair,
committee: &Committee,
worker_cache: &WorkerCache,
worker_cache: SharedWorkerCache,
storage_base_path: PathBuf,
execution_state: Arc<State>,
parameters: Parameters,
Expand All @@ -34,7 +34,6 @@ impl NodeRestarter {
let mut keypair = keypair;
let mut name = keypair.public().clone();
let mut committee = committee.clone();
let worker_cache = worker_cache.clone();

let mut handles = Vec::new();
let mut primary_network = WorkerToPrimaryNetwork::default();
Expand All @@ -53,7 +52,7 @@ impl NodeRestarter {
let primary_handles = Node::spawn_primary(
keypair,
Arc::new(ArcSwap::new(Arc::new(committee.clone()))),
Arc::new(ArcSwap::new(Arc::new(worker_cache.clone()))),
worker_cache.clone(),
&store,
parameters.clone(),
/* consensus */ true,
Expand All @@ -68,7 +67,7 @@ impl NodeRestarter {
name.clone(),
/* worker_ids */ vec![0],
Arc::new(ArcSwap::new(Arc::new(committee.clone()))),
Arc::new(ArcSwap::new(Arc::new(worker_cache.clone()))),
worker_cache.clone(),
&store,
parameters.clone(),
&Registry::new(),
Expand All @@ -93,6 +92,7 @@ impl NodeRestarter {
let primary_cancel_handle = primary_network.send(address, &message).await;

let addresses = worker_cache
.load()
.our_workers(&name)
.expect("Our key is not in the worker cache")
.into_iter()
Expand Down
Loading

0 comments on commit 1af5300

Please sign in to comment.