Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pull] main from Azure:main #428

Merged
merged 21 commits into from
Mar 8, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
a60959d
dpg, upgrade codegen (#33890)
weidongxu-microsoft Mar 7, 2023
08fb047
Increment package versions for communication releases (#33882)
azure-sdk Mar 7, 2023
b11db1a
Prepare March 2023 release (#33891)
sjiherzig Mar 7, 2023
69561b2
Patch release of the azure-communication-common [1.2.6] (#33895)
maximrytych-ms Mar 7, 2023
f6af51c
Purview Share Version 2023-02-15-preview: Remove model objects (#33878)
kevinmichaelbowersox Mar 7, 2023
1d0bea3
Blob SeekableByteChannel - Read Mode (#33794)
jaschrep-msft Mar 7, 2023
dd7662b
Updates to the automated patch release process (#33568)
vcolin7 Mar 7, 2023
4349df3
Onboard EventHubs onto new tracing (#33600)
lmolkova Mar 7, 2023
c53e29e
Add product slug for Web PubSub (#33899)
azure-sdk Mar 7, 2023
4f6de82
Increment package versions for digitaltwins releases (#33904)
azure-sdk Mar 7, 2023
1e384fc
Increment package versions for communication releases (#33897)
azure-sdk Mar 7, 2023
c4984d8
Fix invalid JSON in fabricbot.json (#33906)
joshfree Mar 7, 2023
836bb17
ACR: Support blob upload in chunks (#33804)
lmolkova Mar 7, 2023
5e9408a
Revert "[TA] Replace json object Map<String, Object> by BinaryData (#…
mssfang Mar 7, 2023
229d70b
add missing override methods for bespoken method optional bags (#33913)
mssfang Mar 7, 2023
5cf2390
Update azure-sdk-build-tools Repository Resource Refs in Yaml files (…
azure-sdk Mar 7, 2023
c9ceb82
simplify read me (#33914)
mssfang Mar 8, 2023
5b16f2d
Removing fixed delay in retry after timeout exceptions when getting a…
ki1729 Mar 8, 2023
5b43b30
March 2023 ServiceBus Release (#33917)
ki1729 Mar 8, 2023
9eadab1
Increment package versions for textanalytics releases (#33918)
azure-sdk Mar 8, 2023
b1f0f9a
FaultInjection-Direct (#33329)
xinlian12 Mar 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Onboard EventHubs onto new tracing (Azure#33600)
* Update tracing to new core changes
* incorporating improvements from Azure#33559
  • Loading branch information
lmolkova authored Mar 7, 2023
commit 4349df3d05552489e7946461ba9373abd7813d58
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,13 @@
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
</Match>

<!-- Incorrect flagging, null is checked in signal.hasValue() -->
<Match>
<Class name="com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer"/>
<Method name="~(.*)\$reportSyncReceiveSpan\$(.*)"/>
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
</Match>

<!-- Incorrect flagging, if the response is null a StorageException would have been thrown -->
<Match>
<Or>
Expand Down
5 changes: 5 additions & 0 deletions sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@

### Features Added

- Added support for tracing options and configuration. ([#33600](https://github.com/Azure/azure-sdk-for-java/issues/33600))
- Aligned with OpenTelemetry messaging semantic conventions (when latest azure-core-tracing-opentelemetry package is used). ([#33600](https://github.com/Azure/azure-sdk-for-java/issues/33600))

### Breaking Changes

### Bugs Fixed

- Fixed exception when attempting to populate trace context on received `EventData`. ([#33594](https://github.com/Azure/azure-sdk-for-java/issues/33594))

### Other Changes

## 5.15.2 (2023-02-13)
Expand Down
7 changes: 6 additions & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
<description>Libraries built on Microsoft Azure Event Hubs</description>
<url>https://github.com/Azure/azure-sdk-for-java</url>

<properties>
<javaModulesSurefireArgLine>
--add-exports com.azure.core/com.azure.core.implementation.util=ALL-UNNAMED
--add-reads com.azure.messaging.eventhubs=com.azure.core.tracing.opentelemetry
</javaModulesSurefireArgLine>
</properties>
<distributionManagement>
<site>
<id>azure-java-build-docs</id>
Expand Down Expand Up @@ -90,7 +96,6 @@
<scope>test</scope>
</dependency>


<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-tracing-opentelemetry</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.azure.core.util.ClientOptions;
import com.azure.core.util.Configuration;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer;
import com.azure.messaging.eventhubs.models.SendBatchFailedContext;
import com.azure.messaging.eventhubs.models.SendBatchSucceededContext;

Expand Down Expand Up @@ -568,7 +567,7 @@ public EventHubBufferedProducerAsyncClient buildAsyncClient() {
? EventHubClientBuilder.DEFAULT_RETRY
: retryOptions;

return new EventHubBufferedProducerAsyncClient(builder, clientOptions, partitionResolver, options, EventHubsTracer.getDefaultTracer());
return new EventHubBufferedProducerAsyncClient(builder, clientOptions, partitionResolver, options, builder.createTracer());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.metrics.Meter;
import com.azure.core.util.metrics.MeterProvider;
import com.azure.core.util.tracing.Tracer;
import com.azure.core.util.tracing.TracerProvider;
import com.azure.messaging.eventhubs.implementation.ClientConstants;
import com.azure.messaging.eventhubs.implementation.EventHubAmqpConnection;
import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor;
import com.azure.messaging.eventhubs.implementation.EventHubReactorAmqpConnection;
import com.azure.messaging.eventhubs.implementation.EventHubSharedKeyCredential;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer;
import org.apache.qpid.proton.engine.SslDomain;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
Expand All @@ -56,6 +57,7 @@
import java.util.function.Supplier;
import java.util.regex.Pattern;

import static com.azure.messaging.eventhubs.implementation.ClientConstants.AZ_NAMESPACE_VALUE;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.CONNECTION_ID_KEY;

/**
Expand Down Expand Up @@ -842,7 +844,7 @@ EventHubAsyncClient buildAsyncClient() {
}

return new EventHubAsyncClient(processor, messageSerializer, scheduler,
isSharedConnection.get(), this::onClientClose, identifier, meter, EventHubsTracer.getDefaultTracer());
isSharedConnection.get(), this::onClientClose, identifier, meter, createTracer());
}

/**
Expand Down Expand Up @@ -902,9 +904,9 @@ void onClientClose() {
}
}

Meter createMeter() {
return MeterProvider.getDefaultProvider().createMeter(LIBRARY_NAME, LIBRARY_VERSION,
clientOptions == null ? null : clientOptions.getMetricsOptions());
Tracer createTracer() {
return TracerProvider.getDefaultProvider().createTracer(LIBRARY_NAME, LIBRARY_VERSION,
AZ_NAMESPACE_VALUE, clientOptions == null ? null : clientOptions.getTracingOptions());
}

private EventHubConnectionProcessor buildConnectionProcessor(MessageSerializer messageSerializer, Meter meter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@

import com.azure.core.util.Context;
import com.azure.core.util.metrics.Meter;
import com.azure.core.util.tracing.ProcessKind;
import com.azure.core.util.tracing.SpanKind;
import com.azure.core.util.tracing.StartSpanOptions;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsMetricsProvider;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer;
import reactor.core.publisher.Mono;

import static com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer.MESSAGING_BATCH_SIZE_ATTRIBUTE_NAME;
import static com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer.REACTOR_PARENT_TRACE_CONTEXT_KEY;

class EventHubsProducerInstrumentation {
Expand All @@ -36,7 +38,7 @@ <T> Mono<T> onSendBatch(Mono<T> publisher, EventDataBatch batch, String spanName
tracer.endSpan(signal.getThrowable(), span, null);
}
})
.contextWrite(reactor.util.context.Context.of(REACTOR_PARENT_TRACE_CONTEXT_KEY, startSpanWithLinks(spanName, batch, Context.NONE)));
.contextWrite(reactor.util.context.Context.of(REACTOR_PARENT_TRACE_CONTEXT_KEY, startPublishSpanWithLinks(spanName, batch, Context.NONE)));
} else {
return publisher
.doOnEach(signal -> {
Expand All @@ -51,14 +53,15 @@ public EventHubsTracer getTracer() {
return tracer;
}

private Context startSpanWithLinks(String name, EventDataBatch batch, Context context) {
Context spanBuilder = tracer.getBuilder(name, context);
private Context startPublishSpanWithLinks(String spanName, EventDataBatch batch, Context context) {
StartSpanOptions startOptions = tracer.createStartOption(SpanKind.CLIENT, EventHubsTracer.OperationName.PUBLISH);
if (batch != null) {
startOptions.setAttribute(MESSAGING_BATCH_SIZE_ATTRIBUTE_NAME, batch.getCount());
for (EventData event : batch.getEvents()) {
tracer.addLink(event.getProperties(), null, spanBuilder, event.getContext());
startOptions.addLink(tracer.createLink(event.getProperties(), null, event.getContext()));
}
}

return tracer.startSpan(name, spanBuilder, ProcessKind.SEND);
return tracer.startSpan(spanName, startOptions, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventPosition;

import java.time.Duration;
import java.util.HashMap;
import java.util.Locale;
Expand Down Expand Up @@ -74,7 +75,6 @@ public class EventProcessorClient {
* @param loadBalancerUpdateInterval The time duration between load balancing update cycles.
* @param partitionOwnershipExpirationInterval The time duration after which the ownership of partition expires.
* @param loadBalancingStrategy The load balancing strategy to use.
* @param tracer Tracer instance.
*/
EventProcessorClient(EventHubClientBuilder eventHubClientBuilder, String consumerGroup,
Supplier<PartitionProcessor> partitionProcessorFactory, CheckpointStore checkpointStore,
Expand Down Expand Up @@ -195,7 +195,6 @@ public synchronized boolean isRunning() {

private void stopProcessing() {
partitionPumpManager.stopAllPartitionPumps();

// finally, remove ownerid from checkpointstore as the processor is shutting down
checkpointStore.listOwnership(fullyQualifiedNamespace, eventHubName, consumerGroup)
.filter(ownership -> identifier.equals(ownership.getOwnerId()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.azure.core.util.ClientOptions;
import com.azure.core.util.Configuration;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer;
import com.azure.messaging.eventhubs.implementation.PartitionProcessor;
import com.azure.messaging.eventhubs.models.CloseContext;
import com.azure.messaging.eventhubs.models.ErrorContext;
Expand Down Expand Up @@ -747,7 +746,7 @@ public EventProcessorClient buildEventProcessorClient() {
return new EventProcessorClient(eventHubClientBuilder, consumerGroup,
getPartitionProcessorSupplier(), checkpointStore, trackLastEnqueuedEventProperties,
processError, initialPartitionEventPosition, maxBatchSize, maxWaitTime, processEventBatch != null,
loadBalancingUpdateInterval, partitionOwnershipExpirationInterval, loadBalancingStrategy, EventHubsTracer.getDefaultTracer());
loadBalancingUpdateInterval, partitionOwnershipExpirationInterval, loadBalancingStrategy, eventHubClientBuilder.createTracer());
}

private Supplier<PartitionProcessor> getPartitionProcessorSupplier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@

import com.azure.core.util.Context;
import com.azure.core.util.metrics.Meter;
import com.azure.core.util.tracing.ProcessKind;
import com.azure.core.util.tracing.SpanKind;
import com.azure.core.util.tracing.StartSpanOptions;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.implementation.MessageUtils;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.message.Message;

import java.time.Instant;
import java.time.ZoneOffset;

import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME;
import static com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer.MESSAGE_ENQUEUED_TIME_ATTRIBUTE_NAME;

public class EventHubsConsumerInstrumentation {
private static final Symbol ENQUEUED_TIME_UTC_ANNOTATION_NAME_SYMBOL = Symbol.valueOf(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue());
Expand All @@ -39,7 +42,14 @@ public Context asyncConsume(String spanName, Message message, String partitionId
Instant enqueuedTime = MessageUtils.getEnqueuedTime(message.getMessageAnnotations().getValue(), ENQUEUED_TIME_UTC_ANNOTATION_NAME_SYMBOL);
Context child = parent;
if (tracer.isEnabled() && !isSync) {
child = tracer.startSpan(spanName, tracer.setParentAndAttributes(message, enqueuedTime, parent), ProcessKind.PROCESS);
StartSpanOptions options = tracer.createStartOption(SpanKind.CONSUMER, EventHubsTracer.OperationName.PROCESS)
.setAttribute(MESSAGE_ENQUEUED_TIME_ATTRIBUTE_NAME, enqueuedTime.atOffset(ZoneOffset.UTC).toEpochSecond());

if (message.getApplicationProperties() != null) {
options.setRemoteParent(tracer.extractContext(message.getApplicationProperties().getValue()));
}

child = tracer.startSpan(spanName, options, parent);
}

meter.reportReceive(enqueuedTime, partitionId, child);
Expand Down
Loading