Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merging back Tracer PR without conditional check on withContext() #12867

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
089307a
initial commit
simplynaveen20 Apr 8, 2020
175222e
changing sample
simplynaveen20 Apr 8, 2020
fe03e66
adding query api tracer support
simplynaveen20 Apr 13, 2020
9f9cce8
merging with master
simplynaveen20 Apr 14, 2020
e2e8e9e
reverting change for jaeger ui local testing
simplynaveen20 Apr 14, 2020
c2fc40f
adding tracers in trigger udf and sproc
simplynaveen20 Apr 15, 2020
eb82b61
Merge branch 'latest-master' into users/nakumars/tracercosmosdb
simplynaveen20 Apr 21, 2020
e614db5
adding test case and refactoring code
simplynaveen20 Apr 21, 2020
c4ae21b
Merge branch 'latest-master' into users/nakumars/tracercosmosdb
simplynaveen20 Apr 24, 2020
eabbec9
refactoring code with PR ready version
simplynaveen20 Apr 24, 2020
36dde95
ending span on doOnComplete
simplynaveen20 Apr 24, 2020
36743ca
resolving error
simplynaveen20 May 4, 2020
cff33ad
merging with master
simplynaveen20 May 4, 2020
8ca24ec
perf improvement
simplynaveen20 May 5, 2020
21034b5
resolving merge conflict
simplynaveen20 May 6, 2020
1f9f139
fixing build error
simplynaveen20 May 6, 2020
aab85a6
resolving merge conflict
simplynaveen20 May 11, 2020
d14a0bb
resolving merge conflict
simplynaveen20 May 11, 2020
8fbad2c
check non opentelementry jar non existence in starting of all api
simplynaveen20 May 11, 2020
6b09e5c
fixing checkstyle
simplynaveen20 May 11, 2020
81040b2
resolving comments
simplynaveen20 May 11, 2020
dbc74c8
Merge branch 'latest-master' into users/nakumars/tracercosmosdb
simplynaveen20 May 11, 2020
806cdf1
check style fix as per java 8
simplynaveen20 May 11, 2020
d061db5
adding azure-core-tracing-opentelemetry in test scope
simplynaveen20 May 11, 2020
32d9172
resolving conflict
simplynaveen20 May 11, 2020
0fc0cc9
build error fix
simplynaveen20 May 11, 2020
1290ae1
test failure fix
simplynaveen20 May 12, 2020
d2a8154
test fix
simplynaveen20 May 12, 2020
30c2f89
resolving comments
simplynaveen20 May 13, 2020
e7fa831
complie error fix
simplynaveen20 May 13, 2020
3aa851d
removing query text from tracer
simplynaveen20 May 15, 2020
0debac0
resolving merge conflict
simplynaveen20 May 15, 2020
8efa451
merge with master and removing subscriber context for perf gain
simplynaveen20 May 26, 2020
e7fe8bb
resolving merge conflict
simplynaveen20 May 26, 2020
3461604
resolving merge conflict after GA
simplynaveen20 Jun 11, 2020
0984a9c
code formating change and some comment resolution
simplynaveen20 Jun 11, 2020
b9c60a2
resolving comments
simplynaveen20 Jun 11, 2020
aaabc72
moving AZ_TRACING_NAMESPACE_KEY to context from span attribute
simplynaveen20 Jun 15, 2020
8024999
updating core opentelementry jar
simplynaveen20 Jun 15, 2020
2101851
Merge branch 'latest-master' into users/nakumars/tracercosmosdb
simplynaveen20 Jul 6, 2020
1f4c6ac
Tracer code without conditional check for withContext
simplynaveen20 Jul 7, 2020
1b0becb
Merge branch 'latest-master' into users/nakumars/tracercosmosdb
simplynaveen20 Jul 7, 2020
3548215
formating chage
simplynaveen20 Jul 7, 2020
eb398d9
formating chage
simplynaveen20 Jul 7, 2020
b168fd2
Merge branch 'latest-master' into users/nakumars/tracercosmosdb
simplynaveen20 Jul 9, 2020
9c1d31f
resolving comments
simplynaveen20 Jul 9, 2020
0703a28
build error fix
simplynaveen20 Jul 9, 2020
c5283fe
Merge branch 'latest-master' into users/nakumars/tracercosmosdb
simplynaveen20 Jul 9, 2020
deb6978
resolving comment
simplynaveen20 Jul 10, 2020
8af1cc9
resolving comments
simplynaveen20 Jul 10, 2020
798839c
resolving comment
simplynaveen20 Jul 10, 2020
21c5743
resolving comment
simplynaveen20 Jul 10, 2020
42e9789
removing extra method
simplynaveen20 Jul 10, 2020
be8d7eb
removing tets dependency
simplynaveen20 Jul 10, 2020
1f9725c
adding span attribute unit test
simplynaveen20 Jul 13, 2020
fc1296f
resolving comments
simplynaveen20 Jul 13, 2020
0a7aab0
resolving comments
simplynaveen20 Jul 14, 2020
8292a83
resolving merge conflic and comments
simplynaveen20 Jul 17, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion sdk/cosmos/azure-cosmos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ Licensed under the MIT License.
</exclusion>
</exclusions>
</dependency>

