Skip to content

Commit

Permalink
Merging back Tracer PR without conditional check on withContext() (#1…
Browse files Browse the repository at this point in the history
…2867)

* initial commit

* changing sample

* adding query api tracer support

* reverting change for jaeger ui local testing

* adding tracers in trigger udf and sproc

* adding test case and refactoring code

* refactoring code with PR ready version

* ending span on doOnComplete

* resolving error

* perf improvement

* fixing build error

* resolving merge conflict

* check non opentelementry jar non existence in starting of all api

* fixing checkstyle

* resolving comments

* check style fix as per java 8

* adding azure-core-tracing-opentelemetry in test scope

* build error fix

* test failure fix

* test fix

* complie error fix

* removing query text from tracer

* code formating change and some comment resolution

* resolving comments

* moving AZ_TRACING_NAMESPACE_KEY to context from span attribute

* updating core opentelementry jar

* Tracer code without conditional check for withContext

* formating chage

* formating chage

* resolving comments

* build error fix

* resolving comment

* resolving comments

* resolving comment

* resolving comment

* removing extra method

* removing tets dependency

* adding span attribute unit test

* resolving comments

* resolving comments
  • Loading branch information
simplynaveen20 authored Jul 21, 2020
1 parent 3c41791 commit 5150e06
Show file tree
Hide file tree
Showing 19 changed files with 1,839 additions and 485 deletions.
1 change: 0 additions & 1 deletion sdk/cosmos/azure-cosmos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ Licensed under the MIT License.
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-afterburner</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

import com.azure.core.annotation.ServiceClient;
import com.azure.core.credential.AzureKeyCredential;
import com.azure.core.util.Context;
import com.azure.core.util.tracing.Tracer;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.ConnectionPolicy;
import com.azure.cosmos.implementation.CosmosAuthorizationTokenResolver;
import com.azure.cosmos.implementation.Database;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.TracerProvider;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdMetrics;
import com.azure.cosmos.implementation.encryption.api.DataEncryptionKeyProvider;
import com.azure.cosmos.models.CosmosDatabaseResponse;
Expand All @@ -27,8 +30,11 @@
import reactor.core.publisher.Mono;

import java.io.Closeable;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceLoader;

import static com.azure.core.util.FluxUtil.withContext;
import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount;

/**
Expand All @@ -52,8 +58,20 @@ public final class CosmosAsyncClient implements Closeable {
private final AzureKeyCredential credential;
private final boolean sessionCapturingOverride;
private final boolean enableTransportClientSharing;
private final TracerProvider tracerProvider;
private final DataEncryptionKeyProvider dataEncryptionKeyProvider;
private final boolean contentResponseOnWriteEnabled;
private static final Tracer TRACER;

static {
ServiceLoader<Tracer> serviceLoader = ServiceLoader.load(Tracer.class);
Iterator<?> iterator = serviceLoader.iterator();
if (iterator.hasNext()) {
TRACER = serviceLoader.iterator().next();
} else {
TRACER = null;
}
}

CosmosAsyncClient(CosmosClientBuilder builder) {
this.configs = builder.configs();
Expand All @@ -68,6 +86,7 @@ public final class CosmosAsyncClient implements Closeable {
this.dataEncryptionKeyProvider = builder.getDataEncryptionKeyProvider();
this.enableTransportClientSharing = builder.isConnectionSharingAcrossClientsEnabled();
this.contentResponseOnWriteEnabled = builder.isContentResponseOnWriteEnabled();
this.tracerProvider = new TracerProvider(TRACER);
this.asyncDocumentClient = new AsyncDocumentClient.Builder()
.withServiceEndpoint(this.serviceEndpoint)
.withMasterKeyOrResourceToken(this.keyOrResourceToken)
Expand Down Expand Up @@ -198,8 +217,9 @@ boolean isContentResponseOnWriteEnabled() {
* @return a {@link Mono} containing the cosmos database response with the created or existing database or
* an error.
*/
Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(CosmosDatabaseProperties databaseProperties) {
return createDatabaseIfNotExistsInternal(getDatabase(databaseProperties.getId()));
public Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(CosmosDatabaseProperties databaseProperties) {
return withContext(context -> createDatabaseIfNotExistsInternal(getDatabase(databaseProperties.getId()),
null, context));
}

/**
Expand All @@ -213,21 +233,7 @@ Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(CosmosDatabaseProperties
* an error.
*/
public Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(String id) {
return createDatabaseIfNotExistsInternal(getDatabase(id));
}

private Mono<CosmosDatabaseResponse> createDatabaseIfNotExistsInternal(CosmosAsyncDatabase database) {
return database.read().onErrorResume(exception -> {
final Throwable unwrappedException = Exceptions.unwrap(exception);
if (unwrappedException instanceof CosmosException) {
final CosmosException cosmosException = (CosmosException) unwrappedException;
if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
return createDatabase(new CosmosDatabaseProperties(database.getId()),
new CosmosDatabaseRequestOptions());
}
}
return Mono.error(unwrappedException);
});
return withContext(context -> createDatabaseIfNotExistsInternal(getDatabase(id), null, context));
}

