Baseline to allow providing MeterRegistry for client metrics #30065

Merged 75 commits on Sep 26, 2022
b1ab028
Fixes a regression for distinct queries with PoJo serialization
FabianMeiswinkel Jul 19, 2022
0d3d8ac
Update CHANGELOG.md
FabianMeiswinkel Jul 19, 2022
7fd6eb6
Update CHANGELOG.md
FabianMeiswinkel Jul 19, 2022
36d4537
Update ParallelDocumentQueryExecutionContext.java
FabianMeiswinkel Jul 19, 2022
503a498
Baseline to allow providing MeterRegistry for client metrics
FabianMeiswinkel Jul 21, 2022
497ab65
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Jul 21, 2022
055f211
Update CosmosQueryRequestOptions.java
FabianMeiswinkel Jul 21, 2022
1091b79
Update CosmosQueryRequestOptions.java
FabianMeiswinkel Jul 22, 2022
9068fff
API clean-up
FabianMeiswinkel Jul 22, 2022
3e11083
Iterating on meters
FabianMeiswinkel Jul 22, 2022
38b31b8
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Jul 22, 2022
a49d74a
Update CosmosPagedFlux.java
FabianMeiswinkel Jul 23, 2022
5231920
Update CosmosPagedFlux.java
FabianMeiswinkel Jul 25, 2022
3aa183c
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Jul 25, 2022
0376ea0
Update BridgeInternal.java
FabianMeiswinkel Jul 25, 2022
51eef29
Update CosmosPagedFlux.java
FabianMeiswinkel Jul 25, 2022
d9fb129
Iterating on metrics changes
FabianMeiswinkel Jul 26, 2022
1cb6c72
Create ClientTelemetryConfig.java
FabianMeiswinkel Jul 26, 2022
2dc2d46
Iterating on metrics
FabianMeiswinkel Jul 26, 2022
29d70dd
Iterating on Metrics
FabianMeiswinkel Jul 26, 2022
0b1310e
Refactoring ClientTelemetryConfig
FabianMeiswinkel Jul 27, 2022
a2fffc7
Making collection interval configurable
FabianMeiswinkel Jul 27, 2022
aafd32a
Adding RNTBD metrics to client metrics
FabianMeiswinkel Jul 28, 2022
e392af5
Adding metrics for Gateway and address resolution
FabianMeiswinkel Jul 28, 2022
10dfaae
Update Utils.java
FabianMeiswinkel Jul 28, 2022
147d6b9
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Jul 29, 2022
f08507f
Update SparkE2EGatewayChangeFeedITest.scala
FabianMeiswinkel Jul 29, 2022
a01641f
Fixing build validation errors
FabianMeiswinkel Jul 29, 2022
ff595db
Update spotbugs-exclude.xml
FabianMeiswinkel Jul 29, 2022
846f343
Fixing NPE regression in RntbdServiceEndpoint
FabianMeiswinkel Jul 29, 2022
769bed8
Update CosmosClientConfiguration.scala
FabianMeiswinkel Jul 29, 2022
49b348a
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Jul 29, 2022
91a3e21
Update SparkE2EThroughputControlITest.scala
FabianMeiswinkel Jul 29, 2022
9f5bea9
Actually emitting Gateway statistics even for changefeed
FabianMeiswinkel Jul 29, 2022
e072ca5
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Jul 29, 2022
61c44a3
Adding more metric assertions in Spark unit tests
FabianMeiswinkel Jul 29, 2022
85a837f
More tests initial skeleton of Metrics.md
FabianMeiswinkel Jul 30, 2022
d94fa7f
Update Metrics.md
FabianMeiswinkel Aug 1, 2022
d79e27e
Iterating on Metrics.md
FabianMeiswinkel Aug 1, 2022
b22c04c
Update Metrics.md
FabianMeiswinkel Aug 1, 2022
d015006
Adding query plan metrics
FabianMeiswinkel Aug 1, 2022
b3b1ec9
Fixing test issues/failures in new tests
FabianMeiswinkel Aug 1, 2022
2514081
Fixing new test failures
FabianMeiswinkel Aug 1, 2022
683e29b
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Aug 1, 2022
1e5d632
Adding vmId to clientCorrelationId when available
FabianMeiswinkel Aug 1, 2022
0dbc34f
Not shading com.codahale
FabianMeiswinkel Aug 1, 2022
7723672
Fixing test failures
FabianMeiswinkel Aug 2, 2022
019801f
Fixing shading for AppInsights plugin
FabianMeiswinkel Aug 2, 2022
eff89e5
Finishing Metric.md
FabianMeiswinkel Aug 3, 2022
79e9dba
Adding unit test coverage for client metrics for bulk requests
FabianMeiswinkel Aug 3, 2022
8c84abb
Adding TransactionalBatch test coverage with client metrics
FabianMeiswinkel Aug 3, 2022
14d9c64
Update CosmosTracerTest.java
FabianMeiswinkel Aug 3, 2022
139f805
Fixing shading config for appinsights
FabianMeiswinkel Aug 3, 2022
c646de2
Adding md file for metrics in Spark
FabianMeiswinkel Aug 3, 2022
b8b0cbd
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Aug 3, 2022
fc4a91a
Updated changelogs
FabianMeiswinkel Aug 3, 2022
a3a5549
Update pom.xml
FabianMeiswinkel Aug 3, 2022
91e08ff
Update metrics.md
FabianMeiswinkel Aug 3, 2022
9601374
Adding option to enable client metrics in benchmark
FabianMeiswinkel Aug 5, 2022
19a9fd8
Update pom.xml
FabianMeiswinkel Aug 5, 2022
c9e7513
Updating Markup with adjusted defaults
FabianMeiswinkel Aug 5, 2022
fdab8e0
Update pom.xml
FabianMeiswinkel Aug 5, 2022
a935132
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Aug 5, 2022
7dd5ca6
Reacting to code review feedback
FabianMeiswinkel Aug 6, 2022
77c797b
Merged latest main and resolved conflicts
kushagraThapar Aug 15, 2022
73f0b6a
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Sep 14, 2022
30e8412
Reacting to code review comments
FabianMeiswinkel Sep 14, 2022
e8772f4
Reacting to code review feedback
FabianMeiswinkel Sep 16, 2022
3ef9433
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Sep 16, 2022
7c22101
Merging with master
FabianMeiswinkel Sep 16, 2022
6f1e332
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Sep 21, 2022
046e14a
Update ClientTelemetryMetrics.java
FabianMeiswinkel Sep 21, 2022
c98f5fa
Reacting to code review feedback
FabianMeiswinkel Sep 22, 2022
31ff730
Disabling slf4j logging by default
FabianMeiswinkel Sep 22, 2022
749f1f3
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
FabianMeiswinkel Sep 23, 2022
@@ -2174,6 +2174,8 @@
<Class name="com.azure.cosmos.implementation.ImplementationBridgeHelpers$DirectConnectionConfigHelper"/>
<Class name="com.azure.cosmos.implementation.ImplementationBridgeHelpers$FeedResponseHelper"/>
<Class name="com.azure.cosmos.implementation.ImplementationBridgeHelpers$PartitionKeyHelper"/>
<Class name="com.azure.cosmos.implementation.ImplementationBridgeHelpers$CosmosAsyncClientHelper"/>
<Class name="com.azure.cosmos.implementation.ImplementationBridgeHelpers$CosmosClientTelemetryConfigHelper"/>
</Or>
<Bug pattern="DM_EXIT"/>
</Match>
1 change: 1 addition & 0 deletions eng/versioning/external_dependencies.txt
@@ -354,6 +354,7 @@ cosmos_org.scala-lang:scala-library;2.12.10
cosmos_org.scala-lang.modules:scala-java8-compat_2.12;0.8.0
cosmos_io.projectreactor:reactor-scala-extensions_2.12;0.8.0
cosmos_commons-io:commons-io;2.4
cosmos_com.microsoft.azure:applicationinsights-core;2.6.4

# Cosmos Spark connector tests only
cosmos_org.scalatest:scalatest_2.12;3.2.2
@@ -76,6 +76,11 @@ abstract class AsyncBenchmark<T> {
private AtomicBoolean warmupMode = new AtomicBoolean(false);

AsyncBenchmark(Configuration cfg) {

logger = LoggerFactory.getLogger(this.getClass());
configuration = cfg;
MeterRegistry registry = configuration.getAzureMonitorMeterRegistry();

CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
.endpoint(cfg.getServiceEndpoint())
.key(cfg.getMasterKey())
@@ -84,6 +89,16 @@ abstract class AsyncBenchmark<T> {
.contentResponseOnWriteEnabled(cfg.isContentResponseOnWriteEnabled())
.clientTelemetryEnabled(cfg.isClientTelemetryEnabled());

if (registry != null) {
cosmosClientBuilder.clientTelemetryConfig().clientMetrics(registry);
}

registry = configuration.getGraphiteMeterRegistry();

if (registry != null) {
cosmosClientBuilder.clientTelemetryConfig().clientMetrics(registry);
}

if (cfg.getConnectionMode().equals(ConnectionMode.DIRECT)) {
cosmosClientBuilder = cosmosClientBuilder.directMode(DirectConnectionConfig.getDefaultConfig());
} else {
@@ -92,8 +107,6 @@ abstract class AsyncBenchmark<T> {
cosmosClientBuilder = cosmosClientBuilder.gatewayMode(gatewayConnectionConfig);
}
cosmosClient = cosmosClientBuilder.buildAsyncClient();
configuration = cfg;
logger = LoggerFactory.getLogger(this.getClass());

try {
cosmosAsyncDatabase = cosmosClient.getDatabase(this.configuration.getDatabaseId());
@@ -221,18 +234,6 @@ uuid, new PartitionKey(partitionKey), PojoizedJson.class)
.convertRatesTo(TimeUnit.SECONDS)
.build();
}

MeterRegistry registry = configuration.getAzureMonitorMeterRegistry();

if (registry != null) {
BridgeInternal.monitorTelemetry(registry);
}

registry = configuration.getGraphiteMeterRegistry();

if (registry != null) {
BridgeInternal.monitorTelemetry(registry);
}
}

protected void init() {
@@ -15,6 +15,7 @@
import io.micrometer.azuremonitor.AzureMonitorMeterRegistry;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.config.NamingConvention;
import io.micrometer.core.lang.Nullable;
import io.micrometer.graphite.GraphiteConfig;
@@ -574,6 +575,7 @@ private synchronized MeterRegistry azureMonitorMeterRegistry(String instrumentat
if (this.azureMonitorMeterRegistry == null) {

Duration step = Duration.ofSeconds(Integer.getInteger("azure.cosmos.monitoring.azureMonitor.step", this.printingInterval));
String testCategoryTag = System.getProperty("azure.cosmos.monitoring.azureMonitor.testCategory");
boolean enabled = !Boolean.getBoolean("azure.cosmos.monitoring.azureMonitor.disabled");

final AzureMonitorConfig config = new AzureMonitorConfig() {
@@ -602,6 +604,11 @@ public boolean enabled() {
};

this.azureMonitorMeterRegistry = new AzureMonitorMeterRegistry(config, Clock.SYSTEM);
if (!Strings.isNullOrEmpty(testCategoryTag)) {
List<Tag> globalTags = new ArrayList<>();
globalTags.add(Tag.of("TestCategory", testCategoryTag));
this.azureMonitorMeterRegistry.config().commonTags(globalTags);
}
}

return this.azureMonitorMeterRegistry;
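
The hunk above takes its settings from JVM system properties. A minimal, stdlib-only sketch of that pattern follows. The class name `MonitorSettingsSketch` and its method names are hypothetical; only the property names come from the diff. It illustrates that `Integer.getInteger` falls back to the supplied default and that `Boolean.getBoolean` is true only for the value `true` (case-insensitive), so metrics stay enabled unless `disabled` is set explicitly.

```java
import java.time.Duration;

// Sketch only: mirrors how the benchmark configuration reads its JVM system properties.
// The class and method names are hypothetical and not part of the SDK.
final class MonitorSettingsSketch {
    private static final String PREFIX = "azure.cosmos.monitoring.azureMonitor.";

    // Step interval: uses the default when the property is unset or not an integer.
    static Duration step(int defaultSeconds) {
        return Duration.ofSeconds(Integer.getInteger(PREFIX + "step", defaultSeconds));
    }

    // Enabled unless the "disabled" property is set to "true" (case-insensitive).
    static boolean enabled() {
        return !Boolean.getBoolean(PREFIX + "disabled");
    }

    // Optional tag value; null when the property is not set.
    static String testCategory() {
        return System.getProperty(PREFIX + "testCategory");
    }

    public static void main(String[] args) {
        System.out.println(step(10) + " " + enabled() + " " + testCategory());
    }
}
```

Under this sketch, launching the JVM with `-Dazure.cosmos.monitoring.azureMonitor.step=30` would yield a 30 second step.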
@@ -6,11 +6,11 @@
import com.azure.cosmos.DirectConnectionConfig;
import com.azure.cosmos.ThrottlingRetryOptions;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.ClientTelemetryConfig;
import com.azure.cosmos.implementation.ConnectionPolicy;
import com.azure.cosmos.implementation.Database;
import com.azure.cosmos.implementation.DatabaseForTest;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.ResourceResponse;
import com.azure.cosmos.implementation.TestConfigurations;
import com.azure.cosmos.models.FeedResponse;
@@ -32,7 +32,11 @@ public static AsyncDocumentClient housekeepingClient() {
.withMasterKeyOrResourceToken(TestConfigurations.MASTER_KEY)
.withConnectionPolicy(connectionPolicy)
.withContentResponseOnWriteEnabled(true)
.withClientTelemetryConfig(ClientTelemetryConfig.getDefaultConfig())
.withClientTelemetryConfig(
ImplementationBridgeHelpers
.CosmosClientTelemetryConfigHelper
.getCosmosClientTelemetryConfigAccessor()
.getDefaultConfig())
.build();
}

1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
### 4.14.0-beta.1 (Unreleased)

#### Features Added
* Added option to emit client-side metrics via micrometer.io MeterRegistry. - See [PR 30065](https://github.com/Azure/azure-sdk-for-java/pull/30065)

#### Breaking Changes

1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
### 4.14.0-beta.1 (Unreleased)

#### Features Added
* Added option to emit client-side metrics via micrometer.io MeterRegistry. - See [PR 30065](https://github.com/Azure/azure-sdk-for-java/pull/30065)

#### Breaking Changes

60 changes: 60 additions & 0 deletions sdk/cosmos/azure-cosmos-spark_3_2-12/docs/metrics.md
@@ -0,0 +1,60 @@
# Cosmos DB client-side metrics

The Cosmos DB SDK for Java can emit micrometer.io metrics that track latency, request charge (RU/s), request rates etc. for logical operations (API calls into the SDK from your application/service) as well as for the actual requests sent to the Cosmos DB service (for example, with consistency level Strong or Bounded Staleness a request may need to be sent to multiple replicas). These metrics are tagged with different dimensions, so you can break down metrics such as latency or request charge by a dimension like operation type or response status code.

**<u>Please note:</u> Enabling metrics adds some performance overhead due to increased CPU usage. The overhead is small (<10%), but it should be tested before enabling metrics in production, especially for large containers/workloads (> 5 TB of data or > 1,000,000 RU/s).**

More information about the metrics emitted by the Azure Cosmos DB SDK is available here: https://aka.ms/azure-cosmos-metrics



## How to enable metrics?

When using the Azure Cosmos DB Spark connector for Spark 3.*, you can enable these client-side metrics and emit them to the Spark metric system (Ganglia, plus optionally Log4j log files) as well as to Azure Monitor / Application Insights purely via configuration.

### How to enable in Azure Databricks?

The Spark API that allows providing custom metrics is the Spark plugin API - see `https://issues.apache.org/jira/browse/SPARK-24918`, `https://github.com/apache/spark/pull/26170` and `http://blog.madhukaraphatak.com/spark-plugin-part-4/`.

The Azure Cosmos DB Spark connector contains two such plugins to emit metrics:

- `com.azure.cosmos.spark.plugins.CosmosMetricsSparkPlugin`: This plugin emits the metrics to the Spark metric system (the Spark config controls where Spark emits metrics - Azure Databricks would usually push them to the Ganglia Metrics UI included in the Spark UI). Optionally, the metrics can also be pushed to a Log4j log file (by setting the `spark.cosmos.metrics.slf4j.enabled` property in the Spark config to `true`). The `spark.cosmos.metrics.intervalInSeconds` property determines how frequently metrics are collected/aggregated (default is once per minute).
- `com.azure.cosmos.spark.plugins.CosmosMetricsApplicationInsightsPlugin`: This plugin emits the metrics to Azure Monitor. You can provide the Azure Monitor connection string in the `spark.cosmos.metrics.azureMonitor.connectionString` property. The `spark.cosmos.metrics.intervalInSeconds` property determines how frequently metrics are collected/aggregated (default is once per minute).

Last but not least, to use these Spark plugins the jar containing them must be available at cluster creation. It is therefore not sufficient to install the Azure Cosmos DB Spark connector in the `Libraries` section of the cluster; the jar must be copied into the `/databricks/jars` folder at cluster initialization via a start-up script.

Follow these steps for the installation:

- Download the latest Azure Cosmos DB Spark connector jar from Maven
- Spark 3.1: `https://repo1.maven.org/maven2/com/azure/cosmos/spark/azure-cosmos-spark_3-1_2-12/ReplaceWithTheLatestVersion/azure-cosmos-spark_3-1_2-12-ReplaceWithTheLatestVersion.jar`
- Spark 3.2: `https://repo1.maven.org/maven2/com/azure/cosmos/spark/azure-cosmos-spark_3-2_2-12/ReplaceWithTheLatestVersion/azure-cosmos-spark_3-2_2-12-ReplaceWithTheLatestVersion.jar`

- Upload this jar to your Databricks file system (in `/dbfs/FileStore/plugins` folder)
- Create a text file with the content below (NOTE: use Unix line feeds (LF), not Windows line endings (CRLF)) and upload this start-up script as well.
- Configure the new start-up script in the cluster configuration

- Content of the start-up script
```sh
#!/bin/bash

STAGE_DIR="/dbfs/FileStore/plugins"

echo "BEGIN: Upload Spark Plugin JARs"
cp -f "$STAGE_DIR"/*.jar /databricks/jars || { echo "Error copying Spark Plugin library file"; exit 1; }
echo "END: Upload Spark Plugin JARs"

```

- Now change the cluster's Spark configuration to add these two options:

- `spark.plugins` `com.azure.cosmos.spark.plugins.CosmosMetricsSparkPlugin,com.azure.cosmos.spark.plugins.CosmosMetricsApplicationInsightsPlugin`
- `spark.cosmos.metrics.azureMonitor.connectionString` **REPLACE_WITH_YOUR_AZURE_MONITOR_CONNECTION_STRING**

- Restart the cluster - and test...
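
The download URLs in the steps above follow the standard Maven repository layout (`groupId` path, `artifactId`, version, then `artifactId-version.jar`), so the artifact ID appears twice in each URL. A small sketch that assembles the URL from those parts might look like this. The class and method names are hypothetical, and the version stays a placeholder exactly as in the steps above.

```java
// Sketch only: builds the Maven Central download URL from the artifact coordinates.
// The class and method names are hypothetical helpers, not part of the connector.
final class ConnectorJarUrlSketch {
    private static final String REPO = "https://repo1.maven.org/maven2";
    private static final String GROUP_PATH = "com/azure/cosmos/spark";

    // sparkVersion is "3-1" or "3-2"; connectorVersion is supplied by the caller.
    static String jarUrl(String sparkVersion, String connectorVersion) {
        String artifactId = "azure-cosmos-spark_" + sparkVersion + "_2-12";
        String fileName = artifactId + "-" + connectorVersion + ".jar";
        return String.join("/", REPO, GROUP_PATH, artifactId, connectorVersion, fileName);
    }

    public static void main(String[] args) {
        // The version below is a placeholder and must be replaced with a real release.
        System.out.println(jarUrl("3-2", "ReplaceWithTheLatestVersion"));
    }
}
```

Deriving both occurrences of the artifact ID from one variable avoids a URL whose folder and file name refer to different Spark versions.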




### How to enable in Azure Synapse?

TBD
43 changes: 33 additions & 10 deletions sdk/cosmos/azure-cosmos-spark_3_2-12/pom.xml
@@ -95,12 +95,6 @@
<version>3.4.22</version> <!-- {x-version-update;io.projectreactor:reactor-test;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.36</version> <!-- {x-version-update;org.slf4j:slf4j-simple;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
@@ -148,6 +142,19 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version> <!-- {x-version-update;org.slf4j:slf4j-api;external_dependency} -->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-azure-monitor</artifactId>
<version>1.9.3</version> <!-- {x-version-update;io.micrometer:micrometer-registry-azure-monitor;external_dependency} -->
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>applicationinsights-core</artifactId>
<version>2.6.4</version> <!-- {x-version-update;cosmos_com.microsoft.azure:applicationinsights-core;external_dependency} -->
<scope>compile</scope>
</dependency>
</dependencies>

@@ -187,6 +194,8 @@
<include>org.scalastyle:scalastyle-maven-plugin:[1.0.0]</include> <!-- {x-include-update;cosmos_org.scalastyle:scalastyle-maven-plugin;external_dependency} -->
<include>com.fasterxml.jackson.core:jackson-databind:[2.13.3]</include> <!-- {x-include-update;com.fasterxml.jackson.core:jackson-databind;external_dependency} -->
<include>com.fasterxml.jackson.module:jackson-module-scala_2.12:[2.13.3]</include> <!-- {x-include-update;com.fasterxml.jackson.module:jackson-module-scala_2.12;external_dependency} -->
<include>io.micrometer:micrometer-registry-azure-monitor:[1.9.3]</include> <!-- {x-include-update;io.micrometer:micrometer-registry-azure-monitor;external_dependency} -->
<include>com.microsoft.azure:applicationinsights-core:[2.6.4]</include> <!-- {x-include-update;cosmos_com.microsoft.azure:applicationinsights-core;external_dependency} -->
</includes>
</bannedDependencies>
</rules>
@@ -378,10 +387,6 @@
<pattern>io.netty</pattern>
<shadedPattern>${shadingPrefix}.io.netty</shadedPattern>
</relocation>
<relocation>
<pattern>com.codahale</pattern>
<shadedPattern>${shadingPrefix}.com.codahale</shadedPattern>
</relocation>
<relocation>
<pattern>org.codehaus</pattern>
<shadedPattern>${shadingPrefix}.org.codehaus</shadedPattern>
@@ -418,8 +423,26 @@
<pattern>javax.xml</pattern>
<shadedPattern>${shadingPrefix}.javax.xml</shadedPattern>
</relocation>
<relocation>
<pattern>com.microsoft.azure</pattern>
<shadedPattern>${shadingPrefix}.com.microsoft.azure</shadedPattern>
</relocation>
<relocation>
<pattern>com.microsoft.applicationinsights</pattern>
<shadedPattern>${shadingPrefix}.com.microsoft.applicationinsights</shadedPattern>
</relocation>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>${shadingPrefix}.com.google</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
<artifact>com.microsoft.azure:applicationinsights-core</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>*:*</artifact>
<excludes>
@@ -2,7 +2,8 @@
// Licensed under the MIT License.
package com.azure.cosmos.spark

import com.azure.cosmos.implementation.{CosmosClientMetadataCachesSnapshot, CosmosDaemonThreadFactory, SparkBridgeImplementationInternal}
import com.azure.cosmos.implementation.clienttelemetry.TagName
import com.azure.cosmos.implementation.{CosmosClientMetadataCachesSnapshot, CosmosDaemonThreadFactory, SparkBridgeImplementationInternal, Strings}
import com.azure.cosmos.spark.CosmosPredicates.isOnSparkDriver
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
import com.azure.cosmos.{ConsistencyLevel, CosmosAsyncClient, CosmosClientBuilder, DirectConnectionConfig, ThrottlingRetryOptions}
@@ -121,6 +122,7 @@ private[spark] object CosmosClientCache extends BasicLoggingTrait {
cosmosClientStateHandle: Option[CosmosClientMetadataCachesSnapshot],
ownerInfo: OwnerInfo)
: CosmosClientCacheItem = synchronized {

val clientConfigWrapper = ClientConfigurationWrapper(cosmosClientConfiguration)
cache.get(clientConfigWrapper) match {
case Some(clientCacheMetadata) => clientCacheMetadata.createCacheItemForReuse(ownerInfo)
@@ -134,6 +136,31 @@ private[spark] object CosmosClientCache extends BasicLoggingTrait {
.setMaxRetryAttemptsOnThrottledRequests(Int.MaxValue)
.setMaxRetryWaitTime(Duration.ofSeconds((Integer.MAX_VALUE/1000) - 1)))

if (CosmosClientMetrics.meterRegistry.isDefined) {
val customApplicationNameSuffix = cosmosClientConfiguration.customApplicationNameSuffix
.getOrElse("")

val clientCorrelationId = SparkSession.getActiveSession match {
case Some(session) =>
val ctx = session.sparkContext

if (Strings.isNullOrWhiteSpace(customApplicationNameSuffix)) {
s"${CosmosClientMetrics.executorId}-${ctx.appName}"
} else {
s"$customApplicationNameSuffix-${CosmosClientMetrics.executorId}-${ctx.appName}"
}
case None => customApplicationNameSuffix
}

builder.clientTelemetryConfig().clientMetrics(CosmosClientMetrics.meterRegistry.get)
builder.clientTelemetryConfig().clientCorrelationId(clientCorrelationId)
builder.clientTelemetryConfig().metricTagNames(
s"${TagName.Container}, ${TagName.ClientCorrelationId}, ${TagName.Operation}, " +
s"${TagName.OperationStatusCode}, ${TagName.PartitionKeyRangeId}, ${TagName.ServiceEndpoint}, " +
s"${TagName.ServiceAddress}"
)
}

if (cosmosClientConfiguration.disableTcpConnectionEndpointRediscovery) {
builder.endpointDiscoveryEnabled(false)
}
@@ -350,6 +377,12 @@ private[spark] object CosmosClientCache extends BasicLoggingTrait {
}
}

def clearCache(): Unit = {
cache.readOnlySnapshot().keys.foreach(clientCfgWrapper => purgeImpl(clientCfgWrapper, forceClosure = true))
cache.clear()
cleanUpToBeClosedWhenNotActiveAnymore(forceClosure = true)
}

private[this] class CacheItemImpl
(
val cosmosClient: CosmosAsyncClient,
@@ -384,9 +417,7 @@ private[spark] object CosmosClientCache extends BasicLoggingTrait {
case Some(_) =>
logInfo(
s"ApplicationEndListener:onApplicationEnd(${ctx.hashCode}) closed - purging all cosmos clients")
cache.readOnlySnapshot().keys.foreach(clientCfgWrapper => purgeImpl(clientCfgWrapper, forceClosure = true))
cache.clear()
cleanUpToBeClosedWhenNotActiveAnymore(forceClosure = true)
clearCache()
case None =>
logWarning(s"ApplicationEndListener:onApplicationEnd (${ctx.hashCode}) - not monitored anymore")
}
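
The `clientCorrelationId` selection added to `CosmosClientCache` above can be restated as a small pure function. The sketch below is a plain-Java paraphrase of the Scala logic, not the connector's code. The class name `ClientCorrelationIdSketch` and its `build` method are hypothetical, an empty `Optional` stands in for "no active Spark session", and `trim().isEmpty()` only approximates `Strings.isNullOrWhiteSpace`.

```java
import java.util.Optional;

// Sketch only: paraphrases the correlation-id rules from the Scala hunk above.
// The class and method names are hypothetical and not part of the connector.
final class ClientCorrelationIdSketch {
    // appName is empty when there is no active Spark session.
    static String build(String customSuffix, String executorId, Optional<String> appName) {
        String suffix = customSuffix == null ? "" : customSuffix;
        if (!appName.isPresent()) {
            // Without a session only the custom suffix is available (possibly empty).
            return suffix;
        }
        String base = executorId + "-" + appName.get();
        // Approximation of Strings.isNullOrWhiteSpace for the suffix check.
        return suffix.trim().isEmpty() ? base : suffix + "-" + base;
    }

    public static void main(String[] args) {
        System.out.println(build("etl", "1", Optional.of("nightly-load")));
    }
}
```

Keeping the executor ID and application name in the correlation ID lets metrics from different executors of the same job be told apart, which is presumably why `ClientCorrelationId` is also included in the configured metric tag names.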