diff --git a/draft-ietf-ppm-dap.md b/draft-ietf-ppm-dap.md
index 19215eee..762e78c7 100644
--- a/draft-ietf-ppm-dap.md
+++ b/draft-ietf-ppm-dap.md
@@ -413,6 +413,11 @@ Batch:
 : A set of reports (i.e., measurements) that are aggregated into an aggregate
   result. As defined in {{!VDAF}}.
 
+Batch bucket:
+: State associated with a given batch allowing the Aggregators to perform
+  incremental aggregation. Depending on the batch mode, there may be many batch
+  buckets tracking the state of a single batch.
+
 Batch duration:
 : The time difference between the oldest and newest report in a batch.
 
@@ -663,7 +668,10 @@ time.
 
 The Leader distributes the shares to the Helper and orchestrates the process of
 verifying them (see {{validating-inputs}}) and assembling them into a final
-aggregate result for the Collector.
+aggregate result for the Collector. Depending on the VDAF, it may be possible to
+incrementally process each report as it is uploaded, or it may be necessary to
+wait until the Collector transmits a collection request before processing can
+begin.
 
 ## Validating Measurements {#validating-inputs}
 
@@ -1406,10 +1414,13 @@
 across vast numbers of reports and preparation of large histograms, may be
 better suited for the asynchronous model. For use cases where datastore
 performance is a concern, the synchronous model may be better suited.
 
-In general, reports cannot be aggregated until the Collector specifies an
-aggregation parameter. However, in some situations it is possible to begin
-aggregation as soon as reports arrive. For example, Prio3 has just one valid
-aggregation parameter (the empty string).
+In general, aggregation cannot begin until the Collector specifies a query and
+an aggregation parameter. However, depending on the VDAF and batch mode in use,
+it is often possible to begin aggregation as soon as reports arrive. For
+example, Prio3 has just one valid aggregation parameter (the empty string) and
+thus allows for eager aggregation. Both the time-interval and leader-selected
+batch modes defined in this document allow for eager aggregation.
 
 An aggregation job can be thought of as having three phases, which are
 described in the remaining subsections:
@@ -1419,15 +1430,13 @@ described in the remaining subsections:
 
 - Continuation: Continue the aggregation flow by exchanging prep shares and
   messages until preparation completes or an error occurs.
 
 - Completion: Finish the aggregation flow, yielding an output share
-  corresponding to each report share in the aggregation job.
+  corresponding to each report share in the aggregation job, and update the
+  appropriate aggregate shares with these output shares.
 
-After an aggregation job is completed, each Aggregator stores the output shares
-until the aggregate share is collected as described in {{collect-flow}}. Note
-that it is usually not necessary to store output shares individually: depending
-on the batch mode and VDAF, the output shares can be merged into existing
-aggregate shares that are updated as aggregation jobs complete. This streaming
-aggregation is compatible with Prio3 and all batch modes specified in this
-document.
+After an aggregation job is completed, each Aggregator updates running-total
+aggregate shares and other values for each "batch bucket" associated with a
+recovered output share, as described in {{batch-buckets}}. These values are
+stored until the batch is collected as described in {{collect-flow}}.
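+
+As a non-normative illustration, an Aggregator's processing of a single
+aggregation job might be sketched as follows, where `prepare_init`,
+`prepare_continue`, and `update_batch_bucket` are hypothetical helpers
+wrapping VDAF preparation ({{!VDAF}}) and the batch bucket update of
+{{batch-buckets}}:
+
+~~~ pseudocode
+for report in aggregation_job:
+    state = prepare_init(report)             # initialization
+    while is_continuing(state):              # continuation
+        state = prepare_continue(state)
+    if state == Finished(out_share):         # completion: fold the output
+        update_batch_bucket(out_share)       # share into its batch bucket
+    # else state == Rejected(): drop the report
+~~~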
 
 Apart from VDAF preparation, another important task of the aggregation
 interaction is to provide replay protection ({{replay-protection}}). Along with
@@ -1626,8 +1635,8 @@ Otherwise, the Leader proceeds as follows with each report:
    the preparation process is complete: either `state == Rejected()`, in which
    case the Leader rejects the report and removes it from the candidate set; or
    `state == Finished(out_share)`, in which case preparation is complete and the
-   Leader stores the output share for use in the collection interaction
-   {{collect-flow}}.
+   Leader updates the relevant batch bucket with `out_share` as described in
+   {{batch-buckets}}.
 
 1. Else if the type is "reject", then the Leader rejects the report and removes
    it from the candidate set. The Leader MUST NOT include the report in a
@@ -1637,8 +1646,8 @@ Otherwise, the Leader proceeds as follows with each report:
 
 1. Else the type is invalid, in which case the Leader MUST abort the
    aggregation job.
 
-When the Leader stores the `out_share`, it MUST also store the report ID for
-replay protection.
+When the Leader updates a batch bucket based on `out_share`, it MUST also store
+the report ID for replay protection.
 
 (Note: Since VDAF preparation completes in a constant number of rounds, it
 will never be the case that some reports are completed and others are not.)
@@ -1785,8 +1794,8 @@ variant {
 } PrepareResp;
 ~~~
 
-Otherwise it stores the report ID for replay protection and `out_share` for use
-in the collection interaction ({{collect-flow}}).
+Otherwise it stores the report ID for replay protection and updates the relevant
+batch bucket with `out_share` as described in {{batch-buckets}}.
 
 Finally, the Helper creates an `AggregationJobResp` to send to the Leader. This
 message is structured as follows:
@@ -1991,13 +2000,14 @@ Otherwise, the Leader proceeds as follows with each report:
      )
    ~~~
 
-   where `task_id` is the task ID and `inbound` is the message payload. If `outbound != None`, then the
-   Leader stores `state` and `outbound` and proceeds to another continuation
-   step. If `outbound == None`, then the preparation process is complete: either
-   `state == Rejected()`, in which case the Leader rejects the report and
-   removes it from the candidate set; or `state == Finished(out_share)`, in
-   which case preparation is complete and the Leader stores the output share for
-   use in the collection interaction {{collect-flow}}.
+   where `task_id` is the task ID and `inbound` is the message payload. If
+   `outbound != None`, then the Leader stores `state` and `outbound` and
+   proceeds to another continuation step. If `outbound == None`, then the
+   preparation process is complete: either `state == Rejected()`, in which case
+   the Leader rejects the report and removes it from the candidate set; or
+   `state == Finished(out_share)`, in which case preparation is complete and
+   the Leader updates the relevant batch bucket with `out_share` as described in
+   {{batch-buckets}}.
 
 1. Else if the type is "finished" and `state == Finished(out_share)`, then
    preparation is complete and the Leader stores the output share for use in
@@ -2089,8 +2099,8 @@ variant {
 } PrepareResp;
 ~~~
 
-Otherwise it stores the report ID for replay protection and `out_share` for use
-in the collection interaction ({{collect-flow}}).
+Otherwise it stores the report ID for replay protection and updates the relevant
+batch bucket with `out_share` as described in {{batch-buckets}}.
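+
+As a non-normative sketch, where `seen_report_ids` is a hypothetical durable
+set used for replay protection and `update_batch_bucket` is the procedure
+sketched in {{batch-buckets}}:
+
+~~~ pseudocode
+# On successful preparation, record the report ID and fold the output
+# share into the relevant batch bucket.
+seen_report_ids.add(report_id)
+update_batch_bucket(buckets, bucket_id, agg_param, report_id, out_share)
+~~~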
 
 The Helper's response depends on the value of `outbound`.
 If `outbound != None`, then the Helper's response is
@@ -2137,6 +2147,59 @@
 send an HTTP DELETE request to
 `{helper}/tasks/{task-id}/aggregation_jobs/{aggregation-job-id}` so that the
 Helper knows it can clean up its state.
 
+#### Batch Buckets {#batch-buckets}
+
+When an output share is recovered during aggregation, it must be stored into an
+appropriate "batch bucket", which is defined in this section. The data stored in
+a batch bucket is kept for eventual use in the collection interaction
+({{collect-flow}}).
+
+Batch buckets are indexed by a batch mode-specific "batch bucket identifier":
+
+* For the time-interval batch mode, the batch bucket identifier is an interval
+  of time whose beginning is a multiple of the task's `time_precision` and whose
+  duration is equal to the task's `time_precision`. The identifier associated
+  with a given output share is the unique such interval containing the client
+  timestamp of the report the output share was recovered from; for example, if
+  the task's `time_precision` is 1000 seconds and the report's timestamp is
+  1729629081, the relevant batch bucket identifier is `(1729629000, 1000)`. (The
+  first element of the pair denotes the start of the batch interval and the
+  second denotes the duration.)
+* For the leader-selected batch mode, the batch bucket identifier is a batch ID.
+  The identifier associated with a given output share is the batch ID of the
+  related aggregation job.
+
+A few different pieces of information are associated with each batch bucket:
+
+* A VDAF-specific aggregate share, as defined in {{!VDAF}}.
+* A count of the number of reports included in the batch bucket.
+* A 32-byte checksum value, as defined below.
+
+When processing an output share `out_share`, the following procedure (sketched
+in pseudocode after this list) is used to update the batch buckets:
+
+* Look up the existing batch bucket for the batch bucket identifier
+  associated with the aggregation job and output share.
+  * If there is no existing batch bucket, initialize a new one. The initial
+    aggregate share value is computed as `Vdaf.agg_init(agg_param)`, where
+    `agg_param` is the aggregation parameter associated with the aggregation job
+    (see {{!VDAF, Section 4.4}}); the initial count is `0`; and the initial
+    checksum is 32 zero bytes.
+* Update the aggregate share `agg_share` to `Vdaf.agg_update(agg_param,
+  agg_share, out_share)`.
+* Increment the count by 1.
+* Update the checksum value to the bitwise-XOR of the checksum value with the
+  SHA256 {{!SHS=DOI.10.6028/NIST.FIPS.180-4}} hash of the report ID associated
+  with the output share.
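+
+For illustration, this procedure might be implemented as in the following
+non-normative sketch, where `buckets` is a hypothetical persistent map from
+batch bucket identifier to the values above, and `zeros`, `xor`, and `SHA256`
+denote the obvious byte-string operations:
+
+~~~ pseudocode
+def update_batch_bucket(buckets, bucket_id, agg_param, report_id, out_share):
+    if bucket_id not in buckets:
+        # New bucket: empty aggregate share, zero count, all-zero checksum.
+        buckets[bucket_id] = (Vdaf.agg_init(agg_param), 0, zeros(32))
+    (agg_share, count, checksum) = buckets[bucket_id]
+    agg_share = Vdaf.agg_update(agg_param, agg_share, out_share)
+    count += 1
+    checksum = xor(checksum, SHA256(report_id))
+    buckets[bucket_id] = (agg_share, count, checksum)
+~~~
+
+For the time-interval batch mode, the batch bucket identifier of a report with
+timestamp `t` can be computed as `(t - (t % time_precision), time_precision)`.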
+
+Implementation note: this section describes a single set of values associated
+with each batch bucket. However, implementations are free to shard the aggregate
+share/count/checksum values associated with the batch bucket, combining them
+back into a single set of values when reading the batch bucket. This may be
+useful to avoid write contention. The aggregate share values are combined using
+`Vdaf.merge(agg_param, agg_shares)` (see {{!VDAF, Section 4.4}}), the count
+values are combined by summing, and the checksum values are combined by bitwise
+XOR.
+
 #### Recovering from Aggregation Step Skew {#aggregation-step-skew-recovery}
 
 `AggregationJobContinueReq` messages contain a `step` field, allowing
@@ -2223,8 +2286,10 @@
 running aggregation jobs ({{aggregate-flow}}) until the Collector initiates a
 collection job. This is because, in general, the aggregation parameter is not
 known until this point. In certain situations it is possible to predict the
 aggregation parameter in advance. For example, for Prio3 the only valid
-aggregation parameter is the empty string. For these reasons, the collection
-job is handled asynchronously.
+aggregation parameter is the empty string. For these reasons, the collection job
+is handled asynchronously. If aggregation is performed eagerly, the Leader MUST
+validate that the aggregation parameter received in the `CollectionJobReq`
+matches the aggregation parameter used during aggregation.
 
 Upon receipt of a `CollectionJobReq`, the Leader begins by checking that it
 recognizes the task ID in the request path. If not, it MUST abort with error
@@ -2337,12 +2402,28 @@ state related to it.
 
 ### Obtaining Aggregate Shares {#collect-aggregate}
 
-The Leader must obtain the Helper's encrypted aggregate share before it can
-complete a collection job. To do this, the Leader first computes a checksum
-over the reports included in the batch. The checksum is computed by taking the
-SHA256 {{!SHS=DOI.10.6028/NIST.FIPS.180-4}} hash of each report ID from the
-Client reports included in the aggregation, then combining the hash values with
-a bitwise-XOR operation.
+The Leader must compute its own aggregate share, as well as obtain the
+Helper's encrypted aggregate share, before it can complete a collection job.
+
+First, the Leader retrieves all batch buckets ({{batch-buckets}}) associated
+with this collection job. The batch buckets to retrieve depend on the batch mode
+of this task:
+
+* For time-interval tasks, this is all batch buckets whose batch bucket
+  identifiers are contained within the batch interval specified in the
+  `CollectionJobReq`'s query.
+* For leader-selected tasks, this is the batch bucket associated with the batch
+  ID the Leader has chosen for this collection job.
+
+The Leader then combines the values inside the batch buckets as follows (see
+the sketch after this list):
+
+* Aggregate shares are combined via `Vdaf.merge(agg_param, agg_shares)` (see
+  {{!VDAF, Section 4.4}}), where `agg_param` is the aggregation parameter
+  provided in the `CollectionJobReq`, and `agg_shares` are the (partial)
+  aggregate shares in the batch buckets. The result is the final aggregate share
+  for this collection job.
+* Report counts are combined via summing.
+* Checksums are combined via bitwise XOR.
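+
+For example, representing each batch bucket as the `(agg_share, count,
+checksum)` tuple of {{batch-buckets}}, this combination step might be sketched
+(non-normatively) as:
+
+~~~ pseudocode
+def combine_batch_buckets(agg_param, buckets):
+    agg_share = Vdaf.merge(agg_param,
+                           [agg_share for (agg_share, _, _) in buckets])
+    report_count = sum(count for (_, count, _) in buckets)
+    checksum = zeros(32)
+    for (_, _, bucket_checksum) in buckets:
+        checksum = xor(checksum, bucket_checksum)
+    return (agg_share, report_count, checksum)
+~~~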
 
 Then the Leader sends a POST request to
 `{helper}/tasks/{task-id}/aggregate_shares` with the following message:
@@ -2379,14 +2460,15 @@ message contains the following parameters:
   Helper MUST abort with "invalidMessage".
 
 * `agg_param`: The opaque aggregation parameter for the VDAF being executed.
-  This value MUST match the AggregationJobInitReq message for each aggregation
+  This value MUST match the `AggregationJobInitReq` message for each aggregation
   job used to compute the aggregate shares (see {{leader-init}}) and the
   aggregation parameter indicated by the Collector in the CollectionJobReq
  message (see {{collect-init}}).
 
-* `report_count`: The number number of reports included in the batch.
+* `report_count`: The number of reports included in the batch, as computed
+  above.
 
-* `checksum`: The batch checksum.
+* `checksum`: The batch checksum, as computed above.
 
 Leaders MUST authenticate their requests to Helpers using a scheme that meets
 the requirements in {{request-authentication}}.
@@ -2395,25 +2477,16 @@
 To handle the Leader's request, the Helper first ensures that it recognizes
 the task ID in the request path. If not, it MUST abort with error
 `unrecognizedTask`. The Helper then verifies that the request meets the
 requirements for batch parameters following the procedure in
-{{batch-validation}}.
-
-Next, it computes a checksum based on the reports that satisfy the query, and
-checks that the `report_count` and `checksum` included in the request match its
-computed values. If not, then it MUST abort with an error of type
-"batchMismatch".
-
-Next, it computes the aggregate share `agg_share` corresponding to the set of
-output shares, denoted `out_shares`, for the batch interval, as follows:
-
-~~~ pseudocode
-agg_share = Vdaf.aggregate(agg_param, out_shares)
-~~~
+{{batch-validation}}. The Helper MUST validate that the aggregation parameter
+matches the aggregation parameter used during aggregation of this batch;
+otherwise, it MUST abort with "invalidMessage".
 
-Implementation note: For most VDAFs, including Prio3, it is possible to
-aggregate output shares as they arrive rather than wait until the batch is
-collected. For the batch modes specified in this document, it is necessary to
-enforce the batch parameters as described in {{batch-validation}} so that the
-Aggregator knows which aggregate share to update.
+Next, the Helper retrieves and combines the batch buckets associated with the
+request using the same process as the Leader (described at the beginning of
+this section, {{collect-aggregate}}), arriving at its final aggregate share,
+report count, and checksum values. If the Helper's computed report count and
+checksum values do not match the values provided in the `AggregateShareReq`, it
+MUST abort with an error of type "batchMismatch".
 
 The Helper then encrypts `agg_share` under the Collector's HPKE public key as
 described in {{aggregate-share-encrypt}}, yielding `encrypted_agg_share`.
@@ -2442,15 +2515,14 @@
 same response.
 
 After receiving the Helper's response, the Leader uses the HpkeCiphertext to
 finalize a collection job (see {{collect-finalization}}).
 
-Once an AggregateShareReq has been issued for the batch determined by a given
+Once an `AggregateShareReq` has been issued for the batch determined by a given
 query, it is an error for the Leader to issue any more aggregation jobs for
 additional reports that satisfy the query. These reports will be rejected by
 the Helper as described in {{input-share-validation}}.
 
-Before completing the collection job, the Leader also computes its own aggregate
-share `agg_share` by aggregating all of the prepared output shares that fall
-within the batch interval. Finally, it encrypts its aggregate share under the
-Collector's HPKE public key as described in {{aggregate-share-encrypt}}.
+Before completing the collection job, the Leader encrypts its aggregate share
+under the Collector's HPKE public key as described in
+{{aggregate-share-encrypt}}.
 
 ### Collection Job Finalization {#collect-finalization}