<dependency>
simplynaveen20 marked this conversation as resolved.
Show resolved Hide resolved
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-afterburner</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

import com.azure.core.annotation.ServiceClient;
import com.azure.core.credential.AzureKeyCredential;
import com.azure.core.util.Context;
import com.azure.core.util.tracing.Tracer;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.ConnectionPolicy;
import com.azure.cosmos.implementation.CosmosAuthorizationTokenResolver;
import com.azure.cosmos.implementation.Database;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.TracerProvider;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdMetrics;
import com.azure.cosmos.implementation.encryption.api.DataEncryptionKeyProvider;
import com.azure.cosmos.models.CosmosDatabaseResponse;
Expand All @@ -27,8 +30,11 @@
import reactor.core.publisher.Mono;

import java.io.Closeable;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceLoader;

import static com.azure.core.util.FluxUtil.withContext;
import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount;

/**
Expand All @@ -52,8 +58,20 @@ public final class CosmosAsyncClient implements Closeable {
private final AzureKeyCredential credential;
private final boolean sessionCapturingOverride;
private final boolean enableTransportClientSharing;
private final TracerProvider tracerProvider;
private final DataEncryptionKeyProvider dataEncryptionKeyProvider;
private final boolean contentResponseOnWriteEnabled;
private static final Tracer TRACER;

static {
ServiceLoader<Tracer> serviceLoader = ServiceLoader.load(Tracer.class);
Iterator<?> iterator = serviceLoader.iterator();
if (iterator.hasNext()) {
TRACER = serviceLoader.iterator().next();
simplynaveen20 marked this conversation as resolved.
Show resolved Hide resolved
} else {
TRACER = null;
}
}

CosmosAsyncClient(CosmosClientBuilder builder) {
this.configs = builder.configs();
Expand All @@ -68,6 +86,7 @@ public final class CosmosAsyncClient implements Closeable {
this.dataEncryptionKeyProvider = builder.getDataEncryptionKeyProvider();
this.enableTransportClientSharing = builder.isConnectionSharingAcrossClientsEnabled();
this.contentResponseOnWriteEnabled = builder.isContentResponseOnWriteEnabled();
this.tracerProvider = new TracerProvider(TRACER);
this.asyncDocumentClient = new AsyncDocumentClient.Builder()
.withServiceEndpoint(this.serviceEndpoint)
.withMasterKeyOrResourceToken(this.keyOrResourceToken)
Expand Down Expand Up @@ -198,8 +217,9 @@ boolean isContentResponseOnWriteEnabled() {
* @return a {@link Mono} containing the cosmos database response with the created or existing database or
* an error.
*/
Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(CosmosDatabaseProperties databaseProperties) {
return createDatabaseIfNotExistsInternal(getDatabase(databaseProperties.getId()));
public Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(CosmosDatabaseProperties databaseProperties) {
return withContext(context -> createDatabaseIfNotExistsInternal(getDatabase(databaseProperties.getId()),
null, context));
}

/**
Expand All @@ -213,21 +233,7 @@ Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(CosmosDatabaseProperties
* an error.
*/
public Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(String id) {
return createDatabaseIfNotExistsInternal(getDatabase(id));
}

private Mono<CosmosDatabaseResponse> createDatabaseIfNotExistsInternal(CosmosAsyncDatabase database) {
return database.read().onErrorResume(exception -> {
final Throwable unwrappedException = Exceptions.unwrap(exception);
if (unwrappedException instanceof CosmosException) {
final CosmosException cosmosException = (CosmosException) unwrappedException;
if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
return createDatabase(new CosmosDatabaseProperties(database.getId()),
new CosmosDatabaseRequestOptions());
}
}
return Mono.error(unwrappedException);
});
return withContext(context -> createDatabaseIfNotExistsInternal(getDatabase(id), null, context));
samvaity marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand All @@ -244,19 +250,8 @@ private Mono<CosmosDatabaseResponse> createDatabaseIfNotExistsInternal(CosmosAsy
* @return the mono.
*/
public Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(String id, ThroughputProperties throughputProperties) {
samvaity marked this conversation as resolved.
Show resolved Hide resolved
return this.getDatabase(id).read().onErrorResume(exception -> {
final Throwable unwrappedException = Exceptions.unwrap(exception);
if (unwrappedException instanceof CosmosException) {
final CosmosException cosmosException = (CosmosException) unwrappedException;
if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
CosmosDatabaseRequestOptions options = new CosmosDatabaseRequestOptions();
ModelBridgeInternal.setThroughputProperties(options, throughputProperties);
return createDatabase(new CosmosDatabaseProperties(id),
options);
}
}
return Mono.error(unwrappedException);
});
return withContext(context -> createDatabaseIfNotExistsInternal(getDatabase(id),
throughputProperties, context));
}

/**
Expand All @@ -273,14 +268,10 @@ public Mono<CosmosDatabaseResponse> createDatabaseIfNotExists(String id, Through
*/
public Mono<CosmosDatabaseResponse> createDatabase(CosmosDatabaseProperties databaseProperties,
CosmosDatabaseRequestOptions options) {
if (options == null) {
options = new CosmosDatabaseRequestOptions();
}
final CosmosDatabaseRequestOptions requestOptions = options == null ? new CosmosDatabaseRequestOptions() : options;
Database wrappedDatabase = new Database();
wrappedDatabase.setId(databaseProperties.getId());
return asyncDocumentClient.createDatabase(wrappedDatabase, ModelBridgeInternal.toRequestOptions(options))
.map(databaseResourceResponse -> ModelBridgeInternal.createCosmosDatabaseResponse(databaseResourceResponse))
.single();
return withContext(context -> createDatabaseInternal(wrappedDatabase, requestOptions, context));
}

/**
Expand Down Expand Up @@ -332,12 +323,12 @@ public Mono<CosmosDatabaseResponse> createDatabase(CosmosDatabaseProperties data
if (options == null) {
options = new CosmosDatabaseRequestOptions();
}

ModelBridgeInternal.setThroughputProperties(options, throughputProperties);
Database wrappedDatabase = new Database();
wrappedDatabase.setId(databaseProperties.getId());
return asyncDocumentClient.createDatabase(wrappedDatabase, ModelBridgeInternal.toRequestOptions(options))
.map(databaseResourceResponse -> ModelBridgeInternal.createCosmosDatabaseResponse(databaseResourceResponse))
.single();
final CosmosDatabaseRequestOptions requestOptions = options;
return withContext(context -> createDatabaseInternal(wrappedDatabase, requestOptions, context));
samvaity marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand All @@ -358,24 +349,6 @@ public Mono<CosmosDatabaseResponse> createDatabase(CosmosDatabaseProperties data
return createDatabase(databaseProperties, options);
}

/**
* Creates a database.
* <p>
* After subscription the operation will be performed.
* The {@link Mono} upon successful completion will contain a single resource response with the
* created database.
* In case of failure the {@link Mono} will error.
*
* @param id id of the database.
* @param throughput the throughput for the database.
* @return a {@link Mono} containing the single cosmos database response with the created database or an error.
*/
Mono<CosmosDatabaseResponse> createDatabase(String id, int throughput) {
CosmosDatabaseRequestOptions options = new CosmosDatabaseRequestOptions();
ModelBridgeInternal.setThroughputProperties(options, ThroughputProperties.createManualThroughput(throughput));
return createDatabase(new CosmosDatabaseProperties(id), options);
}

/**
* Creates a database.
*
Expand All @@ -401,12 +374,13 @@ public Mono<CosmosDatabaseResponse> createDatabase(String id, ThroughputProperti
*/
CosmosPagedFlux<CosmosDatabaseProperties> readAllDatabases(CosmosQueryRequestOptions options) {
simplynaveen20 marked this conversation as resolved.
Show resolved Hide resolved
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
pagedFluxOptions.setTracerInformation(this.tracerProvider, "readAllDatabases", this.serviceEndpoint, null);
setContinuationTokenAndMaxItemCount(pagedFluxOptions, options);
return getDocClientWrapper().readDatabases(options)
.map(response ->
BridgeInternal.createFeedResponse(
ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()),
response.getResponseHeaders()));
.map(response ->
BridgeInternal.createFeedResponse(
ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()),
response.getResponseHeaders()));
});
}

