Skip to content

Commit

Permalink
[Tracing] Add EventHubs info on outgoing Message spans. (#9539)
Browse files Browse the repository at this point in the history
  • Loading branch information
samvaity authored Mar 27, 2020
1 parent 39d7391 commit eed24be
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,8 @@ public Context start(String spanName, Context context, ProcessKind processKind)
spanBuilder = getSpanBuilder(spanName, context);
span = spanBuilder.setSpanKind(Span.Kind.PRODUCER).startSpan();
if (span.isRecording()) {
span.setAttribute(AZ_NAMESPACE_KEY,
AttributeValue.stringAttributeValue(getOrDefault(context, AZ_TRACING_NAMESPACE_KEY, "",
String.class)));
// If span is sampled in, add additional request attributes
addSpanRequestAttributes(span, context, spanName);
}
// Add diagnostic Id and trace-headers to Context
context = setContextData(span);
Expand Down Expand Up @@ -270,6 +269,10 @@ private void addSpanRequestAttributes(Span span, Context context, String spanNam
if (messageEnqueuedTime != null) {
span.setAttribute(MESSAGE_ENQUEUED_TIME, messageEnqueuedTime);
}
String tracingNamespace = getOrDefault(context, AZ_TRACING_NAMESPACE_KEY, null, String.class);
if (tracingNamespace != null) {
span.setAttribute(AZ_NAMESPACE_KEY, tracingNamespace);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,10 @@ public void startSpanProcessKindSend() {
// Start user parent span.
final Span.Builder spanBuilder = tracer.spanBuilder(METHOD_NAME);
// Add additional metadata to spans for SEND
final Context traceContext = tracingContext.addData(ENTITY_PATH_KEY, ENTITY_PATH_VALUE)
.addData(HOST_NAME_KEY, HOSTNAME_VALUE).addData(SPAN_BUILDER_KEY, spanBuilder)
final Context traceContext = tracingContext
.addData(ENTITY_PATH_KEY, ENTITY_PATH_VALUE)
.addData(HOST_NAME_KEY, HOSTNAME_VALUE)
.addData(SPAN_BUILDER_KEY, spanBuilder)
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_NAMESPACE_VALUE);

// Act
Expand All @@ -154,6 +156,7 @@ public void startSpanProcessKindSend() {
{
put(MESSAGE_BUS_DESTINATION, AttributeValue.stringAttributeValue(ENTITY_PATH_VALUE));
put(PEER_ENDPOINT, AttributeValue.stringAttributeValue(HOSTNAME_VALUE));
put(AZ_NAMESPACE_KEY, AttributeValue.stringAttributeValue(AZ_NAMESPACE_VALUE));
}
};
verifySpanAttributes(attributeMap, expectedAttributeMap);
Expand All @@ -177,17 +180,28 @@ public void startSpanProcessKindMessage() {
// verify diagnostic id and span context returned
assertNotNull(updatedContext.getData(SPAN_CONTEXT_KEY).get());
assertNotNull(updatedContext.getData(DIAGNOSTIC_ID_KEY).get());

final Map<String, AttributeValue> attributeMap = recordEventsSpan.toSpanData().getAttributes();
HashMap<String, AttributeValue> expectedAttributeMap = new HashMap<String, AttributeValue>() {
{
put(MESSAGE_BUS_DESTINATION, AttributeValue.stringAttributeValue(ENTITY_PATH_VALUE));
put(PEER_ENDPOINT, AttributeValue.stringAttributeValue(HOSTNAME_VALUE));
put(AZ_NAMESPACE_KEY, AttributeValue.stringAttributeValue(AZ_NAMESPACE_VALUE));
}
};
verifySpanAttributes(attributeMap, expectedAttributeMap);
}

@Test
public void startSpanProcessKindProcess() {
// Arrange
final SpanId parentSpanId = parentSpan.getContext().getSpanId();
// Add additional metadata to spans for SEND
final Context traceContext = tracingContext.addData(ENTITY_PATH_KEY, ENTITY_PATH_VALUE)
.addData(HOST_NAME_KEY, HOSTNAME_VALUE)
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_NAMESPACE_VALUE)
.addData(MESSAGE_ENQUEUED_TIME, MESSAGE_ENQUEUED_VALUE);
// Add additional metadata to spans for PROCESS
final Context traceContext = tracingContext
.addData(ENTITY_PATH_KEY, ENTITY_PATH_VALUE)
.addData(HOST_NAME_KEY, HOSTNAME_VALUE)
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_NAMESPACE_VALUE)
.addData(MESSAGE_ENQUEUED_TIME, MESSAGE_ENQUEUED_VALUE); // only in PROCESS

// Act
final Context updatedContext = openTelemetryTracer.start(METHOD_NAME, traceContext, ProcessKind.PROCESS);
Expand All @@ -202,13 +216,15 @@ public void startSpanProcessKindProcess() {
final ReadableSpan recordEventsSpan =
(ReadableSpan) updatedContext.getData(PARENT_SPAN_KEY).get();
assertEquals(Span.Kind.CONSUMER, recordEventsSpan.toSpanData().getKind());

// verify span attributes
final Map<String, AttributeValue> attributeMap = recordEventsSpan.toSpanData().getAttributes();
HashMap<String, AttributeValue> expectedAttributeMap = new HashMap<String, AttributeValue>() {
{
put(MESSAGE_BUS_DESTINATION, AttributeValue.stringAttributeValue(ENTITY_PATH_VALUE));
put(PEER_ENDPOINT, AttributeValue.stringAttributeValue(HOSTNAME_VALUE));
put(MESSAGE_ENQUEUED_TIME, AttributeValue.longAttributeValue(MESSAGE_ENQUEUED_VALUE));
put(AZ_NAMESPACE_KEY, AttributeValue.stringAttributeValue(AZ_NAMESPACE_VALUE));
}
};
verifySpanAttributes(attributeMap, expectedAttributeMap);
Expand Down Expand Up @@ -468,6 +484,7 @@ private static void assertSpanWithRemoteParent(Context updatedContext, SpanId pa

private static void verifySpanAttributes(Map<String, AttributeValue> actualAttributeMap,
Map<String, AttributeValue> expectedMapValue) {
actualAttributeMap.forEach((attributeKey, attributeValue) -> assertEquals(expectedMapValue.get(attributeKey), attributeValue));
actualAttributeMap.forEach((attributeKey, attributeValue) ->
assertEquals(expectedMapValue.get(attributeKey), attributeValue));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.AZ_NAMESPACE_VALUE;

Expand All @@ -54,9 +56,11 @@ public final class EventDataBatch {
private final String partitionId;
private int sizeInBytes;
private final TracerProvider tracerProvider;
private final String entityPath;
private final String hostname;

EventDataBatch(int maxMessageSize, String partitionId, String partitionKey, ErrorContextProvider contextProvider,
TracerProvider tracerProvider) {
TracerProvider tracerProvider, String entityPath, String hostname) {
this.maxMessageSize = maxMessageSize;
this.partitionKey = partitionKey;
this.partitionId = partitionId;
Expand All @@ -65,6 +69,8 @@ public final class EventDataBatch {
this.sizeInBytes = (maxMessageSize / 65536) * 1024; // reserve 1KB for every 64KB
this.eventBytes = new byte[maxMessageSize];
this.tracerProvider = tracerProvider;
this.entityPath = entityPath;
this.hostname = hostname;
}

/**
Expand Down Expand Up @@ -144,7 +150,10 @@ private EventData traceMessageSpan(EventData eventData) {
return eventData;
} else {
// Starting the span makes the sampling decision (nothing is logged at this time)
Context eventContext = eventData.getContext().addData(AZ_TRACING_NAMESPACE_KEY, AZ_NAMESPACE_VALUE);
Context eventContext = eventData.getContext()
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_NAMESPACE_VALUE)
.addData(ENTITY_PATH_KEY, this.entityPath)
.addData(HOST_NAME_KEY, this.hostname);
Context eventSpanContext = tracerProvider.startSpan(eventContext, ProcessKind.MESSAGE);
Optional<Object> eventDiagnosticIdOptional = eventSpanContext.getData(DIAGNOSTIC_ID_KEY);
if (eventDiagnosticIdOptional.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public Mono<EventDataBatch> createBatch(CreateBatchOptions options) {
: maximumLinkSize;

return Mono.just(new EventDataBatch(batchSize, partitionId, partitionKey, link::getErrorContext,
tracerProvider));
tracerProvider, link.getEntityPath(), link.getHostname()));
}));
}

Expand Down Expand Up @@ -466,7 +466,7 @@ private Mono<Void> sendInternal(Flux<EventData> events, SendOptions options) {
.setPartitionId(options.getPartitionId())
.setMaximumSizeInBytes(batchSize);
return events.collect(new EventDataCollector(batchOptions, 1, link::getErrorContext,
tracerProvider));
tracerProvider, link.getEntityPath(), link.getHostname()));
})
.flatMap(list -> sendInternal(Flux.fromIterable(list))));
}
Expand Down Expand Up @@ -525,11 +525,13 @@ private static class EventDataCollector implements Collector<EventData, List<Eve
private final Integer maxNumberOfBatches;
private final ErrorContextProvider contextProvider;
private final TracerProvider tracerProvider;
private final String entityPath;
private final String hostname;

private volatile EventDataBatch currentBatch;

EventDataCollector(CreateBatchOptions options, Integer maxNumberOfBatches, ErrorContextProvider contextProvider,
TracerProvider tracerProvider) {
TracerProvider tracerProvider, String entityPath, String hostname) {
this.maxNumberOfBatches = maxNumberOfBatches;
this.maxMessageSize = options.getMaximumSizeInBytes() > 0
? options.getMaximumSizeInBytes()
Expand All @@ -538,9 +540,11 @@ private static class EventDataCollector implements Collector<EventData, List<Eve
this.partitionId = options.getPartitionId();
this.contextProvider = contextProvider;
this.tracerProvider = tracerProvider;
this.entityPath = entityPath;
this.hostname = hostname;

currentBatch = new EventDataBatch(maxMessageSize, partitionId, partitionKey, contextProvider,
tracerProvider);
tracerProvider, entityPath, hostname);
}

@Override
Expand All @@ -565,7 +569,7 @@ public BiConsumer<List<EventDataBatch>, EventData> accumulator() {
}

currentBatch = new EventDataBatch(maxMessageSize, partitionId, partitionKey, contextProvider,
tracerProvider);
tracerProvider, entityPath, hostname);
currentBatch.tryAdd(event);
list.add(batch);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected void afterTest() {
public void sendSmallEventsFullBatch() {
// Arrange
final EventDataBatch batch = new EventDataBatch(ClientConstants.MAX_MESSAGE_LENGTH_BYTES, null, null, contextProvider,
new TracerProvider(Collections.emptyList()));
new TracerProvider(Collections.emptyList()), getFullyQualifiedDomainName(), getEventHubName());
int count = 0;
while (batch.tryAdd(createData())) {
// We only print every 100th item or it'll be really spammy.
Expand All @@ -86,7 +86,7 @@ public void sendSmallEventsFullBatch() {
public void sendSmallEventsFullBatchPartitionKey() {
// Arrange
final EventDataBatch batch = new EventDataBatch(ClientConstants.MAX_MESSAGE_LENGTH_BYTES, null, PARTITION_KEY, contextProvider,
new TracerProvider(Collections.emptyList()));
new TracerProvider(Collections.emptyList()), getFullyQualifiedDomainName(), getEventHubName());
int count = 0;
while (batch.tryAdd(createData())) {
// We only print every 100th item or it'll be really spammy.
Expand All @@ -112,7 +112,7 @@ public void sendBatchPartitionKeyValidate() throws InterruptedException {

final SendOptions sendOptions = new SendOptions().setPartitionKey(PARTITION_KEY);
final EventDataBatch batch = new EventDataBatch(ClientConstants.MAX_MESSAGE_LENGTH_BYTES, null, PARTITION_KEY, contextProvider,
new TracerProvider(Collections.emptyList()));
new TracerProvider(Collections.emptyList()), getFullyQualifiedDomainName(), getEventHubName());
int count = 0;
while (count < 10) {
final EventData data = createData();
Expand Down Expand Up @@ -178,7 +178,7 @@ public void sendEventsFullBatchWithPartitionKey() {
// Arrange
final int maxMessageSize = 1024;
final EventDataBatch batch = new EventDataBatch(maxMessageSize, null, PARTITION_KEY, contextProvider,
new TracerProvider(Collections.emptyList()));
new TracerProvider(Collections.emptyList()), getFullyQualifiedDomainName(), getEventHubName());
final Random random = new Random();
final SendOptions sendOptions = new SendOptions().setPartitionKey(PARTITION_KEY);
int count = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void setup() {
@Test
public void nullEventData() {
assertThrows(IllegalArgumentException.class, () -> {
final EventDataBatch batch = new EventDataBatch(1024, null, PARTITION_KEY, null, null);
final EventDataBatch batch = new EventDataBatch(1024, null, PARTITION_KEY, null, null, null, null);
batch.tryAdd(null);
});
}
Expand All @@ -47,7 +47,7 @@ public void payloadExceededException() {
when(errorContextProvider.getErrorContext()).thenReturn(new AmqpErrorContext("test-namespace"));

final EventDataBatch batch = new EventDataBatch(1024, null, PARTITION_KEY, errorContextProvider,
new TracerProvider(Collections.emptyList()));
new TracerProvider(Collections.emptyList()), null, null);
final EventData tooBig = new EventData(new byte[1024 * 1024 * 2]);
try {
batch.tryAdd(tooBig);
Expand All @@ -65,7 +65,7 @@ public void payloadExceededException() {
public void withinPayloadSize() {
final int maxSize = ClientConstants.MAX_MESSAGE_LENGTH_BYTES;
final EventDataBatch batch = new EventDataBatch(ClientConstants.MAX_MESSAGE_LENGTH_BYTES, null, PARTITION_KEY,
null, new TracerProvider(Collections.emptyList()));
null, new TracerProvider(Collections.emptyList()), null, null);
final EventData within = new EventData(new byte[1024]);

Assertions.assertEquals(maxSize, batch.getMaxSizeInBytes());
Expand All @@ -83,7 +83,7 @@ public void setsPartitionId() {

// Act
final EventDataBatch batch = new EventDataBatch(ClientConstants.MAX_MESSAGE_LENGTH_BYTES, partitionId,
PARTITION_KEY, null, null);
PARTITION_KEY, null, null, null, null);

// Assert
Assertions.assertEquals(PARTITION_KEY, batch.getPartitionKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@

import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
import static com.azure.core.util.tracing.Tracer.PARENT_SPAN_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_BUILDER_KEY;
Expand All @@ -79,6 +80,7 @@
class EventHubProducerAsyncClientTest {
private static final String HOSTNAME = "my-host-name";
private static final String EVENT_HUB_NAME = "my-event-hub-name";
private static final String ENTITY_PATH = HOSTNAME + ".servicebus.windows.net";

@Mock
private AmqpSendLink sendLink;
Expand Down Expand Up @@ -518,6 +520,8 @@ void startMessageSpansOnCreateBatch() {
final AmqpSendLink link = mock(AmqpSendLink.class);

when(link.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES));
when(link.getHostname()).thenReturn(HOSTNAME);
when(link.getEntityPath()).thenReturn(ENTITY_PATH);

// EC is the prefix they use when creating a link that sends to the service round-robin.
when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), eq(retryOptions)))
Expand All @@ -526,6 +530,10 @@ void startMessageSpansOnCreateBatch() {
when(tracer1.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE))).thenAnswer(
invocation -> {
Context passed = invocation.getArgument(1, Context.class);
assertEquals(passed.getData(AZ_TRACING_NAMESPACE_KEY).get(), AZ_NAMESPACE_VALUE);
assertEquals(passed.getData(ENTITY_PATH_KEY).get(), ENTITY_PATH);
assertEquals(passed.getData(HOST_NAME_KEY).get(), HOSTNAME);

return passed.addData(PARENT_SPAN_KEY, "value").addData(DIAGNOSTIC_ID_KEY, "value2");
}
);
Expand Down

0 comments on commit eed24be

Please sign in to comment.