From 8c41308a244441e260c5943599d6723dcf6cc64d Mon Sep 17 00:00:00 2001 From: Alberto Sonnino Date: Wed, 3 Aug 2022 11:28:55 -0400 Subject: [PATCH] Arbitrary committee change (#626) Arbitrary committee change (not only epoch) --- narwhal/consensus/src/consensus.rs | 19 ++- narwhal/consensus/src/subscriber.rs | 5 +- .../consensus/src/tests/bullshark_tests.rs | 14 +- .../consensus/src/tests/subscriber_tests.rs | 2 +- narwhal/consensus/src/tests/tusk_tests.rs | 14 +- narwhal/executor/src/batch_loader.rs | 5 +- narwhal/executor/src/core.rs | 5 +- narwhal/executor/src/subscriber.rs | 5 +- narwhal/executor/src/tests/executor_tests.rs | 10 +- .../executor/src/tests/subscriber_tests.rs | 4 +- narwhal/node/src/generate_format.rs | 13 +- narwhal/node/src/lib.rs | 2 +- narwhal/node/tests/reconfigure.rs | 12 +- narwhal/node/tests/staged/narwhal.yaml | 6 +- narwhal/primary/src/block_remover.rs | 9 +- narwhal/primary/src/block_synchronizer/mod.rs | 9 +- .../tests/block_synchronizer_tests.rs | 18 +-- narwhal/primary/src/block_waiter.rs | 11 +- narwhal/primary/src/certificate_waiter.rs | 7 +- narwhal/primary/src/core.rs | 21 +-- narwhal/primary/src/header_waiter.rs | 18 +-- narwhal/primary/src/helper.rs | 7 +- narwhal/primary/src/primary.rs | 7 +- narwhal/primary/src/proposer.rs | 21 ++- narwhal/primary/src/state_handler.rs | 10 +- .../primary/src/tests/block_remover_tests.rs | 7 +- .../primary/src/tests/block_waiter_tests.rs | 16 +-- .../src/tests/certificate_waiter_tests.rs | 4 +- narwhal/primary/src/tests/core_tests.rs | 18 ++- .../primary/src/tests/header_waiter_tests.rs | 2 +- narwhal/primary/src/tests/helper_tests.rs | 8 +- narwhal/primary/src/tests/proposer_tests.rs | 4 +- narwhal/primary/tests/epoch_change.rs | 129 ++++++++++++++++-- .../tests/integration_tests_proposer_api.rs | 8 +- .../tests/integration_tests_validator_api.rs | 16 +-- narwhal/types/src/primary.rs | 8 +- narwhal/worker/src/batch_maker.rs | 8 +- narwhal/worker/src/helper.rs | 8 +- 
narwhal/worker/src/primary_connector.rs | 8 +- narwhal/worker/src/quorum_waiter.rs | 19 ++- narwhal/worker/src/synchronizer.rs | 11 +- narwhal/worker/src/tests/batch_maker_tests.rs | 4 +- narwhal/worker/src/tests/helper_tests.rs | 4 +- narwhal/worker/src/tests/processor_tests.rs | 2 +- .../worker/src/tests/quorum_waiter_tests.rs | 2 +- .../worker/src/tests/synchronizer_tests.rs | 8 +- narwhal/worker/src/worker.rs | 5 +- 47 files changed, 364 insertions(+), 189 deletions(-) diff --git a/narwhal/consensus/src/consensus.rs b/narwhal/consensus/src/consensus.rs index d7149d5ed824c..88dac118368df 100644 --- a/narwhal/consensus/src/consensus.rs +++ b/narwhal/consensus/src/consensus.rs @@ -244,10 +244,9 @@ where }) } - fn reconfigure(&mut self, new_committee: Committee) -> StoreResult { + fn change_epoch(&mut self, new_committee: Committee) -> StoreResult { self.committee = new_committee.clone(); self.protocol.update_committee(new_committee)?; - tracing::debug!("Committee updated to {}", self.committee); self.consensus_index = 0; @@ -282,11 +281,15 @@ where Ordering::Greater => { let message = self.rx_reconfigure.borrow_and_update().clone(); match message { - ReconfigureNotification::NewCommittee(new_committee) => { - state = self.reconfigure(new_committee)?; + ReconfigureNotification::NewEpoch(new_committee) => { + state = self.change_epoch(new_committee)?; }, + ReconfigureNotification::UpdateCommittee(new_committee) => { + self.committee = new_committee; + } ReconfigureNotification::Shutdown => return Ok(()), } + tracing::debug!("Committee updated to {}", self.committee); } Ordering::Less => { // We already updated committee but the core is slow. 
@@ -336,11 +339,15 @@ where result.expect("Committee channel dropped"); let message = self.rx_reconfigure.borrow().clone(); match message { - ReconfigureNotification::NewCommittee(new_committee) => { - state = self.reconfigure(new_committee)?; + ReconfigureNotification::NewEpoch(new_committee) => { + state = self.change_epoch(new_committee)?; }, + ReconfigureNotification::UpdateCommittee(new_committee) => { + self.committee = new_committee; + } ReconfigureNotification::Shutdown => return Ok(()) } + tracing::debug!("Committee updated to {}", self.committee); } } } diff --git a/narwhal/consensus/src/subscriber.rs b/narwhal/consensus/src/subscriber.rs index 7b1772bbee3a0..15306df59327e 100644 --- a/narwhal/consensus/src/subscriber.rs +++ b/narwhal/consensus/src/subscriber.rs @@ -82,9 +82,8 @@ impl SubscriberHandler { result = self.rx_reconfigure.changed() => { result.expect("Committee channel dropped"); let message = self.rx_reconfigure.borrow().clone(); - match message { - ReconfigureNotification::NewCommittee(_) => (), - ReconfigureNotification::Shutdown => return Ok(()) + if let ReconfigureNotification::Shutdown = message { + return Ok(()); } } } diff --git a/narwhal/consensus/src/tests/bullshark_tests.rs b/narwhal/consensus/src/tests/bullshark_tests.rs index 41df370948fe7..06e0c5b0e8c21 100644 --- a/narwhal/consensus/src/tests/bullshark_tests.rs +++ b/narwhal/consensus/src/tests/bullshark_tests.rs @@ -75,7 +75,7 @@ async fn commit_one() { let (tx_output, mut rx_output) = channel(1); let committee = mock_committee(&keys[..]); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (_tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee); let store = make_consensus_store(&test_utils::temp_dir()); @@ -138,7 +138,7 @@ async fn dead_node() { let (tx_output, mut rx_output) = channel(1); let committee = mock_committee(&keys[..]); - let 
initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (_tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee); let store = make_consensus_store(&test_utils::temp_dir()); @@ -248,7 +248,7 @@ async fn not_enough_support() { let (tx_output, mut rx_output) = channel(1); let committee = mock_committee(&keys[..]); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (_tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee); let store = make_consensus_store(&test_utils::temp_dir()); @@ -332,7 +332,7 @@ async fn missing_leader() { let (tx_output, mut rx_output) = channel(1); let committee = mock_committee(&keys[..]); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (_tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee); let store = make_consensus_store(&test_utils::temp_dir()); @@ -393,7 +393,7 @@ async fn epoch_change() { let (tx_primary, mut rx_primary) = channel(1); let (tx_output, mut rx_output) = channel(1); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee); let store = make_consensus_store(&test_utils::temp_dir()); @@ -456,7 +456,7 @@ async fn epoch_change() { // Move to the next epoch. 
committee.epoch = epoch + 1; - let message = ReconfigureNotification::NewCommittee(committee.clone()); + let message = ReconfigureNotification::NewEpoch(committee.clone()); tx_reconfigure.send(message).unwrap(); } } @@ -478,7 +478,7 @@ async fn restart_with_new_committee() { let (tx_primary, mut rx_primary) = channel(1); let (tx_output, mut rx_output) = channel(1); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee); let store = make_consensus_store(&test_utils::temp_dir()); let cert_store = make_certificate_store(&test_utils::temp_dir()); diff --git a/narwhal/consensus/src/tests/subscriber_tests.rs b/narwhal/consensus/src/tests/subscriber_tests.rs index 5f55709566db5..8ce251900407f 100644 --- a/narwhal/consensus/src/tests/subscriber_tests.rs +++ b/narwhal/consensus/src/tests/subscriber_tests.rs @@ -50,7 +50,7 @@ pub async fn spawn_node( .collect(); let committee = mock_committee(&keys[..]); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee); // Create the storages. 
diff --git a/narwhal/consensus/src/tests/tusk_tests.rs b/narwhal/consensus/src/tests/tusk_tests.rs index 61a11dbfa8648..ab7bf5017b126 100644 --- a/narwhal/consensus/src/tests/tusk_tests.rs +++ b/narwhal/consensus/src/tests/tusk_tests.rs @@ -73,7 +73,7 @@ async fn commit_one() { let (tx_output, mut rx_output) = channel(1); let committee = mock_committee(&keys[..]); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (_tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee); let store = make_consensus_store(&test_utils::temp_dir()); @@ -137,7 +137,7 @@ async fn dead_node() { let (tx_output, mut rx_output) = channel(1); let committee = mock_committee(&keys[..]); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (_tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee); let store = make_consensus_store(&test_utils::temp_dir()); @@ -245,7 +245,7 @@ async fn not_enough_support() { let (tx_output, mut rx_output) = channel(1); let committee = mock_committee(&keys[..]); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (_tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee); let store = make_consensus_store(&test_utils::temp_dir()); @@ -327,7 +327,7 @@ async fn missing_leader() { let (tx_output, mut rx_output) = channel(1); let committee = mock_committee(&keys[..]); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (_tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee); let store = make_consensus_store(&test_utils::temp_dir()); @@ -387,7 +387,7 @@ async fn 
epoch_change() { let (tx_primary, mut rx_primary) = channel(1); let (tx_output, mut rx_output) = channel(1); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee); let store = make_consensus_store(&test_utils::temp_dir()); @@ -444,7 +444,7 @@ async fn epoch_change() { // Move to the next epoch. committee.epoch = epoch + 1; - let message = ReconfigureNotification::NewCommittee(committee.clone()); + let message = ReconfigureNotification::NewEpoch(committee.clone()); tx_reconfigure.send(message).unwrap(); } } @@ -466,7 +466,7 @@ async fn restart_with_new_committee() { let (tx_primary, mut rx_primary) = channel(1); let (tx_output, mut rx_output) = channel(1); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee); let store = make_consensus_store(&test_utils::temp_dir()); let cert_store = make_certificate_store(&test_utils::temp_dir()); diff --git a/narwhal/executor/src/batch_loader.rs b/narwhal/executor/src/batch_loader.rs index e03d2c400cc2c..80a500447b0d4 100644 --- a/narwhal/executor/src/batch_loader.rs +++ b/narwhal/executor/src/batch_loader.rs @@ -102,9 +102,8 @@ impl BatchLoader { result = self.rx_reconfigure.changed() => { result.expect("Committee channel dropped"); let message = self.rx_reconfigure.borrow().clone(); - match message { - ReconfigureNotification::NewCommittee(_) => (), - ReconfigureNotification::Shutdown => return Ok(()) + if let ReconfigureNotification::Shutdown = message { + return Ok(()); } } } diff --git a/narwhal/executor/src/core.rs b/narwhal/executor/src/core.rs index 9d15c74440b58..b20e5215ab540 100644 --- a/narwhal/executor/src/core.rs +++ b/narwhal/executor/src/core.rs @@ -101,9 
+101,8 @@ where result = self.rx_reconfigure.changed() => { result.expect("Committee channel dropped"); let message = self.rx_reconfigure.borrow().clone(); - match message { - ReconfigureNotification::NewCommittee(_) => (), - ReconfigureNotification::Shutdown => return Ok(()) + if let ReconfigureNotification::Shutdown = message { + return Ok(()); } } } diff --git a/narwhal/executor/src/subscriber.rs b/narwhal/executor/src/subscriber.rs index 2b298e4e5d771..c1ff6aacda488 100644 --- a/narwhal/executor/src/subscriber.rs +++ b/narwhal/executor/src/subscriber.rs @@ -213,9 +213,8 @@ impl Subscriber { result = self.rx_reconfigure.changed() => { result.expect("Committee channel dropped"); let message = self.rx_reconfigure.borrow().clone(); - match message { - ReconfigureNotification::NewCommittee(_) => (), - ReconfigureNotification::Shutdown => return Ok(()) + if let ReconfigureNotification::Shutdown = message { + return Ok(()); } } } diff --git a/narwhal/executor/src/tests/executor_tests.rs b/narwhal/executor/src/tests/executor_tests.rs index 67892515cc3eb..3a505e97a2c06 100644 --- a/narwhal/executor/src/tests/executor_tests.rs +++ b/narwhal/executor/src/tests/executor_tests.rs @@ -17,7 +17,7 @@ async fn execute_transactions() { let (tx_output, mut rx_output) = channel(10); let committee = committee(None); - let message = ReconfigureNotification::NewCommittee(committee); + let message = ReconfigureNotification::NewEpoch(committee); let (_tx_reconfigure, rx_reconfigure) = watch::channel(message); // Spawn the executor. @@ -64,7 +64,7 @@ async fn execute_empty_certificate() { let (tx_output, mut rx_output) = channel(10); let committee = committee(None); - let message = ReconfigureNotification::NewCommittee(committee); + let message = ReconfigureNotification::NewEpoch(committee); let (_tx_reconfigure, rx_reconfigure) = watch::channel(message); // Spawn the executor. 
@@ -120,7 +120,7 @@ async fn execute_malformed_transactions() { let (tx_output, mut rx_output) = channel(10); let committee = committee(None); - let message = ReconfigureNotification::NewCommittee(committee); + let message = ReconfigureNotification::NewEpoch(committee); let (_tx_reconfigure, rx_reconfigure) = watch::channel(message); // Spawn the executor. @@ -182,7 +182,7 @@ async fn internal_error_execution() { let (tx_output, mut rx_output) = channel(10); let committee = committee(None); - let message = ReconfigureNotification::NewCommittee(committee); + let message = ReconfigureNotification::NewEpoch(committee); let (_tx_reconfigure, rx_reconfigure) = watch::channel(message); // Spawn the executor. @@ -234,7 +234,7 @@ async fn crash_recovery() { let (tx_output, mut rx_output) = channel(10); let committee = committee(None); - let reconfigure_notification = ReconfigureNotification::NewCommittee(committee); + let reconfigure_notification = ReconfigureNotification::NewEpoch(committee); let (_tx_reconfigure, rx_reconfigure) = watch::channel(reconfigure_notification.clone()); // Spawn the executor. diff --git a/narwhal/executor/src/tests/subscriber_tests.rs b/narwhal/executor/src/tests/subscriber_tests.rs index e362cc5ba3018..c972c12127ca9 100644 --- a/narwhal/executor/src/tests/subscriber_tests.rs +++ b/narwhal/executor/src/tests/subscriber_tests.rs @@ -22,7 +22,7 @@ async fn spawn_consensus_and_subscriber( let (tx_client_to_consensus, rx_client_to_consensus) = channel(10); let committee = committee(None); - let message = ReconfigureNotification::NewCommittee(committee); + let message = ReconfigureNotification::NewEpoch(committee); let (tx_reconfigure, rx_reconfigure) = watch::channel(message); // Spawn a mock consensus core. 
@@ -111,7 +111,7 @@ async fn synchronize() { let (tx_client_to_consensus, rx_client_to_consensus) = channel(10); let committee = committee(None); - let message = ReconfigureNotification::NewCommittee(committee); + let message = ReconfigureNotification::NewEpoch(committee); let (_tx_reconfigure, rx_reconfigure) = watch::channel(message); // Spawn a mock consensus core. diff --git a/narwhal/node/src/generate_format.rs b/narwhal/node/src/generate_format.rs index 7ae084266c92e..12b25efe8b81d 100644 --- a/narwhal/node/src/generate_format.rs +++ b/narwhal/node/src/generate_format.rs @@ -33,7 +33,7 @@ fn get_registry() -> Result { let signature = kp.try_sign(msg).unwrap(); tracer.trace_value(&mut samples, &signature)?; - // Trace the correspondng header + // Trace the corresponding header let keys: Vec<_> = (0..4).map(|_| KeyPair::generate(&mut rng)).collect(); let committee = Committee { epoch: Epoch::default(), @@ -107,13 +107,18 @@ fn get_registry() -> Result { let request_batch = PrimaryWorkerMessage::RequestBatch(BatchDigest([0u8; 32])); let delete_batch = PrimaryWorkerMessage::DeleteBatches(vec![BatchDigest([0u8; 32])]); let sync = PrimaryWorkerMessage::Synchronize(vec![BatchDigest([0u8; 32])], pk.clone()); - let reconfigure = - PrimaryWorkerMessage::Reconfigure(ReconfigureNotification::NewCommittee(committee)); + let epoch_change = + PrimaryWorkerMessage::Reconfigure(ReconfigureNotification::NewEpoch(committee.clone())); + let update_committee = + PrimaryWorkerMessage::Reconfigure(ReconfigureNotification::UpdateCommittee(committee)); + let shutdown = PrimaryWorkerMessage::Reconfigure(ReconfigureNotification::Shutdown); tracer.trace_value(&mut samples, &cleanup)?; tracer.trace_value(&mut samples, &request_batch)?; tracer.trace_value(&mut samples, &delete_batch)?; tracer.trace_value(&mut samples, &sync)?; - tracer.trace_value(&mut samples, &reconfigure)?; + tracer.trace_value(&mut samples, &epoch_change)?; + tracer.trace_value(&mut samples, &update_committee)?; + 
tracer.trace_value(&mut samples, &shutdown)?; // 2. Trace the main entry point(s) + every enum separately. tracer.trace_type::(&samples)?; diff --git a/narwhal/node/src/lib.rs b/narwhal/node/src/lib.rs index 00785ac6914a6..50a11dfc457ee 100644 --- a/narwhal/node/src/lib.rs +++ b/narwhal/node/src/lib.rs @@ -129,7 +129,7 @@ impl Node { State::Outcome: Send + 'static, State::Error: Debug, { - let initial_committee = ReconfigureNotification::NewCommittee((**committee.load()).clone()); + let initial_committee = ReconfigureNotification::NewEpoch((**committee.load()).clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); let (tx_new_certificates, rx_new_certificates) = channel(Self::CHANNEL_CAPACITY); diff --git a/narwhal/node/tests/reconfigure.rs b/narwhal/node/tests/reconfigure.rs index db93df3b33ee6..b60dadcaf78c8 100644 --- a/narwhal/node/tests/reconfigure.rs +++ b/narwhal/node/tests/reconfigure.rs @@ -270,9 +270,9 @@ async fn epoch_change() { .primary(&name_clone) .expect("Our key is not in the committee") .primary_to_primary; - let message = WorkerPrimaryMessage::Reconfigure( - ReconfigureNotification::NewCommittee(committee.clone()), - ); + let message = WorkerPrimaryMessage::Reconfigure(ReconfigureNotification::NewEpoch( + committee.clone(), + )); let primary_cancel_handle = primary_network.send(address, &message).await; let addresses = committee @@ -281,9 +281,9 @@ async fn epoch_change() { .into_iter() .map(|x| x.primary_to_worker) .collect(); - let message = PrimaryWorkerMessage::Reconfigure( - ReconfigureNotification::NewCommittee(committee.clone()), - ); + let message = PrimaryWorkerMessage::Reconfigure(ReconfigureNotification::NewEpoch( + committee.clone(), + )); let worker_cancel_handles = worker_network .unreliable_broadcast(addresses, &message) .await; diff --git a/narwhal/node/tests/staged/narwhal.yaml b/narwhal/node/tests/staged/narwhal.yaml index 249f4688be1ab..e9243e8efc996 100644 --- 
a/narwhal/node/tests/staged/narwhal.yaml +++ b/narwhal/node/tests/staged/narwhal.yaml @@ -99,10 +99,14 @@ PrimaryWorkerMessage: ReconfigureNotification: ENUM: 0: - NewCommittee: + NewEpoch: NEWTYPE: TYPENAME: Committee 1: + UpdateCommittee: + NEWTYPE: + TYPENAME: Committee + 2: Shutdown: UNIT WorkerInfo: STRUCT: diff --git a/narwhal/primary/src/block_remover.rs b/narwhal/primary/src/block_remover.rs index 02bb92b786174..9e26f9fea6b2b 100644 --- a/narwhal/primary/src/block_remover.rs +++ b/narwhal/primary/src/block_remover.rs @@ -122,7 +122,7 @@ pub struct DeleteBatchMessage { /// /// let name = Ed25519PublicKey::default(); /// let committee = Committee{ epoch: 0, authorities: BTreeMap::new() }; -/// let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); +/// let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); /// let consensus_metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); /// // A dag with genesis for the committee /// let (tx_new_certificates, rx_new_certificates) = channel(1); @@ -281,12 +281,15 @@ impl BlockRemover { result.expect("Committee channel dropped"); let message = self.rx_reconfigure.borrow().clone(); match message { - ReconfigureNotification::NewCommittee(new_committee) => { + ReconfigureNotification::NewEpoch(new_committee)=> { + self.committee = new_committee; + } + ReconfigureNotification::UpdateCommittee(new_committee)=> { self.committee = new_committee; - tracing::debug!("Committee updated to {}", self.committee); } ReconfigureNotification::Shutdown => return } + tracing::debug!("Committee updated to {}", self.committee); } } } diff --git a/narwhal/primary/src/block_synchronizer/mod.rs b/narwhal/primary/src/block_synchronizer/mod.rs index 24b3d41507971..3ee7f84621414 100644 --- a/narwhal/primary/src/block_synchronizer/mod.rs +++ b/narwhal/primary/src/block_synchronizer/mod.rs @@ -310,12 +310,15 @@ impl 
BlockSynchronizer { result.expect("Committee channel dropped"); let message = self.rx_reconfigure.borrow().clone(); match message { - ReconfigureNotification::NewCommittee(new_committee) => { + ReconfigureNotification::NewEpoch(new_committee)=> { self.committee = new_committee; - tracing::debug!("Committee updated to {}", self.committee); - }, + } + ReconfigureNotification::UpdateCommittee(new_committee)=> { + self.committee = new_committee; + } ReconfigureNotification::Shutdown => return } + tracing::debug!("Committee updated to {}", self.committee); } } } diff --git a/narwhal/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs b/narwhal/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs index 411ca195a3958..386b9636d24d6 100644 --- a/narwhal/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs +++ b/narwhal/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs @@ -43,7 +43,7 @@ async fn test_successful_headers_synchronization() { let (name, committee) = resolve_name_and_committee(); let (_tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_commands, rx_commands) = channel(10); let (tx_certificate_responses, rx_certificate_responses) = channel(10); let (_, rx_payload_availability_responses) = channel(10); @@ -199,7 +199,7 @@ async fn test_successful_payload_synchronization() { let (name, committee) = resolve_name_and_committee(); let (_tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_commands, rx_commands) = channel(10); let (_tx_certificate_responses, rx_certificate_responses) = channel(10); let (tx_payload_availability_responses, rx_payload_availability_responses) = channel(10); @@ -388,8 +388,7 @@ async fn 
test_multiple_overlapping_requests() { let (_, certificate_store, payload_store) = create_db_stores(); let (name, committee) = resolve_name_and_committee(); - let (_, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + let (_, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (_, rx_commands) = channel(10); let (_, rx_certificate_responses) = channel(10); let (_, rx_payload_availability_responses) = channel(10); @@ -508,7 +507,7 @@ async fn test_timeout_while_waiting_for_certificates() { let key = keys(None).pop().unwrap(); let (_tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_commands, rx_commands) = channel(10); let (_, rx_certificate_responses) = channel(10); let (_, rx_payload_availability_responses) = channel(10); @@ -598,8 +597,7 @@ async fn test_reply_with_certificates_already_in_storage() { let (name, committee) = resolve_name_and_committee(); let key = keys(None).pop().unwrap(); - let (_, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + let (_, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (_, rx_commands) = channel(10); let (_, rx_certificate_responses) = channel(10); let (_, rx_payload_availability_responses) = channel(10); @@ -692,8 +690,7 @@ async fn test_reply_with_payload_already_in_storage() { let (name, committee) = resolve_name_and_committee(); let key = keys(None).pop().unwrap(); - let (_, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + let (_, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (_, rx_commands) = channel(10); let (_, rx_certificate_responses) = channel(10); let (_, rx_payload_availability_responses) = channel(10); @@ -794,8 
+791,7 @@ async fn test_reply_with_payload_already_in_storage_for_own_certificates() { // be used to create the headers. let name = key.public().clone(); - let (_, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + let (_, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (_, rx_commands) = channel(10); let (_, rx_certificate_responses) = channel(10); let (_, rx_payload_availability_responses) = channel(10); diff --git a/narwhal/primary/src/block_waiter.rs b/narwhal/primary/src/block_waiter.rs index 813167bb6ae9d..53079e605200d 100644 --- a/narwhal/primary/src/block_waiter.rs +++ b/narwhal/primary/src/block_waiter.rs @@ -158,7 +158,7 @@ type RequestKey = Vec; /// /// let name = Ed25519PublicKey::default(); /// let committee = Committee{ epoch: 0, authorities: BTreeMap::new() }; -/// let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); +/// let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); /// /// // A dummy certificate /// let certificate = Certificate::default(); @@ -328,12 +328,15 @@ impl BlockWaiter { + ReconfigureNotification::NewEpoch(new_committee)=> { self.committee = new_committee; - tracing::debug!("Committee updated to {}", self.committee); } - ReconfigureNotification::Shutdown => return, + ReconfigureNotification::UpdateCommittee(new_committee)=> { + self.committee = new_committee; + } + ReconfigureNotification::Shutdown => return } + tracing::debug!("Committee updated to {}", self.committee); } } } diff --git a/narwhal/primary/src/certificate_waiter.rs b/narwhal/primary/src/certificate_waiter.rs index 4a122e0045d3e..5b58bba1c5362 100644 --- a/narwhal/primary/src/certificate_waiter.rs +++ b/narwhal/primary/src/certificate_waiter.rs @@ -161,13 +161,16 @@ impl CertificateWaiter { result.expect("Committee channel dropped"); let message = 
self.rx_reconfigure.borrow_and_update().clone(); match message { - ReconfigureNotification::NewCommittee(committee) => { + ReconfigureNotification::NewEpoch(committee) => { self.committee = committee; - tracing::debug!("Committee updated to {}", self.committee); self.pending.clear(); }, + ReconfigureNotification::UpdateCommittee(committee) => { + self.committee = committee; + }, ReconfigureNotification::Shutdown => return } + tracing::debug!("Committee updated to {}", self.committee); } () = &mut timer => { diff --git a/narwhal/primary/src/core.rs b/narwhal/primary/src/core.rs index 50106a0a316e1..3021537398ae1 100644 --- a/narwhal/primary/src/core.rs +++ b/narwhal/primary/src/core.rs @@ -524,8 +524,8 @@ impl Core { .expect("Reconfigure channel dropped") { let message = self.rx_reconfigure.borrow().clone(); - if let ReconfigureNotification::NewCommittee(new_committee) = message { - self.update_committee(new_committee); + if let ReconfigureNotification::NewEpoch(new_committee) = message { + self.change_epoch(new_committee); // Mark the value as seen. let _ = self.rx_reconfigure.borrow_and_update(); } @@ -533,16 +533,15 @@ impl Core { } /// Update the committee and cleanup internal state. - fn update_committee(&mut self, committee: Committee) { - tracing::info!("Committee updated to epoch {}", committee.epoch); + fn change_epoch(&mut self, committee: Committee) { + self.committee = committee; + self.last_voted.clear(); self.processing.clear(); self.certificates_aggregators.clear(); self.cancel_handlers.clear(); - self.committee = committee; self.synchronizer.update_genesis(&self.committee); - tracing::debug!("Committee updated to {}", self.committee); } // Main loop listening to incoming messages. 
@@ -592,12 +591,16 @@ impl Core { result.expect("Committee channel dropped"); let message = self.rx_reconfigure.borrow().clone(); match message { - ReconfigureNotification::NewCommittee(new_committee) => { - self.update_committee(new_committee); - Ok(()) + ReconfigureNotification::NewEpoch(new_committee) => { + self.change_epoch(new_committee); + }, + ReconfigureNotification::UpdateCommittee(new_committee) => { + self.committee = new_committee; }, ReconfigureNotification::Shutdown => return } + tracing::debug!("Committee updated to {}", self.committee); + Ok(()) } // Check whether the consensus round has changed, to clean up structures diff --git a/narwhal/primary/src/header_waiter.rs b/narwhal/primary/src/header_waiter.rs index 9a751ec82cfbe..bfa4f5139fece 100644 --- a/narwhal/primary/src/header_waiter.rs +++ b/narwhal/primary/src/header_waiter.rs @@ -135,13 +135,12 @@ impl HeaderWaiter { } /// Update the committee and cleanup internal state. - fn update_committee(&mut self, committee: Committee) { + fn change_epoch(&mut self, committee: Committee) { + self.committee = committee; + self.pending.clear(); self.batch_requests.clear(); self.parent_requests.clear(); - - self.committee = committee; - tracing::debug!("Committee updated to {}", self.committee); } /// Helper function. It waits for particular data to become available in the storage @@ -214,8 +213,7 @@ impl HeaderWaiter { .expect("Author of valid header is not in the committee") .primary_to_worker; - // TODO [issue #423]: This network transmission needs to be reliable. The worker may crash-recover - // or a committee change may change the worker's IP address. + // TODO [issue #423]: This network transmission needs to be reliable: the worker may crash-recover. 
let message = PrimaryWorkerMessage::Synchronize(digests, author.clone()); self.worker_network.unreliable_send(address, &message).await; } @@ -326,11 +324,15 @@ impl HeaderWaiter { result.expect("Committee channel dropped"); let message = self.rx_reconfigure.borrow().clone(); match message { - ReconfigureNotification::NewCommittee(new_committee) => { - self.update_committee(new_committee); + ReconfigureNotification::NewEpoch(new_committee) => { + self.change_epoch(new_committee); + }, + ReconfigureNotification::UpdateCommittee(new_committee) => { + self.committee = new_committee; }, ReconfigureNotification::Shutdown => return } + tracing::debug!("Committee updated to {}", self.committee); }, // Check for a new consensus round number diff --git a/narwhal/primary/src/helper.rs b/narwhal/primary/src/helper.rs index d3dd987ee3e18..84581fbc3d559 100644 --- a/narwhal/primary/src/helper.rs +++ b/narwhal/primary/src/helper.rs @@ -115,12 +115,15 @@ impl Helper { result.expect("Committee channel dropped"); let message = self.rx_committee.borrow().clone(); match message { - ReconfigureNotification::NewCommittee(new_committee) => { + ReconfigureNotification::NewEpoch(new_committee) => { + self.committee = new_committee; + }, + ReconfigureNotification::UpdateCommittee(new_committee) => { self.committee = new_committee; - tracing::debug!("Committee updated to {}", self.committee); }, ReconfigureNotification::Shutdown => return } + tracing::debug!("Committee updated to {}", self.committee); } } } diff --git a/narwhal/primary/src/primary.rs b/narwhal/primary/src/primary.rs index 58da8bcb8112b..1ff5b543797df 100644 --- a/narwhal/primary/src/primary.rs +++ b/narwhal/primary/src/primary.rs @@ -162,11 +162,8 @@ impl Primary { result = mon_rx_reconfigure.changed() => { result.expect("Committee channel dropped"); let message = mon_rx_reconfigure.borrow().clone(); - match message { - ReconfigureNotification::NewCommittee(_new_committee) => { - // noop - }, - 
ReconfigureNotification::Shutdown => return + if let ReconfigureNotification::Shutdown = message { + return; } } } diff --git a/narwhal/primary/src/proposer.rs b/narwhal/primary/src/proposer.rs index f1ab38d202274..902d62a833a21 100644 --- a/narwhal/primary/src/proposer.rs +++ b/narwhal/primary/src/proposer.rs @@ -129,11 +129,11 @@ impl Proposer { } /// Update the committee and cleanup internal state. - fn update_committee(&mut self, committee: Committee) { + fn change_epoch(&mut self, committee: Committee) { self.committee = committee; + self.round = 0; self.last_parents = Certificate::genesis(&self.committee); - tracing::debug!("Committee updated to {}", self.committee); } // Main loop listening to incoming messages. @@ -259,11 +259,15 @@ impl Proposer { Ordering::Greater => { let message = self.rx_reconfigure.borrow_and_update().clone(); match message { - ReconfigureNotification::NewCommittee(new_committee) => { - self.update_committee(new_committee); + ReconfigureNotification::NewEpoch(new_committee) => { + self.change_epoch(new_committee); + }, + ReconfigureNotification::UpdateCommittee(new_committee) => { + self.committee = new_committee; }, ReconfigureNotification::Shutdown => return, } + tracing::debug!("Committee updated to {}", self.committee); } Ordering::Less => { // We already updated committee but the core is slow. 
Ignore the parents @@ -316,11 +320,16 @@ impl Proposer { result.expect("Committee channel dropped"); let message = self.rx_reconfigure.borrow().clone(); match message { - ReconfigureNotification::NewCommittee(new_committee) => { - self.update_committee(new_committee); + ReconfigureNotification::NewEpoch(new_committee) => { + self.change_epoch(new_committee); + }, + ReconfigureNotification::UpdateCommittee(new_committee) => { + self.committee = new_committee; }, ReconfigureNotification::Shutdown => return, } + tracing::debug!("Committee updated to {}", self.committee); + } } } diff --git a/narwhal/primary/src/state_handler.rs b/narwhal/primary/src/state_handler.rs index be27de5ef28a9..fa76a4dfedc77 100644 --- a/narwhal/primary/src/state_handler.rs +++ b/narwhal/primary/src/state_handler.rs @@ -93,16 +93,22 @@ impl StateHandler { Some(message) = self.rx_reconfigure.recv() => { let shutdown = match &message { - ReconfigureNotification::NewCommittee(committee) => { + ReconfigureNotification::NewEpoch(committee) => { // Update the committee. self.committee.swap(Arc::new(committee.clone())); - tracing::debug!("Committee updated to {}", self.committee); // Trigger cleanup on the primary. let _ = self.tx_consensus_round_updates.send(0); // ignore error when receivers dropped. 
+ tracing::debug!("Committee updated to {}", self.committee); false }, + ReconfigureNotification::UpdateCommittee(committee) => { + self.committee.swap(Arc::new(committee.clone())); + + tracing::debug!("Committee updated to {}", self.committee); + false + } ReconfigureNotification::Shutdown => true, }; diff --git a/narwhal/primary/src/tests/block_remover_tests.rs b/narwhal/primary/src/tests/block_remover_tests.rs index 0481352b9b4c2..3c5f062f713ff 100644 --- a/narwhal/primary/src/tests/block_remover_tests.rs +++ b/narwhal/primary/src/tests/block_remover_tests.rs @@ -46,7 +46,7 @@ async fn test_successful_blocks_delete() { // AND the necessary keys let (name, committee) = resolve_name_and_committee(); let (_tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); // AND a Dag with genesis populated let consensus_metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); let dag = Arc::new(Dag::new(&committee, rx_consensus, consensus_metrics).1); @@ -210,7 +210,7 @@ async fn test_timeout() { // AND the necessary keys let (name, committee) = resolve_name_and_committee(); let (_tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); // AND a Dag with genesis populated let consensus_metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); let dag = Arc::new(Dag::new(&committee, rx_consensus, consensus_metrics).1); @@ -346,8 +346,7 @@ async fn test_unlocking_pending_requests() { // AND the necessary keys let (name, committee) = resolve_name_and_committee(); - let (_, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + let (_, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); // AND a Dag with genesis populated let consensus_metrics = 
Arc::new(ConsensusMetrics::new(&Registry::new())); diff --git a/narwhal/primary/src/tests/block_waiter_tests.rs b/narwhal/primary/src/tests/block_waiter_tests.rs index 0536070f21055..b48bf30b4065e 100644 --- a/narwhal/primary/src/tests/block_waiter_tests.rs +++ b/narwhal/primary/src/tests/block_waiter_tests.rs @@ -40,7 +40,7 @@ async fn test_successfully_retrieve_block() { // AND spawn a new blocks waiter let (_tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_commands, rx_commands) = channel(1); let (tx_get_block, rx_get_block) = oneshot::channel(); let (tx_batch_messages, rx_batch_messages) = channel(10); @@ -212,7 +212,7 @@ async fn test_successfully_retrieve_multiple_blocks() { // AND spawn a new blocks waiter let (_tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_commands, rx_commands) = channel(1); let (tx_get_blocks, rx_get_blocks) = oneshot::channel(); let (tx_batch_messages, rx_batch_messages) = channel(10); @@ -302,8 +302,7 @@ async fn test_one_pending_request_for_block_at_time() { let block_id = certificate.digest(); // AND - let (_, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + let (_, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (_, rx_commands) = channel(1); let (_, rx_batch_messages) = channel(1); @@ -380,8 +379,7 @@ async fn test_unlocking_pending_get_block_request_after_response() { let block_id = certificate.digest(); // AND spawn a new blocks waiter - let (_, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + let (_, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (_, rx_commands) = 
channel(1); let (_, rx_batch_messages) = channel(1); @@ -450,7 +448,7 @@ async fn test_batch_timeout() { // AND spawn a new blocks waiter let (_tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_commands, rx_commands) = channel(1); let (tx_get_block, rx_get_block) = oneshot::channel(); let (_, rx_batch_messages) = channel(10); @@ -518,7 +516,7 @@ async fn test_return_error_when_certificate_is_missing() { // AND spawn a new blocks waiter let (_tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_commands, rx_commands) = channel(1); let (tx_get_block, rx_get_block) = oneshot::channel(); let (_, rx_batch_messages) = channel(10); @@ -580,7 +578,7 @@ async fn test_return_error_when_certificate_is_missing_when_get_blocks() { // AND spawn a new blocks waiter let (_tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_commands, rx_commands) = channel(1); let (tx_get_blocks, rx_get_blocks) = oneshot::channel(); let (_, rx_batch_messages) = channel(10); diff --git a/narwhal/primary/src/tests/certificate_waiter_tests.rs b/narwhal/primary/src/tests/certificate_waiter_tests.rs index b7c7bbc28d9b0..187f9c68e4f6d 100644 --- a/narwhal/primary/src/tests/certificate_waiter_tests.rs +++ b/narwhal/primary/src/tests/certificate_waiter_tests.rs @@ -29,7 +29,7 @@ async fn process_certificate_missing_parents_in_reverse() { // kept empty let (_tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee(None))); + watch::channel(ReconfigureNotification::NewEpoch(committee(None))); // synchronizer to header waiter let (tx_sync_headers, 
rx_sync_headers) = channel(1); // synchronizer to certificate waiter @@ -172,7 +172,7 @@ async fn process_certificate_check_gc_fires() { // kept empty let (_tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee(None))); + watch::channel(ReconfigureNotification::NewEpoch(committee(None))); // synchronizer to header waiter let (tx_sync_headers, rx_sync_headers) = channel(1); // synchronizer to certificate waiter diff --git a/narwhal/primary/src/tests/core_tests.rs b/narwhal/primary/src/tests/core_tests.rs index 9cc95b53632bd..e372875503b4a 100644 --- a/narwhal/primary/src/tests/core_tests.rs +++ b/narwhal/primary/src/tests/core_tests.rs @@ -23,7 +23,7 @@ async fn process_header() { let committee = committee(None); let (_tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_sync_headers, _rx_sync_headers) = channel(1); let (tx_sync_certificates, _rx_sync_certificates) = channel(1); let (tx_primary_messages, rx_primary_messages) = channel(1); @@ -113,8 +113,7 @@ async fn process_header_missing_parent() { let name = kp.public().clone(); let signature_service = SignatureService::new(kp); - let (_, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee(None))); + let (_, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee(None))); let (tx_sync_headers, _rx_sync_headers) = channel(1); let (tx_sync_certificates, _rx_sync_certificates) = channel(1); let (tx_primary_messages, rx_primary_messages) = channel(1); @@ -190,8 +189,7 @@ async fn process_header_missing_payload() { let name = kp.public().clone(); let signature_service = SignatureService::new(kp); - let (_, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee(None))); + let (_, rx_reconfigure) = 
watch::channel(ReconfigureNotification::NewEpoch(committee(None))); let (tx_sync_headers, _rx_sync_headers) = channel(1); let (tx_sync_certificates, _rx_sync_certificates) = channel(1); let (tx_primary_messages, rx_primary_messages) = channel(1); @@ -280,7 +278,7 @@ async fn process_votes() { let committee = committee(None); let (_tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_sync_headers, _rx_sync_headers) = channel(1); let (tx_sync_certificates, _rx_sync_certificates) = channel(1); let (tx_primary_messages, rx_primary_messages) = channel(1); @@ -374,7 +372,7 @@ async fn process_certificates() { let signature_service = SignatureService::new(kp); let (_tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee(None))); + watch::channel(ReconfigureNotification::NewEpoch(committee(None))); let (tx_sync_headers, _rx_sync_headers) = channel(1); let (tx_sync_certificates, _rx_sync_certificates) = channel(1); let (tx_primary_messages, rx_primary_messages) = channel(3); @@ -479,7 +477,7 @@ async fn shutdown_core() { let committee = committee(None); let (tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_sync_headers, _rx_sync_headers) = channel(1); let (tx_sync_certificates, _rx_sync_certificates) = channel(1); let (_tx_primary_messages, rx_primary_messages) = channel(1); @@ -546,7 +544,7 @@ async fn reconfigure_core() { // All the channels to interface with the core. 
let (tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_sync_headers, _rx_sync_headers) = channel(1); let (tx_sync_certificates, _rx_sync_certificates) = channel(1); let (tx_primary_messages, rx_primary_messages) = channel(1); @@ -604,7 +602,7 @@ async fn reconfigure_core() { ); // Change committee - let message = ReconfigureNotification::NewCommittee(new_committee.clone()); + let message = ReconfigureNotification::NewEpoch(new_committee.clone()); tx_reconfigure.send(message).unwrap(); // Send a header to the core. diff --git a/narwhal/primary/src/tests/header_waiter_tests.rs b/narwhal/primary/src/tests/header_waiter_tests.rs index c3f68b002b95a..42b4ea0dfa351 100644 --- a/narwhal/primary/src/tests/header_waiter_tests.rs +++ b/narwhal/primary/src/tests/header_waiter_tests.rs @@ -25,7 +25,7 @@ async fn successfully_synchronize_batches() { let (_, certificate_store, payload_store) = create_db_stores(); let gc_depth: Round = 1; let (_tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_synchronizer, rx_synchronizer) = channel(10); let (tx_core, mut rx_core) = channel(10); let metrics = Arc::new(PrimaryMetrics::new(&Registry::new())); diff --git a/narwhal/primary/src/tests/helper_tests.rs b/narwhal/primary/src/tests/helper_tests.rs index 4c84cbb107200..2e9d692baa1b5 100644 --- a/narwhal/primary/src/tests/helper_tests.rs +++ b/narwhal/primary/src/tests/helper_tests.rs @@ -29,7 +29,7 @@ async fn test_process_certificates_stream_mode() { let (_, certificate_store, payload_store) = create_db_stores(); let key = keys(None).pop().unwrap(); let (name, committee) = resolve_name_and_committee(); - let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewCommittee( + let 
(_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch( test_utils::committee(None), )); let (tx_primaries, rx_primaries) = channel(10); @@ -105,7 +105,7 @@ async fn test_process_certificates_batch_mode() { let (_, certificate_store, payload_store) = create_db_stores(); let key = keys(None).pop().unwrap(); let (name, committee) = resolve_name_and_committee(); - let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewCommittee( + let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch( test_utils::committee(None), )); let (tx_primaries, rx_primaries) = channel(10); @@ -202,7 +202,7 @@ async fn test_process_payload_availability_success() { let (_, certificate_store, payload_store) = create_db_stores(); let key = keys(None).pop().unwrap(); let (name, committee) = resolve_name_and_committee(); - let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewCommittee( + let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch( test_utils::committee(None), )); let (tx_primaries, rx_primaries) = channel(10); @@ -318,7 +318,7 @@ async fn test_process_payload_availability_when_failures() { let key = keys(None).pop().unwrap(); let (name, committee) = resolve_name_and_committee(); - let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewCommittee( + let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch( test_utils::committee(None), )); let (tx_primaries, rx_primaries) = channel(10); diff --git a/narwhal/primary/src/tests/proposer_tests.rs b/narwhal/primary/src/tests/proposer_tests.rs index 7b374caeffe8c..a753fb3129928 100644 --- a/narwhal/primary/src/tests/proposer_tests.rs +++ b/narwhal/primary/src/tests/proposer_tests.rs @@ -14,7 +14,7 @@ async fn propose_empty() { let signature_service = SignatureService::new(kp); let (_tx_reconfigure, rx_reconfigure) = - 
watch::channel(ReconfigureNotification::NewCommittee(committee(None))); + watch::channel(ReconfigureNotification::NewEpoch(committee(None))); let (_tx_parents, rx_parents) = channel(1); let (_tx_our_digests, rx_our_digests) = channel(1); let (tx_headers, mut rx_headers) = channel(1); @@ -50,7 +50,7 @@ async fn propose_payload() { let signature_service = SignatureService::new(kp); let (_tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewCommittee(committee(None))); + watch::channel(ReconfigureNotification::NewEpoch(committee(None))); let (_tx_parents, rx_parents) = channel(1); let (tx_our_digests, rx_our_digests) = channel(1); let (tx_headers, mut rx_headers) = channel(1); diff --git a/narwhal/primary/tests/epoch_change.rs b/narwhal/primary/tests/epoch_change.rs index 9b18dd4375db3..9c9db61e6d962 100644 --- a/narwhal/primary/tests/epoch_change.rs +++ b/narwhal/primary/tests/epoch_change.rs @@ -37,7 +37,7 @@ async fn test_simple_epoch_change() { let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY); tx_channels.push(tx_feedback.clone()); - let initial_committee = ReconfigureNotification::NewCommittee(committee_0.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee_0.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); let store = NodeStorage::reopen(temp_dir()); @@ -86,7 +86,7 @@ async fn test_simple_epoch_change() { .values() .map(|authority| authority.primary.worker_to_primary.clone()) .collect(); - let message = WorkerPrimaryMessage::Reconfigure(ReconfigureNotification::NewCommittee( + let message = WorkerPrimaryMessage::Reconfigure(ReconfigureNotification::NewEpoch( new_committee.clone(), )); let mut _do_not_drop: Vec> = Vec::new(); @@ -142,7 +142,7 @@ async fn test_partial_committee_change() { epoch_0_rx_channels.push(rx_new_certificates); let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY); epoch_0_tx_channels.push(tx_feedback.clone()); - let initial_committee = 
ReconfigureNotification::NewCommittee(committee_0.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee_0.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); let store = NodeStorage::reopen(temp_dir()); @@ -219,7 +219,7 @@ async fn test_partial_committee_change() { let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY); epoch_1_tx_channels.push(tx_feedback.clone()); - let initial_committee = ReconfigureNotification::NewCommittee(committee_1.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee_1.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); let store = NodeStorage::reopen(temp_dir()); @@ -248,9 +248,8 @@ async fn test_partial_committee_change() { .values() .map(|authority| authority.primary.worker_to_primary.clone()) .collect(); - let message = WorkerPrimaryMessage::Reconfigure(ReconfigureNotification::NewCommittee( - committee_1.clone(), - )); + let message = + WorkerPrimaryMessage::Reconfigure(ReconfigureNotification::NewEpoch(committee_1.clone())); let mut _do_not_drop: Vec> = Vec::new(); for address in addresses { _do_not_drop.push( @@ -296,7 +295,7 @@ async fn test_restart_with_new_committee_change() { let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY); tx_channels.push(tx_feedback.clone()); - let initial_committee = ReconfigureNotification::NewCommittee(committee_0.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee_0.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); let store = NodeStorage::reopen(temp_dir()); @@ -367,7 +366,7 @@ async fn test_restart_with_new_committee_change() { let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY); tx_channels.push(tx_feedback.clone()); - let initial_committee = ReconfigureNotification::NewCommittee(new_committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(new_committee.clone()); let 
(tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); let store = NodeStorage::reopen(temp_dir()); @@ -421,3 +420,115 @@ async fn test_restart_with_new_committee_change() { join_all(handles).await; } } + +/// Update the committee without changing the epoch. +#[tokio::test] +async fn test_simple_committee_update() { + let parameters = Parameters { + batch_size: 200, // Two transactions. + ..Parameters::default() + }; + + // The configuration of epoch 0. + let keys_0 = keys(None); + let committee_0 = pure_committee_from_keys(&keys_0); + + // Spawn the committee of epoch 0. + let mut rx_channels = Vec::new(); + let mut tx_channels = Vec::new(); + for keypair in keys_0 { + let name = keypair.public().clone(); + let signer = keypair; + + let (tx_new_certificates, rx_new_certificates) = channel(CHANNEL_CAPACITY); + rx_channels.push(rx_new_certificates); + let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY); + tx_channels.push(tx_feedback.clone()); + + let initial_committee = ReconfigureNotification::NewEpoch(committee_0.clone()); + let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); + + let store = NodeStorage::reopen(temp_dir()); + + Primary::spawn( + name, + signer, + Arc::new(ArcSwap::from_pointee(committee_0.clone())), + parameters.clone(), + store.header_store.clone(), + store.certificate_store.clone(), + store.payload_store.clone(), + /* tx_consensus */ tx_new_certificates, + /* rx_consensus */ rx_feedback, + /* dag */ None, + NetworkModel::Asynchronous, + tx_reconfigure, + /* tx_committed_certificates */ tx_feedback, + &Registry::new(), + ); + } + + // Run for a while in epoch 0. 
+ for rx in rx_channels.iter_mut() { + loop { + let certificate = rx.recv().await.unwrap(); + assert_eq!(certificate.epoch(), 0); + if certificate.round() == 10 { + break; + } + } + } + + // Update the committee + let mut old_committee = committee_0; + for _ in 1..=3 { + // Update the committee + let mut new_committee = old_committee.clone(); + + let mut total_stake = 0; + let threshold = new_committee.validity_threshold(); + for (_, authority) in new_committee.authorities.iter_mut() { + if total_stake < threshold { + authority.primary.primary_to_primary = format!( + "/ip4/127.0.0.1/tcp/{}/http", + config::utils::get_available_port() + ) + .parse() + .unwrap(); + + total_stake += authority.stake; + } + } + + // Notify the old committee about the change in committee information. + let addresses: Vec<_> = old_committee + .authorities + .values() + .map(|authority| authority.primary.worker_to_primary.clone()) + .collect(); + let message = WorkerPrimaryMessage::Reconfigure(ReconfigureNotification::UpdateCommittee( + new_committee.clone(), + )); + let mut _do_not_drop: Vec> = Vec::new(); + for address in addresses { + _do_not_drop.push( + WorkerToPrimaryNetwork::default() + .send(address, &message) + .await, + ); + } + + // Run for a while. 
+ for rx in rx_channels.iter_mut() { + loop { + let certificate = rx.recv().await.unwrap(); + assert_eq!(certificate.epoch(), 0); + if certificate.round() == 10 { + break; + } + } + } + + old_committee = new_committee; + } +} diff --git a/narwhal/primary/tests/integration_tests_proposer_api.rs b/narwhal/primary/tests/integration_tests_proposer_api.rs index 807a0725948b7..2d797c7026e18 100644 --- a/narwhal/primary/tests/integration_tests_proposer_api.rs +++ b/narwhal/primary/tests/integration_tests_proposer_api.rs @@ -71,7 +71,7 @@ async fn test_rounds_errors() { // Spawn the primary let (tx_new_certificates, rx_new_certificates) = channel(CHANNEL_CAPACITY); let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); // AND create a committee passed exclusively to the DAG that does not include the name public key @@ -154,7 +154,7 @@ async fn test_rounds_return_successful_response() { // Spawn the primary let (tx_new_certificates, rx_new_certificates) = channel(CHANNEL_CAPACITY); let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); // AND setup the DAG @@ -291,7 +291,7 @@ async fn test_node_read_causal_signed_certificates() { .unwrap(); let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); let primary_1_parameters = Parameters { @@ -322,7 +322,7 @@ 
async fn test_node_read_causal_signed_certificates() { let (tx_new_certificates_2, rx_new_certificates_2) = channel(CHANNEL_CAPACITY); let (tx_feedback_2, rx_feedback_2) = channel(CHANNEL_CAPACITY); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); let primary_2_parameters = Parameters { diff --git a/narwhal/primary/tests/integration_tests_validator_api.rs b/narwhal/primary/tests/integration_tests_validator_api.rs index 8c0b86f830351..d1aaf8db68707 100644 --- a/narwhal/primary/tests/integration_tests_validator_api.rs +++ b/narwhal/primary/tests/integration_tests_validator_api.rs @@ -101,7 +101,7 @@ async fn test_get_collections() { let (tx_new_certificates, rx_new_certificates) = channel(CHANNEL_CAPACITY); let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); let consensus_metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); @@ -287,7 +287,7 @@ async fn test_remove_collections() { } let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); Primary::spawn( @@ -490,7 +490,7 @@ async fn test_read_causal_signed_certificates() { let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = 
watch::channel(initial_committee); let primary_1_parameters = Parameters { @@ -521,7 +521,7 @@ async fn test_read_causal_signed_certificates() { let (tx_new_certificates_2, rx_new_certificates_2) = channel(CHANNEL_CAPACITY); let (tx_feedback_2, rx_feedback_2) = channel(CHANNEL_CAPACITY); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); let primary_2_parameters = Parameters { @@ -696,7 +696,7 @@ async fn test_read_causal_unsigned_certificates() { let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); // Spawn Primary 1 that we will be interacting with. @@ -719,7 +719,7 @@ async fn test_read_causal_unsigned_certificates() { let (tx_new_certificates_2, rx_new_certificates_2) = channel(CHANNEL_CAPACITY); let (tx_feedback_2, rx_feedback_2) = channel(CHANNEL_CAPACITY); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); let consensus_metrics_2 = Arc::new(ConsensusMetrics::new(&Registry::new())); @@ -860,7 +860,7 @@ async fn test_get_collections_with_missing_certificates() { // Spawn the primary 1 (which will be the one that we'll interact with) let (tx_new_certificates_1, rx_new_certificates_1) = channel(CHANNEL_CAPACITY); let (tx_feedback_1, rx_feedback_1) = channel(CHANNEL_CAPACITY); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let 
(tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); let consensus_metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); @@ -904,7 +904,7 @@ async fn test_get_collections_with_missing_certificates() { // Spawn the primary 2 - a peer to fetch missing certificates from let (tx_new_certificates_2, _) = channel(CHANNEL_CAPACITY); let (tx_feedback_2, rx_feedback_2) = channel(CHANNEL_CAPACITY); - let initial_committee = ReconfigureNotification::NewCommittee(committee.clone()); + let initial_committee = ReconfigureNotification::NewEpoch(committee.clone()); let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee); Primary::spawn( diff --git a/narwhal/types/src/primary.rs b/narwhal/types/src/primary.rs index ae636da106e99..f6999b296821a 100644 --- a/narwhal/types/src/primary.rs +++ b/narwhal/types/src/primary.rs @@ -545,11 +545,13 @@ pub enum PrimaryMessage { }, } -/// Message to reconfigure worker tasks. +/// Message to reconfigure worker tasks. This message must be sent by a trusted source. #[derive(Clone, Debug, Serialize, Deserialize)] pub enum ReconfigureNotification { - /// Indicate the committee has been updated. - NewCommittee(Committee), + /// Indicate the committee has changed. This happens at epoch change. + NewEpoch(Committee), + /// Update some network information of the committee. + UpdateCommittee(Committee), /// Indicate a shutdown. 
Shutdown, } diff --git a/narwhal/worker/src/batch_maker.rs b/narwhal/worker/src/batch_maker.rs index a0d0459096c3b..d9c7eb7e32207 100644 --- a/narwhal/worker/src/batch_maker.rs +++ b/narwhal/worker/src/batch_maker.rs @@ -94,12 +94,16 @@ impl BatchMaker { result.expect("Committee channel dropped"); let message = self.rx_reconfigure.borrow().clone(); match message { - ReconfigureNotification::NewCommittee(new_committee) => { + ReconfigureNotification::NewEpoch(new_committee) => { self.committee = new_committee; - tracing::debug!("Committee updated to {}", self.committee); + }, + ReconfigureNotification::UpdateCommittee(new_committee) => { + self.committee = new_committee; + }, ReconfigureNotification::Shutdown => return } + tracing::debug!("Committee updated to {}", self.committee); } } diff --git a/narwhal/worker/src/helper.rs b/narwhal/worker/src/helper.rs index 3af315644eb17..824f4e87bcc2b 100644 --- a/narwhal/worker/src/helper.rs +++ b/narwhal/worker/src/helper.rs @@ -113,12 +113,16 @@ impl Helper { result.expect("Committee channel dropped"); let message = self.rx_reconfigure.borrow().clone(); match message { - ReconfigureNotification::NewCommittee(new_committee) => { + ReconfigureNotification::NewEpoch(new_committee) => { self.committee = new_committee; - tracing::debug!("Committee updated to {}", self.committee); + }, + ReconfigureNotification::UpdateCommittee(new_committee) => { + self.committee = new_committee; + }, ReconfigureNotification::Shutdown => return } + tracing::debug!("Committee updated to {}", self.committee); } } } diff --git a/narwhal/worker/src/primary_connector.rs b/narwhal/worker/src/primary_connector.rs index 02c5c5c65f5e6..656921a055c68 100644 --- a/narwhal/worker/src/primary_connector.rs +++ b/narwhal/worker/src/primary_connector.rs @@ -73,12 +73,16 @@ impl PrimaryConnector { result.expect("Committee channel dropped"); let message = self.rx_reconfigure.borrow().clone(); match message { - ReconfigureNotification::NewCommittee(new_committee) 
=> { + ReconfigureNotification::NewEpoch(new_committee) => { self.committee = new_committee; - tracing::debug!("Committee updated to {}", self.committee); + }, + ReconfigureNotification::UpdateCommittee(new_committee) => { + self.committee = new_committee; + }, ReconfigureNotification::Shutdown => return } + tracing::debug!("Committee updated to {}", self.committee); } Some(_result) = futures.next() => () diff --git a/narwhal/worker/src/quorum_waiter.rs b/narwhal/worker/src/quorum_waiter.rs index 992e75f538d95..ff84e49923b74 100644 --- a/narwhal/worker/src/quorum_waiter.rs +++ b/narwhal/worker/src/quorum_waiter.rs @@ -120,9 +120,14 @@ impl QuorumWaiter { result.expect("Committee channel dropped"); let message = self.rx_reconfigure.borrow().clone(); match message { - ReconfigureNotification::NewCommittee(new_committee) => { - self.committee=new_committee; - tracing::debug!("Committee updated to {}", self.committee); + ReconfigureNotification::NewEpoch(new_committee) => { + self.committee = new_committee; + tracing::debug!("Dropping batch: committee updated to {}", self.committee); + break; // Don't wait for acknowledgements. + }, + ReconfigureNotification::UpdateCommittee(new_committee) => { + self.committee = new_committee; + tracing::debug!("Dropping batch: committee updated to {}", self.committee); break; // Don't wait for acknowledgements. 
}, ReconfigureNotification::Shutdown => return @@ -137,12 +142,16 @@ impl QuorumWaiter { result.expect("Committee channel dropped"); let message = self.rx_reconfigure.borrow().clone(); match message { - ReconfigureNotification::NewCommittee(new_committee) => { + ReconfigureNotification::NewEpoch(new_committee) => { self.committee = new_committee; - tracing::debug!("Committee updated to {}", self.committee); + }, + ReconfigureNotification::UpdateCommittee(new_committee) => { + self.committee = new_committee; + }, ReconfigureNotification::Shutdown => return } + tracing::debug!("Committee updated to {}", self.committee); } } } diff --git a/narwhal/worker/src/synchronizer.rs b/narwhal/worker/src/synchronizer.rs index 795b20f0bc8a2..66ad7df50941e 100644 --- a/narwhal/worker/src/synchronizer.rs +++ b/narwhal/worker/src/synchronizer.rs @@ -205,13 +205,20 @@ impl Synchronizer { PrimaryWorkerMessage::Reconfigure(message) => { // Reconfigure this task and update the shared committee. let shutdown = match &message { - ReconfigureNotification::NewCommittee(new_committee) => { + ReconfigureNotification::NewEpoch(new_committee) => { self.committee.swap(Arc::new(new_committee.clone())); - tracing::debug!("Committee updated to {}", self.committee); + self.pending.clear(); self.round = 0; waiting.clear(); + + false + } + ReconfigureNotification::UpdateCommittee(new_committee) => { + self.committee.swap(Arc::new(new_committee.clone())); + + tracing::debug!("Committee updated to {}", self.committee); false } ReconfigureNotification::Shutdown => true diff --git a/narwhal/worker/src/tests/batch_maker_tests.rs b/narwhal/worker/src/tests/batch_maker_tests.rs index 5053a83e1c40a..554e859a74bcb 100644 --- a/narwhal/worker/src/tests/batch_maker_tests.rs +++ b/narwhal/worker/src/tests/batch_maker_tests.rs @@ -9,7 +9,7 @@ use tokio::sync::mpsc::channel; async fn make_batch() { let committee = committee(None).clone(); let (_tx_reconfiguration, rx_reconfiguration) = - 
watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_transaction, rx_transaction) = channel(1); let (tx_message, mut rx_message) = channel(1); @@ -39,7 +39,7 @@ async fn make_batch() { async fn batch_timeout() { let committee = committee(None).clone(); let (_tx_reconfiguration, rx_reconfiguration) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_transaction, rx_transaction) = channel(1); let (tx_message, mut rx_message) = channel(1); diff --git a/narwhal/worker/src/tests/helper_tests.rs b/narwhal/worker/src/tests/helper_tests.rs index f47e4f1dbec11..f8017f83b2cac 100644 --- a/narwhal/worker/src/tests/helper_tests.rs +++ b/narwhal/worker/src/tests/helper_tests.rs @@ -19,7 +19,7 @@ async fn worker_batch_reply() { let id = 0; let committee = committee(None).clone(); let (_tx_reconfiguration, rx_reconfiguration) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); // Create a new test store. let db = rocks::DBMap::::open( @@ -67,7 +67,7 @@ async fn client_batch_reply() { let id = 0; let committee = committee(None).clone(); let (_tx_reconfiguration, rx_reconfiguration) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); // Create a new test store. 
let db = rocks::DBMap::::open( diff --git a/narwhal/worker/src/tests/processor_tests.rs b/narwhal/worker/src/tests/processor_tests.rs index 3455b9fda8aa6..a9e13b8ec9a9f 100644 --- a/narwhal/worker/src/tests/processor_tests.rs +++ b/narwhal/worker/src/tests/processor_tests.rs @@ -15,7 +15,7 @@ async fn hash_and_store() { let committee = committee(None).clone(); let (_tx_reconfiguration, rx_reconfiguration) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); // Create a new test store. let db = rocks::DBMap::::open( diff --git a/narwhal/worker/src/tests/quorum_waiter_tests.rs b/narwhal/worker/src/tests/quorum_waiter_tests.rs index 965b2e53b2293..ae6387989ada2 100644 --- a/narwhal/worker/src/tests/quorum_waiter_tests.rs +++ b/narwhal/worker/src/tests/quorum_waiter_tests.rs @@ -15,7 +15,7 @@ async fn wait_for_quorum() { let committee = committee(None).clone(); let (_tx_reconfiguration, rx_reconfiguration) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); // Spawn a `QuorumWaiter` instance. let _quorum_waiter_handler = QuorumWaiter::spawn( diff --git a/narwhal/worker/src/tests/synchronizer_tests.rs b/narwhal/worker/src/tests/synchronizer_tests.rs index d83f021bc7faf..4f57aebc7af00 100644 --- a/narwhal/worker/src/tests/synchronizer_tests.rs +++ b/narwhal/worker/src/tests/synchronizer_tests.rs @@ -23,7 +23,7 @@ async fn synchronize() { let committee = committee(None); let (tx_reconfiguration, _rx_reconfiguration) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); // Create a new test store. 
let store = open_batch_store(); @@ -74,7 +74,7 @@ async fn test_successful_request_batch() { let committee = committee(None); let (tx_reconfiguration, _rx_reconfiguration) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); // Create a new test store. let store = open_batch_store(); @@ -137,7 +137,7 @@ async fn test_request_batch_not_found() { let committee = committee(None); let (tx_reconfiguration, _rx_reconfiguration) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); // Create a new test store. let store = open_batch_store(); @@ -199,7 +199,7 @@ async fn test_successful_batch_delete() { let committee = committee(None); let (tx_reconfiguration, _rx_reconfiguration) = - watch::channel(ReconfigureNotification::NewCommittee(committee.clone())); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); // Create a new test store. let store = open_batch_store(); diff --git a/narwhal/worker/src/worker.rs b/narwhal/worker/src/worker.rs index 651d551ff4771..ba7bd3c5af30c 100644 --- a/narwhal/worker/src/worker.rs +++ b/narwhal/worker/src/worker.rs @@ -82,9 +82,8 @@ impl Worker { let (tx_primary, rx_primary) = channel(CHANNEL_CAPACITY); let initial_committee = (*(*(*committee).load()).clone()).clone(); - let (tx_reconfigure, rx_reconfigure) = watch::channel( - ReconfigureNotification::NewCommittee(initial_committee.clone()), - ); + let (tx_reconfigure, rx_reconfigure) = + watch::channel(ReconfigureNotification::NewEpoch(initial_committee.clone())); let client_flow_handles = worker.handle_clients_transactions( &tx_reconfigure,