Skip to content

Commit

Permalink
addingMetricsSupportInKafkaConnector (Azure#39808)
Browse files Browse the repository at this point in the history
* allow config metrics from system property

---------

Co-authored-by: annie-mac <[email protected]>
  • Loading branch information
xinlian12 and annie-mac authored Apr 24, 2024
1 parent 602ccd6 commit d17f07b
Show file tree
Hide file tree
Showing 14 changed files with 327 additions and 18 deletions.
11 changes: 6 additions & 5 deletions sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,6 @@ Licensed under the MIT License.
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-${project.version}-jar-with-dependencies</finalName>
<filters>
<filter>
<artifact>*:*:*:*</artifact>
Expand All @@ -330,6 +329,12 @@ Licensed under the MIT License.
<exclude>META-INF.versions.9.module-info.class</exclude>
</excludes>
</filter>
<filter>
<artifact>io.micrometer:micrometer-core</artifact>
<includes>
<include>**</include>
</includes>
</filter>
</filters>
<relocations>
<relocation>
Expand Down Expand Up @@ -363,10 +368,6 @@ Licensed under the MIT License.
<pattern>com.thoughtworks.paranamer</pattern>
<shadedPattern>${shadingPrefix}.com.thoughtworks.paranamer</shadedPattern>
</relocation>
<relocation>
<pattern>io.micrometer</pattern>
<shadedPattern>${shadingPrefix}.io.micrometer</shadedPattern>
</relocation>
<relocation>
<pattern>org.HdrHistogram</pattern>
<shadedPattern>${shadingPrefix}.org.HdrHistogram</shadedPattern>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ New-Item -Path "$PSScriptRoot/src/test/connectorPlugins" -ItemType "directory" -

Write-Host "Rebuilding Cosmos DB connectors..."
mvn clean package -DskipTests -Dmaven.javadoc.skip
copy target\*-jar-with-dependencies.jar $PSScriptRoot/src/test/connectorPlugins/connectors
copy target/azure-cosmos-kafka-connect-*.jar $PSScriptRoot/src/test/connectorPlugins/connectors
cd $PSScriptRoot/src/test/connectorPlugins

Write-Host "Adding custom Insert UUID SMT"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mkdir src/test/connectorPlugins/connectors

echo "Rebuilding Cosmos DB connectors..."
mvn clean package -DskipTests=true -Dmaven.javadoc.skip=true
cp target/*-jar-with-dependencies.jar src/test/connectorPlugins/connectors
cp target/azure-cosmos-kafka-connect-*.jar src/test/connectorPlugins/connectors
cd src/test/connectorPlugins

echo "Adding custom Insert UUID SMT"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private static void createConnectorJar() throws IOException, InterruptedExceptio
if (exitCode == 0) {
logger.info("Build script completed successfully");
//validate the jar exists
File jarFile = findFile("src/test/connectorPlugins/connectors", "jar-with-dependencies.jar");
File jarFile = findFile("src/test/connectorPlugins/connectors", "azure-cosmos-kafka-connect");

assertThat(jarFile).isNotNull();
assertThat(jarFile.exists()).isTrue();
Expand All @@ -147,13 +147,13 @@ private static void createConnectorJar() throws IOException, InterruptedExceptio
}
}

private static File findFile(String folder, String filenameFilterEndsWith) {
private static File findFile(String folder, String filenameFilterStartsWith) {
File file = new File(folder);
if (!file.exists() || !file.isDirectory()) {
return null;
}
return Arrays.stream(file.listFiles())
.filter(f -> f.getName().endsWith(filenameFilterEndsWith))
.filter(f -> f.getName().startsWith(filenameFilterStartsWith))
.findFirst().orElse(null);
}

Expand Down
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos-spark_3_2-12/dev/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ The release process is currently manual.

Download the spark artifacts or build locally:
```bash
mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos -am clean install
mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos -am clean install
true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos -am clean install
mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-spark_3-1_2-12 clean install
mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-spark_3-2_2-12 clean install
mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-spark_3-3_2-12 clean install
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1275,6 +1275,62 @@ public void meterNameFromStringConversion() {
.isSameAs(CosmosMetricName.DIRECT_REQUEST_SIZE_REQUEST);
}

@Test(groups = { "unit" }, timeOut = TIMEOUT)
public void metricConfigsThroughSystemProperty() {
System.setProperty(
"COSMOS.METRICS_CONFIG",
"{\"metricCategories\":\"[OperationDetails]\","
+ "\"tagNames\":\"[PartitionId]\","
+ "\"sampleRate\":0.5,"
+ "\"percentiles\":[0.90,0.99],"
+ "\"enableHistograms\":false,"
+ "\"applyDiagnosticThresholdsForTransportLevelMeters\":true}");

CosmosClientBuilder testClientBuilder = new CosmosClientBuilder();
CosmosClientTelemetryConfig clientTelemetryConfig = ReflectionUtils.getClientTelemetryConfig(testClientBuilder);
EnumSet<MetricCategory> effectiveMetricsCategory = ImplementationBridgeHelpers
.CosmosClientTelemetryConfigHelper
.getCosmosClientTelemetryConfigAccessor()
.getMetricCategories(clientTelemetryConfig);
assertThat(effectiveMetricsCategory).containsAll(MetricCategory.MINIMAL_CATEGORIES);
assertThat(effectiveMetricsCategory).contains(MetricCategory.OperationDetails);

EnumSet<TagName> effectiveTagNames =
ImplementationBridgeHelpers
.CosmosClientTelemetryConfigHelper
.getCosmosClientTelemetryConfigAccessor()
.getMetricTagNames(clientTelemetryConfig);
assertThat(effectiveTagNames).containsAll(TagName.MINIMUM_TAGS);
assertThat(effectiveTagNames).contains(TagName.PartitionId);

double sampleRate = ImplementationBridgeHelpers
.CosmosClientTelemetryConfigHelper
.getCosmosClientTelemetryConfigAccessor()
.getSamplingRate(clientTelemetryConfig);
assertThat(sampleRate).isEqualTo(0.5);

double[] percentiles =
ImplementationBridgeHelpers
.CosmosClientTelemetryConfigHelper
.getCosmosClientTelemetryConfigAccessor()
.getDefaultPercentiles(clientTelemetryConfig);
assertThat(percentiles).contains(0.90, 0.99);

boolean publishHistograms =
ImplementationBridgeHelpers
.CosmosClientTelemetryConfigHelper
.getCosmosClientTelemetryConfigAccessor()
.shouldPublishHistograms(clientTelemetryConfig);
assertThat(publishHistograms).isFalse();

boolean applyDiagnosticThresholdsForTransportLevelMeters =
ImplementationBridgeHelpers
.CosmosClientTelemetryConfigHelper
.getCosmosClientTelemetryConfigAccessor()
.shouldApplyDiagnosticThresholdsForTransportLevelMeters(clientTelemetryConfig);
assertThat(applyDiagnosticThresholdsForTransportLevelMeters).isTrue();
}

private InternalObjectNode getDocumentDefinition(String documentId) {
final String uuid = UUID.randomUUID().toString();
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@

package com.azure.cosmos.implementation;

import com.azure.cosmos.implementation.clienttelemetry.MetricCategory;
import com.azure.cosmos.implementation.clienttelemetry.TagName;
import com.azure.cosmos.implementation.directconnectivity.Protocol;
import org.testng.annotations.Test;

import java.util.EnumSet;

import static org.assertj.core.api.Assertions.assertThat;

public class ConfigsTests {
Expand Down Expand Up @@ -33,4 +37,36 @@ public void getDirectHttpsMaxConnectionLimit() {
Configs config = new Configs();
assertThat(config.getDirectHttpsMaxConnectionLimit()).isEqualTo(Runtime.getRuntime().availableProcessors() * 500);
}

@Test(groups = { "unit" })
public void getMetricsConfig() {
System.clearProperty("COSMOS.METRICS_CONFIG");
CosmosMicrometerMetricsConfig metricsConfig = Configs.getMetricsConfig();
assertThat(metricsConfig.getMetricCategories()).isEqualTo(MetricCategory.DEFAULT_CATEGORIES);
assertThat(metricsConfig.getTagNames()).isEqualTo(TagName.DEFAULT_TAGS);
assertThat(metricsConfig.getPercentiles()).contains(0.95, 0.99);
assertThat(metricsConfig.getEnableHistograms()).isTrue();
assertThat(metricsConfig.getApplyDiagnosticThresholdsForTransportLevelMeters()).isFalse();
assertThat(metricsConfig.getSampleRate()).isEqualTo(1.0);

System.setProperty(
"COSMOS.METRICS_CONFIG",
"{\"metricCategories\":\"[OperationSummary, RequestSummary]\","
+ "\"tagNames\":\"[Container, Operation]\","
+ "\"sampleRate\":0.5,"
+ "\"percentiles\":[0.90,0.99],"
+ "\"enableHistograms\":false,"
+ "\"applyDiagnosticThresholdsForTransportLevelMeters\":true}");
try {
metricsConfig = Configs.getMetricsConfig();
assertThat(metricsConfig.getMetricCategories()).isEqualTo(EnumSet.of(MetricCategory.OperationSummary, MetricCategory.RequestSummary));
assertThat(metricsConfig.getTagNames()).isEqualTo(EnumSet.of(TagName.Container, TagName.Operation));
assertThat(metricsConfig.getPercentiles()).contains(0.90, 0.99);
assertThat(metricsConfig.getEnableHistograms()).isFalse();
assertThat(metricsConfig.getApplyDiagnosticThresholdsForTransportLevelMeters()).isTrue();
assertThat(metricsConfig.getSampleRate()).isEqualTo(0.5);
} finally {
System.clearProperty("COSMOS.METRICS_CONFIG");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLException;

import java.time.Duration;
import java.util.Locale;

Expand Down Expand Up @@ -175,6 +174,19 @@ public class Configs {
public static final String DIAGNOSTICS_PROVIDER_SYSTEM_EXIT_ON_ERROR = "COSMOS.DIAGNOSTICS_PROVIDER_SYSTEM_EXIT_ON_ERROR";
public static final boolean DEFAULT_DIAGNOSTICS_PROVIDER_SYSTEM_EXIT_ON_ERROR = true;

// Metrics
// Samples:
// System.setProperty(
// "COSMOS.METRICS_CONFIG",
// "{\"metricCategories\":\"[OperationSummary, RequestSummary]\","
// + "\"tagNames\":\"[Container, Operation]\","
// + "\"sampleRate\":0.5,"
// + "\"percentiles\":[0.90,0.99],"
// + "\"enableHistograms\":false,"
// + "\"applyDiagnosticThresholdsForTransportLevelMeters\":true}");
public static final String METRICS_CONFIG = "COSMOS.METRICS_CONFIG";
public static final String DEFAULT_METRICS_CONFIG = CosmosMicrometerMetricsConfig.DEFAULT.toJson();

public Configs() {
this.sslContext = sslContextInit();
}
Expand Down Expand Up @@ -509,4 +521,15 @@ public static boolean shouldDiagnosticsProviderSystemExitOnError() {

return Boolean.parseBoolean(shouldSystemExit);
}

public static CosmosMicrometerMetricsConfig getMetricsConfig() {
String metricsConfig =
System.getProperty(
METRICS_CONFIG,
firstNonNull(
emptyToNull(System.getenv().get(METRICS_CONFIG)),
DEFAULT_METRICS_CONFIG));

return CosmosMicrometerMetricsConfig.fromJsonString(metricsConfig);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation;

import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.clienttelemetry.MetricCategory;
import com.azure.cosmos.implementation.clienttelemetry.TagName;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.Nulls;
import com.fasterxml.jackson.core.JsonProcessingException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Collectors;

public class CosmosMicrometerMetricsConfig {
public static final CosmosMicrometerMetricsConfig DEFAULT = new CosmosMicrometerMetricsConfig();

@JsonSetter(nulls = Nulls.SKIP)
@JsonProperty
private String metricCategories = MetricCategory.DEFAULT_CATEGORIES.clone().toString();
@JsonSetter(nulls = Nulls.SKIP)
@JsonProperty
private String tagNames = TagName.DEFAULT_TAGS.clone().toString();
@JsonSetter(nulls = Nulls.SKIP)
@JsonProperty
private Double sampleRate = 1.0;
@JsonSetter(nulls = Nulls.SKIP)
@JsonProperty
private double[] percentiles = { 0.95, 0.99 };
@JsonSetter(nulls = Nulls.SKIP)
@JsonProperty
private Boolean enableHistograms = true;
@JsonSetter(nulls = Nulls.SKIP)
@JsonProperty
private Boolean applyDiagnosticThresholdsForTransportLevelMeters = false;

public CosmosMicrometerMetricsConfig() {}

public String toJson() {
try {
return Utils.getSimpleObjectMapper().writeValueAsString(this);
} catch (JsonProcessingException e) {
throw new RuntimeException("Unable to convert to Json String", e);
}
}

@JsonIgnore
public EnumSet<MetricCategory> getMetricCategories() {
List<String> metricsCategoryList = convertToList(this.metricCategories);
return EnumSet.copyOf(
metricsCategoryList
.stream()
.map(categoryValue -> MetricCategory.fromValue(categoryValue))
.collect(Collectors.toList())
);
}

@JsonIgnore
public EnumSet<TagName> getTagNames() {
List<String> tagNames = convertToList(this.tagNames);
return EnumSet.copyOf(
tagNames
.stream()
.map(tagName -> TagName.fromValue(tagName))
.collect(Collectors.toList())
);
}

public double[] getPercentiles() {
return this.percentiles;
}

public double getSampleRate() {
return this.sampleRate;
}

public Boolean getEnableHistograms() {
return this.enableHistograms;
}

public Boolean getApplyDiagnosticThresholdsForTransportLevelMeters() {
return applyDiagnosticThresholdsForTransportLevelMeters;
}

private static List<String> convertToList(String value) {
if (StringUtils.isEmpty(value)) {
return new ArrayList<>();
}
if (value.startsWith("[") && value.endsWith("]")) {
value = value.substring(1, value.length() - 1);
}

return Arrays.stream(value.split(",")).map(String::trim).collect(Collectors.toList());
}

public static CosmosMicrometerMetricsConfig fromJsonString(String jsonString) {
try {
return Utils.getSimpleObjectMapper().readValue(jsonString, CosmosMicrometerMetricsConfig.class);
} catch (JsonProcessingException e) {
throw new RuntimeException("Unable to convert from Json String", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1488,6 +1488,9 @@ CosmosClientTelemetryConfig createSnapshot(
void setUseLegacyTracing(CosmosClientTelemetryConfig config, boolean useLegacyTracing);
void setTracer(CosmosClientTelemetryConfig config, Tracer tracer);
double getSamplingRate(CosmosClientTelemetryConfig config);
double[] getDefaultPercentiles(CosmosClientTelemetryConfig config);
boolean shouldPublishHistograms(CosmosClientTelemetryConfig config);
boolean shouldApplyDiagnosticThresholdsForTransportLevelMeters(CosmosClientTelemetryConfig config);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,13 @@ public int value() {
MetricCategory.OperationSummary,
MetricCategory.System
);

public static MetricCategory fromValue(String value) {
for (MetricCategory metricCategory : MetricCategory.values()) {
if (metricCategory.toLowerStringValue.equalsIgnoreCase(value)) {
return metricCategory;
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,14 @@ public int value() {
TagName.RequestStatusCode,
TagName.RequestOperationType
);

public static TagName fromValue(String value) {
for (TagName tagName : TagName.values()) {
if (tagName.toLowerStringValue.equalsIgnoreCase(value)) {
return tagName;
}
}
return null;
}
}

Loading

0 comments on commit d17f07b

Please sign in to comment.