Skip to content

Commit

Permalink
Stable JVM semconv implementation: threads
Browse files Browse the repository at this point in the history
  • Loading branch information
Mateusz Rzeszutek committed Nov 10, 2023
1 parent 31ad4b1 commit b59395d
Show file tree
Hide file tree
Showing 2 changed files with 248 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,30 @@

package io.opentelemetry.instrumentation.runtimemetrics.java8;

import static io.opentelemetry.api.common.AttributeKey.booleanKey;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static java.util.Objects.requireNonNull;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.instrumentation.api.internal.SemconvStability;
import io.opentelemetry.instrumentation.runtimemetrics.java8.internal.JmxRuntimeMetricsUtil;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;
import javax.annotation.Nullable;

/**
* Registers measurements that generate metrics about JVM threads.
Expand All @@ -36,7 +51,9 @@ public final class Threads {
// Visible for testing
static final Threads INSTANCE = new Threads();

static final AttributeKey<Boolean> DAEMON = AttributeKey.booleanKey("daemon");
static final AttributeKey<Boolean> DAEMON = booleanKey("daemon");
static final AttributeKey<Boolean> JVM_THREAD_DAEMON = booleanKey("jvm.thread.daemon");
static final AttributeKey<String> JVM_THREAD_STATE = stringKey("jvm.thread.state");

/** Register observers for java runtime class metrics. */
public static List<AutoCloseable> registerObservers(OpenTelemetry openTelemetry) {
Expand All @@ -47,22 +64,94 @@ public static List<AutoCloseable> registerObservers(OpenTelemetry openTelemetry)
List<AutoCloseable> registerObservers(OpenTelemetry openTelemetry, ThreadMXBean threadBean) {
Meter meter = JmxRuntimeMetricsUtil.getMeter(openTelemetry);
List<AutoCloseable> observables = new ArrayList<>();
observables.add(
meter
.upDownCounterBuilder("process.runtime.jvm.threads.count")
.setDescription("Number of executing threads")
.setUnit("{thread}")
.buildWithCallback(
observableMeasurement -> {
observableMeasurement.record(
threadBean.getDaemonThreadCount(),
Attributes.builder().put(DAEMON, true).build());
observableMeasurement.record(
threadBean.getThreadCount() - threadBean.getDaemonThreadCount(),
Attributes.builder().put(DAEMON, false).build());
}));

if (SemconvStability.emitOldJvmSemconv()) {
observables.add(
meter
.upDownCounterBuilder("process.runtime.jvm.threads.count")
.setDescription("Number of executing threads")
.setUnit("{thread}")
.buildWithCallback(
observableMeasurement -> {
observableMeasurement.record(
threadBean.getDaemonThreadCount(),
Attributes.builder().put(DAEMON, true).build());
observableMeasurement.record(
threadBean.getThreadCount() - threadBean.getDaemonThreadCount(),
Attributes.builder().put(DAEMON, false).build());
}));
}

if (SemconvStability.emitStableJvmSemconv()) {
observables.add(
meter
.upDownCounterBuilder("jvm.thread.count")
.setDescription("Number of executing platform threads.")
.setUnit("{thread}")
.buildWithCallback(
isJava9OrNewer()
? java9AndNewerCallback(threadBean)
: java8Callback(threadBean)));
}

return observables;
}

@Nullable private static final MethodHandle THREAD_INFO_IS_DAEMON;

static {
MethodHandle isDaemon;
try {
isDaemon =
MethodHandles.publicLookup()
.findVirtual(ThreadInfo.class, "isDaemon", MethodType.methodType(boolean.class));
} catch (NoSuchMethodException | IllegalAccessException e) {
isDaemon = null;
}
THREAD_INFO_IS_DAEMON = isDaemon;
}

private static boolean isJava9OrNewer() {
return THREAD_INFO_IS_DAEMON != null;
}

private static Consumer<ObservableLongMeasurement> java8Callback(ThreadMXBean threadBean) {
return measurement -> {
measurement.record(
threadBean.getDaemonThreadCount(),
Attributes.builder().put(JVM_THREAD_DAEMON, true).build());
measurement.record(
threadBean.getThreadCount() - threadBean.getDaemonThreadCount(),
Attributes.builder().put(JVM_THREAD_DAEMON, false).build());
};
}

private static Consumer<ObservableLongMeasurement> java9AndNewerCallback(
ThreadMXBean threadBean) {
return measurement -> {
Map<Attributes, Long> counts = new HashMap<>();
long[] threadIds = threadBean.getAllThreadIds();
for (ThreadInfo threadInfo : threadBean.getThreadInfo(threadIds)) {
if (threadInfo == null) {
continue;
}
Attributes threadAttributes = threadAttributes(threadInfo);
counts.compute(threadAttributes, (k, value) -> value == null ? 1 : value + 1);
}
counts.forEach((threadAttributes, count) -> measurement.record(count, threadAttributes));
};
}

private static Attributes threadAttributes(ThreadInfo threadInfo) {
boolean isDaemon;
try {
isDaemon = (boolean) requireNonNull(THREAD_INFO_IS_DAEMON).invoke(threadInfo);
} catch (Throwable e) {
throw new IllegalStateException("Unexpected error happened during ThreadInfo#isDaemon()", e);
}
String threadState = threadInfo.getThreadState().name().toLowerCase(Locale.ROOT);
return Attributes.of(JVM_THREAD_DAEMON, isDaemon, JVM_THREAD_STATE, threadState);
}

private Threads() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.runtimemetrics.java8;

import static io.opentelemetry.instrumentation.runtimemetrics.java8.ScopeUtil.EXPECTED_SCOPE;
import static io.opentelemetry.instrumentation.runtimemetrics.java8.Threads.JVM_THREAD_DAEMON;
import static io.opentelemetry.instrumentation.runtimemetrics.java8.Threads.JVM_THREAD_STATE;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledForJreRange;
import org.junit.jupiter.api.condition.EnabledOnJre;
import org.junit.jupiter.api.condition.JRE;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;

@ExtendWith(MockitoExtension.class)
class ThreadsStableSemconvTest {

@RegisterExtension
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();

@RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create();

@Mock private ThreadMXBean threadBean;

@Test
@EnabledOnJre(JRE.JAVA_8)
void registerObservers_Java8() {
when(threadBean.getThreadCount()).thenReturn(7);
when(threadBean.getDaemonThreadCount()).thenReturn(2);

Threads.INSTANCE
.registerObservers(testing.getOpenTelemetry(), threadBean)
.forEach(cleanup::deferCleanup);

testing.waitAndAssertMetrics(
"io.opentelemetry.runtime-telemetry-java8",
"jvm.thread.count",
metrics ->
metrics.anySatisfy(
metricData ->
assertThat(metricData)
.hasInstrumentationScope(EXPECTED_SCOPE)
.hasDescription("Number of executing platform threads.")
.hasUnit("{thread}")
.hasLongSumSatisfying(
sum ->
sum.isNotMonotonic()
.hasPointsSatisfying(
point ->
point
.hasValue(2)
.hasAttributesSatisfying(
equalTo(JVM_THREAD_DAEMON, true)),
point ->
point
.hasValue(5)
.hasAttributesSatisfying(
equalTo(JVM_THREAD_DAEMON, false))))));
}

@Test
@EnabledForJreRange(min = JRE.JAVA_9)
void registerObservers_Java9AndNewer() {
ThreadInfo threadInfo1 =
mock(ThreadInfo.class, new ThreadInfoAnswer(false, Thread.State.RUNNABLE));
ThreadInfo threadInfo2 =
mock(ThreadInfo.class, new ThreadInfoAnswer(true, Thread.State.WAITING));

long[] threadIds = {12, 32, 42};
when(threadBean.getAllThreadIds()).thenReturn(threadIds);
when(threadBean.getThreadInfo(threadIds))
.thenReturn(new ThreadInfo[] {threadInfo1, null, threadInfo2});

Threads.INSTANCE
.registerObservers(testing.getOpenTelemetry(), threadBean)
.forEach(cleanup::deferCleanup);

testing.waitAndAssertMetrics(
"io.opentelemetry.runtime-telemetry-java8",
"jvm.thread.count",
metrics ->
metrics.anySatisfy(
metricData ->
assertThat(metricData)
.hasInstrumentationScope(EXPECTED_SCOPE)
.hasDescription("Number of executing platform threads.")
.hasUnit("{thread}")
.hasLongSumSatisfying(
sum ->
sum.isNotMonotonic()
.hasPointsSatisfying(
point ->
point
.hasValue(1)
.hasAttributesSatisfying(
equalTo(JVM_THREAD_DAEMON, false),
equalTo(JVM_THREAD_STATE, "runnable")),
point ->
point
.hasValue(1)
.hasAttributesSatisfying(
equalTo(JVM_THREAD_DAEMON, true),
equalTo(JVM_THREAD_STATE, "waiting"))))));
}

static final class ThreadInfoAnswer implements Answer<Object> {

private final boolean isDaemon;
private final Thread.State state;

ThreadInfoAnswer(boolean isDaemon, Thread.State state) {
this.isDaemon = isDaemon;
this.state = state;
}

@Override
public Object answer(InvocationOnMock invocation) {
String methodName = invocation.getMethod().getName();
if (methodName.equals("isDaemon")) {
return isDaemon;
} else if (methodName.equals("getThreadState")) {
return state;
}
return null;
}
}
}

0 comments on commit b59395d

Please sign in to comment.