Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove multi-collection. #3481

Merged
merged 1 commit into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,6 @@ impl<C: Clock> Aggregator<C> {
task_config.query_config().query().try_into()?,
vdaf_instance,
vdaf_verify_key,
task_config.query_config().max_batch_query_count() as u64,
Some(*task_config.task_expiration()),
peer_aggregator.report_expiry_age().cloned(),
task_config.query_config().min_batch_size() as u64,
Expand Down
57 changes: 21 additions & 36 deletions aggregator/src/aggregator/batch_mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use janus_messages::{
Role,
};
use prio::vdaf;
use std::{collections::HashSet, hash::Hash};
use std::hash::Hash;

#[async_trait]
pub trait UploadableBatchMode: BatchMode {
Expand Down Expand Up @@ -124,13 +124,9 @@ impl CollectableBatchMode for TimeInterval {
where
A::AggregationParam: Send + Sync + Eq + Hash,
{
// Check how distinct aggregation parameters appear in rows in the relevant table with an
// intersecting batch interval. Each distinct aggregation parameter consumes one unit of
// query count.
//
// https://www.ietf.org/archive/id/draft-ietf-ppm-dap-05.html#section-4.6.6
// Compute the aggregation parameters that have already been collected for.
let mut found_overlapping_nonequal_interval = false;
let agg_params = match task.role() {
let agg_params: Vec<_> = match task.role() {
Role::Leader => tx
.get_collection_jobs_intersecting_interval::<SEED_SIZE, A>(
vdaf,
Expand All @@ -145,7 +141,7 @@ impl CollectableBatchMode for TimeInterval {
}
job.take_aggregation_parameter()
})
.collect::<HashSet<_>>(),
.collect(),

Role::Helper => tx
.get_aggregate_share_jobs_intersecting_interval::<SEED_SIZE, A>(
Expand All @@ -161,7 +157,7 @@ impl CollectableBatchMode for TimeInterval {
};
job.take_aggregation_parameter()
})
.collect::<HashSet<_>>(),
.collect(),

_ => panic!("Unexpected task role {:?}", task.role()),
};
Expand All @@ -173,17 +169,13 @@ impl CollectableBatchMode for TimeInterval {
));
}

// Check that the batch query count is being consumed appropriately.
let max_batch_query_count: usize = task.max_batch_query_count().try_into()?;
let query_count = agg_params.len()
+ if agg_params.contains(aggregation_param) {
0
} else {
1
};
if query_count > max_batch_query_count {
// Check that the batch has not already been queried with a distinct aggregation parameter.
if agg_params
.iter()
.any(|agg_param| agg_param != aggregation_param)
{
return Err(datastore::Error::User(
Error::BatchQueriedTooManyTimes(*task.id(), query_count as u64).into(),
Error::BatchQueriedMultipleTimes(*task.id()).into(),
));
}
Ok(())
Expand All @@ -206,39 +198,32 @@ impl CollectableBatchMode for LeaderSelected {
where
A::AggregationParam: Send + Sync + Eq + Hash,
{
// Check how distinct aggregation parameters appear in rows in the relevant table with an
// intersecting batch interval. Each distinct aggregation parameter consumes one unit of
// query count.
//
// https://www.ietf.org/archive/id/draft-ietf-ppm-dap-05.html#section-4.6.6
let agg_params = match task.role() {
// Compute the aggregation parameters that have already been collected for.
let agg_params: Vec<_> = match task.role() {
Role::Leader => tx
.get_collection_jobs_by_batch_id::<SEED_SIZE, A>(vdaf, task.id(), batch_id)
.await?
.into_iter()
.map(|job| job.take_aggregation_parameter())
.collect::<HashSet<_>>(),
.collect(),

Role::Helper => tx
.get_aggregate_share_jobs_by_batch_id::<SEED_SIZE, A>(vdaf, task.id(), batch_id)
.await?
.into_iter()
.map(|job| job.take_aggregation_parameter())
.collect::<HashSet<_>>(),
.collect(),

_ => panic!("Unexpected task role {:?}", task.role()),
};

let max_batch_query_count: usize = task.max_batch_query_count().try_into()?;
let query_count = agg_params.len()
+ if agg_params.contains(aggregation_param) {
0
} else {
1
};
if query_count > max_batch_query_count {
// Check that the batch has not already been queried with a distinct aggregation parameter.
if agg_params
.iter()
.any(|agg_param| agg_param != aggregation_param)
{
return Err(datastore::Error::User(
Error::BatchQueriedTooManyTimes(*task.id(), query_count as u64).into(),
Error::BatchQueriedMultipleTimes(*task.id()).into(),
));
}
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions aggregator/src/aggregator/collection_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1246,7 +1246,7 @@ mod tests {
.match_body(leader_request.get_encoded().unwrap())
.with_status(500)
.with_header("Content-Type", "application/problem+json")
.with_body("{\"type\": \"urn:ietf:params:ppm:dap:error:batchQueriedTooManyTimes\"}")
.with_body("{\"type\": \"urn:ietf:params:ppm:dap:error:batchQueriedMultipleTimes\"}")
.create_async()
.await;

Expand All @@ -1257,7 +1257,7 @@ mod tests {
assert_matches!(
error,
Error::Http(error_response) => {
assert_matches!(error_response.dap_problem_type(), Some(DapProblemType::BatchQueriedTooManyTimes));
assert_matches!(error_response.dap_problem_type(), Some(DapProblemType::BatchQueriedMultipleTimes));
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
}
);
Expand Down
Loading
Loading