Skip to content

Commit

Permalink
Add Pulsar MessagingProducerMetrics (#11591)
Browse files Browse the repository at this point in the history
Co-authored-by: Lauri Tulmin <[email protected]>
Co-authored-by: Steve Rao <[email protected]>
  • Loading branch information
3 people authored Jul 9, 2024
1 parent ff5bf5d commit a46c8a0
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 1 deletion.
1 change: 1 addition & 0 deletions instrumentation-api-incubator/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
testImplementation(project(":testing-common"))
testImplementation("io.opentelemetry:opentelemetry-sdk")
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
testImplementation("io.opentelemetry.semconv:opentelemetry-semconv-incubating")
}

tasks {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.incubator.semconv.messaging;

import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.incubator.metrics.ExtendedDoubleHistogramBuilder;
import io.opentelemetry.api.metrics.DoubleHistogramBuilder;
import io.opentelemetry.semconv.ErrorAttributes;
import io.opentelemetry.semconv.ServerAttributes;
import java.util.List;

final class MessagingMetricsAdvice {
static final List<Double> DURATION_SECONDS_BUCKETS =
unmodifiableList(
asList(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0));

// copied from MessagingIncubatingAttributes
private static final AttributeKey<String> MESSAGING_SYSTEM =
AttributeKey.stringKey("messaging.system");
private static final AttributeKey<String> MESSAGING_DESTINATION_NAME =
AttributeKey.stringKey("messaging.destination.name");
private static final AttributeKey<String> MESSAGING_OPERATION =
AttributeKey.stringKey("messaging.operation");
private static final AttributeKey<Long> MESSAGING_BATCH_MESSAGE_COUNT =
AttributeKey.longKey("messaging.batch.message_count");

static void applyPublishDurationAdvice(DoubleHistogramBuilder builder) {
if (!(builder instanceof ExtendedDoubleHistogramBuilder)) {
return;
}
((ExtendedDoubleHistogramBuilder) builder)
.setAttributesAdvice(
asList(
MESSAGING_SYSTEM,
MESSAGING_DESTINATION_NAME,
MESSAGING_OPERATION,
MESSAGING_BATCH_MESSAGE_COUNT,
ErrorAttributes.ERROR_TYPE,
ServerAttributes.SERVER_PORT,
ServerAttributes.SERVER_ADDRESS));
}

private MessagingMetricsAdvice() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.incubator.semconv.messaging;

import static java.util.logging.Level.FINE;

import com.google.auto.value.AutoValue;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.DoubleHistogramBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.instrumentation.api.instrumenter.OperationListener;
import io.opentelemetry.instrumentation.api.instrumenter.OperationMetrics;
import io.opentelemetry.instrumentation.api.internal.OperationMetricsUtil;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
* {@link OperationListener} which keeps track of <a
* href="https://github.com/open-telemetry/semantic-conventions/blob/v1.26.0/docs/messaging/messaging-metrics.md#metric-messagingpublishduration">Producer
* metrics</a>.
*/
public final class MessagingProducerMetrics implements OperationListener {
private static final double NANOS_PER_S = TimeUnit.SECONDS.toNanos(1);

private static final ContextKey<MessagingProducerMetrics.State> MESSAGING_PRODUCER_METRICS_STATE =
ContextKey.named("messaging-producer-metrics-state");
private static final Logger logger = Logger.getLogger(MessagingProducerMetrics.class.getName());

private final DoubleHistogram publishDurationHistogram;

private MessagingProducerMetrics(Meter meter) {
DoubleHistogramBuilder durationBuilder =
meter
.histogramBuilder("messaging.publish.duration")
.setDescription("Measures the duration of publish operation.")
.setExplicitBucketBoundariesAdvice(MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS)
.setUnit("s");
MessagingMetricsAdvice.applyPublishDurationAdvice(durationBuilder);
publishDurationHistogram = durationBuilder.build();
}

public static OperationMetrics get() {
return OperationMetricsUtil.create("messaging produce", MessagingProducerMetrics::new);
}

@Override
@CanIgnoreReturnValue
public Context onStart(Context context, Attributes startAttributes, long startNanos) {
return context.with(
MESSAGING_PRODUCER_METRICS_STATE,
new AutoValue_MessagingProducerMetrics_State(startAttributes, startNanos));
}

@Override
public void onEnd(Context context, Attributes endAttributes, long endNanos) {
MessagingProducerMetrics.State state = context.get(MESSAGING_PRODUCER_METRICS_STATE);
if (state == null) {
logger.log(
FINE,
"No state present when ending context {0}. Cannot record produce publish metrics.",
context);
return;
}

Attributes attributes = state.startAttributes().toBuilder().putAll(endAttributes).build();

publishDurationHistogram.record(
(endNanos - state.startTimeNanos()) / NANOS_PER_S, attributes, context);
}

@AutoValue
abstract static class State {

abstract Attributes startAttributes();

abstract long startTimeNanos();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.incubator.semconv.messaging;

import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static org.assertj.core.api.Assertions.assertThat;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.OperationListener;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import io.opentelemetry.semconv.ServerAttributes;
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;

class MessagingProducerMetricsTest {

private static final double[] DURATION_BUCKETS =
MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS.stream().mapToDouble(d -> d).toArray();

@Test
void collectsMetrics() {
InMemoryMetricReader metricReader = InMemoryMetricReader.create();
SdkMeterProvider meterProvider =
SdkMeterProvider.builder().registerMetricReader(metricReader).build();

OperationListener listener = MessagingProducerMetrics.get().create(meterProvider.get("test"));

Attributes requestAttributes =
Attributes.builder()
.put(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "pulsar")
.put(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
"persistent://public/default/topic")
.put(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish")
.put(ServerAttributes.SERVER_PORT, 6650)
.put(ServerAttributes.SERVER_ADDRESS, "localhost")
.build();

Attributes responseAttributes =
Attributes.builder()
.put(MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID, "1:1:0:0")
.put(MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 2)
.build();

Context parent =
Context.root()
.with(
Span.wrap(
SpanContext.create(
"ff01020304050600ff0a0b0c0d0e0f00",
"090a0b0c0d0e0f00",
TraceFlags.getSampled(),
TraceState.getDefault())));

Context context1 = listener.onStart(parent, requestAttributes, nanos(100));

assertThat(metricReader.collectAllMetrics()).isEmpty();

Context context2 = listener.onStart(Context.root(), requestAttributes, nanos(150));

assertThat(metricReader.collectAllMetrics()).isEmpty();

listener.onEnd(context1, responseAttributes, nanos(250));

assertThat(metricReader.collectAllMetrics())
.satisfiesExactlyInAnyOrder(
metric ->
OpenTelemetryAssertions.assertThat(metric)
.hasName("messaging.publish.duration")
.hasUnit("s")
.hasDescription("Measures the duration of publish operation.")
.hasHistogramSatisfying(
histogram ->
histogram.hasPointsSatisfying(
point ->
point
.hasSum(0.15 /* seconds */)
.hasAttributesSatisfying(
equalTo(
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
"pulsar"),
equalTo(
MessagingIncubatingAttributes
.MESSAGING_DESTINATION_NAME,
"persistent://public/default/topic"),
equalTo(ServerAttributes.SERVER_PORT, 6650),
equalTo(ServerAttributes.SERVER_ADDRESS, "localhost"))
.hasExemplarsSatisfying(
exemplar ->
exemplar
.hasTraceId("ff01020304050600ff0a0b0c0d0e0f00")
.hasSpanId("090a0b0c0d0e0f00"))
.hasBucketBoundaries(DURATION_BUCKETS))));

listener.onEnd(context2, responseAttributes, nanos(300));

assertThat(metricReader.collectAllMetrics())
.satisfiesExactlyInAnyOrder(
metric ->
OpenTelemetryAssertions.assertThat(metric)
.hasName("messaging.publish.duration")
.hasHistogramSatisfying(
histogram ->
histogram.hasPointsSatisfying(
point -> point.hasSum(0.3 /* seconds */))));
}

private static long nanos(int millis) {
return TimeUnit.MILLISECONDS.toNanos(millis);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingProducerMetrics;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
Expand Down Expand Up @@ -136,7 +137,8 @@ private static Instrumenter<PulsarRequest, Void> createProducerInstrumenter() {
.addAttributesExtractor(
createMessagingAttributesExtractor(getter, MessageOperation.PUBLISH))
.addAttributesExtractor(
ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()));
ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()))
.addOperationMetrics(MessagingProducerMetrics.get());

if (AgentInstrumentationConfig.get()
.getBoolean("otel.instrumentation.pulsar.experimental-span-attributes", false)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.time.Duration;
Expand Down Expand Up @@ -76,6 +77,10 @@ abstract class AbstractPulsarClientTest {

private static final AttributeKey<String> MESSAGE_TYPE =
AttributeKey.stringKey("messaging.pulsar.message.type");
private static final double[] DURATION_BUCKETS =
new double[] {
0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0
};

@BeforeAll
static void beforeAll() throws PulsarClientException {
Expand Down Expand Up @@ -163,6 +168,26 @@ void testConsumeNonPartitionedTopicUsingBatchReceive() throws Exception {
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
batchReceiveAttributes(topic, null, false))));

assertThat(testing.metrics())
.satisfiesExactlyInAnyOrder(
metric ->
OpenTelemetryAssertions.assertThat(metric)
.hasName("messaging.publish.duration")
.hasUnit("s")
.hasDescription("Measures the duration of publish operation.")
.hasHistogramSatisfying(
histogram ->
histogram.hasPointsSatisfying(
point ->
point
.hasSumGreaterThan(0.0)
.hasAttributesSatisfying(
equalTo(MESSAGING_SYSTEM, "pulsar"),
equalTo(MESSAGING_DESTINATION_NAME, topic),
equalTo(SERVER_PORT, brokerPort),
equalTo(SERVER_ADDRESS, brokerHost))
.hasBucketBoundaries(DURATION_BUCKETS))));
}

@Test
Expand Down

0 comments on commit a46c8a0

Please sign in to comment.