Expand Down Expand Up @@ -436,7 +410,7 @@ public CosmosPagedFlux<CosmosDatabaseProperties> readAllDatabases() {
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of read databases or an error.
*/
public CosmosPagedFlux<CosmosDatabaseProperties> queryDatabases(String query, CosmosQueryRequestOptions options) {
samvaity marked this conversation as resolved.
Show resolved Hide resolved
return queryDatabases(new SqlQuerySpec(query), options);
return queryDatabasesInternal(new SqlQuerySpec(query), options);
}

/**
Expand All @@ -451,13 +425,7 @@ public CosmosPagedFlux<CosmosDatabaseProperties> queryDatabases(String query, Co
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of read databases or an error.
*/
public CosmosPagedFlux<CosmosDatabaseProperties> queryDatabases(SqlQuerySpec querySpec, CosmosQueryRequestOptions options) {
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
setContinuationTokenAndMaxItemCount(pagedFluxOptions, options);
return getDocClientWrapper().queryDatabases(querySpec, options)
.map(response -> BridgeInternal.createFeedResponse(
ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()),
response.getResponseHeaders()));
});
return queryDatabasesInternal(querySpec, options);
}

/**
Expand All @@ -477,4 +445,63 @@ public CosmosAsyncDatabase getDatabase(String id) {
public void close() {
asyncDocumentClient.close();
}

TracerProvider getTracerProvider(){
return this.tracerProvider;
}

private CosmosPagedFlux<CosmosDatabaseProperties> queryDatabasesInternal(SqlQuerySpec querySpec, CosmosQueryRequestOptions options){
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
pagedFluxOptions.setTracerInformation(this.tracerProvider, "queryDatabases", this.serviceEndpoint, null);
simplynaveen20 marked this conversation as resolved.
Show resolved Hide resolved
setContinuationTokenAndMaxItemCount(pagedFluxOptions, options);
return getDocClientWrapper().queryDatabases(querySpec, options)
.map(response -> BridgeInternal.createFeedResponse(
ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()),
response.getResponseHeaders()));
});
}


private Mono<CosmosDatabaseResponse> createDatabaseIfNotExistsInternal(CosmosAsyncDatabase database,
ThroughputProperties throughputProperties, Context context) {
String spanName = "createDatabaseIfNotExists." + database.getId();
Context nestedContext = context.addData(TracerProvider.COSMOS_CALL_DEPTH, TracerProvider.COSMOS_CALL_DEPTH_VAL);
simplynaveen20 marked this conversation as resolved.
Show resolved Hide resolved
Mono<CosmosDatabaseResponse> responseMono = database.readInternal(new CosmosDatabaseRequestOptions(),
nestedContext).onErrorResume(exception -> {
final Throwable unwrappedException = Exceptions.unwrap(exception);
if (unwrappedException instanceof CosmosException) {
final CosmosException cosmosException = (CosmosException) unwrappedException;
if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
CosmosDatabaseRequestOptions requestOptions = new CosmosDatabaseRequestOptions();
if (throughputProperties != null) {
ModelBridgeInternal.setThroughputProperties(requestOptions, throughputProperties);
}

Database wrappedDatabase = new Database();
wrappedDatabase.setId(database.getId());
return createDatabaseInternal(wrappedDatabase,
requestOptions, nestedContext);
}
}
return Mono.error(unwrappedException);
});
return tracerProvider.traceEnabledCosmosResponsePublisher(responseMono,
context,
simplynaveen20 marked this conversation as resolved.
Show resolved Hide resolved
spanName,
database.getId(),
this.serviceEndpoint);
}

private Mono<CosmosDatabaseResponse> createDatabaseInternal(Database database, CosmosDatabaseRequestOptions options,
Context context) {
String spanName = "createDatabase." + database.getId();
samvaity marked this conversation as resolved.
Show resolved Hide resolved
Mono<CosmosDatabaseResponse> responseMono = asyncDocumentClient.createDatabase(database, ModelBridgeInternal.toRequestOptions(options))
.map(databaseResourceResponse -> ModelBridgeInternal.createCosmosDatabaseResponse(databaseResourceResponse))
.single();
return tracerProvider.traceEnabledCosmosResponsePublisher(responseMono,
context,
spanName,
database.getId(),
this.serviceEndpoint);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
// Licensed under the MIT License.
package com.azure.cosmos;

import com.azure.core.util.Context;
import com.azure.cosmos.implementation.Paths;
import com.azure.cosmos.implementation.RequestOptions;
import com.azure.cosmos.models.CosmosConflictResponse;
import com.azure.cosmos.models.CosmosConflictRequestOptions;
import com.azure.cosmos.models.CosmosConflictResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import reactor.core.publisher.Mono;

import static com.azure.core.util.FluxUtil.withContext;

/**
* Read and delete conflicts
*/
Expand Down Expand Up @@ -64,9 +67,7 @@ public Mono<CosmosConflictResponse> read(CosmosConflictRequestOptions options) {
options = new CosmosConflictRequestOptions();
}
RequestOptions requestOptions = ModelBridgeInternal.toRequestOptions(options);
return this.container.getDatabase().getDocClientWrapper().readConflict(getLink(), requestOptions)
.map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single();

