From 7cfd330e47eb04da7cd6eec314fd1ca8f82635d1 Mon Sep 17 00:00:00 2001 From: Alex Amato Date: Thu, 8 Apr 2021 20:56:01 -0700 Subject: [PATCH] [BEAM-11994] Update ShortIdMap's maps to only use valid keyable fields in the MonitoringInfo (urn and labels). --- .../beam/runners/core/metrics/ShortIdMap.java | 3 + .../runners/core/metrics/ShortIdMapTest.java | 133 ++++++++++++++++++ 2 files changed, 136 insertions(+) create mode 100644 runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/ShortIdMapTest.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ShortIdMap.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ShortIdMap.java index 03ac54ac8cdf..ef9481f4a3b6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ShortIdMap.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ShortIdMap.java @@ -30,6 +30,9 @@ public class ShortIdMap { public synchronized String getOrCreateShortId(MonitoringInfo info) { Preconditions.checkNotNull(info); + Preconditions.checkArgument(info.getPayload().isEmpty()); + Preconditions.checkArgument(!info.hasStartTime()); + String shortId = monitoringInfoMap.inverse().get(info); if (shortId == null) { shortId = "metric" + counter++; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/ShortIdMapTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/ShortIdMapTest.java new file mode 100644 index 000000000000..0a1eb12e7ef9 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/ShortIdMapTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.metrics; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.beam.model.pipeline.v1.MetricsApi; +import org.apache.beam.sdk.values.KV; +import org.junit.Test; + +public class ShortIdMapTest { + + @Test + public void testShortIdAssignment() throws Exception { + ShortIdMap shortIdMap = new ShortIdMap(); + List> testCases = new ArrayList<>(); + + SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder(false); + builder.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64); + testCases.add(KV.of("metric0", builder.build())); + + builder = new SimpleMonitoringInfoBuilder(false); + builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT); + testCases.add(KV.of("metric1", builder.build())); + + builder = new SimpleMonitoringInfoBuilder(false); + builder.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_DOUBLE); + testCases.add(KV.of("metric2", builder.build())); + + builder = new SimpleMonitoringInfoBuilder(false); + builder.setUrn("TestingSentinelUrn"); + builder.setType("TestingSentinelType"); + testCases.add(KV.of("metric3", builder.build())); + + builder = new SimpleMonitoringInfoBuilder(false); + builder.setUrn(MonitoringInfoConstants.Urns.FINISH_BUNDLE_MSECS); + testCases.add(KV.of("metric4", builder.build())); + + builder = new SimpleMonitoringInfoBuilder(false); + builder.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64); + testCases.add(KV.of("metric5", builder.build())); + + builder = new SimpleMonitoringInfoBuilder(false); + builder.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64); + builder.setLabel(MonitoringInfoConstants.Labels.NAME, "metricNumber7"); + builder.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "myNamespace"); + builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "myPtransform"); + testCases.add(KV.of("metric6", builder.build())); + + builder = new SimpleMonitoringInfoBuilder(false); + builder.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64); + builder.setLabel(MonitoringInfoConstants.Labels.NAME, "metricNumber8"); + builder.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "myNamespace"); + builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "myPtransform"); + testCases.add(KV.of("metric7", builder.build())); + + builder = new SimpleMonitoringInfoBuilder(false); + builder.setUrn(MonitoringInfoConstants.Urns.API_REQUEST_COUNT); + builder.setLabel(MonitoringInfoConstants.Labels.SERVICE, "BigQuery"); + testCases.add(KV.of("metric8", builder.build())); + + builder = new SimpleMonitoringInfoBuilder(false); + builder.setUrn(MonitoringInfoConstants.Urns.API_REQUEST_COUNT); + builder.setLabel(MonitoringInfoConstants.Labels.SERVICE, "Storage"); + testCases.add(KV.of("metric9", builder.build())); + + // Validate that modifying the payload, but using the same URN/labels + // does not change the shortId assignment. + builder = new SimpleMonitoringInfoBuilder(false); + builder.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_DOUBLE); + testCases.add(KV.of("metric2", builder.build())); + + builder = new SimpleMonitoringInfoBuilder(false); + builder.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64); + builder.setLabel(MonitoringInfoConstants.Labels.NAME, "metricNumber7"); + builder.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "myNamespace"); + builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "myPtransform"); + testCases.add(KV.of("metric6", builder.build())); + + // Verify each short ID is assigned properly. + Set expectedShortIds = new HashSet(); + for (KV entry : testCases) { + assertEquals(entry.getKey(), shortIdMap.getOrCreateShortId(entry.getValue())); + expectedShortIds.add(entry.getKey()); + } + + HashMap actualRecoveredInfos = new HashMap<>(); + for (String expectedShortId : expectedShortIds) { + actualRecoveredInfos.put(expectedShortId, shortIdMap.get(expectedShortId)); + } + // Retrieve all of the MonitoringInfos by short id, and verify that the + // metadata (everything but the payload) matches the originals + assertEquals(expectedShortIds, actualRecoveredInfos.keySet()); + for (KV entry : testCases) { + // Clear payloads of both expected and actual before comparing + MetricsApi.MonitoringInfo expectedMonitoringInfo = entry.getValue(); + MetricsApi.MonitoringInfo.Builder expected = + MetricsApi.MonitoringInfo.newBuilder(expectedMonitoringInfo); + expected.clearPayload(); + + MetricsApi.MonitoringInfo.Builder actual = + MetricsApi.MonitoringInfo.newBuilder(actualRecoveredInfos.get(entry.getKey())); + actual.clearPayload(); + assertEquals(expected.build(), actual.build()); + } + + // Verify each short ID is assigned properly, in reverse. + for (int i = testCases.size() - 1; i > 0; i--) { + assertEquals( + testCases.get(i).getKey(), shortIdMap.getOrCreateShortId(testCases.get(i).getValue())); + } + } +}