diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java index 172538815cfb..544a818ee12d 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java @@ -25,6 +25,7 @@ public List typeInstrumentations() { new ProducerImplInstrumentation(), new MessageInstrumentation(), new MessageListenerInstrumentation(), - new SendCallbackInstrumentation()); + new SendCallbackInstrumentation(), + new TransactionImplInstrumentation()); } } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java new file mode 100644 index 000000000000..15684a75fb53 --- /dev/null +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/TransactionImplInstrumentation.java @@ -0,0 +1,45 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8; + +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarSingletons; +import java.util.concurrent.CompletableFuture; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class TransactionImplInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.pulsar.client.impl.transaction.TransactionImpl"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("registerProducedTopic") + .and(isPublic()) + .and(takesArguments(1)) + .and(takesArgument(0, String.class)), + TransactionImplInstrumentation.class.getName() + "$RegisterProducedTopicAdvice"); + } + + @SuppressWarnings("unused") + public static class RegisterProducedTopicAdvice { + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void after(@Advice.Return(readOnly = false) CompletableFuture future) { + future = PulsarSingletons.wrap(future); + } + } +} diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java index 51d534f8158b..14ad9a137aa0 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java @@ -210,6 +210,24 @@ private static Context startAndEndConsumerReceive( timer.now()); } + public static CompletableFuture wrap(CompletableFuture future) { + Context parent = Context.current(); + CompletableFuture result = new CompletableFuture<>(); + future.whenComplete( + (unused, t) -> + runWithContext( + parent, + () -> { + if (t != null) { + result.completeExceptionally(t); + } else { + result.complete(null); + } + })); + + return result; + } + public static CompletableFuture> wrap( CompletableFuture> future, Timer timer, Consumer consumer) { boolean listenerContextActive = MessageListenerContext.isProcessing(); diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java index 42ebf260705b..993e53536f8b 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java @@ -89,12 +89,17 @@ static void beforeAll() throws PulsarClientException { new PulsarContainer(DEFAULT_IMAGE_NAME) .withEnv("PULSAR_MEM", "-Xmx128m") .withLogConsumer(new Slf4jLogConsumer(logger)) - .withStartupTimeout(Duration.ofMinutes(2)); + .withStartupTimeout(Duration.ofMinutes(2)) + .withTransactions(); pulsar.start(); brokerHost = pulsar.getHost(); brokerPort = pulsar.getMappedPort(6650); - client = PulsarClient.builder().serviceUrl(pulsar.getPulsarBrokerUrl()).build(); + client = + PulsarClient.builder() + .serviceUrl(pulsar.getPulsarBrokerUrl()) + .enableTransaction(true) + .build(); admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build(); } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java index 93f13f9ee1d8..a2c4c94b69bc 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java @@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.Messages; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.transaction.Transaction; import org.junit.jupiter.api.Test; class PulsarClientTest extends AbstractPulsarClientTest { @@ -671,4 +672,30 @@ void testConsumePartitionedTopicUsingBatchReceive() throws Exception { }); })); } + + @Test + void testSendMessageWithTxn() throws Exception { + String topic = "persistent://public/default/testSendMessageWithTxn"; + admin.topics().createNonPartitionedTopic(topic); + producer = + client + .newProducer(Schema.STRING) + .topic(topic) + .sendTimeout(0, TimeUnit.SECONDS) + .enableBatching(false) + .create(); + Transaction transaction = + client.newTransaction().withTransactionTimeout(15, TimeUnit.SECONDS).build().get(); + testing.runWithSpan("parent1", () -> producer.newMessage(transaction).value("test1").send()); + transaction.commit(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent1").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)))); + } }