From 745d99420823ebaac5c8af19f54f3071ab3a5322 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Sun, 14 Jul 2024 10:37:29 -0700 Subject: [PATCH] Revert "Add Pulsar MessagingProducerMetrics (#11591)" This reverts commit a46c8a0c7c8695221c0daa5e3792f207504b7664. --- .../build.gradle.kts | 1 - .../messaging/MessagingMetricsAdvice.java | 50 ------- .../messaging/MessagingProducerMetrics.java | 85 ------------ .../MessagingProducerMetricsTest.java | 122 ------------------ .../v2_8/telemetry/PulsarSingletons.java | 4 +- .../pulsar/v2_8/AbstractPulsarClientTest.java | 25 ---- 6 files changed, 1 insertion(+), 286 deletions(-) delete mode 100644 instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java delete mode 100644 instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java delete mode 100644 instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java diff --git a/instrumentation-api-incubator/build.gradle.kts b/instrumentation-api-incubator/build.gradle.kts index 00d887da1b25..6acb75e5f79d 100644 --- a/instrumentation-api-incubator/build.gradle.kts +++ b/instrumentation-api-incubator/build.gradle.kts @@ -21,7 +21,6 @@ dependencies { testImplementation(project(":testing-common")) testImplementation("io.opentelemetry:opentelemetry-sdk") testImplementation("io.opentelemetry:opentelemetry-sdk-testing") - testImplementation("io.opentelemetry.semconv:opentelemetry-semconv-incubating") } tasks { diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java deleted file mode 100644 index 8ada61c1683f..000000000000 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.api.incubator.semconv.messaging; - -import static java.util.Arrays.asList; -import static java.util.Collections.unmodifiableList; - -import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.api.incubator.metrics.ExtendedDoubleHistogramBuilder; -import io.opentelemetry.api.metrics.DoubleHistogramBuilder; -import io.opentelemetry.semconv.ErrorAttributes; -import io.opentelemetry.semconv.ServerAttributes; -import java.util.List; - -final class MessagingMetricsAdvice { - static final List DURATION_SECONDS_BUCKETS = - unmodifiableList( - asList(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0)); - - // copied from MessagingIncubatingAttributes - private static final AttributeKey MESSAGING_SYSTEM = - AttributeKey.stringKey("messaging.system"); - private static final AttributeKey MESSAGING_DESTINATION_NAME = - AttributeKey.stringKey("messaging.destination.name"); - private static final AttributeKey MESSAGING_OPERATION = - AttributeKey.stringKey("messaging.operation"); - private static final AttributeKey MESSAGING_BATCH_MESSAGE_COUNT = - AttributeKey.longKey("messaging.batch.message_count"); - - static void applyPublishDurationAdvice(DoubleHistogramBuilder builder) { - if (!(builder instanceof ExtendedDoubleHistogramBuilder)) { - return; - } - ((ExtendedDoubleHistogramBuilder) builder) - .setAttributesAdvice( - asList( - MESSAGING_SYSTEM, - MESSAGING_DESTINATION_NAME, - MESSAGING_OPERATION, - MESSAGING_BATCH_MESSAGE_COUNT, - ErrorAttributes.ERROR_TYPE, - ServerAttributes.SERVER_PORT, - ServerAttributes.SERVER_ADDRESS)); - } - - private MessagingMetricsAdvice() {} -} diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java deleted file mode 100644 index 44d5b243744a..000000000000 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.api.incubator.semconv.messaging; - -import static java.util.logging.Level.FINE; - -import com.google.auto.value.AutoValue; -import com.google.errorprone.annotations.CanIgnoreReturnValue; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.DoubleHistogram; -import io.opentelemetry.api.metrics.DoubleHistogramBuilder; -import io.opentelemetry.api.metrics.Meter; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.ContextKey; -import io.opentelemetry.instrumentation.api.instrumenter.OperationListener; -import io.opentelemetry.instrumentation.api.instrumenter.OperationMetrics; -import io.opentelemetry.instrumentation.api.internal.OperationMetricsUtil; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; - -/** - * {@link OperationListener} which keeps track of Producer - * metrics. - */ -public final class MessagingProducerMetrics implements OperationListener { - private static final double NANOS_PER_S = TimeUnit.SECONDS.toNanos(1); - - private static final ContextKey MESSAGING_PRODUCER_METRICS_STATE = - ContextKey.named("messaging-producer-metrics-state"); - private static final Logger logger = Logger.getLogger(MessagingProducerMetrics.class.getName()); - - private final DoubleHistogram publishDurationHistogram; - - private MessagingProducerMetrics(Meter meter) { - DoubleHistogramBuilder durationBuilder = - meter - .histogramBuilder("messaging.publish.duration") - .setDescription("Measures the duration of publish operation.") - .setExplicitBucketBoundariesAdvice(MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS) - .setUnit("s"); - MessagingMetricsAdvice.applyPublishDurationAdvice(durationBuilder); - publishDurationHistogram = durationBuilder.build(); - } - - public static OperationMetrics get() { - return OperationMetricsUtil.create("messaging produce", MessagingProducerMetrics::new); - } - - @Override - @CanIgnoreReturnValue - public Context onStart(Context context, Attributes startAttributes, long startNanos) { - return context.with( - MESSAGING_PRODUCER_METRICS_STATE, - new AutoValue_MessagingProducerMetrics_State(startAttributes, startNanos)); - } - - @Override - public void onEnd(Context context, Attributes endAttributes, long endNanos) { - MessagingProducerMetrics.State state = context.get(MESSAGING_PRODUCER_METRICS_STATE); - if (state == null) { - logger.log( - FINE, - "No state present when ending context {0}. Cannot record produce publish metrics.", - context); - return; - } - - Attributes attributes = state.startAttributes().toBuilder().putAll(endAttributes).build(); - - publishDurationHistogram.record( - (endNanos - state.startTimeNanos()) / NANOS_PER_S, attributes, context); - } - - @AutoValue - abstract static class State { - - abstract Attributes startAttributes(); - - abstract long startTimeNanos(); - } -} diff --git a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java deleted file mode 100644 index 491387253ee9..000000000000 --- a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.api.incubator.semconv.messaging; - -import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; -import static org.assertj.core.api.Assertions.assertThat; - -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanContext; -import io.opentelemetry.api.trace.TraceFlags; -import io.opentelemetry.api.trace.TraceState; -import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.api.instrumenter.OperationListener; -import io.opentelemetry.sdk.metrics.SdkMeterProvider; -import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; -import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; -import io.opentelemetry.semconv.ServerAttributes; -import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; -import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.Test; - -class MessagingProducerMetricsTest { - - private static final double[] DURATION_BUCKETS = - MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS.stream().mapToDouble(d -> d).toArray(); - - @Test - void collectsMetrics() { - InMemoryMetricReader metricReader = InMemoryMetricReader.create(); - SdkMeterProvider meterProvider = - SdkMeterProvider.builder().registerMetricReader(metricReader).build(); - - OperationListener listener = MessagingProducerMetrics.get().create(meterProvider.get("test")); - - Attributes requestAttributes = - Attributes.builder() - .put(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "pulsar") - .put( - MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, - "persistent://public/default/topic") - .put(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish") - .put(ServerAttributes.SERVER_PORT, 6650) - .put(ServerAttributes.SERVER_ADDRESS, "localhost") - .build(); - - Attributes responseAttributes = - Attributes.builder() - .put(MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID, "1:1:0:0") - .put(MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 2) - .build(); - - Context parent = - Context.root() - .with( - Span.wrap( - SpanContext.create( - "ff01020304050600ff0a0b0c0d0e0f00", - "090a0b0c0d0e0f00", - TraceFlags.getSampled(), - TraceState.getDefault()))); - - Context context1 = listener.onStart(parent, requestAttributes, nanos(100)); - - assertThat(metricReader.collectAllMetrics()).isEmpty(); - - Context context2 = listener.onStart(Context.root(), requestAttributes, nanos(150)); - - assertThat(metricReader.collectAllMetrics()).isEmpty(); - - listener.onEnd(context1, responseAttributes, nanos(250)); - - assertThat(metricReader.collectAllMetrics()) - .satisfiesExactlyInAnyOrder( - metric -> - OpenTelemetryAssertions.assertThat(metric) - .hasName("messaging.publish.duration") - .hasUnit("s") - .hasDescription("Measures the duration of publish operation.") - .hasHistogramSatisfying( - histogram -> - histogram.hasPointsSatisfying( - point -> - point - .hasSum(0.15 /* seconds */) - .hasAttributesSatisfying( - equalTo( - MessagingIncubatingAttributes.MESSAGING_SYSTEM, - "pulsar"), - equalTo( - MessagingIncubatingAttributes - .MESSAGING_DESTINATION_NAME, - "persistent://public/default/topic"), - equalTo(ServerAttributes.SERVER_PORT, 6650), - equalTo(ServerAttributes.SERVER_ADDRESS, "localhost")) - .hasExemplarsSatisfying( - exemplar -> - exemplar - .hasTraceId("ff01020304050600ff0a0b0c0d0e0f00") - .hasSpanId("090a0b0c0d0e0f00")) - .hasBucketBoundaries(DURATION_BUCKETS)))); - - listener.onEnd(context2, responseAttributes, nanos(300)); - - assertThat(metricReader.collectAllMetrics()) - .satisfiesExactlyInAnyOrder( - metric -> - OpenTelemetryAssertions.assertThat(metric) - .hasName("messaging.publish.duration") - .hasHistogramSatisfying( - histogram -> - histogram.hasPointsSatisfying( - point -> point.hasSum(0.3 /* seconds */)))); - } - - private static long nanos(int millis) { - return TimeUnit.MILLISECONDS.toNanos(millis); - } -} diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java index 1ab6aa3f48ac..1d05086ef401 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java @@ -13,7 +13,6 @@ import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; -import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingProducerMetrics; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; @@ -137,8 +136,7 @@ private static Instrumenter createProducerInstrumenter() { .addAttributesExtractor( createMessagingAttributesExtractor(getter, MessageOperation.PUBLISH)) .addAttributesExtractor( - ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())) - .addOperationMetrics(MessagingProducerMetrics.get()); + ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())); if (AgentInstrumentationConfig.get() .getBoolean("otel.instrumentation.pulsar.experimental-span-attributes", false)) { diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java index 7883d5082c4f..f32daa20e074 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java @@ -22,7 +22,6 @@ import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; -import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; import java.time.Duration; @@ -77,10 +76,6 @@ abstract class AbstractPulsarClientTest { private static final AttributeKey MESSAGE_TYPE = AttributeKey.stringKey("messaging.pulsar.message.type"); - private static final double[] DURATION_BUCKETS = - new double[] { - 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0 - }; @BeforeAll static void beforeAll() throws PulsarClientException { @@ -168,26 +163,6 @@ void testConsumeNonPartitionedTopicUsingBatchReceive() throws Exception { .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( batchReceiveAttributes(topic, null, false)))); - - assertThat(testing.metrics()) - .satisfiesExactlyInAnyOrder( - metric -> - OpenTelemetryAssertions.assertThat(metric) - .hasName("messaging.publish.duration") - .hasUnit("s") - .hasDescription("Measures the duration of publish operation.") - .hasHistogramSatisfying( - histogram -> - histogram.hasPointsSatisfying( - point -> - point - .hasSumGreaterThan(0.0) - .hasAttributesSatisfying( - equalTo(MESSAGING_SYSTEM, "pulsar"), - equalTo(MESSAGING_DESTINATION_NAME, topic), - equalTo(SERVER_PORT, brokerPort), - equalTo(SERVER_ADDRESS, brokerHost)) - .hasBucketBoundaries(DURATION_BUCKETS)))); } @Test