Specify "streaming" aggregation as the norm.
Previously, aggregation was specified as if all output shares were
aggregated into an aggregate share in one fell swoop, at time of
collection. However, all implementations instead used an incremental
aggregation process to keep a "running total" aggregate share which was
updated as aggregation jobs completed, for performance reasons. This
commit switches the specification to match the implementations.

Phrasing things this way also makes the name "aggregation job" make
sense; as previously specified, there was no aggregation occurring
inside of an aggregation job!
branlwyd committed Oct 25, 2024
1 parent bdd3ad6 commit 309ce85
Showing 1 changed file with 134 additions and 62 deletions.
196 changes: 134 additions & 62 deletions draft-ietf-ppm-dap.md
@@ -413,6 +413,11 @@ Batch:
: A set of reports (i.e., measurements) that are aggregated into an aggregate
result. As defined in {{!VDAF}}.

Batch bucket:
: State associated with a given batch allowing the Aggregators to perform
incremental aggregation. Depending on the batch mode, there may be many batch
buckets tracking the state of a single batch.

Batch duration:
: The time difference between the oldest and newest report in a batch.

@@ -663,7 +668,10 @@ time.

The Leader distributes the shares to the Helper and orchestrates the process of
verifying them (see {{validating-inputs}}) and assembling them into a final
aggregate result for the Collector.
aggregate result for the Collector. Depending on the VDAF, it may be possible to
incrementally process each report as it is uploaded, or it may be necessary to
wait until the Collector transmits a collection request before processing can
begin.

## Validating Measurements {#validating-inputs}

@@ -1406,10 +1414,13 @@ across vast numbers of reports and preparation of large histograms, may be
better suited for the asynchronous model. For use cases where datastore
performance is a concern, the synchronous model may be better suited.

In general, reports cannot be aggregated until the Collector specifies an
aggregation parameter. However, in some situations it is possible to begin
aggregation as soon as reports arrive. For example, Prio3 has just one valid
aggregation parameter (the empty string).
In general, aggregation cannot begin until the Collector specifies a query and
an aggregation parameter. However, depending on the VDAF and batch mode in use,
it is often possible to begin aggregation as soon as reports arrive. For
example, Prio3 has just one valid aggregation parameter (the empty string), and
thus allows for eager aggregation; and both the time-interval and
leader-selected batch modes defined in this document allow for eager
aggregation.

An aggregation job can be thought of as having three phases, which are
described in the remaining subsections:
@@ -1419,15 +1430,13 @@ described in the remaining subsections:
- Continuation: Continue the aggregation flow by exchanging prep shares and
messages until preparation completes or an error occurs.
- Completion: Finish the aggregation flow, yielding an output share
corresponding to each report share in the aggregation job.
corresponding to each report share in the aggregation job, and update the
appropriate aggregate shares with these output shares.

After an aggregation job is completed, each Aggregator stores the output shares
until the aggregate share is collected as described in {{collect-flow}}. Note
that it is usually not necessary to store output shares individually: depending
on the batch mode and VDAF, the output shares can be merged into existing
aggregate shares that are updated as aggregation jobs complete. This streaming
aggregation is compatible with Prio3 and all batch modes specified in this
document.
After an aggregation job is completed, each Aggregator updates running-total
aggregate shares and other values for each "batch bucket" associated with a
recovered output share, as described in {{batch-buckets}}. These values are
stored until the batch is collected as described in {{collect-flow}}.

Apart from VDAF preparation, another important task of the aggregation
interaction is to provide replay protection ({{replay-protection}}). Along with
@@ -1626,8 +1635,8 @@ Otherwise, the Leader proceeds as follows with each report:
the preparation process is complete: either `state == Rejected()`, in which
case the Leader rejects the report and removes it from the candidate set; or
`state == Finished(out_share)`, in which case preparation is complete and the
Leader stores the output share for use in the collection interaction
{{collect-flow}}.
Leader updates the relevant batch bucket with `out_share` as described in
{{batch-buckets}}.

1. Else if the type is "reject", then the Leader rejects the report and removes
it from the candidate set. The Leader MUST NOT include the report in a
@@ -1637,8 +1646,8 @@ Otherwise, the Leader proceeds as follows with each report:
1. Else the type is invalid, in which case the Leader MUST abort the
aggregation job.

When the Leader stores the `out_share`, it MUST also store the report ID for
replay protection.
When the Leader updates a batch bucket based on `out_share`, it MUST also store
the report ID for replay protection.

(Note: Since VDAF preparation completes in a constant number of rounds, it will
never be the case that some reports are completed and others are not.)
@@ -1785,8 +1794,8 @@ variant {
} PrepareResp;
~~~

Otherwise it stores the report ID for replay protection and `out_share` for use
in the collection interaction ({{collect-flow}}).
Otherwise it stores the report ID for replay protection and updates the relevant
batch bucket with `out_share` as described in {{batch-buckets}}.

Finally, the Helper creates an `AggregationJobResp` to send to the Leader. This
message is structured as follows:
@@ -1991,13 +2000,14 @@ Otherwise, the Leader proceeds as follows with each report:
)
~~~

where `task_id` is the task ID and `inbound` is the message payload. If `outbound != None`, then the
Leader stores `state` and `outbound` and proceeds to another continuation
step. If `outbound == None`, then the preparation process is complete: either
`state == Rejected()`, in which case the Leader rejects the report and
removes it from the candidate set; or `state == Finished(out_share)`, in
which case preparation is complete and the Leader stores the output share for
use in the collection interaction {{collect-flow}}.
where `task_id` is the task ID and `inbound` is the message payload. If
`outbound != None`, then the Leader stores `state` and `outbound` and
proceeds to another continuation step. If `outbound == None`, then the
preparation process is complete: either `state == Rejected()`, in which case
the Leader rejects the report and removes it from the candidate set; or
`state == Finished(out_share)`, in which case preparation is complete and
the Leader updates the relevant batch bucket with `out_share` as described in
{{batch-buckets}}.

1. Else if the type is "finished" and `state == Finished(out_share)`, then
preparation is complete and the Leader stores the output share for use in
@@ -2089,8 +2099,8 @@ variant {
} PrepareResp;
~~~

Otherwise it stores the report ID for replay protection and `out_share` for use
in the collection interaction ({{collect-flow}}).
Otherwise it stores the report ID for replay protection and updates the relevant
batch bucket with `out_share` as described in {{batch-buckets}}.

The Helper's response depends on the value of `outbound`. If `outbound !=
None`, then the Helper's response is
@@ -2137,6 +2147,59 @@ send an HTTP DELETE request to
`{helper}/tasks/{task-id}/aggregation_jobs/{aggregation-job-id}` so that the
Helper knows it can clean up its state.

#### Batch Buckets {#batch-buckets}

When aggregation recovers an output share, the output share must be incorporated
into an appropriate "batch bucket", as defined in this section. The data stored
in a batch bucket is kept for eventual use in the collection interaction
({{collect-flow}}).

Batch buckets are indexed by a batch mode-specific "batch bucket identifier":

* For the time-interval batch mode, the batch bucket identifier is an interval
of time whose beginning is a multiple of the task's `time_precision` and whose
duration is equal to the task's `time_precision`. The identifier associated
with a given output share is the unique such interval containing the client
timestamp of the report the output share was recovered from; for example, if
the task's `time_precision` is 1000 seconds and the report's timestamp is
1729629081, the relevant batch bucket identifier is `(1729629000, 1000)`. (The
first element of the pair denotes the start of the batch interval and the
second denotes the duration.)
* For the leader-selected batch mode, the batch bucket identifier is a batch ID.
The identifier associated with a given output share is the batch ID of the
related aggregation job.
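
For the time-interval batch mode, the batch bucket identifier can be computed
directly from the report's timestamp. A non-normative sketch, using a
hypothetical helper named `time_interval_bucket_id`:

~~~ pseudocode
# Non-normative sketch: map a report timestamp to its time-interval batch
# bucket identifier. `time_precision` is the task's time_precision, in seconds.
def time_interval_bucket_id(timestamp, time_precision):
    bucket_start = timestamp - (timestamp % time_precision)
    return (bucket_start, time_precision)

# Example from above:
# time_interval_bucket_id(1729629081, 1000) == (1729629000, 1000)
~~~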

A few different pieces of information are associated with each batch bucket:

* A VDAF-specific aggregate share, as defined in {{!VDAF}}.
* A count of the number of reports included in the batch bucket.
* A 32-byte checksum value, as defined below.
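
A non-normative sketch of one possible in-memory representation of this state
(the type and field names are illustrative only):

~~~ pseudocode
# Non-normative sketch of batch bucket state.
class BatchBucket:
    agg_share: Vdaf.AggShare  # VDAF-specific aggregate share
    report_count: int         # number of reports aggregated into this bucket
    checksum: bytes           # 32-byte XOR of SHA256(report ID) values
~~~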

When processing an output share `out_share`, the following procedure is used to
update the batch buckets:

* Look up the existing batch bucket for the batch bucket identifier
associated with the aggregation job and output share.
* If there is no existing batch bucket, initialize a new one. The initial
aggregate share value is computed as `Vdaf.agg_init(agg_param)`, where
`agg_param` is the aggregation parameter associated with the aggregation job
(see {{!VDAF, Section 4.4}}); the initial count is `0`; and the initial
checksum is 32 zero bytes.
* Update the aggregate share `agg_share` to `Vdaf.agg_update(agg_param,
agg_share, out_share)`.
* Increment the count by 1.
* Update the checksum value to the bitwise-XOR of the checksum value with the
SHA256 {{!SHS=DOI.10.6028/NIST.FIPS.180-4}} hash of the report ID associated
with the output share.
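
A non-normative sketch of this update procedure, assuming the illustrative
`BatchBucket` type above, a hypothetical `buckets` map keyed by batch bucket
identifier, and a `SHA256` helper returning the 32-byte digest:

~~~ pseudocode
def xor_32(a, b):
    # Bitwise XOR of two 32-byte strings.
    return bytes(x ^ y for (x, y) in zip(a, b))

def update_batch_bucket(Vdaf, agg_param, buckets,
                        bucket_id, report_id, out_share):
    if bucket_id not in buckets:
        # Initialize a new batch bucket.
        buckets[bucket_id] = BatchBucket(
            agg_share=Vdaf.agg_init(agg_param),
            report_count=0,
            checksum=bytes(32),  # 32 zero bytes
        )
    bucket = buckets[bucket_id]
    # Fold the output share into the running aggregate share, then update
    # the report count and checksum.
    bucket.agg_share = Vdaf.agg_update(agg_param, bucket.agg_share, out_share)
    bucket.report_count += 1
    bucket.checksum = xor_32(bucket.checksum, SHA256(report_id))
~~~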

Implementation note: this section describes a single set of values associated
with each batch bucket. However, implementations are free to shard the aggregate
share/count/checksum values associated with the batch bucket, combining them
back into a single set of values when reading the batch bucket. This may be
useful to avoid write contention. The aggregate share values are combined using
`Vdaf.merge(agg_param, agg_shares)` (see {{!VDAF, Section 4.4}}), the count
values are combined by summing, and the checksum values are combined by bitwise
XOR.
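
A non-normative sketch of the write path for such a sharded layout, where each
batch bucket is split into a fixed number of independently updated shards
(`NUM_SHARDS` and `randrange` are assumed, and `xor_32` and `SHA256` are the
helpers sketched above):

~~~ pseudocode
def update_sharded_bucket(Vdaf, agg_param, shards, report_id, out_share):
    # Update a randomly chosen shard to spread out write contention; the
    # shards are recombined into a single set of values when read.
    shard = shards[randrange(NUM_SHARDS)]
    shard.agg_share = Vdaf.agg_update(agg_param, shard.agg_share, out_share)
    shard.report_count += 1
    shard.checksum = xor_32(shard.checksum, SHA256(report_id))
~~~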

#### Recovering from Aggregation Step Skew {#aggregation-step-skew-recovery}

`AggregationJobContinueReq` messages contain a `step` field, allowing
@@ -2223,8 +2286,10 @@ running aggregation jobs ({{aggregate-flow}}) until the Collector initiates a
collection job. This is because, in general, the aggregation parameter is not
known until this point. In certain situations it is possible to predict the
aggregation parameter in advance. For example, for Prio3 the only valid
aggregation parameter is the empty string. For these reasons, the collection
job is handled asynchronously.
aggregation parameter is the empty string. For these reasons, the collection job
is handled asynchronously. If aggregation is performed eagerly, the Leader MUST
validate that the aggregation parameter received in the `CollectionJobReq`
matches the aggregation parameter used during aggregation.

Upon receipt of a `CollectionJobReq`, the Leader begins by checking that it
recognizes the task ID in the request path. If not, it MUST abort with error
@@ -2337,12 +2402,28 @@ state related to it.

### Obtaining Aggregate Shares {#collect-aggregate}

The Leader must obtain the Helper's encrypted aggregate share before it can
complete a collection job. To do this, the Leader first computes a checksum
over the reports included in the batch. The checksum is computed by taking the
SHA256 {{!SHS=DOI.10.6028/NIST.FIPS.180-4}} hash of each report ID from the
Client reports included in the aggregation, then combining the hash values with
a bitwise-XOR operation.
The Leader must compute its own aggregate share and obtain the Helper's
encrypted aggregate share before it can complete a collection job.

First, the Leader retrieves all batch buckets ({{batch-buckets}}) associated
with this collection job. The batch buckets to retrieve depend on the batch mode
of this task:

* For time-interval tasks, this is all batch buckets whose batch bucket
identifiers are contained within the batch interval specified in the
`CollectionJobReq`'s query.
* For leader-selected tasks, this is the batch bucket associated with the batch
ID the Leader has chosen for this collection job.

The Leader then combines the values inside these batch buckets as follows:

* Aggregate shares are combined via `Vdaf.merge(agg_param, agg_shares)` (see
{{!VDAF, Section 4.4}}), where `agg_param` is the aggregation parameter
provided in the `CollectionJobReq`, and `agg_shares` are the (partial)
aggregate shares in the batch buckets. The result is the final aggregate share
for this collection job.
* Report counts are combined via summing.
* Checksums are combined via bitwise XOR.
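
A non-normative sketch of this combining step, reusing the illustrative
`BatchBucket` fields and `xor_32` helper from {{batch-buckets}}:

~~~ pseudocode
def combine_batch_buckets(Vdaf, agg_param, buckets):
    # Combine the retrieved batch buckets into the final aggregate share,
    # report count, and checksum for the collection job.
    agg_share = Vdaf.merge(agg_param, [b.agg_share for b in buckets])
    report_count = sum(b.report_count for b in buckets)
    checksum = bytes(32)
    for b in buckets:
        checksum = xor_32(checksum, b.checksum)
    return (agg_share, report_count, checksum)
~~~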

Then the Leader sends a POST request to
`{helper}/tasks/{task-id}/aggregate_shares` with the following message:
@@ -2379,14 +2460,15 @@ message contains the following parameters:
Helper MUST abort with "invalidMessage".

* `agg_param`: The opaque aggregation parameter for the VDAF being executed.
This value MUST match the AggregationJobInitReq message for each aggregation
This value MUST match the `AggregationJobInitReq` message for each aggregation
job used to compute the aggregate shares (see {{leader-init}}) and the
aggregation parameter indicated by the Collector in the CollectionJobReq
message (see {{collect-init}}).

* `report_count`: The number number of reports included in the batch.
* `report_count`: The number of reports included in the batch, as
computed above.

* `checksum`: The batch checksum.
* `checksum`: The batch checksum, as computed above.

Leaders MUST authenticate their requests to Helpers using a scheme that meets
the requirements in {{request-authentication}}.
@@ -2395,25 +2477,16 @@ To handle the Leader's request, the Helper first ensures that it recognizes the
task ID in the request path. If not, it MUST abort with error
`unrecognizedTask`. The Helper then verifies that the request meets the
requirements for batch parameters following the procedure in
{{batch-validation}}.

Next, it computes a checksum based on the reports that satisfy the query, and
checks that the `report_count` and `checksum` included in the request match its
computed values. If not, then it MUST abort with an error of type
"batchMismatch".

Next, it computes the aggregate share `agg_share` corresponding to the set of
output shares, denoted `out_shares`, for the batch interval, as follows:

~~~ pseudocode
agg_share = Vdaf.aggregate(agg_param, out_shares)
~~~
{{batch-validation}}. The Helper MUST validate that the aggregation parameter
matches the aggregation parameter used during aggregation of this batch; if
not, it MUST abort with an error of type "invalidMessage".

Implementation note: For most VDAFs, including Prio3, it is possible to
aggregate output shares as they arrive rather than wait until the batch is
collected. For the batch modes specified in this document, it is necessary to
enforce the batch parameters as described in {{batch-validation}} so that the
Aggregator knows which aggregate share to update.
Next, the Helper retrieves and combines the batch buckets associated with the
request using the same process used by the Leader (described at the beginning of
this section, {{collect-aggregate}}), arriving at its final aggregate share,
report count, and checksum values. If the Helper's computed report count and
checksum values do not match the values provided in the `AggregateShareReq`, it
MUST abort with an error of type "batchMismatch".
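
A non-normative sketch of this check, assuming the hypothetical
`combine_batch_buckets` helper sketched above for the Leader, a decoded request
`req`, the Helper's retrieved batch buckets `helper_buckets`, and an `abort`
helper that reports the error to the Leader:

~~~ pseudocode
agg_share, report_count, checksum = combine_batch_buckets(
    Vdaf, req.agg_param, helper_buckets)
if report_count != req.report_count or checksum != req.checksum:
    abort("batchMismatch")  # the Helper's values disagree with the Leader's
~~~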

The Helper then encrypts `agg_share` under the Collector's HPKE public key as
described in {{aggregate-share-encrypt}}, yielding `encrypted_agg_share`.
@@ -2442,15 +2515,14 @@ same response.
After receiving the Helper's response, the Leader uses the HpkeCiphertext to
finalize a collection job (see {{collect-finalization}}).

Once an AggregateShareReq has been issued for the batch determined by a given
Once an `AggregateShareReq` has been issued for the batch determined by a given
query, it is an error for the Leader to issue any more aggregation jobs for
additional reports that satisfy the query. These reports will be rejected by the
Helper as described in {{input-share-validation}}.

Before completing the collection job, the Leader also computes its own aggregate
share `agg_share` by aggregating all of the prepared output shares that fall
within the batch interval. Finally, it encrypts its aggregate share under the
Collector's HPKE public key as described in {{aggregate-share-encrypt}}.
Before completing the collection job, the Leader encrypts its aggregate share
under the Collector's HPKE public key as described in
{{aggregate-share-encrypt}}.

### Collection Job Finalization {#collect-finalization}