/**
Expand All @@ -244,19 +250,8 @@ private Mono<CosmosDatabaseResponse> createDatabaseIfNotExistsInternal(CosmosAsy
* @return the mono.
*/
public Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(String id, ThroughputProperties throughputProperties) {
return this.getDatabase(id).read().onErrorResume(exception -> {
final Throwable unwrappedException = Exceptions.unwrap(exception);
if (unwrappedException instanceof CosmosException) {
final CosmosException cosmosException = (CosmosException) unwrappedException;
if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
CosmosDatabaseRequestOptions options = new CosmosDatabaseRequestOptions();
ModelBridgeInternal.setThroughputProperties(options, throughputProperties);
return createDatabase(new CosmosDatabaseProperties(id),
options);
}
}
return Mono.error(unwrappedException);
});
return withContext(context -> createDatabaseIfNotExistsInternal(getDatabase(id),
throughputProperties, context));
}

/**
Expand All @@ -273,14 +268,10 @@ public Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(String id, Through
*/
public Mono<CosmosDatabaseResponse> createDatabase(CosmosDatabaseProperties databaseProperties,
CosmosDatabaseRequestOptions options) {
if (options == null) {
options = new CosmosDatabaseRequestOptions();
}
final CosmosDatabaseRequestOptions requestOptions = options == null ? new CosmosDatabaseRequestOptions() : options;
Database wrappedDatabase = new Database();
wrappedDatabase.setId(databaseProperties.getId());
return asyncDocumentClient.createDatabase(wrappedDatabase, ModelBridgeInternal.toRequestOptions(options))
.map(databaseResourceResponse -> ModelBridgeInternal.createCosmosDatabaseResponse(databaseResourceResponse))
.single();
return withContext(context -> createDatabaseInternal(wrappedDatabase, requestOptions, context));
}

/**
Expand Down Expand Up @@ -332,12 +323,12 @@ public Mono<CosmosDatabaseResponse> createDatabase(CosmosDatabaseProperties data
if (options == null) {
options = new CosmosDatabaseRequestOptions();
}

ModelBridgeInternal.setThroughputProperties(options, throughputProperties);
Database wrappedDatabase = new Database();
wrappedDatabase.setId(databaseProperties.getId());
return asyncDocumentClient.createDatabase(wrappedDatabase, ModelBridgeInternal.toRequestOptions(options))
.map(databaseResourceResponse -> ModelBridgeInternal.createCosmosDatabaseResponse(databaseResourceResponse))
.single();
final CosmosDatabaseRequestOptions requestOptions = options;
return withContext(context -> createDatabaseInternal(wrappedDatabase, requestOptions, context));
}

/**
Expand All @@ -358,24 +349,6 @@ public Mono<CosmosDatabaseResponse> createDatabase(CosmosDatabaseProperties data
return createDatabase(databaseProperties, options);
}

/**
* Creates a database.
* <p>
* After subscription the operation will be performed.
* The {@link Mono} upon successful completion will contain a single resource response with the
* created database.
* In case of failure the {@link Mono} will error.
*
* @param id id of the database.
* @param throughput the throughput for the database.
* @return a {@link Mono} containing the single cosmos database response with the created database or an error.
*/
Mono<CosmosDatabaseResponse> createDatabase(String id, int throughput) {
CosmosDatabaseRequestOptions options = new CosmosDatabaseRequestOptions();
ModelBridgeInternal.setThroughputProperties(options, ThroughputProperties.createManualThroughput(throughput));
return createDatabase(new CosmosDatabaseProperties(id), options);
}

