diff --git a/Cargo.lock b/Cargo.lock index 29e963eba..2d498a1b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -742,6 +742,7 @@ dependencies = [ "crypto", "fastcrypto", "insta", + "match_opt", "multiaddr", "rand 0.8.5", "serde", @@ -4785,6 +4786,7 @@ dependencies = [ "prometheus", "rand 0.8.5", "serde", + "tap", "tempfile", "test_utils", "tokio", diff --git a/config/Cargo.toml b/config/Cargo.toml index f60adae77..a8d95da37 100644 --- a/config/Cargo.toml +++ b/config/Cargo.toml @@ -6,6 +6,7 @@ authors = ["Mysten Labs "] edition = "2021" [dependencies] +match_opt = "0.1.2" multiaddr = "0.14.0" serde = { version = "1.0.144", features = ["derive"] } serde_json = "1.0.85" diff --git a/config/src/lib.rs b/config/src/lib.rs index 5d29afa58..d66d828b1 100644 --- a/config/src/lib.rs +++ b/config/src/lib.rs @@ -319,6 +319,12 @@ pub struct WorkerCache { pub epoch: Epoch, } +impl From for SharedWorkerCache { + fn from(worker_cache: WorkerCache) -> Self { + Arc::new(ArcSwap::from_pointee(worker_cache)) + } +} + impl std::fmt::Display for WorkerIndex { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( @@ -356,8 +362,7 @@ impl WorkerCache { pub fn worker(&self, to: &PublicKey, id: &WorkerId) -> Result { self.workers .iter() - .find(|(name, _)| *name == to) - .map(|(_, authority)| authority) + .find_map(|v| match_opt::match_opt!(v, (name, authority) if name == to => authority)) .ok_or_else(|| { ConfigError::NotInWorkerCache(ToString::to_string(&(*to).encode_base64())) })? @@ -373,8 +378,9 @@ impl WorkerCache { let res = self .workers .iter() - .find(|(name, _)| *name == myself) - .map(|(_, authority)| authority) + .find_map( + |v| match_opt::match_opt!(v, (name, authority) if name == myself => authority), + ) .ok_or_else(|| ConfigError::NotInWorkerCache((*myself).encode_base64()))? .0 .values() @@ -392,14 +398,10 @@ impl WorkerCache { ) -> Vec<(PublicKey, WorkerInfo)> { self.workers .iter() - .filter(|(name, _)| *name != myself) - .filter_map(|(name, authority)| { - authority - .0 - .iter() - .find(|(worker_id, _)| worker_id == &id) - .map(|(_, addresses)| (name.deref().clone(), addresses.clone())) - }) + .filter(|(name, _)| *name != myself ) + .flat_map( + |(name, authority)| authority.0.iter().flat_map( + |v| match_opt::match_opt!(v,(worker_id, addresses) if worker_id == id => (name.clone(), addresses.clone())))) .collect() } diff --git a/network/src/primary.rs b/network/src/primary.rs index b525c2aa0..df56f66eb 100644 --- a/network/src/primary.rs +++ b/network/src/primary.rs @@ -193,7 +193,7 @@ impl PrimaryToWorkerNetwork { where I: IntoIterator, { - // TODO: Add protection for primary owned worker addresses. + // TODO: Add protection for primary owned worker addresses (issue#840). for address in to_remove { self.clients.remove(address); } diff --git a/network/src/worker.rs b/network/src/worker.rs index 62e49cf5f..e4b707c02 100644 --- a/network/src/worker.rs +++ b/network/src/worker.rs @@ -72,7 +72,7 @@ impl WorkerNetwork { where I: IntoIterator, { - // TODO: Add protection for primary owned worker addresses. + // TODO: Add protection for primary owned worker addresses (issue#840). for address in to_remove { self.clients.remove(address); } diff --git a/primary/src/block_remover.rs b/primary/src/block_remover.rs index b3b38dcfd..42ec4ef99 100644 --- a/primary/src/block_remover.rs +++ b/primary/src/block_remover.rs @@ -81,7 +81,7 @@ pub struct DeleteBatchMessage { /// # use std::env::temp_dir; /// # use fastcrypto::Digest; /// # use fastcrypto::ed25519::Ed25519PublicKey; -/// # use config::{Committee, WorkerCache}; +/// # use config::{Committee, WorkerCache, SharedWorkerCache}; /// # use consensus::dag::Dag; /// # use futures::future::join_all; /// # use std::collections::BTreeMap; @@ -122,7 +122,7 @@ pub struct DeleteBatchMessage { /// /// let name = Ed25519PublicKey::default(); /// let committee = Committee{ epoch: 0, authorities: BTreeMap::new() }; -/// let worker_cache = Arc::new(ArcSwap::from_pointee(WorkerCache{ epoch: 0, workers: BTreeMap::new() })); +/// let worker_cache: SharedWorkerCache = WorkerCache{ epoch: 0, workers: BTreeMap::new() }.into(); /// let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); /// let consensus_metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); /// // A dag with genesis for the committee diff --git a/primary/src/block_waiter.rs b/primary/src/block_waiter.rs index 057b617ca..0e96d60da 100644 --- a/primary/src/block_waiter.rs +++ b/primary/src/block_waiter.rs @@ -120,7 +120,7 @@ type RequestKey = Vec; /// # use fastcrypto::Hash; /// # use std::env::temp_dir; /// # use fastcrypto::ed25519::Ed25519PublicKey; -/// # use config::{Committee, WorkerCache}; +/// # use config::{Committee, WorkerCache, SharedWorkerCache}; /// # use std::collections::BTreeMap; /// # use types::Certificate; /// # use primary::{BlockWaiter, BlockHeader, BlockCommand, block_synchronizer::{BlockSynchronizeResult, handler::{Error, Handler}}}; @@ -161,7 +161,7 @@ type RequestKey = Vec; /// /// let name = Ed25519PublicKey::default(); /// let committee = Committee{ epoch: 0, authorities: BTreeMap::new() }; -/// let worker_cache = Arc::new(ArcSwap::from_pointee(WorkerCache{ epoch: 0, workers: BTreeMap::new() })); +/// let worker_cache: SharedWorkerCache = WorkerCache{ epoch: 0, workers: BTreeMap::new() }.into(); /// let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); /// /// // A dummy certificate diff --git a/primary/src/state_handler.rs b/primary/src/state_handler.rs index 05b48b3ac..dac7c0a30 100644 --- a/primary/src/state_handler.rs +++ b/primary/src/state_handler.rs @@ -7,8 +7,9 @@ use crypto::PublicKey; use network::{PrimaryToWorkerNetwork, UnreliableNetwork}; use std::collections::BTreeMap; use std::sync::Arc; +use tap::TapOptional; use tokio::{sync::watch, task::JoinHandle}; -use tracing::info; +use tracing::{info, warn}; use types::{metered_channel::Receiver, Certificate, ReconfigureNotification, Round}; /// Receives the highest round reached by consensus and update it for all tasks. @@ -110,7 +111,14 @@ impl StateHandler { workers: committee.keys().iter().map(|key| ( (*key).clone(), - self.worker_cache.load().workers.get(key).unwrap_or(&WorkerIndex(BTreeMap::new())).clone() + self.worker_cache + .load() + .workers + .get(key) + .tap_none(|| + warn!("Worker cache does not have a key for the new committee member")) + .unwrap_or(&WorkerIndex(BTreeMap::new())) + .clone() )).collect(), })); @@ -133,7 +141,14 @@ impl StateHandler { workers: committee.keys().iter().map(|key| ( (*key).clone(), - self.worker_cache.load().workers.get(key).unwrap_or(&WorkerIndex(BTreeMap::new())).clone() + self.worker_cache + .load() + .workers + .get(key) + .tap_none(|| + warn!("Worker cache does not have a key for the new committee member")) + .unwrap_or(&WorkerIndex(BTreeMap::new())) + .clone() )).collect(), })); diff --git a/test_utils/src/lib.rs b/test_utils/src/lib.rs index 47d5f856d..48d44f64e 100644 --- a/test_utils/src/lib.rs +++ b/test_utils/src/lib.rs @@ -1,7 +1,6 @@ // Copyright (c) 2021, Facebook, Inc. and its affiliates // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use arc_swap::ArcSwap; use config::{ utils::get_available_port, Authority, Committee, Epoch, PrimaryAddresses, SharedWorkerCache, WorkerCache, WorkerId, WorkerIndex, WorkerInfo, @@ -161,7 +160,7 @@ pub fn shared_worker_cache(rng_seed: impl Into>) -> SharedWorkerCach } pub fn shared_worker_cache_from_keys(keys: &[KeyPair]) -> SharedWorkerCache { - Arc::new(ArcSwap::from_pointee(worker_cache_from_keys(keys))) + worker_cache_from_keys(keys).into() } pub fn worker_cache_from_keys(keys: &[KeyPair]) -> WorkerCache { diff --git a/worker/Cargo.toml b/worker/Cargo.toml index 1273d3650..4abe02b05 100644 --- a/worker/Cargo.toml +++ b/worker/Cargo.toml @@ -13,6 +13,7 @@ bytes = "1.2.1" futures = "0.3.23" multiaddr = "0.14.0" serde = { version = "1.0.144", features = ["derive"] } +tap = "1.0.1" tokio = { version = "1.20.1", features = ["sync", "rt", "macros"] } tokio-stream = "0.1.9" tokio-util = { version = "0.7.3", features = ["codec"] } diff --git a/worker/src/synchronizer.rs b/worker/src/synchronizer.rs index 82fcb54cc..3d78fe8c1 100644 --- a/worker/src/synchronizer.rs +++ b/worker/src/synchronizer.rs @@ -13,12 +13,13 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; use store::{Store, StoreError}; +use tap::TapOptional; use tokio::{ sync::{mpsc, watch}, task::JoinHandle, time::{sleep, Duration, Instant}, }; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; use types::{ metered_channel::{Receiver, Sender}, BatchDigest, ReconfigureNotification, Round, SerializedBatchMessage, WorkerMessage, @@ -217,7 +218,14 @@ impl Synchronizer { workers: new_committee.keys().iter().map(|key| ( (*key).clone(), - self.worker_cache.load().workers.get(key).unwrap_or(&WorkerIndex(BTreeMap::new())).clone() + self.worker_cache + .load() + .workers + .get(key) + .tap_none(|| + warn!("Worker cache does not have a key for the new committee member")) + .unwrap_or(&WorkerIndex(BTreeMap::new())) + .clone() )).collect(), })); @@ -238,7 +246,14 @@ impl Synchronizer { workers: new_committee.keys().iter().map(|key| ( (*key).clone(), - self.worker_cache.load().workers.get(key).unwrap_or(&WorkerIndex(BTreeMap::new())).clone() + self.worker_cache + .load() + .workers + .get(key) + .tap_none(|| + warn!("Worker cache does not have a key for the new committee member")) + .unwrap_or(&WorkerIndex(BTreeMap::new())) + .clone() )).collect(), })); diff --git a/worker/src/tests/synchronizer_tests.rs b/worker/src/tests/synchronizer_tests.rs index 7945201d0..7097e8fe5 100644 --- a/worker/src/tests/synchronizer_tests.rs +++ b/worker/src/tests/synchronizer_tests.rs @@ -7,8 +7,8 @@ use fastcrypto::traits::KeyPair; use prometheus::Registry; use test_utils::{ batch, batch_digest, batches, keys, open_batch_store, pure_committee_from_keys, - resolve_name_committee_and_worker_cache, serialize_batch_message, worker_cache_from_keys, - WorkerToWorkerMockServer, + resolve_name_committee_and_worker_cache, serialize_batch_message, + shared_worker_cache_from_keys, WorkerToWorkerMockServer, }; use tokio::time::timeout; use types::serialized_batch_digest; @@ -20,7 +20,7 @@ async fn synchronize() { let mut keys = keys(None); let committee = pure_committee_from_keys(&keys); - let worker_cache = worker_cache_from_keys(&keys); + let worker_cache = shared_worker_cache_from_keys(&keys); let name = keys.pop().unwrap().public().clone(); let id = 0; @@ -37,7 +37,7 @@ async fn synchronize() { name.clone(), id, Arc::new(ArcSwap::from_pointee(committee.clone())), - Arc::new(ArcSwap::from_pointee(worker_cache.clone())), + worker_cache.clone(), store.clone(), /* gc_depth */ 50, // Not used in this test. /* sync_retry_delay */ @@ -52,7 +52,11 @@ async fn synchronize() { // Spawn a listener to receive our batch requests. let target = keys.pop().unwrap().public().clone(); - let address = worker_cache.worker(&target, &id).unwrap().worker_to_worker; + let address = worker_cache + .load() + .worker(&target, &id) + .unwrap() + .worker_to_worker; let missing = vec![batch_digest()]; let message = WorkerMessage::BatchRequest(missing.clone(), name.clone()); let serialized = bincode::serialize(&message).unwrap();