Skip to content

Commit

Permalink
Generalize interface of InfluxDBPublisher to support more use cases (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Moritz Mack authored Sep 5, 2022
1 parent 25c6ed7 commit 3c91e7b
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.beam.sdk.nexmark;

import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.beam.sdk.nexmark.NexmarkUtils.processingMode;

import java.io.IOException;
Expand All @@ -27,6 +26,7 @@
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -42,6 +42,7 @@
import org.apache.beam.sdk.nexmark.model.Person;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher;
import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher.DataPoint;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -150,17 +151,9 @@ void runAll(String[] args) throws IOException {
saveSummary(null, configurations, actual, baseline, start);
}

final ImmutableMap<String, String> schema =
ImmutableMap.<String, String>builder()
.put("timestamp", "timestamp")
.put("runtimeSec", "float")
.put("eventsPerSec", "float")
.put("numResults", "integer")
.build();

if (options.getExportSummaryToInfluxDB()) {
final long timestamp = start.getMillis() / 1000; // seconds
savePerfsToInfluxDB(options, schema, actual, timestamp);
savePerfsToInfluxDB(options, actual, timestamp);
}

} finally {
Expand All @@ -179,24 +172,18 @@ void runAll(String[] args) throws IOException {

private static void savePerfsToInfluxDB(
final NexmarkOptions options,
final Map<String, String> schema,
final Map<NexmarkConfiguration, NexmarkPerf> results,
final long timestamp) {
final InfluxDBSettings settings = getInfluxSettings(options);
final Map<String, String> tags = options.getInfluxTags();
final String runner = options.getRunner().getSimpleName();
final List<Map<String, Object>> schemaResults =
final Map<String, String> tags =
options.getInfluxTags() != null ? new HashMap<>(options.getInfluxTags()) : new HashMap<>();
tags.put("runner", options.getRunner().getSimpleName());

final List<DataPoint> dataPoints =
results.entrySet().stream()
.map(
entry ->
getResultsFromSchema(
entry.getValue(),
schema,
timestamp,
runner,
produceMeasurement(options, entry)))
.map(entry -> createInfluxDBDataPoint(options, entry, tags, timestamp))
.collect(toList());
InfluxDBPublisher.publishNexmarkResults(schemaResults, settings, tags);
InfluxDBPublisher.publish(settings, dataPoints);
}

private static InfluxDBSettings getInfluxSettings(final NexmarkOptions options) {
Expand All @@ -208,38 +195,25 @@ private static InfluxDBSettings getInfluxSettings(final NexmarkOptions options)
.get();
}

private static String produceMeasurement(
final NexmarkOptions options, Map.Entry<NexmarkConfiguration, NexmarkPerf> entry) {
private static String generateMeasurementName(
final NexmarkOptions options, NexmarkConfiguration config) {
final String queryName =
NexmarkUtils.fullQueryName(
options.getQueryLanguage(), entry.getKey().query.getNumberOrName());
NexmarkUtils.fullQueryName(options.getQueryLanguage(), config.query.getNumberOrName());
return String.format(
"%s_%s_%s",
options.getBaseInfluxMeasurement(), queryName, processingMode(options.isStreaming()));
}

private static Map<String, Object> getResultsFromSchema(
final NexmarkPerf results,
final Map<String, String> schema,
final long timestamp,
final String runner,
final String measurement) {
final Map<String, Object> schemaResults =
results.toMap().entrySet().stream()
.filter(element -> schema.containsKey(element.getKey()))
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
final int runtimeMs =
(int) ((double) schemaResults.get("runtimeSec") * 1000); // change sec to ms
schemaResults.put("timestamp", timestamp);
schemaResults.put("runner", runner);
schemaResults.put("measurement", measurement);

// By default, InfluxDB treats all number values as floats. We need to add 'i' suffix to
// interpret the value as an integer.
schemaResults.put("runtimeMs", runtimeMs + "i");
schemaResults.put("numResults", schemaResults.get("numResults") + "i");

return schemaResults;
private static InfluxDBPublisher.DataPoint createInfluxDBDataPoint(
final NexmarkOptions options,
final Map.Entry<NexmarkConfiguration, NexmarkPerf> entry,
final Map<String, String> tags,
final long timestamp) {
String measurement = generateMeasurementName(options, entry.getKey());
int runtimeMs = (int) (entry.getValue().runtimeSec * 1000); // change sec to ms
Map<String, Number> fields =
ImmutableMap.of("runtimeMs", runtimeMs, "numResults", entry.getValue().numResults);
return InfluxDBPublisher.dataPoint(measurement, tags, fields, timestamp);
}

/** Append the pair of {@code configuration} and {@code perf} to perf file. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.cloud.bigquery.LegacySQLTypeName;
import java.util.Map;
import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -76,10 +77,9 @@ public static NamedTestResult create(
@Override
public Map<String, Object> toMap() {
return ImmutableMap.<String, Object>builder()
.put("test_id", testId)
.putAll(tags())
.putAll(fields())
.put("timestamp", timestamp)
.put("metric", metric)
.put("value", value)
.build();
}

Expand All @@ -94,4 +94,17 @@ public String getMetric() {
public double getValue() {
return value;
}

public Map<String, String> tags() {
return ImmutableMap.of("test_id", testId, "metric", metric);
}

public Map<String, Number> fields() {
return ImmutableMap.of("value", value);
}

/** Convert this result to InfluxDB data point. */
public InfluxDBPublisher.DataPoint toInfluxDBDataPoint(String measurement) {
return InfluxDBPublisher.dataPoint(measurement, tags(), fields(), null);
}
}
Loading

0 comments on commit 3c91e7b

Please sign in to comment.