/**
* Creates a database.
*
Expand All @@ -401,12 +374,13 @@ public Mono<CosmosDatabaseResponse> createDatabase(String id, ThroughputProperti
*/
CosmosPagedFlux<CosmosDatabaseProperties> readAllDatabases(CosmosQueryRequestOptions options) {
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
pagedFluxOptions.setTracerInformation(this.tracerProvider, "readAllDatabases", this.serviceEndpoint, null);
setContinuationTokenAndMaxItemCount(pagedFluxOptions, options);
return getDocClientWrapper().readDatabases(options)
.map(response ->
BridgeInternal.createFeedResponse(
ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()),
response.getResponseHeaders()));
.map(response ->
BridgeInternal.createFeedResponse(
ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()),
response.getResponseHeaders()));
});
}

Expand Down Expand Up @@ -436,7 +410,7 @@ public CosmosPagedFlux<CosmosDatabaseProperties> readAllDatabases() {
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of read databases or an error.
*/
public CosmosPagedFlux<CosmosDatabaseProperties> queryDatabases(String query, CosmosQueryRequestOptions options) {
return queryDatabases(new SqlQuerySpec(query), options);
return queryDatabasesInternal(new SqlQuerySpec(query), options);
}

/**
Expand All @@ -451,13 +425,7 @@ public CosmosPagedFlux<CosmosDatabaseProperties> queryDatabases(String query, Co
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of read databases or an error.
*/
public CosmosPagedFlux<CosmosDatabaseProperties> queryDatabases(SqlQuerySpec querySpec, CosmosQueryRequestOptions options) {
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
setContinuationTokenAndMaxItemCount(pagedFluxOptions, options);
return getDocClientWrapper().queryDatabases(querySpec, options)
.map(response -> BridgeInternal.createFeedResponse(
ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()),
response.getResponseHeaders()));
});
return queryDatabasesInternal(querySpec, options);
}

