Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DO NOT SUBMIT SPLIT INTO OTHER PRsAdd HarnessMonitoringInfosRequest/Response InstructionRequest and MonitoringInfosMetadataRequest/Response support to Java #14490

Closed
wants to merge 4 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added the BiMap back to ShortIdMap
Alex Amato committed May 13, 2021
commit 99f21e98629b99d768a5f7babf88e4289628ad77
Original file line number Diff line number Diff line change
@@ -22,28 +22,40 @@
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBiMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** A Class for registering short ids for MonitoringInfos. */
public class ShortIdMap {
private static final Logger LOG = LoggerFactory.getLogger(ShortIdMap.class);

private int counter = 0;
// Bidirectional map.
private HashMap<String, MetricsApi.MonitoringInfo> shortIdToInfo = new HashMap<>();
private HashMap<MonitoringInfoMetricName, String> infoKeyToShortId = new HashMap<>();
private BiMap<String, MonitoringInfo> monitoringInfoMap = HashBiMap.create();

public synchronized String getOrCreateShortId(MonitoringInfo info) {
Preconditions.checkNotNull(info);
MonitoringInfoMetricName infoKey = MonitoringInfoMetricName.of(info);
String shortId = infoKeyToShortId.get(infoKey);
// Remove the payload and startTime before using the MonitoringInfo as a key.
// AS only the URN+labels uniquely identify the MonitoringInfo
MonitoringInfo.Builder cleaner = MonitoringInfo.newBuilder(info);
cleaner.clearPayload();
cleaner.clearStartTime();
MonitoringInfo cleaned = cleaner.build();
LOG.info("original MontitoringInfo " + info);
LOG.info("cleaned MontitoringInfo " + cleaned);
String shortId = monitoringInfoMap.inverse().get(cleaned);
if (shortId == null) {
shortId = "metric" + counter++;
infoKeyToShortId.put(infoKey, shortId);
shortIdToInfo.put(shortId, info);
LOG.info("Assign new short ID: " + shortId + " cleaned MonitoringInfo " + cleaned);
monitoringInfoMap.put(shortId, info);
}
return shortId;
}

public synchronized MonitoringInfo get(String shortId) {
MonitoringInfo monitoringInfo = shortIdToInfo.get(shortId);
MonitoringInfo monitoringInfo = monitoringInfoMap.get(shortId);
if (monitoringInfo == null) {
throw new NoSuchElementException(shortId);
}
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ public void testShortIdAssignment() throws Exception {
builder.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_DOUBLE);
builder.setDoubleDistributionValue(1, 1, 1, 1);
testCases.add(KV.of("metric2", builder.build()));

/*
builder = new SimpleMonitoringInfoBuilder(false);
builder.setUrn("TestingSentinelUrn");
builder.setType("TestingSentinelType");
@@ -92,7 +92,7 @@ public void testShortIdAssignment() throws Exception {
builder.setLabel(MonitoringInfoConstants.Labels.SERVICE, "Storage");
builder.setInt64SumValue(4);
testCases.add(KV.of("metric9", builder.build()));

*/
// Validate that modifying the payload, but using the same URN/labels
// does not change the shortId assignment.
builder = new SimpleMonitoringInfoBuilder(false);
@@ -123,7 +123,7 @@ public void testShortIdAssignment() throws Exception {
// metadata (everything but the payload) matches the originals
assertEquals(expectedShortIds, actualRecoveredInfos.keySet());
for (KV<String, MetricsApi.MonitoringInfo> entry : testCases) {
// Clear payloads of both expected and actual becfore comparing
// Clear payloads of both expected and actual before comparing
MetricsApi.MonitoringInfo expectedMonitoringInfo = entry.getValue();
MetricsApi.MonitoringInfo.Builder expected =
MetricsApi.MonitoringInfo.newBuilder(expectedMonitoringInfo);