Skip to content

Commit

Permalink
Merge pull request #561 from xinlian12/moveLeaseContainerCreationInto…
Browse files Browse the repository at this point in the history
…Connector

MoveLeaseContainerCreationIntoConnector
  • Loading branch information
xinlian12 authored Mar 12, 2024
2 parents d66ca32 + cca995b commit 6ead162
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 87 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
## Release History

### 1.15.0-beta.1 (Unreleased)
#### New Features

#### Key Bug Fixes
* Fixed an issue where only 1 task run successfully when `CosmosDBSourceConnector` is configured with `maxTasks` larger than `1` - [PR 561](https://github.com/microsoft/kafka-connect-cosmosdb/pull/561)

#### Other Changes

### 1.14.1 (2024-02-29)
#### Key Bug Fixes
* Fixed `NullPointerException` in `CosmosDBSourceConnector`. [PR 555](https://github.com/microsoft/kafka-connect-cosmosdb/pull/555)
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<groupId>com.azure.cosmos.kafka</groupId>
<artifactId>kafka-connect-cosmos</artifactId>
<version>1.14.1</version>
<version>1.15.0-beta.1</version>

<name> kafka-connect-cosmos</name>
<url>https://github.com/microsoft/kafka-connect-cosmosdb</url>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect.implementations;

import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.kafka.connect.source.CosmosDBSourceConfig;
import org.apache.commons.lang3.StringUtils;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;

public class CosmosClientStore {
public static CosmosAsyncClient getCosmosClient(CosmosDBSourceConfig config, String userAgentSuffix) {
checkArgument(StringUtils.isNotEmpty(userAgentSuffix), "Argument 'userAgentSuffix' can not be null");

CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
.endpoint(config.getConnEndpoint())
.key(config.getConnKey())
.consistencyLevel(ConsistencyLevel.SESSION)
.contentResponseOnWriteEnabled(true)
.connectionSharingAcrossClientsEnabled(config.isConnectionSharingEnabled())
.userAgentSuffix(userAgentSuffix);

if (config.isGatewayModeEnabled()) {
cosmosClientBuilder.gatewayMode();
}

return cosmosClientBuilder.buildAsyncClient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class CosmosDBSourceConfig extends CosmosDBConfig {
private static final String COSMOS_USE_LATEST_OFFSET_DISPLAY = "Use latest offset";

static final String COSMOS_ASSIGNED_CONTAINER_CONF = "connect.cosmos.assigned.container";
static final String COSMOS_ASSIGNED_LEASE_CONTAINER_CONF = "connect.cosmos.assigned.lease.container";

static final String COSMOS_WORKER_NAME_CONF = "connect.cosmos.worker.name";
static final String COSMOS_WORKER_NAME_DEFAULT = "worker";
Expand All @@ -80,6 +81,7 @@ public class CosmosDBSourceConfig extends CosmosDBConfig {
// Variables not defined as Connect configs, should not be exposed when creating connector
private String workerName;
private String assignedContainer;
private String assignedLeaseContainer;

public CosmosDBSourceConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
Expand All @@ -98,6 +100,7 @@ public CosmosDBSourceConfig(Map<String, String> parsedConfig) {

// Since variables are not defined as Connect configs, grab values directly from Map
assignedContainer = parsedConfig.get(COSMOS_ASSIGNED_CONTAINER_CONF);
assignedLeaseContainer = parsedConfig.get(COSMOS_ASSIGNED_LEASE_CONTAINER_CONF);
workerName = parsedConfig.get(COSMOS_WORKER_NAME_CONF);
}

Expand Down Expand Up @@ -242,6 +245,10 @@ public String getAssignedContainer() {
return this.assignedContainer;
}

public String getAssignedLeaseContainer() {
return assignedLeaseContainer;
}

public String getWorkerName() {
return this.workerName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@

import java.util.function.Function;
import java.util.stream.Collectors;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.kafka.connect.CosmosDBConfig;
import com.azure.cosmos.kafka.connect.implementations.CosmosClientStore;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.ThroughputProperties;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
Expand All @@ -28,12 +39,20 @@ public class CosmosDBSourceConnector extends SourceConnector {

private static final Logger logger = LoggerFactory.getLogger(CosmosDBSourceConnector.class);
private CosmosDBSourceConfig config = null;
private CosmosAsyncClient cosmosClient = null;

@Override
public void start(Map<String, String> props) {
logger.info("Starting the Source Connector");
try {
config = new CosmosDBSourceConfig(props);
this.cosmosClient = CosmosClientStore.getCosmosClient(this.config, this.getUserAgentSuffix());

List<String> containerList = config.getTopicContainerMap().getContainerList();
for (String containerId : containerList) {
createLeaseContainerIfNotExists(cosmosClient, this.config.getDatabaseName(), this.getAssignedLeaseContainer(containerId));
}

} catch (ConfigException e) {
throw new ConnectException(
"Couldn't start CosmosDBSourceConnector due to configuration error", e);
Expand All @@ -59,8 +78,10 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
for (int i = 0; i < maxTasks; i++) {
// Equally distribute workers by assigning workers to containers in round-robin fashion.
Map<String, String> taskProps = config.originalsStrings();
taskProps.put(CosmosDBSourceConfig.COSMOS_ASSIGNED_CONTAINER_CONF,
containerList.get(i % containerList.size()));
String assignedContainer = containerList.get(i % containerList.size());

taskProps.put(CosmosDBSourceConfig.COSMOS_ASSIGNED_CONTAINER_CONF, assignedContainer);
taskProps.put(CosmosDBSourceConfig.COSMOS_ASSIGNED_LEASE_CONTAINER_CONF, this.getAssignedLeaseContainer(assignedContainer));
taskProps.put(CosmosDBSourceConfig.COSMOS_WORKER_NAME_CONF,
String.format("%s-%d-%d",
CosmosDBSourceConfig.COSMOS_WORKER_NAME_DEFAULT,
Expand All @@ -74,6 +95,9 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
@Override
public void stop() {
logger.info("Stopping CosmosDB Source Connector");
if (this.cosmosClient != null) {
this.cosmosClient.close();
}
}

@Override
Expand Down Expand Up @@ -101,4 +125,46 @@ public Config validate(Map<String, String> connectorConfigs) {

return config;
}

private String getAssignedLeaseContainer(String containerName) {
return containerName + "-leases";
}

private String getUserAgentSuffix() {
return CosmosDBConfig.COSMOS_CLIENT_USER_AGENT_SUFFIX + version();
}

private CosmosAsyncContainer createLeaseContainerIfNotExists(CosmosAsyncClient client, String databaseName, String leaseCollectionName) {
CosmosAsyncDatabase database = client.getDatabase(databaseName);
CosmosAsyncContainer leaseCollection = database.getContainer(leaseCollectionName);
CosmosContainerResponse leaseContainerResponse = null;

logger.info("Checking whether the lease container exists.");
try {
leaseContainerResponse = leaseCollection.read().block();
} catch (CosmosException ex) {
// Swallowing exceptions when the type is CosmosException and statusCode is 404
if (ex.getStatusCode() != 404) {
throw ex;
}
logger.info("Lease container does not exist {}", ex.getMessage());
}

if (leaseContainerResponse == null) {
logger.info("Creating the Lease container : {}", leaseCollectionName);
CosmosContainerProperties containerSettings = new CosmosContainerProperties(leaseCollectionName, "/id");
ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(400);
CosmosContainerRequestOptions requestOptions = new CosmosContainerRequestOptions();

try {
database.createContainer(containerSettings, throughputProperties, requestOptions).block();
} catch (Exception e) {
logger.error("Failed to create container {} in database {}", leaseCollectionName, databaseName);
throw e;
}
logger.info("Successfully created new lease container.");
}

return database.getContainer(leaseCollectionName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,22 @@

import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.ChangeFeedProcessorBuilder;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.kafka.connect.CosmosDBConfig;
import com.azure.cosmos.kafka.connect.TopicContainerMap;
import com.azure.cosmos.kafka.connect.implementations.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementations.CosmosKafkaSchedulers;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.ThroughputProperties;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -69,13 +64,13 @@ public void start(Map<String, String> map) {
this.queue = new LinkedTransferQueue<>();

logger.info("Worker {} Creating the client.", this.config.getWorkerName());
client = getCosmosClient(config);
client = CosmosClientStore.getCosmosClient(this.config, this.getUserAgentSuffix());

// Initialize the database, feed and lease containers
CosmosAsyncDatabase database = client.getDatabase(config.getDatabaseName());
String container = config.getAssignedContainer();
CosmosAsyncContainer feedContainer = database.getContainer(container);
leaseContainer = createNewLeaseContainer(client, config.getDatabaseName(), container + "-leases");
leaseContainer = database.getContainer(this.config.getAssignedLeaseContainer());

// Create source partition map
partitionMap = new HashMap<>();
Expand Down Expand Up @@ -212,34 +207,28 @@ public void stop() {
// NOTE: poll() method and stop() method are both called from the same thread,
// so it is important not to include any changes which may block both places forever
running.set(false);

// Release all the resources.
if (changeFeedProcessor != null) {
changeFeedProcessor.stop().block();
changeFeedProcessor = null;
}

if (this.client != null) {
this.client.close();
}
Mono.just(this)
.flatMap(connectorTask -> {
if (this.changeFeedProcessor != null) {
return this.changeFeedProcessor.stop()
.delayElement(Duration.ofMillis(500)) // delay some time here as the partitionProcessor will release the lease in background
.doOnNext(t -> {
this.changeFeedProcessor = null;
this.safeCloseClient();
});
} else {
this.safeCloseClient();
return Mono.empty();
}
})
.block();
}

private CosmosAsyncClient getCosmosClient(CosmosDBSourceConfig config) {
logger.info("Worker {} Creating Cosmos Client.", this.config.getWorkerName());

CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
.endpoint(config.getConnEndpoint())
.key(config.getConnKey())
.consistencyLevel(ConsistencyLevel.SESSION)
.contentResponseOnWriteEnabled(true)
.connectionSharingAcrossClientsEnabled(config.isConnectionSharingEnabled())
.userAgentSuffix(getUserAgentSuffix());

if (config.isGatewayModeEnabled()) {
cosmosClientBuilder.gatewayMode();
private void safeCloseClient() {
if (this.client != null) {
this.client.close();
}

return cosmosClientBuilder.buildAsyncClient();
}

private String getUserAgentSuffix() {
Expand Down Expand Up @@ -292,6 +281,7 @@ protected void handleCosmosDbChanges(List<JsonNode> docs) {
// The queue is being continuously polled and then put into a batch list, but the batch list is not being flushed right away
// until batch size or maxWaitTime reached. Which can cause CFP to checkpoint faster than kafka batch.
// In order to not move CFP checkpoint faster, we are using shouldFillMoreRecords to control the batch flush.
logger.debug("Transferring document " + this.config.getWorkerName());
this.queue.transfer(document);
} catch (InterruptedException e) {
logger.error("Interrupted! changeFeedReader.", e);
Expand All @@ -307,38 +297,4 @@ protected void handleCosmosDbChanges(List<JsonNode> docs) {
this.shouldFillMoreRecords.set(false);
}
}

private CosmosAsyncContainer createNewLeaseContainer(CosmosAsyncClient client, String databaseName, String leaseCollectionName) {
CosmosAsyncDatabase database = client.getDatabase(databaseName);
CosmosAsyncContainer leaseCollection = database.getContainer(leaseCollectionName);
CosmosContainerResponse leaseContainerResponse = null;

logger.info("Checking whether the lease container exists.");
try {
leaseContainerResponse = leaseCollection.read().block();
} catch (CosmosException ex) {
// Swallowing exceptions when the type is CosmosException and statusCode is 404
if (ex.getStatusCode() != 404) {
throw ex;
}
logger.info("Lease container does not exist {}", ex.getMessage());
}

if (leaseContainerResponse == null) {
logger.info("Creating the Lease container : {}", leaseCollectionName);
CosmosContainerProperties containerSettings = new CosmosContainerProperties(leaseCollectionName, "/id");
ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(400);
CosmosContainerRequestOptions requestOptions = new CosmosContainerRequestOptions();

try {
database.createContainer(containerSettings, throughputProperties, requestOptions).block();
} catch (Exception e) {
logger.error("Failed to create container {} in database {}", leaseCollectionName, databaseName);
throw e;
}
logger.info("Successfully created new lease container.");
}

return database.getContainer(leaseCollectionName);
}
}
Loading

0 comments on commit 6ead162

Please sign in to comment.