Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tracer support in cosmosdb #10265

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
089307a
initial commit
simplynaveen20 Apr 8, 2020
175222e
changing sample
simplynaveen20 Apr 8, 2020
fe03e66
adding query api tracer support
simplynaveen20 Apr 13, 2020
9f9cce8
merging with master
simplynaveen20 Apr 14, 2020
e2e8e9e
reverting change for jaeger ui local testing
simplynaveen20 Apr 14, 2020
c2fc40f
adding tracers in trigger udf and sproc
simplynaveen20 Apr 15, 2020
eb82b61
Merge branch 'latest-master' into users/nakumars/tracercosmosdb
simplynaveen20 Apr 21, 2020
e614db5
adding test case and refactoring code
simplynaveen20 Apr 21, 2020
c4ae21b
Merge branch 'latest-master' into users/nakumars/tracercosmosdb
simplynaveen20 Apr 24, 2020
eabbec9
refactoring code with PR ready version
simplynaveen20 Apr 24, 2020
36dde95
ending span on doOnComplete
simplynaveen20 Apr 24, 2020
36743ca
resolving error
simplynaveen20 May 4, 2020
cff33ad
merging with master
simplynaveen20 May 4, 2020
8ca24ec
perf improvement
simplynaveen20 May 5, 2020
21034b5
resolving merge conflict
simplynaveen20 May 6, 2020
1f9f139
fixing build error
simplynaveen20 May 6, 2020
aab85a6
resolving merge conflict
simplynaveen20 May 11, 2020
d14a0bb
resolving merge conflict
simplynaveen20 May 11, 2020
8fbad2c
check non opentelementry jar non existence in starting of all api
simplynaveen20 May 11, 2020
6b09e5c
fixing checkstyle
simplynaveen20 May 11, 2020
81040b2
resolving comments
simplynaveen20 May 11, 2020
dbc74c8
Merge branch 'latest-master' into users/nakumars/tracercosmosdb
simplynaveen20 May 11, 2020
806cdf1
check style fix as per java 8
simplynaveen20 May 11, 2020
d061db5
adding azure-core-tracing-opentelemetry in test scope
simplynaveen20 May 11, 2020
32d9172
resolving conflict
simplynaveen20 May 11, 2020
0fc0cc9
build error fix
simplynaveen20 May 11, 2020
1290ae1
test failure fix
simplynaveen20 May 12, 2020
d2a8154
test fix
simplynaveen20 May 12, 2020
30c2f89
resolving comments
simplynaveen20 May 13, 2020
e7fa831
complie error fix
simplynaveen20 May 13, 2020
3aa851d
removing query text from tracer
simplynaveen20 May 15, 2020
0debac0
resolving merge conflict
simplynaveen20 May 15, 2020
8efa451
merge with master and removing subscriber context for perf gain
simplynaveen20 May 26, 2020
e7fe8bb
resolving merge conflict
simplynaveen20 May 26, 2020
3461604
resolving merge conflict after GA
simplynaveen20 Jun 11, 2020
0984a9c
code formating change and some comment resolution
simplynaveen20 Jun 11, 2020
b9c60a2
resolving comments
simplynaveen20 Jun 11, 2020
aaabc72
moving AZ_TRACING_NAMESPACE_KEY to context from span attribute
simplynaveen20 Jun 15, 2020
8024999
updating core opentelementry jar
simplynaveen20 Jun 15, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion sdk/cosmos/azure-cosmos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,18 @@ Licensed under the MIT License.
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-tracing-opentelemetry</artifactId>
simplynaveen20 marked this conversation as resolved.
Show resolved Hide resolved
<scope>test</scope>
<version>1.0.0-beta.5</version> <!-- {x-version-update;com.azure:azure-core-tracing-opentelemetry;dependency} -->
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-afterburner</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@

import com.azure.core.annotation.ServiceClient;
import com.azure.core.credential.AzureKeyCredential;
import com.azure.core.util.Context;
import com.azure.core.util.tracing.Tracer;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.ConnectionPolicy;
import com.azure.cosmos.implementation.CosmosAuthorizationTokenResolver;
import com.azure.cosmos.implementation.Database;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.TracerProvider;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdMetrics;
import com.azure.cosmos.models.CosmosDatabaseResponse;
import com.azure.cosmos.models.CosmosDatabaseProperties;
import com.azure.cosmos.models.CosmosDatabaseRequestOptions;
import com.azure.cosmos.models.CosmosDatabaseResponse;
import com.azure.cosmos.models.CosmosPermissionProperties;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.ModelBridgeInternal;
Expand All @@ -27,7 +30,9 @@

