Skip to content

Commit

Permalink
Remove multi-collection.
Browse files Browse the repository at this point in the history
DAP-11 removed the ability to collect a batch more than once (with
differing aggregation parameters). This commit also includes related
changes to a few messages, and removes the max_batch_query_count task
parameter.
  • Loading branch information
branlwyd committed Nov 13, 2024
1 parent cd883aa commit 9591330
Show file tree
Hide file tree
Showing 41 changed files with 417 additions and 1,216 deletions.
1 change: 0 additions & 1 deletion aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,6 @@ impl<C: Clock> Aggregator<C> {
task_config.query_config().query().try_into()?,
vdaf_instance,
vdaf_verify_key,
task_config.query_config().max_batch_query_count() as u64,
Some(*task_config.task_expiration()),
peer_aggregator.report_expiry_age().cloned(),
task_config.query_config().min_batch_size() as u64,
Expand Down
57 changes: 21 additions & 36 deletions aggregator/src/aggregator/batch_mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use janus_messages::{
Role,
};
use prio::vdaf;
use std::{collections::HashSet, hash::Hash};
use std::hash::Hash;

#[async_trait]
pub trait UploadableBatchMode: BatchMode {
Expand Down Expand Up @@ -124,13 +124,9 @@ impl CollectableBatchMode for TimeInterval {
where
A::AggregationParam: Send + Sync + Eq + Hash,
{
// Check how distinct aggregation parameters appear in rows in the relevant table with an
// intersecting batch interval. Each distinct aggregation parameter consumes one unit of
// query count.
//
// https://www.ietf.org/archive/id/draft-ietf-ppm-dap-05.html#section-4.6.6
// Compute the aggregation parameters that have already been collected for.
let mut found_overlapping_nonequal_interval = false;
let agg_params = match task.role() {
let agg_params: Vec<_> = match task.role() {
Role::Leader => tx
.get_collection_jobs_intersecting_interval::<SEED_SIZE, A>(
vdaf,
Expand All @@ -145,7 +141,7 @@ impl CollectableBatchMode for TimeInterval {
}
job.take_aggregation_parameter()
})
.collect::<HashSet<_>>(),
.collect(),

Role::Helper => tx
.get_aggregate_share_jobs_intersecting_interval::<SEED_SIZE, A>(
Expand All @@ -161,7 +157,7 @@ impl CollectableBatchMode for TimeInterval {
};
job.take_aggregation_parameter()
})
.collect::<HashSet<_>>(),
.collect(),

_ => panic!("Unexpected task role {:?}", task.role()),
};
Expand All @@ -173,17 +169,13 @@ impl CollectableBatchMode for TimeInterval {
));
}

// Check that the batch query count is being consumed appropriately.
let max_batch_query_count: usize = task.max_batch_query_count().try_into()?;
let query_count = agg_params.len()
+ if agg_params.contains(aggregation_param) {
0
} else {
1
};
if query_count > max_batch_query_count {
// Check that the batch has not already been queried with a distinct aggregation parameter.
if agg_params
.iter()
.any(|agg_param| agg_param != aggregation_param)
{
return Err(datastore::Error::User(
Error::BatchQueriedTooManyTimes(*task.id(), query_count as u64).into(),
Error::BatchQueriedMultipleTimes(*task.id()).into(),
));
}
Ok(())
Expand All @@ -206,39 +198,32 @@ impl CollectableBatchMode for LeaderSelected {
where
A::AggregationParam: Send + Sync + Eq + Hash,
{
// Check how distinct aggregation parameters appear in rows in the relevant table with an
// intersecting batch interval. Each distinct aggregation parameter consumes one unit of
// query count.
//
// https://www.ietf.org/archive/id/draft-ietf-ppm-dap-05.html#section-4.6.6
let agg_params = match task.role() {
// Compute the aggregation parameters that have already been collected for.
let agg_params: Vec<_> = match task.role() {
Role::Leader => tx
.get_collection_jobs_by_batch_id::<SEED_SIZE, A>(vdaf, task.id(), batch_id)
.await?
.into_iter()
.map(|job| job.take_aggregation_parameter())
.collect::<HashSet<_>>(),
.collect(),

Role::Helper => tx
.get_aggregate_share_jobs_by_batch_id::<SEED_SIZE, A>(vdaf, task.id(), batch_id)
.await?
.into_iter()
.map(|job| job.take_aggregation_parameter())
.collect::<HashSet<_>>(),
.collect(),

_ => panic!("Unexpected task role {:?}", task.role()),
};

let max_batch_query_count: usize = task.max_batch_query_count().try_into()?;
let query_count = agg_params.len()
+ if agg_params.contains(aggregation_param) {
0
} else {
1
};
if query_count > max_batch_query_count {
// Check that the batch has not already been queried with a distinct aggregation parameter.
if agg_params
.iter()
.any(|agg_param| agg_param != aggregation_param)
{
return Err(datastore::Error::User(
Error::BatchQueriedTooManyTimes(*task.id(), query_count as u64).into(),
Error::BatchQueriedMultipleTimes(*task.id()).into(),
));
}
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions aggregator/src/aggregator/collection_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1246,7 +1246,7 @@ mod tests {
.match_body(leader_request.get_encoded().unwrap())
.with_status(500)
.with_header("Content-Type", "application/problem+json")
.with_body("{\"type\": \"urn:ietf:params:ppm:dap:error:batchQueriedTooManyTimes\"}")
.with_body("{\"type\": \"urn:ietf:params:ppm:dap:error:batchQueriedMultipleTimes\"}")
.create_async()
.await;

Expand All @@ -1257,7 +1257,7 @@ mod tests {
assert_matches!(
error,
Error::Http(error_response) => {
assert_matches!(error_response.dap_problem_type(), Some(DapProblemType::BatchQueriedTooManyTimes));
assert_matches!(error_response.dap_problem_type(), Some(DapProblemType::BatchQueriedMultipleTimes));
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
}
);
Expand Down
Loading

0 comments on commit 9591330

Please sign in to comment.