diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerImplInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerImplInstrumentation.java index 4b782edd6e28..79211824e317 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerImplInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerImplInstrumentation.java @@ -14,18 +14,14 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest; -import java.util.concurrent.CompletableFuture; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.SendCallback; @@ -73,7 +69,7 @@ public static class ProducerSendAsyncMethodAdvice { public static void before( @Advice.This ProducerImpl producer, @Advice.Argument(value = 0) Message message, - @Advice.Argument(value = 1, readOnly = false) SendCallback callback) { + @Advice.Argument(value = 1) SendCallback callback) { Context parent = Context.current(); PulsarRequest request = PulsarRequest.create(message, VirtualFieldStore.extract(producer)); @@ -82,54 +78,9 @@ public static void before( } Context context = producerInstrumenter().start(parent, request); - callback = new SendCallbackWrapper(context, request, callback); - } - } - - public static class SendCallbackWrapper implements SendCallback { - - private final Context context; - private final PulsarRequest request; - private final SendCallback delegate; - - public SendCallbackWrapper(Context context, PulsarRequest request, SendCallback callback) { - this.context = context; - this.request = request; - this.delegate = callback; - } - - @Override - public void sendComplete(Exception e) { - if (context == null) { - this.delegate.sendComplete(e); - return; - } - - try (Scope ignore = context.makeCurrent()) { - this.delegate.sendComplete(e); - } finally { - producerInstrumenter().end(context, request, null, e); - } - } - - @Override - public void addCallback(MessageImpl msg, SendCallback scb) { - this.delegate.addCallback(msg, scb); - } - - @Override - public SendCallback getNextSendCallback() { - return this.delegate.getNextSendCallback(); - } - - @Override - public MessageImpl getNextMessage() { - return this.delegate.getNextMessage(); - } - - @Override - public CompletableFuture getFuture() { - return this.delegate.getFuture(); + // Inject the context/request into the SendCallback. This will be extracted and used when the + // message is sent and the callback is invoked. see `SendCallbackInstrumentation`. + VirtualFieldStore.inject(callback, context, request); } } } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java index a6ed299c64f6..172538815cfb 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java @@ -24,6 +24,7 @@ public List typeInstrumentations() { new ConsumerImplInstrumentation(), new ProducerImplInstrumentation(), new MessageInstrumentation(), - new MessageListenerInstrumentation()); + new MessageListenerInstrumentation(), + new SendCallbackInstrumentation()); } } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackData.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackData.java new file mode 100644 index 000000000000..462843d70e31 --- /dev/null +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackData.java @@ -0,0 +1,23 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8; + +import io.opentelemetry.context.Context; +import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest; + +public final class SendCallbackData { + public final Context context; + public final PulsarRequest request; + + private SendCallbackData(Context context, PulsarRequest request) { + this.context = context; + this.request = request; + } + + public static SendCallbackData create(Context context, PulsarRequest request) { + return new SendCallbackData(context, request); + } +} diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackInstrumentation.java new file mode 100644 index 000000000000..85295042e335 --- /dev/null +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackInstrumentation.java @@ -0,0 +1,75 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasSuperType; +import static io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarSingletons.producerInstrumenter; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.pulsar.client.impl.SendCallback; + +public class SendCallbackInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher classLoaderOptimization() { + return hasClassesNamed("org.apache.pulsar.client.impl.SendCallback"); + } + + @Override + public ElementMatcher typeMatcher() { + return hasSuperType(named("org.apache.pulsar.client.impl.SendCallback")); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod().and(named("sendComplete")), + SendCallbackInstrumentation.class.getName() + "$SendCallbackSendCompleteAdvice"); + } + + @SuppressWarnings("unused") + public static class SendCallbackSendCompleteAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.This SendCallback callback, + @Advice.Local("otelContext") Context otelContext, + @Advice.Local("otelScope") Scope otelScope, + @Advice.Local("otelRequest") PulsarRequest request) { + // Extract the Context and PulsarRequest from the SendCallback instance. + SendCallbackData callBackData = VirtualFieldStore.extract(callback); + if (callBackData != null) { + // If the extraction was successful, store the Context and PulsarRequest in local variables. + otelContext = callBackData.context; + request = callBackData.request; + otelScope = otelContext.makeCurrent(); + } + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit( + @Advice.Argument(0) Throwable t, + @Advice.Local("otelContext") Context otelContext, + @Advice.Local("otelScope") Scope otelScope, + @Advice.Local("otelRequest") PulsarRequest request) { + if (otelScope != null) { + // Close the Scope and end the span. + otelScope.close(); + producerInstrumenter().end(otelContext, request, null, t); + } + } + } +} diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/VirtualFieldStore.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/VirtualFieldStore.java index 193079174c9e..9ec84944feaf 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/VirtualFieldStore.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/VirtualFieldStore.java @@ -7,9 +7,11 @@ import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.util.VirtualField; +import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.impl.SendCallback; import org.apache.pulsar.client.impl.TopicMessageImpl; public class VirtualFieldStore { @@ -19,6 +21,8 @@ public class VirtualFieldStore { VirtualField.find(Producer.class, ProducerData.class); private static final VirtualField, String> CONSUMER_FIELD = VirtualField.find(Consumer.class, String.class); + private static final VirtualField CALLBACK_FIELD = + VirtualField.find(SendCallback.class, SendCallbackData.class); private VirtualFieldStore() {} @@ -40,6 +44,12 @@ public static void inject(Consumer instance, String serviceUrl) { CONSUMER_FIELD.set(instance, serviceUrl); } + public static void inject(SendCallback instance, Context context, PulsarRequest request) { + if (instance != null) { + CALLBACK_FIELD.set(instance, SendCallbackData.create(context, request)); + } + } + public static Context extract(Message instance) { if (instance instanceof TopicMessageImpl) { TopicMessageImpl topicMessage = (TopicMessageImpl) instance; @@ -59,4 +69,8 @@ public static ProducerData extract(Producer instance) { public static String extract(Consumer instance) { return CONSUMER_FIELD.get(instance); } + + public static SendCallbackData extract(SendCallback instance) { + return CALLBACK_FIELD.get(instance); + } }