Skip to content

Commit

Permalink
WIP draft05: Move to ping-pong aggregation flow
Browse files Browse the repository at this point in the history
Implement the new aggregation flow as of DAP spec PR
ietf-wg-ppm/draft-ietf-ppm-dap#393.

This PR introduces the following changes:

* In the current "leader-as-broadcast-channel" flow, the Leader gathers
  each round of prep shares from the Helpers, computes the prep message,
  and broadcasts it to the Helpers in the next round. In the new flow,
  we assume there is exactly one Helper, and each Aggregator runs the
  prep process as far as it it can until it needs input from its peer.
  This results in a significant reduction in the number of HTTP requests
  sent.

* The `Report` serialization has changed in light of specializing the
  protocol for two Aggregators.

* A new `PrepareStepState` variant is defined for signaling commitment
  to the share and transmitting the prep message.

TODO List the code changs that were needed. We've tried to maintain test
coverage, however some tests are no longer relevant since (a) we only
support 1-round VDAFs and (b) draft04 requires one request to complete
aggregation instead of two.
  • Loading branch information
cjpatton committed Jun 20, 2023
1 parent a3a3f59 commit 4745821
Show file tree
Hide file tree
Showing 11 changed files with 1,117 additions and 851 deletions.
55 changes: 37 additions & 18 deletions daphne/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ use crate::{
},
taskprov::TaskprovVersion,
vdaf::{
prio2::prio2_decode_prepare_state,
prio3::{prio3_append_prepare_state, prio3_decode_prepare_state},
prio2::prio2_decode_prep_state,
prio3::{prio3_append_prep_state, prio3_decode_prep_state},
VdafAggregateShare, VdafMessage, VdafState, VdafVerifyKey,
},
};
Expand Down Expand Up @@ -447,13 +447,7 @@ pub enum DapAggregateResult {
/// The Leader's state after sending an AggregateInitReq.
#[derive(Debug)]
pub struct DapLeaderState {
pub(crate) seq: Vec<(VdafState, VdafMessage, Time, ReportId)>,
}

/// The Leader's state after sending an AggregateContReq.
#[derive(Debug)]
pub struct DapLeaderUncommitted {
pub(crate) seq: Vec<(DapOutputShare, ReportId)>,
pub(crate) seq: Vec<(VdafState, Option<VdafMessage>, Time, ReportId)>,
}

/// The Helper's state during the aggregation flow.
Expand All @@ -477,7 +471,7 @@ impl DapHelperState {
for (state, time, report_id) in self.seq.iter() {
match (vdaf_config, state) {
(VdafConfig::Prio3(prio3_config), _) => {
prio3_append_prepare_state(&mut bytes, prio3_config, state)?;
prio3_append_prep_state(&mut bytes, prio3_config, state)?;
}
(VdafConfig::Prio2 { .. }, VdafState::Prio2(state)) => {
state.encode(&mut bytes);
Expand All @@ -499,11 +493,9 @@ impl DapHelperState {
while (r.position() as usize) < data.len() {
let state = match vdaf_config {
VdafConfig::Prio3(ref prio3_config) => {
prio3_decode_prepare_state(prio3_config, 1, &mut r)?
}
VdafConfig::Prio2 { dimension } => {
prio2_decode_prepare_state(*dimension, 1, &mut r)?
prio3_decode_prep_state(prio3_config, 1, &mut r)?
}
VdafConfig::Prio2 { dimension } => prio2_decode_prep_state(*dimension, 1, &mut r)?,
};
let time = Time::decode(&mut r).map_err(|e| DapAbort::from_codec_error(e, None))?;
let report_id =
Expand All @@ -521,7 +513,8 @@ impl DapHelperState {
#[derive(Debug)]
/// An ouptut share produced by an Aggregator for a single report.
pub struct DapOutputShare {
pub(crate) time: u64, // Value from the report
pub(crate) report_id: ReportId, // Value from the report XXX Maybe remove this
pub(crate) time: u64, // Value from the report
pub(crate) checksum: [u8; 32],
pub(crate) data: VdafAggregateShare,
}
Expand Down Expand Up @@ -625,14 +618,30 @@ pub enum DapLeaderTransition<M: Debug> {
/// The Leader has produced the next outbound message and its state has been updated.
Continue(DapLeaderState, M),

/// The leader has computed output shares, but is waiting on an AggregateResp from the hepler
/// The Leader has computed output shares, but is waiting on an AggregationJobResp from the Helper
/// before committing them.
Uncommitted(DapLeaderUncommitted, M),
Uncommitted(Vec<DapOutputShare>, M),

/// The Leader has computed output shares and intends to commit them. This is in response to a
/// an AggregationJobResp in which the Helper indicated it has committed.
Committed(Vec<DapOutputShare>),

/// The Leader has completed the aggregation flow without computing an aggregate share.
Skip,
}

impl<M: Debug> std::fmt::Display for DapLeaderTransition<M> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
Self::Continue(..) => "continue",
Self::Uncommitted(..) => "uncommitted",
Self::Committed(..) => "uncommitted",
Self::Skip => "skip",
};
write!(f, "{s}")
}
}

/// Helper state transition during the aggregation flow.
#[derive(Debug)]
pub enum DapHelperTransition<M: Debug> {
Expand All @@ -647,7 +656,17 @@ pub enum DapHelperTransition<M: Debug> {
Finish(Vec<DapOutputShare>, M),
}

/// Specification of a concrete VDAF.
impl<M: Debug> std::fmt::Display for DapHelperTransition<M> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
Self::Continue(..) => "continue",
Self::Finish(..) => "finish",
};
write!(f, "{s}")
}
}

/// Specificaiton of a concrete VDAF.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum VdafConfig {
Expand Down
80 changes: 75 additions & 5 deletions daphne/src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ pub struct Report {
pub draft02_task_id: Option<TaskId>, // Set in draft02
pub report_metadata: ReportMetadata,
pub public_share: Vec<u8>,
// draft02 compatibility: In the latest draft there are two named fields, one for the Leader's
// share and one for the Helper's. In draft02, this is a length-prefixed sequence.
pub encrypted_input_shares: Vec<HpkeCiphertext>,
}

Expand All @@ -229,7 +231,17 @@ impl ParameterizedEncode<DapVersion> for Report {
}
self.report_metadata.encode_with_param(version, bytes);
encode_u32_bytes(bytes, &self.public_share);
encode_u32_items(bytes, &(), &self.encrypted_input_shares);
match version {
DapVersion::Draft02 => encode_u32_items(bytes, &(), &self.encrypted_input_shares),
DapVersion::Draft05 => {
if self.encrypted_input_shares.len() != 2 {
unreachable!("draft05: tried to serialize Report with an invalid number of input shares: got {}; want {}", self.encrypted_input_shares.len(), 2);
}
self.encrypted_input_shares[0].encode(bytes);
self.encrypted_input_shares[1].encode(bytes);
}
DapVersion::Unknown => unreachable!("unhandled version {version:?}"),
}
}
}

Expand All @@ -247,7 +259,14 @@ impl ParameterizedDecode<DapVersion> for Report {
draft02_task_id,
report_metadata: ReportMetadata::decode_with_param(version, bytes)?,
public_share: decode_u32_bytes(bytes)?,
encrypted_input_shares: decode_u32_items(&(), bytes)?,
encrypted_input_shares: match version {
DapVersion::Draft02 => decode_u32_items(&(), bytes)?,
DapVersion::Draft05 => vec![
HpkeCiphertext::decode(bytes)?,
HpkeCiphertext::decode(bytes)?,
],
DapVersion::Unknown => unreachable!("unhandled version {version:?}"),
},
})
}
}
Expand Down Expand Up @@ -402,14 +421,58 @@ impl TryFrom<Query> for BatchSelector {
}
}

