Skip to content

Commit

Permalink
Add EventGrid distributed tracing (Azure#15850)
Browse files Browse the repository at this point in the history
* Add EventGrid distributed tracing
* Update version number data type from Integer to Long for two classes AcsChatMessageEventBaseProperties and AcsChatThreadEventBaseProperties
* opens com.azure.messaging.eventgrid.implementation in module-info.java
  • Loading branch information
YijunXieMS authored Oct 2, 2020
1 parent 64a0778 commit eaf0db9
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -336,6 +337,9 @@ public String getSpecVersion() {
* @return the extension attributes as an unmodifiable map.
*/
public Map<String, Object> getExtensionAttributes() {
if (this.cloudEvent.getAdditionalProperties() == null) {
return null;
}
return Collections.unmodifiableMap(this.cloudEvent.getAdditionalProperties());
}

Expand All @@ -348,6 +352,9 @@ public Map<String, Object> getExtensionAttributes() {
* @return the cloud event itself.
*/
public CloudEvent addExtensionAttribute(String name, Object value) {
if (this.cloudEvent.getAdditionalProperties() == null) {
this.cloudEvent.setAdditionalProperties(new HashMap<>());
}
this.cloudEvent.getAdditionalProperties().put(name.toLowerCase(Locale.ENGLISH), value);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.SerializerAdapter;
import com.azure.core.util.tracing.TracerProxy;
import com.azure.messaging.eventgrid.implementation.Constants;
import com.azure.messaging.eventgrid.implementation.EventGridPublisherClientImpl;
import com.azure.messaging.eventgrid.implementation.EventGridPublisherClientImplBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.core.util.FluxUtil.withContext;
import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;

/**
* A service client that publishes events to an EventGrid topic or domain. Use {@link EventGridPublisherClientBuilder}
Expand All @@ -33,6 +38,9 @@ public final class EventGridPublisherAsyncClient {

private final EventGridServiceVersion serviceVersion;

private final ClientLogger logger = new ClientLogger(EventGridPublisherAsyncClient.class);


EventGridPublisherAsyncClient(HttpPipeline pipeline, String hostname, SerializerAdapter serializerAdapter,
EventGridServiceVersion serviceVersion) {
this.impl = new EventGridPublisherClientImplBuilder()
Expand Down Expand Up @@ -63,14 +71,19 @@ public EventGridServiceVersion getServiceVersion() {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> sendEvents(Iterable<EventGridEvent> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
return withContext(context -> sendEvents(events, context));
}

Mono<Void> sendEvents(Iterable<EventGridEvent> events, Context context) {
final Context finalContext = context != null ? context : Context.NONE;
return Flux.fromIterable(events)
.map(EventGridEvent::toImpl)
.collectList()
.flatMap(list -> this.impl.publishEventsAsync(this.hostname, list, context));
.flatMap(list -> this.impl.publishEventsAsync(this.hostname, list,
finalContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE)));
}

/**
Expand All @@ -81,14 +94,20 @@ Mono<Void> sendEvents(Iterable<EventGridEvent> events, Context context) {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> sendCloudEvents(Iterable<CloudEvent> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
return withContext(context -> sendCloudEvents(events, context));
}

Mono<Void> sendCloudEvents(Iterable<CloudEvent> events, Context context) {
final Context finalContext = context != null ? context : Context.NONE;
this.addCloudEventTracePlaceHolder(events);
return Flux.fromIterable(events)
.map(CloudEvent::toImpl)
.collectList()
.flatMap(list -> this.impl.publishCloudEventEventsAsync(this.hostname, list, context));
.flatMap(list -> this.impl.publishCloudEventEventsAsync(this.hostname, list,
finalContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE)));
}

/**
Expand All @@ -99,13 +118,18 @@ Mono<Void> sendCloudEvents(Iterable<CloudEvent> events, Context context) {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> sendCustomEvents(Iterable<Object> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
return withContext(context -> sendCustomEvents(events, context));
}

Mono<Void> sendCustomEvents(Iterable<Object> events, Context context) {
final Context finalContext = context != null ? context : Context.NONE;
return Flux.fromIterable(events)
.collectList()
.flatMap(list -> this.impl.publishCustomEventEventsAsync(this.hostname, list, context));
.flatMap(list -> this.impl.publishCustomEventEventsAsync(this.hostname, list,
finalContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE)));
}

/**
Expand All @@ -116,14 +140,19 @@ Mono<Void> sendCustomEvents(Iterable<Object> events, Context context) {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> sendEventsWithResponse(Iterable<EventGridEvent> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
return withContext(context -> sendEventsWithResponse(events, context));
}

Mono<Response<Void>> sendEventsWithResponse(Iterable<EventGridEvent> events, Context context) {
final Context finalContext = context != null ? context : Context.NONE;
return Flux.fromIterable(events)
.map(EventGridEvent::toImpl)
.collectList()
.flatMap(list -> this.impl.publishEventsWithResponseAsync(this.hostname, list, context));
.flatMap(list -> this.impl.publishEventsWithResponseAsync(this.hostname, list,
finalContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE)));
}

/**
Expand All @@ -134,14 +163,20 @@ Mono<Response<Void>> sendEventsWithResponse(Iterable<EventGridEvent> events, Con
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> sendCloudEventsWithResponse(Iterable<CloudEvent> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
return withContext(context -> sendCloudEventsWithResponse(events, context));
}

Mono<Response<Void>> sendCloudEventsWithResponse(Iterable<CloudEvent> events, Context context) {
final Context finalContext = context != null ? context : Context.NONE;
this.addCloudEventTracePlaceHolder(events);
return Flux.fromIterable(events)
.map(CloudEvent::toImpl)
.collectList()
.flatMap(list -> this.impl.publishCloudEventEventsWithResponseAsync(this.hostname, list, context));
.flatMap(list -> this.impl.publishCloudEventEventsWithResponseAsync(this.hostname, list,
finalContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE)));
}

/**
Expand All @@ -152,12 +187,31 @@ Mono<Response<Void>> sendCloudEventsWithResponse(Iterable<CloudEvent> events, Co
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> sendCustomEventsWithResponse(Iterable<Object> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
return withContext(context -> sendCustomEventsWithResponse(events, context));
}

Mono<Response<Void>> sendCustomEventsWithResponse(Iterable<Object> events, Context context) {
final Context finalContext = context != null ? context : Context.NONE;
return Flux.fromIterable(events)
.collectList()
.flatMap(list -> this.impl.publishCustomEventEventsWithResponseAsync(this.hostname, list, context));
.flatMap(list -> this.impl.publishCustomEventEventsWithResponseAsync(this.hostname, list,
finalContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE)));
}

private void addCloudEventTracePlaceHolder(Iterable<CloudEvent> events) {
if (TracerProxy.isTracingEnabled()) {
for (CloudEvent event : events) {
if (event.getExtensionAttributes() == null ||
(event.getExtensionAttributes().get(Constants.TRACE_PARENT) == null &&
event.getExtensionAttributes().get(Constants.TRACE_STATE) == null)) {

event.addExtensionAttribute(Constants.TRACE_PARENT, Constants.TRACE_PARENT_PLACEHOLDER_UUID);
event.addExtensionAttribute(Constants.TRACE_STATE, Constants.TRACE_STATE_PLACEHOLDER_UUID);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.JacksonAdapter;
import com.azure.core.util.serializer.SerializerAdapter;
import com.azure.core.util.tracing.TracerProxy;
import com.azure.messaging.eventgrid.implementation.CloudEventTracingPipelinePolicy;

import java.net.MalformedURLException;
import java.net.URL;
Expand Down Expand Up @@ -144,6 +146,9 @@ public EventGridPublisherAsyncClient buildAsyncClient() {

HttpPolicyProviders.addAfterRetryPolicies(httpPipelinePolicies);

if (TracerProxy.isTracingEnabled()) {
httpPipelinePolicies.add(new CloudEventTracingPipelinePolicy());
}
httpPipelinePolicies.add(new HttpLoggingPolicy(httpLogOptions));

HttpPipeline buildPipeline = new HttpPipelineBuilder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.azure.messaging.eventgrid.implementation;

import com.azure.core.http.HttpHeader;
import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.util.tracing.TracerProxy;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;

import com.azure.messaging.eventgrid.CloudEvent;
/**
* This pipeline policy should be added after OpenTelemetryPolicy in the http pipeline.
*
* It checks whether the {@link HttpRequest} headers have "traceparent" or "tracestate" and whether the serialized
* http body json string for a list of {@link CloudEvent} instances has place holders
* {@link Constants#TRACE_PARENT_PLACEHOLDER} or {@link Constants#TRACE_STATE_PLACEHOLDER}.
* The place holders will be replaced by the value from headers if the headers have "traceparent" or "tracestate",
* or be removed if the headers don't have.
*
* The place holders won't exist in the json string if the {@link TracerProxy#isTracingEnabled()} returns false.
*/
public class CloudEventTracingPipelinePolicy implements HttpPipelinePolicy {
@Override
public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
final HttpRequest request = context.getHttpRequest();
final HttpHeader contentType = request.getHeaders().get(Constants.CONTENT_TYPE);
StringBuilder bodyStringBuilder = new StringBuilder();
if (TracerProxy.isTracingEnabled() && contentType != null &&
Constants.CLOUD_EVENT_CONTENT_TYPE.equals(contentType.getValue())) {
return request.getBody().map(byteBuffer -> bodyStringBuilder.append(new String(byteBuffer.array(),
StandardCharsets.UTF_8)))
.then(Mono.fromCallable(() -> replaceTracingPlaceHolder(request, bodyStringBuilder)))
.then(next.process());
}
else {
return next.process();
}
}

/**
*
* @param request The {@link HttpRequest}, whose body will be mutated by replacing traceparent and tracestate
* placeholders.
* @param bodyStringBuilder The {@link StringBuilder} that contains the full HttpRequest body string.
* @return The new body string with the place holders replaced (if header has tracing)
* or removed (if header no tracing).
*/
static String replaceTracingPlaceHolder(HttpRequest request, StringBuilder bodyStringBuilder) {
final int traceParentPlaceHolderIndex = bodyStringBuilder.indexOf(Constants.TRACE_PARENT_PLACEHOLDER);
if (traceParentPlaceHolderIndex >= 0) { // There is "traceparent" placeholder in body, replace it.
final HttpHeader traceparentHeader = request.getHeaders().get(Constants.TRACE_PARENT);
bodyStringBuilder.replace(traceParentPlaceHolderIndex,
Constants.TRACE_PARENT_PLACEHOLDER.length() + traceParentPlaceHolderIndex,
traceparentHeader != null
? String.format(",\"%s\":\"%s\"", Constants.TRACE_PARENT, traceparentHeader.getValue())
: "");
}
final int traceStatePlaceHolderIndex = bodyStringBuilder.indexOf(Constants.TRACE_STATE_PLACEHOLDER);
if (traceStatePlaceHolderIndex >= 0) { // There is "tracestate" placeholder in body, replace it.
final HttpHeader tracestateHeader = request.getHeaders().get(Constants.TRACE_STATE);
bodyStringBuilder.replace(traceStatePlaceHolderIndex,
Constants.TRACE_STATE_PLACEHOLDER.length() + traceStatePlaceHolderIndex,
tracestateHeader != null
? String.format(",\"%s\":\"%s\"", Constants.TRACE_STATE, tracestateHeader.getValue())
: "");
}
String newBodyString = bodyStringBuilder.toString();
request.setHeader(Constants.CONTENT_LENGTH, String.valueOf(newBodyString.length()));
request.setBody(newBodyString);
return newBodyString;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.azure.messaging.eventgrid.implementation;

public class Constants {
public static final String CONTENT_TYPE = "Content-Type";
public static final String CONTENT_LENGTH = "Content-Length";
public static final String CLOUD_EVENT_CONTENT_TYPE = "application/cloudevents-batch+json; charset=utf-8";
public static final String TRACE_PARENT = "traceparent";
public static final String TRACE_STATE = "tracestate";
public static final String TRACE_PARENT_PLACEHOLDER_UUID = "TP-14b6b15b-74b6-4178-847e-d142aa2727b2";
public static final String TRACE_STATE_PLACEHOLDER_UUID = "TS-14b6b15b-74b6-4178-847e-d142aa2727b2";
public static final String TRACE_PARENT_PLACEHOLDER = ",\"" + TRACE_PARENT + "\":\"TP-14b6b15b-74b6-4178-847e-d142aa2727b2\"";
public static final String TRACE_STATE_PLACEHOLDER = ",\"" + TRACE_STATE + "\":\"TS-14b6b15b-74b6-4178-847e-d142aa2727b2\"";

// Please see <a href=https://docs.microsoft.com/en-us/azure/azure-resource-manager/management/azure-services-resource-providers>here</a>
// for more information on Azure resource provider namespaces.
public static final String EVENT_GRID_TRACING_NAMESPACE_VALUE = "Microsoft.EventGrid";
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class AcsChatMessageEventBaseProperties extends AcsChatEventBasePropertie
* The version of the message
*/
@JsonProperty(value = "version")
private Integer version;
private Long version;

/**
* Get the messageId property: The chat message id.
Expand Down Expand Up @@ -152,7 +152,7 @@ public AcsChatMessageEventBaseProperties setType(String type) {
*
* @return the version value.
*/
public Integer getVersion() {
public Long getVersion() {
return this.version;
}

Expand All @@ -162,7 +162,7 @@ public Integer getVersion() {
* @param version the version value to set.
* @return the AcsChatMessageEventBaseProperties object itself.
*/
public AcsChatMessageEventBaseProperties setVersion(Integer version) {
public AcsChatMessageEventBaseProperties setVersion(Long version) {
this.version = version;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class AcsChatThreadEventBaseProperties extends AcsChatEventBaseProperties
* The version of the thread
*/
@JsonProperty(value = "version")
private Integer version;
private Long version;

/**
* Get the createTime property: The original creation time of the thread.
Expand All @@ -48,7 +48,7 @@ public AcsChatThreadEventBaseProperties setCreateTime(OffsetDateTime createTime)
*
* @return the version value.
*/
public Integer getVersion() {
public Long getVersion() {
return this.version;
}

Expand All @@ -58,7 +58,7 @@ public Integer getVersion() {
* @param version the version value to set.
* @return the AcsChatThreadEventBaseProperties object itself.
*/
public AcsChatThreadEventBaseProperties setVersion(Integer version) {
public AcsChatThreadEventBaseProperties setVersion(Long version) {
this.version = version;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
exports com.azure.messaging.eventgrid;
exports com.azure.messaging.eventgrid.systemevents;

opens com.azure.messaging.eventgrid.implementation;
opens com.azure.messaging.eventgrid.implementation.models to com.fasterxml.jackson.databind;
opens com.azure.messaging.eventgrid.systemevents to com.fasterxml.jackson.databind;
}
Loading

0 comments on commit eaf0db9

Please sign in to comment.