Skip to content

Commit

Permalink
[improve][pulsar] Enhance SendCallback to address PIP-363 (#11648)
Browse files Browse the repository at this point in the history
Co-authored-by: Lauri Tulmin <[email protected]>
Co-authored-by: Lauri Tulmin <[email protected]>
  • Loading branch information
3 people authored Aug 19, 2024
1 parent 2b42bc2 commit 7d004b5
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,14 @@
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest;
import java.util.concurrent.CompletableFuture;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
Expand Down Expand Up @@ -73,7 +69,7 @@ public static class ProducerSendAsyncMethodAdvice {
public static void before(
@Advice.This ProducerImpl<?> producer,
@Advice.Argument(value = 0) Message<?> message,
@Advice.Argument(value = 1, readOnly = false) SendCallback callback) {
@Advice.Argument(value = 1) SendCallback callback) {
Context parent = Context.current();
PulsarRequest request = PulsarRequest.create(message, VirtualFieldStore.extract(producer));

Expand All @@ -82,54 +78,9 @@ public static void before(
}

Context context = producerInstrumenter().start(parent, request);
callback = new SendCallbackWrapper(context, request, callback);
}
}

public static class SendCallbackWrapper implements SendCallback {

private final Context context;
private final PulsarRequest request;
private final SendCallback delegate;

public SendCallbackWrapper(Context context, PulsarRequest request, SendCallback callback) {
this.context = context;
this.request = request;
this.delegate = callback;
}

@Override
public void sendComplete(Exception e) {
if (context == null) {
this.delegate.sendComplete(e);
return;
}

try (Scope ignore = context.makeCurrent()) {
this.delegate.sendComplete(e);
} finally {
producerInstrumenter().end(context, request, null, e);
}
}

@Override
public void addCallback(MessageImpl<?> msg, SendCallback scb) {
this.delegate.addCallback(msg, scb);
}

@Override
public SendCallback getNextSendCallback() {
return this.delegate.getNextSendCallback();
}

@Override
public MessageImpl<?> getNextMessage() {
return this.delegate.getNextMessage();
}

@Override
public CompletableFuture<MessageId> getFuture() {
return this.delegate.getFuture();
// Inject the context/request into the SendCallback. This will be extracted and used when the
// message is sent and the callback is invoked. see `SendCallbackInstrumentation`.
VirtualFieldStore.inject(callback, context, request);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
new ConsumerImplInstrumentation(),
new ProducerImplInstrumentation(),
new MessageInstrumentation(),
new MessageListenerInstrumentation());
new MessageListenerInstrumentation(),
new SendCallbackInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8;

import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest;

public final class SendCallbackData {
public final Context context;
public final PulsarRequest request;

private SendCallbackData(Context context, PulsarRequest request) {
this.context = context;
this.request = request;
}

public static SendCallbackData create(Context context, PulsarRequest request) {
return new SendCallbackData(context, request);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8;

import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasSuperType;
import static io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarSingletons.producerInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pulsar.client.impl.SendCallback;

public class SendCallbackInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<ClassLoader> classLoaderOptimization() {
return hasClassesNamed("org.apache.pulsar.client.impl.SendCallback");
}

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return hasSuperType(named("org.apache.pulsar.client.impl.SendCallback"));
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod().and(named("sendComplete")),
SendCallbackInstrumentation.class.getName() + "$SendCallbackSendCompleteAdvice");
}

@SuppressWarnings("unused")
public static class SendCallbackSendCompleteAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.This SendCallback callback,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("otelRequest") PulsarRequest request) {
// Extract the Context and PulsarRequest from the SendCallback instance.
SendCallbackData callBackData = VirtualFieldStore.extract(callback);
if (callBackData != null) {
// If the extraction was successful, store the Context and PulsarRequest in local variables.
otelContext = callBackData.context;
request = callBackData.request;
otelScope = otelContext.makeCurrent();
}
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.Argument(0) Throwable t,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("otelRequest") PulsarRequest request) {
if (otelScope != null) {
// Close the Scope and end the span.
otelScope.close();
producerInstrumenter().end(otelContext, request, null, t);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.client.impl.TopicMessageImpl;

public class VirtualFieldStore {
Expand All @@ -19,6 +21,8 @@ public class VirtualFieldStore {
VirtualField.find(Producer.class, ProducerData.class);
private static final VirtualField<Consumer<?>, String> CONSUMER_FIELD =
VirtualField.find(Consumer.class, String.class);
private static final VirtualField<SendCallback, SendCallbackData> CALLBACK_FIELD =
VirtualField.find(SendCallback.class, SendCallbackData.class);

private VirtualFieldStore() {}

Expand All @@ -40,6 +44,12 @@ public static void inject(Consumer<?> instance, String serviceUrl) {
CONSUMER_FIELD.set(instance, serviceUrl);
}

public static void inject(SendCallback instance, Context context, PulsarRequest request) {
if (instance != null) {
CALLBACK_FIELD.set(instance, SendCallbackData.create(context, request));
}
}

public static Context extract(Message<?> instance) {
if (instance instanceof TopicMessageImpl<?>) {
TopicMessageImpl<?> topicMessage = (TopicMessageImpl<?>) instance;
Expand All @@ -59,4 +69,8 @@ public static ProducerData extract(Producer<?> instance) {
public static String extract(Consumer<?> instance) {
return CONSUMER_FIELD.get(instance);
}

public static SendCallbackData extract(SendCallback instance) {
return CALLBACK_FIELD.get(instance);
}
}

0 comments on commit 7d004b5

Please sign in to comment.