/// Helper's report share and the Leader's first-round prep share.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReportInit {
pub helper_report_share: ReportShare,
pub draft05_leader_prep_share: Option<Vec<u8>>, // Set in draft05+PR393
}

impl ParameterizedEncode<DapVersion> for ReportInit {
fn encode_with_param(&self, version: &DapVersion, bytes: &mut Vec<u8>) {
self.helper_report_share.encode_with_param(version, bytes);
match (version, &self.draft05_leader_prep_share) {
(DapVersion::Draft02, None) => (),
(DapVersion::Draft05, Some(leader_prep_share)) => {
encode_u32_bytes(bytes, leader_prep_share)
}
(_, prep_share) => {
unreachable!(
"unhandled version {version:?} (leader prep share set = {}",
prep_share.is_some()
)
}
};
}
}

impl ParameterizedDecode<DapVersion> for ReportInit {
fn decode_with_param(
version: &DapVersion,
bytes: &mut Cursor<&[u8]>,
) -> Result<Self, CodecError> {
let helper_report_share = ReportShare::decode_with_param(version, bytes)?;
let draft05_leader_prep_share = match version {
DapVersion::Draft02 => None,
DapVersion::Draft05 => Some(decode_u32_bytes(bytes)?),
DapVersion::Unknown => unreachable!("unhandled version {version:?}"),
};

Ok(Self {
helper_report_share,
draft05_leader_prep_share,
})
}
}