import java.io.Closeable;
import java.util.List;
import java.util.ServiceLoader;

import static com.azure.core.util.FluxUtil.withContext;
import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount;

/**
Expand All @@ -51,6 +56,7 @@ public final class CosmosAsyncClient implements Closeable {
private final AzureKeyCredential credential;
private final boolean sessionCapturingOverride;
private final boolean enableTransportClientSharing;
private final TracerProvider tracerProvider;
private final boolean contentResponseOnWriteEnabled;

CosmosAsyncClient(CosmosClientBuilder builder) {
Expand All @@ -65,6 +71,7 @@ public final class CosmosAsyncClient implements Closeable {
this.sessionCapturingOverride = builder.isSessionCapturingOverrideEnabled();
this.enableTransportClientSharing = builder.isConnectionSharingAcrossClientsEnabled();
this.contentResponseOnWriteEnabled = builder.isContentResponseOnWriteEnabled();
this.tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class));
this.asyncDocumentClient = new AsyncDocumentClient.Builder()
.withServiceEndpoint(this.serviceEndpoint)
.withMasterKeyOrResourceToken(this.keyOrResourceToken)
Expand Down Expand Up @@ -194,8 +201,14 @@ boolean isContentResponseOnWriteEnabled() {
* @return a {@link Mono} containing the cosmos database response with the created or existing database or
* an error.
*/
Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(CosmosDatabaseProperties databaseProperties) {
return createDatabaseIfNotExistsInternal(getDatabase(databaseProperties.getId()));
public Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(CosmosDatabaseProperties databaseProperties) {
if(!getTracerProvider().isEnabled()) {
CosmosAsyncDatabase database = getDatabase(databaseProperties.getId());
return createDatabaseIfNotExistsInternal(database.read(), database, null, null);
}

return withContext(context -> createDatabaseIfNotExistsInternal(getDatabase(databaseProperties.getId()),
null, context));
}

/**
Expand All @@ -209,21 +222,12 @@ Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(CosmosDatabaseProperties
* an error.
*/
public Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(String id) {
return createDatabaseIfNotExistsInternal(getDatabase(id));
}
if(!getTracerProvider().isEnabled()) {
CosmosAsyncDatabase database = getDatabase(id);
return createDatabaseIfNotExistsInternal(database.read(), database, null,null);
}

private Mono<CosmosDatabaseResponse> createDatabaseIfNotExistsInternal(CosmosAsyncDatabase database) {
return database.read().onErrorResume(exception -> {
final Throwable unwrappedException = Exceptions.unwrap(exception);
if (unwrappedException instanceof CosmosException) {
final CosmosException cosmosException = (CosmosException) unwrappedException;
if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
return createDatabase(new CosmosDatabaseProperties(database.getId()),
new CosmosDatabaseRequestOptions());
}
}
return Mono.error(unwrappedException);
});
return withContext(context -> createDatabaseIfNotExistsInternal(getDatabase(id), null, context));
}

/**
Expand All @@ -240,19 +244,13 @@ private Mono<CosmosDatabaseResponse> createDatabaseIfNotExistsInternal(CosmosAsy
* @return the mono.
*/
public Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(String id, ThroughputProperties throughputProperties) {
return this.getDatabase(id).read().onErrorResume(exception -> {
final Throwable unwrappedException = Exceptions.unwrap(exception);
if (unwrappedException instanceof CosmosException) {
final CosmosException cosmosException = (CosmosException) unwrappedException;
if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
CosmosDatabaseRequestOptions options = new CosmosDatabaseRequestOptions();
ModelBridgeInternal.setThroughputProperties(options, throughputProperties);
return createDatabase(new CosmosDatabaseProperties(id),
options);
}
}
return Mono.error(unwrappedException);
});
if(!getTracerProvider().isEnabled()) {
CosmosAsyncDatabase database = getDatabase(id);
return createDatabaseIfNotExistsInternal(database.read(), database, throughputProperties, null);
}

return withContext(context -> createDatabaseIfNotExistsInternal(getDatabase(id),
throughputProperties, context));
}

