Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Call noise correction python binary from kotlin. #1803

Merged
merged 30 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4b6d369
Call python binary from kotlin.
ple13 Sep 12, 2024
170778c
Merge branch 'main' into lephi-noise-correction-works-with-report-proto
ple13 Sep 12, 2024
0f56f0b
Fix Lint.
ple13 Sep 12, 2024
cb74ffa
Fix Lint.
ple13 Sep 12, 2024
0fa5b25
Fix Lint.
ple13 Sep 12, 2024
f45692b
Address comments.
ple13 Sep 16, 2024
178b19a
Clean up.
ple13 Sep 16, 2024
875164c
Fix Lint.
ple13 Sep 16, 2024
b073960
The noise correction tool uses report in json format as input.
ple13 Oct 1, 2024
bdc12e8
Fix Lint.
ple13 Oct 1, 2024
e2cf97d
Fix Lint.
ple13 Oct 1, 2024
6c5613e
Support multiple filter groups.
ple13 Oct 2, 2024
ed8654e
Fix Lint.
ple13 Oct 2, 2024
a7f6dff
Address comments.
ple13 Oct 3, 2024
2f3aca6
Fix lint.
ple13 Oct 4, 2024
ccb4f51
Merge branch 'main' into lephi-noise-correction-works-with-report-proto
ple13 Oct 4, 2024
e04927a
Address comments.
ple13 Oct 7, 2024
ffa54a0
Fix Lint.
ple13 Oct 7, 2024
202d233
Address comments.
ple13 Oct 8, 2024
400034b
Fix Lint.
ple13 Oct 8, 2024
0315e94
Clean up python import.
ple13 Oct 8, 2024
688d85f
Address comments.
ple13 Oct 9, 2024
79ccd89
Merge branch 'main' into lephi-noise-correction-works-with-report-proto
ple13 Oct 9, 2024
74bca3d
Address comments.
ple13 Oct 10, 2024
363ffda
Fix Lint.
ple13 Oct 10, 2024
282bb21
Address comments.
ple13 Oct 10, 2024
6d04398
Fix Lint.
ple13 Oct 10, 2024
babc055
Add newline at the end of files.
ple13 Oct 10, 2024
535f471
Address comments.
ple13 Oct 10, 2024
fd1fe20
Merge branch 'main' into lephi-noise-correction-works-with-report-proto
ple13 Oct 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion MODULE.bazel.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
load("@wfa_rules_kotlin_jvm//kotlin:defs.bzl", "kt_jvm_library")

package(default_visibility = ["//visibility:public"])

kt_jvm_library(
name = "report_post_processing",
srcs = ["ReportPostProcessing.kt"],
resources = [
"//experimental/dp_consistency/src/main/python/wfa/measurement/reporting/postprocess/tools:post_process_origin_report_pyzip",
],
deps = [
":report_conversion",
"//experimental/dp_consistency/src/main/proto/wfa/measurement/reporting/postprocess:report_summary_kt_jvm_proto",
"//src/main/proto/wfa/measurement/reporting/v2alpha:report_kt_jvm_proto",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common",
],
)

