Skip to content
This repository has been archived by the owner on Oct 17, 2022. It is now read-only.

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
arun-koshy committed Aug 23, 2022
1 parent fd2eb23 commit 502141e
Show file tree
Hide file tree
Showing 12 changed files with 70 additions and 31 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ authors = ["Mysten Labs <[email protected]>"]
edition = "2021"

[dependencies]
match_opt = "0.1.2"
multiaddr = "0.14.0"
serde = { version = "1.0.143", features = ["derive"] }
serde_json = "1.0.83"
Expand Down
26 changes: 14 additions & 12 deletions config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,12 @@ pub struct WorkerCache {
pub epoch: Epoch,
}

impl From<WorkerCache> for SharedWorkerCache {
fn from(worker_cache: WorkerCache) -> Self {
Arc::new(ArcSwap::from_pointee(worker_cache))
}
}

impl std::fmt::Display for WorkerIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
Expand Down Expand Up @@ -354,8 +360,7 @@ impl WorkerCache {
pub fn worker(&self, to: &PublicKey, id: &WorkerId) -> Result<WorkerInfo, ConfigError> {
self.workers
.iter()
.find(|(name, _)| *name == to)
.map(|(_, authority)| authority)
.find_map(|v| match_opt::match_opt!(v, (name, authority) if name == to => authority))
.ok_or_else(|| {
ConfigError::NotInWorkerCache(ToString::to_string(&(*to).encode_base64()))
})?
Expand All @@ -371,8 +376,9 @@ impl WorkerCache {
let res = self
.workers
.iter()
.find(|(name, _)| *name == myself)
.map(|(_, authority)| authority)
.find_map(
|v| match_opt::match_opt!(v, (name, authority) if name == myself => authority),
)
.ok_or_else(|| ConfigError::NotInWorkerCache((*myself).encode_base64()))?
.0
.values()
Expand All @@ -390,14 +396,10 @@ impl WorkerCache {
) -> Vec<(PublicKey, WorkerInfo)> {
self.workers
.iter()
.filter(|(name, _)| *name != myself)
.filter_map(|(name, authority)| {
authority
.0
.iter()
.find(|(worker_id, _)| worker_id == &id)
.map(|(_, addresses)| (name.deref().clone(), addresses.clone()))
})
.filter(|(name, _)| *name != myself )
.flat_map(
|(name, authority)| authority.0.iter().flat_map(
|v| match_opt::match_opt!(v,(worker_id, addresses) if worker_id == id => (name.clone(), addresses.clone()))))
.collect()
}

Expand Down
2 changes: 1 addition & 1 deletion network/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl PrimaryToWorkerNetwork {
where
I: IntoIterator<Item = &'a Multiaddr>,
{
// TODO: Add protection for primary owned worker addresses.
// TODO: Add protection for primary owned worker addresses (issue#840).
for address in to_remove {
self.clients.remove(address);
}
Expand Down
2 changes: 1 addition & 1 deletion network/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl WorkerNetwork {
where
I: IntoIterator<Item = &'a Multiaddr>,
{
// TODO: Add protection for primary owned worker addresses.
// TODO: Add protection for primary owned worker addresses (issue#840).
for address in to_remove {
self.clients.remove(address);
}
Expand Down
4 changes: 2 additions & 2 deletions primary/src/block_remover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub struct DeleteBatchMessage {
/// # use std::env::temp_dir;
/// # use crypto::Digest;
/// # use crypto::ed25519::Ed25519PublicKey;
/// # use config::{Committee, WorkerCache};
/// # use config::{Committee, WorkerCache, SharedWorkerCache};
/// # use consensus::dag::Dag;
/// # use futures::future::join_all;
/// # use std::collections::BTreeMap;
Expand Down Expand Up @@ -121,7 +121,7 @@ pub struct DeleteBatchMessage {
///
/// let name = Ed25519PublicKey::default();
/// let committee = Committee{ epoch: 0, authorities: BTreeMap::new() };
/// let worker_cache = Arc::new(ArcSwap::from_pointee(WorkerCache{ epoch: 0, workers: BTreeMap::new() }));
/// let worker_cache: SharedWorkerCache = WorkerCache{ epoch: 0, workers: BTreeMap::new() }.into();
/// let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone()));
/// let consensus_metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));
/// // A dag with genesis for the committee
Expand Down
4 changes: 2 additions & 2 deletions primary/src/block_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type RequestKey = Vec<u8>;
/// # use crypto::Hash;
/// # use std::env::temp_dir;
/// # use crypto::ed25519::Ed25519PublicKey;
/// # use config::{Committee, WorkerCache};
/// # use config::{Committee, WorkerCache, SharedWorkerCache};
/// # use std::collections::BTreeMap;
/// # use types::Certificate;
/// # use primary::{BlockWaiter, BlockHeader, BlockCommand, block_synchronizer::{BlockSynchronizeResult, handler::{Error, Handler}}};
Expand Down Expand Up @@ -160,7 +160,7 @@ type RequestKey = Vec<u8>;
///
/// let name = Ed25519PublicKey::default();
/// let committee = Committee{ epoch: 0, authorities: BTreeMap::new() };
/// let worker_cache = Arc::new(ArcSwap::from_pointee(WorkerCache{ epoch: 0, workers: BTreeMap::new() }));
/// let worker_cache: SharedWorkerCache = WorkerCache{ epoch: 0, workers: BTreeMap::new() }.into();
/// let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone()));
///
/// // A dummy certificate
Expand Down
21 changes: 18 additions & 3 deletions primary/src/state_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use crypto::PublicKey;
use network::{PrimaryToWorkerNetwork, UnreliableNetwork};
use std::collections::BTreeMap;
use std::sync::Arc;
use tap::TapOptional;
use tokio::{sync::watch, task::JoinHandle};
use tracing::info;
use tracing::{info, warn};
use types::{metered_channel::Receiver, Certificate, ReconfigureNotification, Round};

/// Receives the highest round reached by consensus and update it for all tasks.
Expand Down Expand Up @@ -110,7 +111,14 @@ impl StateHandler {
workers: committee.keys().iter().map(|key|
(
(*key).clone(),
self.worker_cache.load().workers.get(key).unwrap_or(&WorkerIndex(BTreeMap::new())).clone()
self.worker_cache
.load()
.workers
.get(key)
.tap_none(||
warn!("Worker cache does not have a key for the new committee member"))
.unwrap_or(&WorkerIndex(BTreeMap::new()))
.clone()
)).collect(),
}));

Expand All @@ -133,7 +141,14 @@ impl StateHandler {
workers: committee.keys().iter().map(|key|
(
(*key).clone(),
self.worker_cache.load().workers.get(key).unwrap_or(&WorkerIndex(BTreeMap::new())).clone()
self.worker_cache
.load()
.workers
.get(key)
.tap_none(||
warn!("Worker cache does not have a key for the new committee member"))
.unwrap_or(&WorkerIndex(BTreeMap::new()))
.clone()
)).collect(),
}));

Expand Down
3 changes: 1 addition & 2 deletions test_utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) 2021, Facebook, Inc. and its affiliates
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use arc_swap::ArcSwap;
use config::{
utils::get_available_port, Authority, Committee, Epoch, PrimaryAddresses, SharedWorkerCache,
WorkerCache, WorkerId, WorkerIndex, WorkerInfo,
Expand Down Expand Up @@ -159,7 +158,7 @@ pub fn shared_worker_cache(rng_seed: impl Into<Option<u64>>) -> SharedWorkerCach
}

pub fn shared_worker_cache_from_keys(keys: &[KeyPair]) -> SharedWorkerCache {
Arc::new(ArcSwap::from_pointee(worker_cache_from_keys(keys)))
worker_cache_from_keys(keys).into()
}

pub fn worker_cache_from_keys(keys: &[KeyPair]) -> WorkerCache {
Expand Down
1 change: 1 addition & 0 deletions worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ bytes = "1.2.1"
futures = "0.3.23"
multiaddr = "0.14.0"
serde = { version = "1.0.143", features = ["derive"] }
tap = "1.0.1"
tokio = { version = "1.20.1", features = ["sync", "rt", "macros"] }
tokio-stream = "0.1.9"
tokio-util = { version = "0.7.3", features = ["codec"] }
Expand Down
21 changes: 18 additions & 3 deletions worker/src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ use std::{
time::{SystemTime, UNIX_EPOCH},
};
use store::{Store, StoreError};
use tap::TapOptional;
use tokio::{
sync::{mpsc, watch},
task::JoinHandle,
time::{sleep, Duration, Instant},
};
use tracing::{debug, error};
use tracing::{debug, error, warn};
use types::{
metered_channel::{Receiver, Sender},
BatchDigest, ReconfigureNotification, Round, SerializedBatchMessage, WorkerMessage,
Expand Down Expand Up @@ -217,7 +218,14 @@ impl Synchronizer {
workers: new_committee.keys().iter().map(|key|
(
(*key).clone(),
self.worker_cache.load().workers.get(key).unwrap_or(&WorkerIndex(BTreeMap::new())).clone()
self.worker_cache
.load()
.workers
.get(key)
.tap_none(||
warn!("Worker cache does not have a key for the new committee member"))
.unwrap_or(&WorkerIndex(BTreeMap::new()))
.clone()
)).collect(),
}));

Expand All @@ -238,7 +246,14 @@ impl Synchronizer {
workers: new_committee.keys().iter().map(|key|
(
(*key).clone(),
self.worker_cache.load().workers.get(key).unwrap_or(&WorkerIndex(BTreeMap::new())).clone()
self.worker_cache
.load()
.workers
.get(key)
.tap_none(||
warn!("Worker cache does not have a key for the new committee member"))
.unwrap_or(&WorkerIndex(BTreeMap::new()))
.clone()
)).collect(),
}));

Expand Down
14 changes: 9 additions & 5 deletions worker/src/tests/synchronizer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crypto::traits::KeyPair;
use prometheus::Registry;
use test_utils::{
batch, batch_digest, batches, keys, open_batch_store, pure_committee_from_keys,
resolve_name_committee_and_worker_cache, serialize_batch_message, worker_cache_from_keys,
WorkerToWorkerMockServer,
resolve_name_committee_and_worker_cache, serialize_batch_message,
shared_worker_cache_from_keys, WorkerToWorkerMockServer,
};
use tokio::time::timeout;
use types::serialized_batch_digest;
Expand All @@ -20,7 +20,7 @@ async fn synchronize() {

let mut keys = keys(None);
let committee = pure_committee_from_keys(&keys);
let worker_cache = worker_cache_from_keys(&keys);
let worker_cache = shared_worker_cache_from_keys(&keys);
let name = keys.pop().unwrap().public().clone();
let id = 0;

Expand All @@ -37,7 +37,7 @@ async fn synchronize() {
name.clone(),
id,
Arc::new(ArcSwap::from_pointee(committee.clone())),
Arc::new(ArcSwap::from_pointee(worker_cache.clone())),
worker_cache.clone(),
store.clone(),
/* gc_depth */ 50, // Not used in this test.
/* sync_retry_delay */
Expand All @@ -52,7 +52,11 @@ async fn synchronize() {

// Spawn a listener to receive our batch requests.
let target = keys.pop().unwrap().public().clone();
let address = worker_cache.worker(&target, &id).unwrap().worker_to_worker;
let address = worker_cache
.load()
.worker(&target, &id)
.unwrap()
.worker_to_worker;
let missing = vec![batch_digest()];
let message = WorkerMessage::BatchRequest(missing.clone(), name.clone());
let serialized = bincode::serialize(&message).unwrap();
Expand Down

0 comments on commit 502141e

Please sign in to comment.