From 0a387c97979302007f5b0125dbe4f56ebed05db4 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Fri, 15 Nov 2024 02:32:04 +0800 Subject: [PATCH 01/10] Support send txn message. --- .../v2_8/PulsarInstrumentationModule.java | 3 +- .../v2_8/TransactionImplInstrumentation.java | 46 +++++++++++++++++++ .../v2_8/telemetry/PulsarSingletons.java | 38 +++++++++++++++ .../pulsar/v2_8/AbstractPulsarClientTest.java | 3 +- .../pulsar/v2_8/PulsarClientTest.java | 30 ++++++++++++ 5 files changed, 118 insertions(+), 2 deletions(-) create mode 100644 instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java index a6ed299c64f6..de8d784d25c9 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java @@ -24,6 +24,7 @@ public List typeInstrumentations() { new ConsumerImplInstrumentation(), new ProducerImplInstrumentation(), new MessageInstrumentation(), - new MessageListenerInstrumentation()); + new MessageListenerInstrumentation(), + new TransactionImplInstrumentation()); } } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java new file mode 100644 index 000000000000..e55267f3d89c --- /dev/null +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java @@ -0,0 +1,46 @@ +package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8; + +import io.opentelemetry.instrumentation.api.internal.Timer; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarSingletons; +import java.util.concurrent.CompletableFuture; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +public class TransactionImplInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.pulsar.client.impl.ProducerImpl"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod(named("registerProducedTopic") + .and(isPublic()) + .and(takesArguments(1)) + .and(takesArgument(0, named("java.lang.String"))), + TransactionImplInstrumentation.class.getName() + "$RegisterProducedTopicAdvice"); + } + + @SuppressWarnings("unused") + public static class RegisterProducedTopicAdvice { + + @Advice.OnMethodEnter + public static Timer before() { + return Timer.start(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void after(@Advice.Enter Timer timer, + @Advice.Return(readOnly = false) CompletableFuture future) { + future = PulsarSingletons.wrap(future, timer); + } + } +} diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java index 1ab6aa3f48ac..392a20b03b77 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java @@ -52,6 +52,8 @@ public final class PulsarSingletons { createConsumerBatchReceiveInstrumenter(); private static final Instrumenter PRODUCER_INSTRUMENTER = createProducerInstrumenter(); + private static final Instrumenter TXN_PRODUCE_INSTRUMENTER = + createTxnProduceInstrumenter(); public static Instrumenter consumerProcessInstrumenter() { return CONSUMER_PROCESS_INSTRUMENTER; @@ -148,6 +150,23 @@ private static Instrumenter createProducerInstrumenter() { return builder.buildProducerInstrumenter(MessageTextMapSetter.INSTANCE); } + private static Instrumenter createTxnProduceInstrumenter() { + InstrumenterBuilder instrumenterBuilder = + Instrumenter.builder( + TELEMETRY, + INSTRUMENTATION_NAME, request -> "Txn Produce Register Topic"); + return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysProducer()); + } + + public static Context startAndEndTxnProduceRegister(Context parent, Timer timer, + Throwable throwable) { + if (!TXN_PRODUCE_INSTRUMENTER.shouldStart(parent, "")) { + return null; + } + return InstrumenterUtil.startAndEnd( + TXN_PRODUCE_INSTRUMENTER, parent, "", null, throwable, timer.startTime(), timer.now()); + } + private static AttributesExtractor createMessagingAttributesExtractor( MessagingAttributesGetter getter, MessageOperation operation) { return MessagingAttributesExtractor.builder(getter, operation) @@ -207,6 +226,25 @@ private static Context startAndEndConsumerReceive( timer.now()); } + public static CompletableFuture wrap(CompletableFuture future, Timer timer) { + Context parent = Context.current(); + CompletableFuture result = new CompletableFuture<>(); + future.whenComplete((unused, t) -> { + Context context = startAndEndTxnProduceRegister(parent, timer, t); + runWithContext( + context, + () -> { + if (t != null) { + result.completeExceptionally(t); + } else { + result.complete(null); + } + }); + }); + + return result; + } + public static CompletableFuture> wrap( CompletableFuture> future, Timer timer, Consumer consumer) { boolean listenerContextActive = MessageListenerContext.isProcessing(); diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java index 7883d5082c4f..404d84bce5cf 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java @@ -93,7 +93,8 @@ static void beforeAll() throws PulsarClientException { brokerHost = pulsar.getHost(); brokerPort = pulsar.getMappedPort(6650); - client = PulsarClient.builder().serviceUrl(pulsar.getPulsarBrokerUrl()).build(); + client = PulsarClient.builder().serviceUrl(pulsar.getPulsarBrokerUrl()) + .enableTransaction(true).build(); admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build(); } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java index 4ffb30469265..bfaaa5f54fb9 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java @@ -20,6 +20,7 @@ import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.transaction.Transaction; import org.junit.jupiter.api.Test; class PulsarClientTest extends AbstractPulsarClientTest { @@ -444,4 +445,33 @@ void testConsumeMultiTopics() throws Exception { .hasAttributesSatisfyingExactly( processAttributes(topic2, msgId2.toString(), false)))); } + + @Test + void testSendMessageWithTxn() throws Exception { + String topic = "persistent://public/default/testSendMessageWithTxn"; + admin.topics().createNonPartitionedTopic(topic); + producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + Transaction txn = client.newTransaction() + .withTransactionTimeout(5, TimeUnit.SECONDS).build().get(); + + testing.runWithSpan("parent1", + () -> producer.newMessage(txn).value("test1").send()); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("parent1") + .hasKind(SpanKind.INTERNAL) + .hasNoParent(), + span -> + span.hasName("Txn Produce Register Topic") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)), + span -> + span.hasName(topic + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(1)) + )); + } } From 48bc01635d516213b80dec7b2a6de406c8a04387 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Fri, 15 Nov 2024 02:51:04 +0800 Subject: [PATCH 02/10] fix codestyle --- .../v2_8/TransactionImplInstrumentation.java | 27 ++++++++++------ .../v2_8/telemetry/PulsarSingletons.java | 32 +++++++++---------- .../pulsar/v2_8/AbstractPulsarClientTest.java | 7 ++-- .../pulsar/v2_8/PulsarClientTest.java | 15 +++------ 4 files changed, 43 insertions(+), 38 deletions(-) diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java index e55267f3d89c..2b1df69e258d 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java @@ -1,5 +1,15 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + import io.opentelemetry.instrumentation.api.internal.Timer; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; @@ -9,11 +19,6 @@ import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -import static net.bytebuddy.matcher.ElementMatchers.isPublic; -import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.takesArgument; -import static net.bytebuddy.matcher.ElementMatchers.takesArguments; - public class TransactionImplInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { @@ -22,10 +27,11 @@ public ElementMatcher typeMatcher() { @Override public void transform(TypeTransformer transformer) { - transformer.applyAdviceToMethod(named("registerProducedTopic") - .and(isPublic()) - .and(takesArguments(1)) - .and(takesArgument(0, named("java.lang.String"))), + transformer.applyAdviceToMethod( + named("registerProducedTopic") + .and(isPublic()) + .and(takesArguments(1)) + .and(takesArgument(0, named("java.lang.String"))), TransactionImplInstrumentation.class.getName() + "$RegisterProducedTopicAdvice"); } @@ -38,7 +44,8 @@ public static Timer before() { } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void after(@Advice.Enter Timer timer, + public static void after( + @Advice.Enter Timer timer, @Advice.Return(readOnly = false) CompletableFuture future) { future = PulsarSingletons.wrap(future, timer); } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java index 7802b20f826a..86bcbfae64c6 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java @@ -156,13 +156,12 @@ private static Instrumenter createProducerInstrumenter() { private static Instrumenter createTxnProduceInstrumenter() { InstrumenterBuilder instrumenterBuilder = Instrumenter.builder( - TELEMETRY, - INSTRUMENTATION_NAME, request -> "Txn Produce Register Topic"); + TELEMETRY, INSTRUMENTATION_NAME, request -> "Txn Produce Register Topic"); return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysProducer()); } - public static Context startAndEndTxnProduceRegister(Context parent, Timer timer, - Throwable throwable) { + public static Context startAndEndTxnProduceRegister( + Context parent, Timer timer, Throwable throwable) { if (!TXN_PRODUCE_INSTRUMENTER.shouldStart(parent, "")) { return null; } @@ -232,18 +231,19 @@ private static Context startAndEndConsumerReceive( public static CompletableFuture wrap(CompletableFuture future, Timer timer) { Context parent = Context.current(); CompletableFuture result = new CompletableFuture<>(); - future.whenComplete((unused, t) -> { - Context context = startAndEndTxnProduceRegister(parent, timer, t); - runWithContext( - context, - () -> { - if (t != null) { - result.completeExceptionally(t); - } else { - result.complete(null); - } - }); - }); + future.whenComplete( + (unused, t) -> { + Context context = startAndEndTxnProduceRegister(parent, timer, t); + runWithContext( + context, + () -> { + if (t != null) { + result.completeExceptionally(t); + } else { + result.complete(null); + } + }); + }); return result; } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java index ce2da473c879..6e70b15c5249 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java @@ -94,8 +94,11 @@ static void beforeAll() throws PulsarClientException { brokerHost = pulsar.getHost(); brokerPort = pulsar.getMappedPort(6650); - client = PulsarClient.builder().serviceUrl(pulsar.getPulsarBrokerUrl()) - .enableTransaction(true).build(); + client = + PulsarClient.builder() + .serviceUrl(pulsar.getPulsarBrokerUrl()) + .enableTransaction(true) + .build(); admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build(); } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java index a914a90c0435..67be73e547d1 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java @@ -678,19 +678,15 @@ void testSendMessageWithTxn() throws Exception { String topic = "persistent://public/default/testSendMessageWithTxn"; admin.topics().createNonPartitionedTopic(topic); producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); - Transaction txn = client.newTransaction() - .withTransactionTimeout(5, TimeUnit.SECONDS).build().get(); + Transaction txn = + client.newTransaction().withTransactionTimeout(5, TimeUnit.SECONDS).build().get(); - testing.runWithSpan("parent1", - () -> producer.newMessage(txn).value("test1").send()); + testing.runWithSpan("parent1", () -> producer.newMessage(txn).value("test1").send()); testing.waitAndAssertTraces( trace -> trace.hasSpansSatisfyingExactly( - span -> - span.hasName("parent1") - .hasKind(SpanKind.INTERNAL) - .hasNoParent(), + span -> span.hasName("parent1").hasKind(SpanKind.INTERNAL).hasNoParent(), span -> span.hasName("Txn Produce Register Topic") .hasKind(SpanKind.PRODUCER) @@ -698,7 +694,6 @@ void testSendMessageWithTxn() throws Exception { span -> span.hasName(topic + " publish") .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(1)) - )); + .hasParent(trace.getSpan(1)))); } } From 1664a14b9588d44e002e8d16eae98760dccbc51c Mon Sep 17 00:00:00 2001 From: dao-jun Date: Fri, 15 Nov 2024 03:39:15 +0800 Subject: [PATCH 03/10] enable pulsar txn coordinator --- .../pulsar/v2_8/AbstractPulsarClientTest.java | 2 ++ .../instrumentation/pulsar/v2_8/PulsarClientTest.java | 5 +++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java index 6e70b15c5249..f5684222c33e 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java @@ -88,6 +88,8 @@ static void beforeAll() throws PulsarClientException { pulsar = new PulsarContainer(DEFAULT_IMAGE_NAME) .withEnv("PULSAR_MEM", "-Xmx128m") + .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true") + .withEnv("PULSAR_PREFIX_transactionMetadataStoreProviderClassName", "org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider") .withLogConsumer(new Slf4jLogConsumer(logger)) .withStartupTimeout(Duration.ofMinutes(2)); pulsar.start(); diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java index 67be73e547d1..bd1f6843e397 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java @@ -677,12 +677,13 @@ void testConsumePartitionedTopicUsingBatchReceive() throws Exception { void testSendMessageWithTxn() throws Exception { String topic = "persistent://public/default/testSendMessageWithTxn"; admin.topics().createNonPartitionedTopic(topic); - producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + producer = client.newProducer(Schema.STRING).topic(topic) + .sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create(); Transaction txn = client.newTransaction().withTransactionTimeout(5, TimeUnit.SECONDS).build().get(); + txn.commit(); testing.runWithSpan("parent1", () -> producer.newMessage(txn).value("test1").send()); - testing.waitAndAssertTraces( trace -> trace.hasSpansSatisfyingExactly( From 7ed6449dc72079ccbb91fd6270d3596c45c29454 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Fri, 15 Nov 2024 03:59:21 +0800 Subject: [PATCH 04/10] enable pulsar txn coordinator --- .../pulsar/v2_8/AbstractPulsarClientTest.java | 7 ++++++- .../instrumentation/pulsar/v2_8/PulsarClientTest.java | 9 +++++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java index f5684222c33e..5a1ff8377416 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java @@ -89,7 +89,12 @@ static void beforeAll() throws PulsarClientException { new PulsarContainer(DEFAULT_IMAGE_NAME) .withEnv("PULSAR_MEM", "-Xmx128m") .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true") - .withEnv("PULSAR_PREFIX_transactionMetadataStoreProviderClassName", "org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider") + .withEnv( + "PULSAR_PREFIX_transactionMetadataStoreProviderClassName", + "org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider") + .withEnv( + "PULSAR_PREFIX_transactionBufferProviderClassName", + "org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBufferProvider") .withLogConsumer(new Slf4jLogConsumer(logger)) .withStartupTimeout(Duration.ofMinutes(2)); pulsar.start(); diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java index bd1f6843e397..e8057acbe107 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java @@ -677,8 +677,13 @@ void testConsumePartitionedTopicUsingBatchReceive() throws Exception { void testSendMessageWithTxn() throws Exception { String topic = "persistent://public/default/testSendMessageWithTxn"; admin.topics().createNonPartitionedTopic(topic); - producer = client.newProducer(Schema.STRING).topic(topic) - .sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create(); + producer = + client + .newProducer(Schema.STRING) + .topic(topic) + .sendTimeout(0, TimeUnit.SECONDS) + .enableBatching(false) + .create(); Transaction txn = client.newTransaction().withTransactionTimeout(5, TimeUnit.SECONDS).build().get(); txn.commit(); From 65ed64499b25d5e91796cd7bda7bc401da90d3e7 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Fri, 15 Nov 2024 04:30:05 +0800 Subject: [PATCH 05/10] fix test --- .../javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java index e8057acbe107..eb03befccb15 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java @@ -686,9 +686,9 @@ void testSendMessageWithTxn() throws Exception { .create(); Transaction txn = client.newTransaction().withTransactionTimeout(5, TimeUnit.SECONDS).build().get(); + testing.runWithSpan("parent1", () -> producer.newMessage(txn).value("test1").send()); txn.commit(); - testing.runWithSpan("parent1", () -> producer.newMessage(txn).value("test1").send()); testing.waitAndAssertTraces( trace -> trace.hasSpansSatisfyingExactly( From 837675e35516d90edde0a438389abc836c333fc0 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Fri, 15 Nov 2024 05:20:56 +0800 Subject: [PATCH 06/10] fix test --- conventions/src/main/kotlin/otel.java-conventions.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conventions/src/main/kotlin/otel.java-conventions.gradle.kts b/conventions/src/main/kotlin/otel.java-conventions.gradle.kts index fa08fe38cc7a..4779decab69e 100644 --- a/conventions/src/main/kotlin/otel.java-conventions.gradle.kts +++ b/conventions/src/main/kotlin/otel.java-conventions.gradle.kts @@ -369,7 +369,7 @@ tasks.withType().configureEach { // All tests must complete within 15 minutes. // This value is quite big because with lower values (3 mins) we were experiencing large number of false positives - timeout.set(Duration.ofMinutes(15)) + timeout.set(Duration.ofMinutes(30)) develocity.testRetry { // You can see tests that were retried by this mechanism in the collected test reports and build scans. From 507350f1f69684280bf9c623658779c07865b7f7 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Fri, 15 Nov 2024 13:32:06 +0800 Subject: [PATCH 07/10] fix test --- .../src/main/kotlin/otel.java-conventions.gradle.kts | 2 +- .../pulsar/v2_8/TransactionImplInstrumentation.java | 2 +- .../pulsar/v2_8/AbstractPulsarClientTest.java | 10 ++-------- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/conventions/src/main/kotlin/otel.java-conventions.gradle.kts b/conventions/src/main/kotlin/otel.java-conventions.gradle.kts index 4779decab69e..fa08fe38cc7a 100644 --- a/conventions/src/main/kotlin/otel.java-conventions.gradle.kts +++ b/conventions/src/main/kotlin/otel.java-conventions.gradle.kts @@ -369,7 +369,7 @@ tasks.withType().configureEach { // All tests must complete within 15 minutes. // This value is quite big because with lower values (3 mins) we were experiencing large number of false positives - timeout.set(Duration.ofMinutes(30)) + timeout.set(Duration.ofMinutes(15)) develocity.testRetry { // You can see tests that were retried by this mechanism in the collected test reports and build scans. diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java index 2b1df69e258d..95525ae1eaa9 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java @@ -22,7 +22,7 @@ public class TransactionImplInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { - return named("org.apache.pulsar.client.impl.ProducerImpl"); + return named("org.apache.pulsar.client.impl.transaction.TransactionImpl"); } @Override diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java index 5a1ff8377416..993e53536f8b 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java @@ -88,15 +88,9 @@ static void beforeAll() throws PulsarClientException { pulsar = new PulsarContainer(DEFAULT_IMAGE_NAME) .withEnv("PULSAR_MEM", "-Xmx128m") - .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true") - .withEnv( - "PULSAR_PREFIX_transactionMetadataStoreProviderClassName", - "org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider") - .withEnv( - "PULSAR_PREFIX_transactionBufferProviderClassName", - "org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBufferProvider") .withLogConsumer(new Slf4jLogConsumer(logger)) - .withStartupTimeout(Duration.ofMinutes(2)); + .withStartupTimeout(Duration.ofMinutes(2)) + .withTransactions(); pulsar.start(); brokerHost = pulsar.getHost(); From b039e11bff1ead63274d70d341bd028e1ff6c4e4 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 18 Nov 2024 14:37:03 +0800 Subject: [PATCH 08/10] Address review comment --- .../v2_8/TransactionImplInstrumentation.java | 11 +---- .../v2_8/telemetry/PulsarSingletons.java | 43 ++++++------------- .../pulsar/v2_8/PulsarClientTest.java | 6 +-- 3 files changed, 15 insertions(+), 45 deletions(-) diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java index 95525ae1eaa9..7f109a2a4979 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java @@ -10,7 +10,6 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; -import io.opentelemetry.instrumentation.api.internal.Timer; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarSingletons; @@ -31,23 +30,17 @@ public void transform(TypeTransformer transformer) { named("registerProducedTopic") .and(isPublic()) .and(takesArguments(1)) - .and(takesArgument(0, named("java.lang.String"))), + .and(takesArgument(0, String.class)), TransactionImplInstrumentation.class.getName() + "$RegisterProducedTopicAdvice"); } @SuppressWarnings("unused") public static class RegisterProducedTopicAdvice { - @Advice.OnMethodEnter - public static Timer before() { - return Timer.start(); - } - @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void after( - @Advice.Enter Timer timer, @Advice.Return(readOnly = false) CompletableFuture future) { - future = PulsarSingletons.wrap(future, timer); + future = PulsarSingletons.wrap(future); } } } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java index 86bcbfae64c6..13d35eee08e6 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java @@ -53,8 +53,6 @@ public final class PulsarSingletons { createConsumerBatchReceiveInstrumenter(); private static final Instrumenter PRODUCER_INSTRUMENTER = createProducerInstrumenter(); - private static final Instrumenter TXN_PRODUCE_INSTRUMENTER = - createTxnProduceInstrumenter(); public static Instrumenter consumerProcessInstrumenter() { return CONSUMER_PROCESS_INSTRUMENTER; @@ -153,22 +151,6 @@ private static Instrumenter createProducerInstrumenter() { return builder.buildProducerInstrumenter(MessageTextMapSetter.INSTANCE); } - private static Instrumenter createTxnProduceInstrumenter() { - InstrumenterBuilder instrumenterBuilder = - Instrumenter.builder( - TELEMETRY, INSTRUMENTATION_NAME, request -> "Txn Produce Register Topic"); - return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysProducer()); - } - - public static Context startAndEndTxnProduceRegister( - Context parent, Timer timer, Throwable throwable) { - if (!TXN_PRODUCE_INSTRUMENTER.shouldStart(parent, "")) { - return null; - } - return InstrumenterUtil.startAndEnd( - TXN_PRODUCE_INSTRUMENTER, parent, "", null, throwable, timer.startTime(), timer.now()); - } - private static AttributesExtractor createMessagingAttributesExtractor( MessagingAttributesGetter getter, MessageOperation operation) { return MessagingAttributesExtractor.builder(getter, operation) @@ -228,22 +210,21 @@ private static Context startAndEndConsumerReceive( timer.now()); } - public static CompletableFuture wrap(CompletableFuture future, Timer timer) { + public static CompletableFuture wrap(CompletableFuture future) { Context parent = Context.current(); CompletableFuture result = new CompletableFuture<>(); future.whenComplete( - (unused, t) -> { - Context context = startAndEndTxnProduceRegister(parent, timer, t); - runWithContext( - context, - () -> { - if (t != null) { - result.completeExceptionally(t); - } else { - result.complete(null); - } - }); - }); + (unused, t) -> + runWithContext( + parent, + () -> { + if (t != null) { + result.completeExceptionally(t); + } else { + result.complete(null); + } + }) + ); return result; } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java index eb03befccb15..609105f1ebbf 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java @@ -693,13 +693,9 @@ void testSendMessageWithTxn() throws Exception { trace -> trace.hasSpansSatisfyingExactly( span -> span.hasName("parent1").hasKind(SpanKind.INTERNAL).hasNoParent(), - span -> - span.hasName("Txn Produce Register Topic") - .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)), span -> span.hasName(topic + " publish") .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(1)))); + .hasParent(trace.getSpan(0)))); } } From 67d5b3bf8657b6f2348a3244da0c530b0cf3fa0e Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 18 Nov 2024 15:45:45 +0800 Subject: [PATCH 09/10] fix checkstyle --- .../pulsar/v2_8/TransactionImplInstrumentation.java | 3 +-- .../pulsar/v2_8/telemetry/PulsarSingletons.java | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java index 7f109a2a4979..15684a75fb53 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java @@ -38,8 +38,7 @@ public void transform(TypeTransformer transformer) { public static class RegisterProducedTopicAdvice { @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void after( - @Advice.Return(readOnly = false) CompletableFuture future) { + public static void after(@Advice.Return(readOnly = false) CompletableFuture future) { future = PulsarSingletons.wrap(future); } } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java index 13d35eee08e6..14ad9a137aa0 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java @@ -223,8 +223,7 @@ public static CompletableFuture wrap(CompletableFuture future) { } else { result.complete(null); } - }) - ); + })); return result; } From a36ef16ddc72b043facf15bafb49a05effd058a3 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 19 Nov 2024 01:42:20 +0800 Subject: [PATCH 10/10] Address review comments --- .../instrumentation/pulsar/v2_8/PulsarClientTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java index 609105f1ebbf..a2c4c94b69bc 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java @@ -684,10 +684,10 @@ void testSendMessageWithTxn() throws Exception { .sendTimeout(0, TimeUnit.SECONDS) .enableBatching(false) .create(); - Transaction txn = - client.newTransaction().withTransactionTimeout(5, TimeUnit.SECONDS).build().get(); - testing.runWithSpan("parent1", () -> producer.newMessage(txn).value("test1").send()); - txn.commit(); + Transaction transaction = + client.newTransaction().withTransactionTimeout(15, TimeUnit.SECONDS).build().get(); + testing.runWithSpan("parent1", () -> producer.newMessage(transaction).value("test1").send()); + transaction.commit(); testing.waitAndAssertTraces( trace ->