Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement producer part of RocketMQ new client instrumentation #6884

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Settings for the Apache RocketMQ Remoting-based client instrumentation
# Settings for the Apache RocketMQ remoting-based client instrumentation

| System property | Type | Default | Description |
|---|---|---|---|
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Library Instrumentation for Apache RocketMQ Remoting-based Client 4.0.0+
# Library Instrumentation for Apache RocketMQ remoting-based client 4.0.0+

Provides OpenTelemetry instrumentation for [Apache RocketMQ](https://rocketmq.apache.org/) remoting-based client.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
plugins {
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("org.apache.rocketmq")
module.set("rocketmq-client-java")
versions.set("[5.0.0,)")
assertInverse.set(true)
}
}

dependencies {
library("org.apache.rocketmq:rocketmq-client-java:5.0.0")

testImplementation(project(":instrumentation:rocketmq:rocketmq-client:rocketmq-client-5.0:testing"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture;

/** Future converter, which covert future of list into list of future. */
public class FutureConverter {
private FutureConverter() {}

public static <T> List<SettableFuture<T>> convert(SettableFuture<List<T>> future, int num) {
List<SettableFuture<T>> futures = new ArrayList<>(num);
for (int i = 0; i < num; i++) {
SettableFuture<T> f = SettableFuture.create();
futures.add(f);
}
ListFutureCallback<T> futureCallback = new ListFutureCallback<>(futures);
Futures.addCallback(future, futureCallback, MoreExecutors.directExecutor());
return futures;
}

public static class ListFutureCallback<T> implements FutureCallback<List<T>> {
private final List<SettableFuture<T>> futures;

public ListFutureCallback(List<SettableFuture<T>> futures) {
this.futures = futures;
}

@Override
public void onSuccess(List<T> result) {
for (int i = 0; i < result.size(); i++) {
futures.get(i).set(result.get(i));
}
}

@Override
public void onFailure(Throwable t) {
for (SettableFuture<T> future : futures) {
future.setException(t);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import io.opentelemetry.context.propagation.TextMapSetter;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

enum MapSetter implements TextMapSetter<PublishingMessageImpl> {
INSTANCE;

@Override
public void set(@Nullable PublishingMessageImpl message, String key, String value) {
if (message == null) {
return;
}
Map<String, String> extraProperties = VirtualFieldStore.getExtraPropertiesByMessage(message);
if (extraProperties == null) {
extraProperties = new HashMap<>();
VirtualFieldStore.setExtraPropertiesByMessage(message, extraProperties);
}
extraProperties.put(key, value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public final class RocketMqInstrumentationModule extends InstrumentationModule {
public RocketMqInstrumentationModule() {
super("rocketmq-client", "rocketmq-client-5.0");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new RocketMqPublishingMessageImplInstrumentation(), new RocketMqProducerInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import java.util.List;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

final class RocketMqInstrumenterFactory {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.rocketmq-client-5.0";

private RocketMqInstrumenterFactory() {}

public static Instrumenter<PublishingMessageImpl, SendReceiptImpl> createProducerInstrumenter(
OpenTelemetry openTelemetry, List<String> capturedHeaders) {

RocketMqProducerAttributeGetter getter = RocketMqProducerAttributeGetter.INSTANCE;
MessageOperation operation = MessageOperation.SEND;

AttributesExtractor<PublishingMessageImpl, SendReceiptImpl> attributesExtractor =
buildMessagingAttributesExtractor(getter, operation, capturedHeaders);

InstrumenterBuilder<PublishingMessageImpl, SendReceiptImpl> instrumenterBuilder =
Instrumenter.<PublishingMessageImpl, SendReceiptImpl>builder(
openTelemetry,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractor(RocketMqProducerAttributeExtractor.INSTANCE)
.setSpanStatusExtractor(
(spanStatusBuilder, message, sendReceipt, error) -> {
if (null != error) {
spanStatusBuilder.setStatus(StatusCode.ERROR);
}
});

return instrumenterBuilder.buildProducerInstrumenter(MapSetter.INSTANCE);
}

private static <T, R> MessagingAttributesExtractor<T, R> buildMessagingAttributesExtractor(
MessagingAttributesGetter<T, R> getter,
MessageOperation operation,
List<String> capturedHeaders) {
return MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(capturedHeaders)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.DELAY;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.FIFO;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.TRANSACTION;

import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import java.util.ArrayList;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

enum RocketMqProducerAttributeExtractor
implements AttributesExtractor<PublishingMessageImpl, SendReceiptImpl> {
INSTANCE;

@Override
public void onStart(
AttributesBuilder attributes, Context parentContext, PublishingMessageImpl message) {
message.getTag().ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TAG, s));
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_KEYS, new ArrayList<>(message.getKeys()));
switch (message.getMessageType()) {
case FIFO:
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TYPE, FIFO);
break;
case DELAY:
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TYPE, DELAY);
break;
case TRANSACTION:
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TYPE, TRANSACTION);
break;
default:
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TYPE, NORMAL);
}
}

@Override
public void onEnd(
AttributesBuilder attributes,
Context context,
PublishingMessageImpl message,
@Nullable SendReceiptImpl sendReceipt,
@Nullable Throwable error) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

enum RocketMqProducerAttributeGetter
implements MessagingAttributesGetter<PublishingMessageImpl, SendReceiptImpl> {
INSTANCE;

@Nullable
@Override
public String system(PublishingMessageImpl message) {
return "rocketmq";
}

@Nullable
@Override
public String destinationKind(PublishingMessageImpl message) {
return SemanticAttributes.MessagingDestinationKindValues.TOPIC;
}

@Nullable
@Override
public String destination(PublishingMessageImpl message) {
return message.getTopic();
}

@Override
public boolean temporaryDestination(PublishingMessageImpl message) {
return false;
}

@Nullable
@Override
public String protocol(PublishingMessageImpl message) {
return null;
}

@Nullable
@Override
public String protocolVersion(PublishingMessageImpl message) {
return null;
}

@Nullable
@Override
public String url(PublishingMessageImpl message) {
return null;
}

@Nullable
@Override
public String conversationId(PublishingMessageImpl message) {
return null;
}

@Nullable
@Override
public Long messagePayloadSize(PublishingMessageImpl message) {
return (long) message.getBody().remaining();
}

@Nullable
@Override
public Long messagePayloadCompressedSize(PublishingMessageImpl message) {
return null;
}

@Nullable
@Override
public String messageId(PublishingMessageImpl message, @Nullable SendReceiptImpl sendReceipt) {
return message.getMessageId().toString();
}

@Override
public List<String> header(PublishingMessageImpl message, String name) {
String value = message.getProperties().get(name);
if (value != null) {
return Collections.singletonList(value);
}
return Collections.emptyList();
}
}
Loading