diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java index 0ac009712af2..95554c0c8099 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.nexmark; import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toMap; import static org.apache.beam.sdk.nexmark.NexmarkUtils.processingMode; import java.io.IOException; @@ -27,6 +26,7 @@ import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -42,6 +42,7 @@ import org.apache.beam.sdk.nexmark.model.Person; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher; +import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher.DataPoint; import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.Nullable; @@ -150,17 +151,9 @@ void runAll(String[] args) throws IOException { saveSummary(null, configurations, actual, baseline, start); } - final ImmutableMap schema = - ImmutableMap.builder() - .put("timestamp", "timestamp") - .put("runtimeSec", "float") - .put("eventsPerSec", "float") - .put("numResults", "integer") - .build(); - if (options.getExportSummaryToInfluxDB()) { final long timestamp = start.getMillis() / 1000; // seconds - savePerfsToInfluxDB(options, schema, actual, timestamp); + savePerfsToInfluxDB(options, actual, timestamp); } } finally { @@ -179,24 +172,18 @@ void runAll(String[] args) throws IOException { private static void 
savePerfsToInfluxDB( final NexmarkOptions options, - final Map schema, final Map results, final long timestamp) { final InfluxDBSettings settings = getInfluxSettings(options); - final Map tags = options.getInfluxTags(); - final String runner = options.getRunner().getSimpleName(); - final List> schemaResults = + final Map tags = + options.getInfluxTags() != null ? new HashMap<>(options.getInfluxTags()) : new HashMap<>(); + tags.put("runner", options.getRunner().getSimpleName()); + + final List dataPoints = results.entrySet().stream() - .map( - entry -> - getResultsFromSchema( - entry.getValue(), - schema, - timestamp, - runner, - produceMeasurement(options, entry))) + .map(entry -> createInfluxDBDataPoint(options, entry, tags, timestamp)) .collect(toList()); - InfluxDBPublisher.publishNexmarkResults(schemaResults, settings, tags); + InfluxDBPublisher.publish(settings, dataPoints); } private static InfluxDBSettings getInfluxSettings(final NexmarkOptions options) { @@ -208,38 +195,25 @@ private static InfluxDBSettings getInfluxSettings(final NexmarkOptions options) .get(); } - private static String produceMeasurement( - final NexmarkOptions options, Map.Entry entry) { + private static String generateMeasurementName( + final NexmarkOptions options, NexmarkConfiguration config) { final String queryName = - NexmarkUtils.fullQueryName( - options.getQueryLanguage(), entry.getKey().query.getNumberOrName()); + NexmarkUtils.fullQueryName(options.getQueryLanguage(), config.query.getNumberOrName()); return String.format( "%s_%s_%s", options.getBaseInfluxMeasurement(), queryName, processingMode(options.isStreaming())); } - private static Map getResultsFromSchema( - final NexmarkPerf results, - final Map schema, - final long timestamp, - final String runner, - final String measurement) { - final Map schemaResults = - results.toMap().entrySet().stream() - .filter(element -> schema.containsKey(element.getKey())) - .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)); - final int 
runtimeMs = - (int) ((double) schemaResults.get("runtimeSec") * 1000); // change sec to ms - schemaResults.put("timestamp", timestamp); - schemaResults.put("runner", runner); - schemaResults.put("measurement", measurement); - - // By default, InfluxDB treats all number values as floats. We need to add 'i' suffix to - // interpret the value as an integer. - schemaResults.put("runtimeMs", runtimeMs + "i"); - schemaResults.put("numResults", schemaResults.get("numResults") + "i"); - - return schemaResults; + private static InfluxDBPublisher.DataPoint createInfluxDBDataPoint( + final NexmarkOptions options, + final Map.Entry entry, + final Map tags, + final long timestamp) { + String measurement = generateMeasurementName(options, entry.getKey()); + int runtimeMs = (int) (entry.getValue().runtimeSec * 1000); // change sec to ms + Map fields = + ImmutableMap.of("runtimeMs", runtimeMs, "numResults", entry.getValue().numResults); + return InfluxDBPublisher.dataPoint(measurement, tags, fields, timestamp); } /** Append the pair of {@code configuration} and {@code perf} to perf file. 
*/ diff --git a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/NamedTestResult.java b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/NamedTestResult.java index 7855a292501d..72200ab54f9f 100644 --- a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/NamedTestResult.java +++ b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/NamedTestResult.java @@ -19,6 +19,7 @@ import com.google.cloud.bigquery.LegacySQLTypeName; import java.util.Map; +import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,10 +77,9 @@ public static NamedTestResult create( @Override public Map toMap() { return ImmutableMap.builder() - .put("test_id", testId) + .putAll(tags()) + .putAll(fields()) .put("timestamp", timestamp) - .put("metric", metric) - .put("value", value) .build(); } @@ -94,4 +94,17 @@ public String getMetric() { public double getValue() { return value; } + + public Map tags() { + return ImmutableMap.of("test_id", testId, "metric", metric); + } + + public Map fields() { + return ImmutableMap.of("value", value); + } + + /** Convert this result to an InfluxDB data point. 
*/ + public InfluxDBPublisher.DataPoint toInfluxDBDataPoint(String measurement) { + return InfluxDBPublisher.dataPoint(measurement, tags(), fields(), null); + } } diff --git a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java index 02c721e3eff0..d00580c88e58 100644 --- a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java +++ b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java @@ -20,9 +20,13 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.isNull; import static java.util.Objects.requireNonNull; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNoneBlank; +import com.google.auto.value.AutoValue; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; @@ -31,7 +35,14 @@ import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.apache.beam.sdk.testutils.NamedTestResult; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Collections2; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.apache.commons.compress.utils.Charsets; 
import org.apache.http.Header; import org.apache.http.HttpEntity; @@ -46,35 +57,87 @@ import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.util.EntityUtils; +import org.checkerframework.dataflow.qual.Pure; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public final class InfluxDBPublisher { private static final Logger LOG = LoggerFactory.getLogger(InfluxDBPublisher.class); private InfluxDBPublisher() {} + /** InfluxDB data point. */ + @AutoValue + public abstract static class DataPoint { + DataPoint() {} + + public abstract @Pure String measurement(); + + public abstract @Pure Map tags(); + + public abstract @Pure Map fields(); + + @Nullable + public abstract @Pure Long timestamp(); + + public abstract @Pure TimeUnit timestampUnit(); + + @Override + public final String toString() { + return append(new StringBuilder()).toString(); + } + + private @Nullable Long timestampSecs() { + return timestamp() != null ? timestampUnit().toSeconds(timestamp()) : null; + } + + private StringBuilder append(StringBuilder builder) { + return addMeasurement(builder, measurement(), tags(), fields(), timestampSecs()); + } + } + + /** Creates an InfluxDB data point using optional custom Unix timestamp in seconds. */ + public static DataPoint dataPoint( + String measurement, + Map tags, + Map fields, + @Nullable Long timestampSecs) { + return new AutoValue_InfluxDBPublisher_DataPoint( + measurement, tags, fields, timestampSecs, TimeUnit.SECONDS); + } + + /** @deprecated Use {@link #publish} instead. 
*/ + @Deprecated public static void publishNexmarkResults( final Collection> results, final InfluxDBSettings settings, final Map tags) { - publishWithCheck(settings, () -> publishNexmark(results, settings, tags)); + publishWithCheck(settings, nexmarkDataPoints(results, tags)); } public static void publishWithSettings( final Collection results, final InfluxDBSettings settings) { - publishWithCheck(settings, () -> publishCommon(results, settings)); + @SuppressWarnings("nullness") + Collection dataPoints = + Collections2.transform(results, res -> res.toInfluxDBDataPoint(settings.measurement)); + publish(settings, dataPoints); + } + + public static void publish( + final InfluxDBSettings settings, final Collection dataPoints) { + final StringBuilder builder = new StringBuilder(); + dataPoints.forEach(m -> m.append(builder).append('\n')); + publishWithCheck(settings, builder.toString()); } - private static void publishWithCheck( - final InfluxDBSettings settings, final PublishFunction publishFunction) { + private static void publishWithCheck(final InfluxDBSettings settings, final String data) { requireNonNull(settings, "InfluxDB settings must not be null"); if (isNoneBlank(settings.measurement, settings.database)) { try { - publishFunction.publish(); + final HttpClientBuilder builder = provideHttpBuilder(settings); + final HttpPost postRequest = providePOSTRequest(settings); + postRequest.setEntity(new GzipCompressingEntity(new ByteArrayEntity(data.getBytes(UTF_8)))); + executeWithVerification(postRequest, builder); } catch (Exception exception) { LOG.warn("Unable to publish metrics due to error: {}", exception.getMessage()); } @@ -83,73 +146,50 @@ private static void publishWithCheck( } } - private static void publishNexmark( - final Collection> results, - final InfluxDBSettings settings, - final Map tags) - throws Exception { - - final HttpClientBuilder builder = provideHttpBuilder(settings); - final HttpPost postRequest = providePOSTRequest(settings); - final 
StringBuilder metricBuilder = new StringBuilder(); - + /** @deprecated To be removed, kept for legacy interface {@link #publishNexmarkResults} */ + @VisibleForTesting + @Deprecated + static String nexmarkDataPoints( + final Collection> results, final Map tags) { + final StringBuilder builder = new StringBuilder(); + final Set fields = ImmutableSet.of("runtimeMs", "numResults"); results.forEach( map -> { - metricBuilder.append(map.get("measurement")).append(",").append(getKV(map, "runner")); - if (tags != null && !tags.isEmpty()) { - tags.entrySet().stream() - .forEach( - entry -> { - metricBuilder - .append(",") - .append(entry.getKey()) - .append("=") - .append(entry.getValue()); - }); - } - metricBuilder - .append(" ") - .append(getKV(map, "runtimeMs")) - .append(",") - .append(getKV(map, "numResults")) - .append(" ") - .append(map.get("timestamp")) + String measurement = checkArgumentNotNull(map.get("measurement")).toString(); + addMeasurement(builder, measurement, tags, filterKeys(map, fields), map.get("timestamp")) .append('\n'); }); + return builder.toString(); + } - postRequest.setEntity( - new GzipCompressingEntity(new ByteArrayEntity(metricBuilder.toString().getBytes(UTF_8)))); - - executeWithVerification(postRequest, builder); + @SuppressWarnings("nullness") + private static Map filterKeys(final Map map, final Set keys) { + return Maps.filterKeys(map, keys::contains); } - private static String getKV(final Map map, final String key) { - return key + "=" + map.get(key); + // fix types once nexmarkDataPoints is removed + private static StringBuilder addMeasurement( + StringBuilder builder, + String measurement, + Map tags, + Map fields, + @Nullable Object timestampSecs) { + checkState(!fields.isEmpty(), "fields cannot be empty"); + builder.append(measurement); + tags.forEach((k, v) -> builder.append(',').append(k).append('=').append(v)); + builder.append(' '); + fields.forEach((k, v) -> builder.append(k).append('=').append(fieldValue(v)).append(',')); + 
builder.setLength(builder.length() - 1); // skip last comma + if (timestampSecs != null) { + builder.append(' ').append(timestampSecs); + } + return builder; } - private static void publishCommon( - final Collection results, final InfluxDBSettings settings) throws Exception { - - final HttpClientBuilder builder = provideHttpBuilder(settings); - final HttpPost postRequest = providePOSTRequest(settings); - final StringBuilder metricBuilder = new StringBuilder(); - results.stream() - .map(NamedTestResult::toMap) - .forEach( - map -> - metricBuilder - .append(settings.measurement) - .append(",") - .append(getKV(map, "test_id")) - .append(",") - .append(getKV(map, "metric")) - .append(" ") - .append(getKV(map, "value")) - .append('\n')); - - postRequest.setEntity(new ByteArrayEntity(metricBuilder.toString().getBytes(UTF_8))); - - executeWithVerification(postRequest, builder); + private static String fieldValue(@Nullable Object value) { + checkStateNotNull(value, "field value cannot be null"); + // append 'i' suffix for 64-bit integer, default is float + return (value instanceof Integer || value instanceof Long) ? value + "i" : value.toString(); } private static HttpClientBuilder provideHttpBuilder(final InfluxDBSettings settings) { @@ -197,9 +237,4 @@ private static String getErrorMessage(final HttpEntity entity) throws IOExceptio new Gson().fromJson(EntityUtils.toString(entity, encoding), JsonObject.class).get("error"); return isNull(errorElement) ? 
"[Unable to get error message]" : errorElement.toString(); } - - @FunctionalInterface - private interface PublishFunction { - void publish() throws Exception; - } } diff --git a/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisherTest.java b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisherTest.java new file mode 100644 index 000000000000..b796af6a1063 --- /dev/null +++ b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisherTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.testutils.publishing; + +import static org.junit.Assert.assertEquals; + +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.testutils.NamedTestResult; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.junit.Test; + +public final class InfluxDBPublisherTest { + + @Test + public void testNexmarkDataPoints() { + Map measurement = + ImmutableMap.builder() + .put("measurement", "name") + .put("timestamp", 9999L) + .put("runtimeMs", "1000i") + .put("numResults", "10i") + .build(); + List> measurements = + ImmutableList.of(measurement, measurement, measurement); + + Map tags = + ImmutableMap.of( + "runner", "test", + "tag", "value"); + + String actual = InfluxDBPublisher.nexmarkDataPoints(measurements, tags); + String expected = "name,runner=test,tag=value runtimeMs=1000i,numResults=10i 9999\n"; + + assertEquals(expected + expected + expected, actual); + } + + @Test + public void testNamedTestResultToDataPoint() { + NamedTestResult result = NamedTestResult.create("id1", "9999", "metric1", 100); + + String actual = result.toInfluxDBDataPoint("name").toString(); + String expected = "name,test_id=id1,metric=metric1 value=100.0"; + + assertEquals(expected, actual); + } + + @Test + public void testDataPointToString() { + Map tags = ImmutableMap.of("tag1", "t1", "tag2", "t2"); + Map fields = ImmutableMap.of("integer", 100, "float", 100.0); + + assertEquals( + "m1,tag1=t1,tag2=t2 integer=100i,float=100.0 999", + InfluxDBPublisher.dataPoint("m1", tags, fields, 999L).toString()); + assertEquals( + "m1,tag1=t1,tag2=t2 integer=100i,float=100.0", + InfluxDBPublisher.dataPoint("m1", tags, fields, null).toString()); + } +}