Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

addServicePrincipalSupportInKafkaConnector #39490

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Added Source connector. See [PR 39410](https://github.com/Azure/azure-sdk-for-java/pull/39410)
* Added Sink connector. See [PR 39434](https://github.com/Azure/azure-sdk-for-java/pull/39434)
* Added throughput control support. See [PR 39218](https://github.com/Azure/azure-sdk-for-java/pull/39218)
* Added `ServicePrincipal` support - See [PR 39490](https://github.com/Azure/azure-sdk-for-java/pull/39490)

#### Breaking Changes

Expand Down

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Licensed under the MIT License.
<revapi.skip>true</revapi.skip>
<!-- Configures the Java 9+ run to perform the required module exports, opens, and reads that are necessary for testing but shouldn't be part of the module-info. -->
<javaModulesSurefireArgLine>
--add-opens com.azure.core/com.azure.core.implementation.util=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.apachecommons.lang=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.caches=ALL-UNNAMED
Expand Down Expand Up @@ -116,6 +117,17 @@ Licensed under the MIT License.
<artifactId>json-path</artifactId>
<version>2.9.0</version> <!-- {x-version-update;cosmos_com.jayway.jsonpath:json-path;external_dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.11.4</version> <!-- {x-version-update;com.azure:azure-identity;dependency} -->
<exclusions>
<exclusion>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

package com.azure.cosmos.kafka.connect;

import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.CosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTask;
import org.apache.kafka.common.config.Config;
Expand All @@ -20,7 +20,8 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateThroughputControlConfig;
import static com.azure.cosmos.kafka.connect.implementation.CosmosConfig.validateCosmosAccountAuthConfig;
import static com.azure.cosmos.kafka.connect.implementation.CosmosConfig.validateThroughputControlConfig;

/**
* A Sink connector that publishes topic messages to CosmosDB.
Expand Down Expand Up @@ -64,7 +65,7 @@ public ConfigDef config() {

@Override
public String version() {
return KafkaCosmosConstants.CURRENT_VERSION;
return CosmosConstants.CURRENT_VERSION;
}

@Override
Expand All @@ -81,7 +82,8 @@ public Config validate(Map<String, String> connectorConfigs) {
.stream()
.collect(Collectors.toMap(ConfigValue::name, Function.identity()));

validateThroughputControlConfig(connectorConfigs, configValues);
validateCosmosAccountAuthConfig(configValues);
validateThroughputControlConfig(configValues);
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.CosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceConfig;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceOffsetStorageReader;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceTask;
Expand Down Expand Up @@ -43,7 +43,8 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateThroughputControlConfig;
import static com.azure.cosmos.kafka.connect.implementation.CosmosConfig.validateCosmosAccountAuthConfig;
import static com.azure.cosmos.kafka.connect.implementation.CosmosConfig.validateThroughputControlConfig;

/***
* The CosmosDb source connector.
Expand Down Expand Up @@ -105,7 +106,7 @@ public ConfigDef config() {

@Override
public String version() {
return KafkaCosmosConstants.CURRENT_VERSION;
return CosmosConstants.CURRENT_VERSION;
}

private List<Map<String, String>> getTaskConfigs(int maxTasks) {
Expand Down Expand Up @@ -320,7 +321,7 @@ private List<FeedRange> getFeedRanges(CosmosContainerProperties containerPropert
.getContainer(containerProperties.getId())
.getFeedRanges()
.onErrorMap(throwable ->
KafkaCosmosExceptionsHelper.convertToConnectException(
CosmosExceptionsHelper.convertToConnectException(
throwable,
"GetFeedRanges failed for container " + containerProperties.getId()))
.block();
Expand Down Expand Up @@ -367,7 +368,8 @@ public Config validate(Map<String, String> connectorConfigs) {
.stream()
.collect(Collectors.toMap(ConfigValue::name, Function.identity()));

validateThroughputControlConfig(connectorConfigs, configValues);
validateCosmosAccountAuthConfig(configValues);
validateThroughputControlConfig(configValues);
return config;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect.implementation;

import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;

public class CosmosAadAuthConfig implements CosmosAuthConfig {
private final String clientId;
private final String clientSecret;
private final String tenantId;
private final CosmosAzureEnvironments azureEnvironment;

public CosmosAadAuthConfig(String clientId, String clientSecret, String tenantId, CosmosAzureEnvironments azureEnvironment) {
checkArgument(StringUtils.isNotEmpty(clientId), "Argument 'clientId' should not be null");
checkArgument(StringUtils.isNotEmpty(clientSecret), "Argument 'clientSecret' should not be null");
checkArgument(StringUtils.isNotEmpty(tenantId), "Argument 'tenantId' should not be null");

this.clientId = clientId;
this.clientSecret = clientSecret;
this.tenantId = tenantId;
this.azureEnvironment = azureEnvironment;
}

public String getClientId() {
return clientId;
}

public String getClientSecret() {
return clientSecret;
}

public String getTenantId() {
return tenantId;
}

public CosmosAzureEnvironments getAzureEnvironment() {
return azureEnvironment;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,27 @@
import java.util.List;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

public class CosmosAccountConfig {
private final String endpoint;
private final String accountKey;
private final CosmosAuthConfig cosmosAuthConfig;
private final String applicationName;
private final boolean useGatewayMode;
private final List<String> preferredRegionsList;

public CosmosAccountConfig(
String endpoint,
String accountKey,
CosmosAuthConfig cosmosAuthConfig,
String applicationName,
boolean useGatewayMode,
List<String> preferredRegionsList) {

checkArgument(StringUtils.isNotEmpty(endpoint), "Argument 'endpoint' should not be null");
checkArgument(StringUtils.isNotEmpty(accountKey), "Argument 'accountKey' should not be null");
checkNotNull(cosmosAuthConfig, "Argument 'cosmosAuthConfig' should not be null");

this.endpoint = endpoint;
this.accountKey = accountKey;
this.cosmosAuthConfig = cosmosAuthConfig;
this.applicationName = applicationName;
this.useGatewayMode = useGatewayMode;
this.preferredRegionsList = preferredRegionsList;
Expand All @@ -37,8 +38,8 @@ public String getEndpoint() {
return endpoint;
}

public String getAccountKey() {
return accountKey;
public CosmosAuthConfig getCosmosAuthConfig() {
return cosmosAuthConfig;
}

public String getApplicationName() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect.implementation;

public interface CosmosAuthConfig {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect.implementation;

public enum CosmosAuthTypes {
MASTER_KEY("MasterKey"),
SERVICE_PRINCIPAL("ServicePrincipal");

private final String name;

CosmosAuthTypes(String name) {
this.name = name;
}

public String getName() {
return name;
}

public static CosmosAuthTypes fromName(String name) {
for (CosmosAuthTypes authTypes : CosmosAuthTypes.values()) {
if (authTypes.getName().equalsIgnoreCase(name)) {
return authTypes;
}
}
return null;
xinlian12 marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect.implementation;

public enum CosmosAzureEnvironments {
AZURE("azure"),
AZURE_CHINA("AzureChina"),
AZURE_US_GOVERNMENT("AzureUsGovernment"),
AZURE_GERMANY("AzureGermany");
xinlian12 marked this conversation as resolved.
Show resolved Hide resolved

private final String name;

CosmosAzureEnvironments(String name) {
this.name = name;
}

public String getName() {
return name;
}

public static CosmosAzureEnvironments fromName(String name) {
for (CosmosAzureEnvironments azureEnvironment : CosmosAzureEnvironments.values()) {
if (azureEnvironment.getName().equalsIgnoreCase(name)) {
return azureEnvironment;
}
}
return null;
xinlian12 marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,27 @@
import com.azure.cosmos.GatewayConnectionConfig;
import com.azure.cosmos.ThrottlingRetryOptions;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;

import java.time.Duration;
import java.util.Map;

public class CosmosClientStore {
// TODO[Public Preview]: revalidate how to get the active directory endpoint map. It suppose to come from management SDK.
private final static Map<CosmosAzureEnvironments, String> activeDirectoryEndpointMap = Map.ofEntries(
Map.entry(CosmosAzureEnvironments.AZURE, "https://login.microsoftonline.com/"),
Map.entry(CosmosAzureEnvironments.AZURE_CHINA, "https://login.chinacloudapi.cn/"),
Map.entry(CosmosAzureEnvironments.AZURE_US_GOVERNMENT, "https://login.microsoftonline.us/"),
Map.entry(CosmosAzureEnvironments.AZURE_GERMANY, "https://login.microsoftonline.de/"));

public static CosmosAsyncClient getCosmosClient(CosmosAccountConfig accountConfig) {
if (accountConfig == null) {
return null;
}

CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
.endpoint(accountConfig.getEndpoint())
.key(accountConfig.getAccountKey())
.preferredRegions(accountConfig.getPreferredRegionsList())
.throttlingRetryOptions(
new ThrottlingRetryOptions()
Expand All @@ -31,14 +40,30 @@ public static CosmosAsyncClient getCosmosClient(CosmosAccountConfig accountConfi
cosmosClientBuilder.gatewayMode(new GatewayConnectionConfig().setMaxConnectionPoolSize(10000));
}

if (accountConfig.getCosmosAuthConfig() instanceof CosmosMasterKeyAuthConfig) {
cosmosClientBuilder.key(((CosmosMasterKeyAuthConfig) accountConfig.getCosmosAuthConfig()).getMasterKey());
} else if (accountConfig.getCosmosAuthConfig() instanceof CosmosAadAuthConfig) {

CosmosAadAuthConfig aadAuthConfig = (CosmosAadAuthConfig) accountConfig.getCosmosAuthConfig();
ClientSecretCredential tokenCredential = new ClientSecretCredentialBuilder()
.authorityHost(activeDirectoryEndpointMap.get(aadAuthConfig.getAzureEnvironment()).replaceAll("/$", "") +"/")
.tenantId(aadAuthConfig.getTenantId())
.clientId(aadAuthConfig.getClientId())
.clientSecret(aadAuthConfig.getClientSecret())
.build();
cosmosClientBuilder.credential(tokenCredential);
} else {
throw new IllegalArgumentException("Authorization type " + accountConfig.getCosmosAuthConfig().getClass() + "is not supported");
}

return cosmosClientBuilder.buildAsyncClient();
}

private static String getUserAgentSuffix(CosmosAccountConfig accountConfig) {
if (StringUtils.isNotEmpty(accountConfig.getApplicationName())) {
return KafkaCosmosConstants.USER_AGENT_SUFFIX + "|" + accountConfig.getApplicationName();
return CosmosConstants.USER_AGENT_SUFFIX + "|" + accountConfig.getApplicationName();
}

return KafkaCosmosConstants.USER_AGENT_SUFFIX;
return CosmosConstants.USER_AGENT_SUFFIX;
}
}
Loading
Loading