Skip to content

Commit

Permalink
Rename "prepare error" to "report error".
Browse files Browse the repository at this point in the history
  • Loading branch information
branlwyd committed Nov 9, 2024
1 parent e46905d commit c780b60
Show file tree
Hide file tree
Showing 17 changed files with 125 additions and 124 deletions.
30 changes: 15 additions & 15 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use janus_messages::{
AggregationJobId, AggregationJobInitializeReq, AggregationJobResp, AggregationJobStep,
BatchSelector, Collection, CollectionJobId, CollectionReq, Duration, ExtensionType, HpkeConfig,
HpkeConfigList, InputShareAad, Interval, PartialBatchSelector, PlaintextInputShare,
PrepareError, PrepareResp, PrepareStepResult, Report, ReportIdChecksum, ReportShare, Role,
PrepareResp, PrepareStepResult, Report, ReportError, ReportIdChecksum, ReportShare, Role,
TaskId,
};
use opentelemetry::{
Expand Down Expand Up @@ -2138,7 +2138,7 @@ impl VdafOps {
metrics
.aggregate_step_failure_counter
.add(1, &[KeyValue::new("type", "unknown_hpke_config_id")]);
Err(PrepareError::HpkeUnknownConfigId)
Err(ReportError::HpkeUnknownConfigId)
} else {
Ok(())
};
Expand All @@ -2164,7 +2164,7 @@ impl VdafOps {
// HpkeDecryptError isn't strictly accurate, but given that this
// fallible encoding is part of the HPKE decryption process, I think
// this is as close as we can get to a meaningful error signal.
PrepareError::HpkeDecryptError
ReportError::HpkeDecryptError
})
});

Expand Down Expand Up @@ -2207,7 +2207,7 @@ impl VdafOps {
metrics
.aggregate_step_failure_counter
.add(1, &[KeyValue::new("type", "decrypt_failure")]);
PrepareError::HpkeDecryptError
ReportError::HpkeDecryptError
})
});

Expand All @@ -2226,7 +2226,7 @@ impl VdafOps {
"plaintext_input_share_decode_failure",
)],
);
PrepareError::InvalidMessage
ReportError::InvalidMessage
})?;

// Build map of extension type to extension data, checking for duplicates.
Expand All @@ -2244,7 +2244,7 @@ impl VdafOps {
metrics
.aggregate_step_failure_counter
.add(1, &[KeyValue::new("type", "duplicate_extension")]);
return Err(PrepareError::InvalidMessage);
return Err(ReportError::InvalidMessage);
}

if require_taskprov_extension {
Expand All @@ -2266,7 +2266,7 @@ impl VdafOps {
"missing_or_malformed_taskprov_extension",
)],
);
return Err(PrepareError::InvalidMessage);
return Err(ReportError::InvalidMessage);
}
} else if extensions.contains_key(&ExtensionType::Taskprov) {
// taskprov not enabled, but the taskprov extension is present.
Expand All @@ -2279,7 +2279,7 @@ impl VdafOps {
metrics
.aggregate_step_failure_counter
.add(1, &[KeyValue::new("type", "unexpected_taskprov_extension")]);
return Err(PrepareError::InvalidMessage);
return Err(ReportError::InvalidMessage);
}

Ok(plaintext_input_share)
Expand All @@ -2299,7 +2299,7 @@ impl VdafOps {
metrics
.aggregate_step_failure_counter
.add(1, &[KeyValue::new("type", "input_share_decode_failure")]);
PrepareError::InvalidMessage
ReportError::InvalidMessage
})
});

Expand All @@ -2316,7 +2316,7 @@ impl VdafOps {
metrics
.aggregate_step_failure_counter
.add(1, &[KeyValue::new("type", "public_share_decode_failure")]);
PrepareError::InvalidMessage
ReportError::InvalidMessage
});