kt_jvm_library(
name = "report_conversion",
srcs = ["ReportConversion.kt"],
deps = [
"//experimental/dp_consistency/src/main/proto/wfa/measurement/reporting/postprocess:report_summary_kt_jvm_proto",
"//src/main/proto/wfa/measurement/reporting/v2alpha:report_kt_jvm_proto",
"@maven//:com_google_protobuf_protobuf_java_util",
"@wfa_common_jvm//imports/java/com/google/gson",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright 2024 The Cross-Media Measurement Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.wfanet.measurement.reporting.postprocessing

import com.google.protobuf.InvalidProtocolBufferException
import com.google.protobuf.util.JsonFormat
import org.wfanet.measurement.reporting.MeasurementDetailKt
import org.wfanet.measurement.reporting.ReportSummary
import org.wfanet.measurement.reporting.measurementDetail
import org.wfanet.measurement.reporting.reportSummary
import org.wfanet.measurement.reporting.v2alpha.Metric
import org.wfanet.measurement.reporting.v2alpha.Report

data class ReportingSetSummary(
/** The measurement policy (e.g. AMI, MRC, or CUSTOM) used for this reporting set. */
val measurementPolicy: String,
/** The data providers associated with the reporting set. */
val dataProviders: List<String>,
)

data class SetOperationSummary(
val isCumulative: Boolean,
/** The type of set operation which is one of cumulative, union, difference, or incremental. */
val setOperation: String,
)

object ReportConversion {
fun getReportFromJsonString(reportAsJsonString: String): Report {
val protoBuilder = Report.newBuilder()
try {
JsonFormat.parser().merge(reportAsJsonString, protoBuilder)
} catch (e: InvalidProtocolBufferException) {
throw IllegalArgumentException("Failed to parse Report from JSON string", e)
}
return protoBuilder.build()
}

fun convertJsontoReportSummaries(reportAsJsonString: String): List<ReportSummary> {
return getReportFromJsonString(reportAsJsonString).toReportSummaries()
}

fun getMeasurementPolicy(tag: String): String {
when {
"measurement_policy=AMI" in tag -> return "ami"
"measurement_policy=MRC" in tag -> return "mrc"
"measurement_policy=CUSTOM" in tag -> return "custom"
else -> error("Measurement policy must be ami, or mrc, or custom.")
}
}

fun getSetOperation(tag: String): String {
val parts = tag.split(", ")
val setOperationPart = parts.find { it.startsWith("set_operation=") }
return setOperationPart?.let { it.substringAfter("set_operation=") }
?: error("Set operation must be specified.")
}

fun isCumulative(tag: String): Boolean {
return tag.contains("cumulative=true")
}

fun getTargets(tag: String): List<String> {
val parts = tag.split(", ")
val targetPart = parts.find { it.startsWith("target=") }
return targetPart?.let { it.substringAfter("target=").split(",") }
?: error("There must be at least one target.")
}
}

fun Report.toReportSummaries(): List<ReportSummary> {
require(state == Report.State.SUCCEEDED) { "Unsucceeded report is not supported." }

val measurementPoliciesByReportingSet =
reportingMetricEntriesList.associate { entry ->
val reportingSet = entry.key
val tag = tags.getValue(reportingSet)
reportingSet to
ReportingSetSummary(
ReportConversion.getMeasurementPolicy(tag),
ReportConversion.getTargets(tag),
)
}

val metricCalculationSpecs =
reportingMetricEntriesList.flatMapTo(mutableSetOf()) { it.value.metricCalculationSpecsList }

val setOperationByMetricCalculationSpec =
metricCalculationSpecs.associate { spec ->
val tag = tags.getValue(spec)
spec to
SetOperationSummary(
ReportConversion.isCumulative(tag),
ReportConversion.getSetOperation(tag),
)
}

val filterGroupByMetricCalculationSpec =
metricCalculationSpecs.associate { spec ->
val tag = tags.getValue(spec)
spec to tag.split(", ").find { it.startsWith("common_filter=") }
}

val filterGroups = filterGroupByMetricCalculationSpec.values.toSet()

// Groups results by (reporting set x metric calculation spec).
val measurementSets =
metricCalculationResultsList.groupBy { Pair(it.metricCalculationSpec, it.reportingSet) }

val reportSummaries = mutableListOf<ReportSummary>()
for (filter in filterGroups) {
val reportSummary = reportSummary {
measurementSets.forEach { (key, value) ->
if (filterGroupByMetricCalculationSpec.getValue(key.first) == filter) {
measurementDetails += measurementDetail {
measurementPolicy = measurementPoliciesByReportingSet[key.second]!!.measurementPolicy
dataProviders += measurementPoliciesByReportingSet[key.second]!!.dataProviders
isCumulative = setOperationByMetricCalculationSpec[key.first]!!.isCumulative
setOperation = setOperationByMetricCalculationSpec[key.first]!!.setOperation
var measurementList =
value
.flatMap { it.resultAttributesList }
.sortedBy { it.timeInterval.endTime.seconds }
.filter { it.metricResult.hasReach() || it.metricResult.hasReachAndFrequency() }
.map { resultAttribute ->
require(resultAttribute.state == Metric.State.SUCCEEDED) {
"Unsucceeded measurement result is not supported."
}
MeasurementDetailKt.measurementResult {
if (resultAttribute.metricResult.hasReach()) {
reach = resultAttribute.metricResult.reach.value
standardDeviation =
resultAttribute.metricResult.reach.univariateStatistics.standardDeviation
} else {
reach = resultAttribute.metricResult.reachAndFrequency.reach.value
standardDeviation =
resultAttribute.metricResult.reachAndFrequency.reach.univariateStatistics
.standardDeviation
}
metric = resultAttribute.metric
}
}
measurementResults.addAll(measurementList)
}
}
}
}
reportSummaries.add(reportSummary)
}
return reportSummaries
}
Loading
Loading