Skip to content

Commit

Permalink
jwk #4: jwk update quorum certification (aptos-labs#11857)
Browse files Browse the repository at this point in the history
* jwk types update

* update

* update

* jwk txn and execution

* consensus ensure jwk txns are expected

* update

* jwk consensus network type defs

* update cargo.toml

* update

* update

* update

* lint

* jwk update quorum certification

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update
  • Loading branch information
zjma authored Feb 5, 2024
1 parent dae6f11 commit c2a3453
Show file tree
Hide file tree
Showing 5 changed files with 298 additions and 0 deletions.
2 changes: 2 additions & 0 deletions crates/aptos-jwk-consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ pub fn start_jwk_consensus_runtime(

pub mod network;
pub mod network_interface;
pub mod observation_aggregation;
pub mod types;
pub mod update_certifier;
97 changes: 97 additions & 0 deletions crates/aptos-jwk-consensus/src/observation_aggregation/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright © Aptos Foundation

use crate::types::{
JWKConsensusMsg, ObservedUpdate, ObservedUpdateRequest, ObservedUpdateResponse,
};
use anyhow::{anyhow, ensure};
use aptos_consensus_types::common::Author;
use aptos_infallible::Mutex;
use aptos_reliable_broadcast::BroadcastStatus;
use aptos_types::{
aggregate_signature::PartialSignatures,
epoch_state::EpochState,
jwks::{ProviderJWKs, QuorumCertifiedUpdate},
};
use move_core_types::account_address::AccountAddress;
use std::{collections::BTreeSet, sync::Arc};

/// The aggregation state of reliable broadcast where a validator broadcast JWK observation requests
/// and produce quorum-certified JWK updates.
pub struct ObservationAggregationState {
epoch_state: Arc<EpochState>,
local_view: ProviderJWKs,
inner_state: Mutex<PartialSignatures>,
}

impl ObservationAggregationState {
pub fn new(epoch_state: Arc<EpochState>, local_view: ProviderJWKs) -> Self {
Self {
epoch_state,
local_view,
inner_state: Mutex::new(PartialSignatures::empty()),
}
}
}

impl BroadcastStatus<JWKConsensusMsg> for Arc<ObservationAggregationState> {
type Aggregated = QuorumCertifiedUpdate;
type Message = ObservedUpdateRequest;
type Response = ObservedUpdateResponse;

fn add(
&self,
sender: Author,
response: Self::Response,
) -> anyhow::Result<Option<Self::Aggregated>> {
let ObservedUpdateResponse { epoch, update } = response;
let ObservedUpdate {
author,
observed: peer_view,
signature,
} = update;
ensure!(
epoch == self.epoch_state.epoch,
"adding peer observation failed with invalid epoch",
);
ensure!(
author == sender,
"adding peer observation failed with mismatched author",
);

let mut partial_sigs = self.inner_state.lock();
if partial_sigs.contains_voter(&sender) {
return Ok(None);
}

ensure!(
self.local_view == peer_view,
"adding peer observation failed with mismatched view"
);

// Verify peer signature.
self.epoch_state
.verifier
.verify(sender, &peer_view, &signature)?;

// All checks passed. Aggregating.
partial_sigs.add_signature(sender, signature);
let voters: BTreeSet<AccountAddress> = partial_sigs.signatures().keys().copied().collect();
if self
.epoch_state
.verifier
.check_voting_power(voters.iter(), true)
.is_err()
{
return Ok(None);
}
let multi_sig = self.epoch_state.verifier.aggregate_signatures(&partial_sigs).map_err(|e|anyhow!("adding peer observation failed with partial-to-aggregated conversion error: {e}"))?;

Ok(Some(QuorumCertifiedUpdate {
update: peer_view,
multi_sig,
}))
}
}

#[cfg(test)]
mod tests;
132 changes: 132 additions & 0 deletions crates/aptos-jwk-consensus/src/observation_aggregation/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright © Aptos Foundation

use crate::{
observation_aggregation::ObservationAggregationState,
types::{ObservedUpdate, ObservedUpdateResponse},
};
use aptos_crypto::{bls12381, SigningKey, Uniform};
use aptos_reliable_broadcast::BroadcastStatus;
use aptos_types::{
epoch_state::EpochState,
jwks::{
jwk::{JWKMoveStruct, JWK},
unsupported::UnsupportedJWK,
ProviderJWKs, QuorumCertifiedUpdate,
},
validator_verifier::{ValidatorConsensusInfo, ValidatorVerifier},
};
use move_core_types::account_address::AccountAddress;
use std::sync::Arc;

#[test]
fn test_observation_aggregation_state() {
let num_validators = 5;
let epoch = 999;
let addrs: Vec<AccountAddress> = (0..num_validators)
.map(|_| AccountAddress::random())
.collect();
let private_keys: Vec<bls12381::PrivateKey> = (0..num_validators)
.map(|_| bls12381::PrivateKey::generate_for_testing())
.collect();
let public_keys: Vec<bls12381::PublicKey> = (0..num_validators)
.map(|i| bls12381::PublicKey::from(&private_keys[i]))
.collect();
let voting_powers = [1, 1, 1, 6, 6]; // total voting power: 15, default threshold: 11
let validator_infos: Vec<ValidatorConsensusInfo> = (0..num_validators)
.map(|i| ValidatorConsensusInfo::new(addrs[i], public_keys[i].clone(), voting_powers[i]))
.collect();
let verifier = ValidatorVerifier::new(validator_infos);
let epoch_state = Arc::new(EpochState { epoch, verifier });
let view_0 = ProviderJWKs {
issuer: b"https::/alice.com".to_vec(),
version: 123,
jwks: vec![JWKMoveStruct::from(JWK::Unsupported(
UnsupportedJWK::new_for_testing("id1", "payload1"),
))],
};
let view_1 = ProviderJWKs {
issuer: b"https::/alice.com".to_vec(),
version: 123,
jwks: vec![JWKMoveStruct::from(JWK::Unsupported(
UnsupportedJWK::new_for_testing("id2", "payload2"),
))],
};
let ob_agg_state = Arc::new(ObservationAggregationState::new(
epoch_state.clone(),
view_0.clone(),
));

// `ObservedUpdate` with incorrect epoch should be rejected.
let result = ob_agg_state.add(addrs[0], ObservedUpdateResponse {
epoch: 998,
update: ObservedUpdate {
author: addrs[0],
observed: view_0.clone(),
signature: private_keys[0].sign(&view_0).unwrap(),
},
});
assert!(result.is_err());

// `ObservedUpdate` authored by X but sent by Y should be rejected.
let result = ob_agg_state.add(addrs[1], ObservedUpdateResponse {
epoch: 999,
update: ObservedUpdate {
author: addrs[0],
observed: view_0.clone(),
signature: private_keys[0].sign(&view_0).unwrap(),
},
});
assert!(result.is_err());

// `ObservedUpdate` that cannot be verified should be rejected.
let result = ob_agg_state.add(addrs[2], ObservedUpdateResponse {
epoch: 999,
update: ObservedUpdate {
author: addrs[2],
observed: view_0.clone(),
signature: private_keys[2].sign(&view_1).unwrap(),
},
});
assert!(result.is_err());

// Good `ObservedUpdate` should be accepted.
let result = ob_agg_state.add(addrs[3], ObservedUpdateResponse {
epoch: 999,
update: ObservedUpdate {
author: addrs[3],
observed: view_0.clone(),
signature: private_keys[3].sign(&view_0).unwrap(),
},
});
assert!(matches!(result, Ok(None)));

// `ObservedUpdate` from contributed author should be ignored.
let result = ob_agg_state.add(addrs[3], ObservedUpdateResponse {
epoch: 999,
update: ObservedUpdate {
author: addrs[3],
observed: view_0.clone(),
signature: private_keys[3].sign(&view_0).unwrap(),
},
});
assert!(matches!(result, Ok(None)));

// Quorum-certified update should be returned if after adding an `ObservedUpdate`, the threshold is exceeded.
let result = ob_agg_state.add(addrs[4], ObservedUpdateResponse {
epoch: 999,
update: ObservedUpdate {
author: addrs[4],
observed: view_0.clone(),
signature: private_keys[4].sign(&view_0).unwrap(),
},
});
let QuorumCertifiedUpdate {
update: observed,
multi_sig,
} = result.unwrap().unwrap();
assert_eq!(view_0, observed);
assert!(epoch_state
.verifier
.verify_multi_signatures(&observed, &multi_sig)
.is_ok());
}
63 changes: 63 additions & 0 deletions crates/aptos-jwk-consensus/src/update_certifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright © Aptos Foundation

use crate::{
observation_aggregation::ObservationAggregationState,
types::{JWKConsensusMsg, ObservedUpdateRequest},
};
use aptos_channels::aptos_channel;
use aptos_reliable_broadcast::ReliableBroadcast;
use aptos_types::{
epoch_state::EpochState,
jwks::{ProviderJWKs, QuorumCertifiedUpdate},
};
use futures_util::future::{AbortHandle, Abortable};
use std::sync::Arc;
use tokio_retry::strategy::ExponentialBackoff;

/// A sub-process of the whole JWK consensus process.
/// Once invoked by `JWKConsensusManager` to `start_produce`,
/// it starts producing a `QuorumCertifiedUpdate` and returns an abort handle.
/// Once an `QuorumCertifiedUpdate` is available, it is sent back via a channel given earlier.
pub trait TUpdateCertifier: Send + Sync {
fn start_produce(
&self,
epoch_state: Arc<EpochState>,
payload: ProviderJWKs,
qc_update_tx: aptos_channel::Sender<(), QuorumCertifiedUpdate>,
) -> AbortHandle;
}

pub struct CertifiedUpdateProducer {
reliable_broadcast: Arc<ReliableBroadcast<JWKConsensusMsg, ExponentialBackoff>>,
}

impl CertifiedUpdateProducer {
pub fn new(reliable_broadcast: ReliableBroadcast<JWKConsensusMsg, ExponentialBackoff>) -> Self {
Self {
reliable_broadcast: Arc::new(reliable_broadcast),
}
}
}

impl TUpdateCertifier for CertifiedUpdateProducer {
fn start_produce(
&self,
epoch_state: Arc<EpochState>,
payload: ProviderJWKs,
qc_update_tx: aptos_channel::Sender<(), QuorumCertifiedUpdate>,
) -> AbortHandle {
let rb = self.reliable_broadcast.clone();
let req = ObservedUpdateRequest {
epoch: epoch_state.epoch,
issuer: payload.issuer.clone(),
};
let agg_state = Arc::new(ObservationAggregationState::new(epoch_state, payload));
let task = async move {
let qc_update = rb.broadcast(req, agg_state).await;
let _ = qc_update_tx.push((), qc_update);
};
let (abort_handle, abort_registration) = AbortHandle::new_pair();
tokio::spawn(Abortable::new(task, abort_registration));
abort_handle
}
}
4 changes: 4 additions & 0 deletions types/src/aggregate_signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,8 @@ impl PartialSignatures {
pub fn signatures(&self) -> &BTreeMap<AccountAddress, bls12381::Signature> {
&self.signatures
}

pub fn contains_voter(&self, voter: &AccountAddress) -> bool {
self.signatures.contains_key(voter)
}
}

0 comments on commit c2a3453

Please sign in to comment.