diff --git a/CHANGELOG.md b/CHANGELOG.md
index e2e319eb..06a5ee7e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,23 @@
## Release History
+### 1.9.0 (2023-06-19)
+#### New Features
+* Updated `azure-cosmos` version to 4.46.0.
+
+#### Breaking Changes
+* Stopped updating the lease container continuation token based on the Kafka offset when using the config `connect.cosmos.offset.useLatest`. [PR 516](https://github.com/microsoft/kafka-connect-cosmosdb/pull/516)
+
+When set to `false`: if the lease container exists, the connector will now process changes from the current continuation token in `leaseContainer`; if the lease container does not exist, it will start processing changes from the beginning.
+If you want to start from the beginning, it is advised to delete the lease container or change the Kafka worker name.
+
+When set to `true`: if the lease container exists, the connector will now process changes from the current continuation token in `leaseContainer`; if the lease container does not exist, it will start processing changes from now.
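+
+For example, a minimal sketch of a source connector config that resumes from the lease container (the connector name here is an illustrative placeholder):
+
+```json
+{
+  "name": "cosmosdb-source-connector",
+  "config": {
+    "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
+    "connect.cosmos.offset.useLatest": "false"
+  }
+}
+```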
+
+#### Key Bug Fixes
+* Fixed an issue in `CosmosDBSourceConnector` where no records were read when `connect.cosmos.offset.useLatest` was configured to `false`. [PR 516](https://github.com/microsoft/kafka-connect-cosmosdb/pull/516)
+
+#### Other Changes
+* Return from the `poll` method once all records from the current `ChangeFeedProcessor` batches have been processed. [PR 517](https://github.com/microsoft/kafka-connect-cosmosdb/pull/517)
+
### 1.8.0 (2023-04-12)
#### New Features
* Updated `azure-cosmos` version to 4.42.0.
diff --git a/pom.xml b/pom.xml
index 03da0ddd..8aee743a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
com.azure.cosmos.kafka
kafka-connect-cosmos
- 1.8.0
+ 1.9.0
kafka-connect-cosmos
https://github.com/microsoft/kafka-connect-cosmosdb
@@ -48,7 +48,7 @@
com.azure
azure-cosmos
- 4.42.0
+ 4.46.0
com.jayway.jsonpath
diff --git a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java
index 6eaa7345..5e3c2757 100644
--- a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java
+++ b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java
@@ -3,21 +3,30 @@
package com.azure.cosmos.kafka.connect.source;
-import static java.lang.Thread.sleep;
-import static java.util.Collections.singletonMap;
-
-import com.azure.cosmos.*;
-import com.azure.cosmos.models.*;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.azure.cosmos.kafka.connect.TopicContainerMap;
+import com.azure.cosmos.ChangeFeedProcessor;
+import com.azure.cosmos.ChangeFeedProcessorBuilder;
+import com.azure.cosmos.ConsistencyLevel;
+import com.azure.cosmos.CosmosAsyncClient;
+import com.azure.cosmos.CosmosAsyncContainer;
+import com.azure.cosmos.CosmosAsyncDatabase;
+import com.azure.cosmos.CosmosClientBuilder;
+import com.azure.cosmos.CosmosException;
import com.azure.cosmos.kafka.connect.CosmosDBConfig;
-import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.azure.cosmos.kafka.connect.TopicContainerMap;
+import com.azure.cosmos.models.ChangeFeedProcessorOptions;
+import com.azure.cosmos.models.CosmosContainerProperties;
+import com.azure.cosmos.models.CosmosContainerRequestOptions;
+import com.azure.cosmos.models.CosmosContainerResponse;
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
+import com.azure.cosmos.models.ThroughputProperties;
+import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
-import org.apache.kafka.connect.data.SchemaAndValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.ArrayList;
@@ -28,14 +37,14 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import reactor.core.scheduler.Schedulers;
+import static java.lang.Thread.sleep;
+import static java.util.Collections.singletonMap;
public class CosmosDBSourceTask extends SourceTask {
private static final Logger logger = LoggerFactory.getLogger(CosmosDBSourceTask.class);
private static final String OFFSET_KEY = "recordContinuationToken";
private static final String CONTINUATION_TOKEN = "ContinuationToken";
- private static final String ZERO_CONTINUATION_TOKEN = "0";
private final AtomicBoolean running = new AtomicBoolean(false);
private CosmosAsyncClient client = null;
@@ -45,6 +54,7 @@ public class CosmosDBSourceTask extends SourceTask {
private JsonToStruct jsonToStruct = new JsonToStruct();
private Map partitionMap;
private CosmosAsyncContainer leaseContainer;
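+    // Gate that pauses buffering of additional change feed batches until the current poll batch has been returned to Kafka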
+ private final AtomicBoolean shouldFillMoreRecords = new AtomicBoolean(true);
@Override
public String version() {
@@ -70,26 +80,13 @@ public void start(Map map) {
partitionMap = new HashMap<>();
partitionMap.put("DatabaseName", config.getDatabaseName());
partitionMap.put("Container", config.getAssignedContainer());
-
- Map offset = context.offsetStorageReader().offset(partitionMap);
- // If NOT using the latest offset, reset lease container token to earliest possible value
- if (!config.useLatestOffset()) {
- updateContinuationToken(ZERO_CONTINUATION_TOKEN);
- } else if (offset != null) {
- // Check for previous offset and compare with lease container token
- // If there's a mismatch, rewind lease container token to offset value
- String lastOffsetToken = (String) offset.get(OFFSET_KEY);
- String continuationToken = getContinuationToken();
-
- if (continuationToken != null && !lastOffsetToken.equals(continuationToken)) {
- logger.info("Mismatch in last offset {} and current continuation token {}.",
- lastOffsetToken, continuationToken);
- updateContinuationToken(lastOffsetToken);
- }
- }
+
+        // ChangeFeedProcessor tracks its progress in the lease container.
+        // We skip using the Kafka offset here.
+        // In the future, when we switch to the change feed pull model, it will be required to track/use the Kafka offset to resume work.
// Initiate Cosmos change feed processor
- changeFeedProcessor = getChangeFeedProcessor(config.getWorkerName(),feedContainer,leaseContainer);
+ changeFeedProcessor = getChangeFeedProcessor(config.getWorkerName(), feedContainer, leaseContainer, config.useLatestOffset());
changeFeedProcessor.start()
.subscribeOn(Schedulers.boundedElastic())
.doOnSuccess(aVoid -> running.set(true))
@@ -130,14 +127,6 @@ private String getContinuationToken() {
return leaseRecord.get(CONTINUATION_TOKEN).textValue();
}
- private void updateContinuationToken(String newToken) {
- JsonNode leaseRecord = getLeaseContainerRecord();
- if (leaseRecord != null) {
- ((ObjectNode)leaseRecord).put(CONTINUATION_TOKEN, newToken);
- leaseContainer.upsertItem(leaseRecord).block();
- }
- }
-
@Override
public List poll() throws InterruptedException {
List records = new ArrayList<>();
@@ -155,7 +144,8 @@ public List poll() throws InterruptedException {
break;
}
}
-
+
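+        // Re-open the gate so handleCosmosDbChanges can buffer the next change feed batch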
+ this.shouldFillMoreRecords.set(true);
return records;
}
@@ -166,7 +156,7 @@ private void fillRecords(List records, String topic) throws Interr
long maxWaitTime = System.currentTimeMillis() + config.getTaskTimeout();
int count = 0;
- while (bufferSize > 0 && count < batchSize && System.currentTimeMillis() < maxWaitTime) {
+ while (bufferSize > 0 && count < batchSize && System.currentTimeMillis() < maxWaitTime && this.shouldFillMoreRecords.get()) {
JsonNode node = this.queue.poll(config.getTaskPollInterval(), TimeUnit.MILLISECONDS);
if (node == null) {
@@ -259,13 +249,18 @@ private CosmosAsyncClient getCosmosClient(CosmosDBSourceConfig config) {
return cosmosClientBuilder.buildAsyncClient();
}
- private ChangeFeedProcessor getChangeFeedProcessor(String hostName, CosmosAsyncContainer feedContainer, CosmosAsyncContainer leaseContainer) {
+ private ChangeFeedProcessor getChangeFeedProcessor(
+ String hostName,
+ CosmosAsyncContainer feedContainer,
+ CosmosAsyncContainer leaseContainer,
+ boolean useLatestOffset) {
logger.info("Creating Change Feed Processor for {}.", hostName);
ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
changeFeedProcessorOptions.setFeedPollDelay(Duration.ofMillis(config.getTaskPollInterval()));
changeFeedProcessorOptions.setMaxItemCount(config.getTaskBatchSize().intValue());
changeFeedProcessorOptions.setLeasePrefix(config.getAssignedContainer() + config.getDatabaseName() + ".");
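+        // useLatestOffset=false maps to starting from the beginning of the change feed when no lease container state exists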
+ changeFeedProcessorOptions.setStartFromBeginning(!useLatestOffset);
return new ChangeFeedProcessorBuilder()
.options(changeFeedProcessorOptions)
@@ -283,13 +278,22 @@ protected void handleCosmosDbChanges(List docs) {
try {
logger.trace("Queuing document");
+            // The item is transferred to the queue, and transfer() only returns once the item has been polled from the queue.
+            // The queue is continuously polled and drained into a batch list, but the batch list is not flushed
+            // until the batch size or maxWaitTime is reached, which can cause the change feed processor to checkpoint faster than the Kafka batch.
+            // To avoid moving the change feed processor checkpoint ahead, we use shouldFillMoreRecords to control the batch flush.
this.queue.transfer(document);
} catch (InterruptedException e) {
logger.error("Interrupted! changeFeedReader.", e);
// Restore interrupted state...
Thread.currentThread().interrupt();
}
+ }
+ if (docs.size() > 0) {
+            // It is important to flush the current batches to Kafka because we currently use the lease container continuationToken for bookmarking,
+            // so we only want to move the bookmark ahead once all the records have been returned to Kafka.
+ this.shouldFillMoreRecords.set(false);
}
}
diff --git a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java
index 0a481ebf..6d9f3320 100644
--- a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java
+++ b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java
@@ -40,6 +40,7 @@ public class CosmosDBSourceTaskTest {
private CosmosAsyncContainer mockLeaseContainer;
private Map sourceSettings;
private CosmosDBSourceConfig config;
+ private LinkedTransferQueue queue;
@Before
@SuppressWarnings("unchecked") // Need to maintain Typed objects
@@ -59,7 +60,7 @@ public void setup() throws IllegalAccessException {
FieldUtils.writeField(testTask, "config", config, true);
// Create the TransferQueue
- LinkedTransferQueue queue = new LinkedTransferQueue<>();
+ this.queue = new LinkedTransferQueue<>();
FieldUtils.writeField(testTask, "queue", queue, true);
// Set the running flag to true
@@ -95,7 +96,34 @@ public void setup() throws IllegalAccessException {
}
@Test
- public void testPoll() throws InterruptedException, JsonProcessingException {
+ public void testHandleChanges() throws JsonProcessingException, IllegalAccessException, InterruptedException {
+ String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode actualObj = mapper.readTree(jsonString);
+ List changes = new ArrayList<>();
+ changes.add(actualObj);
+
+ new Thread(() -> {
+ testTask.handleCosmosDbChanges(changes);
+ }).start();
+
+ int recordCount = 0;
+ while(recordCount == 0) {
+ JsonNode jsonNode = this.queue.poll();
+ if (jsonNode != null) {
+ recordCount++;
+ }
+ }
+
+        // Wait for the handleCosmosDbChanges logic to finish
+ Thread.sleep(500);
+ AtomicBoolean shouldFillMoreRecords =
+ (AtomicBoolean) FieldUtils.readField(FieldUtils.getField(CosmosDBSourceTask.class, "shouldFillMoreRecords", true), testTask);
+ Assert.assertFalse(shouldFillMoreRecords.get());
+ }
+
+ @Test
+ public void testPoll() throws InterruptedException, JsonProcessingException, IllegalAccessException {
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
@@ -108,6 +136,34 @@ public void testPoll() throws InterruptedException, JsonProcessingException {
List result=testTask.poll();
Assert.assertEquals(1, result.size());
+ AtomicBoolean shouldFillMoreRecords =
+ (AtomicBoolean) FieldUtils.readField(FieldUtils.getField(CosmosDBSourceTask.class, "shouldFillMoreRecords", true), testTask);
+ Assert.assertTrue(shouldFillMoreRecords.get());
+ }
+
+ @Test
+ public void testPoll_shouldFillMoreRecordsFalse() throws InterruptedException, JsonProcessingException, IllegalAccessException {
+        // Test that when shouldFillMoreRecords is false, the poll method returns immediately
+ String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode actualObj = mapper.readTree(jsonString);
+
+ new Thread(() -> {
+ try {
+ this.queue.transfer(actualObj);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }).start();
+
+ Thread.sleep(500);
+ AtomicBoolean shouldFillMoreRecords =
+ (AtomicBoolean) FieldUtils.readField(FieldUtils.getField(CosmosDBSourceTask.class, "shouldFillMoreRecords", true), testTask);
+ shouldFillMoreRecords.set(false);
+
+ List result=testTask.poll();
+ Assert.assertEquals(0, result.size());
+ Assert.assertTrue(shouldFillMoreRecords.get());
}
@Test
diff --git a/src/test/java/com/azure/cosmos/kafka/connect/source/integration/SourceConnectorIT.java b/src/test/java/com/azure/cosmos/kafka/connect/source/integration/SourceConnectorIT.java
index e22ebd0f..1e033515 100644
--- a/src/test/java/com/azure/cosmos/kafka/connect/source/integration/SourceConnectorIT.java
+++ b/src/test/java/com/azure/cosmos/kafka/connect/source/integration/SourceConnectorIT.java
@@ -7,44 +7,45 @@
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.CosmosDatabase;
+import com.azure.cosmos.CosmosException;
+import com.azure.cosmos.kafka.connect.IntegrationTest;
import com.azure.cosmos.models.CosmosContainerProperties;
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.ThroughputProperties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.json.JsonDeserializer;
-import org.sourcelab.kafka.connect.apiclient.Configuration;
-import org.sourcelab.kafka.connect.apiclient.KafkaConnectClient;
-import org.sourcelab.kafka.connect.apiclient.request.dto.NewConnectorDefinition;
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-
-import com.azure.cosmos.kafka.connect.IntegrationTest;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.sourcelab.kafka.connect.apiclient.Configuration;
+import org.sourcelab.kafka.connect.apiclient.KafkaConnectClient;
+import org.sourcelab.kafka.connect.apiclient.request.dto.NewConnectorDefinition;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
-import java.util.Arrays;
+import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
-import java.time.Duration;
+import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.sleep;
import static org.sourcelab.kafka.connect.apiclient.request.dto.NewConnectorDefinition.Builder;
@@ -78,6 +79,7 @@ public class SourceConnectorIT {
private KafkaConsumer avroConsumer;
private List> recordBuffer;
private List> avroRecordBuffer;
+ private String leaseContainerName;
/**
* Load CosmosDB configuration from the connector config JSON and set up CosmosDB client.
@@ -93,7 +95,7 @@ public void before() throws URISyntaxException, IOException {
String topicContainerMap = config.get("connect.cosmos.containers.topicmap").textValue();
String topic = StringUtils.substringBefore(topicContainerMap, "#");
String containerName = StringUtils.substringAfter(topicContainerMap, "#");
-
+ this.leaseContainerName = containerName + "-leases";
// Setup Cosmos Client
logger.debug("Setting up the Cosmos DB client");
cosmosClient = new CosmosClientBuilder()
@@ -495,6 +497,101 @@ public void testResumeFromLatestOffsetsMultipleWorkers() throws InterruptedExcep
Assert.assertNotNull("Person B could not be retrieved from messages", resultRecord.orElse(null));
}
+ /**
+ * Test that when the lease container is not initialized and connect.cosmos.offset.useLatest = false,
+ * the change feed should start processing from the beginning
+ */
+ @Test
+ public void testStart_notUseLatestOffset() {
+
+ // Delete previous created lease container
+ try {
+ cosmosClient.getDatabase(this.databaseName).getContainer(this.leaseContainerName).delete();
+ } catch (CosmosException e) {
+ if (e.getStatusCode() == 404) {
+ logger.info("Lease container does not exists");
+ } else {
+ throw e;
+ }
+ }
+
+ // Create source connector with default config
+ connectClient.addConnector(connectConfig.build());
+
+ // Allow time for Source connector to setup resources
+ sleep(8000);
+
+ // Create item in Cosmos DB
+ Person person = new Person("Lucy Ferr", RandomUtils.nextLong(1L, 9999999L) + "");
+ targetContainer.createItem(person);
+
+ // Allow time for Source connector to process data from Cosmos DB
+ sleep(8000);
+ // Verify the lease document continuationToken is not null and > 0
+ List leaseDocuments = this.getAllLeaseDocuments();
+ for (JsonNode leaseDocument : leaseDocuments) {
+ Assert.assertTrue(
+ Integer.parseInt(leaseDocument.get("ContinuationToken").asText().replace("\"", "")) > 0);
+ }
+ }
+
+ /**
+ * Test that when the lease container is not initialized and connect.cosmos.offset.useLatest = true,
+ * the change feed should start processing from now (only new changes)
+ */
+ @Test
+ public void testStart_useLatestOffset() {
+
+ // Delete previous created lease container
+ try {
+ cosmosClient.getDatabase(this.databaseName).getContainer(this.leaseContainerName).delete();
+ } catch (CosmosException e) {
+ if (e.getStatusCode() == 404) {
+ logger.info("Lease container does not exists");
+ } else {
+ throw e;
+ }
+ }
+
+ // Create item before the task start
+ Person person = new Person("Lucy Ferr", RandomUtils.nextLong(1L, 9999999L) + "");
+ targetContainer.createItem(person);
+
+ // Create source connector with default config
+ connectClient.addConnector(
+ connectConfig.withConfig("connect.cosmos.offset.useLatest", true).build());
+
+ // Allow time for Source connector to setup resources
+ sleep(10000);
+
+        // Verify the lease document continuationToken is null since no changes have been processed yet
+ List leaseDocuments = this.getAllLeaseDocuments();
+ for (JsonNode leaseDocument : leaseDocuments) {
+ Assert.assertTrue(leaseDocument.get("ContinuationToken").isNull());
+ }
+
+ // now create some new items
+ person = new Person("Lucy Ferr", RandomUtils.nextLong(1L, 9999999L) + "");
+ targetContainer.createItem(person);
+        // Allow time for Source connector to process data from Cosmos DB
+ sleep(10000);
+
+        // Verify the lease document continuationToken is now greater than 0
+ leaseDocuments = this.getAllLeaseDocuments();
+ for (JsonNode leaseDocument : leaseDocuments) {
+ Assert.assertTrue(
+                Integer.parseInt(leaseDocument.get("ContinuationToken").asText().replace("\"", "")) > 0);
+        }
+ }
+
+ private List getAllLeaseDocuments() {
+ String sql = "SELECT * FROM c WHERE IS_DEFINED(c.Owner)";
+ return this.cosmosClient
+ .getDatabase(this.databaseName)
+ .getContainer(this.leaseContainerName)
+ .queryItems(sql, new CosmosQueryRequestOptions(), JsonNode.class)
+ .stream().collect(Collectors.toList());
+ }
+
/**
* A simple entity to serialize to/deserialize from JSON in tests.
*/