/**
Expand All @@ -274,9 +272,12 @@ public Mono<CosmosDatabaseResponse> createDatabase(CosmosDatabaseProperties data
}
Database wrappedDatabase = new Database();
wrappedDatabase.setId(databaseProperties.getId());
return asyncDocumentClient.createDatabase(wrappedDatabase, ModelBridgeInternal.toRequestOptions(options))
.map(databaseResourceResponse -> ModelBridgeInternal.createCosmosDatabaseResponse(databaseResourceResponse))
.single();
if(!getTracerProvider().isEnabled()) {
return createDatabaseInternal(wrappedDatabase, options);
}

final CosmosDatabaseRequestOptions requestOptions = options;
return withContext(context -> createDatabaseInternal(wrappedDatabase, requestOptions, context));
}

/**
Expand Down Expand Up @@ -331,9 +332,13 @@ public Mono<CosmosDatabaseResponse> createDatabase(CosmosDatabaseProperties data
ModelBridgeInternal.setThroughputProperties(options, throughputProperties);
Database wrappedDatabase = new Database();
wrappedDatabase.setId(databaseProperties.getId());
return asyncDocumentClient.createDatabase(wrappedDatabase, ModelBridgeInternal.toRequestOptions(options))
.map(databaseResourceResponse -> ModelBridgeInternal.createCosmosDatabaseResponse(databaseResourceResponse))
.single();
if (!getTracerProvider().isEnabled()) {
return createDatabaseInternal(wrappedDatabase, options);
}


final CosmosDatabaseRequestOptions requestOptions = options;
return withContext(context -> createDatabaseInternal(wrappedDatabase, requestOptions, context));
}

/**
Expand Down Expand Up @@ -397,13 +402,15 @@ public Mono<CosmosDatabaseResponse> createDatabase(String id, ThroughputProperti
*/
CosmosPagedFlux<CosmosDatabaseProperties> readAllDatabases(CosmosQueryRequestOptions options) {
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
String spanName = "readAllDatabases";
pagedFluxOptions.setTracerInformation(this.tracerProvider, spanName, this.serviceEndpoint, null);
setContinuationTokenAndMaxItemCount(pagedFluxOptions, options);
return getDocClientWrapper().readDatabases(options)
.map(response ->
BridgeInternal.createFeedResponse(
ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()),
response.getResponseHeaders()));
});
}, this.tracerProvider.isEnabled());
}

/**
Expand Down Expand Up @@ -432,7 +439,7 @@ public CosmosPagedFlux<CosmosDatabaseProperties> readAllDatabases() {
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of read databases or an error.
*/
public CosmosPagedFlux<CosmosDatabaseProperties> queryDatabases(String query, CosmosQueryRequestOptions options) {
return queryDatabases(new SqlQuerySpec(query), options);
return queryDatabasesInternal(new SqlQuerySpec(query), options);
}

/**
Expand All @@ -447,13 +454,7 @@ public CosmosPagedFlux<CosmosDatabaseProperties> queryDatabases(String query, Co
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of read databases or an error.
*/
public CosmosPagedFlux<CosmosDatabaseProperties> queryDatabases(SqlQuerySpec querySpec, CosmosQueryRequestOptions options) {
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
setContinuationTokenAndMaxItemCount(pagedFluxOptions, options);
return getDocClientWrapper().queryDatabases(querySpec, options)
.map(response -> BridgeInternal.createFeedResponse(
ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()),
response.getResponseHeaders()));
});
return queryDatabasesInternal(querySpec, options);
}

