Skip to content

Commit

Permalink
Publish results of JMH benchmark runs (Java SDK) to InfluxDB (part of #…
Browse files Browse the repository at this point in the history
…22238). (#23041)

* Publish results of JMH benchmark runs (Java SDK) to InfluxDB (#22238).

* review comments

* build global measurement name from JMH mode

* Apply suggestions from code review

Co-authored-by: Lukasz Cwik <[email protected]>

* Review feedback

Co-authored-by: Lukasz Cwik <[email protected]>
  • Loading branch information
Moritz Mack and lukecwik authored Sep 14, 2022
1 parent 9be9a43 commit 9d9db56
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ class BeamModulePlugin implements Plugin<Project> {
jamm : 'io.github.stephankoelle:jamm:0.4.1',
jaxb_api : "jakarta.xml.bind:jakarta.xml.bind-api:$jaxb_api_version",
jaxb_impl : "com.sun.xml.bind:jaxb-impl:$jaxb_api_version",
jmh_core : "org.openjdk.jmh:jmh-core:$jmh_version",
joda_time : "joda-time:joda-time:2.10.10",
jsonassert : "org.skyscreamer:jsonassert:1.5.0",
jsr305 : "com.google.code.findbugs:jsr305:$jsr305_version",
Expand Down Expand Up @@ -1393,8 +1394,9 @@ class BeamModulePlugin implements Plugin<Project> {

if (configuration.enableJmh) {
project.dependencies {
runtimeOnly it.project(path: ":sdks:java:testing:test-utils")
annotationProcessor "org.openjdk.jmh:jmh-generator-annprocess:$jmh_version"
implementation "org.openjdk.jmh:jmh-core:$jmh_version"
implementation project.library.java.jmh_core
}

project.compileJava {
Expand All @@ -1411,8 +1413,12 @@ class BeamModulePlugin implements Plugin<Project> {

project.tasks.register("jmh", JavaExec) {
dependsOn project.classes
mainClass = "org.openjdk.jmh.Main"
// Note: this wraps the default JMH runner and publishes the benchmark results to InfluxDB
mainClass = "org.apache.beam.sdk.testutils.jmh.Main"
classpath = project.sourceSets.main.runtimeClasspath

environment 'INFLUXDB_BASE_MEASUREMENT', 'java_jmh'

// For a list of arguments, see
// https://github.com/guozheng/jmh-tutorial/blob/master/README.md
//
Expand Down Expand Up @@ -1449,7 +1455,8 @@ class BeamModulePlugin implements Plugin<Project> {
// Note that these tests will fail on JVMs that JMH doesn't support.
def jmhTest = project.tasks.register("jmhTest", JavaExec) {
dependsOn project.classes
mainClass = "org.openjdk.jmh.Main"
// Note: this just delegates to the default JMH runner; single shot times are not published to InfluxDB
mainClass = "org.apache.beam.sdk.testutils.jmh.Main"
classpath = project.sourceSets.main.runtimeClasspath

// We filter for only Apache Beam benchmarks to ensure that we aren't
Expand Down
1 change: 1 addition & 0 deletions sdks/java/testing/test-utils/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ dependencies {
implementation library.java.http_client
implementation library.java.http_core
implementation library.java.slf4j_api
provided library.java.jmh_core

testImplementation library.java.junit
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadowTest")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.testutils.jmh;

import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.openjdk.jmh.annotations.Mode.SingleShotTime;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher;
import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher.DataPoint;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.infra.BenchmarkParams;
import org.openjdk.jmh.results.BenchmarkResult;
import org.openjdk.jmh.results.BenchmarkResultMetaData;
import org.openjdk.jmh.results.Result;
import org.openjdk.jmh.results.RunResult;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.CommandLineOptionException;
import org.openjdk.jmh.runner.options.CommandLineOptions;

/**
 * Custom main wrapper around the {@link Runner JMH runner} that supports publishing JMH benchmark
 * results to InfluxDB.
 *
 * <h3>Schema</h3>
 *
 * <p>The wrapper writes an aggregated InfluxDB datapoint for each benchmark to <b>measurement</b>
 * {@code {INFLUXDB_BASE_MEASUREMENT}_{mode}}. Typically this is {@code java_jmh_thrpt}.
 *
 * <p>The <b>timestamp</b> of the datapoint corresponds to the start time of the respective
 * benchmark. If JMH did not attach any metadata to the benchmark results, the start time is
 * unknown; in that case the datapoint is created without an explicit timestamp and the {@code
 * durationMs} field is omitted.
 *
 * <p>Individual timeseries are discriminated using the following <b>tags</b> including tags
 * corresponding to additional benchmark parameters in case of parameterized benchmarks:
 *
 * <ul>
 *   <li>{@code benchmark} (string): Fully qualified name of the benchmark
 *   <li>{@code scoreUnit} (string): JMH score unit
 *   <li>optionally, additional parameters in case of a parameterized benchmark (string)
 * </ul>
 *
 * <p>The following fields are captured for each benchmark:
 *
 * <ul>
 *   <li>{@code score} (float): JMH score
 *   <li>{@code scoreMean} (float): Mean score of all iterations
 *   <li>{@code scoreMedian} (float): Median score of all iterations
 *   <li>{@code scoreError} (float): Mean error of the score (omitted if it is not a number)
 *   <li>{@code sampleCount} (integer): Number of score samples
 *   <li>{@code durationMs} (integer): Total benchmark duration (including warmups), omitted if
 *       the benchmark metadata is not available
 * </ul>
 *
 * <h3>Configuration</h3>
 *
 * <p>Benchmark results are published to InfluxDB if both {@code INFLUXDB_DATABASE} and {@code
 * INFLUXDB_BASE_MEASUREMENT} are set. Otherwise, or if only single shot time benchmarks are
 * requested, or if one of the JMH help / list options is used, this will just delegate to the
 * default {@link org.openjdk.jmh.Main JMH Main} class.
 *
 * <p>Use the following environment variables to configure the publisher:
 *
 * <ul>
 *   <li>{@code INFLUXDB_HOST}: InfluxDB host (optional)
 *   <li>{@code INFLUXDB_DATABASE}: InfluxDB database
 *   <li>{@code INFLUXDB_USER}: InfluxDB user
 *   <li>{@code INFLUXDB_USER_PASSWORD}: InfluxDB user password
 *   <li>{@code INFLUXDB_BASE_MEASUREMENT}: Prefix for measurement name, the benchmark mode will be
 *       appended to this
 * </ul>
 */
public class Main {
  private static final String INFLUXDB_HOST = "INFLUXDB_HOST";
  private static final String INFLUXDB_DATABASE = "INFLUXDB_DATABASE";
  private static final String INFLUXDB_BASE_MEASUREMENT = "INFLUXDB_BASE_MEASUREMENT";

  /**
   * Runs the JMH benchmarks selected by {@code args} and publishes the results to InfluxDB if the
   * publisher is configured via environment variables.
   *
   * @param args JMH command line arguments
   * @throws CommandLineOptionException if {@code args} cannot be parsed by JMH
   * @throws IOException declared because the delegate {@link org.openjdk.jmh.Main JMH Main} may
   *     throw it
   * @throws RunnerException if running the benchmarks fails
   */
  public static void main(String[] args)
      throws CommandLineOptionException, IOException, RunnerException {
    final CommandLineOptions opts = new CommandLineOptions(args);

    final InfluxDBSettings influxDB = influxDBSettings();
    final String baseMeasurement = System.getenv(INFLUXDB_BASE_MEASUREMENT);

    if (influxDB == null
        || baseMeasurement == null
        || isSingleShotTimeOnly(opts.getBenchModes())
        || opts.shouldHelp()
        || opts.shouldList()
        || opts.shouldListWithParams()
        || opts.shouldListProfilers()
        || opts.shouldListResultFormats()) {
      // Nothing to publish (or publishing is not configured): delegate to JMH runner
      org.openjdk.jmh.Main.main(args);
      return;
    }

    final Runner runner = new Runner(opts);
    final Collection<RunResult> results = runner.run();

    // Single shot times are never published, even if mixed with other benchmark modes.
    final Collection<DataPoint> dataPoints =
        results.stream()
            .filter(r -> r.getParams().getMode() != SingleShotTime)
            .map(r -> dataPoint(baseMeasurement, r))
            .collect(toList());

    InfluxDBPublisher.publish(influxDB, dataPoints);
  }

  /** Returns true if benchmark modes are set explicitly and all of them are single shot time. */
  private static boolean isSingleShotTimeOnly(Collection<Mode> modes) {
    return !modes.isEmpty() && modes.stream().allMatch(SingleShotTime::equals);
  }

  /**
   * Converts the aggregated result of a benchmark into an InfluxDB datapoint.
   *
   * <p>The timestamp and {@code durationMs} are derived from the metadata of the benchmark
   * results. Both are left out if no metadata is available rather than failing the entire run
   * after all benchmarks have completed.
   *
   * @param baseMeasurement prefix of the measurement name, the short label of the benchmark mode
   *     is appended to it
   * @param run aggregated result of a benchmark
   * @return the datapoint to publish
   */
  private static DataPoint dataPoint(String baseMeasurement, RunResult run) {
    final BenchmarkParams params = run.getParams();
    final Result<?> result = run.getPrimaryResult();

    final String measurement =
        String.format("%s_%s", baseMeasurement, params.getMode().shortLabel());

    final Map<String, String> tags = new HashMap<>();
    tags.put("benchmark", params.getBenchmark());
    tags.put("scoreUnit", result.getScoreUnit());
    // add params of parameterized benchmarks as tags
    tags.putAll(params.getParamsKeys().stream().collect(toMap(identity(), params::getParam)));

    final Map<String, Number> fields = new HashMap<>();
    fields.put("score", result.getScore());
    fields.put("scoreMean", result.getStatistics().getMean());
    fields.put("scoreMedian", result.getStatistics().getPercentile(0.5));
    if (!Double.isNaN(result.getScoreError())) {
      fields.put("scoreError", result.getScoreError());
    }
    fields.put("sampleCount", result.getSampleCount());

    // Metadata may be missing on benchmark results, in that case neither start time nor duration
    // can be determined.
    final Collection<BenchmarkResultMetaData> metaData = metaDataStream(run).collect(toList());
    @Nullable Long startTimeMs = null;
    if (!metaData.isEmpty()) {
      long minStartMs = Long.MAX_VALUE;
      long maxStopMs = Long.MIN_VALUE;
      for (BenchmarkResultMetaData md : metaData) {
        minStartMs = Math.min(minStartMs, md.getStartTime());
        maxStopMs = Math.max(maxStopMs, md.getStopTime());
      }
      startTimeMs = minStartMs;
      fields.put("durationMs", maxStopMs - minStartMs);
    }

    return InfluxDBPublisher.dataPoint(
        measurement, tags, fields, startTimeMs, TimeUnit.MILLISECONDS);
  }

  /** Streams the metadata of all benchmark results of a run, skipping results without any. */
  private static Stream<BenchmarkResultMetaData> metaDataStream(RunResult runResult) {
    return runResult.getBenchmarkResults().stream()
        .map(BenchmarkResult::getMetadata)
        .filter(Objects::nonNull);
  }

  /**
   * Construct InfluxDB settings from environment variables to not mess with JMH args.
   *
   * @return the settings, or {@code null} if {@code INFLUXDB_DATABASE} is not set
   */
  private static @Nullable InfluxDBSettings influxDBSettings() {
    String host = System.getenv(INFLUXDB_HOST);
    String database = System.getenv(INFLUXDB_DATABASE);

    if (database == null) {
      return null;
    }

    InfluxDBSettings.Builder builder = InfluxDBSettings.builder();
    if (host != null) {
      builder.withHost(host); // default to localhost otherwise
    }
    return builder.withDatabase(database).get();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Tools for running JMH benchmarks and collecting results. */
package org.apache.beam.sdk.testutils.jmh;
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,17 @@ public static DataPoint dataPoint(
measurement, tags, fields, timestampSecs, TimeUnit.SECONDS);
}

/**
 * Creates an InfluxDB data point.
 *
 * @param measurement name of the measurement
 * @param tags tags of the data point
 * @param fields fields of the data point
 * @param timestamp timestamp of the data point expressed in {@code timestampUnit}, may be {@code
 *     null}
 * @param timestampUnit time unit of {@code timestamp}
 * @return the new data point
 */
public static DataPoint dataPoint(
    String measurement,
    Map<String, String> tags,
    Map<String, Number> fields,
    @Nullable Long timestamp,
    TimeUnit timestampUnit) {
  final DataPoint point =
      new AutoValue_InfluxDBPublisher_DataPoint(
          measurement, tags, fields, timestamp, timestampUnit);
  return point;
}

/** @deprecated Use {@link #publish} instead. */
@Deprecated
public static void publishNexmarkResults(
Expand Down

0 comments on commit 9d9db56

Please sign in to comment.