/**
Expand All @@ -477,4 +445,63 @@ public CosmosAsyncDatabase getDatabase(String id) {
public void close() {
asyncDocumentClient.close();
}

TracerProvider getTracerProvider(){
return this.tracerProvider;
}

private CosmosPagedFlux<CosmosDatabaseProperties> queryDatabasesInternal(SqlQuerySpec querySpec, CosmosQueryRequestOptions options){
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
pagedFluxOptions.setTracerInformation(this.tracerProvider, "queryDatabases", this.serviceEndpoint, null);
setContinuationTokenAndMaxItemCount(pagedFluxOptions, options);
return getDocClientWrapper().queryDatabases(querySpec, options)
.map(response -> BridgeInternal.createFeedResponse(
ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()),
response.getResponseHeaders()));
});
}


private Mono<CosmosDatabaseResponse> createDatabaseIfNotExistsInternal(CosmosAsyncDatabase database,
ThroughputProperties throughputProperties, Context context) {
String spanName = "createDatabaseIfNotExists." + database.getId();
Context nestedContext = context.addData(TracerProvider.COSMOS_CALL_DEPTH, TracerProvider.COSMOS_CALL_DEPTH_VAL);
Mono<CosmosDatabaseResponse> responseMono = database.readInternal(new CosmosDatabaseRequestOptions(),
nestedContext).onErrorResume(exception -> {
final Throwable unwrappedException = Exceptions.unwrap(exception);
if (unwrappedException instanceof CosmosException) {
final CosmosException cosmosException = (CosmosException) unwrappedException;
if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
CosmosDatabaseRequestOptions requestOptions = new CosmosDatabaseRequestOptions();
if (throughputProperties != null) {
ModelBridgeInternal.setThroughputProperties(requestOptions, throughputProperties);
}

Database wrappedDatabase = new Database();
wrappedDatabase.setId(database.getId());
return createDatabaseInternal(wrappedDatabase,
requestOptions, nestedContext);
}
}
return Mono.error(unwrappedException);
});
return tracerProvider.traceEnabledCosmosResponsePublisher(responseMono,
context,
spanName,
database.getId(),
this.serviceEndpoint);
}

private Mono<CosmosDatabaseResponse> createDatabaseInternal(Database database, CosmosDatabaseRequestOptions options,
Context context) {
String spanName = "createDatabase." + database.getId();
Mono<CosmosDatabaseResponse> responseMono = asyncDocumentClient.createDatabase(database, ModelBridgeInternal.toRequestOptions(options))
.map(databaseResourceResponse -> ModelBridgeInternal.createCosmosDatabaseResponse(databaseResourceResponse))
.single();
return tracerProvider.traceEnabledCosmosResponsePublisher(responseMono,
context,
spanName,
database.getId(),
this.serviceEndpoint);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
// Licensed under the MIT License.
package com.azure.cosmos;

import com.azure.core.util.Context;
import com.azure.cosmos.implementation.Paths;
import com.azure.cosmos.implementation.RequestOptions;
import com.azure.cosmos.models.CosmosConflictResponse;
import com.azure.cosmos.models.CosmosConflictRequestOptions;
import com.azure.cosmos.models.CosmosConflictResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import reactor.core.publisher.Mono;

import static com.azure.core.util.FluxUtil.withContext;

/**
* Read and delete conflicts
*/
Expand Down Expand Up @@ -64,9 +67,7 @@ public Mono<CosmosConflictResponse> read(CosmosConflictRequestOptions options) {
options = new CosmosConflictRequestOptions();
}
RequestOptions requestOptions = ModelBridgeInternal.toRequestOptions(options);
return this.container.getDatabase().getDocClientWrapper().readConflict(getLink(), requestOptions)
.map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single();

return withContext(context -> readInternal(requestOptions, context));
}

/**
Expand All @@ -85,8 +86,7 @@ public Mono<CosmosConflictResponse> delete(CosmosConflictRequestOptions options)
options = new CosmosConflictRequestOptions();
}
RequestOptions requestOptions = ModelBridgeInternal.toRequestOptions(options);
return this.container.getDatabase().getDocClientWrapper().deleteConflict(getLink(), requestOptions)
.map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single();
return withContext(context -> deleteInternal(requestOptions, context));
}

String getURIPathSegment() {
Expand All @@ -106,4 +106,27 @@ String getLink() {
builder.append(getId());
return builder.toString();
}

private Mono<CosmosConflictResponse> readInternal(RequestOptions options, Context context) {
String spanName = "readConflict." + getId();
Mono<CosmosConflictResponse> responseMono =
this.container.getDatabase().getDocClientWrapper().readConflict(getLink(), options)
.map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single();
return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context,
spanName,
this.container.getDatabase().getId(),
this.container.getDatabase().getClient().getServiceEndpoint());

}

private Mono<CosmosConflictResponse> deleteInternal(RequestOptions options, Context context) {
String spanName = "deleteConflict." + getId();
Mono<CosmosConflictResponse> responseMono =
this.container.getDatabase().getDocClientWrapper().deleteConflict(getLink(), options)
.map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single();
return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context,
spanName,
this.container.getDatabase().getId(),
this.container.getDatabase().getClient().getServiceEndpoint());
}
}
Loading

0 comments on commit 5150e06

Please sign in to comment.