Skip to content

Commit

Permalink
Arbitrary committee change (#626)
Browse files Browse the repository at this point in the history
Arbitrary committee change (not only epoch)
  • Loading branch information
asonnino authored Aug 3, 2022
1 parent 9bc9c8c commit 8c41308
Show file tree
Hide file tree
Showing 47 changed files with 364 additions and 189 deletions.
19 changes: 13 additions & 6 deletions narwhal/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,9 @@ where
})
}

fn reconfigure(&mut self, new_committee: Committee) -> StoreResult<ConsensusState> {
fn change_epoch(&mut self, new_committee: Committee) -> StoreResult<ConsensusState> {
self.committee = new_committee.clone();
self.protocol.update_committee(new_committee)?;
tracing::debug!("Committee updated to {}", self.committee);

self.consensus_index = 0;

Expand Down Expand Up @@ -282,11 +281,15 @@ where
Ordering::Greater => {
let message = self.rx_reconfigure.borrow_and_update().clone();
match message {
ReconfigureNotification::NewCommittee(new_committee) => {
state = self.reconfigure(new_committee)?;
ReconfigureNotification::NewEpoch(new_committee) => {
state = self.change_epoch(new_committee)?;
},
ReconfigureNotification::UpdateCommittee(new_committee) => {
self.committee = new_committee;
}
ReconfigureNotification::Shutdown => return Ok(()),
}
tracing::debug!("Committee updated to {}", self.committee);
}
Ordering::Less => {
// We already updated committee but the core is slow.
Expand Down Expand Up @@ -336,11 +339,15 @@ where
result.expect("Committee channel dropped");
let message = self.rx_reconfigure.borrow().clone();
match message {
ReconfigureNotification::NewCommittee(new_committee) => {
state = self.reconfigure(new_committee)?;
ReconfigureNotification::NewEpoch(new_committee) => {
state = self.change_epoch(new_committee)?;
},
ReconfigureNotification::UpdateCommittee(new_committee) => {
self.committee = new_committee;
}
ReconfigureNotification::Shutdown => return Ok(())
}
tracing::debug!("Committee updated to {}", self.committee);
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions narwhal/consensus/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ impl SubscriberHandler {
result = self.rx_reconfigure.changed() => {
result.expect("Committee channel dropped");
let message = self.rx_reconfigure.borrow().clone();
match message {
ReconfigureNotification::NewCommittee(_) => (),
ReconfigureNotification::Shutdown => return Ok(())
if let ReconfigureNotification::Shutdown = message {
return Ok(());
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions narwhal/consensus/src/tests/bullshark_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn commit_one() {
let (tx_output, mut rx_output) = channel(1);

let committee = mock_committee(&keys[..]);
let initial_committee = ReconfigureNotification::NewCommittee(committee.clone());
let initial_committee = ReconfigureNotification::NewEpoch(committee.clone());
let (_tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee);

let store = make_consensus_store(&test_utils::temp_dir());
Expand Down Expand Up @@ -138,7 +138,7 @@ async fn dead_node() {
let (tx_output, mut rx_output) = channel(1);

let committee = mock_committee(&keys[..]);
let initial_committee = ReconfigureNotification::NewCommittee(committee.clone());
let initial_committee = ReconfigureNotification::NewEpoch(committee.clone());
let (_tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee);

let store = make_consensus_store(&test_utils::temp_dir());
Expand Down Expand Up @@ -248,7 +248,7 @@ async fn not_enough_support() {
let (tx_output, mut rx_output) = channel(1);

let committee = mock_committee(&keys[..]);
let initial_committee = ReconfigureNotification::NewCommittee(committee.clone());
let initial_committee = ReconfigureNotification::NewEpoch(committee.clone());
let (_tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee);

let store = make_consensus_store(&test_utils::temp_dir());
Expand Down Expand Up @@ -332,7 +332,7 @@ async fn missing_leader() {
let (tx_output, mut rx_output) = channel(1);

let committee = mock_committee(&keys[..]);
let initial_committee = ReconfigureNotification::NewCommittee(committee.clone());
let initial_committee = ReconfigureNotification::NewEpoch(committee.clone());
let (_tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee);

let store = make_consensus_store(&test_utils::temp_dir());
Expand Down Expand Up @@ -393,7 +393,7 @@ async fn epoch_change() {
let (tx_primary, mut rx_primary) = channel(1);
let (tx_output, mut rx_output) = channel(1);

let initial_committee = ReconfigureNotification::NewCommittee(committee.clone());
let initial_committee = ReconfigureNotification::NewEpoch(committee.clone());
let (tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee);

let store = make_consensus_store(&test_utils::temp_dir());
Expand Down Expand Up @@ -456,7 +456,7 @@ async fn epoch_change() {

// Move to the next epoch.
committee.epoch = epoch + 1;
let message = ReconfigureNotification::NewCommittee(committee.clone());
let message = ReconfigureNotification::NewEpoch(committee.clone());
tx_reconfigure.send(message).unwrap();
}
}
Expand All @@ -478,7 +478,7 @@ async fn restart_with_new_committee() {
let (tx_primary, mut rx_primary) = channel(1);
let (tx_output, mut rx_output) = channel(1);

let initial_committee = ReconfigureNotification::NewCommittee(committee.clone());
let initial_committee = ReconfigureNotification::NewEpoch(committee.clone());
let (tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee);
let store = make_consensus_store(&test_utils::temp_dir());
let cert_store = make_certificate_store(&test_utils::temp_dir());
Expand Down
2 changes: 1 addition & 1 deletion narwhal/consensus/src/tests/subscriber_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub async fn spawn_node(
.collect();

let committee = mock_committee(&keys[..]);
let initial_committee = ReconfigureNotification::NewCommittee(committee.clone());
let initial_committee = ReconfigureNotification::NewEpoch(committee.clone());
let (tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee);

// Create the storages.
Expand Down
14 changes: 7 additions & 7 deletions narwhal/consensus/src/tests/tusk_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async fn commit_one() {
let (tx_output, mut rx_output) = channel(1);

let committee = mock_committee(&keys[..]);
let initial_committee = ReconfigureNotification::NewCommittee(committee.clone());
let initial_committee = ReconfigureNotification::NewEpoch(committee.clone());
let (_tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee);

let store = make_consensus_store(&test_utils::temp_dir());
Expand Down Expand Up @@ -137,7 +137,7 @@ async fn dead_node() {
let (tx_output, mut rx_output) = channel(1);

let committee = mock_committee(&keys[..]);
let initial_committee = ReconfigureNotification::NewCommittee(committee.clone());
let initial_committee = ReconfigureNotification::NewEpoch(committee.clone());
let (_tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee);

let store = make_consensus_store(&test_utils::temp_dir());
Expand Down Expand Up @@ -245,7 +245,7 @@ async fn not_enough_support() {
let (tx_output, mut rx_output) = channel(1);

let committee = mock_committee(&keys[..]);
let initial_committee = ReconfigureNotification::NewCommittee(committee.clone());
let initial_committee = ReconfigureNotification::NewEpoch(committee.clone());
let (_tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee);

let store = make_consensus_store(&test_utils::temp_dir());
Expand Down Expand Up @@ -327,7 +327,7 @@ async fn missing_leader() {
let (tx_output, mut rx_output) = channel(1);

let committee = mock_committee(&keys[..]);
let initial_committee = ReconfigureNotification::NewCommittee(committee.clone());
let initial_committee = ReconfigureNotification::NewEpoch(committee.clone());
let (_tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee);

let store = make_consensus_store(&test_utils::temp_dir());
Expand Down Expand Up @@ -387,7 +387,7 @@ async fn epoch_change() {
let (tx_primary, mut rx_primary) = channel(1);
let (tx_output, mut rx_output) = channel(1);

let initial_committee = ReconfigureNotification::NewCommittee(committee.clone());
let initial_committee = ReconfigureNotification::NewEpoch(committee.clone());
let (tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee);

let store = make_consensus_store(&test_utils::temp_dir());
Expand Down Expand Up @@ -444,7 +444,7 @@ async fn epoch_change() {

// Move to the next epoch.
committee.epoch = epoch + 1;
let message = ReconfigureNotification::NewCommittee(committee.clone());
let message = ReconfigureNotification::NewEpoch(committee.clone());
tx_reconfigure.send(message).unwrap();
}
}
Expand All @@ -466,7 +466,7 @@ async fn restart_with_new_committee() {
let (tx_primary, mut rx_primary) = channel(1);
let (tx_output, mut rx_output) = channel(1);

let initial_committee = ReconfigureNotification::NewCommittee(committee.clone());
let initial_committee = ReconfigureNotification::NewEpoch(committee.clone());
let (tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee);
let store = make_consensus_store(&test_utils::temp_dir());
let cert_store = make_certificate_store(&test_utils::temp_dir());
Expand Down
5 changes: 2 additions & 3 deletions narwhal/executor/src/batch_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,8 @@ impl BatchLoader {
result = self.rx_reconfigure.changed() => {
result.expect("Committee channel dropped");
let message = self.rx_reconfigure.borrow().clone();
match message {
ReconfigureNotification::NewCommittee(_) => (),
ReconfigureNotification::Shutdown => return Ok(())
if let ReconfigureNotification::Shutdown = message {
return Ok(());
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions narwhal/executor/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,8 @@ where
result = self.rx_reconfigure.changed() => {
result.expect("Committee channel dropped");
let message = self.rx_reconfigure.borrow().clone();
match message {
ReconfigureNotification::NewCommittee(_) => (),
ReconfigureNotification::Shutdown => return Ok(())
if let ReconfigureNotification::Shutdown = message {
return Ok(());
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions narwhal/executor/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,8 @@ impl Subscriber {
result = self.rx_reconfigure.changed() => {
result.expect("Committee channel dropped");
let message = self.rx_reconfigure.borrow().clone();
match message {
ReconfigureNotification::NewCommittee(_) => (),
ReconfigureNotification::Shutdown => return Ok(())
if let ReconfigureNotification::Shutdown = message {
return Ok(());
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions narwhal/executor/src/tests/executor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn execute_transactions() {
let (tx_output, mut rx_output) = channel(10);

let committee = committee(None);
let message = ReconfigureNotification::NewCommittee(committee);
let message = ReconfigureNotification::NewEpoch(committee);
let (_tx_reconfigure, rx_reconfigure) = watch::channel(message);

// Spawn the executor.
Expand Down Expand Up @@ -64,7 +64,7 @@ async fn execute_empty_certificate() {
let (tx_output, mut rx_output) = channel(10);

let committee = committee(None);
let message = ReconfigureNotification::NewCommittee(committee);
let message = ReconfigureNotification::NewEpoch(committee);
let (_tx_reconfigure, rx_reconfigure) = watch::channel(message);

// Spawn the executor.
Expand Down Expand Up @@ -120,7 +120,7 @@ async fn execute_malformed_transactions() {
let (tx_output, mut rx_output) = channel(10);

let committee = committee(None);
let message = ReconfigureNotification::NewCommittee(committee);
let message = ReconfigureNotification::NewEpoch(committee);
let (_tx_reconfigure, rx_reconfigure) = watch::channel(message);

// Spawn the executor.
Expand Down Expand Up @@ -182,7 +182,7 @@ async fn internal_error_execution() {
let (tx_output, mut rx_output) = channel(10);

let committee = committee(None);
let message = ReconfigureNotification::NewCommittee(committee);
let message = ReconfigureNotification::NewEpoch(committee);
let (_tx_reconfigure, rx_reconfigure) = watch::channel(message);

// Spawn the executor.
Expand Down Expand Up @@ -234,7 +234,7 @@ async fn crash_recovery() {
let (tx_output, mut rx_output) = channel(10);

let committee = committee(None);
let reconfigure_notification = ReconfigureNotification::NewCommittee(committee);
let reconfigure_notification = ReconfigureNotification::NewEpoch(committee);
let (_tx_reconfigure, rx_reconfigure) = watch::channel(reconfigure_notification.clone());

// Spawn the executor.
Expand Down
4 changes: 2 additions & 2 deletions narwhal/executor/src/tests/subscriber_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn spawn_consensus_and_subscriber(
let (tx_client_to_consensus, rx_client_to_consensus) = channel(10);

let committee = committee(None);
let message = ReconfigureNotification::NewCommittee(committee);
let message = ReconfigureNotification::NewEpoch(committee);
let (tx_reconfigure, rx_reconfigure) = watch::channel(message);

// Spawn a mock consensus core.
Expand Down Expand Up @@ -111,7 +111,7 @@ async fn synchronize() {
let (tx_client_to_consensus, rx_client_to_consensus) = channel(10);

let committee = committee(None);
let message = ReconfigureNotification::NewCommittee(committee);
let message = ReconfigureNotification::NewEpoch(committee);
let (_tx_reconfigure, rx_reconfigure) = watch::channel(message);

// Spawn a mock consensus core.
Expand Down
13 changes: 9 additions & 4 deletions narwhal/node/src/generate_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn get_registry() -> Result<Registry> {
let signature = kp.try_sign(msg).unwrap();
tracer.trace_value(&mut samples, &signature)?;

// Trace the correspondng header
// Trace the corresponding header
let keys: Vec<_> = (0..4).map(|_| KeyPair::generate(&mut rng)).collect();
let committee = Committee {
epoch: Epoch::default(),
Expand Down Expand Up @@ -107,13 +107,18 @@ fn get_registry() -> Result<Registry> {
let request_batch = PrimaryWorkerMessage::RequestBatch(BatchDigest([0u8; 32]));
let delete_batch = PrimaryWorkerMessage::DeleteBatches(vec![BatchDigest([0u8; 32])]);
let sync = PrimaryWorkerMessage::Synchronize(vec![BatchDigest([0u8; 32])], pk.clone());
let reconfigure =
PrimaryWorkerMessage::Reconfigure(ReconfigureNotification::NewCommittee(committee));
let epoch_change =
PrimaryWorkerMessage::Reconfigure(ReconfigureNotification::NewEpoch(committee.clone()));
let update_committee =
PrimaryWorkerMessage::Reconfigure(ReconfigureNotification::NewEpoch(committee));
let shutdown = PrimaryWorkerMessage::Reconfigure(ReconfigureNotification::Shutdown);
tracer.trace_value(&mut samples, &cleanup)?;
tracer.trace_value(&mut samples, &request_batch)?;
tracer.trace_value(&mut samples, &delete_batch)?;
tracer.trace_value(&mut samples, &sync)?;
tracer.trace_value(&mut samples, &reconfigure)?;
tracer.trace_value(&mut samples, &epoch_change)?;
tracer.trace_value(&mut samples, &update_committee)?;
tracer.trace_value(&mut samples, &shutdown)?;

// 2. Trace the main entry point(s) + every enum separately.
tracer.trace_type::<Batch>(&samples)?;
Expand Down
2 changes: 1 addition & 1 deletion narwhal/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl Node {
State::Outcome: Send + 'static,
State::Error: Debug,
{
let initial_committee = ReconfigureNotification::NewCommittee((**committee.load()).clone());
let initial_committee = ReconfigureNotification::NewEpoch((**committee.load()).clone());
let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee);

let (tx_new_certificates, rx_new_certificates) = channel(Self::CHANNEL_CAPACITY);
Expand Down
12 changes: 6 additions & 6 deletions narwhal/node/tests/reconfigure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,9 @@ async fn epoch_change() {
.primary(&name_clone)
.expect("Our key is not in the committee")
.primary_to_primary;
let message = WorkerPrimaryMessage::Reconfigure(
ReconfigureNotification::NewCommittee(committee.clone()),
);
let message = WorkerPrimaryMessage::Reconfigure(ReconfigureNotification::NewEpoch(
committee.clone(),
));
let primary_cancel_handle = primary_network.send(address, &message).await;

let addresses = committee
Expand All @@ -281,9 +281,9 @@ async fn epoch_change() {
.into_iter()
.map(|x| x.primary_to_worker)
.collect();
let message = PrimaryWorkerMessage::Reconfigure(
ReconfigureNotification::NewCommittee(committee.clone()),
);
let message = PrimaryWorkerMessage::Reconfigure(ReconfigureNotification::NewEpoch(
committee.clone(),
));
let worker_cancel_handles = worker_network
.unreliable_broadcast(addresses, &message)
.await;
Expand Down
6 changes: 5 additions & 1 deletion narwhal/node/tests/staged/narwhal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,14 @@ PrimaryWorkerMessage:
ReconfigureNotification:
ENUM:
0:
NewCommittee:
NewEpoch:
NEWTYPE:
TYPENAME: Committee
1:
UpdateCommittee:
NEWTYPE:
TYPENAME: Committee
2:
Shutdown: UNIT
WorkerInfo:
STRUCT:
Expand Down
Loading

0 comments on commit 8c41308

Please sign in to comment.