Skip to content

Commit

Permalink
using lsn as the sourceoffset
Browse files Browse the repository at this point in the history
  • Loading branch information
annie-mac committed Oct 17, 2023
1 parent c2d37a8 commit 49d5274
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.ThroughputProperties;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.connect.data.Schema;
Expand All @@ -44,7 +43,7 @@ public class CosmosDBSourceTask extends SourceTask {

private static final Logger logger = LoggerFactory.getLogger(CosmosDBSourceTask.class);
private static final String OFFSET_KEY = "recordContinuationToken";
private static final String CONTINUATION_TOKEN = "ContinuationToken";
private static final String LSN_ATTRIBUTE_NAME = "_lsn";

private final AtomicBoolean running = new AtomicBoolean(false);
private CosmosAsyncClient client = null;
Expand Down Expand Up @@ -105,26 +104,8 @@ public void start(Map<String, String> map) {
logger.info("Started CosmosDB source task.");
}

private JsonNode getLeaseContainerRecord() {
String sql = "SELECT * FROM c WHERE IS_DEFINED(c.Owner)";
Iterable<JsonNode> filteredDocs = leaseContainer.queryItems(sql, new CosmosQueryRequestOptions(), JsonNode.class).toIterable();
if (filteredDocs.iterator().hasNext()) {
JsonNode result = filteredDocs.iterator().next();
// Return node only if it has the continuation token field present
if (result.has(CONTINUATION_TOKEN)) {
return result;
}
}

return null;
}

private String getContinuationToken() {
JsonNode leaseRecord = getLeaseContainerRecord();
if (client == null || leaseRecord == null) {
return null;
}
return leaseRecord.get(CONTINUATION_TOKEN).textValue();
private String getItemLsn(JsonNode item) {
return item.get(LSN_ATTRIBUTE_NAME).asText();
}

@Override
Expand Down Expand Up @@ -171,10 +152,8 @@ private void fillRecords(List<SourceRecord> records, String topic) throws Interr
messageKey = (messageKeyFieldNode != null) ? messageKeyFieldNode.toString() : "";
}

// Get the latest token and record as offset
// TODO: The continuationToken here is picked from any lease with owner, so maybe a little bit random
// change to show the continuationToken for the leases processed by the current worker
Map<String, Object> sourceOffset = singletonMap(OFFSET_KEY, getContinuationToken());
// Get the latest lsn and record as offset
Map<String, Object> sourceOffset = singletonMap(OFFSET_KEY, getItemLsn(node));

if (logger.isDebugEnabled()) {
logger.debug("Latest offset is {}.", sourceOffset.get(OFFSET_KEY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class CosmosDBSourceTaskTest {
private final String topicName = "testtopic";
private final String containerName = "container666";
private final String databaseName = "fakeDatabase312";
private final String OFFSET_KEY = "recordContinuationToken";
private CosmosAsyncClient mockCosmosClient;
private CosmosAsyncContainer mockFeedContainer;
private CosmosAsyncContainer mockLeaseContainer;
Expand Down Expand Up @@ -183,6 +184,24 @@ public void testPollWithMessageKey() throws InterruptedException, JsonProcessing
Assert.assertEquals("123", result.get(0).key());
}

@Test
public void testSourceRecordOffset() throws InterruptedException, JsonProcessingException {
String jsonString = "{\"id\":123,\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List<JsonNode> changes = new ArrayList<>();
changes.add(actualObj);

new Thread(() -> {
testTask.handleCosmosDbChanges(changes);
}).start();

List<SourceRecord> results = testTask.poll();
Assert.assertEquals(1, results.size());
Assert.assertEquals("123", results.get(0).key());
Assert.assertEquals("2", results.get(0).sourceOffset().get(OFFSET_KEY));
}

@Test
public void testZeroBatchSize() throws InterruptedException, JsonProcessingException, IllegalAccessException {
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
Expand Down

0 comments on commit 49d5274

Please sign in to comment.