Skip to content

Commit

Permalink
Adding client telemetry (Azure#16822)
Browse files Browse the repository at this point in the history
* client telemetry changes

* build fix

* adding incremental change for client telemetry

* adding incremental change for client telemetry

* adding incremental change for client telemetry

* incremental check in

* Incremental work

* Incremental work

* incremental work

* adding setAutoResize on cpu and memory

* adding useragent in serialization

* Adding test file

* adding suprression for reportpayload equals method

* performance turning and resolving comments

* resolving comments and merge with latest

* resolving comments

* fixing build error

* resolving build error

* moving before and after from simple to emulator for telemetry

* merge with master and changing database field name in report payload
  • Loading branch information
simplynaveen20 authored Nov 20, 2020
1 parent f4a5de0 commit d3d49f9
Show file tree
Hide file tree
Showing 34 changed files with 1,586 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1997,6 +1997,13 @@
<Bug pattern="EQ_UNUSUAL"/>
</Match>

<!-- False positive, equals method is implemented correctly by using equality of member variables -->
<Match>
<Class name="com.azure.cosmos.implementation.clientTelemetry.ReportPayload"/>
<Method name="equals"/>
<Bug pattern="EQ_UNUSUAL"/>
</Match>

<!-- Bug: https://github.com/Azure/azure-sdk-for-java/issues/9106 -->
<Match>
<Class name="com.azure.cosmos.implementation.PathsHelper"/>
Expand Down
2 changes: 2 additions & 0 deletions eng/versioning/external_dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ org.codehaus.mojo:properties-maven-plugin;1.0.0
org.codehaus.mojo:xml-maven-plugin;1.0
org.eclipse.jetty:jetty-maven-plugin;9.4.33.v20201020
org.eclipse.m2e:lifecycle-mapping;1.0.0
org.hdrhistogram:HdrHistogram;2.1.12
org.jacoco:jacoco-maven-plugin;0.8.5
org.jacoco:org.jacoco.agent;0.8.5
org.mockito:mockito-core;3.3.3
Expand Down Expand Up @@ -280,6 +281,7 @@ cosmos_io.dropwizard.metrics:metrics-graphite;4.1.0
cosmos_io.dropwizard.metrics:metrics-jvm;4.1.0
cosmos_org.mockito:mockito-core;1.10.19
cosmos_org.mpierce.metrics.reservoir:hdrhistogram-metrics-reservoir;1.1.0
cosmos_org.hdrhistogram:HdrHistogram;2.1.12

# sdk\core\azure-core-serializer-avro-jackson\pom.xml
# This dependency is needed since Jackson Avro uses an older dependency on Apache Avro which is another library.
Expand Down
9 changes: 7 additions & 2 deletions sdk/cosmos/azure-cosmos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,12 @@ Licensed under the MIT License.
<scope>test</scope>
<version>4.2</version> <!-- {x-version-update;org.apache.commons:commons-collections4;external_dependency} -->
</dependency>

<!-- https://mvnrepository.com/artifact/org.hdrhistogram/HdrHistogram -->
<dependency>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
<version>2.1.12</version> <!-- {x-version-update;cosmos_org.hdrhistogram:HdrHistogram;external_dependency} -->
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
Expand Down Expand Up @@ -201,7 +206,6 @@ Licensed under the MIT License.
<skip>true</skip>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
Expand Down Expand Up @@ -232,6 +236,7 @@ Licensed under the MIT License.
<include>io.dropwizard.metrics:metrics-core:[4.1.0]</include> <!-- {x-include-update;cosmos_io.dropwizard.metrics:metrics-core;external_dependency} -->
<include>io.micrometer:micrometer-core:[1.5.6]</include> <!-- {x-include-update;io.micrometer:micrometer-core;external_dependency} -->
<include>org.slf4j:slf4j-api:[1.7.30]</include> <!-- {x-include-update;org.slf4j:slf4j-api;external_dependency} -->
<include>org.hdrhistogram:HdrHistogram:[2.1.12]</include> <!-- {x-include-update;cosmos_org.hdrhistogram:HdrHistogram;external_dependency} -->
</includes>
</bannedDependencies>
</rules>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.cosmos;

import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.Constants;
import com.azure.cosmos.implementation.CosmosError;
Expand All @@ -25,6 +26,7 @@
import com.azure.cosmos.implementation.SerializationDiagnosticsContext;
import com.azure.cosmos.implementation.ServiceUnavailableException;
import com.azure.cosmos.implementation.StoredProcedureResponse;
import com.azure.cosmos.implementation.TracerProvider;
import com.azure.cosmos.implementation.Warning;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.StoreResult;
Expand All @@ -33,6 +35,7 @@
import com.azure.cosmos.implementation.query.QueryInfo;
import com.azure.cosmos.implementation.query.metrics.ClientSideMetrics;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosStoredProcedureProperties;
import com.azure.cosmos.models.FeedResponse;
Expand Down Expand Up @@ -70,6 +73,26 @@ public static CosmosDiagnostics createCosmosDiagnostics(DiagnosticsClientContext
return new CosmosDiagnostics(diagnosticsClientContext);
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static Set<URI> getRegionsContacted(CosmosDiagnostics cosmosDiagnostics) {
return cosmosDiagnostics.clientSideRequestStatistics().getRegionsContacted();
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static AsyncDocumentClient getContextClient(CosmosAsyncClient cosmosAsyncClient) {
return cosmosAsyncClient.getContextClient();
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static String getServiceEndpoint(CosmosAsyncClient cosmosAsyncClient) {
return cosmosAsyncClient.getServiceEndpoint();
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static boolean isClientTelemetryEnabled(CosmosAsyncClient cosmosAsyncClient) {
return cosmosAsyncClient.isClientTelemetryEnabled();
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static Document documentFromObject(Object document, ObjectMapper mapper) {
return Document.fromObject(document, mapper);
Expand Down Expand Up @@ -578,6 +601,11 @@ public static CosmosDatabase createCosmosDatabase(String id, CosmosClient client
return new CosmosDatabase(id, client, database);
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static TracerProvider getTracerProvider(CosmosAsyncClient client) {
return client.getTracerProvider();
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static CosmosUser createCosmosUser(CosmosAsyncUser asyncUser, CosmosDatabase database, String id) {
return new CosmosUser(asyncUser, database, id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package com.azure.cosmos;

import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.DiagnosticsInstantSerializer;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.MetadataDiagnosticsContext;
import com.azure.cosmos.implementation.OperationType;
Expand All @@ -12,16 +13,15 @@
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.SerializationDiagnosticsContext;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.DiagnosticsInstantSerializer;
import com.azure.cosmos.implementation.cpu.CpuMonitor;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.cpu.CpuMemoryMonitor;
import com.azure.cosmos.implementation.directconnectivity.DirectBridgeInternal;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.StoreResult;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;

import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -120,18 +120,25 @@ void recordGatewayResponse(
CosmosException exception) {
Instant responseTime = Instant.now();
connectionMode = ConnectionMode.GATEWAY;


synchronized (this) {
if (responseTime.isAfter(this.requestEndTimeUTC)) {
this.requestEndTimeUTC = responseTime;
}

if (rxDocumentServiceRequest != null
&& rxDocumentServiceRequest.requestContext != null
&& rxDocumentServiceRequest.requestContext.retryContext != null) {
rxDocumentServiceRequest.requestContext.retryContext.retryEndTime = Instant.now();
this.retryContext = new RetryContext(rxDocumentServiceRequest.requestContext.retryContext);
URI locationEndPoint = null;
if (rxDocumentServiceRequest != null && rxDocumentServiceRequest.requestContext != null) {
locationEndPoint = rxDocumentServiceRequest.requestContext.locationEndpointToRoute;
if (rxDocumentServiceRequest.requestContext.retryContext != null) {
rxDocumentServiceRequest.requestContext.retryContext.retryEndTime = Instant.now();
this.retryContext = new RetryContext(rxDocumentServiceRequest.requestContext.retryContext);
}
}

if (locationEndPoint != null) {
this.regionsContacted.add(locationEndPoint);
}
this.gatewayStatistics = new GatewayStatistics();
if (rxDocumentServiceRequest != null) {
this.gatewayStatistics.operationType = rxDocumentServiceRequest.getOperationType();
Expand Down Expand Up @@ -312,7 +319,7 @@ public void serialize(
systemInformation.availableMemory = (maxMemory - (totalMemory - freeMemory)) + " KB";

// TODO: other system related info also can be captured using a similar approach
systemInformation.systemCpuLoad = CpuMonitor.getCpuLoad().toString();
systemInformation.systemCpuLoad = CpuMemoryMonitor.getCpuLoad().toString();
generator.writeObjectField("systemInformation", systemInformation);
} catch (Exception e) {
// Error while evaluating system information, do nothing
Expand Down Expand Up @@ -372,4 +379,4 @@ public RequestTimeline getRequestTimeline() {
return requestTimeline;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public final class CosmosAsyncClient implements Closeable {
private final TokenCredential tokenCredential;
private final boolean sessionCapturingOverride;
private final boolean enableTransportClientSharing;
private final boolean clientTelemetryEnabled;
private final TracerProvider tracerProvider;
private final boolean contentResponseOnWriteEnabled;
private static final Tracer TRACER;
Expand All @@ -85,6 +86,7 @@ public final class CosmosAsyncClient implements Closeable {
this.tokenCredential = builder.getTokenCredential();
this.sessionCapturingOverride = builder.isSessionCapturingOverrideEnabled();
this.enableTransportClientSharing = builder.isConnectionSharingAcrossClientsEnabled();
this.clientTelemetryEnabled = builder.isClientTelemetryEnabled();
this.contentResponseOnWriteEnabled = builder.isContentResponseOnWriteEnabled();
this.tracerProvider = new TracerProvider(TRACER);
this.asyncDocumentClient = new AsyncDocumentClient.Builder()
Expand Down Expand Up @@ -207,6 +209,10 @@ boolean isContentResponseOnWriteEnabled() {
return contentResponseOnWriteEnabled;
}

boolean isClientTelemetryEnabled() {
return clientTelemetryEnabled;
}

/**
* CREATE a Database if it does not already exist on the service.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.implementation.ItemDeserializer;
import com.azure.cosmos.implementation.Offer;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.Paths;
import com.azure.cosmos.implementation.RequestOptions;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.TracerProvider;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
Expand All @@ -35,7 +37,6 @@
import com.azure.cosmos.util.Beta;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.util.UtilBridgeInternal;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -262,8 +263,12 @@ private <T> Mono<CosmosItemResponse<T>> createItemInternal(T item, CosmosItemReq
return database.getClient().getTracerProvider().traceEnabledCosmosItemResponsePublisher(responseMono,
context,
this.createItemSpanName,
getId(),
database.getId(),
database.getClient().getServiceEndpoint());
database.getClient(),
ModelBridgeInternal.getConsistencyLevel(options),
OperationType.Create,
ResourceType.Document);
}

private <T> Mono<CosmosItemResponse<T>> createItemInternal(T item, CosmosItemRequestOptions options) {
Expand Down Expand Up @@ -361,9 +366,8 @@ <T> CosmosPagedFlux<T> readAllItems(Class<T> classType) {
*/
<T> CosmosPagedFlux<T> readAllItems(CosmosQueryRequestOptions options, Class<T> classType) {
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
pagedFluxOptions.setTracerInformation(this.getDatabase().getClient().getTracerProvider(),
this.readAllItemsSpanName,
this.getDatabase().getClient().getServiceEndpoint(), database.getId());
pagedFluxOptions.setTracerAndTelemetryInformation(this.readAllItemsSpanName, database.getId(),
this.getId(), OperationType.ReadFeed, ResourceType.Document, this.getDatabase().getClient());
setContinuationTokenAndMaxItemCount(pagedFluxOptions, options);
return getDatabase().getDocClientWrapper().readDocuments(getLink(), options).map(
response -> prepareFeedResponse(response, classType));
Expand Down Expand Up @@ -457,8 +461,8 @@ <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryItemsInternalFu
SqlQuerySpec sqlQuerySpec, CosmosQueryRequestOptions cosmosQueryRequestOptions, Class<T> classType) {
Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> pagedFluxOptionsFluxFunction = (pagedFluxOptions -> {
String spanName = this.queryItemsSpanName;
pagedFluxOptions.setTracerInformation(this.getDatabase().getClient().getTracerProvider(), spanName,
this.getDatabase().getClient().getServiceEndpoint(), database.getId());
pagedFluxOptions.setTracerAndTelemetryInformation(spanName, database.getId(),
this.getId(), OperationType.Query, ResourceType.Document, this.getDatabase().getClient());
setContinuationTokenAndMaxItemCount(pagedFluxOptions, cosmosQueryRequestOptions);

return getDatabase().getDocClientWrapper()
Expand Down Expand Up @@ -701,18 +705,17 @@ public <T> CosmosPagedFlux<T> readAllItems(
* @param partitionKey the partition key value of the documents that need to be read
* @param options the feed options.
* @param classType the class type.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages
* @return a {@link CosmosPagedFlux} containing one or several feed response pages
* of the read Cosmos items or an error.
*/
public <T> CosmosPagedFlux<T> readAllItems(
PartitionKey partitionKey,
CosmosQueryRequestOptions options,
Class<T> classType) {

return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
pagedFluxOptions.setTracerInformation(this.getDatabase().getClient().getTracerProvider(),
this.readAllItemsSpanName,
this.getDatabase().getClient().getServiceEndpoint(), database.getId());
pagedFluxOptions.setTracerAndTelemetryInformation(this.readAllItemsSpanName, database.getId(),
this.getId(), OperationType.ReadFeed, ResourceType.Document, this.getDatabase().getClient());
setContinuationTokenAndMaxItemCount(pagedFluxOptions, options);
return getDatabase()
.getDocClientWrapper()
Expand Down Expand Up @@ -857,6 +860,7 @@ public CosmosPagedFlux<CosmosConflictProperties> readAllConflicts(CosmosQueryReq
pagedFluxOptions.setTracerInformation(this.getDatabase().getClient().getTracerProvider(),
this.readAllConflictsSpanName,
this.getDatabase().getClient().getServiceEndpoint(), database.getId());

setContinuationTokenAndMaxItemCount(pagedFluxOptions, requestOptions);
return database.getDocClientWrapper().readConflicts(getLink(), requestOptions)
.map(response -> BridgeInternal.createFeedResponse(
Expand Down Expand Up @@ -961,8 +965,12 @@ private Mono<CosmosItemResponse<Object>> deleteItemInternal(
return database.getClient().getTracerProvider().traceEnabledCosmosItemResponsePublisher(responseMono,
context,
this.deleteItemSpanName,
this.getId(),
database.getId(),
database.getClient().getServiceEndpoint());
database.getClient(),
requestOptions.getConsistencyLevel(),
OperationType.Delete,
ResourceType.Document);
}

private <T> Mono<CosmosItemResponse<T>> replaceItemInternal(
Expand All @@ -977,7 +985,14 @@ private <T> Mono<CosmosItemResponse<T>> replaceItemInternal(
.map(response -> ModelBridgeInternal.createCosmosAsyncItemResponse(response, itemType, getItemDeserializer()))
.single();
return database.getClient().getTracerProvider().traceEnabledCosmosItemResponsePublisher(responseMono,
context, this.replaceItemSpanName, database.getId(), database.getClient().getServiceEndpoint());
context,
this.replaceItemSpanName,
this.getId(),
database.getId(),
database.getClient(),
ModelBridgeInternal.getConsistencyLevel(options),
OperationType.Replace,
ResourceType.Document);
}

private <T> Mono<CosmosItemResponse<T>> upsertItemInternal(T item, CosmosItemRequestOptions options, Context context) {
Expand All @@ -992,8 +1007,12 @@ private <T> Mono<CosmosItemResponse<T>> upsertItemInternal(T item, CosmosItemReq
return database.getClient().getTracerProvider().traceEnabledCosmosItemResponsePublisher(responseMono,
context,
this.upsertItemSpanName,
this.getId(),
database.getId(),
database.getClient().getServiceEndpoint());
database.getClient(),
ModelBridgeInternal.getConsistencyLevel(options),
OperationType.Upsert,
ResourceType.Document);
}

private <T> Mono<CosmosItemResponse<T>> readItemInternal(
Expand All @@ -1007,8 +1026,12 @@ private <T> Mono<CosmosItemResponse<T>> readItemInternal(
return database.getClient().getTracerProvider().traceEnabledCosmosItemResponsePublisher(responseMono,
context,
this.readItemSpanName,
this.getId(),
database.getId(),
database.getClient().getServiceEndpoint());
database.getClient(),
requestOptions.getConsistencyLevel(),
OperationType.Read,
ResourceType.Document);
}

Mono<CosmosContainerResponse> read(CosmosContainerRequestOptions options, Context context) {
Expand Down
Loading

0 comments on commit d3d49f9

Please sign in to comment.