/// Aggregate initialization request.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AggregationJobInitReq {
pub draft02_task_id: Option<TaskId>, // Set in draft02
pub draft02_agg_job_id: Option<Draft02AggregationJobId>, // Set in draft02
pub agg_param: Vec<u8>,
pub part_batch_sel: PartialBatchSelector,
pub report_shares: Vec<ReportShare>,
pub report_inits: Vec<ReportInit>,
}

impl ParameterizedEncode<DapVersion> for AggregationJobInitReq {
Expand All @@ -430,7 +493,7 @@ impl ParameterizedEncode<DapVersion> for AggregationJobInitReq {
DapVersion::Unknown => unreachable!("unhandled version {version:?}"),
};
self.part_batch_sel.encode(bytes);
encode_u32_items(bytes, version, &self.report_shares);
encode_u32_items(bytes, version, &self.report_inits);
}
}

Expand All @@ -454,7 +517,7 @@ impl ParameterizedDecode<DapVersion> for AggregationJobInitReq {
draft02_agg_job_id,
agg_param,
part_batch_sel: PartialBatchSelector::decode(bytes)?,
report_shares: decode_u32_items(version, bytes)?,
report_inits: decode_u32_items(version, bytes)?,
})
}
}
Expand Down Expand Up @@ -546,6 +609,8 @@ impl Decode for Transition {
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TransitionVar {
Continued(Vec<u8>),
// XXX Figure out what to call this. We also need to allocate the enum value in the spec.
FinishedWithPayload(Vec<u8>),
Finished,
Failed(TransitionFailure),
}
Expand All @@ -557,6 +622,10 @@ impl Encode for TransitionVar {
0_u8.encode(bytes);
encode_u32_bytes(bytes, vdaf_message);
}
TransitionVar::FinishedWithPayload(vdaf_message) => {
255_u8.encode(bytes); // XXX Pick a byte
encode_u32_bytes(bytes, vdaf_message);
}
TransitionVar::Finished => {
1_u8.encode(bytes);
}
Expand All @@ -572,6 +641,7 @@ impl Decode for TransitionVar {
fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
match u8::decode(bytes)? {
0 => Ok(Self::Continued(decode_u32_bytes(bytes)?)),
255 => Ok(Self::FinishedWithPayload(decode_u32_bytes(bytes)?)), // XXX Pick a byte
1 => Ok(Self::Finished),
2 => Ok(Self::Failed(TransitionFailure::decode(bytes)?)),
_ => Err(CodecError::UnexpectedValue),
Expand Down
Loading

0 comments on commit 4745821

Please sign in to comment.