Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][pulsar] Enhance SendCallback to address PIP-363 #11648

Merged
merged 8 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,14 @@
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest;
import java.util.concurrent.CompletableFuture;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
Expand Down Expand Up @@ -73,7 +69,7 @@ public static class ProducerSendAsyncMethodAdvice {
public static void before(
@Advice.This ProducerImpl<?> producer,
@Advice.Argument(value = 0) Message<?> message,
@Advice.Argument(value = 1, readOnly = false) SendCallback callback) {
@Advice.Argument(value = 1) SendCallback callback) {
Context parent = Context.current();
PulsarRequest request = PulsarRequest.create(message, VirtualFieldStore.extract(producer));

Expand All @@ -82,54 +78,9 @@ public static void before(
}

Context context = producerInstrumenter().start(parent, request);
callback = new SendCallbackWrapper(context, request, callback);
dao-jun marked this conversation as resolved.
Show resolved Hide resolved
}
}

public static class SendCallbackWrapper implements SendCallback {

private final Context context;
private final PulsarRequest request;
private final SendCallback delegate;

public SendCallbackWrapper(Context context, PulsarRequest request, SendCallback callback) {
this.context = context;
this.request = request;
this.delegate = callback;
}

@Override
public void sendComplete(Exception e) {
if (context == null) {
this.delegate.sendComplete(e);
return;
}

try (Scope ignore = context.makeCurrent()) {
this.delegate.sendComplete(e);
} finally {
producerInstrumenter().end(context, request, null, e);
}
}

@Override
public void addCallback(MessageImpl<?> msg, SendCallback scb) {
this.delegate.addCallback(msg, scb);
}

@Override
public SendCallback getNextSendCallback() {
return this.delegate.getNextSendCallback();
}

@Override
public MessageImpl<?> getNextMessage() {
return this.delegate.getNextMessage();
}

@Override
public CompletableFuture<MessageId> getFuture() {
return this.delegate.getFuture();
// Inject the context/request into the SendCallback. This will be extracted and used when the
// message is sent and the callback is invoked. see `SendCallbackInstrumentation`.
VirtualFieldStore.inject(callback, context, request);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
new ConsumerImplInstrumentation(),
new ProducerImplInstrumentation(),
new MessageInstrumentation(),
new MessageListenerInstrumentation());
new MessageListenerInstrumentation(),
new SendCallbackInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8;

import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest;

public final class SendCallbackData {
public final Context context;
public final PulsarRequest request;

private SendCallbackData(Context context, PulsarRequest request) {
this.context = context;
this.request = request;
}

public static SendCallbackData create(Context context, PulsarRequest request) {
return new SendCallbackData(context, request);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8;

import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasSuperType;
import static io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarSingletons.producerInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pulsar.client.impl.SendCallback;

public class SendCallbackInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<ClassLoader> classLoaderOptimization() {
return hasClassesNamed("org.apache.pulsar.client.impl.SendCallback");
}

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return hasSuperType(named("org.apache.pulsar.client.impl.SendCallback"));
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod().and(named("sendComplete")),
SendCallbackInstrumentation.class.getName() + "$SendCallbackSendCompleteAdvice");
}

@SuppressWarnings("unused")
public static class SendCallbackSendCompleteAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.This SendCallback callback,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("otelRequest") PulsarRequest request) {
// Extract the Context and PulsarRequest from the SendCallback instance.
SendCallbackData callBackData = VirtualFieldStore.extract(callback);
if (callBackData != null) {
// If the extraction was successful, store the Context and PulsarRequest in local variables.
otelContext = callBackData.context;
request = callBackData.request;
otelScope = otelContext.makeCurrent();
}
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.Argument(0) Throwable t,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("otelRequest") PulsarRequest request) {
if (otelScope != null) {
// Close the Scope and end the span.
otelScope.close();
producerInstrumenter().end(otelContext, request, null, t);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.client.impl.TopicMessageImpl;

public class VirtualFieldStore {
Expand All @@ -19,6 +21,8 @@ public class VirtualFieldStore {
VirtualField.find(Producer.class, ProducerData.class);
private static final VirtualField<Consumer<?>, String> CONSUMER_FIELD =
VirtualField.find(Consumer.class, String.class);
private static final VirtualField<SendCallback, SendCallbackData> CALLBACK_FIELD =
VirtualField.find(SendCallback.class, SendCallbackData.class);

private VirtualFieldStore() {}

Expand All @@ -40,6 +44,12 @@ public static void inject(Consumer<?> instance, String serviceUrl) {
CONSUMER_FIELD.set(instance, serviceUrl);
}

public static void inject(SendCallback instance, Context context, PulsarRequest request) {
if (instance != null) {
CALLBACK_FIELD.set(instance, SendCallbackData.create(context, request));
}
}

public static Context extract(Message<?> instance) {
if (instance instanceof TopicMessageImpl<?>) {
TopicMessageImpl<?> topicMessage = (TopicMessageImpl<?>) instance;
Expand All @@ -59,4 +69,8 @@ public static ProducerData extract(Producer<?> instance) {
public static String extract(Consumer<?> instance) {
return CONSUMER_FIELD.get(instance);
}

public static SendCallbackData extract(SendCallback instance) {
return CALLBACK_FIELD.get(instance);
}
}
Loading