let shares =
Expand All @@ -2330,7 +2330,7 @@ impl VdafOps {
.time()
.is_after(&report_deadline)
{
return Err(PrepareError::ReportTooEarly);
return Err(ReportError::ReportTooEarly);
}
Ok(shares)
});
Expand Down Expand Up @@ -2382,9 +2382,9 @@ impl VdafOps {
},
Some(output_share),
),
Err(prepare_error) => (
ReportAggregationState::Failed { prepare_error },
PrepareStepResult::Reject(prepare_error),
Err(report_error) => (
ReportAggregationState::Failed { report_error },
PrepareStepResult::Reject(report_error),
None,
),
};
Expand Down Expand Up @@ -2503,7 +2503,7 @@ impl VdafOps {
report_aggregation = Cow::Owned(
report_aggregation
.into_owned()
.with_failure(PrepareError::ReportReplayed),
.with_failure(ReportError::ReportReplayed),
)
}
Err(err) => return Err(err),
Expand Down
8 changes: 4 additions & 4 deletions aggregator/src/aggregator/aggregate_init_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use janus_core::{
use janus_messages::{
batch_mode::{self, TimeInterval},
AggregationJobId, AggregationJobInitializeReq, AggregationJobResp, Duration, Extension,
ExtensionType, HpkeConfig, PartialBatchSelector, PrepareError, PrepareInit, PrepareResp,
PrepareStepResult, ReportMetadata, ReportShare,
ExtensionType, HpkeConfig, PartialBatchSelector, PrepareInit, PrepareResp, PrepareStepResult,
ReportError, ReportMetadata, ReportShare,
};
use prio::{
codec::Encode,
Expand Down Expand Up @@ -430,7 +430,7 @@ async fn aggregation_job_init_unexpected_taskprov_extension() {

let want_aggregation_job_resp = AggregationJobResp::new(Vec::from([PrepareResp::new(
report_id,
PrepareStepResult::Reject(PrepareError::InvalidMessage),
PrepareStepResult::Reject(ReportError::InvalidMessage),
)]));
let got_aggregation_job_resp: AggregationJobResp = decode_response_body(&mut response).await;
assert_eq!(want_aggregation_job_resp, got_aggregation_job_resp);
Expand Down Expand Up @@ -608,7 +608,7 @@ async fn aggregation_job_intolerable_clock_skew() {
);
assert_matches!(
aggregation_job_init_resp.prepare_resps()[1].result(),
&PrepareStepResult::Reject(PrepareError::ReportTooEarly)
&PrepareStepResult::Reject(ReportError::ReportTooEarly)
);
}

Expand Down
12 changes: 6 additions & 6 deletions aggregator/src/aggregator/aggregation_job_continue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use janus_aggregator_core::{
};
use janus_core::time::Clock;
use janus_messages::{
AggregationJobContinueReq, AggregationJobResp, PrepareError, PrepareResp, PrepareStepResult,
AggregationJobContinueReq, AggregationJobResp, PrepareResp, PrepareStepResult, ReportError,
Role,
};
use prio::{
Expand Down Expand Up @@ -87,7 +87,7 @@ impl VdafOps {
report_agg
.clone()
.with_state(ReportAggregationState::Failed {
prepare_error: PrepareError::ReportDropped,
report_error: ReportError::ReportDropped,
})
.with_last_prep_resp(None),
None,
Expand Down Expand Up @@ -134,7 +134,7 @@ impl VdafOps {
report_aggregation
.clone()
.with_state(ReportAggregationState::Failed {
prepare_error: PrepareError::ReportDropped,
report_error: ReportError::ReportDropped,
})
.with_last_prep_resp(None),
None,
Expand Down Expand Up @@ -229,10 +229,10 @@ impl VdafOps {
&metrics.aggregate_step_failure_counter,
)
})
.unwrap_or_else(|prepare_error| {
.unwrap_or_else(|report_error| {
(
ReportAggregationState::Failed { prepare_error },
PrepareStepResult::Reject(prepare_error),
ReportAggregationState::Failed { report_error },
PrepareStepResult::Reject(report_error),
None,
)
});
Expand Down
4 changes: 2 additions & 2 deletions aggregator/src/aggregator/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ mod tests {
use janus_messages::{
batch_mode::{LeaderSelected, TimeInterval},
codec::ParameterizedDecode,
AggregationJobStep, Interval, PrepareError, Query, ReportId, ReportIdChecksum,
AggregationJobStep, Interval, Query, ReportError, ReportId, ReportIdChecksum,
ReportMetadata, Role, TaskId, Time,
};
use prio::vdaf::{
Expand Down Expand Up @@ -1652,7 +1652,7 @@ mod tests {
(
(*report.metadata().id(), ()),
ReportAggregationState::Failed {
prepare_error: PrepareError::BatchCollected,
report_error: ReportError::BatchCollected,
},
)
})
Expand Down
22 changes: 11 additions & 11 deletions aggregator/src/aggregator/aggregation_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use janus_core::{
use janus_messages::{
batch_mode::{LeaderSelected, TimeInterval},
AggregationJobContinueReq, AggregationJobInitializeReq, AggregationJobResp,
PartialBatchSelector, PrepareContinue, PrepareError, PrepareInit, PrepareStepResult,
PartialBatchSelector, PrepareContinue, PrepareInit, PrepareStepResult, ReportError,
ReportShare, Role,
};
use opentelemetry::{
Expand Down Expand Up @@ -391,7 +391,7 @@ where
.add(1, &[KeyValue::new("type", "duplicate_extension")]);
return ra_sender.send(WritableReportAggregation::new(
report_aggregation.with_state(ReportAggregationState::Failed {
prepare_error: PrepareError::InvalidMessage,
report_error: ReportError::InvalidMessage,
}),
None,
)).map_err(|_| ());
Expand All @@ -406,7 +406,7 @@ where
.add(1, &[KeyValue::new("type", "public_share_encode_failure")]);
return ra_sender.send(WritableReportAggregation::new(
report_aggregation.with_state(ReportAggregationState::Failed {
prepare_error: PrepareError::InvalidMessage,
report_error: ReportError::InvalidMessage,
}),
None,
)).map_err(|_| ());
Expand Down Expand Up @@ -449,10 +449,10 @@ where
},
)).map_err(|_| ())
}
Err(prepare_error) => {
Err(report_error) => {
ra_sender.send(WritableReportAggregation::new(
report_aggregation
.with_state(ReportAggregationState::Failed { prepare_error }),
.with_state(ReportAggregationState::Failed { report_error }),
None,
)).map_err(|_| ())
}
Expand Down Expand Up @@ -627,7 +627,7 @@ where
let (prep_state, message) = match result {
Ok((state, message)) => (state, message),
Err(error) => {
let prepare_error = handle_ping_pong_error(
let report_error = handle_ping_pong_error(
&task_id,
Role::Leader,
report_aggregation.report_id(),
Expand All @@ -637,7 +637,7 @@ where
return ra_sender
.send(WritableReportAggregation::new(
report_aggregation.with_state(
ReportAggregationState::Failed { prepare_error },
ReportAggregationState::Failed { report_error },
),
None,
))
Expand Down Expand Up @@ -841,8 +841,8 @@ where
// already finished. Commit the output share.
(ReportAggregationState::Finished, Some(output_share))
}
Err(prepare_error) => {
(ReportAggregationState::Failed { prepare_error }, None)
Err(report_error) => {
(ReportAggregationState::Failed { report_error }, None)
}
}
}
Expand All @@ -861,7 +861,7 @@ where
.add(1, &[KeyValue::new("type", "finish_mismatch")]);
(
ReportAggregationState::Failed {
prepare_error: PrepareError::VdafPrepError,
report_error: ReportError::VdafPrepError,
},
None,
)
Expand All @@ -880,7 +880,7 @@ where
.add(1, &[KeyValue::new("type", "helper_step_failure")]);
(
ReportAggregationState::Failed {
prepare_error: *err,
report_error: *err,
},
None,
)
Expand Down
4 changes: 2 additions & 2 deletions aggregator/src/aggregator/aggregation_job_driver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use janus_messages::{
problem_type::DapProblemType,
AggregationJobContinueReq, AggregationJobInitializeReq, AggregationJobResp, AggregationJobStep,
Duration, Extension, ExtensionType, Interval, PartialBatchSelector, PrepareContinue,
PrepareError, PrepareInit, PrepareResp, PrepareStepResult, ReportIdChecksum, ReportMetadata,
PrepareInit, PrepareResp, PrepareStepResult, ReportError, ReportIdChecksum, ReportMetadata,
ReportShare, Role, Time,
};
use mockito::ServerGuard;
Expand Down Expand Up @@ -588,7 +588,7 @@ async fn step_time_interval_aggregation_job_init_single_step() {
1,
None,
ReportAggregationState::Failed {
prepare_error: PrepareError::InvalidMessage,
report_error: ReportError::InvalidMessage,
},
);
let want_batch_aggregations = Vec::from([BatchAggregation::<
Expand Down
24 changes: 12 additions & 12 deletions aggregator/src/aggregator/aggregation_job_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use janus_core::{
vdaf::VdafInstance,
};
use janus_messages::{
AggregationJobId, Interval, PrepareError, PrepareResp, PrepareStepResult, ReportId,
AggregationJobId, Interval, PrepareResp, PrepareStepResult, ReportError, ReportId,
ReportIdChecksum, Time,
};
use opentelemetry::{
Expand Down Expand Up @@ -603,7 +603,7 @@ where
*report_aggregation.to_mut() = report_aggregation
.as_ref()
.clone()
.with_failure(PrepareError::BatchCollected);
.with_failure(ReportError::BatchCollected);
}
}
}
Expand Down Expand Up @@ -830,7 +830,7 @@ where
*report_aggregation.to_mut() = report_aggregation
.as_ref()
.clone()
.with_failure(PrepareError::VdafPrepError);
.with_failure(ReportError::VdafPrepError);
}
}
}
Expand Down Expand Up @@ -981,8 +981,8 @@ pub trait ReportAggregationUpdate<const SEED_SIZE: usize, A: vdaf::Aggregator<SE
fn is_failed(&self) -> bool;

/// Returns a new report aggregation corresponding to this report aggregation updated to have
/// the "Failed" state, with the given [`PrepareError`].
fn with_failure(self, prepare_error: PrepareError) -> Self;
/// the "Failed" state, with the given [`ReportError`].
fn with_failure(self, report_error: ReportError) -> Self;

/// Returns the last preparation response from this report aggregation, if any.
fn last_prep_resp(&self) -> Option<&PrepareResp>;
Expand Down Expand Up @@ -1040,10 +1040,10 @@ where
)
}

fn with_failure(self, prepare_error: PrepareError) -> Self {
fn with_failure(self, report_error: ReportError) -> Self {
let mut report_aggregation = self
.report_aggregation
.with_state(ReportAggregationState::Failed { prepare_error });
.with_state(ReportAggregationState::Failed { report_error });

// This check effectively checks if we are the Helper. (The Helper will always set
// last_prep_resp for all non-failed report aggregations, and most failed report
Expand All @@ -1052,7 +1052,7 @@ where
let report_id = *report_aggregation.report_id();
report_aggregation = report_aggregation.with_last_prep_resp(Some(PrepareResp::new(
report_id,
PrepareStepResult::Reject(prepare_error),
PrepareStepResult::Reject(report_error),
)));
}

Expand Down Expand Up @@ -1107,8 +1107,8 @@ where
matches!(self.state(), ReportAggregationMetadataState::Failed { .. })
}

fn with_failure(self, prepare_error: PrepareError) -> Self {
self.with_state(ReportAggregationMetadataState::Failed { prepare_error })
fn with_failure(self, report_error: ReportError) -> Self {
self.with_state(ReportAggregationMetadataState::Failed { report_error })
}

/// Returns the last preparation response from this report aggregation, if any.
Expand Down Expand Up @@ -1158,9 +1158,9 @@ where
self.as_ref().is_failed()
}

fn with_failure(self, prepare_error: PrepareError) -> Self {
fn with_failure(self, report_error: ReportError) -> Self {
// Since `with_failure` consumes the caller, we must own the CoW.
Self::Owned(self.into_owned().with_failure(prepare_error))
Self::Owned(self.into_owned().with_failure(report_error))
}

/// Returns the last preparation response from this report aggregation, if any.
Expand Down
Loading

0 comments on commit c780b60

Please sign in to comment.