Rename several concepts for DAP-13. #3470

Merged: 3 commits, Nov 13, 2024

Changes from all commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
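For orientation, here is a minimal before/after sketch of the renamed call sites. It is assembled only from identifiers that appear in the changes below; the `main` wrapper and the `rand::random` import are assumptions for illustration, not part of this PR:

```rust
// Illustrative sketch only: the janus identifiers below are taken from the updated
// tests in this PR; the `main` wrapper and `rand::random` import are assumptions.
use janus_aggregator_core::task::{test_util::TaskBuilder, BatchMode};
use janus_core::vdaf::VdafInstance;
use janus_messages::{PartialBatchSelector, PrepareStepResult, ReportError};
use rand::random;

fn main() {
    // Old: TaskBuilder::new(QueryType::TimeInterval, ...)
    // "Query type" is now "batch mode".
    let _task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Prio3Count).build();

    // Old: PartialBatchSelector::new_fixed_size(random())
    // The fixed-size query type is now the leader-selected batch mode.
    let _selector = PartialBatchSelector::new_leader_selected(random());

    // Old: PrepareStepResult::Reject(PrepareError::InvalidMessage)
    // "Prepare error" is now "report error".
    let _rejection = PrepareStepResult::Reject(ReportError::InvalidMessage);
}
```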
192 changes: 96 additions & 96 deletions aggregator/src/aggregator.rs

Large diffs are not rendered by default.

26 changes: 13 additions & 13 deletions aggregator/src/aggregator/aggregate_init_tests.rs
@@ -12,7 +12,7 @@ use janus_aggregator_core::{
datastore::test_util::{ephemeral_datastore, EphemeralDatastore},
task::{
test_util::{Task, TaskBuilder},
AggregatorTask, QueryType,
AggregatorTask, BatchMode,
},
test_util::noop_meter,
};
@@ -23,10 +23,10 @@ use janus_core::{
vdaf::VdafInstance,
};
use janus_messages::{
query_type::{self, TimeInterval},
batch_mode::{self, TimeInterval},
AggregationJobId, AggregationJobInitializeReq, AggregationJobResp, Duration, Extension,
ExtensionType, HpkeConfig, PartialBatchSelector, PrepareError, PrepareInit, PrepareResp,
PrepareStepResult, ReportMetadata, ReportShare,
ExtensionType, HpkeConfig, PartialBatchSelector, PrepareInit, PrepareResp, PrepareStepResult,
ReportError, ReportMetadata, ReportShare,
};
use prio::{
codec::Encode,
@@ -246,7 +246,7 @@ async fn setup_aggregate_init_test_without_sending_request<
) -> AggregationJobInitTestCase<VERIFY_KEY_SIZE, V> {
install_test_trace_subscriber();

let task = TaskBuilder::new(QueryType::TimeInterval, vdaf_instance)
let task = TaskBuilder::new(BatchMode::TimeInterval, vdaf_instance)
.with_aggregator_auth_token(auth_token)
.build();
let helper_task = task.helper_view().unwrap();
@@ -302,10 +302,10 @@ async fn setup_aggregate_init_test_without_sending_request<
}
}

pub(crate) async fn put_aggregation_job<Q: query_type::QueryType>(
pub(crate) async fn put_aggregation_job<B: batch_mode::BatchMode>(
task: &Task,
aggregation_job_id: &AggregationJobId,
aggregation_job: &AggregationJobInitializeReq<Q>,
aggregation_job: &AggregationJobInitializeReq<B>,
handler: &impl Handler,
) -> TestConn {
let (header, value) = task.aggregator_auth_token().request_authentication();
@@ -314,7 +314,7 @@ pub(crate) async fn put_aggregation_job<Q: query_type::QueryType>(
.with_request_header(header, value)
.with_request_header(
KnownHeaderName::ContentType,
AggregationJobInitializeReq::<Q>::MEDIA_TYPE,
AggregationJobInitializeReq::<B>::MEDIA_TYPE,
)
.with_request_body(aggregation_job.get_encoded().unwrap())
.run_async(handler)
@@ -430,7 +430,7 @@ async fn aggregation_job_init_unexpected_taskprov_extension() {

let want_aggregation_job_resp = AggregationJobResp::new(Vec::from([PrepareResp::new(
report_id,
PrepareStepResult::Reject(PrepareError::InvalidMessage),
PrepareStepResult::Reject(ReportError::InvalidMessage),
)]));
let got_aggregation_job_resp: AggregationJobResp = decode_response_body(&mut response).await;
assert_eq!(want_aggregation_job_resp, got_aggregation_job_resp);
@@ -608,7 +608,7 @@ async fn aggregation_job_intolerable_clock_skew() {
);
assert_matches!(
aggregation_job_init_resp.prepare_resps()[1].result(),
&PrepareStepResult::Reject(PrepareError::ReportTooEarly)
&PrepareStepResult::Reject(ReportError::ReportTooEarly)
);
}

@@ -637,11 +637,11 @@ async fn aggregation_job_init_two_step_vdaf_idempotence() {
async fn aggregation_job_init_wrong_query() {
let test_case = setup_aggregate_init_test().await;

// setup_aggregate_init_test sets up a task with a time interval query. We send a fixed size
// query which should yield an error.
// setup_aggregate_init_test sets up a task with a time interval query. We send a
// leader-selected query which should yield an error.
let wrong_query = AggregationJobInitializeReq::new(
test_case.aggregation_param.get_encoded().unwrap(),
PartialBatchSelector::new_fixed_size(random()),
PartialBatchSelector::new_leader_selected(random()),
test_case.aggregation_job_init_req.prepare_inits().to_vec(),
);

6 changes: 3 additions & 3 deletions aggregator/src/aggregator/aggregate_share.rs
@@ -9,7 +9,7 @@ use janus_aggregator_core::{
task::AggregatorTask,
};
use janus_core::{report_id::ReportIdChecksumExt, time::IntervalExt as _};
use janus_messages::{query_type::QueryType, Interval, ReportIdChecksum};
use janus_messages::{batch_mode::BatchMode, Interval, ReportIdChecksum};
use prio::vdaf::{self, Aggregatable};

/// Computes the aggregate share over the provided batch aggregations.
@@ -20,11 +20,11 @@ use prio::vdaf::{self, Aggregatable};
#[tracing::instrument(skip(task, batch_aggregations), fields(task_id = ?task.id()), err)]
pub(crate) async fn compute_aggregate_share<
const SEED_SIZE: usize,
Q: QueryType,
B: BatchMode,
A: vdaf::Aggregator<SEED_SIZE, 16>,
>(
task: &AggregatorTask,
batch_aggregations: &[BatchAggregation<SEED_SIZE, Q, A>],
batch_aggregations: &[BatchAggregation<SEED_SIZE, B, A>],
) -> Result<(A::AggregateShare, u64, Interval, ReportIdChecksum), Error> {
// At the moment we construct an aggregate share (either handling AggregateShareReq in the
// helper or driving a collection job in the leader), there could be some incomplete aggregation
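The rename also reaches code that is generic over the batch mode, as in `compute_aggregate_share` above: bounds written as `Q: QueryType` become `B: BatchMode` (and `AccumulableQueryType` becomes `AccumulableBatchMode`, as the next file shows). Note that the task-level `BatchMode` passed to `TaskBuilder` is distinct from the `janus_messages::batch_mode::BatchMode` trait used in these bounds. A sketch of a downstream generic function under the new bound; the helper itself is hypothetical, not part of this PR:

```rust
use janus_messages::batch_mode::{BatchMode, TimeInterval};

// Hypothetical helper, shown only to illustrate bounding on the renamed trait.
fn batch_mode_name<B: BatchMode>() -> &'static str {
    std::any::type_name::<B>()
}

fn main() {
    // Prints the fully qualified type name of the time-interval batch mode.
    println!("{}", batch_mode_name::<TimeInterval>());
}
```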
30 changes: 15 additions & 15 deletions aggregator/src/aggregator/aggregation_job_continue.rs
@@ -6,19 +6,19 @@ use crate::aggregator::{
AggregatorMetrics, Error, VdafOps,
};
use janus_aggregator_core::{
batch_mode::AccumulableBatchMode,
datastore::{
self,
models::{
AggregationJob, ReportAggregation, ReportAggregationState, TaskAggregationCounter,
},
Transaction,
},
query_type::AccumulableQueryType,
task::AggregatorTask,
};
use janus_core::time::Clock;
use janus_messages::{
AggregationJobContinueReq, AggregationJobResp, PrepareError, PrepareResp, PrepareStepResult,
AggregationJobContinueReq, AggregationJobResp, PrepareResp, PrepareStepResult, ReportError,
Role,
};
use prio::{
@@ -34,20 +34,20 @@ use tracing::{info_span, trace_span, Span};
impl VdafOps {
/// Step the helper's aggregation job to the next step using the step `n` ping pong state in
/// `report_aggregations` with the step `n+1` ping pong messages in `leader_aggregation_job`.
pub(super) async fn step_aggregation_job<const SEED_SIZE: usize, C, Q, A>(
pub(super) async fn step_aggregation_job<const SEED_SIZE: usize, C, B, A>(
tx: &Transaction<'_, C>,
task: Arc<AggregatorTask>,
vdaf: Arc<A>,
batch_aggregation_shard_count: u64,
aggregation_job: AggregationJob<SEED_SIZE, Q, A>,
aggregation_job: AggregationJob<SEED_SIZE, B, A>,
report_aggregations: Vec<ReportAggregation<SEED_SIZE, A>>,
req: Arc<AggregationJobContinueReq>,
request_hash: [u8; 32],
metrics: &AggregatorMetrics,
) -> Result<(AggregationJobResp, TaskAggregationCounter), datastore::Error>
where
C: Clock,
Q: AccumulableQueryType,
B: AccumulableBatchMode,
A: vdaf::Aggregator<SEED_SIZE, 16> + 'static + Send + Sync,
A::AggregationParam: Send + Sync + PartialEq + Eq,
A::InputShare: Send + Sync,
@@ -87,7 +87,7 @@ impl VdafOps {
report_agg
.clone()
.with_state(ReportAggregationState::Failed {
prepare_error: PrepareError::ReportDropped,
report_error: ReportError::ReportDropped,
})
.with_last_prep_resp(None),
None,
@@ -134,7 +134,7 @@ impl VdafOps {
report_aggregation
.clone()
.with_state(ReportAggregationState::Failed {
prepare_error: PrepareError::ReportDropped,
report_error: ReportError::ReportDropped,
})
.with_last_prep_resp(None),
None,
@@ -229,10 +229,10 @@ impl VdafOps {
&metrics.aggregate_step_failure_counter,
)
})
.unwrap_or_else(|prepare_error| {
.unwrap_or_else(|report_error| {
(
ReportAggregationState::Failed { prepare_error },
PrepareStepResult::Reject(prepare_error),
ReportAggregationState::Failed { report_error },
PrepareStepResult::Reject(report_error),
None,
)
});
@@ -415,7 +415,7 @@ mod tests {
},
task::{
test_util::{Task, TaskBuilder},
QueryType,
BatchMode,
},
test_util::noop_meter,
};
@@ -425,7 +425,7 @@ mod tests {
vdaf::{VdafInstance, VERIFY_KEY_LENGTH},
};
use janus_messages::{
query_type::TimeInterval, AggregationJobContinueReq, AggregationJobId,
batch_mode::TimeInterval, AggregationJobContinueReq, AggregationJobId,
AggregationJobInitializeReq, AggregationJobResp, AggregationJobStep, Interval,
PartialBatchSelector, PrepareContinue, PrepareResp, PrepareStepResult, Role,
};
@@ -468,7 +468,7 @@ mod tests {

let aggregation_job_id = random();
let task =
TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Poplar1 { bits: 1 }).build();
TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Poplar1 { bits: 1 }).build();
let helper_task = task.helper_view().unwrap();
let clock = MockClock::default();
let ephemeral_datastore = ephemeral_datastore().await;
@@ -620,7 +620,7 @@ mod tests {
..
} = HttpHandlerTest::new().await;

let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Prio3Count).build();
let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Prio3Count).build();
datastore
.put_aggregator_task(&task.leader_view().unwrap())
.await
@@ -651,7 +651,7 @@ mod tests {
..
} = HttpHandlerTest::new().await;

let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Prio3Count).build();
let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Prio3Count).build();
datastore
.put_aggregator_task(&task.leader_view().unwrap())
.await