Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable LIMIT in unaggregated reports query #2690

Merged
merged 1 commit into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions aggregator/src/aggregator/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ pub struct AggregationJobCreator<C: Clock> {
min_aggregation_job_size: usize,
/// The maximum number of client reports to include in an aggregation job.
max_aggregation_job_size: usize,
/// Maximum number of reports to load at a time when creating aggregation jobs.
aggregation_job_creation_report_window: usize,
}

impl<C: Clock + 'static> AggregationJobCreator<C> {
Expand All @@ -82,6 +84,7 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
aggregation_job_creation_interval: Duration,
min_aggregation_job_size: usize,
max_aggregation_job_size: usize,
aggregation_job_creation_report_window: usize,
) -> AggregationJobCreator<C> {
assert!(
max_aggregation_job_size > 0,
Expand All @@ -94,6 +97,7 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
aggregation_job_creation_interval,
min_aggregation_job_size,
max_aggregation_job_size,
aggregation_job_creation_report_window,
}
}

Expand Down Expand Up @@ -545,11 +549,17 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
let this = Arc::clone(&self);
let task = Arc::clone(&task);
let vdaf = Arc::clone(&vdaf);
let aggregation_job_creation_report_window =
self.aggregation_job_creation_report_window;

Box::pin(async move {
// Find some unaggregated client reports.
let mut reports = tx
.get_unaggregated_client_reports_for_task(vdaf.as_ref(), task.id())
.get_unaggregated_client_reports_for_task(
vdaf.as_ref(),
task.id(),
aggregation_job_creation_report_window,
)
.await?;
reports.sort_by_key(|report| *report.metadata().time());

Expand Down Expand Up @@ -706,11 +716,17 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
let this = Arc::clone(&self);
let task = Arc::clone(&task);
let vdaf = Arc::clone(&vdaf);
let aggregation_job_creation_report_window =
self.aggregation_job_creation_report_window;

Box::pin(async move {
// Find unaggregated client reports.
let unaggregated_reports = tx
.get_unaggregated_client_reports_for_task(vdaf.as_ref(), task.id())
.get_unaggregated_client_reports_for_task(
vdaf.as_ref(),
task.id(),
aggregation_job_creation_report_window,
)
.await?;

let mut aggregation_job_writer =
Expand Down Expand Up @@ -868,6 +884,7 @@ mod tests {
AGGREGATION_JOB_CREATION_INTERVAL,
1,
100,
5000,
));
let stopper = Stopper::new();
let task_handle = task::spawn(Arc::clone(&job_creator).run(stopper.clone()));
Expand Down Expand Up @@ -1043,6 +1060,7 @@ mod tests {
Duration::from_secs(1),
MIN_AGGREGATION_JOB_SIZE,
MAX_AGGREGATION_JOB_SIZE,
5000,
));
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&task))
Expand Down Expand Up @@ -1217,6 +1235,7 @@ mod tests {
Duration::from_secs(1),
2,
100,
5000,
));
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&task))
Expand Down Expand Up @@ -1429,6 +1448,7 @@ mod tests {
Duration::from_secs(1),
MIN_AGGREGATION_JOB_SIZE,
MAX_AGGREGATION_JOB_SIZE,
5000,
));
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&task))
Expand Down Expand Up @@ -1589,6 +1609,7 @@ mod tests {
Duration::from_secs(1),
MIN_AGGREGATION_JOB_SIZE,
MAX_AGGREGATION_JOB_SIZE,
5000,
));
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&task))
Expand Down Expand Up @@ -1791,6 +1812,7 @@ mod tests {
Duration::from_secs(1),
MIN_AGGREGATION_JOB_SIZE,
MAX_AGGREGATION_JOB_SIZE,
5000,
));
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&task))
Expand Down Expand Up @@ -1852,7 +1874,7 @@ mod tests {

Box::pin(async move {
let report_ids = tx
.get_unaggregated_client_reports_for_task(vdaf.as_ref(), task.id())
.get_unaggregated_client_reports_for_task(vdaf.as_ref(), task.id(), 5000)
.await
.unwrap()
.into_iter()
Expand Down Expand Up @@ -1958,6 +1980,7 @@ mod tests {
Duration::from_secs(1),
MIN_AGGREGATION_JOB_SIZE,
MAX_AGGREGATION_JOB_SIZE,
5000,
));
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&task))
Expand Down Expand Up @@ -2220,6 +2243,7 @@ mod tests {
Duration::from_secs(1),
MIN_AGGREGATION_JOB_SIZE,
MAX_AGGREGATION_JOB_SIZE,
5000,
));
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&task))
Expand Down Expand Up @@ -2511,6 +2535,7 @@ mod tests {
Duration::from_secs(1),
MIN_AGGREGATION_JOB_SIZE,
MAX_AGGREGATION_JOB_SIZE,
5000,
));
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&task))
Expand Down Expand Up @@ -2768,6 +2793,7 @@ mod tests {
Duration::from_secs(1),
MIN_AGGREGATION_JOB_SIZE,
MAX_AGGREGATION_JOB_SIZE,
5000,
));
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&task))
Expand Down
9 changes: 9 additions & 0 deletions aggregator/src/binaries/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Re
Duration::from_secs(ctx.config.aggregation_job_creation_interval_secs),
ctx.config.min_aggregation_job_size,
ctx.config.max_aggregation_job_size,
ctx.config.aggregation_job_creation_report_window,
));
aggregation_job_creator.run(ctx.stopper).await;

