Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Leader async aggregation. #3564

Merged
merged 2 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 108 additions & 99 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use janus_core::vdaf::Prio3FixedPointBoundedL2VecSumBitSize;
use janus_core::{
auth_tokens::AuthenticationToken,
hpke::{self, HpkeApplicationInfo, Label},
retries::retry_http_request_notify,
retries::{retry_http_request_notify, HttpResponse},
time::{Clock, DurationExt, IntervalExt, TimeExt},
vdaf::{
new_prio3_sum_vec_field64_multiproof_hmacsha256_aes128, vdaf_application_context,
Expand Down Expand Up @@ -1752,105 +1752,106 @@ impl VdafOps {
C: Clock,
for<'a> A::PrepareState: ParameterizedDecode<(&'a A, usize)>,
{
if let Some(existing_aggregation_job) = tx
let existing_aggregation_job = match tx
.get_aggregation_job::<SEED_SIZE, B, A>(task_id, aggregation_job_id)
.await?
{
if existing_aggregation_job.state() == &AggregationJobState::Deleted {
return Err(datastore::Error::User(
Error::DeletedAggregationJob(*task_id, *aggregation_job_id).into(),
));
}
Some(existing_aggregation_job) => existing_aggregation_job,
None => return Ok(None),
};

if existing_aggregation_job.last_request_hash() != Some(request_hash) {
if let Some(log_forbidden_mutations) = log_forbidden_mutations {
let original_report_ids: Vec<_> = tx
.get_report_aggregations_for_aggregation_job(
vdaf,
&Role::Helper,
task_id,
aggregation_job_id,
)
.await?
.iter()
.map(|ra| *ra.report_id())
.collect();
let mutating_request_report_ids: Vec<_> = req
.prepare_inits()
.iter()
.map(|pi| *pi.report_share().metadata().id())
.collect();
let event = AggregationJobInitForbiddenMutationEvent {
task_id: *task_id,
aggregation_job_id: *aggregation_job_id,
original_request_hash: existing_aggregation_job.last_request_hash(),
original_report_ids,
original_batch_id: format!(
"{:?}",
existing_aggregation_job.partial_batch_identifier()
),
original_aggregation_parameter: existing_aggregation_job
.aggregation_parameter()
.get_encoded()
.map_err(|e| datastore::Error::User(e.into()))?,
mutating_request_hash: Some(request_hash),
mutating_request_report_ids,
mutating_request_batch_id: format!(
"{:?}",
req.batch_selector().batch_identifier()
),
mutating_request_aggregation_parameter: req
.aggregation_parameter()
.to_vec(),
};
let event_id = crate::diagnostic::write_event(
log_forbidden_mutations,
"agg-job-illegal-mutation",
event,
)
.await
.map(|event_id| format!("{event_id:?}"))
.unwrap_or_else(|error| {
tracing::error!(?error, "failed to write hash mismatch event");
"no event id".to_string()
});

tracing::info!(
?event_id,
original_request_hash = existing_aggregation_job
.last_request_hash()
.map(hex::encode),
mutating_request_hash = hex::encode(request_hash),
"request hash mismatch on retried aggregation job request",
);
}
return Err(datastore::Error::User(
Error::ForbiddenMutation {
resource_type: "aggregation job",
identifier: aggregation_job_id.to_string(),
}
.into(),
));
}
if existing_aggregation_job.state() == &AggregationJobState::Deleted {
return Err(datastore::Error::User(
Error::DeletedAggregationJob(*task_id, *aggregation_job_id).into(),
));
}

// This is a repeated request. Send the same response we computed last time.
return Ok(Some(AggregationJobResp::Finished {
prepare_resps: tx
if existing_aggregation_job.last_request_hash() != Some(request_hash) {
if let Some(log_forbidden_mutations) = log_forbidden_mutations {
let original_report_ids: Vec<_> = tx
.get_report_aggregations_for_aggregation_job(
vdaf,
&Role::Helper,
task_id,
aggregation_job_id,
existing_aggregation_job.aggregation_parameter(),
)
.await?
.iter()
.filter_map(ReportAggregation::last_prep_resp)
.cloned()
.collect(),
}));
.map(|ra| *ra.report_id())
.collect();
let mutating_request_report_ids: Vec<_> = req
.prepare_inits()
.iter()
.map(|pi| *pi.report_share().metadata().id())
.collect();
let event = AggregationJobInitForbiddenMutationEvent {
task_id: *task_id,
aggregation_job_id: *aggregation_job_id,
original_request_hash: existing_aggregation_job.last_request_hash(),
original_report_ids,
original_batch_id: format!(
"{:?}",
existing_aggregation_job.partial_batch_identifier()
),
original_aggregation_parameter: existing_aggregation_job
.aggregation_parameter()
.get_encoded()
.map_err(|e| datastore::Error::User(e.into()))?,
mutating_request_hash: Some(request_hash),
mutating_request_report_ids,
mutating_request_batch_id: format!(
"{:?}",
req.batch_selector().batch_identifier()
),
mutating_request_aggregation_parameter: req.aggregation_parameter().to_vec(),
};
let event_id = crate::diagnostic::write_event(
log_forbidden_mutations,
"agg-job-illegal-mutation",
event,
)
.await
.map(|event_id| format!("{event_id:?}"))
.unwrap_or_else(|error| {
tracing::error!(?error, "failed to write hash mismatch event");
"no event id".to_string()
});

tracing::info!(
?event_id,
original_request_hash = existing_aggregation_job
.last_request_hash()
.map(hex::encode),
mutating_request_hash = hex::encode(request_hash),
"request hash mismatch on retried aggregation job request",
);
}
return Err(datastore::Error::User(
Error::ForbiddenMutation {
resource_type: "aggregation job",
identifier: aggregation_job_id.to_string(),
}
.into(),
));
}

Ok(None)
// This is a repeated request. Send the same response we computed last time.
return Ok(Some(AggregationJobResp::Finished {
prepare_resps: tx
.get_report_aggregations_for_aggregation_job(
vdaf,
&Role::Helper,
task_id,
aggregation_job_id,
existing_aggregation_job.aggregation_parameter(),
)
.await?
.iter()
.filter_map(ReportAggregation::last_prep_resp)
.cloned()
.collect(),
}));
}

/// Implements [helper aggregate initialization][1].
Expand Down Expand Up @@ -2202,7 +2203,7 @@ impl VdafOps {
// Helper is not finished. Await the next message from the Leader to advance to
// the next step.
(
ReportAggregationState::WaitingHelper { prepare_state },
ReportAggregationState::HelperContinue { prepare_state },
PrepareStepResult::Continue {
message: outgoing_message,
},
Expand Down Expand Up @@ -2421,22 +2422,24 @@ impl VdafOps {

Box::pin(async move {
// Read existing state.
let (aggregation_job, report_aggregations) = try_join!(
tx.get_aggregation_job::<SEED_SIZE, B, A>(task.id(), &aggregation_job_id),
tx.get_report_aggregations_for_aggregation_job(
let aggregation_job = tx
.get_aggregation_job::<SEED_SIZE, B, A>(task.id(), &aggregation_job_id)
.await?
.ok_or_else(|| {
datastore::Error::User(
Error::UnrecognizedAggregationJob(*task.id(), aggregation_job_id)
.into(),
)
})?;
let report_aggregations = tx
.get_report_aggregations_for_aggregation_job(
vdaf.as_ref(),
&Role::Helper,
task.id(),
&aggregation_job_id,
aggregation_job.aggregation_parameter(),
)
)?;

let aggregation_job = aggregation_job.ok_or_else(|| {
datastore::Error::User(
Error::UnrecognizedAggregationJob(*task.id(), aggregation_job_id)
.into(),
)
})?;
.await?;

// Deleted aggregation jobs cannot be stepped
if *aggregation_job.state() == AggregationJobState::Deleted {
Expand Down Expand Up @@ -3201,6 +3204,12 @@ fn write_task_aggregation_counter<C: Clock>(
task_id: TaskId,
counters: TaskAggregationCounter,
) {
if counters.is_zero() {
// Don't spawn a task or interact with the datastore if doing so won't change the state of
// the datastore.
return;
}

// We write task aggregation counters back in a separate tokio task & datastore transaction,
// so that any slowness induced by writing the counters (e.g. due to transaction retry) does
// not slow the main processing. The lack of transactionality between writing the updated
Expand Down Expand Up @@ -3354,7 +3363,7 @@ async fn send_request_to_helper(
request_body: Option<RequestBody>,
auth_token: &AuthenticationToken,
http_request_duration_histogram: &Histogram<f64>,
) -> Result<Bytes, Error> {
) -> Result<HttpResponse, Error> {
let (auth_header, auth_value) = auth_token.request_authentication();
let domain = Arc::from(url.domain().unwrap_or_default());
let method_str = Arc::from(method.as_str());
Expand Down Expand Up @@ -3383,7 +3392,7 @@ async fn send_request_to_helper(
// Successful response.
Ok(response) => {
timer.finish_attempt("success");
Ok(response.body().clone())
Ok(response)
}

// HTTP-level error.
Expand Down
16 changes: 9 additions & 7 deletions aggregator/src/aggregator/aggregation_job_continue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl VdafOps {
// the report was dropped (if it's not already in an error state) and continue.
if matches!(
report_agg.state(),
ReportAggregationState::WaitingHelper { .. }
ReportAggregationState::HelperContinue { .. }
) {
report_aggregations_to_write.push(WritableReportAggregation::new(
report_agg
Expand All @@ -99,11 +99,11 @@ impl VdafOps {
};

let prep_state = match report_aggregation.state() {
ReportAggregationState::WaitingHelper { prepare_state } => prepare_state.clone(),
ReportAggregationState::WaitingLeader { .. } => {
ReportAggregationState::HelperContinue { prepare_state } => prepare_state.clone(),
ReportAggregationState::LeaderContinue { .. } => {
return Err(datastore::Error::User(
Error::Internal(
"helper encountered unexpected ReportAggregationState::WaitingLeader"
"helper encountered unexpected ReportAggregationState::LeaderContinue"
.to_string(),
)
.into(),
Expand All @@ -128,7 +128,7 @@ impl VdafOps {
// the report was dropped (if it's not already in an error state) and continue.
if matches!(
report_aggregation.state(),
ReportAggregationState::WaitingHelper { .. }
ReportAggregationState::HelperContinue { .. }
) {
report_aggregations_to_write.push(WritableReportAggregation::new(
report_aggregation
Expand Down Expand Up @@ -189,7 +189,7 @@ impl VdafOps {
// state and await the next message from
// the Leader to advance preparation.
PingPongState::Continued(prepare_state) => (
ReportAggregationState::WaitingHelper {
ReportAggregationState::HelperContinue {
prepare_state,
},
None,
Expand Down Expand Up @@ -517,7 +517,7 @@ mod tests {
*prepare_init.report_share().metadata().time(),
0,
None,
ReportAggregationState::WaitingHelper {
ReportAggregationState::HelperContinue {
prepare_state: *transcript.helper_prepare_transitions[0]
.prepare_state(),
},
Expand Down Expand Up @@ -743,6 +743,7 @@ mod tests {
&Role::Helper,
&task_id,
&aggregation_job_id,
&test_case.aggregation_parameter,
)
.await
.unwrap();
Expand Down Expand Up @@ -795,6 +796,7 @@ mod tests {
&Role::Helper,
&task_id,
&aggregation_job_id,
&test_case.aggregation_parameter,
)
.await
.unwrap();
Expand Down
Loading
Loading