diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md b/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md
index fded2f0b1b380..ef55212090c10 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md
@@ -6,6 +6,7 @@
* Added Source connector. See [PR 39410](https://github.com/Azure/azure-sdk-for-java/pull/39410)
* Added Sink connector. See [PR 39434](https://github.com/Azure/azure-sdk-for-java/pull/39434)
* Added throughput control support. See [PR 39218](https://github.com/Azure/azure-sdk-for-java/pull/39218)
+* Added `ServicePrincipal` support. See [PR 39490](https://github.com/Azure/azure-sdk-for-java/pull/39490)
#### Breaking Changes
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md b/sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md
index 9c89668f17809..2f4c8da55f432 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md
@@ -2,26 +2,36 @@
## Generic Configuration
-| Config Property Name | Default | Description |
-|:---------------------------------------------------------------------------|:--------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `kafka.connect.cosmos.accountEndpoint` | None | Cosmos DB Account Endpoint Uri |
-| `kafka.connect.cosmos.accountEndpoint` | None | Cosmos DB Account Key |
-| `kafka.connect.cosmos.useGatewayMode` | `false` | Flag to indicate whether to use gateway mode. By default it is false. |
-| `kafka.connect.cosmos.preferredRegionsList` | `[]` | Preferred regions list to be used for a multi region Cosmos DB account. This is a comma separated value (e.g., `[East US, West US]` or `East US, West US`) provided preferred regions will be used as hint. You should use a collocated kafka cluster with your Cosmos DB account and pass the kafka cluster region as preferred region. See list of azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet&preserve-view=true). |
-| `kafka.connect.cosmos.applicationName` | `""` | Application name. Will be added as the userAgent suffix. |
-| `kafka.connect.cosmos.throughputControl.enabled` | `false` | A flag to indicate whether throughput control is enabled. |
-| `kafka.connect.cosmos.throughputControl.accountEndpoint` | `` | Cosmos DB Throughput Control Account Endpoint Uri. |
-| `kafka.connect.cosmos.throughputControl.accountKey` | `` | Cosmos DB Throughput Control Account Key. |
-| `kafka.connect.cosmos.throughputControl.preferredRegionsList` | `[]` | Preferred regions list to be used for a multi region Cosmos DB account. This is a comma separated value (e.g., `[East US, West US]` or `East US, West US`) provided preferred regions will be used as hint. You should use a collocated kafka cluster with your Cosmos DB account and pass the kafka cluster region as preferred region. See list of azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet&preserve-view=true). |
-| `kafka.connect.cosmos.throughputControl.useGatewayMode` | `false` | Flag to indicate whether to use gateway mode. By default it is false. |
-| `kafka.connect.cosmos.throughputControl.name` | `` | Throughput control group name. Since customer is allowed to create many groups for a container, the name should be unique. |
-| `kafka.connect.cosmos.throughputControl.targetThroughput` | `-1` | Throughput control group target throughput. The value should be larger than 0. |
-| `kafka.connect.cosmos.throughputControl.targetThroughputThreshold` | `-1` | Throughput control group target throughput threshold. The value should be between (0,1]. |
-| `kafka.connect.cosmos.throughputControl.priorityLevel` | `None` | Throughput control group priority level. The value can be None, High or Low. |
-| `kafka.connect.cosmos.throughputControl.globalControl.database` | `` | Database which will be used for throughput global control. |
-| `kafka.connect.cosmos.throughputControl.globalControl.container` | `` | Container which will be used for throughput global control. |
-| `kafka.connect.cosmos.throughputControl.globalControl.renewIntervalInMS` | `-1` | This controls how often the client is going to update the throughput usage of itself and adjust its own throughput share based on the throughput usage of other clients. Default is 5s, the allowed min value is 5s. |
-| `kafka.connect.cosmos.throughputControl.globalControl.expireIntervalInMS` | `-1` | This controls how quickly we will detect the client has been offline and hence allow its throughput share to be taken by other clients. Default is 11s, the allowed min value is 2 * renewIntervalInMS + 1. |
+| Config Property Name | Default | Description |
+|:--------------------------------------------------------------------------|:-------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `kafka.connect.cosmos.accountEndpoint` | None | Cosmos DB Account Endpoint Uri |
+| `kafka.connect.cosmos.account.azureEnvironment` | `Azure` | The azure environment of the CosmosDB account: `Azure`, `AzureChina`, `AzureUsGovernment`, `AzureGermany`. |
+| `kafka.connect.cosmos.account.tenantId` | `""` | The tenantId of the CosmosDB account. Required for `ServicePrincipal` authentication. |
+| `kafka.connect.cosmos.auth.type`                                           | `MasterKey`  | Two auth types are supported currently: `MasterKey` (PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadOnlyKeys), `ServicePrincipal` |
+| `kafka.connect.cosmos.accountKey` | `""` | Cosmos DB Account Key (only required in case of `auth.type` as `MasterKey`) |
+| `kafka.connect.cosmos.auth.aad.clientId` | `""` | The clientId/ApplicationId of the service principal. Required for `ServicePrincipal` authentication. |
+| `kafka.connect.cosmos.auth.aad.clientSecret`                               | `""`         | The client secret/password of the service principal. Required for `ServicePrincipal` authentication. |
+| `kafka.connect.cosmos.useGatewayMode` | `false` | Flag to indicate whether to use gateway mode. By default it is false. |
+| `kafka.connect.cosmos.preferredRegionsList` | `[]` | Preferred regions list to be used for a multi region Cosmos DB account. This is a comma separated value (e.g., `[East US, West US]` or `East US, West US`) provided preferred regions will be used as hint. You should use a collocated kafka cluster with your Cosmos DB account and pass the kafka cluster region as preferred region. See list of azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet&preserve-view=true). |
+| `kafka.connect.cosmos.applicationName` | `""` | Application name. Will be added as the userAgent suffix. |
+| `kafka.connect.cosmos.throughputControl.enabled` | `false` | A flag to indicate whether throughput control is enabled. |
+| `kafka.connect.cosmos.throughputControl.accountEndpoint` | `""` | Cosmos DB Throughput Control Account Endpoint Uri. |
+| `kafka.connect.cosmos.throughputControl.account.azureEnvironment` | `Azure` | The azure environment of the CosmosDB account: `Azure`, `AzureChina`, `AzureUsGovernment`, `AzureGermany`. |
+| `kafka.connect.cosmos.throughputControl.account.tenantId` | `""` | The tenantId of the CosmosDB account. Required for `ServicePrincipal` authentication. |
+| `kafka.connect.cosmos.throughputControl.auth.type`                         | `MasterKey`  | Two auth types are supported currently: `MasterKey` (PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadOnlyKeys), `ServicePrincipal` |
+| `kafka.connect.cosmos.throughputControl.accountKey` | `""` | Cosmos DB Throughput Control Account Key (only required in case of `throughputControl.auth.type` as `MasterKey`). |
+| `kafka.connect.cosmos.throughputControl.auth.aad.clientId` | `""` | The clientId/ApplicationId of the service principal. Required for `ServicePrincipal` authentication. |
+| `kafka.connect.cosmos.throughputControl.auth.aad.clientSecret`             | `""`         | The client secret/password of the service principal. Required for `ServicePrincipal` authentication. |
+| `kafka.connect.cosmos.throughputControl.preferredRegionsList` | `[]` | Preferred regions list to be used for a multi region Cosmos DB account. This is a comma separated value (e.g., `[East US, West US]` or `East US, West US`) provided preferred regions will be used as hint. You should use a collocated kafka cluster with your Cosmos DB account and pass the kafka cluster region as preferred region. See list of azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet&preserve-view=true). |
+| `kafka.connect.cosmos.throughputControl.useGatewayMode` | `false` | Flag to indicate whether to use gateway mode. By default it is false. |
+| `kafka.connect.cosmos.throughputControl.name` | `""` | Throughput control group name. Since customer is allowed to create many groups for a container, the name should be unique. |
+| `kafka.connect.cosmos.throughputControl.targetThroughput` | `-1` | Throughput control group target throughput. The value should be larger than 0. |
+| `kafka.connect.cosmos.throughputControl.targetThroughputThreshold` | `-1` | Throughput control group target throughput threshold. The value should be between (0,1]. |
+| `kafka.connect.cosmos.throughputControl.priorityLevel` | `None` | Throughput control group priority level. The value can be None, High or Low. |
+| `kafka.connect.cosmos.throughputControl.globalControl.database` | `""` | Database which will be used for throughput global control. |
+| `kafka.connect.cosmos.throughputControl.globalControl.container` | `""` | Container which will be used for throughput global control. |
+| `kafka.connect.cosmos.throughputControl.globalControl.renewIntervalInMS` | `-1` | This controls how often the client is going to update the throughput usage of itself and adjust its own throughput share based on the throughput usage of other clients. Default is 5s, the allowed min value is 5s. |
+| `kafka.connect.cosmos.throughputControl.globalControl.expireIntervalInMS` | `-1` | This controls how quickly we will detect the client has been offline and hence allow its throughput share to be taken by other clients. Default is 11s, the allowed min value is 2 * renewIntervalInMS + 1. |
## Source Connector Configuration
| Config Property Name | Default | Description |
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml b/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
index 75b35046faf8c..9de684e2470cd 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
@@ -44,6 +44,7 @@ Licensed under the MIT License.
true
+ --add-opens com.azure.core/com.azure.core.implementation.util=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.apachecommons.lang=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.caches=ALL-UNNAMED
@@ -116,6 +117,17 @@ Licensed under the MIT License.
json-path
2.9.0
+
+ com.azure
+ azure-identity
+ 1.11.4
+
+
+ com.azure
+ azure-core
+
+
+
org.apache.kafka
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java
index dc05ff76f91ec..af8a7a7004799 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java
@@ -20,6 +20,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
+import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateCosmosAccountAuthConfig;
import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateThroughputControlConfig;
/**
@@ -81,7 +82,8 @@ public Config validate(Map connectorConfigs) {
.stream()
.collect(Collectors.toMap(ConfigValue::name, Function.identity()));
- validateThroughputControlConfig(connectorConfigs, configValues);
+ validateCosmosAccountAuthConfig(configValues);
+ validateThroughputControlConfig(configValues);
return config;
}
}
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java
index 4752fdcfb07de..985f513193899 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java
@@ -43,6 +43,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
+import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateCosmosAccountAuthConfig;
import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateThroughputControlConfig;
/***
@@ -367,7 +368,8 @@ public Config validate(Map connectorConfigs) {
.stream()
.collect(Collectors.toMap(ConfigValue::name, Function.identity()));
- validateThroughputControlConfig(connectorConfigs, configValues);
+ validateCosmosAccountAuthConfig(configValues);
+ validateThroughputControlConfig(configValues);
return config;
}
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosAadAuthConfig.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosAadAuthConfig.java
new file mode 100644
index 0000000000000..cf6db409fa568
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosAadAuthConfig.java
@@ -0,0 +1,42 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.kafka.connect.implementation;
+
+import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
+
+import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
+
+public class CosmosAadAuthConfig implements CosmosAuthConfig {
+ private final String clientId;
+ private final String clientSecret;
+ private final String tenantId;
+ private final CosmosAzureEnvironments azureEnvironment;
+
+ public CosmosAadAuthConfig(String clientId, String clientSecret, String tenantId, CosmosAzureEnvironments azureEnvironment) {
+ checkArgument(StringUtils.isNotEmpty(clientId), "Argument 'clientId' should not be null");
+ checkArgument(StringUtils.isNotEmpty(clientSecret), "Argument 'clientSecret' should not be null");
+ checkArgument(StringUtils.isNotEmpty(tenantId), "Argument 'tenantId' should not be null");
+
+ this.clientId = clientId;
+ this.clientSecret = clientSecret;
+ this.tenantId = tenantId;
+ this.azureEnvironment = azureEnvironment;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public String getClientSecret() {
+ return clientSecret;
+ }
+
+ public String getTenantId() {
+ return tenantId;
+ }
+
+ public CosmosAzureEnvironments getAzureEnvironment() {
+ return azureEnvironment;
+ }
+}
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosAccountConfig.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosAccountConfig.java
index 49e2f731a05a3..7fed496d7d86d 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosAccountConfig.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosAccountConfig.java
@@ -8,26 +8,27 @@
import java.util.List;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
+import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
public class CosmosAccountConfig {
private final String endpoint;
- private final String accountKey;
+ private final CosmosAuthConfig cosmosAuthConfig;
private final String applicationName;
private final boolean useGatewayMode;
private final List preferredRegionsList;
public CosmosAccountConfig(
String endpoint,
- String accountKey,
+ CosmosAuthConfig cosmosAuthConfig,
String applicationName,
boolean useGatewayMode,
List preferredRegionsList) {
checkArgument(StringUtils.isNotEmpty(endpoint), "Argument 'endpoint' should not be null");
- checkArgument(StringUtils.isNotEmpty(accountKey), "Argument 'accountKey' should not be null");
+ checkNotNull(cosmosAuthConfig, "Argument 'cosmosAuthConfig' should not be null");
this.endpoint = endpoint;
- this.accountKey = accountKey;
+ this.cosmosAuthConfig = cosmosAuthConfig;
this.applicationName = applicationName;
this.useGatewayMode = useGatewayMode;
this.preferredRegionsList = preferredRegionsList;
@@ -37,8 +38,8 @@ public String getEndpoint() {
return endpoint;
}
- public String getAccountKey() {
- return accountKey;
+ public CosmosAuthConfig getCosmosAuthConfig() {
+ return cosmosAuthConfig;
}
public String getApplicationName() {
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosAuthConfig.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosAuthConfig.java
new file mode 100644
index 0000000000000..2eddbf77edd01
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosAuthConfig.java
@@ -0,0 +1,7 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.kafka.connect.implementation;
+
+public interface CosmosAuthConfig {
+}
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosAuthTypes.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosAuthTypes.java
new file mode 100644
index 0000000000000..abb5a685a6ef9
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosAuthTypes.java
@@ -0,0 +1,29 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.kafka.connect.implementation;
+
+public enum CosmosAuthTypes {
+ MASTER_KEY("MasterKey"),
+ SERVICE_PRINCIPAL("ServicePrincipal");
+
+ private final String name;
+
+ CosmosAuthTypes(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public static CosmosAuthTypes fromName(String name) {
+ for (CosmosAuthTypes authTypes : CosmosAuthTypes.values()) {
+ if (authTypes.getName().equalsIgnoreCase(name)) {
+ return authTypes;
+ }
+ }
+
+ return null;
+ }
+}
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosAzureEnvironments.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosAzureEnvironments.java
new file mode 100644
index 0000000000000..c9e53b734a6ad
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosAzureEnvironments.java
@@ -0,0 +1,30 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.kafka.connect.implementation;
+
+public enum CosmosAzureEnvironments {
+    AZURE("Azure"),
+ AZURE_CHINA("AzureChina"),
+ AZURE_US_GOVERNMENT("AzureUsGovernment"),
+ AZURE_GERMANY("AzureGermany");
+
+ private final String name;
+
+ CosmosAzureEnvironments(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public static CosmosAzureEnvironments fromName(String name) {
+ for (CosmosAzureEnvironments azureEnvironment : CosmosAzureEnvironments.values()) {
+ if (azureEnvironment.getName().equalsIgnoreCase(name)) {
+ return azureEnvironment;
+ }
+ }
+ return null;
+ }
+}
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientStore.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientStore.java
index 10589369d0e8d..6a8f284e76a0e 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientStore.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientStore.java
@@ -8,10 +8,24 @@
import com.azure.cosmos.GatewayConnectionConfig;
import com.azure.cosmos.ThrottlingRetryOptions;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
+import com.azure.identity.ClientSecretCredential;
+import com.azure.identity.ClientSecretCredentialBuilder;
import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
public class CosmosClientStore {
+    // TODO[Public Preview]: revalidate how to get the active directory endpoint map. It is supposed to come from the management SDK.
+ private static final Map ACTIVE_DIRECTORY_ENDPOINT_MAP;
+ static {
+ ACTIVE_DIRECTORY_ENDPOINT_MAP = new HashMap<>();
+ ACTIVE_DIRECTORY_ENDPOINT_MAP.put(CosmosAzureEnvironments.AZURE, "https://login.microsoftonline.com/");
+ ACTIVE_DIRECTORY_ENDPOINT_MAP.put(CosmosAzureEnvironments.AZURE_CHINA, "https://login.chinacloudapi.cn/");
+ ACTIVE_DIRECTORY_ENDPOINT_MAP.put(CosmosAzureEnvironments.AZURE_US_GOVERNMENT, "https://login.microsoftonline.us/");
+ ACTIVE_DIRECTORY_ENDPOINT_MAP.put(CosmosAzureEnvironments.AZURE_GERMANY, "https://login.microsoftonline.de/");
+ }
+
public static CosmosAsyncClient getCosmosClient(CosmosAccountConfig accountConfig) {
if (accountConfig == null) {
return null;
@@ -19,7 +33,6 @@ public static CosmosAsyncClient getCosmosClient(CosmosAccountConfig accountConfi
CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
.endpoint(accountConfig.getEndpoint())
- .key(accountConfig.getAccountKey())
.preferredRegions(accountConfig.getPreferredRegionsList())
.throttlingRetryOptions(
new ThrottlingRetryOptions()
@@ -31,6 +44,22 @@ public static CosmosAsyncClient getCosmosClient(CosmosAccountConfig accountConfi
cosmosClientBuilder.gatewayMode(new GatewayConnectionConfig().setMaxConnectionPoolSize(10000));
}
+ if (accountConfig.getCosmosAuthConfig() instanceof CosmosMasterKeyAuthConfig) {
+ cosmosClientBuilder.key(((CosmosMasterKeyAuthConfig) accountConfig.getCosmosAuthConfig()).getMasterKey());
+ } else if (accountConfig.getCosmosAuthConfig() instanceof CosmosAadAuthConfig) {
+
+ CosmosAadAuthConfig aadAuthConfig = (CosmosAadAuthConfig) accountConfig.getCosmosAuthConfig();
+ ClientSecretCredential tokenCredential = new ClientSecretCredentialBuilder()
+ .authorityHost(ACTIVE_DIRECTORY_ENDPOINT_MAP.get(aadAuthConfig.getAzureEnvironment()).replaceAll("/$", "") + "/")
+ .tenantId(aadAuthConfig.getTenantId())
+ .clientId(aadAuthConfig.getClientId())
+ .clientSecret(aadAuthConfig.getClientSecret())
+ .build();
+ cosmosClientBuilder.credential(tokenCredential);
+ } else {
+            throw new IllegalArgumentException("Authorization type " + accountConfig.getCosmosAuthConfig().getClass() + " is not supported");
+ }
+
return cosmosClientBuilder.buildAsyncClient();
}
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosMasterKeyAuthConfig.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosMasterKeyAuthConfig.java
new file mode 100644
index 0000000000000..d1029f9588ec0
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosMasterKeyAuthConfig.java
@@ -0,0 +1,22 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.kafka.connect.implementation;
+
+import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
+
+import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
+
+public class CosmosMasterKeyAuthConfig implements CosmosAuthConfig {
+ private final String masterKey;
+
+ public CosmosMasterKeyAuthConfig(String masterKey) {
+ checkArgument(StringUtils.isNotEmpty(masterKey), "Argument 'masterKey' should not be null");
+
+ this.masterKey = masterKey;
+ }
+
+ public String getMasterKey() {
+ return masterKey;
+ }
+}
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosThroughputControlConfig.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosThroughputControlConfig.java
index f900a1846a84f..ffe7331fa3e29 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosThroughputControlConfig.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosThroughputControlConfig.java
@@ -3,10 +3,7 @@
package com.azure.cosmos.kafka.connect.implementation;
-import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
-
import java.time.Duration;
-import java.util.List;
public class CosmosThroughputControlConfig {
private final boolean throughputControlEnabled;
@@ -22,11 +19,7 @@ public class CosmosThroughputControlConfig {
public CosmosThroughputControlConfig(
boolean throughputControlEnabled,
- String throughputControlAccountEndpoint,
- String throughputControlAccountKey,
- List throughputControlAccountPreferredRegionList,
- boolean throughputControlUseGatewayMode,
- String applicationName,
+ CosmosAccountConfig throughputControlAccountConfig,
String throughputControlGroupName,
int targetThroughput,
double targetThroughputThreshold,
@@ -37,17 +30,7 @@ public CosmosThroughputControlConfig(
int globalThroughputControlExpireIntervalInMs) {
this.throughputControlEnabled = throughputControlEnabled;
- if (!throughputControlEnabled || StringUtils.isEmpty(throughputControlAccountEndpoint)) {
- this.throughputControlAccountConfig = null;
- } else {
- this.throughputControlAccountConfig =
- new CosmosAccountConfig(
- throughputControlAccountEndpoint,
- throughputControlAccountKey,
- applicationName,
- throughputControlUseGatewayMode,
- throughputControlAccountPreferredRegionList);
- }
+ this.throughputControlAccountConfig = throughputControlAccountConfig;
this.throughputControlGroupName = throughputControlGroupName;
this.targetThroughput = targetThroughput;
this.targetThroughputThreshold = targetThroughputThreshold;
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConfig.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConfig.java
index 10d115ac247fc..f52f310963ad3 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConfig.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConfig.java
@@ -10,6 +10,7 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.common.config.types.Password;
import java.net.MalformedURLException;
import java.net.URL;
@@ -31,9 +32,36 @@ public class KafkaCosmosConfig extends AbstractConfig {
private static final String ACCOUNT_ENDPOINT_DOC = "Cosmos DB Account Endpoint Uri.";
private static final String ACCOUNT_ENDPOINT_DISPLAY = "Cosmos DB Account Endpoint Uri.";
+ private static final String ACCOUNT_AZURE_ENVIRONMENT = CONFIG_PREFIX + "account.azureEnvironment";
+ private static final String ACCOUNT_AZURE_ENVIRONMENT_DOC = "The azure environment of the CosmosDB account: `Azure`, `AzureChina`, `AzureUsGovernment`, `AzureGermany`.";
+ private static final String ACCOUNT_AZURE_ENVIRONMENT_DISPLAY = "The azure environment of the CosmosDB account.";
+ private static final String DEFAULT_ACCOUNT_AZURE_ENVIRONMENT = CosmosAzureEnvironments.AZURE.getName();
+
+ private static final String ACCOUNT_TENANT_ID = CONFIG_PREFIX + "account.tenantId";
+ private static final String ACCOUNT_TENANT_ID_DOC = "The tenantId of the CosmosDB account. Required for `ServicePrincipal` authentication.";
+ private static final String ACCOUNT_TENANT_ID_DISPLAY = "The tenantId of the CosmosDB account.";
+ private static final String DEFAULT_ACCOUNT_TENANT_ID = Strings.Emtpy;
+
+ private static final String AUTH_TYPE = CONFIG_PREFIX + "auth.type";
+    private static final String AUTH_TYPE_DOC = "Two auth types are supported currently: "
+        + "`MasterKey`(PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadOnlyKeys), `ServicePrincipal`";
+ private static final String AUTH_TYPE_DISPLAY = "Cosmos Auth type.";
+ private static final String DEFAULT_AUTH_TYPE = CosmosAuthTypes.MASTER_KEY.getName();
+
private static final String ACCOUNT_KEY = CONFIG_PREFIX + "accountKey";
- private static final String ACCOUNT_KEY_DOC = "Cosmos DB Account Key.";
+ private static final String ACCOUNT_KEY_DOC = "Cosmos DB Account Key (only required in case of `auth.type` as `MasterKey`)";
private static final String ACCOUNT_KEY_DISPLAY = "Cosmos DB Account Key.";
+ private static final String DEFAULT_ACCOUNT_KEY = Strings.Emtpy;
+
+ private static final String AAD_CLIENT_ID = CONFIG_PREFIX + "auth.aad.clientId";
+ private static final String AAD_CLIENT_ID_DOC = "The clientId/ApplicationId of the service principal. Required for `ServicePrincipal` authentication.";
+ private static final String AAD_CLIENT_ID_DISPLAY = "The clientId/ApplicationId of the service principal.";
+ private static final String DEFAULT_AAD_CLIENT_ID = Strings.Emtpy;
+
+ private static final String AAD_CLIENT_SECRET = CONFIG_PREFIX + "auth.aad.clientSecret";
+ private static final String AAD_CLIENT_SECRET_DOC = "The client secret/password of the service principal. Required for `ServicePrincipal` authentication.";
+ private static final String AAD_CLIENT_SECRET_DISPLAY = "The client secret/password of the service principal.";
+ private static final String DEFAULT_AAD_CLIENT_SECRET = Strings.Emtpy;
private static final String USE_GATEWAY_MODE = CONFIG_PREFIX + "useGatewayMode";
private static final String USE_GATEWAY_MODE_DOC = "Flag to indicate whether to use gateway mode. By default it is false.";
@@ -62,11 +90,37 @@ public class KafkaCosmosConfig extends AbstractConfig {
private static final String THROUGHPUT_CONTROL_ACCOUNT_ENDPOINT_DISPLAY = "Cosmos DB Throughput Control Account Endpoint Uri.";
private static final String DEFAULT_THROUGHPUT_CONTROL_ACCOUNT_ENDPOINT = Strings.Emtpy;
+ private static final String THROUGHPUT_CONTROL_ACCOUNT_AZURE_ENVIRONMENT = CONFIG_PREFIX + "throughputControl.account.azureEnvironment";
+ private static final String THROUGHPUT_CONTROL_ACCOUNT_AZURE_ENVIRONMENT_DOC = "The azure environment of the CosmosDB account: `Azure`, `AzureChina`, `AzureUsGovernment`, `AzureGermany`.";
+ private static final String THROUGHPUT_CONTROL_ACCOUNT_AZURE_ENVIRONMENT_DISPLAY = "The azure environment of the CosmosDB account.";
+ private static final String DEFAULT_THROUGHPUT_CONTROL_ACCOUNT_AZURE_ENVIRONMENT = CosmosAzureEnvironments.AZURE.getName();
+
+ private static final String THROUGHPUT_CONTROL_ACCOUNT_TENANT_ID = CONFIG_PREFIX + "throughputControl.account.tenantId";
+ private static final String THROUGHPUT_CONTROL_ACCOUNT_TENANT_ID_DOC = "The tenantId of the CosmosDB account. Required for `ServicePrincipal` authentication.";
+ private static final String THROUGHPUT_CONTROL_ACCOUNT_TENANT_ID_DISPLAY = "The tenantId of the CosmosDB account.";
+ private static final String DEFAULT_THROUGHPUT_CONTROL_ACCOUNT_TENANT_ID = Strings.Emtpy;
+
+ private static final String THROUGHPUT_CONTROL_AUTH_TYPE = CONFIG_PREFIX + "throughputControl.auth.type";
+ private static final String THROUGHPUT_CONTROL_AUTH_TYPE_DOC = "Two auth types are supported currently: "
+ + "`MasterKey`(PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadOnlyKeys), `ServicePrincipal`";
+ private static final String THROUGHPUT_CONTROL_AUTH_TYPE_DISPLAY = "Cosmos Auth type.";
+ private static final String DEFAULT_THROUGHPUT_CONTROL_AUTH_TYPE = CosmosAuthTypes.MASTER_KEY.getName();
+
private static final String THROUGHPUT_CONTROL_ACCOUNT_KEY = CONFIG_PREFIX + "throughputControl.accountKey";
- private static final String THROUGHPUT_CONTROL_ACCOUNT_KEY_DOC = "Cosmos DB Throughput Control Account Key.";
+ private static final String THROUGHPUT_CONTROL_ACCOUNT_KEY_DOC = "Cosmos DB Throughput Control Account Key (only required when `throughputControl.auth.type` is `MasterKey`)";
private static final String THROUGHPUT_CONTROL_ACCOUNT_KEY_DISPLAY = "Cosmos DB Throughput Control Account Key.";
private static final String DEFAULT_THROUGHPUT_CONTROL_ACCOUNT_KEY = Strings.Emtpy;
+ private static final String THROUGHPUT_CONTROL_AAD_CLIENT_ID = CONFIG_PREFIX + "throughputControl.auth.aad.clientId";
+ private static final String THROUGHPUT_CONTROL_AAD_CLIENT_ID_DOC = "The clientId/ApplicationId of the service principal. Required for `ServicePrincipal` authentication.";
+ private static final String THROUGHPUT_CONTROL_AAD_CLIENT_ID_DISPLAY = "The clientId/ApplicationId of the service principal.";
+ private static final String DEFAULT_THROUGHPUT_CONTROL_AAD_CLIENT_ID = Strings.Emtpy;
+
+ private static final String THROUGHPUT_CONTROL_AAD_CLIENT_SECRET = CONFIG_PREFIX + "throughputControl.auth.aad.clientSecret";
+ private static final String THROUGHPUT_CONTROL_AAD_CLIENT_SECRET_DOC = "The client secret/password of the service principal. Required for `ServicePrincipal` authentication.";
+ private static final String THROUGHPUT_CONTROL_AAD_CLIENT_SECRET_DISPLAY = "The client secret/password of the service principal.";
+ private static final String DEFAULT_THROUGHPUT_CONTROL_AAD_CLIENT_SECRET = Strings.Emtpy;
+
private static final String THROUGHPUT_CONTROL_PREFERRED_REGIONS_LIST = CONFIG_PREFIX + "throughputControl.preferredRegionsList";
private static final String THROUGHPUT_CONTROL_PREFERRED_REGIONS_LIST_DOC = "Preferred regions list to be used for a multi region Cosmos DB account. "
+ "This is a comma separated value (e.g., `[East US, West US]` or `East US, West US`) provided preferred regions will be used as hint. "
@@ -139,26 +193,99 @@ public KafkaCosmosConfig(ConfigDef config, Map parsedConfig) {
}
private CosmosAccountConfig parseAccountConfig() {
- String endpoint = this.getString(ACCOUNT_ENDPOINT);
- String accountKey = this.getPassword(ACCOUNT_KEY).value();
- String applicationName = this.getString(APPLICATION_NAME);
- boolean useGatewayMode = this.getBoolean(USE_GATEWAY_MODE);
- List preferredRegionList = this.getPreferredRegionList();
+ return parseAccountConfigCore(
+ ACCOUNT_ENDPOINT,
+ ACCOUNT_AZURE_ENVIRONMENT,
+ ACCOUNT_TENANT_ID,
+ AUTH_TYPE,
+ ACCOUNT_KEY,
+ AAD_CLIENT_ID,
+ AAD_CLIENT_SECRET,
+ APPLICATION_NAME,
+ USE_GATEWAY_MODE,
+ PREFERRED_REGIONS_LIST);
+ }
+
+ private CosmosAccountConfig parseAccountConfigCore(
+ String accountEndpointConfig,
+ String accountAzureEnvironmentConfig,
+ String accountTenantIdConfig,
+ String authTypeConfig,
+ String accountKeyConfig,
+ String clientIdConfig,
+ String clientSecretConfig,
+ String applicationNameConfig,
+ String useGatewayModeConfig,
+ String preferredRegionListConfig) {
+
+ String endpoint = this.getString(accountEndpointConfig);
+ CosmosAzureEnvironments azureEnvironment = this.parseAzureEnvironment(accountAzureEnvironmentConfig);
+ String tenantId = this.getString(accountTenantIdConfig);
+ CosmosAuthTypes authType = this.parseCosmosAuthType(authTypeConfig);
+ String masterKey = this.getPassword(accountKeyConfig).value();
+ String clientId = this.getString(clientIdConfig);
+ String clientSecret = this.getPassword(clientSecretConfig).value();
+ CosmosAuthConfig authConfig = getAuthConfig(azureEnvironment, tenantId, authType, masterKey, clientId, clientSecret);
+
+ String applicationName = this.getString(applicationNameConfig);
+ boolean useGatewayMode = this.getBoolean(useGatewayModeConfig);
+ List preferredRegionList = this.getPreferredRegionList(preferredRegionListConfig);
return new CosmosAccountConfig(
endpoint,
- accountKey,
+ authConfig,
applicationName,
useGatewayMode,
preferredRegionList);
}
+ private CosmosAuthConfig getAuthConfig(
+ CosmosAzureEnvironments azureEnvironment,
+ String tenantId,
+ CosmosAuthTypes authType,
+ String masterKey,
+ String clientId,
+ String clientSecret) {
+
+ switch (authType) {
+ case MASTER_KEY:
+ return new CosmosMasterKeyAuthConfig(masterKey);
+ case SERVICE_PRINCIPAL:
+ return new CosmosAadAuthConfig(clientId, clientSecret, tenantId, azureEnvironment);
+ default:
+ throw new IllegalArgumentException("AuthType " + authType + " is not supported");
+ }
+ }
+
+ private CosmosAzureEnvironments parseAzureEnvironment(String configName) {
+ String authType = this.getString(configName);
+ return CosmosAzureEnvironments.fromName(authType);
+ }
+
+ private CosmosAuthTypes parseCosmosAuthType(String configName) {
+ String authType = this.getString(configName);
+ return CosmosAuthTypes.fromName(authType);
+ }
+
private CosmosThroughputControlConfig parseThroughputControlConfig() {
boolean enabled = this.getBoolean(THROUGHPUT_CONTROL_ENABLED);
- String throughputControlEndpoint = this.getString(THROUGHPUT_CONTROL_ACCOUNT_ENDPOINT);
- String throughputControlAccountKey = this.getPassword(THROUGHPUT_CONTROL_ACCOUNT_KEY).value();
- List throughputControlPreferredRegionList = this.getThroughputControlPreferredRegionList();
- boolean throughputControlUseGatewayMode = this.getBoolean(THROUGHPUT_CONTROL_USE_GATEWAY_MODE);
+ String accountEndpoint = this.getString(THROUGHPUT_CONTROL_ACCOUNT_ENDPOINT);
+
+ CosmosAccountConfig throughputControlAccountConfig = null;
+ if (enabled && StringUtils.isNotEmpty(accountEndpoint)) {
+ throughputControlAccountConfig = parseAccountConfigCore(
+ THROUGHPUT_CONTROL_ACCOUNT_ENDPOINT,
+ THROUGHPUT_CONTROL_ACCOUNT_AZURE_ENVIRONMENT,
+ THROUGHPUT_CONTROL_ACCOUNT_TENANT_ID,
+ THROUGHPUT_CONTROL_AUTH_TYPE,
+ THROUGHPUT_CONTROL_ACCOUNT_KEY,
+ THROUGHPUT_CONTROL_AAD_CLIENT_ID,
+ THROUGHPUT_CONTROL_AAD_CLIENT_SECRET,
+ APPLICATION_NAME,
+ THROUGHPUT_CONTROL_USE_GATEWAY_MODE,
+ THROUGHPUT_CONTROL_PREFERRED_REGIONS_LIST);
+ }
+
String throughputControlGroupName = this.getString(THROUGHPUT_CONTROL_GROUP_NAME);
int targetThroughput = this.getInt(THROUGHPUT_CONTROL_TARGET_THROUGHPUT);
double targetThroughputThreshold = this.getDouble(THROUGHPUT_CONTROL_TARGET_THROUGHPUT_THRESHOLD);
@@ -167,15 +294,10 @@ private CosmosThroughputControlConfig parseThroughputControlConfig() {
String globalControlContainerName = this.getString(THROUGHPUT_CONTROL_GLOBAL_CONTROL_CONTAINER);
int globalThroughputControlRenewInterval = this.getInt(THROUGHPUT_CONTROL_GLOBAL_CONTROL_RENEW_INTERVAL_IN_MS);
int globalThroughputControlExpireInterval = this.getInt(THROUGHPUT_CONTROL_GLOBAL_CONTROL_EXPIRE_INTERVAL_IN_MS);
- String applicationName = this.getString(APPLICATION_NAME);
return new CosmosThroughputControlConfig(
enabled,
- throughputControlEndpoint,
- throughputControlAccountKey,
- throughputControlPreferredRegionList,
- throughputControlUseGatewayMode,
- applicationName,
+ throughputControlAccountConfig,
throughputControlGroupName,
targetThroughput,
targetThroughputThreshold,
@@ -186,12 +308,8 @@ private CosmosThroughputControlConfig parseThroughputControlConfig() {
globalThroughputControlExpireInterval);
}
- private List getPreferredRegionList() {
- return convertToList(this.getString(PREFERRED_REGIONS_LIST));
- }
-
- private List getThroughputControlPreferredRegionList() {
- return convertToList(this.getString(THROUGHPUT_CONTROL_PREFERRED_REGIONS_LIST));
+ private List getPreferredRegionList(String preferredRegionListConfig) {
+ return convertToList(this.getString(preferredRegionListConfig));
}
private CosmosPriorityLevel parsePriorityLevel() {
@@ -226,17 +344,74 @@ private static void defineAccountConfig(ConfigDef result) {
ConfigDef.Width.LONG,
ACCOUNT_ENDPOINT_DISPLAY
)
+ .define(
+ ACCOUNT_AZURE_ENVIRONMENT,
+ ConfigDef.Type.STRING,
+ DEFAULT_ACCOUNT_AZURE_ENVIRONMENT,
+ new AzureEnvironmentValidator(),
+ ConfigDef.Importance.MEDIUM,
+ ACCOUNT_AZURE_ENVIRONMENT_DOC,
+ accountGroupName,
+ accountGroupOrder++,
+ ConfigDef.Width.LONG,
+ ACCOUNT_AZURE_ENVIRONMENT_DISPLAY
+ )
+ .define(
+ ACCOUNT_TENANT_ID,
+ ConfigDef.Type.STRING,
+ DEFAULT_ACCOUNT_TENANT_ID,
+ ConfigDef.Importance.MEDIUM,
+ ACCOUNT_TENANT_ID_DOC,
+ accountGroupName,
+ accountGroupOrder++,
+ ConfigDef.Width.LONG,
+ ACCOUNT_TENANT_ID_DISPLAY
+ )
+ .define(
+ AUTH_TYPE,
+ ConfigDef.Type.STRING,
+ DEFAULT_AUTH_TYPE,
+ new AuthTypeValidator(),
+ ConfigDef.Importance.MEDIUM,
+ AUTH_TYPE_DOC,
+ accountGroupName,
+ accountGroupOrder++,
+ ConfigDef.Width.MEDIUM,
+ AUTH_TYPE_DISPLAY
+ )
.define(
ACCOUNT_KEY,
ConfigDef.Type.PASSWORD,
- ConfigDef.NO_DEFAULT_VALUE,
- ConfigDef.Importance.HIGH,
+ DEFAULT_ACCOUNT_KEY,
+ ConfigDef.Importance.MEDIUM,
ACCOUNT_KEY_DOC,
accountGroupName,
accountGroupOrder++,
ConfigDef.Width.LONG,
ACCOUNT_KEY_DISPLAY
)
+ .define(
+ AAD_CLIENT_ID,
+ ConfigDef.Type.STRING,
+ DEFAULT_AAD_CLIENT_ID,
+ ConfigDef.Importance.MEDIUM,
+ AAD_CLIENT_ID_DOC,
+ accountGroupName,
+ accountGroupOrder++,
+ ConfigDef.Width.MEDIUM,
+ AAD_CLIENT_ID_DISPLAY
+ )
+ .define(
+ AAD_CLIENT_SECRET,
+ ConfigDef.Type.PASSWORD,
+ DEFAULT_AAD_CLIENT_SECRET,
+ ConfigDef.Importance.MEDIUM,
+ AAD_CLIENT_SECRET_DOC,
+ accountGroupName,
+ accountGroupOrder++,
+ ConfigDef.Width.MEDIUM,
+ AAD_CLIENT_SECRET_DISPLAY
+ )
.define(
APPLICATION_NAME,
ConfigDef.Type.STRING,
@@ -300,6 +475,41 @@ private static void defineThroughputControlConfig(ConfigDef result) {
ConfigDef.Width.LONG,
THROUGHPUT_CONTROL_ACCOUNT_ENDPOINT_DISPLAY
)
+ .define(
+ THROUGHPUT_CONTROL_ACCOUNT_AZURE_ENVIRONMENT,
+ ConfigDef.Type.STRING,
+ DEFAULT_THROUGHPUT_CONTROL_ACCOUNT_AZURE_ENVIRONMENT,
+ new AzureEnvironmentValidator(),
+ ConfigDef.Importance.LOW,
+ THROUGHPUT_CONTROL_ACCOUNT_AZURE_ENVIRONMENT_DOC,
+ throughputControlGroupName,
+ throughputControlGroupOrder++,
+ ConfigDef.Width.LONG,
+ THROUGHPUT_CONTROL_ACCOUNT_AZURE_ENVIRONMENT_DISPLAY
+ )
+ .define(
+ THROUGHPUT_CONTROL_ACCOUNT_TENANT_ID,
+ ConfigDef.Type.STRING,
+ DEFAULT_THROUGHPUT_CONTROL_ACCOUNT_TENANT_ID,
+ ConfigDef.Importance.LOW,
+ THROUGHPUT_CONTROL_ACCOUNT_TENANT_ID_DOC,
+ throughputControlGroupName,
+ throughputControlGroupOrder++,
+ ConfigDef.Width.LONG,
+ THROUGHPUT_CONTROL_ACCOUNT_TENANT_ID_DISPLAY
+ )
+ .define(
+ THROUGHPUT_CONTROL_AUTH_TYPE,
+ ConfigDef.Type.STRING,
+ DEFAULT_THROUGHPUT_CONTROL_AUTH_TYPE,
+ new AuthTypeValidator(),
+ ConfigDef.Importance.LOW,
+ THROUGHPUT_CONTROL_AUTH_TYPE_DOC,
+ throughputControlGroupName,
+ throughputControlGroupOrder++,
+ ConfigDef.Width.MEDIUM,
+ THROUGHPUT_CONTROL_AUTH_TYPE_DISPLAY
+ )
.define(
THROUGHPUT_CONTROL_ACCOUNT_KEY,
ConfigDef.Type.PASSWORD,
@@ -311,6 +521,28 @@ private static void defineThroughputControlConfig(ConfigDef result) {
ConfigDef.Width.LONG,
THROUGHPUT_CONTROL_ACCOUNT_KEY_DISPLAY
)
+ .define(
+ THROUGHPUT_CONTROL_AAD_CLIENT_ID,
+ ConfigDef.Type.STRING,
+ DEFAULT_THROUGHPUT_CONTROL_AAD_CLIENT_ID,
+ ConfigDef.Importance.LOW,
+ THROUGHPUT_CONTROL_AAD_CLIENT_ID_DOC,
+ throughputControlGroupName,
+ throughputControlGroupOrder++,
+ ConfigDef.Width.MEDIUM,
+ THROUGHPUT_CONTROL_AAD_CLIENT_ID_DISPLAY
+ )
+ .define(
+ THROUGHPUT_CONTROL_AAD_CLIENT_SECRET,
+ ConfigDef.Type.PASSWORD,
+ DEFAULT_THROUGHPUT_CONTROL_AAD_CLIENT_SECRET,
+ ConfigDef.Importance.LOW,
+ THROUGHPUT_CONTROL_AAD_CLIENT_SECRET_DOC,
+ throughputControlGroupName,
+ throughputControlGroupOrder++,
+ ConfigDef.Width.MEDIUM,
+ THROUGHPUT_CONTROL_AAD_CLIENT_SECRET_DISPLAY
+ )
.define(
THROUGHPUT_CONTROL_PREFERRED_REGIONS_LIST,
ConfigDef.Type.STRING,
@@ -443,18 +675,16 @@ protected static List convertToList(String configValue) {
return new ArrayList<>();
}
- public static void validateThroughputControlConfig(
- Map connectorConfigs,
- Map configValueMap) {
+ public static void validateThroughputControlConfig(Map configValueMap) {
- boolean throughputControlEnabled = Boolean.parseBoolean(connectorConfigs.get(THROUGHPUT_CONTROL_ENABLED));
+ boolean throughputControlEnabled = Boolean.parseBoolean(configValueMap.get(THROUGHPUT_CONTROL_ENABLED).value().toString());
if (!throughputControlEnabled) {
return;
}
// throughput control enabled, validate required configs
// throughput control group name is required
- String throughputControlGroupName = connectorConfigs.get(THROUGHPUT_CONTROL_GROUP_NAME);
+ String throughputControlGroupName = configValueMap.get(THROUGHPUT_CONTROL_GROUP_NAME).value().toString();
if (StringUtils.isEmpty(throughputControlGroupName)) {
configValueMap
.get(THROUGHPUT_CONTROL_GROUP_NAME)
@@ -462,9 +692,9 @@ public static void validateThroughputControlConfig(
}
// one of targetThroughput, targetThroughputThreshold, priorityLevel should be defined
- int targetThroughput = Integer.parseInt(connectorConfigs.get(THROUGHPUT_CONTROL_TARGET_THROUGHPUT));
- double targetThroughputThreshold = Double.parseDouble(connectorConfigs.get(THROUGHPUT_CONTROL_TARGET_THROUGHPUT_THRESHOLD));
- String priorityLevel = connectorConfigs.get(THROUGHPUT_CONTROL_PRIORITY_LEVEL);
+ int targetThroughput = Integer.parseInt(configValueMap.get(THROUGHPUT_CONTROL_TARGET_THROUGHPUT).value().toString());
+ double targetThroughputThreshold = Double.parseDouble(configValueMap.get(THROUGHPUT_CONTROL_TARGET_THROUGHPUT_THRESHOLD).value().toString());
+ String priorityLevel = configValueMap.get(THROUGHPUT_CONTROL_PRIORITY_LEVEL).value().toString();
if (targetThroughput <= 0 && targetThroughputThreshold <= 0
&& priorityLevel.equalsIgnoreCase(CosmosPriorityLevel.NONE.getName())) {
@@ -480,7 +710,7 @@ public static void validateThroughputControlConfig(
}
// throughput control databaseName is required
- String throughputControlDatabaseName = connectorConfigs.get(THROUGHPUT_CONTROL_GLOBAL_CONTROL_DATABASE);
+ String throughputControlDatabaseName = configValueMap.get(THROUGHPUT_CONTROL_GLOBAL_CONTROL_DATABASE).value().toString();
if (StringUtils.isEmpty(throughputControlDatabaseName)) {
configValueMap
.get(THROUGHPUT_CONTROL_GLOBAL_CONTROL_DATABASE)
@@ -488,12 +718,92 @@ public static void validateThroughputControlConfig(
}
// throughput control containerName is required
- String throughputControlContainerName = connectorConfigs.get(THROUGHPUT_CONTROL_GLOBAL_CONTROL_CONTAINER);
+ String throughputControlContainerName = configValueMap.get(THROUGHPUT_CONTROL_GLOBAL_CONTROL_CONTAINER).value().toString();
if (StringUtils.isEmpty(throughputControlContainerName)) {
configValueMap
.get(THROUGHPUT_CONTROL_GLOBAL_CONTROL_CONTAINER)
.addErrorMessage("ThroughputControl is enabled, throughput control container name can not be null or empty");
}
+
+ String throughputControlAccountEndpoint = configValueMap.get(THROUGHPUT_CONTROL_ACCOUNT_ENDPOINT).value().toString();
+ if (StringUtils.isNotEmpty(throughputControlAccountEndpoint)) {
+ validateAccountAuthConfigCore(
+ configValueMap,
+ THROUGHPUT_CONTROL_ACCOUNT_TENANT_ID,
+ THROUGHPUT_CONTROL_AUTH_TYPE,
+ THROUGHPUT_CONTROL_ACCOUNT_KEY,
+ THROUGHPUT_CONTROL_AAD_CLIENT_ID,
+ THROUGHPUT_CONTROL_AAD_CLIENT_SECRET);
+ }
+
+ // if throughput control is using aad auth, then only targetThroughput is supported
+ String throughputControlAuthTypeString =
+ StringUtils.isNotEmpty(throughputControlAccountEndpoint)
+ ? configValueMap.get(THROUGHPUT_CONTROL_AUTH_TYPE).value().toString() : configValueMap.get(AUTH_TYPE).value().toString();
+ CosmosAuthTypes throughputControlAuthType = CosmosAuthTypes.fromName(throughputControlAuthTypeString);
+ if (throughputControlAuthType == CosmosAuthTypes.SERVICE_PRINCIPAL) {
+ if (targetThroughputThreshold > 0) {
+ configValueMap
+ .get(THROUGHPUT_CONTROL_TARGET_THROUGHPUT_THRESHOLD)
+ .addErrorMessage("TargetThroughputThreshold is not supported when using aad auth");
+ }
+ }
+ }
+
+ public static void validateCosmosAccountAuthConfig(Map configValueMap) {
+
+ validateAccountAuthConfigCore(
+ configValueMap,
+ ACCOUNT_TENANT_ID,
+ AUTH_TYPE,
+ ACCOUNT_KEY,
+ AAD_CLIENT_ID,
+ AAD_CLIENT_SECRET);
+ }
+
+ public static void validateAccountAuthConfigCore(
+ Map configValueMap,
+ String accountTenantIdConfig,
+ String authTypeConfig,
+ String accountKeyConfig,
+ String clientIdConfig,
+ String clientSecretConfig) {
+
+ CosmosAuthTypes authType = CosmosAuthTypes.fromName(configValueMap.get(authTypeConfig).value().toString());
+ switch (authType) {
+ case MASTER_KEY:
+ String masterKey = ((Password) configValueMap.get(accountKeyConfig).value()).value();
+ if (StringUtils.isEmpty(masterKey)) {
+ configValueMap
+ .get(accountKeyConfig)
+ .addErrorMessage("MasterKey is required for masterKey auth type");
+ }
+ break;
+ case SERVICE_PRINCIPAL:
+ String tenantId = configValueMap.get(accountTenantIdConfig).value().toString();
+ if (StringUtils.isEmpty(tenantId)) {
+ configValueMap
+ .get(accountTenantIdConfig)
+ .addErrorMessage("TenantId is required for Service Principal auth type");
+ }
+
+ String clientId = configValueMap.get(clientIdConfig).value().toString();
+ if (StringUtils.isEmpty(clientId)) {
+ configValueMap
+ .get(clientIdConfig)
+ .addErrorMessage("ClientId is required for Service Principal auth type");
+ }
+
+ String clientSecret = ((Password) configValueMap.get(clientSecretConfig).value()).value();
+ if (StringUtils.isEmpty(clientSecret)) {
+ configValueMap
+ .get(clientSecretConfig)
+ .addErrorMessage("ClientSecret is required for Service Principal auth type");
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("AuthType " + authType + " is not supported");
+ }
}
public static class AccountEndpointValidator implements ConfigDef.Validator {
@@ -553,4 +863,46 @@ public String toString() {
return "Containers topic map";
}
}
+
+ public static class AuthTypeValidator implements ConfigDef.Validator {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void ensureValid(String name, Object o) {
+ String authTypeString = (String) o;
+ if (StringUtils.isEmpty(authTypeString)) {
+ throw new ConfigException(name, o, "AuthType can not be empty or null");
+ }
+
+ CosmosAuthTypes authType = CosmosAuthTypes.fromName(authTypeString);
+ if (authType == null) {
+ throw new ConfigException(name, o, "Invalid AuthType, only allow MasterKey or ServicePrincipal");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "AuthType. Only allow " + CosmosAuthTypes.values();
+ }
+ }
+
+ public static class AzureEnvironmentValidator implements ConfigDef.Validator {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void ensureValid(String name, Object o) {
+ String azureEnvironmentString = (String) o;
+ if (StringUtils.isEmpty(azureEnvironmentString)) {
+ throw new ConfigException(name, o, "AzureEnvironment can not be empty or null");
+ }
+
+ CosmosAzureEnvironments azureEnvironment = CosmosAzureEnvironments.fromName(azureEnvironmentString);
+ if (azureEnvironment == null) {
+ throw new ConfigException(name, o, "Invalid AzureEnvironment, only allow `Azure`, `AzureChina`, `AzureUsGovernment`, `AzureGermany`");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "AzureEnvironment. Only allow " + CosmosAzureEnvironments.values();
+ }
+ }
}
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosBulkWriter.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosBulkWriter.java
similarity index 99%
rename from sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosBulkWriter.java
rename to sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosBulkWriter.java
index 61b67b44f9c49..31024a36a3d34 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosBulkWriter.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosBulkWriter.java
@@ -35,8 +35,8 @@
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
-public class KafkaCosmosBulkWriter extends KafkaCosmosWriterBase {
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaCosmosBulkWriter.class);
+public class CosmosBulkWriter extends CosmosWriterBase {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CosmosBulkWriter.class);
private static final int MAX_DELAY_ON_408_REQUEST_TIMEOUT_IN_MS = 10000;
private static final int MIN_DELAY_ON_408_REQUEST_TIMEOUT_IN_MS = 1000;
private static final Random RANDOM = new Random();
@@ -45,7 +45,7 @@ public class KafkaCosmosBulkWriter extends KafkaCosmosWriterBase {
private final CosmosThroughputControlConfig throughputControlConfig;
private final Sinks.EmitFailureHandler emitFailureHandler;
- public KafkaCosmosBulkWriter(
+ public CosmosBulkWriter(
CosmosSinkWriteConfig writeConfig,
CosmosThroughputControlConfig throughputControlConfig,
ErrantRecordReporter errantRecordReporter) {
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosPointWriter.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosPointWriter.java
similarity index 98%
rename from sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosPointWriter.java
rename to sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosPointWriter.java
index cc5013e954b2f..886240c6ba561 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosPointWriter.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosPointWriter.java
@@ -21,13 +21,13 @@
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
-public class KafkaCosmosPointWriter extends KafkaCosmosWriterBase {
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaCosmosPointWriter.class);
+public class CosmosPointWriter extends CosmosWriterBase {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CosmosPointWriter.class);
private final CosmosSinkWriteConfig writeConfig;
private final CosmosThroughputControlConfig throughputControlConfig;
- public KafkaCosmosPointWriter(
+ public CosmosPointWriter(
CosmosSinkWriteConfig writeConfig,
CosmosThroughputControlConfig throughputControlConfig,
ErrantRecordReporter errantRecordReporter) {
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java
index 9a3bf9e2565bc..9e8204418de1c 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java
@@ -42,13 +42,13 @@ public void start(Map props) {
if (this.sinkTaskConfig.getWriteConfig().isBulkEnabled()) {
this.cosmosWriter =
- new KafkaCosmosBulkWriter(
+ new CosmosBulkWriter(
this.sinkTaskConfig.getWriteConfig(),
this.sinkTaskConfig.getThroughputControlConfig(),
this.context.errantRecordReporter());
} else {
this.cosmosWriter =
- new KafkaCosmosPointWriter(
+ new CosmosPointWriter(
this.sinkTaskConfig.getWriteConfig(),
this.sinkTaskConfig.getThroughputControlConfig(),
context.errantRecordReporter());
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosWriterBase.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosWriterBase.java
similarity index 95%
rename from sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosWriterBase.java
rename to sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosWriterBase.java
index c4633cc2df60a..71ed4b840f3be 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosWriterBase.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosWriterBase.java
@@ -22,13 +22,13 @@
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
-public abstract class KafkaCosmosWriterBase implements IWriter {
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaCosmosWriterBase.class);
+public abstract class CosmosWriterBase implements IWriter {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CosmosWriterBase.class);
private static final String ID = "id";
private static final String ETAG = "_etag";
private final ErrantRecordReporter errantRecordReporter;
- public KafkaCosmosWriterBase(ErrantRecordReporter errantRecordReporter) {
+ public CosmosWriterBase(ErrantRecordReporter errantRecordReporter) {
this.errantRecordReporter = errantRecordReporter;
}
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/module-info.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/module-info.java
index aa261ccdbf404..02d92df0b3442 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/module-info.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/module-info.java
@@ -9,6 +9,7 @@
requires connect.api;
requires com.fasterxml.jackson.module.afterburner;
requires json.path;
+ requires com.azure.identity;
// public API surface area
exports com.azure.cosmos.kafka.connect;
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSinkConnectorITest.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSinkConnectorITest.java
index cfeb28f71d658..75929707ca19d 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSinkConnectorITest.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSinkConnectorITest.java
@@ -6,6 +6,7 @@
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.implementation.Utils;
+import com.azure.cosmos.kafka.connect.implementation.CosmosAuthTypes;
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig;
import com.fasterxml.jackson.databind.JsonNode;
@@ -15,6 +16,7 @@
import org.apache.kafka.connect.storage.StringConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.ArrayList;
@@ -28,25 +30,43 @@
public class CosmosSinkConnectorITest extends KafkaCosmosIntegrationTestSuiteBase {
private static final Logger logger = LoggerFactory.getLogger(CosmosSinkConnectorITest.class);
+ @DataProvider(name = "sinkAuthParameterProvider")
+ public static Object[][] sinkAuthParameterProvider() {
+ return new Object[][]{
+ // use masterKey auth
+ { true },
+ { false }
+ };
+ }
+
// TODO[public preview]: add more integration tests
- @Test(groups = { "kafka-integration"}, timeOut = TIMEOUT)
- public void sinkToSingleContainer() throws InterruptedException {
+ @Test(groups = { "kafka-integration"}, dataProvider = "sinkAuthParameterProvider", timeOut = TIMEOUT)
+ public void sinkToSingleContainer(boolean useMasterKey) throws InterruptedException {
Map sinkConnectorConfig = new HashMap<>();
+ String topicName = singlePartitionContainerName + "-" + UUID.randomUUID();
- sinkConnectorConfig.put("topics", singlePartitionContainerName);
+ sinkConnectorConfig.put("topics", topicName);
sinkConnectorConfig.put("value.converter", JsonConverter.class.getName());
// TODO[Public Preview]: add tests for with schema
sinkConnectorConfig.put("value.converter.schemas.enable", "false");
sinkConnectorConfig.put("key.converter", StringConverter.class.getName());
sinkConnectorConfig.put("connector.class", "com.azure.cosmos.kafka.connect.CosmosSinkConnector");
sinkConnectorConfig.put("kafka.connect.cosmos.accountEndpoint", KafkaCosmosTestConfigurations.HOST);
- sinkConnectorConfig.put("kafka.connect.cosmos.accountKey", KafkaCosmosTestConfigurations.MASTER_KEY);
sinkConnectorConfig.put("kafka.connect.cosmos.applicationName", "Test");
sinkConnectorConfig.put("kafka.connect.cosmos.sink.database.name", databaseName);
- sinkConnectorConfig.put("kafka.connect.cosmos.sink.containers.topicMap", singlePartitionContainerName + "#" + singlePartitionContainerName);
+ sinkConnectorConfig.put("kafka.connect.cosmos.sink.containers.topicMap", topicName + "#" + singlePartitionContainerName);
+
+ if (useMasterKey) {
+ sinkConnectorConfig.put("kafka.connect.cosmos.accountKey", KafkaCosmosTestConfigurations.MASTER_KEY);
+ } else {
+ sinkConnectorConfig.put("kafka.connect.cosmos.auth.type", CosmosAuthTypes.SERVICE_PRINCIPAL.getName());
+ sinkConnectorConfig.put("kafka.connect.cosmos.account.tenantId", KafkaCosmosTestConfigurations.ACCOUNT_TENANT_ID);
+ sinkConnectorConfig.put("kafka.connect.cosmos.auth.aad.clientId", KafkaCosmosTestConfigurations.ACCOUNT_AAD_CLIENT_ID);
+ sinkConnectorConfig.put("kafka.connect.cosmos.auth.aad.clientSecret", KafkaCosmosTestConfigurations.ACCOUNT_AAD_CLIENT_SECRET);
+ }
// Create topic ahead of time
- kafkaCosmosConnectContainer.createTopic(singlePartitionContainerName, 1);
+ kafkaCosmosConnectContainer.createTopic(topicName, 1);
CosmosSinkConfig sinkConfig = new CosmosSinkConfig(sinkConnectorConfig);
CosmosAsyncClient client = CosmosClientStore.getCosmosClient(sinkConfig.getAccountConfig());
@@ -65,7 +85,7 @@ public void sinkToSingleContainer() throws InterruptedException {
for (int i = 0; i < 10; i++) {
TestItem testItem = TestItem.createNewItem();
ProducerRecord<String, JsonNode> record =
- new ProducerRecord<>(singlePartitionContainerName, testItem.getId(), Utils.getSimpleObjectMapper().valueToTree(testItem));
+ new ProducerRecord<>(topicName, testItem.getId(), Utils.getSimpleObjectMapper().valueToTree(testItem));
kafkaProducer.send(record);
recordValueIds.add(testItem.getId());
}
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSinkConnectorTest.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSinkConnectorTest.java
index a24b1c0afaf0a..d4743d2c62abd 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSinkConnectorTest.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSinkConnectorTest.java
@@ -4,6 +4,8 @@
package com.azure.cosmos.kafka.connect;
import com.azure.cosmos.implementation.Strings;
+import com.azure.cosmos.implementation.TestConfigurations;
+import com.azure.cosmos.kafka.connect.implementation.CosmosAuthTypes;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTask;
import com.azure.cosmos.kafka.connect.implementation.sink.IdStrategies;
@@ -62,7 +64,6 @@ public void requiredConfig() {
Map<String, List<String>> errorMessages = config.configValues().stream()
.collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages));
assertThat(errorMessages.get("kafka.connect.cosmos.accountEndpoint").size()).isGreaterThan(0);
- assertThat(errorMessages.get("kafka.connect.cosmos.accountKey").size()).isGreaterThan(0);
assertThat(errorMessages.get("kafka.connect.cosmos.sink.database.name").size()).isGreaterThan(0);
assertThat(errorMessages.get("kafka.connect.cosmos.sink.containers.topicMap").size()).isGreaterThan(0);
}
@@ -153,13 +154,15 @@ public void sinkConfigWithThroughputControl() {
@Test(groups = { "unit" })
public void invalidThroughputControlConfig() {
+ CosmosSinkConnector sinkConnector = new CosmosSinkConnector();
+ // invalid targetThroughput, targetThroughputThreshold, priorityLevel config and missing required config for throughput control container info
Map<String, String> sinkConfigMap = this.getValidSinkConfig();
sinkConfigMap.put("kafka.connect.cosmos.throughputControl.enabled", "true");
sinkConfigMap.put("kafka.connect.cosmos.throughputControl.targetThroughput", "-1");
sinkConfigMap.put("kafka.connect.cosmos.throughputControl.targetThroughputThreshold", "-1");
sinkConfigMap.put("kafka.connect.cosmos.throughputControl.priorityLevel", "None");
- Config config = new CosmosSinkConnector().validate(sinkConfigMap);
+ Config config = sinkConnector.validate(sinkConfigMap);
Map<String, List<String>> errorMessages = config.configValues().stream()
.collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages));
assertThat(errorMessages.get("kafka.connect.cosmos.throughputControl.name").size()).isGreaterThan(0);
@@ -168,6 +171,37 @@ public void invalidThroughputControlConfig() {
assertThat(errorMessages.get("kafka.connect.cosmos.throughputControl.priorityLevel").size()).isGreaterThan(0);
assertThat(errorMessages.get("kafka.connect.cosmos.throughputControl.globalControl.database").size()).isGreaterThan(0);
assertThat(errorMessages.get("kafka.connect.cosmos.throughputControl.globalControl.container").size()).isGreaterThan(0);
+
+ // invalid throughput control account config with masterKey auth
+ sinkConfigMap = this.getValidSinkConfig();
+ sinkConfigMap.put("kafka.connect.cosmos.throughputControl.enabled", "true");
+ sinkConfigMap.put("kafka.connect.cosmos.throughputControl.targetThroughput", "1");
+ sinkConfigMap.put("kafka.connect.cosmos.throughputControl.globalControl.database", "ThroughputControlDatabase");
+ sinkConfigMap.put("kafka.connect.cosmos.throughputControl.globalControl.container", "ThroughputControlContainer");
+ sinkConfigMap.put("kafka.connect.cosmos.throughputControl.name", "groupName");
+ sinkConfigMap.put("kafka.connect.cosmos.throughputControl.accountEndpoint", TestConfigurations.HOST);
+
+ config = sinkConnector.validate(sinkConfigMap);
+ errorMessages = config.configValues().stream()
+ .collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages));
+ assertThat(errorMessages.get("kafka.connect.cosmos.throughputControl.accountKey").size()).isGreaterThan(0);
+
+ // targetThroughputThreshold is not supported when using AAD auth for throughput control
+ sinkConfigMap = this.getValidSinkConfig();
+ sinkConfigMap.put("kafka.connect.cosmos.throughputControl.enabled", "true");
+ sinkConfigMap.put("kafka.connect.cosmos.throughputControl.targetThroughputThreshold", "0.9");
+ sinkConfigMap.put("kafka.connect.cosmos.throughputControl.globalControl.database", "ThroughputControlDatabase");
+ sinkConfigMap.put("kafka.connect.cosmos.throughputControl.globalControl.container", "ThroughputControlContainer");
+ sinkConfigMap.put("kafka.connect.cosmos.throughputControl.name", "groupName");
+ sinkConfigMap.put("kafka.connect.cosmos.throughputControl.accountEndpoint", TestConfigurations.HOST);
+ sinkConfigMap.put("kafka.connect.cosmos.throughputControl.auth.type", CosmosAuthTypes.SERVICE_PRINCIPAL.getName());
+
+ config = sinkConnector.validate(sinkConfigMap);
+ errorMessages = config.configValues().stream()
+ .collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages));
+ assertThat(errorMessages.get("kafka.connect.cosmos.throughputControl.auth.aad.clientId").size()).isGreaterThan(0);
+ assertThat(errorMessages.get("kafka.connect.cosmos.throughputControl.auth.aad.clientSecret").size()).isGreaterThan(0);
+ assertThat(errorMessages.get("kafka.connect.cosmos.throughputControl.account.tenantId").size()).isGreaterThan(0);
}
private Map<String, String> getValidSinkConfig() {
@@ -183,13 +217,21 @@ private Map getValidSinkConfig() {
public static class SinkConfigs {
public static final List<KafkaCosmosConfigEntry<?>> ALL_VALID_CONFIGS = Arrays.asList(
new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.accountEndpoint", null, false),
- new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.accountKey", null, false),
+ new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.account.tenantId", Strings.Emtpy, true),
+ new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.auth.type", CosmosAuthTypes.MASTER_KEY.getName(), true),
+ new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.accountKey", Strings.Emtpy, true, true),
+ new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.auth.aad.clientId", Strings.Emtpy, true),
+ new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.auth.aad.clientSecret", Strings.Emtpy, true, true),
new KafkaCosmosConfigEntry<Boolean>("kafka.connect.cosmos.useGatewayMode", false, true),
new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.preferredRegionsList", Strings.Emtpy, true),
new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.applicationName", Strings.Emtpy, true),
new KafkaCosmosConfigEntry<>("kafka.connect.cosmos.throughputControl.enabled", false, true),
new KafkaCosmosConfigEntry<>("kafka.connect.cosmos.throughputControl.accountEndpoint", Strings.Emtpy, true),
+ new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.throughputControl.account.tenantId", Strings.Emtpy, true),
+ new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.throughputControl.auth.type", CosmosAuthTypes.MASTER_KEY.getName(), true),
new KafkaCosmosConfigEntry<>("kafka.connect.cosmos.throughputControl.accountKey", Strings.Emtpy, true, true),
+ new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.throughputControl.auth.aad.clientId", Strings.Emtpy, true),
+ new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.throughputControl.auth.aad.clientSecret", Strings.Emtpy, true, true),
new KafkaCosmosConfigEntry<>("kafka.connect.cosmos.throughputControl.preferredRegionsList", Strings.Emtpy, true),
new KafkaCosmosConfigEntry<>("kafka.connect.cosmos.throughputControl.useGatewayMode", false, true),
new KafkaCosmosConfigEntry<>("kafka.connect.cosmos.throughputControl.name", Strings.Emtpy, true),
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorITest.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorITest.java
index 057755a47fc25..94ace901f907a 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorITest.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorITest.java
@@ -5,6 +5,7 @@
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
+import com.azure.cosmos.kafka.connect.implementation.CosmosAuthTypes;
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceConfig;
import com.fasterxml.jackson.databind.JsonNode;
@@ -13,6 +14,7 @@
import org.rnorth.ducttape.unreliables.Unreliables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.time.Duration;
@@ -30,20 +32,40 @@
public class CosmosSourceConnectorITest extends KafkaCosmosIntegrationTestSuiteBase {
private static final Logger logger = LoggerFactory.getLogger(CosmosSourceConnectorITest.class);
+ @DataProvider(name = "sourceAuthParameterProvider")
+ public static Object[][] sourceAuthParameterProvider() {
+ return new Object[][]{
+ // use masterKey auth
+ { true },
+ { false }
+ };
+ }
+
// TODO[public preview]: add more integration tests
- @Test(groups = { "kafka-integration"}, timeOut = TIMEOUT)
- public void readFromSingleContainer() {
+ @Test(groups = { "kafka-integration"}, dataProvider = "sourceAuthParameterProvider", timeOut = TIMEOUT)
+ public void readFromSingleContainer(boolean useMasterKey) {
+ String topicName = singlePartitionContainerName + "-" + UUID.randomUUID();
+
Map<String, String> sourceConnectorConfig = new HashMap<>();
sourceConnectorConfig.put("connector.class", "com.azure.cosmos.kafka.connect.CosmosSourceConnector");
sourceConnectorConfig.put("kafka.connect.cosmos.accountEndpoint", KafkaCosmosTestConfigurations.HOST);
- sourceConnectorConfig.put("kafka.connect.cosmos.accountKey", KafkaCosmosTestConfigurations.MASTER_KEY);
sourceConnectorConfig.put("kafka.connect.cosmos.applicationName", "Test");
sourceConnectorConfig.put("kafka.connect.cosmos.source.database.name", databaseName);
sourceConnectorConfig.put("kafka.connect.cosmos.source.containers.includeAll", "false");
sourceConnectorConfig.put("kafka.connect.cosmos.source.containers.includedList", singlePartitionContainerName);
+ sourceConnectorConfig.put("kafka.connect.cosmos.source.containers.topicMap", topicName + "#" + singlePartitionContainerName);
+
+ if (useMasterKey) {
+ sourceConnectorConfig.put("kafka.connect.cosmos.accountKey", KafkaCosmosTestConfigurations.MASTER_KEY);
+ } else {
+ sourceConnectorConfig.put("kafka.connect.cosmos.auth.type", CosmosAuthTypes.SERVICE_PRINCIPAL.getName());
+ sourceConnectorConfig.put("kafka.connect.cosmos.account.tenantId", KafkaCosmosTestConfigurations.ACCOUNT_TENANT_ID);
+ sourceConnectorConfig.put("kafka.connect.cosmos.auth.aad.clientId", KafkaCosmosTestConfigurations.ACCOUNT_AAD_CLIENT_ID);
+ sourceConnectorConfig.put("kafka.connect.cosmos.auth.aad.clientSecret", KafkaCosmosTestConfigurations.ACCOUNT_AAD_CLIENT_SECRET);
+ }
// Create topic ahead of time
- kafkaCosmosConnectContainer.createTopic(singlePartitionContainerName, 1);
+ kafkaCosmosConnectContainer.createTopic(topicName, 1);
CosmosSourceConfig sourceConfig = new CosmosSourceConfig(sourceConnectorConfig);
CosmosAsyncClient client = CosmosClientStore.getCosmosClient(sourceConfig.getAccountConfig());
@@ -66,7 +88,7 @@ public void readFromSingleContainer() {
KafkaConsumer<String, JsonNode> kafkaConsumer = kafkaCosmosConnectContainer.getConsumer();
kafkaConsumer.subscribe(
Arrays.asList(
- singlePartitionContainerName,
+ topicName,
sourceConfig.getMetadataConfig().getMetadataTopicName()));
List<ConsumerRecord<String, JsonNode>> metadataRecords = new ArrayList<>();
@@ -76,7 +98,7 @@ public void readFromSingleContainer() {
kafkaConsumer.poll(Duration.ofMillis(1000))
.iterator()
.forEachRemaining(consumerRecord -> {
- if (consumerRecord.topic().equals(singlePartitionContainerName)) {
+ if (consumerRecord.topic().equals(topicName)) {
itemRecords.add(consumerRecord);
} else if (consumerRecord.topic().equals(sourceConfig.getMetadataConfig().getMetadataTopicName())) {
metadataRecords.add(consumerRecord);
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorTest.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorTest.java
index 447c3d9267992..15b5501d02280 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorTest.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorTest.java
@@ -6,7 +6,6 @@
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.Strings;
-import com.azure.cosmos.implementation.TestConfigurations;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
@@ -16,6 +15,7 @@
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.query.CompositeContinuationToken;
+import com.azure.cosmos.kafka.connect.implementation.CosmosAuthTypes;
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosChangeFeedModes;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosChangeFeedStartFromModes;
@@ -90,8 +90,8 @@ public void getTaskConfigsWithoutPersistedOffset() throws JsonProcessingExceptio
CosmosSourceConnector sourceConnector = new CosmosSourceConnector();
try {
Map<String, String> sourceConfigMap = new HashMap<>();
- sourceConfigMap.put("kafka.connect.cosmos.accountEndpoint", TestConfigurations.HOST);
- sourceConfigMap.put("kafka.connect.cosmos.accountKey", TestConfigurations.MASTER_KEY);
+ sourceConfigMap.put("kafka.connect.cosmos.accountEndpoint", KafkaCosmosTestConfigurations.HOST);
+ sourceConfigMap.put("kafka.connect.cosmos.accountKey", KafkaCosmosTestConfigurations.MASTER_KEY);
sourceConfigMap.put("kafka.connect.cosmos.source.database.name", databaseName);
List<String> containersIncludedList = Arrays.asList(
singlePartitionContainerName,
@@ -163,8 +163,8 @@ public void getTaskConfigsAfterSplit() throws JsonProcessingException {
try {
Map<String, String> sourceConfigMap = new HashMap<>();
- sourceConfigMap.put("kafka.connect.cosmos.accountEndpoint", TestConfigurations.HOST);
- sourceConfigMap.put("kafka.connect.cosmos.accountKey", TestConfigurations.MASTER_KEY);
+ sourceConfigMap.put("kafka.connect.cosmos.accountEndpoint", KafkaCosmosTestConfigurations.HOST);
+ sourceConfigMap.put("kafka.connect.cosmos.accountKey", KafkaCosmosTestConfigurations.MASTER_KEY);
sourceConfigMap.put("kafka.connect.cosmos.source.database.name", databaseName);
List<String> containersIncludedList = Arrays.asList(multiPartitionContainerName);
sourceConfigMap.put("kafka.connect.cosmos.source.containers.includedList", containersIncludedList.toString());
@@ -258,8 +258,8 @@ public void getTaskConfigsAfterMerge() throws JsonProcessingException {
try {
Map<String, String> sourceConfigMap = new HashMap<>();
- sourceConfigMap.put("kafka.connect.cosmos.accountEndpoint", TestConfigurations.HOST);
- sourceConfigMap.put("kafka.connect.cosmos.accountKey", TestConfigurations.MASTER_KEY);
+ sourceConfigMap.put("kafka.connect.cosmos.accountEndpoint", KafkaCosmosTestConfigurations.HOST);
+ sourceConfigMap.put("kafka.connect.cosmos.accountKey", KafkaCosmosTestConfigurations.MASTER_KEY);
sourceConfigMap.put("kafka.connect.cosmos.source.database.name", databaseName);
List<String> containersIncludedList = Arrays.asList(singlePartitionContainerName);
sourceConfigMap.put("kafka.connect.cosmos.source.containers.includedList", containersIncludedList.toString());
@@ -467,13 +467,16 @@ public void sourceConfigWithThroughputControl() {
@Test(groups = { "unit" })
public void invalidThroughputControlConfig() {
+ CosmosSourceConnector sourceConnector = new CosmosSourceConnector();
+ // invalid targetThroughput, targetThroughputThreshold, priorityLevel config and missing required config for throughput control container info
+
Map<String, String> sourceConfigMap = this.getValidSourceConfig();
sourceConfigMap.put("kafka.connect.cosmos.throughputControl.enabled", "true");
sourceConfigMap.put("kafka.connect.cosmos.throughputControl.targetThroughput", "-1");
sourceConfigMap.put("kafka.connect.cosmos.throughputControl.targetThroughputThreshold", "-1");
sourceConfigMap.put("kafka.connect.cosmos.throughputControl.priorityLevel", "None");
- Config config = new CosmosSourceConnector().validate(sourceConfigMap);
+ Config config = sourceConnector.validate(sourceConfigMap);
Map<String, List<String>> errorMessages = config.configValues().stream()
.collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages));
assertThat(errorMessages.get("kafka.connect.cosmos.throughputControl.name").size()).isGreaterThan(0);
@@ -482,12 +485,43 @@ public void invalidThroughputControlConfig() {
assertThat(errorMessages.get("kafka.connect.cosmos.throughputControl.priorityLevel").size()).isGreaterThan(0);
assertThat(errorMessages.get("kafka.connect.cosmos.throughputControl.globalControl.database").size()).isGreaterThan(0);
assertThat(errorMessages.get("kafka.connect.cosmos.throughputControl.globalControl.container").size()).isGreaterThan(0);
+
+ // invalid throughput control account config with masterKey auth
+ sourceConfigMap = this.getValidSourceConfig();
+ sourceConfigMap.put("kafka.connect.cosmos.throughputControl.enabled", "true");
+ sourceConfigMap.put("kafka.connect.cosmos.throughputControl.targetThroughput", "1");
+ sourceConfigMap.put("kafka.connect.cosmos.throughputControl.globalControl.database", "ThroughputControlDatabase");
+ sourceConfigMap.put("kafka.connect.cosmos.throughputControl.globalControl.container", "ThroughputControlContainer");
+ sourceConfigMap.put("kafka.connect.cosmos.throughputControl.name", "groupName");
+ sourceConfigMap.put("kafka.connect.cosmos.throughputControl.accountEndpoint", KafkaCosmosTestConfigurations.HOST);
+
+ config = sourceConnector.validate(sourceConfigMap);
+ errorMessages = config.configValues().stream()
+ .collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages));
+ assertThat(errorMessages.get("kafka.connect.cosmos.throughputControl.accountKey").size()).isGreaterThan(0);
+
+ // targetThroughputThreshold is not supported when using AAD auth for throughput control
+ sourceConfigMap = this.getValidSourceConfig();
+ sourceConfigMap.put("kafka.connect.cosmos.throughputControl.enabled", "true");
+ sourceConfigMap.put("kafka.connect.cosmos.throughputControl.targetThroughputThreshold", "0.9");
+ sourceConfigMap.put("kafka.connect.cosmos.throughputControl.globalControl.database", "ThroughputControlDatabase");
+ sourceConfigMap.put("kafka.connect.cosmos.throughputControl.globalControl.container", "ThroughputControlContainer");
+ sourceConfigMap.put("kafka.connect.cosmos.throughputControl.name", "groupName");
+ sourceConfigMap.put("kafka.connect.cosmos.throughputControl.accountEndpoint", KafkaCosmosTestConfigurations.HOST);
+ sourceConfigMap.put("kafka.connect.cosmos.throughputControl.auth.type", CosmosAuthTypes.SERVICE_PRINCIPAL.getName());
+
+ config = sourceConnector.validate(sourceConfigMap);
+ errorMessages = config.configValues().stream()
+ .collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages));
+ assertThat(errorMessages.get("kafka.connect.cosmos.throughputControl.auth.aad.clientId").size()).isGreaterThan(0);
+ assertThat(errorMessages.get("kafka.connect.cosmos.throughputControl.auth.aad.clientSecret").size()).isGreaterThan(0);
+ assertThat(errorMessages.get("kafka.connect.cosmos.throughputControl.account.tenantId").size()).isGreaterThan(0);
}
private Map<String, String> getValidSourceConfig() {
Map<String, String> sourceConfigMap = new HashMap<>();
- sourceConfigMap.put("kafka.connect.cosmos.accountEndpoint", TestConfigurations.HOST);
- sourceConfigMap.put("kafka.connect.cosmos.accountKey", TestConfigurations.MASTER_KEY);
+ sourceConfigMap.put("kafka.connect.cosmos.accountEndpoint", KafkaCosmosTestConfigurations.HOST);
+ sourceConfigMap.put("kafka.connect.cosmos.accountKey", KafkaCosmosTestConfigurations.MASTER_KEY);
sourceConfigMap.put("kafka.connect.cosmos.source.database.name", databaseName);
List<String> containersIncludedList = Arrays.asList(singlePartitionContainerName);
sourceConfigMap.put("kafka.connect.cosmos.source.containers.includedList", containersIncludedList.toString());
@@ -636,13 +670,21 @@ private void validateMetadataTask(
public static class SourceConfigs {
public static final List<KafkaCosmosConfigEntry<?>> ALL_VALID_CONFIGS = Arrays.asList(
new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.accountEndpoint", null, false),
- new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.accountKey", null, false),
+ new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.account.tenantId", Strings.Emtpy, true),
+ new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.auth.type", CosmosAuthTypes.MASTER_KEY.getName(), true),
+ new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.accountKey", Strings.Emtpy, true, true),
+ new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.auth.aad.clientId", Strings.Emtpy, true),
+ new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.auth.aad.clientSecret", Strings.Emtpy, true, true),
new KafkaCosmosConfigEntry<Boolean>("kafka.connect.cosmos.useGatewayMode", false, true),
new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.preferredRegionsList", Strings.Emtpy, true),
new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.applicationName", Strings.Emtpy, true),
new KafkaCosmosConfigEntry<>("kafka.connect.cosmos.throughputControl.enabled", false, true),
new KafkaCosmosConfigEntry<>("kafka.connect.cosmos.throughputControl.accountEndpoint", Strings.Emtpy, true),
+ new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.throughputControl.account.tenantId", Strings.Emtpy, true),
+ new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.throughputControl.auth.type", CosmosAuthTypes.MASTER_KEY.getName(), true),
new KafkaCosmosConfigEntry<>("kafka.connect.cosmos.throughputControl.accountKey", Strings.Emtpy, true, true),
+ new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.throughputControl.auth.aad.clientId", Strings.Emtpy, true),
+ new KafkaCosmosConfigEntry<String>("kafka.connect.cosmos.throughputControl.auth.aad.clientSecret", Strings.Emtpy, true, true),
new KafkaCosmosConfigEntry<>("kafka.connect.cosmos.throughputControl.preferredRegionsList", Strings.Emtpy, true),
new KafkaCosmosConfigEntry<>("kafka.connect.cosmos.throughputControl.useGatewayMode", false, true),
new KafkaCosmosConfigEntry<>("kafka.connect.cosmos.throughputControl.name", Strings.Emtpy, true),
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/KafkaCosmosTestConfigurations.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/KafkaCosmosTestConfigurations.java
index f26d86ce17286..bd59c470694a9 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/KafkaCosmosTestConfigurations.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/KafkaCosmosTestConfigurations.java
@@ -53,6 +53,27 @@ public class KafkaCosmosTestConfigurations {
StringUtils.defaultString(
Strings.emptyToNull(System.getenv().get("ACCOUNT_HOST")), COSMOS_EMULATOR_HOST));
+ public final static String ACCOUNT_TENANT_ID =
+ properties
+ .getProperty(
+ "ACCOUNT_TENANT_ID",
+ StringUtils.defaultString(
+ Strings.emptyToNull(System.getenv().get("ACCOUNT_TENANT_ID")), StringUtils.EMPTY));
+
+ public final static String ACCOUNT_AAD_CLIENT_ID =
+ properties
+ .getProperty(
+ "ACCOUNT_AAD_CLIENT_ID",
+ StringUtils.defaultString(
+ Strings.emptyToNull(System.getenv().get("ACCOUNT_AAD_CLIENT_ID")), StringUtils.EMPTY));
+
+ public final static String ACCOUNT_AAD_CLIENT_SECRET =
+ properties
+ .getProperty(
+ "ACCOUNT_AAD_CLIENT_SECRET",
+ StringUtils.defaultString(
+ Strings.emptyToNull(System.getenv().get("ACCOUNT_AAD_CLIENT_SECRET")), StringUtils.EMPTY));
+
public final static String KAFKA_CLUSTER_KEY =
properties
.getProperty(
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/source/MetadataMonitorThreadTest.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/source/MetadataMonitorThreadTest.java
index 35645a2e71f3d..95a5c0523789a 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/source/MetadataMonitorThreadTest.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/source/MetadataMonitorThreadTest.java
@@ -10,6 +10,7 @@
import com.azure.cosmos.kafka.connect.KafkaCosmosTestSuiteBase;
import com.azure.cosmos.kafka.connect.implementation.CosmosAccountConfig;
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
+import com.azure.cosmos.kafka.connect.implementation.CosmosMasterKeyAuthConfig;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.FeedRange;
import org.apache.kafka.connect.source.SourceConnectorContext;
@@ -33,7 +34,7 @@ public class MetadataMonitorThreadTest extends KafkaCosmosTestSuiteBase {
public void before_MetadataMonitorThreadTest() {
CosmosAccountConfig accountConfig = new CosmosAccountConfig(
TestConfigurations.HOST,
- TestConfigurations.MASTER_KEY,
+ new CosmosMasterKeyAuthConfig(TestConfigurations.MASTER_KEY),
"requestTaskReconfigurationTest",
false,
new ArrayList<String>());