diff --git a/sdk/cosmos/azure-cosmos/pom.xml b/sdk/cosmos/azure-cosmos/pom.xml index 3fa73359a252..b08acd5a470f 100644 --- a/sdk/cosmos/azure-cosmos/pom.xml +++ b/sdk/cosmos/azure-cosmos/pom.xml @@ -128,7 +128,6 @@ Licensed under the MIT License. - com.fasterxml.jackson.module jackson-module-afterburner diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncClient.java index 3552c39a283a..c326b1095806 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncClient.java @@ -4,12 +4,15 @@ import com.azure.core.annotation.ServiceClient; import com.azure.core.credential.AzureKeyCredential; +import com.azure.core.util.Context; +import com.azure.core.util.tracing.Tracer; import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.ConnectionPolicy; import com.azure.cosmos.implementation.CosmosAuthorizationTokenResolver; import com.azure.cosmos.implementation.Database; import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.TracerProvider; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdMetrics; import com.azure.cosmos.implementation.encryption.api.DataEncryptionKeyProvider; import com.azure.cosmos.models.CosmosDatabaseResponse; @@ -27,8 +30,11 @@ import reactor.core.publisher.Mono; import java.io.Closeable; +import java.util.Iterator; import java.util.List; +import java.util.ServiceLoader; +import static com.azure.core.util.FluxUtil.withContext; import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount; /** @@ -52,8 +58,20 @@ public final class CosmosAsyncClient implements Closeable { private final AzureKeyCredential credential; private final boolean sessionCapturingOverride; private final boolean enableTransportClientSharing; + private final TracerProvider tracerProvider; private final DataEncryptionKeyProvider dataEncryptionKeyProvider; private final boolean contentResponseOnWriteEnabled; + private static final Tracer TRACER; + + static { + ServiceLoader serviceLoader = ServiceLoader.load(Tracer.class); + Iterator iterator = serviceLoader.iterator(); + if (iterator.hasNext()) { + TRACER = serviceLoader.iterator().next(); + } else { + TRACER = null; + } + } CosmosAsyncClient(CosmosClientBuilder builder) { this.configs = builder.configs(); @@ -68,6 +86,7 @@ public final class CosmosAsyncClient implements Closeable { this.dataEncryptionKeyProvider = builder.getDataEncryptionKeyProvider(); this.enableTransportClientSharing = builder.isConnectionSharingAcrossClientsEnabled(); this.contentResponseOnWriteEnabled = builder.isContentResponseOnWriteEnabled(); + this.tracerProvider = new TracerProvider(TRACER); this.asyncDocumentClient = new AsyncDocumentClient.Builder() .withServiceEndpoint(this.serviceEndpoint) .withMasterKeyOrResourceToken(this.keyOrResourceToken) @@ -198,8 +217,9 @@ boolean isContentResponseOnWriteEnabled() { * @return a {@link Mono} containing the cosmos database response with the created or existing database or * an error. */ - Mono createDatabaseIfNotExists(CosmosDatabaseProperties databaseProperties) { - return createDatabaseIfNotExistsInternal(getDatabase(databaseProperties.getId())); + public Mono createDatabaseIfNotExists(CosmosDatabaseProperties databaseProperties) { + return withContext(context -> createDatabaseIfNotExistsInternal(getDatabase(databaseProperties.getId()), + null, context)); } /** @@ -213,21 +233,7 @@ Mono createDatabaseIfNotExists(CosmosDatabaseProperties * an error. */ public Mono createDatabaseIfNotExists(String id) { - return createDatabaseIfNotExistsInternal(getDatabase(id)); - } - - private Mono createDatabaseIfNotExistsInternal(CosmosAsyncDatabase database) { - return database.read().onErrorResume(exception -> { - final Throwable unwrappedException = Exceptions.unwrap(exception); - if (unwrappedException instanceof CosmosException) { - final CosmosException cosmosException = (CosmosException) unwrappedException; - if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { - return createDatabase(new CosmosDatabaseProperties(database.getId()), - new CosmosDatabaseRequestOptions()); - } - } - return Mono.error(unwrappedException); - }); + return withContext(context -> createDatabaseIfNotExistsInternal(getDatabase(id), null, context)); } /** @@ -244,19 +250,8 @@ private Mono createDatabaseIfNotExistsInternal(CosmosAsy * @return the mono. */ public Mono createDatabaseIfNotExists(String id, ThroughputProperties throughputProperties) { - return this.getDatabase(id).read().onErrorResume(exception -> { - final Throwable unwrappedException = Exceptions.unwrap(exception); - if (unwrappedException instanceof CosmosException) { - final CosmosException cosmosException = (CosmosException) unwrappedException; - if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { - CosmosDatabaseRequestOptions options = new CosmosDatabaseRequestOptions(); - ModelBridgeInternal.setThroughputProperties(options, throughputProperties); - return createDatabase(new CosmosDatabaseProperties(id), - options); - } - } - return Mono.error(unwrappedException); - }); + return withContext(context -> createDatabaseIfNotExistsInternal(getDatabase(id), + throughputProperties, context)); } /** @@ -273,14 +268,10 @@ public Mono createDatabaseIfNotExists(String id, Through */ public Mono createDatabase(CosmosDatabaseProperties databaseProperties, CosmosDatabaseRequestOptions options) { - if (options == null) { - options = new CosmosDatabaseRequestOptions(); - } + final CosmosDatabaseRequestOptions requestOptions = options == null ? new CosmosDatabaseRequestOptions() : options; Database wrappedDatabase = new Database(); wrappedDatabase.setId(databaseProperties.getId()); - return asyncDocumentClient.createDatabase(wrappedDatabase, ModelBridgeInternal.toRequestOptions(options)) - .map(databaseResourceResponse -> ModelBridgeInternal.createCosmosDatabaseResponse(databaseResourceResponse)) - .single(); + return withContext(context -> createDatabaseInternal(wrappedDatabase, requestOptions, context)); } /** @@ -332,12 +323,12 @@ public Mono createDatabase(CosmosDatabaseProperties data if (options == null) { options = new CosmosDatabaseRequestOptions(); } + ModelBridgeInternal.setThroughputProperties(options, throughputProperties); Database wrappedDatabase = new Database(); wrappedDatabase.setId(databaseProperties.getId()); - return asyncDocumentClient.createDatabase(wrappedDatabase, ModelBridgeInternal.toRequestOptions(options)) - .map(databaseResourceResponse -> ModelBridgeInternal.createCosmosDatabaseResponse(databaseResourceResponse)) - .single(); + final CosmosDatabaseRequestOptions requestOptions = options; + return withContext(context -> createDatabaseInternal(wrappedDatabase, requestOptions, context)); } /** @@ -358,24 +349,6 @@ public Mono createDatabase(CosmosDatabaseProperties data return createDatabase(databaseProperties, options); } - /** - * Creates a database. - *

- * After subscription the operation will be performed. - * The {@link Mono} upon successful completion will contain a single resource response with the - * created database. - * In case of failure the {@link Mono} will error. - * - * @param id id of the database. - * @param throughput the throughput for the database. - * @return a {@link Mono} containing the single cosmos database response with the created database or an error. - */ - Mono createDatabase(String id, int throughput) { - CosmosDatabaseRequestOptions options = new CosmosDatabaseRequestOptions(); - ModelBridgeInternal.setThroughputProperties(options, ThroughputProperties.createManualThroughput(throughput)); - return createDatabase(new CosmosDatabaseProperties(id), options); - } - /** * Creates a database. * @@ -401,12 +374,13 @@ public Mono createDatabase(String id, ThroughputProperti */ CosmosPagedFlux readAllDatabases(CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + pagedFluxOptions.setTracerInformation(this.tracerProvider, "readAllDatabases", this.serviceEndpoint, null); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return getDocClientWrapper().readDatabases(options) - .map(response -> - BridgeInternal.createFeedResponse( - ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()), - response.getResponseHeaders())); + .map(response -> + BridgeInternal.createFeedResponse( + ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()), + response.getResponseHeaders())); }); } @@ -436,7 +410,7 @@ public CosmosPagedFlux readAllDatabases() { * @return a {@link CosmosPagedFlux} containing one or several feed response pages of read databases or an error. */ public CosmosPagedFlux queryDatabases(String query, CosmosQueryRequestOptions options) { - return queryDatabases(new SqlQuerySpec(query), options); + return queryDatabasesInternal(new SqlQuerySpec(query), options); } /** @@ -451,13 +425,7 @@ public CosmosPagedFlux queryDatabases(String query, Co * @return a {@link CosmosPagedFlux} containing one or several feed response pages of read databases or an error. */ public CosmosPagedFlux queryDatabases(SqlQuerySpec querySpec, CosmosQueryRequestOptions options) { - return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { - setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); - return getDocClientWrapper().queryDatabases(querySpec, options) - .map(response -> BridgeInternal.createFeedResponse( - ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()), - response.getResponseHeaders())); - }); + return queryDatabasesInternal(querySpec, options); } /** @@ -477,4 +445,63 @@ public CosmosAsyncDatabase getDatabase(String id) { public void close() { asyncDocumentClient.close(); } + + TracerProvider getTracerProvider(){ + return this.tracerProvider; + } + + private CosmosPagedFlux queryDatabasesInternal(SqlQuerySpec querySpec, CosmosQueryRequestOptions options){ + return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + pagedFluxOptions.setTracerInformation(this.tracerProvider, "queryDatabases", this.serviceEndpoint, null); + setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); + return getDocClientWrapper().queryDatabases(querySpec, options) + .map(response -> BridgeInternal.createFeedResponse( + ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()), + response.getResponseHeaders())); + }); + } + + + private Mono createDatabaseIfNotExistsInternal(CosmosAsyncDatabase database, + ThroughputProperties throughputProperties, Context context) { + String spanName = "createDatabaseIfNotExists." + database.getId(); + Context nestedContext = context.addData(TracerProvider.COSMOS_CALL_DEPTH, TracerProvider.COSMOS_CALL_DEPTH_VAL); + Mono responseMono = database.readInternal(new CosmosDatabaseRequestOptions(), + nestedContext).onErrorResume(exception -> { + final Throwable unwrappedException = Exceptions.unwrap(exception); + if (unwrappedException instanceof CosmosException) { + final CosmosException cosmosException = (CosmosException) unwrappedException; + if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { + CosmosDatabaseRequestOptions requestOptions = new CosmosDatabaseRequestOptions(); + if (throughputProperties != null) { + ModelBridgeInternal.setThroughputProperties(requestOptions, throughputProperties); + } + + Database wrappedDatabase = new Database(); + wrappedDatabase.setId(database.getId()); + return createDatabaseInternal(wrappedDatabase, + requestOptions, nestedContext); + } + } + return Mono.error(unwrappedException); + }); + return tracerProvider.traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + database.getId(), + this.serviceEndpoint); + } + + private Mono createDatabaseInternal(Database database, CosmosDatabaseRequestOptions options, + Context context) { + String spanName = "createDatabase." + database.getId(); + Mono responseMono = asyncDocumentClient.createDatabase(database, ModelBridgeInternal.toRequestOptions(options)) + .map(databaseResourceResponse -> ModelBridgeInternal.createCosmosDatabaseResponse(databaseResourceResponse)) + .single(); + return tracerProvider.traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + database.getId(), + this.serviceEndpoint); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncConflict.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncConflict.java index 9e7a336e782f..7a69191a459a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncConflict.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncConflict.java @@ -2,13 +2,16 @@ // Licensed under the MIT License. package com.azure.cosmos; +import com.azure.core.util.Context; import com.azure.cosmos.implementation.Paths; import com.azure.cosmos.implementation.RequestOptions; -import com.azure.cosmos.models.CosmosConflictResponse; import com.azure.cosmos.models.CosmosConflictRequestOptions; +import com.azure.cosmos.models.CosmosConflictResponse; import com.azure.cosmos.models.ModelBridgeInternal; import reactor.core.publisher.Mono; +import static com.azure.core.util.FluxUtil.withContext; + /** * Read and delete conflicts */ @@ -64,9 +67,7 @@ public Mono read(CosmosConflictRequestOptions options) { options = new CosmosConflictRequestOptions(); } RequestOptions requestOptions = ModelBridgeInternal.toRequestOptions(options); - return this.container.getDatabase().getDocClientWrapper().readConflict(getLink(), requestOptions) - .map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single(); - + return withContext(context -> readInternal(requestOptions, context)); } /** @@ -85,8 +86,7 @@ public Mono delete(CosmosConflictRequestOptions options) options = new CosmosConflictRequestOptions(); } RequestOptions requestOptions = ModelBridgeInternal.toRequestOptions(options); - return this.container.getDatabase().getDocClientWrapper().deleteConflict(getLink(), requestOptions) - .map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single(); + return withContext(context -> deleteInternal(requestOptions, context)); } String getURIPathSegment() { @@ -106,4 +106,27 @@ String getLink() { builder.append(getId()); return builder.toString(); } + + private Mono readInternal(RequestOptions options, Context context) { + String spanName = "readConflict." + getId(); + Mono responseMono = + this.container.getDatabase().getDocClientWrapper().readConflict(getLink(), options) + .map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single(); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + this.container.getDatabase().getId(), + this.container.getDatabase().getClient().getServiceEndpoint()); + + } + + private Mono deleteInternal(RequestOptions options, Context context) { + String spanName = "deleteConflict." + getId(); + Mono responseMono = + this.container.getDatabase().getDocClientWrapper().deleteConflict(getLink(), options) + .map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single(); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + this.container.getDatabase().getId(), + this.container.getDatabase().getClient().getServiceEndpoint()); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java index c48a8f82aab6..38f558c29868 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java @@ -2,22 +2,24 @@ // Licensed under the MIT License. package com.azure.cosmos; +import com.azure.core.util.Context; import com.azure.cosmos.implementation.Constants; -import com.azure.cosmos.implementation.InternalObjectNode; import com.azure.cosmos.implementation.Document; import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.InternalObjectNode; import com.azure.cosmos.implementation.Offer; import com.azure.cosmos.implementation.Paths; import com.azure.cosmos.implementation.RequestOptions; +import com.azure.cosmos.implementation.TracerProvider; import com.azure.cosmos.implementation.ItemDeserializer; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.query.QueryInfo; -import com.azure.cosmos.models.CosmosContainerResponse; -import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosConflictProperties; import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.CosmosContainerRequestOptions; +import com.azure.cosmos.models.CosmosContainerResponse; import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.ModelBridgeInternal; @@ -33,6 +35,7 @@ import java.util.List; import java.util.stream.Collectors; +import static com.azure.core.util.FluxUtil.withContext; import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount; /** @@ -44,12 +47,40 @@ public class CosmosAsyncContainer { private final CosmosAsyncDatabase database; private final String id; private final String link; + private final String replaceContainerSpanName; + private final String deleteContainerSpanName; + private final String replaceThroughputSpanName; + private final String readThroughputSpanName; + private final String readContainerSpanName; + private final String readItemSpanName; + private final String upsertItemSpanName; + private final String deleteItemSpanName; + private final String replaceItemSpanName; + private final String createItemSpanName; + private final String readAllItemsSpanName; + private final String queryItemsSpanName; + private final String readAllConflictsSpanName; + private final String queryConflictsSpanName; private CosmosAsyncScripts scripts; CosmosAsyncContainer(String id, CosmosAsyncDatabase database) { this.id = id; this.database = database; this.link = getParentLink() + "/" + getURIPathSegment() + "/" + getId(); + this.replaceContainerSpanName = "replaceContainer." + this.id; + this.deleteContainerSpanName = "deleteContainer." + this.id; + this.replaceThroughputSpanName = "replaceThroughput." + this.id; + this.readThroughputSpanName = "readThroughput." + this.id; + this.readContainerSpanName = "readContainer." + this.id; + this.readItemSpanName = "readItem." + this.id; + this.upsertItemSpanName = "upsertItem." + this.id; + this.deleteItemSpanName = "deleteItem." + this.id; + this.replaceItemSpanName = "replaceItem." + this.id; + this.createItemSpanName = "createItem." + this.id; + this.readAllItemsSpanName = "readAllItems." + this.id; + this.queryItemsSpanName = "queryItems." + this.id; + this.readAllConflictsSpanName = "readAllConflicts." + this.id; + this.queryConflictsSpanName = "queryConflicts." + this.id; } /** @@ -87,11 +118,8 @@ public Mono read() { * the read container or an error. */ public Mono read(CosmosContainerRequestOptions options) { - if (options == null) { - options = new CosmosContainerRequestOptions(); - } - return database.getDocClientWrapper().readCollection(getLink(), ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosContainerResponse(response)).single(); + final CosmosContainerRequestOptions requestOptions = options == null ? new CosmosContainerRequestOptions() : options; + return withContext(context -> read(requestOptions, context)); } /** @@ -106,11 +134,8 @@ public Mono read(CosmosContainerRequestOptions options) * the deleted database or an error. */ public Mono delete(CosmosContainerRequestOptions options) { - if (options == null) { - options = new CosmosContainerRequestOptions(); - } - return database.getDocClientWrapper().deleteCollection(getLink(), ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosContainerResponse(response)).single(); + final CosmosContainerRequestOptions requestOptions = options == null ? new CosmosContainerRequestOptions() : options; + return withContext(context -> deleteInternal(requestOptions, context)); } /** @@ -159,12 +184,8 @@ public Mono replace(CosmosContainerProperties container public Mono replace( CosmosContainerProperties containerProperties, CosmosContainerRequestOptions options) { - if (options == null) { - options = new CosmosContainerRequestOptions(); - } - return database.getDocClientWrapper() - .replaceCollection(ModelBridgeInternal.getV2Collection(containerProperties), ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosContainerResponse(response)).single(); + final CosmosContainerRequestOptions requestOptions = options == null ? new CosmosContainerRequestOptions() : options; + return withContext(context -> replaceInternal(containerProperties, requestOptions, context)); } /* CosmosAsyncItem operations */ @@ -209,7 +230,6 @@ public Mono> createItem( return createItem(item, options); } - /** * Creates a Cosmos item. * @@ -222,6 +242,21 @@ public Mono> createItem(T item, CosmosItemRequestOptio if (options == null) { options = new CosmosItemRequestOptions(); } + + final CosmosItemRequestOptions requestOptions = options; + return withContext(context -> createItemInternal(item, requestOptions, context)); + } + + private Mono> createItemInternal(T item, CosmosItemRequestOptions options, Context context) { + Mono> responseMono = createItemInternal(item, options); + return database.getClient().getTracerProvider().traceEnabledCosmosItemResponsePublisher(responseMono, + context, + this.createItemSpanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono> createItemInternal(T item, CosmosItemRequestOptions options) { @SuppressWarnings("unchecked") Class itemType = (Class) item.getClass(); RequestOptions requestOptions = ModelBridgeInternal.toRequestOptions(options); @@ -262,17 +297,8 @@ public Mono> upsertItem(T item) { * @return an {@link Mono} containing the single resource response with the upserted item or an error. */ public Mono> upsertItem(T item, CosmosItemRequestOptions options) { - if (options == null) { - options = new CosmosItemRequestOptions(); - } - @SuppressWarnings("unchecked") - Class itemType = (Class) item.getClass(); - return this.getDatabase().getDocClientWrapper() - .upsertDocument(this.getLink(), item, - ModelBridgeInternal.toRequestOptions(options), - true) - .map(response -> ModelBridgeInternal.createCosmosAsyncItemResponse(response, itemType, getItemDeserializer())) - .single(); + final CosmosItemRequestOptions requestOptions = options == null ? new CosmosItemRequestOptions() : options; + return withContext(context -> upsertItemInternal(item, requestOptions, context)); } /** @@ -306,6 +332,9 @@ CosmosPagedFlux readAllItems(Class classType) { */ CosmosPagedFlux readAllItems(CosmosQueryRequestOptions options, Class classType) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + pagedFluxOptions.setTracerInformation(this.getDatabase().getClient().getTracerProvider(), + this.readAllItemsSpanName, + this.getDatabase().getClient().getServiceEndpoint(), database.getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return getDatabase().getDocClientWrapper().readDocuments(getLink(), options).map( response -> prepareFeedResponse(response, classType)); @@ -326,7 +355,7 @@ CosmosPagedFlux readAllItems(CosmosQueryRequestOptions options, Class * error. */ public CosmosPagedFlux queryItems(String query, Class classType) { - return queryItems(new SqlQuerySpec(query), classType); + return queryItemsInternal(new SqlQuerySpec(query), new CosmosQueryRequestOptions(), classType); } /** @@ -344,7 +373,7 @@ public CosmosPagedFlux queryItems(String query, Class classType) { * error. */ public CosmosPagedFlux queryItems(String query, CosmosQueryRequestOptions options, Class classType) { - return queryItems(new SqlQuerySpec(query), options, classType); + return queryItemsInternal(new SqlQuerySpec(query), options, classType); } /** @@ -361,7 +390,7 @@ public CosmosPagedFlux queryItems(String query, CosmosQueryRequestOptions * error. */ public CosmosPagedFlux queryItems(SqlQuerySpec querySpec, Class classType) { - return queryItems(querySpec, new CosmosQueryRequestOptions(), classType); + return queryItemsInternal(querySpec, new CosmosQueryRequestOptions(), classType); } /** @@ -383,13 +412,16 @@ public CosmosPagedFlux queryItems(SqlQuerySpec querySpec, CosmosQueryRequ } private CosmosPagedFlux queryItemsInternal( - SqlQuerySpec sqlQuerySpec, CosmosQueryRequestOptions cosmosQueryRequestOptions, Class classType) { + SqlQuerySpec sqlQuerySpec, CosmosQueryRequestOptions cosmosQueryRequestOptions, Class classType) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = this.queryItemsSpanName; + pagedFluxOptions.setTracerInformation(this.getDatabase().getClient().getTracerProvider(), spanName, + this.getDatabase().getClient().getServiceEndpoint(), database.getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, cosmosQueryRequestOptions); return getDatabase().getDocClientWrapper() - .queryDocuments(CosmosAsyncContainer.this.getLink(), sqlQuerySpec, cosmosQueryRequestOptions) - .map(response -> - prepareFeedResponse(response, classType)); + .queryDocuments(CosmosAsyncContainer.this.getLink(), sqlQuerySpec, cosmosQueryRequestOptions) + .map(response -> + prepareFeedResponse(response, classType)); }); } @@ -457,12 +489,7 @@ public Mono> readItem( ModelBridgeInternal.setPartitionKey(options, partitionKey); RequestOptions requestOptions = ModelBridgeInternal.toRequestOptions(options); - - return this.getDatabase().getDocClientWrapper() - .readDocument(getItemLink(itemId), requestOptions) - // TODO: add a deserializer and pass down? - .map(response -> ModelBridgeInternal.createCosmosAsyncItemResponse(response, itemType, this.getItemDeserializer())) - .single(); + return withContext(context -> readItemInternal(itemId, requestOptions, itemType, context)); } /** @@ -504,11 +531,8 @@ public Mono> replaceItem( ModelBridgeInternal.setPartitionKey(options, partitionKey); @SuppressWarnings("unchecked") Class itemType = (Class) item.getClass(); - return this.getDatabase() - .getDocClientWrapper() - .replaceDocument(getItemLink(itemId), doc, ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosAsyncItemResponse(response, itemType, getItemDeserializer())) - .single(); + final CosmosItemRequestOptions requestOptions = options; + return withContext(context -> replaceItemInternal(itemType, itemId, doc, requestOptions, context)); } /** @@ -544,11 +568,7 @@ public Mono> deleteItem( } ModelBridgeInternal.setPartitionKey(options, partitionKey); RequestOptions requestOptions = ModelBridgeInternal.toRequestOptions(options); - return this.getDatabase() - .getDocClientWrapper() - .deleteDocument(getItemLink(itemId), requestOptions) - .map(response -> ModelBridgeInternal.createCosmosAsyncItemResponseWithObjectType(response)) - .single(); + return withContext(context -> deleteItemInternal(itemId, requestOptions, context)); } private String getItemLink(String itemId) { @@ -584,11 +604,14 @@ public CosmosAsyncScripts getScripts() { */ public CosmosPagedFlux readAllConflicts(CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + pagedFluxOptions.setTracerInformation(this.getDatabase().getClient().getTracerProvider(), + this.readAllConflictsSpanName, + this.getDatabase().getClient().getServiceEndpoint(), database.getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return database.getDocClientWrapper().readConflicts(getLink(), options) - .map(response -> BridgeInternal.createFeedResponse( - ModelBridgeInternal.getCosmosConflictPropertiesFromV2Results(response.getResults()), - response.getResponseHeaders())); + .map(response -> BridgeInternal.createFeedResponse( + ModelBridgeInternal.getCosmosConflictPropertiesFromV2Results(response.getResults()), + response.getResponseHeaders())); }); } @@ -613,11 +636,14 @@ public CosmosPagedFlux queryConflicts(String query) { */ public CosmosPagedFlux queryConflicts(String query, CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + pagedFluxOptions.setTracerInformation(this.getDatabase().getClient().getTracerProvider(), + this.queryConflictsSpanName, + this.getDatabase().getClient().getServiceEndpoint(), database.getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return database.getDocClientWrapper().queryConflicts(getLink(), query, options) - .map(response -> BridgeInternal.createFeedResponse( - ModelBridgeInternal.getCosmosConflictPropertiesFromV2Results(response.getResults()), - response.getResponseHeaders())); + .map(response -> BridgeInternal.createFeedResponse( + ModelBridgeInternal.getCosmosConflictPropertiesFromV2Results(response.getResults()), + response.getResponseHeaders())); }); } @@ -632,35 +658,13 @@ public CosmosAsyncConflict getConflict(String id) { } /** - * Replace the throughput provisioned for the current container. + * Replace the throughput. * * @param throughputProperties the throughput properties. * @return the mono containing throughput response. */ public Mono replaceThroughput(ThroughputProperties throughputProperties) { - return this.read() - .flatMap(response -> this.database.getDocClientWrapper() - .queryOffers(database.getOfferQuerySpecFromResourceId(response.getProperties() - .getResourceId()) - , new CosmosQueryRequestOptions()) - .single() - .flatMap(offerFeedResponse -> { - if (offerFeedResponse.getResults().isEmpty()) { - return Mono.error(BridgeInternal - .createCosmosException( - HttpConstants.StatusCodes.BADREQUEST, - "No offers found for the " + - "resource " + this.getId())); - } - - Offer existingOffer = offerFeedResponse.getResults().get(0); - Offer updatedOffer = - ModelBridgeInternal.updateOfferFromProperties(existingOffer, - throughputProperties); - return this.database.getDocClientWrapper() - .replaceOffer(updatedOffer) - .single(); - }).map(ModelBridgeInternal::createThroughputRespose)); + return withContext(context -> replaceThroughputInternal(throughputProperties, context)); } /** @@ -669,30 +673,9 @@ public Mono replaceThroughput(ThroughputProperties throughpu * @return the mono containing throughput response. */ public Mono readThroughput() { - return this.read() - .flatMap(response -> this.database.getDocClientWrapper() - .queryOffers(database.getOfferQuerySpecFromResourceId(response.getProperties() - .getResourceId()) - , new CosmosQueryRequestOptions()) - .single() - .flatMap(offerFeedResponse -> { - if (offerFeedResponse.getResults().isEmpty()) { - return Mono.error(BridgeInternal - .createCosmosException( - HttpConstants.StatusCodes.BADREQUEST, - "No offers found for the resource " - + this.getId())); - } - return this.database.getDocClientWrapper() - .readOffer(offerFeedResponse.getResults() - .get(0) - .getSelfLink()) - .single(); - }) - .map(ModelBridgeInternal::createThroughputRespose)); + return withContext(context -> readThroughputInternal(context)); } - /** * Gets the parent {@link CosmosAsyncDatabase} for the current container. * @@ -714,6 +697,179 @@ String getLink() { return this.link; } + private Mono> deleteItemInternal( + String itemId, + RequestOptions requestOptions, + Context context) { + Mono> responseMono = this.getDatabase() + .getDocClientWrapper() + .deleteDocument(getItemLink(itemId), requestOptions) + .map(response -> ModelBridgeInternal.createCosmosAsyncItemResponseWithObjectType(response)) + .single(); + return database.getClient().getTracerProvider().traceEnabledCosmosItemResponsePublisher(responseMono, + context, + this.deleteItemSpanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono> replaceItemInternal( + Class itemType, + String itemId, + Document doc, + CosmosItemRequestOptions options, + Context context) { + Mono> responseMono = this.getDatabase() + .getDocClientWrapper() + .replaceDocument(getItemLink(itemId), doc, ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosAsyncItemResponse(response, itemType, getItemDeserializer())) + .single(); + return database.getClient().getTracerProvider().traceEnabledCosmosItemResponsePublisher(responseMono, + context, this.replaceItemSpanName, database.getId(), database.getClient().getServiceEndpoint()); + } + + private Mono> upsertItemInternal(T item, CosmosItemRequestOptions options, Context context) { + @SuppressWarnings("unchecked") + Class itemType = (Class) item.getClass(); + Mono> responseMono = this.getDatabase().getDocClientWrapper() + .upsertDocument(this.getLink(), item, + ModelBridgeInternal.toRequestOptions(options), + true) + .map(response -> ModelBridgeInternal.createCosmosAsyncItemResponse(response, itemType, getItemDeserializer())) + .single(); + return database.getClient().getTracerProvider().traceEnabledCosmosItemResponsePublisher(responseMono, + context, + this.upsertItemSpanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono> readItemInternal( + String itemId, + RequestOptions requestOptions, Class itemType, + Context context) { + Mono> responseMono = this.getDatabase().getDocClientWrapper() + .readDocument(getItemLink(itemId), requestOptions) + .map(response -> ModelBridgeInternal.createCosmosAsyncItemResponse(response, itemType, getItemDeserializer())) + .single(); + return database.getClient().getTracerProvider().traceEnabledCosmosItemResponsePublisher(responseMono, + context, + this.readItemSpanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + Mono read(CosmosContainerRequestOptions options, Context context) { + Mono responseMono = database.getDocClientWrapper().readCollection(getLink(), + ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosContainerResponse(response)).single(); + return database.getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + this.readContainerSpanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono deleteInternal(CosmosContainerRequestOptions options, Context context) { + Mono responseMono = database.getDocClientWrapper().deleteCollection(getLink(), + ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosContainerResponse(response)).single(); + return database.getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + this.deleteContainerSpanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono replaceInternal(CosmosContainerProperties containerProperties, + CosmosContainerRequestOptions options, + Context context) { + Mono responseMono = database.getDocClientWrapper() + .replaceCollection(ModelBridgeInternal.getV2Collection(containerProperties), + ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosContainerResponse(response)).single(); + return database.getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + this.replaceContainerSpanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono readThroughputInternal(Context context) { + Context nestedContext = context.addData(TracerProvider.COSMOS_CALL_DEPTH, TracerProvider.COSMOS_CALL_DEPTH_VAL); + Mono responseMono = readThroughputInternal(this.read(new CosmosContainerRequestOptions(), + nestedContext)); + return this.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + this.readThroughputSpanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono readThroughputInternal(Mono responseMono) { + return responseMono + .flatMap(response -> this.database.getDocClientWrapper() + .queryOffers(database.getOfferQuerySpecFromResourceId(response.getProperties() + .getResourceId()) + , new CosmosQueryRequestOptions()) + .single() + .flatMap(offerFeedResponse -> { + if (offerFeedResponse.getResults().isEmpty()) { + return Mono.error(BridgeInternal + .createCosmosException( + HttpConstants.StatusCodes.BADREQUEST, + "No offers found for the resource " + + this.getId())); + } + return this.database.getDocClientWrapper() + .readOffer(offerFeedResponse.getResults() + .get(0) + .getSelfLink()) + .single(); + }) + .map(ModelBridgeInternal::createThroughputRespose)); + } + + private Mono replaceThroughputInternal(ThroughputProperties throughputProperties, + Context context) { + Context nestedContext = context.addData(TracerProvider.COSMOS_CALL_DEPTH, TracerProvider.COSMOS_CALL_DEPTH_VAL); + Mono responseMono = + replaceThroughputInternal(this.read(new CosmosContainerRequestOptions(), nestedContext), + throughputProperties); + return this.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + this.replaceThroughputSpanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono replaceThroughputInternal(Mono responseMono, + ThroughputProperties throughputProperties) { + return responseMono + .flatMap(response -> this.database.getDocClientWrapper() + .queryOffers(database.getOfferQuerySpecFromResourceId(response.getProperties() + .getResourceId()) + , new CosmosQueryRequestOptions()) + .single() + .flatMap(offerFeedResponse -> { + if (offerFeedResponse.getResults().isEmpty()) { + return Mono.error(BridgeInternal + .createCosmosException( + HttpConstants.StatusCodes.BADREQUEST, + "No offers found for the " + + "resource " + this.getId())); + } + + Offer existingOffer = offerFeedResponse.getResults().get(0); + Offer updatedOffer = + ModelBridgeInternal.updateOfferFromProperties(existingOffer, + throughputProperties); + return this.database.getDocClientWrapper() + .replaceOffer(updatedOffer) + .single(); + }).map(ModelBridgeInternal::createThroughputRespose)); + } + ItemDeserializer getItemDeserializer() { return getDatabase().getDocClientWrapper().getItemDeserializer(); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncDatabase.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncDatabase.java index 7d495d715b45..dc956b9318a4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncDatabase.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncDatabase.java @@ -2,22 +2,24 @@ // Licensed under the MIT License. package com.azure.cosmos; +import com.azure.core.util.Context; import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.Offer; import com.azure.cosmos.implementation.Paths; -import com.azure.cosmos.models.CosmosContainerResponse; -import com.azure.cosmos.models.CosmosDatabaseResponse; -import com.azure.cosmos.models.CosmosUserResponse; +import com.azure.cosmos.implementation.TracerProvider; +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.CosmosContainerRequestOptions; +import com.azure.cosmos.models.CosmosContainerResponse; import com.azure.cosmos.models.CosmosDatabaseRequestOptions; -import com.azure.cosmos.models.CosmosUserProperties; +import com.azure.cosmos.models.CosmosDatabaseResponse; import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosUserProperties; +import com.azure.cosmos.models.CosmosUserResponse; import com.azure.cosmos.models.ModelBridgeInternal; import com.azure.cosmos.models.SqlParameter; import com.azure.cosmos.models.SqlQuerySpec; -import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.models.ThroughputProperties; import com.azure.cosmos.models.ThroughputResponse; import com.azure.cosmos.util.CosmosPagedFlux; @@ -28,6 +30,7 @@ import java.util.Collections; import java.util.List; +import static com.azure.core.util.FluxUtil.withContext; import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount; /** @@ -79,11 +82,8 @@ public Mono read() { * the read database or an error. */ public Mono read(CosmosDatabaseRequestOptions options) { - if (options == null) { - options = new CosmosDatabaseRequestOptions(); - } - return getDocClientWrapper().readDatabase(getLink(), ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosDatabaseResponse(response)).single(); + final CosmosDatabaseRequestOptions requestOptions = options == null ? new CosmosDatabaseRequestOptions() : options; + return withContext(context -> readInternal(requestOptions, context)); } /** @@ -110,11 +110,8 @@ public Mono delete() { * @return an {@link Mono} containing the single cosmos database response. */ public Mono delete(CosmosDatabaseRequestOptions options) { - if (options == null) { - options = new CosmosDatabaseRequestOptions(); - } - return getDocClientWrapper().deleteDatabase(getLink(), ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosDatabaseResponse(response)).single(); + final CosmosDatabaseRequestOptions requestOptions = options == null ? new CosmosDatabaseRequestOptions() : options; + return withContext(context -> deleteInternal(requestOptions, context)); } /* CosmosAsyncContainer operations */ @@ -194,13 +191,9 @@ public Mono createContainer( if (containerProperties == null) { throw new IllegalArgumentException("containerProperties"); } - if (options == null) { - options = new CosmosContainerRequestOptions(); - } - return getDocClientWrapper() - .createCollection(this.getLink(), ModelBridgeInternal.getV2Collection(containerProperties), - ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosContainerResponse(response)).single(); + + final CosmosContainerRequestOptions requestOptions = options == null ? new CosmosContainerRequestOptions() : options; + return withContext(context -> createContainerInternal(containerProperties, requestOptions, context)); } /** @@ -278,14 +271,15 @@ public Mono createContainer(String id, String partition public Mono createContainerIfNotExists( CosmosContainerProperties containerProperties) { CosmosAsyncContainer container = getContainer(containerProperties.getId()); - return createContainerIfNotExistsInternal(containerProperties, container, null); + return withContext(context -> createContainerIfNotExistsInternal(containerProperties, container, null, + context)); } /** * Creates a Cosmos container if it does not exist on the service. *

* The throughput setting will only be used if the specified container - * does not exist and therefor a new container will be created. + * does not exist and therefore a new container will be created. * * After subscription the operation will be performed. The {@link Mono} upon * successful completion will contain a cosmos container response with the @@ -303,7 +297,8 @@ Mono createContainerIfNotExists( CosmosContainerRequestOptions options = new CosmosContainerRequestOptions(); ModelBridgeInternal.setThroughputProperties(options, ThroughputProperties.createManualThroughput(throughput)); CosmosAsyncContainer container = getContainer(containerProperties.getId()); - return createContainerIfNotExistsInternal(containerProperties, container, options); + return withContext(context -> createContainerIfNotExistsInternal(containerProperties, container, options, + context)); } /** @@ -328,7 +323,8 @@ public Mono createContainerIfNotExists( CosmosContainerRequestOptions options = new CosmosContainerRequestOptions(); ModelBridgeInternal.setThroughputProperties(options, throughputProperties); CosmosAsyncContainer container = getContainer(containerProperties.getId()); - return createContainerIfNotExistsInternal(containerProperties, container, options); + return withContext(context -> createContainerIfNotExistsInternal(containerProperties, container, options, + context)); } /** @@ -345,16 +341,16 @@ public Mono createContainerIfNotExists( */ public Mono createContainerIfNotExists(String id, String partitionKeyPath) { CosmosAsyncContainer container = getContainer(id); - return createContainerIfNotExistsInternal(new CosmosContainerProperties(id, partitionKeyPath), - container, - null); + return withContext(context -> createContainerIfNotExistsInternal(new CosmosContainerProperties(id, + partitionKeyPath), container, null, + context)); } /** * Creates a Cosmos container if it does not exist on the service. *

- * The throughput setting will only be used if the specified container - * does not exist and a new container will be created. + * The throughput properties will only be used if the specified container + * does not exist and therefor a new container will be created. * * After subscription the operation will be performed. The {@link Mono} upon * successful completion will contain a cosmos container response with the @@ -362,25 +358,25 @@ public Mono createContainerIfNotExists(String id, Strin * * @param id the cosmos container id. * @param partitionKeyPath the partition key path. - * @param throughput the throughput for the container. + * @param throughputProperties the throughput properties for the container. * @return a {@link Mono} containing the cosmos container response with the * created container or an error. */ - Mono createContainerIfNotExists( + public Mono createContainerIfNotExists( String id, String partitionKeyPath, - int throughput) { + ThroughputProperties throughputProperties) { CosmosContainerRequestOptions options = new CosmosContainerRequestOptions(); - ModelBridgeInternal.setThroughputProperties(options, ThroughputProperties.createManualThroughput(throughput)); + ModelBridgeInternal.setThroughputProperties(options, throughputProperties); CosmosAsyncContainer container = getContainer(id); - return createContainerIfNotExistsInternal(new CosmosContainerProperties(id, partitionKeyPath), container, - options); + return withContext(context -> createContainerIfNotExistsInternal(new CosmosContainerProperties(id, + partitionKeyPath), container, options, context)); } /** * Creates a Cosmos container if it does not exist on the service. *

- * The throughput properties will only be used if the specified container - * does not exist and therefor a new container will be created. + * The throughput setting will only be used if the specified container + * does not exist and a new container will be created. * * After subscription the operation will be performed. The {@link Mono} upon * successful completion will contain a cosmos container response with the @@ -388,33 +384,18 @@ Mono createContainerIfNotExists( * * @param id the cosmos container id. * @param partitionKeyPath the partition key path. - * @param throughputProperties the throughput properties for the container. + * @param throughput the throughput for the container. * @return a {@link Mono} containing the cosmos container response with the * created container or an error. */ - public Mono createContainerIfNotExists( + Mono createContainerIfNotExists( String id, String partitionKeyPath, - ThroughputProperties throughputProperties) { + int throughput) { CosmosContainerRequestOptions options = new CosmosContainerRequestOptions(); - ModelBridgeInternal.setThroughputProperties(options, throughputProperties); + ModelBridgeInternal.setThroughputProperties(options, ThroughputProperties.createManualThroughput(throughput)); CosmosAsyncContainer container = getContainer(id); - return createContainerIfNotExistsInternal(new CosmosContainerProperties(id, partitionKeyPath), container, - options); - } - - private Mono createContainerIfNotExistsInternal( - CosmosContainerProperties containerProperties, CosmosAsyncContainer container, - CosmosContainerRequestOptions options) { - return container.read(options).onErrorResume(exception -> { - final Throwable unwrappedException = Exceptions.unwrap(exception); - if (unwrappedException instanceof CosmosException) { - final CosmosException cosmosException = (CosmosException) unwrappedException; - if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { - return createContainer(containerProperties, options); - } - } - return Mono.error(unwrappedException); - }); + return withContext(context -> createContainerIfNotExistsInternal(new CosmosContainerProperties(id, + partitionKeyPath), container, options, context)); } /** @@ -430,11 +411,14 @@ private Mono createContainerIfNotExistsInternal( */ public CosmosPagedFlux readAllContainers(CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "readAllContainers." + this.getId(); + pagedFluxOptions.setTracerInformation(this.getClient().getTracerProvider(), spanName, + this.getClient().getServiceEndpoint(), getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return getDocClientWrapper().readCollections(getLink(), options) - .map(response -> BridgeInternal.createFeedResponse( - ModelBridgeInternal.getCosmosContainerPropertiesFromV2Results(response.getResults()), - response.getResponseHeaders())); + .map(response -> BridgeInternal.createFeedResponse( + ModelBridgeInternal.getCosmosContainerPropertiesFromV2Results(response.getResults()), + response.getResponseHeaders())); }); } @@ -464,7 +448,7 @@ public CosmosPagedFlux readAllContainers() { * obtained containers or an error. */ public CosmosPagedFlux queryContainers(String query) { - return queryContainers(new SqlQuerySpec(query)); + return queryContainersInternal(new SqlQuerySpec(query), new CosmosQueryRequestOptions()); } /** @@ -480,7 +464,7 @@ public CosmosPagedFlux queryContainers(String query) * obtained containers or an error. */ public CosmosPagedFlux queryContainers(String query, CosmosQueryRequestOptions options) { - return queryContainers(new SqlQuerySpec(query), options); + return queryContainersInternal(new SqlQuerySpec(query), options); } /** @@ -495,7 +479,7 @@ public CosmosPagedFlux queryContainers(String query, * obtained containers or an error. */ public CosmosPagedFlux queryContainers(SqlQuerySpec querySpec) { - return queryContainers(querySpec, new CosmosQueryRequestOptions()); + return queryContainersInternal(querySpec, new CosmosQueryRequestOptions()); } /** @@ -510,14 +494,9 @@ public CosmosPagedFlux queryContainers(SqlQuerySpec q * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the * obtained containers or an error. */ - public CosmosPagedFlux queryContainers(SqlQuerySpec querySpec, CosmosQueryRequestOptions options) { - return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { - setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); - return getDocClientWrapper().queryCollections(getLink(), querySpec, options) - .map(response -> BridgeInternal.createFeedResponse( - ModelBridgeInternal.getCosmosContainerPropertiesFromV2Results(response.getResults()), - response.getResponseHeaders())); - }); + public CosmosPagedFlux queryContainers(SqlQuerySpec querySpec + , CosmosQueryRequestOptions options) { + return queryContainersInternal(querySpec, options); } /** @@ -530,8 +509,6 @@ public CosmosAsyncContainer getContainer(String id) { return new CosmosAsyncContainer(id, this); } - /** User operations **/ - /** * Creates a user After subscription the operation will be performed. The * {@link Mono} upon successful completion will contain a single resource @@ -543,11 +520,9 @@ public CosmosAsyncContainer getContainer(String id) { * created cosmos user or an error. */ public Mono createUser(CosmosUserProperties userProperties) { - return getDocClientWrapper().createUser(this.getLink(), ModelBridgeInternal.getV2User(userProperties), null) - .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + return withContext(context -> createUserInternal(userProperties, context)); } - /** * Upsert a user. Upsert will create a new user if it doesn't exist, or replace * the existing one if it does. After subscription the operation will be @@ -560,8 +535,7 @@ public Mono createUser(CosmosUserProperties userProperties) * upserted user or an error. */ public Mono upsertUser(CosmosUserProperties userProperties) { - return getDocClientWrapper().upsertUser(this.getLink(), ModelBridgeInternal.getV2User(userProperties), null) - .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + return withContext(context -> upsertUserInternal(userProperties, context)); } /** @@ -591,11 +565,14 @@ public CosmosPagedFlux readAllUsers() { */ CosmosPagedFlux readAllUsers(CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "readAllUsers." + this.getId(); + pagedFluxOptions.setTracerInformation(this.getClient().getTracerProvider(), spanName, + this.getClient().getServiceEndpoint(), getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return getDocClientWrapper().readUsers(getLink(), options) - .map(response -> BridgeInternal.createFeedResponse( - ModelBridgeInternal.getCosmosUserPropertiesFromV2Results(response.getResults()), response - .getResponseHeaders())); + .map(response -> BridgeInternal.createFeedResponse( + ModelBridgeInternal.getCosmosUserPropertiesFromV2Results(response.getResults()), response + .getResponseHeaders())); }); } @@ -627,7 +604,7 @@ public CosmosPagedFlux queryUsers(String query) { * obtained users or an error. */ public CosmosPagedFlux queryUsers(String query, CosmosQueryRequestOptions options) { - return queryUsers(new SqlQuerySpec(query), options); + return queryUsersInternal(new SqlQuerySpec(query), options); } /** @@ -642,7 +619,7 @@ public CosmosPagedFlux queryUsers(String query, CosmosQuer * obtained users or an error. */ public CosmosPagedFlux queryUsers(SqlQuerySpec querySpec) { - return queryUsers(querySpec, new CosmosQueryRequestOptions()); + return queryUsersInternal(querySpec, new CosmosQueryRequestOptions()); } /** @@ -658,13 +635,7 @@ public CosmosPagedFlux queryUsers(SqlQuerySpec querySpec) * obtained users or an error. */ public CosmosPagedFlux queryUsers(SqlQuerySpec querySpec, CosmosQueryRequestOptions options) { - return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { - setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); - return getDocClientWrapper().queryUsers(getLink(), querySpec, options) - .map(response -> BridgeInternal.createFeedResponseWithQueryMetrics( - ModelBridgeInternal.getCosmosUserPropertiesFromV2Results(response.getResults()), response.getResponseHeaders(), - ModelBridgeInternal.queryMetrics(response))); - }); + return queryUsersInternal(querySpec, options); } /** @@ -685,30 +656,7 @@ public CosmosAsyncUser getUser(String id) { * @return the mono. */ public Mono replaceThroughput(ThroughputProperties throughputProperties) { - return this.read() - .flatMap(response -> this.getDocClientWrapper() - .queryOffers(getOfferQuerySpecFromResourceId(response.getProperties().getResourceId()), - new CosmosQueryRequestOptions()) - .single() - .flatMap(offerFeedResponse -> { - if (offerFeedResponse.getResults().isEmpty()) { - return Mono.error(BridgeInternal - .createCosmosException( - HttpConstants.StatusCodes.BADREQUEST, - "No offers found for the " + - "resource " + this.getId())); - } - - Offer existingOffer = offerFeedResponse.getResults().get(0); - Offer updatedOffer = - ModelBridgeInternal.updateOfferFromProperties(existingOffer, - throughputProperties); - - return this.getDocClientWrapper() - .replaceOffer(updatedOffer) - .single(); - }) - .map(ModelBridgeInternal::createThroughputRespose)); + return withContext(context -> replaceThroughputInternal(throughputProperties, context)); } /** @@ -717,26 +665,7 @@ public Mono replaceThroughput(ThroughputProperties throughpu * @return the mono containing throughput response. */ public Mono readThroughput() { - return this.read() - .flatMap(response -> getDocClientWrapper() - .queryOffers(getOfferQuerySpecFromResourceId(response.getProperties().getResourceId()), - new CosmosQueryRequestOptions()) - .single() - .flatMap(offerFeedResponse -> { - if (offerFeedResponse.getResults().isEmpty()) { - return Mono.error(BridgeInternal - .createCosmosException( - HttpConstants.StatusCodes.BADREQUEST, - "No offers found for the " + - "resource " + this.getId())); - } - return getDocClientWrapper() - .readOffer(offerFeedResponse.getResults() - .get(0) - .getSelfLink()) - .single(); - }) - .map(ModelBridgeInternal::createThroughputRespose)); + return withContext(context -> readThroughputInternal(context)); } SqlQuerySpec getOfferQuerySpecFromResourceId(String resourceId) { @@ -768,4 +697,184 @@ String getLink() { return this.link; } + private CosmosPagedFlux queryContainersInternal(SqlQuerySpec querySpec + , CosmosQueryRequestOptions options) { + return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "queryContainers." + this.getId(); + pagedFluxOptions.setTracerInformation(this.getClient().getTracerProvider(), spanName, + this.getClient().getServiceEndpoint(), getId()); + setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); + return getDocClientWrapper().queryCollections(getLink(), querySpec, options) + .map(response -> BridgeInternal.createFeedResponse( + ModelBridgeInternal.getCosmosContainerPropertiesFromV2Results(response.getResults()), + response.getResponseHeaders())); + }); + } + + private CosmosPagedFlux queryUsersInternal(SqlQuerySpec querySpec, CosmosQueryRequestOptions options) { + return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "queryUsers." + this.getId(); + pagedFluxOptions.setTracerInformation(this.getClient().getTracerProvider(), spanName, + this.getClient().getServiceEndpoint(), getId()); + setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); + return getDocClientWrapper().queryUsers(getLink(), querySpec, options) + .map(response -> BridgeInternal.createFeedResponseWithQueryMetrics( + ModelBridgeInternal.getCosmosUserPropertiesFromV2Results(response.getResults()), + response.getResponseHeaders(), + ModelBridgeInternal.queryMetrics(response))); + }); + } + + private Mono createContainerIfNotExistsInternal( + CosmosContainerProperties containerProperties, + CosmosAsyncContainer container, + CosmosContainerRequestOptions options, + Context context) { + String spanName = "createContainerIfNotExists." + containerProperties.getId(); + Context nestedContext = context.addData(TracerProvider.COSMOS_CALL_DEPTH, TracerProvider.COSMOS_CALL_DEPTH_VAL); + final CosmosContainerRequestOptions requestOptions = options == null ? new CosmosContainerRequestOptions() : + options; + Mono responseMono = + container.read(requestOptions, nestedContext).onErrorResume(exception -> { + final Throwable unwrappedException = Exceptions.unwrap(exception); + if (unwrappedException instanceof CosmosException) { + final CosmosException cosmosException = (CosmosException) unwrappedException; + if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { + return createContainerInternal(containerProperties, requestOptions, nestedContext); + } + } + return Mono.error(unwrappedException); + }); + return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + getId(), + getClient().getServiceEndpoint()); + } + + private Mono createContainerInternal( + CosmosContainerProperties containerProperties, + CosmosContainerRequestOptions options, + Context context) { + String spanName = "createContainer." + containerProperties.getId(); + Mono responseMono = getDocClientWrapper() + .createCollection(this.getLink(), ModelBridgeInternal.getV2Collection(containerProperties), + ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosContainerResponse(response)).single(); + return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + getId(), + getClient().getServiceEndpoint()); + } + + Mono readInternal(CosmosDatabaseRequestOptions options, Context context) { + String spanName = "readDatabase." + this.getId(); + Mono responseMono = getDocClientWrapper().readDatabase(getLink(), + ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosDatabaseResponse(response)).single(); + return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + getId(), + getClient().getServiceEndpoint()); + } + + private Mono deleteInternal(CosmosDatabaseRequestOptions options, Context context) { + String spanName = "deleteDatabase." + this.getId(); + Mono responseMono = getDocClientWrapper().deleteDatabase(getLink(), + ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosDatabaseResponse(response)).single(); + return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + getId(), + getClient().getServiceEndpoint()); + } + + private Mono createUserInternal(CosmosUserProperties userProperties, Context context) { + String spanName = "createUser." + this.getId(); + Mono responseMono = getDocClientWrapper().createUser(this.getLink(), ModelBridgeInternal.getV2User(userProperties), null) + .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + getId(), + getClient().getServiceEndpoint()); + } + + private Mono upsertUserInternal(CosmosUserProperties userProperties, Context context) { + String spanName = "upsertUser." + this.getId(); + Mono responseMono = getDocClientWrapper().upsertUser(this.getLink(), ModelBridgeInternal.getV2User(userProperties), null) + .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, getId(), getClient().getServiceEndpoint()); + } + + private Mono replaceThroughputInternal(ThroughputProperties throughputProperties, Context context){ + String spanName = "replaceThroughput." + this.getId(); + Context nestedContext = context.addData(TracerProvider.COSMOS_CALL_DEPTH, TracerProvider.COSMOS_CALL_DEPTH_VAL); + Mono responseMono = replaceThroughputInternal(this.readInternal(new CosmosDatabaseRequestOptions(), nestedContext), throughputProperties); + return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + getId(), + getClient().getServiceEndpoint()); + } + + private Mono replaceThroughputInternal(Mono responseMono, ThroughputProperties throughputProperties) { + return responseMono + .flatMap(response -> this.getDocClientWrapper() + .queryOffers(getOfferQuerySpecFromResourceId(response.getProperties().getResourceId()), + new CosmosQueryRequestOptions()) + .single() + .flatMap(offerFeedResponse -> { + if (offerFeedResponse.getResults().isEmpty()) { + return Mono.error(BridgeInternal + .createCosmosException( + HttpConstants.StatusCodes.BADREQUEST, + "No offers found for the " + + "resource " + this.getId())); + } + + Offer existingOffer = offerFeedResponse.getResults().get(0); + Offer updatedOffer = + ModelBridgeInternal.updateOfferFromProperties(existingOffer, + throughputProperties); + + return this.getDocClientWrapper() + .replaceOffer(updatedOffer) + .single(); + }) + .map(ModelBridgeInternal::createThroughputRespose)); + } + + private Mono readThroughputInternal(Context context){ + String spanName = "readThroughput." + this.getId(); + Context nestedContext = context.addData(TracerProvider.COSMOS_CALL_DEPTH, TracerProvider.COSMOS_CALL_DEPTH_VAL); + Mono responseMono = readThroughputInternal(this.readInternal(new CosmosDatabaseRequestOptions(), nestedContext)); + return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + getId(), + getClient().getServiceEndpoint()); + } + + private Mono readThroughputInternal(Mono responseMono) { + return responseMono + .flatMap(response -> getDocClientWrapper() + .queryOffers(getOfferQuerySpecFromResourceId(response.getProperties().getResourceId()), + new CosmosQueryRequestOptions()) + .single() + .flatMap(offerFeedResponse -> { + if (offerFeedResponse.getResults().isEmpty()) { + return Mono.error(BridgeInternal + .createCosmosException( + HttpConstants.StatusCodes.BADREQUEST, + "No offers found for the " + + "resource " + this.getId())); + } + return getDocClientWrapper() + .readOffer(offerFeedResponse.getResults() + .get(0) + .getSelfLink()) + .single(); + }) + .map(ModelBridgeInternal::createThroughputRespose)); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncPermission.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncPermission.java index f01a52766820..5628f5a48a6b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncPermission.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncPermission.java @@ -2,13 +2,16 @@ // Licensed under the MIT License. package com.azure.cosmos; +import com.azure.core.util.Context; import com.azure.cosmos.implementation.Paths; -import com.azure.cosmos.models.CosmosPermissionResponse; import com.azure.cosmos.models.CosmosPermissionProperties; import com.azure.cosmos.models.CosmosPermissionRequestOptions; +import com.azure.cosmos.models.CosmosPermissionResponse; import com.azure.cosmos.models.ModelBridgeInternal; import reactor.core.publisher.Mono; +import static com.azure.core.util.FluxUtil.withContext; + /** * Has methods to operate on a per-User Permission to access a specific resource */ @@ -56,11 +59,9 @@ public Mono read(CosmosPermissionRequestOptions option if (options == null) { options = new CosmosPermissionRequestOptions(); } - return cosmosUser.getDatabase() - .getDocClientWrapper() - .readPermission(getLink(), ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) - .single(); + + final CosmosPermissionRequestOptions requestOptions = options; + return withContext(context -> readInternal(requestOptions, context)); } /** @@ -79,13 +80,9 @@ public Mono replace(CosmosPermissionProperties permiss if (options == null) { options = new CosmosPermissionRequestOptions(); } - CosmosAsyncDatabase databaseContext = cosmosUser.getDatabase(); - return databaseContext - .getDocClientWrapper() - .replacePermission(ModelBridgeInternal.getPermission(permissionProperties, databaseContext.getId()), - ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) - .single(); + + final CosmosPermissionRequestOptions requestOptions = options; + return withContext(context -> replaceInternal(permissionProperties, requestOptions, context)); } /** @@ -102,11 +99,9 @@ public Mono delete(CosmosPermissionRequestOptions opti if (options == null) { options = new CosmosPermissionRequestOptions(); } - return cosmosUser.getDatabase() - .getDocClientWrapper() - .deletePermission(getLink(), ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) - .single(); + + final CosmosPermissionRequestOptions requestOptions = options; + return withContext(context -> deleteInternal(requestOptions, context)); } String getURIPathSegment() { @@ -126,4 +121,51 @@ String getLink() { builder.append(getId()); return builder.toString(); } + + private Mono readInternal(CosmosPermissionRequestOptions options, Context context) { + + String spanName = "readPermission." + cosmosUser.getId(); + Mono responseMono = cosmosUser.getDatabase() + .getDocClientWrapper() + .readPermission(getLink(), ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) + .single(); + return cosmosUser.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + cosmosUser.getDatabase().getId(), + cosmosUser.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono replaceInternal(CosmosPermissionProperties permissionProperties, + CosmosPermissionRequestOptions options, + Context context) { + + String spanName = "replacePermission." + cosmosUser.getId(); + CosmosAsyncDatabase databaseContext = cosmosUser.getDatabase(); + Mono responseMono = cosmosUser.getDatabase() + .getDocClientWrapper() + .replacePermission(ModelBridgeInternal.getPermission(permissionProperties, databaseContext.getId()), + ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) + .single(); + return cosmosUser.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + cosmosUser.getDatabase().getId(), + cosmosUser.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono deleteInternal(CosmosPermissionRequestOptions options, + Context context) { + + String spanName = "deletePermission." + cosmosUser.getId(); + Mono responseMono = cosmosUser.getDatabase() + .getDocClientWrapper() + .deletePermission(getLink(), ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) + .single(); + return cosmosUser.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + cosmosUser.getDatabase().getId(), + cosmosUser.getDatabase().getClient().getServiceEndpoint()); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncScripts.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncScripts.java index 7ea5249b8b70..7079baa3e225 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncScripts.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncScripts.java @@ -2,23 +2,25 @@ // Licensed under the MIT License. package com.azure.cosmos; +import com.azure.core.util.Context; import com.azure.cosmos.implementation.StoredProcedure; import com.azure.cosmos.implementation.Trigger; import com.azure.cosmos.implementation.UserDefinedFunction; -import com.azure.cosmos.models.CosmosStoredProcedureResponse; -import com.azure.cosmos.models.CosmosTriggerResponse; -import com.azure.cosmos.models.CosmosUserDefinedFunctionResponse; +import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.CosmosStoredProcedureProperties; import com.azure.cosmos.models.CosmosStoredProcedureRequestOptions; +import com.azure.cosmos.models.CosmosStoredProcedureResponse; import com.azure.cosmos.models.CosmosTriggerProperties; +import com.azure.cosmos.models.CosmosTriggerResponse; import com.azure.cosmos.models.CosmosUserDefinedFunctionProperties; -import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosUserDefinedFunctionResponse; import com.azure.cosmos.models.ModelBridgeInternal; import com.azure.cosmos.models.SqlQuerySpec; import com.azure.cosmos.util.CosmosPagedFlux; import com.azure.cosmos.util.UtilBridgeInternal; import reactor.core.publisher.Mono; +import static com.azure.core.util.FluxUtil.withContext; import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount; /** @@ -71,10 +73,8 @@ public Mono createStoredProcedure( StoredProcedure sProc = new StoredProcedure(); sProc.setId(properties.getId()); sProc.setBody(properties.getBody()); - return database.getDocClientWrapper() - .createStoredProcedure(container.getLink(), sProc, ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)) - .single(); + final CosmosStoredProcedureRequestOptions requestOptions = options; + return withContext(context -> createStoredProcedureInternal(sProc, requestOptions, context)); } /** @@ -108,6 +108,11 @@ public CosmosPagedFlux readAllStoredProcedures( */ CosmosPagedFlux readAllStoredProcedures(CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "readAllStoredProcedures." + this.container.getId(); + pagedFluxOptions.setTracerInformation(this.container.getDatabase().getClient().getTracerProvider(), + spanName, + this.container.getDatabase().getClient().getServiceEndpoint(), + this.container.getDatabase().getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return database.getDocClientWrapper() .readStoredProcedures(container.getLink(), options) @@ -132,8 +137,8 @@ CosmosPagedFlux readAllStoredProcedures(CosmosQ */ public CosmosPagedFlux queryStoredProcedures( String query, - CosmosQueryRequestOptions options) { - return queryStoredProcedures(new SqlQuerySpec(query), options); + CosmosQueryRequestOptions options) { + return queryStoredProceduresInternal(new SqlQuerySpec(query), options); } /** @@ -152,14 +157,7 @@ public CosmosPagedFlux queryStoredProcedures( public CosmosPagedFlux queryStoredProcedures( SqlQuerySpec querySpec, CosmosQueryRequestOptions options) { - return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { - setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); - return database.getDocClientWrapper() - .queryStoredProcedures(container.getLink(), querySpec, options) - .map(response -> BridgeInternal.createFeedResponse( - ModelBridgeInternal.getCosmosStoredProcedurePropertiesFromV2Results(response.getResults()), - response.getResponseHeaders())); - }); + return queryStoredProceduresInternal(querySpec, options); } /** @@ -172,7 +170,6 @@ public CosmosAsyncStoredProcedure getStoredProcedure(String id) { return new CosmosAsyncStoredProcedure(id, this.container); } - /* UDF Operations */ /** @@ -191,10 +188,7 @@ public Mono createUserDefinedFunction( UserDefinedFunction udf = new UserDefinedFunction(); udf.setId(properties.getId()); udf.setBody(properties.getBody()); - - return database.getDocClientWrapper() - .createUserDefinedFunction(container.getLink(), udf, null) - .map(response -> ModelBridgeInternal.createCosmosUserDefinedFunctionResponse(response)).single(); + return withContext(context -> createUserDefinedFunctionInternal(udf, context)); } /** @@ -226,12 +220,17 @@ public CosmosPagedFlux readAllUserDefinedFu */ CosmosPagedFlux readAllUserDefinedFunctions(CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "readAllUserDefinedFunctions." + this.container.getId(); + pagedFluxOptions.setTracerInformation(this.container.getDatabase().getClient().getTracerProvider(), + spanName, + this.container.getDatabase().getClient().getServiceEndpoint(), + this.container.getDatabase().getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return database.getDocClientWrapper() - .readUserDefinedFunctions(container.getLink(), options) - .map(response -> BridgeInternal.createFeedResponse( - ModelBridgeInternal.getCosmosUserDefinedFunctionPropertiesFromV2Results(response.getResults()), - response.getResponseHeaders())); + .readUserDefinedFunctions(container.getLink(), options) + .map(response -> BridgeInternal.createFeedResponse( + ModelBridgeInternal.getCosmosUserDefinedFunctionPropertiesFromV2Results(response.getResults()), + response.getResponseHeaders())); }); } @@ -272,14 +271,7 @@ public CosmosPagedFlux queryUserDefinedFunc public CosmosPagedFlux queryUserDefinedFunctions( SqlQuerySpec querySpec, CosmosQueryRequestOptions options) { - return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { - setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); - return database.getDocClientWrapper() - .queryUserDefinedFunctions(container.getLink(), querySpec, options) - .map(response -> BridgeInternal.createFeedResponse( - ModelBridgeInternal.getCosmosUserDefinedFunctionPropertiesFromV2Results(response.getResults()), - response.getResponseHeaders())); - }); + return queryUserDefinedFunctionsInternal(querySpec, options); } /** @@ -305,12 +297,7 @@ public CosmosAsyncUserDefinedFunction getUserDefinedFunction(String id) { * @return an {@link Mono} containing the single resource response with the created trigger or an error. */ public Mono createTrigger(CosmosTriggerProperties properties) { - Trigger trigger = new Trigger(ModelBridgeInternal.toJsonFromJsonSerializable(ModelBridgeInternal.getResource(properties))); - - return database.getDocClientWrapper() - .createTrigger(container.getLink(), trigger, null) - .map(response -> ModelBridgeInternal.createCosmosTriggerResponse(response)) - .single(); + return withContext(context -> createTriggerInternal(properties, context)); } /** @@ -344,6 +331,11 @@ public CosmosPagedFlux readAllTriggers() { */ CosmosPagedFlux readAllTriggers(CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "readAllTriggers." + this.container.getId(); + pagedFluxOptions.setTracerInformation(this.container.getDatabase().getClient().getTracerProvider(), + spanName, + this.container.getDatabase().getClient().getServiceEndpoint(), + this.container.getDatabase().getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return database.getDocClientWrapper() .readTriggers(container.getLink(), options) @@ -366,7 +358,7 @@ CosmosPagedFlux readAllTriggers(CosmosQueryRequestOptio * error. */ public CosmosPagedFlux queryTriggers(String query, CosmosQueryRequestOptions options) { - return queryTriggers(new SqlQuerySpec(query), options); + return queryTriggersInternal(false, new SqlQuerySpec(query), options); } /** @@ -384,14 +376,7 @@ public CosmosPagedFlux queryTriggers(String query, Cosm public CosmosPagedFlux queryTriggers( SqlQuerySpec querySpec, CosmosQueryRequestOptions options) { - return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { - setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); - return database.getDocClientWrapper() - .queryTriggers(container.getLink(), querySpec, options) - .map(response -> BridgeInternal.createFeedResponse( - ModelBridgeInternal.getCosmosTriggerPropertiesFromV2Results(response.getResults()), - response.getResponseHeaders())); - }); + return queryTriggersInternal(true, querySpec, options); } /** @@ -404,4 +389,120 @@ public CosmosAsyncTrigger getTrigger(String id) { return new CosmosAsyncTrigger(id, this.container); } + private CosmosPagedFlux queryStoredProceduresInternal( + SqlQuerySpec querySpec, + CosmosQueryRequestOptions options) { + return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "queryStoredProcedures." + this.container.getId(); + pagedFluxOptions.setTracerInformation(this.container.getDatabase().getClient().getTracerProvider(), + spanName, + this.container.getDatabase().getClient().getServiceEndpoint(), + this.container.getDatabase().getId()); + setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); + return database.getDocClientWrapper() + .queryStoredProcedures(container.getLink(), querySpec, options) + .map(response -> BridgeInternal.createFeedResponse( + ModelBridgeInternal.getCosmosStoredProcedurePropertiesFromV2Results(response.getResults()), + response.getResponseHeaders())); + }); + } + + private CosmosPagedFlux queryUserDefinedFunctionsInternal( + SqlQuerySpec querySpec, + CosmosQueryRequestOptions options) { + return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "queryUserDefinedFunctions." + this.container.getId(); + pagedFluxOptions.setTracerInformation(this.container.getDatabase().getClient().getTracerProvider(), + spanName, + this.container.getDatabase().getClient().getServiceEndpoint(), + this.container.getDatabase().getId()); + setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); + return database.getDocClientWrapper() + .queryUserDefinedFunctions(container.getLink(), querySpec, options) + .map(response -> BridgeInternal.createFeedResponse( + ModelBridgeInternal.getCosmosUserDefinedFunctionPropertiesFromV2Results(response.getResults()), + response.getResponseHeaders())); + }); + } + + private CosmosPagedFlux queryTriggersInternal( + boolean isParameterised, + SqlQuerySpec querySpec, + CosmosQueryRequestOptions options) { + return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName; + if (isParameterised) { + spanName = "queryTriggers." + this.container.getId() + "." + querySpec.getQueryText(); + } else { + spanName = "queryTriggers." + this.container.getId(); + } + + pagedFluxOptions.setTracerInformation(this.container.getDatabase().getClient().getTracerProvider(), + spanName, + this.container.getDatabase().getClient().getServiceEndpoint(), + this.container.getDatabase().getId()); + setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); + return database.getDocClientWrapper() + .queryTriggers(container.getLink(), querySpec, options) + .map(response -> BridgeInternal.createFeedResponse( + ModelBridgeInternal.getCosmosTriggerPropertiesFromV2Results(response.getResults()), + response.getResponseHeaders())); + }); + } + + private Mono createStoredProcedureInternal(StoredProcedure sProc, + CosmosStoredProcedureRequestOptions options, + Context context) { + String spanName = "createStoredProcedure." + container.getId(); + Mono responseMono = createStoredProcedureInternal(sProc, options); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono createStoredProcedureInternal(StoredProcedure sProc, + CosmosStoredProcedureRequestOptions options) { + return database.getDocClientWrapper() + .createStoredProcedure(container.getLink(), sProc, ModelBridgeInternal.toRequestOptions(options)).map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)) + .single(); + } + + private Mono createUserDefinedFunctionInternal( + UserDefinedFunction udf, + Context context) { + String spanName = "createUserDefinedFunction." + container.getId(); + Mono responseMono = createUserDefinedFunctionInternal(udf); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono createUserDefinedFunctionInternal( + UserDefinedFunction udf) { + return database.getDocClientWrapper() + .createUserDefinedFunction(container.getLink(), udf, null).map(response -> ModelBridgeInternal.createCosmosUserDefinedFunctionResponse(response)).single(); + } + + private Mono createTriggerInternal(CosmosTriggerProperties properties, Context context) { + String spanName = "createTrigger." + container.getId(); + Mono responseMono = createTriggerInternal(properties); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono createTriggerInternal(CosmosTriggerProperties properties) { + Trigger trigger = new Trigger(ModelBridgeInternal.toJsonFromJsonSerializable(ModelBridgeInternal.getResource(properties))); + return database.getDocClientWrapper() + .createTrigger(container.getLink(), trigger, null) + .map(response -> ModelBridgeInternal.createCosmosTriggerResponse(response)) + .single(); + } + } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncStoredProcedure.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncStoredProcedure.java index 8e0a51768818..788aa4befa9e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncStoredProcedure.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncStoredProcedure.java @@ -2,6 +2,7 @@ // Licensed under the MIT License. package com.azure.cosmos; +import com.azure.core.util.Context; import com.azure.cosmos.implementation.Paths; import com.azure.cosmos.implementation.StoredProcedure; import com.azure.cosmos.models.CosmosStoredProcedureResponse; @@ -10,6 +11,7 @@ import com.azure.cosmos.models.ModelBridgeInternal; import reactor.core.publisher.Mono; +import static com.azure.core.util.FluxUtil.withContext; import java.util.List; /** @@ -72,12 +74,7 @@ public Mono read() { * @return an {@link Mono} containing the single resource response with the read stored procedure or an error. */ public Mono read(CosmosStoredProcedureRequestOptions options) { - if (options == null) { - options = new CosmosStoredProcedureRequestOptions(); - } - return cosmosContainer.getDatabase().getDocClientWrapper().readStoredProcedure(getLink(), - ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)).single(); + return withContext(context -> readInternal(options, context)); } /** @@ -106,14 +103,7 @@ public Mono delete() { * @return an {@link Mono} containing the single resource response for the deleted stored procedure or an error. */ public Mono delete(CosmosStoredProcedureRequestOptions options) { - if (options == null) { - options = new CosmosStoredProcedureRequestOptions(); - } - return cosmosContainer.getDatabase() - .getDocClientWrapper() - .deleteStoredProcedure(getLink(), ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)) - .single(); + return withContext(context -> deleteInternal(options, context)); } /** @@ -129,15 +119,8 @@ public Mono delete(CosmosStoredProcedureRequestOp * @return an {@link Mono} containing the single resource response with the stored procedure response or an error. */ public Mono execute(List procedureParams, - CosmosStoredProcedureRequestOptions options) { - if (options == null) { - options = new CosmosStoredProcedureRequestOptions(); - } - return cosmosContainer.getDatabase() - .getDocClientWrapper() - .executeStoredProcedure(getLink(), ModelBridgeInternal.toRequestOptions(options), procedureParams) - .map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)) - .single(); + CosmosStoredProcedureRequestOptions options) { + return withContext(context -> executeInternal(procedureParams, options, context)); } /** @@ -168,17 +151,9 @@ public Mono replace(CosmosStoredProcedureProperti * @return an {@link Mono} containing the single resource response with the replaced stored procedure or an error. */ public Mono replace(CosmosStoredProcedureProperties storedProcedureProperties, - CosmosStoredProcedureRequestOptions options) { - if (options == null) { - options = new CosmosStoredProcedureRequestOptions(); - } - return cosmosContainer.getDatabase() - .getDocClientWrapper() - .replaceStoredProcedure(new StoredProcedure(ModelBridgeInternal.toJsonFromJsonSerializable( - ModelBridgeInternal.getResource(storedProcedureProperties))), - ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)) - .single(); + CosmosStoredProcedureRequestOptions options) { + return withContext(context -> replaceInternal(storedProcedureProperties, options, + context)); } String getURIPathSegment() { @@ -198,4 +173,74 @@ String getLink() { builder.append(getId()); return builder.toString(); } + + private Mono readInternal(CosmosStoredProcedureRequestOptions options, + Context context) { + if (options == null) { + options = new CosmosStoredProcedureRequestOptions(); + } + + String spanName = "readStoredProcedure." + cosmosContainer.getId(); + Mono responseMono = cosmosContainer.getDatabase().getDocClientWrapper().readStoredProcedure(getLink(), + ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)).single(); + return this.cosmosContainer.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + cosmosContainer.getDatabase().getId(), + cosmosContainer.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono deleteInternal(CosmosStoredProcedureRequestOptions options, + Context context) { + if (options == null) { + options = new CosmosStoredProcedureRequestOptions(); + } + + String spanName = "deleteStoredProcedure." + cosmosContainer.getId(); + Mono responseMono = cosmosContainer.getDatabase() + .getDocClientWrapper() + .deleteStoredProcedure(getLink(), ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)) + .single(); + return this.cosmosContainer.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + cosmosContainer.getDatabase().getId(), + cosmosContainer.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono executeInternal(List procedureParams, + CosmosStoredProcedureRequestOptions options, + Context context) { + if (options == null) { + options = new CosmosStoredProcedureRequestOptions(); + } + + String spanName = "executeStoredProcedure." + cosmosContainer.getId(); + Mono responseMono = cosmosContainer.getDatabase() + .getDocClientWrapper() + .executeStoredProcedure(getLink(), ModelBridgeInternal.toRequestOptions(options), procedureParams) + .map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)) + .single(); + return this.cosmosContainer.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, spanName, cosmosContainer.getDatabase().getId(), cosmosContainer.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono replaceInternal(CosmosStoredProcedureProperties storedProcedureSettings, + CosmosStoredProcedureRequestOptions options, + Context context) { + if (options == null) { + options = new CosmosStoredProcedureRequestOptions(); + } + + String spanName = "replaceStoredProcedure." + cosmosContainer.getId(); + Mono responseMono = cosmosContainer.getDatabase() + .getDocClientWrapper() + .replaceStoredProcedure(new StoredProcedure(ModelBridgeInternal.toJsonFromJsonSerializable( + ModelBridgeInternal.getResource(storedProcedureSettings))), + ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)) + .single(); + return this.cosmosContainer.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, spanName, cosmosContainer.getDatabase().getId(), cosmosContainer.getDatabase().getClient().getServiceEndpoint()); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncTrigger.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncTrigger.java index ae06eb230079..cba93102874d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncTrigger.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncTrigger.java @@ -2,6 +2,7 @@ // Licensed under the MIT License. package com.azure.cosmos; +import com.azure.core.util.Context; import com.azure.cosmos.implementation.Paths; import com.azure.cosmos.implementation.Trigger; import com.azure.cosmos.models.CosmosTriggerResponse; @@ -9,6 +10,8 @@ import com.azure.cosmos.models.ModelBridgeInternal; import reactor.core.publisher.Mono; +import static com.azure.core.util.FluxUtil.withContext; + /** * The type Cosmos async trigger. This contains methods to operate on a cosmos trigger asynchronously */ @@ -52,14 +55,9 @@ CosmosAsyncTrigger setId(String id) { * @return an {@link Mono} containing the single resource response for the read cosmos trigger or an error. */ public Mono read() { - return container.getDatabase() - .getDocClientWrapper() - .readTrigger(getLink(), null) - .map(response -> ModelBridgeInternal.createCosmosTriggerResponse(response)) - .single(); + return withContext(context -> readInternal(context)); } - /** * Replaces a cosmos trigger. *

@@ -71,12 +69,7 @@ public Mono read() { * @return an {@link Mono} containing the single resource response with the replaced cosmos trigger or an error. */ public Mono replace(CosmosTriggerProperties triggerProperties) { - return container.getDatabase() - .getDocClientWrapper() - .replaceTrigger(new Trigger(ModelBridgeInternal.toJsonFromJsonSerializable( - ModelBridgeInternal.getResource(triggerProperties))), null) - .map(response -> ModelBridgeInternal.createCosmosTriggerResponse(response)) - .single(); + return withContext(context -> replaceInternal(triggerProperties, context)); } /** @@ -89,11 +82,7 @@ public Mono replace(CosmosTriggerProperties triggerProper * @return an {@link Mono} containing the single resource response for the deleted cosmos trigger or an error. */ public Mono delete() { - return container.getDatabase() - .getDocClientWrapper() - .deleteTrigger(getLink(), null) - .map(response -> ModelBridgeInternal.createCosmosTriggerResponse(response)) - .single(); + return withContext(context -> deleteInternal(context)); } String getURIPathSegment() { @@ -114,4 +103,46 @@ String getLink() { return builder.toString(); } + private Mono readInternal(Context context) { + String spanName = "readTrigger." + container.getId(); + Mono responseMono = container.getDatabase() + .getDocClientWrapper() + .readTrigger(getLink(), null) + .map(response -> ModelBridgeInternal.createCosmosTriggerResponse(response)) + .single(); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + container.getDatabase().getId(), + container.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono replaceInternal(CosmosTriggerProperties triggerSettings, Context context) { + String spanName = "replaceTrigger." + container.getId(); + Mono responseMono = container.getDatabase() + .getDocClientWrapper() + .replaceTrigger(new Trigger(ModelBridgeInternal.toJsonFromJsonSerializable( + ModelBridgeInternal.getResource(triggerSettings))), null) + .map(response -> ModelBridgeInternal.createCosmosTriggerResponse(response)) + .single(); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + container.getDatabase().getId(), + container.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono deleteInternal(Context context) { + String spanName = "deleteTrigger." + container.getId(); + Mono responseMono = container.getDatabase() + .getDocClientWrapper() + .deleteTrigger(getLink(), null) + .map(response -> ModelBridgeInternal.createCosmosTriggerResponse(response)) + .single(); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + container.getDatabase().getId(), + container.getDatabase().getClient().getServiceEndpoint()); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncUser.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncUser.java index d23e239e2969..ea879a5ff899 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncUser.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncUser.java @@ -3,7 +3,9 @@ package com.azure.cosmos; +import com.azure.core.util.Context; import com.azure.cosmos.implementation.Paths; +import com.azure.cosmos.implementation.Permission; import com.azure.cosmos.models.CosmosPermissionResponse; import com.azure.cosmos.models.CosmosUserResponse; import com.azure.cosmos.models.CosmosPermissionProperties; @@ -11,11 +13,11 @@ import com.azure.cosmos.models.CosmosUserProperties; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.ModelBridgeInternal; -import com.azure.cosmos.implementation.Permission; import com.azure.cosmos.util.CosmosPagedFlux; import com.azure.cosmos.util.UtilBridgeInternal; import reactor.core.publisher.Mono; +import static com.azure.core.util.FluxUtil.withContext; import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount; /** @@ -56,9 +58,7 @@ CosmosAsyncUser setId(String id) { * @return a {@link Mono} containing the single resource response with the read user or an error. */ public Mono read() { - return this.database.getDocClientWrapper() - .readUser(getLink(), null) - .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + return withContext(context -> readInternal(context)); } /** @@ -68,9 +68,7 @@ public Mono read() { * @return a {@link Mono} containing the single resource response with the replaced user or an error. */ public Mono replace(CosmosUserProperties userProperties) { - return this.database.getDocClientWrapper() - .replaceUser(ModelBridgeInternal.getV2User(userProperties), null) - .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + return withContext(context -> replaceInternal(userProperties, context)); } /** @@ -79,9 +77,7 @@ public Mono replace(CosmosUserProperties userProperties) { * @return a {@link Mono} containing the single resource response with the deleted user or an error. */ public Mono delete() { - return this.database.getDocClientWrapper() - .deleteUser(getLink(), null) - .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + return withContext(context -> deleteInternal(context)); } /** @@ -98,14 +94,9 @@ public Mono delete() { public Mono createPermission( CosmosPermissionProperties permissionProperties, CosmosPermissionRequestOptions options) { - if (options == null) { - options = new CosmosPermissionRequestOptions(); - } + final CosmosPermissionRequestOptions requestOptions = options == null ? new CosmosPermissionRequestOptions() : options; Permission permission = ModelBridgeInternal.getPermission(permissionProperties, database.getId()); - return database.getDocClientWrapper() - .createPermission(getLink(), permission, ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) - .single(); + return withContext(context -> createPermissionInternal(permission, requestOptions, context)); } /** @@ -123,13 +114,8 @@ public Mono upsertPermission( CosmosPermissionProperties permissionProperties, CosmosPermissionRequestOptions options) { Permission permission = ModelBridgeInternal.getPermission(permissionProperties, database.getId()); - if (options == null) { - options = new CosmosPermissionRequestOptions(); - } - return database.getDocClientWrapper() - .upsertPermission(getLink(), permission, ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) - .single(); + final CosmosPermissionRequestOptions requestOptions = options == null ? new CosmosPermissionRequestOptions() : options; + return withContext(context -> upsertPermissionInternal(permission, requestOptions, context)); } @@ -160,6 +146,11 @@ public CosmosPagedFlux readAllPermissions() { */ CosmosPagedFlux readAllPermissions(CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "readAllPermissions." + this.getId(); + pagedFluxOptions.setTracerInformation(this.getDatabase().getClient().getTracerProvider(), + spanName, + this.getDatabase().getClient().getServiceEndpoint(), + this.getDatabase().getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return getDatabase().getDocClientWrapper() .readPermissions(getLink(), options) @@ -198,6 +189,11 @@ public CosmosPagedFlux queryPermissions(String query */ public CosmosPagedFlux queryPermissions(String query, CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "queryPermissions." + this.getId(); + pagedFluxOptions.setTracerInformation(this.getDatabase().getClient().getTracerProvider(), + spanName, + this.getDatabase().getClient().getServiceEndpoint(), + this.getDatabase().getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return getDatabase().getDocClientWrapper() .queryPermissions(getLink(), query, options) @@ -243,4 +239,67 @@ String getLink() { CosmosAsyncDatabase getDatabase() { return database; } + + private Mono readInternal(Context context) { + String spanName = "readUser." + getId(); + Mono responseMono = this.database.getDocClientWrapper() + .readUser(getLink(), null) + .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + return database.getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono replaceInternal(CosmosUserProperties userSettings, Context context) { + String spanName = "replaceUser." + getId(); + Mono responseMono = this.database.getDocClientWrapper() + .replaceUser(ModelBridgeInternal.getV2User(userSettings), null) + .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + return database.getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono deleteInternal(Context context) { + String spanName = "deleteUser." + getId(); + Mono responseMono = this.database.getDocClientWrapper() + .deleteUser(getLink(), null) + .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + return database.getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono createPermissionInternal( + Permission permission, + CosmosPermissionRequestOptions options, + Context context) { + String spanName = "createPermission." + getId(); + Mono responseMono = database.getDocClientWrapper() + .createPermission(getLink(), permission, ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) + .single(); + return database.getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono upsertPermissionInternal( + Permission permission, + CosmosPermissionRequestOptions options, + Context context) { + String spanName = "upsertPermission." + getId(); + Mono responseMono = database.getDocClientWrapper() + .upsertPermission(getLink(), permission, ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) + .single(); + return database.getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncUserDefinedFunction.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncUserDefinedFunction.java index 7710818dd0fb..457f912bc497 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncUserDefinedFunction.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncUserDefinedFunction.java @@ -2,6 +2,7 @@ // Licensed under the MIT License. package com.azure.cosmos; +import com.azure.core.util.Context; import com.azure.cosmos.implementation.Paths; import com.azure.cosmos.implementation.UserDefinedFunction; import com.azure.cosmos.models.CosmosUserDefinedFunctionResponse; @@ -9,6 +10,8 @@ import com.azure.cosmos.models.ModelBridgeInternal; import reactor.core.publisher.Mono; +import static com.azure.core.util.FluxUtil.withContext; + /** * The type Cosmos async user defined function. */ @@ -54,8 +57,7 @@ CosmosAsyncUserDefinedFunction setId(String id) { * @return an {@link Mono} containing the single resource response for the read user defined function or an error. */ public Mono read() { - return container.getDatabase().getDocClientWrapper().readUserDefinedFunction(getLink(), null) - .map(response -> ModelBridgeInternal.createCosmosUserDefinedFunctionResponse(response)).single(); + return withContext(context -> readInternal(context)); } /** @@ -71,12 +73,7 @@ public Mono read() { * or an error. */ public Mono replace(CosmosUserDefinedFunctionProperties udfSettings) { - return container.getDatabase() - .getDocClientWrapper() - .replaceUserDefinedFunction(new UserDefinedFunction(ModelBridgeInternal.toJsonFromJsonSerializable( - ModelBridgeInternal.getResource(udfSettings))), null) - .map(response -> ModelBridgeInternal.createCosmosUserDefinedFunctionResponse(response)) - .single(); + return withContext(context -> replaceInternal(udfSettings, context)); } /** @@ -91,11 +88,7 @@ public Mono replace(CosmosUserDefinedFunction * an error. */ public Mono delete() { - return container.getDatabase() - .getDocClientWrapper() - .deleteUserDefinedFunction(this.getLink(), null) - .map(response -> ModelBridgeInternal.createCosmosUserDefinedFunctionResponse(response)) - .single(); + return withContext(context -> deleteInternal(context)); } String getURIPathSegment() { @@ -115,4 +108,45 @@ String getLink() { builder.append(getId()); return builder.toString(); } + + private Mono readInternal(Context context) { + String spanName = "readUserDefinedFunction." + container.getId(); + Mono responseMono = container.getDatabase().getDocClientWrapper().readUserDefinedFunction(getLink(), null) + .map(response -> ModelBridgeInternal.createCosmosUserDefinedFunctionResponse(response)).single(); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + container.getDatabase().getId(), + container.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono replaceInternal(CosmosUserDefinedFunctionProperties udfSettings, + Context context) { + String spanName = "replaceUserDefinedFunction." + container.getId(); + Mono responseMono = container.getDatabase() + .getDocClientWrapper() + .replaceUserDefinedFunction(new UserDefinedFunction(ModelBridgeInternal.toJsonFromJsonSerializable( + ModelBridgeInternal.getResource(udfSettings))), null) + .map(response -> ModelBridgeInternal.createCosmosUserDefinedFunctionResponse(response)) + .single(); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + container.getDatabase().getId(), + container.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono deleteInternal(Context context) { + String spanName = "deleteUserDefinedFunction." + container.getId(); + Mono responseMono = container.getDatabase() + .getDocClientWrapper() + .deleteUserDefinedFunction(this.getLink(), null) + .map(response -> ModelBridgeInternal.createCosmosUserDefinedFunctionResponse(response)) + .single(); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + container.getDatabase().getId(), + container.getDatabase().getClient().getServiceEndpoint()); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosPagedFluxOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosPagedFluxOptions.java index c4407b68bfee..28b4054180dc 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosPagedFluxOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosPagedFluxOptions.java @@ -5,6 +5,8 @@ import com.azure.cosmos.util.CosmosPagedFlux; +import java.util.Map; + /** * Specifies paging options for Cosmos Paged Flux implementation. * @see CosmosPagedFlux @@ -13,6 +15,11 @@ public class CosmosPagedFluxOptions { private String requestContinuation; private Integer maxItemCount; + private TracerProvider tracerProvider; + private String tracerSpanName; + private String databaseId; + private String serviceEndpoint; + public CosmosPagedFluxOptions() {} @@ -57,4 +64,43 @@ public CosmosPagedFluxOptions setMaxItemCount(Integer maxItemCount) { this.maxItemCount = maxItemCount; return this; } + + /** + * Gets the tracer provider + * @return tracerProvider + */ + public TracerProvider getTracerProvider() { + return this.tracerProvider; + } + + /** + * Gets the tracer span name + * @return tracerSpanName + */ + public String getTracerSpanName() { + return tracerSpanName; + } + + /** + * Gets the databaseId + * @return databaseId + */ + public String getDatabaseId() { + return databaseId; + } + + /** + * Gets the service end point + * @return serviceEndpoint + */ + public String getServiceEndpoint() { + return serviceEndpoint; + } + + public void setTracerInformation(TracerProvider tracerProvider, String tracerSpanName, String serviceEndpoint, String databaseId) { + this.databaseId = databaseId; + this.serviceEndpoint = serviceEndpoint; + this.tracerSpanName = tracerSpanName; + this.tracerProvider = tracerProvider; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java index b14fce7394f8..70df4dfc7a0f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java @@ -265,6 +265,7 @@ public static class Versions { } public static class StatusCodes { + public static final int OK = 200; public static final int NOT_MODIFIED = 304; // Client error public static final int MINIMUM_STATUSCODE_AS_ERROR_GATEWAY = 400; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/TracerProvider.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/TracerProvider.java new file mode 100644 index 000000000000..87c51d14412a --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/TracerProvider.java @@ -0,0 +1,153 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation; + +import com.azure.core.util.Context; +import com.azure.core.util.tracing.Tracer; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosResponse; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Signal; + +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY; + +public class TracerProvider { + private Tracer tracer; + public final static String DB_TYPE_VALUE = "Cosmos"; + public final static String DB_TYPE = "db.type"; + public final static String DB_INSTANCE = "db.instance"; + public final static String DB_URL = "db.url"; + public static final String DB_STATEMENT = "db.statement"; + public static final String ERROR_MSG = "error.msg"; + public static final String ERROR_TYPE = "error.type"; + public static final String COSMOS_CALL_DEPTH = "cosmosCallDepth"; + public static final String COSMOS_CALL_DEPTH_VAL = "nested"; + public static final int ERROR_CODE = 0; + public static final String RESOURCE_PROVIDER_NAME = "Microsoft.DocumentDB"; + + public TracerProvider(Tracer tracer) { + this.tracer = tracer; + } + + public boolean isEnabled() { + return tracer != null; + } + + /** + * For each tracer plugged into the SDK a new tracing span is created. + *

+ * The {@code context} will be checked for containing information about a parent span. If a parent span is found the + * new span will be added as a child, otherwise the span will be created and added to the context and any downstream + * start calls will use the created span as the parent. + * + * @param context Additional metadata that is passed through the call stack. + * @return An updated context object. + */ + public Context startSpan(String methodName, String databaseId, String endpoint, Context context) { + Context local = Objects.requireNonNull(context, "'context' cannot be null."); + local = local.addData(AZ_TRACING_NAMESPACE_KEY, RESOURCE_PROVIDER_NAME); + local = tracer.start(methodName, local); // start the span and return the started span + if (databaseId != null) { + tracer.setAttribute(TracerProvider.DB_INSTANCE, databaseId, local); + } + + tracer.setAttribute(TracerProvider.DB_TYPE, DB_TYPE_VALUE, local); + tracer.setAttribute(TracerProvider.DB_URL, endpoint, local); + tracer.setAttribute(TracerProvider.DB_STATEMENT, methodName, local); + return local; + } + + /** + * Given a context containing the current tracing span the span is marked completed with status info from + * {@link Signal}. For each tracer plugged into the SDK the current tracing span is marked as completed. + * + * @param context Additional metadata that is passed through the call stack. + * @param signal The signal indicates the status and contains the metadata we need to end the tracing span. + */ + public > void endSpan(Context context, + Signal signal, + int statusCode) { + Objects.requireNonNull(context, "'context' cannot be null."); + Objects.requireNonNull(signal, "'signal' cannot be null."); + + switch (signal.getType()) { + case ON_COMPLETE: + end(statusCode, null, context); + break; + case ON_ERROR: + Throwable throwable = null; + if (signal.hasError()) { + // The last status available is on error, this contains the thrown error. + throwable = signal.getThrowable(); + + if (throwable instanceof CosmosException) { + CosmosException exception = (CosmosException) throwable; + statusCode = exception.getStatusCode(); + } + } + end(statusCode, throwable, context); + break; + default: + // ON_SUBSCRIBE and ON_NEXT don't have the information to end the span so just return. + break; + } + } + + public > Mono traceEnabledCosmosResponsePublisher(Mono resultPublisher, + Context context, + String spanName, + String databaseId, + String endpoint) { + return traceEnabledPublisher(resultPublisher, context, spanName, databaseId, endpoint, + (T response) -> response.getStatusCode()); + } + + public Mono> traceEnabledCosmosItemResponsePublisher(Mono> resultPublisher, + Context context, + String spanName, + String databaseId, + String endpoint) { + return traceEnabledPublisher(resultPublisher, context, spanName, databaseId, endpoint, + CosmosItemResponse::getStatusCode); + } + + public Mono traceEnabledPublisher(Mono resultPublisher, + Context context, + String spanName, + String databaseId, + String endpoint, + Function statusCodeFunc) { + final AtomicReference parentContext = new AtomicReference<>(Context.NONE); + Optional callDepth = context.getData(COSMOS_CALL_DEPTH); + final boolean isNestedCall = callDepth.isPresent(); + return resultPublisher + .doOnSubscribe(ignoredValue -> { + if (isEnabled() && !isNestedCall) { + parentContext.set(this.startSpan(spanName, databaseId, endpoint, + context)); + } + }).doOnSuccess(response -> { + if (isEnabled() && !isNestedCall) { + this.endSpan(parentContext.get(), Signal.complete(), statusCodeFunc.apply(response)); + } + }).doOnError(throwable -> { + if (isEnabled() && !isNestedCall) { + this.endSpan(parentContext.get(), Signal.error(throwable), ERROR_CODE); + } + }); + } + + private void end(int statusCode, Throwable throwable, Context context) { + if (throwable != null) { + tracer.setAttribute(TracerProvider.ERROR_MSG, throwable.getMessage(), context); + tracer.setAttribute(TracerProvider.ERROR_TYPE, throwable.getClass().getName(), context); + } + tracer.end(statusCode, throwable, context); + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedFlux.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedFlux.java index ca9f20352f3a..91b9c4af81d2 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedFlux.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedFlux.java @@ -3,13 +3,19 @@ package com.azure.cosmos.util; +import com.azure.core.util.Context; +import com.azure.core.util.FluxUtil; import com.azure.core.util.IterableStream; import com.azure.core.util.paging.ContinuablePagedFlux; import com.azure.cosmos.implementation.CosmosPagedFluxOptions; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.TracerProvider; import com.azure.cosmos.models.FeedResponse; import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; +import reactor.core.publisher.Signal; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; /** @@ -37,24 +43,21 @@ public final class CosmosPagedFlux extends ContinuablePagedFlux> byPage() { CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions(); - - return this.optionsFluxFunction.apply(cosmosPagedFluxOptions); + return FluxUtil.fluxContext(context -> byPage(cosmosPagedFluxOptions, context)); } @Override public Flux> byPage(String continuationToken) { CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions(); cosmosPagedFluxOptions.setRequestContinuation(continuationToken); - - return this.optionsFluxFunction.apply(cosmosPagedFluxOptions); + return FluxUtil.fluxContext(context -> byPage(cosmosPagedFluxOptions, context)); } @Override public Flux> byPage(int preferredPageSize) { CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions(); cosmosPagedFluxOptions.setMaxItemCount(preferredPageSize); - - return this.optionsFluxFunction.apply(cosmosPagedFluxOptions); + return FluxUtil.fluxContext(context -> byPage(cosmosPagedFluxOptions, context)); } @Override @@ -62,8 +65,7 @@ public Flux> byPage(String continuationToken, int preferredPageS CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions(); cosmosPagedFluxOptions.setRequestContinuation(continuationToken); cosmosPagedFluxOptions.setMaxItemCount(preferredPageSize); - - return this.optionsFluxFunction.apply(cosmosPagedFluxOptions); + return FluxUtil.fluxContext(context -> byPage(cosmosPagedFluxOptions, context)); } /** @@ -84,4 +86,25 @@ public void subscribe(CoreSubscriber coreSubscriber) { return Flux.fromIterable(elements); }).subscribe(coreSubscriber); } + + private Flux> byPage(CosmosPagedFluxOptions pagedFluxOptions, Context context) { + final AtomicReference parentContext = new AtomicReference<>(Context.NONE); + return this.optionsFluxFunction.apply(pagedFluxOptions).doOnSubscribe(ignoredValue -> { + if (pagedFluxOptions.getTracerProvider().isEnabled()) { + parentContext.set(pagedFluxOptions.getTracerProvider().startSpan(pagedFluxOptions.getTracerSpanName(), + pagedFluxOptions.getDatabaseId(), pagedFluxOptions.getServiceEndpoint(), + context)); + } + }).doOnComplete(() -> { + if (pagedFluxOptions.getTracerProvider().isEnabled()) { + pagedFluxOptions.getTracerProvider().endSpan(parentContext.get(), Signal.complete(), + HttpConstants.StatusCodes.OK); + } + }).doOnError(throwable -> { + if (pagedFluxOptions.getTracerProvider().isEnabled()) { + pagedFluxOptions.getTracerProvider().endSpan(parentContext.get(), Signal.error(throwable), + TracerProvider.ERROR_CODE); + } + }); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/module-info.java b/sdk/cosmos/azure-cosmos/src/main/java/module-info.java index 7b2c9380962a..58e2692dd923 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/module-info.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/module-info.java @@ -51,4 +51,5 @@ opens com.azure.cosmos.util to com.fasterxml.jackson.databind; uses com.azure.cosmos.implementation.guava25.base.PatternCompiler; + uses com.azure.core.util.tracing.Tracer; } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosTracerTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosTracerTest.java new file mode 100644 index 000000000000..0ca321b280d6 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosTracerTest.java @@ -0,0 +1,493 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos; + +import com.azure.core.util.Context; +import com.azure.core.util.tracing.Tracer; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.InternalObjectNode; +import com.azure.cosmos.implementation.LifeCycleUtils; +import com.azure.cosmos.implementation.TestConfigurations; +import com.azure.cosmos.implementation.TracerProvider; +import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosStoredProcedureProperties; +import com.azure.cosmos.models.CosmosTriggerProperties; +import com.azure.cosmos.models.CosmosUserDefinedFunctionProperties; +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.models.ThroughputProperties; +import com.azure.cosmos.models.TriggerOperation; +import com.azure.cosmos.models.TriggerType; +import com.azure.cosmos.rx.TestSuiteBase; +import org.mockito.Matchers; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +public class CosmosTracerTest extends TestSuiteBase { + private static final String ITEM_ID = "tracerDoc"; + CosmosAsyncClient client; + CosmosAsyncDatabase cosmosAsyncDatabase; + CosmosAsyncContainer cosmosAsyncContainer; + + @BeforeClass(groups = {"emulator"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + client = new CosmosClientBuilder() + .endpoint(TestConfigurations.HOST) + .key(TestConfigurations.MASTER_KEY) + .directMode(DirectConnectionConfig.getDefaultConfig()) + .buildAsyncClient(); + cosmosAsyncDatabase = getSharedCosmosDatabase(client); + cosmosAsyncContainer = getSharedMultiPartitionCosmosContainer(client); + + } + + @Test(groups = {"emulator"}, timeOut = TIMEOUT) + public void cosmosAsyncClient() { + Tracer mockTracer = getMockTracer(); + TracerProvider tracerProvider = Mockito.spy(new TracerProvider(mockTracer)); + ReflectionUtils.setTracerProvider(client, tracerProvider); + int traceApiCounter = 1; + + TracerProviderCapture tracerProviderCapture = new TracerProviderCapture(); + Mockito.doAnswer(tracerProviderCapture).when(tracerProvider).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + client.createDatabaseIfNotExists(cosmosAsyncDatabase.getId(), ThroughputProperties.createManualThroughput(5000)).block(); + Context context = tracerProviderCapture.getResult(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "createDatabaseIfNotExists." + cosmosAsyncDatabase.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + client.readAllDatabases(new CosmosQueryRequestOptions()).byPage().single().block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "readAllDatabases", context, null, traceApiCounter, null); + traceApiCounter++; + + + String query = "select * from c where c.id = '" + cosmosAsyncDatabase.getId() + "'"; + client.queryDatabases(query, new CosmosQueryRequestOptions()).byPage().single().block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "queryDatabases", context, null, traceApiCounter, null); + } + + @Test(groups = {"emulator"}, timeOut = TIMEOUT) + public void cosmosAsyncDatabase() { + Tracer mockTracer = getMockTracer(); + TracerProvider tracerProvider = Mockito.spy(new TracerProvider(mockTracer)); + ReflectionUtils.setTracerProvider(client, tracerProvider); + TracerProviderCapture tracerProviderCapture = new TracerProviderCapture(); + Mockito.doAnswer(tracerProviderCapture).when(tracerProvider).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + int traceApiCounter = 1; + + cosmosAsyncDatabase.createContainerIfNotExists(cosmosAsyncContainer.getId(), + "/pk", 5000).block(); + Context context = tracerProviderCapture.getResult(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "createContainerIfNotExists." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + cosmosAsyncDatabase.readAllUsers().byPage().single().block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "readAllUsers." + cosmosAsyncDatabase.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + cosmosAsyncDatabase.readAllContainers().byPage().single().block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "readAllContainers." + cosmosAsyncDatabase.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + String errorType = null; + try { + cosmosAsyncDatabase.readThroughput().block(); + } catch (CosmosException ex) { + errorType = ex.getClass().getName(); + } + + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "readThroughput." + cosmosAsyncDatabase.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, errorType); + } + + @Test(groups = {"emulator"}, timeOut = TIMEOUT) + public void cosmosAsyncContainer() { + Tracer mockTracer = getMockTracer(); + TracerProvider tracerProvider = Mockito.spy(new TracerProvider(mockTracer)); + ReflectionUtils.setTracerProvider(client, tracerProvider); + TracerProviderCapture tracerProviderCapture = new TracerProviderCapture(); + Mockito.doAnswer(tracerProviderCapture).when(tracerProvider).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + int traceApiCounter = 1; + + cosmosAsyncContainer.read().block(); + Context context = tracerProviderCapture.getResult(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "readContainer." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + try { + cosmosAsyncContainer.readThroughput().block(); + } catch (CosmosException ex) { + //do nothing + } + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "readThroughput." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + InternalObjectNode item = new InternalObjectNode(); + item.setId(ITEM_ID); + cosmosAsyncContainer.createItem(item).block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "createItem." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + cosmosAsyncContainer.upsertItem(item, + new CosmosItemRequestOptions()).block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "upsertItem." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + InternalObjectNode node = cosmosAsyncContainer.readItem(ITEM_ID, PartitionKey.NONE, + InternalObjectNode.class).block().getItem(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "readItem." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + cosmosAsyncContainer.deleteItem(ITEM_ID, PartitionKey.NONE).block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "deleteItem." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + cosmosAsyncContainer.readAllItems(new CosmosQueryRequestOptions(), CosmosItemRequestOptions.class).byPage().single().block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "readAllItems." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + String query = "select * from c where c.id = '" + ITEM_ID + "'"; + cosmosAsyncContainer.queryItems(query, new CosmosQueryRequestOptions(), CosmosItemRequestOptions.class).byPage().single().block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "queryItems." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + } + + @Test(groups = {"emulator"}, timeOut = TIMEOUT) + public void cosmosAsyncScripts() { + Tracer mockTracer = getMockTracer(); + TracerProvider tracerProvider = Mockito.spy(new TracerProvider(mockTracer)); + ReflectionUtils.setTracerProvider(client, tracerProvider); + TracerProviderCapture tracerProviderCapture = new TracerProviderCapture(); + Mockito.doAnswer(tracerProviderCapture).when(tracerProvider).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + int traceApiCounter = 1; + + cosmosAsyncContainer.getScripts().readAllStoredProcedures(new CosmosQueryRequestOptions()).byPage().single().block(); + Context context = tracerProviderCapture.getResult(); + + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "readAllStoredProcedures." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + cosmosAsyncContainer.getScripts().readAllTriggers(new CosmosQueryRequestOptions()).byPage().single().block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "readAllTriggers." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + cosmosAsyncContainer.getScripts().readAllUserDefinedFunctions(new CosmosQueryRequestOptions()).byPage().single().block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "readAllUserDefinedFunctions." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + CosmosUserDefinedFunctionProperties cosmosUserDefinedFunctionProperties = + getCosmosUserDefinedFunctionProperties(); + CosmosUserDefinedFunctionProperties resultUdf = + cosmosAsyncContainer.getScripts().createUserDefinedFunction(cosmosUserDefinedFunctionProperties).block().getProperties(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "createUserDefinedFunction." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + cosmosAsyncContainer.getScripts().getUserDefinedFunction(cosmosUserDefinedFunctionProperties.getId()).read().block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "readUserDefinedFunction." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + cosmosUserDefinedFunctionProperties.setBody("function() {var x = 15;}"); + cosmosAsyncContainer.getScripts().getUserDefinedFunction(resultUdf.getId()).replace(resultUdf).block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "replaceUserDefinedFunction." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + cosmosAsyncContainer.getScripts().readAllUserDefinedFunctions(new CosmosQueryRequestOptions()).byPage().single().block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + traceApiCounter++; + + cosmosAsyncContainer.getScripts().getUserDefinedFunction(cosmosUserDefinedFunctionProperties.getId()).delete().block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "deleteUserDefinedFunction." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + CosmosTriggerProperties cosmosTriggerProperties = getCosmosTriggerProperties(); + CosmosTriggerProperties resultTrigger = + cosmosAsyncContainer.getScripts().createTrigger(cosmosTriggerProperties).block().getProperties(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "createTrigger." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + cosmosAsyncContainer.getScripts().getTrigger(cosmosTriggerProperties.getId()).read().block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "readTrigger." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + cosmosAsyncContainer.getScripts().getTrigger(cosmosTriggerProperties.getId()).replace(resultTrigger).block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "replaceTrigger." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + cosmosAsyncContainer.getScripts().readAllTriggers(new CosmosQueryRequestOptions()).byPage().single().block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + traceApiCounter++; + + cosmosAsyncContainer.getScripts().getTrigger(cosmosTriggerProperties.getId()).delete().block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "deleteTrigger." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + CosmosStoredProcedureProperties procedureProperties = getCosmosStoredProcedureProperties(); + CosmosStoredProcedureProperties resultSproc = + cosmosAsyncContainer.getScripts().createStoredProcedure(procedureProperties).block().getProperties(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "createStoredProcedure." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + cosmosAsyncContainer.getScripts().getStoredProcedure(procedureProperties.getId()).read().block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "readStoredProcedure." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + cosmosAsyncContainer.getScripts().getStoredProcedure(procedureProperties.getId()).replace(resultSproc).block(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "replaceStoredProcedure." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + cosmosAsyncContainer.getScripts().readAllStoredProcedures(new CosmosQueryRequestOptions()).byPage().single().block(); + + cosmosAsyncContainer.getScripts().getStoredProcedure(procedureProperties.getId()).delete().block(); + traceApiCounter++; + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "deleteStoredProcedure." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + } + + @Test(groups = {"emulator"}, timeOut = TIMEOUT) + public void tracerExceptionSpan() { + Tracer mockTracer = getMockTracer(); + TracerProvider tracerProvider = Mockito.spy(new TracerProvider(mockTracer)); + ReflectionUtils.setTracerProvider(client, tracerProvider); + int traceApiCounter = 1; + + TracerProviderCapture tracerProviderCapture = new TracerProviderCapture(); + Mockito.doAnswer(tracerProviderCapture).when(tracerProvider).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + InternalObjectNode item = new InternalObjectNode(); + item.setId("testDoc"); + cosmosAsyncContainer.createItem(item).block(); + Context context = tracerProviderCapture.getResult(); + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "createItem." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter, null); + traceApiCounter++; + + String errorType = null; + try { + PartitionKey partitionKey = new PartitionKey("wrongPk"); + cosmosAsyncContainer.readItem("testDoc", partitionKey, null, InternalObjectNode.class).block(); + fail("readItem should fail due to wrong pk"); + } catch (CosmosException ex) { + assertThat(ex.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.NOTFOUND); + errorType = ex.getClass().getName(); + } + Mockito.verify(tracerProvider, Mockito.times(traceApiCounter)).startSpan(Matchers.anyString(), + Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + verifyTracerAttributes(mockTracer, "readItem." + cosmosAsyncContainer.getId(), context, + cosmosAsyncDatabase.getId(), traceApiCounter + , errorType); + } + + @AfterClass(groups = {"emulator"}, timeOut = SETUP_TIMEOUT) + public void afterClass() { + LifeCycleUtils.closeQuietly(client); + } + + private static CosmosUserDefinedFunctionProperties getCosmosUserDefinedFunctionProperties() { + CosmosUserDefinedFunctionProperties udf = + new CosmosUserDefinedFunctionProperties(UUID.randomUUID().toString(), "function() {var x = 10;}"); + return udf; + } + + private static CosmosTriggerProperties getCosmosTriggerProperties() { + CosmosTriggerProperties trigger = new CosmosTriggerProperties(UUID.randomUUID().toString(), "function() {var " + + "x = 10;}"); + trigger.setTriggerOperation(TriggerOperation.CREATE); + trigger.setTriggerType(TriggerType.PRE); + return trigger; + } + + private static CosmosStoredProcedureProperties getCosmosStoredProcedureProperties() { + CosmosStoredProcedureProperties storedProcedureDef = + new CosmosStoredProcedureProperties(UUID.randomUUID().toString(), "function() {var x = 10;}"); + return storedProcedureDef; + } + + private Tracer getMockTracer() { + Tracer mockTracer = Mockito.mock(Tracer.class); + Mockito.when(mockTracer.start(Matchers.anyString(), Matchers.any(Context.class))).thenReturn(Context.NONE); + return mockTracer; + } + + private void verifyTracerAttributes(Tracer mockTracer, String methodName, Context context, String databaseName, + int numberOfTimesCalledWithinTest, String errorType) { + if (databaseName != null) { + Mockito.verify(mockTracer, Mockito.times(numberOfTimesCalledWithinTest)).setAttribute(TracerProvider.DB_INSTANCE, + databaseName, context); + } + Mockito.verify(mockTracer, Mockito.times(numberOfTimesCalledWithinTest)).setAttribute(TracerProvider.DB_TYPE, + TracerProvider.DB_TYPE_VALUE, context); + Mockito.verify(mockTracer, Mockito.times(numberOfTimesCalledWithinTest)).setAttribute(TracerProvider.DB_URL, + TestConfigurations.HOST, + context); + Mockito.verify(mockTracer, Mockito.times(1)).setAttribute(TracerProvider.DB_STATEMENT, methodName, context); + if (errorType == null) { + Mockito.verify(mockTracer, Mockito.times(0)).setAttribute(Mockito.eq(TracerProvider.ERROR_MSG) + , Matchers.anyString(), Mockito.eq(context)); + Mockito.verify(mockTracer, Mockito.times(0)).setAttribute(Mockito.eq(TracerProvider.ERROR_TYPE) + , Matchers.anyString(), Mockito.eq(context)); + } else { + Mockito.verify(mockTracer, Mockito.times(1)).setAttribute(Mockito.eq(TracerProvider.ERROR_TYPE) + , Mockito.eq(errorType), Mockito.eq(context)); + Mockito.verify(mockTracer, Mockito.times(1)).setAttribute(Mockito.eq(TracerProvider.ERROR_MSG) + , Matchers.anyString(), Mockito.eq(context)); + } + } + + private class TracerProviderCapture implements Answer { + private Context result = null; + + public Context getResult() { + return result; + } + + @Override + public Context answer(InvocationOnMock invocationOnMock) throws Throwable { + result = (Context) invocationOnMock.callRealMethod(); + return result; + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java index 9dbdc312ffcd..5a9063584e37 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java @@ -9,6 +9,7 @@ import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.RxDocumentClientImpl; +import com.azure.cosmos.implementation.TracerProvider; import com.azure.cosmos.implementation.http.HttpClient; import org.apache.commons.lang3.reflect.FieldUtils; @@ -104,4 +105,8 @@ public static GatewayServiceConfigurationReader getServiceConfigurationReader(Rx public static void setBackgroundRefreshLocationTimeIntervalInMS(GlobalEndpointManager globalEndPointManager, int millSec){ set(globalEndPointManager, millSec, "backgroundRefreshLocationTimeIntervalInMS"); } + + public static void setTracerProvider(CosmosAsyncClient cosmosAsyncClient, TracerProvider tracerProvider){ + set(cosmosAsyncClient, tracerProvider, "tracerProvider"); + } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ReadFeedExceptionHandlingTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ReadFeedExceptionHandlingTest.java index b9424abd3d32..21d5a6680674 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ReadFeedExceptionHandlingTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ReadFeedExceptionHandlingTest.java @@ -2,12 +2,14 @@ // Licensed under the MIT License. package com.azure.cosmos.rx; +import com.azure.core.util.tracing.Tracer; import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.CosmosAsyncClient; import com.azure.cosmos.CosmosClientBuilder; -import com.azure.cosmos.util.CosmosPagedFlux; +import com.azure.cosmos.implementation.TracerProvider; import com.azure.cosmos.models.CosmosDatabaseProperties; import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.util.CosmosPagedFlux; import com.azure.cosmos.util.UtilBridgeInternal; import io.reactivex.subscribers.TestSubscriber; import org.mockito.Mockito; @@ -18,6 +20,7 @@ import reactor.core.publisher.Flux; import java.util.ArrayList; +import java.util.ServiceLoader; import static org.assertj.core.api.Assertions.assertThat; @@ -46,7 +49,10 @@ public void readFeedException() throws Exception { .mergeWith(Flux.fromIterable(frps)); final CosmosAsyncClientWrapper mockedClientWrapper = Mockito.spy(new CosmosAsyncClientWrapper(client)); - Mockito.when(mockedClientWrapper.readAllDatabases()).thenReturn(UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> response)); + Mockito.when(mockedClientWrapper.readAllDatabases()).thenReturn(UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + pagedFluxOptions.setTracerInformation(new TracerProvider(null), "testSpan", "testEndpoint,", "testDb"); + return response; + })); TestSubscriber> subscriber = new TestSubscriber<>(); mockedClientWrapper.readAllDatabases().byPage().subscribe(subscriber); assertThat(subscriber.valueCount()).isEqualTo(2);