Release 1.9.0 #519

Merged
merged 17 commits on Jun 20, 2023
18 changes: 18 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,23 @@
## Release History

### 1.9.0 (2023-06-19)
#### New Features
* Updated `azure-cosmos` version to 4.46.0.

#### Breaking Changes
* The connector no longer updates the lease container continuation token based on the Kafka offset when the config `connect.cosmos.offset.useLatest` is used. [PR 516](https://github.com/microsoft/kafka-connect-cosmosdb/pull/516)

When set to `false`: if the lease container exists, the connector now processes changes from the current continuation token in the lease container; if the lease container does not exist, it starts processing changes from the beginning.
If you want to start from the beginning, it is advised to delete the lease container or change the Kafka worker name.

When set to `true`: if the lease container exists, the connector now processes changes from the current continuation token in the lease container; if the lease container does not exist, it starts processing changes from now.

#### Key Bug Fixes
* Fixed an issue in `CosmosDBSourceConnector` where no records were read when `connect.cosmos.offset.useLatest` was set to `false`. [PR 516](https://github.com/microsoft/kafka-connect-cosmosdb/pull/516)

#### Other Changes
* Return from the `poll` method once all records from the change feed processor batches have been processed. [PR 517](https://github.com/microsoft/kafka-connect-cosmosdb/pull/517)

### 1.8.0 (2023-04-12)
#### New Features
* Updated `azure-cosmos` version to 4.42.0.
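For context, here is a minimal sketch of the `connect.cosmos.offset.useLatest` semantics described in the changelog above. Only `ChangeFeedProcessorOptions` and `setStartFromBeginning` come from the source diff in this PR; the class and helper names (`OffsetSemanticsSketch`, `optionsFor`) are hypothetical.

```java
import com.azure.cosmos.models.ChangeFeedProcessorOptions;

// Sketch only: how connect.cosmos.offset.useLatest now drives the change feed
// start position instead of rewriting the lease container's continuation token.
public final class OffsetSemanticsSketch {
    // optionsFor is a hypothetical helper, not connector code.
    static ChangeFeedProcessorOptions optionsFor(boolean useLatestOffset) {
        ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions();
        // useLatest=false -> start reading from the beginning of the change feed;
        // useLatest=true  -> start reading from "now".
        // Either way, an existing continuation token in the lease container
        // takes precedence, which is why deleting the lease container (or
        // changing the worker name) is how you force a replay from the start.
        options.setStartFromBeginning(!useLatestOffset);
        return options;
    }
}
```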
4 changes: 2 additions & 2 deletions pom.xml
@@ -7,7 +7,7 @@

<groupId>com.azure.cosmos.kafka</groupId>
<artifactId>kafka-connect-cosmos</artifactId>
<version>1.8.0</version>
<version>1.9.0</version>

<name> kafka-connect-cosmos</name>
<url>https://github.com/microsoft/kafka-connect-cosmosdb</url>
@@ -48,7 +48,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-cosmos</artifactId>
<version>4.42.0</version>
<version>4.46.0</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
CosmosDBSourceTask.java
@@ -3,21 +3,30 @@

package com.azure.cosmos.kafka.connect.source;

import static java.lang.Thread.sleep;
import static java.util.Collections.singletonMap;

import com.azure.cosmos.*;
import com.azure.cosmos.models.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.azure.cosmos.kafka.connect.TopicContainerMap;
import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.ChangeFeedProcessorBuilder;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.kafka.connect.CosmosDBConfig;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.azure.cosmos.kafka.connect.TopicContainerMap;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.ThroughputProperties;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.ArrayList;
@@ -28,14 +37,14 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.scheduler.Schedulers;
import static java.lang.Thread.sleep;
import static java.util.Collections.singletonMap;

public class CosmosDBSourceTask extends SourceTask {

private static final Logger logger = LoggerFactory.getLogger(CosmosDBSourceTask.class);
private static final String OFFSET_KEY = "recordContinuationToken";
private static final String CONTINUATION_TOKEN = "ContinuationToken";
private static final String ZERO_CONTINUATION_TOKEN = "0";

private final AtomicBoolean running = new AtomicBoolean(false);
private CosmosAsyncClient client = null;
@@ -45,6 +54,7 @@ public class CosmosDBSourceTask {
private JsonToStruct jsonToStruct = new JsonToStruct();
private Map<String, String> partitionMap;
private CosmosAsyncContainer leaseContainer;
private final AtomicBoolean shouldFillMoreRecords = new AtomicBoolean(true);

@Override
public String version() {
@@ -70,26 +80,13 @@ public void start(Map<String, String> map) {
partitionMap = new HashMap<>();
partitionMap.put("DatabaseName", config.getDatabaseName());
partitionMap.put("Container", config.getAssignedContainer());

Map<String, Object> offset = context.offsetStorageReader().offset(partitionMap);
// If NOT using the latest offset, reset lease container token to earliest possible value
if (!config.useLatestOffset()) {
updateContinuationToken(ZERO_CONTINUATION_TOKEN);
} else if (offset != null) {
// Check for previous offset and compare with lease container token
// If there's a mismatch, rewind lease container token to offset value
String lastOffsetToken = (String) offset.get(OFFSET_KEY);
String continuationToken = getContinuationToken();

if (continuationToken != null && !lastOffsetToken.equals(continuationToken)) {
logger.info("Mismatch in last offset {} and current continuation token {}.",
lastOffsetToken, continuationToken);
updateContinuationToken(lastOffsetToken);
}
}

// ChangeFeedProcessor tracks its progress in the lease container,
// so we skip using the Kafka offset here.
// If we later switch to the change feed pull model, we will need to track/use the Kafka offset to resume work.

// Initiate Cosmos change feed processor
changeFeedProcessor = getChangeFeedProcessor(config.getWorkerName(),feedContainer,leaseContainer);
changeFeedProcessor = getChangeFeedProcessor(config.getWorkerName(), feedContainer, leaseContainer, config.useLatestOffset());
changeFeedProcessor.start()
.subscribeOn(Schedulers.boundedElastic())
.doOnSuccess(aVoid -> running.set(true))
@@ -130,14 +127,6 @@ private String getContinuationToken() {
return leaseRecord.get(CONTINUATION_TOKEN).textValue();
}

private void updateContinuationToken(String newToken) {
JsonNode leaseRecord = getLeaseContainerRecord();
if (leaseRecord != null) {
((ObjectNode)leaseRecord).put(CONTINUATION_TOKEN, newToken);
leaseContainer.upsertItem(leaseRecord).block();
}
}

@Override
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> records = new ArrayList<>();
@@ -155,7 +144,8 @@ public List<SourceRecord> poll() throws InterruptedException {
break;
}
}


this.shouldFillMoreRecords.set(true);
return records;
}

@@ -166,7 +156,7 @@ private void fillRecords(List<SourceRecord> records, String topic) throws InterruptedException {
long maxWaitTime = System.currentTimeMillis() + config.getTaskTimeout();

int count = 0;
while (bufferSize > 0 && count < batchSize && System.currentTimeMillis() < maxWaitTime) {
while (bufferSize > 0 && count < batchSize && System.currentTimeMillis() < maxWaitTime && this.shouldFillMoreRecords.get()) {
JsonNode node = this.queue.poll(config.getTaskPollInterval(), TimeUnit.MILLISECONDS);

if (node == null) {
@@ -259,13 +249,18 @@ private CosmosAsyncClient getCosmosClient(CosmosDBSourceConfig config) {
return cosmosClientBuilder.buildAsyncClient();
}

private ChangeFeedProcessor getChangeFeedProcessor(String hostName, CosmosAsyncContainer feedContainer, CosmosAsyncContainer leaseContainer) {
private ChangeFeedProcessor getChangeFeedProcessor(
String hostName,
CosmosAsyncContainer feedContainer,
CosmosAsyncContainer leaseContainer,
boolean useLatestOffset) {
logger.info("Creating Change Feed Processor for {}.", hostName);

ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
changeFeedProcessorOptions.setFeedPollDelay(Duration.ofMillis(config.getTaskPollInterval()));
changeFeedProcessorOptions.setMaxItemCount(config.getTaskBatchSize().intValue());
changeFeedProcessorOptions.setLeasePrefix(config.getAssignedContainer() + config.getDatabaseName() + ".");
changeFeedProcessorOptions.setStartFromBeginning(!useLatestOffset);

return new ChangeFeedProcessorBuilder()
.options(changeFeedProcessorOptions)
@@ -283,13 +278,22 @@ protected void handleCosmosDbChanges(List<JsonNode> docs) {
try {
logger.trace("Queuing document");

// transfer() hands the item to the queue and only returns once the item has been polled from the queue.
// The queue is continuously polled into a batch list, but the batch is not flushed until the batch size
// or maxWaitTime is reached, which can let the change feed processor checkpoint ahead of the Kafka batch.
// To keep the checkpoint from moving ahead, we use shouldFillMoreRecords to control the batch flush.
this.queue.transfer(document);
} catch (InterruptedException e) {
logger.error("Interrupted! changeFeedReader.", e);
// Restore interrupted state...
Thread.currentThread().interrupt();
}
}

if (docs.size() > 0) {
// It is important to flush the current batch to Kafka, because we currently use the lease container
// continuation token for bookmarking; we only want to move the bookmark ahead once all records have been returned to Kafka.
this.shouldFillMoreRecords.set(false);
}
}

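To make the new back-pressure handshake easier to follow, here is a self-contained sketch of how the change feed handler and the poll loop coordinate through the transfer queue and the `shouldFillMoreRecords` flag. It is deliberately simplified (no batch size, deadline, or SourceRecord conversion), and the class and method names (`BatchHandshakeSketch`, `handleChanges`, `pollBatch`) are hypothetical.

```java
import com.fasterxml.jackson.databind.JsonNode;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public final class BatchHandshakeSketch {
    private final LinkedTransferQueue<JsonNode> queue = new LinkedTransferQueue<>();
    private final AtomicBoolean shouldFillMoreRecords = new AtomicBoolean(true);

    // Runs on the change feed processor thread (mirrors handleCosmosDbChanges).
    void handleChanges(List<JsonNode> docs) throws InterruptedException {
        for (JsonNode doc : docs) {
            // transfer() blocks until the poll loop has taken this document.
            queue.transfer(doc);
        }
        if (!docs.isEmpty()) {
            // Signal the poll loop to flush the current batch before the
            // processor checkpoints past it.
            shouldFillMoreRecords.set(false);
        }
    }

    // Runs on the Kafka Connect task thread (mirrors the poll/fillRecords loop).
    List<JsonNode> pollBatch(long pollIntervalMs) throws InterruptedException {
        List<JsonNode> batch = new ArrayList<>();
        while (shouldFillMoreRecords.get()) {
            JsonNode node = queue.poll(pollIntervalMs, TimeUnit.MILLISECONDS);
            if (node == null) {
                break; // nothing buffered right now
            }
            batch.add(node);
        }
        shouldFillMoreRecords.set(true); // re-arm for the next batch
        return batch;
    }
}
```

The design point is that `transfer()` (unlike `put()`) keeps the change feed processor thread lock-stepped with the poll loop, so the lease container's continuation token, which does the bookmarking here, can never advance past records that have not yet been handed to Kafka.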
CosmosDBSourceTaskTest.java
@@ -40,6 +40,7 @@ public class CosmosDBSourceTaskTest {
private CosmosAsyncContainer mockLeaseContainer;
private Map<String, String> sourceSettings;
private CosmosDBSourceConfig config;
private LinkedTransferQueue<JsonNode> queue;

@Before
@SuppressWarnings("unchecked") // Need to maintain Typed objects
@@ -59,7 +60,7 @@ public void setup() throws IllegalAccessException {
FieldUtils.writeField(testTask, "config", config, true);

// Create the TransferQueue
LinkedTransferQueue<JsonNode> queue = new LinkedTransferQueue<>();
this.queue = new LinkedTransferQueue<>();
FieldUtils.writeField(testTask, "queue", queue, true);

// Set the running flag to true
@@ -95,7 +96,34 @@ public void setup() throws IllegalAccessException {
}

@Test
public void testPoll() throws InterruptedException, JsonProcessingException {
public void testHandleChanges() throws JsonProcessingException, IllegalAccessException, InterruptedException {
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List<JsonNode> changes = new ArrayList<>();
changes.add(actualObj);

new Thread(() -> {
testTask.handleCosmosDbChanges(changes);
}).start();

int recordCount = 0;
while (recordCount == 0) {
JsonNode jsonNode = this.queue.poll();
if (jsonNode != null) {
recordCount++;
}
}

// wait for the handleChanges logic to finish
Thread.sleep(500);
AtomicBoolean shouldFillMoreRecords =
(AtomicBoolean) FieldUtils.readField(FieldUtils.getField(CosmosDBSourceTask.class, "shouldFillMoreRecords", true), testTask);
Assert.assertFalse(shouldFillMoreRecords.get());
}

@Test
public void testPoll() throws InterruptedException, JsonProcessingException, IllegalAccessException {
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
@@ -108,6 +136,34 @@ public void testPoll() throws InterruptedException, JsonProcessingException {

List<SourceRecord> result = testTask.poll();
Assert.assertEquals(1, result.size());
AtomicBoolean shouldFillMoreRecords =
(AtomicBoolean) FieldUtils.readField(FieldUtils.getField(CosmosDBSourceTask.class, "shouldFillMoreRecords", true), testTask);
Assert.assertTrue(shouldFillMoreRecords.get());
}

@Test
public void testPoll_shouldFillMoreRecordsFalse() throws InterruptedException, JsonProcessingException, IllegalAccessException {
// When shouldFillMoreRecords is false, the poll method should return immediately.
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);

new Thread(() -> {
try {
this.queue.transfer(actualObj);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();

Thread.sleep(500);
AtomicBoolean shouldFillMoreRecords =
(AtomicBoolean) FieldUtils.readField(FieldUtils.getField(CosmosDBSourceTask.class, "shouldFillMoreRecords", true), testTask);
shouldFillMoreRecords.set(false);

List<SourceRecord> result = testTask.poll();
Assert.assertEquals(0, result.size());
Assert.assertTrue(shouldFillMoreRecords.get());
}

@Test