return withContext(context -> readInternal(requestOptions, context));
}

/**
Expand All @@ -85,8 +86,7 @@ public Mono<CosmosConflictResponse> delete(CosmosConflictRequestOptions options)
options = new CosmosConflictRequestOptions();
}
RequestOptions requestOptions = ModelBridgeInternal.toRequestOptions(options);
return this.container.getDatabase().getDocClientWrapper().deleteConflict(getLink(), requestOptions)
.map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single();
return withContext(context -> deleteInternal(requestOptions, context));
}

String getURIPathSegment() {
Expand All @@ -106,4 +106,27 @@ String getLink() {
builder.append(getId());
return builder.toString();
}

private Mono<CosmosConflictResponse> readInternal(RequestOptions options, Context context) {
simplynaveen20 marked this conversation as resolved.
Show resolved Hide resolved
String spanName = "readConflict." + getId();
Mono<CosmosConflictResponse> responseMono =
this.container.getDatabase().getDocClientWrapper().readConflict(getLink(), options)
.map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single();
return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context,
spanName,
this.container.getDatabase().getId(),
samvaity marked this conversation as resolved.
Show resolved Hide resolved
this.container.getDatabase().getClient().getServiceEndpoint());

}

private Mono<CosmosConflictResponse> deleteInternal(RequestOptions options, Context context) {
String spanName = "deleteConflict." + getId();
Mono<CosmosConflictResponse> responseMono =
this.container.getDatabase().getDocClientWrapper().deleteConflict(getLink(), options)
.map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single();
return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context,
spanName,
this.container.getDatabase().getId(),
this.container.getDatabase().getClient().getServiceEndpoint());
}
}
Loading