/**
Expand All @@ -473,4 +474,77 @@ public CosmosAsyncDatabase getDatabase(String id) {
public void close() {
asyncDocumentClient.close();
}

TracerProvider getTracerProvider(){
return this.tracerProvider;
}

private CosmosPagedFlux<CosmosDatabaseProperties> queryDatabasesInternal(SqlQuerySpec querySpec, CosmosQueryRequestOptions options){
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
String spanName = "queryDatabases";
pagedFluxOptions.setTracerInformation(this.tracerProvider, spanName, this.serviceEndpoint, null);
setContinuationTokenAndMaxItemCount(pagedFluxOptions, options);
return getDocClientWrapper().queryDatabases(querySpec, options)
.map(response -> BridgeInternal.createFeedResponse(
ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()),
response.getResponseHeaders()));
}, this.tracerProvider.isEnabled());
}


private Mono<CosmosDatabaseResponse> createDatabaseIfNotExistsInternal(CosmosAsyncDatabase database,
ThroughputProperties throughputProperties, Context context) {
String spanName = "createDatabaseIfNotExists." + database.getId();
Context nestedContext = context.addData(TracerProvider.COSMOS_CALL_DEPTH, TracerProvider.COSMOS_CALL_DEPTH_VAL);
Mono<CosmosDatabaseResponse> responseMono = createDatabaseIfNotExistsInternal(database.readInternal(new CosmosDatabaseRequestOptions(), nestedContext), database, throughputProperties, nestedContext);
return tracerProvider.traceEnabledCosmosResponsePublisher(responseMono,
context,
spanName,
database.getId(),
this.serviceEndpoint);
}

private Mono<CosmosDatabaseResponse> createDatabaseIfNotExistsInternal(Mono<CosmosDatabaseResponse> responseMono, CosmosAsyncDatabase database, ThroughputProperties throughputProperties, Context context) {
return responseMono.onErrorResume(exception -> {
final Throwable unwrappedException = Exceptions.unwrap(exception);
if (unwrappedException instanceof CosmosException) {
final CosmosException cosmosException = (CosmosException) unwrappedException;
if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
CosmosDatabaseRequestOptions requestOptions = new CosmosDatabaseRequestOptions();
if(throughputProperties != null) {
ModelBridgeInternal.setThroughputProperties(requestOptions, throughputProperties);
}

if (context != null) {
Database wrappedDatabase = new Database();
wrappedDatabase.setId(database.getId());
return createDatabaseInternal(wrappedDatabase,
requestOptions, context);
}

return createDatabase(new CosmosDatabaseProperties(database.getId()),
requestOptions);
}
}
return Mono.error(unwrappedException);
});
}


private Mono<CosmosDatabaseResponse> createDatabaseInternal(Database database, CosmosDatabaseRequestOptions options,
Context context) {
String spanName = "createDatabase." + database.getId();
Mono<CosmosDatabaseResponse> responseMono = createDatabaseInternal(database, options);
return tracerProvider.traceEnabledCosmosResponsePublisher(responseMono,
context,
spanName,
database.getId(),
this.serviceEndpoint);
}

private Mono<CosmosDatabaseResponse> createDatabaseInternal(Database database, CosmosDatabaseRequestOptions options) {
return asyncDocumentClient.createDatabase(database, ModelBridgeInternal.toRequestOptions(options))
.map(databaseResourceResponse -> ModelBridgeInternal.createCosmosDatabaseResponse(databaseResourceResponse))
.single();
}
simplynaveen20 marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
// Licensed under the MIT License.
package com.azure.cosmos;

import com.azure.core.util.Context;
import com.azure.cosmos.implementation.Paths;
import com.azure.cosmos.implementation.RequestOptions;
import com.azure.cosmos.models.CosmosConflictResponse;
import com.azure.cosmos.models.CosmosConflictRequestOptions;
import com.azure.cosmos.models.ModelBridgeInternal;
import reactor.core.publisher.Mono;

import static com.azure.core.util.FluxUtil.withContext;

/**
* Read and delete conflicts
*/
Expand Down Expand Up @@ -64,9 +67,11 @@ public Mono<CosmosConflictResponse> read(CosmosConflictRequestOptions options) {
options = new CosmosConflictRequestOptions();
}
RequestOptions requestOptions = ModelBridgeInternal.toRequestOptions(options);
return this.container.getDatabase().getDocClientWrapper().readConflict(getLink(), requestOptions)
.map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single();
if (!this.container.getDatabase().getClient().getTracerProvider().isEnabled()) {
return readInternal(requestOptions);
}

return withContext(context -> readInternal(requestOptions, context));
}

/**
Expand All @@ -85,8 +90,11 @@ public Mono<CosmosConflictResponse> delete(CosmosConflictRequestOptions options)
options = new CosmosConflictRequestOptions();
}
RequestOptions requestOptions = ModelBridgeInternal.toRequestOptions(options);
return this.container.getDatabase().getDocClientWrapper().deleteConflict(getLink(), requestOptions)
.map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single();
if (!this.container.getDatabase().getClient().getTracerProvider().isEnabled()) {
return deleteInternal(requestOptions);
}

return withContext(context -> deleteInternal(requestOptions, context));
}

String getURIPathSegment() {
Expand All @@ -106,4 +114,33 @@ String getLink() {
builder.append(getId());
return builder.toString();
}

private Mono<CosmosConflictResponse> readInternal(RequestOptions options, Context context) {
String spanName = "readConflict." + getId();
Mono<CosmosConflictResponse> responseMono = this.readInternal(options);
return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context,
spanName,
this.container.getDatabase().getId(),
this.container.getDatabase().getClient().getServiceEndpoint());

}

private Mono<CosmosConflictResponse> readInternal(RequestOptions options) {
return this.container.getDatabase().getDocClientWrapper().readConflict(getLink(), options)
.map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single();
}

private Mono<CosmosConflictResponse> deleteInternal(RequestOptions options, Context context) {
String spanName = "deleteConflict." + getId();
Mono<CosmosConflictResponse> responseMono = deleteInternal(options);
return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context,
spanName,
this.container.getDatabase().getId(),
this.container.getDatabase().getClient().getServiceEndpoint());
}

private Mono<CosmosConflictResponse> deleteInternal(RequestOptions options) {
return this.container.getDatabase().getDocClientWrapper().deleteConflict(getLink(), options)
.map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single();
}
}
Loading