Expand Down Expand Up @@ -78,6 +79,13 @@ pub struct Config {
pub min_aggregation_job_size: usize,
/// The maximum number of client reports to include in an aggregation job.
pub max_aggregation_job_size: usize,
/// Maximum number of reports to load at a time when creating aggregation jobs.
#[serde(default = "default_aggregation_job_creation_report_window")]
pub aggregation_job_creation_report_window: usize,
}

/// Serde fallback for `Config::aggregation_job_creation_report_window`.
///
/// Supplies the value used when the configuration omits the field, i.e. the
/// maximum number of reports to load at a time when creating aggregation jobs.
fn default_aggregation_job_creation_report_window() -> usize {
    // Named locally so the fallback is not a bare magic number.
    const DEFAULT_REPORT_WINDOW: usize = 5000;
    DEFAULT_REPORT_WINDOW
}

impl BinaryConfig for Config {
Expand Down Expand Up @@ -119,6 +127,7 @@ mod tests {
aggregation_job_creation_interval_secs: 60,
min_aggregation_job_size: 100,
max_aggregation_job_size: 500,
aggregation_job_creation_report_window: 5000,
})
}

Expand Down
1 change: 1 addition & 0 deletions aggregator/tests/integration/graceful_shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ async fn aggregation_job_creator_shutdown() {
aggregation_job_creation_interval_secs: 60,
min_aggregation_job_size: 100,
max_aggregation_job_size: 100,
aggregation_job_creation_report_window: 5000,
};

graceful_shutdown(trycmd::cargo::cargo_bin!("aggregation_job_creator"), config).await;
Expand Down
5 changes: 3 additions & 2 deletions aggregator_core/src/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1188,12 +1188,12 @@ impl<C: Clock> Transaction<'_, C> {
&self,
vdaf: &A,
task_id: &TaskId,
limit: usize,
) -> Result<Vec<LeaderStoredReport<SEED_SIZE, A>>, Error>
where
A::InputShare: PartialEq,
A::PublicShare: PartialEq,
{
// TODO(#269): allow the number of returned results to be controlled?
let stmt = self
.prepare_cached(
"WITH unaggregated_reports AS (
Expand All @@ -1204,7 +1204,7 @@ impl<C: Clock> Transaction<'_, C> {
AND client_reports.client_timestamp >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)
ORDER BY client_timestamp DESC
FOR UPDATE OF client_reports SKIP LOCKED
LIMIT 5000
LIMIT $5::BIGINT
)
UPDATE client_reports SET
aggregation_started = TRUE, updated_at = $3, updated_by = $4
Expand All @@ -1221,6 +1221,7 @@ impl<C: Clock> Transaction<'_, C> {
/* now */ &self.clock.now().as_naive_date_time()?,
/* updated_at */ &self.clock.now().as_naive_date_time()?,
/* updated_by */ &self.name,
/* limit */ &i64::try_from(limit)?,
],
)
.await?;
Expand Down
18 changes: 15 additions & 3 deletions aggregator_core/src/datastore/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,11 @@ async fn get_unaggregated_client_reports_for_task(ephemeral_datastore: Ephemeral
.unwrap());

Ok(tx
.get_unaggregated_client_reports_for_task(&dummy_vdaf::Vdaf::new(), task.id())
.get_unaggregated_client_reports_for_task(
&dummy_vdaf::Vdaf::new(),
task.id(),
5000,
)
.await
.unwrap())
})
Expand Down Expand Up @@ -836,7 +840,11 @@ async fn get_unaggregated_client_reports_for_task(ephemeral_datastore: Ephemeral
.unwrap());

Ok(tx
.get_unaggregated_client_reports_for_task(&dummy_vdaf::Vdaf::new(), task.id())
.get_unaggregated_client_reports_for_task(
&dummy_vdaf::Vdaf::new(),
task.id(),
5000,
)
.await
.unwrap())
})
Expand Down Expand Up @@ -869,7 +877,11 @@ async fn get_unaggregated_client_reports_for_task(ephemeral_datastore: Ephemeral
);

Ok(tx
.get_unaggregated_client_reports_for_task(&dummy_vdaf::Vdaf::new(), task.id())
.get_unaggregated_client_reports_for_task(
&dummy_vdaf::Vdaf::new(),
task.id(),
5000,
)
.await
.unwrap())
})
Expand Down
4 changes: 4 additions & 0 deletions docs/samples/advanced_config/aggregation_job_creator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,7 @@ min_aggregation_job_size: 10

# Maximum aggregation job size, in reports. (required)
max_aggregation_job_size: 100

# Maximum number of reports to load at a time when creating aggregation jobs.
# (optional, defaults to 5000)
aggregation_job_creation_report_window: 5000
1 change: 1 addition & 0 deletions integration_tests/src/janus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ impl JanusInProcess {
aggregation_job_creation_interval_secs: 1,
min_aggregation_job_size: 1,
max_aggregation_job_size: 100,
aggregation_job_creation_report_window: 5000,
};
let aggregation_job_driver_options = AggregationJobDriverOptions {
common: common_binary_options.clone(),
Expand Down
Loading